##// END OF EJS Templates
improved performance of promises
cin -
r125:f803565868a4 v2
parent child
Show More
@@ -1,779 +1,779
1 using System;
1 using System;
2 using System.Reflection;
2 using System.Reflection;
3 using System.Threading;
3 using System.Threading;
4 using Implab.Parallels;
4 using Implab.Parallels;
5
5
6 #if MONO
6 #if MONO
7
7
8 using NUnit.Framework;
8 using NUnit.Framework;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
11
11
12 #else
12 #else
13
13
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15
15
16 #endif
16 #endif
17
17
18 namespace Implab.Test {
18 namespace Implab.Test {
19 [TestClass]
19 [TestClass]
20 public class AsyncTests {
20 public class AsyncTests {
21 [TestMethod]
21 [TestMethod]
22 public void ResolveTest() {
22 public void ResolveTest() {
23 int res = -1;
23 int res = -1;
24 var p = new Promise<int>();
24 var p = new Promise<int>();
25 p.Then(x => res = x);
25 p.Then(x => res = x);
26 p.Resolve(100);
26 p.Resolve(100);
27
27
28 Assert.AreEqual(100, res);
28 Assert.AreEqual(100, res);
29 }
29 }
30
30
31 [TestMethod]
31 [TestMethod]
32 public void RejectTest() {
32 public void RejectTest() {
33 int res = -1;
33 int res = -1;
34 Exception err = null;
34 Exception err = null;
35
35
36 var p = new Promise<int>();
36 var p = new Promise<int>();
37 p.Then(
37 p.Then(
38 x => res = x,
38 x => res = x,
39 e => {
39 e => {
40 err = e;
40 err = e;
41 return -2;
41 return -2;
42 }
42 }
43 );
43 );
44 p.Reject(new ApplicationException("error"));
44 p.Reject(new ApplicationException("error"));
45
45
46 Assert.AreEqual(res, -1);
46 Assert.AreEqual(res, -1);
47 Assert.AreEqual(err.Message, "error");
47 Assert.AreEqual(err.Message, "error");
48
48
49 }
49 }
50
50
51 [TestMethod]
51 [TestMethod]
52 public void CancelExceptionTest() {
52 public void CancelExceptionTest() {
53 var p = new Promise<bool>();
53 var p = new Promise<bool>();
54 p.Cancel();
54 p.Cancel();
55
55
56 var p2 = p.Cancelled(() => {
56 var p2 = p.Cancelled(() => {
57 throw new ApplicationException("CANCELLED");
57 throw new ApplicationException("CANCELLED");
58 });
58 });
59
59
60 try {
60 try {
61 p2.Join();
61 p2.Join();
62 Assert.Fail();
62 Assert.Fail();
63 } catch (ApplicationException err) {
63 } catch (ApplicationException err) {
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 }
65 }
66
66
67 }
67 }
68
68
69 [TestMethod]
69 [TestMethod]
70 public void ContinueOnCancelTest() {
70 public void ContinueOnCancelTest() {
71 var p = new Promise<bool>();
71 var p = new Promise<bool>();
72 p.Cancel();
72 p.Cancel();
73
73
74 var p2 = p
74 var p2 = p
75 .Cancelled<bool>(() => {
75 .Cancelled<bool>(() => {
76 throw new ApplicationException("CANCELLED");
76 throw new ApplicationException("CANCELLED");
77 })
77 })
78 .Error(e => true);
78 .Error(e => true);
79
79
80 Assert.AreEqual(true, p2.Join());
80 Assert.AreEqual(true, p2.Join());
81 }
81 }
82
82
83 [TestMethod]
83 [TestMethod]
84 public void JoinSuccessTest() {
84 public void JoinSuccessTest() {
85 var p = new Promise<int>();
85 var p = new Promise<int>();
86 p.Resolve(100);
86 p.Resolve(100);
87 Assert.AreEqual(p.Join(), 100);
87 Assert.AreEqual(p.Join(), 100);
88 }
88 }
89
89
90 [TestMethod]
90 [TestMethod]
91 public void JoinFailTest() {
91 public void JoinFailTest() {
92 var p = new Promise<int>();
92 var p = new Promise<int>();
93 p.Reject(new ApplicationException("failed"));
93 p.Reject(new ApplicationException("failed"));
94
94
95 try {
95 try {
96 p.Join();
96 p.Join();
97 throw new ApplicationException("WRONG!");
97 throw new ApplicationException("WRONG!");
98 } catch (TargetInvocationException err) {
98 } catch (TargetInvocationException err) {
99 Assert.AreEqual(err.InnerException.Message, "failed");
99 Assert.AreEqual(err.InnerException.Message, "failed");
100 } catch {
100 } catch {
101 Assert.Fail("Got wrong excaption");
101 Assert.Fail("Got wrong excaption");
102 }
102 }
103 }
103 }
104
104
105 [TestMethod]
105 [TestMethod]
106 public void MapTest() {
106 public void MapTest() {
107 var p = new Promise<int>();
107 var p = new Promise<int>();
108
108
109 var p2 = p.Then(x => x.ToString());
109 var p2 = p.Then(x => x.ToString());
110 p.Resolve(100);
110 p.Resolve(100);
111
111
112 Assert.AreEqual(p2.Join(), "100");
112 Assert.AreEqual(p2.Join(), "100");
113 }
113 }
114
114
115 [TestMethod]
115 [TestMethod]
116 public void FixErrorTest() {
116 public void FixErrorTest() {
117 var p = new Promise<int>();
117 var p = new Promise<int>();
118
118
119 var p2 = p.Error(e => 101);
119 var p2 = p.Error(e => 101);
120
120
121 p.Reject(new Exception());
121 p.Reject(new Exception());
122
122
123 Assert.AreEqual(p2.Join(), 101);
123 Assert.AreEqual(p2.Join(), 101);
124 }
124 }
125
125
126 [TestMethod]
126 [TestMethod]
127 public void ChainTest() {
127 public void ChainTest() {
128 var p1 = new Promise<int>();
128 var p1 = new Promise<int>();
129
129
130 var p3 = p1.Chain(x => {
130 var p3 = p1.Chain(x => {
131 var p2 = new Promise<string>();
131 var p2 = new Promise<string>();
132 p2.Resolve(x.ToString());
132 p2.Resolve(x.ToString());
133 return p2;
133 return p2;
134 });
134 });
135
135
136 p1.Resolve(100);
136 p1.Resolve(100);
137
137
138 Assert.AreEqual(p3.Join(), "100");
138 Assert.AreEqual(p3.Join(), "100");
139 }
139 }
140
140
141 [TestMethod]
141 [TestMethod]
142 public void ChainFailTest() {
142 public void ChainFailTest() {
143 var p1 = new Promise<int>();
143 var p1 = new Promise<int>();
144
144
145 var p3 = p1.Chain(x => {
145 var p3 = p1.Chain(x => {
146 var p2 = new Promise<string>();
146 var p2 = new Promise<string>();
147 p2.Reject(new Exception("DIE!!!"));
147 p2.Reject(new Exception("DIE!!!"));
148 return p2;
148 return p2;
149 });
149 });
150
150
151 p1.Resolve(100);
151 p1.Resolve(100);
152
152
153 Assert.IsTrue(p3.IsResolved);
153 Assert.IsTrue(p3.IsResolved);
154 }
154 }
155
155
156 [TestMethod]
156 [TestMethod]
157 public void PoolTest() {
157 public void PoolTest() {
158 var pid = Thread.CurrentThread.ManagedThreadId;
158 var pid = Thread.CurrentThread.ManagedThreadId;
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160
160
161 Assert.AreNotEqual(pid, p.Join());
161 Assert.AreNotEqual(pid, p.Join());
162 }
162 }
163
163
164 [TestMethod]
164 [TestMethod]
165 public void WorkerPoolSizeTest() {
165 public void WorkerPoolSizeTest() {
166 var pool = new WorkerPool(5, 10, 1);
166 var pool = new WorkerPool(5, 10, 1);
167
167
168 Assert.AreEqual(5, pool.PoolSize);
168 Assert.AreEqual(5, pool.PoolSize);
169
169
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173
173
174 Assert.AreEqual(5, pool.PoolSize);
174 Assert.AreEqual(5, pool.PoolSize);
175
175
176 for (int i = 0; i < 100; i++)
176 for (int i = 0; i < 100; i++)
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 Thread.Sleep(200);
178 Thread.Sleep(200);
179 Assert.AreEqual(10, pool.PoolSize);
179 Assert.AreEqual(10, pool.PoolSize);
180
180
181 pool.Dispose();
181 pool.Dispose();
182 }
182 }
183
183
184 [TestMethod]
184 [TestMethod]
185 public void WorkerPoolCorrectTest() {
185 public void WorkerPoolCorrectTest() {
186 var pool = new WorkerPool(0,1000,100);
186 var pool = new WorkerPool(0,1000,100);
187
187
188 const int iterations = 1000;
188 const int iterations = 1000;
189 int pending = iterations;
189 int pending = iterations;
190 var stop = new ManualResetEvent(false);
190 var stop = new ManualResetEvent(false);
191
191
192 var count = 0;
192 var count = 0;
193 for (int i = 0; i < iterations; i++) {
193 for (int i = 0; i < iterations; i++) {
194 pool
194 pool
195 .Invoke(() => 1)
195 .Invoke(() => 1)
196 .Then(x => Interlocked.Add(ref count, x))
196 .Then(x => Interlocked.Add(ref count, x))
197 .Then(x => Math.Log10(x))
197 .Then(x => Math.Log10(x))
198 .On(() => {
198 .On(() => {
199 Interlocked.Decrement(ref pending);
199 Interlocked.Decrement(ref pending);
200 if (pending == 0)
200 if (pending == 0)
201 stop.Set();
201 stop.Set();
202 }, PromiseEventType.All);
202 }, PromiseEventType.All);
203 }
203 }
204
204
205 stop.WaitOne();
205 stop.WaitOne();
206
206
207 Assert.AreEqual(iterations, count);
207 Assert.AreEqual(iterations, count);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 pool.Dispose();
209 pool.Dispose();
210
210
211 }
211 }
212
212
213 [TestMethod]
213 [TestMethod]
214 public void WorkerPoolDisposeTest() {
214 public void WorkerPoolDisposeTest() {
215 var pool = new WorkerPool(5, 20);
215 var pool = new WorkerPool(5, 20);
216 Assert.AreEqual(5, pool.PoolSize);
216 Assert.AreEqual(5, pool.PoolSize);
217 pool.Dispose();
217 pool.Dispose();
218 Thread.Sleep(500);
218 Thread.Sleep(500);
219 Assert.AreEqual(0, pool.PoolSize);
219 Assert.AreEqual(0, pool.PoolSize);
220 pool.Dispose();
220 pool.Dispose();
221 }
221 }
222
222
223 [TestMethod]
223 [TestMethod]
224 public void MTQueueTest() {
224 public void MTQueueTest() {
225 var queue = new MTQueue<int>();
225 var queue = new MTQueue<int>();
226 int res;
226 int res;
227
227
228 queue.Enqueue(10);
228 queue.Enqueue(10);
229 Assert.IsTrue(queue.TryDequeue(out res));
229 Assert.IsTrue(queue.TryDequeue(out res));
230 Assert.AreEqual(10, res);
230 Assert.AreEqual(10, res);
231 Assert.IsFalse(queue.TryDequeue(out res));
231 Assert.IsFalse(queue.TryDequeue(out res));
232
232
233 for (int i = 0; i < 1000; i++)
233 for (int i = 0; i < 1000; i++)
234 queue.Enqueue(i);
234 queue.Enqueue(i);
235
235
236 for (int i = 0; i < 1000; i++) {
236 for (int i = 0; i < 1000; i++) {
237 queue.TryDequeue(out res);
237 queue.TryDequeue(out res);
238 Assert.AreEqual(i, res);
238 Assert.AreEqual(i, res);
239 }
239 }
240
240
241 int writers = 0;
241 int writers = 0;
242 int readers = 0;
242 int readers = 0;
243 var stop = new ManualResetEvent(false);
243 var stop = new ManualResetEvent(false);
244 int total = 0;
244 int total = 0;
245
245
246 const int itemsPerWriter = 10000;
246 const int itemsPerWriter = 10000;
247 const int writersCount = 10;
247 const int writersCount = 10;
248
248
249 for (int i = 0; i < writersCount; i++) {
249 for (int i = 0; i < writersCount; i++) {
250 Interlocked.Increment(ref writers);
250 Interlocked.Increment(ref writers);
251 AsyncPool
251 AsyncPool
252 .RunThread(() => {
252 .RunThread(() => {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 queue.Enqueue(1);
254 queue.Enqueue(1);
255 }
255 }
256 return 1;
256 return 1;
257 })
257 })
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 }
259 }
260
260
261 for (int i = 0; i < 10; i++) {
261 for (int i = 0; i < 10; i++) {
262 Interlocked.Increment(ref readers);
262 Interlocked.Increment(ref readers);
263 AsyncPool
263 AsyncPool
264 .RunThread(() => {
264 .RunThread(() => {
265 int t;
265 int t;
266 do {
266 do {
267 while (queue.TryDequeue(out t))
267 while (queue.TryDequeue(out t))
268 Interlocked.Add(ref total, t);
268 Interlocked.Add(ref total, t);
269 } while (writers > 0);
269 } while (writers > 0);
270 return 1;
270 return 1;
271 })
271 })
272 .On(() => {
272 .On(() => {
273 Interlocked.Decrement(ref readers);
273 Interlocked.Decrement(ref readers);
274 if (readers == 0)
274 if (readers == 0)
275 stop.Set();
275 stop.Set();
276 }, PromiseEventType.All);
276 }, PromiseEventType.All);
277 }
277 }
278
278
279 stop.WaitOne();
279 stop.WaitOne();
280
280
281 Assert.AreEqual(100000, total);
281 Assert.AreEqual(100000, total);
282 }
282 }
283
283
284 [TestMethod]
284 [TestMethod]
285 public void AsyncQueueTest() {
285 public void AsyncQueueTest() {
286 var queue = new AsyncQueue<int>();
286 var queue = new AsyncQueue<int>();
287 int res;
287 int res;
288
288
289 queue.Enqueue(10);
289 queue.Enqueue(10);
290 Assert.IsTrue(queue.TryDequeue(out res));
290 Assert.IsTrue(queue.TryDequeue(out res));
291 Assert.AreEqual(10, res);
291 Assert.AreEqual(10, res);
292 Assert.IsFalse(queue.TryDequeue(out res));
292 Assert.IsFalse(queue.TryDequeue(out res));
293
293
294 for (int i = 0; i < 1000; i++)
294 for (int i = 0; i < 1000; i++)
295 queue.Enqueue(i);
295 queue.Enqueue(i);
296
296
297 for (int i = 0; i < 1000; i++) {
297 for (int i = 0; i < 1000; i++) {
298 queue.TryDequeue(out res);
298 queue.TryDequeue(out res);
299 Assert.AreEqual(i, res);
299 Assert.AreEqual(i, res);
300 }
300 }
301
301
302 const int count = 10000000;
302 const int count = 10000000;
303
303
304 int res1 = 0, res2 = 0;
304 int res1 = 0, res2 = 0;
305 var t1 = Environment.TickCount;
305 var t1 = Environment.TickCount;
306
306
307 AsyncPool.RunThread(
307 AsyncPool.RunThread(
308 () => {
308 () => {
309 for (var i = 0; i < count; i++)
309 for (var i = 0; i < count; i++)
310 queue.Enqueue(1);
310 queue.Enqueue(1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 },
312 },
313 () => {
313 () => {
314 for (var i = 0; i < count; i++)
314 for (var i = 0; i < count; i++)
315 queue.Enqueue(2);
315 queue.Enqueue(2);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 },
317 },
318 () => {
318 () => {
319 int temp;
319 int temp;
320 int i = 0;
320 int i = 0;
321 while (i < count)
321 while (i < count)
322 if (queue.TryDequeue(out temp)) {
322 if (queue.TryDequeue(out temp)) {
323 i++;
323 i++;
324 res1 += temp;
324 res1 += temp;
325 }
325 }
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 },
327 },
328 () => {
328 () => {
329 int temp;
329 int temp;
330 int i = 0;
330 int i = 0;
331 while (i < count)
331 while (i < count)
332 if (queue.TryDequeue(out temp)) {
332 if (queue.TryDequeue(out temp)) {
333 i++;
333 i++;
334 res2 += temp;
334 res2 += temp;
335 }
335 }
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 }
337 }
338 )
338 )
339 .Bundle()
339 .Bundle()
340 .Join();
340 .Join();
341
341
342 Assert.AreEqual(count * 3, res1 + res2);
342 Assert.AreEqual(count * 3, res1 + res2);
343
343
344 Console.WriteLine(
344 Console.WriteLine(
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 Environment.TickCount - t1,
346 Environment.TickCount - t1,
347 res1,
347 res1,
348 res2,
348 res2,
349 res1 + res2,
349 res1 + res2,
350 count
350 count
351 );
351 );
352 }
352 }
353
353
354 [TestMethod]
354 [TestMethod]
355 public void AsyncQueueBatchTest() {
355 public void AsyncQueueBatchTest() {
356 var queue = new AsyncQueue<int>();
356 var queue = new AsyncQueue<int>();
357
357
358 const int wBatch = 29;
358 const int wBatch = 29;
359 const int wCount = 400000;
359 const int wCount = 400000;
360 const int total = wBatch * wCount * 2;
360 const int total = wBatch * wCount * 2;
361 const int summ = wBatch * wCount * 3;
361 const int summ = wBatch * wCount * 3;
362
362
363 int r1 = 0, r2 = 0;
363 int r1 = 0, r2 = 0;
364 const int rBatch = 111;
364 const int rBatch = 111;
365 int read = 0;
365 int read = 0;
366
366
367 var t1 = Environment.TickCount;
367 var t1 = Environment.TickCount;
368
368
369 AsyncPool.RunThread(
369 AsyncPool.RunThread(
370 () => {
370 () => {
371 var buffer = new int[wBatch];
371 var buffer = new int[wBatch];
372 for(int i = 0; i<wBatch; i++)
372 for(int i = 0; i<wBatch; i++)
373 buffer[i] = 1;
373 buffer[i] = 1;
374
374
375 for(int i =0; i < wCount; i++)
375 for(int i =0; i < wCount; i++)
376 queue.EnqueueRange(buffer,0,wBatch);
376 queue.EnqueueRange(buffer,0,wBatch);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 },
378 },
379 () => {
379 () => {
380 var buffer = new int[wBatch];
380 var buffer = new int[wBatch];
381 for(int i = 0; i<wBatch; i++)
381 for(int i = 0; i<wBatch; i++)
382 buffer[i] = 2;
382 buffer[i] = 2;
383
383
384 for(int i =0; i < wCount; i++)
384 for(int i =0; i < wCount; i++)
385 queue.EnqueueRange(buffer,0,wBatch);
385 queue.EnqueueRange(buffer,0,wBatch);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 },
387 },
388 () => {
388 () => {
389 var buffer = new int[rBatch];
389 var buffer = new int[rBatch];
390
390
391 while(read < total) {
391 while(read < total) {
392 int actual;
392 int actual;
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 for(int i=0; i< actual; i++)
394 for(int i=0; i< actual; i++)
395 r1 += buffer[i];
395 r1 += buffer[i];
396 Interlocked.Add(ref read, actual);
396 Interlocked.Add(ref read, actual);
397 }
397 }
398 }
398 }
399
399
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 },
401 },
402 () => {
402 () => {
403 var buffer = new int[rBatch];
403 var buffer = new int[rBatch];
404
404
405 while(read < total) {
405 while(read < total) {
406 int actual;
406 int actual;
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 for(int i=0; i< actual; i++)
408 for(int i=0; i< actual; i++)
409 r2 += buffer[i];
409 r2 += buffer[i];
410 Interlocked.Add(ref read, actual);
410 Interlocked.Add(ref read, actual);
411 }
411 }
412 }
412 }
413
413
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 }
415 }
416 )
416 )
417 .Bundle()
417 .Bundle()
418 .Join();
418 .Join();
419
419
420 Assert.AreEqual(summ , r1 + r2);
420 Assert.AreEqual(summ , r1 + r2);
421
421
422 Console.WriteLine(
422 Console.WriteLine(
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 Environment.TickCount - t1,
424 Environment.TickCount - t1,
425 r1,
425 r1,
426 r2,
426 r2,
427 r1 + r2,
427 r1 + r2,
428 total
428 total
429 );
429 );
430 }
430 }
431
431
432 [TestMethod]
432 [TestMethod]
433 public void AsyncQueueChunkDequeueTest() {
433 public void AsyncQueueChunkDequeueTest() {
434 var queue = new AsyncQueue<int>();
434 var queue = new AsyncQueue<int>();
435
435
436 const int wBatch = 31;
436 const int wBatch = 31;
437 const int wCount = 200000;
437 const int wCount = 200000;
438 const int total = wBatch * wCount * 3;
438 const int total = wBatch * wCount * 3;
439 const int summ = wBatch * wCount * 6;
439 const int summ = wBatch * wCount * 6;
440
440
441 int r1 = 0, r2 = 0;
441 int r1 = 0, r2 = 0;
442 const int rBatch = 1024;
442 const int rBatch = 1024;
443 int read = 0;
443 int read = 0;
444
444
445 var t1 = Environment.TickCount;
445 var t1 = Environment.TickCount;
446
446
447 AsyncPool.RunThread(
447 AsyncPool.RunThread(
448 () => {
448 () => {
449 var buffer = new int[wBatch];
449 var buffer = new int[wBatch];
450 for(int i = 0; i<wBatch; i++)
450 for(int i = 0; i<wBatch; i++)
451 buffer[i] = 1;
451 buffer[i] = 1;
452
452
453 for(int i =0; i < wCount; i++)
453 for(int i =0; i < wCount; i++)
454 queue.EnqueueRange(buffer,0,wBatch);
454 queue.EnqueueRange(buffer,0,wBatch);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 },
456 },
457 () => {
457 () => {
458 var buffer = new int[wBatch];
458 var buffer = new int[wBatch];
459 for(int i = 0; i<wBatch; i++)
459 for(int i = 0; i<wBatch; i++)
460 buffer[i] = 2;
460 buffer[i] = 2;
461
461
462 for(int i =0; i < wCount; i++)
462 for(int i =0; i < wCount; i++)
463 queue.EnqueueRange(buffer,0,wBatch);
463 queue.EnqueueRange(buffer,0,wBatch);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 },
465 },
466 () => {
466 () => {
467 var buffer = new int[wBatch];
467 var buffer = new int[wBatch];
468 for(int i = 0; i<wBatch; i++)
468 for(int i = 0; i<wBatch; i++)
469 buffer[i] = 3;
469 buffer[i] = 3;
470
470
471 for(int i =0; i < wCount; i++)
471 for(int i =0; i < wCount; i++)
472 queue.EnqueueRange(buffer,0,wBatch);
472 queue.EnqueueRange(buffer,0,wBatch);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 },
474 },
475 () => {
475 () => {
476 var buffer = new int[rBatch];
476 var buffer = new int[rBatch];
477 int count = 1;
477 int count = 1;
478 double avgchunk = 0;
478 double avgchunk = 0;
479 while(read < total) {
479 while(read < total) {
480 int actual;
480 int actual;
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 for(int i=0; i< actual; i++)
482 for(int i=0; i< actual; i++)
483 r2 += buffer[i];
483 r2 += buffer[i];
484 Interlocked.Add(ref read, actual);
484 Interlocked.Add(ref read, actual);
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 count ++;
486 count ++;
487 }
487 }
488 }
488 }
489
489
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 }
491 }
492 )
492 )
493 .Bundle()
493 .Bundle()
494 .Join();
494 .Join();
495
495
496 Assert.AreEqual(summ , r1 + r2);
496 Assert.AreEqual(summ , r1 + r2);
497
497
498 Console.WriteLine(
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
500 Environment.TickCount - t1,
501 r1,
501 r1,
502 r2,
502 r2,
503 r1 + r2,
503 r1 + r2,
504 total
504 total
505 );
505 );
506 }
506 }
507
507
508 [TestMethod]
508 [TestMethod]
509 public void AsyncQueueDrainTest() {
509 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>();
510 var queue = new AsyncQueue<int>();
511
511
512 const int wBatch = 11;
512 const int wBatch = 11;
513 const int wCount = 200000;
513 const int wCount = 200000;
514 const int total = wBatch * wCount * 3;
514 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
516
516
517 int r1 = 0, r2 = 0;
517 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
518 const int rBatch = 11;
519 int read = 0;
519 int read = 0;
520
520
521 var t1 = Environment.TickCount;
521 var t1 = Environment.TickCount;
522
522
523 AsyncPool.RunThread(
523 AsyncPool.RunThread(
524 () => {
524 () => {
525 var buffer = new int[wBatch];
525 var buffer = new int[wBatch];
526 for(int i = 0; i<wBatch; i++)
526 for(int i = 0; i<wBatch; i++)
527 buffer[i] = 1;
527 buffer[i] = 1;
528
528
529 for(int i =0; i < wCount; i++)
529 for(int i =0; i < wCount; i++)
530 queue.EnqueueRange(buffer,0,wBatch);
530 queue.EnqueueRange(buffer,0,wBatch);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 },
532 },
533 () => {
533 () => {
534 for(int i =0; i < wCount * wBatch; i++)
534 for(int i =0; i < wCount * wBatch; i++)
535 queue.Enqueue(1);
535 queue.Enqueue(1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 },
537 },
538 () => {
538 () => {
539 var buffer = new int[wBatch];
539 var buffer = new int[wBatch];
540 for(int i = 0; i<wBatch; i++)
540 for(int i = 0; i<wBatch; i++)
541 buffer[i] = 1;
541 buffer[i] = 1;
542
542
543 for(int i =0; i < wCount; i++)
543 for(int i =0; i < wCount; i++)
544 queue.EnqueueRange(buffer,0,wBatch);
544 queue.EnqueueRange(buffer,0,wBatch);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 },
546 },
547 /*() => {
547 /*() => {
548 int temp;
548 int temp;
549 int count = 0;
549 int count = 0;
550 while (read < total)
550 while (read < total)
551 if (queue.TryDequeue(out temp)) {
551 if (queue.TryDequeue(out temp)) {
552 count++;
552 count++;
553 r1 += temp;
553 r1 += temp;
554 Interlocked.Increment(ref read);
554 Interlocked.Increment(ref read);
555 }
555 }
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 },*/
557 },*/
558 /*() => {
558 /*() => {
559 var buffer = new int[rBatch];
559 var buffer = new int[rBatch];
560 var count = 0;
560 var count = 0;
561 while(read < total) {
561 while(read < total) {
562 int actual;
562 int actual;
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 for(int i=0; i< actual; i++)
564 for(int i=0; i< actual; i++)
565 r1 += buffer[i];
565 r1 += buffer[i];
566 Interlocked.Add(ref read, actual);
566 Interlocked.Add(ref read, actual);
567 count += actual;
567 count += actual;
568 }
568 }
569 }
569 }
570
570
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 },*/
572 },*/
573 () => {
573 () => {
574 var count = 0;
574 var count = 0;
575 while(read < total) {
575 while(read < total) {
576 var buffer = queue.Drain();
576 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
577 for(int i=0; i< buffer.Length; i++)
578 r1 += buffer[i];
578 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
579 Interlocked.Add(ref read, buffer.Length);
580 count += buffer.Length;
580 count += buffer.Length;
581 }
581 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 },
583 },
584 () => {
584 () => {
585 var count = 0;
585 var count = 0;
586 while(read < total) {
586 while(read < total) {
587 var buffer = queue.Drain();
587 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
588 for(int i=0; i< buffer.Length; i++)
589 r2 += buffer[i];
589 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
590 Interlocked.Add(ref read, buffer.Length);
591 count += buffer.Length;
591 count += buffer.Length;
592 }
592 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 }
594 }
595 )
595 )
596 .Bundle()
596 .Bundle()
597 .Join();
597 .Join();
598
598
599 Assert.AreEqual(summ , r1 + r2);
599 Assert.AreEqual(summ , r1 + r2);
600
600
601 Console.WriteLine(
601 Console.WriteLine(
602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
603 Environment.TickCount - t1,
603 Environment.TickCount - t1,
604 r1,
604 r1,
605 r2,
605 r2,
606 r1 + r2,
606 r1 + r2,
607 total
607 total
608 );
608 );
609 }
609 }
610
610
611 [TestMethod]
611 [TestMethod]
612 public void ParallelMapTest() {
612 public void ParallelMapTest() {
613
613
614 const int count = 100000;
614 const int count = 100000;
615
615
616 var args = new double[count];
616 var args = new double[count];
617 var rand = new Random();
617 var rand = new Random();
618
618
619 for (int i = 0; i < count; i++)
619 for (int i = 0; i < count; i++)
620 args[i] = rand.NextDouble();
620 args[i] = rand.NextDouble();
621
621
622 var t = Environment.TickCount;
622 var t = Environment.TickCount;
623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
624
624
625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
626
626
627 t = Environment.TickCount;
627 t = Environment.TickCount;
628 for (int i = 0; i < count; i++)
628 for (int i = 0; i < count; i++)
629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
631 }
631 }
632
632
633 [TestMethod]
633 [TestMethod]
634 public void ChainedMapTest() {
634 public void ChainedMapTest() {
635
635
636 using (var pool = new WorkerPool(0,10,1)) {
636 using (var pool = new WorkerPool()) {
637 const int count = 10000;
637 const int count = 10000;
638
638
639 var args = new double[count];
639 var args = new double[count];
640 var rand = new Random();
640 var rand = new Random();
641
641
642 for (int i = 0; i < count; i++)
642 for (int i = 0; i < count; i++)
643 args[i] = rand.NextDouble();
643 args[i] = rand.NextDouble();
644
644
645 var t = Environment.TickCount;
645 var t = Environment.TickCount;
646 var res = args
646 var res = args
647 .ChainedMap(
647 .ChainedMap(
648 // Analysis disable once AccessToDisposedClosure
648 // Analysis disable once AccessToDisposedClosure
649 x => pool.Invoke(
649 x => pool.Invoke(
650 () => Math.Sin(x * x)
650 () => Math.Sin(x * x)
651 ),
651 ),
652 4
652 4
653 )
653 )
654 .Join();
654 .Join();
655
655
656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
657
657
658 t = Environment.TickCount;
658 t = Environment.TickCount;
659 for (int i = 0; i < count; i++)
659 for (int i = 0; i < count; i++)
660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
663 }
663 }
664 }
664 }
665
665
666 [TestMethod]
666 [TestMethod]
667 public void ParallelForEachTest() {
667 public void ParallelForEachTest() {
668
668
669 const int count = 100000;
669 const int count = 100000;
670
670
671 var args = new int[count];
671 var args = new int[count];
672 var rand = new Random();
672 var rand = new Random();
673
673
674 for (int i = 0; i < count; i++)
674 for (int i = 0; i < count; i++)
675 args[i] = (int)(rand.NextDouble() * 100);
675 args[i] = (int)(rand.NextDouble() * 100);
676
676
677 int result = 0;
677 int result = 0;
678
678
679 var t = Environment.TickCount;
679 var t = Environment.TickCount;
680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
681
681
682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
683
683
684 int result2 = 0;
684 int result2 = 0;
685
685
686 t = Environment.TickCount;
686 t = Environment.TickCount;
687 for (int i = 0; i < count; i++)
687 for (int i = 0; i < count; i++)
688 result2 += args[i];
688 result2 += args[i];
689 Assert.AreEqual(result2, result);
689 Assert.AreEqual(result2, result);
690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
691 }
691 }
692
692
693 [TestMethod]
693 [TestMethod]
694 public void ComplexCase1Test() {
694 public void ComplexCase1Test() {
695 var flags = new bool[3];
695 var flags = new bool[3];
696
696
697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
698
698
699 var step1 = PromiseHelper
699 var step1 = PromiseHelper
700 .Sleep(200, "Alan")
700 .Sleep(200, "Alan")
701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
702 var p = step1
702 var p = step1
703 .Chain(x =>
703 .Chain(x =>
704 PromiseHelper
704 PromiseHelper
705 .Sleep(200, "Hi, " + x)
705 .Sleep(200, "Hi, " + x)
706 .Then(y => y)
706 .Then(y => y)
707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
708 )
708 )
709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
710 step1.Join();
710 step1.Join();
711 p.Cancel();
711 p.Cancel();
712 try {
712 try {
713 Assert.AreEqual(p.Join(), "Hi, Alan");
713 Assert.AreEqual(p.Join(), "Hi, Alan");
714 Assert.Fail("Shouldn't get here");
714 Assert.Fail("Shouldn't get here");
715 } catch (OperationCanceledException) {
715 } catch (OperationCanceledException) {
716 }
716 }
717
717
718 Assert.IsFalse(flags[0]);
718 Assert.IsFalse(flags[0]);
719 Assert.IsTrue(flags[1]);
719 Assert.IsTrue(flags[1]);
720 Assert.IsTrue(flags[2]);
720 Assert.IsTrue(flags[2]);
721 }
721 }
722
722
723 [TestMethod]
723 [TestMethod]
724 public void ChainedCancel1Test() {
724 public void ChainedCancel1Test() {
725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
727 var p = PromiseHelper
727 var p = PromiseHelper
728 .Sleep(1, "Hi, HAL!")
728 .Sleep(1, "Hi, HAL!")
729 .Then(x => {
729 .Then(x => {
730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
733 PromiseHelper
733 PromiseHelper
734 .Sleep(100, "HAL, STOP!")
734 .Sleep(100, "HAL, STOP!")
735 .Then(result.Cancel);
735 .Then(result.Cancel);
736 return result;
736 return result;
737 });
737 });
738 try {
738 try {
739 p.Join();
739 p.Join();
740 } catch (TargetInvocationException err) {
740 } catch (TargetInvocationException err) {
741 Assert.IsTrue(err.InnerException is OperationCanceledException);
741 Assert.IsTrue(err.InnerException is OperationCanceledException);
742 }
742 }
743 }
743 }
744
744
745 [TestMethod]
745 [TestMethod]
746 public void ChainedCancel2Test() {
746 public void ChainedCancel2Test() {
747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
748 var pSurvive = new Promise<bool>();
748 var pSurvive = new Promise<bool>();
749 var hemStarted = new ManualResetEvent(false);
749 var hemStarted = new ManualResetEvent(false);
750 var p = PromiseHelper
750 var p = PromiseHelper
751 .Sleep(1, "Hi, HAL!")
751 .Sleep(1, "Hi, HAL!")
752 .Chain(x => {
752 .Chain(x => {
753 hemStarted.Set();
753 hemStarted.Set();
754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
755 var result = PromiseHelper
755 var result = PromiseHelper
756 .Sleep(100000000, "HEM ENABLED!!!")
756 .Sleep(100000000, "HEM ENABLED!!!")
757 .Then(s => {
757 .Then(s => {
758 pSurvive.Resolve(false);
758 pSurvive.Resolve(false);
759 return s;
759 return s;
760 });
760 });
761
761
762 result
762 result
763 .Cancelled(() => pSurvive.Resolve(true));
763 .Cancelled(() => pSurvive.Resolve(true));
764
764
765 return result;
765 return result;
766 });
766 });
767
767
768 hemStarted.WaitOne();
768 hemStarted.WaitOne();
769 p.Cancel();
769 p.Cancel();
770
770
771 try {
771 try {
772 p.Join();
772 p.Join();
773 } catch (OperationCanceledException) {
773 } catch (OperationCanceledException) {
774 Assert.IsTrue(pSurvive.Join());
774 Assert.IsTrue(pSurvive.Join());
775 }
775 }
776 }
776 }
777 }
777 }
778 }
778 }
779
779
@@ -1,219 +1,289
1 using System;
1 using System;
2 using Implab.Parallels;
2 using Implab.Parallels;
3 using System.Threading;
3 using System.Threading;
4 using System.Reflection;
4 using System.Reflection;
5
5
6 namespace Implab {
6 namespace Implab {
7 public abstract class AbstractPromise<THandler> {
7 public abstract class AbstractPromise<THandler> {
8
8
9 const int UNRESOLVED_SATE = 0;
9 const int UNRESOLVED_SATE = 0;
10 const int TRANSITIONAL_STATE = 1;
10 const int TRANSITIONAL_STATE = 1;
11 const int SUCCEEDED_STATE = 2;
11 const int SUCCEEDED_STATE = 2;
12 const int REJECTED_STATE = 3;
12 const int REJECTED_STATE = 3;
13 const int CANCELLED_STATE = 4;
13 const int CANCELLED_STATE = 4;
14
14
15 const int RESERVED_HANDLERS_COUNT = 4;
16
15 int m_state;
17 int m_state;
16 Exception m_error;
18 Exception m_error;
19 int m_handlersCount;
17
20
18 readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
22 MTQueue<THandler> m_extraHandlers;
23 int m_handlerPointer = -1;
24 int m_handlersCommited;
19
25
20 #region state managment
26 #region state managment
21 bool BeginTransit() {
27 bool BeginTransit() {
22 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
23 }
29 }
24
30
25 void CompleteTransit(int state) {
31 void CompleteTransit(int state) {
26 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
27 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
28 }
34 }
29
35
30 void WaitTransition() {
36 void WaitTransition() {
31 while (m_state == TRANSITIONAL_STATE) {
37 while (m_state == TRANSITIONAL_STATE) {
32 Thread.MemoryBarrier();
38 Thread.MemoryBarrier();
33 }
39 }
34 }
40 }
35
41
36 protected void BeginSetResult() {
42 protected void BeginSetResult() {
37 if (!BeginTransit()) {
43 if (!BeginTransit()) {
38 WaitTransition();
44 WaitTransition();
39 if (m_state != CANCELLED_STATE)
45 if (m_state != CANCELLED_STATE)
40 throw new InvalidOperationException("The promise is already resolved");
46 throw new InvalidOperationException("The promise is already resolved");
41 }
47 }
42 }
48 }
43
49
44 protected void EndSetResult() {
50 protected void EndSetResult() {
45 CompleteTransit(SUCCEEDED_STATE);
51 CompleteTransit(SUCCEEDED_STATE);
46 OnSuccess();
52 OnSuccess();
47 }
53 }
48
54
49
55
50
56
51 /// <summary>
57 /// <summary>
52 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
58 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
53 /// </summary>
59 /// </summary>
54 /// <remarks>
60 /// <remarks>
55 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
61 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
56 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
62 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
57 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
63 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
58 /// </remarks>
64 /// </remarks>
59 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
65 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
60 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
66 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
61 protected void SetError(Exception error) {
67 protected void SetError(Exception error) {
62 if (BeginTransit()) {
68 if (BeginTransit()) {
63 m_error = error is PromiseTransientException ? error.InnerException : error;
69 m_error = error is PromiseTransientException ? error.InnerException : error;
64 CompleteTransit(REJECTED_STATE);
70 CompleteTransit(REJECTED_STATE);
65 OnError();
71 OnError();
66 } else {
72 } else {
67 WaitTransition();
73 WaitTransition();
68 if (m_state == SUCCEEDED_STATE)
74 if (m_state == SUCCEEDED_STATE)
69 throw new InvalidOperationException("The promise is already resolved");
75 throw new InvalidOperationException("The promise is already resolved");
70 }
76 }
71 }
77 }
72
78
73 /// <summary>
79 /// <summary>
74 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
80 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
75 /// </summary>
81 /// </summary>
76 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
82 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
77 protected void SetCancelled() {
83 protected void SetCancelled() {
78 if (BeginTransit()) {
84 if (BeginTransit()) {
79 CompleteTransit(CANCELLED_STATE);
85 CompleteTransit(CANCELLED_STATE);
80 OnCancelled();
86 OnCancelled();
81 }
87 }
82 }
88 }
83
89
84 protected abstract void SignalSuccess(THandler handler);
90 protected abstract void SignalSuccess(THandler handler);
85
91
86 protected abstract void SignalError(THandler handler, Exception error);
92 protected abstract void SignalError(THandler handler, Exception error);
87
93
88 protected abstract void SignalCancelled(THandler handler);
94 protected abstract void SignalCancelled(THandler handler);
89
95
90 void OnSuccess() {
96 void OnSuccess() {
97 var hp = m_handlerPointer;
98 var slot = hp +1 ;
99 while (slot < m_handlersCommited) {
100 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
101 SignalSuccess(m_handlers[slot]);
102 }
103 hp = m_handlerPointer;
104 slot = hp +1 ;
105 }
106
107
108 if (m_extraHandlers != null) {
91 THandler handler;
109 THandler handler;
92 while (m_handlers.TryDequeue(out handler))
110 while (m_extraHandlers.TryDequeue(out handler))
93 SignalSuccess(handler);
111 SignalSuccess(handler);
94 }
112 }
113 }
95
114
96 void OnError() {
115 void OnError() {
116 var hp = m_handlerPointer;
117 var slot = hp +1 ;
118 while (slot < m_handlersCommited) {
119 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
120 SignalError(m_handlers[slot],m_error);
121 }
122 hp = m_handlerPointer;
123 slot = hp +1 ;
124 }
125
126 if (m_extraHandlers != null) {
97 THandler handler;
127 THandler handler;
98 while (m_handlers.TryDequeue(out handler))
128 while (m_extraHandlers.TryDequeue(out handler))
99 SignalError(handler,m_error);
129 SignalError(handler, m_error);
100 }
130 }
131 }
101
132
102 void OnCancelled() {
133 void OnCancelled() {
134 var hp = m_handlerPointer;
135 var slot = hp +1 ;
136 while (slot < m_handlersCommited) {
137 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
138 SignalCancelled(m_handlers[slot]);
139 }
140 hp = m_handlerPointer;
141 slot = hp +1 ;
142 }
143
144 if (m_extraHandlers != null) {
103 THandler handler;
145 THandler handler;
104 while (m_handlers.TryDequeue(out handler))
146 while (m_extraHandlers.TryDequeue(out handler))
105 SignalCancelled(handler);
147 SignalCancelled(handler);
106 }
148 }
149 }
107
150
108 #endregion
151 #endregion
109
152
110 protected abstract void Listen(PromiseEventType events, Action handler);
153 protected abstract void Listen(PromiseEventType events, Action handler);
111
154
112 #region synchronization traits
155 #region synchronization traits
113 protected void WaitResult(int timeout) {
156 protected void WaitResult(int timeout) {
114 if (!IsResolved) {
157 if (!IsResolved) {
115 var lk = new object();
158 var lk = new object();
116
159
117 Listen(PromiseEventType.All, () => {
160 Listen(PromiseEventType.All, () => {
118 lock(lk) {
161 lock(lk) {
119 Monitor.Pulse(lk);
162 Monitor.Pulse(lk);
120 }
163 }
121 });
164 });
122
165
123 lock (lk) {
166 lock (lk) {
124 while(!IsResolved) {
167 while(!IsResolved) {
125 if(!Monitor.Wait(lk,timeout))
168 if(!Monitor.Wait(lk,timeout))
126 throw new TimeoutException();
169 throw new TimeoutException();
127 }
170 }
128 }
171 }
129
172
130 }
173 }
131 switch (m_state) {
174 switch (m_state) {
132 case SUCCEEDED_STATE:
175 case SUCCEEDED_STATE:
133 return;
176 return;
134 case CANCELLED_STATE:
177 case CANCELLED_STATE:
135 throw new OperationCanceledException();
178 throw new OperationCanceledException();
136 case REJECTED_STATE:
179 case REJECTED_STATE:
137 throw new TargetInvocationException(m_error);
180 throw new TargetInvocationException(m_error);
138 default:
181 default:
139 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
182 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
140 }
183 }
141 }
184 }
142 #endregion
185 #endregion
143
186
144 #region handlers managment
187 #region handlers managment
145
188
146 protected void AddHandler(THandler handler) {
189 protected void AddHandler(THandler handler) {
147
190
148 if (IsResolved) {
191 if (m_state > 1) {
149 InvokeHandler(handler);
150
151 } else {
152 // the promise is in the resolved state, just invoke the handler
192 // the promise is in the resolved state, just invoke the handler
153 m_handlers.Enqueue(handler);
193 InvokeHandler(handler);
194 } else {
195 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
196
197 if (slot < RESERVED_HANDLERS_COUNT) {
198 m_handlers[slot] = handler;
199
200 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
201 }
154
202
203 if (m_state > 1) {
204 do {
205 var hp = m_handlerPointer;
206 slot = hp + 1;
207 if (slot < m_handlersCommited) {
208 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
209 continue;
210 InvokeHandler(m_handlers[slot]);
211 }
212 break;
213 } while(true);
214 }
215 } else {
216 if (slot == RESERVED_HANDLERS_COUNT) {
217 m_extraHandlers = new MTQueue<THandler>();
218 } else {
219 while (m_extraHandlers == null)
220 Thread.MemoryBarrier();
221 }
155
222
156 if (IsResolved && m_handlers.TryDequeue(out handler))
223 m_extraHandlers.Enqueue(handler);
224
225 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
157 // if the promise have been resolved while we was adding the handler to the queue
226 // if the promise have been resolved while we was adding the handler to the queue
158 // we can't guarantee that someone is still processing it
227 // we can't guarantee that someone is still processing it
159 // therefore we need to fetch a handler from the queue and execute it
228 // therefore we need to fetch a handler from the queue and execute it
160 // note that fetched handler may be not the one that we have added
229 // note that fetched handler may be not the one that we have added
161 // even we can fetch no handlers at all :)
230 // even we can fetch no handlers at all :)
162 InvokeHandler(handler);
231 InvokeHandler(handler);
163 }
232 }
164 }
233 }
234 }
165
235
166 protected void InvokeHandler(THandler handler) {
236 protected void InvokeHandler(THandler handler) {
167 switch (m_state) {
237 switch (m_state) {
168 case SUCCEEDED_STATE:
238 case SUCCEEDED_STATE:
169 SignalSuccess(handler);
239 SignalSuccess(handler);
170 break;
240 break;
171 case CANCELLED_STATE:
241 case CANCELLED_STATE:
172 SignalCancelled(handler);
242 SignalCancelled(handler);
173 break;
243 break;
174 case REJECTED_STATE:
244 case REJECTED_STATE:
175 SignalError(handler, m_error);
245 SignalError(handler, m_error);
176 break;
246 break;
177 default:
247 default:
178 throw new Exception(String.Format("Invalid promise state {0}", m_state));
248 throw new Exception(String.Format("Invalid promise state {0}", m_state));
179 }
249 }
180 }
250 }
181
251
182 #endregion
252 #endregion
183
253
184 #region IPromise implementation
254 #region IPromise implementation
185
255
186 public void Join(int timeout) {
256 public void Join(int timeout) {
187 WaitResult(timeout);
257 WaitResult(timeout);
188 }
258 }
189
259
190 public void Join() {
260 public void Join() {
191 WaitResult(-1);
261 WaitResult(-1);
192 }
262 }
193
263
194 public bool IsResolved {
264 public bool IsResolved {
195 get {
265 get {
196 Thread.MemoryBarrier();
266 Thread.MemoryBarrier();
197 return m_state > 1;
267 return m_state > 1;
198 }
268 }
199 }
269 }
200
270
201 public bool IsCancelled {
271 public bool IsCancelled {
202 get {
272 get {
203 Thread.MemoryBarrier();
273 Thread.MemoryBarrier();
204 return m_state == CANCELLED_STATE;
274 return m_state == CANCELLED_STATE;
205 }
275 }
206 }
276 }
207
277
208 #endregion
278 #endregion
209
279
210 #region ICancellable implementation
280 #region ICancellable implementation
211
281
212 public void Cancel() {
282 public void Cancel() {
213 SetCancelled();
283 SetCancelled();
214 }
284 }
215
285
216 #endregion
286 #endregion
217 }
287 }
218 }
288 }
219
289
@@ -1,625 +1,619
1 using System.Threading;
1 using System.Threading;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System;
3 using System;
4 using System.Collections;
4 using System.Collections;
5 using System.Diagnostics;
5 using System.Diagnostics;
6
6
7 namespace Implab.Parallels {
7 namespace Implab.Parallels {
8 public class AsyncQueue<T> : IEnumerable<T> {
8 public class AsyncQueue<T> : IEnumerable<T> {
9 class Chunk {
9 class Chunk {
10 public Chunk next;
10 public Chunk next;
11
11
12 int m_low;
12 int m_low;
13 int m_hi;
13 int m_hi;
14 int m_alloc;
14 int m_alloc;
15 readonly int m_size;
15 readonly int m_size;
16 readonly T[] m_data;
16 readonly T[] m_data;
17
17
18 public Chunk(int size) {
18 public Chunk(int size) {
19 m_size = size;
19 m_size = size;
20 m_data = new T[size];
20 m_data = new T[size];
21 }
21 }
22
22
23 public Chunk(int size, T value) {
23 public Chunk(int size, T value) {
24 m_size = size;
24 m_size = size;
25 m_hi = 1;
25 m_hi = 1;
26 m_alloc = 1;
26 m_alloc = 1;
27 m_data = new T[size];
27 m_data = new T[size];
28 m_data[0] = value;
28 m_data[0] = value;
29 }
29 }
30
30
31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
32 m_size = size;
32 m_size = size;
33 m_hi = length;
33 m_hi = length;
34 m_alloc = alloc;
34 m_alloc = alloc;
35 m_data = new T[size];
35 m_data = new T[size];
36 Array.Copy(data, offset, m_data, 0, length);
36 Array.Copy(data, offset, m_data, 0, length);
37 }
37 }
38
38
39 public int Low {
39 public int Low {
40 get { return m_low; }
40 get { return m_low; }
41 }
41 }
42
42
43 public int Hi {
43 public int Hi {
44 get { return m_hi; }
44 get { return m_hi; }
45 }
45 }
46
46
47 public bool TryEnqueue(T value, out bool extend) {
47 public bool TryEnqueue(T value, out bool extend) {
48 var alloc = Interlocked.Increment(ref m_alloc) - 1;
48 var alloc = Interlocked.Increment(ref m_alloc) - 1;
49
49
50 if (alloc >= m_size) {
50 if (alloc >= m_size) {
51 extend = alloc == m_size;
51 extend = alloc == m_size;
52 return false;
52 return false;
53 }
53 }
54
54
55 extend = false;
55 extend = false;
56 m_data[alloc] = value;
56 m_data[alloc] = value;
57
57
58 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
58 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
59 // spin wait for commit
59 // spin wait for commit
60 }
60 }
61 return true;
61 return true;
62 }
62 }
63
63
64 /// <summary>
64 /// <summary>
65 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
65 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
66 /// </summary>
66 /// </summary>
67 public void Commit() {
67 public void Commit() {
68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
69
69
70 while (m_hi != actual)
70 while (m_hi != actual)
71 Thread.MemoryBarrier();
71 Thread.MemoryBarrier();
72 }
72 }
73
73
74 public bool TryDequeue(out T value, out bool recycle) {
74 public bool TryDequeue(out T value, out bool recycle) {
75 int low;
75 int low;
76 do {
76 do {
77 low = m_low;
77 low = m_low;
78 if (low >= m_hi) {
78 if (low >= m_hi) {
79 value = default(T);
79 value = default(T);
80 recycle = (low == m_size);
80 recycle = (low == m_size);
81 return false;
81 return false;
82 }
82 }
83 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
83 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
84
84
85 recycle = (low == m_size - 1);
85 recycle = (low == m_size - 1);
86 value = m_data[low];
86 value = m_data[low];
87
87
88 return true;
88 return true;
89 }
89 }
90
90
91 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
91 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
92 //int alloc;
92 //int alloc;
93 //int allocSize;
93 //int allocSize;
94
94
95 var alloc = Interlocked.Add(ref m_alloc, length) - length;
95 var alloc = Interlocked.Add(ref m_alloc, length) - length;
96 if (alloc > m_size) {
96 if (alloc > m_size) {
97 // the chunk is full and someone already
97 // the chunk is full and someone already
98 // creating the new one
98 // creating the new one
99 enqueued = 0; // nothing was added
99 enqueued = 0; // nothing was added
100 extend = false; // the caller shouldn't try to extend the queue
100 extend = false; // the caller shouldn't try to extend the queue
101 return false; // nothing was added
101 return false; // nothing was added
102 }
102 }
103
103
104 enqueued = Math.Min(m_size - alloc, length);
104 enqueued = Math.Min(m_size - alloc, length);
105 extend = length > enqueued;
105 extend = length > enqueued;
106
106
107 if (enqueued == 0)
107 if (enqueued == 0)
108 return false;
108 return false;
109
109
110
110
111 Array.Copy(batch, offset, m_data, alloc, enqueued);
111 Array.Copy(batch, offset, m_data, alloc, enqueued);
112
112
113 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
113 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
114 // spin wait for commit
114 // spin wait for commit
115 }
115 }
116
116
117 return true;
117 return true;
118 }
118 }
119
119
120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
121 int low, hi, batchSize;
121 int low, hi, batchSize;
122
122
123 do {
123 do {
124 low = m_low;
124 low = m_low;
125 hi = m_hi;
125 hi = m_hi;
126 if (low >= hi) {
126 if (low >= hi) {
127 dequeued = 0;
127 dequeued = 0;
128 recycle = (low == m_size); // recycling could be restarted and we need to signal again
128 recycle = (low == m_size); // recycling could be restarted and we need to signal again
129 return false;
129 return false;
130 }
130 }
131 batchSize = Math.Min(hi - low, length);
131 batchSize = Math.Min(hi - low, length);
132 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
132 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
133
133
134 recycle = (low == m_size - batchSize);
134 recycle = (low == m_size - batchSize);
135 dequeued = batchSize;
135 dequeued = batchSize;
136
136
137 Array.Copy(m_data, low, buffer, offset, batchSize);
137 Array.Copy(m_data, low, buffer, offset, batchSize);
138
138
139 return true;
139 return true;
140 }
140 }
141
141
142 public T GetAt(int pos) {
142 public T GetAt(int pos) {
143 return m_data[pos];
143 return m_data[pos];
144 }
144 }
145 }
145 }
146
146
147 public const int DEFAULT_CHUNK_SIZE = 32;
147 public const int DEFAULT_CHUNK_SIZE = 32;
148 public const int MAX_CHUNK_SIZE = 262144;
148 public const int MAX_CHUNK_SIZE = 262144;
149
149
150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
151
152 Chunk m_first;
150 Chunk m_first;
153 Chunk m_last;
151 Chunk m_last;
154
152
155 public AsyncQueue() {
156 m_last = m_first = new Chunk(m_chunkSize);
157 }
158
159 /// <summary>
153 /// <summary>
160 /// Adds the specified value to the queue.
154 /// Adds the specified value to the queue.
161 /// </summary>
155 /// </summary>
162 /// <param name="value">Tha value which will be added to the queue.</param>
156 /// <param name="value">Tha value which will be added to the queue.</param>
163 public void Enqueue(T value) {
157 public void Enqueue(T value) {
164 var last = m_last;
158 var last = m_last;
165 // spin wait to the new chunk
159 // spin wait to the new chunk
166 bool extend = true;
160 bool extend = true;
167 while (last == null || !last.TryEnqueue(value, out extend)) {
161 while (last == null || !last.TryEnqueue(value, out extend)) {
168 // try to extend queue
162 // try to extend queue
169 if (extend || last == null) {
163 if (extend || last == null) {
170 var chunk = new Chunk(m_chunkSize, value);
164 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
171 if (EnqueueChunk(last, chunk))
165 if (EnqueueChunk(last, chunk))
172 break; // success! exit!
166 break; // success! exit!
173 last = m_last;
167 last = m_last;
174 } else {
168 } else {
175 while (last == m_last) {
169 while (last == m_last) {
176 Thread.MemoryBarrier();
170 Thread.MemoryBarrier();
177 }
171 }
178 last = m_last;
172 last = m_last;
179 }
173 }
180 }
174 }
181 }
175 }
182
176
183 /// <summary>
177 /// <summary>
184 /// Adds the specified data to the queue.
178 /// Adds the specified data to the queue.
185 /// </summary>
179 /// </summary>
186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
180 /// <param name="data">The buffer which contains the data to be enqueued.</param>
187 /// <param name="offset">The offset of the data in the buffer.</param>
181 /// <param name="offset">The offset of the data in the buffer.</param>
188 /// <param name="length">The size of the data to read from the buffer.</param>
182 /// <param name="length">The size of the data to read from the buffer.</param>
189 public void EnqueueRange(T[] data, int offset, int length) {
183 public void EnqueueRange(T[] data, int offset, int length) {
190 if (data == null)
184 if (data == null)
191 throw new ArgumentNullException("data");
185 throw new ArgumentNullException("data");
192 if (offset < 0)
186 if (offset < 0)
193 throw new ArgumentOutOfRangeException("offset");
187 throw new ArgumentOutOfRangeException("offset");
194 if (length < 1 || offset + length > data.Length)
188 if (length < 1 || offset + length > data.Length)
195 throw new ArgumentOutOfRangeException("length");
189 throw new ArgumentOutOfRangeException("length");
196
190
197 var last = m_last;
191 var last = m_last;
198
192
199 bool extend;
193 bool extend;
200 int enqueued;
194 int enqueued;
201
195
202 while (length > 0) {
196 while (length > 0) {
203 extend = true;
197 extend = true;
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
198 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
205 length -= enqueued;
199 length -= enqueued;
206 offset += enqueued;
200 offset += enqueued;
207 }
201 }
208
202
209 if (extend) {
203 if (extend) {
210 // there was no enough space in the chunk
204 // there was no enough space in the chunk
211 // or there was no chunks in the queue
205 // or there was no chunks in the queue
212
206
213 while (length > 0) {
207 while (length > 0) {
214
208
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
209 var size = Math.Min(length, MAX_CHUNK_SIZE);
216
210
217 var chunk = new Chunk(
211 var chunk = new Chunk(
218 Math.Max(size, m_chunkSize),
212 Math.Max(size, DEFAULT_CHUNK_SIZE),
219 data,
213 data,
220 offset,
214 offset,
221 size,
215 size,
222 length // length >= size
216 length // length >= size
223 );
217 );
224
218
225 if (!EnqueueChunk(last, chunk)) {
219 if (!EnqueueChunk(last, chunk)) {
226 // looks like the queue has been updated then proceed from the beginning
220 // looks like the queue has been updated then proceed from the beginning
227 last = m_last;
221 last = m_last;
228 break;
222 break;
229 }
223 }
230
224
231 // we have successfully added the new chunk
225 // we have successfully added the new chunk
232 last = chunk;
226 last = chunk;
233 length -= size;
227 length -= size;
234 offset += size;
228 offset += size;
235 }
229 }
236 } else {
230 } else {
237 // we don't need to extend the queue, if we successfully enqueued data
231 // we don't need to extend the queue, if we successfully enqueued data
238 if (length == 0)
232 if (length == 0)
239 break;
233 break;
240
234
241 // if we need to wait while someone is extending the queue
235 // if we need to wait while someone is extending the queue
242 // spinwait
236 // spinwait
243 while (last == m_last) {
237 while (last == m_last) {
244 Thread.MemoryBarrier();
238 Thread.MemoryBarrier();
245 }
239 }
246
240
247 last = m_last;
241 last = m_last;
248 }
242 }
249 }
243 }
250 }
244 }
251
245
252 /// <summary>
246 /// <summary>
253 /// Tries to retrieve the first element from the queue.
247 /// Tries to retrieve the first element from the queue.
254 /// </summary>
248 /// </summary>
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
249 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
256 /// <param name="value">The value of the dequeued element.</param>
250 /// <param name="value">The value of the dequeued element.</param>
257 public bool TryDequeue(out T value) {
251 public bool TryDequeue(out T value) {
258 var chunk = m_first;
252 var chunk = m_first;
259 bool recycle;
253 bool recycle;
260 while (chunk != null) {
254 while (chunk != null) {
261
255
262 var result = chunk.TryDequeue(out value, out recycle);
256 var result = chunk.TryDequeue(out value, out recycle);
263
257
264 if (recycle) // this chunk is waste
258 if (recycle) // this chunk is waste
265 RecycleFirstChunk(chunk);
259 RecycleFirstChunk(chunk);
266 else
260 else
267 return result; // this chunk is usable and returned actual result
261 return result; // this chunk is usable and returned actual result
268
262
269 if (result) // this chunk is waste but the true result is always actual
263 if (result) // this chunk is waste but the true result is always actual
270 return true;
264 return true;
271
265
272 // try again
266 // try again
273 chunk = m_first;
267 chunk = m_first;
274 }
268 }
275
269
276 // the queue is empty
270 // the queue is empty
277 value = default(T);
271 value = default(T);
278 return false;
272 return false;
279 }
273 }
280
274
281 /// <summary>
275 /// <summary>
282 /// Tries to dequeue the specified amount of data from the queue.
276 /// Tries to dequeue the specified amount of data from the queue.
283 /// </summary>
277 /// </summary>
284 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
278 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
285 /// <param name="buffer">The buffer to which the data will be written.</param>
279 /// <param name="buffer">The buffer to which the data will be written.</param>
286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
280 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
287 /// <param name="length">The maximum amount of data to be retrieved.</param>
281 /// <param name="length">The maximum amount of data to be retrieved.</param>
288 /// <param name="dequeued">The actual amout of the retrieved data.</param>
282 /// <param name="dequeued">The actual amout of the retrieved data.</param>
289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
283 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
290 if (buffer == null)
284 if (buffer == null)
291 throw new ArgumentNullException("buffer");
285 throw new ArgumentNullException("buffer");
292 if (offset < 0)
286 if (offset < 0)
293 throw new ArgumentOutOfRangeException("offset");
287 throw new ArgumentOutOfRangeException("offset");
294 if (length < 1 || offset + length > buffer.Length)
288 if (length < 1 || offset + length > buffer.Length)
295 throw new ArgumentOutOfRangeException("length");
289 throw new ArgumentOutOfRangeException("length");
296
290
297 var chunk = m_first;
291 var chunk = m_first;
298 bool recycle;
292 bool recycle;
299 dequeued = 0;
293 dequeued = 0;
300 while (chunk != null) {
294 while (chunk != null) {
301
295
302 int actual;
296 int actual;
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
297 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 offset += actual;
298 offset += actual;
305 length -= actual;
299 length -= actual;
306 dequeued += actual;
300 dequeued += actual;
307 }
301 }
308
302
309 if (recycle) // this chunk is waste
303 if (recycle) // this chunk is waste
310 RecycleFirstChunk(chunk);
304 RecycleFirstChunk(chunk);
311 else if (actual == 0)
305 else if (actual == 0)
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
306 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
313
307
314 if (length == 0)
308 if (length == 0)
315 return true;
309 return true;
316
310
317 // we still may dequeue something
311 // we still may dequeue something
318 // try again
312 // try again
319 chunk = m_first;
313 chunk = m_first;
320 }
314 }
321
315
322 return dequeued != 0;
316 return dequeued != 0;
323 }
317 }
324
318
325 /// <summary>
319 /// <summary>
326 /// Tries to dequeue all remaining data in the first chunk.
320 /// Tries to dequeue all remaining data in the first chunk.
327 /// </summary>
321 /// </summary>
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
322 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 /// <param name="buffer">The buffer to which the data will be written.</param>
323 /// <param name="buffer">The buffer to which the data will be written.</param>
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
324 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
325 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
326 /// <param name="dequeued">The actual amount of the dequeued data.</param>
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
327 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
334 if (buffer == null)
328 if (buffer == null)
335 throw new ArgumentNullException("buffer");
329 throw new ArgumentNullException("buffer");
336 if (offset < 0)
330 if (offset < 0)
337 throw new ArgumentOutOfRangeException("offset");
331 throw new ArgumentOutOfRangeException("offset");
338 if (length < 1 || offset + length > buffer.Length)
332 if (length < 1 || offset + length > buffer.Length)
339 throw new ArgumentOutOfRangeException("length");
333 throw new ArgumentOutOfRangeException("length");
340
334
341 var chunk = m_first;
335 var chunk = m_first;
342 bool recycle;
336 bool recycle;
343 dequeued = 0;
337 dequeued = 0;
344
338
345 while (chunk != null) {
339 while (chunk != null) {
346
340
347 int actual;
341 int actual;
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
342 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
349 dequeued = actual;
343 dequeued = actual;
350 }
344 }
351
345
352 if (recycle) // this chunk is waste
346 if (recycle) // this chunk is waste
353 RecycleFirstChunk(chunk);
347 RecycleFirstChunk(chunk);
354
348
355 // if we have dequeued any data, then return
349 // if we have dequeued any data, then return
356 if (dequeued != 0)
350 if (dequeued != 0)
357 return true;
351 return true;
358
352
359 // we still may dequeue something
353 // we still may dequeue something
360 // try again
354 // try again
361 chunk = m_first;
355 chunk = m_first;
362 }
356 }
363
357
364 return false;
358 return false;
365 }
359 }
366
360
367 bool EnqueueChunk(Chunk last, Chunk chunk) {
361 bool EnqueueChunk(Chunk last, Chunk chunk) {
368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
362 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
369 return false;
363 return false;
370
364
371 if (last != null)
365 if (last != null)
372 last.next = chunk;
366 last.next = chunk;
373 else {
367 else {
374 m_first = chunk;
368 m_first = chunk;
375 }
369 }
376 return true;
370 return true;
377 }
371 }
378
372
379 void RecycleFirstChunk(Chunk first) {
373 void RecycleFirstChunk(Chunk first) {
380 var next = first.next;
374 var next = first.next;
381
375
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
376 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 return;
377 return;
384
378
385 if (next == null) {
379 if (next == null) {
386
380
387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
381 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388 /*while (first.next == null)
382 /*while (first.next == null)
389 Thread.MemoryBarrier();*/
383 Thread.MemoryBarrier();*/
390
384
391 // race
385 // race
392 // someone already updated the tail, restore the pointer to the queue head
386 // someone already updated the tail, restore the pointer to the queue head
393 m_first = first;
387 m_first = first;
394 }
388 }
395 // the tail is updated
389 // the tail is updated
396 }
390 }
397
391
398 // we need to update the head
392 // we need to update the head
399 //Interlocked.CompareExchange(ref m_first, next, first);
393 //Interlocked.CompareExchange(ref m_first, next, first);
400 // if the head is already updated then give up
394 // if the head is already updated then give up
401 //return;
395 //return;
402
396
403 }
397 }
404
398
405 public void Clear() {
399 public void Clear() {
406 // start the new queue
400 // start the new queue
407 var chunk = new Chunk(m_chunkSize);
401 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
408
402
409 do {
403 do {
410 Thread.MemoryBarrier();
404 Thread.MemoryBarrier();
411 var first = m_first;
405 var first = m_first;
412 var last = m_last;
406 var last = m_last;
413
407
414 if (last == null) // nothing to clear
408 if (last == null) // nothing to clear
415 return;
409 return;
416
410
417 if (first == null || (first.next == null && first != last)) // inconcistency
411 if (first == null || (first.next == null && first != last)) // inconcistency
418 continue;
412 continue;
419
413
420 // here we will create inconsistency which will force others to spin
414 // here we will create inconsistency which will force others to spin
421 // and prevent from fetching. chunk.next = null
415 // and prevent from fetching. chunk.next = null
422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
416 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
423 continue;// inconsistent
417 continue;// inconsistent
424
418
425 m_last = chunk;
419 m_last = chunk;
426
420
427 return;
421 return;
428
422
429 } while(true);
423 } while(true);
430 }
424 }
431
425
432 public T[] Drain() {
426 public T[] Drain() {
433 // start the new queue
427 // start the new queue
434 var chunk = new Chunk(m_chunkSize);
428 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
435
429
436 do {
430 do {
437 Thread.MemoryBarrier();
431 Thread.MemoryBarrier();
438 var first = m_first;
432 var first = m_first;
439 var last = m_last;
433 var last = m_last;
440
434
441 if (last == null)
435 if (last == null)
442 return new T[0];
436 return new T[0];
443
437
444 if (first == null || (first.next == null && first != last))
438 if (first == null || (first.next == null && first != last))
445 continue;
439 continue;
446
440
447 // here we will create inconsistency which will force others to spin
441 // here we will create inconsistency which will force others to spin
448 // and prevent from fetching. chunk.next = null
442 // and prevent from fetching. chunk.next = null
449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
443 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
450 continue;// inconsistent
444 continue;// inconsistent
451
445
452 last = Interlocked.Exchange(ref m_last, chunk);
446 last = Interlocked.Exchange(ref m_last, chunk);
453
447
454 return ReadChunks(first, last);
448 return ReadChunks(first, last);
455
449
456 } while(true);
450 } while(true);
457 }
451 }
458
452
459 T[] ReadChunks(Chunk chunk, object last) {
453 T[] ReadChunks(Chunk chunk, object last) {
460 var result = new List<T>();
454 var result = new List<T>();
461 var buffer = new T[m_chunkSize];
455 var buffer = new T[DEFAULT_CHUNK_SIZE];
462 int actual;
456 int actual;
463 bool recycle;
457 bool recycle;
464 while (chunk != null) {
458 while (chunk != null) {
465 // ensure all write operations on the chunk are complete
459 // ensure all write operations on the chunk are complete
466 chunk.Commit();
460 chunk.Commit();
467
461
468 // we need to read the chunk using this way
462 // we need to read the chunk using this way
469 // since some client still may completing the dequeue
463 // since some client still may completing the dequeue
470 // operation, such clients most likely won't get results
464 // operation, such clients most likely won't get results
471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
465 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
466 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
473
467
474 if (chunk == last) {
468 if (chunk == last) {
475 chunk = null;
469 chunk = null;
476 } else {
470 } else {
477 while (chunk.next == null)
471 while (chunk.next == null)
478 Thread.MemoryBarrier();
472 Thread.MemoryBarrier();
479 chunk = chunk.next;
473 chunk = chunk.next;
480 }
474 }
481 }
475 }
482
476
483 return result.ToArray();
477 return result.ToArray();
484 }
478 }
485
479
486 struct ArraySegmentCollection : ICollection<T> {
480 struct ArraySegmentCollection : ICollection<T> {
487 readonly T[] m_data;
481 readonly T[] m_data;
488 readonly int m_offset;
482 readonly int m_offset;
489 readonly int m_length;
483 readonly int m_length;
490
484
491 public ArraySegmentCollection(T[] data, int offset, int length) {
485 public ArraySegmentCollection(T[] data, int offset, int length) {
492 m_data = data;
486 m_data = data;
493 m_offset = offset;
487 m_offset = offset;
494 m_length = length;
488 m_length = length;
495 }
489 }
496
490
497 #region ICollection implementation
491 #region ICollection implementation
498
492
499 public void Add(T item) {
493 public void Add(T item) {
500 throw new InvalidOperationException();
494 throw new InvalidOperationException();
501 }
495 }
502
496
503 public void Clear() {
497 public void Clear() {
504 throw new InvalidOperationException();
498 throw new InvalidOperationException();
505 }
499 }
506
500
507 public bool Contains(T item) {
501 public bool Contains(T item) {
508 return false;
502 return false;
509 }
503 }
510
504
511 public void CopyTo(T[] array, int arrayIndex) {
505 public void CopyTo(T[] array, int arrayIndex) {
512 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
506 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
513 }
507 }
514
508
515 public bool Remove(T item) {
509 public bool Remove(T item) {
516 throw new NotImplementedException();
510 throw new NotImplementedException();
517 }
511 }
518
512
519 public int Count {
513 public int Count {
520 get {
514 get {
521 return m_length;
515 return m_length;
522 }
516 }
523 }
517 }
524
518
525 public bool IsReadOnly {
519 public bool IsReadOnly {
526 get {
520 get {
527 return true;
521 return true;
528 }
522 }
529 }
523 }
530
524
531 #endregion
525 #endregion
532
526
533 #region IEnumerable implementation
527 #region IEnumerable implementation
534
528
535 public IEnumerator<T> GetEnumerator() {
529 public IEnumerator<T> GetEnumerator() {
536 for (int i = m_offset; i < m_length + m_offset; i++)
530 for (int i = m_offset; i < m_length + m_offset; i++)
537 yield return m_data[i];
531 yield return m_data[i];
538 }
532 }
539
533
540 #endregion
534 #endregion
541
535
542 #region IEnumerable implementation
536 #region IEnumerable implementation
543
537
544 IEnumerator IEnumerable.GetEnumerator() {
538 IEnumerator IEnumerable.GetEnumerator() {
545 return GetEnumerator();
539 return GetEnumerator();
546 }
540 }
547
541
548 #endregion
542 #endregion
549 }
543 }
550
544
551 #region IEnumerable implementation
545 #region IEnumerable implementation
552
546
553 class Enumerator : IEnumerator<T> {
547 class Enumerator : IEnumerator<T> {
554 Chunk m_current;
548 Chunk m_current;
555 int m_pos = -1;
549 int m_pos = -1;
556
550
557 public Enumerator(Chunk fisrt) {
551 public Enumerator(Chunk fisrt) {
558 m_current = fisrt;
552 m_current = fisrt;
559 }
553 }
560
554
561 #region IEnumerator implementation
555 #region IEnumerator implementation
562
556
563 public bool MoveNext() {
557 public bool MoveNext() {
564 if (m_current == null)
558 if (m_current == null)
565 return false;
559 return false;
566
560
567 if (m_pos == -1)
561 if (m_pos == -1)
568 m_pos = m_current.Low;
562 m_pos = m_current.Low;
569 else
563 else
570 m_pos++;
564 m_pos++;
571 if (m_pos == m_current.Hi) {
565 if (m_pos == m_current.Hi) {
572 m_pos = 0;
566 m_pos = 0;
573 m_current = m_current.next;
567 m_current = m_current.next;
574 }
568 }
575
569
576 return true;
570 return true;
577 }
571 }
578
572
579 public void Reset() {
573 public void Reset() {
580 throw new NotSupportedException();
574 throw new NotSupportedException();
581 }
575 }
582
576
583 object IEnumerator.Current {
577 object IEnumerator.Current {
584 get {
578 get {
585 return Current;
579 return Current;
586 }
580 }
587 }
581 }
588
582
589 #endregion
583 #endregion
590
584
591 #region IDisposable implementation
585 #region IDisposable implementation
592
586
593 public void Dispose() {
587 public void Dispose() {
594 }
588 }
595
589
596 #endregion
590 #endregion
597
591
598 #region IEnumerator implementation
592 #region IEnumerator implementation
599
593
600 public T Current {
594 public T Current {
601 get {
595 get {
602 if (m_pos == -1 || m_current == null)
596 if (m_pos == -1 || m_current == null)
603 throw new InvalidOperationException();
597 throw new InvalidOperationException();
604 return m_current.GetAt(m_pos);
598 return m_current.GetAt(m_pos);
605 }
599 }
606 }
600 }
607
601
608 #endregion
602 #endregion
609 }
603 }
610
604
611 public IEnumerator<T> GetEnumerator() {
605 public IEnumerator<T> GetEnumerator() {
612 return new Enumerator(m_first);
606 return new Enumerator(m_first);
613 }
607 }
614
608
615 #endregion
609 #endregion
616
610
617 #region IEnumerable implementation
611 #region IEnumerable implementation
618
612
619 IEnumerator IEnumerable.GetEnumerator() {
613 IEnumerator IEnumerable.GetEnumerator() {
620 return GetEnumerator();
614 return GetEnumerator();
621 }
615 }
622
616
623 #endregion
617 #endregion
624 }
618 }
625 }
619 }
@@ -1,199 +1,197
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3
3
4 namespace Implab.Parallels {
4 namespace Implab.Parallels {
5 public abstract class DispatchPool<TUnit> : IDisposable {
5 public abstract class DispatchPool<TUnit> : IDisposable {
6 readonly int m_minThreadsLimit;
6 readonly int m_minThreadsLimit;
7 readonly int m_maxThreadsLimit;
7 readonly int m_maxThreadsLimit;
8 readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit
8 readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit
9
9
10 int m_threads; // the current size of the pool
10 int m_threads; // the current size of the pool
11 int m_maxRunningThreads; // the meximum reached size of the pool
11 int m_maxRunningThreads; // the meximum reached size of the pool
12 int m_exit; // the pool is going to shutdown, all unused workers are released
12 int m_exit; // the pool is going to shutdown, all unused workers are released
13
13
14 readonly object m_signal = new object(); // used to pulse waiting threads
14 readonly object m_signal = new object(); // used to pulse waiting threads
15
15
16 protected DispatchPool(int min, int max) {
16 protected DispatchPool(int min, int max) {
17 if (min < 0)
17 if (min < 0)
18 throw new ArgumentOutOfRangeException("min");
18 throw new ArgumentOutOfRangeException("min");
19 if (max <= 0)
19 if (max <= 0)
20 throw new ArgumentOutOfRangeException("max");
20 throw new ArgumentOutOfRangeException("max");
21
21
22 if (min > max)
22 if (min > max)
23 min = max;
23 min = max;
24 m_minThreadsLimit = min;
24 m_minThreadsLimit = min;
25 m_maxThreadsLimit = max;
25 m_maxThreadsLimit = max;
26 }
26 }
27
27
28 protected DispatchPool(int threads)
28 protected DispatchPool(int threads)
29 : this(threads, threads) {
29 : this(threads, threads) {
30 }
30 }
31
31
32 protected DispatchPool() {
32 protected DispatchPool() {
33 int maxThreads, maxCP;
34 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
35
33
36 m_minThreadsLimit = 0;
34 m_minThreadsLimit = 0;
37 m_maxThreadsLimit = maxThreads;
35 m_maxThreadsLimit = Environment.ProcessorCount;
38 }
36 }
39
37
40 protected void InitPool() {
38 protected void InitPool() {
41 for (int i = 0; i < m_minThreadsLimit; i++)
39 for (int i = 0; i < m_minThreadsLimit; i++)
42 StartWorker();
40 StartWorker();
43 }
41 }
44
42
45 public int PoolSize {
43 public int PoolSize {
46 get {
44 get {
47 Thread.MemoryBarrier();
45 Thread.MemoryBarrier();
48 return m_threads;
46 return m_threads;
49 }
47 }
50 }
48 }
51
49
52 public int MaxRunningThreads {
50 public int MaxRunningThreads {
53 get {
51 get {
54 Thread.MemoryBarrier();
52 Thread.MemoryBarrier();
55 return m_maxRunningThreads;
53 return m_maxRunningThreads;
56 }
54 }
57 }
55 }
58
56
59 protected bool IsDisposed {
57 protected bool IsDisposed {
60 get {
58 get {
61 Thread.MemoryBarrier();
59 Thread.MemoryBarrier();
62 return m_exit == 1;
60 return m_exit == 1;
63 }
61 }
64 }
62 }
65
63
66 protected abstract bool TryDequeue(out TUnit unit);
64 protected abstract bool TryDequeue(out TUnit unit);
67
65
68 bool Dequeue(out TUnit unit, int timeout) {
66 bool Dequeue(out TUnit unit, int timeout) {
69 int ts = Environment.TickCount;
67 int ts = Environment.TickCount;
70 if (TryDequeue(out unit))
68 if (TryDequeue(out unit))
71 return true;
69 return true;
72 lock (m_signal) {
70 lock (m_signal) {
73 while (!TryDequeue(out unit) && m_exit == 0)
71 while (!TryDequeue(out unit) && m_exit == 0)
74 if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
72 if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
75 // timeout
73 // timeout
76 return false;
74 return false;
77 }
75 }
78 // queue item or terminate
76 // queue item or terminate
79 Monitor.Pulse(m_signal);
77 Monitor.Pulse(m_signal);
80 if (m_exit == 1)
78 if (m_exit == 1)
81 return false;
79 return false;
82 }
80 }
83 return true;
81 return true;
84 }
82 }
85
83
86 protected void SignalThread() {
84 protected void SignalThread() {
87 lock (m_signal) {
85 lock (m_signal) {
88 Monitor.Pulse(m_signal);
86 Monitor.Pulse(m_signal);
89 }
87 }
90 }
88 }
91
89
92 #region thread slots traits
90 #region thread slots traits
93
91
94 bool AllocateThreadSlot() {
92 bool AllocateThreadSlot() {
95 int current;
93 int current;
96 // use spins to allocate slot for the new thread
94 // use spins to allocate slot for the new thread
97 do {
95 do {
98 current = m_threads;
96 current = m_threads;
99 if (current >= m_maxThreadsLimit || m_exit == 1)
97 if (current >= m_maxThreadsLimit || m_exit == 1)
100 // no more slots left or the pool has been disposed
98 // no more slots left or the pool has been disposed
101 return false;
99 return false;
102 } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));
100 } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));
103
101
104 UpdateMaxThreads(current + 1);
102 UpdateMaxThreads(current + 1);
105
103
106 return true;
104 return true;
107 }
105 }
108
106
109 bool AllocateThreadSlot(int desired) {
107 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
108 if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
111 return false;
109 return false;
112
110
113 UpdateMaxThreads(desired);
111 UpdateMaxThreads(desired);
114
112
115 return true;
113 return true;
116 }
114 }
117
115
118 bool ReleaseThreadSlot(out bool last) {
116 bool ReleaseThreadSlot(out bool last) {
119 last = false;
117 last = false;
120 int current;
118 int current;
121 // use spins to release slot for the new thread
119 // use spins to release slot for the new thread
122 Thread.MemoryBarrier();
120 Thread.MemoryBarrier();
123 do {
121 do {
124 current = m_threads;
122 current = m_threads;
125 if (current <= m_minThreadsLimit && m_exit == 0)
123 if (current <= m_minThreadsLimit && m_exit == 0)
126 // the thread is reserved
124 // the thread is reserved
127 return false;
125 return false;
128 } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));
126 } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));
129
127
130 last = (current == 1);
128 last = (current == 1);
131
129
132 return true;
130 return true;
133 }
131 }
134
132
135 void UpdateMaxThreads(int count) {
133 void UpdateMaxThreads(int count) {
136 int max;
134 int max;
137 do {
135 do {
138 max = m_maxRunningThreads;
136 max = m_maxRunningThreads;
139 if (max >= count)
137 if (max >= count)
140 break;
138 break;
141 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
139 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
142 }
140 }
143
141
144 #endregion
142 #endregion
145
143
146 protected bool StartWorker() {
144 protected bool StartWorker() {
147 if (AllocateThreadSlot()) {
145 if (AllocateThreadSlot()) {
148 // slot successfully allocated
146 // slot successfully allocated
149 var worker = new Thread(Worker);
147 var worker = new Thread(Worker);
150 worker.IsBackground = true;
148 worker.IsBackground = true;
151 worker.Start();
149 worker.Start();
152
150
153 return true;
151 return true;
154 }
152 }
155 return false;
153 return false;
156 }
154 }
157
155
158 protected abstract void InvokeUnit(TUnit unit);
156 protected abstract void InvokeUnit(TUnit unit);
159
157
160 protected virtual void Worker() {
158 protected virtual void Worker() {
161 TUnit unit;
159 TUnit unit;
162 bool last;
160 bool last;
163 do {
161 do {
164 while (Dequeue(out unit, m_releaseTimeout)) {
162 while (Dequeue(out unit, m_releaseTimeout)) {
165 InvokeUnit(unit);
163 InvokeUnit(unit);
166 }
164 }
167 if(!ReleaseThreadSlot(out last))
165 if(!ReleaseThreadSlot(out last))
168 continue;
166 continue;
169 // queue may be not empty
167 // queue may be not empty
170 if (last && TryDequeue(out unit)) {
168 if (last && TryDequeue(out unit)) {
171 InvokeUnit(unit);
169 InvokeUnit(unit);
172 if (AllocateThreadSlot(1))
170 if (AllocateThreadSlot(1))
173 continue;
171 continue;
174 // we can safely exit since pool is alive
172 // we can safely exit since pool is alive
175 }
173 }
176 break;
174 break;
177 } while(true);
175 } while(true);
178 }
176 }
179
177
180
178
181 protected virtual void Dispose(bool disposing) {
179 protected virtual void Dispose(bool disposing) {
182 if (disposing) {
180 if (disposing) {
183 if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
181 if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
184 // wake sleeping threads
182 // wake sleeping threads
185 SignalThread();
183 SignalThread();
186 GC.SuppressFinalize(this);
184 GC.SuppressFinalize(this);
187 }
185 }
188 }
186 }
189 }
187 }
190
188
191 public void Dispose() {
189 public void Dispose() {
192 Dispose(true);
190 Dispose(true);
193 }
191 }
194
192
195 ~DispatchPool() {
193 ~DispatchPool() {
196 Dispose(false);
194 Dispose(false);
197 }
195 }
198 }
196 }
199 }
197 }
@@ -1,120 +1,34
1 using System;
1 using System;
2 using Implab.Diagnostics;
2 using Implab.Diagnostics;
3 using Implab.Parallels;
3 using Implab.Parallels;
4 using Implab;
4 using Implab;
5 using System.Collections.Generic;
5 using System.Collections.Generic;
6 using System.Collections.Concurrent;
6 using System.Collections.Concurrent;
7
7
8 namespace MonoPlay {
8 namespace MonoPlay {
9 class MainClass {
9 class MainClass {
10 public static void Main(string[] args) {
10 public static void Main(string[] args) {
11 if (args == null)
11 if (args == null)
12 throw new ArgumentNullException("args");
12 throw new ArgumentNullException("args");
13
13
14 var q1 = new AsyncQueue<int>();
15 var q2 = new Queue<int>();
16
17 const int count = 10000000;
14 const int count = 10000000;
18
15
19 int res1 = 0, res2 = 0;
20 var t1 = Environment.TickCount;
16 var t1 = Environment.TickCount;
21
17
22 AsyncPool.RunThread(
18 for (int i = 0; i < count; i++) {
23 () => {
19 var p = new Promise<int>();
24 for (var i = 0; i < count; i++)
25 q1.Enqueue(1);
26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
27 },
28 () => {
29 for (var i = 0; i < count; i++)
30 q1.Enqueue(2);
31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
32 },
33 () => {
34 int temp = 0;
35 int i = 0;
36 while (i < count)
37 if (q1.TryDequeue(out temp)) {
38 i++;
39 res1 += temp;
40 }
41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
42 },
43 () => {
44 int temp = 0;
45 int i = 0;
46 while (i < count)
47 if (q1.TryDequeue(out temp)) {
48 i++;
49 res2 += temp;
50 }
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
52 }
53 )
54 .Bundle()
55 .Join();
56
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
58
59 var t2 = Environment.TickCount;
60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
61
62 t1 = Environment.TickCount;
63
20
64 for (var i = 0; i < count * 2; i++)
21 p.On(x => {}).On(x => {});
65 q2.Enqueue(i);
66
67 for (var i = 0; i < count * 2; i++)
68 q2.Dequeue();
69
70 t2 = Environment.TickCount;
71 Console.WriteLine("Queue: {0} ms", t2 - t1);
72
73 q2 = new Queue<int>();
74
75 t1 = Environment.TickCount;
76
77
22
78 AsyncPool.RunThread(
23 p.Resolve(i);
79 () => {
80 for (var i = 0; i < count; i++)
81 lock (q2)
82 q2.Enqueue(i);
83 },
84 () => {
85 for (var i = 0; i < count; i++)
86 lock (q2)
87 q2.Enqueue(i);
88 },
89 () => {
90 for (int i = 0; i < count ;)
91 lock (q2) {
92 if (q2.Count == 0)
93 continue;
94 q2.Dequeue();
95 i++;
96 }
97
98 },
99 () => {
100 for (int i = 0; i < count ;)
101 lock (q2) {
102 if (q2.Count == 0)
103 continue;
104 q2.Dequeue();
105 i++;
106 }
107
24
108 }
25 }
109 )
110 .Bundle()
111 .Join();
112
26
113
27
114
28
115 t2 = Environment.TickCount;
29 var t2 = Environment.TickCount;
116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
30 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
117
31
118 }
32 }
119 }
33 }
120 }
34 }
General Comments 0
You need to be logged in to leave comments. Login now