##// END OF EJS Templates
fixed timeout handling in promises
cin -
r148:e6d4b41f0101 v2
parent child
Show More
@@ -1,353 +1,353
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Reflection;
5 5
6 6 namespace Implab {
7 7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
8 8
9 9 const int UNRESOLVED_SATE = 0;
10 10 const int TRANSITIONAL_STATE = 1;
11 11 const int SUCCEEDED_STATE = 2;
12 12 const int REJECTED_STATE = 3;
13 13 const int CANCELLED_STATE = 4;
14 14
15 15 const int CANCEL_NOT_REQUESTED = 0;
16 16 const int CANCEL_REQUESTING = 1;
17 17 const int CANCEL_REQUESTED = 2;
18 18
19 19 const int RESERVED_HANDLERS_COUNT = 4;
20 20
21 21 int m_state;
22 22 Exception m_error;
23 23 int m_handlersCount;
24 24
25 25 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
26 26 MTQueue<THandler> m_extraHandlers;
27 27 int m_handlerPointer = -1;
28 28 int m_handlersCommited;
29 29
30 30 int m_cancelRequest;
31 31 Exception m_cancelationReason;
32 32 MTQueue<Action<Exception>> m_cancelationHandlers;
33 33
34 34
35 35 #region state managment
36 36 bool BeginTransit() {
37 37 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
38 38 }
39 39
40 40 void CompleteTransit(int state) {
41 41 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
42 42 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
43 43 }
44 44
45 45 void WaitTransition() {
46 46 while (m_state == TRANSITIONAL_STATE) {
47 47 Thread.MemoryBarrier();
48 48 }
49 49 }
50 50
51 51 protected bool BeginSetResult() {
52 52 if (!BeginTransit()) {
53 53 WaitTransition();
54 54 if (m_state != CANCELLED_STATE)
55 55 throw new InvalidOperationException("The promise is already resolved");
56 56 return false;
57 57 }
58 58 return true;
59 59 }
60 60
61 61 protected void EndSetResult() {
62 62 CompleteTransit(SUCCEEDED_STATE);
63 63 OnSuccess();
64 64 }
65 65
66 66
67 67
68 68 /// <summary>
69 69 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
70 70 /// </summary>
71 71 /// <remarks>
72 72 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
73 73 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
74 74 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
75 75 /// </remarks>
76 76 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
77 77 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
78 78 protected void SetError(Exception error) {
79 79 if (BeginTransit()) {
80 80 if (error is OperationCanceledException) {
81 81 CompleteTransit(CANCELLED_STATE);
82 82 m_error = error.InnerException;
83 83 OnCancelled();
84 84 } else {
85 85 m_error = error is PromiseTransientException ? error.InnerException : error;
86 86 CompleteTransit(REJECTED_STATE);
87 87 OnError();
88 88 }
89 89 } else {
90 90 WaitTransition();
91 91 if (m_state == SUCCEEDED_STATE)
92 92 throw new InvalidOperationException("The promise is already resolved");
93 93 }
94 94 }
95 95
96 96 /// <summary>
97 97 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
98 98 /// </summary>
99 99 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
100 100 protected void SetCancelled(Exception reason) {
101 101 if (BeginTransit()) {
102 102 m_error = reason;
103 103 CompleteTransit(CANCELLED_STATE);
104 104 OnCancelled();
105 105 }
106 106 }
107 107
108 108 protected abstract void SignalSuccess(THandler handler);
109 109
110 110 protected abstract void SignalError(THandler handler, Exception error);
111 111
112 112 protected abstract void SignalCancelled(THandler handler, Exception reason);
113 113
114 114 void OnSuccess() {
115 115 var hp = m_handlerPointer;
116 116 var slot = hp +1 ;
117 117 while (slot < m_handlersCommited) {
118 118 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
119 119 SignalSuccess(m_handlers[slot]);
120 120 }
121 121 hp = m_handlerPointer;
122 122 slot = hp +1 ;
123 123 }
124 124
125 125
126 126 if (m_extraHandlers != null) {
127 127 THandler handler;
128 128 while (m_extraHandlers.TryDequeue(out handler))
129 129 SignalSuccess(handler);
130 130 }
131 131 }
132 132
133 133 void OnError() {
134 134 var hp = m_handlerPointer;
135 135 var slot = hp +1 ;
136 136 while (slot < m_handlersCommited) {
137 137 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
138 138 SignalError(m_handlers[slot],m_error);
139 139 }
140 140 hp = m_handlerPointer;
141 141 slot = hp +1 ;
142 142 }
143 143
144 144 if (m_extraHandlers != null) {
145 145 THandler handler;
146 146 while (m_extraHandlers.TryDequeue(out handler))
147 147 SignalError(handler, m_error);
148 148 }
149 149 }
150 150
151 151 void OnCancelled() {
152 152 var hp = m_handlerPointer;
153 153 var slot = hp +1 ;
154 154 while (slot < m_handlersCommited) {
155 155 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
156 156 SignalCancelled(m_handlers[slot], m_error);
157 157 }
158 158 hp = m_handlerPointer;
159 159 slot = hp +1 ;
160 160 }
161 161
162 162 if (m_extraHandlers != null) {
163 163 THandler handler;
164 164 while (m_extraHandlers.TryDequeue(out handler))
165 165 SignalCancelled(handler, m_error);
166 166 }
167 167 }
168 168
169 169 #endregion
170 170
171 171 protected abstract Signal GetResolveSignal();
172 172
173 173 #region synchronization traits
174 174 protected void WaitResult(int timeout) {
175 if (!IsResolved)
176 GetResolveSignal().Wait(timeout);
175 if (!(IsResolved || GetResolveSignal().Wait(timeout)))
176 throw new TimeoutException();
177 177
178 178 switch (m_state) {
179 179 case SUCCEEDED_STATE:
180 180 return;
181 181 case CANCELLED_STATE:
182 182 throw new OperationCanceledException();
183 183 case REJECTED_STATE:
184 184 throw new TargetInvocationException(m_error);
185 185 default:
186 186 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
187 187 }
188 188 }
189 189 #endregion
190 190
191 191 #region handlers managment
192 192
193 193 protected void AddHandler(THandler handler) {
194 194
195 195 if (m_state > 1) {
196 196 // the promise is in the resolved state, just invoke the handler
197 197 InvokeHandler(handler);
198 198 } else {
199 199 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
200 200
201 201 if (slot < RESERVED_HANDLERS_COUNT) {
202 202
203 203 m_handlers[slot] = handler;
204 204
205 205 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
206 206 }
207 207
208 208 if (m_state > 1) {
209 209 do {
210 210 var hp = m_handlerPointer;
211 211 slot = hp + 1;
212 212 if (slot < m_handlersCommited) {
213 213 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
214 214 continue;
215 215 InvokeHandler(m_handlers[slot]);
216 216 }
217 217 break;
218 218 } while(true);
219 219 }
220 220 } else {
221 221 if (slot == RESERVED_HANDLERS_COUNT) {
222 222 m_extraHandlers = new MTQueue<THandler>();
223 223 } else {
224 224 while (m_extraHandlers == null)
225 225 Thread.MemoryBarrier();
226 226 }
227 227
228 228 m_extraHandlers.Enqueue(handler);
229 229
230 230 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
231 231 // if the promise have been resolved while we was adding the handler to the queue
232 232 // we can't guarantee that someone is still processing it
233 233 // therefore we need to fetch a handler from the queue and execute it
234 234 // note that fetched handler may be not the one that we have added
235 235 // even we can fetch no handlers at all :)
236 236 InvokeHandler(handler);
237 237 }
238 238 }
239 239 }
240 240
241 241 protected void InvokeHandler(THandler handler) {
242 242 switch (m_state) {
243 243 case SUCCEEDED_STATE:
244 244 SignalSuccess(handler);
245 245 break;
246 246 case CANCELLED_STATE:
247 247 SignalCancelled(handler, m_error);
248 248 break;
249 249 case REJECTED_STATE:
250 250 SignalError(handler, m_error);
251 251 break;
252 252 default:
253 253 throw new Exception(String.Format("Invalid promise state {0}", m_state));
254 254 }
255 255 }
256 256
257 257 #endregion
258 258
259 259 #region IPromise implementation
260 260
261 261 public bool IsResolved {
262 262 get {
263 263 Thread.MemoryBarrier();
264 264 return m_state > 1;
265 265 }
266 266 }
267 267
268 268 public bool IsCancelled {
269 269 get {
270 270 Thread.MemoryBarrier();
271 271 return m_state == CANCELLED_STATE;
272 272 }
273 273 }
274 274
275 275 #endregion
276 276
277 277 public Exception Error {
278 278 get {
279 279 return m_error;
280 280 }
281 281 }
282 282
283 283 public bool CancelOperationIfRequested() {
284 284 if (IsCancellationRequested) {
285 285 CancelOperation(CancellationReason);
286 286 return true;
287 287 }
288 288 return false;
289 289 }
290 290
291 291 public virtual void CancelOperation(Exception reason) {
292 292 SetCancelled(reason);
293 293 }
294 294
295 295 public void CancellationRequested(Action<Exception> handler) {
296 296 Safe.ArgumentNotNull(handler, "handler");
297 297 if (IsCancellationRequested)
298 298 handler(CancellationReason);
299 299
300 300 if (m_cancelationHandlers == null)
301 301 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
302 302
303 303 m_cancelationHandlers.Enqueue(handler);
304 304
305 305 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
306 306 // TryDeque implies MemoryBarrier()
307 307 handler(m_cancelationReason);
308 308 }
309 309
310 310 public bool IsCancellationRequested {
311 311 get {
312 312 do {
313 313 if (m_cancelRequest == CANCEL_NOT_REQUESTED)
314 314 return false;
315 315 if (m_cancelRequest == CANCEL_REQUESTED)
316 316 return true;
317 317 Thread.MemoryBarrier();
318 318 } while(true);
319 319 }
320 320 }
321 321
322 322 public Exception CancellationReason {
323 323 get {
324 324 do {
325 325 Thread.MemoryBarrier();
326 326 } while(m_cancelRequest == CANCEL_REQUESTING);
327 327
328 328 return m_cancelationReason;
329 329 }
330 330 }
331 331
332 332 #region ICancellable implementation
333 333
334 334 public void Cancel() {
335 335 Cancel(null);
336 336 }
337 337
338 338 public void Cancel(Exception reason) {
339 339 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
340 340 m_cancelationReason = reason;
341 341 m_cancelRequest = CANCEL_REQUESTED;
342 342 if (m_cancelationHandlers != null) {
343 343 Action<Exception> handler;
344 344 while (m_cancelationHandlers.TryDequeue(out handler))
345 345 handler(m_cancelationReason);
346 346 }
347 347 }
348 348 }
349 349
350 350 #endregion
351 351 }
352 352 }
353 353
General Comments 0
You need to be logged in to leave comments. Login now