##// END OF EJS Templates
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
cin -
r138:f75cfa58e3d4 v2
parent child
Show More
@@ -1,38 +1,38
1 1 using System.Windows.Forms;
2 2 using System;
3 3
4 4
5 5 namespace Implab.Fx {
6 6 public class ControlBoundPromise<T> : Promise<T> {
7 7 readonly Control m_target;
8 8
9 9 public ControlBoundPromise(Control target) {
10 10 Safe.ArgumentNotNull(target, "target");
11 11
12 12 m_target = target;
13 13 }
14 14
15 15 protected override void SignalSuccess(IDeferred<T> handler) {
16 16 if (m_target.InvokeRequired)
17 17 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalSuccess), handler);
18 18 else
19 19 base.SignalSuccess(handler);
20 20 }
21 21
22 protected override void SignalCancelled(IDeferred<T> handler) {
22 protected override void SignalCancelled(IDeferred<T> handler, Exception reason) {
23 23 if (m_target.InvokeRequired)
24 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalCancelled), handler);
24 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalCancelled), handler, reason);
25 25 else
26 base.SignalCancelled(handler);
26 base.SignalCancelled(handler, reason);
27 27 }
28 28
29 29 protected override void SignalError(IDeferred<T> handler, Exception error) {
30 30 if (m_target.InvokeRequired)
31 31 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalError), handler, error);
32 32 else
33 33 base.SignalError(handler, error);
34 34 }
35 35
36 36 }
37 37 }
38 38
@@ -1,852 +1,852
1 1 using System;
2 2 using System.Reflection;
3 3 using System.Threading;
4 4 using Implab.Parallels;
5 5
6 6 #if MONO
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 10 using TestMethod = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
14 14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15 15
16 16 #endif
17 17
18 18 namespace Implab.Test {
19 19 [TestClass]
20 20 public class AsyncTests {
21 21 [TestMethod]
22 22 public void ResolveTest() {
23 23 int res = -1;
24 24 var p = new Promise<int>();
25 25 p.Then(x => res = x);
26 26 p.Resolve(100);
27 27
28 28 Assert.AreEqual(100, res);
29 29 }
30 30
31 31 [TestMethod]
32 32 public void RejectTest() {
33 33 int res = -1;
34 34 Exception err = null;
35 35
36 36 var p = new Promise<int>();
37 37 p.Then(
38 38 x => res = x,
39 39 e => {
40 40 err = e;
41 41 return -2;
42 42 }
43 43 );
44 44 p.Reject(new ApplicationException("error"));
45 45
46 46 Assert.AreEqual(res, -1);
47 47 Assert.AreEqual(err.Message, "error");
48 48
49 49 }
50 50
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 54 p.Cancel();
55 55
56 var p2 = p.Cancelled(() => {
56 var p2 = p.Then(x => x, null, reason => {
57 57 throw new ApplicationException("CANCELLED");
58 58 });
59 59
60 60 try {
61 61 p2.Join();
62 62 Assert.Fail();
63 63 } catch (ApplicationException err) {
64 64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 65 }
66 66
67 67 }
68 68
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 72 p.Cancel();
73 73
74 74 var p2 = p
75 .Cancelled<bool>(() => {
75 .Then<bool>(x => x, null, reason => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 .Error(e => true);
78 .Then(x => x, e => true);
79 79
80 80 Assert.AreEqual(true, p2.Join());
81 81 }
82 82
83 83 [TestMethod]
84 84 public void JoinSuccessTest() {
85 85 var p = new Promise<int>();
86 86 p.Resolve(100);
87 87 Assert.AreEqual(p.Join(), 100);
88 88 }
89 89
90 90 [TestMethod]
91 91 public void JoinFailTest() {
92 92 var p = new Promise<int>();
93 93 p.Reject(new ApplicationException("failed"));
94 94
95 95 try {
96 96 p.Join();
97 97 throw new ApplicationException("WRONG!");
98 98 } catch (TargetInvocationException err) {
99 99 Assert.AreEqual(err.InnerException.Message, "failed");
100 100 } catch {
101 101 Assert.Fail("Got wrong excaption");
102 102 }
103 103 }
104 104
105 105 [TestMethod]
106 106 public void MapTest() {
107 107 var p = new Promise<int>();
108 108
109 109 var p2 = p.Then(x => x.ToString());
110 110 p.Resolve(100);
111 111
112 112 Assert.AreEqual(p2.Join(), "100");
113 113 }
114 114
115 115 [TestMethod]
116 116 public void FixErrorTest() {
117 117 var p = new Promise<int>();
118 118
119 var p2 = p.Error(e => 101);
119 var p2 = p.Then(x => x, e => 101);
120 120
121 121 p.Reject(new Exception());
122 122
123 123 Assert.AreEqual(p2.Join(), 101);
124 124 }
125 125
126 126 [TestMethod]
127 127 public void ChainTest() {
128 128 var p1 = new Promise<int>();
129 129
130 130 var p3 = p1.Chain(x => {
131 131 var p2 = new Promise<string>();
132 132 p2.Resolve(x.ToString());
133 133 return p2;
134 134 });
135 135
136 136 p1.Resolve(100);
137 137
138 138 Assert.AreEqual(p3.Join(), "100");
139 139 }
140 140
141 141 [TestMethod]
142 142 public void ChainFailTest() {
143 143 var p1 = new Promise<int>();
144 144
145 145 var p3 = p1.Chain(x => {
146 146 var p2 = new Promise<string>();
147 147 p2.Reject(new Exception("DIE!!!"));
148 148 return p2;
149 149 });
150 150
151 151 p1.Resolve(100);
152 152
153 153 Assert.IsTrue(p3.IsResolved);
154 154 }
155 155
156 156 [TestMethod]
157 157 public void PoolTest() {
158 158 var pid = Thread.CurrentThread.ManagedThreadId;
159 159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160 160
161 161 Assert.AreNotEqual(pid, p.Join());
162 162 }
163 163
164 164 [TestMethod]
165 165 public void WorkerPoolSizeTest() {
166 166 var pool = new WorkerPool(5, 10, 1);
167 167
168 168 Assert.AreEqual(5, pool.PoolSize);
169 169
170 170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173 173
174 174 Assert.AreEqual(5, pool.PoolSize);
175 175
176 176 for (int i = 0; i < 100; i++)
177 177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 178 Thread.Sleep(200);
179 179 Assert.AreEqual(10, pool.PoolSize);
180 180
181 181 pool.Dispose();
182 182 }
183 183
184 184 [TestMethod]
185 185 public void WorkerPoolCorrectTest() {
186 186 var pool = new WorkerPool(0,1000,100);
187 187
188 188 const int iterations = 1000;
189 189 int pending = iterations;
190 190 var stop = new ManualResetEvent(false);
191 191
192 192 var count = 0;
193 193 for (int i = 0; i < iterations; i++) {
194 194 pool
195 195 .Invoke(() => 1)
196 196 .Then(x => Interlocked.Add(ref count, x))
197 197 .Then(x => Math.Log10(x))
198 198 .On(() => {
199 199 Interlocked.Decrement(ref pending);
200 200 if (pending == 0)
201 201 stop.Set();
202 202 }, PromiseEventType.All);
203 203 }
204 204
205 205 stop.WaitOne();
206 206
207 207 Assert.AreEqual(iterations, count);
208 208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 209 pool.Dispose();
210 210
211 211 }
212 212
213 213 [TestMethod]
214 214 public void WorkerPoolDisposeTest() {
215 215 var pool = new WorkerPool(5, 20);
216 216 Assert.AreEqual(5, pool.PoolSize);
217 217 pool.Dispose();
218 218 Thread.Sleep(500);
219 219 Assert.AreEqual(0, pool.PoolSize);
220 220 pool.Dispose();
221 221 }
222 222
223 223 [TestMethod]
224 224 public void MTQueueTest() {
225 225 var queue = new MTQueue<int>();
226 226 int res;
227 227
228 228 queue.Enqueue(10);
229 229 Assert.IsTrue(queue.TryDequeue(out res));
230 230 Assert.AreEqual(10, res);
231 231 Assert.IsFalse(queue.TryDequeue(out res));
232 232
233 233 for (int i = 0; i < 1000; i++)
234 234 queue.Enqueue(i);
235 235
236 236 for (int i = 0; i < 1000; i++) {
237 237 queue.TryDequeue(out res);
238 238 Assert.AreEqual(i, res);
239 239 }
240 240
241 241 int writers = 0;
242 242 int readers = 0;
243 243 var stop = new ManualResetEvent(false);
244 244 int total = 0;
245 245
246 246 const int itemsPerWriter = 10000;
247 247 const int writersCount = 10;
248 248
249 249 for (int i = 0; i < writersCount; i++) {
250 250 Interlocked.Increment(ref writers);
251 251 AsyncPool
252 252 .RunThread(() => {
253 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 254 queue.Enqueue(1);
255 255 }
256 256 return 1;
257 257 })
258 258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 259 }
260 260
261 261 for (int i = 0; i < 10; i++) {
262 262 Interlocked.Increment(ref readers);
263 263 AsyncPool
264 264 .RunThread(() => {
265 265 int t;
266 266 do {
267 267 while (queue.TryDequeue(out t))
268 268 Interlocked.Add(ref total, t);
269 269 } while (writers > 0);
270 270 return 1;
271 271 })
272 272 .On(() => {
273 273 Interlocked.Decrement(ref readers);
274 274 if (readers == 0)
275 275 stop.Set();
276 276 }, PromiseEventType.All);
277 277 }
278 278
279 279 stop.WaitOne();
280 280
281 281 Assert.AreEqual(100000, total);
282 282 }
283 283
284 284 [TestMethod]
285 285 public void AsyncQueueTest() {
286 286 var queue = new AsyncQueue<int>();
287 287 int res;
288 288
289 289 queue.Enqueue(10);
290 290 Assert.IsTrue(queue.TryDequeue(out res));
291 291 Assert.AreEqual(10, res);
292 292 Assert.IsFalse(queue.TryDequeue(out res));
293 293
294 294 for (int i = 0; i < 1000; i++)
295 295 queue.Enqueue(i);
296 296
297 297 for (int i = 0; i < 1000; i++) {
298 298 queue.TryDequeue(out res);
299 299 Assert.AreEqual(i, res);
300 300 }
301 301
302 302 const int count = 10000000;
303 303
304 304 int res1 = 0, res2 = 0;
305 305 var t1 = Environment.TickCount;
306 306
307 307 AsyncPool.RunThread(
308 308 () => {
309 309 for (var i = 0; i < count; i++)
310 310 queue.Enqueue(1);
311 311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 312 },
313 313 () => {
314 314 for (var i = 0; i < count; i++)
315 315 queue.Enqueue(2);
316 316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 317 },
318 318 () => {
319 319 int temp;
320 320 int i = 0;
321 321 while (i < count)
322 322 if (queue.TryDequeue(out temp)) {
323 323 i++;
324 324 res1 += temp;
325 325 }
326 326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 327 },
328 328 () => {
329 329 int temp;
330 330 int i = 0;
331 331 while (i < count)
332 332 if (queue.TryDequeue(out temp)) {
333 333 i++;
334 334 res2 += temp;
335 335 }
336 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 337 }
338 338 )
339 339 .Bundle()
340 340 .Join();
341 341
342 342 Assert.AreEqual(count * 3, res1 + res2);
343 343
344 344 Console.WriteLine(
345 345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 346 Environment.TickCount - t1,
347 347 res1,
348 348 res2,
349 349 res1 + res2,
350 350 count
351 351 );
352 352 }
353 353
354 354 [TestMethod]
355 355 public void AsyncQueueBatchTest() {
356 356 var queue = new AsyncQueue<int>();
357 357
358 358 const int wBatch = 29;
359 359 const int wCount = 400000;
360 360 const int total = wBatch * wCount * 2;
361 361 const int summ = wBatch * wCount * 3;
362 362
363 363 int r1 = 0, r2 = 0;
364 364 const int rBatch = 111;
365 365 int read = 0;
366 366
367 367 var t1 = Environment.TickCount;
368 368
369 369 AsyncPool.RunThread(
370 370 () => {
371 371 var buffer = new int[wBatch];
372 372 for(int i = 0; i<wBatch; i++)
373 373 buffer[i] = 1;
374 374
375 375 for(int i =0; i < wCount; i++)
376 376 queue.EnqueueRange(buffer,0,wBatch);
377 377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 378 },
379 379 () => {
380 380 var buffer = new int[wBatch];
381 381 for(int i = 0; i<wBatch; i++)
382 382 buffer[i] = 2;
383 383
384 384 for(int i =0; i < wCount; i++)
385 385 queue.EnqueueRange(buffer,0,wBatch);
386 386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 387 },
388 388 () => {
389 389 var buffer = new int[rBatch];
390 390
391 391 while(read < total) {
392 392 int actual;
393 393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 394 for(int i=0; i< actual; i++)
395 395 r1 += buffer[i];
396 396 Interlocked.Add(ref read, actual);
397 397 }
398 398 }
399 399
400 400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 401 },
402 402 () => {
403 403 var buffer = new int[rBatch];
404 404
405 405 while(read < total) {
406 406 int actual;
407 407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 408 for(int i=0; i< actual; i++)
409 409 r2 += buffer[i];
410 410 Interlocked.Add(ref read, actual);
411 411 }
412 412 }
413 413
414 414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 415 }
416 416 )
417 417 .Bundle()
418 418 .Join();
419 419
420 420 Assert.AreEqual(summ , r1 + r2);
421 421
422 422 Console.WriteLine(
423 423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 424 Environment.TickCount - t1,
425 425 r1,
426 426 r2,
427 427 r1 + r2,
428 428 total
429 429 );
430 430 }
431 431
432 432 [TestMethod]
433 433 public void AsyncQueueChunkDequeueTest() {
434 434 var queue = new AsyncQueue<int>();
435 435
436 436 const int wBatch = 31;
437 437 const int wCount = 200000;
438 438 const int total = wBatch * wCount * 3;
439 439 const int summ = wBatch * wCount * 6;
440 440
441 441 int r1 = 0, r2 = 0;
442 442 const int rBatch = 1024;
443 443 int read = 0;
444 444
445 445 var t1 = Environment.TickCount;
446 446
447 447 AsyncPool.RunThread(
448 448 () => {
449 449 var buffer = new int[wBatch];
450 450 for(int i = 0; i<wBatch; i++)
451 451 buffer[i] = 1;
452 452
453 453 for(int i =0; i < wCount; i++)
454 454 queue.EnqueueRange(buffer,0,wBatch);
455 455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 456 },
457 457 () => {
458 458 var buffer = new int[wBatch];
459 459 for(int i = 0; i<wBatch; i++)
460 460 buffer[i] = 2;
461 461
462 462 for(int i =0; i < wCount; i++)
463 463 queue.EnqueueRange(buffer,0,wBatch);
464 464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 465 },
466 466 () => {
467 467 var buffer = new int[wBatch];
468 468 for(int i = 0; i<wBatch; i++)
469 469 buffer[i] = 3;
470 470
471 471 for(int i =0; i < wCount; i++)
472 472 queue.EnqueueRange(buffer,0,wBatch);
473 473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 474 },
475 475 () => {
476 476 var buffer = new int[rBatch];
477 477 int count = 1;
478 478 double avgchunk = 0;
479 479 while(read < total) {
480 480 int actual;
481 481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 482 for(int i=0; i< actual; i++)
483 483 r2 += buffer[i];
484 484 Interlocked.Add(ref read, actual);
485 485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 486 count ++;
487 487 }
488 488 }
489 489
490 490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 491 }
492 492 )
493 493 .Bundle()
494 494 .Join();
495 495
496 496 Assert.AreEqual(summ , r1 + r2);
497 497
498 498 Console.WriteLine(
499 499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 500 Environment.TickCount - t1,
501 501 r1,
502 502 r2,
503 503 r1 + r2,
504 504 total
505 505 );
506 506 }
507 507
508 508 [TestMethod]
509 509 public void AsyncQueueDrainTest() {
510 510 var queue = new AsyncQueue<int>();
511 511
512 512 const int wBatch = 11;
513 513 const int wCount = 200000;
514 514 const int total = wBatch * wCount * 3;
515 515 const int summ = wBatch * wCount * 3;
516 516
517 517 int r1 = 0, r2 = 0;
518 518 const int rBatch = 11;
519 519 int read = 0;
520 520
521 521 var t1 = Environment.TickCount;
522 522
523 523 AsyncPool.RunThread(
524 524 () => {
525 525 var buffer = new int[wBatch];
526 526 for(int i = 0; i<wBatch; i++)
527 527 buffer[i] = 1;
528 528
529 529 for(int i =0; i < wCount; i++)
530 530 queue.EnqueueRange(buffer,0,wBatch);
531 531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 532 },
533 533 () => {
534 534 for(int i =0; i < wCount * wBatch; i++)
535 535 queue.Enqueue(1);
536 536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 537 },
538 538 () => {
539 539 var buffer = new int[wBatch];
540 540 for(int i = 0; i<wBatch; i++)
541 541 buffer[i] = 1;
542 542
543 543 for(int i =0; i < wCount; i++)
544 544 queue.EnqueueRange(buffer,0,wBatch);
545 545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 546 },
547 547 /*() => {
548 548 int temp;
549 549 int count = 0;
550 550 while (read < total)
551 551 if (queue.TryDequeue(out temp)) {
552 552 count++;
553 553 r1 += temp;
554 554 Interlocked.Increment(ref read);
555 555 }
556 556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 557 },*/
558 558 /*() => {
559 559 var buffer = new int[rBatch];
560 560 var count = 0;
561 561 while(read < total) {
562 562 int actual;
563 563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 564 for(int i=0; i< actual; i++)
565 565 r1 += buffer[i];
566 566 Interlocked.Add(ref read, actual);
567 567 count += actual;
568 568 }
569 569 }
570 570
571 571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 572 },*/
573 573 () => {
574 574 var count = 0;
575 575 while(read < total) {
576 576 var buffer = queue.Drain();
577 577 for(int i=0; i< buffer.Length; i++)
578 578 r1 += buffer[i];
579 579 Interlocked.Add(ref read, buffer.Length);
580 580 count += buffer.Length;
581 581 }
582 582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 583 },
584 584 () => {
585 585 var count = 0;
586 586 while(read < total) {
587 587 var buffer = queue.Drain();
588 588 for(int i=0; i< buffer.Length; i++)
589 589 r2 += buffer[i];
590 590 Interlocked.Add(ref read, buffer.Length);
591 591 count += buffer.Length;
592 592 }
593 593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 594 }
595 595 )
596 596 .Bundle()
597 597 .Join();
598 598
599 599 Assert.AreEqual(summ , r1 + r2);
600 600
601 601 Console.WriteLine(
602 602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
603 603 Environment.TickCount - t1,
604 604 r1,
605 605 r2,
606 606 r1 + r2,
607 607 total
608 608 );
609 609 }
610 610
611 611 [TestMethod]
612 612 public void ParallelMapTest() {
613 613
614 614 const int count = 100000;
615 615
616 616 var args = new double[count];
617 617 var rand = new Random();
618 618
619 619 for (int i = 0; i < count; i++)
620 620 args[i] = rand.NextDouble();
621 621
622 622 var t = Environment.TickCount;
623 623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
624 624
625 625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
626 626
627 627 t = Environment.TickCount;
628 628 for (int i = 0; i < count; i++)
629 629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
630 630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
631 631 }
632 632
633 633 [TestMethod]
634 634 public void ChainedMapTest() {
635 635
636 636 using (var pool = new WorkerPool()) {
637 637 const int count = 10000;
638 638
639 639 var args = new double[count];
640 640 var rand = new Random();
641 641
642 642 for (int i = 0; i < count; i++)
643 643 args[i] = rand.NextDouble();
644 644
645 645 var t = Environment.TickCount;
646 646 var res = args
647 647 .ChainedMap(
648 648 // Analysis disable once AccessToDisposedClosure
649 649 x => pool.Invoke(
650 650 () => Math.Sin(x * x)
651 651 ),
652 652 4
653 653 )
654 654 .Join();
655 655
656 656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
657 657
658 658 t = Environment.TickCount;
659 659 for (int i = 0; i < count; i++)
660 660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
661 661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
662 662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
663 663 }
664 664 }
665 665
666 666 [TestMethod]
667 667 public void ParallelForEachTest() {
668 668
669 669 const int count = 100000;
670 670
671 671 var args = new int[count];
672 672 var rand = new Random();
673 673
674 674 for (int i = 0; i < count; i++)
675 675 args[i] = (int)(rand.NextDouble() * 100);
676 676
677 677 int result = 0;
678 678
679 679 var t = Environment.TickCount;
680 680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
681 681
682 682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
683 683
684 684 int result2 = 0;
685 685
686 686 t = Environment.TickCount;
687 687 for (int i = 0; i < count; i++)
688 688 result2 += args[i];
689 689 Assert.AreEqual(result2, result);
690 690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
691 691 }
692 692
693 693 [TestMethod]
694 694 public void ComplexCase1Test() {
695 695 var flags = new bool[3];
696 696
697 697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
698 698
699 699 var step1 = PromiseHelper
700 700 .Sleep(200, "Alan")
701 701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
702 702 var p = step1
703 703 .Chain(x =>
704 704 PromiseHelper
705 705 .Sleep(200, "Hi, " + x)
706 706 .Then(y => y)
707 707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
708 708 )
709 709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
710 710 step1.Join();
711 711 p.Cancel();
712 712 try {
713 713 Assert.AreEqual(p.Join(), "Hi, Alan");
714 714 Assert.Fail("Shouldn't get here");
715 715 } catch (OperationCanceledException) {
716 716 }
717 717
718 718 Assert.IsFalse(flags[0]);
719 719 Assert.IsTrue(flags[1]);
720 720 Assert.IsTrue(flags[2]);
721 721 }
722 722
723 723 [TestMethod]
724 724 public void ChainedCancel1Test() {
725 725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
726 726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
727 727 var p = PromiseHelper
728 728 .Sleep(1, "Hi, HAL!")
729 729 .Then(x => {
730 730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
731 731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
732 732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
733 733 PromiseHelper
734 734 .Sleep(100, "HAL, STOP!")
735 735 .Then(result.Cancel);
736 736 return result;
737 737 });
738 738 try {
739 739 p.Join();
740 740 } catch (TargetInvocationException err) {
741 741 Assert.IsTrue(err.InnerException is OperationCanceledException);
742 742 }
743 743 }
744 744
745 745 [TestMethod]
746 746 public void ChainedCancel2Test() {
747 747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
748 748 var pSurvive = new Promise<bool>();
749 749 var hemStarted = new ManualResetEvent(false);
750 750 var p = PromiseHelper
751 751 .Sleep(1, "Hi, HAL!")
752 752 .Chain(x => {
753 753 hemStarted.Set();
754 754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
755 755 var result = PromiseHelper
756 756 .Sleep(100000000, "HEM ENABLED!!!")
757 757 .Then(s => {
758 758 pSurvive.Resolve(false);
759 759 return s;
760 760 });
761 761
762 762 result
763 .Cancelled(() => pSurvive.Resolve(true));
763 .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
764 764
765 765 return result;
766 766 });
767 767
768 768 hemStarted.WaitOne();
769 769 p.Cancel();
770 770
771 771 try {
772 772 p.Join();
773 773 } catch (OperationCanceledException) {
774 774 Assert.IsTrue(pSurvive.Join());
775 775 }
776 776 }
777 777
778 778 [TestMethod]
779 779 public void SharedLockTest() {
780 780 var l = new SharedLock();
781 781 int shared = 0;
782 782 int exclusive = 0;
783 783 var s1 = new Signal();
784 784 var log = new AsyncQueue<string>();
785 785
786 786 try {
787 787 AsyncPool.RunThread(
788 788 () => {
789 789 log.Enqueue("Reader #1 started");
790 790 try {
791 791 l.LockShared();
792 792 log.Enqueue("Reader #1 lock got");
793 793 if (Interlocked.Increment(ref shared) == 2)
794 794 s1.Set();
795 795 s1.Wait();
796 796 log.Enqueue("Reader #1 finished");
797 797 Interlocked.Decrement(ref shared);
798 798 } finally {
799 799 l.Release();
800 800 log.Enqueue("Reader #1 lock released");
801 801 }
802 802 },
803 803 () => {
804 804 log.Enqueue("Reader #2 started");
805 805
806 806 try {
807 807 l.LockShared();
808 808 log.Enqueue("Reader #2 lock got");
809 809
810 810 if (Interlocked.Increment(ref shared) == 2)
811 811 s1.Set();
812 812 s1.Wait();
813 813 log.Enqueue("Reader #2 upgrading to writer");
814 814 Interlocked.Decrement(ref shared);
815 815 l.Upgrade();
816 816 log.Enqueue("Reader #2 upgraded");
817 817
818 818 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
819 819 Assert.AreEqual(0, shared);
820 820 log.Enqueue("Reader #2 finished");
821 821 Interlocked.Decrement(ref exclusive);
822 822 } finally {
823 823 l.Release();
824 824 log.Enqueue("Reader #2 lock released");
825 825 }
826 826 },
827 827 () => {
828 828 log.Enqueue("Writer #1 started");
829 829 try {
830 830 l.LockExclusive();
831 831 log.Enqueue("Writer #1 got the lock");
832 832 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
833 833 Interlocked.Decrement(ref exclusive);
834 834 log.Enqueue("Writer #1 is finished");
835 835 } finally {
836 836 l.Release();
837 837 log.Enqueue("Writer #1 lock released");
838 838 }
839 839 }
840 840 ).Bundle().Join(1000);
841 841 log.Enqueue("Done");
842 842 } catch(Exception error) {
843 843 log.Enqueue(error.Message);
844 844 throw;
845 845 } finally {
846 846 foreach (var m in log)
847 847 Console.WriteLine(m);
848 848 }
849 849 }
850 850 }
851 851 }
852 852
@@ -1,291 +1,308
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Reflection;
5 5
6 6 namespace Implab {
7 7 public abstract class AbstractPromise<THandler> {
8 8
9 9 const int UNRESOLVED_SATE = 0;
10 10 const int TRANSITIONAL_STATE = 1;
11 11 const int SUCCEEDED_STATE = 2;
12 12 const int REJECTED_STATE = 3;
13 13 const int CANCELLED_STATE = 4;
14 14
15 15 const int RESERVED_HANDLERS_COUNT = 4;
16 16
17 17 int m_state;
18 18 Exception m_error;
19 19 int m_handlersCount;
20 20
21 21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
22 22 MTQueue<THandler> m_extraHandlers;
23 23 int m_handlerPointer = -1;
24 24 int m_handlersCommited;
25 25
26 26 #region state managment
27 27 bool BeginTransit() {
28 28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
29 29 }
30 30
31 31 void CompleteTransit(int state) {
32 32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
33 33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
34 34 }
35 35
36 36 void WaitTransition() {
37 37 while (m_state == TRANSITIONAL_STATE) {
38 38 Thread.MemoryBarrier();
39 39 }
40 40 }
41 41
42 42 protected bool BeginSetResult() {
43 43 if (!BeginTransit()) {
44 44 WaitTransition();
45 45 if (m_state != CANCELLED_STATE)
46 46 throw new InvalidOperationException("The promise is already resolved");
47 47 return false;
48 48 }
49 49 return true;
50 50 }
51 51
52 52 protected void EndSetResult() {
53 53 CompleteTransit(SUCCEEDED_STATE);
54 54 OnSuccess();
55 55 }
56 56
57 57
58 58
59 59 /// <summary>
60 60 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
61 61 /// </summary>
62 62 /// <remarks>
63 63 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
64 64 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
65 65 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
66 66 /// </remarks>
67 67 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
68 68 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
69 69 protected void SetError(Exception error) {
70 70 if (BeginTransit()) {
71 m_error = error is PromiseTransientException ? error.InnerException : error;
72 CompleteTransit(REJECTED_STATE);
73 OnError();
71 if (error is OperationCanceledException) {
72 CompleteTransit(CANCELLED_STATE);
73 m_error = error.InnerException;
74 OnCancelled();
75 } else {
76 m_error = error is PromiseTransientException ? error.InnerException : error;
77 CompleteTransit(REJECTED_STATE);
78 OnError();
79 }
74 80 } else {
75 81 WaitTransition();
76 82 if (m_state == SUCCEEDED_STATE)
77 83 throw new InvalidOperationException("The promise is already resolved");
78 84 }
79 85 }
80 86
81 87 /// <summary>
82 88 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
83 89 /// </summary>
84 90 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
85 protected void SetCancelled() {
91 protected void SetCancelled(Exception reason) {
86 92 if (BeginTransit()) {
93 m_error = reason;
87 94 CompleteTransit(CANCELLED_STATE);
88 95 OnCancelled();
89 96 }
90 97 }
91 98
92 99 protected abstract void SignalSuccess(THandler handler);
93 100
94 101 protected abstract void SignalError(THandler handler, Exception error);
95 102
96 protected abstract void SignalCancelled(THandler handler);
103 protected abstract void SignalCancelled(THandler handler, Exception reason);
97 104
98 105 void OnSuccess() {
99 106 var hp = m_handlerPointer;
100 107 var slot = hp +1 ;
101 108 while (slot < m_handlersCommited) {
102 109 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
103 110 SignalSuccess(m_handlers[slot]);
104 111 }
105 112 hp = m_handlerPointer;
106 113 slot = hp +1 ;
107 114 }
108 115
109 116
110 117 if (m_extraHandlers != null) {
111 118 THandler handler;
112 119 while (m_extraHandlers.TryDequeue(out handler))
113 120 SignalSuccess(handler);
114 121 }
115 122 }
116 123
117 124 void OnError() {
118 125 var hp = m_handlerPointer;
119 126 var slot = hp +1 ;
120 127 while (slot < m_handlersCommited) {
121 128 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
122 129 SignalError(m_handlers[slot],m_error);
123 130 }
124 131 hp = m_handlerPointer;
125 132 slot = hp +1 ;
126 133 }
127 134
128 135 if (m_extraHandlers != null) {
129 136 THandler handler;
130 137 while (m_extraHandlers.TryDequeue(out handler))
131 138 SignalError(handler, m_error);
132 139 }
133 140 }
134 141
135 142 void OnCancelled() {
136 143 var hp = m_handlerPointer;
137 144 var slot = hp +1 ;
138 145 while (slot < m_handlersCommited) {
139 146 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
140 SignalCancelled(m_handlers[slot]);
147 SignalCancelled(m_handlers[slot], m_error);
141 148 }
142 149 hp = m_handlerPointer;
143 150 slot = hp +1 ;
144 151 }
145 152
146 153 if (m_extraHandlers != null) {
147 154 THandler handler;
148 155 while (m_extraHandlers.TryDequeue(out handler))
149 SignalCancelled(handler);
156 SignalCancelled(handler, m_error);
150 157 }
151 158 }
152 159
153 160 #endregion
154 161
155 162 protected abstract void Listen(PromiseEventType events, Action handler);
156 163
157 164 #region synchronization traits
158 165 protected void WaitResult(int timeout) {
159 166 if (!IsResolved) {
160 167 var lk = new object();
161 168
162 169 Listen(PromiseEventType.All, () => {
163 170 lock(lk) {
164 171 Monitor.Pulse(lk);
165 172 }
166 173 });
167 174
168 175 lock (lk) {
169 176 while(!IsResolved) {
170 177 if(!Monitor.Wait(lk,timeout))
171 178 throw new TimeoutException();
172 179 }
173 180 }
174 181
175 182 }
176 183 switch (m_state) {
177 184 case SUCCEEDED_STATE:
178 185 return;
179 186 case CANCELLED_STATE:
180 187 throw new OperationCanceledException();
181 188 case REJECTED_STATE:
182 189 throw new TargetInvocationException(m_error);
183 190 default:
184 191 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
185 192 }
186 193 }
187 194 #endregion
188 195
189 196 #region handlers managment
190 197
191 198 protected void AddHandler(THandler handler) {
192 199
193 200 if (m_state > 1) {
194 201 // the promise is in the resolved state, just invoke the handler
195 202 InvokeHandler(handler);
196 203 } else {
197 204 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
198 205
199 206 if (slot < RESERVED_HANDLERS_COUNT) {
200 207 m_handlers[slot] = handler;
201 208
202 209 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
203 210 }
204 211
205 212 if (m_state > 1) {
206 213 do {
207 214 var hp = m_handlerPointer;
208 215 slot = hp + 1;
209 216 if (slot < m_handlersCommited) {
210 217 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
211 218 continue;
212 219 InvokeHandler(m_handlers[slot]);
213 220 }
214 221 break;
215 222 } while(true);
216 223 }
217 224 } else {
218 225 if (slot == RESERVED_HANDLERS_COUNT) {
219 226 m_extraHandlers = new MTQueue<THandler>();
220 227 } else {
221 228 while (m_extraHandlers == null)
222 229 Thread.MemoryBarrier();
223 230 }
224 231
225 232 m_extraHandlers.Enqueue(handler);
226 233
227 234 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
228 235 // if the promise have been resolved while we was adding the handler to the queue
229 236 // we can't guarantee that someone is still processing it
230 237 // therefore we need to fetch a handler from the queue and execute it
231 238 // note that fetched handler may be not the one that we have added
232 239 // even we can fetch no handlers at all :)
233 240 InvokeHandler(handler);
234 241 }
235 242 }
236 243 }
237 244
238 245 protected void InvokeHandler(THandler handler) {
239 246 switch (m_state) {
240 247 case SUCCEEDED_STATE:
241 248 SignalSuccess(handler);
242 249 break;
243 250 case CANCELLED_STATE:
244 SignalCancelled(handler);
251 SignalCancelled(handler, m_error);
245 252 break;
246 253 case REJECTED_STATE:
247 254 SignalError(handler, m_error);
248 255 break;
249 256 default:
250 257 throw new Exception(String.Format("Invalid promise state {0}", m_state));
251 258 }
252 259 }
253 260
254 261 #endregion
255 262
256 263 #region IPromise implementation
257 264
258 265 public void Join(int timeout) {
259 266 WaitResult(timeout);
260 267 }
261 268
262 269 public void Join() {
263 270 WaitResult(-1);
264 271 }
265 272
266 273 public bool IsResolved {
267 274 get {
268 275 Thread.MemoryBarrier();
269 276 return m_state > 1;
270 277 }
271 278 }
272 279
273 280 public bool IsCancelled {
274 281 get {
275 282 Thread.MemoryBarrier();
276 283 return m_state == CANCELLED_STATE;
277 284 }
278 285 }
279 286
280 287 #endregion
281 288
282 289 #region ICancellable implementation
283 290
284 291 public void Cancel() {
285 SetCancelled();
292 SetCancelled(null);
293 }
294
295 public void Cancel(Exception reason) {
296 SetCancelled(reason);
286 297 }
287 298
288 299 #endregion
300
301 public Exception Error {
302 get {
303 return m_error;
304 }
305 }
289 306 }
290 307 }
291 308
@@ -1,41 +1,41
1 1 namespace Implab.Diagnostics {
2 2 public static class Extensions {
3 3 public static IPromise<T> EndLogicalOperation<T>(this IPromise<T> promise) {
4 4 Safe.ArgumentNotNull(promise, "promise");
5 5 var op = TraceContext.Instance.DetachLogicalOperation();
6 6
7 7 return promise.On(
8 8 x => {
9 9 TraceContext.Instance.EnterLogicalOperation(op,true);
10 10 TraceLog.TraceInformation("promise = {0}", x);
11 11 TraceLog.EndLogicalOperation();
12 12 TraceContext.Instance.Leave();
13 13 },
14 14 err =>{
15 15 TraceContext.Instance.EnterLogicalOperation(op,true);
16 16 TraceLog.TraceError("promise died {0}", err);
17 17 TraceLog.EndLogicalOperation();
18 18 TraceContext.Instance.Leave();
19 19 },
20 () => {
20 reason => {
21 21 TraceContext.Instance.EnterLogicalOperation(op,true);
22 TraceLog.TraceInformation("promise cancelled");
22 TraceLog.TraceInformation("promise cancelled {0}", reason.Message);
23 23 TraceLog.EndLogicalOperation();
24 24 TraceContext.Instance.Leave();
25 25 }
26 26 );
27 27 }
28 28
29 29 public static IPromise EndLogicalOperation(this IPromise promise) {
30 30 Safe.ArgumentNotNull(promise, "promise");
31 31 var op = TraceContext.Instance.DetachLogicalOperation();
32 32
33 33 return promise.On(() => {
34 34 TraceContext.Instance.EnterLogicalOperation(op,true);
35 35 TraceLog.EndLogicalOperation();
36 36 TraceContext.Instance.Leave();
37 37 }, PromiseEventType.All);
38 38 }
39 39 }
40 40 }
41 41
@@ -1,10 +1,11
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab {
7 7 public interface ICancellable {
8 8 void Cancel();
9 void Cancel(Exception reason);
9 10 }
10 11 }
@@ -1,14 +1,24
1 1 using System;
2 2
3 3 namespace Implab {
4 4 /// <summary>
5 5 /// Deferred result, usually used by asynchronous services as the service part of the promise.
6 6 /// </summary>
7 7 public interface IDeferred : ICancellable {
8 8
9 9 void Resolve();
10 10
11 /// <summary>
12 /// Reject the promise with the specified error.
13 /// </summary>
14 /// <param name="error">The reason why the promise is rejected.</param>
15 /// <remarks>
16 /// Some exceptions are treated in a special case:
17 /// <see cref="OperationCanceledException"/> is interpreted as call to <see cref="Cancel()"/> method,
18 /// and <see cref="PromiseTransientException"/> is always unwrapped and its
19 /// <see cref="PromiseTransientException.InnerException"> is used as the reason to reject promise.
20 /// </remarks>
11 21 void Reject(Exception error);
12 22 }
13 23 }
14 24
@@ -1,135 +1,106
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab {
7 7 public interface IPromise: ICancellable {
8 8
9 9 /// <summary>
10 10 /// Π’ΠΈΠΏ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌΠΎΠ³ΠΎ Ρ‡Π΅Ρ€Π΅Π· Π΄Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅.
11 11 /// </summary>
12 12 Type PromiseType { get; }
13 13
14 14 /// <summary>
15 15 /// ΠžΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ являСтся Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½Ρ‹ΠΌ, Π»ΠΈΠ±ΠΎ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎ, Π»ΠΈΠ±ΠΎ с ошибкой, Π»ΠΈΠ±ΠΎ ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½ΠΎ.
16 16 /// </summary>
17 17 bool IsResolved { get; }
18 18
19 19 /// <summary>
20 20 /// ΠžΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π±Ρ‹Π»ΠΎ ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½ΠΎ.
21 21 /// </summary>
22 22 bool IsCancelled { get; }
23 23
24 24 /// <summary>
25 /// Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС Π² Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π΅ выполнСния обСщания, Π»ΠΈΠ±ΠΎ ΠΏΡ€ΠΈΡ‡ΠΈΠ½Π° ΠΎΡ‚ΠΌΠ΅Π½Ρ‹.
26 /// </summary>
27 Exception Error { get; }
28
29 /// <summary>
25 30 /// Creates a new promise dependend on the current one and resolved on
26 31 /// executing the specified handlers.
27 32 /// </summary>
28 33 /// <param name="success">The handler called on the successful promise completion.</param>
29 34 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
30 35 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
31 36 /// <returns>The newly created dependant promise.</returns>
32 37 /// <remarks>
33 38 /// <para>
34 39 /// If the success handler is specified the dependend promise will be resolved after the handler is
35 40 /// executed and the dependent promise will be linked to the current one, i.e. the cancellation
36 41 /// of the dependent property will lead to the cancellation of the current promise. If the
37 42 /// success handler isn't specified the dependent promise will not be linked to and
38 43 /// will not be resolved after the successfull resolution of the current one.
39 44 /// </para>
40 45 /// <para>
41 46 /// When the error handler is specified, the exception raised during the current promise completion
42 47 /// will be passed to it as the parameter. If the error handler returns without raising an
43 48 /// exception then the dependant promise will be resolved successfully, otherwise the exception
44 49 /// raised by the handler will be transmitted to the dependent promise. If the handler wants
45 50 /// to passthrough the original exception it needs to wrap the exception with
46 /// the <see cref="PromiseTransientException"/>.
51 /// the <see cref="PromiseTransientException"/>. The handler may raise <see cref="OperationCanceledException"/>
52 /// to cancel the dependant promise, the innner exception specifies the reason why the promise
53 /// is canceled.
47 54 /// </para>
48 55 /// <para>
49 56 /// If the cancelation handler is specified and the current promise is cancelled then the dependent
50 /// promise will be resolved after the handler is executed. If the cancelation hendler raises the
57 /// promise will be resolved after the handler is executed. If the cancelation handler raises the
51 58 /// exception it will be passed to the dependent promise.
52 59 /// </para>
53 60 /// </remarks>
54 IPromise Then(Action success, Action<Exception> error, Action cancel);
61 IPromise Then(Action success, Action<Exception> error, Action<Exception> cancel);
55 62 IPromise Then(Action success, Action<Exception> error);
56 63 IPromise Then(Action success);
57 64
58 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel);
65 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel);
59 66 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error);
60 67 IPromise Chain(Func<IPromise> chained);
61 68
62 69 /// <summary>
63 70 /// Adds specified listeners to the current promise.
64 71 /// </summary>
65 72 /// <param name="success">The handler called on the successful promise completion.</param>
66 73 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
67 74 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
68 75 /// <returns>The current promise.</returns>
69 IPromise On(Action success, Action<Exception> error, Action cancel);
76 IPromise On(Action success, Action<Exception> error, Action<Exception> cancel);
70 77 IPromise On(Action success, Action<Exception> error);
71 78 IPromise On(Action success);
72 79
73 80 /// <summary>
74 81 /// Adds specified listeners to the current promise.
75 82 /// </summary>
76 83 /// <param name="handler">The handler called on the specified events.</param>
77 84 /// <param name = "events">The combination of flags denoting the events for which the
78 85 /// handler shoud be called.</param>
79 86 /// <returns>The current promise.</returns>
80 87 IPromise On(Action handler, PromiseEventType events);
81 88
82 89 /// <summary>
83 /// Adds the specified error handler to the current promise
84 /// and creates the new dependant promise.
85 /// </summary>
86 /// <param name="error">
87 /// The error handler. If the error handler returns without
88 /// an error the dependant promise will be successfully resolved.
89 /// </param>
90 /// <returns>
91 /// The new dependant promise which will be resolved after the error
92 /// handler is executed.
93 /// </returns>
94 /// <remarks>
95 /// The successfull result of the current promise will be ignored.
96 /// </remarks>
97 IPromise Error(Action<Exception> error);
98
99 /// <summary>
100 /// Adds the specified cncellation handler to the current promise
101 /// and creates the new dependant promise.
102 /// </summary>
103 /// <returns>
104 /// The new dependant promise which will be resolved after the cancellation
105 /// handler is executed.
106 /// </returns>
107 /// <param name="handler">
108 /// The cancellation handler.
109 /// </param>
110 /// <remarks>
111 /// If the cancellation handler is executed without an error the dependent
112 /// promise will be successfully resolved, otherwise the raised exception
113 /// will be passed to the dependant promise. The successful result of the
114 /// current promise will be ignored.
115 /// </remarks>
116 IPromise Cancelled(Action handler);
117
118 /// <summary>
119 90 /// ΠŸΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΡƒΠ΅Ρ‚ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ обСщания ΠΊ Π·Π°Π΄Π°Π½Π½ΠΎΠΌΡƒ Ρ‚ΠΈΠΏΡƒ ΠΈ Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ Π½ΠΎΠ²ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅.
120 91 /// </summary>
121 92 IPromise<T> Cast<T>();
122 93
123 94 /// <summary>
124 95 /// Π‘ΠΈΠ½Ρ…Ρ€ΠΎΠ½ΠΈΠ·ΠΈΡ€ΡƒΠ΅Ρ‚ Ρ‚Π΅ΠΊΡƒΡ‰ΠΈΠΉ ΠΏΠΎΡ‚ΠΎΠΊ с ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ΠΌ.
125 96 /// </summary>
126 97 void Join();
127 98 /// <summary>
128 99 /// Π‘ΠΈΠ½Ρ…Ρ€ΠΎΠ½ΠΈΠ·ΠΈΡ€ΡƒΠ΅Ρ‚ Ρ‚Π΅ΠΊΡƒΡ‰ΠΈΠΉ ΠΏΠΎΡ‚ΠΎΠΊ с ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ΠΌ.
129 100 /// </summary>
130 101 /// <param name="timeout">ВрСмя оТидания, ΠΏΠΎ Π΅Π³ΠΎ ΠΈΡΡ‚Π΅Ρ‡Π΅Π½ΠΈΡŽ Π²ΠΎΠ·Π½ΠΈΠΊΠ½Π΅Ρ‚ ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅.</param>
131 102 /// <exception cref="TimeoutException">ΠŸΡ€Π΅Π²Ρ‹ΡˆΠ΅Π½ΠΎ врСмя оТидания.</exception>
132 103 void Join(int timeout);
133 104
134 105 }
135 106 }
@@ -1,34 +1,30
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public interface IPromise<out T> : IPromise {
5 5
6 6 new T Join();
7 7
8 8 new T Join(int timeout);
9 9
10 IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel);
10 IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel);
11 11
12 12 IPromise<T> On(Action<T> success, Action<Exception> error);
13 13
14 14 IPromise<T> On(Action<T> success);
15 15
16 16 new IPromise<T> On(Action handler, PromiseEventType events);
17 17
18 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error, Func<T2> cancel);
18 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<Exception, T2> cancel);
19 19
20 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error);
20 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error);
21 21
22 22 IPromise<T2> Then<T2>(Func<T, T2> mapper);
23 23
24 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error, Func<IPromise<T2>> cancel);
24 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel);
25 25
26 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error);
26 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error);
27 27
28 28 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained);
29
30 IPromise<T2> Error<T2>(Func<Exception,T2> error);
31
32 IPromise<T2> Cancelled<T2>(Func<T2> handler);
33 29 }
34 30 }
@@ -1,258 +1,222
1 1 using System;
2 2 using System.Diagnostics;
3 3
4 4 namespace Implab {
5 5 public class Promise : AbstractPromise<Promise.HandlerDescriptor>, IPromise, IDeferred {
6 6
7 7 public struct HandlerDescriptor {
8 8 readonly Action m_success;
9 9 readonly Action<Exception> m_error;
10 readonly Action m_cancel;
10 readonly Action<Exception> m_cancel;
11 11 readonly IDeferred m_deferred;
12 12
13 public HandlerDescriptor(Action success, Action<Exception> error, Action cancel, IDeferred deferred) {
13 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel, IDeferred deferred) {
14 14 m_success = success;
15 15 m_error = error;
16 16 m_cancel = cancel;
17 17 m_deferred = deferred;
18 18 }
19 19
20 20 public void SignalSuccess() {
21 21 if (m_success != null) {
22 22 try {
23 23 m_success();
24 24 if (m_deferred != null)
25 25 m_deferred.Resolve();
26 26 } catch (Exception err) {
27 27 SignalError(err);
28 28 }
29 29 }
30 30 }
31 31
32 32 public void SignalError(Exception err) {
33 33 if (m_error != null) {
34 34 try {
35 35 m_error(err);
36 36 if (m_deferred != null)
37 37 m_deferred.Resolve();
38 38 } catch (Exception err2) {
39 39 if (m_deferred != null)
40 40 m_deferred.Reject(err2);
41 41 }
42 42 } else {
43 43 if (m_deferred != null)
44 44 m_deferred.Reject(err);
45 45 }
46 46 }
47 47
48 public void SignalCancel() {
48 public void SignalCancel(Exception reason) {
49 49 if (m_cancel != null) {
50 50 try {
51 m_cancel();
51 m_cancel(reason);
52 if (m_deferred != null)
53 m_deferred.Resolve();
54 } catch (Exception err) {
55 SignalError(err);
56 }
57 } else if (reason != null && m_error != null) {
58 try {
59 m_error(new OperationCanceledException("The operation was canceled.", reason));
52 60 if (m_deferred != null)
53 61 m_deferred.Resolve();
54 62 } catch (Exception err) {
55 63 SignalError(err);
56 64 }
57 65 } else {
58 66 if (m_deferred != null)
59 m_deferred.Cancel();
67 m_deferred.Cancel(reason);
60 68 }
61 69 }
62 70 }
63 71
64 72 public void Resolve() {
65 73 BeginSetResult();
66 74 EndSetResult();
67 75 }
68 76
69 77 public void Reject(Exception error) {
70 78 SetError(error);
71 79 }
72 80
73 81 #region implemented abstract members of AbstractPromise
74 82
75 83 protected override void SignalSuccess(HandlerDescriptor handler) {
76 84 handler.SignalSuccess();
77 85 }
78 86
79 87 protected override void SignalError(HandlerDescriptor handler, Exception error) {
80 88 handler.SignalError(error);
81 89 }
82 90
83 protected override void SignalCancelled(HandlerDescriptor handler) {
84 handler.SignalCancel();
91 protected override void SignalCancelled(HandlerDescriptor handler, Exception reason) {
92 handler.SignalCancel(reason);
85 93 }
86 94
87 95 protected override void Listen(PromiseEventType events, Action handler) {
88 96 AddHandler(new HandlerDescriptor(
89 97 events.HasFlag(PromiseEventType.Success) ? handler : null,
90 98 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
91 events.HasFlag(PromiseEventType.Cancelled) ? handler : null,
99 events.HasFlag(PromiseEventType.Cancelled) ? new Action<Exception>(reason => handler()) : null,
92 100 null
93 101 ));
94 102 }
95 103
96 104 #endregion
97 105
98 106
99 107 public Type PromiseType {
100 108 get {
101 109 return typeof(void);
102 110 }
103 111 }
104 112
105 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
113 public IPromise Then(Action success, Action<Exception> error, Action<Exception> cancel) {
106 114 var promise = new Promise();
107 115 if (success != null)
108 116 promise.On(Cancel, PromiseEventType.Cancelled);
109 117
110 118 AddHandler(new HandlerDescriptor(success, error, cancel, promise));
111 119
112 120 return promise;
113 121 }
114 122
115 123 public IPromise Then(Action success, Action<Exception> error) {
116 124 return Then(success, error, null);
117 125 }
118 126
119 127 public IPromise Then(Action success) {
120 128 return Then(success, null, null);
121 129 }
122 130
123 public IPromise On(Action success, Action<Exception> error, Action cancel) {
131 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
124 132 AddHandler(new HandlerDescriptor(success, error, cancel, null));
125 133 return this;
126 134 }
127 135
128 136 public IPromise On(Action success, Action<Exception> error) {
129 137 return On(success, error, null);
130 138 }
131 139
132 140 public IPromise On(Action success) {
133 141 return On(success, null, null);
134 142 }
135 143
136 144 public IPromise On(Action handler, PromiseEventType events) {
137 145 return On(
138 146 events.HasFlag(PromiseEventType.Success) ? handler : null,
139 147 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
140 events.HasFlag(PromiseEventType.Cancelled) ? handler : null
148 events.HasFlag(PromiseEventType.Cancelled) ? new Action<Exception>(reason => handler()) : null
141 149 );
142 150 }
143 151
144 152 public IPromise<T> Cast<T>() {
145 153 throw new InvalidCastException();
146 154 }
147 155
148 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
156 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<Exception,IPromise> cancel) {
149 157 var medium = new Promise();
150 158
151 159 On(
152 160 () => {
153 161 if (medium.IsCancelled)
154 162 return;
155 163 if (chained != null)
156 164 ConnectPromise(chained(), medium);
157 165 },
158 166 ex => {
159 167 if (medium.IsCancelled)
160 168 return;
161 169 if (error != null) {
162 170 try {
163 171 ConnectPromise(error(ex), medium);
164 172 } catch (Exception ex2) {
165 173 medium.Reject(ex2);
166 174 }
167 175 } else {
168 176 medium.Reject(ex);
169 177 }
170 178 },
171 () => {
179 reason => {
172 180 if (medium.IsCancelled)
173 181 return;
174 182 if (cancel != null)
175 ConnectPromise(cancel(), medium);
183 ConnectPromise(cancel(reason), medium);
176 184 else
177 medium.Cancel();
185 medium.Cancel(reason);
178 186 }
179 187 );
180 188
181 189 if (chained != null)
182 190 medium.On(Cancel, PromiseEventType.Cancelled);
183 191
184 192 return medium;
185 193 }
186 194
187 195 static void ConnectPromise(IPromise result, Promise medium) {
188 196 if (result != null) {
189 197 result.On(
190 198 medium.Resolve,
191 199 medium.Reject,
192 () => medium.Reject(new OperationCanceledException())
200 medium.Cancel
193 201 );
194 medium.On(result.Cancel, PromiseEventType.Cancelled);
202 medium.On(null,null,result.Cancel);
195 203 } else {
196 204 medium.Reject(
197 205 new NullReferenceException(
198 206 "The chained asynchronous operation returned" +
199 207 " 'null' where the promise instance is expected"
200 208 )
201 209 );
202 210 }
203 211 }
204 212
205 213 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
206 214 return Chain(chained, error, null);
207 215 }
208 216
209 217 public IPromise Chain(Func<IPromise> chained) {
210 218 return Chain(chained, null, null);
211 219 }
212
213 public IPromise Error(Action<Exception> error) {
214 var promise = new Promise();
215 On(
216 null,
217 err => {
218 if (error != null)
219 try {
220 error(err);
221 promise.Resolve();
222 } catch (Exception err2) {
223 promise.Reject(err2);
224 }
225 else
226 promise.Reject(err);
227 }
228 );
229
230 return promise;
231 }
232
233 public IPromise Cancelled(Action handler) {
234 var promise = new Promise();
235 On(
236 null,
237 null,
238 () => {
239 if (handler != null) {
240 try {
241 handler();
242 promise.Resolve();
243 } catch (Exception err) {
244 promise.Reject(err);
245 }
246 } else {
247 promise.Cancel();
248 }
249 }
250 );
251
252 return promise;
253 }
254
255
256 220 }
257 221 }
258 222
@@ -1,192 +1,192
1 1 using System.Threading;
2 2 using System;
3 3 using Implab.Diagnostics;
4 4 using System.Collections.Generic;
5 5
6 6
7 7 #if NET_4_5
8 8 using System.Threading.Tasks;
9 9 #endif
10 10
11 11 namespace Implab {
12 12 public static class PromiseExtensions {
13 13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
14 14 Safe.ArgumentNotNull(that, "that");
15 15 var context = SynchronizationContext.Current;
16 16 if (context == null)
17 17 return that;
18 18
19 19 var p = new SyncContextPromise<T>(context);
20 20 p.On(that.Cancel, PromiseEventType.Cancelled);
21 21
22 22 that.On(
23 23 p.Resolve,
24 24 p.Reject,
25 25 p.Cancel
26 26 );
27 27 return p;
28 28 }
29 29
30 30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
31 31 Safe.ArgumentNotNull(that, "that");
32 32 Safe.ArgumentNotNull(context, "context");
33 33
34 34 var p = new SyncContextPromise<T>(context);
35 35 p.On(that.Cancel, PromiseEventType.Cancelled);
36 36
37 37
38 38 that.On(
39 39 p.Resolve,
40 40 p.Reject,
41 41 p.Cancel
42 42 );
43 43 return p;
44 44 }
45 45
46 46 /// <summary>
47 47 /// Ensures the dispatched.
48 48 /// </summary>
49 49 /// <returns>The dispatched.</returns>
50 50 /// <param name="that">That.</param>
51 51 /// <param name="head">Head.</param>
52 52 /// <param name="cleanup">Cleanup.</param>
53 53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
54 54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
55 55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
56 56 Safe.ArgumentNotNull(that, "that");
57 57 Safe.ArgumentNotNull(head, "head");
58 58
59 that.On(null,null,() => head.On(cleanup));
59 that.On(() => head.On(cleanup), PromiseEventType.Cancelled);
60 60
61 61 return that;
62 62 }
63 63
64 64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
65 65 Safe.ArgumentNotNull(that, "that");
66 66 Safe.ArgumentNotNull(callback, "callback");
67 67 var op = TraceContext.Instance.CurrentOperation;
68 68 return ar => {
69 69 TraceContext.Instance.EnterLogicalOperation(op,false);
70 70 try {
71 71 that.Resolve(callback(ar));
72 72 } catch (Exception err) {
73 73 that.Reject(err);
74 74 } finally {
75 75 TraceContext.Instance.Leave();
76 76 }
77 77 };
78 78 }
79 79
80 80 static void CancelCallback(object cookie) {
81 81 ((ICancellable)cookie).Cancel();
82 82 }
83 83
84 84 /// <summary>
85 85 /// Cancells promise after the specified timeout is elapsed.
86 86 /// </summary>
87 87 /// <param name="that">The promise to cancel on timeout.</param>
88 88 /// <param name="milliseconds">The timeout in milliseconds.</param>
89 89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
90 90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
91 91 Safe.ArgumentNotNull(that, "that");
92 92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
93 93 that.On(timer.Dispose, PromiseEventType.All);
94 94 return that;
95 95 }
96 96
97 97 public static IPromise Bundle(this ICollection<IPromise> that) {
98 98 Safe.ArgumentNotNull(that, "that");
99 99
100 100 int count = that.Count;
101 101 int errors = 0;
102 102 var medium = new Promise();
103 103
104 104 if (count == 0) {
105 105 medium.Resolve();
106 106 return medium;
107 107 }
108 108
109 109 medium.On(() => {
110 110 foreach(var p2 in that)
111 111 p2.Cancel();
112 112 }, PromiseEventType.ErrorOrCancel);
113 113
114 114 foreach (var p in that)
115 115 p.On(
116 116 () => {
117 117 if (Interlocked.Decrement(ref count) == 0)
118 118 medium.Resolve();
119 119 },
120 120 error => {
121 121 if (Interlocked.Increment(ref errors) == 1)
122 122 medium.Reject(
123 123 new Exception("The dependency promise is failed", error)
124 124 );
125 125 },
126 () => {
126 reason => {
127 127 if (Interlocked.Increment(ref errors) == 1)
128 medium.Reject(
128 medium.Cancel(
129 129 new Exception("The dependency promise is cancelled")
130 130 );
131 131 }
132 132 );
133 133
134 134 return medium;
135 135 }
136 136
137 137 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
138 138 Safe.ArgumentNotNull(that, "that");
139 139
140 140 int count = that.Count;
141 141 int errors = 0;
142 142 var medium = new Promise<T[]>();
143 143 var results = new T[that.Count];
144 144
145 145 medium.On(() => {
146 146 foreach(var p2 in that)
147 147 p2.Cancel();
148 148 }, PromiseEventType.ErrorOrCancel);
149 149
150 150 int i = 0;
151 151 foreach (var p in that) {
152 152 var idx = i;
153 153 p.On(
154 154 x => {
155 155 results[idx] = x;
156 156 if (Interlocked.Decrement(ref count) == 0)
157 157 medium.Resolve(results);
158 158 },
159 159 error => {
160 160 if (Interlocked.Increment(ref errors) == 1)
161 161 medium.Reject(
162 162 new Exception("The dependency promise is failed", error)
163 163 );
164 164 },
165 () => {
165 reason => {
166 166 if (Interlocked.Increment(ref errors) == 1)
167 medium.Reject(
168 new Exception("The dependency promise is cancelled")
167 medium.Cancel(
168 new Exception("The dependency promise is cancelled", reason)
169 169 );
170 170 }
171 171 );
172 172 i++;
173 173 }
174 174
175 175 return medium;
176 176 }
177 177
178 178 #if NET_4_5
179 179
180 180 public static Task<T> GetTask<T>(this IPromise<T> that) {
181 181 Safe.ArgumentNotNull(that, "that");
182 182 var tcs = new TaskCompletionSource<T>();
183 183
184 that.On(tcs.SetResult, tcs.SetException, tcs.SetCanceled);
184 that.On(tcs.SetResult, tcs.SetException, r => tcs.SetCanceled());
185 185
186 186 return tcs.Task;
187 187 }
188 188
189 189 #endif
190 190 }
191 191 }
192 192
@@ -1,628 +1,574
1 1 using System;
2 2 using System.Diagnostics;
3 3
4 4 namespace Implab {
5 5
6 6 /// <summary>
7 7 /// Класс для асинхронного получСния Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ². Π’Π°ΠΊ Π½Π°Π·Ρ‹Π²Π°Π΅ΠΌΠΎΠ΅ "ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅".
8 8 /// </summary>
9 9 /// <typeparam name="T">Π’ΠΈΠΏ ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌΠΎΠ³ΠΎ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°</typeparam>
10 10 /// <remarks>
11 11 /// <para>БСрвис ΠΏΡ€ΠΈ ΠΎΠ±Ρ€Π°Ρ‰Π΅Π½ΠΈΠΈ ΠΊ Π΅Π³ΠΎ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρƒ Π΄Π°Π΅Ρ‚ ΠΎΠ±Π΅Ρ‰Π°ΠΈΠ½ΠΈΠ΅ ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ,
12 12 /// ΠΊΠ»ΠΈΠ΅Π½Ρ‚ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ² Ρ‚Π°ΠΊΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ ΡƒΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ ряд ΠΎΠ±Ρ€Π°Ρ‚Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎ для получСния
13 13 /// событий выполнСния обСщания, Ρ‚ΠΎΠ΅ΡΡ‚ΡŒ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ ΠΈ прСдоставлСнии Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ².</para>
14 14 /// <para>
15 15 /// ΠžΠ±Π΅Ρ‰Π΅Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ ΠΊΠ°ΠΊ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ, Ρ‚Π°ΠΊ ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ с ошибкой. Для подписки Π½Π°
16 16 /// Π΄Π°Π½Π½Ρ‹Π΅ события ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π΄ΠΎΠ»ΠΆΠ΅Π½ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Then</c>.
17 17 /// </para>
18 18 /// <para>
19 19 /// БСрвис, Π² свою ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ, ΠΏΠΎ ΠΎΠΊΠΎΠ½Ρ‡Π°Π½ΠΈΡŽ выполнСния ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ (Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ с ошибкой),
20 20 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Resolve</c> Π»ΠΈΠ±ΠΎ <c>Reject</c> для оповСщСния ΠΊΠ»ΠΈΠ΅Ρ‚Π½Π° ΠΎ
21 21 /// Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ обСщания.
22 22 /// </para>
23 23 /// <para>
24 24 /// Если сСрвСр успСл Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΅Ρ‰Π΅ Π΄ΠΎ Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π½Π° Π½Π΅Π³ΠΎ подписался,
25 25 /// Ρ‚ΠΎ Π² ΠΌΠΎΠΌΠ΅Π½Ρ‚ подписки ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π° Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΠ²ΡƒΡŽΡ‰ΠΈΠ΅ события Π² синхронном
26 26 /// Ρ€Π΅ΠΆΠΈΠΌΠ΅ ΠΈ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π±ΡƒΠ΄Π΅Ρ‚ ΠΎΠΏΠΎΠ²Π΅Ρ‰Π΅Π½ Π² любом случаС. Π˜Π½Π°Ρ‡Π΅, ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ Π΄ΠΎΠ±Π°Π²Π»ΡΡŽΡ‚ΡΡ Π²
27 27 /// список Π² порядкС подписания ΠΈ Π² этом ΠΆΠ΅ порядкС ΠΎΠ½ΠΈ Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ
28 28 /// обСщания.
29 29 /// </para>
30 30 /// <para>
31 31 /// ΠžΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Ρ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ обСщания ΠΌΠΎΠΆΠ½ΠΎ ΠΏΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΠΎΠ²Ρ‹Π²Π°Ρ‚ΡŒ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ Π»ΠΈΠ±ΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ
32 32 /// связанныС асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Ρ‚Π°ΠΊΠΆΠ΅ Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°ΡŽΡ‚ обСщания. Для этого слСдуСт
33 33 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΡƒΡŽ Ρ„ΠΎΡ€ΠΌΡƒ ΠΌΠ΅Ρ‚ΠΎΠ΄Π΅ <c>Then</c>.
34 34 /// </para>
35 35 /// <para>
36 36 /// Π’Π°ΠΊΠΆΠ΅ Ρ…ΠΎΡ€ΠΎΡˆΠΈΠΌ ΠΏΡ€Π°Π²ΠΈΠ»ΠΎΠΌ являСтся Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ <c>Resolve</c> ΠΈ <c>Reject</c> Π΄ΠΎΠ»ΠΆΠ΅Π½ Π²Ρ‹Π·Ρ‹Π²Π°Ρ‚ΡŒ
37 37 /// Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠ°Ρ‚ΠΎΡ€ обСщания ΠΈΠ½Π°Ρ‡Π΅ ΠΌΠΎΠ³ΡƒΡ‚ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΡƒΡ‚ΡŒ противорСчия.
38 38 /// </para>
39 39 /// </remarks>
40 40 public class Promise<T> : AbstractPromise<IDeferred<T>>, IPromise<T>, IDeferred<T> {
41 41
42 class StubDeferred : IDeferred<T> {
42 class StubDeferred : IDeferred, IDeferred<T> {
43 43 public static readonly StubDeferred instance = new StubDeferred();
44 44
45 45 StubDeferred() {
46 46 }
47 47
48 48 #region IDeferred implementation
49 49
50 50 public void Resolve(T value) {
51 51 }
52 52
53 public void Resolve() {
54 }
55
53 56 public void Reject(Exception error) {
54 57 }
55 58
56 59 #endregion
57 60
58 61 #region ICancellable implementation
59 62
60 63 public void Cancel() {
61 64 }
62 65
66 public void Cancel(Exception reason) {
67 }
68
63 69 #endregion
64 70
65 71
66 72 }
67 73
68 74 class RemapDescriptor<T2> : IDeferred<T> {
69 75 readonly Func<T,T2> m_remap;
70 readonly Func<Exception,T2> m_failed;
71 readonly Func<T2> m_cancel;
76 readonly Func<Exception, T2> m_failed;
77 readonly Func<Exception, T2> m_cancel;
72 78 readonly IDeferred<T2> m_deferred;
73 79
74 public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<T2> cancel, IDeferred<T2> deferred ) {
80 public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<Exception, T2> cancel, IDeferred<T2> deferred ) {
75 81 Debug.Assert(deferred != null);
76 82 m_remap = remap;
77 83 m_failed = failed;
78 84 m_cancel = cancel;
79 85 m_deferred = deferred;
80 86 }
81 87
82 88
83 89
84 90 #region IDeferred implementation
85 91
86 92 public void Resolve(T value) {
87 93 if (m_remap != null) {
88 94 try {
89 95 m_deferred.Resolve(m_remap(value));
90 96 } catch (Exception ex) {
91 97 Reject(ex);
92 98 }
93 99 }
94 100 }
95 101
96 102 public void Reject(Exception error) {
97 103 if (m_failed != null) {
98 104 try {
99 105 m_deferred.Resolve(m_failed(error));
100 106 } catch (Exception ex) {
101 107 m_deferred.Reject(ex);
102 108 }
103 109 } else {
104 110 m_deferred.Reject(error);
105 111 }
106 112 }
107 113
108 114
109 115 #endregion
110 116
111 117 #region ICancellable implementation
112 118
113 public void Cancel() {
119 public void Cancel(Exception reason) {
114 120 if (m_cancel != null) {
115 121 try {
116 m_deferred.Resolve(m_cancel());
122 m_deferred.Resolve(m_cancel(reason));
117 123 } catch (Exception ex) {
118 124 Reject(ex);
119 125 }
120 126 } else {
121 m_deferred.Cancel();
127 m_deferred.Cancel(reason);
122 128 }
123 129 }
124 130
131 public void Cancel() {
132 Cancel(null);
133 }
125 134 #endregion
126 135 }
127 136
128 137 class ListenerDescriptor : IDeferred<T> {
129 138 readonly Action m_handler;
130 139 readonly PromiseEventType m_events;
131 140
132 141 public ListenerDescriptor(Action handler, PromiseEventType events) {
133 142 Debug.Assert(handler != null);
134 143
135 144 m_handler = handler;
136 145 m_events = events;
137 146 }
138 147
139 148 #region IDeferred implementation
140 149
141 150 public void Resolve(T value) {
142 151 if (m_events.HasFlag(PromiseEventType.Success)) {
143 152 try {
144 153 m_handler();
145 154 // Analysis disable once EmptyGeneralCatchClause
146 155 } catch {
147 156 }
148 157 }
149 158 }
150 159
151 160 public void Reject(Exception error) {
152 161 if (m_events.HasFlag(PromiseEventType.Error)){
153 162 try {
154 163 m_handler();
155 164 // Analysis disable once EmptyGeneralCatchClause
156 165 } catch {
157 166 }
158 167 }
159 168 }
160 169
161 170 #endregion
162 171
163 172 #region ICancellable implementation
164 173
165 174 public void Cancel() {
175 Cancel(null);
176 }
177
178 public void Cancel(Exception reason) {
166 179 if (m_events.HasFlag(PromiseEventType.Cancelled)){
167 180 try {
168 181 m_handler();
169 182 // Analysis disable once EmptyGeneralCatchClause
170 183 } catch {
171 184 }
172 185 }
173 186 }
174 187
175 188 #endregion
176 189 }
177 190
178 191 class ValueEventDescriptor : IDeferred<T> {
179 192 readonly Action<T> m_success;
180 193 readonly Action<Exception> m_failed;
181 readonly Action m_cancelled;
194 readonly Action<Exception> m_cancelled;
182 195 readonly IDeferred<T> m_deferred;
183 196
184 public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
197 public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action<Exception> cancelled, IDeferred<T> deferred) {
185 198 Debug.Assert(deferred != null);
186 199
187 200 m_success = success;
188 201 m_failed = failed;
189 202 m_cancelled = cancelled;
190 203 m_deferred = deferred;
191 204 }
192 205
193 206 #region IDeferred implementation
194 207
195 208 public void Resolve(T value) {
196 209 if (m_success != null) {
197 210 try {
198 211 m_success(value);
199 212 m_deferred.Resolve(value);
200 213 } catch (Exception ex) {
201 214 Reject(ex);
202 215 }
203 216 }
204 217 }
205 218
206 219 public void Reject(Exception error) {
207 220 if (m_failed != null) {
208 221 try {
209 222 m_failed(error);
210 223 m_deferred.Resolve(default(T));
211 224 } catch(Exception ex) {
212 225 m_deferred.Reject(ex);
213 226 }
214 227 } else {
215 228 m_deferred.Reject(error);
216 229 }
217 230 }
218 231
219 232 #endregion
220 233
221 234 #region ICancellable implementation
222 235
223 public void Cancel() {
236 public void Cancel(Exception reason) {
224 237 if (m_cancelled != null) {
225 238 try {
226 m_cancelled();
239 m_cancelled(reason);
227 240 m_deferred.Resolve(default(T));
228 } catch(Exception ex) {
241 } catch (Exception ex) {
229 242 Reject(ex);
230 243 }
231 244 } else {
232 m_deferred.Cancel();
245 m_deferred.Cancel(reason);
233 246 }
234 247 }
235 248
249 public void Cancel() {
250 Cancel(null);
251 }
252
236 253 #endregion
237 254 }
238 255
239 256 public class EventDescriptor : IDeferred<T> {
240 257 readonly Action m_success;
241 258 readonly Action<Exception> m_failed;
242 readonly Action m_cancelled;
243 readonly IDeferred<T> m_deferred;
259 readonly Action<Exception> m_cancelled;
260 readonly IDeferred m_deferred;
244 261
245 public EventDescriptor(Action success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
262 public EventDescriptor(Action success, Action<Exception> failed, Action<Exception> cancelled, IDeferred deferred) {
246 263 Debug.Assert(deferred != null);
247 264
248 265 m_success = success;
249 266 m_failed = failed;
250 267 m_cancelled = cancelled;
251 268 m_deferred = deferred;
252 269 }
253 270
254 271 #region IDeferred implementation
255 272
256 273 public void Resolve(T value) {
257 274 if (m_success != null) {
258 275 try {
259 276 m_success();
260 m_deferred.Resolve(value);
277 m_deferred.Resolve();
261 278 } catch (Exception ex) {
262 279 Reject(ex);
263 280 }
264 281 }
265 282 }
266 283
267 284 public void Reject(Exception error) {
268 285 if (m_failed != null) {
269 286 try {
270 287 m_failed(error);
271 m_deferred.Resolve(default(T));
272 }catch (Exception ex)
273 {
288 m_deferred.Resolve();
289 } catch (Exception ex) {
274 290 m_deferred.Reject(ex);
275 291 }
276 292 } else {
277 293 m_deferred.Reject(error);
278 294 }
279
280 295 }
281 296
282 297 #endregion
283 298
284 299 #region ICancellable implementation
285 300
286 public void Cancel() {
301 public void Cancel(Exception reason) {
287 302 if (m_cancelled != null) {
288 303 try {
289 m_cancelled();
290 m_deferred.Resolve(default(T));
304 m_cancelled(reason);
305 m_deferred.Resolve();
291 306 } catch (Exception ex) {
292 307 Reject(ex);
293 308 }
294 309 } else {
295 m_deferred.Cancel();
310 m_deferred.Cancel(reason);
296 311 }
297 312 }
298 313
314 public void Cancel() {
315 Cancel(null);
316 }
317
299 318 #endregion
300 319 }
301 320
302 321 T m_result;
303 322
304 323 public virtual void Resolve(T value) {
305 324 if (BeginSetResult()) {
306 325 m_result = value;
307 326 EndSetResult();
308 327 }
309 328 }
310 329
311 330 public void Reject(Exception error) {
312 331 SetError(error);
313 332 }
314 333
315 334 public Type PromiseType {
316 335 get {
317 336 return typeof(T);
318 337 }
319 338 }
320 339
321 340 public new T Join() {
322 341 WaitResult(-1);
323 342 return m_result;
324 343 }
325 344 public new T Join(int timeout) {
326 345 WaitResult(timeout);
327 346 return m_result;
328 347 }
329 348
330 public IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel) {
349 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
331 350 AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance));
332 351 return this;
333 352 }
334 353
335 354 public IPromise<T> On(Action<T> success, Action<Exception> error) {
336 355 AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance));
337 356 return this;
338 357 }
339 358
340 359 public IPromise<T> On(Action<T> success) {
341 360 AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance));
342 361 return this;
343 362 }
344 363
345 364 public IPromise<T> On(Action handler, PromiseEventType events) {
346 365 Listen(events, handler);
347 366 return this;
348 367 }
349 368
350 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<T2> cancel) {
369 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<Exception, T2> cancel) {
351 370 var promise = new Promise<T2>();
352 371 if (mapper != null)
353 promise.On(Cancel, PromiseEventType.Cancelled);
372 promise.On((Action)null, null, Cancel);
354 373 AddHandler(new RemapDescriptor<T2>(mapper, error, cancel, promise));
355 374 return promise;
356 375 }
357 376
358 377 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error) {
359 378 var promise = new Promise<T2>();
360 379 if (mapper != null)
361 promise.On(Cancel, PromiseEventType.Cancelled);
380 promise.On((Action)null, null, Cancel);
362 381 AddHandler(new RemapDescriptor<T2>(mapper, error, null, promise));
363 382 return promise;
364 383 }
365 384
366 385 public IPromise<T2> Then<T2>(Func<T, T2> mapper) {
367 386 var promise = new Promise<T2>();
368 387 if (mapper != null)
369 promise.On(Cancel, PromiseEventType.Cancelled);
388 promise.On((Action)null, null, Cancel);
370 389 AddHandler(new RemapDescriptor<T2>(mapper, null, null, promise));
371 390 return promise;
372 391 }
373 392
374 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<IPromise<T2>> cancel) {
393 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
375 394 // this promise will be resolved when an asyc operation is started
376 395 var promise = new Promise<IPromise<T2>>();
377 396
378 397 AddHandler(new RemapDescriptor<IPromise<T2>>(
379 398 chained,
380 399 error,
381 400 cancel,
382 401 promise
383 402 ));
384 403
385 404 var medium = new Promise<T2>();
386 405
387 406 if (chained != null)
388 407 medium.On(Cancel, PromiseEventType.Cancelled);
389 408
390 409 // we need to connect started async operation with the medium
391 410 // if the async operation hasn't been started by the some reason
392 411 // report is to the medium
393 412 promise.On(
394 413 result => ConnectPromise<T2>(result, medium),
395 414 medium.Reject,
396 415 medium.Cancel
397 416 );
398 417
399 418 return medium;
400 419 }
401 420
402 421 static void ConnectPromise<T2>(IPromise<T2> result, Promise<T2> medium) {
403 422 if (result != null) {
404 423 result.On(
405 424 medium.Resolve,
406 425 medium.Reject,
407 () => medium.Reject(new OperationCanceledException())
426 medium.Cancel
408 427 );
409 428 medium.On(result.Cancel, PromiseEventType.Cancelled);
410 429 } else {
411 430 medium.Reject(
412 431 new NullReferenceException(
413 432 "The chained asynchronous operation returned" +
414 433 " 'null' where the promise instance is expected"
415 434 )
416 435 );
417 436 }
418 437 }
419 438
420 439 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error) {
421 440 return Chain(chained, error, null);
422 441 }
423 442
424 443 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained) {
425 444 return Chain(chained, null, null);
426 445 }
427 446
428 public IPromise<T2> Error<T2>(Func<Exception, T2> error) {
429 var promise = new Promise<T2>();
430 if (error != null)
431 On(
432 (Action<T>)null,
433 ex => {
434 try {
435 promise.Resolve(error(ex));
436 } catch (Exception ex2) {
437 promise.Reject(ex2);
438 }
439 }
440 );
441 else
442 Listen(PromiseEventType.Error, () => promise.Resolve(default(T2)));
443 return promise;
444 }
445
446 public IPromise<T2> Cancelled<T2>(Func<T2> handler) {
447 var promise = new Promise<T2>();
448 if (handler != null)
449 On(
450 (Action<T>)null,
451 null,
452 () => {
453 try {
454 promise.Resolve(handler());
455 } catch (Exception ex) {
456 promise.Reject(ex);
457 }
458 });
459 else
460 Listen(PromiseEventType.Cancelled, () => promise.Resolve(default(T2)));
461 return promise;
462 }
463
464 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
465 var promise = new Promise<T>();
447 public IPromise Then(Action success, Action<Exception> error, Action<Exception> cancel) {
448 var promise = new Promise();
466 449 if (success != null)
467 promise.On(Cancel, PromiseEventType.Cancelled);
450 promise.On(null, null, Cancel);
468 451
469 452 AddHandler(new EventDescriptor(success, error, cancel, promise));
470 453
471 454 return promise;
472 455 }
473 456
474 457 public IPromise Then(Action success, Action<Exception> error) {
475 458 return Then(success, error, null);
476 459 }
477 460
478 461 public IPromise Then(Action success) {
479 462 return Then(success, null, null);
480 463 }
481 464
482 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
465 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) {
483 466 var promise = new Promise<IPromise>();
484 467
485 468 AddHandler(
486 469 new RemapDescriptor<IPromise>(
487 470 x => chained(),
488 471 error,
489 472 cancel,
490 473 promise
491 474 )
492 475 );
493 476
494 477 var medium = new Promise();
495 478 if (chained != null)
496 medium.On(Cancel, PromiseEventType.Cancelled);
479 medium.On(null, null, Cancel);
497 480
498 481 promise.On(
499 482 result => ConnectPromise(result, medium),
500 483 medium.Reject,
501 484 medium.Cancel
502 485 );
503 486
504 487 return medium;
505 488 }
506 489
507 490 static void ConnectPromise(IPromise result, Promise medium) {
508 491 if (result != null) {
509 492 result.On(
510 493 medium.Resolve,
511 494 medium.Reject,
512 () => medium.Reject(new OperationCanceledException())
495 medium.Cancel
513 496 );
514 medium.On(result.Cancel, PromiseEventType.Cancelled);
497 medium.On(null, null, result.Cancel);
515 498 } else {
516 499 medium.Reject(
517 500 new NullReferenceException(
518 501 "The chained asynchronous operation returned" +
519 502 " 'null' where the promise instance is expected"
520 503 )
521 504 );
522 505 }
523 506 }
524 507
525 508 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
526 509 return Chain(chained, error, null);
527 510 }
528 511
529 512 public IPromise Chain(Func<IPromise> chained) {
530 513 return Chain(chained, null, null);
531 514 }
532 515
533 public IPromise On(Action success, Action<Exception> error, Action cancel) {
516 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
534 517 AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance));
535 518 return this;
536 519 }
537 520
538 521 public IPromise On(Action success, Action<Exception> error) {
539 522 AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance));
540 523 return this;
541 524 }
542 525
543 526 public IPromise On(Action success) {
544 527 Listen(PromiseEventType.Success, success);
545 528 return this;
546 529 }
547 530
548 531 IPromise IPromise.On(Action handler, PromiseEventType events) {
549 532 Listen(events,handler);
550 533 return this;
551 534 }
552 535
553 public IPromise Error(Action<Exception> error) {
554 var promise = new Promise();
555 if (error != null)
556 On(
557 (Action<T>)null,
558 ex => {
559 try {
560 error(ex);
561 promise.Resolve();
562 } catch (Exception ex2) {
563 promise.Reject(ex2);
564 }
565 });
566 else
567 Listen(PromiseEventType.Error, promise.Resolve);
568 return promise;
569 }
570
571 public IPromise Cancelled(Action handler) {
572 var promise = new Promise();
573 if (handler != null)
574 On(
575 (Action<T>)null,
576 null,
577 () => {
578 try {
579 handler();
580 promise.Resolve();
581 } catch (Exception ex) {
582 promise.Reject(ex);
583 }
584 });
585 else
586 Listen(PromiseEventType.Cancelled, promise.Resolve);
587 return promise;
588 }
589
590 536 public IPromise<T2> Cast<T2>() {
591 537 return (IPromise<T2>)this;
592 538 }
593 539
594 540 #region implemented abstract members of AbstractPromise
595 541
596 542 protected override void SignalSuccess(IDeferred<T> handler) {
597 543 handler.Resolve(m_result);
598 544 }
599 545
600 546 protected override void SignalError(IDeferred<T> handler, Exception error) {
601 547 handler.Reject(error);
602 548 }
603 549
604 protected override void SignalCancelled(IDeferred<T> handler) {
605 handler.Cancel();
550 protected override void SignalCancelled(IDeferred<T> handler, Exception reason) {
551 handler.Cancel(reason);
606 552 }
607 553
608 554 protected override void Listen(PromiseEventType events, Action handler) {
609 555 if (handler != null)
610 556 AddHandler(new ListenerDescriptor(handler, events));
611 557 }
612 558
613 559 #endregion
614 560
615 561 public static IPromise<T> ResultToPromise(T value) {
616 562 var p = new Promise<T>();
617 563 p.Resolve(value);
618 564 return p;
619 565 }
620 566
621 567 public static IPromise<T> ExceptionToPromise(Exception error) {
622 568 var p = new Promise<T>();
623 569 p.Reject(error);
624 570 return p;
625 571 }
626 572
627 573 }
628 574 }
@@ -1,33 +1,32
1 1 using System;
2 2
3 3 namespace Implab {
4
5 4 [Serializable]
6 5 public class PromiseTransientException : Exception {
7 6 /// <summary>
8 7 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class.
9 8 /// </summary>
10 9 /// <param name="inner">The exception that is the cause of the current exception.</param>
11 10 public PromiseTransientException(Exception inner) : base("The preceding promise has failed", inner) {
12 11 }
13 12
14 13 /// <summary>
15 14 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class
16 15 /// </summary>
17 16 /// <param name="message">A <see cref="T:System.String"/> that describes the exception. </param>
18 17 /// <param name="inner">The exception that is the cause of the current exception. </param>
19 18 public PromiseTransientException(string message, Exception inner)
20 19 : base(message, inner) {
21 20 }
22 21
23 22 /// <summary>
24 23 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class
25 24 /// </summary>
26 25 /// <param name="context">The contextual information about the source or destination.</param>
27 26 /// <param name="info">The object that holds the serialized object data.</param>
28 27 protected PromiseTransientException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
29 28 : base(info, context) {
30 29 }
31 30 }
32 31 }
33 32
@@ -1,25 +1,26
1 1 using System.Threading;
2 using System;
2 3
3 4 namespace Implab {
4 5 public class SyncContextPromise<T> : Promise<T> {
5 6 readonly SynchronizationContext m_context;
6 7
7 8 public SyncContextPromise(SynchronizationContext context) {
8 9 Safe.ArgumentNotNull(context, "context");
9 10 m_context = context;
10 11 }
11 12
12 13 protected override void SignalSuccess(IDeferred<T> handler) {
13 14 m_context.Post(x => base.SignalSuccess(handler), null);
14 15 }
15 16
16 protected override void SignalError(IDeferred<T> handler, System.Exception error) {
17 protected override void SignalError(IDeferred<T> handler, Exception error) {
17 18 m_context.Post(x => base.SignalError(handler, error), null);
18 19 }
19 20
20 protected override void SignalCancelled(IDeferred<T> handler) {
21 m_context.Post(x => base.SignalCancelled(handler), null);
21 protected override void SignalCancelled(IDeferred<T> handler, Exception reason) {
22 m_context.Post(x => base.SignalCancelled(handler, reason), null);
22 23 }
23 24 }
24 25 }
25 26
@@ -1,136 +1,143
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6
7 7 namespace Implab
8 8 {
9 9 /// <summary>
10 10 /// This class allows to interact with asyncronuos task.
11 11 /// </summary>
12 12 /// <remarks>
13 13 /// Members of this object are thread safe.
14 14 /// </remarks>
15 15 public class TaskController: IProgressNotifier, ITaskController
16 16 {
17 17 readonly object m_lock;
18 18 string m_message;
19 19
20 20 float m_current;
21 21 float m_max;
22 22
23 23 bool m_cancelled;
24 24
25 25 public event EventHandler Cancelled;
26 26 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
27 27 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
28 28 public event EventHandler<ProgressInitEventArgs> ProgressInit;
29 29
30 30 public TaskController()
31 31 {
32 32 m_lock = new Object();
33 33 }
34 34
35 35 public string Message
36 36 {
37 37 get
38 38 {
39 39 lock (m_lock)
40 40 return m_message;
41 41 }
42 42 set
43 43 {
44 44 lock (m_lock)
45 45 {
46 46 m_message = value;
47 47 OnMessageUpdated();
48 48 }
49 49 }
50 50 }
51 51
52 52 public float CurrentProgress
53 53 {
54 54 get
55 55 {
56 56 lock (m_lock)
57 57 return m_current;
58 58 }
59 59 set
60 60 {
61 61 lock (m_lock)
62 62 {
63 63 var prev = m_current;
64 64 m_current = value;
65 65 if (m_current >= m_max)
66 66 m_current = m_max;
67 67 if (m_current != prev)
68 68 OnProgressUpdated();
69 69 }
70 70 }
71 71 }
72 72
73 73 public void InitProgress(float current, float max, string message)
74 74 {
75 75 if (max < 0)
76 76 throw new ArgumentOutOfRangeException("max");
77 77 if (current < 0 || current > max)
78 78 throw new ArgumentOutOfRangeException("current");
79 79
80 80 lock(m_lock) {
81 81 m_current = current;
82 82 m_max = max;
83 83 m_message = message;
84 84 OnProgressInit();
85 85 }
86 86 }
87 87
88 88 public bool IsCancelled {
89 89 get {
90 90 lock (m_lock)
91 91 return m_cancelled;
92 92 }
93 93 }
94 94
95 95 public void Cancel() {
96 96 lock (m_lock) {
97 97 if (!m_cancelled)
98 98 m_cancelled = true;
99 99 }
100 100 }
101 101
102 public void Cancel(Exception reason) {
103 lock (m_lock) {
104 if (!m_cancelled)
105 m_cancelled = true;
106 }
107 }
108
102 109 protected virtual void OnCancelled() {
103 110 var temp = Cancelled;
104 111 if (temp != null) {
105 112 temp(this,new EventArgs());
106 113 }
107 114 }
108 115
109 116 protected virtual void OnMessageUpdated()
110 117 {
111 118 var temp = MessageUpdated;
112 119 if (temp != null)
113 120 {
114 121 temp(this, new ValueEventArgs<string>(m_message));
115 122 }
116 123 }
117 124
118 125 protected virtual void OnProgressUpdated()
119 126 {
120 127 var temp = ProgressUpdated;
121 128 if (temp != null)
122 129 {
123 130 temp(this,new ValueEventArgs<float>(m_current));
124 131 }
125 132 }
126 133
127 134 protected virtual void OnProgressInit()
128 135 {
129 136 var temp = ProgressInit;
130 137 if (temp != null)
131 138 {
132 139 temp(this, new ProgressInitEventArgs(m_current,m_max, m_message));
133 140 }
134 141 }
135 142 }
136 143 }
@@ -1,93 +1,93
1 1 using System;
2 2 using Implab.Diagnostics;
3 3 using Implab.Parallels;
4 4 using Implab;
5 5 using System.Collections.Generic;
6 6 using System.Collections.Concurrent;
7 7 using System.Threading;
8 8
9 9 namespace MonoPlay {
10 10 class MainClass {
11 11 public static void Main(string[] args) {
12 12 if (args == null)
13 13 throw new ArgumentNullException("args");
14 14
15 15 var t1 = Environment.TickCount;
16 16
17 17 const int reads = 100000;
18 18 const int writes = 1000;
19 19 const int readThreads = 8;
20 20 const int writeThreads = 0;
21 21
22 22 var l = new SharedLock();
23 23 var st = new HashSet<int>();
24 24
25 25 Action reader1 = () => {
26 26 for (int i =0; i < reads; i++) {
27 27 try {
28 28 l.LockShared();
29 29 st.Contains(i % 1000);
30 30 Thread.Sleep(0);
31 31 } finally {
32 32 l.Release();
33 33 }
34 34 }
35 35 };
36 36
37 37 Action reader2 = () => {
38 38 for(var i = 0; i < reads; i++)
39 39 lock(st) {
40 40 st.Contains(i % 1000);
41 41 Thread.Sleep(0);
42 42 }
43 43 };
44 44
45 45 Action writer1 = () => {
46 46 var rnd = new Random(Environment.TickCount);
47 47 for (int i = 0; i < writes; i++) {
48 48 try {
49 49 l.LockExclusive();
50 50 st.Add(rnd.Next(1000));
51 51 //Thread.Sleep(1);
52 52 } finally {
53 53 l.Release();
54 54 }
55 55 }
56 56 };
57 57
58 58 Action writer2 = () => {
59 59 var rnd = new Random(Environment.TickCount);
60 60 for (int i = 0; i < writes; i++) {
61 61 lock (st) {
62 62 st.Add(rnd.Next(1000));
63 63 //Thread.Sleep(1);
64 64 }
65 65 }
66 66 };
67 67
68 68
69 69
70 70 var readers = new IPromise[readThreads];
71 71 for (int i = 0; i < readThreads; i++)
72 readers[i] = AsyncPool.RunThread(reader1);
72 readers[i] = AsyncPool.RunThread(reader2);
73 73
74 74 var writers = new IPromise[writeThreads];
75 75 for (int i = 0; i < writeThreads; i++)
76 76 writers[i] = AsyncPool.RunThread(writer1);
77 77
78 78
79 79 new [] {
80 80 readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)),
81 81 writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1))
82 82 }.Bundle().Join();
83 83
84 84
85 85
86 86 var t2 = Environment.TickCount;
87 87 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
88 88
89 89 }
90 90
91 91
92 92 }
93 93 }
General Comments 0
You need to be logged in to leave comments. Login now