DispatchPool.cs
171 lines
| 5.2 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r15 | using System; | ||
using System.Collections.Generic; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
using System.Diagnostics; | ||||
namespace Implab.Parallels { | ||||
public abstract class DispatchPool<TUnit> : IDisposable { | ||||
readonly int m_minThreads; | ||||
readonly int m_maxThreads; | ||||
int m_runningThreads = 0; | ||||
int m_maxRunningThreads = 0; | ||||
int m_suspended = 0; | ||||
int m_exitRequired = 0; | ||||
AutoResetEvent m_hasTasks = new AutoResetEvent(false); | ||||
protected DispatchPool(int min, int max) { | ||||
if (min < 0) | ||||
throw new ArgumentOutOfRangeException("min"); | ||||
if (max <= 0) | ||||
throw new ArgumentOutOfRangeException("max"); | ||||
if (min > max) | ||||
min = max; | ||||
m_minThreads = min; | ||||
m_maxThreads = max; | ||||
} | ||||
protected DispatchPool(int threads) | ||||
: this(threads, threads) { | ||||
} | ||||
protected DispatchPool() { | ||||
int maxThreads, maxCP; | ||||
ThreadPool.GetMaxThreads(out maxThreads, out maxCP); | ||||
m_minThreads = 0; | ||||
m_maxThreads = maxThreads; | ||||
} | ||||
protected void InitPool() { | ||||
for (int i = 0; i < m_minThreads; i++) | ||||
StartWorker(); | ||||
} | ||||
public int ThreadCount { | ||||
get { | ||||
return m_runningThreads; | ||||
} | ||||
} | ||||
public int MaxRunningThreads { | ||||
get { | ||||
return m_maxRunningThreads; | ||||
} | ||||
} | ||||
protected bool IsDisposed { | ||||
get { | ||||
return m_exitRequired != 0; | ||||
} | ||||
} | ||||
bool StartWorker() { | ||||
var current = m_runningThreads; | ||||
// use spins to allocate slot for the new thread | ||||
do { | ||||
if (current >= m_maxThreads || m_exitRequired != 0) | ||||
// no more slots left or the pool has been disposed | ||||
return false; | ||||
} while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); | ||||
m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); | ||||
// slot successfully allocated | ||||
var worker = new Thread(this.Worker); | ||||
worker.IsBackground = true; | ||||
worker.Start(); | ||||
return true; | ||||
} | ||||
protected abstract bool TryDequeue(out TUnit unit); | ||||
protected virtual void WakeNewWorker() { | ||||
if (m_suspended > 0) | ||||
m_hasTasks.Set(); | ||||
else | ||||
StartWorker(); | ||||
} | ||||
bool FetchTask(out TUnit unit) { | ||||
do { | ||||
// exit if requested | ||||
if (m_exitRequired != 0) { | ||||
// release the thread slot | ||||
int running; | ||||
do { | ||||
running = m_runningThreads; | ||||
} while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); | ||||
running--; | ||||
if (running == 0) // it was the last worker | ||||
m_hasTasks.Dispose(); | ||||
else | ||||
m_hasTasks.Set(); // release next worker | ||||
unit = default(TUnit); | ||||
return false; | ||||
} | ||||
// fetch task | ||||
if (TryDequeue(out unit)) { | ||||
WakeNewWorker(); | ||||
return true; | ||||
} | ||||
//no tasks left, exit if the thread is no longer needed | ||||
int runningThreads; | ||||
bool exit = true; | ||||
do { | ||||
runningThreads = m_runningThreads; | ||||
if (runningThreads <= m_minThreads) { | ||||
exit = false; | ||||
break; | ||||
} | ||||
} while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); | ||||
if (exit) { | ||||
Interlocked.Decrement(ref m_runningThreads); | ||||
return false; | ||||
} | ||||
// keep this thread and wait | ||||
Interlocked.Increment(ref m_suspended); | ||||
m_hasTasks.WaitOne(); | ||||
Interlocked.Decrement(ref m_suspended); | ||||
} while (true); | ||||
} | ||||
protected abstract void InvokeUnit(TUnit unit); | ||||
void Worker() { | ||||
TUnit unit; | ||||
while (FetchTask(out unit)) | ||||
InvokeUnit(unit); | ||||
} | ||||
protected virtual void Dispose(bool disposing) { | ||||
if (disposing) { | ||||
if (m_exitRequired == 0) { | ||||
if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) | ||||
return; | ||||
// wake sleeping threads | ||||
m_hasTasks.Set(); | ||||
GC.SuppressFinalize(this); | ||||
} | ||||
} | ||||
} | ||||
public void Dispose() { | ||||
Dispose(true); | ||||
} | ||||
~DispatchPool() { | ||||
Dispose(false); | ||||
} | ||||
} | ||||
} | ||||