##// END OF EJS Templates
Added Skip method to JSON parser to skip contents of the current node
Added Skip method to JSON parser to skip contents of the current node

File last commit:

r48:d9d794b61bb9 interactive logger
r62:62b440d46313 default
Show More
ArrayTraits.cs
191 lines | 6.7 KiB | text/x-csharp | CSharpLexer
cin
Added TraceContext support to array traits
r41 using Implab.Diagnostics;
using System;
cin
implemented parallel map and foreach for arrays...
r15 using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
namespace Implab.Parallels {
public static class ArrayTraits {
/// <summary>
/// Dispatch pool whose work units are indices into a source array; the
/// supplied action is invoked once for each element and the outcome is
/// reported through <see cref="Promise"/>.
/// </summary>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly Action<TSrc> m_action;
    readonly TSrc[] m_source;
    readonly Promise<int> m_promise = new Promise<int>();
    // Snapshot taken in the constructor and handed to TraceContext.Fork by each worker.
    readonly TraceContext m_traceContext;

    // Number of elements whose action has not completed successfully yet.
    int m_pending;
    // Index of the next element to hand out; advanced with Interlocked.Increment.
    int m_next;

    /// <summary>
    /// Stores the source and the action, registers disposal of the pool on
    /// the promise callbacks and calls InitPool.
    /// </summary>
    /// <param name="source">Array to iterate over; asserted to be non-null.</param>
    /// <param name="action">Action applied to each element; asserted to be non-null.</param>
    /// <param name="threads">Value forwarded to the DispatchPool constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_traceContext = TraceContext.Snapshot();

        m_source = source;
        m_action = action;
        m_next = 0;
        m_pending = source.Length;

        // The pool disposes itself from the promise callbacks registered here.
        m_promise.Anyway(() => Dispose());
        m_promise.Cancelled(() => Dispose());

        InitPool();
    }

    /// <summary>
    /// Promise resolved with the length of the source array once the pending
    /// counter reaches zero, or rejected with the exception thrown by the action.
    /// </summary>
    public Promise<int> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Calls TraceContext.Fork with the snapshot captured in the constructor
    /// and then runs the base worker.
    /// </summary>
    protected override void Worker() {
        TraceContext.Fork(m_traceContext);
        base.Worker();
    }

    /// <summary>
    /// Claims the next array index.
    /// </summary>
    /// <param name="unit">Receives the claimed index (may be past the end of the array).</param>
    /// <returns><c>true</c> while the claimed index is inside the source array.</returns>
    protected override bool TryDequeue(out int unit) {
        // Increment returns the new value, so subtract one to get the claimed slot.
        unit = Interlocked.Increment(ref m_next) - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Runs the action for the element at index <paramref name="unit"/>.
    /// </summary>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);
            // Whichever invocation brings the counter to zero completes the promise.
            if (Interlocked.Decrement(ref m_pending) == 0) {
                m_promise.Resolve(m_source.Length);
            }
        } catch (Exception error) {
            m_promise.Reject(error);
        }
    }
}
/// <summary>
/// Dispatch pool whose work units are indices into a source array; each
/// element is passed through the transform and the result is stored at the
/// same index of the destination array exposed through <see cref="Promise"/>.
/// </summary>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
    // Snapshot taken in the constructor and handed to TraceContext.Fork by each worker.
    readonly TraceContext m_traceContext;

    // Number of elements that have not been transformed successfully yet.
    int m_pending;
    // Index of the next element to hand out; advanced with Interlocked.Increment.
    int m_next;

    /// <summary>
    /// Allocates the destination array, registers disposal of the pool on
    /// the promise callbacks and calls InitPool.
    /// </summary>
    /// <param name="source">Array to transform; asserted to be non-null.</param>
    /// <param name="transform">Function applied to each element; asserted to be non-null.</param>
    /// <param name="threads">Value forwarded to the DispatchPool constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_source = source;
        m_transform = transform;
        m_next = 0;
        m_pending = source.Length;
        // One result slot for every source element.
        m_dest = new TDst[source.Length];

        m_traceContext = TraceContext.Snapshot();

        // The pool disposes itself from the promise callbacks registered here.
        m_promise.Anyway(() => Dispose());
        m_promise.Cancelled(() => Dispose());

        InitPool();
    }

    /// <summary>
    /// Promise resolved with the destination array once the pending counter
    /// reaches zero, or rejected with the exception thrown by the transform.
    /// </summary>
    public Promise<TDst[]> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Calls TraceContext.Fork with the snapshot captured in the constructor
    /// and then runs the base worker.
    /// </summary>
    protected override void Worker() {
        TraceContext.Fork(m_traceContext);
        base.Worker();
    }

    /// <summary>
    /// Claims the next array index.
    /// </summary>
    /// <param name="unit">Receives the claimed index (may be past the end of the array).</param>
    /// <returns><c>true</c> while the claimed index is inside the source array.</returns>
    protected override bool TryDequeue(out int unit) {
        // Increment returns the new value, so subtract one to get the claimed slot.
        unit = Interlocked.Increment(ref m_next) - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Transforms the element at index <paramref name="unit"/> and stores the result.
    /// </summary>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);
            // Whichever invocation brings the counter to zero completes the promise.
            if (Interlocked.Decrement(ref m_pending) == 0) {
                m_promise.Resolve(m_dest);
            }
        } catch (Exception error) {
            m_promise.Reject(error);
        }
    }
}
cin
small refactoring, cleanup.
/// <summary>
/// Applies <paramref name="transform"/> to every element of
/// <paramref name="source"/> using an ArrayMapper dispatch pool.
/// </summary>
/// <param name="source">Array whose elements are transformed.</param>
/// <param name="transform">Function producing the destination element for a source element.</param>
/// <param name="threads">Value passed to the ArrayMapper (and its DispatchPool base) constructor.</param>
/// <returns>
/// Promise of the array of results; result <c>i</c> corresponds to source
/// element <c>i</c>. For an empty source the promise is already resolved
/// with an empty array.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is <c>null</c>.
/// </exception>
public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // ArrayMapper resolves its promise only from InvokeUnit, when the pending
    // counter drops to zero. With no elements no unit is ever invoked, so the
    // promise would never complete; answer immediately instead (same shortcut
    // ChainedMap uses).
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
cin
small refactoring, cleanup.
/// <summary>
/// Invokes <paramref name="action"/> for every element of
/// <paramref name="source"/> using an ArrayIterator dispatch pool.
/// </summary>
/// <param name="source">Array whose elements are visited.</param>
/// <param name="action">Action invoked once per element.</param>
/// <param name="threads">Value passed to the ArrayIterator (and its DispatchPool base) constructor.</param>
/// <returns>
/// Promise resolved with the number of elements in <paramref name="source"/>.
/// For an empty source the promise is already resolved with 0.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is <c>null</c>.
/// </exception>
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // ArrayIterator resolves its promise only from InvokeUnit, when the pending
    // counter drops to zero. With no elements no unit is ever invoked, so the
    // promise would never complete; answer immediately with the element count.
    if (source.Length == 0)
        return Promise<int>.ResultToPromise(0);

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
cin
sync
r16
cin
refactoring
/// <summary>
/// Starts the asynchronous operation <paramref name="transform"/> for each
/// element of <paramref name="source"/>, in index order, from a separate
/// thread, and collects the results into an array.
/// </summary>
/// <param name="source">Array whose elements are fed to the operation.</param>
/// <param name="transform">Operation returning a promise of the result for one element.</param>
/// <param name="threads">
/// Number of slots in the semaphore that gates how many operations may be
/// started before earlier ones signal completion; must be positive.
/// </param>
/// <returns>
/// Promise of the result array; result <c>i</c> corresponds to source
/// element <c>i</c>. It is rejected with the first error reported by an
/// operation or thrown while starting one.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="threads"/> is zero or negative.
/// </exception>
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    if (threads <= 0)
        // The single-string constructor interprets its argument as the parameter
        // name, so the name and the message have to be passed separately.
        throw new ArgumentOutOfRangeException("threads", "Threads number must be greater then zero");

    // Nothing to start: no operation would ever resolve the promise.
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var promise = new Promise<TDst[]>();
    var res = new TDst[source.Length];
    var pending = source.Length;

    var semaphore = new Semaphore(threads, threads);

    AsyncPool.InvokeNewThread(() => {
        for (int i = 0; i < source.Length; i++) {
            if(promise.IsResolved)
                break; // stop processing in case of error or cancellation
            // Copy of the loop variable for the result callback, which may run
            // after the loop has moved on.
            var idx = i;
            semaphore.WaitOne();
            try {
                var p1 = transform(source[i]);
                // Hand the slot back when the operation signals completion or cancellation.
                p1.Anyway(() => semaphore.Release());
                p1.Cancelled(() => semaphore.Release());
                p1.Then(
                    x => {
                        res[idx] = x;
                        // The callback that brings the counter to zero publishes the results.
                        var left = Interlocked.Decrement(ref pending);
                        if (left == 0)
                            promise.Resolve(res);
                    },
                    e => promise.Reject(e)
                );
            } catch (Exception e) {
                // NOTE(review): the slot acquired above is not released on this path;
                // presumably harmless because the loop stops once the promise is
                // rejected - confirm.
                promise.Reject(e);
            }
        }
        return 0;
    });

    return promise.Anyway(() => semaphore.Dispose());
}
cin
fixed dispatch pool race condition
r24
cin
implemented parallel map and foreach for arrays...
r15 }
}