##// END OF EJS Templates
implemeted new cancellable promises concept
cin -
r10:aa33d0bb8c0c promises
parent child
Show More
@@ -0,0 +1,16
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab.Test {
8 class PromiseHelper {
9 public static Promise<T> Sleep<T>(int timeout, T retVal) {
10 return AsyncPool.Invoke(() => {
11 Thread.Sleep(timeout);
12 return retVal;
13 });
14 }
15 }
16 }
@@ -10,3 +10,4 Implab.Fx/obj/
10 10 Implab.Fx/bin/
11 11 Implab.Fx.Test/bin/
12 12 Implab.Fx.Test/obj/
13 _ReSharper.Implab/
@@ -1,10 +1,9
1 1 using System;
2 2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using Implab;
4 3 using System.Reflection;
5 4 using System.Threading;
6 5
7 namespace Implab.Tests
6 namespace Implab.Test
8 7 {
9 8 [TestClass]
10 9 public class AsyncTests
@@ -90,12 +89,39 namespace Implab.Tests
90 89 public void PoolTest ()
91 90 {
92 91 var pid = Thread.CurrentThread.ManagedThreadId;
93 var p = AsyncPool.Invoke (() => {
94 return Thread.CurrentThread.ManagedThreadId;
95 });
92 var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId);
96 93
97 94 Assert.AreNotEqual (pid, p.Join ());
98 95 }
96
97 [TestMethod]
98 public void ComplexCase1Test() {
99 var flags = new bool[3];
100
101 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
102
103 var p = PromiseHelper
104 .Sleep(200, "Alan")
105 .Cancelled(() => flags[0] = true)
106 .Chain(x =>
107 PromiseHelper
108 .Sleep(200, "Hi, " + x)
109 .Map( y => y )
110 .Cancelled(() => flags[1] = true)
111 )
112 .Cancelled(() => flags[2] = true);
113 Thread.Sleep(300);
114 p.Cancel();
115 try {
116 Assert.AreEqual(p.Join(), "Hi, Alan");
117 Assert.Fail("Shouldn't get here");
118 } catch(OperationCanceledException) {
119 }
120
121 Assert.IsFalse(flags[0]);
122 Assert.IsTrue(flags[1]);
123 Assert.IsTrue(flags[2]);
124 }
99 125 }
100 126 }
101 127
@@ -46,6 +46,7
46 46 </ItemGroup>
47 47 <ItemGroup>
48 48 <Compile Include="AsyncTests.cs" />
49 <Compile Include="PromiseHelper.cs" />
49 50 <Compile Include="Properties\AssemblyInfo.cs" />
50 51 </ItemGroup>
51 52 <ItemGroup>
1 NO CONTENT: modified file, binary diff hidden
@@ -29,5 +29,12 namespace Implab
29 29 /// <param name="dependencies">Try to cancel the whole promise chain, the parent promise will be cancelled only if it has only one promise</param>
30 30 /// <returns></returns>
31 31 bool Cancel(bool dependencies);
32
33 /// <summary>
34 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
35 /// handler will be invoked immediatelly.
36 /// </summary>
37 /// <param name="handler">The handler</param>
38 void HandleCancelled(Action handler);
32 39 }
33 40 }
@@ -1,8 +1,6
1 1 using System;
2 2 using System.Collections.Generic;
3 using System.Linq;
4 3 using System.Reflection;
5 using System.Text;
6 4 using System.Diagnostics;
7 5 using System.Threading;
8 6
@@ -10,9 +8,9 namespace Implab {
10 8
11 9 public delegate void ErrorHandler(Exception e);
12 10
13 public delegate void ResultHandler<T>(T result);
14 public delegate TNew ResultMapper<TSrc, TNew>(TSrc result);
15 public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result);
11 public delegate void ResultHandler<in T>(T result);
12 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
13 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
16 14
17 15 /// <summary>
18 16 /// Класс для асинхронного получения результатов. Так называемое "обещание".
@@ -55,20 +53,19 namespace Implab {
55 53 public ErrorHandler errorHandler;
56 54 }
57 55
58 IPromise m_parent;
56 readonly IPromise m_parent;
59 57
60 58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
61 59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
62 60
63 object m_lock = new Object();
64 bool m_cancellable;
61 readonly object m_lock = new Object();
62 readonly bool m_cancellable;
63 int m_childrenCount = 0;
65 64
66 65 PromiseState m_state;
67 66 T m_result;
68 67 Exception m_error;
69 68
70 int m_childrenCount;
71
72 69 public Promise() {
73 70 m_cancellable = true;
74 71 }
@@ -76,15 +73,14 namespace Implab {
76 73 public Promise(IPromise parent, bool cancellable) {
77 74 m_cancellable = cancellable;
78 75 m_parent = parent;
76 if (parent != null)
77 parent.HandleCancelled(InternalCancel);
79 78 }
80 79
81 /// <summary>
82 /// Событие, возникающее при отмене асинхронной операции.
83 /// </summary>
84 /// <description>
85 /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить.
86 /// </description>
87 public event EventHandler Cancelled;
80 void InternalCancel() {
81 // don't try to cancel parent :)
82 Cancel(false);
83 }
88 84
89 85 /// <summary>
90 86 /// Выполняет обещание, сообщая об успешном выполнении.
@@ -101,14 +97,7 namespace Implab {
101 97 m_state = PromiseState.Resolved;
102 98 }
103 99
104 // state has been changed to rejected new handlers can't be added
105
106 foreach (var handler in m_resultHandlers)
107 InvokeHandler(handler);
108
109 /* ResultHandlerInfo handler;
110 while (FetchNextHandler(out handler))
111 InvokeHandler(handler); */
100 OnStateChanged();
112 101 }
113 102
114 103 /// <summary>
@@ -126,14 +115,7 namespace Implab {
126 115 m_state = PromiseState.Rejected;
127 116 }
128 117
129 // state has been changed to rejected new handlers can't be added
130
131 foreach (var handler in m_resultHandlers)
132 InvokeHandler(handler);
133
134 /*ResultHandlerInfo handler;
135 while (FetchNextHandler(out handler))
136 InvokeHandler(handler);*/
118 OnStateChanged();
137 119 }
138 120
139 121 /// <summary>
@@ -144,6 +126,39 namespace Implab {
144 126 return Cancel(true);
145 127 }
146 128
129 protected virtual void OnStateChanged() {
130 switch (m_state) {
131 case PromiseState.Resolved:
132 foreach (var resultHandlerInfo in m_resultHandlers)
133 try {
134 if (resultHandlerInfo.resultHandler != null)
135 resultHandlerInfo.resultHandler(m_result);
136 } catch (Exception e) {
137 try {
138 if (resultHandlerInfo.errorHandler != null)
139 resultHandlerInfo.errorHandler(e);
140 } catch { }
141 }
142 break;
143 case PromiseState.Cancelled:
144 foreach (var cancelHandler in m_cancelHandlers)
145 cancelHandler();
146 break;
147 case PromiseState.Rejected:
148 foreach (var resultHandlerInfo in m_resultHandlers)
149 try {
150 if (resultHandlerInfo.errorHandler != null)
151 resultHandlerInfo.errorHandler(m_error);
152 } catch { }
153 break;
154 default:
155 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
156 }
157
158 m_resultHandlers = null;
159 m_cancelHandlers = null;
160 }
161
147 162 /// <summary>
148 163 /// Добавляет обработчики событий выполнения обещания.
149 164 /// </summary>
@@ -162,15 +177,11 namespace Implab {
162 177
163 178 if (success != null)
164 179 handlerInfo.resultHandler = x => {
165 try {
166 success(x);
167 medium.Resolve(x);
168 } catch (Exception e) {
169 medium.Reject(e);
170 }
180 success(x);
181 medium.Resolve(x);
171 182 };
172 183 else
173 handlerInfo.resultHandler = x => medium.Resolve(x);
184 handlerInfo.resultHandler = medium.Resolve;
174 185
175 186 if (error != null)
176 187 handlerInfo.errorHandler = x => {
@@ -180,7 +191,7 namespace Implab {
180 191 medium.Reject(x);
181 192 };
182 193 else
183 handlerInfo.errorHandler = x => medium.Reject(x);
194 handlerInfo.errorHandler = medium.Reject;
184 195
185 196 AddHandler(handlerInfo);
186 197
@@ -203,6 +214,7 namespace Implab {
203 214
204 215 AddHandler(new ResultHandlerInfo {
205 216 resultHandler = x => {
217 // to avoid handler being called multiple times we handle exception by ourselfs
206 218 try {
207 219 handler();
208 220 medium.Resolve(x);
@@ -234,20 +246,15 namespace Implab {
234 246 throw new ArgumentNullException("mapper");
235 247
236 248 // создаем прицепленное обещание
237 Promise<TNew> chained = new Promise<TNew>();
249 var chained = new Promise<TNew>();
238 250
239 251 AddHandler(new ResultHandlerInfo() {
240 resultHandler = delegate(T result) {
241 try {
242 // если преобразование выдаст исключение, то сработает reject сцепленного deferred
243 chained.Resolve(mapper(result));
244 } catch (Exception e) {
245 chained.Reject(e);
246 }
247 },
252 resultHandler = result => chained.Resolve(mapper(result)),
248 253 errorHandler = delegate(Exception e) {
249 254 if (error != null)
250 error(e);
255 try {
256 error(e);
257 } catch { }
251 258 // в случае ошибки нужно передать исключение дальше по цепочке
252 259 chained.Reject(e);
253 260 }
@@ -276,19 +283,21 namespace Implab {
276 283 // создать посредника, к которому будут подвызяваться следующие обработчики.
277 284 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
278 285 // передать через него результаты работы.
279 Promise<TNew> medium = new Promise<TNew>();
286 var medium = new Promise<TNew>(this, true);
280 287
281 AddHandler(new ResultHandlerInfo() {
288 AddHandler(new ResultHandlerInfo {
282 289 resultHandler = delegate(T result) {
283 try {
284 chained(result).Then(
285 x => medium.Resolve(x),
286 e => medium.Reject(e)
287 );
288 } catch (Exception e) {
289 // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке
290 medium.Reject(e);
291 }
290 if (medium.State == PromiseState.Cancelled)
291 return;
292
293 var promise = chained(result);
294
295 // notify chained operation that it's not needed
296 medium.Cancelled(() => promise.Cancel());
297 promise.Then(
298 medium.Resolve,
299 medium.Reject
300 );
292 301 },
293 302 errorHandler = delegate(Exception e) {
294 303 if (error != null)
@@ -305,6 +314,22 namespace Implab {
305 314 return Chain(chained, null);
306 315 }
307 316
317 public Promise<T> Cancelled(Action handler) {
318 if (handler == null)
319 return this;
320 lock (m_lock) {
321 if (m_state == PromiseState.Unresolved)
322 m_cancelHandlers.AddLast(handler);
323 else if (m_state == PromiseState.Cancelled)
324 handler();
325 }
326 return this;
327 }
328
329 public void HandleCancelled(Action handler) {
330 Cancelled(handler);
331 }
332
308 333 /// <summary>
309 334 /// Дожидается отложенного обещания и в случае успеха, возвращает
310 335 /// его, результат, в противном случае бросает исключение.
@@ -327,51 +352,37 namespace Implab {
327 352 /// <param name="timeout">Время ожидания</param>
328 353 /// <returns>Результат выполнения обещания</returns>
329 354 public T Join(int timeout) {
330 ManualResetEvent evt = new ManualResetEvent(false);
355 var evt = new ManualResetEvent(false);
331 356 Anyway(() => evt.Set());
357 Cancelled(() => evt.Set());
332 358
333 359 if (!evt.WaitOne(timeout, true))
334 360 throw new TimeoutException();
335 361
336 if (m_error != null)
337 throw new TargetInvocationException(m_error);
338 else
339 return m_result;
362 switch (State) {
363 case PromiseState.Resolved:
364 return m_result;
365 case PromiseState.Cancelled:
366 throw new OperationCanceledException();
367 case PromiseState.Rejected:
368 throw new TargetInvocationException(m_error);
369 default:
370 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
371 }
340 372 }
341 373
342 374 public T Join() {
343 375 return Join(Timeout.Infinite);
344 376 }
345 377
346 /// <summary>
347 /// Данный метод последовательно извлекает обработчики обещания и когда
348 /// их больше не осталось - ставит состояние "разрешено".
349 /// </summary>
350 /// <param name="handler">Информация об обработчике</param>
351 /// <returns>Признак того, что еще остались обработчики в очереди</returns>
352 bool FetchNextHandler(out ResultHandlerInfo handler) {
353 handler = default(ResultHandlerInfo);
354
355 lock (this) {
356 Debug.Assert(m_state != PromiseState.Unresolved);
357
358 if (m_resultHandlers.Count > 0) {
359 handler = m_resultHandlers.First.Value;
360 m_resultHandlers.RemoveFirst();
361 return true;
362 } else {
363 return false;
364 }
365 }
366 }
367
368 378 void AddHandler(ResultHandlerInfo handler) {
369 379 bool invokeRequired = false;
370 380
371 lock (this) {
372 if (m_state == PromiseState.Unresolved)
381 lock (m_lock) {
382 m_childrenCount++;
383 if (m_state == PromiseState.Unresolved) {
373 384 m_resultHandlers.AddLast(handler);
374 else
385 } else
375 386 invokeRequired = true;
376 387 }
377 388
@@ -381,18 +392,27 namespace Implab {
381 392 }
382 393
383 394 void InvokeHandler(ResultHandlerInfo handler) {
384 if (m_error == null) {
385 try {
386 if (handler.resultHandler != null)
387 handler.resultHandler(m_result);
388 } catch { }
389 }
390
391 if (m_error != null) {
392 try {
393 if (handler.errorHandler != null)
394 handler.errorHandler(m_error);
395 } catch { }
395 switch (m_state) {
396 case PromiseState.Resolved:
397 try {
398 if (handler.resultHandler != null)
399 handler.resultHandler(m_result);
400 } catch (Exception e) {
401 try {
402 if (handler.errorHandler != null)
403 handler.errorHandler(e);
404 } catch { }
405 }
406 break;
407 case PromiseState.Rejected:
408 try {
409 if (handler.errorHandler != null)
410 handler.errorHandler(m_error);
411 } catch { }
412 break;
413 default:
414 // do nothing
415 return;
396 416 }
397 417 }
398 418
@@ -426,15 +446,11 namespace Implab {
426 446 }
427 447 }
428 448
429 if (dependencies && m_parent != null && m_parent.IsExclusive) {
430 // TODO syncronize IsExclusive, AddHandler, Cancel (maybe CancelExclusive)
431 m_parent.Cancel(true);
432 }
449 if (result)
450 OnStateChanged();
433 451
434 if (result) {
435 // state has been changed to cancelled, new handlers can't be added
436 foreach (var handler in m_cancelHandlers)
437 handler();
452 if (dependencies && m_parent != null && m_parent.IsExclusive) {
453 m_parent.Cancel(true);
438 454 }
439 455
440 456 return result;
@@ -14,9 +14,8 namespace Implab
14 14 /// </remarks>
15 15 class TaskController
16 16 {
17 object m_lock;
17 readonly object m_lock;
18 18 string m_message;
19 bool m_cancelled;
20 19
21 20 float m_current;
22 21 float m_max;
General Comments 0
You need to be logged in to leave comments. Login now