##// END OF EJS Templates
minor fixes and improvements of AsyncQueue, additional tests
cin -
r122:0c8685c8b56b v2
parent child
Show More
@@ -1,600 +1,676
1 1 using System;
2 2 using System.Reflection;
3 3 using System.Threading;
4 4 using Implab.Parallels;
5 5
6 6 #if MONO
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 10 using TestMethod = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
14 14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15 15
16 16 #endif
17 17
18 18 namespace Implab.Test {
19 19 [TestClass]
20 20 public class AsyncTests {
21 21 [TestMethod]
22 22 public void ResolveTest() {
23 23 int res = -1;
24 24 var p = new Promise<int>();
25 25 p.Then(x => res = x);
26 26 p.Resolve(100);
27 27
28 28 Assert.AreEqual(100, res);
29 29 }
30 30
31 31 [TestMethod]
32 32 public void RejectTest() {
33 33 int res = -1;
34 34 Exception err = null;
35 35
36 36 var p = new Promise<int>();
37 37 p.Then(
38 38 x => res = x,
39 39 e => {
40 40 err = e;
41 41 return -2;
42 42 }
43 43 );
44 44 p.Reject(new ApplicationException("error"));
45 45
46 46 Assert.AreEqual(res, -1);
47 47 Assert.AreEqual(err.Message, "error");
48 48
49 49 }
50 50
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 54 p.Cancel();
55 55
56 56 var p2 = p.Cancelled(() => {
57 57 throw new ApplicationException("CANCELLED");
58 58 });
59 59
60 60 try {
61 61 p2.Join();
62 62 Assert.Fail();
63 63 } catch (ApplicationException err) {
64 64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 65 }
66 66
67 67 }
68 68
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 72 p.Cancel();
73 73
74 74 var p2 = p
75 75 .Cancelled<bool>(() => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Error(e => true);
79 79
80 80 Assert.AreEqual(true, p2.Join());
81 81 }
82 82
83 83 [TestMethod]
84 84 public void JoinSuccessTest() {
85 85 var p = new Promise<int>();
86 86 p.Resolve(100);
87 87 Assert.AreEqual(p.Join(), 100);
88 88 }
89 89
90 90 [TestMethod]
91 91 public void JoinFailTest() {
92 92 var p = new Promise<int>();
93 93 p.Reject(new ApplicationException("failed"));
94 94
95 95 try {
96 96 p.Join();
97 97 throw new ApplicationException("WRONG!");
98 98 } catch (TargetInvocationException err) {
99 99 Assert.AreEqual(err.InnerException.Message, "failed");
100 100 } catch {
101 101 Assert.Fail("Got wrong excaption");
102 102 }
103 103 }
104 104
105 105 [TestMethod]
106 106 public void MapTest() {
107 107 var p = new Promise<int>();
108 108
109 109 var p2 = p.Then(x => x.ToString());
110 110 p.Resolve(100);
111 111
112 112 Assert.AreEqual(p2.Join(), "100");
113 113 }
114 114
115 115 [TestMethod]
116 116 public void FixErrorTest() {
117 117 var p = new Promise<int>();
118 118
119 119 var p2 = p.Error(e => 101);
120 120
121 121 p.Reject(new Exception());
122 122
123 123 Assert.AreEqual(p2.Join(), 101);
124 124 }
125 125
126 126 [TestMethod]
127 127 public void ChainTest() {
128 128 var p1 = new Promise<int>();
129 129
130 130 var p3 = p1.Chain(x => {
131 131 var p2 = new Promise<string>();
132 132 p2.Resolve(x.ToString());
133 133 return p2;
134 134 });
135 135
136 136 p1.Resolve(100);
137 137
138 138 Assert.AreEqual(p3.Join(), "100");
139 139 }
140 140
141 141 [TestMethod]
142 142 public void ChainFailTest() {
143 143 var p1 = new Promise<int>();
144 144
145 145 var p3 = p1.Chain(x => {
146 146 var p2 = new Promise<string>();
147 147 p2.Reject(new Exception("DIE!!!"));
148 148 return p2;
149 149 });
150 150
151 151 p1.Resolve(100);
152 152
153 153 Assert.IsTrue(p3.IsResolved);
154 154 }
155 155
156 156 [TestMethod]
157 157 public void PoolTest() {
158 158 var pid = Thread.CurrentThread.ManagedThreadId;
159 159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160 160
161 161 Assert.AreNotEqual(pid, p.Join());
162 162 }
163 163
164 164 [TestMethod]
165 165 public void WorkerPoolSizeTest() {
166 166 var pool = new WorkerPool(5, 10, 1);
167 167
168 168 Assert.AreEqual(5, pool.PoolSize);
169 169
170 170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173 173
174 174 Assert.AreEqual(5, pool.PoolSize);
175 175
176 176 for (int i = 0; i < 100; i++)
177 177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 178 Thread.Sleep(200);
179 179 Assert.AreEqual(10, pool.PoolSize);
180 180
181 181 pool.Dispose();
182 182 }
183 183
184 184 [TestMethod]
185 185 public void WorkerPoolCorrectTest() {
186 186 var pool = new WorkerPool(0,1000,100);
187 187
188 188 const int iterations = 1000;
189 189 int pending = iterations;
190 190 var stop = new ManualResetEvent(false);
191 191
192 192 var count = 0;
193 193 for (int i = 0; i < iterations; i++) {
194 194 pool
195 195 .Invoke(() => 1)
196 196 .Then(x => Interlocked.Add(ref count, x))
197 197 .Then(x => Math.Log10(x))
198 198 .On(() => {
199 199 Interlocked.Decrement(ref pending);
200 200 if (pending == 0)
201 201 stop.Set();
202 202 }, PromiseEventType.All);
203 203 }
204 204
205 205 stop.WaitOne();
206 206
207 207 Assert.AreEqual(iterations, count);
208 208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 209 pool.Dispose();
210 210
211 211 }
212 212
213 213 [TestMethod]
214 214 public void WorkerPoolDisposeTest() {
215 215 var pool = new WorkerPool(5, 20);
216 216 Assert.AreEqual(5, pool.PoolSize);
217 217 pool.Dispose();
218 218 Thread.Sleep(500);
219 219 Assert.AreEqual(0, pool.PoolSize);
220 220 pool.Dispose();
221 221 }
222 222
223 223 [TestMethod]
224 224 public void MTQueueTest() {
225 225 var queue = new MTQueue<int>();
226 226 int res;
227 227
228 228 queue.Enqueue(10);
229 229 Assert.IsTrue(queue.TryDequeue(out res));
230 230 Assert.AreEqual(10, res);
231 231 Assert.IsFalse(queue.TryDequeue(out res));
232 232
233 233 for (int i = 0; i < 1000; i++)
234 234 queue.Enqueue(i);
235 235
236 236 for (int i = 0; i < 1000; i++) {
237 237 queue.TryDequeue(out res);
238 238 Assert.AreEqual(i, res);
239 239 }
240 240
241 241 int writers = 0;
242 242 int readers = 0;
243 243 var stop = new ManualResetEvent(false);
244 244 int total = 0;
245 245
246 246 const int itemsPerWriter = 10000;
247 247 const int writersCount = 10;
248 248
249 249 for (int i = 0; i < writersCount; i++) {
250 250 Interlocked.Increment(ref writers);
251 251 AsyncPool
252 252 .InvokeNewThread(() => {
253 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 254 queue.Enqueue(1);
255 255 }
256 256 return 1;
257 257 })
258 258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 259 }
260 260
261 261 for (int i = 0; i < 10; i++) {
262 262 Interlocked.Increment(ref readers);
263 263 AsyncPool
264 264 .InvokeNewThread(() => {
265 265 int t;
266 266 do {
267 267 while (queue.TryDequeue(out t))
268 268 Interlocked.Add(ref total, t);
269 269 } while (writers > 0);
270 270 return 1;
271 271 })
272 272 .On(() => {
273 273 Interlocked.Decrement(ref readers);
274 274 if (readers == 0)
275 275 stop.Set();
276 276 }, PromiseEventType.All);
277 277 }
278 278
279 279 stop.WaitOne();
280 280
281 281 Assert.AreEqual(100000, total);
282 282 }
283 283
284 284 [TestMethod]
285 285 public void AsyncQueueTest() {
286 286 var queue = new AsyncQueue<int>();
287 287 int res;
288 288
289 289 queue.Enqueue(10);
290 290 Assert.IsTrue(queue.TryDequeue(out res));
291 291 Assert.AreEqual(10, res);
292 292 Assert.IsFalse(queue.TryDequeue(out res));
293 293
294 294 for (int i = 0; i < 1000; i++)
295 295 queue.Enqueue(i);
296 296
297 297 for (int i = 0; i < 1000; i++) {
298 298 queue.TryDequeue(out res);
299 299 Assert.AreEqual(i, res);
300 300 }
301 301
302 302 const int count = 10000000;
303 303
304 304 int res1 = 0, res2 = 0;
305 305 var t1 = Environment.TickCount;
306 306
307 307 AsyncPool.RunThread(
308 308 () => {
309 309 for (var i = 0; i < count; i++)
310 310 queue.Enqueue(1);
311 311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 312 },
313 313 () => {
314 314 for (var i = 0; i < count; i++)
315 315 queue.Enqueue(2);
316 316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 317 },
318 318 () => {
319 319 int temp;
320 320 int i = 0;
321 321 while (i < count)
322 322 if (queue.TryDequeue(out temp)) {
323 323 i++;
324 324 res1 += temp;
325 325 }
326 326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 327 },
328 328 () => {
329 329 int temp;
330 330 int i = 0;
331 331 while (i < count)
332 332 if (queue.TryDequeue(out temp)) {
333 333 i++;
334 334 res2 += temp;
335 335 }
336 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 337 }
338 338 )
339 339 .Combine()
340 340 .Join();
341 341
342 342 Assert.AreEqual(count * 3, res1 + res2);
343 343
344 344 Console.WriteLine(
345 345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 346 Environment.TickCount - t1,
347 347 res1,
348 348 res2,
349 349 res1 + res2,
350 350 count
351 351 );
352 352 }
353 353
354 354 [TestMethod]
355 355 public void AsyncQueueBatchTest() {
356 356 var queue = new AsyncQueue<int>();
357 357
358 358 const int wBatch = 29;
359 359 const int wCount = 400000;
360 360 const int total = wBatch * wCount * 2;
361 361 const int summ = wBatch * wCount * 3;
362 362
363 363 int r1 = 0, r2 = 0;
364 364 const int rBatch = 111;
365 365 int read = 0;
366 366
367 367 var t1 = Environment.TickCount;
368 368
369 369 AsyncPool.RunThread(
370 370 () => {
371 371 var buffer = new int[wBatch];
372 372 for(int i = 0; i<wBatch; i++)
373 373 buffer[i] = 1;
374 374
375 375 for(int i =0; i < wCount; i++)
376 376 queue.EnqueueRange(buffer,0,wBatch);
377 377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 378 },
379 379 () => {
380 380 var buffer = new int[wBatch];
381 381 for(int i = 0; i<wBatch; i++)
382 382 buffer[i] = 2;
383 383
384 384 for(int i =0; i < wCount; i++)
385 385 queue.EnqueueRange(buffer,0,wBatch);
386 386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 387 },
388 388 () => {
389 389 var buffer = new int[rBatch];
390 390
391 391 while(read < total) {
392 392 int actual;
393 393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 394 for(int i=0; i< actual; i++)
395 395 r1 += buffer[i];
396 396 Interlocked.Add(ref read, actual);
397 397 }
398 398 }
399 399
400 400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 401 },
402 402 () => {
403 403 var buffer = new int[rBatch];
404 404
405 405 while(read < total) {
406 406 int actual;
407 407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 408 for(int i=0; i< actual; i++)
409 409 r2 += buffer[i];
410 410 Interlocked.Add(ref read, actual);
411 411 }
412 412 }
413 413
414 414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 415 }
416 416 )
417 417 .Combine()
418 418 .Join();
419 419
420 420 Assert.AreEqual(summ , r1 + r2);
421 421
422 422 Console.WriteLine(
423 423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 424 Environment.TickCount - t1,
425 425 r1,
426 426 r2,
427 427 r1 + r2,
428 428 total
429 429 );
430 430 }
431 431
432 432 [TestMethod]
433 public void AsyncQueueChunkDequeueTest() {
434 var queue = new AsyncQueue<int>();
435
436 const int wBatch = 31;
437 const int wCount = 200000;
438 const int total = wBatch * wCount * 3;
439 const int summ = wBatch * wCount * 6;
440
441 int r1 = 0, r2 = 0;
442 const int rBatch = 1024;
443 int read = 0;
444
445 var t1 = Environment.TickCount;
446
447 AsyncPool.RunThread(
448 () => {
449 var buffer = new int[wBatch];
450 for(int i = 0; i<wBatch; i++)
451 buffer[i] = 1;
452
453 for(int i =0; i < wCount; i++)
454 queue.EnqueueRange(buffer,0,wBatch);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 },
457 () => {
458 var buffer = new int[wBatch];
459 for(int i = 0; i<wBatch; i++)
460 buffer[i] = 2;
461
462 for(int i =0; i < wCount; i++)
463 queue.EnqueueRange(buffer,0,wBatch);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 },
466 () => {
467 var buffer = new int[wBatch];
468 for(int i = 0; i<wBatch; i++)
469 buffer[i] = 3;
470
471 for(int i =0; i < wCount; i++)
472 queue.EnqueueRange(buffer,0,wBatch);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 },
475 () => {
476 var buffer = new int[rBatch];
477 int count = 1;
478 double avgchunk = 0;
479 while(read < total) {
480 int actual;
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 for(int i=0; i< actual; i++)
483 r2 += buffer[i];
484 Interlocked.Add(ref read, actual);
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 count ++;
487 }
488 }
489
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 }
492 )
493 .Combine()
494 .Join();
495
496 Assert.AreEqual(summ , r1 + r2);
497
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
501 r1,
502 r2,
503 r1 + r2,
504 total
505 );
506 }
507
508 [TestMethod]
433 509 public void ParallelMapTest() {
434 510
435 511 const int count = 100000;
436 512
437 513 var args = new double[count];
438 514 var rand = new Random();
439 515
440 516 for (int i = 0; i < count; i++)
441 517 args[i] = rand.NextDouble();
442 518
443 519 var t = Environment.TickCount;
444 520 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
445 521
446 522 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
447 523
448 524 t = Environment.TickCount;
449 525 for (int i = 0; i < count; i++)
450 526 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
451 527 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
452 528 }
453 529
454 530 [TestMethod]
455 531 public void ChainedMapTest() {
456 532
457 533 using (var pool = new WorkerPool(0,10,1)) {
458 534 const int count = 10000;
459 535
460 536 var args = new double[count];
461 537 var rand = new Random();
462 538
463 539 for (int i = 0; i < count; i++)
464 540 args[i] = rand.NextDouble();
465 541
466 542 var t = Environment.TickCount;
467 543 var res = args
468 544 .ChainedMap(
469 545 // Analysis disable once AccessToDisposedClosure
470 546 x => pool.Invoke(
471 547 () => Math.Sin(x * x)
472 548 ),
473 549 4
474 550 )
475 551 .Join();
476 552
477 553 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
478 554
479 555 t = Environment.TickCount;
480 556 for (int i = 0; i < count; i++)
481 557 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
482 558 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
483 559 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
484 560 }
485 561 }
486 562
487 563 [TestMethod]
488 564 public void ParallelForEachTest() {
489 565
490 566 const int count = 100000;
491 567
492 568 var args = new int[count];
493 569 var rand = new Random();
494 570
495 571 for (int i = 0; i < count; i++)
496 572 args[i] = (int)(rand.NextDouble() * 100);
497 573
498 574 int result = 0;
499 575
500 576 var t = Environment.TickCount;
501 577 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
502 578
503 579 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
504 580
505 581 int result2 = 0;
506 582
507 583 t = Environment.TickCount;
508 584 for (int i = 0; i < count; i++)
509 585 result2 += args[i];
510 586 Assert.AreEqual(result2, result);
511 587 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
512 588 }
513 589
514 590 [TestMethod]
515 591 public void ComplexCase1Test() {
516 592 var flags = new bool[3];
517 593
518 594 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
519 595
520 596 var step1 = PromiseHelper
521 597 .Sleep(200, "Alan")
522 598 .On(() => flags[0] = true, PromiseEventType.Cancelled);
523 599 var p = step1
524 600 .Chain(x =>
525 601 PromiseHelper
526 602 .Sleep(200, "Hi, " + x)
527 603 .Then(y => y)
528 604 .On(() => flags[1] = true, PromiseEventType.Cancelled)
529 605 )
530 606 .On(() => flags[2] = true, PromiseEventType.Cancelled);
531 607 step1.Join();
532 608 p.Cancel();
533 609 try {
534 610 Assert.AreEqual(p.Join(), "Hi, Alan");
535 611 Assert.Fail("Shouldn't get here");
536 612 } catch (OperationCanceledException) {
537 613 }
538 614
539 615 Assert.IsFalse(flags[0]);
540 616 Assert.IsTrue(flags[1]);
541 617 Assert.IsTrue(flags[2]);
542 618 }
543 619
544 620 [TestMethod]
545 621 public void ChainedCancel1Test() {
546 622 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
547 623 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
548 624 var p = PromiseHelper
549 625 .Sleep(1, "Hi, HAL!")
550 626 .Then(x => {
551 627 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
552 628 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
553 629 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
554 630 PromiseHelper
555 631 .Sleep(100, "HAL, STOP!")
556 632 .Then(result.Cancel);
557 633 return result;
558 634 });
559 635 try {
560 636 p.Join();
561 637 } catch (TargetInvocationException err) {
562 638 Assert.IsTrue(err.InnerException is OperationCanceledException);
563 639 }
564 640 }
565 641
566 642 [TestMethod]
567 643 public void ChainedCancel2Test() {
568 644 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
569 645 var pSurvive = new Promise<bool>();
570 646 var hemStarted = new ManualResetEvent(false);
571 647 var p = PromiseHelper
572 648 .Sleep(1, "Hi, HAL!")
573 649 .Chain(x => {
574 650 hemStarted.Set();
575 651 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
576 652 var result = PromiseHelper
577 653 .Sleep(100000000, "HEM ENABLED!!!")
578 654 .Then(s => {
579 655 pSurvive.Resolve(false);
580 656 return s;
581 657 });
582 658
583 659 result
584 660 .Cancelled(() => pSurvive.Resolve(true));
585 661
586 662 return result;
587 663 });
588 664
589 665 hemStarted.WaitOne();
590 666 p.Cancel();
591 667
592 668 try {
593 669 p.Join();
594 670 } catch (OperationCanceledException) {
595 671 Assert.IsTrue(pSurvive.Join());
596 672 }
597 673 }
598 674 }
599 675 }
600 676
@@ -1,474 +1,463
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 5
6 6 namespace Implab.Parallels {
7 7 public class AsyncQueue<T> : IEnumerable<T> {
8 8 class Chunk {
9 9 public Chunk next;
10 10
11 11 int m_low;
12 12 int m_hi;
13 13 int m_alloc;
14 14 readonly int m_size;
15 15 readonly T[] m_data;
16 16
17 17 public Chunk(int size) {
18 18 m_size = size;
19 19 m_data = new T[size];
20 20 }
21 21
22 22 public Chunk(int size, T value) {
23 23 m_size = size;
24 24 m_hi = 1;
25 25 m_alloc = 1;
26 26 m_data = new T[size];
27 27 m_data[0] = value;
28 28 }
29 29
30 30 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 31 m_size = size;
32 32 m_hi = length;
33 33 m_alloc = alloc;
34 34 m_data = new T[size];
35 35 Array.Copy(data, offset, m_data, 0, length);
36 36 }
37 37
38 38 public int Low {
39 39 get { return m_low; }
40 40 }
41 41
42 42 public int Hi {
43 43 get { return m_hi; }
44 44 }
45 45
46 46 public bool TryEnqueue(T value, out bool extend) {
47 47 var alloc = Interlocked.Increment(ref m_alloc) - 1;
48 48
49 49 if (alloc >= m_size) {
50 50 extend = alloc == m_size;
51 51 return false;
52 52 }
53 53
54 54 extend = false;
55 55 m_data[alloc] = value;
56 56
57 57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
58 58 // spin wait for commit
59 59 }
60 60 return true;
61 61 }
62 62
63 63 public bool TryDequeue(out T value, out bool recycle) {
64 64 int low;
65 65 do {
66 66 low = m_low;
67 67 if (low >= m_hi) {
68 68 value = default(T);
69 69 recycle = (low == m_size);
70 70 return false;
71 71 }
72 72 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
73 73
74 74 recycle = (low == m_size - 1);
75 75 value = m_data[low];
76 76
77 77 return true;
78 78 }
79 79
80 80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
81 int alloc;
82 int allocSize;
81 //int alloc;
82 //int allocSize;
83 83
84 // in case the batch size is larger than a free space in chunk
85 // tailGap is used to over allocate the space in the chunk to
86 // get exclusive permission on creation of the next one.
87 int tailGap = 0;
88
89 do {
90 alloc = m_alloc;
91
84 var alloc = Interlocked.Add(ref m_alloc, length) - length;
92 85 if (alloc > m_size) {
93 86 // the chunk is full and someone already
94 87 // creating the new one
95 88 enqueued = 0; // nothing was added
96 89 extend = false; // the caller shouldn't try to extend the queue
97 90 return false; // nothing was added
98 91 }
99 92
100 allocSize = Math.Min(m_size - alloc, length);
101 if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
102 tailGap = 1; // overallocate space to get exclusive permission to extend queue
103 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
93 enqueued = Math.Min(m_size - alloc, length);
94 extend = length > enqueued;
104 95
105 extend = tailGap != 0;
106 enqueued = allocSize;
107
108 // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
109 if (alloc == m_size)
96 if (enqueued == 0)
110 97 return false;
111 98
112 Array.Copy(batch, offset, m_data, alloc, allocSize);
113 99
114 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
100 Array.Copy(batch, offset, m_data, alloc, enqueued);
101
102 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
115 103 // spin wait for commit
116 104 }
105
117 106 return true;
118 107 }
119 108
120 109 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
121 110 int low, hi, batchSize;
122 111
123 112 do {
124 113 low = m_low;
125 114 hi = m_hi;
126 115 if (low >= hi) {
127 116 dequeued = 0;
128 117 recycle = (low == m_size); // recycling could be restarted and we need to signal again
129 118 return false;
130 119 }
131 120 batchSize = Math.Min(hi - low, length);
132 121 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
133 122
134 123 recycle = (low == m_size - batchSize);
135 124 dequeued = batchSize;
136 125
137 126 Array.Copy(m_data, low, buffer, offset, batchSize);
138 127
139 128 return true;
140 129 }
141 130
142 131 public T GetAt(int pos) {
143 132 return m_data[pos];
144 133 }
145 134 }
146 135
147 136 public const int DEFAULT_CHUNK_SIZE = 32;
148 137 public const int MAX_CHUNK_SIZE = 262144;
149 138
150 139 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
151 140
152 141 Chunk m_first;
153 142 Chunk m_last;
154 143
155 144 public AsyncQueue() {
156 145 m_last = m_first = new Chunk(m_chunkSize);
157 146 }
158 147
159 148 /// <summary>
160 149 /// Adds the specified value to the queue.
161 150 /// </summary>
162 151 /// <param name="value">Tha value which will be added to the queue.</param>
163 152 public void Enqueue(T value) {
164 153 var last = m_last;
165 154 // spin wait to the new chunk
166 155 bool extend = true;
167 156 while (last == null || !last.TryEnqueue(value, out extend)) {
168 157 // try to extend queue
169 158 if (extend || last == null) {
170 159 var chunk = new Chunk(m_chunkSize, value);
171 160 if (EnqueueChunk(last, chunk))
172 break;
161 break; // success! exit!
173 162 last = m_last;
174 163 } else {
175 164 while (last == m_last) {
176 165 Thread.MemoryBarrier();
177 166 }
178 167 last = m_last;
179 168 }
180 169 }
181 170 }
182 171
183 172 /// <summary>
184 173 /// Adds the specified data to the queue.
185 174 /// </summary>
186 175 /// <param name="data">The buffer which contains the data to be enqueued.</param>
187 176 /// <param name="offset">The offset of the data in the buffer.</param>
188 177 /// <param name="length">The size of the data to read from the buffer.</param>
189 178 public void EnqueueRange(T[] data, int offset, int length) {
190 179 if (data == null)
191 180 throw new ArgumentNullException("data");
192 181 if (offset < 0)
193 182 throw new ArgumentOutOfRangeException("offset");
194 183 if (length < 1 || offset + length > data.Length)
195 184 throw new ArgumentOutOfRangeException("length");
196 185
197 186 var last = m_last;
198 187
199 188 bool extend;
200 189 int enqueued;
201 190
202 191 while (length > 0) {
203 192 extend = true;
204 193 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
205 194 length -= enqueued;
206 195 offset += enqueued;
207 196 }
208 197
209 198 if (extend) {
210 199 // there was no enough space in the chunk
211 200 // or there was no chunks in the queue
212 201
213 202 while (length > 0) {
214 203
215 204 var size = Math.Min(length, MAX_CHUNK_SIZE);
216 205
217 206 var chunk = new Chunk(
218 207 Math.Max(size, m_chunkSize),
219 208 data,
220 209 offset,
221 210 size,
222 211 length // length >= size
223 212 );
224 213
225 214 if (!EnqueueChunk(last, chunk)) {
226 215 // looks like the queue has been updated then proceed from the beginning
227 216 last = m_last;
228 217 break;
229 218 }
230 219
231 220 // we have successfully added the new chunk
232 221 last = chunk;
233 222 length -= size;
234 223 offset += size;
235 224 }
236 225 } else {
237 226 // we don't need to extend the queue, if we successfully enqueued data
238 227 if (length == 0)
239 228 break;
240 229
241 230 // if we need to wait while someone is extending the queue
242 231 // spinwait
243 232 while (last == m_last) {
244 233 Thread.MemoryBarrier();
245 234 }
246 235
247 236 last = m_last;
248 237 }
249 238 }
250 239 }
251 240
252 241 /// <summary>
253 242 /// Tries to retrieve the first element from the queue.
254 243 /// </summary>
255 244 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
256 245 /// <param name="value">The value of the dequeued element.</param>
257 246 public bool TryDequeue(out T value) {
258 247 var chunk = m_first;
259 248 bool recycle;
260 249 while (chunk != null) {
261 250
262 251 var result = chunk.TryDequeue(out value, out recycle);
263 252
264 253 if (recycle) // this chunk is waste
265 254 RecycleFirstChunk(chunk);
266 255 else
267 256 return result; // this chunk is usable and returned actual result
268 257
269 258 if (result) // this chunk is waste but the true result is always actual
270 259 return true;
271 260
272 261 // try again
273 262 chunk = m_first;
274 263 }
275 264
276 265 // the queue is empty
277 266 value = default(T);
278 267 return false;
279 268 }
280 269
281 270 /// <summary>
282 271 /// Tries to dequeue the specified amount of data from the queue.
283 272 /// </summary>
284 273 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
285 274 /// <param name="buffer">The buffer to which the data will be written.</param>
286 275 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
287 276 /// <param name="length">The maximum amount of data to be retrieved.</param>
288 277 /// <param name="dequeued">The actual amout of the retrieved data.</param>
289 278 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
290 279 if (buffer == null)
291 280 throw new ArgumentNullException("buffer");
292 281 if (offset < 0)
293 282 throw new ArgumentOutOfRangeException("offset");
294 283 if (length < 1 || offset + length > buffer.Length)
295 284 throw new ArgumentOutOfRangeException("length");
296 285
297 286 var chunk = m_first;
298 287 bool recycle;
299 288 dequeued = 0;
300 289 while (chunk != null) {
301 290
302 291 int actual;
303 292 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 293 offset += actual;
305 294 length -= actual;
306 295 dequeued += actual;
307 296 }
308 297
309 298 if (recycle) // this chunk is waste
310 299 RecycleFirstChunk(chunk);
311 300 else if (actual == 0)
312 301 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
313 302
314 303 if (length == 0)
315 304 return true;
316 305
317 306 // we still may dequeue something
318 307 // try again
319 308 chunk = m_first;
320 309 }
321 310
322 311 return dequeued != 0;
323 312 }
324 313
325 314 /// <summary>
326 315 /// Tries to dequeue all remaining data in the first chunk.
327 316 /// </summary>
328 317 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 /// <param name="buffer">The buffer to which data will be written.</param>
318 /// <param name="buffer">The buffer to which the data will be written.</param>
330 319 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 320 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
332 321 /// <param name="dequeued">The actual amount of the dequeued data.</param>
333 322 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
334 323 if (buffer == null)
335 324 throw new ArgumentNullException("buffer");
336 325 if (offset < 0)
337 326 throw new ArgumentOutOfRangeException("offset");
338 327 if (length < 1 || offset + length > buffer.Length)
339 328 throw new ArgumentOutOfRangeException("length");
340 329
341 330 var chunk = m_first;
342 331 bool recycle;
343 332 dequeued = 0;
344 333
345 334 while (chunk != null) {
346 335
347 336 int actual;
348 337 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
349 338 dequeued = actual;
350 339 }
351 340
352 341 if (recycle) // this chunk is waste
353 342 RecycleFirstChunk(chunk);
354 343
355 344 // if we have dequeued any data, then return
356 345 if (dequeued != 0)
357 346 return true;
358 347
359 348 // we still may dequeue something
360 349 // try again
361 350 chunk = m_first;
362 351 }
363 352
364 353 return false;
365 354 }
366 355
367 356 bool EnqueueChunk(Chunk last, Chunk chunk) {
368 357 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
369 358 return false;
370 359
371 360 if (last != null)
372 361 last.next = chunk;
373 362 else
374 363 m_first = chunk;
375 364 return true;
376 365 }
377 366
378 367 void RecycleFirstChunk(Chunk first) {
379 368 var next = first.next;
380 369
381 370 if (next == null) {
382 371 // looks like this is the last chunk
383 372 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
384 373 // race
385 374 // maybe someone already recycled this chunk
386 375 // or a new chunk has been appedned to the queue
387 376
388 377 return; // give up
389 378 }
390 379 // the tail is updated
391 380 }
392 381
393 382 // we need to update the head
394 383 Interlocked.CompareExchange(ref m_first, next, first);
395 384 // if the head is already updated then give up
396 385 return;
397 386
398 387 }
399 388
400 389 #region IEnumerable implementation
401 390
402 391 class Enumerator : IEnumerator<T> {
403 392 Chunk m_current;
404 393 int m_pos = -1;
405 394
406 395 public Enumerator(Chunk fisrt) {
407 396 m_current = fisrt;
408 397 }
409 398
410 399 #region IEnumerator implementation
411 400
412 401 public bool MoveNext() {
413 402 if (m_current == null)
414 403 return false;
415 404
416 405 if (m_pos == -1)
417 406 m_pos = m_current.Low;
418 407 else
419 408 m_pos++;
420 409 if (m_pos == m_current.Hi) {
421 410 m_pos = 0;
422 411 m_current = m_current.next;
423 412 }
424 413
425 414 return true;
426 415 }
427 416
428 417 public void Reset() {
429 418 throw new NotSupportedException();
430 419 }
431 420
432 421 object IEnumerator.Current {
433 422 get {
434 423 return Current;
435 424 }
436 425 }
437 426
438 427 #endregion
439 428
440 429 #region IDisposable implementation
441 430
442 431 public void Dispose() {
443 432 }
444 433
445 434 #endregion
446 435
447 436 #region IEnumerator implementation
448 437
449 438 public T Current {
450 439 get {
451 440 if (m_pos == -1 || m_current == null)
452 441 throw new InvalidOperationException();
453 442 return m_current.GetAt(m_pos);
454 443 }
455 444 }
456 445
457 446 #endregion
458 447 }
459 448
460 449 public IEnumerator<T> GetEnumerator() {
461 450 return new Enumerator(m_first);
462 451 }
463 452
464 453 #endregion
465 454
466 455 #region IEnumerable implementation
467 456
468 457 IEnumerator IEnumerable.GetEnumerator() {
469 458 return GetEnumerator();
470 459 }
471 460
472 461 #endregion
473 462 }
474 463 }
@@ -1,120 +1,120
1 1 using System;
2 2 using Implab.Diagnostics;
3 3 using Implab.Parallels;
4 4 using Implab;
5 5 using System.Collections.Generic;
6 6 using System.Collections.Concurrent;
7 7
8 8 namespace MonoPlay {
9 9 class MainClass {
10 10 public static void Main(string[] args) {
11 11 if (args == null)
12 12 throw new ArgumentNullException("args");
13 13
14 14 var q1 = new AsyncQueue<int>();
15 15 var q2 = new Queue<int>();
16 16
17 17 const int count = 10000000;
18 18
19 19 int res1 = 0, res2 = 0;
20 20 var t1 = Environment.TickCount;
21 21
22 22 AsyncPool.RunThread(
23 23 () => {
24 24 for (var i = 0; i < count; i++)
25 25 q1.Enqueue(1);
26 26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
27 27 },
28 28 () => {
29 29 for (var i = 0; i < count; i++)
30 30 q1.Enqueue(2);
31 31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
32 32 },
33 33 () => {
34 34 int temp = 0;
35 35 int i = 0;
36 36 while (i < count)
37 37 if (q1.TryDequeue(out temp)) {
38 38 i++;
39 39 res1 += temp;
40 40 }
41 41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
42 42 },
43 43 () => {
44 44 int temp = 0;
45 45 int i = 0;
46 46 while (i < count)
47 47 if (q1.TryDequeue(out temp)) {
48 48 i++;
49 49 res2 += temp;
50 50 }
51 51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
52 52 }
53 53 )
54 54 .Combine()
55 55 .Join();
56 56
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2);
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
58 58
59 59 var t2 = Environment.TickCount;
60 60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
61 61
62 62 t1 = Environment.TickCount;
63 63
64 64 for (var i = 0; i < count * 2; i++)
65 65 q2.Enqueue(i);
66 66
67 67 for (var i = 0; i < count * 2; i++)
68 68 q2.Dequeue();
69 69
70 70 t2 = Environment.TickCount;
71 71 Console.WriteLine("Queue: {0} ms", t2 - t1);
72 72
73 73 q2 = new Queue<int>();
74 74
75 75 t1 = Environment.TickCount;
76 76
77 77
78 78 AsyncPool.RunThread(
79 79 () => {
80 80 for (var i = 0; i < count; i++)
81 81 lock (q2)
82 82 q2.Enqueue(i);
83 83 },
84 84 () => {
85 85 for (var i = 0; i < count; i++)
86 86 lock (q2)
87 87 q2.Enqueue(i);
88 88 },
89 89 () => {
90 90 for (int i = 0; i < count ;)
91 91 lock (q2) {
92 92 if (q2.Count == 0)
93 93 continue;
94 94 q2.Dequeue();
95 95 i++;
96 96 }
97 97
98 98 },
99 99 () => {
100 100 for (int i = 0; i < count ;)
101 101 lock (q2) {
102 102 if (q2.Count == 0)
103 103 continue;
104 104 q2.Dequeue();
105 105 i++;
106 106 }
107 107
108 108 }
109 109 )
110 110 .Combine()
111 111 .Join();
112 112
113 113
114 114
115 115 t2 = Environment.TickCount;
116 116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
117 117
118 118 }
119 119 }
120 120 }
General Comments 0
You need to be logged in to leave comments. Login now