##// END OF EJS Templates
Added Skip method to JSON parser to skip contents of the current node
Added Skip method to JSON parser to skip contents of the current node

File last commit:

r40:fe33f4e02ad5 default
r62:62b440d46313 default
Show More
WorkerPool.cs
98 lines | 2.8 KiB | text/x-csharp | CSharpLexer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using Implab.Diagnostics;
namespace Implab.Parallels {
public class WorkerPool : DispatchPool<Action> {
MTQueue<Action> m_queue = new MTQueue<Action>();
int m_queueLength = 0;
readonly int m_threshold = 1;
public WorkerPool(int minThreads, int maxThreads, int threshold)
: base(minThreads, maxThreads) {
m_threshold = threshold;
InitPool();
}
public WorkerPool(int minThreads, int maxThreads) :
base(minThreads, maxThreads) {
InitPool();
}
public WorkerPool(int threads)
: base(threads) {
InitPool();
}
public WorkerPool()
: base() {
InitPool();
}
public Promise<T> Invoke<T>(Func<T> task) {
if (task == null)
throw new ArgumentNullException("task");
if (IsDisposed)
throw new ObjectDisposedException(ToString());
var promise = new Promise<T>();
var caller = TraceContext.Snapshot();
EnqueueTask(delegate() {
caller.Invoke(delegate() {
try {
promise.Resolve(task());
} catch (Exception e) {
promise.Reject(e);
}
});
});
return promise;
}
protected void EnqueueTask(Action unit) {
Debug.Assert(unit != null);
var len = Interlocked.Increment(ref m_queueLength);
m_queue.Enqueue(unit);
if (len > m_threshold*ActiveThreads)
GrowPool();
}
protected override bool TryDequeue(out Action unit) {
if (m_queue.TryDequeue(out unit)) {
Interlocked.Decrement(ref m_queueLength);
return true;
}
return false;
}
protected override bool Suspend() {
// This override solves race condition
// WORKER CLIENT
// ---------------------------------------
// TryDeque == false
// Enqueue(unit), queueLen++
// GrowPool? == NO
// ActiveThreads--
// Suspend
// queueLength > 0
// continue
if (m_queueLength > 0)
return true;
return base.Suspend();
}
protected override void InvokeUnit(Action unit) {
unit();
}
}
}