| @@ -430,6 +430,82 namespace Implab.Test { | |||||
| 430 | } | 
             | 
        430 | } | |
| 431 | 
             | 
        431 | |||
| 432 | [TestMethod] | 
             | 
        432 | [TestMethod] | |
| 
             | 
        433 | public void AsyncQueueChunkDequeueTest() { | |||
| 
             | 
        434 | var queue = new AsyncQueue<int>(); | |||
| 
             | 
        435 | ||||
| 
             | 
        436 | const int wBatch = 31; | |||
| 
             | 
        437 | const int wCount = 200000; | |||
| 
             | 
        438 | const int total = wBatch * wCount * 3; | |||
| 
             | 
        439 | const int summ = wBatch * wCount * 6; | |||
| 
             | 
        440 | ||||
| 
             | 
        441 | int r1 = 0, r2 = 0; | |||
| 
             | 
        442 | const int rBatch = 1024; | |||
| 
             | 
        443 | int read = 0; | |||
| 
             | 
        444 | ||||
| 
             | 
        445 | var t1 = Environment.TickCount; | |||
| 
             | 
        446 | ||||
| 
             | 
        447 | AsyncPool.RunThread( | |||
| 
             | 
        448 | () => { | |||
| 
             | 
        449 | var buffer = new int[wBatch]; | |||
| 
             | 
        450 | for(int i = 0; i<wBatch; i++) | |||
| 
             | 
        451 | buffer[i] = 1; | |||
| 
             | 
        452 | ||||
| 
             | 
        453 | for(int i =0; i < wCount; i++) | |||
| 
             | 
        454 | queue.EnqueueRange(buffer,0,wBatch); | |||
| 
             | 
        455 | Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |||
| 
             | 
        456 | }, | |||
| 
             | 
        457 | () => { | |||
| 
             | 
        458 | var buffer = new int[wBatch]; | |||
| 
             | 
        459 | for(int i = 0; i<wBatch; i++) | |||
| 
             | 
        460 | buffer[i] = 2; | |||
| 
             | 
        461 | ||||
| 
             | 
        462 | for(int i =0; i < wCount; i++) | |||
| 
             | 
        463 | queue.EnqueueRange(buffer,0,wBatch); | |||
| 
             | 
        464 | Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |||
| 
             | 
        465 | }, | |||
| 
             | 
        466 | () => { | |||
| 
             | 
        467 | var buffer = new int[wBatch]; | |||
| 
             | 
        468 | for(int i = 0; i<wBatch; i++) | |||
| 
             | 
        469 | buffer[i] = 3; | |||
| 
             | 
        470 | ||||
| 
             | 
        471 | for(int i =0; i < wCount; i++) | |||
| 
             | 
        472 | queue.EnqueueRange(buffer,0,wBatch); | |||
| 
             | 
        473 | Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); | |||
| 
             | 
        474 | }, | |||
| 
             | 
        475 | () => { | |||
| 
             | 
        476 | var buffer = new int[rBatch]; | |||
| 
             | 
        477 | int count = 1; | |||
| 
             | 
        478 | double avgchunk = 0; | |||
| 
             | 
        479 | while(read < total) { | |||
| 
             | 
        480 | int actual; | |||
| 
             | 
        481 | if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) { | |||
| 
             | 
        482 | for(int i=0; i< actual; i++) | |||
| 
             | 
        483 | r2 += buffer[i]; | |||
| 
             | 
        484 | Interlocked.Add(ref read, actual); | |||
| 
             | 
        485 | avgchunk = avgchunk*(count-1)/count + actual/(double)count; | |||
| 
             | 
        486 | count ++; | |||
| 
             | 
        487 | } | |||
| 
             | 
        488 | } | |||
| 
             | 
        489 | ||||
| 
             | 
        490 | Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); | |||
| 
             | 
        491 | } | |||
| 
             | 
        492 | ) | |||
| 
             | 
        493 | .Combine() | |||
| 
             | 
        494 | .Join(); | |||
| 
             | 
        495 | ||||
| 
             | 
        496 | Assert.AreEqual(summ , r1 + r2); | |||
| 
             | 
        497 | ||||
| 
             | 
        498 | Console.WriteLine( | |||
| 
             | 
        499 | "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | |||
| 
             | 
        500 | Environment.TickCount - t1, | |||
| 
             | 
        501 | r1, | |||
| 
             | 
        502 | r2, | |||
| 
             | 
        503 | r1 + r2, | |||
| 
             | 
        504 | total | |||
| 
             | 
        505 | ); | |||
| 
             | 
        506 | } | |||
| 
             | 
        507 | ||||
| 
             | 
        508 | [TestMethod] | |||
| 433 | public void ParallelMapTest() { | 
             | 
        509 | public void ParallelMapTest() { | |
| 434 | 
             | 
        510 | |||
| 435 | const int count = 100000; | 
             | 
        511 | const int count = 100000; | |
| @@ -78,42 +78,31 namespace Implab.Parallels { | |||||
| 78 | } | 
             | 
        78 | } | |
| 79 | 
             | 
        79 | |||
| 80 | public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { | 
             | 
        80 | public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { | |
| 81 | int alloc; | 
             | 
        81 | //int alloc; | |
| 82 | int allocSize; | 
             | 
        82 | //int allocSize; | |
| 83 | 
             | 
        ||||
| 84 | // in case the batch size is larger than a free space in chunk | 
             | 
        |||
| 85 | // tailGap is used to over allocate the space in the chunk to | 
             | 
        |||
| 86 | // get exclusive permission on creation of the next one. | 
             | 
        |||
| 87 | int tailGap = 0; | 
             | 
        |||
| 88 | 
             | 
        ||||
| 89 | do { | 
             | 
        |||
| 90 | alloc = m_alloc; | 
             | 
        |||
| 91 | 
             | 
        83 | |||
| 92 | if (alloc > m_size) { | 
             | 
        84 | var alloc = Interlocked.Add(ref m_alloc, length) - length; | |
| 93 | // the chunk is full and someone already | 
             | 
        85 | if (alloc > m_size) { | |
| 94 | // creating the new one | 
             | 
        86 | // the chunk is full and someone already | |
| 95 | 
            
                                 | 
        
             | 
        87 | // creating the new one | |
| 96 | extend = false; // the caller shouldn't try to extend the queue | 
             | 
        88 | enqueued = 0; // nothing was added | |
| 97 | return false; // nothing was added | 
             | 
        89 | extend = false; // the caller shouldn't try to extend the queue | |
| 98 | } | 
             | 
        90 | return false; // nothing was added | |
| 
             | 
        91 | } | |||
| 99 | 
             | 
        92 | |||
| 100 | 
            
                             | 
        
             | 
        93 | enqueued = Math.Min(m_size - alloc, length); | |
| 101 | if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch | 
             | 
        94 | extend = length > enqueued; | |
| 102 | tailGap = 1; // overallocate space to get exclusive permission to extend queue | 
             | 
        |||
| 103 | } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); | 
             | 
        |||
| 104 | 
             | 
        ||||
| 105 | extend = tailGap != 0; | 
             | 
        |||
| 106 | enqueued = allocSize; | 
             | 
        |||
| 107 | 
             | 
        95 | |||
| 108 | // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 | 
             | 
        96 | if (enqueued == 0) | |
| 109 | if (alloc == m_size) | 
             | 
        |||
| 110 | return false; | 
             | 
        97 | return false; | |
| 111 | 
             | 
        98 | |||
| 112 | Array.Copy(batch, offset, m_data, alloc, allocSize); | 
             | 
        |||
| 113 | 
             | 
        99 | |||
| 114 | while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { | 
             | 
        100 | Array.Copy(batch, offset, m_data, alloc, enqueued); | |
| 
             | 
        101 | ||||
| 
             | 
        102 | while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { | |||
| 115 | // spin wait for commit | 
             | 
        103 | // spin wait for commit | |
| 116 | } | 
             | 
        104 | } | |
| 
             | 
        105 | ||||
| 117 | return true; | 
             | 
        106 | return true; | |
| 118 | } | 
             | 
        107 | } | |
| 119 | 
             | 
        108 | |||
| @@ -169,7 +158,7 namespace Implab.Parallels { | |||||
| 169 | if (extend || last == null) { | 
             | 
        158 | if (extend || last == null) { | |
| 170 | var chunk = new Chunk(m_chunkSize, value); | 
             | 
        159 | var chunk = new Chunk(m_chunkSize, value); | |
| 171 | if (EnqueueChunk(last, chunk)) | 
             | 
        160 | if (EnqueueChunk(last, chunk)) | |
| 172 | break; | 
             | 
        161 | break; // success! exit! | |
| 173 | last = m_last; | 
             | 
        162 | last = m_last; | |
| 174 | } else { | 
             | 
        163 | } else { | |
| 175 | while (last == m_last) { | 
             | 
        164 | while (last == m_last) { | |
| @@ -326,7 +315,7 namespace Implab.Parallels { | |||||
| 326 | /// Tries to dequeue all remaining data in the first chunk. | 
             | 
        315 | /// Tries to dequeue all remaining data in the first chunk. | |
| 327 | /// </summary> | 
             | 
        316 | /// </summary> | |
| 328 | /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> | 
             | 
        317 | /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> | |
| 329 | /// <param name="buffer">The buffer to which data will be written.</param> | 
             | 
        318 | /// <param name="buffer">The buffer to which the data will be written.</param> | |
| 330 | /// <param name="offset">The offset in the buffer at which the data will be written.</param> | 
             | 
        319 | /// <param name="offset">The offset in the buffer at which the data will be written.</param> | |
| 331 | /// <param name="length">Tha maximum amount of the data to be dequeued.</param> | 
             | 
        320 | /// <param name="length">Tha maximum amount of the data to be dequeued.</param> | |
| 332 | /// <param name="dequeued">The actual amount of the dequeued data.</param> | 
             | 
        321 | /// <param name="dequeued">The actual amount of the dequeued data.</param> | |
| @@ -54,7 +54,7 namespace MonoPlay { | |||||
| 54 | .Combine() | 
             | 
        54 | .Combine() | |
| 55 | .Join(); | 
             | 
        55 | .Join(); | |
| 56 | 
             | 
        56 | |||
| 57 | Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2); | 
             | 
        57 | Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); | |
| 58 | 
             | 
        58 | |||
| 59 | var t2 = Environment.TickCount; | 
             | 
        59 | var t2 = Environment.TickCount; | |
| 60 | Console.WriteLine("MTQueue: {0} ms", t2 - t1); | 
             | 
        60 | Console.WriteLine("MTQueue: {0} ms", t2 - t1); | |
        
        General Comments 0
    
    
  
  
                      You need to be logged in to leave comments.
                      Login now
                    
                