##// END OF EJS Templates
working version of AsyncQueue and batch operations...
cin -
r121:62d2f1e98c4e v2
parent child
Show More
@@ -1,513 +1,600
1 1 using System;
2 2 using System.Reflection;
3 3 using System.Threading;
4 4 using Implab.Parallels;
5 5
6 6 #if MONO
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 10 using TestMethod = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
14 14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15 15
16 16 #endif
17 17
18 18 namespace Implab.Test {
19 19 [TestClass]
20 20 public class AsyncTests {
21 21 [TestMethod]
22 22 public void ResolveTest() {
23 23 int res = -1;
24 24 var p = new Promise<int>();
25 25 p.Then(x => res = x);
26 26 p.Resolve(100);
27 27
28 28 Assert.AreEqual(100, res);
29 29 }
30 30
31 31 [TestMethod]
32 32 public void RejectTest() {
33 33 int res = -1;
34 34 Exception err = null;
35 35
36 36 var p = new Promise<int>();
37 37 p.Then(
38 38 x => res = x,
39 39 e => {
40 40 err = e;
41 41 return -2;
42 42 }
43 43 );
44 44 p.Reject(new ApplicationException("error"));
45 45
46 46 Assert.AreEqual(res, -1);
47 47 Assert.AreEqual(err.Message, "error");
48 48
49 49 }
50 50
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 54 p.Cancel();
55 55
56 56 var p2 = p.Cancelled(() => {
57 57 throw new ApplicationException("CANCELLED");
58 58 });
59 59
60 60 try {
61 61 p2.Join();
62 62 Assert.Fail();
63 63 } catch (ApplicationException err) {
64 64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 65 }
66 66
67 67 }
68 68
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 72 p.Cancel();
73 73
74 74 var p2 = p
75 75 .Cancelled<bool>(() => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Error(e => true);
79 79
80 80 Assert.AreEqual(true, p2.Join());
81 81 }
82 82
83 83 [TestMethod]
84 84 public void JoinSuccessTest() {
85 85 var p = new Promise<int>();
86 86 p.Resolve(100);
87 87 Assert.AreEqual(p.Join(), 100);
88 88 }
89 89
90 90 [TestMethod]
91 91 public void JoinFailTest() {
92 92 var p = new Promise<int>();
93 93 p.Reject(new ApplicationException("failed"));
94 94
95 95 try {
96 96 p.Join();
97 97 throw new ApplicationException("WRONG!");
98 98 } catch (TargetInvocationException err) {
99 99 Assert.AreEqual(err.InnerException.Message, "failed");
100 100 } catch {
101 101 Assert.Fail("Got wrong excaption");
102 102 }
103 103 }
104 104
105 105 [TestMethod]
106 106 public void MapTest() {
107 107 var p = new Promise<int>();
108 108
109 109 var p2 = p.Then(x => x.ToString());
110 110 p.Resolve(100);
111 111
112 112 Assert.AreEqual(p2.Join(), "100");
113 113 }
114 114
115 115 [TestMethod]
116 116 public void FixErrorTest() {
117 117 var p = new Promise<int>();
118 118
119 119 var p2 = p.Error(e => 101);
120 120
121 121 p.Reject(new Exception());
122 122
123 123 Assert.AreEqual(p2.Join(), 101);
124 124 }
125 125
126 126 [TestMethod]
127 127 public void ChainTest() {
128 128 var p1 = new Promise<int>();
129 129
130 130 var p3 = p1.Chain(x => {
131 131 var p2 = new Promise<string>();
132 132 p2.Resolve(x.ToString());
133 133 return p2;
134 134 });
135 135
136 136 p1.Resolve(100);
137 137
138 138 Assert.AreEqual(p3.Join(), "100");
139 139 }
140 140
141 141 [TestMethod]
142 142 public void ChainFailTest() {
143 143 var p1 = new Promise<int>();
144 144
145 145 var p3 = p1.Chain(x => {
146 146 var p2 = new Promise<string>();
147 147 p2.Reject(new Exception("DIE!!!"));
148 148 return p2;
149 149 });
150 150
151 151 p1.Resolve(100);
152 152
153 153 Assert.IsTrue(p3.IsResolved);
154 154 }
155 155
156 156 [TestMethod]
157 157 public void PoolTest() {
158 158 var pid = Thread.CurrentThread.ManagedThreadId;
159 159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160 160
161 161 Assert.AreNotEqual(pid, p.Join());
162 162 }
163 163
164 164 [TestMethod]
165 165 public void WorkerPoolSizeTest() {
166 166 var pool = new WorkerPool(5, 10, 1);
167 167
168 168 Assert.AreEqual(5, pool.PoolSize);
169 169
170 170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173 173
174 174 Assert.AreEqual(5, pool.PoolSize);
175 175
176 176 for (int i = 0; i < 100; i++)
177 177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 178 Thread.Sleep(200);
179 179 Assert.AreEqual(10, pool.PoolSize);
180 180
181 181 pool.Dispose();
182 182 }
183 183
184 184 [TestMethod]
185 185 public void WorkerPoolCorrectTest() {
186 186 var pool = new WorkerPool(0,1000,100);
187 187
188 188 const int iterations = 1000;
189 189 int pending = iterations;
190 190 var stop = new ManualResetEvent(false);
191 191
192 192 var count = 0;
193 193 for (int i = 0; i < iterations; i++) {
194 194 pool
195 195 .Invoke(() => 1)
196 196 .Then(x => Interlocked.Add(ref count, x))
197 197 .Then(x => Math.Log10(x))
198 198 .On(() => {
199 199 Interlocked.Decrement(ref pending);
200 200 if (pending == 0)
201 201 stop.Set();
202 202 }, PromiseEventType.All);
203 203 }
204 204
205 205 stop.WaitOne();
206 206
207 207 Assert.AreEqual(iterations, count);
208 208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 209 pool.Dispose();
210 210
211 211 }
212 212
213 213 [TestMethod]
214 214 public void WorkerPoolDisposeTest() {
215 215 var pool = new WorkerPool(5, 20);
216 216 Assert.AreEqual(5, pool.PoolSize);
217 217 pool.Dispose();
218 218 Thread.Sleep(500);
219 219 Assert.AreEqual(0, pool.PoolSize);
220 220 pool.Dispose();
221 221 }
222 222
223 223 [TestMethod]
224 224 public void MTQueueTest() {
225 225 var queue = new MTQueue<int>();
226 226 int res;
227 227
228 228 queue.Enqueue(10);
229 229 Assert.IsTrue(queue.TryDequeue(out res));
230 230 Assert.AreEqual(10, res);
231 231 Assert.IsFalse(queue.TryDequeue(out res));
232 232
233 233 for (int i = 0; i < 1000; i++)
234 234 queue.Enqueue(i);
235 235
236 236 for (int i = 0; i < 1000; i++) {
237 237 queue.TryDequeue(out res);
238 238 Assert.AreEqual(i, res);
239 239 }
240 240
241 241 int writers = 0;
242 242 int readers = 0;
243 243 var stop = new ManualResetEvent(false);
244 244 int total = 0;
245 245
246 246 const int itemsPerWriter = 10000;
247 247 const int writersCount = 10;
248 248
249 249 for (int i = 0; i < writersCount; i++) {
250 250 Interlocked.Increment(ref writers);
251 251 AsyncPool
252 252 .InvokeNewThread(() => {
253 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 254 queue.Enqueue(1);
255 255 }
256 256 return 1;
257 257 })
258 258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 259 }
260 260
261 261 for (int i = 0; i < 10; i++) {
262 262 Interlocked.Increment(ref readers);
263 263 AsyncPool
264 264 .InvokeNewThread(() => {
265 265 int t;
266 266 do {
267 267 while (queue.TryDequeue(out t))
268 268 Interlocked.Add(ref total, t);
269 269 } while (writers > 0);
270 270 return 1;
271 271 })
272 272 .On(() => {
273 273 Interlocked.Decrement(ref readers);
274 274 if (readers == 0)
275 275 stop.Set();
276 276 }, PromiseEventType.All);
277 277 }
278 278
279 279 stop.WaitOne();
280 280
281 281 Assert.AreEqual(100000, total);
282 282 }
283 283
284 284 [TestMethod]
285 285 public void AsyncQueueTest() {
286 286 var queue = new AsyncQueue<int>();
287 287 int res;
288 288
289 289 queue.Enqueue(10);
290 290 Assert.IsTrue(queue.TryDequeue(out res));
291 291 Assert.AreEqual(10, res);
292 292 Assert.IsFalse(queue.TryDequeue(out res));
293 293
294 294 for (int i = 0; i < 1000; i++)
295 295 queue.Enqueue(i);
296 296
297 297 for (int i = 0; i < 1000; i++) {
298 298 queue.TryDequeue(out res);
299 299 Assert.AreEqual(i, res);
300 300 }
301 301
302 int writers = 0;
303 int readers = 0;
304 var stop = new ManualResetEvent(false);
305 int total = 0;
302 const int count = 10000000;
306 303
307 const int itemsPerWriter = 10000;
308 const int writersCount = 10;
304 int res1 = 0, res2 = 0;
305 var t1 = Environment.TickCount;
309 306
310 for (int i = 0; i < writersCount; i++) {
311 Interlocked.Increment(ref writers);
312 AsyncPool
313 .InvokeNewThread(() => {
314 for (int ii = 0; ii < itemsPerWriter; ii++) {
307 AsyncPool.RunThread(
308 () => {
309 for (var i = 0; i < count; i++)
315 310 queue.Enqueue(1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 },
313 () => {
314 for (var i = 0; i < count; i++)
315 queue.Enqueue(2);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 },
318 () => {
319 int temp;
320 int i = 0;
321 while (i < count)
322 if (queue.TryDequeue(out temp)) {
323 i++;
324 res1 += temp;
316 325 }
317 return 1;
318 })
319 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 },
328 () => {
329 int temp;
330 int i = 0;
331 while (i < count)
332 if (queue.TryDequeue(out temp)) {
333 i++;
334 res2 += temp;
335 }
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 }
338 )
339 .Combine()
340 .Join();
341
342 Assert.AreEqual(count * 3, res1 + res2);
343
344 Console.WriteLine(
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 Environment.TickCount - t1,
347 res1,
348 res2,
349 res1 + res2,
350 count
351 );
320 352 }
321 353
322 for (int i = 0; i < 10; i++) {
323 Interlocked.Increment(ref readers);
324 AsyncPool
325 .InvokeNewThread(() => {
326 int t;
327 do {
328 while (queue.TryDequeue(out t))
329 Interlocked.Add(ref total, t);
330 } while (writers > 0);
331 return 1;
332 })
333 .On(() => {
334 Interlocked.Decrement(ref readers);
335 if (readers == 0)
336 stop.Set();
337 }, PromiseEventType.All);
354 [TestMethod]
355 public void AsyncQueueBatchTest() {
356 var queue = new AsyncQueue<int>();
357
358 const int wBatch = 29;
359 const int wCount = 400000;
360 const int total = wBatch * wCount * 2;
361 const int summ = wBatch * wCount * 3;
362
363 int r1 = 0, r2 = 0;
364 const int rBatch = 111;
365 int read = 0;
366
367 var t1 = Environment.TickCount;
368
369 AsyncPool.RunThread(
370 () => {
371 var buffer = new int[wBatch];
372 for(int i = 0; i<wBatch; i++)
373 buffer[i] = 1;
374
375 for(int i =0; i < wCount; i++)
376 queue.EnqueueRange(buffer,0,wBatch);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 },
379 () => {
380 var buffer = new int[wBatch];
381 for(int i = 0; i<wBatch; i++)
382 buffer[i] = 2;
383
384 for(int i =0; i < wCount; i++)
385 queue.EnqueueRange(buffer,0,wBatch);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 },
388 () => {
389 var buffer = new int[rBatch];
390
391 while(read < total) {
392 int actual;
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 for(int i=0; i< actual; i++)
395 r1 += buffer[i];
396 Interlocked.Add(ref read, actual);
397 }
338 398 }
339 399
340 stop.WaitOne();
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 },
402 () => {
403 var buffer = new int[rBatch];
404
405 while(read < total) {
406 int actual;
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 for(int i=0; i< actual; i++)
409 r2 += buffer[i];
410 Interlocked.Add(ref read, actual);
411 }
412 }
341 413
342 Assert.AreEqual(itemsPerWriter * writersCount, total);
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 }
416 )
417 .Combine()
418 .Join();
419
420 Assert.AreEqual(summ , r1 + r2);
421
422 Console.WriteLine(
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 Environment.TickCount - t1,
425 r1,
426 r2,
427 r1 + r2,
428 total
429 );
343 430 }
344 431
345 432 [TestMethod]
346 433 public void ParallelMapTest() {
347 434
348 435 const int count = 100000;
349 436
350 437 var args = new double[count];
351 438 var rand = new Random();
352 439
353 440 for (int i = 0; i < count; i++)
354 441 args[i] = rand.NextDouble();
355 442
356 443 var t = Environment.TickCount;
357 444 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
358 445
359 446 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
360 447
361 448 t = Environment.TickCount;
362 449 for (int i = 0; i < count; i++)
363 450 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
364 451 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
365 452 }
366 453
367 454 [TestMethod]
368 455 public void ChainedMapTest() {
369 456
370 457 using (var pool = new WorkerPool(0,10,1)) {
371 458 const int count = 10000;
372 459
373 460 var args = new double[count];
374 461 var rand = new Random();
375 462
376 463 for (int i = 0; i < count; i++)
377 464 args[i] = rand.NextDouble();
378 465
379 466 var t = Environment.TickCount;
380 467 var res = args
381 468 .ChainedMap(
382 469 // Analysis disable once AccessToDisposedClosure
383 470 x => pool.Invoke(
384 471 () => Math.Sin(x * x)
385 472 ),
386 473 4
387 474 )
388 475 .Join();
389 476
390 477 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
391 478
392 479 t = Environment.TickCount;
393 480 for (int i = 0; i < count; i++)
394 481 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
395 482 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
396 483 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
397 484 }
398 485 }
399 486
400 487 [TestMethod]
401 488 public void ParallelForEachTest() {
402 489
403 490 const int count = 100000;
404 491
405 492 var args = new int[count];
406 493 var rand = new Random();
407 494
408 495 for (int i = 0; i < count; i++)
409 496 args[i] = (int)(rand.NextDouble() * 100);
410 497
411 498 int result = 0;
412 499
413 500 var t = Environment.TickCount;
414 501 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
415 502
416 503 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
417 504
418 505 int result2 = 0;
419 506
420 507 t = Environment.TickCount;
421 508 for (int i = 0; i < count; i++)
422 509 result2 += args[i];
423 510 Assert.AreEqual(result2, result);
424 511 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
425 512 }
426 513
427 514 [TestMethod]
428 515 public void ComplexCase1Test() {
429 516 var flags = new bool[3];
430 517
431 518 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
432 519
433 520 var step1 = PromiseHelper
434 521 .Sleep(200, "Alan")
435 522 .On(() => flags[0] = true, PromiseEventType.Cancelled);
436 523 var p = step1
437 524 .Chain(x =>
438 525 PromiseHelper
439 526 .Sleep(200, "Hi, " + x)
440 527 .Then(y => y)
441 528 .On(() => flags[1] = true, PromiseEventType.Cancelled)
442 529 )
443 530 .On(() => flags[2] = true, PromiseEventType.Cancelled);
444 531 step1.Join();
445 532 p.Cancel();
446 533 try {
447 534 Assert.AreEqual(p.Join(), "Hi, Alan");
448 535 Assert.Fail("Shouldn't get here");
449 536 } catch (OperationCanceledException) {
450 537 }
451 538
452 539 Assert.IsFalse(flags[0]);
453 540 Assert.IsTrue(flags[1]);
454 541 Assert.IsTrue(flags[2]);
455 542 }
456 543
457 544 [TestMethod]
458 545 public void ChainedCancel1Test() {
459 546 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
460 547 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
461 548 var p = PromiseHelper
462 549 .Sleep(1, "Hi, HAL!")
463 550 .Then(x => {
464 551 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
465 552 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
466 553 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
467 554 PromiseHelper
468 555 .Sleep(100, "HAL, STOP!")
469 556 .Then(result.Cancel);
470 557 return result;
471 558 });
472 559 try {
473 560 p.Join();
474 561 } catch (TargetInvocationException err) {
475 562 Assert.IsTrue(err.InnerException is OperationCanceledException);
476 563 }
477 564 }
478 565
479 566 [TestMethod]
480 567 public void ChainedCancel2Test() {
481 568 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
482 569 var pSurvive = new Promise<bool>();
483 570 var hemStarted = new ManualResetEvent(false);
484 571 var p = PromiseHelper
485 572 .Sleep(1, "Hi, HAL!")
486 573 .Chain(x => {
487 574 hemStarted.Set();
488 575 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
489 576 var result = PromiseHelper
490 577 .Sleep(100000000, "HEM ENABLED!!!")
491 578 .Then(s => {
492 579 pSurvive.Resolve(false);
493 580 return s;
494 581 });
495 582
496 583 result
497 584 .Cancelled(() => pSurvive.Resolve(true));
498 585
499 586 return result;
500 587 });
501 588
502 589 hemStarted.WaitOne();
503 590 p.Cancel();
504 591
505 592 try {
506 593 p.Join();
507 594 } catch (OperationCanceledException) {
508 595 Assert.IsTrue(pSurvive.Join());
509 596 }
510 597 }
511 598 }
512 599 }
513 600
@@ -1,86 +1,86
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Threading;
4 4 using System.Linq;
5 5
6 6 namespace Implab.Parallels {
7 7 /// <summary>
8 8 /// Класс для распаралСливания Π·Π°Π΄Π°Ρ‡.
9 9 /// </summary>
10 10 /// <remarks>
11 11 /// Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Π΄Π°Π½Π½Ρ‹ΠΉ класс ΠΈ лямда выраТСния ΠΌΠΎΠΆΠ½ΠΎ Ρ€Π°ΡΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΠΈΡ‚ΡŒ
12 12 /// вычислСния, для этого ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ концСпция ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.
13 13 /// </remarks>
14 14 public static class AsyncPool {
15 15
16 16 public static IPromise<T> Invoke<T>(Func<T> func) {
17 17 var p = new Promise<T>();
18 18 var caller = TraceContext.Instance.CurrentOperation;
19 19
20 20 ThreadPool.QueueUserWorkItem(param => {
21 21 TraceContext.Instance.EnterLogicalOperation(caller,false);
22 22 try {
23 23 p.Resolve(func());
24 24 } catch(Exception e) {
25 25 p.Reject(e);
26 26 } finally {
27 27 TraceContext.Instance.Leave();
28 28 }
29 29 });
30 30
31 31 return p;
32 32 }
33 33
34 34 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
35 35 var p = new Promise<T>();
36 36
37 37 var caller = TraceContext.Instance.CurrentOperation;
38 38
39 39 var worker = new Thread(() => {
40 40 TraceContext.Instance.EnterLogicalOperation(caller,false);
41 41 try {
42 42 p.Resolve(func());
43 43 } catch (Exception e) {
44 44 p.Reject(e);
45 45 } finally {
46 46 TraceContext.Instance.Leave();
47 47 }
48 48 });
49 49 worker.IsBackground = true;
50 50 worker.Start();
51 51
52 52 return p;
53 53 }
54 54
55 55
56 56 public static IPromise InvokeNewThread(Action func) {
57 57 var p = new Promise();
58 58
59 59 var caller = TraceContext.Instance.CurrentOperation;
60 60
61 61 var worker = new Thread(() => {
62 62 TraceContext.Instance.EnterLogicalOperation(caller,false);
63 63 try {
64 64 func();
65 65 p.Resolve();
66 66 } catch (Exception e) {
67 67 p.Reject(e);
68 68 } finally {
69 69 TraceContext.Instance.Leave();
70 70 }
71 71 });
72 72 worker.IsBackground = true;
73 73 worker.Start();
74 74
75 75 return p;
76 76 }
77 77
78 public static IPromise[] ThreadRun(params Action[] func) {
78 public static IPromise[] RunThread(params Action[] func) {
79 79 return func.Select(f => InvokeNewThread(f)).ToArray();
80 80 }
81 81
82 public static IPromise<T>[] ThreadRun<T>(params Func<T>[] func) {
82 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
83 83 return func.Select(f => InvokeNewThread(f)).ToArray();
84 84 }
85 85 }
86 86 }
@@ -1,271 +1,474
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 5
6 6 namespace Implab.Parallels {
7 7 public class AsyncQueue<T> : IEnumerable<T> {
8 8 class Chunk {
9 9 public Chunk next;
10 10
11 11 int m_low;
12 12 int m_hi;
13 13 int m_alloc;
14 14 readonly int m_size;
15 15 readonly T[] m_data;
16 16
17 17 public Chunk(int size) {
18 18 m_size = size;
19 19 m_data = new T[size];
20 20 }
21 21
22 22 public Chunk(int size, T value) {
23 23 m_size = size;
24 24 m_hi = 1;
25 25 m_alloc = 1;
26 26 m_data = new T[size];
27 27 m_data[0] = value;
28 28 }
29 29
30 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 m_size = size;
32 m_hi = length;
33 m_alloc = alloc;
34 m_data = new T[size];
35 Array.Copy(data, offset, m_data, 0, length);
36 }
37
30 38 public int Low {
31 39 get { return m_low; }
32 40 }
33 41
34 42 public int Hi {
35 43 get { return m_hi; }
36 44 }
37 45
38 46 public bool TryEnqueue(T value,out bool extend) {
39 47 var alloc = Interlocked.Increment(ref m_alloc) - 1;
40 48
41 49 if (alloc >= m_size) {
42 50 extend = alloc == m_size;
43 51 return false;
44 52 }
45 53
46 54 extend = false;
47 55 m_data[alloc] = value;
48 56
49 57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
50 58 // spin wait for commit
51 59 }
52 60 return true;
53 61 }
54 62
55 63 public bool TryDequeue(out T value,out bool recycle) {
56 64 int low;
57 65 do {
58 66 low = m_low;
59 67 if (low >= m_hi) {
60 68 value = default(T);
61 69 recycle = (low == m_size);
62 70 return false;
63 71 }
64 72 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
65 73
66 74 recycle = (low == m_size - 1);
67 75 value = m_data[low];
68 76
69 77 return true;
70 78 }
71 79
72 80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
73 81 int alloc;
74 82 int allocSize;
75 83
84 // in case the batch size is larger than a free space in chunk
85 // tailGap is used to over allocate the space in the chunk to
86 // get exclusive permission on creation of the next one.
87 int tailGap = 0;
88
76 89 do {
77 90 alloc = m_alloc;
78 91
79 92 if (alloc > m_size) {
80 enqueued = 0;
81 extend = false;
82 return false;
93 // the chunk is full and someone already
94 // creating the new one
95 enqueued = 0; // nothing was added
96 extend = false; // the caller shouldn't try to extend the queue
97 return false; // nothing was added
83 98 }
84 99
85 allocSize = Math.Min(m_size - m_alloc, length);
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
100 allocSize = Math.Min(m_size - alloc, length);
101 if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
102 tailGap = 1; // overallocate space to get exclusive permission to extend queue
103 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
87 104
88 if (alloc == m_size) {
89 enqueued = 0;
90 extend = true;
105 extend = tailGap != 0;
106 enqueued = allocSize;
107
108 // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
109 if (alloc == m_size)
91 110 return false;
92 }
93 111
94 112 Array.Copy(batch, offset, m_data, alloc, allocSize);
95 enqueued = allocSize;
96 extend = false;
97 113
98 114 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
99 115 // spin wait for commit
100 116 }
101 117 return true;
102 118 }
103 119
120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
121 int low, hi, batchSize;
122
123 do {
124 low = m_low;
125 hi = m_hi;
126 if (low >= hi) {
127 dequeued = 0;
128 recycle = (low == m_size); // recycling could be restarted and we need to signal again
129 return false;
130 }
131 batchSize = Math.Min(hi - low, length);
132 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
133
134 recycle = (low == m_size - batchSize);
135 dequeued = batchSize;
136
137 Array.Copy(m_data, low, buffer, offset, batchSize);
138
139 return true;
140 }
141
104 142 public T GetAt(int pos) {
105 143 return m_data[pos];
106 144 }
107 145 }
108 146
109 147 public const int DEFAULT_CHUNK_SIZE = 32;
148 public const int MAX_CHUNK_SIZE = 262144;
110 149
111 150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
112 151
113 152 Chunk m_first;
114 153 Chunk m_last;
115 154
116 155 public AsyncQueue() {
117 156 m_last = m_first = new Chunk(m_chunkSize);
118 157 }
119 158
159 /// <summary>
160 /// Adds the specified value to the queue.
161 /// </summary>
162 /// <param name="value">Tha value which will be added to the queue.</param>
120 163 public void Enqueue(T value) {
121 164 var last = m_last;
122 165 // spin wait to the new chunk
123 166 bool extend = true;
124 167 while(last == null || !last.TryEnqueue(value, out extend)) {
125 168 // try to extend queue
126 169 if (extend || last == null) {
127 170 var chunk = new Chunk(m_chunkSize, value);
128 171 if (EnqueueChunk(last, chunk))
129 172 break;
130 173 last = m_last;
131 174 } else {
132 while (last != m_last) {
175 while (last == m_last) {
133 176 Thread.MemoryBarrier();
177 }
134 178 last = m_last;
135 179 }
136 180 }
137 181 }
182
183 /// <summary>
184 /// Adds the specified data to the queue.
185 /// </summary>
186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
187 /// <param name="offset">The offset of the data in the buffer.</param>
188 /// <param name="length">The size of the data to read from the buffer.</param>
189 public void EnqueueRange(T[] data, int offset, int length) {
190 if (data == null)
191 throw new ArgumentNullException("data");
192 if (offset < 0)
193 throw new ArgumentOutOfRangeException("offset");
194 if (length < 1 || offset + length > data.Length)
195 throw new ArgumentOutOfRangeException("length");
196
197 var last = m_last;
198
199 bool extend;
200 int enqueued;
201
202 while (length > 0) {
203 extend = true;
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
205 length -= enqueued;
206 offset += enqueued;
138 207 }
139 208
209 if (extend) {
210 // there was no enough space in the chunk
211 // or there was no chunks in the queue
212
213 while (length > 0) {
214
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
216
217 var chunk = new Chunk(
218 Math.Max(size, m_chunkSize),
219 data,
220 offset,
221 size,
222 length // length >= size
223 );
224
225 if (!EnqueueChunk(last, chunk)) {
226 // looks like the queue has been updated then proceed from the beginning
227 last = m_last;
228 break;
229 }
230
231 // we have successfully added the new chunk
232 last = chunk;
233 length -= size;
234 offset += size;
235 }
236 } else {
237 // we don't need to extend the queue, if we successfully enqueued data
238 if (length == 0)
239 break;
240
241 // if we need to wait while someone is extending the queue
242 // spinwait
243 while (last == m_last) {
244 Thread.MemoryBarrier();
245 }
246
247 last = m_last;
248 }
249 }
250 }
251
252 /// <summary>
253 /// Tries to retrieve the first element from the queue.
254 /// </summary>
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
256 /// <param name="value">The value of the dequeued element.</param>
140 257 public bool TryDequeue(out T value) {
141 258 var chunk = m_first;
142 259 bool recycle;
143 260 while (chunk != null) {
144 261
145 262 var result = chunk.TryDequeue(out value, out recycle);
146 263
147 264 if (recycle) // this chunk is waste
148 265 RecycleFirstChunk(chunk);
149 266 else
150 267 return result; // this chunk is usable and returned actual result
151 268
152 269 if (result) // this chunk is waste but the true result is always actual
153 270 return true;
154 271
155 272 // try again
156 273 chunk = m_first;
157 274 }
158 275
159 276 // the queue is empty
160 277 value = default(T);
161 278 return false;
162 279 }
163 280
281 /// <summary>
282 /// Tries to dequeue the specified amount of data from the queue.
283 /// </summary>
284 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
285 /// <param name="buffer">The buffer to which the data will be written.</param>
286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
287 /// <param name="length">The maximum amount of data to be retrieved.</param>
288 /// <param name="dequeued">The actual amout of the retrieved data.</param>
289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
290 if (buffer == null)
291 throw new ArgumentNullException("buffer");
292 if (offset < 0)
293 throw new ArgumentOutOfRangeException("offset");
294 if (length < 1 || offset + length > buffer.Length)
295 throw new ArgumentOutOfRangeException("length");
296
297 var chunk = m_first;
298 bool recycle;
299 dequeued = 0;
300 while (chunk != null) {
301
302 int actual;
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 offset += actual;
305 length -= actual;
306 dequeued += actual;
307 }
308
309 if (recycle) // this chunk is waste
310 RecycleFirstChunk(chunk);
311 else if (actual == 0)
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
313
314 if (length == 0)
315 return true;
316
317 // we still may dequeue something
318 // try again
319 chunk = m_first;
320 }
321
322 return dequeued != 0;
323 }
324
325 /// <summary>
326 /// Tries to dequeue all remaining data in the first chunk.
327 /// </summary>
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 /// <param name="buffer">The buffer to which data will be written.</param>
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
334 if (buffer == null)
335 throw new ArgumentNullException("buffer");
336 if (offset < 0)
337 throw new ArgumentOutOfRangeException("offset");
338 if (length < 1 || offset + length > buffer.Length)
339 throw new ArgumentOutOfRangeException("length");
340
341 var chunk = m_first;
342 bool recycle;
343 dequeued = 0;
344
345 while (chunk != null) {
346
347 int actual;
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
349 dequeued = actual;
350 }
351
352 if (recycle) // this chunk is waste
353 RecycleFirstChunk(chunk);
354
355 // if we have dequeued any data, then return
356 if (dequeued != 0)
357 return true;
358
359 // we still may dequeue something
360 // try again
361 chunk = m_first;
362 }
363
364 return false;
365 }
366
164 367 bool EnqueueChunk(Chunk last, Chunk chunk) {
165 368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
166 369 return false;
167 370
168 371 if (last != null)
169 372 last.next = chunk;
170 373 else
171 374 m_first = chunk;
172 375 return true;
173 376 }
174 377
175 378 void RecycleFirstChunk(Chunk first) {
176 379 var next = first.next;
177 380
178 381 if (next == null) {
179 382 // looks like this is the last chunk
180 383 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
181 384 // race
182 385 // maybe someone already recycled this chunk
183 386 // or a new chunk has been appedned to the queue
184 387
185 388 return; // give up
186 389 }
187 390 // the tail is updated
188 391 }
189 392
190 393 // we need to update the head
191 394 Interlocked.CompareExchange(ref m_first, next, first);
192 395 // if the head is already updated then give up
193 396 return;
194 397
195 398 }
196 399
197 400 #region IEnumerable implementation
198 401
199 402 class Enumerator : IEnumerator<T> {
200 403 Chunk m_current;
201 404 int m_pos = -1;
202 405
203 406 public Enumerator(Chunk fisrt) {
204 407 m_current = fisrt;
205 408 }
206 409
207 410 #region IEnumerator implementation
208 411
209 412 public bool MoveNext() {
210 413 if (m_current == null)
211 414 return false;
212 415
213 416 if (m_pos == -1)
214 417 m_pos = m_current.Low;
215 418 else
216 419 m_pos++;
217 420 if (m_pos == m_current.Hi) {
218 421 m_pos = 0;
219 422 m_current = m_current.next;
220 423 }
221 424
222 425 return true;
223 426 }
224 427
225 428 public void Reset() {
226 429 throw new NotSupportedException();
227 430 }
228 431
229 432 object IEnumerator.Current {
230 433 get {
231 434 return Current;
232 435 }
233 436 }
234 437
235 438 #endregion
236 439
237 440 #region IDisposable implementation
238 441
239 442 public void Dispose() {
240 443 }
241 444
242 445 #endregion
243 446
244 447 #region IEnumerator implementation
245 448
246 449 public T Current {
247 450 get {
248 451 if (m_pos == -1 || m_current == null)
249 452 throw new InvalidOperationException();
250 453 return m_current.GetAt(m_pos);
251 454 }
252 455 }
253 456
254 457 #endregion
255 458 }
256 459
257 460 public IEnumerator<T> GetEnumerator() {
258 461 return new Enumerator(m_first);
259 462 }
260 463
261 464 #endregion
262 465
263 466 #region IEnumerable implementation
264 467
265 468 IEnumerator IEnumerable.GetEnumerator() {
266 469 return GetEnumerator();
267 470 }
268 471
269 472 #endregion
270 473 }
271 474 }
@@ -1,86 +1,86
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Text.RegularExpressions;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab
9 9 {
10 10 public static class Safe
11 11 {
12 public static void ArgumentMatch(string param, string name, Regex rx) {
12 public static void ArgumentMatch(string value, string paramName, Regex rx) {
13 13 if (rx == null)
14 14 throw new ArgumentNullException("rx");
15 if (!rx.IsMatch(param))
16 throw new ArgumentException(String.Format("The prameter value must match {0}", rx), name);
15 if (!rx.IsMatch(value))
16 throw new ArgumentException(String.Format("The prameter value must match {0}", rx), paramName);
17 17 }
18 18
19 public static void ArgumentNotEmpty(string param, string name) {
20 if (String.IsNullOrEmpty(param))
21 throw new ArgumentException("The parameter can't be empty", name);
19 public static void ArgumentNotEmpty(string value, string paramName) {
20 if (String.IsNullOrEmpty(value))
21 throw new ArgumentException("The parameter can't be empty", paramName);
22 22 }
23 23
24 public static void ArgumentNotEmpty<T>(T[] param, string name) {
25 if (param == null || param.Length == 0)
26 throw new ArgumentException("The array must be not emty", name);
24 public static void ArgumentNotEmpty<T>(T[] value, string paramName) {
25 if (value == null || value.Length == 0)
26 throw new ArgumentException("The array must be not emty", paramName);
27 27 }
28 28
29 public static void ArgumentNotNull(object param, string name) {
30 if (param == null)
31 throw new ArgumentNullException(name);
29 public static void ArgumentNotNull(object value, string paramName) {
30 if (value == null)
31 throw new ArgumentNullException(paramName);
32 32 }
33 33
34 public static void ArgumentInRange(int arg, int min, int max, string name) {
35 if (arg < min || arg > max)
36 throw new ArgumentOutOfRangeException(name);
34 public static void ArgumentInRange(int value, int min, int max, string paramName) {
35 if (value < min || value > max)
36 throw new ArgumentOutOfRangeException(paramName);
37 37 }
38 38
39 39 public static void Dispose<T>(T obj) where T : class
40 40 {
41 41 var disp = obj as IDisposable;
42 42 if (disp != null)
43 43 disp.Dispose();
44 44 }
45 45
46 46 [DebuggerStepThrough]
47 47 public static IPromise<T> InvokePromise<T>(Func<T> action) {
48 48 ArgumentNotNull(action, "action");
49 49
50 50 var p = new Promise<T>();
51 51 try {
52 52 p.Resolve(action());
53 53 } catch (Exception err) {
54 54 p.Reject(err);
55 55 }
56 56
57 57 return p;
58 58 }
59 59
60 60 [DebuggerStepThrough]
61 61 public static IPromise InvokePromise(Action action) {
62 62 ArgumentNotNull(action, "action");
63 63
64 64 var p = new Promise();
65 65 try {
66 66 action();
67 67 p.Resolve();
68 68 } catch (Exception err) {
69 69 p.Reject(err);
70 70 }
71 71
72 72 return p;
73 73 }
74 74
75 75 [DebuggerStepThrough]
76 76 public static IPromise<T> InvokePromise<T>(Func<IPromise<T>> action) {
77 77 ArgumentNotNull(action, "action");
78 78
79 79 try {
80 80 return action() ?? Promise<T>.ExceptionToPromise(new Exception("The action returned null"));
81 81 } catch (Exception err) {
82 82 return Promise<T>.ExceptionToPromise(err);
83 83 }
84 84 }
85 85 }
86 86 }
@@ -1,110 +1,120
1 1 using System;
2 2 using Implab.Diagnostics;
3 3 using Implab.Parallels;
4 4 using Implab;
5 5 using System.Collections.Generic;
6 6 using System.Collections.Concurrent;
7 7
8 8 namespace MonoPlay {
9 9 class MainClass {
10 10 public static void Main(string[] args) {
11 11 if (args == null)
12 12 throw new ArgumentNullException("args");
13 13
14 14 var q1 = new AsyncQueue<int>();
15 15 var q2 = new Queue<int>();
16 16
17 17 const int count = 10000000;
18 18
19
19 int res1 = 0, res2 = 0;
20 20 var t1 = Environment.TickCount;
21 21
22 AsyncPool.ThreadRun(
22 AsyncPool.RunThread(
23 23 () => {
24 24 for (var i = 0; i < count; i++)
25 q1.Enqueue(i);
25 q1.Enqueue(1);
26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
26 27 },
27 28 () => {
28 29 for (var i = 0; i < count; i++)
29 q1.Enqueue(i);
30 q1.Enqueue(2);
31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
30 32 },
31 33 () => {
32 34 int temp = 0;
33 35 int i = 0;
34 36 while (i < count)
35 if (q1.TryDequeue(out temp))
37 if (q1.TryDequeue(out temp)) {
36 38 i++;
39 res1 += temp;
40 }
41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
37 42 },
38 43 () => {
39 44 int temp = 0;
40 45 int i = 0;
41 46 while (i < count)
42 if (q1.TryDequeue(out temp))
47 if (q1.TryDequeue(out temp)) {
43 48 i++;
49 res2 += temp;
50 }
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
44 52 }
45 53 )
46 54 .Combine()
47 55 .Join();
48 56
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2);
58
49 59 var t2 = Environment.TickCount;
50 60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
51 61
52 62 t1 = Environment.TickCount;
53 63
54 64 for (var i = 0; i < count * 2; i++)
55 65 q2.Enqueue(i);
56 66
57 67 for (var i = 0; i < count * 2; i++)
58 68 q2.Dequeue();
59 69
60 70 t2 = Environment.TickCount;
61 71 Console.WriteLine("Queue: {0} ms", t2 - t1);
62 72
63 73 q2 = new Queue<int>();
64 74
65 75 t1 = Environment.TickCount;
66 76
67 77
68 AsyncPool.ThreadRun(
78 AsyncPool.RunThread(
69 79 () => {
70 80 for (var i = 0; i < count; i++)
71 81 lock (q2)
72 82 q2.Enqueue(i);
73 83 },
74 84 () => {
75 85 for (var i = 0; i < count; i++)
76 86 lock (q2)
77 87 q2.Enqueue(i);
78 88 },
79 89 () => {
80 90 for (int i = 0; i < count ;)
81 91 lock (q2) {
82 92 if (q2.Count == 0)
83 93 continue;
84 94 q2.Dequeue();
85 95 i++;
86 96 }
87 97
88 98 },
89 99 () => {
90 100 for (int i = 0; i < count ;)
91 101 lock (q2) {
92 102 if (q2.Count == 0)
93 103 continue;
94 104 q2.Dequeue();
95 105 i++;
96 106 }
97 107
98 108 }
99 109 )
100 110 .Combine()
101 111 .Join();
102 112
103 113
104 114
105 115 t2 = Environment.TickCount;
106 116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
107 117
108 118 }
109 119 }
110 120 }
General Comments 0
You need to be logged in to leave comments. Login now