##// END OF EJS Templates
Code review, added a non generic version of SyncContextPromise
Code review, added a non generic version of SyncContextPromise

File last commit:

r149:eb793fbbe4ea v2
r211:3eb3255d8cc5 v2
Show More
WorkerPool.cs
152 lines | 4.5 KiB | text/x-csharp | CSharpLexer
cin
refactoring, added WorkerPool
r12 using System;
using System.Threading;
using System.Diagnostics;
cin
initial log capabilities
r35 using Implab.Diagnostics;
cin
refactoring, added WorkerPool
r12
namespace Implab.Parallels {
cin
implemented parallel map and foreach for arrays...
r15 public class WorkerPool : DispatchPool<Action> {
cin
refactoring, added WorkerPool
r12
cin
Promises rewritten, added improved version of AsyncQueue
r119 AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
cin
Added SharedLock to synchronization routines
r129 int m_queueLength;
cin
sync
r16 readonly int m_threshold = 1;
cin
small fixes, WorkerPool still incomplete
r13
cin
sync
r16 public WorkerPool(int minThreads, int maxThreads, int threshold)
cin
implemented parallel map and foreach for arrays...
r15 : base(minThreads, maxThreads) {
cin
sync
r16 m_threshold = threshold;
InitPool();
}
public WorkerPool(int minThreads, int maxThreads) :
base(minThreads, maxThreads) {
InitPool();
cin
small fixes, WorkerPool still incomplete
r13 }
cin
implemented parallel map and foreach for arrays...
r15 public WorkerPool(int threads)
: base(threads) {
cin
sync
r16 InitPool();
cin
small fixes, WorkerPool still incomplete
r13 }
cin
rewritten tracing
r92 public WorkerPool() {
cin
sync
r16 InitPool();
cin
small fixes, WorkerPool still incomplete
r13 }
cin
fixed promises cancellation
r149 public IPromise<T> Invoke<T>(Func<T> task) {
if (task == null)
throw new ArgumentNullException("task");
if (IsDisposed)
throw new ObjectDisposedException(ToString());
var promise = new FuncTask<T>(task, null, null, true);
var lop = TraceContext.Instance.CurrentOperation;
EnqueueTask(delegate {
TraceContext.Instance.EnterLogicalOperation(lop, false);
promise.Resolve();
TraceContext.Instance.Leave();
});
return promise;
}
public IPromise Invoke(Action task) {
if (task == null)
throw new ArgumentNullException("task");
if (IsDisposed)
throw new ObjectDisposedException(ToString());
var promise = new ActionTask(task, null, null, true);
var lop = TraceContext.Instance.CurrentOperation;
EnqueueTask(delegate {
TraceContext.Instance.EnterLogicalOperation(lop, false);
promise.Resolve();
TraceContext.Instance.Leave();
});
return promise;
}
public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
cin
refactoring, added WorkerPool
r12 if (task == null)
throw new ArgumentNullException("task");
cin
implemented parallel map and foreach for arrays...
r15 if (IsDisposed)
throw new ObjectDisposedException(ToString());
cin
refactoring, added WorkerPool
r12
var promise = new Promise<T>();
cin
rewritten tracing
r92 var lop = TraceContext.Instance.CurrentOperation;
cin
initial log capabilities
r35
cin
Added SharedLock to synchronization routines
r129 EnqueueTask(delegate {
cin
rewritten tracing
r92 TraceContext.Instance.EnterLogicalOperation(lop, false);
try {
cin
fixed promises cancellation
r149 if (!promise.CancelOperationIfRequested())
promise.Resolve(task(promise));
} catch (Exception e) {
promise.Reject(e);
} finally {
TraceContext.Instance.Leave();
}
});
return promise;
}
public IPromise Invoke<T>(Action<ICancellationToken> task) {
if (task == null)
throw new ArgumentNullException("task");
if (IsDisposed)
throw new ObjectDisposedException(ToString());
var promise = new Promise();
var lop = TraceContext.Instance.CurrentOperation;
EnqueueTask(delegate {
TraceContext.Instance.EnterLogicalOperation(lop, false);
try {
if (!promise.CancelOperationIfRequested()) {
task(promise);
promise.Resolve();
}
cin
rewritten tracing
r92 } catch (Exception e) {
promise.Reject(e);
} finally {
TraceContext.Instance.Leave();
}
cin
small fixes, WorkerPool still incomplete
r13 });
cin
refactoring, added WorkerPool
r12
return promise;
}
cin
implemented parallel map and foreach for arrays...
r15 protected void EnqueueTask(Action unit) {
Debug.Assert(unit != null);
cin
sync
r16 var len = Interlocked.Increment(ref m_queueLength);
cin
implemented parallel map and foreach for arrays...
r15 m_queue.Enqueue(unit);
cin
sync
r16
cin
dispatch pool rewritten
r81 if (len > m_threshold * PoolSize) {
StartWorker();
cin
added memory barriers
r80 }
cin
dispatch pool rewritten
r81
SignalThread();
cin
refactoring, added WorkerPool
r12 }
cin
implemented parallel map and foreach for arrays...
r15 protected override bool TryDequeue(out Action unit) {
if (m_queue.TryDequeue(out unit)) {
Interlocked.Decrement(ref m_queueLength);
return true;
cin
refactoring, added WorkerPool
r12 }
cin
implemented parallel map and foreach for arrays...
r15 return false;
cin
refactoring, added WorkerPool
r12 }
cin
implemented parallel map and foreach for arrays...
r15 protected override void InvokeUnit(Action unit) {
unit();
cin
refactoring, added WorkerPool
r12 }
cin
sync
r16
cin
refactoring, added WorkerPool
r12 }
}