##// END OF EJS Templates
small refactoring, cleanup.
cin -
r30:2fad2d1f4b03 default
parent child
Show More
@@ -1,89 +1,89
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Windows.Forms;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab.Fx
9 9 {
10 10 public static class AnimationHelpers
11 11 {
12 12 public static Animation<TTarget> AnimateProperty<TTarget, TVal>(this Animation<TTarget> animation, Action<TTarget, TVal> setter, Func<TTarget, TVal> getter, TVal newValue, Func<TVal, TVal, int, int, TVal> fx) where TTarget : class
13 13 {
14 14 if (animation == null)
15 15 throw new ArgumentNullException("animation");
16 16
17 17 TVal oldValue = getter(animation.Traget);
18 18
19 19 animation.Step += (target, elaped, duration) =>
20 20 {
21 21 var value = fx(oldValue, newValue, elaped, duration);
22 22 setter(target, value);
23 23 };
24 24
25 25 return animation;
26 26 }
27 27
28 28 public static Animation<T> AnimateTransparency<T>(this T ctl, float newValue) where T : Form
29 29 {
30 30 var anim = new Animation<T>(ctl);
31 31
32 32 anim.AnimateProperty(
33 33 (target, value) => target.Opacity = value,
34 34 target => target.Opacity,
35 35 newValue,
36 36 (ov, nv, el, du) => ov + ((float)el / du) * (nv - ov)
37 37 );
38 38 return anim;
39 39 }
40 40
41 public static Promise<T> CloseFadeOut<T>(this T ctl) where T : Form
41 public static IPromise<T> CloseFadeOut<T>(this T ctl) where T : Form
42 42 {
43 43 var anim = ctl.AnimateTransparency(0);
44 44
45 45 return anim.Play().DispatchToControl(ctl).Then(frm => frm.Close());
46 46 }
47 47
48 public static Promise<T> OverlayFadeIn<T>(this Form that, T overlay) where T : Form
48 public static IPromise<T> OverlayFadeIn<T>(this Form that, T overlay) where T : Form
49 49 {
50 50 if (that == null)
51 51 throw new ArgumentNullException("that");
52 52 if (overlay == null)
53 53 throw new ArgumentNullException("overlay");
54 54
55 55 // setup overlay
56 56 overlay.Opacity = 0;
57 57 overlay.FormBorderStyle = FormBorderStyle.None;
58 58 overlay.ShowInTaskbar = false;
59 59
60 60 that.AddOwnedForm(overlay);
61 61
62 62 EventHandler handler = (object sender, EventArgs args) =>
63 63 {
64 64 overlay.Bounds = that.RectangleToScreen(that.ClientRectangle);
65 65 };
66 66
67 67 // attach handlers
68 68 that.Move += handler;
69 69 that.Resize += handler;
70 70 that.Shown += handler;
71 71
72 72 // remove handlers to release overlay
73 73 overlay.FormClosed += (sender, args) =>
74 74 {
75 75 that.Move -= handler;
76 76 that.Resize -= handler;
77 77 that.Shown -= handler;
78 78 };
79 79
80 80 overlay.Show(that);
81 81 overlay.Bounds = that.RectangleToScreen(that.ClientRectangle);
82 82
83 83 return overlay
84 84 .AnimateTransparency(1)
85 85 .Play()
86 86 .DispatchToControl(overlay);
87 87 }
88 88 }
89 89 }
@@ -1,333 +1,333
1 1 using System;
2 2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 3 using System.Reflection;
4 4 using System.Threading;
5 5 using Implab.Parallels;
6 6
7 7 namespace Implab.Test {
8 8 [TestClass]
9 9 public class AsyncTests {
10 10 [TestMethod]
11 11 public void ResolveTest() {
12 12 int res = -1;
13 13 var p = new Promise<int>();
14 14 p.Then(x => res = x);
15 15 p.Resolve(100);
16 16
17 17 Assert.AreEqual(100, res);
18 18 }
19 19
20 20 [TestMethod]
21 21 public void RejectTest() {
22 22 int res = -1;
23 23 Exception err = null;
24 24
25 25 var p = new Promise<int>();
26 26 p.Then(x => res = x, e => err = e);
27 27 p.Reject(new ApplicationException("error"));
28 28
29 29 Assert.AreEqual(res, -1);
30 30 Assert.AreEqual(err.Message, "error");
31 31
32 32 }
33 33
34 34 [TestMethod]
35 35 public void JoinSuccessTest() {
36 36 var p = new Promise<int>();
37 37 p.Resolve(100);
38 38 Assert.AreEqual(p.Join(), 100);
39 39 }
40 40
41 41 [TestMethod]
42 42 public void JoinFailTest() {
43 43 var p = new Promise<int>();
44 44 p.Reject(new ApplicationException("failed"));
45 45
46 46 try {
47 47 p.Join();
48 48 throw new ApplicationException("WRONG!");
49 49 } catch (TargetInvocationException err) {
50 50 Assert.AreEqual(err.InnerException.Message, "failed");
51 51 } catch {
52 52 Assert.Fail("Got wrong excaption");
53 53 }
54 54 }
55 55
56 56 [TestMethod]
57 57 public void MapTest() {
58 58 var p = new Promise<int>();
59 59
60 60 var p2 = p.Map(x => x.ToString());
61 61 p.Resolve(100);
62 62
63 63 Assert.AreEqual(p2.Join(), "100");
64 64 }
65 65
66 66 [TestMethod]
67 67 public void FixErrorTest() {
68 68 var p = new Promise<int>();
69 69
70 70 var p2 = p.Error(e => 101);
71 71
72 72 p.Reject(new Exception());
73 73
74 74 Assert.AreEqual(p2.Join(), 101);
75 75 }
76 76
77 77 [TestMethod]
78 78 public void ChainTest() {
79 79 var p1 = new Promise<int>();
80 80
81 81 var p3 = p1.Chain(x => {
82 82 var p2 = new Promise<string>();
83 83 p2.Resolve(x.ToString());
84 84 return p2;
85 85 });
86 86
87 87 p1.Resolve(100);
88 88
89 89 Assert.AreEqual(p3.Join(), "100");
90 90 }
91 91
92 92 [TestMethod]
93 93 public void PoolTest() {
94 94 var pid = Thread.CurrentThread.ManagedThreadId;
95 95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
96 96
97 97 Assert.AreNotEqual(pid, p.Join());
98 98 }
99 99
100 100 [TestMethod]
101 101 public void WorkerPoolSizeTest() {
102 102 var pool = new WorkerPool(5, 10, 0);
103 103
104 104 Assert.AreEqual(5, pool.PoolSize);
105 105
106 106 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
107 107 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
108 108 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
109 109
110 110 Assert.AreEqual(5, pool.PoolSize);
111 111
112 112 for (int i = 0; i < 100; i++)
113 113 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
114 114 Thread.Sleep(200);
115 115 Assert.AreEqual(10, pool.PoolSize);
116 116
117 117 pool.Dispose();
118 118 }
119 119
120 120 [TestMethod]
121 121 public void WorkerPoolCorrectTest() {
122 122 var pool = new WorkerPool(0,1000,100);
123 123
124 124 int iterations = 1000;
125 125 int pending = iterations;
126 126 var stop = new ManualResetEvent(false);
127 127
128 128 var count = 0;
129 129 for (int i = 0; i < iterations; i++) {
130 130 pool
131 131 .Invoke(() => 1)
132 132 .Then(x => Interlocked.Add(ref count, x))
133 133 .Then(x => Math.Log10(x))
134 134 .Anyway(() => {
135 135 Interlocked.Decrement(ref pending);
136 136 if (pending == 0)
137 137 stop.Set();
138 138 });
139 139 }
140 140
141 141 stop.WaitOne();
142 142
143 143 Assert.AreEqual(iterations, count);
144 144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
145 145 pool.Dispose();
146 146
147 147 }
148 148
149 149 [TestMethod]
150 150 public void WorkerPoolDisposeTest() {
151 151 var pool = new WorkerPool(5, 20);
152 152 Assert.AreEqual(5, pool.PoolSize);
153 153 pool.Dispose();
154 154 Thread.Sleep(500);
155 155 Assert.AreEqual(0, pool.PoolSize);
156 156 pool.Dispose();
157 157 }
158 158
159 159 [TestMethod]
160 160 public void MTQueueTest() {
161 161 var queue = new MTQueue<int>();
162 162 int res;
163 163
164 164 queue.Enqueue(10);
165 165 Assert.IsTrue(queue.TryDequeue(out res));
166 166 Assert.AreEqual(10, res);
167 167 Assert.IsFalse(queue.TryDequeue(out res));
168 168
169 169 for (int i = 0; i < 1000; i++)
170 170 queue.Enqueue(i);
171 171
172 172 for (int i = 0; i < 1000; i++) {
173 173 queue.TryDequeue(out res);
174 174 Assert.AreEqual(i, res);
175 175 }
176 176
177 177 int writers = 0;
178 178 int readers = 0;
179 179 var stop = new ManualResetEvent(false);
180 180 int total = 0;
181 181
182 182 int itemsPerWriter = 1000;
183 183 int writersCount = 3;
184 184
185 185 for (int i = 0; i < writersCount; i++) {
186 186 Interlocked.Increment(ref writers);
187 187 var wn = i;
188 188 AsyncPool
189 189 .InvokeNewThread(() => {
190 190 for (int ii = 0; ii < itemsPerWriter; ii++) {
191 191 queue.Enqueue(1);
192 192 }
193 193 return 1;
194 194 })
195 195 .Anyway(() => Interlocked.Decrement(ref writers));
196 196 }
197 197
198 198 for (int i = 0; i < 10; i++) {
199 199 Interlocked.Increment(ref readers);
200 200 var wn = i;
201 201 AsyncPool
202 202 .InvokeNewThread(() => {
203 203 int t;
204 204 do {
205 205 while (queue.TryDequeue(out t))
206 206 Interlocked.Add(ref total, t);
207 207 } while (writers > 0);
208 208 return 1;
209 209 })
210 210 .Anyway(() => {
211 211 Interlocked.Decrement(ref readers);
212 212 if (readers == 0)
213 213 stop.Set();
214 214 });
215 215 }
216 216
217 217 stop.WaitOne();
218 218
219 219 Assert.AreEqual(itemsPerWriter * writersCount, total);
220 220 }
221 221
222 222 [TestMethod]
223 223 public void ParallelMapTest() {
224 224
225 225 int count = 100000;
226 226
227 227 double[] args = new double[count];
228 228 var rand = new Random();
229 229
230 230 for (int i = 0; i < count; i++)
231 231 args[i] = rand.NextDouble();
232 232
233 233 var t = Environment.TickCount;
234 234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
235 235
236 236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
237 237
238 238 t = Environment.TickCount;
239 239 for (int i = 0; i < count; i++)
240 240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
241 241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
242 242 }
243 243
244 244 [TestMethod]
245 245 public void ChainedMapTest() {
246 246
247 247 using (var pool = new WorkerPool(0,100,0)) {
248 248 int count = 10000;
249 249
250 250 double[] args = new double[count];
251 251 var rand = new Random();
252 252
253 253 for (int i = 0; i < count; i++)
254 254 args[i] = rand.NextDouble();
255 255
256 256 var t = Environment.TickCount;
257 257 var res = args
258 .ChainedMap2(
258 .ChainedMap(
259 259 x => pool.Invoke(
260 260 () => Math.Sin(x * x)
261 261 ),
262 262 4
263 263 )
264 264 .Join();
265 265
266 266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
267 267
268 268 t = Environment.TickCount;
269 269 for (int i = 0; i < count; i++)
270 270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 273 }
274 274 }
275 275
276 276 [TestMethod]
277 277 public void ParallelForEachTest() {
278 278
279 279 int count = 100000;
280 280
281 281 int[] args = new int[count];
282 282 var rand = new Random();
283 283
284 284 for (int i = 0; i < count; i++)
285 285 args[i] = (int)(rand.NextDouble() * 100);
286 286
287 287 int result = 0;
288 288
289 289 var t = Environment.TickCount;
290 290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
291 291
292 292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
293 293
294 294 int result2 = 0;
295 295
296 296 t = Environment.TickCount;
297 297 for (int i = 0; i < count; i++)
298 298 result2 += args[i];
299 299 Assert.AreEqual(result2, result);
300 300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
301 301 }
302 302
303 303 [TestMethod]
304 304 public void ComplexCase1Test() {
305 305 var flags = new bool[3];
306 306
307 307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
308 308
309 309 var p = PromiseHelper
310 310 .Sleep(200, "Alan")
311 311 .Cancelled(() => flags[0] = true)
312 312 .Chain(x =>
313 313 PromiseHelper
314 314 .Sleep(200, "Hi, " + x)
315 315 .Map(y => y)
316 316 .Cancelled(() => flags[1] = true)
317 317 )
318 318 .Cancelled(() => flags[2] = true);
319 319 Thread.Sleep(300);
320 320 p.Cancel();
321 321 try {
322 322 Assert.AreEqual(p.Join(), "Hi, Alan");
323 323 Assert.Fail("Shouldn't get here");
324 324 } catch (OperationCanceledException) {
325 325 }
326 326
327 327 Assert.IsFalse(flags[0]);
328 328 Assert.IsTrue(flags[1]);
329 329 Assert.IsTrue(flags[2]);
330 330 }
331 331 }
332 332 }
333 333
@@ -1,172 +1,173
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Diagnostics;
4 4 using System.Linq;
5 5 using System.Text;
6 6 using System.Threading;
7 7
8 8 namespace Implab.Parallels {
9 9 public static class ArrayTraits {
10 10 class ArrayIterator<TSrc> : DispatchPool<int> {
11 11 readonly Action<TSrc> m_action;
12 12 readonly TSrc[] m_source;
13 13 readonly Promise<int> m_promise = new Promise<int>();
14 14
15 15 int m_pending;
16 16 int m_next;
17 17
18 18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 19 : base(threads) {
20 20
21 21 Debug.Assert(source != null);
22 22 Debug.Assert(action != null);
23 23
24 24 m_next = 0;
25 25 m_source = source;
26 26 m_pending = source.Length;
27 27 m_action = action;
28 28
29 29 m_promise.Anyway(() => Dispose());
30 30 m_promise.Cancelled(() => Dispose());
31 31
32 32 InitPool();
33 33 }
34 34
35 35 public Promise<int> Promise {
36 36 get {
37 37 return m_promise;
38 38 }
39 39 }
40 40
41 41 protected override bool TryDequeue(out int unit) {
42 42 unit = Interlocked.Increment(ref m_next) - 1;
43 43 return unit >= m_source.Length ? false : true;
44 44 }
45 45
46 46 protected override void InvokeUnit(int unit) {
47 47 try {
48 48 m_action(m_source[unit]);
49 49 var pending = Interlocked.Decrement(ref m_pending);
50 50 if (pending == 0)
51 51 m_promise.Resolve(m_source.Length);
52 52 } catch (Exception e) {
53 53 m_promise.Reject(e);
54 54 }
55 55 }
56 56 }
57 57
58 58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 59 readonly Func<TSrc, TDst> m_transform;
60 60 readonly TSrc[] m_source;
61 61 readonly TDst[] m_dest;
62 62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
63 63
64 64 int m_pending;
65 65 int m_next;
66 66
67 67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 68 : base(threads) {
69 69
70 70 Debug.Assert (source != null);
71 71 Debug.Assert( transform != null);
72 72
73 73 m_next = 0;
74 74 m_source = source;
75 75 m_dest = new TDst[source.Length];
76 76 m_pending = source.Length;
77 77 m_transform = transform;
78 78
79 79 m_promise.Anyway(() => Dispose());
80 80 m_promise.Cancelled(() => Dispose());
81 81
82 82 InitPool();
83 83 }
84 84
85 85 public Promise<TDst[]> Promise {
86 86 get {
87 87 return m_promise;
88 88 }
89 89 }
90 90
91 91 protected override bool TryDequeue(out int unit) {
92 92 unit = Interlocked.Increment(ref m_next) - 1;
93 93 return unit >= m_source.Length ? false : true;
94 94 }
95 95
96 96 protected override void InvokeUnit(int unit) {
97 97 try {
98 98 m_dest[unit] = m_transform(m_source[unit]);
99 99 var pending = Interlocked.Decrement(ref m_pending);
100 100 if (pending == 0)
101 101 m_promise.Resolve(m_dest);
102 102 } catch (Exception e) {
103 103 m_promise.Reject(e);
104 104 }
105 105 }
106 106 }
107 107
108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
108 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 109 if (source == null)
110 110 throw new ArgumentNullException("source");
111 111 if (transform == null)
112 112 throw new ArgumentNullException("transform");
113 113
114 114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 115 return mapper.Promise;
116 116 }
117 117
118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
118 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 119 if (source == null)
120 120 throw new ArgumentNullException("source");
121 121 if (action == null)
122 122 throw new ArgumentNullException("action");
123 123
124 124 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 125 return iter.Promise;
126 126 }
127 127
128 128 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 129 if (source == null)
130 130 throw new ArgumentNullException("source");
131 131 if (transform == null)
132 132 throw new ArgumentNullException("transform");
133 133 if (threads <= 0)
134 134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135 135
136 136 var promise = new Promise<TDst[]>();
137 137 var res = new TDst[source.Length];
138 138 var pending = source.Length;
139
139 140 var semaphore = new Semaphore(threads, threads);
140 141
141 142 AsyncPool.InvokeNewThread(() => {
142 143 for (int i = 0; i < source.Length; i++) {
143 144 if(promise.IsResolved)
144 145 break; // stop processing in case of error or cancellation
145 146 var idx = i;
146 147 semaphore.WaitOne();
147 148 try {
148 149 var p1 = transform(source[i]);
149 150 p1.Anyway(() => semaphore.Release());
150 151 p1.Cancelled(() => semaphore.Release());
151 152 p1.Then(
152 153 x => {
153 154 res[idx] = x;
154 155 var left = Interlocked.Decrement(ref pending);
155 156 if (left == 0)
156 157 promise.Resolve(res);
157 158 },
158 159 e => promise.Reject(e)
159 160 );
160 161
161 162 } catch (Exception e) {
162 163 promise.Reject(e);
163 164 }
164 165 }
165 166 return 0;
166 167 });
167 168
168 169 return promise.Anyway(() => semaphore.Dispose());
169 170 }
170 171
171 172 }
172 173 }
@@ -1,332 +1,335
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab.Parallels {
9 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 10 readonly int m_minThreads;
11 11 readonly int m_maxThreads;
12 12
13 13 int m_createdThreads = 0; // the current size of the pool
14 14 int m_activeThreads = 0; // the count of threads which are active
15 15 int m_sleepingThreads = 0; // the count of currently inactive threads
16 16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
17 17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
19 19 int m_wakeEvents = 0; // the count of wake events
20 20
21 21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
22 22
23 23 protected DispatchPool(int min, int max) {
24 24 if (min < 0)
25 25 throw new ArgumentOutOfRangeException("min");
26 26 if (max <= 0)
27 27 throw new ArgumentOutOfRangeException("max");
28 28
29 29 if (min > max)
30 30 min = max;
31 31 m_minThreads = min;
32 32 m_maxThreads = max;
33 33 }
34 34
35 35 protected DispatchPool(int threads)
36 36 : this(threads, threads) {
37 37 }
38 38
39 39 protected DispatchPool() {
40 40 int maxThreads, maxCP;
41 41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
42 42
43 43 m_minThreads = 0;
44 44 m_maxThreads = maxThreads;
45 45 }
46 46
47 47 protected void InitPool() {
48 48 for (int i = 0; i < m_minThreads; i++)
49 49 StartWorker();
50 50 }
51 51
52 52 public int PoolSize {
53 53 get {
54 54 return m_createdThreads;
55 55 }
56 56 }
57 57
58 58 public int ActiveThreads {
59 59 get {
60 60 return m_activeThreads;
61 61 }
62 62 }
63 63
64 64 public int MaxRunningThreads {
65 65 get {
66 66 return m_maxRunningThreads;
67 67 }
68 68 }
69 69
70 70 protected bool IsDisposed {
71 71 get {
72 72 return m_exitRequired != 0;
73 73 }
74 74 }
75 75
76 76 protected abstract bool TryDequeue(out TUnit unit);
77 77
78 78 #region thread execution traits
79 79 int SignalThread() {
80 80 var signals = Interlocked.Increment(ref m_wakeEvents);
81 81 if(signals == 1)
82 82 m_hasTasks.Set();
83 83 return signals;
84 84 }
85 85
86 86 bool FetchSignalOrWait(int timeout) {
87 87 var start = Environment.TickCount;
88 88
89 89 // ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ ΠΏΠΎΡ‚ΠΎΠΊ Π²Π»Π°Π΄Π΅Π΅Ρ‚ Π±Π»ΠΎΠΊΠΈΡ€ΠΎΠ²ΠΊΠΎΠΉ ΠΈ ΠΏΡ€ΠΈ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎΠΌ ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΈΠΈ сигнала Π΄ΠΎΠ»ΠΆΠ΅Π½
90 90 // Π΅Π΅ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π΄Ρ€ΡƒΠ³ΠΎΠΉ ΠΎΠΆΠΈΠ΄Π°ΡŽΡ‰ΠΈΠΉ ΠΏΠΎΡ‚ΠΎΠΊ смог
91 91 bool hasLock = false;
92 92 do {
93 93 int signals;
94 94 do {
95 95 signals = m_wakeEvents;
96 96 if (signals == 0)
97 97 break;
98 98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99 99
100 100 if (signals >= 1) {
101 101 if (signals > 1 && hasLock)
102 102 m_hasTasks.Set();
103 103 return true;
104 104 }
105 105
106 106 if (timeout != -1)
107 107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108 108
109 109 // Ссли сигналов большС Π½Π΅ ΠΎΡΡ‚Π°Π»ΠΎΡΡŒ, Ρ‚ΠΎ ΠΏΠ΅Ρ€Π²Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ дошСл сюда сбросит событиС
110 110 // ΠΈ ΡƒΠΉΠ΄Π΅Ρ‚ Π½Π° пустой Ρ†ΠΈΠΊΠ», послС Ρ‡Π΅Π³ΠΎ заблокируСтся
111 111
112 112 hasLock = true;
113 113 } while (m_hasTasks.WaitOne(timeout));
114 114
115 115 return false;
116 116 }
117 117
118 118 bool Sleep(int timeout) {
119 119 Interlocked.Increment(ref m_sleepingThreads);
120 120 if (FetchSignalOrWait(timeout)) {
121 121 Interlocked.Decrement(ref m_sleepingThreads);
122 122 return true;
123 123 } else {
124 124 Interlocked.Decrement(ref m_sleepingThreads);
125 125 return false;
126 126 }
127 127 }
128 128 #endregion
129 129
130 130 /// <summary>
131 131 /// ЗапускаСт Π»ΠΈΠ±ΠΎ Π½ΠΎΠ²Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ, Ссли Ρ€Π°Π½ΡŒΡˆΠ΅ Π½Π΅ Π±Ρ‹Π»ΠΎ Π½ΠΈ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΏΠΎΡ‚ΠΎΠΊΠ°, Π»ΠΈΠ±ΠΎ устанавливаСт событиС ΠΏΡ€ΠΎΠ±ΡƒΠΆΠ΄Π΅Π½ΠΈΠ΅ ΠΎΠ΄Π½ΠΎΠ³ΠΎ спящСго ΠΏΠΎΡ‚ΠΎΠΊΠ°
132 132 /// </summary>
133 133 protected void GrowPool() {
134 134 if (m_exitRequired != 0)
135 135 return;
136 136 if (m_sleepingThreads > m_wakeEvents) {
137 137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138 138
139 139 // all sleeping threads may gone
140 140 SignalThread(); // wake a sleeping thread;
141 141
142 142 // we can't check whether signal has been processed
143 143 // anyway it may take some time for the thread to start
144 144 // we will ensure that at least one thread is running
145 145
146 if (AllocateThreadSlot(1)) {
147 // if there were no threads in the pool
148 var worker = new Thread(this.Worker);
149 worker.IsBackground = true;
150 worker.Start();
151 }
146 EnsurePoolIsAlive();
152 147 } else {
153 148 // if there is no sleeping threads in the pool
154 if (!StartWorker())
155 // we haven't started a new thread, but the current can be on the way and it can't process the queue
149 if (!StartWorker()) {
150 // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue
156 151 // send it a signal to spin again
157 SignalThread();
152 SignalThread();
153 EnsurePoolIsAlive();
154 }
155 }
156 }
157
158 private void EnsurePoolIsAlive() {
159 if (AllocateThreadSlot(1)) {
160 // if there were no threads in the pool
161 var worker = new Thread(this.Worker);
162 worker.IsBackground = true;
163 worker.Start();
158 164 }
159 165 }
160 166
161 167 private bool Suspend() {
162 168 //no tasks left, exit if the thread is no longer needed
163 169 bool last;
164 170 bool requestExit;
165 171
166 172 // if threads have a timeout before releasing
167 173 if (m_releaseTimeout > 0)
168 174 requestExit = !Sleep(m_releaseTimeout);
169 175 else
170 176 requestExit = true;
171 177
172 178 if (!requestExit)
173 179 return true;
174 180
175 181 // release unsused thread
176 182 if (requestExit && ReleaseThreadSlot(out last)) {
177 183 // in case at the moment the last thread was being released
178 184 // a new task was added to the queue, we need to try
179 185 // to revoke the thread to avoid the situation when the task is left unprocessed
180 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
181 if (AllocateThreadSlot(1))
182 return true; // spin again...
183 else
184 SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
185
186 if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false
187 SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it
188 return AllocateThreadSlot(1); // ensure that at least one thread is alive
186 189 }
187 190
188 191 return false;
189 192 }
190 193
191 194 // wait till infinity
192 195 Sleep(-1);
193 196
194 197 return true;
195 198 }
196 199
197 200 #region thread slots traits
198 201
199 202 bool AllocateThreadSlot() {
200 203 int current;
201 204 // use spins to allocate slot for the new thread
202 205 do {
203 206 current = m_createdThreads;
204 207 if (current >= m_maxThreads || m_exitRequired != 0)
205 208 // no more slots left or the pool has been disposed
206 209 return false;
207 210 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
208 211
209 212 UpdateMaxThreads(current + 1);
210 213
211 214 return true;
212 215 }
213 216
214 217 bool AllocateThreadSlot(int desired) {
215 218 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
216 219 return false;
217 220
218 221 UpdateMaxThreads(desired);
219 222
220 223 return true;
221 224 }
222 225
223 226 bool ReleaseThreadSlot(out bool last) {
224 227 last = false;
225 228 int current;
226 229 // use spins to release slot for the new thread
227 230 do {
228 231 current = m_createdThreads;
229 232 if (current <= m_minThreads && m_exitRequired == 0)
230 233 // the thread is reserved
231 234 return false;
232 235 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
233 236
234 237 last = (current == 1);
235 238
236 239 return true;
237 240 }
238 241
239 242 /// <summary>
240 243 /// releases thread slot unconditionally, used during cleanup
241 244 /// </summary>
242 245 /// <returns>true - no more threads left</returns>
243 246 bool ReleaseThreadSlotAnyway() {
244 247 var left = Interlocked.Decrement(ref m_createdThreads);
245 248 return left == 0;
246 249 }
247 250
248 251 void UpdateMaxThreads(int count) {
249 252 int max;
250 253 do {
251 254 max = m_maxRunningThreads;
252 255 if (max >= count)
253 256 break;
254 257 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
255 258 }
256 259
257 260 #endregion
258 261
259 262 bool StartWorker() {
260 263 if (AllocateThreadSlot()) {
261 264 // slot successfully allocated
262 265 var worker = new Thread(this.Worker);
263 266 worker.IsBackground = true;
264 267 worker.Start();
265 268
266 269 return true;
267 270 } else {
268 271 return false;
269 272 }
270 273 }
271 274
272 275 protected abstract void InvokeUnit(TUnit unit);
273 276
274 277 void Worker() {
275 278 TUnit unit;
276 279 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
277 280 Interlocked.Increment(ref m_activeThreads);
278 281 do {
279 282 // exit if requested
280 283 if (m_exitRequired != 0) {
281 284 // release the thread slot
282 285 Interlocked.Decrement(ref m_activeThreads);
283 286 if (ReleaseThreadSlotAnyway()) // it was the last worker
284 287 m_hasTasks.Dispose();
285 288 else
286 289 SignalThread(); // wake next worker
287 290 break;
288 291 }
289 292
290 293 // fetch task
291 294 if (TryDequeue(out unit)) {
292 295 InvokeUnit(unit);
293 296 continue;
294 297 }
295 298
296 299 Interlocked.Decrement(ref m_activeThreads);
297 300
298 301 // entering suspend state
299 302 // keep this thread and wait
300 303 if (!Suspend())
301 304 break;
302 305 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
303 306 Interlocked.Increment(ref m_activeThreads);
304 307 } while (true);
305 308 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
306 309 }
307 310
308 311 protected virtual void Dispose(bool disposing) {
309 312 if (disposing) {
310 313 if (m_exitRequired == 0) {
311 314 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
312 315 return;
313 316
314 317 // wake sleeping threads
315 318 if (m_createdThreads > 0)
316 319 SignalThread();
317 320 else
318 321 m_hasTasks.Dispose();
319 322 GC.SuppressFinalize(this);
320 323 }
321 324 }
322 325 }
323 326
324 327 public void Dispose() {
325 328 Dispose(true);
326 329 }
327 330
328 331 ~DispatchPool() {
329 332 Dispose(false);
330 333 }
331 334 }
332 335 }
@@ -1,669 +1,669
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Reflection;
4 4 using System.Diagnostics;
5 5 using System.Threading;
6 6 using Implab.Parallels;
7 7
8 8 namespace Implab {
9 9
10 10 public delegate void ErrorHandler(Exception e);
11 11 public delegate T ErrorHandler<out T>(Exception e);
12 12 public delegate void ResultHandler<in T>(T result);
13 13 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
14 public delegate Promise<TNew> ChainedOperation<in TSrc,TNew>(TSrc result);
14 public delegate IPromise<TNew> ChainedOperation<in TSrc,TNew>(TSrc result);
15 15
16 16 /// <summary>
17 17 /// Класс для асинхронного получСния Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ². Π’Π°ΠΊ Π½Π°Π·Ρ‹Π²Π°Π΅ΠΌΠΎΠ΅ "ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅".
18 18 /// </summary>
19 19 /// <typeparam name="T">Π’ΠΈΠΏ ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌΠΎΠ³ΠΎ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°</typeparam>
20 20 /// <remarks>
21 21 /// <para>БСрвис ΠΏΡ€ΠΈ ΠΎΠ±Ρ€Π°Ρ‰Π΅Π½ΠΈΠΈ ΠΊ Π΅Π³ΠΎ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρƒ Π΄Π°Π΅Ρ‚ ΠΎΠ±Π΅Ρ‰Π°ΠΈΠ½ΠΈΠ΅ ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ,
22 22 /// ΠΊΠ»ΠΈΠ΅Π½Ρ‚ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ² Ρ‚Π°ΠΊΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ ΡƒΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ ряд ΠΎΠ±Ρ€Π°Ρ‚Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎ для получСния
23 23 /// событий выполнСния обСщания, Ρ‚ΠΎΠ΅ΡΡ‚ΡŒ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ ΠΈ прСдоставлСнии Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ².</para>
24 24 /// <para>
25 25 /// ΠžΠ±Π΅Ρ‰Π΅Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ ΠΊΠ°ΠΊ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ, Ρ‚Π°ΠΊ ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ с ошибкой. Для подписки Π½Π°
26 26 /// Π΄Π°Π½Π½Ρ‹Π΅ события ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π΄ΠΎΠ»ΠΆΠ΅Π½ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Then</c>.
27 27 /// </para>
28 28 /// <para>
29 29 /// БСрвис, Π² свою ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ, ΠΏΠΎ ΠΎΠΊΠΎΠ½Ρ‡Π°Π½ΠΈΡŽ выполнСния ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ (Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ с ошибкой),
30 30 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Resolve</c> Π»ΠΈΠ±ΠΎ <c>Reject</c> для оповСщСния ΠΊΠ»ΠΈΠ΅Ρ‚Π½Π° ΠΎ
31 31 /// Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ обСщания.
32 32 /// </para>
33 33 /// <para>
34 34 /// Если сСрвСр успСл Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΅Ρ‰Π΅ Π΄ΠΎ Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π½Π° Π½Π΅Π³ΠΎ подписался,
35 35 /// Ρ‚ΠΎ Π² ΠΌΠΎΠΌΠ΅Π½Ρ‚ подписки ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π° Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΠ²ΡƒΡŽΡ‰ΠΈΠ΅ события Π² синхронном
36 36 /// Ρ€Π΅ΠΆΠΈΠΌΠ΅ ΠΈ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π±ΡƒΠ΄Π΅Ρ‚ ΠΎΠΏΠΎΠ²Π΅Ρ‰Π΅Π½ Π² любом случаС. Π˜Π½Π°Ρ‡Π΅, ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ Π΄ΠΎΠ±Π°Π²Π»ΡΡŽΡ‚ΡΡ Π²
37 37 /// список Π² порядкС подписания ΠΈ Π² этом ΠΆΠ΅ порядкС ΠΎΠ½ΠΈ Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ
38 38 /// обСщания.
39 39 /// </para>
40 40 /// <para>
41 41 /// ΠžΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Ρ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ обСщания ΠΌΠΎΠΆΠ½ΠΎ ΠΏΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΠΎΠ²Ρ‹Π²Π°Ρ‚ΡŒ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ Π»ΠΈΠ±ΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ
42 42 /// связанныС асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Ρ‚Π°ΠΊΠΆΠ΅ Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°ΡŽΡ‚ обСщания. Для этого слСдуСт
43 43 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΡƒΡŽ Ρ„ΠΎΡ€ΠΌΡƒ ΠΌΠ΅Ρ‚ΠΎΠ΄Π΅ <c>Then</c>.
44 44 /// </para>
45 45 /// <para>
46 46 /// Π’Π°ΠΊΠΆΠ΅ Ρ…ΠΎΡ€ΠΎΡˆΠΈΠΌ ΠΏΡ€Π°Π²ΠΈΠ»ΠΎΠΌ являСтся Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ <c>Resolve</c> ΠΈ <c>Reject</c> Π΄ΠΎΠ»ΠΆΠ΅Π½ Π²Ρ‹Π·Ρ‹Π²Π°Ρ‚ΡŒ
47 47 /// Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠ°Ρ‚ΠΎΡ€ обСщания ΠΈΠ½Π°Ρ‡Π΅ ΠΌΠΎΠ³ΡƒΡ‚ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΡƒΡ‚ΡŒ противорСчия.
48 48 /// </para>
49 49 /// </remarks>
50 50 public class Promise<T> : IPromise<T> {
51 51
52 52 protected struct HandlerDescriptor {
53 53 public ResultHandler<T> resultHandler;
54 54 public ErrorHandler errorHandler;
55 55 public Action cancellHandler;
56 56
57 57 public void Resolve(T result) {
58 58 if (resultHandler != null)
59 59 try {
60 60 resultHandler(result);
61 61 } catch (Exception e) {
62 62 Reject(e);
63 63 }
64 64 }
65 65
66 66 public void Reject(Exception err) {
67 67 if (errorHandler != null)
68 68 try {
69 69 errorHandler(err);
70 70 } catch {
71 71 }
72 72 }
73 73
74 74 public void Cancel() {
75 75 if (cancellHandler != null)
76 76 try {
77 77 cancellHandler();
78 78 } catch {
79 79 }
80 80 }
81 81 }
82 82
83 83 const int UnresolvedSate = 0;
84 84 const int TransitionalState = 1;
85 85 const int SucceededState = 2;
86 86 const int RejectedState = 3;
87 87 const int CancelledState = 4;
88 88
89 89 readonly IPromiseBase m_parent;
90 90 readonly bool m_cancellable;
91 91
92 92 int m_childrenCount = 0;
93 93 int m_state;
94 94 T m_result;
95 95 Exception m_error;
96 96
97 97 readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
98 98
99 99 public Promise() {
100 100 m_cancellable = true;
101 101 }
102 102
103 103 public Promise(IPromiseBase parent, bool cancellable) {
104 104 m_cancellable = cancellable;
105 105 m_parent = parent;
106 106 }
107 107
108 108 void InternalCancel() {
109 109 // don't try to cancel parent :)
110 110 Cancel(false);
111 111 }
112 112
113 113 bool BeginTransit() {
114 114 return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
115 115 }
116 116
117 117 void CompleteTransit(int state) {
118 118 if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
119 119 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
120 120 }
121 121
122 122 void WaitTransition() {
123 123 while (m_state == TransitionalState) {
124 124 /* noop */
125 125 }
126 126 }
127 127
128 128 public bool IsResolved {
129 129 get {
130 130 return m_state > 1;
131 131 }
132 132 }
133 133
134 134 public bool IsCancelled {
135 135 get {
136 136 return m_state == CancelledState;
137 137 }
138 138 }
139 139
140 140 public Type PromiseType {
141 141 get { return typeof(T); }
142 142 }
143 143
144 144 /// <summary>
145 145 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎΠΌ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ.
146 146 /// </summary>
147 147 /// <param name="result">Π Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ выполнСния.</param>
148 148 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
149 149 public void Resolve(T result) {
150 150 if (BeginTransit()) {
151 151 m_result = result;
152 152 CompleteTransit(SucceededState);
153 153 OnStateChanged();
154 154 } else {
155 155 WaitTransition();
156 156 if (m_state != CancelledState)
157 157 throw new InvalidOperationException("The promise is already resolved");
158 158 }
159 159 }
160 160
161 161 /// <summary>
162 162 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
163 163 /// </summary>
164 164 /// <remarks>
165 165 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
166 166 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
167 167 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
168 168 /// </remarks>
169 169 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
170 170 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
171 171 public void Reject(Exception error) {
172 172 if (BeginTransit()) {
173 173 m_error = error;
174 174 CompleteTransit(RejectedState);
175 175 OnStateChanged();
176 176 } else {
177 177 WaitTransition();
178 178 if (m_state == SucceededState)
179 179 throw new InvalidOperationException("The promise is already resolved");
180 180 }
181 181 }
182 182
183 183 /// <summary>
184 184 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
185 185 /// </summary>
186 186 /// <returns><c>true</c> ΠžΠΏΠ΅Ρ€Π°Ρ†ΠΈΡ Π±Ρ‹Π»Π° ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π°, ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ Π½Π΅ Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹.<c>false</c> ΠΎΡ‚ΠΌΠ΅Π½Π° Π½Π΅ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½Π°, ΠΏΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ ΠΈ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ ΠΎΡ‚Ρ€Π°Π±ΠΎΡ‚Π°Π»ΠΈ.</returns>
187 187 public bool Cancel() {
188 188 return Cancel(true);
189 189 }
190 190
191 191 /// <summary>
192 192 /// Adds new handlers to this promise.
193 193 /// </summary>
194 194 /// <param name="success">The handler of the successfully completed operation.
195 195 /// This handler will recieve an operation result as a parameter.</param>
196 196 /// <param name="error">Handles an exception that may occur during the operation.</param>
197 197 /// <returns>The new promise chained to this one.</returns>
198 198 public IPromise<T> Then(ResultHandler<T> success, ErrorHandler error) {
199 199 if (success == null && error == null)
200 200 return this;
201 201
202 202 var medium = new Promise<T>(this, true);
203 203
204 204 ResultHandler<T> resultHandler;
205 205 if (success != null)
206 206 resultHandler = x => {
207 207 success(x);
208 208 medium.Resolve(x);
209 209 };
210 210 else
211 211 resultHandler = medium.Resolve;
212 212
213 213 ErrorHandler errorHandler;
214 214 if (error != null)
215 215 errorHandler = x => {
216 216 // нСсмотря Π½Π° Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊ ошибки вызываСтся бСзопасно,
217 217 // Ρ‚.Π΅. возникшиС Π² Π½Π΅ΠΌ ошибки Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΠΎΠ΄Π°Π²Π»Π΅Π½Ρ‹, Π½Π°ΠΌ Π½ΡƒΠΆΠ½ΠΎ
218 218 // Π³Π°Ρ€Π°Π½Ρ‚ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ, Ρ‡Ρ‚ΠΎ ошибка Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠ΅Ρ€Π΅Π΄Π°Π½Π° дальшС ΠΏΠΎ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ
219 219 try {
220 220 error(x);
221 221 } catch { }
222 222 medium.Reject(x);
223 223 };
224 224 else
225 225 errorHandler = medium.Reject;
226 226
227 227 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
228 228
229 229 return medium;
230 230 }
231 231
232 232 public IPromiseBase Then(Action success,ErrorHandler error)
233 233 {
234 234 return Then(x => success(), error);
235 235 }
236 236
237 237 public IPromiseBase Then(Action success)
238 238 {
239 239 return Then(success);
240 240 }
241 241
242 242 /// <summary>
243 243 /// Adds new handlers to this promise.
244 244 /// </summary>
245 245 /// <param name="success">The handler of the successfully completed operation.
246 246 /// This handler will recieve an operation result as a parameter.</param>
247 247 /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
248 248 /// <returns>The new promise chained to this one.</returns>
249 249 public IPromise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
250 250 if (success == null && error == null)
251 251 return this;
252 252
253 253 var medium = new Promise<T>(this, true);
254 254
255 255 ResultHandler<T> resultHandler;
256 256 ErrorHandler errorHandler;
257 257
258 258 if (success != null)
259 259 resultHandler = x => {
260 260 success(x);
261 261 medium.Resolve(x);
262 262 };
263 263 else
264 264 resultHandler = medium.Resolve;
265 265
266 266 if (error != null)
267 267 errorHandler = x => {
268 268 try {
269 269 medium.Resolve(error(x));
270 270 } catch(Exception e) {
271 271 medium.Reject(e);
272 272 }
273 273 };
274 274 else
275 275 errorHandler = medium.Reject;
276 276
277 277 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
278 278
279 279 return medium;
280 280 }
281 281
282 282
283 283 public IPromise<T> Then(ResultHandler<T> success) {
284 284 if (success == null)
285 285 return this;
286 286
287 287 var medium = new Promise<T>(this, true);
288 288
289 289 ResultHandler<T> resultHandler;
290 290
291 291 if (success != null)
292 292 resultHandler = x => {
293 293 success(x);
294 294 medium.Resolve(x);
295 295 };
296 296 else
297 297 resultHandler = medium.Resolve;
298 298
299 299 AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
300 300
301 301 return medium;
302 302 }
303 303
304 304 public IPromise<T> Error(ErrorHandler error) {
305 305 return Then((ResultHandler<T>)null, error);
306 306 }
307 307
308 308 /// <summary>
309 309 /// Handles error and allows to keep the promise.
310 310 /// </summary>
311 311 /// <remarks>
312 312 /// If the specified handler throws an exception, this exception will be used to reject the promise.
313 313 /// </remarks>
314 314 /// <param name="handler">The error handler which returns the result of the promise.</param>
315 315 /// <returns>New promise.</returns>
316 316 public IPromise<T> Error(ErrorHandler<T> handler) {
317 317 if (handler == null)
318 318 return this;
319 319
320 320 var medium = new Promise<T>(this, true);
321 321
322 322 AddHandler(
323 323 x => medium.Resolve(x),
324 324 e => {
325 325 try {
326 326 medium.Resolve(handler(e));
327 327 } catch (Exception e2) {
328 328 medium.Reject(e2);
329 329 }
330 330 },
331 331 medium.InternalCancel
332 332 );
333 333
334 334 return medium;
335 335 }
336 336
337 337 public IPromise<T> Anyway(Action handler) {
338 338 if (handler == null)
339 339 return this;
340 340
341 341 var medium = new Promise<T>();
342 342
343 343 AddHandler(
344 344 x => {
345 345 // to avoid handler being called multiple times we handle exception by ourselfs
346 346 try {
347 347 handler();
348 348 medium.Resolve(x);
349 349 } catch (Exception e) {
350 350 medium.Reject(e);
351 351 }
352 352 },
353 353
354 354 e => {
355 355 try {
356 356 handler();
357 357 } catch { }
358 358 medium.Reject(e);
359 359 },
360 360
361 361 medium.InternalCancel
362 362 );
363 363
364 364 return medium;
365 365 }
366 366
367 367 /// <summary>
368 368 /// ΠŸΠΎΠ·Π²ΠΎΠ»ΡΠ΅Ρ‚ ΠΏΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΠΎΠ²Π°Ρ‚ΡŒ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ выполСния ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ ΠΊ Π½ΠΎΠ²ΠΎΠΌΡƒ Ρ‚ΠΈΠΏΡƒ.
369 369 /// </summary>
370 370 /// <typeparam name="TNew">Новый Ρ‚ΠΈΠΏ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°.</typeparam>
371 371 /// <param name="mapper">ΠŸΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΠΎΠ²Π°Π½ΠΈΠ΅ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π° ΠΊ Π½ΠΎΠ²ΠΎΠΌΡƒ Ρ‚ΠΈΠΏΡƒ.</param>
372 372 /// <param name="error">ΠžΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊ ошибки. Π”Π°Π½Π½Ρ‹ΠΉ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΡ‚
373 373 /// ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ.</param>
374 374 /// <returns>НовоС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ исходного обСщания.</returns>
375 375 public IPromise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
376 376 if (mapper == null)
377 377 throw new ArgumentNullException("mapper");
378 378
379 379 // создаСм ΠΏΡ€ΠΈΡ†Π΅ΠΏΠ»Π΅Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅
380 380 var chained = new Promise<TNew>();
381 381
382 382 ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
383 383 ErrorHandler errorHandler = delegate(Exception e) {
384 384 if (error != null)
385 385 try {
386 386 error(e);
387 387 } catch { }
388 388 // Π² случаС ошибки Π½ΡƒΠΆΠ½ΠΎ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‚ΡŒ ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ дальшС ΠΏΠΎ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠ΅
389 389 chained.Reject(e);
390 390 };
391 391
392 392
393 393 AddHandler(
394 394 resultHandler,
395 395 errorHandler,
396 396 chained.InternalCancel
397 397 );
398 398
399 399 return chained;
400 400 }
401 401
402 402 public IPromise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) {
403 403 return Map(mapper, null);
404 404 }
405 405
406 406 /// <summary>
407 407 /// БцСпляСт нСсколько аснхронных ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΉ. Указанная асинхронная опСрация Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹Π·Π²Π°Π½Π° послС
408 408 /// выполнСния Ρ‚Π΅ΠΊΡƒΡ‰Π΅ΠΉ, Π° Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ Ρ‚Π΅ΠΊΡƒΡ‰Π΅ΠΉ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ использован для ΠΈΠ½ΠΈΡ†ΠΈΠ°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ
409 409 /// Π½ΠΎΠ²ΠΎΠΉ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ.
410 410 /// </summary>
411 411 /// <typeparam name="TNew">Π’ΠΈΠΏ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π° ΡƒΠΊΠ°Π·Π°Π½Π½ΠΎΠΉ асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ.</typeparam>
412 412 /// <param name="chained">Асинхронная опСрация, которая Π΄ΠΎΠ»ΠΆΠ½Π° Π±ΡƒΠ΄Π΅Ρ‚ Π½Π°Ρ‡Π°Ρ‚ΡŒΡΡ послС выполнСния Ρ‚Π΅ΠΊΡƒΡ‰Π΅ΠΉ.</param>
413 413 /// <param name="error">ΠžΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊ ошибки. Π”Π°Π½Π½Ρ‹ΠΉ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΡ‚
414 414 /// ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ Ρ‚Π΅ΠΊΡƒΠ΅Ρ‰ΠΉ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ.</param>
415 415 /// <returns>НовоС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ ΠΏΠΎ ΠΎΠΊΠΎΠ½Ρ‡Π°Π½ΠΈΡŽ ΡƒΠΊΠ°Π·Π°Π½Π½ΠΎΠΉ аснхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ.</returns>
416 416 public IPromise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained, ErrorHandler error) {
417 417
418 418 // ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΠ° Π² Ρ‚ΠΎΠΌ, Ρ‡Ρ‚ΠΎ Π½Π° ΠΌΠΎΠΌΠ΅Π½Ρ‚ связывания Π΅Ρ‰Π΅ Π½Π΅ Π½Π°Ρ‡Π°Ρ‚Π° асинхронная опСрация, поэтому Π½ΡƒΠΆΠ½ΠΎ
419 419 // ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ посрСдника, ΠΊ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌΡƒ Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΠΎΠ΄Π²Ρ‹Π·ΡΠ²Π°Ρ‚ΡŒΡΡ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠ΅ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ.
420 420 // ΠΊΠΎΠ³Π΄Π° Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½Π° Ρ€Π΅Π°Π»ΡŒΠ½Π°Ρ асинхронная опСрация, ΠΎΠ½Π° ΠΎΠ±Ρ€Π°Ρ‚ΠΈΡ‚ΡŒΡΡ ΠΊ посрСднику, Ρ‡Ρ‚ΠΎΠ±Ρ‹
421 421 // ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‚ΡŒ Ρ‡Π΅Ρ€Π΅Π· Π½Π΅Π³ΠΎ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ Ρ€Π°Π±ΠΎΡ‚Ρ‹.
422 422 var medium = new Promise<TNew>(this, true);
423 423
424 424 ResultHandler<T> resultHandler = delegate(T result) {
425 425 if (medium.IsCancelled)
426 426 return;
427 427
428 428 var promise = chained(result);
429 429
430 430 // notify chained operation that it's not needed
431 431 medium.Cancelled(() => promise.Cancel());
432 432 promise.Then(
433 433 x => medium.Resolve(x),
434 434 e => medium.Reject(e)
435 435 );
436 436 };
437 437
438 438 ErrorHandler errorHandler = delegate(Exception e) {
439 439 if (error != null)
440 440 error(e);
441 441 // Π² случаС ошибки Π½ΡƒΠΆΠ½ΠΎ ΠΏΠ΅Ρ€Π΅Π΄Π°Ρ‚ΡŒ ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ дальшС ΠΏΠΎ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠ΅
442 442 medium.Reject(e);
443 443 };
444 444
445 445 AddHandler(
446 446 resultHandler,
447 447 errorHandler,
448 448 medium.InternalCancel
449 449 );
450 450
451 451 return medium;
452 452 }
453 453
454 454 public IPromise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
455 455 return Chain(chained, null);
456 456 }
457 457
458 458 public IPromise<T> Cancelled(Action handler) {
459 459 AddHandler(null, null, handler);
460 460 return this;
461 461 }
462 462
463 463 /// <summary>
464 464 /// Adds the specified handler for all cases (success, error, cancel)
465 465 /// </summary>
466 466 /// <param name="handler">The handler that will be called anyway</param>
467 467 /// <returns>self</returns>
468 468 public IPromise<T> Finally(Action handler) {
469 469 if (handler == null)
470 470 throw new ArgumentNullException("handler");
471 471 AddHandler(
472 472 x => handler(),
473 473 e => handler(),
474 474 handler
475 475 );
476 476 return this;
477 477 }
478 478
479 479 /// <summary>
480 480 /// ΠŸΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΡƒΠ΅Ρ‚ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ обСщания ΠΊ Π½ΡƒΠΆΠ½ΠΎΠΌΡƒ Ρ‚ΠΈΠΏΡƒ
481 481 /// </summary>
482 482 /// <typeparam name="T2"></typeparam>
483 483 /// <returns></returns>
484 484 public IPromise<T2> Cast<T2>() {
485 485 return Map(x => (T2)(object)x, null);
486 486 }
487 487
488 488 /// <summary>
489 489 /// ДоТидаСтся ΠΎΡ‚Π»ΠΎΠΆΠ΅Π½Π½ΠΎΠ³ΠΎ обСщания ΠΈ Π² случаС успСха, Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚
490 490 /// Π΅Π³ΠΎ, Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚, Π² ΠΏΡ€ΠΎΡ‚ΠΈΠ²Π½ΠΎΠΌ случаС бросаСт ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅.
491 491 /// </summary>
492 492 /// <remarks>
493 493 /// <para>
494 494 /// Если ΠΎΠΆΠΈΠ΄Π°Π½ΠΈΠ΅ обСщания Π±Ρ‹Π»ΠΎ ΠΏΡ€Π΅Ρ€Π²Π°Π½ΠΎ ΠΏΠΎ Ρ‚Π°ΠΉΠΌΠ°ΡƒΡ‚Ρƒ, это Π½Π΅ Π·Π½Π°Ρ‡ΠΈΡ‚,
495 495 /// Ρ‡Ρ‚ΠΎ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π±Ρ‹Π»ΠΎ ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½ΠΎ ΠΈΠ»ΠΈ Ρ‡Ρ‚ΠΎ-Ρ‚ΠΎ Π² этом Ρ€ΠΎΠ΄Π΅, это Ρ‚ΠΎΠ»ΡŒΠΊΠΎ
496 496 /// ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ ΠΌΡ‹ Π΅Π³ΠΎ Π½Π΅ доТдались, ΠΎΠ΄Π½Π°ΠΊΠΎ всС зарСгистрированныС
497 497 /// ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ, ΠΊΠ°ΠΊ Π±Ρ‹Π»ΠΈ Ρ‚Π°ΠΊ ΠΎΡΡ‚Π°Π»ΠΈΡΡŒ ΠΈ ΠΎΠ½ΠΈ Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹, ΠΊΠΎΠ³Π΄Π°
498 498 /// ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ.
499 499 /// </para>
500 500 /// <para>
501 501 /// Π’Π°ΠΊΠΎΠ΅ ΠΏΠΎΠ²Π΅Π΄Π΅Π½ΠΈΠ΅ Π²ΠΏΠΎΠ»Π½Π΅ ΠΎΠΏΡ€Π°Π²Π΄Π°Π½ΠΎ ΠΏΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ Ρ‚Π°ΠΉΠΌΠ°ΡƒΡ‚ ΠΌΠΎΠΆΠ΅Ρ‚ ΠΈΡΡ‚Π΅Ρ‡ΡŒ
502 502 /// Π² Ρ‚ΠΎΡ‚ ΠΌΠΎΠΌΠ΅Π½Ρ‚, ΠΊΠΎΠ³Π΄Π° Π½Π°Ρ‡Π°Π»Π°ΡΡŒ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΎΠ², ΠΈ
503 503 /// ΠΊ Ρ‚ΠΎΠΌΡƒ ΠΆΠ΅ Ρ‚Π΅ΠΊΡƒΡ‰Π΅Π΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ ΡΡ‚ΠΎΡΡ‚ΡŒ Π² Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ ΠΈ Π΅Π³ΠΎ
504 504 /// ΠΎΡ‚ΠΊΠ»ΠΎΠ½Π΅Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ привСсти ΠΊ Π½Π΅ΠΏΡ€ΠΎΠ³Π½ΠΎΠ·ΠΈΡ€ΡƒΠ΅ΠΌΠΎΠΌΡƒ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρƒ.
505 505 /// </para>
506 506 /// </remarks>
507 507 /// <param name="timeout">ВрСмя оТидания</param>
508 508 /// <returns>Π Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ выполнСния обСщания</returns>
509 509 public T Join(int timeout) {
510 510 var evt = new ManualResetEvent(false);
511 511 Anyway(() => evt.Set());
512 512 Cancelled(() => evt.Set());
513 513
514 514 if (!evt.WaitOne(timeout, true))
515 515 throw new TimeoutException();
516 516
517 517 switch (m_state) {
518 518 case SucceededState:
519 519 return m_result;
520 520 case CancelledState:
521 521 throw new OperationCanceledException();
522 522 case RejectedState:
523 523 throw new TargetInvocationException(m_error);
524 524 default:
525 525 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
526 526 }
527 527 }
528 528
529 529 public T Join() {
530 530 return Join(Timeout.Infinite);
531 531 }
532 532
533 533 void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
534 534 Interlocked.Increment(ref m_childrenCount);
535 535
536 536 HandlerDescriptor handler = new HandlerDescriptor {
537 537 resultHandler = success,
538 538 errorHandler = error,
539 539 cancellHandler = cancel
540 540 };
541 541
542 542 bool queued;
543 543
544 544 if (!IsResolved) {
545 545 m_handlers.Enqueue(handler);
546 546 queued = true;
547 547 } else {
548 548 // the promise is in resolved state, just invoke the handled with minimum overhead
549 549 queued = false;
550 550 InvokeHandler(handler);
551 551 }
552 552
553 553 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
554 554 // if the promise have been resolved while we was adding handler to the queue
555 555 // we can't guarantee that someone is still processing it
556 556 // therefore we will fetch a handler from the queue and execute it
557 557 // note that fetched handler may be not the one that we have added
558 558 // even we can fetch no handlers at all :)
559 559 InvokeHandler(handler);
560 560 }
561 561
562 562 protected virtual void InvokeHandler(HandlerDescriptor handler) {
563 563 switch (m_state) {
564 564 case SucceededState:
565 565 handler.Resolve(m_result);
566 566 break;
567 567 case RejectedState:
568 568 handler.Reject(m_error);
569 569 break;
570 570 case CancelledState:
571 571 handler.Cancel();
572 572 break;
573 573 default:
574 574 // do nothing
575 575 return;
576 576 }
577 577 }
578 578
579 579 protected virtual void OnStateChanged() {
580 580 HandlerDescriptor handler;
581 581 while (m_handlers.TryDequeue(out handler))
582 582 InvokeHandler(handler);
583 583 }
584 584
585 585 public bool IsExclusive {
586 586 get {
587 587 return m_childrenCount <= 1;
588 588 }
589 589 }
590 590
591 591 protected bool Cancel(bool dependencies) {
592 592 if (BeginTransit()) {
593 593 CompleteTransit(CancelledState);
594 594 OnStateChanged();
595 595
596 596 if (dependencies && m_parent != null && m_parent.IsExclusive)
597 597 m_parent.Cancel();
598 598
599 599 return true;
600 600 } else {
601 601 return false;
602 602 }
603 603 }
604 604
605 605 /// <summary>
606 606 /// ΠžΠ±ΡŠΠ΅Π΄ΠΈΠ½ΡΠ΅Ρ‚ нСсколько ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ Π² ΠΎΠ΄Π½ΠΎ, Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠΌ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ являСтся массив Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ² Π΄Ρ€ΡƒΠ³ΠΈΡ… ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.
607 607 /// Если хотябы ΠΎΠ΄Π½ΠΎ ΠΈΠ· ΠΏΠ΅Ρ€Π΅Π΄Π°Π½Π½Ρ‹Ρ… ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ Π½Π΅ Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ, Ρ‚ΠΎ Π½ΠΎΠ²ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π΅Π½ΠΈΠ΅ Ρ‚ΠΎΠΆΠ΅ Π½Π΅ Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ.
608 608 /// ΠŸΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Π½ΠΎΠ²ΠΎΠ³ΠΎ обСщания, ΠΏΠ΅Ρ€Π΅Π΄Π°Π½Π½Ρ‹Π΅ обСщания Ρ‚Π°ΠΊΠΆΠ΅ Π±ΡƒΠ΄ΡƒΡ‚ ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Ρ‹, Ссли Π½ΠΈΠΊΡ‚ΠΎ большС Π½Π° Π½ΠΈΡ… Π½Π΅ подписан.
609 609 /// </summary>
610 610 /// <param name="promises">Бписок ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ. Если список пустой, Ρ‚ΠΎ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚ΠΈΡ€ΡƒΡŽΡ‰Π΅Π΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ возвращаСтся ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½Π½Ρ‹ΠΌ.</param>
611 611 /// <returns>ΠžΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΠΎΠ±ΡŠΠ΅Π΄ΠΈΠ½ΡΡŽΡ‰Π΅Π΅ Π² сСбС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ ΠΏΠ΅Ρ€Π΅Π΄Π°Π½Π½Ρ‹Ρ… ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.</returns>
612 612 /// <exception cref="ArgumentNullException"><paramref name="promises"/> Π½Π΅ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ null</exception>
613 public static Promise<T[]> CreateComposite(IList<Promise<T>> promises) {
613 public static IPromise<T[]> CreateComposite(IList<IPromise<T>> promises) {
614 614 if (promises == null)
615 615 throw new ArgumentNullException();
616 616
617 617 // создаСм аккумулятор для Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ² ΠΈ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚ΠΈΡ€ΡƒΡŽΡ‰Π΅Π΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅
618 618 var result = new T[promises.Count];
619 619 var promise = new Promise<T[]>();
620 620
621 621 // special case
622 622 if (promises.Count == 0) {
623 623 promise.Resolve(result);
624 624 return promise;
625 625 }
626 626
627 627 int pending = promises.Count;
628 628
629 629 for (int i = 0; i < promises.Count; i++) {
630 630 var dest = i;
631 631
632 632 promises[i].Then(
633 633 x => {
634 634 result[dest] = x;
635 635 if(Interlocked.Decrement(ref pending) == 0)
636 636 promise.Resolve(result);
637 637 },
638 638 e => promise.Reject(e)
639 639 );
640 640 }
641 641
642 642 promise.Cancelled(
643 643 () => {
644 644 foreach(var d in promises)
645 645 if(d.IsExclusive)
646 646 d.Cancel();
647 647 }
648 648 );
649 649
650 650 return promise;
651 651 }
652 652
653 653 public static Promise<T> ResultToPromise(T result) {
654 654 var p = new Promise<T>();
655 655 p.Resolve(result);
656 656 return p;
657 657 }
658 658
659 659 public static Promise<T> ExceptionToPromise(Exception error) {
660 660 if (error == null)
661 661 throw new ArgumentNullException();
662 662
663 663 var p = new Promise<T>();
664 664 p.Reject(error);
665 665 return p;
666 666 }
667 667
668 668 }
669 669 }
General Comments 0
You need to be logged in to leave comments. Login now