# HG changeset patch # User cin # Date 2015-01-14 23:43:14 # Node ID a336cb13c6a9655bc8b68d0c4e0fb4639bdc571c # Parent f4d6ea6969cc21006dedc2227988670dd4115be2 major update, added Drain method to AsyncQueue class diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -249,7 +249,7 @@ namespace Implab.Test { for (int i = 0; i < writersCount; i++) { Interlocked.Increment(ref writers); AsyncPool - .InvokeNewThread(() => { + .RunThread(() => { for (int ii = 0; ii < itemsPerWriter; ii++) { queue.Enqueue(1); } @@ -261,7 +261,7 @@ namespace Implab.Test { for (int i = 0; i < 10; i++) { Interlocked.Increment(ref readers); AsyncPool - .InvokeNewThread(() => { + .RunThread(() => { int t; do { while (queue.TryDequeue(out t)) @@ -336,7 +336,7 @@ namespace Implab.Test { Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); } ) - .Combine() + .Bundle() .Join(); Assert.AreEqual(count * 3, res1 + res2); @@ -414,7 +414,7 @@ namespace Implab.Test { Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); } ) - .Combine() + .Bundle() .Join(); Assert.AreEqual(summ , r1 + r2); @@ -490,7 +490,110 @@ namespace Implab.Test { Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); } ) - .Combine() + .Bundle() + .Join(); + + Assert.AreEqual(summ , r1 + r2); + + Console.WriteLine( + "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", + Environment.TickCount - t1, + r1, + r2, + r1 + r2, + total + ); + } + + [TestMethod] + public void AsyncQueueDrainTest() { + var queue = new AsyncQueue(); + + const int wBatch = 11; + const int wCount = 200000; + const int total = wBatch * wCount * 3; + const int summ = wBatch * wCount * 3; + + int r1 = 0, r2 = 0; + const int rBatch = 11; + int read = 0; + + var t1 = Environment.TickCount; + + AsyncPool.RunThread( + () => { + var buffer = new int[wBatch]; + for(int i = 0; i { + for(int i 
=0; i < wCount * wBatch; i++) + queue.Enqueue(1); + Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[wBatch]; + for(int i = 0; i { + int temp; + int count = 0; + while (read < total) + if (queue.TryDequeue(out temp)) { + count++; + r1 += temp; + Interlocked.Increment(ref read); + } + Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count); + },*/ + /*() => { + var buffer = new int[rBatch]; + var count = 0; + while(read < total) { + int actual; + if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { + for(int i=0; i< actual; i++) + r1 += buffer[i]; + Interlocked.Add(ref read, actual); + count += actual; + } + } + + Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); + },*/ + () => { + var count = 0; + while(read < total) { + var buffer = queue.Drain(); + for(int i=0; i< buffer.Length; i++) + r1 += buffer[i]; + Interlocked.Add(ref read, buffer.Length); + count += buffer.Length; + } + Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); + }, + () => { + var count = 0; + while(read < total) { + var buffer = queue.Drain(); + for(int i=0; i< buffer.Length; i++) + r2 += buffer[i]; + Interlocked.Add(ref read, buffer.Length); + count += buffer.Length; + } + Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count); + } + ) + .Bundle() .Join(); Assert.AreEqual(summ , r1 + r2); diff --git a/Implab/Parallels/ArrayTraits.cs b/Implab/Parallels/ArrayTraits.cs --- a/Implab/Parallels/ArrayTraits.cs +++ b/Implab/Parallels/ArrayTraits.cs @@ -162,7 +162,7 @@ namespace Implab.Parallels { int slots = threads; // Analysis disable AccessToDisposedClosure - AsyncPool.InvokeNewThread(() => { + AsyncPool.RunThread(() => { for (int i = 0; i < source.Length; i++) { if(promise.IsResolved) break; // stop processing in case of error or cancellation diff --git a/Implab/Parallels/AsyncPool.cs 
b/Implab/Parallels/AsyncPool.cs --- a/Implab/Parallels/AsyncPool.cs +++ b/Implab/Parallels/AsyncPool.cs @@ -31,7 +31,7 @@ namespace Implab.Parallels { return p; } - public static IPromise InvokeNewThread(Func func) { + public static IPromise RunThread(Func func) { var p = new Promise(); var caller = TraceContext.Instance.CurrentOperation; @@ -53,7 +53,7 @@ namespace Implab.Parallels { } - public static IPromise InvokeNewThread(Action func) { + public static IPromise RunThread(Action func) { var p = new Promise(); var caller = TraceContext.Instance.CurrentOperation; @@ -76,11 +76,11 @@ namespace Implab.Parallels { } public static IPromise[] RunThread(params Action[] func) { - return func.Select(f => InvokeNewThread(f)).ToArray(); + return func.Select(f => RunThread(f)).ToArray(); } public static IPromise[] RunThread(params Func[] func) { - return func.Select(f => InvokeNewThread(f)).ToArray(); + return func.Select(f => RunThread(f)).ToArray(); } } } diff --git a/Implab/Parallels/AsyncQueue.cs b/Implab/Parallels/AsyncQueue.cs --- a/Implab/Parallels/AsyncQueue.cs +++ b/Implab/Parallels/AsyncQueue.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System; using System.Collections; +using System.Diagnostics; namespace Implab.Parallels { public class AsyncQueue : IEnumerable { @@ -60,6 +61,16 @@ namespace Implab.Parallels { return true; } + /// + /// Prevents from allocating new space in the chunk and waits for all write operations to complete + /// + public void Commit() { + var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); + + while (m_hi != actual) + Thread.MemoryBarrier(); + } + public bool TryDequeue(out T value, out bool recycle) { int low; do { @@ -359,76 +370,114 @@ namespace Implab.Parallels { if (last != null) last.next = chunk; - else + else { m_first = chunk; + } return true; } void RecycleFirstChunk(Chunk first) { var next = first.next; + if (first != Interlocked.CompareExchange(ref m_first, next, first)) + return; + if 
(next == null) { - // looks like this is the last chunk + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + /*while (first.next == null) + Thread.MemoryBarrier();*/ + // race - // maybe someone already recycled this chunk - // or a new chunk has been appedned to the queue - - return; // give up + // someone already updated the tail, restore the pointer to the queue head + m_first = first; } // the tail is updated } // we need to update the head - Interlocked.CompareExchange(ref m_first, next, first); + //Interlocked.CompareExchange(ref m_first, next, first); // if the head is already updated then give up - return; + //return; } public void Clear() { // start the new queue - var t = new Chunk(m_chunkSize); - Thread.MemoryBarrier(); - m_last = t; - Thread.MemoryBarrier(); + var chunk = new Chunk(m_chunkSize); + + do { + Thread.MemoryBarrier(); + var first = m_first; + var last = m_last; + + if (last == null) // nothing to clear + return; - // make the new queue available to the readers, and stop the old one - m_first = t; - Thread.MemoryBarrier(); + if (first == null || (first.next == null && first != last)) // inconsistency + continue; + + // here we will create inconsistency which will force others to spin + // and prevent from fetching. 
chunk.next = null + if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) + continue;// inconsistent + + m_last = chunk; + + return; + + } while(true); } public T[] Drain() { // start the new queue - var t = new Chunk(m_chunkSize); - Thread.MemoryBarrier(); - m_last = t; - Thread.MemoryBarrier(); - - // make the new queue available to the readers, and stop the old one - Chunk first; + var chunk = new Chunk(m_chunkSize); do { - first = m_first; - } while(first != Interlocked.CompareExchange(ref m_first - Thread.MemoryBarrier(); + Thread.MemoryBarrier(); + var first = m_first; + var last = m_last; + + if (last == null) + return new T[0]; + + if (first == null || (first.next == null && first != last)) + continue; + // here we will create inconsistency which will force others to spin + // and prevent from fetching. chunk.next = null + if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) + continue;// inconsistent + last = Interlocked.Exchange(ref m_last, chunk); + + return ReadChunks(first, last); + + } while(true); } - T[] ReadChunks(Chunk chunk) { + T[] ReadChunks(Chunk chunk, object last) { var result = new List(); var buffer = new T[m_chunkSize]; int actual; bool recycle; while (chunk != null) { + // ensure all write operations on the chunk are complete + chunk.Commit(); + // we need to read the chunk using this way // since some client still may completing the dequeue // operation, such clients most likely won't get results while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); - chunk = chunk.next; + if (chunk == last) { + chunk = null; + } else { + while (chunk.next == null) + Thread.MemoryBarrier(); + chunk = chunk.next; + } } return result.ToArray(); diff --git a/Implab/PromiseExtensions.cs b/Implab/PromiseExtensions.cs --- a/Implab/PromiseExtensions.cs +++ b/Implab/PromiseExtensions.cs @@ -94,12 +94,18 @@ namespace Implab { return that; } - 
public static IPromise Combine(this ICollection that) { + public static IPromise Bundle(this ICollection that) { Safe.ArgumentNotNull(that, "that"); int count = that.Count; + int errors = 0; var medium = new Promise(); + medium.On(() => { + foreach(var p2 in that) + p2.Cancel(); + }, PromiseEventType.ErrorOrCancel); + foreach (var p in that) p.On( () => { @@ -107,15 +113,62 @@ namespace Implab { medium.Resolve(); }, error => { - throw new Exception("The dependency promise is failed", error); + if (Interlocked.Increment(ref errors) == 1) + medium.Reject( + new Exception("The dependency promise is failed", error) + ); }, () => { - throw new OperationCanceledException("The dependency promise is cancelled"); + if (Interlocked.Increment(ref errors) == 1) + medium.Reject( + new Exception("The dependency promise is cancelled") + ); } ); return medium; } + + public static IPromise Bundle(this ICollection> that) { + Safe.ArgumentNotNull(that, "that"); + + int count = that.Count; + int errors = 0; + var medium = new Promise(); + var results = new T[that.Count]; + + medium.On(() => { + foreach(var p2 in that) + p2.Cancel(); + }, PromiseEventType.ErrorOrCancel); + + int i = 0; + foreach (var p in that) { + var idx = i; + p.On( + x => { + results[idx] = x; + if (Interlocked.Decrement(ref count) == 0) + medium.Resolve(results); + }, + error => { + if (Interlocked.Increment(ref errors) == 1) + medium.Reject( + new Exception("The dependency promise is failed", error) + ); + }, + () => { + if (Interlocked.Increment(ref errors) == 1) + medium.Reject( + new Exception("The dependency promise is cancelled") + ); + } + ); + i++; + } + + return medium; + } #if NET_4_5 diff --git a/MonoPlay/Program.cs b/MonoPlay/Program.cs --- a/MonoPlay/Program.cs +++ b/MonoPlay/Program.cs @@ -51,7 +51,7 @@ namespace MonoPlay { Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); } ) - .Combine() + .Bundle() .Join(); Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: 
{3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); @@ -107,7 +107,7 @@ namespace MonoPlay { } ) - .Combine() + .Bundle() .Join();