##// END OF EJS Templates
fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.
fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.

File last commit:

r145:706fccb85524 v2
r196:40d7fed4a09e default
Show More
ArrayTraits.cs
207 lines | 7.3 KiB | text/x-csharp | CSharpLexer
cin
Added TraceContext support to array traits
r41 using Implab.Diagnostics;
using System;
cin
implemented parallel map and foreach for arrays...
r15 using System.Diagnostics;
using System.Threading;
namespace Implab.Parallels {
public static class ArrayTraits {
class ArrayIterator<TSrc> : DispatchPool<int> {
readonly Action<TSrc> m_action;
readonly TSrc[] m_source;
readonly Promise<int> m_promise = new Promise<int>();
cin
rewritten tracing
r92 readonly LogicalOperation m_logicalOperation;
cin
implemented parallel map and foreach for arrays...
r15
int m_pending;
int m_next;
public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
: base(threads) {
Debug.Assert(source != null);
Debug.Assert(action != null);
cin
rewritten tracing
r92 m_logicalOperation = TraceContext.Instance.CurrentOperation;
cin
implemented parallel map and foreach for arrays...
r15 m_next = 0;
m_source = source;
m_pending = source.Length;
m_action = action;
cin
Promises rewritten, added improved version of AsyncQueue
r119 m_promise.On(Dispose, PromiseEventType.All);
cin
implemented parallel map and foreach for arrays...
r15
InitPool();
}
public Promise<int> Promise {
get {
return m_promise;
}
}
cin
Added TraceContext support to array traits
r41 protected override void Worker() {
cin
rewritten tracing
r92 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
try {
base.Worker();
} finally {
TraceContext.Instance.Leave();
}
cin
Added TraceContext support to array traits
r41 }
cin
implemented parallel map and foreach for arrays...
r15 protected override bool TryDequeue(out int unit) {
cin
sync
r16 unit = Interlocked.Increment(ref m_next) - 1;
cin
major refactoring, added tasks support
r75 return unit < m_source.Length;
cin
implemented parallel map and foreach for arrays...
r15 }
protected override void InvokeUnit(int unit) {
try {
m_action(m_source[unit]);
cin
sync
r16 var pending = Interlocked.Decrement(ref m_pending);
cin
implemented parallel map and foreach for arrays...
r15 if (pending == 0)
m_promise.Resolve(m_source.Length);
} catch (Exception e) {
m_promise.Reject(e);
}
}
}
class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
readonly Func<TSrc, TDst> m_transform;
readonly TSrc[] m_source;
readonly TDst[] m_dest;
readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
cin
rewritten tracing
r92 readonly LogicalOperation m_logicalOperation;
cin
implemented parallel map and foreach for arrays...
r15
int m_pending;
int m_next;
public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
: base(threads) {
Debug.Assert (source != null);
Debug.Assert( transform != null);
m_next = 0;
m_source = source;
m_dest = new TDst[source.Length];
m_pending = source.Length;
m_transform = transform;
cin
rewritten tracing
r92 m_logicalOperation = TraceContext.Instance.CurrentOperation;
cin
implemented parallel map and foreach for arrays...
r15
cin
Promises rewritten, added improved version of AsyncQueue
r119 m_promise.On(Dispose, PromiseEventType.All);
cin
implemented parallel map and foreach for arrays...
r15
InitPool();
}
public Promise<TDst[]> Promise {
get {
return m_promise;
}
}
cin
Added TraceContext support to array traits
r41 protected override void Worker() {
cin
rewritten tracing
r92 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
try {
base.Worker();
} finally {
TraceContext.Instance.Leave();
}
cin
Added TraceContext support to array traits
r41 }
cin
implemented parallel map and foreach for arrays...
r15 protected override bool TryDequeue(out int unit) {
cin
sync
r16 unit = Interlocked.Increment(ref m_next) - 1;
cin
rewritten tracing
r92 return unit < m_source.Length;
cin
implemented parallel map and foreach for arrays...
r15 }
protected override void InvokeUnit(int unit) {
try {
m_dest[unit] = m_transform(m_source[unit]);
cin
sync
r16 var pending = Interlocked.Decrement(ref m_pending);
cin
implemented parallel map and foreach for arrays...
r15 if (pending == 0)
m_promise.Resolve(m_dest);
} catch (Exception e) {
m_promise.Reject(e);
}
}
}
cin
small refactoring, cleanup.
r30 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
cin
implemented parallel map and foreach for arrays...
r15 if (source == null)
throw new ArgumentNullException("source");
if (transform == null)
throw new ArgumentNullException("transform");
var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
return mapper.Promise;
}
cin
small refactoring, cleanup.
r30 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
cin
implemented parallel map and foreach for arrays...
r15 if (source == null)
throw new ArgumentNullException("source");
if (action == null)
throw new ArgumentNullException("action");
var iter = new ArrayIterator<TSrc>(source, action, threads);
return iter.Promise;
}
cin
sync
r16
cin
code cleanup...
r101 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
cin
sync
r16 if (source == null)
throw new ArgumentNullException("source");
if (transform == null)
throw new ArgumentNullException("transform");
if (threads <= 0)
cin
code cleanup...
r101 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
cin
sync
r16
cin
fixed: StackOverflow in IPromiseBase.Then(handler)...
r32 if (source.Length == 0)
cin
RC: cancellation support for promises + tests
r145 return Promise<TDst[]>.FromResult(new TDst[0]);
cin
fixed: StackOverflow in IPromiseBase.Then(handler)...
r32
cin
sync
r16 var promise = new Promise<TDst[]>();
var res = new TDst[source.Length];
var pending = source.Length;
cin
small refactoring, cleanup.
r30
cin
added memory barriers
r80 object locker = new object();
int slots = threads;
cin
sync
r16
cin
major refactoring, added tasks support
r75 // Analysis disable AccessToDisposedClosure
cin
major update, added Drain mathod to AsyncQueue class
r124 AsyncPool.RunThread<int>(() => {
cin
sync
r16 for (int i = 0; i < source.Length; i++) {
cin
Promise is rewritten to use interlocked operations instead of locks
r19 if(promise.IsResolved)
cin
sync
r16 break; // stop processing in case of error or cancellation
var idx = i;
cin
major refactoring, added tasks support
r75
cin
improved performance of a chained map operation
r89 if (Interlocked.Decrement(ref slots) < 0) {
lock(locker) {
while(slots < 0)
Monitor.Wait(locker);
}
cin
added memory barriers
r80 }
cin
improved performance of a chained map operation
r89
cin
sync
r16 try {
cin
added memory barriers
r80 transform(source[i])
cin
Promises rewritten, added improved version of AsyncQueue
r119 .On( x => {
cin
improved performance of a chained map operation
r89 Interlocked.Increment(ref slots);
lock (locker) {
cin
added memory barriers
r80 Monitor.Pulse(locker);
}
})
cin
renamed Promise.Last -> Promise.On...
r104 .On(
cin
added memory barriers
r80 x => {
res[idx] = x;
var left = Interlocked.Decrement(ref pending);
if (left == 0)
promise.Resolve(res);
},
cin
improved performance of a chained map operation
r89 promise.Reject
cin
added memory barriers
r80 );
cin
sync
r16
} catch (Exception e) {
promise.Reject(e);
}
}
return 0;
});
cin
added memory barriers
r80 return promise;
cin
sync
r16 }
cin
fixed dispatch pool race condition
r24
cin
implemented parallel map and foreach for arrays...
r15 }
}