##// END OF EJS Templates
RC: cancellation support for promises + tests
cin -
r145:706fccb85524 v2
parent child
Show More
@@ -0,0 +1,144
1 using System;
2 using Implab.Parallels;
3
4 #if MONO
5
6 using NUnit.Framework;
7 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
8 using TestMethodAttribute = NUnit.Framework.TestAttribute;
9
10 #else
11
12 using Microsoft.VisualStudio.TestTools.UnitTesting;
13
14 #endif
15
16 namespace Implab.Test {
17 [TestClass]
18 public class CancelationTests {
19
20 [TestMethod]
21 public void PromiseCancelTest() {
22 var p = new Promise();
23 bool requested = false;
24 var reason = new Exception("Test");
25
26 // request cancelation
27 p.Cancel(reason);
28
29 Assert.IsTrue(p.IsCancellationRequested);
30 Assert.AreSame(reason, p.CancellationReason);
31 Assert.IsFalse(p.IsCancelled);
32
33 p.CancellationRequested(r => {
34 Assert.AreSame(reason, r);
35 requested = true;
36 });
37
38 Assert.IsTrue(requested);
39
40 // cancel the promise
41 Assert.IsTrue(p.CancelOperationIfRequested());
42 Assert.IsTrue(p.IsCancelled);
43 Assert.AreSame(reason, p.Error);
44 }
45
46 [TestMethod]
47 public void CancelActionBeforeStartTask() {
48 bool run = false;
49 var task = new ActionTask(() => {
50 run = true;
51 }, null, null);
52
53 // request cancelation
54 task.Cancel();
55 Assert.IsTrue(task.IsCancelled);
56 task.Resolve();
57 Assert.IsFalse(run);
58 }
59
60 [TestMethod]
61 public void CancelActionAfterTaskStarted() {
62 var finish = new Signal();
63 var started = new Signal();
64
65 var task = new ActionTask(() => {
66 started.Set();
67 finish.Wait();
68 }, null, null);
69
70 AsyncPool.RunThread(() => {
71 task.Resolve();
72 });
73
74 started.Wait(1000);
75
76 task.Cancel();
77 Assert.IsTrue(task.IsCancellationRequested);
78 Assert.IsFalse(task.IsCancelled);
79 Assert.IsFalse(task.IsResolved);
80
81 finish.Set();
82 task.Join(1000);
83
84 }
85
86 [TestMethod]
87 public void CancelTaskChainFromBottom() {
88 var check1 = new Signal();
89 var requested = false;
90 var p1 = AsyncPool.RunThread(token => {
91 token.CancellationRequested(reason => requested = true);
92 check1.Wait();
93 token.CancelOperationIfRequested();
94 });
95
96 var p2 = p1.Then(() => {
97 });
98
99 Assert.IsFalse(p1.IsResolved);
100 Assert.IsFalse(p2.IsResolved);
101
102 p2.Cancel();
103
104 Assert.IsFalse(p2.IsCancelled);
105 Assert.IsFalse(p1.IsCancelled);
106 Assert.IsTrue(requested);
107
108 check1.Set();
109
110 try {
111 p2.Join(1000);
112 Assert.Fail("The chain isn't cancelled");
113 } catch(OperationCanceledException){
114 }
115
116 Assert.IsTrue(p1.IsCancelled);
117 Assert.IsTrue(p2.IsCancelled);
118 }
119
120
121
122 [TestMethod]
123 public void CancellableAsyncTask() {
124 var finish = new Signal();
125 var started = new Signal();
126
127 var p = AsyncPool.RunThread(token => {
128 token.CancellationRequested(r => finish.Set());
129 started.Set();
130 finish.Wait();
131 Assert.IsTrue(token.CancelOperationIfRequested());
132 });
133
134 started.Wait(1000);
135 Assert.IsFalse(p.IsResolved);
136 p.Cancel();
137 try {
138 p.Join(1000);
139 } catch (OperationCanceledException) {
140 }
141 }
142 }
143 }
144
@@ -0,0 +1,23
1 using System;
2
3 namespace Implab {
4 public class ActionChainTask : ActionChainTaskBase, IDeferred {
5 readonly Func<IPromise> m_task;
6
7 public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
8 m_task = task;
9 }
10
11 public void Resolve() {
12 if (m_task != null && LockCancelation()) {
13 try {
14 Observe(m_task());
15 } catch(Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20
21 }
22 }
23
@@ -0,0 +1,62
1 using System;
2 using System.Threading;
3
4 namespace Implab {
5 public class ActionChainTaskBase : AbstractPromise {
6 readonly Func<Exception, IPromise> m_error;
7 readonly Func<Exception, IPromise> m_cancel;
8
9 int m_cancelationLock;
10
11 protected ActionChainTaskBase( Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) {
12 m_error = error;
13 m_cancel = cancel;
14 }
15
16 public void Reject(Exception error) {
17 if (LockCancelation())
18 HandleErrorInternal(error);
19 }
20
21
22
23 public override void CancelOperation(Exception reason) {
24 if (m_cancel != null && LockCancelation()) {
25 try {
26 Observe(m_cancel(reason));
27 } catch(Exception err) {
28 HandleErrorInternal(err);
29 }
30 }
31
32 }
33
34 protected void HandleErrorInternal(Exception error) {
35 if (m_error != null) {
36 try {
37 Observe(m_error(error));
38 } catch(Exception err) {
39 SetError(err);
40 }
41 } else {
42 SetError(error);
43 }
44 }
45
46 protected void Observe(IPromise operation) {
47 if (operation == null)
48 throw new NullReferenceException("The task returned null promise");
49
50 // pass operation results to the current promise
51 operation.On(SetResult, SetError, SetCancelled);
52
53 // pass the cancelation request
54 CancellationRequested(operation.Cancel);
55 }
56
57 protected bool LockCancelation() {
58 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
59 }
60 }
61 }
62
@@ -0,0 +1,23
1 using System;
2
3 namespace Implab {
4 public class ActionChainTask<T> : ActionChainTaskBase, IDeferred<T> {
5 readonly Func<T, IPromise> m_task;
6
7 public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
8 m_task = task;
9 }
10
11 public void Resolve(T value) {
12 if (m_task != null && LockCancelation()) {
13 try {
14 Observe(m_task(value));
15 } catch(Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20
21 }
22 }
23
@@ -0,0 +1,22
1 using System;
2
3 namespace Implab {
4 public class ActionTask : ActionTaskBase, IDeferred {
5 readonly Action m_task;
6 public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
7 m_task = task;
8 }
9
10 public void Resolve() {
11 if (m_task != null && LockCancelation()) {
12 try {
13 m_task();
14 SetResult();
15 } catch(Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20 }
21 }
22
@@ -0,0 +1,55
1 using System;
2 using System.Threading;
3
4 namespace Implab {
5 public class ActionTaskBase : AbstractPromise {
6 readonly Action<Exception> m_cancel;
7 readonly Action<Exception> m_error;
8
9 int m_cancelationLock;
10
11 protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel) {
12 m_error = error;
13 m_cancel = cancel;
14 }
15
16 public void Reject(Exception error) {
17 Safe.ArgumentNotNull(error, "error");
18 if (LockCancelation())
19 HandleErrorInternal(error);
20 }
21
22 protected void HandleErrorInternal(Exception error) {
23 if (m_error != null) {
24 try {
25 m_error(error);
26 SetResult();
27 } catch(Exception err) {
28 SetError(err);
29 }
30 } else {
31 SetError(error);
32 }
33 }
34
35 public override void CancelOperation(Exception reason) {
36 if (LockCancelation()) {
37 if (m_cancel != null) {
38 try {
39 m_cancel(reason);
40 SetResult();
41 } catch (Exception err) {
42 HandleErrorInternal(err);
43 }
44 } else {
45 SetCancelled(reason);
46 }
47 }
48 }
49
50 protected bool LockCancelation() {
51 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
52 }
53 }
54 }
55
@@ -0,0 +1,22
1 using System;
2
3 namespace Implab {
4 public class ActionTask<T> : ActionTaskBase, IDeferred<T> {
5 readonly Action<T> m_task;
6 public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
7 m_task = task;
8 }
9
10 public void Resolve(T value) {
11 if (m_task != null && LockCancelation()) {
12 try {
13 m_task(value);
14 SetResult();
15 } catch(Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20 }
21 }
22
@@ -0,0 +1,21
1 using System;
2
3 namespace Implab {
4 public class FuncChainTask<TResult> : FuncChainTaskBase<TResult>, IDeferred {
5 readonly Func<IPromise<TResult>> m_task;
6
7 public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
8 m_task = task;
9 }
10
11 public void Resolve() {
12 if (m_task != null && LockCancelation()) {
13 try {
14 Observe(m_task());
15 } catch (Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20 }
21 } No newline at end of file
@@ -0,0 +1,60
1 using System;
2 using System.Threading;
3
4 namespace Implab {
5 public class FuncChainTaskBase<TResult> : AbstractPromise<TResult> {
6 readonly Func<Exception, IPromise<TResult>> m_error;
7 readonly Func<Exception, IPromise<TResult>> m_cancel;
8
9 int m_cancelationLock;
10
11 protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) {
12 m_error = error;
13 m_cancel = cancel;
14 }
15
16 public void Reject(Exception error) {
17 if (LockCancelation())
18 HandleErrorInternal(error);
19 }
20
21 public override void CancelOperation(Exception reason) {
22 if (m_cancel != null && LockCancelation()) {
23 try {
24 Observe(m_cancel(reason));
25 } catch(Exception err) {
26 HandleErrorInternal(err);
27 }
28 }
29
30 }
31
32 protected void HandleErrorInternal(Exception error) {
33 if (m_error != null) {
34 try {
35 Observe(m_error(error));
36 } catch(Exception err) {
37 SetError(err);
38 }
39 } else {
40 SetError(error);
41 }
42 }
43
44 protected void Observe(IPromise<TResult> operation) {
45 if (operation == null)
46 throw new NullReferenceException("The task returned null promise");
47
48 // pass operation results to the current promise
49 operation.On(SetResult, SetError, SetCancelled);
50
51 // pass the cancelation request
52 CancellationRequested(operation.Cancel);
53 }
54
55 protected bool LockCancelation() {
56 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
57 }
58 }
59 }
60
@@ -0,0 +1,21
1 using System;
2
3 namespace Implab {
4 public class FuncChainTask<TArg,TResult> : FuncChainTaskBase<TResult>, IDeferred<TArg> {
5 readonly Func<TArg, IPromise<TResult>> m_task;
6
7 public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
8 m_task = task;
9 }
10
11 public void Resolve(TArg value) {
12 if (m_task != null && LockCancelation()) {
13 try {
14 Observe(m_task(value));
15 } catch (Exception err) {
16 HandleErrorInternal(err);
17 }
18 }
19 }
20 }
21 } No newline at end of file
@@ -0,0 +1,36
1 using System;
2
3 namespace Implab {
4 public interface ICancellationToken {
5 /// <summary>
6 /// Indicates wherther the cancellation was requested.
7 /// </summary>
8 bool IsCancellationRequested { get ; }
9
10 /// <summary>
11 /// The reason why the operation should be cancelled.
12 /// </summary>
13 Exception CancellationReason { get ; }
14
15 /// <summary>
16 /// Accepts if requested.
17 /// </summary>
18 /// <returns><c>true</c>, if if requested was accepted, <c>false</c> otherwise.</returns>
19 bool CancelOperationIfRequested();
20
21 /// <summary>
22 /// Sets the token to cancelled state.
23 /// </summary>
24 /// <param name="reason">The reason why the operation was cancelled.</param>
25 void CancelOperation(Exception reason);
26
27 /// <summary>
28 /// Adds the listener for the cancellation request, is the cancellation was requested the <paramref name="handler"/>
29 /// is executed immediatelly.
30 /// </summary>
31 /// <param name="handler">The handler which will be executed if the cancel occurs.</param>
32 void CancellationRequested(Action<Exception> handler);
33
34 }
35 }
36
@@ -0,0 +1,111
1 using System;
2
3 namespace Implab {
4 public class SuccessPromise : IPromise {
5 #region IPromise implementation
6
7 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
8 if (success != null) {
9 try {
10 success();
11 } catch(Exception err) {
12 if (error != null) {
13 try {
14 error(err);
15 // Analysis disable once EmptyGeneralCatchClause
16 } catch {
17 }
18 }
19 }
20 }
21 return this;
22 }
23
24 public IPromise On(Action success, Action<Exception> error) {
25 if (success != null) {
26 try {
27 success();
28 } catch(Exception err) {
29 if (error != null) {
30 try {
31 error(err);
32 // Analysis disable once EmptyGeneralCatchClause
33 } catch {
34 }
35 }
36 }
37 }
38 return this;
39 }
40
41 public IPromise On(Action success) {
42 if (success != null) {
43 try {
44 success();
45 // Analysis disable once EmptyGeneralCatchClause
46 } catch {
47 }
48 }
49 return this;
50 }
51
52 public IPromise On(Action handler, PromiseEventType events) {
53 if (handler != null && events.HasFlag(PromiseEventType.Success)) {
54 try {
55 handler();
56 // Analysis disable once EmptyGeneralCatchClause
57 } catch {
58 }
59 }
60 return this;
61 }
62
63 public IPromise<T> Cast<T>() {
64 throw new InvalidCastException();
65 }
66
67 public void Join() {
68 }
69
70 public void Join(int timeout) {
71 }
72
73 public Type PromiseType {
74 get {
75 return typeof(void);
76 }
77 }
78
79 public bool IsResolved {
80 get {
81 return true;
82 }
83 }
84
85 public bool IsCancelled {
86 get {
87 return false;
88 }
89 }
90
91 public Exception Error {
92 get {
93 return null;
94 }
95 }
96
97 #endregion
98
99 #region ICancellable implementation
100
101 public void Cancel() {
102 }
103
104 public void Cancel(Exception reason) {
105 }
106
107 #endregion
108
109 }
110 }
111
@@ -0,0 +1,177
1 using System;
2
3 namespace Implab {
4 public class SuccessPromise<T> : IPromise<T> {
5 readonly T m_value;
6
7 public SuccessPromise(T value){
8 m_value = value;
9 }
10
11 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
12 if (success != null) {
13 try {
14 success(m_value);
15 } catch(Exception err) {
16 if (error != null) {
17 try {
18 error(err);
19 // Analysis disable once EmptyGeneralCatchClause
20 } catch {
21 }
22 }
23 }
24 }
25 return this;
26 }
27
28 public IPromise<T> On(Action<T> success, Action<Exception> error) {
29 if (success != null) {
30 try {
31 success(m_value);
32 } catch(Exception err) {
33 if (error != null) {
34 try {
35 error(err);
36 // Analysis disable once EmptyGeneralCatchClause
37 } catch {
38 }
39 }
40 }
41 }
42 return this;
43 }
44
45 public IPromise<T> On(Action<T> success) {
46 if (success != null) {
47 try {
48 success(m_value);
49 // Analysis disable once EmptyGeneralCatchClause
50 } catch {
51 }
52 }
53 return this;
54 }
55
56 public T Join() {
57 return m_value;
58 }
59
60 public T Join(int timeout) {
61 return m_value;
62 }
63
64 public IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel) {
65 if (success != null) {
66 try {
67 success();
68 } catch(Exception err) {
69 if (error != null) {
70 try {
71 error(err);
72 // Analysis disable once EmptyGeneralCatchClause
73 } catch {
74 }
75 }
76 }
77 }
78 return this;
79 }
80
81 public IPromise<T> On(Action success, Action<Exception> error) {
82 if (success != null) {
83 try {
84 success();
85 } catch(Exception err) {
86 if (error != null) {
87 try {
88 error(err);
89 // Analysis disable once EmptyGeneralCatchClause
90 } catch {
91 }
92 }
93 }
94 }
95 return this;
96 }
97
98 public IPromise<T> On(Action success) {
99 if (success != null) {
100 try {
101 success();
102 // Analysis disable once EmptyGeneralCatchClause
103 } catch {
104 }
105 }
106 return this;
107 }
108
109 public IPromise<T> On(Action handler, PromiseEventType events) {
110 if (handler != null && events.HasFlag(PromiseEventType.Success)) {
111 try {
112 handler();
113 // Analysis disable once EmptyGeneralCatchClause
114 } catch {
115 }
116 }
117 return this;
118 }
119
120 IPromise IPromise.On(Action success, Action<Exception> error, Action<Exception> cancel) {
121 return On(success, error, cancel);
122 }
123
124 IPromise IPromise.On(Action success, Action<Exception> error) {
125 return On(success, error);
126 }
127
128 IPromise IPromise.On(Action success) {
129 return On(success);
130 }
131
132 IPromise IPromise.On(Action handler, PromiseEventType events) {
133 return On(handler, events);
134 }
135
136 public IPromise<T2> Cast<T2>() {
137 return new SuccessPromise<T2>((T2)(object)m_value);
138 }
139
140 void IPromise.Join() {
141 }
142
143 void IPromise.Join(int timeout) {
144 }
145
146 public Type PromiseType {
147 get {
148 return typeof(T);
149 }
150 }
151
152 public bool IsResolved {
153 get {
154 return true;
155 }
156 }
157
158 public bool IsCancelled {
159 get {
160 return false;
161 }
162 }
163
164 public Exception Error {
165 get {
166 return null;
167 }
168 }
169
170 public void Cancel() {
171 }
172
173 public void Cancel(Exception reason) {
174 }
175 }
176 }
177
@@ -1,38 +1,38
1 1 using System.Windows.Forms;
2 2 using System;
3 3
4 4
5 5 namespace Implab.Fx {
6 6 public class ControlBoundPromise<T> : Promise<T> {
7 7 readonly Control m_target;
8 8
9 9 public ControlBoundPromise(Control target) {
10 10 Safe.ArgumentNotNull(target, "target");
11 11
12 12 m_target = target;
13 13 }
14 14
15 protected override void SignalSuccess(IDeferred<T> handler) {
15 protected override void SignalSuccess(Promise<T>.HandlerDescriptor handler) {
16 16 if (m_target.InvokeRequired)
17 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalSuccess), handler);
17 m_target.BeginInvoke(new Action<Promise<T>.HandlerDescriptor>(base.SignalSuccess), handler);
18 18 else
19 19 base.SignalSuccess(handler);
20 20 }
21 21
22 protected override void SignalCancelled(IDeferred<T> handler, Exception reason) {
22 protected override void SignalCancelled(Promise<T>.HandlerDescriptor handler, Exception reason) {
23 23 if (m_target.InvokeRequired)
24 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalCancelled), handler, reason);
24 m_target.BeginInvoke(new Action<Promise<T>.HandlerDescriptor,Exception>(base.SignalCancelled), handler, reason);
25 25 else
26 26 base.SignalCancelled(handler, reason);
27 27 }
28 28
29 protected override void SignalError(IDeferred<T> handler, Exception error) {
29 protected override void SignalError(Promise<T>.HandlerDescriptor handler, Exception error) {
30 30 if (m_target.InvokeRequired)
31 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalError), handler, error);
31 m_target.BeginInvoke(new Action<Promise<T>.HandlerDescriptor,Exception>(base.SignalError), handler, error);
32 32 else
33 33 base.SignalError(handler, error);
34 34 }
35 35
36 36 }
37 37 }
38 38
@@ -1,852 +1,852
1 1 using System;
2 2 using System.Reflection;
3 3 using System.Threading;
4 4 using Implab.Parallels;
5 5
6 6 #if MONO
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
10 using TestMethodAttribute = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
14 14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15 15
16 16 #endif
17 17
18 18 namespace Implab.Test {
19 19 [TestClass]
20 20 public class AsyncTests {
21 21 [TestMethod]
22 22 public void ResolveTest() {
23 23 int res = -1;
24 24 var p = new Promise<int>();
25 25 p.Then(x => res = x);
26 26 p.Resolve(100);
27 27
28 28 Assert.AreEqual(100, res);
29 29 }
30 30
31 31 [TestMethod]
32 32 public void RejectTest() {
33 33 int res = -1;
34 34 Exception err = null;
35 35
36 36 var p = new Promise<int>();
37 37 p.Then(
38 38 x => res = x,
39 39 e => {
40 40 err = e;
41 41 return -2;
42 42 }
43 43 );
44 44 p.Reject(new ApplicationException("error"));
45 45
46 46 Assert.AreEqual(res, -1);
47 47 Assert.AreEqual(err.Message, "error");
48 48
49 49 }
50 50
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 p.Cancel();
54 p.CancelOperation(null);
55 55
56 56 var p2 = p.Then(x => x, null, reason => {
57 57 throw new ApplicationException("CANCELLED");
58 58 });
59 59
60 60 try {
61 61 p2.Join();
62 62 Assert.Fail();
63 63 } catch (ApplicationException err) {
64 64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 65 }
66 66
67 67 }
68 68
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 p.Cancel();
72 p.CancelOperation(null);
73 73
74 74 var p2 = p
75 .Then<bool>(x => x, null, reason => {
75 .Then(x => x, null, reason => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Then(x => x, e => true);
79 79
80 80 Assert.AreEqual(true, p2.Join());
81 81 }
82 82
83 83 [TestMethod]
84 84 public void JoinSuccessTest() {
85 85 var p = new Promise<int>();
86 86 p.Resolve(100);
87 87 Assert.AreEqual(p.Join(), 100);
88 88 }
89 89
90 90 [TestMethod]
91 91 public void JoinFailTest() {
92 92 var p = new Promise<int>();
93 93 p.Reject(new ApplicationException("failed"));
94 94
95 95 try {
96 96 p.Join();
97 97 throw new ApplicationException("WRONG!");
98 98 } catch (TargetInvocationException err) {
99 99 Assert.AreEqual(err.InnerException.Message, "failed");
100 100 } catch {
101 101 Assert.Fail("Got wrong excaption");
102 102 }
103 103 }
104 104
105 105 [TestMethod]
106 106 public void MapTest() {
107 107 var p = new Promise<int>();
108 108
109 109 var p2 = p.Then(x => x.ToString());
110 110 p.Resolve(100);
111 111
112 112 Assert.AreEqual(p2.Join(), "100");
113 113 }
114 114
115 115 [TestMethod]
116 116 public void FixErrorTest() {
117 117 var p = new Promise<int>();
118 118
119 119 var p2 = p.Then(x => x, e => 101);
120 120
121 121 p.Reject(new Exception());
122 122
123 123 Assert.AreEqual(p2.Join(), 101);
124 124 }
125 125
126 126 [TestMethod]
127 127 public void ChainTest() {
128 128 var p1 = new Promise<int>();
129 129
130 130 var p3 = p1.Chain(x => {
131 131 var p2 = new Promise<string>();
132 132 p2.Resolve(x.ToString());
133 133 return p2;
134 134 });
135 135
136 136 p1.Resolve(100);
137 137
138 138 Assert.AreEqual(p3.Join(), "100");
139 139 }
140 140
141 141 [TestMethod]
142 142 public void ChainFailTest() {
143 143 var p1 = new Promise<int>();
144 144
145 145 var p3 = p1.Chain(x => {
146 146 var p2 = new Promise<string>();
147 147 p2.Reject(new Exception("DIE!!!"));
148 148 return p2;
149 149 });
150 150
151 151 p1.Resolve(100);
152 152
153 153 Assert.IsTrue(p3.IsResolved);
154 154 }
155 155
156 156 [TestMethod]
157 157 public void PoolTest() {
158 158 var pid = Thread.CurrentThread.ManagedThreadId;
159 159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160 160
161 161 Assert.AreNotEqual(pid, p.Join());
162 162 }
163 163
164 164 [TestMethod]
165 165 public void WorkerPoolSizeTest() {
166 166 var pool = new WorkerPool(5, 10, 1);
167 167
168 168 Assert.AreEqual(5, pool.PoolSize);
169 169
170 170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173 173
174 174 Assert.AreEqual(5, pool.PoolSize);
175 175
176 176 for (int i = 0; i < 100; i++)
177 177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 178 Thread.Sleep(200);
179 179 Assert.AreEqual(10, pool.PoolSize);
180 180
181 181 pool.Dispose();
182 182 }
183 183
184 184 [TestMethod]
185 185 public void WorkerPoolCorrectTest() {
186 186 var pool = new WorkerPool(0,1000,100);
187 187
188 188 const int iterations = 1000;
189 189 int pending = iterations;
190 190 var stop = new ManualResetEvent(false);
191 191
192 192 var count = 0;
193 193 for (int i = 0; i < iterations; i++) {
194 194 pool
195 195 .Invoke(() => 1)
196 196 .Then(x => Interlocked.Add(ref count, x))
197 197 .Then(x => Math.Log10(x))
198 198 .On(() => {
199 199 Interlocked.Decrement(ref pending);
200 200 if (pending == 0)
201 201 stop.Set();
202 202 }, PromiseEventType.All);
203 203 }
204 204
205 205 stop.WaitOne();
206 206
207 207 Assert.AreEqual(iterations, count);
208 208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 209 pool.Dispose();
210 210
211 211 }
212 212
213 213 [TestMethod]
214 214 public void WorkerPoolDisposeTest() {
215 215 var pool = new WorkerPool(5, 20);
216 216 Assert.AreEqual(5, pool.PoolSize);
217 217 pool.Dispose();
218 218 Thread.Sleep(500);
219 219 Assert.AreEqual(0, pool.PoolSize);
220 220 pool.Dispose();
221 221 }
222 222
223 223 [TestMethod]
224 224 public void MTQueueTest() {
225 225 var queue = new MTQueue<int>();
226 226 int res;
227 227
228 228 queue.Enqueue(10);
229 229 Assert.IsTrue(queue.TryDequeue(out res));
230 230 Assert.AreEqual(10, res);
231 231 Assert.IsFalse(queue.TryDequeue(out res));
232 232
233 233 for (int i = 0; i < 1000; i++)
234 234 queue.Enqueue(i);
235 235
236 236 for (int i = 0; i < 1000; i++) {
237 237 queue.TryDequeue(out res);
238 238 Assert.AreEqual(i, res);
239 239 }
240 240
241 241 int writers = 0;
242 242 int readers = 0;
243 243 var stop = new ManualResetEvent(false);
244 244 int total = 0;
245 245
246 246 const int itemsPerWriter = 10000;
247 247 const int writersCount = 10;
248 248
249 249 for (int i = 0; i < writersCount; i++) {
250 250 Interlocked.Increment(ref writers);
251 251 AsyncPool
252 252 .RunThread(() => {
253 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 254 queue.Enqueue(1);
255 255 }
256 256 return 1;
257 257 })
258 258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 259 }
260 260
261 261 for (int i = 0; i < 10; i++) {
262 262 Interlocked.Increment(ref readers);
263 263 AsyncPool
264 264 .RunThread(() => {
265 265 int t;
266 266 do {
267 267 while (queue.TryDequeue(out t))
268 268 Interlocked.Add(ref total, t);
269 269 } while (writers > 0);
270 270 return 1;
271 271 })
272 272 .On(() => {
273 273 Interlocked.Decrement(ref readers);
274 274 if (readers == 0)
275 275 stop.Set();
276 276 }, PromiseEventType.All);
277 277 }
278 278
279 279 stop.WaitOne();
280 280
281 281 Assert.AreEqual(100000, total);
282 282 }
283 283
284 284 [TestMethod]
285 285 public void AsyncQueueTest() {
286 286 var queue = new AsyncQueue<int>();
287 287 int res;
288 288
289 289 queue.Enqueue(10);
290 290 Assert.IsTrue(queue.TryDequeue(out res));
291 291 Assert.AreEqual(10, res);
292 292 Assert.IsFalse(queue.TryDequeue(out res));
293 293
294 294 for (int i = 0; i < 1000; i++)
295 295 queue.Enqueue(i);
296 296
297 297 for (int i = 0; i < 1000; i++) {
298 298 queue.TryDequeue(out res);
299 299 Assert.AreEqual(i, res);
300 300 }
301 301
302 302 const int count = 10000000;
303 303
304 304 int res1 = 0, res2 = 0;
305 305 var t1 = Environment.TickCount;
306 306
307 307 AsyncPool.RunThread(
308 308 () => {
309 309 for (var i = 0; i < count; i++)
310 310 queue.Enqueue(1);
311 311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 312 },
313 313 () => {
314 314 for (var i = 0; i < count; i++)
315 315 queue.Enqueue(2);
316 316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 317 },
318 318 () => {
319 319 int temp;
320 320 int i = 0;
321 321 while (i < count)
322 322 if (queue.TryDequeue(out temp)) {
323 323 i++;
324 324 res1 += temp;
325 325 }
326 326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 327 },
328 328 () => {
329 329 int temp;
330 330 int i = 0;
331 331 while (i < count)
332 332 if (queue.TryDequeue(out temp)) {
333 333 i++;
334 334 res2 += temp;
335 335 }
336 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 337 }
338 338 )
339 339 .Bundle()
340 340 .Join();
341 341
342 342 Assert.AreEqual(count * 3, res1 + res2);
343 343
344 344 Console.WriteLine(
345 345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 346 Environment.TickCount - t1,
347 347 res1,
348 348 res2,
349 349 res1 + res2,
350 350 count
351 351 );
352 352 }
353 353
354 354 [TestMethod]
355 355 public void AsyncQueueBatchTest() {
356 356 var queue = new AsyncQueue<int>();
357 357
358 358 const int wBatch = 29;
359 359 const int wCount = 400000;
360 360 const int total = wBatch * wCount * 2;
361 361 const int summ = wBatch * wCount * 3;
362 362
363 363 int r1 = 0, r2 = 0;
364 364 const int rBatch = 111;
365 365 int read = 0;
366 366
367 367 var t1 = Environment.TickCount;
368 368
369 369 AsyncPool.RunThread(
370 370 () => {
371 371 var buffer = new int[wBatch];
372 372 for(int i = 0; i<wBatch; i++)
373 373 buffer[i] = 1;
374 374
375 375 for(int i =0; i < wCount; i++)
376 376 queue.EnqueueRange(buffer,0,wBatch);
377 377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 378 },
379 379 () => {
380 380 var buffer = new int[wBatch];
381 381 for(int i = 0; i<wBatch; i++)
382 382 buffer[i] = 2;
383 383
384 384 for(int i =0; i < wCount; i++)
385 385 queue.EnqueueRange(buffer,0,wBatch);
386 386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 387 },
388 388 () => {
389 389 var buffer = new int[rBatch];
390 390
391 391 while(read < total) {
392 392 int actual;
393 393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 394 for(int i=0; i< actual; i++)
395 395 r1 += buffer[i];
396 396 Interlocked.Add(ref read, actual);
397 397 }
398 398 }
399 399
400 400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 401 },
402 402 () => {
403 403 var buffer = new int[rBatch];
404 404
405 405 while(read < total) {
406 406 int actual;
407 407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 408 for(int i=0; i< actual; i++)
409 409 r2 += buffer[i];
410 410 Interlocked.Add(ref read, actual);
411 411 }
412 412 }
413 413
414 414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 415 }
416 416 )
417 417 .Bundle()
418 418 .Join();
419 419
420 420 Assert.AreEqual(summ , r1 + r2);
421 421
422 422 Console.WriteLine(
423 423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 424 Environment.TickCount - t1,
425 425 r1,
426 426 r2,
427 427 r1 + r2,
428 428 total
429 429 );
430 430 }
431 431
432 432 [TestMethod]
433 433 public void AsyncQueueChunkDequeueTest() {
434 434 var queue = new AsyncQueue<int>();
435 435
436 436 const int wBatch = 31;
437 437 const int wCount = 200000;
438 438 const int total = wBatch * wCount * 3;
439 439 const int summ = wBatch * wCount * 6;
440 440
441 441 int r1 = 0, r2 = 0;
442 442 const int rBatch = 1024;
443 443 int read = 0;
444 444
445 445 var t1 = Environment.TickCount;
446 446
447 447 AsyncPool.RunThread(
448 448 () => {
449 449 var buffer = new int[wBatch];
450 450 for(int i = 0; i<wBatch; i++)
451 451 buffer[i] = 1;
452 452
453 453 for(int i =0; i < wCount; i++)
454 454 queue.EnqueueRange(buffer,0,wBatch);
455 455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 456 },
457 457 () => {
458 458 var buffer = new int[wBatch];
459 459 for(int i = 0; i<wBatch; i++)
460 460 buffer[i] = 2;
461 461
462 462 for(int i =0; i < wCount; i++)
463 463 queue.EnqueueRange(buffer,0,wBatch);
464 464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 465 },
466 466 () => {
467 467 var buffer = new int[wBatch];
468 468 for(int i = 0; i<wBatch; i++)
469 469 buffer[i] = 3;
470 470
471 471 for(int i =0; i < wCount; i++)
472 472 queue.EnqueueRange(buffer,0,wBatch);
473 473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 474 },
475 475 () => {
476 476 var buffer = new int[rBatch];
477 477 int count = 1;
478 478 double avgchunk = 0;
479 479 while(read < total) {
480 480 int actual;
481 481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 482 for(int i=0; i< actual; i++)
483 483 r2 += buffer[i];
484 484 Interlocked.Add(ref read, actual);
485 485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 486 count ++;
487 487 }
488 488 }
489 489
490 490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 491 }
492 492 )
493 493 .Bundle()
494 494 .Join();
495 495
496 496 Assert.AreEqual(summ , r1 + r2);
497 497
498 498 Console.WriteLine(
499 499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 500 Environment.TickCount - t1,
501 501 r1,
502 502 r2,
503 503 r1 + r2,
504 504 total
505 505 );
506 506 }
507 507
508 508 [TestMethod]
509 509 public void AsyncQueueDrainTest() {
510 510 var queue = new AsyncQueue<int>();
511 511
512 512 const int wBatch = 11;
513 513 const int wCount = 200000;
514 514 const int total = wBatch * wCount * 3;
515 515 const int summ = wBatch * wCount * 3;
516 516
517 517 int r1 = 0, r2 = 0;
518 518 const int rBatch = 11;
519 519 int read = 0;
520 520
521 521 var t1 = Environment.TickCount;
522 522
523 523 AsyncPool.RunThread(
524 524 () => {
525 525 var buffer = new int[wBatch];
526 526 for(int i = 0; i<wBatch; i++)
527 527 buffer[i] = 1;
528 528
529 529 for(int i =0; i < wCount; i++)
530 530 queue.EnqueueRange(buffer,0,wBatch);
531 531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 532 },
533 533 () => {
534 534 for(int i =0; i < wCount * wBatch; i++)
535 535 queue.Enqueue(1);
536 536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 537 },
538 538 () => {
539 539 var buffer = new int[wBatch];
540 540 for(int i = 0; i<wBatch; i++)
541 541 buffer[i] = 1;
542 542
543 543 for(int i =0; i < wCount; i++)
544 544 queue.EnqueueRange(buffer,0,wBatch);
545 545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 546 },
547 547 /*() => {
548 548 int temp;
549 549 int count = 0;
550 550 while (read < total)
551 551 if (queue.TryDequeue(out temp)) {
552 552 count++;
553 553 r1 += temp;
554 554 Interlocked.Increment(ref read);
555 555 }
556 556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 557 },*/
558 558 /*() => {
559 559 var buffer = new int[rBatch];
560 560 var count = 0;
561 561 while(read < total) {
562 562 int actual;
563 563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 564 for(int i=0; i< actual; i++)
565 565 r1 += buffer[i];
566 566 Interlocked.Add(ref read, actual);
567 567 count += actual;
568 568 }
569 569 }
570 570
571 571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 572 },*/
573 573 () => {
574 574 var count = 0;
575 575 while(read < total) {
576 576 var buffer = queue.Drain();
577 577 for(int i=0; i< buffer.Length; i++)
578 578 r1 += buffer[i];
579 579 Interlocked.Add(ref read, buffer.Length);
580 580 count += buffer.Length;
581 581 }
582 582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 583 },
584 584 () => {
585 585 var count = 0;
586 586 while(read < total) {
587 587 var buffer = queue.Drain();
588 588 for(int i=0; i< buffer.Length; i++)
589 589 r2 += buffer[i];
590 590 Interlocked.Add(ref read, buffer.Length);
591 591 count += buffer.Length;
592 592 }
593 593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 594 }
595 595 )
596 596 .Bundle()
597 597 .Join();
598 598
599 599 Assert.AreEqual(summ , r1 + r2);
600 600
601 601 Console.WriteLine(
602 602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
603 603 Environment.TickCount - t1,
604 604 r1,
605 605 r2,
606 606 r1 + r2,
607 607 total
608 608 );
609 609 }
610 610
611 611 [TestMethod]
612 612 public void ParallelMapTest() {
613 613
614 614 const int count = 100000;
615 615
616 616 var args = new double[count];
617 617 var rand = new Random();
618 618
619 619 for (int i = 0; i < count; i++)
620 620 args[i] = rand.NextDouble();
621 621
622 622 var t = Environment.TickCount;
623 623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
624 624
625 625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
626 626
627 627 t = Environment.TickCount;
628 628 for (int i = 0; i < count; i++)
629 629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
630 630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
631 631 }
632 632
633 633 [TestMethod]
634 634 public void ChainedMapTest() {
635 635
636 636 using (var pool = new WorkerPool()) {
637 637 const int count = 10000;
638 638
639 639 var args = new double[count];
640 640 var rand = new Random();
641 641
642 642 for (int i = 0; i < count; i++)
643 643 args[i] = rand.NextDouble();
644 644
645 645 var t = Environment.TickCount;
646 646 var res = args
647 647 .ChainedMap(
648 648 // Analysis disable once AccessToDisposedClosure
649 649 x => pool.Invoke(
650 650 () => Math.Sin(x * x)
651 651 ),
652 652 4
653 653 )
654 654 .Join();
655 655
656 656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
657 657
658 658 t = Environment.TickCount;
659 659 for (int i = 0; i < count; i++)
660 660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
661 661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
662 662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
663 663 }
664 664 }
665 665
666 666 [TestMethod]
667 667 public void ParallelForEachTest() {
668 668
669 669 const int count = 100000;
670 670
671 671 var args = new int[count];
672 672 var rand = new Random();
673 673
674 674 for (int i = 0; i < count; i++)
675 675 args[i] = (int)(rand.NextDouble() * 100);
676 676
677 677 int result = 0;
678 678
679 679 var t = Environment.TickCount;
680 680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
681 681
682 682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
683 683
684 684 int result2 = 0;
685 685
686 686 t = Environment.TickCount;
687 687 for (int i = 0; i < count; i++)
688 688 result2 += args[i];
689 689 Assert.AreEqual(result2, result);
690 690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
691 691 }
692 692
693 693 [TestMethod]
694 694 public void ComplexCase1Test() {
695 695 var flags = new bool[3];
696 696
697 697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
698 698
699 699 var step1 = PromiseHelper
700 700 .Sleep(200, "Alan")
701 701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
702 702 var p = step1
703 703 .Chain(x =>
704 704 PromiseHelper
705 705 .Sleep(200, "Hi, " + x)
706 706 .Then(y => y)
707 707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
708 708 )
709 709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
710 710 step1.Join();
711 711 p.Cancel();
712 712 try {
713 713 Assert.AreEqual(p.Join(), "Hi, Alan");
714 714 Assert.Fail("Shouldn't get here");
715 715 } catch (OperationCanceledException) {
716 716 }
717 717
718 718 Assert.IsFalse(flags[0]);
719 719 Assert.IsTrue(flags[1]);
720 720 Assert.IsTrue(flags[2]);
721 721 }
722 722
723 723 [TestMethod]
724 724 public void ChainedCancel1Test() {
725 725 // при отмене сцепленной асинхронной операции все обещание должно
726 726 // завершаться ошибкой OperationCanceledException
727 727 var p = PromiseHelper
728 728 .Sleep(1, "Hi, HAL!")
729 729 .Then(x => {
730 730 // запускаем две асинхронные операции
731 731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
732 732 // вторая операция отменяет первую до завершения
733 733 PromiseHelper
734 734 .Sleep(100, "HAL, STOP!")
735 735 .Then(result.Cancel);
736 736 return result;
737 737 });
738 738 try {
739 739 p.Join();
740 740 } catch (TargetInvocationException err) {
741 741 Assert.IsTrue(err.InnerException is OperationCanceledException);
742 742 }
743 743 }
744 744
745 745 [TestMethod]
746 746 public void ChainedCancel2Test() {
747 747 // при отмене цепочки обещаний, вложенные операции также должны отменяться
748 748 var pSurvive = new Promise<bool>();
749 749 var hemStarted = new ManualResetEvent(false);
750 750 var p = PromiseHelper
751 751 .Sleep(1, "Hi, HAL!")
752 752 .Chain(x => {
753 753 hemStarted.Set();
754 754 // запускаем две асинхронные операции
755 755 var result = PromiseHelper
756 756 .Sleep(100000000, "HEM ENABLED!!!")
757 757 .Then(s => {
758 758 pSurvive.Resolve(false);
759 759 return s;
760 760 });
761 761
762 762 result
763 763 .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
764 764
765 765 return result;
766 766 });
767 767
768 768 hemStarted.WaitOne();
769 769 p.Cancel();
770 770
771 771 try {
772 772 p.Join();
773 773 } catch (OperationCanceledException) {
774 774 Assert.IsTrue(pSurvive.Join());
775 775 }
776 776 }
777 777
778 778 [TestMethod]
779 779 public void SharedLockTest() {
780 780 var l = new SharedLock();
781 781 int shared = 0;
782 782 int exclusive = 0;
783 783 var s1 = new Signal();
784 784 var log = new AsyncQueue<string>();
785 785
786 786 try {
787 787 AsyncPool.RunThread(
788 788 () => {
789 789 log.Enqueue("Reader #1 started");
790 790 try {
791 791 l.LockShared();
792 792 log.Enqueue("Reader #1 lock got");
793 793 if (Interlocked.Increment(ref shared) == 2)
794 794 s1.Set();
795 795 s1.Wait();
796 796 log.Enqueue("Reader #1 finished");
797 797 Interlocked.Decrement(ref shared);
798 798 } finally {
799 799 l.Release();
800 800 log.Enqueue("Reader #1 lock released");
801 801 }
802 802 },
803 803 () => {
804 804 log.Enqueue("Reader #2 started");
805 805
806 806 try {
807 807 l.LockShared();
808 808 log.Enqueue("Reader #2 lock got");
809 809
810 810 if (Interlocked.Increment(ref shared) == 2)
811 811 s1.Set();
812 812 s1.Wait();
813 813 log.Enqueue("Reader #2 upgrading to writer");
814 814 Interlocked.Decrement(ref shared);
815 815 l.Upgrade();
816 816 log.Enqueue("Reader #2 upgraded");
817 817
818 818 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
819 819 Assert.AreEqual(0, shared);
820 820 log.Enqueue("Reader #2 finished");
821 821 Interlocked.Decrement(ref exclusive);
822 822 } finally {
823 823 l.Release();
824 824 log.Enqueue("Reader #2 lock released");
825 825 }
826 826 },
827 827 () => {
828 828 log.Enqueue("Writer #1 started");
829 829 try {
830 830 l.LockExclusive();
831 831 log.Enqueue("Writer #1 got the lock");
832 832 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
833 833 Interlocked.Decrement(ref exclusive);
834 834 log.Enqueue("Writer #1 is finished");
835 835 } finally {
836 836 l.Release();
837 837 log.Enqueue("Writer #1 lock released");
838 838 }
839 839 }
840 840 ).Bundle().Join(1000);
841 841 log.Enqueue("Done");
842 842 } catch(Exception error) {
843 843 log.Enqueue(error.Message);
844 844 throw;
845 845 } finally {
846 846 foreach (var m in log)
847 847 Console.WriteLine(m);
848 848 }
849 849 }
850 850 }
851 851 }
852 852
@@ -1,66 +1,67
1 1 <?xml version="1.0" encoding="utf-8"?>
2 2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <PropertyGroup>
4 4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 6 <ProductVersion>8.0.30703</ProductVersion>
7 7 <SchemaVersion>2.0</SchemaVersion>
8 8 <ProjectGuid>{2BD05F84-E067-4B87-9477-FDC2676A21C6}</ProjectGuid>
9 9 <OutputType>Library</OutputType>
10 10 <RootNamespace>Implab.Test</RootNamespace>
11 11 <AssemblyName>Implab.Test</AssemblyName>
12 12 <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
13 13 </PropertyGroup>
14 14 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
15 15 <DebugSymbols>true</DebugSymbols>
16 16 <DebugType>full</DebugType>
17 17 <Optimize>false</Optimize>
18 18 <OutputPath>bin\Debug</OutputPath>
19 19 <DefineConstants>DEBUG;MONO</DefineConstants>
20 20 <ErrorReport>prompt</ErrorReport>
21 21 <WarningLevel>4</WarningLevel>
22 22 <ConsolePause>false</ConsolePause>
23 23 </PropertyGroup>
24 24 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
25 25 <Optimize>true</Optimize>
26 26 <OutputPath>bin\Release</OutputPath>
27 27 <ErrorReport>prompt</ErrorReport>
28 28 <WarningLevel>4</WarningLevel>
29 29 <ConsolePause>false</ConsolePause>
30 30 <DefineConstants>MONO</DefineConstants>
31 31 </PropertyGroup>
32 32 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug 4.5|AnyCPU' ">
33 33 <DebugSymbols>true</DebugSymbols>
34 34 <DebugType>full</DebugType>
35 35 <Optimize>false</Optimize>
36 36 <OutputPath>bin\Debug</OutputPath>
37 37 <DefineConstants>DEBUG;TRACE;NET_4_5;MONO</DefineConstants>
38 38 <ErrorReport>prompt</ErrorReport>
39 39 <WarningLevel>4</WarningLevel>
40 40 <ConsolePause>false</ConsolePause>
41 41 </PropertyGroup>
42 42 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release 4.5|AnyCPU' ">
43 43 <Optimize>true</Optimize>
44 44 <OutputPath>bin\Release</OutputPath>
45 45 <DefineConstants>NET_4_5;MONO</DefineConstants>
46 46 <ErrorReport>prompt</ErrorReport>
47 47 <WarningLevel>4</WarningLevel>
48 48 <ConsolePause>false</ConsolePause>
49 49 </PropertyGroup>
50 50 <ItemGroup>
51 51 <Reference Include="System" />
52 52 <Reference Include="nunit.framework" />
53 53 </ItemGroup>
54 54 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
55 55 <ItemGroup>
56 56 <Compile Include="AsyncTests.cs" />
57 57 <Compile Include="PromiseHelper.cs" />
58 58 <Compile Include="Properties\AssemblyInfo.cs" />
59 <Compile Include="CancelationTests.cs" />
59 60 </ItemGroup>
60 61 <ItemGroup>
61 62 <ProjectReference Include="..\Implab\Implab.csproj">
62 63 <Project>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</Project>
63 64 <Name>Implab</Name>
64 65 </ProjectReference>
65 66 </ItemGroup>
66 67 </Project> No newline at end of file
@@ -1,350 +1,353
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Reflection;
5 5
6 6 namespace Implab {
7 public abstract class AbstractEvent<THandler> : ICancelationToken, ICancellable {
7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
8 8
9 9 const int UNRESOLVED_SATE = 0;
10 10 const int TRANSITIONAL_STATE = 1;
11 11 const int SUCCEEDED_STATE = 2;
12 12 const int REJECTED_STATE = 3;
13 13 const int CANCELLED_STATE = 4;
14 14
15 15 const int CANCEL_NOT_REQUESTED = 0;
16 16 const int CANCEL_REQUESTING = 1;
17 17 const int CANCEL_REQUESTED = 2;
18 18
19 19 const int RESERVED_HANDLERS_COUNT = 4;
20 20
21 21 int m_state;
22 22 Exception m_error;
23 23 int m_handlersCount;
24 24
25 25 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
26 26 MTQueue<THandler> m_extraHandlers;
27 27 int m_handlerPointer = -1;
28 28 int m_handlersCommited;
29 29
30 30 int m_cancelRequest;
31 31 Exception m_cancelationReason;
32 32 MTQueue<Action<Exception>> m_cancelationHandlers;
33 33
34 34
35 35 #region state managment
36 36 bool BeginTransit() {
37 37 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
38 38 }
39 39
40 40 void CompleteTransit(int state) {
41 41 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
42 42 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
43 43 }
44 44
45 45 void WaitTransition() {
46 46 while (m_state == TRANSITIONAL_STATE) {
47 47 Thread.MemoryBarrier();
48 48 }
49 49 }
50 50
51 51 protected bool BeginSetResult() {
52 52 if (!BeginTransit()) {
53 53 WaitTransition();
54 54 if (m_state != CANCELLED_STATE)
55 55 throw new InvalidOperationException("The promise is already resolved");
56 56 return false;
57 57 }
58 58 return true;
59 59 }
60 60
61 61 protected void EndSetResult() {
62 62 CompleteTransit(SUCCEEDED_STATE);
63 63 OnSuccess();
64 64 }
65 65
66 66
67 67
68 68 /// <summary>
69 69 /// Выполняет обещание, сообщая об ошибке
70 70 /// </summary>
71 71 /// <remarks>
72 72 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
73 73 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
74 74 /// будут проигнорированы.
75 75 /// </remarks>
76 76 /// <param name="error">Исключение возникшее при выполнении операции</param>
77 77 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
78 78 protected void SetError(Exception error) {
79 79 if (BeginTransit()) {
80 80 if (error is OperationCanceledException) {
81 81 CompleteTransit(CANCELLED_STATE);
82 82 m_error = error.InnerException;
83 83 OnCancelled();
84 84 } else {
85 85 m_error = error is PromiseTransientException ? error.InnerException : error;
86 86 CompleteTransit(REJECTED_STATE);
87 87 OnError();
88 88 }
89 89 } else {
90 90 WaitTransition();
91 91 if (m_state == SUCCEEDED_STATE)
92 92 throw new InvalidOperationException("The promise is already resolved");
93 93 }
94 94 }
95 95
96 96 /// <summary>
97 97 /// Отменяет операцию, если это возможно.
98 98 /// </summary>
99 99 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
100 100 protected void SetCancelled(Exception reason) {
101 101 if (BeginTransit()) {
102 102 m_error = reason;
103 103 CompleteTransit(CANCELLED_STATE);
104 104 OnCancelled();
105 105 }
106 106 }
107 107
108 108 protected abstract void SignalSuccess(THandler handler);
109 109
110 110 protected abstract void SignalError(THandler handler, Exception error);
111 111
112 112 protected abstract void SignalCancelled(THandler handler, Exception reason);
113 113
114 114 void OnSuccess() {
115 115 var hp = m_handlerPointer;
116 116 var slot = hp +1 ;
117 117 while (slot < m_handlersCommited) {
118 118 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
119 119 SignalSuccess(m_handlers[slot]);
120 120 }
121 121 hp = m_handlerPointer;
122 122 slot = hp +1 ;
123 123 }
124 124
125 125
126 126 if (m_extraHandlers != null) {
127 127 THandler handler;
128 128 while (m_extraHandlers.TryDequeue(out handler))
129 129 SignalSuccess(handler);
130 130 }
131 131 }
132 132
133 133 void OnError() {
134 134 var hp = m_handlerPointer;
135 135 var slot = hp +1 ;
136 136 while (slot < m_handlersCommited) {
137 137 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
138 138 SignalError(m_handlers[slot],m_error);
139 139 }
140 140 hp = m_handlerPointer;
141 141 slot = hp +1 ;
142 142 }
143 143
144 144 if (m_extraHandlers != null) {
145 145 THandler handler;
146 146 while (m_extraHandlers.TryDequeue(out handler))
147 147 SignalError(handler, m_error);
148 148 }
149 149 }
150 150
151 151 void OnCancelled() {
152 152 var hp = m_handlerPointer;
153 153 var slot = hp +1 ;
154 154 while (slot < m_handlersCommited) {
155 155 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
156 156 SignalCancelled(m_handlers[slot], m_error);
157 157 }
158 158 hp = m_handlerPointer;
159 159 slot = hp +1 ;
160 160 }
161 161
162 162 if (m_extraHandlers != null) {
163 163 THandler handler;
164 164 while (m_extraHandlers.TryDequeue(out handler))
165 165 SignalCancelled(handler, m_error);
166 166 }
167 167 }
168 168
169 169 #endregion
170 170
171 171 protected abstract Signal GetResolveSignal();
172 172
173 173 #region synchronization traits
174 174 protected void WaitResult(int timeout) {
175 175 if (!IsResolved)
176 176 GetResolveSignal().Wait(timeout);
177 177
178 178 switch (m_state) {
179 179 case SUCCEEDED_STATE:
180 180 return;
181 181 case CANCELLED_STATE:
182 182 throw new OperationCanceledException();
183 183 case REJECTED_STATE:
184 184 throw new TargetInvocationException(m_error);
185 185 default:
186 186 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
187 187 }
188 188 }
189 189 #endregion
190 190
191 191 #region handlers managment
192 192
193 193 protected void AddHandler(THandler handler) {
194 194
195 195 if (m_state > 1) {
196 196 // the promise is in the resolved state, just invoke the handler
197 197 InvokeHandler(handler);
198 198 } else {
199 199 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
200 200
201 201 if (slot < RESERVED_HANDLERS_COUNT) {
202 202
203 203 m_handlers[slot] = handler;
204 204
205 205 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
206 206 }
207 207
208 208 if (m_state > 1) {
209 209 do {
210 210 var hp = m_handlerPointer;
211 211 slot = hp + 1;
212 212 if (slot < m_handlersCommited) {
213 213 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
214 214 continue;
215 215 InvokeHandler(m_handlers[slot]);
216 216 }
217 217 break;
218 218 } while(true);
219 219 }
220 220 } else {
221 221 if (slot == RESERVED_HANDLERS_COUNT) {
222 222 m_extraHandlers = new MTQueue<THandler>();
223 223 } else {
224 224 while (m_extraHandlers == null)
225 225 Thread.MemoryBarrier();
226 226 }
227 227
228 228 m_extraHandlers.Enqueue(handler);
229 229
230 230 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
231 231 // if the promise have been resolved while we was adding the handler to the queue
232 232 // we can't guarantee that someone is still processing it
233 233 // therefore we need to fetch a handler from the queue and execute it
234 234 // note that fetched handler may be not the one that we have added
235 235 // even we can fetch no handlers at all :)
236 236 InvokeHandler(handler);
237 237 }
238 238 }
239 239 }
240 240
241 241 protected void InvokeHandler(THandler handler) {
242 242 switch (m_state) {
243 243 case SUCCEEDED_STATE:
244 244 SignalSuccess(handler);
245 245 break;
246 246 case CANCELLED_STATE:
247 247 SignalCancelled(handler, m_error);
248 248 break;
249 249 case REJECTED_STATE:
250 250 SignalError(handler, m_error);
251 251 break;
252 252 default:
253 253 throw new Exception(String.Format("Invalid promise state {0}", m_state));
254 254 }
255 255 }
256 256
257 257 #endregion
258 258
259 259 #region IPromise implementation
260 260
261 261 public bool IsResolved {
262 262 get {
263 263 Thread.MemoryBarrier();
264 264 return m_state > 1;
265 265 }
266 266 }
267 267
268 268 public bool IsCancelled {
269 269 get {
270 270 Thread.MemoryBarrier();
271 271 return m_state == CANCELLED_STATE;
272 272 }
273 273 }
274 274
275 275 #endregion
276 276
277 277 public Exception Error {
278 278 get {
279 279 return m_error;
280 280 }
281 281 }
282 282
283 public bool AcceptIfRequested() {
284 if (IsCancelRequested)
285 CancelOperation(CancelReason);
283 public bool CancelOperationIfRequested() {
284 if (IsCancellationRequested) {
285 CancelOperation(CancellationReason);
286 return true;
287 }
288 return false;
286 289 }
287 290
288 291 public virtual void CancelOperation(Exception reason) {
289 292 SetCancelled(reason);
290 293 }
291 294
292 public void CancelationRequested(Action<Exception> handler) {
295 public void CancellationRequested(Action<Exception> handler) {
293 296 Safe.ArgumentNotNull(handler, "handler");
294 if (IsCancelRequested)
295 handler(CancelReason);
297 if (IsCancellationRequested)
298 handler(CancellationReason);
296 299
297 300 if (m_cancelationHandlers == null)
298 301 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
299 302
300 303 m_cancelationHandlers.Enqueue(handler);
301 304
302 if (IsCancelRequested && m_cancelationHandlers.TryDequeue(out handler))
305 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
303 306 // TryDeque implies MemoryBarrier()
304 307 handler(m_cancelationReason);
305 308 }
306 309
307 public bool IsCancelRequested {
310 public bool IsCancellationRequested {
308 311 get {
309 312 do {
310 313 if (m_cancelRequest == CANCEL_NOT_REQUESTED)
311 314 return false;
312 315 if (m_cancelRequest == CANCEL_REQUESTED)
313 316 return true;
314 317 Thread.MemoryBarrier();
315 318 } while(true);
316 319 }
317 320 }
318 321
319 public Exception CancelReason {
322 public Exception CancellationReason {
320 323 get {
321 324 do {
322 325 Thread.MemoryBarrier();
323 326 } while(m_cancelRequest == CANCEL_REQUESTING);
324 327
325 328 return m_cancelationReason;
326 329 }
327 330 }
328 331
329 332 #region ICancellable implementation
330 333
331 334 public void Cancel() {
332 335 Cancel(null);
333 336 }
334 337
335 338 public void Cancel(Exception reason) {
336 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING)) {
339 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
337 340 m_cancelationReason = reason;
338 341 m_cancelRequest = CANCEL_REQUESTED;
339 342 if (m_cancelationHandlers != null) {
340 343 Action<Exception> handler;
341 344 while (m_cancelationHandlers.TryDequeue(out handler))
342 345 handler(m_cancelationReason);
343 346 }
344 347 }
345 348 }
346 349
347 350 #endregion
348 351 }
349 352 }
350 353
@@ -1,136 +1,138
1 1 using System;
2 2 using Implab.Parallels;
3 3
4 4 namespace Implab {
5 5 public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise {
6 6 public struct HandlerDescriptor {
7 7 readonly Action m_handler;
8 8 readonly Action<Exception> m_error;
9 9 readonly Action<Exception> m_cancel;
10 10 readonly PromiseEventType m_mask;
11 11
12 12 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
13 13 m_handler = success;
14 14 m_error = error;
15 15 m_cancel = cancel;
16 16 m_mask = PromiseEventType.Success;
17 17 }
18 18
19 19 public HandlerDescriptor(Action handler, PromiseEventType mask) {
20 20 m_handler = handler;
21 m_error = null;
22 m_cancel = null;
21 23 m_mask = mask;
22 24 }
23 25
24 26 public void SignalSuccess() {
25 if (m_mask & PromiseEventType.Success && m_handler != null) {
27 if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
26 28 try {
27 29 m_handler();
28 30 } catch (Exception err) {
29 31 // avoid calling handler twice in case of error
30 32 if (m_error != null)
31 33 SignalError(err);
32 34 }
33 35 }
34 36 }
35 37
36 38 public void SignalError(Exception err) {
37 39 if (m_error != null) {
38 40 try {
39 41 m_error(err);
40 42 // Analysis disable once EmptyGeneralCatchClause
41 43 } catch {
42 44 }
43 } else if (m_mask & PromiseEventType.Error && m_handler != null) {
45 } else if ((m_mask & PromiseEventType.Error ) != 0 && m_handler != null) {
44 46 try {
45 47 m_handler();
46 48 // Analysis disable once EmptyGeneralCatchClause
47 49 } catch {
48 50 }
49 51 }
50 52 }
51 53
52 54 public void SignalCancel(Exception reason) {
53 55 if (m_cancel != null) {
54 56 try {
55 57 m_cancel(reason);
56 58 } catch (Exception err) {
57 59 SignalError(err);
58 60 }
59 } else if (m_mask & PromiseEventType.Cancelled && m_handler != null) {
61 } else if ( (m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
60 62 try {
61 63 m_handler();
62 64 // Analysis disable once EmptyGeneralCatchClause
63 65 } catch {
64 66 }
65 67 }
66 68 }
67 69 }
68 70
69 71
70 72 #region implemented abstract members of AbstractPromise
71 73
72 74 protected override void SignalSuccess(HandlerDescriptor handler) {
73 75 handler.SignalSuccess();
74 76 }
75 77
76 78 protected override void SignalError(HandlerDescriptor handler, Exception error) {
77 79 handler.SignalError(error);
78 80 }
79 81
80 82 protected override void SignalCancelled(HandlerDescriptor handler, Exception reason) {
81 83 handler.SignalCancel(reason);
82 84 }
83 85
84 86 protected override Signal GetResolveSignal() {
85 87 var signal = new Signal();
86 88 On(signal.Set, PromiseEventType.All);
89 return signal;
87 90 }
88 91
89 92 #endregion
90 93
91
92 94 public Type PromiseType {
93 95 get {
94 96 return typeof(void);
95 97 }
96 98 }
97 99
98 100 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
99 101 AddHandler(new HandlerDescriptor(success, error, cancel));
100 102 return this;
101 103 }
102 104
103 105 public IPromise On(Action success, Action<Exception> error) {
104 106 AddHandler(new HandlerDescriptor(success, error, null));
105 107 return this;
106 108 }
107 109
108 110 public IPromise On(Action success) {
109 111 AddHandler(new HandlerDescriptor(success, null, null));
110 112 return this;
111 113 }
112 114
113 115 public IPromise On(Action handler, PromiseEventType events) {
114 116 AddHandler(new HandlerDescriptor(handler,events));
115 117 return this;
116 118 }
117 119
118 120 public IPromise<T> Cast<T>() {
119 121 throw new InvalidCastException();
120 122 }
121 123
122 124 public void Join() {
123 125 WaitResult(-1);
124 126 }
125 127
126 128 public void Join(int timeout) {
127 129 WaitResult(timeout);
128 130 }
129 131
130 132 protected void SetResult() {
131 133 BeginSetResult();
132 134 EndSetResult();
133 135 }
134 136 }
135 137 }
136 138
@@ -1,185 +1,202
1 1 using System;
2 2 using Implab.Parallels;
3 3
4 4 namespace Implab {
5 5 public abstract class AbstractPromise<T> : AbstractEvent<AbstractPromise<T>.HandlerDescriptor>, IPromise<T> {
6 6 public struct HandlerDescriptor {
7 7 readonly Action m_handler;
8 8 readonly Action<T> m_success;
9 9 readonly Action<Exception> m_error;
10 10 readonly Action<Exception> m_cancel;
11 11 readonly PromiseEventType m_mask;
12 12
13 13 public HandlerDescriptor(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
14 14 m_success = success;
15 15 m_error = error;
16 16 m_cancel = cancel;
17
18 m_handler = null;
19 m_mask = 0;
17 20 }
18 21
19 22 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
20 23 m_handler = success;
24 m_success = null;
21 25 m_error = error;
22 26 m_cancel = cancel;
23 27 m_mask = PromiseEventType.Success;
24 28 }
25 29
26 30 public HandlerDescriptor(Action handler, PromiseEventType mask) {
27 31 m_handler = handler;
28 32 m_mask = mask;
33 m_success = null;
34 m_error = null;
35 m_cancel = null;
29 36 }
30 37
31 38 public void SignalSuccess(T result) {
32 39 if (m_success != null) {
33 40 try {
34 41 m_success(result);
35 42 } catch(Exception err) {
36 43 SignalError(err);
37 44 }
38 } else if (m_mask & PromiseEventType.Success && m_handler != null) {
45 } else if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
39 46 try {
40 47 m_handler();
41 48 } catch(Exception err) {
42 49 // avoid calling handler twice in case of error
43 50 if (m_error != null)
44 51 SignalError(err);
45 52 }
46 53 }
47 54 }
48 55
49 56 public void SignalError(Exception err) {
50 57 if (m_error != null) {
51 58 try {
52 59 m_error(err);
53 60 // Analysis disable once EmptyGeneralCatchClause
54 61 } catch {
55 62 }
56 } else if (m_mask & PromiseEventType.Error && m_handler != null) {
63 } else if ((m_mask & PromiseEventType.Error) != 0 && m_handler != null) {
57 64 try {
58 65 m_handler();
59 66 // Analysis disable once EmptyGeneralCatchClause
60 67 } catch {
61 68 }
62 69 }
63 70 }
64 71
65 72 public void SignalCancel(Exception reason) {
66 73 if (m_cancel != null) {
67 74 try {
68 75 m_cancel(reason);
69 76 } catch (Exception err) {
70 77 SignalError(err);
71 78 }
72 } else if (m_mask & PromiseEventType.Cancelled && m_handler != null) {
79 } else if ((m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
73 80 try {
74 81 m_handler();
75 82 // Analysis disable once EmptyGeneralCatchClause
76 83 } catch {
77 84 }
78 85 }
79 86 }
80 87 }
81 88
82
83
84 89 public Type PromiseType {
85 90 get {
86 91 return typeof(T);
87 92 }
88 93 }
89 94
90 public new T Join() {
95 public T Join() {
91 96 WaitResult(-1);
92 97 return m_result;
93 98 }
94 public new T Join(int timeout) {
99 public T Join(int timeout) {
95 100 WaitResult(timeout);
96 101 return m_result;
97 102 }
98 103
104 void IPromise.Join() {
105 WaitResult(-1);
106 }
107 void IPromise.Join(int timeout) {
108 WaitResult(timeout);
109 }
110
99 111 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
100 112 AddHandler(new HandlerDescriptor(success, error, cancel));
101 113 return this;
102 114 }
103 115
104 116 public IPromise<T> On(Action<T> success, Action<Exception> error) {
105 117 AddHandler(new HandlerDescriptor(success, error, null));
106 118 return this;
107 119 }
108 120
109 121 public IPromise<T> On(Action<T> success) {
110 122 AddHandler(new HandlerDescriptor(success, null, null));
111 123 return this;
112 124 }
113 125
114 126 public IPromise<T> On(Action handler, PromiseEventType events) {
115 127 AddHandler(new HandlerDescriptor(handler, events));
116 128 return this;
117 129 }
118 130
119 131 public IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel) {
120 132 AddHandler(new HandlerDescriptor(success, error, cancel));
121 133 return this;
122 134 }
123 135
124 136 public IPromise<T> On(Action success, Action<Exception> error) {
125 137 AddHandler(new HandlerDescriptor(success, error, null));
126 138 return this;
127 139 }
128 140
129 141 public IPromise<T> On(Action success) {
130 142 AddHandler(new HandlerDescriptor(success, null, null));
131 143 return this;
132 144 }
133 145
134 146 IPromise IPromise.On(Action success, Action<Exception> error, Action<Exception> cancel) {
135 147 AddHandler(new HandlerDescriptor(success, error, cancel));
136 148 return this;
137 149 }
138 150
139 151 IPromise IPromise.On(Action success, Action<Exception> error) {
140 152 AddHandler(new HandlerDescriptor(success, error, null));
141 153 return this;
142 154 }
143 155
144 156 IPromise IPromise.On(Action success) {
145 157 AddHandler(new HandlerDescriptor(success, null, null));
146 158 return this;
147 159 }
148 160
161 IPromise IPromise.On(Action handler, PromiseEventType events) {
162 AddHandler(new HandlerDescriptor(handler, events));
163 return this;
164 }
165
149 166 public IPromise<T2> Cast<T2>() {
150 167 return (IPromise<T2>)this;
151 168 }
152 169
153 170 #region implemented abstract members of AbstractPromise
154 171
155 172 protected override Signal GetResolveSignal() {
156 173 var signal = new Signal();
157 174 AddHandler(new HandlerDescriptor(signal.Set, PromiseEventType.All));
158 175 return signal;
159 176 }
160 177
161 178 protected override void SignalSuccess(HandlerDescriptor handler) {
162 179 handler.SignalSuccess(m_result);
163 180 }
164 181
165 182 protected override void SignalError(HandlerDescriptor handler, Exception error) {
166 183 handler.SignalError(error);
167 184 }
168 185
169 186 protected override void SignalCancelled(HandlerDescriptor handler, Exception reason) {
170 187 handler.SignalCancel(reason);
171 188 }
172 189
173 190 #endregion
174 191
175 192 T m_result;
176 193
177 194 protected void SetResult(T value) {
178 195 if (BeginSetResult()) {
179 196 m_result = value;
180 197 EndSetResult();
181 198 }
182 199 }
183 200 }
184 201 }
185 202
@@ -1,24 +1,24
1 1 using System;
2 2
3 3 namespace Implab {
4 4 /// <summary>
5 5 /// Deferred result, usually used by asynchronous services as the service part of the promise.
6 6 /// </summary>
7 public interface IDeferred : ICancelationToken {
7 public interface IDeferred : ICancellationToken {
8 8
9 9 void Resolve();
10 10
11 11 /// <summary>
12 12 /// Reject the promise with the specified error.
13 13 /// </summary>
14 14 /// <param name="error">The reason why the promise is rejected.</param>
15 15 /// <remarks>
16 16 /// Some exceptions are treated in a special case:
17 17 /// <see cref="OperationCanceledException"/> is interpreted as call to <see cref="Cancel()"/> method,
18 18 /// and <see cref="PromiseTransientException"/> is always unwrapped and its
19 19 /// <see cref="PromiseTransientException.InnerException"> is used as the reason to reject promise.
20 20 /// </remarks>
21 21 void Reject(Exception error);
22 22 }
23 23 }
24 24
@@ -1,10 +1,10
1 1 using System;
2 2
3 3 namespace Implab {
4 public interface IDeferred<T> : ICancelationToken {
4 public interface IDeferred<in T> : ICancellationToken {
5 5 void Resolve(T value);
6 6
7 7 void Reject(Exception error);
8 8 }
9 9 }
10 10
@@ -1,14 +1,11
1 1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 2
6 3 namespace Implab {
7 4 public interface ITaskController: IProgressHandler, ICancellable {
8 5 bool IsCancelled {
9 6 get;
10 7 }
11 8
12 9 event EventHandler Cancelled;
13 10 }
14 11 }
@@ -1,240 +1,250
1 1 <?xml version="1.0" encoding="utf-8"?>
2 2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <PropertyGroup>
4 4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 6 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
7 7 <OutputType>Library</OutputType>
8 8 <RootNamespace>Implab</RootNamespace>
9 9 <AssemblyName>Implab</AssemblyName>
10 10 <ProductVersion>8.0.30703</ProductVersion>
11 11 <SchemaVersion>2.0</SchemaVersion>
12 12 </PropertyGroup>
13 13 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
14 14 <DebugSymbols>true</DebugSymbols>
15 15 <DebugType>full</DebugType>
16 16 <Optimize>false</Optimize>
17 17 <OutputPath>bin\Debug</OutputPath>
18 18 <DefineConstants>TRACE;DEBUG;</DefineConstants>
19 19 <ErrorReport>prompt</ErrorReport>
20 20 <WarningLevel>4</WarningLevel>
21 21 <ConsolePause>false</ConsolePause>
22 22 <RunCodeAnalysis>true</RunCodeAnalysis>
23 23 </PropertyGroup>
24 24 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
25 25 <DebugType>full</DebugType>
26 26 <Optimize>true</Optimize>
27 27 <OutputPath>bin\Release</OutputPath>
28 28 <ErrorReport>prompt</ErrorReport>
29 29 <WarningLevel>4</WarningLevel>
30 30 <ConsolePause>false</ConsolePause>
31 31 </PropertyGroup>
32 32 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug 4.5|AnyCPU' ">
33 33 <DebugSymbols>true</DebugSymbols>
34 34 <DebugType>full</DebugType>
35 35 <Optimize>false</Optimize>
36 36 <OutputPath>bin\Debug</OutputPath>
37 37 <DefineConstants>TRACE;DEBUG;NET_4_5</DefineConstants>
38 38 <ErrorReport>prompt</ErrorReport>
39 39 <WarningLevel>4</WarningLevel>
40 40 <RunCodeAnalysis>true</RunCodeAnalysis>
41 41 <ConsolePause>false</ConsolePause>
42 42 </PropertyGroup>
43 43 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release 4.5|AnyCPU' ">
44 44 <Optimize>true</Optimize>
45 45 <OutputPath>bin\Release</OutputPath>
46 46 <ErrorReport>prompt</ErrorReport>
47 47 <WarningLevel>4</WarningLevel>
48 48 <ConsolePause>false</ConsolePause>
49 49 <DefineConstants>NET_4_5</DefineConstants>
50 50 </PropertyGroup>
51 51 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'DebugMono|AnyCPU' ">
52 52 <DebugSymbols>true</DebugSymbols>
53 53 <DebugType>full</DebugType>
54 54 <Optimize>false</Optimize>
55 55 <OutputPath>bin\Debug</OutputPath>
56 56 <DefineConstants>TRACE;DEBUG;NET_4_5;MONO</DefineConstants>
57 57 <ErrorReport>prompt</ErrorReport>
58 58 <WarningLevel>4</WarningLevel>
59 59 <RunCodeAnalysis>true</RunCodeAnalysis>
60 60 <ConsolePause>false</ConsolePause>
61 61 </PropertyGroup>
62 62 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'ReleaseMono|AnyCPU' ">
63 63 <Optimize>true</Optimize>
64 64 <OutputPath>bin\Release</OutputPath>
65 65 <DefineConstants>NET_4_5;MONO;</DefineConstants>
66 66 <ErrorReport>prompt</ErrorReport>
67 67 <WarningLevel>4</WarningLevel>
68 68 <ConsolePause>false</ConsolePause>
69 69 </PropertyGroup>
70 70 <ItemGroup>
71 71 <Reference Include="System" />
72 72 <Reference Include="System.Xml" />
73 73 </ItemGroup>
74 74 <ItemGroup>
75 75 <Compile Include="Component.cs" />
76 76 <Compile Include="CustomEqualityComparer.cs" />
77 77 <Compile Include="Diagnostics\ConsoleTraceListener.cs" />
78 78 <Compile Include="Diagnostics\EventText.cs" />
79 79 <Compile Include="Diagnostics\LogChannel.cs" />
80 80 <Compile Include="Diagnostics\LogicalOperation.cs" />
81 81 <Compile Include="Diagnostics\TextFileListener.cs" />
82 82 <Compile Include="Diagnostics\TraceLog.cs" />
83 83 <Compile Include="Diagnostics\TraceEvent.cs" />
84 84 <Compile Include="Diagnostics\TraceEventType.cs" />
85 85 <Compile Include="Disposable.cs" />
86 86 <Compile Include="ICancellable.cs" />
87 87 <Compile Include="IProgressHandler.cs" />
88 88 <Compile Include="IProgressNotifier.cs" />
89 89 <Compile Include="IPromiseT.cs" />
90 90 <Compile Include="IPromise.cs" />
91 91 <Compile Include="IServiceLocator.cs" />
92 92 <Compile Include="ITaskController.cs" />
93 93 <Compile Include="JSON\JSONElementContext.cs" />
94 94 <Compile Include="JSON\JSONElementType.cs" />
95 95 <Compile Include="JSON\JSONGrammar.cs" />
96 96 <Compile Include="JSON\JSONParser.cs" />
97 97 <Compile Include="JSON\JSONScanner.cs" />
98 98 <Compile Include="JSON\JsonTokenType.cs" />
99 99 <Compile Include="JSON\JSONWriter.cs" />
100 100 <Compile Include="JSON\JSONXmlReader.cs" />
101 101 <Compile Include="JSON\JSONXmlReaderOptions.cs" />
102 102 <Compile Include="JSON\StringTranslator.cs" />
103 103 <Compile Include="Parallels\DispatchPool.cs" />
104 104 <Compile Include="Parallels\ArrayTraits.cs" />
105 105 <Compile Include="Parallels\MTQueue.cs" />
106 106 <Compile Include="Parallels\WorkerPool.cs" />
107 107 <Compile Include="Parsing\Alphabet.cs" />
108 108 <Compile Include="Parsing\AlphabetBase.cs" />
109 109 <Compile Include="Parsing\AltToken.cs" />
110 110 <Compile Include="Parsing\BinaryToken.cs" />
111 111 <Compile Include="Parsing\CatToken.cs" />
112 112 <Compile Include="Parsing\CDFADefinition.cs" />
113 113 <Compile Include="Parsing\DFABuilder.cs" />
114 114 <Compile Include="Parsing\DFADefinitionBase.cs" />
115 115 <Compile Include="Parsing\DFAStateDescriptor.cs" />
116 116 <Compile Include="Parsing\DFAutomaton.cs" />
117 117 <Compile Include="Parsing\EDFADefinition.cs" />
118 118 <Compile Include="Parsing\EmptyToken.cs" />
119 119 <Compile Include="Parsing\EndToken.cs" />
120 120 <Compile Include="Parsing\EnumAlphabet.cs" />
121 121 <Compile Include="Parsing\Grammar.cs" />
122 122 <Compile Include="Parsing\IAlphabet.cs" />
123 123 <Compile Include="Parsing\IDFADefinition.cs" />
124 124 <Compile Include="Parsing\IVisitor.cs" />
125 125 <Compile Include="Parsing\ParserException.cs" />
126 126 <Compile Include="Parsing\Scanner.cs" />
127 127 <Compile Include="Parsing\StarToken.cs" />
128 128 <Compile Include="Parsing\SymbolToken.cs" />
129 129 <Compile Include="Parsing\Token.cs" />
130 130 <Compile Include="ServiceLocator.cs" />
131 131 <Compile Include="TaskController.cs" />
132 132 <Compile Include="ProgressInitEventArgs.cs" />
133 133 <Compile Include="Properties\AssemblyInfo.cs" />
134 134 <Compile Include="Parallels\AsyncPool.cs" />
135 135 <Compile Include="Safe.cs" />
136 136 <Compile Include="ValueEventArgs.cs" />
137 137 <Compile Include="PromiseExtensions.cs" />
138 138 <Compile Include="SyncContextPromise.cs" />
139 139 <Compile Include="Diagnostics\OperationContext.cs" />
140 140 <Compile Include="Diagnostics\TraceContext.cs" />
141 141 <Compile Include="Diagnostics\LogEventArgs.cs" />
142 142 <Compile Include="Diagnostics\LogEventArgsT.cs" />
143 143 <Compile Include="Diagnostics\Extensions.cs" />
144 144 <Compile Include="IComponentContainer.cs" />
145 145 <Compile Include="PromiseEventType.cs" />
146 146 <Compile Include="ComponentContainer.cs" />
147 147 <Compile Include="DisposablePool.cs" />
148 148 <Compile Include="ObjectPool.cs" />
149 149 <Compile Include="Parallels\AsyncQueue.cs" />
150 150 <Compile Include="PromiseT.cs" />
151 151 <Compile Include="IDeferred.cs" />
152 152 <Compile Include="IDeferredT.cs" />
153 153 <Compile Include="Promise.cs" />
154 154 <Compile Include="PromiseTransientException.cs" />
155 155 <Compile Include="Parallels\Signal.cs" />
156 156 <Compile Include="Parallels\SharedLock.cs" />
157 157 <Compile Include="Diagnostics\ILogWriter.cs" />
158 158 <Compile Include="Diagnostics\ListenerBase.cs" />
159 159 <Compile Include="Parallels\BlockingQueue.cs" />
160 <Compile Include="ICancelationToken.cs" />
161 160 <Compile Include="AbstractEvent.cs" />
162 161 <Compile Include="AbstractPromise.cs" />
163 162 <Compile Include="AbstractPromiseT.cs" />
164 163 <Compile Include="FuncTask.cs" />
165 164 <Compile Include="FuncTaskBase.cs" />
166 165 <Compile Include="FuncTaskT.cs" />
167 <Compile Include="ChainTask.cs" />
166 <Compile Include="ActionChainTaskBase.cs" />
167 <Compile Include="ActionChainTask.cs" />
168 <Compile Include="ActionChainTaskT.cs" />
169 <Compile Include="FuncChainTaskBase.cs" />
170 <Compile Include="FuncChainTask.cs" />
171 <Compile Include="FuncChainTaskT.cs" />
172 <Compile Include="ActionTaskBase.cs" />
173 <Compile Include="ActionTask.cs" />
174 <Compile Include="ActionTaskT.cs" />
175 <Compile Include="ICancellationToken.cs" />
176 <Compile Include="SuccessPromise.cs" />
177 <Compile Include="SuccessPromiseT.cs" />
168 178 </ItemGroup>
169 179 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
170 180 <ItemGroup />
171 181 <ProjectExtensions>
172 182 <MonoDevelop>
173 183 <Properties>
174 184 <Policies>
175 185 <CSharpFormattingPolicy IndentSwitchBody="True" NamespaceBraceStyle="EndOfLine" ClassBraceStyle="EndOfLine" InterfaceBraceStyle="EndOfLine" StructBraceStyle="EndOfLine" EnumBraceStyle="EndOfLine" MethodBraceStyle="EndOfLine" ConstructorBraceStyle="EndOfLine" DestructorBraceStyle="EndOfLine" BeforeMethodDeclarationParentheses="False" BeforeMethodCallParentheses="False" BeforeConstructorDeclarationParentheses="False" NewLineBeforeConstructorInitializerColon="NewLine" NewLineAfterConstructorInitializerColon="SameLine" BeforeIndexerDeclarationBracket="False" BeforeDelegateDeclarationParentheses="False" NewParentheses="False" SpacesBeforeBrackets="False" inheritsSet="Mono" inheritsScope="text/x-csharp" scope="text/x-csharp" />
176 186 <TextStylePolicy FileWidth="120" EolMarker="Unix" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/x-csharp" />
177 187 <DotNetNamingPolicy DirectoryNamespaceAssociation="PrefixedHierarchical" ResourceNamePolicy="MSBuild" />
178 188 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="application/xml" />
179 189 <XmlFormattingPolicy inheritsSet="Mono" inheritsScope="application/xml" scope="application/xml" />
180 190 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/plain" />
181 191 <NameConventionPolicy>
182 192 <Rules>
183 193 <NamingRule Name="Namespaces" AffectedEntity="Namespace" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
184 194 <NamingRule Name="Types" AffectedEntity="Class, Struct, Enum, Delegate" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
185 195 <NamingRule Name="Interfaces" AffectedEntity="Interface" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
186 196 <RequiredPrefixes>
187 197 <String>I</String>
188 198 </RequiredPrefixes>
189 199 </NamingRule>
190 200 <NamingRule Name="Attributes" AffectedEntity="CustomAttributes" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
191 201 <RequiredSuffixes>
192 202 <String>Attribute</String>
193 203 </RequiredSuffixes>
194 204 </NamingRule>
195 205 <NamingRule Name="Event Arguments" AffectedEntity="CustomEventArgs" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
196 206 <RequiredSuffixes>
197 207 <String>EventArgs</String>
198 208 </RequiredSuffixes>
199 209 </NamingRule>
200 210 <NamingRule Name="Exceptions" AffectedEntity="CustomExceptions" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
201 211 <RequiredSuffixes>
202 212 <String>Exception</String>
203 213 </RequiredSuffixes>
204 214 </NamingRule>
205 215 <NamingRule Name="Methods" AffectedEntity="Methods" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
206 216 <NamingRule Name="Static Readonly Fields" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Protected, Public" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True" />
207 217 <NamingRule Name="Fields (Non Private)" AffectedEntity="Field" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
208 218 <NamingRule Name="ReadOnly Fields (Non Private)" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False" />
209 219 <NamingRule Name="Fields (Private)" AffectedEntity="Field, ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
210 220 <RequiredPrefixes>
211 221 <String>m_</String>
212 222 </RequiredPrefixes>
213 223 </NamingRule>
214 224 <NamingRule Name="Static Fields (Private)" AffectedEntity="Field" VisibilityMask="Private" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True">
215 225 <RequiredPrefixes>
216 226 <String>_</String>
217 227 </RequiredPrefixes>
218 228 </NamingRule>
219 229 <NamingRule Name="ReadOnly Fields (Private)" AffectedEntity="ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
220 230 <RequiredPrefixes>
221 231 <String>m_</String>
222 232 </RequiredPrefixes>
223 233 </NamingRule>
224 234 <NamingRule Name="Constant Fields" AffectedEntity="ConstantField" VisibilityMask="VisibilityMask" NamingStyle="AllUpper" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
225 235 <NamingRule Name="Properties" AffectedEntity="Property" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
226 236 <NamingRule Name="Events" AffectedEntity="Event" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
227 237 <NamingRule Name="Enum Members" AffectedEntity="EnumMember" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
228 238 <NamingRule Name="Parameters" AffectedEntity="Parameter, LocalVariable" VisibilityMask="VisibilityMask" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
229 239 <NamingRule Name="Type Parameters" AffectedEntity="TypeParameter" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
230 240 <RequiredPrefixes>
231 241 <String>T</String>
232 242 </RequiredPrefixes>
233 243 </NamingRule>
234 244 </Rules>
235 245 </NameConventionPolicy>
236 246 </Policies>
237 247 </Properties>
238 248 </MonoDevelop>
239 249 </ProjectExtensions>
240 250 </Project> No newline at end of file
@@ -1,207 +1,207
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Diagnostics;
4 4 using System.Threading;
5 5
6 6 namespace Implab.Parallels {
7 7 public static class ArrayTraits {
8 8 class ArrayIterator<TSrc> : DispatchPool<int> {
9 9 readonly Action<TSrc> m_action;
10 10 readonly TSrc[] m_source;
11 11 readonly Promise<int> m_promise = new Promise<int>();
12 12 readonly LogicalOperation m_logicalOperation;
13 13
14 14 int m_pending;
15 15 int m_next;
16 16
17 17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
18 18 : base(threads) {
19 19
20 20 Debug.Assert(source != null);
21 21 Debug.Assert(action != null);
22 22
23 23 m_logicalOperation = TraceContext.Instance.CurrentOperation;
24 24 m_next = 0;
25 25 m_source = source;
26 26 m_pending = source.Length;
27 27 m_action = action;
28 28
29 29 m_promise.On(Dispose, PromiseEventType.All);
30 30
31 31 InitPool();
32 32 }
33 33
34 34 public Promise<int> Promise {
35 35 get {
36 36 return m_promise;
37 37 }
38 38 }
39 39
40 40 protected override void Worker() {
41 41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
42 42 try {
43 43 base.Worker();
44 44 } finally {
45 45 TraceContext.Instance.Leave();
46 46 }
47 47 }
48 48
49 49 protected override bool TryDequeue(out int unit) {
50 50 unit = Interlocked.Increment(ref m_next) - 1;
51 51 return unit < m_source.Length;
52 52 }
53 53
54 54 protected override void InvokeUnit(int unit) {
55 55 try {
56 56 m_action(m_source[unit]);
57 57 var pending = Interlocked.Decrement(ref m_pending);
58 58 if (pending == 0)
59 59 m_promise.Resolve(m_source.Length);
60 60 } catch (Exception e) {
61 61 m_promise.Reject(e);
62 62 }
63 63 }
64 64 }
65 65
66 66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
67 67 readonly Func<TSrc, TDst> m_transform;
68 68 readonly TSrc[] m_source;
69 69 readonly TDst[] m_dest;
70 70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
71 71 readonly LogicalOperation m_logicalOperation;
72 72
73 73 int m_pending;
74 74 int m_next;
75 75
76 76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
77 77 : base(threads) {
78 78
79 79 Debug.Assert (source != null);
80 80 Debug.Assert( transform != null);
81 81
82 82 m_next = 0;
83 83 m_source = source;
84 84 m_dest = new TDst[source.Length];
85 85 m_pending = source.Length;
86 86 m_transform = transform;
87 87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
88 88
89 89 m_promise.On(Dispose, PromiseEventType.All);
90 90
91 91 InitPool();
92 92 }
93 93
94 94 public Promise<TDst[]> Promise {
95 95 get {
96 96 return m_promise;
97 97 }
98 98 }
99 99
100 100 protected override void Worker() {
101 101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
102 102 try {
103 103 base.Worker();
104 104 } finally {
105 105 TraceContext.Instance.Leave();
106 106 }
107 107 }
108 108
109 109 protected override bool TryDequeue(out int unit) {
110 110 unit = Interlocked.Increment(ref m_next) - 1;
111 111 return unit < m_source.Length;
112 112 }
113 113
114 114 protected override void InvokeUnit(int unit) {
115 115 try {
116 116 m_dest[unit] = m_transform(m_source[unit]);
117 117 var pending = Interlocked.Decrement(ref m_pending);
118 118 if (pending == 0)
119 119 m_promise.Resolve(m_dest);
120 120 } catch (Exception e) {
121 121 m_promise.Reject(e);
122 122 }
123 123 }
124 124 }
125 125
126 126 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
127 127 if (source == null)
128 128 throw new ArgumentNullException("source");
129 129 if (transform == null)
130 130 throw new ArgumentNullException("transform");
131 131
132 132 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
133 133 return mapper.Promise;
134 134 }
135 135
136 136 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
137 137 if (source == null)
138 138 throw new ArgumentNullException("source");
139 139 if (action == null)
140 140 throw new ArgumentNullException("action");
141 141
142 142 var iter = new ArrayIterator<TSrc>(source, action, threads);
143 143 return iter.Promise;
144 144 }
145 145
146 146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
147 147 if (source == null)
148 148 throw new ArgumentNullException("source");
149 149 if (transform == null)
150 150 throw new ArgumentNullException("transform");
151 151 if (threads <= 0)
152 152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
153 153
154 154 if (source.Length == 0)
155 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
155 return Promise<TDst[]>.FromResult(new TDst[0]);
156 156
157 157 var promise = new Promise<TDst[]>();
158 158 var res = new TDst[source.Length];
159 159 var pending = source.Length;
160 160
161 161 object locker = new object();
162 162 int slots = threads;
163 163
164 164 // Analysis disable AccessToDisposedClosure
165 165 AsyncPool.RunThread<int>(() => {
166 166 for (int i = 0; i < source.Length; i++) {
167 167 if(promise.IsResolved)
168 168 break; // stop processing in case of error or cancellation
169 169 var idx = i;
170 170
171 171 if (Interlocked.Decrement(ref slots) < 0) {
172 172 lock(locker) {
173 173 while(slots < 0)
174 174 Monitor.Wait(locker);
175 175 }
176 176 }
177 177
178 178 try {
179 179 transform(source[i])
180 180 .On( x => {
181 181 Interlocked.Increment(ref slots);
182 182 lock (locker) {
183 183 Monitor.Pulse(locker);
184 184 }
185 185 })
186 186 .On(
187 187 x => {
188 188 res[idx] = x;
189 189 var left = Interlocked.Decrement(ref pending);
190 190 if (left == 0)
191 191 promise.Resolve(res);
192 192 },
193 193 promise.Reject
194 194 );
195 195
196 196 } catch (Exception e) {
197 197 promise.Reject(e);
198 198 }
199 199 }
200 200 return 0;
201 201 });
202 202
203 203 return promise;
204 204 }
205 205
206 206 }
207 207 }
@@ -1,86 +1,155
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Threading;
4 4 using System.Linq;
5 5
6 6 namespace Implab.Parallels {
7 7 /// <summary>
8 8 /// Класс для распаралеливания задач.
9 9 /// </summary>
10 10 /// <remarks>
11 11 /// Используя данный класс и лямда выражения можно распараллелить
12 12 /// вычисления, для этого используется концепция обещаний.
13 13 /// </remarks>
14 14 public static class AsyncPool {
15 15
16 16 public static IPromise<T> Invoke<T>(Func<T> func) {
17 17 var p = new Promise<T>();
18 18 var caller = TraceContext.Instance.CurrentOperation;
19 19
20 20 ThreadPool.QueueUserWorkItem(param => {
21 21 TraceContext.Instance.EnterLogicalOperation(caller,false);
22 22 try {
23 23 p.Resolve(func());
24 24 } catch(Exception e) {
25 25 p.Reject(e);
26 26 } finally {
27 27 TraceContext.Instance.Leave();
28 28 }
29 29 });
30 30
31 31 return p;
32 32 }
33 33
34 public static IPromise<T> Invoke<T>(Func<ICancellationToken, T> func) {
35 var p = new Promise<T>();
36 var caller = TraceContext.Instance.CurrentOperation;
37
38 ThreadPool.QueueUserWorkItem(param => {
39 TraceContext.Instance.EnterLogicalOperation(caller,false);
40 try {
41 p.Resolve(func(p));
42 } catch(Exception e) {
43 p.Reject(e);
44 } finally {
45 TraceContext.Instance.Leave();
46 }
47 });
48
49 return p;
50 }
51
34 52 public static IPromise<T> RunThread<T>(Func<T> func) {
35 53 var p = new Promise<T>();
36 54
37 55 var caller = TraceContext.Instance.CurrentOperation;
38 56
39 57 var worker = new Thread(() => {
40 58 TraceContext.Instance.EnterLogicalOperation(caller,false);
41 59 try {
42 60 p.Resolve(func());
43 61 } catch (Exception e) {
44 62 p.Reject(e);
45 63 } finally {
46 64 TraceContext.Instance.Leave();
47 65 }
48 66 });
49 67 worker.IsBackground = true;
50 68 worker.Start();
51 69
52 70 return p;
53 71 }
54 72
73 public static IPromise<T> RunThread<T>(Func<ICancellationToken, T> func) {
74 var p = new Promise<T>();
75
76 var caller = TraceContext.Instance.CurrentOperation;
77
78 var worker = new Thread(() => {
79 TraceContext.Instance.EnterLogicalOperation(caller,false);
80 try {
81 p.Resolve(func(p));
82 } catch (Exception e) {
83 p.Reject(e);
84 } finally {
85 TraceContext.Instance.Leave();
86 }
87 });
88 worker.IsBackground = true;
89 worker.Start();
90
91 return p;
92 }
93
55 94
56 95 public static IPromise RunThread(Action func) {
57 96 var p = new Promise();
58 97
59 98 var caller = TraceContext.Instance.CurrentOperation;
60 99
61 100 var worker = new Thread(() => {
62 101 TraceContext.Instance.EnterLogicalOperation(caller,false);
63 102 try {
64 103 func();
65 104 p.Resolve();
66 105 } catch (Exception e) {
67 106 p.Reject(e);
68 107 } finally {
69 108 TraceContext.Instance.Leave();
70 109 }
71 110 });
72 111 worker.IsBackground = true;
73 112 worker.Start();
74 113
75 114 return p;
76 115 }
77 116
117 public static IPromise RunThread(Action<ICancellationToken> func) {
118 var p = new Promise();
119
120 var caller = TraceContext.Instance.CurrentOperation;
121
122 var worker = new Thread(() => {
123 TraceContext.Instance.EnterLogicalOperation(caller,false);
124 try {
125 func(p);
126 p.Resolve();
127 } catch (Exception e) {
128 p.Reject(e);
129 } finally {
130 TraceContext.Instance.Leave();
131 }
132 });
133 worker.IsBackground = true;
134 worker.Start();
135
136 return p;
137 }
138
78 139 public static IPromise[] RunThread(params Action[] func) {
79 140 return func.Select(f => RunThread(f)).ToArray();
80 141 }
81 142
143 public static IPromise[] RunThread(params Action<ICancellationToken>[] func) {
144 return func.Select(f => RunThread(f)).ToArray();
145 }
146
82 147 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
83 148 return func.Select(f => RunThread(f)).ToArray();
84 149 }
150
151 public static IPromise<T>[] RunThread<T>(params Func<ICancellationToken, T>[] func) {
152 return func.Select(f => RunThread(f)).ToArray();
153 }
85 154 }
86 155 }
@@ -1,192 +1,302
1 1 using System.Threading;
2 2 using System;
3 3 using Implab.Diagnostics;
4 4 using System.Collections.Generic;
5 5
6 6
7 7 #if NET_4_5
8 8 using System.Threading.Tasks;
9 9 #endif
10 10
11 11 namespace Implab {
12 12 public static class PromiseExtensions {
13 13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
14 14 Safe.ArgumentNotNull(that, "that");
15 15 var context = SynchronizationContext.Current;
16 16 if (context == null)
17 17 return that;
18 18
19 19 var p = new SyncContextPromise<T>(context);
20 20 p.On(that.Cancel, PromiseEventType.Cancelled);
21 21
22 22 that.On(
23 23 p.Resolve,
24 24 p.Reject,
25 25 p.Cancel
26 26 );
27 27 return p;
28 28 }
29 29
30 30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
31 31 Safe.ArgumentNotNull(that, "that");
32 32 Safe.ArgumentNotNull(context, "context");
33 33
34 34 var p = new SyncContextPromise<T>(context);
35 35 p.On(that.Cancel, PromiseEventType.Cancelled);
36 36
37 37
38 38 that.On(
39 39 p.Resolve,
40 40 p.Reject,
41 41 p.Cancel
42 42 );
43 43 return p;
44 44 }
45 45
46 46 /// <summary>
47 47 /// Ensures the dispatched.
48 48 /// </summary>
49 49 /// <returns>The dispatched.</returns>
50 50 /// <param name="that">That.</param>
51 51 /// <param name="head">Head.</param>
52 52 /// <param name="cleanup">Cleanup.</param>
53 53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
54 54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
55 55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
56 56 Safe.ArgumentNotNull(that, "that");
57 57 Safe.ArgumentNotNull(head, "head");
58 58
59 59 that.On(() => head.On(cleanup), PromiseEventType.Cancelled);
60 60
61 61 return that;
62 62 }
63 63
64 64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
65 65 Safe.ArgumentNotNull(that, "that");
66 66 Safe.ArgumentNotNull(callback, "callback");
67 67 var op = TraceContext.Instance.CurrentOperation;
68 68 return ar => {
69 69 TraceContext.Instance.EnterLogicalOperation(op,false);
70 70 try {
71 71 that.Resolve(callback(ar));
72 72 } catch (Exception err) {
73 73 that.Reject(err);
74 74 } finally {
75 75 TraceContext.Instance.Leave();
76 76 }
77 77 };
78 78 }
79 79
80 80 static void CancelCallback(object cookie) {
81 81 ((ICancellable)cookie).Cancel();
82 82 }
83 83
84 84 /// <summary>
85 85 /// Cancells promise after the specified timeout is elapsed.
86 86 /// </summary>
87 87 /// <param name="that">The promise to cancel on timeout.</param>
88 88 /// <param name="milliseconds">The timeout in milliseconds.</param>
89 89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
90 90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
91 91 Safe.ArgumentNotNull(that, "that");
92 92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
93 93 that.On(timer.Dispose, PromiseEventType.All);
94 94 return that;
95 95 }
96 96
97 97 public static IPromise Bundle(this ICollection<IPromise> that) {
98 98 Safe.ArgumentNotNull(that, "that");
99 99
100 100 int count = that.Count;
101 101 int errors = 0;
102 102 var medium = new Promise();
103 103
104 104 if (count == 0) {
105 105 medium.Resolve();
106 106 return medium;
107 107 }
108 108
109 109 medium.On(() => {
110 110 foreach(var p2 in that)
111 111 p2.Cancel();
112 112 }, PromiseEventType.ErrorOrCancel);
113 113
114 114 foreach (var p in that)
115 115 p.On(
116 116 () => {
117 117 if (Interlocked.Decrement(ref count) == 0)
118 118 medium.Resolve();
119 119 },
120 120 error => {
121 121 if (Interlocked.Increment(ref errors) == 1)
122 122 medium.Reject(
123 123 new Exception("The dependency promise is failed", error)
124 124 );
125 125 },
126 126 reason => {
127 127 if (Interlocked.Increment(ref errors) == 1)
128 128 medium.Cancel(
129 129 new Exception("The dependency promise is cancelled")
130 130 );
131 131 }
132 132 );
133 133
134 134 return medium;
135 135 }
136 136
137 137 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
138 138 Safe.ArgumentNotNull(that, "that");
139 139
140 140 int count = that.Count;
141 141 int errors = 0;
142 142 var medium = new Promise<T[]>();
143 143 var results = new T[that.Count];
144 144
145 145 medium.On(() => {
146 146 foreach(var p2 in that)
147 147 p2.Cancel();
148 148 }, PromiseEventType.ErrorOrCancel);
149 149
150 150 int i = 0;
151 151 foreach (var p in that) {
152 152 var idx = i;
153 153 p.On(
154 154 x => {
155 155 results[idx] = x;
156 156 if (Interlocked.Decrement(ref count) == 0)
157 157 medium.Resolve(results);
158 158 },
159 159 error => {
160 160 if (Interlocked.Increment(ref errors) == 1)
161 161 medium.Reject(
162 162 new Exception("The dependency promise is failed", error)
163 163 );
164 164 },
165 165 reason => {
166 166 if (Interlocked.Increment(ref errors) == 1)
167 167 medium.Cancel(
168 168 new Exception("The dependency promise is cancelled", reason)
169 169 );
170 170 }
171 171 );
172 172 i++;
173 173 }
174 174
175 175 return medium;
176 176 }
177
178 public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
179 Safe.ArgumentNotNull(that, "that");
180
181 var d = new ActionTask(success, error, cancel);
182 that.On(d.Resolve, d.Reject, d.CancelOperation);
183 if (success != null)
184 d.CancellationRequested(that.Cancel);
185 return d;
186 }
187
188 public static IPromise Then(this IPromise that, Action success, Action<Exception> error) {
189 return Then(that, success, error, null);
190 }
191
192 public static IPromise Then(this IPromise that, Action success) {
193 return Then(that, success, null, null);
194 }
195
196 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
197 Safe.ArgumentNotNull(that, "that");
198
199 var d = new FuncTask<T>(success, error, cancel);
200 that.On(d.Resolve, d.Reject, d.CancelOperation);
201 if (success != null)
202 d.CancellationRequested(that.Cancel);
203 return d;
204 }
205
206 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error) {
207 return Then(that, success, error, null);
208 }
209
210 public static IPromise<T> Then<T>(this IPromise that, Func<T> success) {
211 return Then(that, success, null, null);
212 }
213
214 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
215 Safe.ArgumentNotNull(that, "that");
216 var d = new FuncTask<T,T2>(success, error, cancel);
217 that.On(d.Resolve, d.Reject, d.CancelOperation);
218 if (success != null)
219 d.CancellationRequested(that.Cancel);
220 return d;
221 }
222
223 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error) {
224 return Then(that, success, error, null);
225 }
226
227 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success) {
228 return Then(that, success, null, null);
229 }
230
231 #region chain traits
232 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error, Func<Exception,IPromise> cancel) {
233 Safe.ArgumentNotNull(that, "that");
234
235 var d = new ActionChainTask(success, error, cancel);
236 that.On(d.Resolve, d.Reject, d.CancelOperation);
237 if (success != null)
238 d.CancellationRequested(that.Cancel);
239 return d;
240 }
241
242 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error) {
243 return Chain(that, success, error, null);
244 }
245
246 public static IPromise Chain(this IPromise that, Func<IPromise> success) {
247 return Chain(that, success, null, null);
248 }
249
250 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
251 Safe.ArgumentNotNull(that, "that");
252
253 var d = new FuncChainTask<T>(success, error, cancel);
254 that.On(d.Resolve, d.Reject, d.CancelOperation);
255 if (success != null)
256 d.CancellationRequested(that.Cancel);
257 return d;
258 }
259
260 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error) {
261 return Chain(that, success, error, null);
262 }
263
264 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success) {
265 return Chain(that, success, null, null);
266 }
267
268 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
269 Safe.ArgumentNotNull(that, "that");
270 var d = new FuncChainTask<T,T2>(success, error, cancel);
271 that.On(d.Resolve, d.Reject, d.CancelOperation);
272 if (success != null)
273 d.CancellationRequested(that.Cancel);
274 return d;
275 }
276
277 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error) {
278 return Chain(that, success, error, null);
279 }
280
281 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success) {
282 return Chain(that, success, null, null);
283 }
284
285 #endregion
286
177 287
178 288 #if NET_4_5
179 289
180 290 public static Task<T> GetTask<T>(this IPromise<T> that) {
181 291 Safe.ArgumentNotNull(that, "that");
182 292 var tcs = new TaskCompletionSource<T>();
183 293
184 294 that.On(tcs.SetResult, tcs.SetException, r => tcs.SetCanceled());
185 295
186 296 return tcs.Task;
187 297 }
188 298
189 299 #endif
190 300 }
191 301 }
192 302
@@ -1,118 +1,118
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Text.RegularExpressions;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab
9 9 {
10 10 public static class Safe
11 11 {
12 12 public static void ArgumentMatch(string value, string paramName, Regex rx) {
13 13 if (rx == null)
14 14 throw new ArgumentNullException("rx");
15 15 if (!rx.IsMatch(value))
16 16 throw new ArgumentException(String.Format("The prameter value must match {0}", rx), paramName);
17 17 }
18 18
19 19 public static void ArgumentNotEmpty(string value, string paramName) {
20 20 if (String.IsNullOrEmpty(value))
21 21 throw new ArgumentException("The parameter can't be empty", paramName);
22 22 }
23 23
24 24 public static void ArgumentNotEmpty<T>(T[] value, string paramName) {
25 25 if (value == null || value.Length == 0)
26 26 throw new ArgumentException("The array must be not emty", paramName);
27 27 }
28 28
29 29 public static void ArgumentNotNull(object value, string paramName) {
30 30 if (value == null)
31 31 throw new ArgumentNullException(paramName);
32 32 }
33 33
34 34 public static void ArgumentInRange(int value, int min, int max, string paramName) {
35 35 if (value < min || value > max)
36 36 throw new ArgumentOutOfRangeException(paramName);
37 37 }
38 38
39 39 public static void Dispose(params IDisposable[] objects) {
40 40 foreach (var d in objects)
41 41 if (d != null)
42 42 d.Dispose();
43 43 }
44 44
45 45 public static void Dispose(params object[] objects) {
46 46 foreach (var obj in objects) {
47 47 var d = obj as IDisposable;
48 48 if (d != null)
49 49 d.Dispose();
50 50 }
51 51 }
52 52
53 53 public static void Dispose(object obj) {
54 54 var d = obj as IDisposable;
55 55 if (d != null)
56 56 d.Dispose();
57 57 }
58 58
59 59 [DebuggerStepThrough]
60 60 public static IPromise<T> WrapPromise<T>(Func<T> action) {
61 61 ArgumentNotNull(action, "action");
62 62
63 63 var p = new Promise<T>();
64 64 try {
65 65 p.Resolve(action());
66 66 } catch (Exception err) {
67 67 p.Reject(err);
68 68 }
69 69
70 70 return p;
71 71 }
72 72
73 73 [DebuggerStepThrough]
74 74 public static IPromise WrapPromise(Action action) {
75 75 ArgumentNotNull(action, "action");
76 76
77 77 var p = new Promise();
78 78 try {
79 79 action();
80 80 p.Resolve();
81 81 } catch (Exception err) {
82 82 p.Reject(err);
83 83 }
84 84
85 85 return p;
86 86 }
87 87
88 88 [DebuggerStepThrough]
89 89 public static IPromise InvokePromise(Func<IPromise> action) {
90 90 ArgumentNotNull(action, "action");
91 91
92 92 try {
93 93 var p = action();
94 94 if (p == null) {
95 95 var d = new Promise();
96 96 d.Reject(new Exception("The action returned null"));
97 97 p = d;
98 98 }
99 99 return p;
100 100 } catch (Exception err) {
101 101 var p = new Promise();
102 102 p.Reject(err);
103 103 return p;
104 104 }
105 105 }
106 106
107 107 [DebuggerStepThrough]
108 108 public static IPromise<T> InvokePromise<T>(Func<IPromise<T>> action) {
109 109 ArgumentNotNull(action, "action");
110 110
111 111 try {
112 return action() ?? Promise<T>.ExceptionToPromise(new Exception("The action returned null"));
112 return action() ?? Promise<T>.FromException(new Exception("The action returned null"));
113 113 } catch (Exception err) {
114 return Promise<T>.ExceptionToPromise(err);
114 return Promise<T>.FromException(err);
115 115 }
116 116 }
117 117 }
118 118 }
@@ -1,26 +1,26
1 1 using System.Threading;
2 2 using System;
3 3
4 4 namespace Implab {
5 5 public class SyncContextPromise<T> : Promise<T> {
6 6 readonly SynchronizationContext m_context;
7 7
8 8 public SyncContextPromise(SynchronizationContext context) {
9 9 Safe.ArgumentNotNull(context, "context");
10 10 m_context = context;
11 11 }
12 12
13 protected override void SignalSuccess(IDeferred<T> handler) {
13 protected override void SignalSuccess(Promise<T>.HandlerDescriptor handler) {
14 14 m_context.Post(x => base.SignalSuccess(handler), null);
15 15 }
16 16
17 protected override void SignalError(IDeferred<T> handler, Exception error) {
17 protected override void SignalError(Promise<T>.HandlerDescriptor handler, Exception error) {
18 18 m_context.Post(x => base.SignalError(handler, error), null);
19 19 }
20 20
21 protected override void SignalCancelled(IDeferred<T> handler, Exception reason) {
21 protected override void SignalCancelled(Promise<T>.HandlerDescriptor handler, Exception reason) {
22 22 m_context.Post(x => base.SignalCancelled(handler, reason), null);
23 23 }
24 24 }
25 25 }
26 26
@@ -1,93 +1,40
1 1 using System;
2 2 using Implab.Diagnostics;
3 3 using Implab.Parallels;
4 4 using Implab;
5 5 using System.Collections.Generic;
6 6 using System.Collections.Concurrent;
7 7 using System.Threading;
8 8
9 9 namespace MonoPlay {
10 10 class MainClass {
11
12
11 13 public static void Main(string[] args) {
12 14 if (args == null)
13 15 throw new ArgumentNullException("args");
14 16
15 17 var t1 = Environment.TickCount;
16 18
17 const int reads = 100000;
18 const int writes = 1000;
19 const int readThreads = 8;
20 const int writeThreads = 0;
21
22 var l = new SharedLock();
23 var st = new HashSet<int>();
24
25 Action reader1 = () => {
26 for (int i =0; i < reads; i++) {
27 try {
28 l.LockShared();
29 st.Contains(i % 1000);
30 Thread.Sleep(0);
31 } finally {
32 l.Release();
33 }
34 }
35 };
36
37 Action reader2 = () => {
38 for(var i = 0; i < reads; i++)
39 lock(st) {
40 st.Contains(i % 1000);
41 Thread.Sleep(0);
42 }
43 };
44
45 Action writer1 = () => {
46 var rnd = new Random(Environment.TickCount);
47 for (int i = 0; i < writes; i++) {
48 try {
49 l.LockExclusive();
50 st.Add(rnd.Next(1000));
51 //Thread.Sleep(1);
52 } finally {
53 l.Release();
54 }
55 }
56 };
57
58 Action writer2 = () => {
59 var rnd = new Random(Environment.TickCount);
60 for (int i = 0; i < writes; i++) {
61 lock (st) {
62 st.Add(rnd.Next(1000));
63 //Thread.Sleep(1);
64 }
65 }
66 };
67
68
69
70 var readers = new IPromise[readThreads];
71 for (int i = 0; i < readThreads; i++)
72 readers[i] = AsyncPool.RunThread(reader2);
73
74 var writers = new IPromise[writeThreads];
75 for (int i = 0; i < writeThreads; i++)
76 writers[i] = AsyncPool.RunThread(writer1);
77
78
79 new [] {
80 readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)),
81 writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1))
82 }.Bundle().Join();
83
84
19 for (int i = 0; i < 10000000; i++) {
20
21 var p = new Promise<int>();
22 p.On(HandleResult);
23 p.Resolve(i);
24 }
85 25
86 26 var t2 = Environment.TickCount;
87 27 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
88 28
89 29 }
90 30
31 static void HandleAction ()
32 {
33
34 }
91 35
36 static void HandleResult(int x) {
37
38 }
92 39 }
93 40 }
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now