##// END OF EJS Templates
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
cin -
r138:f75cfa58e3d4 v2
parent child
Show More
@@ -1,38 +1,38
1 using System.Windows.Forms;
1 using System.Windows.Forms;
2 using System;
2 using System;
3
3
4
4
5 namespace Implab.Fx {
5 namespace Implab.Fx {
6 public class ControlBoundPromise<T> : Promise<T> {
6 public class ControlBoundPromise<T> : Promise<T> {
7 readonly Control m_target;
7 readonly Control m_target;
8
8
9 public ControlBoundPromise(Control target) {
9 public ControlBoundPromise(Control target) {
10 Safe.ArgumentNotNull(target, "target");
10 Safe.ArgumentNotNull(target, "target");
11
11
12 m_target = target;
12 m_target = target;
13 }
13 }
14
14
15 protected override void SignalSuccess(IDeferred<T> handler) {
15 protected override void SignalSuccess(IDeferred<T> handler) {
16 if (m_target.InvokeRequired)
16 if (m_target.InvokeRequired)
17 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalSuccess), handler);
17 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalSuccess), handler);
18 else
18 else
19 base.SignalSuccess(handler);
19 base.SignalSuccess(handler);
20 }
20 }
21
21
22 protected override void SignalCancelled(IDeferred<T> handler) {
22 protected override void SignalCancelled(IDeferred<T> handler, Exception reason) {
23 if (m_target.InvokeRequired)
23 if (m_target.InvokeRequired)
24 m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalCancelled), handler);
24 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalCancelled), handler, reason);
25 else
25 else
26 base.SignalCancelled(handler);
26 base.SignalCancelled(handler, reason);
27 }
27 }
28
28
29 protected override void SignalError(IDeferred<T> handler, Exception error) {
29 protected override void SignalError(IDeferred<T> handler, Exception error) {
30 if (m_target.InvokeRequired)
30 if (m_target.InvokeRequired)
31 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalError), handler, error);
31 m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalError), handler, error);
32 else
32 else
33 base.SignalError(handler, error);
33 base.SignalError(handler, error);
34 }
34 }
35
35
36 }
36 }
37 }
37 }
38
38
@@ -1,852 +1,852
1 using System;
1 using System;
2 using System.Reflection;
2 using System.Reflection;
3 using System.Threading;
3 using System.Threading;
4 using Implab.Parallels;
4 using Implab.Parallels;
5
5
6 #if MONO
6 #if MONO
7
7
8 using NUnit.Framework;
8 using NUnit.Framework;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
11
11
12 #else
12 #else
13
13
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15
15
16 #endif
16 #endif
17
17
18 namespace Implab.Test {
18 namespace Implab.Test {
19 [TestClass]
19 [TestClass]
20 public class AsyncTests {
20 public class AsyncTests {
21 [TestMethod]
21 [TestMethod]
22 public void ResolveTest() {
22 public void ResolveTest() {
23 int res = -1;
23 int res = -1;
24 var p = new Promise<int>();
24 var p = new Promise<int>();
25 p.Then(x => res = x);
25 p.Then(x => res = x);
26 p.Resolve(100);
26 p.Resolve(100);
27
27
28 Assert.AreEqual(100, res);
28 Assert.AreEqual(100, res);
29 }
29 }
30
30
31 [TestMethod]
31 [TestMethod]
32 public void RejectTest() {
32 public void RejectTest() {
33 int res = -1;
33 int res = -1;
34 Exception err = null;
34 Exception err = null;
35
35
36 var p = new Promise<int>();
36 var p = new Promise<int>();
37 p.Then(
37 p.Then(
38 x => res = x,
38 x => res = x,
39 e => {
39 e => {
40 err = e;
40 err = e;
41 return -2;
41 return -2;
42 }
42 }
43 );
43 );
44 p.Reject(new ApplicationException("error"));
44 p.Reject(new ApplicationException("error"));
45
45
46 Assert.AreEqual(res, -1);
46 Assert.AreEqual(res, -1);
47 Assert.AreEqual(err.Message, "error");
47 Assert.AreEqual(err.Message, "error");
48
48
49 }
49 }
50
50
51 [TestMethod]
51 [TestMethod]
52 public void CancelExceptionTest() {
52 public void CancelExceptionTest() {
53 var p = new Promise<bool>();
53 var p = new Promise<bool>();
54 p.Cancel();
54 p.Cancel();
55
55
56 var p2 = p.Cancelled(() => {
56 var p2 = p.Then(x => x, null, reason => {
57 throw new ApplicationException("CANCELLED");
57 throw new ApplicationException("CANCELLED");
58 });
58 });
59
59
60 try {
60 try {
61 p2.Join();
61 p2.Join();
62 Assert.Fail();
62 Assert.Fail();
63 } catch (ApplicationException err) {
63 } catch (ApplicationException err) {
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 }
65 }
66
66
67 }
67 }
68
68
69 [TestMethod]
69 [TestMethod]
70 public void ContinueOnCancelTest() {
70 public void ContinueOnCancelTest() {
71 var p = new Promise<bool>();
71 var p = new Promise<bool>();
72 p.Cancel();
72 p.Cancel();
73
73
74 var p2 = p
74 var p2 = p
75 .Cancelled<bool>(() => {
75 .Then<bool>(x => x, null, reason => {
76 throw new ApplicationException("CANCELLED");
76 throw new ApplicationException("CANCELLED");
77 })
77 })
78 .Error(e => true);
78 .Then(x => x, e => true);
79
79
80 Assert.AreEqual(true, p2.Join());
80 Assert.AreEqual(true, p2.Join());
81 }
81 }
82
82
83 [TestMethod]
83 [TestMethod]
84 public void JoinSuccessTest() {
84 public void JoinSuccessTest() {
85 var p = new Promise<int>();
85 var p = new Promise<int>();
86 p.Resolve(100);
86 p.Resolve(100);
87 Assert.AreEqual(p.Join(), 100);
87 Assert.AreEqual(p.Join(), 100);
88 }
88 }
89
89
90 [TestMethod]
90 [TestMethod]
91 public void JoinFailTest() {
91 public void JoinFailTest() {
92 var p = new Promise<int>();
92 var p = new Promise<int>();
93 p.Reject(new ApplicationException("failed"));
93 p.Reject(new ApplicationException("failed"));
94
94
95 try {
95 try {
96 p.Join();
96 p.Join();
97 throw new ApplicationException("WRONG!");
97 throw new ApplicationException("WRONG!");
98 } catch (TargetInvocationException err) {
98 } catch (TargetInvocationException err) {
99 Assert.AreEqual(err.InnerException.Message, "failed");
99 Assert.AreEqual(err.InnerException.Message, "failed");
100 } catch {
100 } catch {
101 Assert.Fail("Got wrong excaption");
101 Assert.Fail("Got wrong excaption");
102 }
102 }
103 }
103 }
104
104
105 [TestMethod]
105 [TestMethod]
106 public void MapTest() {
106 public void MapTest() {
107 var p = new Promise<int>();
107 var p = new Promise<int>();
108
108
109 var p2 = p.Then(x => x.ToString());
109 var p2 = p.Then(x => x.ToString());
110 p.Resolve(100);
110 p.Resolve(100);
111
111
112 Assert.AreEqual(p2.Join(), "100");
112 Assert.AreEqual(p2.Join(), "100");
113 }
113 }
114
114
115 [TestMethod]
115 [TestMethod]
116 public void FixErrorTest() {
116 public void FixErrorTest() {
117 var p = new Promise<int>();
117 var p = new Promise<int>();
118
118
119 var p2 = p.Error(e => 101);
119 var p2 = p.Then(x => x, e => 101);
120
120
121 p.Reject(new Exception());
121 p.Reject(new Exception());
122
122
123 Assert.AreEqual(p2.Join(), 101);
123 Assert.AreEqual(p2.Join(), 101);
124 }
124 }
125
125
126 [TestMethod]
126 [TestMethod]
127 public void ChainTest() {
127 public void ChainTest() {
128 var p1 = new Promise<int>();
128 var p1 = new Promise<int>();
129
129
130 var p3 = p1.Chain(x => {
130 var p3 = p1.Chain(x => {
131 var p2 = new Promise<string>();
131 var p2 = new Promise<string>();
132 p2.Resolve(x.ToString());
132 p2.Resolve(x.ToString());
133 return p2;
133 return p2;
134 });
134 });
135
135
136 p1.Resolve(100);
136 p1.Resolve(100);
137
137
138 Assert.AreEqual(p3.Join(), "100");
138 Assert.AreEqual(p3.Join(), "100");
139 }
139 }
140
140
141 [TestMethod]
141 [TestMethod]
142 public void ChainFailTest() {
142 public void ChainFailTest() {
143 var p1 = new Promise<int>();
143 var p1 = new Promise<int>();
144
144
145 var p3 = p1.Chain(x => {
145 var p3 = p1.Chain(x => {
146 var p2 = new Promise<string>();
146 var p2 = new Promise<string>();
147 p2.Reject(new Exception("DIE!!!"));
147 p2.Reject(new Exception("DIE!!!"));
148 return p2;
148 return p2;
149 });
149 });
150
150
151 p1.Resolve(100);
151 p1.Resolve(100);
152
152
153 Assert.IsTrue(p3.IsResolved);
153 Assert.IsTrue(p3.IsResolved);
154 }
154 }
155
155
156 [TestMethod]
156 [TestMethod]
157 public void PoolTest() {
157 public void PoolTest() {
158 var pid = Thread.CurrentThread.ManagedThreadId;
158 var pid = Thread.CurrentThread.ManagedThreadId;
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160
160
161 Assert.AreNotEqual(pid, p.Join());
161 Assert.AreNotEqual(pid, p.Join());
162 }
162 }
163
163
164 [TestMethod]
164 [TestMethod]
165 public void WorkerPoolSizeTest() {
165 public void WorkerPoolSizeTest() {
166 var pool = new WorkerPool(5, 10, 1);
166 var pool = new WorkerPool(5, 10, 1);
167
167
168 Assert.AreEqual(5, pool.PoolSize);
168 Assert.AreEqual(5, pool.PoolSize);
169
169
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173
173
174 Assert.AreEqual(5, pool.PoolSize);
174 Assert.AreEqual(5, pool.PoolSize);
175
175
176 for (int i = 0; i < 100; i++)
176 for (int i = 0; i < 100; i++)
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 Thread.Sleep(200);
178 Thread.Sleep(200);
179 Assert.AreEqual(10, pool.PoolSize);
179 Assert.AreEqual(10, pool.PoolSize);
180
180
181 pool.Dispose();
181 pool.Dispose();
182 }
182 }
183
183
184 [TestMethod]
184 [TestMethod]
185 public void WorkerPoolCorrectTest() {
185 public void WorkerPoolCorrectTest() {
186 var pool = new WorkerPool(0,1000,100);
186 var pool = new WorkerPool(0,1000,100);
187
187
188 const int iterations = 1000;
188 const int iterations = 1000;
189 int pending = iterations;
189 int pending = iterations;
190 var stop = new ManualResetEvent(false);
190 var stop = new ManualResetEvent(false);
191
191
192 var count = 0;
192 var count = 0;
193 for (int i = 0; i < iterations; i++) {
193 for (int i = 0; i < iterations; i++) {
194 pool
194 pool
195 .Invoke(() => 1)
195 .Invoke(() => 1)
196 .Then(x => Interlocked.Add(ref count, x))
196 .Then(x => Interlocked.Add(ref count, x))
197 .Then(x => Math.Log10(x))
197 .Then(x => Math.Log10(x))
198 .On(() => {
198 .On(() => {
199 Interlocked.Decrement(ref pending);
199 Interlocked.Decrement(ref pending);
200 if (pending == 0)
200 if (pending == 0)
201 stop.Set();
201 stop.Set();
202 }, PromiseEventType.All);
202 }, PromiseEventType.All);
203 }
203 }
204
204
205 stop.WaitOne();
205 stop.WaitOne();
206
206
207 Assert.AreEqual(iterations, count);
207 Assert.AreEqual(iterations, count);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 pool.Dispose();
209 pool.Dispose();
210
210
211 }
211 }
212
212
213 [TestMethod]
213 [TestMethod]
214 public void WorkerPoolDisposeTest() {
214 public void WorkerPoolDisposeTest() {
215 var pool = new WorkerPool(5, 20);
215 var pool = new WorkerPool(5, 20);
216 Assert.AreEqual(5, pool.PoolSize);
216 Assert.AreEqual(5, pool.PoolSize);
217 pool.Dispose();
217 pool.Dispose();
218 Thread.Sleep(500);
218 Thread.Sleep(500);
219 Assert.AreEqual(0, pool.PoolSize);
219 Assert.AreEqual(0, pool.PoolSize);
220 pool.Dispose();
220 pool.Dispose();
221 }
221 }
222
222
223 [TestMethod]
223 [TestMethod]
224 public void MTQueueTest() {
224 public void MTQueueTest() {
225 var queue = new MTQueue<int>();
225 var queue = new MTQueue<int>();
226 int res;
226 int res;
227
227
228 queue.Enqueue(10);
228 queue.Enqueue(10);
229 Assert.IsTrue(queue.TryDequeue(out res));
229 Assert.IsTrue(queue.TryDequeue(out res));
230 Assert.AreEqual(10, res);
230 Assert.AreEqual(10, res);
231 Assert.IsFalse(queue.TryDequeue(out res));
231 Assert.IsFalse(queue.TryDequeue(out res));
232
232
233 for (int i = 0; i < 1000; i++)
233 for (int i = 0; i < 1000; i++)
234 queue.Enqueue(i);
234 queue.Enqueue(i);
235
235
236 for (int i = 0; i < 1000; i++) {
236 for (int i = 0; i < 1000; i++) {
237 queue.TryDequeue(out res);
237 queue.TryDequeue(out res);
238 Assert.AreEqual(i, res);
238 Assert.AreEqual(i, res);
239 }
239 }
240
240
241 int writers = 0;
241 int writers = 0;
242 int readers = 0;
242 int readers = 0;
243 var stop = new ManualResetEvent(false);
243 var stop = new ManualResetEvent(false);
244 int total = 0;
244 int total = 0;
245
245
246 const int itemsPerWriter = 10000;
246 const int itemsPerWriter = 10000;
247 const int writersCount = 10;
247 const int writersCount = 10;
248
248
249 for (int i = 0; i < writersCount; i++) {
249 for (int i = 0; i < writersCount; i++) {
250 Interlocked.Increment(ref writers);
250 Interlocked.Increment(ref writers);
251 AsyncPool
251 AsyncPool
252 .RunThread(() => {
252 .RunThread(() => {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 queue.Enqueue(1);
254 queue.Enqueue(1);
255 }
255 }
256 return 1;
256 return 1;
257 })
257 })
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 }
259 }
260
260
261 for (int i = 0; i < 10; i++) {
261 for (int i = 0; i < 10; i++) {
262 Interlocked.Increment(ref readers);
262 Interlocked.Increment(ref readers);
263 AsyncPool
263 AsyncPool
264 .RunThread(() => {
264 .RunThread(() => {
265 int t;
265 int t;
266 do {
266 do {
267 while (queue.TryDequeue(out t))
267 while (queue.TryDequeue(out t))
268 Interlocked.Add(ref total, t);
268 Interlocked.Add(ref total, t);
269 } while (writers > 0);
269 } while (writers > 0);
270 return 1;
270 return 1;
271 })
271 })
272 .On(() => {
272 .On(() => {
273 Interlocked.Decrement(ref readers);
273 Interlocked.Decrement(ref readers);
274 if (readers == 0)
274 if (readers == 0)
275 stop.Set();
275 stop.Set();
276 }, PromiseEventType.All);
276 }, PromiseEventType.All);
277 }
277 }
278
278
279 stop.WaitOne();
279 stop.WaitOne();
280
280
281 Assert.AreEqual(100000, total);
281 Assert.AreEqual(100000, total);
282 }
282 }
283
283
284 [TestMethod]
284 [TestMethod]
285 public void AsyncQueueTest() {
285 public void AsyncQueueTest() {
286 var queue = new AsyncQueue<int>();
286 var queue = new AsyncQueue<int>();
287 int res;
287 int res;
288
288
289 queue.Enqueue(10);
289 queue.Enqueue(10);
290 Assert.IsTrue(queue.TryDequeue(out res));
290 Assert.IsTrue(queue.TryDequeue(out res));
291 Assert.AreEqual(10, res);
291 Assert.AreEqual(10, res);
292 Assert.IsFalse(queue.TryDequeue(out res));
292 Assert.IsFalse(queue.TryDequeue(out res));
293
293
294 for (int i = 0; i < 1000; i++)
294 for (int i = 0; i < 1000; i++)
295 queue.Enqueue(i);
295 queue.Enqueue(i);
296
296
297 for (int i = 0; i < 1000; i++) {
297 for (int i = 0; i < 1000; i++) {
298 queue.TryDequeue(out res);
298 queue.TryDequeue(out res);
299 Assert.AreEqual(i, res);
299 Assert.AreEqual(i, res);
300 }
300 }
301
301
302 const int count = 10000000;
302 const int count = 10000000;
303
303
304 int res1 = 0, res2 = 0;
304 int res1 = 0, res2 = 0;
305 var t1 = Environment.TickCount;
305 var t1 = Environment.TickCount;
306
306
307 AsyncPool.RunThread(
307 AsyncPool.RunThread(
308 () => {
308 () => {
309 for (var i = 0; i < count; i++)
309 for (var i = 0; i < count; i++)
310 queue.Enqueue(1);
310 queue.Enqueue(1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 },
312 },
313 () => {
313 () => {
314 for (var i = 0; i < count; i++)
314 for (var i = 0; i < count; i++)
315 queue.Enqueue(2);
315 queue.Enqueue(2);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 },
317 },
318 () => {
318 () => {
319 int temp;
319 int temp;
320 int i = 0;
320 int i = 0;
321 while (i < count)
321 while (i < count)
322 if (queue.TryDequeue(out temp)) {
322 if (queue.TryDequeue(out temp)) {
323 i++;
323 i++;
324 res1 += temp;
324 res1 += temp;
325 }
325 }
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 },
327 },
328 () => {
328 () => {
329 int temp;
329 int temp;
330 int i = 0;
330 int i = 0;
331 while (i < count)
331 while (i < count)
332 if (queue.TryDequeue(out temp)) {
332 if (queue.TryDequeue(out temp)) {
333 i++;
333 i++;
334 res2 += temp;
334 res2 += temp;
335 }
335 }
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 }
337 }
338 )
338 )
339 .Bundle()
339 .Bundle()
340 .Join();
340 .Join();
341
341
342 Assert.AreEqual(count * 3, res1 + res2);
342 Assert.AreEqual(count * 3, res1 + res2);
343
343
344 Console.WriteLine(
344 Console.WriteLine(
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 Environment.TickCount - t1,
346 Environment.TickCount - t1,
347 res1,
347 res1,
348 res2,
348 res2,
349 res1 + res2,
349 res1 + res2,
350 count
350 count
351 );
351 );
352 }
352 }
353
353
354 [TestMethod]
354 [TestMethod]
355 public void AsyncQueueBatchTest() {
355 public void AsyncQueueBatchTest() {
356 var queue = new AsyncQueue<int>();
356 var queue = new AsyncQueue<int>();
357
357
358 const int wBatch = 29;
358 const int wBatch = 29;
359 const int wCount = 400000;
359 const int wCount = 400000;
360 const int total = wBatch * wCount * 2;
360 const int total = wBatch * wCount * 2;
361 const int summ = wBatch * wCount * 3;
361 const int summ = wBatch * wCount * 3;
362
362
363 int r1 = 0, r2 = 0;
363 int r1 = 0, r2 = 0;
364 const int rBatch = 111;
364 const int rBatch = 111;
365 int read = 0;
365 int read = 0;
366
366
367 var t1 = Environment.TickCount;
367 var t1 = Environment.TickCount;
368
368
369 AsyncPool.RunThread(
369 AsyncPool.RunThread(
370 () => {
370 () => {
371 var buffer = new int[wBatch];
371 var buffer = new int[wBatch];
372 for(int i = 0; i<wBatch; i++)
372 for(int i = 0; i<wBatch; i++)
373 buffer[i] = 1;
373 buffer[i] = 1;
374
374
375 for(int i =0; i < wCount; i++)
375 for(int i =0; i < wCount; i++)
376 queue.EnqueueRange(buffer,0,wBatch);
376 queue.EnqueueRange(buffer,0,wBatch);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 },
378 },
379 () => {
379 () => {
380 var buffer = new int[wBatch];
380 var buffer = new int[wBatch];
381 for(int i = 0; i<wBatch; i++)
381 for(int i = 0; i<wBatch; i++)
382 buffer[i] = 2;
382 buffer[i] = 2;
383
383
384 for(int i =0; i < wCount; i++)
384 for(int i =0; i < wCount; i++)
385 queue.EnqueueRange(buffer,0,wBatch);
385 queue.EnqueueRange(buffer,0,wBatch);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 },
387 },
388 () => {
388 () => {
389 var buffer = new int[rBatch];
389 var buffer = new int[rBatch];
390
390
391 while(read < total) {
391 while(read < total) {
392 int actual;
392 int actual;
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 for(int i=0; i< actual; i++)
394 for(int i=0; i< actual; i++)
395 r1 += buffer[i];
395 r1 += buffer[i];
396 Interlocked.Add(ref read, actual);
396 Interlocked.Add(ref read, actual);
397 }
397 }
398 }
398 }
399
399
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 },
401 },
402 () => {
402 () => {
403 var buffer = new int[rBatch];
403 var buffer = new int[rBatch];
404
404
405 while(read < total) {
405 while(read < total) {
406 int actual;
406 int actual;
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 for(int i=0; i< actual; i++)
408 for(int i=0; i< actual; i++)
409 r2 += buffer[i];
409 r2 += buffer[i];
410 Interlocked.Add(ref read, actual);
410 Interlocked.Add(ref read, actual);
411 }
411 }
412 }
412 }
413
413
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 }
415 }
416 )
416 )
417 .Bundle()
417 .Bundle()
418 .Join();
418 .Join();
419
419
420 Assert.AreEqual(summ , r1 + r2);
420 Assert.AreEqual(summ , r1 + r2);
421
421
422 Console.WriteLine(
422 Console.WriteLine(
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 Environment.TickCount - t1,
424 Environment.TickCount - t1,
425 r1,
425 r1,
426 r2,
426 r2,
427 r1 + r2,
427 r1 + r2,
428 total
428 total
429 );
429 );
430 }
430 }
431
431
432 [TestMethod]
432 [TestMethod]
433 public void AsyncQueueChunkDequeueTest() {
433 public void AsyncQueueChunkDequeueTest() {
434 var queue = new AsyncQueue<int>();
434 var queue = new AsyncQueue<int>();
435
435
436 const int wBatch = 31;
436 const int wBatch = 31;
437 const int wCount = 200000;
437 const int wCount = 200000;
438 const int total = wBatch * wCount * 3;
438 const int total = wBatch * wCount * 3;
439 const int summ = wBatch * wCount * 6;
439 const int summ = wBatch * wCount * 6;
440
440
441 int r1 = 0, r2 = 0;
441 int r1 = 0, r2 = 0;
442 const int rBatch = 1024;
442 const int rBatch = 1024;
443 int read = 0;
443 int read = 0;
444
444
445 var t1 = Environment.TickCount;
445 var t1 = Environment.TickCount;
446
446
447 AsyncPool.RunThread(
447 AsyncPool.RunThread(
448 () => {
448 () => {
449 var buffer = new int[wBatch];
449 var buffer = new int[wBatch];
450 for(int i = 0; i<wBatch; i++)
450 for(int i = 0; i<wBatch; i++)
451 buffer[i] = 1;
451 buffer[i] = 1;
452
452
453 for(int i =0; i < wCount; i++)
453 for(int i =0; i < wCount; i++)
454 queue.EnqueueRange(buffer,0,wBatch);
454 queue.EnqueueRange(buffer,0,wBatch);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 },
456 },
457 () => {
457 () => {
458 var buffer = new int[wBatch];
458 var buffer = new int[wBatch];
459 for(int i = 0; i<wBatch; i++)
459 for(int i = 0; i<wBatch; i++)
460 buffer[i] = 2;
460 buffer[i] = 2;
461
461
462 for(int i =0; i < wCount; i++)
462 for(int i =0; i < wCount; i++)
463 queue.EnqueueRange(buffer,0,wBatch);
463 queue.EnqueueRange(buffer,0,wBatch);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 },
465 },
466 () => {
466 () => {
467 var buffer = new int[wBatch];
467 var buffer = new int[wBatch];
468 for(int i = 0; i<wBatch; i++)
468 for(int i = 0; i<wBatch; i++)
469 buffer[i] = 3;
469 buffer[i] = 3;
470
470
471 for(int i =0; i < wCount; i++)
471 for(int i =0; i < wCount; i++)
472 queue.EnqueueRange(buffer,0,wBatch);
472 queue.EnqueueRange(buffer,0,wBatch);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 },
474 },
475 () => {
475 () => {
476 var buffer = new int[rBatch];
476 var buffer = new int[rBatch];
477 int count = 1;
477 int count = 1;
478 double avgchunk = 0;
478 double avgchunk = 0;
479 while(read < total) {
479 while(read < total) {
480 int actual;
480 int actual;
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 for(int i=0; i< actual; i++)
482 for(int i=0; i< actual; i++)
483 r2 += buffer[i];
483 r2 += buffer[i];
484 Interlocked.Add(ref read, actual);
484 Interlocked.Add(ref read, actual);
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 count ++;
486 count ++;
487 }
487 }
488 }
488 }
489
489
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 }
491 }
492 )
492 )
493 .Bundle()
493 .Bundle()
494 .Join();
494 .Join();
495
495
496 Assert.AreEqual(summ , r1 + r2);
496 Assert.AreEqual(summ , r1 + r2);
497
497
498 Console.WriteLine(
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
500 Environment.TickCount - t1,
501 r1,
501 r1,
502 r2,
502 r2,
503 r1 + r2,
503 r1 + r2,
504 total
504 total
505 );
505 );
506 }
506 }
507
507
508 [TestMethod]
508 [TestMethod]
509 public void AsyncQueueDrainTest() {
509 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>();
510 var queue = new AsyncQueue<int>();
511
511
512 const int wBatch = 11;
512 const int wBatch = 11;
513 const int wCount = 200000;
513 const int wCount = 200000;
514 const int total = wBatch * wCount * 3;
514 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
516
516
517 int r1 = 0, r2 = 0;
517 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
518 const int rBatch = 11;
519 int read = 0;
519 int read = 0;
520
520
521 var t1 = Environment.TickCount;
521 var t1 = Environment.TickCount;
522
522
523 AsyncPool.RunThread(
523 AsyncPool.RunThread(
524 () => {
524 () => {
525 var buffer = new int[wBatch];
525 var buffer = new int[wBatch];
526 for(int i = 0; i<wBatch; i++)
526 for(int i = 0; i<wBatch; i++)
527 buffer[i] = 1;
527 buffer[i] = 1;
528
528
529 for(int i =0; i < wCount; i++)
529 for(int i =0; i < wCount; i++)
530 queue.EnqueueRange(buffer,0,wBatch);
530 queue.EnqueueRange(buffer,0,wBatch);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 },
532 },
533 () => {
533 () => {
534 for(int i =0; i < wCount * wBatch; i++)
534 for(int i =0; i < wCount * wBatch; i++)
535 queue.Enqueue(1);
535 queue.Enqueue(1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 },
537 },
538 () => {
538 () => {
539 var buffer = new int[wBatch];
539 var buffer = new int[wBatch];
540 for(int i = 0; i<wBatch; i++)
540 for(int i = 0; i<wBatch; i++)
541 buffer[i] = 1;
541 buffer[i] = 1;
542
542
543 for(int i =0; i < wCount; i++)
543 for(int i =0; i < wCount; i++)
544 queue.EnqueueRange(buffer,0,wBatch);
544 queue.EnqueueRange(buffer,0,wBatch);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 },
546 },
547 /*() => {
547 /*() => {
548 int temp;
548 int temp;
549 int count = 0;
549 int count = 0;
550 while (read < total)
550 while (read < total)
551 if (queue.TryDequeue(out temp)) {
551 if (queue.TryDequeue(out temp)) {
552 count++;
552 count++;
553 r1 += temp;
553 r1 += temp;
554 Interlocked.Increment(ref read);
554 Interlocked.Increment(ref read);
555 }
555 }
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 },*/
557 },*/
558 /*() => {
558 /*() => {
559 var buffer = new int[rBatch];
559 var buffer = new int[rBatch];
560 var count = 0;
560 var count = 0;
561 while(read < total) {
561 while(read < total) {
562 int actual;
562 int actual;
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 for(int i=0; i< actual; i++)
564 for(int i=0; i< actual; i++)
565 r1 += buffer[i];
565 r1 += buffer[i];
566 Interlocked.Add(ref read, actual);
566 Interlocked.Add(ref read, actual);
567 count += actual;
567 count += actual;
568 }
568 }
569 }
569 }
570
570
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 },*/
572 },*/
573 () => {
573 () => {
574 var count = 0;
574 var count = 0;
575 while(read < total) {
575 while(read < total) {
576 var buffer = queue.Drain();
576 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
577 for(int i=0; i< buffer.Length; i++)
578 r1 += buffer[i];
578 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
579 Interlocked.Add(ref read, buffer.Length);
580 count += buffer.Length;
580 count += buffer.Length;
581 }
581 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 },
583 },
584 () => {
584 () => {
585 var count = 0;
585 var count = 0;
586 while(read < total) {
586 while(read < total) {
587 var buffer = queue.Drain();
587 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
588 for(int i=0; i< buffer.Length; i++)
589 r2 += buffer[i];
589 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
590 Interlocked.Add(ref read, buffer.Length);
591 count += buffer.Length;
591 count += buffer.Length;
592 }
592 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 }
594 }
595 )
595 )
596 .Bundle()
596 .Bundle()
597 .Join();
597 .Join();
598
598
599 Assert.AreEqual(summ , r1 + r2);
599 Assert.AreEqual(summ , r1 + r2);
600
600
601 Console.WriteLine(
601 Console.WriteLine(
602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
603 Environment.TickCount - t1,
603 Environment.TickCount - t1,
604 r1,
604 r1,
605 r2,
605 r2,
606 r1 + r2,
606 r1 + r2,
607 total
607 total
608 );
608 );
609 }
609 }
610
610
611 [TestMethod]
611 [TestMethod]
612 public void ParallelMapTest() {
612 public void ParallelMapTest() {
613
613
614 const int count = 100000;
614 const int count = 100000;
615
615
616 var args = new double[count];
616 var args = new double[count];
617 var rand = new Random();
617 var rand = new Random();
618
618
619 for (int i = 0; i < count; i++)
619 for (int i = 0; i < count; i++)
620 args[i] = rand.NextDouble();
620 args[i] = rand.NextDouble();
621
621
622 var t = Environment.TickCount;
622 var t = Environment.TickCount;
623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
624
624
625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
626
626
627 t = Environment.TickCount;
627 t = Environment.TickCount;
628 for (int i = 0; i < count; i++)
628 for (int i = 0; i < count; i++)
629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
631 }
631 }
632
632
633 [TestMethod]
633 [TestMethod]
634 public void ChainedMapTest() {
634 public void ChainedMapTest() {
635
635
636 using (var pool = new WorkerPool()) {
636 using (var pool = new WorkerPool()) {
637 const int count = 10000;
637 const int count = 10000;
638
638
639 var args = new double[count];
639 var args = new double[count];
640 var rand = new Random();
640 var rand = new Random();
641
641
642 for (int i = 0; i < count; i++)
642 for (int i = 0; i < count; i++)
643 args[i] = rand.NextDouble();
643 args[i] = rand.NextDouble();
644
644
645 var t = Environment.TickCount;
645 var t = Environment.TickCount;
646 var res = args
646 var res = args
647 .ChainedMap(
647 .ChainedMap(
648 // Analysis disable once AccessToDisposedClosure
648 // Analysis disable once AccessToDisposedClosure
649 x => pool.Invoke(
649 x => pool.Invoke(
650 () => Math.Sin(x * x)
650 () => Math.Sin(x * x)
651 ),
651 ),
652 4
652 4
653 )
653 )
654 .Join();
654 .Join();
655
655
656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
657
657
658 t = Environment.TickCount;
658 t = Environment.TickCount;
659 for (int i = 0; i < count; i++)
659 for (int i = 0; i < count; i++)
660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
663 }
663 }
664 }
664 }
665
665
666 [TestMethod]
666 [TestMethod]
667 public void ParallelForEachTest() {
667 public void ParallelForEachTest() {
668
668
669 const int count = 100000;
669 const int count = 100000;
670
670
671 var args = new int[count];
671 var args = new int[count];
672 var rand = new Random();
672 var rand = new Random();
673
673
674 for (int i = 0; i < count; i++)
674 for (int i = 0; i < count; i++)
675 args[i] = (int)(rand.NextDouble() * 100);
675 args[i] = (int)(rand.NextDouble() * 100);
676
676
677 int result = 0;
677 int result = 0;
678
678
679 var t = Environment.TickCount;
679 var t = Environment.TickCount;
680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
681
681
682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
683
683
684 int result2 = 0;
684 int result2 = 0;
685
685
686 t = Environment.TickCount;
686 t = Environment.TickCount;
687 for (int i = 0; i < count; i++)
687 for (int i = 0; i < count; i++)
688 result2 += args[i];
688 result2 += args[i];
689 Assert.AreEqual(result2, result);
689 Assert.AreEqual(result2, result);
690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
691 }
691 }
692
692
693 [TestMethod]
693 [TestMethod]
694 public void ComplexCase1Test() {
694 public void ComplexCase1Test() {
695 var flags = new bool[3];
695 var flags = new bool[3];
696
696
697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
698
698
699 var step1 = PromiseHelper
699 var step1 = PromiseHelper
700 .Sleep(200, "Alan")
700 .Sleep(200, "Alan")
701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
702 var p = step1
702 var p = step1
703 .Chain(x =>
703 .Chain(x =>
704 PromiseHelper
704 PromiseHelper
705 .Sleep(200, "Hi, " + x)
705 .Sleep(200, "Hi, " + x)
706 .Then(y => y)
706 .Then(y => y)
707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
708 )
708 )
709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
710 step1.Join();
710 step1.Join();
711 p.Cancel();
711 p.Cancel();
712 try {
712 try {
713 Assert.AreEqual(p.Join(), "Hi, Alan");
713 Assert.AreEqual(p.Join(), "Hi, Alan");
714 Assert.Fail("Shouldn't get here");
714 Assert.Fail("Shouldn't get here");
715 } catch (OperationCanceledException) {
715 } catch (OperationCanceledException) {
716 }
716 }
717
717
718 Assert.IsFalse(flags[0]);
718 Assert.IsFalse(flags[0]);
719 Assert.IsTrue(flags[1]);
719 Assert.IsTrue(flags[1]);
720 Assert.IsTrue(flags[2]);
720 Assert.IsTrue(flags[2]);
721 }
721 }
722
722
723 [TestMethod]
723 [TestMethod]
724 public void ChainedCancel1Test() {
724 public void ChainedCancel1Test() {
725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
727 var p = PromiseHelper
727 var p = PromiseHelper
728 .Sleep(1, "Hi, HAL!")
728 .Sleep(1, "Hi, HAL!")
729 .Then(x => {
729 .Then(x => {
730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
733 PromiseHelper
733 PromiseHelper
734 .Sleep(100, "HAL, STOP!")
734 .Sleep(100, "HAL, STOP!")
735 .Then(result.Cancel);
735 .Then(result.Cancel);
736 return result;
736 return result;
737 });
737 });
738 try {
738 try {
739 p.Join();
739 p.Join();
740 } catch (TargetInvocationException err) {
740 } catch (TargetInvocationException err) {
741 Assert.IsTrue(err.InnerException is OperationCanceledException);
741 Assert.IsTrue(err.InnerException is OperationCanceledException);
742 }
742 }
743 }
743 }
744
744
745 [TestMethod]
745 [TestMethod]
746 public void ChainedCancel2Test() {
746 public void ChainedCancel2Test() {
747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
748 var pSurvive = new Promise<bool>();
748 var pSurvive = new Promise<bool>();
749 var hemStarted = new ManualResetEvent(false);
749 var hemStarted = new ManualResetEvent(false);
750 var p = PromiseHelper
750 var p = PromiseHelper
751 .Sleep(1, "Hi, HAL!")
751 .Sleep(1, "Hi, HAL!")
752 .Chain(x => {
752 .Chain(x => {
753 hemStarted.Set();
753 hemStarted.Set();
754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
755 var result = PromiseHelper
755 var result = PromiseHelper
756 .Sleep(100000000, "HEM ENABLED!!!")
756 .Sleep(100000000, "HEM ENABLED!!!")
757 .Then(s => {
757 .Then(s => {
758 pSurvive.Resolve(false);
758 pSurvive.Resolve(false);
759 return s;
759 return s;
760 });
760 });
761
761
762 result
762 result
763 .Cancelled(() => pSurvive.Resolve(true));
763 .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
764
764
765 return result;
765 return result;
766 });
766 });
767
767
768 hemStarted.WaitOne();
768 hemStarted.WaitOne();
769 p.Cancel();
769 p.Cancel();
770
770
771 try {
771 try {
772 p.Join();
772 p.Join();
773 } catch (OperationCanceledException) {
773 } catch (OperationCanceledException) {
774 Assert.IsTrue(pSurvive.Join());
774 Assert.IsTrue(pSurvive.Join());
775 }
775 }
776 }
776 }
777
777
778 [TestMethod]
778 [TestMethod]
779 public void SharedLockTest() {
779 public void SharedLockTest() {
780 var l = new SharedLock();
780 var l = new SharedLock();
781 int shared = 0;
781 int shared = 0;
782 int exclusive = 0;
782 int exclusive = 0;
783 var s1 = new Signal();
783 var s1 = new Signal();
784 var log = new AsyncQueue<string>();
784 var log = new AsyncQueue<string>();
785
785
786 try {
786 try {
787 AsyncPool.RunThread(
787 AsyncPool.RunThread(
788 () => {
788 () => {
789 log.Enqueue("Reader #1 started");
789 log.Enqueue("Reader #1 started");
790 try {
790 try {
791 l.LockShared();
791 l.LockShared();
792 log.Enqueue("Reader #1 lock got");
792 log.Enqueue("Reader #1 lock got");
793 if (Interlocked.Increment(ref shared) == 2)
793 if (Interlocked.Increment(ref shared) == 2)
794 s1.Set();
794 s1.Set();
795 s1.Wait();
795 s1.Wait();
796 log.Enqueue("Reader #1 finished");
796 log.Enqueue("Reader #1 finished");
797 Interlocked.Decrement(ref shared);
797 Interlocked.Decrement(ref shared);
798 } finally {
798 } finally {
799 l.Release();
799 l.Release();
800 log.Enqueue("Reader #1 lock released");
800 log.Enqueue("Reader #1 lock released");
801 }
801 }
802 },
802 },
803 () => {
803 () => {
804 log.Enqueue("Reader #2 started");
804 log.Enqueue("Reader #2 started");
805
805
806 try {
806 try {
807 l.LockShared();
807 l.LockShared();
808 log.Enqueue("Reader #2 lock got");
808 log.Enqueue("Reader #2 lock got");
809
809
810 if (Interlocked.Increment(ref shared) == 2)
810 if (Interlocked.Increment(ref shared) == 2)
811 s1.Set();
811 s1.Set();
812 s1.Wait();
812 s1.Wait();
813 log.Enqueue("Reader #2 upgrading to writer");
813 log.Enqueue("Reader #2 upgrading to writer");
814 Interlocked.Decrement(ref shared);
814 Interlocked.Decrement(ref shared);
815 l.Upgrade();
815 l.Upgrade();
816 log.Enqueue("Reader #2 upgraded");
816 log.Enqueue("Reader #2 upgraded");
817
817
818 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
818 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
819 Assert.AreEqual(0, shared);
819 Assert.AreEqual(0, shared);
820 log.Enqueue("Reader #2 finished");
820 log.Enqueue("Reader #2 finished");
821 Interlocked.Decrement(ref exclusive);
821 Interlocked.Decrement(ref exclusive);
822 } finally {
822 } finally {
823 l.Release();
823 l.Release();
824 log.Enqueue("Reader #2 lock released");
824 log.Enqueue("Reader #2 lock released");
825 }
825 }
826 },
826 },
827 () => {
827 () => {
828 log.Enqueue("Writer #1 started");
828 log.Enqueue("Writer #1 started");
829 try {
829 try {
830 l.LockExclusive();
830 l.LockExclusive();
831 log.Enqueue("Writer #1 got the lock");
831 log.Enqueue("Writer #1 got the lock");
832 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
832 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
833 Interlocked.Decrement(ref exclusive);
833 Interlocked.Decrement(ref exclusive);
834 log.Enqueue("Writer #1 is finished");
834 log.Enqueue("Writer #1 is finished");
835 } finally {
835 } finally {
836 l.Release();
836 l.Release();
837 log.Enqueue("Writer #1 lock released");
837 log.Enqueue("Writer #1 lock released");
838 }
838 }
839 }
839 }
840 ).Bundle().Join(1000);
840 ).Bundle().Join(1000);
841 log.Enqueue("Done");
841 log.Enqueue("Done");
842 } catch(Exception error) {
842 } catch(Exception error) {
843 log.Enqueue(error.Message);
843 log.Enqueue(error.Message);
844 throw;
844 throw;
845 } finally {
845 } finally {
846 foreach (var m in log)
846 foreach (var m in log)
847 Console.WriteLine(m);
847 Console.WriteLine(m);
848 }
848 }
849 }
849 }
850 }
850 }
851 }
851 }
852
852
@@ -1,291 +1,308
1 using System;
1 using System;
2 using Implab.Parallels;
2 using Implab.Parallels;
3 using System.Threading;
3 using System.Threading;
4 using System.Reflection;
4 using System.Reflection;
5
5
6 namespace Implab {
6 namespace Implab {
7 public abstract class AbstractPromise<THandler> {
7 public abstract class AbstractPromise<THandler> {
8
8
9 const int UNRESOLVED_SATE = 0;
9 const int UNRESOLVED_SATE = 0;
10 const int TRANSITIONAL_STATE = 1;
10 const int TRANSITIONAL_STATE = 1;
11 const int SUCCEEDED_STATE = 2;
11 const int SUCCEEDED_STATE = 2;
12 const int REJECTED_STATE = 3;
12 const int REJECTED_STATE = 3;
13 const int CANCELLED_STATE = 4;
13 const int CANCELLED_STATE = 4;
14
14
15 const int RESERVED_HANDLERS_COUNT = 4;
15 const int RESERVED_HANDLERS_COUNT = 4;
16
16
17 int m_state;
17 int m_state;
18 Exception m_error;
18 Exception m_error;
19 int m_handlersCount;
19 int m_handlersCount;
20
20
21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
22 MTQueue<THandler> m_extraHandlers;
22 MTQueue<THandler> m_extraHandlers;
23 int m_handlerPointer = -1;
23 int m_handlerPointer = -1;
24 int m_handlersCommited;
24 int m_handlersCommited;
25
25
26 #region state managment
26 #region state managment
27 bool BeginTransit() {
27 bool BeginTransit() {
28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
29 }
29 }
30
30
31 void CompleteTransit(int state) {
31 void CompleteTransit(int state) {
32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
34 }
34 }
35
35
36 void WaitTransition() {
36 void WaitTransition() {
37 while (m_state == TRANSITIONAL_STATE) {
37 while (m_state == TRANSITIONAL_STATE) {
38 Thread.MemoryBarrier();
38 Thread.MemoryBarrier();
39 }
39 }
40 }
40 }
41
41
42 protected bool BeginSetResult() {
42 protected bool BeginSetResult() {
43 if (!BeginTransit()) {
43 if (!BeginTransit()) {
44 WaitTransition();
44 WaitTransition();
45 if (m_state != CANCELLED_STATE)
45 if (m_state != CANCELLED_STATE)
46 throw new InvalidOperationException("The promise is already resolved");
46 throw new InvalidOperationException("The promise is already resolved");
47 return false;
47 return false;
48 }
48 }
49 return true;
49 return true;
50 }
50 }
51
51
52 protected void EndSetResult() {
52 protected void EndSetResult() {
53 CompleteTransit(SUCCEEDED_STATE);
53 CompleteTransit(SUCCEEDED_STATE);
54 OnSuccess();
54 OnSuccess();
55 }
55 }
56
56
57
57
58
58
59 /// <summary>
59 /// <summary>
60 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
60 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
61 /// </summary>
61 /// </summary>
62 /// <remarks>
62 /// <remarks>
63 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
63 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
64 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
64 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
65 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
65 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
66 /// </remarks>
66 /// </remarks>
67 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
67 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
68 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
68 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
69 protected void SetError(Exception error) {
69 protected void SetError(Exception error) {
70 if (BeginTransit()) {
70 if (BeginTransit()) {
71 if (error is OperationCanceledException) {
72 CompleteTransit(CANCELLED_STATE);
73 m_error = error.InnerException;
74 OnCancelled();
75 } else {
71 m_error = error is PromiseTransientException ? error.InnerException : error;
76 m_error = error is PromiseTransientException ? error.InnerException : error;
72 CompleteTransit(REJECTED_STATE);
77 CompleteTransit(REJECTED_STATE);
73 OnError();
78 OnError();
79 }
74 } else {
80 } else {
75 WaitTransition();
81 WaitTransition();
76 if (m_state == SUCCEEDED_STATE)
82 if (m_state == SUCCEEDED_STATE)
77 throw new InvalidOperationException("The promise is already resolved");
83 throw new InvalidOperationException("The promise is already resolved");
78 }
84 }
79 }
85 }
80
86
81 /// <summary>
87 /// <summary>
82 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
88 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
83 /// </summary>
89 /// </summary>
84 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
90 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
85 protected void SetCancelled() {
91 protected void SetCancelled(Exception reason) {
86 if (BeginTransit()) {
92 if (BeginTransit()) {
93 m_error = reason;
87 CompleteTransit(CANCELLED_STATE);
94 CompleteTransit(CANCELLED_STATE);
88 OnCancelled();
95 OnCancelled();
89 }
96 }
90 }
97 }
91
98
92 protected abstract void SignalSuccess(THandler handler);
99 protected abstract void SignalSuccess(THandler handler);
93
100
94 protected abstract void SignalError(THandler handler, Exception error);
101 protected abstract void SignalError(THandler handler, Exception error);
95
102
96 protected abstract void SignalCancelled(THandler handler);
103 protected abstract void SignalCancelled(THandler handler, Exception reason);
97
104
98 void OnSuccess() {
105 void OnSuccess() {
99 var hp = m_handlerPointer;
106 var hp = m_handlerPointer;
100 var slot = hp +1 ;
107 var slot = hp +1 ;
101 while (slot < m_handlersCommited) {
108 while (slot < m_handlersCommited) {
102 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
109 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
103 SignalSuccess(m_handlers[slot]);
110 SignalSuccess(m_handlers[slot]);
104 }
111 }
105 hp = m_handlerPointer;
112 hp = m_handlerPointer;
106 slot = hp +1 ;
113 slot = hp +1 ;
107 }
114 }
108
115
109
116
110 if (m_extraHandlers != null) {
117 if (m_extraHandlers != null) {
111 THandler handler;
118 THandler handler;
112 while (m_extraHandlers.TryDequeue(out handler))
119 while (m_extraHandlers.TryDequeue(out handler))
113 SignalSuccess(handler);
120 SignalSuccess(handler);
114 }
121 }
115 }
122 }
116
123
117 void OnError() {
124 void OnError() {
118 var hp = m_handlerPointer;
125 var hp = m_handlerPointer;
119 var slot = hp +1 ;
126 var slot = hp +1 ;
120 while (slot < m_handlersCommited) {
127 while (slot < m_handlersCommited) {
121 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
128 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
122 SignalError(m_handlers[slot],m_error);
129 SignalError(m_handlers[slot],m_error);
123 }
130 }
124 hp = m_handlerPointer;
131 hp = m_handlerPointer;
125 slot = hp +1 ;
132 slot = hp +1 ;
126 }
133 }
127
134
128 if (m_extraHandlers != null) {
135 if (m_extraHandlers != null) {
129 THandler handler;
136 THandler handler;
130 while (m_extraHandlers.TryDequeue(out handler))
137 while (m_extraHandlers.TryDequeue(out handler))
131 SignalError(handler, m_error);
138 SignalError(handler, m_error);
132 }
139 }
133 }
140 }
134
141
135 void OnCancelled() {
142 void OnCancelled() {
136 var hp = m_handlerPointer;
143 var hp = m_handlerPointer;
137 var slot = hp +1 ;
144 var slot = hp +1 ;
138 while (slot < m_handlersCommited) {
145 while (slot < m_handlersCommited) {
139 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
146 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
140 SignalCancelled(m_handlers[slot]);
147 SignalCancelled(m_handlers[slot], m_error);
141 }
148 }
142 hp = m_handlerPointer;
149 hp = m_handlerPointer;
143 slot = hp +1 ;
150 slot = hp +1 ;
144 }
151 }
145
152
146 if (m_extraHandlers != null) {
153 if (m_extraHandlers != null) {
147 THandler handler;
154 THandler handler;
148 while (m_extraHandlers.TryDequeue(out handler))
155 while (m_extraHandlers.TryDequeue(out handler))
149 SignalCancelled(handler);
156 SignalCancelled(handler, m_error);
150 }
157 }
151 }
158 }
152
159
153 #endregion
160 #endregion
154
161
155 protected abstract void Listen(PromiseEventType events, Action handler);
162 protected abstract void Listen(PromiseEventType events, Action handler);
156
163
157 #region synchronization traits
164 #region synchronization traits
158 protected void WaitResult(int timeout) {
165 protected void WaitResult(int timeout) {
159 if (!IsResolved) {
166 if (!IsResolved) {
160 var lk = new object();
167 var lk = new object();
161
168
162 Listen(PromiseEventType.All, () => {
169 Listen(PromiseEventType.All, () => {
163 lock(lk) {
170 lock(lk) {
164 Monitor.Pulse(lk);
171 Monitor.Pulse(lk);
165 }
172 }
166 });
173 });
167
174
168 lock (lk) {
175 lock (lk) {
169 while(!IsResolved) {
176 while(!IsResolved) {
170 if(!Monitor.Wait(lk,timeout))
177 if(!Monitor.Wait(lk,timeout))
171 throw new TimeoutException();
178 throw new TimeoutException();
172 }
179 }
173 }
180 }
174
181
175 }
182 }
176 switch (m_state) {
183 switch (m_state) {
177 case SUCCEEDED_STATE:
184 case SUCCEEDED_STATE:
178 return;
185 return;
179 case CANCELLED_STATE:
186 case CANCELLED_STATE:
180 throw new OperationCanceledException();
187 throw new OperationCanceledException();
181 case REJECTED_STATE:
188 case REJECTED_STATE:
182 throw new TargetInvocationException(m_error);
189 throw new TargetInvocationException(m_error);
183 default:
190 default:
184 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
191 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
185 }
192 }
186 }
193 }
187 #endregion
194 #endregion
188
195
189 #region handlers managment
196 #region handlers managment
190
197
191 protected void AddHandler(THandler handler) {
198 protected void AddHandler(THandler handler) {
192
199
193 if (m_state > 1) {
200 if (m_state > 1) {
194 // the promise is in the resolved state, just invoke the handler
201 // the promise is in the resolved state, just invoke the handler
195 InvokeHandler(handler);
202 InvokeHandler(handler);
196 } else {
203 } else {
197 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
204 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
198
205
199 if (slot < RESERVED_HANDLERS_COUNT) {
206 if (slot < RESERVED_HANDLERS_COUNT) {
200 m_handlers[slot] = handler;
207 m_handlers[slot] = handler;
201
208
202 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
209 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
203 }
210 }
204
211
205 if (m_state > 1) {
212 if (m_state > 1) {
206 do {
213 do {
207 var hp = m_handlerPointer;
214 var hp = m_handlerPointer;
208 slot = hp + 1;
215 slot = hp + 1;
209 if (slot < m_handlersCommited) {
216 if (slot < m_handlersCommited) {
210 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
217 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
211 continue;
218 continue;
212 InvokeHandler(m_handlers[slot]);
219 InvokeHandler(m_handlers[slot]);
213 }
220 }
214 break;
221 break;
215 } while(true);
222 } while(true);
216 }
223 }
217 } else {
224 } else {
218 if (slot == RESERVED_HANDLERS_COUNT) {
225 if (slot == RESERVED_HANDLERS_COUNT) {
219 m_extraHandlers = new MTQueue<THandler>();
226 m_extraHandlers = new MTQueue<THandler>();
220 } else {
227 } else {
221 while (m_extraHandlers == null)
228 while (m_extraHandlers == null)
222 Thread.MemoryBarrier();
229 Thread.MemoryBarrier();
223 }
230 }
224
231
225 m_extraHandlers.Enqueue(handler);
232 m_extraHandlers.Enqueue(handler);
226
233
227 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
234 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
228 // if the promise have been resolved while we was adding the handler to the queue
235 // if the promise have been resolved while we was adding the handler to the queue
229 // we can't guarantee that someone is still processing it
236 // we can't guarantee that someone is still processing it
230 // therefore we need to fetch a handler from the queue and execute it
237 // therefore we need to fetch a handler from the queue and execute it
231 // note that fetched handler may be not the one that we have added
238 // note that fetched handler may be not the one that we have added
232 // even we can fetch no handlers at all :)
239 // even we can fetch no handlers at all :)
233 InvokeHandler(handler);
240 InvokeHandler(handler);
234 }
241 }
235 }
242 }
236 }
243 }
237
244
238 protected void InvokeHandler(THandler handler) {
245 protected void InvokeHandler(THandler handler) {
239 switch (m_state) {
246 switch (m_state) {
240 case SUCCEEDED_STATE:
247 case SUCCEEDED_STATE:
241 SignalSuccess(handler);
248 SignalSuccess(handler);
242 break;
249 break;
243 case CANCELLED_STATE:
250 case CANCELLED_STATE:
244 SignalCancelled(handler);
251 SignalCancelled(handler, m_error);
245 break;
252 break;
246 case REJECTED_STATE:
253 case REJECTED_STATE:
247 SignalError(handler, m_error);
254 SignalError(handler, m_error);
248 break;
255 break;
249 default:
256 default:
250 throw new Exception(String.Format("Invalid promise state {0}", m_state));
257 throw new Exception(String.Format("Invalid promise state {0}", m_state));
251 }
258 }
252 }
259 }
253
260
254 #endregion
261 #endregion
255
262
256 #region IPromise implementation
263 #region IPromise implementation
257
264
258 public void Join(int timeout) {
265 public void Join(int timeout) {
259 WaitResult(timeout);
266 WaitResult(timeout);
260 }
267 }
261
268
262 public void Join() {
269 public void Join() {
263 WaitResult(-1);
270 WaitResult(-1);
264 }
271 }
265
272
266 public bool IsResolved {
273 public bool IsResolved {
267 get {
274 get {
268 Thread.MemoryBarrier();
275 Thread.MemoryBarrier();
269 return m_state > 1;
276 return m_state > 1;
270 }
277 }
271 }
278 }
272
279
273 public bool IsCancelled {
280 public bool IsCancelled {
274 get {
281 get {
275 Thread.MemoryBarrier();
282 Thread.MemoryBarrier();
276 return m_state == CANCELLED_STATE;
283 return m_state == CANCELLED_STATE;
277 }
284 }
278 }
285 }
279
286
280 #endregion
287 #endregion
281
288
282 #region ICancellable implementation
289 #region ICancellable implementation
283
290
284 public void Cancel() {
291 public void Cancel() {
285 SetCancelled();
292 SetCancelled(null);
293 }
294
295 public void Cancel(Exception reason) {
296 SetCancelled(reason);
286 }
297 }
287
298
288 #endregion
299 #endregion
300
301 public Exception Error {
302 get {
303 return m_error;
304 }
305 }
289 }
306 }
290 }
307 }
291
308
@@ -1,41 +1,41
1 namespace Implab.Diagnostics {
1 namespace Implab.Diagnostics {
2 public static class Extensions {
2 public static class Extensions {
3 public static IPromise<T> EndLogicalOperation<T>(this IPromise<T> promise) {
3 public static IPromise<T> EndLogicalOperation<T>(this IPromise<T> promise) {
4 Safe.ArgumentNotNull(promise, "promise");
4 Safe.ArgumentNotNull(promise, "promise");
5 var op = TraceContext.Instance.DetachLogicalOperation();
5 var op = TraceContext.Instance.DetachLogicalOperation();
6
6
7 return promise.On(
7 return promise.On(
8 x => {
8 x => {
9 TraceContext.Instance.EnterLogicalOperation(op,true);
9 TraceContext.Instance.EnterLogicalOperation(op,true);
10 TraceLog.TraceInformation("promise = {0}", x);
10 TraceLog.TraceInformation("promise = {0}", x);
11 TraceLog.EndLogicalOperation();
11 TraceLog.EndLogicalOperation();
12 TraceContext.Instance.Leave();
12 TraceContext.Instance.Leave();
13 },
13 },
14 err =>{
14 err =>{
15 TraceContext.Instance.EnterLogicalOperation(op,true);
15 TraceContext.Instance.EnterLogicalOperation(op,true);
16 TraceLog.TraceError("promise died {0}", err);
16 TraceLog.TraceError("promise died {0}", err);
17 TraceLog.EndLogicalOperation();
17 TraceLog.EndLogicalOperation();
18 TraceContext.Instance.Leave();
18 TraceContext.Instance.Leave();
19 },
19 },
20 () => {
20 reason => {
21 TraceContext.Instance.EnterLogicalOperation(op,true);
21 TraceContext.Instance.EnterLogicalOperation(op,true);
22 TraceLog.TraceInformation("promise cancelled");
22 TraceLog.TraceInformation("promise cancelled {0}", reason.Message);
23 TraceLog.EndLogicalOperation();
23 TraceLog.EndLogicalOperation();
24 TraceContext.Instance.Leave();
24 TraceContext.Instance.Leave();
25 }
25 }
26 );
26 );
27 }
27 }
28
28
29 public static IPromise EndLogicalOperation(this IPromise promise) {
29 public static IPromise EndLogicalOperation(this IPromise promise) {
30 Safe.ArgumentNotNull(promise, "promise");
30 Safe.ArgumentNotNull(promise, "promise");
31 var op = TraceContext.Instance.DetachLogicalOperation();
31 var op = TraceContext.Instance.DetachLogicalOperation();
32
32
33 return promise.On(() => {
33 return promise.On(() => {
34 TraceContext.Instance.EnterLogicalOperation(op,true);
34 TraceContext.Instance.EnterLogicalOperation(op,true);
35 TraceLog.EndLogicalOperation();
35 TraceLog.EndLogicalOperation();
36 TraceContext.Instance.Leave();
36 TraceContext.Instance.Leave();
37 }, PromiseEventType.All);
37 }, PromiseEventType.All);
38 }
38 }
39 }
39 }
40 }
40 }
41
41
@@ -1,10 +1,11
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5
5
6 namespace Implab {
6 namespace Implab {
7 public interface ICancellable {
7 public interface ICancellable {
8 void Cancel();
8 void Cancel();
9 void Cancel(Exception reason);
9 }
10 }
10 }
11 }
@@ -1,14 +1,24
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 /// <summary>
4 /// <summary>
5 /// Deferred result, usually used by asynchronous services as the service part of the promise.
5 /// Deferred result, usually used by asynchronous services as the service part of the promise.
6 /// </summary>
6 /// </summary>
7 public interface IDeferred : ICancellable {
7 public interface IDeferred : ICancellable {
8
8
9 void Resolve();
9 void Resolve();
10
10
11 /// <summary>
12 /// Reject the promise with the specified error.
13 /// </summary>
14 /// <param name="error">The reason why the promise is rejected.</param>
15 /// <remarks>
16 /// Some exceptions are treated in a special case:
17 /// <see cref="OperationCanceledException"/> is interpreted as call to <see cref="Cancel()"/> method,
18 /// and <see cref="PromiseTransientException"/> is always unwrapped and its
19 /// <see cref="PromiseTransientException.InnerException"> is used as the reason to reject promise.
20 /// </remarks>
11 void Reject(Exception error);
21 void Reject(Exception error);
12 }
22 }
13 }
23 }
14
24
@@ -1,135 +1,106
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5
5
6 namespace Implab {
6 namespace Implab {
7 public interface IPromise: ICancellable {
7 public interface IPromise: ICancellable {
8
8
9 /// <summary>
9 /// <summary>
10 /// Π’ΠΈΠΏ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌΠΎΠ³ΠΎ Ρ‡Π΅Ρ€Π΅Π· Π΄Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅.
10 /// Π’ΠΈΠΏ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌΠΎΠ³ΠΎ Ρ‡Π΅Ρ€Π΅Π· Π΄Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅.
11 /// </summary>
11 /// </summary>
12 Type PromiseType { get; }
12 Type PromiseType { get; }
13
13
14 /// <summary>
14 /// <summary>
15 /// ΠžΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ являСтся Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½Ρ‹ΠΌ, Π»ΠΈΠ±ΠΎ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎ, Π»ΠΈΠ±ΠΎ с ошибкой, Π»ΠΈΠ±ΠΎ ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½ΠΎ.
15 /// ΠžΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ являСтся Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½Ρ‹ΠΌ, Π»ΠΈΠ±ΠΎ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎ, Π»ΠΈΠ±ΠΎ с ошибкой, Π»ΠΈΠ±ΠΎ ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½ΠΎ.
16 /// </summary>
16 /// </summary>
17 bool IsResolved { get; }
17 bool IsResolved { get; }
18
18
19 /// <summary>
19 /// <summary>
20 /// ΠžΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π±Ρ‹Π»ΠΎ ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½ΠΎ.
20 /// ΠžΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π±Ρ‹Π»ΠΎ ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½ΠΎ.
21 /// </summary>
21 /// </summary>
22 bool IsCancelled { get; }
22 bool IsCancelled { get; }
23
23
24 /// <summary>
24 /// <summary>
25 /// Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС Π² Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π΅ выполнСния обСщания, Π»ΠΈΠ±ΠΎ ΠΏΡ€ΠΈΡ‡ΠΈΠ½Π° ΠΎΡ‚ΠΌΠ΅Π½Ρ‹.
26 /// </summary>
27 Exception Error { get; }
28
29 /// <summary>
25 /// Creates a new promise dependend on the current one and resolved on
30 /// Creates a new promise dependend on the current one and resolved on
26 /// executing the specified handlers.
31 /// executing the specified handlers.
27 /// </summary>
32 /// </summary>
28 /// <param name="success">The handler called on the successful promise completion.</param>
33 /// <param name="success">The handler called on the successful promise completion.</param>
29 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
34 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
30 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
35 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
31 /// <returns>The newly created dependant promise.</returns>
36 /// <returns>The newly created dependant promise.</returns>
32 /// <remarks>
37 /// <remarks>
33 /// <para>
38 /// <para>
34 /// If the success handler is specified the dependend promise will be resolved after the handler is
39 /// If the success handler is specified the dependend promise will be resolved after the handler is
35 /// executed and the dependent promise will be linked to the current one, i.e. the cancellation
40 /// executed and the dependent promise will be linked to the current one, i.e. the cancellation
36 /// of the dependent property will lead to the cancellation of the current promise. If the
41 /// of the dependent property will lead to the cancellation of the current promise. If the
37 /// success handler isn't specified the dependent promise will not be linked to and
42 /// success handler isn't specified the dependent promise will not be linked to and
38 /// will not be resolved after the successfull resolution of the current one.
43 /// will not be resolved after the successfull resolution of the current one.
39 /// </para>
44 /// </para>
40 /// <para>
45 /// <para>
41 /// When the error handler is specified, the exception raised during the current promise completion
46 /// When the error handler is specified, the exception raised during the current promise completion
42 /// will be passed to it as the parameter. If the error handler returns without raising an
47 /// will be passed to it as the parameter. If the error handler returns without raising an
43 /// exception then the dependant promise will be resolved successfully, otherwise the exception
48 /// exception then the dependant promise will be resolved successfully, otherwise the exception
44 /// raised by the handler will be transmitted to the dependent promise. If the handler wants
49 /// raised by the handler will be transmitted to the dependent promise. If the handler wants
45 /// to passthrough the original exception it needs to wrap the exception with
50 /// to passthrough the original exception it needs to wrap the exception with
46 /// the <see cref="PromiseTransientException"/>.
51 /// the <see cref="PromiseTransientException"/>. The handler may raise <see cref="OperationCanceledException"/>
52 /// to cancel the dependant promise, the innner exception specifies the reason why the promise
53 /// is canceled.
47 /// </para>
54 /// </para>
48 /// <para>
55 /// <para>
49 /// If the cancelation handler is specified and the current promise is cancelled then the dependent
56 /// If the cancelation handler is specified and the current promise is cancelled then the dependent
50 /// promise will be resolved after the handler is executed. If the cancelation hendler raises the
57 /// promise will be resolved after the handler is executed. If the cancelation handler raises the
51 /// exception it will be passed to the dependent promise.
58 /// exception it will be passed to the dependent promise.
52 /// </para>
59 /// </para>
53 /// </remarks>
60 /// </remarks>
54 IPromise Then(Action success, Action<Exception> error, Action cancel);
61 IPromise Then(Action success, Action<Exception> error, Action<Exception> cancel);
55 IPromise Then(Action success, Action<Exception> error);
62 IPromise Then(Action success, Action<Exception> error);
56 IPromise Then(Action success);
63 IPromise Then(Action success);
57
64
58 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel);
65 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel);
59 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error);
66 IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error);
60 IPromise Chain(Func<IPromise> chained);
67 IPromise Chain(Func<IPromise> chained);
61
68
62 /// <summary>
69 /// <summary>
63 /// Adds specified listeners to the current promise.
70 /// Adds specified listeners to the current promise.
64 /// </summary>
71 /// </summary>
65 /// <param name="success">The handler called on the successful promise completion.</param>
72 /// <param name="success">The handler called on the successful promise completion.</param>
66 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
73 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
67 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
74 /// <param name="cancel">The handler is called in case of promise cancellation.</param>
68 /// <returns>The current promise.</returns>
75 /// <returns>The current promise.</returns>
69 IPromise On(Action success, Action<Exception> error, Action cancel);
76 IPromise On(Action success, Action<Exception> error, Action<Exception> cancel);
70 IPromise On(Action success, Action<Exception> error);
77 IPromise On(Action success, Action<Exception> error);
71 IPromise On(Action success);
78 IPromise On(Action success);
72
79
73 /// <summary>
80 /// <summary>
74 /// Adds specified listeners to the current promise.
81 /// Adds specified listeners to the current promise.
75 /// </summary>
82 /// </summary>
76 /// <param name="handler">The handler called on the specified events.</param>
83 /// <param name="handler">The handler called on the specified events.</param>
77 /// <param name = "events">The combination of flags denoting the events for which the
84 /// <param name = "events">The combination of flags denoting the events for which the
78 /// handler shoud be called.</param>
85 /// handler shoud be called.</param>
79 /// <returns>The current promise.</returns>
86 /// <returns>The current promise.</returns>
80 IPromise On(Action handler, PromiseEventType events);
87 IPromise On(Action handler, PromiseEventType events);
81
88
82 /// <summary>
89 /// <summary>
83 /// Adds the specified error handler to the current promise
84 /// and creates the new dependant promise.
85 /// </summary>
86 /// <param name="error">
87 /// The error handler. If the error handler returns without
88 /// an error the dependant promise will be successfully resolved.
89 /// </param>
90 /// <returns>
91 /// The new dependant promise which will be resolved after the error
92 /// handler is executed.
93 /// </returns>
94 /// <remarks>
95 /// The successfull result of the current promise will be ignored.
96 /// </remarks>
97 IPromise Error(Action<Exception> error);
98
99 /// <summary>
100 /// Adds the specified cncellation handler to the current promise
101 /// and creates the new dependant promise.
102 /// </summary>
103 /// <returns>
104 /// The new dependant promise which will be resolved after the cancellation
105 /// handler is executed.
106 /// </returns>
107 /// <param name="handler">
108 /// The cancellation handler.
109 /// </param>
110 /// <remarks>
111 /// If the cancellation handler is executed without an error the dependent
112 /// promise will be successfully resolved, otherwise the raised exception
113 /// will be passed to the dependant promise. The successful result of the
114 /// current promise will be ignored.
115 /// </remarks>
116 IPromise Cancelled(Action handler);
117
118 /// <summary>
119 /// ΠŸΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΡƒΠ΅Ρ‚ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ обСщания ΠΊ Π·Π°Π΄Π°Π½Π½ΠΎΠΌΡƒ Ρ‚ΠΈΠΏΡƒ ΠΈ Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ Π½ΠΎΠ²ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅.
90 /// ΠŸΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΡƒΠ΅Ρ‚ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ обСщания ΠΊ Π·Π°Π΄Π°Π½Π½ΠΎΠΌΡƒ Ρ‚ΠΈΠΏΡƒ ΠΈ Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ Π½ΠΎΠ²ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅.
120 /// </summary>
91 /// </summary>
121 IPromise<T> Cast<T>();
92 IPromise<T> Cast<T>();
122
93
123 /// <summary>
94 /// <summary>
124 /// Π‘ΠΈΠ½Ρ…Ρ€ΠΎΠ½ΠΈΠ·ΠΈΡ€ΡƒΠ΅Ρ‚ Ρ‚Π΅ΠΊΡƒΡ‰ΠΈΠΉ ΠΏΠΎΡ‚ΠΎΠΊ с ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ΠΌ.
95 /// Π‘ΠΈΠ½Ρ…Ρ€ΠΎΠ½ΠΈΠ·ΠΈΡ€ΡƒΠ΅Ρ‚ Ρ‚Π΅ΠΊΡƒΡ‰ΠΈΠΉ ΠΏΠΎΡ‚ΠΎΠΊ с ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ΠΌ.
125 /// </summary>
96 /// </summary>
126 void Join();
97 void Join();
127 /// <summary>
98 /// <summary>
128 /// Π‘ΠΈΠ½Ρ…Ρ€ΠΎΠ½ΠΈΠ·ΠΈΡ€ΡƒΠ΅Ρ‚ Ρ‚Π΅ΠΊΡƒΡ‰ΠΈΠΉ ΠΏΠΎΡ‚ΠΎΠΊ с ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ΠΌ.
99 /// Π‘ΠΈΠ½Ρ…Ρ€ΠΎΠ½ΠΈΠ·ΠΈΡ€ΡƒΠ΅Ρ‚ Ρ‚Π΅ΠΊΡƒΡ‰ΠΈΠΉ ΠΏΠΎΡ‚ΠΎΠΊ с ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ΠΌ.
129 /// </summary>
100 /// </summary>
130 /// <param name="timeout">ВрСмя оТидания, ΠΏΠΎ Π΅Π³ΠΎ ΠΈΡΡ‚Π΅Ρ‡Π΅Π½ΠΈΡŽ Π²ΠΎΠ·Π½ΠΈΠΊΠ½Π΅Ρ‚ ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅.</param>
101 /// <param name="timeout">ВрСмя оТидания, ΠΏΠΎ Π΅Π³ΠΎ ΠΈΡΡ‚Π΅Ρ‡Π΅Π½ΠΈΡŽ Π²ΠΎΠ·Π½ΠΈΠΊΠ½Π΅Ρ‚ ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅.</param>
131 /// <exception cref="TimeoutException">ΠŸΡ€Π΅Π²Ρ‹ΡˆΠ΅Π½ΠΎ врСмя оТидания.</exception>
102 /// <exception cref="TimeoutException">ΠŸΡ€Π΅Π²Ρ‹ΡˆΠ΅Π½ΠΎ врСмя оТидания.</exception>
132 void Join(int timeout);
103 void Join(int timeout);
133
104
134 }
105 }
135 }
106 }
@@ -1,34 +1,30
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4 public interface IPromise<out T> : IPromise {
4 public interface IPromise<out T> : IPromise {
5
5
6 new T Join();
6 new T Join();
7
7
8 new T Join(int timeout);
8 new T Join(int timeout);
9
9
10 IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel);
10 IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel);
11
11
12 IPromise<T> On(Action<T> success, Action<Exception> error);
12 IPromise<T> On(Action<T> success, Action<Exception> error);
13
13
14 IPromise<T> On(Action<T> success);
14 IPromise<T> On(Action<T> success);
15
15
16 new IPromise<T> On(Action handler, PromiseEventType events);
16 new IPromise<T> On(Action handler, PromiseEventType events);
17
17
18 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error, Func<T2> cancel);
18 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<Exception, T2> cancel);
19
19
20 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error);
20 IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error);
21
21
22 IPromise<T2> Then<T2>(Func<T, T2> mapper);
22 IPromise<T2> Then<T2>(Func<T, T2> mapper);
23
23
24 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error, Func<IPromise<T2>> cancel);
24 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel);
25
25
26 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error);
26 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error);
27
27
28 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained);
28 IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained);
29
30 IPromise<T2> Error<T2>(Func<Exception,T2> error);
31
32 IPromise<T2> Cancelled<T2>(Func<T2> handler);
33 }
29 }
34 }
30 }
@@ -1,258 +1,222
1 using System;
1 using System;
2 using System.Diagnostics;
2 using System.Diagnostics;
3
3
4 namespace Implab {
4 namespace Implab {
5 public class Promise : AbstractPromise<Promise.HandlerDescriptor>, IPromise, IDeferred {
5 public class Promise : AbstractPromise<Promise.HandlerDescriptor>, IPromise, IDeferred {
6
6
7 public struct HandlerDescriptor {
7 public struct HandlerDescriptor {
8 readonly Action m_success;
8 readonly Action m_success;
9 readonly Action<Exception> m_error;
9 readonly Action<Exception> m_error;
10 readonly Action m_cancel;
10 readonly Action<Exception> m_cancel;
11 readonly IDeferred m_deferred;
11 readonly IDeferred m_deferred;
12
12
13 public HandlerDescriptor(Action success, Action<Exception> error, Action cancel, IDeferred deferred) {
13 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel, IDeferred deferred) {
14 m_success = success;
14 m_success = success;
15 m_error = error;
15 m_error = error;
16 m_cancel = cancel;
16 m_cancel = cancel;
17 m_deferred = deferred;
17 m_deferred = deferred;
18 }
18 }
19
19
20 public void SignalSuccess() {
20 public void SignalSuccess() {
21 if (m_success != null) {
21 if (m_success != null) {
22 try {
22 try {
23 m_success();
23 m_success();
24 if (m_deferred != null)
24 if (m_deferred != null)
25 m_deferred.Resolve();
25 m_deferred.Resolve();
26 } catch (Exception err) {
26 } catch (Exception err) {
27 SignalError(err);
27 SignalError(err);
28 }
28 }
29 }
29 }
30 }
30 }
31
31
32 public void SignalError(Exception err) {
32 public void SignalError(Exception err) {
33 if (m_error != null) {
33 if (m_error != null) {
34 try {
34 try {
35 m_error(err);
35 m_error(err);
36 if (m_deferred != null)
36 if (m_deferred != null)
37 m_deferred.Resolve();
37 m_deferred.Resolve();
38 } catch (Exception err2) {
38 } catch (Exception err2) {
39 if (m_deferred != null)
39 if (m_deferred != null)
40 m_deferred.Reject(err2);
40 m_deferred.Reject(err2);
41 }
41 }
42 } else {
42 } else {
43 if (m_deferred != null)
43 if (m_deferred != null)
44 m_deferred.Reject(err);
44 m_deferred.Reject(err);
45 }
45 }
46 }
46 }
47
47
48 public void SignalCancel() {
48 public void SignalCancel(Exception reason) {
49 if (m_cancel != null) {
49 if (m_cancel != null) {
50 try {
50 try {
51 m_cancel();
51 m_cancel(reason);
52 if (m_deferred != null)
53 m_deferred.Resolve();
54 } catch (Exception err) {
55 SignalError(err);
56 }
57 } else if (reason != null && m_error != null) {
58 try {
59 m_error(new OperationCanceledException("The operation was canceled.", reason));
52 if (m_deferred != null)
60 if (m_deferred != null)
53 m_deferred.Resolve();
61 m_deferred.Resolve();
54 } catch (Exception err) {
62 } catch (Exception err) {
55 SignalError(err);
63 SignalError(err);
56 }
64 }
57 } else {
65 } else {
58 if (m_deferred != null)
66 if (m_deferred != null)
59 m_deferred.Cancel();
67 m_deferred.Cancel(reason);
60 }
68 }
61 }
69 }
62 }
70 }
63
71
64 public void Resolve() {
72 public void Resolve() {
65 BeginSetResult();
73 BeginSetResult();
66 EndSetResult();
74 EndSetResult();
67 }
75 }
68
76
69 public void Reject(Exception error) {
77 public void Reject(Exception error) {
70 SetError(error);
78 SetError(error);
71 }
79 }
72
80
73 #region implemented abstract members of AbstractPromise
81 #region implemented abstract members of AbstractPromise
74
82
75 protected override void SignalSuccess(HandlerDescriptor handler) {
83 protected override void SignalSuccess(HandlerDescriptor handler) {
76 handler.SignalSuccess();
84 handler.SignalSuccess();
77 }
85 }
78
86
79 protected override void SignalError(HandlerDescriptor handler, Exception error) {
87 protected override void SignalError(HandlerDescriptor handler, Exception error) {
80 handler.SignalError(error);
88 handler.SignalError(error);
81 }
89 }
82
90
83 protected override void SignalCancelled(HandlerDescriptor handler) {
91 protected override void SignalCancelled(HandlerDescriptor handler, Exception reason) {
84 handler.SignalCancel();
92 handler.SignalCancel(reason);
85 }
93 }
86
94
87 protected override void Listen(PromiseEventType events, Action handler) {
95 protected override void Listen(PromiseEventType events, Action handler) {
88 AddHandler(new HandlerDescriptor(
96 AddHandler(new HandlerDescriptor(
89 events.HasFlag(PromiseEventType.Success) ? handler : null,
97 events.HasFlag(PromiseEventType.Success) ? handler : null,
90 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
98 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
91 events.HasFlag(PromiseEventType.Cancelled) ? handler : null,
99 events.HasFlag(PromiseEventType.Cancelled) ? new Action<Exception>(reason => handler()) : null,
92 null
100 null
93 ));
101 ));
94 }
102 }
95
103
96 #endregion
104 #endregion
97
105
98
106
99 public Type PromiseType {
107 public Type PromiseType {
100 get {
108 get {
101 return typeof(void);
109 return typeof(void);
102 }
110 }
103 }
111 }
104
112
105 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
113 public IPromise Then(Action success, Action<Exception> error, Action<Exception> cancel) {
106 var promise = new Promise();
114 var promise = new Promise();
107 if (success != null)
115 if (success != null)
108 promise.On(Cancel, PromiseEventType.Cancelled);
116 promise.On(Cancel, PromiseEventType.Cancelled);
109
117
110 AddHandler(new HandlerDescriptor(success, error, cancel, promise));
118 AddHandler(new HandlerDescriptor(success, error, cancel, promise));
111
119
112 return promise;
120 return promise;
113 }
121 }
114
122
115 public IPromise Then(Action success, Action<Exception> error) {
123 public IPromise Then(Action success, Action<Exception> error) {
116 return Then(success, error, null);
124 return Then(success, error, null);
117 }
125 }
118
126
119 public IPromise Then(Action success) {
127 public IPromise Then(Action success) {
120 return Then(success, null, null);
128 return Then(success, null, null);
121 }
129 }
122
130
123 public IPromise On(Action success, Action<Exception> error, Action cancel) {
131 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
124 AddHandler(new HandlerDescriptor(success, error, cancel, null));
132 AddHandler(new HandlerDescriptor(success, error, cancel, null));
125 return this;
133 return this;
126 }
134 }
127
135
128 public IPromise On(Action success, Action<Exception> error) {
136 public IPromise On(Action success, Action<Exception> error) {
129 return On(success, error, null);
137 return On(success, error, null);
130 }
138 }
131
139
132 public IPromise On(Action success) {
140 public IPromise On(Action success) {
133 return On(success, null, null);
141 return On(success, null, null);
134 }
142 }
135
143
136 public IPromise On(Action handler, PromiseEventType events) {
144 public IPromise On(Action handler, PromiseEventType events) {
137 return On(
145 return On(
138 events.HasFlag(PromiseEventType.Success) ? handler : null,
146 events.HasFlag(PromiseEventType.Success) ? handler : null,
139 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
147 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
140 events.HasFlag(PromiseEventType.Cancelled) ? handler : null
148 events.HasFlag(PromiseEventType.Cancelled) ? new Action<Exception>(reason => handler()) : null
141 );
149 );
142 }
150 }
143
151
144 public IPromise<T> Cast<T>() {
152 public IPromise<T> Cast<T>() {
145 throw new InvalidCastException();
153 throw new InvalidCastException();
146 }
154 }
147
155
148 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
156 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<Exception,IPromise> cancel) {
149 var medium = new Promise();
157 var medium = new Promise();
150
158
151 On(
159 On(
152 () => {
160 () => {
153 if (medium.IsCancelled)
161 if (medium.IsCancelled)
154 return;
162 return;
155 if (chained != null)
163 if (chained != null)
156 ConnectPromise(chained(), medium);
164 ConnectPromise(chained(), medium);
157 },
165 },
158 ex => {
166 ex => {
159 if (medium.IsCancelled)
167 if (medium.IsCancelled)
160 return;
168 return;
161 if (error != null) {
169 if (error != null) {
162 try {
170 try {
163 ConnectPromise(error(ex), medium);
171 ConnectPromise(error(ex), medium);
164 } catch (Exception ex2) {
172 } catch (Exception ex2) {
165 medium.Reject(ex2);
173 medium.Reject(ex2);
166 }
174 }
167 } else {
175 } else {
168 medium.Reject(ex);
176 medium.Reject(ex);
169 }
177 }
170 },
178 },
171 () => {
179 reason => {
172 if (medium.IsCancelled)
180 if (medium.IsCancelled)
173 return;
181 return;
174 if (cancel != null)
182 if (cancel != null)
175 ConnectPromise(cancel(), medium);
183 ConnectPromise(cancel(reason), medium);
176 else
184 else
177 medium.Cancel();
185 medium.Cancel(reason);
178 }
186 }
179 );
187 );
180
188
181 if (chained != null)
189 if (chained != null)
182 medium.On(Cancel, PromiseEventType.Cancelled);
190 medium.On(Cancel, PromiseEventType.Cancelled);
183
191
184 return medium;
192 return medium;
185 }
193 }
186
194
187 static void ConnectPromise(IPromise result, Promise medium) {
195 static void ConnectPromise(IPromise result, Promise medium) {
188 if (result != null) {
196 if (result != null) {
189 result.On(
197 result.On(
190 medium.Resolve,
198 medium.Resolve,
191 medium.Reject,
199 medium.Reject,
192 () => medium.Reject(new OperationCanceledException())
200 medium.Cancel
193 );
201 );
194 medium.On(result.Cancel, PromiseEventType.Cancelled);
202 medium.On(null,null,result.Cancel);
195 } else {
203 } else {
196 medium.Reject(
204 medium.Reject(
197 new NullReferenceException(
205 new NullReferenceException(
198 "The chained asynchronous operation returned" +
206 "The chained asynchronous operation returned" +
199 " 'null' where the promise instance is expected"
207 " 'null' where the promise instance is expected"
200 )
208 )
201 );
209 );
202 }
210 }
203 }
211 }
204
212
205 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
213 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
206 return Chain(chained, error, null);
214 return Chain(chained, error, null);
207 }
215 }
208
216
209 public IPromise Chain(Func<IPromise> chained) {
217 public IPromise Chain(Func<IPromise> chained) {
210 return Chain(chained, null, null);
218 return Chain(chained, null, null);
211 }
219 }
212
213 public IPromise Error(Action<Exception> error) {
214 var promise = new Promise();
215 On(
216 null,
217 err => {
218 if (error != null)
219 try {
220 error(err);
221 promise.Resolve();
222 } catch (Exception err2) {
223 promise.Reject(err2);
224 }
225 else
226 promise.Reject(err);
227 }
228 );
229
230 return promise;
231 }
232
233 public IPromise Cancelled(Action handler) {
234 var promise = new Promise();
235 On(
236 null,
237 null,
238 () => {
239 if (handler != null) {
240 try {
241 handler();
242 promise.Resolve();
243 } catch (Exception err) {
244 promise.Reject(err);
245 }
246 } else {
247 promise.Cancel();
248 }
249 }
250 );
251
252 return promise;
253 }
254
255
256 }
220 }
257 }
221 }
258
222
@@ -1,192 +1,192
1 using System.Threading;
1 using System.Threading;
2 using System;
2 using System;
3 using Implab.Diagnostics;
3 using Implab.Diagnostics;
4 using System.Collections.Generic;
4 using System.Collections.Generic;
5
5
6
6
7 #if NET_4_5
7 #if NET_4_5
8 using System.Threading.Tasks;
8 using System.Threading.Tasks;
9 #endif
9 #endif
10
10
11 namespace Implab {
11 namespace Implab {
12 public static class PromiseExtensions {
12 public static class PromiseExtensions {
13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
14 Safe.ArgumentNotNull(that, "that");
14 Safe.ArgumentNotNull(that, "that");
15 var context = SynchronizationContext.Current;
15 var context = SynchronizationContext.Current;
16 if (context == null)
16 if (context == null)
17 return that;
17 return that;
18
18
19 var p = new SyncContextPromise<T>(context);
19 var p = new SyncContextPromise<T>(context);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
21
21
22 that.On(
22 that.On(
23 p.Resolve,
23 p.Resolve,
24 p.Reject,
24 p.Reject,
25 p.Cancel
25 p.Cancel
26 );
26 );
27 return p;
27 return p;
28 }
28 }
29
29
30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
31 Safe.ArgumentNotNull(that, "that");
31 Safe.ArgumentNotNull(that, "that");
32 Safe.ArgumentNotNull(context, "context");
32 Safe.ArgumentNotNull(context, "context");
33
33
34 var p = new SyncContextPromise<T>(context);
34 var p = new SyncContextPromise<T>(context);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
36
36
37
37
38 that.On(
38 that.On(
39 p.Resolve,
39 p.Resolve,
40 p.Reject,
40 p.Reject,
41 p.Cancel
41 p.Cancel
42 );
42 );
43 return p;
43 return p;
44 }
44 }
45
45
46 /// <summary>
46 /// <summary>
47 /// Ensures the dispatched.
47 /// Ensures the dispatched.
48 /// </summary>
48 /// </summary>
49 /// <returns>The dispatched.</returns>
49 /// <returns>The dispatched.</returns>
50 /// <param name="that">That.</param>
50 /// <param name="that">That.</param>
51 /// <param name="head">Head.</param>
51 /// <param name="head">Head.</param>
52 /// <param name="cleanup">Cleanup.</param>
52 /// <param name="cleanup">Cleanup.</param>
53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
56 Safe.ArgumentNotNull(that, "that");
56 Safe.ArgumentNotNull(that, "that");
57 Safe.ArgumentNotNull(head, "head");
57 Safe.ArgumentNotNull(head, "head");
58
58
59 that.On(null,null,() => head.On(cleanup));
59 that.On(() => head.On(cleanup), PromiseEventType.Cancelled);
60
60
61 return that;
61 return that;
62 }
62 }
63
63
64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
65 Safe.ArgumentNotNull(that, "that");
65 Safe.ArgumentNotNull(that, "that");
66 Safe.ArgumentNotNull(callback, "callback");
66 Safe.ArgumentNotNull(callback, "callback");
67 var op = TraceContext.Instance.CurrentOperation;
67 var op = TraceContext.Instance.CurrentOperation;
68 return ar => {
68 return ar => {
69 TraceContext.Instance.EnterLogicalOperation(op,false);
69 TraceContext.Instance.EnterLogicalOperation(op,false);
70 try {
70 try {
71 that.Resolve(callback(ar));
71 that.Resolve(callback(ar));
72 } catch (Exception err) {
72 } catch (Exception err) {
73 that.Reject(err);
73 that.Reject(err);
74 } finally {
74 } finally {
75 TraceContext.Instance.Leave();
75 TraceContext.Instance.Leave();
76 }
76 }
77 };
77 };
78 }
78 }
79
79
80 static void CancelCallback(object cookie) {
80 static void CancelCallback(object cookie) {
81 ((ICancellable)cookie).Cancel();
81 ((ICancellable)cookie).Cancel();
82 }
82 }
83
83
84 /// <summary>
84 /// <summary>
85 /// Cancells promise after the specified timeout is elapsed.
85 /// Cancells promise after the specified timeout is elapsed.
86 /// </summary>
86 /// </summary>
87 /// <param name="that">The promise to cancel on timeout.</param>
87 /// <param name="that">The promise to cancel on timeout.</param>
88 /// <param name="milliseconds">The timeout in milliseconds.</param>
88 /// <param name="milliseconds">The timeout in milliseconds.</param>
89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
91 Safe.ArgumentNotNull(that, "that");
91 Safe.ArgumentNotNull(that, "that");
92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
93 that.On(timer.Dispose, PromiseEventType.All);
93 that.On(timer.Dispose, PromiseEventType.All);
94 return that;
94 return that;
95 }
95 }
96
96
97 public static IPromise Bundle(this ICollection<IPromise> that) {
97 public static IPromise Bundle(this ICollection<IPromise> that) {
98 Safe.ArgumentNotNull(that, "that");
98 Safe.ArgumentNotNull(that, "that");
99
99
100 int count = that.Count;
100 int count = that.Count;
101 int errors = 0;
101 int errors = 0;
102 var medium = new Promise();
102 var medium = new Promise();
103
103
104 if (count == 0) {
104 if (count == 0) {
105 medium.Resolve();
105 medium.Resolve();
106 return medium;
106 return medium;
107 }
107 }
108
108
109 medium.On(() => {
109 medium.On(() => {
110 foreach(var p2 in that)
110 foreach(var p2 in that)
111 p2.Cancel();
111 p2.Cancel();
112 }, PromiseEventType.ErrorOrCancel);
112 }, PromiseEventType.ErrorOrCancel);
113
113
114 foreach (var p in that)
114 foreach (var p in that)
115 p.On(
115 p.On(
116 () => {
116 () => {
117 if (Interlocked.Decrement(ref count) == 0)
117 if (Interlocked.Decrement(ref count) == 0)
118 medium.Resolve();
118 medium.Resolve();
119 },
119 },
120 error => {
120 error => {
121 if (Interlocked.Increment(ref errors) == 1)
121 if (Interlocked.Increment(ref errors) == 1)
122 medium.Reject(
122 medium.Reject(
123 new Exception("The dependency promise is failed", error)
123 new Exception("The dependency promise is failed", error)
124 );
124 );
125 },
125 },
126 () => {
126 reason => {
127 if (Interlocked.Increment(ref errors) == 1)
127 if (Interlocked.Increment(ref errors) == 1)
128 medium.Reject(
128 medium.Cancel(
129 new Exception("The dependency promise is cancelled")
129 new Exception("The dependency promise is cancelled")
130 );
130 );
131 }
131 }
132 );
132 );
133
133
134 return medium;
134 return medium;
135 }
135 }
136
136
137 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
137 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
138 Safe.ArgumentNotNull(that, "that");
138 Safe.ArgumentNotNull(that, "that");
139
139
140 int count = that.Count;
140 int count = that.Count;
141 int errors = 0;
141 int errors = 0;
142 var medium = new Promise<T[]>();
142 var medium = new Promise<T[]>();
143 var results = new T[that.Count];
143 var results = new T[that.Count];
144
144
145 medium.On(() => {
145 medium.On(() => {
146 foreach(var p2 in that)
146 foreach(var p2 in that)
147 p2.Cancel();
147 p2.Cancel();
148 }, PromiseEventType.ErrorOrCancel);
148 }, PromiseEventType.ErrorOrCancel);
149
149
150 int i = 0;
150 int i = 0;
151 foreach (var p in that) {
151 foreach (var p in that) {
152 var idx = i;
152 var idx = i;
153 p.On(
153 p.On(
154 x => {
154 x => {
155 results[idx] = x;
155 results[idx] = x;
156 if (Interlocked.Decrement(ref count) == 0)
156 if (Interlocked.Decrement(ref count) == 0)
157 medium.Resolve(results);
157 medium.Resolve(results);
158 },
158 },
159 error => {
159 error => {
160 if (Interlocked.Increment(ref errors) == 1)
160 if (Interlocked.Increment(ref errors) == 1)
161 medium.Reject(
161 medium.Reject(
162 new Exception("The dependency promise is failed", error)
162 new Exception("The dependency promise is failed", error)
163 );
163 );
164 },
164 },
165 () => {
165 reason => {
166 if (Interlocked.Increment(ref errors) == 1)
166 if (Interlocked.Increment(ref errors) == 1)
167 medium.Reject(
167 medium.Cancel(
168 new Exception("The dependency promise is cancelled")
168 new Exception("The dependency promise is cancelled", reason)
169 );
169 );
170 }
170 }
171 );
171 );
172 i++;
172 i++;
173 }
173 }
174
174
175 return medium;
175 return medium;
176 }
176 }
177
177
178 #if NET_4_5
178 #if NET_4_5
179
179
180 public static Task<T> GetTask<T>(this IPromise<T> that) {
180 public static Task<T> GetTask<T>(this IPromise<T> that) {
181 Safe.ArgumentNotNull(that, "that");
181 Safe.ArgumentNotNull(that, "that");
182 var tcs = new TaskCompletionSource<T>();
182 var tcs = new TaskCompletionSource<T>();
183
183
184 that.On(tcs.SetResult, tcs.SetException, tcs.SetCanceled);
184 that.On(tcs.SetResult, tcs.SetException, r => tcs.SetCanceled());
185
185
186 return tcs.Task;
186 return tcs.Task;
187 }
187 }
188
188
189 #endif
189 #endif
190 }
190 }
191 }
191 }
192
192
@@ -1,628 +1,574
1 using System;
1 using System;
2 using System.Diagnostics;
2 using System.Diagnostics;
3
3
4 namespace Implab {
4 namespace Implab {
5
5
6 /// <summary>
6 /// <summary>
7 /// Класс для асинхронного получСния Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ². Π’Π°ΠΊ Π½Π°Π·Ρ‹Π²Π°Π΅ΠΌΠΎΠ΅ "ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅".
7 /// Класс для асинхронного получСния Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ². Π’Π°ΠΊ Π½Π°Π·Ρ‹Π²Π°Π΅ΠΌΠΎΠ΅ "ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅".
8 /// </summary>
8 /// </summary>
9 /// <typeparam name="T">Π’ΠΈΠΏ ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌΠΎΠ³ΠΎ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°</typeparam>
9 /// <typeparam name="T">Π’ΠΈΠΏ ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌΠΎΠ³ΠΎ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°</typeparam>
10 /// <remarks>
10 /// <remarks>
11 /// <para>БСрвис ΠΏΡ€ΠΈ ΠΎΠ±Ρ€Π°Ρ‰Π΅Π½ΠΈΠΈ ΠΊ Π΅Π³ΠΎ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρƒ Π΄Π°Π΅Ρ‚ ΠΎΠ±Π΅Ρ‰Π°ΠΈΠ½ΠΈΠ΅ ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ,
11 /// <para>БСрвис ΠΏΡ€ΠΈ ΠΎΠ±Ρ€Π°Ρ‰Π΅Π½ΠΈΠΈ ΠΊ Π΅Π³ΠΎ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρƒ Π΄Π°Π΅Ρ‚ ΠΎΠ±Π΅Ρ‰Π°ΠΈΠ½ΠΈΠ΅ ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ,
12 /// ΠΊΠ»ΠΈΠ΅Π½Ρ‚ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ² Ρ‚Π°ΠΊΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ ΡƒΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ ряд ΠΎΠ±Ρ€Π°Ρ‚Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎ для получСния
12 /// ΠΊΠ»ΠΈΠ΅Π½Ρ‚ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ² Ρ‚Π°ΠΊΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ ΡƒΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ ряд ΠΎΠ±Ρ€Π°Ρ‚Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎ для получСния
13 /// событий выполнСния обСщания, Ρ‚ΠΎΠ΅ΡΡ‚ΡŒ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ ΠΈ прСдоставлСнии Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ².</para>
13 /// событий выполнСния обСщания, Ρ‚ΠΎΠ΅ΡΡ‚ΡŒ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ ΠΈ прСдоставлСнии Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ².</para>
14 /// <para>
14 /// <para>
15 /// ΠžΠ±Π΅Ρ‰Π΅Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ ΠΊΠ°ΠΊ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ, Ρ‚Π°ΠΊ ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ с ошибкой. Для подписки Π½Π°
15 /// ΠžΠ±Π΅Ρ‰Π΅Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ ΠΊΠ°ΠΊ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ, Ρ‚Π°ΠΊ ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ с ошибкой. Для подписки Π½Π°
16 /// Π΄Π°Π½Π½Ρ‹Π΅ события ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π΄ΠΎΠ»ΠΆΠ΅Π½ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Then</c>.
16 /// Π΄Π°Π½Π½Ρ‹Π΅ события ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π΄ΠΎΠ»ΠΆΠ΅Π½ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Then</c>.
17 /// </para>
17 /// </para>
18 /// <para>
18 /// <para>
19 /// БСрвис, Π² свою ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ, ΠΏΠΎ ΠΎΠΊΠΎΠ½Ρ‡Π°Π½ΠΈΡŽ выполнСния ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ (Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ с ошибкой),
19 /// БСрвис, Π² свою ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ, ΠΏΠΎ ΠΎΠΊΠΎΠ½Ρ‡Π°Π½ΠΈΡŽ выполнСния ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ (Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ с ошибкой),
20 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Resolve</c> Π»ΠΈΠ±ΠΎ <c>Reject</c> для оповСщСния ΠΊΠ»ΠΈΠ΅Ρ‚Π½Π° ΠΎ
20 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Resolve</c> Π»ΠΈΠ±ΠΎ <c>Reject</c> для оповСщСния ΠΊΠ»ΠΈΠ΅Ρ‚Π½Π° ΠΎ
21 /// Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ обСщания.
21 /// Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ обСщания.
22 /// </para>
22 /// </para>
23 /// <para>
23 /// <para>
24 /// Если сСрвСр успСл Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΅Ρ‰Π΅ Π΄ΠΎ Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π½Π° Π½Π΅Π³ΠΎ подписался,
24 /// Если сСрвСр успСл Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΅Ρ‰Π΅ Π΄ΠΎ Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π½Π° Π½Π΅Π³ΠΎ подписался,
25 /// Ρ‚ΠΎ Π² ΠΌΠΎΠΌΠ΅Π½Ρ‚ подписки ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π° Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΠ²ΡƒΡŽΡ‰ΠΈΠ΅ события Π² синхронном
25 /// Ρ‚ΠΎ Π² ΠΌΠΎΠΌΠ΅Π½Ρ‚ подписки ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π° Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΠ²ΡƒΡŽΡ‰ΠΈΠ΅ события Π² синхронном
26 /// Ρ€Π΅ΠΆΠΈΠΌΠ΅ ΠΈ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π±ΡƒΠ΄Π΅Ρ‚ ΠΎΠΏΠΎΠ²Π΅Ρ‰Π΅Π½ Π² любом случаС. Π˜Π½Π°Ρ‡Π΅, ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ Π΄ΠΎΠ±Π°Π²Π»ΡΡŽΡ‚ΡΡ Π²
26 /// Ρ€Π΅ΠΆΠΈΠΌΠ΅ ΠΈ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π±ΡƒΠ΄Π΅Ρ‚ ΠΎΠΏΠΎΠ²Π΅Ρ‰Π΅Π½ Π² любом случаС. Π˜Π½Π°Ρ‡Π΅, ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ Π΄ΠΎΠ±Π°Π²Π»ΡΡŽΡ‚ΡΡ Π²
27 /// список Π² порядкС подписания ΠΈ Π² этом ΠΆΠ΅ порядкС ΠΎΠ½ΠΈ Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ
27 /// список Π² порядкС подписания ΠΈ Π² этом ΠΆΠ΅ порядкС ΠΎΠ½ΠΈ Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ
28 /// обСщания.
28 /// обСщания.
29 /// </para>
29 /// </para>
30 /// <para>
30 /// <para>
31 /// ΠžΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Ρ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ обСщания ΠΌΠΎΠΆΠ½ΠΎ ΠΏΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΠΎΠ²Ρ‹Π²Π°Ρ‚ΡŒ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ Π»ΠΈΠ±ΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ
31 /// ΠžΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Ρ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ обСщания ΠΌΠΎΠΆΠ½ΠΎ ΠΏΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΠΎΠ²Ρ‹Π²Π°Ρ‚ΡŒ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ Π»ΠΈΠ±ΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ
32 /// связанныС асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Ρ‚Π°ΠΊΠΆΠ΅ Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°ΡŽΡ‚ обСщания. Для этого слСдуСт
32 /// связанныС асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Ρ‚Π°ΠΊΠΆΠ΅ Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°ΡŽΡ‚ обСщания. Для этого слСдуСт
33 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΡƒΡŽ Ρ„ΠΎΡ€ΠΌΡƒ ΠΌΠ΅Ρ‚ΠΎΠ΄Π΅ <c>Then</c>.
33 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΡƒΡŽ Ρ„ΠΎΡ€ΠΌΡƒ ΠΌΠ΅Ρ‚ΠΎΠ΄Π΅ <c>Then</c>.
34 /// </para>
34 /// </para>
35 /// <para>
35 /// <para>
36 /// Π’Π°ΠΊΠΆΠ΅ Ρ…ΠΎΡ€ΠΎΡˆΠΈΠΌ ΠΏΡ€Π°Π²ΠΈΠ»ΠΎΠΌ являСтся Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ <c>Resolve</c> ΠΈ <c>Reject</c> Π΄ΠΎΠ»ΠΆΠ΅Π½ Π²Ρ‹Π·Ρ‹Π²Π°Ρ‚ΡŒ
36 /// Π’Π°ΠΊΠΆΠ΅ Ρ…ΠΎΡ€ΠΎΡˆΠΈΠΌ ΠΏΡ€Π°Π²ΠΈΠ»ΠΎΠΌ являСтся Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ <c>Resolve</c> ΠΈ <c>Reject</c> Π΄ΠΎΠ»ΠΆΠ΅Π½ Π²Ρ‹Π·Ρ‹Π²Π°Ρ‚ΡŒ
37 /// Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠ°Ρ‚ΠΎΡ€ обСщания ΠΈΠ½Π°Ρ‡Π΅ ΠΌΠΎΠ³ΡƒΡ‚ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΡƒΡ‚ΡŒ противорСчия.
37 /// Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠ°Ρ‚ΠΎΡ€ обСщания ΠΈΠ½Π°Ρ‡Π΅ ΠΌΠΎΠ³ΡƒΡ‚ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΡƒΡ‚ΡŒ противорСчия.
38 /// </para>
38 /// </para>
39 /// </remarks>
39 /// </remarks>
40 public class Promise<T> : AbstractPromise<IDeferred<T>>, IPromise<T>, IDeferred<T> {
40 public class Promise<T> : AbstractPromise<IDeferred<T>>, IPromise<T>, IDeferred<T> {
41
41
42 class StubDeferred : IDeferred<T> {
42 class StubDeferred : IDeferred, IDeferred<T> {
43 public static readonly StubDeferred instance = new StubDeferred();
43 public static readonly StubDeferred instance = new StubDeferred();
44
44
45 StubDeferred() {
45 StubDeferred() {
46 }
46 }
47
47
48 #region IDeferred implementation
48 #region IDeferred implementation
49
49
50 public void Resolve(T value) {
50 public void Resolve(T value) {
51 }
51 }
52
52
53 public void Resolve() {
54 }
55
53 public void Reject(Exception error) {
56 public void Reject(Exception error) {
54 }
57 }
55
58
56 #endregion
59 #endregion
57
60
58 #region ICancellable implementation
61 #region ICancellable implementation
59
62
60 public void Cancel() {
63 public void Cancel() {
61 }
64 }
62
65
66 public void Cancel(Exception reason) {
67 }
68
63 #endregion
69 #endregion
64
70
65
71
66 }
72 }
67
73
68 class RemapDescriptor<T2> : IDeferred<T> {
74 class RemapDescriptor<T2> : IDeferred<T> {
69 readonly Func<T,T2> m_remap;
75 readonly Func<T,T2> m_remap;
70 readonly Func<Exception,T2> m_failed;
76 readonly Func<Exception, T2> m_failed;
71 readonly Func<T2> m_cancel;
77 readonly Func<Exception, T2> m_cancel;
72 readonly IDeferred<T2> m_deferred;
78 readonly IDeferred<T2> m_deferred;
73
79
74 public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<T2> cancel, IDeferred<T2> deferred ) {
80 public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<Exception, T2> cancel, IDeferred<T2> deferred ) {
75 Debug.Assert(deferred != null);
81 Debug.Assert(deferred != null);
76 m_remap = remap;
82 m_remap = remap;
77 m_failed = failed;
83 m_failed = failed;
78 m_cancel = cancel;
84 m_cancel = cancel;
79 m_deferred = deferred;
85 m_deferred = deferred;
80 }
86 }
81
87
82
88
83
89
84 #region IDeferred implementation
90 #region IDeferred implementation
85
91
86 public void Resolve(T value) {
92 public void Resolve(T value) {
87 if (m_remap != null) {
93 if (m_remap != null) {
88 try {
94 try {
89 m_deferred.Resolve(m_remap(value));
95 m_deferred.Resolve(m_remap(value));
90 } catch (Exception ex) {
96 } catch (Exception ex) {
91 Reject(ex);
97 Reject(ex);
92 }
98 }
93 }
99 }
94 }
100 }
95
101
96 public void Reject(Exception error) {
102 public void Reject(Exception error) {
97 if (m_failed != null) {
103 if (m_failed != null) {
98 try {
104 try {
99 m_deferred.Resolve(m_failed(error));
105 m_deferred.Resolve(m_failed(error));
100 } catch (Exception ex) {
106 } catch (Exception ex) {
101 m_deferred.Reject(ex);
107 m_deferred.Reject(ex);
102 }
108 }
103 } else {
109 } else {
104 m_deferred.Reject(error);
110 m_deferred.Reject(error);
105 }
111 }
106 }
112 }
107
113
108
114
109 #endregion
115 #endregion
110
116
111 #region ICancellable implementation
117 #region ICancellable implementation
112
118
113 public void Cancel() {
119 public void Cancel(Exception reason) {
114 if (m_cancel != null) {
120 if (m_cancel != null) {
115 try {
121 try {
116 m_deferred.Resolve(m_cancel());
122 m_deferred.Resolve(m_cancel(reason));
117 } catch (Exception ex) {
123 } catch (Exception ex) {
118 Reject(ex);
124 Reject(ex);
119 }
125 }
120 } else {
126 } else {
121 m_deferred.Cancel();
127 m_deferred.Cancel(reason);
122 }
128 }
123 }
129 }
124
130
131 public void Cancel() {
132 Cancel(null);
133 }
125 #endregion
134 #endregion
126 }
135 }
127
136
128 class ListenerDescriptor : IDeferred<T> {
137 class ListenerDescriptor : IDeferred<T> {
129 readonly Action m_handler;
138 readonly Action m_handler;
130 readonly PromiseEventType m_events;
139 readonly PromiseEventType m_events;
131
140
132 public ListenerDescriptor(Action handler, PromiseEventType events) {
141 public ListenerDescriptor(Action handler, PromiseEventType events) {
133 Debug.Assert(handler != null);
142 Debug.Assert(handler != null);
134
143
135 m_handler = handler;
144 m_handler = handler;
136 m_events = events;
145 m_events = events;
137 }
146 }
138
147
139 #region IDeferred implementation
148 #region IDeferred implementation
140
149
141 public void Resolve(T value) {
150 public void Resolve(T value) {
142 if (m_events.HasFlag(PromiseEventType.Success)) {
151 if (m_events.HasFlag(PromiseEventType.Success)) {
143 try {
152 try {
144 m_handler();
153 m_handler();
145 // Analysis disable once EmptyGeneralCatchClause
154 // Analysis disable once EmptyGeneralCatchClause
146 } catch {
155 } catch {
147 }
156 }
148 }
157 }
149 }
158 }
150
159
151 public void Reject(Exception error) {
160 public void Reject(Exception error) {
152 if (m_events.HasFlag(PromiseEventType.Error)){
161 if (m_events.HasFlag(PromiseEventType.Error)){
153 try {
162 try {
154 m_handler();
163 m_handler();
155 // Analysis disable once EmptyGeneralCatchClause
164 // Analysis disable once EmptyGeneralCatchClause
156 } catch {
165 } catch {
157 }
166 }
158 }
167 }
159 }
168 }
160
169
161 #endregion
170 #endregion
162
171
163 #region ICancellable implementation
172 #region ICancellable implementation
164
173
165 public void Cancel() {
174 public void Cancel() {
175 Cancel(null);
176 }
177
178 public void Cancel(Exception reason) {
166 if (m_events.HasFlag(PromiseEventType.Cancelled)){
179 if (m_events.HasFlag(PromiseEventType.Cancelled)){
167 try {
180 try {
168 m_handler();
181 m_handler();
169 // Analysis disable once EmptyGeneralCatchClause
182 // Analysis disable once EmptyGeneralCatchClause
170 } catch {
183 } catch {
171 }
184 }
172 }
185 }
173 }
186 }
174
187
175 #endregion
188 #endregion
176 }
189 }
177
190
178 class ValueEventDescriptor : IDeferred<T> {
191 class ValueEventDescriptor : IDeferred<T> {
179 readonly Action<T> m_success;
192 readonly Action<T> m_success;
180 readonly Action<Exception> m_failed;
193 readonly Action<Exception> m_failed;
181 readonly Action m_cancelled;
194 readonly Action<Exception> m_cancelled;
182 readonly IDeferred<T> m_deferred;
195 readonly IDeferred<T> m_deferred;
183
196
184 public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
197 public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action<Exception> cancelled, IDeferred<T> deferred) {
185 Debug.Assert(deferred != null);
198 Debug.Assert(deferred != null);
186
199
187 m_success = success;
200 m_success = success;
188 m_failed = failed;
201 m_failed = failed;
189 m_cancelled = cancelled;
202 m_cancelled = cancelled;
190 m_deferred = deferred;
203 m_deferred = deferred;
191 }
204 }
192
205
193 #region IDeferred implementation
206 #region IDeferred implementation
194
207
195 public void Resolve(T value) {
208 public void Resolve(T value) {
196 if (m_success != null) {
209 if (m_success != null) {
197 try {
210 try {
198 m_success(value);
211 m_success(value);
199 m_deferred.Resolve(value);
212 m_deferred.Resolve(value);
200 } catch (Exception ex) {
213 } catch (Exception ex) {
201 Reject(ex);
214 Reject(ex);
202 }
215 }
203 }
216 }
204 }
217 }
205
218
206 public void Reject(Exception error) {
219 public void Reject(Exception error) {
207 if (m_failed != null) {
220 if (m_failed != null) {
208 try {
221 try {
209 m_failed(error);
222 m_failed(error);
210 m_deferred.Resolve(default(T));
223 m_deferred.Resolve(default(T));
211 } catch(Exception ex) {
224 } catch(Exception ex) {
212 m_deferred.Reject(ex);
225 m_deferred.Reject(ex);
213 }
226 }
214 } else {
227 } else {
215 m_deferred.Reject(error);
228 m_deferred.Reject(error);
216 }
229 }
217 }
230 }
218
231
219 #endregion
232 #endregion
220
233
221 #region ICancellable implementation
234 #region ICancellable implementation
222
235
223 public void Cancel() {
236 public void Cancel(Exception reason) {
224 if (m_cancelled != null) {
237 if (m_cancelled != null) {
225 try {
238 try {
226 m_cancelled();
239 m_cancelled(reason);
227 m_deferred.Resolve(default(T));
240 m_deferred.Resolve(default(T));
228 } catch(Exception ex) {
241 } catch (Exception ex) {
229 Reject(ex);
242 Reject(ex);
230 }
243 }
231 } else {
244 } else {
232 m_deferred.Cancel();
245 m_deferred.Cancel(reason);
246 }
233 }
247 }
248
249 public void Cancel() {
250 Cancel(null);
234 }
251 }
235
252
236 #endregion
253 #endregion
237 }
254 }
238
255
239 public class EventDescriptor : IDeferred<T> {
256 public class EventDescriptor : IDeferred<T> {
240 readonly Action m_success;
257 readonly Action m_success;
241 readonly Action<Exception> m_failed;
258 readonly Action<Exception> m_failed;
242 readonly Action m_cancelled;
259 readonly Action<Exception> m_cancelled;
243 readonly IDeferred<T> m_deferred;
260 readonly IDeferred m_deferred;
244
261
245 public EventDescriptor(Action success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
262 public EventDescriptor(Action success, Action<Exception> failed, Action<Exception> cancelled, IDeferred deferred) {
246 Debug.Assert(deferred != null);
263 Debug.Assert(deferred != null);
247
264
248 m_success = success;
265 m_success = success;
249 m_failed = failed;
266 m_failed = failed;
250 m_cancelled = cancelled;
267 m_cancelled = cancelled;
251 m_deferred = deferred;
268 m_deferred = deferred;
252 }
269 }
253
270
254 #region IDeferred implementation
271 #region IDeferred implementation
255
272
256 public void Resolve(T value) {
273 public void Resolve(T value) {
257 if (m_success != null) {
274 if (m_success != null) {
258 try {
275 try {
259 m_success();
276 m_success();
260 m_deferred.Resolve(value);
277 m_deferred.Resolve();
261 } catch (Exception ex) {
278 } catch (Exception ex) {
262 Reject(ex);
279 Reject(ex);
263 }
280 }
264 }
281 }
265 }
282 }
266
283
267 public void Reject(Exception error) {
284 public void Reject(Exception error) {
268 if (m_failed != null) {
285 if (m_failed != null) {
269 try {
286 try {
270 m_failed(error);
287 m_failed(error);
271 m_deferred.Resolve(default(T));
288 m_deferred.Resolve();
272 }catch (Exception ex)
289 } catch (Exception ex) {
273 {
274 m_deferred.Reject(ex);
290 m_deferred.Reject(ex);
275 }
291 }
276 } else {
292 } else {
277 m_deferred.Reject(error);
293 m_deferred.Reject(error);
278 }
294 }
279
280 }
295 }
281
296
282 #endregion
297 #endregion
283
298
284 #region ICancellable implementation
299 #region ICancellable implementation
285
300
286 public void Cancel() {
301 public void Cancel(Exception reason) {
287 if (m_cancelled != null) {
302 if (m_cancelled != null) {
288 try {
303 try {
289 m_cancelled();
304 m_cancelled(reason);
290 m_deferred.Resolve(default(T));
305 m_deferred.Resolve();
291 } catch (Exception ex) {
306 } catch (Exception ex) {
292 Reject(ex);
307 Reject(ex);
293 }
308 }
294 } else {
309 } else {
295 m_deferred.Cancel();
310 m_deferred.Cancel(reason);
311 }
296 }
312 }
313
314 public void Cancel() {
315 Cancel(null);
297 }
316 }
298
317
299 #endregion
318 #endregion
300 }
319 }
301
320
302 T m_result;
321 T m_result;
303
322
304 public virtual void Resolve(T value) {
323 public virtual void Resolve(T value) {
305 if (BeginSetResult()) {
324 if (BeginSetResult()) {
306 m_result = value;
325 m_result = value;
307 EndSetResult();
326 EndSetResult();
308 }
327 }
309 }
328 }
310
329
311 public void Reject(Exception error) {
330 public void Reject(Exception error) {
312 SetError(error);
331 SetError(error);
313 }
332 }
314
333
315 public Type PromiseType {
334 public Type PromiseType {
316 get {
335 get {
317 return typeof(T);
336 return typeof(T);
318 }
337 }
319 }
338 }
320
339
321 public new T Join() {
340 public new T Join() {
322 WaitResult(-1);
341 WaitResult(-1);
323 return m_result;
342 return m_result;
324 }
343 }
325 public new T Join(int timeout) {
344 public new T Join(int timeout) {
326 WaitResult(timeout);
345 WaitResult(timeout);
327 return m_result;
346 return m_result;
328 }
347 }
329
348
330 public IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel) {
349 public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) {
331 AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance));
350 AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance));
332 return this;
351 return this;
333 }
352 }
334
353
335 public IPromise<T> On(Action<T> success, Action<Exception> error) {
354 public IPromise<T> On(Action<T> success, Action<Exception> error) {
336 AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance));
355 AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance));
337 return this;
356 return this;
338 }
357 }
339
358
340 public IPromise<T> On(Action<T> success) {
359 public IPromise<T> On(Action<T> success) {
341 AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance));
360 AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance));
342 return this;
361 return this;
343 }
362 }
344
363
345 public IPromise<T> On(Action handler, PromiseEventType events) {
364 public IPromise<T> On(Action handler, PromiseEventType events) {
346 Listen(events, handler);
365 Listen(events, handler);
347 return this;
366 return this;
348 }
367 }
349
368
350 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<T2> cancel) {
369 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<Exception, T2> cancel) {
351 var promise = new Promise<T2>();
370 var promise = new Promise<T2>();
352 if (mapper != null)
371 if (mapper != null)
353 promise.On(Cancel, PromiseEventType.Cancelled);
372 promise.On((Action)null, null, Cancel);
354 AddHandler(new RemapDescriptor<T2>(mapper, error, cancel, promise));
373 AddHandler(new RemapDescriptor<T2>(mapper, error, cancel, promise));
355 return promise;
374 return promise;
356 }
375 }
357
376
358 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error) {
377 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error) {
359 var promise = new Promise<T2>();
378 var promise = new Promise<T2>();
360 if (mapper != null)
379 if (mapper != null)
361 promise.On(Cancel, PromiseEventType.Cancelled);
380 promise.On((Action)null, null, Cancel);
362 AddHandler(new RemapDescriptor<T2>(mapper, error, null, promise));
381 AddHandler(new RemapDescriptor<T2>(mapper, error, null, promise));
363 return promise;
382 return promise;
364 }
383 }
365
384
366 public IPromise<T2> Then<T2>(Func<T, T2> mapper) {
385 public IPromise<T2> Then<T2>(Func<T, T2> mapper) {
367 var promise = new Promise<T2>();
386 var promise = new Promise<T2>();
368 if (mapper != null)
387 if (mapper != null)
369 promise.On(Cancel, PromiseEventType.Cancelled);
388 promise.On((Action)null, null, Cancel);
370 AddHandler(new RemapDescriptor<T2>(mapper, null, null, promise));
389 AddHandler(new RemapDescriptor<T2>(mapper, null, null, promise));
371 return promise;
390 return promise;
372 }
391 }
373
392
374 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<IPromise<T2>> cancel) {
393 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
375 // this promise will be resolved when an asyc operation is started
394 // this promise will be resolved when an asyc operation is started
376 var promise = new Promise<IPromise<T2>>();
395 var promise = new Promise<IPromise<T2>>();
377
396
378 AddHandler(new RemapDescriptor<IPromise<T2>>(
397 AddHandler(new RemapDescriptor<IPromise<T2>>(
379 chained,
398 chained,
380 error,
399 error,
381 cancel,
400 cancel,
382 promise
401 promise
383 ));
402 ));
384
403
385 var medium = new Promise<T2>();
404 var medium = new Promise<T2>();
386
405
387 if (chained != null)
406 if (chained != null)
388 medium.On(Cancel, PromiseEventType.Cancelled);
407 medium.On(Cancel, PromiseEventType.Cancelled);
389
408
390 // we need to connect started async operation with the medium
409 // we need to connect started async operation with the medium
391 // if the async operation hasn't been started by the some reason
410 // if the async operation hasn't been started by the some reason
392 // report is to the medium
411 // report is to the medium
393 promise.On(
412 promise.On(
394 result => ConnectPromise<T2>(result, medium),
413 result => ConnectPromise<T2>(result, medium),
395 medium.Reject,
414 medium.Reject,
396 medium.Cancel
415 medium.Cancel
397 );
416 );
398
417
399 return medium;
418 return medium;
400 }
419 }
401
420
402 static void ConnectPromise<T2>(IPromise<T2> result, Promise<T2> medium) {
421 static void ConnectPromise<T2>(IPromise<T2> result, Promise<T2> medium) {
403 if (result != null) {
422 if (result != null) {
404 result.On(
423 result.On(
405 medium.Resolve,
424 medium.Resolve,
406 medium.Reject,
425 medium.Reject,
407 () => medium.Reject(new OperationCanceledException())
426 medium.Cancel
408 );
427 );
409 medium.On(result.Cancel, PromiseEventType.Cancelled);
428 medium.On(result.Cancel, PromiseEventType.Cancelled);
410 } else {
429 } else {
411 medium.Reject(
430 medium.Reject(
412 new NullReferenceException(
431 new NullReferenceException(
413 "The chained asynchronous operation returned" +
432 "The chained asynchronous operation returned" +
414 " 'null' where the promise instance is expected"
433 " 'null' where the promise instance is expected"
415 )
434 )
416 );
435 );
417 }
436 }
418 }
437 }
419
438
420 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error) {
439 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error) {
421 return Chain(chained, error, null);
440 return Chain(chained, error, null);
422 }
441 }
423
442
424 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained) {
443 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained) {
425 return Chain(chained, null, null);
444 return Chain(chained, null, null);
426 }
445 }
427
446
428 public IPromise<T2> Error<T2>(Func<Exception, T2> error) {
447 public IPromise Then(Action success, Action<Exception> error, Action<Exception> cancel) {
429 var promise = new Promise<T2>();
448 var promise = new Promise();
430 if (error != null)
431 On(
432 (Action<T>)null,
433 ex => {
434 try {
435 promise.Resolve(error(ex));
436 } catch (Exception ex2) {
437 promise.Reject(ex2);
438 }
439 }
440 );
441 else
442 Listen(PromiseEventType.Error, () => promise.Resolve(default(T2)));
443 return promise;
444 }
445
446 public IPromise<T2> Cancelled<T2>(Func<T2> handler) {
447 var promise = new Promise<T2>();
448 if (handler != null)
449 On(
450 (Action<T>)null,
451 null,
452 () => {
453 try {
454 promise.Resolve(handler());
455 } catch (Exception ex) {
456 promise.Reject(ex);
457 }
458 });
459 else
460 Listen(PromiseEventType.Cancelled, () => promise.Resolve(default(T2)));
461 return promise;
462 }
463
464 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
465 var promise = new Promise<T>();
466 if (success != null)
449 if (success != null)
467 promise.On(Cancel, PromiseEventType.Cancelled);
450 promise.On(null, null, Cancel);
468
451
469 AddHandler(new EventDescriptor(success, error, cancel, promise));
452 AddHandler(new EventDescriptor(success, error, cancel, promise));
470
453
471 return promise;
454 return promise;
472 }
455 }
473
456
474 public IPromise Then(Action success, Action<Exception> error) {
457 public IPromise Then(Action success, Action<Exception> error) {
475 return Then(success, error, null);
458 return Then(success, error, null);
476 }
459 }
477
460
478 public IPromise Then(Action success) {
461 public IPromise Then(Action success) {
479 return Then(success, null, null);
462 return Then(success, null, null);
480 }
463 }
481
464
482 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
465 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) {
483 var promise = new Promise<IPromise>();
466 var promise = new Promise<IPromise>();
484
467
485 AddHandler(
468 AddHandler(
486 new RemapDescriptor<IPromise>(
469 new RemapDescriptor<IPromise>(
487 x => chained(),
470 x => chained(),
488 error,
471 error,
489 cancel,
472 cancel,
490 promise
473 promise
491 )
474 )
492 );
475 );
493
476
494 var medium = new Promise();
477 var medium = new Promise();
495 if (chained != null)
478 if (chained != null)
496 medium.On(Cancel, PromiseEventType.Cancelled);
479 medium.On(null, null, Cancel);
497
480
498 promise.On(
481 promise.On(
499 result => ConnectPromise(result, medium),
482 result => ConnectPromise(result, medium),
500 medium.Reject,
483 medium.Reject,
501 medium.Cancel
484 medium.Cancel
502 );
485 );
503
486
504 return medium;
487 return medium;
505 }
488 }
506
489
507 static void ConnectPromise(IPromise result, Promise medium) {
490 static void ConnectPromise(IPromise result, Promise medium) {
508 if (result != null) {
491 if (result != null) {
509 result.On(
492 result.On(
510 medium.Resolve,
493 medium.Resolve,
511 medium.Reject,
494 medium.Reject,
512 () => medium.Reject(new OperationCanceledException())
495 medium.Cancel
513 );
496 );
514 medium.On(result.Cancel, PromiseEventType.Cancelled);
497 medium.On(null, null, result.Cancel);
515 } else {
498 } else {
516 medium.Reject(
499 medium.Reject(
517 new NullReferenceException(
500 new NullReferenceException(
518 "The chained asynchronous operation returned" +
501 "The chained asynchronous operation returned" +
519 " 'null' where the promise instance is expected"
502 " 'null' where the promise instance is expected"
520 )
503 )
521 );
504 );
522 }
505 }
523 }
506 }
524
507
525 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
508 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
526 return Chain(chained, error, null);
509 return Chain(chained, error, null);
527 }
510 }
528
511
529 public IPromise Chain(Func<IPromise> chained) {
512 public IPromise Chain(Func<IPromise> chained) {
530 return Chain(chained, null, null);
513 return Chain(chained, null, null);
531 }
514 }
532
515
533 public IPromise On(Action success, Action<Exception> error, Action cancel) {
516 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
534 AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance));
517 AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance));
535 return this;
518 return this;
536 }
519 }
537
520
538 public IPromise On(Action success, Action<Exception> error) {
521 public IPromise On(Action success, Action<Exception> error) {
539 AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance));
522 AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance));
540 return this;
523 return this;
541 }
524 }
542
525
543 public IPromise On(Action success) {
526 public IPromise On(Action success) {
544 Listen(PromiseEventType.Success, success);
527 Listen(PromiseEventType.Success, success);
545 return this;
528 return this;
546 }
529 }
547
530
548 IPromise IPromise.On(Action handler, PromiseEventType events) {
531 IPromise IPromise.On(Action handler, PromiseEventType events) {
549 Listen(events,handler);
532 Listen(events,handler);
550 return this;
533 return this;
551 }
534 }
552
535
553 public IPromise Error(Action<Exception> error) {
554 var promise = new Promise();
555 if (error != null)
556 On(
557 (Action<T>)null,
558 ex => {
559 try {
560 error(ex);
561 promise.Resolve();
562 } catch (Exception ex2) {
563 promise.Reject(ex2);
564 }
565 });
566 else
567 Listen(PromiseEventType.Error, promise.Resolve);
568 return promise;
569 }
570
571 public IPromise Cancelled(Action handler) {
572 var promise = new Promise();
573 if (handler != null)
574 On(
575 (Action<T>)null,
576 null,
577 () => {
578 try {
579 handler();
580 promise.Resolve();
581 } catch (Exception ex) {
582 promise.Reject(ex);
583 }
584 });
585 else
586 Listen(PromiseEventType.Cancelled, promise.Resolve);
587 return promise;
588 }
589
590 public IPromise<T2> Cast<T2>() {
536 public IPromise<T2> Cast<T2>() {
591 return (IPromise<T2>)this;
537 return (IPromise<T2>)this;
592 }
538 }
593
539
594 #region implemented abstract members of AbstractPromise
540 #region implemented abstract members of AbstractPromise
595
541
596 protected override void SignalSuccess(IDeferred<T> handler) {
542 protected override void SignalSuccess(IDeferred<T> handler) {
597 handler.Resolve(m_result);
543 handler.Resolve(m_result);
598 }
544 }
599
545
600 protected override void SignalError(IDeferred<T> handler, Exception error) {
546 protected override void SignalError(IDeferred<T> handler, Exception error) {
601 handler.Reject(error);
547 handler.Reject(error);
602 }
548 }
603
549
604 protected override void SignalCancelled(IDeferred<T> handler) {
550 protected override void SignalCancelled(IDeferred<T> handler, Exception reason) {
605 handler.Cancel();
551 handler.Cancel(reason);
606 }
552 }
607
553
608 protected override void Listen(PromiseEventType events, Action handler) {
554 protected override void Listen(PromiseEventType events, Action handler) {
609 if (handler != null)
555 if (handler != null)
610 AddHandler(new ListenerDescriptor(handler, events));
556 AddHandler(new ListenerDescriptor(handler, events));
611 }
557 }
612
558
613 #endregion
559 #endregion
614
560
615 public static IPromise<T> ResultToPromise(T value) {
561 public static IPromise<T> ResultToPromise(T value) {
616 var p = new Promise<T>();
562 var p = new Promise<T>();
617 p.Resolve(value);
563 p.Resolve(value);
618 return p;
564 return p;
619 }
565 }
620
566
621 public static IPromise<T> ExceptionToPromise(Exception error) {
567 public static IPromise<T> ExceptionToPromise(Exception error) {
622 var p = new Promise<T>();
568 var p = new Promise<T>();
623 p.Reject(error);
569 p.Reject(error);
624 return p;
570 return p;
625 }
571 }
626
572
627 }
573 }
628 }
574 }
@@ -1,33 +1,32
1 using System;
1 using System;
2
2
3 namespace Implab {
3 namespace Implab {
4
5 [Serializable]
4 [Serializable]
6 public class PromiseTransientException : Exception {
5 public class PromiseTransientException : Exception {
7 /// <summary>
6 /// <summary>
8 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class.
7 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class.
9 /// </summary>
8 /// </summary>
10 /// <param name="inner">The exception that is the cause of the current exception.</param>
9 /// <param name="inner">The exception that is the cause of the current exception.</param>
11 public PromiseTransientException(Exception inner) : base("The preceding promise has failed", inner) {
10 public PromiseTransientException(Exception inner) : base("The preceding promise has failed", inner) {
12 }
11 }
13
12
14 /// <summary>
13 /// <summary>
15 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class
14 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class
16 /// </summary>
15 /// </summary>
17 /// <param name="message">A <see cref="T:System.String"/> that describes the exception. </param>
16 /// <param name="message">A <see cref="T:System.String"/> that describes the exception. </param>
18 /// <param name="inner">The exception that is the cause of the current exception. </param>
17 /// <param name="inner">The exception that is the cause of the current exception. </param>
19 public PromiseTransientException(string message, Exception inner)
18 public PromiseTransientException(string message, Exception inner)
20 : base(message, inner) {
19 : base(message, inner) {
21 }
20 }
22
21
23 /// <summary>
22 /// <summary>
24 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class
23 /// Initializes a new instance of the <see cref="PromiseTransientException"/> class
25 /// </summary>
24 /// </summary>
26 /// <param name="context">The contextual information about the source or destination.</param>
25 /// <param name="context">The contextual information about the source or destination.</param>
27 /// <param name="info">The object that holds the serialized object data.</param>
26 /// <param name="info">The object that holds the serialized object data.</param>
28 protected PromiseTransientException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
27 protected PromiseTransientException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
29 : base(info, context) {
28 : base(info, context) {
30 }
29 }
31 }
30 }
32 }
31 }
33
32
@@ -1,25 +1,26
1 using System.Threading;
1 using System.Threading;
2 using System;
2
3
3 namespace Implab {
4 namespace Implab {
4 public class SyncContextPromise<T> : Promise<T> {
5 public class SyncContextPromise<T> : Promise<T> {
5 readonly SynchronizationContext m_context;
6 readonly SynchronizationContext m_context;
6
7
7 public SyncContextPromise(SynchronizationContext context) {
8 public SyncContextPromise(SynchronizationContext context) {
8 Safe.ArgumentNotNull(context, "context");
9 Safe.ArgumentNotNull(context, "context");
9 m_context = context;
10 m_context = context;
10 }
11 }
11
12
12 protected override void SignalSuccess(IDeferred<T> handler) {
13 protected override void SignalSuccess(IDeferred<T> handler) {
13 m_context.Post(x => base.SignalSuccess(handler), null);
14 m_context.Post(x => base.SignalSuccess(handler), null);
14 }
15 }
15
16
16 protected override void SignalError(IDeferred<T> handler, System.Exception error) {
17 protected override void SignalError(IDeferred<T> handler, Exception error) {
17 m_context.Post(x => base.SignalError(handler, error), null);
18 m_context.Post(x => base.SignalError(handler, error), null);
18 }
19 }
19
20
20 protected override void SignalCancelled(IDeferred<T> handler) {
21 protected override void SignalCancelled(IDeferred<T> handler, Exception reason) {
21 m_context.Post(x => base.SignalCancelled(handler), null);
22 m_context.Post(x => base.SignalCancelled(handler, reason), null);
22 }
23 }
23 }
24 }
24 }
25 }
25
26
@@ -1,136 +1,143
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6
6
7 namespace Implab
7 namespace Implab
8 {
8 {
9 /// <summary>
9 /// <summary>
10 /// This class allows to interact with asyncronuos task.
10 /// This class allows to interact with asyncronuos task.
11 /// </summary>
11 /// </summary>
12 /// <remarks>
12 /// <remarks>
13 /// Members of this object are thread safe.
13 /// Members of this object are thread safe.
14 /// </remarks>
14 /// </remarks>
15 public class TaskController: IProgressNotifier, ITaskController
15 public class TaskController: IProgressNotifier, ITaskController
16 {
16 {
17 readonly object m_lock;
17 readonly object m_lock;
18 string m_message;
18 string m_message;
19
19
20 float m_current;
20 float m_current;
21 float m_max;
21 float m_max;
22
22
23 bool m_cancelled;
23 bool m_cancelled;
24
24
25 public event EventHandler Cancelled;
25 public event EventHandler Cancelled;
26 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
26 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
27 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
27 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
28 public event EventHandler<ProgressInitEventArgs> ProgressInit;
28 public event EventHandler<ProgressInitEventArgs> ProgressInit;
29
29
30 public TaskController()
30 public TaskController()
31 {
31 {
32 m_lock = new Object();
32 m_lock = new Object();
33 }
33 }
34
34
35 public string Message
35 public string Message
36 {
36 {
37 get
37 get
38 {
38 {
39 lock (m_lock)
39 lock (m_lock)
40 return m_message;
40 return m_message;
41 }
41 }
42 set
42 set
43 {
43 {
44 lock (m_lock)
44 lock (m_lock)
45 {
45 {
46 m_message = value;
46 m_message = value;
47 OnMessageUpdated();
47 OnMessageUpdated();
48 }
48 }
49 }
49 }
50 }
50 }
51
51
52 public float CurrentProgress
52 public float CurrentProgress
53 {
53 {
54 get
54 get
55 {
55 {
56 lock (m_lock)
56 lock (m_lock)
57 return m_current;
57 return m_current;
58 }
58 }
59 set
59 set
60 {
60 {
61 lock (m_lock)
61 lock (m_lock)
62 {
62 {
63 var prev = m_current;
63 var prev = m_current;
64 m_current = value;
64 m_current = value;
65 if (m_current >= m_max)
65 if (m_current >= m_max)
66 m_current = m_max;
66 m_current = m_max;
67 if (m_current != prev)
67 if (m_current != prev)
68 OnProgressUpdated();
68 OnProgressUpdated();
69 }
69 }
70 }
70 }
71 }
71 }
72
72
73 public void InitProgress(float current, float max, string message)
73 public void InitProgress(float current, float max, string message)
74 {
74 {
75 if (max < 0)
75 if (max < 0)
76 throw new ArgumentOutOfRangeException("max");
76 throw new ArgumentOutOfRangeException("max");
77 if (current < 0 || current > max)
77 if (current < 0 || current > max)
78 throw new ArgumentOutOfRangeException("current");
78 throw new ArgumentOutOfRangeException("current");
79
79
80 lock(m_lock) {
80 lock(m_lock) {
81 m_current = current;
81 m_current = current;
82 m_max = max;
82 m_max = max;
83 m_message = message;
83 m_message = message;
84 OnProgressInit();
84 OnProgressInit();
85 }
85 }
86 }
86 }
87
87
88 public bool IsCancelled {
88 public bool IsCancelled {
89 get {
89 get {
90 lock (m_lock)
90 lock (m_lock)
91 return m_cancelled;
91 return m_cancelled;
92 }
92 }
93 }
93 }
94
94
95 public void Cancel() {
95 public void Cancel() {
96 lock (m_lock) {
96 lock (m_lock) {
97 if (!m_cancelled)
97 if (!m_cancelled)
98 m_cancelled = true;
98 m_cancelled = true;
99 }
99 }
100 }
100 }
101
101
102 public void Cancel(Exception reason) {
103 lock (m_lock) {
104 if (!m_cancelled)
105 m_cancelled = true;
106 }
107 }
108
102 protected virtual void OnCancelled() {
109 protected virtual void OnCancelled() {
103 var temp = Cancelled;
110 var temp = Cancelled;
104 if (temp != null) {
111 if (temp != null) {
105 temp(this,new EventArgs());
112 temp(this,new EventArgs());
106 }
113 }
107 }
114 }
108
115
109 protected virtual void OnMessageUpdated()
116 protected virtual void OnMessageUpdated()
110 {
117 {
111 var temp = MessageUpdated;
118 var temp = MessageUpdated;
112 if (temp != null)
119 if (temp != null)
113 {
120 {
114 temp(this, new ValueEventArgs<string>(m_message));
121 temp(this, new ValueEventArgs<string>(m_message));
115 }
122 }
116 }
123 }
117
124
118 protected virtual void OnProgressUpdated()
125 protected virtual void OnProgressUpdated()
119 {
126 {
120 var temp = ProgressUpdated;
127 var temp = ProgressUpdated;
121 if (temp != null)
128 if (temp != null)
122 {
129 {
123 temp(this,new ValueEventArgs<float>(m_current));
130 temp(this,new ValueEventArgs<float>(m_current));
124 }
131 }
125 }
132 }
126
133
127 protected virtual void OnProgressInit()
134 protected virtual void OnProgressInit()
128 {
135 {
129 var temp = ProgressInit;
136 var temp = ProgressInit;
130 if (temp != null)
137 if (temp != null)
131 {
138 {
132 temp(this, new ProgressInitEventArgs(m_current,m_max, m_message));
139 temp(this, new ProgressInitEventArgs(m_current,m_max, m_message));
133 }
140 }
134 }
141 }
135 }
142 }
136 }
143 }
@@ -1,93 +1,93
1 using System;
1 using System;
2 using Implab.Diagnostics;
2 using Implab.Diagnostics;
3 using Implab.Parallels;
3 using Implab.Parallels;
4 using Implab;
4 using Implab;
5 using System.Collections.Generic;
5 using System.Collections.Generic;
6 using System.Collections.Concurrent;
6 using System.Collections.Concurrent;
7 using System.Threading;
7 using System.Threading;
8
8
9 namespace MonoPlay {
9 namespace MonoPlay {
10 class MainClass {
10 class MainClass {
11 public static void Main(string[] args) {
11 public static void Main(string[] args) {
12 if (args == null)
12 if (args == null)
13 throw new ArgumentNullException("args");
13 throw new ArgumentNullException("args");
14
14
15 var t1 = Environment.TickCount;
15 var t1 = Environment.TickCount;
16
16
17 const int reads = 100000;
17 const int reads = 100000;
18 const int writes = 1000;
18 const int writes = 1000;
19 const int readThreads = 8;
19 const int readThreads = 8;
20 const int writeThreads = 0;
20 const int writeThreads = 0;
21
21
22 var l = new SharedLock();
22 var l = new SharedLock();
23 var st = new HashSet<int>();
23 var st = new HashSet<int>();
24
24
25 Action reader1 = () => {
25 Action reader1 = () => {
26 for (int i =0; i < reads; i++) {
26 for (int i =0; i < reads; i++) {
27 try {
27 try {
28 l.LockShared();
28 l.LockShared();
29 st.Contains(i % 1000);
29 st.Contains(i % 1000);
30 Thread.Sleep(0);
30 Thread.Sleep(0);
31 } finally {
31 } finally {
32 l.Release();
32 l.Release();
33 }
33 }
34 }
34 }
35 };
35 };
36
36
37 Action reader2 = () => {
37 Action reader2 = () => {
38 for(var i = 0; i < reads; i++)
38 for(var i = 0; i < reads; i++)
39 lock(st) {
39 lock(st) {
40 st.Contains(i % 1000);
40 st.Contains(i % 1000);
41 Thread.Sleep(0);
41 Thread.Sleep(0);
42 }
42 }
43 };
43 };
44
44
45 Action writer1 = () => {
45 Action writer1 = () => {
46 var rnd = new Random(Environment.TickCount);
46 var rnd = new Random(Environment.TickCount);
47 for (int i = 0; i < writes; i++) {
47 for (int i = 0; i < writes; i++) {
48 try {
48 try {
49 l.LockExclusive();
49 l.LockExclusive();
50 st.Add(rnd.Next(1000));
50 st.Add(rnd.Next(1000));
51 //Thread.Sleep(1);
51 //Thread.Sleep(1);
52 } finally {
52 } finally {
53 l.Release();
53 l.Release();
54 }
54 }
55 }
55 }
56 };
56 };
57
57
58 Action writer2 = () => {
58 Action writer2 = () => {
59 var rnd = new Random(Environment.TickCount);
59 var rnd = new Random(Environment.TickCount);
60 for (int i = 0; i < writes; i++) {
60 for (int i = 0; i < writes; i++) {
61 lock (st) {
61 lock (st) {
62 st.Add(rnd.Next(1000));
62 st.Add(rnd.Next(1000));
63 //Thread.Sleep(1);
63 //Thread.Sleep(1);
64 }
64 }
65 }
65 }
66 };
66 };
67
67
68
68
69
69
70 var readers = new IPromise[readThreads];
70 var readers = new IPromise[readThreads];
71 for (int i = 0; i < readThreads; i++)
71 for (int i = 0; i < readThreads; i++)
72 readers[i] = AsyncPool.RunThread(reader1);
72 readers[i] = AsyncPool.RunThread(reader2);
73
73
74 var writers = new IPromise[writeThreads];
74 var writers = new IPromise[writeThreads];
75 for (int i = 0; i < writeThreads; i++)
75 for (int i = 0; i < writeThreads; i++)
76 writers[i] = AsyncPool.RunThread(writer1);
76 writers[i] = AsyncPool.RunThread(writer1);
77
77
78
78
79 new [] {
79 new [] {
80 readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)),
80 readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)),
81 writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1))
81 writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1))
82 }.Bundle().Join();
82 }.Bundle().Join();
83
83
84
84
85
85
86 var t2 = Environment.TickCount;
86 var t2 = Environment.TickCount;
87 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
87 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
88
88
89 }
89 }
90
90
91
91
92 }
92 }
93 }
93 }
General Comments 0
You need to be logged in to leave comments. Login now