@@ -249,7 +249,7 namespace Implab.Test { | |||||
249 | for (int i = 0; i < writersCount; i++) { |
|
249 | for (int i = 0; i < writersCount; i++) { | |
250 | Interlocked.Increment(ref writers); |
|
250 | Interlocked.Increment(ref writers); | |
251 | AsyncPool |
|
251 | AsyncPool | |
252 |
. |
|
252 | .RunThread(() => { | |
253 | for (int ii = 0; ii < itemsPerWriter; ii++) { |
|
253 | for (int ii = 0; ii < itemsPerWriter; ii++) { | |
254 | queue.Enqueue(1); |
|
254 | queue.Enqueue(1); | |
255 | } |
|
255 | } | |
@@ -261,7 +261,7 namespace Implab.Test { | |||||
261 | for (int i = 0; i < 10; i++) { |
|
261 | for (int i = 0; i < 10; i++) { | |
262 | Interlocked.Increment(ref readers); |
|
262 | Interlocked.Increment(ref readers); | |
263 | AsyncPool |
|
263 | AsyncPool | |
264 |
. |
|
264 | .RunThread(() => { | |
265 | int t; |
|
265 | int t; | |
266 | do { |
|
266 | do { | |
267 | while (queue.TryDequeue(out t)) |
|
267 | while (queue.TryDequeue(out t)) | |
@@ -336,7 +336,7 namespace Implab.Test { | |||||
336 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); |
|
336 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | |
337 | } |
|
337 | } | |
338 | ) |
|
338 | ) | |
339 |
. |
|
339 | .Bundle() | |
340 | .Join(); |
|
340 | .Join(); | |
341 |
|
341 | |||
342 | Assert.AreEqual(count * 3, res1 + res2); |
|
342 | Assert.AreEqual(count * 3, res1 + res2); | |
@@ -414,7 +414,7 namespace Implab.Test { | |||||
414 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); |
|
414 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | |
415 | } |
|
415 | } | |
416 | ) |
|
416 | ) | |
417 |
. |
|
417 | .Bundle() | |
418 | .Join(); |
|
418 | .Join(); | |
419 |
|
419 | |||
420 | Assert.AreEqual(summ , r1 + r2); |
|
420 | Assert.AreEqual(summ , r1 + r2); | |
@@ -490,7 +490,110 namespace Implab.Test { | |||||
490 | Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); |
|
490 | Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); | |
491 | } |
|
491 | } | |
492 | ) |
|
492 | ) | |
493 |
. |
|
493 | .Bundle() | |
|
494 | .Join(); | |||
|
495 | ||||
|
496 | Assert.AreEqual(summ , r1 + r2); | |||
|
497 | ||||
|
498 | Console.WriteLine( | |||
|
499 | "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | |||
|
500 | Environment.TickCount - t1, | |||
|
501 | r1, | |||
|
502 | r2, | |||
|
503 | r1 + r2, | |||
|
504 | total | |||
|
505 | ); | |||
|
506 | } | |||
|
507 | ||||
|
508 | [TestMethod] | |||
|
509 | public void AsyncQueueDrainTest() { | |||
|
510 | var queue = new AsyncQueue<int>(); | |||
|
511 | ||||
|
512 | const int wBatch = 11; | |||
|
513 | const int wCount = 200000; | |||
|
514 | const int total = wBatch * wCount * 3; | |||
|
515 | const int summ = wBatch * wCount * 3; | |||
|
516 | ||||
|
517 | int r1 = 0, r2 = 0; | |||
|
518 | const int rBatch = 11; | |||
|
519 | int read = 0; | |||
|
520 | ||||
|
521 | var t1 = Environment.TickCount; | |||
|
522 | ||||
|
523 | AsyncPool.RunThread( | |||
|
524 | () => { | |||
|
525 | var buffer = new int[wBatch]; | |||
|
526 | for(int i = 0; i<wBatch; i++) | |||
|
527 | buffer[i] = 1; | |||
|
528 | ||||
|
529 | for(int i =0; i < wCount; i++) | |||
|
530 | queue.EnqueueRange(buffer,0,wBatch); | |||
|
531 | Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |||
|
532 | }, | |||
|
533 | () => { | |||
|
534 | for(int i =0; i < wCount * wBatch; i++) | |||
|
535 | queue.Enqueue(1); | |||
|
536 | Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |||
|
537 | }, | |||
|
538 | () => { | |||
|
539 | var buffer = new int[wBatch]; | |||
|
540 | for(int i = 0; i<wBatch; i++) | |||
|
541 | buffer[i] = 1; | |||
|
542 | ||||
|
543 | for(int i =0; i < wCount; i++) | |||
|
544 | queue.EnqueueRange(buffer,0,wBatch); | |||
|
545 | Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); | |||
|
546 | }, | |||
|
547 | /*() => { | |||
|
548 | int temp; | |||
|
549 | int count = 0; | |||
|
550 | while (read < total) | |||
|
551 | if (queue.TryDequeue(out temp)) { | |||
|
552 | count++; | |||
|
553 | r1 += temp; | |||
|
554 | Interlocked.Increment(ref read); | |||
|
555 | } | |||
|
556 | Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count); | |||
|
557 | },*/ | |||
|
558 | /*() => { | |||
|
559 | var buffer = new int[rBatch]; | |||
|
560 | var count = 0; | |||
|
561 | while(read < total) { | |||
|
562 | int actual; | |||
|
563 | if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | |||
|
564 | for(int i=0; i< actual; i++) | |||
|
565 | r1 += buffer[i]; | |||
|
566 | Interlocked.Add(ref read, actual); | |||
|
567 | count += actual; | |||
|
568 | } | |||
|
569 | } | |||
|
570 | ||||
|
571 | Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); | |||
|
572 | },*/ | |||
|
573 | () => { | |||
|
574 | var count = 0; | |||
|
575 | while(read < total) { | |||
|
576 | var buffer = queue.Drain(); | |||
|
577 | for(int i=0; i< buffer.Length; i++) | |||
|
578 | r1 += buffer[i]; | |||
|
579 | Interlocked.Add(ref read, buffer.Length); | |||
|
580 | count += buffer.Length; | |||
|
581 | } | |||
|
582 | Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); | |||
|
583 | }, | |||
|
584 | () => { | |||
|
585 | var count = 0; | |||
|
586 | while(read < total) { | |||
|
587 | var buffer = queue.Drain(); | |||
|
588 | for(int i=0; i< buffer.Length; i++) | |||
|
589 | r2 += buffer[i]; | |||
|
590 | Interlocked.Add(ref read, buffer.Length); | |||
|
591 | count += buffer.Length; | |||
|
592 | } | |||
|
593 | Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count); | |||
|
594 | } | |||
|
595 | ) | |||
|
596 | .Bundle() | |||
494 | .Join(); |
|
597 | .Join(); | |
495 |
|
598 | |||
496 | Assert.AreEqual(summ , r1 + r2); |
|
599 | Assert.AreEqual(summ , r1 + r2); |
@@ -162,7 +162,7 namespace Implab.Parallels { | |||||
162 | int slots = threads; |
|
162 | int slots = threads; | |
163 |
|
163 | |||
164 | // Analysis disable AccessToDisposedClosure |
|
164 | // Analysis disable AccessToDisposedClosure | |
165 |
AsyncPool. |
|
165 | AsyncPool.RunThread<int>(() => { | |
166 | for (int i = 0; i < source.Length; i++) { |
|
166 | for (int i = 0; i < source.Length; i++) { | |
167 | if(promise.IsResolved) |
|
167 | if(promise.IsResolved) | |
168 | break; // stop processing in case of error or cancellation |
|
168 | break; // stop processing in case of error or cancellation |
@@ -31,7 +31,7 namespace Implab.Parallels { | |||||
31 | return p; |
|
31 | return p; | |
32 | } |
|
32 | } | |
33 |
|
33 | |||
34 |
public static IPromise<T> |
|
34 | public static IPromise<T> RunThread<T>(Func<T> func) { | |
35 | var p = new Promise<T>(); |
|
35 | var p = new Promise<T>(); | |
36 |
|
36 | |||
37 | var caller = TraceContext.Instance.CurrentOperation; |
|
37 | var caller = TraceContext.Instance.CurrentOperation; | |
@@ -53,7 +53,7 namespace Implab.Parallels { | |||||
53 | } |
|
53 | } | |
54 |
|
54 | |||
55 |
|
55 | |||
56 |
public static IPromise |
|
56 | public static IPromise RunThread(Action func) { | |
57 | var p = new Promise(); |
|
57 | var p = new Promise(); | |
58 |
|
58 | |||
59 | var caller = TraceContext.Instance.CurrentOperation; |
|
59 | var caller = TraceContext.Instance.CurrentOperation; | |
@@ -76,11 +76,11 namespace Implab.Parallels { | |||||
76 | } |
|
76 | } | |
77 |
|
77 | |||
78 | public static IPromise[] RunThread(params Action[] func) { |
|
78 | public static IPromise[] RunThread(params Action[] func) { | |
79 |
return func.Select(f => |
|
79 | return func.Select(f => RunThread(f)).ToArray(); | |
80 | } |
|
80 | } | |
81 |
|
81 | |||
82 | public static IPromise<T>[] RunThread<T>(params Func<T>[] func) { |
|
82 | public static IPromise<T>[] RunThread<T>(params Func<T>[] func) { | |
83 |
return func.Select(f => |
|
83 | return func.Select(f => RunThread(f)).ToArray(); | |
84 | } |
|
84 | } | |
85 | } |
|
85 | } | |
86 | } |
|
86 | } |
@@ -2,6 +2,7 | |||||
2 | using System.Collections.Generic; |
|
2 | using System.Collections.Generic; | |
3 | using System; |
|
3 | using System; | |
4 | using System.Collections; |
|
4 | using System.Collections; | |
|
5 | using System.Diagnostics; | |||
5 |
|
6 | |||
6 | namespace Implab.Parallels { |
|
7 | namespace Implab.Parallels { | |
7 | public class AsyncQueue<T> : IEnumerable<T> { |
|
8 | public class AsyncQueue<T> : IEnumerable<T> { | |
@@ -60,6 +61,16 namespace Implab.Parallels { | |||||
60 | return true; |
|
61 | return true; | |
61 | } |
|
62 | } | |
62 |
|
63 | |||
|
64 | /// <summary> | |||
|
65 | /// Prevents from allocating new space in the chunk and waits for all write operations to complete | |||
|
66 | /// </summary> | |||
|
67 | public void Commit() { | |||
|
68 | var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); | |||
|
69 | ||||
|
70 | while (m_hi != actual) | |||
|
71 | Thread.MemoryBarrier(); | |||
|
72 | } | |||
|
73 | ||||
63 | public bool TryDequeue(out T value, out bool recycle) { |
|
74 | public bool TryDequeue(out T value, out bool recycle) { | |
64 | int low; |
|
75 | int low; | |
65 | do { |
|
76 | do { | |
@@ -359,77 +370,115 namespace Implab.Parallels { | |||||
359 |
|
370 | |||
360 | if (last != null) |
|
371 | if (last != null) | |
361 | last.next = chunk; |
|
372 | last.next = chunk; | |
362 | else |
|
373 | else { | |
363 | m_first = chunk; |
|
374 | m_first = chunk; | |
|
375 | } | |||
364 | return true; |
|
376 | return true; | |
365 | } |
|
377 | } | |
366 |
|
378 | |||
367 | void RecycleFirstChunk(Chunk first) { |
|
379 | void RecycleFirstChunk(Chunk first) { | |
368 | var next = first.next; |
|
380 | var next = first.next; | |
369 |
|
381 | |||
|
382 | if (first != Interlocked.CompareExchange(ref m_first, next, first)) | |||
|
383 | return; | |||
|
384 | ||||
370 | if (next == null) { |
|
385 | if (next == null) { | |
371 | // looks like this is the last chunk |
|
386 | ||
372 | if (first != Interlocked.CompareExchange(ref m_last, null, first)) { |
|
387 | if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | |
|
388 | /*while (first.next == null) | |||
|
389 | Thread.MemoryBarrier();*/ | |||
|
390 | ||||
373 | // race |
|
391 | // race | |
374 |
// |
|
392 | // someone already updated the tail, restore the pointer to the queue head | |
375 | // or a new chunk has been appedned to the queue |
|
393 | m_first = first; | |
376 |
|
||||
377 | return; // give up |
|
|||
378 | } |
|
394 | } | |
379 | // the tail is updated |
|
395 | // the tail is updated | |
380 | } |
|
396 | } | |
381 |
|
397 | |||
382 | // we need to update the head |
|
398 | // we need to update the head | |
383 | Interlocked.CompareExchange(ref m_first, next, first); |
|
399 | //Interlocked.CompareExchange(ref m_first, next, first); | |
384 | // if the head is already updated then give up |
|
400 | // if the head is already updated then give up | |
385 | return; |
|
401 | //return; | |
386 |
|
402 | |||
387 | } |
|
403 | } | |
388 |
|
404 | |||
389 | public void Clear() { |
|
405 | public void Clear() { | |
390 | // start the new queue |
|
406 | // start the new queue | |
391 |
var |
|
407 | var chunk = new Chunk(m_chunkSize); | |
392 | Thread.MemoryBarrier(); |
|
408 | ||
393 |
|
|
409 | do { | |
394 | Thread.MemoryBarrier(); |
|
410 | Thread.MemoryBarrier(); | |
|
411 | var first = m_first; | |||
|
412 | var last = m_last; | |||
395 |
|
413 | |||
396 | // make the new queue available to the readers, and stop the old one |
|
414 | if (last == null) // nothing to clear | |
397 | m_first = t; |
|
415 | return; | |
398 | Thread.MemoryBarrier(); |
|
416 | ||
|
417 | if (first == null || (first.next == null && first != last)) // inconcistency | |||
|
418 | continue; | |||
|
419 | ||||
|
420 | // here we will create inconsistency which will force others to spin | |||
|
421 | // and prevent from fetching. chunk.next = null | |||
|
422 | if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | |||
|
423 | continue;// inconsistent | |||
|
424 | ||||
|
425 | m_last = chunk; | |||
|
426 | ||||
|
427 | return; | |||
|
428 | ||||
|
429 | } while(true); | |||
399 | } |
|
430 | } | |
400 |
|
431 | |||
401 | public T[] Drain() { |
|
432 | public T[] Drain() { | |
402 | // start the new queue |
|
433 | // start the new queue | |
403 |
var |
|
434 | var chunk = new Chunk(m_chunkSize); | |
404 | Thread.MemoryBarrier(); |
|
|||
405 | m_last = t; |
|
|||
406 | Thread.MemoryBarrier(); |
|
|||
407 |
|
||||
408 | // make the new queue available to the readers, and stop the old one |
|
|||
409 | Chunk first; |
|
|||
410 |
|
435 | |||
411 | do { |
|
436 | do { | |
412 | first = m_first; |
|
|||
413 | } while(first != Interlocked.CompareExchange(ref m_first |
|
|||
414 | Thread.MemoryBarrier(); |
|
437 | Thread.MemoryBarrier(); | |
|
438 | var first = m_first; | |||
|
439 | var last = m_last; | |||
415 |
|
440 | |||
|
441 | if (last == null) | |||
|
442 | return new T[0]; | |||
416 |
|
443 | |||
|
444 | if (first == null || (first.next == null && first != last)) | |||
|
445 | continue; | |||
|
446 | ||||
|
447 | // here we will create inconsistency which will force others to spin | |||
|
448 | // and prevent from fetching. chunk.next = null | |||
|
449 | if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | |||
|
450 | continue;// inconsistent | |||
|
451 | ||||
|
452 | last = Interlocked.Exchange(ref m_last, chunk); | |||
|
453 | ||||
|
454 | return ReadChunks(first, last); | |||
|
455 | ||||
|
456 | } while(true); | |||
417 | } |
|
457 | } | |
418 |
|
458 | |||
419 | T[] ReadChunks(Chunk chunk) { |
|
459 | T[] ReadChunks(Chunk chunk, object last) { | |
420 | var result = new List<T>(); |
|
460 | var result = new List<T>(); | |
421 | var buffer = new T[m_chunkSize]; |
|
461 | var buffer = new T[m_chunkSize]; | |
422 | int actual; |
|
462 | int actual; | |
423 | bool recycle; |
|
463 | bool recycle; | |
424 | while (chunk != null) { |
|
464 | while (chunk != null) { | |
|
465 | // ensure all write operations on the chunk are complete | |||
|
466 | chunk.Commit(); | |||
|
467 | ||||
425 | // we need to read the chunk using this way |
|
468 | // we need to read the chunk using this way | |
426 | // since some client still may completing the dequeue |
|
469 | // since some client still may completing the dequeue | |
427 | // operation, such clients most likely won't get results |
|
470 | // operation, such clients most likely won't get results | |
428 | while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) |
|
471 | while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) | |
429 | result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); |
|
472 | result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); | |
430 |
|
473 | |||
|
474 | if (chunk == last) { | |||
|
475 | chunk = null; | |||
|
476 | } else { | |||
|
477 | while (chunk.next == null) | |||
|
478 | Thread.MemoryBarrier(); | |||
431 | chunk = chunk.next; |
|
479 | chunk = chunk.next; | |
432 | } |
|
480 | } | |
|
481 | } | |||
433 |
|
482 | |||
434 | return result.ToArray(); |
|
483 | return result.ToArray(); | |
435 | } |
|
484 | } |
@@ -94,12 +94,18 namespace Implab { | |||||
94 | return that; |
|
94 | return that; | |
95 | } |
|
95 | } | |
96 |
|
96 | |||
97 |
public static IPromise |
|
97 | public static IPromise Bundle(this ICollection<IPromise> that) { | |
98 | Safe.ArgumentNotNull(that, "that"); |
|
98 | Safe.ArgumentNotNull(that, "that"); | |
99 |
|
99 | |||
100 | int count = that.Count; |
|
100 | int count = that.Count; | |
|
101 | int errors = 0; | |||
101 | var medium = new Promise(); |
|
102 | var medium = new Promise(); | |
102 |
|
103 | |||
|
104 | medium.On(() => { | |||
|
105 | foreach(var p2 in that) | |||
|
106 | p2.Cancel(); | |||
|
107 | }, PromiseEventType.ErrorOrCancel); | |||
|
108 | ||||
103 | foreach (var p in that) |
|
109 | foreach (var p in that) | |
104 | p.On( |
|
110 | p.On( | |
105 | () => { |
|
111 | () => { | |
@@ -107,16 +113,63 namespace Implab { | |||||
107 | medium.Resolve(); |
|
113 | medium.Resolve(); | |
108 | }, |
|
114 | }, | |
109 | error => { |
|
115 | error => { | |
110 | throw new Exception("The dependency promise is failed", error); |
|
116 | if (Interlocked.Increment(ref errors) == 1) | |
|
117 | medium.Reject( | |||
|
118 | new Exception("The dependency promise is failed", error) | |||
|
119 | ); | |||
111 | }, |
|
120 | }, | |
112 | () => { |
|
121 | () => { | |
113 | throw new OperationCanceledException("The dependency promise is cancelled"); |
|
122 | if (Interlocked.Increment(ref errors) == 1) | |
|
123 | medium.Reject( | |||
|
124 | new Exception("The dependency promise is cancelled") | |||
|
125 | ); | |||
114 | } |
|
126 | } | |
115 | ); |
|
127 | ); | |
116 |
|
128 | |||
117 | return medium; |
|
129 | return medium; | |
118 | } |
|
130 | } | |
119 |
|
131 | |||
|
132 | public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) { | |||
|
133 | Safe.ArgumentNotNull(that, "that"); | |||
|
134 | ||||
|
135 | int count = that.Count; | |||
|
136 | int errors = 0; | |||
|
137 | var medium = new Promise<T[]>(); | |||
|
138 | var results = new T[that.Count]; | |||
|
139 | ||||
|
140 | medium.On(() => { | |||
|
141 | foreach(var p2 in that) | |||
|
142 | p2.Cancel(); | |||
|
143 | }, PromiseEventType.ErrorOrCancel); | |||
|
144 | ||||
|
145 | int i = 0; | |||
|
146 | foreach (var p in that) { | |||
|
147 | var idx = i; | |||
|
148 | p.On( | |||
|
149 | x => { | |||
|
150 | results[idx] = x; | |||
|
151 | if (Interlocked.Decrement(ref count) == 0) | |||
|
152 | medium.Resolve(results); | |||
|
153 | }, | |||
|
154 | error => { | |||
|
155 | if (Interlocked.Increment(ref errors) == 1) | |||
|
156 | medium.Reject( | |||
|
157 | new Exception("The dependency promise is failed", error) | |||
|
158 | ); | |||
|
159 | }, | |||
|
160 | () => { | |||
|
161 | if (Interlocked.Increment(ref errors) == 1) | |||
|
162 | medium.Reject( | |||
|
163 | new Exception("The dependency promise is cancelled") | |||
|
164 | ); | |||
|
165 | } | |||
|
166 | ); | |||
|
167 | i++; | |||
|
168 | } | |||
|
169 | ||||
|
170 | return medium; | |||
|
171 | } | |||
|
172 | ||||
120 | #if NET_4_5 |
|
173 | #if NET_4_5 | |
121 |
|
174 | |||
122 | public static Task<T> GetTask<T>(this IPromise<T> that) { |
|
175 | public static Task<T> GetTask<T>(this IPromise<T> that) { |
@@ -51,7 +51,7 namespace MonoPlay { | |||||
51 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); |
|
51 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | |
52 | } |
|
52 | } | |
53 | ) |
|
53 | ) | |
54 |
. |
|
54 | .Bundle() | |
55 | .Join(); |
|
55 | .Join(); | |
56 |
|
56 | |||
57 | Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); |
|
57 | Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); | |
@@ -107,7 +107,7 namespace MonoPlay { | |||||
107 |
|
107 | |||
108 | } |
|
108 | } | |
109 | ) |
|
109 | ) | |
110 |
. |
|
110 | .Bundle() | |
111 | .Join(); |
|
111 | .Join(); | |
112 |
|
112 | |||
113 |
|
113 |
General Comments 0
You need to be logged in to leave comments.
Login now