##// END OF EJS Templates
Слияние с promises
cin -
r23:f0568ff069a5 merge default
parent child
Show More
@@ -14,7 +14,7 namespace Implab.Test {
14 14 p.Then(x => res = x);
15 15 p.Resolve(100);
16 16
17 Assert.AreEqual(res, 100);
17 Assert.AreEqual(100, res);
18 18 }
19 19
20 20 [TestMethod]
@@ -101,18 +101,18 namespace Implab.Test {
101 101 public void WorkerPoolSizeTest() {
102 102 var pool = new WorkerPool(5, 10, 0);
103 103
104 Assert.AreEqual(5, pool.ThreadCount);
104 Assert.AreEqual(5, pool.PoolSize);
105 105
106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
106 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
109 109
110 Assert.AreEqual(5, pool.ThreadCount);
110 Assert.AreEqual(5, pool.PoolSize);
111 111
112 112 for (int i = 0; i < 100; i++)
113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
114 Thread.Sleep(100);
115 Assert.AreEqual(10, pool.ThreadCount);
113 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
114 Thread.Sleep(200);
115 Assert.AreEqual(10, pool.PoolSize);
116 116
117 117 pool.Dispose();
118 118 }
@@ -149,10 +149,10 namespace Implab.Test {
149 149 [TestMethod]
150 150 public void WorkerPoolDisposeTest() {
151 151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
152 Assert.AreEqual(5, pool.PoolSize);
153 153 pool.Dispose();
154 Thread.Sleep(100);
155 Assert.AreEqual(0, pool.ThreadCount);
154 Thread.Sleep(500);
155 Assert.AreEqual(0, pool.PoolSize);
156 156 pool.Dispose();
157 157 }
158 158
@@ -244,7 +244,7 namespace Implab.Test {
244 244 [TestMethod]
245 245 public void ChainedMapTest() {
246 246
247 using (var pool = new WorkerPool(8,100,0)) {
247 using (var pool = new WorkerPool(4,4,0)) {
248 248 int count = 10000;
249 249
250 250 double[] args = new double[count];
1 NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file, binary diff hidden
@@ -15,19 +15,6 namespace Implab
15 15 get;
16 16 }
17 17
18 /// <summary>
19 /// The current state of the promise.
20 /// </summary>
21 PromiseState State
22 {
23 get;
24 }
25 18
26 /// <summary>
27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
28 /// handler will be invoked immediatelly.
29 /// </summary>
30 /// <param name="handler">The handler</param>
31 void HandleCancelled(Action handler);
32 19 }
33 20 }
@@ -140,7 +140,7 namespace Implab.Parallels {
140 140
141 141 AsyncPool.InvokeNewThread(() => {
142 142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
143 if(promise.IsResolved)
144 144 break; // stop processing in case of error or cancellation
145 145 var idx = i;
146 146 semaphore.WaitOne();
@@ -9,10 +9,15 namespace Implab.Parallels {
9 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 10 readonly int m_minThreads;
11 11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
13 int m_maxRunningThreads = 0;
14 int m_suspended = 0;
15 int m_exitRequired = 0;
12
13 int m_createdThreads = 0; // the current size of the pool
14 int m_activeThreads = 0; // the count of threads which are active
15 int m_sleepingThreads = 0; // the count of currently inactive threads
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
19 int m_wakeEvents = 0; // the count of wake events
20
16 21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17 22
18 23 protected DispatchPool(int min, int max) {
@@ -44,9 +49,15 namespace Implab.Parallels {
44 49 StartWorker();
45 50 }
46 51
47 public int ThreadCount {
52 public int PoolSize {
48 53 get {
49 return m_runningThreads;
54 return m_createdThreads;
55 }
56 }
57
58 public int ActiveThreads {
59 get {
60 return m_activeThreads;
50 61 }
51 62 }
52 63
@@ -64,29 +75,120 namespace Implab.Parallels {
64 75
65 76 protected abstract bool TryDequeue(out TUnit unit);
66 77
67 protected virtual bool ExtendPool() {
68 if (m_suspended > 0) {
78 #region thread execution traits
79 int SignalThread() {
80 var signals = Interlocked.Increment(ref m_wakeEvents);
81 if(signals == 1)
69 82 m_hasTasks.Set();
83 return signals;
84 }
85
86 bool FetchSignalOrWait(int timeout) {
87 var start = Environment.TickCount;
88
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен
90 // ее вернуть, чтобы другой ожидающий поток смог
91 bool hasLock = false;
92 do {
93 int signals;
94 do {
95 signals = m_wakeEvents;
96 if (signals == 0)
97 break;
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99
100 if (signals >= 1) {
101 if (signals > 1 && hasLock)
102 m_hasTasks.Set();
103 return true;
104 }
105
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
110 // и уйдет на пустой цикл, после чего заблокируется
111
112 hasLock = true;
113 } while (m_hasTasks.WaitOne(timeout));
114
115 return false;
116 }
117
118 bool Sleep(int timeout) {
119 Interlocked.Increment(ref m_sleepingThreads);
120 if (FetchSignalOrWait(timeout)) {
121 Interlocked.Decrement(ref m_sleepingThreads);
70 122 return true;
71 } else
72 return StartWorker();
123 } else {
124 Interlocked.Decrement(ref m_sleepingThreads);
125 return false;
126 }
73 127 }
128 #endregion
74 129
75 130 /// <summary>
76 131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
77 132 /// </summary>
78 protected void WakePool() {
79 m_hasTasks.Set(); // wake sleeping thread;
133 protected void GrowPool() {
134 if (m_exitRequired != 0)
135 return;
136 if (m_sleepingThreads > m_wakeEvents) {
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138
139 // all sleeping threads may gone
140 SignalThread(); // wake a sleeping thread;
80 141
81 if (AllocateThreadSlot(1)) {
82 var worker = new Thread(this.Worker);
83 worker.IsBackground = true;
84 worker.Start();
142 // we can't check whether signal has been processed
143 // anyway it may take some time for the thread to start
144 // we will ensure that at least one thread is running
145
146 if (AllocateThreadSlot(1)) {
147 // if there were no threads in the pool
148 var worker = new Thread(this.Worker);
149 worker.IsBackground = true;
150 worker.Start();
151 }
152 } else {
153 // if there is no sleeping threads in the pool
154 StartWorker();
85 155 }
86 156 }
87 157
88 protected virtual void Suspend() {
89 m_hasTasks.WaitOne();
158 private bool Suspend() {
159 //no tasks left, exit if the thread is no longer needed
160 bool last;
161 bool requestExit;
162
163 // if threads have a timeout before releasing
164 if (m_releaseTimeout > 0)
165 requestExit = !Sleep(m_releaseTimeout);
166 else
167 requestExit = true;
168
169 if (!requestExit)
170 return true;
171
172 // release unsused thread
173 if (requestExit && ReleaseThreadSlot(out last)) {
174 // in case at the moment the last thread was being released
175 // a new task was added to the queue, we need to try
176 // to revoke the thread to avoid the situation when the task is left unprocessed
177 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
178 if (AllocateThreadSlot(1))
179 return true; // spin again...
180 else
181 SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
182
183 }
184
185 return false;
186 }
187
188 // wait till infinity
189 Sleep(-1);
190
191 return true;
90 192 }
91 193
92 194 #region thread slots traits
@@ -95,11 +197,11 namespace Implab.Parallels {
95 197 int current;
96 198 // use spins to allocate slot for the new thread
97 199 do {
98 current = m_runningThreads;
200 current = m_createdThreads;
99 201 if (current >= m_maxThreads || m_exitRequired != 0)
100 202 // no more slots left or the pool has been disposed
101 203 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
204 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
103 205
104 206 UpdateMaxThreads(current + 1);
105 207
@@ -107,7 +209,7 namespace Implab.Parallels {
107 209 }
108 210
109 211 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
212 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
111 213 return false;
112 214
113 215 UpdateMaxThreads(desired);
@@ -120,11 +222,11 namespace Implab.Parallels {
120 222 int current;
121 223 // use spins to release slot for the new thread
122 224 do {
123 current = m_runningThreads;
225 current = m_createdThreads;
124 226 if (current <= m_minThreads && m_exitRequired == 0)
125 227 // the thread is reserved
126 228 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
229 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
128 230
129 231 last = (current == 1);
130 232
@@ -136,7 +238,7 namespace Implab.Parallels {
136 238 /// </summary>
137 239 /// <returns>true - no more threads left</returns>
138 240 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads);
241 var left = Interlocked.Decrement(ref m_createdThreads);
140 242 return left == 0;
141 243 }
142 244
@@ -164,54 +266,41 namespace Implab.Parallels {
164 266 }
165 267 }
166 268
167 bool FetchTask(out TUnit unit) {
269 protected abstract void InvokeUnit(TUnit unit);
270
271 void Worker() {
272 TUnit unit;
273 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
274 Interlocked.Increment(ref m_activeThreads);
168 275 do {
169 276 // exit if requested
170 277 if (m_exitRequired != 0) {
171 278 // release the thread slot
279 Interlocked.Decrement(ref m_activeThreads);
172 280 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 281 m_hasTasks.Dispose();
174 282 else
175 m_hasTasks.Set(); // wake next worker
283 SignalThread(); // wake next worker
176 284 unit = default(TUnit);
177 return false;
285 break;
178 286 }
179 287
180 288 // fetch task
181 289 if (TryDequeue(out unit)) {
182 ExtendPool();
183 return true;
290 InvokeUnit(unit);
291 continue;
184 292 }
185 293
186 //no tasks left, exit if the thread is no longer needed
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
197
198 return false;
199 }
294 Interlocked.Decrement(ref m_activeThreads);
200 295
201 296 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 297 // keep this thread and wait
204 Suspend();
205 Interlocked.Decrement(ref m_suspended);
298 if (!Suspend())
299 break;
300 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
301 Interlocked.Increment(ref m_activeThreads);
206 302 } while (true);
207 }
208
209 protected abstract void InvokeUnit(TUnit unit);
210
211 void Worker() {
212 TUnit unit;
213 while (FetchTask(out unit))
214 InvokeUnit(unit);
303 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
215 304 }
216 305
217 306 protected virtual void Dispose(bool disposing) {
@@ -221,7 +310,10 namespace Implab.Parallels {
221 310 return;
222 311
223 312 // wake sleeping threads
224 m_hasTasks.Set();
313 if (m_createdThreads > 0)
314 SignalThread();
315 else
316 m_hasTasks.Dispose();
225 317 GC.SuppressFinalize(this);
226 318 }
227 319 }
@@ -42,12 +42,13 namespace Implab.Parallels {
42 42 next = first.next;
43 43 if (next == null) {
44 44 // this is the last element,
45 // then try to update tail
45 // then try to update the tail
46 46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
47 // this is a ace condition
48 48 if (m_last == null)
49 // the queue is empty
49 50 return false;
50 // tail has been changed, that means that we need to restart
51 // tail has been changed, than we need to restart
51 52 continue;
52 53 }
53 54
@@ -57,16 +57,8 namespace Implab.Parallels {
57 57 var len = Interlocked.Increment(ref m_queueLength);
58 58 m_queue.Enqueue(unit);
59 59
60 if(!ExtendPool())
61 WakePool();
62 }
63
64 protected override bool ExtendPool() {
65 if (m_queueLength <= m_threshold*ThreadCount)
66 // in this case we are in active thread and it request for additional workers
67 // satisfy it only when queue is longer than threshold
68 return false;
69 return base.ExtendPool();
60 if (len > m_threshold*ActiveThreads)
61 GrowPool();
70 62 }
71 63
72 64 protected override bool TryDequeue(out Action unit) {
@@ -81,9 +73,5 namespace Implab.Parallels {
81 73 unit();
82 74 }
83 75
84 protected override void Suspend() {
85 if (m_queueLength == 0)
86 base.Suspend();
87 }
88 76 }
89 77 }
@@ -3,6 +3,7 using System.Collections.Generic;
3 3 using System.Reflection;
4 4 using System.Diagnostics;
5 5 using System.Threading;
6 using Implab.Parallels;
6 7
7 8 namespace Implab {
8 9
@@ -48,24 +49,53 namespace Implab {
48 49 /// </remarks>
49 50 public class Promise<T> : IPromise {
50 51
51 struct ResultHandlerInfo {
52 struct HandlerDescriptor {
52 53 public ResultHandler<T> resultHandler;
53 54 public ErrorHandler errorHandler;
55 public Action cancellHandler;
56
57 public void Resolve(T result) {
58 if (resultHandler != null)
59 try {
60 resultHandler(result);
61 } catch (Exception e) {
62 Reject(e);
63 }
64 }
65
66 public void Reject(Exception err) {
67 if (errorHandler != null)
68 try {
69 errorHandler(err);
70 } catch {
71 }
72 }
73
74 public void Cancel() {
75 if (cancellHandler != null)
76 try {
77 cancellHandler();
78 } catch {
79 }
80 }
54 81 }
55 82
83 const int UnresolvedSate = 0;
84 const int TransitionalState = 1;
85 const int ResolvedState = 2;
86 const int RejectedState = 3;
87 const int CancelledState = 4;
88
56 89 readonly IPromise m_parent;
57
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
90 readonly bool m_cancellable;
60 91
61 readonly object m_lock = new Object();
62 readonly bool m_cancellable;
63 92 int m_childrenCount = 0;
64
65 PromiseState m_state;
93 int m_state;
66 94 T m_result;
67 95 Exception m_error;
68 96
97 readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
98
69 99 public Promise() {
70 100 m_cancellable = true;
71 101 }
@@ -73,8 +103,6 namespace Implab {
73 103 public Promise(IPromise parent, bool cancellable) {
74 104 m_cancellable = cancellable;
75 105 m_parent = parent;
76 if (parent != null)
77 parent.HandleCancelled(InternalCancel);
78 106 }
79 107
80 108 void InternalCancel() {
@@ -82,22 +110,39 namespace Implab {
82 110 Cancel(false);
83 111 }
84 112
113 bool BeginTransit() {
114 return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
115 }
116
117 void CompleteTransit(int state) {
118 if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
119 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
120 }
121
122 public bool IsResolved {
123 get {
124 return m_state > 1;
125 }
126 }
127
128 public bool IsCancelled {
129 get {
130 return m_state == CancelledState;
131 }
132 }
133
85 134 /// <summary>
86 135 /// Выполняет обещание, сообщая об успешном выполнении.
87 136 /// </summary>
88 137 /// <param name="result">Результат выполнения.</param>
89 138 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
90 139 public void Resolve(T result) {
91 lock (m_lock) {
92 if (m_state == PromiseState.Cancelled)
93 return;
94 if (m_state != PromiseState.Unresolved)
95 throw new InvalidOperationException("The promise is already resolved");
140 if (BeginTransit()) {
96 141 m_result = result;
97 m_state = PromiseState.Resolved;
98 }
99
100 OnStateChanged();
142 CompleteTransit(ResolvedState);
143 OnStateChanged();
144 } else if (m_state != CancelledState)
145 throw new InvalidOperationException("The promise is already resolved");
101 146 }
102 147
103 148 /// <summary>
@@ -111,16 +156,12 namespace Implab {
111 156 /// <param name="error">Исключение возникшее при выполнении операции</param>
112 157 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
113 158 public void Reject(Exception error) {
114 lock (m_lock) {
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
116 return;
117 if (m_state != PromiseState.Unresolved)
118 throw new InvalidOperationException("The promise is already resolved");
159 if (BeginTransit()) {
119 160 m_error = error;
120 m_state = PromiseState.Rejected;
121 }
122
123 OnStateChanged();
161 CompleteTransit(RejectedState);
162 OnStateChanged();
163 } else if (m_state == ResolvedState)
164 throw new InvalidOperationException("The promise is already resolved");
124 165 }
125 166
126 167 /// <summary>
@@ -144,27 +185,27 namespace Implab {
144 185
145 186 var medium = new Promise<T>(this, true);
146 187
147 var handlerInfo = new ResultHandlerInfo();
148
188 ResultHandler<T> resultHandler;
149 189 if (success != null)
150 handlerInfo.resultHandler = x => {
190 resultHandler = x => {
151 191 success(x);
152 192 medium.Resolve(x);
153 193 };
154 194 else
155 handlerInfo.resultHandler = medium.Resolve;
195 resultHandler = medium.Resolve;
156 196
197 ErrorHandler errorHandler;
157 198 if (error != null)
158 handlerInfo.errorHandler = x => {
199 errorHandler = x => {
159 200 try {
160 201 error(x);
161 202 } catch { }
162 203 medium.Reject(x);
163 204 };
164 205 else
165 handlerInfo.errorHandler = medium.Reject;
206 errorHandler = medium.Reject;
166 207
167 AddHandler(handlerInfo);
208 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
168 209
169 210 return medium;
170 211 }
@@ -182,27 +223,28 namespace Implab {
182 223
183 224 var medium = new Promise<T>(this, true);
184 225
185 var handlerInfo = new ResultHandlerInfo();
226 ResultHandler<T> resultHandler;
227 ErrorHandler errorHandler;
186 228
187 229 if (success != null)
188 handlerInfo.resultHandler = x => {
230 resultHandler = x => {
189 231 success(x);
190 232 medium.Resolve(x);
191 233 };
192 234 else
193 handlerInfo.resultHandler = medium.Resolve;
235 resultHandler = medium.Resolve;
194 236
195 237 if (error != null)
196 handlerInfo.errorHandler = x => {
238 errorHandler = x => {
197 239 try {
198 240 medium.Resolve(error(x));
199 241 } catch { }
200 242 medium.Reject(x);
201 243 };
202 244 else
203 handlerInfo.errorHandler = medium.Reject;
245 errorHandler = medium.Reject;
204 246
205 AddHandler(handlerInfo);
247 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
206 248
207 249 return medium;
208 250 }
@@ -214,19 +256,17 namespace Implab {
214 256
215 257 var medium = new Promise<T>(this, true);
216 258
217 var handlerInfo = new ResultHandlerInfo();
218
259 ResultHandler<T> resultHandler;
260
219 261 if (success != null)
220 handlerInfo.resultHandler = x => {
262 resultHandler = x => {
221 263 success(x);
222 264 medium.Resolve(x);
223 265 };
224 266 else
225 handlerInfo.resultHandler = medium.Resolve;
267 resultHandler = medium.Resolve;
226 268
227 handlerInfo.errorHandler = medium.Reject;
228
229 AddHandler(handlerInfo);
269 AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
230 270
231 271 return medium;
232 272 }
@@ -249,15 +289,17 namespace Implab {
249 289
250 290 var medium = new Promise<T>(this, true);
251 291
252 AddHandler(new ResultHandlerInfo {
253 errorHandler = e => {
292 AddHandler(
293 null,
294 e => {
254 295 try {
255 296 medium.Resolve(handler(e));
256 297 } catch (Exception e2) {
257 298 medium.Reject(e2);
258 299 }
259 }
260 });
300 },
301 medium.InternalCancel
302 );
261 303
262 304 return medium;
263 305 }
@@ -268,8 +310,8 namespace Implab {
268 310
269 311 var medium = new Promise<T>();
270 312
271 AddHandler(new ResultHandlerInfo {
272 resultHandler = x => {
313 AddHandler(
314 x => {
273 315 // to avoid handler being called multiple times we handle exception by ourselfs
274 316 try {
275 317 handler();
@@ -278,13 +320,16 namespace Implab {
278 320 medium.Reject(e);
279 321 }
280 322 },
281 errorHandler = x => {
323
324 e => {
282 325 try {
283 326 handler();
284 327 } catch { }
285 medium.Reject(x);
286 }
287 });
328 medium.Reject(e);
329 },
330
331 medium.InternalCancel
332 );
288 333
289 334 return medium;
290 335 }
@@ -304,17 +349,22 namespace Implab {
304 349 // создаем прицепленное обещание
305 350 var chained = new Promise<TNew>();
306 351
307 AddHandler(new ResultHandlerInfo() {
308 resultHandler = result => chained.Resolve(mapper(result)),
309 errorHandler = delegate(Exception e) {
310 if (error != null)
311 try {
312 error(e);
313 } catch { }
314 // в случае ошибки нужно передать исключение дальше по цепочке
315 chained.Reject(e);
316 }
317 });
352 ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
353 ErrorHandler errorHandler = delegate(Exception e) {
354 if (error != null)
355 try {
356 error(e);
357 } catch { }
358 // в случае ошибки нужно передать исключение дальше по цепочке
359 chained.Reject(e);
360 };
361
362
363 AddHandler(
364 resultHandler,
365 errorHandler,
366 chained.InternalCancel
367 );
318 368
319 369 return chained;
320 370 }
@@ -341,27 +391,32 namespace Implab {
341 391 // передать через него результаты работы.
342 392 var medium = new Promise<TNew>(this, true);
343 393
344 AddHandler(new ResultHandlerInfo {
345 resultHandler = delegate(T result) {
346 if (medium.State == PromiseState.Cancelled)
347 return;
394 ResultHandler<T> resultHandler = delegate(T result) {
395 if (medium.IsCancelled)
396 return;
348 397
349 var promise = chained(result);
398 var promise = chained(result);
350 399
351 // notify chained operation that it's not needed
352 medium.Cancelled(() => promise.Cancel());
353 promise.Then(
354 x => medium.Resolve(x),
355 e => medium.Reject(e)
356 );
357 },
358 errorHandler = delegate(Exception e) {
359 if (error != null)
360 error(e);
361 // в случае ошибки нужно передать исключение дальше по цепочке
362 medium.Reject(e);
363 }
364 });
400 // notify chained operation that it's not needed
401 medium.Cancelled(() => promise.Cancel());
402 promise.Then(
403 x => medium.Resolve(x),
404 e => medium.Reject(e)
405 );
406 };
407
408 ErrorHandler errorHandler = delegate(Exception e) {
409 if (error != null)
410 error(e);
411 // в случае ошибки нужно передать исключение дальше по цепочке
412 medium.Reject(e);
413 };
414
415 AddHandler(
416 resultHandler,
417 errorHandler,
418 medium.InternalCancel
419 );
365 420
366 421 return medium;
367 422 }
@@ -371,19 +426,19 namespace Implab {
371 426 }
372 427
373 428 public Promise<T> Cancelled(Action handler) {
374 if (handler == null)
375 return this;
376 lock (m_lock) {
377 if (m_state == PromiseState.Unresolved)
378 m_cancelHandlers.AddLast(handler);
379 else if (m_state == PromiseState.Cancelled)
380 handler();
381 }
429 AddHandler(null, null, handler);
382 430 return this;
383 431 }
384 432
385 public void HandleCancelled(Action handler) {
386 Cancelled(handler);
433 public Promise<T> Finally(Action handler) {
434 if (handler == null)
435 throw new ArgumentNullException("handler");
436 AddHandler(
437 x => handler(),
438 e => handler(),
439 handler
440 );
441 return this;
387 442 }
388 443
389 444 /// <summary>
@@ -415,15 +470,15 namespace Implab {
415 470 if (!evt.WaitOne(timeout, true))
416 471 throw new TimeoutException();
417 472
418 switch (State) {
419 case PromiseState.Resolved:
473 switch (m_state) {
474 case ResolvedState:
420 475 return m_result;
421 case PromiseState.Cancelled:
476 case CancelledState:
422 477 throw new OperationCanceledException();
423 case PromiseState.Rejected:
478 case RejectedState:
424 479 throw new TargetInvocationException(m_error);
425 480 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
481 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
427 482 }
428 483 }
429 484
@@ -431,40 +486,45 namespace Implab {
431 486 return Join(Timeout.Infinite);
432 487 }
433 488
434 void AddHandler(ResultHandlerInfo handler) {
435 bool invokeRequired = false;
489 void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
490 Interlocked.Increment(ref m_childrenCount);
491
492 HandlerDescriptor handler = new HandlerDescriptor {
493 resultHandler = success,
494 errorHandler = error,
495 cancellHandler = cancel
496 };
436 497
437 lock (m_lock) {
438 m_childrenCount++;
439 if (m_state == PromiseState.Unresolved) {
440 m_resultHandlers.AddLast(handler);
441 } else
442 invokeRequired = true;
498 bool queued;
499
500 if (!IsResolved) {
501 m_handlers.Enqueue(handler);
502 queued = true;
503 } else {
504 // the promise is in resolved state, just invoke the handled with minimum overhead
505 queued = false;
506 InvokeHandler(handler);
443 507 }
444 508
445 // обработчики не должны блокировать сам объект
446 if (invokeRequired)
509 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
510 // if the promise have been resolved while we was adding handler to the queue
511 // we can't guarantee that someone is still processing it
512 // therefore we will fetch a handler from the queue and execute it
513 // note that fetched handler may be not the one we have added
447 514 InvokeHandler(handler);
515
448 516 }
449 517
450 void InvokeHandler(ResultHandlerInfo handler) {
518 void InvokeHandler(HandlerDescriptor handler) {
451 519 switch (m_state) {
452 case PromiseState.Resolved:
453 try {
454 if (handler.resultHandler != null)
455 handler.resultHandler(m_result);
456 } catch (Exception e) {
457 try {
458 if (handler.errorHandler != null)
459 handler.errorHandler(e);
460 } catch { }
461 }
520 case ResolvedState:
521 handler.Resolve(m_result);
462 522 break;
463 case PromiseState.Rejected:
464 try {
465 if (handler.errorHandler != null)
466 handler.errorHandler(m_error);
467 } catch { }
523 case RejectedState:
524 handler.Reject(m_error);
525 break;
526 case CancelledState:
527 handler.Cancel();
468 528 break;
469 529 default:
470 530 // do nothing
@@ -473,76 +533,31 namespace Implab {
473 533 }
474 534
475 535 protected virtual void OnStateChanged() {
476 switch (m_state) {
477 case PromiseState.Resolved:
478 foreach (var resultHandlerInfo in m_resultHandlers)
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
536 HandlerDescriptor handler;
537 while (m_handlers.TryDequeue(out handler))
538 InvokeHandler(handler);
506 539 }
507 540
508 541
509 542
510 543 public bool IsExclusive {
511 544 get {
512 lock (m_lock) {
513 return m_childrenCount <= 1;
514 }
515 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
545 return m_childrenCount <= 1;
523 546 }
524 547 }
525 548
526 549 protected bool Cancel(bool dependencies) {
527 bool result;
528
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
550 if (BeginTransit()) {
551 CompleteTransit(CancelledState);
539 552 OnStateChanged();
540 553
541 if (dependencies && m_parent != null && m_parent.IsExclusive) {
542 m_parent.Cancel();
554 if (dependencies && m_parent != null && m_parent.IsExclusive)
555 m_parent.Cancel();
556
557 return true;
558 } else {
559 return false;
543 560 }
544
545 return result;
546 561 }
547 562
548 563 }
General Comments 0
You need to be logged in to leave comments. Login now