|
|
using Implab.Diagnostics;
|
|
|
using System;
|
|
|
using System.Collections.Generic;
|
|
|
using System.Diagnostics;
|
|
|
using System.Linq;
|
|
|
using System.Text;
|
|
|
using System.Threading;
|
|
|
|
|
|
namespace Implab.Parallels {
|
|
|
public static class ArrayTraits {
|
|
|
/// <summary>
/// Dispatch pool whose work units are indices into a source array; the configured
/// action is invoked once for every element that gets dispatched.
/// </summary>
/// <remarks>
/// The promise is resolved with the length of the source array once the pending
/// counter drops to zero, and rejected with the exception thrown by the action.
/// <c>Dispose</c> is registered on the promise through <c>Anyway</c>
/// (presumably it runs when the promise completes, whatever the outcome - confirm
/// against the Promise implementation).
/// </remarks>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly Action<TSrc> m_action;
    readonly TSrc[] m_source;
    readonly Promise<int> m_promise = new Promise<int>();
    readonly TraceContext m_traceContext;

    // Number of elements whose action has not finished successfully yet.
    int m_pending;
    // Next index to hand out; advanced atomically by TryDequeue.
    int m_next;

    /// <summary>
    /// Captures the current trace context, stores the work description and starts the pool.
    /// </summary>
    /// <param name="source">Array to iterate over; must not be null.</param>
    /// <param name="action">Action invoked for each element; must not be null.</param>
    /// <param name="threads">Value forwarded to the <c>DispatchPool</c> constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_traceContext = TraceContext.Snapshot();

        m_source = source;
        m_action = action;
        m_pending = source.Length;
        m_next = 0;

        m_promise.Anyway(Dispose);

        // Must stay last: all state above has to be in place before the pool is initialised.
        InitPool();
    }

    /// <summary>
    /// Promise that carries the number of elements once the iteration has completed.
    /// </summary>
    public Promise<int> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Forks the trace context captured in the constructor, then runs the base worker.
    /// </summary>
    protected override void Worker() {
        TraceContext.Fork(m_traceContext);
        base.Worker();
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">Receives the claimed index, even when it is past the end of the array.</param>
    /// <returns><c>true</c> while the claimed index is inside the source array.</returns>
    protected override bool TryDequeue(out int unit) {
        int index = Interlocked.Increment(ref m_next) - 1;
        unit = index;
        return index < m_source.Length;
    }

    /// <summary>
    /// Runs the action for the element at <paramref name="unit"/> and completes the
    /// promise when this was the last pending element.
    /// </summary>
    /// <param name="unit">Index of the element to process.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);

            if (Interlocked.Decrement(ref m_pending) == 0) {
                m_promise.Resolve(m_source.Length);
            }
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
|
|
|
|
|
|
/// <summary>
/// Dispatch pool whose work units are indices into a source array; every dispatched
/// element is passed through the transform and the result is stored at the same
/// index of a destination array.
/// </summary>
/// <remarks>
/// The promise is resolved with the destination array once the pending counter
/// drops to zero, and rejected with the exception thrown by the transform.
/// <c>Dispose</c> is registered on the promise through <c>Anyway</c>
/// (presumably it runs when the promise completes, whatever the outcome - confirm
/// against the Promise implementation).
/// </remarks>
class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
    readonly TraceContext m_traceContext;

    // Number of elements that have not been transformed successfully yet.
    int m_pending;
    // Next index to hand out; advanced atomically by TryDequeue.
    int m_next;

    /// <summary>
    /// Allocates the destination array, captures the current trace context and starts the pool.
    /// </summary>
    /// <param name="source">Array to map; must not be null.</param>
    /// <param name="transform">Function applied to each element; must not be null.</param>
    /// <param name="threads">Value forwarded to the <c>DispatchPool</c> constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_source = source;
        m_transform = transform;
        m_dest = new TDst[source.Length];
        m_pending = source.Length;
        m_next = 0;
        m_traceContext = TraceContext.Snapshot();

        m_promise.Anyway(Dispose);

        // Must stay last: all state above has to be in place before the pool is initialised.
        InitPool();
    }

    /// <summary>
    /// Promise that carries the destination array once every element has been transformed.
    /// </summary>
    public Promise<TDst[]> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Forks the trace context captured in the constructor, then runs the base worker.
    /// </summary>
    protected override void Worker() {
        TraceContext.Fork(m_traceContext);
        base.Worker();
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">Receives the claimed index, even when it is past the end of the array.</param>
    /// <returns><c>true</c> while the claimed index is inside the source array.</returns>
    protected override bool TryDequeue(out int unit) {
        int claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Transforms the element at <paramref name="unit"/>, stores the result and completes
    /// the promise when this was the last pending element.
    /// </summary>
    /// <param name="unit">Index of the element to process.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);

            int remaining = Interlocked.Decrement(ref m_pending);
            if (remaining == 0) {
                m_promise.Resolve(m_dest);
            }
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
|
|
|
|
|
|
/// <summary>
/// Maps every element of <paramref name="source"/> through <paramref name="transform"/>
/// using an <c>ArrayMapper</c> dispatch pool and returns the promise of the resulting array.
/// </summary>
/// <param name="source">Array to map.</param>
/// <param name="transform">Function applied to each element.</param>
/// <param name="threads">Value passed to the mapper's dispatch pool.</param>
/// <returns>
/// A promise of an array in which each position holds the transformed value of the
/// element at the same position in <paramref name="source"/>.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // The mapper completes its promise only from inside a work unit. An empty array
    // yields no work units, so the promise would never complete; answer right away
    // instead, the same way ChainedMap handles empty input.
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
|
|
|
|
|
|
/// <summary>
/// Invokes <paramref name="action"/> for every element of <paramref name="source"/>
/// using an <c>ArrayIterator</c> dispatch pool and returns its promise.
/// </summary>
/// <param name="source">Array to iterate over.</param>
/// <param name="action">Action invoked for each element.</param>
/// <param name="threads">Value passed to the iterator's dispatch pool.</param>
/// <returns>A promise that is resolved with the length of <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is null.
/// </exception>
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // The iterator completes its promise only from inside a work unit. An empty array
    // yields no work units, so the promise would never complete; answer right away
    // with the element count (zero) instead.
    if (source.Length == 0)
        return Promise<int>.ResultToPromise(0);

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
|
|
|
|
|
|
/// <summary>
/// Maps every element of <paramref name="source"/> through an asynchronous
/// <paramref name="transform"/> and collects the results into an array.
/// </summary>
/// <remarks>
/// The transforms are started one after another from a separate thread obtained via
/// <c>AsyncPool.InvokeNewThread</c>. A slot counter that starts at
/// <paramref name="threads"/> is decremented before each transform is started and
/// incremented again from the <c>Anyway</c> callback of the promise the transform
/// returned; while the counter is negative the dispatching thread waits on a monitor.
/// Dispatching stops as soon as the resulting promise reports <c>IsResolved</c>.
/// </remarks>
/// <param name="source">Array to map.</param>
/// <param name="transform">Function that starts the asynchronous operation for one element.</param>
/// <param name="threads">Initial number of slots; must be greater than zero.</param>
/// <returns>
/// A promise of an array in which each position holds the result produced for the
/// element at the same position in <paramref name="source"/>. The promise is rejected
/// when <paramref name="transform"/> throws; <c>promise.Reject</c> is also handed to
/// <c>Last</c> (presumably as the error handler of the inner promise - confirm against
/// the IPromise contract).
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="threads"/> is less than or equal to zero.
/// </exception>
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ResultMapper<TSrc, IPromise<TDst>> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    // The single-string constructor of ArgumentOutOfRangeException interprets its
    // argument as the parameter name, so name and message are passed separately.
    if (threads <= 0)
        throw new ArgumentOutOfRangeException("threads", "Threads number must be greater than zero");

    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var promise = new Promise<TDst[]>();
    var res = new TDst[source.Length];
    var pending = source.Length;

    object locker = new object();
    int slots = threads;

    // Analysis disable AccessToDisposedClosure
    AsyncPool.InvokeNewThread(() => {
        for (int i = 0; i < source.Length; i++) {
            if(promise.IsResolved)
                break; // stop processing in case of error or cancellation
            // Copy of the loop variable for the callbacks below, which may run after i has advanced.
            var idx = i;

            // Take a slot; when none is left, wait until a running operation gives one back.
            if (Interlocked.Decrement(ref slots) < 0) {
                lock(locker) {
                    while(slots < 0)
                        Monitor.Wait(locker);
                }
            }

            try {
                transform(source[i])
                    .Anyway(() => {
                        // Give the slot back and wake the dispatching thread if it is waiting.
                        Interlocked.Increment(ref slots);
                        lock (locker) {
                            Monitor.Pulse(locker);
                        }
                    })
                    .Last(
                        x => {
                            res[idx] = x;
                            var left = Interlocked.Decrement(ref pending);
                            // The last result to arrive completes the overall promise.
                            if (left == 0)
                                promise.Resolve(res);
                        },
                        promise.Reject
                    );

            } catch (Exception e) {
                promise.Reject(e);
            }
        }
        return 0;
    });

    return promise;
}
|
|
|
|
|
|
}
|
|
|
}
|
|
|
|