##// END OF EJS Templates
small fixes, WorkerPool still incomplete
cin -
r13:b0feb5b9ad1c promises
parent child
Show More
@@ -107,6 +107,36 namespace Implab.Test
107 }
107 }
108
108
109 [TestMethod]
109 [TestMethod]
110 public void WorkerPoolSizeTest() {
111 var pool = new WorkerPool(5,10);
112
113 Assert.AreEqual(5, pool.ThreadCount);
114
115 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
116 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
117 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
118
119 Assert.AreEqual(5, pool.ThreadCount);
120
121 for (int i = 0; i < 100; i++)
122 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
123 Assert.AreEqual(10, pool.ThreadCount);
124 }
125
126 [TestMethod]
127 public void WorkerPoolCorrectTest() {
128 var pool = new WorkerPool(5, 20);
129
130 var count = 0;
131 for (int i = 0; i < 1000; i++)
132 pool
133 .Invoke(() => 1)
134 .Then(x => Interlocked.Add(ref count, x));
135
136 Assert.AreEqual(1000, count);
137 }
138
139 [TestMethod]
110 public void ComplexCase1Test() {
140 public void ComplexCase1Test() {
111 var flags = new bool[3];
141 var flags = new bool[3];
112
142
@@ -9,6 +9,6 namespace Implab
9 {
9 {
10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
12 EventHandler<ProgressInitEventArgs> ProgressInit;
12 event EventHandler<ProgressInitEventArgs> ProgressInit;
13 }
13 }
14 }
14 }
@@ -5,7 +5,7 using System.Text;
5
5
6 namespace Implab {
6 namespace Implab {
7
7
8 public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
8 /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
9
9
10 }*/
10 }
11 }
11 }
@@ -13,21 +13,50 namespace Implab.Parallels {
13 object m_lock = new object();
13 object m_lock = new object();
14
14
15 bool m_disposed = false;
15 bool m_disposed = false;
16
17 // this event will signal that workers can try to fetch a task from queue or the pool has been disposed
16 ManualResetEvent m_hasTasks = new ManualResetEvent(false);
18 ManualResetEvent m_hasTasks = new ManualResetEvent(false);
17 Queue<Action> m_queue = new Queue<Action>();
19 Queue<Action> m_queue = new Queue<Action>();
18
20
19 public WorkerPool(int min, int max) {
21 public WorkerPool(int min, int max) {
20 if (min < 0)
22 if (min < 0)
21 throw new ArgumentOutOfRangeException("min");
23 throw new ArgumentOutOfRangeException("min");
24 if (max <= 0)
25 throw new ArgumentOutOfRangeException("max");
26
22 if (min > max)
27 if (min > max)
23 min = max;
28 min = max;
24 m_minThreads = min;
29 m_minThreads = min;
25 m_maxThreads = max;
30 m_maxThreads = max;
26
31
32 InitPool();
33 }
34
35 public WorkerPool(int max)
36 : this(0, max) {
37 }
38
39 public WorkerPool() {
40 int maxThreads, maxCP;
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
42
43 m_minThreads = 0;
44 m_maxThreads = maxThreads;
45
46 InitPool();
47 }
48
49 void InitPool() {
27 for (int i = 0; i < m_minThreads; i++)
50 for (int i = 0; i < m_minThreads; i++)
28 StartWorker();
51 StartWorker();
29 }
52 }
30
53
54 public int ThreadCount {
55 get {
56 return m_runningThreads;
57 }
58 }
59
31 public Promise<T> Invoke<T>(Func<T> task) {
60 public Promise<T> Invoke<T>(Func<T> task) {
32 if (m_disposed)
61 if (m_disposed)
33 throw new ObjectDisposedException(ToString());
62 throw new ObjectDisposedException(ToString());
@@ -36,7 +65,16 namespace Implab.Parallels {
36
65
37 var promise = new Promise<T>();
66 var promise = new Promise<T>();
38
67
68 var queueLen = EnqueueTask(delegate() {
69 try {
70 promise.Resolve(task());
71 } catch (Exception e) {
72 promise.Reject(e);
73 }
74 });
39
75
76 if (queueLen > 1)
77 StartWorker();
40
78
41 return promise;
79 return promise;
42 }
80 }
@@ -53,16 +91,18 namespace Implab.Parallels {
53 // slot successfully allocated
91 // slot successfully allocated
54
92
55 var worker = new Thread(this.Worker);
93 var worker = new Thread(this.Worker);
94 worker.IsBackground = true;
56 worker.Start();
95 worker.Start();
57
96
58 return true;
97 return true;
59 }
98 }
60
99
61 void EnqueueTask(Action task) {
100 int EnqueueTask(Action task) {
62 Debug.Assert(task != null);
101 Debug.Assert(task != null);
63 lock (m_queue) {
102 lock (m_queue) {
64 m_queue.Enqueue(task);
103 m_queue.Enqueue(task);
65 m_hasTasks.Set();
104 m_hasTasks.Set();
105 return m_queue.Count;
66 }
106 }
67 }
107 }
68
108
@@ -83,7 +123,7 namespace Implab.Parallels {
83 }
123 }
84
124
85 // no tasks left
125 // no tasks left
86 // signal that no more tasks left, lock ensures that this event won't suppress newly added task
126 // signal that no more tasks left, current lock ensures that this event won't suppress newly added task
87 m_hasTasks.Reset();
127 m_hasTasks.Reset();
88 }
128 }
89
129
General Comments 0
You need to be logged in to leave comments. Login now