##// END OF EJS Templates
Bound promise to CancellationToken...
Bound promise to CancellationToken. Added new states to the ExecutionState enum. Added Safe.Guard() method to handle cleanup of the result of the promise.

File last commit:

r145:706fccb85524 v2
r209:a867536c68fc v2
Show More
ArrayTraits.cs
207 lines | 7.3 KiB | text/x-csharp | CSharpLexer
using Implab.Diagnostics;
using System;
using System.Diagnostics;
using System.Threading;
namespace Implab.Parallels {
public static class ArrayTraits {
/// <summary>
/// Dispatch pool that applies an action to every element of an array and
/// exposes a promise which is resolved with the array length once all
/// elements have been processed, or rejected with the first exception caught.
/// </summary>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly Action<TSrc> m_action;
    readonly TSrc[] m_source;
    readonly Promise<int> m_promise = new Promise<int>();
    readonly LogicalOperation m_logicalOperation;

    // Number of elements whose action has not completed successfully yet.
    int m_pending;
    // Index of the next element to hand out to a worker.
    int m_next;

    /// <summary>
    /// Stores the source and the action, captures the current logical
    /// operation of the trace context and starts the pool.
    /// </summary>
    /// <param name="source">Array to iterate over; must not be null.</param>
    /// <param name="action">Action invoked for each element; must not be null.</param>
    /// <param name="threads">Value passed to the base pool constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_source = source;
        m_action = action;
        m_next = 0;
        m_pending = source.Length;
        m_logicalOperation = TraceContext.Instance.CurrentOperation;

        // Dispose is registered for every promise event type, then the pool is started.
        m_promise.On(Dispose, PromiseEventType.All);
        InitPool();
    }

    /// <summary>
    /// Promise completed by the workers of this pool.
    /// </summary>
    public Promise<int> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Runs the base worker loop inside the logical operation captured
    /// by the constructor and leaves it afterwards, even on failure.
    /// </summary>
    protected override void Worker() {
        TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
        try {
            base.Worker();
        } finally {
            TraceContext.Instance.Leave();
        }
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">Receives the claimed index.</param>
    /// <returns>true while the claimed index is inside the source array.</returns>
    protected override bool TryDequeue(out int unit) {
        // Increment returns the new counter value, so the claimed index is one less.
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Applies the action to the element at the given index and resolves
    /// the promise when the last pending element has been handled.
    /// Any exception thrown inside is routed to the promise as a rejection.
    /// </summary>
    /// <param name="unit">Index of the element to process.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);

            if (Interlocked.Decrement(ref m_pending) == 0) {
                m_promise.Resolve(m_source.Length);
            }
        } catch (Exception error) {
            m_promise.Reject(error);
        }
    }
}
/// <summary>
/// Dispatch pool that transforms every element of a source array into the
/// slot with the same index of a destination array. The exposed promise is
/// resolved with the destination array once all elements have been
/// transformed, or rejected with the first exception caught.
/// </summary>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
    readonly LogicalOperation m_logicalOperation;

    // Number of elements whose transformation has not completed successfully yet.
    int m_pending;
    // Index of the next element to hand out to a worker.
    int m_next;

    /// <summary>
    /// Stores the source and the transform, allocates the destination array,
    /// captures the current logical operation of the trace context and
    /// starts the pool.
    /// </summary>
    /// <param name="source">Array to transform; must not be null.</param>
    /// <param name="transform">Function applied to each element; must not be null.</param>
    /// <param name="threads">Value passed to the base pool constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_source = source;
        m_transform = transform;
        m_dest = new TDst[source.Length];
        m_next = 0;
        m_pending = source.Length;
        m_logicalOperation = TraceContext.Instance.CurrentOperation;

        // Dispose is registered for every promise event type, then the pool is started.
        m_promise.On(Dispose, PromiseEventType.All);
        InitPool();
    }

    /// <summary>
    /// Promise completed by the workers of this pool.
    /// </summary>
    public Promise<TDst[]> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Runs the base worker loop inside the logical operation captured
    /// by the constructor and leaves it afterwards, even on failure.
    /// </summary>
    protected override void Worker() {
        TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
        try {
            base.Worker();
        } finally {
            TraceContext.Instance.Leave();
        }
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">Receives the claimed index.</param>
    /// <returns>true while the claimed index is inside the source array.</returns>
    protected override bool TryDequeue(out int unit) {
        // Increment returns the new counter value, so the claimed index is one less.
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Transforms the element at the given index, stores the result at the
    /// same index of the destination array and resolves the promise when the
    /// last pending element has been handled. Any exception thrown inside is
    /// routed to the promise as a rejection.
    /// </summary>
    /// <param name="unit">Index of the element to transform.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);

            if (Interlocked.Decrement(ref m_pending) == 0) {
                m_promise.Resolve(m_dest);
            }
        } catch (Exception error) {
            m_promise.Reject(error);
        }
    }
}
/// <summary>
/// Transforms every element of the array using a pool of workers and returns
/// a promise of the resulting array; results keep the index of their source
/// element.
/// </summary>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Function applied to each element.</param>
/// <param name="threads">Value passed to the worker pool constructor.</param>
/// <returns>
/// A promise resolved with the transformed array, or rejected when the
/// transform throws.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // The mapper resolves its promise only after the last element has been
    // processed; with no elements no unit is ever dequeued and the promise
    // would stay pending forever. Complete immediately instead, the same way
    // ChainedMap does. NOTE(review): the pool is not created on this path, so
    // whatever validation it applies to 'threads' is skipped for empty input.
    if (source.Length == 0)
        return Promise<TDst[]>.FromResult(new TDst[0]);

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
/// <summary>
/// Invokes the action for every element of the array using a pool of workers.
/// </summary>
/// <param name="source">Array to iterate over.</param>
/// <param name="action">Action invoked for each element.</param>
/// <param name="threads">Value passed to the worker pool constructor.</param>
/// <returns>
/// A promise resolved with the number of elements in the array once all of
/// them have been processed, or rejected when the action throws.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is null.
/// </exception>
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // The iterator resolves its promise only after the last element has been
    // processed; with no elements no unit is ever dequeued and the promise
    // would stay pending forever. Complete immediately with the element
    // count (zero) instead. NOTE(review): the pool is not created on this
    // path, so whatever validation it applies to 'threads' is skipped for
    // empty input.
    if (source.Length == 0)
        return Promise<int>.FromResult(0);

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
/// <summary>
/// Maps every element of the array through an asynchronous transform while
/// keeping at most <paramref name="threads"/> operations outstanding at the
/// same time. Results are stored at the index of their source element.
/// </summary>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Function that starts an asynchronous operation for an element.</param>
/// <param name="threads">Maximum number of simultaneously outstanding operations; must be positive.</param>
/// <returns>
/// A promise resolved with the array of results once every operation has
/// succeeded, or rejected when the transform throws or an operation fails.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="threads"/> is zero or negative.
/// </exception>
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    if (threads <= 0)
        throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");

    if (source.Length == 0)
        return Promise<TDst[]>.FromResult(new TDst[0]);

    var promise = new Promise<TDst[]>();
    var res = new TDst[source.Length];
    var pending = source.Length;

    object locker = new object();
    int slots = threads;

    // Returns a slot to the limiter and wakes the feeding loop if it is waiting.
    Action releaseSlot = () => {
        Interlocked.Increment(ref slots);
        lock (locker) {
            Monitor.Pulse(locker);
        }
    };

    // Analysis disable AccessToDisposedClosure
    AsyncPool.RunThread<int>(() => {
        for (int i = 0; i < source.Length; i++) {
            if(promise.IsResolved)
                break; // stop processing in case of error or cancellation

            var idx = i;

            if (Interlocked.Decrement(ref slots) < 0) {
                lock(locker) {
                    while(slots < 0)
                        Monitor.Wait(locker);
                }

                // The wake-up may have been caused by a failed operation;
                // do not start new work once the result is already decided.
                if (promise.IsResolved)
                    break;
            }

            try {
                transform(source[i])
                    .On(x => releaseSlot())
                    .On(
                        x => {
                            res[idx] = x;
                            var left = Interlocked.Decrement(ref pending);
                            if (left == 0)
                                promise.Resolve(res);
                        },
                        e => {
                            // A failed operation must give its slot back as
                            // well, otherwise the feeding loop could stay
                            // blocked in Monitor.Wait forever. Reject first
                            // so the woken loop observes the completed
                            // promise and stops.
                            // NOTE(review): cancellation of the inner promise
                            // is not handled here - confirm how On reports it.
                            try {
                                promise.Reject(e);
                            } finally {
                                releaseSlot();
                            }
                        }
                    );
            } catch (Exception e) {
                promise.Reject(e);
            }
        }
        return 0;
    });

    return promise;
}
}
}