DispatchPool.cs
238 lines
| 7.3 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r15 | using System; | ||
using System.Collections.Generic; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
using System.Diagnostics; | ||||
namespace Implab.Parallels { | ||||
public abstract class DispatchPool<TUnit> : IDisposable { | ||||
// Thread limits, fixed by the constructors.
private readonly int m_minThreads;
private readonly int m_maxThreads;

// Number of thread slots currently allocated; changed only through Interlocked operations.
private int m_runningThreads;
// Largest value m_runningThreads has been observed to reach (see UpdateMaxThreads).
private int m_maxRunningThreads;
// Number of workers currently blocked inside Suspend().
private int m_suspended;
// Becomes non-zero once Dispose(true) has been called; workers exit when they see it.
private int m_exitRequired;

// Auto-reset event used to wake a single sleeping worker.
private AutoResetEvent m_hasTasks = new AutoResetEvent(false);
/// <summary>
/// Stores the thread limits of the pool. No threads are started here; see <see cref="InitPool"/>.
/// </summary>
/// <param name="min">Minimum number of threads; must not be negative. A value greater than <paramref name="max"/> is lowered to <paramref name="max"/>.</param>
/// <param name="max">Maximum number of threads; must be greater than zero.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
protected DispatchPool(int min, int max) {
    if (min < 0) {
        throw new ArgumentOutOfRangeException("min");
    }
    if (max <= 0) {
        throw new ArgumentOutOfRangeException("max");
    }

    // the minimum can never exceed the maximum
    m_minThreads = Math.Min(min, max);
    m_maxThreads = max;
}
/// <summary>
/// Creates a pool whose minimum and maximum thread counts are both <paramref name="threads"/>.
/// </summary>
/// <param name="threads">Value passed as both limits to the two-argument constructor.</param>
protected DispatchPool(int threads) : this(threads, threads) {
}
/// <summary>
/// Creates a pool with no reserved threads whose upper limit is the worker-thread maximum
/// reported by <see cref="ThreadPool.GetMaxThreads"/>.
/// </summary>
protected DispatchPool() {
    int workerLimit, completionPortLimit;
    // only the worker-thread limit is used; the completion-port limit is ignored
    ThreadPool.GetMaxThreads(out workerLimit, out completionPortLimit);

    m_maxThreads = workerLimit;
    m_minThreads = 0;
}
/// <summary>
/// Attempts to start one worker for each of the minimum number of threads.
/// The result of each attempt is ignored.
/// </summary>
protected void InitPool() {
    int remaining = m_minThreads;
    while (remaining-- > 0) {
        StartWorker();
    }
}
/// <summary>
/// Number of thread slots currently allocated in the pool.
/// </summary>
public int ThreadCount {
    get { return m_runningThreads; }
}
/// <summary>
/// Highest number of allocated thread slots recorded so far.
/// </summary>
public int MaxRunningThreads {
    get { return m_maxRunningThreads; }
}
/// <summary>
/// True once the exit flag has been set by <see cref="Dispose(bool)"/>.
/// </summary>
protected bool IsDisposed {
    get { return m_exitRequired != 0; }
}
cin
|
/// <summary>
/// Tries to take the next unit of work; called by the workers on every fetch.
/// </summary>
/// <param name="unit">Receives the dequeued unit when the method returns true.</param>
/// <returns>true if a unit was obtained; false if there is nothing to do right now.</returns>
protected abstract bool TryDequeue(out TUnit unit);
/// <summary>
/// Brings one more worker into play: wakes a sleeping worker if any is suspended,
/// otherwise tries to start a new thread.
/// </summary>
/// <returns>true if a sleeping worker was signalled or a new worker was started; otherwise false.</returns>
protected virtual bool ExtendPool() {
    if (m_suspended <= 0)
        return StartWorker();

    // at least one worker is sleeping, waking it is enough
    m_hasTasks.Set();
    return true;
}
/// <summary>
/// Sets the event that wakes one sleeping thread and, if the pool had no threads at all,
/// starts the first one.
/// </summary>
protected void WakePool() {
    // wake a sleeping thread
    m_hasTasks.Set();

    // a new thread is started only on the 0 -> 1 transition of the running thread count
    if (!AllocateThreadSlot(1))
        return;

    var thread = new Thread(this.Worker) { IsBackground = true };
    thread.Start();
}
/// <summary>
/// Blocks the calling worker until the wake-up event is signalled.
/// Derived classes may override this to change how an idle worker waits.
/// </summary>
protected virtual void Suspend() {
    m_hasTasks.WaitOne();
}
#region thread slots traits | ||||
/// <summary>
/// Reserves a slot for one more thread by incrementing the running thread count
/// with a compare-and-swap retry loop.
/// </summary>
/// <returns>
/// true if the slot was reserved; false if the maximum has been reached or the pool has been disposed.
/// </returns>
bool AllocateThreadSlot() {
    for (;;) {
        int observed = m_runningThreads;

        // no more slots left or the pool has been disposed
        if (observed >= m_maxThreads || m_exitRequired != 0)
            return false;

        // retry when another thread changed the counter in the meantime
        if (Interlocked.CompareExchange(ref m_runningThreads, observed + 1, observed) != observed)
            continue;

        UpdateMaxThreads(observed + 1);
        return true;
    }
}
cin
|
r15 | |||
cin
|
/// <summary>
/// Reserves a slot only if doing so makes the running thread count exactly
/// <paramref name="desired"/>, i.e. the count is currently <c>desired - 1</c>.
/// A single attempt is made, with no retry.
/// </summary>
/// <param name="desired">The thread count the pool must have after the allocation.</param>
/// <returns>true if the slot was reserved; otherwise false.</returns>
bool AllocateThreadSlot(int desired) {
    int expected = desired - 1;

    if (Interlocked.CompareExchange(ref m_runningThreads, desired, expected) == expected) {
        UpdateMaxThreads(desired);
        return true;
    }

    return false;
}
/// <summary>
/// Gives up the slot of the calling thread by decrementing the running thread count
/// with a compare-and-swap retry loop. While the pool is not disposed, the slot is kept
/// when the count is not above the minimum.
/// </summary>
/// <param name="last">Set to true when the released slot was the only one left.</param>
/// <returns>true if the slot was released; false if the thread is reserved and must stay.</returns>
bool ReleaseThreadSlot(out bool last) {
    last = false;

    for (;;) {
        int observed = m_runningThreads;

        // the thread is reserved
        if (observed <= m_minThreads && m_exitRequired == 0)
            return false;

        // retry when another thread changed the counter in the meantime
        if (Interlocked.CompareExchange(ref m_runningThreads, observed - 1, observed) != observed)
            continue;

        last = observed == 1;
        return true;
    }
}
cin
|
/// <summary>
/// Releases a thread slot unconditionally; used during cleanup.
/// </summary>
/// <returns>true if no threads are left after the release.</returns>
bool ReleaseThreadSlotAnyway() {
    return Interlocked.Decrement(ref m_runningThreads) == 0;
}
cin
|
/// <summary>
/// Raises the recorded maximum of running threads to <paramref name="count"/>
/// if it is currently lower; never lowers it.
/// </summary>
/// <param name="count">Running thread count that has just been reached.</param>
void UpdateMaxThreads(int count) {
    for (;;) {
        int seen = m_maxRunningThreads;

        // an equal or higher maximum is already recorded
        if (seen >= count)
            return;

        // stop once our value is stored, otherwise re-read and try again
        if (Interlocked.CompareExchange(ref m_maxRunningThreads, count, seen) == seen)
            return;
    }
}
cin
|
r17 | #endregion | ||
/// <summary>
/// Starts a new background worker thread if a thread slot can be allocated.
/// </summary>
/// <returns>true if a thread was started; false if no slot was available.</returns>
bool StartWorker() {
    if (!AllocateThreadSlot())
        return false;

    // slot successfully allocated
    var thread = new Thread(this.Worker) { IsBackground = true };
    thread.Start();
    return true;
}
cin
|
/// <summary>
/// Obtains the next unit of work for the calling worker. When there is nothing to do
/// the worker either gives up its slot (the method returns false) or sleeps until it
/// is woken and then tries again.
/// </summary>
/// <param name="unit">Receives the unit to execute when the method returns true.</param>
/// <returns>true if a unit was fetched; false if the calling worker must terminate.</returns>
bool FetchTask(out TUnit unit) {
    while (true) {
        // exit if requested
        if (m_exitRequired != 0) {
            // release the thread slot; the last worker to leave disposes the event,
            // any other worker passes the wake-up signal on to the next one
            bool noWorkersLeft = ReleaseThreadSlotAnyway();
            if (noWorkersLeft)
                m_hasTasks.Dispose();
            else
                m_hasTasks.Set();

            unit = default(TUnit);
            return false;
        }

        // fetch task
        if (TryDequeue(out unit)) {
            ExtendPool();
            return true;
        }

        // no tasks left, exit if the thread is no longer needed
        bool wasLast;
        if (ReleaseThreadSlot(out wasLast)) {
            // the event is polled only by the last thread; a pending signal is consumed here
            if (!wasLast || !m_hasTasks.WaitOne(0))
                return false;

            // a signal was pending, try to take the slot back and spin again
            if (AllocateThreadSlot(1))
                continue;

            // we failed to reallocate slot for this thread
            // therefore we need to release the event
            m_hasTasks.Set();
            return false;
        }

        // the slot is kept: enter the suspended state and wait to be woken
        Interlocked.Increment(ref m_suspended);
        Suspend();
        Interlocked.Decrement(ref m_suspended);
    }
}
/// <summary>
/// Executes a unit of work; called on a worker thread for every fetched unit.
/// </summary>
/// <param name="unit">The unit to execute.</param>
protected abstract void InvokeUnit(TUnit unit);
/// <summary>
/// Body of a worker thread: fetches and executes units until <see cref="FetchTask"/>
/// reports that the thread must terminate.
/// </summary>
void Worker() {
    for (;;) {
        TUnit unit;
        if (!FetchTask(out unit))
            break;
        InvokeUnit(unit);
    }
}
/// <summary>
/// Requests the pool to shut down. Only the first call with <paramref name="disposing"/>
/// set to true has an effect: it sets the exit flag, signals the wake-up event and
/// suppresses finalization.
/// </summary>
/// <param name="disposing">true when called from <see cref="Dispose()"/>; false when called from the finalizer, in which case nothing is done.</param>
protected virtual void Dispose(bool disposing) {
    if (!disposing)
        return;

    // cheap check first, then make sure only one caller wins the flag
    if (m_exitRequired != 0)
        return;
    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
        return;

    // wake sleeping threads
    m_hasTasks.Set();
    GC.SuppressFinalize(this);
}
/// <summary>
/// Requests the pool to shut down by calling <see cref="Dispose(bool)"/> with true.
/// </summary>
public void Dispose() {
    Dispose(true);
}
/// <summary>
/// Finalizer; calls <see cref="Dispose(bool)"/> with false.
/// </summary>
~DispatchPool() {
    Dispose(false);
}
} | ||||
} | ||||