##// END OF EJS Templates
dispatch pool rewritten
cin -
r81:2c5631b43c7d v2
parent child
Show More
@@ -1,434 +1,434
1 using System;
1 using System;
2 using System.Reflection;
2 using System.Reflection;
3 using System.Threading;
3 using System.Threading;
4 using Implab.Parallels;
4 using Implab.Parallels;
5
5
6 #if MONO
6 #if MONO
7
7
8 using NUnit.Framework;
8 using NUnit.Framework;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
11
11
12 #else
12 #else
13
13
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15
15
16 #endif
16 #endif
17
17
18 namespace Implab.Test {
18 namespace Implab.Test {
19 [TestClass]
19 [TestClass]
20 public class AsyncTests {
20 public class AsyncTests {
21 [TestMethod]
21 [TestMethod]
22 public void ResolveTest() {
22 public void ResolveTest() {
23 int res = -1;
23 int res = -1;
24 var p = new Promise<int>();
24 var p = new Promise<int>();
25 p.Then(x => res = x);
25 p.Then(x => res = x);
26 p.Resolve(100);
26 p.Resolve(100);
27
27
28 Assert.AreEqual(100, res);
28 Assert.AreEqual(100, res);
29 }
29 }
30
30
31 [TestMethod]
31 [TestMethod]
32 public void RejectTest() {
32 public void RejectTest() {
33 int res = -1;
33 int res = -1;
34 Exception err = null;
34 Exception err = null;
35
35
36 var p = new Promise<int>();
36 var p = new Promise<int>();
37 p.Then(
37 p.Then(
38 x => res = x,
38 x => res = x,
39 e => {
39 e => {
40 err = e;
40 err = e;
41 return -2;
41 return -2;
42 }
42 }
43 );
43 );
44 p.Reject(new ApplicationException("error"));
44 p.Reject(new ApplicationException("error"));
45
45
46 Assert.AreEqual(res, -1);
46 Assert.AreEqual(res, -1);
47 Assert.AreEqual(err.Message, "error");
47 Assert.AreEqual(err.Message, "error");
48
48
49 }
49 }
50
50
51 [TestMethod]
51 [TestMethod]
52 public void CancelExceptionTest() {
52 public void CancelExceptionTest() {
53 var p = new Promise<bool>();
53 var p = new Promise<bool>();
54 p.Cancel();
54 p.Cancel();
55
55
56 var p2 = p.Cancelled(() => {
56 var p2 = p.Cancelled(() => {
57 throw new ApplicationException("CANCELLED");
57 throw new ApplicationException("CANCELLED");
58 });
58 });
59
59
60 try {
60 try {
61 p2.Join();
61 p2.Join();
62 Assert.Fail();
62 Assert.Fail();
63 } catch (ApplicationException err) {
63 } catch (ApplicationException err) {
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 }
65 }
66
66
67 }
67 }
68
68
69 [TestMethod]
69 [TestMethod]
70 public void ContinueOnCancelTest() {
70 public void ContinueOnCancelTest() {
71 var p = new Promise<bool>();
71 var p = new Promise<bool>();
72 p.Cancel();
72 p.Cancel();
73
73
74 var p2 = p
74 var p2 = p
75 .Cancelled(() => {
75 .Cancelled(() => {
76 throw new ApplicationException("CANCELLED");
76 throw new ApplicationException("CANCELLED");
77 })
77 })
78 .Error(e => true);
78 .Error(e => true);
79
79
80 Assert.AreEqual(true, p2.Join());
80 Assert.AreEqual(true, p2.Join());
81 }
81 }
82
82
83 [TestMethod]
83 [TestMethod]
84 public void JoinSuccessTest() {
84 public void JoinSuccessTest() {
85 var p = new Promise<int>();
85 var p = new Promise<int>();
86 p.Resolve(100);
86 p.Resolve(100);
87 Assert.AreEqual(p.Join(), 100);
87 Assert.AreEqual(p.Join(), 100);
88 }
88 }
89
89
90 [TestMethod]
90 [TestMethod]
91 public void JoinFailTest() {
91 public void JoinFailTest() {
92 var p = new Promise<int>();
92 var p = new Promise<int>();
93 p.Reject(new ApplicationException("failed"));
93 p.Reject(new ApplicationException("failed"));
94
94
95 try {
95 try {
96 p.Join();
96 p.Join();
97 throw new ApplicationException("WRONG!");
97 throw new ApplicationException("WRONG!");
98 } catch (TargetInvocationException err) {
98 } catch (TargetInvocationException err) {
99 Assert.AreEqual(err.InnerException.Message, "failed");
99 Assert.AreEqual(err.InnerException.Message, "failed");
100 } catch {
100 } catch {
101 Assert.Fail("Got wrong excaption");
101 Assert.Fail("Got wrong excaption");
102 }
102 }
103 }
103 }
104
104
105 [TestMethod]
105 [TestMethod]
106 public void MapTest() {
106 public void MapTest() {
107 var p = new Promise<int>();
107 var p = new Promise<int>();
108
108
109 var p2 = p.Then(x => x.ToString());
109 var p2 = p.Then(x => x.ToString());
110 p.Resolve(100);
110 p.Resolve(100);
111
111
112 Assert.AreEqual(p2.Join(), "100");
112 Assert.AreEqual(p2.Join(), "100");
113 }
113 }
114
114
115 [TestMethod]
115 [TestMethod]
116 public void FixErrorTest() {
116 public void FixErrorTest() {
117 var p = new Promise<int>();
117 var p = new Promise<int>();
118
118
119 var p2 = p.Error(e => 101);
119 var p2 = p.Error(e => 101);
120
120
121 p.Reject(new Exception());
121 p.Reject(new Exception());
122
122
123 Assert.AreEqual(p2.Join(), 101);
123 Assert.AreEqual(p2.Join(), 101);
124 }
124 }
125
125
126 [TestMethod]
126 [TestMethod]
127 public void ChainTest() {
127 public void ChainTest() {
128 var p1 = new Promise<int>();
128 var p1 = new Promise<int>();
129
129
130 var p3 = p1.Chain(x => {
130 var p3 = p1.Chain(x => {
131 var p2 = new Promise<string>();
131 var p2 = new Promise<string>();
132 p2.Resolve(x.ToString());
132 p2.Resolve(x.ToString());
133 return p2;
133 return p2;
134 });
134 });
135
135
136 p1.Resolve(100);
136 p1.Resolve(100);
137
137
138 Assert.AreEqual(p3.Join(), "100");
138 Assert.AreEqual(p3.Join(), "100");
139 }
139 }
140
140
141 [TestMethod]
141 [TestMethod]
142 public void PoolTest() {
142 public void PoolTest() {
143 var pid = Thread.CurrentThread.ManagedThreadId;
143 var pid = Thread.CurrentThread.ManagedThreadId;
144 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
144 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
145
145
146 Assert.AreNotEqual(pid, p.Join());
146 Assert.AreNotEqual(pid, p.Join());
147 }
147 }
148
148
149 [TestMethod]
149 [TestMethod]
150 public void WorkerPoolSizeTest() {
150 public void WorkerPoolSizeTest() {
151 var pool = new WorkerPool(5, 10, 0);
151 var pool = new WorkerPool(5, 10, 1);
152
152
153 Assert.AreEqual(5, pool.PoolSize);
153 Assert.AreEqual(5, pool.PoolSize);
154
154
155 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
155 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
156 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
156 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
157 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
157 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
158
158
159 Assert.AreEqual(5, pool.PoolSize);
159 Assert.AreEqual(5, pool.PoolSize);
160
160
161 for (int i = 0; i < 100; i++)
161 for (int i = 0; i < 100; i++)
162 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
162 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
163 Thread.Sleep(200);
163 Thread.Sleep(200);
164 Assert.AreEqual(10, pool.PoolSize);
164 Assert.AreEqual(10, pool.PoolSize);
165
165
166 pool.Dispose();
166 pool.Dispose();
167 }
167 }
168
168
169 [TestMethod]
169 [TestMethod]
170 public void WorkerPoolCorrectTest() {
170 public void WorkerPoolCorrectTest() {
171 var pool = new WorkerPool(0,1000,100);
171 var pool = new WorkerPool(0,1000,100);
172
172
173 const int iterations = 1000;
173 const int iterations = 1000;
174 int pending = iterations;
174 int pending = iterations;
175 var stop = new ManualResetEvent(false);
175 var stop = new ManualResetEvent(false);
176
176
177 var count = 0;
177 var count = 0;
178 for (int i = 0; i < iterations; i++) {
178 for (int i = 0; i < iterations; i++) {
179 pool
179 pool
180 .Invoke(() => 1)
180 .Invoke(() => 1)
181 .Then(x => Interlocked.Add(ref count, x))
181 .Then(x => Interlocked.Add(ref count, x))
182 .Then(x => Math.Log10(x))
182 .Then(x => Math.Log10(x))
183 .Anyway(() => {
183 .Anyway(() => {
184 Interlocked.Decrement(ref pending);
184 Interlocked.Decrement(ref pending);
185 if (pending == 0)
185 if (pending == 0)
186 stop.Set();
186 stop.Set();
187 });
187 });
188 }
188 }
189
189
190 stop.WaitOne();
190 stop.WaitOne();
191
191
192 Assert.AreEqual(iterations, count);
192 Assert.AreEqual(iterations, count);
193 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
193 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
194 pool.Dispose();
194 pool.Dispose();
195
195
196 }
196 }
197
197
198 [TestMethod]
198 [TestMethod]
199 public void WorkerPoolDisposeTest() {
199 public void WorkerPoolDisposeTest() {
200 var pool = new WorkerPool(5, 20);
200 var pool = new WorkerPool(5, 20);
201 Assert.AreEqual(5, pool.PoolSize);
201 Assert.AreEqual(5, pool.PoolSize);
202 pool.Dispose();
202 pool.Dispose();
203 Thread.Sleep(500);
203 Thread.Sleep(500);
204 Assert.AreEqual(0, pool.PoolSize);
204 Assert.AreEqual(0, pool.PoolSize);
205 pool.Dispose();
205 pool.Dispose();
206 }
206 }
207
207
208 [TestMethod]
208 [TestMethod]
209 public void MTQueueTest() {
209 public void MTQueueTest() {
210 var queue = new MTQueue<int>();
210 var queue = new MTQueue<int>();
211 int res;
211 int res;
212
212
213 queue.Enqueue(10);
213 queue.Enqueue(10);
214 Assert.IsTrue(queue.TryDequeue(out res));
214 Assert.IsTrue(queue.TryDequeue(out res));
215 Assert.AreEqual(10, res);
215 Assert.AreEqual(10, res);
216 Assert.IsFalse(queue.TryDequeue(out res));
216 Assert.IsFalse(queue.TryDequeue(out res));
217
217
218 for (int i = 0; i < 1000; i++)
218 for (int i = 0; i < 1000; i++)
219 queue.Enqueue(i);
219 queue.Enqueue(i);
220
220
221 for (int i = 0; i < 1000; i++) {
221 for (int i = 0; i < 1000; i++) {
222 queue.TryDequeue(out res);
222 queue.TryDequeue(out res);
223 Assert.AreEqual(i, res);
223 Assert.AreEqual(i, res);
224 }
224 }
225
225
226 int writers = 0;
226 int writers = 0;
227 int readers = 0;
227 int readers = 0;
228 var stop = new ManualResetEvent(false);
228 var stop = new ManualResetEvent(false);
229 int total = 0;
229 int total = 0;
230
230
231 const int itemsPerWriter = 10000;
231 const int itemsPerWriter = 10000;
232 const int writersCount = 10;
232 const int writersCount = 10;
233
233
234 for (int i = 0; i < writersCount; i++) {
234 for (int i = 0; i < writersCount; i++) {
235 Interlocked.Increment(ref writers);
235 Interlocked.Increment(ref writers);
236 AsyncPool
236 AsyncPool
237 .InvokeNewThread(() => {
237 .InvokeNewThread(() => {
238 for (int ii = 0; ii < itemsPerWriter; ii++) {
238 for (int ii = 0; ii < itemsPerWriter; ii++) {
239 queue.Enqueue(1);
239 queue.Enqueue(1);
240 }
240 }
241 return 1;
241 return 1;
242 })
242 })
243 .Anyway(() => Interlocked.Decrement(ref writers));
243 .Anyway(() => Interlocked.Decrement(ref writers));
244 }
244 }
245
245
246 for (int i = 0; i < 10; i++) {
246 for (int i = 0; i < 10; i++) {
247 Interlocked.Increment(ref readers);
247 Interlocked.Increment(ref readers);
248 AsyncPool
248 AsyncPool
249 .InvokeNewThread(() => {
249 .InvokeNewThread(() => {
250 int t;
250 int t;
251 do {
251 do {
252 while (queue.TryDequeue(out t))
252 while (queue.TryDequeue(out t))
253 Interlocked.Add(ref total, t);
253 Interlocked.Add(ref total, t);
254 } while (writers > 0);
254 } while (writers > 0);
255 return 1;
255 return 1;
256 })
256 })
257 .Anyway(() => {
257 .Anyway(() => {
258 Interlocked.Decrement(ref readers);
258 Interlocked.Decrement(ref readers);
259 if (readers == 0)
259 if (readers == 0)
260 stop.Set();
260 stop.Set();
261 });
261 });
262 }
262 }
263
263
264 stop.WaitOne();
264 stop.WaitOne();
265
265
266 Assert.AreEqual(itemsPerWriter * writersCount, total);
266 Assert.AreEqual(itemsPerWriter * writersCount, total);
267 }
267 }
268
268
269 [TestMethod]
269 [TestMethod]
270 public void ParallelMapTest() {
270 public void ParallelMapTest() {
271
271
272 const int count = 100000;
272 const int count = 100000;
273
273
274 var args = new double[count];
274 var args = new double[count];
275 var rand = new Random();
275 var rand = new Random();
276
276
277 for (int i = 0; i < count; i++)
277 for (int i = 0; i < count; i++)
278 args[i] = rand.NextDouble();
278 args[i] = rand.NextDouble();
279
279
280 var t = Environment.TickCount;
280 var t = Environment.TickCount;
281 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
281 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
282
282
283 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
283 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
284
284
285 t = Environment.TickCount;
285 t = Environment.TickCount;
286 for (int i = 0; i < count; i++)
286 for (int i = 0; i < count; i++)
287 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
287 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
288 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
288 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
289 }
289 }
290
290
291 [TestMethod]
291 [TestMethod]
292 public void ChainedMapTest() {
292 public void ChainedMapTest() {
293
293
294 using (var pool = new WorkerPool(0,10,100)) {
294 using (var pool = new WorkerPool(0,10,1)) {
295 const int count = 10000;
295 const int count = 10000;
296
296
297 var args = new double[count];
297 var args = new double[count];
298 var rand = new Random();
298 var rand = new Random();
299
299
300 for (int i = 0; i < count; i++)
300 for (int i = 0; i < count; i++)
301 args[i] = rand.NextDouble();
301 args[i] = rand.NextDouble();
302
302
303 var t = Environment.TickCount;
303 var t = Environment.TickCount;
304 var res = args
304 var res = args
305 .ChainedMap(
305 .ChainedMap(
306 // Analysis disable once AccessToDisposedClosure
306 // Analysis disable once AccessToDisposedClosure
307 x => pool.Invoke(
307 x => pool.Invoke(
308 () => Math.Sin(x * x)
308 () => Math.Sin(x * x)
309 ),
309 ),
310 4
310 4
311 )
311 )
312 .Join();
312 .Join();
313
313
314 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
314 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
315
315
316 t = Environment.TickCount;
316 t = Environment.TickCount;
317 for (int i = 0; i < count; i++)
317 for (int i = 0; i < count; i++)
318 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
318 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
319 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
319 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
320 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
320 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
321 }
321 }
322 }
322 }
323
323
324 [TestMethod]
324 [TestMethod]
325 public void ParallelForEachTest() {
325 public void ParallelForEachTest() {
326
326
327 const int count = 100000;
327 const int count = 100000;
328
328
329 var args = new int[count];
329 var args = new int[count];
330 var rand = new Random();
330 var rand = new Random();
331
331
332 for (int i = 0; i < count; i++)
332 for (int i = 0; i < count; i++)
333 args[i] = (int)(rand.NextDouble() * 100);
333 args[i] = (int)(rand.NextDouble() * 100);
334
334
335 int result = 0;
335 int result = 0;
336
336
337 var t = Environment.TickCount;
337 var t = Environment.TickCount;
338 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
338 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
339
339
340 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
340 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
341
341
342 int result2 = 0;
342 int result2 = 0;
343
343
344 t = Environment.TickCount;
344 t = Environment.TickCount;
345 for (int i = 0; i < count; i++)
345 for (int i = 0; i < count; i++)
346 result2 += args[i];
346 result2 += args[i];
347 Assert.AreEqual(result2, result);
347 Assert.AreEqual(result2, result);
348 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
348 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
349 }
349 }
350
350
351 [TestMethod]
351 [TestMethod]
352 public void ComplexCase1Test() {
352 public void ComplexCase1Test() {
353 var flags = new bool[3];
353 var flags = new bool[3];
354
354
355 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
355 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
356
356
357 var step1 = PromiseHelper
357 var step1 = PromiseHelper
358 .Sleep(200, "Alan")
358 .Sleep(200, "Alan")
359 .Cancelled(() => flags[0] = true);
359 .Cancelled(() => flags[0] = true);
360 var p = step1
360 var p = step1
361 .Chain(x =>
361 .Chain(x =>
362 PromiseHelper
362 PromiseHelper
363 .Sleep(200, "Hi, " + x)
363 .Sleep(200, "Hi, " + x)
364 .Then(y => y)
364 .Then(y => y)
365 .Cancelled(() => flags[1] = true)
365 .Cancelled(() => flags[1] = true)
366 )
366 )
367 .Cancelled(() => flags[2] = true);
367 .Cancelled(() => flags[2] = true);
368 step1.Join();
368 step1.Join();
369 p.Cancel();
369 p.Cancel();
370 try {
370 try {
371 Assert.AreEqual(p.Join(), "Hi, Alan");
371 Assert.AreEqual(p.Join(), "Hi, Alan");
372 Assert.Fail("Shouldn't get here");
372 Assert.Fail("Shouldn't get here");
373 } catch (OperationCanceledException) {
373 } catch (OperationCanceledException) {
374 }
374 }
375
375
376 Assert.IsFalse(flags[0]);
376 Assert.IsFalse(flags[0]);
377 Assert.IsTrue(flags[1]);
377 Assert.IsTrue(flags[1]);
378 Assert.IsTrue(flags[2]);
378 Assert.IsTrue(flags[2]);
379 }
379 }
380
380
381 [TestMethod]
381 [TestMethod]
382 public void ChainedCancel1Test() {
382 public void ChainedCancel1Test() {
383 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
383 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
384 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
384 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
385 var p = PromiseHelper
385 var p = PromiseHelper
386 .Sleep(1, "Hi, HAL!")
386 .Sleep(1, "Hi, HAL!")
387 .Then(x => {
387 .Then(x => {
388 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
388 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
389 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
389 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
390 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
390 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
391 PromiseHelper
391 PromiseHelper
392 .Sleep(100, "HAL, STOP!")
392 .Sleep(100, "HAL, STOP!")
393 .Then(result.Cancel);
393 .Then(result.Cancel);
394 return result;
394 return result;
395 });
395 });
396 try {
396 try {
397 p.Join();
397 p.Join();
398 } catch (TargetInvocationException err) {
398 } catch (TargetInvocationException err) {
399 Assert.IsTrue(err.InnerException is OperationCanceledException);
399 Assert.IsTrue(err.InnerException is OperationCanceledException);
400 }
400 }
401 }
401 }
402
402
403 [TestMethod]
403 [TestMethod]
404 public void ChainedCancel2Test() {
404 public void ChainedCancel2Test() {
405 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
405 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
406 var pSurvive = new Promise<bool>();
406 var pSurvive = new Promise<bool>();
407 var hemStarted = new ManualResetEvent(false);
407 var hemStarted = new ManualResetEvent(false);
408 var p = PromiseHelper
408 var p = PromiseHelper
409 .Sleep(1, "Hi, HAL!")
409 .Sleep(1, "Hi, HAL!")
410 .Chain(x => {
410 .Chain(x => {
411 hemStarted.Set();
411 hemStarted.Set();
412 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
412 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
413 var result = PromiseHelper
413 var result = PromiseHelper
414 .Sleep(10000, "HEM ENABLED!!!")
414 .Sleep(10000, "HEM ENABLED!!!")
415 .Then(s => pSurvive.Resolve(false));
415 .Then(s => pSurvive.Resolve(false));
416
416
417 result
417 result
418 .Cancelled(() => pSurvive.Resolve(true));
418 .Cancelled(() => pSurvive.Resolve(true));
419
419
420 return result;
420 return result;
421 });
421 });
422
422
423 hemStarted.WaitOne();
423 hemStarted.WaitOne();
424 p.Cancel();
424 p.Cancel();
425
425
426 try {
426 try {
427 p.Join();
427 p.Join();
428 } catch (OperationCanceledException) {
428 } catch (OperationCanceledException) {
429 Assert.IsTrue(pSurvive.Join());
429 Assert.IsTrue(pSurvive.Join());
430 }
430 }
431 }
431 }
432 }
432 }
433 }
433 }
434
434
@@ -1,343 +1,204
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6 using System.Diagnostics;
6 using System.Diagnostics;
7
7
8 namespace Implab.Parallels {
8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
10 readonly int m_minThreadsLimit;
11 readonly int m_maxThreads;
11 readonly int m_maxThreadsLimit;
12 readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
12 readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit
13
13
14 int m_createdThreads = 0; // the current size of the pool
14 int m_threads = 0; // the current size of the pool
15 int m_activeThreads = 0; // the count of threads which are active
16 int m_sleepingThreads = 0; // the count of currently inactive threads
17 int m_maxRunningThreads = 0; // the meximum reached size of the pool
15 int m_maxRunningThreads = 0; // the meximum reached size of the pool
18 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
16 int m_exit = 0; // the pool is going to shutdown, all unused workers are released
19
17
20 int m_wakeEvents = 0; // the count of wake events
18 readonly object m_signal = new object(); // used to pulse waiting threads
21
22 readonly object m_signalLocker = new object();
23
19
24 protected DispatchPool(int min, int max) {
20 protected DispatchPool(int min, int max) {
25 if (min < 0)
21 if (min < 0)
26 throw new ArgumentOutOfRangeException("min");
22 throw new ArgumentOutOfRangeException("min");
27 if (max <= 0)
23 if (max <= 0)
28 throw new ArgumentOutOfRangeException("max");
24 throw new ArgumentOutOfRangeException("max");
29
25
30 if (min > max)
26 if (min > max)
31 min = max;
27 min = max;
32 m_minThreads = min;
28 m_minThreadsLimit = min;
33 m_maxThreads = max;
29 m_maxThreadsLimit = max;
34 }
30 }
35
31
36 protected DispatchPool(int threads)
32 protected DispatchPool(int threads)
37 : this(threads, threads) {
33 : this(threads, threads) {
38 }
34 }
39
35
40 protected DispatchPool() {
36 protected DispatchPool() {
41 int maxThreads, maxCP;
37 int maxThreads, maxCP;
42 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
38 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
43
39
44 m_minThreads = 0;
40 m_minThreadsLimit = 0;
45 m_maxThreads = maxThreads;
41 m_maxThreadsLimit = maxThreads;
46 }
42 }
47
43
48 protected void InitPool() {
44 protected void InitPool() {
49 for (int i = 0; i < m_minThreads; i++)
45 for (int i = 0; i < m_minThreadsLimit; i++)
50 StartWorker();
46 StartWorker();
51 }
47 }
52
48
53 public int PoolSize {
49 public int PoolSize {
54 get {
50 get {
55 Thread.MemoryBarrier();
51 Thread.MemoryBarrier();
56 return m_createdThreads;
52 return m_threads;
57 }
53 }
58 }
54 }
59
55
60 public int ActiveThreads {
61 get {
62 Thread.MemoryBarrier();
63 return m_activeThreads;
64 }
65 }
66
67 public int MaxRunningThreads {
56 public int MaxRunningThreads {
68 get {
57 get {
69 Thread.MemoryBarrier();
58 Thread.MemoryBarrier();
70 return m_maxRunningThreads;
59 return m_maxRunningThreads;
71 }
60 }
72 }
61 }
73
62
74 protected bool IsDisposed {
63 protected bool IsDisposed {
75 get {
64 get {
76 Thread.MemoryBarrier();
65 Thread.MemoryBarrier();
77 return m_exitRequired == 1;
66 return m_exit == 1;
78 }
67 }
79 }
68 }
80
69
81 protected abstract bool TryDequeue(out TUnit unit);
70 protected abstract bool TryDequeue(out TUnit unit);
82
71
83 #region thread signaling traits
72 private bool Dequeue(out TUnit unit, int timeout) {
84 int SignalThread() {
73 int ts = Environment.TickCount;
85 var signals = Interlocked.Increment(ref m_wakeEvents);
74 if (TryDequeue(out unit))
86 if(signals == 1)
75 return true;
87 lock(m_signalLocker)
76 lock (m_signal) {
88 Monitor.Pulse(m_signalLocker);
77 while (!TryDequeue(out unit) && m_exit == 0)
89 return signals;
78 if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
90 }
79 // timeout
91
80 return false;
92 bool FetchSignalOrWait(int timeout) {
93 var start = Environment.TickCount;
94 int signals;
95 Thread.MemoryBarrier(); // m_wakeEvents volatile first read
96 do {
97 signals = m_wakeEvents;
98 if (signals == 0)
99 break;
100 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
101
102 if (signals == 0) {
103 // no signal is fetched
104 lock(m_signalLocker) {
105 while(m_wakeEvents == 0) {
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108 if(!Monitor.Wait(m_signalLocker,timeout))
109 return false; // timeout
110 }
81 }
111 // m_wakeEvents > 0
82 // queue item or terminate
112 if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized
83 Monitor.Pulse(m_signal);
113 Monitor.Pulse(m_signalLocker);
84 if (m_exit == 1)
114
85 return false;
115 // signal fetched
116 return true;
117 }
118
119 } else {
120 // signal fetched
121 return true;
122 }
86 }
123
87 return true;
124
125 }
88 }
126
89
127 bool Sleep(int timeout) {
90 protected void SignalThread() {
128 Interlocked.Increment(ref m_sleepingThreads);
91 lock (m_signal) {
129 if (FetchSignalOrWait(timeout)) {
92 Monitor.Pulse(m_signal);
130 Interlocked.Decrement(ref m_sleepingThreads);
131 return true;
132 } else {
133 Interlocked.Decrement(ref m_sleepingThreads);
134 return false;
135 }
136 }
137 #endregion
138
139 /// <summary>
140 /// ЗапускаСт Π»ΠΈΠ±ΠΎ Π½ΠΎΠ²Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ, Ссли Ρ€Π°Π½ΡŒΡˆΠ΅ Π½Π΅ Π±Ρ‹Π»ΠΎ Π½ΠΈ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΏΠΎΡ‚ΠΎΠΊΠ°, Π»ΠΈΠ±ΠΎ устанавливаСт событиС ΠΏΡ€ΠΎΠ±ΡƒΠΆΠ΄Π΅Π½ΠΈΠ΅ ΠΎΠ΄Π½ΠΎΠ³ΠΎ спящСго ΠΏΠΎΡ‚ΠΎΠΊΠ°
141 /// </summary>
142 protected void GrowPool() {
143 Thread.MemoryBarrier();
144 if (m_exitRequired == 1)
145 return;
146 if (m_sleepingThreads > m_wakeEvents) {
147 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
148
149 // all sleeping threads may gone
150 SignalThread(); // wake a sleeping thread;
151
152 // we can't check whether signal has been processed
153 // anyway it may take some time for the thread to start
154 // we will ensure that at least one thread is running
155
156 EnsurePoolIsAlive();
157 } else {
158 // if there is no sleeping threads in the pool
159 if (!StartWorker()) {
160 // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue
161 // send it a signal to spin again
162 SignalThread();
163 EnsurePoolIsAlive();
164 }
165 }
93 }
166 }
94 }
167
95
168 protected void EnsurePoolIsAlive() {
169 if (AllocateThreadSlot(1)) {
170 // if there were no threads in the pool
171 var worker = new Thread(this.Worker);
172 worker.IsBackground = true;
173 worker.Start();
174 }
175 }
176
177 protected virtual bool Suspend() {
178 //no tasks left, exit if the thread is no longer needed
179 bool last;
180 bool requestExit;
181
182 // if threads have a timeout before releasing
183 if (m_releaseTimeout > 0)
184 requestExit = !Sleep(m_releaseTimeout);
185 else
186 requestExit = true;
187
188 if (!requestExit)
189 return true;
190
191 // release unsused thread
192 if (requestExit && ReleaseThreadSlot(out last)) {
193 // in case at the moment the last thread was being released
194 // a new task was added to the queue, we need to try
195 // to revoke the thread to avoid the situation when the task is left unprocessed
196 if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false
197 SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it
198 return AllocateThreadSlot(1); // ensure that at least one thread is alive
199 }
200
201 return false;
202 }
203
204 // wait till infinity
205 Sleep(-1);
206
207 return true;
208 }
209
210 #region thread slots traits
96 #region thread slots traits
211
97
212 bool AllocateThreadSlot() {
98 bool AllocateThreadSlot() {
213 int current;
99 int current;
214 // use spins to allocate slot for the new thread
100 // use spins to allocate slot for the new thread
215 do {
101 do {
216 current = m_createdThreads;
102 current = m_threads;
217 if (current >= m_maxThreads || m_exitRequired == 1)
103 if (current >= m_maxThreadsLimit || m_exit == 1)
218 // no more slots left or the pool has been disposed
104 // no more slots left or the pool has been disposed
219 return false;
105 return false;
220 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
106 } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));
221
107
222 UpdateMaxThreads(current + 1);
108 UpdateMaxThreads(current + 1);
223
109
224 return true;
110 return true;
225 }
111 }
226
112
227 bool AllocateThreadSlot(int desired) {
113 bool AllocateThreadSlot(int desired) {
228 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
114 if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
229 return false;
115 return false;
230
116
231 UpdateMaxThreads(desired);
117 UpdateMaxThreads(desired);
232
118
233 return true;
119 return true;
234 }
120 }
235
121
236 bool ReleaseThreadSlot(out bool last) {
122 bool ReleaseThreadSlot(out bool last) {
237 last = false;
123 last = false;
238 int current;
124 int current;
239 // use spins to release slot for the new thread
125 // use spins to release slot for the new thread
240 Thread.MemoryBarrier();
126 Thread.MemoryBarrier();
241 do {
127 do {
242 current = m_createdThreads;
128 current = m_threads;
243 if (current <= m_minThreads && m_exitRequired == 0)
129 if (current <= m_minThreadsLimit && m_exit == 0)
244 // the thread is reserved
130 // the thread is reserved
245 return false;
131 return false;
246 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
132 } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));
247
133
248 last = (current == 1);
134 last = (current == 1);
249
135
250 return true;
136 return true;
251 }
137 }
252
138
253 /// <summary>
254 /// releases thread slot unconditionally, used during cleanup
255 /// </summary>
256 /// <returns>true - no more threads left</returns>
257 bool ReleaseThreadSlotAnyway() {
258 var left = Interlocked.Decrement(ref m_createdThreads);
259 return left == 0;
260 }
261
262 void UpdateMaxThreads(int count) {
139 void UpdateMaxThreads(int count) {
263 int max;
140 int max;
264 do {
141 do {
265 max = m_maxRunningThreads;
142 max = m_maxRunningThreads;
266 if (max >= count)
143 if (max >= count)
267 break;
144 break;
268 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
145 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
269 }
146 }
270
147
271 #endregion
148 #endregion
272
149
273 bool StartWorker() {
150 protected bool StartWorker() {
274 if (AllocateThreadSlot()) {
151 if (AllocateThreadSlot()) {
275 // slot successfully allocated
152 // slot successfully allocated
276 var worker = new Thread(this.Worker);
153 var worker = new Thread(this.Worker);
277 worker.IsBackground = true;
154 worker.IsBackground = true;
278 Interlocked.Increment(ref m_activeThreads);
279 worker.Start();
155 worker.Start();
280
156
281 return true;
157 return true;
282 } else {
158 } else {
283 return false;
159 return false;
284 }
160 }
285 }
161 }
286
162
287 protected abstract void InvokeUnit(TUnit unit);
163 protected abstract void InvokeUnit(TUnit unit);
288
164
289 protected virtual void Worker() {
165 protected virtual void Worker() {
290 TUnit unit;
166 TUnit unit;
291 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
167 bool last;
292 int count = 0;;
293 Thread.MemoryBarrier();
294 do {
168 do {
295 // exit if requested
169 while (Dequeue(out unit, m_releaseTimeout)) {
296 if (m_exitRequired == 1) {
297 // release the thread slot
298 Interlocked.Decrement(ref m_activeThreads);
299 if (!ReleaseThreadSlotAnyway()) // it was the last worker
300 SignalThread(); // wake next worker
301 break;
302 }
303
304 // fetch task
305 if (TryDequeue(out unit)) {
306 InvokeUnit(unit);
170 InvokeUnit(unit);
307 count ++;
171 }
172 if(!ReleaseThreadSlot(out last))
308 continue;
173 continue;
174 // queue may be not empty
175 if (last && TryDequeue(out unit)) {
176 InvokeUnit(unit);
177 if (AllocateThreadSlot(1))
178 continue;
179 // we can safely exit since pool is alive
309 }
180 }
310 Interlocked.Decrement(ref m_activeThreads);
181 break;
182 } while(true);
183 }
311
184
312 Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count);
313 // entering suspend state
314 // keep this thread and wait
315 if (!Suspend())
316 break;
317 count = 0;
318 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
319 Interlocked.Increment(ref m_activeThreads);
320 } while (true);
321 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
322 }
323
185
324 protected virtual void Dispose(bool disposing) {
186 protected virtual void Dispose(bool disposing) {
325 if (disposing) {
187 if (disposing) {
326 if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier
188 if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
327 // wake sleeping threads
189 // wake sleeping threads
328 if (m_createdThreads > 0)
190 SignalThread();
329 SignalThread();
330 GC.SuppressFinalize(this);
191 GC.SuppressFinalize(this);
331 }
192 }
332 }
193 }
333 }
194 }
334
195
335 public void Dispose() {
196 public void Dispose() {
336 Dispose(true);
197 Dispose(true);
337 }
198 }
338
199
339 ~DispatchPool() {
200 ~DispatchPool() {
340 Dispose(false);
201 Dispose(false);
341 }
202 }
342 }
203 }
343 }
204 }
@@ -1,106 +1,89
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6 using System.Diagnostics;
6 using System.Diagnostics;
7 using Implab.Diagnostics;
7 using Implab.Diagnostics;
8
8
9 namespace Implab.Parallels {
9 namespace Implab.Parallels {
10 public class WorkerPool : DispatchPool<Action> {
10 public class WorkerPool : DispatchPool<Action> {
11
11
12 MTQueue<Action> m_queue = new MTQueue<Action>();
12 MTQueue<Action> m_queue = new MTQueue<Action>();
13 int m_queueLength = 0;
13 int m_queueLength = 0;
14 readonly int m_threshold = 1;
14 readonly int m_threshold = 1;
15 int m_workers = 0;
15 int m_workers = 0;
16
16
17 public WorkerPool(int minThreads, int maxThreads, int threshold)
17 public WorkerPool(int minThreads, int maxThreads, int threshold)
18 : base(minThreads, maxThreads) {
18 : base(minThreads, maxThreads) {
19 m_threshold = threshold;
19 m_threshold = threshold;
20 m_workers = minThreads;
20 m_workers = minThreads;
21 InitPool();
21 InitPool();
22 }
22 }
23
23
24 public WorkerPool(int minThreads, int maxThreads) :
24 public WorkerPool(int minThreads, int maxThreads) :
25 base(minThreads, maxThreads) {
25 base(minThreads, maxThreads) {
26 m_workers = minThreads;
26 m_workers = minThreads;
27 InitPool();
27 InitPool();
28 }
28 }
29
29
30 public WorkerPool(int threads)
30 public WorkerPool(int threads)
31 : base(threads) {
31 : base(threads) {
32 m_workers = threads;
32 m_workers = threads;
33 InitPool();
33 InitPool();
34 }
34 }
35
35
36 public WorkerPool()
36 public WorkerPool()
37 : base() {
37 : base() {
38 InitPool();
38 InitPool();
39 }
39 }
40
40
41 public Promise<T> Invoke<T>(Func<T> task) {
41 public Promise<T> Invoke<T>(Func<T> task) {
42 if (task == null)
42 if (task == null)
43 throw new ArgumentNullException("task");
43 throw new ArgumentNullException("task");
44 if (IsDisposed)
44 if (IsDisposed)
45 throw new ObjectDisposedException(ToString());
45 throw new ObjectDisposedException(ToString());
46
46
47 var promise = new Promise<T>();
47 var promise = new Promise<T>();
48
48
49 var caller = TraceContext.Snapshot();
49 var caller = TraceContext.Snapshot();
50
50
51 EnqueueTask(delegate() {
51 EnqueueTask(delegate() {
52 caller.Invoke(delegate() {
52 caller.Invoke(delegate() {
53 try {
53 try {
54 promise.Resolve(task());
54 promise.Resolve(task());
55 } catch (Exception e) {
55 } catch (Exception e) {
56 promise.Reject(e);
56 promise.Reject(e);
57 }
57 }
58 });
58 });
59 });
59 });
60
60
61 return promise;
61 return promise;
62 }
62 }
63
63
64 protected void EnqueueTask(Action unit) {
64 protected void EnqueueTask(Action unit) {
65 Debug.Assert(unit != null);
65 Debug.Assert(unit != null);
66 var len = Interlocked.Increment(ref m_queueLength);
66 var len = Interlocked.Increment(ref m_queueLength);
67 m_queue.Enqueue(unit);
67 m_queue.Enqueue(unit);
68
68
69 if (len > m_threshold * m_workers) {
69 if (len > m_threshold * PoolSize) {
70 Interlocked.Increment(ref m_workers);
70 StartWorker();
71 GrowPool();
72 }
71 }
72
73 SignalThread();
73 }
74 }
74
75
75 protected override bool TryDequeue(out Action unit) {
76 protected override bool TryDequeue(out Action unit) {
76 if (m_queue.TryDequeue(out unit)) {
77 if (m_queue.TryDequeue(out unit)) {
77 Interlocked.Decrement(ref m_queueLength);
78 Interlocked.Decrement(ref m_queueLength);
78 return true;
79 return true;
79 }
80 }
80 return false;
81 return false;
81 }
82 }
82
83
83 protected override bool Suspend() {
84 // This override solves race condition
85 // WORKER CLIENT
86 // ---------------------------------------
87 // TryDeque == false
88 // Enqueue(unit), queueLen++
89 // GrowPool? == NO
90 // ActiveThreads--
91 // Suspend
92 // queueLength > 0
93 // continue
94 Thread.MemoryBarrier();
95 if (m_queueLength > 0)
96 return true;
97 Interlocked.Decrement(ref m_workers);
98 return base.Suspend();
99 }
100
101 protected override void InvokeUnit(Action unit) {
84 protected override void InvokeUnit(Action unit) {
102 unit();
85 unit();
103 }
86 }
104
87
105 }
88 }
106 }
89 }
General Comments 0
You need to be logged in to leave comments. Login now