##// END OF EJS Templates
fixed error handling for chained actions
fixed error handling for chained actions

File last commit:

r125:f803565868a4 v2
r198:b305c678923a default
Show More
DispatchPool.cs
197 lines | 6.0 KiB | text/x-csharp | CSharpLexer
cin
implemented parallel map and foreach for arrays...
using System;
using System.Threading;
namespace Implab.Parallels {
/// <summary>
/// Base class for a pool of worker threads which process units of work of type
/// <typeparamref name="TUnit"/>.
/// </summary>
/// <remarks>
/// The derived class owns the queue: it supplies units through <see cref="TryDequeue"/>,
/// processes them in <see cref="InvokeUnit"/> and wakes idle workers with
/// <see cref="SignalThread"/>. This class only manages the worker threads: the number of
/// threads is kept between the minimum and the maximum limits, and a worker which stays
/// idle longer than the release timeout exits unless it is needed to keep the minimum.
/// </remarks>
public abstract class DispatchPool<TUnit> : IDisposable {
    readonly int m_minThreadsLimit;
    readonly int m_maxThreadsLimit;
    readonly int m_releaseTimeout = 1000; // the time (ms) an idle worker waits for a new unit before it tries to exit

    int m_threads; // the current size of the pool
    int m_maxRunningThreads; // the maximum reached size of the pool
    int m_exit; // 1 when the pool is going to shutdown, all unused workers are released

    readonly object m_signal = new object(); // used to pulse waiting threads

    /// <summary>
    /// Creates the pool with the specified limits. No threads are started here.
    /// </summary>
    /// <param name="min">The minimum number of workers; a value greater than
    /// <paramref name="max"/> is silently reduced to <paramref name="max"/>.</param>
    /// <param name="max">The maximum number of workers.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative
    /// or <paramref name="max"/> is not positive.</exception>
    protected DispatchPool(int min, int max) {
        if (min < 0)
            throw new ArgumentOutOfRangeException("min");
        if (max <= 0)
            throw new ArgumentOutOfRangeException("max");
        if (min > max)
            min = max;

        m_minThreadsLimit = min;
        m_maxThreadsLimit = max;
    }

    /// <summary>
    /// Creates the pool whose minimum and maximum limits are both <paramref name="threads"/>.
    /// </summary>
    protected DispatchPool(int threads)
        : this(threads, threads) {
    }

    /// <summary>
    /// Creates the pool with no reserved workers and at most
    /// <see cref="Environment.ProcessorCount"/> workers.
    /// </summary>
    protected DispatchPool() {
        m_minThreadsLimit = 0;
        m_maxThreadsLimit = Environment.ProcessorCount;
    }

    /// <summary>
    /// Starts the minimum number of workers. The constructors of this class do not call it.
    /// </summary>
    protected void InitPool() {
        for (int i = 0; i < m_minThreadsLimit; i++)
            StartWorker();
    }

    /// <summary>
    /// The current number of allocated thread slots.
    /// </summary>
    public int PoolSize {
        get {
            Thread.MemoryBarrier();
            return m_threads;
        }
    }

    /// <summary>
    /// The largest number of thread slots which were allocated at the same time.
    /// </summary>
    public int MaxRunningThreads {
        get {
            Thread.MemoryBarrier();
            return m_maxRunningThreads;
        }
    }

    /// <summary>
    /// <c>true</c> once <see cref="Dispose()"/> has marked the pool for shutdown.
    /// </summary>
    protected bool IsDisposed {
        get {
            Thread.MemoryBarrier();
            return m_exit == 1;
        }
    }

    /// <summary>
    /// Tries to take the next unit from the queue of the derived class without blocking.
    /// </summary>
    /// <param name="unit">Receives the unit when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> if a unit has been taken.</returns>
    protected abstract bool TryDequeue(out TUnit unit);

    /// <summary>
    /// Takes the next unit, waiting on the signal object when the queue is empty.
    /// </summary>
    /// <param name="unit">Receives the unit when the method returns <c>true</c>.</param>
    /// <param name="timeout">The total time in milliseconds to wait for a unit.</param>
    /// <returns><c>true</c> if a unit has been taken; <c>false</c> on timeout or when the
    /// pool is shutting down.</returns>
    bool Dequeue(out TUnit unit, int timeout) {
        int ts = Environment.TickCount;
        // fast path, doesn't take the lock
        if (TryDequeue(out unit))
            return true;

        lock (m_signal) {
            while (!TryDequeue(out unit) && m_exit == 0) {
                // Environment.TickCount wraps around, the subtraction gives the correct
                // elapsed time only in the unchecked context, so make it explicit
                int remaining = unchecked(timeout - (Environment.TickCount - ts));
                if (!Monitor.Wait(m_signal, Math.Max(0, remaining))) {
                    // timeout
                    return false;
                }
            }
            // queue item or terminate: pass the signal to the next waiting thread
            Monitor.Pulse(m_signal);
            // NOTE(review): a unit taken by the last TryDequeue call above is not
            // returned when the pool is shutting down
            if (m_exit == 1)
                return false;
        }
        return true;
    }

    /// <summary>
    /// Wakes one of the workers waiting for a unit.
    /// </summary>
    protected void SignalThread() {
        lock (m_signal) {
            Monitor.Pulse(m_signal);
        }
    }

    #region thread slots traits

    /// <summary>
    /// Reserves a slot for a new thread.
    /// </summary>
    /// <returns><c>false</c> if the pool has reached the maximum limit or is shutting down.</returns>
    bool AllocateThreadSlot() {
        int current;
        // use spins to allocate slot for the new thread
        do {
            current = m_threads;
            if (current >= m_maxThreadsLimit || m_exit == 1)
                // no more slots left or the pool has been disposed
                return false;
        } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));

        UpdateMaxThreads(current + 1);

        return true;
    }

    /// <summary>
    /// Reserves a slot only if it makes the size of the pool exactly <paramref name="desired"/>.
    /// </summary>
    /// <returns><c>false</c> if the current size of the pool isn't <c>desired - 1</c>.</returns>
    bool AllocateThreadSlot(int desired) {
        if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
            return false;

        UpdateMaxThreads(desired);

        return true;
    }

    /// <summary>
    /// Releases the slot of the calling worker.
    /// </summary>
    /// <param name="last">Set to <c>true</c> when the released slot was the only one left.</param>
    /// <returns><c>false</c> if the slot is reserved by the minimum limit and the pool
    /// isn't shutting down, in this case nothing is released.</returns>
    bool ReleaseThreadSlot(out bool last) {
        last = false;
        int current;
        // use spins to release slot for the new thread
        Thread.MemoryBarrier();
        do {
            current = m_threads;
            if (current <= m_minThreadsLimit && m_exit == 0)
                // the thread is reserved
                return false;
        } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));

        last = (current == 1);

        return true;
    }

    /// <summary>
    /// Raises the recorded maximum size of the pool to <paramref name="count"/> if it is lower.
    /// </summary>
    void UpdateMaxThreads(int count) {
        int max;
        do {
            max = m_maxRunningThreads;
            if (max >= count)
                break;
        } while (max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
    }

    #endregion

    /// <summary>
    /// Starts a new background worker thread if the pool has a free slot.
    /// </summary>
    /// <returns><c>true</c> if the worker has been started; <c>false</c> if no slot
    /// could be allocated.</returns>
    protected bool StartWorker() {
        if (!AllocateThreadSlot())
            return false;

        // slot successfully allocated
        try {
            var worker = new Thread(Worker);
            worker.IsBackground = true;
            worker.Start();
        } catch {
            // the worker isn't running, give the slot back, otherwise the pool will
            // count a thread which doesn't exist (the recorded maximum isn't reverted)
            Interlocked.Decrement(ref m_threads);
            throw;
        }

        return true;
    }

    /// <summary>
    /// Processes a single unit taken from the queue. Called on a worker thread.
    /// </summary>
    protected abstract void InvokeUnit(TUnit unit);

    /// <summary>
    /// The body of a worker thread: processes units until the worker stays idle for the
    /// release timeout or the pool is shutting down and its slot can be released.
    /// </summary>
    protected virtual void Worker() {
        TUnit unit;
        bool last;
        do {
            while (Dequeue(out unit, m_releaseTimeout)) {
                InvokeUnit(unit);
            }
            if (!ReleaseThreadSlot(out last))
                // the slot is reserved, keep waiting for units
                continue;
            // queue may be not empty
            if (last && TryDequeue(out unit)) {
                InvokeUnit(unit);
                // the pool was empty, try to become its first worker again
                if (AllocateThreadSlot(1))
                    continue;
                // we can safely exit since pool is alive
            }
            break;
        } while (true);
    }

    /// <summary>
    /// Marks the pool for shutdown and wakes a waiting worker. Only the first call with
    /// <paramref name="disposing"/> set to <c>true</c> has an effect.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>,
    /// <c>false</c> when called from the finalizer.</param>
    protected virtual void Dispose(bool disposing) {
        if (disposing) {
            if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                // wake sleeping threads
                SignalThread();
                GC.SuppressFinalize(this);
            }
        }
    }

    /// <summary>
    /// Shuts the pool down, see <see cref="Dispose(bool)"/>.
    /// </summary>
    public void Dispose() {
        Dispose(true);
    }

    ~DispatchPool() {
        Dispose(false);
    }
}
}