##// END OF EJS Templates
minor fixes and improvements of AsyncQueue, additional tests
cin -
r122:0c8685c8b56b v2
parent child
Show More
@@ -430,6 +430,82 namespace Implab.Test {
430 430 }
431 431
432 432 [TestMethod]
433 public void AsyncQueueChunkDequeueTest() {
434 var queue = new AsyncQueue<int>();
435
436 const int wBatch = 31;
437 const int wCount = 200000;
438 const int total = wBatch * wCount * 3;
439 const int summ = wBatch * wCount * 6;
440
441 int r1 = 0, r2 = 0;
442 const int rBatch = 1024;
443 int read = 0;
444
445 var t1 = Environment.TickCount;
446
447 AsyncPool.RunThread(
448 () => {
449 var buffer = new int[wBatch];
450 for(int i = 0; i<wBatch; i++)
451 buffer[i] = 1;
452
453 for(int i =0; i < wCount; i++)
454 queue.EnqueueRange(buffer,0,wBatch);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 },
457 () => {
458 var buffer = new int[wBatch];
459 for(int i = 0; i<wBatch; i++)
460 buffer[i] = 2;
461
462 for(int i =0; i < wCount; i++)
463 queue.EnqueueRange(buffer,0,wBatch);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 },
466 () => {
467 var buffer = new int[wBatch];
468 for(int i = 0; i<wBatch; i++)
469 buffer[i] = 3;
470
471 for(int i =0; i < wCount; i++)
472 queue.EnqueueRange(buffer,0,wBatch);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 },
475 () => {
476 var buffer = new int[rBatch];
477 int count = 1;
478 double avgchunk = 0;
479 while(read < total) {
480 int actual;
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 for(int i=0; i< actual; i++)
483 r2 += buffer[i];
484 Interlocked.Add(ref read, actual);
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 count ++;
487 }
488 }
489
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 }
492 )
493 .Combine()
494 .Join();
495
496 Assert.AreEqual(summ , r1 + r2);
497
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
501 r1,
502 r2,
503 r1 + r2,
504 total
505 );
506 }
507
508 [TestMethod]
433 509 public void ParallelMapTest() {
434 510
435 511 const int count = 100000;
@@ -78,17 +78,10 namespace Implab.Parallels {
78 78 }
79 79
80 80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
81 int alloc;
82 int allocSize;
81 //int alloc;
82 //int allocSize;
83 83
84 // in case the batch size is larger than a free space in chunk
85 // tailGap is used to over allocate the space in the chunk to
86 // get exclusive permission on creation of the next one.
87 int tailGap = 0;
88
89 do {
90 alloc = m_alloc;
91
84 var alloc = Interlocked.Add(ref m_alloc, length) - length;
92 85 if (alloc > m_size) {
93 86 // the chunk is full and someone already
94 87 // creating the new one
@@ -97,23 +90,19 namespace Implab.Parallels {
97 90 return false; // nothing was added
98 91 }
99 92
100 allocSize = Math.Min(m_size - alloc, length);
101 if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
102 tailGap = 1; // overallocate space to get exclusive permission to extend queue
103 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
93 enqueued = Math.Min(m_size - alloc, length);
94 extend = length > enqueued;
104 95
105 extend = tailGap != 0;
106 enqueued = allocSize;
107
108 // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
109 if (alloc == m_size)
96 if (enqueued == 0)
110 97 return false;
111 98
112 Array.Copy(batch, offset, m_data, alloc, allocSize);
113 99
114 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
100 Array.Copy(batch, offset, m_data, alloc, enqueued);
101
102 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
115 103 // spin wait for commit
116 104 }
105
117 106 return true;
118 107 }
119 108
@@ -169,7 +158,7 namespace Implab.Parallels {
169 158 if (extend || last == null) {
170 159 var chunk = new Chunk(m_chunkSize, value);
171 160 if (EnqueueChunk(last, chunk))
172 break;
161 break; // success! exit!
173 162 last = m_last;
174 163 } else {
175 164 while (last == m_last) {
@@ -326,7 +315,7 namespace Implab.Parallels {
326 315 /// Tries to dequeue all remaining data in the first chunk.
327 316 /// </summary>
328 317 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 /// <param name="buffer">The buffer to which data will be written.</param>
318 /// <param name="buffer">The buffer to which the data will be written.</param>
330 319 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 320 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
332 321 /// <param name="dequeued">The actual amount of the dequeued data.</param>
@@ -54,7 +54,7 namespace MonoPlay {
54 54 .Combine()
55 55 .Join();
56 56
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2);
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
58 58
59 59 var t2 = Environment.TickCount;
60 60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
General Comments 0
You need to be logged in to leave comments. Login now