##// END OF EJS Templates
working version of AsyncQueue and batch operations...
cin -
r121:62d2f1e98c4e v2
parent child
Show More
@@ -1,513 +1,600
1 using System;
1 using System;
2 using System.Reflection;
2 using System.Reflection;
3 using System.Threading;
3 using System.Threading;
4 using Implab.Parallels;
4 using Implab.Parallels;
5
5
6 #if MONO
6 #if MONO
7
7
8 using NUnit.Framework;
8 using NUnit.Framework;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
11
11
12 #else
12 #else
13
13
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15
15
16 #endif
16 #endif
17
17
18 namespace Implab.Test {
18 namespace Implab.Test {
19 [TestClass]
19 [TestClass]
20 public class AsyncTests {
20 public class AsyncTests {
21 [TestMethod]
21 [TestMethod]
22 public void ResolveTest() {
22 public void ResolveTest() {
23 int res = -1;
23 int res = -1;
24 var p = new Promise<int>();
24 var p = new Promise<int>();
25 p.Then(x => res = x);
25 p.Then(x => res = x);
26 p.Resolve(100);
26 p.Resolve(100);
27
27
28 Assert.AreEqual(100, res);
28 Assert.AreEqual(100, res);
29 }
29 }
30
30
31 [TestMethod]
31 [TestMethod]
32 public void RejectTest() {
32 public void RejectTest() {
33 int res = -1;
33 int res = -1;
34 Exception err = null;
34 Exception err = null;
35
35
36 var p = new Promise<int>();
36 var p = new Promise<int>();
37 p.Then(
37 p.Then(
38 x => res = x,
38 x => res = x,
39 e => {
39 e => {
40 err = e;
40 err = e;
41 return -2;
41 return -2;
42 }
42 }
43 );
43 );
44 p.Reject(new ApplicationException("error"));
44 p.Reject(new ApplicationException("error"));
45
45
46 Assert.AreEqual(res, -1);
46 Assert.AreEqual(res, -1);
47 Assert.AreEqual(err.Message, "error");
47 Assert.AreEqual(err.Message, "error");
48
48
49 }
49 }
50
50
51 [TestMethod]
51 [TestMethod]
52 public void CancelExceptionTest() {
52 public void CancelExceptionTest() {
53 var p = new Promise<bool>();
53 var p = new Promise<bool>();
54 p.Cancel();
54 p.Cancel();
55
55
56 var p2 = p.Cancelled(() => {
56 var p2 = p.Cancelled(() => {
57 throw new ApplicationException("CANCELLED");
57 throw new ApplicationException("CANCELLED");
58 });
58 });
59
59
60 try {
60 try {
61 p2.Join();
61 p2.Join();
62 Assert.Fail();
62 Assert.Fail();
63 } catch (ApplicationException err) {
63 } catch (ApplicationException err) {
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 }
65 }
66
66
67 }
67 }
68
68
69 [TestMethod]
69 [TestMethod]
70 public void ContinueOnCancelTest() {
70 public void ContinueOnCancelTest() {
71 var p = new Promise<bool>();
71 var p = new Promise<bool>();
72 p.Cancel();
72 p.Cancel();
73
73
74 var p2 = p
74 var p2 = p
75 .Cancelled<bool>(() => {
75 .Cancelled<bool>(() => {
76 throw new ApplicationException("CANCELLED");
76 throw new ApplicationException("CANCELLED");
77 })
77 })
78 .Error(e => true);
78 .Error(e => true);
79
79
80 Assert.AreEqual(true, p2.Join());
80 Assert.AreEqual(true, p2.Join());
81 }
81 }
82
82
83 [TestMethod]
83 [TestMethod]
84 public void JoinSuccessTest() {
84 public void JoinSuccessTest() {
85 var p = new Promise<int>();
85 var p = new Promise<int>();
86 p.Resolve(100);
86 p.Resolve(100);
87 Assert.AreEqual(p.Join(), 100);
87 Assert.AreEqual(p.Join(), 100);
88 }
88 }
89
89
90 [TestMethod]
90 [TestMethod]
91 public void JoinFailTest() {
91 public void JoinFailTest() {
92 var p = new Promise<int>();
92 var p = new Promise<int>();
93 p.Reject(new ApplicationException("failed"));
93 p.Reject(new ApplicationException("failed"));
94
94
95 try {
95 try {
96 p.Join();
96 p.Join();
97 throw new ApplicationException("WRONG!");
97 throw new ApplicationException("WRONG!");
98 } catch (TargetInvocationException err) {
98 } catch (TargetInvocationException err) {
99 Assert.AreEqual(err.InnerException.Message, "failed");
99 Assert.AreEqual(err.InnerException.Message, "failed");
100 } catch {
100 } catch {
101 Assert.Fail("Got wrong excaption");
101 Assert.Fail("Got wrong excaption");
102 }
102 }
103 }
103 }
104
104
105 [TestMethod]
105 [TestMethod]
106 public void MapTest() {
106 public void MapTest() {
107 var p = new Promise<int>();
107 var p = new Promise<int>();
108
108
109 var p2 = p.Then(x => x.ToString());
109 var p2 = p.Then(x => x.ToString());
110 p.Resolve(100);
110 p.Resolve(100);
111
111
112 Assert.AreEqual(p2.Join(), "100");
112 Assert.AreEqual(p2.Join(), "100");
113 }
113 }
114
114
115 [TestMethod]
115 [TestMethod]
116 public void FixErrorTest() {
116 public void FixErrorTest() {
117 var p = new Promise<int>();
117 var p = new Promise<int>();
118
118
119 var p2 = p.Error(e => 101);
119 var p2 = p.Error(e => 101);
120
120
121 p.Reject(new Exception());
121 p.Reject(new Exception());
122
122
123 Assert.AreEqual(p2.Join(), 101);
123 Assert.AreEqual(p2.Join(), 101);
124 }
124 }
125
125
126 [TestMethod]
126 [TestMethod]
127 public void ChainTest() {
127 public void ChainTest() {
128 var p1 = new Promise<int>();
128 var p1 = new Promise<int>();
129
129
130 var p3 = p1.Chain(x => {
130 var p3 = p1.Chain(x => {
131 var p2 = new Promise<string>();
131 var p2 = new Promise<string>();
132 p2.Resolve(x.ToString());
132 p2.Resolve(x.ToString());
133 return p2;
133 return p2;
134 });
134 });
135
135
136 p1.Resolve(100);
136 p1.Resolve(100);
137
137
138 Assert.AreEqual(p3.Join(), "100");
138 Assert.AreEqual(p3.Join(), "100");
139 }
139 }
140
140
141 [TestMethod]
141 [TestMethod]
142 public void ChainFailTest() {
142 public void ChainFailTest() {
143 var p1 = new Promise<int>();
143 var p1 = new Promise<int>();
144
144
145 var p3 = p1.Chain(x => {
145 var p3 = p1.Chain(x => {
146 var p2 = new Promise<string>();
146 var p2 = new Promise<string>();
147 p2.Reject(new Exception("DIE!!!"));
147 p2.Reject(new Exception("DIE!!!"));
148 return p2;
148 return p2;
149 });
149 });
150
150
151 p1.Resolve(100);
151 p1.Resolve(100);
152
152
153 Assert.IsTrue(p3.IsResolved);
153 Assert.IsTrue(p3.IsResolved);
154 }
154 }
155
155
156 [TestMethod]
156 [TestMethod]
157 public void PoolTest() {
157 public void PoolTest() {
158 var pid = Thread.CurrentThread.ManagedThreadId;
158 var pid = Thread.CurrentThread.ManagedThreadId;
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160
160
161 Assert.AreNotEqual(pid, p.Join());
161 Assert.AreNotEqual(pid, p.Join());
162 }
162 }
163
163
164 [TestMethod]
164 [TestMethod]
165 public void WorkerPoolSizeTest() {
165 public void WorkerPoolSizeTest() {
166 var pool = new WorkerPool(5, 10, 1);
166 var pool = new WorkerPool(5, 10, 1);
167
167
168 Assert.AreEqual(5, pool.PoolSize);
168 Assert.AreEqual(5, pool.PoolSize);
169
169
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173
173
174 Assert.AreEqual(5, pool.PoolSize);
174 Assert.AreEqual(5, pool.PoolSize);
175
175
176 for (int i = 0; i < 100; i++)
176 for (int i = 0; i < 100; i++)
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 Thread.Sleep(200);
178 Thread.Sleep(200);
179 Assert.AreEqual(10, pool.PoolSize);
179 Assert.AreEqual(10, pool.PoolSize);
180
180
181 pool.Dispose();
181 pool.Dispose();
182 }
182 }
183
183
184 [TestMethod]
184 [TestMethod]
185 public void WorkerPoolCorrectTest() {
185 public void WorkerPoolCorrectTest() {
186 var pool = new WorkerPool(0,1000,100);
186 var pool = new WorkerPool(0,1000,100);
187
187
188 const int iterations = 1000;
188 const int iterations = 1000;
189 int pending = iterations;
189 int pending = iterations;
190 var stop = new ManualResetEvent(false);
190 var stop = new ManualResetEvent(false);
191
191
192 var count = 0;
192 var count = 0;
193 for (int i = 0; i < iterations; i++) {
193 for (int i = 0; i < iterations; i++) {
194 pool
194 pool
195 .Invoke(() => 1)
195 .Invoke(() => 1)
196 .Then(x => Interlocked.Add(ref count, x))
196 .Then(x => Interlocked.Add(ref count, x))
197 .Then(x => Math.Log10(x))
197 .Then(x => Math.Log10(x))
198 .On(() => {
198 .On(() => {
199 Interlocked.Decrement(ref pending);
199 Interlocked.Decrement(ref pending);
200 if (pending == 0)
200 if (pending == 0)
201 stop.Set();
201 stop.Set();
202 }, PromiseEventType.All);
202 }, PromiseEventType.All);
203 }
203 }
204
204
205 stop.WaitOne();
205 stop.WaitOne();
206
206
207 Assert.AreEqual(iterations, count);
207 Assert.AreEqual(iterations, count);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 pool.Dispose();
209 pool.Dispose();
210
210
211 }
211 }
212
212
213 [TestMethod]
213 [TestMethod]
214 public void WorkerPoolDisposeTest() {
214 public void WorkerPoolDisposeTest() {
215 var pool = new WorkerPool(5, 20);
215 var pool = new WorkerPool(5, 20);
216 Assert.AreEqual(5, pool.PoolSize);
216 Assert.AreEqual(5, pool.PoolSize);
217 pool.Dispose();
217 pool.Dispose();
218 Thread.Sleep(500);
218 Thread.Sleep(500);
219 Assert.AreEqual(0, pool.PoolSize);
219 Assert.AreEqual(0, pool.PoolSize);
220 pool.Dispose();
220 pool.Dispose();
221 }
221 }
222
222
223 [TestMethod]
223 [TestMethod]
224 public void MTQueueTest() {
224 public void MTQueueTest() {
225 var queue = new MTQueue<int>();
225 var queue = new MTQueue<int>();
226 int res;
226 int res;
227
227
228 queue.Enqueue(10);
228 queue.Enqueue(10);
229 Assert.IsTrue(queue.TryDequeue(out res));
229 Assert.IsTrue(queue.TryDequeue(out res));
230 Assert.AreEqual(10, res);
230 Assert.AreEqual(10, res);
231 Assert.IsFalse(queue.TryDequeue(out res));
231 Assert.IsFalse(queue.TryDequeue(out res));
232
232
233 for (int i = 0; i < 1000; i++)
233 for (int i = 0; i < 1000; i++)
234 queue.Enqueue(i);
234 queue.Enqueue(i);
235
235
236 for (int i = 0; i < 1000; i++) {
236 for (int i = 0; i < 1000; i++) {
237 queue.TryDequeue(out res);
237 queue.TryDequeue(out res);
238 Assert.AreEqual(i, res);
238 Assert.AreEqual(i, res);
239 }
239 }
240
240
241 int writers = 0;
241 int writers = 0;
242 int readers = 0;
242 int readers = 0;
243 var stop = new ManualResetEvent(false);
243 var stop = new ManualResetEvent(false);
244 int total = 0;
244 int total = 0;
245
245
246 const int itemsPerWriter = 10000;
246 const int itemsPerWriter = 10000;
247 const int writersCount = 10;
247 const int writersCount = 10;
248
248
249 for (int i = 0; i < writersCount; i++) {
249 for (int i = 0; i < writersCount; i++) {
250 Interlocked.Increment(ref writers);
250 Interlocked.Increment(ref writers);
251 AsyncPool
251 AsyncPool
252 .InvokeNewThread(() => {
252 .InvokeNewThread(() => {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 queue.Enqueue(1);
254 queue.Enqueue(1);
255 }
255 }
256 return 1;
256 return 1;
257 })
257 })
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 }
259 }
260
260
261 for (int i = 0; i < 10; i++) {
261 for (int i = 0; i < 10; i++) {
262 Interlocked.Increment(ref readers);
262 Interlocked.Increment(ref readers);
263 AsyncPool
263 AsyncPool
264 .InvokeNewThread(() => {
264 .InvokeNewThread(() => {
265 int t;
265 int t;
266 do {
266 do {
267 while (queue.TryDequeue(out t))
267 while (queue.TryDequeue(out t))
268 Interlocked.Add(ref total, t);
268 Interlocked.Add(ref total, t);
269 } while (writers > 0);
269 } while (writers > 0);
270 return 1;
270 return 1;
271 })
271 })
272 .On(() => {
272 .On(() => {
273 Interlocked.Decrement(ref readers);
273 Interlocked.Decrement(ref readers);
274 if (readers == 0)
274 if (readers == 0)
275 stop.Set();
275 stop.Set();
276 }, PromiseEventType.All);
276 }, PromiseEventType.All);
277 }
277 }
278
278
279 stop.WaitOne();
279 stop.WaitOne();
280
280
281 Assert.AreEqual(100000, total);
281 Assert.AreEqual(100000, total);
282 }
282 }
283
283
284 [TestMethod]
284 [TestMethod]
285 public void AsyncQueueTest() {
285 public void AsyncQueueTest() {
286 var queue = new AsyncQueue<int>();
286 var queue = new AsyncQueue<int>();
287 int res;
287 int res;
288
288
289 queue.Enqueue(10);
289 queue.Enqueue(10);
290 Assert.IsTrue(queue.TryDequeue(out res));
290 Assert.IsTrue(queue.TryDequeue(out res));
291 Assert.AreEqual(10, res);
291 Assert.AreEqual(10, res);
292 Assert.IsFalse(queue.TryDequeue(out res));
292 Assert.IsFalse(queue.TryDequeue(out res));
293
293
294 for (int i = 0; i < 1000; i++)
294 for (int i = 0; i < 1000; i++)
295 queue.Enqueue(i);
295 queue.Enqueue(i);
296
296
297 for (int i = 0; i < 1000; i++) {
297 for (int i = 0; i < 1000; i++) {
298 queue.TryDequeue(out res);
298 queue.TryDequeue(out res);
299 Assert.AreEqual(i, res);
299 Assert.AreEqual(i, res);
300 }
300 }
301
301
302 int writers = 0;
302 const int count = 10000000;
303 int readers = 0;
304 var stop = new ManualResetEvent(false);
305 int total = 0;
306
303
307 const int itemsPerWriter = 10000;
304 int res1 = 0, res2 = 0;
308 const int writersCount = 10;
305 var t1 = Environment.TickCount;
309
306
310 for (int i = 0; i < writersCount; i++) {
307 AsyncPool.RunThread(
311 Interlocked.Increment(ref writers);
308 () => {
312 AsyncPool
309 for (var i = 0; i < count; i++)
313 .InvokeNewThread(() => {
310 queue.Enqueue(1);
314 for (int ii = 0; ii < itemsPerWriter; ii++) {
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
315 queue.Enqueue(1);
312 },
313 () => {
314 for (var i = 0; i < count; i++)
315 queue.Enqueue(2);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 },
318 () => {
319 int temp;
320 int i = 0;
321 while (i < count)
322 if (queue.TryDequeue(out temp)) {
323 i++;
324 res1 += temp;
316 }
325 }
317 return 1;
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
318 })
327 },
319 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
328 () => {
320 }
329 int temp;
330 int i = 0;
331 while (i < count)
332 if (queue.TryDequeue(out temp)) {
333 i++;
334 res2 += temp;
335 }
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 }
338 )
339 .Combine()
340 .Join();
341
342 Assert.AreEqual(count * 3, res1 + res2);
343
344 Console.WriteLine(
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 Environment.TickCount - t1,
347 res1,
348 res2,
349 res1 + res2,
350 count
351 );
352 }
353
354 [TestMethod]
355 public void AsyncQueueBatchTest() {
356 var queue = new AsyncQueue<int>();
357
358 const int wBatch = 29;
359 const int wCount = 400000;
360 const int total = wBatch * wCount * 2;
361 const int summ = wBatch * wCount * 3;
321
362
322 for (int i = 0; i < 10; i++) {
363 int r1 = 0, r2 = 0;
323 Interlocked.Increment(ref readers);
364 const int rBatch = 111;
324 AsyncPool
365 int read = 0;
325 .InvokeNewThread(() => {
366
326 int t;
367 var t1 = Environment.TickCount;
327 do {
368
328 while (queue.TryDequeue(out t))
369 AsyncPool.RunThread(
329 Interlocked.Add(ref total, t);
370 () => {
330 } while (writers > 0);
371 var buffer = new int[wBatch];
331 return 1;
372 for(int i = 0; i<wBatch; i++)
332 })
373 buffer[i] = 1;
333 .On(() => {
374
334 Interlocked.Decrement(ref readers);
375 for(int i =0; i < wCount; i++)
335 if (readers == 0)
376 queue.EnqueueRange(buffer,0,wBatch);
336 stop.Set();
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
337 }, PromiseEventType.All);
378 },
338 }
379 () => {
380 var buffer = new int[wBatch];
381 for(int i = 0; i<wBatch; i++)
382 buffer[i] = 2;
383
384 for(int i =0; i < wCount; i++)
385 queue.EnqueueRange(buffer,0,wBatch);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 },
388 () => {
389 var buffer = new int[rBatch];
339
390
340 stop.WaitOne();
391 while(read < total) {
392 int actual;
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 for(int i=0; i< actual; i++)
395 r1 += buffer[i];
396 Interlocked.Add(ref read, actual);
397 }
398 }
399
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 },
402 () => {
403 var buffer = new int[rBatch];
341
404
342 Assert.AreEqual(itemsPerWriter * writersCount, total);
405 while(read < total) {
406 int actual;
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 for(int i=0; i< actual; i++)
409 r2 += buffer[i];
410 Interlocked.Add(ref read, actual);
411 }
412 }
413
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 }
416 )
417 .Combine()
418 .Join();
419
420 Assert.AreEqual(summ , r1 + r2);
421
422 Console.WriteLine(
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 Environment.TickCount - t1,
425 r1,
426 r2,
427 r1 + r2,
428 total
429 );
343 }
430 }
344
431
345 [TestMethod]
432 [TestMethod]
346 public void ParallelMapTest() {
433 public void ParallelMapTest() {
347
434
348 const int count = 100000;
435 const int count = 100000;
349
436
350 var args = new double[count];
437 var args = new double[count];
351 var rand = new Random();
438 var rand = new Random();
352
439
353 for (int i = 0; i < count; i++)
440 for (int i = 0; i < count; i++)
354 args[i] = rand.NextDouble();
441 args[i] = rand.NextDouble();
355
442
356 var t = Environment.TickCount;
443 var t = Environment.TickCount;
357 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
444 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
358
445
359 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
446 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
360
447
361 t = Environment.TickCount;
448 t = Environment.TickCount;
362 for (int i = 0; i < count; i++)
449 for (int i = 0; i < count; i++)
363 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
450 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
364 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
451 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
365 }
452 }
366
453
367 [TestMethod]
454 [TestMethod]
368 public void ChainedMapTest() {
455 public void ChainedMapTest() {
369
456
370 using (var pool = new WorkerPool(0,10,1)) {
457 using (var pool = new WorkerPool(0,10,1)) {
371 const int count = 10000;
458 const int count = 10000;
372
459
373 var args = new double[count];
460 var args = new double[count];
374 var rand = new Random();
461 var rand = new Random();
375
462
376 for (int i = 0; i < count; i++)
463 for (int i = 0; i < count; i++)
377 args[i] = rand.NextDouble();
464 args[i] = rand.NextDouble();
378
465
379 var t = Environment.TickCount;
466 var t = Environment.TickCount;
380 var res = args
467 var res = args
381 .ChainedMap(
468 .ChainedMap(
382 // Analysis disable once AccessToDisposedClosure
469 // Analysis disable once AccessToDisposedClosure
383 x => pool.Invoke(
470 x => pool.Invoke(
384 () => Math.Sin(x * x)
471 () => Math.Sin(x * x)
385 ),
472 ),
386 4
473 4
387 )
474 )
388 .Join();
475 .Join();
389
476
390 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
477 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
391
478
392 t = Environment.TickCount;
479 t = Environment.TickCount;
393 for (int i = 0; i < count; i++)
480 for (int i = 0; i < count; i++)
394 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
481 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
395 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
482 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
396 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
483 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
397 }
484 }
398 }
485 }
399
486
400 [TestMethod]
487 [TestMethod]
401 public void ParallelForEachTest() {
488 public void ParallelForEachTest() {
402
489
403 const int count = 100000;
490 const int count = 100000;
404
491
405 var args = new int[count];
492 var args = new int[count];
406 var rand = new Random();
493 var rand = new Random();
407
494
408 for (int i = 0; i < count; i++)
495 for (int i = 0; i < count; i++)
409 args[i] = (int)(rand.NextDouble() * 100);
496 args[i] = (int)(rand.NextDouble() * 100);
410
497
411 int result = 0;
498 int result = 0;
412
499
413 var t = Environment.TickCount;
500 var t = Environment.TickCount;
414 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
501 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
415
502
416 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
503 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
417
504
418 int result2 = 0;
505 int result2 = 0;
419
506
420 t = Environment.TickCount;
507 t = Environment.TickCount;
421 for (int i = 0; i < count; i++)
508 for (int i = 0; i < count; i++)
422 result2 += args[i];
509 result2 += args[i];
423 Assert.AreEqual(result2, result);
510 Assert.AreEqual(result2, result);
424 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
511 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
425 }
512 }
426
513
427 [TestMethod]
514 [TestMethod]
428 public void ComplexCase1Test() {
515 public void ComplexCase1Test() {
429 var flags = new bool[3];
516 var flags = new bool[3];
430
517
431 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
518 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
432
519
433 var step1 = PromiseHelper
520 var step1 = PromiseHelper
434 .Sleep(200, "Alan")
521 .Sleep(200, "Alan")
435 .On(() => flags[0] = true, PromiseEventType.Cancelled);
522 .On(() => flags[0] = true, PromiseEventType.Cancelled);
436 var p = step1
523 var p = step1
437 .Chain(x =>
524 .Chain(x =>
438 PromiseHelper
525 PromiseHelper
439 .Sleep(200, "Hi, " + x)
526 .Sleep(200, "Hi, " + x)
440 .Then(y => y)
527 .Then(y => y)
441 .On(() => flags[1] = true, PromiseEventType.Cancelled)
528 .On(() => flags[1] = true, PromiseEventType.Cancelled)
442 )
529 )
443 .On(() => flags[2] = true, PromiseEventType.Cancelled);
530 .On(() => flags[2] = true, PromiseEventType.Cancelled);
444 step1.Join();
531 step1.Join();
445 p.Cancel();
532 p.Cancel();
446 try {
533 try {
447 Assert.AreEqual(p.Join(), "Hi, Alan");
534 Assert.AreEqual(p.Join(), "Hi, Alan");
448 Assert.Fail("Shouldn't get here");
535 Assert.Fail("Shouldn't get here");
449 } catch (OperationCanceledException) {
536 } catch (OperationCanceledException) {
450 }
537 }
451
538
452 Assert.IsFalse(flags[0]);
539 Assert.IsFalse(flags[0]);
453 Assert.IsTrue(flags[1]);
540 Assert.IsTrue(flags[1]);
454 Assert.IsTrue(flags[2]);
541 Assert.IsTrue(flags[2]);
455 }
542 }
456
543
457 [TestMethod]
544 [TestMethod]
458 public void ChainedCancel1Test() {
545 public void ChainedCancel1Test() {
459 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
546 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
460 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
547 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
461 var p = PromiseHelper
548 var p = PromiseHelper
462 .Sleep(1, "Hi, HAL!")
549 .Sleep(1, "Hi, HAL!")
463 .Then(x => {
550 .Then(x => {
464 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
551 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
465 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
552 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
466 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
553 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
467 PromiseHelper
554 PromiseHelper
468 .Sleep(100, "HAL, STOP!")
555 .Sleep(100, "HAL, STOP!")
469 .Then(result.Cancel);
556 .Then(result.Cancel);
470 return result;
557 return result;
471 });
558 });
472 try {
559 try {
473 p.Join();
560 p.Join();
474 } catch (TargetInvocationException err) {
561 } catch (TargetInvocationException err) {
475 Assert.IsTrue(err.InnerException is OperationCanceledException);
562 Assert.IsTrue(err.InnerException is OperationCanceledException);
476 }
563 }
477 }
564 }
478
565
479 [TestMethod]
566 [TestMethod]
480 public void ChainedCancel2Test() {
567 public void ChainedCancel2Test() {
481 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
568 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
482 var pSurvive = new Promise<bool>();
569 var pSurvive = new Promise<bool>();
483 var hemStarted = new ManualResetEvent(false);
570 var hemStarted = new ManualResetEvent(false);
484 var p = PromiseHelper
571 var p = PromiseHelper
485 .Sleep(1, "Hi, HAL!")
572 .Sleep(1, "Hi, HAL!")
486 .Chain(x => {
573 .Chain(x => {
487 hemStarted.Set();
574 hemStarted.Set();
488 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
575 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
489 var result = PromiseHelper
576 var result = PromiseHelper
490 .Sleep(100000000, "HEM ENABLED!!!")
577 .Sleep(100000000, "HEM ENABLED!!!")
491 .Then(s => {
578 .Then(s => {
492 pSurvive.Resolve(false);
579 pSurvive.Resolve(false);
493 return s;
580 return s;
494 });
581 });
495
582
496 result
583 result
497 .Cancelled(() => pSurvive.Resolve(true));
584 .Cancelled(() => pSurvive.Resolve(true));
498
585
499 return result;
586 return result;
500 });
587 });
501
588
502 hemStarted.WaitOne();
589 hemStarted.WaitOne();
503 p.Cancel();
590 p.Cancel();
504
591
505 try {
592 try {
506 p.Join();
593 p.Join();
507 } catch (OperationCanceledException) {
594 } catch (OperationCanceledException) {
508 Assert.IsTrue(pSurvive.Join());
595 Assert.IsTrue(pSurvive.Join());
509 }
596 }
510 }
597 }
511 }
598 }
512 }
599 }
513
600
@@ -1,86 +1,86
1 using Implab.Diagnostics;
1 using Implab.Diagnostics;
2 using System;
2 using System;
3 using System.Threading;
3 using System.Threading;
4 using System.Linq;
4 using System.Linq;
5
5
6 namespace Implab.Parallels {
6 namespace Implab.Parallels {
7 /// <summary>
7 /// <summary>
8 /// Класс для распаралСливания Π·Π°Π΄Π°Ρ‡.
8 /// Класс для распаралСливания Π·Π°Π΄Π°Ρ‡.
9 /// </summary>
9 /// </summary>
10 /// <remarks>
10 /// <remarks>
11 /// Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Π΄Π°Π½Π½Ρ‹ΠΉ класс ΠΈ лямда выраТСния ΠΌΠΎΠΆΠ½ΠΎ Ρ€Π°ΡΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΠΈΡ‚ΡŒ
11 /// Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Π΄Π°Π½Π½Ρ‹ΠΉ класс ΠΈ лямда выраТСния ΠΌΠΎΠΆΠ½ΠΎ Ρ€Π°ΡΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΠΈΡ‚ΡŒ
12 /// вычислСния, для этого ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ концСпция ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.
12 /// вычислСния, для этого ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ концСпция ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.
13 /// </remarks>
13 /// </remarks>
14 public static class AsyncPool {
14 public static class AsyncPool {
15
15
16 public static IPromise<T> Invoke<T>(Func<T> func) {
16 public static IPromise<T> Invoke<T>(Func<T> func) {
17 var p = new Promise<T>();
17 var p = new Promise<T>();
18 var caller = TraceContext.Instance.CurrentOperation;
18 var caller = TraceContext.Instance.CurrentOperation;
19
19
20 ThreadPool.QueueUserWorkItem(param => {
20 ThreadPool.QueueUserWorkItem(param => {
21 TraceContext.Instance.EnterLogicalOperation(caller,false);
21 TraceContext.Instance.EnterLogicalOperation(caller,false);
22 try {
22 try {
23 p.Resolve(func());
23 p.Resolve(func());
24 } catch(Exception e) {
24 } catch(Exception e) {
25 p.Reject(e);
25 p.Reject(e);
26 } finally {
26 } finally {
27 TraceContext.Instance.Leave();
27 TraceContext.Instance.Leave();
28 }
28 }
29 });
29 });
30
30
31 return p;
31 return p;
32 }
32 }
33
33
34 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
34 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
35 var p = new Promise<T>();
35 var p = new Promise<T>();
36
36
37 var caller = TraceContext.Instance.CurrentOperation;
37 var caller = TraceContext.Instance.CurrentOperation;
38
38
39 var worker = new Thread(() => {
39 var worker = new Thread(() => {
40 TraceContext.Instance.EnterLogicalOperation(caller,false);
40 TraceContext.Instance.EnterLogicalOperation(caller,false);
41 try {
41 try {
42 p.Resolve(func());
42 p.Resolve(func());
43 } catch (Exception e) {
43 } catch (Exception e) {
44 p.Reject(e);
44 p.Reject(e);
45 } finally {
45 } finally {
46 TraceContext.Instance.Leave();
46 TraceContext.Instance.Leave();
47 }
47 }
48 });
48 });
49 worker.IsBackground = true;
49 worker.IsBackground = true;
50 worker.Start();
50 worker.Start();
51
51
52 return p;
52 return p;
53 }
53 }
54
54
55
55
56 public static IPromise InvokeNewThread(Action func) {
56 public static IPromise InvokeNewThread(Action func) {
57 var p = new Promise();
57 var p = new Promise();
58
58
59 var caller = TraceContext.Instance.CurrentOperation;
59 var caller = TraceContext.Instance.CurrentOperation;
60
60
61 var worker = new Thread(() => {
61 var worker = new Thread(() => {
62 TraceContext.Instance.EnterLogicalOperation(caller,false);
62 TraceContext.Instance.EnterLogicalOperation(caller,false);
63 try {
63 try {
64 func();
64 func();
65 p.Resolve();
65 p.Resolve();
66 } catch (Exception e) {
66 } catch (Exception e) {
67 p.Reject(e);
67 p.Reject(e);
68 } finally {
68 } finally {
69 TraceContext.Instance.Leave();
69 TraceContext.Instance.Leave();
70 }
70 }
71 });
71 });
72 worker.IsBackground = true;
72 worker.IsBackground = true;
73 worker.Start();
73 worker.Start();
74
74
75 return p;
75 return p;
76 }
76 }
77
77
78 public static IPromise[] ThreadRun(params Action[] func) {
78 public static IPromise[] RunThread(params Action[] func) {
79 return func.Select(f => InvokeNewThread(f)).ToArray();
79 return func.Select(f => InvokeNewThread(f)).ToArray();
80 }
80 }
81
81
82 public static IPromise<T>[] ThreadRun<T>(params Func<T>[] func) {
82 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
83 return func.Select(f => InvokeNewThread(f)).ToArray();
83 return func.Select(f => InvokeNewThread(f)).ToArray();
84 }
84 }
85 }
85 }
86 }
86 }
@@ -1,271 +1,474
1 using System.Threading;
1 using System.Threading;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System;
3 using System;
4 using System.Collections;
4 using System.Collections;
5
5
6 namespace Implab.Parallels {
6 namespace Implab.Parallels {
7 public class AsyncQueue<T> : IEnumerable<T> {
7 public class AsyncQueue<T> : IEnumerable<T> {
8 class Chunk {
8 class Chunk {
9 public Chunk next;
9 public Chunk next;
10
10
11 int m_low;
11 int m_low;
12 int m_hi;
12 int m_hi;
13 int m_alloc;
13 int m_alloc;
14 readonly int m_size;
14 readonly int m_size;
15 readonly T[] m_data;
15 readonly T[] m_data;
16
16
17 public Chunk(int size) {
17 public Chunk(int size) {
18 m_size = size;
18 m_size = size;
19 m_data = new T[size];
19 m_data = new T[size];
20 }
20 }
21
21
22 public Chunk(int size, T value) {
22 public Chunk(int size, T value) {
23 m_size = size;
23 m_size = size;
24 m_hi = 1;
24 m_hi = 1;
25 m_alloc = 1;
25 m_alloc = 1;
26 m_data = new T[size];
26 m_data = new T[size];
27 m_data[0] = value;
27 m_data[0] = value;
28 }
28 }
29
29
30 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 m_size = size;
32 m_hi = length;
33 m_alloc = alloc;
34 m_data = new T[size];
35 Array.Copy(data, offset, m_data, 0, length);
36 }
37
30 public int Low {
38 public int Low {
31 get { return m_low; }
39 get { return m_low; }
32 }
40 }
33
41
34 public int Hi {
42 public int Hi {
35 get { return m_hi; }
43 get { return m_hi; }
36 }
44 }
37
45
38 public bool TryEnqueue(T value,out bool extend) {
46 public bool TryEnqueue(T value, out bool extend) {
39 var alloc = Interlocked.Increment(ref m_alloc) - 1;
47 var alloc = Interlocked.Increment(ref m_alloc) - 1;
40
48
41 if (alloc >= m_size) {
49 if (alloc >= m_size) {
42 extend = alloc == m_size;
50 extend = alloc == m_size;
43 return false;
51 return false;
44 }
52 }
45
53
46 extend = false;
54 extend = false;
47 m_data[alloc] = value;
55 m_data[alloc] = value;
48
56
49 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
50 // spin wait for commit
58 // spin wait for commit
51 }
59 }
52 return true;
60 return true;
53 }
61 }
54
62
55 public bool TryDequeue(out T value,out bool recycle) {
63 public bool TryDequeue(out T value, out bool recycle) {
56 int low;
64 int low;
57 do {
65 do {
58 low = m_low;
66 low = m_low;
59 if (low >= m_hi) {
67 if (low >= m_hi) {
60 value = default(T);
68 value = default(T);
61 recycle = (low == m_size);
69 recycle = (low == m_size);
62 return false;
70 return false;
63 }
71 }
64 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
72 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
65
73
66 recycle = (low == m_size - 1);
74 recycle = (low == m_size - 1);
67 value = m_data[low];
75 value = m_data[low];
68
76
69 return true;
77 return true;
70 }
78 }
71
79
72 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
73 int alloc;
81 int alloc;
74 int allocSize;
82 int allocSize;
75
83
84 // in case the batch size is larger than a free space in chunk
85 // tailGap is used to over allocate the space in the chunk to
86 // get exclusive permission on creation of the next one.
87 int tailGap = 0;
88
76 do {
89 do {
77 alloc = m_alloc;
90 alloc = m_alloc;
78
91
79 if (alloc > m_size) {
92 if (alloc > m_size) {
80 enqueued = 0;
93 // the chunk is full and someone already
81 extend = false;
94 // creating the new one
82 return false;
95 enqueued = 0; // nothing was added
96 extend = false; // the caller shouldn't try to extend the queue
97 return false; // nothing was added
83 }
98 }
84
99
85 allocSize = Math.Min(m_size - m_alloc, length);
100 allocSize = Math.Min(m_size - alloc, length);
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
101 if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
102 tailGap = 1; // overallocate space to get exclusive permission to extend queue
103 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
104
105 extend = tailGap != 0;
106 enqueued = allocSize;
87
107
88 if (alloc == m_size) {
108 // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
89 enqueued = 0;
109 if (alloc == m_size)
90 extend = true;
91 return false;
110 return false;
92 }
93
111
94 Array.Copy(batch, offset, m_data, alloc, allocSize);
112 Array.Copy(batch, offset, m_data, alloc, allocSize);
95 enqueued = allocSize;
96 extend = false;
97
113
98 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
114 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
99 // spin wait for commit
115 // spin wait for commit
100 }
116 }
101 return true;
117 return true;
102 }
118 }
103
119
120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
121 int low, hi, batchSize;
122
123 do {
124 low = m_low;
125 hi = m_hi;
126 if (low >= hi) {
127 dequeued = 0;
128 recycle = (low == m_size); // recycling could be restarted and we need to signal again
129 return false;
130 }
131 batchSize = Math.Min(hi - low, length);
132 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
133
134 recycle = (low == m_size - batchSize);
135 dequeued = batchSize;
136
137 Array.Copy(m_data, low, buffer, offset, batchSize);
138
139 return true;
140 }
141
104 public T GetAt(int pos) {
142 public T GetAt(int pos) {
105 return m_data[pos];
143 return m_data[pos];
106 }
144 }
107 }
145 }
108
146
109 public const int DEFAULT_CHUNK_SIZE = 32;
147 public const int DEFAULT_CHUNK_SIZE = 32;
148 public const int MAX_CHUNK_SIZE = 262144;
110
149
111 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
112
151
113 Chunk m_first;
152 Chunk m_first;
114 Chunk m_last;
153 Chunk m_last;
115
154
116 public AsyncQueue() {
155 public AsyncQueue() {
117 m_last = m_first = new Chunk(m_chunkSize);
156 m_last = m_first = new Chunk(m_chunkSize);
118 }
157 }
119
158
159 /// <summary>
160 /// Adds the specified value to the queue.
161 /// </summary>
162 /// <param name="value">Tha value which will be added to the queue.</param>
120 public void Enqueue(T value) {
163 public void Enqueue(T value) {
121 var last = m_last;
164 var last = m_last;
122 // spin wait to the new chunk
165 // spin wait to the new chunk
123 bool extend = true;
166 bool extend = true;
124 while(last == null || !last.TryEnqueue(value, out extend)) {
167 while (last == null || !last.TryEnqueue(value, out extend)) {
125 // try to extend queue
168 // try to extend queue
126 if (extend || last == null) {
169 if (extend || last == null) {
127 var chunk = new Chunk(m_chunkSize, value);
170 var chunk = new Chunk(m_chunkSize, value);
128 if (EnqueueChunk(last, chunk))
171 if (EnqueueChunk(last, chunk))
129 break;
172 break;
130 last = m_last;
173 last = m_last;
131 } else {
174 } else {
132 while (last != m_last) {
175 while (last == m_last) {
133 Thread.MemoryBarrier();
176 Thread.MemoryBarrier();
134 last = m_last;
135 }
177 }
178 last = m_last;
136 }
179 }
137 }
180 }
138 }
181 }
139
182
183 /// <summary>
184 /// Adds the specified data to the queue.
185 /// </summary>
186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
187 /// <param name="offset">The offset of the data in the buffer.</param>
188 /// <param name="length">The size of the data to read from the buffer.</param>
189 public void EnqueueRange(T[] data, int offset, int length) {
190 if (data == null)
191 throw new ArgumentNullException("data");
192 if (offset < 0)
193 throw new ArgumentOutOfRangeException("offset");
194 if (length < 1 || offset + length > data.Length)
195 throw new ArgumentOutOfRangeException("length");
196
197 var last = m_last;
198
199 bool extend;
200 int enqueued;
201
202 while (length > 0) {
203 extend = true;
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
205 length -= enqueued;
206 offset += enqueued;
207 }
208
209 if (extend) {
210 // there was no enough space in the chunk
211 // or there was no chunks in the queue
212
213 while (length > 0) {
214
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
216
217 var chunk = new Chunk(
218 Math.Max(size, m_chunkSize),
219 data,
220 offset,
221 size,
222 length // length >= size
223 );
224
225 if (!EnqueueChunk(last, chunk)) {
226 // looks like the queue has been updated then proceed from the beginning
227 last = m_last;
228 break;
229 }
230
231 // we have successfully added the new chunk
232 last = chunk;
233 length -= size;
234 offset += size;
235 }
236 } else {
237 // we don't need to extend the queue, if we successfully enqueued data
238 if (length == 0)
239 break;
240
241 // if we need to wait while someone is extending the queue
242 // spinwait
243 while (last == m_last) {
244 Thread.MemoryBarrier();
245 }
246
247 last = m_last;
248 }
249 }
250 }
251
252 /// <summary>
253 /// Tries to retrieve the first element from the queue.
254 /// </summary>
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
256 /// <param name="value">The value of the dequeued element.</param>
140 public bool TryDequeue(out T value) {
257 public bool TryDequeue(out T value) {
141 var chunk = m_first;
258 var chunk = m_first;
142 bool recycle;
259 bool recycle;
143 while (chunk != null) {
260 while (chunk != null) {
144
261
145 var result = chunk.TryDequeue(out value, out recycle);
262 var result = chunk.TryDequeue(out value, out recycle);
146
263
147 if (recycle) // this chunk is waste
264 if (recycle) // this chunk is waste
148 RecycleFirstChunk(chunk);
265 RecycleFirstChunk(chunk);
149 else
266 else
150 return result; // this chunk is usable and returned actual result
267 return result; // this chunk is usable and returned actual result
151
268
152 if (result) // this chunk is waste but the true result is always actual
269 if (result) // this chunk is waste but the true result is always actual
153 return true;
270 return true;
154
271
155 // try again
272 // try again
156 chunk = m_first;
273 chunk = m_first;
157 }
274 }
158
275
159 // the queue is empty
276 // the queue is empty
160 value = default(T);
277 value = default(T);
161 return false;
278 return false;
162 }
279 }
163
280
281 /// <summary>
282 /// Tries to dequeue the specified amount of data from the queue.
283 /// </summary>
284 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
285 /// <param name="buffer">The buffer to which the data will be written.</param>
286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
287 /// <param name="length">The maximum amount of data to be retrieved.</param>
288 /// <param name="dequeued">The actual amout of the retrieved data.</param>
289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
290 if (buffer == null)
291 throw new ArgumentNullException("buffer");
292 if (offset < 0)
293 throw new ArgumentOutOfRangeException("offset");
294 if (length < 1 || offset + length > buffer.Length)
295 throw new ArgumentOutOfRangeException("length");
296
297 var chunk = m_first;
298 bool recycle;
299 dequeued = 0;
300 while (chunk != null) {
301
302 int actual;
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 offset += actual;
305 length -= actual;
306 dequeued += actual;
307 }
308
309 if (recycle) // this chunk is waste
310 RecycleFirstChunk(chunk);
311 else if (actual == 0)
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
313
314 if (length == 0)
315 return true;
316
317 // we still may dequeue something
318 // try again
319 chunk = m_first;
320 }
321
322 return dequeued != 0;
323 }
324
325 /// <summary>
326 /// Tries to dequeue all remaining data in the first chunk.
327 /// </summary>
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 /// <param name="buffer">The buffer to which data will be written.</param>
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
334 if (buffer == null)
335 throw new ArgumentNullException("buffer");
336 if (offset < 0)
337 throw new ArgumentOutOfRangeException("offset");
338 if (length < 1 || offset + length > buffer.Length)
339 throw new ArgumentOutOfRangeException("length");
340
341 var chunk = m_first;
342 bool recycle;
343 dequeued = 0;
344
345 while (chunk != null) {
346
347 int actual;
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
349 dequeued = actual;
350 }
351
352 if (recycle) // this chunk is waste
353 RecycleFirstChunk(chunk);
354
355 // if we have dequeued any data, then return
356 if (dequeued != 0)
357 return true;
358
359 // we still may dequeue something
360 // try again
361 chunk = m_first;
362 }
363
364 return false;
365 }
366
164 bool EnqueueChunk(Chunk last, Chunk chunk) {
367 bool EnqueueChunk(Chunk last, Chunk chunk) {
165 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
166 return false;
369 return false;
167
370
168 if (last != null)
371 if (last != null)
169 last.next = chunk;
372 last.next = chunk;
170 else
373 else
171 m_first = chunk;
374 m_first = chunk;
172 return true;
375 return true;
173 }
376 }
174
377
175 void RecycleFirstChunk(Chunk first) {
378 void RecycleFirstChunk(Chunk first) {
176 var next = first.next;
379 var next = first.next;
177
380
178 if (next == null) {
381 if (next == null) {
179 // looks like this is the last chunk
382 // looks like this is the last chunk
180 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
383 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
181 // race
384 // race
182 // maybe someone already recycled this chunk
385 // maybe someone already recycled this chunk
183 // or a new chunk has been appedned to the queue
386 // or a new chunk has been appedned to the queue
184
387
185 return; // give up
388 return; // give up
186 }
389 }
187 // the tail is updated
390 // the tail is updated
188 }
391 }
189
392
190 // we need to update the head
393 // we need to update the head
191 Interlocked.CompareExchange(ref m_first, next, first);
394 Interlocked.CompareExchange(ref m_first, next, first);
192 // if the head is already updated then give up
395 // if the head is already updated then give up
193 return;
396 return;
194
397
195 }
398 }
196
399
197 #region IEnumerable implementation
400 #region IEnumerable implementation
198
401
199 class Enumerator : IEnumerator<T> {
402 class Enumerator : IEnumerator<T> {
200 Chunk m_current;
403 Chunk m_current;
201 int m_pos = -1;
404 int m_pos = -1;
202
405
203 public Enumerator(Chunk fisrt) {
406 public Enumerator(Chunk fisrt) {
204 m_current = fisrt;
407 m_current = fisrt;
205 }
408 }
206
409
207 #region IEnumerator implementation
410 #region IEnumerator implementation
208
411
209 public bool MoveNext() {
412 public bool MoveNext() {
210 if (m_current == null)
413 if (m_current == null)
211 return false;
414 return false;
212
415
213 if (m_pos == -1)
416 if (m_pos == -1)
214 m_pos = m_current.Low;
417 m_pos = m_current.Low;
215 else
418 else
216 m_pos++;
419 m_pos++;
217 if (m_pos == m_current.Hi) {
420 if (m_pos == m_current.Hi) {
218 m_pos = 0;
421 m_pos = 0;
219 m_current = m_current.next;
422 m_current = m_current.next;
220 }
423 }
221
424
222 return true;
425 return true;
223 }
426 }
224
427
225 public void Reset() {
428 public void Reset() {
226 throw new NotSupportedException();
429 throw new NotSupportedException();
227 }
430 }
228
431
229 object IEnumerator.Current {
432 object IEnumerator.Current {
230 get {
433 get {
231 return Current;
434 return Current;
232 }
435 }
233 }
436 }
234
437
235 #endregion
438 #endregion
236
439
237 #region IDisposable implementation
440 #region IDisposable implementation
238
441
239 public void Dispose() {
442 public void Dispose() {
240 }
443 }
241
444
242 #endregion
445 #endregion
243
446
244 #region IEnumerator implementation
447 #region IEnumerator implementation
245
448
246 public T Current {
449 public T Current {
247 get {
450 get {
248 if (m_pos == -1 || m_current == null)
451 if (m_pos == -1 || m_current == null)
249 throw new InvalidOperationException();
452 throw new InvalidOperationException();
250 return m_current.GetAt(m_pos);
453 return m_current.GetAt(m_pos);
251 }
454 }
252 }
455 }
253
456
254 #endregion
457 #endregion
255 }
458 }
256
459
257 public IEnumerator<T> GetEnumerator() {
460 public IEnumerator<T> GetEnumerator() {
258 return new Enumerator(m_first);
461 return new Enumerator(m_first);
259 }
462 }
260
463
261 #endregion
464 #endregion
262
465
263 #region IEnumerable implementation
466 #region IEnumerable implementation
264
467
265 IEnumerator IEnumerable.GetEnumerator() {
468 IEnumerator IEnumerable.GetEnumerator() {
266 return GetEnumerator();
469 return GetEnumerator();
267 }
470 }
268
471
269 #endregion
472 #endregion
270 }
473 }
271 }
474 }
@@ -1,86 +1,86
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Text.RegularExpressions;
5 using System.Text.RegularExpressions;
6 using System.Diagnostics;
6 using System.Diagnostics;
7
7
8 namespace Implab
8 namespace Implab
9 {
9 {
10 public static class Safe
10 public static class Safe
11 {
11 {
12 public static void ArgumentMatch(string param, string name, Regex rx) {
12 public static void ArgumentMatch(string value, string paramName, Regex rx) {
13 if (rx == null)
13 if (rx == null)
14 throw new ArgumentNullException("rx");
14 throw new ArgumentNullException("rx");
15 if (!rx.IsMatch(param))
15 if (!rx.IsMatch(value))
16 throw new ArgumentException(String.Format("The prameter value must match {0}", rx), name);
16 throw new ArgumentException(String.Format("The prameter value must match {0}", rx), paramName);
17 }
17 }
18
18
19 public static void ArgumentNotEmpty(string param, string name) {
19 public static void ArgumentNotEmpty(string value, string paramName) {
20 if (String.IsNullOrEmpty(param))
20 if (String.IsNullOrEmpty(value))
21 throw new ArgumentException("The parameter can't be empty", name);
21 throw new ArgumentException("The parameter can't be empty", paramName);
22 }
22 }
23
23
24 public static void ArgumentNotEmpty<T>(T[] param, string name) {
24 public static void ArgumentNotEmpty<T>(T[] value, string paramName) {
25 if (param == null || param.Length == 0)
25 if (value == null || value.Length == 0)
26 throw new ArgumentException("The array must be not emty", name);
26 throw new ArgumentException("The array must be not emty", paramName);
27 }
27 }
28
28
29 public static void ArgumentNotNull(object param, string name) {
29 public static void ArgumentNotNull(object value, string paramName) {
30 if (param == null)
30 if (value == null)
31 throw new ArgumentNullException(name);
31 throw new ArgumentNullException(paramName);
32 }
32 }
33
33
34 public static void ArgumentInRange(int arg, int min, int max, string name) {
34 public static void ArgumentInRange(int value, int min, int max, string paramName) {
35 if (arg < min || arg > max)
35 if (value < min || value > max)
36 throw new ArgumentOutOfRangeException(name);
36 throw new ArgumentOutOfRangeException(paramName);
37 }
37 }
38
38
39 public static void Dispose<T>(T obj) where T : class
39 public static void Dispose<T>(T obj) where T : class
40 {
40 {
41 var disp = obj as IDisposable;
41 var disp = obj as IDisposable;
42 if (disp != null)
42 if (disp != null)
43 disp.Dispose();
43 disp.Dispose();
44 }
44 }
45
45
46 [DebuggerStepThrough]
46 [DebuggerStepThrough]
47 public static IPromise<T> InvokePromise<T>(Func<T> action) {
47 public static IPromise<T> InvokePromise<T>(Func<T> action) {
48 ArgumentNotNull(action, "action");
48 ArgumentNotNull(action, "action");
49
49
50 var p = new Promise<T>();
50 var p = new Promise<T>();
51 try {
51 try {
52 p.Resolve(action());
52 p.Resolve(action());
53 } catch (Exception err) {
53 } catch (Exception err) {
54 p.Reject(err);
54 p.Reject(err);
55 }
55 }
56
56
57 return p;
57 return p;
58 }
58 }
59
59
60 [DebuggerStepThrough]
60 [DebuggerStepThrough]
61 public static IPromise InvokePromise(Action action) {
61 public static IPromise InvokePromise(Action action) {
62 ArgumentNotNull(action, "action");
62 ArgumentNotNull(action, "action");
63
63
64 var p = new Promise();
64 var p = new Promise();
65 try {
65 try {
66 action();
66 action();
67 p.Resolve();
67 p.Resolve();
68 } catch (Exception err) {
68 } catch (Exception err) {
69 p.Reject(err);
69 p.Reject(err);
70 }
70 }
71
71
72 return p;
72 return p;
73 }
73 }
74
74
75 [DebuggerStepThrough]
75 [DebuggerStepThrough]
76 public static IPromise<T> InvokePromise<T>(Func<IPromise<T>> action) {
76 public static IPromise<T> InvokePromise<T>(Func<IPromise<T>> action) {
77 ArgumentNotNull(action, "action");
77 ArgumentNotNull(action, "action");
78
78
79 try {
79 try {
80 return action() ?? Promise<T>.ExceptionToPromise(new Exception("The action returned null"));
80 return action() ?? Promise<T>.ExceptionToPromise(new Exception("The action returned null"));
81 } catch (Exception err) {
81 } catch (Exception err) {
82 return Promise<T>.ExceptionToPromise(err);
82 return Promise<T>.ExceptionToPromise(err);
83 }
83 }
84 }
84 }
85 }
85 }
86 }
86 }
@@ -1,110 +1,120
1 using System;
1 using System;
2 using Implab.Diagnostics;
2 using Implab.Diagnostics;
3 using Implab.Parallels;
3 using Implab.Parallels;
4 using Implab;
4 using Implab;
5 using System.Collections.Generic;
5 using System.Collections.Generic;
6 using System.Collections.Concurrent;
6 using System.Collections.Concurrent;
7
7
8 namespace MonoPlay {
8 namespace MonoPlay {
9 class MainClass {
9 class MainClass {
10 public static void Main(string[] args) {
10 public static void Main(string[] args) {
11 if (args == null)
11 if (args == null)
12 throw new ArgumentNullException("args");
12 throw new ArgumentNullException("args");
13
13
14 var q1 = new AsyncQueue<int>();
14 var q1 = new AsyncQueue<int>();
15 var q2 = new Queue<int>();
15 var q2 = new Queue<int>();
16
16
17 const int count = 10000000;
17 const int count = 10000000;
18
18
19
19 int res1 = 0, res2 = 0;
20 var t1 = Environment.TickCount;
20 var t1 = Environment.TickCount;
21
21
22 AsyncPool.ThreadRun(
22 AsyncPool.RunThread(
23 () => {
23 () => {
24 for (var i = 0; i < count; i++)
24 for (var i = 0; i < count; i++)
25 q1.Enqueue(i);
25 q1.Enqueue(1);
26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
26 },
27 },
27 () => {
28 () => {
28 for (var i = 0; i < count; i++)
29 for (var i = 0; i < count; i++)
29 q1.Enqueue(i);
30 q1.Enqueue(2);
31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
30 },
32 },
31 () => {
33 () => {
32 int temp = 0;
34 int temp = 0;
33 int i = 0;
35 int i = 0;
34 while (i < count)
36 while (i < count)
35 if (q1.TryDequeue(out temp))
37 if (q1.TryDequeue(out temp)) {
36 i++;
38 i++;
39 res1 += temp;
40 }
41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
37 },
42 },
38 () => {
43 () => {
39 int temp = 0;
44 int temp = 0;
40 int i = 0;
45 int i = 0;
41 while (i < count)
46 while (i < count)
42 if (q1.TryDequeue(out temp))
47 if (q1.TryDequeue(out temp)) {
43 i++;
48 i++;
49 res2 += temp;
50 }
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
44 }
52 }
45 )
53 )
46 .Combine()
54 .Combine()
47 .Join();
55 .Join();
48
56
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2);
58
49 var t2 = Environment.TickCount;
59 var t2 = Environment.TickCount;
50 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
51
61
52 t1 = Environment.TickCount;
62 t1 = Environment.TickCount;
53
63
54 for (var i = 0; i < count * 2; i++)
64 for (var i = 0; i < count * 2; i++)
55 q2.Enqueue(i);
65 q2.Enqueue(i);
56
66
57 for (var i = 0; i < count * 2; i++)
67 for (var i = 0; i < count * 2; i++)
58 q2.Dequeue();
68 q2.Dequeue();
59
69
60 t2 = Environment.TickCount;
70 t2 = Environment.TickCount;
61 Console.WriteLine("Queue: {0} ms", t2 - t1);
71 Console.WriteLine("Queue: {0} ms", t2 - t1);
62
72
63 q2 = new Queue<int>();
73 q2 = new Queue<int>();
64
74
65 t1 = Environment.TickCount;
75 t1 = Environment.TickCount;
66
76
67
77
68 AsyncPool.ThreadRun(
78 AsyncPool.RunThread(
69 () => {
79 () => {
70 for (var i = 0; i < count; i++)
80 for (var i = 0; i < count; i++)
71 lock (q2)
81 lock (q2)
72 q2.Enqueue(i);
82 q2.Enqueue(i);
73 },
83 },
74 () => {
84 () => {
75 for (var i = 0; i < count; i++)
85 for (var i = 0; i < count; i++)
76 lock (q2)
86 lock (q2)
77 q2.Enqueue(i);
87 q2.Enqueue(i);
78 },
88 },
79 () => {
89 () => {
80 for (int i = 0; i < count ;)
90 for (int i = 0; i < count ;)
81 lock (q2) {
91 lock (q2) {
82 if (q2.Count == 0)
92 if (q2.Count == 0)
83 continue;
93 continue;
84 q2.Dequeue();
94 q2.Dequeue();
85 i++;
95 i++;
86 }
96 }
87
97
88 },
98 },
89 () => {
99 () => {
90 for (int i = 0; i < count ;)
100 for (int i = 0; i < count ;)
91 lock (q2) {
101 lock (q2) {
92 if (q2.Count == 0)
102 if (q2.Count == 0)
93 continue;
103 continue;
94 q2.Dequeue();
104 q2.Dequeue();
95 i++;
105 i++;
96 }
106 }
97
107
98 }
108 }
99 )
109 )
100 .Combine()
110 .Combine()
101 .Join();
111 .Join();
102
112
103
113
104
114
105 t2 = Environment.TickCount;
115 t2 = Environment.TickCount;
106 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
107
117
108 }
118 }
109 }
119 }
110 }
120 }
General Comments 0
You need to be logged in to leave comments. Login now