##// END OF EJS Templates
Promises rewritten, added improved version of AsyncQueue
cin -
r119:2573b562e328 v2
parent child
Show More
@@ -0,0 +1,219
1 using System;
2 using Implab.Parallels;
3 using System.Threading;
4 using System.Reflection;
5
6 namespace Implab {
7 public abstract class AbstractPromise<THandler> {
8
9 const int UNRESOLVED_SATE = 0;
10 const int TRANSITIONAL_STATE = 1;
11 const int SUCCEEDED_STATE = 2;
12 const int REJECTED_STATE = 3;
13 const int CANCELLED_STATE = 4;
14
15 int m_state;
16 Exception m_error;
17
18 readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
19
20 #region state managment
21 bool BeginTransit() {
22 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
23 }
24
25 void CompleteTransit(int state) {
26 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
27 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
28 }
29
30 void WaitTransition() {
31 while (m_state == TRANSITIONAL_STATE) {
32 Thread.MemoryBarrier();
33 }
34 }
35
36 protected void BeginSetResult() {
37 if (!BeginTransit()) {
38 WaitTransition();
39 if (m_state != CANCELLED_STATE)
40 throw new InvalidOperationException("The promise is already resolved");
41 }
42 }
43
44 protected void EndSetResult() {
45 CompleteTransit(SUCCEEDED_STATE);
46 OnSuccess();
47 }
48
49
50
51 /// <summary>
52 /// Выполняет обещание, сообщая об ошибке
53 /// </summary>
54 /// <remarks>
55 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
56 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
57 /// будут проигнорированы.
58 /// </remarks>
59 /// <param name="error">Исключение возникшее при выполнении операции</param>
60 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
61 protected void SetError(Exception error) {
62 if (BeginTransit()) {
63 m_error = error is PromiseTransientException ? error.InnerException : error;
64 CompleteTransit(REJECTED_STATE);
65 OnError();
66 } else {
67 WaitTransition();
68 if (m_state == SUCCEEDED_STATE)
69 throw new InvalidOperationException("The promise is already resolved");
70 }
71 }
72
73 /// <summary>
74 /// Отменяет операцию, если это возможно.
75 /// </summary>
76 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
77 protected void SetCancelled() {
78 if (BeginTransit()) {
79 CompleteTransit(CANCELLED_STATE);
80 OnCancelled();
81 }
82 }
83
84 protected abstract void SignalSuccess(THandler handler);
85
86 protected abstract void SignalError(THandler handler, Exception error);
87
88 protected abstract void SignalCancelled(THandler handler);
89
90 void OnSuccess() {
91 THandler handler;
92 while (m_handlers.TryDequeue(out handler))
93 SignalSuccess(handler);
94 }
95
96 void OnError() {
97 THandler handler;
98 while (m_handlers.TryDequeue(out handler))
99 SignalError(handler,m_error);
100 }
101
102 void OnCancelled() {
103 THandler handler;
104 while (m_handlers.TryDequeue(out handler))
105 SignalCancelled(handler);
106 }
107
108 #endregion
109
110 protected abstract void Listen(PromiseEventType events, Action handler);
111
112 #region synchronization traits
113 protected void WaitResult(int timeout) {
114 if (!IsResolved) {
115 var lk = new object();
116
117 Listen(PromiseEventType.All, () => {
118 lock(lk) {
119 Monitor.Pulse(lk);
120 }
121 });
122
123 lock (lk) {
124 while(!IsResolved) {
125 if(!Monitor.Wait(lk,timeout))
126 throw new TimeoutException();
127 }
128 }
129
130 }
131 switch (m_state) {
132 case SUCCEEDED_STATE:
133 return;
134 case CANCELLED_STATE:
135 throw new OperationCanceledException();
136 case REJECTED_STATE:
137 throw new TargetInvocationException(m_error);
138 default:
139 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
140 }
141 }
142 #endregion
143
144 #region handlers managment
145
146 protected void AddHandler(THandler handler) {
147
148 if (IsResolved) {
149 InvokeHandler(handler);
150
151 } else {
152 // the promise is in the resolved state, just invoke the handler
153 m_handlers.Enqueue(handler);
154
155
156 if (IsResolved && m_handlers.TryDequeue(out handler))
157 // if the promise have been resolved while we was adding the handler to the queue
158 // we can't guarantee that someone is still processing it
159 // therefore we need to fetch a handler from the queue and execute it
160 // note that fetched handler may be not the one that we have added
161 // even we can fetch no handlers at all :)
162 InvokeHandler(handler);
163 }
164 }
165
166 protected void InvokeHandler(THandler handler) {
167 switch (m_state) {
168 case SUCCEEDED_STATE:
169 SignalSuccess(handler);
170 break;
171 case CANCELLED_STATE:
172 SignalCancelled(handler);
173 break;
174 case REJECTED_STATE:
175 SignalError(handler, m_error);
176 break;
177 default:
178 throw new Exception(String.Format("Invalid promise state {0}", m_state));
179 }
180 }
181
182 #endregion
183
184 #region IPromise implementation
185
186 public void Join(int timeout) {
187 WaitResult(timeout);
188 }
189
190 public void Join() {
191 WaitResult(-1);
192 }
193
194 public bool IsResolved {
195 get {
196 Thread.MemoryBarrier();
197 return m_state > 1;
198 }
199 }
200
201 public bool IsCancelled {
202 get {
203 Thread.MemoryBarrier();
204 return m_state == CANCELLED_STATE;
205 }
206 }
207
208 #endregion
209
210 #region ICancellable implementation
211
212 public void Cancel() {
213 SetCancelled();
214 }
215
216 #endregion
217 }
218 }
219
@@ -0,0 +1,14
1 using System;
2
3 namespace Implab {
4 /// <summary>
5 /// Deferred result, usually used by asynchronous services as the service part of the promise.
6 /// </summary>
7 public interface IDeferred : ICancellable {
8
9 void Resolve();
10
11 void Reject(Exception error);
12 }
13 }
14
@@ -0,0 +1,10
1 using System;
2
3 namespace Implab {
4 public interface IDeferred<T> : ICancellable {
5 void Resolve(T value);
6
7 void Reject(Exception error);
8 }
9 }
10
@@ -0,0 +1,244
1 using System.Threading;
2 using System.Collections.Generic;
3 using System;
4 using System.Collections;
5
6 namespace Implab.Parallels {
7 public class AsyncQueue<T> : IEnumerable<T> {
8 class Chunk {
9 public Chunk next;
10
11 int m_low;
12 int m_hi;
13 int m_alloc;
14 readonly int m_size;
15 readonly T[] m_data;
16
17 public Chunk(int size) {
18 m_size = size;
19 m_data = new T[size];
20 }
21
22 public Chunk(int size, T value) {
23 m_size = size;
24 m_hi = 1;
25 m_alloc = 1;
26 m_data = new T[size];
27 m_data[0] = value;
28 }
29
30 public int Low {
31 get { return m_low; }
32 }
33
34 public int Hi {
35 get { return m_hi; }
36 }
37
38 public bool TryEnqueue(T value,out bool extend) {
39 extend = false;
40 int alloc;
41 do {
42 alloc = m_alloc;
43 if (alloc > m_size)
44 return false;
45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
46
47 if (alloc == m_size) {
48 extend = true;
49 return false;
50 }
51
52 m_data[alloc] = value;
53
54 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
55 // spin wait for commit
56 }
57 return true;
58 }
59
60 public bool TryDequeue(out T value,out bool recycle) {
61 int low;
62 do {
63 low = m_low;
64 if (low >= m_hi) {
65 value = default(T);
66 recycle = (low == m_size);
67 return false;
68 }
69 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
70
71 recycle = (low == m_size - 1);
72 value = m_data[low];
73
74 return true;
75 }
76
77 public T GetAt(int pos) {
78 return m_data[pos];
79 }
80 }
81
82 public const int DEFAULT_CHUNK_SIZE = 32;
83
84 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
85
86 Chunk m_first;
87 Chunk m_last;
88
89 public AsyncQueue() {
90 m_last = m_first = new Chunk(m_chunkSize);
91 }
92
93 public void Enqueue(T value) {
94 var last = m_last;
95 // spin wait to the new chunk
96 bool extend = true;
97 while(last == null || !last.TryEnqueue(value, out extend)) {
98 // try to extend queue
99 if (extend || last == null) {
100 var chunk = new Chunk(m_chunkSize, value);
101 if (EnqueueChunk(last, chunk))
102 break;
103 last = m_last;
104 } else {
105 while (last != m_last) {
106 Thread.MemoryBarrier();
107 last = m_last;
108 }
109 }
110 }
111 }
112
113 public bool TryDequeue(out T value) {
114 var chunk = m_first;
115 bool recycle;
116 while (chunk != null) {
117
118 var result = chunk.TryDequeue(out value, out recycle);
119
120 if (recycle) // this chunk is waste
121 RecycleFirstChunk(chunk);
122 else
123 return result; // this chunk is usable and returned actual result
124
125 if (result) // this chunk is waste but the true result is always actual
126 return true;
127
128 // try again
129 chunk = m_first;
130 }
131
132 // the queue is empty
133 value = default(T);
134 return false;
135 }
136
137 bool EnqueueChunk(Chunk last, Chunk chunk) {
138 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
139 return false;
140
141 if (last != null)
142 last.next = chunk;
143 else
144 m_first = chunk;
145 return true;
146 }
147
148 void RecycleFirstChunk(Chunk first) {
149 var next = first.next;
150
151 if (next == null) {
152 // looks like this is the last chunk
153 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
154 // race
155 // maybe someone already recycled this chunk
156 // or a new chunk has been appedned to the queue
157
158 return; // give up
159 }
160 // the tail is updated
161 }
162
163 // we need to update the head
164 Interlocked.CompareExchange(ref m_first, next, first);
165 // if the head is already updated then give up
166 return;
167
168 }
169
170 #region IEnumerable implementation
171
172 class Enumerator : IEnumerator<T> {
173 Chunk m_current;
174 int m_pos = -1;
175
176 public Enumerator(Chunk fisrt) {
177 m_current = fisrt;
178 }
179
180 #region IEnumerator implementation
181
182 public bool MoveNext() {
183 if (m_current == null)
184 return false;
185
186 if (m_pos == -1)
187 m_pos = m_current.Low;
188 else
189 m_pos++;
190 if (m_pos == m_current.Hi) {
191 m_pos = 0;
192 m_current = m_current.next;
193 }
194
195 return true;
196 }
197
198 public void Reset() {
199 throw new NotSupportedException();
200 }
201
202 object IEnumerator.Current {
203 get {
204 return Current;
205 }
206 }
207
208 #endregion
209
210 #region IDisposable implementation
211
212 public void Dispose() {
213 }
214
215 #endregion
216
217 #region IEnumerator implementation
218
219 public T Current {
220 get {
221 if (m_pos == -1 || m_current == null)
222 throw new InvalidOperationException();
223 return m_current.GetAt(m_pos);
224 }
225 }
226
227 #endregion
228 }
229
230 public IEnumerator<T> GetEnumerator() {
231 return new Enumerator(m_first);
232 }
233
234 #endregion
235
236 #region IEnumerable implementation
237
238 IEnumerator IEnumerable.GetEnumerator() {
239 return GetEnumerator();
240 }
241
242 #endregion
243 }
244 }
This diff has been collapsed as it changes many lines, (621 lines changed) Show them Hide them
@@ -0,0 +1,621
1 using System;
2 using System.Diagnostics;
3
4 namespace Implab {
5
6 /// <summary>
7 /// Класс для асинхронного получения результатов. Так называемое "обещание".
8 /// </summary>
9 /// <typeparam name="T">Тип получаемого результата</typeparam>
10 /// <remarks>
11 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
12 /// клиент получив такое обещание может установить ряд обратных вызово для получения
13 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
14 /// <para>
15 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
16 /// данные события клиент должен использовать методы <c>Then</c>.
17 /// </para>
18 /// <para>
19 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
20 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
21 /// выполнении обещания.
22 /// </para>
23 /// <para>
24 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
25 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
26 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
27 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
28 /// обещания.
29 /// </para>
30 /// <para>
31 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
32 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
33 /// использовать соответствующую форму методе <c>Then</c>.
34 /// </para>
35 /// <para>
36 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
37 /// только инициатор обещания иначе могут возникнуть противоречия.
38 /// </para>
39 /// </remarks>
40 public class Promise<T> : AbstractPromise<IDeferred<T>>, IPromise<T>, IDeferred<T> {
41
42 class StubDeferred : IDeferred<T> {
43 public static readonly StubDeferred instance = new StubDeferred();
44
45 StubDeferred() {
46 }
47
48 #region IDeferred implementation
49
50 public void Resolve(T value) {
51 }
52
53 public void Reject(Exception error) {
54 }
55
56 #endregion
57
58 #region ICancellable implementation
59
60 public void Cancel() {
61 }
62
63 #endregion
64
65
66 }
67
68 class RemapDescriptor<T2> : IDeferred<T> {
69 readonly Func<T,T2> m_remap;
70 readonly Func<Exception,T2> m_failed;
71 readonly Func<T2> m_cancel;
72 readonly IDeferred<T2> m_deferred;
73
74 public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<T2> cancel, IDeferred<T2> deferred ) {
75 Debug.Assert(deferred != null);
76 m_remap = remap;
77 m_failed = failed;
78 m_cancel = cancel;
79 m_deferred = deferred;
80 }
81
82
83
84 #region IDeferred implementation
85
86 public void Resolve(T value) {
87 if (m_remap != null) {
88 try {
89 m_deferred.Resolve(m_remap(value));
90 } catch (Exception ex) {
91 Reject(ex);
92 }
93 }
94 }
95
96 public void Reject(Exception error) {
97 if (m_failed != null) {
98 try {
99 m_deferred.Resolve(m_failed(error));
100 } catch (Exception ex) {
101 m_deferred.Reject(ex);
102 }
103 } else {
104 m_deferred.Reject(error);
105 }
106 }
107
108
109 #endregion
110
111 #region ICancellable implementation
112
113 public void Cancel() {
114 if (m_cancel != null) {
115 try {
116 m_deferred.Resolve(m_cancel());
117 } catch (Exception ex) {
118 Reject(ex);
119 }
120 } else {
121 m_deferred.Cancel();
122 }
123 }
124
125 #endregion
126 }
127
128 class ListenerDescriptor : IDeferred<T> {
129 readonly Action m_handler;
130 readonly PromiseEventType m_events;
131
132 public ListenerDescriptor(Action handler, PromiseEventType events) {
133 Debug.Assert(handler != null);
134
135 m_handler = handler;
136 m_events = events;
137 }
138
139 #region IDeferred implementation
140
141 public void Resolve(T value) {
142 if (m_events.HasFlag(PromiseEventType.Success)) {
143 try {
144 m_handler();
145 // Analysis disable once EmptyGeneralCatchClause
146 } catch {
147 }
148 }
149 }
150
151 public void Reject(Exception error) {
152 if (m_events.HasFlag(PromiseEventType.Error)){
153 try {
154 m_handler();
155 // Analysis disable once EmptyGeneralCatchClause
156 } catch {
157 }
158 }
159 }
160
161 #endregion
162
163 #region ICancellable implementation
164
165 public void Cancel() {
166 if (m_events.HasFlag(PromiseEventType.Cancelled)){
167 try {
168 m_handler();
169 // Analysis disable once EmptyGeneralCatchClause
170 } catch {
171 }
172 }
173 }
174
175 #endregion
176 }
177
178 class ValueEventDescriptor : IDeferred<T> {
179 readonly Action<T> m_success;
180 readonly Action<Exception> m_failed;
181 readonly Action m_cancelled;
182 readonly IDeferred<T> m_deferred;
183
184 public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
185 Debug.Assert(deferred != null);
186
187 m_success = success;
188 m_failed = failed;
189 m_cancelled = cancelled;
190 m_deferred = deferred;
191 }
192
193 #region IDeferred implementation
194
195 public void Resolve(T value) {
196 if (m_success != null) {
197 try {
198 m_success(value);
199 m_deferred.Resolve(value);
200 } catch (Exception ex) {
201 Reject(ex);
202 }
203 }
204 }
205
206 public void Reject(Exception error) {
207 if (m_failed != null) {
208 try {
209 m_failed(error);
210 m_deferred.Resolve(default(T));
211 } catch(Exception ex) {
212 m_deferred.Reject(ex);
213 }
214 } else {
215 m_deferred.Reject(error);
216 }
217 }
218
219 #endregion
220
221 #region ICancellable implementation
222
223 public void Cancel() {
224 if (m_cancelled != null) {
225 try {
226 m_cancelled();
227 m_deferred.Resolve(default(T));
228 } catch(Exception ex) {
229 Reject(ex);
230 }
231 } else {
232 m_deferred.Cancel();
233 }
234 }
235
236 #endregion
237 }
238
239 public class EventDescriptor : IDeferred<T> {
240 readonly Action m_success;
241 readonly Action<Exception> m_failed;
242 readonly Action m_cancelled;
243 readonly IDeferred<T> m_deferred;
244
245 public EventDescriptor(Action success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
246 Debug.Assert(deferred != null);
247
248 m_success = success;
249 m_failed = failed;
250 m_cancelled = cancelled;
251 m_deferred = deferred;
252 }
253
254 #region IDeferred implementation
255
256 public void Resolve(T value) {
257 if (m_success != null) {
258 try {
259 m_success();
260 m_deferred.Resolve(value);
261 } catch (Exception ex) {
262 Reject(ex);
263 }
264 }
265 }
266
267 public void Reject(Exception error) {
268 if (m_failed != null) {
269 try {
270 m_failed(error);
271 m_deferred.Resolve(default(T));
272 }catch (Exception ex)
273 {
274 m_deferred.Reject(ex);
275 }
276 } else {
277 m_deferred.Reject(error);
278 }
279
280 }
281
282 #endregion
283
284 #region ICancellable implementation
285
286 public void Cancel() {
287 if (m_cancelled != null) {
288 try {
289 m_cancelled();
290 m_deferred.Resolve(default(T));
291 } catch (Exception ex) {
292 Reject(ex);
293 }
294 } else {
295 m_deferred.Cancel();
296 }
297 }
298
299 #endregion
300 }
301
302 T m_result;
303
304 public virtual void Resolve(T value) {
305 BeginSetResult();
306 m_result = value;
307 EndSetResult();
308 }
309
310 public void Reject(Exception error) {
311 SetError(error);
312 }
313
314 public Type PromiseType {
315 get {
316 return typeof(T);
317 }
318 }
319
320 public new T Join() {
321 WaitResult(-1);
322 return m_result;
323 }
324 public new T Join(int timeout) {
325 WaitResult(timeout);
326 return m_result;
327 }
328
329 public IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel) {
330 AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance));
331 return this;
332 }
333
334 public IPromise<T> On(Action<T> success, Action<Exception> error) {
335 AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance));
336 return this;
337 }
338
339 public IPromise<T> On(Action<T> success) {
340 AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance));
341 return this;
342 }
343
344 public IPromise<T> On(Action handler, PromiseEventType events) {
345 Listen(events, handler);
346 return this;
347 }
348
349 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<T2> cancel) {
350 var promise = new Promise<T2>();
351 AddHandler(new RemapDescriptor<T2>(mapper, error, cancel, promise));
352 return promise;
353 }
354
355 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error) {
356 var promise = new Promise<T2>();
357 AddHandler(new RemapDescriptor<T2>(mapper, error, null, promise));
358 return promise;
359 }
360
361 public IPromise<T2> Then<T2>(Func<T, T2> mapper) {
362 var promise = new Promise<T2>();
363 AddHandler(new RemapDescriptor<T2>(mapper, null, null, promise));
364 return promise;
365 }
366
367 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<IPromise<T2>> cancel) {
368 // this promise will be resolved when an asyc operation is started
369 var promise = new Promise<IPromise<T2>>();
370
371 AddHandler(new RemapDescriptor<IPromise<T2>>(
372 chained,
373 error,
374 cancel,
375 promise
376 ));
377
378 var medium = new Promise<T2>();
379
380 if (chained != null)
381 medium.On(Cancel, PromiseEventType.Cancelled);
382
383 // we need to connect started async operation with the medium
384 // if the async operation hasn't been started by the some reason
385 // report is to the medium
386 promise.On(
387 result => ConnectPromise<T2>(result, medium),
388 medium.Reject,
389 medium.Cancel
390 );
391
392 return medium;
393 }
394
395 static void ConnectPromise<T2>(IPromise<T2> result, Promise<T2> medium) {
396 if (result != null) {
397 result.On(
398 medium.Resolve,
399 medium.Reject,
400 () => medium.Reject(new OperationCanceledException())
401 );
402 medium.On(result.Cancel, PromiseEventType.Cancelled);
403 } else {
404 medium.Reject(
405 new NullReferenceException(
406 "The chained asynchronous operation returned" +
407 " 'null' where the promise instance is expected"
408 )
409 );
410 }
411 }
412
413 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error) {
414 return Chain(chained, error, null);
415 }
416
417 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained) {
418 return Chain(chained, null, null);
419 }
420
421 public IPromise<T2> Error<T2>(Func<Exception, T2> error) {
422 var promise = new Promise<T2>();
423 if (error != null)
424 On(
425 (Action<T>)null,
426 ex => {
427 try {
428 promise.Resolve(error(ex));
429 } catch (Exception ex2) {
430 promise.Reject(ex2);
431 }
432 }
433 );
434 else
435 Listen(PromiseEventType.Error, () => promise.Resolve(default(T2)));
436 return promise;
437 }
438
439 public IPromise<T2> Cancelled<T2>(Func<T2> handler) {
440 var promise = new Promise<T2>();
441 if (handler != null)
442 On(
443 (Action<T>)null,
444 null,
445 () => {
446 try {
447 promise.Resolve(handler());
448 } catch (Exception ex) {
449 promise.Reject(ex);
450 }
451 });
452 else
453 Listen(PromiseEventType.Cancelled, () => promise.Resolve(default(T2)));
454 return promise;
455 }
456
457 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
458 var promise = new Promise<T>();
459 if (success != null)
460 promise.On(Cancel, PromiseEventType.Cancelled);
461
462 AddHandler(new EventDescriptor(success, error, cancel, promise));
463
464 return promise;
465 }
466
467 public IPromise Then(Action success, Action<Exception> error) {
468 return Then(success, error, null);
469 }
470
471 public IPromise Then(Action success) {
472 return Then(success, null, null);
473 }
474
475 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
476 var promise = new Promise<IPromise>();
477
478 AddHandler(
479 new RemapDescriptor<IPromise>(
480 x => chained(),
481 error,
482 cancel,
483 promise
484 )
485 );
486
487 var medium = new Promise();
488 if (chained != null)
489 medium.On(Cancel, PromiseEventType.Cancelled);
490
491 promise.On(
492 result => ConnectPromise(result, medium),
493 medium.Reject,
494 medium.Cancel
495 );
496
497 return medium;
498 }
499
500 static void ConnectPromise(IPromise result, Promise medium) {
501 if (result != null) {
502 result.On(
503 medium.Resolve,
504 medium.Reject,
505 () => medium.Reject(new OperationCanceledException())
506 );
507 medium.On(result.Cancel, PromiseEventType.Cancelled);
508 } else {
509 medium.Reject(
510 new NullReferenceException(
511 "The chained asynchronous operation returned" +
512 " 'null' where the promise instance is expected"
513 )
514 );
515 }
516 }
517
518 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
519 return Chain(chained, error, null);
520 }
521
522 public IPromise Chain(Func<IPromise> chained) {
523 return Chain(chained, null, null);
524 }
525
526 public IPromise On(Action success, Action<Exception> error, Action cancel) {
527 AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance));
528 return this;
529 }
530
531 public IPromise On(Action success, Action<Exception> error) {
532 AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance));
533 return this;
534 }
535
536 public IPromise On(Action success) {
537 Listen(PromiseEventType.Success, success);
538 return this;
539 }
540
541 IPromise IPromise.On(Action handler, PromiseEventType events) {
542 Listen(events,handler);
543 return this;
544 }
545
546 public IPromise Error(Action<Exception> error) {
547 var promise = new Promise();
548 if (error != null)
549 On(
550 (Action<T>)null,
551 ex => {
552 try {
553 error(ex);
554 promise.Resolve();
555 } catch (Exception ex2) {
556 promise.Reject(ex2);
557 }
558 });
559 else
560 Listen(PromiseEventType.Error, promise.Resolve);
561 return promise;
562 }
563
564 public IPromise Cancelled(Action handler) {
565 var promise = new Promise();
566 if (handler != null)
567 On(
568 (Action<T>)null,
569 null,
570 () => {
571 try {
572 handler();
573 promise.Resolve();
574 } catch (Exception ex) {
575 promise.Reject(ex);
576 }
577 });
578 else
579 Listen(PromiseEventType.Cancelled, promise.Resolve);
580 return promise;
581 }
582
583 public IPromise<T2> Cast<T2>() {
584 return (IPromise<T2>)this;
585 }
586
587 #region implemented abstract members of AbstractPromise
588
589 protected override void SignalSuccess(IDeferred<T> handler) {
590 handler.Resolve(m_result);
591 }
592
593 protected override void SignalError(IDeferred<T> handler, Exception error) {
594 handler.Reject(error);
595 }
596
597 protected override void SignalCancelled(IDeferred<T> handler) {
598 handler.Cancel();
599 }
600
601 protected override void Listen(PromiseEventType events, Action handler) {
602 if (handler != null)
603 AddHandler(new ListenerDescriptor(handler, events));
604 }
605
606 #endregion
607
608 public static IPromise<T> ResultToPromise(T value) {
609 var p = new Promise<T>();
610 p.Resolve(value);
611 return p;
612 }
613
614 public static IPromise<T> ExceptionToPromise(Exception error) {
615 var p = new Promise<T>();
616 p.Reject(error);
617 return p;
618 }
619
620 }
621 }
@@ -0,0 +1,33
1 using System;
2
3 namespace Implab {
4
5 [Serializable]
6 public class PromiseTransientException : Exception {
7 /// <summary>
8 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class.
9 /// </summary>
10 /// <param name="inner">The exception that is the cause of the current exception.</param>
11 public PromiseTransientException(Exception inner) : base("The preceding promise has failed", inner) {
12 }
13
14 /// <summary>
15 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class
16 /// </summary>
17 /// <param name="message">A <see cref="T:System.String"/> that describes the exception. </param>
18 /// <param name="inner">The exception that is the cause of the current exception. </param>
19 public PromiseTransientException(string message, Exception inner)
20 : base(message, inner) {
21 }
22
23 /// <summary>
24 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class
25 /// </summary>
26 /// <param name="context">The contextual information about the source or destination.</param>
27 /// <param name="info">The object that holds the serialized object data.</param>
28 protected PromiseTransientException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
29 : base(info, context) {
30 }
31 }
32 }
33
@@ -12,19 +12,27 namespace Implab.Fx {
12 12 m_target = target;
13 13 }
14 14
15 public ControlBoundPromise(Control target, IPromise parent)
16 : base(parent) {
17 Safe.ArgumentNotNull(target, "target");
18
19 m_target = target;
15 protected override void SignalSuccess(IDeferred<T> handler) {
16 if (m_target.InvokeRequired)
17 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalSuccess), handler);
18 else
19 base.SignalSuccess(handler);
20 20 }
21 21
22 protected override void InvokeHandler(AbstractHandler handler) {
22 protected override void SignalCancelled(IDeferred<T> handler) {
23 23 if (m_target.InvokeRequired)
24 m_target.BeginInvoke(new Action<AbstractHandler>(base.InvokeHandler), handler);
24 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalCancelled), handler);
25 25 else
26 base.InvokeHandler(handler);
26 base.SignalCancelled(handler);
27 27 }
28
29 protected override void SignalError(IDeferred<T> handler, Exception error) {
30 if (m_target.InvokeRequired)
31 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalError), handler, error);
32 else
33 base.SignalError(handler, error);
34 }
35
28 36 }
29 37 }
30 38
@@ -27,7 +27,9 namespace Implab.Fx
27 27 Safe.ArgumentNotNull(that, "that");
28 28 Safe.ArgumentNotNull(ctl, "ctl");
29 29
30 var directed = new ControlBoundPromise<T>(ctl,that);
30 var directed = new ControlBoundPromise<T>(ctl);
31
32 directed.On(that.Cancel, PromiseEventType.Cancelled);
31 33
32 34 that.On(
33 35 directed.Resolve,
@@ -72,7 +72,7 namespace Implab.Test {
72 72 p.Cancel();
73 73
74 74 var p2 = p
75 .Cancelled(() => {
75 .Cancelled<bool>(() => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Error(e => true);
@@ -195,11 +195,11 namespace Implab.Test {
195 195 .Invoke(() => 1)
196 196 .Then(x => Interlocked.Add(ref count, x))
197 197 .Then(x => Math.Log10(x))
198 .Anyway(() => {
198 .On(() => {
199 199 Interlocked.Decrement(ref pending);
200 200 if (pending == 0)
201 201 stop.Set();
202 });
202 }, PromiseEventType.All);
203 203 }
204 204
205 205 stop.WaitOne();
@@ -255,7 +255,7 namespace Implab.Test {
255 255 }
256 256 return 1;
257 257 })
258 .Anyway(() => Interlocked.Decrement(ref writers));
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 259 }
260 260
261 261 for (int i = 0; i < 10; i++) {
@@ -269,11 +269,72 namespace Implab.Test {
269 269 } while (writers > 0);
270 270 return 1;
271 271 })
272 .Anyway(() => {
272 .On(() => {
273 273 Interlocked.Decrement(ref readers);
274 274 if (readers == 0)
275 275 stop.Set();
276 });
276 }, PromiseEventType.All);
277 }
278
279 stop.WaitOne();
280
281 Assert.AreEqual(100000, total);
282 }
283
284 [TestMethod]
285 public void AsyncQueueTest() {
286 var queue = new AsyncQueue<int>();
287 int res;
288
289 queue.Enqueue(10);
290 Assert.IsTrue(queue.TryDequeue(out res));
291 Assert.AreEqual(10, res);
292 Assert.IsFalse(queue.TryDequeue(out res));
293
294 for (int i = 0; i < 1000; i++)
295 queue.Enqueue(i);
296
297 for (int i = 0; i < 1000; i++) {
298 queue.TryDequeue(out res);
299 Assert.AreEqual(i, res);
300 }
301
302 int writers = 0;
303 int readers = 0;
304 var stop = new ManualResetEvent(false);
305 int total = 0;
306
307 const int itemsPerWriter = 10000;
308 const int writersCount = 10;
309
310 for (int i = 0; i < writersCount; i++) {
311 Interlocked.Increment(ref writers);
312 AsyncPool
313 .InvokeNewThread(() => {
314 for (int ii = 0; ii < itemsPerWriter; ii++) {
315 queue.Enqueue(1);
316 }
317 return 1;
318 })
319 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
320 }
321
322 for (int i = 0; i < 10; i++) {
323 Interlocked.Increment(ref readers);
324 AsyncPool
325 .InvokeNewThread(() => {
326 int t;
327 do {
328 while (queue.TryDequeue(out t))
329 Interlocked.Add(ref total, t);
330 } while (writers > 0);
331 return 1;
332 })
333 .On(() => {
334 Interlocked.Decrement(ref readers);
335 if (readers == 0)
336 stop.Set();
337 }, PromiseEventType.All);
277 338 }
278 339
279 340 stop.WaitOne();
@@ -371,15 +432,15 namespace Implab.Test {
371 432
372 433 var step1 = PromiseHelper
373 434 .Sleep(200, "Alan")
374 .Cancelled(() => flags[0] = true);
435 .On(() => flags[0] = true, PromiseEventType.Cancelled);
375 436 var p = step1
376 437 .Chain(x =>
377 438 PromiseHelper
378 439 .Sleep(200, "Hi, " + x)
379 440 .Then(y => y)
380 .Cancelled(() => flags[1] = true)
441 .On(() => flags[1] = true, PromiseEventType.Cancelled)
381 442 )
382 .Cancelled(() => flags[2] = true);
443 .On(() => flags[2] = true, PromiseEventType.Cancelled);
383 444 step1.Join();
384 445 p.Cancel();
385 446 try {
@@ -18,7 +18,7 namespace Implab {
18 18 }
19 19
20 20 bool m_disposed;
21 readonly MTQueue<IDisposable> m_components = new MTQueue<IDisposable>();
21 readonly AsyncQueue<IDisposable> m_components = new AsyncQueue<IDisposable>();
22 22
23 23 public void Add(IDisposable item) {
24 24 Safe.ArgumentNotNull(item, "item");
@@ -1,23 +1,23
1 namespace Implab.Diagnostics {
1 using System;
2
3 namespace Implab.Diagnostics {
2 4 public static class Extensions {
3 5 public static IPromise<T> EndLogicalOperation<T>(this IPromise<T> promise) {
4 6 Safe.ArgumentNotNull(promise, "promise");
5 7 var op = TraceContext.Instance.DetachLogicalOperation();
6 8
7 return promise.Then<T>(
9 return promise.On(
8 10 x => {
9 11 TraceContext.Instance.EnterLogicalOperation(op,true);
10 12 TraceLog.TraceInformation("promise = {0}", x);
11 13 TraceLog.EndLogicalOperation();
12 14 TraceContext.Instance.Leave();
13 return x;
14 15 },
15 16 err =>{
16 17 TraceContext.Instance.EnterLogicalOperation(op,true);
17 18 TraceLog.TraceError("promise died {0}", err);
18 19 TraceLog.EndLogicalOperation();
19 20 TraceContext.Instance.Leave();
20 throw new TransientPromiseException(err);
21 21 },
22 22 () => {
23 23 TraceContext.Instance.EnterLogicalOperation(op,true);
@@ -32,11 +32,11
32 32 Safe.ArgumentNotNull(promise, "promise");
33 33 var op = TraceContext.Instance.DetachLogicalOperation();
34 34
35 return promise.Anyway(() => {
35 return promise.On(() => {
36 36 TraceContext.Instance.EnterLogicalOperation(op,true);
37 37 TraceLog.EndLogicalOperation();
38 38 TraceContext.Instance.Leave();
39 });
39 }, PromiseEventType.All);
40 40 }
41 41 }
42 42 }
@@ -74,7 +74,7 namespace Implab.Diagnostics {
74 74 m_current = m_stack.Pop();
75 75 LogChannel<TraceEvent>.Default.LogEvent(new TraceEvent(TraceEventType.Leave, String.Format("{0} -> {1}", prev.Name, CurrentOperation.Name)));
76 76 } else {
77 TraceLog.TraceWarning("Attemtp to leave the last operation context");
77 TraceLog.TraceWarning("Attempt to leave the last operation context");
78 78 m_current = OperationContext.EMPTY;
79 79 }
80 80 }
@@ -7,7 +7,7 using System.Diagnostics.CodeAnalysis;
7 7 namespace Implab {
8 8 public abstract class DisposablePool<T> : IDisposable {
9 9 readonly int m_size;
10 readonly MTQueue<T> m_queue = new MTQueue<T>();
10 readonly AsyncQueue<T> m_queue = new AsyncQueue<T>();
11 11
12 12 [SuppressMessage("Microsoft.Design", "CA1000:DoNotDeclareStaticMembersOnGenericTypes")]
13 13 static readonly bool _isValueType = typeof(T).IsValueType;
@@ -5,12 +5,6 using System.Text;
5 5
6 6 namespace Implab {
7 7 public interface IPromise: ICancellable {
8 /// <summary>
9 /// Check whereather the promise has no more than one dependent promise.
10 /// </summary>
11 bool IsExclusive {
12 get;
13 }
14 8
15 9 /// <summary>
16 10 /// Тип результата, получаемого через данное обещание.
@@ -27,44 +21,98 namespace Implab {
27 21 /// </summary>
28 22 bool IsCancelled { get; }
29 23
24 /// <summary>
25 /// Creates a new promise dependend on the current one and resolved on
26 /// executing the specified handlers.
27 /// </summary>
28 /// <param name="success">The handler called on the successful promise completion.</param>
29 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
30 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
31 /// <returns>The newly created dependant promise.</returns>
32 /// <remarks>
33 /// <para>
34 /// If the success handler is specified the dependend promise will be resolved after the handler is
35 /// executed and the dependent promise will be linked to the current one, i.e. the cancellation
36 /// of the dependent property will lead to the cancellation of the current promise. If the
37 /// success handler isn't specified the dependent promise will not be linked to and
38 /// will not be resolved after the successfull resolution of the current one.
39 /// </para>
40 /// <para>
41 /// When the error handler is specified, the exception raised during the current promise completion
42 /// will be passed to it as the parameter. If the error handler returns without raising an
43 /// exception then the dependant promise will be resolved successfully, otherwise the exception
44 /// raised by the handler will be transmitted to the dependent promise. If the handler wants
45 /// to passthrough the original exception it needs to wrap the exception with
46 /// the <see cref="PromiseTransientException"/>.
47 /// </para>
48 /// <para>
49 /// If the cancelation handler is specified and the current promise is cancelled then the dependent
50 /// promise will be resolved after the handler is executed. If the cancelation hendler raises the
51 /// exception it will be passed to the dependent promise.
52 /// </para>
53 /// </remarks>
30 54 IPromise Then(Action success, Action<Exception> error, Action cancel);
31 55 IPromise Then(Action success, Action<Exception> error);
32 56 IPromise Then(Action success);
33 57
34 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Action cancel);
58 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel);
35 59 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error);
36 60 IPromise Chain(Func<IPromise> chained);
37 61
38 62 /// <summary>
39 /// Добавляет последнй обработчик в цепочку обещаний, не создает промежуточных обещаний.
63 /// Adds specified listeners to the current promise.
40 64 /// </summary>
41 /// <param name="success">Success.</param>
42 /// <param name="error">Error.</param>
43 /// <param name="cancel">Cancel.</param>
44 void On(Action success, Action<Exception> error, Action cancel);
45 void On(Action success, Action<Exception> error);
46 void On(Action success);
47 void On(Action success, PromiseEventType events);
65 /// <param name="success">The handler called on the successful promise completion.</param>
66 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
67 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
68 /// <returns>The current promise.</returns>
69 IPromise On(Action success, Action<Exception> error, Action cancel);
70 IPromise On(Action success, Action<Exception> error);
71 IPromise On(Action success);
48 72
49 IPromise Error(Action<Exception> error);
50 73 /// <summary>
51 /// Обрабатывает либо ошибку, либо результат, либо отмену.
74 /// Adds specified listeners to the current promise.
75 /// </summary>
76 /// <param name="handler">The handler called on the specified events.</param>
77 /// <param name = "events">The combination of flags denoting the events for which the
78 /// handler shoud be called.</param>
79 /// <returns>The current promise.</returns>
80 IPromise On(Action handler, PromiseEventType events);
81
82 /// <summary>
83 /// Adds the specified error handler to the current promise
84 /// and creates the new dependant promise.
52 85 /// </summary>
53 /// <param name="handler">Обработчик.</param>
54 /// <remarks>После обработке ошибки, она передается дальше.</remarks>
86 /// <param name="error">
87 /// The error handler. If the error handler returns without
88 /// an error the dependant promise will be successfully resolved.
89 /// </param>
90 /// <returns>
91 /// The new dependant promise which will be resolved after the error
92 /// handler is executed.
93 /// </returns>
94 /// <remarks>
95 /// The successfull result of the current promise will be ignored.
96 /// </remarks>
97 IPromise Error(Action<Exception> error);
98
55 99 /// <summary>
56 /// Обрабатывает либо ошибку, либо результат, либо отмену обещания.
100 /// Adds the specified cncellation handler to the current promise
101 /// and creates the new dependant promise.
57 102 /// </summary>
58 /// <param name="handler">Обработчик.</param>
59 /// <remarks>После обработке ошибки, она передается дальше.</remarks>
60 IPromise Anyway(Action handler);
61 /// <summary>
62 /// Обработчик для регистрации отмены обещания.
63 /// </summary>
64 /// <returns>Новое обещание, связанное с текущим, выполнится после указанного обработчика.</returns>
65 /// <param name="handler">Обработчик события.</param>
66 /// <remarks>Если обработчик вызывает исключение, то оно передается обработчику ошибки, результат работы
67 /// которого будет передан связанному обещанию</remarks>
103 /// <returns>
104 /// The new dependant promise which will be resolved after the cancellation
105 /// handler is executed.
106 /// </returns>
107 /// <param name="handler">
108 /// The cancellation handler.
109 /// </param>
110 /// <remarks>
111 /// If the cancellation handler is executed without an error the dependent
112 /// promise will be successfully resolved, otherwise the raised exception
113 /// will be passed to the dependant promise. The successful result of the
114 /// current promise will be ignored.
115 /// </remarks>
68 116 IPromise Cancelled(Action handler);
69 117
70 118 /// <summary>
@@ -1,34 +1,34
1 1 using System;
2 2
3 3 namespace Implab {
4 public interface IPromise<T> : IPromise {
4 public interface IPromise<out T> : IPromise {
5 5
6 6 new T Join();
7 7
8 8 new T Join(int timeout);
9 9
10 void On(Action<T> success, Action<Exception> error, Action cancel);
10 IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel);
11 11
12 void On(Action<T> success, Action<Exception> error);
12 IPromise<T> On(Action<T> success, Action<Exception> error);
13 13
14 void On(Action<T> success);
14 IPromise<T> On(Action<T> success);
15 15
16 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error, Action cancel);
16 new IPromise<T> On(Action handler, PromiseEventType events);
17
18 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error, Func<T2> cancel);
17 19
18 20 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error);
19 21
20 22 IPromise<T2> Then<T2>(Func<T, T2> mapper);
21 23
22 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error, Action cancel);
24 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error, Func<IPromise<T2>> cancel);
23 25
24 26 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error);
25 27
26 28 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained);
27 29
28 IPromise<T> Error(Func<Exception,T> error);
30 IPromise<T2> Error<T2>(Func<Exception,T2> error);
29 31
30 new IPromise<T> Cancelled(Action handler);
31
32 new IPromise<T> Anyway(Action handler);
32 IPromise<T2> Cancelled<T2>(Func<T2> handler);
33 33 }
34 34 }
@@ -7,6 +7,8
7 7 <OutputType>Library</OutputType>
8 8 <RootNamespace>Implab</RootNamespace>
9 9 <AssemblyName>Implab</AssemblyName>
10 <ProductVersion>8.0.30703</ProductVersion>
11 <SchemaVersion>2.0</SchemaVersion>
10 12 </PropertyGroup>
11 13 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
12 14 <DebugSymbols>true</DebugSymbols>
@@ -131,12 +133,10
131 133 <Compile Include="TaskController.cs" />
132 134 <Compile Include="ProgressInitEventArgs.cs" />
133 135 <Compile Include="Properties\AssemblyInfo.cs" />
134 <Compile Include="Promise.cs" />
135 136 <Compile Include="Parallels\AsyncPool.cs" />
136 137 <Compile Include="Safe.cs" />
137 138 <Compile Include="ValueEventArgs.cs" />
138 139 <Compile Include="PromiseExtensions.cs" />
139 <Compile Include="TransientPromiseException.cs" />
140 140 <Compile Include="SyncContextPromise.cs" />
141 141 <Compile Include="Diagnostics\OperationContext.cs" />
142 142 <Compile Include="Diagnostics\TraceContext.cs" />
@@ -150,6 +150,13
150 150 <Compile Include="ComponentContainer.cs" />
151 151 <Compile Include="DisposablePool.cs" />
152 152 <Compile Include="ObjectPool.cs" />
153 <Compile Include="Parallels\AsyncQueue.cs" />
154 <Compile Include="PromiseT.cs" />
155 <Compile Include="IDeferred.cs" />
156 <Compile Include="IDeferredT.cs" />
157 <Compile Include="AbstractPromise.cs" />
158 <Compile Include="Promise.cs" />
159 <Compile Include="PromiseTransientException.cs" />
153 160 </ItemGroup>
154 161 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
155 162 <ItemGroup />
@@ -1,9 +1,6
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.IO;
4 using System.Linq;
5 using System.Text;
6 using System.Threading.Tasks;
7 4
8 5 namespace Implab.JSON {
9 6 public class JSONWriter {
@@ -23,7 +20,6 namespace Implab.JSON {
23 20 _escapeCR,
24 21 _escapeNL,
25 22 _escapeTAB,
26 _escapeSLASH,
27 23 _escapeBSLASH,
28 24 _escapeQ;
29 25
@@ -34,7 +30,6 namespace Implab.JSON {
34 30 _escapeNL = "\\n".ToCharArray();
35 31 _escapeTAB = "\\t".ToCharArray();
36 32 _escapeBSLASH = "\\\\".ToCharArray();
37 _escapeSLASH = "\\/".ToCharArray();
38 33 _escapeQ = "\\\"".ToCharArray();
39 34 }
40 35
@@ -205,6 +200,7 namespace Implab.JSON {
205 200 var chars = value.ToCharArray();
206 201 m_writer.Write('"');
207 202
203 // Analysis disable once ForCanBeConvertedToForeach
208 204 for (int i = 0; i < chars.Length; i++) {
209 205 var ch = chars[i];
210 206
@@ -18,7 +18,7 namespace Implab {
18 18 /// <para>Пул поддерживает обращения сразу из нескольких потоков.</para>
19 19 /// </remarks>
20 20 public abstract class ObjectPool<T> where T : class {
21 readonly MTQueue<WeakReference> m_queue = new MTQueue<WeakReference>();
21 readonly AsyncQueue<WeakReference> m_queue = new AsyncQueue<WeakReference>();
22 22 readonly int m_size;
23 23 int m_count = 0;
24 24
@@ -26,7 +26,7 namespace Implab.Parallels {
26 26 m_pending = source.Length;
27 27 m_action = action;
28 28
29 m_promise.Anyway(Dispose);
29 m_promise.On(Dispose, PromiseEventType.All);
30 30
31 31 InitPool();
32 32 }
@@ -86,7 +86,7 namespace Implab.Parallels {
86 86 m_transform = transform;
87 87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
88 88
89 m_promise.Anyway(Dispose);
89 m_promise.On(Dispose, PromiseEventType.All);
90 90
91 91 InitPool();
92 92 }
@@ -162,7 +162,7 namespace Implab.Parallels {
162 162 int slots = threads;
163 163
164 164 // Analysis disable AccessToDisposedClosure
165 AsyncPool.InvokeNewThread(() => {
165 AsyncPool.InvokeNewThread<int>(() => {
166 166 for (int i = 0; i < source.Length; i++) {
167 167 if(promise.IsResolved)
168 168 break; // stop processing in case of error or cancellation
@@ -177,7 +177,7 namespace Implab.Parallels {
177 177
178 178 try {
179 179 transform(source[i])
180 .Anyway(() => {
180 .On( x => {
181 181 Interlocked.Increment(ref slots);
182 182 lock (locker) {
183 183 Monitor.Pulse(locker);
@@ -53,7 +53,7 namespace Implab.Parallels {
53 53
54 54
55 55 public static IPromise InvokeNewThread(Action func) {
56 var p = new Promise<object>();
56 var p = new Promise();
57 57
58 58 var caller = TraceContext.Instance.CurrentOperation;
59 59
@@ -6,7 +6,7 using Implab.Diagnostics;
6 6 namespace Implab.Parallels {
7 7 public class WorkerPool : DispatchPool<Action> {
8 8
9 MTQueue<Action> m_queue = new MTQueue<Action>();
9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
10 10 int m_queueLength = 0;
11 11 readonly int m_threshold = 1;
12 12
This diff has been collapsed as it changes many lines, (1046 lines changed) Show them Hide them
@@ -1,954 +1,258
1 using System;
2 using System.Collections.Generic;
3 using System.Reflection;
4 using System.Threading;
5 using Implab.Parallels;
1 using System;
2 using System.Diagnostics;
6 3
7 4 namespace Implab {
5 public class Promise : AbstractPromise<Promise.HandlerDescriptor>, IPromise, IDeferred {
8 6
9 /// <summary>
10 /// Класс для асинхронного получения результатов. Так называемое "обещание".
11 /// </summary>
12 /// <typeparam name="T">Тип получаемого результата</typeparam>
13 /// <remarks>
14 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
15 /// клиент получив такое обещание может установить ряд обратных вызово для получения
16 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
17 /// <para>
18 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
19 /// данные события клиент должен использовать методы <c>Then</c>.
20 /// </para>
21 /// <para>
22 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
23 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
24 /// выполнении обещания.
25 /// </para>
26 /// <para>
27 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
28 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
29 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
30 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
31 /// обещания.
32 /// </para>
33 /// <para>
34 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
35 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
36 /// использовать соответствующую форму методе <c>Then</c>.
37 /// </para>
38 /// <para>
39 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
40 /// только инициатор обещания иначе могут возникнуть противоречия.
41 /// </para>
42 /// </remarks>
43 public class Promise<T> : IPromise<T> {
7 public struct HandlerDescriptor {
8 readonly Action m_success;
9 readonly Action<Exception> m_error;
10 readonly Action m_cancel;
11 readonly IDeferred m_deferred;
44 12
45 protected abstract class AbstractHandler : MTCustomQueueNode<AbstractHandler> {
46 public abstract void Resolve(T result);
47 public abstract void Reject(Exception error);
48 public abstract void Cancel();
13 public HandlerDescriptor(Action success, Action<Exception> error, Action cancel, IDeferred deferred) {
14 m_success = success;
15 m_error = error;
16 m_cancel = cancel;
17 m_deferred = deferred;
49 18 }
50 19
51 protected class RemapDescriptor<T2> : AbstractHandler {
52
53 readonly Func<T,T2> m_resultHandler;
54 readonly Func<Exception,T2> m_errorHandler;
55 readonly Action m_cancellHandler;
56 readonly Promise<T2> m_medium;
57
58 public RemapDescriptor(Func<T,T2> resultHandler, Func<Exception,T2> errorHandler, Action cancelHandler, Promise<T2> medium) {
59 m_resultHandler = resultHandler;
60 m_errorHandler = errorHandler;
61 m_cancellHandler = cancelHandler;
62 m_medium = medium;
63 }
64
65 public override void Resolve(T result) {
66 if (m_resultHandler != null) {
20 public void SignalSuccess() {
21 if (m_success != null) {
67 22 try {
68 if (m_medium != null)
69 m_medium.Resolve(m_resultHandler(result));
70 else
71 m_resultHandler(result);
72 } catch (Exception e) {
73 Reject(e);
74 }
75 } else if(m_medium != null)
76 m_medium.Resolve(default(T2));
23 m_success();
24 if (m_deferred != null)
25 m_deferred.Resolve();
26 } catch (Exception err) {
27 SignalError(err);
77 28 }
78
79 public override void Reject(Exception error) {
80 if (m_errorHandler != null) {
81 try {
82 var res = m_errorHandler(error);
83 if (m_medium != null)
84 m_medium.Resolve(res);
85 } catch (Exception err2) {
86 if (m_medium != null)
87 m_medium.Reject(err2);
88 }
89 } else if (m_medium != null)
90 m_medium.Reject(error);
91 }
92
93 public override void Cancel() {
94 if (m_cancellHandler != null) {
95 try {
96 m_cancellHandler();
97 } catch (Exception err) {
98 Reject(err);
99 return;
100 }
101 }
102 if (m_medium != null)
103 m_medium.Cancel();
104 29 }
105 30 }
106 31
107 protected class HandlerDescriptor : AbstractHandler {
108
109 readonly Action<T> m_resultHandler;
110 readonly Action<Exception> m_errorHandler;
111 readonly Action m_cancellHandler;
112 readonly Promise<T> m_medium;
113
114 public HandlerDescriptor(Action<T> resultHandler, Action<Exception> errorHandler, Action cancelHandler, Promise<T> medium) {
115 m_resultHandler = resultHandler;
116 m_errorHandler = errorHandler;
117 m_cancellHandler = cancelHandler;
118 m_medium = medium;
32 public void SignalError(Exception err) {
33 if (m_error != null) {
34 try {
35 m_error(err);
36 if (m_deferred != null)
37 m_deferred.Resolve();
38 } catch (Exception err2) {
39 if (m_deferred != null)
40 m_deferred.Reject(err2);
119 41 }
120
121 public override void Resolve(T result) {
122 if (m_resultHandler != null) {
123 try {
124 m_resultHandler(result);
125 } catch (Exception e) {
126 Reject(e);
127 return;
42 } else {
43 if (m_deferred != null)
44 m_deferred.Reject(err);
128 45 }
129 46 }
130 if(m_medium != null)
131 m_medium.Resolve(result);
132 }
133 47
134 public override void Reject(Exception error) {
135 if (m_errorHandler != null) {
48 public void SignalCancel() {
49 if (m_cancel != null) {
136 50 try {
137 m_errorHandler(error);
138 if (m_medium != null)
139 m_medium.Resolve(default(T));
140 } catch (Exception err2) {
141 if (m_medium != null)
142 m_medium.Reject(err2);
51 m_cancel();
52 if (m_deferred != null)
53 m_deferred.Resolve();
54 } catch (Exception err) {
55 SignalError(err);
143 56 }
144 } else if (m_medium != null)
145 m_medium.Reject(error);
57 } else {
58 if (m_deferred != null)
59 m_deferred.Cancel();
146 60 }
147
148 public override void Cancel() {
149 if (m_cancellHandler != null) {
150 try {
151 m_cancellHandler();
152 } catch (Exception err) {
153 Reject(err);
154 return;
155 }
156 }
157 if (m_medium != null)
158 m_medium.Cancel();
159 61 }
160 62 }
161 63
162 const int UNRESOLVED_SATE = 0;
163 const int TRANSITIONAL_STATE = 1;
164 const int SUCCEEDED_STATE = 2;
165 const int REJECTED_STATE = 3;
166 const int CANCELLED_STATE = 4;
64 public void Resolve() {
65 BeginSetResult();
66 EndSetResult();
67 }
167 68
168 int m_childrenCount;
169 int m_state;
170 T m_result;
171 Exception m_error;
172
173 readonly MTCustomQueue<AbstractHandler> m_handlers = new MTCustomQueue<AbstractHandler>();
174 //readonly MTQueue<AbstractHandler> m_handlers = new MTQueue<AbstractHandler>();
175
176 public Promise() {
69 public void Reject(Exception error) {
70 SetError(error);
177 71 }
178 72
179 public Promise(IPromise parent) {
180 if (parent != null)
181 AddHandler(
182 null,
183 null,
184 () => {
185 if (parent.IsExclusive)
186 parent.Cancel();
187 },
188 null,
189 false
190 );
73 #region implemented abstract members of AbstractPromise
74
75 protected override void SignalSuccess(HandlerDescriptor handler) {
76 handler.SignalSuccess();
191 77 }
192 78
193 bool BeginTransit() {
194 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
79 protected override void SignalError(HandlerDescriptor handler, Exception error) {
80 handler.SignalError(error);
195 81 }
196 82
197 void CompleteTransit(int state) {
198 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
199 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
200 }
201
202 void WaitTransition() {
203 while (m_state == TRANSITIONAL_STATE) {
204 Thread.MemoryBarrier();
205 }
206 }
207
208 public bool IsResolved {
209 get {
210 Thread.MemoryBarrier();
211 return m_state > 1;
212 }
83 protected override void SignalCancelled(HandlerDescriptor handler) {
84 handler.SignalCancel();
213 85 }
214 86
215 public bool IsCancelled {
216 get {
217 Thread.MemoryBarrier();
218 return m_state == CANCELLED_STATE;
219 }
220 }
221
222 public Type PromiseType {
223 get { return typeof(T); }
87 protected override void Listen(PromiseEventType events, Action handler) {
88 AddHandler(new HandlerDescriptor(
89 events.HasFlag(PromiseEventType.Success) ? handler : null,
90 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
91 events.HasFlag(PromiseEventType.Cancelled) ? handler : null,
92 null
93 ));
224 94 }
225 95
226 /// <summary>
227 /// Выполняет обещание, сообщая об успешном выполнении.
228 /// </summary>
229 /// <param name="result">Результат выполнения.</param>
230 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
231 public void Resolve(T result) {
232 if (BeginTransit()) {
233 m_result = result;
234 CompleteTransit(SUCCEEDED_STATE);
235 OnStateChanged();
236 } else {
237 WaitTransition();
238 if (m_state != CANCELLED_STATE)
239 throw new InvalidOperationException("The promise is already resolved");
96 #endregion
97
98
99 public Type PromiseType {
100 get {
101 return typeof(void);
240 102 }
241 103 }
242 104
243 /// <summary>
244 /// Выполняет обещание, сообщая об успешном выполнении. Результатом выполнения будет пустое значения.
245 /// </summary>
246 /// <remarks>
247 /// Данный вариант удобен в случаях, когда интересен факт выполнения операции, нежели полученное значение.
248 /// </remarks>
249 public void Resolve() {
250 Resolve(default(T));
105 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
106 var promise = new Promise();
107 if (success != null)
108 promise.On(Cancel, PromiseEventType.Cancelled);
109
110 AddHandler(new HandlerDescriptor(success, error, cancel, promise));
111
112 return promise;
251 113 }
252 114
253 /// <summary>
254 /// Выполняет обещание, сообщая об ошибке
255 /// </summary>
256 /// <remarks>
257 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
258 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
259 /// будут проигнорированы.
260 /// </remarks>
261 /// <param name="error">Исключение возникшее при выполнении операции</param>
262 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
263 public void Reject(Exception error) {
264 if (BeginTransit()) {
265 m_error = error is TransientPromiseException ? error.InnerException : error;
266 CompleteTransit(REJECTED_STATE);
267 OnStateChanged();
268 } else {
269 WaitTransition();
270 if (m_state == SUCCEEDED_STATE)
271 throw new InvalidOperationException("The promise is already resolved");
272 }
115 public IPromise Then(Action success, Action<Exception> error) {
116 return Then(success, error, null);
273 117 }
274 118
275 /// <summary>
276 /// Отменяет операцию, если это возможно.
277 /// </summary>
278 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
279 public void Cancel() {
280 if (BeginTransit()) {
281 CompleteTransit(CANCELLED_STATE);
282 OnStateChanged();
283 }
119 public IPromise Then(Action success) {
120 return Then(success, null, null);
284 121 }
285 122
286 /// <summary>
287 /// Последний обработчик в цепочки обещаний.
288 /// </summary>
289 /// <param name="success"></param>
290 /// <param name="error"></param>
291 /// <param name="cancel"></param>
292 /// <remarks>
293 /// <para>
294 /// Данный метод не создает связанного с текущим обещания и предназначен для окончания
295 /// фсинхронной цепочки.
296 /// </para>
297 /// <para>
298 /// Если данный метод вызвать несколько раз, либо добавить другие обработчики, то цепочка
299 /// не будет одиночной <see cref="IsExclusive"/> и, как следствие, будет невозможна отмена
300 /// всей цепи обещаний снизу (с самого последнего обещания).
301 /// </para>
302 /// </remarks>
303 public void On(Action<T> success, Action<Exception> error, Action cancel) {
304 if (success == null && error == null && cancel == null)
305 return;
123 public IPromise On(Action success, Action<Exception> error, Action cancel) {
124 AddHandler(new HandlerDescriptor(success, error, cancel, null));
125 return this;
126 }
306 127
307 AddHandler(success, error, cancel, null, false);
128 public IPromise On(Action success, Action<Exception> error) {
129 return On(success, error, null);
308 130 }
309 131
310 public void On(Action<T> success, Action<Exception> error) {
311 AddHandler(success, error, null, null, false);
312 }
313
314 public void On(Action<T> success) {
315 AddHandler(success, null, null, null, false);
132 public IPromise On(Action success) {
133 return On(success, null, null);
316 134 }
317 135
318 public void On(Action handler, PromiseEventType events) {
319 Safe.ArgumentNotNull(handler, "handler");
320
321
322 AddHandler(
323 events.HasFlag(PromiseEventType.Success) ? new Action<T>(x => handler()) : null,
324 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>( x => handler()) : null,
325 events.HasFlag(PromiseEventType.Cancelled) ? handler : null,
326 null,
327 false
136 public IPromise On(Action handler, PromiseEventType events) {
137 return On(
138 events.HasFlag(PromiseEventType.Success) ? handler : null,
139 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
140 events.HasFlag(PromiseEventType.Cancelled) ? handler : null
328 141 );
329 142 }
330 143
331 public IPromise Error(Action<Exception> error) {
332 if (error == null)
333 return this;
144 public IPromise<T> Cast<T>() {
145 throw new InvalidCastException();
146 }
334 147
335 var medium = new Promise<T>(this);
148 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
149 var medium = new Promise();
336 150
337 AddMappers(
338 x => x,
339 e => {
340 error(e);
341 return default(T);
151 On(
152 () => {
153 if (medium.IsCancelled)
154 return;
155 if (chained != null)
156 ConnectPromise(chained(), medium);
342 157 },
343 null,
344 medium,
345 true
158 ex => {
159 if (medium.IsCancelled)
160 return;
161 if (error != null) {
162 try {
163 ConnectPromise(error(ex), medium);
164 } catch (Exception ex2) {
165 medium.Reject(ex2);
166 }
167 } else {
168 medium.Reject(ex);
169 }
170 },
171 () => {
172 if (medium.IsCancelled)
173 return;
174 if (cancel != null)
175 ConnectPromise(cancel(), medium);
176 else
177 medium.Cancel();
178 }
346 179 );
347 180
181 if (chained != null)
182 medium.On(Cancel, PromiseEventType.Cancelled);
183
348 184 return medium;
349 185 }
350 186
351 /// <summary>
352 /// Handles error and allows to keep the promise.
353 /// </summary>
354 /// <remarks>
355 /// If the specified handler throws an exception, this exception will be used to reject the promise.
356 /// </remarks>
357 /// <param name="handler">The error handler which returns the result of the promise.</param>
358 /// <returns>New promise.</returns>
359 public IPromise<T> Error(Func<Exception,T> handler) {
360 if (handler == null)
361 return this;
362
363 var medium = new Promise<T>(this);
364
365 AddMappers(x => x, handler, null, medium, true);
366
367 return medium;
187 static void ConnectPromise(IPromise result, Promise medium) {
188 if (result != null) {
189 result.On(
190 medium.Resolve,
191 medium.Reject,
192 () => medium.Reject(new OperationCanceledException())
193 );
194 medium.On(result.Cancel, PromiseEventType.Cancelled);
195 } else {
196 medium.Reject(
197 new NullReferenceException(
198 "The chained asynchronous operation returned" +
199 " 'null' where the promise instance is expected"
200 )
201 );
368 202 }
369
370 /// <summary>
371 /// Позволяет преобразовать результат выполения операции к новому типу.
372 /// </summary>
373 /// <typeparam name="TNew">Новый тип результата.</typeparam>
374 /// <param name="mapper">Преобразование результата к новому типу.</param>
375 /// <param name="error">Обработчик ошибки. Данный обработчик получит
376 /// исключение возникшее при выполнении операции.</param>
377 /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
378 /// <param name = "cancel"></param>
379 public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper, Func<Exception,TNew> error, Action cancel) {
380 Safe.ArgumentNotNull(mapper, "mapper");
381
382 // создаем прицепленное обещание
383 var medium = new Promise<TNew>(this);
384
385 AddMappers(
386 mapper,
387 error,
388 cancel,
389 medium,
390 true
391 );
392
393 return medium;
394 }
395
396 public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper, Func<Exception,TNew> error) {
397 return Then(mapper, error, null);
398 }
399
400 public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper) {
401 return Then(mapper, null, null);
402 203 }
403 204
404 /// <summary>
405 /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
406 /// выполнения текущей, а результат текущей операции может быть использован для инициализации
407 /// новой операции.
408 /// </summary>
409 /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
410 /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
411 /// <param name="error">Обработчик ошибки. Данный обработчик получит
412 /// исключение возникшее при выполнении текуещй операции.</param>
413 /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
414 /// <param name = "cancel"></param>
415 public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained, Func<Exception,IPromise<TNew>> error, Action cancel) {
416
417 Safe.ArgumentNotNull(chained, "chained");
418
419 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
420 // создать посредника, к которому будут подвызяваться следующие обработчики.
421 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
422 // передать через него результаты работы.
423 var medium = new Promise<TNew>(this);
424
425 Func<T,T> resultHandler = delegate(T result) {
426 if (medium.IsCancelled)
427 return default(T);
428
429 var promise = chained(result);
430
431 promise.On(
432 medium.Resolve,
433 medium.Reject,
434 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
435 );
436
437 // notify chained operation that it's not needed anymore
438 // порядок вызова Then, Cancelled важен, поскольку от этого
439 // зависит IsExclusive
440 medium.On(
441 null,
442 null,
443 () => {
444 if (promise.IsExclusive)
445 promise.Cancel();
446 }
447 );
448
449 return default(T);
450 };
451
452 Func<Exception,T> errorHandler;
453
454 if (error != null)
455 errorHandler = delegate(Exception e) {
456 try {
457 var promise = error(e);
458
459 promise.On(
460 medium.Resolve,
461 medium.Reject,
462 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
463 );
464
465 // notify chained operation that it's not needed anymore
466 // порядок вызова Then, Cancelled важен, поскольку от этого
467 // зависит IsExclusive
468 medium.Cancelled(() => {
469 if (promise.IsExclusive)
470 promise.Cancel();
471 });
472 } catch (Exception e2) {
473 medium.Reject(e2);
474 }
475 return default(T);
476 };
477 else
478 errorHandler = err => {
479 medium.Reject(err);
480 return default(T);
481 };
482
483
484 Action cancelHandler;
485 if (cancel != null)
486 cancelHandler = () => {
487 if (cancel != null)
488 cancel();
489 medium.Cancel();
490 };
491 else
492 cancelHandler = medium.Cancel;
493
494 AddMappers(
495 resultHandler,
496 errorHandler,
497 cancelHandler,
498 null,
499 true
500 );
501
502 return medium;
503 }
504
505 public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained, Func<Exception,IPromise<TNew>> error) {
205 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
506 206 return Chain(chained, error, null);
507 207 }
508 208
509 public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained) {
209 public IPromise Chain(Func<IPromise> chained) {
510 210 return Chain(chained, null, null);
511 211 }
512 212
513 public IPromise<T> Cancelled(Action handler) {
514 var medium = new Promise<T>(this);
515 AddHandler(null, null, handler, medium, false);
516 return medium;
517 }
518
519 /// <summary>
520 /// Adds the specified handler for all cases (success, error, cancel)
521 /// </summary>
522 /// <param name="handler">The handler that will be called anyway</param>
523 /// <returns>self</returns>
524 public IPromise<T> Anyway(Action handler) {
525 Safe.ArgumentNotNull(handler, "handler");
526
527 var medium = new Promise<T>(this);
528
529 AddHandler(
530 x => handler(),
531 e => {
532 handler();
533 throw new TransientPromiseException(e);
534 },
535 handler,
536 medium,
537 true
538 );
539
540 return medium;
541 }
542
543 /// <summary>
544 /// Преобразует результат обещания к нужному типу
545 /// </summary>
546 /// <typeparam name="T2"></typeparam>
547 /// <returns></returns>
548 public IPromise<T2> Cast<T2>() {
549 return Then(x => (T2)(object)x, null);
550 }
551
552 /// <summary>
553 /// Дожидается отложенного обещания и в случае успеха, возвращает
554 /// его, результат, в противном случае бросает исключение.
555 /// </summary>
556 /// <remarks>
557 /// <para>
558 /// Если ожидание обещания было прервано по таймауту, это не значит,
559 /// что обещание было отменено или что-то в этом роде, это только
560 /// означает, что мы его не дождались, однако все зарегистрированные
561 /// обработчики, как были так остались и они будут вызваны, когда
562 /// обещание будет выполнено.
563 /// </para>
564 /// <para>
565 /// Такое поведение вполне оправдано поскольку таймаут может истечь
566 /// в тот момент, когда началась обработка цепочки обработчиков, и
567 /// к тому же текущее обещание может стоять в цепочке обещаний и его
568 /// отклонение может привести к непрогнозируемому результату.
569 /// </para>
570 /// </remarks>
571 /// <param name="timeout">Время ожидания</param>
572 /// <returns>Результат выполнения обещания</returns>
573 public T Join(int timeout) {
574 var evt = new ManualResetEvent(false);
575 Anyway(() => evt.Set());
576
577 if (!evt.WaitOne(timeout, true))
578 throw new TimeoutException();
579
580 switch (m_state) {
581 case SUCCEEDED_STATE:
582 return m_result;
583 case CANCELLED_STATE:
584 throw new OperationCanceledException();
585 case REJECTED_STATE:
586 throw new TargetInvocationException(m_error);
587 default:
588 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
589 }
590 }
591
592 public T Join() {
593 return Join(Timeout.Infinite);
594 }
595
596 void AddMappers<T2>(Func<T,T2> success, Func<Exception,T2> error, Action cancel, Promise<T2> medium, bool inc) {
597 if (inc)
598 Interlocked.Increment(ref m_childrenCount);
599
600 AbstractHandler handler = new RemapDescriptor<T2>(success, error, cancel, medium);
601
602 bool queued;
603
604 if (!IsResolved) {
605 m_handlers.Enqueue(handler);
606 queued = true;
607 } else {
608 // the promise is in resolved state, just invoke the handled with minimum overhead
609 queued = false;
610 InvokeHandler(handler);
213 public IPromise Error(Action<Exception> error) {
214 var promise = new Promise();
215 On(
216 null,
217 err => {
218 if (error != null)
219 try {
220 error(err);
221 promise.Resolve();
222 } catch (Exception err2) {
223 promise.Reject(err2);
611 224 }
612
613 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
614 // if the promise have been resolved while we was adding handler to the queue
615 // we can't guarantee that someone is still processing it
616 // therefore we will fetch a handler from the queue and execute it
617 // note that fetched handler may be not the one that we have added
618 // even we can fetch no handlers at all :)
619 InvokeHandler(handler);
620 }
621
622 void AddHandler(Action<T> success, Action<Exception> error, Action cancel, Promise<T> medium, bool inc) {
623 if (inc)
624 Interlocked.Increment(ref m_childrenCount);
625
626 AbstractHandler handler = new HandlerDescriptor(success, error, cancel, medium);
627
628 bool queued;
629
630 if (!IsResolved) {
631 m_handlers.Enqueue(handler);
632 queued = true;
633 } else {
634 // the promise is in resolved state, just invoke the handled with minimum overhead
635 queued = false;
636 InvokeHandler(handler);
637 }
638
639 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
640 // if the promise have been resolved while we was adding handler to the queue
641 // we can't guarantee that someone is still processing it
642 // therefore we will fetch a handler from the queue and execute it
643 // note that fetched handler may be not the one that we have added
644 // even we can fetch no handlers at all :)
645 InvokeHandler(handler);
646 }
647
648 protected virtual void InvokeHandler(AbstractHandler handler) {
649 switch (m_state) {
650 case SUCCEEDED_STATE:
651 handler.Resolve(m_result);
652 break;
653 case REJECTED_STATE:
654 handler.Reject(m_error);
655 break;
656 case CANCELLED_STATE:
657 handler.Cancel();
658 break;
659 default:
660 // do nothing
661 return;
662 }
663 }
664
665 void OnStateChanged() {
666 AbstractHandler handler;
667 while (m_handlers.TryDequeue(out handler))
668 InvokeHandler(handler);
669 }
670
671 public bool IsExclusive {
672 get {
673 return m_childrenCount <= 1;
674 }
675 }
676
677 /// <summary>
678 /// Объединяет несколько обещаний в одно, результатом которого является массив результатов других обещаний.
679 /// Если хотябы одно из переданных обещаний не будет выполнено, то новое обещение тоже не будет выполнено.
680 /// При отмене нового обещания, переданные обещания также будут отменены, если никто больше на них не подписан.
681 /// </summary>
682 /// <param name="promises">Список обещаний. Если список пустой, то результирующее обещание возвращается уже выполненным.</param>
683 /// <returns>Обещание объединяющее в себе результат переданных обещаний.</returns>
684 /// <exception cref="ArgumentNullException"><paramref name="promises"/> не может быть null</exception>
685 public static IPromise<T[]> CreateComposite(IList<IPromise<T>> promises) {
686 if (promises == null)
687 throw new ArgumentNullException();
688
689 // создаем аккумулятор для результатов и результирующее обещание
690 var result = new T[promises.Count];
691 var promise = new Promise<T[]>();
692
693 // special case
694 if (promises.Count == 0) {
695 promise.Resolve(result);
696 return promise;
697 }
698
699 int pending = promises.Count;
700
701 for (int i = 0; i < promises.Count; i++) {
702 var dest = i;
703
704 if (promises[i] != null) {
705 promises[i].On(
706 x => {
707 result[dest] = x;
708 if (Interlocked.Decrement(ref pending) == 0)
709 promise.Resolve(result);
710 },
711 promise.Reject
712 );
713 } else {
714 if (Interlocked.Decrement(ref pending) == 0)
715 promise.Resolve(result);
716 }
717 }
718
719 promise.Cancelled(
720 () => {
721 foreach (var d in promises)
722 if (d != null && d.IsExclusive)
723 d.Cancel();
225 else
226 promise.Reject(err);
724 227 }
725 228 );
726 229
727 230 return promise;
728 231 }
729 232
730 /// <summary>
731 /// Объединяет несколько обещаний в одно. Результирующее обещание будет выполнено при
732 /// выполнении всех указанных обещаний. При этом возвращаемые значения первичных обещаний
733 /// игнорируются.
734 /// </summary>
735 /// <param name="promises">Коллекция первичных обещаний, которые будут объеденены в одно.</param>
736 /// <returns>Новое обещание, объединяющее в себе переданные.</returns>
737 /// <remarks>
738 /// Если в коллекции встречаюься <c>null</c>, то они воспринимаются как выполненные обещания.
739 /// </remarks>
740 public static IPromise CreateComposite(ICollection<IPromise> promises) {
741 if (promises == null)
742 throw new ArgumentNullException();
743 if (promises.Count == 0)
744 return Promise<object>.ResultToPromise(null);
745
746 int countdown = promises.Count;
747
748 var result = new Promise<object>();
749
750 foreach (var d in promises) {
751 if (d == null) {
752 if (Interlocked.Decrement(ref countdown) == 0)
753 result.Resolve(null);
233 public IPromise Cancelled(Action handler) {
234 var promise = new Promise();
235 On(
236 null,
237 null,
238 () => {
239 if (handler != null) {
240 try {
241 handler();
242 promise.Resolve();
243 } catch (Exception err) {
244 promise.Reject(err);
245 }
754 246 } else {
755 d.Then(() => {
756 if (Interlocked.Decrement(ref countdown) == 0)
757 result.Resolve(null);
758 });
247 promise.Cancel();
759 248 }
760 249 }
761
762 result.Cancelled(() => {
763 foreach (var d in promises)
764 if (d != null && d.IsExclusive)
765 d.Cancel();
766 });
767
768 return result;
769 }
770
771 public static Promise<T> ResultToPromise(T result) {
772 var p = new Promise<T>();
773 p.Resolve(result);
774 return p;
775 }
776
777 public static Promise<T> ExceptionToPromise(Exception error) {
778 if (error == null)
779 throw new ArgumentNullException();
780
781 var p = new Promise<T>();
782 p.Reject(error);
783 return p;
784 }
785
786 #region IPromiseBase explicit implementation
787
788 IPromise IPromise.Then(Action success, Action<Exception> error, Action cancel) {
789 return Then(
790 success != null ? new Func<T,T>(x => {
791 success();
792 return x;
793 }) : null,
794 error != null ? new Func<Exception,T>(e => {
795 error(e);
796 return default(T);
797 }) : null,
798 cancel
799 );
800 }
801
802 IPromise IPromise.Then(Action success, Action<Exception> error) {
803 return Then(
804 success != null ? new Func<T,T>(x => {
805 success();
806 return x;
807 }) : null,
808 error != null ? new Func<Exception,T>(e => {
809 error(e);
810 return default(T);
811 }) : null
812 );
813 }
814
815 IPromise IPromise.Then(Action success) {
816 Safe.ArgumentNotNull(success, "success");
817 return Then(x => {
818 success();
819 return x;
820 });
821 }
822
823 IPromise IPromise.Chain(Func<IPromise> chained, Func<Exception,IPromise> error, Action cancel) {
824 return ChainNoResult(chained, error, cancel);
825 }
826
827 IPromise ChainNoResult(Func<IPromise> chained, Func<Exception,IPromise> error, Action cancel) {
828 Safe.ArgumentNotNull(chained, "chained");
829
830 var medium = new Promise<object>(this);
831
832 Func<T,T> resultHandler = delegate {
833 if (medium.IsCancelled)
834 return default(T);
835
836 var promise = chained();
837
838 promise.On(
839 medium.Resolve,
840 medium.Reject,
841 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
842 250 );
843 251
844 // notify chained operation that it's not needed anymore
845 // порядок вызова Then, Cancelled важен, поскольку от этого
846 // зависит IsExclusive
847 medium.Cancelled(() => {
848 if (promise.IsExclusive)
849 promise.Cancel();
850 });
851
852 return default(T);
853 };
854
855 Func<Exception,T> errorHandler;
856
857 if (error != null)
858 errorHandler = delegate(Exception e) {
859 try {
860 var promise = error(e);
861
862 promise.On(
863 medium.Resolve,
864 medium.Reject,
865 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
866 );
867
868 // notify chained operation that it's not needed anymore
869 // порядок вызова Then, Cancelled важен, поскольку от этого
870 // зависит IsExclusive
871 medium.Cancelled(() => {
872 if (promise.IsExclusive)
873 promise.Cancel();
874 });
875 } catch (Exception e2) {
876 medium.Reject(e2);
252 return promise;
877 253 }
878 return default(T);
879 };
880 else
881 errorHandler = err => {
882 medium.Reject(err);
883 return default(T);
884 };
885
886
887 Action cancelHandler;
888 if (cancel != null)
889 cancelHandler = () => {
890 if (cancel != null)
891 cancel();
892 medium.Cancel();
893 };
894 else
895 cancelHandler = medium.Cancel;
896
897 AddMappers(
898 resultHandler,
899 errorHandler,
900 cancelHandler,
901 null,
902 true
903 );
904
905 return medium;
906 }
907
908 IPromise IPromise.Chain(Func<IPromise> chained, Func<Exception,IPromise> error) {
909 return ChainNoResult(chained, error, null);
910 }
911
912 IPromise IPromise.Chain(Func<IPromise> chained) {
913 return ChainNoResult(chained, null, null);
914 }
915
916
917 void IPromise.On(Action success, Action<Exception> error, Action cancel) {
918 On(success != null ? new Action<T>(x => success()) : null, error, cancel);
919 }
920
921 void IPromise.On(Action success, Action<Exception> error) {
922 On(x => success(), error, null);
923 }
924
925 void IPromise.On(Action success) {
926 On(x => success(), null, null);
927 }
928
929 IPromise IPromise.Error(Action<Exception> error) {
930 return Error(error);
931 }
932
933 IPromise IPromise.Anyway(Action handler) {
934 return Anyway(handler);
935 }
936
937 IPromise IPromise.Cancelled(Action handler) {
938 return Cancelled(handler);
939 }
940
941 void IPromise.Join() {
942 Join();
943 }
944
945 void IPromise.Join(int timeout) {
946 Join(timeout);
947 }
948
949 #endregion
950
951 254
952 255
953 256 }
954 257 }
258
@@ -1,6 +1,7
1 1 using System.Threading;
2 2 using System;
3 3 using Implab.Diagnostics;
4 using System.Collections.Generic;
4 5
5 6
6 7 #if NET_4_5
@@ -15,7 +16,8 namespace Implab {
15 16 if (context == null)
16 17 return that;
17 18
18 var p = new SyncContextPromise<T>(context, that);
19 var p = new SyncContextPromise<T>(context);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
19 21
20 22 that.On(
21 23 p.Resolve,
@@ -29,7 +31,9 namespace Implab {
29 31 Safe.ArgumentNotNull(that, "that");
30 32 Safe.ArgumentNotNull(context, "context");
31 33
32 var p = new SyncContextPromise<T>(context, that);
34 var p = new SyncContextPromise<T>(context);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
36
33 37
34 38 that.On(
35 39 p.Resolve,
@@ -90,6 +94,29 namespace Implab {
90 94 return that;
91 95 }
92 96
97 public static IPromise Combine(this ICollection<IPromise> that) {
98 Safe.ArgumentNotNull(that, "that");
99
100 int count = that.Count;
101 var medium = new Promise();
102
103 foreach (var p in that)
104 p.On(
105 () => {
106 if (Interlocked.Decrement(ref count) == 0)
107 medium.Resolve();
108 },
109 error => {
110 throw new Exception("The dependency promise is failed", error);
111 },
112 () => {
113 throw new OperationCanceledException("The dependency promise is cancelled");
114 }
115 );
116
117 return medium;
118 }
119
93 120 #if NET_4_5
94 121
95 122 public static Task<T> GetTask<T>(this IPromise<T> that) {
@@ -23,7 +23,7 namespace Implab
23 23
24 24 public static void ArgumentNotEmpty<T>(T[] param, string name) {
25 25 if (param == null || param.Length == 0)
26 throw new ArgumentException("The array must be not emty");
26 throw new ArgumentException("The array must be not emty", name);
27 27 }
28 28
29 29 public static void ArgumentNotNull(object param, string name) {
@@ -61,7 +61,7 namespace Implab
61 61 public static IPromise InvokePromise(Action action) {
62 62 ArgumentNotNull(action, "action");
63 63
64 var p = new Promise<object>();
64 var p = new Promise();
65 65 try {
66 66 action();
67 67 p.Resolve();
@@ -9,13 +9,16 namespace Implab {
9 9 m_context = context;
10 10 }
11 11
12 public SyncContextPromise(SynchronizationContext context, IPromise parent)
13 : base(parent) {
14 Safe.ArgumentNotNull(context, "context");
15 m_context = context;
12 protected override void SignalSuccess(IDeferred<T> handler) {
13 m_context.Post(x => base.SignalSuccess(handler), null);
16 14 }
17 protected override void InvokeHandler(AbstractHandler handler) {
18 m_context.Post(x => base.InvokeHandler(handler),null);
15
16 protected override void SignalError(IDeferred<T> handler, System.Exception error) {
17 m_context.Post(x => base.SignalError(handler, error), null);
18 }
19
20 protected override void SignalCancelled(IDeferred<T> handler) {
21 m_context.Post(x => base.SignalCancelled(handler), null);
19 22 }
20 23 }
21 24 }
@@ -11,7 +11,7 namespace MonoPlay {
11 11 if (args == null)
12 12 throw new ArgumentNullException("args");
13 13
14 var q1 = new MTQueue<int>();
14 var q1 = new AsyncQueue<int>();
15 15 var q2 = new Queue<int>();
16 16
17 17 const int count = 10000000;
@@ -19,37 +19,52 namespace MonoPlay {
19 19
20 20 var t1 = Environment.TickCount;
21 21
22 Promise<int>.CreateComposite(
23 22 new [] {
24 23 AsyncPool.InvokeNewThread(() => {
25 24 for (var i = 0; i < count; i++)
26 25 q1.Enqueue(i);
27 26 }),
28 27 AsyncPool.InvokeNewThread(() => {
28 for (var i = 0; i < count; i++)
29 q1.Enqueue(i);
30 }),
31 AsyncPool.InvokeNewThread(() => {
29 32 int temp = 0;
30 for(int i =0 ; i< count ; i++)
31 while(!q1.TryDequeue(out temp)){
32 }
33 int i = 0;
34 while (i < count)
35 if (q1.TryDequeue(out temp))
36 i++;
37 }),
38 AsyncPool.InvokeNewThread(() => {
39 int temp = 0;
40 int i = 0;
41 while (i < count)
42 if (q1.TryDequeue(out temp))
43 i++;
33 44 })
34 45 }
35 ).Join();
46 .Combine()
47 .Join();
36 48
37 49 var t2 = Environment.TickCount;
38 50 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
39 51
40 52 t1 = Environment.TickCount;
41 53
42 for (var i = 0; i < count; i++)
54 for (var i = 0; i < count * 2; i++)
43 55 q2.Enqueue(i);
44 56
57 for (var i = 0; i < count * 2; i++)
58 q2.Dequeue();
59
45 60 t2 = Environment.TickCount;
46 Console.WriteLine("LinkedList: {0} ms", t2 - t1);
61 Console.WriteLine("Queue: {0} ms", t2 - t1);
47 62
48 63 q2 = new Queue<int>();
49 64
50 65 t1 = Environment.TickCount;
51 66
52 Promise<int>.CreateComposite(
67
53 68 new [] {
54 69 AsyncPool.InvokeNewThread(() => {
55 70 for (var i = 0; i < count; i++)
@@ -57,6 +72,11 namespace MonoPlay {
57 72 q2.Enqueue(i);
58 73 }),
59 74 AsyncPool.InvokeNewThread(() => {
75 for (var i = 0; i < count; i++)
76 lock (q2)
77 q2.Enqueue(i);
78 }),
79 AsyncPool.InvokeNewThread(() => {
60 80 for(int i = 0 ; i< count ;)
61 81 lock(q2) {
62 82 if(q2.Count == 0)
@@ -65,14 +85,25 namespace MonoPlay {
65 85 i++;
66 86 }
67 87
88 }),
89 AsyncPool.InvokeNewThread(() => {
90 for (int i = 0; i < count ;)
91 lock (q2) {
92 if (q2.Count == 0)
93 continue;
94 q2.Dequeue();
95 i++;
96 }
97
68 98 })
69 99 }
70 ).Join();
100 .Combine()
101 .Join();
71 102
72 103
73 104
74 105 t2 = Environment.TickCount;
75 Console.WriteLine("LinkedList+Lock: {0} ms", t2 - t1);
106 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
76 107
77 108 }
78 109 }
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now