##// END OF EJS Templates
fixes
cin -
r27:a236cd1f0477 default
parent child
Show More
1 NO CONTENT: modified file, binary diff hidden
@@ -1,656 +1,656
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Reflection;
4 4 using System.Diagnostics;
5 5 using System.Threading;
6 6 using Implab.Parallels;
7 7
8 8 namespace Implab {
9 9
10 10 public delegate void ErrorHandler(Exception e);
11 11 public delegate T ErrorHandler<out T>(Exception e);
12 12 public delegate void ResultHandler<in T>(T result);
13 13 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
14 14 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
15 15
16 16 /// <summary>
17 17 /// Класс для асинхронного получения результатов. Так называемое "обещание".
18 18 /// </summary>
19 19 /// <typeparam name="T">Тип получаемого результата</typeparam>
20 20 /// <remarks>
21 21 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
22 22 /// клиент получив такое обещание может установить ряд обратных вызово для получения
23 23 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
24 24 /// <para>
25 25 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
26 26 /// данные события клиент должен использовать методы <c>Then</c>.
27 27 /// </para>
28 28 /// <para>
29 29 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
30 30 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
31 31 /// выполнении обещания.
32 32 /// </para>
33 33 /// <para>
34 34 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
35 35 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
36 36 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
37 37 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
38 38 /// обещания.
39 39 /// </para>
40 40 /// <para>
41 41 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
42 42 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
43 43 /// использовать соответствующую форму методе <c>Then</c>.
44 44 /// </para>
45 45 /// <para>
46 46 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
47 47 /// только инициатор обещания иначе могут возникнуть противоречия.
48 48 /// </para>
49 49 /// </remarks>
50 50 public class Promise<T> : IPromise<T> {
51 51
52 52 struct HandlerDescriptor {
53 53 public ResultHandler<T> resultHandler;
54 54 public ErrorHandler errorHandler;
55 55 public Action cancellHandler;
56 56
57 57 public void Resolve(T result) {
58 58 if (resultHandler != null)
59 59 try {
60 60 resultHandler(result);
61 61 } catch (Exception e) {
62 62 Reject(e);
63 63 }
64 64 }
65 65
66 66 public void Reject(Exception err) {
67 67 if (errorHandler != null)
68 68 try {
69 69 errorHandler(err);
70 70 } catch {
71 71 }
72 72 }
73 73
74 74 public void Cancel() {
75 75 if (cancellHandler != null)
76 76 try {
77 77 cancellHandler();
78 78 } catch {
79 79 }
80 80 }
81 81 }
82 82
83 83 const int UnresolvedSate = 0;
84 84 const int TransitionalState = 1;
85 85 const int SucceededState = 2;
86 86 const int RejectedState = 3;
87 87 const int CancelledState = 4;
88 88
89 89 readonly IPromiseBase m_parent;
90 90 readonly bool m_cancellable;
91 91
92 92 int m_childrenCount = 0;
93 93 int m_state;
94 94 T m_result;
95 95 Exception m_error;
96 96
97 97 readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
98 98
99 99 public Promise() {
100 100 m_cancellable = true;
101 101 }
102 102
103 103 public Promise(IPromiseBase parent, bool cancellable) {
104 104 m_cancellable = cancellable;
105 105 m_parent = parent;
106 106 }
107 107
108 108 void InternalCancel() {
109 109 // don't try to cancel parent :)
110 110 Cancel(false);
111 111 }
112 112
113 113 bool BeginTransit() {
114 114 return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
115 115 }
116 116
117 117 void CompleteTransit(int state) {
118 118 if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
119 119 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
120 120 }
121 121
122 122 void WaitTransition() {
123 123 while (m_state == TransitionalState) {
124 124 /* noop */
125 125 }
126 126 }
127 127
128 128 public bool IsResolved {
129 129 get {
130 130 return m_state > 1;
131 131 }
132 132 }
133 133
134 134 public bool IsCancelled {
135 135 get {
136 136 return m_state == CancelledState;
137 137 }
138 138 }
139 139
140 140 /// <summary>
141 141 /// Выполняет обещание, сообщая об успешном выполнении.
142 142 /// </summary>
143 143 /// <param name="result">Результат выполнения.</param>
144 144 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
145 145 public void Resolve(T result) {
146 146 if (BeginTransit()) {
147 147 m_result = result;
148 148 CompleteTransit(SucceededState);
149 149 OnStateChanged();
150 150 } else {
151 151 WaitTransition();
152 152 if (m_state != CancelledState)
153 153 throw new InvalidOperationException("The promise is already resolved");
154 154 }
155 155 }
156 156
157 157 /// <summary>
158 158 /// Выполняет обещание, сообщая об ошибке
159 159 /// </summary>
160 160 /// <remarks>
161 161 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
162 162 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
163 163 /// будут проигнорированы.
164 164 /// </remarks>
165 165 /// <param name="error">Исключение возникшее при выполнении операции</param>
166 166 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
167 167 public void Reject(Exception error) {
168 168 if (BeginTransit()) {
169 169 m_error = error;
170 170 CompleteTransit(RejectedState);
171 171 OnStateChanged();
172 172 } else {
173 173 WaitTransition();
174 174 if (m_state == SucceededState)
175 175 throw new InvalidOperationException("The promise is already resolved");
176 176 }
177 177 }
178 178
179 179 /// <summary>
180 180 /// Отменяет операцию, если это возможно.
181 181 /// </summary>
182 182 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
183 183 public bool Cancel() {
184 184 return Cancel(true);
185 185 }
186 186
187 187 /// <summary>
188 188 /// Adds new handlers to this promise.
189 189 /// </summary>
190 190 /// <param name="success">The handler of the successfully completed operation.
191 191 /// This handler will recieve an operation result as a parameter.</param>
192 192 /// <param name="error">Handles an exception that may occur during the operation.</param>
193 193 /// <returns>The new promise chained to this one.</returns>
194 194 public IPromise<T> Then(ResultHandler<T> success, ErrorHandler error) {
195 195 if (success == null && error == null)
196 196 return this;
197 197
198 198 var medium = new Promise<T>(this, true);
199 199
200 200 ResultHandler<T> resultHandler;
201 201 if (success != null)
202 202 resultHandler = x => {
203 203 success(x);
204 204 medium.Resolve(x);
205 205 };
206 206 else
207 207 resultHandler = medium.Resolve;
208 208
209 209 ErrorHandler errorHandler;
210 210 if (error != null)
211 211 errorHandler = x => {
212 212 // несмотря на то, что обработчик ошибки вызывается безопасно,
213 213 // т.е. возникшие в нем ошибки будут подавлены, нам нужно
214 214 // гарантировать, что ошибка будет передана дальше по цепочке обещаний
215 215 try {
216 216 error(x);
217 217 } catch { }
218 218 medium.Reject(x);
219 219 };
220 220 else
221 221 errorHandler = medium.Reject;
222 222
223 223 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
224 224
225 225 return medium;
226 226 }
227 227
228 228 public IPromiseBase Then(Action success,ErrorHandler error)
229 229 {
230 230 return Then(x => success(), error);
231 231 }
232 232
233 233 public IPromiseBase Then(Action success)
234 234 {
235 235 return Then(success);
236 236 }
237 237
238 238 /// <summary>
239 239 /// Adds new handlers to this promise.
240 240 /// </summary>
241 241 /// <param name="success">The handler of the successfully completed operation.
242 242 /// This handler will recieve an operation result as a parameter.</param>
243 243 /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
244 244 /// <returns>The new promise chained to this one.</returns>
245 245 public IPromise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
246 246 if (success == null && error == null)
247 247 return this;
248 248
249 249 var medium = new Promise<T>(this, true);
250 250
251 251 ResultHandler<T> resultHandler;
252 252 ErrorHandler errorHandler;
253 253
254 254 if (success != null)
255 255 resultHandler = x => {
256 256 success(x);
257 257 medium.Resolve(x);
258 258 };
259 259 else
260 260 resultHandler = medium.Resolve;
261 261
262 262 if (error != null)
263 263 errorHandler = x => {
264 264 try {
265 265 medium.Resolve(error(x));
266 266 } catch(Exception e) {
267 267 medium.Reject(e);
268 268 }
269 269 };
270 270 else
271 271 errorHandler = medium.Reject;
272 272
273 273 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
274 274
275 275 return medium;
276 276 }
277 277
278 278
279 279 public IPromise<T> Then(ResultHandler<T> success) {
280 280 if (success == null)
281 281 return this;
282 282
283 283 var medium = new Promise<T>(this, true);
284 284
285 285 ResultHandler<T> resultHandler;
286 286
287 287 if (success != null)
288 288 resultHandler = x => {
289 289 success(x);
290 290 medium.Resolve(x);
291 291 };
292 292 else
293 293 resultHandler = medium.Resolve;
294 294
295 295 AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
296 296
297 297 return medium;
298 298 }
299 299
300 300 public IPromise<T> Error(ErrorHandler error) {
301 301 return Then((ResultHandler<T>)null, error);
302 302 }
303 303
304 304 /// <summary>
305 305 /// Handles error and allows to keep the promise.
306 306 /// </summary>
307 307 /// <remarks>
308 308 /// If the specified handler throws an exception, this exception will be used to reject the promise.
309 309 /// </remarks>
310 310 /// <param name="handler">The error handler which returns the result of the promise.</param>
311 311 /// <returns>New promise.</returns>
312 312 public IPromise<T> Error(ErrorHandler<T> handler) {
313 313 if (handler == null)
314 314 return this;
315 315
316 316 var medium = new Promise<T>(this, true);
317 317
318 318 AddHandler(
319 null,
319 x => medium.Resolve(x),
320 320 e => {
321 321 try {
322 322 medium.Resolve(handler(e));
323 323 } catch (Exception e2) {
324 324 medium.Reject(e2);
325 325 }
326 326 },
327 327 medium.InternalCancel
328 328 );
329 329
330 330 return medium;
331 331 }
332 332
333 333 public IPromise<T> Anyway(Action handler) {
334 334 if (handler == null)
335 335 return this;
336 336
337 337 var medium = new Promise<T>();
338 338
339 339 AddHandler(
340 340 x => {
341 341 // to avoid handler being called multiple times we handle exception by ourselfs
342 342 try {
343 343 handler();
344 344 medium.Resolve(x);
345 345 } catch (Exception e) {
346 346 medium.Reject(e);
347 347 }
348 348 },
349 349
350 350 e => {
351 351 try {
352 352 handler();
353 353 } catch { }
354 354 medium.Reject(e);
355 355 },
356 356
357 357 medium.InternalCancel
358 358 );
359 359
360 360 return medium;
361 361 }
362 362
363 363 /// <summary>
364 364 /// Позволяет преобразовать результат выполения операции к новому типу.
365 365 /// </summary>
366 366 /// <typeparam name="TNew">Новый тип результата.</typeparam>
367 367 /// <param name="mapper">Преобразование результата к новому типу.</param>
368 368 /// <param name="error">Обработчик ошибки. Данный обработчик получит
369 369 /// исключение возникшее при выполнении операции.</param>
370 370 /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
371 371 public IPromise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
372 372 if (mapper == null)
373 373 throw new ArgumentNullException("mapper");
374 374
375 375 // создаем прицепленное обещание
376 376 var chained = new Promise<TNew>();
377 377
378 378 ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
379 379 ErrorHandler errorHandler = delegate(Exception e) {
380 380 if (error != null)
381 381 try {
382 382 error(e);
383 383 } catch { }
384 384 // в случае ошибки нужно передать исключение дальше по цепочке
385 385 chained.Reject(e);
386 386 };
387 387
388 388
389 389 AddHandler(
390 390 resultHandler,
391 391 errorHandler,
392 392 chained.InternalCancel
393 393 );
394 394
395 395 return chained;
396 396 }
397 397
398 398 public IPromise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) {
399 399 return Map(mapper, null);
400 400 }
401 401
402 402 /// <summary>
403 403 /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
404 404 /// выполнения текущей, а результат текущей операции может быть использован для инициализации
405 405 /// новой операции.
406 406 /// </summary>
407 407 /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
408 408 /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
409 409 /// <param name="error">Обработчик ошибки. Данный обработчик получит
410 410 /// исключение возникшее при выполнении текуещй операции.</param>
411 411 /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
412 412 public IPromise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained, ErrorHandler error) {
413 413
414 414 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
415 415 // создать посредника, к которому будут подвызяваться следующие обработчики.
416 416 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
417 417 // передать через него результаты работы.
418 418 var medium = new Promise<TNew>(this, true);
419 419
420 420 ResultHandler<T> resultHandler = delegate(T result) {
421 421 if (medium.IsCancelled)
422 422 return;
423 423
424 424 var promise = chained(result);
425 425
426 426 // notify chained operation that it's not needed
427 427 medium.Cancelled(() => promise.Cancel());
428 428 promise.Then(
429 429 x => medium.Resolve(x),
430 430 e => medium.Reject(e)
431 431 );
432 432 };
433 433
434 434 ErrorHandler errorHandler = delegate(Exception e) {
435 435 if (error != null)
436 436 error(e);
437 437 // в случае ошибки нужно передать исключение дальше по цепочке
438 438 medium.Reject(e);
439 439 };
440 440
441 441 AddHandler(
442 442 resultHandler,
443 443 errorHandler,
444 444 medium.InternalCancel
445 445 );
446 446
447 447 return medium;
448 448 }
449 449
450 450 public IPromise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
451 451 return Chain(chained, null);
452 452 }
453 453
454 454 public IPromise<T> Cancelled(Action handler) {
455 455 AddHandler(null, null, handler);
456 456 return this;
457 457 }
458 458
459 459 /// <summary>
460 460 /// Adds the specified handler for all cases (success, error, cancel)
461 461 /// </summary>
462 462 /// <param name="handler">The handler that will be called anyway</param>
463 463 /// <returns>self</returns>
464 464 public IPromise<T> Finally(Action handler) {
465 465 if (handler == null)
466 466 throw new ArgumentNullException("handler");
467 467 AddHandler(
468 468 x => handler(),
469 469 e => handler(),
470 470 handler
471 471 );
472 472 return this;
473 473 }
474 474
475 475 /// <summary>
476 476 /// Дожидается отложенного обещания и в случае успеха, возвращает
477 477 /// его, результат, в противном случае бросает исключение.
478 478 /// </summary>
479 479 /// <remarks>
480 480 /// <para>
481 481 /// Если ожидание обещания было прервано по таймауту, это не значит,
482 482 /// что обещание было отменено или что-то в этом роде, это только
483 483 /// означает, что мы его не дождались, однако все зарегистрированные
484 484 /// обработчики, как были так остались и они будут вызваны, когда
485 485 /// обещание будет выполнено.
486 486 /// </para>
487 487 /// <para>
488 488 /// Такое поведение вполне оправдано поскольку таймаут может истечь
489 489 /// в тот момент, когда началась обработка цепочки обработчиков, и
490 490 /// к тому же текущее обещание может стоять в цепочке обещаний и его
491 491 /// отклонение может привести к непрогнозируемому результату.
492 492 /// </para>
493 493 /// </remarks>
494 494 /// <param name="timeout">Время ожидания</param>
495 495 /// <returns>Результат выполнения обещания</returns>
496 496 public T Join(int timeout) {
497 497 var evt = new ManualResetEvent(false);
498 498 Anyway(() => evt.Set());
499 499 Cancelled(() => evt.Set());
500 500
501 501 if (!evt.WaitOne(timeout, true))
502 502 throw new TimeoutException();
503 503
504 504 switch (m_state) {
505 505 case SucceededState:
506 506 return m_result;
507 507 case CancelledState:
508 508 throw new OperationCanceledException();
509 509 case RejectedState:
510 510 throw new TargetInvocationException(m_error);
511 511 default:
512 512 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
513 513 }
514 514 }
515 515
516 516 public T Join() {
517 517 return Join(Timeout.Infinite);
518 518 }
519 519
520 520 void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
521 521 Interlocked.Increment(ref m_childrenCount);
522 522
523 523 HandlerDescriptor handler = new HandlerDescriptor {
524 524 resultHandler = success,
525 525 errorHandler = error,
526 526 cancellHandler = cancel
527 527 };
528 528
529 529 bool queued;
530 530
531 531 if (!IsResolved) {
532 532 m_handlers.Enqueue(handler);
533 533 queued = true;
534 534 } else {
535 535 // the promise is in resolved state, just invoke the handled with minimum overhead
536 536 queued = false;
537 537 InvokeHandler(handler);
538 538 }
539 539
540 540 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
541 541 // if the promise have been resolved while we was adding handler to the queue
542 542 // we can't guarantee that someone is still processing it
543 543 // therefore we will fetch a handler from the queue and execute it
544 // note that fetched handler may be not the one we have added
544 // note that fetched handler may be not the one that we have added
545 // even we can fetch no handlers at all :)
545 546 InvokeHandler(handler);
546
547 547 }
548 548
549 void InvokeHandler(HandlerDescriptor handler) {
549 protected virtual void InvokeHandler(HandlerDescriptor handler) {
550 550 switch (m_state) {
551 551 case SucceededState:
552 552 handler.Resolve(m_result);
553 553 break;
554 554 case RejectedState:
555 555 handler.Reject(m_error);
556 556 break;
557 557 case CancelledState:
558 558 handler.Cancel();
559 559 break;
560 560 default:
561 561 // do nothing
562 562 return;
563 563 }
564 564 }
565 565
566 566 protected virtual void OnStateChanged() {
567 567 HandlerDescriptor handler;
568 568 while (m_handlers.TryDequeue(out handler))
569 569 InvokeHandler(handler);
570 570 }
571 571
572 572 public bool IsExclusive {
573 573 get {
574 574 return m_childrenCount <= 1;
575 575 }
576 576 }
577 577
578 578 protected bool Cancel(bool dependencies) {
579 579 if (BeginTransit()) {
580 580 CompleteTransit(CancelledState);
581 581 OnStateChanged();
582 582
583 583 if (dependencies && m_parent != null && m_parent.IsExclusive)
584 584 m_parent.Cancel();
585 585
586 586 return true;
587 587 } else {
588 588 return false;
589 589 }
590 590 }
591 591
592 592 /// <summary>
593 593 /// Объединяет несколько обещаний в одно, результатом которого является массив результатов других обещаний.
594 594 /// Если хотябы одно из переданных обещаний не будет выполнено, то новое обещение тоже не будет выполнено.
595 595 /// При отмене нового обещания, переданные обещания также будут отменены, если никто больше на них не подписан.
596 596 /// </summary>
597 597 /// <param name="promises">Список обещаний. Если список пустой, то результирующее обещание возвращается уже выполненным.</param>
598 598 /// <returns>Обещание объединяющее в себе результат переданных обещаний.</returns>
599 599 /// <exception cref="ArgumentNullException"><paramref name="promises"/> не может быть null</exception>
600 600 public static Promise<T[]> CreateComposite(IList<Promise<T>> promises) {
601 601 if (promises == null)
602 602 throw new ArgumentNullException();
603 603
604 604 // создаем аккумулятор для результатов и результирующее обещание
605 605 var result = new T[promises.Count];
606 606 var promise = new Promise<T[]>();
607 607
608 608 // special case
609 609 if (promises.Count == 0) {
610 610 promise.Resolve(result);
611 611 return promise;
612 612 }
613 613
614 614 int pending = promises.Count;
615 615
616 616 for (int i = 0; i < promises.Count; i++) {
617 617 var dest = i;
618 618
619 619 promises[i].Then(
620 620 x => {
621 621 result[dest] = x;
622 622 if(Interlocked.Decrement(ref pending) == 0)
623 623 promise.Resolve(result);
624 624 },
625 625 e => promise.Reject(e)
626 626 );
627 627 }
628 628
629 629 promise.Cancelled(
630 630 () => {
631 631 foreach(var d in promises)
632 632 if(d.IsExclusive)
633 633 d.Cancel();
634 634 }
635 635 );
636 636
637 637 return promise;
638 638 }
639 639
640 640 public static Promise<T> ResultToPromise(T result) {
641 641 var p = new Promise<T>();
642 642 p.Resolve(result);
643 643 return p;
644 644 }
645 645
646 646 public static Promise<T> ExceptionToPromise(Exception error) {
647 647 if (error == null)
648 648 throw new ArgumentNullException();
649 649
650 650 var p = new Promise<T>();
651 651 p.Reject(error);
652 652 return p;
653 653 }
654 654
655 655 }
656 656 }
General Comments 0
You need to be logged in to leave comments. Login now