##// END OF EJS Templates
Promise is rewritten to use interlocked operations instead of locks
cin -
r19:e3935fdf59a2 promises
parent child
Show More
@@ -14,7 +14,7 namespace Implab.Test {
14 14 p.Then(x => res = x);
15 15 p.Resolve(100);
16 16
17 Assert.AreEqual(res, 100);
17 Assert.AreEqual(100, res);
18 18 }
19 19
20 20 [TestMethod]
@@ -244,7 +244,7 namespace Implab.Test {
244 244 [TestMethod]
245 245 public void ChainedMapTest() {
246 246
247 using (var pool = new WorkerPool(8,100,0)) {
247 using (var pool = new WorkerPool(4,4,0)) {
248 248 int count = 10000;
249 249
250 250 double[] args = new double[count];
1 NO CONTENT: modified file, binary diff hidden
@@ -15,19 +15,6 namespace Implab
15 15 get;
16 16 }
17 17
18 /// <summary>
19 /// The current state of the promise.
20 /// </summary>
21 PromiseState State
22 {
23 get;
24 }
25 18
26 /// <summary>
27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
28 /// handler will be invoked immediatelly.
29 /// </summary>
30 /// <param name="handler">The handler</param>
31 void HandleCancelled(Action handler);
32 19 }
33 20 }
@@ -140,7 +140,7 namespace Implab.Parallels {
140 140
141 141 AsyncPool.InvokeNewThread(() => {
142 142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
143 if(promise.IsResolved)
144 144 break; // stop processing in case of error or cancellation
145 145 var idx = i;
146 146 semaphore.WaitOne();
@@ -42,12 +42,13 namespace Implab.Parallels {
42 42 next = first.next;
43 43 if (next == null) {
44 44 // this is the last element,
45 // then try to update tail
45 // then try to update the tail
46 46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
47 // this is a ace condition
48 48 if (m_last == null)
49 // the queue is empty
49 50 return false;
50 // tail has been changed, that means that we need to restart
51 // tail has been changed, than we need to restart
51 52 continue;
52 53 }
53 54
@@ -3,6 +3,7 using System.Collections.Generic;
3 3 using System.Reflection;
4 4 using System.Diagnostics;
5 5 using System.Threading;
6 using Implab.Parallels;
6 7
7 8 namespace Implab {
8 9
@@ -48,24 +49,53 namespace Implab {
48 49 /// </remarks>
49 50 public class Promise<T> : IPromise {
50 51
51 struct ResultHandlerInfo {
52 struct HandlerDescriptor {
52 53 public ResultHandler<T> resultHandler;
53 54 public ErrorHandler errorHandler;
55 public Action cancellHandler;
56
57 public void Resolve(T result) {
58 if (resultHandler != null)
59 try {
60 resultHandler(result);
61 } catch (Exception e) {
62 Reject(e);
63 }
64 }
65
66 public void Reject(Exception err) {
67 if (errorHandler != null)
68 try {
69 errorHandler(err);
70 } catch {
71 }
72 }
73
74 public void Cancel() {
75 if (cancellHandler != null)
76 try {
77 cancellHandler();
78 } catch {
79 }
80 }
54 81 }
55 82
83 const int UnresolvedSate = 0;
84 const int TransitionalState = 1;
85 const int ResolvedState = 2;
86 const int RejectedState = 3;
87 const int CancelledState = 4;
88
56 89 readonly IPromise m_parent;
57
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
90 readonly bool m_cancellable;
60 91
61 readonly object m_lock = new Object();
62 readonly bool m_cancellable;
63 92 int m_childrenCount = 0;
64
65 PromiseState m_state;
93 int m_state;
66 94 T m_result;
67 95 Exception m_error;
68 96
97 readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
98
69 99 public Promise() {
70 100 m_cancellable = true;
71 101 }
@@ -73,8 +103,6 namespace Implab {
73 103 public Promise(IPromise parent, bool cancellable) {
74 104 m_cancellable = cancellable;
75 105 m_parent = parent;
76 if (parent != null)
77 parent.HandleCancelled(InternalCancel);
78 106 }
79 107
80 108 void InternalCancel() {
@@ -82,22 +110,39 namespace Implab {
82 110 Cancel(false);
83 111 }
84 112
113 bool BeginTransit() {
114 return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
115 }
116
117 void CompleteTransit(int state) {
118 if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
119 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
120 }
121
122 public bool IsResolved {
123 get {
124 return m_state > 1;
125 }
126 }
127
128 public bool IsCancelled {
129 get {
130 return m_state == CancelledState;
131 }
132 }
133
85 134 /// <summary>
86 135 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎΠΌ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ.
87 136 /// </summary>
88 137 /// <param name="result">Π Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ выполнСния.</param>
89 138 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
90 139 public void Resolve(T result) {
91 lock (m_lock) {
92 if (m_state == PromiseState.Cancelled)
93 return;
94 if (m_state != PromiseState.Unresolved)
95 throw new InvalidOperationException("The promise is already resolved");
140 if (BeginTransit()) {
96 141 m_result = result;
97 m_state = PromiseState.Resolved;
98 }
99
100 OnStateChanged();
142 CompleteTransit(ResolvedState);
143 OnStateChanged();
144 } else if (m_state != CancelledState)
145 throw new InvalidOperationException("The promise is already resolved");
101 146 }
102 147
103 148 /// <summary>
@@ -111,16 +156,12 namespace Implab {
111 156 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
112 157 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
113 158 public void Reject(Exception error) {
114 lock (m_lock) {
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
116 return;
117 if (m_state != PromiseState.Unresolved)
118 throw new InvalidOperationException("The promise is already resolved");
159 if (BeginTransit()) {
119 160 m_error = error;
120 m_state = PromiseState.Rejected;
121 }
122
123 OnStateChanged();
161 CompleteTransit(RejectedState);
162 OnStateChanged();
163 } else if (m_state == ResolvedState)
164 throw new InvalidOperationException("The promise is already resolved");
124 165 }
125 166
126 167 /// <summary>
@@ -144,27 +185,27 namespace Implab {
144 185
145 186 var medium = new Promise<T>(this, true);
146 187
147 var handlerInfo = new ResultHandlerInfo();
148
188 ResultHandler<T> resultHandler;
149 189 if (success != null)
150 handlerInfo.resultHandler = x => {
190 resultHandler = x => {
151 191 success(x);
152 192 medium.Resolve(x);
153 193 };
154 194 else
155 handlerInfo.resultHandler = medium.Resolve;
195 resultHandler = medium.Resolve;
156 196
197 ErrorHandler errorHandler;
157 198 if (error != null)
158 handlerInfo.errorHandler = x => {
199 errorHandler = x => {
159 200 try {
160 201 error(x);
161 202 } catch { }
162 203 medium.Reject(x);
163 204 };
164 205 else
165 handlerInfo.errorHandler = medium.Reject;
206 errorHandler = medium.Reject;
166 207
167 AddHandler(handlerInfo);
208 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
168 209
169 210 return medium;
170 211 }
@@ -182,27 +223,28 namespace Implab {
182 223
183 224 var medium = new Promise<T>(this, true);
184 225
185 var handlerInfo = new ResultHandlerInfo();
226 ResultHandler<T> resultHandler;
227 ErrorHandler errorHandler;
186 228
187 229 if (success != null)
188 handlerInfo.resultHandler = x => {
230 resultHandler = x => {
189 231 success(x);
190 232 medium.Resolve(x);
191 233 };
192 234 else
193 handlerInfo.resultHandler = medium.Resolve;
235 resultHandler = medium.Resolve;
194 236
195 237 if (error != null)
196 handlerInfo.errorHandler = x => {
238 errorHandler = x => {
197 239 try {
198 240 medium.Resolve(error(x));
199 241 } catch { }
200 242 medium.Reject(x);
201 243 };
202 244 else
203 handlerInfo.errorHandler = medium.Reject;
245 errorHandler = medium.Reject;
204 246
205 AddHandler(handlerInfo);
247 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
206 248
207 249 return medium;
208 250 }
@@ -214,19 +256,17 namespace Implab {
214 256
215 257 var medium = new Promise<T>(this, true);
216 258
217 var handlerInfo = new ResultHandlerInfo();
218
259 ResultHandler<T> resultHandler;
260
219 261 if (success != null)
220 handlerInfo.resultHandler = x => {
262 resultHandler = x => {
221 263 success(x);
222 264 medium.Resolve(x);
223 265 };
224 266 else
225 handlerInfo.resultHandler = medium.Resolve;
267 resultHandler = medium.Resolve;
226 268
227 handlerInfo.errorHandler = medium.Reject;
228
229 AddHandler(handlerInfo);
269 AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
230 270
231 271 return medium;
232 272 }
@@ -249,15 +289,17 namespace Implab {
249 289
250 290 var medium = new Promise<T>(this, true);
251 291
252 AddHandler(new ResultHandlerInfo {
253 errorHandler = e => {
292 AddHandler(
293 null,
294 e => {
254 295 try {
255 296 medium.Resolve(handler(e));
256 297 } catch (Exception e2) {
257 298 medium.Reject(e2);
258 299 }
259 }
260 });
300 },
301 medium.InternalCancel
302 );
261 303
262 304 return medium;
263 305 }
@@ -268,8 +310,8 namespace Implab {
268 310
269 311 var medium = new Promise<T>();
270 312
271 AddHandler(new ResultHandlerInfo {
272 resultHandler = x => {
313 AddHandler(
314 x => {
273 315 // to avoid handler being called multiple times we handle exception by ourselfs
274 316 try {
275 317 handler();
@@ -278,13 +320,16 namespace Implab {
278 320 medium.Reject(e);
279 321 }
280 322 },
281 errorHandler = x => {
323
324 e => {
282 325 try {
283 326 handler();
284 327 } catch { }
285 medium.Reject(x);
286 }
287 });
328 medium.Reject(e);
329 },
330
331 medium.InternalCancel
332 );
288 333
289 334 return medium;
290 335 }
@@ -304,17 +349,22 namespace Implab {
304 349 // создаСм ΠΏΡ€ΠΈΡ†Π΅ΠΏΠ»Π΅Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅
305 350 var chained = new Promise<TNew>();
306 351
307 AddHandler(new ResultHandlerInfo() {
308 resultHandler = result => chained.Resolve(mapper(result)),
309 errorHandler = delegate(Exception e) {
310 if (error != null)
311 try {
312 error(e);
313 } catch { }
314 // Π² случаС ошибки Π½ΡƒΠΆΠ½ΠΎ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‚ΡŒ ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ дальшС ΠΏΠΎ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠ΅
315 chained.Reject(e);
316 }
317 });
352 ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
353 ErrorHandler errorHandler = delegate(Exception e) {
354 if (error != null)
355 try {
356 error(e);
357 } catch { }
358 // Π² случаС ошибки Π½ΡƒΠΆΠ½ΠΎ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‚ΡŒ ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ дальшС ΠΏΠΎ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠ΅
359 chained.Reject(e);
360 };
361
362
363 AddHandler(
364 resultHandler,
365 errorHandler,
366 chained.InternalCancel
367 );
318 368
319 369 return chained;
320 370 }
@@ -341,27 +391,32 namespace Implab {
341 391 // ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‚ΡŒ Ρ‡Π΅Ρ€Π΅Π· Π½Π΅Π³ΠΎ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ Ρ€Π°Π±ΠΎΡ‚Ρ‹.
342 392 var medium = new Promise<TNew>(this, true);
343 393
344 AddHandler(new ResultHandlerInfo {
345 resultHandler = delegate(T result) {
346 if (medium.State == PromiseState.Cancelled)
347 return;
394 ResultHandler<T> resultHandler = delegate(T result) {
395 if (medium.IsCancelled)
396 return;
348 397
349 var promise = chained(result);
398 var promise = chained(result);
350 399
351 // notify chained operation that it's not needed
352 medium.Cancelled(() => promise.Cancel());
353 promise.Then(
354 x => medium.Resolve(x),
355 e => medium.Reject(e)
356 );
357 },
358 errorHandler = delegate(Exception e) {
359 if (error != null)
360 error(e);
361 // Π² случаС ошибки Π½ΡƒΠΆΠ½ΠΎ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‚ΡŒ ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ дальшС ΠΏΠΎ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠ΅
362 medium.Reject(e);
363 }
364 });
400 // notify chained operation that it's not needed
401 medium.Cancelled(() => promise.Cancel());
402 promise.Then(
403 x => medium.Resolve(x),
404 e => medium.Reject(e)
405 );
406 };
407
408 ErrorHandler errorHandler = delegate(Exception e) {
409 if (error != null)
410 error(e);
411 // Π² случаС ошибки Π½ΡƒΠΆΠ½ΠΎ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‚ΡŒ ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ дальшС ΠΏΠΎ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠ΅
412 medium.Reject(e);
413 };
414
415 AddHandler(
416 resultHandler,
417 errorHandler,
418 medium.InternalCancel
419 );
365 420
366 421 return medium;
367 422 }
@@ -371,19 +426,19 namespace Implab {
371 426 }
372 427
373 428 public Promise<T> Cancelled(Action handler) {
374 if (handler == null)
375 return this;
376 lock (m_lock) {
377 if (m_state == PromiseState.Unresolved)
378 m_cancelHandlers.AddLast(handler);
379 else if (m_state == PromiseState.Cancelled)
380 handler();
381 }
429 AddHandler(null, null, handler);
382 430 return this;
383 431 }
384 432
385 public void HandleCancelled(Action handler) {
386 Cancelled(handler);
433 public Promise<T> Finally(Action handler) {
434 if (handler == null)
435 throw new ArgumentNullException("handler");
436 AddHandler(
437 x => handler(),
438 e => handler(),
439 handler
440 );
441 return this;
387 442 }
388 443
389 444 /// <summary>
@@ -415,15 +470,15 namespace Implab {
415 470 if (!evt.WaitOne(timeout, true))
416 471 throw new TimeoutException();
417 472
418 switch (State) {
419 case PromiseState.Resolved:
473 switch (m_state) {
474 case ResolvedState:
420 475 return m_result;
421 case PromiseState.Cancelled:
476 case CancelledState:
422 477 throw new OperationCanceledException();
423 case PromiseState.Rejected:
478 case RejectedState:
424 479 throw new TargetInvocationException(m_error);
425 480 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
481 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
427 482 }
428 483 }
429 484
@@ -431,40 +486,45 namespace Implab {
431 486 return Join(Timeout.Infinite);
432 487 }
433 488
434 void AddHandler(ResultHandlerInfo handler) {
435 bool invokeRequired = false;
489 void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
490 Interlocked.Increment(ref m_childrenCount);
491
492 HandlerDescriptor handler = new HandlerDescriptor {
493 resultHandler = success,
494 errorHandler = error,
495 cancellHandler = cancel
496 };
436 497
437 lock (m_lock) {
438 m_childrenCount++;
439 if (m_state == PromiseState.Unresolved) {
440 m_resultHandlers.AddLast(handler);
441 } else
442 invokeRequired = true;
498 bool queued;
499
500 if (!IsResolved) {
501 m_handlers.Enqueue(handler);
502 queued = true;
503 } else {
504 // the promise is in resolved state, just invoke the handled with minimum overhead
505 queued = false;
506 InvokeHandler(handler);
443 507 }
444 508
445 // ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ Π½Π΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ Π±Π»ΠΎΠΊΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ сам ΠΎΠ±ΡŠΠ΅ΠΊΡ‚
446 if (invokeRequired)
509 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
510 // if the promise have been resolved while we was adding handler to the queue
511 // we can't guarantee that someone is still processing it
512 // therefore we will fetch a handler from the queue and execute it
513 // note that fetched handler may be not the one we have added
447 514 InvokeHandler(handler);
515
448 516 }
449 517
450 void InvokeHandler(ResultHandlerInfo handler) {
518 void InvokeHandler(HandlerDescriptor handler) {
451 519 switch (m_state) {
452 case PromiseState.Resolved:
453 try {
454 if (handler.resultHandler != null)
455 handler.resultHandler(m_result);
456 } catch (Exception e) {
457 try {
458 if (handler.errorHandler != null)
459 handler.errorHandler(e);
460 } catch { }
461 }
520 case ResolvedState:
521 handler.Resolve(m_result);
462 522 break;
463 case PromiseState.Rejected:
464 try {
465 if (handler.errorHandler != null)
466 handler.errorHandler(m_error);
467 } catch { }
523 case RejectedState:
524 handler.Reject(m_error);
525 break;
526 case CancelledState:
527 handler.Cancel();
468 528 break;
469 529 default:
470 530 // do nothing
@@ -473,76 +533,31 namespace Implab {
473 533 }
474 534
475 535 protected virtual void OnStateChanged() {
476 switch (m_state) {
477 case PromiseState.Resolved:
478 foreach (var resultHandlerInfo in m_resultHandlers)
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
536 HandlerDescriptor handler;
537 while (m_handlers.TryDequeue(out handler))
538 InvokeHandler(handler);
506 539 }
507 540
508 541
509 542
510 543 public bool IsExclusive {
511 544 get {
512 lock (m_lock) {
513 return m_childrenCount <= 1;
514 }
515 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
545 return m_childrenCount <= 1;
523 546 }
524 547 }
525 548
526 549 protected bool Cancel(bool dependencies) {
527 bool result;
528
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
550 if (BeginTransit()) {
551 CompleteTransit(CancelledState);
539 552 OnStateChanged();
540 553
541 if (dependencies && m_parent != null && m_parent.IsExclusive) {
542 m_parent.Cancel();
554 if (dependencies && m_parent != null && m_parent.IsExclusive)
555 m_parent.Cancel();
556
557 return true;
558 } else {
559 return false;
543 560 }
544
545 return result;
546 561 }
547 562
548 563 }
General Comments 0
You need to be logged in to leave comments. Login now