@@ -249,7 +249,7 namespace Implab.Test { | |||
|
249 | 249 | for (int i = 0; i < writersCount; i++) { |
|
250 | 250 | Interlocked.Increment(ref writers); |
|
251 | 251 | AsyncPool |
|
252 |
. |
|
|
252 | .RunThread(() => { | |
|
253 | 253 | for (int ii = 0; ii < itemsPerWriter; ii++) { |
|
254 | 254 | queue.Enqueue(1); |
|
255 | 255 | } |
@@ -261,7 +261,7 namespace Implab.Test { | |||
|
261 | 261 | for (int i = 0; i < 10; i++) { |
|
262 | 262 | Interlocked.Increment(ref readers); |
|
263 | 263 | AsyncPool |
|
264 |
. |
|
|
264 | .RunThread(() => { | |
|
265 | 265 | int t; |
|
266 | 266 | do { |
|
267 | 267 | while (queue.TryDequeue(out t)) |
@@ -336,7 +336,7 namespace Implab.Test { | |||
|
336 | 336 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); |
|
337 | 337 | } |
|
338 | 338 | ) |
|
339 |
. |
|
|
339 | .Bundle() | |
|
340 | 340 | .Join(); |
|
341 | 341 | |
|
342 | 342 | Assert.AreEqual(count * 3, res1 + res2); |
@@ -414,7 +414,7 namespace Implab.Test { | |||
|
414 | 414 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); |
|
415 | 415 | } |
|
416 | 416 | ) |
|
417 |
. |
|
|
417 | .Bundle() | |
|
418 | 418 | .Join(); |
|
419 | 419 | |
|
420 | 420 | Assert.AreEqual(summ , r1 + r2); |
@@ -490,7 +490,110 namespace Implab.Test { | |||
|
490 | 490 | Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); |
|
491 | 491 | } |
|
492 | 492 | ) |
|
493 |
. |
|
|
493 | .Bundle() | |
|
494 | .Join(); | |
|
495 | ||
|
496 | Assert.AreEqual(summ , r1 + r2); | |
|
497 | ||
|
498 | Console.WriteLine( | |
|
499 | "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | |
|
500 | Environment.TickCount - t1, | |
|
501 | r1, | |
|
502 | r2, | |
|
503 | r1 + r2, | |
|
504 | total | |
|
505 | ); | |
|
506 | } | |
|
507 | ||
|
508 | [TestMethod] | |
|
509 | public void AsyncQueueDrainTest() { | |
|
510 | var queue = new AsyncQueue<int>(); | |
|
511 | ||
|
512 | const int wBatch = 11; | |
|
513 | const int wCount = 200000; | |
|
514 | const int total = wBatch * wCount * 3; | |
|
515 | const int summ = wBatch * wCount * 3; | |
|
516 | ||
|
517 | int r1 = 0, r2 = 0; | |
|
518 | const int rBatch = 11; | |
|
519 | int read = 0; | |
|
520 | ||
|
521 | var t1 = Environment.TickCount; | |
|
522 | ||
|
523 | AsyncPool.RunThread( | |
|
524 | () => { | |
|
525 | var buffer = new int[wBatch]; | |
|
526 | for(int i = 0; i<wBatch; i++) | |
|
527 | buffer[i] = 1; | |
|
528 | ||
|
529 | for(int i =0; i < wCount; i++) | |
|
530 | queue.EnqueueRange(buffer,0,wBatch); | |
|
531 | Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |
|
532 | }, | |
|
533 | () => { | |
|
534 | for(int i =0; i < wCount * wBatch; i++) | |
|
535 | queue.Enqueue(1); | |
|
536 | Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |
|
537 | }, | |
|
538 | () => { | |
|
539 | var buffer = new int[wBatch]; | |
|
540 | for(int i = 0; i<wBatch; i++) | |
|
541 | buffer[i] = 1; | |
|
542 | ||
|
543 | for(int i =0; i < wCount; i++) | |
|
544 | queue.EnqueueRange(buffer,0,wBatch); | |
|
545 | Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); | |
|
546 | }, | |
|
547 | /*() => { | |
|
548 | int temp; | |
|
549 | int count = 0; | |
|
550 | while (read < total) | |
|
551 | if (queue.TryDequeue(out temp)) { | |
|
552 | count++; | |
|
553 | r1 += temp; | |
|
554 | Interlocked.Increment(ref read); | |
|
555 | } | |
|
556 | Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count); | |
|
557 | },*/ | |
|
558 | /*() => { | |
|
559 | var buffer = new int[rBatch]; | |
|
560 | var count = 0; | |
|
561 | while(read < total) { | |
|
562 | int actual; | |
|
563 | if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | |
|
564 | for(int i=0; i< actual; i++) | |
|
565 | r1 += buffer[i]; | |
|
566 | Interlocked.Add(ref read, actual); | |
|
567 | count += actual; | |
|
568 | } | |
|
569 | } | |
|
570 | ||
|
571 | Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); | |
|
572 | },*/ | |
|
573 | () => { | |
|
574 | var count = 0; | |
|
575 | while(read < total) { | |
|
576 | var buffer = queue.Drain(); | |
|
577 | for(int i=0; i< buffer.Length; i++) | |
|
578 | r1 += buffer[i]; | |
|
579 | Interlocked.Add(ref read, buffer.Length); | |
|
580 | count += buffer.Length; | |
|
581 | } | |
|
582 | Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); | |
|
583 | }, | |
|
584 | () => { | |
|
585 | var count = 0; | |
|
586 | while(read < total) { | |
|
587 | var buffer = queue.Drain(); | |
|
588 | for(int i=0; i< buffer.Length; i++) | |
|
589 | r2 += buffer[i]; | |
|
590 | Interlocked.Add(ref read, buffer.Length); | |
|
591 | count += buffer.Length; | |
|
592 | } | |
|
593 | Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count); | |
|
594 | } | |
|
595 | ) | |
|
596 | .Bundle() | |
|
494 | 597 | .Join(); |
|
495 | 598 | |
|
496 | 599 | Assert.AreEqual(summ , r1 + r2); |
@@ -162,7 +162,7 namespace Implab.Parallels { | |||
|
162 | 162 | int slots = threads; |
|
163 | 163 | |
|
164 | 164 | // Analysis disable AccessToDisposedClosure |
|
165 |
AsyncPool. |
|
|
165 | AsyncPool.RunThread<int>(() => { | |
|
166 | 166 | for (int i = 0; i < source.Length; i++) { |
|
167 | 167 | if(promise.IsResolved) |
|
168 | 168 | break; // stop processing in case of error or cancellation |
@@ -31,7 +31,7 namespace Implab.Parallels { | |||
|
31 | 31 | return p; |
|
32 | 32 | } |
|
33 | 33 | |
|
34 |
public static IPromise<T> |
|
|
34 | public static IPromise<T> RunThread<T>(Func<T> func) { | |
|
35 | 35 | var p = new Promise<T>(); |
|
36 | 36 | |
|
37 | 37 | var caller = TraceContext.Instance.CurrentOperation; |
@@ -53,7 +53,7 namespace Implab.Parallels { | |||
|
53 | 53 | } |
|
54 | 54 | |
|
55 | 55 | |
|
56 |
public static IPromise |
|
|
56 | public static IPromise RunThread(Action func) { | |
|
57 | 57 | var p = new Promise(); |
|
58 | 58 | |
|
59 | 59 | var caller = TraceContext.Instance.CurrentOperation; |
@@ -76,11 +76,11 namespace Implab.Parallels { | |||
|
76 | 76 | } |
|
77 | 77 | |
|
78 | 78 | public static IPromise[] RunThread(params Action[] func) { |
|
79 |
return func.Select(f => |
|
|
79 | return func.Select(f => RunThread(f)).ToArray(); | |
|
80 | 80 | } |
|
81 | 81 | |
|
82 | 82 | public static IPromise<T>[] RunThread<T>(params Func<T>[] func) { |
|
83 |
return func.Select(f => |
|
|
83 | return func.Select(f => RunThread(f)).ToArray(); | |
|
84 | 84 | } |
|
85 | 85 | } |
|
86 | 86 | } |
@@ -2,6 +2,7 | |||
|
2 | 2 | using System.Collections.Generic; |
|
3 | 3 | using System; |
|
4 | 4 | using System.Collections; |
|
5 | using System.Diagnostics; | |
|
5 | 6 | |
|
6 | 7 | namespace Implab.Parallels { |
|
7 | 8 | public class AsyncQueue<T> : IEnumerable<T> { |
@@ -60,6 +61,16 namespace Implab.Parallels { | |||
|
60 | 61 | return true; |
|
61 | 62 | } |
|
62 | 63 | |
|
64 | /// <summary> | |
|
65 | /// Prevents from allocating new space in the chunk and waits for all write operations to complete | |
|
66 | /// </summary> | |
|
67 | public void Commit() { | |
|
68 | var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); | |
|
69 | ||
|
70 | while (m_hi != actual) | |
|
71 | Thread.MemoryBarrier(); | |
|
72 | } | |
|
73 | ||
|
63 | 74 | public bool TryDequeue(out T value, out bool recycle) { |
|
64 | 75 | int low; |
|
65 | 76 | do { |
@@ -359,76 +370,114 namespace Implab.Parallels { | |||
|
359 | 370 | |
|
360 | 371 | if (last != null) |
|
361 | 372 | last.next = chunk; |
|
362 | else | |
|
373 | else { | |
|
363 | 374 | m_first = chunk; |
|
375 | } | |
|
364 | 376 | return true; |
|
365 | 377 | } |
|
366 | 378 | |
|
367 | 379 | void RecycleFirstChunk(Chunk first) { |
|
368 | 380 | var next = first.next; |
|
369 | 381 | |
|
382 | if (first != Interlocked.CompareExchange(ref m_first, next, first)) | |
|
383 | return; | |
|
384 | ||
|
370 | 385 | if (next == null) { |
|
371 | // looks like this is the last chunk | |
|
386 | ||
|
372 | 387 | if (first != Interlocked.CompareExchange(ref m_last, null, first)) { |
|
388 | /*while (first.next == null) | |
|
389 | Thread.MemoryBarrier();*/ | |
|
390 | ||
|
373 | 391 | // race |
|
374 |
// |
|
|
375 | // or a new chunk has been appedned to the queue | |
|
376 | ||
|
377 | return; // give up | |
|
392 | // someone already updated the tail, restore the pointer to the queue head | |
|
393 | m_first = first; | |
|
378 | 394 | } |
|
379 | 395 | // the tail is updated |
|
380 | 396 | } |
|
381 | 397 | |
|
382 | 398 | // we need to update the head |
|
383 | Interlocked.CompareExchange(ref m_first, next, first); | |
|
399 | //Interlocked.CompareExchange(ref m_first, next, first); | |
|
384 | 400 | // if the head is already updated then give up |
|
385 | return; | |
|
401 | //return; | |
|
386 | 402 | |
|
387 | 403 | } |
|
388 | 404 | |
|
389 | 405 | public void Clear() { |
|
390 | 406 | // start the new queue |
|
391 |
var |
|
|
392 | Thread.MemoryBarrier(); | |
|
393 |
|
|
|
394 | Thread.MemoryBarrier(); | |
|
407 | var chunk = new Chunk(m_chunkSize); | |
|
408 | ||
|
409 | do { | |
|
410 | Thread.MemoryBarrier(); | |
|
411 | var first = m_first; | |
|
412 | var last = m_last; | |
|
413 | ||
|
414 | if (last == null) // nothing to clear | |
|
415 | return; | |
|
395 | 416 | |
|
396 | // make the new queue available to the readers, and stop the old one | |
|
397 | m_first = t; | |
|
398 | Thread.MemoryBarrier(); | |
|
417 | if (first == null || (first.next == null && first != last)) // inconcistency | |
|
418 | continue; | |
|
419 | ||
|
420 | // here we will create inconsistency which will force others to spin | |
|
421 | // and prevent from fetching. chunk.next = null | |
|
422 | if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | |
|
423 | continue;// inconsistent | |
|
424 | ||
|
425 | m_last = chunk; | |
|
426 | ||
|
427 | return; | |
|
428 | ||
|
429 | } while(true); | |
|
399 | 430 | } |
|
400 | 431 | |
|
401 | 432 | public T[] Drain() { |
|
402 | 433 | // start the new queue |
|
403 |
var |
|
|
404 | Thread.MemoryBarrier(); | |
|
405 | m_last = t; | |
|
406 | Thread.MemoryBarrier(); | |
|
407 | ||
|
408 | // make the new queue available to the readers, and stop the old one | |
|
409 | Chunk first; | |
|
434 | var chunk = new Chunk(m_chunkSize); | |
|
410 | 435 | |
|
411 | 436 | do { |
|
412 | first = m_first; | |
|
413 | } while(first != Interlocked.CompareExchange(ref m_first | |
|
414 | Thread.MemoryBarrier(); | |
|
437 | Thread.MemoryBarrier(); | |
|
438 | var first = m_first; | |
|
439 | var last = m_last; | |
|
440 | ||
|
441 | if (last == null) | |
|
442 | return new T[0]; | |
|
443 | ||
|
444 | if (first == null || (first.next == null && first != last)) | |
|
445 | continue; | |
|
415 | 446 | |
|
447 | // here we will create inconsistency which will force others to spin | |
|
448 | // and prevent from fetching. chunk.next = null | |
|
449 | if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | |
|
450 | continue;// inconsistent | |
|
416 | 451 | |
|
452 | last = Interlocked.Exchange(ref m_last, chunk); | |
|
453 | ||
|
454 | return ReadChunks(first, last); | |
|
455 | ||
|
456 | } while(true); | |
|
417 | 457 | } |
|
418 | 458 | |
|
419 | T[] ReadChunks(Chunk chunk) { | |
|
459 | T[] ReadChunks(Chunk chunk, object last) { | |
|
420 | 460 | var result = new List<T>(); |
|
421 | 461 | var buffer = new T[m_chunkSize]; |
|
422 | 462 | int actual; |
|
423 | 463 | bool recycle; |
|
424 | 464 | while (chunk != null) { |
|
465 | // ensure all write operations on the chunk are complete | |
|
466 | chunk.Commit(); | |
|
467 | ||
|
425 | 468 | // we need to read the chunk using this way |
|
426 | 469 | // since some client still may completing the dequeue |
|
427 | 470 | // operation, such clients most likely won't get results |
|
428 | 471 | while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) |
|
429 | 472 | result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); |
|
430 | 473 | |
|
431 |
chunk = |
|
|
474 | if (chunk == last) { | |
|
475 | chunk = null; | |
|
476 | } else { | |
|
477 | while (chunk.next == null) | |
|
478 | Thread.MemoryBarrier(); | |
|
479 | chunk = chunk.next; | |
|
480 | } | |
|
432 | 481 | } |
|
433 | 482 | |
|
434 | 483 | return result.ToArray(); |
@@ -94,12 +94,18 namespace Implab { | |||
|
94 | 94 | return that; |
|
95 | 95 | } |
|
96 | 96 | |
|
97 |
public static IPromise |
|
|
97 | public static IPromise Bundle(this ICollection<IPromise> that) { | |
|
98 | 98 | Safe.ArgumentNotNull(that, "that"); |
|
99 | 99 | |
|
100 | 100 | int count = that.Count; |
|
101 | int errors = 0; | |
|
101 | 102 | var medium = new Promise(); |
|
102 | 103 | |
|
104 | medium.On(() => { | |
|
105 | foreach(var p2 in that) | |
|
106 | p2.Cancel(); | |
|
107 | }, PromiseEventType.ErrorOrCancel); | |
|
108 | ||
|
103 | 109 | foreach (var p in that) |
|
104 | 110 | p.On( |
|
105 | 111 | () => { |
@@ -107,15 +113,62 namespace Implab { | |||
|
107 | 113 | medium.Resolve(); |
|
108 | 114 | }, |
|
109 | 115 | error => { |
|
110 | throw new Exception("The dependency promise is failed", error); | |
|
116 | if (Interlocked.Increment(ref errors) == 1) | |
|
117 | medium.Reject( | |
|
118 | new Exception("The dependency promise is failed", error) | |
|
119 | ); | |
|
111 | 120 | }, |
|
112 | 121 | () => { |
|
113 | throw new OperationCanceledException("The dependency promise is cancelled"); | |
|
122 | if (Interlocked.Increment(ref errors) == 1) | |
|
123 | medium.Reject( | |
|
124 | new Exception("The dependency promise is cancelled") | |
|
125 | ); | |
|
114 | 126 | } |
|
115 | 127 | ); |
|
116 | 128 | |
|
117 | 129 | return medium; |
|
118 | 130 | } |
|
131 | ||
|
132 | public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) { | |
|
133 | Safe.ArgumentNotNull(that, "that"); | |
|
134 | ||
|
135 | int count = that.Count; | |
|
136 | int errors = 0; | |
|
137 | var medium = new Promise<T[]>(); | |
|
138 | var results = new T[that.Count]; | |
|
139 | ||
|
140 | medium.On(() => { | |
|
141 | foreach(var p2 in that) | |
|
142 | p2.Cancel(); | |
|
143 | }, PromiseEventType.ErrorOrCancel); | |
|
144 | ||
|
145 | int i = 0; | |
|
146 | foreach (var p in that) { | |
|
147 | var idx = i; | |
|
148 | p.On( | |
|
149 | x => { | |
|
150 | results[idx] = x; | |
|
151 | if (Interlocked.Decrement(ref count) == 0) | |
|
152 | medium.Resolve(results); | |
|
153 | }, | |
|
154 | error => { | |
|
155 | if (Interlocked.Increment(ref errors) == 1) | |
|
156 | medium.Reject( | |
|
157 | new Exception("The dependency promise is failed", error) | |
|
158 | ); | |
|
159 | }, | |
|
160 | () => { | |
|
161 | if (Interlocked.Increment(ref errors) == 1) | |
|
162 | medium.Reject( | |
|
163 | new Exception("The dependency promise is cancelled") | |
|
164 | ); | |
|
165 | } | |
|
166 | ); | |
|
167 | i++; | |
|
168 | } | |
|
169 | ||
|
170 | return medium; | |
|
171 | } | |
|
119 | 172 | |
|
120 | 173 | #if NET_4_5 |
|
121 | 174 |
@@ -51,7 +51,7 namespace MonoPlay { | |||
|
51 | 51 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); |
|
52 | 52 | } |
|
53 | 53 | ) |
|
54 |
. |
|
|
54 | .Bundle() | |
|
55 | 55 | .Join(); |
|
56 | 56 | |
|
57 | 57 | Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); |
@@ -107,7 +107,7 namespace MonoPlay { | |||
|
107 | 107 | |
|
108 | 108 | } |
|
109 | 109 | ) |
|
110 |
. |
|
|
110 | .Bundle() | |
|
111 | 111 | .Join(); |
|
112 | 112 | |
|
113 | 113 |
General Comments 0
You need to be logged in to leave comments.
Login now