diff --git a/Implab/Parallels/AsyncPool.cs b/Implab/Parallels/AsyncPool.cs --- a/Implab/Parallels/AsyncPool.cs +++ b/Implab/Parallels/AsyncPool.cs @@ -1,6 +1,7 @@ using Implab.Diagnostics; using System; using System.Threading; +using System.Linq; namespace Implab.Parallels { /// @@ -73,5 +74,13 @@ namespace Implab.Parallels { return p; } + + public static IPromise[] ThreadRun(params Action[] func) { + return func.Select(f => InvokeNewThread(f)).ToArray(); + } + + public static IPromise[] ThreadRun(params Func[] func) { + return func.Select(f => InvokeNewThread(f)).ToArray(); + } } } diff --git a/Implab/Parallels/AsyncQueue.cs b/Implab/Parallels/AsyncQueue.cs --- a/Implab/Parallels/AsyncQueue.cs +++ b/Implab/Parallels/AsyncQueue.cs @@ -36,19 +36,14 @@ namespace Implab.Parallels { } public bool TryEnqueue(T value,out bool extend) { - extend = false; - int alloc; - do { - alloc = m_alloc; - if (alloc > m_size) - return false; - } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc)); + var alloc = Interlocked.Increment(ref m_alloc) - 1; - if (alloc == m_size) { - extend = true; + if (alloc >= m_size) { + extend = alloc == m_size; return false; } - + + extend = false; m_data[alloc] = value; while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { @@ -74,6 +69,38 @@ namespace Implab.Parallels { return true; } + public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { + int alloc; + int allocSize; + + do { + alloc = m_alloc; + + if (alloc > m_size) { + enqueued = 0; + extend = false; + return false; + } + + allocSize = Math.Min(m_size - m_alloc, length); + } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc)); + + if (alloc == m_size) { + enqueued = 0; + extend = true; + return false; + } + + Array.Copy(batch, offset, m_data, alloc, allocSize); + enqueued = allocSize; + extend = false; + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { + // spin wait for commit + } + return true; + } + public T GetAt(int pos) { return m_data[pos]; } diff --git a/MonoPlay/Program.cs b/MonoPlay/Program.cs --- a/MonoPlay/Program.cs +++ b/MonoPlay/Program.cs @@ -19,30 +19,30 @@ namespace MonoPlay { var t1 = Environment.TickCount; - new [] { - AsyncPool.InvokeNewThread(() => { + AsyncPool.ThreadRun( + () => { for (var i = 0; i < count; i++) q1.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { for (var i = 0; i < count; i++) q1.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { int temp = 0; int i = 0; while (i < count) if (q1.TryDequeue(out temp)) i++; - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { int temp = 0; int i = 0; while (i < count) if (q1.TryDequeue(out temp)) i++; - }) - } + } + ) .Combine() .Join(); @@ -65,18 +65,18 @@ namespace MonoPlay { t1 = Environment.TickCount; - new [] { - AsyncPool.InvokeNewThread(() => { + AsyncPool.ThreadRun( + () => { for (var i = 0; i < count; i++) lock (q2) q2.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { for (var i = 0; i < count; i++) lock (q2) q2.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { for (int i = 0; i < count ;) lock (q2) { if (q2.Count == 0) @@ -85,8 +85,8 @@ namespace MonoPlay { i++; } - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { for (int i = 0; i < count ;) lock (q2) { if (q2.Count == 0) @@ -95,8 +95,8 @@ namespace MonoPlay { i++; } - }) - } + } + ) .Combine() .Join();