##// END OF EJS Templates
Added Skip method to JSON parser to skip contents of the current node
Added Skip method to JSON parser to skip contents of the current node

File last commit:

r41:2fc0fbe7d58b default
r62:62b440d46313 default
Show More
DispatchPool.cs
334 lines | 11.7 KiB | text/x-csharp | CSharpLexer
cin
implemented parallel map and foreach for arrays...
r15 using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace Implab.Parallels {
public abstract class DispatchPool<TUnit> : IDisposable {
readonly int m_minThreads;
readonly int m_maxThreads;
cin
DispatchPool works again, but performance is poor in some cases
r21
int m_createdThreads = 0; // the current size of the pool
int m_activeThreads = 0; // the count of threads which are active
int m_sleepingThreads = 0; // the count of currently inactive threads
int m_maxRunningThreads = 0; // the meximum reached size of the pool
int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
int m_wakeEvents = 0; // the count of wake events
cin
implemented parallel map and foreach for arrays...
r15 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
protected DispatchPool(int min, int max) {
if (min < 0)
throw new ArgumentOutOfRangeException("min");
if (max <= 0)
throw new ArgumentOutOfRangeException("max");
if (min > max)
min = max;
m_minThreads = min;
m_maxThreads = max;
}
protected DispatchPool(int threads)
: this(threads, threads) {
}
protected DispatchPool() {
int maxThreads, maxCP;
ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
m_minThreads = 0;
m_maxThreads = maxThreads;
}
protected void InitPool() {
for (int i = 0; i < m_minThreads; i++)
StartWorker();
}
cin
refactoring, sync
r20 public int PoolSize {
cin
implemented parallel map and foreach for arrays...
r15 get {
cin
refactoring, sync
r20 return m_createdThreads;
}
}
public int ActiveThreads {
get {
return m_activeThreads;
cin
implemented parallel map and foreach for arrays...
r15 }
}
public int MaxRunningThreads {
get {
return m_maxRunningThreads;
}
}
protected bool IsDisposed {
get {
return m_exitRequired != 0;
}
}
cin
Improved worker pool
r17 protected abstract bool TryDequeue(out TUnit unit);
cin
DispatchPool works again, but performance is poor in some cases
r21 #region thread execution traits
int SignalThread() {
var signals = Interlocked.Increment(ref m_wakeEvents);
if(signals == 1)
m_hasTasks.Set();
return signals;
}
cin
implemented nonblocking wake singnals processing
r22 bool FetchSignalOrWait(int timeout) {
var start = Environment.TickCount;
// означает, что поток владеет блокировкой и при успешном получении сигнала должен
// ее вернуть, чтобы другой ожидающий поток смог
bool hasLock = false;
do {
int signals;
do {
signals = m_wakeEvents;
if (signals == 0)
break;
} while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
if (signals >= 1) {
if (signals > 1 && hasLock)
m_hasTasks.Set();
return true;
}
if (timeout != -1)
timeout = Math.Max(0, timeout - (Environment.TickCount - start));
// если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
// и уйдет на пустой цикл, после чего заблокируется
hasLock = true;
} while (m_hasTasks.WaitOne(timeout));
return false;
}
cin
DispatchPool works again, but performance is poor in some cases
r21 bool Sleep(int timeout) {
Interlocked.Increment(ref m_sleepingThreads);
cin
implemented nonblocking wake singnals processing
r22 if (FetchSignalOrWait(timeout)) {
Interlocked.Decrement(ref m_sleepingThreads);
cin
Improved worker pool
r17 return true;
cin
DispatchPool works again, but performance is poor in some cases
r21 } else {
Interlocked.Decrement(ref m_sleepingThreads);
return false;
cin
refactoring, sync
r20 }
cin
Improved worker pool
r17 }
cin
DispatchPool works again, but performance is poor in some cases
r21 #endregion
cin
Improved worker pool
r17
/// <summary>
/// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
/// </summary>
cin
DispatchPool works again, but performance is poor in some cases
r21 protected void GrowPool() {
if (m_exitRequired != 0)
return;
if (m_sleepingThreads > m_wakeEvents) {
cin
implemented nonblocking wake singnals processing
r22 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
cin
DispatchPool works again, but performance is poor in some cases
r21 // all sleeping threads may gone
SignalThread(); // wake a sleeping thread;
cin
Improved worker pool
r17
cin
DispatchPool works again, but performance is poor in some cases
r21 // we can't check whether signal has been processed
// anyway it may take some time for the thread to start
// we will ensure that at least one thread is running
cin
small refactoring, cleanup.
r30 EnsurePoolIsAlive();
cin
DispatchPool works again, but performance is poor in some cases
r21 } else {
// if there is no sleeping threads in the pool
cin
small refactoring, cleanup.
r30 if (!StartWorker()) {
// we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue
cin
fixed dispatch pool race condition
r24 // send it a signal to spin again
cin
small refactoring, cleanup.
r30 SignalThread();
EnsurePoolIsAlive();
}
}
}
cin
fixed race condition in DispatchPool
r34 protected void EnsurePoolIsAlive() {
cin
small refactoring, cleanup.
r30 if (AllocateThreadSlot(1)) {
// if there were no threads in the pool
var worker = new Thread(this.Worker);
worker.IsBackground = true;
worker.Start();
cin
Improved worker pool
r17 }
}
cin
fixed race condition in DispatchPool
r34 protected virtual bool Suspend() {
cin
refactoring, sync
r20 //no tasks left, exit if the thread is no longer needed
bool last;
bool requestExit;
cin
DispatchPool works again, but performance is poor in some cases
r21 // if threads have a timeout before releasing
cin
refactoring, sync
r20 if (m_releaseTimeout > 0)
requestExit = !Sleep(m_releaseTimeout);
else
requestExit = true;
cin
DispatchPool works again, but performance is poor in some cases
r21 if (!requestExit)
return true;
// release unsused thread
cin
refactoring, sync
r20 if (requestExit && ReleaseThreadSlot(out last)) {
// in case at the moment the last thread was being released
// a new task was added to the queue, we need to try
// to revoke the thread to avoid the situation when the task is left unprocessed
cin
small refactoring, cleanup.
r30 if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false
SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it
return AllocateThreadSlot(1); // ensure that at least one thread is alive
cin
refactoring, sync
r20 }
return false;
}
cin
DispatchPool works again, but performance is poor in some cases
r21 // wait till infinity
cin
refactoring, sync
r20 Sleep(-1);
return true;
cin
Improved worker pool
r17 }
#region thread slots traits
bool AllocateThreadSlot() {
cin
sync
r16 int current;
cin
implemented parallel map and foreach for arrays...
r15 // use spins to allocate slot for the new thread
do {
cin
refactoring, sync
r20 current = m_createdThreads;
cin
implemented parallel map and foreach for arrays...
r15 if (current >= m_maxThreads || m_exitRequired != 0)
// no more slots left or the pool has been disposed
return false;
cin
refactoring, sync
r20 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
cin
implemented parallel map and foreach for arrays...
r15
cin
Improved worker pool
r17 UpdateMaxThreads(current + 1);
return true;
}
cin
implemented parallel map and foreach for arrays...
r15
cin
Improved worker pool
r17 bool AllocateThreadSlot(int desired) {
cin
refactoring, sync
r20 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
cin
Improved worker pool
r17 return false;
UpdateMaxThreads(desired);
cin
implemented parallel map and foreach for arrays...
r15
cin
Improved worker pool
r17 return true;
}
bool ReleaseThreadSlot(out bool last) {
last = false;
int current;
// use spins to release slot for the new thread
do {
cin
refactoring, sync
r20 current = m_createdThreads;
cin
Improved worker pool
r17 if (current <= m_minThreads && m_exitRequired == 0)
// the thread is reserved
return false;
cin
refactoring, sync
r20 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
cin
Improved worker pool
r17
last = (current == 1);
cin
implemented parallel map and foreach for arrays...
r15
return true;
}
cin
Improved worker pool
r17 /// <summary>
/// releases thread slot unconditionally, used during cleanup
/// </summary>
/// <returns>true - no more threads left</returns>
bool ReleaseThreadSlotAnyway() {
cin
refactoring, sync
r20 var left = Interlocked.Decrement(ref m_createdThreads);
cin
Improved worker pool
r17 return left == 0;
cin
implemented parallel map and foreach for arrays...
r15 }
cin
Improved worker pool
r17 void UpdateMaxThreads(int count) {
int max;
cin
sync
r16 do {
cin
Improved worker pool
r17 max = m_maxRunningThreads;
if (max >= count)
break;
} while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
cin
sync
r16 }
cin
Improved worker pool
r17 #endregion
bool StartWorker() {
if (AllocateThreadSlot()) {
// slot successfully allocated
var worker = new Thread(this.Worker);
worker.IsBackground = true;
worker.Start();
return true;
} else {
return false;
}
cin
sync
r16 }
cin
DispatchPool works again, but performance is poor in some cases
r21 protected abstract void InvokeUnit(TUnit unit);
cin
Added TraceContext support to array traits
r41 protected virtual void Worker() {
cin
DispatchPool works again, but performance is poor in some cases
r21 TUnit unit;
cin
implemented nonblocking wake singnals processing
r22 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
cin
DispatchPool works again, but performance is poor in some cases
r21 Interlocked.Increment(ref m_activeThreads);
cin
implemented parallel map and foreach for arrays...
r15 do {
// exit if requested
if (m_exitRequired != 0) {
// release the thread slot
cin
refactoring, sync
r20 Interlocked.Decrement(ref m_activeThreads);
cin
Improved worker pool
r17 if (ReleaseThreadSlotAnyway()) // it was the last worker
cin
implemented parallel map and foreach for arrays...
r15 m_hasTasks.Dispose();
else
cin
DispatchPool works again, but performance is poor in some cases
r21 SignalThread(); // wake next worker
break;
cin
implemented parallel map and foreach for arrays...
r15 }
// fetch task
if (TryDequeue(out unit)) {
cin
DispatchPool works again, but performance is poor in some cases
r21 InvokeUnit(unit);
continue;
cin
implemented parallel map and foreach for arrays...
r15 }
cin
refactoring, sync
r20 Interlocked.Decrement(ref m_activeThreads);
cin
implemented parallel map and foreach for arrays...
r15
cin
sync
r16 // entering suspend state
// keep this thread and wait
cin
refactoring, sync
r20 if (!Suspend())
cin
DispatchPool works again, but performance is poor in some cases
r21 break;
cin
implemented nonblocking wake singnals processing
r22 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
cin
refactoring, sync
r20 Interlocked.Increment(ref m_activeThreads);
cin
implemented parallel map and foreach for arrays...
r15 } while (true);
cin
implemented nonblocking wake singnals processing
r22 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
cin
implemented parallel map and foreach for arrays...
r15 }
protected virtual void Dispose(bool disposing) {
if (disposing) {
if (m_exitRequired == 0) {
if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
return;
// wake sleeping threads
cin
DispatchPool works again, but performance is poor in some cases
r21 if (m_createdThreads > 0)
SignalThread();
else
m_hasTasks.Dispose();
cin
implemented parallel map and foreach for arrays...
r15 GC.SuppressFinalize(this);
}
}
}
public void Dispose() {
Dispose(true);
}
~DispatchPool() {
Dispose(false);
}
}
}