# HG changeset patch # User cin # Date 2013-11-05 21:07:55 # Node ID b0feb5b9ad1c91f52916ac22904a2720d2e24eb6 # Parent eb418ba8275b82bd1ab6fa2736cd06a6ace539fb small fixes, WorkerPool still incomplete diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -107,6 +107,36 @@ namespace Implab.Test } [TestMethod] + public void WorkerPoolSizeTest() { + var pool = new WorkerPool(5,10); + + Assert.AreEqual(5, pool.ThreadCount); + + pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + + Assert.AreEqual(5, pool.ThreadCount); + + for (int i = 0; i < 100; i++) + pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + Assert.AreEqual(10, pool.ThreadCount); + } + + [TestMethod] + public void WorkerPoolCorrectTest() { + var pool = new WorkerPool(5, 20); + + var count = 0; + for (int i = 0; i < 1000; i++) + pool + .Invoke(() => 1) + .Then(x => Interlocked.Add(ref count, x)); + + Assert.AreEqual(1000, count); + } + + [TestMethod] public void ComplexCase1Test() { var flags = new bool[3]; diff --git a/Implab/IProgressNotifier.cs b/Implab/IProgressNotifier.cs --- a/Implab/IProgressNotifier.cs +++ b/Implab/IProgressNotifier.cs @@ -9,6 +9,6 @@ namespace Implab { event EventHandler> MessageUpdated; event EventHandler> ProgressUpdated; - EventHandler ProgressInit; + event EventHandler ProgressInit; } } diff --git a/Implab/ManagedPromise.cs b/Implab/ManagedPromise.cs --- a/Implab/ManagedPromise.cs +++ b/Implab/ManagedPromise.cs @@ -5,7 +5,7 @@ using System.Text; namespace Implab { - public class ManagedPromise: Promise, ITaskController, IProgressNotifier { + /*public class ManagedPromise: Promise, ITaskController, IProgressNotifier { - } + }*/ } diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs +++ b/Implab/Parallels/WorkerPool.cs @@ -13,21 +13,50 @@ namespace Implab.Parallels { object m_lock = new object(); bool m_disposed = false; + + // this event will signal that workers can try to fetch a task from queue or the pool has been disposed ManualResetEvent m_hasTasks = new ManualResetEvent(false); Queue m_queue = new Queue(); public WorkerPool(int min, int max) { if (min < 0) throw new ArgumentOutOfRangeException("min"); + if (max <= 0) + throw new ArgumentOutOfRangeException("max"); + if (min > max) min = max; m_minThreads = min; m_maxThreads = max; + InitPool(); + } + + public WorkerPool(int max) + : this(0, max) { + } + + public WorkerPool() { + int maxThreads, maxCP; + ThreadPool.GetMaxThreads(out maxThreads, out maxCP); + + m_minThreads = 0; + m_maxThreads = maxThreads; + + InitPool(); + } + + void InitPool() { for (int i = 0; i < m_minThreads; i++) StartWorker(); } + public int ThreadCount { + get { + return m_runningThreads; + } + } + public Promise Invoke(Func task) { if (m_disposed) throw new ObjectDisposedException(ToString()); @@ -36,7 +65,16 @@ namespace Implab.Parallels { var promise = new Promise(); + var queueLen = EnqueueTask(delegate() { + try { + promise.Resolve(task()); + } catch (Exception e) { + promise.Reject(e); + } + }); + if (queueLen > 1) + StartWorker(); return promise; } @@ -53,16 +91,18 @@ namespace Implab.Parallels { // slot successfully allocated var worker = new Thread(this.Worker); + worker.IsBackground = true; worker.Start(); return true; } - void EnqueueTask(Action task) { + int EnqueueTask(Action task) { Debug.Assert(task != null); lock (m_queue) { m_queue.Enqueue(task); m_hasTasks.Set(); + return m_queue.Count; } } @@ -83,7 +123,7 @@ namespace Implab.Parallels { } // no tasks left - // signal that no more tasks left, lock ensures that this event won't suppress newly added task + // signal that no more tasks left, current lock ensures that this event won't suppress newly added task m_hasTasks.Reset(); }