##// END OF EJS Templates
added memory barriers
added memory barriers

File last commit:

r80:4f20870d0816 v2
r80:4f20870d0816 v2
Show More
WorkerPool.cs
106 lines | 3.1 KiB | text/x-csharp | CSharpLexer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using Implab.Diagnostics;
namespace Implab.Parallels {
    /// <summary>
    /// A <see cref="DispatchPool{Action}"/> whose units of work are parameterless
    /// <see cref="Action"/> delegates kept in an <c>MTQueue</c>.
    /// </summary>
    /// <remarks>
    /// The pool tracks the number of queued items and its own worker count; whenever an
    /// enqueue pushes the queue length above <c>threshold * workers</c>, the worker count
    /// is bumped and <c>GrowPool()</c> is called on the base class.
    /// </remarks>
    public class WorkerPool : DispatchPool<Action> {
        // Threshold used when a constructor is not given one explicitly.
        const int DefaultThreshold = 1;

        // Work items waiting to be picked up by a worker.
        MTQueue<Action> m_queue = new MTQueue<Action>();

        // Count of items enqueued but not yet dequeued; only modified through Interlocked.
        int m_queueLength;

        // Queue-length-per-worker ratio above which EnqueueTask asks the pool to grow.
        readonly int m_threshold = DefaultThreshold;

        // Worker count used by the growth check: seeded by the constructors, incremented
        // when EnqueueTask requests growth, decremented when a worker is about to suspend.
        int m_workers;

        /// <summary>
        /// Creates a pool with explicit thread limits and growth threshold.
        /// </summary>
        /// <param name="minThreads">Forwarded to the base pool; also the initial worker count.</param>
        /// <param name="maxThreads">Forwarded to the base pool.</param>
        /// <param name="threshold">Multiplier applied to the worker count in the growth check.</param>
        public WorkerPool(int minThreads, int maxThreads, int threshold)
            : base(minThreads, maxThreads) {
            m_threshold = threshold;
            m_workers = minThreads;

            InitPool();
        }

        /// <summary>
        /// Creates a pool with explicit thread limits and the default threshold of 1.
        /// </summary>
        /// <param name="minThreads">Forwarded to the base pool; also the initial worker count.</param>
        /// <param name="maxThreads">Forwarded to the base pool.</param>
        public WorkerPool(int minThreads, int maxThreads)
            : this(minThreads, maxThreads, DefaultThreshold) {
        }

        /// <summary>
        /// Creates a pool from a single thread count, using the default threshold of 1.
        /// </summary>
        /// <param name="threads">Forwarded to the base pool; also the initial worker count.</param>
        public WorkerPool(int threads)
            : base(threads) {
            m_workers = threads;

            InitPool();
        }

        /// <summary>
        /// Creates a pool with the base class defaults; the worker count starts at zero.
        /// </summary>
        public WorkerPool()
            : base() {
            InitPool();
        }

        /// <summary>
        /// Queues <paramref name="task"/> and returns a promise for its outcome.
        /// </summary>
        /// <remarks>
        /// The queued unit runs the task through the <c>TraceContext</c> snapshot taken
        /// inside this call. The promise is resolved with the task's return value, or
        /// rejected with whatever exception the task throws.
        /// </remarks>
        /// <typeparam name="T">Result type of the task.</typeparam>
        /// <param name="task">The function to execute.</param>
        /// <returns>A promise resolved or rejected once the task has run.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The pool reports itself as disposed.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (task == null) {
                throw new ArgumentNullException("task");
            }

            if (IsDisposed) {
                throw new ObjectDisposedException(ToString());
            }

            var result = new Promise<T>();
            var callerContext = TraceContext.Snapshot();

            EnqueueTask(() => {
                callerContext.Invoke(() => {
                    try {
                        result.Resolve(task());
                    } catch (Exception error) {
                        // Any failure of the task is reported through the promise.
                        result.Reject(error);
                    }
                });
            });

            return result;
        }

        /// <summary>
        /// Adds a unit of work to the queue and requests pool growth when the queue length
        /// exceeds <c>threshold * workers</c>.
        /// </summary>
        /// <param name="unit">The work item; must not be null (checked in debug builds only).</param>
        protected void EnqueueTask(Action unit) {
            Debug.Assert(unit != null);

            // The counter is raised before the item becomes visible in the queue.
            int pending = Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            bool needsMoreWorkers = pending > m_threshold * m_workers;
            if (!needsMoreWorkers) {
                return;
            }

            Interlocked.Increment(ref m_workers);
            GrowPool();
        }

        /// <summary>
        /// Tries to take the next unit from the queue, decrementing the queue length on success.
        /// </summary>
        /// <param name="unit">Receives the dequeued unit when the method returns true.</param>
        /// <returns>True when a unit was dequeued; otherwise false.</returns>
        protected override bool TryDequeue(out Action unit) {
            if (!m_queue.TryDequeue(out unit)) {
                return false;
            }

            Interlocked.Decrement(ref m_queueLength);
            return true;
        }

        /// <summary>
        /// Re-checks the queue length before delegating to the base suspend logic.
        /// </summary>
        /// <returns>
        /// True without suspending when items are queued; otherwise the result of
        /// <c>base.Suspend()</c>.
        /// </returns>
        protected override bool Suspend() {
            // This override closes the following race between a worker and a client:
            //
            //   WORKER                      CLIENT
            //   ------------------------------------------------------
            //   TryDequeue == false
            //                               Enqueue(unit), queueLen++
            //                               GrowPool? == NO
            //   ActiveThreads--
            //   Suspend
            //     queueLength > 0
            //     continue
            Thread.MemoryBarrier();

            if (m_queueLength <= 0) {
                // Nothing is queued: drop this worker from the count and let the base decide.
                Interlocked.Decrement(ref m_workers);
                return base.Suspend();
            }

            // An item arrived after the failed dequeue; keep the worker running.
            return true;
        }

        /// <summary>
        /// Executes a dequeued unit by invoking the delegate.
        /// </summary>
        /// <param name="unit">The work item to run.</param>
        protected override void InvokeUnit(Action unit) {
            unit.Invoke();
        }
    }
}