WorkerPool.cs
89 lines
| 2.5 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r12 | using System; | ||
using System.Collections.Generic; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
using System.Diagnostics; | ||||
namespace Implab.Parallels { | ||||
cin
|
r15 | public class WorkerPool : DispatchPool<Action> { | ||
cin
|
r12 | |||
cin
|
r15 | MTQueue<Action> m_queue = new MTQueue<Action>(); | ||
int m_queueLength = 0; | ||||
cin
|
r16 | readonly int m_threshold = 1; | ||
cin
|
r13 | |||
cin
|
r16 | public WorkerPool(int minThreads, int maxThreads, int threshold) | ||
cin
|
r15 | : base(minThreads, maxThreads) { | ||
cin
|
r16 | m_threshold = threshold; | ||
InitPool(); | ||||
} | ||||
public WorkerPool(int minThreads, int maxThreads) : | ||||
base(minThreads, maxThreads) { | ||||
InitPool(); | ||||
cin
|
r13 | } | ||
cin
|
r15 | public WorkerPool(int threads) | ||
: base(threads) { | ||||
cin
|
r16 | InitPool(); | ||
cin
|
r13 | } | ||
cin
|
r15 | public WorkerPool() | ||
: base() { | ||||
cin
|
r16 | InitPool(); | ||
cin
|
r13 | } | ||
cin
|
r12 | public Promise<T> Invoke<T>(Func<T> task) { | ||
if (task == null) | ||||
throw new ArgumentNullException("task"); | ||||
cin
|
r15 | if (IsDisposed) | ||
throw new ObjectDisposedException(ToString()); | ||||
cin
|
r12 | |||
var promise = new Promise<T>(); | ||||
cin
|
r15 | EnqueueTask(delegate() { | ||
cin
|
r13 | try { | ||
promise.Resolve(task()); | ||||
} catch (Exception e) { | ||||
promise.Reject(e); | ||||
} | ||||
}); | ||||
cin
|
r12 | |||
return promise; | ||||
} | ||||
cin
|
r15 | protected void EnqueueTask(Action unit) { | ||
Debug.Assert(unit != null); | ||||
cin
|
r16 | var len = Interlocked.Increment(ref m_queueLength); | ||
cin
|
r15 | m_queue.Enqueue(unit); | ||
cin
|
r16 | |||
cin
|
r17 | if(!ExtendPool()) | ||
WakePool(); | ||||
cin
|
r16 | } | ||
cin
|
r17 | protected override bool ExtendPool() { | ||
if (m_queueLength <= m_threshold*ThreadCount) | ||||
cin
|
r16 | // in this case we are in active thread and it request for additional workers | ||
// satisfy it only when queue is longer than threshold | ||||
cin
|
r17 | return false; | ||
return base.ExtendPool(); | ||||
cin
|
r12 | } | ||
cin
|
r15 | protected override bool TryDequeue(out Action unit) { | ||
if (m_queue.TryDequeue(out unit)) { | ||||
Interlocked.Decrement(ref m_queueLength); | ||||
return true; | ||||
cin
|
r12 | } | ||
cin
|
r15 | return false; | ||
cin
|
r12 | } | ||
cin
|
r15 | protected override void InvokeUnit(Action unit) { | ||
unit(); | ||||
cin
|
r12 | } | ||
cin
|
r16 | |||
protected override void Suspend() { | ||||
if (m_queueLength == 0) | ||||
base.Suspend(); | ||||
} | ||||
cin
|
r12 | } | ||
} | ||||