@@ -430,6 +430,82 namespace Implab.Test { | |||
|
430 | 430 | } |
|
431 | 431 | |
|
432 | 432 | [TestMethod] |
|
433 | public void AsyncQueueChunkDequeueTest() { | |
|
434 | var queue = new AsyncQueue<int>(); | |
|
435 | ||
|
436 | const int wBatch = 31; | |
|
437 | const int wCount = 200000; | |
|
438 | const int total = wBatch * wCount * 3; | |
|
439 | const int summ = wBatch * wCount * 6; | |
|
440 | ||
|
441 | int r1 = 0, r2 = 0; | |
|
442 | const int rBatch = 1024; | |
|
443 | int read = 0; | |
|
444 | ||
|
445 | var t1 = Environment.TickCount; | |
|
446 | ||
|
447 | AsyncPool.RunThread( | |
|
448 | () => { | |
|
449 | var buffer = new int[wBatch]; | |
|
450 | for(int i = 0; i<wBatch; i++) | |
|
451 | buffer[i] = 1; | |
|
452 | ||
|
453 | for(int i =0; i < wCount; i++) | |
|
454 | queue.EnqueueRange(buffer,0,wBatch); | |
|
455 | Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |
|
456 | }, | |
|
457 | () => { | |
|
458 | var buffer = new int[wBatch]; | |
|
459 | for(int i = 0; i<wBatch; i++) | |
|
460 | buffer[i] = 2; | |
|
461 | ||
|
462 | for(int i =0; i < wCount; i++) | |
|
463 | queue.EnqueueRange(buffer,0,wBatch); | |
|
464 | Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |
|
465 | }, | |
|
466 | () => { | |
|
467 | var buffer = new int[wBatch]; | |
|
468 | for(int i = 0; i<wBatch; i++) | |
|
469 | buffer[i] = 3; | |
|
470 | ||
|
471 | for(int i =0; i < wCount; i++) | |
|
472 | queue.EnqueueRange(buffer,0,wBatch); | |
|
473 | Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); | |
|
474 | }, | |
|
475 | () => { | |
|
476 | var buffer = new int[rBatch]; | |
|
477 | int count = 1; | |
|
478 | double avgchunk = 0; | |
|
479 | while(read < total) { | |
|
480 | int actual; | |
|
481 | if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) { | |
|
482 | for(int i=0; i< actual; i++) | |
|
483 | r2 += buffer[i]; | |
|
484 | Interlocked.Add(ref read, actual); | |
|
485 | avgchunk = avgchunk*(count-1)/count + actual/(double)count; | |
|
486 | count ++; | |
|
487 | } | |
|
488 | } | |
|
489 | ||
|
490 | Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); | |
|
491 | } | |
|
492 | ) | |
|
493 | .Combine() | |
|
494 | .Join(); | |
|
495 | ||
|
496 | Assert.AreEqual(summ , r1 + r2); | |
|
497 | ||
|
498 | Console.WriteLine( | |
|
499 | "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | |
|
500 | Environment.TickCount - t1, | |
|
501 | r1, | |
|
502 | r2, | |
|
503 | r1 + r2, | |
|
504 | total | |
|
505 | ); | |
|
506 | } | |
|
507 | ||
|
508 | [TestMethod] | |
|
433 | 509 | public void ParallelMapTest() { |
|
434 | 510 | |
|
435 | 511 | const int count = 100000; |
@@ -78,42 +78,31 namespace Implab.Parallels { | |||
|
78 | 78 | } |
|
79 | 79 | |
|
80 | 80 | public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { |
|
81 | int alloc; | |
|
82 | int allocSize; | |
|
83 | ||
|
84 | // in case the batch size is larger than a free space in chunk | |
|
85 | // tailGap is used to over allocate the space in the chunk to | |
|
86 | // get exclusive permission on creation of the next one. | |
|
87 | int tailGap = 0; | |
|
88 | ||
|
89 | do { | |
|
90 | alloc = m_alloc; | |
|
81 | //int alloc; | |
|
82 | //int allocSize; | |
|
91 | 83 | |
|
92 | if (alloc > m_size) { | |
|
93 | // the chunk is full and someone already | |
|
94 | // creating the new one | |
|
95 |
|
|
|
96 | extend = false; // the caller shouldn't try to extend the queue | |
|
97 | return false; // nothing was added | |
|
98 | } | |
|
84 | var alloc = Interlocked.Add(ref m_alloc, length) - length; | |
|
85 | if (alloc > m_size) { | |
|
86 | // the chunk is full and someone already | |
|
87 | // creating the new one | |
|
88 | enqueued = 0; // nothing was added | |
|
89 | extend = false; // the caller shouldn't try to extend the queue | |
|
90 | return false; // nothing was added | |
|
91 | } | |
|
99 | 92 | |
|
100 |
|
|
|
101 | if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch | |
|
102 | tailGap = 1; // overallocate space to get exclusive permission to extend queue | |
|
103 | } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); | |
|
104 | ||
|
105 | extend = tailGap != 0; | |
|
106 | enqueued = allocSize; | |
|
93 | enqueued = Math.Min(m_size - alloc, length); | |
|
94 | extend = length > enqueued; | |
|
107 | 95 | |
|
108 | // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 | |
|
109 | if (alloc == m_size) | |
|
96 | if (enqueued == 0) | |
|
110 | 97 | return false; |
|
111 | 98 | |
|
112 | Array.Copy(batch, offset, m_data, alloc, allocSize); | |
|
113 | 99 | |
|
114 | while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { | |
|
100 | Array.Copy(batch, offset, m_data, alloc, enqueued); | |
|
101 | ||
|
102 | while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { | |
|
115 | 103 | // spin wait for commit |
|
116 | 104 | } |
|
105 | ||
|
117 | 106 | return true; |
|
118 | 107 | } |
|
119 | 108 | |
@@ -169,7 +158,7 namespace Implab.Parallels { | |||
|
169 | 158 | if (extend || last == null) { |
|
170 | 159 | var chunk = new Chunk(m_chunkSize, value); |
|
171 | 160 | if (EnqueueChunk(last, chunk)) |
|
172 | break; | |
|
161 | break; // success! exit! | |
|
173 | 162 | last = m_last; |
|
174 | 163 | } else { |
|
175 | 164 | while (last == m_last) { |
@@ -326,7 +315,7 namespace Implab.Parallels { | |||
|
326 | 315 | /// Tries to dequeue all remaining data in the first chunk. |
|
327 | 316 | /// </summary> |
|
328 | 317 | /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> |
|
329 | /// <param name="buffer">The buffer to which data will be written.</param> | |
|
318 | /// <param name="buffer">The buffer to which the data will be written.</param> | |
|
330 | 319 | /// <param name="offset">The offset in the buffer at which the data will be written.</param> |
|
331 | 320 | /// <param name="length">Tha maximum amount of the data to be dequeued.</param> |
|
332 | 321 | /// <param name="dequeued">The actual amount of the dequeued data.</param> |
@@ -54,7 +54,7 namespace MonoPlay { | |||
|
54 | 54 | .Combine() |
|
55 | 55 | .Join(); |
|
56 | 56 | |
|
57 | Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2); | |
|
57 | Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); | |
|
58 | 58 | |
|
59 | 59 | var t2 = Environment.TickCount; |
|
60 | 60 | Console.WriteLine("MTQueue: {0} ms", t2 - t1); |
General Comments 0
You need to be logged in to leave comments.
Login now