WorkerPool.cs
152 lines
| 4.5 KiB
| text/x-csharp
|
CSharpLexer
|
|
r12 | using System; | ||
| using System.Threading; | ||||
| using System.Diagnostics; | ||||
|
|
r35 | using Implab.Diagnostics; | ||
|
|
r12 | |||
| namespace Implab.Parallels { | ||||
|
|
r15 | public class WorkerPool : DispatchPool<Action> { | ||
|
|
r12 | |||
|
|
r119 | AsyncQueue<Action> m_queue = new AsyncQueue<Action>(); | ||
|
|
r129 | int m_queueLength; | ||
|
|
r16 | readonly int m_threshold = 1; | ||
|
|
r13 | |||
|
|
r16 | public WorkerPool(int minThreads, int maxThreads, int threshold) | ||
|
|
r15 | : base(minThreads, maxThreads) { | ||
|
|
r16 | m_threshold = threshold; | ||
| InitPool(); | ||||
| } | ||||
| public WorkerPool(int minThreads, int maxThreads) : | ||||
| base(minThreads, maxThreads) { | ||||
| InitPool(); | ||||
|
|
r13 | } | ||
|
|
r15 | public WorkerPool(int threads) | ||
| : base(threads) { | ||||
|
|
r16 | InitPool(); | ||
|
|
r13 | } | ||
|
|
r92 | public WorkerPool() { | ||
|
|
r16 | InitPool(); | ||
|
|
r13 | } | ||
|
|
r149 | public IPromise<T> Invoke<T>(Func<T> task) { | ||
| if (task == null) | ||||
| throw new ArgumentNullException("task"); | ||||
| if (IsDisposed) | ||||
| throw new ObjectDisposedException(ToString()); | ||||
| var promise = new FuncTask<T>(task, null, null, true); | ||||
| var lop = TraceContext.Instance.CurrentOperation; | ||||
| EnqueueTask(delegate { | ||||
| TraceContext.Instance.EnterLogicalOperation(lop, false); | ||||
| promise.Resolve(); | ||||
| TraceContext.Instance.Leave(); | ||||
| }); | ||||
| return promise; | ||||
| } | ||||
| public IPromise Invoke(Action task) { | ||||
| if (task == null) | ||||
| throw new ArgumentNullException("task"); | ||||
| if (IsDisposed) | ||||
| throw new ObjectDisposedException(ToString()); | ||||
| var promise = new ActionTask(task, null, null, true); | ||||
| var lop = TraceContext.Instance.CurrentOperation; | ||||
| EnqueueTask(delegate { | ||||
| TraceContext.Instance.EnterLogicalOperation(lop, false); | ||||
| promise.Resolve(); | ||||
| TraceContext.Instance.Leave(); | ||||
| }); | ||||
| return promise; | ||||
| } | ||||
| public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) { | ||||
|
|
r12 | if (task == null) | ||
| throw new ArgumentNullException("task"); | ||||
|
|
r15 | if (IsDisposed) | ||
| throw new ObjectDisposedException(ToString()); | ||||
|
|
r12 | |||
| var promise = new Promise<T>(); | ||||
|
|
r92 | var lop = TraceContext.Instance.CurrentOperation; | ||
|
|
r35 | |||
|
|
r129 | EnqueueTask(delegate { | ||
|
|
r92 | TraceContext.Instance.EnterLogicalOperation(lop, false); | ||
| try { | ||||
|
|
r149 | if (!promise.CancelOperationIfRequested()) | ||
| promise.Resolve(task(promise)); | ||||
| } catch (Exception e) { | ||||
| promise.Reject(e); | ||||
| } finally { | ||||
| TraceContext.Instance.Leave(); | ||||
| } | ||||
| }); | ||||
| return promise; | ||||
| } | ||||
| public IPromise Invoke<T>(Action<ICancellationToken> task) { | ||||
| if (task == null) | ||||
| throw new ArgumentNullException("task"); | ||||
| if (IsDisposed) | ||||
| throw new ObjectDisposedException(ToString()); | ||||
| var promise = new Promise(); | ||||
| var lop = TraceContext.Instance.CurrentOperation; | ||||
| EnqueueTask(delegate { | ||||
| TraceContext.Instance.EnterLogicalOperation(lop, false); | ||||
| try { | ||||
| if (!promise.CancelOperationIfRequested()) { | ||||
| task(promise); | ||||
| promise.Resolve(); | ||||
| } | ||||
|
|
r92 | } catch (Exception e) { | ||
| promise.Reject(e); | ||||
| } finally { | ||||
| TraceContext.Instance.Leave(); | ||||
| } | ||||
|
|
r13 | }); | ||
|
|
r12 | |||
| return promise; | ||||
| } | ||||
|
|
r15 | protected void EnqueueTask(Action unit) { | ||
| Debug.Assert(unit != null); | ||||
|
|
r16 | var len = Interlocked.Increment(ref m_queueLength); | ||
|
|
r15 | m_queue.Enqueue(unit); | ||
|
|
r16 | |||
|
|
r81 | if (len > m_threshold * PoolSize) { | ||
| StartWorker(); | ||||
|
|
r80 | } | ||
|
|
r81 | |||
| SignalThread(); | ||||
|
|
r12 | } | ||
|
|
r15 | protected override bool TryDequeue(out Action unit) { | ||
| if (m_queue.TryDequeue(out unit)) { | ||||
| Interlocked.Decrement(ref m_queueLength); | ||||
| return true; | ||||
|
|
r12 | } | ||
|
|
r15 | return false; | ||
|
|
r12 | } | ||
|
|
r15 | protected override void InvokeUnit(Action unit) { | ||
| unit(); | ||||
|
|
r12 | } | ||
|
|
r16 | |||
|
|
r12 | } | ||
| } | ||||
