@@ -430,6 +430,82 namespace Implab.Test { | |||||
430 | } |
|
430 | } | |
431 |
|
431 | |||
432 | [TestMethod] |
|
432 | [TestMethod] | |
|
433 | public void AsyncQueueChunkDequeueTest() { | |||
|
434 | var queue = new AsyncQueue<int>(); | |||
|
435 | ||||
|
436 | const int wBatch = 31; | |||
|
437 | const int wCount = 200000; | |||
|
438 | const int total = wBatch * wCount * 3; | |||
|
439 | const int summ = wBatch * wCount * 6; | |||
|
440 | ||||
|
441 | int r1 = 0, r2 = 0; | |||
|
442 | const int rBatch = 1024; | |||
|
443 | int read = 0; | |||
|
444 | ||||
|
445 | var t1 = Environment.TickCount; | |||
|
446 | ||||
|
447 | AsyncPool.RunThread( | |||
|
448 | () => { | |||
|
449 | var buffer = new int[wBatch]; | |||
|
450 | for(int i = 0; i<wBatch; i++) | |||
|
451 | buffer[i] = 1; | |||
|
452 | ||||
|
453 | for(int i =0; i < wCount; i++) | |||
|
454 | queue.EnqueueRange(buffer,0,wBatch); | |||
|
455 | Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |||
|
456 | }, | |||
|
457 | () => { | |||
|
458 | var buffer = new int[wBatch]; | |||
|
459 | for(int i = 0; i<wBatch; i++) | |||
|
460 | buffer[i] = 2; | |||
|
461 | ||||
|
462 | for(int i =0; i < wCount; i++) | |||
|
463 | queue.EnqueueRange(buffer,0,wBatch); | |||
|
464 | Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |||
|
465 | }, | |||
|
466 | () => { | |||
|
467 | var buffer = new int[wBatch]; | |||
|
468 | for(int i = 0; i<wBatch; i++) | |||
|
469 | buffer[i] = 3; | |||
|
470 | ||||
|
471 | for(int i =0; i < wCount; i++) | |||
|
472 | queue.EnqueueRange(buffer,0,wBatch); | |||
|
473 | Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); | |||
|
474 | }, | |||
|
475 | () => { | |||
|
476 | var buffer = new int[rBatch]; | |||
|
477 | int count = 1; | |||
|
478 | double avgchunk = 0; | |||
|
479 | while(read < total) { | |||
|
480 | int actual; | |||
|
481 | if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) { | |||
|
482 | for(int i=0; i< actual; i++) | |||
|
483 | r2 += buffer[i]; | |||
|
484 | Interlocked.Add(ref read, actual); | |||
|
485 | avgchunk = avgchunk*(count-1)/count + actual/(double)count; | |||
|
486 | count ++; | |||
|
487 | } | |||
|
488 | } | |||
|
489 | ||||
|
490 | Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); | |||
|
491 | } | |||
|
492 | ) | |||
|
493 | .Combine() | |||
|
494 | .Join(); | |||
|
495 | ||||
|
496 | Assert.AreEqual(summ , r1 + r2); | |||
|
497 | ||||
|
498 | Console.WriteLine( | |||
|
499 | "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | |||
|
500 | Environment.TickCount - t1, | |||
|
501 | r1, | |||
|
502 | r2, | |||
|
503 | r1 + r2, | |||
|
504 | total | |||
|
505 | ); | |||
|
506 | } | |||
|
507 | ||||
|
508 | [TestMethod] | |||
433 | public void ParallelMapTest() { |
|
509 | public void ParallelMapTest() { | |
434 |
|
510 | |||
435 | const int count = 100000; |
|
511 | const int count = 100000; |
@@ -78,42 +78,31 namespace Implab.Parallels { | |||||
78 | } |
|
78 | } | |
79 |
|
79 | |||
80 | public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { |
|
80 | public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { | |
81 | int alloc; |
|
81 | //int alloc; | |
82 | int allocSize; |
|
82 | //int allocSize; | |
83 |
|
||||
84 | // in case the batch size is larger than a free space in chunk |
|
|||
85 | // tailGap is used to over allocate the space in the chunk to |
|
|||
86 | // get exclusive permission on creation of the next one. |
|
|||
87 | int tailGap = 0; |
|
|||
88 |
|
||||
89 | do { |
|
|||
90 | alloc = m_alloc; |
|
|||
91 |
|
83 | |||
92 | if (alloc > m_size) { |
|
84 | var alloc = Interlocked.Add(ref m_alloc, length) - length; | |
93 | // the chunk is full and someone already |
|
85 | if (alloc > m_size) { | |
94 | // creating the new one |
|
86 | // the chunk is full and someone already | |
95 |
|
|
87 | // creating the new one | |
96 | extend = false; // the caller shouldn't try to extend the queue |
|
88 | enqueued = 0; // nothing was added | |
97 | return false; // nothing was added |
|
89 | extend = false; // the caller shouldn't try to extend the queue | |
98 | } |
|
90 | return false; // nothing was added | |
|
91 | } | |||
99 |
|
92 | |||
100 |
|
|
93 | enqueued = Math.Min(m_size - alloc, length); | |
101 | if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch |
|
94 | extend = length > enqueued; | |
102 | tailGap = 1; // overallocate space to get exclusive permission to extend queue |
|
|||
103 | } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); |
|
|||
104 |
|
||||
105 | extend = tailGap != 0; |
|
|||
106 | enqueued = allocSize; |
|
|||
107 |
|
95 | |||
108 | // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 |
|
96 | if (enqueued == 0) | |
109 | if (alloc == m_size) |
|
|||
110 | return false; |
|
97 | return false; | |
111 |
|
98 | |||
112 | Array.Copy(batch, offset, m_data, alloc, allocSize); |
|
|||
113 |
|
99 | |||
114 | while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { |
|
100 | Array.Copy(batch, offset, m_data, alloc, enqueued); | |
|
101 | ||||
|
102 | while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { | |||
115 | // spin wait for commit |
|
103 | // spin wait for commit | |
116 | } |
|
104 | } | |
|
105 | ||||
117 | return true; |
|
106 | return true; | |
118 | } |
|
107 | } | |
119 |
|
108 | |||
@@ -169,7 +158,7 namespace Implab.Parallels { | |||||
169 | if (extend || last == null) { |
|
158 | if (extend || last == null) { | |
170 | var chunk = new Chunk(m_chunkSize, value); |
|
159 | var chunk = new Chunk(m_chunkSize, value); | |
171 | if (EnqueueChunk(last, chunk)) |
|
160 | if (EnqueueChunk(last, chunk)) | |
172 | break; |
|
161 | break; // success! exit! | |
173 | last = m_last; |
|
162 | last = m_last; | |
174 | } else { |
|
163 | } else { | |
175 | while (last == m_last) { |
|
164 | while (last == m_last) { | |
@@ -326,7 +315,7 namespace Implab.Parallels { | |||||
326 | /// Tries to dequeue all remaining data in the first chunk. |
|
315 | /// Tries to dequeue all remaining data in the first chunk. | |
327 | /// </summary> |
|
316 | /// </summary> | |
328 | /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> |
|
317 | /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> | |
329 | /// <param name="buffer">The buffer to which data will be written.</param> |
|
318 | /// <param name="buffer">The buffer to which the data will be written.</param> | |
330 | /// <param name="offset">The offset in the buffer at which the data will be written.</param> |
|
319 | /// <param name="offset">The offset in the buffer at which the data will be written.</param> | |
331 | /// <param name="length">Tha maximum amount of the data to be dequeued.</param> |
|
320 | /// <param name="length">Tha maximum amount of the data to be dequeued.</param> | |
332 | /// <param name="dequeued">The actual amount of the dequeued data.</param> |
|
321 | /// <param name="dequeued">The actual amount of the dequeued data.</param> |
@@ -54,7 +54,7 namespace MonoPlay { | |||||
54 | .Combine() |
|
54 | .Combine() | |
55 | .Join(); |
|
55 | .Join(); | |
56 |
|
56 | |||
57 | Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2); |
|
57 | Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); | |
58 |
|
58 | |||
59 | var t2 = Environment.TickCount; |
|
59 | var t2 = Environment.TickCount; | |
60 | Console.WriteLine("MTQueue: {0} ms", t2 - t1); |
|
60 | Console.WriteLine("MTQueue: {0} ms", t2 - t1); |
General Comments 0
You need to be logged in to leave comments.
Login now