ArrayTraits.cs
193 lines
| 6.7 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r41 | using Implab.Diagnostics; | ||
using System; | ||||
cin
|
r15 | using System.Collections.Generic; | ||
using System.Diagnostics; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
namespace Implab.Parallels { | ||||
public static class ArrayTraits { | ||||
/// <summary>
/// Dispatch pool whose work units are array indices: every unit invokes the
/// supplied action on one element of the source array, and the outcome is
/// reported through <see cref="Promise"/>.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly TSrc[] m_source;
    readonly Action<TSrc> m_action;
    readonly Promise<int> m_promise = new Promise<int>();
    readonly TraceContext m_traceContext;

    // Number of elements whose action has not completed successfully yet.
    int m_pending;
    // Counter used to hand out array indices; advanced atomically in TryDequeue.
    int m_next;

    /// <summary>
    /// Stores the inputs, captures the current trace context and initializes the pool.
    /// </summary>
    /// <param name="source">Array to iterate over; must not be null.</param>
    /// <param name="action">Action invoked for each element; must not be null.</param>
    /// <param name="threads">Value forwarded to the <c>DispatchPool</c> constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {
        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_source = source;
        m_action = action;
        m_pending = source.Length;
        m_next = 0;
        m_traceContext = TraceContext.Snapshot();

        // Registers Dispose as an Anyway handler on the promise
        // (presumably invoked on both success and failure; confirm against Promise).
        m_promise.Anyway(Dispose);

        InitPool();
    }

    /// <summary>
    /// Promise resolved with the source length once every element has been
    /// processed, or rejected with the exception caught while processing.
    /// </summary>
    public Promise<int> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Forks the trace context captured in the constructor, then runs the base worker.
    /// </summary>
    protected override void Worker() {
        TraceContext.Fork(m_traceContext);
        base.Worker();
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">The claimed index; only meaningful when the method returns true.</param>
    /// <returns>True while the claimed index is below the source length.</returns>
    protected override bool TryDequeue(out int unit) {
        // Increment returns the new value, so subtract one to get the claimed index.
        unit = Interlocked.Increment(ref m_next) - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Runs the action on the element at index <paramref name="unit"/>.
    /// </summary>
    /// <param name="unit">Index into the source array.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);

            // The caller that brings the counter to zero resolves the promise.
            if (Interlocked.Decrement(ref m_pending) == 0) {
                m_promise.Resolve(m_source.Length);
            }
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
/// <summary>
/// Dispatch pool whose work units are array indices: every unit transforms
/// one element of the source array into the slot with the same index of a
/// destination array, and the outcome is reported through <see cref="Promise"/>.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <typeparam name="TDst">Element type of the resulting array.</typeparam>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
    readonly TraceContext m_traceContext;

    // Number of elements that have not been transformed successfully yet.
    int m_pending;
    // Counter used to hand out array indices; advanced atomically in TryDequeue.
    int m_next;

    /// <summary>
    /// Stores the inputs, allocates the destination array, captures the
    /// current trace context and initializes the pool.
    /// </summary>
    /// <param name="source">Array to transform; must not be null.</param>
    /// <param name="transform">Function applied to each element; must not be null.</param>
    /// <param name="threads">Value forwarded to the <c>DispatchPool</c> constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {
        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_next = 0;
        m_source = source;
        m_dest = new TDst[source.Length];
        m_pending = source.Length;
        m_transform = transform;
        m_traceContext = TraceContext.Snapshot();

        // Registers Dispose as an Anyway handler on the promise
        // (presumably invoked on both success and failure; confirm against Promise).
        m_promise.Anyway(Dispose);

        InitPool();
    }

    /// <summary>
    /// Promise resolved with the destination array once every element has been
    /// transformed, or rejected with the exception caught while transforming.
    /// </summary>
    public Promise<TDst[]> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Forks the trace context captured in the constructor, then runs the base worker.
    /// </summary>
    protected override void Worker() {
        TraceContext.Fork(m_traceContext);
        base.Worker();
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">The claimed index; only meaningful when the method returns true.</param>
    /// <returns>True while the claimed index is below the source length.</returns>
    protected override bool TryDequeue(out int unit) {
        // Increment returns the new value, so subtract one to get the claimed index.
        unit = Interlocked.Increment(ref m_next) - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Transforms the element at index <paramref name="unit"/> and stores the result.
    /// </summary>
    /// <param name="unit">Index into the source and destination arrays.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);
            var pending = Interlocked.Decrement(ref m_pending);
            // The caller that brings the counter to zero resolves the promise.
            if (pending == 0)
                m_promise.Resolve(m_dest);
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
cin
|
/// <summary>
/// Transforms every element of <paramref name="source"/> using an
/// <c>ArrayMapper</c> dispatch pool and returns a promise of the resulting array.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <typeparam name="TDst">Element type of the resulting array.</typeparam>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Function applied to each element.</param>
/// <param name="threads">Value passed to the mapper as its thread count.</param>
/// <returns>Promise of an array holding the transformed elements at the same indices.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // With no elements the mapper never dequeues a unit, so its pending counter
    // never reaches zero through a decrement and the promise would stay pending
    // forever. Complete immediately instead, the same way ChainedMap does.
    // The thread count is not inspected on this path.
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
cin
|
/// <summary>
/// Invokes <paramref name="action"/> for every element of <paramref name="source"/>
/// using an <c>ArrayIterator</c> dispatch pool.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <param name="source">Array to iterate over.</param>
/// <param name="action">Action invoked for each element.</param>
/// <param name="threads">Value passed to the iterator as its thread count.</param>
/// <returns>Promise resolved with the number of elements in <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is null.
/// </exception>
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // With no elements the iterator never dequeues a unit, so its pending counter
    // never reaches zero through a decrement and the promise would stay pending
    // forever. Complete immediately with a count of zero instead.
    // The thread count is not inspected on this path.
    if (source.Length == 0)
        return Promise<int>.ResultToPromise(0);

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
cin
|
r16 | |||
cin
|
/// <summary>
/// Maps every element of <paramref name="source"/> through a promise-returning
/// <paramref name="transform"/>, starting the transforms one after another while
/// a semaphore limits how many may be outstanding at once.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <typeparam name="TDst">Element type of the resulting array.</typeparam>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Function that starts the transformation of one element and returns its promise.</param>
/// <param name="threads">Initial and maximum count of the limiting semaphore; must be positive.</param>
/// <returns>
/// Promise resolved with the results stored at the indices of their source elements,
/// or rejected with the first error passed to it.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="threads"/> is zero or negative.</exception>
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ResultMapper<TSrc, IPromise<TDst>> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    if (threads <= 0)
        // The single-string constructor treats its argument as the parameter name,
        // so the parameter name and the message are passed separately.
        throw new ArgumentOutOfRangeException("threads", "Threads number must be greater than zero");

    // Nothing to transform: no result callback would ever fire, so complete immediately.
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var promise = new Promise<TDst[]>();
    var res = new TDst[source.Length];
    var pending = source.Length;

    var semaphore = new Semaphore(threads, threads);

    // Analysis disable AccessToDisposedClosure
    AsyncPool.InvokeNewThread(() => {
        for (int i = 0; i < source.Length; i++) {
            if(promise.IsResolved)
                break; // stop processing in case of error or cancellation
            // Per-iteration copy: the result callback runs later and must not observe the moving loop variable.
            var idx = i;

            // Blocks until one of the slots is free.
            semaphore.WaitOne();
            try {
                var p1 = transform(source[i]);
                // The slot is given back from the Anyway handler of the element's promise.
                p1.Anyway(() => semaphore.Release());
                p1.Then(
                    x => {
                        res[idx] = x;
                        var left = Interlocked.Decrement(ref pending);
                        // The callback that stores the last result resolves the overall promise.
                        if (left == 0)
                            promise.Resolve(res);
                    },
                    e => {
                        promise.Reject(e);
                        throw new TransientPromiseException(e);
                    }
                );
            } catch (Exception e) {
                promise.Reject(e);
            }
        }
        return 0;
    });

    // The semaphore is disposed from the Anyway handler of the overall promise.
    return promise.Anyway(semaphore.Dispose);
}
cin
|
r24 | |||
cin
|
r15 | } | ||
} | ||||