##// END OF EJS Templates
Working on promises
cin -
r242:cbe10ac0731e v3
parent child
Show More
@@ -1,204 +1,204
1 1 using Implab.Components;
2 2 using Implab.Diagnostics;
3 3 using Implab.Parallels;
4 4 using System;
5 5 using System.Collections.Generic;
6 6 using System.Linq;
7 7 using System.Text;
8 8 using System.Threading;
9 9 using System.Threading.Tasks;
10 10 using System.Windows.Forms;
11 11
12 12 namespace Implab.Fx {
13 13 public class StaApartment : RunnableComponent {
14 14 readonly Thread m_worker;
15 15 SynchronizationContext m_syncContext;
16 16 SyncContextPromise m_enterPromise;
17 17
18 18 readonly Promise m_threadStarted;
19 19 readonly Promise m_threadTerminated;
20 20
21 21 public StaApartment() : base(true) {
22 22 m_threadStarted = new Promise();
23 23 m_threadTerminated = new Promise();
24 24
25 25 m_worker = new Thread(WorkerEntry);
26 26 m_worker.SetApartmentState(ApartmentState.STA);
27 27 m_worker.IsBackground = true;
28 28 m_worker.Name = "STA managed aparment";
29 29 }
30 30
31 31 public SynchronizationContext SyncContext {
32 32 get {
33 33 if (m_syncContext == null)
34 34 throw new InvalidOperationException();
35 35 return m_syncContext;
36 36 }
37 37 }
38 38
39 39 /// <summary>
40 40 /// Returns the promise which will dispatch all handlers inside the apartment using it's <see cref="SynchronizationContext"/>
41 41 /// </summary>
42 42 /// <remarks>
43 43 /// Current implementation is optimized and will lost aync operation stack
44 44 /// </remarks>
45 45 /// <returns>The promise</returns>
46 46 public IPromise Enter() {
47 47 if (m_enterPromise == null)
48 48 throw new InvalidOperationException();
49 49 return m_enterPromise;
50 50 }
51 51
52 52 public IPromise Invoke(Action<ICancellationToken> action) {
53 53 Safe.ArgumentNotNull(action, "action");
54 54
55 55 if (m_syncContext == null)
56 56 throw new InvalidOperationException();
57 57 var p = new Promise();
58 58 var lop = TraceContext.Instance.CurrentOperation;
59 59
60 60 m_syncContext.Post(x => {
61 61 TraceContext.Instance.EnterLogicalOperation(lop, false);
62 62 try {
63 63 if (p.CancelOperationIfRequested())
64 64 return;
65 65
66 66 action(p);
67 67 p.Resolve();
68 68 } catch (Exception e) {
69 69 p.Reject(e);
70 70 } finally {
71 71 TraceContext.Instance.Leave();
72 72 }
73 73 }, null);
74 74
75 75 return p;
76 76 }
77 77
78 78 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> action) {
79 79 Safe.ArgumentNotNull(action, "action");
80 80
81 81 if (m_syncContext == null)
82 82 throw new InvalidOperationException();
83 83 var p = new Promise<T>();
84 84 var lop = TraceContext.Instance.CurrentOperation;
85 85
86 86 m_syncContext.Post(x => {
87 87 TraceContext.Instance.EnterLogicalOperation(lop, false);
88 88 try {
89 89 if (p.CancelOperationIfRequested())
90 90 return;
91 91 p.Resolve(action(p));
92 92 } catch (Exception e) {
93 93 p.Reject(e);
94 94 } finally {
95 95 TraceContext.Instance.Leave();
96 96 }
97 97 }, null);
98 98
99 99 return p;
100 100 }
101 101
102 102 public IPromise Invoke(Action action) {
103 103 Safe.ArgumentNotNull(action, "action");
104 104
105 105 if (m_syncContext == null)
106 106 throw new InvalidOperationException();
107 107 var p = new Promise();
108 108 var lop = TraceContext.Instance.CurrentOperation;
109 109
110 110 m_syncContext.Post(x => {
111 111 TraceContext.Instance.EnterLogicalOperation(lop, false);
112 112 try {
113 113 if (p.CancelOperationIfRequested())
114 114 return;
115 115 action();
116 116 p.Resolve();
117 117 } catch (Exception e) {
118 118 p.Reject(e);
119 119 } finally {
120 120 TraceContext.Instance.Leave();
121 121 }
122 122 }, null);
123 123
124 124 return p;
125 125 }
126 126
127 127 public IPromise<T> Invoke<T>(Func<T> action) {
128 128 Safe.ArgumentNotNull(action, "action");
129 129
130 130 if (m_syncContext == null)
131 131 throw new InvalidOperationException();
132 132 var p = new Promise<T>();
133 133 var lop = TraceContext.Instance.CurrentOperation;
134 134
135 135 m_syncContext.Post(x => {
136 136 TraceContext.Instance.EnterLogicalOperation(lop, false);
137 137 try {
138 138 if (p.CancelOperationIfRequested())
139 139 return;
140 140 p.Resolve(action());
141 141 } catch (Exception e) {
142 142 p.Reject(e);
143 143 } finally {
144 144 TraceContext.Instance.Leave();
145 145 }
146 146 }, null);
147 147
148 148 return p;
149 149 }
150 150
151 151
152 152 /// <summary>
153 153 /// Starts the apartment thread
154 154 /// </summary>
155 155 /// <returns>Promise which will be fullfiled when the syncronization
156 156 /// context will be ready to accept tasks.</returns>
157 157 protected override IPromise OnStart() {
158 158 m_worker.Start();
159 159 return m_threadStarted;
160 160 }
161 161
162 162 /// <summary>
163 163 /// Posts quit message to the message loop of the apartment
164 164 /// </summary>
165 165 /// <returns>Promise</returns>
166 166 protected override IPromise OnStop() {
167 167 m_syncContext.Post(x => Application.ExitThread(), null);
168 168 return m_threadTerminated;
169 169 }
170 170
171 171 void WorkerEntry() {
172 172 m_syncContext = new WindowsFormsSynchronizationContext();
173 173 SynchronizationContext.SetSynchronizationContext(m_syncContext);
174 174 m_enterPromise = new SyncContextPromise(m_syncContext);
175 175 m_threadStarted.Resolve();
176 176 m_enterPromise.Resolve();
177 177
178 178 Application.OleRequired();
179 179 Application.Run();
180 180
181 181 try {
182 182 OnShutdown();
183 183 m_threadTerminated.Resolve();
184 184 } catch(Exception err) {
185 185 m_threadTerminated.Reject(err);
186 186 }
187 187 }
188 188
189 189 /// <summary>
190 190 /// Called from the STA apartment after the message loop is terminated, override this
191 191 /// method to handle apartment cleanup.
192 192 /// </summary>
193 193 protected virtual void OnShutdown() {
194 194 }
195 195
196 196 protected override void Dispose(bool disposing) {
197 197 if (disposing) {
198 if (!m_threadTerminated.IsResolved)
198 if (!m_threadTerminated.IsFulfilled)
199 199 m_syncContext.Post(x => Application.ExitThread(), null);
200 200 }
201 201 base.Dispose(disposing);
202 202 }
203 203 }
204 204 }
@@ -1,148 +1,148
1 1 using System;
2 2 using Implab.Parallels;
3 3
4 4 #if MONO
5 5
6 6 using NUnit.Framework;
7 7 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
8 8 using TestMethodAttribute = NUnit.Framework.TestAttribute;
9 9
10 10 #else
11 11
12 12 using Microsoft.VisualStudio.TestTools.UnitTesting;
13 13
14 14 #endif
15 15
16 16 namespace Implab.Test {
17 17 [TestClass]
18 18 public class CancelationTests {
19 19
20 20 [TestMethod]
21 21 public void PromiseCancelTest() {
22 22 var p = new Promise();
23 23 bool requested = false;
24 24 var reason = new Exception("Test");
25 25
26 26 // request cancelation
27 27 p.Cancel(reason);
28 28
29 29 Assert.IsTrue(p.IsCancellationRequested);
30 30 Assert.AreSame(reason, p.CancellationReason);
31 31 Assert.IsFalse(p.IsCancelled);
32 32
33 33 p.CancellationRequested(r => {
34 34 Assert.AreSame(reason, r);
35 35 requested = true;
36 36 });
37 37
38 38 Assert.IsTrue(requested);
39 39
40 40 // cancel the promise
41 41 Assert.IsTrue(p.CancelOperationIfRequested());
42 42 Assert.IsTrue(p.IsCancelled);
43 Assert.AreSame(reason, p.Error);
43 Assert.AreSame(reason, p.RejectReason);
44 44 }
45 45
46 46 [TestMethod]
47 47 public void CancelActionBeforeStartTask() {
48 48 bool run = false;
49 49 var task = new ActionTask(() => {
50 50 run = true;
51 51 }, null, null, true);
52 52
53 53 // request cancelation
54 54 task.Cancel();
55 55 Assert.IsTrue(task.IsCancelled);
56 56 task.Resolve();
57 57 Assert.IsFalse(run);
58 58 }
59 59
60 60 [TestMethod]
61 61 public void CancelActionAfterTaskStarted() {
62 62 var finish = new Signal();
63 63 var started = new Signal();
64 64
65 65 var task = new ActionTask(() => {
66 66 started.Set();
67 67 finish.Wait();
68 68 }, null, null, true);
69 69
70 70 AsyncPool.RunThread(() => {
71 71 task.Resolve();
72 72 });
73 73
74 74 started.Wait(1000);
75 75
76 76 task.Cancel();
77 77 Assert.IsTrue(task.IsCancellationRequested);
78 78 Assert.IsFalse(task.IsCancelled);
79 Assert.IsFalse(task.IsResolved);
79 Assert.IsFalse(task.IsFulfilled);
80 80
81 81 finish.Set();
82 82 task.Join(1000);
83 83
84 84 }
85 85
86 86 [TestMethod]
87 87 public void CancelTaskChainFromBottom() {
88 88 var started = new Signal();
89 89 var check1 = new Signal();
90 90 var requested = false;
91 91 var p1 = AsyncPool.RunThread(token => {
92 92 token.CancellationRequested(reason => requested = true);
93 93 started.Set();
94 94 check1.Wait();
95 95 token.CancelOperationIfRequested();
96 96 });
97 97
98 98 started.Wait();
99 99
100 100 var p2 = p1.Then(() => {
101 101 });
102 102
103 103 Assert.IsFalse(p1.IsResolved);
104 104 Assert.IsFalse(p2.IsResolved);
105 105
106 106 p2.Cancel();
107 107
108 108 Assert.IsFalse(p2.IsCancelled);
109 109 Assert.IsFalse(p1.IsCancelled);
110 110 Assert.IsTrue(requested);
111 111
112 112 check1.Set();
113 113
114 114 try {
115 115 p2.Join(1000);
116 116 Assert.Fail("The chain isn't cancelled");
117 117 } catch(OperationCanceledException){
118 118 }
119 119
120 120 Assert.IsTrue(p1.IsCancelled);
121 121 Assert.IsTrue(p2.IsCancelled);
122 122 }
123 123
124 124
125 125
126 126 [TestMethod]
127 127 public void CancellableAsyncTask() {
128 128 var finish = new Signal();
129 129 var started = new Signal();
130 130
131 131 var p = AsyncPool.RunThread(token => {
132 132 token.CancellationRequested(r => finish.Set());
133 133 started.Set();
134 134 finish.Wait();
135 135 Assert.IsTrue(token.CancelOperationIfRequested());
136 136 });
137 137
138 138 started.Wait(1000);
139 139 Assert.IsFalse(p.IsResolved);
140 140 p.Cancel();
141 141 try {
142 142 p.Join(1000);
143 143 } catch (OperationCanceledException) {
144 144 }
145 145 }
146 146 }
147 147 }
148 148
@@ -1,298 +1,170
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Reflection;
5 using System.Diagnostics;
5 6
6 7 namespace Implab {
7 public abstract class AbstractEvent<THandler> : ICancellable {
8 public abstract class AbstractEvent<THandler> where THandler : class {
8 9
9 const int UNRESOLVED_SATE = 0;
10 const int TRANSITIONAL_STATE = 1;
10 const int PENDING_SATE = 0;
11 protected const int TRANSITIONAL_STATE = 1;
12
11 13 protected const int SUCCEEDED_STATE = 2;
12 14 protected const int REJECTED_STATE = 3;
13 protected const int CANCELLED_STATE = 4;
14 15
15 const int CANCEL_NOT_REQUESTED = 0;
16 const int CANCEL_REQUESTING = 1;
17 const int CANCEL_REQUESTED = 2;
18
19 const int RESERVED_HANDLERS_COUNT = 4;
20
21 int m_state;
16 volatile int m_state;
22 17 Exception m_error;
23 int m_handlersCount;
24 18
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
26 THandler[] m_handlers;
19 THandler m_handler;
27 20 SimpleAsyncQueue<THandler> m_extraHandlers;
28 int m_handlerPointer = -1;
29 int m_handlersCommited;
30
31 int m_cancelRequest;
32 Exception m_cancelationReason;
33 21
34 22 #region state managment
35 bool BeginTransit() {
36 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
23 protected bool BeginTransit() {
24 return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE);
37 25 }
38 26
39 void CompleteTransit(int state) {
27 protected void CompleteTransit(int state) {
28 #if DEBUG
40 29 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
41 30 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
31 #else
32 m_state = state;
33 #endif
34 Signal();
42 35 }
43 36
44 void WaitTransition() {
45 while (m_state == TRANSITIONAL_STATE) {
46 Thread.MemoryBarrier();
37 protected void WaitTransition() {
38 if (m_state == TRANSITIONAL_STATE) {
39 SpinWait spin;
40 do {
41 spin.SpinOnce();
42 } while (m_state == TRANSITIONAL_STATE);
47 43 }
48 44 }
49 45
50 46 protected bool BeginSetResult() {
51 47 if (!BeginTransit()) {
52 48 WaitTransition();
53 if (m_state != CANCELLED_STATE)
54 throw new InvalidOperationException("The promise is already resolved");
55 49 return false;
56 50 }
57 51 return true;
58 52 }
59 53
60 54 protected void EndSetResult() {
61 55 CompleteTransit(SUCCEEDED_STATE);
62 Signal();
63 56 }
64 57
65 58
66 59
67 60 /// <summary>
68 61 /// Выполняет обещание, сообщая об ошибке
69 62 /// </summary>
70 63 /// <remarks>
71 64 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
72 65 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
73 66 /// будут проигнорированы.
74 67 /// </remarks>
75 68 /// <param name="error">Исключение возникшее при выполнении операции</param>
76 69 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
77 70 protected void SetError(Exception error) {
78 71 if (BeginTransit()) {
79 72 m_error = error;
80 73 CompleteTransit(REJECTED_STATE);
81
82 Signal();
83 74 } else {
84 75 WaitTransition();
85 76 if (m_state == SUCCEEDED_STATE)
86 77 throw new InvalidOperationException("The promise is already resolved");
87 78 }
88 79 }
89 80
90 /// <summary>
91 /// Отменяет операцию, если это возможно.
92 /// </summary>
93 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
94 protected void SetCancelled(Exception reason) {
95 if (BeginTransit()) {
96 m_error = reason;
97 CompleteTransit(CANCELLED_STATE);
98 Signal();
99 }
100 }
101
102 81 protected abstract void SignalHandler(THandler handler, int signal);
103 82
104 83 void Signal() {
105 var hp = m_handlerPointer;
106 var slot = hp +1 ;
107 while (slot < m_handlersCommited) {
108 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
109 SignalHandler(m_handlers[slot], m_state);
110 }
111 hp = m_handlerPointer;
112 slot = hp +1 ;
113 }
114
115
116 if (m_extraHandlers != null) {
117 84 THandler handler;
118 while (m_extraHandlers.TryDequeue(out handler))
85 while (TryDequeueHandler(out handler))
119 86 SignalHandler(handler, m_state);
120 87 }
121 }
122 88
123 89 #endregion
124 90
125 protected abstract Signal GetResolveSignal();
91 protected abstract Signal GetFulfillSignal();
126 92
127 93 #region synchronization traits
128 94 protected void WaitResult(int timeout) {
129 if (!(IsResolved || GetResolveSignal().Wait(timeout)))
95 if (!(IsFulfilled || GetFulfillSignal().Wait(timeout)))
130 96 throw new TimeoutException();
131 97
132 switch (m_state) {
133 case SUCCEEDED_STATE:
134 return;
135 case CANCELLED_STATE:
136 throw new OperationCanceledException("The operation has been cancelled", m_error);
137 case REJECTED_STATE:
98 if (IsRejected)
99 Rethrow();
100 }
101
102 protected void Rethrow() {
103 Debug.Assert(m_error != null);
104 if (m_error is OperationCanceledException)
105 throw new OperationCanceledException("Operation cancelled", m_error);
106 else
138 107 throw new TargetInvocationException(m_error);
139 default:
140 throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state));
141 }
142 108 }
143 109 #endregion
144 110
145 111 #region handlers managment
146 112
147 113 protected void AddHandler(THandler handler) {
148 114
149 115 if (m_state > 1) {
150 116 // the promise is in the resolved state, just invoke the handler
151 117 SignalHandler(handler, m_state);
152 118 } else {
153 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
154
155 if (slot < RESERVED_HANDLERS_COUNT) {
156
157 if (slot == 0) {
158 m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
159 } else {
160 while (m_handlers == null)
161 Thread.MemoryBarrier();
162 }
163
164 m_handlers[slot] = handler;
165
166 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
119 if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) {
120 if (m_extraHandlers == null)
121 // compare-exchange will fprotect from loosing already created queue
122 Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null);
123 m_extraHandlers.Enqueue(handler);
167 124 }
168 125
169 if (m_state > 1) {
170 do {
171 var hp = m_handlerPointer;
172 slot = hp + 1;
173 if (slot < m_handlersCommited) {
174 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
175 continue;
176 SignalHandler(m_handlers[slot], m_state);
177 }
178 break;
179 } while(true);
180 }
181 } else {
182 if (slot == RESERVED_HANDLERS_COUNT) {
183 m_extraHandlers = new SimpleAsyncQueue<THandler>();
184 } else {
185 while (m_extraHandlers == null)
186 Thread.MemoryBarrier();
187 }
188
189 m_extraHandlers.Enqueue(handler);
190
191 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
126 if (m_state > 1 && TryDequeueHandler(out handler))
192 127 // if the promise have been resolved while we was adding the handler to the queue
193 128 // we can't guarantee that someone is still processing it
194 129 // therefore we need to fetch a handler from the queue and execute it
195 130 // note that fetched handler may be not the one that we have added
196 131 // even we can fetch no handlers at all :)
197 132 SignalHandler(handler, m_state);
198 133 }
134
199 135 }
136
137 bool TryDequeueHandler(out THandler handler) {
138 handler = Interlocked.Exchange(ref m_handler, null);
139 if (handler != null)
140 return true;
141 return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler);
200 142 }
201 143
202 144 #endregion
203 145
204 146 #region IPromise implementation
205 147
206 public bool IsResolved {
148 public bool IsFulfilled {
207 149 get {
208 Thread.MemoryBarrier();
209 return m_state > 1;
150 return m_state > TRANSITIONAL_STATE;
210 151 }
211 152 }
212 153
213 public bool IsCancelled {
154 public bool IsRejected {
214 155 get {
215 Thread.MemoryBarrier();
216 return m_state == CANCELLED_STATE;
156 return m_state == REJECTED_STATE;
217 157 }
218 158 }
219 159
220 160 #endregion
221 161
222 public Exception Error {
162 public Exception RejectReason {
223 163 get {
224 164 return m_error;
225 165 }
226 166 }
227 167
228 public bool CancelOperationIfRequested() {
229 if (IsCancellationRequested) {
230 CancelOperation(CancellationReason);
231 return true;
232 }
233 return false;
234 }
235
236 public virtual void CancelOperation(Exception reason) {
237 SetCancelled(reason);
238 }
239
240 public void CancellationRequested(Action<Exception> handler) {
241 Safe.ArgumentNotNull(handler, "handler");
242 if (IsCancellationRequested)
243 handler(CancellationReason);
244
245 if (m_cancelationHandlers == null)
246 Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null);
247
248 m_cancelationHandlers.Enqueue(handler);
249
250 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
251 // TryDeque implies MemoryBarrier()
252 handler(m_cancelationReason);
253 }
254
255 public bool IsCancellationRequested {
256 get {
257 do {
258 if (m_cancelRequest == CANCEL_NOT_REQUESTED)
259 return false;
260 if (m_cancelRequest == CANCEL_REQUESTED)
261 return true;
262 Thread.MemoryBarrier();
263 } while(true);
264 168 }
265 169 }
266 170
267 public Exception CancellationReason {
268 get {
269 do {
270 Thread.MemoryBarrier();
271 } while(m_cancelRequest == CANCEL_REQUESTING);
272
273 return m_cancelationReason;
274 }
275 }
276
277 #region ICancellable implementation
278
279 public void Cancel() {
280 Cancel(null);
281 }
282
283 public void Cancel(Exception reason) {
284 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
285 m_cancelationReason = reason;
286 m_cancelRequest = CANCEL_REQUESTED;
287 if (m_cancelationHandlers != null) {
288 Action<Exception> handler;
289 while (m_cancelationHandlers.TryDequeue(out handler))
290 handler(m_cancelationReason);
291 }
292 }
293 }
294
295 #endregion
296 }
297 }
298
@@ -1,140 +1,87
1 1 using System;
2 2 using Implab.Parallels;
3 3
4 4 namespace Implab {
5 5 public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise {
6 public struct HandlerDescriptor {
6 public class HandlerDescriptor {
7 7 readonly Action m_handler;
8 8 readonly Action<Exception> m_error;
9 readonly Action<Exception> m_cancel;
10 readonly PromiseEventType m_mask;
11
12 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
9 public HandlerDescriptor(Action success, Action<Exception> error) {
13 10 m_handler = success;
14 11 m_error = error;
15 m_cancel = cancel;
16 m_mask = PromiseEventType.Success;
17 }
18
19 public HandlerDescriptor(Action handler, PromiseEventType mask) {
20 m_handler = handler;
21 m_error = null;
22 m_cancel = null;
23 m_mask = mask;
24 12 }
25 13
26 14 public void SignalSuccess() {
27 if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
15 if (m_handler != null) {
28 16 try {
29 17 m_handler();
30 18 // Analysis disable once EmptyGeneralCatchClause
31 19 } catch {
32 20 }
33 21 }
34 22 }
35 23
36 24 public void SignalError(Exception err) {
37 25 if (m_error != null) {
38 26 try {
39 27 m_error(err);
40 28 // Analysis disable once EmptyGeneralCatchClause
41 29 } catch {
42 30 }
43 } else if ((m_mask & PromiseEventType.Error ) != 0 && m_handler != null) {
44 try {
45 m_handler();
46 // Analysis disable once EmptyGeneralCatchClause
47 } catch {
48 }
49 }
50 }
51
52 public void SignalCancel(Exception reason) {
53 if (m_cancel != null) {
54 try {
55 m_cancel(reason);
56 // Analysis disable once EmptyGeneralCatchClause
57 } catch {
58 }
59 } else if ( (m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
60 try {
61 m_handler();
62 // Analysis disable once EmptyGeneralCatchClause
63 } catch {
64 }
65 31 }
66 32 }
67 33 }
68 34
69 35
70 36 #region implemented abstract members of AbstractPromise
71 37
72 38 protected override void SignalHandler(HandlerDescriptor handler, int signal) {
73 39 switch (signal) {
74 40 case SUCCEEDED_STATE:
75 41 handler.SignalSuccess();
76 42 break;
77 43 case REJECTED_STATE:
78 handler.SignalError(Error);
79 break;
80 case CANCELLED_STATE:
81 handler.SignalCancel(CancellationReason);
44 handler.SignalError(RejectReason);
82 45 break;
83 46 default:
84 47 throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal));
85 48 }
86 49 }
87 50
88 protected override Signal GetResolveSignal() {
51 protected override Signal GetFulfillSignal() {
89 52 var signal = new Signal();
90 On(signal.Set, PromiseEventType.All);
53 On(signal.Set, e => signal.Set());
91 54 return signal;
92 55 }
93 56
94 57 #endregion
95 58
96 public Type PromiseType {
59 public Type ResultType {
97 60 get {
98 61 return typeof(void);
99 62 }
100 63 }
101 64
102 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
103 AddHandler(new HandlerDescriptor(success, error, cancel));
104 return this;
105 }
106
107 public IPromise On(Action success, Action<Exception> error) {
108 AddHandler(new HandlerDescriptor(success, error, null));
109 return this;
110 }
111
112 public IPromise On(Action success) {
113 AddHandler(new HandlerDescriptor(success, null, null));
114 return this;
115 }
116
117 public IPromise On(Action handler, PromiseEventType events) {
118 AddHandler(new HandlerDescriptor(handler,events));
119 return this;
65 public void On(Action success, Action<Exception> error) {
66 AddHandler(new HandlerDescriptor(success, error));
120 67 }
121 68
122 69 public IPromise<T> Cast<T>() {
123 70 throw new InvalidCastException();
124 71 }
125 72
126 73 public void Join() {
127 74 WaitResult(-1);
128 75 }
129 76
130 77 public void Join(int timeout) {
131 78 WaitResult(timeout);
132 79 }
133 80
134 81 protected void SetResult() {
135 82 if(BeginSetResult())
136 83 EndSetResult();
137 84 }
138 85 }
139 86 }
140 87
@@ -1,204 +1,204
1 1 using System;
2 2 using Implab.Parallels;
3 3
4 4 namespace Implab {
5 5 public abstract class AbstractPromise<T> : AbstractEvent<AbstractPromise<T>.HandlerDescriptor>, IPromise<T> {
6 6 public struct HandlerDescriptor {
7 7 readonly Action m_handler;
8 8 readonly Action<T> m_success;
9 9 readonly Action<Exception> m_error;
10 10 readonly Action<Exception> m_cancel;
11 11 readonly PromiseEventType m_mask;
12 12
13 13 public HandlerDescriptor(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
14 14 m_success = success;
15 15 m_error = error;
16 16 m_cancel = cancel;
17 17
18 18 m_handler = null;
19 19 m_mask = 0;
20 20 }
21 21
22 22 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
23 23 m_handler = success;
24 24 m_success = null;
25 25 m_error = error;
26 26 m_cancel = cancel;
27 27 m_mask = PromiseEventType.Success;
28 28 }
29 29
30 30 public HandlerDescriptor(Action handler, PromiseEventType mask) {
31 31 m_handler = handler;
32 32 m_mask = mask;
33 33 m_success = null;
34 34 m_error = null;
35 35 m_cancel = null;
36 36 }
37 37
38 38 public void SignalSuccess(T result) {
39 39 if (m_success != null) {
40 40 try {
41 41 m_success(result);
42 42 // Analysis disable once EmptyGeneralCatchClause
43 43 } catch {
44 44 }
45 45 } else if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
46 46 try {
47 47 m_handler();
48 48 // Analysis disable once EmptyGeneralCatchClause
49 49 } catch {
50 50 }
51 51 }
52 52 }
53 53
54 54 public void SignalError(Exception err) {
55 55 if (m_error != null) {
56 56 try {
57 57 m_error(err);
58 58 // Analysis disable once EmptyGeneralCatchClause
59 59 } catch {
60 60 }
61 61 } else if ((m_mask & PromiseEventType.Error) != 0 && m_handler != null) {
62 62 try {
63 63 m_handler();
64 64 // Analysis disable once EmptyGeneralCatchClause
65 65 } catch {
66 66 }
67 67 }
68 68 }
69 69
70 70 public void SignalCancel(Exception reason) {
71 71 if (m_cancel != null) {
72 72 try {
73 73 m_cancel(reason);
74 74 // Analysis disable once EmptyGeneralCatchClause
75 75 } catch {
76 76 }
77 77 } else if ((m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
78 78 try {
79 79 m_handler();
80 80 // Analysis disable once EmptyGeneralCatchClause
81 81 } catch {
82 82 }
83 83 }
84 84 }
85 85 }
86 86
87 public Type PromiseType {
87 public Type ResultType {
88 88 get {
89 89 return typeof(T);
90 90 }
91 91 }
92 92
93 93 public T Join() {
94 94 WaitResult(-1);
95 95 return m_result;
96 96 }
97 97 public T Join(int timeout) {
98 98 WaitResult(timeout);
99 99 return m_result;
100 100 }
101 101
102 102 void IPromise.Join() {
103 103 WaitResult(-1);
104 104 }
105 105 void IPromise.Join(int timeout) {
106 106 WaitResult(timeout);
107 107 }
108 108
109 109 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
110 110 AddHandler(new HandlerDescriptor(success, error, cancel));
111 111 return this;
112 112 }
113 113
114 114 public IPromise<T> On(Action<T> success, Action<Exception> error) {
115 115 AddHandler(new HandlerDescriptor(success, error, null));
116 116 return this;
117 117 }
118 118
119 119 public IPromise<T> On(Action<T> success) {
120 120 AddHandler(new HandlerDescriptor(success, null, null));
121 121 return this;
122 122 }
123 123
124 124 public IPromise<T> On(Action handler, PromiseEventType events) {
125 125 AddHandler(new HandlerDescriptor(handler, events));
126 126 return this;
127 127 }
128 128
129 129 public IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel) {
130 130 AddHandler(new HandlerDescriptor(success, error, cancel));
131 131 return this;
132 132 }
133 133
134 134 public IPromise<T> On(Action success, Action<Exception> error) {
135 135 AddHandler(new HandlerDescriptor(success, error, null));
136 136 return this;
137 137 }
138 138
139 139 public IPromise<T> On(Action success) {
140 140 AddHandler(new HandlerDescriptor(success, null, null));
141 141 return this;
142 142 }
143 143
144 144 IPromise IPromise.On(Action success, Action<Exception> error, Action<Exception> cancel) {
145 145 AddHandler(new HandlerDescriptor(success, error, cancel));
146 146 return this;
147 147 }
148 148
149 149 IPromise IPromise.On(Action success, Action<Exception> error) {
150 150 AddHandler(new HandlerDescriptor(success, error, null));
151 151 return this;
152 152 }
153 153
154 154 IPromise IPromise.On(Action success) {
155 155 AddHandler(new HandlerDescriptor(success, null, null));
156 156 return this;
157 157 }
158 158
159 159 IPromise IPromise.On(Action handler, PromiseEventType events) {
160 160 AddHandler(new HandlerDescriptor(handler, events));
161 161 return this;
162 162 }
163 163
164 164 public IPromise<T2> Cast<T2>() {
165 165 return (IPromise<T2>)this;
166 166 }
167 167
168 168 #region implemented abstract members of AbstractPromise
169 169
170 protected override Signal GetResolveSignal() {
170 protected override Signal GetFulfillSignal() {
171 171 var signal = new Signal();
172 172 AddHandler(new HandlerDescriptor(signal.Set, PromiseEventType.All));
173 173 return signal;
174 174 }
175 175
176 176 protected override void SignalHandler(HandlerDescriptor handler, int signal) {
177 177 switch (signal) {
178 178 case SUCCEEDED_STATE:
179 179 handler.SignalSuccess(m_result);
180 180 break;
181 181 case REJECTED_STATE:
182 handler.SignalError(Error);
182 handler.SignalError(RejectReason);
183 183 break;
184 184 case CANCELLED_STATE:
185 185 handler.SignalCancel(CancellationReason);
186 186 break;
187 187 default:
188 188 throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal));
189 189 }
190 190 }
191 191
192 192 #endregion
193 193
194 194 T m_result;
195 195
196 196 protected void SetResult(T value) {
197 197 if (BeginSetResult()) {
198 198 m_result = value;
199 199 EndSetResult();
200 200 }
201 201 }
202 202 }
203 203 }
204 204
@@ -1,100 +1,100
1 1 using System;
2 2 using System.Reflection;
3 3
4 4 namespace Implab {
5 5 public class FailedPromise : IPromise {
6 6 readonly Exception m_error;
7 7 public FailedPromise(Exception error) {
8 8 Safe.ArgumentNotNull(error, "error");
9 9 m_error = error;
10 10 }
11 11
12 12 #region IPromise implementation
13 13
14 14 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
15 15 if (error != null) {
16 16 try {
17 17 error(m_error);
18 18 // Analysis disable once EmptyGeneralCatchClause
19 19 } catch {
20 20 }
21 21 }
22 22 return this;
23 23 }
24 24
25 25 public IPromise On(Action success, Action<Exception> error) {
26 26 if (error != null) {
27 27 try {
28 28 error(m_error);
29 29 // Analysis disable once EmptyGeneralCatchClause
30 30 } catch {
31 31 }
32 32 }
33 33 return this;
34 34 }
35 35
36 36 public IPromise On(Action success) {
37 37 return this;
38 38 }
39 39
40 40 public IPromise On(Action handler, PromiseEventType events) {
41 41 if ((events & PromiseEventType.Error) != 0) {
42 42 try {
43 43 handler();
44 44 // Analysis disable once EmptyGeneralCatchClause
45 45 } catch {
46 46 }
47 47 }
48 48 return this;
49 49 }
50 50
51 51 public IPromise<T> Cast<T>() {
52 52 return (IPromise<T>)this;
53 53 }
54 54
55 55 public void Join() {
56 throw new TargetInvocationException(Error);
56 throw new TargetInvocationException(RejectReason);
57 57 }
58 58
59 59 public void Join(int timeout) {
60 throw new TargetInvocationException(Error);
60 throw new TargetInvocationException(RejectReason);
61 61 }
62 62
63 public virtual Type PromiseType {
63 public virtual Type ResultType {
64 64 get {
65 65 return typeof(void);
66 66 }
67 67 }
68 68
69 public bool IsResolved {
69 public bool IsFulfilled {
70 70 get {
71 71 return true;
72 72 }
73 73 }
74 74
75 75 public bool IsCancelled {
76 76 get {
77 77 return false;
78 78 }
79 79 }
80 80
81 public Exception Error {
81 public Exception RejectReason {
82 82 get {
83 83 return m_error;
84 84 }
85 85 }
86 86
87 87 #endregion
88 88
89 89 #region ICancellable implementation
90 90
91 91 public void Cancel() {
92 92 }
93 93
94 94 public void Cancel(Exception reason) {
95 95 }
96 96
97 97 #endregion
98 98 }
99 99 }
100 100
@@ -1,65 +1,65
1 1 using System;
2 2 using System.Reflection;
3 3
4 4 namespace Implab {
5 5 public class FailedPromise<T> : FailedPromise, IPromise<T> {
6 6 public FailedPromise(Exception error) : base(error) {
7 7 }
8 8
9 9 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
10 10 if (error != null) {
11 11 try {
12 error(Error);
12 error(RejectReason);
13 13 // Analysis disable once EmptyGeneralCatchClause
14 14 } catch {
15 15 }
16 16 }
17 17 return this;
18 18 }
19 19
20 20 public IPromise<T> On(Action<T> success, Action<Exception> error) {
21 21 if (error != null) {
22 22 try {
23 error(Error);
23 error(RejectReason);
24 24 // Analysis disable once EmptyGeneralCatchClause
25 25 } catch {
26 26 }
27 27 }
28 28 return this;
29 29 }
30 30
31 31 public IPromise<T> On(Action<T> success) {
32 32 return this;
33 33 }
34 34
35 35 T IPromise<T>.Join() {
36 throw new TargetInvocationException(Error);
36 throw new TargetInvocationException(RejectReason);
37 37 }
38 38
39 39 T IPromise<T>.Join(int timeout) {
40 throw new TargetInvocationException(Error);
40 throw new TargetInvocationException(RejectReason);
41 41 }
42 42
43 43
44 44 IPromise<T> IPromise<T>.On(Action success, Action<Exception> error, Action<Exception> cancel) {
45 45 On(success, error, cancel);
46 46 return this;
47 47 }
48 48
49 49 IPromise<T> IPromise<T>.On(Action success, Action<Exception> error) {
50 50 On(success, error);
51 51 return this;
52 52 }
53 53
54 54 IPromise<T> IPromise<T>.On(Action success) {
55 55 On(success);
56 56 return this;
57 57 }
58 58
59 59 IPromise<T> IPromise<T>.On(Action handler, PromiseEventType events) {
60 60 On(handler, events);
61 61 return this;
62 62 }
63 63 }
64 64 }
65 65
@@ -1,66 +1,53
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab {
7 7 public interface IPromise: ICancellable {
8 8
9 9 /// <summary>
10 10 /// Тип результата, получаемого через данное обещание.
11 11 /// </summary>
12 Type PromiseType { get; }
12 Type ResultType { get; }
13 13
14 14 /// <summary>
15 15 /// Обещание является выполненым, либо успешно, либо с ошибкой, либо отменено.
16 16 /// </summary>
17 bool IsResolved { get; }
17 bool IsFulfilled { get; }
18 18
19 /// <summary>
20 /// Обещание было отменено.
21 /// </summary>
22 bool IsCancelled { get; }
19 bool IsRejected { get; }
20
21 bool IsResolved { get; }
23 22
24 23 /// <summary>
25 24 /// Исключение возникшее в результате выполнения обещания, либо причина отмены.
26 25 /// </summary>
27 Exception Error { get; }
26 Exception RejectReason { get; }
28 27
29 28 /// <summary>
30 29 /// Adds specified listeners to the current promise.
31 30 /// </summary>
32 31 /// <param name="success">The handler called on the successful promise completion.</param>
33 32 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
34 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
35 33 /// <returns>The current promise.</returns>
36 IPromise On(Action success, Action<Exception> error, Action<Exception> cancel);
37 IPromise On(Action success, Action<Exception> error);
38 IPromise On(Action success);
39
40 /// <summary>
41 /// Adds specified listeners to the current promise.
42 /// </summary>
43 /// <param name="handler">The handler called on the specified events.</param>
44 /// <param name = "events">The combination of flags denoting the events for which the
45 /// handler shoud be called.</param>
46 /// <returns>The current promise.</returns>
47 IPromise On(Action handler, PromiseEventType events);
34 void On(Action success, Action<Exception> error);
48 35
49 36 /// <summary>
50 37 /// Преобразует результат обещания к заданному типу и возвращает новое обещание.
51 38 /// </summary>
52 39 IPromise<T> Cast<T>();
53 40
54 41 /// <summary>
55 42 /// Синхронизирует текущий поток с обещанием.
56 43 /// </summary>
57 44 void Join();
58 45 /// <summary>
59 46 /// Синхронизирует текущий поток с обещанием.
60 47 /// </summary>
61 48 /// <param name="timeout">Время ожидания, по его истечению возникнет исключение.</param>
62 49 /// <exception cref="TimeoutException">Превышено время ожидания.</exception>
63 50 void Join(int timeout);
64 51
65 52 }
66 53 }
@@ -1,25 +1,12
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public interface IPromise<out T> : IPromise {
5 5
6 IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel);
7
8 IPromise<T> On(Action<T> success, Action<Exception> error);
9
10 IPromise<T> On(Action<T> success);
6 void On(Action<T> success, Action<Exception> error);
11 7
12 8 new T Join();
13 9
14 10 new T Join(int timeout);
15
16 new IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel);
17
18 new IPromise<T> On(Action success, Action<Exception> error);
19
20 new IPromise<T> On(Action success);
21
22 new IPromise<T> On(Action handler, PromiseEventType events);
23
24 11 }
25 12 }
@@ -1,207 +1,207
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Diagnostics;
4 4 using System.Threading;
5 5
6 6 namespace Implab.Parallels {
7 7 public static class ArrayTraits {
8 8 class ArrayIterator<TSrc> : DispatchPool<int> {
9 9 readonly Action<TSrc> m_action;
10 10 readonly TSrc[] m_source;
11 11 readonly Promise<int> m_promise = new Promise<int>();
12 12 readonly LogicalOperation m_logicalOperation;
13 13
14 14 int m_pending;
15 15 int m_next;
16 16
17 17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
18 18 : base(threads) {
19 19
20 20 Debug.Assert(source != null);
21 21 Debug.Assert(action != null);
22 22
23 23 m_logicalOperation = TraceContext.Instance.CurrentOperation;
24 24 m_next = 0;
25 25 m_source = source;
26 26 m_pending = source.Length;
27 27 m_action = action;
28 28
29 29 m_promise.On(Dispose, PromiseEventType.All);
30 30
31 31 InitPool();
32 32 }
33 33
34 34 public Promise<int> Promise {
35 35 get {
36 36 return m_promise;
37 37 }
38 38 }
39 39
40 40 protected override void Worker() {
41 41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
42 42 try {
43 43 base.Worker();
44 44 } finally {
45 45 TraceContext.Instance.Leave();
46 46 }
47 47 }
48 48
49 49 protected override bool TryDequeue(out int unit) {
50 50 unit = Interlocked.Increment(ref m_next) - 1;
51 51 return unit < m_source.Length;
52 52 }
53 53
54 54 protected override void InvokeUnit(int unit) {
55 55 try {
56 56 m_action(m_source[unit]);
57 57 var pending = Interlocked.Decrement(ref m_pending);
58 58 if (pending == 0)
59 59 m_promise.Resolve(m_source.Length);
60 60 } catch (Exception e) {
61 61 m_promise.Reject(e);
62 62 }
63 63 }
64 64 }
65 65
66 66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
67 67 readonly Func<TSrc, TDst> m_transform;
68 68 readonly TSrc[] m_source;
69 69 readonly TDst[] m_dest;
70 70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
71 71 readonly LogicalOperation m_logicalOperation;
72 72
73 73 int m_pending;
74 74 int m_next;
75 75
76 76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
77 77 : base(threads) {
78 78
79 79 Debug.Assert (source != null);
80 80 Debug.Assert( transform != null);
81 81
82 82 m_next = 0;
83 83 m_source = source;
84 84 m_dest = new TDst[source.Length];
85 85 m_pending = source.Length;
86 86 m_transform = transform;
87 87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
88 88
89 89 m_promise.On(Dispose, PromiseEventType.All);
90 90
91 91 InitPool();
92 92 }
93 93
94 94 public Promise<TDst[]> Promise {
95 95 get {
96 96 return m_promise;
97 97 }
98 98 }
99 99
100 100 protected override void Worker() {
101 101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
102 102 try {
103 103 base.Worker();
104 104 } finally {
105 105 TraceContext.Instance.Leave();
106 106 }
107 107 }
108 108
109 109 protected override bool TryDequeue(out int unit) {
110 110 unit = Interlocked.Increment(ref m_next) - 1;
111 111 return unit < m_source.Length;
112 112 }
113 113
114 114 protected override void InvokeUnit(int unit) {
115 115 try {
116 116 m_dest[unit] = m_transform(m_source[unit]);
117 117 var pending = Interlocked.Decrement(ref m_pending);
118 118 if (pending == 0)
119 119 m_promise.Resolve(m_dest);
120 120 } catch (Exception e) {
121 121 m_promise.Reject(e);
122 122 }
123 123 }
124 124 }
125 125
126 126 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
127 127 if (source == null)
128 128 throw new ArgumentNullException("source");
129 129 if (transform == null)
130 130 throw new ArgumentNullException("transform");
131 131
132 132 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
133 133 return mapper.Promise;
134 134 }
135 135
136 136 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
137 137 if (source == null)
138 138 throw new ArgumentNullException("source");
139 139 if (action == null)
140 140 throw new ArgumentNullException("action");
141 141
142 142 var iter = new ArrayIterator<TSrc>(source, action, threads);
143 143 return iter.Promise;
144 144 }
145 145
146 146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
147 147 if (source == null)
148 148 throw new ArgumentNullException("source");
149 149 if (transform == null)
150 150 throw new ArgumentNullException("transform");
151 151 if (threads <= 0)
152 152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
153 153
154 154 if (source.Length == 0)
155 155 return Promise<TDst[]>.FromResult(new TDst[0]);
156 156
157 157 var promise = new Promise<TDst[]>();
158 158 var res = new TDst[source.Length];
159 159 var pending = source.Length;
160 160
161 161 object locker = new object();
162 162 int slots = threads;
163 163
164 164 // Analysis disable AccessToDisposedClosure
165 165 AsyncPool.RunThread<int>(() => {
166 166 for (int i = 0; i < source.Length; i++) {
167 if(promise.IsResolved)
167 if(promise.IsFulfilled)
168 168 break; // stop processing in case of error or cancellation
169 169 var idx = i;
170 170
171 171 if (Interlocked.Decrement(ref slots) < 0) {
172 172 lock(locker) {
173 173 while(slots < 0)
174 174 Monitor.Wait(locker);
175 175 }
176 176 }
177 177
178 178 try {
179 179 transform(source[i])
180 180 .On( x => {
181 181 Interlocked.Increment(ref slots);
182 182 lock (locker) {
183 183 Monitor.Pulse(locker);
184 184 }
185 185 })
186 186 .On(
187 187 x => {
188 188 res[idx] = x;
189 189 var left = Interlocked.Decrement(ref pending);
190 190 if (left == 0)
191 191 promise.Resolve(res);
192 192 },
193 193 promise.Reject
194 194 );
195 195
196 196 } catch (Exception e) {
197 197 promise.Reject(e);
198 198 }
199 199 }
200 200 return 0;
201 201 });
202 202
203 203 return promise;
204 204 }
205 205
206 206 }
207 207 }
@@ -1,131 +1,140
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 5
6 6 namespace Implab.Parallels {
7 7 public class SimpleAsyncQueue<T> : IEnumerable<T> {
8 8 class Node {
9 9 public Node(T value) {
10 10 this.value = value;
11 11 }
12 12 public readonly T value;
13 13 public volatile Node next;
14 14 }
15 15
16 16 // the reader and the writer are mainteined completely independent,
17 17 // the reader can read next item when m_first.next is not null
18 // the writer creates the a new node, moves m_last to this node and
18 // the writer creates a new node, moves m_last to this node and
19 19 // only after that restores the reference from the previous node
20 // making available the reader to read the new node.
20 // making the reader be able to read the new node.
21 21
22 Node m_first; // position on the node which is already read
23 Node m_last; // position on the node which is already written
22 volatile Node m_first; // position on the node which is already read
23 volatile Node m_last; // position on the node which is already written
24 24
25 25 public SimpleAsyncQueue() {
26 26 m_first = m_last = new Node(default(T));
27 27 }
28 28
29 29 public void Enqueue(T value) {
30 30 var next = new Node(value);
31 31
32 32 // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
33 33 // to ensure that the next node is completely constructed
34 34 var last = Interlocked.Exchange(ref m_last, next);
35 35
36 36 // release-fence
37 37 last.next = next;
38 38
39 39 }
40 40
41 41 public bool TryDequeue(out T value) {
42 Node first;
43 Node next;
42 Node first = m_first; ;
43 Node next = first.next; ;
44 44
45 Thread.MemoryBarrier(); // ensure m_first is fresh
45 if (next == null) {
46 value = default(T);
47 return false;
48 }
49
50 var first2 = Interlocked.CompareExchange(ref m_first, next, first);
51
52 if (first != first2) {
53 // head is updated by someone else
54
46 55 SpinWait spin = new SpinWait();
47 56 do {
48 first = m_first;
49 // aquire-fence
57 first = first2;
50 58 next = first.next;
51 59 if (next == null) {
52 60 value = default(T);
53 61 return false;
54 62 }
55 63
56 if (first == Interlocked.CompareExchange(ref m_first, next, first))
57 // head succesfully updated
64 first2 = Interlocked.CompareExchange(ref m_first, next, first);
65 if (first == first2)
58 66 break;
59 67 spin.SpinOnce();
60 68 } while (true);
69 }
61 70
62 71 value = next.value;
63 72 return true;
64 73 }
65 74
66 75 #region IEnumerable implementation
67 76
68 77 class Enumerator : IEnumerator<T> {
69 78 Node m_current;
70 79 Node m_first;
71 80
72 81 public Enumerator(Node first) {
73 82 m_first = first;
74 83 }
75 84
76 85 #region IEnumerator implementation
77 86
78 87 public bool MoveNext() {
79 88 m_current = m_current == null ? m_first : m_current.next;
80 89 return m_current != null;
81 90 }
82 91
83 92 public void Reset() {
84 93 m_current = null;
85 94 }
86 95
87 96 object IEnumerator.Current {
88 97 get {
89 98 if (m_current == null)
90 99 throw new InvalidOperationException();
91 100 return m_current.value;
92 101 }
93 102 }
94 103
95 104 #endregion
96 105
97 106 #region IDisposable implementation
98 107
99 108 public void Dispose() {
100 109 }
101 110
102 111 #endregion
103 112
104 113 #region IEnumerator implementation
105 114
106 115 public T Current {
107 116 get {
108 117 if (m_current == null)
109 118 throw new InvalidOperationException();
110 119 return m_current.value;
111 120 }
112 121 }
113 122
114 123 #endregion
115 124 }
116 125
117 126 public IEnumerator<T> GetEnumerator() {
118 127 return new Enumerator(m_first);
119 128 }
120 129
121 130 #endregion
122 131
123 132 #region IEnumerable implementation
124 133
125 134 IEnumerator IEnumerable.GetEnumerator() {
126 135 return GetEnumerator();
127 136 }
128 137
129 138 #endregion
130 139 }
131 140 }
@@ -1,28 +1,28
1 1 using System;
2 2 using System.Runtime.CompilerServices;
3 3
4 4 namespace Implab {
5 5 public struct PromiseAwaiter : INotifyCompletion {
6 6 readonly IPromise m_promise;
7 7
8 8 public PromiseAwaiter(IPromise promise) {
9 9 m_promise = promise;
10 10 }
11 11
12 12 public void OnCompleted (Action continuation) {
13 13 if (m_promise != null)
14 14 m_promise.On(continuation, PromiseEventType.All);
15 15 }
16 16
17 17 public void GetResult() {
18 18 m_promise.Join();
19 19 }
20 20
21 21 public bool IsCompleted {
22 22 get {
23 return m_promise.IsResolved;
23 return m_promise.IsFulfilled;
24 24 }
25 25 }
26 26 }
27 27 }
28 28
@@ -1,28 +1,28
1 1 using System;
2 2 using System.Runtime.CompilerServices;
3 3
4 4 namespace Implab {
5 5 public struct PromiseAwaiter<T> : INotifyCompletion {
6 6 readonly IPromise<T> m_promise;
7 7
8 8 public PromiseAwaiter(IPromise<T> promise) {
9 9 m_promise = promise;
10 10 }
11 11
12 12 public void OnCompleted (Action continuation) {
13 13 if (m_promise != null)
14 14 m_promise.On(continuation, PromiseEventType.All);
15 15 }
16 16
17 17 public T GetResult() {
18 18 return m_promise.Join();
19 19 }
20 20
21 21 public bool IsCompleted {
22 22 get {
23 return m_promise.IsResolved;
23 return m_promise.IsFulfilled;
24 24 }
25 25 }
26 26 }
27 27 }
28 28
@@ -1,99 +1,99
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public class SuccessPromise : IPromise {
5 5 #region IPromise implementation
6 6
7 7 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
8 8 if (success != null) {
9 9 try {
10 10 success();
11 11 // Analysis disable once EmptyGeneralCatchClause
12 12 } catch {
13 13 }
14 14 }
15 15 return this;
16 16 }
17 17
18 18 public IPromise On(Action success, Action<Exception> error) {
19 19 if (success != null) {
20 20 try {
21 21 success();
22 22 // Analysis disable once EmptyGeneralCatchClause
23 23 } catch {
24 24 }
25 25 }
26 26 return this;
27 27 }
28 28
29 29 public IPromise On(Action success) {
30 30 if (success != null) {
31 31 try {
32 32 success();
33 33 // Analysis disable once EmptyGeneralCatchClause
34 34 } catch {
35 35 }
36 36 }
37 37 return this;
38 38 }
39 39
40 40 public IPromise On(Action handler, PromiseEventType events) {
41 41 if (handler != null && events.HasFlag(PromiseEventType.Success)) {
42 42 try {
43 43 handler();
44 44 // Analysis disable once EmptyGeneralCatchClause
45 45 } catch {
46 46 }
47 47 }
48 48 return this;
49 49 }
50 50
51 51 public IPromise<T> Cast<T>() {
52 52 throw new InvalidCastException();
53 53 }
54 54
55 55 public void Join() {
56 56 }
57 57
58 58 public void Join(int timeout) {
59 59 }
60 60
61 public Type PromiseType {
61 public Type ResultType {
62 62 get {
63 63 return typeof(void);
64 64 }
65 65 }
66 66
67 public bool IsResolved {
67 public bool IsFulfilled {
68 68 get {
69 69 return true;
70 70 }
71 71 }
72 72
73 73 public bool IsCancelled {
74 74 get {
75 75 return false;
76 76 }
77 77 }
78 78
79 public Exception Error {
79 public Exception RejectReason {
80 80 get {
81 81 return null;
82 82 }
83 83 }
84 84
85 85 #endregion
86 86
87 87 #region ICancellable implementation
88 88
89 89 public void Cancel() {
90 90 }
91 91
92 92 public void Cancel(Exception reason) {
93 93 }
94 94
95 95 #endregion
96 96
97 97 }
98 98 }
99 99
@@ -1,153 +1,153
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public class SuccessPromise<T> : IPromise<T> {
5 5 readonly T m_value;
6 6
7 7 public SuccessPromise(T value){
8 8 m_value = value;
9 9 }
10 10
11 11 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
12 12 if (success != null) {
13 13 try {
14 14 success(m_value);
15 15 // Analysis disable once EmptyGeneralCatchClause
16 16 } catch {
17 17 }
18 18 }
19 19 return this;
20 20 }
21 21
22 22 public IPromise<T> On(Action<T> success, Action<Exception> error) {
23 23 if (success != null) {
24 24 try {
25 25 success(m_value);
26 26 // Analysis disable once EmptyGeneralCatchClause
27 27 } catch {
28 28 }
29 29 }
30 30 return this;
31 31 }
32 32
33 33 public IPromise<T> On(Action<T> success) {
34 34 if (success != null) {
35 35 try {
36 36 success(m_value);
37 37 // Analysis disable once EmptyGeneralCatchClause
38 38 } catch {
39 39 }
40 40 }
41 41 return this;
42 42 }
43 43
44 44 public T Join() {
45 45 return m_value;
46 46 }
47 47
48 48 public T Join(int timeout) {
49 49 return m_value;
50 50 }
51 51
52 52 public IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel) {
53 53 if (success != null) {
54 54 try {
55 55 success();
56 56 // Analysis disable once EmptyGeneralCatchClause
57 57 } catch {
58 58 }
59 59 }
60 60 return this;
61 61 }
62 62
63 63 public IPromise<T> On(Action success, Action<Exception> error) {
64 64 if (success != null) {
65 65 try {
66 66 success();
67 67 // Analysis disable once EmptyGeneralCatchClause
68 68 } catch {
69 69 }
70 70 }
71 71 return this;
72 72 }
73 73
74 74 public IPromise<T> On(Action success) {
75 75 if (success != null) {
76 76 try {
77 77 success();
78 78 // Analysis disable once EmptyGeneralCatchClause
79 79 } catch {
80 80 }
81 81 }
82 82 return this;
83 83 }
84 84
85 85 public IPromise<T> On(Action handler, PromiseEventType events) {
86 86 if (handler != null && events.HasFlag(PromiseEventType.Success)) {
87 87 try {
88 88 handler();
89 89 // Analysis disable once EmptyGeneralCatchClause
90 90 } catch {
91 91 }
92 92 }
93 93 return this;
94 94 }
95 95
96 96 IPromise IPromise.On(Action success, Action<Exception> error, Action<Exception> cancel) {
97 97 return On(success, error, cancel);
98 98 }
99 99
100 100 IPromise IPromise.On(Action success, Action<Exception> error) {
101 101 return On(success, error);
102 102 }
103 103
104 104 IPromise IPromise.On(Action success) {
105 105 return On(success);
106 106 }
107 107
108 108 IPromise IPromise.On(Action handler, PromiseEventType events) {
109 109 return On(handler, events);
110 110 }
111 111
112 112 public IPromise<T2> Cast<T2>() {
113 113 return new SuccessPromise<T2>((T2)(object)m_value);
114 114 }
115 115
116 116 void IPromise.Join() {
117 117 }
118 118
119 119 void IPromise.Join(int timeout) {
120 120 }
121 121
122 public Type PromiseType {
122 public Type ResultType {
123 123 get {
124 124 return typeof(T);
125 125 }
126 126 }
127 127
128 public bool IsResolved {
128 public bool IsFulfilled {
129 129 get {
130 130 return true;
131 131 }
132 132 }
133 133
134 134 public bool IsCancelled {
135 135 get {
136 136 return false;
137 137 }
138 138 }
139 139
140 public Exception Error {
140 public Exception RejectReason {
141 141 get {
142 142 return null;
143 143 }
144 144 }
145 145
146 146 public void Cancel() {
147 147 }
148 148
149 149 public void Cancel(Exception reason) {
150 150 }
151 151 }
152 152 }
153 153
@@ -1,5 +1,6
1 1 <?xml version="1.0" encoding="utf-8"?>
2 2 <repositories>
3 3 <repository path="../Implab.Test/Implab.Format.Test/packages.config" />
4 <repository path="../Implab.Test/packages.config" />
4 5 <repository path="../MonoPlay/packages.config" />
5 6 </repositories> No newline at end of file
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

ok, latest stable version should be in default

You need to be logged in to leave comments. Login now