ArrayTraits.cs
212 lines
| 7.8 KiB
| text/x-csharp
|
CSharpLexer
|
|
r15 | using System; | ||
| using System.Collections.Generic; | ||||
| using System.Diagnostics; | ||||
| using System.Linq; | ||||
| using System.Text; | ||||
| using System.Threading; | ||||
| namespace Implab.Parallels { | ||||
| public static class ArrayTraits { | ||||
/// <summary>
/// Dispatch pool whose work units are indexes into a source array; the supplied action is
/// invoked once for every dequeued index and completion is reported through <see cref="Promise"/>.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly Action<TSrc> m_action;
    readonly TSrc[] m_source;
    readonly Promise<int> m_promise = new Promise<int>();

    // Number of elements whose action has not completed successfully yet.
    int m_pending;

    // Next index to hand out; it is incremented on every dequeue attempt, including failed ones.
    int m_next;

    /// <summary>
    /// Stores the source and the action, wires disposal of the pool to the promise and starts the pool.
    /// </summary>
    /// <param name="source">Array to iterate; must not be null (checked with Debug.Assert only).</param>
    /// <param name="action">Action applied to each element; must not be null (checked with Debug.Assert only).</param>
    /// <param name="threads">Value forwarded to the DispatchPool constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_source = source;
        m_action = action;
        m_next = 0;
        m_pending = source.Length;

        // Dispose the pool as soon as the promise is completed or cancelled.
        m_promise.Anyway(() => Dispose());
        m_promise.Cancelled(() => Dispose());

        // All state is assigned before the pool is initialized.
        // presumably InitPool starts the workers that call TryDequeue/InvokeUnit - TODO confirm
        InitPool();
    }

    /// <summary>
    /// Promise resolved with the source length after the last pending element has been processed,
    /// or rejected with the exception thrown by the action.
    /// </summary>
    public Promise<int> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">The claimed index; it is outside the array when the method returns false.</param>
    /// <returns>true while the claimed index is below the source length.</returns>
    protected override bool TryDequeue(out int unit) {
        unit = Interlocked.Increment(ref m_next) - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Runs the action for the element at the given index and resolves the promise when the
    /// pending counter reaches zero; any exception rejects the promise.
    /// </summary>
    /// <param name="unit">Index of the element to process.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);

            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_source.Length);
        } catch (Exception error) {
            m_promise.Reject(error);
        }
    }
}
/// <summary>
/// Dispatch pool whose work units are indexes into a source array; every dequeued index is
/// transformed into the matching slot of a destination array which is published through
/// <see cref="Promise"/>.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <typeparam name="TDst">Element type of the destination array.</typeparam>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();

    // Number of elements which have not been transformed successfully yet.
    int m_pending;

    // Next index to hand out; it is incremented on every dequeue attempt, including failed ones.
    int m_next;

    /// <summary>
    /// Stores the source and the transformation, allocates the destination array, wires disposal
    /// of the pool to the promise and starts the pool.
    /// </summary>
    /// <param name="source">Array to map; must not be null (checked with Debug.Assert only).</param>
    /// <param name="transform">Function applied to each element; must not be null (checked with Debug.Assert only).</param>
    /// <param name="threads">Value forwarded to the DispatchPool constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_source = source;
        m_transform = transform;
        m_dest = new TDst[source.Length];
        m_next = 0;
        m_pending = source.Length;

        // Dispose the pool as soon as the promise is completed or cancelled.
        m_promise.Anyway(() => Dispose());
        m_promise.Cancelled(() => Dispose());

        // All state is assigned before the pool is initialized.
        // presumably InitPool starts the workers that call TryDequeue/InvokeUnit - TODO confirm
        InitPool();
    }

    /// <summary>
    /// Promise resolved with the destination array after the last pending element has been
    /// transformed, or rejected with the exception thrown by the transformation.
    /// </summary>
    public Promise<TDst[]> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">The claimed index; it is outside the array when the method returns false.</param>
    /// <returns>true while the claimed index is below the source length.</returns>
    protected override bool TryDequeue(out int unit) {
        unit = Interlocked.Increment(ref m_next) - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Transforms the element at the given index into the same slot of the destination array and
    /// resolves the promise when the pending counter reaches zero; any exception rejects the promise.
    /// </summary>
    /// <param name="unit">Index of the element to transform.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);

            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_dest);
        } catch (Exception error) {
            m_promise.Reject(error);
        }
    }
}
/// <summary>
/// Transforms every element of <paramref name="source"/> with <paramref name="transform"/> using an
/// ArrayMapper dispatch pool and returns the promise of the resulting array.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <typeparam name="TDst">Element type of the resulting array.</typeparam>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Function applied to each element.</param>
/// <param name="threads">Value passed to the ArrayMapper (and from there to its DispatchPool base).</param>
/// <returns>A promise of an array holding the transformed elements at the same indexes as in the source.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="transform"/> is null.</exception>
public static Promise<TDst[]> ParallelMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // The mapper resolves its promise only after a unit has been processed and the pending counter
    // drops to zero. An empty array yields no units, so the promise would never be completed;
    // return an already resolved promise instead of starting a pool.
    if (source.Length == 0) {
        var empty = new Promise<TDst[]>();
        empty.Resolve(new TDst[0]);
        return empty;
    }

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
/// <summary>
/// Invokes <paramref name="action"/> for every element of <paramref name="source"/> using an
/// ArrayIterator dispatch pool and returns the promise of the number of processed elements.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <param name="source">Array to iterate.</param>
/// <param name="action">Action applied to each element.</param>
/// <param name="threads">Value passed to the ArrayIterator (and from there to its DispatchPool base).</param>
/// <returns>A promise resolved with the length of the source array once all elements are processed.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // The iterator resolves its promise only after a unit has been processed and the pending counter
    // drops to zero. An empty array yields no units, so the promise would never be completed;
    // return an already resolved promise instead of starting a pool.
    if (source.Length == 0) {
        var done = new Promise<int>();
        done.Resolve(0);
        return done;
    }

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
|
|
r16 | |||
/// <summary>
/// Applies the asynchronous operation <paramref name="transform"/> to every element of
/// <paramref name="source"/>, allowing at most <paramref name="threads"/> operations to be
/// in flight at the same time, and collects the results into an array.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <typeparam name="TDst">Element type of the resulting array.</typeparam>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Operation which returns a promise of the transformed element.</param>
/// <param name="threads">Maximum number of concurrently running operations; must be greater than zero.</param>
/// <returns>
/// A promise of an array holding the results at the same indexes as in the source. It is rejected
/// when an operation fails or when <paramref name="transform"/> throws.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="transform"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="threads"/> is less than or equal to zero.</exception>
public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    if (threads <= 0)
        // The single-string constructor treats its argument as the parameter name, so the
        // two-argument overload is used to report both the parameter and the message.
        throw new ArgumentOutOfRangeException("threads", "Threads number must be greater then zero");

    var promise = new Promise<TDst[]>();
    var res = new TDst[source.Length];

    // With no elements the loop below never runs and nothing would ever resolve the promise,
    // so complete it right away with the empty result.
    if (source.Length == 0) {
        promise.Resolve(res);
        return promise;
    }

    var pending = source.Length;

    // Limits the number of operations which are started but not yet completed.
    var semaphore = new Semaphore(threads, threads);

    AsyncPool.InvokeNewThread(() => {
        for (int i = 0; i < source.Length; i++) {
            if (promise.IsResolved)
                break; // stop processing in case of error or cancellation

            // Copy the loop variable so that the callback writes to the slot of this iteration.
            var idx = i;

            // Blocks until one of the running operations releases a slot.
            semaphore.WaitOne();
            try {
                var p1 = transform(source[i]);

                // NOTE(review): if Anyway is also invoked on cancellation the semaphore is released
                // twice for the same operation - confirm against the Promise implementation.
                p1.Anyway(() => semaphore.Release());
                p1.Cancelled(() => semaphore.Release());
                p1.Then(
                    x => {
                        res[idx] = x;
                        var left = Interlocked.Decrement(ref pending);
                        if (left == 0)
                            promise.Resolve(res);
                    },
                    e => promise.Reject(e)
                );
            } catch (Exception e) {
                promise.Reject(e);
            }
        }
        return 0;
    });

    // NOTE(review): the semaphore is disposed when the resulting promise completes; operations
    // still in flight after a rejection will call Release afterwards - verify this is acceptable.
    return promise.Anyway(() => semaphore.Dispose());
}
|
|
r24 | |||
| /* | ||||
| this method is pretty fast, but it may cause a stack overflow if an element transformation completes faster than the next operation can | ||||
| be chained; in this case the synchronous callback invocation will occur | ||||
| public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { | ||||
| if (source == null) | ||||
| throw new ArgumentNullException("source"); | ||||
| if (transform == null) | ||||
| throw new ArgumentNullException("transform"); | ||||
| if (threads <= 0) | ||||
| throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | ||||
| var promise = new Promise<TDst[]>(); | ||||
| var res = new TDst[source.Length]; | ||||
| var index = -1; // we will start with increment | ||||
| var len = source.Length; | ||||
| var pending = len; | ||||
| Action<int> callback = null; | ||||
| callback = (current) => { | ||||
| if (current < len) { | ||||
| transform(source[current]) | ||||
| .Then( | ||||
| x => { | ||||
| res[current] = x; | ||||
| if (Interlocked.Decrement(ref pending) == 0) | ||||
| promise.Resolve(res); | ||||
| else | ||||
| callback(Interlocked.Increment(ref index)); | ||||
| }, | ||||
| e => promise.Reject(e) | ||||
| ); | ||||
| } | ||||
| }; | ||||
| for (int i = 0; i < threads; i++) | ||||
| callback(Interlocked.Increment(ref index)); | ||||
| return promise; | ||||
| } | ||||
| */ | ||||
|
|
r15 | } | ||
| } | ||||
