##// END OF EJS Templates
fixed promises cancellation
cin -
r149:eb793fbbe4ea v2
parent child
Show More
@@ -746,33 +746,31 namespace Implab.Test {
746 746 public void ChainedCancel2Test() {
747 747 // при отмене цепочки обещаний, вложенные операции также должны отменяться
748 748 var pSurvive = new Promise<bool>();
749 var hemStarted = new ManualResetEvent(false);
749 var hemStarted = new Signal();
750 750 var p = PromiseHelper
751 751 .Sleep(1, "Hi, HAL!")
752 .Chain(x => {
752 .Chain(() => {
753 753 hemStarted.Set();
754 754 // запускаем две асинхронные операции
755 755 var result = PromiseHelper
756 .Sleep(100000000, "HEM ENABLED!!!")
757 .Then(s => {
758 pSurvive.Resolve(false);
759 return s;
760 });
756 .Sleep(2000, "HEM ENABLED!!!")
757 .Then(() => pSurvive.Resolve(false));
761 758
762 759 result
763 760 .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
764
761
765 762 return result;
766 763 });
767 764
768 hemStarted.WaitOne();
765 hemStarted.Wait();
769 766 p.Cancel();
770 767
771 768 try {
772 769 p.Join();
770 Assert.Fail();
773 771 } catch (OperationCanceledException) {
774 Assert.IsTrue(pSurvive.Join());
775 772 }
773 Assert.IsTrue(pSurvive.Join());
776 774 }
777 775
778 776 [TestMethod]
@@ -48,7 +48,7 namespace Implab.Test {
48 48 bool run = false;
49 49 var task = new ActionTask(() => {
50 50 run = true;
51 }, null, null);
51 }, null, null, true);
52 52
53 53 // request cancelation
54 54 task.Cancel();
@@ -65,7 +65,7 namespace Implab.Test {
65 65 var task = new ActionTask(() => {
66 66 started.Set();
67 67 finish.Wait();
68 }, null, null);
68 }, null, null, true);
69 69
70 70 AsyncPool.RunThread(() => {
71 71 task.Resolve();
@@ -85,14 +85,18 namespace Implab.Test {
85 85
86 86 [TestMethod]
87 87 public void CancelTaskChainFromBottom() {
88 var started = new Signal();
88 89 var check1 = new Signal();
89 90 var requested = false;
90 91 var p1 = AsyncPool.RunThread(token => {
91 92 token.CancellationRequested(reason => requested = true);
93 started.Set();
92 94 check1.Wait();
93 95 token.CancelOperationIfRequested();
94 96 });
95 97
98 started.Wait();
99
96 100 var p2 = p1.Then(() => {
97 101 });
98 102
@@ -4,7 +4,8 using System.Threading;
4 4 namespace Implab.Test {
5 5 static class PromiseHelper {
6 6 public static IPromise<T> Sleep<T>(int timeout, T retVal) {
7 return AsyncPool.Invoke(() => {
7 return AsyncPool.Invoke((ct) => {
8 ct.CancellationRequested(ct.CancelOperation);
8 9 Thread.Sleep(timeout);
9 10 return retVal;
10 11 });
@@ -4,14 +4,16 namespace Implab {
4 4 public class ActionChainTask : ActionChainTaskBase, IDeferred {
5 5 readonly Func<IPromise> m_task;
6 6
7 public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
7 public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
8 8 m_task = task;
9 9 }
10 10
11 11 public void Resolve() {
12 12 if (m_task != null && LockCancelation()) {
13 13 try {
14 Observe(m_task());
14 var p = m_task();
15 p.On(SetResult, HandleErrorInternal, SetCancelled);
16 CancellationRequested(p.Cancel);
15 17 } catch(Exception err) {
16 18 HandleErrorInternal(err);
17 19 }
@@ -8,9 +8,11 namespace Implab {
8 8
9 9 int m_cancelationLock;
10 10
11 protected ActionChainTaskBase( Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) {
11 protected ActionChainTaskBase(Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) {
12 12 m_error = error;
13 13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 16 }
15 17
16 18 public void Reject(Exception error) {
@@ -21,21 +23,26 namespace Implab {
21 23
22 24
23 25 public override void CancelOperation(Exception reason) {
24 if (m_cancel != null && LockCancelation()) {
25 try {
26 Observe(m_cancel(reason));
27 } catch(Exception err) {
28 HandleErrorInternal(err);
26 if (LockCancelation()) {
27 if (m_cancel != null) {
28 try {
29 m_cancel(reason).On(SetResult, SetError, SetCancelled);
30 } catch (Exception err) {
31 HandleErrorInternal(err);
32 }
33 } else {
34 SetCancelled(reason);
29 35 }
30 36 }
31
32 37 }
33 38
34 39 protected void HandleErrorInternal(Exception error) {
35 40 if (m_error != null) {
36 41 try {
37 Observe(m_error(error));
38 } catch(Exception err) {
42 var p = m_error(error);
43 p.On(SetResult,SetError,SetCancelled);
44 CancellationRequested(p.Cancel);
45 } catch (Exception err) {
39 46 SetError(err);
40 47 }
41 48 } else {
@@ -43,17 +50,6 namespace Implab {
43 50 }
44 51 }
45 52
46 protected void Observe(IPromise operation) {
47 if (operation == null)
48 throw new NullReferenceException("The task returned null promise");
49
50 // pass operation results to the current promise
51 operation.On(SetResult, SetError, SetCancelled);
52
53 // pass the cancelation request
54 CancellationRequested(operation.Cancel);
55 }
56
57 53 protected bool LockCancelation() {
58 54 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
59 55 }
@@ -4,14 +4,16 namespace Implab {
4 4 public class ActionChainTask<T> : ActionChainTaskBase, IDeferred<T> {
5 5 readonly Func<T, IPromise> m_task;
6 6
7 public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
7 public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
8 8 m_task = task;
9 9 }
10 10
11 11 public void Resolve(T value) {
12 12 if (m_task != null && LockCancelation()) {
13 13 try {
14 Observe(m_task(value));
14 var p = m_task(value);
15 p.On(SetResult, HandleErrorInternal, SetCancelled);
16 CancellationRequested(p.Cancel);
15 17 } catch(Exception err) {
16 18 HandleErrorInternal(err);
17 19 }
@@ -3,7 +3,7
3 3 namespace Implab {
4 4 public class ActionTask : ActionTaskBase, IDeferred {
5 5 readonly Action m_task;
6 public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
6 public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
7 7 m_task = task;
8 8 }
9 9
@@ -8,9 +8,11 namespace Implab {
8 8
9 9 int m_cancelationLock;
10 10
11 protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel) {
11 protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel, bool autoCancellable) {
12 12 m_error = error;
13 13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 16 }
15 17
16 18 public void Reject(Exception error) {
@@ -3,7 +3,7
3 3 namespace Implab {
4 4 public class ActionTask<T> : ActionTaskBase, IDeferred<T> {
5 5 readonly Action<T> m_task;
6 public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
6 public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
7 7 m_task = task;
8 8 }
9 9
@@ -4,14 +4,17 namespace Implab {
4 4 public class FuncChainTask<TResult> : FuncChainTaskBase<TResult>, IDeferred {
5 5 readonly Func<IPromise<TResult>> m_task;
6 6
7 public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
7 public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable)
8 : base(error, cancel, autoCancellable) {
8 9 m_task = task;
9 10 }
10 11
11 12 public void Resolve() {
12 13 if (m_task != null && LockCancelation()) {
13 14 try {
14 Observe(m_task());
15 var operation = m_task();
16 operation.On(SetResult, HandleErrorInternal, SetCancelled);
17 CancellationRequested(operation.Cancel);
15 18 } catch (Exception err) {
16 19 HandleErrorInternal(err);
17 20 }
@@ -8,9 +8,11 namespace Implab {
8 8
9 9 int m_cancelationLock;
10 10
11 protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) {
11 protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) {
12 12 m_error = error;
13 13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 16 }
15 17
16 18 public void Reject(Exception error) {
@@ -19,11 +21,15 namespace Implab {
19 21 }
20 22
21 23 public override void CancelOperation(Exception reason) {
22 if (m_cancel != null && LockCancelation()) {
23 try {
24 Observe(m_cancel(reason));
25 } catch(Exception err) {
26 HandleErrorInternal(err);
24 if (LockCancelation()) {
25 if (m_cancel != null) {
26 try {
27 m_cancel(reason).On(SetResult, HandleErrorInternal, SetCancelled);
28 } catch (Exception err) {
29 HandleErrorInternal(err);
30 }
31 } else {
32 SetCancelled(reason);
27 33 }
28 34 }
29 35
@@ -32,7 +38,10 namespace Implab {
32 38 protected void HandleErrorInternal(Exception error) {
33 39 if (m_error != null) {
34 40 try {
35 Observe(m_error(error));
41 var operation = m_error(error);
42
43 operation.On(SetResult, SetError, SetCancelled);
44 CancellationRequested(operation.Cancel);
36 45 } catch(Exception err) {
37 46 SetError(err);
38 47 }
@@ -41,17 +50,6 namespace Implab {
41 50 }
42 51 }
43 52
44 protected void Observe(IPromise<TResult> operation) {
45 if (operation == null)
46 throw new NullReferenceException("The task returned null promise");
47
48 // pass operation results to the current promise
49 operation.On(SetResult, SetError, SetCancelled);
50
51 // pass the cancelation request
52 CancellationRequested(operation.Cancel);
53 }
54
55 53 protected bool LockCancelation() {
56 54 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
57 55 }
@@ -4,14 +4,16 namespace Implab {
4 4 public class FuncChainTask<TArg,TResult> : FuncChainTaskBase<TResult>, IDeferred<TArg> {
5 5 readonly Func<TArg, IPromise<TResult>> m_task;
6 6
7 public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
7 public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) : base(error, cancel, autoCancellable){
8 8 m_task = task;
9 9 }
10 10
11 11 public void Resolve(TArg value) {
12 12 if (m_task != null && LockCancelation()) {
13 13 try {
14 Observe(m_task(value));
14 var operation = m_task(value);
15 operation.On(SetResult, HandleErrorInternal, SetCancelled);
16 CancellationRequested(operation.Cancel);
15 17 } catch (Exception err) {
16 18 HandleErrorInternal(err);
17 19 }
@@ -5,7 +5,7 namespace Implab {
5 5 public class FuncTask<T> : FuncTaskBase<T>, IDeferred {
6 6 readonly Func<T> m_task;
7 7
8 public FuncTask(Func<T> task, Func<Exception, T> error, Func<Exception, T> cancel) : base(error,cancel) {
8 public FuncTask(Func<T> task, Func<Exception, T> error, Func<Exception, T> cancel, bool autoCancellable) : base(error, cancel, autoCancellable) {
9 9 m_task = task;
10 10 }
11 11
@@ -8,9 +8,11 namespace Implab {
8 8
9 9 int m_cancelationLock;
10 10
11 protected FuncTaskBase( Func<Exception, TResult> error, Func<Exception, TResult> cancel) {
11 protected FuncTaskBase( Func<Exception, TResult> error, Func<Exception, TResult> cancel, bool autoCancellable) {
12 12 m_error = error;
13 13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 16 }
15 17
16 18 public void Reject(Exception error) {
@@ -4,7 +4,7 namespace Implab {
4 4 public class FuncTask<TArg, TResult> : FuncTaskBase<TResult>, IDeferred<TArg> {
5 5 readonly Func<TArg, TResult> m_task;
6 6
7 public FuncTask(Func<TArg, TResult> task, Func<Exception, TResult> error,Func<Exception, TResult> cancel) : base(error,cancel) {
7 public FuncTask(Func<TArg, TResult> task, Func<Exception, TResult> error,Func<Exception, TResult> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
8 8 m_task = task;
9 9 }
10 10
@@ -30,7 +30,49 namespace Implab.Parallels {
30 30 InitPool();
31 31 }
32 32
33 public Promise<T> Invoke<T>(Func<T> task) {
33 public IPromise<T> Invoke<T>(Func<T> task) {
34 if (task == null)
35 throw new ArgumentNullException("task");
36 if (IsDisposed)
37 throw new ObjectDisposedException(ToString());
38
39 var promise = new FuncTask<T>(task, null, null, true);
40
41 var lop = TraceContext.Instance.CurrentOperation;
42
43 EnqueueTask(delegate {
44 TraceContext.Instance.EnterLogicalOperation(lop, false);
45
46 promise.Resolve();
47
48 TraceContext.Instance.Leave();
49 });
50
51 return promise;
52 }
53
54 public IPromise Invoke(Action task) {
55 if (task == null)
56 throw new ArgumentNullException("task");
57 if (IsDisposed)
58 throw new ObjectDisposedException(ToString());
59
60 var promise = new ActionTask(task, null, null, true);
61
62 var lop = TraceContext.Instance.CurrentOperation;
63
64 EnqueueTask(delegate {
65 TraceContext.Instance.EnterLogicalOperation(lop, false);
66
67 promise.Resolve();
68
69 TraceContext.Instance.Leave();
70 });
71
72 return promise;
73 }
74
75 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
34 76 if (task == null)
35 77 throw new ArgumentNullException("task");
36 78 if (IsDisposed)
@@ -43,7 +85,35 namespace Implab.Parallels {
43 85 EnqueueTask(delegate {
44 86 TraceContext.Instance.EnterLogicalOperation(lop, false);
45 87 try {
46 promise.Resolve(task());
88 if (!promise.CancelOperationIfRequested())
89 promise.Resolve(task(promise));
90 } catch (Exception e) {
91 promise.Reject(e);
92 } finally {
93 TraceContext.Instance.Leave();
94 }
95 });
96
97 return promise;
98 }
99
100 public IPromise Invoke<T>(Action<ICancellationToken> task) {
101 if (task == null)
102 throw new ArgumentNullException("task");
103 if (IsDisposed)
104 throw new ObjectDisposedException(ToString());
105
106 var promise = new Promise();
107
108 var lop = TraceContext.Instance.CurrentOperation;
109
110 EnqueueTask(delegate {
111 TraceContext.Instance.EnterLogicalOperation(lop, false);
112 try {
113 if (!promise.CancelOperationIfRequested()) {
114 task(promise);
115 promise.Resolve();
116 }
47 117 } catch (Exception e) {
48 118 promise.Reject(e);
49 119 } finally {
@@ -178,7 +178,7 namespace Implab {
178 178 public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
179 179 Safe.ArgumentNotNull(that, "that");
180 180
181 var d = new ActionTask(success, error, cancel);
181 var d = new ActionTask(success, error, cancel, false);
182 182 that.On(d.Resolve, d.Reject, d.CancelOperation);
183 183 if (success != null)
184 184 d.CancellationRequested(that.Cancel);
@@ -196,7 +196,7 namespace Implab {
196 196 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
197 197 Safe.ArgumentNotNull(that, "that");
198 198
199 var d = new FuncTask<T>(success, error, cancel);
199 var d = new FuncTask<T>(success, error, cancel, false);
200 200 that.On(d.Resolve, d.Reject, d.CancelOperation);
201 201 if (success != null)
202 202 d.CancellationRequested(that.Cancel);
@@ -213,7 +213,7 namespace Implab {
213 213
214 214 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
215 215 Safe.ArgumentNotNull(that, "that");
216 var d = new FuncTask<T,T2>(success, error, cancel);
216 var d = new FuncTask<T,T2>(success, error, cancel, false);
217 217 that.On(d.Resolve, d.Reject, d.CancelOperation);
218 218 if (success != null)
219 219 d.CancellationRequested(that.Cancel);
@@ -232,7 +232,7 namespace Implab {
232 232 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error, Func<Exception,IPromise> cancel) {
233 233 Safe.ArgumentNotNull(that, "that");
234 234
235 var d = new ActionChainTask(success, error, cancel);
235 var d = new ActionChainTask(success, error, cancel, false);
236 236 that.On(d.Resolve, d.Reject, d.CancelOperation);
237 237 if (success != null)
238 238 d.CancellationRequested(that.Cancel);
@@ -250,7 +250,7 namespace Implab {
250 250 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
251 251 Safe.ArgumentNotNull(that, "that");
252 252
253 var d = new FuncChainTask<T>(success, error, cancel);
253 var d = new FuncChainTask<T>(success, error, cancel, false);
254 254 that.On(d.Resolve, d.Reject, d.CancelOperation);
255 255 if (success != null)
256 256 d.CancellationRequested(that.Cancel);
@@ -267,7 +267,7 namespace Implab {
267 267
268 268 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
269 269 Safe.ArgumentNotNull(that, "that");
270 var d = new FuncChainTask<T,T2>(success, error, cancel);
270 var d = new FuncChainTask<T,T2>(success, error, cancel, false);
271 271 that.On(d.Resolve, d.Reject, d.CancelOperation);
272 272 if (success != null)
273 273 d.CancellationRequested(that.Cancel);
General Comments 0
You need to be logged in to leave comments. Login now