|
|
using System;
|
|
|
using System.Collections.Generic;
|
|
|
using System.Linq;
|
|
|
using System.Text;
|
|
|
using System.Threading;
|
|
|
using System.Diagnostics;
|
|
|
using Implab.Diagnostics;
|
|
|
|
|
|
namespace Implab.Parallels {
|
|
|
/// <summary>
/// A <see cref="DispatchPool{T}"/> specialization whose units of work are
/// <see cref="Action"/> delegates kept in an internal queue.
/// </summary>
/// <remarks>
/// Each time a unit is enqueued the pool compares the number of pending units
/// with <c>threshold * ActiveThreads</c> and calls <c>GrowPool()</c> when the
/// queue is longer than that product.
/// </remarks>
public class WorkerPool : DispatchPool<Action> {

    // Units of work waiting to be picked up by a worker.
    readonly MTQueue<Action> m_queue = new MTQueue<Action>();

    // Count of pending units. It is incremented BEFORE the unit is put into
    // m_queue and decremented AFTER a unit is taken out, so it never
    // under-reports pending work (see Suspend()).
    int m_queueLength = 0;

    // Multiplier applied to ActiveThreads to decide when the pool should grow.
    // Stays at 1 for the constructors that do not take a threshold.
    readonly int m_threshold = 1;

    /// <summary>
    /// Creates a pool with the specified thread limits and growth threshold.
    /// </summary>
    /// <param name="minThreads">Forwarded to the base class constructor.</param>
    /// <param name="maxThreads">Forwarded to the base class constructor.</param>
    /// <param name="threshold">
    /// The pool is asked to grow when the number of pending units exceeds
    /// <paramref name="threshold"/> multiplied by <c>ActiveThreads</c>.
    /// </param>
    public WorkerPool(int minThreads, int maxThreads, int threshold)
        : base(minThreads, maxThreads) {
        m_threshold = threshold;
        InitPool();
    }

    /// <summary>
    /// Creates a pool with the specified thread limits and the default threshold of 1.
    /// </summary>
    /// <param name="minThreads">Forwarded to the base class constructor.</param>
    /// <param name="maxThreads">Forwarded to the base class constructor.</param>
    public WorkerPool(int minThreads, int maxThreads)
        : base(minThreads, maxThreads) {
        InitPool();
    }

    /// <summary>
    /// Creates a pool using the single-argument base constructor and the default threshold of 1.
    /// </summary>
    /// <param name="threads">Forwarded to the base class constructor.</param>
    public WorkerPool(int threads)
        : base(threads) {
        InitPool();
    }

    /// <summary>
    /// Creates a pool using the base class defaults and the default threshold of 1.
    /// </summary>
    public WorkerPool()
        : base() {
        InitPool();
    }

    /// <summary>
    /// Queues <paramref name="task"/> for execution by the pool and returns a
    /// promise for its result.
    /// </summary>
    /// <typeparam name="T">Type of the value produced by the task.</typeparam>
    /// <param name="task">The function to execute.</param>
    /// <returns>
    /// A promise that is resolved with the value returned by
    /// <paramref name="task"/>, or rejected with the exception it throws.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
    /// <exception cref="ObjectDisposedException">The pool has already been disposed.</exception>
    public Promise<T> Invoke<T>(Func<T> task) {
        if (task == null) {
            throw new ArgumentNullException("task");
        }
        if (IsDisposed) {
            throw new ObjectDisposedException(ToString());
        }

        var promise = new Promise<T>();

        // Snapshot the trace context on the calling thread; the queued unit
        // runs the task through this snapshot.
        var caller = TraceContext.Snapshot();

        EnqueueTask(delegate() {
            caller.Invoke(delegate() {
                try {
                    promise.Resolve(task());
                } catch (Exception e) {
                    promise.Reject(e);
                }
            });
        });

        return promise;
    }

    /// <summary>
    /// Adds a unit of work to the queue and asks the pool to grow when the
    /// queue has become longer than <c>threshold * ActiveThreads</c>.
    /// </summary>
    /// <param name="unit">The unit of work; must not be <c>null</c> (checked only by a debug assertion).</param>
    protected void EnqueueTask(Action unit) {
        Debug.Assert(unit != null);

        // The counter is bumped before the unit becomes visible in the queue,
        // see the note on m_queueLength.
        var len = Interlocked.Increment(ref m_queueLength);
        m_queue.Enqueue(unit);

        // Multiply as long: with a large threshold the int product would wrap
        // around to a negative value and make this condition true on every call.
        if (len > (long)m_threshold * ActiveThreads) {
            GrowPool();
        }
    }

    /// <summary>
    /// Tries to take the next unit of work from the queue.
    /// </summary>
    /// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> if a unit was dequeued; otherwise <c>false</c>.</returns>
    protected override bool TryDequeue(out Action unit) {
        if (m_queue.TryDequeue(out unit)) {
            Interlocked.Decrement(ref m_queueLength);
            return true;
        }
        return false;
    }

    /// <summary>
    /// Returns <c>true</c> immediately when there are pending units, otherwise
    /// defers to the base implementation.
    /// </summary>
    protected override bool Suspend() {
        // This override closes the following race between a worker that is
        // about to suspend and a client that enqueues a unit:
        //
        // WORKER                  CLIENT
        // ---------------------------------------------------
        // TryDequeue == false
        //                         Enqueue(unit), queueLength++
        //                         GrowPool? == NO
        // ActiveThreads--
        // Suspend
        //    queueLength > 0
        // continue
        //
        // Without the check below the unit enqueued by the client could be
        // left in the queue with nobody awake to process it.
        if (m_queueLength > 0) {
            return true;
        }
        return base.Suspend();
    }

    /// <summary>
    /// Executes a dequeued unit of work by invoking the delegate.
    /// </summary>
    /// <param name="unit">The unit of work to run.</param>
    protected override void InvokeUnit(Action unit) {
        unit();
    }
}
|
|
|
}
|
|
|
|