##// END OF EJS Templates
minor fixes and improvements of AsyncQueue, additional tests
minor fixes and improvements of AsyncQueue, additional tests

File last commit:

r119:2573b562e328 v2
r122:0c8685c8b56b v2
Show More
ArrayTraits.cs
207 lines | 7.3 KiB | text/x-csharp | CSharpLexer
cin
Added TraceContext support to array traits
r41 using Implab.Diagnostics;
using System;
cin
implemented parallel map and foreach for arrays...
r15 using System.Diagnostics;
using System.Threading;
namespace Implab.Parallels {
public static class ArrayTraits {
/// <summary>
/// Dispatch pool that invokes an action for every element of an array,
/// using one array index per work unit.
/// </summary>
/// <remarks>
/// The promise is completed only from <c>InvokeUnit</c>: it is resolved with
/// the array length once the pending counter drops to zero, and
/// <c>Reject</c> is called with any exception thrown while handling a unit.
/// Nothing in this class completes the promise when the source array is empty.
/// </remarks>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly TSrc[] m_source;
    readonly Action<TSrc> m_action;
    readonly Promise<int> m_promise = new Promise<int>();
    // Logical operation read in the constructor and re-entered by Worker().
    readonly LogicalOperation m_logicalOperation;

    // Index that the next TryDequeue call hands out.
    int m_next;
    // Number of elements whose action has not returned yet.
    int m_pending;

    /// <summary>
    /// Stores the inputs, subscribes <c>Dispose</c> to every promise event
    /// and calls <c>InitPool</c>.
    /// </summary>
    /// <param name="source">Array to iterate; asserted to be non-null.</param>
    /// <param name="action">Action applied to each element; asserted to be non-null.</param>
    /// <param name="threads">Value forwarded to the base pool constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {
        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_source = source;
        m_action = action;
        m_next = 0;
        m_pending = source.Length;
        m_logicalOperation = TraceContext.Instance.CurrentOperation;

        // Dispose the pool on any promise event.
        m_promise.On(Dispose, PromiseEventType.All);

        InitPool();
    }

    /// <summary>Promise resolved with the array length when all elements are processed.</summary>
    public Promise<int> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Runs the base worker between entering and leaving the logical
    /// operation that was current when this instance was constructed.
    /// </summary>
    protected override void Worker() {
        TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
        try {
            base.Worker();
        } finally {
            TraceContext.Instance.Leave();
        }
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">The claimed index; only valid when the method returns true.</param>
    /// <returns>True while the claimed index is inside the source array.</returns>
    protected override bool TryDequeue(out int unit) {
        var claimed = Interlocked.Increment(ref m_next) - 1;
        unit = claimed;
        return claimed < m_source.Length;
    }

    /// <summary>
    /// Applies the action to the element at <paramref name="unit"/> and
    /// resolves the promise when the last pending element completes.
    /// </summary>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_source.Length);
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
/// <summary>
/// Dispatch pool that maps every element of a source array into a
/// destination array of the same length, using one array index per work unit.
/// </summary>
/// <remarks>
/// The promise is completed only from <c>InvokeUnit</c>: it is resolved with
/// the destination array once the pending counter drops to zero, and
/// <c>Reject</c> is called with any exception thrown while handling a unit.
/// Nothing in this class completes the promise when the source array is empty.
/// </remarks>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Func<TSrc, TDst> m_transform;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
    // Logical operation read in the constructor and re-entered by Worker().
    readonly LogicalOperation m_logicalOperation;

    // Index that the next TryDequeue call hands out.
    int m_next;
    // Number of elements whose transform has not returned yet.
    int m_pending;

    /// <summary>
    /// Stores the inputs, allocates the destination array, subscribes
    /// <c>Dispose</c> to every promise event and calls <c>InitPool</c>.
    /// </summary>
    /// <param name="source">Array to map; asserted to be non-null.</param>
    /// <param name="transform">Mapping function; asserted to be non-null.</param>
    /// <param name="threads">Value forwarded to the base pool constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {
        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_source = source;
        m_transform = transform;
        m_dest = new TDst[source.Length];
        m_next = 0;
        m_pending = source.Length;
        m_logicalOperation = TraceContext.Instance.CurrentOperation;

        // Dispose the pool on any promise event.
        m_promise.On(Dispose, PromiseEventType.All);

        InitPool();
    }

    /// <summary>Promise resolved with the destination array when all elements are mapped.</summary>
    public Promise<TDst[]> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Runs the base worker between entering and leaving the logical
    /// operation that was current when this instance was constructed.
    /// </summary>
    protected override void Worker() {
        TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
        try {
            base.Worker();
        } finally {
            TraceContext.Instance.Leave();
        }
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">The claimed index; only valid when the method returns true.</param>
    /// <returns>True while the claimed index is inside the source array.</returns>
    protected override bool TryDequeue(out int unit) {
        var claimed = Interlocked.Increment(ref m_next) - 1;
        unit = claimed;
        return claimed < m_source.Length;
    }

    /// <summary>
    /// Stores the transformed element at the same index of the destination
    /// array and resolves the promise when the last pending element completes.
    /// </summary>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_dest);
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
cin
small refactoring, cleanup.
/// <summary>
/// Maps every element of <paramref name="source"/> through
/// <paramref name="transform"/> using an <c>ArrayMapper</c> dispatch pool.
/// </summary>
/// <param name="source">Array to map.</param>
/// <param name="transform">Function applied to each element.</param>
/// <param name="threads">Value passed to the dispatch pool constructor.</param>
/// <returns>A promise of the array of transformed elements, in source order.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // An empty array yields no work units, and ArrayMapper resolves its
    // promise only after processing a unit, so the promise would never
    // complete. Return an already resolved promise instead, the same way
    // ChainedMap handles empty input.
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
cin
small refactoring, cleanup.
/// <summary>
/// Invokes <paramref name="action"/> for every element of
/// <paramref name="source"/> using an <c>ArrayIterator</c> dispatch pool.
/// </summary>
/// <param name="source">Array to iterate.</param>
/// <param name="action">Action applied to each element.</param>
/// <param name="threads">Value passed to the dispatch pool constructor.</param>
/// <returns>A promise resolved with the number of elements in the array.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is null.
/// </exception>
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // An empty array yields no work units, and ArrayIterator resolves its
    // promise only after processing a unit, so the promise would never
    // complete. Return a promise already resolved with the element count.
    if (source.Length == 0)
        return Promise<int>.ResultToPromise(0);

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
cin
sync
r16
cin
code cleanup...
/// <summary>
/// Maps every element of <paramref name="source"/> through an asynchronous
/// <paramref name="transform"/>, keeping at most <paramref name="threads"/>
/// transformations in flight at once.
/// </summary>
/// <remarks>
/// Transformations are started in order on a thread obtained from
/// <c>AsyncPool.InvokeNewThread</c>. A slot counter limits concurrency: the
/// starting thread waits on a monitor while no slot is free, and each
/// completed transformation returns its slot and pulses the monitor.
/// </remarks>
/// <param name="source">Array to map.</param>
/// <param name="transform">Function that starts the transformation of one element.</param>
/// <param name="threads">Maximum number of concurrent transformations; must be positive.</param>
/// <returns>A promise of the array of results, in source order.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="threads"/> is less than or equal to zero.
/// </exception>
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    if (threads <= 0)
        throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");

    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var promise = new Promise<TDst[]>();
    var res = new TDst[source.Length];
    var pending = source.Length;

    object locker = new object();
    int slots = threads;

    // Returns a slot and wakes the starting thread if it is waiting for one.
    Action releaseSlot = () => {
        Interlocked.Increment(ref slots);
        lock (locker) {
            Monitor.Pulse(locker);
        }
    };

    // Analysis disable AccessToDisposedClosure
    AsyncPool.InvokeNewThread<int>(() => {
        for (int i = 0; i < source.Length; i++) {
            if (promise.IsResolved)
                break; // stop processing in case of error or cancellation
            var idx = i;

            if (Interlocked.Decrement(ref slots) < 0) {
                lock (locker) {
                    while (slots < 0)
                        Monitor.Wait(locker);
                }
                // The wait may have been ended by a failed transformation;
                // do not start another one once the result is decided.
                if (promise.IsResolved)
                    break;
            }

            try {
                transform(source[i])
                    .On(
                        x => {
                            releaseSlot();
                            res[idx] = x;
                            var left = Interlocked.Decrement(ref pending);
                            if (left == 0)
                                promise.Resolve(res);
                        },
                        error => {
                            // Reject first so the woken starting thread sees
                            // the final state, then return the slot. The slot
                            // used to be returned on success only, which
                            // could leave that thread waiting forever.
                            promise.Reject(error);
                            releaseSlot();
                        }
                    );
            } catch (Exception e) {
                promise.Reject(e);
            }
        }
        return 0;
    });
    return promise;
}
cin
fixed dispatch pool race condition
r24
cin
implemented parallel map and foreach for arrays...
r15 }
}