##// END OF EJS Templates
Слияние с promises
cin -
r23:f0568ff069a5 merge default
parent child
Show More
@@ -1,333 +1,333
1 using System;
1 using System;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using System.Reflection;
3 using System.Reflection;
4 using System.Threading;
4 using System.Threading;
5 using Implab.Parallels;
5 using Implab.Parallels;
6
6
7 namespace Implab.Test {
7 namespace Implab.Test {
8 [TestClass]
8 [TestClass]
9 public class AsyncTests {
9 public class AsyncTests {
10 [TestMethod]
10 [TestMethod]
11 public void ResolveTest() {
11 public void ResolveTest() {
12 int res = -1;
12 int res = -1;
13 var p = new Promise<int>();
13 var p = new Promise<int>();
14 p.Then(x => res = x);
14 p.Then(x => res = x);
15 p.Resolve(100);
15 p.Resolve(100);
16
16
17 Assert.AreEqual(res, 100);
17 Assert.AreEqual(100, res);
18 }
18 }
19
19
20 [TestMethod]
20 [TestMethod]
21 public void RejectTest() {
21 public void RejectTest() {
22 int res = -1;
22 int res = -1;
23 Exception err = null;
23 Exception err = null;
24
24
25 var p = new Promise<int>();
25 var p = new Promise<int>();
26 p.Then(x => res = x, e => err = e);
26 p.Then(x => res = x, e => err = e);
27 p.Reject(new ApplicationException("error"));
27 p.Reject(new ApplicationException("error"));
28
28
29 Assert.AreEqual(res, -1);
29 Assert.AreEqual(res, -1);
30 Assert.AreEqual(err.Message, "error");
30 Assert.AreEqual(err.Message, "error");
31
31
32 }
32 }
33
33
34 [TestMethod]
34 [TestMethod]
35 public void JoinSuccessTest() {
35 public void JoinSuccessTest() {
36 var p = new Promise<int>();
36 var p = new Promise<int>();
37 p.Resolve(100);
37 p.Resolve(100);
38 Assert.AreEqual(p.Join(), 100);
38 Assert.AreEqual(p.Join(), 100);
39 }
39 }
40
40
41 [TestMethod]
41 [TestMethod]
42 public void JoinFailTest() {
42 public void JoinFailTest() {
43 var p = new Promise<int>();
43 var p = new Promise<int>();
44 p.Reject(new ApplicationException("failed"));
44 p.Reject(new ApplicationException("failed"));
45
45
46 try {
46 try {
47 p.Join();
47 p.Join();
48 throw new ApplicationException("WRONG!");
48 throw new ApplicationException("WRONG!");
49 } catch (TargetInvocationException err) {
49 } catch (TargetInvocationException err) {
50 Assert.AreEqual(err.InnerException.Message, "failed");
50 Assert.AreEqual(err.InnerException.Message, "failed");
51 } catch {
51 } catch {
52 Assert.Fail("Got wrong excaption");
52 Assert.Fail("Got wrong excaption");
53 }
53 }
54 }
54 }
55
55
56 [TestMethod]
56 [TestMethod]
57 public void MapTest() {
57 public void MapTest() {
58 var p = new Promise<int>();
58 var p = new Promise<int>();
59
59
60 var p2 = p.Map(x => x.ToString());
60 var p2 = p.Map(x => x.ToString());
61 p.Resolve(100);
61 p.Resolve(100);
62
62
63 Assert.AreEqual(p2.Join(), "100");
63 Assert.AreEqual(p2.Join(), "100");
64 }
64 }
65
65
66 [TestMethod]
66 [TestMethod]
67 public void FixErrorTest() {
67 public void FixErrorTest() {
68 var p = new Promise<int>();
68 var p = new Promise<int>();
69
69
70 var p2 = p.Error(e => 101);
70 var p2 = p.Error(e => 101);
71
71
72 p.Reject(new Exception());
72 p.Reject(new Exception());
73
73
74 Assert.AreEqual(p2.Join(), 101);
74 Assert.AreEqual(p2.Join(), 101);
75 }
75 }
76
76
77 [TestMethod]
77 [TestMethod]
78 public void ChainTest() {
78 public void ChainTest() {
79 var p1 = new Promise<int>();
79 var p1 = new Promise<int>();
80
80
81 var p3 = p1.Chain(x => {
81 var p3 = p1.Chain(x => {
82 var p2 = new Promise<string>();
82 var p2 = new Promise<string>();
83 p2.Resolve(x.ToString());
83 p2.Resolve(x.ToString());
84 return p2;
84 return p2;
85 });
85 });
86
86
87 p1.Resolve(100);
87 p1.Resolve(100);
88
88
89 Assert.AreEqual(p3.Join(), "100");
89 Assert.AreEqual(p3.Join(), "100");
90 }
90 }
91
91
92 [TestMethod]
92 [TestMethod]
93 public void PoolTest() {
93 public void PoolTest() {
94 var pid = Thread.CurrentThread.ManagedThreadId;
94 var pid = Thread.CurrentThread.ManagedThreadId;
95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
96
96
97 Assert.AreNotEqual(pid, p.Join());
97 Assert.AreNotEqual(pid, p.Join());
98 }
98 }
99
99
100 [TestMethod]
100 [TestMethod]
101 public void WorkerPoolSizeTest() {
101 public void WorkerPoolSizeTest() {
102 var pool = new WorkerPool(5, 10, 0);
102 var pool = new WorkerPool(5, 10, 0);
103
103
104 Assert.AreEqual(5, pool.ThreadCount);
104 Assert.AreEqual(5, pool.PoolSize);
105
105
106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
106 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
109
109
110 Assert.AreEqual(5, pool.ThreadCount);
110 Assert.AreEqual(5, pool.PoolSize);
111
111
112 for (int i = 0; i < 100; i++)
112 for (int i = 0; i < 100; i++)
113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
113 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
114 Thread.Sleep(100);
114 Thread.Sleep(200);
115 Assert.AreEqual(10, pool.ThreadCount);
115 Assert.AreEqual(10, pool.PoolSize);
116
116
117 pool.Dispose();
117 pool.Dispose();
118 }
118 }
119
119
120 [TestMethod]
120 [TestMethod]
121 public void WorkerPoolCorrectTest() {
121 public void WorkerPoolCorrectTest() {
122 var pool = new WorkerPool(0,1000,100);
122 var pool = new WorkerPool(0,1000,100);
123
123
124 int iterations = 1000;
124 int iterations = 1000;
125 int pending = iterations;
125 int pending = iterations;
126 var stop = new ManualResetEvent(false);
126 var stop = new ManualResetEvent(false);
127
127
128 var count = 0;
128 var count = 0;
129 for (int i = 0; i < iterations; i++) {
129 for (int i = 0; i < iterations; i++) {
130 pool
130 pool
131 .Invoke(() => 1)
131 .Invoke(() => 1)
132 .Then(x => Interlocked.Add(ref count, x))
132 .Then(x => Interlocked.Add(ref count, x))
133 .Then(x => Math.Log10(x))
133 .Then(x => Math.Log10(x))
134 .Anyway(() => {
134 .Anyway(() => {
135 Interlocked.Decrement(ref pending);
135 Interlocked.Decrement(ref pending);
136 if (pending == 0)
136 if (pending == 0)
137 stop.Set();
137 stop.Set();
138 });
138 });
139 }
139 }
140
140
141 stop.WaitOne();
141 stop.WaitOne();
142
142
143 Assert.AreEqual(iterations, count);
143 Assert.AreEqual(iterations, count);
144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
145 pool.Dispose();
145 pool.Dispose();
146
146
147 }
147 }
148
148
149 [TestMethod]
149 [TestMethod]
150 public void WorkerPoolDisposeTest() {
150 public void WorkerPoolDisposeTest() {
151 var pool = new WorkerPool(5, 20);
151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
152 Assert.AreEqual(5, pool.PoolSize);
153 pool.Dispose();
153 pool.Dispose();
154 Thread.Sleep(100);
154 Thread.Sleep(500);
155 Assert.AreEqual(0, pool.ThreadCount);
155 Assert.AreEqual(0, pool.PoolSize);
156 pool.Dispose();
156 pool.Dispose();
157 }
157 }
158
158
159 [TestMethod]
159 [TestMethod]
160 public void MTQueueTest() {
160 public void MTQueueTest() {
161 var queue = new MTQueue<int>();
161 var queue = new MTQueue<int>();
162 int res;
162 int res;
163
163
164 queue.Enqueue(10);
164 queue.Enqueue(10);
165 Assert.IsTrue(queue.TryDequeue(out res));
165 Assert.IsTrue(queue.TryDequeue(out res));
166 Assert.AreEqual(10, res);
166 Assert.AreEqual(10, res);
167 Assert.IsFalse(queue.TryDequeue(out res));
167 Assert.IsFalse(queue.TryDequeue(out res));
168
168
169 for (int i = 0; i < 1000; i++)
169 for (int i = 0; i < 1000; i++)
170 queue.Enqueue(i);
170 queue.Enqueue(i);
171
171
172 for (int i = 0; i < 1000; i++) {
172 for (int i = 0; i < 1000; i++) {
173 queue.TryDequeue(out res);
173 queue.TryDequeue(out res);
174 Assert.AreEqual(i, res);
174 Assert.AreEqual(i, res);
175 }
175 }
176
176
177 int writers = 0;
177 int writers = 0;
178 int readers = 0;
178 int readers = 0;
179 var stop = new ManualResetEvent(false);
179 var stop = new ManualResetEvent(false);
180 int total = 0;
180 int total = 0;
181
181
182 int itemsPerWriter = 1000;
182 int itemsPerWriter = 1000;
183 int writersCount = 3;
183 int writersCount = 3;
184
184
185 for (int i = 0; i < writersCount; i++) {
185 for (int i = 0; i < writersCount; i++) {
186 Interlocked.Increment(ref writers);
186 Interlocked.Increment(ref writers);
187 var wn = i;
187 var wn = i;
188 AsyncPool
188 AsyncPool
189 .InvokeNewThread(() => {
189 .InvokeNewThread(() => {
190 for (int ii = 0; ii < itemsPerWriter; ii++) {
190 for (int ii = 0; ii < itemsPerWriter; ii++) {
191 queue.Enqueue(1);
191 queue.Enqueue(1);
192 }
192 }
193 return 1;
193 return 1;
194 })
194 })
195 .Anyway(() => Interlocked.Decrement(ref writers));
195 .Anyway(() => Interlocked.Decrement(ref writers));
196 }
196 }
197
197
198 for (int i = 0; i < 10; i++) {
198 for (int i = 0; i < 10; i++) {
199 Interlocked.Increment(ref readers);
199 Interlocked.Increment(ref readers);
200 var wn = i;
200 var wn = i;
201 AsyncPool
201 AsyncPool
202 .InvokeNewThread(() => {
202 .InvokeNewThread(() => {
203 int t;
203 int t;
204 do {
204 do {
205 while (queue.TryDequeue(out t))
205 while (queue.TryDequeue(out t))
206 Interlocked.Add(ref total, t);
206 Interlocked.Add(ref total, t);
207 } while (writers > 0);
207 } while (writers > 0);
208 return 1;
208 return 1;
209 })
209 })
210 .Anyway(() => {
210 .Anyway(() => {
211 Interlocked.Decrement(ref readers);
211 Interlocked.Decrement(ref readers);
212 if (readers == 0)
212 if (readers == 0)
213 stop.Set();
213 stop.Set();
214 });
214 });
215 }
215 }
216
216
217 stop.WaitOne();
217 stop.WaitOne();
218
218
219 Assert.AreEqual(itemsPerWriter * writersCount, total);
219 Assert.AreEqual(itemsPerWriter * writersCount, total);
220 }
220 }
221
221
222 [TestMethod]
222 [TestMethod]
223 public void ParallelMapTest() {
223 public void ParallelMapTest() {
224
224
225 int count = 100000;
225 int count = 100000;
226
226
227 double[] args = new double[count];
227 double[] args = new double[count];
228 var rand = new Random();
228 var rand = new Random();
229
229
230 for (int i = 0; i < count; i++)
230 for (int i = 0; i < count; i++)
231 args[i] = rand.NextDouble();
231 args[i] = rand.NextDouble();
232
232
233 var t = Environment.TickCount;
233 var t = Environment.TickCount;
234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
235
235
236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
237
237
238 t = Environment.TickCount;
238 t = Environment.TickCount;
239 for (int i = 0; i < count; i++)
239 for (int i = 0; i < count; i++)
240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
242 }
242 }
243
243
244 [TestMethod]
244 [TestMethod]
245 public void ChainedMapTest() {
245 public void ChainedMapTest() {
246
246
247 using (var pool = new WorkerPool(8,100,0)) {
247 using (var pool = new WorkerPool(4,4,0)) {
248 int count = 10000;
248 int count = 10000;
249
249
250 double[] args = new double[count];
250 double[] args = new double[count];
251 var rand = new Random();
251 var rand = new Random();
252
252
253 for (int i = 0; i < count; i++)
253 for (int i = 0; i < count; i++)
254 args[i] = rand.NextDouble();
254 args[i] = rand.NextDouble();
255
255
256 var t = Environment.TickCount;
256 var t = Environment.TickCount;
257 var res = args
257 var res = args
258 .ChainedMap(
258 .ChainedMap(
259 x => pool.Invoke(
259 x => pool.Invoke(
260 () => Math.Sin(x * x)
260 () => Math.Sin(x * x)
261 ),
261 ),
262 4
262 4
263 )
263 )
264 .Join();
264 .Join();
265
265
266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
267
267
268 t = Environment.TickCount;
268 t = Environment.TickCount;
269 for (int i = 0; i < count; i++)
269 for (int i = 0; i < count; i++)
270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 }
273 }
274 }
274 }
275
275
276 [TestMethod]
276 [TestMethod]
277 public void ParallelForEachTest() {
277 public void ParallelForEachTest() {
278
278
279 int count = 100000;
279 int count = 100000;
280
280
281 int[] args = new int[count];
281 int[] args = new int[count];
282 var rand = new Random();
282 var rand = new Random();
283
283
284 for (int i = 0; i < count; i++)
284 for (int i = 0; i < count; i++)
285 args[i] = (int)(rand.NextDouble() * 100);
285 args[i] = (int)(rand.NextDouble() * 100);
286
286
287 int result = 0;
287 int result = 0;
288
288
289 var t = Environment.TickCount;
289 var t = Environment.TickCount;
290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
291
291
292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
293
293
294 int result2 = 0;
294 int result2 = 0;
295
295
296 t = Environment.TickCount;
296 t = Environment.TickCount;
297 for (int i = 0; i < count; i++)
297 for (int i = 0; i < count; i++)
298 result2 += args[i];
298 result2 += args[i];
299 Assert.AreEqual(result2, result);
299 Assert.AreEqual(result2, result);
300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
301 }
301 }
302
302
303 [TestMethod]
303 [TestMethod]
304 public void ComplexCase1Test() {
304 public void ComplexCase1Test() {
305 var flags = new bool[3];
305 var flags = new bool[3];
306
306
307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
308
308
309 var p = PromiseHelper
309 var p = PromiseHelper
310 .Sleep(200, "Alan")
310 .Sleep(200, "Alan")
311 .Cancelled(() => flags[0] = true)
311 .Cancelled(() => flags[0] = true)
312 .Chain(x =>
312 .Chain(x =>
313 PromiseHelper
313 PromiseHelper
314 .Sleep(200, "Hi, " + x)
314 .Sleep(200, "Hi, " + x)
315 .Map(y => y)
315 .Map(y => y)
316 .Cancelled(() => flags[1] = true)
316 .Cancelled(() => flags[1] = true)
317 )
317 )
318 .Cancelled(() => flags[2] = true);
318 .Cancelled(() => flags[2] = true);
319 Thread.Sleep(300);
319 Thread.Sleep(300);
320 p.Cancel();
320 p.Cancel();
321 try {
321 try {
322 Assert.AreEqual(p.Join(), "Hi, Alan");
322 Assert.AreEqual(p.Join(), "Hi, Alan");
323 Assert.Fail("Shouldn't get here");
323 Assert.Fail("Shouldn't get here");
324 } catch (OperationCanceledException) {
324 } catch (OperationCanceledException) {
325 }
325 }
326
326
327 Assert.IsFalse(flags[0]);
327 Assert.IsFalse(flags[0]);
328 Assert.IsTrue(flags[1]);
328 Assert.IsTrue(flags[1]);
329 Assert.IsTrue(flags[2]);
329 Assert.IsTrue(flags[2]);
330 }
330 }
331 }
331 }
332 }
332 }
333
333
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -1,33 +1,20
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5
5
6 namespace Implab
6 namespace Implab
7 {
7 {
8 public interface IPromise: ICancellable
8 public interface IPromise: ICancellable
9 {
9 {
10 /// <summary>
10 /// <summary>
11 /// Check whereather the promise has no more than one dependent promise.
11 /// Check whereather the promise has no more than one dependent promise.
12 /// </summary>
12 /// </summary>
13 bool IsExclusive
13 bool IsExclusive
14 {
14 {
15 get;
15 get;
16 }
16 }
17
17
18 /// <summary>
19 /// The current state of the promise.
20 /// </summary>
21 PromiseState State
22 {
23 get;
24 }
25
18
26 /// <summary>
27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
28 /// handler will be invoked immediatelly.
29 /// </summary>
30 /// <param name="handler">The handler</param>
31 void HandleCancelled(Action handler);
32 }
19 }
33 }
20 }
@@ -1,171 +1,171
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Diagnostics;
3 using System.Diagnostics;
4 using System.Linq;
4 using System.Linq;
5 using System.Text;
5 using System.Text;
6 using System.Threading;
6 using System.Threading;
7
7
8 namespace Implab.Parallels {
8 namespace Implab.Parallels {
9 public static class ArrayTraits {
9 public static class ArrayTraits {
10 class ArrayIterator<TSrc> : DispatchPool<int> {
10 class ArrayIterator<TSrc> : DispatchPool<int> {
11 readonly Action<TSrc> m_action;
11 readonly Action<TSrc> m_action;
12 readonly TSrc[] m_source;
12 readonly TSrc[] m_source;
13 readonly Promise<int> m_promise = new Promise<int>();
13 readonly Promise<int> m_promise = new Promise<int>();
14
14
15 int m_pending;
15 int m_pending;
16 int m_next;
16 int m_next;
17
17
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 : base(threads) {
19 : base(threads) {
20
20
21 Debug.Assert(source != null);
21 Debug.Assert(source != null);
22 Debug.Assert(action != null);
22 Debug.Assert(action != null);
23
23
24 m_next = 0;
24 m_next = 0;
25 m_source = source;
25 m_source = source;
26 m_pending = source.Length;
26 m_pending = source.Length;
27 m_action = action;
27 m_action = action;
28
28
29 m_promise.Anyway(() => Dispose());
29 m_promise.Anyway(() => Dispose());
30 m_promise.Cancelled(() => Dispose());
30 m_promise.Cancelled(() => Dispose());
31
31
32 InitPool();
32 InitPool();
33 }
33 }
34
34
35 public Promise<int> Promise {
35 public Promise<int> Promise {
36 get {
36 get {
37 return m_promise;
37 return m_promise;
38 }
38 }
39 }
39 }
40
40
41 protected override bool TryDequeue(out int unit) {
41 protected override bool TryDequeue(out int unit) {
42 unit = Interlocked.Increment(ref m_next) - 1;
42 unit = Interlocked.Increment(ref m_next) - 1;
43 return unit >= m_source.Length ? false : true;
43 return unit >= m_source.Length ? false : true;
44 }
44 }
45
45
46 protected override void InvokeUnit(int unit) {
46 protected override void InvokeUnit(int unit) {
47 try {
47 try {
48 m_action(m_source[unit]);
48 m_action(m_source[unit]);
49 var pending = Interlocked.Decrement(ref m_pending);
49 var pending = Interlocked.Decrement(ref m_pending);
50 if (pending == 0)
50 if (pending == 0)
51 m_promise.Resolve(m_source.Length);
51 m_promise.Resolve(m_source.Length);
52 } catch (Exception e) {
52 } catch (Exception e) {
53 m_promise.Reject(e);
53 m_promise.Reject(e);
54 }
54 }
55 }
55 }
56 }
56 }
57
57
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 readonly Func<TSrc, TDst> m_transform;
59 readonly Func<TSrc, TDst> m_transform;
60 readonly TSrc[] m_source;
60 readonly TSrc[] m_source;
61 readonly TDst[] m_dest;
61 readonly TDst[] m_dest;
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
63
63
64 int m_pending;
64 int m_pending;
65 int m_next;
65 int m_next;
66
66
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 : base(threads) {
68 : base(threads) {
69
69
70 Debug.Assert (source != null);
70 Debug.Assert (source != null);
71 Debug.Assert( transform != null);
71 Debug.Assert( transform != null);
72
72
73 m_next = 0;
73 m_next = 0;
74 m_source = source;
74 m_source = source;
75 m_dest = new TDst[source.Length];
75 m_dest = new TDst[source.Length];
76 m_pending = source.Length;
76 m_pending = source.Length;
77 m_transform = transform;
77 m_transform = transform;
78
78
79 m_promise.Anyway(() => Dispose());
79 m_promise.Anyway(() => Dispose());
80 m_promise.Cancelled(() => Dispose());
80 m_promise.Cancelled(() => Dispose());
81
81
82 InitPool();
82 InitPool();
83 }
83 }
84
84
85 public Promise<TDst[]> Promise {
85 public Promise<TDst[]> Promise {
86 get {
86 get {
87 return m_promise;
87 return m_promise;
88 }
88 }
89 }
89 }
90
90
91 protected override bool TryDequeue(out int unit) {
91 protected override bool TryDequeue(out int unit) {
92 unit = Interlocked.Increment(ref m_next) - 1;
92 unit = Interlocked.Increment(ref m_next) - 1;
93 return unit >= m_source.Length ? false : true;
93 return unit >= m_source.Length ? false : true;
94 }
94 }
95
95
96 protected override void InvokeUnit(int unit) {
96 protected override void InvokeUnit(int unit) {
97 try {
97 try {
98 m_dest[unit] = m_transform(m_source[unit]);
98 m_dest[unit] = m_transform(m_source[unit]);
99 var pending = Interlocked.Decrement(ref m_pending);
99 var pending = Interlocked.Decrement(ref m_pending);
100 if (pending == 0)
100 if (pending == 0)
101 m_promise.Resolve(m_dest);
101 m_promise.Resolve(m_dest);
102 } catch (Exception e) {
102 } catch (Exception e) {
103 m_promise.Reject(e);
103 m_promise.Reject(e);
104 }
104 }
105 }
105 }
106 }
106 }
107
107
108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 if (source == null)
109 if (source == null)
110 throw new ArgumentNullException("source");
110 throw new ArgumentNullException("source");
111 if (transform == null)
111 if (transform == null)
112 throw new ArgumentNullException("transform");
112 throw new ArgumentNullException("transform");
113
113
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 return mapper.Promise;
115 return mapper.Promise;
116 }
116 }
117
117
118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 if (source == null)
119 if (source == null)
120 throw new ArgumentNullException("source");
120 throw new ArgumentNullException("source");
121 if (action == null)
121 if (action == null)
122 throw new ArgumentNullException("action");
122 throw new ArgumentNullException("action");
123
123
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 return iter.Promise;
125 return iter.Promise;
126 }
126 }
127
127
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 if (source == null)
129 if (source == null)
130 throw new ArgumentNullException("source");
130 throw new ArgumentNullException("source");
131 if (transform == null)
131 if (transform == null)
132 throw new ArgumentNullException("transform");
132 throw new ArgumentNullException("transform");
133 if (threads <= 0)
133 if (threads <= 0)
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135
135
136 var promise = new Promise<TDst[]>();
136 var promise = new Promise<TDst[]>();
137 var res = new TDst[source.Length];
137 var res = new TDst[source.Length];
138 var pending = source.Length;
138 var pending = source.Length;
139 var semaphore = new Semaphore(threads, threads);
139 var semaphore = new Semaphore(threads, threads);
140
140
141 AsyncPool.InvokeNewThread(() => {
141 AsyncPool.InvokeNewThread(() => {
142 for (int i = 0; i < source.Length; i++) {
142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
143 if(promise.IsResolved)
144 break; // stop processing in case of error or cancellation
144 break; // stop processing in case of error or cancellation
145 var idx = i;
145 var idx = i;
146 semaphore.WaitOne();
146 semaphore.WaitOne();
147 try {
147 try {
148 var p1 = transform(source[i]);
148 var p1 = transform(source[i]);
149 p1.Anyway(() => semaphore.Release());
149 p1.Anyway(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
151 p1.Then(
151 p1.Then(
152 x => {
152 x => {
153 res[idx] = x;
153 res[idx] = x;
154 var left = Interlocked.Decrement(ref pending);
154 var left = Interlocked.Decrement(ref pending);
155 if (left == 0)
155 if (left == 0)
156 promise.Resolve(res);
156 promise.Resolve(res);
157 },
157 },
158 e => promise.Reject(e)
158 e => promise.Reject(e)
159 );
159 );
160
160
161 } catch (Exception e) {
161 } catch (Exception e) {
162 promise.Reject(e);
162 promise.Reject(e);
163 }
163 }
164 }
164 }
165 return 0;
165 return 0;
166 });
166 });
167
167
168 return promise.Anyway(() => semaphore.Dispose());
168 return promise.Anyway(() => semaphore.Dispose());
169 }
169 }
170 }
170 }
171 }
171 }
@@ -1,238 +1,330
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6 using System.Diagnostics;
6 using System.Diagnostics;
7
7
8 namespace Implab.Parallels {
8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
12
13 int m_maxRunningThreads = 0;
13 int m_createdThreads = 0; // the current size of the pool
14 int m_suspended = 0;
14 int m_activeThreads = 0; // the count of threads which are active
15 int m_exitRequired = 0;
15 int m_sleepingThreads = 0; // the count of currently inactive threads
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
19 int m_wakeEvents = 0; // the count of wake events
20
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17
22
18 protected DispatchPool(int min, int max) {
23 protected DispatchPool(int min, int max) {
19 if (min < 0)
24 if (min < 0)
20 throw new ArgumentOutOfRangeException("min");
25 throw new ArgumentOutOfRangeException("min");
21 if (max <= 0)
26 if (max <= 0)
22 throw new ArgumentOutOfRangeException("max");
27 throw new ArgumentOutOfRangeException("max");
23
28
24 if (min > max)
29 if (min > max)
25 min = max;
30 min = max;
26 m_minThreads = min;
31 m_minThreads = min;
27 m_maxThreads = max;
32 m_maxThreads = max;
28 }
33 }
29
34
30 protected DispatchPool(int threads)
35 protected DispatchPool(int threads)
31 : this(threads, threads) {
36 : this(threads, threads) {
32 }
37 }
33
38
34 protected DispatchPool() {
39 protected DispatchPool() {
35 int maxThreads, maxCP;
40 int maxThreads, maxCP;
36 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
37
42
38 m_minThreads = 0;
43 m_minThreads = 0;
39 m_maxThreads = maxThreads;
44 m_maxThreads = maxThreads;
40 }
45 }
41
46
42 protected void InitPool() {
47 protected void InitPool() {
43 for (int i = 0; i < m_minThreads; i++)
48 for (int i = 0; i < m_minThreads; i++)
44 StartWorker();
49 StartWorker();
45 }
50 }
46
51
47 public int ThreadCount {
52 public int PoolSize {
48 get {
53 get {
49 return m_runningThreads;
54 return m_createdThreads;
55 }
56 }
57
58 public int ActiveThreads {
59 get {
60 return m_activeThreads;
50 }
61 }
51 }
62 }
52
63
53 public int MaxRunningThreads {
64 public int MaxRunningThreads {
54 get {
65 get {
55 return m_maxRunningThreads;
66 return m_maxRunningThreads;
56 }
67 }
57 }
68 }
58
69
59 protected bool IsDisposed {
70 protected bool IsDisposed {
60 get {
71 get {
61 return m_exitRequired != 0;
72 return m_exitRequired != 0;
62 }
73 }
63 }
74 }
64
75
65 protected abstract bool TryDequeue(out TUnit unit);
76 protected abstract bool TryDequeue(out TUnit unit);
66
77
67 protected virtual bool ExtendPool() {
78 #region thread execution traits
68 if (m_suspended > 0) {
79 int SignalThread() {
80 var signals = Interlocked.Increment(ref m_wakeEvents);
81 if(signals == 1)
82 m_hasTasks.Set();
83 return signals;
84 }
85
86 bool FetchSignalOrWait(int timeout) {
87 var start = Environment.TickCount;
88
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен
90 // ее вернуть, чтобы другой ожидающий поток смог
91 bool hasLock = false;
92 do {
93 int signals;
94 do {
95 signals = m_wakeEvents;
96 if (signals == 0)
97 break;
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99
100 if (signals >= 1) {
101 if (signals > 1 && hasLock)
69 m_hasTasks.Set();
102 m_hasTasks.Set();
70 return true;
103 return true;
71 } else
104 }
72 return StartWorker();
105
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
110 // и уйдет на пустой цикл, после чего заблокируется
111
112 hasLock = true;
113 } while (m_hasTasks.WaitOne(timeout));
114
115 return false;
73 }
116 }
74
117
118 bool Sleep(int timeout) {
119 Interlocked.Increment(ref m_sleepingThreads);
120 if (FetchSignalOrWait(timeout)) {
121 Interlocked.Decrement(ref m_sleepingThreads);
122 return true;
123 } else {
124 Interlocked.Decrement(ref m_sleepingThreads);
125 return false;
126 }
127 }
128 #endregion
129
75 /// <summary>
130 /// <summary>
76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
77 /// </summary>
132 /// </summary>
78 protected void WakePool() {
133 protected void GrowPool() {
79 m_hasTasks.Set(); // wake sleeping thread;
134 if (m_exitRequired != 0)
135 return;
136 if (m_sleepingThreads > m_wakeEvents) {
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138
139 // all sleeping threads may gone
140 SignalThread(); // wake a sleeping thread;
141
142 // we can't check whether signal has been processed
143 // anyway it may take some time for the thread to start
144 // we will ensure that at least one thread is running
80
145
81 if (AllocateThreadSlot(1)) {
146 if (AllocateThreadSlot(1)) {
147 // if there were no threads in the pool
82 var worker = new Thread(this.Worker);
148 var worker = new Thread(this.Worker);
83 worker.IsBackground = true;
149 worker.IsBackground = true;
84 worker.Start();
150 worker.Start();
85 }
151 }
152 } else {
153 // if there is no sleeping threads in the pool
154 StartWorker();
155 }
86 }
156 }
87
157
88 protected virtual void Suspend() {
158 private bool Suspend() {
89 m_hasTasks.WaitOne();
159 //no tasks left, exit if the thread is no longer needed
160 bool last;
161 bool requestExit;
162
163 // if threads have a timeout before releasing
164 if (m_releaseTimeout > 0)
165 requestExit = !Sleep(m_releaseTimeout);
166 else
167 requestExit = true;
168
169 if (!requestExit)
170 return true;
171
172 // release unsused thread
173 if (requestExit && ReleaseThreadSlot(out last)) {
174 // in case at the moment the last thread was being released
175 // a new task was added to the queue, we need to try
176 // to revoke the thread to avoid the situation when the task is left unprocessed
177 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
178 if (AllocateThreadSlot(1))
179 return true; // spin again...
180 else
181 SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
182
183 }
184
185 return false;
186 }
187
188 // wait till infinity
189 Sleep(-1);
190
191 return true;
90 }
192 }
91
193
92 #region thread slots traits
194 #region thread slots traits
93
195
94 bool AllocateThreadSlot() {
196 bool AllocateThreadSlot() {
95 int current;
197 int current;
96 // use spins to allocate slot for the new thread
198 // use spins to allocate slot for the new thread
97 do {
199 do {
98 current = m_runningThreads;
200 current = m_createdThreads;
99 if (current >= m_maxThreads || m_exitRequired != 0)
201 if (current >= m_maxThreads || m_exitRequired != 0)
100 // no more slots left or the pool has been disposed
202 // no more slots left or the pool has been disposed
101 return false;
203 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
204 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
103
205
104 UpdateMaxThreads(current + 1);
206 UpdateMaxThreads(current + 1);
105
207
106 return true;
208 return true;
107 }
209 }
108
210
109 bool AllocateThreadSlot(int desired) {
211 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
212 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
111 return false;
213 return false;
112
214
113 UpdateMaxThreads(desired);
215 UpdateMaxThreads(desired);
114
216
115 return true;
217 return true;
116 }
218 }
117
219
118 bool ReleaseThreadSlot(out bool last) {
220 bool ReleaseThreadSlot(out bool last) {
119 last = false;
221 last = false;
120 int current;
222 int current;
121 // use spins to release slot for the new thread
223 // use spins to release slot for the new thread
122 do {
224 do {
123 current = m_runningThreads;
225 current = m_createdThreads;
124 if (current <= m_minThreads && m_exitRequired == 0)
226 if (current <= m_minThreads && m_exitRequired == 0)
125 // the thread is reserved
227 // the thread is reserved
126 return false;
228 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
229 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
128
230
129 last = (current == 1);
231 last = (current == 1);
130
232
131 return true;
233 return true;
132 }
234 }
133
235
134 /// <summary>
236 /// <summary>
135 /// releases thread slot unconditionally, used during cleanup
237 /// releases thread slot unconditionally, used during cleanup
136 /// </summary>
238 /// </summary>
137 /// <returns>true - no more threads left</returns>
239 /// <returns>true - no more threads left</returns>
138 bool ReleaseThreadSlotAnyway() {
240 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads);
241 var left = Interlocked.Decrement(ref m_createdThreads);
140 return left == 0;
242 return left == 0;
141 }
243 }
142
244
143 void UpdateMaxThreads(int count) {
245 void UpdateMaxThreads(int count) {
144 int max;
246 int max;
145 do {
247 do {
146 max = m_maxRunningThreads;
248 max = m_maxRunningThreads;
147 if (max >= count)
249 if (max >= count)
148 break;
250 break;
149 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
251 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
150 }
252 }
151
253
152 #endregion
254 #endregion
153
255
154 bool StartWorker() {
256 bool StartWorker() {
155 if (AllocateThreadSlot()) {
257 if (AllocateThreadSlot()) {
156 // slot successfully allocated
258 // slot successfully allocated
157 var worker = new Thread(this.Worker);
259 var worker = new Thread(this.Worker);
158 worker.IsBackground = true;
260 worker.IsBackground = true;
159 worker.Start();
261 worker.Start();
160
262
161 return true;
263 return true;
162 } else {
264 } else {
163 return false;
265 return false;
164 }
266 }
165 }
267 }
166
268
167 bool FetchTask(out TUnit unit) {
269 protected abstract void InvokeUnit(TUnit unit);
270
271 void Worker() {
272 TUnit unit;
273 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
274 Interlocked.Increment(ref m_activeThreads);
168 do {
275 do {
169 // exit if requested
276 // exit if requested
170 if (m_exitRequired != 0) {
277 if (m_exitRequired != 0) {
171 // release the thread slot
278 // release the thread slot
279 Interlocked.Decrement(ref m_activeThreads);
172 if (ReleaseThreadSlotAnyway()) // it was the last worker
280 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 m_hasTasks.Dispose();
281 m_hasTasks.Dispose();
174 else
282 else
175 m_hasTasks.Set(); // wake next worker
283 SignalThread(); // wake next worker
176 unit = default(TUnit);
284 unit = default(TUnit);
177 return false;
285 break;
178 }
286 }
179
287
180 // fetch task
288 // fetch task
181 if (TryDequeue(out unit)) {
289 if (TryDequeue(out unit)) {
182 ExtendPool();
290 InvokeUnit(unit);
183 return true;
291 continue;
184 }
185
186 //no tasks left, exit if the thread is no longer needed
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
292 }
197
293
198 return false;
294 Interlocked.Decrement(ref m_activeThreads);
199 }
200
295
201 // entering suspend state
296 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 // keep this thread and wait
297 // keep this thread and wait
204 Suspend();
298 if (!Suspend())
205 Interlocked.Decrement(ref m_suspended);
299 break;
300 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
301 Interlocked.Increment(ref m_activeThreads);
206 } while (true);
302 } while (true);
207 }
303 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
208
209 protected abstract void InvokeUnit(TUnit unit);
210
211 void Worker() {
212 TUnit unit;
213 while (FetchTask(out unit))
214 InvokeUnit(unit);
215 }
304 }
216
305
217 protected virtual void Dispose(bool disposing) {
306 protected virtual void Dispose(bool disposing) {
218 if (disposing) {
307 if (disposing) {
219 if (m_exitRequired == 0) {
308 if (m_exitRequired == 0) {
220 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
309 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
221 return;
310 return;
222
311
223 // wake sleeping threads
312 // wake sleeping threads
224 m_hasTasks.Set();
313 if (m_createdThreads > 0)
314 SignalThread();
315 else
316 m_hasTasks.Dispose();
225 GC.SuppressFinalize(this);
317 GC.SuppressFinalize(this);
226 }
318 }
227 }
319 }
228 }
320 }
229
321
230 public void Dispose() {
322 public void Dispose() {
231 Dispose(true);
323 Dispose(true);
232 }
324 }
233
325
234 ~DispatchPool() {
326 ~DispatchPool() {
235 Dispose(false);
327 Dispose(false);
236 }
328 }
237 }
329 }
238 }
330 }
@@ -1,74 +1,75
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6
6
7 namespace Implab.Parallels {
7 namespace Implab.Parallels {
8 public class MTQueue<T> {
8 public class MTQueue<T> {
9 class Node {
9 class Node {
10 public Node(T value) {
10 public Node(T value) {
11 this.value = value;
11 this.value = value;
12 }
12 }
13 public readonly T value;
13 public readonly T value;
14 public Node next;
14 public Node next;
15 }
15 }
16
16
17 Node m_first;
17 Node m_first;
18 Node m_last;
18 Node m_last;
19
19
20 public void Enqueue(T value) {
20 public void Enqueue(T value) {
21 var last = m_last;
21 var last = m_last;
22 var next = new Node(value);
22 var next = new Node(value);
23
23
24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
25 last = m_last;
25 last = m_last;
26
26
27 if (last != null)
27 if (last != null)
28 last.next = next;
28 last.next = next;
29 else
29 else
30 m_first = next;
30 m_first = next;
31 }
31 }
32
32
33 public bool TryDequeue(out T value) {
33 public bool TryDequeue(out T value) {
34 Node first;
34 Node first;
35 Node next = null;
35 Node next = null;
36 value = default(T);
36 value = default(T);
37
37
38 do {
38 do {
39 first = m_first;
39 first = m_first;
40 if (first == null)
40 if (first == null)
41 return false;
41 return false;
42 next = first.next;
42 next = first.next;
43 if (next == null) {
43 if (next == null) {
44 // this is the last element,
44 // this is the last element,
45 // then try to update tail
45 // then try to update the tail
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
47 // this is a ace condition
48 if (m_last == null)
48 if (m_last == null)
49 // the queue is empty
49 return false;
50 return false;
50 // tail has been changed, that means that we need to restart
51 // tail has been changed, than we need to restart
51 continue;
52 continue;
52 }
53 }
53
54
54 // tail succesfully updated and first.next will never be changed
55 // tail succesfully updated and first.next will never be changed
55 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
56 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
56 // but the writer may update the m_first since the m_last is null
57 // but the writer may update the m_first since the m_last is null
57
58
58 // so we need to fix inconsistency by setting m_first to null, but if it already has been
59 // so we need to fix inconsistency by setting m_first to null, but if it already has been
59 // updated by a writer then we should just give up
60 // updated by a writer then we should just give up
60 Interlocked.CompareExchange(ref m_first, null, first);
61 Interlocked.CompareExchange(ref m_first, null, first);
61 break;
62 break;
62
63
63 } else {
64 } else {
64 if (first == Interlocked.CompareExchange(ref m_first, next, first))
65 if (first == Interlocked.CompareExchange(ref m_first, next, first))
65 // head succesfully updated
66 // head succesfully updated
66 break;
67 break;
67 }
68 }
68 } while (true);
69 } while (true);
69
70
70 value = first.value;
71 value = first.value;
71 return true;
72 return true;
72 }
73 }
73 }
74 }
74 }
75 }
@@ -1,89 +1,77
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6 using System.Diagnostics;
6 using System.Diagnostics;
7
7
8 namespace Implab.Parallels {
8 namespace Implab.Parallels {
9 public class WorkerPool : DispatchPool<Action> {
9 public class WorkerPool : DispatchPool<Action> {
10
10
11 MTQueue<Action> m_queue = new MTQueue<Action>();
11 MTQueue<Action> m_queue = new MTQueue<Action>();
12 int m_queueLength = 0;
12 int m_queueLength = 0;
13 readonly int m_threshold = 1;
13 readonly int m_threshold = 1;
14
14
15 public WorkerPool(int minThreads, int maxThreads, int threshold)
15 public WorkerPool(int minThreads, int maxThreads, int threshold)
16 : base(minThreads, maxThreads) {
16 : base(minThreads, maxThreads) {
17 m_threshold = threshold;
17 m_threshold = threshold;
18 InitPool();
18 InitPool();
19 }
19 }
20
20
21 public WorkerPool(int minThreads, int maxThreads) :
21 public WorkerPool(int minThreads, int maxThreads) :
22 base(minThreads, maxThreads) {
22 base(minThreads, maxThreads) {
23 InitPool();
23 InitPool();
24 }
24 }
25
25
26 public WorkerPool(int threads)
26 public WorkerPool(int threads)
27 : base(threads) {
27 : base(threads) {
28 InitPool();
28 InitPool();
29 }
29 }
30
30
31 public WorkerPool()
31 public WorkerPool()
32 : base() {
32 : base() {
33 InitPool();
33 InitPool();
34 }
34 }
35
35
36 public Promise<T> Invoke<T>(Func<T> task) {
36 public Promise<T> Invoke<T>(Func<T> task) {
37 if (task == null)
37 if (task == null)
38 throw new ArgumentNullException("task");
38 throw new ArgumentNullException("task");
39 if (IsDisposed)
39 if (IsDisposed)
40 throw new ObjectDisposedException(ToString());
40 throw new ObjectDisposedException(ToString());
41
41
42 var promise = new Promise<T>();
42 var promise = new Promise<T>();
43
43
44 EnqueueTask(delegate() {
44 EnqueueTask(delegate() {
45 try {
45 try {
46 promise.Resolve(task());
46 promise.Resolve(task());
47 } catch (Exception e) {
47 } catch (Exception e) {
48 promise.Reject(e);
48 promise.Reject(e);
49 }
49 }
50 });
50 });
51
51
52 return promise;
52 return promise;
53 }
53 }
54
54
55 protected void EnqueueTask(Action unit) {
55 protected void EnqueueTask(Action unit) {
56 Debug.Assert(unit != null);
56 Debug.Assert(unit != null);
57 var len = Interlocked.Increment(ref m_queueLength);
57 var len = Interlocked.Increment(ref m_queueLength);
58 m_queue.Enqueue(unit);
58 m_queue.Enqueue(unit);
59
59
60 if(!ExtendPool())
60 if (len > m_threshold*ActiveThreads)
61 WakePool();
61 GrowPool();
62 }
63
64 protected override bool ExtendPool() {
65 if (m_queueLength <= m_threshold*ThreadCount)
66 // in this case we are in active thread and it request for additional workers
67 // satisfy it only when queue is longer than threshold
68 return false;
69 return base.ExtendPool();
70 }
62 }
71
63
72 protected override bool TryDequeue(out Action unit) {
64 protected override bool TryDequeue(out Action unit) {
73 if (m_queue.TryDequeue(out unit)) {
65 if (m_queue.TryDequeue(out unit)) {
74 Interlocked.Decrement(ref m_queueLength);
66 Interlocked.Decrement(ref m_queueLength);
75 return true;
67 return true;
76 }
68 }
77 return false;
69 return false;
78 }
70 }
79
71
80 protected override void InvokeUnit(Action unit) {
72 protected override void InvokeUnit(Action unit) {
81 unit();
73 unit();
82 }
74 }
83
75
84 protected override void Suspend() {
85 if (m_queueLength == 0)
86 base.Suspend();
87 }
76 }
88 }
77 }
89 }
@@ -1,549 +1,564
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Reflection;
3 using System.Reflection;
4 using System.Diagnostics;
4 using System.Diagnostics;
5 using System.Threading;
5 using System.Threading;
6 using Implab.Parallels;
6
7
7 namespace Implab {
8 namespace Implab {
8
9
9 public delegate void ErrorHandler(Exception e);
10 public delegate void ErrorHandler(Exception e);
10 public delegate T ErrorHandler<out T>(Exception e);
11 public delegate T ErrorHandler<out T>(Exception e);
11 public delegate void ResultHandler<in T>(T result);
12 public delegate void ResultHandler<in T>(T result);
12 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
13 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
13 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
14 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
14
15
15 /// <summary>
16 /// <summary>
16 /// Класс для асинхронного получения результатов. Так называемое "обещание".
17 /// Класс для асинхронного получения результатов. Так называемое "обещание".
17 /// </summary>
18 /// </summary>
18 /// <typeparam name="T">Тип получаемого результата</typeparam>
19 /// <typeparam name="T">Тип получаемого результата</typeparam>
19 /// <remarks>
20 /// <remarks>
20 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
21 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
21 /// клиент получив такое обещание может установить ряд обратных вызово для получения
22 /// клиент получив такое обещание может установить ряд обратных вызово для получения
22 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
23 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
23 /// <para>
24 /// <para>
24 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
25 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
25 /// данные события клиент должен использовать методы <c>Then</c>.
26 /// данные события клиент должен использовать методы <c>Then</c>.
26 /// </para>
27 /// </para>
27 /// <para>
28 /// <para>
28 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
29 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
29 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
30 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
30 /// выполнении обещания.
31 /// выполнении обещания.
31 /// </para>
32 /// </para>
32 /// <para>
33 /// <para>
33 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
34 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
34 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
35 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
35 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
36 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
36 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
37 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
37 /// обещания.
38 /// обещания.
38 /// </para>
39 /// </para>
39 /// <para>
40 /// <para>
40 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
41 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
41 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
42 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
42 /// использовать соответствующую форму методе <c>Then</c>.
43 /// использовать соответствующую форму методе <c>Then</c>.
43 /// </para>
44 /// </para>
44 /// <para>
45 /// <para>
45 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
46 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
46 /// только инициатор обещания иначе могут возникнуть противоречия.
47 /// только инициатор обещания иначе могут возникнуть противоречия.
47 /// </para>
48 /// </para>
48 /// </remarks>
49 /// </remarks>
49 public class Promise<T> : IPromise {
50 public class Promise<T> : IPromise {
50
51
51 struct ResultHandlerInfo {
52 struct HandlerDescriptor {
52 public ResultHandler<T> resultHandler;
53 public ResultHandler<T> resultHandler;
53 public ErrorHandler errorHandler;
54 public ErrorHandler errorHandler;
55 public Action cancellHandler;
56
57 public void Resolve(T result) {
58 if (resultHandler != null)
59 try {
60 resultHandler(result);
61 } catch (Exception e) {
62 Reject(e);
54 }
63 }
64 }
65
66 public void Reject(Exception err) {
67 if (errorHandler != null)
68 try {
69 errorHandler(err);
70 } catch {
71 }
72 }
73
74 public void Cancel() {
75 if (cancellHandler != null)
76 try {
77 cancellHandler();
78 } catch {
79 }
80 }
81 }
82
83 const int UnresolvedSate = 0;
84 const int TransitionalState = 1;
85 const int ResolvedState = 2;
86 const int RejectedState = 3;
87 const int CancelledState = 4;
55
88
56 readonly IPromise m_parent;
89 readonly IPromise m_parent;
57
90 readonly bool m_cancellable;
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
60
91
61 readonly object m_lock = new Object();
62 readonly bool m_cancellable;
63 int m_childrenCount = 0;
92 int m_childrenCount = 0;
64
93 int m_state;
65 PromiseState m_state;
66 T m_result;
94 T m_result;
67 Exception m_error;
95 Exception m_error;
68
96
97 readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
98
69 public Promise() {
99 public Promise() {
70 m_cancellable = true;
100 m_cancellable = true;
71 }
101 }
72
102
73 public Promise(IPromise parent, bool cancellable) {
103 public Promise(IPromise parent, bool cancellable) {
74 m_cancellable = cancellable;
104 m_cancellable = cancellable;
75 m_parent = parent;
105 m_parent = parent;
76 if (parent != null)
77 parent.HandleCancelled(InternalCancel);
78 }
106 }
79
107
80 void InternalCancel() {
108 void InternalCancel() {
81 // don't try to cancel parent :)
109 // don't try to cancel parent :)
82 Cancel(false);
110 Cancel(false);
83 }
111 }
84
112
113 bool BeginTransit() {
114 return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
115 }
116
117 void CompleteTransit(int state) {
118 if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
119 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
120 }
121
122 public bool IsResolved {
123 get {
124 return m_state > 1;
125 }
126 }
127
128 public bool IsCancelled {
129 get {
130 return m_state == CancelledState;
131 }
132 }
133
85 /// <summary>
134 /// <summary>
86 /// Выполняет обещание, сообщая об успешном выполнении.
135 /// Выполняет обещание, сообщая об успешном выполнении.
87 /// </summary>
136 /// </summary>
88 /// <param name="result">Результат выполнения.</param>
137 /// <param name="result">Результат выполнения.</param>
89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
138 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
90 public void Resolve(T result) {
139 public void Resolve(T result) {
91 lock (m_lock) {
140 if (BeginTransit()) {
92 if (m_state == PromiseState.Cancelled)
141 m_result = result;
93 return;
142 CompleteTransit(ResolvedState);
94 if (m_state != PromiseState.Unresolved)
143 OnStateChanged();
144 } else if (m_state != CancelledState)
95 throw new InvalidOperationException("The promise is already resolved");
145 throw new InvalidOperationException("The promise is already resolved");
96 m_result = result;
97 m_state = PromiseState.Resolved;
98 }
99
100 OnStateChanged();
101 }
146 }
102
147
103 /// <summary>
148 /// <summary>
104 /// Выполняет обещание, сообщая об ошибке
149 /// Выполняет обещание, сообщая об ошибке
105 /// </summary>
150 /// </summary>
106 /// <remarks>
151 /// <remarks>
107 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
152 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
108 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
153 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
109 /// будут проигнорированы.
154 /// будут проигнорированы.
110 /// </remarks>
155 /// </remarks>
111 /// <param name="error">Исключение возникшее при выполнении операции</param>
156 /// <param name="error">Исключение возникшее при выполнении операции</param>
112 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
157 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
113 public void Reject(Exception error) {
158 public void Reject(Exception error) {
114 lock (m_lock) {
159 if (BeginTransit()) {
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
160 m_error = error;
116 return;
161 CompleteTransit(RejectedState);
117 if (m_state != PromiseState.Unresolved)
162 OnStateChanged();
163 } else if (m_state == ResolvedState)
118 throw new InvalidOperationException("The promise is already resolved");
164 throw new InvalidOperationException("The promise is already resolved");
119 m_error = error;
120 m_state = PromiseState.Rejected;
121 }
122
123 OnStateChanged();
124 }
165 }
125
166
126 /// <summary>
167 /// <summary>
127 /// Отменяет операцию, если это возможно.
168 /// Отменяет операцию, если это возможно.
128 /// </summary>
169 /// </summary>
129 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
170 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
130 public bool Cancel() {
171 public bool Cancel() {
131 return Cancel(true);
172 return Cancel(true);
132 }
173 }
133
174
134 /// <summary>
175 /// <summary>
135 /// Adds new handlers to this promise.
176 /// Adds new handlers to this promise.
136 /// </summary>
177 /// </summary>
137 /// <param name="success">The handler of the successfully completed operation.
178 /// <param name="success">The handler of the successfully completed operation.
138 /// This handler will recieve an operation result as a parameter.</param>
179 /// This handler will recieve an operation result as a parameter.</param>
139 /// <param name="error">Handles an exception that may occur during the operation.</param>
180 /// <param name="error">Handles an exception that may occur during the operation.</param>
140 /// <returns>The new promise chained to this one.</returns>
181 /// <returns>The new promise chained to this one.</returns>
141 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
182 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
142 if (success == null && error == null)
183 if (success == null && error == null)
143 return this;
184 return this;
144
185
145 var medium = new Promise<T>(this, true);
186 var medium = new Promise<T>(this, true);
146
187
147 var handlerInfo = new ResultHandlerInfo();
188 ResultHandler<T> resultHandler;
148
149 if (success != null)
189 if (success != null)
150 handlerInfo.resultHandler = x => {
190 resultHandler = x => {
151 success(x);
191 success(x);
152 medium.Resolve(x);
192 medium.Resolve(x);
153 };
193 };
154 else
194 else
155 handlerInfo.resultHandler = medium.Resolve;
195 resultHandler = medium.Resolve;
156
196
197 ErrorHandler errorHandler;
157 if (error != null)
198 if (error != null)
158 handlerInfo.errorHandler = x => {
199 errorHandler = x => {
159 try {
200 try {
160 error(x);
201 error(x);
161 } catch { }
202 } catch { }
162 medium.Reject(x);
203 medium.Reject(x);
163 };
204 };
164 else
205 else
165 handlerInfo.errorHandler = medium.Reject;
206 errorHandler = medium.Reject;
166
207
167 AddHandler(handlerInfo);
208 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
168
209
169 return medium;
210 return medium;
170 }
211 }
171
212
172 /// <summary>
213 /// <summary>
173 /// Adds new handlers to this promise.
214 /// Adds new handlers to this promise.
174 /// </summary>
215 /// </summary>
175 /// <param name="success">The handler of the successfully completed operation.
216 /// <param name="success">The handler of the successfully completed operation.
176 /// This handler will recieve an operation result as a parameter.</param>
217 /// This handler will recieve an operation result as a parameter.</param>
177 /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
218 /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
178 /// <returns>The new promise chained to this one.</returns>
219 /// <returns>The new promise chained to this one.</returns>
179 public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
220 public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
180 if (success == null && error == null)
221 if (success == null && error == null)
181 return this;
222 return this;
182
223
183 var medium = new Promise<T>(this, true);
224 var medium = new Promise<T>(this, true);
184
225
185 var handlerInfo = new ResultHandlerInfo();
226 ResultHandler<T> resultHandler;
227 ErrorHandler errorHandler;
186
228
187 if (success != null)
229 if (success != null)
188 handlerInfo.resultHandler = x => {
230 resultHandler = x => {
189 success(x);
231 success(x);
190 medium.Resolve(x);
232 medium.Resolve(x);
191 };
233 };
192 else
234 else
193 handlerInfo.resultHandler = medium.Resolve;
235 resultHandler = medium.Resolve;
194
236
195 if (error != null)
237 if (error != null)
196 handlerInfo.errorHandler = x => {
238 errorHandler = x => {
197 try {
239 try {
198 medium.Resolve(error(x));
240 medium.Resolve(error(x));
199 } catch { }
241 } catch { }
200 medium.Reject(x);
242 medium.Reject(x);
201 };
243 };
202 else
244 else
203 handlerInfo.errorHandler = medium.Reject;
245 errorHandler = medium.Reject;
204
246
205 AddHandler(handlerInfo);
247 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
206
248
207 return medium;
249 return medium;
208 }
250 }
209
251
210
252
211 public Promise<T> Then(ResultHandler<T> success) {
253 public Promise<T> Then(ResultHandler<T> success) {
212 if (success == null)
254 if (success == null)
213 return this;
255 return this;
214
256
215 var medium = new Promise<T>(this, true);
257 var medium = new Promise<T>(this, true);
216
258
217 var handlerInfo = new ResultHandlerInfo();
259 ResultHandler<T> resultHandler;
218
260
219 if (success != null)
261 if (success != null)
220 handlerInfo.resultHandler = x => {
262 resultHandler = x => {
221 success(x);
263 success(x);
222 medium.Resolve(x);
264 medium.Resolve(x);
223 };
265 };
224 else
266 else
225 handlerInfo.resultHandler = medium.Resolve;
267 resultHandler = medium.Resolve;
226
268
227 handlerInfo.errorHandler = medium.Reject;
269 AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
228
229 AddHandler(handlerInfo);
230
270
231 return medium;
271 return medium;
232 }
272 }
233
273
234 public Promise<T> Error(ErrorHandler error) {
274 public Promise<T> Error(ErrorHandler error) {
235 return Then(null, error);
275 return Then(null, error);
236 }
276 }
237
277
238 /// <summary>
278 /// <summary>
239 /// Handles error and allows to keep the promise.
279 /// Handles error and allows to keep the promise.
240 /// </summary>
280 /// </summary>
241 /// <remarks>
281 /// <remarks>
242 /// If the specified handler throws an exception, this exception will be used to reject the promise.
282 /// If the specified handler throws an exception, this exception will be used to reject the promise.
243 /// </remarks>
283 /// </remarks>
244 /// <param name="handler">The error handler which returns the result of the promise.</param>
284 /// <param name="handler">The error handler which returns the result of the promise.</param>
245 /// <returns>New promise.</returns>
285 /// <returns>New promise.</returns>
246 public Promise<T> Error(ErrorHandler<T> handler) {
286 public Promise<T> Error(ErrorHandler<T> handler) {
247 if (handler == null)
287 if (handler == null)
248 return this;
288 return this;
249
289
250 var medium = new Promise<T>(this, true);
290 var medium = new Promise<T>(this, true);
251
291
252 AddHandler(new ResultHandlerInfo {
292 AddHandler(
253 errorHandler = e => {
293 null,
294 e => {
254 try {
295 try {
255 medium.Resolve(handler(e));
296 medium.Resolve(handler(e));
256 } catch (Exception e2) {
297 } catch (Exception e2) {
257 medium.Reject(e2);
298 medium.Reject(e2);
258 }
299 }
259 }
300 },
260 });
301 medium.InternalCancel
302 );
261
303
262 return medium;
304 return medium;
263 }
305 }
264
306
265 public Promise<T> Anyway(Action handler) {
307 public Promise<T> Anyway(Action handler) {
266 if (handler == null)
308 if (handler == null)
267 return this;
309 return this;
268
310
269 var medium = new Promise<T>();
311 var medium = new Promise<T>();
270
312
271 AddHandler(new ResultHandlerInfo {
313 AddHandler(
272 resultHandler = x => {
314 x => {
273 // to avoid handler being called multiple times we handle exception by ourselfs
315 // to avoid handler being called multiple times we handle exception by ourselfs
274 try {
316 try {
275 handler();
317 handler();
276 medium.Resolve(x);
318 medium.Resolve(x);
277 } catch (Exception e) {
319 } catch (Exception e) {
278 medium.Reject(e);
320 medium.Reject(e);
279 }
321 }
280 },
322 },
281 errorHandler = x => {
323
324 e => {
282 try {
325 try {
283 handler();
326 handler();
284 } catch { }
327 } catch { }
285 medium.Reject(x);
328 medium.Reject(e);
286 }
329 },
287 });
330
331 medium.InternalCancel
332 );
288
333
289 return medium;
334 return medium;
290 }
335 }
291
336
292 /// <summary>
337 /// <summary>
293 /// Позволяет преобразовать результат выполения операции к новому типу.
338 /// Позволяет преобразовать результат выполения операции к новому типу.
294 /// </summary>
339 /// </summary>
295 /// <typeparam name="TNew">Новый тип результата.</typeparam>
340 /// <typeparam name="TNew">Новый тип результата.</typeparam>
296 /// <param name="mapper">Преобразование результата к новому типу.</param>
341 /// <param name="mapper">Преобразование результата к новому типу.</param>
297 /// <param name="error">Обработчик ошибки. Данный обработчик получит
342 /// <param name="error">Обработчик ошибки. Данный обработчик получит
298 /// исключение возникшее при выполнении операции.</param>
343 /// исключение возникшее при выполнении операции.</param>
299 /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
344 /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
300 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
345 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
301 if (mapper == null)
346 if (mapper == null)
302 throw new ArgumentNullException("mapper");
347 throw new ArgumentNullException("mapper");
303
348
304 // создаем прицепленное обещание
349 // создаем прицепленное обещание
305 var chained = new Promise<TNew>();
350 var chained = new Promise<TNew>();
306
351
307 AddHandler(new ResultHandlerInfo() {
352 ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
308 resultHandler = result => chained.Resolve(mapper(result)),
353 ErrorHandler errorHandler = delegate(Exception e) {
309 errorHandler = delegate(Exception e) {
310 if (error != null)
354 if (error != null)
311 try {
355 try {
312 error(e);
356 error(e);
313 } catch { }
357 } catch { }
314 // в случае ошибки нужно передать исключение дальше по цепочке
358 // в случае ошибки нужно передать исключение дальше по цепочке
315 chained.Reject(e);
359 chained.Reject(e);
316 }
360 };
317 });
361
362
363 AddHandler(
364 resultHandler,
365 errorHandler,
366 chained.InternalCancel
367 );
318
368
319 return chained;
369 return chained;
320 }
370 }
321
371
322 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) {
372 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) {
323 return Map(mapper, null);
373 return Map(mapper, null);
324 }
374 }
325
375
326 /// <summary>
376 /// <summary>
327 /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
377 /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
328 /// выполнения текущей, а результат текущей операции может быть использован для инициализации
378 /// выполнения текущей, а результат текущей операции может быть использован для инициализации
329 /// новой операции.
379 /// новой операции.
330 /// </summary>
380 /// </summary>
331 /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
381 /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
332 /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
382 /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
333 /// <param name="error">Обработчик ошибки. Данный обработчик получит
383 /// <param name="error">Обработчик ошибки. Данный обработчик получит
334 /// исключение возникшее при выполнении текуещй операции.</param>
384 /// исключение возникшее при выполнении текуещй операции.</param>
335 /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
385 /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
336 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained, ErrorHandler error) {
386 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained, ErrorHandler error) {
337
387
338 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
388 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
339 // создать посредника, к которому будут подвызяваться следующие обработчики.
389 // создать посредника, к которому будут подвызяваться следующие обработчики.
340 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
390 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
341 // передать через него результаты работы.
391 // передать через него результаты работы.
342 var medium = new Promise<TNew>(this, true);
392 var medium = new Promise<TNew>(this, true);
343
393
344 AddHandler(new ResultHandlerInfo {
394 ResultHandler<T> resultHandler = delegate(T result) {
345 resultHandler = delegate(T result) {
395 if (medium.IsCancelled)
346 if (medium.State == PromiseState.Cancelled)
347 return;
396 return;
348
397
349 var promise = chained(result);
398 var promise = chained(result);
350
399
351 // notify chained operation that it's not needed
400 // notify chained operation that it's not needed
352 medium.Cancelled(() => promise.Cancel());
401 medium.Cancelled(() => promise.Cancel());
353 promise.Then(
402 promise.Then(
354 x => medium.Resolve(x),
403 x => medium.Resolve(x),
355 e => medium.Reject(e)
404 e => medium.Reject(e)
356 );
405 );
357 },
406 };
358 errorHandler = delegate(Exception e) {
407
408 ErrorHandler errorHandler = delegate(Exception e) {
359 if (error != null)
409 if (error != null)
360 error(e);
410 error(e);
361 // в случае ошибки нужно передать исключение дальше по цепочке
411 // в случае ошибки нужно передать исключение дальше по цепочке
362 medium.Reject(e);
412 medium.Reject(e);
363 }
413 };
364 });
414
415 AddHandler(
416 resultHandler,
417 errorHandler,
418 medium.InternalCancel
419 );
365
420
366 return medium;
421 return medium;
367 }
422 }
368
423
369 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
424 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
370 return Chain(chained, null);
425 return Chain(chained, null);
371 }
426 }
372
427
373 public Promise<T> Cancelled(Action handler) {
428 public Promise<T> Cancelled(Action handler) {
374 if (handler == null)
429 AddHandler(null, null, handler);
375 return this;
376 lock (m_lock) {
377 if (m_state == PromiseState.Unresolved)
378 m_cancelHandlers.AddLast(handler);
379 else if (m_state == PromiseState.Cancelled)
380 handler();
381 }
382 return this;
430 return this;
383 }
431 }
384
432
385 public void HandleCancelled(Action handler) {
433 public Promise<T> Finally(Action handler) {
386 Cancelled(handler);
434 if (handler == null)
435 throw new ArgumentNullException("handler");
436 AddHandler(
437 x => handler(),
438 e => handler(),
439 handler
440 );
441 return this;
387 }
442 }
388
443
389 /// <summary>
444 /// <summary>
390 /// Дожидается отложенного обещания и в случае успеха, возвращает
445 /// Дожидается отложенного обещания и в случае успеха, возвращает
391 /// его, результат, в противном случае бросает исключение.
446 /// его, результат, в противном случае бросает исключение.
392 /// </summary>
447 /// </summary>
393 /// <remarks>
448 /// <remarks>
394 /// <para>
449 /// <para>
395 /// Если ожидание обещания было прервано по таймауту, это не значит,
450 /// Если ожидание обещания было прервано по таймауту, это не значит,
396 /// что обещание было отменено или что-то в этом роде, это только
451 /// что обещание было отменено или что-то в этом роде, это только
397 /// означает, что мы его не дождались, однако все зарегистрированные
452 /// означает, что мы его не дождались, однако все зарегистрированные
398 /// обработчики, как были так остались и они будут вызваны, когда
453 /// обработчики, как были так остались и они будут вызваны, когда
399 /// обещание будет выполнено.
454 /// обещание будет выполнено.
400 /// </para>
455 /// </para>
401 /// <para>
456 /// <para>
402 /// Такое поведение вполне оправдано поскольку таймаут может истечь
457 /// Такое поведение вполне оправдано поскольку таймаут может истечь
403 /// в тот момент, когда началась обработка цепочки обработчиков, и
458 /// в тот момент, когда началась обработка цепочки обработчиков, и
404 /// к тому же текущее обещание может стоять в цепочке обещаний и его
459 /// к тому же текущее обещание может стоять в цепочке обещаний и его
405 /// отклонение может привести к непрогнозируемому результату.
460 /// отклонение может привести к непрогнозируемому результату.
406 /// </para>
461 /// </para>
407 /// </remarks>
462 /// </remarks>
408 /// <param name="timeout">Время ожидания</param>
463 /// <param name="timeout">Время ожидания</param>
409 /// <returns>Результат выполнения обещания</returns>
464 /// <returns>Результат выполнения обещания</returns>
410 public T Join(int timeout) {
465 public T Join(int timeout) {
411 var evt = new ManualResetEvent(false);
466 var evt = new ManualResetEvent(false);
412 Anyway(() => evt.Set());
467 Anyway(() => evt.Set());
413 Cancelled(() => evt.Set());
468 Cancelled(() => evt.Set());
414
469
415 if (!evt.WaitOne(timeout, true))
470 if (!evt.WaitOne(timeout, true))
416 throw new TimeoutException();
471 throw new TimeoutException();
417
472
418 switch (State) {
473 switch (m_state) {
419 case PromiseState.Resolved:
474 case ResolvedState:
420 return m_result;
475 return m_result;
421 case PromiseState.Cancelled:
476 case CancelledState:
422 throw new OperationCanceledException();
477 throw new OperationCanceledException();
423 case PromiseState.Rejected:
478 case RejectedState:
424 throw new TargetInvocationException(m_error);
479 throw new TargetInvocationException(m_error);
425 default:
480 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
481 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
427 }
482 }
428 }
483 }
429
484
430 public T Join() {
485 public T Join() {
431 return Join(Timeout.Infinite);
486 return Join(Timeout.Infinite);
432 }
487 }
433
488
434 void AddHandler(ResultHandlerInfo handler) {
489 void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
435 bool invokeRequired = false;
490 Interlocked.Increment(ref m_childrenCount);
491
492 HandlerDescriptor handler = new HandlerDescriptor {
493 resultHandler = success,
494 errorHandler = error,
495 cancellHandler = cancel
496 };
436
497
437 lock (m_lock) {
498 bool queued;
438 m_childrenCount++;
439 if (m_state == PromiseState.Unresolved) {
440 m_resultHandlers.AddLast(handler);
441 } else
442 invokeRequired = true;
443 }
444
499
445 // обработчики не должны блокировать сам объект
500 if (!IsResolved) {
446 if (invokeRequired)
501 m_handlers.Enqueue(handler);
502 queued = true;
503 } else {
504 // the promise is in resolved state, just invoke the handled with minimum overhead
505 queued = false;
447 InvokeHandler(handler);
506 InvokeHandler(handler);
448 }
507 }
449
508
450 void InvokeHandler(ResultHandlerInfo handler) {
509 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
510 // if the promise have been resolved while we was adding handler to the queue
511 // we can't guarantee that someone is still processing it
512 // therefore we will fetch a handler from the queue and execute it
513 // note that fetched handler may be not the one we have added
514 InvokeHandler(handler);
515
516 }
517
518 void InvokeHandler(HandlerDescriptor handler) {
451 switch (m_state) {
519 switch (m_state) {
452 case PromiseState.Resolved:
520 case ResolvedState:
453 try {
521 handler.Resolve(m_result);
454 if (handler.resultHandler != null)
455 handler.resultHandler(m_result);
456 } catch (Exception e) {
457 try {
458 if (handler.errorHandler != null)
459 handler.errorHandler(e);
460 } catch { }
461 }
462 break;
522 break;
463 case PromiseState.Rejected:
523 case RejectedState:
464 try {
524 handler.Reject(m_error);
465 if (handler.errorHandler != null)
525 break;
466 handler.errorHandler(m_error);
526 case CancelledState:
467 } catch { }
527 handler.Cancel();
468 break;
528 break;
469 default:
529 default:
470 // do nothing
530 // do nothing
471 return;
531 return;
472 }
532 }
473 }
533 }
474
534
475 protected virtual void OnStateChanged() {
535 protected virtual void OnStateChanged() {
476 switch (m_state) {
536 HandlerDescriptor handler;
477 case PromiseState.Resolved:
537 while (m_handlers.TryDequeue(out handler))
478 foreach (var resultHandlerInfo in m_resultHandlers)
538 InvokeHandler(handler);
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
506 }
539 }
507
540
508
541
509
542
510 public bool IsExclusive {
543 public bool IsExclusive {
511 get {
544 get {
512 lock (m_lock) {
513 return m_childrenCount <= 1;
545 return m_childrenCount <= 1;
514 }
546 }
515 }
547 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
523 }
524 }
525
548
526 protected bool Cancel(bool dependencies) {
549 protected bool Cancel(bool dependencies) {
527 bool result;
550 if (BeginTransit()) {
528
551 CompleteTransit(CancelledState);
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
539 OnStateChanged();
552 OnStateChanged();
540
553
541 if (dependencies && m_parent != null && m_parent.IsExclusive) {
554 if (dependencies && m_parent != null && m_parent.IsExclusive)
542 m_parent.Cancel();
555 m_parent.Cancel();
556
557 return true;
558 } else {
559 return false;
543 }
560 }
544
545 return result;
546 }
561 }
547
562
548 }
563 }
549 }
564 }
General Comments 0
You need to be logged in to leave comments. Login now