using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Diagnostics; namespace Implab.Parallels { public class WorkerPool : DispatchPool { MTQueue m_queue = new MTQueue(); int m_queueLength = 0; readonly int m_threshold = 1; public WorkerPool(int minThreads, int maxThreads, int threshold) : base(minThreads, maxThreads) { m_threshold = threshold; InitPool(); } public WorkerPool(int minThreads, int maxThreads) : base(minThreads, maxThreads) { InitPool(); } public WorkerPool(int threads) : base(threads) { InitPool(); } public WorkerPool() : base() { InitPool(); } public Promise Invoke(Func task) { if (task == null) throw new ArgumentNullException("task"); if (IsDisposed) throw new ObjectDisposedException(ToString()); var promise = new Promise(); EnqueueTask(delegate() { try { promise.Resolve(task()); } catch (Exception e) { promise.Reject(e); } }); return promise; } protected void EnqueueTask(Action unit) { Debug.Assert(unit != null); var len = Interlocked.Increment(ref m_queueLength); m_queue.Enqueue(unit); if(!ExtendPool()) WakePool(); } protected override bool ExtendPool() { if (m_queueLength <= m_threshold*ThreadCount) // in this case we are in active thread and it request for additional workers // satisfy it only when queue is longer than threshold return false; return base.ExtendPool(); } protected override bool TryDequeue(out Action unit) { if (m_queue.TryDequeue(out unit)) { Interlocked.Decrement(ref m_queueLength); return true; } return false; } protected override void InvokeUnit(Action unit) { unit(); } protected override void Suspend() { if (m_queueLength == 0) base.Suspend(); } } }