// WorkerPool.cs (82 lines, 2.2 KiB, text/x-csharp, CSharpLexer)
using System;
using System.Threading;
using System.Diagnostics;
using Implab.Diagnostics;

namespace Implab.Parallels {
    /// <summary>
    /// A <see cref="DispatchPool{TUnit}"/> whose work units are <see cref="Action"/>
    /// delegates held in an internal <c>AsyncQueue</c>.
    /// </summary>
    /// <remarks>
    /// The number of queued units is tracked separately in a counter that is updated
    /// with <see cref="Interlocked"/> operations; that counter is compared against
    /// <c>threshold * PoolSize</c> to decide when to ask the base pool for another worker.
    /// </remarks>
    public class WorkerPool : DispatchPool<Action> {

        // Pending work units; never reassigned after construction.
        readonly AsyncQueue<Action> m_queue = new AsyncQueue<Action>();

        // Number of units enqueued but not yet dequeued (maintained via Interlocked).
        int m_queueLength;

        // Multiplier applied to PoolSize in EnqueueTask; defaults to 1 unless
        // supplied through the three-argument constructor.
        readonly int m_threshold = 1;

        /// <summary>
        /// Creates a pool with explicit thread limits and a custom queue-length threshold.
        /// </summary>
        /// <param name="minThreads">Forwarded to the base pool constructor.</param>
        /// <param name="maxThreads">Forwarded to the base pool constructor.</param>
        /// <param name="threshold">
        /// Multiplier for <c>PoolSize</c>; once the queue length exceeds
        /// <c>threshold * PoolSize</c> an additional worker is requested.
        /// NOTE(review): the value is not validated here; a value of zero or less makes
        /// every enqueue request a worker. Confirm that this is intended.
        /// </param>
        public WorkerPool(int minThreads, int maxThreads, int threshold)
            : base(minThreads, maxThreads) {
            m_threshold = threshold;
            InitPool();
        }

        /// <summary>
        /// Creates a pool with explicit thread limits and the default threshold of 1.
        /// </summary>
        /// <param name="minThreads">Forwarded to the base pool constructor.</param>
        /// <param name="maxThreads">Forwarded to the base pool constructor.</param>
        public WorkerPool(int minThreads, int maxThreads)
            : base(minThreads, maxThreads) {
            InitPool();
        }

        /// <summary>
        /// Creates a pool, forwarding a single thread count to the base pool constructor.
        /// </summary>
        /// <param name="threads">Forwarded to the base pool constructor.</param>
        public WorkerPool(int threads)
            : base(threads) {
            InitPool();
        }

        /// <summary>
        /// Creates a pool using the base pool's default settings.
        /// </summary>
        public WorkerPool() {
            InitPool();
        }

        /// <summary>
        /// Queues <paramref name="task"/> for execution by the pool and returns a promise
        /// for its result.
        /// </summary>
        /// <typeparam name="T">Type of the value produced by the task.</typeparam>
        /// <param name="task">The function to run.</param>
        /// <returns>
        /// A promise that is resolved with the task's return value, or rejected with the
        /// exception thrown while running it.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The pool reports itself as disposed.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (task == null) {
                throw new ArgumentNullException("task");
            }

            if (IsDisposed) {
                throw new ObjectDisposedException(ToString());
            }

            var promise = new Promise<T>();

            // Capture the caller's current logical operation now so that the queued
            // unit can enter it when it eventually runs.
            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                try {
                    promise.Resolve(task());
                } catch (Exception e) {
                    promise.Reject(e);
                } finally {
                    // Always balance the EnterLogicalOperation call above.
                    TraceContext.Instance.Leave();
                }
            });

            return promise;
        }

        /// <summary>
        /// Adds a work unit to the queue, requests an extra worker when the backlog
        /// exceeds the threshold, and signals the pool.
        /// </summary>
        /// <param name="unit">The unit to enqueue; asserted non-null in debug builds.</param>
        protected void EnqueueTask(Action unit) {
            Debug.Assert(unit != null);

            // The counter is bumped before the unit becomes visible in the queue, so
            // the matching decrement in TryDequeue always follows this increment.
            var len = Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            if (len > m_threshold * PoolSize) {
                StartWorker();
            }

            SignalThread();
        }

        /// <summary>
        /// Attempts to take the next work unit from the queue.
        /// </summary>
        /// <param name="unit">Receives the dequeued unit when the call succeeds.</param>
        /// <returns><c>true</c> if a unit was dequeued; otherwise <c>false</c>.</returns>
        protected override bool TryDequeue(out Action unit) {
            if (m_queue.TryDequeue(out unit)) {
                Interlocked.Decrement(ref m_queueLength);
                return true;
            }

            return false;
        }

        /// <summary>
        /// Executes a dequeued work unit by invoking the delegate.
        /// </summary>
        /// <param name="unit">The unit to run.</param>
        protected override void InvokeUnit(Action unit) {
            unit();
        }
    }
}