##// END OF EJS Templates
Working on promises
cin -
r242:cbe10ac0731e v3
parent child
Show More
@@ -1,204 +1,204
1 using Implab.Components;
1 using Implab.Components;
2 using Implab.Diagnostics;
2 using Implab.Diagnostics;
3 using Implab.Parallels;
3 using Implab.Parallels;
4 using System;
4 using System;
5 using System.Collections.Generic;
5 using System.Collections.Generic;
6 using System.Linq;
6 using System.Linq;
7 using System.Text;
7 using System.Text;
8 using System.Threading;
8 using System.Threading;
9 using System.Threading.Tasks;
9 using System.Threading.Tasks;
10 using System.Windows.Forms;
10 using System.Windows.Forms;
11
11
12 namespace Implab.Fx {
12 namespace Implab.Fx {
13 public class StaApartment : RunnableComponent {
13 public class StaApartment : RunnableComponent {
14 readonly Thread m_worker;
14 readonly Thread m_worker;
15 SynchronizationContext m_syncContext;
15 SynchronizationContext m_syncContext;
16 SyncContextPromise m_enterPromise;
16 SyncContextPromise m_enterPromise;
17
17
18 readonly Promise m_threadStarted;
18 readonly Promise m_threadStarted;
19 readonly Promise m_threadTerminated;
19 readonly Promise m_threadTerminated;
20
20
21 public StaApartment() : base(true) {
21 public StaApartment() : base(true) {
22 m_threadStarted = new Promise();
22 m_threadStarted = new Promise();
23 m_threadTerminated = new Promise();
23 m_threadTerminated = new Promise();
24
24
25 m_worker = new Thread(WorkerEntry);
25 m_worker = new Thread(WorkerEntry);
26 m_worker.SetApartmentState(ApartmentState.STA);
26 m_worker.SetApartmentState(ApartmentState.STA);
27 m_worker.IsBackground = true;
27 m_worker.IsBackground = true;
28 m_worker.Name = "STA managed aparment";
28 m_worker.Name = "STA managed aparment";
29 }
29 }
30
30
31 public SynchronizationContext SyncContext {
31 public SynchronizationContext SyncContext {
32 get {
32 get {
33 if (m_syncContext == null)
33 if (m_syncContext == null)
34 throw new InvalidOperationException();
34 throw new InvalidOperationException();
35 return m_syncContext;
35 return m_syncContext;
36 }
36 }
37 }
37 }
38
38
39 /// <summary>
39 /// <summary>
40 /// Returns the promise which will dispatch all handlers inside the apartment using it's <see cref="SynchronizationContext"/>
40 /// Returns the promise which will dispatch all handlers inside the apartment using it's <see cref="SynchronizationContext"/>
41 /// </summary>
41 /// </summary>
42 /// <remarks>
42 /// <remarks>
43 /// Current implementation is optimized and will lost aync operation stack
43 /// Current implementation is optimized and will lost aync operation stack
44 /// </remarks>
44 /// </remarks>
45 /// <returns>The promise</returns>
45 /// <returns>The promise</returns>
46 public IPromise Enter() {
46 public IPromise Enter() {
47 if (m_enterPromise == null)
47 if (m_enterPromise == null)
48 throw new InvalidOperationException();
48 throw new InvalidOperationException();
49 return m_enterPromise;
49 return m_enterPromise;
50 }
50 }
51
51
52 public IPromise Invoke(Action<ICancellationToken> action) {
52 public IPromise Invoke(Action<ICancellationToken> action) {
53 Safe.ArgumentNotNull(action, "action");
53 Safe.ArgumentNotNull(action, "action");
54
54
55 if (m_syncContext == null)
55 if (m_syncContext == null)
56 throw new InvalidOperationException();
56 throw new InvalidOperationException();
57 var p = new Promise();
57 var p = new Promise();
58 var lop = TraceContext.Instance.CurrentOperation;
58 var lop = TraceContext.Instance.CurrentOperation;
59
59
60 m_syncContext.Post(x => {
60 m_syncContext.Post(x => {
61 TraceContext.Instance.EnterLogicalOperation(lop, false);
61 TraceContext.Instance.EnterLogicalOperation(lop, false);
62 try {
62 try {
63 if (p.CancelOperationIfRequested())
63 if (p.CancelOperationIfRequested())
64 return;
64 return;
65
65
66 action(p);
66 action(p);
67 p.Resolve();
67 p.Resolve();
68 } catch (Exception e) {
68 } catch (Exception e) {
69 p.Reject(e);
69 p.Reject(e);
70 } finally {
70 } finally {
71 TraceContext.Instance.Leave();
71 TraceContext.Instance.Leave();
72 }
72 }
73 }, null);
73 }, null);
74
74
75 return p;
75 return p;
76 }
76 }
77
77
78 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> action) {
78 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> action) {
79 Safe.ArgumentNotNull(action, "action");
79 Safe.ArgumentNotNull(action, "action");
80
80
81 if (m_syncContext == null)
81 if (m_syncContext == null)
82 throw new InvalidOperationException();
82 throw new InvalidOperationException();
83 var p = new Promise<T>();
83 var p = new Promise<T>();
84 var lop = TraceContext.Instance.CurrentOperation;
84 var lop = TraceContext.Instance.CurrentOperation;
85
85
86 m_syncContext.Post(x => {
86 m_syncContext.Post(x => {
87 TraceContext.Instance.EnterLogicalOperation(lop, false);
87 TraceContext.Instance.EnterLogicalOperation(lop, false);
88 try {
88 try {
89 if (p.CancelOperationIfRequested())
89 if (p.CancelOperationIfRequested())
90 return;
90 return;
91 p.Resolve(action(p));
91 p.Resolve(action(p));
92 } catch (Exception e) {
92 } catch (Exception e) {
93 p.Reject(e);
93 p.Reject(e);
94 } finally {
94 } finally {
95 TraceContext.Instance.Leave();
95 TraceContext.Instance.Leave();
96 }
96 }
97 }, null);
97 }, null);
98
98
99 return p;
99 return p;
100 }
100 }
101
101
102 public IPromise Invoke(Action action) {
102 public IPromise Invoke(Action action) {
103 Safe.ArgumentNotNull(action, "action");
103 Safe.ArgumentNotNull(action, "action");
104
104
105 if (m_syncContext == null)
105 if (m_syncContext == null)
106 throw new InvalidOperationException();
106 throw new InvalidOperationException();
107 var p = new Promise();
107 var p = new Promise();
108 var lop = TraceContext.Instance.CurrentOperation;
108 var lop = TraceContext.Instance.CurrentOperation;
109
109
110 m_syncContext.Post(x => {
110 m_syncContext.Post(x => {
111 TraceContext.Instance.EnterLogicalOperation(lop, false);
111 TraceContext.Instance.EnterLogicalOperation(lop, false);
112 try {
112 try {
113 if (p.CancelOperationIfRequested())
113 if (p.CancelOperationIfRequested())
114 return;
114 return;
115 action();
115 action();
116 p.Resolve();
116 p.Resolve();
117 } catch (Exception e) {
117 } catch (Exception e) {
118 p.Reject(e);
118 p.Reject(e);
119 } finally {
119 } finally {
120 TraceContext.Instance.Leave();
120 TraceContext.Instance.Leave();
121 }
121 }
122 }, null);
122 }, null);
123
123
124 return p;
124 return p;
125 }
125 }
126
126
127 public IPromise<T> Invoke<T>(Func<T> action) {
127 public IPromise<T> Invoke<T>(Func<T> action) {
128 Safe.ArgumentNotNull(action, "action");
128 Safe.ArgumentNotNull(action, "action");
129
129
130 if (m_syncContext == null)
130 if (m_syncContext == null)
131 throw new InvalidOperationException();
131 throw new InvalidOperationException();
132 var p = new Promise<T>();
132 var p = new Promise<T>();
133 var lop = TraceContext.Instance.CurrentOperation;
133 var lop = TraceContext.Instance.CurrentOperation;
134
134
135 m_syncContext.Post(x => {
135 m_syncContext.Post(x => {
136 TraceContext.Instance.EnterLogicalOperation(lop, false);
136 TraceContext.Instance.EnterLogicalOperation(lop, false);
137 try {
137 try {
138 if (p.CancelOperationIfRequested())
138 if (p.CancelOperationIfRequested())
139 return;
139 return;
140 p.Resolve(action());
140 p.Resolve(action());
141 } catch (Exception e) {
141 } catch (Exception e) {
142 p.Reject(e);
142 p.Reject(e);
143 } finally {
143 } finally {
144 TraceContext.Instance.Leave();
144 TraceContext.Instance.Leave();
145 }
145 }
146 }, null);
146 }, null);
147
147
148 return p;
148 return p;
149 }
149 }
150
150
151
151
152 /// <summary>
152 /// <summary>
153 /// Starts the apartment thread
153 /// Starts the apartment thread
154 /// </summary>
154 /// </summary>
155 /// <returns>Promise which will be fullfiled when the syncronization
155 /// <returns>Promise which will be fullfiled when the syncronization
156 /// context will be ready to accept tasks.</returns>
156 /// context will be ready to accept tasks.</returns>
157 protected override IPromise OnStart() {
157 protected override IPromise OnStart() {
158 m_worker.Start();
158 m_worker.Start();
159 return m_threadStarted;
159 return m_threadStarted;
160 }
160 }
161
161
162 /// <summary>
162 /// <summary>
163 /// Posts quit message to the message loop of the apartment
163 /// Posts quit message to the message loop of the apartment
164 /// </summary>
164 /// </summary>
165 /// <returns>Promise</returns>
165 /// <returns>Promise</returns>
166 protected override IPromise OnStop() {
166 protected override IPromise OnStop() {
167 m_syncContext.Post(x => Application.ExitThread(), null);
167 m_syncContext.Post(x => Application.ExitThread(), null);
168 return m_threadTerminated;
168 return m_threadTerminated;
169 }
169 }
170
170
171 void WorkerEntry() {
171 void WorkerEntry() {
172 m_syncContext = new WindowsFormsSynchronizationContext();
172 m_syncContext = new WindowsFormsSynchronizationContext();
173 SynchronizationContext.SetSynchronizationContext(m_syncContext);
173 SynchronizationContext.SetSynchronizationContext(m_syncContext);
174 m_enterPromise = new SyncContextPromise(m_syncContext);
174 m_enterPromise = new SyncContextPromise(m_syncContext);
175 m_threadStarted.Resolve();
175 m_threadStarted.Resolve();
176 m_enterPromise.Resolve();
176 m_enterPromise.Resolve();
177
177
178 Application.OleRequired();
178 Application.OleRequired();
179 Application.Run();
179 Application.Run();
180
180
181 try {
181 try {
182 OnShutdown();
182 OnShutdown();
183 m_threadTerminated.Resolve();
183 m_threadTerminated.Resolve();
184 } catch(Exception err) {
184 } catch(Exception err) {
185 m_threadTerminated.Reject(err);
185 m_threadTerminated.Reject(err);
186 }
186 }
187 }
187 }
188
188
189 /// <summary>
189 /// <summary>
190 /// Called from the STA apartment after the message loop is terminated, override this
190 /// Called from the STA apartment after the message loop is terminated, override this
191 /// method to handle apartment cleanup.
191 /// method to handle apartment cleanup.
192 /// </summary>
192 /// </summary>
193 protected virtual void OnShutdown() {
193 protected virtual void OnShutdown() {
194 }
194 }
195
195
196 protected override void Dispose(bool disposing) {
196 protected override void Dispose(bool disposing) {
197 if (disposing) {
197 if (disposing) {
198 if (!m_threadTerminated.IsResolved)
198 if (!m_threadTerminated.IsFulfilled)
199 m_syncContext.Post(x => Application.ExitThread(), null);
199 m_syncContext.Post(x => Application.ExitThread(), null);
200 }
200 }
201 base.Dispose(disposing);
201 base.Dispose(disposing);
202 }
202 }
203 }
203 }
204 }
204 }
@@ -1,148 +1,148
1 using System;
1 using System;
2 using Implab.Parallels;
2 using Implab.Parallels;
3
3
4 #if MONO
4 #if MONO
5
5
6 using NUnit.Framework;
6 using NUnit.Framework;
7 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
7 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
8 using TestMethodAttribute = NUnit.Framework.TestAttribute;
8 using TestMethodAttribute = NUnit.Framework.TestAttribute;
9
9
10 #else
10 #else
11
11
12 using Microsoft.VisualStudio.TestTools.UnitTesting;
12 using Microsoft.VisualStudio.TestTools.UnitTesting;
13
13
14 #endif
14 #endif
15
15
16 namespace Implab.Test {
16 namespace Implab.Test {
17 [TestClass]
17 [TestClass]
18 public class CancelationTests {
18 public class CancelationTests {
19
19
20 [TestMethod]
20 [TestMethod]
21 public void PromiseCancelTest() {
21 public void PromiseCancelTest() {
22 var p = new Promise();
22 var p = new Promise();
23 bool requested = false;
23 bool requested = false;
24 var reason = new Exception("Test");
24 var reason = new Exception("Test");
25
25
26 // request cancelation
26 // request cancelation
27 p.Cancel(reason);
27 p.Cancel(reason);
28
28
29 Assert.IsTrue(p.IsCancellationRequested);
29 Assert.IsTrue(p.IsCancellationRequested);
30 Assert.AreSame(reason, p.CancellationReason);
30 Assert.AreSame(reason, p.CancellationReason);
31 Assert.IsFalse(p.IsCancelled);
31 Assert.IsFalse(p.IsCancelled);
32
32
33 p.CancellationRequested(r => {
33 p.CancellationRequested(r => {
34 Assert.AreSame(reason, r);
34 Assert.AreSame(reason, r);
35 requested = true;
35 requested = true;
36 });
36 });
37
37
38 Assert.IsTrue(requested);
38 Assert.IsTrue(requested);
39
39
40 // cancel the promise
40 // cancel the promise
41 Assert.IsTrue(p.CancelOperationIfRequested());
41 Assert.IsTrue(p.CancelOperationIfRequested());
42 Assert.IsTrue(p.IsCancelled);
42 Assert.IsTrue(p.IsCancelled);
43 Assert.AreSame(reason, p.Error);
43 Assert.AreSame(reason, p.RejectReason);
44 }
44 }
45
45
46 [TestMethod]
46 [TestMethod]
47 public void CancelActionBeforeStartTask() {
47 public void CancelActionBeforeStartTask() {
48 bool run = false;
48 bool run = false;
49 var task = new ActionTask(() => {
49 var task = new ActionTask(() => {
50 run = true;
50 run = true;
51 }, null, null, true);
51 }, null, null, true);
52
52
53 // request cancelation
53 // request cancelation
54 task.Cancel();
54 task.Cancel();
55 Assert.IsTrue(task.IsCancelled);
55 Assert.IsTrue(task.IsCancelled);
56 task.Resolve();
56 task.Resolve();
57 Assert.IsFalse(run);
57 Assert.IsFalse(run);
58 }
58 }
59
59
60 [TestMethod]
60 [TestMethod]
61 public void CancelActionAfterTaskStarted() {
61 public void CancelActionAfterTaskStarted() {
62 var finish = new Signal();
62 var finish = new Signal();
63 var started = new Signal();
63 var started = new Signal();
64
64
65 var task = new ActionTask(() => {
65 var task = new ActionTask(() => {
66 started.Set();
66 started.Set();
67 finish.Wait();
67 finish.Wait();
68 }, null, null, true);
68 }, null, null, true);
69
69
70 AsyncPool.RunThread(() => {
70 AsyncPool.RunThread(() => {
71 task.Resolve();
71 task.Resolve();
72 });
72 });
73
73
74 started.Wait(1000);
74 started.Wait(1000);
75
75
76 task.Cancel();
76 task.Cancel();
77 Assert.IsTrue(task.IsCancellationRequested);
77 Assert.IsTrue(task.IsCancellationRequested);
78 Assert.IsFalse(task.IsCancelled);
78 Assert.IsFalse(task.IsCancelled);
79 Assert.IsFalse(task.IsResolved);
79 Assert.IsFalse(task.IsFulfilled);
80
80
81 finish.Set();
81 finish.Set();
82 task.Join(1000);
82 task.Join(1000);
83
83
84 }
84 }
85
85
86 [TestMethod]
86 [TestMethod]
87 public void CancelTaskChainFromBottom() {
87 public void CancelTaskChainFromBottom() {
88 var started = new Signal();
88 var started = new Signal();
89 var check1 = new Signal();
89 var check1 = new Signal();
90 var requested = false;
90 var requested = false;
91 var p1 = AsyncPool.RunThread(token => {
91 var p1 = AsyncPool.RunThread(token => {
92 token.CancellationRequested(reason => requested = true);
92 token.CancellationRequested(reason => requested = true);
93 started.Set();
93 started.Set();
94 check1.Wait();
94 check1.Wait();
95 token.CancelOperationIfRequested();
95 token.CancelOperationIfRequested();
96 });
96 });
97
97
98 started.Wait();
98 started.Wait();
99
99
100 var p2 = p1.Then(() => {
100 var p2 = p1.Then(() => {
101 });
101 });
102
102
103 Assert.IsFalse(p1.IsResolved);
103 Assert.IsFalse(p1.IsResolved);
104 Assert.IsFalse(p2.IsResolved);
104 Assert.IsFalse(p2.IsResolved);
105
105
106 p2.Cancel();
106 p2.Cancel();
107
107
108 Assert.IsFalse(p2.IsCancelled);
108 Assert.IsFalse(p2.IsCancelled);
109 Assert.IsFalse(p1.IsCancelled);
109 Assert.IsFalse(p1.IsCancelled);
110 Assert.IsTrue(requested);
110 Assert.IsTrue(requested);
111
111
112 check1.Set();
112 check1.Set();
113
113
114 try {
114 try {
115 p2.Join(1000);
115 p2.Join(1000);
116 Assert.Fail("The chain isn't cancelled");
116 Assert.Fail("The chain isn't cancelled");
117 } catch(OperationCanceledException){
117 } catch(OperationCanceledException){
118 }
118 }
119
119
120 Assert.IsTrue(p1.IsCancelled);
120 Assert.IsTrue(p1.IsCancelled);
121 Assert.IsTrue(p2.IsCancelled);
121 Assert.IsTrue(p2.IsCancelled);
122 }
122 }
123
123
124
124
125
125
126 [TestMethod]
126 [TestMethod]
127 public void CancellableAsyncTask() {
127 public void CancellableAsyncTask() {
128 var finish = new Signal();
128 var finish = new Signal();
129 var started = new Signal();
129 var started = new Signal();
130
130
131 var p = AsyncPool.RunThread(token => {
131 var p = AsyncPool.RunThread(token => {
132 token.CancellationRequested(r => finish.Set());
132 token.CancellationRequested(r => finish.Set());
133 started.Set();
133 started.Set();
134 finish.Wait();
134 finish.Wait();
135 Assert.IsTrue(token.CancelOperationIfRequested());
135 Assert.IsTrue(token.CancelOperationIfRequested());
136 });
136 });
137
137
138 started.Wait(1000);
138 started.Wait(1000);
139 Assert.IsFalse(p.IsResolved);
139 Assert.IsFalse(p.IsResolved);
140 p.Cancel();
140 p.Cancel();
141 try {
141 try {
142 p.Join(1000);
142 p.Join(1000);
143 } catch (OperationCanceledException) {
143 } catch (OperationCanceledException) {
144 }
144 }
145 }
145 }
146 }
146 }
147 }
147 }
148
148
@@ -1,298 +1,170
1 using System;
1 using System;
2 using Implab.Parallels;
2 using Implab.Parallels;
3 using System.Threading;
3 using System.Threading;
4 using System.Reflection;
4 using System.Reflection;
5 using System.Diagnostics;
5
6
6 namespace Implab {
7 namespace Implab {
7 public abstract class AbstractEvent<THandler> : ICancellable {
8 public abstract class AbstractEvent<THandler> where THandler : class {
8
9
9 const int UNRESOLVED_SATE = 0;
10 const int PENDING_SATE = 0;
10 const int TRANSITIONAL_STATE = 1;
11 protected const int TRANSITIONAL_STATE = 1;
12
11 protected const int SUCCEEDED_STATE = 2;
13 protected const int SUCCEEDED_STATE = 2;
12 protected const int REJECTED_STATE = 3;
14 protected const int REJECTED_STATE = 3;
13 protected const int CANCELLED_STATE = 4;
14
15
15 const int CANCEL_NOT_REQUESTED = 0;
16 volatile int m_state;
16 const int CANCEL_REQUESTING = 1;
17 const int CANCEL_REQUESTED = 2;
18
19 const int RESERVED_HANDLERS_COUNT = 4;
20
21 int m_state;
22 Exception m_error;
17 Exception m_error;
23 int m_handlersCount;
24
18
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
19 THandler m_handler;
26 THandler[] m_handlers;
27 SimpleAsyncQueue<THandler> m_extraHandlers;
20 SimpleAsyncQueue<THandler> m_extraHandlers;
28 int m_handlerPointer = -1;
29 int m_handlersCommited;
30
31 int m_cancelRequest;
32 Exception m_cancelationReason;
33
21
34 #region state managment
22 #region state managment
35 bool BeginTransit() {
23 protected bool BeginTransit() {
36 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
24 return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE);
37 }
25 }
38
26
39 void CompleteTransit(int state) {
27 protected void CompleteTransit(int state) {
28 #if DEBUG
40 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
29 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
41 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
30 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
31 #else
32 m_state = state;
33 #endif
34 Signal();
42 }
35 }
43
36
44 void WaitTransition() {
37 protected void WaitTransition() {
45 while (m_state == TRANSITIONAL_STATE) {
38 if (m_state == TRANSITIONAL_STATE) {
46 Thread.MemoryBarrier();
39 SpinWait spin;
40 do {
41 spin.SpinOnce();
42 } while (m_state == TRANSITIONAL_STATE);
47 }
43 }
48 }
44 }
49
45
50 protected bool BeginSetResult() {
46 protected bool BeginSetResult() {
51 if (!BeginTransit()) {
47 if (!BeginTransit()) {
52 WaitTransition();
48 WaitTransition();
53 if (m_state != CANCELLED_STATE)
54 throw new InvalidOperationException("The promise is already resolved");
55 return false;
49 return false;
56 }
50 }
57 return true;
51 return true;
58 }
52 }
59
53
60 protected void EndSetResult() {
54 protected void EndSetResult() {
61 CompleteTransit(SUCCEEDED_STATE);
55 CompleteTransit(SUCCEEDED_STATE);
62 Signal();
63 }
56 }
64
57
65
58
66
59
67 /// <summary>
60 /// <summary>
68 /// Выполняет обещание, сообщая об ошибке
61 /// Выполняет обещание, сообщая об ошибке
69 /// </summary>
62 /// </summary>
70 /// <remarks>
63 /// <remarks>
71 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
64 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
72 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
65 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
73 /// будут проигнорированы.
66 /// будут проигнорированы.
74 /// </remarks>
67 /// </remarks>
75 /// <param name="error">Исключение возникшее при выполнении операции</param>
68 /// <param name="error">Исключение возникшее при выполнении операции</param>
76 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
69 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
77 protected void SetError(Exception error) {
70 protected void SetError(Exception error) {
78 if (BeginTransit()) {
71 if (BeginTransit()) {
79 m_error = error;
72 m_error = error;
80 CompleteTransit(REJECTED_STATE);
73 CompleteTransit(REJECTED_STATE);
81
82 Signal();
83 } else {
74 } else {
84 WaitTransition();
75 WaitTransition();
85 if (m_state == SUCCEEDED_STATE)
76 if (m_state == SUCCEEDED_STATE)
86 throw new InvalidOperationException("The promise is already resolved");
77 throw new InvalidOperationException("The promise is already resolved");
87 }
78 }
88 }
79 }
89
80
90 /// <summary>
91 /// Отменяет операцию, если это возможно.
92 /// </summary>
93 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
94 protected void SetCancelled(Exception reason) {
95 if (BeginTransit()) {
96 m_error = reason;
97 CompleteTransit(CANCELLED_STATE);
98 Signal();
99 }
100 }
101
102 protected abstract void SignalHandler(THandler handler, int signal);
81 protected abstract void SignalHandler(THandler handler, int signal);
103
82
104 void Signal() {
83 void Signal() {
105 var hp = m_handlerPointer;
84 THandler handler;
106 var slot = hp +1 ;
85 while (TryDequeueHandler(out handler))
107 while (slot < m_handlersCommited) {
86 SignalHandler(handler, m_state);
108 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
109 SignalHandler(m_handlers[slot], m_state);
110 }
111 hp = m_handlerPointer;
112 slot = hp +1 ;
113 }
114
115
116 if (m_extraHandlers != null) {
117 THandler handler;
118 while (m_extraHandlers.TryDequeue(out handler))
119 SignalHandler(handler, m_state);
120 }
121 }
87 }
122
88
123 #endregion
89 #endregion
124
90
125 protected abstract Signal GetResolveSignal();
91 protected abstract Signal GetFulfillSignal();
126
92
127 #region synchronization traits
93 #region synchronization traits
128 protected void WaitResult(int timeout) {
94 protected void WaitResult(int timeout) {
129 if (!(IsResolved || GetResolveSignal().Wait(timeout)))
95 if (!(IsFulfilled || GetFulfillSignal().Wait(timeout)))
130 throw new TimeoutException();
96 throw new TimeoutException();
131
97
132 switch (m_state) {
98 if (IsRejected)
133 case SUCCEEDED_STATE:
99 Rethrow();
134 return;
100 }
135 case CANCELLED_STATE:
101
136 throw new OperationCanceledException("The operation has been cancelled", m_error);
102 protected void Rethrow() {
137 case REJECTED_STATE:
103 Debug.Assert(m_error != null);
138 throw new TargetInvocationException(m_error);
104 if (m_error is OperationCanceledException)
139 default:
105 throw new OperationCanceledException("Operation cancelled", m_error);
140 throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state));
106 else
141 }
107 throw new TargetInvocationException(m_error);
142 }
108 }
143 #endregion
109 #endregion
144
110
145 #region handlers managment
111 #region handlers managment
146
112
147 protected void AddHandler(THandler handler) {
113 protected void AddHandler(THandler handler) {
148
114
149 if (m_state > 1) {
115 if (m_state > 1) {
150 // the promise is in the resolved state, just invoke the handler
116 // the promise is in the resolved state, just invoke the handler
151 SignalHandler(handler, m_state);
117 SignalHandler(handler, m_state);
152 } else {
118 } else {
153 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
119 if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) {
154
120 if (m_extraHandlers == null)
155 if (slot < RESERVED_HANDLERS_COUNT) {
121 // compare-exchange will fprotect from loosing already created queue
156
122 Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null);
157 if (slot == 0) {
123 m_extraHandlers.Enqueue(handler);
158 m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
124 }
159 } else {
160 while (m_handlers == null)
161 Thread.MemoryBarrier();
162 }
163
164 m_handlers[slot] = handler;
165
166 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
167 }
168
125
169 if (m_state > 1) {
126 if (m_state > 1 && TryDequeueHandler(out handler))
170 do {
171 var hp = m_handlerPointer;
172 slot = hp + 1;
173 if (slot < m_handlersCommited) {
174 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
175 continue;
176 SignalHandler(m_handlers[slot], m_state);
177 }
178 break;
179 } while(true);
180 }
181 } else {
182 if (slot == RESERVED_HANDLERS_COUNT) {
183 m_extraHandlers = new SimpleAsyncQueue<THandler>();
184 } else {
185 while (m_extraHandlers == null)
186 Thread.MemoryBarrier();
187 }
188
189 m_extraHandlers.Enqueue(handler);
190
191 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
192 // if the promise have been resolved while we was adding the handler to the queue
127 // if the promise have been resolved while we was adding the handler to the queue
193 // we can't guarantee that someone is still processing it
128 // we can't guarantee that someone is still processing it
194 // therefore we need to fetch a handler from the queue and execute it
129 // therefore we need to fetch a handler from the queue and execute it
195 // note that fetched handler may be not the one that we have added
130 // note that fetched handler may be not the one that we have added
196 // even we can fetch no handlers at all :)
131 // even we can fetch no handlers at all :)
197 SignalHandler(handler, m_state);
132 SignalHandler(handler, m_state);
198 }
199 }
133 }
134
135 }
136
137 bool TryDequeueHandler(out THandler handler) {
138 handler = Interlocked.Exchange(ref m_handler, null);
139 if (handler != null)
140 return true;
141 return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler);
200 }
142 }
201
143
202 #endregion
144 #endregion
203
145
204 #region IPromise implementation
146 #region IPromise implementation
205
147
206 public bool IsResolved {
148 public bool IsFulfilled {
207 get {
149 get {
208 Thread.MemoryBarrier();
150 return m_state > TRANSITIONAL_STATE;
209 return m_state > 1;
210 }
151 }
211 }
152 }
212
153
213 public bool IsCancelled {
154 public bool IsRejected {
214 get {
155 get {
215 Thread.MemoryBarrier();
156 return m_state == REJECTED_STATE;
216 return m_state == CANCELLED_STATE;
217 }
157 }
218 }
158 }
219
159
220 #endregion
160 #endregion
221
161
222 public Exception Error {
162 public Exception RejectReason {
223 get {
163 get {
224 return m_error;
164 return m_error;
225 }
165 }
226 }
166 }
227
167
228 public bool CancelOperationIfRequested() {
229 if (IsCancellationRequested) {
230 CancelOperation(CancellationReason);
231 return true;
232 }
233 return false;
234 }
235
236 public virtual void CancelOperation(Exception reason) {
237 SetCancelled(reason);
238 }
239
240 public void CancellationRequested(Action<Exception> handler) {
241 Safe.ArgumentNotNull(handler, "handler");
242 if (IsCancellationRequested)
243 handler(CancellationReason);
244
245 if (m_cancelationHandlers == null)
246 Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null);
247
248 m_cancelationHandlers.Enqueue(handler);
249
250 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
251 // TryDeque implies MemoryBarrier()
252 handler(m_cancelationReason);
253 }
254
255 public bool IsCancellationRequested {
256 get {
257 do {
258 if (m_cancelRequest == CANCEL_NOT_REQUESTED)
259 return false;
260 if (m_cancelRequest == CANCEL_REQUESTED)
261 return true;
262 Thread.MemoryBarrier();
263 } while(true);
264 }
265 }
266
267 public Exception CancellationReason {
268 get {
269 do {
270 Thread.MemoryBarrier();
271 } while(m_cancelRequest == CANCEL_REQUESTING);
272
273 return m_cancelationReason;
274 }
275 }
276
277 #region ICancellable implementation
278
279 public void Cancel() {
280 Cancel(null);
281 }
282
283 public void Cancel(Exception reason) {
284 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
285 m_cancelationReason = reason;
286 m_cancelRequest = CANCEL_REQUESTED;
287 if (m_cancelationHandlers != null) {
288 Action<Exception> handler;
289 while (m_cancelationHandlers.TryDequeue(out handler))
290 handler(m_cancelationReason);
291 }
292 }
293 }
294
295 #endregion
296 }
168 }
297 }
169 }
298
170
@@ -1,140 +1,87
1 using System;
1 using System;
2 using Implab.Parallels;
2 using Implab.Parallels;
3
3
4 namespace Implab {
4 namespace Implab {
5 public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise {
5 public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise {
6 public struct HandlerDescriptor {
6 public class HandlerDescriptor {
7 readonly Action m_handler;
7 readonly Action m_handler;
8 readonly Action<Exception> m_error;
8 readonly Action<Exception> m_error;
9 readonly Action<Exception> m_cancel;
9 public HandlerDescriptor(Action success, Action<Exception> error) {
10 readonly PromiseEventType m_mask;
11
12 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
13 m_handler = success;
10 m_handler = success;
14 m_error = error;
11 m_error = error;
15 m_cancel = cancel;
16 m_mask = PromiseEventType.Success;
17 }
18
19 public HandlerDescriptor(Action handler, PromiseEventType mask) {
20 m_handler = handler;
21 m_error = null;
22 m_cancel = null;
23 m_mask = mask;
24 }
12 }
25
13
26 public void SignalSuccess() {
14 public void SignalSuccess() {
27 if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
15 if (m_handler != null) {
28 try {
16 try {
29 m_handler();
17 m_handler();
30 // Analysis disable once EmptyGeneralCatchClause
18 // Analysis disable once EmptyGeneralCatchClause
31 } catch {
19 } catch {
32 }
20 }
33 }
21 }
34 }
22 }
35
23
36 public void SignalError(Exception err) {
24 public void SignalError(Exception err) {
37 if (m_error != null) {
25 if (m_error != null) {
38 try {
26 try {
39 m_error(err);
27 m_error(err);
40 // Analysis disable once EmptyGeneralCatchClause
28 // Analysis disable once EmptyGeneralCatchClause
41 } catch {
29 } catch {
42 }
30 }
43 } else if ((m_mask & PromiseEventType.Error ) != 0 && m_handler != null) {
44 try {
45 m_handler();
46 // Analysis disable once EmptyGeneralCatchClause
47 } catch {
48 }
49 }
50 }
51
52 public void SignalCancel(Exception reason) {
53 if (m_cancel != null) {
54 try {
55 m_cancel(reason);
56 // Analysis disable once EmptyGeneralCatchClause
57 } catch {
58 }
59 } else if ( (m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
60 try {
61 m_handler();
62 // Analysis disable once EmptyGeneralCatchClause
63 } catch {
64 }
65 }
31 }
66 }
32 }
67 }
33 }
68
34
69
35
70 #region implemented abstract members of AbstractPromise
36 #region implemented abstract members of AbstractPromise
71
37
72 protected override void SignalHandler(HandlerDescriptor handler, int signal) {
38 protected override void SignalHandler(HandlerDescriptor handler, int signal) {
73 switch (signal) {
39 switch (signal) {
74 case SUCCEEDED_STATE:
40 case SUCCEEDED_STATE:
75 handler.SignalSuccess();
41 handler.SignalSuccess();
76 break;
42 break;
77 case REJECTED_STATE:
43 case REJECTED_STATE:
78 handler.SignalError(Error);
44 handler.SignalError(RejectReason);
79 break;
80 case CANCELLED_STATE:
81 handler.SignalCancel(CancellationReason);
82 break;
45 break;
83 default:
46 default:
84 throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal));
47 throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal));
85 }
48 }
86 }
49 }
87
50
88 protected override Signal GetResolveSignal() {
51 protected override Signal GetFulfillSignal() {
89 var signal = new Signal();
52 var signal = new Signal();
90 On(signal.Set, PromiseEventType.All);
53 On(signal.Set, e => signal.Set());
91 return signal;
54 return signal;
92 }
55 }
93
56
94 #endregion
57 #endregion
95
58
96 public Type PromiseType {
59 public Type ResultType {
97 get {
60 get {
98 return typeof(void);
61 return typeof(void);
99 }
62 }
100 }
63 }
101
64
102 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
65 public void On(Action success, Action<Exception> error) {
103 AddHandler(new HandlerDescriptor(success, error, cancel));
66 AddHandler(new HandlerDescriptor(success, error));
104 return this;
105 }
106
107 public IPromise On(Action success, Action<Exception> error) {
108 AddHandler(new HandlerDescriptor(success, error, null));
109 return this;
110 }
111
112 public IPromise On(Action success) {
113 AddHandler(new HandlerDescriptor(success, null, null));
114 return this;
115 }
116
117 public IPromise On(Action handler, PromiseEventType events) {
118 AddHandler(new HandlerDescriptor(handler,events));
119 return this;
120 }
67 }
121
68
122 public IPromise<T> Cast<T>() {
69 public IPromise<T> Cast<T>() {
123 throw new InvalidCastException();
70 throw new InvalidCastException();
124 }
71 }
125
72
126 public void Join() {
73 public void Join() {
127 WaitResult(-1);
74 WaitResult(-1);
128 }
75 }
129
76
130 public void Join(int timeout) {
77 public void Join(int timeout) {
131 WaitResult(timeout);
78 WaitResult(timeout);
132 }
79 }
133
80
134 protected void SetResult() {
81 protected void SetResult() {
135 if(BeginSetResult())
82 if(BeginSetResult())
136 EndSetResult();
83 EndSetResult();
137 }
84 }
138 }
85 }
139 }
86 }
140
87
@@ -1,204 +1,204
1 using System;
1 using System;
2 using Implab.Parallels;
2 using Implab.Parallels;
3
3
4 namespace Implab {
4 namespace Implab {
5 public abstract class AbstractPromise<T> : AbstractEvent<AbstractPromise<T>.HandlerDescriptor>, IPromise<T> {
5 public abstract class AbstractPromise<T> : AbstractEvent<AbstractPromise<T>.HandlerDescriptor>, IPromise<T> {
6 public struct HandlerDescriptor {
6 public struct HandlerDescriptor {
7 readonly Action m_handler;
7 readonly Action m_handler;
8 readonly Action<T> m_success;
8 readonly Action<T> m_success;
9 readonly Action<Exception> m_error;
9 readonly Action<Exception> m_error;
10 readonly Action<Exception> m_cancel;
10 readonly Action<Exception> m_cancel;
11 readonly PromiseEventType m_mask;
11 readonly PromiseEventType m_mask;
12
12
13 public HandlerDescriptor(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
13 public HandlerDescriptor(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
14 m_success = success;
14 m_success = success;
15 m_error = error;
15 m_error = error;
16 m_cancel = cancel;
16 m_cancel = cancel;
17
17
18 m_handler = null;
18 m_handler = null;
19 m_mask = 0;
19 m_mask = 0;
20 }
20 }
21
21
22 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
22 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
23 m_handler = success;
23 m_handler = success;
24 m_success = null;
24 m_success = null;
25 m_error = error;
25 m_error = error;
26 m_cancel = cancel;
26 m_cancel = cancel;
27 m_mask = PromiseEventType.Success;
27 m_mask = PromiseEventType.Success;
28 }
28 }
29
29
30 public HandlerDescriptor(Action handler, PromiseEventType mask) {
30 public HandlerDescriptor(Action handler, PromiseEventType mask) {
31 m_handler = handler;
31 m_handler = handler;
32 m_mask = mask;
32 m_mask = mask;
33 m_success = null;
33 m_success = null;
34 m_error = null;
34 m_error = null;
35 m_cancel = null;
35 m_cancel = null;
36 }
36 }
37
37
38 public void SignalSuccess(T result) {
38 public void SignalSuccess(T result) {
39 if (m_success != null) {
39 if (m_success != null) {
40 try {
40 try {
41 m_success(result);
41 m_success(result);
42 // Analysis disable once EmptyGeneralCatchClause
42 // Analysis disable once EmptyGeneralCatchClause
43 } catch {
43 } catch {
44 }
44 }
45 } else if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
45 } else if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
46 try {
46 try {
47 m_handler();
47 m_handler();
48 // Analysis disable once EmptyGeneralCatchClause
48 // Analysis disable once EmptyGeneralCatchClause
49 } catch {
49 } catch {
50 }
50 }
51 }
51 }
52 }
52 }
53
53
54 public void SignalError(Exception err) {
54 public void SignalError(Exception err) {
55 if (m_error != null) {
55 if (m_error != null) {
56 try {
56 try {
57 m_error(err);
57 m_error(err);
58 // Analysis disable once EmptyGeneralCatchClause
58 // Analysis disable once EmptyGeneralCatchClause
59 } catch {
59 } catch {
60 }
60 }
61 } else if ((m_mask & PromiseEventType.Error) != 0 && m_handler != null) {
61 } else if ((m_mask & PromiseEventType.Error) != 0 && m_handler != null) {
62 try {
62 try {
63 m_handler();
63 m_handler();
64 // Analysis disable once EmptyGeneralCatchClause
64 // Analysis disable once EmptyGeneralCatchClause
65 } catch {
65 } catch {
66 }
66 }
67 }
67 }
68 }
68 }
69
69
70 public void SignalCancel(Exception reason) {
70 public void SignalCancel(Exception reason) {
71 if (m_cancel != null) {
71 if (m_cancel != null) {
72 try {
72 try {
73 m_cancel(reason);
73 m_cancel(reason);
74 // Analysis disable once EmptyGeneralCatchClause
74 // Analysis disable once EmptyGeneralCatchClause
75 } catch {
75 } catch {
76 }
76 }
77 } else if ((m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
77 } else if ((m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
78 try {
78 try {
79 m_handler();
79 m_handler();
80 // Analysis disable once EmptyGeneralCatchClause
80 // Analysis disable once EmptyGeneralCatchClause
81 } catch {
81 } catch {
82 }
82 }
83 }
83 }
84 }
84 }
85 }
85 }
86
86
87 public Type PromiseType {
87 public Type ResultType {
88 get {
88 get {
89 return typeof(T);
89 return typeof(T);
90 }
90 }
91 }
91 }
92
92
93 public T Join() {
93 public T Join() {
94 WaitResult(-1);
94 WaitResult(-1);
95 return m_result;
95 return m_result;
96 }
96 }
97 public T Join(int timeout) {
97 public T Join(int timeout) {
98 WaitResult(timeout);
98 WaitResult(timeout);
99 return m_result;
99 return m_result;
100 }
100 }
101
101
102 void IPromise.Join() {
102 void IPromise.Join() {
103 WaitResult(-1);
103 WaitResult(-1);
104 }
104 }
105 void IPromise.Join(int timeout) {
105 void IPromise.Join(int timeout) {
106 WaitResult(timeout);
106 WaitResult(timeout);
107 }
107 }
108
108
109 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
109 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
110 AddHandler(new HandlerDescriptor(success, error, cancel));
110 AddHandler(new HandlerDescriptor(success, error, cancel));
111 return this;
111 return this;
112 }
112 }
113
113
114 public IPromise<T> On(Action<T> success, Action<Exception> error) {
114 public IPromise<T> On(Action<T> success, Action<Exception> error) {
115 AddHandler(new HandlerDescriptor(success, error, null));
115 AddHandler(new HandlerDescriptor(success, error, null));
116 return this;
116 return this;
117 }
117 }
118
118
119 public IPromise<T> On(Action<T> success) {
119 public IPromise<T> On(Action<T> success) {
120 AddHandler(new HandlerDescriptor(success, null, null));
120 AddHandler(new HandlerDescriptor(success, null, null));
121 return this;
121 return this;
122 }
122 }
123
123
124 public IPromise<T> On(Action handler, PromiseEventType events) {
124 public IPromise<T> On(Action handler, PromiseEventType events) {
125 AddHandler(new HandlerDescriptor(handler, events));
125 AddHandler(new HandlerDescriptor(handler, events));
126 return this;
126 return this;
127 }
127 }
128
128
129 public IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel) {
129 public IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel) {
130 AddHandler(new HandlerDescriptor(success, error, cancel));
130 AddHandler(new HandlerDescriptor(success, error, cancel));
131 return this;
131 return this;
132 }
132 }
133
133
134 public IPromise<T> On(Action success, Action<Exception> error) {
134 public IPromise<T> On(Action success, Action<Exception> error) {
135 AddHandler(new HandlerDescriptor(success, error, null));
135 AddHandler(new HandlerDescriptor(success, error, null));
136 return this;
136 return this;
137 }
137 }
138
138
139 public IPromise<T> On(Action success) {
139 public IPromise<T> On(Action success) {
140 AddHandler(new HandlerDescriptor(success, null, null));
140 AddHandler(new HandlerDescriptor(success, null, null));
141 return this;
141 return this;
142 }
142 }
143
143
144 IPromise IPromise.On(Action success, Action<Exception> error, Action<Exception> cancel) {
144 IPromise IPromise.On(Action success, Action<Exception> error, Action<Exception> cancel) {
145 AddHandler(new HandlerDescriptor(success, error, cancel));
145 AddHandler(new HandlerDescriptor(success, error, cancel));
146 return this;
146 return this;
147 }
147 }
148
148
149 IPromise IPromise.On(Action success, Action<Exception> error) {
149 IPromise IPromise.On(Action success, Action<Exception> error) {
150 AddHandler(new HandlerDescriptor(success, error, null));
150 AddHandler(new HandlerDescriptor(success, error, null));
151 return this;
151 return this;
152 }
152 }
153
153
154 IPromise IPromise.On(Action success) {
154 IPromise IPromise.On(Action success) {
155 AddHandler(new HandlerDescriptor(success, null, null));
155 AddHandler(new HandlerDescriptor(success, null, null));
156 return this;
156 return this;
157 }
157 }
158
158
159 IPromise IPromise.On(Action handler, PromiseEventType events) {
159 IPromise IPromise.On(Action handler, PromiseEventType events) {
160 AddHandler(new HandlerDescriptor(handler, events));
160 AddHandler(new HandlerDescriptor(handler, events));
161 return this;
161 return this;
162 }
162 }
163
163
164 public IPromise<T2> Cast<T2>() {
164 public IPromise<T2> Cast<T2>() {
165 return (IPromise<T2>)this;
165 return (IPromise<T2>)this;
166 }
166 }
167
167
168 #region implemented abstract members of AbstractPromise
168 #region implemented abstract members of AbstractPromise
169
169
170 protected override Signal GetResolveSignal() {
170 protected override Signal GetFulfillSignal() {
171 var signal = new Signal();
171 var signal = new Signal();
172 AddHandler(new HandlerDescriptor(signal.Set, PromiseEventType.All));
172 AddHandler(new HandlerDescriptor(signal.Set, PromiseEventType.All));
173 return signal;
173 return signal;
174 }
174 }
175
175
176 protected override void SignalHandler(HandlerDescriptor handler, int signal) {
176 protected override void SignalHandler(HandlerDescriptor handler, int signal) {
177 switch (signal) {
177 switch (signal) {
178 case SUCCEEDED_STATE:
178 case SUCCEEDED_STATE:
179 handler.SignalSuccess(m_result);
179 handler.SignalSuccess(m_result);
180 break;
180 break;
181 case REJECTED_STATE:
181 case REJECTED_STATE:
182 handler.SignalError(Error);
182 handler.SignalError(RejectReason);
183 break;
183 break;
184 case CANCELLED_STATE:
184 case CANCELLED_STATE:
185 handler.SignalCancel(CancellationReason);
185 handler.SignalCancel(CancellationReason);
186 break;
186 break;
187 default:
187 default:
188 throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal));
188 throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal));
189 }
189 }
190 }
190 }
191
191
192 #endregion
192 #endregion
193
193
194 T m_result;
194 T m_result;
195
195
196 protected void SetResult(T value) {
196 protected void SetResult(T value) {
197 if (BeginSetResult()) {
197 if (BeginSetResult()) {
198 m_result = value;
198 m_result = value;
199 EndSetResult();
199 EndSetResult();
200 }
200 }
201 }
201 }
202 }
202 }
203 }
203 }
204
204
@@ -1,100 +1,100
1 using System;
1 using System;
2 using System.Reflection;
2 using System.Reflection;
3
3
4 namespace Implab {
4 namespace Implab {
5 public class FailedPromise : IPromise {
5 public class FailedPromise : IPromise {
6 readonly Exception m_error;
6 readonly Exception m_error;
7 public FailedPromise(Exception error) {
7 public FailedPromise(Exception error) {
8 Safe.ArgumentNotNull(error, "error");
8 Safe.ArgumentNotNull(error, "error");
9 m_error = error;
9 m_error = error;
10 }
10 }
11
11
12 #region IPromise implementation
12 #region IPromise implementation
13
13
14 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
14 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
15 if (error != null) {
15 if (error != null) {
16 try {
16 try {
17 error(m_error);
17 error(m_error);
18 // Analysis disable once EmptyGeneralCatchClause
18 // Analysis disable once EmptyGeneralCatchClause
19 } catch {
19 } catch {
20 }
20 }
21 }
21 }
22 return this;
22 return this;
23 }
23 }
24
24
25 public IPromise On(Action success, Action<Exception> error) {
25 public IPromise On(Action success, Action<Exception> error) {
26 if (error != null) {
26 if (error != null) {
27 try {
27 try {
28 error(m_error);
28 error(m_error);
29 // Analysis disable once EmptyGeneralCatchClause
29 // Analysis disable once EmptyGeneralCatchClause
30 } catch {
30 } catch {
31 }
31 }
32 }
32 }
33 return this;
33 return this;
34 }
34 }
35
35
36 public IPromise On(Action success) {
36 public IPromise On(Action success) {
37 return this;
37 return this;
38 }
38 }
39
39
40 public IPromise On(Action handler, PromiseEventType events) {
40 public IPromise On(Action handler, PromiseEventType events) {
41 if ((events & PromiseEventType.Error) != 0) {
41 if ((events & PromiseEventType.Error) != 0) {
42 try {
42 try {
43 handler();
43 handler();
44 // Analysis disable once EmptyGeneralCatchClause
44 // Analysis disable once EmptyGeneralCatchClause
45 } catch {
45 } catch {
46 }
46 }
47 }
47 }
48 return this;
48 return this;
49 }
49 }
50
50
51 public IPromise<T> Cast<T>() {
51 public IPromise<T> Cast<T>() {
52 return (IPromise<T>)this;
52 return (IPromise<T>)this;
53 }
53 }
54
54
55 public void Join() {
55 public void Join() {
56 throw new TargetInvocationException(Error);
56 throw new TargetInvocationException(RejectReason);
57 }
57 }
58
58
59 public void Join(int timeout) {
59 public void Join(int timeout) {
60 throw new TargetInvocationException(Error);
60 throw new TargetInvocationException(RejectReason);
61 }
61 }
62
62
63 public virtual Type PromiseType {
63 public virtual Type ResultType {
64 get {
64 get {
65 return typeof(void);
65 return typeof(void);
66 }
66 }
67 }
67 }
68
68
69 public bool IsResolved {
69 public bool IsFulfilled {
70 get {
70 get {
71 return true;
71 return true;
72 }
72 }
73 }
73 }
74
74
75 public bool IsCancelled {
75 public bool IsCancelled {
76 get {
76 get {
77 return false;
77 return false;
78 }
78 }
79 }
79 }
80
80
81 public Exception Error {
81 public Exception RejectReason {
82 get {
82 get {
83 return m_error;
83 return m_error;
84 }
84 }
85 }
85 }
86
86
87 #endregion
87 #endregion
88
88
89 #region ICancellable implementation
89 #region ICancellable implementation
90
90
91 public void Cancel() {
91 public void Cancel() {
92 }
92 }
93
93
94 public void Cancel(Exception reason) {
94 public void Cancel(Exception reason) {
95 }
95 }
96
96
97 #endregion
97 #endregion
98 }
98 }
99 }
99 }
100
100
@@ -1,65 +1,65
1 using System;
1 using System;
2 using System.Reflection;
2 using System.Reflection;
3
3
4 namespace Implab {
4 namespace Implab {
5 public class FailedPromise<T> : FailedPromise, IPromise<T> {
5 public class FailedPromise<T> : FailedPromise, IPromise<T> {
6 public FailedPromise(Exception error) : base(error) {
6 public FailedPromise(Exception error) : base(error) {
7 }
7 }
8
8
9 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
9 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
10 if (error != null) {
10 if (error != null) {
11 try {
11 try {
12 error(Error);
12 error(RejectReason);
13 // Analysis disable once EmptyGeneralCatchClause
13 // Analysis disable once EmptyGeneralCatchClause
14 } catch {
14 } catch {
15 }
15 }
16 }
16 }
17 return this;
17 return this;
18 }
18 }
19
19
20 public IPromise<T> On(Action<T> success, Action<Exception> error) {
20 public IPromise<T> On(Action<T> success, Action<Exception> error) {
21 if (error != null) {
21 if (error != null) {
22 try {
22 try {
23 error(Error);
23 error(RejectReason);
24 // Analysis disable once EmptyGeneralCatchClause
24 // Analysis disable once EmptyGeneralCatchClause
25 } catch {
25 } catch {
26 }
26 }
27 }
27 }
28 return this;
28 return this;
29 }
29 }
30
30
31 public IPromise<T> On(Action<T> success) {
31 public IPromise<T> On(Action<T> success) {
32 return this;
32 return this;
33 }
33 }
34
34
35 T IPromise<T>.Join() {
35 T IPromise<T>.Join() {
36 throw new TargetInvocationException(Error);
36 throw new TargetInvocationException(RejectReason);
37 }
37 }
38
38
39 T IPromise<T>.Join(int timeout) {
39 T IPromise<T>.Join(int timeout) {
40 throw new TargetInvocationException(Error);
40 throw new TargetInvocationException(RejectReason);
41 }
41 }
42
42
43
43
44 IPromise<T> IPromise<T>.On(Action success, Action<Exception> error, Action<Exception> cancel) {
44 IPromise<T> IPromise<T>.On(Action success, Action<Exception> error, Action<Exception> cancel) {
45 On(success, error, cancel);
45 On(success, error, cancel);
46 return this;
46 return this;
47 }
47 }
48
48
49 IPromise<T> IPromise<T>.On(Action success, Action<Exception> error) {
49 IPromise<T> IPromise<T>.On(Action success, Action<Exception> error) {
50 On(success, error);
50 On(success, error);
51 return this;
51 return this;
52 }
52 }
53
53
54 IPromise<T> IPromise<T>.On(Action success) {
54 IPromise<T> IPromise<T>.On(Action success) {
55 On(success);
55 On(success);
56 return this;
56 return this;
57 }
57 }
58
58
59 IPromise<T> IPromise<T>.On(Action handler, PromiseEventType events) {
59 IPromise<T> IPromise<T>.On(Action handler, PromiseEventType events) {
60 On(handler, events);
60 On(handler, events);
61 return this;
61 return this;
62 }
62 }
63 }
63 }
64 }
64 }
65
65
@@ -1,66 +1,53
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5
5
6 namespace Implab {
6 namespace Implab {
7 public interface IPromise: ICancellable {
7 public interface IPromise: ICancellable {
8
8
9 /// <summary>
9 /// <summary>
10 /// Тип результата, получаемого через данное обещание.
10 /// Тип результата, получаемого через данное обещание.
11 /// </summary>
11 /// </summary>
12 Type PromiseType { get; }
12 Type ResultType { get; }
13
13
14 /// <summary>
14 /// <summary>
15 /// Обещание является выполненым, либо успешно, либо с ошибкой, либо отменено.
15 /// Обещание является выполненым, либо успешно, либо с ошибкой, либо отменено.
16 /// </summary>
16 /// </summary>
17 bool IsResolved { get; }
17 bool IsFulfilled { get; }
18
18
19 /// <summary>
19 bool IsRejected { get; }
20 /// Обещание было отменено.
20
21 /// </summary>
21 bool IsResolved { get; }
22 bool IsCancelled { get; }
23
22
24 /// <summary>
23 /// <summary>
25 /// Исключение возникшее в результате выполнения обещания, либо причина отмены.
24 /// Исключение возникшее в результате выполнения обещания, либо причина отмены.
26 /// </summary>
25 /// </summary>
27 Exception Error { get; }
26 Exception RejectReason { get; }
28
27
29 /// <summary>
28 /// <summary>
30 /// Adds specified listeners to the current promise.
29 /// Adds specified listeners to the current promise.
31 /// </summary>
30 /// </summary>
32 /// <param name="success">The handler called on the successful promise completion.</param>
31 /// <param name="success">The handler called on the successful promise completion.</param>
33 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
32 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
34 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
35 /// <returns>The current promise.</returns>
33 /// <returns>The current promise.</returns>
36 IPromise On(Action success, Action<Exception> error, Action<Exception> cancel);
34 void On(Action success, Action<Exception> error);
37 IPromise On(Action success, Action<Exception> error);
38 IPromise On(Action success);
39
40 /// <summary>
41 /// Adds specified listeners to the current promise.
42 /// </summary>
43 /// <param name="handler">The handler called on the specified events.</param>
44 /// <param name = "events">The combination of flags denoting the events for which the
45 /// handler shoud be called.</param>
46 /// <returns>The current promise.</returns>
47 IPromise On(Action handler, PromiseEventType events);
48
35
49 /// <summary>
36 /// <summary>
50 /// Преобразует результат обещания к заданному типу и возвращает новое обещание.
37 /// Преобразует результат обещания к заданному типу и возвращает новое обещание.
51 /// </summary>
38 /// </summary>
52 IPromise<T> Cast<T>();
39 IPromise<T> Cast<T>();
53
40
54 /// <summary>
41 /// <summary>
55 /// Синхронизирует текущий поток с обещанием.
42 /// Синхронизирует текущий поток с обещанием.
56 /// </summary>
43 /// </summary>
57 void Join();
44 void Join();
58 /// <summary>
45 /// <summary>
59 /// Синхронизирует текущий поток с обещанием.
46 /// Синхронизирует текущий поток с обещанием.
60 /// </summary>
47 /// </summary>
61 /// <param name="timeout">Время ожидания, по его истечению возникнет исключение.</param>
48 /// <param name="timeout">Время ожидания, по его истечению возникнет исключение.</param>
62 /// <exception cref="TimeoutException">Превышено время ожидания.</exception>
49 /// <exception cref="TimeoutException">Превышено время ожидания.</exception>
63 void Join(int timeout);
50 void Join(int timeout);
64
51
65 }
52 }
66 }
53 }
@@ -1,25 +1,12
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public interface IPromise<out T> : IPromise {
4 public interface IPromise<out T> : IPromise {
5
5
6 IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel);
6 void On(Action<T> success, Action<Exception> error);
7
8 IPromise<T> On(Action<T> success, Action<Exception> error);
9
10 IPromise<T> On(Action<T> success);
11
7
12 new T Join();
8 new T Join();
13
9
14 new T Join(int timeout);
10 new T Join(int timeout);
15
16 new IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel);
17
18 new IPromise<T> On(Action success, Action<Exception> error);
19
20 new IPromise<T> On(Action success);
21
22 new IPromise<T> On(Action handler, PromiseEventType events);
23
24 }
11 }
25 }
12 }
@@ -1,207 +1,207
1 using Implab.Diagnostics;
1 using Implab.Diagnostics;
2 using System;
2 using System;
3 using System.Diagnostics;
3 using System.Diagnostics;
4 using System.Threading;
4 using System.Threading;
5
5
6 namespace Implab.Parallels {
6 namespace Implab.Parallels {
7 public static class ArrayTraits {
7 public static class ArrayTraits {
8 class ArrayIterator<TSrc> : DispatchPool<int> {
8 class ArrayIterator<TSrc> : DispatchPool<int> {
9 readonly Action<TSrc> m_action;
9 readonly Action<TSrc> m_action;
10 readonly TSrc[] m_source;
10 readonly TSrc[] m_source;
11 readonly Promise<int> m_promise = new Promise<int>();
11 readonly Promise<int> m_promise = new Promise<int>();
12 readonly LogicalOperation m_logicalOperation;
12 readonly LogicalOperation m_logicalOperation;
13
13
14 int m_pending;
14 int m_pending;
15 int m_next;
15 int m_next;
16
16
17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
18 : base(threads) {
18 : base(threads) {
19
19
20 Debug.Assert(source != null);
20 Debug.Assert(source != null);
21 Debug.Assert(action != null);
21 Debug.Assert(action != null);
22
22
23 m_logicalOperation = TraceContext.Instance.CurrentOperation;
23 m_logicalOperation = TraceContext.Instance.CurrentOperation;
24 m_next = 0;
24 m_next = 0;
25 m_source = source;
25 m_source = source;
26 m_pending = source.Length;
26 m_pending = source.Length;
27 m_action = action;
27 m_action = action;
28
28
29 m_promise.On(Dispose, PromiseEventType.All);
29 m_promise.On(Dispose, PromiseEventType.All);
30
30
31 InitPool();
31 InitPool();
32 }
32 }
33
33
34 public Promise<int> Promise {
34 public Promise<int> Promise {
35 get {
35 get {
36 return m_promise;
36 return m_promise;
37 }
37 }
38 }
38 }
39
39
40 protected override void Worker() {
40 protected override void Worker() {
41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
42 try {
42 try {
43 base.Worker();
43 base.Worker();
44 } finally {
44 } finally {
45 TraceContext.Instance.Leave();
45 TraceContext.Instance.Leave();
46 }
46 }
47 }
47 }
48
48
49 protected override bool TryDequeue(out int unit) {
49 protected override bool TryDequeue(out int unit) {
50 unit = Interlocked.Increment(ref m_next) - 1;
50 unit = Interlocked.Increment(ref m_next) - 1;
51 return unit < m_source.Length;
51 return unit < m_source.Length;
52 }
52 }
53
53
54 protected override void InvokeUnit(int unit) {
54 protected override void InvokeUnit(int unit) {
55 try {
55 try {
56 m_action(m_source[unit]);
56 m_action(m_source[unit]);
57 var pending = Interlocked.Decrement(ref m_pending);
57 var pending = Interlocked.Decrement(ref m_pending);
58 if (pending == 0)
58 if (pending == 0)
59 m_promise.Resolve(m_source.Length);
59 m_promise.Resolve(m_source.Length);
60 } catch (Exception e) {
60 } catch (Exception e) {
61 m_promise.Reject(e);
61 m_promise.Reject(e);
62 }
62 }
63 }
63 }
64 }
64 }
65
65
66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
67 readonly Func<TSrc, TDst> m_transform;
67 readonly Func<TSrc, TDst> m_transform;
68 readonly TSrc[] m_source;
68 readonly TSrc[] m_source;
69 readonly TDst[] m_dest;
69 readonly TDst[] m_dest;
70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
71 readonly LogicalOperation m_logicalOperation;
71 readonly LogicalOperation m_logicalOperation;
72
72
73 int m_pending;
73 int m_pending;
74 int m_next;
74 int m_next;
75
75
76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
77 : base(threads) {
77 : base(threads) {
78
78
79 Debug.Assert (source != null);
79 Debug.Assert (source != null);
80 Debug.Assert( transform != null);
80 Debug.Assert( transform != null);
81
81
82 m_next = 0;
82 m_next = 0;
83 m_source = source;
83 m_source = source;
84 m_dest = new TDst[source.Length];
84 m_dest = new TDst[source.Length];
85 m_pending = source.Length;
85 m_pending = source.Length;
86 m_transform = transform;
86 m_transform = transform;
87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
88
88
89 m_promise.On(Dispose, PromiseEventType.All);
89 m_promise.On(Dispose, PromiseEventType.All);
90
90
91 InitPool();
91 InitPool();
92 }
92 }
93
93
94 public Promise<TDst[]> Promise {
94 public Promise<TDst[]> Promise {
95 get {
95 get {
96 return m_promise;
96 return m_promise;
97 }
97 }
98 }
98 }
99
99
100 protected override void Worker() {
100 protected override void Worker() {
101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
102 try {
102 try {
103 base.Worker();
103 base.Worker();
104 } finally {
104 } finally {
105 TraceContext.Instance.Leave();
105 TraceContext.Instance.Leave();
106 }
106 }
107 }
107 }
108
108
109 protected override bool TryDequeue(out int unit) {
109 protected override bool TryDequeue(out int unit) {
110 unit = Interlocked.Increment(ref m_next) - 1;
110 unit = Interlocked.Increment(ref m_next) - 1;
111 return unit < m_source.Length;
111 return unit < m_source.Length;
112 }
112 }
113
113
114 protected override void InvokeUnit(int unit) {
114 protected override void InvokeUnit(int unit) {
115 try {
115 try {
116 m_dest[unit] = m_transform(m_source[unit]);
116 m_dest[unit] = m_transform(m_source[unit]);
117 var pending = Interlocked.Decrement(ref m_pending);
117 var pending = Interlocked.Decrement(ref m_pending);
118 if (pending == 0)
118 if (pending == 0)
119 m_promise.Resolve(m_dest);
119 m_promise.Resolve(m_dest);
120 } catch (Exception e) {
120 } catch (Exception e) {
121 m_promise.Reject(e);
121 m_promise.Reject(e);
122 }
122 }
123 }
123 }
124 }
124 }
125
125
126 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
126 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
127 if (source == null)
127 if (source == null)
128 throw new ArgumentNullException("source");
128 throw new ArgumentNullException("source");
129 if (transform == null)
129 if (transform == null)
130 throw new ArgumentNullException("transform");
130 throw new ArgumentNullException("transform");
131
131
132 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
132 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
133 return mapper.Promise;
133 return mapper.Promise;
134 }
134 }
135
135
136 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
136 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
137 if (source == null)
137 if (source == null)
138 throw new ArgumentNullException("source");
138 throw new ArgumentNullException("source");
139 if (action == null)
139 if (action == null)
140 throw new ArgumentNullException("action");
140 throw new ArgumentNullException("action");
141
141
142 var iter = new ArrayIterator<TSrc>(source, action, threads);
142 var iter = new ArrayIterator<TSrc>(source, action, threads);
143 return iter.Promise;
143 return iter.Promise;
144 }
144 }
145
145
146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
147 if (source == null)
147 if (source == null)
148 throw new ArgumentNullException("source");
148 throw new ArgumentNullException("source");
149 if (transform == null)
149 if (transform == null)
150 throw new ArgumentNullException("transform");
150 throw new ArgumentNullException("transform");
151 if (threads <= 0)
151 if (threads <= 0)
152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
153
153
154 if (source.Length == 0)
154 if (source.Length == 0)
155 return Promise<TDst[]>.FromResult(new TDst[0]);
155 return Promise<TDst[]>.FromResult(new TDst[0]);
156
156
157 var promise = new Promise<TDst[]>();
157 var promise = new Promise<TDst[]>();
158 var res = new TDst[source.Length];
158 var res = new TDst[source.Length];
159 var pending = source.Length;
159 var pending = source.Length;
160
160
161 object locker = new object();
161 object locker = new object();
162 int slots = threads;
162 int slots = threads;
163
163
164 // Analysis disable AccessToDisposedClosure
164 // Analysis disable AccessToDisposedClosure
165 AsyncPool.RunThread<int>(() => {
165 AsyncPool.RunThread<int>(() => {
166 for (int i = 0; i < source.Length; i++) {
166 for (int i = 0; i < source.Length; i++) {
167 if(promise.IsResolved)
167 if(promise.IsFulfilled)
168 break; // stop processing in case of error or cancellation
168 break; // stop processing in case of error or cancellation
169 var idx = i;
169 var idx = i;
170
170
171 if (Interlocked.Decrement(ref slots) < 0) {
171 if (Interlocked.Decrement(ref slots) < 0) {
172 lock(locker) {
172 lock(locker) {
173 while(slots < 0)
173 while(slots < 0)
174 Monitor.Wait(locker);
174 Monitor.Wait(locker);
175 }
175 }
176 }
176 }
177
177
178 try {
178 try {
179 transform(source[i])
179 transform(source[i])
180 .On( x => {
180 .On( x => {
181 Interlocked.Increment(ref slots);
181 Interlocked.Increment(ref slots);
182 lock (locker) {
182 lock (locker) {
183 Monitor.Pulse(locker);
183 Monitor.Pulse(locker);
184 }
184 }
185 })
185 })
186 .On(
186 .On(
187 x => {
187 x => {
188 res[idx] = x;
188 res[idx] = x;
189 var left = Interlocked.Decrement(ref pending);
189 var left = Interlocked.Decrement(ref pending);
190 if (left == 0)
190 if (left == 0)
191 promise.Resolve(res);
191 promise.Resolve(res);
192 },
192 },
193 promise.Reject
193 promise.Reject
194 );
194 );
195
195
196 } catch (Exception e) {
196 } catch (Exception e) {
197 promise.Reject(e);
197 promise.Reject(e);
198 }
198 }
199 }
199 }
200 return 0;
200 return 0;
201 });
201 });
202
202
203 return promise;
203 return promise;
204 }
204 }
205
205
206 }
206 }
207 }
207 }
@@ -1,131 +1,140
1 using System.Threading;
1 using System.Threading;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System;
3 using System;
4 using System.Collections;
4 using System.Collections;
5
5
6 namespace Implab.Parallels {
6 namespace Implab.Parallels {
7 public class SimpleAsyncQueue<T> : IEnumerable<T> {
7 public class SimpleAsyncQueue<T> : IEnumerable<T> {
8 class Node {
8 class Node {
9 public Node(T value) {
9 public Node(T value) {
10 this.value = value;
10 this.value = value;
11 }
11 }
12 public readonly T value;
12 public readonly T value;
13 public volatile Node next;
13 public volatile Node next;
14 }
14 }
15
15
16 // the reader and the writer are mainteined completely independent,
16 // the reader and the writer are mainteined completely independent,
17 // the reader can read next item when m_first.next is not null
17 // the reader can read next item when m_first.next is not null
18 // the writer creates the a new node, moves m_last to this node and
18 // the writer creates a new node, moves m_last to this node and
19 // only after that restores the reference from the previous node
19 // only after that restores the reference from the previous node
20 // making available the reader to read the new node.
20 // making the reader be able to read the new node.
21
21
22 Node m_first; // position on the node which is already read
22 volatile Node m_first; // position on the node which is already read
23 Node m_last; // position on the node which is already written
23 volatile Node m_last; // position on the node which is already written
24
24
25 public SimpleAsyncQueue() {
25 public SimpleAsyncQueue() {
26 m_first = m_last = new Node(default(T));
26 m_first = m_last = new Node(default(T));
27 }
27 }
28
28
29 public void Enqueue(T value) {
29 public void Enqueue(T value) {
30 var next = new Node(value);
30 var next = new Node(value);
31
31
32 // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
32 // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
33 // to ensure that the next node is completely constructed
33 // to ensure that the next node is completely constructed
34 var last = Interlocked.Exchange(ref m_last, next);
34 var last = Interlocked.Exchange(ref m_last, next);
35
35
36 // release-fence
36 // release-fence
37 last.next = next;
37 last.next = next;
38
38
39 }
39 }
40
40
41 public bool TryDequeue(out T value) {
41 public bool TryDequeue(out T value) {
42 Node first;
42 Node first = m_first; ;
43 Node next;
43 Node next = first.next; ;
44
45 if (next == null) {
46 value = default(T);
47 return false;
48 }
49
50 var first2 = Interlocked.CompareExchange(ref m_first, next, first);
51
52 if (first != first2) {
53 // head is updated by someone else
44
54
45 Thread.MemoryBarrier(); // ensure m_first is fresh
55 SpinWait spin = new SpinWait();
46 SpinWait spin = new SpinWait();
56 do {
47 do {
57 first = first2;
48 first = m_first;
58 next = first.next;
49 // aquire-fence
59 if (next == null) {
50 next = first.next;
60 value = default(T);
51 if (next == null) {
61 return false;
52 value = default(T);
62 }
53 return false;
63
54 }
64 first2 = Interlocked.CompareExchange(ref m_first, next, first);
55
65 if (first == first2)
56 if (first == Interlocked.CompareExchange(ref m_first, next, first))
66 break;
57 // head succesfully updated
67 spin.SpinOnce();
58 break;
68 } while (true);
59 spin.SpinOnce();
69 }
60 } while (true);
61
70
62 value = next.value;
71 value = next.value;
63 return true;
72 return true;
64 }
73 }
65
74
66 #region IEnumerable implementation
75 #region IEnumerable implementation
67
76
68 class Enumerator : IEnumerator<T> {
77 class Enumerator : IEnumerator<T> {
69 Node m_current;
78 Node m_current;
70 Node m_first;
79 Node m_first;
71
80
72 public Enumerator(Node first) {
81 public Enumerator(Node first) {
73 m_first = first;
82 m_first = first;
74 }
83 }
75
84
76 #region IEnumerator implementation
85 #region IEnumerator implementation
77
86
78 public bool MoveNext() {
87 public bool MoveNext() {
79 m_current = m_current == null ? m_first : m_current.next;
88 m_current = m_current == null ? m_first : m_current.next;
80 return m_current != null;
89 return m_current != null;
81 }
90 }
82
91
83 public void Reset() {
92 public void Reset() {
84 m_current = null;
93 m_current = null;
85 }
94 }
86
95
87 object IEnumerator.Current {
96 object IEnumerator.Current {
88 get {
97 get {
89 if (m_current == null)
98 if (m_current == null)
90 throw new InvalidOperationException();
99 throw new InvalidOperationException();
91 return m_current.value;
100 return m_current.value;
92 }
101 }
93 }
102 }
94
103
95 #endregion
104 #endregion
96
105
97 #region IDisposable implementation
106 #region IDisposable implementation
98
107
99 public void Dispose() {
108 public void Dispose() {
100 }
109 }
101
110
102 #endregion
111 #endregion
103
112
104 #region IEnumerator implementation
113 #region IEnumerator implementation
105
114
106 public T Current {
115 public T Current {
107 get {
116 get {
108 if (m_current == null)
117 if (m_current == null)
109 throw new InvalidOperationException();
118 throw new InvalidOperationException();
110 return m_current.value;
119 return m_current.value;
111 }
120 }
112 }
121 }
113
122
114 #endregion
123 #endregion
115 }
124 }
116
125
117 public IEnumerator<T> GetEnumerator() {
126 public IEnumerator<T> GetEnumerator() {
118 return new Enumerator(m_first);
127 return new Enumerator(m_first);
119 }
128 }
120
129
121 #endregion
130 #endregion
122
131
123 #region IEnumerable implementation
132 #region IEnumerable implementation
124
133
125 IEnumerator IEnumerable.GetEnumerator() {
134 IEnumerator IEnumerable.GetEnumerator() {
126 return GetEnumerator();
135 return GetEnumerator();
127 }
136 }
128
137
129 #endregion
138 #endregion
130 }
139 }
131 }
140 }
@@ -1,28 +1,28
1 using System;
1 using System;
2 using System.Runtime.CompilerServices;
2 using System.Runtime.CompilerServices;
3
3
4 namespace Implab {
4 namespace Implab {
5 public struct PromiseAwaiter : INotifyCompletion {
5 public struct PromiseAwaiter : INotifyCompletion {
6 readonly IPromise m_promise;
6 readonly IPromise m_promise;
7
7
8 public PromiseAwaiter(IPromise promise) {
8 public PromiseAwaiter(IPromise promise) {
9 m_promise = promise;
9 m_promise = promise;
10 }
10 }
11
11
12 public void OnCompleted (Action continuation) {
12 public void OnCompleted (Action continuation) {
13 if (m_promise != null)
13 if (m_promise != null)
14 m_promise.On(continuation, PromiseEventType.All);
14 m_promise.On(continuation, PromiseEventType.All);
15 }
15 }
16
16
17 public void GetResult() {
17 public void GetResult() {
18 m_promise.Join();
18 m_promise.Join();
19 }
19 }
20
20
21 public bool IsCompleted {
21 public bool IsCompleted {
22 get {
22 get {
23 return m_promise.IsResolved;
23 return m_promise.IsFulfilled;
24 }
24 }
25 }
25 }
26 }
26 }
27 }
27 }
28
28
@@ -1,28 +1,28
1 using System;
1 using System;
2 using System.Runtime.CompilerServices;
2 using System.Runtime.CompilerServices;
3
3
4 namespace Implab {
4 namespace Implab {
5 public struct PromiseAwaiter<T> : INotifyCompletion {
5 public struct PromiseAwaiter<T> : INotifyCompletion {
6 readonly IPromise<T> m_promise;
6 readonly IPromise<T> m_promise;
7
7
8 public PromiseAwaiter(IPromise<T> promise) {
8 public PromiseAwaiter(IPromise<T> promise) {
9 m_promise = promise;
9 m_promise = promise;
10 }
10 }
11
11
12 public void OnCompleted (Action continuation) {
12 public void OnCompleted (Action continuation) {
13 if (m_promise != null)
13 if (m_promise != null)
14 m_promise.On(continuation, PromiseEventType.All);
14 m_promise.On(continuation, PromiseEventType.All);
15 }
15 }
16
16
17 public T GetResult() {
17 public T GetResult() {
18 return m_promise.Join();
18 return m_promise.Join();
19 }
19 }
20
20
21 public bool IsCompleted {
21 public bool IsCompleted {
22 get {
22 get {
23 return m_promise.IsResolved;
23 return m_promise.IsFulfilled;
24 }
24 }
25 }
25 }
26 }
26 }
27 }
27 }
28
28
@@ -1,99 +1,99
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public class SuccessPromise : IPromise {
4 public class SuccessPromise : IPromise {
5 #region IPromise implementation
5 #region IPromise implementation
6
6
7 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
7 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
8 if (success != null) {
8 if (success != null) {
9 try {
9 try {
10 success();
10 success();
11 // Analysis disable once EmptyGeneralCatchClause
11 // Analysis disable once EmptyGeneralCatchClause
12 } catch {
12 } catch {
13 }
13 }
14 }
14 }
15 return this;
15 return this;
16 }
16 }
17
17
18 public IPromise On(Action success, Action<Exception> error) {
18 public IPromise On(Action success, Action<Exception> error) {
19 if (success != null) {
19 if (success != null) {
20 try {
20 try {
21 success();
21 success();
22 // Analysis disable once EmptyGeneralCatchClause
22 // Analysis disable once EmptyGeneralCatchClause
23 } catch {
23 } catch {
24 }
24 }
25 }
25 }
26 return this;
26 return this;
27 }
27 }
28
28
29 public IPromise On(Action success) {
29 public IPromise On(Action success) {
30 if (success != null) {
30 if (success != null) {
31 try {
31 try {
32 success();
32 success();
33 // Analysis disable once EmptyGeneralCatchClause
33 // Analysis disable once EmptyGeneralCatchClause
34 } catch {
34 } catch {
35 }
35 }
36 }
36 }
37 return this;
37 return this;
38 }
38 }
39
39
40 public IPromise On(Action handler, PromiseEventType events) {
40 public IPromise On(Action handler, PromiseEventType events) {
41 if (handler != null && events.HasFlag(PromiseEventType.Success)) {
41 if (handler != null && events.HasFlag(PromiseEventType.Success)) {
42 try {
42 try {
43 handler();
43 handler();
44 // Analysis disable once EmptyGeneralCatchClause
44 // Analysis disable once EmptyGeneralCatchClause
45 } catch {
45 } catch {
46 }
46 }
47 }
47 }
48 return this;
48 return this;
49 }
49 }
50
50
51 public IPromise<T> Cast<T>() {
51 public IPromise<T> Cast<T>() {
52 throw new InvalidCastException();
52 throw new InvalidCastException();
53 }
53 }
54
54
55 public void Join() {
55 public void Join() {
56 }
56 }
57
57
58 public void Join(int timeout) {
58 public void Join(int timeout) {
59 }
59 }
60
60
61 public Type PromiseType {
61 public Type ResultType {
62 get {
62 get {
63 return typeof(void);
63 return typeof(void);
64 }
64 }
65 }
65 }
66
66
67 public bool IsResolved {
67 public bool IsFulfilled {
68 get {
68 get {
69 return true;
69 return true;
70 }
70 }
71 }
71 }
72
72
73 public bool IsCancelled {
73 public bool IsCancelled {
74 get {
74 get {
75 return false;
75 return false;
76 }
76 }
77 }
77 }
78
78
79 public Exception Error {
79 public Exception RejectReason {
80 get {
80 get {
81 return null;
81 return null;
82 }
82 }
83 }
83 }
84
84
85 #endregion
85 #endregion
86
86
87 #region ICancellable implementation
87 #region ICancellable implementation
88
88
89 public void Cancel() {
89 public void Cancel() {
90 }
90 }
91
91
92 public void Cancel(Exception reason) {
92 public void Cancel(Exception reason) {
93 }
93 }
94
94
95 #endregion
95 #endregion
96
96
97 }
97 }
98 }
98 }
99
99
@@ -1,153 +1,153
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public class SuccessPromise<T> : IPromise<T> {
4 public class SuccessPromise<T> : IPromise<T> {
5 readonly T m_value;
5 readonly T m_value;
6
6
7 public SuccessPromise(T value){
7 public SuccessPromise(T value){
8 m_value = value;
8 m_value = value;
9 }
9 }
10
10
11 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
11 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
12 if (success != null) {
12 if (success != null) {
13 try {
13 try {
14 success(m_value);
14 success(m_value);
15 // Analysis disable once EmptyGeneralCatchClause
15 // Analysis disable once EmptyGeneralCatchClause
16 } catch {
16 } catch {
17 }
17 }
18 }
18 }
19 return this;
19 return this;
20 }
20 }
21
21
22 public IPromise<T> On(Action<T> success, Action<Exception> error) {
22 public IPromise<T> On(Action<T> success, Action<Exception> error) {
23 if (success != null) {
23 if (success != null) {
24 try {
24 try {
25 success(m_value);
25 success(m_value);
26 // Analysis disable once EmptyGeneralCatchClause
26 // Analysis disable once EmptyGeneralCatchClause
27 } catch {
27 } catch {
28 }
28 }
29 }
29 }
30 return this;
30 return this;
31 }
31 }
32
32
33 public IPromise<T> On(Action<T> success) {
33 public IPromise<T> On(Action<T> success) {
34 if (success != null) {
34 if (success != null) {
35 try {
35 try {
36 success(m_value);
36 success(m_value);
37 // Analysis disable once EmptyGeneralCatchClause
37 // Analysis disable once EmptyGeneralCatchClause
38 } catch {
38 } catch {
39 }
39 }
40 }
40 }
41 return this;
41 return this;
42 }
42 }
43
43
44 public T Join() {
44 public T Join() {
45 return m_value;
45 return m_value;
46 }
46 }
47
47
48 public T Join(int timeout) {
48 public T Join(int timeout) {
49 return m_value;
49 return m_value;
50 }
50 }
51
51
52 public IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel) {
52 public IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel) {
53 if (success != null) {
53 if (success != null) {
54 try {
54 try {
55 success();
55 success();
56 // Analysis disable once EmptyGeneralCatchClause
56 // Analysis disable once EmptyGeneralCatchClause
57 } catch {
57 } catch {
58 }
58 }
59 }
59 }
60 return this;
60 return this;
61 }
61 }
62
62
63 public IPromise<T> On(Action success, Action<Exception> error) {
63 public IPromise<T> On(Action success, Action<Exception> error) {
64 if (success != null) {
64 if (success != null) {
65 try {
65 try {
66 success();
66 success();
67 // Analysis disable once EmptyGeneralCatchClause
67 // Analysis disable once EmptyGeneralCatchClause
68 } catch {
68 } catch {
69 }
69 }
70 }
70 }
71 return this;
71 return this;
72 }
72 }
73
73
74 public IPromise<T> On(Action success) {
74 public IPromise<T> On(Action success) {
75 if (success != null) {
75 if (success != null) {
76 try {
76 try {
77 success();
77 success();
78 // Analysis disable once EmptyGeneralCatchClause
78 // Analysis disable once EmptyGeneralCatchClause
79 } catch {
79 } catch {
80 }
80 }
81 }
81 }
82 return this;
82 return this;
83 }
83 }
84
84
85 public IPromise<T> On(Action handler, PromiseEventType events) {
85 public IPromise<T> On(Action handler, PromiseEventType events) {
86 if (handler != null && events.HasFlag(PromiseEventType.Success)) {
86 if (handler != null && events.HasFlag(PromiseEventType.Success)) {
87 try {
87 try {
88 handler();
88 handler();
89 // Analysis disable once EmptyGeneralCatchClause
89 // Analysis disable once EmptyGeneralCatchClause
90 } catch {
90 } catch {
91 }
91 }
92 }
92 }
93 return this;
93 return this;
94 }
94 }
95
95
96 IPromise IPromise.On(Action success, Action<Exception> error, Action<Exception> cancel) {
96 IPromise IPromise.On(Action success, Action<Exception> error, Action<Exception> cancel) {
97 return On(success, error, cancel);
97 return On(success, error, cancel);
98 }
98 }
99
99
100 IPromise IPromise.On(Action success, Action<Exception> error) {
100 IPromise IPromise.On(Action success, Action<Exception> error) {
101 return On(success, error);
101 return On(success, error);
102 }
102 }
103
103
104 IPromise IPromise.On(Action success) {
104 IPromise IPromise.On(Action success) {
105 return On(success);
105 return On(success);
106 }
106 }
107
107
108 IPromise IPromise.On(Action handler, PromiseEventType events) {
108 IPromise IPromise.On(Action handler, PromiseEventType events) {
109 return On(handler, events);
109 return On(handler, events);
110 }
110 }
111
111
112 public IPromise<T2> Cast<T2>() {
112 public IPromise<T2> Cast<T2>() {
113 return new SuccessPromise<T2>((T2)(object)m_value);
113 return new SuccessPromise<T2>((T2)(object)m_value);
114 }
114 }
115
115
116 void IPromise.Join() {
116 void IPromise.Join() {
117 }
117 }
118
118
119 void IPromise.Join(int timeout) {
119 void IPromise.Join(int timeout) {
120 }
120 }
121
121
122 public Type PromiseType {
122 public Type ResultType {
123 get {
123 get {
124 return typeof(T);
124 return typeof(T);
125 }
125 }
126 }
126 }
127
127
128 public bool IsResolved {
128 public bool IsFulfilled {
129 get {
129 get {
130 return true;
130 return true;
131 }
131 }
132 }
132 }
133
133
134 public bool IsCancelled {
134 public bool IsCancelled {
135 get {
135 get {
136 return false;
136 return false;
137 }
137 }
138 }
138 }
139
139
140 public Exception Error {
140 public Exception RejectReason {
141 get {
141 get {
142 return null;
142 return null;
143 }
143 }
144 }
144 }
145
145
146 public void Cancel() {
146 public void Cancel() {
147 }
147 }
148
148
149 public void Cancel(Exception reason) {
149 public void Cancel(Exception reason) {
150 }
150 }
151 }
151 }
152 }
152 }
153
153
@@ -1,5 +1,6
1 <?xml version="1.0" encoding="utf-8"?>
1 <?xml version="1.0" encoding="utf-8"?>
2 <repositories>
2 <repositories>
3 <repository path="../Implab.Test/Implab.Format.Test/packages.config" />
3 <repository path="../Implab.Test/Implab.Format.Test/packages.config" />
4 <repository path="../Implab.Test/packages.config" />
4 <repository path="../MonoPlay/packages.config" />
5 <repository path="../MonoPlay/packages.config" />
5 </repositories> No newline at end of file
6 </repositories>
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

ok, latest stable version should be in default

You need to be logged in to leave comments. Login now