##// END OF EJS Templates
RC: cancellation support for promises + tests
cin -
r145:706fccb85524 v2
parent child
Show More
@@ -0,0 +1,144
1 using System;
2 using Implab.Parallels;
3
4 #if MONO
5
6 using NUnit.Framework;
7 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
8 using TestMethodAttribute = NUnit.Framework.TestAttribute;
9
10 #else
11
12 using Microsoft.VisualStudio.TestTools.UnitTesting;
13
14 #endif
15
16 namespace Implab.Test {
17 [TestClass]
18 public class CancelationTests {
19
20 [TestMethod]
21 public void PromiseCancelTest() {
22 var p = new Promise();
23 bool requested = false;
24 var reason = new Exception("Test");
25
26 // request cancelation
27 p.Cancel(reason);
28
29 Assert.IsTrue(p.IsCancellationRequested);
30 Assert.AreSame(reason, p.CancellationReason);
31 Assert.IsFalse(p.IsCancelled);
32
33 p.CancellationRequested(r => {
34 Assert.AreSame(reason, r);
35 requested = true;
36 });
37
38 Assert.IsTrue(requested);
39
40 // cancel the promise
41 Assert.IsTrue(p.CancelOperationIfRequested());
42 Assert.IsTrue(p.IsCancelled);
43 Assert.AreSame(reason, p.Error);
44 }
45
46 [TestMethod]
47 public void CancelActionBeforeStartTask() {
48 bool run = false;
49 var task = new ActionTask(() => {
50 run = true;
51 }, null, null);
52
53 // request cancelation
54 task.Cancel();
55 Assert.IsTrue(task.IsCancelled);
56 task.Resolve();
57 Assert.IsFalse(run);
58 }
59
60 [TestMethod]
61 public void CancelActionAfterTaskStarted() {
62 var finish = new Signal();
63 var started = new Signal();
64
65 var task = new ActionTask(() => {
66 started.Set();
67 finish.Wait();
68 }, null, null);
69
70 AsyncPool.RunThread(() => {
71 task.Resolve();
72 });
73
74 started.Wait(1000);
75
76 task.Cancel();
77 Assert.IsTrue(task.IsCancellationRequested);
78 Assert.IsFalse(task.IsCancelled);
79 Assert.IsFalse(task.IsResolved);
80
81 finish.Set();
82 task.Join(1000);
83
84 }
85
86 [TestMethod]
87 public void CancelTaskChainFromBottom() {
88 var check1 = new Signal();
89 var requested = false;
90 var p1 = AsyncPool.RunThread(token => {
91 token.CancellationRequested(reason => requested = true);
92 check1.Wait();
93 token.CancelOperationIfRequested();
94 });
95
96 var p2 = p1.Then(() => {
97 });
98
99 Assert.IsFalse(p1.IsResolved);
100 Assert.IsFalse(p2.IsResolved);
101
102 p2.Cancel();
103
104 Assert.IsFalse(p2.IsCancelled);
105 Assert.IsFalse(p1.IsCancelled);
106 Assert.IsTrue(requested);
107
108 check1.Set();
109
110 try {
111 p2.Join(1000);
112 Assert.Fail("The chain isn't cancelled");
113 } catch(OperationCanceledException){
114 }
115
116 Assert.IsTrue(p1.IsCancelled);
117 Assert.IsTrue(p2.IsCancelled);
118 }
119
120
121
122 [TestMethod]
123 public void CancellableAsyncTask() {
124 var finish = new Signal();
125 var started = new Signal();
126
127 var p = AsyncPool.RunThread(token => {
128 token.CancellationRequested(r => finish.Set());
129 started.Set();
130 finish.Wait();
131 Assert.IsTrue(token.CancelOperationIfRequested());
132 });
133
134 started.Wait(1000);
135 Assert.IsFalse(p.IsResolved);
136 p.Cancel();
137 try {
138 p.Join(1000);
139 } catch (OperationCanceledException) {
140 }
141 }
142 }
143 }
144
@@ -0,0 +1,23
1 using System;
2
3 namespace Implab {
4 public class ActionChainTask : ActionChainTaskBase, IDeferred {
5 readonly Func<IPromise> m_task;
6
7 public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
8 m_task = task;
9 }
10
11 public void Resolve() {
12 if (m_task != null && LockCancelation()) {
13 try {
14 Observe(m_task());
15 } catch(Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20
21 }
22 }
23
@@ -0,0 +1,62
1 using System;
2 using System.Threading;
3
4 namespace Implab {
5 public class ActionChainTaskBase : AbstractPromise {
6 readonly Func<Exception, IPromise> m_error;
7 readonly Func<Exception, IPromise> m_cancel;
8
9 int m_cancelationLock;
10
11 protected ActionChainTaskBase( Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) {
12 m_error = error;
13 m_cancel = cancel;
14 }
15
16 public void Reject(Exception error) {
17 if (LockCancelation())
18 HandleErrorInternal(error);
19 }
20
21
22
23 public override void CancelOperation(Exception reason) {
24 if (m_cancel != null && LockCancelation()) {
25 try {
26 Observe(m_cancel(reason));
27 } catch(Exception err) {
28 HandleErrorInternal(err);
29 }
30 }
31
32 }
33
34 protected void HandleErrorInternal(Exception error) {
35 if (m_error != null) {
36 try {
37 Observe(m_error(error));
38 } catch(Exception err) {
39 SetError(err);
40 }
41 } else {
42 SetError(error);
43 }
44 }
45
46 protected void Observe(IPromise operation) {
47 if (operation == null)
48 throw new NullReferenceException("The task returned null promise");
49
50 // pass operation results to the current promise
51 operation.On(SetResult, SetError, SetCancelled);
52
53 // pass the cancelation request
54 CancellationRequested(operation.Cancel);
55 }
56
57 protected bool LockCancelation() {
58 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
59 }
60 }
61 }
62
@@ -0,0 +1,23
1 using System;
2
3 namespace Implab {
4 public class ActionChainTask<T> : ActionChainTaskBase, IDeferred<T> {
5 readonly Func<T, IPromise> m_task;
6
7 public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
8 m_task = task;
9 }
10
11 public void Resolve(T value) {
12 if (m_task != null && LockCancelation()) {
13 try {
14 Observe(m_task(value));
15 } catch(Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20
21 }
22 }
23
@@ -0,0 +1,22
1 using System;
2
3 namespace Implab {
4 public class ActionTask : ActionTaskBase, IDeferred {
5 readonly Action m_task;
6 public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
7 m_task = task;
8 }
9
10 public void Resolve() {
11 if (m_task != null && LockCancelation()) {
12 try {
13 m_task();
14 SetResult();
15 } catch(Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20 }
21 }
22
@@ -0,0 +1,55
1 using System;
2 using System.Threading;
3
4 namespace Implab {
5 public class ActionTaskBase : AbstractPromise {
6 readonly Action<Exception> m_cancel;
7 readonly Action<Exception> m_error;
8
9 int m_cancelationLock;
10
11 protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel) {
12 m_error = error;
13 m_cancel = cancel;
14 }
15
16 public void Reject(Exception error) {
17 Safe.ArgumentNotNull(error, "error");
18 if (LockCancelation())
19 HandleErrorInternal(error);
20 }
21
22 protected void HandleErrorInternal(Exception error) {
23 if (m_error != null) {
24 try {
25 m_error(error);
26 SetResult();
27 } catch(Exception err) {
28 SetError(err);
29 }
30 } else {
31 SetError(error);
32 }
33 }
34
35 public override void CancelOperation(Exception reason) {
36 if (LockCancelation()) {
37 if (m_cancel != null) {
38 try {
39 m_cancel(reason);
40 SetResult();
41 } catch (Exception err) {
42 HandleErrorInternal(err);
43 }
44 } else {
45 SetCancelled(reason);
46 }
47 }
48 }
49
50 protected bool LockCancelation() {
51 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
52 }
53 }
54 }
55
@@ -0,0 +1,22
1 using System;
2
3 namespace Implab {
4 public class ActionTask<T> : ActionTaskBase, IDeferred<T> {
5 readonly Action<T> m_task;
6 public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
7 m_task = task;
8 }
9
10 public void Resolve(T value) {
11 if (m_task != null && LockCancelation()) {
12 try {
13 m_task(value);
14 SetResult();
15 } catch(Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20 }
21 }
22
@@ -0,0 +1,21
1 using System;
2
3 namespace Implab {
4 public class FuncChainTask<TResult> : FuncChainTaskBase<TResult>, IDeferred {
5 readonly Func<IPromise<TResult>> m_task;
6
7 public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
8 m_task = task;
9 }
10
11 public void Resolve() {
12 if (m_task != null && LockCancelation()) {
13 try {
14 Observe(m_task());
15 } catch (Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20 }
21 } No newline at end of file
@@ -0,0 +1,60
1 using System;
2 using System.Threading;
3
4 namespace Implab {
5 public class FuncChainTaskBase<TResult> : AbstractPromise<TResult> {
6 readonly Func<Exception, IPromise<TResult>> m_error;
7 readonly Func<Exception, IPromise<TResult>> m_cancel;
8
9 int m_cancelationLock;
10
11 protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) {
12 m_error = error;
13 m_cancel = cancel;
14 }
15
16 public void Reject(Exception error) {
17 if (LockCancelation())
18 HandleErrorInternal(error);
19 }
20
21 public override void CancelOperation(Exception reason) {
22 if (m_cancel != null && LockCancelation()) {
23 try {
24 Observe(m_cancel(reason));
25 } catch(Exception err) {
26 HandleErrorInternal(err);
27 }
28 }
29
30 }
31
32 protected void HandleErrorInternal(Exception error) {
33 if (m_error != null) {
34 try {
35 Observe(m_error(error));
36 } catch(Exception err) {
37 SetError(err);
38 }
39 } else {
40 SetError(error);
41 }
42 }
43
44 protected void Observe(IPromise<TResult> operation) {
45 if (operation == null)
46 throw new NullReferenceException("The task returned null promise");
47
48 // pass operation results to the current promise
49 operation.On(SetResult, SetError, SetCancelled);
50
51 // pass the cancelation request
52 CancellationRequested(operation.Cancel);
53 }
54
55 protected bool LockCancelation() {
56 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
57 }
58 }
59 }
60
@@ -0,0 +1,21
1 using System;
2
3 namespace Implab {
4 public class FuncChainTask<TArg,TResult> : FuncChainTaskBase<TResult>, IDeferred<TArg> {
5 readonly Func<TArg, IPromise<TResult>> m_task;
6
7 public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
8 m_task = task;
9 }
10
11 public void Resolve(TArg value) {
12 if (m_task != null && LockCancelation()) {
13 try {
14 Observe(m_task(value));
15 } catch (Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20 }
21 } No newline at end of file
@@ -0,0 +1,36
1 using System;
2
3 namespace Implab {
4 public interface ICancellationToken {
5 /// <summary>
6 /// Indicates wherther the cancellation was requested.
7 /// </summary>
8 bool IsCancellationRequested { get ; }
9
10 /// <summary>
11 /// The reason why the operation should be cancelled.
12 /// </summary>
13 Exception CancellationReason { get ; }
14
15 /// <summary>
16 /// Accepts if requested.
17 /// </summary>
18 /// <returns><c>true</c>, if if requested was accepted, <c>false</c> otherwise.</returns>
19 bool CancelOperationIfRequested();
20
21 /// <summary>
22 /// Sets the token to cancelled state.
23 /// </summary>
24 /// <param name="reason">The reason why the operation was cancelled.</param>
25 void CancelOperation(Exception reason);
26
27 /// <summary>
28 /// Adds the listener for the cancellation request, is the cancellation was requested the <paramref name="handler"/>
29 /// is executed immediatelly.
30 /// </summary>
31 /// <param name="handler">The handler which will be executed if the cancel occurs.</param>
32 void CancellationRequested(Action<Exception> handler);
33
34 }
35 }
36
@@ -0,0 +1,111
1 using System;
2
3 namespace Implab {
4 public class SuccessPromise : IPromise {
5 #region IPromise implementation
6
7 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
8 if (success != null) {
9 try {
10 success();
11 } catch(Exception err) {
12 if (error != null) {
13 try {
14 error(err);
15 // Analysis disable once EmptyGeneralCatchClause
16 } catch {
17 }
18 }
19 }
20 }
21 return this;
22 }
23
24 public IPromise On(Action success, Action<Exception> error) {
25 if (success != null) {
26 try {
27 success();
28 } catch(Exception err) {
29 if (error != null) {
30 try {
31 error(err);
32 // Analysis disable once EmptyGeneralCatchClause
33 } catch {
34 }
35 }
36 }
37 }
38 return this;
39 }
40
41 public IPromise On(Action success) {
42 if (success != null) {
43 try {
44 success();
45 // Analysis disable once EmptyGeneralCatchClause
46 } catch {
47 }
48 }
49 return this;
50 }
51
52 public IPromise On(Action handler, PromiseEventType events) {
53 if (handler != null && events.HasFlag(PromiseEventType.Success)) {
54 try {
55 handler();
56 // Analysis disable once EmptyGeneralCatchClause
57 } catch {
58 }
59 }
60 return this;
61 }
62
63 public IPromise<T> Cast<T>() {
64 throw new InvalidCastException();
65 }
66
67 public void Join() {
68 }
69
70 public void Join(int timeout) {
71 }
72
73 public Type PromiseType {
74 get {
75 return typeof(void);
76 }
77 }
78
79 public bool IsResolved {
80 get {
81 return true;
82 }
83 }
84
85 public bool IsCancelled {
86 get {
87 return false;
88 }
89 }
90
91 public Exception Error {
92 get {
93 return null;
94 }
95 }
96
97 #endregion
98
99 #region ICancellable implementation
100
101 public void Cancel() {
102 }
103
104 public void Cancel(Exception reason) {
105 }
106
107 #endregion
108
109 }
110 }
111
@@ -0,0 +1,177
1 using System;
2
3 namespace Implab {
4 public class SuccessPromise<T> : IPromise<T> {
5 readonly T m_value;
6
7 public SuccessPromise(T value){
8 m_value = value;
9 }
10
11 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
12 if (success != null) {
13 try {
14 success(m_value);
15 } catch(Exception err) {
16 if (error != null) {
17 try {
18 error(err);
19 // Analysis disable once EmptyGeneralCatchClause
20 } catch {
21 }
22 }
23 }
24 }
25 return this;
26 }
27
28 public IPromise<T> On(Action<T> success, Action<Exception> error) {
29 if (success != null) {
30 try {
31 success(m_value);
32 } catch(Exception err) {
33 if (error != null) {
34 try {
35 error(err);
36 // Analysis disable once EmptyGeneralCatchClause
37 } catch {
38 }
39 }
40 }
41 }
42 return this;
43 }
44
45 public IPromise<T> On(Action<T> success) {
46 if (success != null) {
47 try {
48 success(m_value);
49 // Analysis disable once EmptyGeneralCatchClause
50 } catch {
51 }
52 }
53 return this;
54 }
55
56 public T Join() {
57 return m_value;
58 }
59
60 public T Join(int timeout) {
61 return m_value;
62 }
63
64 public IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel) {
65 if (success != null) {
66 try {
67 success();
68 } catch(Exception err) {
69 if (error != null) {
70 try {
71 error(err);
72 // Analysis disable once EmptyGeneralCatchClause
73 } catch {
74 }
75 }
76 }
77 }
78 return this;
79 }
80
81 public IPromise<T> On(Action success, Action<Exception> error) {
82 if (success != null) {
83 try {
84 success();
85 } catch(Exception err) {
86 if (error != null) {
87 try {
88 error(err);
89 // Analysis disable once EmptyGeneralCatchClause
90 } catch {
91 }
92 }
93 }
94 }
95 return this;
96 }
97
98 public IPromise<T> On(Action success) {
99 if (success != null) {
100 try {
101 success();
102 // Analysis disable once EmptyGeneralCatchClause
103 } catch {
104 }
105 }
106 return this;
107 }
108
109 public IPromise<T> On(Action handler, PromiseEventType events) {
110 if (handler != null && events.HasFlag(PromiseEventType.Success)) {
111 try {
112 handler();
113 // Analysis disable once EmptyGeneralCatchClause
114 } catch {
115 }
116 }
117 return this;
118 }
119
120 IPromise IPromise.On(Action success, Action<Exception> error, Action<Exception> cancel) {
121 return On(success, error, cancel);
122 }
123
124 IPromise IPromise.On(Action success, Action<Exception> error) {
125 return On(success, error);
126 }
127
128 IPromise IPromise.On(Action success) {
129 return On(success);
130 }
131
132 IPromise IPromise.On(Action handler, PromiseEventType events) {
133 return On(handler, events);
134 }
135
136 public IPromise<T2> Cast<T2>() {
137 return new SuccessPromise<T2>((T2)(object)m_value);
138 }
139
140 void IPromise.Join() {
141 }
142
143 void IPromise.Join(int timeout) {
144 }
145
146 public Type PromiseType {
147 get {
148 return typeof(T);
149 }
150 }
151
152 public bool IsResolved {
153 get {
154 return true;
155 }
156 }
157
158 public bool IsCancelled {
159 get {
160 return false;
161 }
162 }
163
164 public Exception Error {
165 get {
166 return null;
167 }
168 }
169
170 public void Cancel() {
171 }
172
173 public void Cancel(Exception reason) {
174 }
175 }
176 }
177
@@ -12,23 +12,23 namespace Implab.Fx {
12 12 m_target = target;
13 13 }
14 14
15 protected override void SignalSuccess(IDeferred<T> handler) {
15 protected override void SignalSuccess(Promise<T>.HandlerDescriptor handler) {
16 16 if (m_target.InvokeRequired)
17 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalSuccess), handler);
17 m_target.BeginInvoke(new Action<Promise<T>.HandlerDescriptor>(base.SignalSuccess), handler);
18 18 else
19 19 base.SignalSuccess(handler);
20 20 }
21 21
22 protected override void SignalCancelled(IDeferred<T> handler, Exception reason) {
22 protected override void SignalCancelled(Promise<T>.HandlerDescriptor handler, Exception reason) {
23 23 if (m_target.InvokeRequired)
24 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalCancelled), handler, reason);
24 m_target.BeginInvoke(new Action<Promise<T>.HandlerDescriptor,Exception>(base.SignalCancelled), handler, reason);
25 25 else
26 26 base.SignalCancelled(handler, reason);
27 27 }
28 28
29 protected override void SignalError(IDeferred<T> handler, Exception error) {
29 protected override void SignalError(Promise<T>.HandlerDescriptor handler, Exception error) {
30 30 if (m_target.InvokeRequired)
31 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalError), handler, error);
31 m_target.BeginInvoke(new Action<Promise<T>.HandlerDescriptor,Exception>(base.SignalError), handler, error);
32 32 else
33 33 base.SignalError(handler, error);
34 34 }
@@ -7,7 +7,7 using Implab.Parallels;
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
10 using TestMethodAttribute = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
@@ -51,7 +51,7 namespace Implab.Test {
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 p.Cancel();
54 p.CancelOperation(null);
55 55
56 56 var p2 = p.Then(x => x, null, reason => {
57 57 throw new ApplicationException("CANCELLED");
@@ -69,10 +69,10 namespace Implab.Test {
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 p.Cancel();
72 p.CancelOperation(null);
73 73
74 74 var p2 = p
75 .Then<bool>(x => x, null, reason => {
75 .Then(x => x, null, reason => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Then(x => x, e => true);
@@ -56,6 +56,7
56 56 <Compile Include="AsyncTests.cs" />
57 57 <Compile Include="PromiseHelper.cs" />
58 58 <Compile Include="Properties\AssemblyInfo.cs" />
59 <Compile Include="CancelationTests.cs" />
59 60 </ItemGroup>
60 61 <ItemGroup>
61 62 <ProjectReference Include="..\Implab\Implab.csproj">
@@ -4,7 +4,7 using System.Threading;
4 4 using System.Reflection;
5 5
6 6 namespace Implab {
7 public abstract class AbstractEvent<THandler> : ICancelationToken, ICancellable {
7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
8 8
9 9 const int UNRESOLVED_SATE = 0;
10 10 const int TRANSITIONAL_STATE = 1;
@@ -280,31 +280,34 namespace Implab {
280 280 }
281 281 }
282 282
283 public bool AcceptIfRequested() {
284 if (IsCancelRequested)
285 CancelOperation(CancelReason);
283 public bool CancelOperationIfRequested() {
284 if (IsCancellationRequested) {
285 CancelOperation(CancellationReason);
286 return true;
287 }
288 return false;
286 289 }
287 290
288 291 public virtual void CancelOperation(Exception reason) {
289 292 SetCancelled(reason);
290 293 }
291 294
292 public void CancelationRequested(Action<Exception> handler) {
295 public void CancellationRequested(Action<Exception> handler) {
293 296 Safe.ArgumentNotNull(handler, "handler");
294 if (IsCancelRequested)
295 handler(CancelReason);
297 if (IsCancellationRequested)
298 handler(CancellationReason);
296 299
297 300 if (m_cancelationHandlers == null)
298 301 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
299 302
300 303 m_cancelationHandlers.Enqueue(handler);
301 304
302 if (IsCancelRequested && m_cancelationHandlers.TryDequeue(out handler))
305 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
303 306 // TryDeque implies MemoryBarrier()
304 307 handler(m_cancelationReason);
305 308 }
306 309
307 public bool IsCancelRequested {
310 public bool IsCancellationRequested {
308 311 get {
309 312 do {
310 313 if (m_cancelRequest == CANCEL_NOT_REQUESTED)
@@ -316,7 +319,7 namespace Implab {
316 319 }
317 320 }
318 321
319 public Exception CancelReason {
322 public Exception CancellationReason {
320 323 get {
321 324 do {
322 325 Thread.MemoryBarrier();
@@ -333,7 +336,7 namespace Implab {
333 336 }
334 337
335 338 public void Cancel(Exception reason) {
336 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING)) {
339 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
337 340 m_cancelationReason = reason;
338 341 m_cancelRequest = CANCEL_REQUESTED;
339 342 if (m_cancelationHandlers != null) {
@@ -18,11 +18,13 namespace Implab {
18 18
19 19 public HandlerDescriptor(Action handler, PromiseEventType mask) {
20 20 m_handler = handler;
21 m_error = null;
22 m_cancel = null;
21 23 m_mask = mask;
22 24 }
23 25
24 26 public void SignalSuccess() {
25 if (m_mask & PromiseEventType.Success && m_handler != null) {
27 if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
26 28 try {
27 29 m_handler();
28 30 } catch (Exception err) {
@@ -40,7 +42,7 namespace Implab {
40 42 // Analysis disable once EmptyGeneralCatchClause
41 43 } catch {
42 44 }
43 } else if (m_mask & PromiseEventType.Error && m_handler != null) {
45 } else if ((m_mask & PromiseEventType.Error ) != 0 && m_handler != null) {
44 46 try {
45 47 m_handler();
46 48 // Analysis disable once EmptyGeneralCatchClause
@@ -56,7 +58,7 namespace Implab {
56 58 } catch (Exception err) {
57 59 SignalError(err);
58 60 }
59 } else if (m_mask & PromiseEventType.Cancelled && m_handler != null) {
61 } else if ( (m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
60 62 try {
61 63 m_handler();
62 64 // Analysis disable once EmptyGeneralCatchClause
@@ -84,11 +86,11 namespace Implab {
84 86 protected override Signal GetResolveSignal() {
85 87 var signal = new Signal();
86 88 On(signal.Set, PromiseEventType.All);
89 return signal;
87 90 }
88 91
89 92 #endregion
90 93
91
92 94 public Type PromiseType {
93 95 get {
94 96 return typeof(void);
@@ -14,10 +14,14 namespace Implab {
14 14 m_success = success;
15 15 m_error = error;
16 16 m_cancel = cancel;
17
18 m_handler = null;
19 m_mask = 0;
17 20 }
18 21
19 22 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
20 23 m_handler = success;
24 m_success = null;
21 25 m_error = error;
22 26 m_cancel = cancel;
23 27 m_mask = PromiseEventType.Success;
@@ -26,6 +30,9 namespace Implab {
26 30 public HandlerDescriptor(Action handler, PromiseEventType mask) {
27 31 m_handler = handler;
28 32 m_mask = mask;
33 m_success = null;
34 m_error = null;
35 m_cancel = null;
29 36 }
30 37
31 38 public void SignalSuccess(T result) {
@@ -35,7 +42,7 namespace Implab {
35 42 } catch(Exception err) {
36 43 SignalError(err);
37 44 }
38 } else if (m_mask & PromiseEventType.Success && m_handler != null) {
45 } else if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
39 46 try {
40 47 m_handler();
41 48 } catch(Exception err) {
@@ -53,7 +60,7 namespace Implab {
53 60 // Analysis disable once EmptyGeneralCatchClause
54 61 } catch {
55 62 }
56 } else if (m_mask & PromiseEventType.Error && m_handler != null) {
63 } else if ((m_mask & PromiseEventType.Error) != 0 && m_handler != null) {
57 64 try {
58 65 m_handler();
59 66 // Analysis disable once EmptyGeneralCatchClause
@@ -69,7 +76,7 namespace Implab {
69 76 } catch (Exception err) {
70 77 SignalError(err);
71 78 }
72 } else if (m_mask & PromiseEventType.Cancelled && m_handler != null) {
79 } else if ((m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
73 80 try {
74 81 m_handler();
75 82 // Analysis disable once EmptyGeneralCatchClause
@@ -79,23 +86,28 namespace Implab {
79 86 }
80 87 }
81 88
82
83
84 89 public Type PromiseType {
85 90 get {
86 91 return typeof(T);
87 92 }
88 93 }
89 94
90 public new T Join() {
95 public T Join() {
91 96 WaitResult(-1);
92 97 return m_result;
93 98 }
94 public new T Join(int timeout) {
99 public T Join(int timeout) {
95 100 WaitResult(timeout);
96 101 return m_result;
97 102 }
98 103
104 void IPromise.Join() {
105 WaitResult(-1);
106 }
107 void IPromise.Join(int timeout) {
108 WaitResult(timeout);
109 }
110
99 111 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
100 112 AddHandler(new HandlerDescriptor(success, error, cancel));
101 113 return this;
@@ -146,6 +158,11 namespace Implab {
146 158 return this;
147 159 }
148 160
161 IPromise IPromise.On(Action handler, PromiseEventType events) {
162 AddHandler(new HandlerDescriptor(handler, events));
163 return this;
164 }
165
149 166 public IPromise<T2> Cast<T2>() {
150 167 return (IPromise<T2>)this;
151 168 }
@@ -4,7 +4,7 namespace Implab {
4 4 /// <summary>
5 5 /// Deferred result, usually used by asynchronous services as the service part of the promise.
6 6 /// </summary>
7 public interface IDeferred : ICancelationToken {
7 public interface IDeferred : ICancellationToken {
8 8
9 9 void Resolve();
10 10
@@ -1,7 +1,7
1 1 using System;
2 2
3 3 namespace Implab {
4 public interface IDeferred<T> : ICancelationToken {
4 public interface IDeferred<in T> : ICancellationToken {
5 5 void Resolve(T value);
6 6
7 7 void Reject(Exception error);
@@ -1,7 +1,4
1 1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 2
6 3 namespace Implab {
7 4 public interface ITaskController: IProgressHandler, ICancellable {
@@ -157,14 +157,24
157 157 <Compile Include="Diagnostics\ILogWriter.cs" />
158 158 <Compile Include="Diagnostics\ListenerBase.cs" />
159 159 <Compile Include="Parallels\BlockingQueue.cs" />
160 <Compile Include="ICancelationToken.cs" />
161 160 <Compile Include="AbstractEvent.cs" />
162 161 <Compile Include="AbstractPromise.cs" />
163 162 <Compile Include="AbstractPromiseT.cs" />
164 163 <Compile Include="FuncTask.cs" />
165 164 <Compile Include="FuncTaskBase.cs" />
166 165 <Compile Include="FuncTaskT.cs" />
167 <Compile Include="ChainTask.cs" />
166 <Compile Include="ActionChainTaskBase.cs" />
167 <Compile Include="ActionChainTask.cs" />
168 <Compile Include="ActionChainTaskT.cs" />
169 <Compile Include="FuncChainTaskBase.cs" />
170 <Compile Include="FuncChainTask.cs" />
171 <Compile Include="FuncChainTaskT.cs" />
172 <Compile Include="ActionTaskBase.cs" />
173 <Compile Include="ActionTask.cs" />
174 <Compile Include="ActionTaskT.cs" />
175 <Compile Include="ICancellationToken.cs" />
176 <Compile Include="SuccessPromise.cs" />
177 <Compile Include="SuccessPromiseT.cs" />
168 178 </ItemGroup>
169 179 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
170 180 <ItemGroup />
@@ -152,7 +152,7 namespace Implab.Parallels {
152 152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
153 153
154 154 if (source.Length == 0)
155 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
155 return Promise<TDst[]>.FromResult(new TDst[0]);
156 156
157 157 var promise = new Promise<TDst[]>();
158 158 var res = new TDst[source.Length];
@@ -31,6 +31,24 namespace Implab.Parallels {
31 31 return p;
32 32 }
33 33
34 public static IPromise<T> Invoke<T>(Func<ICancellationToken, T> func) {
35 var p = new Promise<T>();
36 var caller = TraceContext.Instance.CurrentOperation;
37
38 ThreadPool.QueueUserWorkItem(param => {
39 TraceContext.Instance.EnterLogicalOperation(caller,false);
40 try {
41 p.Resolve(func(p));
42 } catch(Exception e) {
43 p.Reject(e);
44 } finally {
45 TraceContext.Instance.Leave();
46 }
47 });
48
49 return p;
50 }
51
34 52 public static IPromise<T> RunThread<T>(Func<T> func) {
35 53 var p = new Promise<T>();
36 54
@@ -52,6 +70,27 namespace Implab.Parallels {
52 70 return p;
53 71 }
54 72
73 public static IPromise<T> RunThread<T>(Func<ICancellationToken, T> func) {
74 var p = new Promise<T>();
75
76 var caller = TraceContext.Instance.CurrentOperation;
77
78 var worker = new Thread(() => {
79 TraceContext.Instance.EnterLogicalOperation(caller,false);
80 try {
81 p.Resolve(func(p));
82 } catch (Exception e) {
83 p.Reject(e);
84 } finally {
85 TraceContext.Instance.Leave();
86 }
87 });
88 worker.IsBackground = true;
89 worker.Start();
90
91 return p;
92 }
93
55 94
56 95 public static IPromise RunThread(Action func) {
57 96 var p = new Promise();
@@ -75,12 +114,42 namespace Implab.Parallels {
75 114 return p;
76 115 }
77 116
117 public static IPromise RunThread(Action<ICancellationToken> func) {
118 var p = new Promise();
119
120 var caller = TraceContext.Instance.CurrentOperation;
121
122 var worker = new Thread(() => {
123 TraceContext.Instance.EnterLogicalOperation(caller,false);
124 try {
125 func(p);
126 p.Resolve();
127 } catch (Exception e) {
128 p.Reject(e);
129 } finally {
130 TraceContext.Instance.Leave();
131 }
132 });
133 worker.IsBackground = true;
134 worker.Start();
135
136 return p;
137 }
138
78 139 public static IPromise[] RunThread(params Action[] func) {
79 140 return func.Select(f => RunThread(f)).ToArray();
80 141 }
81 142
143 public static IPromise[] RunThread(params Action<ICancellationToken>[] func) {
144 return func.Select(f => RunThread(f)).ToArray();
145 }
146
82 147 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
83 148 return func.Select(f => RunThread(f)).ToArray();
84 149 }
150
151 public static IPromise<T>[] RunThread<T>(params Func<ICancellationToken, T>[] func) {
152 return func.Select(f => RunThread(f)).ToArray();
153 }
85 154 }
86 155 }
@@ -174,6 +174,116 namespace Implab {
174 174
175 175 return medium;
176 176 }
177
178 public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
179 Safe.ArgumentNotNull(that, "that");
180
181 var d = new ActionTask(success, error, cancel);
182 that.On(d.Resolve, d.Reject, d.CancelOperation);
183 if (success != null)
184 d.CancellationRequested(that.Cancel);
185 return d;
186 }
187
188 public static IPromise Then(this IPromise that, Action success, Action<Exception> error) {
189 return Then(that, success, error, null);
190 }
191
192 public static IPromise Then(this IPromise that, Action success) {
193 return Then(that, success, null, null);
194 }
195
196 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
197 Safe.ArgumentNotNull(that, "that");
198
199 var d = new FuncTask<T>(success, error, cancel);
200 that.On(d.Resolve, d.Reject, d.CancelOperation);
201 if (success != null)
202 d.CancellationRequested(that.Cancel);
203 return d;
204 }
205
206 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error) {
207 return Then(that, success, error, null);
208 }
209
210 public static IPromise<T> Then<T>(this IPromise that, Func<T> success) {
211 return Then(that, success, null, null);
212 }
213
214 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
215 Safe.ArgumentNotNull(that, "that");
216 var d = new FuncTask<T,T2>(success, error, cancel);
217 that.On(d.Resolve, d.Reject, d.CancelOperation);
218 if (success != null)
219 d.CancellationRequested(that.Cancel);
220 return d;
221 }
222
223 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error) {
224 return Then(that, success, error, null);
225 }
226
227 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success) {
228 return Then(that, success, null, null);
229 }
230
231 #region chain traits
232 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error, Func<Exception,IPromise> cancel) {
233 Safe.ArgumentNotNull(that, "that");
234
235 var d = new ActionChainTask(success, error, cancel);
236 that.On(d.Resolve, d.Reject, d.CancelOperation);
237 if (success != null)
238 d.CancellationRequested(that.Cancel);
239 return d;
240 }
241
242 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error) {
243 return Chain(that, success, error, null);
244 }
245
246 public static IPromise Chain(this IPromise that, Func<IPromise> success) {
247 return Chain(that, success, null, null);
248 }
249
250 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
251 Safe.ArgumentNotNull(that, "that");
252
253 var d = new FuncChainTask<T>(success, error, cancel);
254 that.On(d.Resolve, d.Reject, d.CancelOperation);
255 if (success != null)
256 d.CancellationRequested(that.Cancel);
257 return d;
258 }
259
260 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error) {
261 return Chain(that, success, error, null);
262 }
263
264 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success) {
265 return Chain(that, success, null, null);
266 }
267
268 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
269 Safe.ArgumentNotNull(that, "that");
270 var d = new FuncChainTask<T,T2>(success, error, cancel);
271 that.On(d.Resolve, d.Reject, d.CancelOperation);
272 if (success != null)
273 d.CancellationRequested(that.Cancel);
274 return d;
275 }
276
277 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error) {
278 return Chain(that, success, error, null);
279 }
280
281 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success) {
282 return Chain(that, success, null, null);
283 }
284
285 #endregion
286
177 287
178 288 #if NET_4_5
179 289
@@ -109,9 +109,9 namespace Implab
109 109 ArgumentNotNull(action, "action");
110 110
111 111 try {
112 return action() ?? Promise<T>.ExceptionToPromise(new Exception("The action returned null"));
112 return action() ?? Promise<T>.FromException(new Exception("The action returned null"));
113 113 } catch (Exception err) {
114 return Promise<T>.ExceptionToPromise(err);
114 return Promise<T>.FromException(err);
115 115 }
116 116 }
117 117 }
@@ -10,15 +10,15 namespace Implab {
10 10 m_context = context;
11 11 }
12 12
13 protected override void SignalSuccess(IDeferred<T> handler) {
13 protected override void SignalSuccess(Promise<T>.HandlerDescriptor handler) {
14 14 m_context.Post(x => base.SignalSuccess(handler), null);
15 15 }
16 16
17 protected override void SignalError(IDeferred<T> handler, Exception error) {
17 protected override void SignalError(Promise<T>.HandlerDescriptor handler, Exception error) {
18 18 m_context.Post(x => base.SignalError(handler, error), null);
19 19 }
20 20
21 protected override void SignalCancelled(IDeferred<T> handler, Exception reason) {
21 protected override void SignalCancelled(Promise<T>.HandlerDescriptor handler, Exception reason) {
22 22 m_context.Post(x => base.SignalCancelled(handler, reason), null);
23 23 }
24 24 }
@@ -8,86 +8,33 using System.Threading;
8 8
9 9 namespace MonoPlay {
10 10 class MainClass {
11
12
11 13 public static void Main(string[] args) {
12 14 if (args == null)
13 15 throw new ArgumentNullException("args");
14 16
15 17 var t1 = Environment.TickCount;
16 18
17 const int reads = 100000;
18 const int writes = 1000;
19 const int readThreads = 8;
20 const int writeThreads = 0;
21
22 var l = new SharedLock();
23 var st = new HashSet<int>();
24
25 Action reader1 = () => {
26 for (int i =0; i < reads; i++) {
27 try {
28 l.LockShared();
29 st.Contains(i % 1000);
30 Thread.Sleep(0);
31 } finally {
32 l.Release();
33 }
34 }
35 };
36
37 Action reader2 = () => {
38 for(var i = 0; i < reads; i++)
39 lock(st) {
40 st.Contains(i % 1000);
41 Thread.Sleep(0);
42 }
43 };
44
45 Action writer1 = () => {
46 var rnd = new Random(Environment.TickCount);
47 for (int i = 0; i < writes; i++) {
48 try {
49 l.LockExclusive();
50 st.Add(rnd.Next(1000));
51 //Thread.Sleep(1);
52 } finally {
53 l.Release();
54 }
55 }
56 };
57
58 Action writer2 = () => {
59 var rnd = new Random(Environment.TickCount);
60 for (int i = 0; i < writes; i++) {
61 lock (st) {
62 st.Add(rnd.Next(1000));
63 //Thread.Sleep(1);
64 }
65 }
66 };
67
68
69
70 var readers = new IPromise[readThreads];
71 for (int i = 0; i < readThreads; i++)
72 readers[i] = AsyncPool.RunThread(reader2);
73
74 var writers = new IPromise[writeThreads];
75 for (int i = 0; i < writeThreads; i++)
76 writers[i] = AsyncPool.RunThread(writer1);
77
78
79 new [] {
80 readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)),
81 writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1))
82 }.Bundle().Join();
83
84
19 for (int i = 0; i < 10000000; i++) {
20
21 var p = new Promise<int>();
22 p.On(HandleResult);
23 p.Resolve(i);
24 }
85 25
86 26 var t2 = Environment.TickCount;
87 27 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
88 28
89 29 }
90 30
31 static void HandleAction ()
32 {
33
34 }
91 35
36 static void HandleResult(int x) {
37
38 }
92 39 }
93 40 }
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now