##// END OF EJS Templates
Слияние с promises
cin -
r23:f0568ff069a5 merge default
parent child
Show More
@@ -14,7 +14,7 namespace Implab.Test {
14 p.Then(x => res = x);
14 p.Then(x => res = x);
15 p.Resolve(100);
15 p.Resolve(100);
16
16
17 Assert.AreEqual(res, 100);
17 Assert.AreEqual(100, res);
18 }
18 }
19
19
20 [TestMethod]
20 [TestMethod]
@@ -101,18 +101,18 namespace Implab.Test {
101 public void WorkerPoolSizeTest() {
101 public void WorkerPoolSizeTest() {
102 var pool = new WorkerPool(5, 10, 0);
102 var pool = new WorkerPool(5, 10, 0);
103
103
104 Assert.AreEqual(5, pool.ThreadCount);
104 Assert.AreEqual(5, pool.PoolSize);
105
105
106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
106 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
109
109
110 Assert.AreEqual(5, pool.ThreadCount);
110 Assert.AreEqual(5, pool.PoolSize);
111
111
112 for (int i = 0; i < 100; i++)
112 for (int i = 0; i < 100; i++)
113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
113 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
114 Thread.Sleep(100);
114 Thread.Sleep(200);
115 Assert.AreEqual(10, pool.ThreadCount);
115 Assert.AreEqual(10, pool.PoolSize);
116
116
117 pool.Dispose();
117 pool.Dispose();
118 }
118 }
@@ -149,10 +149,10 namespace Implab.Test {
149 [TestMethod]
149 [TestMethod]
150 public void WorkerPoolDisposeTest() {
150 public void WorkerPoolDisposeTest() {
151 var pool = new WorkerPool(5, 20);
151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
152 Assert.AreEqual(5, pool.PoolSize);
153 pool.Dispose();
153 pool.Dispose();
154 Thread.Sleep(100);
154 Thread.Sleep(500);
155 Assert.AreEqual(0, pool.ThreadCount);
155 Assert.AreEqual(0, pool.PoolSize);
156 pool.Dispose();
156 pool.Dispose();
157 }
157 }
158
158
@@ -244,7 +244,7 namespace Implab.Test {
244 [TestMethod]
244 [TestMethod]
245 public void ChainedMapTest() {
245 public void ChainedMapTest() {
246
246
247 using (var pool = new WorkerPool(8,100,0)) {
247 using (var pool = new WorkerPool(4,4,0)) {
248 int count = 10000;
248 int count = 10000;
249
249
250 double[] args = new double[count];
250 double[] args = new double[count];
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -15,19 +15,6 namespace Implab
15 get;
15 get;
16 }
16 }
17
17
18 /// <summary>
19 /// The current state of the promise.
20 /// </summary>
21 PromiseState State
22 {
23 get;
24 }
25
18
26 /// <summary>
27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
28 /// handler will be invoked immediatelly.
29 /// </summary>
30 /// <param name="handler">The handler</param>
31 void HandleCancelled(Action handler);
32 }
19 }
33 }
20 }
@@ -140,7 +140,7 namespace Implab.Parallels {
140
140
141 AsyncPool.InvokeNewThread(() => {
141 AsyncPool.InvokeNewThread(() => {
142 for (int i = 0; i < source.Length; i++) {
142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
143 if(promise.IsResolved)
144 break; // stop processing in case of error or cancellation
144 break; // stop processing in case of error or cancellation
145 var idx = i;
145 var idx = i;
146 semaphore.WaitOne();
146 semaphore.WaitOne();
@@ -9,10 +9,15 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
12
13 int m_maxRunningThreads = 0;
13 int m_createdThreads = 0; // the current size of the pool
14 int m_suspended = 0;
14 int m_activeThreads = 0; // the count of threads which are active
15 int m_exitRequired = 0;
15 int m_sleepingThreads = 0; // the count of currently inactive threads
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
19 int m_wakeEvents = 0; // the count of wake events
20
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17
22
18 protected DispatchPool(int min, int max) {
23 protected DispatchPool(int min, int max) {
@@ -44,9 +49,15 namespace Implab.Parallels {
44 StartWorker();
49 StartWorker();
45 }
50 }
46
51
47 public int ThreadCount {
52 public int PoolSize {
48 get {
53 get {
49 return m_runningThreads;
54 return m_createdThreads;
55 }
56 }
57
58 public int ActiveThreads {
59 get {
60 return m_activeThreads;
50 }
61 }
51 }
62 }
52
63
@@ -64,29 +75,120 namespace Implab.Parallels {
64
75
65 protected abstract bool TryDequeue(out TUnit unit);
76 protected abstract bool TryDequeue(out TUnit unit);
66
77
67 protected virtual bool ExtendPool() {
78 #region thread execution traits
68 if (m_suspended > 0) {
79 int SignalThread() {
80 var signals = Interlocked.Increment(ref m_wakeEvents);
81 if(signals == 1)
69 m_hasTasks.Set();
82 m_hasTasks.Set();
83 return signals;
84 }
85
86 bool FetchSignalOrWait(int timeout) {
87 var start = Environment.TickCount;
88
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен
90 // ее вернуть, чтобы другой ожидающий поток смог
91 bool hasLock = false;
92 do {
93 int signals;
94 do {
95 signals = m_wakeEvents;
96 if (signals == 0)
97 break;
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99
100 if (signals >= 1) {
101 if (signals > 1 && hasLock)
102 m_hasTasks.Set();
103 return true;
104 }
105
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
110 // и уйдет на пустой цикл, после чего заблокируется
111
112 hasLock = true;
113 } while (m_hasTasks.WaitOne(timeout));
114
115 return false;
116 }
117
118 bool Sleep(int timeout) {
119 Interlocked.Increment(ref m_sleepingThreads);
120 if (FetchSignalOrWait(timeout)) {
121 Interlocked.Decrement(ref m_sleepingThreads);
70 return true;
122 return true;
71 } else
123 } else {
72 return StartWorker();
124 Interlocked.Decrement(ref m_sleepingThreads);
125 return false;
126 }
73 }
127 }
128 #endregion
74
129
75 /// <summary>
130 /// <summary>
76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
77 /// </summary>
132 /// </summary>
78 protected void WakePool() {
133 protected void GrowPool() {
79 m_hasTasks.Set(); // wake sleeping thread;
134 if (m_exitRequired != 0)
135 return;
136 if (m_sleepingThreads > m_wakeEvents) {
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138
139 // all sleeping threads may gone
140 SignalThread(); // wake a sleeping thread;
80
141
81 if (AllocateThreadSlot(1)) {
142 // we can't check whether signal has been processed
82 var worker = new Thread(this.Worker);
143 // anyway it may take some time for the thread to start
83 worker.IsBackground = true;
144 // we will ensure that at least one thread is running
84 worker.Start();
145
146 if (AllocateThreadSlot(1)) {
147 // if there were no threads in the pool
148 var worker = new Thread(this.Worker);
149 worker.IsBackground = true;
150 worker.Start();
151 }
152 } else {
153 // if there is no sleeping threads in the pool
154 StartWorker();
85 }
155 }
86 }
156 }
87
157
88 protected virtual void Suspend() {
158 private bool Suspend() {
89 m_hasTasks.WaitOne();
159 //no tasks left, exit if the thread is no longer needed
160 bool last;
161 bool requestExit;
162
163 // if threads have a timeout before releasing
164 if (m_releaseTimeout > 0)
165 requestExit = !Sleep(m_releaseTimeout);
166 else
167 requestExit = true;
168
169 if (!requestExit)
170 return true;
171
172 // release unsused thread
173 if (requestExit && ReleaseThreadSlot(out last)) {
174 // in case at the moment the last thread was being released
175 // a new task was added to the queue, we need to try
176 // to revoke the thread to avoid the situation when the task is left unprocessed
177 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
178 if (AllocateThreadSlot(1))
179 return true; // spin again...
180 else
181 SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
182
183 }
184
185 return false;
186 }
187
188 // wait till infinity
189 Sleep(-1);
190
191 return true;
90 }
192 }
91
193
92 #region thread slots traits
194 #region thread slots traits
@@ -95,11 +197,11 namespace Implab.Parallels {
95 int current;
197 int current;
96 // use spins to allocate slot for the new thread
198 // use spins to allocate slot for the new thread
97 do {
199 do {
98 current = m_runningThreads;
200 current = m_createdThreads;
99 if (current >= m_maxThreads || m_exitRequired != 0)
201 if (current >= m_maxThreads || m_exitRequired != 0)
100 // no more slots left or the pool has been disposed
202 // no more slots left or the pool has been disposed
101 return false;
203 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
204 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
103
205
104 UpdateMaxThreads(current + 1);
206 UpdateMaxThreads(current + 1);
105
207
@@ -107,7 +209,7 namespace Implab.Parallels {
107 }
209 }
108
210
109 bool AllocateThreadSlot(int desired) {
211 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
212 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
111 return false;
213 return false;
112
214
113 UpdateMaxThreads(desired);
215 UpdateMaxThreads(desired);
@@ -120,11 +222,11 namespace Implab.Parallels {
120 int current;
222 int current;
121 // use spins to release slot for the new thread
223 // use spins to release slot for the new thread
122 do {
224 do {
123 current = m_runningThreads;
225 current = m_createdThreads;
124 if (current <= m_minThreads && m_exitRequired == 0)
226 if (current <= m_minThreads && m_exitRequired == 0)
125 // the thread is reserved
227 // the thread is reserved
126 return false;
228 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
229 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
128
230
129 last = (current == 1);
231 last = (current == 1);
130
232
@@ -136,7 +238,7 namespace Implab.Parallels {
136 /// </summary>
238 /// </summary>
137 /// <returns>true - no more threads left</returns>
239 /// <returns>true - no more threads left</returns>
138 bool ReleaseThreadSlotAnyway() {
240 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads);
241 var left = Interlocked.Decrement(ref m_createdThreads);
140 return left == 0;
242 return left == 0;
141 }
243 }
142
244
@@ -164,54 +266,41 namespace Implab.Parallels {
164 }
266 }
165 }
267 }
166
268
167 bool FetchTask(out TUnit unit) {
269 protected abstract void InvokeUnit(TUnit unit);
270
271 void Worker() {
272 TUnit unit;
273 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
274 Interlocked.Increment(ref m_activeThreads);
168 do {
275 do {
169 // exit if requested
276 // exit if requested
170 if (m_exitRequired != 0) {
277 if (m_exitRequired != 0) {
171 // release the thread slot
278 // release the thread slot
279 Interlocked.Decrement(ref m_activeThreads);
172 if (ReleaseThreadSlotAnyway()) // it was the last worker
280 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 m_hasTasks.Dispose();
281 m_hasTasks.Dispose();
174 else
282 else
175 m_hasTasks.Set(); // wake next worker
283 SignalThread(); // wake next worker
176 unit = default(TUnit);
284 unit = default(TUnit);
177 return false;
285 break;
178 }
286 }
179
287
180 // fetch task
288 // fetch task
181 if (TryDequeue(out unit)) {
289 if (TryDequeue(out unit)) {
182 ExtendPool();
290 InvokeUnit(unit);
183 return true;
291 continue;
184 }
292 }
185
293
186 //no tasks left, exit if the thread is no longer needed
294 Interlocked.Decrement(ref m_activeThreads);
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
197
198 return false;
199 }
200
295
201 // entering suspend state
296 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 // keep this thread and wait
297 // keep this thread and wait
204 Suspend();
298 if (!Suspend())
205 Interlocked.Decrement(ref m_suspended);
299 break;
300 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
301 Interlocked.Increment(ref m_activeThreads);
206 } while (true);
302 } while (true);
207 }
303 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
208
209 protected abstract void InvokeUnit(TUnit unit);
210
211 void Worker() {
212 TUnit unit;
213 while (FetchTask(out unit))
214 InvokeUnit(unit);
215 }
304 }
216
305
217 protected virtual void Dispose(bool disposing) {
306 protected virtual void Dispose(bool disposing) {
@@ -221,7 +310,10 namespace Implab.Parallels {
221 return;
310 return;
222
311
223 // wake sleeping threads
312 // wake sleeping threads
224 m_hasTasks.Set();
313 if (m_createdThreads > 0)
314 SignalThread();
315 else
316 m_hasTasks.Dispose();
225 GC.SuppressFinalize(this);
317 GC.SuppressFinalize(this);
226 }
318 }
227 }
319 }
@@ -42,12 +42,13 namespace Implab.Parallels {
42 next = first.next;
42 next = first.next;
43 if (next == null) {
43 if (next == null) {
44 // this is the last element,
44 // this is the last element,
45 // then try to update tail
45 // then try to update the tail
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
47 // this is a ace condition
48 if (m_last == null)
48 if (m_last == null)
49 // the queue is empty
49 return false;
50 return false;
50 // tail has been changed, that means that we need to restart
51 // tail has been changed, than we need to restart
51 continue;
52 continue;
52 }
53 }
53
54
@@ -57,16 +57,8 namespace Implab.Parallels {
57 var len = Interlocked.Increment(ref m_queueLength);
57 var len = Interlocked.Increment(ref m_queueLength);
58 m_queue.Enqueue(unit);
58 m_queue.Enqueue(unit);
59
59
60 if(!ExtendPool())
60 if (len > m_threshold*ActiveThreads)
61 WakePool();
61 GrowPool();
62 }
63
64 protected override bool ExtendPool() {
65 if (m_queueLength <= m_threshold*ThreadCount)
66 // in this case we are in active thread and it request for additional workers
67 // satisfy it only when queue is longer than threshold
68 return false;
69 return base.ExtendPool();
70 }
62 }
71
63
72 protected override bool TryDequeue(out Action unit) {
64 protected override bool TryDequeue(out Action unit) {
@@ -81,9 +73,5 namespace Implab.Parallels {
81 unit();
73 unit();
82 }
74 }
83
75
84 protected override void Suspend() {
85 if (m_queueLength == 0)
86 base.Suspend();
87 }
88 }
76 }
89 }
77 }
@@ -3,6 +3,7 using System.Collections.Generic;
3 using System.Reflection;
3 using System.Reflection;
4 using System.Diagnostics;
4 using System.Diagnostics;
5 using System.Threading;
5 using System.Threading;
6 using Implab.Parallels;
6
7
7 namespace Implab {
8 namespace Implab {
8
9
@@ -48,24 +49,53 namespace Implab {
48 /// </remarks>
49 /// </remarks>
49 public class Promise<T> : IPromise {
50 public class Promise<T> : IPromise {
50
51
51 struct ResultHandlerInfo {
52 struct HandlerDescriptor {
52 public ResultHandler<T> resultHandler;
53 public ResultHandler<T> resultHandler;
53 public ErrorHandler errorHandler;
54 public ErrorHandler errorHandler;
55 public Action cancellHandler;
56
57 public void Resolve(T result) {
58 if (resultHandler != null)
59 try {
60 resultHandler(result);
61 } catch (Exception e) {
62 Reject(e);
63 }
64 }
65
66 public void Reject(Exception err) {
67 if (errorHandler != null)
68 try {
69 errorHandler(err);
70 } catch {
71 }
72 }
73
74 public void Cancel() {
75 if (cancellHandler != null)
76 try {
77 cancellHandler();
78 } catch {
79 }
80 }
54 }
81 }
55
82
83 const int UnresolvedSate = 0;
84 const int TransitionalState = 1;
85 const int ResolvedState = 2;
86 const int RejectedState = 3;
87 const int CancelledState = 4;
88
56 readonly IPromise m_parent;
89 readonly IPromise m_parent;
57
90 readonly bool m_cancellable;
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
60
91
61 readonly object m_lock = new Object();
62 readonly bool m_cancellable;
63 int m_childrenCount = 0;
92 int m_childrenCount = 0;
64
93 int m_state;
65 PromiseState m_state;
66 T m_result;
94 T m_result;
67 Exception m_error;
95 Exception m_error;
68
96
97 readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
98
69 public Promise() {
99 public Promise() {
70 m_cancellable = true;
100 m_cancellable = true;
71 }
101 }
@@ -73,8 +103,6 namespace Implab {
73 public Promise(IPromise parent, bool cancellable) {
103 public Promise(IPromise parent, bool cancellable) {
74 m_cancellable = cancellable;
104 m_cancellable = cancellable;
75 m_parent = parent;
105 m_parent = parent;
76 if (parent != null)
77 parent.HandleCancelled(InternalCancel);
78 }
106 }
79
107
80 void InternalCancel() {
108 void InternalCancel() {
@@ -82,22 +110,39 namespace Implab {
82 Cancel(false);
110 Cancel(false);
83 }
111 }
84
112
113 bool BeginTransit() {
114 return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
115 }
116
117 void CompleteTransit(int state) {
118 if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
119 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
120 }
121
122 public bool IsResolved {
123 get {
124 return m_state > 1;
125 }
126 }
127
128 public bool IsCancelled {
129 get {
130 return m_state == CancelledState;
131 }
132 }
133
85 /// <summary>
134 /// <summary>
86 /// Выполняет обещание, сообщая об успешном выполнении.
135 /// Выполняет обещание, сообщая об успешном выполнении.
87 /// </summary>
136 /// </summary>
88 /// <param name="result">Результат выполнения.</param>
137 /// <param name="result">Результат выполнения.</param>
89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
138 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
90 public void Resolve(T result) {
139 public void Resolve(T result) {
91 lock (m_lock) {
140 if (BeginTransit()) {
92 if (m_state == PromiseState.Cancelled)
93 return;
94 if (m_state != PromiseState.Unresolved)
95 throw new InvalidOperationException("The promise is already resolved");
96 m_result = result;
141 m_result = result;
97 m_state = PromiseState.Resolved;
142 CompleteTransit(ResolvedState);
98 }
143 OnStateChanged();
99
144 } else if (m_state != CancelledState)
100 OnStateChanged();
145 throw new InvalidOperationException("The promise is already resolved");
101 }
146 }
102
147
103 /// <summary>
148 /// <summary>
@@ -111,16 +156,12 namespace Implab {
111 /// <param name="error">Исключение возникшее при выполнении операции</param>
156 /// <param name="error">Исключение возникшее при выполнении операции</param>
112 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
157 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
113 public void Reject(Exception error) {
158 public void Reject(Exception error) {
114 lock (m_lock) {
159 if (BeginTransit()) {
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
116 return;
117 if (m_state != PromiseState.Unresolved)
118 throw new InvalidOperationException("The promise is already resolved");
119 m_error = error;
160 m_error = error;
120 m_state = PromiseState.Rejected;
161 CompleteTransit(RejectedState);
121 }
162 OnStateChanged();
122
163 } else if (m_state == ResolvedState)
123 OnStateChanged();
164 throw new InvalidOperationException("The promise is already resolved");
124 }
165 }
125
166
126 /// <summary>
167 /// <summary>
@@ -144,27 +185,27 namespace Implab {
144
185
145 var medium = new Promise<T>(this, true);
186 var medium = new Promise<T>(this, true);
146
187
147 var handlerInfo = new ResultHandlerInfo();
188 ResultHandler<T> resultHandler;
148
149 if (success != null)
189 if (success != null)
150 handlerInfo.resultHandler = x => {
190 resultHandler = x => {
151 success(x);
191 success(x);
152 medium.Resolve(x);
192 medium.Resolve(x);
153 };
193 };
154 else
194 else
155 handlerInfo.resultHandler = medium.Resolve;
195 resultHandler = medium.Resolve;
156
196
197 ErrorHandler errorHandler;
157 if (error != null)
198 if (error != null)
158 handlerInfo.errorHandler = x => {
199 errorHandler = x => {
159 try {
200 try {
160 error(x);
201 error(x);
161 } catch { }
202 } catch { }
162 medium.Reject(x);
203 medium.Reject(x);
163 };
204 };
164 else
205 else
165 handlerInfo.errorHandler = medium.Reject;
206 errorHandler = medium.Reject;
166
207
167 AddHandler(handlerInfo);
208 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
168
209
169 return medium;
210 return medium;
170 }
211 }
@@ -182,27 +223,28 namespace Implab {
182
223
183 var medium = new Promise<T>(this, true);
224 var medium = new Promise<T>(this, true);
184
225
185 var handlerInfo = new ResultHandlerInfo();
226 ResultHandler<T> resultHandler;
227 ErrorHandler errorHandler;
186
228
187 if (success != null)
229 if (success != null)
188 handlerInfo.resultHandler = x => {
230 resultHandler = x => {
189 success(x);
231 success(x);
190 medium.Resolve(x);
232 medium.Resolve(x);
191 };
233 };
192 else
234 else
193 handlerInfo.resultHandler = medium.Resolve;
235 resultHandler = medium.Resolve;
194
236
195 if (error != null)
237 if (error != null)
196 handlerInfo.errorHandler = x => {
238 errorHandler = x => {
197 try {
239 try {
198 medium.Resolve(error(x));
240 medium.Resolve(error(x));
199 } catch { }
241 } catch { }
200 medium.Reject(x);
242 medium.Reject(x);
201 };
243 };
202 else
244 else
203 handlerInfo.errorHandler = medium.Reject;
245 errorHandler = medium.Reject;
204
246
205 AddHandler(handlerInfo);
247 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
206
248
207 return medium;
249 return medium;
208 }
250 }
@@ -214,19 +256,17 namespace Implab {
214
256
215 var medium = new Promise<T>(this, true);
257 var medium = new Promise<T>(this, true);
216
258
217 var handlerInfo = new ResultHandlerInfo();
259 ResultHandler<T> resultHandler;
218
260
219 if (success != null)
261 if (success != null)
220 handlerInfo.resultHandler = x => {
262 resultHandler = x => {
221 success(x);
263 success(x);
222 medium.Resolve(x);
264 medium.Resolve(x);
223 };
265 };
224 else
266 else
225 handlerInfo.resultHandler = medium.Resolve;
267 resultHandler = medium.Resolve;
226
268
227 handlerInfo.errorHandler = medium.Reject;
269 AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
228
229 AddHandler(handlerInfo);
230
270
231 return medium;
271 return medium;
232 }
272 }
@@ -249,15 +289,17 namespace Implab {
249
289
250 var medium = new Promise<T>(this, true);
290 var medium = new Promise<T>(this, true);
251
291
252 AddHandler(new ResultHandlerInfo {
292 AddHandler(
253 errorHandler = e => {
293 null,
294 e => {
254 try {
295 try {
255 medium.Resolve(handler(e));
296 medium.Resolve(handler(e));
256 } catch (Exception e2) {
297 } catch (Exception e2) {
257 medium.Reject(e2);
298 medium.Reject(e2);
258 }
299 }
259 }
300 },
260 });
301 medium.InternalCancel
302 );
261
303
262 return medium;
304 return medium;
263 }
305 }
@@ -268,8 +310,8 namespace Implab {
268
310
269 var medium = new Promise<T>();
311 var medium = new Promise<T>();
270
312
271 AddHandler(new ResultHandlerInfo {
313 AddHandler(
272 resultHandler = x => {
314 x => {
273 // to avoid handler being called multiple times we handle exception by ourselfs
315 // to avoid handler being called multiple times we handle exception by ourselfs
274 try {
316 try {
275 handler();
317 handler();
@@ -278,13 +320,16 namespace Implab {
278 medium.Reject(e);
320 medium.Reject(e);
279 }
321 }
280 },
322 },
281 errorHandler = x => {
323
324 e => {
282 try {
325 try {
283 handler();
326 handler();
284 } catch { }
327 } catch { }
285 medium.Reject(x);
328 medium.Reject(e);
286 }
329 },
287 });
330
331 medium.InternalCancel
332 );
288
333
289 return medium;
334 return medium;
290 }
335 }
@@ -304,17 +349,22 namespace Implab {
304 // создаем прицепленное обещание
349 // создаем прицепленное обещание
305 var chained = new Promise<TNew>();
350 var chained = new Promise<TNew>();
306
351
307 AddHandler(new ResultHandlerInfo() {
352 ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
308 resultHandler = result => chained.Resolve(mapper(result)),
353 ErrorHandler errorHandler = delegate(Exception e) {
309 errorHandler = delegate(Exception e) {
354 if (error != null)
310 if (error != null)
355 try {
311 try {
356 error(e);
312 error(e);
357 } catch { }
313 } catch { }
358 // в случае ошибки нужно передать исключение дальше по цепочке
314 // в случае ошибки нужно передать исключение дальше по цепочке
359 chained.Reject(e);
315 chained.Reject(e);
360 };
316 }
361
317 });
362
363 AddHandler(
364 resultHandler,
365 errorHandler,
366 chained.InternalCancel
367 );
318
368
319 return chained;
369 return chained;
320 }
370 }
@@ -341,27 +391,32 namespace Implab {
341 // передать через него результаты работы.
391 // передать через него результаты работы.
342 var medium = new Promise<TNew>(this, true);
392 var medium = new Promise<TNew>(this, true);
343
393
344 AddHandler(new ResultHandlerInfo {
394 ResultHandler<T> resultHandler = delegate(T result) {
345 resultHandler = delegate(T result) {
395 if (medium.IsCancelled)
346 if (medium.State == PromiseState.Cancelled)
396 return;
347 return;
348
397
349 var promise = chained(result);
398 var promise = chained(result);
350
399
351 // notify chained operation that it's not needed
400 // notify chained operation that it's not needed
352 medium.Cancelled(() => promise.Cancel());
401 medium.Cancelled(() => promise.Cancel());
353 promise.Then(
402 promise.Then(
354 x => medium.Resolve(x),
403 x => medium.Resolve(x),
355 e => medium.Reject(e)
404 e => medium.Reject(e)
356 );
405 );
357 },
406 };
358 errorHandler = delegate(Exception e) {
407
359 if (error != null)
408 ErrorHandler errorHandler = delegate(Exception e) {
360 error(e);
409 if (error != null)
361 // в случае ошибки нужно передать исключение дальше по цепочке
410 error(e);
362 medium.Reject(e);
411 // в случае ошибки нужно передать исключение дальше по цепочке
363 }
412 medium.Reject(e);
364 });
413 };
414
415 AddHandler(
416 resultHandler,
417 errorHandler,
418 medium.InternalCancel
419 );
365
420
366 return medium;
421 return medium;
367 }
422 }
@@ -371,19 +426,19 namespace Implab {
371 }
426 }
372
427
373 public Promise<T> Cancelled(Action handler) {
428 public Promise<T> Cancelled(Action handler) {
374 if (handler == null)
429 AddHandler(null, null, handler);
375 return this;
376 lock (m_lock) {
377 if (m_state == PromiseState.Unresolved)
378 m_cancelHandlers.AddLast(handler);
379 else if (m_state == PromiseState.Cancelled)
380 handler();
381 }
382 return this;
430 return this;
383 }
431 }
384
432
385 public void HandleCancelled(Action handler) {
433 public Promise<T> Finally(Action handler) {
386 Cancelled(handler);
434 if (handler == null)
435 throw new ArgumentNullException("handler");
436 AddHandler(
437 x => handler(),
438 e => handler(),
439 handler
440 );
441 return this;
387 }
442 }
388
443
389 /// <summary>
444 /// <summary>
@@ -415,15 +470,15 namespace Implab {
415 if (!evt.WaitOne(timeout, true))
470 if (!evt.WaitOne(timeout, true))
416 throw new TimeoutException();
471 throw new TimeoutException();
417
472
418 switch (State) {
473 switch (m_state) {
419 case PromiseState.Resolved:
474 case ResolvedState:
420 return m_result;
475 return m_result;
421 case PromiseState.Cancelled:
476 case CancelledState:
422 throw new OperationCanceledException();
477 throw new OperationCanceledException();
423 case PromiseState.Rejected:
478 case RejectedState:
424 throw new TargetInvocationException(m_error);
479 throw new TargetInvocationException(m_error);
425 default:
480 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
481 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
427 }
482 }
428 }
483 }
429
484
@@ -431,40 +486,45 namespace Implab {
431 return Join(Timeout.Infinite);
486 return Join(Timeout.Infinite);
432 }
487 }
433
488
434 void AddHandler(ResultHandlerInfo handler) {
489 void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
435 bool invokeRequired = false;
490 Interlocked.Increment(ref m_childrenCount);
491
492 HandlerDescriptor handler = new HandlerDescriptor {
493 resultHandler = success,
494 errorHandler = error,
495 cancellHandler = cancel
496 };
436
497
437 lock (m_lock) {
498 bool queued;
438 m_childrenCount++;
499
439 if (m_state == PromiseState.Unresolved) {
500 if (!IsResolved) {
440 m_resultHandlers.AddLast(handler);
501 m_handlers.Enqueue(handler);
441 } else
502 queued = true;
442 invokeRequired = true;
503 } else {
504 // the promise is in resolved state, just invoke the handled with minimum overhead
505 queued = false;
506 InvokeHandler(handler);
443 }
507 }
444
508
445 // обработчики не должны блокировать сам объект
509 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
446 if (invokeRequired)
510 // if the promise have been resolved while we was adding handler to the queue
511 // we can't guarantee that someone is still processing it
512 // therefore we will fetch a handler from the queue and execute it
513 // note that fetched handler may be not the one we have added
447 InvokeHandler(handler);
514 InvokeHandler(handler);
515
448 }
516 }
449
517
450 void InvokeHandler(ResultHandlerInfo handler) {
518 void InvokeHandler(HandlerDescriptor handler) {
451 switch (m_state) {
519 switch (m_state) {
452 case PromiseState.Resolved:
520 case ResolvedState:
453 try {
521 handler.Resolve(m_result);
454 if (handler.resultHandler != null)
455 handler.resultHandler(m_result);
456 } catch (Exception e) {
457 try {
458 if (handler.errorHandler != null)
459 handler.errorHandler(e);
460 } catch { }
461 }
462 break;
522 break;
463 case PromiseState.Rejected:
523 case RejectedState:
464 try {
524 handler.Reject(m_error);
465 if (handler.errorHandler != null)
525 break;
466 handler.errorHandler(m_error);
526 case CancelledState:
467 } catch { }
527 handler.Cancel();
468 break;
528 break;
469 default:
529 default:
470 // do nothing
530 // do nothing
@@ -473,76 +533,31 namespace Implab {
473 }
533 }
474
534
475 protected virtual void OnStateChanged() {
535 protected virtual void OnStateChanged() {
476 switch (m_state) {
536 HandlerDescriptor handler;
477 case PromiseState.Resolved:
537 while (m_handlers.TryDequeue(out handler))
478 foreach (var resultHandlerInfo in m_resultHandlers)
538 InvokeHandler(handler);
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
506 }
539 }
507
540
508
541
509
542
510 public bool IsExclusive {
543 public bool IsExclusive {
511 get {
544 get {
512 lock (m_lock) {
545 return m_childrenCount <= 1;
513 return m_childrenCount <= 1;
514 }
515 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
523 }
546 }
524 }
547 }
525
548
526 protected bool Cancel(bool dependencies) {
549 protected bool Cancel(bool dependencies) {
527 bool result;
550 if (BeginTransit()) {
528
551 CompleteTransit(CancelledState);
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
539 OnStateChanged();
552 OnStateChanged();
540
553
541 if (dependencies && m_parent != null && m_parent.IsExclusive) {
554 if (dependencies && m_parent != null && m_parent.IsExclusive)
542 m_parent.Cancel();
555 m_parent.Cancel();
556
557 return true;
558 } else {
559 return false;
543 }
560 }
544
545 return result;
546 }
561 }
547
562
548 }
563 }
General Comments 0
You need to be logged in to leave comments. Login now