##// END OF EJS Templates
major update, added Drain mathod to AsyncQueue class
cin -
r124:a336cb13c6a9 v2
parent child
Show More
@@ -249,7 +249,7 namespace Implab.Test {
249 for (int i = 0; i < writersCount; i++) {
249 for (int i = 0; i < writersCount; i++) {
250 Interlocked.Increment(ref writers);
250 Interlocked.Increment(ref writers);
251 AsyncPool
251 AsyncPool
252 .InvokeNewThread(() => {
252 .RunThread(() => {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 queue.Enqueue(1);
254 queue.Enqueue(1);
255 }
255 }
@@ -261,7 +261,7 namespace Implab.Test {
261 for (int i = 0; i < 10; i++) {
261 for (int i = 0; i < 10; i++) {
262 Interlocked.Increment(ref readers);
262 Interlocked.Increment(ref readers);
263 AsyncPool
263 AsyncPool
264 .InvokeNewThread(() => {
264 .RunThread(() => {
265 int t;
265 int t;
266 do {
266 do {
267 while (queue.TryDequeue(out t))
267 while (queue.TryDequeue(out t))
@@ -336,7 +336,7 namespace Implab.Test {
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 }
337 }
338 )
338 )
339 .Combine()
339 .Bundle()
340 .Join();
340 .Join();
341
341
342 Assert.AreEqual(count * 3, res1 + res2);
342 Assert.AreEqual(count * 3, res1 + res2);
@@ -414,7 +414,7 namespace Implab.Test {
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 }
415 }
416 )
416 )
417 .Combine()
417 .Bundle()
418 .Join();
418 .Join();
419
419
420 Assert.AreEqual(summ , r1 + r2);
420 Assert.AreEqual(summ , r1 + r2);
@@ -490,7 +490,110 namespace Implab.Test {
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 }
491 }
492 )
492 )
493 .Combine()
493 .Bundle()
494 .Join();
495
496 Assert.AreEqual(summ , r1 + r2);
497
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
501 r1,
502 r2,
503 r1 + r2,
504 total
505 );
506 }
507
508 [TestMethod]
509 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>();
511
512 const int wBatch = 11;
513 const int wCount = 200000;
514 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
516
517 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
519 int read = 0;
520
521 var t1 = Environment.TickCount;
522
523 AsyncPool.RunThread(
524 () => {
525 var buffer = new int[wBatch];
526 for(int i = 0; i<wBatch; i++)
527 buffer[i] = 1;
528
529 for(int i =0; i < wCount; i++)
530 queue.EnqueueRange(buffer,0,wBatch);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 },
533 () => {
534 for(int i =0; i < wCount * wBatch; i++)
535 queue.Enqueue(1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 },
538 () => {
539 var buffer = new int[wBatch];
540 for(int i = 0; i<wBatch; i++)
541 buffer[i] = 1;
542
543 for(int i =0; i < wCount; i++)
544 queue.EnqueueRange(buffer,0,wBatch);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 },
547 /*() => {
548 int temp;
549 int count = 0;
550 while (read < total)
551 if (queue.TryDequeue(out temp)) {
552 count++;
553 r1 += temp;
554 Interlocked.Increment(ref read);
555 }
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 },*/
558 /*() => {
559 var buffer = new int[rBatch];
560 var count = 0;
561 while(read < total) {
562 int actual;
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 for(int i=0; i< actual; i++)
565 r1 += buffer[i];
566 Interlocked.Add(ref read, actual);
567 count += actual;
568 }
569 }
570
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 },*/
573 () => {
574 var count = 0;
575 while(read < total) {
576 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
578 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
580 count += buffer.Length;
581 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 },
584 () => {
585 var count = 0;
586 while(read < total) {
587 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
589 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
591 count += buffer.Length;
592 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 }
595 )
596 .Bundle()
494 .Join();
597 .Join();
495
598
496 Assert.AreEqual(summ , r1 + r2);
599 Assert.AreEqual(summ , r1 + r2);
@@ -162,7 +162,7 namespace Implab.Parallels {
162 int slots = threads;
162 int slots = threads;
163
163
164 // Analysis disable AccessToDisposedClosure
164 // Analysis disable AccessToDisposedClosure
165 AsyncPool.InvokeNewThread<int>(() => {
165 AsyncPool.RunThread<int>(() => {
166 for (int i = 0; i < source.Length; i++) {
166 for (int i = 0; i < source.Length; i++) {
167 if(promise.IsResolved)
167 if(promise.IsResolved)
168 break; // stop processing in case of error or cancellation
168 break; // stop processing in case of error or cancellation
@@ -31,7 +31,7 namespace Implab.Parallels {
31 return p;
31 return p;
32 }
32 }
33
33
34 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
34 public static IPromise<T> RunThread<T>(Func<T> func) {
35 var p = new Promise<T>();
35 var p = new Promise<T>();
36
36
37 var caller = TraceContext.Instance.CurrentOperation;
37 var caller = TraceContext.Instance.CurrentOperation;
@@ -53,7 +53,7 namespace Implab.Parallels {
53 }
53 }
54
54
55
55
56 public static IPromise InvokeNewThread(Action func) {
56 public static IPromise RunThread(Action func) {
57 var p = new Promise();
57 var p = new Promise();
58
58
59 var caller = TraceContext.Instance.CurrentOperation;
59 var caller = TraceContext.Instance.CurrentOperation;
@@ -76,11 +76,11 namespace Implab.Parallels {
76 }
76 }
77
77
78 public static IPromise[] RunThread(params Action[] func) {
78 public static IPromise[] RunThread(params Action[] func) {
79 return func.Select(f => InvokeNewThread(f)).ToArray();
79 return func.Select(f => RunThread(f)).ToArray();
80 }
80 }
81
81
82 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
82 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
83 return func.Select(f => InvokeNewThread(f)).ToArray();
83 return func.Select(f => RunThread(f)).ToArray();
84 }
84 }
85 }
85 }
86 }
86 }
@@ -2,6 +2,7
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System;
3 using System;
4 using System.Collections;
4 using System.Collections;
5 using System.Diagnostics;
5
6
6 namespace Implab.Parallels {
7 namespace Implab.Parallels {
7 public class AsyncQueue<T> : IEnumerable<T> {
8 public class AsyncQueue<T> : IEnumerable<T> {
@@ -60,6 +61,16 namespace Implab.Parallels {
60 return true;
61 return true;
61 }
62 }
62
63
64 /// <summary>
65 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
66 /// </summary>
67 public void Commit() {
68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
69
70 while (m_hi != actual)
71 Thread.MemoryBarrier();
72 }
73
63 public bool TryDequeue(out T value, out bool recycle) {
74 public bool TryDequeue(out T value, out bool recycle) {
64 int low;
75 int low;
65 do {
76 do {
@@ -359,76 +370,114 namespace Implab.Parallels {
359
370
360 if (last != null)
371 if (last != null)
361 last.next = chunk;
372 last.next = chunk;
362 else
373 else {
363 m_first = chunk;
374 m_first = chunk;
375 }
364 return true;
376 return true;
365 }
377 }
366
378
367 void RecycleFirstChunk(Chunk first) {
379 void RecycleFirstChunk(Chunk first) {
368 var next = first.next;
380 var next = first.next;
369
381
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 return;
384
370 if (next == null) {
385 if (next == null) {
371 // looks like this is the last chunk
386
372 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388 /*while (first.next == null)
389 Thread.MemoryBarrier();*/
390
373 // race
391 // race
374 // maybe someone already recycled this chunk
392 // someone already updated the tail, restore the pointer to the queue head
375 // or a new chunk has been appedned to the queue
393 m_first = first;
376
377 return; // give up
378 }
394 }
379 // the tail is updated
395 // the tail is updated
380 }
396 }
381
397
382 // we need to update the head
398 // we need to update the head
383 Interlocked.CompareExchange(ref m_first, next, first);
399 //Interlocked.CompareExchange(ref m_first, next, first);
384 // if the head is already updated then give up
400 // if the head is already updated then give up
385 return;
401 //return;
386
402
387 }
403 }
388
404
389 public void Clear() {
405 public void Clear() {
390 // start the new queue
406 // start the new queue
391 var t = new Chunk(m_chunkSize);
407 var chunk = new Chunk(m_chunkSize);
392 Thread.MemoryBarrier();
408
393 m_last = t;
409 do {
394 Thread.MemoryBarrier();
410 Thread.MemoryBarrier();
411 var first = m_first;
412 var last = m_last;
413
414 if (last == null) // nothing to clear
415 return;
395
416
396 // make the new queue available to the readers, and stop the old one
417 if (first == null || (first.next == null && first != last)) // inconcistency
397 m_first = t;
418 continue;
398 Thread.MemoryBarrier();
419
420 // here we will create inconsistency which will force others to spin
421 // and prevent from fetching. chunk.next = null
422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
423 continue;// inconsistent
424
425 m_last = chunk;
426
427 return;
428
429 } while(true);
399 }
430 }
400
431
401 public T[] Drain() {
432 public T[] Drain() {
402 // start the new queue
433 // start the new queue
403 var t = new Chunk(m_chunkSize);
434 var chunk = new Chunk(m_chunkSize);
404 Thread.MemoryBarrier();
405 m_last = t;
406 Thread.MemoryBarrier();
407
408 // make the new queue available to the readers, and stop the old one
409 Chunk first;
410
435
411 do {
436 do {
412 first = m_first;
437 Thread.MemoryBarrier();
413 } while(first != Interlocked.CompareExchange(ref m_first
438 var first = m_first;
414 Thread.MemoryBarrier();
439 var last = m_last;
440
441 if (last == null)
442 return new T[0];
443
444 if (first == null || (first.next == null && first != last))
445 continue;
415
446
447 // here we will create inconsistency which will force others to spin
448 // and prevent from fetching. chunk.next = null
449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
450 continue;// inconsistent
416
451
452 last = Interlocked.Exchange(ref m_last, chunk);
453
454 return ReadChunks(first, last);
455
456 } while(true);
417 }
457 }
418
458
419 T[] ReadChunks(Chunk chunk) {
459 T[] ReadChunks(Chunk chunk, object last) {
420 var result = new List<T>();
460 var result = new List<T>();
421 var buffer = new T[m_chunkSize];
461 var buffer = new T[m_chunkSize];
422 int actual;
462 int actual;
423 bool recycle;
463 bool recycle;
424 while (chunk != null) {
464 while (chunk != null) {
465 // ensure all write operations on the chunk are complete
466 chunk.Commit();
467
425 // we need to read the chunk using this way
468 // we need to read the chunk using this way
426 // since some client still may completing the dequeue
469 // since some client still may completing the dequeue
427 // operation, such clients most likely won't get results
470 // operation, such clients most likely won't get results
428 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
429 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
430
473
431 chunk = chunk.next;
474 if (chunk == last) {
475 chunk = null;
476 } else {
477 while (chunk.next == null)
478 Thread.MemoryBarrier();
479 chunk = chunk.next;
480 }
432 }
481 }
433
482
434 return result.ToArray();
483 return result.ToArray();
@@ -94,12 +94,18 namespace Implab {
94 return that;
94 return that;
95 }
95 }
96
96
97 public static IPromise Combine(this ICollection<IPromise> that) {
97 public static IPromise Bundle(this ICollection<IPromise> that) {
98 Safe.ArgumentNotNull(that, "that");
98 Safe.ArgumentNotNull(that, "that");
99
99
100 int count = that.Count;
100 int count = that.Count;
101 int errors = 0;
101 var medium = new Promise();
102 var medium = new Promise();
102
103
104 medium.On(() => {
105 foreach(var p2 in that)
106 p2.Cancel();
107 }, PromiseEventType.ErrorOrCancel);
108
103 foreach (var p in that)
109 foreach (var p in that)
104 p.On(
110 p.On(
105 () => {
111 () => {
@@ -107,15 +113,62 namespace Implab {
107 medium.Resolve();
113 medium.Resolve();
108 },
114 },
109 error => {
115 error => {
110 throw new Exception("The dependency promise is failed", error);
116 if (Interlocked.Increment(ref errors) == 1)
117 medium.Reject(
118 new Exception("The dependency promise is failed", error)
119 );
111 },
120 },
112 () => {
121 () => {
113 throw new OperationCanceledException("The dependency promise is cancelled");
122 if (Interlocked.Increment(ref errors) == 1)
123 medium.Reject(
124 new Exception("The dependency promise is cancelled")
125 );
114 }
126 }
115 );
127 );
116
128
117 return medium;
129 return medium;
118 }
130 }
131
132 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
133 Safe.ArgumentNotNull(that, "that");
134
135 int count = that.Count;
136 int errors = 0;
137 var medium = new Promise<T[]>();
138 var results = new T[that.Count];
139
140 medium.On(() => {
141 foreach(var p2 in that)
142 p2.Cancel();
143 }, PromiseEventType.ErrorOrCancel);
144
145 int i = 0;
146 foreach (var p in that) {
147 var idx = i;
148 p.On(
149 x => {
150 results[idx] = x;
151 if (Interlocked.Decrement(ref count) == 0)
152 medium.Resolve(results);
153 },
154 error => {
155 if (Interlocked.Increment(ref errors) == 1)
156 medium.Reject(
157 new Exception("The dependency promise is failed", error)
158 );
159 },
160 () => {
161 if (Interlocked.Increment(ref errors) == 1)
162 medium.Reject(
163 new Exception("The dependency promise is cancelled")
164 );
165 }
166 );
167 i++;
168 }
169
170 return medium;
171 }
119
172
120 #if NET_4_5
173 #if NET_4_5
121
174
@@ -51,7 +51,7 namespace MonoPlay {
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
52 }
52 }
53 )
53 )
54 .Combine()
54 .Bundle()
55 .Join();
55 .Join();
56
56
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
@@ -107,7 +107,7 namespace MonoPlay {
107
107
108 }
108 }
109 )
109 )
110 .Combine()
110 .Bundle()
111 .Join();
111 .Join();
112
112
113
113
General Comments 0
You need to be logged in to leave comments. Login now