##// END OF EJS Templates
Fixed promise chaining behavior: the error handler no longer handles exceptions thrown by result or cancellation handlers; these exceptions are propagated to the next handlers.
Fixed promise chaining behavior: the error handler no longer handles exceptions thrown by result or cancellation handlers; these exceptions are propagated to the next handlers.

File last commit:

r125:f803565868a4 v2
r196:40d7fed4a09e default
Show More
DispatchPool.cs
197 lines | 6.0 KiB | text/x-csharp | CSharpLexer
using System;
using System.Threading;
namespace Implab.Parallels {
    /// <summary>
    /// Base class for a pool of background worker threads which take units of
    /// work from a queue supplied by the derived class and execute them.
    /// </summary>
    /// <remarks>
    /// The derived class provides the queue through <see cref="TryDequeue"/> and the
    /// way a unit is executed through <see cref="InvokeUnit"/>. The number of workers
    /// is tracked in a counter that is modified only with interlocked
    /// compare-and-swap operations.
    /// </remarks>
    /// <typeparam name="TUnit">The type of the unit of work.</typeparam>
    public abstract class DispatchPool<TUnit> : IDisposable {
        readonly int m_minThreadsLimit;
        readonly int m_maxThreadsLimit;
        readonly int m_releaseTimeout = 1000; // how long (ms) a worker waits for a new unit before it tries to exit

        int m_threads; // the current size of the pool
        int m_maxRunningThreads; // the maximum size the pool has reached
        int m_exit; // 1 when the pool is shutting down, all unused workers are released

        readonly object m_signal = new object(); // used to pulse waiting threads

        /// <summary>
        /// Creates the pool with the specified thread limits. No workers are started here.
        /// </summary>
        /// <param name="min">The number of reserved workers; clamped to <paramref name="max"/> when greater.</param>
        /// <param name="max">The maximum number of workers, must be positive.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="min"/> is negative or <paramref name="max"/> is not positive.
        /// </exception>
        protected DispatchPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;
            m_minThreadsLimit = min;
            m_maxThreadsLimit = max;
        }

        /// <summary>
        /// Creates the pool where the minimum and the maximum limits are the same.
        /// </summary>
        /// <param name="threads">The value used for both limits.</param>
        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        /// <summary>
        /// Creates the pool with no reserved workers and the maximum equal to
        /// <see cref="Environment.ProcessorCount"/>.
        /// </summary>
        protected DispatchPool() {
            m_minThreadsLimit = 0;
            m_maxThreadsLimit = Environment.ProcessorCount;
        }

        /// <summary>
        /// Calls <see cref="StartWorker"/> once for every reserved (minimum) thread.
        /// </summary>
        protected void InitPool() {
            for (int i = 0; i < m_minThreadsLimit; i++)
                StartWorker();
        }

        /// <summary>
        /// The current number of allocated worker slots.
        /// </summary>
        public int PoolSize {
            get {
                Thread.MemoryBarrier();
                return m_threads;
            }
        }

        /// <summary>
        /// The largest number of worker slots that has been allocated so far.
        /// </summary>
        public int MaxRunningThreads {
            get {
                Thread.MemoryBarrier();
                return m_maxRunningThreads;
            }
        }

        /// <summary>
        /// <c>true</c> once <see cref="Dispose()"/> has set the exit flag.
        /// </summary>
        protected bool IsDisposed {
            get {
                Thread.MemoryBarrier();
                return m_exit == 1;
            }
        }

        /// <summary>
        /// Tries to take the next unit from the queue without blocking.
        /// </summary>
        /// <param name="unit">Receives the unit when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit has been taken.</returns>
        protected abstract bool TryDequeue(out TUnit unit);

        /// <summary>
        /// Takes the next unit, waiting on the pool signal when the queue is empty.
        /// </summary>
        /// <param name="unit">Receives the unit when the method returns <c>true</c>.</param>
        /// <param name="timeout">The overall time to wait, in milliseconds.</param>
        /// <returns>
        /// <c>true</c> if a unit has been taken; <c>false</c> if the wait timed out or
        /// the pool is shutting down and the queue gave no unit.
        /// </returns>
        bool Dequeue(out TUnit unit, int timeout) {
            int ts = Environment.TickCount;

            // fast path, no locking when the queue already has a unit
            if (TryDequeue(out unit))
                return true;

            lock (m_signal) {
                bool dequeued;
                while (!(dequeued = TryDequeue(out unit)) && m_exit == 0) {
                    // Environment.TickCount wraps around, the difference is still
                    // correct in wrap-around arithmetic, so force unchecked mode
                    // regardless of the compiler settings.
                    int remaining = unchecked(ts + timeout - Environment.TickCount);
                    if (!Monitor.Wait(m_signal, Math.Max(0, remaining))) {
                        // timeout
                        return false;
                    }
                }

                // got a unit or the pool is terminating: wake one more waiter so it
                // re-checks the queue and the exit flag too
                Monitor.Pulse(m_signal);

                // A unit which has already been taken from the queue must be handed
                // to the caller even if the pool is being disposed, otherwise it is
                // lost. Without a unit the loop could end only because of the exit flag.
                return dequeued;
            }
        }

        /// <summary>
        /// Pulses the pool signal, waking one thread waiting in <see cref="Dequeue"/>, if any.
        /// </summary>
        protected void SignalThread() {
            lock (m_signal) {
                Monitor.Pulse(m_signal);
            }
        }

        #region thread slots traits

        /// <summary>
        /// Increments the worker counter unless the maximum is reached or the pool is disposed.
        /// </summary>
        /// <returns><c>true</c> if a slot has been allocated.</returns>
        bool AllocateThreadSlot() {
            int current;
            // use spins to allocate slot for the new thread
            do {
                current = m_threads;
                if (current >= m_maxThreadsLimit || m_exit == 1)
                    // no more slots left or the pool has been disposed
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));

            UpdateMaxThreads(current + 1);

            return true;
        }

        /// <summary>
        /// Makes a single attempt to move the worker counter from
        /// <paramref name="desired"/> - 1 to <paramref name="desired"/>.
        /// </summary>
        /// <param name="desired">The counter value to set.</param>
        /// <returns><c>true</c> if the counter has been changed.</returns>
        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
                return false;

            UpdateMaxThreads(desired);

            return true;
        }

        /// <summary>
        /// Decrements the worker counter unless the slot is reserved, i.e. the counter
        /// is not above the minimum and the pool is not shutting down.
        /// </summary>
        /// <param name="last">Set to <c>true</c> when the counter has been moved from 1 to 0.</param>
        /// <returns><c>true</c> if the slot has been released.</returns>
        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;
            // use spins to release the slot of the current thread
            Thread.MemoryBarrier();
            do {
                current = m_threads;
                if (current <= m_minThreadsLimit && m_exit == 0)
                    // the thread is reserved
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));

            last = (current == 1);

            return true;
        }

        /// <summary>
        /// Raises the recorded maximum pool size to <paramref name="count"/> if it is lower.
        /// </summary>
        /// <param name="count">The pool size which has just been reached.</param>
        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count)
                    break;
            } while (max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        /// <summary>
        /// Allocates a slot and starts a new background thread running <see cref="Worker"/>.
        /// </summary>
        /// <returns><c>true</c> if the thread has been started; <c>false</c> if no slot could be allocated.</returns>
        protected bool StartWorker() {
            if (AllocateThreadSlot()) {
                // slot successfully allocated
                var worker = new Thread(Worker);
                worker.IsBackground = true;
                worker.Start();

                return true;
            }
            return false;
        }

        /// <summary>
        /// Executes the specified unit of work.
        /// </summary>
        /// <param name="unit">The unit taken from the queue.</param>
        protected abstract void InvokeUnit(TUnit unit);

        /// <summary>
        /// The body of a worker thread: executes units until the queue stays empty for
        /// the release timeout (or the pool is shutting down), then tries to release
        /// its slot and exit.
        /// </summary>
        protected virtual void Worker() {
            TUnit unit;
            bool last;
            do {
                while (Dequeue(out unit, m_releaseTimeout)) {
                    InvokeUnit(unit);
                }
                if (!ReleaseThreadSlot(out last))
                    // the slot is reserved, keep waiting for units
                    continue;

                // queue may be not empty
                if (last && TryDequeue(out unit)) {
                    InvokeUnit(unit);
                    // this was the last worker, try to take the only slot back
                    if (AllocateThreadSlot(1))
                        continue;
                    // the counter is not zero anymore, so a slot has been allocated
                    // elsewhere: we can safely exit since pool is alive
                }
                break;
            } while (true);
        }

        /// <summary>
        /// Sets the exit flag and wakes a waiting worker when called from <see cref="Dispose()"/>.
        /// Only the first call has an effect; the finalizer path does nothing.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                    // wake sleeping threads
                    SignalThread();
                    GC.SuppressFinalize(this);
                }
            }
        }

        /// <summary>
        /// Marks the pool as shutting down so that idle workers are released.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        ~DispatchPool() {
            Dispose(false);
        }
    }
}