DispatchPool.cs
334 lines
| 11.7 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r15 | using System; | ||
using System.Collections.Generic; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
using System.Diagnostics; | ||||
namespace Implab.Parallels { | ||||
public abstract class DispatchPool<TUnit> : IDisposable { | ||||
// Lower bound of the pool size: InitPool starts this many workers and
// ReleaseThreadSlot refuses to go below it unless the pool is shutting down.
readonly int m_minThreads;

// Upper bound of the pool size, enforced by AllocateThreadSlot().
readonly int m_maxThreads;

// the current size of the pool
int m_createdThreads;

// the count of threads which are active
int m_activeThreads;

// the count of currently inactive threads
int m_sleepingThreads;

// the maximum reached size of the pool
int m_maxRunningThreads;

// non-zero once the pool is going to shut down, all unused workers are released
int m_exitRequired;

// the timeout (in milliseconds) a worker thread waits for new tasks before it exits
int m_releaseTimeout = 100;

// the count of pending wake events
int m_wakeEvents;

// The event the sleeping workers block on; set by SignalThread.
AutoResetEvent m_hasTasks = new AutoResetEvent(false);
/// <summary>
/// Creates a pool with the specified thread limits. No threads are started here.
/// </summary>
/// <param name="min">Minimum number of threads; values greater than <paramref name="max"/> are clamped to it.</param>
/// <param name="max">Maximum number of threads; must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
protected DispatchPool(int min, int max) {
    if (min < 0) {
        throw new ArgumentOutOfRangeException("min");
    }
    if (max <= 0) {
        throw new ArgumentOutOfRangeException("max");
    }

    // the minimum can never exceed the maximum
    m_minThreads = Math.Min(min, max);
    m_maxThreads = max;
}
/// <summary>
/// Creates a pool whose minimum and maximum thread counts are both <paramref name="threads"/>.
/// </summary>
/// <param name="threads">The thread count used for both limits.</param>
protected DispatchPool(int threads) : this(threads, threads) {
}
/// <summary>
/// Creates a pool with no reserved threads and a maximum equal to the
/// worker-thread limit reported by <see cref="ThreadPool.GetMaxThreads"/>.
/// </summary>
protected DispatchPool() {
    int workerLimit;
    int completionPortLimit; // reported by the runtime but not used here
    ThreadPool.GetMaxThreads(out workerLimit, out completionPortLimit);

    m_minThreads = 0;
    m_maxThreads = workerLimit;
}
/// <summary>
/// Starts the reserved workers: one StartWorker call per minimum thread.
/// </summary>
protected void InitPool() {
    var remaining = m_minThreads;
    while (remaining-- > 0) {
        StartWorker();
    }
}
cin
|
/// <summary>
/// The number of thread slots currently allocated in the pool.
/// </summary>
public int PoolSize {
    get { return m_createdThreads; }
}
/// <summary>
/// The number of workers currently counted as active (not suspended).
/// </summary>
public int ActiveThreads {
    get { return m_activeThreads; }
}
/// <summary>
/// The largest pool size recorded so far by UpdateMaxThreads.
/// </summary>
public int MaxRunningThreads {
    get { return m_maxRunningThreads; }
}
/// <summary>
/// true once shutdown of the pool has been requested.
/// </summary>
protected bool IsDisposed {
    get { return m_exitRequired != 0; }
}
cin
|
/// <summary>
/// Tries to fetch the next unit of work. Called by the worker loop, which
/// passes the unit to InvokeUnit on success and suspends otherwise.
/// </summary>
/// <param name="unit">Receives the fetched unit when the method returns true.</param>
/// <returns>true if a unit was fetched; otherwise false.</returns>
protected abstract bool TryDequeue(out TUnit unit);
cin
|
r21 | #region thread execution traits | ||
/// <summary>
/// Registers one wake signal; the event itself is set only when the
/// counter goes from zero to one.
/// </summary>
/// <returns>The number of pending wake signals after the increment.</returns>
int SignalThread() {
    var pending = Interlocked.Increment(ref m_wakeEvents);
    if (pending == 1) {
        m_hasTasks.Set();
    }
    return pending;
}
cin
|
/// <summary>
/// Consumes one pending wake signal, blocking on the wake event until a
/// signal shows up or the timeout elapses.
/// </summary>
/// <param name="timeout">
/// Maximum total time to wait, in milliseconds; -1 waits indefinitely and
/// 0 only polls for an already pending signal.
/// </param>
/// <returns>true if a wake signal was consumed; false if the wait timed out.</returns>
bool FetchSignalOrWait(int timeout) {
    var start = Environment.TickCount;

    // The budget for the next wait. It is always derived from the caller's
    // original timeout, so the elapsed time is subtracted exactly once no
    // matter how many times the loop spins.
    var remaining = timeout;

    // Indicates that this thread owns the "lock" (it has consumed the event)
    // and, on successfully fetching a signal, must give it back so that
    // another waiting thread can proceed.
    bool hasLock = false;

    do {
        int signals;

        // atomically take one signal off the counter unless it is already zero
        do {
            signals = m_wakeEvents;
            if (signals == 0)
                break;
        } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);

        if (signals >= 1) {
            // more signals are still pending: pass the event on
            if (signals > 1 && hasLock)
                m_hasTasks.Set();
            return true;
        }

        if (timeout != Timeout.Infinite) {
            // unchecked keeps the difference correct when TickCount wraps around
            remaining = Math.Max(0, timeout - unchecked(Environment.TickCount - start));
        }

        // If there are no signals left, the first thread that gets here resets
        // the event, makes one empty pass through the loop and then blocks.
        hasLock = true;
    } while (m_hasTasks.WaitOne(remaining));

    return false;
}
cin
|
/// <summary>
/// Waits for a wake signal while being counted as a sleeping thread.
/// </summary>
/// <param name="timeout">The timeout passed on to FetchSignalOrWait.</param>
/// <returns>The result of FetchSignalOrWait: true if a signal was fetched, false on timeout.</returns>
bool Sleep(int timeout) {
    Interlocked.Increment(ref m_sleepingThreads);
    var signaled = FetchSignalOrWait(timeout);
    Interlocked.Decrement(ref m_sleepingThreads);
    return signaled;
}
cin
|
r21 | #endregion | ||
cin
|
r17 | |||
/// <summary>
/// Either wakes one sleeping worker by raising a wake signal or, when there
/// are not more sleeping workers than pending signals, tries to start a new worker.
/// Does nothing once shutdown has been requested.
/// </summary>
protected void GrowPool() {
    if (m_exitRequired != 0) {
        return;
    }

    // when every sleeping thread already has a pending signal, prefer a new worker
    var sleeperAvailable = m_sleepingThreads > m_wakeEvents;
    if (!sleeperAvailable && StartWorker()) {
        return;
    }

    // Either a sleeping thread is to be woken, or no new thread could be
    // started: the current one may be on its way to terminate and unable to
    // process the queue, so send a signal to make it spin again.
    // All sleeping threads may be gone by now and we can't check whether the
    // signal has been processed (it may take some time for a thread to start),
    // so additionally ensure that at least one thread is running.
    SignalThread();
    EnsurePoolIsAlive();
}
cin
|
/// <summary>
/// Starts a background worker if, and only if, the pool currently has no threads.
/// </summary>
protected void EnsurePoolIsAlive() {
    // the slot is granted only for the transition from zero threads to one
    if (!AllocateThreadSlot(1)) {
        return;
    }

    var worker = new Thread(this.Worker) { IsBackground = true };
    worker.Start();
}
cin
|
/// <summary>
/// Called by a worker that found no tasks. Waits for a wake signal and
/// decides whether the worker keeps running or exits.
/// </summary>
/// <returns>true if the worker should continue its loop; false if it should exit.</returns>
protected virtual bool Suspend() {
    // if threads have a timeout before releasing, give new tasks a chance to arrive
    if (m_releaseTimeout > 0 && Sleep(m_releaseTimeout)) {
        return true;
    }

    // no tasks left: try to release the unused thread
    bool last;
    if (!ReleaseThreadSlot(out last)) {
        // the slot is reserved, keep the thread and wait till infinity
        Sleep(-1);
        return true;
    }

    // In case a new task was added to the queue at the moment the last thread
    // was being released, try to revoke the thread to avoid leaving the task
    // unprocessed. FetchSignalOrWait(0) fetches a pending signal or returns false.
    if (last && FetchSignalOrWait(0)) {
        // the signal has been fetched above, so it needs to be rescheduled
        SignalThread();
        // ensure that at least one thread is alive
        return AllocateThreadSlot(1);
    }

    return false;
}
#region thread slots traits | ||||
/// <summary>
/// Reserves one more thread slot with a compare-and-swap retry loop.
/// </summary>
/// <returns>
/// true if a slot was reserved; false if the pool is at its maximum size or
/// shutdown has been requested.
/// </returns>
bool AllocateThreadSlot() {
    while (true) {
        var observed = m_createdThreads;

        // no more slots left or the pool has been disposed
        if (observed >= m_maxThreads || m_exitRequired != 0) {
            return false;
        }

        // retry if another thread changed the counter in the meantime
        if (Interlocked.CompareExchange(ref m_createdThreads, observed + 1, observed) == observed) {
            UpdateMaxThreads(observed + 1);
            return true;
        }
    }
}
cin
|
r15 | |||
cin
|
/// <summary>
/// Reserves a thread slot only if doing so brings the pool size to exactly
/// <paramref name="desired"/>, i.e. the current size is <c>desired - 1</c>.
/// </summary>
/// <param name="desired">The pool size expected after the allocation.</param>
/// <returns>true if the slot was reserved; otherwise false.</returns>
bool AllocateThreadSlot(int desired) {
    var expected = desired - 1;
    var claimed = Interlocked.CompareExchange(ref m_createdThreads, desired, expected) == expected;

    if (claimed) {
        UpdateMaxThreads(desired);
    }
    return claimed;
}
/// <summary>
/// Gives up one thread slot with a compare-and-swap retry loop, unless the
/// slot is reserved by the minimum pool size.
/// </summary>
/// <param name="last">Set to true when the released slot was the only one left; otherwise false.</param>
/// <returns>
/// true if the slot was released; false if the pool is at or below its
/// minimum size and shutdown has not been requested.
/// </returns>
bool ReleaseThreadSlot(out bool last) {
    while (true) {
        var observed = m_createdThreads;

        // the thread is reserved
        if (observed <= m_minThreads && m_exitRequired == 0) {
            last = false;
            return false;
        }

        // retry if another thread changed the counter in the meantime
        if (Interlocked.CompareExchange(ref m_createdThreads, observed - 1, observed) == observed) {
            last = observed == 1;
            return true;
        }
    }
}
cin
|
/// <summary>
/// Releases a thread slot unconditionally; used during cleanup.
/// </summary>
/// <returns>true if no more threads are left; otherwise false.</returns>
bool ReleaseThreadSlotAnyway() {
    return Interlocked.Decrement(ref m_createdThreads) == 0;
}
cin
|
/// <summary>
/// Raises the recorded maximum pool size to <paramref name="count"/> if it is
/// currently lower, using a compare-and-swap retry loop.
/// </summary>
/// <param name="count">The pool size that has just been reached.</param>
void UpdateMaxThreads(int count) {
    while (true) {
        var peak = m_maxRunningThreads;

        // the recorded maximum is already at least as large
        if (peak >= count) {
            return;
        }
        if (Interlocked.CompareExchange(ref m_maxRunningThreads, count, peak) == peak) {
            return;
        }
    }
}
cin
|
r17 | #endregion | ||
/// <summary>
/// Starts a new background worker thread if a thread slot can be allocated.
/// </summary>
/// <returns>true if a worker was started; false if no slot was available.</returns>
bool StartWorker() {
    if (!AllocateThreadSlot()) {
        return false;
    }

    // slot successfully allocated
    var thread = new Thread(this.Worker);
    thread.IsBackground = true;
    thread.Start();
    return true;
}
cin
|
/// <summary>
/// Processes a single unit of work. Called by the worker loop for every
/// unit obtained from TryDequeue.
/// </summary>
/// <param name="unit">The unit to process.</param>
protected abstract void InvokeUnit(TUnit unit);
/// <summary>
/// The body of every pool thread: processes units while they are available,
/// suspends when the queue is empty and exits either when Suspend says so or
/// when shutdown has been requested.
/// </summary>
void Worker() {
    Interlocked.Increment(ref m_activeThreads);

    // keep working until an exit is requested
    while (m_exitRequired == 0) {
        TUnit unit;

        // fetch task
        if (TryDequeue(out unit)) {
            InvokeUnit(unit);
            continue;
        }

        // nothing to do: enter the suspended state
        Interlocked.Decrement(ref m_activeThreads);
        if (!Suspend()) {
            // the thread slot has already been released by Suspend
            return;
        }
        Interlocked.Increment(ref m_activeThreads);
    }

    // exit requested: release the thread slot
    Interlocked.Decrement(ref m_activeThreads);
    if (ReleaseThreadSlotAnyway()) {
        // it was the last worker
        m_hasTasks.Dispose();
    } else {
        // wake next worker
        SignalThread();
    }
}
/// <summary>
/// Requests shutdown of the pool. Only the first call with
/// <paramref name="disposing"/> set to true has any effect.
/// </summary>
/// <param name="disposing">true when called from <see cref="Dispose()"/>; false when called from the finalizer, in which case nothing is done.</param>
protected virtual void Dispose(bool disposing) {
    if (!disposing) {
        return;
    }
    if (m_exitRequired != 0) {
        return;
    }
    // only one caller wins the right to perform the shutdown
    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) {
        return;
    }

    if (m_createdThreads > 0) {
        // wake sleeping threads
        SignalThread();
    } else {
        // there are no workers, so the event is disposed right here
        m_hasTasks.Dispose();
    }

    GC.SuppressFinalize(this);
}
/// <summary>
/// Requests shutdown of the pool; forwards to <c>Dispose(true)</c>.
/// </summary>
public void Dispose() {
    Dispose(true);
}
/// <summary>
/// Finalizer; forwards to <c>Dispose(false)</c>.
/// </summary>
~DispatchPool() {
    Dispose(false);
}
} | ||||
} | ||||