DispatchPool.cs
199 lines
| 6.1 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r15 | using System; | ||
using System.Threading; | ||||
namespace Implab.Parallels {
    /// <summary>
    /// Base class for a pool of background worker threads which pull units of
    /// work from a queue supplied by the derived class (<see cref="TryDequeue"/>)
    /// and execute them (<see cref="InvokeUnit"/>).
    /// </summary>
    /// <remarks>
    /// The number of workers is kept between the minimum and the maximum limits
    /// passed to the constructor. A worker which gets no unit within
    /// <c>m_releaseTimeout</c> milliseconds tries to give up its slot and exit.
    /// The derived class is expected to call <see cref="InitPool"/> to start the
    /// reserved workers, and <see cref="SignalThread"/> / <see cref="StartWorker"/>
    /// when it enqueues new units.
    /// </remarks>
    /// <typeparam name="TUnit">The type of the unit of work.</typeparam>
    public abstract class DispatchPool<TUnit> : IDisposable {
        readonly int m_minThreadsLimit;
        readonly int m_maxThreadsLimit;
        readonly int m_releaseTimeout = 1000; // the timeout (ms) a working thread waits for new tasks before it tries to exit

        int m_threads; // the current size of the pool
        int m_maxRunningThreads; // the maximum reached size of the pool
        int m_exit; // 1 when the pool is going to shutdown, all unused workers are released

        readonly object m_signal = new object(); // used to pulse waiting threads

        /// <summary>
        /// Creates the pool with the specified limits. No threads are started here.
        /// </summary>
        /// <param name="min">The number of reserved workers; values above <paramref name="max"/> are clamped to it.</param>
        /// <param name="max">The maximum number of workers, must be positive.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="min"/> is negative or <paramref name="max"/> is not positive.
        /// </exception>
        protected DispatchPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");
            if (min > max)
                min = max;

            m_minThreadsLimit = min;
            m_maxThreadsLimit = max;
        }

        /// <summary>
        /// Creates the pool whose minimum and maximum limits are both <paramref name="threads"/>.
        /// </summary>
        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        /// <summary>
        /// Creates the pool with no reserved workers and with the maximum taken from
        /// the worker threads limit reported by <see cref="ThreadPool.GetMaxThreads"/>.
        /// </summary>
        protected DispatchPool() {
            int maxThreads, maxCP;
            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);

            m_minThreadsLimit = 0;
            m_maxThreadsLimit = maxThreads;
        }

        /// <summary>
        /// Tries to start as many workers as the minimum limit requires.
        /// </summary>
        protected void InitPool() {
            for (int i = 0; i < m_minThreadsLimit; i++)
                StartWorker();
        }

        /// <summary>
        /// The current number of allocated worker slots.
        /// </summary>
        public int PoolSize {
            get {
                Thread.MemoryBarrier();
                return m_threads;
            }
        }

        /// <summary>
        /// The largest number of worker slots recorded so far.
        /// </summary>
        public int MaxRunningThreads {
            get {
                Thread.MemoryBarrier();
                return m_maxRunningThreads;
            }
        }

        /// <summary>
        /// <c>true</c> once the shutdown of the pool has been requested.
        /// </summary>
        protected bool IsDisposed {
            get {
                Thread.MemoryBarrier();
                return m_exit == 1;
            }
        }

        /// <summary>
        /// Takes the next unit from the queue of the derived class without blocking.
        /// </summary>
        /// <param name="unit">The dequeued unit when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit has been dequeued.</returns>
        protected abstract bool TryDequeue(out TUnit unit);

        /// <summary>
        /// Takes the next unit, waiting on <c>m_signal</c> for it if the queue is empty.
        /// </summary>
        /// <param name="unit">The dequeued unit when the method returns <c>true</c>.</param>
        /// <param name="timeout">The maximum total time to wait, in milliseconds.</param>
        /// <returns>
        /// <c>true</c> if a unit has been dequeued; <c>false</c> if the wait has timed out
        /// or the pool is shutting down and nothing has been dequeued.
        /// </returns>
        bool Dequeue(out TUnit unit, int timeout) {
            int ts = Environment.TickCount;

            // fast path, no locking while the queue has items
            if (TryDequeue(out unit))
                return true;

            lock (m_signal) {
                bool dequeued;
                while (!(dequeued = TryDequeue(out unit)) && m_exit == 0) {
                    // Environment.TickCount wraps around, the difference is still
                    // correct as long as the arithmetic is allowed to wrap too
                    int remaining = unchecked(ts + timeout - Environment.TickCount);
                    if (!Monitor.Wait(m_signal, Math.Max(0, remaining))) {
                        // timeout
                        return false;
                    }
                }

                // queue item or terminate, pass the signal to the next waiting thread
                Monitor.Pulse(m_signal);

                // a unit which has been already taken from the queue must be handed
                // to the caller even during the shutdown, otherwise it would be lost
                if (!dequeued)
                    return false;
            }

            return true;
        }

        /// <summary>
        /// Wakes one thread waiting for a unit in <see cref="Dequeue"/>.
        /// </summary>
        protected void SignalThread() {
            lock (m_signal) {
                Monitor.Pulse(m_signal);
            }
        }

        #region thread slots traits

        /// <summary>
        /// Reserves a slot for a new worker.
        /// </summary>
        /// <returns>
        /// <c>false</c> if the maximum limit is reached or the pool is shutting down.
        /// </returns>
        bool AllocateThreadSlot() {
            int current;
            // use spins to allocate slot for the new thread
            do {
                current = m_threads;
                if (current >= m_maxThreadsLimit || m_exit == 1)
                    // no more slots left or the pool has been disposed
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));

            UpdateMaxThreads(current + 1);
            return true;
        }

        /// <summary>
        /// Reserves a slot only if it makes the pool size exactly <paramref name="desired"/>,
        /// i.e. the current size is <c>desired - 1</c>. Makes a single attempt.
        /// </summary>
        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
                return false;

            UpdateMaxThreads(desired);
            return true;
        }

        /// <summary>
        /// Gives up the slot of the calling worker.
        /// </summary>
        /// <param name="last">Set to <c>true</c> when the released slot was the only one.</param>
        /// <returns>
        /// <c>false</c> if the slot is reserved: the pool is not shutting down and its
        /// size doesn't exceed the minimum limit.
        /// </returns>
        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;
            // use spins to release the slot of the thread
            Thread.MemoryBarrier();
            do {
                current = m_threads;
                if (current <= m_minThreadsLimit && m_exit == 0)
                    // the thread is reserved
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));

            last = (current == 1);

            return true;
        }

        /// <summary>
        /// Raises <c>m_maxRunningThreads</c> to <paramref name="count"/> if it is smaller.
        /// </summary>
        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count)
                    break;
            } while (max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        /// <summary>
        /// Starts a new background worker if a slot can be allocated for it.
        /// </summary>
        /// <returns><c>true</c> if the worker has been started.</returns>
        protected bool StartWorker() {
            if (AllocateThreadSlot()) {
                // slot successfully allocated
                try {
                    var worker = new Thread(Worker);
                    worker.IsBackground = true;
                    worker.Start();
                } catch {
                    // the thread will never run and release its slot, give the slot
                    // back here, otherwise the pool size would be overstated forever
                    Interlocked.Decrement(ref m_threads);
                    throw;
                }
                return true;
            }
            return false;
        }

        /// <summary>
        /// Executes the specified unit of work on the calling worker thread.
        /// </summary>
        protected abstract void InvokeUnit(TUnit unit);

        /// <summary>
        /// The body of the worker thread: executes units until no unit arrives within
        /// the release timeout (or the pool is shutting down) and the slot of the
        /// thread can be released.
        /// </summary>
        protected virtual void Worker() {
            TUnit unit;
            bool last;
            do {
                while (Dequeue(out unit, m_releaseTimeout)) {
                    InvokeUnit(unit);
                }

                if (!ReleaseThreadSlot(out last))
                    // the slot is reserved, keep waiting for units
                    continue;

                // queue may be not empty: a unit could be enqueued after the last
                // worker decided to exit
                if (last && TryDequeue(out unit)) {
                    InvokeUnit(unit);
                    // try to return to the pool as its only worker
                    if (AllocateThreadSlot(1))
                        continue;
                    // the pool size is not zero anymore, a slot has been allocated
                    // by someone else in the meantime, this thread can safely exit
                }
                break;
            } while (true);
        }

        /// <summary>
        /// Requests the shutdown of the pool. Only the first call with
        /// <paramref name="disposing"/> set to <c>true</c> has an effect: it sets the
        /// exit flag, wakes a waiting worker and suppresses the finalization.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>, <c>false</c> when called
        /// from the finalizer (nothing is done in that case).
        /// </param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                    // wake sleeping threads, each awakened thread pulses the next one
                    SignalThread();
                    GC.SuppressFinalize(this);
                }
            }
        }

        /// <summary>
        /// Requests the shutdown of the pool, see <see cref="Dispose(bool)"/>.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        ~DispatchPool() {
            Dispose(false);
        }
    }
}