##// END OF EJS Templates
implemeted new cancellable promises concept
cin -
r10:aa33d0bb8c0c promises
parent child
Show More
@@ -0,0 +1,16
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab.Test {
8 class PromiseHelper {
9 public static Promise<T> Sleep<T>(int timeout, T retVal) {
10 return AsyncPool.Invoke(() => {
11 Thread.Sleep(timeout);
12 return retVal;
13 });
14 }
15 }
16 }
@@ -10,3 +10,4 Implab.Fx/obj/
10 Implab.Fx/bin/
10 Implab.Fx/bin/
11 Implab.Fx.Test/bin/
11 Implab.Fx.Test/bin/
12 Implab.Fx.Test/obj/
12 Implab.Fx.Test/obj/
13 _ReSharper.Implab/
@@ -1,10 +1,9
1 using System;
1 using System;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using Implab;
4 using System.Reflection;
3 using System.Reflection;
5 using System.Threading;
4 using System.Threading;
6
5
7 namespace Implab.Tests
6 namespace Implab.Test
8 {
7 {
9 [TestClass]
8 [TestClass]
10 public class AsyncTests
9 public class AsyncTests
@@ -90,12 +89,39 namespace Implab.Tests
90 public void PoolTest ()
89 public void PoolTest ()
91 {
90 {
92 var pid = Thread.CurrentThread.ManagedThreadId;
91 var pid = Thread.CurrentThread.ManagedThreadId;
93 var p = AsyncPool.Invoke (() => {
92 var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId);
94 return Thread.CurrentThread.ManagedThreadId;
95 });
96
93
97 Assert.AreNotEqual (pid, p.Join ());
94 Assert.AreNotEqual (pid, p.Join ());
98 }
95 }
96
97 [TestMethod]
98 public void ComplexCase1Test() {
99 var flags = new bool[3];
100
101 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
102
103 var p = PromiseHelper
104 .Sleep(200, "Alan")
105 .Cancelled(() => flags[0] = true)
106 .Chain(x =>
107 PromiseHelper
108 .Sleep(200, "Hi, " + x)
109 .Map( y => y )
110 .Cancelled(() => flags[1] = true)
111 )
112 .Cancelled(() => flags[2] = true);
113 Thread.Sleep(300);
114 p.Cancel();
115 try {
116 Assert.AreEqual(p.Join(), "Hi, Alan");
117 Assert.Fail("Shouldn't get here");
118 } catch(OperationCanceledException) {
119 }
120
121 Assert.IsFalse(flags[0]);
122 Assert.IsTrue(flags[1]);
123 Assert.IsTrue(flags[2]);
124 }
99 }
125 }
100 }
126 }
101
127
@@ -46,6 +46,7
46 </ItemGroup>
46 </ItemGroup>
47 <ItemGroup>
47 <ItemGroup>
48 <Compile Include="AsyncTests.cs" />
48 <Compile Include="AsyncTests.cs" />
49 <Compile Include="PromiseHelper.cs" />
49 <Compile Include="Properties\AssemblyInfo.cs" />
50 <Compile Include="Properties\AssemblyInfo.cs" />
50 </ItemGroup>
51 </ItemGroup>
51 <ItemGroup>
52 <ItemGroup>
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -29,5 +29,12 namespace Implab
29 /// <param name="dependencies">Try to cancel the whole promise chain, the parent promise will be cancelled only if it has only one promise</param>
29 /// <param name="dependencies">Try to cancel the whole promise chain, the parent promise will be cancelled only if it has only one promise</param>
30 /// <returns></returns>
30 /// <returns></returns>
31 bool Cancel(bool dependencies);
31 bool Cancel(bool dependencies);
32
33 /// <summary>
34 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
35 /// handler will be invoked immediatelly.
36 /// </summary>
37 /// <param name="handler">The handler</param>
38 void HandleCancelled(Action handler);
32 }
39 }
33 }
40 }
@@ -1,8 +1,6
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Reflection;
3 using System.Reflection;
5 using System.Text;
6 using System.Diagnostics;
4 using System.Diagnostics;
7 using System.Threading;
5 using System.Threading;
8
6
@@ -10,9 +8,9 namespace Implab {
10
8
11 public delegate void ErrorHandler(Exception e);
9 public delegate void ErrorHandler(Exception e);
12
10
13 public delegate void ResultHandler<T>(T result);
11 public delegate void ResultHandler<in T>(T result);
14 public delegate TNew ResultMapper<TSrc, TNew>(TSrc result);
12 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
15 public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result);
13 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
16
14
17 /// <summary>
15 /// <summary>
18 /// Класс для асинхронного получения результатов. Так называемое "обещание".
16 /// Класс для асинхронного получения результатов. Так называемое "обещание".
@@ -55,20 +53,19 namespace Implab {
55 public ErrorHandler errorHandler;
53 public ErrorHandler errorHandler;
56 }
54 }
57
55
58 IPromise m_parent;
56 readonly IPromise m_parent;
59
57
60 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
61 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
62
60
63 object m_lock = new Object();
61 readonly object m_lock = new Object();
64 bool m_cancellable;
62 readonly bool m_cancellable;
63 int m_childrenCount = 0;
65
64
66 PromiseState m_state;
65 PromiseState m_state;
67 T m_result;
66 T m_result;
68 Exception m_error;
67 Exception m_error;
69
68
70 int m_childrenCount;
71
72 public Promise() {
69 public Promise() {
73 m_cancellable = true;
70 m_cancellable = true;
74 }
71 }
@@ -76,15 +73,14 namespace Implab {
76 public Promise(IPromise parent, bool cancellable) {
73 public Promise(IPromise parent, bool cancellable) {
77 m_cancellable = cancellable;
74 m_cancellable = cancellable;
78 m_parent = parent;
75 m_parent = parent;
76 if (parent != null)
77 parent.HandleCancelled(InternalCancel);
79 }
78 }
80
79
81 /// <summary>
80 void InternalCancel() {
82 /// Событие, возникающее при отмене асинхронной операции.
81 // don't try to cancel parent :)
83 /// </summary>
82 Cancel(false);
84 /// <description>
83 }
85 /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить.
86 /// </description>
87 public event EventHandler Cancelled;
88
84
89 /// <summary>
85 /// <summary>
90 /// Выполняет обещание, сообщая об успешном выполнении.
86 /// Выполняет обещание, сообщая об успешном выполнении.
@@ -101,14 +97,7 namespace Implab {
101 m_state = PromiseState.Resolved;
97 m_state = PromiseState.Resolved;
102 }
98 }
103
99
104 // state has been changed to rejected new handlers can't be added
100 OnStateChanged();
105
106 foreach (var handler in m_resultHandlers)
107 InvokeHandler(handler);
108
109 /* ResultHandlerInfo handler;
110 while (FetchNextHandler(out handler))
111 InvokeHandler(handler); */
112 }
101 }
113
102
114 /// <summary>
103 /// <summary>
@@ -126,14 +115,7 namespace Implab {
126 m_state = PromiseState.Rejected;
115 m_state = PromiseState.Rejected;
127 }
116 }
128
117
129 // state has been changed to rejected new handlers can't be added
118 OnStateChanged();
130
131 foreach (var handler in m_resultHandlers)
132 InvokeHandler(handler);
133
134 /*ResultHandlerInfo handler;
135 while (FetchNextHandler(out handler))
136 InvokeHandler(handler);*/
137 }
119 }
138
120
139 /// <summary>
121 /// <summary>
@@ -144,6 +126,39 namespace Implab {
144 return Cancel(true);
126 return Cancel(true);
145 }
127 }
146
128
129 protected virtual void OnStateChanged() {
130 switch (m_state) {
131 case PromiseState.Resolved:
132 foreach (var resultHandlerInfo in m_resultHandlers)
133 try {
134 if (resultHandlerInfo.resultHandler != null)
135 resultHandlerInfo.resultHandler(m_result);
136 } catch (Exception e) {
137 try {
138 if (resultHandlerInfo.errorHandler != null)
139 resultHandlerInfo.errorHandler(e);
140 } catch { }
141 }
142 break;
143 case PromiseState.Cancelled:
144 foreach (var cancelHandler in m_cancelHandlers)
145 cancelHandler();
146 break;
147 case PromiseState.Rejected:
148 foreach (var resultHandlerInfo in m_resultHandlers)
149 try {
150 if (resultHandlerInfo.errorHandler != null)
151 resultHandlerInfo.errorHandler(m_error);
152 } catch { }
153 break;
154 default:
155 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
156 }
157
158 m_resultHandlers = null;
159 m_cancelHandlers = null;
160 }
161
147 /// <summary>
162 /// <summary>
148 /// Добавляет обработчики событий выполнения обещания.
163 /// Добавляет обработчики событий выполнения обещания.
149 /// </summary>
164 /// </summary>
@@ -162,15 +177,11 namespace Implab {
162
177
163 if (success != null)
178 if (success != null)
164 handlerInfo.resultHandler = x => {
179 handlerInfo.resultHandler = x => {
165 try {
180 success(x);
166 success(x);
181 medium.Resolve(x);
167 medium.Resolve(x);
168 } catch (Exception e) {
169 medium.Reject(e);
170 }
171 };
182 };
172 else
183 else
173 handlerInfo.resultHandler = x => medium.Resolve(x);
184 handlerInfo.resultHandler = medium.Resolve;
174
185
175 if (error != null)
186 if (error != null)
176 handlerInfo.errorHandler = x => {
187 handlerInfo.errorHandler = x => {
@@ -180,7 +191,7 namespace Implab {
180 medium.Reject(x);
191 medium.Reject(x);
181 };
192 };
182 else
193 else
183 handlerInfo.errorHandler = x => medium.Reject(x);
194 handlerInfo.errorHandler = medium.Reject;
184
195
185 AddHandler(handlerInfo);
196 AddHandler(handlerInfo);
186
197
@@ -203,6 +214,7 namespace Implab {
203
214
204 AddHandler(new ResultHandlerInfo {
215 AddHandler(new ResultHandlerInfo {
205 resultHandler = x => {
216 resultHandler = x => {
217 // to avoid handler being called multiple times we handle exception by ourselfs
206 try {
218 try {
207 handler();
219 handler();
208 medium.Resolve(x);
220 medium.Resolve(x);
@@ -234,20 +246,15 namespace Implab {
234 throw new ArgumentNullException("mapper");
246 throw new ArgumentNullException("mapper");
235
247
236 // создаем прицепленное обещание
248 // создаем прицепленное обещание
237 Promise<TNew> chained = new Promise<TNew>();
249 var chained = new Promise<TNew>();
238
250
239 AddHandler(new ResultHandlerInfo() {
251 AddHandler(new ResultHandlerInfo() {
240 resultHandler = delegate(T result) {
252 resultHandler = result => chained.Resolve(mapper(result)),
241 try {
242 // если преобразование выдаст исключение, то сработает reject сцепленного deferred
243 chained.Resolve(mapper(result));
244 } catch (Exception e) {
245 chained.Reject(e);
246 }
247 },
248 errorHandler = delegate(Exception e) {
253 errorHandler = delegate(Exception e) {
249 if (error != null)
254 if (error != null)
250 error(e);
255 try {
256 error(e);
257 } catch { }
251 // в случае ошибки нужно передать исключение дальше по цепочке
258 // в случае ошибки нужно передать исключение дальше по цепочке
252 chained.Reject(e);
259 chained.Reject(e);
253 }
260 }
@@ -276,19 +283,21 namespace Implab {
276 // создать посредника, к которому будут подвызяваться следующие обработчики.
283 // создать посредника, к которому будут подвызяваться следующие обработчики.
277 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
284 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
278 // передать через него результаты работы.
285 // передать через него результаты работы.
279 Promise<TNew> medium = new Promise<TNew>();
286 var medium = new Promise<TNew>(this, true);
280
287
281 AddHandler(new ResultHandlerInfo() {
288 AddHandler(new ResultHandlerInfo {
282 resultHandler = delegate(T result) {
289 resultHandler = delegate(T result) {
283 try {
290 if (medium.State == PromiseState.Cancelled)
284 chained(result).Then(
291 return;
285 x => medium.Resolve(x),
292
286 e => medium.Reject(e)
293 var promise = chained(result);
287 );
294
288 } catch (Exception e) {
295 // notify chained operation that it's not needed
289 // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке
296 medium.Cancelled(() => promise.Cancel());
290 medium.Reject(e);
297 promise.Then(
291 }
298 medium.Resolve,
299 medium.Reject
300 );
292 },
301 },
293 errorHandler = delegate(Exception e) {
302 errorHandler = delegate(Exception e) {
294 if (error != null)
303 if (error != null)
@@ -305,6 +314,22 namespace Implab {
305 return Chain(chained, null);
314 return Chain(chained, null);
306 }
315 }
307
316
317 public Promise<T> Cancelled(Action handler) {
318 if (handler == null)
319 return this;
320 lock (m_lock) {
321 if (m_state == PromiseState.Unresolved)
322 m_cancelHandlers.AddLast(handler);
323 else if (m_state == PromiseState.Cancelled)
324 handler();
325 }
326 return this;
327 }
328
329 public void HandleCancelled(Action handler) {
330 Cancelled(handler);
331 }
332
308 /// <summary>
333 /// <summary>
309 /// Дожидается отложенного обещания и в случае успеха, возвращает
334 /// Дожидается отложенного обещания и в случае успеха, возвращает
310 /// его, результат, в противном случае бросает исключение.
335 /// его, результат, в противном случае бросает исключение.
@@ -327,51 +352,37 namespace Implab {
327 /// <param name="timeout">Время ожидания</param>
352 /// <param name="timeout">Время ожидания</param>
328 /// <returns>Результат выполнения обещания</returns>
353 /// <returns>Результат выполнения обещания</returns>
329 public T Join(int timeout) {
354 public T Join(int timeout) {
330 ManualResetEvent evt = new ManualResetEvent(false);
355 var evt = new ManualResetEvent(false);
331 Anyway(() => evt.Set());
356 Anyway(() => evt.Set());
357 Cancelled(() => evt.Set());
332
358
333 if (!evt.WaitOne(timeout, true))
359 if (!evt.WaitOne(timeout, true))
334 throw new TimeoutException();
360 throw new TimeoutException();
335
361
336 if (m_error != null)
362 switch (State) {
337 throw new TargetInvocationException(m_error);
363 case PromiseState.Resolved:
338 else
364 return m_result;
339 return m_result;
365 case PromiseState.Cancelled:
366 throw new OperationCanceledException();
367 case PromiseState.Rejected:
368 throw new TargetInvocationException(m_error);
369 default:
370 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
371 }
340 }
372 }
341
373
342 public T Join() {
374 public T Join() {
343 return Join(Timeout.Infinite);
375 return Join(Timeout.Infinite);
344 }
376 }
345
377
346 /// <summary>
347 /// Данный метод последовательно извлекает обработчики обещания и когда
348 /// их больше не осталось - ставит состояние "разрешено".
349 /// </summary>
350 /// <param name="handler">Информация об обработчике</param>
351 /// <returns>Признак того, что еще остались обработчики в очереди</returns>
352 bool FetchNextHandler(out ResultHandlerInfo handler) {
353 handler = default(ResultHandlerInfo);
354
355 lock (this) {
356 Debug.Assert(m_state != PromiseState.Unresolved);
357
358 if (m_resultHandlers.Count > 0) {
359 handler = m_resultHandlers.First.Value;
360 m_resultHandlers.RemoveFirst();
361 return true;
362 } else {
363 return false;
364 }
365 }
366 }
367
368 void AddHandler(ResultHandlerInfo handler) {
378 void AddHandler(ResultHandlerInfo handler) {
369 bool invokeRequired = false;
379 bool invokeRequired = false;
370
380
371 lock (this) {
381 lock (m_lock) {
372 if (m_state == PromiseState.Unresolved)
382 m_childrenCount++;
383 if (m_state == PromiseState.Unresolved) {
373 m_resultHandlers.AddLast(handler);
384 m_resultHandlers.AddLast(handler);
374 else
385 } else
375 invokeRequired = true;
386 invokeRequired = true;
376 }
387 }
377
388
@@ -381,18 +392,27 namespace Implab {
381 }
392 }
382
393
383 void InvokeHandler(ResultHandlerInfo handler) {
394 void InvokeHandler(ResultHandlerInfo handler) {
384 if (m_error == null) {
395 switch (m_state) {
385 try {
396 case PromiseState.Resolved:
386 if (handler.resultHandler != null)
397 try {
387 handler.resultHandler(m_result);
398 if (handler.resultHandler != null)
388 } catch { }
399 handler.resultHandler(m_result);
389 }
400 } catch (Exception e) {
390
401 try {
391 if (m_error != null) {
402 if (handler.errorHandler != null)
392 try {
403 handler.errorHandler(e);
393 if (handler.errorHandler != null)
404 } catch { }
394 handler.errorHandler(m_error);
405 }
395 } catch { }
406 break;
407 case PromiseState.Rejected:
408 try {
409 if (handler.errorHandler != null)
410 handler.errorHandler(m_error);
411 } catch { }
412 break;
413 default:
414 // do nothing
415 return;
396 }
416 }
397 }
417 }
398
418
@@ -426,15 +446,11 namespace Implab {
426 }
446 }
427 }
447 }
428
448
429 if (dependencies && m_parent != null && m_parent.IsExclusive) {
449 if (result)
430 // TODO syncronize IsExclusive, AddHandler, Cancel (maybe CancelExclusive)
450 OnStateChanged();
431 m_parent.Cancel(true);
432 }
433
451
434 if (result) {
452 if (dependencies && m_parent != null && m_parent.IsExclusive) {
435 // state has been changed to cancelled, new handlers can't be added
453 m_parent.Cancel(true);
436 foreach (var handler in m_cancelHandlers)
437 handler();
438 }
454 }
439
455
440 return result;
456 return result;
@@ -14,9 +14,8 namespace Implab
14 /// </remarks>
14 /// </remarks>
15 class TaskController
15 class TaskController
16 {
16 {
17 object m_lock;
17 readonly object m_lock;
18 string m_message;
18 string m_message;
19 bool m_cancelled;
20
19
21 float m_current;
20 float m_current;
22 float m_max;
21 float m_max;
General Comments 0
You need to be logged in to leave comments. Login now