##// END OF EJS Templates
fixed promises cancellation
cin -
r149:eb793fbbe4ea v2
parent child
Show More
@@ -1,852 +1,850
1 using System;
1 using System;
2 using System.Reflection;
2 using System.Reflection;
3 using System.Threading;
3 using System.Threading;
4 using Implab.Parallels;
4 using Implab.Parallels;
5
5
6 #if MONO
6 #if MONO
7
7
8 using NUnit.Framework;
8 using NUnit.Framework;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethodAttribute = NUnit.Framework.TestAttribute;
10 using TestMethodAttribute = NUnit.Framework.TestAttribute;
11
11
12 #else
12 #else
13
13
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15
15
16 #endif
16 #endif
17
17
18 namespace Implab.Test {
18 namespace Implab.Test {
19 [TestClass]
19 [TestClass]
20 public class AsyncTests {
20 public class AsyncTests {
21 [TestMethod]
21 [TestMethod]
22 public void ResolveTest() {
22 public void ResolveTest() {
23 int res = -1;
23 int res = -1;
24 var p = new Promise<int>();
24 var p = new Promise<int>();
25 p.Then(x => res = x);
25 p.Then(x => res = x);
26 p.Resolve(100);
26 p.Resolve(100);
27
27
28 Assert.AreEqual(100, res);
28 Assert.AreEqual(100, res);
29 }
29 }
30
30
31 [TestMethod]
31 [TestMethod]
32 public void RejectTest() {
32 public void RejectTest() {
33 int res = -1;
33 int res = -1;
34 Exception err = null;
34 Exception err = null;
35
35
36 var p = new Promise<int>();
36 var p = new Promise<int>();
37 p.Then(
37 p.Then(
38 x => res = x,
38 x => res = x,
39 e => {
39 e => {
40 err = e;
40 err = e;
41 return -2;
41 return -2;
42 }
42 }
43 );
43 );
44 p.Reject(new ApplicationException("error"));
44 p.Reject(new ApplicationException("error"));
45
45
46 Assert.AreEqual(res, -1);
46 Assert.AreEqual(res, -1);
47 Assert.AreEqual(err.Message, "error");
47 Assert.AreEqual(err.Message, "error");
48
48
49 }
49 }
50
50
51 [TestMethod]
51 [TestMethod]
52 public void CancelExceptionTest() {
52 public void CancelExceptionTest() {
53 var p = new Promise<bool>();
53 var p = new Promise<bool>();
54 p.CancelOperation(null);
54 p.CancelOperation(null);
55
55
56 var p2 = p.Then(x => x, null, reason => {
56 var p2 = p.Then(x => x, null, reason => {
57 throw new ApplicationException("CANCELLED");
57 throw new ApplicationException("CANCELLED");
58 });
58 });
59
59
60 try {
60 try {
61 p2.Join();
61 p2.Join();
62 Assert.Fail();
62 Assert.Fail();
63 } catch (ApplicationException err) {
63 } catch (ApplicationException err) {
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 }
65 }
66
66
67 }
67 }
68
68
69 [TestMethod]
69 [TestMethod]
70 public void ContinueOnCancelTest() {
70 public void ContinueOnCancelTest() {
71 var p = new Promise<bool>();
71 var p = new Promise<bool>();
72 p.CancelOperation(null);
72 p.CancelOperation(null);
73
73
74 var p2 = p
74 var p2 = p
75 .Then(x => x, null, reason => {
75 .Then(x => x, null, reason => {
76 throw new ApplicationException("CANCELLED");
76 throw new ApplicationException("CANCELLED");
77 })
77 })
78 .Then(x => x, e => true);
78 .Then(x => x, e => true);
79
79
80 Assert.AreEqual(true, p2.Join());
80 Assert.AreEqual(true, p2.Join());
81 }
81 }
82
82
83 [TestMethod]
83 [TestMethod]
84 public void JoinSuccessTest() {
84 public void JoinSuccessTest() {
85 var p = new Promise<int>();
85 var p = new Promise<int>();
86 p.Resolve(100);
86 p.Resolve(100);
87 Assert.AreEqual(p.Join(), 100);
87 Assert.AreEqual(p.Join(), 100);
88 }
88 }
89
89
90 [TestMethod]
90 [TestMethod]
91 public void JoinFailTest() {
91 public void JoinFailTest() {
92 var p = new Promise<int>();
92 var p = new Promise<int>();
93 p.Reject(new ApplicationException("failed"));
93 p.Reject(new ApplicationException("failed"));
94
94
95 try {
95 try {
96 p.Join();
96 p.Join();
97 throw new ApplicationException("WRONG!");
97 throw new ApplicationException("WRONG!");
98 } catch (TargetInvocationException err) {
98 } catch (TargetInvocationException err) {
99 Assert.AreEqual(err.InnerException.Message, "failed");
99 Assert.AreEqual(err.InnerException.Message, "failed");
100 } catch {
100 } catch {
101 Assert.Fail("Got wrong excaption");
101 Assert.Fail("Got wrong excaption");
102 }
102 }
103 }
103 }
104
104
105 [TestMethod]
105 [TestMethod]
106 public void MapTest() {
106 public void MapTest() {
107 var p = new Promise<int>();
107 var p = new Promise<int>();
108
108
109 var p2 = p.Then(x => x.ToString());
109 var p2 = p.Then(x => x.ToString());
110 p.Resolve(100);
110 p.Resolve(100);
111
111
112 Assert.AreEqual(p2.Join(), "100");
112 Assert.AreEqual(p2.Join(), "100");
113 }
113 }
114
114
115 [TestMethod]
115 [TestMethod]
116 public void FixErrorTest() {
116 public void FixErrorTest() {
117 var p = new Promise<int>();
117 var p = new Promise<int>();
118
118
119 var p2 = p.Then(x => x, e => 101);
119 var p2 = p.Then(x => x, e => 101);
120
120
121 p.Reject(new Exception());
121 p.Reject(new Exception());
122
122
123 Assert.AreEqual(p2.Join(), 101);
123 Assert.AreEqual(p2.Join(), 101);
124 }
124 }
125
125
126 [TestMethod]
126 [TestMethod]
127 public void ChainTest() {
127 public void ChainTest() {
128 var p1 = new Promise<int>();
128 var p1 = new Promise<int>();
129
129
130 var p3 = p1.Chain(x => {
130 var p3 = p1.Chain(x => {
131 var p2 = new Promise<string>();
131 var p2 = new Promise<string>();
132 p2.Resolve(x.ToString());
132 p2.Resolve(x.ToString());
133 return p2;
133 return p2;
134 });
134 });
135
135
136 p1.Resolve(100);
136 p1.Resolve(100);
137
137
138 Assert.AreEqual(p3.Join(), "100");
138 Assert.AreEqual(p3.Join(), "100");
139 }
139 }
140
140
141 [TestMethod]
141 [TestMethod]
142 public void ChainFailTest() {
142 public void ChainFailTest() {
143 var p1 = new Promise<int>();
143 var p1 = new Promise<int>();
144
144
145 var p3 = p1.Chain(x => {
145 var p3 = p1.Chain(x => {
146 var p2 = new Promise<string>();
146 var p2 = new Promise<string>();
147 p2.Reject(new Exception("DIE!!!"));
147 p2.Reject(new Exception("DIE!!!"));
148 return p2;
148 return p2;
149 });
149 });
150
150
151 p1.Resolve(100);
151 p1.Resolve(100);
152
152
153 Assert.IsTrue(p3.IsResolved);
153 Assert.IsTrue(p3.IsResolved);
154 }
154 }
155
155
156 [TestMethod]
156 [TestMethod]
157 public void PoolTest() {
157 public void PoolTest() {
158 var pid = Thread.CurrentThread.ManagedThreadId;
158 var pid = Thread.CurrentThread.ManagedThreadId;
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160
160
161 Assert.AreNotEqual(pid, p.Join());
161 Assert.AreNotEqual(pid, p.Join());
162 }
162 }
163
163
164 [TestMethod]
164 [TestMethod]
165 public void WorkerPoolSizeTest() {
165 public void WorkerPoolSizeTest() {
166 var pool = new WorkerPool(5, 10, 1);
166 var pool = new WorkerPool(5, 10, 1);
167
167
168 Assert.AreEqual(5, pool.PoolSize);
168 Assert.AreEqual(5, pool.PoolSize);
169
169
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173
173
174 Assert.AreEqual(5, pool.PoolSize);
174 Assert.AreEqual(5, pool.PoolSize);
175
175
176 for (int i = 0; i < 100; i++)
176 for (int i = 0; i < 100; i++)
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 Thread.Sleep(200);
178 Thread.Sleep(200);
179 Assert.AreEqual(10, pool.PoolSize);
179 Assert.AreEqual(10, pool.PoolSize);
180
180
181 pool.Dispose();
181 pool.Dispose();
182 }
182 }
183
183
184 [TestMethod]
184 [TestMethod]
185 public void WorkerPoolCorrectTest() {
185 public void WorkerPoolCorrectTest() {
186 var pool = new WorkerPool(0,1000,100);
186 var pool = new WorkerPool(0,1000,100);
187
187
188 const int iterations = 1000;
188 const int iterations = 1000;
189 int pending = iterations;
189 int pending = iterations;
190 var stop = new ManualResetEvent(false);
190 var stop = new ManualResetEvent(false);
191
191
192 var count = 0;
192 var count = 0;
193 for (int i = 0; i < iterations; i++) {
193 for (int i = 0; i < iterations; i++) {
194 pool
194 pool
195 .Invoke(() => 1)
195 .Invoke(() => 1)
196 .Then(x => Interlocked.Add(ref count, x))
196 .Then(x => Interlocked.Add(ref count, x))
197 .Then(x => Math.Log10(x))
197 .Then(x => Math.Log10(x))
198 .On(() => {
198 .On(() => {
199 Interlocked.Decrement(ref pending);
199 Interlocked.Decrement(ref pending);
200 if (pending == 0)
200 if (pending == 0)
201 stop.Set();
201 stop.Set();
202 }, PromiseEventType.All);
202 }, PromiseEventType.All);
203 }
203 }
204
204
205 stop.WaitOne();
205 stop.WaitOne();
206
206
207 Assert.AreEqual(iterations, count);
207 Assert.AreEqual(iterations, count);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 pool.Dispose();
209 pool.Dispose();
210
210
211 }
211 }
212
212
213 [TestMethod]
213 [TestMethod]
214 public void WorkerPoolDisposeTest() {
214 public void WorkerPoolDisposeTest() {
215 var pool = new WorkerPool(5, 20);
215 var pool = new WorkerPool(5, 20);
216 Assert.AreEqual(5, pool.PoolSize);
216 Assert.AreEqual(5, pool.PoolSize);
217 pool.Dispose();
217 pool.Dispose();
218 Thread.Sleep(500);
218 Thread.Sleep(500);
219 Assert.AreEqual(0, pool.PoolSize);
219 Assert.AreEqual(0, pool.PoolSize);
220 pool.Dispose();
220 pool.Dispose();
221 }
221 }
222
222
223 [TestMethod]
223 [TestMethod]
224 public void MTQueueTest() {
224 public void MTQueueTest() {
225 var queue = new MTQueue<int>();
225 var queue = new MTQueue<int>();
226 int res;
226 int res;
227
227
228 queue.Enqueue(10);
228 queue.Enqueue(10);
229 Assert.IsTrue(queue.TryDequeue(out res));
229 Assert.IsTrue(queue.TryDequeue(out res));
230 Assert.AreEqual(10, res);
230 Assert.AreEqual(10, res);
231 Assert.IsFalse(queue.TryDequeue(out res));
231 Assert.IsFalse(queue.TryDequeue(out res));
232
232
233 for (int i = 0; i < 1000; i++)
233 for (int i = 0; i < 1000; i++)
234 queue.Enqueue(i);
234 queue.Enqueue(i);
235
235
236 for (int i = 0; i < 1000; i++) {
236 for (int i = 0; i < 1000; i++) {
237 queue.TryDequeue(out res);
237 queue.TryDequeue(out res);
238 Assert.AreEqual(i, res);
238 Assert.AreEqual(i, res);
239 }
239 }
240
240
241 int writers = 0;
241 int writers = 0;
242 int readers = 0;
242 int readers = 0;
243 var stop = new ManualResetEvent(false);
243 var stop = new ManualResetEvent(false);
244 int total = 0;
244 int total = 0;
245
245
246 const int itemsPerWriter = 10000;
246 const int itemsPerWriter = 10000;
247 const int writersCount = 10;
247 const int writersCount = 10;
248
248
249 for (int i = 0; i < writersCount; i++) {
249 for (int i = 0; i < writersCount; i++) {
250 Interlocked.Increment(ref writers);
250 Interlocked.Increment(ref writers);
251 AsyncPool
251 AsyncPool
252 .RunThread(() => {
252 .RunThread(() => {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 queue.Enqueue(1);
254 queue.Enqueue(1);
255 }
255 }
256 return 1;
256 return 1;
257 })
257 })
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 }
259 }
260
260
261 for (int i = 0; i < 10; i++) {
261 for (int i = 0; i < 10; i++) {
262 Interlocked.Increment(ref readers);
262 Interlocked.Increment(ref readers);
263 AsyncPool
263 AsyncPool
264 .RunThread(() => {
264 .RunThread(() => {
265 int t;
265 int t;
266 do {
266 do {
267 while (queue.TryDequeue(out t))
267 while (queue.TryDequeue(out t))
268 Interlocked.Add(ref total, t);
268 Interlocked.Add(ref total, t);
269 } while (writers > 0);
269 } while (writers > 0);
270 return 1;
270 return 1;
271 })
271 })
272 .On(() => {
272 .On(() => {
273 Interlocked.Decrement(ref readers);
273 Interlocked.Decrement(ref readers);
274 if (readers == 0)
274 if (readers == 0)
275 stop.Set();
275 stop.Set();
276 }, PromiseEventType.All);
276 }, PromiseEventType.All);
277 }
277 }
278
278
279 stop.WaitOne();
279 stop.WaitOne();
280
280
281 Assert.AreEqual(100000, total);
281 Assert.AreEqual(100000, total);
282 }
282 }
283
283
284 [TestMethod]
284 [TestMethod]
285 public void AsyncQueueTest() {
285 public void AsyncQueueTest() {
286 var queue = new AsyncQueue<int>();
286 var queue = new AsyncQueue<int>();
287 int res;
287 int res;
288
288
289 queue.Enqueue(10);
289 queue.Enqueue(10);
290 Assert.IsTrue(queue.TryDequeue(out res));
290 Assert.IsTrue(queue.TryDequeue(out res));
291 Assert.AreEqual(10, res);
291 Assert.AreEqual(10, res);
292 Assert.IsFalse(queue.TryDequeue(out res));
292 Assert.IsFalse(queue.TryDequeue(out res));
293
293
294 for (int i = 0; i < 1000; i++)
294 for (int i = 0; i < 1000; i++)
295 queue.Enqueue(i);
295 queue.Enqueue(i);
296
296
297 for (int i = 0; i < 1000; i++) {
297 for (int i = 0; i < 1000; i++) {
298 queue.TryDequeue(out res);
298 queue.TryDequeue(out res);
299 Assert.AreEqual(i, res);
299 Assert.AreEqual(i, res);
300 }
300 }
301
301
302 const int count = 10000000;
302 const int count = 10000000;
303
303
304 int res1 = 0, res2 = 0;
304 int res1 = 0, res2 = 0;
305 var t1 = Environment.TickCount;
305 var t1 = Environment.TickCount;
306
306
307 AsyncPool.RunThread(
307 AsyncPool.RunThread(
308 () => {
308 () => {
309 for (var i = 0; i < count; i++)
309 for (var i = 0; i < count; i++)
310 queue.Enqueue(1);
310 queue.Enqueue(1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 },
312 },
313 () => {
313 () => {
314 for (var i = 0; i < count; i++)
314 for (var i = 0; i < count; i++)
315 queue.Enqueue(2);
315 queue.Enqueue(2);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 },
317 },
318 () => {
318 () => {
319 int temp;
319 int temp;
320 int i = 0;
320 int i = 0;
321 while (i < count)
321 while (i < count)
322 if (queue.TryDequeue(out temp)) {
322 if (queue.TryDequeue(out temp)) {
323 i++;
323 i++;
324 res1 += temp;
324 res1 += temp;
325 }
325 }
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 },
327 },
328 () => {
328 () => {
329 int temp;
329 int temp;
330 int i = 0;
330 int i = 0;
331 while (i < count)
331 while (i < count)
332 if (queue.TryDequeue(out temp)) {
332 if (queue.TryDequeue(out temp)) {
333 i++;
333 i++;
334 res2 += temp;
334 res2 += temp;
335 }
335 }
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 }
337 }
338 )
338 )
339 .Bundle()
339 .Bundle()
340 .Join();
340 .Join();
341
341
342 Assert.AreEqual(count * 3, res1 + res2);
342 Assert.AreEqual(count * 3, res1 + res2);
343
343
344 Console.WriteLine(
344 Console.WriteLine(
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 Environment.TickCount - t1,
346 Environment.TickCount - t1,
347 res1,
347 res1,
348 res2,
348 res2,
349 res1 + res2,
349 res1 + res2,
350 count
350 count
351 );
351 );
352 }
352 }
353
353
354 [TestMethod]
354 [TestMethod]
355 public void AsyncQueueBatchTest() {
355 public void AsyncQueueBatchTest() {
356 var queue = new AsyncQueue<int>();
356 var queue = new AsyncQueue<int>();
357
357
358 const int wBatch = 29;
358 const int wBatch = 29;
359 const int wCount = 400000;
359 const int wCount = 400000;
360 const int total = wBatch * wCount * 2;
360 const int total = wBatch * wCount * 2;
361 const int summ = wBatch * wCount * 3;
361 const int summ = wBatch * wCount * 3;
362
362
363 int r1 = 0, r2 = 0;
363 int r1 = 0, r2 = 0;
364 const int rBatch = 111;
364 const int rBatch = 111;
365 int read = 0;
365 int read = 0;
366
366
367 var t1 = Environment.TickCount;
367 var t1 = Environment.TickCount;
368
368
369 AsyncPool.RunThread(
369 AsyncPool.RunThread(
370 () => {
370 () => {
371 var buffer = new int[wBatch];
371 var buffer = new int[wBatch];
372 for(int i = 0; i<wBatch; i++)
372 for(int i = 0; i<wBatch; i++)
373 buffer[i] = 1;
373 buffer[i] = 1;
374
374
375 for(int i =0; i < wCount; i++)
375 for(int i =0; i < wCount; i++)
376 queue.EnqueueRange(buffer,0,wBatch);
376 queue.EnqueueRange(buffer,0,wBatch);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 },
378 },
379 () => {
379 () => {
380 var buffer = new int[wBatch];
380 var buffer = new int[wBatch];
381 for(int i = 0; i<wBatch; i++)
381 for(int i = 0; i<wBatch; i++)
382 buffer[i] = 2;
382 buffer[i] = 2;
383
383
384 for(int i =0; i < wCount; i++)
384 for(int i =0; i < wCount; i++)
385 queue.EnqueueRange(buffer,0,wBatch);
385 queue.EnqueueRange(buffer,0,wBatch);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 },
387 },
388 () => {
388 () => {
389 var buffer = new int[rBatch];
389 var buffer = new int[rBatch];
390
390
391 while(read < total) {
391 while(read < total) {
392 int actual;
392 int actual;
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 for(int i=0; i< actual; i++)
394 for(int i=0; i< actual; i++)
395 r1 += buffer[i];
395 r1 += buffer[i];
396 Interlocked.Add(ref read, actual);
396 Interlocked.Add(ref read, actual);
397 }
397 }
398 }
398 }
399
399
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 },
401 },
402 () => {
402 () => {
403 var buffer = new int[rBatch];
403 var buffer = new int[rBatch];
404
404
405 while(read < total) {
405 while(read < total) {
406 int actual;
406 int actual;
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 for(int i=0; i< actual; i++)
408 for(int i=0; i< actual; i++)
409 r2 += buffer[i];
409 r2 += buffer[i];
410 Interlocked.Add(ref read, actual);
410 Interlocked.Add(ref read, actual);
411 }
411 }
412 }
412 }
413
413
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 }
415 }
416 )
416 )
417 .Bundle()
417 .Bundle()
418 .Join();
418 .Join();
419
419
420 Assert.AreEqual(summ , r1 + r2);
420 Assert.AreEqual(summ , r1 + r2);
421
421
422 Console.WriteLine(
422 Console.WriteLine(
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 Environment.TickCount - t1,
424 Environment.TickCount - t1,
425 r1,
425 r1,
426 r2,
426 r2,
427 r1 + r2,
427 r1 + r2,
428 total
428 total
429 );
429 );
430 }
430 }
431
431
432 [TestMethod]
432 [TestMethod]
433 public void AsyncQueueChunkDequeueTest() {
433 public void AsyncQueueChunkDequeueTest() {
434 var queue = new AsyncQueue<int>();
434 var queue = new AsyncQueue<int>();
435
435
436 const int wBatch = 31;
436 const int wBatch = 31;
437 const int wCount = 200000;
437 const int wCount = 200000;
438 const int total = wBatch * wCount * 3;
438 const int total = wBatch * wCount * 3;
439 const int summ = wBatch * wCount * 6;
439 const int summ = wBatch * wCount * 6;
440
440
441 int r1 = 0, r2 = 0;
441 int r1 = 0, r2 = 0;
442 const int rBatch = 1024;
442 const int rBatch = 1024;
443 int read = 0;
443 int read = 0;
444
444
445 var t1 = Environment.TickCount;
445 var t1 = Environment.TickCount;
446
446
447 AsyncPool.RunThread(
447 AsyncPool.RunThread(
448 () => {
448 () => {
449 var buffer = new int[wBatch];
449 var buffer = new int[wBatch];
450 for(int i = 0; i<wBatch; i++)
450 for(int i = 0; i<wBatch; i++)
451 buffer[i] = 1;
451 buffer[i] = 1;
452
452
453 for(int i =0; i < wCount; i++)
453 for(int i =0; i < wCount; i++)
454 queue.EnqueueRange(buffer,0,wBatch);
454 queue.EnqueueRange(buffer,0,wBatch);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 },
456 },
457 () => {
457 () => {
458 var buffer = new int[wBatch];
458 var buffer = new int[wBatch];
459 for(int i = 0; i<wBatch; i++)
459 for(int i = 0; i<wBatch; i++)
460 buffer[i] = 2;
460 buffer[i] = 2;
461
461
462 for(int i =0; i < wCount; i++)
462 for(int i =0; i < wCount; i++)
463 queue.EnqueueRange(buffer,0,wBatch);
463 queue.EnqueueRange(buffer,0,wBatch);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 },
465 },
466 () => {
466 () => {
467 var buffer = new int[wBatch];
467 var buffer = new int[wBatch];
468 for(int i = 0; i<wBatch; i++)
468 for(int i = 0; i<wBatch; i++)
469 buffer[i] = 3;
469 buffer[i] = 3;
470
470
471 for(int i =0; i < wCount; i++)
471 for(int i =0; i < wCount; i++)
472 queue.EnqueueRange(buffer,0,wBatch);
472 queue.EnqueueRange(buffer,0,wBatch);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 },
474 },
475 () => {
475 () => {
476 var buffer = new int[rBatch];
476 var buffer = new int[rBatch];
477 int count = 1;
477 int count = 1;
478 double avgchunk = 0;
478 double avgchunk = 0;
479 while(read < total) {
479 while(read < total) {
480 int actual;
480 int actual;
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 for(int i=0; i< actual; i++)
482 for(int i=0; i< actual; i++)
483 r2 += buffer[i];
483 r2 += buffer[i];
484 Interlocked.Add(ref read, actual);
484 Interlocked.Add(ref read, actual);
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 count ++;
486 count ++;
487 }
487 }
488 }
488 }
489
489
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 }
491 }
492 )
492 )
493 .Bundle()
493 .Bundle()
494 .Join();
494 .Join();
495
495
496 Assert.AreEqual(summ , r1 + r2);
496 Assert.AreEqual(summ , r1 + r2);
497
497
498 Console.WriteLine(
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
500 Environment.TickCount - t1,
501 r1,
501 r1,
502 r2,
502 r2,
503 r1 + r2,
503 r1 + r2,
504 total
504 total
505 );
505 );
506 }
506 }
507
507
508 [TestMethod]
508 [TestMethod]
509 public void AsyncQueueDrainTest() {
509 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>();
510 var queue = new AsyncQueue<int>();
511
511
512 const int wBatch = 11;
512 const int wBatch = 11;
513 const int wCount = 200000;
513 const int wCount = 200000;
514 const int total = wBatch * wCount * 3;
514 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
516
516
517 int r1 = 0, r2 = 0;
517 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
518 const int rBatch = 11;
519 int read = 0;
519 int read = 0;
520
520
521 var t1 = Environment.TickCount;
521 var t1 = Environment.TickCount;
522
522
523 AsyncPool.RunThread(
523 AsyncPool.RunThread(
524 () => {
524 () => {
525 var buffer = new int[wBatch];
525 var buffer = new int[wBatch];
526 for(int i = 0; i<wBatch; i++)
526 for(int i = 0; i<wBatch; i++)
527 buffer[i] = 1;
527 buffer[i] = 1;
528
528
529 for(int i =0; i < wCount; i++)
529 for(int i =0; i < wCount; i++)
530 queue.EnqueueRange(buffer,0,wBatch);
530 queue.EnqueueRange(buffer,0,wBatch);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 },
532 },
533 () => {
533 () => {
534 for(int i =0; i < wCount * wBatch; i++)
534 for(int i =0; i < wCount * wBatch; i++)
535 queue.Enqueue(1);
535 queue.Enqueue(1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 },
537 },
538 () => {
538 () => {
539 var buffer = new int[wBatch];
539 var buffer = new int[wBatch];
540 for(int i = 0; i<wBatch; i++)
540 for(int i = 0; i<wBatch; i++)
541 buffer[i] = 1;
541 buffer[i] = 1;
542
542
543 for(int i =0; i < wCount; i++)
543 for(int i =0; i < wCount; i++)
544 queue.EnqueueRange(buffer,0,wBatch);
544 queue.EnqueueRange(buffer,0,wBatch);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 },
546 },
547 /*() => {
547 /*() => {
548 int temp;
548 int temp;
549 int count = 0;
549 int count = 0;
550 while (read < total)
550 while (read < total)
551 if (queue.TryDequeue(out temp)) {
551 if (queue.TryDequeue(out temp)) {
552 count++;
552 count++;
553 r1 += temp;
553 r1 += temp;
554 Interlocked.Increment(ref read);
554 Interlocked.Increment(ref read);
555 }
555 }
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 },*/
557 },*/
558 /*() => {
558 /*() => {
559 var buffer = new int[rBatch];
559 var buffer = new int[rBatch];
560 var count = 0;
560 var count = 0;
561 while(read < total) {
561 while(read < total) {
562 int actual;
562 int actual;
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 for(int i=0; i< actual; i++)
564 for(int i=0; i< actual; i++)
565 r1 += buffer[i];
565 r1 += buffer[i];
566 Interlocked.Add(ref read, actual);
566 Interlocked.Add(ref read, actual);
567 count += actual;
567 count += actual;
568 }
568 }
569 }
569 }
570
570
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 },*/
572 },*/
573 () => {
573 () => {
574 var count = 0;
574 var count = 0;
575 while(read < total) {
575 while(read < total) {
576 var buffer = queue.Drain();
576 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
577 for(int i=0; i< buffer.Length; i++)
578 r1 += buffer[i];
578 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
579 Interlocked.Add(ref read, buffer.Length);
580 count += buffer.Length;
580 count += buffer.Length;
581 }
581 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 },
583 },
584 () => {
584 () => {
585 var count = 0;
585 var count = 0;
586 while(read < total) {
586 while(read < total) {
587 var buffer = queue.Drain();
587 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
588 for(int i=0; i< buffer.Length; i++)
589 r2 += buffer[i];
589 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
590 Interlocked.Add(ref read, buffer.Length);
591 count += buffer.Length;
591 count += buffer.Length;
592 }
592 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 }
594 }
595 )
595 )
596 .Bundle()
596 .Bundle()
597 .Join();
597 .Join();
598
598
599 Assert.AreEqual(summ , r1 + r2);
599 Assert.AreEqual(summ , r1 + r2);
600
600
601 Console.WriteLine(
601 Console.WriteLine(
602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
603 Environment.TickCount - t1,
603 Environment.TickCount - t1,
604 r1,
604 r1,
605 r2,
605 r2,
606 r1 + r2,
606 r1 + r2,
607 total
607 total
608 );
608 );
609 }
609 }
610
610
611 [TestMethod]
611 [TestMethod]
612 public void ParallelMapTest() {
612 public void ParallelMapTest() {
613
613
614 const int count = 100000;
614 const int count = 100000;
615
615
616 var args = new double[count];
616 var args = new double[count];
617 var rand = new Random();
617 var rand = new Random();
618
618
619 for (int i = 0; i < count; i++)
619 for (int i = 0; i < count; i++)
620 args[i] = rand.NextDouble();
620 args[i] = rand.NextDouble();
621
621
622 var t = Environment.TickCount;
622 var t = Environment.TickCount;
623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
624
624
625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
626
626
627 t = Environment.TickCount;
627 t = Environment.TickCount;
628 for (int i = 0; i < count; i++)
628 for (int i = 0; i < count; i++)
629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
631 }
631 }
632
632
633 [TestMethod]
633 [TestMethod]
634 public void ChainedMapTest() {
634 public void ChainedMapTest() {
635
635
636 using (var pool = new WorkerPool()) {
636 using (var pool = new WorkerPool()) {
637 const int count = 10000;
637 const int count = 10000;
638
638
639 var args = new double[count];
639 var args = new double[count];
640 var rand = new Random();
640 var rand = new Random();
641
641
642 for (int i = 0; i < count; i++)
642 for (int i = 0; i < count; i++)
643 args[i] = rand.NextDouble();
643 args[i] = rand.NextDouble();
644
644
645 var t = Environment.TickCount;
645 var t = Environment.TickCount;
646 var res = args
646 var res = args
647 .ChainedMap(
647 .ChainedMap(
648 // Analysis disable once AccessToDisposedClosure
648 // Analysis disable once AccessToDisposedClosure
649 x => pool.Invoke(
649 x => pool.Invoke(
650 () => Math.Sin(x * x)
650 () => Math.Sin(x * x)
651 ),
651 ),
652 4
652 4
653 )
653 )
654 .Join();
654 .Join();
655
655
656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
657
657
658 t = Environment.TickCount;
658 t = Environment.TickCount;
659 for (int i = 0; i < count; i++)
659 for (int i = 0; i < count; i++)
660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
663 }
663 }
664 }
664 }
665
665
666 [TestMethod]
666 [TestMethod]
667 public void ParallelForEachTest() {
667 public void ParallelForEachTest() {
668
668
669 const int count = 100000;
669 const int count = 100000;
670
670
671 var args = new int[count];
671 var args = new int[count];
672 var rand = new Random();
672 var rand = new Random();
673
673
674 for (int i = 0; i < count; i++)
674 for (int i = 0; i < count; i++)
675 args[i] = (int)(rand.NextDouble() * 100);
675 args[i] = (int)(rand.NextDouble() * 100);
676
676
677 int result = 0;
677 int result = 0;
678
678
679 var t = Environment.TickCount;
679 var t = Environment.TickCount;
680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
681
681
682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
683
683
684 int result2 = 0;
684 int result2 = 0;
685
685
686 t = Environment.TickCount;
686 t = Environment.TickCount;
687 for (int i = 0; i < count; i++)
687 for (int i = 0; i < count; i++)
688 result2 += args[i];
688 result2 += args[i];
689 Assert.AreEqual(result2, result);
689 Assert.AreEqual(result2, result);
690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
691 }
691 }
692
692
693 [TestMethod]
693 [TestMethod]
694 public void ComplexCase1Test() {
694 public void ComplexCase1Test() {
695 var flags = new bool[3];
695 var flags = new bool[3];
696
696
697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
698
698
699 var step1 = PromiseHelper
699 var step1 = PromiseHelper
700 .Sleep(200, "Alan")
700 .Sleep(200, "Alan")
701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
702 var p = step1
702 var p = step1
703 .Chain(x =>
703 .Chain(x =>
704 PromiseHelper
704 PromiseHelper
705 .Sleep(200, "Hi, " + x)
705 .Sleep(200, "Hi, " + x)
706 .Then(y => y)
706 .Then(y => y)
707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
708 )
708 )
709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
710 step1.Join();
710 step1.Join();
711 p.Cancel();
711 p.Cancel();
712 try {
712 try {
713 Assert.AreEqual(p.Join(), "Hi, Alan");
713 Assert.AreEqual(p.Join(), "Hi, Alan");
714 Assert.Fail("Shouldn't get here");
714 Assert.Fail("Shouldn't get here");
715 } catch (OperationCanceledException) {
715 } catch (OperationCanceledException) {
716 }
716 }
717
717
718 Assert.IsFalse(flags[0]);
718 Assert.IsFalse(flags[0]);
719 Assert.IsTrue(flags[1]);
719 Assert.IsTrue(flags[1]);
720 Assert.IsTrue(flags[2]);
720 Assert.IsTrue(flags[2]);
721 }
721 }
722
722
723 [TestMethod]
723 [TestMethod]
724 public void ChainedCancel1Test() {
724 public void ChainedCancel1Test() {
725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
727 var p = PromiseHelper
727 var p = PromiseHelper
728 .Sleep(1, "Hi, HAL!")
728 .Sleep(1, "Hi, HAL!")
729 .Then(x => {
729 .Then(x => {
730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
733 PromiseHelper
733 PromiseHelper
734 .Sleep(100, "HAL, STOP!")
734 .Sleep(100, "HAL, STOP!")
735 .Then(result.Cancel);
735 .Then(result.Cancel);
736 return result;
736 return result;
737 });
737 });
738 try {
738 try {
739 p.Join();
739 p.Join();
740 } catch (TargetInvocationException err) {
740 } catch (TargetInvocationException err) {
741 Assert.IsTrue(err.InnerException is OperationCanceledException);
741 Assert.IsTrue(err.InnerException is OperationCanceledException);
742 }
742 }
743 }
743 }
744
744
745 [TestMethod]
745 [TestMethod]
746 public void ChainedCancel2Test() {
746 public void ChainedCancel2Test() {
747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
748 var pSurvive = new Promise<bool>();
748 var pSurvive = new Promise<bool>();
749 var hemStarted = new ManualResetEvent(false);
749 var hemStarted = new Signal();
750 var p = PromiseHelper
750 var p = PromiseHelper
751 .Sleep(1, "Hi, HAL!")
751 .Sleep(1, "Hi, HAL!")
752 .Chain(x => {
752 .Chain(() => {
753 hemStarted.Set();
753 hemStarted.Set();
754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
755 var result = PromiseHelper
755 var result = PromiseHelper
756 .Sleep(100000000, "HEM ENABLED!!!")
756 .Sleep(2000, "HEM ENABLED!!!")
757 .Then(s => {
757 .Then(() => pSurvive.Resolve(false));
758 pSurvive.Resolve(false);
759 return s;
760 });
761
758
762 result
759 result
763 .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
760 .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
764
761
765 return result;
762 return result;
766 });
763 });
767
764
768 hemStarted.WaitOne();
765 hemStarted.Wait();
769 p.Cancel();
766 p.Cancel();
770
767
771 try {
768 try {
772 p.Join();
769 p.Join();
770 Assert.Fail();
773 } catch (OperationCanceledException) {
771 } catch (OperationCanceledException) {
772 }
774 Assert.IsTrue(pSurvive.Join());
773 Assert.IsTrue(pSurvive.Join());
775 }
774 }
776 }
777
775
778 [TestMethod]
776 [TestMethod]
779 public void SharedLockTest() {
777 public void SharedLockTest() {
780 var l = new SharedLock();
778 var l = new SharedLock();
781 int shared = 0;
779 int shared = 0;
782 int exclusive = 0;
780 int exclusive = 0;
783 var s1 = new Signal();
781 var s1 = new Signal();
784 var log = new AsyncQueue<string>();
782 var log = new AsyncQueue<string>();
785
783
786 try {
784 try {
787 AsyncPool.RunThread(
785 AsyncPool.RunThread(
788 () => {
786 () => {
789 log.Enqueue("Reader #1 started");
787 log.Enqueue("Reader #1 started");
790 try {
788 try {
791 l.LockShared();
789 l.LockShared();
792 log.Enqueue("Reader #1 lock got");
790 log.Enqueue("Reader #1 lock got");
793 if (Interlocked.Increment(ref shared) == 2)
791 if (Interlocked.Increment(ref shared) == 2)
794 s1.Set();
792 s1.Set();
795 s1.Wait();
793 s1.Wait();
796 log.Enqueue("Reader #1 finished");
794 log.Enqueue("Reader #1 finished");
797 Interlocked.Decrement(ref shared);
795 Interlocked.Decrement(ref shared);
798 } finally {
796 } finally {
799 l.Release();
797 l.Release();
800 log.Enqueue("Reader #1 lock released");
798 log.Enqueue("Reader #1 lock released");
801 }
799 }
802 },
800 },
803 () => {
801 () => {
804 log.Enqueue("Reader #2 started");
802 log.Enqueue("Reader #2 started");
805
803
806 try {
804 try {
807 l.LockShared();
805 l.LockShared();
808 log.Enqueue("Reader #2 lock got");
806 log.Enqueue("Reader #2 lock got");
809
807
810 if (Interlocked.Increment(ref shared) == 2)
808 if (Interlocked.Increment(ref shared) == 2)
811 s1.Set();
809 s1.Set();
812 s1.Wait();
810 s1.Wait();
813 log.Enqueue("Reader #2 upgrading to writer");
811 log.Enqueue("Reader #2 upgrading to writer");
814 Interlocked.Decrement(ref shared);
812 Interlocked.Decrement(ref shared);
815 l.Upgrade();
813 l.Upgrade();
816 log.Enqueue("Reader #2 upgraded");
814 log.Enqueue("Reader #2 upgraded");
817
815
818 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
816 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
819 Assert.AreEqual(0, shared);
817 Assert.AreEqual(0, shared);
820 log.Enqueue("Reader #2 finished");
818 log.Enqueue("Reader #2 finished");
821 Interlocked.Decrement(ref exclusive);
819 Interlocked.Decrement(ref exclusive);
822 } finally {
820 } finally {
823 l.Release();
821 l.Release();
824 log.Enqueue("Reader #2 lock released");
822 log.Enqueue("Reader #2 lock released");
825 }
823 }
826 },
824 },
827 () => {
825 () => {
828 log.Enqueue("Writer #1 started");
826 log.Enqueue("Writer #1 started");
829 try {
827 try {
830 l.LockExclusive();
828 l.LockExclusive();
831 log.Enqueue("Writer #1 got the lock");
829 log.Enqueue("Writer #1 got the lock");
832 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
830 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
833 Interlocked.Decrement(ref exclusive);
831 Interlocked.Decrement(ref exclusive);
834 log.Enqueue("Writer #1 is finished");
832 log.Enqueue("Writer #1 is finished");
835 } finally {
833 } finally {
836 l.Release();
834 l.Release();
837 log.Enqueue("Writer #1 lock released");
835 log.Enqueue("Writer #1 lock released");
838 }
836 }
839 }
837 }
840 ).Bundle().Join(1000);
838 ).Bundle().Join(1000);
841 log.Enqueue("Done");
839 log.Enqueue("Done");
842 } catch(Exception error) {
840 } catch(Exception error) {
843 log.Enqueue(error.Message);
841 log.Enqueue(error.Message);
844 throw;
842 throw;
845 } finally {
843 } finally {
846 foreach (var m in log)
844 foreach (var m in log)
847 Console.WriteLine(m);
845 Console.WriteLine(m);
848 }
846 }
849 }
847 }
850 }
848 }
851 }
849 }
852
850
@@ -1,144 +1,148
1 using System;
1 using System;
2 using Implab.Parallels;
2 using Implab.Parallels;
3
3
4 #if MONO
4 #if MONO
5
5
6 using NUnit.Framework;
6 using NUnit.Framework;
7 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
7 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
8 using TestMethodAttribute = NUnit.Framework.TestAttribute;
8 using TestMethodAttribute = NUnit.Framework.TestAttribute;
9
9
10 #else
10 #else
11
11
12 using Microsoft.VisualStudio.TestTools.UnitTesting;
12 using Microsoft.VisualStudio.TestTools.UnitTesting;
13
13
14 #endif
14 #endif
15
15
16 namespace Implab.Test {
16 namespace Implab.Test {
17 [TestClass]
17 [TestClass]
18 public class CancelationTests {
18 public class CancelationTests {
19
19
20 [TestMethod]
20 [TestMethod]
21 public void PromiseCancelTest() {
21 public void PromiseCancelTest() {
22 var p = new Promise();
22 var p = new Promise();
23 bool requested = false;
23 bool requested = false;
24 var reason = new Exception("Test");
24 var reason = new Exception("Test");
25
25
26 // request cancelation
26 // request cancelation
27 p.Cancel(reason);
27 p.Cancel(reason);
28
28
29 Assert.IsTrue(p.IsCancellationRequested);
29 Assert.IsTrue(p.IsCancellationRequested);
30 Assert.AreSame(reason, p.CancellationReason);
30 Assert.AreSame(reason, p.CancellationReason);
31 Assert.IsFalse(p.IsCancelled);
31 Assert.IsFalse(p.IsCancelled);
32
32
33 p.CancellationRequested(r => {
33 p.CancellationRequested(r => {
34 Assert.AreSame(reason, r);
34 Assert.AreSame(reason, r);
35 requested = true;
35 requested = true;
36 });
36 });
37
37
38 Assert.IsTrue(requested);
38 Assert.IsTrue(requested);
39
39
40 // cancel the promise
40 // cancel the promise
41 Assert.IsTrue(p.CancelOperationIfRequested());
41 Assert.IsTrue(p.CancelOperationIfRequested());
42 Assert.IsTrue(p.IsCancelled);
42 Assert.IsTrue(p.IsCancelled);
43 Assert.AreSame(reason, p.Error);
43 Assert.AreSame(reason, p.Error);
44 }
44 }
45
45
46 [TestMethod]
46 [TestMethod]
47 public void CancelActionBeforeStartTask() {
47 public void CancelActionBeforeStartTask() {
48 bool run = false;
48 bool run = false;
49 var task = new ActionTask(() => {
49 var task = new ActionTask(() => {
50 run = true;
50 run = true;
51 }, null, null);
51 }, null, null, true);
52
52
53 // request cancelation
53 // request cancelation
54 task.Cancel();
54 task.Cancel();
55 Assert.IsTrue(task.IsCancelled);
55 Assert.IsTrue(task.IsCancelled);
56 task.Resolve();
56 task.Resolve();
57 Assert.IsFalse(run);
57 Assert.IsFalse(run);
58 }
58 }
59
59
60 [TestMethod]
60 [TestMethod]
61 public void CancelActionAfterTaskStarted() {
61 public void CancelActionAfterTaskStarted() {
62 var finish = new Signal();
62 var finish = new Signal();
63 var started = new Signal();
63 var started = new Signal();
64
64
65 var task = new ActionTask(() => {
65 var task = new ActionTask(() => {
66 started.Set();
66 started.Set();
67 finish.Wait();
67 finish.Wait();
68 }, null, null);
68 }, null, null, true);
69
69
70 AsyncPool.RunThread(() => {
70 AsyncPool.RunThread(() => {
71 task.Resolve();
71 task.Resolve();
72 });
72 });
73
73
74 started.Wait(1000);
74 started.Wait(1000);
75
75
76 task.Cancel();
76 task.Cancel();
77 Assert.IsTrue(task.IsCancellationRequested);
77 Assert.IsTrue(task.IsCancellationRequested);
78 Assert.IsFalse(task.IsCancelled);
78 Assert.IsFalse(task.IsCancelled);
79 Assert.IsFalse(task.IsResolved);
79 Assert.IsFalse(task.IsResolved);
80
80
81 finish.Set();
81 finish.Set();
82 task.Join(1000);
82 task.Join(1000);
83
83
84 }
84 }
85
85
86 [TestMethod]
86 [TestMethod]
87 public void CancelTaskChainFromBottom() {
87 public void CancelTaskChainFromBottom() {
88 var started = new Signal();
88 var check1 = new Signal();
89 var check1 = new Signal();
89 var requested = false;
90 var requested = false;
90 var p1 = AsyncPool.RunThread(token => {
91 var p1 = AsyncPool.RunThread(token => {
91 token.CancellationRequested(reason => requested = true);
92 token.CancellationRequested(reason => requested = true);
93 started.Set();
92 check1.Wait();
94 check1.Wait();
93 token.CancelOperationIfRequested();
95 token.CancelOperationIfRequested();
94 });
96 });
95
97
98 started.Wait();
99
96 var p2 = p1.Then(() => {
100 var p2 = p1.Then(() => {
97 });
101 });
98
102
99 Assert.IsFalse(p1.IsResolved);
103 Assert.IsFalse(p1.IsResolved);
100 Assert.IsFalse(p2.IsResolved);
104 Assert.IsFalse(p2.IsResolved);
101
105
102 p2.Cancel();
106 p2.Cancel();
103
107
104 Assert.IsFalse(p2.IsCancelled);
108 Assert.IsFalse(p2.IsCancelled);
105 Assert.IsFalse(p1.IsCancelled);
109 Assert.IsFalse(p1.IsCancelled);
106 Assert.IsTrue(requested);
110 Assert.IsTrue(requested);
107
111
108 check1.Set();
112 check1.Set();
109
113
110 try {
114 try {
111 p2.Join(1000);
115 p2.Join(1000);
112 Assert.Fail("The chain isn't cancelled");
116 Assert.Fail("The chain isn't cancelled");
113 } catch(OperationCanceledException){
117 } catch(OperationCanceledException){
114 }
118 }
115
119
116 Assert.IsTrue(p1.IsCancelled);
120 Assert.IsTrue(p1.IsCancelled);
117 Assert.IsTrue(p2.IsCancelled);
121 Assert.IsTrue(p2.IsCancelled);
118 }
122 }
119
123
120
124
121
125
122 [TestMethod]
126 [TestMethod]
123 public void CancellableAsyncTask() {
127 public void CancellableAsyncTask() {
124 var finish = new Signal();
128 var finish = new Signal();
125 var started = new Signal();
129 var started = new Signal();
126
130
127 var p = AsyncPool.RunThread(token => {
131 var p = AsyncPool.RunThread(token => {
128 token.CancellationRequested(r => finish.Set());
132 token.CancellationRequested(r => finish.Set());
129 started.Set();
133 started.Set();
130 finish.Wait();
134 finish.Wait();
131 Assert.IsTrue(token.CancelOperationIfRequested());
135 Assert.IsTrue(token.CancelOperationIfRequested());
132 });
136 });
133
137
134 started.Wait(1000);
138 started.Wait(1000);
135 Assert.IsFalse(p.IsResolved);
139 Assert.IsFalse(p.IsResolved);
136 p.Cancel();
140 p.Cancel();
137 try {
141 try {
138 p.Join(1000);
142 p.Join(1000);
139 } catch (OperationCanceledException) {
143 } catch (OperationCanceledException) {
140 }
144 }
141 }
145 }
142 }
146 }
143 }
147 }
144
148
@@ -1,13 +1,14
1 using Implab.Parallels;
1 using Implab.Parallels;
2 using System.Threading;
2 using System.Threading;
3
3
4 namespace Implab.Test {
4 namespace Implab.Test {
5 static class PromiseHelper {
5 static class PromiseHelper {
6 public static IPromise<T> Sleep<T>(int timeout, T retVal) {
6 public static IPromise<T> Sleep<T>(int timeout, T retVal) {
7 return AsyncPool.Invoke(() => {
7 return AsyncPool.Invoke((ct) => {
8 ct.CancellationRequested(ct.CancelOperation);
8 Thread.Sleep(timeout);
9 Thread.Sleep(timeout);
9 return retVal;
10 return retVal;
10 });
11 });
11 }
12 }
12 }
13 }
13 }
14 }
@@ -1,23 +1,25
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public class ActionChainTask : ActionChainTaskBase, IDeferred {
4 public class ActionChainTask : ActionChainTaskBase, IDeferred {
5 readonly Func<IPromise> m_task;
5 readonly Func<IPromise> m_task;
6
6
7 public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
7 public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
8 m_task = task;
8 m_task = task;
9 }
9 }
10
10
11 public void Resolve() {
11 public void Resolve() {
12 if (m_task != null && LockCancelation()) {
12 if (m_task != null && LockCancelation()) {
13 try {
13 try {
14 Observe(m_task());
14 var p = m_task();
15 p.On(SetResult, HandleErrorInternal, SetCancelled);
16 CancellationRequested(p.Cancel);
15 } catch(Exception err) {
17 } catch(Exception err) {
16 HandleErrorInternal(err);
18 HandleErrorInternal(err);
17 }
19 }
18 }
20 }
19 }
21 }
20
22
21 }
23 }
22 }
24 }
23
25
@@ -1,62 +1,58
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3
3
4 namespace Implab {
4 namespace Implab {
5 public class ActionChainTaskBase : AbstractPromise {
5 public class ActionChainTaskBase : AbstractPromise {
6 readonly Func<Exception, IPromise> m_error;
6 readonly Func<Exception, IPromise> m_error;
7 readonly Func<Exception, IPromise> m_cancel;
7 readonly Func<Exception, IPromise> m_cancel;
8
8
9 int m_cancelationLock;
9 int m_cancelationLock;
10
10
11 protected ActionChainTaskBase( Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) {
11 protected ActionChainTaskBase(Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) {
12 m_error = error;
12 m_error = error;
13 m_cancel = cancel;
13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 }
16 }
15
17
16 public void Reject(Exception error) {
18 public void Reject(Exception error) {
17 if (LockCancelation())
19 if (LockCancelation())
18 HandleErrorInternal(error);
20 HandleErrorInternal(error);
19 }
21 }
20
22
21
23
22
24
23 public override void CancelOperation(Exception reason) {
25 public override void CancelOperation(Exception reason) {
24 if (m_cancel != null && LockCancelation()) {
26 if (LockCancelation()) {
27 if (m_cancel != null) {
25 try {
28 try {
26 Observe(m_cancel(reason));
29 m_cancel(reason).On(SetResult, SetError, SetCancelled);
27 } catch(Exception err) {
30 } catch (Exception err) {
28 HandleErrorInternal(err);
31 HandleErrorInternal(err);
29 }
32 }
33 } else {
34 SetCancelled(reason);
30 }
35 }
31
36 }
32 }
37 }
33
38
34 protected void HandleErrorInternal(Exception error) {
39 protected void HandleErrorInternal(Exception error) {
35 if (m_error != null) {
40 if (m_error != null) {
36 try {
41 try {
37 Observe(m_error(error));
42 var p = m_error(error);
43 p.On(SetResult,SetError,SetCancelled);
44 CancellationRequested(p.Cancel);
38 } catch(Exception err) {
45 } catch (Exception err) {
39 SetError(err);
46 SetError(err);
40 }
47 }
41 } else {
48 } else {
42 SetError(error);
49 SetError(error);
43 }
50 }
44 }
51 }
45
52
46 protected void Observe(IPromise operation) {
47 if (operation == null)
48 throw new NullReferenceException("The task returned null promise");
49
50 // pass operation results to the current promise
51 operation.On(SetResult, SetError, SetCancelled);
52
53 // pass the cancelation request
54 CancellationRequested(operation.Cancel);
55 }
56
57 protected bool LockCancelation() {
53 protected bool LockCancelation() {
58 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
54 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
59 }
55 }
60 }
56 }
61 }
57 }
62
58
@@ -1,23 +1,25
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public class ActionChainTask<T> : ActionChainTaskBase, IDeferred<T> {
4 public class ActionChainTask<T> : ActionChainTaskBase, IDeferred<T> {
5 readonly Func<T, IPromise> m_task;
5 readonly Func<T, IPromise> m_task;
6
6
7 public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
7 public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
8 m_task = task;
8 m_task = task;
9 }
9 }
10
10
11 public void Resolve(T value) {
11 public void Resolve(T value) {
12 if (m_task != null && LockCancelation()) {
12 if (m_task != null && LockCancelation()) {
13 try {
13 try {
14 Observe(m_task(value));
14 var p = m_task(value);
15 p.On(SetResult, HandleErrorInternal, SetCancelled);
16 CancellationRequested(p.Cancel);
15 } catch(Exception err) {
17 } catch(Exception err) {
16 HandleErrorInternal(err);
18 HandleErrorInternal(err);
17 }
19 }
18 }
20 }
19 }
21 }
20
22
21 }
23 }
22 }
24 }
23
25
@@ -1,22 +1,22
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public class ActionTask : ActionTaskBase, IDeferred {
4 public class ActionTask : ActionTaskBase, IDeferred {
5 readonly Action m_task;
5 readonly Action m_task;
6 public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
6 public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
7 m_task = task;
7 m_task = task;
8 }
8 }
9
9
10 public void Resolve() {
10 public void Resolve() {
11 if (m_task != null && LockCancelation()) {
11 if (m_task != null && LockCancelation()) {
12 try {
12 try {
13 m_task();
13 m_task();
14 SetResult();
14 SetResult();
15 } catch(Exception err) {
15 } catch(Exception err) {
16 HandleErrorInternal(err);
16 HandleErrorInternal(err);
17 }
17 }
18 }
18 }
19 }
19 }
20 }
20 }
21 }
21 }
22
22
@@ -1,55 +1,57
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3
3
4 namespace Implab {
4 namespace Implab {
5 public class ActionTaskBase : AbstractPromise {
5 public class ActionTaskBase : AbstractPromise {
6 readonly Action<Exception> m_cancel;
6 readonly Action<Exception> m_cancel;
7 readonly Action<Exception> m_error;
7 readonly Action<Exception> m_error;
8
8
9 int m_cancelationLock;
9 int m_cancelationLock;
10
10
11 protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel) {
11 protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel, bool autoCancellable) {
12 m_error = error;
12 m_error = error;
13 m_cancel = cancel;
13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 }
16 }
15
17
16 public void Reject(Exception error) {
18 public void Reject(Exception error) {
17 Safe.ArgumentNotNull(error, "error");
19 Safe.ArgumentNotNull(error, "error");
18 if (LockCancelation())
20 if (LockCancelation())
19 HandleErrorInternal(error);
21 HandleErrorInternal(error);
20 }
22 }
21
23
22 protected void HandleErrorInternal(Exception error) {
24 protected void HandleErrorInternal(Exception error) {
23 if (m_error != null) {
25 if (m_error != null) {
24 try {
26 try {
25 m_error(error);
27 m_error(error);
26 SetResult();
28 SetResult();
27 } catch(Exception err) {
29 } catch(Exception err) {
28 SetError(err);
30 SetError(err);
29 }
31 }
30 } else {
32 } else {
31 SetError(error);
33 SetError(error);
32 }
34 }
33 }
35 }
34
36
35 public override void CancelOperation(Exception reason) {
37 public override void CancelOperation(Exception reason) {
36 if (LockCancelation()) {
38 if (LockCancelation()) {
37 if (m_cancel != null) {
39 if (m_cancel != null) {
38 try {
40 try {
39 m_cancel(reason);
41 m_cancel(reason);
40 SetResult();
42 SetResult();
41 } catch (Exception err) {
43 } catch (Exception err) {
42 HandleErrorInternal(err);
44 HandleErrorInternal(err);
43 }
45 }
44 } else {
46 } else {
45 SetCancelled(reason);
47 SetCancelled(reason);
46 }
48 }
47 }
49 }
48 }
50 }
49
51
50 protected bool LockCancelation() {
52 protected bool LockCancelation() {
51 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
53 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
52 }
54 }
53 }
55 }
54 }
56 }
55
57
@@ -1,22 +1,22
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public class ActionTask<T> : ActionTaskBase, IDeferred<T> {
4 public class ActionTask<T> : ActionTaskBase, IDeferred<T> {
5 readonly Action<T> m_task;
5 readonly Action<T> m_task;
6 public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
6 public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
7 m_task = task;
7 m_task = task;
8 }
8 }
9
9
10 public void Resolve(T value) {
10 public void Resolve(T value) {
11 if (m_task != null && LockCancelation()) {
11 if (m_task != null && LockCancelation()) {
12 try {
12 try {
13 m_task(value);
13 m_task(value);
14 SetResult();
14 SetResult();
15 } catch(Exception err) {
15 } catch(Exception err) {
16 HandleErrorInternal(err);
16 HandleErrorInternal(err);
17 }
17 }
18 }
18 }
19 }
19 }
20 }
20 }
21 }
21 }
22
22
@@ -1,21 +1,24
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public class FuncChainTask<TResult> : FuncChainTaskBase<TResult>, IDeferred {
4 public class FuncChainTask<TResult> : FuncChainTaskBase<TResult>, IDeferred {
5 readonly Func<IPromise<TResult>> m_task;
5 readonly Func<IPromise<TResult>> m_task;
6
6
7 public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
7 public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable)
8 : base(error, cancel, autoCancellable) {
8 m_task = task;
9 m_task = task;
9 }
10 }
10
11
11 public void Resolve() {
12 public void Resolve() {
12 if (m_task != null && LockCancelation()) {
13 if (m_task != null && LockCancelation()) {
13 try {
14 try {
14 Observe(m_task());
15 var operation = m_task();
16 operation.On(SetResult, HandleErrorInternal, SetCancelled);
17 CancellationRequested(operation.Cancel);
15 } catch (Exception err) {
18 } catch (Exception err) {
16 HandleErrorInternal(err);
19 HandleErrorInternal(err);
17 }
20 }
18 }
21 }
19 }
22 }
20 }
23 }
21 } No newline at end of file
24 }
@@ -1,60 +1,58
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3
3
4 namespace Implab {
4 namespace Implab {
5 public class FuncChainTaskBase<TResult> : AbstractPromise<TResult> {
5 public class FuncChainTaskBase<TResult> : AbstractPromise<TResult> {
6 readonly Func<Exception, IPromise<TResult>> m_error;
6 readonly Func<Exception, IPromise<TResult>> m_error;
7 readonly Func<Exception, IPromise<TResult>> m_cancel;
7 readonly Func<Exception, IPromise<TResult>> m_cancel;
8
8
9 int m_cancelationLock;
9 int m_cancelationLock;
10
10
11 protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) {
11 protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) {
12 m_error = error;
12 m_error = error;
13 m_cancel = cancel;
13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 }
16 }
15
17
16 public void Reject(Exception error) {
18 public void Reject(Exception error) {
17 if (LockCancelation())
19 if (LockCancelation())
18 HandleErrorInternal(error);
20 HandleErrorInternal(error);
19 }
21 }
20
22
21 public override void CancelOperation(Exception reason) {
23 public override void CancelOperation(Exception reason) {
22 if (m_cancel != null && LockCancelation()) {
24 if (LockCancelation()) {
25 if (m_cancel != null) {
23 try {
26 try {
24 Observe(m_cancel(reason));
27 m_cancel(reason).On(SetResult, HandleErrorInternal, SetCancelled);
25 } catch(Exception err) {
28 } catch (Exception err) {
26 HandleErrorInternal(err);
29 HandleErrorInternal(err);
27 }
30 }
31 } else {
32 SetCancelled(reason);
33 }
28 }
34 }
29
35
30 }
36 }
31
37
32 protected void HandleErrorInternal(Exception error) {
38 protected void HandleErrorInternal(Exception error) {
33 if (m_error != null) {
39 if (m_error != null) {
34 try {
40 try {
35 Observe(m_error(error));
41 var operation = m_error(error);
42
43 operation.On(SetResult, SetError, SetCancelled);
44 CancellationRequested(operation.Cancel);
36 } catch(Exception err) {
45 } catch(Exception err) {
37 SetError(err);
46 SetError(err);
38 }
47 }
39 } else {
48 } else {
40 SetError(error);
49 SetError(error);
41 }
50 }
42 }
51 }
43
52
44 protected void Observe(IPromise<TResult> operation) {
45 if (operation == null)
46 throw new NullReferenceException("The task returned null promise");
47
48 // pass operation results to the current promise
49 operation.On(SetResult, SetError, SetCancelled);
50
51 // pass the cancelation request
52 CancellationRequested(operation.Cancel);
53 }
54
55 protected bool LockCancelation() {
53 protected bool LockCancelation() {
56 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
54 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
57 }
55 }
58 }
56 }
59 }
57 }
60
58
@@ -1,21 +1,23
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public class FuncChainTask<TArg,TResult> : FuncChainTaskBase<TResult>, IDeferred<TArg> {
4 public class FuncChainTask<TArg,TResult> : FuncChainTaskBase<TResult>, IDeferred<TArg> {
5 readonly Func<TArg, IPromise<TResult>> m_task;
5 readonly Func<TArg, IPromise<TResult>> m_task;
6
6
7 public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
7 public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) : base(error, cancel, autoCancellable){
8 m_task = task;
8 m_task = task;
9 }
9 }
10
10
11 public void Resolve(TArg value) {
11 public void Resolve(TArg value) {
12 if (m_task != null && LockCancelation()) {
12 if (m_task != null && LockCancelation()) {
13 try {
13 try {
14 Observe(m_task(value));
14 var operation = m_task(value);
15 operation.On(SetResult, HandleErrorInternal, SetCancelled);
16 CancellationRequested(operation.Cancel);
15 } catch (Exception err) {
17 } catch (Exception err) {
16 HandleErrorInternal(err);
18 HandleErrorInternal(err);
17 }
19 }
18 }
20 }
19 }
21 }
20 }
22 }
21 } No newline at end of file
23 }
@@ -1,23 +1,23
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3
3
4 namespace Implab {
4 namespace Implab {
5 public class FuncTask<T> : FuncTaskBase<T>, IDeferred {
5 public class FuncTask<T> : FuncTaskBase<T>, IDeferred {
6 readonly Func<T> m_task;
6 readonly Func<T> m_task;
7
7
8 public FuncTask(Func<T> task, Func<Exception, T> error, Func<Exception, T> cancel) : base(error,cancel) {
8 public FuncTask(Func<T> task, Func<Exception, T> error, Func<Exception, T> cancel, bool autoCancellable) : base(error, cancel, autoCancellable) {
9 m_task = task;
9 m_task = task;
10 }
10 }
11
11
12 public void Resolve() {
12 public void Resolve() {
13 if (m_task != null && LockCancelation()) {
13 if (m_task != null && LockCancelation()) {
14 try {
14 try {
15 SetResult(m_task());
15 SetResult(m_task());
16 } catch(Exception err) {
16 } catch(Exception err) {
17 HandleErrorInternal(err);
17 HandleErrorInternal(err);
18 }
18 }
19 }
19 }
20 }
20 }
21 }
21 }
22 }
22 }
23
23
@@ -1,53 +1,55
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3
3
4 namespace Implab {
4 namespace Implab {
5 public class FuncTaskBase<TResult> : AbstractPromise<TResult> {
5 public class FuncTaskBase<TResult> : AbstractPromise<TResult> {
6 readonly Func<Exception, TResult> m_cancel;
6 readonly Func<Exception, TResult> m_cancel;
7 readonly Func<Exception, TResult> m_error;
7 readonly Func<Exception, TResult> m_error;
8
8
9 int m_cancelationLock;
9 int m_cancelationLock;
10
10
11 protected FuncTaskBase( Func<Exception, TResult> error, Func<Exception, TResult> cancel) {
11 protected FuncTaskBase( Func<Exception, TResult> error, Func<Exception, TResult> cancel, bool autoCancellable) {
12 m_error = error;
12 m_error = error;
13 m_cancel = cancel;
13 m_cancel = cancel;
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
14 }
16 }
15
17
16 public void Reject(Exception error) {
18 public void Reject(Exception error) {
17 Safe.ArgumentNotNull(error, "error");
19 Safe.ArgumentNotNull(error, "error");
18 if (LockCancelation())
20 if (LockCancelation())
19 HandleErrorInternal(error);
21 HandleErrorInternal(error);
20 }
22 }
21
23
22 protected void HandleErrorInternal(Exception error) {
24 protected void HandleErrorInternal(Exception error) {
23 if (m_error != null) {
25 if (m_error != null) {
24 try {
26 try {
25 SetResult(m_error(error));
27 SetResult(m_error(error));
26 } catch(Exception err) {
28 } catch(Exception err) {
27 SetError(err);
29 SetError(err);
28 }
30 }
29 } else {
31 } else {
30 SetError(error);
32 SetError(error);
31 }
33 }
32 }
34 }
33
35
34 public override void CancelOperation(Exception reason) {
36 public override void CancelOperation(Exception reason) {
35 if (LockCancelation()) {
37 if (LockCancelation()) {
36 if (m_cancel != null) {
38 if (m_cancel != null) {
37 try {
39 try {
38 SetResult(m_cancel(reason));
40 SetResult(m_cancel(reason));
39 } catch (Exception err) {
41 } catch (Exception err) {
40 HandleErrorInternal(err);
42 HandleErrorInternal(err);
41 }
43 }
42 } else {
44 } else {
43 SetCancelled(reason);
45 SetCancelled(reason);
44 }
46 }
45 }
47 }
46 }
48 }
47
49
48 protected bool LockCancelation() {
50 protected bool LockCancelation() {
49 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
51 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
50 }
52 }
51 }
53 }
52 }
54 }
53
55
@@ -1,22 +1,22
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public class FuncTask<TArg, TResult> : FuncTaskBase<TResult>, IDeferred<TArg> {
4 public class FuncTask<TArg, TResult> : FuncTaskBase<TResult>, IDeferred<TArg> {
5 readonly Func<TArg, TResult> m_task;
5 readonly Func<TArg, TResult> m_task;
6
6
7 public FuncTask(Func<TArg, TResult> task, Func<Exception, TResult> error,Func<Exception, TResult> cancel) : base(error,cancel) {
7 public FuncTask(Func<TArg, TResult> task, Func<Exception, TResult> error,Func<Exception, TResult> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
8 m_task = task;
8 m_task = task;
9 }
9 }
10
10
11 public void Resolve(TArg value) {
11 public void Resolve(TArg value) {
12 if (m_task != null && LockCancelation()) {
12 if (m_task != null && LockCancelation()) {
13 try {
13 try {
14 SetResult(m_task(value));
14 SetResult(m_task(value));
15 } catch (Exception err) {
15 } catch (Exception err) {
16 HandleErrorInternal(err);
16 HandleErrorInternal(err);
17 }
17 }
18 }
18 }
19 }
19 }
20 }
20 }
21 }
21 }
22
22
@@ -1,82 +1,152
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3 using System.Diagnostics;
3 using System.Diagnostics;
4 using Implab.Diagnostics;
4 using Implab.Diagnostics;
5
5
6 namespace Implab.Parallels {
6 namespace Implab.Parallels {
7 public class WorkerPool : DispatchPool<Action> {
7 public class WorkerPool : DispatchPool<Action> {
8
8
9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
10 int m_queueLength;
10 int m_queueLength;
11 readonly int m_threshold = 1;
11 readonly int m_threshold = 1;
12
12
13 public WorkerPool(int minThreads, int maxThreads, int threshold)
13 public WorkerPool(int minThreads, int maxThreads, int threshold)
14 : base(minThreads, maxThreads) {
14 : base(minThreads, maxThreads) {
15 m_threshold = threshold;
15 m_threshold = threshold;
16 InitPool();
16 InitPool();
17 }
17 }
18
18
19 public WorkerPool(int minThreads, int maxThreads) :
19 public WorkerPool(int minThreads, int maxThreads) :
20 base(minThreads, maxThreads) {
20 base(minThreads, maxThreads) {
21 InitPool();
21 InitPool();
22 }
22 }
23
23
24 public WorkerPool(int threads)
24 public WorkerPool(int threads)
25 : base(threads) {
25 : base(threads) {
26 InitPool();
26 InitPool();
27 }
27 }
28
28
29 public WorkerPool() {
29 public WorkerPool() {
30 InitPool();
30 InitPool();
31 }
31 }
32
32
33 public Promise<T> Invoke<T>(Func<T> task) {
33 public IPromise<T> Invoke<T>(Func<T> task) {
34 if (task == null)
35 throw new ArgumentNullException("task");
36 if (IsDisposed)
37 throw new ObjectDisposedException(ToString());
38
39 var promise = new FuncTask<T>(task, null, null, true);
40
41 var lop = TraceContext.Instance.CurrentOperation;
42
43 EnqueueTask(delegate {
44 TraceContext.Instance.EnterLogicalOperation(lop, false);
45
46 promise.Resolve();
47
48 TraceContext.Instance.Leave();
49 });
50
51 return promise;
52 }
53
54 public IPromise Invoke(Action task) {
55 if (task == null)
56 throw new ArgumentNullException("task");
57 if (IsDisposed)
58 throw new ObjectDisposedException(ToString());
59
60 var promise = new ActionTask(task, null, null, true);
61
62 var lop = TraceContext.Instance.CurrentOperation;
63
64 EnqueueTask(delegate {
65 TraceContext.Instance.EnterLogicalOperation(lop, false);
66
67 promise.Resolve();
68
69 TraceContext.Instance.Leave();
70 });
71
72 return promise;
73 }
74
75 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
34 if (task == null)
76 if (task == null)
35 throw new ArgumentNullException("task");
77 throw new ArgumentNullException("task");
36 if (IsDisposed)
78 if (IsDisposed)
37 throw new ObjectDisposedException(ToString());
79 throw new ObjectDisposedException(ToString());
38
80
39 var promise = new Promise<T>();
81 var promise = new Promise<T>();
40
82
41 var lop = TraceContext.Instance.CurrentOperation;
83 var lop = TraceContext.Instance.CurrentOperation;
42
84
43 EnqueueTask(delegate {
85 EnqueueTask(delegate {
44 TraceContext.Instance.EnterLogicalOperation(lop, false);
86 TraceContext.Instance.EnterLogicalOperation(lop, false);
45 try {
87 try {
46 promise.Resolve(task());
88 if (!promise.CancelOperationIfRequested())
89 promise.Resolve(task(promise));
90 } catch (Exception e) {
91 promise.Reject(e);
92 } finally {
93 TraceContext.Instance.Leave();
94 }
95 });
96
97 return promise;
98 }
99
100 public IPromise Invoke<T>(Action<ICancellationToken> task) {
101 if (task == null)
102 throw new ArgumentNullException("task");
103 if (IsDisposed)
104 throw new ObjectDisposedException(ToString());
105
106 var promise = new Promise();
107
108 var lop = TraceContext.Instance.CurrentOperation;
109
110 EnqueueTask(delegate {
111 TraceContext.Instance.EnterLogicalOperation(lop, false);
112 try {
113 if (!promise.CancelOperationIfRequested()) {
114 task(promise);
115 promise.Resolve();
116 }
47 } catch (Exception e) {
117 } catch (Exception e) {
48 promise.Reject(e);
118 promise.Reject(e);
49 } finally {
119 } finally {
50 TraceContext.Instance.Leave();
120 TraceContext.Instance.Leave();
51 }
121 }
52 });
122 });
53
123
54 return promise;
124 return promise;
55 }
125 }
56
126
57 protected void EnqueueTask(Action unit) {
127 protected void EnqueueTask(Action unit) {
58 Debug.Assert(unit != null);
128 Debug.Assert(unit != null);
59 var len = Interlocked.Increment(ref m_queueLength);
129 var len = Interlocked.Increment(ref m_queueLength);
60 m_queue.Enqueue(unit);
130 m_queue.Enqueue(unit);
61
131
62 if (len > m_threshold * PoolSize) {
132 if (len > m_threshold * PoolSize) {
63 StartWorker();
133 StartWorker();
64 }
134 }
65
135
66 SignalThread();
136 SignalThread();
67 }
137 }
68
138
69 protected override bool TryDequeue(out Action unit) {
139 protected override bool TryDequeue(out Action unit) {
70 if (m_queue.TryDequeue(out unit)) {
140 if (m_queue.TryDequeue(out unit)) {
71 Interlocked.Decrement(ref m_queueLength);
141 Interlocked.Decrement(ref m_queueLength);
72 return true;
142 return true;
73 }
143 }
74 return false;
144 return false;
75 }
145 }
76
146
77 protected override void InvokeUnit(Action unit) {
147 protected override void InvokeUnit(Action unit) {
78 unit();
148 unit();
79 }
149 }
80
150
81 }
151 }
82 }
152 }
@@ -1,302 +1,302
1 using System.Threading;
1 using System.Threading;
2 using System;
2 using System;
3 using Implab.Diagnostics;
3 using Implab.Diagnostics;
4 using System.Collections.Generic;
4 using System.Collections.Generic;
5
5
6
6
7 #if NET_4_5
7 #if NET_4_5
8 using System.Threading.Tasks;
8 using System.Threading.Tasks;
9 #endif
9 #endif
10
10
11 namespace Implab {
11 namespace Implab {
12 public static class PromiseExtensions {
12 public static class PromiseExtensions {
13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
14 Safe.ArgumentNotNull(that, "that");
14 Safe.ArgumentNotNull(that, "that");
15 var context = SynchronizationContext.Current;
15 var context = SynchronizationContext.Current;
16 if (context == null)
16 if (context == null)
17 return that;
17 return that;
18
18
19 var p = new SyncContextPromise<T>(context);
19 var p = new SyncContextPromise<T>(context);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
21
21
22 that.On(
22 that.On(
23 p.Resolve,
23 p.Resolve,
24 p.Reject,
24 p.Reject,
25 p.Cancel
25 p.Cancel
26 );
26 );
27 return p;
27 return p;
28 }
28 }
29
29
30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
31 Safe.ArgumentNotNull(that, "that");
31 Safe.ArgumentNotNull(that, "that");
32 Safe.ArgumentNotNull(context, "context");
32 Safe.ArgumentNotNull(context, "context");
33
33
34 var p = new SyncContextPromise<T>(context);
34 var p = new SyncContextPromise<T>(context);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
36
36
37
37
38 that.On(
38 that.On(
39 p.Resolve,
39 p.Resolve,
40 p.Reject,
40 p.Reject,
41 p.Cancel
41 p.Cancel
42 );
42 );
43 return p;
43 return p;
44 }
44 }
45
45
46 /// <summary>
46 /// <summary>
47 /// Ensures the dispatched.
47 /// Ensures the dispatched.
48 /// </summary>
48 /// </summary>
49 /// <returns>The dispatched.</returns>
49 /// <returns>The dispatched.</returns>
50 /// <param name="that">That.</param>
50 /// <param name="that">That.</param>
51 /// <param name="head">Head.</param>
51 /// <param name="head">Head.</param>
52 /// <param name="cleanup">Cleanup.</param>
52 /// <param name="cleanup">Cleanup.</param>
53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
56 Safe.ArgumentNotNull(that, "that");
56 Safe.ArgumentNotNull(that, "that");
57 Safe.ArgumentNotNull(head, "head");
57 Safe.ArgumentNotNull(head, "head");
58
58
59 that.On(() => head.On(cleanup), PromiseEventType.Cancelled);
59 that.On(() => head.On(cleanup), PromiseEventType.Cancelled);
60
60
61 return that;
61 return that;
62 }
62 }
63
63
64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
65 Safe.ArgumentNotNull(that, "that");
65 Safe.ArgumentNotNull(that, "that");
66 Safe.ArgumentNotNull(callback, "callback");
66 Safe.ArgumentNotNull(callback, "callback");
67 var op = TraceContext.Instance.CurrentOperation;
67 var op = TraceContext.Instance.CurrentOperation;
68 return ar => {
68 return ar => {
69 TraceContext.Instance.EnterLogicalOperation(op,false);
69 TraceContext.Instance.EnterLogicalOperation(op,false);
70 try {
70 try {
71 that.Resolve(callback(ar));
71 that.Resolve(callback(ar));
72 } catch (Exception err) {
72 } catch (Exception err) {
73 that.Reject(err);
73 that.Reject(err);
74 } finally {
74 } finally {
75 TraceContext.Instance.Leave();
75 TraceContext.Instance.Leave();
76 }
76 }
77 };
77 };
78 }
78 }
79
79
80 static void CancelCallback(object cookie) {
80 static void CancelCallback(object cookie) {
81 ((ICancellable)cookie).Cancel();
81 ((ICancellable)cookie).Cancel();
82 }
82 }
83
83
84 /// <summary>
84 /// <summary>
85 /// Cancells promise after the specified timeout is elapsed.
85 /// Cancells promise after the specified timeout is elapsed.
86 /// </summary>
86 /// </summary>
87 /// <param name="that">The promise to cancel on timeout.</param>
87 /// <param name="that">The promise to cancel on timeout.</param>
88 /// <param name="milliseconds">The timeout in milliseconds.</param>
88 /// <param name="milliseconds">The timeout in milliseconds.</param>
89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
91 Safe.ArgumentNotNull(that, "that");
91 Safe.ArgumentNotNull(that, "that");
92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
93 that.On(timer.Dispose, PromiseEventType.All);
93 that.On(timer.Dispose, PromiseEventType.All);
94 return that;
94 return that;
95 }
95 }
96
96
97 public static IPromise Bundle(this ICollection<IPromise> that) {
97 public static IPromise Bundle(this ICollection<IPromise> that) {
98 Safe.ArgumentNotNull(that, "that");
98 Safe.ArgumentNotNull(that, "that");
99
99
100 int count = that.Count;
100 int count = that.Count;
101 int errors = 0;
101 int errors = 0;
102 var medium = new Promise();
102 var medium = new Promise();
103
103
104 if (count == 0) {
104 if (count == 0) {
105 medium.Resolve();
105 medium.Resolve();
106 return medium;
106 return medium;
107 }
107 }
108
108
109 medium.On(() => {
109 medium.On(() => {
110 foreach(var p2 in that)
110 foreach(var p2 in that)
111 p2.Cancel();
111 p2.Cancel();
112 }, PromiseEventType.ErrorOrCancel);
112 }, PromiseEventType.ErrorOrCancel);
113
113
114 foreach (var p in that)
114 foreach (var p in that)
115 p.On(
115 p.On(
116 () => {
116 () => {
117 if (Interlocked.Decrement(ref count) == 0)
117 if (Interlocked.Decrement(ref count) == 0)
118 medium.Resolve();
118 medium.Resolve();
119 },
119 },
120 error => {
120 error => {
121 if (Interlocked.Increment(ref errors) == 1)
121 if (Interlocked.Increment(ref errors) == 1)
122 medium.Reject(
122 medium.Reject(
123 new Exception("The dependency promise is failed", error)
123 new Exception("The dependency promise is failed", error)
124 );
124 );
125 },
125 },
126 reason => {
126 reason => {
127 if (Interlocked.Increment(ref errors) == 1)
127 if (Interlocked.Increment(ref errors) == 1)
128 medium.Cancel(
128 medium.Cancel(
129 new Exception("The dependency promise is cancelled")
129 new Exception("The dependency promise is cancelled")
130 );
130 );
131 }
131 }
132 );
132 );
133
133
134 return medium;
134 return medium;
135 }
135 }
136
136
137 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
137 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
138 Safe.ArgumentNotNull(that, "that");
138 Safe.ArgumentNotNull(that, "that");
139
139
140 int count = that.Count;
140 int count = that.Count;
141 int errors = 0;
141 int errors = 0;
142 var medium = new Promise<T[]>();
142 var medium = new Promise<T[]>();
143 var results = new T[that.Count];
143 var results = new T[that.Count];
144
144
145 medium.On(() => {
145 medium.On(() => {
146 foreach(var p2 in that)
146 foreach(var p2 in that)
147 p2.Cancel();
147 p2.Cancel();
148 }, PromiseEventType.ErrorOrCancel);
148 }, PromiseEventType.ErrorOrCancel);
149
149
150 int i = 0;
150 int i = 0;
151 foreach (var p in that) {
151 foreach (var p in that) {
152 var idx = i;
152 var idx = i;
153 p.On(
153 p.On(
154 x => {
154 x => {
155 results[idx] = x;
155 results[idx] = x;
156 if (Interlocked.Decrement(ref count) == 0)
156 if (Interlocked.Decrement(ref count) == 0)
157 medium.Resolve(results);
157 medium.Resolve(results);
158 },
158 },
159 error => {
159 error => {
160 if (Interlocked.Increment(ref errors) == 1)
160 if (Interlocked.Increment(ref errors) == 1)
161 medium.Reject(
161 medium.Reject(
162 new Exception("The dependency promise is failed", error)
162 new Exception("The dependency promise is failed", error)
163 );
163 );
164 },
164 },
165 reason => {
165 reason => {
166 if (Interlocked.Increment(ref errors) == 1)
166 if (Interlocked.Increment(ref errors) == 1)
167 medium.Cancel(
167 medium.Cancel(
168 new Exception("The dependency promise is cancelled", reason)
168 new Exception("The dependency promise is cancelled", reason)
169 );
169 );
170 }
170 }
171 );
171 );
172 i++;
172 i++;
173 }
173 }
174
174
175 return medium;
175 return medium;
176 }
176 }
177
177
178 public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
178 public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
179 Safe.ArgumentNotNull(that, "that");
179 Safe.ArgumentNotNull(that, "that");
180
180
181 var d = new ActionTask(success, error, cancel);
181 var d = new ActionTask(success, error, cancel, false);
182 that.On(d.Resolve, d.Reject, d.CancelOperation);
182 that.On(d.Resolve, d.Reject, d.CancelOperation);
183 if (success != null)
183 if (success != null)
184 d.CancellationRequested(that.Cancel);
184 d.CancellationRequested(that.Cancel);
185 return d;
185 return d;
186 }
186 }
187
187
188 public static IPromise Then(this IPromise that, Action success, Action<Exception> error) {
188 public static IPromise Then(this IPromise that, Action success, Action<Exception> error) {
189 return Then(that, success, error, null);
189 return Then(that, success, error, null);
190 }
190 }
191
191
192 public static IPromise Then(this IPromise that, Action success) {
192 public static IPromise Then(this IPromise that, Action success) {
193 return Then(that, success, null, null);
193 return Then(that, success, null, null);
194 }
194 }
195
195
196 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
196 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
197 Safe.ArgumentNotNull(that, "that");
197 Safe.ArgumentNotNull(that, "that");
198
198
199 var d = new FuncTask<T>(success, error, cancel);
199 var d = new FuncTask<T>(success, error, cancel, false);
200 that.On(d.Resolve, d.Reject, d.CancelOperation);
200 that.On(d.Resolve, d.Reject, d.CancelOperation);
201 if (success != null)
201 if (success != null)
202 d.CancellationRequested(that.Cancel);
202 d.CancellationRequested(that.Cancel);
203 return d;
203 return d;
204 }
204 }
205
205
206 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error) {
206 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error) {
207 return Then(that, success, error, null);
207 return Then(that, success, error, null);
208 }
208 }
209
209
210 public static IPromise<T> Then<T>(this IPromise that, Func<T> success) {
210 public static IPromise<T> Then<T>(this IPromise that, Func<T> success) {
211 return Then(that, success, null, null);
211 return Then(that, success, null, null);
212 }
212 }
213
213
214 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
214 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
215 Safe.ArgumentNotNull(that, "that");
215 Safe.ArgumentNotNull(that, "that");
216 var d = new FuncTask<T,T2>(success, error, cancel);
216 var d = new FuncTask<T,T2>(success, error, cancel, false);
217 that.On(d.Resolve, d.Reject, d.CancelOperation);
217 that.On(d.Resolve, d.Reject, d.CancelOperation);
218 if (success != null)
218 if (success != null)
219 d.CancellationRequested(that.Cancel);
219 d.CancellationRequested(that.Cancel);
220 return d;
220 return d;
221 }
221 }
222
222
223 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error) {
223 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error) {
224 return Then(that, success, error, null);
224 return Then(that, success, error, null);
225 }
225 }
226
226
227 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success) {
227 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success) {
228 return Then(that, success, null, null);
228 return Then(that, success, null, null);
229 }
229 }
230
230
231 #region chain traits
231 #region chain traits
232 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error, Func<Exception,IPromise> cancel) {
232 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error, Func<Exception,IPromise> cancel) {
233 Safe.ArgumentNotNull(that, "that");
233 Safe.ArgumentNotNull(that, "that");
234
234
235 var d = new ActionChainTask(success, error, cancel);
235 var d = new ActionChainTask(success, error, cancel, false);
236 that.On(d.Resolve, d.Reject, d.CancelOperation);
236 that.On(d.Resolve, d.Reject, d.CancelOperation);
237 if (success != null)
237 if (success != null)
238 d.CancellationRequested(that.Cancel);
238 d.CancellationRequested(that.Cancel);
239 return d;
239 return d;
240 }
240 }
241
241
242 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error) {
242 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error) {
243 return Chain(that, success, error, null);
243 return Chain(that, success, error, null);
244 }
244 }
245
245
246 public static IPromise Chain(this IPromise that, Func<IPromise> success) {
246 public static IPromise Chain(this IPromise that, Func<IPromise> success) {
247 return Chain(that, success, null, null);
247 return Chain(that, success, null, null);
248 }
248 }
249
249
250 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
250 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
251 Safe.ArgumentNotNull(that, "that");
251 Safe.ArgumentNotNull(that, "that");
252
252
253 var d = new FuncChainTask<T>(success, error, cancel);
253 var d = new FuncChainTask<T>(success, error, cancel, false);
254 that.On(d.Resolve, d.Reject, d.CancelOperation);
254 that.On(d.Resolve, d.Reject, d.CancelOperation);
255 if (success != null)
255 if (success != null)
256 d.CancellationRequested(that.Cancel);
256 d.CancellationRequested(that.Cancel);
257 return d;
257 return d;
258 }
258 }
259
259
260 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error) {
260 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error) {
261 return Chain(that, success, error, null);
261 return Chain(that, success, error, null);
262 }
262 }
263
263
264 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success) {
264 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success) {
265 return Chain(that, success, null, null);
265 return Chain(that, success, null, null);
266 }
266 }
267
267
268 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
268 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
269 Safe.ArgumentNotNull(that, "that");
269 Safe.ArgumentNotNull(that, "that");
270 var d = new FuncChainTask<T,T2>(success, error, cancel);
270 var d = new FuncChainTask<T,T2>(success, error, cancel, false);
271 that.On(d.Resolve, d.Reject, d.CancelOperation);
271 that.On(d.Resolve, d.Reject, d.CancelOperation);
272 if (success != null)
272 if (success != null)
273 d.CancellationRequested(that.Cancel);
273 d.CancellationRequested(that.Cancel);
274 return d;
274 return d;
275 }
275 }
276
276
277 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error) {
277 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error) {
278 return Chain(that, success, error, null);
278 return Chain(that, success, error, null);
279 }
279 }
280
280
281 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success) {
281 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success) {
282 return Chain(that, success, null, null);
282 return Chain(that, success, null, null);
283 }
283 }
284
284
285 #endregion
285 #endregion
286
286
287
287
288 #if NET_4_5
288 #if NET_4_5
289
289
290 public static Task<T> GetTask<T>(this IPromise<T> that) {
290 public static Task<T> GetTask<T>(this IPromise<T> that) {
291 Safe.ArgumentNotNull(that, "that");
291 Safe.ArgumentNotNull(that, "that");
292 var tcs = new TaskCompletionSource<T>();
292 var tcs = new TaskCompletionSource<T>();
293
293
294 that.On(tcs.SetResult, tcs.SetException, r => tcs.SetCanceled());
294 that.On(tcs.SetResult, tcs.SetException, r => tcs.SetCanceled());
295
295
296 return tcs.Task;
296 return tcs.Task;
297 }
297 }
298
298
299 #endif
299 #endif
300 }
300 }
301 }
301 }
302
302
General Comments 0
You need to be logged in to leave comments. Login now