##// END OF EJS Templates
fixed dispatch pool race condition
cin -
r24:ee04e1fa78da default
parent child
Show More
@@ -1,333 +1,333
1 using System;
1 using System;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using System.Reflection;
3 using System.Reflection;
4 using System.Threading;
4 using System.Threading;
5 using Implab.Parallels;
5 using Implab.Parallels;
6
6
7 namespace Implab.Test {
7 namespace Implab.Test {
8 [TestClass]
8 [TestClass]
9 public class AsyncTests {
9 public class AsyncTests {
10 [TestMethod]
10 [TestMethod]
11 public void ResolveTest() {
11 public void ResolveTest() {
12 int res = -1;
12 int res = -1;
13 var p = new Promise<int>();
13 var p = new Promise<int>();
14 p.Then(x => res = x);
14 p.Then(x => res = x);
15 p.Resolve(100);
15 p.Resolve(100);
16
16
17 Assert.AreEqual(100, res);
17 Assert.AreEqual(100, res);
18 }
18 }
19
19
20 [TestMethod]
20 [TestMethod]
21 public void RejectTest() {
21 public void RejectTest() {
22 int res = -1;
22 int res = -1;
23 Exception err = null;
23 Exception err = null;
24
24
25 var p = new Promise<int>();
25 var p = new Promise<int>();
26 p.Then(x => res = x, e => err = e);
26 p.Then(x => res = x, e => err = e);
27 p.Reject(new ApplicationException("error"));
27 p.Reject(new ApplicationException("error"));
28
28
29 Assert.AreEqual(res, -1);
29 Assert.AreEqual(res, -1);
30 Assert.AreEqual(err.Message, "error");
30 Assert.AreEqual(err.Message, "error");
31
31
32 }
32 }
33
33
34 [TestMethod]
34 [TestMethod]
35 public void JoinSuccessTest() {
35 public void JoinSuccessTest() {
36 var p = new Promise<int>();
36 var p = new Promise<int>();
37 p.Resolve(100);
37 p.Resolve(100);
38 Assert.AreEqual(p.Join(), 100);
38 Assert.AreEqual(p.Join(), 100);
39 }
39 }
40
40
41 [TestMethod]
41 [TestMethod]
42 public void JoinFailTest() {
42 public void JoinFailTest() {
43 var p = new Promise<int>();
43 var p = new Promise<int>();
44 p.Reject(new ApplicationException("failed"));
44 p.Reject(new ApplicationException("failed"));
45
45
46 try {
46 try {
47 p.Join();
47 p.Join();
48 throw new ApplicationException("WRONG!");
48 throw new ApplicationException("WRONG!");
49 } catch (TargetInvocationException err) {
49 } catch (TargetInvocationException err) {
50 Assert.AreEqual(err.InnerException.Message, "failed");
50 Assert.AreEqual(err.InnerException.Message, "failed");
51 } catch {
51 } catch {
52 Assert.Fail("Got wrong excaption");
52 Assert.Fail("Got wrong excaption");
53 }
53 }
54 }
54 }
55
55
56 [TestMethod]
56 [TestMethod]
57 public void MapTest() {
57 public void MapTest() {
58 var p = new Promise<int>();
58 var p = new Promise<int>();
59
59
60 var p2 = p.Map(x => x.ToString());
60 var p2 = p.Map(x => x.ToString());
61 p.Resolve(100);
61 p.Resolve(100);
62
62
63 Assert.AreEqual(p2.Join(), "100");
63 Assert.AreEqual(p2.Join(), "100");
64 }
64 }
65
65
66 [TestMethod]
66 [TestMethod]
67 public void FixErrorTest() {
67 public void FixErrorTest() {
68 var p = new Promise<int>();
68 var p = new Promise<int>();
69
69
70 var p2 = p.Error(e => 101);
70 var p2 = p.Error(e => 101);
71
71
72 p.Reject(new Exception());
72 p.Reject(new Exception());
73
73
74 Assert.AreEqual(p2.Join(), 101);
74 Assert.AreEqual(p2.Join(), 101);
75 }
75 }
76
76
77 [TestMethod]
77 [TestMethod]
78 public void ChainTest() {
78 public void ChainTest() {
79 var p1 = new Promise<int>();
79 var p1 = new Promise<int>();
80
80
81 var p3 = p1.Chain(x => {
81 var p3 = p1.Chain(x => {
82 var p2 = new Promise<string>();
82 var p2 = new Promise<string>();
83 p2.Resolve(x.ToString());
83 p2.Resolve(x.ToString());
84 return p2;
84 return p2;
85 });
85 });
86
86
87 p1.Resolve(100);
87 p1.Resolve(100);
88
88
89 Assert.AreEqual(p3.Join(), "100");
89 Assert.AreEqual(p3.Join(), "100");
90 }
90 }
91
91
92 [TestMethod]
92 [TestMethod]
93 public void PoolTest() {
93 public void PoolTest() {
94 var pid = Thread.CurrentThread.ManagedThreadId;
94 var pid = Thread.CurrentThread.ManagedThreadId;
95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
96
96
97 Assert.AreNotEqual(pid, p.Join());
97 Assert.AreNotEqual(pid, p.Join());
98 }
98 }
99
99
100 [TestMethod]
100 [TestMethod]
101 public void WorkerPoolSizeTest() {
101 public void WorkerPoolSizeTest() {
102 var pool = new WorkerPool(5, 10, 0);
102 var pool = new WorkerPool(5, 10, 0);
103
103
104 Assert.AreEqual(5, pool.PoolSize);
104 Assert.AreEqual(5, pool.PoolSize);
105
105
106 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
106 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
109
109
110 Assert.AreEqual(5, pool.PoolSize);
110 Assert.AreEqual(5, pool.PoolSize);
111
111
112 for (int i = 0; i < 100; i++)
112 for (int i = 0; i < 100; i++)
113 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
113 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
114 Thread.Sleep(200);
114 Thread.Sleep(200);
115 Assert.AreEqual(10, pool.PoolSize);
115 Assert.AreEqual(10, pool.PoolSize);
116
116
117 pool.Dispose();
117 pool.Dispose();
118 }
118 }
119
119
120 [TestMethod]
120 [TestMethod]
121 public void WorkerPoolCorrectTest() {
121 public void WorkerPoolCorrectTest() {
122 var pool = new WorkerPool(0,1000,100);
122 var pool = new WorkerPool(0,1000,100);
123
123
124 int iterations = 1000;
124 int iterations = 1000;
125 int pending = iterations;
125 int pending = iterations;
126 var stop = new ManualResetEvent(false);
126 var stop = new ManualResetEvent(false);
127
127
128 var count = 0;
128 var count = 0;
129 for (int i = 0; i < iterations; i++) {
129 for (int i = 0; i < iterations; i++) {
130 pool
130 pool
131 .Invoke(() => 1)
131 .Invoke(() => 1)
132 .Then(x => Interlocked.Add(ref count, x))
132 .Then(x => Interlocked.Add(ref count, x))
133 .Then(x => Math.Log10(x))
133 .Then(x => Math.Log10(x))
134 .Anyway(() => {
134 .Anyway(() => {
135 Interlocked.Decrement(ref pending);
135 Interlocked.Decrement(ref pending);
136 if (pending == 0)
136 if (pending == 0)
137 stop.Set();
137 stop.Set();
138 });
138 });
139 }
139 }
140
140
141 stop.WaitOne();
141 stop.WaitOne();
142
142
143 Assert.AreEqual(iterations, count);
143 Assert.AreEqual(iterations, count);
144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
145 pool.Dispose();
145 pool.Dispose();
146
146
147 }
147 }
148
148
149 [TestMethod]
149 [TestMethod]
150 public void WorkerPoolDisposeTest() {
150 public void WorkerPoolDisposeTest() {
151 var pool = new WorkerPool(5, 20);
151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.PoolSize);
152 Assert.AreEqual(5, pool.PoolSize);
153 pool.Dispose();
153 pool.Dispose();
154 Thread.Sleep(500);
154 Thread.Sleep(500);
155 Assert.AreEqual(0, pool.PoolSize);
155 Assert.AreEqual(0, pool.PoolSize);
156 pool.Dispose();
156 pool.Dispose();
157 }
157 }
158
158
159 [TestMethod]
159 [TestMethod]
160 public void MTQueueTest() {
160 public void MTQueueTest() {
161 var queue = new MTQueue<int>();
161 var queue = new MTQueue<int>();
162 int res;
162 int res;
163
163
164 queue.Enqueue(10);
164 queue.Enqueue(10);
165 Assert.IsTrue(queue.TryDequeue(out res));
165 Assert.IsTrue(queue.TryDequeue(out res));
166 Assert.AreEqual(10, res);
166 Assert.AreEqual(10, res);
167 Assert.IsFalse(queue.TryDequeue(out res));
167 Assert.IsFalse(queue.TryDequeue(out res));
168
168
169 for (int i = 0; i < 1000; i++)
169 for (int i = 0; i < 1000; i++)
170 queue.Enqueue(i);
170 queue.Enqueue(i);
171
171
172 for (int i = 0; i < 1000; i++) {
172 for (int i = 0; i < 1000; i++) {
173 queue.TryDequeue(out res);
173 queue.TryDequeue(out res);
174 Assert.AreEqual(i, res);
174 Assert.AreEqual(i, res);
175 }
175 }
176
176
177 int writers = 0;
177 int writers = 0;
178 int readers = 0;
178 int readers = 0;
179 var stop = new ManualResetEvent(false);
179 var stop = new ManualResetEvent(false);
180 int total = 0;
180 int total = 0;
181
181
182 int itemsPerWriter = 1000;
182 int itemsPerWriter = 1000;
183 int writersCount = 3;
183 int writersCount = 3;
184
184
185 for (int i = 0; i < writersCount; i++) {
185 for (int i = 0; i < writersCount; i++) {
186 Interlocked.Increment(ref writers);
186 Interlocked.Increment(ref writers);
187 var wn = i;
187 var wn = i;
188 AsyncPool
188 AsyncPool
189 .InvokeNewThread(() => {
189 .InvokeNewThread(() => {
190 for (int ii = 0; ii < itemsPerWriter; ii++) {
190 for (int ii = 0; ii < itemsPerWriter; ii++) {
191 queue.Enqueue(1);
191 queue.Enqueue(1);
192 }
192 }
193 return 1;
193 return 1;
194 })
194 })
195 .Anyway(() => Interlocked.Decrement(ref writers));
195 .Anyway(() => Interlocked.Decrement(ref writers));
196 }
196 }
197
197
198 for (int i = 0; i < 10; i++) {
198 for (int i = 0; i < 10; i++) {
199 Interlocked.Increment(ref readers);
199 Interlocked.Increment(ref readers);
200 var wn = i;
200 var wn = i;
201 AsyncPool
201 AsyncPool
202 .InvokeNewThread(() => {
202 .InvokeNewThread(() => {
203 int t;
203 int t;
204 do {
204 do {
205 while (queue.TryDequeue(out t))
205 while (queue.TryDequeue(out t))
206 Interlocked.Add(ref total, t);
206 Interlocked.Add(ref total, t);
207 } while (writers > 0);
207 } while (writers > 0);
208 return 1;
208 return 1;
209 })
209 })
210 .Anyway(() => {
210 .Anyway(() => {
211 Interlocked.Decrement(ref readers);
211 Interlocked.Decrement(ref readers);
212 if (readers == 0)
212 if (readers == 0)
213 stop.Set();
213 stop.Set();
214 });
214 });
215 }
215 }
216
216
217 stop.WaitOne();
217 stop.WaitOne();
218
218
219 Assert.AreEqual(itemsPerWriter * writersCount, total);
219 Assert.AreEqual(itemsPerWriter * writersCount, total);
220 }
220 }
221
221
222 [TestMethod]
222 [TestMethod]
223 public void ParallelMapTest() {
223 public void ParallelMapTest() {
224
224
225 int count = 100000;
225 int count = 100000;
226
226
227 double[] args = new double[count];
227 double[] args = new double[count];
228 var rand = new Random();
228 var rand = new Random();
229
229
230 for (int i = 0; i < count; i++)
230 for (int i = 0; i < count; i++)
231 args[i] = rand.NextDouble();
231 args[i] = rand.NextDouble();
232
232
233 var t = Environment.TickCount;
233 var t = Environment.TickCount;
234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
235
235
236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
237
237
238 t = Environment.TickCount;
238 t = Environment.TickCount;
239 for (int i = 0; i < count; i++)
239 for (int i = 0; i < count; i++)
240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
242 }
242 }
243
243
244 [TestMethod]
244 [TestMethod]
245 public void ChainedMapTest() {
245 public void ChainedMapTest() {
246
246
247 using (var pool = new WorkerPool(4,4,0)) {
247 using (var pool = new WorkerPool(0,100,0)) {
248 int count = 10000;
248 int count = 10000;
249
249
250 double[] args = new double[count];
250 double[] args = new double[count];
251 var rand = new Random();
251 var rand = new Random();
252
252
253 for (int i = 0; i < count; i++)
253 for (int i = 0; i < count; i++)
254 args[i] = rand.NextDouble();
254 args[i] = rand.NextDouble();
255
255
256 var t = Environment.TickCount;
256 var t = Environment.TickCount;
257 var res = args
257 var res = args
258 .ChainedMap(
258 .ChainedMap2(
259 x => pool.Invoke(
259 x => pool.Invoke(
260 () => Math.Sin(x * x)
260 () => Math.Sin(x * x)
261 ),
261 ),
262 4
262 4
263 )
263 )
264 .Join();
264 .Join();
265
265
266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
267
267
268 t = Environment.TickCount;
268 t = Environment.TickCount;
269 for (int i = 0; i < count; i++)
269 for (int i = 0; i < count; i++)
270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 }
273 }
274 }
274 }
275
275
276 [TestMethod]
276 [TestMethod]
277 public void ParallelForEachTest() {
277 public void ParallelForEachTest() {
278
278
279 int count = 100000;
279 int count = 100000;
280
280
281 int[] args = new int[count];
281 int[] args = new int[count];
282 var rand = new Random();
282 var rand = new Random();
283
283
284 for (int i = 0; i < count; i++)
284 for (int i = 0; i < count; i++)
285 args[i] = (int)(rand.NextDouble() * 100);
285 args[i] = (int)(rand.NextDouble() * 100);
286
286
287 int result = 0;
287 int result = 0;
288
288
289 var t = Environment.TickCount;
289 var t = Environment.TickCount;
290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
291
291
292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
293
293
294 int result2 = 0;
294 int result2 = 0;
295
295
296 t = Environment.TickCount;
296 t = Environment.TickCount;
297 for (int i = 0; i < count; i++)
297 for (int i = 0; i < count; i++)
298 result2 += args[i];
298 result2 += args[i];
299 Assert.AreEqual(result2, result);
299 Assert.AreEqual(result2, result);
300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
301 }
301 }
302
302
303 [TestMethod]
303 [TestMethod]
304 public void ComplexCase1Test() {
304 public void ComplexCase1Test() {
305 var flags = new bool[3];
305 var flags = new bool[3];
306
306
307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
308
308
309 var p = PromiseHelper
309 var p = PromiseHelper
310 .Sleep(200, "Alan")
310 .Sleep(200, "Alan")
311 .Cancelled(() => flags[0] = true)
311 .Cancelled(() => flags[0] = true)
312 .Chain(x =>
312 .Chain(x =>
313 PromiseHelper
313 PromiseHelper
314 .Sleep(200, "Hi, " + x)
314 .Sleep(200, "Hi, " + x)
315 .Map(y => y)
315 .Map(y => y)
316 .Cancelled(() => flags[1] = true)
316 .Cancelled(() => flags[1] = true)
317 )
317 )
318 .Cancelled(() => flags[2] = true);
318 .Cancelled(() => flags[2] = true);
319 Thread.Sleep(300);
319 Thread.Sleep(300);
320 p.Cancel();
320 p.Cancel();
321 try {
321 try {
322 Assert.AreEqual(p.Join(), "Hi, Alan");
322 Assert.AreEqual(p.Join(), "Hi, Alan");
323 Assert.Fail("Shouldn't get here");
323 Assert.Fail("Shouldn't get here");
324 } catch (OperationCanceledException) {
324 } catch (OperationCanceledException) {
325 }
325 }
326
326
327 Assert.IsFalse(flags[0]);
327 Assert.IsFalse(flags[0]);
328 Assert.IsTrue(flags[1]);
328 Assert.IsTrue(flags[1]);
329 Assert.IsTrue(flags[2]);
329 Assert.IsTrue(flags[2]);
330 }
330 }
331 }
331 }
332 }
332 }
333
333
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -1,171 +1,212
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Diagnostics;
3 using System.Diagnostics;
4 using System.Linq;
4 using System.Linq;
5 using System.Text;
5 using System.Text;
6 using System.Threading;
6 using System.Threading;
7
7
8 namespace Implab.Parallels {
8 namespace Implab.Parallels {
9 public static class ArrayTraits {
9 public static class ArrayTraits {
10 class ArrayIterator<TSrc> : DispatchPool<int> {
10 class ArrayIterator<TSrc> : DispatchPool<int> {
11 readonly Action<TSrc> m_action;
11 readonly Action<TSrc> m_action;
12 readonly TSrc[] m_source;
12 readonly TSrc[] m_source;
13 readonly Promise<int> m_promise = new Promise<int>();
13 readonly Promise<int> m_promise = new Promise<int>();
14
14
15 int m_pending;
15 int m_pending;
16 int m_next;
16 int m_next;
17
17
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 : base(threads) {
19 : base(threads) {
20
20
21 Debug.Assert(source != null);
21 Debug.Assert(source != null);
22 Debug.Assert(action != null);
22 Debug.Assert(action != null);
23
23
24 m_next = 0;
24 m_next = 0;
25 m_source = source;
25 m_source = source;
26 m_pending = source.Length;
26 m_pending = source.Length;
27 m_action = action;
27 m_action = action;
28
28
29 m_promise.Anyway(() => Dispose());
29 m_promise.Anyway(() => Dispose());
30 m_promise.Cancelled(() => Dispose());
30 m_promise.Cancelled(() => Dispose());
31
31
32 InitPool();
32 InitPool();
33 }
33 }
34
34
35 public Promise<int> Promise {
35 public Promise<int> Promise {
36 get {
36 get {
37 return m_promise;
37 return m_promise;
38 }
38 }
39 }
39 }
40
40
41 protected override bool TryDequeue(out int unit) {
41 protected override bool TryDequeue(out int unit) {
42 unit = Interlocked.Increment(ref m_next) - 1;
42 unit = Interlocked.Increment(ref m_next) - 1;
43 return unit >= m_source.Length ? false : true;
43 return unit >= m_source.Length ? false : true;
44 }
44 }
45
45
46 protected override void InvokeUnit(int unit) {
46 protected override void InvokeUnit(int unit) {
47 try {
47 try {
48 m_action(m_source[unit]);
48 m_action(m_source[unit]);
49 var pending = Interlocked.Decrement(ref m_pending);
49 var pending = Interlocked.Decrement(ref m_pending);
50 if (pending == 0)
50 if (pending == 0)
51 m_promise.Resolve(m_source.Length);
51 m_promise.Resolve(m_source.Length);
52 } catch (Exception e) {
52 } catch (Exception e) {
53 m_promise.Reject(e);
53 m_promise.Reject(e);
54 }
54 }
55 }
55 }
56 }
56 }
57
57
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 readonly Func<TSrc, TDst> m_transform;
59 readonly Func<TSrc, TDst> m_transform;
60 readonly TSrc[] m_source;
60 readonly TSrc[] m_source;
61 readonly TDst[] m_dest;
61 readonly TDst[] m_dest;
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
63
63
64 int m_pending;
64 int m_pending;
65 int m_next;
65 int m_next;
66
66
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 : base(threads) {
68 : base(threads) {
69
69
70 Debug.Assert (source != null);
70 Debug.Assert (source != null);
71 Debug.Assert( transform != null);
71 Debug.Assert( transform != null);
72
72
73 m_next = 0;
73 m_next = 0;
74 m_source = source;
74 m_source = source;
75 m_dest = new TDst[source.Length];
75 m_dest = new TDst[source.Length];
76 m_pending = source.Length;
76 m_pending = source.Length;
77 m_transform = transform;
77 m_transform = transform;
78
78
79 m_promise.Anyway(() => Dispose());
79 m_promise.Anyway(() => Dispose());
80 m_promise.Cancelled(() => Dispose());
80 m_promise.Cancelled(() => Dispose());
81
81
82 InitPool();
82 InitPool();
83 }
83 }
84
84
85 public Promise<TDst[]> Promise {
85 public Promise<TDst[]> Promise {
86 get {
86 get {
87 return m_promise;
87 return m_promise;
88 }
88 }
89 }
89 }
90
90
91 protected override bool TryDequeue(out int unit) {
91 protected override bool TryDequeue(out int unit) {
92 unit = Interlocked.Increment(ref m_next) - 1;
92 unit = Interlocked.Increment(ref m_next) - 1;
93 return unit >= m_source.Length ? false : true;
93 return unit >= m_source.Length ? false : true;
94 }
94 }
95
95
96 protected override void InvokeUnit(int unit) {
96 protected override void InvokeUnit(int unit) {
97 try {
97 try {
98 m_dest[unit] = m_transform(m_source[unit]);
98 m_dest[unit] = m_transform(m_source[unit]);
99 var pending = Interlocked.Decrement(ref m_pending);
99 var pending = Interlocked.Decrement(ref m_pending);
100 if (pending == 0)
100 if (pending == 0)
101 m_promise.Resolve(m_dest);
101 m_promise.Resolve(m_dest);
102 } catch (Exception e) {
102 } catch (Exception e) {
103 m_promise.Reject(e);
103 m_promise.Reject(e);
104 }
104 }
105 }
105 }
106 }
106 }
107
107
108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 if (source == null)
109 if (source == null)
110 throw new ArgumentNullException("source");
110 throw new ArgumentNullException("source");
111 if (transform == null)
111 if (transform == null)
112 throw new ArgumentNullException("transform");
112 throw new ArgumentNullException("transform");
113
113
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 return mapper.Promise;
115 return mapper.Promise;
116 }
116 }
117
117
118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 if (source == null)
119 if (source == null)
120 throw new ArgumentNullException("source");
120 throw new ArgumentNullException("source");
121 if (action == null)
121 if (action == null)
122 throw new ArgumentNullException("action");
122 throw new ArgumentNullException("action");
123
123
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 return iter.Promise;
125 return iter.Promise;
126 }
126 }
127
127
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 if (source == null)
129 if (source == null)
130 throw new ArgumentNullException("source");
130 throw new ArgumentNullException("source");
131 if (transform == null)
131 if (transform == null)
132 throw new ArgumentNullException("transform");
132 throw new ArgumentNullException("transform");
133 if (threads <= 0)
133 if (threads <= 0)
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135
135
136 var promise = new Promise<TDst[]>();
136 var promise = new Promise<TDst[]>();
137 var res = new TDst[source.Length];
137 var res = new TDst[source.Length];
138 var pending = source.Length;
138 var pending = source.Length;
139 var semaphore = new Semaphore(threads, threads);
139 var semaphore = new Semaphore(threads, threads);
140
140
141 AsyncPool.InvokeNewThread(() => {
141 AsyncPool.InvokeNewThread(() => {
142 for (int i = 0; i < source.Length; i++) {
142 for (int i = 0; i < source.Length; i++) {
143 if(promise.IsResolved)
143 if(promise.IsResolved)
144 break; // stop processing in case of error or cancellation
144 break; // stop processing in case of error or cancellation
145 var idx = i;
145 var idx = i;
146 semaphore.WaitOne();
146 semaphore.WaitOne();
147 try {
147 try {
148 var p1 = transform(source[i]);
148 var p1 = transform(source[i]);
149 p1.Anyway(() => semaphore.Release());
149 p1.Anyway(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
151 p1.Then(
151 p1.Then(
152 x => {
152 x => {
153 res[idx] = x;
153 res[idx] = x;
154 var left = Interlocked.Decrement(ref pending);
154 var left = Interlocked.Decrement(ref pending);
155 if (left == 0)
155 if (left == 0)
156 promise.Resolve(res);
156 promise.Resolve(res);
157 },
157 },
158 e => promise.Reject(e)
158 e => promise.Reject(e)
159 );
159 );
160
160
161 } catch (Exception e) {
161 } catch (Exception e) {
162 promise.Reject(e);
162 promise.Reject(e);
163 }
163 }
164 }
164 }
165 return 0;
165 return 0;
166 });
166 });
167
167
168 return promise.Anyway(() => semaphore.Dispose());
168 return promise.Anyway(() => semaphore.Dispose());
169 }
169 }
170
171 /*
172 this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is
173 be chained, in this case the syncronous callback invocation will occur
174
175 public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
176 if (source == null)
177 throw new ArgumentNullException("source");
178 if (transform == null)
179 throw new ArgumentNullException("transform");
180 if (threads <= 0)
181 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
182
183 var promise = new Promise<TDst[]>();
184 var res = new TDst[source.Length];
185 var index = -1; // we will start with increment
186 var len = source.Length;
187 var pending = len;
188
189 Action<int> callback = null;
190 callback = (current) => {
191 if (current < len) {
192 transform(source[current])
193 .Then(
194 x => {
195 res[current] = x;
196 if (Interlocked.Decrement(ref pending) == 0)
197 promise.Resolve(res);
198 else
199 callback(Interlocked.Increment(ref index));
200 },
201 e => promise.Reject(e)
202 );
203 }
204 };
205
206 for (int i = 0; i < threads; i++)
207 callback(Interlocked.Increment(ref index));
208 return promise;
209 }
210 */
170 }
211 }
171 }
212 }
@@ -1,330 +1,332
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6 using System.Diagnostics;
6 using System.Diagnostics;
7
7
8 namespace Implab.Parallels {
8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
11 readonly int m_maxThreads;
12
12
13 int m_createdThreads = 0; // the current size of the pool
13 int m_createdThreads = 0; // the current size of the pool
14 int m_activeThreads = 0; // the count of threads which are active
14 int m_activeThreads = 0; // the count of threads which are active
15 int m_sleepingThreads = 0; // the count of currently inactive threads
15 int m_sleepingThreads = 0; // the count of currently inactive threads
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
19 int m_wakeEvents = 0; // the count of wake events
19 int m_wakeEvents = 0; // the count of wake events
20
20
21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
22
22
23 protected DispatchPool(int min, int max) {
23 protected DispatchPool(int min, int max) {
24 if (min < 0)
24 if (min < 0)
25 throw new ArgumentOutOfRangeException("min");
25 throw new ArgumentOutOfRangeException("min");
26 if (max <= 0)
26 if (max <= 0)
27 throw new ArgumentOutOfRangeException("max");
27 throw new ArgumentOutOfRangeException("max");
28
28
29 if (min > max)
29 if (min > max)
30 min = max;
30 min = max;
31 m_minThreads = min;
31 m_minThreads = min;
32 m_maxThreads = max;
32 m_maxThreads = max;
33 }
33 }
34
34
35 protected DispatchPool(int threads)
35 protected DispatchPool(int threads)
36 : this(threads, threads) {
36 : this(threads, threads) {
37 }
37 }
38
38
39 protected DispatchPool() {
39 protected DispatchPool() {
40 int maxThreads, maxCP;
40 int maxThreads, maxCP;
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
42
42
43 m_minThreads = 0;
43 m_minThreads = 0;
44 m_maxThreads = maxThreads;
44 m_maxThreads = maxThreads;
45 }
45 }
46
46
47 protected void InitPool() {
47 protected void InitPool() {
48 for (int i = 0; i < m_minThreads; i++)
48 for (int i = 0; i < m_minThreads; i++)
49 StartWorker();
49 StartWorker();
50 }
50 }
51
51
52 public int PoolSize {
52 public int PoolSize {
53 get {
53 get {
54 return m_createdThreads;
54 return m_createdThreads;
55 }
55 }
56 }
56 }
57
57
58 public int ActiveThreads {
58 public int ActiveThreads {
59 get {
59 get {
60 return m_activeThreads;
60 return m_activeThreads;
61 }
61 }
62 }
62 }
63
63
64 public int MaxRunningThreads {
64 public int MaxRunningThreads {
65 get {
65 get {
66 return m_maxRunningThreads;
66 return m_maxRunningThreads;
67 }
67 }
68 }
68 }
69
69
70 protected bool IsDisposed {
70 protected bool IsDisposed {
71 get {
71 get {
72 return m_exitRequired != 0;
72 return m_exitRequired != 0;
73 }
73 }
74 }
74 }
75
75
76 protected abstract bool TryDequeue(out TUnit unit);
76 protected abstract bool TryDequeue(out TUnit unit);
77
77
78 #region thread execution traits
78 #region thread execution traits
79 int SignalThread() {
79 int SignalThread() {
80 var signals = Interlocked.Increment(ref m_wakeEvents);
80 var signals = Interlocked.Increment(ref m_wakeEvents);
81 if(signals == 1)
81 if(signals == 1)
82 m_hasTasks.Set();
82 m_hasTasks.Set();
83 return signals;
83 return signals;
84 }
84 }
85
85
86 bool FetchSignalOrWait(int timeout) {
86 bool FetchSignalOrWait(int timeout) {
87 var start = Environment.TickCount;
87 var start = Environment.TickCount;
88
88
89 // ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ ΠΏΠΎΡ‚ΠΎΠΊ Π²Π»Π°Π΄Π΅Π΅Ρ‚ Π±Π»ΠΎΠΊΠΈΡ€ΠΎΠ²ΠΊΠΎΠΉ ΠΈ ΠΏΡ€ΠΈ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎΠΌ ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΈΠΈ сигнала Π΄ΠΎΠ»ΠΆΠ΅Π½
89 // ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ ΠΏΠΎΡ‚ΠΎΠΊ Π²Π»Π°Π΄Π΅Π΅Ρ‚ Π±Π»ΠΎΠΊΠΈΡ€ΠΎΠ²ΠΊΠΎΠΉ ΠΈ ΠΏΡ€ΠΈ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎΠΌ ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΈΠΈ сигнала Π΄ΠΎΠ»ΠΆΠ΅Π½
90 // Π΅Π΅ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π΄Ρ€ΡƒΠ³ΠΎΠΉ ΠΎΠΆΠΈΠ΄Π°ΡŽΡ‰ΠΈΠΉ ΠΏΠΎΡ‚ΠΎΠΊ смог
90 // Π΅Π΅ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π΄Ρ€ΡƒΠ³ΠΎΠΉ ΠΎΠΆΠΈΠ΄Π°ΡŽΡ‰ΠΈΠΉ ΠΏΠΎΡ‚ΠΎΠΊ смог
91 bool hasLock = false;
91 bool hasLock = false;
92 do {
92 do {
93 int signals;
93 int signals;
94 do {
94 do {
95 signals = m_wakeEvents;
95 signals = m_wakeEvents;
96 if (signals == 0)
96 if (signals == 0)
97 break;
97 break;
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99
99
100 if (signals >= 1) {
100 if (signals >= 1) {
101 if (signals > 1 && hasLock)
101 if (signals > 1 && hasLock)
102 m_hasTasks.Set();
102 m_hasTasks.Set();
103 return true;
103 return true;
104 }
104 }
105
105
106 if (timeout != -1)
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108
108
109 // Ссли сигналов большС Π½Π΅ ΠΎΡΡ‚Π°Π»ΠΎΡΡŒ, Ρ‚ΠΎ ΠΏΠ΅Ρ€Π²Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ дошСл сюда сбросит событиС
109 // Ссли сигналов большС Π½Π΅ ΠΎΡΡ‚Π°Π»ΠΎΡΡŒ, Ρ‚ΠΎ ΠΏΠ΅Ρ€Π²Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ дошСл сюда сбросит событиС
110 // ΠΈ ΡƒΠΉΠ΄Π΅Ρ‚ Π½Π° пустой Ρ†ΠΈΠΊΠ», послС Ρ‡Π΅Π³ΠΎ заблокируСтся
110 // ΠΈ ΡƒΠΉΠ΄Π΅Ρ‚ Π½Π° пустой Ρ†ΠΈΠΊΠ», послС Ρ‡Π΅Π³ΠΎ заблокируСтся
111
111
112 hasLock = true;
112 hasLock = true;
113 } while (m_hasTasks.WaitOne(timeout));
113 } while (m_hasTasks.WaitOne(timeout));
114
114
115 return false;
115 return false;
116 }
116 }
117
117
118 bool Sleep(int timeout) {
118 bool Sleep(int timeout) {
119 Interlocked.Increment(ref m_sleepingThreads);
119 Interlocked.Increment(ref m_sleepingThreads);
120 if (FetchSignalOrWait(timeout)) {
120 if (FetchSignalOrWait(timeout)) {
121 Interlocked.Decrement(ref m_sleepingThreads);
121 Interlocked.Decrement(ref m_sleepingThreads);
122 return true;
122 return true;
123 } else {
123 } else {
124 Interlocked.Decrement(ref m_sleepingThreads);
124 Interlocked.Decrement(ref m_sleepingThreads);
125 return false;
125 return false;
126 }
126 }
127 }
127 }
128 #endregion
128 #endregion
129
129
130 /// <summary>
130 /// <summary>
131 /// ЗапускаСт Π»ΠΈΠ±ΠΎ Π½ΠΎΠ²Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ, Ссли Ρ€Π°Π½ΡŒΡˆΠ΅ Π½Π΅ Π±Ρ‹Π»ΠΎ Π½ΠΈ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΏΠΎΡ‚ΠΎΠΊΠ°, Π»ΠΈΠ±ΠΎ устанавливаСт событиС ΠΏΡ€ΠΎΠ±ΡƒΠΆΠ΄Π΅Π½ΠΈΠ΅ ΠΎΠ΄Π½ΠΎΠ³ΠΎ спящСго ΠΏΠΎΡ‚ΠΎΠΊΠ°
131 /// ЗапускаСт Π»ΠΈΠ±ΠΎ Π½ΠΎΠ²Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ, Ссли Ρ€Π°Π½ΡŒΡˆΠ΅ Π½Π΅ Π±Ρ‹Π»ΠΎ Π½ΠΈ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΏΠΎΡ‚ΠΎΠΊΠ°, Π»ΠΈΠ±ΠΎ устанавливаСт событиС ΠΏΡ€ΠΎΠ±ΡƒΠΆΠ΄Π΅Π½ΠΈΠ΅ ΠΎΠ΄Π½ΠΎΠ³ΠΎ спящСго ΠΏΠΎΡ‚ΠΎΠΊΠ°
132 /// </summary>
132 /// </summary>
133 protected void GrowPool() {
133 protected void GrowPool() {
134 if (m_exitRequired != 0)
134 if (m_exitRequired != 0)
135 return;
135 return;
136 if (m_sleepingThreads > m_wakeEvents) {
136 if (m_sleepingThreads > m_wakeEvents) {
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138
138
139 // all sleeping threads may gone
139 // all sleeping threads may gone
140 SignalThread(); // wake a sleeping thread;
140 SignalThread(); // wake a sleeping thread;
141
141
142 // we can't check whether signal has been processed
142 // we can't check whether signal has been processed
143 // anyway it may take some time for the thread to start
143 // anyway it may take some time for the thread to start
144 // we will ensure that at least one thread is running
144 // we will ensure that at least one thread is running
145
145
146 if (AllocateThreadSlot(1)) {
146 if (AllocateThreadSlot(1)) {
147 // if there were no threads in the pool
147 // if there were no threads in the pool
148 var worker = new Thread(this.Worker);
148 var worker = new Thread(this.Worker);
149 worker.IsBackground = true;
149 worker.IsBackground = true;
150 worker.Start();
150 worker.Start();
151 }
151 }
152 } else {
152 } else {
153 // if there is no sleeping threads in the pool
153 // if there is no sleeping threads in the pool
154 StartWorker();
154 if (!StartWorker())
155 // we haven't started a new thread, but the current can be on the way and it can't process the queue
156 // send it a signal to spin again
157 SignalThread();
155 }
158 }
156 }
159 }
157
160
158 private bool Suspend() {
161 private bool Suspend() {
159 //no tasks left, exit if the thread is no longer needed
162 //no tasks left, exit if the thread is no longer needed
160 bool last;
163 bool last;
161 bool requestExit;
164 bool requestExit;
162
165
163 // if threads have a timeout before releasing
166 // if threads have a timeout before releasing
164 if (m_releaseTimeout > 0)
167 if (m_releaseTimeout > 0)
165 requestExit = !Sleep(m_releaseTimeout);
168 requestExit = !Sleep(m_releaseTimeout);
166 else
169 else
167 requestExit = true;
170 requestExit = true;
168
171
169 if (!requestExit)
172 if (!requestExit)
170 return true;
173 return true;
171
174
172 // release unsused thread
175 // release unsused thread
173 if (requestExit && ReleaseThreadSlot(out last)) {
176 if (requestExit && ReleaseThreadSlot(out last)) {
174 // in case at the moment the last thread was being released
177 // in case at the moment the last thread was being released
175 // a new task was added to the queue, we need to try
178 // a new task was added to the queue, we need to try
176 // to revoke the thread to avoid the situation when the task is left unprocessed
179 // to revoke the thread to avoid the situation when the task is left unprocessed
177 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
180 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
178 if (AllocateThreadSlot(1))
181 if (AllocateThreadSlot(1))
179 return true; // spin again...
182 return true; // spin again...
180 else
183 else
181 SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
184 SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
182
185
183 }
186 }
184
187
185 return false;
188 return false;
186 }
189 }
187
190
188 // wait till infinity
191 // wait till infinity
189 Sleep(-1);
192 Sleep(-1);
190
193
191 return true;
194 return true;
192 }
195 }
193
196
194 #region thread slots traits
197 #region thread slots traits
195
198
196 bool AllocateThreadSlot() {
199 bool AllocateThreadSlot() {
197 int current;
200 int current;
198 // use spins to allocate slot for the new thread
201 // use spins to allocate slot for the new thread
199 do {
202 do {
200 current = m_createdThreads;
203 current = m_createdThreads;
201 if (current >= m_maxThreads || m_exitRequired != 0)
204 if (current >= m_maxThreads || m_exitRequired != 0)
202 // no more slots left or the pool has been disposed
205 // no more slots left or the pool has been disposed
203 return false;
206 return false;
204 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
207 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
205
208
206 UpdateMaxThreads(current + 1);
209 UpdateMaxThreads(current + 1);
207
210
208 return true;
211 return true;
209 }
212 }
210
213
211 bool AllocateThreadSlot(int desired) {
214 bool AllocateThreadSlot(int desired) {
212 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
215 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
213 return false;
216 return false;
214
217
215 UpdateMaxThreads(desired);
218 UpdateMaxThreads(desired);
216
219
217 return true;
220 return true;
218 }
221 }
219
222
220 bool ReleaseThreadSlot(out bool last) {
223 bool ReleaseThreadSlot(out bool last) {
221 last = false;
224 last = false;
222 int current;
225 int current;
223 // use spins to release slot for the new thread
226 // use spins to release slot for the new thread
224 do {
227 do {
225 current = m_createdThreads;
228 current = m_createdThreads;
226 if (current <= m_minThreads && m_exitRequired == 0)
229 if (current <= m_minThreads && m_exitRequired == 0)
227 // the thread is reserved
230 // the thread is reserved
228 return false;
231 return false;
229 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
232 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
230
233
231 last = (current == 1);
234 last = (current == 1);
232
235
233 return true;
236 return true;
234 }
237 }
235
238
236 /// <summary>
239 /// <summary>
237 /// releases thread slot unconditionally, used during cleanup
240 /// releases thread slot unconditionally, used during cleanup
238 /// </summary>
241 /// </summary>
239 /// <returns>true - no more threads left</returns>
242 /// <returns>true - no more threads left</returns>
240 bool ReleaseThreadSlotAnyway() {
243 bool ReleaseThreadSlotAnyway() {
241 var left = Interlocked.Decrement(ref m_createdThreads);
244 var left = Interlocked.Decrement(ref m_createdThreads);
242 return left == 0;
245 return left == 0;
243 }
246 }
244
247
245 void UpdateMaxThreads(int count) {
248 void UpdateMaxThreads(int count) {
246 int max;
249 int max;
247 do {
250 do {
248 max = m_maxRunningThreads;
251 max = m_maxRunningThreads;
249 if (max >= count)
252 if (max >= count)
250 break;
253 break;
251 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
254 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
252 }
255 }
253
256
254 #endregion
257 #endregion
255
258
256 bool StartWorker() {
259 bool StartWorker() {
257 if (AllocateThreadSlot()) {
260 if (AllocateThreadSlot()) {
258 // slot successfully allocated
261 // slot successfully allocated
259 var worker = new Thread(this.Worker);
262 var worker = new Thread(this.Worker);
260 worker.IsBackground = true;
263 worker.IsBackground = true;
261 worker.Start();
264 worker.Start();
262
265
263 return true;
266 return true;
264 } else {
267 } else {
265 return false;
268 return false;
266 }
269 }
267 }
270 }
268
271
269 protected abstract void InvokeUnit(TUnit unit);
272 protected abstract void InvokeUnit(TUnit unit);
270
273
271 void Worker() {
274 void Worker() {
272 TUnit unit;
275 TUnit unit;
273 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
276 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
274 Interlocked.Increment(ref m_activeThreads);
277 Interlocked.Increment(ref m_activeThreads);
275 do {
278 do {
276 // exit if requested
279 // exit if requested
277 if (m_exitRequired != 0) {
280 if (m_exitRequired != 0) {
278 // release the thread slot
281 // release the thread slot
279 Interlocked.Decrement(ref m_activeThreads);
282 Interlocked.Decrement(ref m_activeThreads);
280 if (ReleaseThreadSlotAnyway()) // it was the last worker
283 if (ReleaseThreadSlotAnyway()) // it was the last worker
281 m_hasTasks.Dispose();
284 m_hasTasks.Dispose();
282 else
285 else
283 SignalThread(); // wake next worker
286 SignalThread(); // wake next worker
284 unit = default(TUnit);
285 break;
287 break;
286 }
288 }
287
289
288 // fetch task
290 // fetch task
289 if (TryDequeue(out unit)) {
291 if (TryDequeue(out unit)) {
290 InvokeUnit(unit);
292 InvokeUnit(unit);
291 continue;
293 continue;
292 }
294 }
293
295
294 Interlocked.Decrement(ref m_activeThreads);
296 Interlocked.Decrement(ref m_activeThreads);
295
297
296 // entering suspend state
298 // entering suspend state
297 // keep this thread and wait
299 // keep this thread and wait
298 if (!Suspend())
300 if (!Suspend())
299 break;
301 break;
300 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
302 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
301 Interlocked.Increment(ref m_activeThreads);
303 Interlocked.Increment(ref m_activeThreads);
302 } while (true);
304 } while (true);
303 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
305 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
304 }
306 }
305
307
306 protected virtual void Dispose(bool disposing) {
308 protected virtual void Dispose(bool disposing) {
307 if (disposing) {
309 if (disposing) {
308 if (m_exitRequired == 0) {
310 if (m_exitRequired == 0) {
309 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
311 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
310 return;
312 return;
311
313
312 // wake sleeping threads
314 // wake sleeping threads
313 if (m_createdThreads > 0)
315 if (m_createdThreads > 0)
314 SignalThread();
316 SignalThread();
315 else
317 else
316 m_hasTasks.Dispose();
318 m_hasTasks.Dispose();
317 GC.SuppressFinalize(this);
319 GC.SuppressFinalize(this);
318 }
320 }
319 }
321 }
320 }
322 }
321
323
322 public void Dispose() {
324 public void Dispose() {
323 Dispose(true);
325 Dispose(true);
324 }
326 }
325
327
326 ~DispatchPool() {
328 ~DispatchPool() {
327 Dispose(false);
329 Dispose(false);
328 }
330 }
329 }
331 }
330 }
332 }
@@ -1,75 +1,75
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6
6
7 namespace Implab.Parallels {
7 namespace Implab.Parallels {
8 public class MTQueue<T> {
8 public class MTQueue<T> {
9 class Node {
9 class Node {
10 public Node(T value) {
10 public Node(T value) {
11 this.value = value;
11 this.value = value;
12 }
12 }
13 public readonly T value;
13 public readonly T value;
14 public Node next;
14 public Node next;
15 }
15 }
16
16
17 Node m_first;
17 Node m_first;
18 Node m_last;
18 Node m_last;
19
19
20 public void Enqueue(T value) {
20 public void Enqueue(T value) {
21 var last = m_last;
21 var last = m_last;
22 var next = new Node(value);
22 var next = new Node(value);
23
23
24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
25 last = m_last;
25 last = m_last;
26
26
27 if (last != null)
27 if (last != null)
28 last.next = next;
28 last.next = next;
29 else
29 else
30 m_first = next;
30 m_first = next;
31 }
31 }
32
32
33 public bool TryDequeue(out T value) {
33 public bool TryDequeue(out T value) {
34 Node first;
34 Node first;
35 Node next = null;
35 Node next = null;
36 value = default(T);
36 value = default(T);
37
37
38 do {
38 do {
39 first = m_first;
39 first = m_first;
40 if (first == null)
40 if (first == null)
41 return false;
41 return false;
42 next = first.next;
42 next = first.next;
43 if (next == null) {
43 if (next == null) {
44 // this is the last element,
44 // this is the last element,
45 // then try to update the tail
45 // then try to update the tail
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is a ace condition
47 // this is a race condition
48 if (m_last == null)
48 if (m_last == null)
49 // the queue is empty
49 // the queue is empty
50 return false;
50 return false;
51 // tail has been changed, than we need to restart
51 // tail has been changed, than we need to restart
52 continue;
52 continue;
53 }
53 }
54
54
55 // tail succesfully updated and first.next will never be changed
55 // tail succesfully updated and first.next will never be changed
56 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
56 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
57 // but the writer may update the m_first since the m_last is null
57 // but the writer may update the m_first since the m_last is null
58
58
59 // so we need to fix inconsistency by setting m_first to null, but if it already has been
59 // so we need to fix inconsistency by setting m_first to null, but if it already has been
60 // updated by a writer then we should just give up
60 // updated by a writer then we should just give up
61 Interlocked.CompareExchange(ref m_first, null, first);
61 Interlocked.CompareExchange(ref m_first, null, first);
62 break;
62 break;
63
63
64 } else {
64 } else {
65 if (first == Interlocked.CompareExchange(ref m_first, next, first))
65 if (first == Interlocked.CompareExchange(ref m_first, next, first))
66 // head succesfully updated
66 // head succesfully updated
67 break;
67 break;
68 }
68 }
69 } while (true);
69 } while (true);
70
70
71 value = first.value;
71 value = first.value;
72 return true;
72 return true;
73 }
73 }
74 }
74 }
75 }
75 }
General Comments 0
You need to be logged in to leave comments. Login now