##// END OF EJS Templates
fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.
fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.

File last commit:

r145:706fccb85524 v2
r196:40d7fed4a09e default
Show More
ArrayTraits.cs
207 lines | 7.3 KiB | text/x-csharp | CSharpLexer
using Implab.Diagnostics;
using System;
using System.Diagnostics;
using System.Threading;
namespace Implab.Parallels {
public static class ArrayTraits {
class ArrayIterator<TSrc> : DispatchPool<int> {
readonly Action<TSrc> m_action;
readonly TSrc[] m_source;
readonly Promise<int> m_promise = new Promise<int>();
readonly LogicalOperation m_logicalOperation;
int m_pending;
int m_next;
public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
: base(threads) {
Debug.Assert(source != null);
Debug.Assert(action != null);
m_logicalOperation = TraceContext.Instance.CurrentOperation;
m_next = 0;
m_source = source;
m_pending = source.Length;
m_action = action;
m_promise.On(Dispose, PromiseEventType.All);
InitPool();
}
public Promise<int> Promise {
get {
return m_promise;
}
}
protected override void Worker() {
TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
try {
base.Worker();
} finally {
TraceContext.Instance.Leave();
}
}
protected override bool TryDequeue(out int unit) {
unit = Interlocked.Increment(ref m_next) - 1;
return unit < m_source.Length;
}
protected override void InvokeUnit(int unit) {
try {
m_action(m_source[unit]);
var pending = Interlocked.Decrement(ref m_pending);
if (pending == 0)
m_promise.Resolve(m_source.Length);
} catch (Exception e) {
m_promise.Reject(e);
}
}
}
class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
readonly Func<TSrc, TDst> m_transform;
readonly TSrc[] m_source;
readonly TDst[] m_dest;
readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
readonly LogicalOperation m_logicalOperation;
int m_pending;
int m_next;
public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
: base(threads) {
Debug.Assert (source != null);
Debug.Assert( transform != null);
m_next = 0;
m_source = source;
m_dest = new TDst[source.Length];
m_pending = source.Length;
m_transform = transform;
m_logicalOperation = TraceContext.Instance.CurrentOperation;
m_promise.On(Dispose, PromiseEventType.All);
InitPool();
}
public Promise<TDst[]> Promise {
get {
return m_promise;
}
}
protected override void Worker() {
TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
try {
base.Worker();
} finally {
TraceContext.Instance.Leave();
}
}
protected override bool TryDequeue(out int unit) {
unit = Interlocked.Increment(ref m_next) - 1;
return unit < m_source.Length;
}
protected override void InvokeUnit(int unit) {
try {
m_dest[unit] = m_transform(m_source[unit]);
var pending = Interlocked.Decrement(ref m_pending);
if (pending == 0)
m_promise.Resolve(m_dest);
} catch (Exception e) {
m_promise.Reject(e);
}
}
}
public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
if (source == null)
throw new ArgumentNullException("source");
if (transform == null)
throw new ArgumentNullException("transform");
var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
return mapper.Promise;
}
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
if (source == null)
throw new ArgumentNullException("source");
if (action == null)
throw new ArgumentNullException("action");
var iter = new ArrayIterator<TSrc>(source, action, threads);
return iter.Promise;
}
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
if (source == null)
throw new ArgumentNullException("source");
if (transform == null)
throw new ArgumentNullException("transform");
if (threads <= 0)
throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
if (source.Length == 0)
return Promise<TDst[]>.FromResult(new TDst[0]);
var promise = new Promise<TDst[]>();
var res = new TDst[source.Length];
var pending = source.Length;
object locker = new object();
int slots = threads;
// Analysis disable AccessToDisposedClosure
AsyncPool.RunThread<int>(() => {
for (int i = 0; i < source.Length; i++) {
if(promise.IsResolved)
break; // stop processing in case of error or cancellation
var idx = i;
if (Interlocked.Decrement(ref slots) < 0) {
lock(locker) {
while(slots < 0)
Monitor.Wait(locker);
}
}
try {
transform(source[i])
.On( x => {
Interlocked.Increment(ref slots);
lock (locker) {
Monitor.Pulse(locker);
}
})
.On(
x => {
res[idx] = x;
var left = Interlocked.Decrement(ref pending);
if (left == 0)
promise.Resolve(res);
},
promise.Reject
);
} catch (Exception e) {
promise.Reject(e);
}
}
return 0;
});
return promise;
}
}
}