diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -430,6 +430,82 @@ namespace Implab.Test { } [TestMethod] + public void AsyncQueueChunkDequeueTest() { + var queue = new AsyncQueue(); + + const int wBatch = 31; + const int wCount = 200000; + const int total = wBatch * wCount * 3; + const int summ = wBatch * wCount * 6; + + int r1 = 0, r2 = 0; + const int rBatch = 1024; + int read = 0; + + var t1 = Environment.TickCount; + + AsyncPool.RunThread( + () => { + var buffer = new int[wBatch]; + for(int i = 0; i { + var buffer = new int[wBatch]; + for(int i = 0; i { + var buffer = new int[wBatch]; + for(int i = 0; i { + var buffer = new int[rBatch]; + int count = 1; + double avgchunk = 0; + while(read < total) { + int actual; + if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) { + for(int i=0; i< actual; i++) + r2 += buffer[i]; + Interlocked.Add(ref read, actual); + avgchunk = avgchunk*(count-1)/count + actual/(double)count; + count ++; + } + } + + Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); + } + ) + .Combine() + .Join(); + + Assert.AreEqual(summ , r1 + r2); + + Console.WriteLine( + "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", + Environment.TickCount - t1, + r1, + r2, + r1 + r2, + total + ); + } + + [TestMethod] public void ParallelMapTest() { const int count = 100000; diff --git a/Implab/Parallels/AsyncQueue.cs b/Implab/Parallels/AsyncQueue.cs --- a/Implab/Parallels/AsyncQueue.cs +++ b/Implab/Parallels/AsyncQueue.cs @@ -78,42 +78,31 @@ namespace Implab.Parallels { } public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { - int alloc; - int allocSize; - - // in case the batch size is larger than a free space in chunk - // tailGap is used to over allocate the space in the chunk to - // get exclusive permission on creation of the next one. - int tailGap = 0; - - do { - alloc = m_alloc; + //int alloc; + //int allocSize; - if (alloc > m_size) { - // the chunk is full and someone already - // creating the new one - enqueued = 0; // nothing was added - extend = false; // the caller shouldn't try to extend the queue - return false; // nothing was added - } + var alloc = Interlocked.Add(ref m_alloc, length) - length; + if (alloc > m_size) { + // the chunk is full and someone already + // creating the new one + enqueued = 0; // nothing was added + extend = false; // the caller shouldn't try to extend the queue + return false; // nothing was added + } - allocSize = Math.Min(m_size - alloc, length); - if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch - tailGap = 1; // overallocate space to get exclusive permission to extend queue - } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); - - extend = tailGap != 0; - enqueued = allocSize; + enqueued = Math.Min(m_size - alloc, length); + extend = length > enqueued; - // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 - if (alloc == m_size) + if (enqueued == 0) return false; - Array.Copy(batch, offset, m_data, alloc, allocSize); - while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { + Array.Copy(batch, offset, m_data, alloc, enqueued); + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { // spin wait for commit } + return true; } @@ -169,7 +158,7 @@ namespace Implab.Parallels { if (extend || last == null) { var chunk = new Chunk(m_chunkSize, value); if (EnqueueChunk(last, chunk)) - break; + break; // success! exit! last = m_last; } else { while (last == m_last) { @@ -326,7 +315,7 @@ namespace Implab.Parallels { /// Tries to dequeue all remaining data in the first chunk. /// /// true, if data was dequeued, false otherwise. - /// The buffer to which data will be written. + /// The buffer to which the data will be written. /// The offset in the buffer at which the data will be written. /// Tha maximum amount of the data to be dequeued. /// The actual amount of the dequeued data. diff --git a/MonoPlay/Program.cs b/MonoPlay/Program.cs --- a/MonoPlay/Program.cs +++ b/MonoPlay/Program.cs @@ -54,7 +54,7 @@ namespace MonoPlay { .Combine() .Join(); - Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2); + Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); var t2 = Environment.TickCount; Console.WriteLine("MTQueue: {0} ms", t2 - t1);