// DispatchPool.cs | 197 lines | 6.0 KiB | text/x-csharp | CSharpLexer | cin
using System;
using System.Threading;
namespace Implab.Parallels {
    /// <summary>
    /// Base class for a pool of background worker threads which take units of work
    /// through <c>TryDequeue</c> and process them with <c>InvokeUnit</c>.
    /// </summary>
    /// <remarks>
    /// The number of workers is kept between the minimum and the maximum limits passed to
    /// the constructor. A worker that finds no unit within the release timeout gives up its
    /// slot and exits unless that would bring the pool below the minimum limit.
    /// </remarks>
    /// <typeparam name="TUnit">The type of the unit of work.</typeparam>
    public abstract class DispatchPool<TUnit> : IDisposable {
        readonly int m_minThreadsLimit;
        readonly int m_maxThreadsLimit;
        readonly int m_releaseTimeout = 1000; // the time (ms) a working thread waits for a new unit before it tries to exit

        int m_threads; // the current size of the pool
        int m_maxRunningThreads; // the maximum reached size of the pool
        int m_exit; // 1 when the pool is going to shutdown, all unused workers are released

        readonly object m_signal = new object(); // used to pulse waiting threads

        /// <summary>
        /// Creates the pool with the specified limits. No threads are started here.
        /// </summary>
        /// <param name="min">The number of workers which are kept alive while the pool isn't disposed;
        /// a value greater than <paramref name="max"/> is silently lowered to <paramref name="max"/>.</param>
        /// <param name="max">The maximum number of workers.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or
        /// <paramref name="max"/> isn't positive.</exception>
        protected DispatchPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;
            m_minThreadsLimit = min;
            m_maxThreadsLimit = max;
        }

        /// <summary>
        /// Creates the pool whose minimum and maximum limits are both <paramref name="threads"/>.
        /// </summary>
        /// <param name="threads">The number of workers.</param>
        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        /// <summary>
        /// Creates the pool with no reserved workers and with the maximum limit equal to
        /// <see cref="Environment.ProcessorCount"/>.
        /// </summary>
        protected DispatchPool() {
            m_minThreadsLimit = 0;
            m_maxThreadsLimit = Environment.ProcessorCount;
        }

        /// <summary>
        /// Tries to start one worker per reserved (minimum) slot.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the constructors in this file don't call this method; presumably the
        /// derived class calls it once its queue is ready - confirm against the inheritors.
        /// </remarks>
        protected void InitPool() {
            for (int i = 0; i < m_minThreadsLimit; i++)
                StartWorker();
        }

        /// <summary>
        /// The current number of allocated worker slots.
        /// </summary>
        public int PoolSize {
            get {
                Thread.MemoryBarrier();
                return m_threads;
            }
        }

        /// <summary>
        /// The largest number of worker slots which were allocated at the same time.
        /// </summary>
        public int MaxRunningThreads {
            get {
                Thread.MemoryBarrier();
                return m_maxRunningThreads;
            }
        }

        /// <summary>
        /// <c>true</c> once the pool has been disposed.
        /// </summary>
        protected bool IsDisposed {
            get {
                Thread.MemoryBarrier();
                return m_exit == 1;
            }
        }

        /// <summary>
        /// Takes the next unit of work from the queue without blocking.
        /// </summary>
        /// <param name="unit">The unit taken from the queue when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit has been taken.</returns>
        protected abstract bool TryDequeue(out TUnit unit);

        /// <summary>
        /// Takes the next unit of work, waiting on the pool signal while the queue is empty.
        /// </summary>
        /// <param name="unit">The unit taken from the queue when the method returns <c>true</c>.</param>
        /// <param name="timeout">The maximum time to wait, in milliseconds.</param>
        /// <returns>
        /// <c>true</c> if a unit has been taken; <c>false</c> if the wait timed out or the pool
        /// is shutting down and the queue had nothing to offer.
        /// </returns>
        bool Dequeue(out TUnit unit, int timeout) {
            int ts = Environment.TickCount;

            // fast path, no locking while the queue has something to offer
            if (TryDequeue(out unit))
                return true;

            lock (m_signal) {
                bool dequeued = TryDequeue(out unit);
                while (!dequeued && m_exit == 0) {
                    // Environment.TickCount wraps around, the difference stays correct
                    // as long as the arithmetic isn't overflow-checked
                    int remaining = unchecked(ts + timeout - Environment.TickCount);
                    if (!Monitor.Wait(m_signal, Math.Max(0, remaining))) {
                        // timeout
                        return false;
                    }
                    dequeued = TryDequeue(out unit);
                }

                // queue item or terminate: pass the signal on to the next waiting thread, if any
                Monitor.Pulse(m_signal);

                // a unit which has already been taken from the queue is handed to the caller
                // even during the shutdown, otherwise it would be lost
                return dequeued;
            }
        }

        /// <summary>
        /// Pulses the pool signal, which wakes one thread waiting in <c>Dequeue</c>, if there is any.
        /// </summary>
        protected void SignalThread() {
            lock (m_signal) {
                Monitor.Pulse(m_signal);
            }
        }

        #region thread slots traits

        /// <summary>
        /// Reserves a slot for a new worker.
        /// </summary>
        /// <returns><c>false</c> if the maximum limit is reached or the pool has been disposed.</returns>
        bool AllocateThreadSlot() {
            int current;
            // use spins to allocate slot for the new thread
            do {
                current = m_threads;
                if (current >= m_maxThreadsLimit || m_exit == 1)
                    // no more slots left or the pool has been disposed
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));

            UpdateMaxThreads(current + 1);

            return true;
        }

        /// <summary>
        /// Reserves a slot only if that makes the pool size exactly <paramref name="desired"/>.
        /// </summary>
        /// <param name="desired">The size of the pool after the slot is allocated.</param>
        /// <returns><c>true</c> if the pool size has been changed from <c>desired - 1</c> to <c>desired</c>.</returns>
        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
                return false;

            UpdateMaxThreads(desired);

            return true;
        }

        /// <summary>
        /// Gives a worker slot back to the pool.
        /// </summary>
        /// <param name="last"><c>true</c> if the released slot was the only one allocated.</param>
        /// <returns>
        /// <c>false</c> if the slot is reserved, i.e. the pool isn't shutting down and its size
        /// doesn't exceed the minimum limit.
        /// </returns>
        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;
            // use spins to release the slot of the current thread
            Thread.MemoryBarrier();
            do {
                current = m_threads;
                if (current <= m_minThreadsLimit && m_exit == 0)
                    // the thread is reserved
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));

            last = (current == 1);

            return true;
        }

        /// <summary>
        /// Raises the recorded maximum pool size to <paramref name="count"/> if it is lower.
        /// </summary>
        /// <param name="count">The pool size which has just been reached.</param>
        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count)
                    break;
            } while (max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        /// <summary>
        /// Starts a new background worker thread if a slot can be allocated for it.
        /// </summary>
        /// <returns><c>true</c> if the worker has been started; <c>false</c> if no slot is available.</returns>
        protected bool StartWorker() {
            if (!AllocateThreadSlot())
                return false;

            // slot successfully allocated
            try {
                var worker = new Thread(Worker);
                worker.IsBackground = true;
                worker.Start();
            } catch {
                // the thread isn't running, so nobody will release the slot: give it back,
                // otherwise the pool size stays overstated forever
                // (the recorded maximum pool size isn't rolled back)
                Interlocked.Decrement(ref m_threads);
                throw;
            }

            return true;
        }

        /// <summary>
        /// Processes a unit of work taken from the queue.
        /// </summary>
        /// <param name="unit">The unit to process.</param>
        protected abstract void InvokeUnit(TUnit unit);

        /// <summary>
        /// The body of a worker thread: processes units until the worker is allowed to release its slot.
        /// </summary>
        protected virtual void Worker() {
            TUnit unit;
            bool last;
            while (true) {
                // process units until the wait times out or the pool shuts down
                while (Dequeue(out unit, m_releaseTimeout))
                    InvokeUnit(unit);

                if (!ReleaseThreadSlot(out last))
                    // the slot is reserved, keep waiting for units
                    continue;

                // the slot is released, but the queue may be not empty: the last worker
                // makes one more attempt to pick up a unit before leaving
                if (last && TryDequeue(out unit)) {
                    InvokeUnit(unit);
                    if (AllocateThreadSlot(1))
                        // the pool was still empty, this thread is its worker again
                        continue;
                    // the pool size isn't zero, i.e. a slot has been allocated by
                    // someone else meanwhile, so this thread can safely exit
                }
                break;
            }
        }

        /// <summary>
        /// Marks the pool as shutting down and wakes a waiting worker.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <c>Dispose()</c>; nothing is done
        /// when called from the finalizer.</param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                    // wake a sleeping thread, it passes the signal on to the next one
                    SignalThread();
                    GC.SuppressFinalize(this);
                }
            }
        }

        /// <summary>
        /// Shuts the pool down; only the first call has an effect.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        ~DispatchPool() {
            Dispose(false);
        }
    }
}