using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; namespace Implab.Parallels { public static class ArrayTraits { class ArrayIterator : DispatchPool { readonly Action m_action; readonly TSrc[] m_source; readonly Promise m_promise = new Promise(); int m_pending; int m_next; public ArrayIterator(TSrc[] source, Action action, int threads) : base(threads) { Debug.Assert(source != null); Debug.Assert(action != null); m_next = 0; m_source = source; m_pending = source.Length; m_action = action; m_promise.Anyway(() => Dispose()); m_promise.Cancelled(() => Dispose()); InitPool(); } public Promise Promise { get { return m_promise; } } protected override bool TryDequeue(out int unit) { int index; unit = -1; do { index = m_next; if (index >= m_source.Length) return false; } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); unit = index; return true; } protected override void InvokeUnit(int unit) { try { m_action(m_source[unit]); int pending; do { pending = m_pending; } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); pending--; if (pending == 0) m_promise.Resolve(m_source.Length); } catch (Exception e) { m_promise.Reject(e); } } } class ArrayMapper: DispatchPool { readonly Func m_transform; readonly TSrc[] m_source; readonly TDst[] m_dest; readonly Promise m_promise = new Promise(); int m_pending; int m_next; public ArrayMapper(TSrc[] source, Func transform, int threads) : base(threads) { Debug.Assert (source != null); Debug.Assert( transform != null); m_next = 0; m_source = source; m_dest = new TDst[source.Length]; m_pending = source.Length; m_transform = transform; m_promise.Anyway(() => Dispose()); m_promise.Cancelled(() => Dispose()); InitPool(); } public Promise Promise { get { return m_promise; } } protected override bool TryDequeue(out int unit) { int index; unit = -1; do { index = m_next; if (index >= m_source.Length) return false; } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); unit = index; return true; } protected override void InvokeUnit(int unit) { try { m_dest[unit] = m_transform(m_source[unit]); int pending; do { pending = m_pending; } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); pending --; if (pending == 0) m_promise.Resolve(m_dest); } catch (Exception e) { m_promise.Reject(e); } } } public static Promise ParallelMap (this TSrc[] source, Func transform, int threads) { if (source == null) throw new ArgumentNullException("source"); if (transform == null) throw new ArgumentNullException("transform"); var mapper = new ArrayMapper(source, transform, threads); return mapper.Promise; } public static Promise ParallelForEach(this TSrc[] source, Action action, int threads) { if (source == null) throw new ArgumentNullException("source"); if (action == null) throw new ArgumentNullException("action"); var iter = new ArrayIterator(source, action, threads); return iter.Promise; } } }