using System;
using System.Threading;

namespace Implab.Parallels {
    /// <summary>
    /// Base class for a pool of background worker threads that repeatedly take units
    /// of work from a queue owned by the derived class and process them.
    /// </summary>
    /// <remarks>
    /// The derived class supplies the queue through <see cref="TryDequeue"/> and the
    /// processing logic through <see cref="InvokeUnit"/>. The number of workers is
    /// tracked with interlocked operations on a single counter; idle workers block on
    /// an internal monitor and leave the pool after a timeout unless they are needed
    /// to keep the configured minimum.
    /// </remarks>
    /// <typeparam name="TUnit">The type of a unit of work handled by the pool.</typeparam>
    public abstract class DispatchPool<TUnit> : IDisposable {
        readonly int m_minThreadsLimit;
        readonly int m_maxThreadsLimit;

        // how long (in milliseconds) an idle worker waits for a new unit before it tries to exit
        readonly int m_releaseTimeout = 1000;

        int m_threads; // the current size of the pool
        int m_maxRunningThreads; // the maximum size the pool has reached so far
        int m_exit; // 1 once the pool is shutting down; idle workers are then released

        readonly object m_signal = new object(); // monitor used to wake waiting workers

        /// <summary>
        /// Creates a pool that keeps between <paramref name="min"/> and
        /// <paramref name="max"/> workers. No thread is started here; see
        /// <see cref="InitPool"/>.
        /// </summary>
        /// <param name="min">Number of reserved workers; clamped to <paramref name="max"/> when larger.</param>
        /// <param name="max">Upper bound on the number of workers; must be positive.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="min"/> is negative or <paramref name="max"/> is not positive.
        /// </exception>
        protected DispatchPool(int min, int max) {
            if (min < 0) {
                throw new ArgumentOutOfRangeException("min");
            }
            if (max <= 0) {
                throw new ArgumentOutOfRangeException("max");
            }

            if (min > max) {
                min = max;
            }

            m_minThreadsLimit = min;
            m_maxThreadsLimit = max;
        }

        /// <summary>
        /// Creates a pool whose minimum and maximum size are both <paramref name="threads"/>.
        /// </summary>
        /// <param name="threads">The number of workers.</param>
        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        /// <summary>
        /// Creates a pool with no reserved workers and at most
        /// <see cref="Environment.ProcessorCount"/> workers.
        /// </summary>
        protected DispatchPool()
            : this(0, Environment.ProcessorCount) {
        }

        /// <summary>
        /// Starts workers until the minimum pool size is reached.
        /// </summary>
        protected void InitPool() {
            for (int i = 0; i < m_minThreadsLimit; i++) {
                StartWorker();
            }
        }

        /// <summary>Gets the current number of allocated worker slots.</summary>
        public int PoolSize {
            get {
                Thread.MemoryBarrier();
                return m_threads;
            }
        }

        /// <summary>Gets the largest number of worker slots allocated at the same time so far.</summary>
        public int MaxRunningThreads {
            get {
                Thread.MemoryBarrier();
                return m_maxRunningThreads;
            }
        }

        /// <summary>Gets a value indicating whether <see cref="Dispose()"/> has been requested.</summary>
        protected bool IsDisposed {
            get {
                Thread.MemoryBarrier();
                return m_exit == 1;
            }
        }

        /// <summary>
        /// Attempts to take the next unit of work from the queue without blocking.
        /// </summary>
        /// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit was dequeued; otherwise <c>false</c>.</returns>
        protected abstract bool TryDequeue(out TUnit unit);

        /// <summary>
        /// Takes the next unit of work, waiting on the pool monitor for at most
        /// <paramref name="timeout"/> milliseconds in total.
        /// </summary>
        /// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
        /// <param name="timeout">The overall wait budget in milliseconds.</param>
        /// <returns>
        /// <c>true</c> if a unit was obtained; <c>false</c> if the wait timed out or the
        /// pool is shutting down.
        /// </returns>
        bool Dequeue(out TUnit unit, int timeout) {
            int ts = Environment.TickCount;

            // fast path: no locking when the queue already has something
            if (TryDequeue(out unit)) {
                return true;
            }

            lock (m_signal) {
                while (!TryDequeue(out unit) && m_exit == 0) {
                    // Environment.TickCount wraps around, so the remaining time must be
                    // computed with wrapping arithmetic even in a /checked build.
                    int remaining = unchecked(ts + timeout - Environment.TickCount);
                    if (!Monitor.Wait(m_signal, Math.Max(0, remaining))) {
                        // timeout
                        return false;
                    }
                }

                // either a unit was dequeued or the pool is terminating:
                // pass the signal on so the next waiting worker re-checks as well
                Monitor.Pulse(m_signal);

                if (m_exit == 1) {
                    return false;
                }
            }

            return true;
        }

        /// <summary>
        /// Wakes one worker that is waiting for a unit of work.
        /// </summary>
        protected void SignalThread() {
            lock (m_signal) {
                Monitor.Pulse(m_signal);
            }
        }

        #region thread slots traits

        /// <summary>
        /// Reserves a slot for one more worker.
        /// </summary>
        /// <returns>
        /// <c>true</c> if a slot was reserved; <c>false</c> if the pool is at its maximum
        /// size or has been disposed.
        /// </returns>
        bool AllocateThreadSlot() {
            int current;

            // spin until the counter is incremented atomically
            do {
                current = m_threads;
                if (current >= m_maxThreadsLimit || m_exit == 1) {
                    // no more slots left or the pool has been disposed
                    return false;
                }
            } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));

            UpdateMaxThreads(current + 1);

            return true;
        }

        /// <summary>
        /// Reserves a slot only if doing so brings the pool size to exactly
        /// <paramref name="desired"/>.
        /// </summary>
        /// <param name="desired">The pool size expected after the allocation.</param>
        /// <returns><c>true</c> if the slot was reserved; otherwise <c>false</c>.</returns>
        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1)) {
                return false;
            }

            UpdateMaxThreads(desired);

            return true;
        }

        /// <summary>
        /// Gives up the slot of the calling worker unless it is reserved by the minimum
        /// pool size while the pool is still alive.
        /// </summary>
        /// <param name="last">Set to <c>true</c> when the released slot was the only one left.</param>
        /// <returns><c>true</c> if the slot was released; <c>false</c> if the worker must stay.</returns>
        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;

            // spin until the counter is decremented atomically
            Thread.MemoryBarrier();
            do {
                current = m_threads;
                if (current <= m_minThreadsLimit && m_exit == 0) {
                    // the thread is reserved
                    return false;
                }
            } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));

            last = (current == 1);

            return true;
        }

        /// <summary>
        /// Raises the recorded maximum pool size to <paramref name="count"/> if it is lower.
        /// </summary>
        /// <param name="count">The pool size that has just been reached.</param>
        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count) {
                    break;
                }
            } while (max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        /// <summary>
        /// Starts one more background worker if the pool has a free slot.
        /// </summary>
        /// <returns><c>true</c> if a worker was started; <c>false</c> if no slot was available.</returns>
        protected bool StartWorker() {
            if (!AllocateThreadSlot()) {
                return false;
            }

            try {
                var worker = new Thread(Worker);
                worker.IsBackground = true;
                worker.Start();
            } catch {
                // the thread never ran, so give the reserved slot back;
                // otherwise the pool would count a worker that does not exist
                Interlocked.Decrement(ref m_threads);
                throw;
            }

            return true;
        }

        /// <summary>
        /// Processes a single unit of work on a worker thread.
        /// </summary>
        /// <param name="unit">The unit taken from the queue.</param>
        protected abstract void InvokeUnit(TUnit unit);

        /// <summary>
        /// The body of a worker thread: processes units until none arrives within the
        /// release timeout and the worker is allowed to give up its slot.
        /// </summary>
        protected virtual void Worker() {
            TUnit unit;
            bool last;

            while (true) {
                while (Dequeue(out unit, m_releaseTimeout)) {
                    InvokeUnit(unit);
                }

                if (!ReleaseThreadSlot(out last)) {
                    // this worker is reserved, keep waiting for work
                    continue;
                }

                // the queue may have received a unit after the last worker released its
                // slot; handle it here so it is not left without a worker
                if (last && TryDequeue(out unit)) {
                    InvokeUnit(unit);
                    if (AllocateThreadSlot(1)) {
                        continue;
                    }
                    // the slot was taken by someone else, so the pool is alive
                    // and this worker can safely exit
                }

                break;
            }
        }

        /// <summary>
        /// Marks the pool as shutting down and wakes a waiting worker so that the
        /// workers can notice it and exit.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called
        /// from the finalizer, in which case nothing is done.
        /// </param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                    // wake sleeping threads
                    SignalThread();
                }
            }
        }

        /// <summary>
        /// Shuts the pool down. Calling it more than once has no further effect.
        /// </summary>
        public void Dispose() {
            Dispose(true);
            // suppressed here rather than in the virtual overload so that it also
            // happens when a derived class overrides Dispose(bool)
            GC.SuppressFinalize(this);
        }

        ~DispatchPool() {
            Dispose(false);
        }
    }
}