##// END OF EJS Templates
Слияние с promises
cin -
r23:f0568ff069a5 merge default
parent child
Show More
@@ -1,333 +1,333
1 1 using System;
2 2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 3 using System.Reflection;
4 4 using System.Threading;
5 5 using Implab.Parallels;
6 6
7 7 namespace Implab.Test {
8 8 [TestClass]
9 9 public class AsyncTests {
10 10 [TestMethod]
11 11 public void ResolveTest() {
12 12 int res = -1;
13 13 var p = new Promise<int>();
14 14 p.Then(x => res = x);
15 15 p.Resolve(100);
16 16
17 Assert.AreEqual(res, 100);
17 Assert.AreEqual(100, res);
18 18 }
19 19
20 20 [TestMethod]
21 21 public void RejectTest() {
22 22 int res = -1;
23 23 Exception err = null;
24 24
25 25 var p = new Promise<int>();
26 26 p.Then(x => res = x, e => err = e);
27 27 p.Reject(new ApplicationException("error"));
28 28
29 29 Assert.AreEqual(res, -1);
30 30 Assert.AreEqual(err.Message, "error");
31 31
32 32 }
33 33
34 34 [TestMethod]
35 35 public void JoinSuccessTest() {
36 36 var p = new Promise<int>();
37 37 p.Resolve(100);
38 38 Assert.AreEqual(p.Join(), 100);
39 39 }
40 40
41 41 [TestMethod]
42 42 public void JoinFailTest() {
43 43 var p = new Promise<int>();
44 44 p.Reject(new ApplicationException("failed"));
45 45
46 46 try {
47 47 p.Join();
48 48 throw new ApplicationException("WRONG!");
49 49 } catch (TargetInvocationException err) {
50 50 Assert.AreEqual(err.InnerException.Message, "failed");
51 51 } catch {
52 52 Assert.Fail("Got wrong excaption");
53 53 }
54 54 }
55 55
56 56 [TestMethod]
57 57 public void MapTest() {
58 58 var p = new Promise<int>();
59 59
60 60 var p2 = p.Map(x => x.ToString());
61 61 p.Resolve(100);
62 62
63 63 Assert.AreEqual(p2.Join(), "100");
64 64 }
65 65
66 66 [TestMethod]
67 67 public void FixErrorTest() {
68 68 var p = new Promise<int>();
69 69
70 70 var p2 = p.Error(e => 101);
71 71
72 72 p.Reject(new Exception());
73 73
74 74 Assert.AreEqual(p2.Join(), 101);
75 75 }
76 76
77 77 [TestMethod]
78 78 public void ChainTest() {
79 79 var p1 = new Promise<int>();
80 80
81 81 var p3 = p1.Chain(x => {
82 82 var p2 = new Promise<string>();
83 83 p2.Resolve(x.ToString());
84 84 return p2;
85 85 });
86 86
87 87 p1.Resolve(100);
88 88
89 89 Assert.AreEqual(p3.Join(), "100");
90 90 }
91 91
92 92 [TestMethod]
93 93 public void PoolTest() {
94 94 var pid = Thread.CurrentThread.ManagedThreadId;
95 95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
96 96
97 97 Assert.AreNotEqual(pid, p.Join());
98 98 }
99 99
100 100 [TestMethod]
101 101 public void WorkerPoolSizeTest() {
102 102 var pool = new WorkerPool(5, 10, 0);
103 103
104 Assert.AreEqual(5, pool.ThreadCount);
104 Assert.AreEqual(5, pool.PoolSize);
105 105
106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
106 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
109 109
110 Assert.AreEqual(5, pool.ThreadCount);
110 Assert.AreEqual(5, pool.PoolSize);
111 111
112 112 for (int i = 0; i < 100; i++)
113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
114 Thread.Sleep(100);
115 Assert.AreEqual(10, pool.ThreadCount);
113 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
114 Thread.Sleep(200);
115 Assert.AreEqual(10, pool.PoolSize);
116 116
117 117 pool.Dispose();
118 118 }
119 119
120 120 [TestMethod]
121 121 public void WorkerPoolCorrectTest() {
122 122 var pool = new WorkerPool(0,1000,100);
123 123
124 124 int iterations = 1000;
125 125 int pending = iterations;
126 126 var stop = new ManualResetEvent(false);
127 127
128 128 var count = 0;
129 129 for (int i = 0; i < iterations; i++) {
130 130 pool
131 131 .Invoke(() => 1)
132 132 .Then(x => Interlocked.Add(ref count, x))
133 133 .Then(x => Math.Log10(x))
134 134 .Anyway(() => {
135 135 Interlocked.Decrement(ref pending);
136 136 if (pending == 0)
137 137 stop.Set();
138 138 });
139 139 }
140 140
141 141 stop.WaitOne();
142 142
143 143 Assert.AreEqual(iterations, count);
144 144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
145 145 pool.Dispose();
146 146
147 147 }
148 148
149 149 [TestMethod]
150 150 public void WorkerPoolDisposeTest() {
151 151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
152 Assert.AreEqual(5, pool.PoolSize);
153 153 pool.Dispose();
154 Thread.Sleep(100);
155 Assert.AreEqual(0, pool.ThreadCount);
154 Thread.Sleep(500);
155 Assert.AreEqual(0, pool.PoolSize);
156 156 pool.Dispose();
157 157 }
158 158
159 159 [TestMethod]
160 160 public void MTQueueTest() {
161 161 var queue = new MTQueue<int>();
162 162 int res;
163 163
164 164 queue.Enqueue(10);
165 165 Assert.IsTrue(queue.TryDequeue(out res));
166 166 Assert.AreEqual(10, res);
167 167 Assert.IsFalse(queue.TryDequeue(out res));
168 168
169 169 for (int i = 0; i < 1000; i++)
170 170 queue.Enqueue(i);
171 171
172 172 for (int i = 0; i < 1000; i++) {
173 173 queue.TryDequeue(out res);
174 174 Assert.AreEqual(i, res);
175 175 }
176 176
177 177 int writers = 0;
178 178 int readers = 0;
179 179 var stop = new ManualResetEvent(false);
180 180 int total = 0;
181 181
182 182 int itemsPerWriter = 1000;
183 183 int writersCount = 3;
184 184
185 185 for (int i = 0; i < writersCount; i++) {
186 186 Interlocked.Increment(ref writers);
187 187 var wn = i;
188 188 AsyncPool
189 189 .InvokeNewThread(() => {
190 190 for (int ii = 0; ii < itemsPerWriter; ii++) {
191 191 queue.Enqueue(1);
192 192 }
193 193 return 1;
194 194 })
195 195 .Anyway(() => Interlocked.Decrement(ref writers));
196 196 }
197 197
198 198 for (int i = 0; i < 10; i++) {
199 199 Interlocked.Increment(ref readers);
200 200 var wn = i;
201 201 AsyncPool
202 202 .InvokeNewThread(() => {
203 203 int t;
204 204 do {
205 205 while (queue.TryDequeue(out t))
206 206 Interlocked.Add(ref total, t);
207 207 } while (writers > 0);
208 208 return 1;
209 209 })
210 210 .Anyway(() => {
211 211 Interlocked.Decrement(ref readers);
212 212 if (readers == 0)
213 213 stop.Set();
214 214 });
215 215 }
216 216
217 217 stop.WaitOne();
218 218
219 219 Assert.AreEqual(itemsPerWriter * writersCount, total);
220 220 }
221 221
222 222 [TestMethod]
223 223 public void ParallelMapTest() {
224 224
225 225 int count = 100000;
226 226
227 227 double[] args = new double[count];
228 228 var rand = new Random();
229 229
230 230 for (int i = 0; i < count; i++)
231 231 args[i] = rand.NextDouble();
232 232
233 233 var t = Environment.TickCount;
234 234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
235 235
236 236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
237 237
238 238 t = Environment.TickCount;
239 239 for (int i = 0; i < count; i++)
240 240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
241 241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
242 242 }
243 243
244 244 [TestMethod]
245 245 public void ChainedMapTest() {
246 246
247 using (var pool = new WorkerPool(8,100,0)) {
247 using (var pool = new WorkerPool(4,4,0)) {
248 248 int count = 10000;
249 249
250 250 double[] args = new double[count];
251 251 var rand = new Random();
252 252
253 253 for (int i = 0; i < count; i++)
254 254 args[i] = rand.NextDouble();
255 255
256 256 var t = Environment.TickCount;
257 257 var res = args
258 258 .ChainedMap(
259 259 x => pool.Invoke(
260 260 () => Math.Sin(x * x)
261 261 ),
262 262 4
263 263 )
264 264 .Join();
265 265
266 266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
267 267
268 268 t = Environment.TickCount;
269 269 for (int i = 0; i < count; i++)
270 270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 273 }
274 274 }
275 275
276 276 [TestMethod]
277 277 public void ParallelForEachTest() {
278 278
279 279 int count = 100000;
280 280
281 281 int[] args = new int[count];
282 282 var rand = new Random();
283 283
284 284 for (int i = 0; i < count; i++)
285 285 args[i] = (int)(rand.NextDouble() * 100);
286 286
287 287 int result = 0;
288 288
289 289 var t = Environment.TickCount;
290 290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
291 291
292 292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
293 293
294 294 int result2 = 0;
295 295
296 296 t = Environment.TickCount;
297 297 for (int i = 0; i < count; i++)
298 298 result2 += args[i];
299 299 Assert.AreEqual(result2, result);
300 300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
301 301 }
302 302
303 303 [TestMethod]
304 304 public void ComplexCase1Test() {
305 305 var flags = new bool[3];
306 306
307 307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
308 308
309 309 var p = PromiseHelper
310 310 .Sleep(200, "Alan")
311 311 .Cancelled(() => flags[0] = true)
312 312 .Chain(x =>
313 313 PromiseHelper
314 314 .Sleep(200, "Hi, " + x)
315 315 .Map(y => y)
316 316 .Cancelled(() => flags[1] = true)
317 317 )
318 318 .Cancelled(() => flags[2] = true);
319 319 Thread.Sleep(300);
320 320 p.Cancel();
321 321 try {
322 322 Assert.AreEqual(p.Join(), "Hi, Alan");
323 323 Assert.Fail("Shouldn't get here");
324 324 } catch (OperationCanceledException) {
325 325 }
326 326
327 327 Assert.IsFalse(flags[0]);
328 328 Assert.IsTrue(flags[1]);
329 329 Assert.IsTrue(flags[2]);
330 330 }
331 331 }
332 332 }
333 333
1 NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file, binary diff hidden
@@ -1,33 +1,20
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab
7 7 {
8 8 public interface IPromise: ICancellable
9 9 {
10 10 /// <summary>
11 11 /// Check whereather the promise has no more than one dependent promise.
12 12 /// </summary>
13 13 bool IsExclusive
14 14 {
15 15 get;
16 16 }
17 17
18 /// <summary>
19 /// The current state of the promise.
20 /// </summary>
21 PromiseState State
22 {
23 get;
24 }
25 18
26 /// <summary>
27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
28 /// handler will be invoked immediatelly.
29 /// </summary>
30 /// <param name="handler">The handler</param>
31 void HandleCancelled(Action handler);
32 19 }
33 20 }
@@ -1,171 +1,171
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Diagnostics;
4 4 using System.Linq;
5 5 using System.Text;
6 6 using System.Threading;
7 7
8 8 namespace Implab.Parallels {
9 9 public static class ArrayTraits {
10 10 class ArrayIterator<TSrc> : DispatchPool<int> {
11 11 readonly Action<TSrc> m_action;
12 12 readonly TSrc[] m_source;
13 13 readonly Promise<int> m_promise = new Promise<int>();
14 14
15 15 int m_pending;
16 16 int m_next;
17 17
18 18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 19 : base(threads) {
20 20
21 21 Debug.Assert(source != null);
22 22 Debug.Assert(action != null);
23 23
24 24 m_next = 0;
25 25 m_source = source;
26 26 m_pending = source.Length;
27 27 m_action = action;
28 28
29 29 m_promise.Anyway(() => Dispose());
30 30 m_promise.Cancelled(() => Dispose());
31 31
32 32 InitPool();
33 33 }
34 34
35 35 public Promise<int> Promise {
36 36 get {
37 37 return m_promise;
38 38 }
39 39 }
40 40
41 41 protected override bool TryDequeue(out int unit) {
42 42 unit = Interlocked.Increment(ref m_next) - 1;
43 43 return unit >= m_source.Length ? false : true;
44 44 }
45 45
46 46 protected override void InvokeUnit(int unit) {
47 47 try {
48 48 m_action(m_source[unit]);
49 49 var pending = Interlocked.Decrement(ref m_pending);
50 50 if (pending == 0)
51 51 m_promise.Resolve(m_source.Length);
52 52 } catch (Exception e) {
53 53 m_promise.Reject(e);
54 54 }
55 55 }
56 56 }
57 57
58 58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 59 readonly Func<TSrc, TDst> m_transform;
60 60 readonly TSrc[] m_source;
61 61 readonly TDst[] m_dest;
62 62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
63 63
64 64 int m_pending;
65 65 int m_next;
66 66
67 67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 68 : base(threads) {
69 69
70 70 Debug.Assert (source != null);
71 71 Debug.Assert( transform != null);
72 72
73 73 m_next = 0;
74 74 m_source = source;
75 75 m_dest = new TDst[source.Length];
76 76 m_pending = source.Length;
77 77 m_transform = transform;
78 78
79 79 m_promise.Anyway(() => Dispose());
80 80 m_promise.Cancelled(() => Dispose());
81 81
82 82 InitPool();
83 83 }
84 84
85 85 public Promise<TDst[]> Promise {
86 86 get {
87 87 return m_promise;
88 88 }
89 89 }
90 90
91 91 protected override bool TryDequeue(out int unit) {
92 92 unit = Interlocked.Increment(ref m_next) - 1;
93 93 return unit >= m_source.Length ? false : true;
94 94 }
95 95
96 96 protected override void InvokeUnit(int unit) {
97 97 try {
98 98 m_dest[unit] = m_transform(m_source[unit]);
99 99 var pending = Interlocked.Decrement(ref m_pending);
100 100 if (pending == 0)
101 101 m_promise.Resolve(m_dest);
102 102 } catch (Exception e) {
103 103 m_promise.Reject(e);
104 104 }
105 105 }
106 106 }
107 107
108 108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 109 if (source == null)
110 110 throw new ArgumentNullException("source");
111 111 if (transform == null)
112 112 throw new ArgumentNullException("transform");
113 113
114 114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 115 return mapper.Promise;
116 116 }
117 117
118 118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 119 if (source == null)
120 120 throw new ArgumentNullException("source");
121 121 if (action == null)
122 122 throw new ArgumentNullException("action");
123 123
124 124 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 125 return iter.Promise;
126 126 }
127 127
128 128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 129 if (source == null)
130 130 throw new ArgumentNullException("source");
131 131 if (transform == null)
132 132 throw new ArgumentNullException("transform");
133 133 if (threads <= 0)
134 134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135 135
136 136 var promise = new Promise<TDst[]>();
137 137 var res = new TDst[source.Length];
138 138 var pending = source.Length;
139 139 var semaphore = new Semaphore(threads, threads);
140 140
141 141 AsyncPool.InvokeNewThread(() => {
142 142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
143 if(promise.IsResolved)
144 144 break; // stop processing in case of error or cancellation
145 145 var idx = i;
146 146 semaphore.WaitOne();
147 147 try {
148 148 var p1 = transform(source[i]);
149 149 p1.Anyway(() => semaphore.Release());
150 150 p1.Cancelled(() => semaphore.Release());
151 151 p1.Then(
152 152 x => {
153 153 res[idx] = x;
154 154 var left = Interlocked.Decrement(ref pending);
155 155 if (left == 0)
156 156 promise.Resolve(res);
157 157 },
158 158 e => promise.Reject(e)
159 159 );
160 160
161 161 } catch (Exception e) {
162 162 promise.Reject(e);
163 163 }
164 164 }
165 165 return 0;
166 166 });
167 167
168 168 return promise.Anyway(() => semaphore.Dispose());
169 169 }
170 170 }
171 171 }
@@ -1,238 +1,330
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab.Parallels {
9 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 10 readonly int m_minThreads;
11 11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
13 int m_maxRunningThreads = 0;
14 int m_suspended = 0;
15 int m_exitRequired = 0;
12
13 int m_createdThreads = 0; // the current size of the pool
14 int m_activeThreads = 0; // the count of threads which are active
15 int m_sleepingThreads = 0; // the count of currently inactive threads
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
19 int m_wakeEvents = 0; // the count of wake events
20
16 21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17 22
18 23 protected DispatchPool(int min, int max) {
19 24 if (min < 0)
20 25 throw new ArgumentOutOfRangeException("min");
21 26 if (max <= 0)
22 27 throw new ArgumentOutOfRangeException("max");
23 28
24 29 if (min > max)
25 30 min = max;
26 31 m_minThreads = min;
27 32 m_maxThreads = max;
28 33 }
29 34
30 35 protected DispatchPool(int threads)
31 36 : this(threads, threads) {
32 37 }
33 38
34 39 protected DispatchPool() {
35 40 int maxThreads, maxCP;
36 41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
37 42
38 43 m_minThreads = 0;
39 44 m_maxThreads = maxThreads;
40 45 }
41 46
42 47 protected void InitPool() {
43 48 for (int i = 0; i < m_minThreads; i++)
44 49 StartWorker();
45 50 }
46 51
47 public int ThreadCount {
52 public int PoolSize {
48 53 get {
49 return m_runningThreads;
54 return m_createdThreads;
55 }
56 }
57
58 public int ActiveThreads {
59 get {
60 return m_activeThreads;
50 61 }
51 62 }
52 63
53 64 public int MaxRunningThreads {
54 65 get {
55 66 return m_maxRunningThreads;
56 67 }
57 68 }
58 69
59 70 protected bool IsDisposed {
60 71 get {
61 72 return m_exitRequired != 0;
62 73 }
63 74 }
64 75
65 76 protected abstract bool TryDequeue(out TUnit unit);
66 77
67 protected virtual bool ExtendPool() {
68 if (m_suspended > 0) {
78 #region thread execution traits
79 int SignalThread() {
80 var signals = Interlocked.Increment(ref m_wakeEvents);
81 if(signals == 1)
69 82 m_hasTasks.Set();
83 return signals;
84 }
85
86 bool FetchSignalOrWait(int timeout) {
87 var start = Environment.TickCount;
88
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен
90 // ее вернуть, чтобы другой ожидающий поток смог
91 bool hasLock = false;
92 do {
93 int signals;
94 do {
95 signals = m_wakeEvents;
96 if (signals == 0)
97 break;
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99
100 if (signals >= 1) {
101 if (signals > 1 && hasLock)
102 m_hasTasks.Set();
103 return true;
104 }
105
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
110 // и уйдет на пустой цикл, после чего заблокируется
111
112 hasLock = true;
113 } while (m_hasTasks.WaitOne(timeout));
114
115 return false;
116 }
117
118 bool Sleep(int timeout) {
119 Interlocked.Increment(ref m_sleepingThreads);
120 if (FetchSignalOrWait(timeout)) {
121 Interlocked.Decrement(ref m_sleepingThreads);
70 122 return true;
71 } else
72 return StartWorker();
123 } else {
124 Interlocked.Decrement(ref m_sleepingThreads);
125 return false;
126 }
73 127 }
128 #endregion
74 129
75 130 /// <summary>
76 131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
77 132 /// </summary>
78 protected void WakePool() {
79 m_hasTasks.Set(); // wake sleeping thread;
133 protected void GrowPool() {
134 if (m_exitRequired != 0)
135 return;
136 if (m_sleepingThreads > m_wakeEvents) {
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138
139 // all sleeping threads may gone
140 SignalThread(); // wake a sleeping thread;
80 141
81 if (AllocateThreadSlot(1)) {
82 var worker = new Thread(this.Worker);
83 worker.IsBackground = true;
84 worker.Start();
142 // we can't check whether signal has been processed
143 // anyway it may take some time for the thread to start
144 // we will ensure that at least one thread is running
145
146 if (AllocateThreadSlot(1)) {
147 // if there were no threads in the pool
148 var worker = new Thread(this.Worker);
149 worker.IsBackground = true;
150 worker.Start();
151 }
152 } else {
153 // if there is no sleeping threads in the pool
154 StartWorker();
85 155 }
86 156 }
87 157
88 protected virtual void Suspend() {
89 m_hasTasks.WaitOne();
158 private bool Suspend() {
159 //no tasks left, exit if the thread is no longer needed
160 bool last;
161 bool requestExit;
162
163 // if threads have a timeout before releasing
164 if (m_releaseTimeout > 0)
165 requestExit = !Sleep(m_releaseTimeout);
166 else
167 requestExit = true;
168
169 if (!requestExit)
170 return true;
171
172 // release unsused thread
173 if (requestExit && ReleaseThreadSlot(out last)) {
174 // in case at the moment the last thread was being released
175 // a new task was added to the queue, we need to try
176 // to revoke the thread to avoid the situation when the task is left unprocessed
177 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
178 if (AllocateThreadSlot(1))
179 return true; // spin again...
180 else
181 SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
182
183 }
184
185 return false;
186 }
187
188 // wait till infinity
189 Sleep(-1);
190
191 return true;
90 192 }
91 193
92 194 #region thread slots traits
93 195
94 196 bool AllocateThreadSlot() {
95 197 int current;
96 198 // use spins to allocate slot for the new thread
97 199 do {
98 current = m_runningThreads;
200 current = m_createdThreads;
99 201 if (current >= m_maxThreads || m_exitRequired != 0)
100 202 // no more slots left or the pool has been disposed
101 203 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
204 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
103 205
104 206 UpdateMaxThreads(current + 1);
105 207
106 208 return true;
107 209 }
108 210
109 211 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
212 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
111 213 return false;
112 214
113 215 UpdateMaxThreads(desired);
114 216
115 217 return true;
116 218 }
117 219
118 220 bool ReleaseThreadSlot(out bool last) {
119 221 last = false;
120 222 int current;
121 223 // use spins to release slot for the new thread
122 224 do {
123 current = m_runningThreads;
225 current = m_createdThreads;
124 226 if (current <= m_minThreads && m_exitRequired == 0)
125 227 // the thread is reserved
126 228 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
229 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
128 230
129 231 last = (current == 1);
130 232
131 233 return true;
132 234 }
133 235
134 236 /// <summary>
135 237 /// releases thread slot unconditionally, used during cleanup
136 238 /// </summary>
137 239 /// <returns>true - no more threads left</returns>
138 240 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads);
241 var left = Interlocked.Decrement(ref m_createdThreads);
140 242 return left == 0;
141 243 }
142 244
143 245 void UpdateMaxThreads(int count) {
144 246 int max;
145 247 do {
146 248 max = m_maxRunningThreads;
147 249 if (max >= count)
148 250 break;
149 251 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
150 252 }
151 253
152 254 #endregion
153 255
154 256 bool StartWorker() {
155 257 if (AllocateThreadSlot()) {
156 258 // slot successfully allocated
157 259 var worker = new Thread(this.Worker);
158 260 worker.IsBackground = true;
159 261 worker.Start();
160 262
161 263 return true;
162 264 } else {
163 265 return false;
164 266 }
165 267 }
166 268
167 bool FetchTask(out TUnit unit) {
269 protected abstract void InvokeUnit(TUnit unit);
270
271 void Worker() {
272 TUnit unit;
273 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
274 Interlocked.Increment(ref m_activeThreads);
168 275 do {
169 276 // exit if requested
170 277 if (m_exitRequired != 0) {
171 278 // release the thread slot
279 Interlocked.Decrement(ref m_activeThreads);
172 280 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 281 m_hasTasks.Dispose();
174 282 else
175 m_hasTasks.Set(); // wake next worker
283 SignalThread(); // wake next worker
176 284 unit = default(TUnit);
177 return false;
285 break;
178 286 }
179 287
180 288 // fetch task
181 289 if (TryDequeue(out unit)) {
182 ExtendPool();
183 return true;
290 InvokeUnit(unit);
291 continue;
184 292 }
185 293
186 //no tasks left, exit if the thread is no longer needed
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
197
198 return false;
199 }
294 Interlocked.Decrement(ref m_activeThreads);
200 295
201 296 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 297 // keep this thread and wait
204 Suspend();
205 Interlocked.Decrement(ref m_suspended);
298 if (!Suspend())
299 break;
300 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
301 Interlocked.Increment(ref m_activeThreads);
206 302 } while (true);
207 }
208
209 protected abstract void InvokeUnit(TUnit unit);
210
211 void Worker() {
212 TUnit unit;
213 while (FetchTask(out unit))
214 InvokeUnit(unit);
303 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
215 304 }
216 305
217 306 protected virtual void Dispose(bool disposing) {
218 307 if (disposing) {
219 308 if (m_exitRequired == 0) {
220 309 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
221 310 return;
222 311
223 312 // wake sleeping threads
224 m_hasTasks.Set();
313 if (m_createdThreads > 0)
314 SignalThread();
315 else
316 m_hasTasks.Dispose();
225 317 GC.SuppressFinalize(this);
226 318 }
227 319 }
228 320 }
229 321
230 322 public void Dispose() {
231 323 Dispose(true);
232 324 }
233 325
234 326 ~DispatchPool() {
235 327 Dispose(false);
236 328 }
237 329 }
238 330 }
@@ -1,74 +1,75
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6
7 7 namespace Implab.Parallels {
8 8 public class MTQueue<T> {
9 9 class Node {
10 10 public Node(T value) {
11 11 this.value = value;
12 12 }
13 13 public readonly T value;
14 14 public Node next;
15 15 }
16 16
17 17 Node m_first;
18 18 Node m_last;
19 19
20 20 public void Enqueue(T value) {
21 21 var last = m_last;
22 22 var next = new Node(value);
23 23
24 24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
25 25 last = m_last;
26 26
27 27 if (last != null)
28 28 last.next = next;
29 29 else
30 30 m_first = next;
31 31 }
32 32
33 33 public bool TryDequeue(out T value) {
34 34 Node first;
35 35 Node next = null;
36 36 value = default(T);
37 37
38 38 do {
39 39 first = m_first;
40 40 if (first == null)
41 41 return false;
42 42 next = first.next;
43 43 if (next == null) {
44 44 // this is the last element,
45 // then try to update tail
45 // then try to update the tail
46 46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
47 // this is a ace condition
48 48 if (m_last == null)
49 // the queue is empty
49 50 return false;
50 // tail has been changed, that means that we need to restart
51 // tail has been changed, than we need to restart
51 52 continue;
52 53 }
53 54
54 55 // tail succesfully updated and first.next will never be changed
55 56 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
56 57 // but the writer may update the m_first since the m_last is null
57 58
58 59 // so we need to fix inconsistency by setting m_first to null, but if it already has been
59 60 // updated by a writer then we should just give up
60 61 Interlocked.CompareExchange(ref m_first, null, first);
61 62 break;
62 63
63 64 } else {
64 65 if (first == Interlocked.CompareExchange(ref m_first, next, first))
65 66 // head succesfully updated
66 67 break;
67 68 }
68 69 } while (true);
69 70
70 71 value = first.value;
71 72 return true;
72 73 }
73 74 }
74 75 }
@@ -1,89 +1,77
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab.Parallels {
9 9 public class WorkerPool : DispatchPool<Action> {
10 10
11 11 MTQueue<Action> m_queue = new MTQueue<Action>();
12 12 int m_queueLength = 0;
13 13 readonly int m_threshold = 1;
14 14
15 15 public WorkerPool(int minThreads, int maxThreads, int threshold)
16 16 : base(minThreads, maxThreads) {
17 17 m_threshold = threshold;
18 18 InitPool();
19 19 }
20 20
21 21 public WorkerPool(int minThreads, int maxThreads) :
22 22 base(minThreads, maxThreads) {
23 23 InitPool();
24 24 }
25 25
26 26 public WorkerPool(int threads)
27 27 : base(threads) {
28 28 InitPool();
29 29 }
30 30
31 31 public WorkerPool()
32 32 : base() {
33 33 InitPool();
34 34 }
35 35
36 36 public Promise<T> Invoke<T>(Func<T> task) {
37 37 if (task == null)
38 38 throw new ArgumentNullException("task");
39 39 if (IsDisposed)
40 40 throw new ObjectDisposedException(ToString());
41 41
42 42 var promise = new Promise<T>();
43 43
44 44 EnqueueTask(delegate() {
45 45 try {
46 46 promise.Resolve(task());
47 47 } catch (Exception e) {
48 48 promise.Reject(e);
49 49 }
50 50 });
51 51
52 52 return promise;
53 53 }
54 54
55 55 protected void EnqueueTask(Action unit) {
56 56 Debug.Assert(unit != null);
57 57 var len = Interlocked.Increment(ref m_queueLength);
58 58 m_queue.Enqueue(unit);
59 59
60 if(!ExtendPool())
61 WakePool();
62 }
63
64 protected override bool ExtendPool() {
65 if (m_queueLength <= m_threshold*ThreadCount)
66 // in this case we are in active thread and it request for additional workers
67 // satisfy it only when queue is longer than threshold
68 return false;
69 return base.ExtendPool();
60 if (len > m_threshold*ActiveThreads)
61 GrowPool();
70 62 }
71 63
72 64 protected override bool TryDequeue(out Action unit) {
73 65 if (m_queue.TryDequeue(out unit)) {
74 66 Interlocked.Decrement(ref m_queueLength);
75 67 return true;
76 68 }
77 69 return false;
78 70 }
79 71
80 72 protected override void InvokeUnit(Action unit) {
81 73 unit();
82 74 }
83 75
84 protected override void Suspend() {
85 if (m_queueLength == 0)
86 base.Suspend();
87 }
88 76 }
89 77 }
@@ -1,549 +1,564
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Reflection;
4 4 using System.Diagnostics;
5 5 using System.Threading;
6 using Implab.Parallels;
6 7
7 8 namespace Implab {
8 9
9 10 public delegate void ErrorHandler(Exception e);
10 11 public delegate T ErrorHandler<out T>(Exception e);
11 12 public delegate void ResultHandler<in T>(T result);
12 13 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
13 14 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
14 15
15 16 /// <summary>
16 17 /// Класс для асинхронного получения результатов. Так называемое "обещание".
17 18 /// </summary>
18 19 /// <typeparam name="T">Тип получаемого результата</typeparam>
19 20 /// <remarks>
20 21 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
21 22 /// клиент получив такое обещание может установить ряд обратных вызово для получения
22 23 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
23 24 /// <para>
24 25 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
25 26 /// данные события клиент должен использовать методы <c>Then</c>.
26 27 /// </para>
27 28 /// <para>
28 29 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
29 30 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
30 31 /// выполнении обещания.
31 32 /// </para>
32 33 /// <para>
33 34 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
34 35 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
35 36 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
36 37 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
37 38 /// обещания.
38 39 /// </para>
39 40 /// <para>
40 41 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
41 42 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
42 43 /// использовать соответствующую форму методе <c>Then</c>.
43 44 /// </para>
44 45 /// <para>
45 46 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
46 47 /// только инициатор обещания иначе могут возникнуть противоречия.
47 48 /// </para>
48 49 /// </remarks>
49 50 public class Promise<T> : IPromise {
50 51
51 struct ResultHandlerInfo {
52 struct HandlerDescriptor {
52 53 public ResultHandler<T> resultHandler;
53 54 public ErrorHandler errorHandler;
55 public Action cancellHandler;
56
57 public void Resolve(T result) {
58 if (resultHandler != null)
59 try {
60 resultHandler(result);
61 } catch (Exception e) {
62 Reject(e);
63 }
64 }
65
66 public void Reject(Exception err) {
67 if (errorHandler != null)
68 try {
69 errorHandler(err);
70 } catch {
71 }
72 }
73
74 public void Cancel() {
75 if (cancellHandler != null)
76 try {
77 cancellHandler();
78 } catch {
79 }
80 }
54 81 }
55 82
83 const int UnresolvedSate = 0;
84 const int TransitionalState = 1;
85 const int ResolvedState = 2;
86 const int RejectedState = 3;
87 const int CancelledState = 4;
88
56 89 readonly IPromise m_parent;
57
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
90 readonly bool m_cancellable;
60 91
61 readonly object m_lock = new Object();
62 readonly bool m_cancellable;
63 92 int m_childrenCount = 0;
64
65 PromiseState m_state;
93 int m_state;
66 94 T m_result;
67 95 Exception m_error;
68 96
97 readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
98
69 99 public Promise() {
70 100 m_cancellable = true;
71 101 }
72 102
73 103 public Promise(IPromise parent, bool cancellable) {
74 104 m_cancellable = cancellable;
75 105 m_parent = parent;
76 if (parent != null)
77 parent.HandleCancelled(InternalCancel);
78 106 }
79 107
80 108 void InternalCancel() {
81 109 // don't try to cancel parent :)
82 110 Cancel(false);
83 111 }
84 112
113 bool BeginTransit() {
114 return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
115 }
116
117 void CompleteTransit(int state) {
118 if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
119 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
120 }
121
122 public bool IsResolved {
123 get {
124 return m_state > 1;
125 }
126 }
127
128 public bool IsCancelled {
129 get {
130 return m_state == CancelledState;
131 }
132 }
133
85 134 /// <summary>
86 135 /// Выполняет обещание, сообщая об успешном выполнении.
87 136 /// </summary>
88 137 /// <param name="result">Результат выполнения.</param>
89 138 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
90 139 public void Resolve(T result) {
91 lock (m_lock) {
92 if (m_state == PromiseState.Cancelled)
93 return;
94 if (m_state != PromiseState.Unresolved)
95 throw new InvalidOperationException("The promise is already resolved");
140 if (BeginTransit()) {
96 141 m_result = result;
97 m_state = PromiseState.Resolved;
98 }
99
100 OnStateChanged();
142 CompleteTransit(ResolvedState);
143 OnStateChanged();
144 } else if (m_state != CancelledState)
145 throw new InvalidOperationException("The promise is already resolved");
101 146 }
102 147
103 148 /// <summary>
104 149 /// Выполняет обещание, сообщая об ошибке
105 150 /// </summary>
106 151 /// <remarks>
107 152 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
108 153 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
109 154 /// будут проигнорированы.
110 155 /// </remarks>
111 156 /// <param name="error">Исключение возникшее при выполнении операции</param>
112 157 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
113 158 public void Reject(Exception error) {
114 lock (m_lock) {
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
116 return;
117 if (m_state != PromiseState.Unresolved)
118 throw new InvalidOperationException("The promise is already resolved");
159 if (BeginTransit()) {
119 160 m_error = error;
120 m_state = PromiseState.Rejected;
121 }
122
123 OnStateChanged();
161 CompleteTransit(RejectedState);
162 OnStateChanged();
163 } else if (m_state == ResolvedState)
164 throw new InvalidOperationException("The promise is already resolved");
124 165 }
125 166
126 167 /// <summary>
127 168 /// Отменяет операцию, если это возможно.
128 169 /// </summary>
129 170 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
130 171 public bool Cancel() {
131 172 return Cancel(true);
132 173 }
133 174
134 175 /// <summary>
135 176 /// Adds new handlers to this promise.
136 177 /// </summary>
137 178 /// <param name="success">The handler of the successfully completed operation.
138 179 /// This handler will recieve an operation result as a parameter.</param>
139 180 /// <param name="error">Handles an exception that may occur during the operation.</param>
140 181 /// <returns>The new promise chained to this one.</returns>
141 182 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
142 183 if (success == null && error == null)
143 184 return this;
144 185
145 186 var medium = new Promise<T>(this, true);
146 187
147 var handlerInfo = new ResultHandlerInfo();
148
188 ResultHandler<T> resultHandler;
149 189 if (success != null)
150 handlerInfo.resultHandler = x => {
190 resultHandler = x => {
151 191 success(x);
152 192 medium.Resolve(x);
153 193 };
154 194 else
155 handlerInfo.resultHandler = medium.Resolve;
195 resultHandler = medium.Resolve;
156 196
197 ErrorHandler errorHandler;
157 198 if (error != null)
158 handlerInfo.errorHandler = x => {
199 errorHandler = x => {
159 200 try {
160 201 error(x);
161 202 } catch { }
162 203 medium.Reject(x);
163 204 };
164 205 else
165 handlerInfo.errorHandler = medium.Reject;
206 errorHandler = medium.Reject;
166 207
167 AddHandler(handlerInfo);
208 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
168 209
169 210 return medium;
170 211 }
171 212
172 213 /// <summary>
173 214 /// Adds new handlers to this promise.
174 215 /// </summary>
175 216 /// <param name="success">The handler of the successfully completed operation.
176 217 /// This handler will recieve an operation result as a parameter.</param>
177 218 /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
178 219 /// <returns>The new promise chained to this one.</returns>
179 220 public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
180 221 if (success == null && error == null)
181 222 return this;
182 223
183 224 var medium = new Promise<T>(this, true);
184 225
185 var handlerInfo = new ResultHandlerInfo();
226 ResultHandler<T> resultHandler;
227 ErrorHandler errorHandler;
186 228
187 229 if (success != null)
188 handlerInfo.resultHandler = x => {
230 resultHandler = x => {
189 231 success(x);
190 232 medium.Resolve(x);
191 233 };
192 234 else
193 handlerInfo.resultHandler = medium.Resolve;
235 resultHandler = medium.Resolve;
194 236
195 237 if (error != null)
196 handlerInfo.errorHandler = x => {
238 errorHandler = x => {
197 239 try {
198 240 medium.Resolve(error(x));
199 241 } catch { }
200 242 medium.Reject(x);
201 243 };
202 244 else
203 handlerInfo.errorHandler = medium.Reject;
245 errorHandler = medium.Reject;
204 246
205 AddHandler(handlerInfo);
247 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
206 248
207 249 return medium;
208 250 }
209 251
210 252
211 253 public Promise<T> Then(ResultHandler<T> success) {
212 254 if (success == null)
213 255 return this;
214 256
215 257 var medium = new Promise<T>(this, true);
216 258
217 var handlerInfo = new ResultHandlerInfo();
218
259 ResultHandler<T> resultHandler;
260
219 261 if (success != null)
220 handlerInfo.resultHandler = x => {
262 resultHandler = x => {
221 263 success(x);
222 264 medium.Resolve(x);
223 265 };
224 266 else
225 handlerInfo.resultHandler = medium.Resolve;
267 resultHandler = medium.Resolve;
226 268
227 handlerInfo.errorHandler = medium.Reject;
228
229 AddHandler(handlerInfo);
269 AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
230 270
231 271 return medium;
232 272 }
233 273
234 274 public Promise<T> Error(ErrorHandler error) {
235 275 return Then(null, error);
236 276 }
237 277
238 278 /// <summary>
239 279 /// Handles error and allows to keep the promise.
240 280 /// </summary>
241 281 /// <remarks>
242 282 /// If the specified handler throws an exception, this exception will be used to reject the promise.
243 283 /// </remarks>
244 284 /// <param name="handler">The error handler which returns the result of the promise.</param>
245 285 /// <returns>New promise.</returns>
246 286 public Promise<T> Error(ErrorHandler<T> handler) {
247 287 if (handler == null)
248 288 return this;
249 289
250 290 var medium = new Promise<T>(this, true);
251 291
252 AddHandler(new ResultHandlerInfo {
253 errorHandler = e => {
292 AddHandler(
293 null,
294 e => {
254 295 try {
255 296 medium.Resolve(handler(e));
256 297 } catch (Exception e2) {
257 298 medium.Reject(e2);
258 299 }
259 }
260 });
300 },
301 medium.InternalCancel
302 );
261 303
262 304 return medium;
263 305 }
264 306
265 307 public Promise<T> Anyway(Action handler) {
266 308 if (handler == null)
267 309 return this;
268 310
269 311 var medium = new Promise<T>();
270 312
271 AddHandler(new ResultHandlerInfo {
272 resultHandler = x => {
313 AddHandler(
314 x => {
273 315 // to avoid handler being called multiple times we handle exception by ourselfs
274 316 try {
275 317 handler();
276 318 medium.Resolve(x);
277 319 } catch (Exception e) {
278 320 medium.Reject(e);
279 321 }
280 322 },
281 errorHandler = x => {
323
324 e => {
282 325 try {
283 326 handler();
284 327 } catch { }
285 medium.Reject(x);
286 }
287 });
328 medium.Reject(e);
329 },
330
331 medium.InternalCancel
332 );
288 333
289 334 return medium;
290 335 }
291 336
292 337 /// <summary>
293 338 /// Позволяет преобразовать результат выполения операции к новому типу.
294 339 /// </summary>
295 340 /// <typeparam name="TNew">Новый тип результата.</typeparam>
296 341 /// <param name="mapper">Преобразование результата к новому типу.</param>
297 342 /// <param name="error">Обработчик ошибки. Данный обработчик получит
298 343 /// исключение возникшее при выполнении операции.</param>
299 344 /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
300 345 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
301 346 if (mapper == null)
302 347 throw new ArgumentNullException("mapper");
303 348
304 349 // создаем прицепленное обещание
305 350 var chained = new Promise<TNew>();
306 351
307 AddHandler(new ResultHandlerInfo() {
308 resultHandler = result => chained.Resolve(mapper(result)),
309 errorHandler = delegate(Exception e) {
310 if (error != null)
311 try {
312 error(e);
313 } catch { }
314 // в случае ошибки нужно передать исключение дальше по цепочке
315 chained.Reject(e);
316 }
317 });
352 ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
353 ErrorHandler errorHandler = delegate(Exception e) {
354 if (error != null)
355 try {
356 error(e);
357 } catch { }
358 // в случае ошибки нужно передать исключение дальше по цепочке
359 chained.Reject(e);
360 };
361
362
363 AddHandler(
364 resultHandler,
365 errorHandler,
366 chained.InternalCancel
367 );
318 368
319 369 return chained;
320 370 }
321 371
322 372 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) {
323 373 return Map(mapper, null);
324 374 }
325 375
326 376 /// <summary>
327 377 /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
328 378 /// выполнения текущей, а результат текущей операции может быть использован для инициализации
329 379 /// новой операции.
330 380 /// </summary>
331 381 /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
332 382 /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
333 383 /// <param name="error">Обработчик ошибки. Данный обработчик получит
334 384 /// исключение возникшее при выполнении текуещй операции.</param>
335 385 /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
336 386 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained, ErrorHandler error) {
337 387
338 388 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
339 389 // создать посредника, к которому будут подвызяваться следующие обработчики.
340 390 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
341 391 // передать через него результаты работы.
342 392 var medium = new Promise<TNew>(this, true);
343 393
344 AddHandler(new ResultHandlerInfo {
345 resultHandler = delegate(T result) {
346 if (medium.State == PromiseState.Cancelled)
347 return;
394 ResultHandler<T> resultHandler = delegate(T result) {
395 if (medium.IsCancelled)
396 return;
348 397
349 var promise = chained(result);
398 var promise = chained(result);
350 399
351 // notify chained operation that it's not needed
352 medium.Cancelled(() => promise.Cancel());
353 promise.Then(
354 x => medium.Resolve(x),
355 e => medium.Reject(e)
356 );
357 },
358 errorHandler = delegate(Exception e) {
359 if (error != null)
360 error(e);
361 // в случае ошибки нужно передать исключение дальше по цепочке
362 medium.Reject(e);
363 }
364 });
400 // notify chained operation that it's not needed
401 medium.Cancelled(() => promise.Cancel());
402 promise.Then(
403 x => medium.Resolve(x),
404 e => medium.Reject(e)
405 );
406 };
407
408 ErrorHandler errorHandler = delegate(Exception e) {
409 if (error != null)
410 error(e);
411 // в случае ошибки нужно передать исключение дальше по цепочке
412 medium.Reject(e);
413 };
414
415 AddHandler(
416 resultHandler,
417 errorHandler,
418 medium.InternalCancel
419 );
365 420
366 421 return medium;
367 422 }
368 423
369 424 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
370 425 return Chain(chained, null);
371 426 }
372 427
373 428 public Promise<T> Cancelled(Action handler) {
374 if (handler == null)
375 return this;
376 lock (m_lock) {
377 if (m_state == PromiseState.Unresolved)
378 m_cancelHandlers.AddLast(handler);
379 else if (m_state == PromiseState.Cancelled)
380 handler();
381 }
429 AddHandler(null, null, handler);
382 430 return this;
383 431 }
384 432
385 public void HandleCancelled(Action handler) {
386 Cancelled(handler);
433 public Promise<T> Finally(Action handler) {
434 if (handler == null)
435 throw new ArgumentNullException("handler");
436 AddHandler(
437 x => handler(),
438 e => handler(),
439 handler
440 );
441 return this;
387 442 }
388 443
389 444 /// <summary>
390 445 /// Дожидается отложенного обещания и в случае успеха, возвращает
391 446 /// его, результат, в противном случае бросает исключение.
392 447 /// </summary>
393 448 /// <remarks>
394 449 /// <para>
395 450 /// Если ожидание обещания было прервано по таймауту, это не значит,
396 451 /// что обещание было отменено или что-то в этом роде, это только
397 452 /// означает, что мы его не дождались, однако все зарегистрированные
398 453 /// обработчики, как были так остались и они будут вызваны, когда
399 454 /// обещание будет выполнено.
400 455 /// </para>
401 456 /// <para>
402 457 /// Такое поведение вполне оправдано поскольку таймаут может истечь
403 458 /// в тот момент, когда началась обработка цепочки обработчиков, и
404 459 /// к тому же текущее обещание может стоять в цепочке обещаний и его
405 460 /// отклонение может привести к непрогнозируемому результату.
406 461 /// </para>
407 462 /// </remarks>
408 463 /// <param name="timeout">Время ожидания</param>
409 464 /// <returns>Результат выполнения обещания</returns>
410 465 public T Join(int timeout) {
411 466 var evt = new ManualResetEvent(false);
412 467 Anyway(() => evt.Set());
413 468 Cancelled(() => evt.Set());
414 469
415 470 if (!evt.WaitOne(timeout, true))
416 471 throw new TimeoutException();
417 472
418 switch (State) {
419 case PromiseState.Resolved:
473 switch (m_state) {
474 case ResolvedState:
420 475 return m_result;
421 case PromiseState.Cancelled:
476 case CancelledState:
422 477 throw new OperationCanceledException();
423 case PromiseState.Rejected:
478 case RejectedState:
424 479 throw new TargetInvocationException(m_error);
425 480 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
481 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
427 482 }
428 483 }
429 484
430 485 public T Join() {
431 486 return Join(Timeout.Infinite);
432 487 }
433 488
434 void AddHandler(ResultHandlerInfo handler) {
435 bool invokeRequired = false;
489 void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
490 Interlocked.Increment(ref m_childrenCount);
491
492 HandlerDescriptor handler = new HandlerDescriptor {
493 resultHandler = success,
494 errorHandler = error,
495 cancellHandler = cancel
496 };
436 497
437 lock (m_lock) {
438 m_childrenCount++;
439 if (m_state == PromiseState.Unresolved) {
440 m_resultHandlers.AddLast(handler);
441 } else
442 invokeRequired = true;
498 bool queued;
499
500 if (!IsResolved) {
501 m_handlers.Enqueue(handler);
502 queued = true;
503 } else {
504 // the promise is in resolved state, just invoke the handled with minimum overhead
505 queued = false;
506 InvokeHandler(handler);
443 507 }
444 508
445 // обработчики не должны блокировать сам объект
446 if (invokeRequired)
509 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
510 // if the promise have been resolved while we was adding handler to the queue
511 // we can't guarantee that someone is still processing it
512 // therefore we will fetch a handler from the queue and execute it
513 // note that fetched handler may be not the one we have added
447 514 InvokeHandler(handler);
515
448 516 }
449 517
450 void InvokeHandler(ResultHandlerInfo handler) {
518 void InvokeHandler(HandlerDescriptor handler) {
451 519 switch (m_state) {
452 case PromiseState.Resolved:
453 try {
454 if (handler.resultHandler != null)
455 handler.resultHandler(m_result);
456 } catch (Exception e) {
457 try {
458 if (handler.errorHandler != null)
459 handler.errorHandler(e);
460 } catch { }
461 }
520 case ResolvedState:
521 handler.Resolve(m_result);
462 522 break;
463 case PromiseState.Rejected:
464 try {
465 if (handler.errorHandler != null)
466 handler.errorHandler(m_error);
467 } catch { }
523 case RejectedState:
524 handler.Reject(m_error);
525 break;
526 case CancelledState:
527 handler.Cancel();
468 528 break;
469 529 default:
470 530 // do nothing
471 531 return;
472 532 }
473 533 }
474 534
475 535 protected virtual void OnStateChanged() {
476 switch (m_state) {
477 case PromiseState.Resolved:
478 foreach (var resultHandlerInfo in m_resultHandlers)
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
536 HandlerDescriptor handler;
537 while (m_handlers.TryDequeue(out handler))
538 InvokeHandler(handler);
506 539 }
507 540
508 541
509 542
510 543 public bool IsExclusive {
511 544 get {
512 lock (m_lock) {
513 return m_childrenCount <= 1;
514 }
515 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
545 return m_childrenCount <= 1;
523 546 }
524 547 }
525 548
526 549 protected bool Cancel(bool dependencies) {
527 bool result;
528
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
550 if (BeginTransit()) {
551 CompleteTransit(CancelledState);
539 552 OnStateChanged();
540 553
541 if (dependencies && m_parent != null && m_parent.IsExclusive) {
542 m_parent.Cancel();
554 if (dependencies && m_parent != null && m_parent.IsExclusive)
555 m_parent.Cancel();
556
557 return true;
558 } else {
559 return false;
543 560 }
544
545 return result;
546 561 }
547 562
548 563 }
549 564 }
General Comments 0
You need to be logged in to leave comments. Login now