DispatchPool.cs
302 lines
| 9.9 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r15 | using System; | ||
using System.Collections.Generic; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
using System.Diagnostics; | ||||
namespace Implab.Parallels { | ||||
public abstract class DispatchPool<TUnit> : IDisposable { | ||||
// lower bound of the pool size: ReleaseThreadSlot keeps this many slots while the pool is alive
readonly int m_minThreads;
// upper bound of the pool size: AllocateThreadSlot never goes beyond it
readonly int m_maxThreads;

int m_createdThreads;       // the current size of the pool (allocated thread slots)
int m_activeThreads;        // the number of workers that are currently not suspended
int m_sleepingThreads;      // the number of workers currently waiting on m_hasTasks
int m_maxRunningThreads;    // the maximum size the pool has reached so far
int m_exitRequired;         // non-zero once the pool is shutting down; unused workers are released
int m_releaseTimeout = 100; // how long (ms) an idle worker waits for a new task before it tries to exit
int m_wakeEvents;           // the number of pending wake requests

// signalled while there are pending wake requests; wakes one waiting worker at a time
AutoResetEvent m_hasTasks = new AutoResetEvent(false);
/// <summary>
/// Creates a pool with the specified size limits. No threads are started here.
/// </summary>
/// <param name="min">The number of thread slots kept reserved; clamped to <paramref name="max"/> when larger.</param>
/// <param name="max">The maximum number of threads in the pool, must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
protected DispatchPool(int min, int max) {
    if (min < 0) {
        throw new ArgumentOutOfRangeException("min");
    }
    if (max <= 0) {
        throw new ArgumentOutOfRangeException("max");
    }

    // a minimum above the maximum is silently clamped rather than rejected
    m_minThreads = min > max ? max : min;
    m_maxThreads = max;
}
/// <summary>
/// Creates a pool whose minimum and maximum sizes are both <paramref name="threads"/>.
/// </summary>
/// <param name="threads">The value used for both the minimum and the maximum size of the pool.</param>
protected DispatchPool(int threads)
    : this(threads, threads) {
}
/// <summary>
/// Creates a pool with no reserved threads and the maximum size taken from
/// the worker thread limit reported by <see cref="ThreadPool.GetMaxThreads"/>.
/// </summary>
protected DispatchPool() {
    int workerLimit, completionPortLimit;
    // only the worker thread limit is used, the completion port limit is ignored
    ThreadPool.GetMaxThreads(out workerLimit, out completionPortLimit);

    m_maxThreads = workerLimit;
    m_minThreads = 0;
}
/// <summary>
/// Attempts to start one worker per reserved (minimum) thread slot.
/// </summary>
protected void InitPool() {
    int remaining = m_minThreads;
    while (remaining > 0) {
        StartWorker();
        remaining--;
    }
}
cin
|
/// <summary>
/// The current number of allocated thread slots.
/// </summary>
public int PoolSize {
    get { return m_createdThreads; }
}
/// <summary>
/// The number of workers that are currently not suspended.
/// </summary>
public int ActiveThreads {
    get { return m_activeThreads; }
}
/// <summary>
/// The largest size the pool has reached so far.
/// </summary>
public int MaxRunningThreads {
    get { return m_maxRunningThreads; }
}
/// <summary>
/// <c>true</c> once shutdown of the pool has been requested.
/// </summary>
protected bool IsDisposed {
    get { return m_exitRequired != 0; }
}
cin
|
/// <summary>
/// Called by a worker thread to fetch the next unit of work.
/// </summary>
/// <param name="unit">The fetched unit; the worker passes it to <see cref="InvokeUnit"/> when the method returns <c>true</c>.</param>
/// <returns><c>true</c> if a unit has been fetched; <c>false</c> makes the worker suspend.</returns>
protected abstract bool TryDequeue(out TUnit unit);
cin
|
r21 | #region thread execution traits | ||
/// <summary>
/// Registers one more wake request and sets the wake event if this is the
/// only pending request.
/// </summary>
/// <returns>The number of pending wake requests after the increment.</returns>
int SignalThread() {
    int pending = Interlocked.Increment(ref m_wakeEvents);

    // further requests are chained by the woken thread itself, see Sleep
    if (pending == 1) {
        m_hasTasks.Set();
    }

    return pending;
}
/// <summary>
/// Waits on the wake event and, when woken, consumes one wake request.
/// </summary>
/// <param name="timeout">The time to wait in milliseconds, passed to <see cref="WaitHandle.WaitOne(int)"/>; 0 polls the event, -1 waits indefinitely.</param>
/// <returns><c>true</c> if the event was signalled within the timeout, otherwise <c>false</c>.</returns>
bool Sleep(int timeout) {
    Interlocked.Increment(ref m_sleepingThreads);
    bool signalled = m_hasTasks.WaitOne(timeout);
    Interlocked.Decrement(ref m_sleepingThreads);

    if (!signalled) {
        return false;
    }

    // this is an auto-reset event, so only one thread at a time gets here;
    // if more wake requests are pending, pass the signal on to the next worker
    if (Interlocked.Decrement(ref m_wakeEvents) > 0) {
        m_hasTasks.Set();
    }

    return true;
}
cin
|
r21 | #endregion | ||
cin
|
r17 | |||
/// <summary>
/// Either starts a new thread, if there were no threads in the pool before,
/// or sets the event that wakes one sleeping thread.
/// </summary>
/// <remarks>
/// Does nothing once shutdown has been requested. When there are no more
/// sleeping threads than pending wake requests, a new worker is requested
/// through <c>StartWorker</c> instead.
/// </remarks>
protected void GrowPool() {
    if (m_exitRequired != 0) {
        return;
    }

    if (m_sleepingThreads <= m_wakeEvents) {
        // there are no sleeping threads left to wake
        StartWorker();
        return;
    }

    // by now all sleeping threads may be gone
    SignalThread(); // wake a sleeping thread

    // we can't check whether the signal has been processed and it may take
    // some time for the thread to wake up anyway, so make sure that
    // at least one thread is running
    if (!AllocateThreadSlot(1)) {
        return;
    }

    // there were no threads in the pool
    var thread = new Thread(this.Worker) { IsBackground = true };
    thread.Start();
}
cin
|
/// <summary>
/// Called by a worker that has found no tasks: waits for a wake request and
/// decides whether the worker should keep running or exit.
/// </summary>
/// <returns><c>true</c> if the worker should continue its loop, <c>false</c> if its slot has been released and it must exit.</returns>
private bool Suspend() {
    // if threads have a timeout before releasing, wait for a wake request first
    if (m_releaseTimeout > 0 && Sleep(m_releaseTimeout)) {
        return true;
    }

    // no tasks left, try to release the unused thread
    bool wasLast;
    if (!ReleaseThreadSlot(out wasLast)) {
        // the slot is reserved, wait till infinity
        Sleep(-1);
        return true;
    }

    // in case a new task was added to the queue at the moment the last thread
    // was being released, we need to try to revoke the thread to avoid
    // the situation when the task is left unprocessed
    if (wasLast && Sleep(0)) { // Sleep(0) fetches a pending signal or returns false
        if (AllocateThreadSlot(1)) {
            return true; // spin again...
        }

        // since Sleep(0) has fetched the signal we need to reschedule it
        SignalThread();
    }

    return false;
}
#region thread slots traits | ||||
/// <summary>
/// Reserves a slot for a new thread unless the pool is full or shutting down.
/// </summary>
/// <returns><c>true</c> if the slot has been allocated, otherwise <c>false</c>.</returns>
bool AllocateThreadSlot() {
    // spin until the counter is incremented atomically or allocation is refused
    while (true) {
        int observed = m_createdThreads;

        if (observed >= m_maxThreads || m_exitRequired != 0) {
            // no more slots left or the pool has been disposed
            return false;
        }

        if (Interlocked.CompareExchange(ref m_createdThreads, observed + 1, observed) == observed) {
            UpdateMaxThreads(observed + 1);
            return true;
        }
    }
}
cin
|
r15 | |||
cin
|
/// <summary>
/// Allocates a slot only if it becomes exactly the <paramref name="desired"/>-th
/// one, i.e. the pool currently has <c>desired - 1</c> slots.
/// </summary>
/// <param name="desired">The pool size expected after the allocation.</param>
/// <returns><c>true</c> if the slot has been allocated, otherwise <c>false</c>.</returns>
bool AllocateThreadSlot(int desired) {
    int expected = desired - 1;

    // single attempt, no spinning: the pool size must match exactly
    bool allocated = Interlocked.CompareExchange(ref m_createdThreads, desired, expected) == expected;
    if (allocated) {
        UpdateMaxThreads(desired);
    }

    return allocated;
}
/// <summary>
/// Releases a thread slot unless it is one of the reserved (minimum) slots
/// of a pool that is not shutting down.
/// </summary>
/// <param name="last"><c>true</c> if the released slot was the only one left; <c>false</c> otherwise, including when nothing was released.</param>
/// <returns><c>true</c> if the slot has been released, <c>false</c> if it is reserved.</returns>
bool ReleaseThreadSlot(out bool last) {
    // spin until the counter is decremented atomically or the release is refused
    while (true) {
        int observed = m_createdThreads;

        if (observed <= m_minThreads && m_exitRequired == 0) {
            // the thread is reserved
            last = false;
            return false;
        }

        if (Interlocked.CompareExchange(ref m_createdThreads, observed - 1, observed) == observed) {
            last = observed == 1;
            return true;
        }
    }
}
cin
|
/// <summary>
/// Releases a thread slot unconditionally, used during cleanup.
/// </summary>
/// <returns><c>true</c> if no more threads are left.</returns>
bool ReleaseThreadSlotAnyway() {
    return Interlocked.Decrement(ref m_createdThreads) == 0;
}
cin
|
/// <summary>
/// Raises the recorded maximum pool size to <paramref name="count"/> if it is currently lower.
/// </summary>
/// <param name="count">The pool size that has just been reached.</param>
void UpdateMaxThreads(int count) {
    // retry until the stored maximum is at least count
    for (;;) {
        int observed = m_maxRunningThreads;

        if (observed >= count) {
            return;
        }

        if (Interlocked.CompareExchange(ref m_maxRunningThreads, count, observed) == observed) {
            return;
        }
    }
}
cin
|
r17 | #endregion | ||
/// <summary>
/// Allocates a thread slot and starts a new background worker thread in it.
/// </summary>
/// <returns><c>true</c> if the worker has been started, <c>false</c> if no slot could be allocated.</returns>
bool StartWorker() {
    if (!AllocateThreadSlot()) {
        return false;
    }

    // the slot has been allocated successfully
    var thread = new Thread(this.Worker) { IsBackground = true };
    thread.Start();
    return true;
}
cin
|
/// <summary>
/// Called on a worker thread for every unit fetched with <see cref="TryDequeue"/>.
/// </summary>
/// <param name="unit">The unit that has just been dequeued.</param>
protected abstract void InvokeUnit(TUnit unit);
/// <summary>
/// The body of a worker thread: processes units until the pool shuts down
/// or the thread is no longer needed.
/// </summary>
void Worker() {
    Interlocked.Increment(ref m_activeThreads);
    Sleep(0); // remove the wake request if the new thread has just been started

    while (true) {
        // exit if requested
        if (m_exitRequired != 0) {
            Interlocked.Decrement(ref m_activeThreads);

            // release the thread slot
            if (ReleaseThreadSlotAnyway()) {
                // it was the last worker
                m_hasTasks.Dispose();
            } else {
                SignalThread(); // wake the next worker
            }
            return;
        }

        // fetch a task
        TUnit unit;
        if (TryDequeue(out unit)) {
            InvokeUnit(unit);
            continue;
        }

        // nothing to do: enter the suspended state, keep this thread and wait
        Interlocked.Decrement(ref m_activeThreads);

        if (!Suspend()) {
            return;
        }

        Interlocked.Increment(ref m_activeThreads);
    }
}
/// <summary>
/// Requests shutdown of the pool. Only the first call made with
/// <paramref name="disposing"/> set to <c>true</c> has any effect.
/// </summary>
/// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the finalizer, in which case nothing is done.</param>
protected virtual void Dispose(bool disposing) {
    if (!disposing) {
        return;
    }

    if (m_exitRequired != 0) {
        return;
    }

    // only one caller wins the transition to the shutdown state
    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) {
        return;
    }

    if (m_createdThreads > 0) {
        // wake sleeping threads
        SignalThread();
    } else {
        // there are no workers to release the event
        m_hasTasks.Dispose();
    }

    GC.SuppressFinalize(this);
}
/// <summary>
/// Requests shutdown of the pool by calling <c>Dispose(true)</c>.
/// </summary>
public void Dispose() {
    Dispose(true);
}
/// <summary>
/// Finalizer, calls <c>Dispose(false)</c>.
/// </summary>
~DispatchPool() {
    Dispose(false);
}
} | ||||
} | ||||