##// END OF EJS Templates
Promises rewritten, added improved version of AsyncQueue
cin -
r119:2573b562e328 v2
parent child
Show More
@@ -0,0 +1,219
1 using System;
2 using Implab.Parallels;
3 using System.Threading;
4 using System.Reflection;
5
6 namespace Implab {
7 public abstract class AbstractPromise<THandler> {
8
9 const int UNRESOLVED_SATE = 0;
10 const int TRANSITIONAL_STATE = 1;
11 const int SUCCEEDED_STATE = 2;
12 const int REJECTED_STATE = 3;
13 const int CANCELLED_STATE = 4;
14
15 int m_state;
16 Exception m_error;
17
18 readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
19
20 #region state managment
21 bool BeginTransit() {
22 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
23 }
24
25 void CompleteTransit(int state) {
26 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
27 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
28 }
29
30 void WaitTransition() {
31 while (m_state == TRANSITIONAL_STATE) {
32 Thread.MemoryBarrier();
33 }
34 }
35
36 protected void BeginSetResult() {
37 if (!BeginTransit()) {
38 WaitTransition();
39 if (m_state != CANCELLED_STATE)
40 throw new InvalidOperationException("The promise is already resolved");
41 }
42 }
43
44 protected void EndSetResult() {
45 CompleteTransit(SUCCEEDED_STATE);
46 OnSuccess();
47 }
48
49
50
51 /// <summary>
52 /// Выполняет обещание, сообщая об ошибке
53 /// </summary>
54 /// <remarks>
55 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
56 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
57 /// будут проигнорированы.
58 /// </remarks>
59 /// <param name="error">Исключение возникшее при выполнении операции</param>
60 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
61 protected void SetError(Exception error) {
62 if (BeginTransit()) {
63 m_error = error is PromiseTransientException ? error.InnerException : error;
64 CompleteTransit(REJECTED_STATE);
65 OnError();
66 } else {
67 WaitTransition();
68 if (m_state == SUCCEEDED_STATE)
69 throw new InvalidOperationException("The promise is already resolved");
70 }
71 }
72
73 /// <summary>
74 /// Отменяет операцию, если это возможно.
75 /// </summary>
76 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
77 protected void SetCancelled() {
78 if (BeginTransit()) {
79 CompleteTransit(CANCELLED_STATE);
80 OnCancelled();
81 }
82 }
83
84 protected abstract void SignalSuccess(THandler handler);
85
86 protected abstract void SignalError(THandler handler, Exception error);
87
88 protected abstract void SignalCancelled(THandler handler);
89
90 void OnSuccess() {
91 THandler handler;
92 while (m_handlers.TryDequeue(out handler))
93 SignalSuccess(handler);
94 }
95
96 void OnError() {
97 THandler handler;
98 while (m_handlers.TryDequeue(out handler))
99 SignalError(handler,m_error);
100 }
101
102 void OnCancelled() {
103 THandler handler;
104 while (m_handlers.TryDequeue(out handler))
105 SignalCancelled(handler);
106 }
107
108 #endregion
109
110 protected abstract void Listen(PromiseEventType events, Action handler);
111
112 #region synchronization traits
113 protected void WaitResult(int timeout) {
114 if (!IsResolved) {
115 var lk = new object();
116
117 Listen(PromiseEventType.All, () => {
118 lock(lk) {
119 Monitor.Pulse(lk);
120 }
121 });
122
123 lock (lk) {
124 while(!IsResolved) {
125 if(!Monitor.Wait(lk,timeout))
126 throw new TimeoutException();
127 }
128 }
129
130 }
131 switch (m_state) {
132 case SUCCEEDED_STATE:
133 return;
134 case CANCELLED_STATE:
135 throw new OperationCanceledException();
136 case REJECTED_STATE:
137 throw new TargetInvocationException(m_error);
138 default:
139 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
140 }
141 }
142 #endregion
143
144 #region handlers managment
145
146 protected void AddHandler(THandler handler) {
147
148 if (IsResolved) {
149 InvokeHandler(handler);
150
151 } else {
152 // the promise is in the resolved state, just invoke the handler
153 m_handlers.Enqueue(handler);
154
155
156 if (IsResolved && m_handlers.TryDequeue(out handler))
157 // if the promise have been resolved while we was adding the handler to the queue
158 // we can't guarantee that someone is still processing it
159 // therefore we need to fetch a handler from the queue and execute it
160 // note that fetched handler may be not the one that we have added
161 // even we can fetch no handlers at all :)
162 InvokeHandler(handler);
163 }
164 }
165
166 protected void InvokeHandler(THandler handler) {
167 switch (m_state) {
168 case SUCCEEDED_STATE:
169 SignalSuccess(handler);
170 break;
171 case CANCELLED_STATE:
172 SignalCancelled(handler);
173 break;
174 case REJECTED_STATE:
175 SignalError(handler, m_error);
176 break;
177 default:
178 throw new Exception(String.Format("Invalid promise state {0}", m_state));
179 }
180 }
181
182 #endregion
183
184 #region IPromise implementation
185
186 public void Join(int timeout) {
187 WaitResult(timeout);
188 }
189
190 public void Join() {
191 WaitResult(-1);
192 }
193
194 public bool IsResolved {
195 get {
196 Thread.MemoryBarrier();
197 return m_state > 1;
198 }
199 }
200
201 public bool IsCancelled {
202 get {
203 Thread.MemoryBarrier();
204 return m_state == CANCELLED_STATE;
205 }
206 }
207
208 #endregion
209
210 #region ICancellable implementation
211
212 public void Cancel() {
213 SetCancelled();
214 }
215
216 #endregion
217 }
218 }
219
@@ -0,0 +1,14
1 using System;
2
3 namespace Implab {
4 /// <summary>
5 /// Deferred result, usually used by asynchronous services as the service part of the promise.
6 /// </summary>
7 public interface IDeferred : ICancellable {
8
9 void Resolve();
10
11 void Reject(Exception error);
12 }
13 }
14
@@ -0,0 +1,10
1 using System;
2
3 namespace Implab {
4 public interface IDeferred<T> : ICancellable {
5 void Resolve(T value);
6
7 void Reject(Exception error);
8 }
9 }
10
@@ -0,0 +1,244
1 using System.Threading;
2 using System.Collections.Generic;
3 using System;
4 using System.Collections;
5
6 namespace Implab.Parallels {
7 public class AsyncQueue<T> : IEnumerable<T> {
8 class Chunk {
9 public Chunk next;
10
11 int m_low;
12 int m_hi;
13 int m_alloc;
14 readonly int m_size;
15 readonly T[] m_data;
16
17 public Chunk(int size) {
18 m_size = size;
19 m_data = new T[size];
20 }
21
22 public Chunk(int size, T value) {
23 m_size = size;
24 m_hi = 1;
25 m_alloc = 1;
26 m_data = new T[size];
27 m_data[0] = value;
28 }
29
30 public int Low {
31 get { return m_low; }
32 }
33
34 public int Hi {
35 get { return m_hi; }
36 }
37
38 public bool TryEnqueue(T value,out bool extend) {
39 extend = false;
40 int alloc;
41 do {
42 alloc = m_alloc;
43 if (alloc > m_size)
44 return false;
45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
46
47 if (alloc == m_size) {
48 extend = true;
49 return false;
50 }
51
52 m_data[alloc] = value;
53
54 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
55 // spin wait for commit
56 }
57 return true;
58 }
59
60 public bool TryDequeue(out T value,out bool recycle) {
61 int low;
62 do {
63 low = m_low;
64 if (low >= m_hi) {
65 value = default(T);
66 recycle = (low == m_size);
67 return false;
68 }
69 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
70
71 recycle = (low == m_size - 1);
72 value = m_data[low];
73
74 return true;
75 }
76
77 public T GetAt(int pos) {
78 return m_data[pos];
79 }
80 }
81
82 public const int DEFAULT_CHUNK_SIZE = 32;
83
84 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
85
86 Chunk m_first;
87 Chunk m_last;
88
89 public AsyncQueue() {
90 m_last = m_first = new Chunk(m_chunkSize);
91 }
92
93 public void Enqueue(T value) {
94 var last = m_last;
95 // spin wait to the new chunk
96 bool extend = true;
97 while(last == null || !last.TryEnqueue(value, out extend)) {
98 // try to extend queue
99 if (extend || last == null) {
100 var chunk = new Chunk(m_chunkSize, value);
101 if (EnqueueChunk(last, chunk))
102 break;
103 last = m_last;
104 } else {
105 while (last != m_last) {
106 Thread.MemoryBarrier();
107 last = m_last;
108 }
109 }
110 }
111 }
112
113 public bool TryDequeue(out T value) {
114 var chunk = m_first;
115 bool recycle;
116 while (chunk != null) {
117
118 var result = chunk.TryDequeue(out value, out recycle);
119
120 if (recycle) // this chunk is waste
121 RecycleFirstChunk(chunk);
122 else
123 return result; // this chunk is usable and returned actual result
124
125 if (result) // this chunk is waste but the true result is always actual
126 return true;
127
128 // try again
129 chunk = m_first;
130 }
131
132 // the queue is empty
133 value = default(T);
134 return false;
135 }
136
137 bool EnqueueChunk(Chunk last, Chunk chunk) {
138 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
139 return false;
140
141 if (last != null)
142 last.next = chunk;
143 else
144 m_first = chunk;
145 return true;
146 }
147
148 void RecycleFirstChunk(Chunk first) {
149 var next = first.next;
150
151 if (next == null) {
152 // looks like this is the last chunk
153 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
154 // race
155 // maybe someone already recycled this chunk
156 // or a new chunk has been appedned to the queue
157
158 return; // give up
159 }
160 // the tail is updated
161 }
162
163 // we need to update the head
164 Interlocked.CompareExchange(ref m_first, next, first);
165 // if the head is already updated then give up
166 return;
167
168 }
169
170 #region IEnumerable implementation
171
172 class Enumerator : IEnumerator<T> {
173 Chunk m_current;
174 int m_pos = -1;
175
176 public Enumerator(Chunk fisrt) {
177 m_current = fisrt;
178 }
179
180 #region IEnumerator implementation
181
182 public bool MoveNext() {
183 if (m_current == null)
184 return false;
185
186 if (m_pos == -1)
187 m_pos = m_current.Low;
188 else
189 m_pos++;
190 if (m_pos == m_current.Hi) {
191 m_pos = 0;
192 m_current = m_current.next;
193 }
194
195 return true;
196 }
197
198 public void Reset() {
199 throw new NotSupportedException();
200 }
201
202 object IEnumerator.Current {
203 get {
204 return Current;
205 }
206 }
207
208 #endregion
209
210 #region IDisposable implementation
211
212 public void Dispose() {
213 }
214
215 #endregion
216
217 #region IEnumerator implementation
218
219 public T Current {
220 get {
221 if (m_pos == -1 || m_current == null)
222 throw new InvalidOperationException();
223 return m_current.GetAt(m_pos);
224 }
225 }
226
227 #endregion
228 }
229
230 public IEnumerator<T> GetEnumerator() {
231 return new Enumerator(m_first);
232 }
233
234 #endregion
235
236 #region IEnumerable implementation
237
238 IEnumerator IEnumerable.GetEnumerator() {
239 return GetEnumerator();
240 }
241
242 #endregion
243 }
244 }
This diff has been collapsed as it changes many lines, (621 lines changed) Show them Hide them
@@ -0,0 +1,621
1 using System;
2 using System.Diagnostics;
3
4 namespace Implab {
5
6 /// <summary>
7 /// Класс для асинхронного получения результатов. Так называемое "обещание".
8 /// </summary>
9 /// <typeparam name="T">Тип получаемого результата</typeparam>
10 /// <remarks>
11 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
12 /// клиент получив такое обещание может установить ряд обратных вызово для получения
13 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
14 /// <para>
15 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
16 /// данные события клиент должен использовать методы <c>Then</c>.
17 /// </para>
18 /// <para>
19 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
20 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
21 /// выполнении обещания.
22 /// </para>
23 /// <para>
24 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
25 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
26 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
27 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
28 /// обещания.
29 /// </para>
30 /// <para>
31 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
32 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
33 /// использовать соответствующую форму методе <c>Then</c>.
34 /// </para>
35 /// <para>
36 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
37 /// только инициатор обещания иначе могут возникнуть противоречия.
38 /// </para>
39 /// </remarks>
40 public class Promise<T> : AbstractPromise<IDeferred<T>>, IPromise<T>, IDeferred<T> {
41
42 class StubDeferred : IDeferred<T> {
43 public static readonly StubDeferred instance = new StubDeferred();
44
45 StubDeferred() {
46 }
47
48 #region IDeferred implementation
49
50 public void Resolve(T value) {
51 }
52
53 public void Reject(Exception error) {
54 }
55
56 #endregion
57
58 #region ICancellable implementation
59
60 public void Cancel() {
61 }
62
63 #endregion
64
65
66 }
67
68 class RemapDescriptor<T2> : IDeferred<T> {
69 readonly Func<T,T2> m_remap;
70 readonly Func<Exception,T2> m_failed;
71 readonly Func<T2> m_cancel;
72 readonly IDeferred<T2> m_deferred;
73
74 public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<T2> cancel, IDeferred<T2> deferred ) {
75 Debug.Assert(deferred != null);
76 m_remap = remap;
77 m_failed = failed;
78 m_cancel = cancel;
79 m_deferred = deferred;
80 }
81
82
83
84 #region IDeferred implementation
85
86 public void Resolve(T value) {
87 if (m_remap != null) {
88 try {
89 m_deferred.Resolve(m_remap(value));
90 } catch (Exception ex) {
91 Reject(ex);
92 }
93 }
94 }
95
96 public void Reject(Exception error) {
97 if (m_failed != null) {
98 try {
99 m_deferred.Resolve(m_failed(error));
100 } catch (Exception ex) {
101 m_deferred.Reject(ex);
102 }
103 } else {
104 m_deferred.Reject(error);
105 }
106 }
107
108
109 #endregion
110
111 #region ICancellable implementation
112
113 public void Cancel() {
114 if (m_cancel != null) {
115 try {
116 m_deferred.Resolve(m_cancel());
117 } catch (Exception ex) {
118 Reject(ex);
119 }
120 } else {
121 m_deferred.Cancel();
122 }
123 }
124
125 #endregion
126 }
127
128 class ListenerDescriptor : IDeferred<T> {
129 readonly Action m_handler;
130 readonly PromiseEventType m_events;
131
132 public ListenerDescriptor(Action handler, PromiseEventType events) {
133 Debug.Assert(handler != null);
134
135 m_handler = handler;
136 m_events = events;
137 }
138
139 #region IDeferred implementation
140
141 public void Resolve(T value) {
142 if (m_events.HasFlag(PromiseEventType.Success)) {
143 try {
144 m_handler();
145 // Analysis disable once EmptyGeneralCatchClause
146 } catch {
147 }
148 }
149 }
150
151 public void Reject(Exception error) {
152 if (m_events.HasFlag(PromiseEventType.Error)){
153 try {
154 m_handler();
155 // Analysis disable once EmptyGeneralCatchClause
156 } catch {
157 }
158 }
159 }
160
161 #endregion
162
163 #region ICancellable implementation
164
165 public void Cancel() {
166 if (m_events.HasFlag(PromiseEventType.Cancelled)){
167 try {
168 m_handler();
169 // Analysis disable once EmptyGeneralCatchClause
170 } catch {
171 }
172 }
173 }
174
175 #endregion
176 }
177
178 class ValueEventDescriptor : IDeferred<T> {
179 readonly Action<T> m_success;
180 readonly Action<Exception> m_failed;
181 readonly Action m_cancelled;
182 readonly IDeferred<T> m_deferred;
183
184 public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
185 Debug.Assert(deferred != null);
186
187 m_success = success;
188 m_failed = failed;
189 m_cancelled = cancelled;
190 m_deferred = deferred;
191 }
192
193 #region IDeferred implementation
194
195 public void Resolve(T value) {
196 if (m_success != null) {
197 try {
198 m_success(value);
199 m_deferred.Resolve(value);
200 } catch (Exception ex) {
201 Reject(ex);
202 }
203 }
204 }
205
206 public void Reject(Exception error) {
207 if (m_failed != null) {
208 try {
209 m_failed(error);
210 m_deferred.Resolve(default(T));
211 } catch(Exception ex) {
212 m_deferred.Reject(ex);
213 }
214 } else {
215 m_deferred.Reject(error);
216 }
217 }
218
219 #endregion
220
221 #region ICancellable implementation
222
223 public void Cancel() {
224 if (m_cancelled != null) {
225 try {
226 m_cancelled();
227 m_deferred.Resolve(default(T));
228 } catch(Exception ex) {
229 Reject(ex);
230 }
231 } else {
232 m_deferred.Cancel();
233 }
234 }
235
236 #endregion
237 }
238
239 public class EventDescriptor : IDeferred<T> {
240 readonly Action m_success;
241 readonly Action<Exception> m_failed;
242 readonly Action m_cancelled;
243 readonly IDeferred<T> m_deferred;
244
245 public EventDescriptor(Action success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
246 Debug.Assert(deferred != null);
247
248 m_success = success;
249 m_failed = failed;
250 m_cancelled = cancelled;
251 m_deferred = deferred;
252 }
253
254 #region IDeferred implementation
255
256 public void Resolve(T value) {
257 if (m_success != null) {
258 try {
259 m_success();
260 m_deferred.Resolve(value);
261 } catch (Exception ex) {
262 Reject(ex);
263 }
264 }
265 }
266
267 public void Reject(Exception error) {
268 if (m_failed != null) {
269 try {
270 m_failed(error);
271 m_deferred.Resolve(default(T));
272 }catch (Exception ex)
273 {
274 m_deferred.Reject(ex);
275 }
276 } else {
277 m_deferred.Reject(error);
278 }
279
280 }
281
282 #endregion
283
284 #region ICancellable implementation
285
286 public void Cancel() {
287 if (m_cancelled != null) {
288 try {
289 m_cancelled();
290 m_deferred.Resolve(default(T));
291 } catch (Exception ex) {
292 Reject(ex);
293 }
294 } else {
295 m_deferred.Cancel();
296 }
297 }
298
299 #endregion
300 }
301
302 T m_result;
303
304 public virtual void Resolve(T value) {
305 BeginSetResult();
306 m_result = value;
307 EndSetResult();
308 }
309
310 public void Reject(Exception error) {
311 SetError(error);
312 }
313
314 public Type PromiseType {
315 get {
316 return typeof(T);
317 }
318 }
319
320 public new T Join() {
321 WaitResult(-1);
322 return m_result;
323 }
324 public new T Join(int timeout) {
325 WaitResult(timeout);
326 return m_result;
327 }
328
329 public IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel) {
330 AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance));
331 return this;
332 }
333
334 public IPromise<T> On(Action<T> success, Action<Exception> error) {
335 AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance));
336 return this;
337 }
338
339 public IPromise<T> On(Action<T> success) {
340 AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance));
341 return this;
342 }
343
344 public IPromise<T> On(Action handler, PromiseEventType events) {
345 Listen(events, handler);
346 return this;
347 }
348
349 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<T2> cancel) {
350 var promise = new Promise<T2>();
351 AddHandler(new RemapDescriptor<T2>(mapper, error, cancel, promise));
352 return promise;
353 }
354
355 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error) {
356 var promise = new Promise<T2>();
357 AddHandler(new RemapDescriptor<T2>(mapper, error, null, promise));
358 return promise;
359 }
360
361 public IPromise<T2> Then<T2>(Func<T, T2> mapper) {
362 var promise = new Promise<T2>();
363 AddHandler(new RemapDescriptor<T2>(mapper, null, null, promise));
364 return promise;
365 }
366
367 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<IPromise<T2>> cancel) {
368 // this promise will be resolved when an asyc operation is started
369 var promise = new Promise<IPromise<T2>>();
370
371 AddHandler(new RemapDescriptor<IPromise<T2>>(
372 chained,
373 error,
374 cancel,
375 promise
376 ));
377
378 var medium = new Promise<T2>();
379
380 if (chained != null)
381 medium.On(Cancel, PromiseEventType.Cancelled);
382
383 // we need to connect started async operation with the medium
384 // if the async operation hasn't been started by the some reason
385 // report is to the medium
386 promise.On(
387 result => ConnectPromise<T2>(result, medium),
388 medium.Reject,
389 medium.Cancel
390 );
391
392 return medium;
393 }
394
395 static void ConnectPromise<T2>(IPromise<T2> result, Promise<T2> medium) {
396 if (result != null) {
397 result.On(
398 medium.Resolve,
399 medium.Reject,
400 () => medium.Reject(new OperationCanceledException())
401 );
402 medium.On(result.Cancel, PromiseEventType.Cancelled);
403 } else {
404 medium.Reject(
405 new NullReferenceException(
406 "The chained asynchronous operation returned" +
407 " 'null' where the promise instance is expected"
408 )
409 );
410 }
411 }
412
413 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error) {
414 return Chain(chained, error, null);
415 }
416
417 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained) {
418 return Chain(chained, null, null);
419 }
420
421 public IPromise<T2> Error<T2>(Func<Exception, T2> error) {
422 var promise = new Promise<T2>();
423 if (error != null)
424 On(
425 (Action<T>)null,
426 ex => {
427 try {
428 promise.Resolve(error(ex));
429 } catch (Exception ex2) {
430 promise.Reject(ex2);
431 }
432 }
433 );
434 else
435 Listen(PromiseEventType.Error, () => promise.Resolve(default(T2)));
436 return promise;
437 }
438
439 public IPromise<T2> Cancelled<T2>(Func<T2> handler) {
440 var promise = new Promise<T2>();
441 if (handler != null)
442 On(
443 (Action<T>)null,
444 null,
445 () => {
446 try {
447 promise.Resolve(handler());
448 } catch (Exception ex) {
449 promise.Reject(ex);
450 }
451 });
452 else
453 Listen(PromiseEventType.Cancelled, () => promise.Resolve(default(T2)));
454 return promise;
455 }
456
457 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
458 var promise = new Promise<T>();
459 if (success != null)
460 promise.On(Cancel, PromiseEventType.Cancelled);
461
462 AddHandler(new EventDescriptor(success, error, cancel, promise));
463
464 return promise;
465 }
466
467 public IPromise Then(Action success, Action<Exception> error) {
468 return Then(success, error, null);
469 }
470
471 public IPromise Then(Action success) {
472 return Then(success, null, null);
473 }
474
475 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
476 var promise = new Promise<IPromise>();
477
478 AddHandler(
479 new RemapDescriptor<IPromise>(
480 x => chained(),
481 error,
482 cancel,
483 promise
484 )
485 );
486
487 var medium = new Promise();
488 if (chained != null)
489 medium.On(Cancel, PromiseEventType.Cancelled);
490
491 promise.On(
492 result => ConnectPromise(result, medium),
493 medium.Reject,
494 medium.Cancel
495 );
496
497 return medium;
498 }
499
500 static void ConnectPromise(IPromise result, Promise medium) {
501 if (result != null) {
502 result.On(
503 medium.Resolve,
504 medium.Reject,
505 () => medium.Reject(new OperationCanceledException())
506 );
507 medium.On(result.Cancel, PromiseEventType.Cancelled);
508 } else {
509 medium.Reject(
510 new NullReferenceException(
511 "The chained asynchronous operation returned" +
512 " 'null' where the promise instance is expected"
513 )
514 );
515 }
516 }
517
518 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
519 return Chain(chained, error, null);
520 }
521
522 public IPromise Chain(Func<IPromise> chained) {
523 return Chain(chained, null, null);
524 }
525
526 public IPromise On(Action success, Action<Exception> error, Action cancel) {
527 AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance));
528 return this;
529 }
530
531 public IPromise On(Action success, Action<Exception> error) {
532 AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance));
533 return this;
534 }
535
536 public IPromise On(Action success) {
537 Listen(PromiseEventType.Success, success);
538 return this;
539 }
540
541 IPromise IPromise.On(Action handler, PromiseEventType events) {
542 Listen(events,handler);
543 return this;
544 }
545
546 public IPromise Error(Action<Exception> error) {
547 var promise = new Promise();
548 if (error != null)
549 On(
550 (Action<T>)null,
551 ex => {
552 try {
553 error(ex);
554 promise.Resolve();
555 } catch (Exception ex2) {
556 promise.Reject(ex2);
557 }
558 });
559 else
560 Listen(PromiseEventType.Error, promise.Resolve);
561 return promise;
562 }
563
564 public IPromise Cancelled(Action handler) {
565 var promise = new Promise();
566 if (handler != null)
567 On(
568 (Action<T>)null,
569 null,
570 () => {
571 try {
572 handler();
573 promise.Resolve();
574 } catch (Exception ex) {
575 promise.Reject(ex);
576 }
577 });
578 else
579 Listen(PromiseEventType.Cancelled, promise.Resolve);
580 return promise;
581 }
582
583 public IPromise<T2> Cast<T2>() {
584 return (IPromise<T2>)this;
585 }
586
587 #region implemented abstract members of AbstractPromise
588
589 protected override void SignalSuccess(IDeferred<T> handler) {
590 handler.Resolve(m_result);
591 }
592
593 protected override void SignalError(IDeferred<T> handler, Exception error) {
594 handler.Reject(error);
595 }
596
597 protected override void SignalCancelled(IDeferred<T> handler) {
598 handler.Cancel();
599 }
600
601 protected override void Listen(PromiseEventType events, Action handler) {
602 if (handler != null)
603 AddHandler(new ListenerDescriptor(handler, events));
604 }
605
606 #endregion
607
608 public static IPromise<T> ResultToPromise(T value) {
609 var p = new Promise<T>();
610 p.Resolve(value);
611 return p;
612 }
613
614 public static IPromise<T> ExceptionToPromise(Exception error) {
615 var p = new Promise<T>();
616 p.Reject(error);
617 return p;
618 }
619
620 }
621 }
@@ -0,0 +1,33
1 using System;
2
3 namespace Implab {
4
5 [Serializable]
6 public class PromiseTransientException : Exception {
7 /// <summary>
8 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class.
9 /// </summary>
10 /// <param name="inner">The exception that is the cause of the current exception.</param>
11 public PromiseTransientException(Exception inner) : base("The preceding promise has failed", inner) {
12 }
13
14 /// <summary>
15 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class
16 /// </summary>
17 /// <param name="message">A <see cref="T:System.String"/> that describes the exception. </param>
18 /// <param name="inner">The exception that is the cause of the current exception. </param>
19 public PromiseTransientException(string message, Exception inner)
20 : base(message, inner) {
21 }
22
23 /// <summary>
24 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class
25 /// </summary>
26 /// <param name="context">The contextual information about the source or destination.</param>
27 /// <param name="info">The object that holds the serialized object data.</param>
28 protected PromiseTransientException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
29 : base(info, context) {
30 }
31 }
32 }
33
@@ -1,30 +1,38
1 1 using System.Windows.Forms;
2 2 using System;
3 3
4 4
5 5 namespace Implab.Fx {
6 6 public class ControlBoundPromise<T> : Promise<T> {
7 7 readonly Control m_target;
8 8
9 9 public ControlBoundPromise(Control target) {
10 10 Safe.ArgumentNotNull(target, "target");
11 11
12 12 m_target = target;
13 13 }
14 14
15 public ControlBoundPromise(Control target, IPromise parent)
16 : base(parent) {
17 Safe.ArgumentNotNull(target, "target");
18
19 m_target = target;
15 protected override void SignalSuccess(IDeferred<T> handler) {
16 if (m_target.InvokeRequired)
17 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalSuccess), handler);
18 else
19 base.SignalSuccess(handler);
20 20 }
21 21
22 protected override void InvokeHandler(AbstractHandler handler) {
22 protected override void SignalCancelled(IDeferred<T> handler) {
23 23 if (m_target.InvokeRequired)
24 m_target.BeginInvoke(new Action<AbstractHandler>(base.InvokeHandler), handler);
24 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalCancelled), handler);
25 25 else
26 base.InvokeHandler(handler);
26 base.SignalCancelled(handler);
27 27 }
28
29 protected override void SignalError(IDeferred<T> handler, Exception error) {
30 if (m_target.InvokeRequired)
31 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalError), handler, error);
32 else
33 base.SignalError(handler, error);
34 }
35
28 36 }
29 37 }
30 38
@@ -1,41 +1,43
1 1 using System;
2 2 using System.Windows.Forms;
3 3 using System.Threading;
4 4
5 5 namespace Implab.Fx
6 6 {
7 7 public static class PromiseHelpers
8 8 {
9 9 /// <summary>
10 10 /// Перенаправляет обработку обещания в поток указанного элемента управления.
11 11 /// </summary>
12 12 /// <typeparam name="T">Тип результата обещания</typeparam>
13 13 /// <param name="that">Исходное обещание</param>
14 14 /// <param name="ctl">Элемент управления</param>
15 15 /// <returns>Новое обещание, обработчики которого будут выполнены в потоке элемента управления.</returns>
16 16 /// <exception cref="ArgumentNullException">Параметр не может быть <c>null</c>.</exception>
17 17 /// <example>
18 18 /// client
19 19 /// .Get("description.txt") // returns a promise
20 20 /// .DispatchToControl(m_ctl) // handle the promise in the thread of the control
21 21 /// .Then(
22 22 /// description => m_ctl.Text = description // now it's safe
23 23 /// )
24 24 /// </example>
25 25 public static IPromise<T> DispatchToControl<T>(this IPromise<T> that, Control ctl)
26 26 {
27 27 Safe.ArgumentNotNull(that, "that");
28 28 Safe.ArgumentNotNull(ctl, "ctl");
29 29
30 var directed = new ControlBoundPromise<T>(ctl,that);
30 var directed = new ControlBoundPromise<T>(ctl);
31
32 directed.On(that.Cancel, PromiseEventType.Cancelled);
31 33
32 34 that.On(
33 35 directed.Resolve,
34 36 directed.Reject,
35 37 directed.Cancel
36 38 );
37 39
38 40 return directed;
39 41 }
40 42 }
41 43 }
@@ -1,452 +1,513
1 1 using System;
2 2 using System.Reflection;
3 3 using System.Threading;
4 4 using Implab.Parallels;
5 5
6 6 #if MONO
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 10 using TestMethod = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
14 14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15 15
16 16 #endif
17 17
18 18 namespace Implab.Test {
19 19 [TestClass]
20 20 public class AsyncTests {
21 21 [TestMethod]
22 22 public void ResolveTest() {
23 23 int res = -1;
24 24 var p = new Promise<int>();
25 25 p.Then(x => res = x);
26 26 p.Resolve(100);
27 27
28 28 Assert.AreEqual(100, res);
29 29 }
30 30
31 31 [TestMethod]
32 32 public void RejectTest() {
33 33 int res = -1;
34 34 Exception err = null;
35 35
36 36 var p = new Promise<int>();
37 37 p.Then(
38 38 x => res = x,
39 39 e => {
40 40 err = e;
41 41 return -2;
42 42 }
43 43 );
44 44 p.Reject(new ApplicationException("error"));
45 45
46 46 Assert.AreEqual(res, -1);
47 47 Assert.AreEqual(err.Message, "error");
48 48
49 49 }
50 50
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 54 p.Cancel();
55 55
56 56 var p2 = p.Cancelled(() => {
57 57 throw new ApplicationException("CANCELLED");
58 58 });
59 59
60 60 try {
61 61 p2.Join();
62 62 Assert.Fail();
63 63 } catch (ApplicationException err) {
64 64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 65 }
66 66
67 67 }
68 68
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 72 p.Cancel();
73 73
74 74 var p2 = p
75 .Cancelled(() => {
75 .Cancelled<bool>(() => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Error(e => true);
79 79
80 80 Assert.AreEqual(true, p2.Join());
81 81 }
82 82
83 83 [TestMethod]
84 84 public void JoinSuccessTest() {
85 85 var p = new Promise<int>();
86 86 p.Resolve(100);
87 87 Assert.AreEqual(p.Join(), 100);
88 88 }
89 89
90 90 [TestMethod]
91 91 public void JoinFailTest() {
92 92 var p = new Promise<int>();
93 93 p.Reject(new ApplicationException("failed"));
94 94
95 95 try {
96 96 p.Join();
97 97 throw new ApplicationException("WRONG!");
98 98 } catch (TargetInvocationException err) {
99 99 Assert.AreEqual(err.InnerException.Message, "failed");
100 100 } catch {
101 101 Assert.Fail("Got wrong excaption");
102 102 }
103 103 }
104 104
105 105 [TestMethod]
106 106 public void MapTest() {
107 107 var p = new Promise<int>();
108 108
109 109 var p2 = p.Then(x => x.ToString());
110 110 p.Resolve(100);
111 111
112 112 Assert.AreEqual(p2.Join(), "100");
113 113 }
114 114
115 115 [TestMethod]
116 116 public void FixErrorTest() {
117 117 var p = new Promise<int>();
118 118
119 119 var p2 = p.Error(e => 101);
120 120
121 121 p.Reject(new Exception());
122 122
123 123 Assert.AreEqual(p2.Join(), 101);
124 124 }
125 125
126 126 [TestMethod]
127 127 public void ChainTest() {
128 128 var p1 = new Promise<int>();
129 129
130 130 var p3 = p1.Chain(x => {
131 131 var p2 = new Promise<string>();
132 132 p2.Resolve(x.ToString());
133 133 return p2;
134 134 });
135 135
136 136 p1.Resolve(100);
137 137
138 138 Assert.AreEqual(p3.Join(), "100");
139 139 }
140 140
141 141 [TestMethod]
142 142 public void ChainFailTest() {
143 143 var p1 = new Promise<int>();
144 144
145 145 var p3 = p1.Chain(x => {
146 146 var p2 = new Promise<string>();
147 147 p2.Reject(new Exception("DIE!!!"));
148 148 return p2;
149 149 });
150 150
151 151 p1.Resolve(100);
152 152
153 153 Assert.IsTrue(p3.IsResolved);
154 154 }
155 155
156 156 [TestMethod]
157 157 public void PoolTest() {
158 158 var pid = Thread.CurrentThread.ManagedThreadId;
159 159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160 160
161 161 Assert.AreNotEqual(pid, p.Join());
162 162 }
163 163
164 164 [TestMethod]
165 165 public void WorkerPoolSizeTest() {
166 166 var pool = new WorkerPool(5, 10, 1);
167 167
168 168 Assert.AreEqual(5, pool.PoolSize);
169 169
170 170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173 173
174 174 Assert.AreEqual(5, pool.PoolSize);
175 175
176 176 for (int i = 0; i < 100; i++)
177 177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 178 Thread.Sleep(200);
179 179 Assert.AreEqual(10, pool.PoolSize);
180 180
181 181 pool.Dispose();
182 182 }
183 183
184 184 [TestMethod]
185 185 public void WorkerPoolCorrectTest() {
186 186 var pool = new WorkerPool(0,1000,100);
187 187
188 188 const int iterations = 1000;
189 189 int pending = iterations;
190 190 var stop = new ManualResetEvent(false);
191 191
192 192 var count = 0;
193 193 for (int i = 0; i < iterations; i++) {
194 194 pool
195 195 .Invoke(() => 1)
196 196 .Then(x => Interlocked.Add(ref count, x))
197 197 .Then(x => Math.Log10(x))
198 .Anyway(() => {
198 .On(() => {
199 199 Interlocked.Decrement(ref pending);
200 200 if (pending == 0)
201 201 stop.Set();
202 });
202 }, PromiseEventType.All);
203 203 }
204 204
205 205 stop.WaitOne();
206 206
207 207 Assert.AreEqual(iterations, count);
208 208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 209 pool.Dispose();
210 210
211 211 }
212 212
213 213 [TestMethod]
214 214 public void WorkerPoolDisposeTest() {
215 215 var pool = new WorkerPool(5, 20);
216 216 Assert.AreEqual(5, pool.PoolSize);
217 217 pool.Dispose();
218 218 Thread.Sleep(500);
219 219 Assert.AreEqual(0, pool.PoolSize);
220 220 pool.Dispose();
221 221 }
222 222
223 223 [TestMethod]
224 224 public void MTQueueTest() {
225 225 var queue = new MTQueue<int>();
226 226 int res;
227 227
228 228 queue.Enqueue(10);
229 229 Assert.IsTrue(queue.TryDequeue(out res));
230 230 Assert.AreEqual(10, res);
231 231 Assert.IsFalse(queue.TryDequeue(out res));
232 232
233 233 for (int i = 0; i < 1000; i++)
234 234 queue.Enqueue(i);
235 235
236 236 for (int i = 0; i < 1000; i++) {
237 237 queue.TryDequeue(out res);
238 238 Assert.AreEqual(i, res);
239 239 }
240 240
241 241 int writers = 0;
242 242 int readers = 0;
243 243 var stop = new ManualResetEvent(false);
244 244 int total = 0;
245 245
246 246 const int itemsPerWriter = 10000;
247 247 const int writersCount = 10;
248 248
249 249 for (int i = 0; i < writersCount; i++) {
250 250 Interlocked.Increment(ref writers);
251 251 AsyncPool
252 252 .InvokeNewThread(() => {
253 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 254 queue.Enqueue(1);
255 255 }
256 256 return 1;
257 257 })
258 .Anyway(() => Interlocked.Decrement(ref writers));
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 259 }
260 260
261 261 for (int i = 0; i < 10; i++) {
262 262 Interlocked.Increment(ref readers);
263 263 AsyncPool
264 264 .InvokeNewThread(() => {
265 265 int t;
266 266 do {
267 267 while (queue.TryDequeue(out t))
268 268 Interlocked.Add(ref total, t);
269 269 } while (writers > 0);
270 270 return 1;
271 271 })
272 .Anyway(() => {
272 .On(() => {
273 273 Interlocked.Decrement(ref readers);
274 274 if (readers == 0)
275 275 stop.Set();
276 });
276 }, PromiseEventType.All);
277 }
278
279 stop.WaitOne();
280
281 Assert.AreEqual(100000, total);
282 }
283
284 [TestMethod]
285 public void AsyncQueueTest() {
286 var queue = new AsyncQueue<int>();
287 int res;
288
289 queue.Enqueue(10);
290 Assert.IsTrue(queue.TryDequeue(out res));
291 Assert.AreEqual(10, res);
292 Assert.IsFalse(queue.TryDequeue(out res));
293
294 for (int i = 0; i < 1000; i++)
295 queue.Enqueue(i);
296
297 for (int i = 0; i < 1000; i++) {
298 queue.TryDequeue(out res);
299 Assert.AreEqual(i, res);
300 }
301
302 int writers = 0;
303 int readers = 0;
304 var stop = new ManualResetEvent(false);
305 int total = 0;
306
307 const int itemsPerWriter = 10000;
308 const int writersCount = 10;
309
310 for (int i = 0; i < writersCount; i++) {
311 Interlocked.Increment(ref writers);
312 AsyncPool
313 .InvokeNewThread(() => {
314 for (int ii = 0; ii < itemsPerWriter; ii++) {
315 queue.Enqueue(1);
316 }
317 return 1;
318 })
319 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
320 }
321
322 for (int i = 0; i < 10; i++) {
323 Interlocked.Increment(ref readers);
324 AsyncPool
325 .InvokeNewThread(() => {
326 int t;
327 do {
328 while (queue.TryDequeue(out t))
329 Interlocked.Add(ref total, t);
330 } while (writers > 0);
331 return 1;
332 })
333 .On(() => {
334 Interlocked.Decrement(ref readers);
335 if (readers == 0)
336 stop.Set();
337 }, PromiseEventType.All);
277 338 }
278 339
279 340 stop.WaitOne();
280 341
281 342 Assert.AreEqual(itemsPerWriter * writersCount, total);
282 343 }
283 344
284 345 [TestMethod]
285 346 public void ParallelMapTest() {
286 347
287 348 const int count = 100000;
288 349
289 350 var args = new double[count];
290 351 var rand = new Random();
291 352
292 353 for (int i = 0; i < count; i++)
293 354 args[i] = rand.NextDouble();
294 355
295 356 var t = Environment.TickCount;
296 357 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
297 358
298 359 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
299 360
300 361 t = Environment.TickCount;
301 362 for (int i = 0; i < count; i++)
302 363 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
303 364 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
304 365 }
305 366
306 367 [TestMethod]
307 368 public void ChainedMapTest() {
308 369
309 370 using (var pool = new WorkerPool(0,10,1)) {
310 371 const int count = 10000;
311 372
312 373 var args = new double[count];
313 374 var rand = new Random();
314 375
315 376 for (int i = 0; i < count; i++)
316 377 args[i] = rand.NextDouble();
317 378
318 379 var t = Environment.TickCount;
319 380 var res = args
320 381 .ChainedMap(
321 382 // Analysis disable once AccessToDisposedClosure
322 383 x => pool.Invoke(
323 384 () => Math.Sin(x * x)
324 385 ),
325 386 4
326 387 )
327 388 .Join();
328 389
329 390 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
330 391
331 392 t = Environment.TickCount;
332 393 for (int i = 0; i < count; i++)
333 394 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
334 395 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
335 396 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
336 397 }
337 398 }
338 399
339 400 [TestMethod]
340 401 public void ParallelForEachTest() {
341 402
342 403 const int count = 100000;
343 404
344 405 var args = new int[count];
345 406 var rand = new Random();
346 407
347 408 for (int i = 0; i < count; i++)
348 409 args[i] = (int)(rand.NextDouble() * 100);
349 410
350 411 int result = 0;
351 412
352 413 var t = Environment.TickCount;
353 414 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
354 415
355 416 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
356 417
357 418 int result2 = 0;
358 419
359 420 t = Environment.TickCount;
360 421 for (int i = 0; i < count; i++)
361 422 result2 += args[i];
362 423 Assert.AreEqual(result2, result);
363 424 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
364 425 }
365 426
366 427 [TestMethod]
367 428 public void ComplexCase1Test() {
368 429 var flags = new bool[3];
369 430
370 431 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
371 432
372 433 var step1 = PromiseHelper
373 434 .Sleep(200, "Alan")
374 .Cancelled(() => flags[0] = true);
435 .On(() => flags[0] = true, PromiseEventType.Cancelled);
375 436 var p = step1
376 437 .Chain(x =>
377 438 PromiseHelper
378 439 .Sleep(200, "Hi, " + x)
379 440 .Then(y => y)
380 .Cancelled(() => flags[1] = true)
441 .On(() => flags[1] = true, PromiseEventType.Cancelled)
381 442 )
382 .Cancelled(() => flags[2] = true);
443 .On(() => flags[2] = true, PromiseEventType.Cancelled);
383 444 step1.Join();
384 445 p.Cancel();
385 446 try {
386 447 Assert.AreEqual(p.Join(), "Hi, Alan");
387 448 Assert.Fail("Shouldn't get here");
388 449 } catch (OperationCanceledException) {
389 450 }
390 451
391 452 Assert.IsFalse(flags[0]);
392 453 Assert.IsTrue(flags[1]);
393 454 Assert.IsTrue(flags[2]);
394 455 }
395 456
396 457 [TestMethod]
397 458 public void ChainedCancel1Test() {
398 459 // при отмене сцепленной асинхронной операции все обещание должно
399 460 // завершаться ошибкой OperationCanceledException
400 461 var p = PromiseHelper
401 462 .Sleep(1, "Hi, HAL!")
402 463 .Then(x => {
403 464 // запускаем две асинхронные операции
404 465 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
405 466 // вторая операция отменяет первую до завершения
406 467 PromiseHelper
407 468 .Sleep(100, "HAL, STOP!")
408 469 .Then(result.Cancel);
409 470 return result;
410 471 });
411 472 try {
412 473 p.Join();
413 474 } catch (TargetInvocationException err) {
414 475 Assert.IsTrue(err.InnerException is OperationCanceledException);
415 476 }
416 477 }
417 478
418 479 [TestMethod]
419 480 public void ChainedCancel2Test() {
420 481 // при отмене цепочки обещаний, вложенные операции также должны отменяться
421 482 var pSurvive = new Promise<bool>();
422 483 var hemStarted = new ManualResetEvent(false);
423 484 var p = PromiseHelper
424 485 .Sleep(1, "Hi, HAL!")
425 486 .Chain(x => {
426 487 hemStarted.Set();
427 488 // запускаем две асинхронные операции
428 489 var result = PromiseHelper
429 490 .Sleep(100000000, "HEM ENABLED!!!")
430 491 .Then(s => {
431 492 pSurvive.Resolve(false);
432 493 return s;
433 494 });
434 495
435 496 result
436 497 .Cancelled(() => pSurvive.Resolve(true));
437 498
438 499 return result;
439 500 });
440 501
441 502 hemStarted.WaitOne();
442 503 p.Cancel();
443 504
444 505 try {
445 506 p.Join();
446 507 } catch (OperationCanceledException) {
447 508 Assert.IsTrue(pSurvive.Join());
448 509 }
449 510 }
450 511 }
451 512 }
452 513
@@ -1,48 +1,48
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4
5 5 namespace Implab {
6 6 public class ComponentContainer : IComponentContainer, IDisposable {
7 7 static readonly ComponentContainer _appContainer;
8 8
9 9 static ComponentContainer() {
10 10 _appContainer = new ComponentContainer();
11 11 AppDomain.CurrentDomain.ProcessExit += HandleProcessExit;
12 12 }
13 13
14 14 public static ComponentContainer Global {
15 15 get {
16 16 return _appContainer;
17 17 }
18 18 }
19 19
20 20 bool m_disposed;
21 readonly MTQueue<IDisposable> m_components = new MTQueue<IDisposable>();
21 readonly AsyncQueue<IDisposable> m_components = new AsyncQueue<IDisposable>();
22 22
23 23 public void Add(IDisposable item) {
24 24 Safe.ArgumentNotNull(item, "item");
25 25 Thread.MemoryBarrier();
26 26 if (m_disposed) {
27 27 item.Dispose();
28 28 } else {
29 29 m_components.Enqueue(item);
30 30 if (m_disposed && m_components.TryDequeue(out item))
31 31 item.Dispose();
32 32 }
33 33 }
34 34
35 35 public void Dispose() {
36 36 m_disposed = true;
37 37 IDisposable item;
38 38 while (m_components.TryDequeue(out item))
39 39 item.Dispose();
40 40 }
41 41
42 42 static void HandleProcessExit (object sender, EventArgs e)
43 43 {
44 44 _appContainer.Dispose();
45 45 }
46 46 }
47 47 }
48 48
@@ -1,43 +1,43
1 namespace Implab.Diagnostics {
1 using System;
2
3 namespace Implab.Diagnostics {
2 4 public static class Extensions {
3 5 public static IPromise<T> EndLogicalOperation<T>(this IPromise<T> promise) {
4 6 Safe.ArgumentNotNull(promise, "promise");
5 7 var op = TraceContext.Instance.DetachLogicalOperation();
6 8
7 return promise.Then<T>(
8 x => {
9 return promise.On(
10 x => {
9 11 TraceContext.Instance.EnterLogicalOperation(op,true);
10 12 TraceLog.TraceInformation("promise = {0}", x);
11 13 TraceLog.EndLogicalOperation();
12 14 TraceContext.Instance.Leave();
13 return x;
14 15 },
15 16 err =>{
16 17 TraceContext.Instance.EnterLogicalOperation(op,true);
17 18 TraceLog.TraceError("promise died {0}", err);
18 19 TraceLog.EndLogicalOperation();
19 20 TraceContext.Instance.Leave();
20 throw new TransientPromiseException(err);
21 21 },
22 22 () => {
23 23 TraceContext.Instance.EnterLogicalOperation(op,true);
24 24 TraceLog.TraceInformation("promise cancelled");
25 25 TraceLog.EndLogicalOperation();
26 26 TraceContext.Instance.Leave();
27 27 }
28 28 );
29 29 }
30 30
31 31 public static IPromise EndLogicalOperation(this IPromise promise) {
32 32 Safe.ArgumentNotNull(promise, "promise");
33 33 var op = TraceContext.Instance.DetachLogicalOperation();
34 34
35 return promise.Anyway(() => {
35 return promise.On(() => {
36 36 TraceContext.Instance.EnterLogicalOperation(op,true);
37 37 TraceLog.EndLogicalOperation();
38 38 TraceContext.Instance.Leave();
39 });
39 }, PromiseEventType.All);
40 40 }
41 41 }
42 42 }
43 43
@@ -1,83 +1,83
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Threading;
4 4
5 5 namespace Implab.Diagnostics {
6 6 /// <summary>
7 7 /// Trace context is bound to the specific thread, each thread has it's own ThreadContext.
8 8 /// </summary>
9 9 /// <remarks>
10 10 /// ThreadContext manages relations between logical operations and threads.
11 11 /// </remarks>
12 12 public class TraceContext {
13 13
14 14 [ThreadStatic]
15 15 static TraceContext _instance;
16 16
17 17 OperationContext m_current = OperationContext.EMPTY;
18 18 readonly Stack<OperationContext> m_stack = new Stack<OperationContext>();
19 19 readonly int m_threadId;
20 20
21 21 public static TraceContext Instance {
22 22 get {
23 23 if (_instance == null)
24 24 _instance = new TraceContext();
25 25 return _instance;
26 26 }
27 27 }
28 28
29 29 public TraceContext() {
30 30 m_threadId = Thread.CurrentThread.ManagedThreadId;
31 31 }
32 32
33 33 public int ThreadId {
34 34 get { return m_threadId; }
35 35 }
36 36
37 37 public LogicalOperation CurrentOperation {
38 38 get {
39 39 return m_current.CurrentOperation;
40 40 }
41 41 }
42 42
43 43 public void EnterLogicalOperation(LogicalOperation operation, bool takeOwnership) {
44 44 var prev = CurrentOperation;
45 45 m_stack.Push(m_current);
46 46 m_current = new OperationContext(operation, takeOwnership);
47 47 LogChannel<TraceEvent>.Default.LogEvent(new TraceEvent(takeOwnership ? TraceEventType.Attach : TraceEventType.Enter, String.Format("{0} -> {1}",prev.Name, operation.Name)));
48 48 }
49 49
50 50 public void StartLogicalOperation(string name) {
51 51 m_current.BeginLogicalOperation(name);
52 52 LogChannel<TraceEvent>.Default.LogEvent(new TraceEvent(TraceEventType.OperationStarted, String.Format("+{0}",CurrentOperation.Name)));
53 53 }
54 54
55 55 public void StartLogicalOperation() {
56 56 StartLogicalOperation(String.Empty);
57 57 }
58 58
59 59 public void EndLogicalOperation() {
60 60 LogChannel<TraceEvent>.Default.LogEvent(new TraceEvent(TraceEventType.OperationCompleted, String.Format("-{0} : {1}ms",CurrentOperation.Name, CurrentOperation.Duration)));
61 61 m_current.EndLogicalOperation();
62 62 }
63 63
64 64 public LogicalOperation DetachLogicalOperation() {
65 65 var prev = m_current.DetachLogicalOperation();
66 66 LogChannel<TraceEvent>.Default.LogEvent(new TraceEvent(TraceEventType.Detach, String.Format("{0} -> {1}",prev.Name, CurrentOperation.Name)));
67 67 return prev;
68 68 }
69 69
70 70 public void Leave() {
71 71 if (m_stack.Count > 0) {
72 72 m_current.Leave();
73 73 var prev = CurrentOperation;
74 74 m_current = m_stack.Pop();
75 75 LogChannel<TraceEvent>.Default.LogEvent(new TraceEvent(TraceEventType.Leave, String.Format("{0} -> {1}", prev.Name, CurrentOperation.Name)));
76 76 } else {
77 TraceLog.TraceWarning("Attemtp to leave the last operation context");
77 TraceLog.TraceWarning("Attempt to leave the last operation context");
78 78 m_current = OperationContext.EMPTY;
79 79 }
80 80 }
81 81 }
82 82 }
83 83
@@ -1,90 +1,90
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Diagnostics;
5 5 using System.Diagnostics.CodeAnalysis;
6 6
7 7 namespace Implab {
8 8 public abstract class DisposablePool<T> : IDisposable {
9 9 readonly int m_size;
10 readonly MTQueue<T> m_queue = new MTQueue<T>();
10 readonly AsyncQueue<T> m_queue = new AsyncQueue<T>();
11 11
12 12 [SuppressMessage("Microsoft.Design", "CA1000:DoNotDeclareStaticMembersOnGenericTypes")]
13 13 static readonly bool _isValueType = typeof(T).IsValueType;
14 14
15 15 bool m_disposed;
16 16
17 17 int m_count;
18 18
19 19 protected DisposablePool(int size) {
20 20 m_size = size;
21 21 }
22 22
23 23 protected DisposablePool() : this(Environment.ProcessorCount+1) {
24 24 }
25 25
26 26 public T Allocate() {
27 27 if (m_disposed)
28 28 throw new ObjectDisposedException(ToString());
29 29
30 30 T instance;
31 31 if (m_queue.TryDequeue(out instance)) {
32 32 Interlocked.Decrement(ref m_count);
33 33 } else {
34 34 instance = CreateInstance();
35 35 Debug.Assert(!Object.Equals(instance, default(T)) || _isValueType);
36 36 }
37 37 return instance;
38 38 }
39 39
40 40 protected abstract T CreateInstance();
41 41
42 42 protected virtual void CleanupInstance(T instance) {
43 43 }
44 44
45 45 public void Release(T instance) {
46 46 if ( Object.Equals(instance,default(T)) && !_isValueType)
47 47 return;
48 48
49 49 Thread.MemoryBarrier();
50 50 if (m_count < m_size && !m_disposed) {
51 51 Interlocked.Increment(ref m_count);
52 52
53 53 CleanupInstance(instance);
54 54
55 55 m_queue.Enqueue(instance);
56 56
57 57 // пока элемент возвращался в кеш, была начата операция освобождения всего кеша
58 58 // и возможно уже законцена, в таком случае следует извлечь элемент обратно и
59 59 // освободить его. Если операция освобождения кеша еще не заврешилась, то будет
60 60 // изъят и освобожден произвольный элемен, что не повлияет на ход всего процесса.
61 61 if (m_disposed && m_queue.TryDequeue(out instance) && instance is IDisposable)
62 62 ((IDisposable)instance).Dispose() ;
63 63
64 64 } else {
65 65 if (instance is IDisposable)
66 66 ((IDisposable)instance).Dispose();
67 67 }
68 68 }
69 69
70 70 protected virtual void Dispose(bool disposing) {
71 71 if (disposing) {
72 72 m_disposed = true;
73 73 T instance;
74 74 while (m_queue.TryDequeue(out instance))
75 75 if (instance is IDisposable)
76 76 ((IDisposable)instance).Dispose();
77 77 }
78 78 }
79 79
80 80 #region IDisposable implementation
81 81
82 82 public void Dispose() {
83 83 Dispose(true);
84 84 GC.SuppressFinalize(this);
85 85 }
86 86
87 87 #endregion
88 88 }
89 89 }
90 90
@@ -1,87 +1,135
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab {
7 7 public interface IPromise: ICancellable {
8 /// <summary>
9 /// Check whereather the promise has no more than one dependent promise.
10 /// </summary>
11 bool IsExclusive {
12 get;
13 }
14 8
15 9 /// <summary>
16 10 /// Тип результата, получаемого через данное обещание.
17 11 /// </summary>
18 12 Type PromiseType { get; }
19 13
20 14 /// <summary>
21 15 /// Обещание является выполненым, либо успешно, либо с ошибкой, либо отменено.
22 16 /// </summary>
23 17 bool IsResolved { get; }
24 18
25 19 /// <summary>
26 20 /// Обещание было отменено.
27 21 /// </summary>
28 22 bool IsCancelled { get; }
29 23
24 /// <summary>
25 /// Creates a new promise dependend on the current one and resolved on
26 /// executing the specified handlers.
27 /// </summary>
28 /// <param name="success">The handler called on the successful promise completion.</param>
29 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
30 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
31 /// <returns>The newly created dependant promise.</returns>
32 /// <remarks>
33 /// <para>
34 /// If the success handler is specified the dependend promise will be resolved after the handler is
35 /// executed and the dependent promise will be linked to the current one, i.e. the cancellation
36 /// of the dependent property will lead to the cancellation of the current promise. If the
37 /// success handler isn't specified the dependent promise will not be linked to and
38 /// will not be resolved after the successfull resolution of the current one.
39 /// </para>
40 /// <para>
41 /// When the error handler is specified, the exception raised during the current promise completion
42 /// will be passed to it as the parameter. If the error handler returns without raising an
43 /// exception then the dependant promise will be resolved successfully, otherwise the exception
44 /// raised by the handler will be transmitted to the dependent promise. If the handler wants
45 /// to passthrough the original exception it needs to wrap the exception with
46 /// the <see cref="PromiseTransientException"/>.
47 /// </para>
48 /// <para>
49 /// If the cancelation handler is specified and the current promise is cancelled then the dependent
50 /// promise will be resolved after the handler is executed. If the cancelation hendler raises the
51 /// exception it will be passed to the dependent promise.
52 /// </para>
53 /// </remarks>
30 54 IPromise Then(Action success, Action<Exception> error, Action cancel);
31 55 IPromise Then(Action success, Action<Exception> error);
32 56 IPromise Then(Action success);
33 57
34 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Action cancel);
58 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel);
35 59 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error);
36 60 IPromise Chain(Func<IPromise> chained);
37 61
38 62 /// <summary>
39 /// Добавляет последнй обработчик в цепочку обещаний, не создает промежуточных обещаний.
63 /// Adds specified listeners to the current promise.
40 64 /// </summary>
41 /// <param name="success">Success.</param>
42 /// <param name="error">Error.</param>
43 /// <param name="cancel">Cancel.</param>
44 void On(Action success, Action<Exception> error, Action cancel);
45 void On(Action success, Action<Exception> error);
46 void On(Action success);
47 void On(Action success, PromiseEventType events);
65 /// <param name="success">The handler called on the successful promise completion.</param>
66 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
67 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
68 /// <returns>The current promise.</returns>
69 IPromise On(Action success, Action<Exception> error, Action cancel);
70 IPromise On(Action success, Action<Exception> error);
71 IPromise On(Action success);
48 72
49 IPromise Error(Action<Exception> error);
50 73 /// <summary>
51 /// Обрабатывает либо ошибку, либо результат, либо отмену.
74 /// Adds specified listeners to the current promise.
75 /// </summary>
76 /// <param name="handler">The handler called on the specified events.</param>
77 /// <param name = "events">The combination of flags denoting the events for which the
78 /// handler shoud be called.</param>
79 /// <returns>The current promise.</returns>
80 IPromise On(Action handler, PromiseEventType events);
81
82 /// <summary>
83 /// Adds the specified error handler to the current promise
84 /// and creates the new dependant promise.
52 85 /// </summary>
53 /// <param name="handler">Обработчик.</param>
54 /// <remarks>После обработке ошибки, она передается дальше.</remarks>
86 /// <param name="error">
87 /// The error handler. If the error handler returns without
88 /// an error the dependant promise will be successfully resolved.
89 /// </param>
90 /// <returns>
91 /// The new dependant promise which will be resolved after the error
92 /// handler is executed.
93 /// </returns>
94 /// <remarks>
95 /// The successfull result of the current promise will be ignored.
96 /// </remarks>
97 IPromise Error(Action<Exception> error);
98
55 99 /// <summary>
56 /// Обрабатывает либо ошибку, либо результат, либо отмену обещания.
100 /// Adds the specified cncellation handler to the current promise
101 /// and creates the new dependant promise.
57 102 /// </summary>
58 /// <param name="handler">Обработчик.</param>
59 /// <remarks>После обработке ошибки, она передается дальше.</remarks>
60 IPromise Anyway(Action handler);
61 /// <summary>
62 /// Обработчик для регистрации отмены обещания.
63 /// </summary>
64 /// <returns>Новое обещание, связанное с текущим, выполнится после указанного обработчика.</returns>
65 /// <param name="handler">Обработчик события.</param>
66 /// <remarks>Если обработчик вызывает исключение, то оно передается обработчику ошибки, результат работы
67 /// которого будет передан связанному обещанию</remarks>
103 /// <returns>
104 /// The new dependant promise which will be resolved after the cancellation
105 /// handler is executed.
106 /// </returns>
107 /// <param name="handler">
108 /// The cancellation handler.
109 /// </param>
110 /// <remarks>
111 /// If the cancellation handler is executed without an error the dependent
112 /// promise will be successfully resolved, otherwise the raised exception
113 /// will be passed to the dependant promise. The successful result of the
114 /// current promise will be ignored.
115 /// </remarks>
68 116 IPromise Cancelled(Action handler);
69 117
70 118 /// <summary>
71 119 /// Преобразует результат обещания к заданному типу и возвращает новое обещание.
72 120 /// </summary>
73 121 IPromise<T> Cast<T>();
74 122
75 123 /// <summary>
76 124 /// Синхронизирует текущий поток с обещанием.
77 125 /// </summary>
78 126 void Join();
79 127 /// <summary>
80 128 /// Синхронизирует текущий поток с обещанием.
81 129 /// </summary>
82 130 /// <param name="timeout">Время ожидания, по его истечению возникнет исключение.</param>
83 131 /// <exception cref="TimeoutException">Превышено время ожидания.</exception>
84 132 void Join(int timeout);
85 133
86 134 }
87 135 }
@@ -1,34 +1,34
1 1 using System;
2 2
3 3 namespace Implab {
4 public interface IPromise<T> : IPromise {
4 public interface IPromise<out T> : IPromise {
5 5
6 6 new T Join();
7 7
8 8 new T Join(int timeout);
9 9
10 void On(Action<T> success, Action<Exception> error, Action cancel);
10 IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel);
11 11
12 void On(Action<T> success, Action<Exception> error);
12 IPromise<T> On(Action<T> success, Action<Exception> error);
13 13
14 void On(Action<T> success);
14 IPromise<T> On(Action<T> success);
15 15
16 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error, Action cancel);
16 new IPromise<T> On(Action handler, PromiseEventType events);
17
18 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error, Func<T2> cancel);
17 19
18 20 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error);
19 21
20 22 IPromise<T2> Then<T2>(Func<T, T2> mapper);
21 23
22 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error, Action cancel);
24 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error, Func<IPromise<T2>> cancel);
23 25
24 26 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error);
25 27
26 28 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained);
27 29
28 IPromise<T> Error(Func<Exception,T> error);
30 IPromise<T2> Error<T2>(Func<Exception,T2> error);
29 31
30 new IPromise<T> Cancelled(Action handler);
31
32 new IPromise<T> Anyway(Action handler);
32 IPromise<T2> Cancelled<T2>(Func<T2> handler);
33 33 }
34 34 }
@@ -1,225 +1,232
1 1 <?xml version="1.0" encoding="utf-8"?>
2 2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <PropertyGroup>
4 4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 6 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
7 7 <OutputType>Library</OutputType>
8 8 <RootNamespace>Implab</RootNamespace>
9 9 <AssemblyName>Implab</AssemblyName>
10 <ProductVersion>8.0.30703</ProductVersion>
11 <SchemaVersion>2.0</SchemaVersion>
10 12 </PropertyGroup>
11 13 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
12 14 <DebugSymbols>true</DebugSymbols>
13 15 <DebugType>full</DebugType>
14 16 <Optimize>false</Optimize>
15 17 <OutputPath>bin\Debug</OutputPath>
16 18 <DefineConstants>TRACE;DEBUG;</DefineConstants>
17 19 <ErrorReport>prompt</ErrorReport>
18 20 <WarningLevel>4</WarningLevel>
19 21 <ConsolePause>false</ConsolePause>
20 22 <RunCodeAnalysis>true</RunCodeAnalysis>
21 23 </PropertyGroup>
22 24 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
23 25 <DebugType>full</DebugType>
24 26 <Optimize>true</Optimize>
25 27 <OutputPath>bin\Release</OutputPath>
26 28 <ErrorReport>prompt</ErrorReport>
27 29 <WarningLevel>4</WarningLevel>
28 30 <ConsolePause>false</ConsolePause>
29 31 </PropertyGroup>
30 32 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug 4.5|AnyCPU' ">
31 33 <DebugSymbols>true</DebugSymbols>
32 34 <DebugType>full</DebugType>
33 35 <Optimize>false</Optimize>
34 36 <OutputPath>bin\Debug</OutputPath>
35 37 <DefineConstants>TRACE;DEBUG;NET_4_5</DefineConstants>
36 38 <ErrorReport>prompt</ErrorReport>
37 39 <WarningLevel>4</WarningLevel>
38 40 <RunCodeAnalysis>true</RunCodeAnalysis>
39 41 <ConsolePause>false</ConsolePause>
40 42 </PropertyGroup>
41 43 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release 4.5|AnyCPU' ">
42 44 <Optimize>true</Optimize>
43 45 <OutputPath>bin\Release</OutputPath>
44 46 <ErrorReport>prompt</ErrorReport>
45 47 <WarningLevel>4</WarningLevel>
46 48 <ConsolePause>false</ConsolePause>
47 49 <DefineConstants>NET_4_5</DefineConstants>
48 50 </PropertyGroup>
49 51 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'DebugMono|AnyCPU' ">
50 52 <DebugSymbols>true</DebugSymbols>
51 53 <DebugType>full</DebugType>
52 54 <Optimize>false</Optimize>
53 55 <OutputPath>bin\Debug</OutputPath>
54 56 <DefineConstants>TRACE;DEBUG;NET_4_5;MONO</DefineConstants>
55 57 <ErrorReport>prompt</ErrorReport>
56 58 <WarningLevel>4</WarningLevel>
57 59 <RunCodeAnalysis>true</RunCodeAnalysis>
58 60 <ConsolePause>false</ConsolePause>
59 61 </PropertyGroup>
60 62 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'ReleaseMono|AnyCPU' ">
61 63 <Optimize>true</Optimize>
62 64 <OutputPath>bin\Release</OutputPath>
63 65 <DefineConstants>NET_4_5;MONO;</DefineConstants>
64 66 <ErrorReport>prompt</ErrorReport>
65 67 <WarningLevel>4</WarningLevel>
66 68 <ConsolePause>false</ConsolePause>
67 69 </PropertyGroup>
68 70 <ItemGroup>
69 71 <Reference Include="System" />
70 72 <Reference Include="System.Xml" />
71 73 </ItemGroup>
72 74 <ItemGroup>
73 75 <Compile Include="Component.cs" />
74 76 <Compile Include="CustomEqualityComparer.cs" />
75 77 <Compile Include="Diagnostics\ConsoleTraceListener.cs" />
76 78 <Compile Include="Diagnostics\EventText.cs" />
77 79 <Compile Include="Diagnostics\IEventTextFormatter.cs" />
78 80 <Compile Include="Diagnostics\LogChannel.cs" />
79 81 <Compile Include="Diagnostics\LogicalOperation.cs" />
80 82 <Compile Include="Diagnostics\TextFileListener.cs" />
81 83 <Compile Include="Diagnostics\TextListenerBase.cs" />
82 84 <Compile Include="Diagnostics\TraceLog.cs" />
83 85 <Compile Include="Diagnostics\TraceEvent.cs" />
84 86 <Compile Include="Diagnostics\TraceEventType.cs" />
85 87 <Compile Include="Disposable.cs" />
86 88 <Compile Include="ICancellable.cs" />
87 89 <Compile Include="IProgressHandler.cs" />
88 90 <Compile Include="IProgressNotifier.cs" />
89 91 <Compile Include="IPromiseT.cs" />
90 92 <Compile Include="IPromise.cs" />
91 93 <Compile Include="IServiceLocator.cs" />
92 94 <Compile Include="ITaskController.cs" />
93 95 <Compile Include="JSON\JSONElementContext.cs" />
94 96 <Compile Include="JSON\JSONElementType.cs" />
95 97 <Compile Include="JSON\JSONGrammar.cs" />
96 98 <Compile Include="JSON\JSONParser.cs" />
97 99 <Compile Include="JSON\JSONScanner.cs" />
98 100 <Compile Include="JSON\JsonTokenType.cs" />
99 101 <Compile Include="JSON\JSONWriter.cs" />
100 102 <Compile Include="JSON\JSONXmlReader.cs" />
101 103 <Compile Include="JSON\JSONXmlReaderOptions.cs" />
102 104 <Compile Include="JSON\StringTranslator.cs" />
103 105 <Compile Include="Parallels\DispatchPool.cs" />
104 106 <Compile Include="Parallels\ArrayTraits.cs" />
105 107 <Compile Include="Parallels\MTQueue.cs" />
106 108 <Compile Include="Parallels\WorkerPool.cs" />
107 109 <Compile Include="Parsing\Alphabet.cs" />
108 110 <Compile Include="Parsing\AlphabetBase.cs" />
109 111 <Compile Include="Parsing\AltToken.cs" />
110 112 <Compile Include="Parsing\BinaryToken.cs" />
111 113 <Compile Include="Parsing\CatToken.cs" />
112 114 <Compile Include="Parsing\CDFADefinition.cs" />
113 115 <Compile Include="Parsing\DFABuilder.cs" />
114 116 <Compile Include="Parsing\DFADefinitionBase.cs" />
115 117 <Compile Include="Parsing\DFAStateDescriptor.cs" />
116 118 <Compile Include="Parsing\DFAutomaton.cs" />
117 119 <Compile Include="Parsing\EDFADefinition.cs" />
118 120 <Compile Include="Parsing\EmptyToken.cs" />
119 121 <Compile Include="Parsing\EndToken.cs" />
120 122 <Compile Include="Parsing\EnumAlphabet.cs" />
121 123 <Compile Include="Parsing\Grammar.cs" />
122 124 <Compile Include="Parsing\IAlphabet.cs" />
123 125 <Compile Include="Parsing\IDFADefinition.cs" />
124 126 <Compile Include="Parsing\IVisitor.cs" />
125 127 <Compile Include="Parsing\ParserException.cs" />
126 128 <Compile Include="Parsing\Scanner.cs" />
127 129 <Compile Include="Parsing\StarToken.cs" />
128 130 <Compile Include="Parsing\SymbolToken.cs" />
129 131 <Compile Include="Parsing\Token.cs" />
130 132 <Compile Include="ServiceLocator.cs" />
131 133 <Compile Include="TaskController.cs" />
132 134 <Compile Include="ProgressInitEventArgs.cs" />
133 135 <Compile Include="Properties\AssemblyInfo.cs" />
134 <Compile Include="Promise.cs" />
135 136 <Compile Include="Parallels\AsyncPool.cs" />
136 137 <Compile Include="Safe.cs" />
137 138 <Compile Include="ValueEventArgs.cs" />
138 139 <Compile Include="PromiseExtensions.cs" />
139 <Compile Include="TransientPromiseException.cs" />
140 140 <Compile Include="SyncContextPromise.cs" />
141 141 <Compile Include="Diagnostics\OperationContext.cs" />
142 142 <Compile Include="Diagnostics\TraceContext.cs" />
143 143 <Compile Include="Diagnostics\LogEventArgs.cs" />
144 144 <Compile Include="Diagnostics\LogEventArgsT.cs" />
145 145 <Compile Include="Diagnostics\Extensions.cs" />
146 146 <Compile Include="IComponentContainer.cs" />
147 147 <Compile Include="PromiseEventType.cs" />
148 148 <Compile Include="Parallels\MTCustomQueue.cs" />
149 149 <Compile Include="Parallels\MTCustomQueueNode.cs" />
150 150 <Compile Include="ComponentContainer.cs" />
151 151 <Compile Include="DisposablePool.cs" />
152 152 <Compile Include="ObjectPool.cs" />
153 <Compile Include="Parallels\AsyncQueue.cs" />
154 <Compile Include="PromiseT.cs" />
155 <Compile Include="IDeferred.cs" />
156 <Compile Include="IDeferredT.cs" />
157 <Compile Include="AbstractPromise.cs" />
158 <Compile Include="Promise.cs" />
159 <Compile Include="PromiseTransientException.cs" />
153 160 </ItemGroup>
154 161 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
155 162 <ItemGroup />
156 163 <ProjectExtensions>
157 164 <MonoDevelop>
158 165 <Properties>
159 166 <Policies>
160 167 <CSharpFormattingPolicy IndentSwitchBody="True" NamespaceBraceStyle="EndOfLine" ClassBraceStyle="EndOfLine" InterfaceBraceStyle="EndOfLine" StructBraceStyle="EndOfLine" EnumBraceStyle="EndOfLine" MethodBraceStyle="EndOfLine" ConstructorBraceStyle="EndOfLine" DestructorBraceStyle="EndOfLine" BeforeMethodDeclarationParentheses="False" BeforeMethodCallParentheses="False" BeforeConstructorDeclarationParentheses="False" NewLineBeforeConstructorInitializerColon="NewLine" NewLineAfterConstructorInitializerColon="SameLine" BeforeIndexerDeclarationBracket="False" BeforeDelegateDeclarationParentheses="False" NewParentheses="False" SpacesBeforeBrackets="False" inheritsSet="Mono" inheritsScope="text/x-csharp" scope="text/x-csharp" />
161 168 <TextStylePolicy FileWidth="120" EolMarker="Unix" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/x-csharp" />
162 169 <DotNetNamingPolicy DirectoryNamespaceAssociation="PrefixedHierarchical" ResourceNamePolicy="MSBuild" />
163 170 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="application/xml" />
164 171 <XmlFormattingPolicy inheritsSet="Mono" inheritsScope="application/xml" scope="application/xml" />
165 172 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/plain" />
166 173 <NameConventionPolicy>
167 174 <Rules>
168 175 <NamingRule Name="Namespaces" AffectedEntity="Namespace" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
169 176 <NamingRule Name="Types" AffectedEntity="Class, Struct, Enum, Delegate" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
170 177 <NamingRule Name="Interfaces" AffectedEntity="Interface" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
171 178 <RequiredPrefixes>
172 179 <String>I</String>
173 180 </RequiredPrefixes>
174 181 </NamingRule>
175 182 <NamingRule Name="Attributes" AffectedEntity="CustomAttributes" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
176 183 <RequiredSuffixes>
177 184 <String>Attribute</String>
178 185 </RequiredSuffixes>
179 186 </NamingRule>
180 187 <NamingRule Name="Event Arguments" AffectedEntity="CustomEventArgs" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
181 188 <RequiredSuffixes>
182 189 <String>EventArgs</String>
183 190 </RequiredSuffixes>
184 191 </NamingRule>
185 192 <NamingRule Name="Exceptions" AffectedEntity="CustomExceptions" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
186 193 <RequiredSuffixes>
187 194 <String>Exception</String>
188 195 </RequiredSuffixes>
189 196 </NamingRule>
190 197 <NamingRule Name="Methods" AffectedEntity="Methods" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
191 198 <NamingRule Name="Static Readonly Fields" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Protected, Public" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True" />
192 199 <NamingRule Name="Fields (Non Private)" AffectedEntity="Field" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
193 200 <NamingRule Name="ReadOnly Fields (Non Private)" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False" />
194 201 <NamingRule Name="Fields (Private)" AffectedEntity="Field, ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
195 202 <RequiredPrefixes>
196 203 <String>m_</String>
197 204 </RequiredPrefixes>
198 205 </NamingRule>
199 206 <NamingRule Name="Static Fields (Private)" AffectedEntity="Field" VisibilityMask="Private" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True">
200 207 <RequiredPrefixes>
201 208 <String>_</String>
202 209 </RequiredPrefixes>
203 210 </NamingRule>
204 211 <NamingRule Name="ReadOnly Fields (Private)" AffectedEntity="ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
205 212 <RequiredPrefixes>
206 213 <String>m_</String>
207 214 </RequiredPrefixes>
208 215 </NamingRule>
209 216 <NamingRule Name="Constant Fields" AffectedEntity="ConstantField" VisibilityMask="VisibilityMask" NamingStyle="AllUpper" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
210 217 <NamingRule Name="Properties" AffectedEntity="Property" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
211 218 <NamingRule Name="Events" AffectedEntity="Event" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
212 219 <NamingRule Name="Enum Members" AffectedEntity="EnumMember" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
213 220 <NamingRule Name="Parameters" AffectedEntity="Parameter, LocalVariable" VisibilityMask="VisibilityMask" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
214 221 <NamingRule Name="Type Parameters" AffectedEntity="TypeParameter" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
215 222 <RequiredPrefixes>
216 223 <String>T</String>
217 224 </RequiredPrefixes>
218 225 </NamingRule>
219 226 </Rules>
220 227 </NameConventionPolicy>
221 228 </Policies>
222 229 </Properties>
223 230 </MonoDevelop>
224 231 </ProjectExtensions>
225 232 </Project> No newline at end of file
@@ -1,255 +1,251
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.IO;
4 using System.Linq;
5 using System.Text;
6 using System.Threading.Tasks;
7 4
8 5 namespace Implab.JSON {
9 6 public class JSONWriter {
10 7 struct Context {
11 8 public bool needComma;
12 9 public JSONElementContext element;
13 10 }
14 11 Stack<Context> m_contextStack = new Stack<Context>();
15 12 Context m_context;
16 13
17 14 TextWriter m_writer;
18 15 readonly bool m_indent = true;
19 16 readonly int m_indentSize = 4;
20 17
21 18 static readonly char [] _escapeBKS,
22 19 _escapeFWD,
23 20 _escapeCR,
24 21 _escapeNL,
25 22 _escapeTAB,
26 _escapeSLASH,
27 23 _escapeBSLASH,
28 24 _escapeQ;
29 25
30 26 static JSONWriter() {
31 27 _escapeBKS = "\\b".ToCharArray();
32 28 _escapeFWD = "\\f".ToCharArray();
33 29 _escapeCR = "\\r".ToCharArray();
34 30 _escapeNL = "\\n".ToCharArray();
35 31 _escapeTAB = "\\t".ToCharArray();
36 32 _escapeBSLASH = "\\\\".ToCharArray();
37 _escapeSLASH = "\\/".ToCharArray();
38 33 _escapeQ = "\\\"".ToCharArray();
39 34 }
40 35
41 36 public JSONWriter(TextWriter writer) {
42 37 Safe.ArgumentNotNull(writer, "writer");
43 38
44 39 m_writer = writer;
45 40 }
46 41
47 42 public JSONWriter(TextWriter writer, bool indent) {
48 43 Safe.ArgumentNotNull(writer, "writer");
49 44
50 45 m_writer = writer;
51 46 m_indent = indent;
52 47 }
53 48
54 49 void WriteIndent() {
55 50 if (m_indent) {
56 51 var indent = new char[m_contextStack.Count * m_indentSize + 1];
57 52 indent[0] = '\n';
58 53 for (int i = 1; i < indent.Length; i++)
59 54 indent[i] = ' ';
60 55 m_writer.Write(new String(indent));
61 56 } else {
62 57 m_writer.Write(' ');
63 58 }
64 59 }
65 60
66 61 void WriteMemberName(string name) {
67 62 Safe.ArgumentNotEmpty(name, "name");
68 63 if (m_context.element != JSONElementContext.Object)
69 64 OperationNotApplicable("WriteMember");
70 65 if (m_context.needComma)
71 66 m_writer.Write(",");
72 67
73 68 WriteIndent();
74 69 m_context.needComma = true;
75 70 Write(name);
76 71 m_writer.Write(" : ");
77 72 }
78 73
79 74 public void WriteValue(string name, string value) {
80 75 WriteMemberName(name);
81 76 Write(value);
82 77 }
83 78
84 79 public void WriteValue(string name, bool value) {
85 80 WriteMemberName(name);
86 81 Write(value);
87 82 }
88 83
89 84 public void WriteValue(string name, double value) {
90 85 WriteMemberName(name);
91 86 Write(value);
92 87 }
93 88
94 89 public void WriteValue(string value) {
95 90 if (m_context.element != JSONElementContext.Array)
96 91 OperationNotApplicable("WriteValue");
97 92 if (m_context.needComma)
98 93 m_writer.Write(",");
99 94 WriteIndent();
100 95 m_context.needComma = true;
101 96
102 97 Write(value);
103 98 }
104 99
105 100 public void WriteValue(bool value) {
106 101 if (m_context.element != JSONElementContext.Array)
107 102 OperationNotApplicable("WriteValue");
108 103 if (m_context.needComma)
109 104 m_writer.Write(",");
110 105 m_context.needComma = true;
111 106
112 107 WriteIndent();
113 108 Write(value);
114 109 }
115 110
116 111 public void WriteValue(double value) {
117 112 if (m_context.element != JSONElementContext.Array)
118 113 OperationNotApplicable("WriteValue");
119 114 if (m_context.needComma)
120 115 m_writer.Write(",");
121 116 m_context.needComma = true;
122 117
123 118 WriteIndent();
124 119 Write(value);
125 120 }
126 121
127 122 public void BeginObject() {
128 123 if (m_context.element != JSONElementContext.None && m_context.element != JSONElementContext.Array)
129 124 OperationNotApplicable("BeginObject");
130 125 if (m_context.needComma)
131 126 m_writer.Write(",");
132 127
133 128 WriteIndent();
134 129
135 130 m_context.needComma = true;
136 131
137 132 m_contextStack.Push(m_context);
138 133
139 134 m_context = new Context { element = JSONElementContext.Object, needComma = false };
140 135 m_writer.Write("{");
141 136 }
142 137
143 138 public void BeginObject(string name) {
144 139 WriteMemberName(name);
145 140
146 141 m_contextStack.Push(m_context);
147 142
148 143 m_context = new Context { element = JSONElementContext.Object, needComma = false };
149 144 m_writer.Write("{");
150 145 }
151 146
152 147 public void EndObject() {
153 148 if (m_context.element != JSONElementContext.Object)
154 149 OperationNotApplicable("EndArray");
155 150
156 151 m_context = m_contextStack.Pop();
157 152 WriteIndent();
158 153 m_writer.Write("}");
159 154 }
160 155
161 156 public void BeginArray() {
162 157 if (m_context.element != JSONElementContext.None && m_context.element != JSONElementContext.Array)
163 158 throw new InvalidOperationException();
164 159 if (m_context.needComma) {
165 160 m_writer.Write(",");
166 161
167 162 }
168 163 m_context.needComma = true;
169 164
170 165 WriteIndent();
171 166 m_contextStack.Push(m_context);
172 167 m_context = new Context { element = JSONElementContext.Array, needComma = false };
173 168 m_writer.Write("[");
174 169 }
175 170
176 171 public void BeginArray(string name) {
177 172 WriteMemberName(name);
178 173
179 174 m_contextStack.Push(m_context);
180 175
181 176 m_context = new Context { element = JSONElementContext.Array, needComma = false };
182 177 m_writer.Write("[");
183 178 }
184 179
185 180 public void EndArray() {
186 181 if (m_context.element != JSONElementContext.Array)
187 182 OperationNotApplicable("EndArray");
188 183
189 184 m_context = m_contextStack.Pop();
190 185 WriteIndent();
191 186 m_writer.Write("]");
192 187 }
193 188
194 189 void Write(bool value) {
195 190 m_writer.Write(value ? "true" : "false");
196 191 }
197 192
198 193
199 194 void Write(string value) {
200 195 if (value == null) {
201 196 m_writer.Write("null");
202 197 return;
203 198 }
204 199
205 200 var chars = value.ToCharArray();
206 201 m_writer.Write('"');
207 202
203 // Analysis disable once ForCanBeConvertedToForeach
208 204 for (int i = 0; i < chars.Length; i++) {
209 205 var ch = chars[i];
210 206
211 207 switch (ch) {
212 208 case '\b':
213 209 m_writer.Write(_escapeBKS);
214 210 break;
215 211 case '\f':
216 212 m_writer.Write(_escapeFWD);
217 213 break;
218 214 case '\r':
219 215 m_writer.Write(_escapeCR);
220 216 break;
221 217 case '\n':
222 218 m_writer.Write(_escapeNL);
223 219 break;
224 220 case '\t':
225 221 m_writer.Write(_escapeTAB);
226 222 break;
227 223 case '\\':
228 224 m_writer.Write(_escapeBSLASH);
229 225 break;
230 226 case '"':
231 227 m_writer.Write(_escapeQ);
232 228 break;
233 229 default:
234 230 if (ch < 0x20) {
235 231 m_writer.Write("\\u00{0:x2}",(int)ch);
236 232 } else {
237 233 m_writer.Write(ch);
238 234 }
239 235 break;
240 236 }
241 237 }
242 238
243 239 m_writer.Write('"');
244 240 }
245 241
246 242 void Write(double value) {
247 243 m_writer.Write(value);
248 244 }
249 245
250 246 void OperationNotApplicable(string opName) {
251 247 throw new InvalidOperationException(String.Format("The operation '{0}' isn't applicable in the context of '{1}'", opName, m_context.element ));
252 248 }
253 249
254 250 }
255 251 }
@@ -1,60 +1,60
1 1 using Implab.Parallels;
2 2 using System;
3 3 using System.Threading;
4 4
5 5 namespace Implab {
6 6 /// <summary>
7 7 /// Базовый класс для создания пулов объектов.
8 8 /// </summary>
9 9 /// <remarks>
10 10 /// <para>Пул объектов позволяет многократно использовать один и тотже объект,
11 11 /// что актуально для объектов, создание которых требует существенных ресурсов.
12 12 /// Пул объектов использует слабые ссылки, чтобы не препятствовать освобождению
13 13 /// ресурсов и создает новые объекты при необходимости.</para>
14 14 /// <para>
15 15 /// Наследники должны реализовывать метод <see cref="CreateInstance()"/> для создания
16 16 /// новых экземпляров.
17 17 /// </para>
18 18 /// <para>Пул поддерживает обращения сразу из нескольких потоков.</para>
19 19 /// </remarks>
20 20 public abstract class ObjectPool<T> where T : class {
21 readonly MTQueue<WeakReference> m_queue = new MTQueue<WeakReference>();
21 readonly AsyncQueue<WeakReference> m_queue = new AsyncQueue<WeakReference>();
22 22 readonly int m_size;
23 23 int m_count = 0;
24 24
25 25 protected ObjectPool() : this(Environment.ProcessorCount+1) {
26 26
27 27 }
28 28
29 29 protected ObjectPool(int size) {
30 30 Safe.ArgumentInRange(size,1,size,"size");
31 31
32 32 m_size = size;
33 33 }
34 34
35 35 protected abstract T CreateInstance();
36 36
37 37 protected virtual void CleanupInstance(T instance) {
38 38 }
39 39
40 40 public T Allocate() {
41 41 WeakReference reference;
42 42 while (m_queue.TryDequeue(out reference)) {
43 43 Interlocked.Decrement(ref m_count);
44 44 object instance = reference.Target;
45 45 if (instance == null)
46 46 continue;
47 47 return (T)instance;
48 48 }
49 49 return CreateInstance();
50 50 }
51 51
52 52 public void Release(T instance) {
53 53 if (m_count < m_size && instance != null) {
54 54 Interlocked.Increment(ref m_count);
55 55 CleanupInstance(instance);
56 56 m_queue.Enqueue(new WeakReference(instance));
57 57 }
58 58 }
59 59 }
60 60 }
@@ -1,207 +1,207
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Diagnostics;
4 4 using System.Threading;
5 5
6 6 namespace Implab.Parallels {
7 7 public static class ArrayTraits {
8 8 class ArrayIterator<TSrc> : DispatchPool<int> {
9 9 readonly Action<TSrc> m_action;
10 10 readonly TSrc[] m_source;
11 11 readonly Promise<int> m_promise = new Promise<int>();
12 12 readonly LogicalOperation m_logicalOperation;
13 13
14 14 int m_pending;
15 15 int m_next;
16 16
17 17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
18 18 : base(threads) {
19 19
20 20 Debug.Assert(source != null);
21 21 Debug.Assert(action != null);
22 22
23 23 m_logicalOperation = TraceContext.Instance.CurrentOperation;
24 24 m_next = 0;
25 25 m_source = source;
26 26 m_pending = source.Length;
27 27 m_action = action;
28 28
29 m_promise.Anyway(Dispose);
29 m_promise.On(Dispose, PromiseEventType.All);
30 30
31 31 InitPool();
32 32 }
33 33
34 34 public Promise<int> Promise {
35 35 get {
36 36 return m_promise;
37 37 }
38 38 }
39 39
40 40 protected override void Worker() {
41 41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
42 42 try {
43 43 base.Worker();
44 44 } finally {
45 45 TraceContext.Instance.Leave();
46 46 }
47 47 }
48 48
49 49 protected override bool TryDequeue(out int unit) {
50 50 unit = Interlocked.Increment(ref m_next) - 1;
51 51 return unit < m_source.Length;
52 52 }
53 53
54 54 protected override void InvokeUnit(int unit) {
55 55 try {
56 56 m_action(m_source[unit]);
57 57 var pending = Interlocked.Decrement(ref m_pending);
58 58 if (pending == 0)
59 59 m_promise.Resolve(m_source.Length);
60 60 } catch (Exception e) {
61 61 m_promise.Reject(e);
62 62 }
63 63 }
64 64 }
65 65
66 66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
67 67 readonly Func<TSrc, TDst> m_transform;
68 68 readonly TSrc[] m_source;
69 69 readonly TDst[] m_dest;
70 70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
71 71 readonly LogicalOperation m_logicalOperation;
72 72
73 73 int m_pending;
74 74 int m_next;
75 75
76 76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
77 77 : base(threads) {
78 78
79 79 Debug.Assert (source != null);
80 80 Debug.Assert( transform != null);
81 81
82 82 m_next = 0;
83 83 m_source = source;
84 84 m_dest = new TDst[source.Length];
85 85 m_pending = source.Length;
86 86 m_transform = transform;
87 87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
88 88
89 m_promise.Anyway(Dispose);
89 m_promise.On(Dispose, PromiseEventType.All);
90 90
91 91 InitPool();
92 92 }
93 93
94 94 public Promise<TDst[]> Promise {
95 95 get {
96 96 return m_promise;
97 97 }
98 98 }
99 99
100 100 protected override void Worker() {
101 101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
102 102 try {
103 103 base.Worker();
104 104 } finally {
105 105 TraceContext.Instance.Leave();
106 106 }
107 107 }
108 108
109 109 protected override bool TryDequeue(out int unit) {
110 110 unit = Interlocked.Increment(ref m_next) - 1;
111 111 return unit < m_source.Length;
112 112 }
113 113
114 114 protected override void InvokeUnit(int unit) {
115 115 try {
116 116 m_dest[unit] = m_transform(m_source[unit]);
117 117 var pending = Interlocked.Decrement(ref m_pending);
118 118 if (pending == 0)
119 119 m_promise.Resolve(m_dest);
120 120 } catch (Exception e) {
121 121 m_promise.Reject(e);
122 122 }
123 123 }
124 124 }
125 125
126 126 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
127 127 if (source == null)
128 128 throw new ArgumentNullException("source");
129 129 if (transform == null)
130 130 throw new ArgumentNullException("transform");
131 131
132 132 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
133 133 return mapper.Promise;
134 134 }
135 135
136 136 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
137 137 if (source == null)
138 138 throw new ArgumentNullException("source");
139 139 if (action == null)
140 140 throw new ArgumentNullException("action");
141 141
142 142 var iter = new ArrayIterator<TSrc>(source, action, threads);
143 143 return iter.Promise;
144 144 }
145 145
146 146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
147 147 if (source == null)
148 148 throw new ArgumentNullException("source");
149 149 if (transform == null)
150 150 throw new ArgumentNullException("transform");
151 151 if (threads <= 0)
152 152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
153 153
154 154 if (source.Length == 0)
155 155 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
156 156
157 157 var promise = new Promise<TDst[]>();
158 158 var res = new TDst[source.Length];
159 159 var pending = source.Length;
160 160
161 161 object locker = new object();
162 162 int slots = threads;
163 163
164 164 // Analysis disable AccessToDisposedClosure
165 AsyncPool.InvokeNewThread(() => {
165 AsyncPool.InvokeNewThread<int>(() => {
166 166 for (int i = 0; i < source.Length; i++) {
167 167 if(promise.IsResolved)
168 168 break; // stop processing in case of error or cancellation
169 169 var idx = i;
170 170
171 171 if (Interlocked.Decrement(ref slots) < 0) {
172 172 lock(locker) {
173 173 while(slots < 0)
174 174 Monitor.Wait(locker);
175 175 }
176 176 }
177 177
178 178 try {
179 179 transform(source[i])
180 .Anyway(() => {
180 .On( x => {
181 181 Interlocked.Increment(ref slots);
182 182 lock (locker) {
183 183 Monitor.Pulse(locker);
184 184 }
185 185 })
186 186 .On(
187 187 x => {
188 188 res[idx] = x;
189 189 var left = Interlocked.Decrement(ref pending);
190 190 if (left == 0)
191 191 promise.Resolve(res);
192 192 },
193 193 promise.Reject
194 194 );
195 195
196 196 } catch (Exception e) {
197 197 promise.Reject(e);
198 198 }
199 199 }
200 200 return 0;
201 201 });
202 202
203 203 return promise;
204 204 }
205 205
206 206 }
207 207 }
@@ -1,77 +1,77
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Threading;
4 4
5 5 namespace Implab.Parallels {
6 6 /// <summary>
7 7 /// Класс для распаралеливания задач.
8 8 /// </summary>
9 9 /// <remarks>
10 10 /// Используя данный класс и лямда выражения можно распараллелить
11 11 /// вычисления, для этого используется концепция обещаний.
12 12 /// </remarks>
13 13 public static class AsyncPool {
14 14
15 15 public static IPromise<T> Invoke<T>(Func<T> func) {
16 16 var p = new Promise<T>();
17 17 var caller = TraceContext.Instance.CurrentOperation;
18 18
19 19 ThreadPool.QueueUserWorkItem(param => {
20 20 TraceContext.Instance.EnterLogicalOperation(caller,false);
21 21 try {
22 22 p.Resolve(func());
23 23 } catch(Exception e) {
24 24 p.Reject(e);
25 25 } finally {
26 26 TraceContext.Instance.Leave();
27 27 }
28 28 });
29 29
30 30 return p;
31 31 }
32 32
33 33 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
34 34 var p = new Promise<T>();
35 35
36 36 var caller = TraceContext.Instance.CurrentOperation;
37 37
38 38 var worker = new Thread(() => {
39 39 TraceContext.Instance.EnterLogicalOperation(caller,false);
40 40 try {
41 41 p.Resolve(func());
42 42 } catch (Exception e) {
43 43 p.Reject(e);
44 44 } finally {
45 45 TraceContext.Instance.Leave();
46 46 }
47 47 });
48 48 worker.IsBackground = true;
49 49 worker.Start();
50 50
51 51 return p;
52 52 }
53 53
54 54
55 55 public static IPromise InvokeNewThread(Action func) {
56 var p = new Promise<object>();
56 var p = new Promise();
57 57
58 58 var caller = TraceContext.Instance.CurrentOperation;
59 59
60 60 var worker = new Thread(() => {
61 61 TraceContext.Instance.EnterLogicalOperation(caller,false);
62 62 try {
63 63 func();
64 64 p.Resolve();
65 65 } catch (Exception e) {
66 66 p.Reject(e);
67 67 } finally {
68 68 TraceContext.Instance.Leave();
69 69 }
70 70 });
71 71 worker.IsBackground = true;
72 72 worker.Start();
73 73
74 74 return p;
75 75 }
76 76 }
77 77 }
@@ -1,82 +1,82
1 1 using System;
2 2 using System.Threading;
3 3 using System.Diagnostics;
4 4 using Implab.Diagnostics;
5 5
6 6 namespace Implab.Parallels {
7 7 public class WorkerPool : DispatchPool<Action> {
8 8
9 MTQueue<Action> m_queue = new MTQueue<Action>();
9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
10 10 int m_queueLength = 0;
11 11 readonly int m_threshold = 1;
12 12
13 13 public WorkerPool(int minThreads, int maxThreads, int threshold)
14 14 : base(minThreads, maxThreads) {
15 15 m_threshold = threshold;
16 16 InitPool();
17 17 }
18 18
19 19 public WorkerPool(int minThreads, int maxThreads) :
20 20 base(minThreads, maxThreads) {
21 21 InitPool();
22 22 }
23 23
24 24 public WorkerPool(int threads)
25 25 : base(threads) {
26 26 InitPool();
27 27 }
28 28
29 29 public WorkerPool() {
30 30 InitPool();
31 31 }
32 32
33 33 public Promise<T> Invoke<T>(Func<T> task) {
34 34 if (task == null)
35 35 throw new ArgumentNullException("task");
36 36 if (IsDisposed)
37 37 throw new ObjectDisposedException(ToString());
38 38
39 39 var promise = new Promise<T>();
40 40
41 41 var lop = TraceContext.Instance.CurrentOperation;
42 42
43 43 EnqueueTask(delegate() {
44 44 TraceContext.Instance.EnterLogicalOperation(lop, false);
45 45 try {
46 46 promise.Resolve(task());
47 47 } catch (Exception e) {
48 48 promise.Reject(e);
49 49 } finally {
50 50 TraceContext.Instance.Leave();
51 51 }
52 52 });
53 53
54 54 return promise;
55 55 }
56 56
57 57 protected void EnqueueTask(Action unit) {
58 58 Debug.Assert(unit != null);
59 59 var len = Interlocked.Increment(ref m_queueLength);
60 60 m_queue.Enqueue(unit);
61 61
62 62 if (len > m_threshold * PoolSize) {
63 63 StartWorker();
64 64 }
65 65
66 66 SignalThread();
67 67 }
68 68
69 69 protected override bool TryDequeue(out Action unit) {
70 70 if (m_queue.TryDequeue(out unit)) {
71 71 Interlocked.Decrement(ref m_queueLength);
72 72 return true;
73 73 }
74 74 return false;
75 75 }
76 76
77 77 protected override void InvokeUnit(Action unit) {
78 78 unit();
79 79 }
80 80
81 81 }
82 82 }
This diff has been collapsed as it changes many lines, (1212 lines changed) Show them Hide them
@@ -1,954 +1,258
1 using System;
2 using System.Collections.Generic;
3 using System.Reflection;
4 using System.Threading;
5 using Implab.Parallels;
6
7 namespace Implab {
8
9 /// <summary>
10 /// Класс для асинхронного получения результатов. Так называемое "обещание".
11 /// </summary>
12 /// <typeparam name="T">Тип получаемого результата</typeparam>
13 /// <remarks>
14 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
15 /// клиент получив такое обещание может установить ряд обратных вызово для получения
16 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
17 /// <para>
18 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
19 /// данные события клиент должен использовать методы <c>Then</c>.
20 /// </para>
21 /// <para>
22 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
23 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
24 /// выполнении обещания.
25 /// </para>
26 /// <para>
27 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
28 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
29 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
30 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
31 /// обещания.
32 /// </para>
33 /// <para>
34 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
35 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
36 /// использовать соответствующую форму методе <c>Then</c>.
37 /// </para>
38 /// <para>
39 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
40 /// только инициатор обещания иначе могут возникнуть противоречия.
41 /// </para>
42 /// </remarks>
43 public class Promise<T> : IPromise<T> {
44
45 protected abstract class AbstractHandler : MTCustomQueueNode<AbstractHandler> {
46 public abstract void Resolve(T result);
47 public abstract void Reject(Exception error);
48 public abstract void Cancel();
49 }
50
51 protected class RemapDescriptor<T2> : AbstractHandler {
52
53 readonly Func<T,T2> m_resultHandler;
54 readonly Func<Exception,T2> m_errorHandler;
55 readonly Action m_cancellHandler;
56 readonly Promise<T2> m_medium;
57
58 public RemapDescriptor(Func<T,T2> resultHandler, Func<Exception,T2> errorHandler, Action cancelHandler, Promise<T2> medium) {
59 m_resultHandler = resultHandler;
60 m_errorHandler = errorHandler;
61 m_cancellHandler = cancelHandler;
62 m_medium = medium;
63 }
64
65 public override void Resolve(T result) {
66 if (m_resultHandler != null) {
67 try {
68 if (m_medium != null)
69 m_medium.Resolve(m_resultHandler(result));
70 else
71 m_resultHandler(result);
72 } catch (Exception e) {
73 Reject(e);
74 }
75 } else if(m_medium != null)
76 m_medium.Resolve(default(T2));
77 }
78
79 public override void Reject(Exception error) {
80 if (m_errorHandler != null) {
81 try {
82 var res = m_errorHandler(error);
83 if (m_medium != null)
84 m_medium.Resolve(res);
85 } catch (Exception err2) {
86 if (m_medium != null)
87 m_medium.Reject(err2);
88 }
89 } else if (m_medium != null)
90 m_medium.Reject(error);
91 }
92
93 public override void Cancel() {
94 if (m_cancellHandler != null) {
95 try {
96 m_cancellHandler();
97 } catch (Exception err) {
98 Reject(err);
99 return;
100 }
101 }
102 if (m_medium != null)
103 m_medium.Cancel();
104 }
105 }
106
107 protected class HandlerDescriptor : AbstractHandler {
108
109 readonly Action<T> m_resultHandler;
110 readonly Action<Exception> m_errorHandler;
111 readonly Action m_cancellHandler;
112 readonly Promise<T> m_medium;
113
114 public HandlerDescriptor(Action<T> resultHandler, Action<Exception> errorHandler, Action cancelHandler, Promise<T> medium) {
115 m_resultHandler = resultHandler;
116 m_errorHandler = errorHandler;
117 m_cancellHandler = cancelHandler;
118 m_medium = medium;
119 }
120
121 public override void Resolve(T result) {
122 if (m_resultHandler != null) {
123 try {
124 m_resultHandler(result);
125 } catch (Exception e) {
126 Reject(e);
127 return;
128 }
129 }
130 if(m_medium != null)
131 m_medium.Resolve(result);
132 }
133
134 public override void Reject(Exception error) {
135 if (m_errorHandler != null) {
136 try {
137 m_errorHandler(error);
138 if (m_medium != null)
139 m_medium.Resolve(default(T));
140 } catch (Exception err2) {
141 if (m_medium != null)
142 m_medium.Reject(err2);
143 }
144 } else if (m_medium != null)
145 m_medium.Reject(error);
146 }
147
148 public override void Cancel() {
149 if (m_cancellHandler != null) {
150 try {
151 m_cancellHandler();
152 } catch (Exception err) {
153 Reject(err);
154 return;
155 }
156 }
157 if (m_medium != null)
158 m_medium.Cancel();
159 }
160 }
161
162 const int UNRESOLVED_SATE = 0;
163 const int TRANSITIONAL_STATE = 1;
164 const int SUCCEEDED_STATE = 2;
165 const int REJECTED_STATE = 3;
166 const int CANCELLED_STATE = 4;
167
168 int m_childrenCount;
169 int m_state;
170 T m_result;
171 Exception m_error;
172
173 readonly MTCustomQueue<AbstractHandler> m_handlers = new MTCustomQueue<AbstractHandler>();
174 //readonly MTQueue<AbstractHandler> m_handlers = new MTQueue<AbstractHandler>();
175
176 public Promise() {
177 }
178
179 public Promise(IPromise parent) {
180 if (parent != null)
181 AddHandler(
182 null,
183 null,
184 () => {
185 if (parent.IsExclusive)
186 parent.Cancel();
187 },
188 null,
189 false
190 );
191 }
192
193 bool BeginTransit() {
194 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
195 }
196
197 void CompleteTransit(int state) {
198 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
199 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
200 }
201
202 void WaitTransition() {
203 while (m_state == TRANSITIONAL_STATE) {
204 Thread.MemoryBarrier();
205 }
206 }
207
208 public bool IsResolved {
209 get {
210 Thread.MemoryBarrier();
211 return m_state > 1;
212 }
213 }
214
215 public bool IsCancelled {
216 get {
217 Thread.MemoryBarrier();
218 return m_state == CANCELLED_STATE;
219 }
220 }
221
222 public Type PromiseType {
223 get { return typeof(T); }
224 }
225
226 /// <summary>
227 /// Выполняет обещание, сообщая об успешном выполнении.
228 /// </summary>
229 /// <param name="result">Результат выполнения.</param>
230 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
231 public void Resolve(T result) {
232 if (BeginTransit()) {
233 m_result = result;
234 CompleteTransit(SUCCEEDED_STATE);
235 OnStateChanged();
236 } else {
237 WaitTransition();
238 if (m_state != CANCELLED_STATE)
239 throw new InvalidOperationException("The promise is already resolved");
240 }
241 }
242
243 /// <summary>
244 /// Выполняет обещание, сообщая об успешном выполнении. Результатом выполнения будет пустое значения.
245 /// </summary>
246 /// <remarks>
247 /// Данный вариант удобен в случаях, когда интересен факт выполнения операции, нежели полученное значение.
248 /// </remarks>
249 public void Resolve() {
250 Resolve(default(T));
251 }
252
253 /// <summary>
254 /// Выполняет обещание, сообщая об ошибке
255 /// </summary>
256 /// <remarks>
257 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
258 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
259 /// будут проигнорированы.
260 /// </remarks>
261 /// <param name="error">Исключение возникшее при выполнении операции</param>
262 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
263 public void Reject(Exception error) {
264 if (BeginTransit()) {
265 m_error = error is TransientPromiseException ? error.InnerException : error;
266 CompleteTransit(REJECTED_STATE);
267 OnStateChanged();
268 } else {
269 WaitTransition();
270 if (m_state == SUCCEEDED_STATE)
271 throw new InvalidOperationException("The promise is already resolved");
272 }
273 }
274
275 /// <summary>
276 /// Отменяет операцию, если это возможно.
277 /// </summary>
278 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
279 public void Cancel() {
280 if (BeginTransit()) {
281 CompleteTransit(CANCELLED_STATE);
282 OnStateChanged();
283 }
284 }
285
286 /// <summary>
287 /// Последний обработчик в цепочки обещаний.
288 /// </summary>
289 /// <param name="success"></param>
290 /// <param name="error"></param>
291 /// <param name="cancel"></param>
292 /// <remarks>
293 /// <para>
294 /// Данный метод не создает связанного с текущим обещания и предназначен для окончания
295 /// фсинхронной цепочки.
296 /// </para>
297 /// <para>
298 /// Если данный метод вызвать несколько раз, либо добавить другие обработчики, то цепочка
299 /// не будет одиночной <see cref="IsExclusive"/> и, как следствие, будет невозможна отмена
300 /// всей цепи обещаний снизу (с самого последнего обещания).
301 /// </para>
302 /// </remarks>
303 public void On(Action<T> success, Action<Exception> error, Action cancel) {
304 if (success == null && error == null && cancel == null)
305 return;
306
307 AddHandler(success, error, cancel, null, false);
308 }
309
310 public void On(Action<T> success, Action<Exception> error) {
311 AddHandler(success, error, null, null, false);
312 }
313
314 public void On(Action<T> success) {
315 AddHandler(success, null, null, null, false);
316 }
317
318 public void On(Action handler, PromiseEventType events) {
319 Safe.ArgumentNotNull(handler, "handler");
320
321
322 AddHandler(
323 events.HasFlag(PromiseEventType.Success) ? new Action<T>(x => handler()) : null,
324 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>( x => handler()) : null,
325 events.HasFlag(PromiseEventType.Cancelled) ? handler : null,
326 null,
327 false
328 );
329 }
330
331 public IPromise Error(Action<Exception> error) {
332 if (error == null)
333 return this;
334
335 var medium = new Promise<T>(this);
336
337 AddMappers(
338 x => x,
339 e => {
340 error(e);
341 return default(T);
342 },
343 null,
344 medium,
345 true
346 );
347
348 return medium;
349 }
350
351 /// <summary>
352 /// Handles error and allows to keep the promise.
353 /// </summary>
354 /// <remarks>
355 /// If the specified handler throws an exception, this exception will be used to reject the promise.
356 /// </remarks>
357 /// <param name="handler">The error handler which returns the result of the promise.</param>
358 /// <returns>New promise.</returns>
359 public IPromise<T> Error(Func<Exception,T> handler) {
360 if (handler == null)
361 return this;
362
363 var medium = new Promise<T>(this);
364
365 AddMappers(x => x, handler, null, medium, true);
366
367 return medium;
368 }
369
370 /// <summary>
371 /// Позволяет преобразовать результат выполения операции к новому типу.
372 /// </summary>
373 /// <typeparam name="TNew">Новый тип результата.</typeparam>
374 /// <param name="mapper">Преобразование результата к новому типу.</param>
375 /// <param name="error">Обработчик ошибки. Данный обработчик получит
376 /// исключение возникшее при выполнении операции.</param>
377 /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
378 /// <param name = "cancel"></param>
379 public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper, Func<Exception,TNew> error, Action cancel) {
380 Safe.ArgumentNotNull(mapper, "mapper");
381
382 // создаем прицепленное обещание
383 var medium = new Promise<TNew>(this);
384
385 AddMappers(
386 mapper,
387 error,
388 cancel,
389 medium,
390 true
391 );
392
393 return medium;
394 }
395
396 public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper, Func<Exception,TNew> error) {
397 return Then(mapper, error, null);
398 }
399
400 public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper) {
401 return Then(mapper, null, null);
402 }
403
404 /// <summary>
405 /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
406 /// выполнения текущей, а результат текущей операции может быть использован для инициализации
407 /// новой операции.
408 /// </summary>
409 /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
410 /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
411 /// <param name="error">Обработчик ошибки. Данный обработчик получит
412 /// исключение возникшее при выполнении текуещй операции.</param>
413 /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
414 /// <param name = "cancel"></param>
415 public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained, Func<Exception,IPromise<TNew>> error, Action cancel) {
416
417 Safe.ArgumentNotNull(chained, "chained");
418
419 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
420 // создать посредника, к которому будут подвызяваться следующие обработчики.
421 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
422 // передать через него результаты работы.
423 var medium = new Promise<TNew>(this);
424
425 Func<T,T> resultHandler = delegate(T result) {
426 if (medium.IsCancelled)
427 return default(T);
428
429 var promise = chained(result);
430
431 promise.On(
432 medium.Resolve,
433 medium.Reject,
434 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
435 );
436
437 // notify chained operation that it's not needed anymore
438 // порядок вызова Then, Cancelled важен, поскольку от этого
439 // зависит IsExclusive
440 medium.On(
441 null,
442 null,
443 () => {
444 if (promise.IsExclusive)
445 promise.Cancel();
446 }
447 );
448
449 return default(T);
450 };
451
452 Func<Exception,T> errorHandler;
453
454 if (error != null)
455 errorHandler = delegate(Exception e) {
456 try {
457 var promise = error(e);
458
459 promise.On(
460 medium.Resolve,
461 medium.Reject,
462 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
463 );
464
465 // notify chained operation that it's not needed anymore
466 // порядок вызова Then, Cancelled важен, поскольку от этого
467 // зависит IsExclusive
468 medium.Cancelled(() => {
469 if (promise.IsExclusive)
470 promise.Cancel();
471 });
472 } catch (Exception e2) {
473 medium.Reject(e2);
474 }
475 return default(T);
476 };
477 else
478 errorHandler = err => {
479 medium.Reject(err);
480 return default(T);
481 };
482
483
484 Action cancelHandler;
485 if (cancel != null)
486 cancelHandler = () => {
487 if (cancel != null)
488 cancel();
489 medium.Cancel();
490 };
491 else
492 cancelHandler = medium.Cancel;
493
494 AddMappers(
495 resultHandler,
496 errorHandler,
497 cancelHandler,
498 null,
499 true
500 );
501
502 return medium;
503 }
504
505 public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained, Func<Exception,IPromise<TNew>> error) {
506 return Chain(chained, error, null);
507 }
508
509 public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained) {
510 return Chain(chained, null, null);
511 }
512
513 public IPromise<T> Cancelled(Action handler) {
514 var medium = new Promise<T>(this);
515 AddHandler(null, null, handler, medium, false);
516 return medium;
517 }
518
519 /// <summary>
520 /// Adds the specified handler for all cases (success, error, cancel)
521 /// </summary>
522 /// <param name="handler">The handler that will be called anyway</param>
523 /// <returns>self</returns>
524 public IPromise<T> Anyway(Action handler) {
525 Safe.ArgumentNotNull(handler, "handler");
526
527 var medium = new Promise<T>(this);
528
529 AddHandler(
530 x => handler(),
531 e => {
532 handler();
533 throw new TransientPromiseException(e);
534 },
535 handler,
536 medium,
537 true
538 );
539
540 return medium;
541 }
542
543 /// <summary>
544 /// Преобразует результат обещания к нужному типу
545 /// </summary>
546 /// <typeparam name="T2"></typeparam>
547 /// <returns></returns>
548 public IPromise<T2> Cast<T2>() {
549 return Then(x => (T2)(object)x, null);
550 }
551
552 /// <summary>
553 /// Дожидается отложенного обещания и в случае успеха, возвращает
554 /// его, результат, в противном случае бросает исключение.
555 /// </summary>
556 /// <remarks>
557 /// <para>
558 /// Если ожидание обещания было прервано по таймауту, это не значит,
559 /// что обещание было отменено или что-то в этом роде, это только
560 /// означает, что мы его не дождались, однако все зарегистрированные
561 /// обработчики, как были так остались и они будут вызваны, когда
562 /// обещание будет выполнено.
563 /// </para>
564 /// <para>
565 /// Такое поведение вполне оправдано поскольку таймаут может истечь
566 /// в тот момент, когда началась обработка цепочки обработчиков, и
567 /// к тому же текущее обещание может стоять в цепочке обещаний и его
568 /// отклонение может привести к непрогнозируемому результату.
569 /// </para>
570 /// </remarks>
571 /// <param name="timeout">Время ожидания</param>
572 /// <returns>Результат выполнения обещания</returns>
573 public T Join(int timeout) {
574 var evt = new ManualResetEvent(false);
575 Anyway(() => evt.Set());
576
577 if (!evt.WaitOne(timeout, true))
578 throw new TimeoutException();
579
580 switch (m_state) {
581 case SUCCEEDED_STATE:
582 return m_result;
583 case CANCELLED_STATE:
584 throw new OperationCanceledException();
585 case REJECTED_STATE:
586 throw new TargetInvocationException(m_error);
587 default:
588 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
589 }
590 }
591
592 public T Join() {
593 return Join(Timeout.Infinite);
594 }
595
596 void AddMappers<T2>(Func<T,T2> success, Func<Exception,T2> error, Action cancel, Promise<T2> medium, bool inc) {
597 if (inc)
598 Interlocked.Increment(ref m_childrenCount);
599
600 AbstractHandler handler = new RemapDescriptor<T2>(success, error, cancel, medium);
601
602 bool queued;
603
604 if (!IsResolved) {
605 m_handlers.Enqueue(handler);
606 queued = true;
607 } else {
608 // the promise is in resolved state, just invoke the handled with minimum overhead
609 queued = false;
610 InvokeHandler(handler);
611 }
612
613 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
614 // if the promise have been resolved while we was adding handler to the queue
615 // we can't guarantee that someone is still processing it
616 // therefore we will fetch a handler from the queue and execute it
617 // note that fetched handler may be not the one that we have added
618 // even we can fetch no handlers at all :)
619 InvokeHandler(handler);
620 }
621
622 void AddHandler(Action<T> success, Action<Exception> error, Action cancel, Promise<T> medium, bool inc) {
623 if (inc)
624 Interlocked.Increment(ref m_childrenCount);
625
626 AbstractHandler handler = new HandlerDescriptor(success, error, cancel, medium);
627
628 bool queued;
629
630 if (!IsResolved) {
631 m_handlers.Enqueue(handler);
632 queued = true;
633 } else {
634 // the promise is in resolved state, just invoke the handled with minimum overhead
635 queued = false;
636 InvokeHandler(handler);
637 }
638
639 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
640 // if the promise have been resolved while we was adding handler to the queue
641 // we can't guarantee that someone is still processing it
642 // therefore we will fetch a handler from the queue and execute it
643 // note that fetched handler may be not the one that we have added
644 // even we can fetch no handlers at all :)
645 InvokeHandler(handler);
646 }
647
648 protected virtual void InvokeHandler(AbstractHandler handler) {
649 switch (m_state) {
650 case SUCCEEDED_STATE:
651 handler.Resolve(m_result);
652 break;
653 case REJECTED_STATE:
654 handler.Reject(m_error);
655 break;
656 case CANCELLED_STATE:
657 handler.Cancel();
658 break;
659 default:
660 // do nothing
661 return;
662 }
663 }
664
665 void OnStateChanged() {
666 AbstractHandler handler;
667 while (m_handlers.TryDequeue(out handler))
668 InvokeHandler(handler);
669 }
670
671 public bool IsExclusive {
672 get {
673 return m_childrenCount <= 1;
674 }
675 }
676
677 /// <summary>
678 /// Объединяет несколько обещаний в одно, результатом которого является массив результатов других обещаний.
679 /// Если хотябы одно из переданных обещаний не будет выполнено, то новое обещение тоже не будет выполнено.
680 /// При отмене нового обещания, переданные обещания также будут отменены, если никто больше на них не подписан.
681 /// </summary>
682 /// <param name="promises">Список обещаний. Если список пустой, то результирующее обещание возвращается уже выполненным.</param>
683 /// <returns>Обещание объединяющее в себе результат переданных обещаний.</returns>
684 /// <exception cref="ArgumentNullException"><paramref name="promises"/> не может быть null</exception>
685 public static IPromise<T[]> CreateComposite(IList<IPromise<T>> promises) {
686 if (promises == null)
687 throw new ArgumentNullException();
688
689 // создаем аккумулятор для результатов и результирующее обещание
690 var result = new T[promises.Count];
691 var promise = new Promise<T[]>();
692
693 // special case
694 if (promises.Count == 0) {
695 promise.Resolve(result);
696 return promise;
697 }
698
699 int pending = promises.Count;
700
701 for (int i = 0; i < promises.Count; i++) {
702 var dest = i;
703
704 if (promises[i] != null) {
705 promises[i].On(
706 x => {
707 result[dest] = x;
708 if (Interlocked.Decrement(ref pending) == 0)
709 promise.Resolve(result);
710 },
711 promise.Reject
712 );
713 } else {
714 if (Interlocked.Decrement(ref pending) == 0)
715 promise.Resolve(result);
716 }
717 }
718
719 promise.Cancelled(
720 () => {
721 foreach (var d in promises)
722 if (d != null && d.IsExclusive)
723 d.Cancel();
724 }
725 );
726
727 return promise;
728 }
729
730 /// <summary>
731 /// Объединяет несколько обещаний в одно. Результирующее обещание будет выполнено при
732 /// выполнении всех указанных обещаний. При этом возвращаемые значения первичных обещаний
733 /// игнорируются.
734 /// </summary>
735 /// <param name="promises">Коллекция первичных обещаний, которые будут объеденены в одно.</param>
736 /// <returns>Новое обещание, объединяющее в себе переданные.</returns>
737 /// <remarks>
738 /// Если в коллекции встречаюься <c>null</c>, то они воспринимаются как выполненные обещания.
739 /// </remarks>
740 public static IPromise CreateComposite(ICollection<IPromise> promises) {
741 if (promises == null)
742 throw new ArgumentNullException();
743 if (promises.Count == 0)
744 return Promise<object>.ResultToPromise(null);
745
746 int countdown = promises.Count;
747
748 var result = new Promise<object>();
749
750 foreach (var d in promises) {
751 if (d == null) {
752 if (Interlocked.Decrement(ref countdown) == 0)
753 result.Resolve(null);
754 } else {
755 d.Then(() => {
756 if (Interlocked.Decrement(ref countdown) == 0)
757 result.Resolve(null);
758 });
759 }
760 }
761
762 result.Cancelled(() => {
763 foreach (var d in promises)
764 if (d != null && d.IsExclusive)
765 d.Cancel();
766 });
767
768 return result;
769 }
770
771 public static Promise<T> ResultToPromise(T result) {
772 var p = new Promise<T>();
773 p.Resolve(result);
774 return p;
775 }
776
777 public static Promise<T> ExceptionToPromise(Exception error) {
778 if (error == null)
779 throw new ArgumentNullException();
780
781 var p = new Promise<T>();
782 p.Reject(error);
783 return p;
784 }
785
786 #region IPromiseBase explicit implementation
787
788 IPromise IPromise.Then(Action success, Action<Exception> error, Action cancel) {
789 return Then(
790 success != null ? new Func<T,T>(x => {
791 success();
792 return x;
793 }) : null,
794 error != null ? new Func<Exception,T>(e => {
795 error(e);
796 return default(T);
797 }) : null,
798 cancel
799 );
800 }
801
802 IPromise IPromise.Then(Action success, Action<Exception> error) {
803 return Then(
804 success != null ? new Func<T,T>(x => {
805 success();
806 return x;
807 }) : null,
808 error != null ? new Func<Exception,T>(e => {
809 error(e);
810 return default(T);
811 }) : null
812 );
813 }
814
815 IPromise IPromise.Then(Action success) {
816 Safe.ArgumentNotNull(success, "success");
817 return Then(x => {
818 success();
819 return x;
820 });
821 }
822
823 IPromise IPromise.Chain(Func<IPromise> chained, Func<Exception,IPromise> error, Action cancel) {
824 return ChainNoResult(chained, error, cancel);
825 }
826
827 IPromise ChainNoResult(Func<IPromise> chained, Func<Exception,IPromise> error, Action cancel) {
828 Safe.ArgumentNotNull(chained, "chained");
829
830 var medium = new Promise<object>(this);
831
832 Func<T,T> resultHandler = delegate {
833 if (medium.IsCancelled)
834 return default(T);
835
836 var promise = chained();
837
838 promise.On(
839 medium.Resolve,
840 medium.Reject,
841 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
842 );
843
844 // notify chained operation that it's not needed anymore
845 // порядок вызова Then, Cancelled важен, поскольку от этого
846 // зависит IsExclusive
847 medium.Cancelled(() => {
848 if (promise.IsExclusive)
849 promise.Cancel();
850 });
851
852 return default(T);
853 };
854
855 Func<Exception,T> errorHandler;
856
857 if (error != null)
858 errorHandler = delegate(Exception e) {
859 try {
860 var promise = error(e);
861
862 promise.On(
863 medium.Resolve,
864 medium.Reject,
865 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
866 );
867
868 // notify chained operation that it's not needed anymore
869 // порядок вызова Then, Cancelled важен, поскольку от этого
870 // зависит IsExclusive
871 medium.Cancelled(() => {
872 if (promise.IsExclusive)
873 promise.Cancel();
874 });
875 } catch (Exception e2) {
876 medium.Reject(e2);
877 }
878 return default(T);
879 };
880 else
881 errorHandler = err => {
882 medium.Reject(err);
883 return default(T);
884 };
885
886
887 Action cancelHandler;
888 if (cancel != null)
889 cancelHandler = () => {
890 if (cancel != null)
891 cancel();
892 medium.Cancel();
893 };
894 else
895 cancelHandler = medium.Cancel;
896
897 AddMappers(
898 resultHandler,
899 errorHandler,
900 cancelHandler,
901 null,
902 true
903 );
904
905 return medium;
906 }
907
908 IPromise IPromise.Chain(Func<IPromise> chained, Func<Exception,IPromise> error) {
909 return ChainNoResult(chained, error, null);
910 }
911
912 IPromise IPromise.Chain(Func<IPromise> chained) {
913 return ChainNoResult(chained, null, null);
914 }
915
916
917 void IPromise.On(Action success, Action<Exception> error, Action cancel) {
918 On(success != null ? new Action<T>(x => success()) : null, error, cancel);
919 }
920
921 void IPromise.On(Action success, Action<Exception> error) {
922 On(x => success(), error, null);
923 }
924
925 void IPromise.On(Action success) {
926 On(x => success(), null, null);
927 }
928
929 IPromise IPromise.Error(Action<Exception> error) {
930 return Error(error);
931 }
932
933 IPromise IPromise.Anyway(Action handler) {
934 return Anyway(handler);
935 }
936
937 IPromise IPromise.Cancelled(Action handler) {
938 return Cancelled(handler);
939 }
940
941 void IPromise.Join() {
942 Join();
943 }
944
945 void IPromise.Join(int timeout) {
946 Join(timeout);
947 }
948
949 #endregion
950
951
952
953 }
954 }
1 using System;
2 using System.Diagnostics;
3
4 namespace Implab {
5 public class Promise : AbstractPromise<Promise.HandlerDescriptor>, IPromise, IDeferred {
6
7 public struct HandlerDescriptor {
8 readonly Action m_success;
9 readonly Action<Exception> m_error;
10 readonly Action m_cancel;
11 readonly IDeferred m_deferred;
12
13 public HandlerDescriptor(Action success, Action<Exception> error, Action cancel, IDeferred deferred) {
14 m_success = success;
15 m_error = error;
16 m_cancel = cancel;
17 m_deferred = deferred;
18 }
19
20 public void SignalSuccess() {
21 if (m_success != null) {
22 try {
23 m_success();
24 if (m_deferred != null)
25 m_deferred.Resolve();
26 } catch (Exception err) {
27 SignalError(err);
28 }
29 }
30 }
31
32 public void SignalError(Exception err) {
33 if (m_error != null) {
34 try {
35 m_error(err);
36 if (m_deferred != null)
37 m_deferred.Resolve();
38 } catch (Exception err2) {
39 if (m_deferred != null)
40 m_deferred.Reject(err2);
41 }
42 } else {
43 if (m_deferred != null)
44 m_deferred.Reject(err);
45 }
46 }
47
48 public void SignalCancel() {
49 if (m_cancel != null) {
50 try {
51 m_cancel();
52 if (m_deferred != null)
53 m_deferred.Resolve();
54 } catch (Exception err) {
55 SignalError(err);
56 }
57 } else {
58 if (m_deferred != null)
59 m_deferred.Cancel();
60 }
61 }
62 }
63
64 public void Resolve() {
65 BeginSetResult();
66 EndSetResult();
67 }
68
69 public void Reject(Exception error) {
70 SetError(error);
71 }
72
73 #region implemented abstract members of AbstractPromise
74
75 protected override void SignalSuccess(HandlerDescriptor handler) {
76 handler.SignalSuccess();
77 }
78
79 protected override void SignalError(HandlerDescriptor handler, Exception error) {
80 handler.SignalError(error);
81 }
82
83 protected override void SignalCancelled(HandlerDescriptor handler) {
84 handler.SignalCancel();
85 }
86
87 protected override void Listen(PromiseEventType events, Action handler) {
88 AddHandler(new HandlerDescriptor(
89 events.HasFlag(PromiseEventType.Success) ? handler : null,
90 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
91 events.HasFlag(PromiseEventType.Cancelled) ? handler : null,
92 null
93 ));
94 }
95
96 #endregion
97
98
99 public Type PromiseType {
100 get {
101 return typeof(void);
102 }
103 }
104
105 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
106 var promise = new Promise();
107 if (success != null)
108 promise.On(Cancel, PromiseEventType.Cancelled);
109
110 AddHandler(new HandlerDescriptor(success, error, cancel, promise));
111
112 return promise;
113 }
114
115 public IPromise Then(Action success, Action<Exception> error) {
116 return Then(success, error, null);
117 }
118
119 public IPromise Then(Action success) {
120 return Then(success, null, null);
121 }
122
123 public IPromise On(Action success, Action<Exception> error, Action cancel) {
124 AddHandler(new HandlerDescriptor(success, error, cancel, null));
125 return this;
126 }
127
128 public IPromise On(Action success, Action<Exception> error) {
129 return On(success, error, null);
130 }
131
132 public IPromise On(Action success) {
133 return On(success, null, null);
134 }
135
136 public IPromise On(Action handler, PromiseEventType events) {
137 return On(
138 events.HasFlag(PromiseEventType.Success) ? handler : null,
139 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
140 events.HasFlag(PromiseEventType.Cancelled) ? handler : null
141 );
142 }
143
144 public IPromise<T> Cast<T>() {
145 throw new InvalidCastException();
146 }
147
148 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
149 var medium = new Promise();
150
151 On(
152 () => {
153 if (medium.IsCancelled)
154 return;
155 if (chained != null)
156 ConnectPromise(chained(), medium);
157 },
158 ex => {
159 if (medium.IsCancelled)
160 return;
161 if (error != null) {
162 try {
163 ConnectPromise(error(ex), medium);
164 } catch (Exception ex2) {
165 medium.Reject(ex2);
166 }
167 } else {
168 medium.Reject(ex);
169 }
170 },
171 () => {
172 if (medium.IsCancelled)
173 return;
174 if (cancel != null)
175 ConnectPromise(cancel(), medium);
176 else
177 medium.Cancel();
178 }
179 );
180
181 if (chained != null)
182 medium.On(Cancel, PromiseEventType.Cancelled);
183
184 return medium;
185 }
186
187 static void ConnectPromise(IPromise result, Promise medium) {
188 if (result != null) {
189 result.On(
190 medium.Resolve,
191 medium.Reject,
192 () => medium.Reject(new OperationCanceledException())
193 );
194 medium.On(result.Cancel, PromiseEventType.Cancelled);
195 } else {
196 medium.Reject(
197 new NullReferenceException(
198 "The chained asynchronous operation returned" +
199 " 'null' where the promise instance is expected"
200 )
201 );
202 }
203 }
204
205 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
206 return Chain(chained, error, null);
207 }
208
209 public IPromise Chain(Func<IPromise> chained) {
210 return Chain(chained, null, null);
211 }
212
213 public IPromise Error(Action<Exception> error) {
214 var promise = new Promise();
215 On(
216 null,
217 err => {
218 if (error != null)
219 try {
220 error(err);
221 promise.Resolve();
222 } catch (Exception err2) {
223 promise.Reject(err2);
224 }
225 else
226 promise.Reject(err);
227 }
228 );
229
230 return promise;
231 }
232
233 public IPromise Cancelled(Action handler) {
234 var promise = new Promise();
235 On(
236 null,
237 null,
238 () => {
239 if (handler != null) {
240 try {
241 handler();
242 promise.Resolve();
243 } catch (Exception err) {
244 promise.Reject(err);
245 }
246 } else {
247 promise.Cancel();
248 }
249 }
250 );
251
252 return promise;
253 }
254
255
256 }
257 }
258
@@ -1,107 +1,134
1 1 using System.Threading;
2 2 using System;
3 3 using Implab.Diagnostics;
4 using System.Collections.Generic;
4 5
5 6
6 7 #if NET_4_5
7 8 using System.Threading.Tasks;
8 9 #endif
9 10
10 11 namespace Implab {
11 12 public static class PromiseExtensions {
12 13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
13 14 Safe.ArgumentNotNull(that, "that");
14 15 var context = SynchronizationContext.Current;
15 16 if (context == null)
16 17 return that;
17 18
18 var p = new SyncContextPromise<T>(context, that);
19 var p = new SyncContextPromise<T>(context);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
19 21
20 22 that.On(
21 23 p.Resolve,
22 24 p.Reject,
23 25 p.Cancel
24 26 );
25 27 return p;
26 28 }
27 29
28 30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
29 31 Safe.ArgumentNotNull(that, "that");
30 32 Safe.ArgumentNotNull(context, "context");
31 33
32 var p = new SyncContextPromise<T>(context, that);
34 var p = new SyncContextPromise<T>(context);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
36
33 37
34 38 that.On(
35 39 p.Resolve,
36 40 p.Reject,
37 41 p.Cancel
38 42 );
39 43 return p;
40 44 }
41 45
42 46 /// <summary>
43 47 /// Ensures the dispatched.
44 48 /// </summary>
45 49 /// <returns>The dispatched.</returns>
46 50 /// <param name="that">That.</param>
47 51 /// <param name="head">Head.</param>
48 52 /// <param name="cleanup">Cleanup.</param>
49 53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
50 54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
51 55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
52 56 Safe.ArgumentNotNull(that, "that");
53 57 Safe.ArgumentNotNull(head, "head");
54 58
55 59 that.On(null,null,() => head.On(cleanup));
56 60
57 61 return that;
58 62 }
59 63
60 64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
61 65 Safe.ArgumentNotNull(that, "that");
62 66 Safe.ArgumentNotNull(callback, "callback");
63 67 var op = TraceContext.Instance.CurrentOperation;
64 68 return ar => {
65 69 TraceContext.Instance.EnterLogicalOperation(op,false);
66 70 try {
67 71 that.Resolve(callback(ar));
68 72 } catch (Exception err) {
69 73 that.Reject(err);
70 74 } finally {
71 75 TraceContext.Instance.Leave();
72 76 }
73 77 };
74 78 }
75 79
76 80 static void CancelCallback(object cookie) {
77 81 ((ICancellable)cookie).Cancel();
78 82 }
79 83
80 84 /// <summary>
81 85 /// Cancells promise after the specified timeout is elapsed.
82 86 /// </summary>
83 87 /// <param name="that">The promise to cancel on timeout.</param>
84 88 /// <param name="milliseconds">The timeout in milliseconds.</param>
85 89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
86 90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
87 91 Safe.ArgumentNotNull(that, "that");
88 92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
89 93 that.On(timer.Dispose, PromiseEventType.All);
90 94 return that;
91 95 }
96
97 public static IPromise Combine(this ICollection<IPromise> that) {
98 Safe.ArgumentNotNull(that, "that");
99
100 int count = that.Count;
101 var medium = new Promise();
102
103 foreach (var p in that)
104 p.On(
105 () => {
106 if (Interlocked.Decrement(ref count) == 0)
107 medium.Resolve();
108 },
109 error => {
110 throw new Exception("The dependency promise is failed", error);
111 },
112 () => {
113 throw new OperationCanceledException("The dependency promise is cancelled");
114 }
115 );
116
117 return medium;
118 }
92 119
93 120 #if NET_4_5
94 121
95 122 public static Task<T> GetTask<T>(this IPromise<T> that) {
96 123 Safe.ArgumentNotNull(that, "that");
97 124 var tcs = new TaskCompletionSource<T>();
98 125
99 126 that.On(tcs.SetResult, tcs.SetException, tcs.SetCanceled);
100 127
101 128 return tcs.Task;
102 129 }
103 130
104 131 #endif
105 132 }
106 133 }
107 134
@@ -1,86 +1,86
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Text.RegularExpressions;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab
9 9 {
10 10 public static class Safe
11 11 {
12 12 public static void ArgumentMatch(string param, string name, Regex rx) {
13 13 if (rx == null)
14 14 throw new ArgumentNullException("rx");
15 15 if (!rx.IsMatch(param))
16 16 throw new ArgumentException(String.Format("The prameter value must match {0}", rx), name);
17 17 }
18 18
19 19 public static void ArgumentNotEmpty(string param, string name) {
20 20 if (String.IsNullOrEmpty(param))
21 21 throw new ArgumentException("The parameter can't be empty", name);
22 22 }
23 23
24 24 public static void ArgumentNotEmpty<T>(T[] param, string name) {
25 25 if (param == null || param.Length == 0)
26 throw new ArgumentException("The array must be not emty");
26 throw new ArgumentException("The array must be not emty", name);
27 27 }
28 28
29 29 public static void ArgumentNotNull(object param, string name) {
30 30 if (param == null)
31 31 throw new ArgumentNullException(name);
32 32 }
33 33
34 34 public static void ArgumentInRange(int arg, int min, int max, string name) {
35 35 if (arg < min || arg > max)
36 36 throw new ArgumentOutOfRangeException(name);
37 37 }
38 38
39 39 public static void Dispose<T>(T obj) where T : class
40 40 {
41 41 var disp = obj as IDisposable;
42 42 if (disp != null)
43 43 disp.Dispose();
44 44 }
45 45
46 46 [DebuggerStepThrough]
47 47 public static IPromise<T> InvokePromise<T>(Func<T> action) {
48 48 ArgumentNotNull(action, "action");
49 49
50 50 var p = new Promise<T>();
51 51 try {
52 52 p.Resolve(action());
53 53 } catch (Exception err) {
54 54 p.Reject(err);
55 55 }
56 56
57 57 return p;
58 58 }
59 59
60 60 [DebuggerStepThrough]
61 61 public static IPromise InvokePromise(Action action) {
62 62 ArgumentNotNull(action, "action");
63 63
64 var p = new Promise<object>();
64 var p = new Promise();
65 65 try {
66 66 action();
67 67 p.Resolve();
68 68 } catch (Exception err) {
69 69 p.Reject(err);
70 70 }
71 71
72 72 return p;
73 73 }
74 74
75 75 [DebuggerStepThrough]
76 76 public static IPromise<T> InvokePromise<T>(Func<IPromise<T>> action) {
77 77 ArgumentNotNull(action, "action");
78 78
79 79 try {
80 80 return action() ?? Promise<T>.ExceptionToPromise(new Exception("The action returned null"));
81 81 } catch (Exception err) {
82 82 return Promise<T>.ExceptionToPromise(err);
83 83 }
84 84 }
85 85 }
86 86 }
@@ -1,22 +1,25
1 1 using System.Threading;
2 2
3 3 namespace Implab {
4 4 public class SyncContextPromise<T> : Promise<T> {
5 5 readonly SynchronizationContext m_context;
6 6
7 7 public SyncContextPromise(SynchronizationContext context) {
8 8 Safe.ArgumentNotNull(context, "context");
9 9 m_context = context;
10 10 }
11 11
12 public SyncContextPromise(SynchronizationContext context, IPromise parent)
13 : base(parent) {
14 Safe.ArgumentNotNull(context, "context");
15 m_context = context;
12 protected override void SignalSuccess(IDeferred<T> handler) {
13 m_context.Post(x => base.SignalSuccess(handler), null);
16 14 }
17 protected override void InvokeHandler(AbstractHandler handler) {
18 m_context.Post(x => base.InvokeHandler(handler),null);
15
16 protected override void SignalError(IDeferred<T> handler, System.Exception error) {
17 m_context.Post(x => base.SignalError(handler, error), null);
18 }
19
20 protected override void SignalCancelled(IDeferred<T> handler) {
21 m_context.Post(x => base.SignalCancelled(handler), null);
19 22 }
20 23 }
21 24 }
22 25
@@ -1,79 +1,110
1 1 using System;
2 2 using Implab.Diagnostics;
3 3 using Implab.Parallels;
4 4 using Implab;
5 5 using System.Collections.Generic;
6 6 using System.Collections.Concurrent;
7 7
8 8 namespace MonoPlay {
9 9 class MainClass {
10 10 public static void Main(string[] args) {
11 11 if (args == null)
12 12 throw new ArgumentNullException("args");
13 13
14 var q1 = new MTQueue<int>();
14 var q1 = new AsyncQueue<int>();
15 15 var q2 = new Queue<int>();
16 16
17 17 const int count = 10000000;
18 18
19 19
20 20 var t1 = Environment.TickCount;
21 21
22 Promise<int>.CreateComposite(
23 new [] {
24 AsyncPool.InvokeNewThread(() => {
25 for (var i = 0; i < count; i++)
26 q1.Enqueue(i);
27 }),
28 AsyncPool.InvokeNewThread(() => {
29 int temp = 0;
30 for(int i =0 ; i< count ; i++)
31 while(!q1.TryDequeue(out temp)){
32 }
33 })
34 }
35 ).Join();
22 new [] {
23 AsyncPool.InvokeNewThread(() => {
24 for (var i = 0; i < count; i++)
25 q1.Enqueue(i);
26 }),
27 AsyncPool.InvokeNewThread(() => {
28 for (var i = 0; i < count; i++)
29 q1.Enqueue(i);
30 }),
31 AsyncPool.InvokeNewThread(() => {
32 int temp = 0;
33 int i = 0;
34 while (i < count)
35 if (q1.TryDequeue(out temp))
36 i++;
37 }),
38 AsyncPool.InvokeNewThread(() => {
39 int temp = 0;
40 int i = 0;
41 while (i < count)
42 if (q1.TryDequeue(out temp))
43 i++;
44 })
45 }
46 .Combine()
47 .Join();
36 48
37 49 var t2 = Environment.TickCount;
38 50 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
39 51
40 52 t1 = Environment.TickCount;
41 53
42 for (var i = 0; i < count; i++)
54 for (var i = 0; i < count * 2; i++)
43 55 q2.Enqueue(i);
44 56
57 for (var i = 0; i < count * 2; i++)
58 q2.Dequeue();
59
45 60 t2 = Environment.TickCount;
46 Console.WriteLine("LinkedList: {0} ms", t2 - t1);
61 Console.WriteLine("Queue: {0} ms", t2 - t1);
47 62
48 63 q2 = new Queue<int>();
49 64
50 65 t1 = Environment.TickCount;
51 66
52 Promise<int>.CreateComposite(
53 new [] {
54 AsyncPool.InvokeNewThread(() => {
55 for (var i = 0; i < count; i++)
56 lock (q2)
57 q2.Enqueue(i);
58 }),
59 AsyncPool.InvokeNewThread(() => {
60 for(int i = 0 ; i< count ;)
61 lock(q2) {
62 if(q2.Count == 0)
63 continue;
64 q2.Dequeue();
65 i++;
66 }
67
68 new [] {
69 AsyncPool.InvokeNewThread(() => {
70 for (var i = 0; i < count; i++)
71 lock (q2)
72 q2.Enqueue(i);
73 }),
74 AsyncPool.InvokeNewThread(() => {
75 for (var i = 0; i < count; i++)
76 lock (q2)
77 q2.Enqueue(i);
78 }),
79 AsyncPool.InvokeNewThread(() => {
80 for (int i = 0; i < count ;)
81 lock (q2) {
82 if (q2.Count == 0)
83 continue;
84 q2.Dequeue();
85 i++;
86 }
67 87
68 })
69 }
70 ).Join();
88 }),
89 AsyncPool.InvokeNewThread(() => {
90 for (int i = 0; i < count ;)
91 lock (q2) {
92 if (q2.Count == 0)
93 continue;
94 q2.Dequeue();
95 i++;
96 }
97
98 })
99 }
100 .Combine()
101 .Join();
71 102
72 103
73 104
74 105 t2 = Environment.TickCount;
75 Console.WriteLine("LinkedList+Lock: {0} ms", t2 - t1);
106 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
76 107
77 108 }
78 109 }
79 110 }
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now