##// END OF EJS Templates
fixed promises cancellation
cin -
r149:eb793fbbe4ea v2
parent child
Show More
@@ -1,852 +1,850
1 1 using System;
2 2 using System.Reflection;
3 3 using System.Threading;
4 4 using Implab.Parallels;
5 5
6 6 #if MONO
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 10 using TestMethodAttribute = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
14 14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15 15
16 16 #endif
17 17
18 18 namespace Implab.Test {
19 19 [TestClass]
20 20 public class AsyncTests {
21 21 [TestMethod]
22 22 public void ResolveTest() {
23 23 int res = -1;
24 24 var p = new Promise<int>();
25 25 p.Then(x => res = x);
26 26 p.Resolve(100);
27 27
28 28 Assert.AreEqual(100, res);
29 29 }
30 30
31 31 [TestMethod]
32 32 public void RejectTest() {
33 33 int res = -1;
34 34 Exception err = null;
35 35
36 36 var p = new Promise<int>();
37 37 p.Then(
38 38 x => res = x,
39 39 e => {
40 40 err = e;
41 41 return -2;
42 42 }
43 43 );
44 44 p.Reject(new ApplicationException("error"));
45 45
46 46 Assert.AreEqual(res, -1);
47 47 Assert.AreEqual(err.Message, "error");
48 48
49 49 }
50 50
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 54 p.CancelOperation(null);
55 55
56 56 var p2 = p.Then(x => x, null, reason => {
57 57 throw new ApplicationException("CANCELLED");
58 58 });
59 59
60 60 try {
61 61 p2.Join();
62 62 Assert.Fail();
63 63 } catch (ApplicationException err) {
64 64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 65 }
66 66
67 67 }
68 68
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 72 p.CancelOperation(null);
73 73
74 74 var p2 = p
75 75 .Then(x => x, null, reason => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Then(x => x, e => true);
79 79
80 80 Assert.AreEqual(true, p2.Join());
81 81 }
82 82
83 83 [TestMethod]
84 84 public void JoinSuccessTest() {
85 85 var p = new Promise<int>();
86 86 p.Resolve(100);
87 87 Assert.AreEqual(p.Join(), 100);
88 88 }
89 89
90 90 [TestMethod]
91 91 public void JoinFailTest() {
92 92 var p = new Promise<int>();
93 93 p.Reject(new ApplicationException("failed"));
94 94
95 95 try {
96 96 p.Join();
97 97 throw new ApplicationException("WRONG!");
98 98 } catch (TargetInvocationException err) {
99 99 Assert.AreEqual(err.InnerException.Message, "failed");
100 100 } catch {
101 101 Assert.Fail("Got wrong excaption");
102 102 }
103 103 }
104 104
105 105 [TestMethod]
106 106 public void MapTest() {
107 107 var p = new Promise<int>();
108 108
109 109 var p2 = p.Then(x => x.ToString());
110 110 p.Resolve(100);
111 111
112 112 Assert.AreEqual(p2.Join(), "100");
113 113 }
114 114
115 115 [TestMethod]
116 116 public void FixErrorTest() {
117 117 var p = new Promise<int>();
118 118
119 119 var p2 = p.Then(x => x, e => 101);
120 120
121 121 p.Reject(new Exception());
122 122
123 123 Assert.AreEqual(p2.Join(), 101);
124 124 }
125 125
126 126 [TestMethod]
127 127 public void ChainTest() {
128 128 var p1 = new Promise<int>();
129 129
130 130 var p3 = p1.Chain(x => {
131 131 var p2 = new Promise<string>();
132 132 p2.Resolve(x.ToString());
133 133 return p2;
134 134 });
135 135
136 136 p1.Resolve(100);
137 137
138 138 Assert.AreEqual(p3.Join(), "100");
139 139 }
140 140
141 141 [TestMethod]
142 142 public void ChainFailTest() {
143 143 var p1 = new Promise<int>();
144 144
145 145 var p3 = p1.Chain(x => {
146 146 var p2 = new Promise<string>();
147 147 p2.Reject(new Exception("DIE!!!"));
148 148 return p2;
149 149 });
150 150
151 151 p1.Resolve(100);
152 152
153 153 Assert.IsTrue(p3.IsResolved);
154 154 }
155 155
156 156 [TestMethod]
157 157 public void PoolTest() {
158 158 var pid = Thread.CurrentThread.ManagedThreadId;
159 159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160 160
161 161 Assert.AreNotEqual(pid, p.Join());
162 162 }
163 163
164 164 [TestMethod]
165 165 public void WorkerPoolSizeTest() {
166 166 var pool = new WorkerPool(5, 10, 1);
167 167
168 168 Assert.AreEqual(5, pool.PoolSize);
169 169
170 170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173 173
174 174 Assert.AreEqual(5, pool.PoolSize);
175 175
176 176 for (int i = 0; i < 100; i++)
177 177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 178 Thread.Sleep(200);
179 179 Assert.AreEqual(10, pool.PoolSize);
180 180
181 181 pool.Dispose();
182 182 }
183 183
184 184 [TestMethod]
185 185 public void WorkerPoolCorrectTest() {
186 186 var pool = new WorkerPool(0,1000,100);
187 187
188 188 const int iterations = 1000;
189 189 int pending = iterations;
190 190 var stop = new ManualResetEvent(false);
191 191
192 192 var count = 0;
193 193 for (int i = 0; i < iterations; i++) {
194 194 pool
195 195 .Invoke(() => 1)
196 196 .Then(x => Interlocked.Add(ref count, x))
197 197 .Then(x => Math.Log10(x))
198 198 .On(() => {
199 199 Interlocked.Decrement(ref pending);
200 200 if (pending == 0)
201 201 stop.Set();
202 202 }, PromiseEventType.All);
203 203 }
204 204
205 205 stop.WaitOne();
206 206
207 207 Assert.AreEqual(iterations, count);
208 208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 209 pool.Dispose();
210 210
211 211 }
212 212
213 213 [TestMethod]
214 214 public void WorkerPoolDisposeTest() {
215 215 var pool = new WorkerPool(5, 20);
216 216 Assert.AreEqual(5, pool.PoolSize);
217 217 pool.Dispose();
218 218 Thread.Sleep(500);
219 219 Assert.AreEqual(0, pool.PoolSize);
220 220 pool.Dispose();
221 221 }
222 222
223 223 [TestMethod]
224 224 public void MTQueueTest() {
225 225 var queue = new MTQueue<int>();
226 226 int res;
227 227
228 228 queue.Enqueue(10);
229 229 Assert.IsTrue(queue.TryDequeue(out res));
230 230 Assert.AreEqual(10, res);
231 231 Assert.IsFalse(queue.TryDequeue(out res));
232 232
233 233 for (int i = 0; i < 1000; i++)
234 234 queue.Enqueue(i);
235 235
236 236 for (int i = 0; i < 1000; i++) {
237 237 queue.TryDequeue(out res);
238 238 Assert.AreEqual(i, res);
239 239 }
240 240
241 241 int writers = 0;
242 242 int readers = 0;
243 243 var stop = new ManualResetEvent(false);
244 244 int total = 0;
245 245
246 246 const int itemsPerWriter = 10000;
247 247 const int writersCount = 10;
248 248
249 249 for (int i = 0; i < writersCount; i++) {
250 250 Interlocked.Increment(ref writers);
251 251 AsyncPool
252 252 .RunThread(() => {
253 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 254 queue.Enqueue(1);
255 255 }
256 256 return 1;
257 257 })
258 258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 259 }
260 260
261 261 for (int i = 0; i < 10; i++) {
262 262 Interlocked.Increment(ref readers);
263 263 AsyncPool
264 264 .RunThread(() => {
265 265 int t;
266 266 do {
267 267 while (queue.TryDequeue(out t))
268 268 Interlocked.Add(ref total, t);
269 269 } while (writers > 0);
270 270 return 1;
271 271 })
272 272 .On(() => {
273 273 Interlocked.Decrement(ref readers);
274 274 if (readers == 0)
275 275 stop.Set();
276 276 }, PromiseEventType.All);
277 277 }
278 278
279 279 stop.WaitOne();
280 280
281 281 Assert.AreEqual(100000, total);
282 282 }
283 283
284 284 [TestMethod]
285 285 public void AsyncQueueTest() {
286 286 var queue = new AsyncQueue<int>();
287 287 int res;
288 288
289 289 queue.Enqueue(10);
290 290 Assert.IsTrue(queue.TryDequeue(out res));
291 291 Assert.AreEqual(10, res);
292 292 Assert.IsFalse(queue.TryDequeue(out res));
293 293
294 294 for (int i = 0; i < 1000; i++)
295 295 queue.Enqueue(i);
296 296
297 297 for (int i = 0; i < 1000; i++) {
298 298 queue.TryDequeue(out res);
299 299 Assert.AreEqual(i, res);
300 300 }
301 301
302 302 const int count = 10000000;
303 303
304 304 int res1 = 0, res2 = 0;
305 305 var t1 = Environment.TickCount;
306 306
307 307 AsyncPool.RunThread(
308 308 () => {
309 309 for (var i = 0; i < count; i++)
310 310 queue.Enqueue(1);
311 311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 312 },
313 313 () => {
314 314 for (var i = 0; i < count; i++)
315 315 queue.Enqueue(2);
316 316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 317 },
318 318 () => {
319 319 int temp;
320 320 int i = 0;
321 321 while (i < count)
322 322 if (queue.TryDequeue(out temp)) {
323 323 i++;
324 324 res1 += temp;
325 325 }
326 326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 327 },
328 328 () => {
329 329 int temp;
330 330 int i = 0;
331 331 while (i < count)
332 332 if (queue.TryDequeue(out temp)) {
333 333 i++;
334 334 res2 += temp;
335 335 }
336 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 337 }
338 338 )
339 339 .Bundle()
340 340 .Join();
341 341
342 342 Assert.AreEqual(count * 3, res1 + res2);
343 343
344 344 Console.WriteLine(
345 345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 346 Environment.TickCount - t1,
347 347 res1,
348 348 res2,
349 349 res1 + res2,
350 350 count
351 351 );
352 352 }
353 353
354 354 [TestMethod]
355 355 public void AsyncQueueBatchTest() {
356 356 var queue = new AsyncQueue<int>();
357 357
358 358 const int wBatch = 29;
359 359 const int wCount = 400000;
360 360 const int total = wBatch * wCount * 2;
361 361 const int summ = wBatch * wCount * 3;
362 362
363 363 int r1 = 0, r2 = 0;
364 364 const int rBatch = 111;
365 365 int read = 0;
366 366
367 367 var t1 = Environment.TickCount;
368 368
369 369 AsyncPool.RunThread(
370 370 () => {
371 371 var buffer = new int[wBatch];
372 372 for(int i = 0; i<wBatch; i++)
373 373 buffer[i] = 1;
374 374
375 375 for(int i =0; i < wCount; i++)
376 376 queue.EnqueueRange(buffer,0,wBatch);
377 377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 378 },
379 379 () => {
380 380 var buffer = new int[wBatch];
381 381 for(int i = 0; i<wBatch; i++)
382 382 buffer[i] = 2;
383 383
384 384 for(int i =0; i < wCount; i++)
385 385 queue.EnqueueRange(buffer,0,wBatch);
386 386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 387 },
388 388 () => {
389 389 var buffer = new int[rBatch];
390 390
391 391 while(read < total) {
392 392 int actual;
393 393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 394 for(int i=0; i< actual; i++)
395 395 r1 += buffer[i];
396 396 Interlocked.Add(ref read, actual);
397 397 }
398 398 }
399 399
400 400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 401 },
402 402 () => {
403 403 var buffer = new int[rBatch];
404 404
405 405 while(read < total) {
406 406 int actual;
407 407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 408 for(int i=0; i< actual; i++)
409 409 r2 += buffer[i];
410 410 Interlocked.Add(ref read, actual);
411 411 }
412 412 }
413 413
414 414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 415 }
416 416 )
417 417 .Bundle()
418 418 .Join();
419 419
420 420 Assert.AreEqual(summ , r1 + r2);
421 421
422 422 Console.WriteLine(
423 423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 424 Environment.TickCount - t1,
425 425 r1,
426 426 r2,
427 427 r1 + r2,
428 428 total
429 429 );
430 430 }
431 431
432 432 [TestMethod]
433 433 public void AsyncQueueChunkDequeueTest() {
434 434 var queue = new AsyncQueue<int>();
435 435
436 436 const int wBatch = 31;
437 437 const int wCount = 200000;
438 438 const int total = wBatch * wCount * 3;
439 439 const int summ = wBatch * wCount * 6;
440 440
441 441 int r1 = 0, r2 = 0;
442 442 const int rBatch = 1024;
443 443 int read = 0;
444 444
445 445 var t1 = Environment.TickCount;
446 446
447 447 AsyncPool.RunThread(
448 448 () => {
449 449 var buffer = new int[wBatch];
450 450 for(int i = 0; i<wBatch; i++)
451 451 buffer[i] = 1;
452 452
453 453 for(int i =0; i < wCount; i++)
454 454 queue.EnqueueRange(buffer,0,wBatch);
455 455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 456 },
457 457 () => {
458 458 var buffer = new int[wBatch];
459 459 for(int i = 0; i<wBatch; i++)
460 460 buffer[i] = 2;
461 461
462 462 for(int i =0; i < wCount; i++)
463 463 queue.EnqueueRange(buffer,0,wBatch);
464 464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 465 },
466 466 () => {
467 467 var buffer = new int[wBatch];
468 468 for(int i = 0; i<wBatch; i++)
469 469 buffer[i] = 3;
470 470
471 471 for(int i =0; i < wCount; i++)
472 472 queue.EnqueueRange(buffer,0,wBatch);
473 473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 474 },
475 475 () => {
476 476 var buffer = new int[rBatch];
477 477 int count = 1;
478 478 double avgchunk = 0;
479 479 while(read < total) {
480 480 int actual;
481 481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 482 for(int i=0; i< actual; i++)
483 483 r2 += buffer[i];
484 484 Interlocked.Add(ref read, actual);
485 485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 486 count ++;
487 487 }
488 488 }
489 489
490 490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 491 }
492 492 )
493 493 .Bundle()
494 494 .Join();
495 495
496 496 Assert.AreEqual(summ , r1 + r2);
497 497
498 498 Console.WriteLine(
499 499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 500 Environment.TickCount - t1,
501 501 r1,
502 502 r2,
503 503 r1 + r2,
504 504 total
505 505 );
506 506 }
507 507
508 508 [TestMethod]
509 509 public void AsyncQueueDrainTest() {
510 510 var queue = new AsyncQueue<int>();
511 511
512 512 const int wBatch = 11;
513 513 const int wCount = 200000;
514 514 const int total = wBatch * wCount * 3;
515 515 const int summ = wBatch * wCount * 3;
516 516
517 517 int r1 = 0, r2 = 0;
518 518 const int rBatch = 11;
519 519 int read = 0;
520 520
521 521 var t1 = Environment.TickCount;
522 522
523 523 AsyncPool.RunThread(
524 524 () => {
525 525 var buffer = new int[wBatch];
526 526 for(int i = 0; i<wBatch; i++)
527 527 buffer[i] = 1;
528 528
529 529 for(int i =0; i < wCount; i++)
530 530 queue.EnqueueRange(buffer,0,wBatch);
531 531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 532 },
533 533 () => {
534 534 for(int i =0; i < wCount * wBatch; i++)
535 535 queue.Enqueue(1);
536 536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 537 },
538 538 () => {
539 539 var buffer = new int[wBatch];
540 540 for(int i = 0; i<wBatch; i++)
541 541 buffer[i] = 1;
542 542
543 543 for(int i =0; i < wCount; i++)
544 544 queue.EnqueueRange(buffer,0,wBatch);
545 545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 546 },
547 547 /*() => {
548 548 int temp;
549 549 int count = 0;
550 550 while (read < total)
551 551 if (queue.TryDequeue(out temp)) {
552 552 count++;
553 553 r1 += temp;
554 554 Interlocked.Increment(ref read);
555 555 }
556 556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 557 },*/
558 558 /*() => {
559 559 var buffer = new int[rBatch];
560 560 var count = 0;
561 561 while(read < total) {
562 562 int actual;
563 563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 564 for(int i=0; i< actual; i++)
565 565 r1 += buffer[i];
566 566 Interlocked.Add(ref read, actual);
567 567 count += actual;
568 568 }
569 569 }
570 570
571 571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 572 },*/
573 573 () => {
574 574 var count = 0;
575 575 while(read < total) {
576 576 var buffer = queue.Drain();
577 577 for(int i=0; i< buffer.Length; i++)
578 578 r1 += buffer[i];
579 579 Interlocked.Add(ref read, buffer.Length);
580 580 count += buffer.Length;
581 581 }
582 582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 583 },
584 584 () => {
585 585 var count = 0;
586 586 while(read < total) {
587 587 var buffer = queue.Drain();
588 588 for(int i=0; i< buffer.Length; i++)
589 589 r2 += buffer[i];
590 590 Interlocked.Add(ref read, buffer.Length);
591 591 count += buffer.Length;
592 592 }
593 593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 594 }
595 595 )
596 596 .Bundle()
597 597 .Join();
598 598
599 599 Assert.AreEqual(summ , r1 + r2);
600 600
601 601 Console.WriteLine(
602 602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
603 603 Environment.TickCount - t1,
604 604 r1,
605 605 r2,
606 606 r1 + r2,
607 607 total
608 608 );
609 609 }
610 610
611 611 [TestMethod]
612 612 public void ParallelMapTest() {
613 613
614 614 const int count = 100000;
615 615
616 616 var args = new double[count];
617 617 var rand = new Random();
618 618
619 619 for (int i = 0; i < count; i++)
620 620 args[i] = rand.NextDouble();
621 621
622 622 var t = Environment.TickCount;
623 623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
624 624
625 625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
626 626
627 627 t = Environment.TickCount;
628 628 for (int i = 0; i < count; i++)
629 629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
630 630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
631 631 }
632 632
633 633 [TestMethod]
634 634 public void ChainedMapTest() {
635 635
636 636 using (var pool = new WorkerPool()) {
637 637 const int count = 10000;
638 638
639 639 var args = new double[count];
640 640 var rand = new Random();
641 641
642 642 for (int i = 0; i < count; i++)
643 643 args[i] = rand.NextDouble();
644 644
645 645 var t = Environment.TickCount;
646 646 var res = args
647 647 .ChainedMap(
648 648 // Analysis disable once AccessToDisposedClosure
649 649 x => pool.Invoke(
650 650 () => Math.Sin(x * x)
651 651 ),
652 652 4
653 653 )
654 654 .Join();
655 655
656 656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
657 657
658 658 t = Environment.TickCount;
659 659 for (int i = 0; i < count; i++)
660 660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
661 661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
662 662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
663 663 }
664 664 }
665 665
666 666 [TestMethod]
667 667 public void ParallelForEachTest() {
668 668
669 669 const int count = 100000;
670 670
671 671 var args = new int[count];
672 672 var rand = new Random();
673 673
674 674 for (int i = 0; i < count; i++)
675 675 args[i] = (int)(rand.NextDouble() * 100);
676 676
677 677 int result = 0;
678 678
679 679 var t = Environment.TickCount;
680 680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
681 681
682 682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
683 683
684 684 int result2 = 0;
685 685
686 686 t = Environment.TickCount;
687 687 for (int i = 0; i < count; i++)
688 688 result2 += args[i];
689 689 Assert.AreEqual(result2, result);
690 690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
691 691 }
692 692
693 693 [TestMethod]
694 694 public void ComplexCase1Test() {
695 695 var flags = new bool[3];
696 696
697 697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
698 698
699 699 var step1 = PromiseHelper
700 700 .Sleep(200, "Alan")
701 701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
702 702 var p = step1
703 703 .Chain(x =>
704 704 PromiseHelper
705 705 .Sleep(200, "Hi, " + x)
706 706 .Then(y => y)
707 707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
708 708 )
709 709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
710 710 step1.Join();
711 711 p.Cancel();
712 712 try {
713 713 Assert.AreEqual(p.Join(), "Hi, Alan");
714 714 Assert.Fail("Shouldn't get here");
715 715 } catch (OperationCanceledException) {
716 716 }
717 717
718 718 Assert.IsFalse(flags[0]);
719 719 Assert.IsTrue(flags[1]);
720 720 Assert.IsTrue(flags[2]);
721 721 }
722 722
723 723 [TestMethod]
724 724 public void ChainedCancel1Test() {
725 725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
726 726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
727 727 var p = PromiseHelper
728 728 .Sleep(1, "Hi, HAL!")
729 729 .Then(x => {
730 730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
731 731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
732 732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
733 733 PromiseHelper
734 734 .Sleep(100, "HAL, STOP!")
735 735 .Then(result.Cancel);
736 736 return result;
737 737 });
738 738 try {
739 739 p.Join();
740 740 } catch (TargetInvocationException err) {
741 741 Assert.IsTrue(err.InnerException is OperationCanceledException);
742 742 }
743 743 }
744 744
745 745 [TestMethod]
746 746 public void ChainedCancel2Test() {
747 747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
748 748 var pSurvive = new Promise<bool>();
749 var hemStarted = new ManualResetEvent(false);
749 var hemStarted = new Signal();
750 750 var p = PromiseHelper
751 751 .Sleep(1, "Hi, HAL!")
752 .Chain(x => {
752 .Chain(() => {
753 753 hemStarted.Set();
754 754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
755 755 var result = PromiseHelper
756 .Sleep(100000000, "HEM ENABLED!!!")
757 .Then(s => {
758 pSurvive.Resolve(false);
759 return s;
760 });
756 .Sleep(2000, "HEM ENABLED!!!")
757 .Then(() => pSurvive.Resolve(false));
761 758
762 759 result
763 760 .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
764
761
765 762 return result;
766 763 });
767 764
768 hemStarted.WaitOne();
765 hemStarted.Wait();
769 766 p.Cancel();
770 767
771 768 try {
772 769 p.Join();
770 Assert.Fail();
773 771 } catch (OperationCanceledException) {
774 Assert.IsTrue(pSurvive.Join());
775 772 }
773 Assert.IsTrue(pSurvive.Join());
776 774 }
777 775
778 776 [TestMethod]
779 777 public void SharedLockTest() {
780 778 var l = new SharedLock();
781 779 int shared = 0;
782 780 int exclusive = 0;
783 781 var s1 = new Signal();
784 782 var log = new AsyncQueue<string>();
785 783
786 784 try {
787 785 AsyncPool.RunThread(
788 786 () => {
789 787 log.Enqueue("Reader #1 started");
790 788 try {
791 789 l.LockShared();
792 790 log.Enqueue("Reader #1 lock got");
793 791 if (Interlocked.Increment(ref shared) == 2)
794 792 s1.Set();
795 793 s1.Wait();
796 794 log.Enqueue("Reader #1 finished");
797 795 Interlocked.Decrement(ref shared);
798 796 } finally {
799 797 l.Release();
800 798 log.Enqueue("Reader #1 lock released");
801 799 }
802 800 },
803 801 () => {
804 802 log.Enqueue("Reader #2 started");
805 803
806 804 try {
807 805 l.LockShared();
808 806 log.Enqueue("Reader #2 lock got");
809 807
810 808 if (Interlocked.Increment(ref shared) == 2)
811 809 s1.Set();
812 810 s1.Wait();
813 811 log.Enqueue("Reader #2 upgrading to writer");
814 812 Interlocked.Decrement(ref shared);
815 813 l.Upgrade();
816 814 log.Enqueue("Reader #2 upgraded");
817 815
818 816 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
819 817 Assert.AreEqual(0, shared);
820 818 log.Enqueue("Reader #2 finished");
821 819 Interlocked.Decrement(ref exclusive);
822 820 } finally {
823 821 l.Release();
824 822 log.Enqueue("Reader #2 lock released");
825 823 }
826 824 },
827 825 () => {
828 826 log.Enqueue("Writer #1 started");
829 827 try {
830 828 l.LockExclusive();
831 829 log.Enqueue("Writer #1 got the lock");
832 830 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
833 831 Interlocked.Decrement(ref exclusive);
834 832 log.Enqueue("Writer #1 is finished");
835 833 } finally {
836 834 l.Release();
837 835 log.Enqueue("Writer #1 lock released");
838 836 }
839 837 }
840 838 ).Bundle().Join(1000);
841 839 log.Enqueue("Done");
842 840 } catch(Exception error) {
843 841 log.Enqueue(error.Message);
844 842 throw;
845 843 } finally {
846 844 foreach (var m in log)
847 845 Console.WriteLine(m);
848 846 }
849 847 }
850 848 }
851 849 }
852 850
@@ -1,144 +1,148
1 1 using System;
2 2 using Implab.Parallels;
3 3
4 4 #if MONO
5 5
6 6 using NUnit.Framework;
7 7 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
8 8 using TestMethodAttribute = NUnit.Framework.TestAttribute;
9 9
10 10 #else
11 11
12 12 using Microsoft.VisualStudio.TestTools.UnitTesting;
13 13
14 14 #endif
15 15
16 16 namespace Implab.Test {
17 17 [TestClass]
18 18 public class CancelationTests {
19 19
20 20 [TestMethod]
21 21 public void PromiseCancelTest() {
22 22 var p = new Promise();
23 23 bool requested = false;
24 24 var reason = new Exception("Test");
25 25
26 26 // request cancelation
27 27 p.Cancel(reason);
28 28
29 29 Assert.IsTrue(p.IsCancellationRequested);
30 30 Assert.AreSame(reason, p.CancellationReason);
31 31 Assert.IsFalse(p.IsCancelled);
32 32
33 33 p.CancellationRequested(r => {
34 34 Assert.AreSame(reason, r);
35 35 requested = true;
36 36 });
37 37
38 38 Assert.IsTrue(requested);
39 39
40 40 // cancel the promise
41 41 Assert.IsTrue(p.CancelOperationIfRequested());
42 42 Assert.IsTrue(p.IsCancelled);
43 43 Assert.AreSame(reason, p.Error);
44 44 }
45 45
46 46 [TestMethod]
47 47 public void CancelActionBeforeStartTask() {
48 48 bool run = false;
49 49 var task = new ActionTask(() => {
50 50 run = true;
51 }, null, null);
51 }, null, null, true);
52 52
53 53 // request cancelation
54 54 task.Cancel();
55 55 Assert.IsTrue(task.IsCancelled);
56 56 task.Resolve();
57 57 Assert.IsFalse(run);
58 58 }
59 59
60 60 [TestMethod]
61 61 public void CancelActionAfterTaskStarted() {
62 62 var finish = new Signal();
63 63 var started = new Signal();
64 64
65 65 var task = new ActionTask(() => {
66 66 started.Set();
67 67 finish.Wait();
68 }, null, null);
68 }, null, null, true);
69 69
70 70 AsyncPool.RunThread(() => {
71 71 task.Resolve();
72 72 });
73 73
74 74 started.Wait(1000);
75 75
76 76 task.Cancel();
77 77 Assert.IsTrue(task.IsCancellationRequested);
78 78 Assert.IsFalse(task.IsCancelled);
79 79 Assert.IsFalse(task.IsResolved);
80 80
81 81 finish.Set();
82 82 task.Join(1000);
83 83
84 84 }
85 85
86 86 [TestMethod]
87 87 public void CancelTaskChainFromBottom() {
88 var started = new Signal();
88 89 var check1 = new Signal();
89 90 var requested = false;
90 91 var p1 = AsyncPool.RunThread(token => {
91 92 token.CancellationRequested(reason => requested = true);
93 started.Set();
92 94 check1.Wait();
93 95 token.CancelOperationIfRequested();
94 96 });
95 97
98 started.Wait();
99
96 100 var p2 = p1.Then(() => {
97 101 });
98 102
99 103 Assert.IsFalse(p1.IsResolved);
100 104 Assert.IsFalse(p2.IsResolved);
101 105
102 106 p2.Cancel();
103 107
104 108 Assert.IsFalse(p2.IsCancelled);
105 109 Assert.IsFalse(p1.IsCancelled);
106 110 Assert.IsTrue(requested);
107 111
108 112 check1.Set();
109 113
110 114 try {
111 115 p2.Join(1000);
112 116 Assert.Fail("The chain isn't cancelled");
113 117 } catch(OperationCanceledException){
114 118 }
115 119
116 120 Assert.IsTrue(p1.IsCancelled);
117 121 Assert.IsTrue(p2.IsCancelled);
118 122 }
119 123
120 124
121 125
122 126 [TestMethod]
123 127 public void CancellableAsyncTask() {
124 128 var finish = new Signal();
125 129 var started = new Signal();
126 130
127 131 var p = AsyncPool.RunThread(token => {
128 132 token.CancellationRequested(r => finish.Set());
129 133 started.Set();
130 134 finish.Wait();
131 135 Assert.IsTrue(token.CancelOperationIfRequested());
132 136 });
133 137
134 138 started.Wait(1000);
135 139 Assert.IsFalse(p.IsResolved);
136 140 p.Cancel();
137 141 try {
138 142 p.Join(1000);
139 143 } catch (OperationCanceledException) {
140 144 }
141 145 }
142 146 }
143 147 }
144 148
@@ -1,13 +1,14
1 1 using Implab.Parallels;
2 2 using System.Threading;
3 3
4 4 namespace Implab.Test {
5 5 static class PromiseHelper {
6 6 public static IPromise<T> Sleep<T>(int timeout, T retVal) {
7 return AsyncPool.Invoke(() => {
7 return AsyncPool.Invoke((ct) => {
8 ct.CancellationRequested(ct.CancelOperation);
8 9 Thread.Sleep(timeout);
9 10 return retVal;
10 11 });
11 12 }
12 13 }
13 14 }
@@ -1,23 +1,25
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public class ActionChainTask : ActionChainTaskBase, IDeferred {
5 5 readonly Func<IPromise> m_task;
6 6
7 public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
7 public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
8 8 m_task = task;
9 9 }
10 10
11 11 public void Resolve() {
12 12 if (m_task != null && LockCancelation()) {
13 13 try {
14 Observe(m_task());
14 var p = m_task();
15 p.On(SetResult, HandleErrorInternal, SetCancelled);
16 CancellationRequested(p.Cancel);
15 17 } catch(Exception err) {
16 18 HandleErrorInternal(err);
17 19 }
18 20 }
19 21 }
20 22
21 23 }
22 24 }
23 25
@@ -1,62 +1,58
1 1 using System;
2 2 using System.Threading;
3 3
4 4 namespace Implab {
5 5 public class ActionChainTaskBase : AbstractPromise {
6 6 readonly Func<Exception, IPromise> m_error;
7 7 readonly Func<Exception, IPromise> m_cancel;
8 8
9 9 int m_cancelationLock;
10 10
11 protected ActionChainTaskBase( Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) {
11 protected ActionChainTaskBase(Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) {
12 12 m_error = error;
13 13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 16 }
15 17
16 18 public void Reject(Exception error) {
17 19 if (LockCancelation())
18 20 HandleErrorInternal(error);
19 21 }
20 22
21 23
22 24
23 25 public override void CancelOperation(Exception reason) {
24 if (m_cancel != null && LockCancelation()) {
25 try {
26 Observe(m_cancel(reason));
27 } catch(Exception err) {
28 HandleErrorInternal(err);
26 if (LockCancelation()) {
27 if (m_cancel != null) {
28 try {
29 m_cancel(reason).On(SetResult, SetError, SetCancelled);
30 } catch (Exception err) {
31 HandleErrorInternal(err);
32 }
33 } else {
34 SetCancelled(reason);
29 35 }
30 36 }
31
32 37 }
33 38
34 39 protected void HandleErrorInternal(Exception error) {
35 40 if (m_error != null) {
36 41 try {
37 Observe(m_error(error));
38 } catch(Exception err) {
42 var p = m_error(error);
43 p.On(SetResult,SetError,SetCancelled);
44 CancellationRequested(p.Cancel);
45 } catch (Exception err) {
39 46 SetError(err);
40 47 }
41 48 } else {
42 49 SetError(error);
43 50 }
44 51 }
45 52
46 protected void Observe(IPromise operation) {
47 if (operation == null)
48 throw new NullReferenceException("The task returned null promise");
49
50 // pass operation results to the current promise
51 operation.On(SetResult, SetError, SetCancelled);
52
53 // pass the cancelation request
54 CancellationRequested(operation.Cancel);
55 }
56
57 53 protected bool LockCancelation() {
58 54 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
59 55 }
60 56 }
61 57 }
62 58
@@ -1,23 +1,25
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public class ActionChainTask<T> : ActionChainTaskBase, IDeferred<T> {
5 5 readonly Func<T, IPromise> m_task;
6 6
7 public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
7 public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
8 8 m_task = task;
9 9 }
10 10
11 11 public void Resolve(T value) {
12 12 if (m_task != null && LockCancelation()) {
13 13 try {
14 Observe(m_task(value));
14 var p = m_task(value);
15 p.On(SetResult, HandleErrorInternal, SetCancelled);
16 CancellationRequested(p.Cancel);
15 17 } catch(Exception err) {
16 18 HandleErrorInternal(err);
17 19 }
18 20 }
19 21 }
20 22
21 23 }
22 24 }
23 25
@@ -1,22 +1,22
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public class ActionTask : ActionTaskBase, IDeferred {
5 5 readonly Action m_task;
6 public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
6 public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
7 7 m_task = task;
8 8 }
9 9
10 10 public void Resolve() {
11 11 if (m_task != null && LockCancelation()) {
12 12 try {
13 13 m_task();
14 14 SetResult();
15 15 } catch(Exception err) {
16 16 HandleErrorInternal(err);
17 17 }
18 18 }
19 19 }
20 20 }
21 21 }
22 22
@@ -1,55 +1,57
1 1 using System;
2 2 using System.Threading;
3 3
4 4 namespace Implab {
5 5 public class ActionTaskBase : AbstractPromise {
6 6 readonly Action<Exception> m_cancel;
7 7 readonly Action<Exception> m_error;
8 8
9 9 int m_cancelationLock;
10 10
11 protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel) {
11 protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel, bool autoCancellable) {
12 12 m_error = error;
13 13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 16 }
15 17
16 18 public void Reject(Exception error) {
17 19 Safe.ArgumentNotNull(error, "error");
18 20 if (LockCancelation())
19 21 HandleErrorInternal(error);
20 22 }
21 23
22 24 protected void HandleErrorInternal(Exception error) {
23 25 if (m_error != null) {
24 26 try {
25 27 m_error(error);
26 28 SetResult();
27 29 } catch(Exception err) {
28 30 SetError(err);
29 31 }
30 32 } else {
31 33 SetError(error);
32 34 }
33 35 }
34 36
35 37 public override void CancelOperation(Exception reason) {
36 38 if (LockCancelation()) {
37 39 if (m_cancel != null) {
38 40 try {
39 41 m_cancel(reason);
40 42 SetResult();
41 43 } catch (Exception err) {
42 44 HandleErrorInternal(err);
43 45 }
44 46 } else {
45 47 SetCancelled(reason);
46 48 }
47 49 }
48 50 }
49 51
50 52 protected bool LockCancelation() {
51 53 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
52 54 }
53 55 }
54 56 }
55 57
@@ -1,22 +1,22
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public class ActionTask<T> : ActionTaskBase, IDeferred<T> {
5 5 readonly Action<T> m_task;
6 public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
6 public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
7 7 m_task = task;
8 8 }
9 9
10 10 public void Resolve(T value) {
11 11 if (m_task != null && LockCancelation()) {
12 12 try {
13 13 m_task(value);
14 14 SetResult();
15 15 } catch(Exception err) {
16 16 HandleErrorInternal(err);
17 17 }
18 18 }
19 19 }
20 20 }
21 21 }
22 22
@@ -1,21 +1,24
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public class FuncChainTask<TResult> : FuncChainTaskBase<TResult>, IDeferred {
5 5 readonly Func<IPromise<TResult>> m_task;
6 6
7 public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
7 public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable)
8 : base(error, cancel, autoCancellable) {
8 9 m_task = task;
9 10 }
10 11
11 12 public void Resolve() {
12 13 if (m_task != null && LockCancelation()) {
13 14 try {
14 Observe(m_task());
15 var operation = m_task();
16 operation.On(SetResult, HandleErrorInternal, SetCancelled);
17 CancellationRequested(operation.Cancel);
15 18 } catch (Exception err) {
16 19 HandleErrorInternal(err);
17 20 }
18 21 }
19 22 }
20 23 }
21 24 } No newline at end of file
@@ -1,60 +1,58
1 1 using System;
2 2 using System.Threading;
3 3
4 4 namespace Implab {
5 5 public class FuncChainTaskBase<TResult> : AbstractPromise<TResult> {
6 6 readonly Func<Exception, IPromise<TResult>> m_error;
7 7 readonly Func<Exception, IPromise<TResult>> m_cancel;
8 8
9 9 int m_cancelationLock;
10 10
11 protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) {
11 protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) {
12 12 m_error = error;
13 13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 16 }
15 17
16 18 public void Reject(Exception error) {
17 19 if (LockCancelation())
18 20 HandleErrorInternal(error);
19 21 }
20 22
21 23 public override void CancelOperation(Exception reason) {
22 if (m_cancel != null && LockCancelation()) {
23 try {
24 Observe(m_cancel(reason));
25 } catch(Exception err) {
26 HandleErrorInternal(err);
24 if (LockCancelation()) {
25 if (m_cancel != null) {
26 try {
27 m_cancel(reason).On(SetResult, HandleErrorInternal, SetCancelled);
28 } catch (Exception err) {
29 HandleErrorInternal(err);
30 }
31 } else {
32 SetCancelled(reason);
27 33 }
28 34 }
29 35
30 36 }
31 37
32 38 protected void HandleErrorInternal(Exception error) {
33 39 if (m_error != null) {
34 40 try {
35 Observe(m_error(error));
41 var operation = m_error(error);
42
43 operation.On(SetResult, SetError, SetCancelled);
44 CancellationRequested(operation.Cancel);
36 45 } catch(Exception err) {
37 46 SetError(err);
38 47 }
39 48 } else {
40 49 SetError(error);
41 50 }
42 51 }
43 52
44 protected void Observe(IPromise<TResult> operation) {
45 if (operation == null)
46 throw new NullReferenceException("The task returned null promise");
47
48 // pass operation results to the current promise
49 operation.On(SetResult, SetError, SetCancelled);
50
51 // pass the cancelation request
52 CancellationRequested(operation.Cancel);
53 }
54
55 53 protected bool LockCancelation() {
56 54 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
57 55 }
58 56 }
59 57 }
60 58
@@ -1,21 +1,23
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public class FuncChainTask<TArg,TResult> : FuncChainTaskBase<TResult>, IDeferred<TArg> {
5 5 readonly Func<TArg, IPromise<TResult>> m_task;
6 6
7 public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
7 public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) : base(error, cancel, autoCancellable){
8 8 m_task = task;
9 9 }
10 10
11 11 public void Resolve(TArg value) {
12 12 if (m_task != null && LockCancelation()) {
13 13 try {
14 Observe(m_task(value));
14 var operation = m_task(value);
15 operation.On(SetResult, HandleErrorInternal, SetCancelled);
16 CancellationRequested(operation.Cancel);
15 17 } catch (Exception err) {
16 18 HandleErrorInternal(err);
17 19 }
18 20 }
19 21 }
20 22 }
21 23 } No newline at end of file
@@ -1,23 +1,23
1 1 using System;
2 2 using System.Threading;
3 3
4 4 namespace Implab {
5 5 public class FuncTask<T> : FuncTaskBase<T>, IDeferred {
6 6 readonly Func<T> m_task;
7 7
8 public FuncTask(Func<T> task, Func<Exception, T> error, Func<Exception, T> cancel) : base(error,cancel) {
8 public FuncTask(Func<T> task, Func<Exception, T> error, Func<Exception, T> cancel, bool autoCancellable) : base(error, cancel, autoCancellable) {
9 9 m_task = task;
10 10 }
11 11
12 12 public void Resolve() {
13 13 if (m_task != null && LockCancelation()) {
14 14 try {
15 15 SetResult(m_task());
16 16 } catch(Exception err) {
17 17 HandleErrorInternal(err);
18 18 }
19 19 }
20 20 }
21 21 }
22 22 }
23 23
@@ -1,53 +1,55
1 1 using System;
2 2 using System.Threading;
3 3
4 4 namespace Implab {
5 5 public class FuncTaskBase<TResult> : AbstractPromise<TResult> {
6 6 readonly Func<Exception, TResult> m_cancel;
7 7 readonly Func<Exception, TResult> m_error;
8 8
9 9 int m_cancelationLock;
10 10
11 protected FuncTaskBase( Func<Exception, TResult> error, Func<Exception, TResult> cancel) {
11 protected FuncTaskBase( Func<Exception, TResult> error, Func<Exception, TResult> cancel, bool autoCancellable) {
12 12 m_error = error;
13 13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 16 }
15 17
16 18 public void Reject(Exception error) {
17 19 Safe.ArgumentNotNull(error, "error");
18 20 if (LockCancelation())
19 21 HandleErrorInternal(error);
20 22 }
21 23
22 24 protected void HandleErrorInternal(Exception error) {
23 25 if (m_error != null) {
24 26 try {
25 27 SetResult(m_error(error));
26 28 } catch(Exception err) {
27 29 SetError(err);
28 30 }
29 31 } else {
30 32 SetError(error);
31 33 }
32 34 }
33 35
34 36 public override void CancelOperation(Exception reason) {
35 37 if (LockCancelation()) {
36 38 if (m_cancel != null) {
37 39 try {
38 40 SetResult(m_cancel(reason));
39 41 } catch (Exception err) {
40 42 HandleErrorInternal(err);
41 43 }
42 44 } else {
43 45 SetCancelled(reason);
44 46 }
45 47 }
46 48 }
47 49
48 50 protected bool LockCancelation() {
49 51 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
50 52 }
51 53 }
52 54 }
53 55
@@ -1,22 +1,22
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public class FuncTask<TArg, TResult> : FuncTaskBase<TResult>, IDeferred<TArg> {
5 5 readonly Func<TArg, TResult> m_task;
6 6
7 public FuncTask(Func<TArg, TResult> task, Func<Exception, TResult> error,Func<Exception, TResult> cancel) : base(error,cancel) {
7 public FuncTask(Func<TArg, TResult> task, Func<Exception, TResult> error,Func<Exception, TResult> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
8 8 m_task = task;
9 9 }
10 10
11 11 public void Resolve(TArg value) {
12 12 if (m_task != null && LockCancelation()) {
13 13 try {
14 14 SetResult(m_task(value));
15 15 } catch (Exception err) {
16 16 HandleErrorInternal(err);
17 17 }
18 18 }
19 19 }
20 20 }
21 21 }
22 22
@@ -1,82 +1,152
1 1 using System;
2 2 using System.Threading;
3 3 using System.Diagnostics;
4 4 using Implab.Diagnostics;
5 5
6 6 namespace Implab.Parallels {
7 7 public class WorkerPool : DispatchPool<Action> {
8 8
9 9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
10 10 int m_queueLength;
11 11 readonly int m_threshold = 1;
12 12
13 13 public WorkerPool(int minThreads, int maxThreads, int threshold)
14 14 : base(minThreads, maxThreads) {
15 15 m_threshold = threshold;
16 16 InitPool();
17 17 }
18 18
19 19 public WorkerPool(int minThreads, int maxThreads) :
20 20 base(minThreads, maxThreads) {
21 21 InitPool();
22 22 }
23 23
24 24 public WorkerPool(int threads)
25 25 : base(threads) {
26 26 InitPool();
27 27 }
28 28
29 29 public WorkerPool() {
30 30 InitPool();
31 31 }
32 32
33 public Promise<T> Invoke<T>(Func<T> task) {
33 public IPromise<T> Invoke<T>(Func<T> task) {
34 if (task == null)
35 throw new ArgumentNullException("task");
36 if (IsDisposed)
37 throw new ObjectDisposedException(ToString());
38
39 var promise = new FuncTask<T>(task, null, null, true);
40
41 var lop = TraceContext.Instance.CurrentOperation;
42
43 EnqueueTask(delegate {
44 TraceContext.Instance.EnterLogicalOperation(lop, false);
45
46 promise.Resolve();
47
48 TraceContext.Instance.Leave();
49 });
50
51 return promise;
52 }
53
54 public IPromise Invoke(Action task) {
55 if (task == null)
56 throw new ArgumentNullException("task");
57 if (IsDisposed)
58 throw new ObjectDisposedException(ToString());
59
60 var promise = new ActionTask(task, null, null, true);
61
62 var lop = TraceContext.Instance.CurrentOperation;
63
64 EnqueueTask(delegate {
65 TraceContext.Instance.EnterLogicalOperation(lop, false);
66
67 promise.Resolve();
68
69 TraceContext.Instance.Leave();
70 });
71
72 return promise;
73 }
74
75 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
34 76 if (task == null)
35 77 throw new ArgumentNullException("task");
36 78 if (IsDisposed)
37 79 throw new ObjectDisposedException(ToString());
38 80
39 81 var promise = new Promise<T>();
40 82
41 83 var lop = TraceContext.Instance.CurrentOperation;
42 84
43 85 EnqueueTask(delegate {
44 86 TraceContext.Instance.EnterLogicalOperation(lop, false);
45 87 try {
46 promise.Resolve(task());
88 if (!promise.CancelOperationIfRequested())
89 promise.Resolve(task(promise));
90 } catch (Exception e) {
91 promise.Reject(e);
92 } finally {
93 TraceContext.Instance.Leave();
94 }
95 });
96
97 return promise;
98 }
99
100 public IPromise Invoke<T>(Action<ICancellationToken> task) {
101 if (task == null)
102 throw new ArgumentNullException("task");
103 if (IsDisposed)
104 throw new ObjectDisposedException(ToString());
105
106 var promise = new Promise();
107
108 var lop = TraceContext.Instance.CurrentOperation;
109
110 EnqueueTask(delegate {
111 TraceContext.Instance.EnterLogicalOperation(lop, false);
112 try {
113 if (!promise.CancelOperationIfRequested()) {
114 task(promise);
115 promise.Resolve();
116 }
47 117 } catch (Exception e) {
48 118 promise.Reject(e);
49 119 } finally {
50 120 TraceContext.Instance.Leave();
51 121 }
52 122 });
53 123
54 124 return promise;
55 125 }
56 126
57 127 protected void EnqueueTask(Action unit) {
58 128 Debug.Assert(unit != null);
59 129 var len = Interlocked.Increment(ref m_queueLength);
60 130 m_queue.Enqueue(unit);
61 131
62 132 if (len > m_threshold * PoolSize) {
63 133 StartWorker();
64 134 }
65 135
66 136 SignalThread();
67 137 }
68 138
69 139 protected override bool TryDequeue(out Action unit) {
70 140 if (m_queue.TryDequeue(out unit)) {
71 141 Interlocked.Decrement(ref m_queueLength);
72 142 return true;
73 143 }
74 144 return false;
75 145 }
76 146
77 147 protected override void InvokeUnit(Action unit) {
78 148 unit();
79 149 }
80 150
81 151 }
82 152 }
@@ -1,302 +1,302
1 1 using System.Threading;
2 2 using System;
3 3 using Implab.Diagnostics;
4 4 using System.Collections.Generic;
5 5
6 6
7 7 #if NET_4_5
8 8 using System.Threading.Tasks;
9 9 #endif
10 10
11 11 namespace Implab {
12 12 public static class PromiseExtensions {
13 13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
14 14 Safe.ArgumentNotNull(that, "that");
15 15 var context = SynchronizationContext.Current;
16 16 if (context == null)
17 17 return that;
18 18
19 19 var p = new SyncContextPromise<T>(context);
20 20 p.On(that.Cancel, PromiseEventType.Cancelled);
21 21
22 22 that.On(
23 23 p.Resolve,
24 24 p.Reject,
25 25 p.Cancel
26 26 );
27 27 return p;
28 28 }
29 29
30 30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
31 31 Safe.ArgumentNotNull(that, "that");
32 32 Safe.ArgumentNotNull(context, "context");
33 33
34 34 var p = new SyncContextPromise<T>(context);
35 35 p.On(that.Cancel, PromiseEventType.Cancelled);
36 36
37 37
38 38 that.On(
39 39 p.Resolve,
40 40 p.Reject,
41 41 p.Cancel
42 42 );
43 43 return p;
44 44 }
45 45
46 46 /// <summary>
47 47 /// Ensures the dispatched.
48 48 /// </summary>
49 49 /// <returns>The dispatched.</returns>
50 50 /// <param name="that">That.</param>
51 51 /// <param name="head">Head.</param>
52 52 /// <param name="cleanup">Cleanup.</param>
53 53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
54 54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
55 55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
56 56 Safe.ArgumentNotNull(that, "that");
57 57 Safe.ArgumentNotNull(head, "head");
58 58
59 59 that.On(() => head.On(cleanup), PromiseEventType.Cancelled);
60 60
61 61 return that;
62 62 }
63 63
64 64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
65 65 Safe.ArgumentNotNull(that, "that");
66 66 Safe.ArgumentNotNull(callback, "callback");
67 67 var op = TraceContext.Instance.CurrentOperation;
68 68 return ar => {
69 69 TraceContext.Instance.EnterLogicalOperation(op,false);
70 70 try {
71 71 that.Resolve(callback(ar));
72 72 } catch (Exception err) {
73 73 that.Reject(err);
74 74 } finally {
75 75 TraceContext.Instance.Leave();
76 76 }
77 77 };
78 78 }
79 79
80 80 static void CancelCallback(object cookie) {
81 81 ((ICancellable)cookie).Cancel();
82 82 }
83 83
84 84 /// <summary>
85 85 /// Cancells promise after the specified timeout is elapsed.
86 86 /// </summary>
87 87 /// <param name="that">The promise to cancel on timeout.</param>
88 88 /// <param name="milliseconds">The timeout in milliseconds.</param>
89 89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
90 90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
91 91 Safe.ArgumentNotNull(that, "that");
92 92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
93 93 that.On(timer.Dispose, PromiseEventType.All);
94 94 return that;
95 95 }
96 96
97 97 public static IPromise Bundle(this ICollection<IPromise> that) {
98 98 Safe.ArgumentNotNull(that, "that");
99 99
100 100 int count = that.Count;
101 101 int errors = 0;
102 102 var medium = new Promise();
103 103
104 104 if (count == 0) {
105 105 medium.Resolve();
106 106 return medium;
107 107 }
108 108
109 109 medium.On(() => {
110 110 foreach(var p2 in that)
111 111 p2.Cancel();
112 112 }, PromiseEventType.ErrorOrCancel);
113 113
114 114 foreach (var p in that)
115 115 p.On(
116 116 () => {
117 117 if (Interlocked.Decrement(ref count) == 0)
118 118 medium.Resolve();
119 119 },
120 120 error => {
121 121 if (Interlocked.Increment(ref errors) == 1)
122 122 medium.Reject(
123 123 new Exception("The dependency promise is failed", error)
124 124 );
125 125 },
126 126 reason => {
127 127 if (Interlocked.Increment(ref errors) == 1)
128 128 medium.Cancel(
129 129 new Exception("The dependency promise is cancelled")
130 130 );
131 131 }
132 132 );
133 133
134 134 return medium;
135 135 }
136 136
137 137 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
138 138 Safe.ArgumentNotNull(that, "that");
139 139
140 140 int count = that.Count;
141 141 int errors = 0;
142 142 var medium = new Promise<T[]>();
143 143 var results = new T[that.Count];
144 144
145 145 medium.On(() => {
146 146 foreach(var p2 in that)
147 147 p2.Cancel();
148 148 }, PromiseEventType.ErrorOrCancel);
149 149
150 150 int i = 0;
151 151 foreach (var p in that) {
152 152 var idx = i;
153 153 p.On(
154 154 x => {
155 155 results[idx] = x;
156 156 if (Interlocked.Decrement(ref count) == 0)
157 157 medium.Resolve(results);
158 158 },
159 159 error => {
160 160 if (Interlocked.Increment(ref errors) == 1)
161 161 medium.Reject(
162 162 new Exception("The dependency promise is failed", error)
163 163 );
164 164 },
165 165 reason => {
166 166 if (Interlocked.Increment(ref errors) == 1)
167 167 medium.Cancel(
168 168 new Exception("The dependency promise is cancelled", reason)
169 169 );
170 170 }
171 171 );
172 172 i++;
173 173 }
174 174
175 175 return medium;
176 176 }
177 177
178 178 public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
179 179 Safe.ArgumentNotNull(that, "that");
180 180
181 var d = new ActionTask(success, error, cancel);
181 var d = new ActionTask(success, error, cancel, false);
182 182 that.On(d.Resolve, d.Reject, d.CancelOperation);
183 183 if (success != null)
184 184 d.CancellationRequested(that.Cancel);
185 185 return d;
186 186 }
187 187
188 188 public static IPromise Then(this IPromise that, Action success, Action<Exception> error) {
189 189 return Then(that, success, error, null);
190 190 }
191 191
192 192 public static IPromise Then(this IPromise that, Action success) {
193 193 return Then(that, success, null, null);
194 194 }
195 195
196 196 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
197 197 Safe.ArgumentNotNull(that, "that");
198 198
199 var d = new FuncTask<T>(success, error, cancel);
199 var d = new FuncTask<T>(success, error, cancel, false);
200 200 that.On(d.Resolve, d.Reject, d.CancelOperation);
201 201 if (success != null)
202 202 d.CancellationRequested(that.Cancel);
203 203 return d;
204 204 }
205 205
206 206 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error) {
207 207 return Then(that, success, error, null);
208 208 }
209 209
210 210 public static IPromise<T> Then<T>(this IPromise that, Func<T> success) {
211 211 return Then(that, success, null, null);
212 212 }
213 213
214 214 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
215 215 Safe.ArgumentNotNull(that, "that");
216 var d = new FuncTask<T,T2>(success, error, cancel);
216 var d = new FuncTask<T,T2>(success, error, cancel, false);
217 217 that.On(d.Resolve, d.Reject, d.CancelOperation);
218 218 if (success != null)
219 219 d.CancellationRequested(that.Cancel);
220 220 return d;
221 221 }
222 222
223 223 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error) {
224 224 return Then(that, success, error, null);
225 225 }
226 226
227 227 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success) {
228 228 return Then(that, success, null, null);
229 229 }
230 230
231 231 #region chain traits
232 232 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error, Func<Exception,IPromise> cancel) {
233 233 Safe.ArgumentNotNull(that, "that");
234 234
235 var d = new ActionChainTask(success, error, cancel);
235 var d = new ActionChainTask(success, error, cancel, false);
236 236 that.On(d.Resolve, d.Reject, d.CancelOperation);
237 237 if (success != null)
238 238 d.CancellationRequested(that.Cancel);
239 239 return d;
240 240 }
241 241
242 242 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error) {
243 243 return Chain(that, success, error, null);
244 244 }
245 245
246 246 public static IPromise Chain(this IPromise that, Func<IPromise> success) {
247 247 return Chain(that, success, null, null);
248 248 }
249 249
250 250 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
251 251 Safe.ArgumentNotNull(that, "that");
252 252
253 var d = new FuncChainTask<T>(success, error, cancel);
253 var d = new FuncChainTask<T>(success, error, cancel, false);
254 254 that.On(d.Resolve, d.Reject, d.CancelOperation);
255 255 if (success != null)
256 256 d.CancellationRequested(that.Cancel);
257 257 return d;
258 258 }
259 259
260 260 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error) {
261 261 return Chain(that, success, error, null);
262 262 }
263 263
264 264 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success) {
265 265 return Chain(that, success, null, null);
266 266 }
267 267
268 268 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
269 269 Safe.ArgumentNotNull(that, "that");
270 var d = new FuncChainTask<T,T2>(success, error, cancel);
270 var d = new FuncChainTask<T,T2>(success, error, cancel, false);
271 271 that.On(d.Resolve, d.Reject, d.CancelOperation);
272 272 if (success != null)
273 273 d.CancellationRequested(that.Cancel);
274 274 return d;
275 275 }
276 276
277 277 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error) {
278 278 return Chain(that, success, error, null);
279 279 }
280 280
281 281 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success) {
282 282 return Chain(that, success, null, null);
283 283 }
284 284
285 285 #endregion
286 286
287 287
288 288 #if NET_4_5
289 289
290 290 public static Task<T> GetTask<T>(this IPromise<T> that) {
291 291 Safe.ArgumentNotNull(that, "that");
292 292 var tcs = new TaskCompletionSource<T>();
293 293
294 294 that.On(tcs.SetResult, tcs.SetException, r => tcs.SetCanceled());
295 295
296 296 return tcs.Task;
297 297 }
298 298
299 299 #endif
300 300 }
301 301 }
302 302
General Comments 0
You need to be logged in to leave comments. Login now