DispatchPool.cs
183 lines
| 5.6 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r15 | using System; | ||
using System.Collections.Generic; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
using System.Diagnostics; | ||||
namespace Implab.Parallels { | ||||
public abstract class DispatchPool<TUnit> : IDisposable { | ||||
// Lower and upper bounds on the number of worker slots; assigned only in the constructors.
readonly int m_minThreads;
readonly int m_maxThreads;

// Number of worker slots currently allocated (changed through Interlocked operations).
int m_runningThreads;

// Highest number of simultaneously allocated worker slots recorded so far.
int m_maxRunningThreads;

// Number of workers that are currently inside the suspend section of FetchTask.
int m_suspended;

// Becomes non-zero once disposal of the pool has been requested.
int m_exitRequired;

// Auto-reset event used to wake a single suspended worker.
readonly AutoResetEvent m_hasTasks = new AutoResetEvent(false);
/// <summary>
/// Creates a pool whose worker count is bounded by <paramref name="min"/> and <paramref name="max"/>.
/// No threads are started by the constructor.
/// </summary>
/// <param name="min">Minimum number of threads; a value greater than <paramref name="max"/> is lowered to <paramref name="max"/>.</param>
/// <param name="max">Maximum number of threads; must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="min"/> is negative or <paramref name="max"/> is not positive.
/// </exception>
protected DispatchPool(int min, int max) {
    if (min < 0) {
        throw new ArgumentOutOfRangeException("min");
    }
    if (max <= 0) {
        throw new ArgumentOutOfRangeException("max");
    }

    m_maxThreads = max;
    // the lower bound can never exceed the upper bound
    m_minThreads = Math.Min(min, max);
}
/// <summary>
/// Creates a pool with identical minimum and maximum thread counts.
/// </summary>
/// <param name="threads">Value used for both the minimum and the maximum number of threads.</param>
protected DispatchPool(int threads) : this(threads, threads) { }
/// <summary>
/// Creates a pool with no minimum thread count and a maximum equal to the
/// worker-thread limit reported by <see cref="ThreadPool.GetMaxThreads"/>.
/// </summary>
protected DispatchPool() {
    int workerLimit, completionPortLimit;
    // only the worker-thread limit is used; the completion-port limit is ignored
    ThreadPool.GetMaxThreads(out workerLimit, out completionPortLimit);

    m_maxThreads = workerLimit;
    m_minThreads = 0;
}
/// <summary>
/// Requests one worker per minimum-thread slot. The result of each
/// start request is not inspected.
/// </summary>
protected void InitPool() {
    int remaining = m_minThreads;
    while (remaining-- > 0) {
        StartWorker();
    }
}
/// <summary>
/// Gets the current value of the allocated worker slot counter.
/// </summary>
public int ThreadCount {
    get { return m_runningThreads; }
}
/// <summary>
/// Gets the highest number of simultaneously allocated worker slots recorded so far.
/// </summary>
public int MaxRunningThreads {
    get { return m_maxRunningThreads; }
}
/// <summary>
/// Gets a value indicating whether disposal of the pool has been requested.
/// </summary>
protected bool IsDisposed {
    get { return m_exitRequired != 0; }
}
/// <summary>
/// Tries to allocate a worker slot and start a background thread running <c>Worker</c>.
/// </summary>
/// <returns>
/// <c>true</c> if a new thread was started; <c>false</c> if the maximum number of
/// threads is already allocated or disposal of the pool has been requested.
/// </returns>
bool StartWorker() {
    int current;

    // spin until a slot for the new thread is allocated
    do {
        current = m_runningThreads;
        if (current >= m_maxThreads || m_exitRequired != 0) {
            // no more slots left or the pool has been disposed
            return false;
        }
    } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));

    // Raise the high-water mark with a compare-and-swap loop: a plain
    // read/Math.Max/write sequence can lose an update when several threads
    // allocate slots at the same time.
    int allocated = current + 1;
    int recorded;
    do {
        recorded = m_maxRunningThreads;
        if (recorded >= allocated) {
            break;
        }
    } while (recorded != Interlocked.CompareExchange(ref m_maxRunningThreads, allocated, recorded));

    // slot successfully allocated
    try {
        var worker = new Thread(this.Worker);
        worker.IsBackground = true;
        worker.Start();
    } catch {
        // the thread never ran, so give the slot back before propagating the failure
        Interlocked.Decrement(ref m_runningThreads);
        throw;
    }

    return true;
}
/// <summary>
/// Attempts to take the next unit of work. Implemented by derived classes.
/// </summary>
/// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
/// <returns><c>true</c> if a unit was obtained; otherwise <c>false</c>.</returns>
protected abstract bool TryDequeue(out TUnit unit);
cin
|
/// <summary>
/// Makes one more worker available: signals the wake-up event when at least one
/// worker is suspended, otherwise tries to start a new worker thread.
/// </summary>
/// <param name="extend">Not used by this implementation.</param>
protected virtual void WakeNewWorker(bool extend) {
    if (m_suspended <= 0) {
        // nobody is sleeping, try to add a thread instead
        StartWorker();
    } else {
        m_hasTasks.Set();
    }
}
cin
|
/// <summary>
/// Starts a new thread if no thread was running before; otherwise sets the
/// event that wakes one sleeping thread.
/// </summary>
/// <remarks>
/// NOTE(review): the original body was an unfinished <c>do { }</c> loop that could
/// not compile. This implementation follows the summary above; confirm it matches
/// the intended behaviour.
/// </remarks>
protected void StartIfIdle() {
    // nothing is started or signalled once disposal has been requested
    if (m_exitRequired != 0) {
        return;
    }

    // no worker exists yet: try to start the first one
    if (m_runningThreads == 0 && StartWorker()) {
        return;
    }

    // workers already exist (or another caller won the race to start one): wake a sleeper
    m_hasTasks.Set();
}
/// <summary>
/// Blocks the calling worker until the wake-up event is signalled.
/// Derived classes may override the waiting strategy.
/// </summary>
protected virtual void Suspend() {
    m_hasTasks.WaitOne();
}
cin
|
/// <summary>
/// Obtains the next unit for the calling worker, suspending the worker while
/// there is nothing to do.
/// </summary>
/// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
/// <returns>
/// <c>true</c> if a unit was dequeued; <c>false</c> if the worker must terminate,
/// either because disposal was requested or because the thread count was above
/// the minimum and this worker released its slot.
/// </returns>
bool FetchTask(out TUnit unit) {
    while (true) {
        // exit if requested
        if (m_exitRequired != 0) {
            // release the thread slot
            var running = Interlocked.Decrement(ref m_runningThreads);
            if (running == 0) {
                // it was the last worker
                m_hasTasks.Dispose();
            } else {
                try {
                    // release next worker
                    m_hasTasks.Set();
                } catch (ObjectDisposedException) {
                    // Another worker reached zero and disposed the event between our
                    // decrement and this call; every slot is already released, so
                    // there is nobody left to wake.
                }
            }
            unit = default(TUnit);
            return false;
        }

        // fetch task
        if (TryDequeue(out unit)) {
            WakeNewWorker(true);
            return true;
        }

        // no tasks left: give up the slot if the thread is no longer needed
        bool slotReleased = false;
        while (true) {
            int runningThreads = m_runningThreads;
            if (runningThreads <= m_minThreads) {
                // at or below the minimum: keep this thread
                break;
            }
            if (runningThreads == Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)) {
                slotReleased = true;
                break;
            }
        }
        if (slotReleased) {
            return false;
        }

        // entering suspend state: keep this thread and wait
        Interlocked.Increment(ref m_suspended);
        Suspend();
        Interlocked.Decrement(ref m_suspended);
    }
}
/// <summary>
/// Processes a unit obtained from <c>TryDequeue</c>. Implemented by derived
/// classes and called on a worker thread.
/// </summary>
/// <param name="unit">The unit to process.</param>
protected abstract void InvokeUnit(TUnit unit);
/// <summary>
/// Thread entry point: processes units until <c>FetchTask</c> reports that the
/// worker has to terminate.
/// </summary>
void Worker() {
    for (;;) {
        TUnit unit;
        if (!FetchTask(out unit)) {
            return;
        }
        InvokeUnit(unit);
    }
}
/// <summary>
/// Requests shutdown of the pool. Only the first call with
/// <paramref name="disposing"/> set to <c>true</c> has an effect.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called
/// from the finalizer, in which case nothing is done.
/// </param>
protected virtual void Dispose(bool disposing) {
    if (!disposing) {
        return;
    }

    // the compare-and-swap alone decides which caller performs the shutdown
    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) {
        return;
    }

    try {
        // wake sleeping threads
        m_hasTasks.Set();
    } catch (ObjectDisposedException) {
        // A worker can observe the exit flag, become the last one to leave and
        // dispose the event before this call is reached; Dispose must not throw.
    }

    GC.SuppressFinalize(this);
}
/// <summary>
/// Requests shutdown of the pool by calling <c>Dispose(true)</c>.
/// </summary>
public void Dispose() { Dispose(true); }
/// <summary>
/// Finalizer; forwards to <c>Dispose(false)</c>.
/// </summary>
~DispatchPool() { Dispose(false); }
} | ||||
} | ||||