##// END OF EJS Templates
implemented parallel map and foreach for arrays...
Implemented parallel map and foreach for arrays; rewrote WorkerPool with MTQueue for more efficiency.

File last commit:

r15:0f982f9b7d4d promises
r15:0f982f9b7d4d promises
Show More
ArrayTraits.cs
152 lines | 5.1 KiB | text/x-csharp | CSharpLexer
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
namespace Implab.Parallels {
public static class ArrayTraits {
/// <summary>
/// Dispatch pool whose work units are indices into an array; each unit invokes
/// an action on the corresponding element.
/// </summary>
/// <remarks>
/// The promise is resolved with the array length once the action has completed
/// for every element, and rejected with the exception if processing a unit throws.
/// The promise is only ever resolved from <see cref="InvokeUnit"/>, so for an
/// empty source array nothing in this class resolves it.
/// </remarks>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly Action<TSrc> m_action;
    readonly TSrc[] m_source;
    readonly Promise<int> m_promise = new Promise<int>();

    // Number of elements whose action has not yet completed successfully.
    int m_pending;
    // Index of the next element to hand out as a work unit.
    int m_next;

    /// <summary>
    /// Captures the source array and the action, then initializes the pool.
    /// </summary>
    /// <param name="source">Array to iterate over; must not be null.</param>
    /// <param name="action">Action invoked for each element; must not be null.</param>
    /// <param name="threads">Forwarded to the base pool constructor (presumably the worker count; confirm against DispatchPool).</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {
        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_next = 0;
        m_source = source;
        m_pending = source.Length;
        m_action = action;

        // Dispose the pool when the promise completes or is cancelled.
        // NOTE(review): exact firing rules of Anyway/Cancelled are defined by Promise; confirm.
        m_promise.Anyway(() => Dispose());
        m_promise.Cancelled(() => Dispose());

        // Called last so that every field above is assigned before the base
        // class starts using TryDequeue/InvokeUnit (InitPool is defined in the base class).
        InitPool();
    }

    /// <summary>
    /// Promise resolved with the source length after all elements are processed.
    /// </summary>
    public Promise<int> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Claims the next unprocessed index without taking a lock.
    /// </summary>
    /// <param name="unit">The claimed index, or -1 when the array is exhausted.</param>
    /// <returns>true if an index was claimed; false if no elements remain.</returns>
    protected override bool TryDequeue(out int unit) {
        while (true) {
            int index = m_next;
            if (index >= m_source.Length) {
                unit = -1;
                return false;
            }
            // Succeeds only if no other thread advanced m_next in the meantime; otherwise retry.
            if (Interlocked.CompareExchange(ref m_next, index + 1, index) == index) {
                unit = index;
                return true;
            }
        }
    }

    /// <summary>
    /// Runs the action for the element at index <paramref name="unit"/> and
    /// resolves the promise when the last pending element finishes.
    /// </summary>
    /// <param name="unit">Index into the source array.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);
            // Interlocked.Decrement returns the decremented value, so exactly
            // one caller observes zero and resolves the promise.
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_source.Length);
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
/// <summary>
/// Dispatch pool whose work units are indices into an array; each unit
/// transforms the corresponding element and stores the result at the same
/// index of a destination array.
/// </summary>
/// <remarks>
/// The promise is resolved with the destination array once every element has
/// been transformed, and rejected with the exception if processing a unit throws.
/// The promise is only ever resolved from <see cref="InvokeUnit"/>, so for an
/// empty source array nothing in this class resolves it.
/// </remarks>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();

    // Number of elements whose transform has not yet completed successfully.
    int m_pending;
    // Index of the next element to hand out as a work unit.
    int m_next;

    /// <summary>
    /// Captures the source array and the transform, allocates the destination
    /// array with the same length, then initializes the pool.
    /// </summary>
    /// <param name="source">Array to transform; must not be null.</param>
    /// <param name="transform">Function applied to each element; must not be null.</param>
    /// <param name="threads">Forwarded to the base pool constructor (presumably the worker count; confirm against DispatchPool).</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {
        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_next = 0;
        m_source = source;
        m_dest = new TDst[source.Length];
        m_pending = source.Length;
        m_transform = transform;

        // Dispose the pool when the promise completes or is cancelled.
        // NOTE(review): exact firing rules of Anyway/Cancelled are defined by Promise; confirm.
        m_promise.Anyway(() => Dispose());
        m_promise.Cancelled(() => Dispose());

        // Called last so that every field above is assigned before the base
        // class starts using TryDequeue/InvokeUnit (InitPool is defined in the base class).
        InitPool();
    }

    /// <summary>
    /// Promise resolved with the array of transformed elements.
    /// </summary>
    public Promise<TDst[]> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Claims the next unprocessed index without taking a lock.
    /// </summary>
    /// <param name="unit">The claimed index, or -1 when the array is exhausted.</param>
    /// <returns>true if an index was claimed; false if no elements remain.</returns>
    protected override bool TryDequeue(out int unit) {
        while (true) {
            int index = m_next;
            if (index >= m_source.Length) {
                unit = -1;
                return false;
            }
            // Succeeds only if no other thread advanced m_next in the meantime; otherwise retry.
            if (Interlocked.CompareExchange(ref m_next, index + 1, index) == index) {
                unit = index;
                return true;
            }
        }
    }

    /// <summary>
    /// Transforms the element at index <paramref name="unit"/>, stores the
    /// result, and resolves the promise when the last pending element finishes.
    /// </summary>
    /// <param name="unit">Index into the source and destination arrays.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);
            // Interlocked.Decrement returns the decremented value, so exactly
            // one caller observes zero and resolves the promise.
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_dest);
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
/// <summary>
/// Applies <paramref name="transform"/> to every element of
/// <paramref name="source"/> using a dispatch pool and collects the results.
/// </summary>
/// <param name="source">Array whose elements are transformed.</param>
/// <param name="transform">Function applied to each element.</param>
/// <param name="threads">Forwarded to the pool constructor (presumably the worker count; confirm against DispatchPool).</param>
/// <returns>
/// Promise resolved with an array in which element i holds the result for
/// source[i]. For an empty source the promise is already resolved with an
/// empty array.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
public static Promise<TDst[]> ParallelMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // An empty array produces no work units, so ArrayMapper would never reach
    // the code that resolves its promise. Complete immediately instead of
    // creating a pool that has nothing to do.
    if (source.Length == 0) {
        var empty = new Promise<TDst[]>();
        empty.Resolve(new TDst[0]);
        return empty;
    }

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
/// <summary>
/// Invokes <paramref name="action"/> for every element of
/// <paramref name="source"/> using a dispatch pool.
/// </summary>
/// <param name="source">Array whose elements are visited.</param>
/// <param name="action">Action invoked for each element.</param>
/// <param name="threads">Forwarded to the pool constructor (presumably the worker count; confirm against DispatchPool).</param>
/// <returns>
/// Promise resolved with the number of elements in the source once the action
/// has completed for all of them. For an empty source the promise is already
/// resolved with 0.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is null.
/// </exception>
public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // An empty array produces no work units, so ArrayIterator would never reach
    // the code that resolves its promise. Complete immediately instead of
    // creating a pool that has nothing to do.
    if (source.Length == 0) {
        var empty = new Promise<int>();
        empty.Resolve(0);
        return empty;
    }

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
}
}