##// END OF EJS Templates
refactoring
cin -
r25:9bf5b23650c9 default
parent child
Show More
@@ -0,0 +1,19
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface IPromiseBase: ICancellable {
8 /// <summary>
9 /// Check whereather the promise has no more than one dependent promise.
10 /// </summary>
11 bool IsExclusive {
12 get;
13 }
14
15 bool IsResolved { get; }
16
17 bool IsCancelled { get; }
18 }
19 }
@@ -5,15 +5,11 using System.Text;
5 5
6 6 namespace Implab
7 7 {
8 public interface IPromise: ICancellable
8 public interface IPromise<T>: IPromiseBase
9 9 {
10 /// <summary>
11 /// Check whereather the promise has no more than one dependent promise.
12 /// </summary>
13 bool IsExclusive
14 {
15 get;
16 }
10
11
12
17 13
18 14
19 15 }
@@ -4,9 +4,11 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab {
7 public interface ITaskController: IProgressHandler {
8 bool Cancelled {
7 public interface ITaskController: IProgressHandler, ICancellable {
8 bool IsCancelled {
9 9 get;
10 10 }
11
12 event EventHandler Cancelled;
11 13 }
12 14 }
@@ -36,6 +36,7
36 36 <Compile Include="IProgressHandler.cs" />
37 37 <Compile Include="IProgressNotifier.cs" />
38 38 <Compile Include="IPromise.cs" />
39 <Compile Include="IPromiseBase.cs" />
39 40 <Compile Include="ITaskController.cs" />
40 41 <Compile Include="ManagedPromise.cs" />
41 42 <Compile Include="Parallels\DispatchPool.cs" />
@@ -168,45 +168,5 namespace Implab.Parallels {
168 168 return promise.Anyway(() => semaphore.Dispose());
169 169 }
170 170
171 /*
172 this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is
173 be chained, in this case the syncronous callback invocation will occur
174
175 public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
176 if (source == null)
177 throw new ArgumentNullException("source");
178 if (transform == null)
179 throw new ArgumentNullException("transform");
180 if (threads <= 0)
181 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
182
183 var promise = new Promise<TDst[]>();
184 var res = new TDst[source.Length];
185 var index = -1; // we will start with increment
186 var len = source.Length;
187 var pending = len;
188
189 Action<int> callback = null;
190 callback = (current) => {
191 if (current < len) {
192 transform(source[current])
193 .Then(
194 x => {
195 res[current] = x;
196 if (Interlocked.Decrement(ref pending) == 0)
197 promise.Resolve(res);
198 else
199 callback(Interlocked.Increment(ref index));
200 },
201 e => promise.Reject(e)
202 );
203 }
204 };
205
206 for (int i = 0; i < threads; i++)
207 callback(Interlocked.Increment(ref index));
208 return promise;
209 }
210 */
211 171 }
212 172 }
@@ -1,28 +1,28
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
5 /// <summary>
6 /// Класс для распаралеливания задач.
7 /// </summary>
8 /// <remarks>
9 /// Используя данный класс и лямда выражения можно распараллелить
10 /// вычисления, для этого используется концепция обещаний.
11 /// </remarks>
12 public static class AsyncPool {
13
14 public static Promise<T> Invoke<T>(Func<T> func) {
15 var p = new Promise<T>();
16
17 ThreadPool.QueueUserWorkItem(param => {
18 try {
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
5 /// <summary>
6 /// Класс для распаралеливания задач.
7 /// </summary>
8 /// <remarks>
9 /// Используя данный класс и лямда выражения можно распараллелить
10 /// вычисления, для этого используется концепция обещаний.
11 /// </remarks>
12 public static class AsyncPool {
13
14 public static Promise<T> Invoke<T>(Func<T> func) {
15 var p = new Promise<T>();
16
17 ThreadPool.QueueUserWorkItem(param => {
18 try {
19 19 p.Resolve(func());
20 } catch(Exception e) {
21 p.Reject(e);
22 }
23 });
24
25 return p;
20 } catch(Exception e) {
21 p.Reject(e);
22 }
23 });
24
25 return p;
26 26 }
27 27
28 28 public static Promise<T> InvokeNewThread<T>(Func<T> func) {
@@ -39,6 +39,6 namespace Implab.Parallels {
39 39 worker.Start();
40 40
41 41 return p;
42 }
43 }
44 }
42 }
43 }
44 }
@@ -47,7 +47,7 namespace Implab {
47 47 /// только инициатор обещания иначе могут возникнуть противоречия.
48 48 /// </para>
49 49 /// </remarks>
50 public class Promise<T> : IPromise {
50 public class Promise<T> : IPromise<T> {
51 51
52 52 struct HandlerDescriptor {
53 53 public ResultHandler<T> resultHandler;
@@ -82,11 +82,11 namespace Implab {
82 82
83 83 const int UnresolvedSate = 0;
84 84 const int TransitionalState = 1;
85 const int ResolvedState = 2;
85 const int SucceededState = 2;
86 86 const int RejectedState = 3;
87 87 const int CancelledState = 4;
88 88
89 readonly IPromise m_parent;
89 readonly IPromiseBase m_parent;
90 90 readonly bool m_cancellable;
91 91
92 92 int m_childrenCount = 0;
@@ -100,7 +100,7 namespace Implab {
100 100 m_cancellable = true;
101 101 }
102 102
103 public Promise(IPromise parent, bool cancellable) {
103 public Promise(IPromiseBase parent, bool cancellable) {
104 104 m_cancellable = cancellable;
105 105 m_parent = parent;
106 106 }
@@ -119,6 +119,12 namespace Implab {
119 119 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
120 120 }
121 121
122 void WaitTransition() {
123 while (m_state == TransitionalState) {
124 /* noop */
125 }
126 }
127
122 128 public bool IsResolved {
123 129 get {
124 130 return m_state > 1;
@@ -139,10 +145,13 namespace Implab {
139 145 public void Resolve(T result) {
140 146 if (BeginTransit()) {
141 147 m_result = result;
142 CompleteTransit(ResolvedState);
148 CompleteTransit(SucceededState);
143 149 OnStateChanged();
144 } else if (m_state != CancelledState)
145 throw new InvalidOperationException("The promise is already resolved");
150 } else {
151 WaitTransition();
152 if (m_state != CancelledState)
153 throw new InvalidOperationException("The promise is already resolved");
154 }
146 155 }
147 156
148 157 /// <summary>
@@ -160,8 +169,11 namespace Implab {
160 169 m_error = error;
161 170 CompleteTransit(RejectedState);
162 171 OnStateChanged();
163 } else if (m_state == ResolvedState)
164 throw new InvalidOperationException("The promise is already resolved");
172 } else {
173 WaitTransition();
174 if (m_state == SucceededState)
175 throw new InvalidOperationException("The promise is already resolved");
176 }
165 177 }
166 178
167 179 /// <summary>
@@ -197,6 +209,9 namespace Implab {
197 209 ErrorHandler errorHandler;
198 210 if (error != null)
199 211 errorHandler = x => {
212 // несмотря на то, что обработчик ошибки вызывается безопасно,
213 // т.е. возникшие в нем ошибки будут подавлены, нам нужно
214 // гарантировать, что ошибка будет передана дальше по цепочке обещаний
200 215 try {
201 216 error(x);
202 217 } catch { }
@@ -238,8 +253,9 namespace Implab {
238 253 errorHandler = x => {
239 254 try {
240 255 medium.Resolve(error(x));
241 } catch { }
242 medium.Reject(x);
256 } catch(Exception e) {
257 medium.Reject(e);
258 }
243 259 };
244 260 else
245 261 errorHandler = medium.Reject;
@@ -257,7 +273,7 namespace Implab {
257 273 var medium = new Promise<T>(this, true);
258 274
259 275 ResultHandler<T> resultHandler;
260
276
261 277 if (success != null)
262 278 resultHandler = x => {
263 279 success(x);
@@ -430,6 +446,11 namespace Implab {
430 446 return this;
431 447 }
432 448
449 /// <summary>
450 /// Adds the specified handler for all cases (success, error, cancel)
451 /// </summary>
452 /// <param name="handler">The handler that will be called anyway</param>
453 /// <returns>self</returns>
433 454 public Promise<T> Finally(Action handler) {
434 455 if (handler == null)
435 456 throw new ArgumentNullException("handler");
@@ -471,7 +492,7 namespace Implab {
471 492 throw new TimeoutException();
472 493
473 494 switch (m_state) {
474 case ResolvedState:
495 case SucceededState:
475 496 return m_result;
476 497 case CancelledState:
477 498 throw new OperationCanceledException();
@@ -517,7 +538,7 namespace Implab {
517 538
518 539 void InvokeHandler(HandlerDescriptor handler) {
519 540 switch (m_state) {
520 case ResolvedState:
541 case SucceededState:
521 542 handler.Resolve(m_result);
522 543 break;
523 544 case RejectedState:
@@ -538,8 +559,6 namespace Implab {
538 559 InvokeHandler(handler);
539 560 }
540 561
541
542
543 562 public bool IsExclusive {
544 563 get {
545 564 return m_childrenCount <= 1;
@@ -560,5 +579,68 namespace Implab {
560 579 }
561 580 }
562 581
582 /// <summary>
583 /// Объединяет несколько обещаний в одно, результатом которого является массив результатов других обещаний.
584 /// Если хотябы одно из переданных обещаний не будет выполнено, то новое обещение тоже не будет выполнено.
585 /// При отмене нового обещания, переданные обещания также будут отменены, если никто больше на них не подписан.
586 /// </summary>
587 /// <param name="promises">Список обещаний. Если список пустой, то результирующее обещание возвращается уже выполненным.</param>
588 /// <returns>Обещание объединяющее в себе результат переданных обещаний.</returns>
589 /// <exception cref="ArgumentNullException"><paramref name="promises"/> не может быть null</exception>
590 public static Promise<T[]> CreateComposite(IList<Promise<T>> promises) {
591 if (promises == null)
592 throw new ArgumentNullException();
593
594 // создаем аккумулятор для результатов и результирующее обещание
595 var result = new T[promises.Count];
596 var promise = new Promise<T[]>();
597
598 // special case
599 if (promises.Count == 0) {
600 promise.Resolve(result);
601 return promise;
602 }
603
604 int pending = promises.Count;
605
606 for (int i = 0; i < promises.Count; i++) {
607 var dest = i;
608
609 promises[i].Then(
610 x => {
611 result[dest] = x;
612 if(Interlocked.Decrement(ref pending) == 0)
613 promise.Resolve(result);
614 },
615 e => promise.Reject(e)
616 );
617 }
618
619 promise.Cancelled(
620 () => {
621 foreach(var d in promises)
622 if(d.IsExclusive)
623 d.Cancel();
624 }
625 );
626
627 return promise;
628 }
629
630 public static Promise<T> ResultToPromise(T result) {
631 var p = new Promise<T>();
632 p.Resolve(result);
633 return p;
634 }
635
636 public static Promise<T> ExceptionToPromise(Exception error) {
637 if (error == null)
638 throw new ArgumentNullException();
639
640 var p = new Promise<T>();
641 p.Reject(error);
642 return p;
643 }
644
563 645 }
564 646 }
@@ -12,7 +12,7 namespace Implab
12 12 /// <remarks>
13 13 /// Members of this object are thread safe.
14 14 /// </remarks>
15 class TaskController: IProgressNotifier, ITaskController, ICancellable
15 public class TaskController: IProgressNotifier, ITaskController
16 16 {
17 17 readonly object m_lock;
18 18 string m_message;
@@ -22,6 +22,7 namespace Implab
22 22
23 23 bool m_cancelled;
24 24
25 public event EventHandler Cancelled;
25 26 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
26 27 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
27 28 public event EventHandler<ProgressInitEventArgs> ProgressInit;
@@ -84,7 +85,7 namespace Implab
84 85 }
85 86 }
86 87
87 public bool Cancelled {
88 public bool IsCancelled {
88 89 get {
89 90 lock (m_lock)
90 91 return m_cancelled;
@@ -102,6 +103,13 namespace Implab
102 103 }
103 104 }
104 105
106 protected virtual void OnCancelled() {
107 var temp = Cancelled;
108 if (temp != null) {
109 temp(this,new EventArgs());
110 }
111 }
112
105 113 protected virtual void OnMessageUpdated()
106 114 {
107 115 var temp = MessageUpdated;
General Comments 0
You need to be logged in to leave comments. Login now