WorkerPool.cs
171 lines
| 4.8 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r12 | using System; | ||
using System.Collections.Generic; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
using System.Diagnostics; | ||||
namespace Implab.Parallels {
    /// <summary>
    /// A pool of dedicated background threads that runs queued delegates and reports
    /// each outcome through a <c>Promise&lt;T&gt;</c>.
    /// </summary>
    /// <remarks>
    /// The pool starts <c>min</c> workers up front and grows up to <c>max</c> workers
    /// when tasks pile up in the queue. A worker that finds the queue empty retires
    /// as long as more than <c>min</c> workers are running. Tasks that are still
    /// queued when the pool is disposed are never executed by this pool.
    /// </remarks>
    public class WorkerPool : IDisposable {
        readonly int m_minThreads;
        readonly int m_maxThreads;

        // number of allocated worker slots; modified only through Interlocked operations
        int m_runningThreads;

        readonly object m_lock = new object();

        // written by Dispose and read by worker threads, hence volatile
        volatile bool m_disposed = false;

        // this event signals that workers can try to fetch a task from the queue
        // or that the pool has been disposed
        readonly ManualResetEvent m_hasTasks = new ManualResetEvent(false);

        // pending tasks; the queue instance also serves as the lock guarding it
        readonly Queue<Action> m_queue = new Queue<Action>();

        /// <summary>
        /// Creates a pool and immediately starts the minimum number of workers.
        /// </summary>
        /// <param name="min">Number of workers kept alive; values above <paramref name="max"/> are clamped to it.</param>
        /// <param name="max">Upper limit of simultaneously running workers.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="min"/> is negative or <paramref name="max"/> is not positive.
        /// </exception>
        public WorkerPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;

            m_minThreads = min;
            m_maxThreads = max;

            InitPool();
        }

        /// <summary>
        /// Creates a pool with no permanently running workers and the specified upper limit.
        /// </summary>
        /// <param name="max">Upper limit of simultaneously running workers.</param>
        public WorkerPool(int max)
            : this(0, max) {
        }

        /// <summary>
        /// Creates a pool with no permanently running workers whose upper limit is the
        /// worker thread maximum reported by <see cref="ThreadPool.GetMaxThreads"/>.
        /// </summary>
        public WorkerPool() {
            int maxThreads, maxCP;
            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);

            m_minThreads = 0;
            m_maxThreads = maxThreads;

            InitPool();
        }

        // Starts the workers that must be running from the very beginning.
        void InitPool() {
            for (int i = 0; i < m_minThreads; i++)
                StartWorker();
        }

        /// <summary>
        /// Gets the number of worker slots currently allocated in the pool.
        /// </summary>
        public int ThreadCount {
            get {
                return m_runningThreads;
            }
        }

        /// <summary>
        /// Queues <paramref name="task"/> for execution on a pool thread.
        /// </summary>
        /// <typeparam name="T">Type of the value produced by the task.</typeparam>
        /// <param name="task">The delegate to execute.</param>
        /// <returns>
        /// A promise that is resolved with the value returned by the task or rejected
        /// with the exception the task has thrown.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (m_disposed)
                throw new ObjectDisposedException(ToString());
            if (task == null)
                throw new ArgumentNullException("task");

            var promise = new Promise<T>();

            var queueLen = EnqueueTask(delegate() {
                try {
                    promise.Resolve(task());
                } catch (Exception e) {
                    promise.Reject(e);
                }
            });

            // Grow the pool when tasks pile up. Additionally make sure that at least one
            // worker exists, otherwise the first task of a pool created with min == 0
            // would stay in the queue until another task arrives. The compare-exchange
            // with equal operands is used purely as an atomic, fenced read.
            if (queueLen > 1 || Interlocked.CompareExchange(ref m_runningThreads, 0, 0) == 0)
                StartWorker();

            return promise;
        }

        // Allocates a slot for a new worker and starts its thread.
        // Returns false when the pool already runs the maximum number of workers.
        bool StartWorker() {
            var current = m_runningThreads;

            // spin until the slot is allocated; after a lost race continue with the
            // value actually observed by the compare-exchange instead of the stale one
            while (true) {
                if (current >= m_maxThreads)
                    // no more slots left
                    return false;

                var seen = Interlocked.CompareExchange(ref m_runningThreads, current + 1, current);
                if (seen == current)
                    break;
                current = seen;
            }

            // slot successfully allocated
            try {
                var worker = new Thread(this.Worker);
                worker.IsBackground = true;
                worker.Start();
            } catch {
                // the thread wasn't started, give the slot back
                Interlocked.Decrement(ref m_runningThreads);
                throw;
            }

            return true;
        }

        // Releases the slot of the calling worker unless this would leave the pool
        // with fewer than the minimum number of workers. Returns true if the slot
        // has been released and the worker must exit.
        bool TryReleaseSlot() {
            var current = m_runningThreads;

            while (current > m_minThreads) {
                var seen = Interlocked.CompareExchange(ref m_runningThreads, current - 1, current);
                if (seen == current)
                    return true;
                current = seen;
            }

            return false;
        }

        // Appends the task to the queue, wakes up the workers and returns the queue
        // length right after the task has been added.
        int EnqueueTask(Action task) {
            Debug.Assert(task != null);
            lock (m_queue) {
                m_queue.Enqueue(task);
                m_hasTasks.Set();
                return m_queue.Count;
            }
        }

        // Blocks until a task is available. Returns false when the calling worker
        // must exit: either the pool has been disposed or the worker has retired.
        // In both cases the worker's slot is already released on return.
        bool FetchTask(out Action task) {
            task = null;

            while (true) {
                m_hasTasks.WaitOne();

                if (m_disposed)
                    break;

                lock (m_queue) {
                    if (m_queue.Count > 0) {
                        task = m_queue.Dequeue();
                        return true;
                    }

                    // signal that no more tasks are left, the current lock ensures that
                    // this won't suppress the signal of a newly added task
                    m_hasTasks.Reset();

                    if (m_disposed) {
                        // Dispose may have signalled between the check above and the
                        // reset, restore the signal so every worker gets released
                        m_hasTasks.Set();
                        break;
                    }

                    // The slot is released while the queue lock is held and the queue is
                    // known to be empty: a producer enqueueing afterwards observes the
                    // decreased counter and starts a new worker when none is left.
                    if (TryReleaseSlot())
                        return false;
                }

                // this worker stays in the pool and waits for new events
            }

            // the pool is disposed, release the slot so ThreadCount reflects the exit
            Interlocked.Decrement(ref m_runningThreads);
            return false;
        }

        // Thread procedure of a worker: executes tasks until told to exit.
        void Worker() {
            Action task;
            while (FetchTask(out task))
                task();
        }

        /// <summary>
        /// Marks the pool as disposed and wakes up all workers so they can exit.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called
        /// from the finalizer, in which case nothing is done.
        /// </param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                lock (m_lock) {
                    if (m_disposed)
                        return;
                    m_disposed = true;
                }

                // the event is left signalled, every worker sees the disposed flag and exits
                m_hasTasks.Set();
                GC.SuppressFinalize(this);
            }
        }

        /// <summary>
        /// Stops the pool. Subsequent calls have no effect.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        ~WorkerPool() {
            Dispose(false);
        }
    }
}