##// END OF EJS Templates
Added 'Fail' method to RunnableComponent which allows component to move from...
Added 'Fail' method to RunnableComponent, which allows a component to move from the Running to the Failed state. Added PollingComponent, a timer-based runnable component. More tests. Added FailPromise, a thin class to wrap exceptions. Fixed error handling in SuccessPromise classes.

File last commit:

r125:f803565868a4 v2
r203:4d9830a9bbb8 v2
Show More
DispatchPool.cs
197 lines | 6.0 KiB | text/x-csharp | CSharpLexer
using System;
using System.Threading;
namespace Implab.Parallels {
public abstract class DispatchPool<TUnit> : IDisposable {
// Lower bound on the pool size: ReleaseThreadSlot refuses to give up a slot while the
// pool is at or below this count and has not been disposed.
readonly int m_minThreadsLimit;
// Upper bound on the pool size: AllocateThreadSlot() refuses to grow the pool past it.
readonly int m_maxThreadsLimit;
readonly int m_releaseTimeout = 1000; // how long (in milliseconds) an idle worker waits for a new unit before it tries to release its slot
int m_threads; // the current size of the pool (number of allocated thread slots)
int m_maxRunningThreads; // the maximum size the pool has reached so far
int m_exit; // 0 while the pool is alive; set to 1 by Dispose, after which unused workers are released
readonly object m_signal = new object(); // monitor on which idle workers wait; pulsed to wake them
/// <summary>
/// Creates a pool whose size is bounded by <paramref name="min"/> and <paramref name="max"/>.
/// No worker is started here; see <c>InitPool</c>.
/// </summary>
/// <param name="min">Number of reserved workers; must not be negative. A value greater than
/// <paramref name="max"/> is lowered to <paramref name="max"/>.</param>
/// <param name="max">Largest allowed number of workers; must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or
/// <paramref name="max"/> is not positive.</exception>
protected DispatchPool(int min, int max) {
    if (min < 0) {
        throw new ArgumentOutOfRangeException("min");
    }
    if (max <= 0) {
        throw new ArgumentOutOfRangeException("max");
    }

    // the lower limit can never exceed the upper one
    m_maxThreadsLimit = max;
    m_minThreadsLimit = Math.Min(min, max);
}
/// <summary>
/// Creates a pool with a fixed size: <paramref name="threads"/> is passed to the
/// two-argument constructor as both the minimum and the maximum limit.
/// </summary>
/// <param name="threads">The number of workers; validated by the two-argument constructor.</param>
protected DispatchPool(int threads)
: this(threads, threads) {
}
/// <summary>
/// Creates a pool with no reserved workers and at most
/// <see cref="Environment.ProcessorCount"/> workers.
/// </summary>
protected DispatchPool()
    : this(0, Environment.ProcessorCount) {
    // ProcessorCount is always positive, so the range checks in the chained
    // constructor pass and both limits are stored as given.
}
/// <summary>
/// Requests one worker per reserved slot, i.e. calls <c>StartWorker</c>
/// as many times as the minimum threads limit.
/// </summary>
protected void InitPool() {
    int remaining = m_minThreadsLimit;
    while (remaining-- > 0) {
        StartWorker();
    }
}
/// <summary>
/// The current number of allocated thread slots, read after a full memory barrier.
/// </summary>
public int PoolSize {
    get {
        Thread.MemoryBarrier();
        int size = m_threads;
        return size;
    }
}
/// <summary>
/// The largest pool size recorded so far, read after a full memory barrier.
/// </summary>
public int MaxRunningThreads {
    get {
        Thread.MemoryBarrier();
        int peak = m_maxRunningThreads;
        return peak;
    }
}
/// <summary>
/// <c>true</c> once the exit flag has been set to 1 (which <c>Dispose(true)</c> does);
/// the flag is read after a full memory barrier.
/// </summary>
protected bool IsDisposed {
    get {
        Thread.MemoryBarrier();
        int exitFlag = m_exit;
        return 1 == exitFlag;
    }
}
/// <summary>
/// Attempts to take the next unit of work from the queue owned by the derived class.
/// The pool calls this both while holding its internal monitor and without it, and
/// every worker thread calls it, so implementations should expect concurrent calls.
/// </summary>
/// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
/// <returns><c>true</c> if a unit was taken; otherwise <c>false</c>.</returns>
protected abstract bool TryDequeue(out TUnit unit);
/// <summary>
/// Takes the next unit from the queue, waiting on the pool monitor for up to
/// <paramref name="timeout"/> milliseconds when the queue is empty.
/// </summary>
/// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
/// <param name="timeout">Total time to wait, in milliseconds, measured from the call.</param>
/// <returns>
/// <c>true</c> if a unit was dequeued; <c>false</c> if the wait timed out or the pool
/// is shutting down and no unit could be taken.
/// </returns>
bool Dequeue(out TUnit unit, int timeout) {
    int started = Environment.TickCount;

    // fast path: take a unit without touching the monitor
    if (TryDequeue(out unit))
        return true;

    lock (m_signal) {
        bool dequeued = TryDequeue(out unit);
        while (!dequeued && m_exit == 0) {
            // TickCount wraps around; the unchecked difference is still the elapsed
            // time and cannot throw when the assembly is compiled with overflow checks
            int remaining = unchecked(timeout - (Environment.TickCount - started));
            if (!Monitor.Wait(m_signal, Math.Max(0, remaining))) {
                // timeout
                return false;
            }
            dequeued = TryDequeue(out unit);
        }

        // a unit was taken or the pool is terminating: hand the signal over
        // to the next waiting worker so it can re-check the same conditions
        Monitor.Pulse(m_signal);

        // A unit that has already been removed from the queue must reach the caller
        // even if the pool was disposed in the meantime, otherwise it would be lost;
        // this matches the fast path above, which does not look at m_exit either.
        return dequeued;
    }
}
/// <summary>
/// Wakes one worker waiting on the pool monitor, if any.
/// </summary>
protected void SignalThread() {
    bool lockTaken = false;
    try {
        Monitor.Enter(m_signal, ref lockTaken);
        Monitor.Pulse(m_signal);
    } finally {
        if (lockTaken) {
            Monitor.Exit(m_signal);
        }
    }
}
#region thread slots traits
/// <summary>
/// Reserves a slot for one more worker by atomically incrementing the pool size.
/// </summary>
/// <returns>
/// <c>true</c> if the slot was reserved; <c>false</c> if the pool is already at its
/// maximum size or has been disposed.
/// </returns>
bool AllocateThreadSlot() {
    // compare-and-swap loop: retry until the increment is applied to an unchanged value
    for (;;) {
        int observed = m_threads;

        bool canGrow = observed < m_maxThreadsLimit && m_exit != 1;
        if (!canGrow) {
            // no more slots left or the pool has been disposed
            return false;
        }

        int grown = observed + 1;
        if (Interlocked.CompareExchange(ref m_threads, grown, observed) == observed) {
            UpdateMaxThreads(grown);
            return true;
        }
    }
}
/// <summary>
/// Tries to move the pool size from <c>desired - 1</c> to <paramref name="desired"/>
/// in a single atomic step, without retrying.
/// </summary>
/// <param name="desired">The pool size to establish.</param>
/// <returns>
/// <c>true</c> if the size was exactly <c>desired - 1</c> and has been updated;
/// otherwise <c>false</c>.
/// </returns>
bool AllocateThreadSlot(int desired) {
    int expected = desired - 1;
    int observed = Interlocked.CompareExchange(ref m_threads, desired, expected);

    bool swapped = observed == expected;
    if (swapped) {
        UpdateMaxThreads(desired);
    }
    return swapped;
}
/// <summary>
/// Gives back the slot of the calling worker by atomically decrementing the pool size.
/// </summary>
/// <param name="last">Set to <c>true</c> when the released slot was the only one left
/// (the size went from 1 to 0); <c>false</c> otherwise.</param>
/// <returns>
/// <c>true</c> if the slot was released; <c>false</c> if the pool is alive and its size
/// is at or below the minimum limit, so the worker has to stay.
/// </returns>
bool ReleaseThreadSlot(out bool last) {
    last = false;
    Thread.MemoryBarrier();

    // compare-and-swap loop: retry until the decrement is applied to an unchanged value
    while (true) {
        int observed = m_threads;

        bool reserved = observed <= m_minThreadsLimit && m_exit == 0;
        if (reserved) {
            // the thread is reserved
            return false;
        }

        if (Interlocked.CompareExchange(ref m_threads, observed - 1, observed) == observed) {
            last = observed == 1;
            return true;
        }
    }
}
/// <summary>
/// Raises the recorded peak pool size to <paramref name="count"/> unless the
/// recorded value is already at least that large.
/// </summary>
/// <param name="count">A pool size that has just been reached.</param>
void UpdateMaxThreads(int count) {
    for (;;) {
        int peak = m_maxRunningThreads;
        if (peak >= count) {
            return;
        }
        // publish the new peak only if nobody changed the value in between
        if (Interlocked.CompareExchange(ref m_maxRunningThreads, count, peak) == peak) {
            return;
        }
    }
}
#endregion
/// <summary>
/// Starts one more background worker thread if a thread slot can be allocated.
/// </summary>
/// <returns>
/// <c>true</c> if a worker was started; <c>false</c> if no slot could be allocated
/// (the pool is at its maximum size or has been disposed).
/// </returns>
/// <remarks>
/// If the thread cannot be created or started, the allocated slot is given back
/// and the exception is rethrown.
/// </remarks>
protected bool StartWorker() {
    if (!AllocateThreadSlot())
        return false;

    // slot successfully allocated
    try {
        var worker = new Thread(Worker);
        worker.IsBackground = true;
        worker.Start();
    } catch {
        // The worker never ran, so nothing else will release its slot. Decrement
        // directly: ReleaseThreadSlot would refuse to drop a reserved slot.
        Interlocked.Decrement(ref m_threads);
        throw;
    }
    return true;
}
/// <summary>
/// Executes a single unit of work. Called on a worker thread for every unit the
/// worker dequeues; the call is not wrapped in any exception handling by the pool.
/// </summary>
/// <param name="unit">The unit obtained from <c>TryDequeue</c>.</param>
protected abstract void InvokeUnit(TUnit unit);
/// <summary>
/// Body of a worker thread: processes units until the queue stays empty for the
/// release timeout (or the pool shuts down) and the thread slot can be released.
/// </summary>
protected virtual void Worker() {
    while (true) {
        TUnit unit;

        // drain the queue; Dequeue returns false on timeout or shutdown
        while (Dequeue(out unit, m_releaseTimeout)) {
            InvokeUnit(unit);
        }

        bool wasLast;
        if (!ReleaseThreadSlot(out wasLast)) {
            // the slot is reserved, keep serving the queue
            continue;
        }

        // The slot is released. If this was the last worker, the queue may have
        // received a unit in the meantime that nobody else would pick up.
        if (!wasLast || !TryDequeue(out unit)) {
            return;
        }

        InvokeUnit(unit);

        // try to become the first worker again; if the size is no longer 0,
        // another worker has been started and this one can exit
        if (!AllocateThreadSlot(1)) {
            return;
        }
    }
}
/// <summary>
/// Marks the pool as shutting down and wakes a waiting worker. Only the first call
/// with <paramref name="disposing"/> set to <c>true</c> has any effect.
/// </summary>
/// <param name="disposing"><c>true</c> when called from <c>Dispose()</c>;
/// <c>false</c> when called from the finalizer, in which case nothing is done.</param>
protected virtual void Dispose(bool disposing) {
    if (!disposing) {
        return;
    }

    // atomically flip the exit flag 0 -> 1 (implies memory barrier);
    // a non-zero previous value means the pool is already disposed
    int previous = Interlocked.CompareExchange(ref m_exit, 1, 0);
    if (previous != 0) {
        return;
    }

    // wake sleeping threads
    SignalThread();
    GC.SuppressFinalize(this);
}
/// <summary>
/// Shuts the pool down by calling <c>Dispose(true)</c>.
/// </summary>
public void Dispose() {
Dispose(true);
}
// Finalizer: forwards to Dispose(false). The base implementation does nothing when
// 'disposing' is false, so this only matters for derived classes that override Dispose(bool).
~DispatchPool() {
Dispose(false);
}
}
}