// ArrayTraits.cs | 152 lines | 5.1 KiB | text/x-csharp | CSharpLexer | cin
r15 | using System; | ||
using System.Collections.Generic; | ||||
using System.Diagnostics; | ||||
using System.Linq; | ||||
using System.Text; | ||||
using System.Threading; | ||||
namespace Implab.Parallels { | ||||
public static class ArrayTraits { | ||||
/// <summary>
/// Work source that applies an action to every element of an array. A unit of work is the
/// index of one element; indexes are claimed atomically, so the overrides below may be called
/// from several threads at once (presumably by the DispatchPool workers - the base class is
/// not visible here, confirm against it).
/// </summary>
/// <remarks>
/// NOTE(review): for an empty source no unit is ever dequeued, so this class never resolves
/// its promise on its own; callers have to deal with empty arrays before creating it.
/// </remarks>
/// <typeparam name="TSrc">Type of the array elements.</typeparam>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly Action<TSrc> m_action;
    readonly TSrc[] m_source;
    readonly Promise<int> m_promise = new Promise<int>();

    // Number of elements whose action has not completed successfully yet.
    int m_pending;

    // Index of the next element that has not been claimed by TryDequeue.
    int m_next;

    /// <summary>
    /// Captures the array and the action, hooks disposal of this instance to the promise
    /// and then calls InitPool.
    /// </summary>
    /// <param name="source">Array to iterate over; must not be null.</param>
    /// <param name="action">Action invoked once per element; must not be null.</param>
    /// <param name="threads">Forwarded unchanged to the DispatchPool constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {
        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_next = 0;
        m_source = source;
        m_pending = source.Length;
        m_action = action;

        // Both handlers dispose this instance when the promise reports back.
        m_promise.Anyway(() => Dispose());
        m_promise.Cancelled(() => Dispose());

        // Every field is assigned before this call, so the overrides never see a
        // half-constructed instance if InitPool starts invoking them immediately.
        InitPool();
    }

    /// <summary>
    /// Promise resolved with the length of the source array after the action has completed
    /// for every element, or rejected with an exception thrown while processing a unit.
    /// </summary>
    public Promise<int> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Claims the index of the next unprocessed element.
    /// </summary>
    /// <param name="unit">The claimed index, or -1 when the array is exhausted.</param>
    /// <returns>true if an index was claimed; false when all indexes are already taken.</returns>
    protected override bool TryDequeue(out int unit) {
        int index;
        unit = -1;
        do {
            index = m_next;
            if (index >= m_source.Length)
                return false;
            // Retry when another thread advanced m_next between the read and the exchange.
        } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));

        unit = index;
        return true;
    }

    /// <summary>
    /// Runs the action for the element at the given index and resolves the promise when the
    /// last pending element has been processed. Any exception is passed to the promise
    /// through Reject; the pending counter is not decremented in that case.
    /// </summary>
    /// <param name="unit">Index of the element to process.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);

            // Interlocked.Decrement returns the decremented value, so exactly one caller
            // observes zero and resolves the promise.
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_source.Length);
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
/// <summary>
/// Work source that transforms every element of an array into a result array of the same
/// length. A unit of work is the index of one element; indexes are claimed atomically, so the
/// overrides below may be called from several threads at once (presumably by the
/// DispatchPool workers - the base class is not visible here, confirm against it).
/// </summary>
/// <remarks>
/// NOTE(review): for an empty source no unit is ever dequeued, so this class never resolves
/// its promise on its own; callers have to deal with empty arrays before creating it.
/// </remarks>
/// <typeparam name="TSrc">Type of the source elements.</typeparam>
/// <typeparam name="TDst">Type of the transformed elements.</typeparam>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();

    // Number of elements whose transformation has not completed successfully yet.
    int m_pending;

    // Index of the next element that has not been claimed by TryDequeue.
    int m_next;

    /// <summary>
    /// Captures the array and the transformation, allocates the result array, hooks disposal
    /// of this instance to the promise and then calls InitPool.
    /// </summary>
    /// <param name="source">Array to transform; must not be null.</param>
    /// <param name="transform">Function applied to each element; must not be null.</param>
    /// <param name="threads">Forwarded unchanged to the DispatchPool constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {
        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_next = 0;
        m_source = source;
        m_dest = new TDst[source.Length];
        m_pending = source.Length;
        m_transform = transform;

        // Both handlers dispose this instance when the promise reports back.
        m_promise.Anyway(() => Dispose());
        m_promise.Cancelled(() => Dispose());

        // Every field is assigned before this call, so the overrides never see a
        // half-constructed instance if InitPool starts invoking them immediately.
        InitPool();
    }

    /// <summary>
    /// Promise resolved with the result array after every element has been transformed, or
    /// rejected with an exception thrown while processing a unit.
    /// </summary>
    public Promise<TDst[]> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Claims the index of the next unprocessed element.
    /// </summary>
    /// <param name="unit">The claimed index, or -1 when the array is exhausted.</param>
    /// <returns>true if an index was claimed; false when all indexes are already taken.</returns>
    protected override bool TryDequeue(out int unit) {
        int index;
        unit = -1;
        do {
            index = m_next;
            if (index >= m_source.Length)
                return false;
            // Retry when another thread advanced m_next between the read and the exchange.
        } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));

        unit = index;
        return true;
    }

    /// <summary>
    /// Transforms the element at the given index, stores the result at the same index of the
    /// result array and resolves the promise when the last pending element has been
    /// processed. Any exception is passed to the promise through Reject; the pending counter
    /// is not decremented in that case.
    /// </summary>
    /// <param name="unit">Index of the element to transform.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);

            // Interlocked.Decrement returns the decremented value, so exactly one caller
            // observes zero and resolves the promise.
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_dest);
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
/// <summary>
/// Transforms every element of <paramref name="source"/> with <paramref name="transform"/>
/// using an ArrayMapper pool and returns a promise for the array of results.
/// </summary>
/// <typeparam name="TSrc">Type of the source elements.</typeparam>
/// <typeparam name="TDst">Type of the transformed elements.</typeparam>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Function applied to each element.</param>
/// <param name="threads">Forwarded to the pool constructor (presumably the worker count - confirm against DispatchPool).</param>
/// <returns>
/// A promise resolved with an array in which the item at index i is the transformed
/// source[i], or rejected with an exception thrown by <paramref name="transform"/>.
/// For an empty source the promise is already resolved with an empty array.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
public static Promise<TDst[]> ParallelMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // An empty array produces no work units, so the pool-backed mapper would never
    // complete its promise; answer immediately without creating a pool.
    if (source.Length == 0) {
        var empty = new Promise<TDst[]>();
        empty.Resolve(new TDst[0]);
        return empty;
    }

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
/// <summary>
/// Invokes <paramref name="action"/> for every element of <paramref name="source"/> using an
/// ArrayIterator pool and returns a promise for the number of processed elements.
/// </summary>
/// <typeparam name="TSrc">Type of the array elements.</typeparam>
/// <param name="source">Array to iterate over.</param>
/// <param name="action">Action invoked once per element.</param>
/// <param name="threads">Forwarded to the pool constructor (presumably the worker count - confirm against DispatchPool).</param>
/// <returns>
/// A promise resolved with the length of <paramref name="source"/> once the action has
/// completed for every element, or rejected with an exception thrown by
/// <paramref name="action"/>. For an empty source the promise is already resolved with 0.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is null.
/// </exception>
public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // An empty array produces no work units, so the pool-backed iterator would never
    // complete its promise; answer immediately without creating a pool.
    if (source.Length == 0) {
        var done = new Promise<int>();
        done.Resolve(0);
        return done;
    }

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
} | ||||
} | ||||