##// END OF EJS Templates
major update, added Drain mathod to AsyncQueue class
cin -
r124:a336cb13c6a9 v2
parent child
Show More
@@ -1,676 +1,779
1 1 using System;
2 2 using System.Reflection;
3 3 using System.Threading;
4 4 using Implab.Parallels;
5 5
6 6 #if MONO
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 10 using TestMethod = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
14 14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15 15
16 16 #endif
17 17
18 18 namespace Implab.Test {
19 19 [TestClass]
20 20 public class AsyncTests {
21 21 [TestMethod]
22 22 public void ResolveTest() {
23 23 int res = -1;
24 24 var p = new Promise<int>();
25 25 p.Then(x => res = x);
26 26 p.Resolve(100);
27 27
28 28 Assert.AreEqual(100, res);
29 29 }
30 30
31 31 [TestMethod]
32 32 public void RejectTest() {
33 33 int res = -1;
34 34 Exception err = null;
35 35
36 36 var p = new Promise<int>();
37 37 p.Then(
38 38 x => res = x,
39 39 e => {
40 40 err = e;
41 41 return -2;
42 42 }
43 43 );
44 44 p.Reject(new ApplicationException("error"));
45 45
46 46 Assert.AreEqual(res, -1);
47 47 Assert.AreEqual(err.Message, "error");
48 48
49 49 }
50 50
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 54 p.Cancel();
55 55
56 56 var p2 = p.Cancelled(() => {
57 57 throw new ApplicationException("CANCELLED");
58 58 });
59 59
60 60 try {
61 61 p2.Join();
62 62 Assert.Fail();
63 63 } catch (ApplicationException err) {
64 64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 65 }
66 66
67 67 }
68 68
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 72 p.Cancel();
73 73
74 74 var p2 = p
75 75 .Cancelled<bool>(() => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Error(e => true);
79 79
80 80 Assert.AreEqual(true, p2.Join());
81 81 }
82 82
83 83 [TestMethod]
84 84 public void JoinSuccessTest() {
85 85 var p = new Promise<int>();
86 86 p.Resolve(100);
87 87 Assert.AreEqual(p.Join(), 100);
88 88 }
89 89
90 90 [TestMethod]
91 91 public void JoinFailTest() {
92 92 var p = new Promise<int>();
93 93 p.Reject(new ApplicationException("failed"));
94 94
95 95 try {
96 96 p.Join();
97 97 throw new ApplicationException("WRONG!");
98 98 } catch (TargetInvocationException err) {
99 99 Assert.AreEqual(err.InnerException.Message, "failed");
100 100 } catch {
101 101 Assert.Fail("Got wrong excaption");
102 102 }
103 103 }
104 104
105 105 [TestMethod]
106 106 public void MapTest() {
107 107 var p = new Promise<int>();
108 108
109 109 var p2 = p.Then(x => x.ToString());
110 110 p.Resolve(100);
111 111
112 112 Assert.AreEqual(p2.Join(), "100");
113 113 }
114 114
115 115 [TestMethod]
116 116 public void FixErrorTest() {
117 117 var p = new Promise<int>();
118 118
119 119 var p2 = p.Error(e => 101);
120 120
121 121 p.Reject(new Exception());
122 122
123 123 Assert.AreEqual(p2.Join(), 101);
124 124 }
125 125
126 126 [TestMethod]
127 127 public void ChainTest() {
128 128 var p1 = new Promise<int>();
129 129
130 130 var p3 = p1.Chain(x => {
131 131 var p2 = new Promise<string>();
132 132 p2.Resolve(x.ToString());
133 133 return p2;
134 134 });
135 135
136 136 p1.Resolve(100);
137 137
138 138 Assert.AreEqual(p3.Join(), "100");
139 139 }
140 140
141 141 [TestMethod]
142 142 public void ChainFailTest() {
143 143 var p1 = new Promise<int>();
144 144
145 145 var p3 = p1.Chain(x => {
146 146 var p2 = new Promise<string>();
147 147 p2.Reject(new Exception("DIE!!!"));
148 148 return p2;
149 149 });
150 150
151 151 p1.Resolve(100);
152 152
153 153 Assert.IsTrue(p3.IsResolved);
154 154 }
155 155
156 156 [TestMethod]
157 157 public void PoolTest() {
158 158 var pid = Thread.CurrentThread.ManagedThreadId;
159 159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160 160
161 161 Assert.AreNotEqual(pid, p.Join());
162 162 }
163 163
164 164 [TestMethod]
165 165 public void WorkerPoolSizeTest() {
166 166 var pool = new WorkerPool(5, 10, 1);
167 167
168 168 Assert.AreEqual(5, pool.PoolSize);
169 169
170 170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173 173
174 174 Assert.AreEqual(5, pool.PoolSize);
175 175
176 176 for (int i = 0; i < 100; i++)
177 177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 178 Thread.Sleep(200);
179 179 Assert.AreEqual(10, pool.PoolSize);
180 180
181 181 pool.Dispose();
182 182 }
183 183
184 184 [TestMethod]
185 185 public void WorkerPoolCorrectTest() {
186 186 var pool = new WorkerPool(0,1000,100);
187 187
188 188 const int iterations = 1000;
189 189 int pending = iterations;
190 190 var stop = new ManualResetEvent(false);
191 191
192 192 var count = 0;
193 193 for (int i = 0; i < iterations; i++) {
194 194 pool
195 195 .Invoke(() => 1)
196 196 .Then(x => Interlocked.Add(ref count, x))
197 197 .Then(x => Math.Log10(x))
198 198 .On(() => {
199 199 Interlocked.Decrement(ref pending);
200 200 if (pending == 0)
201 201 stop.Set();
202 202 }, PromiseEventType.All);
203 203 }
204 204
205 205 stop.WaitOne();
206 206
207 207 Assert.AreEqual(iterations, count);
208 208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 209 pool.Dispose();
210 210
211 211 }
212 212
213 213 [TestMethod]
214 214 public void WorkerPoolDisposeTest() {
215 215 var pool = new WorkerPool(5, 20);
216 216 Assert.AreEqual(5, pool.PoolSize);
217 217 pool.Dispose();
218 218 Thread.Sleep(500);
219 219 Assert.AreEqual(0, pool.PoolSize);
220 220 pool.Dispose();
221 221 }
222 222
223 223 [TestMethod]
224 224 public void MTQueueTest() {
225 225 var queue = new MTQueue<int>();
226 226 int res;
227 227
228 228 queue.Enqueue(10);
229 229 Assert.IsTrue(queue.TryDequeue(out res));
230 230 Assert.AreEqual(10, res);
231 231 Assert.IsFalse(queue.TryDequeue(out res));
232 232
233 233 for (int i = 0; i < 1000; i++)
234 234 queue.Enqueue(i);
235 235
236 236 for (int i = 0; i < 1000; i++) {
237 237 queue.TryDequeue(out res);
238 238 Assert.AreEqual(i, res);
239 239 }
240 240
241 241 int writers = 0;
242 242 int readers = 0;
243 243 var stop = new ManualResetEvent(false);
244 244 int total = 0;
245 245
246 246 const int itemsPerWriter = 10000;
247 247 const int writersCount = 10;
248 248
249 249 for (int i = 0; i < writersCount; i++) {
250 250 Interlocked.Increment(ref writers);
251 251 AsyncPool
252 .InvokeNewThread(() => {
252 .RunThread(() => {
253 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 254 queue.Enqueue(1);
255 255 }
256 256 return 1;
257 257 })
258 258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 259 }
260 260
261 261 for (int i = 0; i < 10; i++) {
262 262 Interlocked.Increment(ref readers);
263 263 AsyncPool
264 .InvokeNewThread(() => {
264 .RunThread(() => {
265 265 int t;
266 266 do {
267 267 while (queue.TryDequeue(out t))
268 268 Interlocked.Add(ref total, t);
269 269 } while (writers > 0);
270 270 return 1;
271 271 })
272 272 .On(() => {
273 273 Interlocked.Decrement(ref readers);
274 274 if (readers == 0)
275 275 stop.Set();
276 276 }, PromiseEventType.All);
277 277 }
278 278
279 279 stop.WaitOne();
280 280
281 281 Assert.AreEqual(100000, total);
282 282 }
283 283
284 284 [TestMethod]
285 285 public void AsyncQueueTest() {
286 286 var queue = new AsyncQueue<int>();
287 287 int res;
288 288
289 289 queue.Enqueue(10);
290 290 Assert.IsTrue(queue.TryDequeue(out res));
291 291 Assert.AreEqual(10, res);
292 292 Assert.IsFalse(queue.TryDequeue(out res));
293 293
294 294 for (int i = 0; i < 1000; i++)
295 295 queue.Enqueue(i);
296 296
297 297 for (int i = 0; i < 1000; i++) {
298 298 queue.TryDequeue(out res);
299 299 Assert.AreEqual(i, res);
300 300 }
301 301
302 302 const int count = 10000000;
303 303
304 304 int res1 = 0, res2 = 0;
305 305 var t1 = Environment.TickCount;
306 306
307 307 AsyncPool.RunThread(
308 308 () => {
309 309 for (var i = 0; i < count; i++)
310 310 queue.Enqueue(1);
311 311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 312 },
313 313 () => {
314 314 for (var i = 0; i < count; i++)
315 315 queue.Enqueue(2);
316 316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 317 },
318 318 () => {
319 319 int temp;
320 320 int i = 0;
321 321 while (i < count)
322 322 if (queue.TryDequeue(out temp)) {
323 323 i++;
324 324 res1 += temp;
325 325 }
326 326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 327 },
328 328 () => {
329 329 int temp;
330 330 int i = 0;
331 331 while (i < count)
332 332 if (queue.TryDequeue(out temp)) {
333 333 i++;
334 334 res2 += temp;
335 335 }
336 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 337 }
338 338 )
339 .Combine()
339 .Bundle()
340 340 .Join();
341 341
342 342 Assert.AreEqual(count * 3, res1 + res2);
343 343
344 344 Console.WriteLine(
345 345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 346 Environment.TickCount - t1,
347 347 res1,
348 348 res2,
349 349 res1 + res2,
350 350 count
351 351 );
352 352 }
353 353
354 354 [TestMethod]
355 355 public void AsyncQueueBatchTest() {
356 356 var queue = new AsyncQueue<int>();
357 357
358 358 const int wBatch = 29;
359 359 const int wCount = 400000;
360 360 const int total = wBatch * wCount * 2;
361 361 const int summ = wBatch * wCount * 3;
362 362
363 363 int r1 = 0, r2 = 0;
364 364 const int rBatch = 111;
365 365 int read = 0;
366 366
367 367 var t1 = Environment.TickCount;
368 368
369 369 AsyncPool.RunThread(
370 370 () => {
371 371 var buffer = new int[wBatch];
372 372 for(int i = 0; i<wBatch; i++)
373 373 buffer[i] = 1;
374 374
375 375 for(int i =0; i < wCount; i++)
376 376 queue.EnqueueRange(buffer,0,wBatch);
377 377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 378 },
379 379 () => {
380 380 var buffer = new int[wBatch];
381 381 for(int i = 0; i<wBatch; i++)
382 382 buffer[i] = 2;
383 383
384 384 for(int i =0; i < wCount; i++)
385 385 queue.EnqueueRange(buffer,0,wBatch);
386 386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 387 },
388 388 () => {
389 389 var buffer = new int[rBatch];
390 390
391 391 while(read < total) {
392 392 int actual;
393 393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 394 for(int i=0; i< actual; i++)
395 395 r1 += buffer[i];
396 396 Interlocked.Add(ref read, actual);
397 397 }
398 398 }
399 399
400 400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 401 },
402 402 () => {
403 403 var buffer = new int[rBatch];
404 404
405 405 while(read < total) {
406 406 int actual;
407 407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 408 for(int i=0; i< actual; i++)
409 409 r2 += buffer[i];
410 410 Interlocked.Add(ref read, actual);
411 411 }
412 412 }
413 413
414 414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 415 }
416 416 )
417 .Combine()
417 .Bundle()
418 418 .Join();
419 419
420 420 Assert.AreEqual(summ , r1 + r2);
421 421
422 422 Console.WriteLine(
423 423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 424 Environment.TickCount - t1,
425 425 r1,
426 426 r2,
427 427 r1 + r2,
428 428 total
429 429 );
430 430 }
431 431
432 432 [TestMethod]
433 433 public void AsyncQueueChunkDequeueTest() {
434 434 var queue = new AsyncQueue<int>();
435 435
436 436 const int wBatch = 31;
437 437 const int wCount = 200000;
438 438 const int total = wBatch * wCount * 3;
439 439 const int summ = wBatch * wCount * 6;
440 440
441 441 int r1 = 0, r2 = 0;
442 442 const int rBatch = 1024;
443 443 int read = 0;
444 444
445 445 var t1 = Environment.TickCount;
446 446
447 447 AsyncPool.RunThread(
448 448 () => {
449 449 var buffer = new int[wBatch];
450 450 for(int i = 0; i<wBatch; i++)
451 451 buffer[i] = 1;
452 452
453 453 for(int i =0; i < wCount; i++)
454 454 queue.EnqueueRange(buffer,0,wBatch);
455 455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 456 },
457 457 () => {
458 458 var buffer = new int[wBatch];
459 459 for(int i = 0; i<wBatch; i++)
460 460 buffer[i] = 2;
461 461
462 462 for(int i =0; i < wCount; i++)
463 463 queue.EnqueueRange(buffer,0,wBatch);
464 464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 465 },
466 466 () => {
467 467 var buffer = new int[wBatch];
468 468 for(int i = 0; i<wBatch; i++)
469 469 buffer[i] = 3;
470 470
471 471 for(int i =0; i < wCount; i++)
472 472 queue.EnqueueRange(buffer,0,wBatch);
473 473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 474 },
475 475 () => {
476 476 var buffer = new int[rBatch];
477 477 int count = 1;
478 478 double avgchunk = 0;
479 479 while(read < total) {
480 480 int actual;
481 481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 482 for(int i=0; i< actual; i++)
483 483 r2 += buffer[i];
484 484 Interlocked.Add(ref read, actual);
485 485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 486 count ++;
487 487 }
488 488 }
489 489
490 490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 491 }
492 492 )
493 .Combine()
493 .Bundle()
494 .Join();
495
496 Assert.AreEqual(summ , r1 + r2);
497
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
501 r1,
502 r2,
503 r1 + r2,
504 total
505 );
506 }
507
508 [TestMethod]
509 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>();
511
512 const int wBatch = 11;
513 const int wCount = 200000;
514 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
516
517 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
519 int read = 0;
520
521 var t1 = Environment.TickCount;
522
523 AsyncPool.RunThread(
524 () => {
525 var buffer = new int[wBatch];
526 for(int i = 0; i<wBatch; i++)
527 buffer[i] = 1;
528
529 for(int i =0; i < wCount; i++)
530 queue.EnqueueRange(buffer,0,wBatch);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 },
533 () => {
534 for(int i =0; i < wCount * wBatch; i++)
535 queue.Enqueue(1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 },
538 () => {
539 var buffer = new int[wBatch];
540 for(int i = 0; i<wBatch; i++)
541 buffer[i] = 1;
542
543 for(int i =0; i < wCount; i++)
544 queue.EnqueueRange(buffer,0,wBatch);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 },
547 /*() => {
548 int temp;
549 int count = 0;
550 while (read < total)
551 if (queue.TryDequeue(out temp)) {
552 count++;
553 r1 += temp;
554 Interlocked.Increment(ref read);
555 }
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 },*/
558 /*() => {
559 var buffer = new int[rBatch];
560 var count = 0;
561 while(read < total) {
562 int actual;
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 for(int i=0; i< actual; i++)
565 r1 += buffer[i];
566 Interlocked.Add(ref read, actual);
567 count += actual;
568 }
569 }
570
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 },*/
573 () => {
574 var count = 0;
575 while(read < total) {
576 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
578 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
580 count += buffer.Length;
581 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 },
584 () => {
585 var count = 0;
586 while(read < total) {
587 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
589 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
591 count += buffer.Length;
592 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 }
595 )
596 .Bundle()
494 597 .Join();
495 598
496 599 Assert.AreEqual(summ , r1 + r2);
497 600
498 601 Console.WriteLine(
499 602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 603 Environment.TickCount - t1,
501 604 r1,
502 605 r2,
503 606 r1 + r2,
504 607 total
505 608 );
506 609 }
507 610
508 611 [TestMethod]
509 612 public void ParallelMapTest() {
510 613
511 614 const int count = 100000;
512 615
513 616 var args = new double[count];
514 617 var rand = new Random();
515 618
516 619 for (int i = 0; i < count; i++)
517 620 args[i] = rand.NextDouble();
518 621
519 622 var t = Environment.TickCount;
520 623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
521 624
522 625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
523 626
524 627 t = Environment.TickCount;
525 628 for (int i = 0; i < count; i++)
526 629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
527 630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
528 631 }
529 632
530 633 [TestMethod]
531 634 public void ChainedMapTest() {
532 635
533 636 using (var pool = new WorkerPool(0,10,1)) {
534 637 const int count = 10000;
535 638
536 639 var args = new double[count];
537 640 var rand = new Random();
538 641
539 642 for (int i = 0; i < count; i++)
540 643 args[i] = rand.NextDouble();
541 644
542 645 var t = Environment.TickCount;
543 646 var res = args
544 647 .ChainedMap(
545 648 // Analysis disable once AccessToDisposedClosure
546 649 x => pool.Invoke(
547 650 () => Math.Sin(x * x)
548 651 ),
549 652 4
550 653 )
551 654 .Join();
552 655
553 656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
554 657
555 658 t = Environment.TickCount;
556 659 for (int i = 0; i < count; i++)
557 660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
558 661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
559 662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
560 663 }
561 664 }
562 665
563 666 [TestMethod]
564 667 public void ParallelForEachTest() {
565 668
566 669 const int count = 100000;
567 670
568 671 var args = new int[count];
569 672 var rand = new Random();
570 673
571 674 for (int i = 0; i < count; i++)
572 675 args[i] = (int)(rand.NextDouble() * 100);
573 676
574 677 int result = 0;
575 678
576 679 var t = Environment.TickCount;
577 680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
578 681
579 682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
580 683
581 684 int result2 = 0;
582 685
583 686 t = Environment.TickCount;
584 687 for (int i = 0; i < count; i++)
585 688 result2 += args[i];
586 689 Assert.AreEqual(result2, result);
587 690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
588 691 }
589 692
590 693 [TestMethod]
591 694 public void ComplexCase1Test() {
592 695 var flags = new bool[3];
593 696
594 697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
595 698
596 699 var step1 = PromiseHelper
597 700 .Sleep(200, "Alan")
598 701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
599 702 var p = step1
600 703 .Chain(x =>
601 704 PromiseHelper
602 705 .Sleep(200, "Hi, " + x)
603 706 .Then(y => y)
604 707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
605 708 )
606 709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
607 710 step1.Join();
608 711 p.Cancel();
609 712 try {
610 713 Assert.AreEqual(p.Join(), "Hi, Alan");
611 714 Assert.Fail("Shouldn't get here");
612 715 } catch (OperationCanceledException) {
613 716 }
614 717
615 718 Assert.IsFalse(flags[0]);
616 719 Assert.IsTrue(flags[1]);
617 720 Assert.IsTrue(flags[2]);
618 721 }
619 722
620 723 [TestMethod]
621 724 public void ChainedCancel1Test() {
622 725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
623 726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
624 727 var p = PromiseHelper
625 728 .Sleep(1, "Hi, HAL!")
626 729 .Then(x => {
627 730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
628 731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
629 732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
630 733 PromiseHelper
631 734 .Sleep(100, "HAL, STOP!")
632 735 .Then(result.Cancel);
633 736 return result;
634 737 });
635 738 try {
636 739 p.Join();
637 740 } catch (TargetInvocationException err) {
638 741 Assert.IsTrue(err.InnerException is OperationCanceledException);
639 742 }
640 743 }
641 744
642 745 [TestMethod]
643 746 public void ChainedCancel2Test() {
644 747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
645 748 var pSurvive = new Promise<bool>();
646 749 var hemStarted = new ManualResetEvent(false);
647 750 var p = PromiseHelper
648 751 .Sleep(1, "Hi, HAL!")
649 752 .Chain(x => {
650 753 hemStarted.Set();
651 754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
652 755 var result = PromiseHelper
653 756 .Sleep(100000000, "HEM ENABLED!!!")
654 757 .Then(s => {
655 758 pSurvive.Resolve(false);
656 759 return s;
657 760 });
658 761
659 762 result
660 763 .Cancelled(() => pSurvive.Resolve(true));
661 764
662 765 return result;
663 766 });
664 767
665 768 hemStarted.WaitOne();
666 769 p.Cancel();
667 770
668 771 try {
669 772 p.Join();
670 773 } catch (OperationCanceledException) {
671 774 Assert.IsTrue(pSurvive.Join());
672 775 }
673 776 }
674 777 }
675 778 }
676 779
@@ -1,207 +1,207
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Diagnostics;
4 4 using System.Threading;
5 5
6 6 namespace Implab.Parallels {
7 7 public static class ArrayTraits {
8 8 class ArrayIterator<TSrc> : DispatchPool<int> {
9 9 readonly Action<TSrc> m_action;
10 10 readonly TSrc[] m_source;
11 11 readonly Promise<int> m_promise = new Promise<int>();
12 12 readonly LogicalOperation m_logicalOperation;
13 13
14 14 int m_pending;
15 15 int m_next;
16 16
17 17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
18 18 : base(threads) {
19 19
20 20 Debug.Assert(source != null);
21 21 Debug.Assert(action != null);
22 22
23 23 m_logicalOperation = TraceContext.Instance.CurrentOperation;
24 24 m_next = 0;
25 25 m_source = source;
26 26 m_pending = source.Length;
27 27 m_action = action;
28 28
29 29 m_promise.On(Dispose, PromiseEventType.All);
30 30
31 31 InitPool();
32 32 }
33 33
34 34 public Promise<int> Promise {
35 35 get {
36 36 return m_promise;
37 37 }
38 38 }
39 39
40 40 protected override void Worker() {
41 41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
42 42 try {
43 43 base.Worker();
44 44 } finally {
45 45 TraceContext.Instance.Leave();
46 46 }
47 47 }
48 48
49 49 protected override bool TryDequeue(out int unit) {
50 50 unit = Interlocked.Increment(ref m_next) - 1;
51 51 return unit < m_source.Length;
52 52 }
53 53
54 54 protected override void InvokeUnit(int unit) {
55 55 try {
56 56 m_action(m_source[unit]);
57 57 var pending = Interlocked.Decrement(ref m_pending);
58 58 if (pending == 0)
59 59 m_promise.Resolve(m_source.Length);
60 60 } catch (Exception e) {
61 61 m_promise.Reject(e);
62 62 }
63 63 }
64 64 }
65 65
66 66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
67 67 readonly Func<TSrc, TDst> m_transform;
68 68 readonly TSrc[] m_source;
69 69 readonly TDst[] m_dest;
70 70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
71 71 readonly LogicalOperation m_logicalOperation;
72 72
73 73 int m_pending;
74 74 int m_next;
75 75
76 76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
77 77 : base(threads) {
78 78
79 79 Debug.Assert (source != null);
80 80 Debug.Assert( transform != null);
81 81
82 82 m_next = 0;
83 83 m_source = source;
84 84 m_dest = new TDst[source.Length];
85 85 m_pending = source.Length;
86 86 m_transform = transform;
87 87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
88 88
89 89 m_promise.On(Dispose, PromiseEventType.All);
90 90
91 91 InitPool();
92 92 }
93 93
94 94 public Promise<TDst[]> Promise {
95 95 get {
96 96 return m_promise;
97 97 }
98 98 }
99 99
100 100 protected override void Worker() {
101 101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
102 102 try {
103 103 base.Worker();
104 104 } finally {
105 105 TraceContext.Instance.Leave();
106 106 }
107 107 }
108 108
109 109 protected override bool TryDequeue(out int unit) {
110 110 unit = Interlocked.Increment(ref m_next) - 1;
111 111 return unit < m_source.Length;
112 112 }
113 113
114 114 protected override void InvokeUnit(int unit) {
115 115 try {
116 116 m_dest[unit] = m_transform(m_source[unit]);
117 117 var pending = Interlocked.Decrement(ref m_pending);
118 118 if (pending == 0)
119 119 m_promise.Resolve(m_dest);
120 120 } catch (Exception e) {
121 121 m_promise.Reject(e);
122 122 }
123 123 }
124 124 }
125 125
126 126 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
127 127 if (source == null)
128 128 throw new ArgumentNullException("source");
129 129 if (transform == null)
130 130 throw new ArgumentNullException("transform");
131 131
132 132 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
133 133 return mapper.Promise;
134 134 }
135 135
136 136 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
137 137 if (source == null)
138 138 throw new ArgumentNullException("source");
139 139 if (action == null)
140 140 throw new ArgumentNullException("action");
141 141
142 142 var iter = new ArrayIterator<TSrc>(source, action, threads);
143 143 return iter.Promise;
144 144 }
145 145
146 146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
147 147 if (source == null)
148 148 throw new ArgumentNullException("source");
149 149 if (transform == null)
150 150 throw new ArgumentNullException("transform");
151 151 if (threads <= 0)
152 152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
153 153
154 154 if (source.Length == 0)
155 155 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
156 156
157 157 var promise = new Promise<TDst[]>();
158 158 var res = new TDst[source.Length];
159 159 var pending = source.Length;
160 160
161 161 object locker = new object();
162 162 int slots = threads;
163 163
164 164 // Analysis disable AccessToDisposedClosure
165 AsyncPool.InvokeNewThread<int>(() => {
165 AsyncPool.RunThread<int>(() => {
166 166 for (int i = 0; i < source.Length; i++) {
167 167 if(promise.IsResolved)
168 168 break; // stop processing in case of error or cancellation
169 169 var idx = i;
170 170
171 171 if (Interlocked.Decrement(ref slots) < 0) {
172 172 lock(locker) {
173 173 while(slots < 0)
174 174 Monitor.Wait(locker);
175 175 }
176 176 }
177 177
178 178 try {
179 179 transform(source[i])
180 180 .On( x => {
181 181 Interlocked.Increment(ref slots);
182 182 lock (locker) {
183 183 Monitor.Pulse(locker);
184 184 }
185 185 })
186 186 .On(
187 187 x => {
188 188 res[idx] = x;
189 189 var left = Interlocked.Decrement(ref pending);
190 190 if (left == 0)
191 191 promise.Resolve(res);
192 192 },
193 193 promise.Reject
194 194 );
195 195
196 196 } catch (Exception e) {
197 197 promise.Reject(e);
198 198 }
199 199 }
200 200 return 0;
201 201 });
202 202
203 203 return promise;
204 204 }
205 205
206 206 }
207 207 }
@@ -1,86 +1,86
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Threading;
4 4 using System.Linq;
5 5
6 6 namespace Implab.Parallels {
7 7 /// <summary>
8 8 /// Класс для распаралСливания Π·Π°Π΄Π°Ρ‡.
9 9 /// </summary>
10 10 /// <remarks>
11 11 /// Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Π΄Π°Π½Π½Ρ‹ΠΉ класс ΠΈ лямда выраТСния ΠΌΠΎΠΆΠ½ΠΎ Ρ€Π°ΡΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΠΈΡ‚ΡŒ
12 12 /// вычислСния, для этого ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ концСпция ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.
13 13 /// </remarks>
14 14 public static class AsyncPool {
15 15
16 16 public static IPromise<T> Invoke<T>(Func<T> func) {
17 17 var p = new Promise<T>();
18 18 var caller = TraceContext.Instance.CurrentOperation;
19 19
20 20 ThreadPool.QueueUserWorkItem(param => {
21 21 TraceContext.Instance.EnterLogicalOperation(caller,false);
22 22 try {
23 23 p.Resolve(func());
24 24 } catch(Exception e) {
25 25 p.Reject(e);
26 26 } finally {
27 27 TraceContext.Instance.Leave();
28 28 }
29 29 });
30 30
31 31 return p;
32 32 }
33 33
34 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
34 public static IPromise<T> RunThread<T>(Func<T> func) {
35 35 var p = new Promise<T>();
36 36
37 37 var caller = TraceContext.Instance.CurrentOperation;
38 38
39 39 var worker = new Thread(() => {
40 40 TraceContext.Instance.EnterLogicalOperation(caller,false);
41 41 try {
42 42 p.Resolve(func());
43 43 } catch (Exception e) {
44 44 p.Reject(e);
45 45 } finally {
46 46 TraceContext.Instance.Leave();
47 47 }
48 48 });
49 49 worker.IsBackground = true;
50 50 worker.Start();
51 51
52 52 return p;
53 53 }
54 54
55 55
56 public static IPromise InvokeNewThread(Action func) {
56 public static IPromise RunThread(Action func) {
57 57 var p = new Promise();
58 58
59 59 var caller = TraceContext.Instance.CurrentOperation;
60 60
61 61 var worker = new Thread(() => {
62 62 TraceContext.Instance.EnterLogicalOperation(caller,false);
63 63 try {
64 64 func();
65 65 p.Resolve();
66 66 } catch (Exception e) {
67 67 p.Reject(e);
68 68 } finally {
69 69 TraceContext.Instance.Leave();
70 70 }
71 71 });
72 72 worker.IsBackground = true;
73 73 worker.Start();
74 74
75 75 return p;
76 76 }
77 77
78 78 public static IPromise[] RunThread(params Action[] func) {
79 return func.Select(f => InvokeNewThread(f)).ToArray();
79 return func.Select(f => RunThread(f)).ToArray();
80 80 }
81 81
82 82 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
83 return func.Select(f => InvokeNewThread(f)).ToArray();
83 return func.Select(f => RunThread(f)).ToArray();
84 84 }
85 85 }
86 86 }
@@ -1,576 +1,625
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 using System.Diagnostics;
5 6
6 7 namespace Implab.Parallels {
7 8 public class AsyncQueue<T> : IEnumerable<T> {
8 9 class Chunk {
9 10 public Chunk next;
10 11
11 12 int m_low;
12 13 int m_hi;
13 14 int m_alloc;
14 15 readonly int m_size;
15 16 readonly T[] m_data;
16 17
17 18 public Chunk(int size) {
18 19 m_size = size;
19 20 m_data = new T[size];
20 21 }
21 22
22 23 public Chunk(int size, T value) {
23 24 m_size = size;
24 25 m_hi = 1;
25 26 m_alloc = 1;
26 27 m_data = new T[size];
27 28 m_data[0] = value;
28 29 }
29 30
30 31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 32 m_size = size;
32 33 m_hi = length;
33 34 m_alloc = alloc;
34 35 m_data = new T[size];
35 36 Array.Copy(data, offset, m_data, 0, length);
36 37 }
37 38
38 39 public int Low {
39 40 get { return m_low; }
40 41 }
41 42
42 43 public int Hi {
43 44 get { return m_hi; }
44 45 }
45 46
46 47 public bool TryEnqueue(T value, out bool extend) {
47 48 var alloc = Interlocked.Increment(ref m_alloc) - 1;
48 49
49 50 if (alloc >= m_size) {
50 51 extend = alloc == m_size;
51 52 return false;
52 53 }
53 54
54 55 extend = false;
55 56 m_data[alloc] = value;
56 57
57 58 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
58 59 // spin wait for commit
59 60 }
60 61 return true;
61 62 }
62 63
64 /// <summary>
65 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
66 /// </summary>
67 public void Commit() {
68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
69
70 while (m_hi != actual)
71 Thread.MemoryBarrier();
72 }
73
63 74 public bool TryDequeue(out T value, out bool recycle) {
64 75 int low;
65 76 do {
66 77 low = m_low;
67 78 if (low >= m_hi) {
68 79 value = default(T);
69 80 recycle = (low == m_size);
70 81 return false;
71 82 }
72 83 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
73 84
74 85 recycle = (low == m_size - 1);
75 86 value = m_data[low];
76 87
77 88 return true;
78 89 }
79 90
80 91 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
81 92 //int alloc;
82 93 //int allocSize;
83 94
84 95 var alloc = Interlocked.Add(ref m_alloc, length) - length;
85 96 if (alloc > m_size) {
86 97 // the chunk is full and someone already
87 98 // creating the new one
88 99 enqueued = 0; // nothing was added
89 100 extend = false; // the caller shouldn't try to extend the queue
90 101 return false; // nothing was added
91 102 }
92 103
93 104 enqueued = Math.Min(m_size - alloc, length);
94 105 extend = length > enqueued;
95 106
96 107 if (enqueued == 0)
97 108 return false;
98 109
99 110
100 111 Array.Copy(batch, offset, m_data, alloc, enqueued);
101 112
102 113 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
103 114 // spin wait for commit
104 115 }
105 116
106 117 return true;
107 118 }
108 119
109 120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
110 121 int low, hi, batchSize;
111 122
112 123 do {
113 124 low = m_low;
114 125 hi = m_hi;
115 126 if (low >= hi) {
116 127 dequeued = 0;
117 128 recycle = (low == m_size); // recycling could be restarted and we need to signal again
118 129 return false;
119 130 }
120 131 batchSize = Math.Min(hi - low, length);
121 132 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
122 133
123 134 recycle = (low == m_size - batchSize);
124 135 dequeued = batchSize;
125 136
126 137 Array.Copy(m_data, low, buffer, offset, batchSize);
127 138
128 139 return true;
129 140 }
130 141
131 142 public T GetAt(int pos) {
132 143 return m_data[pos];
133 144 }
134 145 }
135 146
136 147 public const int DEFAULT_CHUNK_SIZE = 32;
137 148 public const int MAX_CHUNK_SIZE = 262144;
138 149
139 150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
140 151
141 152 Chunk m_first;
142 153 Chunk m_last;
143 154
144 155 public AsyncQueue() {
145 156 m_last = m_first = new Chunk(m_chunkSize);
146 157 }
147 158
148 159 /// <summary>
149 160 /// Adds the specified value to the queue.
150 161 /// </summary>
151 162 /// <param name="value">Tha value which will be added to the queue.</param>
152 163 public void Enqueue(T value) {
153 164 var last = m_last;
154 165 // spin wait to the new chunk
155 166 bool extend = true;
156 167 while (last == null || !last.TryEnqueue(value, out extend)) {
157 168 // try to extend queue
158 169 if (extend || last == null) {
159 170 var chunk = new Chunk(m_chunkSize, value);
160 171 if (EnqueueChunk(last, chunk))
161 172 break; // success! exit!
162 173 last = m_last;
163 174 } else {
164 175 while (last == m_last) {
165 176 Thread.MemoryBarrier();
166 177 }
167 178 last = m_last;
168 179 }
169 180 }
170 181 }
171 182
172 183 /// <summary>
173 184 /// Adds the specified data to the queue.
174 185 /// </summary>
175 186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
176 187 /// <param name="offset">The offset of the data in the buffer.</param>
177 188 /// <param name="length">The size of the data to read from the buffer.</param>
178 189 public void EnqueueRange(T[] data, int offset, int length) {
179 190 if (data == null)
180 191 throw new ArgumentNullException("data");
181 192 if (offset < 0)
182 193 throw new ArgumentOutOfRangeException("offset");
183 194 if (length < 1 || offset + length > data.Length)
184 195 throw new ArgumentOutOfRangeException("length");
185 196
186 197 var last = m_last;
187 198
188 199 bool extend;
189 200 int enqueued;
190 201
191 202 while (length > 0) {
192 203 extend = true;
193 204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
194 205 length -= enqueued;
195 206 offset += enqueued;
196 207 }
197 208
198 209 if (extend) {
199 210 // there was no enough space in the chunk
200 211 // or there was no chunks in the queue
201 212
202 213 while (length > 0) {
203 214
204 215 var size = Math.Min(length, MAX_CHUNK_SIZE);
205 216
206 217 var chunk = new Chunk(
207 218 Math.Max(size, m_chunkSize),
208 219 data,
209 220 offset,
210 221 size,
211 222 length // length >= size
212 223 );
213 224
214 225 if (!EnqueueChunk(last, chunk)) {
215 226 // looks like the queue has been updated then proceed from the beginning
216 227 last = m_last;
217 228 break;
218 229 }
219 230
220 231 // we have successfully added the new chunk
221 232 last = chunk;
222 233 length -= size;
223 234 offset += size;
224 235 }
225 236 } else {
226 237 // we don't need to extend the queue, if we successfully enqueued data
227 238 if (length == 0)
228 239 break;
229 240
230 241 // if we need to wait while someone is extending the queue
231 242 // spinwait
232 243 while (last == m_last) {
233 244 Thread.MemoryBarrier();
234 245 }
235 246
236 247 last = m_last;
237 248 }
238 249 }
239 250 }
240 251
241 252 /// <summary>
242 253 /// Tries to retrieve the first element from the queue.
243 254 /// </summary>
244 255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
245 256 /// <param name="value">The value of the dequeued element.</param>
246 257 public bool TryDequeue(out T value) {
247 258 var chunk = m_first;
248 259 bool recycle;
249 260 while (chunk != null) {
250 261
251 262 var result = chunk.TryDequeue(out value, out recycle);
252 263
253 264 if (recycle) // this chunk is waste
254 265 RecycleFirstChunk(chunk);
255 266 else
256 267 return result; // this chunk is usable and returned actual result
257 268
258 269 if (result) // this chunk is waste but the true result is always actual
259 270 return true;
260 271
261 272 // try again
262 273 chunk = m_first;
263 274 }
264 275
265 276 // the queue is empty
266 277 value = default(T);
267 278 return false;
268 279 }
269 280
270 281 /// <summary>
271 282 /// Tries to dequeue the specified amount of data from the queue.
272 283 /// </summary>
273 284 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
274 285 /// <param name="buffer">The buffer to which the data will be written.</param>
275 286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
276 287 /// <param name="length">The maximum amount of data to be retrieved.</param>
277 288 /// <param name="dequeued">The actual amout of the retrieved data.</param>
278 289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
279 290 if (buffer == null)
280 291 throw new ArgumentNullException("buffer");
281 292 if (offset < 0)
282 293 throw new ArgumentOutOfRangeException("offset");
283 294 if (length < 1 || offset + length > buffer.Length)
284 295 throw new ArgumentOutOfRangeException("length");
285 296
286 297 var chunk = m_first;
287 298 bool recycle;
288 299 dequeued = 0;
289 300 while (chunk != null) {
290 301
291 302 int actual;
292 303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
293 304 offset += actual;
294 305 length -= actual;
295 306 dequeued += actual;
296 307 }
297 308
298 309 if (recycle) // this chunk is waste
299 310 RecycleFirstChunk(chunk);
300 311 else if (actual == 0)
301 312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
302 313
303 314 if (length == 0)
304 315 return true;
305 316
306 317 // we still may dequeue something
307 318 // try again
308 319 chunk = m_first;
309 320 }
310 321
311 322 return dequeued != 0;
312 323 }
313 324
314 325 /// <summary>
315 326 /// Tries to dequeue all remaining data in the first chunk.
316 327 /// </summary>
317 328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
318 329 /// <param name="buffer">The buffer to which the data will be written.</param>
319 330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
320 331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
321 332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
322 333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
323 334 if (buffer == null)
324 335 throw new ArgumentNullException("buffer");
325 336 if (offset < 0)
326 337 throw new ArgumentOutOfRangeException("offset");
327 338 if (length < 1 || offset + length > buffer.Length)
328 339 throw new ArgumentOutOfRangeException("length");
329 340
330 341 var chunk = m_first;
331 342 bool recycle;
332 343 dequeued = 0;
333 344
334 345 while (chunk != null) {
335 346
336 347 int actual;
337 348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
338 349 dequeued = actual;
339 350 }
340 351
341 352 if (recycle) // this chunk is waste
342 353 RecycleFirstChunk(chunk);
343 354
344 355 // if we have dequeued any data, then return
345 356 if (dequeued != 0)
346 357 return true;
347 358
348 359 // we still may dequeue something
349 360 // try again
350 361 chunk = m_first;
351 362 }
352 363
353 364 return false;
354 365 }
355 366
356 367 bool EnqueueChunk(Chunk last, Chunk chunk) {
357 368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
358 369 return false;
359 370
360 371 if (last != null)
361 372 last.next = chunk;
362 else
373 else {
363 374 m_first = chunk;
375 }
364 376 return true;
365 377 }
366 378
367 379 void RecycleFirstChunk(Chunk first) {
368 380 var next = first.next;
369 381
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 return;
384
370 385 if (next == null) {
371 // looks like this is the last chunk
386
372 387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388 /*while (first.next == null)
389 Thread.MemoryBarrier();*/
390
373 391 // race
374 // maybe someone already recycled this chunk
375 // or a new chunk has been appedned to the queue
376
377 return; // give up
392 // someone already updated the tail, restore the pointer to the queue head
393 m_first = first;
378 394 }
379 395 // the tail is updated
380 396 }
381 397
382 398 // we need to update the head
383 Interlocked.CompareExchange(ref m_first, next, first);
399 //Interlocked.CompareExchange(ref m_first, next, first);
384 400 // if the head is already updated then give up
385 return;
401 //return;
386 402
387 403 }
388 404
389 405 public void Clear() {
390 406 // start the new queue
391 var t = new Chunk(m_chunkSize);
392 Thread.MemoryBarrier();
393 m_last = t;
394 Thread.MemoryBarrier();
407 var chunk = new Chunk(m_chunkSize);
408
409 do {
410 Thread.MemoryBarrier();
411 var first = m_first;
412 var last = m_last;
413
414 if (last == null) // nothing to clear
415 return;
395 416
396 // make the new queue available to the readers, and stop the old one
397 m_first = t;
398 Thread.MemoryBarrier();
417 if (first == null || (first.next == null && first != last)) // inconcistency
418 continue;
419
420 // here we will create inconsistency which will force others to spin
421 // and prevent from fetching. chunk.next = null
422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
423 continue;// inconsistent
424
425 m_last = chunk;
426
427 return;
428
429 } while(true);
399 430 }
400 431
401 432 public T[] Drain() {
402 433 // start the new queue
403 var t = new Chunk(m_chunkSize);
404 Thread.MemoryBarrier();
405 m_last = t;
406 Thread.MemoryBarrier();
407
408 // make the new queue available to the readers, and stop the old one
409 Chunk first;
434 var chunk = new Chunk(m_chunkSize);
410 435
411 436 do {
412 first = m_first;
413 } while(first != Interlocked.CompareExchange(ref m_first
414 Thread.MemoryBarrier();
437 Thread.MemoryBarrier();
438 var first = m_first;
439 var last = m_last;
440
441 if (last == null)
442 return new T[0];
443
444 if (first == null || (first.next == null && first != last))
445 continue;
415 446
447 // here we will create inconsistency which will force others to spin
448 // and prevent from fetching. chunk.next = null
449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
450 continue;// inconsistent
416 451
452 last = Interlocked.Exchange(ref m_last, chunk);
453
454 return ReadChunks(first, last);
455
456 } while(true);
417 457 }
418 458
419 T[] ReadChunks(Chunk chunk) {
459 T[] ReadChunks(Chunk chunk, object last) {
420 460 var result = new List<T>();
421 461 var buffer = new T[m_chunkSize];
422 462 int actual;
423 463 bool recycle;
424 464 while (chunk != null) {
465 // ensure all write operations on the chunk are complete
466 chunk.Commit();
467
425 468 // we need to read the chunk using this way
426 469 // since some client still may completing the dequeue
427 470 // operation, such clients most likely won't get results
428 471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
429 472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
430 473
431 chunk = chunk.next;
474 if (chunk == last) {
475 chunk = null;
476 } else {
477 while (chunk.next == null)
478 Thread.MemoryBarrier();
479 chunk = chunk.next;
480 }
432 481 }
433 482
434 483 return result.ToArray();
435 484 }
436 485
437 486 struct ArraySegmentCollection : ICollection<T> {
438 487 readonly T[] m_data;
439 488 readonly int m_offset;
440 489 readonly int m_length;
441 490
442 491 public ArraySegmentCollection(T[] data, int offset, int length) {
443 492 m_data = data;
444 493 m_offset = offset;
445 494 m_length = length;
446 495 }
447 496
448 497 #region ICollection implementation
449 498
450 499 public void Add(T item) {
451 500 throw new InvalidOperationException();
452 501 }
453 502
454 503 public void Clear() {
455 504 throw new InvalidOperationException();
456 505 }
457 506
458 507 public bool Contains(T item) {
459 508 return false;
460 509 }
461 510
462 511 public void CopyTo(T[] array, int arrayIndex) {
463 512 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
464 513 }
465 514
466 515 public bool Remove(T item) {
467 516 throw new NotImplementedException();
468 517 }
469 518
470 519 public int Count {
471 520 get {
472 521 return m_length;
473 522 }
474 523 }
475 524
476 525 public bool IsReadOnly {
477 526 get {
478 527 return true;
479 528 }
480 529 }
481 530
482 531 #endregion
483 532
484 533 #region IEnumerable implementation
485 534
486 535 public IEnumerator<T> GetEnumerator() {
487 536 for (int i = m_offset; i < m_length + m_offset; i++)
488 537 yield return m_data[i];
489 538 }
490 539
491 540 #endregion
492 541
493 542 #region IEnumerable implementation
494 543
495 544 IEnumerator IEnumerable.GetEnumerator() {
496 545 return GetEnumerator();
497 546 }
498 547
499 548 #endregion
500 549 }
501 550
502 551 #region IEnumerable implementation
503 552
504 553 class Enumerator : IEnumerator<T> {
505 554 Chunk m_current;
506 555 int m_pos = -1;
507 556
508 557 public Enumerator(Chunk fisrt) {
509 558 m_current = fisrt;
510 559 }
511 560
512 561 #region IEnumerator implementation
513 562
514 563 public bool MoveNext() {
515 564 if (m_current == null)
516 565 return false;
517 566
518 567 if (m_pos == -1)
519 568 m_pos = m_current.Low;
520 569 else
521 570 m_pos++;
522 571 if (m_pos == m_current.Hi) {
523 572 m_pos = 0;
524 573 m_current = m_current.next;
525 574 }
526 575
527 576 return true;
528 577 }
529 578
530 579 public void Reset() {
531 580 throw new NotSupportedException();
532 581 }
533 582
534 583 object IEnumerator.Current {
535 584 get {
536 585 return Current;
537 586 }
538 587 }
539 588
540 589 #endregion
541 590
542 591 #region IDisposable implementation
543 592
544 593 public void Dispose() {
545 594 }
546 595
547 596 #endregion
548 597
549 598 #region IEnumerator implementation
550 599
551 600 public T Current {
552 601 get {
553 602 if (m_pos == -1 || m_current == null)
554 603 throw new InvalidOperationException();
555 604 return m_current.GetAt(m_pos);
556 605 }
557 606 }
558 607
559 608 #endregion
560 609 }
561 610
562 611 public IEnumerator<T> GetEnumerator() {
563 612 return new Enumerator(m_first);
564 613 }
565 614
566 615 #endregion
567 616
568 617 #region IEnumerable implementation
569 618
570 619 IEnumerator IEnumerable.GetEnumerator() {
571 620 return GetEnumerator();
572 621 }
573 622
574 623 #endregion
575 624 }
576 625 }
@@ -1,134 +1,187
1 1 using System.Threading;
2 2 using System;
3 3 using Implab.Diagnostics;
4 4 using System.Collections.Generic;
5 5
6 6
7 7 #if NET_4_5
8 8 using System.Threading.Tasks;
9 9 #endif
10 10
11 11 namespace Implab {
12 12 public static class PromiseExtensions {
13 13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
14 14 Safe.ArgumentNotNull(that, "that");
15 15 var context = SynchronizationContext.Current;
16 16 if (context == null)
17 17 return that;
18 18
19 19 var p = new SyncContextPromise<T>(context);
20 20 p.On(that.Cancel, PromiseEventType.Cancelled);
21 21
22 22 that.On(
23 23 p.Resolve,
24 24 p.Reject,
25 25 p.Cancel
26 26 );
27 27 return p;
28 28 }
29 29
30 30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
31 31 Safe.ArgumentNotNull(that, "that");
32 32 Safe.ArgumentNotNull(context, "context");
33 33
34 34 var p = new SyncContextPromise<T>(context);
35 35 p.On(that.Cancel, PromiseEventType.Cancelled);
36 36
37 37
38 38 that.On(
39 39 p.Resolve,
40 40 p.Reject,
41 41 p.Cancel
42 42 );
43 43 return p;
44 44 }
45 45
46 46 /// <summary>
47 47 /// Ensures the dispatched.
48 48 /// </summary>
49 49 /// <returns>The dispatched.</returns>
50 50 /// <param name="that">That.</param>
51 51 /// <param name="head">Head.</param>
52 52 /// <param name="cleanup">Cleanup.</param>
53 53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
54 54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
55 55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
56 56 Safe.ArgumentNotNull(that, "that");
57 57 Safe.ArgumentNotNull(head, "head");
58 58
59 59 that.On(null,null,() => head.On(cleanup));
60 60
61 61 return that;
62 62 }
63 63
64 64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
65 65 Safe.ArgumentNotNull(that, "that");
66 66 Safe.ArgumentNotNull(callback, "callback");
67 67 var op = TraceContext.Instance.CurrentOperation;
68 68 return ar => {
69 69 TraceContext.Instance.EnterLogicalOperation(op,false);
70 70 try {
71 71 that.Resolve(callback(ar));
72 72 } catch (Exception err) {
73 73 that.Reject(err);
74 74 } finally {
75 75 TraceContext.Instance.Leave();
76 76 }
77 77 };
78 78 }
79 79
80 80 static void CancelCallback(object cookie) {
81 81 ((ICancellable)cookie).Cancel();
82 82 }
83 83
84 84 /// <summary>
85 85 /// Cancells promise after the specified timeout is elapsed.
86 86 /// </summary>
87 87 /// <param name="that">The promise to cancel on timeout.</param>
88 88 /// <param name="milliseconds">The timeout in milliseconds.</param>
89 89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
90 90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
91 91 Safe.ArgumentNotNull(that, "that");
92 92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
93 93 that.On(timer.Dispose, PromiseEventType.All);
94 94 return that;
95 95 }
96 96
97 public static IPromise Combine(this ICollection<IPromise> that) {
97 public static IPromise Bundle(this ICollection<IPromise> that) {
98 98 Safe.ArgumentNotNull(that, "that");
99 99
100 100 int count = that.Count;
101 int errors = 0;
101 102 var medium = new Promise();
102 103
104 medium.On(() => {
105 foreach(var p2 in that)
106 p2.Cancel();
107 }, PromiseEventType.ErrorOrCancel);
108
103 109 foreach (var p in that)
104 110 p.On(
105 111 () => {
106 112 if (Interlocked.Decrement(ref count) == 0)
107 113 medium.Resolve();
108 114 },
109 115 error => {
110 throw new Exception("The dependency promise is failed", error);
116 if (Interlocked.Increment(ref errors) == 1)
117 medium.Reject(
118 new Exception("The dependency promise is failed", error)
119 );
111 120 },
112 121 () => {
113 throw new OperationCanceledException("The dependency promise is cancelled");
122 if (Interlocked.Increment(ref errors) == 1)
123 medium.Reject(
124 new Exception("The dependency promise is cancelled")
125 );
114 126 }
115 127 );
116 128
117 129 return medium;
118 130 }
131
132 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
133 Safe.ArgumentNotNull(that, "that");
134
135 int count = that.Count;
136 int errors = 0;
137 var medium = new Promise<T[]>();
138 var results = new T[that.Count];
139
140 medium.On(() => {
141 foreach(var p2 in that)
142 p2.Cancel();
143 }, PromiseEventType.ErrorOrCancel);
144
145 int i = 0;
146 foreach (var p in that) {
147 var idx = i;
148 p.On(
149 x => {
150 results[idx] = x;
151 if (Interlocked.Decrement(ref count) == 0)
152 medium.Resolve(results);
153 },
154 error => {
155 if (Interlocked.Increment(ref errors) == 1)
156 medium.Reject(
157 new Exception("The dependency promise is failed", error)
158 );
159 },
160 () => {
161 if (Interlocked.Increment(ref errors) == 1)
162 medium.Reject(
163 new Exception("The dependency promise is cancelled")
164 );
165 }
166 );
167 i++;
168 }
169
170 return medium;
171 }
119 172
120 173 #if NET_4_5
121 174
122 175 public static Task<T> GetTask<T>(this IPromise<T> that) {
123 176 Safe.ArgumentNotNull(that, "that");
124 177 var tcs = new TaskCompletionSource<T>();
125 178
126 179 that.On(tcs.SetResult, tcs.SetException, tcs.SetCanceled);
127 180
128 181 return tcs.Task;
129 182 }
130 183
131 184 #endif
132 185 }
133 186 }
134 187
@@ -1,120 +1,120
1 1 using System;
2 2 using Implab.Diagnostics;
3 3 using Implab.Parallels;
4 4 using Implab;
5 5 using System.Collections.Generic;
6 6 using System.Collections.Concurrent;
7 7
8 8 namespace MonoPlay {
9 9 class MainClass {
10 10 public static void Main(string[] args) {
11 11 if (args == null)
12 12 throw new ArgumentNullException("args");
13 13
14 14 var q1 = new AsyncQueue<int>();
15 15 var q2 = new Queue<int>();
16 16
17 17 const int count = 10000000;
18 18
19 19 int res1 = 0, res2 = 0;
20 20 var t1 = Environment.TickCount;
21 21
22 22 AsyncPool.RunThread(
23 23 () => {
24 24 for (var i = 0; i < count; i++)
25 25 q1.Enqueue(1);
26 26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
27 27 },
28 28 () => {
29 29 for (var i = 0; i < count; i++)
30 30 q1.Enqueue(2);
31 31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
32 32 },
33 33 () => {
34 34 int temp = 0;
35 35 int i = 0;
36 36 while (i < count)
37 37 if (q1.TryDequeue(out temp)) {
38 38 i++;
39 39 res1 += temp;
40 40 }
41 41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
42 42 },
43 43 () => {
44 44 int temp = 0;
45 45 int i = 0;
46 46 while (i < count)
47 47 if (q1.TryDequeue(out temp)) {
48 48 i++;
49 49 res2 += temp;
50 50 }
51 51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
52 52 }
53 53 )
54 .Combine()
54 .Bundle()
55 55 .Join();
56 56
57 57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
58 58
59 59 var t2 = Environment.TickCount;
60 60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
61 61
62 62 t1 = Environment.TickCount;
63 63
64 64 for (var i = 0; i < count * 2; i++)
65 65 q2.Enqueue(i);
66 66
67 67 for (var i = 0; i < count * 2; i++)
68 68 q2.Dequeue();
69 69
70 70 t2 = Environment.TickCount;
71 71 Console.WriteLine("Queue: {0} ms", t2 - t1);
72 72
73 73 q2 = new Queue<int>();
74 74
75 75 t1 = Environment.TickCount;
76 76
77 77
78 78 AsyncPool.RunThread(
79 79 () => {
80 80 for (var i = 0; i < count; i++)
81 81 lock (q2)
82 82 q2.Enqueue(i);
83 83 },
84 84 () => {
85 85 for (var i = 0; i < count; i++)
86 86 lock (q2)
87 87 q2.Enqueue(i);
88 88 },
89 89 () => {
90 90 for (int i = 0; i < count ;)
91 91 lock (q2) {
92 92 if (q2.Count == 0)
93 93 continue;
94 94 q2.Dequeue();
95 95 i++;
96 96 }
97 97
98 98 },
99 99 () => {
100 100 for (int i = 0; i < count ;)
101 101 lock (q2) {
102 102 if (q2.Count == 0)
103 103 continue;
104 104 q2.Dequeue();
105 105 i++;
106 106 }
107 107
108 108 }
109 109 )
110 .Combine()
110 .Bundle()
111 111 .Join();
112 112
113 113
114 114
115 115 t2 = Environment.TickCount;
116 116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
117 117
118 118 }
119 119 }
120 120 }
General Comments 0
You need to be logged in to leave comments. Login now