##// END OF EJS Templates
small fixes, WorkerPool still incomplete
cin -
r13:b0feb5b9ad1c promises
parent child
Show More
@@ -107,6 +107,36 namespace Implab.Test
107 107 }
108 108
109 109 [TestMethod]
110 public void WorkerPoolSizeTest() {
111 var pool = new WorkerPool(5,10);
112
113 Assert.AreEqual(5, pool.ThreadCount);
114
115 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
116 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
117 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
118
119 Assert.AreEqual(5, pool.ThreadCount);
120
121 for (int i = 0; i < 100; i++)
122 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
123 Assert.AreEqual(10, pool.ThreadCount);
124 }
125
126 [TestMethod]
127 public void WorkerPoolCorrectTest() {
128 var pool = new WorkerPool(5, 20);
129
130 var count = 0;
131 for (int i = 0; i < 1000; i++)
132 pool
133 .Invoke(() => 1)
134 .Then(x => Interlocked.Add(ref count, x));
135
136 Assert.AreEqual(1000, count);
137 }
138
139 [TestMethod]
110 140 public void ComplexCase1Test() {
111 141 var flags = new bool[3];
112 142
@@ -9,6 +9,6 namespace Implab
9 9 {
10 10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
11 11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
12 EventHandler<ProgressInitEventArgs> ProgressInit;
12 event EventHandler<ProgressInitEventArgs> ProgressInit;
13 13 }
14 14 }
@@ -5,7 +5,7 using System.Text;
5 5
6 6 namespace Implab {
7 7
8 public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
8 /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
9 9
10 }
10 }*/
11 11 }
@@ -13,21 +13,50 namespace Implab.Parallels {
13 13 object m_lock = new object();
14 14
15 15 bool m_disposed = false;
16
17 // this event will signal that workers can try to fetch a task from queue or the pool has been disposed
16 18 ManualResetEvent m_hasTasks = new ManualResetEvent(false);
17 19 Queue<Action> m_queue = new Queue<Action>();
18 20
19 21 public WorkerPool(int min, int max) {
20 22 if (min < 0)
21 23 throw new ArgumentOutOfRangeException("min");
24 if (max <= 0)
25 throw new ArgumentOutOfRangeException("max");
26
22 27 if (min > max)
23 28 min = max;
24 29 m_minThreads = min;
25 30 m_maxThreads = max;
26 31
32 InitPool();
33 }
34
35 public WorkerPool(int max)
36 : this(0, max) {
37 }
38
39 public WorkerPool() {
40 int maxThreads, maxCP;
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
42
43 m_minThreads = 0;
44 m_maxThreads = maxThreads;
45
46 InitPool();
47 }
48
49 void InitPool() {
27 50 for (int i = 0; i < m_minThreads; i++)
28 51 StartWorker();
29 52 }
30 53
54 public int ThreadCount {
55 get {
56 return m_runningThreads;
57 }
58 }
59
31 60 public Promise<T> Invoke<T>(Func<T> task) {
32 61 if (m_disposed)
33 62 throw new ObjectDisposedException(ToString());
@@ -36,7 +65,16 namespace Implab.Parallels {
36 65
37 66 var promise = new Promise<T>();
38 67
68 var queueLen = EnqueueTask(delegate() {
69 try {
70 promise.Resolve(task());
71 } catch (Exception e) {
72 promise.Reject(e);
73 }
74 });
39 75
76 if (queueLen > 1)
77 StartWorker();
40 78
41 79 return promise;
42 80 }
@@ -53,16 +91,18 namespace Implab.Parallels {
53 91 // slot successfully allocated
54 92
55 93 var worker = new Thread(this.Worker);
94 worker.IsBackground = true;
56 95 worker.Start();
57 96
58 97 return true;
59 98 }
60 99
61 void EnqueueTask(Action task) {
100 int EnqueueTask(Action task) {
62 101 Debug.Assert(task != null);
63 102 lock (m_queue) {
64 103 m_queue.Enqueue(task);
65 104 m_hasTasks.Set();
105 return m_queue.Count;
66 106 }
67 107 }
68 108
@@ -83,7 +123,7 namespace Implab.Parallels {
83 123 }
84 124
85 125 // no tasks left
86 // signal that no more tasks left, lock ensures that this event won't suppress newly added task
126 // signal that no more tasks left, current lock ensures that this event won't suppress newly added task
87 127 m_hasTasks.Reset();
88 128 }
89 129
General Comments 0
You need to be logged in to leave comments. Login now