##// END OF EJS Templates
Added Skip method to JSON parser to skip contents of the current node
Added Skip method to JSON parser to skip contents of the current node

File last commit:

r40:fe33f4e02ad5 default
r62:62b440d46313 default
Show More
WorkerPool.cs
98 lines | 2.8 KiB | text/x-csharp | CSharpLexer
cin
refactoring, added WorkerPool
r12 using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
cin
initial log capabilities
r35 using Implab.Diagnostics;
cin
refactoring, added WorkerPool
r12
namespace Implab.Parallels {
cin
implemented parallel map and foreach for arrays...
r15 public class WorkerPool : DispatchPool<Action> {
cin
refactoring, added WorkerPool
r12
cin
implemented parallel map and foreach for arrays...
r15 MTQueue<Action> m_queue = new MTQueue<Action>();
int m_queueLength = 0;
cin
sync
r16 readonly int m_threshold = 1;
cin
small fixes, WorkerPool still incomplete
r13
cin
sync
r16 public WorkerPool(int minThreads, int maxThreads, int threshold)
cin
implemented parallel map and foreach for arrays...
r15 : base(minThreads, maxThreads) {
cin
sync
r16 m_threshold = threshold;
InitPool();
}
public WorkerPool(int minThreads, int maxThreads) :
base(minThreads, maxThreads) {
InitPool();
cin
small fixes, WorkerPool still incomplete
r13 }
cin
implemented parallel map and foreach for arrays...
r15 public WorkerPool(int threads)
: base(threads) {
cin
sync
r16 InitPool();
cin
small fixes, WorkerPool still incomplete
r13 }
cin
implemented parallel map and foreach for arrays...
r15 public WorkerPool()
: base() {
cin
sync
r16 InitPool();
cin
small fixes, WorkerPool still incomplete
r13 }
cin
refactoring, added WorkerPool
r12 public Promise<T> Invoke<T>(Func<T> task) {
if (task == null)
throw new ArgumentNullException("task");
cin
implemented parallel map and foreach for arrays...
r15 if (IsDisposed)
throw new ObjectDisposedException(ToString());
cin
refactoring, added WorkerPool
r12
var promise = new Promise<T>();
cin
improved tracing...
r40 var caller = TraceContext.Snapshot();
cin
initial log capabilities
r35
cin
implemented parallel map and foreach for arrays...
r15 EnqueueTask(delegate() {
cin
improved tracing...
r40 caller.Invoke(delegate() {
try {
promise.Resolve(task());
} catch (Exception e) {
promise.Reject(e);
}
});
cin
small fixes, WorkerPool still incomplete
r13 });
cin
refactoring, added WorkerPool
r12
return promise;
}
cin
implemented parallel map and foreach for arrays...
r15 protected void EnqueueTask(Action unit) {
Debug.Assert(unit != null);
cin
sync
r16 var len = Interlocked.Increment(ref m_queueLength);
cin
implemented parallel map and foreach for arrays...
r15 m_queue.Enqueue(unit);
cin
sync
r16
cin
DispatchPool works again, but performance is poor in some cases
r21 if (len > m_threshold*ActiveThreads)
GrowPool();
cin
refactoring, added WorkerPool
r12 }
cin
implemented parallel map and foreach for arrays...
r15 protected override bool TryDequeue(out Action unit) {
if (m_queue.TryDequeue(out unit)) {
Interlocked.Decrement(ref m_queueLength);
return true;
cin
refactoring, added WorkerPool
r12 }
cin
implemented parallel map and foreach for arrays...
r15 return false;
cin
refactoring, added WorkerPool
r12 }
cin
fixed race condition in DispatchPool
r34 protected override bool Suspend() {
// This override solves race condition
// WORKER CLIENT
// ---------------------------------------
// TryDeque == false
// Enqueue(unit), queueLen++
// GrowPool? == NO
// ActiveThreads--
// Suspend
// queueLength > 0
// continue
if (m_queueLength > 0)
return true;
return base.Suspend();
}
cin
implemented parallel map and foreach for arrays...
r15 protected override void InvokeUnit(Action unit) {
unit();
cin
refactoring, added WorkerPool
r12 }
cin
sync
r16
cin
refactoring, added WorkerPool
r12 }
}