WorkerPool.cs
98 lines
| 2.8 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r12 | using System; | ||
using System.Collections.Generic; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
using System.Diagnostics; | ||||
cin
|
r35 | using Implab.Diagnostics; | ||
cin
|
r12 | |||
namespace Implab.Parallels {
    /// <summary>
    /// A <see cref="DispatchPool{TUnit}"/> whose units of work are <see cref="Action"/>
    /// delegates kept in a shared queue.
    /// </summary>
    /// <remarks>
    /// Work is submitted through <see cref="Invoke{T}"/> (or <see cref="EnqueueTask"/> from
    /// derived classes). After every submission the queue length is compared with
    /// <c>threshold * ActiveThreads</c>, and <c>GrowPool()</c> is called when it is exceeded.
    /// </remarks>
    public class WorkerPool : DispatchPool<Action> {

        // Pending units of work: filled by EnqueueTask, drained by TryDequeue.
        readonly MTQueue<Action> m_queue = new MTQueue<Action>();

        // Counter kept next to m_queue; changed only through Interlocked operations.
        // It is incremented *before* the unit is enqueued and decremented *after* it
        // has been dequeued, so it is never smaller than the real number of queued units.
        int m_queueLength = 0;

        // Multiplier applied to ActiveThreads when deciding whether to call GrowPool().
        readonly int m_threshold = 1;

        /// <summary>
        /// Creates a pool with the given thread limits and growth threshold.
        /// </summary>
        /// <param name="minThreads">Passed to the base constructor.</param>
        /// <param name="maxThreads">Passed to the base constructor.</param>
        /// <param name="threshold">
        /// The pool is asked to grow when the queue length exceeds
        /// <paramref name="threshold"/> multiplied by the number of active threads.
        /// </param>
        public WorkerPool(int minThreads, int maxThreads, int threshold)
            : base(minThreads, maxThreads) {
            // The threshold has to be in place before InitPool() runs.
            m_threshold = threshold;
            InitPool();
        }

        /// <summary>
        /// Creates a pool with the given thread limits and the default threshold of 1.
        /// </summary>
        /// <param name="minThreads">Passed to the base constructor.</param>
        /// <param name="maxThreads">Passed to the base constructor.</param>
        public WorkerPool(int minThreads, int maxThreads)
            : base(minThreads, maxThreads) {
            InitPool();
        }

        /// <summary>
        /// Creates a pool using the single-argument base constructor and the default
        /// threshold of 1.
        /// </summary>
        /// <param name="threads">Passed to the base constructor.</param>
        public WorkerPool(int threads)
            : base(threads) {
            InitPool();
        }

        /// <summary>
        /// Creates a pool using the base class defaults and the default threshold of 1.
        /// </summary>
        public WorkerPool()
            : base() {
            InitPool();
        }

        /// <summary>
        /// Queues <paramref name="task"/> for execution and returns a promise for its result.
        /// </summary>
        /// <typeparam name="T">Type of the value produced by the task.</typeparam>
        /// <param name="task">The function to run; must not be <c>null</c>.</param>
        /// <returns>
        /// A promise that is resolved with the value returned by <paramref name="task"/>,
        /// or rejected with the exception it throws.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException"><c>IsDisposed</c> is true.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (task == null)
                throw new ArgumentNullException("task");

            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new Promise<T>();

            // Snapshot taken on the submitting thread; the work below is executed through it.
            // NOTE(review): presumably this carries the caller's trace context over to the
            // worker thread - confirm against TraceContext.
            var caller = TraceContext.Snapshot();

            EnqueueTask(() => {
                caller.Invoke(() => {
                    try {
                        promise.Resolve(task());
                    } catch (Exception e) {
                        // Every failure of the task is reported through the promise.
                        promise.Reject(e);
                    }
                });
            });

            return promise;
        }

        /// <summary>
        /// Adds a unit of work to the queue and asks the pool to grow when the queue
        /// length exceeds <c>threshold * ActiveThreads</c>.
        /// </summary>
        /// <param name="unit">The unit of work; expected to be non-null (checked in debug builds only).</param>
        protected void EnqueueTask(Action unit) {
            Debug.Assert(unit != null);

            // Count first, enqueue second: Suspend() relies on the counter already being
            // positive once a unit is on its way into the queue.
            var len = Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            if (len > m_threshold * ActiveThreads)
                GrowPool();
        }

        /// <summary>
        /// Tries to take the next unit of work from the queue.
        /// </summary>
        /// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit was dequeued; otherwise <c>false</c>.</returns>
        protected override bool TryDequeue(out Action unit) {
            if (m_queue.TryDequeue(out unit)) {
                Interlocked.Decrement(ref m_queueLength);
                return true;
            }

            return false;
        }

        /// <summary>
        /// Returns <c>true</c> without calling the base implementation while the queue
        /// counter is positive; otherwise defers to <c>base.Suspend()</c>.
        /// </summary>
        /// <returns>
        /// <c>true</c> when there is queued work (the worker continues, see the scenario
        /// below); otherwise the result of the base implementation.
        /// </returns>
        protected override bool Suspend() {
            // This override solves the following race condition:
            //
            // WORKER                   CLIENT
            // ---------------------------------------
            // TryDequeue == false
            //                          Enqueue(unit), queueLen++
            //                          GrowPool? == NO
            // ActiveThreads--
            // Suspend
            //    queueLength > 0
            // continue
            if (m_queueLength > 0)
                return true;

            return base.Suspend();
        }

        /// <summary>
        /// Executes a unit of work by invoking the delegate.
        /// </summary>
        /// <param name="unit">The delegate to invoke.</param>
        protected override void InvokeUnit(Action unit) {
            unit();
        }
    }
}