diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -633,7 +633,7 @@ namespace Implab.Test { [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(0,10,1)) { + using (var pool = new WorkerPool()) { const int count = 10000; var args = new double[count]; diff --git a/Implab/AbstractPromise.cs b/Implab/AbstractPromise.cs --- a/Implab/AbstractPromise.cs +++ b/Implab/AbstractPromise.cs @@ -12,10 +12,16 @@ namespace Implab { const int REJECTED_STATE = 3; const int CANCELLED_STATE = 4; + const int RESERVED_HANDLERS_COUNT = 4; + int m_state; Exception m_error; + int m_handlersCount; - readonly AsyncQueue m_handlers = new AsyncQueue(); + readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; + MTQueue m_extraHandlers; + int m_handlerPointer = -1; + int m_handlersCommited; #region state managment bool BeginTransit() { @@ -88,21 +94,58 @@ namespace Implab { protected abstract void SignalCancelled(THandler handler); void OnSuccess() { - THandler handler; - while (m_handlers.TryDequeue(out handler)) - SignalSuccess(handler); + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalSuccess(m_handlers[slot]); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalSuccess(handler); + } } void OnError() { - THandler handler; - while (m_handlers.TryDequeue(out handler)) - SignalError(handler,m_error); + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalError(m_handlers[slot],m_error); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalError(handler, m_error); + } } void OnCancelled() { - THandler handler; - while (m_handlers.TryDequeue(out handler)) - SignalCancelled(handler); + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalCancelled(m_handlers[slot]); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalCancelled(handler); + } } #endregion @@ -145,21 +188,48 @@ namespace Implab { protected void AddHandler(THandler handler) { - if (IsResolved) { - InvokeHandler(handler); - - } else { + if (m_state > 1) { // the promise is in the resolved state, just invoke the handler - m_handlers.Enqueue(handler); + InvokeHandler(handler); + } else { + var slot = Interlocked.Increment(ref m_handlersCount) - 1; + + if (slot < RESERVED_HANDLERS_COUNT) { + m_handlers[slot] = handler; + + while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { + } + if (m_state > 1) { + do { + var hp = m_handlerPointer; + slot = hp + 1; + if (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) + continue; + InvokeHandler(m_handlers[slot]); + } + break; + } while(true); + } + } else { + if (slot == RESERVED_HANDLERS_COUNT) { + m_extraHandlers = new MTQueue(); + } else { + while (m_extraHandlers == null) + Thread.MemoryBarrier(); + } - if (IsResolved && m_handlers.TryDequeue(out handler)) + m_extraHandlers.Enqueue(handler); + + if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) // if the promise have been resolved while we was adding the handler to the queue // we can't guarantee that someone is still processing it // therefore we need to fetch a handler from the queue and execute it // note that fetched handler may be not the one that we have added // even we can fetch no handlers at all :) InvokeHandler(handler); + } } } diff --git a/Implab/Parallels/AsyncQueue.cs b/Implab/Parallels/AsyncQueue.cs --- a/Implab/Parallels/AsyncQueue.cs +++ b/Implab/Parallels/AsyncQueue.cs @@ -147,15 +147,9 @@ namespace Implab.Parallels { public const int DEFAULT_CHUNK_SIZE = 32; public const int MAX_CHUNK_SIZE = 262144; - readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; - Chunk m_first; Chunk m_last; - public AsyncQueue() { - m_last = m_first = new Chunk(m_chunkSize); - } - /// /// Adds the specified value to the queue. /// @@ -167,7 +161,7 @@ namespace Implab.Parallels { while (last == null || !last.TryEnqueue(value, out extend)) { // try to extend queue if (extend || last == null) { - var chunk = new Chunk(m_chunkSize, value); + var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value); if (EnqueueChunk(last, chunk)) break; // success! exit! last = m_last; @@ -215,7 +209,7 @@ namespace Implab.Parallels { var size = Math.Min(length, MAX_CHUNK_SIZE); var chunk = new Chunk( - Math.Max(size, m_chunkSize), + Math.Max(size, DEFAULT_CHUNK_SIZE), data, offset, size, @@ -404,7 +398,7 @@ namespace Implab.Parallels { public void Clear() { // start the new queue - var chunk = new Chunk(m_chunkSize); + var chunk = new Chunk(DEFAULT_CHUNK_SIZE); do { Thread.MemoryBarrier(); @@ -431,7 +425,7 @@ namespace Implab.Parallels { public T[] Drain() { // start the new queue - var chunk = new Chunk(m_chunkSize); + var chunk = new Chunk(DEFAULT_CHUNK_SIZE); do { Thread.MemoryBarrier(); @@ -458,7 +452,7 @@ namespace Implab.Parallels { T[] ReadChunks(Chunk chunk, object last) { var result = new List(); - var buffer = new T[m_chunkSize]; + var buffer = new T[DEFAULT_CHUNK_SIZE]; int actual; bool recycle; while (chunk != null) { diff --git a/Implab/Parallels/DispatchPool.cs b/Implab/Parallels/DispatchPool.cs --- a/Implab/Parallels/DispatchPool.cs +++ b/Implab/Parallels/DispatchPool.cs @@ -30,11 +30,9 @@ namespace Implab.Parallels { } protected DispatchPool() { - int maxThreads, maxCP; - ThreadPool.GetMaxThreads(out maxThreads, out maxCP); m_minThreadsLimit = 0; - m_maxThreadsLimit = maxThreads; + m_maxThreadsLimit = Environment.ProcessorCount; } protected void InitPool() { diff --git a/MonoPlay/Program.cs b/MonoPlay/Program.cs --- a/MonoPlay/Program.cs +++ b/MonoPlay/Program.cs @@ -11,109 +11,23 @@ namespace MonoPlay { if (args == null) throw new ArgumentNullException("args"); - var q1 = new AsyncQueue(); - var q2 = new Queue(); - const int count = 10000000; - int res1 = 0, res2 = 0; var t1 = Environment.TickCount; - AsyncPool.RunThread( - () => { - for (var i = 0; i < count; i++) - q1.Enqueue(1); - Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); - }, - () => { - for (var i = 0; i < count; i++) - q1.Enqueue(2); - Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); - }, - () => { - int temp = 0; - int i = 0; - while (i < count) - if (q1.TryDequeue(out temp)) { - i++; - res1 += temp; - } - Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); - }, - () => { - int temp = 0; - int i = 0; - while (i < count) - if (q1.TryDequeue(out temp)) { - i++; - res2 += temp; - } - Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); - } - ) - .Bundle() - .Join(); + for (int i = 0; i < count; i++) { + var p = new Promise(); + + p.On(x => {}).On(x => {}); - Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); + p.Resolve(i); + + } + + var t2 = Environment.TickCount; - Console.WriteLine("MTQueue: {0} ms", t2 - t1); - - t1 = Environment.TickCount; - - for (var i = 0; i < count * 2; i++) - q2.Enqueue(i); - - for (var i = 0; i < count * 2; i++) - q2.Dequeue(); - - t2 = Environment.TickCount; - Console.WriteLine("Queue: {0} ms", t2 - t1); - - q2 = new Queue(); - - t1 = Environment.TickCount; - - - AsyncPool.RunThread( - () => { - for (var i = 0; i < count; i++) - lock (q2) - q2.Enqueue(i); - }, - () => { - for (var i = 0; i < count; i++) - lock (q2) - q2.Enqueue(i); - }, - () => { - for (int i = 0; i < count ;) - lock (q2) { - if (q2.Count == 0) - continue; - q2.Dequeue(); - i++; - } - - }, - () => { - for (int i = 0; i < count ;) - lock (q2) { - if (q2.Count == 0) - continue; - q2.Dequeue(); - i++; - } - - } - ) - .Bundle() - .Join(); - - - - t2 = Environment.TickCount; - Console.WriteLine("Queue+Lock: {0} ms", t2 - t1); + Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) ); } }