##// END OF EJS Templates
removed the reference to the parent from the promise object this allows...
Removed the reference to the parent from the promise object; this allows resolved promises to release the parents and results they are holding. Added a complete set of operations to the IPromiseBase interface. Subscribing to the cancellation event of a promise should not affect its IsExclusive property. More tests.

File last commit:

r30:2fad2d1f4b03 default
r33:b255e4aeef17 default
Show More
DispatchPool.cs
335 lines | 11.7 KiB | text/x-csharp | CSharpLexer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace Implab.Parallels {
public abstract class DispatchPool<TUnit> : IDisposable {
readonly int m_minThreads; // the number of thread slots ReleaseThreadSlot keeps reserved while the pool is not disposed
readonly int m_maxThreads; // the upper bound on created threads enforced by AllocateThreadSlot
int m_createdThreads = 0; // the current size of the pool
int m_activeThreads = 0; // the count of threads which are active
int m_sleepingThreads = 0; // the count of currently inactive threads
int m_maxRunningThreads = 0; // the maximum reached size of the pool
int m_exitRequired = 0; // non-zero once the pool is going to shut down, all unused workers are released
int m_releaseTimeout = 100; // the timeout (in milliseconds) an idle worker waits for new tasks before it exits
int m_wakeEvents = 0; // the count of pending wake events
AutoResetEvent m_hasTasks = new AutoResetEvent(false); // set by SignalThread when m_wakeEvents goes from 0 to 1
/// <summary>
/// Creates a pool with the specified thread limits. No threads are started here.
/// </summary>
/// <param name="min">The number of threads kept reserved; must not be negative. Clamped to <paramref name="max"/> when larger.</param>
/// <param name="max">The maximum number of threads; must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
protected DispatchPool(int min, int max) {
    if (min < 0)
        throw new ArgumentOutOfRangeException("min");
    if (max <= 0)
        throw new ArgumentOutOfRangeException("max");

    // the reserved minimum can never exceed the maximum
    m_minThreads = Math.Min(min, max);
    m_maxThreads = max;
}
/// <summary>
/// Creates a pool whose minimum and maximum thread counts are both <paramref name="threads"/>.
/// </summary>
/// <param name="threads">The value used for both the minimum and the maximum thread count.</param>
protected DispatchPool(int threads) : this(threads, threads) { }
/// <summary>
/// Creates a pool with no reserved threads and a maximum equal to the worker thread
/// limit reported by <see cref="ThreadPool.GetMaxThreads"/>.
/// </summary>
protected DispatchPool() {
    int workerThreads, completionPortThreads;
    ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);

    // only the worker thread limit is used; the completion port limit is ignored
    m_maxThreads = workerThreads;
    m_minThreads = 0;
}
/// <summary>
/// Attempts to start one worker for each of the reserved (minimum) thread slots.
/// </summary>
protected void InitPool() {
    int remaining = m_minThreads;
    while (remaining-- > 0)
        StartWorker();
}
/// <summary>Gets the number of thread slots currently allocated in the pool.</summary>
public int PoolSize {
    get { return m_createdThreads; }
}
/// <summary>Gets the number of workers currently counted as active (not suspended).</summary>
public int ActiveThreads {
    get { return m_activeThreads; }
}
/// <summary>Gets the largest pool size reached so far.</summary>
public int MaxRunningThreads {
    get { return m_maxRunningThreads; }
}
/// <summary>Gets a value indicating whether shutdown of the pool has been requested.</summary>
protected bool IsDisposed {
    get { return m_exitRequired != 0; }
}
/// <summary>
/// Attempts to fetch the next unit of work. The worker loop passes the unit to
/// <see cref="InvokeUnit"/> when this returns <c>true</c> and suspends when it returns <c>false</c>.
/// </summary>
/// <param name="unit">Receives the fetched unit when the method returns <c>true</c>.</param>
/// <returns><c>true</c> if a unit was fetched; otherwise <c>false</c>.</returns>
protected abstract bool TryDequeue(out TUnit unit);
#region thread execution traits
/// <summary>
/// Registers one more pending wake event and sets the wait handle when it is the first pending one.
/// </summary>
/// <returns>The number of pending wake events after the increment.</returns>
int SignalThread() {
    int pending = Interlocked.Increment(ref m_wakeEvents);

    // only the 0 -> 1 transition sets the event
    if (pending == 1)
        m_hasTasks.Set();

    return pending;
}
/// <summary>
/// Tries to consume one pending wake event; when none is pending, waits on the event handle
/// and retries each time the handle is signaled.
/// </summary>
/// <param name="timeout">
/// The total wait budget in milliseconds; <c>-1</c> waits without a time limit,
/// <c>0</c> only polls.
/// </param>
/// <returns><c>true</c> if a wake event was consumed; <c>false</c> if the wait timed out.</returns>
bool FetchSignalOrWait(int timeout) {
    var start = Environment.TickCount;
    // the budget for the next wait; always derived from the ORIGINAL timeout and the total
    // elapsed time, so repeated wake-ups without a signal do not subtract the elapsed time twice
    var remaining = timeout;
    // indicates that this thread owns the lock and, after successfully fetching a signal,
    // must give it back so that another waiting thread can proceed
    bool hasLock = false;
    do {
        int signals;
        // atomically take one pending wake event, if there is any
        do {
            signals = m_wakeEvents;
            if (signals == 0)
                break;
        } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
        if (signals >= 1) {
            // more events are still pending and this thread has consumed the handle's signal:
            // set the handle again for the next waiting thread
            if (signals > 1 && hasLock)
                m_hasTasks.Set();
            return true;
        }
        if (timeout != -1)
            remaining = Math.Max(0, timeout - unchecked(Environment.TickCount - start));
        // if no signals are left, the first thread that gets here resets the event,
        // makes one empty pass through the loop and then blocks
        hasLock = true;
    } while (m_hasTasks.WaitOne(remaining));
    return false;
}
/// <summary>
/// Counts the calling thread as sleeping while it waits for a wake event.
/// </summary>
/// <param name="timeout">The timeout passed on to <c>FetchSignalOrWait</c>.</param>
/// <returns><c>true</c> if a wake event was received; <c>false</c> if the wait timed out.</returns>
bool Sleep(int timeout) {
    Interlocked.Increment(ref m_sleepingThreads);
    bool signaled = FetchSignalOrWait(timeout);
    Interlocked.Decrement(ref m_sleepingThreads);
    return signaled;
}
#endregion
/// <summary>
/// Either wakes one sleeping thread or starts a new worker; does nothing once shutdown
/// of the pool has been requested.
/// </summary>
protected void GrowPool() {
    if (m_exitRequired != 0)
        return;

    // Prefer waking a sleeper when there are more sleeping threads than pending wake events;
    // otherwise try to start a new worker. When a worker cannot be started, the current one
    // may be on its way to terminate and unable to process the queue, so it gets a signal
    // to spin again.
    if (m_sleepingThreads > m_wakeEvents || !StartWorker()) {
        SignalThread();
        // There is no way to check whether the signal has been processed (all sleeping
        // threads may be gone by now), so make sure at least one thread is running.
        EnsurePoolIsAlive();
    }
}
/// <summary>
/// Starts a background worker if, and only if, the pool currently has no threads.
/// </summary>
private void EnsurePoolIsAlive() {
    // the slot is only granted on the 0 -> 1 transition of the thread count
    if (!AllocateThreadSlot(1))
        return;

    var thread = new Thread(Worker) { IsBackground = true };
    thread.Start();
}
/// <summary>
/// Called by a worker that found no task: waits for a wake event and decides whether
/// the worker should keep running.
/// </summary>
/// <returns><c>true</c> if the worker should continue its loop; <c>false</c> if it should exit.</returns>
private bool Suspend() {
    // with a release timeout configured, give new tasks a chance to arrive first
    if (m_releaseTimeout > 0 && Sleep(m_releaseTimeout))
        return true;

    // no task arrived: try to release this unused thread
    bool wasLast;
    if (!ReleaseThreadSlot(out wasLast)) {
        // the slot could not be released, so wait without a time limit
        Sleep(-1);
        return true;
    }

    // If a task was queued at the moment the last thread was being released, the thread
    // must be revived so the task is not left unprocessed. FetchSignalOrWait(0) fetches
    // a pending signal or returns false without waiting.
    if (wasLast && FetchSignalOrWait(0)) {
        SignalThread(); // the signal was consumed by the probe above, so reschedule it
        return AllocateThreadSlot(1); // ensure that at least one thread is alive
    }

    return false;
}
#region thread slots traits
/// <summary>
/// Reserves a slot for a new thread using a compare-and-swap retry loop.
/// </summary>
/// <returns>
/// <c>true</c> if a slot was reserved; <c>false</c> if the maximum is reached or the pool has been disposed.
/// </returns>
bool AllocateThreadSlot() {
    while (true) {
        int observed = m_createdThreads;

        // no more slots left or the pool has been disposed
        if (observed >= m_maxThreads || m_exitRequired != 0)
            return false;

        if (Interlocked.CompareExchange(ref m_createdThreads, observed + 1, observed) == observed) {
            UpdateMaxThreads(observed + 1);
            return true;
        }
    }
}
/// <summary>
/// Reserves a thread slot only if doing so brings the thread count from
/// <paramref name="desired"/> - 1 to exactly <paramref name="desired"/>. A single attempt is made.
/// </summary>
/// <param name="desired">The thread count the pool must have after the allocation.</param>
/// <returns><c>true</c> if the slot was reserved; otherwise <c>false</c>.</returns>
bool AllocateThreadSlot(int desired) {
    int expected = desired - 1;
    if (Interlocked.CompareExchange(ref m_createdThreads, desired, expected) == expected) {
        UpdateMaxThreads(desired);
        return true;
    }
    return false;
}
/// <summary>
/// Gives up a thread slot unless it is one of the reserved slots of a pool that is not disposed.
/// </summary>
/// <param name="last">Set to <c>true</c> when the released slot was the only remaining one.</param>
/// <returns><c>true</c> if the slot was released; <c>false</c> if the thread is reserved.</returns>
bool ReleaseThreadSlot(out bool last) {
    last = false;
    while (true) {
        int observed = m_createdThreads;

        // the thread is reserved
        if (observed <= m_minThreads && m_exitRequired == 0)
            return false;

        if (Interlocked.CompareExchange(ref m_createdThreads, observed - 1, observed) == observed) {
            last = observed == 1;
            return true;
        }
    }
}
/// <summary>
/// Gives up a thread slot without checking the reserved minimum; used during cleanup.
/// </summary>
/// <returns>true - no more threads left</returns>
bool ReleaseThreadSlotAnyway() {
    return Interlocked.Decrement(ref m_createdThreads) == 0;
}
/// <summary>
/// Raises the recorded maximum pool size to <paramref name="count"/> when it is currently lower.
/// </summary>
/// <param name="count">The pool size that has just been reached.</param>
void UpdateMaxThreads(int count) {
    while (true) {
        int seen = m_maxRunningThreads;

        // the recorded maximum is already at least as large
        if (seen >= count)
            return;

        if (Interlocked.CompareExchange(ref m_maxRunningThreads, count, seen) == seen)
            return;
    }
}
#endregion
/// <summary>
/// Starts a new background worker thread if a thread slot can be allocated.
/// </summary>
/// <returns><c>true</c> if a worker was started; <c>false</c> if no slot was available.</returns>
bool StartWorker() {
    if (!AllocateThreadSlot())
        return false;

    // slot successfully allocated
    var thread = new Thread(Worker) { IsBackground = true };
    thread.Start();
    return true;
}
/// <summary>
/// Processes a unit of work. Called by the worker loop for every unit returned by
/// <see cref="TryDequeue"/>.
/// </summary>
/// <param name="unit">The unit fetched from <see cref="TryDequeue"/>.</param>
protected abstract void InvokeUnit(TUnit unit);
/// <summary>
/// The body of a worker thread: processes units while they are available, suspends when
/// there are none, and exits when shutdown is requested or the thread is no longer needed.
/// </summary>
void Worker() {
    TUnit unit;
    Interlocked.Increment(ref m_activeThreads);

    while (true) {
        // exit if requested
        if (m_exitRequired != 0) {
            Interlocked.Decrement(ref m_activeThreads);

            // release the thread slot
            bool wasLastWorker = ReleaseThreadSlotAnyway();
            if (wasLastWorker)
                m_hasTasks.Dispose();
            else
                SignalThread(); // wake next worker
            return;
        }

        // fetch task
        if (TryDequeue(out unit)) {
            InvokeUnit(unit);
            continue;
        }

        // nothing to do: leave the active state and suspend
        Interlocked.Decrement(ref m_activeThreads);
        if (!Suspend())
            return;

        // woken up: the thread is active again
        Interlocked.Increment(ref m_activeThreads);
    }
}
/// <summary>
/// Requests shutdown of the pool. Only the first call with <paramref name="disposing"/>
/// set to <c>true</c> has an effect.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the
/// finalizer, in which case nothing is done.
/// </param>
protected virtual void Dispose(bool disposing) {
    if (!disposing || m_exitRequired != 0)
        return;

    // only the caller that flips the flag from 0 to 1 performs the shutdown
    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
        return;

    // wake sleeping threads; with no threads present the event is disposed right here
    if (m_createdThreads > 0)
        SignalThread();
    else
        m_hasTasks.Dispose();

    GC.SuppressFinalize(this);
}
/// <summary>
/// Requests shutdown of the pool by calling <c>Dispose(true)</c>.
/// </summary>
public void Dispose() {
    Dispose(disposing: true);
}
/// <summary>
/// Finalizer; calls <c>Dispose(false)</c>.
/// </summary>
~DispatchPool() {
    Dispose(disposing: false);
}
}
}