# HG changeset patch # User cin # Date 2014-09-25 23:32:34 # Node ID 4f20870d0816a16b35008113dc90205a7b180cef # Parent 05e6468f066fe4b05297d6a473d3f4b50db6167d added memory barriers diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -291,7 +291,7 @@ namespace Implab.Test { [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(0,100,100)) { + using (var pool = new WorkerPool(0,10,100)) { const int count = 10000; var args = new double[count]; diff --git a/Implab.mono.sln b/Implab.mono.sln --- a/Implab.mono.sln +++ b/Implab.mono.sln @@ -16,8 +16,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956 EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test.mono", "Implab.Test\Implab.Test.mono.csproj", "{2BD05F84-E067-4B87-9477-FDC2676A21C6}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Fx.Test.mono", "Implab.Fx.Test\Implab.Fx.Test.mono.csproj", "{2BD05F84-E067-4B87-9477-FDC2676A21C6}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -77,7 +75,6 @@ Global EndGlobalSection GlobalSection(NestedProjects) = preSolution {2BD05F84-E067-4B87-9477-FDC2676A21C6} = {BCA337C3-BFDC-4825-BBDB-E6D467E4E452} - {2BD05F84-E067-4B87-9477-FDC2676A21C6} = {BCA337C3-BFDC-4825-BBDB-E6D467E4E452} EndGlobalSection GlobalSection(MonoDevelopProperties) = preSolution StartupItem = Implab\Implab.csproj diff --git a/Implab/Implab.csproj b/Implab/Implab.csproj --- a/Implab/Implab.csproj +++ b/Implab/Implab.csproj @@ -7,6 +7,8 @@ Library Implab Implab + 8.0.30703 + 2.0 true @@ -121,6 +123,8 @@ + + diff --git a/Implab/JSON/JSONXmlReaderOptions.cs b/Implab/JSON/JSONXmlReaderOptions.cs --- a/Implab/JSON/JSONXmlReaderOptions.cs +++ b/Implab/JSON/JSONXmlReaderOptions.cs @@ -19,7 +19,9 @@ namespace Implab.JSON { } /// - /// Интерпретировать массивы как множественные элементы (убирает один уровень вложенности) + /// Интерпретировать массивы как множественные элементы (убирает один уровень вложенности), иначе массив + /// представляется в виде узла, дочерними элементами которого являются элементы массива, имена дочерних элементов + /// определяются свойством . По умолчанию false. /// public bool FlattenArrays { get; @@ -44,6 +46,7 @@ namespace Implab.JSON { /// /// Имя элемента для массивов, если не включена опция . + /// По умолчанию item. /// public string ArrayItemName { get; diff --git a/Implab/Parallels/ArrayTraits.cs b/Implab/Parallels/ArrayTraits.cs --- a/Implab/Parallels/ArrayTraits.cs +++ b/Implab/Parallels/ArrayTraits.cs @@ -153,7 +153,8 @@ namespace Implab.Parallels { var res = new TDst[source.Length]; var pending = source.Length; - var semaphore = new Semaphore(threads, threads); + object locker = new object(); + int slots = threads; // Analysis disable AccessToDisposedClosure AsyncPool.InvokeNewThread(() => { @@ -162,22 +163,28 @@ namespace Implab.Parallels { break; // stop processing in case of error or cancellation var idx = i; - semaphore.WaitOne(); + lock(locker) { + while(slots == 0) + Monitor.Wait(locker); + slots--; + } try { - var p1 = transform(source[i]); - p1.Anyway(() => semaphore.Release()); - p1.Then( - x => { - res[idx] = x; - var left = Interlocked.Decrement(ref pending); - if (left == 0) - promise.Resolve(res); - }, - e => { - promise.Reject(e); - throw new TransientPromiseException(e); - } - ); + transform(source[i]) + .Anyway(() => { + lock(locker) { + slots ++; + Monitor.Pulse(locker); + } + }) + .Last( + x => { + res[idx] = x; + var left = Interlocked.Decrement(ref pending); + if (left == 0) + promise.Resolve(res); + }, + e => promise.Reject(e) + ); } catch (Exception e) { promise.Reject(e); @@ -186,7 +193,7 @@ namespace Implab.Parallels { return 0; }); - return promise.Anyway(semaphore.Dispose); + return promise; } } diff --git a/Implab/Parallels/DispatchPool.cs b/Implab/Parallels/DispatchPool.cs --- a/Implab/Parallels/DispatchPool.cs +++ b/Implab/Parallels/DispatchPool.cs @@ -9,16 +9,17 @@ namespace Implab.Parallels { public abstract class DispatchPool : IDisposable { readonly int m_minThreads; readonly int m_maxThreads; + readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit int m_createdThreads = 0; // the current size of the pool int m_activeThreads = 0; // the count of threads which are active int m_sleepingThreads = 0; // the count of currently inactive threads int m_maxRunningThreads = 0; // the meximum reached size of the pool int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released - int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit + int m_wakeEvents = 0; // the count of wake events - AutoResetEvent m_hasTasks = new AutoResetEvent(false); + readonly object m_signalLocker = new object(); protected DispatchPool(int min, int max) { if (min < 0) @@ -51,68 +52,76 @@ namespace Implab.Parallels { public int PoolSize { get { + Thread.MemoryBarrier(); return m_createdThreads; } } public int ActiveThreads { get { + Thread.MemoryBarrier(); return m_activeThreads; } } public int MaxRunningThreads { get { + Thread.MemoryBarrier(); return m_maxRunningThreads; } } protected bool IsDisposed { get { - return m_exitRequired != 0; + Thread.MemoryBarrier(); + return m_exitRequired == 1; } } protected abstract bool TryDequeue(out TUnit unit); - #region thread execution traits + #region thread signaling traits int SignalThread() { var signals = Interlocked.Increment(ref m_wakeEvents); if(signals == 1) - m_hasTasks.Set(); + lock(m_signalLocker) + Monitor.Pulse(m_signalLocker); return signals; } bool FetchSignalOrWait(int timeout) { var start = Environment.TickCount; - - // означает, что поток владеет блокировкой и при успешном получении сигнала должен - // ее вернуть, чтобы другой ожидающий поток смог - bool hasLock = false; + int signals; + Thread.MemoryBarrier(); // m_wakeEvents volatile first read do { - int signals; - do { - signals = m_wakeEvents; - if (signals == 0) - break; - } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); + signals = m_wakeEvents; + if (signals == 0) + break; + } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); - if (signals >= 1) { - if (signals > 1 && hasLock) - m_hasTasks.Set(); + if (signals == 0) { + // no signal is fetched + lock(m_signalLocker) { + while(m_wakeEvents == 0) { + if (timeout != -1) + timeout = Math.Max(0, timeout - (Environment.TickCount - start)); + if(!Monitor.Wait(m_signalLocker,timeout)) + return false; // timeout + } + // m_wakeEvents > 0 + if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized + Monitor.Pulse(m_signalLocker); + + // signal fetched return true; } - if (timeout != -1) - timeout = Math.Max(0, timeout - (Environment.TickCount - start)); + } else { + // signal fetched + return true; + } - // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие - // и уйдет на пустой цикл, после чего заблокируется - hasLock = true; - } while (m_hasTasks.WaitOne(timeout)); - - return false; } bool Sleep(int timeout) { @@ -131,7 +140,8 @@ namespace Implab.Parallels { /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока /// protected void GrowPool() { - if (m_exitRequired != 0) + Thread.MemoryBarrier(); + if (m_exitRequired == 1) return; if (m_sleepingThreads > m_wakeEvents) { //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); @@ -204,7 +214,7 @@ namespace Implab.Parallels { // use spins to allocate slot for the new thread do { current = m_createdThreads; - if (current >= m_maxThreads || m_exitRequired != 0) + if (current >= m_maxThreads || m_exitRequired == 1) // no more slots left or the pool has been disposed return false; } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); @@ -227,6 +237,7 @@ namespace Implab.Parallels { last = false; int current; // use spins to release slot for the new thread + Thread.MemoryBarrier(); do { current = m_createdThreads; if (current <= m_minThreads && m_exitRequired == 0) @@ -264,6 +275,7 @@ namespace Implab.Parallels { // slot successfully allocated var worker = new Thread(this.Worker); worker.IsBackground = true; + Interlocked.Increment(ref m_activeThreads); worker.Start(); return true; @@ -277,15 +289,14 @@ namespace Implab.Parallels { protected virtual void Worker() { TUnit unit; //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); - Interlocked.Increment(ref m_activeThreads); + int count = 0;; + Thread.MemoryBarrier(); do { // exit if requested - if (m_exitRequired != 0) { + if (m_exitRequired == 1) { // release the thread slot Interlocked.Decrement(ref m_activeThreads); - if (ReleaseThreadSlotAnyway()) // it was the last worker - m_hasTasks.Dispose(); - else + if (!ReleaseThreadSlotAnyway()) // it was the last worker SignalThread(); // wake next worker break; } @@ -293,14 +304,17 @@ namespace Implab.Parallels { // fetch task if (TryDequeue(out unit)) { InvokeUnit(unit); + count ++; continue; } Interlocked.Decrement(ref m_activeThreads); + Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count); // entering suspend state // keep this thread and wait if (!Suspend()) break; + count = 0; //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); Interlocked.Increment(ref m_activeThreads); } while (true); @@ -309,15 +323,10 @@ namespace Implab.Parallels { protected virtual void Dispose(bool disposing) { if (disposing) { - if (m_exitRequired == 0) { - if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) - return; - + if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier // wake sleeping threads if (m_createdThreads > 0) SignalThread(); - else - m_hasTasks.Dispose(); GC.SuppressFinalize(this); } } diff --git a/Implab/Parallels/MTQueue.cs b/Implab/Parallels/MTQueue.cs --- a/Implab/Parallels/MTQueue.cs +++ b/Implab/Parallels/MTQueue.cs @@ -18,6 +18,8 @@ namespace Implab.Parallels { Node m_last; public void Enqueue(T value) { + Thread.MemoryBarrier(); + var last = m_last; var next = new Node(value); @@ -35,6 +37,7 @@ namespace Implab.Parallels { Node next = null; value = default(T); + Thread.MemoryBarrier(); do { first = m_first; if (first == null) diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs +++ b/Implab/Parallels/WorkerPool.cs @@ -12,20 +12,24 @@ namespace Implab.Parallels { MTQueue m_queue = new MTQueue(); int m_queueLength = 0; readonly int m_threshold = 1; + int m_workers = 0; public WorkerPool(int minThreads, int maxThreads, int threshold) : base(minThreads, maxThreads) { m_threshold = threshold; + m_workers = minThreads; InitPool(); } public WorkerPool(int minThreads, int maxThreads) : base(minThreads, maxThreads) { + m_workers = minThreads; InitPool(); } public WorkerPool(int threads) : base(threads) { + m_workers = threads; InitPool(); } @@ -62,8 +66,10 @@ namespace Implab.Parallels { var len = Interlocked.Increment(ref m_queueLength); m_queue.Enqueue(unit); - if (len > m_threshold*ActiveThreads) + if (len > m_threshold * m_workers) { + Interlocked.Increment(ref m_workers); GrowPool(); + } } protected override bool TryDequeue(out Action unit) { @@ -85,8 +91,10 @@ namespace Implab.Parallels { // Suspend // queueLength > 0 // continue + Thread.MemoryBarrier(); if (m_queueLength > 0) return true; + Interlocked.Decrement(ref m_workers); return base.Suspend(); } diff --git a/Implab/Promise.cs b/Implab/Promise.cs --- a/Implab/Promise.cs +++ b/Implab/Promise.cs @@ -142,18 +142,20 @@ namespace Implab { void WaitTransition() { while (m_state == TRANSITIONAL_STATE) { - /* noop */ + Thread.MemoryBarrier(); } } public bool IsResolved { get { + Thread.MemoryBarrier(); return m_state > 1; } } public bool IsCancelled { get { + Thread.MemoryBarrier(); return m_state == CANCELLED_STATE; } } diff --git a/Implab/SyncPool.cs b/Implab/SyncPool.cs new file mode 100644 --- /dev/null +++ b/Implab/SyncPool.cs @@ -0,0 +1,85 @@ +using System; +using Implab.Parallels; +using System.Threading; + +namespace Implab { + /*public class SyncPool : IDisposable { + readonly Func m_factory; + readonly Action m_cleanup; + readonly int m_size; + readonly MTQueue m_queue = new MTQueue(); + + volatile bool m_disposed; + + volatile int m_count; + + public SyncPool(Func factory, Action cleanup, int size) { + Safe.ArgumentNotNull(factory, "factory"); + Safe.ArgumentInRange(size, 1, size, "size"); + + m_factory = factory; + m_cleanup = cleanup; + m_size = size; + } + + public SyncPool(Func factory, Action cleanup) : this(factory,cleanup,Environment.ProcessorCount+1) { + } + + public SyncPool(Func factory) : this(factory,null,Environment.ProcessorCount+1) { + } + + public SyncPoolWrapper Allocate() { + if (m_disposed) + throw new ObjectDisposedException(this.ToString()); + + T instance; + if (m_queue.TryDequeue(out instance)) { + Interlocked.Decrement(ref m_count); + return instance; + } else { + instance = m_factory(); + } + return new SyncPoolWrapper(instance, this); + } + + public void Release(T instance) { + if (m_count < m_size && !m_disposed) { + Interlocked.Increment(ref m_count); + + if (m_cleanup != null) + m_cleanup(instance); + + m_queue.Enqueue(instance); + + // пока элемент возвращался в кеш, была начата операция освобождения всего кеша + // и возможно уже законцена, в таком случае следует извлечь элемент обратно и + // освободить его. Если операция освобождения кеша еще не заврешилась, то будет + // изъят и освобожден произвольный элемен, что не повлияет на ход всего процесса. + if (m_disposed && m_queue.TryDequeue(out instance)) + Safe.Dispose(instance); + + } else { + Safe.Dispose(instance); + } + } + + protected virtual void Dispose(bool disposing) { + if (disposing) { + m_disposed = true; + T instance; + while (m_queue.TryDequeue(out instance)) + Safe.Dispose(instance); + } + } + + #region IDisposable implementation + + public void Dispose() { + Dispose(true); + GC.SuppressFinalize(this); + } + + #endregion + }*/ +} + diff --git a/Implab/SyncPoolWrapper.cs b/Implab/SyncPoolWrapper.cs new file mode 100644 --- /dev/null +++ b/Implab/SyncPoolWrapper.cs @@ -0,0 +1,24 @@ +using System; + +namespace Implab { + /*public struct SyncPoolWrapper : IDisposable { + readonly T m_value; + readonly SyncPool m_pool; + + internal SyncPoolWrapper(T value, SyncPool pool) { + m_value = value; + m_pool = pool; + } + + public T Value { + get { return m_value; } + } + + #region IDisposable implementation + public void Dispose() { + m_pool.Release(m_value); + } + #endregion + }*/ +} +