##// END OF EJS Templates
minor fixes and improvements of AsyncQueue, additional tests
cin -
r122:0c8685c8b56b v2
parent child
Show More
@@ -1,600 +1,676
1 using System;
1 using System;
2 using System.Reflection;
2 using System.Reflection;
3 using System.Threading;
3 using System.Threading;
4 using Implab.Parallels;
4 using Implab.Parallels;
5
5
6 #if MONO
6 #if MONO
7
7
8 using NUnit.Framework;
8 using NUnit.Framework;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
11
11
12 #else
12 #else
13
13
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15
15
16 #endif
16 #endif
17
17
18 namespace Implab.Test {
18 namespace Implab.Test {
19 [TestClass]
19 [TestClass]
20 public class AsyncTests {
20 public class AsyncTests {
21 [TestMethod]
21 [TestMethod]
22 public void ResolveTest() {
22 public void ResolveTest() {
23 int res = -1;
23 int res = -1;
24 var p = new Promise<int>();
24 var p = new Promise<int>();
25 p.Then(x => res = x);
25 p.Then(x => res = x);
26 p.Resolve(100);
26 p.Resolve(100);
27
27
28 Assert.AreEqual(100, res);
28 Assert.AreEqual(100, res);
29 }
29 }
30
30
31 [TestMethod]
31 [TestMethod]
32 public void RejectTest() {
32 public void RejectTest() {
33 int res = -1;
33 int res = -1;
34 Exception err = null;
34 Exception err = null;
35
35
36 var p = new Promise<int>();
36 var p = new Promise<int>();
37 p.Then(
37 p.Then(
38 x => res = x,
38 x => res = x,
39 e => {
39 e => {
40 err = e;
40 err = e;
41 return -2;
41 return -2;
42 }
42 }
43 );
43 );
44 p.Reject(new ApplicationException("error"));
44 p.Reject(new ApplicationException("error"));
45
45
46 Assert.AreEqual(res, -1);
46 Assert.AreEqual(res, -1);
47 Assert.AreEqual(err.Message, "error");
47 Assert.AreEqual(err.Message, "error");
48
48
49 }
49 }
50
50
51 [TestMethod]
51 [TestMethod]
52 public void CancelExceptionTest() {
52 public void CancelExceptionTest() {
53 var p = new Promise<bool>();
53 var p = new Promise<bool>();
54 p.Cancel();
54 p.Cancel();
55
55
56 var p2 = p.Cancelled(() => {
56 var p2 = p.Cancelled(() => {
57 throw new ApplicationException("CANCELLED");
57 throw new ApplicationException("CANCELLED");
58 });
58 });
59
59
60 try {
60 try {
61 p2.Join();
61 p2.Join();
62 Assert.Fail();
62 Assert.Fail();
63 } catch (ApplicationException err) {
63 } catch (ApplicationException err) {
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 }
65 }
66
66
67 }
67 }
68
68
69 [TestMethod]
69 [TestMethod]
70 public void ContinueOnCancelTest() {
70 public void ContinueOnCancelTest() {
71 var p = new Promise<bool>();
71 var p = new Promise<bool>();
72 p.Cancel();
72 p.Cancel();
73
73
74 var p2 = p
74 var p2 = p
75 .Cancelled<bool>(() => {
75 .Cancelled<bool>(() => {
76 throw new ApplicationException("CANCELLED");
76 throw new ApplicationException("CANCELLED");
77 })
77 })
78 .Error(e => true);
78 .Error(e => true);
79
79
80 Assert.AreEqual(true, p2.Join());
80 Assert.AreEqual(true, p2.Join());
81 }
81 }
82
82
83 [TestMethod]
83 [TestMethod]
84 public void JoinSuccessTest() {
84 public void JoinSuccessTest() {
85 var p = new Promise<int>();
85 var p = new Promise<int>();
86 p.Resolve(100);
86 p.Resolve(100);
87 Assert.AreEqual(p.Join(), 100);
87 Assert.AreEqual(p.Join(), 100);
88 }
88 }
89
89
90 [TestMethod]
90 [TestMethod]
91 public void JoinFailTest() {
91 public void JoinFailTest() {
92 var p = new Promise<int>();
92 var p = new Promise<int>();
93 p.Reject(new ApplicationException("failed"));
93 p.Reject(new ApplicationException("failed"));
94
94
95 try {
95 try {
96 p.Join();
96 p.Join();
97 throw new ApplicationException("WRONG!");
97 throw new ApplicationException("WRONG!");
98 } catch (TargetInvocationException err) {
98 } catch (TargetInvocationException err) {
99 Assert.AreEqual(err.InnerException.Message, "failed");
99 Assert.AreEqual(err.InnerException.Message, "failed");
100 } catch {
100 } catch {
101 Assert.Fail("Got wrong excaption");
101 Assert.Fail("Got wrong excaption");
102 }
102 }
103 }
103 }
104
104
105 [TestMethod]
105 [TestMethod]
106 public void MapTest() {
106 public void MapTest() {
107 var p = new Promise<int>();
107 var p = new Promise<int>();
108
108
109 var p2 = p.Then(x => x.ToString());
109 var p2 = p.Then(x => x.ToString());
110 p.Resolve(100);
110 p.Resolve(100);
111
111
112 Assert.AreEqual(p2.Join(), "100");
112 Assert.AreEqual(p2.Join(), "100");
113 }
113 }
114
114
115 [TestMethod]
115 [TestMethod]
116 public void FixErrorTest() {
116 public void FixErrorTest() {
117 var p = new Promise<int>();
117 var p = new Promise<int>();
118
118
119 var p2 = p.Error(e => 101);
119 var p2 = p.Error(e => 101);
120
120
121 p.Reject(new Exception());
121 p.Reject(new Exception());
122
122
123 Assert.AreEqual(p2.Join(), 101);
123 Assert.AreEqual(p2.Join(), 101);
124 }
124 }
125
125
126 [TestMethod]
126 [TestMethod]
127 public void ChainTest() {
127 public void ChainTest() {
128 var p1 = new Promise<int>();
128 var p1 = new Promise<int>();
129
129
130 var p3 = p1.Chain(x => {
130 var p3 = p1.Chain(x => {
131 var p2 = new Promise<string>();
131 var p2 = new Promise<string>();
132 p2.Resolve(x.ToString());
132 p2.Resolve(x.ToString());
133 return p2;
133 return p2;
134 });
134 });
135
135
136 p1.Resolve(100);
136 p1.Resolve(100);
137
137
138 Assert.AreEqual(p3.Join(), "100");
138 Assert.AreEqual(p3.Join(), "100");
139 }
139 }
140
140
141 [TestMethod]
141 [TestMethod]
142 public void ChainFailTest() {
142 public void ChainFailTest() {
143 var p1 = new Promise<int>();
143 var p1 = new Promise<int>();
144
144
145 var p3 = p1.Chain(x => {
145 var p3 = p1.Chain(x => {
146 var p2 = new Promise<string>();
146 var p2 = new Promise<string>();
147 p2.Reject(new Exception("DIE!!!"));
147 p2.Reject(new Exception("DIE!!!"));
148 return p2;
148 return p2;
149 });
149 });
150
150
151 p1.Resolve(100);
151 p1.Resolve(100);
152
152
153 Assert.IsTrue(p3.IsResolved);
153 Assert.IsTrue(p3.IsResolved);
154 }
154 }
155
155
156 [TestMethod]
156 [TestMethod]
157 public void PoolTest() {
157 public void PoolTest() {
158 var pid = Thread.CurrentThread.ManagedThreadId;
158 var pid = Thread.CurrentThread.ManagedThreadId;
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160
160
161 Assert.AreNotEqual(pid, p.Join());
161 Assert.AreNotEqual(pid, p.Join());
162 }
162 }
163
163
164 [TestMethod]
164 [TestMethod]
165 public void WorkerPoolSizeTest() {
165 public void WorkerPoolSizeTest() {
166 var pool = new WorkerPool(5, 10, 1);
166 var pool = new WorkerPool(5, 10, 1);
167
167
168 Assert.AreEqual(5, pool.PoolSize);
168 Assert.AreEqual(5, pool.PoolSize);
169
169
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173
173
174 Assert.AreEqual(5, pool.PoolSize);
174 Assert.AreEqual(5, pool.PoolSize);
175
175
176 for (int i = 0; i < 100; i++)
176 for (int i = 0; i < 100; i++)
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 Thread.Sleep(200);
178 Thread.Sleep(200);
179 Assert.AreEqual(10, pool.PoolSize);
179 Assert.AreEqual(10, pool.PoolSize);
180
180
181 pool.Dispose();
181 pool.Dispose();
182 }
182 }
183
183
184 [TestMethod]
184 [TestMethod]
185 public void WorkerPoolCorrectTest() {
185 public void WorkerPoolCorrectTest() {
186 var pool = new WorkerPool(0,1000,100);
186 var pool = new WorkerPool(0,1000,100);
187
187
188 const int iterations = 1000;
188 const int iterations = 1000;
189 int pending = iterations;
189 int pending = iterations;
190 var stop = new ManualResetEvent(false);
190 var stop = new ManualResetEvent(false);
191
191
192 var count = 0;
192 var count = 0;
193 for (int i = 0; i < iterations; i++) {
193 for (int i = 0; i < iterations; i++) {
194 pool
194 pool
195 .Invoke(() => 1)
195 .Invoke(() => 1)
196 .Then(x => Interlocked.Add(ref count, x))
196 .Then(x => Interlocked.Add(ref count, x))
197 .Then(x => Math.Log10(x))
197 .Then(x => Math.Log10(x))
198 .On(() => {
198 .On(() => {
199 Interlocked.Decrement(ref pending);
199 Interlocked.Decrement(ref pending);
200 if (pending == 0)
200 if (pending == 0)
201 stop.Set();
201 stop.Set();
202 }, PromiseEventType.All);
202 }, PromiseEventType.All);
203 }
203 }
204
204
205 stop.WaitOne();
205 stop.WaitOne();
206
206
207 Assert.AreEqual(iterations, count);
207 Assert.AreEqual(iterations, count);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 pool.Dispose();
209 pool.Dispose();
210
210
211 }
211 }
212
212
213 [TestMethod]
213 [TestMethod]
214 public void WorkerPoolDisposeTest() {
214 public void WorkerPoolDisposeTest() {
215 var pool = new WorkerPool(5, 20);
215 var pool = new WorkerPool(5, 20);
216 Assert.AreEqual(5, pool.PoolSize);
216 Assert.AreEqual(5, pool.PoolSize);
217 pool.Dispose();
217 pool.Dispose();
218 Thread.Sleep(500);
218 Thread.Sleep(500);
219 Assert.AreEqual(0, pool.PoolSize);
219 Assert.AreEqual(0, pool.PoolSize);
220 pool.Dispose();
220 pool.Dispose();
221 }
221 }
222
222
223 [TestMethod]
223 [TestMethod]
224 public void MTQueueTest() {
224 public void MTQueueTest() {
225 var queue = new MTQueue<int>();
225 var queue = new MTQueue<int>();
226 int res;
226 int res;
227
227
228 queue.Enqueue(10);
228 queue.Enqueue(10);
229 Assert.IsTrue(queue.TryDequeue(out res));
229 Assert.IsTrue(queue.TryDequeue(out res));
230 Assert.AreEqual(10, res);
230 Assert.AreEqual(10, res);
231 Assert.IsFalse(queue.TryDequeue(out res));
231 Assert.IsFalse(queue.TryDequeue(out res));
232
232
233 for (int i = 0; i < 1000; i++)
233 for (int i = 0; i < 1000; i++)
234 queue.Enqueue(i);
234 queue.Enqueue(i);
235
235
236 for (int i = 0; i < 1000; i++) {
236 for (int i = 0; i < 1000; i++) {
237 queue.TryDequeue(out res);
237 queue.TryDequeue(out res);
238 Assert.AreEqual(i, res);
238 Assert.AreEqual(i, res);
239 }
239 }
240
240
241 int writers = 0;
241 int writers = 0;
242 int readers = 0;
242 int readers = 0;
243 var stop = new ManualResetEvent(false);
243 var stop = new ManualResetEvent(false);
244 int total = 0;
244 int total = 0;
245
245
246 const int itemsPerWriter = 10000;
246 const int itemsPerWriter = 10000;
247 const int writersCount = 10;
247 const int writersCount = 10;
248
248
249 for (int i = 0; i < writersCount; i++) {
249 for (int i = 0; i < writersCount; i++) {
250 Interlocked.Increment(ref writers);
250 Interlocked.Increment(ref writers);
251 AsyncPool
251 AsyncPool
252 .InvokeNewThread(() => {
252 .InvokeNewThread(() => {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 queue.Enqueue(1);
254 queue.Enqueue(1);
255 }
255 }
256 return 1;
256 return 1;
257 })
257 })
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 }
259 }
260
260
261 for (int i = 0; i < 10; i++) {
261 for (int i = 0; i < 10; i++) {
262 Interlocked.Increment(ref readers);
262 Interlocked.Increment(ref readers);
263 AsyncPool
263 AsyncPool
264 .InvokeNewThread(() => {
264 .InvokeNewThread(() => {
265 int t;
265 int t;
266 do {
266 do {
267 while (queue.TryDequeue(out t))
267 while (queue.TryDequeue(out t))
268 Interlocked.Add(ref total, t);
268 Interlocked.Add(ref total, t);
269 } while (writers > 0);
269 } while (writers > 0);
270 return 1;
270 return 1;
271 })
271 })
272 .On(() => {
272 .On(() => {
273 Interlocked.Decrement(ref readers);
273 Interlocked.Decrement(ref readers);
274 if (readers == 0)
274 if (readers == 0)
275 stop.Set();
275 stop.Set();
276 }, PromiseEventType.All);
276 }, PromiseEventType.All);
277 }
277 }
278
278
279 stop.WaitOne();
279 stop.WaitOne();
280
280
281 Assert.AreEqual(100000, total);
281 Assert.AreEqual(100000, total);
282 }
282 }
283
283
284 [TestMethod]
284 [TestMethod]
285 public void AsyncQueueTest() {
285 public void AsyncQueueTest() {
286 var queue = new AsyncQueue<int>();
286 var queue = new AsyncQueue<int>();
287 int res;
287 int res;
288
288
289 queue.Enqueue(10);
289 queue.Enqueue(10);
290 Assert.IsTrue(queue.TryDequeue(out res));
290 Assert.IsTrue(queue.TryDequeue(out res));
291 Assert.AreEqual(10, res);
291 Assert.AreEqual(10, res);
292 Assert.IsFalse(queue.TryDequeue(out res));
292 Assert.IsFalse(queue.TryDequeue(out res));
293
293
294 for (int i = 0; i < 1000; i++)
294 for (int i = 0; i < 1000; i++)
295 queue.Enqueue(i);
295 queue.Enqueue(i);
296
296
297 for (int i = 0; i < 1000; i++) {
297 for (int i = 0; i < 1000; i++) {
298 queue.TryDequeue(out res);
298 queue.TryDequeue(out res);
299 Assert.AreEqual(i, res);
299 Assert.AreEqual(i, res);
300 }
300 }
301
301
302 const int count = 10000000;
302 const int count = 10000000;
303
303
304 int res1 = 0, res2 = 0;
304 int res1 = 0, res2 = 0;
305 var t1 = Environment.TickCount;
305 var t1 = Environment.TickCount;
306
306
307 AsyncPool.RunThread(
307 AsyncPool.RunThread(
308 () => {
308 () => {
309 for (var i = 0; i < count; i++)
309 for (var i = 0; i < count; i++)
310 queue.Enqueue(1);
310 queue.Enqueue(1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 },
312 },
313 () => {
313 () => {
314 for (var i = 0; i < count; i++)
314 for (var i = 0; i < count; i++)
315 queue.Enqueue(2);
315 queue.Enqueue(2);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 },
317 },
318 () => {
318 () => {
319 int temp;
319 int temp;
320 int i = 0;
320 int i = 0;
321 while (i < count)
321 while (i < count)
322 if (queue.TryDequeue(out temp)) {
322 if (queue.TryDequeue(out temp)) {
323 i++;
323 i++;
324 res1 += temp;
324 res1 += temp;
325 }
325 }
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 },
327 },
328 () => {
328 () => {
329 int temp;
329 int temp;
330 int i = 0;
330 int i = 0;
331 while (i < count)
331 while (i < count)
332 if (queue.TryDequeue(out temp)) {
332 if (queue.TryDequeue(out temp)) {
333 i++;
333 i++;
334 res2 += temp;
334 res2 += temp;
335 }
335 }
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 }
337 }
338 )
338 )
339 .Combine()
339 .Combine()
340 .Join();
340 .Join();
341
341
342 Assert.AreEqual(count * 3, res1 + res2);
342 Assert.AreEqual(count * 3, res1 + res2);
343
343
344 Console.WriteLine(
344 Console.WriteLine(
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 Environment.TickCount - t1,
346 Environment.TickCount - t1,
347 res1,
347 res1,
348 res2,
348 res2,
349 res1 + res2,
349 res1 + res2,
350 count
350 count
351 );
351 );
352 }
352 }
353
353
354 [TestMethod]
354 [TestMethod]
355 public void AsyncQueueBatchTest() {
355 public void AsyncQueueBatchTest() {
356 var queue = new AsyncQueue<int>();
356 var queue = new AsyncQueue<int>();
357
357
358 const int wBatch = 29;
358 const int wBatch = 29;
359 const int wCount = 400000;
359 const int wCount = 400000;
360 const int total = wBatch * wCount * 2;
360 const int total = wBatch * wCount * 2;
361 const int summ = wBatch * wCount * 3;
361 const int summ = wBatch * wCount * 3;
362
362
363 int r1 = 0, r2 = 0;
363 int r1 = 0, r2 = 0;
364 const int rBatch = 111;
364 const int rBatch = 111;
365 int read = 0;
365 int read = 0;
366
366
367 var t1 = Environment.TickCount;
367 var t1 = Environment.TickCount;
368
368
369 AsyncPool.RunThread(
369 AsyncPool.RunThread(
370 () => {
370 () => {
371 var buffer = new int[wBatch];
371 var buffer = new int[wBatch];
372 for(int i = 0; i<wBatch; i++)
372 for(int i = 0; i<wBatch; i++)
373 buffer[i] = 1;
373 buffer[i] = 1;
374
374
375 for(int i =0; i < wCount; i++)
375 for(int i =0; i < wCount; i++)
376 queue.EnqueueRange(buffer,0,wBatch);
376 queue.EnqueueRange(buffer,0,wBatch);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 },
378 },
379 () => {
379 () => {
380 var buffer = new int[wBatch];
380 var buffer = new int[wBatch];
381 for(int i = 0; i<wBatch; i++)
381 for(int i = 0; i<wBatch; i++)
382 buffer[i] = 2;
382 buffer[i] = 2;
383
383
384 for(int i =0; i < wCount; i++)
384 for(int i =0; i < wCount; i++)
385 queue.EnqueueRange(buffer,0,wBatch);
385 queue.EnqueueRange(buffer,0,wBatch);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 },
387 },
388 () => {
388 () => {
389 var buffer = new int[rBatch];
389 var buffer = new int[rBatch];
390
390
391 while(read < total) {
391 while(read < total) {
392 int actual;
392 int actual;
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 for(int i=0; i< actual; i++)
394 for(int i=0; i< actual; i++)
395 r1 += buffer[i];
395 r1 += buffer[i];
396 Interlocked.Add(ref read, actual);
396 Interlocked.Add(ref read, actual);
397 }
397 }
398 }
398 }
399
399
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 },
401 },
402 () => {
402 () => {
403 var buffer = new int[rBatch];
403 var buffer = new int[rBatch];
404
404
405 while(read < total) {
405 while(read < total) {
406 int actual;
406 int actual;
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 for(int i=0; i< actual; i++)
408 for(int i=0; i< actual; i++)
409 r2 += buffer[i];
409 r2 += buffer[i];
410 Interlocked.Add(ref read, actual);
410 Interlocked.Add(ref read, actual);
411 }
411 }
412 }
412 }
413
413
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 }
415 }
416 )
416 )
417 .Combine()
417 .Combine()
418 .Join();
418 .Join();
419
419
420 Assert.AreEqual(summ , r1 + r2);
420 Assert.AreEqual(summ , r1 + r2);
421
421
422 Console.WriteLine(
422 Console.WriteLine(
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 Environment.TickCount - t1,
424 Environment.TickCount - t1,
425 r1,
425 r1,
426 r2,
426 r2,
427 r1 + r2,
427 r1 + r2,
428 total
428 total
429 );
429 );
430 }
430 }
431
431
432 [TestMethod]
432 [TestMethod]
433 public void AsyncQueueChunkDequeueTest() {
434 var queue = new AsyncQueue<int>();
435
436 const int wBatch = 31;
437 const int wCount = 200000;
438 const int total = wBatch * wCount * 3;
439 const int summ = wBatch * wCount * 6;
440
441 int r1 = 0, r2 = 0;
442 const int rBatch = 1024;
443 int read = 0;
444
445 var t1 = Environment.TickCount;
446
447 AsyncPool.RunThread(
448 () => {
449 var buffer = new int[wBatch];
450 for(int i = 0; i<wBatch; i++)
451 buffer[i] = 1;
452
453 for(int i =0; i < wCount; i++)
454 queue.EnqueueRange(buffer,0,wBatch);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 },
457 () => {
458 var buffer = new int[wBatch];
459 for(int i = 0; i<wBatch; i++)
460 buffer[i] = 2;
461
462 for(int i =0; i < wCount; i++)
463 queue.EnqueueRange(buffer,0,wBatch);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 },
466 () => {
467 var buffer = new int[wBatch];
468 for(int i = 0; i<wBatch; i++)
469 buffer[i] = 3;
470
471 for(int i =0; i < wCount; i++)
472 queue.EnqueueRange(buffer,0,wBatch);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 },
475 () => {
476 var buffer = new int[rBatch];
477 int count = 1;
478 double avgchunk = 0;
479 while(read < total) {
480 int actual;
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 for(int i=0; i< actual; i++)
483 r2 += buffer[i];
484 Interlocked.Add(ref read, actual);
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 count ++;
487 }
488 }
489
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 }
492 )
493 .Combine()
494 .Join();
495
496 Assert.AreEqual(summ , r1 + r2);
497
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
501 r1,
502 r2,
503 r1 + r2,
504 total
505 );
506 }
507
508 [TestMethod]
433 public void ParallelMapTest() {
509 public void ParallelMapTest() {
434
510
435 const int count = 100000;
511 const int count = 100000;
436
512
437 var args = new double[count];
513 var args = new double[count];
438 var rand = new Random();
514 var rand = new Random();
439
515
440 for (int i = 0; i < count; i++)
516 for (int i = 0; i < count; i++)
441 args[i] = rand.NextDouble();
517 args[i] = rand.NextDouble();
442
518
443 var t = Environment.TickCount;
519 var t = Environment.TickCount;
444 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
520 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
445
521
446 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
522 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
447
523
448 t = Environment.TickCount;
524 t = Environment.TickCount;
449 for (int i = 0; i < count; i++)
525 for (int i = 0; i < count; i++)
450 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
526 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
451 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
527 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
452 }
528 }
453
529
454 [TestMethod]
530 [TestMethod]
455 public void ChainedMapTest() {
531 public void ChainedMapTest() {
456
532
457 using (var pool = new WorkerPool(0,10,1)) {
533 using (var pool = new WorkerPool(0,10,1)) {
458 const int count = 10000;
534 const int count = 10000;
459
535
460 var args = new double[count];
536 var args = new double[count];
461 var rand = new Random();
537 var rand = new Random();
462
538
463 for (int i = 0; i < count; i++)
539 for (int i = 0; i < count; i++)
464 args[i] = rand.NextDouble();
540 args[i] = rand.NextDouble();
465
541
466 var t = Environment.TickCount;
542 var t = Environment.TickCount;
467 var res = args
543 var res = args
468 .ChainedMap(
544 .ChainedMap(
469 // Analysis disable once AccessToDisposedClosure
545 // Analysis disable once AccessToDisposedClosure
470 x => pool.Invoke(
546 x => pool.Invoke(
471 () => Math.Sin(x * x)
547 () => Math.Sin(x * x)
472 ),
548 ),
473 4
549 4
474 )
550 )
475 .Join();
551 .Join();
476
552
477 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
553 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
478
554
479 t = Environment.TickCount;
555 t = Environment.TickCount;
480 for (int i = 0; i < count; i++)
556 for (int i = 0; i < count; i++)
481 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
557 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
482 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
558 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
483 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
559 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
484 }
560 }
485 }
561 }
486
562
487 [TestMethod]
563 [TestMethod]
488 public void ParallelForEachTest() {
564 public void ParallelForEachTest() {
489
565
490 const int count = 100000;
566 const int count = 100000;
491
567
492 var args = new int[count];
568 var args = new int[count];
493 var rand = new Random();
569 var rand = new Random();
494
570
495 for (int i = 0; i < count; i++)
571 for (int i = 0; i < count; i++)
496 args[i] = (int)(rand.NextDouble() * 100);
572 args[i] = (int)(rand.NextDouble() * 100);
497
573
498 int result = 0;
574 int result = 0;
499
575
500 var t = Environment.TickCount;
576 var t = Environment.TickCount;
501 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
577 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
502
578
503 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
579 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
504
580
505 int result2 = 0;
581 int result2 = 0;
506
582
507 t = Environment.TickCount;
583 t = Environment.TickCount;
508 for (int i = 0; i < count; i++)
584 for (int i = 0; i < count; i++)
509 result2 += args[i];
585 result2 += args[i];
510 Assert.AreEqual(result2, result);
586 Assert.AreEqual(result2, result);
511 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
587 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
512 }
588 }
513
589
514 [TestMethod]
590 [TestMethod]
515 public void ComplexCase1Test() {
591 public void ComplexCase1Test() {
516 var flags = new bool[3];
592 var flags = new bool[3];
517
593
518 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
594 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
519
595
520 var step1 = PromiseHelper
596 var step1 = PromiseHelper
521 .Sleep(200, "Alan")
597 .Sleep(200, "Alan")
522 .On(() => flags[0] = true, PromiseEventType.Cancelled);
598 .On(() => flags[0] = true, PromiseEventType.Cancelled);
523 var p = step1
599 var p = step1
524 .Chain(x =>
600 .Chain(x =>
525 PromiseHelper
601 PromiseHelper
526 .Sleep(200, "Hi, " + x)
602 .Sleep(200, "Hi, " + x)
527 .Then(y => y)
603 .Then(y => y)
528 .On(() => flags[1] = true, PromiseEventType.Cancelled)
604 .On(() => flags[1] = true, PromiseEventType.Cancelled)
529 )
605 )
530 .On(() => flags[2] = true, PromiseEventType.Cancelled);
606 .On(() => flags[2] = true, PromiseEventType.Cancelled);
531 step1.Join();
607 step1.Join();
532 p.Cancel();
608 p.Cancel();
533 try {
609 try {
534 Assert.AreEqual(p.Join(), "Hi, Alan");
610 Assert.AreEqual(p.Join(), "Hi, Alan");
535 Assert.Fail("Shouldn't get here");
611 Assert.Fail("Shouldn't get here");
536 } catch (OperationCanceledException) {
612 } catch (OperationCanceledException) {
537 }
613 }
538
614
539 Assert.IsFalse(flags[0]);
615 Assert.IsFalse(flags[0]);
540 Assert.IsTrue(flags[1]);
616 Assert.IsTrue(flags[1]);
541 Assert.IsTrue(flags[2]);
617 Assert.IsTrue(flags[2]);
542 }
618 }
543
619
544 [TestMethod]
620 [TestMethod]
545 public void ChainedCancel1Test() {
621 public void ChainedCancel1Test() {
546 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
622 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
547 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
623 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
548 var p = PromiseHelper
624 var p = PromiseHelper
549 .Sleep(1, "Hi, HAL!")
625 .Sleep(1, "Hi, HAL!")
550 .Then(x => {
626 .Then(x => {
551 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
627 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
552 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
628 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
553 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
629 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
554 PromiseHelper
630 PromiseHelper
555 .Sleep(100, "HAL, STOP!")
631 .Sleep(100, "HAL, STOP!")
556 .Then(result.Cancel);
632 .Then(result.Cancel);
557 return result;
633 return result;
558 });
634 });
559 try {
635 try {
560 p.Join();
636 p.Join();
561 } catch (TargetInvocationException err) {
637 } catch (TargetInvocationException err) {
562 Assert.IsTrue(err.InnerException is OperationCanceledException);
638 Assert.IsTrue(err.InnerException is OperationCanceledException);
563 }
639 }
564 }
640 }
565
641
566 [TestMethod]
642 [TestMethod]
567 public void ChainedCancel2Test() {
643 public void ChainedCancel2Test() {
568 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
644 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
569 var pSurvive = new Promise<bool>();
645 var pSurvive = new Promise<bool>();
570 var hemStarted = new ManualResetEvent(false);
646 var hemStarted = new ManualResetEvent(false);
571 var p = PromiseHelper
647 var p = PromiseHelper
572 .Sleep(1, "Hi, HAL!")
648 .Sleep(1, "Hi, HAL!")
573 .Chain(x => {
649 .Chain(x => {
574 hemStarted.Set();
650 hemStarted.Set();
575 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
651 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
576 var result = PromiseHelper
652 var result = PromiseHelper
577 .Sleep(100000000, "HEM ENABLED!!!")
653 .Sleep(100000000, "HEM ENABLED!!!")
578 .Then(s => {
654 .Then(s => {
579 pSurvive.Resolve(false);
655 pSurvive.Resolve(false);
580 return s;
656 return s;
581 });
657 });
582
658
583 result
659 result
584 .Cancelled(() => pSurvive.Resolve(true));
660 .Cancelled(() => pSurvive.Resolve(true));
585
661
586 return result;
662 return result;
587 });
663 });
588
664
589 hemStarted.WaitOne();
665 hemStarted.WaitOne();
590 p.Cancel();
666 p.Cancel();
591
667
592 try {
668 try {
593 p.Join();
669 p.Join();
594 } catch (OperationCanceledException) {
670 } catch (OperationCanceledException) {
595 Assert.IsTrue(pSurvive.Join());
671 Assert.IsTrue(pSurvive.Join());
596 }
672 }
597 }
673 }
598 }
674 }
599 }
675 }
600
676
@@ -1,474 +1,463
1 using System.Threading;
1 using System.Threading;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System;
3 using System;
4 using System.Collections;
4 using System.Collections;
5
5
6 namespace Implab.Parallels {
6 namespace Implab.Parallels {
7 public class AsyncQueue<T> : IEnumerable<T> {
7 public class AsyncQueue<T> : IEnumerable<T> {
8 class Chunk {
8 class Chunk {
9 public Chunk next;
9 public Chunk next;
10
10
11 int m_low;
11 int m_low;
12 int m_hi;
12 int m_hi;
13 int m_alloc;
13 int m_alloc;
14 readonly int m_size;
14 readonly int m_size;
15 readonly T[] m_data;
15 readonly T[] m_data;
16
16
17 public Chunk(int size) {
17 public Chunk(int size) {
18 m_size = size;
18 m_size = size;
19 m_data = new T[size];
19 m_data = new T[size];
20 }
20 }
21
21
22 public Chunk(int size, T value) {
22 public Chunk(int size, T value) {
23 m_size = size;
23 m_size = size;
24 m_hi = 1;
24 m_hi = 1;
25 m_alloc = 1;
25 m_alloc = 1;
26 m_data = new T[size];
26 m_data = new T[size];
27 m_data[0] = value;
27 m_data[0] = value;
28 }
28 }
29
29
30 public Chunk(int size, T[] data, int offset, int length, int alloc) {
30 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 m_size = size;
31 m_size = size;
32 m_hi = length;
32 m_hi = length;
33 m_alloc = alloc;
33 m_alloc = alloc;
34 m_data = new T[size];
34 m_data = new T[size];
35 Array.Copy(data, offset, m_data, 0, length);
35 Array.Copy(data, offset, m_data, 0, length);
36 }
36 }
37
37
38 public int Low {
38 public int Low {
39 get { return m_low; }
39 get { return m_low; }
40 }
40 }
41
41
42 public int Hi {
42 public int Hi {
43 get { return m_hi; }
43 get { return m_hi; }
44 }
44 }
45
45
46 public bool TryEnqueue(T value, out bool extend) {
46 public bool TryEnqueue(T value, out bool extend) {
47 var alloc = Interlocked.Increment(ref m_alloc) - 1;
47 var alloc = Interlocked.Increment(ref m_alloc) - 1;
48
48
49 if (alloc >= m_size) {
49 if (alloc >= m_size) {
50 extend = alloc == m_size;
50 extend = alloc == m_size;
51 return false;
51 return false;
52 }
52 }
53
53
54 extend = false;
54 extend = false;
55 m_data[alloc] = value;
55 m_data[alloc] = value;
56
56
57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
58 // spin wait for commit
58 // spin wait for commit
59 }
59 }
60 return true;
60 return true;
61 }
61 }
62
62
63 public bool TryDequeue(out T value, out bool recycle) {
63 public bool TryDequeue(out T value, out bool recycle) {
64 int low;
64 int low;
65 do {
65 do {
66 low = m_low;
66 low = m_low;
67 if (low >= m_hi) {
67 if (low >= m_hi) {
68 value = default(T);
68 value = default(T);
69 recycle = (low == m_size);
69 recycle = (low == m_size);
70 return false;
70 return false;
71 }
71 }
72 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
72 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
73
73
74 recycle = (low == m_size - 1);
74 recycle = (low == m_size - 1);
75 value = m_data[low];
75 value = m_data[low];
76
76
77 return true;
77 return true;
78 }
78 }
79
79
80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
81 int alloc;
81 //int alloc;
82 int allocSize;
82 //int allocSize;
83
84 // in case the batch size is larger than a free space in chunk
85 // tailGap is used to over allocate the space in the chunk to
86 // get exclusive permission on creation of the next one.
87 int tailGap = 0;
88
89 do {
90 alloc = m_alloc;
91
83
92 if (alloc > m_size) {
84 var alloc = Interlocked.Add(ref m_alloc, length) - length;
93 // the chunk is full and someone already
85 if (alloc > m_size) {
94 // creating the new one
86 // the chunk is full and someone already
95 enqueued = 0; // nothing was added
87 // creating the new one
96 extend = false; // the caller shouldn't try to extend the queue
88 enqueued = 0; // nothing was added
97 return false; // nothing was added
89 extend = false; // the caller shouldn't try to extend the queue
98 }
90 return false; // nothing was added
91 }
99
92
100 allocSize = Math.Min(m_size - alloc, length);
93 enqueued = Math.Min(m_size - alloc, length);
101 if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
94 extend = length > enqueued;
102 tailGap = 1; // overallocate space to get exclusive permission to extend queue
103 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
104
105 extend = tailGap != 0;
106 enqueued = allocSize;
107
95
108 // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
96 if (enqueued == 0)
109 if (alloc == m_size)
110 return false;
97 return false;
111
98
112 Array.Copy(batch, offset, m_data, alloc, allocSize);
113
99
114 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
100 Array.Copy(batch, offset, m_data, alloc, enqueued);
101
102 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
115 // spin wait for commit
103 // spin wait for commit
116 }
104 }
105
117 return true;
106 return true;
118 }
107 }
119
108
120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
109 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
121 int low, hi, batchSize;
110 int low, hi, batchSize;
122
111
123 do {
112 do {
124 low = m_low;
113 low = m_low;
125 hi = m_hi;
114 hi = m_hi;
126 if (low >= hi) {
115 if (low >= hi) {
127 dequeued = 0;
116 dequeued = 0;
128 recycle = (low == m_size); // recycling could be restarted and we need to signal again
117 recycle = (low == m_size); // recycling could be restarted and we need to signal again
129 return false;
118 return false;
130 }
119 }
131 batchSize = Math.Min(hi - low, length);
120 batchSize = Math.Min(hi - low, length);
132 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
121 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
133
122
134 recycle = (low == m_size - batchSize);
123 recycle = (low == m_size - batchSize);
135 dequeued = batchSize;
124 dequeued = batchSize;
136
125
137 Array.Copy(m_data, low, buffer, offset, batchSize);
126 Array.Copy(m_data, low, buffer, offset, batchSize);
138
127
139 return true;
128 return true;
140 }
129 }
141
130
142 public T GetAt(int pos) {
131 public T GetAt(int pos) {
143 return m_data[pos];
132 return m_data[pos];
144 }
133 }
145 }
134 }
146
135
147 public const int DEFAULT_CHUNK_SIZE = 32;
136 public const int DEFAULT_CHUNK_SIZE = 32;
148 public const int MAX_CHUNK_SIZE = 262144;
137 public const int MAX_CHUNK_SIZE = 262144;
149
138
150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
139 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
151
140
152 Chunk m_first;
141 Chunk m_first;
153 Chunk m_last;
142 Chunk m_last;
154
143
155 public AsyncQueue() {
144 public AsyncQueue() {
156 m_last = m_first = new Chunk(m_chunkSize);
145 m_last = m_first = new Chunk(m_chunkSize);
157 }
146 }
158
147
159 /// <summary>
148 /// <summary>
160 /// Adds the specified value to the queue.
149 /// Adds the specified value to the queue.
161 /// </summary>
150 /// </summary>
162 /// <param name="value">Tha value which will be added to the queue.</param>
151 /// <param name="value">Tha value which will be added to the queue.</param>
163 public void Enqueue(T value) {
152 public void Enqueue(T value) {
164 var last = m_last;
153 var last = m_last;
165 // spin wait to the new chunk
154 // spin wait to the new chunk
166 bool extend = true;
155 bool extend = true;
167 while (last == null || !last.TryEnqueue(value, out extend)) {
156 while (last == null || !last.TryEnqueue(value, out extend)) {
168 // try to extend queue
157 // try to extend queue
169 if (extend || last == null) {
158 if (extend || last == null) {
170 var chunk = new Chunk(m_chunkSize, value);
159 var chunk = new Chunk(m_chunkSize, value);
171 if (EnqueueChunk(last, chunk))
160 if (EnqueueChunk(last, chunk))
172 break;
161 break; // success! exit!
173 last = m_last;
162 last = m_last;
174 } else {
163 } else {
175 while (last == m_last) {
164 while (last == m_last) {
176 Thread.MemoryBarrier();
165 Thread.MemoryBarrier();
177 }
166 }
178 last = m_last;
167 last = m_last;
179 }
168 }
180 }
169 }
181 }
170 }
182
171
183 /// <summary>
172 /// <summary>
184 /// Adds the specified data to the queue.
173 /// Adds the specified data to the queue.
185 /// </summary>
174 /// </summary>
186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
175 /// <param name="data">The buffer which contains the data to be enqueued.</param>
187 /// <param name="offset">The offset of the data in the buffer.</param>
176 /// <param name="offset">The offset of the data in the buffer.</param>
188 /// <param name="length">The size of the data to read from the buffer.</param>
177 /// <param name="length">The size of the data to read from the buffer.</param>
189 public void EnqueueRange(T[] data, int offset, int length) {
178 public void EnqueueRange(T[] data, int offset, int length) {
190 if (data == null)
179 if (data == null)
191 throw new ArgumentNullException("data");
180 throw new ArgumentNullException("data");
192 if (offset < 0)
181 if (offset < 0)
193 throw new ArgumentOutOfRangeException("offset");
182 throw new ArgumentOutOfRangeException("offset");
194 if (length < 1 || offset + length > data.Length)
183 if (length < 1 || offset + length > data.Length)
195 throw new ArgumentOutOfRangeException("length");
184 throw new ArgumentOutOfRangeException("length");
196
185
197 var last = m_last;
186 var last = m_last;
198
187
199 bool extend;
188 bool extend;
200 int enqueued;
189 int enqueued;
201
190
202 while (length > 0) {
191 while (length > 0) {
203 extend = true;
192 extend = true;
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
193 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
205 length -= enqueued;
194 length -= enqueued;
206 offset += enqueued;
195 offset += enqueued;
207 }
196 }
208
197
209 if (extend) {
198 if (extend) {
210 // there was no enough space in the chunk
199 // there was no enough space in the chunk
211 // or there was no chunks in the queue
200 // or there was no chunks in the queue
212
201
213 while (length > 0) {
202 while (length > 0) {
214
203
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
204 var size = Math.Min(length, MAX_CHUNK_SIZE);
216
205
217 var chunk = new Chunk(
206 var chunk = new Chunk(
218 Math.Max(size, m_chunkSize),
207 Math.Max(size, m_chunkSize),
219 data,
208 data,
220 offset,
209 offset,
221 size,
210 size,
222 length // length >= size
211 length // length >= size
223 );
212 );
224
213
225 if (!EnqueueChunk(last, chunk)) {
214 if (!EnqueueChunk(last, chunk)) {
226 // looks like the queue has been updated then proceed from the beginning
215 // looks like the queue has been updated then proceed from the beginning
227 last = m_last;
216 last = m_last;
228 break;
217 break;
229 }
218 }
230
219
231 // we have successfully added the new chunk
220 // we have successfully added the new chunk
232 last = chunk;
221 last = chunk;
233 length -= size;
222 length -= size;
234 offset += size;
223 offset += size;
235 }
224 }
236 } else {
225 } else {
237 // we don't need to extend the queue, if we successfully enqueued data
226 // we don't need to extend the queue, if we successfully enqueued data
238 if (length == 0)
227 if (length == 0)
239 break;
228 break;
240
229
241 // if we need to wait while someone is extending the queue
230 // if we need to wait while someone is extending the queue
242 // spinwait
231 // spinwait
243 while (last == m_last) {
232 while (last == m_last) {
244 Thread.MemoryBarrier();
233 Thread.MemoryBarrier();
245 }
234 }
246
235
247 last = m_last;
236 last = m_last;
248 }
237 }
249 }
238 }
250 }
239 }
251
240
252 /// <summary>
241 /// <summary>
253 /// Tries to retrieve the first element from the queue.
242 /// Tries to retrieve the first element from the queue.
254 /// </summary>
243 /// </summary>
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
244 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
256 /// <param name="value">The value of the dequeued element.</param>
245 /// <param name="value">The value of the dequeued element.</param>
257 public bool TryDequeue(out T value) {
246 public bool TryDequeue(out T value) {
258 var chunk = m_first;
247 var chunk = m_first;
259 bool recycle;
248 bool recycle;
260 while (chunk != null) {
249 while (chunk != null) {
261
250
262 var result = chunk.TryDequeue(out value, out recycle);
251 var result = chunk.TryDequeue(out value, out recycle);
263
252
264 if (recycle) // this chunk is waste
253 if (recycle) // this chunk is waste
265 RecycleFirstChunk(chunk);
254 RecycleFirstChunk(chunk);
266 else
255 else
267 return result; // this chunk is usable and returned actual result
256 return result; // this chunk is usable and returned actual result
268
257
269 if (result) // this chunk is waste but the true result is always actual
258 if (result) // this chunk is waste but the true result is always actual
270 return true;
259 return true;
271
260
272 // try again
261 // try again
273 chunk = m_first;
262 chunk = m_first;
274 }
263 }
275
264
276 // the queue is empty
265 // the queue is empty
277 value = default(T);
266 value = default(T);
278 return false;
267 return false;
279 }
268 }
280
269
281 /// <summary>
270 /// <summary>
282 /// Tries to dequeue the specified amount of data from the queue.
271 /// Tries to dequeue the specified amount of data from the queue.
283 /// </summary>
272 /// </summary>
284 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
273 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
285 /// <param name="buffer">The buffer to which the data will be written.</param>
274 /// <param name="buffer">The buffer to which the data will be written.</param>
286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
275 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
287 /// <param name="length">The maximum amount of data to be retrieved.</param>
276 /// <param name="length">The maximum amount of data to be retrieved.</param>
288 /// <param name="dequeued">The actual amout of the retrieved data.</param>
277 /// <param name="dequeued">The actual amout of the retrieved data.</param>
289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
278 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
290 if (buffer == null)
279 if (buffer == null)
291 throw new ArgumentNullException("buffer");
280 throw new ArgumentNullException("buffer");
292 if (offset < 0)
281 if (offset < 0)
293 throw new ArgumentOutOfRangeException("offset");
282 throw new ArgumentOutOfRangeException("offset");
294 if (length < 1 || offset + length > buffer.Length)
283 if (length < 1 || offset + length > buffer.Length)
295 throw new ArgumentOutOfRangeException("length");
284 throw new ArgumentOutOfRangeException("length");
296
285
297 var chunk = m_first;
286 var chunk = m_first;
298 bool recycle;
287 bool recycle;
299 dequeued = 0;
288 dequeued = 0;
300 while (chunk != null) {
289 while (chunk != null) {
301
290
302 int actual;
291 int actual;
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
292 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 offset += actual;
293 offset += actual;
305 length -= actual;
294 length -= actual;
306 dequeued += actual;
295 dequeued += actual;
307 }
296 }
308
297
309 if (recycle) // this chunk is waste
298 if (recycle) // this chunk is waste
310 RecycleFirstChunk(chunk);
299 RecycleFirstChunk(chunk);
311 else if (actual == 0)
300 else if (actual == 0)
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
301 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
313
302
314 if (length == 0)
303 if (length == 0)
315 return true;
304 return true;
316
305
317 // we still may dequeue something
306 // we still may dequeue something
318 // try again
307 // try again
319 chunk = m_first;
308 chunk = m_first;
320 }
309 }
321
310
322 return dequeued != 0;
311 return dequeued != 0;
323 }
312 }
324
313
325 /// <summary>
314 /// <summary>
326 /// Tries to dequeue all remaining data in the first chunk.
315 /// Tries to dequeue all remaining data in the first chunk.
327 /// </summary>
316 /// </summary>
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
317 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 /// <param name="buffer">The buffer to which data will be written.</param>
318 /// <param name="buffer">The buffer to which the data will be written.</param>
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
319 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
320 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
321 /// <param name="dequeued">The actual amount of the dequeued data.</param>
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
322 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
334 if (buffer == null)
323 if (buffer == null)
335 throw new ArgumentNullException("buffer");
324 throw new ArgumentNullException("buffer");
336 if (offset < 0)
325 if (offset < 0)
337 throw new ArgumentOutOfRangeException("offset");
326 throw new ArgumentOutOfRangeException("offset");
338 if (length < 1 || offset + length > buffer.Length)
327 if (length < 1 || offset + length > buffer.Length)
339 throw new ArgumentOutOfRangeException("length");
328 throw new ArgumentOutOfRangeException("length");
340
329
341 var chunk = m_first;
330 var chunk = m_first;
342 bool recycle;
331 bool recycle;
343 dequeued = 0;
332 dequeued = 0;
344
333
345 while (chunk != null) {
334 while (chunk != null) {
346
335
347 int actual;
336 int actual;
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
337 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
349 dequeued = actual;
338 dequeued = actual;
350 }
339 }
351
340
352 if (recycle) // this chunk is waste
341 if (recycle) // this chunk is waste
353 RecycleFirstChunk(chunk);
342 RecycleFirstChunk(chunk);
354
343
355 // if we have dequeued any data, then return
344 // if we have dequeued any data, then return
356 if (dequeued != 0)
345 if (dequeued != 0)
357 return true;
346 return true;
358
347
359 // we still may dequeue something
348 // we still may dequeue something
360 // try again
349 // try again
361 chunk = m_first;
350 chunk = m_first;
362 }
351 }
363
352
364 return false;
353 return false;
365 }
354 }
366
355
367 bool EnqueueChunk(Chunk last, Chunk chunk) {
356 bool EnqueueChunk(Chunk last, Chunk chunk) {
368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
357 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
369 return false;
358 return false;
370
359
371 if (last != null)
360 if (last != null)
372 last.next = chunk;
361 last.next = chunk;
373 else
362 else
374 m_first = chunk;
363 m_first = chunk;
375 return true;
364 return true;
376 }
365 }
377
366
378 void RecycleFirstChunk(Chunk first) {
367 void RecycleFirstChunk(Chunk first) {
379 var next = first.next;
368 var next = first.next;
380
369
381 if (next == null) {
370 if (next == null) {
382 // looks like this is the last chunk
371 // looks like this is the last chunk
383 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
372 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
384 // race
373 // race
385 // maybe someone already recycled this chunk
374 // maybe someone already recycled this chunk
386 // or a new chunk has been appedned to the queue
375 // or a new chunk has been appedned to the queue
387
376
388 return; // give up
377 return; // give up
389 }
378 }
390 // the tail is updated
379 // the tail is updated
391 }
380 }
392
381
393 // we need to update the head
382 // we need to update the head
394 Interlocked.CompareExchange(ref m_first, next, first);
383 Interlocked.CompareExchange(ref m_first, next, first);
395 // if the head is already updated then give up
384 // if the head is already updated then give up
396 return;
385 return;
397
386
398 }
387 }
399
388
400 #region IEnumerable implementation
389 #region IEnumerable implementation
401
390
402 class Enumerator : IEnumerator<T> {
391 class Enumerator : IEnumerator<T> {
403 Chunk m_current;
392 Chunk m_current;
404 int m_pos = -1;
393 int m_pos = -1;
405
394
406 public Enumerator(Chunk fisrt) {
395 public Enumerator(Chunk fisrt) {
407 m_current = fisrt;
396 m_current = fisrt;
408 }
397 }
409
398
410 #region IEnumerator implementation
399 #region IEnumerator implementation
411
400
412 public bool MoveNext() {
401 public bool MoveNext() {
413 if (m_current == null)
402 if (m_current == null)
414 return false;
403 return false;
415
404
416 if (m_pos == -1)
405 if (m_pos == -1)
417 m_pos = m_current.Low;
406 m_pos = m_current.Low;
418 else
407 else
419 m_pos++;
408 m_pos++;
420 if (m_pos == m_current.Hi) {
409 if (m_pos == m_current.Hi) {
421 m_pos = 0;
410 m_pos = 0;
422 m_current = m_current.next;
411 m_current = m_current.next;
423 }
412 }
424
413
425 return true;
414 return true;
426 }
415 }
427
416
428 public void Reset() {
417 public void Reset() {
429 throw new NotSupportedException();
418 throw new NotSupportedException();
430 }
419 }
431
420
432 object IEnumerator.Current {
421 object IEnumerator.Current {
433 get {
422 get {
434 return Current;
423 return Current;
435 }
424 }
436 }
425 }
437
426
438 #endregion
427 #endregion
439
428
440 #region IDisposable implementation
429 #region IDisposable implementation
441
430
442 public void Dispose() {
431 public void Dispose() {
443 }
432 }
444
433
445 #endregion
434 #endregion
446
435
447 #region IEnumerator implementation
436 #region IEnumerator implementation
448
437
449 public T Current {
438 public T Current {
450 get {
439 get {
451 if (m_pos == -1 || m_current == null)
440 if (m_pos == -1 || m_current == null)
452 throw new InvalidOperationException();
441 throw new InvalidOperationException();
453 return m_current.GetAt(m_pos);
442 return m_current.GetAt(m_pos);
454 }
443 }
455 }
444 }
456
445
457 #endregion
446 #endregion
458 }
447 }
459
448
460 public IEnumerator<T> GetEnumerator() {
449 public IEnumerator<T> GetEnumerator() {
461 return new Enumerator(m_first);
450 return new Enumerator(m_first);
462 }
451 }
463
452
464 #endregion
453 #endregion
465
454
466 #region IEnumerable implementation
455 #region IEnumerable implementation
467
456
468 IEnumerator IEnumerable.GetEnumerator() {
457 IEnumerator IEnumerable.GetEnumerator() {
469 return GetEnumerator();
458 return GetEnumerator();
470 }
459 }
471
460
472 #endregion
461 #endregion
473 }
462 }
474 }
463 }
@@ -1,120 +1,120
1 using System;
1 using System;
2 using Implab.Diagnostics;
2 using Implab.Diagnostics;
3 using Implab.Parallels;
3 using Implab.Parallels;
4 using Implab;
4 using Implab;
5 using System.Collections.Generic;
5 using System.Collections.Generic;
6 using System.Collections.Concurrent;
6 using System.Collections.Concurrent;
7
7
8 namespace MonoPlay {
8 namespace MonoPlay {
9 class MainClass {
9 class MainClass {
10 public static void Main(string[] args) {
10 public static void Main(string[] args) {
11 if (args == null)
11 if (args == null)
12 throw new ArgumentNullException("args");
12 throw new ArgumentNullException("args");
13
13
14 var q1 = new AsyncQueue<int>();
14 var q1 = new AsyncQueue<int>();
15 var q2 = new Queue<int>();
15 var q2 = new Queue<int>();
16
16
17 const int count = 10000000;
17 const int count = 10000000;
18
18
19 int res1 = 0, res2 = 0;
19 int res1 = 0, res2 = 0;
20 var t1 = Environment.TickCount;
20 var t1 = Environment.TickCount;
21
21
22 AsyncPool.RunThread(
22 AsyncPool.RunThread(
23 () => {
23 () => {
24 for (var i = 0; i < count; i++)
24 for (var i = 0; i < count; i++)
25 q1.Enqueue(1);
25 q1.Enqueue(1);
26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
27 },
27 },
28 () => {
28 () => {
29 for (var i = 0; i < count; i++)
29 for (var i = 0; i < count; i++)
30 q1.Enqueue(2);
30 q1.Enqueue(2);
31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
32 },
32 },
33 () => {
33 () => {
34 int temp = 0;
34 int temp = 0;
35 int i = 0;
35 int i = 0;
36 while (i < count)
36 while (i < count)
37 if (q1.TryDequeue(out temp)) {
37 if (q1.TryDequeue(out temp)) {
38 i++;
38 i++;
39 res1 += temp;
39 res1 += temp;
40 }
40 }
41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
42 },
42 },
43 () => {
43 () => {
44 int temp = 0;
44 int temp = 0;
45 int i = 0;
45 int i = 0;
46 while (i < count)
46 while (i < count)
47 if (q1.TryDequeue(out temp)) {
47 if (q1.TryDequeue(out temp)) {
48 i++;
48 i++;
49 res2 += temp;
49 res2 += temp;
50 }
50 }
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
52 }
52 }
53 )
53 )
54 .Combine()
54 .Combine()
55 .Join();
55 .Join();
56
56
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2);
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
58
58
59 var t2 = Environment.TickCount;
59 var t2 = Environment.TickCount;
60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
61
61
62 t1 = Environment.TickCount;
62 t1 = Environment.TickCount;
63
63
64 for (var i = 0; i < count * 2; i++)
64 for (var i = 0; i < count * 2; i++)
65 q2.Enqueue(i);
65 q2.Enqueue(i);
66
66
67 for (var i = 0; i < count * 2; i++)
67 for (var i = 0; i < count * 2; i++)
68 q2.Dequeue();
68 q2.Dequeue();
69
69
70 t2 = Environment.TickCount;
70 t2 = Environment.TickCount;
71 Console.WriteLine("Queue: {0} ms", t2 - t1);
71 Console.WriteLine("Queue: {0} ms", t2 - t1);
72
72
73 q2 = new Queue<int>();
73 q2 = new Queue<int>();
74
74
75 t1 = Environment.TickCount;
75 t1 = Environment.TickCount;
76
76
77
77
78 AsyncPool.RunThread(
78 AsyncPool.RunThread(
79 () => {
79 () => {
80 for (var i = 0; i < count; i++)
80 for (var i = 0; i < count; i++)
81 lock (q2)
81 lock (q2)
82 q2.Enqueue(i);
82 q2.Enqueue(i);
83 },
83 },
84 () => {
84 () => {
85 for (var i = 0; i < count; i++)
85 for (var i = 0; i < count; i++)
86 lock (q2)
86 lock (q2)
87 q2.Enqueue(i);
87 q2.Enqueue(i);
88 },
88 },
89 () => {
89 () => {
90 for (int i = 0; i < count ;)
90 for (int i = 0; i < count ;)
91 lock (q2) {
91 lock (q2) {
92 if (q2.Count == 0)
92 if (q2.Count == 0)
93 continue;
93 continue;
94 q2.Dequeue();
94 q2.Dequeue();
95 i++;
95 i++;
96 }
96 }
97
97
98 },
98 },
99 () => {
99 () => {
100 for (int i = 0; i < count ;)
100 for (int i = 0; i < count ;)
101 lock (q2) {
101 lock (q2) {
102 if (q2.Count == 0)
102 if (q2.Count == 0)
103 continue;
103 continue;
104 q2.Dequeue();
104 q2.Dequeue();
105 i++;
105 i++;
106 }
106 }
107
107
108 }
108 }
109 )
109 )
110 .Combine()
110 .Combine()
111 .Join();
111 .Join();
112
112
113
113
114
114
115 t2 = Environment.TickCount;
115 t2 = Environment.TickCount;
116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
117
117
118 }
118 }
119 }
119 }
120 }
120 }
General Comments 0
You need to be logged in to leave comments. Login now