ArrayTraits.cs
207 lines
| 7.3 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r41 | using Implab.Diagnostics; | ||
using System; | ||||
cin
|
r15 | using System.Diagnostics; | ||
using System.Threading; | ||||
namespace Implab.Parallels { | ||||
public static class ArrayTraits { | ||||
/// <summary>
/// Dispatch pool that invokes an action for each element of an array. The units of
/// work handed to the pool workers are indices into the source array.
/// </summary>
/// <remarks>
/// NOTE(review): for an empty source array no unit is ever dequeued, so nothing in
/// this class resolves the promise - callers should handle that case themselves.
/// </remarks>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly TSrc[] m_source;
    readonly Action<TSrc> m_action;
    readonly LogicalOperation m_logicalOperation;
    readonly Promise<int> m_promise = new Promise<int>();

    // Counter used to hand out element indices; incremented atomically by TryDequeue.
    int m_next;
    // Number of elements whose action has not completed yet.
    int m_pending;

    /// <summary>
    /// Stores the work description, captures the current logical operation of the
    /// trace context and initializes the pool.
    /// </summary>
    /// <param name="source">The array to iterate over; must not be null.</param>
    /// <param name="action">The action invoked for every element; must not be null.</param>
    /// <param name="threads">Value passed to the base pool constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {
        Debug.Assert(source != null);
        Debug.Assert(action != null);

        // Remember the caller's logical operation so that Worker() can enter it.
        m_logicalOperation = TraceContext.Instance.CurrentOperation;

        m_source = source;
        m_action = action;
        m_pending = source.Length;
        m_next = 0;

        // Dispose is registered as the promise handler for PromiseEventType.All.
        m_promise.On(Dispose, PromiseEventType.All);

        InitPool();
    }

    /// <summary>
    /// The promise resolved with the source length after the last element has been
    /// processed, or rejected with the exception thrown by the action.
    /// </summary>
    public Promise<int> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Runs the base worker loop inside the logical operation captured by the constructor.
    /// </summary>
    protected override void Worker() {
        var trace = TraceContext.Instance;
        trace.EnterLogicalOperation(m_logicalOperation, false);
        try {
            base.Worker();
        } finally {
            // Instance is read again on purpose, exactly as often as before.
            TraceContext.Instance.Leave();
        }
    }

    /// <summary>
    /// Atomically claims the next element index.
    /// </summary>
    /// <param name="unit">Receives the claimed index.</param>
    /// <returns>True while the claimed index is inside the source array.</returns>
    protected override bool TryDequeue(out int unit) {
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Invokes the action for the element at the given index; resolves the promise when
    /// the pending counter drops to zero and rejects it when the action throws.
    /// </summary>
    /// <param name="unit">Index of the element to process.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);

            if (Interlocked.Decrement(ref m_pending) == 0) {
                m_promise.Resolve(m_source.Length);
            }
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
/// <summary>
/// Dispatch pool that transforms each element of a source array and stores the result
/// at the same index of a destination array. The units of work handed to the pool
/// workers are indices into the source array.
/// </summary>
/// <remarks>
/// NOTE(review): for an empty source array no unit is ever dequeued, so nothing in
/// this class resolves the promise - callers should handle that case themselves.
/// </remarks>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Func<TSrc, TDst> m_transform;
    readonly LogicalOperation m_logicalOperation;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();

    // Counter used to hand out element indices; incremented atomically by TryDequeue.
    int m_next;
    // Number of elements which have not been transformed yet.
    int m_pending;

    /// <summary>
    /// Stores the work description, allocates the destination array, captures the
    /// current logical operation of the trace context and initializes the pool.
    /// </summary>
    /// <param name="source">The array to transform; must not be null.</param>
    /// <param name="transform">The function applied to every element; must not be null.</param>
    /// <param name="threads">Value passed to the base pool constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {
        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_source = source;
        m_transform = transform;
        m_dest = new TDst[source.Length];
        m_pending = source.Length;
        m_next = 0;

        // Remember the caller's logical operation so that Worker() can enter it.
        m_logicalOperation = TraceContext.Instance.CurrentOperation;

        // Dispose is registered as the promise handler for PromiseEventType.All.
        m_promise.On(Dispose, PromiseEventType.All);

        InitPool();
    }

    /// <summary>
    /// The promise resolved with the destination array after the last element has been
    /// transformed, or rejected with the exception thrown by the transform.
    /// </summary>
    public Promise<TDst[]> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Runs the base worker loop inside the logical operation captured by the constructor.
    /// </summary>
    protected override void Worker() {
        var trace = TraceContext.Instance;
        trace.EnterLogicalOperation(m_logicalOperation, false);
        try {
            base.Worker();
        } finally {
            // Instance is read again on purpose, exactly as often as before.
            TraceContext.Instance.Leave();
        }
    }

    /// <summary>
    /// Atomically claims the next element index.
    /// </summary>
    /// <param name="unit">Receives the claimed index.</param>
    /// <returns>True while the claimed index is inside the source array.</returns>
    protected override bool TryDequeue(out int unit) {
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Transforms the element at the given index and stores the result; resolves the
    /// promise when the pending counter drops to zero and rejects it when the
    /// transform throws.
    /// </summary>
    /// <param name="unit">Index of the element to transform.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);

            if (Interlocked.Decrement(ref m_pending) == 0) {
                m_promise.Resolve(m_dest);
            }
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
cin
|
/// <summary>
/// Transforms every element of <paramref name="source"/> with
/// <paramref name="transform"/> using an <c>ArrayMapper</c> dispatch pool.
/// </summary>
/// <param name="source">The array to transform.</param>
/// <param name="transform">The function applied to every element.</param>
/// <param name="threads">Value passed to the dispatch pool; not examined when the source is empty.</param>
/// <returns>
/// A promise of the array of results, stored at the same indices as their source
/// elements. For an empty source the promise is already resolved with an empty array.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
public static IPromise<TDst[]> ParallelMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // The mapper resolves its promise only from a processed unit, so with no elements
    // the promise would stay pending forever. Return a completed promise instead,
    // the same way ChainedMap treats an empty source.
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
cin
|
/// <summary>
/// Invokes <paramref name="action"/> for every element of <paramref name="source"/>
/// using an <c>ArrayIterator</c> dispatch pool.
/// </summary>
/// <param name="source">The array to iterate over.</param>
/// <param name="action">The action invoked for every element.</param>
/// <param name="threads">Value passed to the dispatch pool; not examined when the source is empty.</param>
/// <returns>
/// A promise resolved with the length of the source array once every element has been
/// processed. For an empty source the promise is already resolved with 0.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is null.
/// </exception>
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // The iterator resolves its promise only from a processed unit, so with no
    // elements the promise would stay pending forever. Return a completed promise
    // carrying the source length (0) instead.
    if (source.Length == 0)
        return Promise<int>.ResultToPromise(0);

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
cin
|
r16 | |||
cin
|
/// <summary>
/// Starts an asynchronous operation for every element of <paramref name="source"/>,
/// in index order, and collects the results into an array. A slot counter initialized
/// with <paramref name="threads"/> throttles how many operations are started before
/// the loop waits for a slot to be released.
/// </summary>
/// <param name="source">The array of input elements.</param>
/// <param name="transform">Starts the operation for an element and returns its promise.</param>
/// <param name="threads">Initial number of slots; must be greater than zero.</param>
/// <returns>
/// A promise of the array of results, stored at the same indices as their source
/// elements. For an empty source the promise is already resolved with an empty array.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="threads"/> is less than or equal to zero.
/// </exception>
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
    if (source == null) {
        throw new ArgumentNullException("source");
    }
    if (transform == null) {
        throw new ArgumentNullException("transform");
    }
    if (threads <= 0) {
        throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
    }

    if (source.Length == 0) {
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);
    }

    var result = new Promise<TDst[]>();
    var mapped = new TDst[source.Length];
    var remaining = source.Length;

    object gate = new object();
    int freeSlots = threads;

    // Analysis disable AccessToDisposedClosure
    AsyncPool.InvokeNewThread<int>(() => {
        for (int i = 0; i < source.Length; i++) {
            if (result.IsResolved) {
                break; // stop processing in case of error or cancellation
            }

            // Private copy of the loop counter for the completion handler below.
            var index = i;

            // Take a slot; a negative counter means none was free, so wait under the
            // lock until a completed operation gives one back.
            if (Interlocked.Decrement(ref freeSlots) < 0) {
                lock (gate) {
                    while (freeSlots < 0) {
                        Monitor.Wait(gate);
                    }
                }
            }

            try {
                var operation = transform(source[i]);

                operation
                    .On(x => {
                        // Give the slot back and wake the dispatching loop.
                        // NOTE(review): whether this handler also runs for failed
                        // operations depends on the single-argument On overload - confirm.
                        Interlocked.Increment(ref freeSlots);
                        lock (gate) {
                            Monitor.Pulse(gate);
                        }
                    })
                    .On(
                        x => {
                            mapped[index] = x;
                            // The operation which stores the last result completes the promise.
                            if (Interlocked.Decrement(ref remaining) == 0) {
                                result.Resolve(mapped);
                            }
                        },
                        result.Reject
                    );
            } catch (Exception e) {
                result.Reject(e);
            }
        }
        return 0;
    });

    return result;
}
cin
|
r24 | |||
cin
|
r15 | } | ||
} | ||||