Added ResetState to RunnableComponent to reset in case of failure
Added StateChanged event to IRunnable
Renamed Promise.SUCCESS -> Promise.Success
Added Promise.FromException
Renamed Bundle -> PromiseAll in PromiseExtensions

File last commit:

r149:eb793fbbe4ea v2
r205:8200ab154c8a v2
WorkerPool.cs
152 lines | 4.5 KiB | text/x-csharp | CSharpLexer
using System;
using System.Threading;
using System.Diagnostics;
using Implab.Diagnostics;

namespace Implab.Parallels {
    public class WorkerPool : DispatchPool<Action> {

        AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
        int m_queueLength;
        readonly int m_threshold = 1;

        public WorkerPool(int minThreads, int maxThreads, int threshold)
            : base(minThreads, maxThreads) {
            m_threshold = threshold;
            InitPool();
        }

        public WorkerPool(int minThreads, int maxThreads) :
            base(minThreads, maxThreads) {
            InitPool();
        }

        public WorkerPool(int threads)
            : base(threads) {
            InitPool();
        }

        public WorkerPool() {
            InitPool();
        }
        public IPromise<T> Invoke<T>(Func<T> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new FuncTask<T>(task, null, null, true);

            // Capture the caller's logical operation so the worker can re-enter it for tracing.
            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                promise.Resolve();
                TraceContext.Instance.Leave();
            });
            return promise;
        }
        public IPromise Invoke(Action task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new ActionTask(task, null, null, true);
            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                promise.Resolve();
                TraceContext.Instance.Leave();
            });
            return promise;
        }
        public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new Promise<T>();
            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                try {
                    // The promise is passed to the task as its cancellation token.
                    if (!promise.CancelOperationIfRequested())
                        promise.Resolve(task(promise));
                } catch (Exception e) {
                    promise.Reject(e);
                } finally {
                    TraceContext.Instance.Leave();
                }
            });
            return promise;
        }
        public IPromise Invoke<T>(Action<ICancellationToken> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new Promise();
            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                try {
                    if (!promise.CancelOperationIfRequested()) {
                        task(promise);
                        promise.Resolve();
                    }
                } catch (Exception e) {
                    promise.Reject(e);
                } finally {
                    TraceContext.Instance.Leave();
                }
            });

            return promise;
        }
        protected void EnqueueTask(Action unit) {
            Debug.Assert(unit != null);
            var len = Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            // Start another worker once the queue length exceeds m_threshold * PoolSize.
            if (len > m_threshold * PoolSize) {
                StartWorker();
            }

            SignalThread();
        }
        protected override bool TryDequeue(out Action unit) {
            if (m_queue.TryDequeue(out unit)) {
                Interlocked.Decrement(ref m_queueLength);
                return true;
            }
            return false;
        }
        protected override void InvokeUnit(Action unit) {
            unit();
        }

    }
}