##// END OF EJS Templates
improved performance of promises
cin -
r125:f803565868a4 v2
parent child
Show More
@@ -1,779 +1,779
1 1 using System;
2 2 using System.Reflection;
3 3 using System.Threading;
4 4 using Implab.Parallels;
5 5
6 6 #if MONO
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 10 using TestMethod = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
14 14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15 15
16 16 #endif
17 17
18 18 namespace Implab.Test {
19 19 [TestClass]
20 20 public class AsyncTests {
21 21 [TestMethod]
22 22 public void ResolveTest() {
23 23 int res = -1;
24 24 var p = new Promise<int>();
25 25 p.Then(x => res = x);
26 26 p.Resolve(100);
27 27
28 28 Assert.AreEqual(100, res);
29 29 }
30 30
31 31 [TestMethod]
32 32 public void RejectTest() {
33 33 int res = -1;
34 34 Exception err = null;
35 35
36 36 var p = new Promise<int>();
37 37 p.Then(
38 38 x => res = x,
39 39 e => {
40 40 err = e;
41 41 return -2;
42 42 }
43 43 );
44 44 p.Reject(new ApplicationException("error"));
45 45
46 46 Assert.AreEqual(res, -1);
47 47 Assert.AreEqual(err.Message, "error");
48 48
49 49 }
50 50
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 54 p.Cancel();
55 55
56 56 var p2 = p.Cancelled(() => {
57 57 throw new ApplicationException("CANCELLED");
58 58 });
59 59
60 60 try {
61 61 p2.Join();
62 62 Assert.Fail();
63 63 } catch (ApplicationException err) {
64 64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 65 }
66 66
67 67 }
68 68
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 72 p.Cancel();
73 73
74 74 var p2 = p
75 75 .Cancelled<bool>(() => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Error(e => true);
79 79
80 80 Assert.AreEqual(true, p2.Join());
81 81 }
82 82
83 83 [TestMethod]
84 84 public void JoinSuccessTest() {
85 85 var p = new Promise<int>();
86 86 p.Resolve(100);
87 87 Assert.AreEqual(p.Join(), 100);
88 88 }
89 89
90 90 [TestMethod]
91 91 public void JoinFailTest() {
92 92 var p = new Promise<int>();
93 93 p.Reject(new ApplicationException("failed"));
94 94
95 95 try {
96 96 p.Join();
97 97 throw new ApplicationException("WRONG!");
98 98 } catch (TargetInvocationException err) {
99 99 Assert.AreEqual(err.InnerException.Message, "failed");
100 100 } catch {
101 101 Assert.Fail("Got wrong excaption");
102 102 }
103 103 }
104 104
105 105 [TestMethod]
106 106 public void MapTest() {
107 107 var p = new Promise<int>();
108 108
109 109 var p2 = p.Then(x => x.ToString());
110 110 p.Resolve(100);
111 111
112 112 Assert.AreEqual(p2.Join(), "100");
113 113 }
114 114
115 115 [TestMethod]
116 116 public void FixErrorTest() {
117 117 var p = new Promise<int>();
118 118
119 119 var p2 = p.Error(e => 101);
120 120
121 121 p.Reject(new Exception());
122 122
123 123 Assert.AreEqual(p2.Join(), 101);
124 124 }
125 125
126 126 [TestMethod]
127 127 public void ChainTest() {
128 128 var p1 = new Promise<int>();
129 129
130 130 var p3 = p1.Chain(x => {
131 131 var p2 = new Promise<string>();
132 132 p2.Resolve(x.ToString());
133 133 return p2;
134 134 });
135 135
136 136 p1.Resolve(100);
137 137
138 138 Assert.AreEqual(p3.Join(), "100");
139 139 }
140 140
141 141 [TestMethod]
142 142 public void ChainFailTest() {
143 143 var p1 = new Promise<int>();
144 144
145 145 var p3 = p1.Chain(x => {
146 146 var p2 = new Promise<string>();
147 147 p2.Reject(new Exception("DIE!!!"));
148 148 return p2;
149 149 });
150 150
151 151 p1.Resolve(100);
152 152
153 153 Assert.IsTrue(p3.IsResolved);
154 154 }
155 155
156 156 [TestMethod]
157 157 public void PoolTest() {
158 158 var pid = Thread.CurrentThread.ManagedThreadId;
159 159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160 160
161 161 Assert.AreNotEqual(pid, p.Join());
162 162 }
163 163
164 164 [TestMethod]
165 165 public void WorkerPoolSizeTest() {
166 166 var pool = new WorkerPool(5, 10, 1);
167 167
168 168 Assert.AreEqual(5, pool.PoolSize);
169 169
170 170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173 173
174 174 Assert.AreEqual(5, pool.PoolSize);
175 175
176 176 for (int i = 0; i < 100; i++)
177 177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 178 Thread.Sleep(200);
179 179 Assert.AreEqual(10, pool.PoolSize);
180 180
181 181 pool.Dispose();
182 182 }
183 183
184 184 [TestMethod]
185 185 public void WorkerPoolCorrectTest() {
186 186 var pool = new WorkerPool(0,1000,100);
187 187
188 188 const int iterations = 1000;
189 189 int pending = iterations;
190 190 var stop = new ManualResetEvent(false);
191 191
192 192 var count = 0;
193 193 for (int i = 0; i < iterations; i++) {
194 194 pool
195 195 .Invoke(() => 1)
196 196 .Then(x => Interlocked.Add(ref count, x))
197 197 .Then(x => Math.Log10(x))
198 198 .On(() => {
199 199 Interlocked.Decrement(ref pending);
200 200 if (pending == 0)
201 201 stop.Set();
202 202 }, PromiseEventType.All);
203 203 }
204 204
205 205 stop.WaitOne();
206 206
207 207 Assert.AreEqual(iterations, count);
208 208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 209 pool.Dispose();
210 210
211 211 }
212 212
213 213 [TestMethod]
214 214 public void WorkerPoolDisposeTest() {
215 215 var pool = new WorkerPool(5, 20);
216 216 Assert.AreEqual(5, pool.PoolSize);
217 217 pool.Dispose();
218 218 Thread.Sleep(500);
219 219 Assert.AreEqual(0, pool.PoolSize);
220 220 pool.Dispose();
221 221 }
222 222
223 223 [TestMethod]
224 224 public void MTQueueTest() {
225 225 var queue = new MTQueue<int>();
226 226 int res;
227 227
228 228 queue.Enqueue(10);
229 229 Assert.IsTrue(queue.TryDequeue(out res));
230 230 Assert.AreEqual(10, res);
231 231 Assert.IsFalse(queue.TryDequeue(out res));
232 232
233 233 for (int i = 0; i < 1000; i++)
234 234 queue.Enqueue(i);
235 235
236 236 for (int i = 0; i < 1000; i++) {
237 237 queue.TryDequeue(out res);
238 238 Assert.AreEqual(i, res);
239 239 }
240 240
241 241 int writers = 0;
242 242 int readers = 0;
243 243 var stop = new ManualResetEvent(false);
244 244 int total = 0;
245 245
246 246 const int itemsPerWriter = 10000;
247 247 const int writersCount = 10;
248 248
249 249 for (int i = 0; i < writersCount; i++) {
250 250 Interlocked.Increment(ref writers);
251 251 AsyncPool
252 252 .RunThread(() => {
253 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 254 queue.Enqueue(1);
255 255 }
256 256 return 1;
257 257 })
258 258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 259 }
260 260
261 261 for (int i = 0; i < 10; i++) {
262 262 Interlocked.Increment(ref readers);
263 263 AsyncPool
264 264 .RunThread(() => {
265 265 int t;
266 266 do {
267 267 while (queue.TryDequeue(out t))
268 268 Interlocked.Add(ref total, t);
269 269 } while (writers > 0);
270 270 return 1;
271 271 })
272 272 .On(() => {
273 273 Interlocked.Decrement(ref readers);
274 274 if (readers == 0)
275 275 stop.Set();
276 276 }, PromiseEventType.All);
277 277 }
278 278
279 279 stop.WaitOne();
280 280
281 281 Assert.AreEqual(100000, total);
282 282 }
283 283
284 284 [TestMethod]
285 285 public void AsyncQueueTest() {
286 286 var queue = new AsyncQueue<int>();
287 287 int res;
288 288
289 289 queue.Enqueue(10);
290 290 Assert.IsTrue(queue.TryDequeue(out res));
291 291 Assert.AreEqual(10, res);
292 292 Assert.IsFalse(queue.TryDequeue(out res));
293 293
294 294 for (int i = 0; i < 1000; i++)
295 295 queue.Enqueue(i);
296 296
297 297 for (int i = 0; i < 1000; i++) {
298 298 queue.TryDequeue(out res);
299 299 Assert.AreEqual(i, res);
300 300 }
301 301
302 302 const int count = 10000000;
303 303
304 304 int res1 = 0, res2 = 0;
305 305 var t1 = Environment.TickCount;
306 306
307 307 AsyncPool.RunThread(
308 308 () => {
309 309 for (var i = 0; i < count; i++)
310 310 queue.Enqueue(1);
311 311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 312 },
313 313 () => {
314 314 for (var i = 0; i < count; i++)
315 315 queue.Enqueue(2);
316 316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 317 },
318 318 () => {
319 319 int temp;
320 320 int i = 0;
321 321 while (i < count)
322 322 if (queue.TryDequeue(out temp)) {
323 323 i++;
324 324 res1 += temp;
325 325 }
326 326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 327 },
328 328 () => {
329 329 int temp;
330 330 int i = 0;
331 331 while (i < count)
332 332 if (queue.TryDequeue(out temp)) {
333 333 i++;
334 334 res2 += temp;
335 335 }
336 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 337 }
338 338 )
339 339 .Bundle()
340 340 .Join();
341 341
342 342 Assert.AreEqual(count * 3, res1 + res2);
343 343
344 344 Console.WriteLine(
345 345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 346 Environment.TickCount - t1,
347 347 res1,
348 348 res2,
349 349 res1 + res2,
350 350 count
351 351 );
352 352 }
353 353
354 354 [TestMethod]
355 355 public void AsyncQueueBatchTest() {
356 356 var queue = new AsyncQueue<int>();
357 357
358 358 const int wBatch = 29;
359 359 const int wCount = 400000;
360 360 const int total = wBatch * wCount * 2;
361 361 const int summ = wBatch * wCount * 3;
362 362
363 363 int r1 = 0, r2 = 0;
364 364 const int rBatch = 111;
365 365 int read = 0;
366 366
367 367 var t1 = Environment.TickCount;
368 368
369 369 AsyncPool.RunThread(
370 370 () => {
371 371 var buffer = new int[wBatch];
372 372 for(int i = 0; i<wBatch; i++)
373 373 buffer[i] = 1;
374 374
375 375 for(int i =0; i < wCount; i++)
376 376 queue.EnqueueRange(buffer,0,wBatch);
377 377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 378 },
379 379 () => {
380 380 var buffer = new int[wBatch];
381 381 for(int i = 0; i<wBatch; i++)
382 382 buffer[i] = 2;
383 383
384 384 for(int i =0; i < wCount; i++)
385 385 queue.EnqueueRange(buffer,0,wBatch);
386 386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 387 },
388 388 () => {
389 389 var buffer = new int[rBatch];
390 390
391 391 while(read < total) {
392 392 int actual;
393 393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 394 for(int i=0; i< actual; i++)
395 395 r1 += buffer[i];
396 396 Interlocked.Add(ref read, actual);
397 397 }
398 398 }
399 399
400 400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 401 },
402 402 () => {
403 403 var buffer = new int[rBatch];
404 404
405 405 while(read < total) {
406 406 int actual;
407 407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 408 for(int i=0; i< actual; i++)
409 409 r2 += buffer[i];
410 410 Interlocked.Add(ref read, actual);
411 411 }
412 412 }
413 413
414 414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 415 }
416 416 )
417 417 .Bundle()
418 418 .Join();
419 419
420 420 Assert.AreEqual(summ , r1 + r2);
421 421
422 422 Console.WriteLine(
423 423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 424 Environment.TickCount - t1,
425 425 r1,
426 426 r2,
427 427 r1 + r2,
428 428 total
429 429 );
430 430 }
431 431
432 432 [TestMethod]
433 433 public void AsyncQueueChunkDequeueTest() {
434 434 var queue = new AsyncQueue<int>();
435 435
436 436 const int wBatch = 31;
437 437 const int wCount = 200000;
438 438 const int total = wBatch * wCount * 3;
439 439 const int summ = wBatch * wCount * 6;
440 440
441 441 int r1 = 0, r2 = 0;
442 442 const int rBatch = 1024;
443 443 int read = 0;
444 444
445 445 var t1 = Environment.TickCount;
446 446
447 447 AsyncPool.RunThread(
448 448 () => {
449 449 var buffer = new int[wBatch];
450 450 for(int i = 0; i<wBatch; i++)
451 451 buffer[i] = 1;
452 452
453 453 for(int i =0; i < wCount; i++)
454 454 queue.EnqueueRange(buffer,0,wBatch);
455 455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 456 },
457 457 () => {
458 458 var buffer = new int[wBatch];
459 459 for(int i = 0; i<wBatch; i++)
460 460 buffer[i] = 2;
461 461
462 462 for(int i =0; i < wCount; i++)
463 463 queue.EnqueueRange(buffer,0,wBatch);
464 464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 465 },
466 466 () => {
467 467 var buffer = new int[wBatch];
468 468 for(int i = 0; i<wBatch; i++)
469 469 buffer[i] = 3;
470 470
471 471 for(int i =0; i < wCount; i++)
472 472 queue.EnqueueRange(buffer,0,wBatch);
473 473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 474 },
475 475 () => {
476 476 var buffer = new int[rBatch];
477 477 int count = 1;
478 478 double avgchunk = 0;
479 479 while(read < total) {
480 480 int actual;
481 481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 482 for(int i=0; i< actual; i++)
483 483 r2 += buffer[i];
484 484 Interlocked.Add(ref read, actual);
485 485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 486 count ++;
487 487 }
488 488 }
489 489
490 490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 491 }
492 492 )
493 493 .Bundle()
494 494 .Join();
495 495
496 496 Assert.AreEqual(summ , r1 + r2);
497 497
498 498 Console.WriteLine(
499 499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 500 Environment.TickCount - t1,
501 501 r1,
502 502 r2,
503 503 r1 + r2,
504 504 total
505 505 );
506 506 }
507 507
508 508 [TestMethod]
509 509 public void AsyncQueueDrainTest() {
510 510 var queue = new AsyncQueue<int>();
511 511
512 512 const int wBatch = 11;
513 513 const int wCount = 200000;
514 514 const int total = wBatch * wCount * 3;
515 515 const int summ = wBatch * wCount * 3;
516 516
517 517 int r1 = 0, r2 = 0;
518 518 const int rBatch = 11;
519 519 int read = 0;
520 520
521 521 var t1 = Environment.TickCount;
522 522
523 523 AsyncPool.RunThread(
524 524 () => {
525 525 var buffer = new int[wBatch];
526 526 for(int i = 0; i<wBatch; i++)
527 527 buffer[i] = 1;
528 528
529 529 for(int i =0; i < wCount; i++)
530 530 queue.EnqueueRange(buffer,0,wBatch);
531 531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 532 },
533 533 () => {
534 534 for(int i =0; i < wCount * wBatch; i++)
535 535 queue.Enqueue(1);
536 536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 537 },
538 538 () => {
539 539 var buffer = new int[wBatch];
540 540 for(int i = 0; i<wBatch; i++)
541 541 buffer[i] = 1;
542 542
543 543 for(int i =0; i < wCount; i++)
544 544 queue.EnqueueRange(buffer,0,wBatch);
545 545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 546 },
547 547 /*() => {
548 548 int temp;
549 549 int count = 0;
550 550 while (read < total)
551 551 if (queue.TryDequeue(out temp)) {
552 552 count++;
553 553 r1 += temp;
554 554 Interlocked.Increment(ref read);
555 555 }
556 556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 557 },*/
558 558 /*() => {
559 559 var buffer = new int[rBatch];
560 560 var count = 0;
561 561 while(read < total) {
562 562 int actual;
563 563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 564 for(int i=0; i< actual; i++)
565 565 r1 += buffer[i];
566 566 Interlocked.Add(ref read, actual);
567 567 count += actual;
568 568 }
569 569 }
570 570
571 571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 572 },*/
573 573 () => {
574 574 var count = 0;
575 575 while(read < total) {
576 576 var buffer = queue.Drain();
577 577 for(int i=0; i< buffer.Length; i++)
578 578 r1 += buffer[i];
579 579 Interlocked.Add(ref read, buffer.Length);
580 580 count += buffer.Length;
581 581 }
582 582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 583 },
584 584 () => {
585 585 var count = 0;
586 586 while(read < total) {
587 587 var buffer = queue.Drain();
588 588 for(int i=0; i< buffer.Length; i++)
589 589 r2 += buffer[i];
590 590 Interlocked.Add(ref read, buffer.Length);
591 591 count += buffer.Length;
592 592 }
593 593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 594 }
595 595 )
596 596 .Bundle()
597 597 .Join();
598 598
599 599 Assert.AreEqual(summ , r1 + r2);
600 600
601 601 Console.WriteLine(
602 602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
603 603 Environment.TickCount - t1,
604 604 r1,
605 605 r2,
606 606 r1 + r2,
607 607 total
608 608 );
609 609 }
610 610
611 611 [TestMethod]
612 612 public void ParallelMapTest() {
613 613
614 614 const int count = 100000;
615 615
616 616 var args = new double[count];
617 617 var rand = new Random();
618 618
619 619 for (int i = 0; i < count; i++)
620 620 args[i] = rand.NextDouble();
621 621
622 622 var t = Environment.TickCount;
623 623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
624 624
625 625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
626 626
627 627 t = Environment.TickCount;
628 628 for (int i = 0; i < count; i++)
629 629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
630 630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
631 631 }
632 632
633 633 [TestMethod]
634 634 public void ChainedMapTest() {
635 635
636 using (var pool = new WorkerPool(0,10,1)) {
636 using (var pool = new WorkerPool()) {
637 637 const int count = 10000;
638 638
639 639 var args = new double[count];
640 640 var rand = new Random();
641 641
642 642 for (int i = 0; i < count; i++)
643 643 args[i] = rand.NextDouble();
644 644
645 645 var t = Environment.TickCount;
646 646 var res = args
647 647 .ChainedMap(
648 648 // Analysis disable once AccessToDisposedClosure
649 649 x => pool.Invoke(
650 650 () => Math.Sin(x * x)
651 651 ),
652 652 4
653 653 )
654 654 .Join();
655 655
656 656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
657 657
658 658 t = Environment.TickCount;
659 659 for (int i = 0; i < count; i++)
660 660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
661 661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
662 662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
663 663 }
664 664 }
665 665
666 666 [TestMethod]
667 667 public void ParallelForEachTest() {
668 668
669 669 const int count = 100000;
670 670
671 671 var args = new int[count];
672 672 var rand = new Random();
673 673
674 674 for (int i = 0; i < count; i++)
675 675 args[i] = (int)(rand.NextDouble() * 100);
676 676
677 677 int result = 0;
678 678
679 679 var t = Environment.TickCount;
680 680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
681 681
682 682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
683 683
684 684 int result2 = 0;
685 685
686 686 t = Environment.TickCount;
687 687 for (int i = 0; i < count; i++)
688 688 result2 += args[i];
689 689 Assert.AreEqual(result2, result);
690 690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
691 691 }
692 692
693 693 [TestMethod]
694 694 public void ComplexCase1Test() {
695 695 var flags = new bool[3];
696 696
697 697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
698 698
699 699 var step1 = PromiseHelper
700 700 .Sleep(200, "Alan")
701 701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
702 702 var p = step1
703 703 .Chain(x =>
704 704 PromiseHelper
705 705 .Sleep(200, "Hi, " + x)
706 706 .Then(y => y)
707 707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
708 708 )
709 709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
710 710 step1.Join();
711 711 p.Cancel();
712 712 try {
713 713 Assert.AreEqual(p.Join(), "Hi, Alan");
714 714 Assert.Fail("Shouldn't get here");
715 715 } catch (OperationCanceledException) {
716 716 }
717 717
718 718 Assert.IsFalse(flags[0]);
719 719 Assert.IsTrue(flags[1]);
720 720 Assert.IsTrue(flags[2]);
721 721 }
722 722
723 723 [TestMethod]
724 724 public void ChainedCancel1Test() {
725 725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
726 726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
727 727 var p = PromiseHelper
728 728 .Sleep(1, "Hi, HAL!")
729 729 .Then(x => {
730 730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
731 731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
732 732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
733 733 PromiseHelper
734 734 .Sleep(100, "HAL, STOP!")
735 735 .Then(result.Cancel);
736 736 return result;
737 737 });
738 738 try {
739 739 p.Join();
740 740 } catch (TargetInvocationException err) {
741 741 Assert.IsTrue(err.InnerException is OperationCanceledException);
742 742 }
743 743 }
744 744
745 745 [TestMethod]
746 746 public void ChainedCancel2Test() {
747 747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
748 748 var pSurvive = new Promise<bool>();
749 749 var hemStarted = new ManualResetEvent(false);
750 750 var p = PromiseHelper
751 751 .Sleep(1, "Hi, HAL!")
752 752 .Chain(x => {
753 753 hemStarted.Set();
754 754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
755 755 var result = PromiseHelper
756 756 .Sleep(100000000, "HEM ENABLED!!!")
757 757 .Then(s => {
758 758 pSurvive.Resolve(false);
759 759 return s;
760 760 });
761 761
762 762 result
763 763 .Cancelled(() => pSurvive.Resolve(true));
764 764
765 765 return result;
766 766 });
767 767
768 768 hemStarted.WaitOne();
769 769 p.Cancel();
770 770
771 771 try {
772 772 p.Join();
773 773 } catch (OperationCanceledException) {
774 774 Assert.IsTrue(pSurvive.Join());
775 775 }
776 776 }
777 777 }
778 778 }
779 779
@@ -1,219 +1,289
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Reflection;
5 5
6 6 namespace Implab {
7 7 public abstract class AbstractPromise<THandler> {
8 8
9 9 const int UNRESOLVED_SATE = 0;
10 10 const int TRANSITIONAL_STATE = 1;
11 11 const int SUCCEEDED_STATE = 2;
12 12 const int REJECTED_STATE = 3;
13 13 const int CANCELLED_STATE = 4;
14 14
15 const int RESERVED_HANDLERS_COUNT = 4;
16
15 17 int m_state;
16 18 Exception m_error;
19 int m_handlersCount;
17 20
18 readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
22 MTQueue<THandler> m_extraHandlers;
23 int m_handlerPointer = -1;
24 int m_handlersCommited;
19 25
20 26 #region state managment
21 27 bool BeginTransit() {
22 28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
23 29 }
24 30
25 31 void CompleteTransit(int state) {
26 32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
27 33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
28 34 }
29 35
30 36 void WaitTransition() {
31 37 while (m_state == TRANSITIONAL_STATE) {
32 38 Thread.MemoryBarrier();
33 39 }
34 40 }
35 41
36 42 protected void BeginSetResult() {
37 43 if (!BeginTransit()) {
38 44 WaitTransition();
39 45 if (m_state != CANCELLED_STATE)
40 46 throw new InvalidOperationException("The promise is already resolved");
41 47 }
42 48 }
43 49
44 50 protected void EndSetResult() {
45 51 CompleteTransit(SUCCEEDED_STATE);
46 52 OnSuccess();
47 53 }
48 54
49 55
50 56
51 57 /// <summary>
52 58 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
53 59 /// </summary>
54 60 /// <remarks>
55 61 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
56 62 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
57 63 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
58 64 /// </remarks>
59 65 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
60 66 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
61 67 protected void SetError(Exception error) {
62 68 if (BeginTransit()) {
63 69 m_error = error is PromiseTransientException ? error.InnerException : error;
64 70 CompleteTransit(REJECTED_STATE);
65 71 OnError();
66 72 } else {
67 73 WaitTransition();
68 74 if (m_state == SUCCEEDED_STATE)
69 75 throw new InvalidOperationException("The promise is already resolved");
70 76 }
71 77 }
72 78
73 79 /// <summary>
74 80 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
75 81 /// </summary>
76 82 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
77 83 protected void SetCancelled() {
78 84 if (BeginTransit()) {
79 85 CompleteTransit(CANCELLED_STATE);
80 86 OnCancelled();
81 87 }
82 88 }
83 89
84 90 protected abstract void SignalSuccess(THandler handler);
85 91
86 92 protected abstract void SignalError(THandler handler, Exception error);
87 93
88 94 protected abstract void SignalCancelled(THandler handler);
89 95
90 96 void OnSuccess() {
97 var hp = m_handlerPointer;
98 var slot = hp +1 ;
99 while (slot < m_handlersCommited) {
100 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
101 SignalSuccess(m_handlers[slot]);
102 }
103 hp = m_handlerPointer;
104 slot = hp +1 ;
105 }
106
107
108 if (m_extraHandlers != null) {
91 109 THandler handler;
92 while (m_handlers.TryDequeue(out handler))
110 while (m_extraHandlers.TryDequeue(out handler))
93 111 SignalSuccess(handler);
94 112 }
113 }
95 114
96 115 void OnError() {
116 var hp = m_handlerPointer;
117 var slot = hp +1 ;
118 while (slot < m_handlersCommited) {
119 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
120 SignalError(m_handlers[slot],m_error);
121 }
122 hp = m_handlerPointer;
123 slot = hp +1 ;
124 }
125
126 if (m_extraHandlers != null) {
97 127 THandler handler;
98 while (m_handlers.TryDequeue(out handler))
128 while (m_extraHandlers.TryDequeue(out handler))
99 129 SignalError(handler,m_error);
100 130 }
131 }
101 132
102 133 void OnCancelled() {
134 var hp = m_handlerPointer;
135 var slot = hp +1 ;
136 while (slot < m_handlersCommited) {
137 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
138 SignalCancelled(m_handlers[slot]);
139 }
140 hp = m_handlerPointer;
141 slot = hp +1 ;
142 }
143
144 if (m_extraHandlers != null) {
103 145 THandler handler;
104 while (m_handlers.TryDequeue(out handler))
146 while (m_extraHandlers.TryDequeue(out handler))
105 147 SignalCancelled(handler);
106 148 }
149 }
107 150
108 151 #endregion
109 152
110 153 protected abstract void Listen(PromiseEventType events, Action handler);
111 154
112 155 #region synchronization traits
113 156 protected void WaitResult(int timeout) {
114 157 if (!IsResolved) {
115 158 var lk = new object();
116 159
117 160 Listen(PromiseEventType.All, () => {
118 161 lock(lk) {
119 162 Monitor.Pulse(lk);
120 163 }
121 164 });
122 165
123 166 lock (lk) {
124 167 while(!IsResolved) {
125 168 if(!Monitor.Wait(lk,timeout))
126 169 throw new TimeoutException();
127 170 }
128 171 }
129 172
130 173 }
131 174 switch (m_state) {
132 175 case SUCCEEDED_STATE:
133 176 return;
134 177 case CANCELLED_STATE:
135 178 throw new OperationCanceledException();
136 179 case REJECTED_STATE:
137 180 throw new TargetInvocationException(m_error);
138 181 default:
139 182 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
140 183 }
141 184 }
142 185 #endregion
143 186
144 187 #region handlers managment
145 188
146 189 protected void AddHandler(THandler handler) {
147 190
148 if (IsResolved) {
149 InvokeHandler(handler);
150
151 } else {
191 if (m_state > 1) {
152 192 // the promise is in the resolved state, just invoke the handler
153 m_handlers.Enqueue(handler);
193 InvokeHandler(handler);
194 } else {
195 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
196
197 if (slot < RESERVED_HANDLERS_COUNT) {
198 m_handlers[slot] = handler;
199
200 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
201 }
154 202
203 if (m_state > 1) {
204 do {
205 var hp = m_handlerPointer;
206 slot = hp + 1;
207 if (slot < m_handlersCommited) {
208 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
209 continue;
210 InvokeHandler(m_handlers[slot]);
211 }
212 break;
213 } while(true);
214 }
215 } else {
216 if (slot == RESERVED_HANDLERS_COUNT) {
217 m_extraHandlers = new MTQueue<THandler>();
218 } else {
219 while (m_extraHandlers == null)
220 Thread.MemoryBarrier();
221 }
155 222
156 if (IsResolved && m_handlers.TryDequeue(out handler))
223 m_extraHandlers.Enqueue(handler);
224
225 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
157 226 // if the promise have been resolved while we was adding the handler to the queue
158 227 // we can't guarantee that someone is still processing it
159 228 // therefore we need to fetch a handler from the queue and execute it
160 229 // note that fetched handler may be not the one that we have added
161 230 // even we can fetch no handlers at all :)
162 231 InvokeHandler(handler);
163 232 }
164 233 }
234 }
165 235
166 236 protected void InvokeHandler(THandler handler) {
167 237 switch (m_state) {
168 238 case SUCCEEDED_STATE:
169 239 SignalSuccess(handler);
170 240 break;
171 241 case CANCELLED_STATE:
172 242 SignalCancelled(handler);
173 243 break;
174 244 case REJECTED_STATE:
175 245 SignalError(handler, m_error);
176 246 break;
177 247 default:
178 248 throw new Exception(String.Format("Invalid promise state {0}", m_state));
179 249 }
180 250 }
181 251
182 252 #endregion
183 253
184 254 #region IPromise implementation
185 255
186 256 public void Join(int timeout) {
187 257 WaitResult(timeout);
188 258 }
189 259
190 260 public void Join() {
191 261 WaitResult(-1);
192 262 }
193 263
194 264 public bool IsResolved {
195 265 get {
196 266 Thread.MemoryBarrier();
197 267 return m_state > 1;
198 268 }
199 269 }
200 270
201 271 public bool IsCancelled {
202 272 get {
203 273 Thread.MemoryBarrier();
204 274 return m_state == CANCELLED_STATE;
205 275 }
206 276 }
207 277
208 278 #endregion
209 279
210 280 #region ICancellable implementation
211 281
212 282 public void Cancel() {
213 283 SetCancelled();
214 284 }
215 285
216 286 #endregion
217 287 }
218 288 }
219 289
@@ -1,625 +1,619
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 5 using System.Diagnostics;
6 6
7 7 namespace Implab.Parallels {
8 8 public class AsyncQueue<T> : IEnumerable<T> {
9 9 class Chunk {
10 10 public Chunk next;
11 11
12 12 int m_low;
13 13 int m_hi;
14 14 int m_alloc;
15 15 readonly int m_size;
16 16 readonly T[] m_data;
17 17
18 18 public Chunk(int size) {
19 19 m_size = size;
20 20 m_data = new T[size];
21 21 }
22 22
23 23 public Chunk(int size, T value) {
24 24 m_size = size;
25 25 m_hi = 1;
26 26 m_alloc = 1;
27 27 m_data = new T[size];
28 28 m_data[0] = value;
29 29 }
30 30
31 31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
32 32 m_size = size;
33 33 m_hi = length;
34 34 m_alloc = alloc;
35 35 m_data = new T[size];
36 36 Array.Copy(data, offset, m_data, 0, length);
37 37 }
38 38
39 39 public int Low {
40 40 get { return m_low; }
41 41 }
42 42
43 43 public int Hi {
44 44 get { return m_hi; }
45 45 }
46 46
47 47 public bool TryEnqueue(T value, out bool extend) {
48 48 var alloc = Interlocked.Increment(ref m_alloc) - 1;
49 49
50 50 if (alloc >= m_size) {
51 51 extend = alloc == m_size;
52 52 return false;
53 53 }
54 54
55 55 extend = false;
56 56 m_data[alloc] = value;
57 57
58 58 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
59 59 // spin wait for commit
60 60 }
61 61 return true;
62 62 }
63 63
64 64 /// <summary>
65 65 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
66 66 /// </summary>
67 67 public void Commit() {
68 68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
69 69
70 70 while (m_hi != actual)
71 71 Thread.MemoryBarrier();
72 72 }
73 73
74 74 public bool TryDequeue(out T value, out bool recycle) {
75 75 int low;
76 76 do {
77 77 low = m_low;
78 78 if (low >= m_hi) {
79 79 value = default(T);
80 80 recycle = (low == m_size);
81 81 return false;
82 82 }
83 83 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
84 84
85 85 recycle = (low == m_size - 1);
86 86 value = m_data[low];
87 87
88 88 return true;
89 89 }
90 90
91 91 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
92 92 //int alloc;
93 93 //int allocSize;
94 94
95 95 var alloc = Interlocked.Add(ref m_alloc, length) - length;
96 96 if (alloc > m_size) {
97 97 // the chunk is full and someone already
98 98 // creating the new one
99 99 enqueued = 0; // nothing was added
100 100 extend = false; // the caller shouldn't try to extend the queue
101 101 return false; // nothing was added
102 102 }
103 103
104 104 enqueued = Math.Min(m_size - alloc, length);
105 105 extend = length > enqueued;
106 106
107 107 if (enqueued == 0)
108 108 return false;
109 109
110 110
111 111 Array.Copy(batch, offset, m_data, alloc, enqueued);
112 112
113 113 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
114 114 // spin wait for commit
115 115 }
116 116
117 117 return true;
118 118 }
119 119
120 120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
121 121 int low, hi, batchSize;
122 122
123 123 do {
124 124 low = m_low;
125 125 hi = m_hi;
126 126 if (low >= hi) {
127 127 dequeued = 0;
128 128 recycle = (low == m_size); // recycling could be restarted and we need to signal again
129 129 return false;
130 130 }
131 131 batchSize = Math.Min(hi - low, length);
132 132 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
133 133
134 134 recycle = (low == m_size - batchSize);
135 135 dequeued = batchSize;
136 136
137 137 Array.Copy(m_data, low, buffer, offset, batchSize);
138 138
139 139 return true;
140 140 }
141 141
142 142 public T GetAt(int pos) {
143 143 return m_data[pos];
144 144 }
145 145 }
146 146
147 147 public const int DEFAULT_CHUNK_SIZE = 32;
148 148 public const int MAX_CHUNK_SIZE = 262144;
149 149
150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
151
152 150 Chunk m_first;
153 151 Chunk m_last;
154 152
155 public AsyncQueue() {
156 m_last = m_first = new Chunk(m_chunkSize);
157 }
158
159 153 /// <summary>
160 154 /// Adds the specified value to the queue.
161 155 /// </summary>
162 156 /// <param name="value">Tha value which will be added to the queue.</param>
163 157 public void Enqueue(T value) {
164 158 var last = m_last;
165 159 // spin wait to the new chunk
166 160 bool extend = true;
167 161 while (last == null || !last.TryEnqueue(value, out extend)) {
168 162 // try to extend queue
169 163 if (extend || last == null) {
170 var chunk = new Chunk(m_chunkSize, value);
164 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
171 165 if (EnqueueChunk(last, chunk))
172 166 break; // success! exit!
173 167 last = m_last;
174 168 } else {
175 169 while (last == m_last) {
176 170 Thread.MemoryBarrier();
177 171 }
178 172 last = m_last;
179 173 }
180 174 }
181 175 }
182 176
183 177 /// <summary>
184 178 /// Adds the specified data to the queue.
185 179 /// </summary>
186 180 /// <param name="data">The buffer which contains the data to be enqueued.</param>
187 181 /// <param name="offset">The offset of the data in the buffer.</param>
188 182 /// <param name="length">The size of the data to read from the buffer.</param>
189 183 public void EnqueueRange(T[] data, int offset, int length) {
190 184 if (data == null)
191 185 throw new ArgumentNullException("data");
192 186 if (offset < 0)
193 187 throw new ArgumentOutOfRangeException("offset");
194 188 if (length < 1 || offset + length > data.Length)
195 189 throw new ArgumentOutOfRangeException("length");
196 190
197 191 var last = m_last;
198 192
199 193 bool extend;
200 194 int enqueued;
201 195
202 196 while (length > 0) {
203 197 extend = true;
204 198 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
205 199 length -= enqueued;
206 200 offset += enqueued;
207 201 }
208 202
209 203 if (extend) {
210 204 // there was no enough space in the chunk
211 205 // or there was no chunks in the queue
212 206
213 207 while (length > 0) {
214 208
215 209 var size = Math.Min(length, MAX_CHUNK_SIZE);
216 210
217 211 var chunk = new Chunk(
218 Math.Max(size, m_chunkSize),
212 Math.Max(size, DEFAULT_CHUNK_SIZE),
219 213 data,
220 214 offset,
221 215 size,
222 216 length // length >= size
223 217 );
224 218
225 219 if (!EnqueueChunk(last, chunk)) {
226 220 // looks like the queue has been updated then proceed from the beginning
227 221 last = m_last;
228 222 break;
229 223 }
230 224
231 225 // we have successfully added the new chunk
232 226 last = chunk;
233 227 length -= size;
234 228 offset += size;
235 229 }
236 230 } else {
237 231 // we don't need to extend the queue, if we successfully enqueued data
238 232 if (length == 0)
239 233 break;
240 234
241 235 // if we need to wait while someone is extending the queue
242 236 // spinwait
243 237 while (last == m_last) {
244 238 Thread.MemoryBarrier();
245 239 }
246 240
247 241 last = m_last;
248 242 }
249 243 }
250 244 }
251 245
252 246 /// <summary>
253 247 /// Tries to retrieve the first element from the queue.
254 248 /// </summary>
255 249 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
256 250 /// <param name="value">The value of the dequeued element.</param>
257 251 public bool TryDequeue(out T value) {
258 252 var chunk = m_first;
259 253 bool recycle;
260 254 while (chunk != null) {
261 255
262 256 var result = chunk.TryDequeue(out value, out recycle);
263 257
264 258 if (recycle) // this chunk is waste
265 259 RecycleFirstChunk(chunk);
266 260 else
267 261 return result; // this chunk is usable and returned actual result
268 262
269 263 if (result) // this chunk is waste but the true result is always actual
270 264 return true;
271 265
272 266 // try again
273 267 chunk = m_first;
274 268 }
275 269
276 270 // the queue is empty
277 271 value = default(T);
278 272 return false;
279 273 }
280 274
281 275 /// <summary>
282 276 /// Tries to dequeue the specified amount of data from the queue.
283 277 /// </summary>
284 278 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
285 279 /// <param name="buffer">The buffer to which the data will be written.</param>
286 280 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
287 281 /// <param name="length">The maximum amount of data to be retrieved.</param>
288 282 /// <param name="dequeued">The actual amout of the retrieved data.</param>
289 283 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
290 284 if (buffer == null)
291 285 throw new ArgumentNullException("buffer");
292 286 if (offset < 0)
293 287 throw new ArgumentOutOfRangeException("offset");
294 288 if (length < 1 || offset + length > buffer.Length)
295 289 throw new ArgumentOutOfRangeException("length");
296 290
297 291 var chunk = m_first;
298 292 bool recycle;
299 293 dequeued = 0;
300 294 while (chunk != null) {
301 295
302 296 int actual;
303 297 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 298 offset += actual;
305 299 length -= actual;
306 300 dequeued += actual;
307 301 }
308 302
309 303 if (recycle) // this chunk is waste
310 304 RecycleFirstChunk(chunk);
311 305 else if (actual == 0)
312 306 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
313 307
314 308 if (length == 0)
315 309 return true;
316 310
317 311 // we still may dequeue something
318 312 // try again
319 313 chunk = m_first;
320 314 }
321 315
322 316 return dequeued != 0;
323 317 }
324 318
325 319 /// <summary>
326 320 /// Tries to dequeue all remaining data in the first chunk.
327 321 /// </summary>
328 322 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 323 /// <param name="buffer">The buffer to which the data will be written.</param>
330 324 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 325 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
332 326 /// <param name="dequeued">The actual amount of the dequeued data.</param>
333 327 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
334 328 if (buffer == null)
335 329 throw new ArgumentNullException("buffer");
336 330 if (offset < 0)
337 331 throw new ArgumentOutOfRangeException("offset");
338 332 if (length < 1 || offset + length > buffer.Length)
339 333 throw new ArgumentOutOfRangeException("length");
340 334
341 335 var chunk = m_first;
342 336 bool recycle;
343 337 dequeued = 0;
344 338
345 339 while (chunk != null) {
346 340
347 341 int actual;
348 342 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
349 343 dequeued = actual;
350 344 }
351 345
352 346 if (recycle) // this chunk is waste
353 347 RecycleFirstChunk(chunk);
354 348
355 349 // if we have dequeued any data, then return
356 350 if (dequeued != 0)
357 351 return true;
358 352
359 353 // we still may dequeue something
360 354 // try again
361 355 chunk = m_first;
362 356 }
363 357
364 358 return false;
365 359 }
366 360
367 361 bool EnqueueChunk(Chunk last, Chunk chunk) {
368 362 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
369 363 return false;
370 364
371 365 if (last != null)
372 366 last.next = chunk;
373 367 else {
374 368 m_first = chunk;
375 369 }
376 370 return true;
377 371 }
378 372
379 373 void RecycleFirstChunk(Chunk first) {
380 374 var next = first.next;
381 375
382 376 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 377 return;
384 378
385 379 if (next == null) {
386 380
387 381 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388 382 /*while (first.next == null)
389 383 Thread.MemoryBarrier();*/
390 384
391 385 // race
392 386 // someone already updated the tail, restore the pointer to the queue head
393 387 m_first = first;
394 388 }
395 389 // the tail is updated
396 390 }
397 391
398 392 // we need to update the head
399 393 //Interlocked.CompareExchange(ref m_first, next, first);
400 394 // if the head is already updated then give up
401 395 //return;
402 396
403 397 }
404 398
405 399 public void Clear() {
406 400 // start the new queue
407 var chunk = new Chunk(m_chunkSize);
401 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
408 402
409 403 do {
410 404 Thread.MemoryBarrier();
411 405 var first = m_first;
412 406 var last = m_last;
413 407
414 408 if (last == null) // nothing to clear
415 409 return;
416 410
417 411 if (first == null || (first.next == null && first != last)) // inconcistency
418 412 continue;
419 413
420 414 // here we will create inconsistency which will force others to spin
421 415 // and prevent from fetching. chunk.next = null
422 416 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
423 417 continue;// inconsistent
424 418
425 419 m_last = chunk;
426 420
427 421 return;
428 422
429 423 } while(true);
430 424 }
431 425
432 426 public T[] Drain() {
433 427 // start the new queue
434 var chunk = new Chunk(m_chunkSize);
428 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
435 429
436 430 do {
437 431 Thread.MemoryBarrier();
438 432 var first = m_first;
439 433 var last = m_last;
440 434
441 435 if (last == null)
442 436 return new T[0];
443 437
444 438 if (first == null || (first.next == null && first != last))
445 439 continue;
446 440
447 441 // here we will create inconsistency which will force others to spin
448 442 // and prevent from fetching. chunk.next = null
449 443 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
450 444 continue;// inconsistent
451 445
452 446 last = Interlocked.Exchange(ref m_last, chunk);
453 447
454 448 return ReadChunks(first, last);
455 449
456 450 } while(true);
457 451 }
458 452
459 453 T[] ReadChunks(Chunk chunk, object last) {
460 454 var result = new List<T>();
461 var buffer = new T[m_chunkSize];
455 var buffer = new T[DEFAULT_CHUNK_SIZE];
462 456 int actual;
463 457 bool recycle;
464 458 while (chunk != null) {
465 459 // ensure all write operations on the chunk are complete
466 460 chunk.Commit();
467 461
468 462 // we need to read the chunk using this way
469 463 // since some client still may completing the dequeue
470 464 // operation, such clients most likely won't get results
471 465 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
472 466 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
473 467
474 468 if (chunk == last) {
475 469 chunk = null;
476 470 } else {
477 471 while (chunk.next == null)
478 472 Thread.MemoryBarrier();
479 473 chunk = chunk.next;
480 474 }
481 475 }
482 476
483 477 return result.ToArray();
484 478 }
485 479
486 480 struct ArraySegmentCollection : ICollection<T> {
487 481 readonly T[] m_data;
488 482 readonly int m_offset;
489 483 readonly int m_length;
490 484
491 485 public ArraySegmentCollection(T[] data, int offset, int length) {
492 486 m_data = data;
493 487 m_offset = offset;
494 488 m_length = length;
495 489 }
496 490
497 491 #region ICollection implementation
498 492
499 493 public void Add(T item) {
500 494 throw new InvalidOperationException();
501 495 }
502 496
503 497 public void Clear() {
504 498 throw new InvalidOperationException();
505 499 }
506 500
507 501 public bool Contains(T item) {
508 502 return false;
509 503 }
510 504
511 505 public void CopyTo(T[] array, int arrayIndex) {
512 506 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
513 507 }
514 508
515 509 public bool Remove(T item) {
516 510 throw new NotImplementedException();
517 511 }
518 512
519 513 public int Count {
520 514 get {
521 515 return m_length;
522 516 }
523 517 }
524 518
525 519 public bool IsReadOnly {
526 520 get {
527 521 return true;
528 522 }
529 523 }
530 524
531 525 #endregion
532 526
533 527 #region IEnumerable implementation
534 528
535 529 public IEnumerator<T> GetEnumerator() {
536 530 for (int i = m_offset; i < m_length + m_offset; i++)
537 531 yield return m_data[i];
538 532 }
539 533
540 534 #endregion
541 535
542 536 #region IEnumerable implementation
543 537
544 538 IEnumerator IEnumerable.GetEnumerator() {
545 539 return GetEnumerator();
546 540 }
547 541
548 542 #endregion
549 543 }
550 544
551 545 #region IEnumerable implementation
552 546
553 547 class Enumerator : IEnumerator<T> {
554 548 Chunk m_current;
555 549 int m_pos = -1;
556 550
557 551 public Enumerator(Chunk fisrt) {
558 552 m_current = fisrt;
559 553 }
560 554
561 555 #region IEnumerator implementation
562 556
563 557 public bool MoveNext() {
564 558 if (m_current == null)
565 559 return false;
566 560
567 561 if (m_pos == -1)
568 562 m_pos = m_current.Low;
569 563 else
570 564 m_pos++;
571 565 if (m_pos == m_current.Hi) {
572 566 m_pos = 0;
573 567 m_current = m_current.next;
574 568 }
575 569
576 570 return true;
577 571 }
578 572
579 573 public void Reset() {
580 574 throw new NotSupportedException();
581 575 }
582 576
583 577 object IEnumerator.Current {
584 578 get {
585 579 return Current;
586 580 }
587 581 }
588 582
589 583 #endregion
590 584
591 585 #region IDisposable implementation
592 586
593 587 public void Dispose() {
594 588 }
595 589
596 590 #endregion
597 591
598 592 #region IEnumerator implementation
599 593
600 594 public T Current {
601 595 get {
602 596 if (m_pos == -1 || m_current == null)
603 597 throw new InvalidOperationException();
604 598 return m_current.GetAt(m_pos);
605 599 }
606 600 }
607 601
608 602 #endregion
609 603 }
610 604
611 605 public IEnumerator<T> GetEnumerator() {
612 606 return new Enumerator(m_first);
613 607 }
614 608
615 609 #endregion
616 610
617 611 #region IEnumerable implementation
618 612
619 613 IEnumerator IEnumerable.GetEnumerator() {
620 614 return GetEnumerator();
621 615 }
622 616
623 617 #endregion
624 618 }
625 619 }
@@ -1,199 +1,197
1 1 using System;
2 2 using System.Threading;
3 3
4 4 namespace Implab.Parallels {
5 5 public abstract class DispatchPool<TUnit> : IDisposable {
6 6 readonly int m_minThreadsLimit;
7 7 readonly int m_maxThreadsLimit;
8 8 readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit
9 9
10 10 int m_threads; // the current size of the pool
11 11 int m_maxRunningThreads; // the meximum reached size of the pool
12 12 int m_exit; // the pool is going to shutdown, all unused workers are released
13 13
14 14 readonly object m_signal = new object(); // used to pulse waiting threads
15 15
16 16 protected DispatchPool(int min, int max) {
17 17 if (min < 0)
18 18 throw new ArgumentOutOfRangeException("min");
19 19 if (max <= 0)
20 20 throw new ArgumentOutOfRangeException("max");
21 21
22 22 if (min > max)
23 23 min = max;
24 24 m_minThreadsLimit = min;
25 25 m_maxThreadsLimit = max;
26 26 }
27 27
28 28 protected DispatchPool(int threads)
29 29 : this(threads, threads) {
30 30 }
31 31
32 32 protected DispatchPool() {
33 int maxThreads, maxCP;
34 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
35 33
36 34 m_minThreadsLimit = 0;
37 m_maxThreadsLimit = maxThreads;
35 m_maxThreadsLimit = Environment.ProcessorCount;
38 36 }
39 37
40 38 protected void InitPool() {
41 39 for (int i = 0; i < m_minThreadsLimit; i++)
42 40 StartWorker();
43 41 }
44 42
45 43 public int PoolSize {
46 44 get {
47 45 Thread.MemoryBarrier();
48 46 return m_threads;
49 47 }
50 48 }
51 49
52 50 public int MaxRunningThreads {
53 51 get {
54 52 Thread.MemoryBarrier();
55 53 return m_maxRunningThreads;
56 54 }
57 55 }
58 56
59 57 protected bool IsDisposed {
60 58 get {
61 59 Thread.MemoryBarrier();
62 60 return m_exit == 1;
63 61 }
64 62 }
65 63
66 64 protected abstract bool TryDequeue(out TUnit unit);
67 65
68 66 bool Dequeue(out TUnit unit, int timeout) {
69 67 int ts = Environment.TickCount;
70 68 if (TryDequeue(out unit))
71 69 return true;
72 70 lock (m_signal) {
73 71 while (!TryDequeue(out unit) && m_exit == 0)
74 72 if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
75 73 // timeout
76 74 return false;
77 75 }
78 76 // queue item or terminate
79 77 Monitor.Pulse(m_signal);
80 78 if (m_exit == 1)
81 79 return false;
82 80 }
83 81 return true;
84 82 }
85 83
86 84 protected void SignalThread() {
87 85 lock (m_signal) {
88 86 Monitor.Pulse(m_signal);
89 87 }
90 88 }
91 89
92 90 #region thread slots traits
93 91
94 92 bool AllocateThreadSlot() {
95 93 int current;
96 94 // use spins to allocate slot for the new thread
97 95 do {
98 96 current = m_threads;
99 97 if (current >= m_maxThreadsLimit || m_exit == 1)
100 98 // no more slots left or the pool has been disposed
101 99 return false;
102 100 } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));
103 101
104 102 UpdateMaxThreads(current + 1);
105 103
106 104 return true;
107 105 }
108 106
109 107 bool AllocateThreadSlot(int desired) {
110 108 if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
111 109 return false;
112 110
113 111 UpdateMaxThreads(desired);
114 112
115 113 return true;
116 114 }
117 115
118 116 bool ReleaseThreadSlot(out bool last) {
119 117 last = false;
120 118 int current;
121 119 // use spins to release slot for the new thread
122 120 Thread.MemoryBarrier();
123 121 do {
124 122 current = m_threads;
125 123 if (current <= m_minThreadsLimit && m_exit == 0)
126 124 // the thread is reserved
127 125 return false;
128 126 } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));
129 127
130 128 last = (current == 1);
131 129
132 130 return true;
133 131 }
134 132
135 133 void UpdateMaxThreads(int count) {
136 134 int max;
137 135 do {
138 136 max = m_maxRunningThreads;
139 137 if (max >= count)
140 138 break;
141 139 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
142 140 }
143 141
144 142 #endregion
145 143
146 144 protected bool StartWorker() {
147 145 if (AllocateThreadSlot()) {
148 146 // slot successfully allocated
149 147 var worker = new Thread(Worker);
150 148 worker.IsBackground = true;
151 149 worker.Start();
152 150
153 151 return true;
154 152 }
155 153 return false;
156 154 }
157 155
158 156 protected abstract void InvokeUnit(TUnit unit);
159 157
160 158 protected virtual void Worker() {
161 159 TUnit unit;
162 160 bool last;
163 161 do {
164 162 while (Dequeue(out unit, m_releaseTimeout)) {
165 163 InvokeUnit(unit);
166 164 }
167 165 if(!ReleaseThreadSlot(out last))
168 166 continue;
169 167 // queue may be not empty
170 168 if (last && TryDequeue(out unit)) {
171 169 InvokeUnit(unit);
172 170 if (AllocateThreadSlot(1))
173 171 continue;
174 172 // we can safely exit since pool is alive
175 173 }
176 174 break;
177 175 } while(true);
178 176 }
179 177
180 178
181 179 protected virtual void Dispose(bool disposing) {
182 180 if (disposing) {
183 181 if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
184 182 // wake sleeping threads
185 183 SignalThread();
186 184 GC.SuppressFinalize(this);
187 185 }
188 186 }
189 187 }
190 188
191 189 public void Dispose() {
192 190 Dispose(true);
193 191 }
194 192
195 193 ~DispatchPool() {
196 194 Dispose(false);
197 195 }
198 196 }
199 197 }
@@ -1,120 +1,34
1 1 using System;
2 2 using Implab.Diagnostics;
3 3 using Implab.Parallels;
4 4 using Implab;
5 5 using System.Collections.Generic;
6 6 using System.Collections.Concurrent;
7 7
8 8 namespace MonoPlay {
9 9 class MainClass {
10 10 public static void Main(string[] args) {
11 11 if (args == null)
12 12 throw new ArgumentNullException("args");
13 13
14 var q1 = new AsyncQueue<int>();
15 var q2 = new Queue<int>();
16
17 14 const int count = 10000000;
18 15
19 int res1 = 0, res2 = 0;
20 16 var t1 = Environment.TickCount;
21 17
22 AsyncPool.RunThread(
23 () => {
24 for (var i = 0; i < count; i++)
25 q1.Enqueue(1);
26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
27 },
28 () => {
29 for (var i = 0; i < count; i++)
30 q1.Enqueue(2);
31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
32 },
33 () => {
34 int temp = 0;
35 int i = 0;
36 while (i < count)
37 if (q1.TryDequeue(out temp)) {
38 i++;
39 res1 += temp;
40 }
41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
42 },
43 () => {
44 int temp = 0;
45 int i = 0;
46 while (i < count)
47 if (q1.TryDequeue(out temp)) {
48 i++;
49 res2 += temp;
50 }
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
52 }
53 )
54 .Bundle()
55 .Join();
56
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
58
59 var t2 = Environment.TickCount;
60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
61
62 t1 = Environment.TickCount;
18 for (int i = 0; i < count; i++) {
19 var p = new Promise<int>();
63 20
64 for (var i = 0; i < count * 2; i++)
65 q2.Enqueue(i);
66
67 for (var i = 0; i < count * 2; i++)
68 q2.Dequeue();
69
70 t2 = Environment.TickCount;
71 Console.WriteLine("Queue: {0} ms", t2 - t1);
72
73 q2 = new Queue<int>();
74
75 t1 = Environment.TickCount;
76
21 p.On(x => {}).On(x => {});
77 22
78 AsyncPool.RunThread(
79 () => {
80 for (var i = 0; i < count; i++)
81 lock (q2)
82 q2.Enqueue(i);
83 },
84 () => {
85 for (var i = 0; i < count; i++)
86 lock (q2)
87 q2.Enqueue(i);
88 },
89 () => {
90 for (int i = 0; i < count ;)
91 lock (q2) {
92 if (q2.Count == 0)
93 continue;
94 q2.Dequeue();
95 i++;
96 }
97
98 },
99 () => {
100 for (int i = 0; i < count ;)
101 lock (q2) {
102 if (q2.Count == 0)
103 continue;
104 q2.Dequeue();
105 i++;
106 }
23 p.Resolve(i);
107 24
108 25 }
109 )
110 .Bundle()
111 .Join();
112 26
113 27
114 28
115 t2 = Environment.TickCount;
116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
29 var t2 = Environment.TickCount;
30 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
117 31
118 32 }
119 33 }
120 34 }
General Comments 0
You need to be logged in to leave comments. Login now