# HG changeset patch # User cin # Date 2015-01-12 15:19:41 # Node ID 62d2f1e98c4e6f4c45f33227b1f245c8f9650b15 # Parent f1b8979992609f8d18320baae2a8e83ff223357b working version of AsyncQueue and batch operations tests diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -299,47 +299,134 @@ namespace Implab.Test { Assert.AreEqual(i, res); } - int writers = 0; - int readers = 0; - var stop = new ManualResetEvent(false); - int total = 0; + const int count = 10000000; - const int itemsPerWriter = 10000; - const int writersCount = 10; + int res1 = 0, res2 = 0; + var t1 = Environment.TickCount; - for (int i = 0; i < writersCount; i++) { - Interlocked.Increment(ref writers); - AsyncPool - .InvokeNewThread(() => { - for (int ii = 0; ii < itemsPerWriter; ii++) { - queue.Enqueue(1); + AsyncPool.RunThread( + () => { + for (var i = 0; i < count; i++) + queue.Enqueue(1); + Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); + }, + () => { + for (var i = 0; i < count; i++) + queue.Enqueue(2); + Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); + }, + () => { + int temp; + int i = 0; + while (i < count) + if (queue.TryDequeue(out temp)) { + i++; + res1 += temp; } - return 1; - }) - .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); - } + Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); + }, + () => { + int temp; + int i = 0; + while (i < count) + if (queue.TryDequeue(out temp)) { + i++; + res2 += temp; + } + Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); + } + ) + .Combine() + .Join(); + + Assert.AreEqual(count * 3, res1 + res2); + + Console.WriteLine( + "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", + Environment.TickCount - t1, + res1, + res2, + res1 + res2, + count + ); + } + + [TestMethod] + public void AsyncQueueBatchTest() { + var queue = new AsyncQueue(); + + const 
int wBatch = 29; + const int wCount = 400000; + const int total = wBatch * wCount * 2; + const int summ = wBatch * wCount * 3; - for (int i = 0; i < 10; i++) { - Interlocked.Increment(ref readers); - AsyncPool - .InvokeNewThread(() => { - int t; - do { - while (queue.TryDequeue(out t)) - Interlocked.Add(ref total, t); - } while (writers > 0); - return 1; - }) - .On(() => { - Interlocked.Decrement(ref readers); - if (readers == 0) - stop.Set(); - }, PromiseEventType.All); - } + int r1 = 0, r2 = 0; + const int rBatch = 111; + int read = 0; + + var t1 = Environment.TickCount; + + AsyncPool.RunThread( + () => { + var buffer = new int[wBatch]; + for(int i = 0; i { + var buffer = new int[wBatch]; + for(int i = 0; i { + var buffer = new int[rBatch]; - stop.WaitOne(); + while(read < total) { + int actual; + if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { + for(int i=0; i< actual; i++) + r1 += buffer[i]; + Interlocked.Add(ref read, actual); + } + } + + Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[rBatch]; - Assert.AreEqual(itemsPerWriter * writersCount, total); + while(read < total) { + int actual; + if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { + for(int i=0; i< actual; i++) + r2 += buffer[i]; + Interlocked.Add(ref read, actual); + } + } + + Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); + } + ) + .Combine() + .Join(); + + Assert.AreEqual(summ , r1 + r2); + + Console.WriteLine( + "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", + Environment.TickCount - t1, + r1, + r2, + r1 + r2, + total + ); } [TestMethod] diff --git a/Implab/Parallels/AsyncPool.cs b/Implab/Parallels/AsyncPool.cs --- a/Implab/Parallels/AsyncPool.cs +++ b/Implab/Parallels/AsyncPool.cs @@ -75,11 +75,11 @@ namespace Implab.Parallels { return p; } - public static IPromise[] ThreadRun(params Action[] func) { + public static IPromise[] RunThread(params Action[] func) { return 
func.Select(f => InvokeNewThread(f)).ToArray(); } - public static IPromise[] ThreadRun(params Func[] func) { + public static IPromise[] RunThread(params Func[] func) { return func.Select(f => InvokeNewThread(f)).ToArray(); } } diff --git a/Implab/Parallels/AsyncQueue.cs b/Implab/Parallels/AsyncQueue.cs --- a/Implab/Parallels/AsyncQueue.cs +++ b/Implab/Parallels/AsyncQueue.cs @@ -27,6 +27,14 @@ namespace Implab.Parallels { m_data[0] = value; } + public Chunk(int size, T[] data, int offset, int length, int alloc) { + m_size = size; + m_hi = length; + m_alloc = alloc; + m_data = new T[size]; + Array.Copy(data, offset, m_data, 0, length); + } + public int Low { get { return m_low; } } @@ -35,7 +43,7 @@ namespace Implab.Parallels { get { return m_hi; } } - public bool TryEnqueue(T value,out bool extend) { + public bool TryEnqueue(T value, out bool extend) { var alloc = Interlocked.Increment(ref m_alloc) - 1; if (alloc >= m_size) { @@ -52,7 +60,7 @@ namespace Implab.Parallels { return true; } - public bool TryDequeue(out T value,out bool recycle) { + public bool TryDequeue(out T value, out bool recycle) { int low; do { low = m_low; @@ -73,27 +81,35 @@ namespace Implab.Parallels { int alloc; int allocSize; + // in case the batch size is larger than a free space in chunk + // tailGap is used to over allocate the space in the chunk to + // get exclusive permission on creation of the next one. 
+ int tailGap = 0; + do { alloc = m_alloc; if (alloc > m_size) { - enqueued = 0; - extend = false; - return false; + // the chunk is full and someone already + // creating the new one + enqueued = 0; // nothing was added + extend = false; // the caller shouldn't try to extend the queue + return false; // nothing was added } - allocSize = Math.Min(m_size - m_alloc, length); - } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc)); + allocSize = Math.Min(m_size - alloc, length); + if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch + tailGap = 1; // overallocate space to get exclusive permission to extend queue + } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); + + extend = tailGap != 0; + enqueued = allocSize; - if (alloc == m_size) { - enqueued = 0; - extend = true; + // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 + if (alloc == m_size) return false; - } Array.Copy(batch, offset, m_data, alloc, allocSize); - enqueued = allocSize; - extend = false; while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { // spin wait for commit @@ -101,12 +117,35 @@ namespace Implab.Parallels { return true; } + public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { + int low, hi, batchSize; + + do { + low = m_low; + hi = m_hi; + if (low >= hi) { + dequeued = 0; + recycle = (low == m_size); // recycling could be restarted and we need to signal again + return false; + } + batchSize = Math.Min(hi - low, length); + } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); + + recycle = (low == m_size - batchSize); + dequeued = batchSize; + + Array.Copy(m_data, low, buffer, offset, batchSize); + + return true; + } + public T GetAt(int pos) { return m_data[pos]; } } public const int DEFAULT_CHUNK_SIZE = 32; + public const int MAX_CHUNK_SIZE = 262144; readonly 
int m_chunkSize = DEFAULT_CHUNK_SIZE; @@ -117,11 +156,15 @@ namespace Implab.Parallels { m_last = m_first = new Chunk(m_chunkSize); } + /// + /// Adds the specified value to the queue. + /// + /// The value which will be added to the queue. public void Enqueue(T value) { var last = m_last; // spin wait to the new chunk bool extend = true; - while(last == null || !last.TryEnqueue(value, out extend)) { + while (last == null || !last.TryEnqueue(value, out extend)) { // try to extend queue if (extend || last == null) { var chunk = new Chunk(m_chunkSize, value); @@ -129,14 +172,88 @@ namespace Implab.Parallels { break; last = m_last; } else { - while (last != m_last) { + while (last == m_last) { Thread.MemoryBarrier(); - last = m_last; } + last = m_last; } } } + /// + /// Adds the specified data to the queue. + /// + /// The buffer which contains the data to be enqueued. + /// The offset of the data in the buffer. + /// The size of the data to read from the buffer. + public void EnqueueRange(T[] data, int offset, int length) { + if (data == null) + throw new ArgumentNullException("data"); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset"); + if (length < 1 || offset + length > data.Length) + throw new ArgumentOutOfRangeException("length"); + + var last = m_last; + + bool extend; + int enqueued; + + while (length > 0) { + extend = true; + if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) { + length -= enqueued; + offset += enqueued; + } + + if (extend) { + // there was not enough space in the chunk + // or there were no chunks in the queue + + while (length > 0) { + + var size = Math.Min(length, MAX_CHUNK_SIZE); + + var chunk = new Chunk( + Math.Max(size, m_chunkSize), + data, + offset, + size, + length // length >= size + ); + + if (!EnqueueChunk(last, chunk)) { + // looks like the queue has been updated then proceed from the beginning + last = m_last; + break; + } + + // we have successfully added the new chunk + 
last = chunk; + length -= size; + offset += size; + } + } else { + // we don't need to extend the queue, if we successfully enqueued data + if (length == 0) + break; + + // if we need to wait while someone is extending the queue + // spinwait + while (last == m_last) { + Thread.MemoryBarrier(); + } + + last = m_last; + } + } + } + + /// + /// Tries to retrieve the first element from the queue. + /// + /// true, if element is dequeued, false otherwise. + /// The value of the dequeued element. public bool TryDequeue(out T value) { var chunk = m_first; bool recycle; @@ -161,6 +278,92 @@ namespace Implab.Parallels { return false; } + /// + /// Tries to dequeue the specified amount of data from the queue. + /// + /// true, if data was dequeued, false otherwise. + /// The buffer to which the data will be written. + /// The offset in the buffer at which the data will be written. + /// The maximum amount of data to be retrieved. + /// The actual amount of the retrieved data. + public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) { + if (buffer == null) + throw new ArgumentNullException("buffer"); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset"); + if (length < 1 || offset + length > buffer.Length) + throw new ArgumentOutOfRangeException("length"); + + var chunk = m_first; + bool recycle; + dequeued = 0; + while (chunk != null) { + + int actual; + if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { + offset += actual; + length -= actual; + dequeued += actual; + } + + if (recycle) // this chunk is waste + RecycleFirstChunk(chunk); + else if (actual == 0) + break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue) + + if (length == 0) + return true; + + // we still may dequeue something + // try again + chunk = m_first; + } + + return dequeued != 0; + } + + /// + /// Tries to dequeue all remaining data in the first chunk. 
+ /// + /// true, if data was dequeued, false otherwise. + /// The buffer to which data will be written. + /// The offset in the buffer at which the data will be written. + /// The maximum amount of the data to be dequeued. + /// The actual amount of the dequeued data. + public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { + if (buffer == null) + throw new ArgumentNullException("buffer"); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset"); + if (length < 1 || offset + length > buffer.Length) + throw new ArgumentOutOfRangeException("length"); + + var chunk = m_first; + bool recycle; + dequeued = 0; + + while (chunk != null) { + + int actual; + if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { + dequeued = actual; + } + + if (recycle) // this chunk is waste + RecycleFirstChunk(chunk); + + // if we have dequeued any data, then return + if (dequeued != 0) + return true; + + // we still may dequeue something + // try again + chunk = m_first; + } + + return false; + } + bool EnqueueChunk(Chunk last, Chunk chunk) { if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) return false; diff --git a/Implab/Safe.cs b/Implab/Safe.cs --- a/Implab/Safe.cs +++ b/Implab/Safe.cs @@ -9,31 +9,31 @@ namespace Implab { public static class Safe { - public static void ArgumentMatch(string param, string name, Regex rx) { + public static void ArgumentMatch(string value, string paramName, Regex rx) { if (rx == null) throw new ArgumentNullException("rx"); - if (!rx.IsMatch(param)) - throw new ArgumentException(String.Format("The prameter value must match {0}", rx), name); + if (!rx.IsMatch(value)) + throw new ArgumentException(String.Format("The prameter value must match {0}", rx), paramName); } - public static void ArgumentNotEmpty(string param, string name) { - if (String.IsNullOrEmpty(param)) - throw new ArgumentException("The parameter can't be empty", name); + public static void ArgumentNotEmpty(string 
value, string paramName) { + if (String.IsNullOrEmpty(value)) + throw new ArgumentException("The parameter can't be empty", paramName); } - public static void ArgumentNotEmpty(T[] param, string name) { - if (param == null || param.Length == 0) - throw new ArgumentException("The array must be not emty", name); + public static void ArgumentNotEmpty(T[] value, string paramName) { + if (value == null || value.Length == 0) + throw new ArgumentException("The array must be not emty", paramName); } - public static void ArgumentNotNull(object param, string name) { - if (param == null) - throw new ArgumentNullException(name); + public static void ArgumentNotNull(object value, string paramName) { + if (value == null) + throw new ArgumentNullException(paramName); } - public static void ArgumentInRange(int arg, int min, int max, string name) { - if (arg < min || arg > max) - throw new ArgumentOutOfRangeException(name); + public static void ArgumentInRange(int value, int min, int max, string paramName) { + if (value < min || value > max) + throw new ArgumentOutOfRangeException(paramName); } public static void Dispose(T obj) where T : class diff --git a/MonoPlay/Program.cs b/MonoPlay/Program.cs --- a/MonoPlay/Program.cs +++ b/MonoPlay/Program.cs @@ -16,36 +16,46 @@ namespace MonoPlay { const int count = 10000000; - + int res1 = 0, res2 = 0; var t1 = Environment.TickCount; - AsyncPool.ThreadRun( + AsyncPool.RunThread( () => { for (var i = 0; i < count; i++) - q1.Enqueue(i); + q1.Enqueue(1); + Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); }, () => { for (var i = 0; i < count; i++) - q1.Enqueue(i); + q1.Enqueue(2); + Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); }, () => { int temp = 0; int i = 0; while (i < count) - if (q1.TryDequeue(out temp)) + if (q1.TryDequeue(out temp)) { i++; + res1 += temp; + } + Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); }, () => { int temp = 0; int i = 0; while (i < count) 
- if (q1.TryDequeue(out temp)) + if (q1.TryDequeue(out temp)) { i++; + res2 += temp; + } + Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); } ) .Combine() .Join(); + Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2); + var t2 = Environment.TickCount; Console.WriteLine("MTQueue: {0} ms", t2 - t1); @@ -65,7 +75,7 @@ namespace MonoPlay { t1 = Environment.TickCount; - AsyncPool.ThreadRun( + AsyncPool.RunThread( () => { for (var i = 0; i < count; i++) lock (q2)