##// END OF EJS Templates
missing file
missing file

File last commit:

r125:f803565868a4 v2
r258:d0876436d95d v3
Show More
DispatchPool.cs
197 lines | 6.0 KiB | text/x-csharp | CSharpLexer
using System;
using System.Threading;
namespace Implab.Parallels {
public abstract class DispatchPool<TUnit> : IDisposable {
readonly int m_minThreadsLimit;
readonly int m_maxThreadsLimit;
readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit
int m_threads; // the current size of the pool
int m_maxRunningThreads; // the meximum reached size of the pool
int m_exit; // the pool is going to shutdown, all unused workers are released
readonly object m_signal = new object(); // used to pulse waiting threads
protected DispatchPool(int min, int max) {
if (min < 0)
throw new ArgumentOutOfRangeException("min");
if (max <= 0)
throw new ArgumentOutOfRangeException("max");
if (min > max)
min = max;
m_minThreadsLimit = min;
m_maxThreadsLimit = max;
}
protected DispatchPool(int threads)
: this(threads, threads) {
}
protected DispatchPool() {
m_minThreadsLimit = 0;
m_maxThreadsLimit = Environment.ProcessorCount;
}
protected void InitPool() {
for (int i = 0; i < m_minThreadsLimit; i++)
StartWorker();
}
public int PoolSize {
get {
Thread.MemoryBarrier();
return m_threads;
}
}
public int MaxRunningThreads {
get {
Thread.MemoryBarrier();
return m_maxRunningThreads;
}
}
protected bool IsDisposed {
get {
Thread.MemoryBarrier();
return m_exit == 1;
}
}
protected abstract bool TryDequeue(out TUnit unit);
bool Dequeue(out TUnit unit, int timeout) {
int ts = Environment.TickCount;
if (TryDequeue(out unit))
return true;
lock (m_signal) {
while (!TryDequeue(out unit) && m_exit == 0)
if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
// timeout
return false;
}
// queue item or terminate
Monitor.Pulse(m_signal);
if (m_exit == 1)
return false;
}
return true;
}
protected void SignalThread() {
lock (m_signal) {
Monitor.Pulse(m_signal);
}
}
#region thread slots traits
bool AllocateThreadSlot() {
int current;
// use spins to allocate slot for the new thread
do {
current = m_threads;
if (current >= m_maxThreadsLimit || m_exit == 1)
// no more slots left or the pool has been disposed
return false;
} while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));
UpdateMaxThreads(current + 1);
return true;
}
bool AllocateThreadSlot(int desired) {
if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
return false;
UpdateMaxThreads(desired);
return true;
}
bool ReleaseThreadSlot(out bool last) {
last = false;
int current;
// use spins to release slot for the new thread
Thread.MemoryBarrier();
do {
current = m_threads;
if (current <= m_minThreadsLimit && m_exit == 0)
// the thread is reserved
return false;
} while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));
last = (current == 1);
return true;
}
void UpdateMaxThreads(int count) {
int max;
do {
max = m_maxRunningThreads;
if (max >= count)
break;
} while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
}
#endregion
protected bool StartWorker() {
if (AllocateThreadSlot()) {
// slot successfully allocated
var worker = new Thread(Worker);
worker.IsBackground = true;
worker.Start();
return true;
}
return false;
}
protected abstract void InvokeUnit(TUnit unit);
protected virtual void Worker() {
TUnit unit;
bool last;
do {
while (Dequeue(out unit, m_releaseTimeout)) {
InvokeUnit(unit);
}
if(!ReleaseThreadSlot(out last))
continue;
// queue may be not empty
if (last && TryDequeue(out unit)) {
InvokeUnit(unit);
if (AllocateThreadSlot(1))
continue;
// we can safely exit since pool is alive
}
break;
} while(true);
}
protected virtual void Dispose(bool disposing) {
if (disposing) {
if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
// wake sleeping threads
SignalThread();
GC.SuppressFinalize(this);
}
}
}
public void Dispose() {
Dispose(true);
}
~DispatchPool() {
Dispose(false);
}
}
}