##// END OF EJS Templates
major update, added Drain mathod to AsyncQueue class
cin -
r124:a336cb13c6a9 v2
parent child
Show More
@@ -1,676 +1,779
1 using System;
1 using System;
2 using System.Reflection;
2 using System.Reflection;
3 using System.Threading;
3 using System.Threading;
4 using Implab.Parallels;
4 using Implab.Parallels;
5
5
6 #if MONO
6 #if MONO
7
7
8 using NUnit.Framework;
8 using NUnit.Framework;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
11
11
12 #else
12 #else
13
13
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15
15
16 #endif
16 #endif
17
17
18 namespace Implab.Test {
18 namespace Implab.Test {
19 [TestClass]
19 [TestClass]
20 public class AsyncTests {
20 public class AsyncTests {
21 [TestMethod]
21 [TestMethod]
22 public void ResolveTest() {
22 public void ResolveTest() {
23 int res = -1;
23 int res = -1;
24 var p = new Promise<int>();
24 var p = new Promise<int>();
25 p.Then(x => res = x);
25 p.Then(x => res = x);
26 p.Resolve(100);
26 p.Resolve(100);
27
27
28 Assert.AreEqual(100, res);
28 Assert.AreEqual(100, res);
29 }
29 }
30
30
31 [TestMethod]
31 [TestMethod]
32 public void RejectTest() {
32 public void RejectTest() {
33 int res = -1;
33 int res = -1;
34 Exception err = null;
34 Exception err = null;
35
35
36 var p = new Promise<int>();
36 var p = new Promise<int>();
37 p.Then(
37 p.Then(
38 x => res = x,
38 x => res = x,
39 e => {
39 e => {
40 err = e;
40 err = e;
41 return -2;
41 return -2;
42 }
42 }
43 );
43 );
44 p.Reject(new ApplicationException("error"));
44 p.Reject(new ApplicationException("error"));
45
45
46 Assert.AreEqual(res, -1);
46 Assert.AreEqual(res, -1);
47 Assert.AreEqual(err.Message, "error");
47 Assert.AreEqual(err.Message, "error");
48
48
49 }
49 }
50
50
51 [TestMethod]
51 [TestMethod]
52 public void CancelExceptionTest() {
52 public void CancelExceptionTest() {
53 var p = new Promise<bool>();
53 var p = new Promise<bool>();
54 p.Cancel();
54 p.Cancel();
55
55
56 var p2 = p.Cancelled(() => {
56 var p2 = p.Cancelled(() => {
57 throw new ApplicationException("CANCELLED");
57 throw new ApplicationException("CANCELLED");
58 });
58 });
59
59
60 try {
60 try {
61 p2.Join();
61 p2.Join();
62 Assert.Fail();
62 Assert.Fail();
63 } catch (ApplicationException err) {
63 } catch (ApplicationException err) {
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 }
65 }
66
66
67 }
67 }
68
68
69 [TestMethod]
69 [TestMethod]
70 public void ContinueOnCancelTest() {
70 public void ContinueOnCancelTest() {
71 var p = new Promise<bool>();
71 var p = new Promise<bool>();
72 p.Cancel();
72 p.Cancel();
73
73
74 var p2 = p
74 var p2 = p
75 .Cancelled<bool>(() => {
75 .Cancelled<bool>(() => {
76 throw new ApplicationException("CANCELLED");
76 throw new ApplicationException("CANCELLED");
77 })
77 })
78 .Error(e => true);
78 .Error(e => true);
79
79
80 Assert.AreEqual(true, p2.Join());
80 Assert.AreEqual(true, p2.Join());
81 }
81 }
82
82
83 [TestMethod]
83 [TestMethod]
84 public void JoinSuccessTest() {
84 public void JoinSuccessTest() {
85 var p = new Promise<int>();
85 var p = new Promise<int>();
86 p.Resolve(100);
86 p.Resolve(100);
87 Assert.AreEqual(p.Join(), 100);
87 Assert.AreEqual(p.Join(), 100);
88 }
88 }
89
89
90 [TestMethod]
90 [TestMethod]
91 public void JoinFailTest() {
91 public void JoinFailTest() {
92 var p = new Promise<int>();
92 var p = new Promise<int>();
93 p.Reject(new ApplicationException("failed"));
93 p.Reject(new ApplicationException("failed"));
94
94
95 try {
95 try {
96 p.Join();
96 p.Join();
97 throw new ApplicationException("WRONG!");
97 throw new ApplicationException("WRONG!");
98 } catch (TargetInvocationException err) {
98 } catch (TargetInvocationException err) {
99 Assert.AreEqual(err.InnerException.Message, "failed");
99 Assert.AreEqual(err.InnerException.Message, "failed");
100 } catch {
100 } catch {
101 Assert.Fail("Got wrong excaption");
101 Assert.Fail("Got wrong excaption");
102 }
102 }
103 }
103 }
104
104
105 [TestMethod]
105 [TestMethod]
106 public void MapTest() {
106 public void MapTest() {
107 var p = new Promise<int>();
107 var p = new Promise<int>();
108
108
109 var p2 = p.Then(x => x.ToString());
109 var p2 = p.Then(x => x.ToString());
110 p.Resolve(100);
110 p.Resolve(100);
111
111
112 Assert.AreEqual(p2.Join(), "100");
112 Assert.AreEqual(p2.Join(), "100");
113 }
113 }
114
114
115 [TestMethod]
115 [TestMethod]
116 public void FixErrorTest() {
116 public void FixErrorTest() {
117 var p = new Promise<int>();
117 var p = new Promise<int>();
118
118
119 var p2 = p.Error(e => 101);
119 var p2 = p.Error(e => 101);
120
120
121 p.Reject(new Exception());
121 p.Reject(new Exception());
122
122
123 Assert.AreEqual(p2.Join(), 101);
123 Assert.AreEqual(p2.Join(), 101);
124 }
124 }
125
125
126 [TestMethod]
126 [TestMethod]
127 public void ChainTest() {
127 public void ChainTest() {
128 var p1 = new Promise<int>();
128 var p1 = new Promise<int>();
129
129
130 var p3 = p1.Chain(x => {
130 var p3 = p1.Chain(x => {
131 var p2 = new Promise<string>();
131 var p2 = new Promise<string>();
132 p2.Resolve(x.ToString());
132 p2.Resolve(x.ToString());
133 return p2;
133 return p2;
134 });
134 });
135
135
136 p1.Resolve(100);
136 p1.Resolve(100);
137
137
138 Assert.AreEqual(p3.Join(), "100");
138 Assert.AreEqual(p3.Join(), "100");
139 }
139 }
140
140
141 [TestMethod]
141 [TestMethod]
142 public void ChainFailTest() {
142 public void ChainFailTest() {
143 var p1 = new Promise<int>();
143 var p1 = new Promise<int>();
144
144
145 var p3 = p1.Chain(x => {
145 var p3 = p1.Chain(x => {
146 var p2 = new Promise<string>();
146 var p2 = new Promise<string>();
147 p2.Reject(new Exception("DIE!!!"));
147 p2.Reject(new Exception("DIE!!!"));
148 return p2;
148 return p2;
149 });
149 });
150
150
151 p1.Resolve(100);
151 p1.Resolve(100);
152
152
153 Assert.IsTrue(p3.IsResolved);
153 Assert.IsTrue(p3.IsResolved);
154 }
154 }
155
155
156 [TestMethod]
156 [TestMethod]
157 public void PoolTest() {
157 public void PoolTest() {
158 var pid = Thread.CurrentThread.ManagedThreadId;
158 var pid = Thread.CurrentThread.ManagedThreadId;
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160
160
161 Assert.AreNotEqual(pid, p.Join());
161 Assert.AreNotEqual(pid, p.Join());
162 }
162 }
163
163
164 [TestMethod]
164 [TestMethod]
165 public void WorkerPoolSizeTest() {
165 public void WorkerPoolSizeTest() {
166 var pool = new WorkerPool(5, 10, 1);
166 var pool = new WorkerPool(5, 10, 1);
167
167
168 Assert.AreEqual(5, pool.PoolSize);
168 Assert.AreEqual(5, pool.PoolSize);
169
169
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173
173
174 Assert.AreEqual(5, pool.PoolSize);
174 Assert.AreEqual(5, pool.PoolSize);
175
175
176 for (int i = 0; i < 100; i++)
176 for (int i = 0; i < 100; i++)
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 Thread.Sleep(200);
178 Thread.Sleep(200);
179 Assert.AreEqual(10, pool.PoolSize);
179 Assert.AreEqual(10, pool.PoolSize);
180
180
181 pool.Dispose();
181 pool.Dispose();
182 }
182 }
183
183
184 [TestMethod]
184 [TestMethod]
185 public void WorkerPoolCorrectTest() {
185 public void WorkerPoolCorrectTest() {
186 var pool = new WorkerPool(0,1000,100);
186 var pool = new WorkerPool(0,1000,100);
187
187
188 const int iterations = 1000;
188 const int iterations = 1000;
189 int pending = iterations;
189 int pending = iterations;
190 var stop = new ManualResetEvent(false);
190 var stop = new ManualResetEvent(false);
191
191
192 var count = 0;
192 var count = 0;
193 for (int i = 0; i < iterations; i++) {
193 for (int i = 0; i < iterations; i++) {
194 pool
194 pool
195 .Invoke(() => 1)
195 .Invoke(() => 1)
196 .Then(x => Interlocked.Add(ref count, x))
196 .Then(x => Interlocked.Add(ref count, x))
197 .Then(x => Math.Log10(x))
197 .Then(x => Math.Log10(x))
198 .On(() => {
198 .On(() => {
199 Interlocked.Decrement(ref pending);
199 Interlocked.Decrement(ref pending);
200 if (pending == 0)
200 if (pending == 0)
201 stop.Set();
201 stop.Set();
202 }, PromiseEventType.All);
202 }, PromiseEventType.All);
203 }
203 }
204
204
205 stop.WaitOne();
205 stop.WaitOne();
206
206
207 Assert.AreEqual(iterations, count);
207 Assert.AreEqual(iterations, count);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 pool.Dispose();
209 pool.Dispose();
210
210
211 }
211 }
212
212
213 [TestMethod]
213 [TestMethod]
214 public void WorkerPoolDisposeTest() {
214 public void WorkerPoolDisposeTest() {
215 var pool = new WorkerPool(5, 20);
215 var pool = new WorkerPool(5, 20);
216 Assert.AreEqual(5, pool.PoolSize);
216 Assert.AreEqual(5, pool.PoolSize);
217 pool.Dispose();
217 pool.Dispose();
218 Thread.Sleep(500);
218 Thread.Sleep(500);
219 Assert.AreEqual(0, pool.PoolSize);
219 Assert.AreEqual(0, pool.PoolSize);
220 pool.Dispose();
220 pool.Dispose();
221 }
221 }
222
222
223 [TestMethod]
223 [TestMethod]
224 public void MTQueueTest() {
224 public void MTQueueTest() {
225 var queue = new MTQueue<int>();
225 var queue = new MTQueue<int>();
226 int res;
226 int res;
227
227
228 queue.Enqueue(10);
228 queue.Enqueue(10);
229 Assert.IsTrue(queue.TryDequeue(out res));
229 Assert.IsTrue(queue.TryDequeue(out res));
230 Assert.AreEqual(10, res);
230 Assert.AreEqual(10, res);
231 Assert.IsFalse(queue.TryDequeue(out res));
231 Assert.IsFalse(queue.TryDequeue(out res));
232
232
233 for (int i = 0; i < 1000; i++)
233 for (int i = 0; i < 1000; i++)
234 queue.Enqueue(i);
234 queue.Enqueue(i);
235
235
236 for (int i = 0; i < 1000; i++) {
236 for (int i = 0; i < 1000; i++) {
237 queue.TryDequeue(out res);
237 queue.TryDequeue(out res);
238 Assert.AreEqual(i, res);
238 Assert.AreEqual(i, res);
239 }
239 }
240
240
241 int writers = 0;
241 int writers = 0;
242 int readers = 0;
242 int readers = 0;
243 var stop = new ManualResetEvent(false);
243 var stop = new ManualResetEvent(false);
244 int total = 0;
244 int total = 0;
245
245
246 const int itemsPerWriter = 10000;
246 const int itemsPerWriter = 10000;
247 const int writersCount = 10;
247 const int writersCount = 10;
248
248
249 for (int i = 0; i < writersCount; i++) {
249 for (int i = 0; i < writersCount; i++) {
250 Interlocked.Increment(ref writers);
250 Interlocked.Increment(ref writers);
251 AsyncPool
251 AsyncPool
252 .InvokeNewThread(() => {
252 .RunThread(() => {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 queue.Enqueue(1);
254 queue.Enqueue(1);
255 }
255 }
256 return 1;
256 return 1;
257 })
257 })
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 }
259 }
260
260
261 for (int i = 0; i < 10; i++) {
261 for (int i = 0; i < 10; i++) {
262 Interlocked.Increment(ref readers);
262 Interlocked.Increment(ref readers);
263 AsyncPool
263 AsyncPool
264 .InvokeNewThread(() => {
264 .RunThread(() => {
265 int t;
265 int t;
266 do {
266 do {
267 while (queue.TryDequeue(out t))
267 while (queue.TryDequeue(out t))
268 Interlocked.Add(ref total, t);
268 Interlocked.Add(ref total, t);
269 } while (writers > 0);
269 } while (writers > 0);
270 return 1;
270 return 1;
271 })
271 })
272 .On(() => {
272 .On(() => {
273 Interlocked.Decrement(ref readers);
273 Interlocked.Decrement(ref readers);
274 if (readers == 0)
274 if (readers == 0)
275 stop.Set();
275 stop.Set();
276 }, PromiseEventType.All);
276 }, PromiseEventType.All);
277 }
277 }
278
278
279 stop.WaitOne();
279 stop.WaitOne();
280
280
281 Assert.AreEqual(100000, total);
281 Assert.AreEqual(100000, total);
282 }
282 }
283
283
284 [TestMethod]
284 [TestMethod]
285 public void AsyncQueueTest() {
285 public void AsyncQueueTest() {
286 var queue = new AsyncQueue<int>();
286 var queue = new AsyncQueue<int>();
287 int res;
287 int res;
288
288
289 queue.Enqueue(10);
289 queue.Enqueue(10);
290 Assert.IsTrue(queue.TryDequeue(out res));
290 Assert.IsTrue(queue.TryDequeue(out res));
291 Assert.AreEqual(10, res);
291 Assert.AreEqual(10, res);
292 Assert.IsFalse(queue.TryDequeue(out res));
292 Assert.IsFalse(queue.TryDequeue(out res));
293
293
294 for (int i = 0; i < 1000; i++)
294 for (int i = 0; i < 1000; i++)
295 queue.Enqueue(i);
295 queue.Enqueue(i);
296
296
297 for (int i = 0; i < 1000; i++) {
297 for (int i = 0; i < 1000; i++) {
298 queue.TryDequeue(out res);
298 queue.TryDequeue(out res);
299 Assert.AreEqual(i, res);
299 Assert.AreEqual(i, res);
300 }
300 }
301
301
302 const int count = 10000000;
302 const int count = 10000000;
303
303
304 int res1 = 0, res2 = 0;
304 int res1 = 0, res2 = 0;
305 var t1 = Environment.TickCount;
305 var t1 = Environment.TickCount;
306
306
307 AsyncPool.RunThread(
307 AsyncPool.RunThread(
308 () => {
308 () => {
309 for (var i = 0; i < count; i++)
309 for (var i = 0; i < count; i++)
310 queue.Enqueue(1);
310 queue.Enqueue(1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 },
312 },
313 () => {
313 () => {
314 for (var i = 0; i < count; i++)
314 for (var i = 0; i < count; i++)
315 queue.Enqueue(2);
315 queue.Enqueue(2);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 },
317 },
318 () => {
318 () => {
319 int temp;
319 int temp;
320 int i = 0;
320 int i = 0;
321 while (i < count)
321 while (i < count)
322 if (queue.TryDequeue(out temp)) {
322 if (queue.TryDequeue(out temp)) {
323 i++;
323 i++;
324 res1 += temp;
324 res1 += temp;
325 }
325 }
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 },
327 },
328 () => {
328 () => {
329 int temp;
329 int temp;
330 int i = 0;
330 int i = 0;
331 while (i < count)
331 while (i < count)
332 if (queue.TryDequeue(out temp)) {
332 if (queue.TryDequeue(out temp)) {
333 i++;
333 i++;
334 res2 += temp;
334 res2 += temp;
335 }
335 }
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 }
337 }
338 )
338 )
339 .Combine()
339 .Bundle()
340 .Join();
340 .Join();
341
341
342 Assert.AreEqual(count * 3, res1 + res2);
342 Assert.AreEqual(count * 3, res1 + res2);
343
343
344 Console.WriteLine(
344 Console.WriteLine(
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 Environment.TickCount - t1,
346 Environment.TickCount - t1,
347 res1,
347 res1,
348 res2,
348 res2,
349 res1 + res2,
349 res1 + res2,
350 count
350 count
351 );
351 );
352 }
352 }
353
353
354 [TestMethod]
354 [TestMethod]
355 public void AsyncQueueBatchTest() {
355 public void AsyncQueueBatchTest() {
356 var queue = new AsyncQueue<int>();
356 var queue = new AsyncQueue<int>();
357
357
358 const int wBatch = 29;
358 const int wBatch = 29;
359 const int wCount = 400000;
359 const int wCount = 400000;
360 const int total = wBatch * wCount * 2;
360 const int total = wBatch * wCount * 2;
361 const int summ = wBatch * wCount * 3;
361 const int summ = wBatch * wCount * 3;
362
362
363 int r1 = 0, r2 = 0;
363 int r1 = 0, r2 = 0;
364 const int rBatch = 111;
364 const int rBatch = 111;
365 int read = 0;
365 int read = 0;
366
366
367 var t1 = Environment.TickCount;
367 var t1 = Environment.TickCount;
368
368
369 AsyncPool.RunThread(
369 AsyncPool.RunThread(
370 () => {
370 () => {
371 var buffer = new int[wBatch];
371 var buffer = new int[wBatch];
372 for(int i = 0; i<wBatch; i++)
372 for(int i = 0; i<wBatch; i++)
373 buffer[i] = 1;
373 buffer[i] = 1;
374
374
375 for(int i =0; i < wCount; i++)
375 for(int i =0; i < wCount; i++)
376 queue.EnqueueRange(buffer,0,wBatch);
376 queue.EnqueueRange(buffer,0,wBatch);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 },
378 },
379 () => {
379 () => {
380 var buffer = new int[wBatch];
380 var buffer = new int[wBatch];
381 for(int i = 0; i<wBatch; i++)
381 for(int i = 0; i<wBatch; i++)
382 buffer[i] = 2;
382 buffer[i] = 2;
383
383
384 for(int i =0; i < wCount; i++)
384 for(int i =0; i < wCount; i++)
385 queue.EnqueueRange(buffer,0,wBatch);
385 queue.EnqueueRange(buffer,0,wBatch);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 },
387 },
388 () => {
388 () => {
389 var buffer = new int[rBatch];
389 var buffer = new int[rBatch];
390
390
391 while(read < total) {
391 while(read < total) {
392 int actual;
392 int actual;
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 for(int i=0; i< actual; i++)
394 for(int i=0; i< actual; i++)
395 r1 += buffer[i];
395 r1 += buffer[i];
396 Interlocked.Add(ref read, actual);
396 Interlocked.Add(ref read, actual);
397 }
397 }
398 }
398 }
399
399
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 },
401 },
402 () => {
402 () => {
403 var buffer = new int[rBatch];
403 var buffer = new int[rBatch];
404
404
405 while(read < total) {
405 while(read < total) {
406 int actual;
406 int actual;
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 for(int i=0; i< actual; i++)
408 for(int i=0; i< actual; i++)
409 r2 += buffer[i];
409 r2 += buffer[i];
410 Interlocked.Add(ref read, actual);
410 Interlocked.Add(ref read, actual);
411 }
411 }
412 }
412 }
413
413
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 }
415 }
416 )
416 )
417 .Combine()
417 .Bundle()
418 .Join();
418 .Join();
419
419
420 Assert.AreEqual(summ , r1 + r2);
420 Assert.AreEqual(summ , r1 + r2);
421
421
422 Console.WriteLine(
422 Console.WriteLine(
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 Environment.TickCount - t1,
424 Environment.TickCount - t1,
425 r1,
425 r1,
426 r2,
426 r2,
427 r1 + r2,
427 r1 + r2,
428 total
428 total
429 );
429 );
430 }
430 }
431
431
432 [TestMethod]
432 [TestMethod]
433 public void AsyncQueueChunkDequeueTest() {
433 public void AsyncQueueChunkDequeueTest() {
434 var queue = new AsyncQueue<int>();
434 var queue = new AsyncQueue<int>();
435
435
436 const int wBatch = 31;
436 const int wBatch = 31;
437 const int wCount = 200000;
437 const int wCount = 200000;
438 const int total = wBatch * wCount * 3;
438 const int total = wBatch * wCount * 3;
439 const int summ = wBatch * wCount * 6;
439 const int summ = wBatch * wCount * 6;
440
440
441 int r1 = 0, r2 = 0;
441 int r1 = 0, r2 = 0;
442 const int rBatch = 1024;
442 const int rBatch = 1024;
443 int read = 0;
443 int read = 0;
444
444
445 var t1 = Environment.TickCount;
445 var t1 = Environment.TickCount;
446
446
447 AsyncPool.RunThread(
447 AsyncPool.RunThread(
448 () => {
448 () => {
449 var buffer = new int[wBatch];
449 var buffer = new int[wBatch];
450 for(int i = 0; i<wBatch; i++)
450 for(int i = 0; i<wBatch; i++)
451 buffer[i] = 1;
451 buffer[i] = 1;
452
452
453 for(int i =0; i < wCount; i++)
453 for(int i =0; i < wCount; i++)
454 queue.EnqueueRange(buffer,0,wBatch);
454 queue.EnqueueRange(buffer,0,wBatch);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 },
456 },
457 () => {
457 () => {
458 var buffer = new int[wBatch];
458 var buffer = new int[wBatch];
459 for(int i = 0; i<wBatch; i++)
459 for(int i = 0; i<wBatch; i++)
460 buffer[i] = 2;
460 buffer[i] = 2;
461
461
462 for(int i =0; i < wCount; i++)
462 for(int i =0; i < wCount; i++)
463 queue.EnqueueRange(buffer,0,wBatch);
463 queue.EnqueueRange(buffer,0,wBatch);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 },
465 },
466 () => {
466 () => {
467 var buffer = new int[wBatch];
467 var buffer = new int[wBatch];
468 for(int i = 0; i<wBatch; i++)
468 for(int i = 0; i<wBatch; i++)
469 buffer[i] = 3;
469 buffer[i] = 3;
470
470
471 for(int i =0; i < wCount; i++)
471 for(int i =0; i < wCount; i++)
472 queue.EnqueueRange(buffer,0,wBatch);
472 queue.EnqueueRange(buffer,0,wBatch);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 },
474 },
475 () => {
475 () => {
476 var buffer = new int[rBatch];
476 var buffer = new int[rBatch];
477 int count = 1;
477 int count = 1;
478 double avgchunk = 0;
478 double avgchunk = 0;
479 while(read < total) {
479 while(read < total) {
480 int actual;
480 int actual;
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 for(int i=0; i< actual; i++)
482 for(int i=0; i< actual; i++)
483 r2 += buffer[i];
483 r2 += buffer[i];
484 Interlocked.Add(ref read, actual);
484 Interlocked.Add(ref read, actual);
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 count ++;
486 count ++;
487 }
487 }
488 }
488 }
489
489
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 }
491 }
492 )
492 )
493 .Combine()
493 .Bundle()
494 .Join();
495
496 Assert.AreEqual(summ , r1 + r2);
497
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
501 r1,
502 r2,
503 r1 + r2,
504 total
505 );
506 }
507
508 [TestMethod]
509 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>();
511
512 const int wBatch = 11;
513 const int wCount = 200000;
514 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
516
517 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
519 int read = 0;
520
521 var t1 = Environment.TickCount;
522
523 AsyncPool.RunThread(
524 () => {
525 var buffer = new int[wBatch];
526 for(int i = 0; i<wBatch; i++)
527 buffer[i] = 1;
528
529 for(int i =0; i < wCount; i++)
530 queue.EnqueueRange(buffer,0,wBatch);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 },
533 () => {
534 for(int i =0; i < wCount * wBatch; i++)
535 queue.Enqueue(1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 },
538 () => {
539 var buffer = new int[wBatch];
540 for(int i = 0; i<wBatch; i++)
541 buffer[i] = 1;
542
543 for(int i =0; i < wCount; i++)
544 queue.EnqueueRange(buffer,0,wBatch);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 },
547 /*() => {
548 int temp;
549 int count = 0;
550 while (read < total)
551 if (queue.TryDequeue(out temp)) {
552 count++;
553 r1 += temp;
554 Interlocked.Increment(ref read);
555 }
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 },*/
558 /*() => {
559 var buffer = new int[rBatch];
560 var count = 0;
561 while(read < total) {
562 int actual;
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 for(int i=0; i< actual; i++)
565 r1 += buffer[i];
566 Interlocked.Add(ref read, actual);
567 count += actual;
568 }
569 }
570
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 },*/
573 () => {
574 var count = 0;
575 while(read < total) {
576 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
578 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
580 count += buffer.Length;
581 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 },
584 () => {
585 var count = 0;
586 while(read < total) {
587 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
589 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
591 count += buffer.Length;
592 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 }
595 )
596 .Bundle()
494 .Join();
597 .Join();
495
598
496 Assert.AreEqual(summ , r1 + r2);
599 Assert.AreEqual(summ , r1 + r2);
497
600
498 Console.WriteLine(
601 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
603 Environment.TickCount - t1,
501 r1,
604 r1,
502 r2,
605 r2,
503 r1 + r2,
606 r1 + r2,
504 total
607 total
505 );
608 );
506 }
609 }
507
610
508 [TestMethod]
611 [TestMethod]
509 public void ParallelMapTest() {
612 public void ParallelMapTest() {
510
613
511 const int count = 100000;
614 const int count = 100000;
512
615
513 var args = new double[count];
616 var args = new double[count];
514 var rand = new Random();
617 var rand = new Random();
515
618
516 for (int i = 0; i < count; i++)
619 for (int i = 0; i < count; i++)
517 args[i] = rand.NextDouble();
620 args[i] = rand.NextDouble();
518
621
519 var t = Environment.TickCount;
622 var t = Environment.TickCount;
520 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
521
624
522 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
523
626
524 t = Environment.TickCount;
627 t = Environment.TickCount;
525 for (int i = 0; i < count; i++)
628 for (int i = 0; i < count; i++)
526 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
527 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
528 }
631 }
529
632
530 [TestMethod]
633 [TestMethod]
531 public void ChainedMapTest() {
634 public void ChainedMapTest() {
532
635
533 using (var pool = new WorkerPool(0,10,1)) {
636 using (var pool = new WorkerPool(0,10,1)) {
534 const int count = 10000;
637 const int count = 10000;
535
638
536 var args = new double[count];
639 var args = new double[count];
537 var rand = new Random();
640 var rand = new Random();
538
641
539 for (int i = 0; i < count; i++)
642 for (int i = 0; i < count; i++)
540 args[i] = rand.NextDouble();
643 args[i] = rand.NextDouble();
541
644
542 var t = Environment.TickCount;
645 var t = Environment.TickCount;
543 var res = args
646 var res = args
544 .ChainedMap(
647 .ChainedMap(
545 // Analysis disable once AccessToDisposedClosure
648 // Analysis disable once AccessToDisposedClosure
546 x => pool.Invoke(
649 x => pool.Invoke(
547 () => Math.Sin(x * x)
650 () => Math.Sin(x * x)
548 ),
651 ),
549 4
652 4
550 )
653 )
551 .Join();
654 .Join();
552
655
553 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
554
657
555 t = Environment.TickCount;
658 t = Environment.TickCount;
556 for (int i = 0; i < count; i++)
659 for (int i = 0; i < count; i++)
557 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
558 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
559 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
560 }
663 }
561 }
664 }
562
665
563 [TestMethod]
666 [TestMethod]
564 public void ParallelForEachTest() {
667 public void ParallelForEachTest() {
565
668
566 const int count = 100000;
669 const int count = 100000;
567
670
568 var args = new int[count];
671 var args = new int[count];
569 var rand = new Random();
672 var rand = new Random();
570
673
571 for (int i = 0; i < count; i++)
674 for (int i = 0; i < count; i++)
572 args[i] = (int)(rand.NextDouble() * 100);
675 args[i] = (int)(rand.NextDouble() * 100);
573
676
574 int result = 0;
677 int result = 0;
575
678
576 var t = Environment.TickCount;
679 var t = Environment.TickCount;
577 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
578
681
579 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
580
683
581 int result2 = 0;
684 int result2 = 0;
582
685
583 t = Environment.TickCount;
686 t = Environment.TickCount;
584 for (int i = 0; i < count; i++)
687 for (int i = 0; i < count; i++)
585 result2 += args[i];
688 result2 += args[i];
586 Assert.AreEqual(result2, result);
689 Assert.AreEqual(result2, result);
587 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
588 }
691 }
589
692
590 [TestMethod]
693 [TestMethod]
591 public void ComplexCase1Test() {
694 public void ComplexCase1Test() {
592 var flags = new bool[3];
695 var flags = new bool[3];
593
696
594 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
595
698
596 var step1 = PromiseHelper
699 var step1 = PromiseHelper
597 .Sleep(200, "Alan")
700 .Sleep(200, "Alan")
598 .On(() => flags[0] = true, PromiseEventType.Cancelled);
701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
599 var p = step1
702 var p = step1
600 .Chain(x =>
703 .Chain(x =>
601 PromiseHelper
704 PromiseHelper
602 .Sleep(200, "Hi, " + x)
705 .Sleep(200, "Hi, " + x)
603 .Then(y => y)
706 .Then(y => y)
604 .On(() => flags[1] = true, PromiseEventType.Cancelled)
707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
605 )
708 )
606 .On(() => flags[2] = true, PromiseEventType.Cancelled);
709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
607 step1.Join();
710 step1.Join();
608 p.Cancel();
711 p.Cancel();
609 try {
712 try {
610 Assert.AreEqual(p.Join(), "Hi, Alan");
713 Assert.AreEqual(p.Join(), "Hi, Alan");
611 Assert.Fail("Shouldn't get here");
714 Assert.Fail("Shouldn't get here");
612 } catch (OperationCanceledException) {
715 } catch (OperationCanceledException) {
613 }
716 }
614
717
615 Assert.IsFalse(flags[0]);
718 Assert.IsFalse(flags[0]);
616 Assert.IsTrue(flags[1]);
719 Assert.IsTrue(flags[1]);
617 Assert.IsTrue(flags[2]);
720 Assert.IsTrue(flags[2]);
618 }
721 }
619
722
620 [TestMethod]
723 [TestMethod]
621 public void ChainedCancel1Test() {
724 public void ChainedCancel1Test() {
622 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
623 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
624 var p = PromiseHelper
727 var p = PromiseHelper
625 .Sleep(1, "Hi, HAL!")
728 .Sleep(1, "Hi, HAL!")
626 .Then(x => {
729 .Then(x => {
627 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
628 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
629 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
630 PromiseHelper
733 PromiseHelper
631 .Sleep(100, "HAL, STOP!")
734 .Sleep(100, "HAL, STOP!")
632 .Then(result.Cancel);
735 .Then(result.Cancel);
633 return result;
736 return result;
634 });
737 });
635 try {
738 try {
636 p.Join();
739 p.Join();
637 } catch (TargetInvocationException err) {
740 } catch (TargetInvocationException err) {
638 Assert.IsTrue(err.InnerException is OperationCanceledException);
741 Assert.IsTrue(err.InnerException is OperationCanceledException);
639 }
742 }
640 }
743 }
641
744
642 [TestMethod]
745 [TestMethod]
643 public void ChainedCancel2Test() {
746 public void ChainedCancel2Test() {
644 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
645 var pSurvive = new Promise<bool>();
748 var pSurvive = new Promise<bool>();
646 var hemStarted = new ManualResetEvent(false);
749 var hemStarted = new ManualResetEvent(false);
647 var p = PromiseHelper
750 var p = PromiseHelper
648 .Sleep(1, "Hi, HAL!")
751 .Sleep(1, "Hi, HAL!")
649 .Chain(x => {
752 .Chain(x => {
650 hemStarted.Set();
753 hemStarted.Set();
651 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
652 var result = PromiseHelper
755 var result = PromiseHelper
653 .Sleep(100000000, "HEM ENABLED!!!")
756 .Sleep(100000000, "HEM ENABLED!!!")
654 .Then(s => {
757 .Then(s => {
655 pSurvive.Resolve(false);
758 pSurvive.Resolve(false);
656 return s;
759 return s;
657 });
760 });
658
761
659 result
762 result
660 .Cancelled(() => pSurvive.Resolve(true));
763 .Cancelled(() => pSurvive.Resolve(true));
661
764
662 return result;
765 return result;
663 });
766 });
664
767
665 hemStarted.WaitOne();
768 hemStarted.WaitOne();
666 p.Cancel();
769 p.Cancel();
667
770
668 try {
771 try {
669 p.Join();
772 p.Join();
670 } catch (OperationCanceledException) {
773 } catch (OperationCanceledException) {
671 Assert.IsTrue(pSurvive.Join());
774 Assert.IsTrue(pSurvive.Join());
672 }
775 }
673 }
776 }
674 }
777 }
675 }
778 }
676
779
@@ -1,207 +1,207
1 using Implab.Diagnostics;
1 using Implab.Diagnostics;
2 using System;
2 using System;
3 using System.Diagnostics;
3 using System.Diagnostics;
4 using System.Threading;
4 using System.Threading;
5
5
6 namespace Implab.Parallels {
6 namespace Implab.Parallels {
7 public static class ArrayTraits {
7 public static class ArrayTraits {
8 class ArrayIterator<TSrc> : DispatchPool<int> {
8 class ArrayIterator<TSrc> : DispatchPool<int> {
9 readonly Action<TSrc> m_action;
9 readonly Action<TSrc> m_action;
10 readonly TSrc[] m_source;
10 readonly TSrc[] m_source;
11 readonly Promise<int> m_promise = new Promise<int>();
11 readonly Promise<int> m_promise = new Promise<int>();
12 readonly LogicalOperation m_logicalOperation;
12 readonly LogicalOperation m_logicalOperation;
13
13
14 int m_pending;
14 int m_pending;
15 int m_next;
15 int m_next;
16
16
17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
18 : base(threads) {
18 : base(threads) {
19
19
20 Debug.Assert(source != null);
20 Debug.Assert(source != null);
21 Debug.Assert(action != null);
21 Debug.Assert(action != null);
22
22
23 m_logicalOperation = TraceContext.Instance.CurrentOperation;
23 m_logicalOperation = TraceContext.Instance.CurrentOperation;
24 m_next = 0;
24 m_next = 0;
25 m_source = source;
25 m_source = source;
26 m_pending = source.Length;
26 m_pending = source.Length;
27 m_action = action;
27 m_action = action;
28
28
29 m_promise.On(Dispose, PromiseEventType.All);
29 m_promise.On(Dispose, PromiseEventType.All);
30
30
31 InitPool();
31 InitPool();
32 }
32 }
33
33
34 public Promise<int> Promise {
34 public Promise<int> Promise {
35 get {
35 get {
36 return m_promise;
36 return m_promise;
37 }
37 }
38 }
38 }
39
39
40 protected override void Worker() {
40 protected override void Worker() {
41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
42 try {
42 try {
43 base.Worker();
43 base.Worker();
44 } finally {
44 } finally {
45 TraceContext.Instance.Leave();
45 TraceContext.Instance.Leave();
46 }
46 }
47 }
47 }
48
48
49 protected override bool TryDequeue(out int unit) {
49 protected override bool TryDequeue(out int unit) {
50 unit = Interlocked.Increment(ref m_next) - 1;
50 unit = Interlocked.Increment(ref m_next) - 1;
51 return unit < m_source.Length;
51 return unit < m_source.Length;
52 }
52 }
53
53
54 protected override void InvokeUnit(int unit) {
54 protected override void InvokeUnit(int unit) {
55 try {
55 try {
56 m_action(m_source[unit]);
56 m_action(m_source[unit]);
57 var pending = Interlocked.Decrement(ref m_pending);
57 var pending = Interlocked.Decrement(ref m_pending);
58 if (pending == 0)
58 if (pending == 0)
59 m_promise.Resolve(m_source.Length);
59 m_promise.Resolve(m_source.Length);
60 } catch (Exception e) {
60 } catch (Exception e) {
61 m_promise.Reject(e);
61 m_promise.Reject(e);
62 }
62 }
63 }
63 }
64 }
64 }
65
65
66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
67 readonly Func<TSrc, TDst> m_transform;
67 readonly Func<TSrc, TDst> m_transform;
68 readonly TSrc[] m_source;
68 readonly TSrc[] m_source;
69 readonly TDst[] m_dest;
69 readonly TDst[] m_dest;
70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
71 readonly LogicalOperation m_logicalOperation;
71 readonly LogicalOperation m_logicalOperation;
72
72
73 int m_pending;
73 int m_pending;
74 int m_next;
74 int m_next;
75
75
76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
77 : base(threads) {
77 : base(threads) {
78
78
79 Debug.Assert (source != null);
79 Debug.Assert (source != null);
80 Debug.Assert( transform != null);
80 Debug.Assert( transform != null);
81
81
82 m_next = 0;
82 m_next = 0;
83 m_source = source;
83 m_source = source;
84 m_dest = new TDst[source.Length];
84 m_dest = new TDst[source.Length];
85 m_pending = source.Length;
85 m_pending = source.Length;
86 m_transform = transform;
86 m_transform = transform;
87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
88
88
89 m_promise.On(Dispose, PromiseEventType.All);
89 m_promise.On(Dispose, PromiseEventType.All);
90
90
91 InitPool();
91 InitPool();
92 }
92 }
93
93
94 public Promise<TDst[]> Promise {
94 public Promise<TDst[]> Promise {
95 get {
95 get {
96 return m_promise;
96 return m_promise;
97 }
97 }
98 }
98 }
99
99
100 protected override void Worker() {
100 protected override void Worker() {
101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
102 try {
102 try {
103 base.Worker();
103 base.Worker();
104 } finally {
104 } finally {
105 TraceContext.Instance.Leave();
105 TraceContext.Instance.Leave();
106 }
106 }
107 }
107 }
108
108
109 protected override bool TryDequeue(out int unit) {
109 protected override bool TryDequeue(out int unit) {
110 unit = Interlocked.Increment(ref m_next) - 1;
110 unit = Interlocked.Increment(ref m_next) - 1;
111 return unit < m_source.Length;
111 return unit < m_source.Length;
112 }
112 }
113
113
114 protected override void InvokeUnit(int unit) {
114 protected override void InvokeUnit(int unit) {
115 try {
115 try {
116 m_dest[unit] = m_transform(m_source[unit]);
116 m_dest[unit] = m_transform(m_source[unit]);
117 var pending = Interlocked.Decrement(ref m_pending);
117 var pending = Interlocked.Decrement(ref m_pending);
118 if (pending == 0)
118 if (pending == 0)
119 m_promise.Resolve(m_dest);
119 m_promise.Resolve(m_dest);
120 } catch (Exception e) {
120 } catch (Exception e) {
121 m_promise.Reject(e);
121 m_promise.Reject(e);
122 }
122 }
123 }
123 }
124 }
124 }
125
125
126 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
126 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
127 if (source == null)
127 if (source == null)
128 throw new ArgumentNullException("source");
128 throw new ArgumentNullException("source");
129 if (transform == null)
129 if (transform == null)
130 throw new ArgumentNullException("transform");
130 throw new ArgumentNullException("transform");
131
131
132 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
132 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
133 return mapper.Promise;
133 return mapper.Promise;
134 }
134 }
135
135
136 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
136 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
137 if (source == null)
137 if (source == null)
138 throw new ArgumentNullException("source");
138 throw new ArgumentNullException("source");
139 if (action == null)
139 if (action == null)
140 throw new ArgumentNullException("action");
140 throw new ArgumentNullException("action");
141
141
142 var iter = new ArrayIterator<TSrc>(source, action, threads);
142 var iter = new ArrayIterator<TSrc>(source, action, threads);
143 return iter.Promise;
143 return iter.Promise;
144 }
144 }
145
145
146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
147 if (source == null)
147 if (source == null)
148 throw new ArgumentNullException("source");
148 throw new ArgumentNullException("source");
149 if (transform == null)
149 if (transform == null)
150 throw new ArgumentNullException("transform");
150 throw new ArgumentNullException("transform");
151 if (threads <= 0)
151 if (threads <= 0)
152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
153
153
154 if (source.Length == 0)
154 if (source.Length == 0)
155 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
155 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
156
156
157 var promise = new Promise<TDst[]>();
157 var promise = new Promise<TDst[]>();
158 var res = new TDst[source.Length];
158 var res = new TDst[source.Length];
159 var pending = source.Length;
159 var pending = source.Length;
160
160
161 object locker = new object();
161 object locker = new object();
162 int slots = threads;
162 int slots = threads;
163
163
164 // Analysis disable AccessToDisposedClosure
164 // Analysis disable AccessToDisposedClosure
165 AsyncPool.InvokeNewThread<int>(() => {
165 AsyncPool.RunThread<int>(() => {
166 for (int i = 0; i < source.Length; i++) {
166 for (int i = 0; i < source.Length; i++) {
167 if(promise.IsResolved)
167 if(promise.IsResolved)
168 break; // stop processing in case of error or cancellation
168 break; // stop processing in case of error or cancellation
169 var idx = i;
169 var idx = i;
170
170
171 if (Interlocked.Decrement(ref slots) < 0) {
171 if (Interlocked.Decrement(ref slots) < 0) {
172 lock(locker) {
172 lock(locker) {
173 while(slots < 0)
173 while(slots < 0)
174 Monitor.Wait(locker);
174 Monitor.Wait(locker);
175 }
175 }
176 }
176 }
177
177
178 try {
178 try {
179 transform(source[i])
179 transform(source[i])
180 .On( x => {
180 .On( x => {
181 Interlocked.Increment(ref slots);
181 Interlocked.Increment(ref slots);
182 lock (locker) {
182 lock (locker) {
183 Monitor.Pulse(locker);
183 Monitor.Pulse(locker);
184 }
184 }
185 })
185 })
186 .On(
186 .On(
187 x => {
187 x => {
188 res[idx] = x;
188 res[idx] = x;
189 var left = Interlocked.Decrement(ref pending);
189 var left = Interlocked.Decrement(ref pending);
190 if (left == 0)
190 if (left == 0)
191 promise.Resolve(res);
191 promise.Resolve(res);
192 },
192 },
193 promise.Reject
193 promise.Reject
194 );
194 );
195
195
196 } catch (Exception e) {
196 } catch (Exception e) {
197 promise.Reject(e);
197 promise.Reject(e);
198 }
198 }
199 }
199 }
200 return 0;
200 return 0;
201 });
201 });
202
202
203 return promise;
203 return promise;
204 }
204 }
205
205
206 }
206 }
207 }
207 }
@@ -1,86 +1,86
1 using Implab.Diagnostics;
1 using Implab.Diagnostics;
2 using System;
2 using System;
3 using System.Threading;
3 using System.Threading;
4 using System.Linq;
4 using System.Linq;
5
5
6 namespace Implab.Parallels {
6 namespace Implab.Parallels {
7 /// <summary>
7 /// <summary>
8 /// Класс для распаралСливания Π·Π°Π΄Π°Ρ‡.
8 /// Класс для распаралСливания Π·Π°Π΄Π°Ρ‡.
9 /// </summary>
9 /// </summary>
10 /// <remarks>
10 /// <remarks>
11 /// Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Π΄Π°Π½Π½Ρ‹ΠΉ класс ΠΈ лямда выраТСния ΠΌΠΎΠΆΠ½ΠΎ Ρ€Π°ΡΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΠΈΡ‚ΡŒ
11 /// Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Π΄Π°Π½Π½Ρ‹ΠΉ класс ΠΈ лямда выраТСния ΠΌΠΎΠΆΠ½ΠΎ Ρ€Π°ΡΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΠΈΡ‚ΡŒ
12 /// вычислСния, для этого ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ концСпция ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.
12 /// вычислСния, для этого ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ концСпция ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.
13 /// </remarks>
13 /// </remarks>
14 public static class AsyncPool {
14 public static class AsyncPool {
15
15
16 public static IPromise<T> Invoke<T>(Func<T> func) {
16 public static IPromise<T> Invoke<T>(Func<T> func) {
17 var p = new Promise<T>();
17 var p = new Promise<T>();
18 var caller = TraceContext.Instance.CurrentOperation;
18 var caller = TraceContext.Instance.CurrentOperation;
19
19
20 ThreadPool.QueueUserWorkItem(param => {
20 ThreadPool.QueueUserWorkItem(param => {
21 TraceContext.Instance.EnterLogicalOperation(caller,false);
21 TraceContext.Instance.EnterLogicalOperation(caller,false);
22 try {
22 try {
23 p.Resolve(func());
23 p.Resolve(func());
24 } catch(Exception e) {
24 } catch(Exception e) {
25 p.Reject(e);
25 p.Reject(e);
26 } finally {
26 } finally {
27 TraceContext.Instance.Leave();
27 TraceContext.Instance.Leave();
28 }
28 }
29 });
29 });
30
30
31 return p;
31 return p;
32 }
32 }
33
33
34 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
34 public static IPromise<T> RunThread<T>(Func<T> func) {
35 var p = new Promise<T>();
35 var p = new Promise<T>();
36
36
37 var caller = TraceContext.Instance.CurrentOperation;
37 var caller = TraceContext.Instance.CurrentOperation;
38
38
39 var worker = new Thread(() => {
39 var worker = new Thread(() => {
40 TraceContext.Instance.EnterLogicalOperation(caller,false);
40 TraceContext.Instance.EnterLogicalOperation(caller,false);
41 try {
41 try {
42 p.Resolve(func());
42 p.Resolve(func());
43 } catch (Exception e) {
43 } catch (Exception e) {
44 p.Reject(e);
44 p.Reject(e);
45 } finally {
45 } finally {
46 TraceContext.Instance.Leave();
46 TraceContext.Instance.Leave();
47 }
47 }
48 });
48 });
49 worker.IsBackground = true;
49 worker.IsBackground = true;
50 worker.Start();
50 worker.Start();
51
51
52 return p;
52 return p;
53 }
53 }
54
54
55
55
56 public static IPromise InvokeNewThread(Action func) {
56 public static IPromise RunThread(Action func) {
57 var p = new Promise();
57 var p = new Promise();
58
58
59 var caller = TraceContext.Instance.CurrentOperation;
59 var caller = TraceContext.Instance.CurrentOperation;
60
60
61 var worker = new Thread(() => {
61 var worker = new Thread(() => {
62 TraceContext.Instance.EnterLogicalOperation(caller,false);
62 TraceContext.Instance.EnterLogicalOperation(caller,false);
63 try {
63 try {
64 func();
64 func();
65 p.Resolve();
65 p.Resolve();
66 } catch (Exception e) {
66 } catch (Exception e) {
67 p.Reject(e);
67 p.Reject(e);
68 } finally {
68 } finally {
69 TraceContext.Instance.Leave();
69 TraceContext.Instance.Leave();
70 }
70 }
71 });
71 });
72 worker.IsBackground = true;
72 worker.IsBackground = true;
73 worker.Start();
73 worker.Start();
74
74
75 return p;
75 return p;
76 }
76 }
77
77
78 public static IPromise[] RunThread(params Action[] func) {
78 public static IPromise[] RunThread(params Action[] func) {
79 return func.Select(f => InvokeNewThread(f)).ToArray();
79 return func.Select(f => RunThread(f)).ToArray();
80 }
80 }
81
81
82 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
82 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
83 return func.Select(f => InvokeNewThread(f)).ToArray();
83 return func.Select(f => RunThread(f)).ToArray();
84 }
84 }
85 }
85 }
86 }
86 }
@@ -1,576 +1,625
1 using System.Threading;
1 using System.Threading;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System;
3 using System;
4 using System.Collections;
4 using System.Collections;
5 using System.Diagnostics;
5
6
6 namespace Implab.Parallels {
7 namespace Implab.Parallels {
7 public class AsyncQueue<T> : IEnumerable<T> {
8 public class AsyncQueue<T> : IEnumerable<T> {
8 class Chunk {
9 class Chunk {
9 public Chunk next;
10 public Chunk next;
10
11
11 int m_low;
12 int m_low;
12 int m_hi;
13 int m_hi;
13 int m_alloc;
14 int m_alloc;
14 readonly int m_size;
15 readonly int m_size;
15 readonly T[] m_data;
16 readonly T[] m_data;
16
17
17 public Chunk(int size) {
18 public Chunk(int size) {
18 m_size = size;
19 m_size = size;
19 m_data = new T[size];
20 m_data = new T[size];
20 }
21 }
21
22
22 public Chunk(int size, T value) {
23 public Chunk(int size, T value) {
23 m_size = size;
24 m_size = size;
24 m_hi = 1;
25 m_hi = 1;
25 m_alloc = 1;
26 m_alloc = 1;
26 m_data = new T[size];
27 m_data = new T[size];
27 m_data[0] = value;
28 m_data[0] = value;
28 }
29 }
29
30
30 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 m_size = size;
32 m_size = size;
32 m_hi = length;
33 m_hi = length;
33 m_alloc = alloc;
34 m_alloc = alloc;
34 m_data = new T[size];
35 m_data = new T[size];
35 Array.Copy(data, offset, m_data, 0, length);
36 Array.Copy(data, offset, m_data, 0, length);
36 }
37 }
37
38
38 public int Low {
39 public int Low {
39 get { return m_low; }
40 get { return m_low; }
40 }
41 }
41
42
42 public int Hi {
43 public int Hi {
43 get { return m_hi; }
44 get { return m_hi; }
44 }
45 }
45
46
46 public bool TryEnqueue(T value, out bool extend) {
47 public bool TryEnqueue(T value, out bool extend) {
47 var alloc = Interlocked.Increment(ref m_alloc) - 1;
48 var alloc = Interlocked.Increment(ref m_alloc) - 1;
48
49
49 if (alloc >= m_size) {
50 if (alloc >= m_size) {
50 extend = alloc == m_size;
51 extend = alloc == m_size;
51 return false;
52 return false;
52 }
53 }
53
54
54 extend = false;
55 extend = false;
55 m_data[alloc] = value;
56 m_data[alloc] = value;
56
57
57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
58 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
58 // spin wait for commit
59 // spin wait for commit
59 }
60 }
60 return true;
61 return true;
61 }
62 }
62
63
64 /// <summary>
65 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
66 /// </summary>
67 public void Commit() {
68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
69
70 while (m_hi != actual)
71 Thread.MemoryBarrier();
72 }
73
63 public bool TryDequeue(out T value, out bool recycle) {
74 public bool TryDequeue(out T value, out bool recycle) {
64 int low;
75 int low;
65 do {
76 do {
66 low = m_low;
77 low = m_low;
67 if (low >= m_hi) {
78 if (low >= m_hi) {
68 value = default(T);
79 value = default(T);
69 recycle = (low == m_size);
80 recycle = (low == m_size);
70 return false;
81 return false;
71 }
82 }
72 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
83 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
73
84
74 recycle = (low == m_size - 1);
85 recycle = (low == m_size - 1);
75 value = m_data[low];
86 value = m_data[low];
76
87
77 return true;
88 return true;
78 }
89 }
79
90
80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
91 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
81 //int alloc;
92 //int alloc;
82 //int allocSize;
93 //int allocSize;
83
94
84 var alloc = Interlocked.Add(ref m_alloc, length) - length;
95 var alloc = Interlocked.Add(ref m_alloc, length) - length;
85 if (alloc > m_size) {
96 if (alloc > m_size) {
86 // the chunk is full and someone already
97 // the chunk is full and someone already
87 // creating the new one
98 // creating the new one
88 enqueued = 0; // nothing was added
99 enqueued = 0; // nothing was added
89 extend = false; // the caller shouldn't try to extend the queue
100 extend = false; // the caller shouldn't try to extend the queue
90 return false; // nothing was added
101 return false; // nothing was added
91 }
102 }
92
103
93 enqueued = Math.Min(m_size - alloc, length);
104 enqueued = Math.Min(m_size - alloc, length);
94 extend = length > enqueued;
105 extend = length > enqueued;
95
106
96 if (enqueued == 0)
107 if (enqueued == 0)
97 return false;
108 return false;
98
109
99
110
100 Array.Copy(batch, offset, m_data, alloc, enqueued);
111 Array.Copy(batch, offset, m_data, alloc, enqueued);
101
112
102 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
113 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
103 // spin wait for commit
114 // spin wait for commit
104 }
115 }
105
116
106 return true;
117 return true;
107 }
118 }
108
119
109 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
110 int low, hi, batchSize;
121 int low, hi, batchSize;
111
122
112 do {
123 do {
113 low = m_low;
124 low = m_low;
114 hi = m_hi;
125 hi = m_hi;
115 if (low >= hi) {
126 if (low >= hi) {
116 dequeued = 0;
127 dequeued = 0;
117 recycle = (low == m_size); // recycling could be restarted and we need to signal again
128 recycle = (low == m_size); // recycling could be restarted and we need to signal again
118 return false;
129 return false;
119 }
130 }
120 batchSize = Math.Min(hi - low, length);
131 batchSize = Math.Min(hi - low, length);
121 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
132 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
122
133
123 recycle = (low == m_size - batchSize);
134 recycle = (low == m_size - batchSize);
124 dequeued = batchSize;
135 dequeued = batchSize;
125
136
126 Array.Copy(m_data, low, buffer, offset, batchSize);
137 Array.Copy(m_data, low, buffer, offset, batchSize);
127
138
128 return true;
139 return true;
129 }
140 }
130
141
131 public T GetAt(int pos) {
142 public T GetAt(int pos) {
132 return m_data[pos];
143 return m_data[pos];
133 }
144 }
134 }
145 }
135
146
136 public const int DEFAULT_CHUNK_SIZE = 32;
147 public const int DEFAULT_CHUNK_SIZE = 32;
137 public const int MAX_CHUNK_SIZE = 262144;
148 public const int MAX_CHUNK_SIZE = 262144;
138
149
139 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
140
151
141 Chunk m_first;
152 Chunk m_first;
142 Chunk m_last;
153 Chunk m_last;
143
154
144 public AsyncQueue() {
155 public AsyncQueue() {
145 m_last = m_first = new Chunk(m_chunkSize);
156 m_last = m_first = new Chunk(m_chunkSize);
146 }
157 }
147
158
148 /// <summary>
159 /// <summary>
149 /// Adds the specified value to the queue.
160 /// Adds the specified value to the queue.
150 /// </summary>
161 /// </summary>
151 /// <param name="value">Tha value which will be added to the queue.</param>
162 /// <param name="value">Tha value which will be added to the queue.</param>
152 public void Enqueue(T value) {
163 public void Enqueue(T value) {
153 var last = m_last;
164 var last = m_last;
154 // spin wait to the new chunk
165 // spin wait to the new chunk
155 bool extend = true;
166 bool extend = true;
156 while (last == null || !last.TryEnqueue(value, out extend)) {
167 while (last == null || !last.TryEnqueue(value, out extend)) {
157 // try to extend queue
168 // try to extend queue
158 if (extend || last == null) {
169 if (extend || last == null) {
159 var chunk = new Chunk(m_chunkSize, value);
170 var chunk = new Chunk(m_chunkSize, value);
160 if (EnqueueChunk(last, chunk))
171 if (EnqueueChunk(last, chunk))
161 break; // success! exit!
172 break; // success! exit!
162 last = m_last;
173 last = m_last;
163 } else {
174 } else {
164 while (last == m_last) {
175 while (last == m_last) {
165 Thread.MemoryBarrier();
176 Thread.MemoryBarrier();
166 }
177 }
167 last = m_last;
178 last = m_last;
168 }
179 }
169 }
180 }
170 }
181 }
171
182
172 /// <summary>
183 /// <summary>
173 /// Adds the specified data to the queue.
184 /// Adds the specified data to the queue.
174 /// </summary>
185 /// </summary>
175 /// <param name="data">The buffer which contains the data to be enqueued.</param>
186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
176 /// <param name="offset">The offset of the data in the buffer.</param>
187 /// <param name="offset">The offset of the data in the buffer.</param>
177 /// <param name="length">The size of the data to read from the buffer.</param>
188 /// <param name="length">The size of the data to read from the buffer.</param>
178 public void EnqueueRange(T[] data, int offset, int length) {
189 public void EnqueueRange(T[] data, int offset, int length) {
179 if (data == null)
190 if (data == null)
180 throw new ArgumentNullException("data");
191 throw new ArgumentNullException("data");
181 if (offset < 0)
192 if (offset < 0)
182 throw new ArgumentOutOfRangeException("offset");
193 throw new ArgumentOutOfRangeException("offset");
183 if (length < 1 || offset + length > data.Length)
194 if (length < 1 || offset + length > data.Length)
184 throw new ArgumentOutOfRangeException("length");
195 throw new ArgumentOutOfRangeException("length");
185
196
186 var last = m_last;
197 var last = m_last;
187
198
188 bool extend;
199 bool extend;
189 int enqueued;
200 int enqueued;
190
201
191 while (length > 0) {
202 while (length > 0) {
192 extend = true;
203 extend = true;
193 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
194 length -= enqueued;
205 length -= enqueued;
195 offset += enqueued;
206 offset += enqueued;
196 }
207 }
197
208
198 if (extend) {
209 if (extend) {
199 // there was no enough space in the chunk
210 // there was no enough space in the chunk
200 // or there was no chunks in the queue
211 // or there was no chunks in the queue
201
212
202 while (length > 0) {
213 while (length > 0) {
203
214
204 var size = Math.Min(length, MAX_CHUNK_SIZE);
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
205
216
206 var chunk = new Chunk(
217 var chunk = new Chunk(
207 Math.Max(size, m_chunkSize),
218 Math.Max(size, m_chunkSize),
208 data,
219 data,
209 offset,
220 offset,
210 size,
221 size,
211 length // length >= size
222 length // length >= size
212 );
223 );
213
224
214 if (!EnqueueChunk(last, chunk)) {
225 if (!EnqueueChunk(last, chunk)) {
215 // looks like the queue has been updated then proceed from the beginning
226 // looks like the queue has been updated then proceed from the beginning
216 last = m_last;
227 last = m_last;
217 break;
228 break;
218 }
229 }
219
230
220 // we have successfully added the new chunk
231 // we have successfully added the new chunk
221 last = chunk;
232 last = chunk;
222 length -= size;
233 length -= size;
223 offset += size;
234 offset += size;
224 }
235 }
225 } else {
236 } else {
226 // we don't need to extend the queue, if we successfully enqueued data
237 // we don't need to extend the queue, if we successfully enqueued data
227 if (length == 0)
238 if (length == 0)
228 break;
239 break;
229
240
230 // if we need to wait while someone is extending the queue
241 // if we need to wait while someone is extending the queue
231 // spinwait
242 // spinwait
232 while (last == m_last) {
243 while (last == m_last) {
233 Thread.MemoryBarrier();
244 Thread.MemoryBarrier();
234 }
245 }
235
246
236 last = m_last;
247 last = m_last;
237 }
248 }
238 }
249 }
239 }
250 }
240
251
241 /// <summary>
252 /// <summary>
242 /// Tries to retrieve the first element from the queue.
253 /// Tries to retrieve the first element from the queue.
243 /// </summary>
254 /// </summary>
244 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
245 /// <param name="value">The value of the dequeued element.</param>
256 /// <param name="value">The value of the dequeued element.</param>
246 public bool TryDequeue(out T value) {
257 public bool TryDequeue(out T value) {
247 var chunk = m_first;
258 var chunk = m_first;
248 bool recycle;
259 bool recycle;
249 while (chunk != null) {
260 while (chunk != null) {
250
261
251 var result = chunk.TryDequeue(out value, out recycle);
262 var result = chunk.TryDequeue(out value, out recycle);
252
263
253 if (recycle) // this chunk is waste
264 if (recycle) // this chunk is waste
254 RecycleFirstChunk(chunk);
265 RecycleFirstChunk(chunk);
255 else
266 else
256 return result; // this chunk is usable and returned actual result
267 return result; // this chunk is usable and returned actual result
257
268
258 if (result) // this chunk is waste but the true result is always actual
269 if (result) // this chunk is waste but the true result is always actual
259 return true;
270 return true;
260
271
261 // try again
272 // try again
262 chunk = m_first;
273 chunk = m_first;
263 }
274 }
264
275
265 // the queue is empty
276 // the queue is empty
266 value = default(T);
277 value = default(T);
267 return false;
278 return false;
268 }
279 }
269
280
270 /// <summary>
281 /// <summary>
271 /// Tries to dequeue the specified amount of data from the queue.
282 /// Tries to dequeue the specified amount of data from the queue.
272 /// </summary>
283 /// </summary>
273 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
284 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
274 /// <param name="buffer">The buffer to which the data will be written.</param>
285 /// <param name="buffer">The buffer to which the data will be written.</param>
275 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
276 /// <param name="length">The maximum amount of data to be retrieved.</param>
287 /// <param name="length">The maximum amount of data to be retrieved.</param>
277 /// <param name="dequeued">The actual amout of the retrieved data.</param>
288 /// <param name="dequeued">The actual amout of the retrieved data.</param>
278 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
279 if (buffer == null)
290 if (buffer == null)
280 throw new ArgumentNullException("buffer");
291 throw new ArgumentNullException("buffer");
281 if (offset < 0)
292 if (offset < 0)
282 throw new ArgumentOutOfRangeException("offset");
293 throw new ArgumentOutOfRangeException("offset");
283 if (length < 1 || offset + length > buffer.Length)
294 if (length < 1 || offset + length > buffer.Length)
284 throw new ArgumentOutOfRangeException("length");
295 throw new ArgumentOutOfRangeException("length");
285
296
286 var chunk = m_first;
297 var chunk = m_first;
287 bool recycle;
298 bool recycle;
288 dequeued = 0;
299 dequeued = 0;
289 while (chunk != null) {
300 while (chunk != null) {
290
301
291 int actual;
302 int actual;
292 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
293 offset += actual;
304 offset += actual;
294 length -= actual;
305 length -= actual;
295 dequeued += actual;
306 dequeued += actual;
296 }
307 }
297
308
298 if (recycle) // this chunk is waste
309 if (recycle) // this chunk is waste
299 RecycleFirstChunk(chunk);
310 RecycleFirstChunk(chunk);
300 else if (actual == 0)
311 else if (actual == 0)
301 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
302
313
303 if (length == 0)
314 if (length == 0)
304 return true;
315 return true;
305
316
306 // we still may dequeue something
317 // we still may dequeue something
307 // try again
318 // try again
308 chunk = m_first;
319 chunk = m_first;
309 }
320 }
310
321
311 return dequeued != 0;
322 return dequeued != 0;
312 }
323 }
313
324
314 /// <summary>
325 /// <summary>
315 /// Tries to dequeue all remaining data in the first chunk.
326 /// Tries to dequeue all remaining data in the first chunk.
316 /// </summary>
327 /// </summary>
317 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
318 /// <param name="buffer">The buffer to which the data will be written.</param>
329 /// <param name="buffer">The buffer to which the data will be written.</param>
319 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
320 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
321 /// <param name="dequeued">The actual amount of the dequeued data.</param>
332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
322 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
323 if (buffer == null)
334 if (buffer == null)
324 throw new ArgumentNullException("buffer");
335 throw new ArgumentNullException("buffer");
325 if (offset < 0)
336 if (offset < 0)
326 throw new ArgumentOutOfRangeException("offset");
337 throw new ArgumentOutOfRangeException("offset");
327 if (length < 1 || offset + length > buffer.Length)
338 if (length < 1 || offset + length > buffer.Length)
328 throw new ArgumentOutOfRangeException("length");
339 throw new ArgumentOutOfRangeException("length");
329
340
330 var chunk = m_first;
341 var chunk = m_first;
331 bool recycle;
342 bool recycle;
332 dequeued = 0;
343 dequeued = 0;
333
344
334 while (chunk != null) {
345 while (chunk != null) {
335
346
336 int actual;
347 int actual;
337 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
338 dequeued = actual;
349 dequeued = actual;
339 }
350 }
340
351
341 if (recycle) // this chunk is waste
352 if (recycle) // this chunk is waste
342 RecycleFirstChunk(chunk);
353 RecycleFirstChunk(chunk);
343
354
344 // if we have dequeued any data, then return
355 // if we have dequeued any data, then return
345 if (dequeued != 0)
356 if (dequeued != 0)
346 return true;
357 return true;
347
358
348 // we still may dequeue something
359 // we still may dequeue something
349 // try again
360 // try again
350 chunk = m_first;
361 chunk = m_first;
351 }
362 }
352
363
353 return false;
364 return false;
354 }
365 }
355
366
356 bool EnqueueChunk(Chunk last, Chunk chunk) {
367 bool EnqueueChunk(Chunk last, Chunk chunk) {
357 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
358 return false;
369 return false;
359
370
360 if (last != null)
371 if (last != null)
361 last.next = chunk;
372 last.next = chunk;
362 else
373 else {
363 m_first = chunk;
374 m_first = chunk;
375 }
364 return true;
376 return true;
365 }
377 }
366
378
367 void RecycleFirstChunk(Chunk first) {
379 void RecycleFirstChunk(Chunk first) {
368 var next = first.next;
380 var next = first.next;
369
381
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 return;
384
370 if (next == null) {
385 if (next == null) {
371 // looks like this is the last chunk
386
372 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388 /*while (first.next == null)
389 Thread.MemoryBarrier();*/
390
373 // race
391 // race
374 // maybe someone already recycled this chunk
392 // someone already updated the tail, restore the pointer to the queue head
375 // or a new chunk has been appedned to the queue
393 m_first = first;
376
377 return; // give up
378 }
394 }
379 // the tail is updated
395 // the tail is updated
380 }
396 }
381
397
382 // we need to update the head
398 // we need to update the head
383 Interlocked.CompareExchange(ref m_first, next, first);
399 //Interlocked.CompareExchange(ref m_first, next, first);
384 // if the head is already updated then give up
400 // if the head is already updated then give up
385 return;
401 //return;
386
402
387 }
403 }
388
404
389 public void Clear() {
405 public void Clear() {
390 // start the new queue
406 // start the new queue
391 var t = new Chunk(m_chunkSize);
407 var chunk = new Chunk(m_chunkSize);
392 Thread.MemoryBarrier();
408
393 m_last = t;
409 do {
394 Thread.MemoryBarrier();
410 Thread.MemoryBarrier();
411 var first = m_first;
412 var last = m_last;
413
414 if (last == null) // nothing to clear
415 return;
395
416
396 // make the new queue available to the readers, and stop the old one
417 if (first == null || (first.next == null && first != last)) // inconcistency
397 m_first = t;
418 continue;
398 Thread.MemoryBarrier();
419
420 // here we will create inconsistency which will force others to spin
421 // and prevent from fetching. chunk.next = null
422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
423 continue;// inconsistent
424
425 m_last = chunk;
426
427 return;
428
429 } while(true);
399 }
430 }
400
431
401 public T[] Drain() {
432 public T[] Drain() {
402 // start the new queue
433 // start the new queue
403 var t = new Chunk(m_chunkSize);
434 var chunk = new Chunk(m_chunkSize);
404 Thread.MemoryBarrier();
405 m_last = t;
406 Thread.MemoryBarrier();
407
408 // make the new queue available to the readers, and stop the old one
409 Chunk first;
410
435
411 do {
436 do {
412 first = m_first;
437 Thread.MemoryBarrier();
413 } while(first != Interlocked.CompareExchange(ref m_first
438 var first = m_first;
414 Thread.MemoryBarrier();
439 var last = m_last;
440
441 if (last == null)
442 return new T[0];
443
444 if (first == null || (first.next == null && first != last))
445 continue;
415
446
447 // here we will create inconsistency which will force others to spin
448 // and prevent from fetching. chunk.next = null
449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
450 continue;// inconsistent
416
451
452 last = Interlocked.Exchange(ref m_last, chunk);
453
454 return ReadChunks(first, last);
455
456 } while(true);
417 }
457 }
418
458
419 T[] ReadChunks(Chunk chunk) {
459 T[] ReadChunks(Chunk chunk, object last) {
420 var result = new List<T>();
460 var result = new List<T>();
421 var buffer = new T[m_chunkSize];
461 var buffer = new T[m_chunkSize];
422 int actual;
462 int actual;
423 bool recycle;
463 bool recycle;
424 while (chunk != null) {
464 while (chunk != null) {
465 // ensure all write operations on the chunk are complete
466 chunk.Commit();
467
425 // we need to read the chunk using this way
468 // we need to read the chunk using this way
426 // since some client still may completing the dequeue
469 // since some client still may completing the dequeue
427 // operation, such clients most likely won't get results
470 // operation, such clients most likely won't get results
428 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
429 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
430
473
431 chunk = chunk.next;
474 if (chunk == last) {
475 chunk = null;
476 } else {
477 while (chunk.next == null)
478 Thread.MemoryBarrier();
479 chunk = chunk.next;
480 }
432 }
481 }
433
482
434 return result.ToArray();
483 return result.ToArray();
435 }
484 }
436
485
437 struct ArraySegmentCollection : ICollection<T> {
486 struct ArraySegmentCollection : ICollection<T> {
438 readonly T[] m_data;
487 readonly T[] m_data;
439 readonly int m_offset;
488 readonly int m_offset;
440 readonly int m_length;
489 readonly int m_length;
441
490
442 public ArraySegmentCollection(T[] data, int offset, int length) {
491 public ArraySegmentCollection(T[] data, int offset, int length) {
443 m_data = data;
492 m_data = data;
444 m_offset = offset;
493 m_offset = offset;
445 m_length = length;
494 m_length = length;
446 }
495 }
447
496
448 #region ICollection implementation
497 #region ICollection implementation
449
498
450 public void Add(T item) {
499 public void Add(T item) {
451 throw new InvalidOperationException();
500 throw new InvalidOperationException();
452 }
501 }
453
502
454 public void Clear() {
503 public void Clear() {
455 throw new InvalidOperationException();
504 throw new InvalidOperationException();
456 }
505 }
457
506
458 public bool Contains(T item) {
507 public bool Contains(T item) {
459 return false;
508 return false;
460 }
509 }
461
510
462 public void CopyTo(T[] array, int arrayIndex) {
511 public void CopyTo(T[] array, int arrayIndex) {
463 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
512 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
464 }
513 }
465
514
466 public bool Remove(T item) {
515 public bool Remove(T item) {
467 throw new NotImplementedException();
516 throw new NotImplementedException();
468 }
517 }
469
518
470 public int Count {
519 public int Count {
471 get {
520 get {
472 return m_length;
521 return m_length;
473 }
522 }
474 }
523 }
475
524
476 public bool IsReadOnly {
525 public bool IsReadOnly {
477 get {
526 get {
478 return true;
527 return true;
479 }
528 }
480 }
529 }
481
530
482 #endregion
531 #endregion
483
532
484 #region IEnumerable implementation
533 #region IEnumerable implementation
485
534
486 public IEnumerator<T> GetEnumerator() {
535 public IEnumerator<T> GetEnumerator() {
487 for (int i = m_offset; i < m_length + m_offset; i++)
536 for (int i = m_offset; i < m_length + m_offset; i++)
488 yield return m_data[i];
537 yield return m_data[i];
489 }
538 }
490
539
491 #endregion
540 #endregion
492
541
493 #region IEnumerable implementation
542 #region IEnumerable implementation
494
543
495 IEnumerator IEnumerable.GetEnumerator() {
544 IEnumerator IEnumerable.GetEnumerator() {
496 return GetEnumerator();
545 return GetEnumerator();
497 }
546 }
498
547
499 #endregion
548 #endregion
500 }
549 }
501
550
502 #region IEnumerable implementation
551 #region IEnumerable implementation
503
552
504 class Enumerator : IEnumerator<T> {
553 class Enumerator : IEnumerator<T> {
505 Chunk m_current;
554 Chunk m_current;
506 int m_pos = -1;
555 int m_pos = -1;
507
556
508 public Enumerator(Chunk fisrt) {
557 public Enumerator(Chunk fisrt) {
509 m_current = fisrt;
558 m_current = fisrt;
510 }
559 }
511
560
512 #region IEnumerator implementation
561 #region IEnumerator implementation
513
562
514 public bool MoveNext() {
563 public bool MoveNext() {
515 if (m_current == null)
564 if (m_current == null)
516 return false;
565 return false;
517
566
518 if (m_pos == -1)
567 if (m_pos == -1)
519 m_pos = m_current.Low;
568 m_pos = m_current.Low;
520 else
569 else
521 m_pos++;
570 m_pos++;
522 if (m_pos == m_current.Hi) {
571 if (m_pos == m_current.Hi) {
523 m_pos = 0;
572 m_pos = 0;
524 m_current = m_current.next;
573 m_current = m_current.next;
525 }
574 }
526
575
527 return true;
576 return true;
528 }
577 }
529
578
530 public void Reset() {
579 public void Reset() {
531 throw new NotSupportedException();
580 throw new NotSupportedException();
532 }
581 }
533
582
534 object IEnumerator.Current {
583 object IEnumerator.Current {
535 get {
584 get {
536 return Current;
585 return Current;
537 }
586 }
538 }
587 }
539
588
540 #endregion
589 #endregion
541
590
542 #region IDisposable implementation
591 #region IDisposable implementation
543
592
544 public void Dispose() {
593 public void Dispose() {
545 }
594 }
546
595
547 #endregion
596 #endregion
548
597
549 #region IEnumerator implementation
598 #region IEnumerator implementation
550
599
551 public T Current {
600 public T Current {
552 get {
601 get {
553 if (m_pos == -1 || m_current == null)
602 if (m_pos == -1 || m_current == null)
554 throw new InvalidOperationException();
603 throw new InvalidOperationException();
555 return m_current.GetAt(m_pos);
604 return m_current.GetAt(m_pos);
556 }
605 }
557 }
606 }
558
607
559 #endregion
608 #endregion
560 }
609 }
561
610
562 public IEnumerator<T> GetEnumerator() {
611 public IEnumerator<T> GetEnumerator() {
563 return new Enumerator(m_first);
612 return new Enumerator(m_first);
564 }
613 }
565
614
566 #endregion
615 #endregion
567
616
568 #region IEnumerable implementation
617 #region IEnumerable implementation
569
618
570 IEnumerator IEnumerable.GetEnumerator() {
619 IEnumerator IEnumerable.GetEnumerator() {
571 return GetEnumerator();
620 return GetEnumerator();
572 }
621 }
573
622
574 #endregion
623 #endregion
575 }
624 }
576 }
625 }
@@ -1,134 +1,187
1 using System.Threading;
1 using System.Threading;
2 using System;
2 using System;
3 using Implab.Diagnostics;
3 using Implab.Diagnostics;
4 using System.Collections.Generic;
4 using System.Collections.Generic;
5
5
6
6
7 #if NET_4_5
7 #if NET_4_5
8 using System.Threading.Tasks;
8 using System.Threading.Tasks;
9 #endif
9 #endif
10
10
11 namespace Implab {
11 namespace Implab {
12 public static class PromiseExtensions {
12 public static class PromiseExtensions {
13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
14 Safe.ArgumentNotNull(that, "that");
14 Safe.ArgumentNotNull(that, "that");
15 var context = SynchronizationContext.Current;
15 var context = SynchronizationContext.Current;
16 if (context == null)
16 if (context == null)
17 return that;
17 return that;
18
18
19 var p = new SyncContextPromise<T>(context);
19 var p = new SyncContextPromise<T>(context);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
21
21
22 that.On(
22 that.On(
23 p.Resolve,
23 p.Resolve,
24 p.Reject,
24 p.Reject,
25 p.Cancel
25 p.Cancel
26 );
26 );
27 return p;
27 return p;
28 }
28 }
29
29
30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
31 Safe.ArgumentNotNull(that, "that");
31 Safe.ArgumentNotNull(that, "that");
32 Safe.ArgumentNotNull(context, "context");
32 Safe.ArgumentNotNull(context, "context");
33
33
34 var p = new SyncContextPromise<T>(context);
34 var p = new SyncContextPromise<T>(context);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
36
36
37
37
38 that.On(
38 that.On(
39 p.Resolve,
39 p.Resolve,
40 p.Reject,
40 p.Reject,
41 p.Cancel
41 p.Cancel
42 );
42 );
43 return p;
43 return p;
44 }
44 }
45
45
46 /// <summary>
46 /// <summary>
47 /// Ensures the dispatched.
47 /// Ensures the dispatched.
48 /// </summary>
48 /// </summary>
49 /// <returns>The dispatched.</returns>
49 /// <returns>The dispatched.</returns>
50 /// <param name="that">That.</param>
50 /// <param name="that">That.</param>
51 /// <param name="head">Head.</param>
51 /// <param name="head">Head.</param>
52 /// <param name="cleanup">Cleanup.</param>
52 /// <param name="cleanup">Cleanup.</param>
53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
56 Safe.ArgumentNotNull(that, "that");
56 Safe.ArgumentNotNull(that, "that");
57 Safe.ArgumentNotNull(head, "head");
57 Safe.ArgumentNotNull(head, "head");
58
58
59 that.On(null,null,() => head.On(cleanup));
59 that.On(null,null,() => head.On(cleanup));
60
60
61 return that;
61 return that;
62 }
62 }
63
63
64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
65 Safe.ArgumentNotNull(that, "that");
65 Safe.ArgumentNotNull(that, "that");
66 Safe.ArgumentNotNull(callback, "callback");
66 Safe.ArgumentNotNull(callback, "callback");
67 var op = TraceContext.Instance.CurrentOperation;
67 var op = TraceContext.Instance.CurrentOperation;
68 return ar => {
68 return ar => {
69 TraceContext.Instance.EnterLogicalOperation(op,false);
69 TraceContext.Instance.EnterLogicalOperation(op,false);
70 try {
70 try {
71 that.Resolve(callback(ar));
71 that.Resolve(callback(ar));
72 } catch (Exception err) {
72 } catch (Exception err) {
73 that.Reject(err);
73 that.Reject(err);
74 } finally {
74 } finally {
75 TraceContext.Instance.Leave();
75 TraceContext.Instance.Leave();
76 }
76 }
77 };
77 };
78 }
78 }
79
79
80 static void CancelCallback(object cookie) {
80 static void CancelCallback(object cookie) {
81 ((ICancellable)cookie).Cancel();
81 ((ICancellable)cookie).Cancel();
82 }
82 }
83
83
84 /// <summary>
84 /// <summary>
85 /// Cancells promise after the specified timeout is elapsed.
85 /// Cancells promise after the specified timeout is elapsed.
86 /// </summary>
86 /// </summary>
87 /// <param name="that">The promise to cancel on timeout.</param>
87 /// <param name="that">The promise to cancel on timeout.</param>
88 /// <param name="milliseconds">The timeout in milliseconds.</param>
88 /// <param name="milliseconds">The timeout in milliseconds.</param>
89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
91 Safe.ArgumentNotNull(that, "that");
91 Safe.ArgumentNotNull(that, "that");
92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
93 that.On(timer.Dispose, PromiseEventType.All);
93 that.On(timer.Dispose, PromiseEventType.All);
94 return that;
94 return that;
95 }
95 }
96
96
97 public static IPromise Combine(this ICollection<IPromise> that) {
97 public static IPromise Bundle(this ICollection<IPromise> that) {
98 Safe.ArgumentNotNull(that, "that");
98 Safe.ArgumentNotNull(that, "that");
99
99
100 int count = that.Count;
100 int count = that.Count;
101 int errors = 0;
101 var medium = new Promise();
102 var medium = new Promise();
102
103
104 medium.On(() => {
105 foreach(var p2 in that)
106 p2.Cancel();
107 }, PromiseEventType.ErrorOrCancel);
108
103 foreach (var p in that)
109 foreach (var p in that)
104 p.On(
110 p.On(
105 () => {
111 () => {
106 if (Interlocked.Decrement(ref count) == 0)
112 if (Interlocked.Decrement(ref count) == 0)
107 medium.Resolve();
113 medium.Resolve();
108 },
114 },
109 error => {
115 error => {
110 throw new Exception("The dependency promise is failed", error);
116 if (Interlocked.Increment(ref errors) == 1)
117 medium.Reject(
118 new Exception("The dependency promise is failed", error)
119 );
111 },
120 },
112 () => {
121 () => {
113 throw new OperationCanceledException("The dependency promise is cancelled");
122 if (Interlocked.Increment(ref errors) == 1)
123 medium.Reject(
124 new Exception("The dependency promise is cancelled")
125 );
114 }
126 }
115 );
127 );
116
128
117 return medium;
129 return medium;
118 }
130 }
131
132 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
133 Safe.ArgumentNotNull(that, "that");
134
135 int count = that.Count;
136 int errors = 0;
137 var medium = new Promise<T[]>();
138 var results = new T[that.Count];
139
140 medium.On(() => {
141 foreach(var p2 in that)
142 p2.Cancel();
143 }, PromiseEventType.ErrorOrCancel);
144
145 int i = 0;
146 foreach (var p in that) {
147 var idx = i;
148 p.On(
149 x => {
150 results[idx] = x;
151 if (Interlocked.Decrement(ref count) == 0)
152 medium.Resolve(results);
153 },
154 error => {
155 if (Interlocked.Increment(ref errors) == 1)
156 medium.Reject(
157 new Exception("The dependency promise is failed", error)
158 );
159 },
160 () => {
161 if (Interlocked.Increment(ref errors) == 1)
162 medium.Reject(
163 new Exception("The dependency promise is cancelled")
164 );
165 }
166 );
167 i++;
168 }
169
170 return medium;
171 }
119
172
120 #if NET_4_5
173 #if NET_4_5
121
174
122 public static Task<T> GetTask<T>(this IPromise<T> that) {
175 public static Task<T> GetTask<T>(this IPromise<T> that) {
123 Safe.ArgumentNotNull(that, "that");
176 Safe.ArgumentNotNull(that, "that");
124 var tcs = new TaskCompletionSource<T>();
177 var tcs = new TaskCompletionSource<T>();
125
178
126 that.On(tcs.SetResult, tcs.SetException, tcs.SetCanceled);
179 that.On(tcs.SetResult, tcs.SetException, tcs.SetCanceled);
127
180
128 return tcs.Task;
181 return tcs.Task;
129 }
182 }
130
183
131 #endif
184 #endif
132 }
185 }
133 }
186 }
134
187
@@ -1,120 +1,120
1 using System;
1 using System;
2 using Implab.Diagnostics;
2 using Implab.Diagnostics;
3 using Implab.Parallels;
3 using Implab.Parallels;
4 using Implab;
4 using Implab;
5 using System.Collections.Generic;
5 using System.Collections.Generic;
6 using System.Collections.Concurrent;
6 using System.Collections.Concurrent;
7
7
8 namespace MonoPlay {
8 namespace MonoPlay {
9 class MainClass {
9 class MainClass {
10 public static void Main(string[] args) {
10 public static void Main(string[] args) {
11 if (args == null)
11 if (args == null)
12 throw new ArgumentNullException("args");
12 throw new ArgumentNullException("args");
13
13
14 var q1 = new AsyncQueue<int>();
14 var q1 = new AsyncQueue<int>();
15 var q2 = new Queue<int>();
15 var q2 = new Queue<int>();
16
16
17 const int count = 10000000;
17 const int count = 10000000;
18
18
19 int res1 = 0, res2 = 0;
19 int res1 = 0, res2 = 0;
20 var t1 = Environment.TickCount;
20 var t1 = Environment.TickCount;
21
21
22 AsyncPool.RunThread(
22 AsyncPool.RunThread(
23 () => {
23 () => {
24 for (var i = 0; i < count; i++)
24 for (var i = 0; i < count; i++)
25 q1.Enqueue(1);
25 q1.Enqueue(1);
26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
27 },
27 },
28 () => {
28 () => {
29 for (var i = 0; i < count; i++)
29 for (var i = 0; i < count; i++)
30 q1.Enqueue(2);
30 q1.Enqueue(2);
31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
32 },
32 },
33 () => {
33 () => {
34 int temp = 0;
34 int temp = 0;
35 int i = 0;
35 int i = 0;
36 while (i < count)
36 while (i < count)
37 if (q1.TryDequeue(out temp)) {
37 if (q1.TryDequeue(out temp)) {
38 i++;
38 i++;
39 res1 += temp;
39 res1 += temp;
40 }
40 }
41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
42 },
42 },
43 () => {
43 () => {
44 int temp = 0;
44 int temp = 0;
45 int i = 0;
45 int i = 0;
46 while (i < count)
46 while (i < count)
47 if (q1.TryDequeue(out temp)) {
47 if (q1.TryDequeue(out temp)) {
48 i++;
48 i++;
49 res2 += temp;
49 res2 += temp;
50 }
50 }
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
52 }
52 }
53 )
53 )
54 .Combine()
54 .Bundle()
55 .Join();
55 .Join();
56
56
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
58
58
59 var t2 = Environment.TickCount;
59 var t2 = Environment.TickCount;
60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
61
61
62 t1 = Environment.TickCount;
62 t1 = Environment.TickCount;
63
63
64 for (var i = 0; i < count * 2; i++)
64 for (var i = 0; i < count * 2; i++)
65 q2.Enqueue(i);
65 q2.Enqueue(i);
66
66
67 for (var i = 0; i < count * 2; i++)
67 for (var i = 0; i < count * 2; i++)
68 q2.Dequeue();
68 q2.Dequeue();
69
69
70 t2 = Environment.TickCount;
70 t2 = Environment.TickCount;
71 Console.WriteLine("Queue: {0} ms", t2 - t1);
71 Console.WriteLine("Queue: {0} ms", t2 - t1);
72
72
73 q2 = new Queue<int>();
73 q2 = new Queue<int>();
74
74
75 t1 = Environment.TickCount;
75 t1 = Environment.TickCount;
76
76
77
77
78 AsyncPool.RunThread(
78 AsyncPool.RunThread(
79 () => {
79 () => {
80 for (var i = 0; i < count; i++)
80 for (var i = 0; i < count; i++)
81 lock (q2)
81 lock (q2)
82 q2.Enqueue(i);
82 q2.Enqueue(i);
83 },
83 },
84 () => {
84 () => {
85 for (var i = 0; i < count; i++)
85 for (var i = 0; i < count; i++)
86 lock (q2)
86 lock (q2)
87 q2.Enqueue(i);
87 q2.Enqueue(i);
88 },
88 },
89 () => {
89 () => {
90 for (int i = 0; i < count ;)
90 for (int i = 0; i < count ;)
91 lock (q2) {
91 lock (q2) {
92 if (q2.Count == 0)
92 if (q2.Count == 0)
93 continue;
93 continue;
94 q2.Dequeue();
94 q2.Dequeue();
95 i++;
95 i++;
96 }
96 }
97
97
98 },
98 },
99 () => {
99 () => {
100 for (int i = 0; i < count ;)
100 for (int i = 0; i < count ;)
101 lock (q2) {
101 lock (q2) {
102 if (q2.Count == 0)
102 if (q2.Count == 0)
103 continue;
103 continue;
104 q2.Dequeue();
104 q2.Dequeue();
105 i++;
105 i++;
106 }
106 }
107
107
108 }
108 }
109 )
109 )
110 .Combine()
110 .Bundle()
111 .Join();
111 .Join();
112
112
113
113
114
114
115 t2 = Environment.TickCount;
115 t2 = Environment.TickCount;
116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
117
117
118 }
118 }
119 }
119 }
120 }
120 }
General Comments 0
You need to be logged in to leave comments. Login now