##// END OF EJS Templates
major update, added Drain mathod to AsyncQueue class
cin -
r124:a336cb13c6a9 v2
parent child
Show More
@@ -249,7 +249,7 namespace Implab.Test {
249 249 for (int i = 0; i < writersCount; i++) {
250 250 Interlocked.Increment(ref writers);
251 251 AsyncPool
252 .InvokeNewThread(() => {
252 .RunThread(() => {
253 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 254 queue.Enqueue(1);
255 255 }
@@ -261,7 +261,7 namespace Implab.Test {
261 261 for (int i = 0; i < 10; i++) {
262 262 Interlocked.Increment(ref readers);
263 263 AsyncPool
264 .InvokeNewThread(() => {
264 .RunThread(() => {
265 265 int t;
266 266 do {
267 267 while (queue.TryDequeue(out t))
@@ -336,7 +336,7 namespace Implab.Test {
336 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 337 }
338 338 )
339 .Combine()
339 .Bundle()
340 340 .Join();
341 341
342 342 Assert.AreEqual(count * 3, res1 + res2);
@@ -414,7 +414,7 namespace Implab.Test {
414 414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 415 }
416 416 )
417 .Combine()
417 .Bundle()
418 418 .Join();
419 419
420 420 Assert.AreEqual(summ , r1 + r2);
@@ -490,7 +490,110 namespace Implab.Test {
490 490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 491 }
492 492 )
493 .Combine()
493 .Bundle()
494 .Join();
495
496 Assert.AreEqual(summ , r1 + r2);
497
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
501 r1,
502 r2,
503 r1 + r2,
504 total
505 );
506 }
507
508 [TestMethod]
509 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>();
511
512 const int wBatch = 11;
513 const int wCount = 200000;
514 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
516
517 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
519 int read = 0;
520
521 var t1 = Environment.TickCount;
522
523 AsyncPool.RunThread(
524 () => {
525 var buffer = new int[wBatch];
526 for(int i = 0; i<wBatch; i++)
527 buffer[i] = 1;
528
529 for(int i =0; i < wCount; i++)
530 queue.EnqueueRange(buffer,0,wBatch);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 },
533 () => {
534 for(int i =0; i < wCount * wBatch; i++)
535 queue.Enqueue(1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 },
538 () => {
539 var buffer = new int[wBatch];
540 for(int i = 0; i<wBatch; i++)
541 buffer[i] = 1;
542
543 for(int i =0; i < wCount; i++)
544 queue.EnqueueRange(buffer,0,wBatch);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 },
547 /*() => {
548 int temp;
549 int count = 0;
550 while (read < total)
551 if (queue.TryDequeue(out temp)) {
552 count++;
553 r1 += temp;
554 Interlocked.Increment(ref read);
555 }
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 },*/
558 /*() => {
559 var buffer = new int[rBatch];
560 var count = 0;
561 while(read < total) {
562 int actual;
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 for(int i=0; i< actual; i++)
565 r1 += buffer[i];
566 Interlocked.Add(ref read, actual);
567 count += actual;
568 }
569 }
570
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 },*/
573 () => {
574 var count = 0;
575 while(read < total) {
576 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
578 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
580 count += buffer.Length;
581 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 },
584 () => {
585 var count = 0;
586 while(read < total) {
587 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
589 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
591 count += buffer.Length;
592 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 }
595 )
596 .Bundle()
494 597 .Join();
495 598
496 599 Assert.AreEqual(summ , r1 + r2);
@@ -162,7 +162,7 namespace Implab.Parallels {
162 162 int slots = threads;
163 163
164 164 // Analysis disable AccessToDisposedClosure
165 AsyncPool.InvokeNewThread<int>(() => {
165 AsyncPool.RunThread<int>(() => {
166 166 for (int i = 0; i < source.Length; i++) {
167 167 if(promise.IsResolved)
168 168 break; // stop processing in case of error or cancellation
@@ -31,7 +31,7 namespace Implab.Parallels {
31 31 return p;
32 32 }
33 33
34 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
34 public static IPromise<T> RunThread<T>(Func<T> func) {
35 35 var p = new Promise<T>();
36 36
37 37 var caller = TraceContext.Instance.CurrentOperation;
@@ -53,7 +53,7 namespace Implab.Parallels {
53 53 }
54 54
55 55
56 public static IPromise InvokeNewThread(Action func) {
56 public static IPromise RunThread(Action func) {
57 57 var p = new Promise();
58 58
59 59 var caller = TraceContext.Instance.CurrentOperation;
@@ -76,11 +76,11 namespace Implab.Parallels {
76 76 }
77 77
78 78 public static IPromise[] RunThread(params Action[] func) {
79 return func.Select(f => InvokeNewThread(f)).ToArray();
79 return func.Select(f => RunThread(f)).ToArray();
80 80 }
81 81
82 82 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
83 return func.Select(f => InvokeNewThread(f)).ToArray();
83 return func.Select(f => RunThread(f)).ToArray();
84 84 }
85 85 }
86 86 }
@@ -2,6 +2,7
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 using System.Diagnostics;
5 6
6 7 namespace Implab.Parallels {
7 8 public class AsyncQueue<T> : IEnumerable<T> {
@@ -60,6 +61,16 namespace Implab.Parallels {
60 61 return true;
61 62 }
62 63
64 /// <summary>
65 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
66 /// </summary>
67 public void Commit() {
68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
69
70 while (m_hi != actual)
71 Thread.MemoryBarrier();
72 }
73
63 74 public bool TryDequeue(out T value, out bool recycle) {
64 75 int low;
65 76 do {
@@ -359,77 +370,115 namespace Implab.Parallels {
359 370
360 371 if (last != null)
361 372 last.next = chunk;
362 else
373 else {
363 374 m_first = chunk;
375 }
364 376 return true;
365 377 }
366 378
367 379 void RecycleFirstChunk(Chunk first) {
368 380 var next = first.next;
369 381
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 return;
384
370 385 if (next == null) {
371 // looks like this is the last chunk
386
372 387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388 /*while (first.next == null)
389 Thread.MemoryBarrier();*/
390
373 391 // race
374 // maybe someone already recycled this chunk
375 // or a new chunk has been appedned to the queue
376
377 return; // give up
392 // someone already updated the tail, restore the pointer to the queue head
393 m_first = first;
378 394 }
379 395 // the tail is updated
380 396 }
381 397
382 398 // we need to update the head
383 Interlocked.CompareExchange(ref m_first, next, first);
399 //Interlocked.CompareExchange(ref m_first, next, first);
384 400 // if the head is already updated then give up
385 return;
401 //return;
386 402
387 403 }
388 404
389 405 public void Clear() {
390 406 // start the new queue
391 var t = new Chunk(m_chunkSize);
392 Thread.MemoryBarrier();
393 m_last = t;
407 var chunk = new Chunk(m_chunkSize);
408
409 do {
394 410 Thread.MemoryBarrier();
411 var first = m_first;
412 var last = m_last;
395 413
396 // make the new queue available to the readers, and stop the old one
397 m_first = t;
398 Thread.MemoryBarrier();
414 if (last == null) // nothing to clear
415 return;
416
417 if (first == null || (first.next == null && first != last)) // inconcistency
418 continue;
419
420 // here we will create inconsistency which will force others to spin
421 // and prevent from fetching. chunk.next = null
422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
423 continue;// inconsistent
424
425 m_last = chunk;
426
427 return;
428
429 } while(true);
399 430 }
400 431
401 432 public T[] Drain() {
402 433 // start the new queue
403 var t = new Chunk(m_chunkSize);
404 Thread.MemoryBarrier();
405 m_last = t;
406 Thread.MemoryBarrier();
407
408 // make the new queue available to the readers, and stop the old one
409 Chunk first;
434 var chunk = new Chunk(m_chunkSize);
410 435
411 436 do {
412 first = m_first;
413 } while(first != Interlocked.CompareExchange(ref m_first
414 437 Thread.MemoryBarrier();
438 var first = m_first;
439 var last = m_last;
415 440
441 if (last == null)
442 return new T[0];
416 443
444 if (first == null || (first.next == null && first != last))
445 continue;
446
447 // here we will create inconsistency which will force others to spin
448 // and prevent from fetching. chunk.next = null
449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
450 continue;// inconsistent
451
452 last = Interlocked.Exchange(ref m_last, chunk);
453
454 return ReadChunks(first, last);
455
456 } while(true);
417 457 }
418 458
419 T[] ReadChunks(Chunk chunk) {
459 T[] ReadChunks(Chunk chunk, object last) {
420 460 var result = new List<T>();
421 461 var buffer = new T[m_chunkSize];
422 462 int actual;
423 463 bool recycle;
424 464 while (chunk != null) {
465 // ensure all write operations on the chunk are complete
466 chunk.Commit();
467
425 468 // we need to read the chunk using this way
426 469 // since some client still may completing the dequeue
427 470 // operation, such clients most likely won't get results
428 471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
429 472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
430 473
474 if (chunk == last) {
475 chunk = null;
476 } else {
477 while (chunk.next == null)
478 Thread.MemoryBarrier();
431 479 chunk = chunk.next;
432 480 }
481 }
433 482
434 483 return result.ToArray();
435 484 }
@@ -94,12 +94,18 namespace Implab {
94 94 return that;
95 95 }
96 96
97 public static IPromise Combine(this ICollection<IPromise> that) {
97 public static IPromise Bundle(this ICollection<IPromise> that) {
98 98 Safe.ArgumentNotNull(that, "that");
99 99
100 100 int count = that.Count;
101 int errors = 0;
101 102 var medium = new Promise();
102 103
104 medium.On(() => {
105 foreach(var p2 in that)
106 p2.Cancel();
107 }, PromiseEventType.ErrorOrCancel);
108
103 109 foreach (var p in that)
104 110 p.On(
105 111 () => {
@@ -107,16 +113,63 namespace Implab {
107 113 medium.Resolve();
108 114 },
109 115 error => {
110 throw new Exception("The dependency promise is failed", error);
116 if (Interlocked.Increment(ref errors) == 1)
117 medium.Reject(
118 new Exception("The dependency promise is failed", error)
119 );
111 120 },
112 121 () => {
113 throw new OperationCanceledException("The dependency promise is cancelled");
122 if (Interlocked.Increment(ref errors) == 1)
123 medium.Reject(
124 new Exception("The dependency promise is cancelled")
125 );
114 126 }
115 127 );
116 128
117 129 return medium;
118 130 }
119 131
132 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
133 Safe.ArgumentNotNull(that, "that");
134
135 int count = that.Count;
136 int errors = 0;
137 var medium = new Promise<T[]>();
138 var results = new T[that.Count];
139
140 medium.On(() => {
141 foreach(var p2 in that)
142 p2.Cancel();
143 }, PromiseEventType.ErrorOrCancel);
144
145 int i = 0;
146 foreach (var p in that) {
147 var idx = i;
148 p.On(
149 x => {
150 results[idx] = x;
151 if (Interlocked.Decrement(ref count) == 0)
152 medium.Resolve(results);
153 },
154 error => {
155 if (Interlocked.Increment(ref errors) == 1)
156 medium.Reject(
157 new Exception("The dependency promise is failed", error)
158 );
159 },
160 () => {
161 if (Interlocked.Increment(ref errors) == 1)
162 medium.Reject(
163 new Exception("The dependency promise is cancelled")
164 );
165 }
166 );
167 i++;
168 }
169
170 return medium;
171 }
172
120 173 #if NET_4_5
121 174
122 175 public static Task<T> GetTask<T>(this IPromise<T> that) {
@@ -51,7 +51,7 namespace MonoPlay {
51 51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
52 52 }
53 53 )
54 .Combine()
54 .Bundle()
55 55 .Join();
56 56
57 57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
@@ -107,7 +107,7 namespace MonoPlay {
107 107
108 108 }
109 109 )
110 .Combine()
110 .Bundle()
111 111 .Join();
112 112
113 113
General Comments 0
You need to be logged in to leave comments. Login now