DispatchPool.cs
        
        
            
                    197 lines
            
             | 6.0 KiB
            
                | text/x-csharp
            
             |
                CSharpLexer
            
          
        |  | r15 | using System; | ||
| using System.Threading; | ||||
| namespace Implab.Parallels { | ||||
| public abstract class DispatchPool<TUnit> : IDisposable { | ||||
// Lower bound for the pool size: while the pool is alive, ReleaseThreadSlot
// refuses to drop the slot count to this value or below.
readonly int m_minThreadsLimit;

// Upper bound for the pool size: AllocateThreadSlot() fails once it is reached.
readonly int m_maxThreadsLimit;

// How long, in milliseconds, an idle worker waits for new work before it
// tries to give up its slot and exit.
readonly int m_releaseTimeout = 1000;

// Current number of allocated worker slots (the current size of the pool).
int m_threads;

// The maximum size the pool has reached so far.
int m_maxRunningThreads;

// 0 while the pool is alive; set to 1 by Dispose to request shutdown,
// after which all unused workers are released.
int m_exit;

// Monitor object used to park idle workers and to pulse them awake.
readonly object m_signal = new object();
|  | r15 | |||
/// <summary>
/// Creates a pool whose size is kept between <paramref name="min"/> and
/// <paramref name="max"/> worker slots.
/// </summary>
/// <param name="min">Minimum number of slots; must not be negative. A value above <paramref name="max"/> is clamped to <paramref name="max"/>.</param>
/// <param name="max">Maximum number of slots; must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
protected DispatchPool(int min, int max) {
    if (min < 0) {
        throw new ArgumentOutOfRangeException("min");
    }
    if (max <= 0) {
        throw new ArgumentOutOfRangeException("max");
    }

    // a minimum above the maximum is silently reduced to the maximum
    m_minThreadsLimit = Math.Min(min, max);
    m_maxThreadsLimit = max;
}
/// <summary>
/// Creates a pool whose minimum and maximum size are both <paramref name="threads"/>.
/// </summary>
/// <param name="threads">Value passed as both limits to the two-argument constructor, which validates it.</param>
protected DispatchPool(int threads) : this(threads, threads) {
}
/// <summary>
/// Creates a pool with no reserved workers and at most one slot per
/// processor reported by <see cref="Environment.ProcessorCount"/>.
/// </summary>
protected DispatchPool() {
    int processors = Environment.ProcessorCount;

    m_maxThreadsLimit = processors;
    m_minThreadsLimit = 0;
}
/// <summary>
/// Calls <c>StartWorker</c> once for every slot of the configured minimum
/// pool size. The result of each call is ignored.
/// </summary>
protected void InitPool() {
    int remaining = m_minThreadsLimit;
    while (remaining-- > 0) {
        StartWorker();
    }
}
/// <summary>
/// Gets the current number of allocated worker slots.
/// </summary>
public int PoolSize {
    get {
        // full fence before reading the counter
        Thread.MemoryBarrier();
        int size = m_threads;
        return size;
    }
}
|  | r81 | |||
/// <summary>
/// Gets the largest number of worker slots that have been allocated at the
/// same time so far.
/// </summary>
public int MaxRunningThreads {
    get {
        // full fence before reading the counter
        Thread.MemoryBarrier();
        int peak = m_maxRunningThreads;
        return peak;
    }
}
/// <summary>
/// Gets a value indicating whether shutdown has been requested, i.e. the
/// exit flag has been set to 1.
/// </summary>
protected bool IsDisposed {
    get {
        // full fence before reading the flag
        Thread.MemoryBarrier();
        int exitFlag = m_exit;
        return exitFlag == 1;
    }
}
/// <summary>
/// Tries to take the next unit of work from the queue owned by the derived class.
/// </summary>
/// <param name="unit">Receives the unit when the method returns true.</param>
/// <returns>true if a unit was taken; false if none was available.</returns>
/// <remarks>
/// The pool calls this both outside and inside its internal lock and handles
/// waiting itself, so implementations are presumably expected to be
/// non-blocking and safe to call from several workers at once.
/// </remarks>
protected abstract bool TryDequeue(out TUnit unit);
/// <summary>
/// Takes the next unit from the queue, waiting for one to arrive if necessary.
/// </summary>
/// <param name="unit">Receives the dequeued unit when the method returns true.</param>
/// <param name="timeout">Maximum total time to wait, in milliseconds, counted from the moment the method is entered.</param>
/// <returns>
/// true if a unit was taken from the queue; false if the timeout elapsed or
/// the pool is shutting down and no unit was taken.
/// </returns>
bool Dequeue(out TUnit unit, int timeout) {
    int started = Environment.TickCount;

    // fast path: no locking when work is already available
    if (TryDequeue(out unit)) {
        return true;
    }

    lock (m_signal) {
        bool taken;
        while (!(taken = TryDequeue(out unit)) && m_exit == 0) {
            // Remaining share of the overall timeout, never negative. The
            // arithmetic relies on integer wrap-around of TickCount, hence
            // the explicit unchecked context.
            int remaining = Math.Max(0, unchecked(started + timeout - Environment.TickCount));
            if (!Monitor.Wait(m_signal, remaining)) {
                // timeout
                return false;
            }
        }

        // Either a unit was taken or the pool is terminating: pass the
        // signal on so that the next waiting worker re-checks as well.
        Monitor.Pulse(m_signal);

        // A unit that has already been removed from the queue must reach the
        // caller even when shutdown was requested in the meantime; otherwise
        // it would be lost. This matches the lock-free fast path above.
        return taken;
    }
}
/// <summary>
/// Pulses the internal monitor, waking one worker that is waiting on it (if any).
/// </summary>
protected void SignalThread() {
    // Monitor.Pulse requires the lock to be held
    lock (m_signal) {
        Monitor.Pulse(m_signal);
    }
}
|  | r17 | #region thread slots traits | ||
/// <summary>
/// Reserves one more worker slot, unless the pool is already at its maximum
/// size or has been disposed.
/// </summary>
/// <returns>true if the slot counter was incremented; otherwise false.</returns>
bool AllocateThreadSlot() {
    // lock-free retry loop: re-read the counter until the increment wins
    for (;;) {
        int observed = m_threads;

        if (observed >= m_maxThreadsLimit || m_exit == 1) {
            // no more slots left or the pool has been disposed
            return false;
        }

        if (Interlocked.CompareExchange(ref m_threads, observed + 1, observed) == observed) {
            UpdateMaxThreads(observed + 1);
            return true;
        }
    }
}
|  | r15 | |||
/// <summary>
/// Makes a single attempt to raise the slot counter from
/// <paramref name="desired"/> - 1 to exactly <paramref name="desired"/>.
/// Neither the maximum limit nor the exit flag is checked here.
/// </summary>
/// <param name="desired">The slot count the counter should have after the call.</param>
/// <returns>true if the counter was <paramref name="desired"/> - 1 and has been incremented; otherwise false.</returns>
bool AllocateThreadSlot(int desired) {
    int expected = desired - 1;

    if (Interlocked.CompareExchange(ref m_threads, desired, expected) != expected) {
        // the counter had a different value
        return false;
    }

    UpdateMaxThreads(desired);
    return true;
}
/// <summary>
/// Gives up one worker slot. While the pool is alive the release is refused
/// when the slot count is at or below the configured minimum; once shutdown
/// has been requested the release is always attempted.
/// </summary>
/// <param name="last">Set to true when the released slot was the only remaining one (the counter went from 1 to 0); otherwise false.</param>
/// <returns>true if the slot counter was decremented; false if the slot is reserved.</returns>
bool ReleaseThreadSlot(out bool last) {
    last = false;

    Thread.MemoryBarrier();

    // lock-free retry loop: re-read the counter until the decrement wins
    for (;;) {
        int observed = m_threads;

        if (observed <= m_minThreadsLimit && m_exit == 0) {
            // the thread is reserved
            return false;
        }

        if (Interlocked.CompareExchange(ref m_threads, observed - 1, observed) == observed) {
            last = observed == 1;
            return true;
        }
    }
}
/// <summary>
/// Raises the recorded peak pool size to <paramref name="count"/> if the
/// current record is lower.
/// </summary>
/// <param name="count">Candidate value for the peak.</param>
void UpdateMaxThreads(int count) {
    for (;;) {
        int recorded = m_maxRunningThreads;

        if (recorded >= count) {
            // the recorded peak is already high enough
            return;
        }

        if (Interlocked.CompareExchange(ref m_maxRunningThreads, count, recorded) == recorded) {
            return;
        }
        // the record changed in the meantime; re-check against the new value
    }
}
|  | r17 | #endregion | ||
/// <summary>
/// Starts one more background worker thread if a slot can be allocated for it.
/// </summary>
/// <returns>true if a worker thread was started; false if no slot was available.</returns>
protected bool StartWorker() {
    if (!AllocateThreadSlot()) {
        return false;
    }

    // slot successfully allocated
    var worker = new Thread(Worker) { IsBackground = true };
    worker.Start();
    return true;
}
/// <summary>
/// Processes a single unit of work. Called on a worker thread for every
/// unit the worker takes from the queue.
/// </summary>
/// <param name="unit">The unit that was dequeued.</param>
protected abstract void InvokeUnit(TUnit unit);
/// <summary>
/// Body of a worker thread. It processes units until the queue stays empty
/// for the release timeout or the pool shuts down, then tries to give up
/// its slot and exit.
/// </summary>
protected virtual void Worker() {
    TUnit unit;
    bool last;

    while (true) {
        // drain the queue; Dequeue returns false on timeout or shutdown
        while (Dequeue(out unit, m_releaseTimeout)) {
            InvokeUnit(unit);
        }

        if (!ReleaseThreadSlot(out last)) {
            // this worker is reserved: go back to waiting for work
            continue;
        }

        // The slot has been released. Only the worker that released the last
        // slot re-checks the queue, because it may be not empty.
        if (!last || !TryDequeue(out unit)) {
            return;
        }

        InvokeUnit(unit);

        if (!AllocateThreadSlot(1)) {
            // The counter is no longer 0, so another slot has been allocated
            // in the meantime; we can safely exit since the pool is alive.
            return;
        }
        // the slot was re-acquired: keep serving the queue
    }
}
|  | r15 | |||
/// <summary>
/// Requests pool shutdown. Only the first call with
/// <paramref name="disposing"/> set to true has any effect: it sets the exit
/// flag, wakes a sleeping worker and suppresses finalization.
/// </summary>
/// <param name="disposing">true when called from <c>Dispose()</c>; when false the method does nothing.</param>
protected virtual void Dispose(bool disposing) {
    if (!disposing) {
        return;
    }

    // flip the flag 0 -> 1 exactly once; the interlocked operation implies a memory barrier
    if (Interlocked.CompareExchange(ref m_exit, 1, 0) != 0) {
        return;
    }

    // wake sleeping threads
    SignalThread();
    GC.SuppressFinalize(this);
}
/// <summary>
/// Shuts the pool down by calling <c>Dispose(true)</c>.
/// </summary>
public void Dispose() {
    Dispose(true);
}
/// <summary>
/// Finalizer; calls <c>Dispose(false)</c>.
/// </summary>
~DispatchPool() {
    Dispose(false);
}
| } | ||||
| } | ||||
