ArrayTraits.cs
173 lines
| 6.1 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r15 | using System; | ||
using System.Collections.Generic; | ||||
using System.Diagnostics; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
namespace Implab.Parallels { | ||||
public static class ArrayTraits { | ||||
class ArrayIterator<TSrc> : DispatchPool<int> { | ||||
readonly Action<TSrc> m_action; | ||||
readonly TSrc[] m_source; | ||||
readonly Promise<int> m_promise = new Promise<int>(); | ||||
int m_pending; | ||||
int m_next; | ||||
public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) | ||||
: base(threads) { | ||||
Debug.Assert(source != null); | ||||
Debug.Assert(action != null); | ||||
m_next = 0; | ||||
m_source = source; | ||||
m_pending = source.Length; | ||||
m_action = action; | ||||
m_promise.Anyway(() => Dispose()); | ||||
m_promise.Cancelled(() => Dispose()); | ||||
InitPool(); | ||||
} | ||||
public Promise<int> Promise { | ||||
get { | ||||
return m_promise; | ||||
} | ||||
} | ||||
protected override bool TryDequeue(out int unit) { | ||||
cin
|
r16 | unit = Interlocked.Increment(ref m_next) - 1; | ||
return unit >= m_source.Length ? false : true; | ||||
cin
|
r15 | } | ||
protected override void InvokeUnit(int unit) { | ||||
try { | ||||
m_action(m_source[unit]); | ||||
cin
|
r16 | var pending = Interlocked.Decrement(ref m_pending); | ||
cin
|
r15 | if (pending == 0) | ||
m_promise.Resolve(m_source.Length); | ||||
} catch (Exception e) { | ||||
m_promise.Reject(e); | ||||
} | ||||
} | ||||
} | ||||
class ArrayMapper<TSrc, TDst>: DispatchPool<int> { | ||||
readonly Func<TSrc, TDst> m_transform; | ||||
readonly TSrc[] m_source; | ||||
readonly TDst[] m_dest; | ||||
readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); | ||||
int m_pending; | ||||
int m_next; | ||||
public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) | ||||
: base(threads) { | ||||
Debug.Assert (source != null); | ||||
Debug.Assert( transform != null); | ||||
m_next = 0; | ||||
m_source = source; | ||||
m_dest = new TDst[source.Length]; | ||||
m_pending = source.Length; | ||||
m_transform = transform; | ||||
m_promise.Anyway(() => Dispose()); | ||||
m_promise.Cancelled(() => Dispose()); | ||||
InitPool(); | ||||
} | ||||
public Promise<TDst[]> Promise { | ||||
get { | ||||
return m_promise; | ||||
} | ||||
} | ||||
protected override bool TryDequeue(out int unit) { | ||||
cin
|
r16 | unit = Interlocked.Increment(ref m_next) - 1; | ||
return unit >= m_source.Length ? false : true; | ||||
cin
|
r15 | } | ||
protected override void InvokeUnit(int unit) { | ||||
try { | ||||
m_dest[unit] = m_transform(m_source[unit]); | ||||
cin
|
r16 | var pending = Interlocked.Decrement(ref m_pending); | ||
cin
|
r15 | if (pending == 0) | ||
m_promise.Resolve(m_dest); | ||||
} catch (Exception e) { | ||||
m_promise.Reject(e); | ||||
} | ||||
} | ||||
} | ||||
cin
|
r30 | public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) { | ||
cin
|
r15 | if (source == null) | ||
throw new ArgumentNullException("source"); | ||||
if (transform == null) | ||||
throw new ArgumentNullException("transform"); | ||||
var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads); | ||||
return mapper.Promise; | ||||
} | ||||
cin
|
r30 | public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) { | ||
cin
|
r15 | if (source == null) | ||
throw new ArgumentNullException("source"); | ||||
if (action == null) | ||||
throw new ArgumentNullException("action"); | ||||
var iter = new ArrayIterator<TSrc>(source, action, threads); | ||||
return iter.Promise; | ||||
} | ||||
cin
|
r16 | |||
cin
|
r26 | public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { | ||
cin
|
r16 | if (source == null) | ||
throw new ArgumentNullException("source"); | ||||
if (transform == null) | ||||
throw new ArgumentNullException("transform"); | ||||
if (threads <= 0) | ||||
throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | ||||
var promise = new Promise<TDst[]>(); | ||||
var res = new TDst[source.Length]; | ||||
var pending = source.Length; | ||||
cin
|
r30 | |||
cin
|
r16 | var semaphore = new Semaphore(threads, threads); | ||
AsyncPool.InvokeNewThread(() => { | ||||
for (int i = 0; i < source.Length; i++) { | ||||
cin
|
r19 | if(promise.IsResolved) | ||
cin
|
r16 | break; // stop processing in case of error or cancellation | ||
var idx = i; | ||||
semaphore.WaitOne(); | ||||
try { | ||||
var p1 = transform(source[i]); | ||||
p1.Anyway(() => semaphore.Release()); | ||||
p1.Cancelled(() => semaphore.Release()); | ||||
p1.Then( | ||||
x => { | ||||
res[idx] = x; | ||||
var left = Interlocked.Decrement(ref pending); | ||||
if (left == 0) | ||||
promise.Resolve(res); | ||||
}, | ||||
e => promise.Reject(e) | ||||
); | ||||
} catch (Exception e) { | ||||
promise.Reject(e); | ||||
} | ||||
} | ||||
return 0; | ||||
}); | ||||
return promise.Anyway(() => semaphore.Dispose()); | ||||
} | ||||
cin
|
r24 | |||
cin
|
r15 | } | ||
} | ||||