##// END OF EJS Templates
working on cancelation and error handling
cin -
r186:75103928da09 ref20160224
parent child
Show More
@@ -1,305 +1,300
1 using System;
1 using System;
2 using Implab.Parallels;
2 using Implab.Parallels;
3 using System.Threading;
3 using System.Threading;
4 using System.Reflection;
4 using System.Reflection;
5
5
6 namespace Implab {
6 namespace Implab {
7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
8
8
9 const int UNRESOLVED_SATE = 0;
9 const int UNRESOLVED_SATE = 0;
10 const int TRANSITIONAL_STATE = 1;
10 const int TRANSITIONAL_STATE = 1;
11 protected const int SUCCEEDED_STATE = 2;
11 protected const int SUCCEEDED_STATE = 2;
12 protected const int REJECTED_STATE = 3;
12 protected const int REJECTED_STATE = 3;
13 protected const int CANCELLED_STATE = 4;
13 protected const int CANCELLED_STATE = 4;
14
14
15 const int CANCEL_NOT_REQUESTED = 0;
15 const int CANCEL_NOT_REQUESTED = 0;
16 const int CANCEL_REQUESTING = 1;
16 const int CANCEL_REQUESTING = 1;
17 const int CANCEL_REQUESTED = 2;
17 const int CANCEL_REQUESTED = 2;
18
18
19 const int RESERVED_HANDLERS_COUNT = 4;
19 const int RESERVED_HANDLERS_COUNT = 4;
20
20
21 int m_state;
21 int m_state;
22 Exception m_error;
22 Exception m_error;
23 int m_handlersCount;
23 int m_handlersCount;
24
24
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
26 THandler[] m_handlers;
26 THandler[] m_handlers;
27 MTQueue<THandler> m_extraHandlers;
27 MTQueue<THandler> m_extraHandlers;
28 int m_handlerPointer = -1;
28 int m_handlerPointer = -1;
29 int m_handlersCommited;
29 int m_handlersCommited;
30
30
31 int m_cancelRequest;
31 int m_cancelRequest;
32 Exception m_cancelationReason;
32 Exception m_cancelationReason;
33 MTQueue<Action<Exception>> m_cancelationHandlers;
33 MTQueue<Action<Exception>> m_cancelationHandlers;
34
34
35
35
36 #region state managment
36 #region state managment
37 bool BeginTransit() {
37 bool BeginTransit() {
38 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
38 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
39 }
39 }
40
40
41 void CompleteTransit(int state) {
41 void CompleteTransit(int state) {
42 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
42 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
43 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
43 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
44 }
44 }
45
45
46 void WaitTransition() {
46 void WaitTransition() {
47 while (m_state == TRANSITIONAL_STATE) {
47 while (m_state == TRANSITIONAL_STATE) {
48 Thread.MemoryBarrier();
48 Thread.MemoryBarrier();
49 }
49 }
50 }
50 }
51
51
52 protected bool BeginSetResult() {
52 protected bool BeginSetResult() {
53 if (!BeginTransit()) {
53 if (!BeginTransit()) {
54 WaitTransition();
54 WaitTransition();
55 if (m_state != CANCELLED_STATE)
55 if (m_state != CANCELLED_STATE)
56 throw new InvalidOperationException("The promise is already resolved");
56 throw new InvalidOperationException("The promise is already resolved");
57 return false;
57 return false;
58 }
58 }
59 return true;
59 return true;
60 }
60 }
61
61
62 protected void EndSetResult() {
62 protected void EndSetResult() {
63 CompleteTransit(SUCCEEDED_STATE);
63 CompleteTransit(SUCCEEDED_STATE);
64 Signal();
64 Signal();
65 }
65 }
66
66
67
67
68
68
69 /// <summary>
69 /// <summary>
70 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
70 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
71 /// </summary>
71 /// </summary>
72 /// <remarks>
72 /// <remarks>
73 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
73 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
74 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
74 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
75 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
75 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
76 /// </remarks>
76 /// </remarks>
77 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
77 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
78 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
78 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
79 protected void SetError(Exception error) {
79 protected void SetError(Exception error) {
80 while (error is PromiseTransientException)
81 error = error.InnerException;
82
83 var isCancel = error is OperationCanceledException;
84
85 if (BeginTransit()) {
80 if (BeginTransit()) {
86 m_error = isCancel ? error.InnerException : error;
81 m_error = error;
87 CompleteTransit(isCancel ? CANCELLED_STATE : REJECTED_STATE);
82 CompleteTransit(REJECTED_STATE);
88
83
89 Signal();
84 Signal();
90 } else {
85 } else {
91 WaitTransition();
86 WaitTransition();
92 if (!isCancel || m_state == SUCCEEDED_STATE)
87 if (m_state == SUCCEEDED_STATE)
93 throw new InvalidOperationException("The promise is already resolved");
88 throw new InvalidOperationException("The promise is already resolved");
94 }
89 }
95 }
90 }
96
91
97 /// <summary>
92 /// <summary>
98 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
93 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
99 /// </summary>
94 /// </summary>
100 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
95 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
101 protected void SetCancelled(Exception reason) {
96 protected void SetCancelled(Exception reason) {
102 if (BeginTransit()) {
97 if (BeginTransit()) {
103 m_error = reason;
98 m_error = reason;
104 CompleteTransit(CANCELLED_STATE);
99 CompleteTransit(CANCELLED_STATE);
105 Signal();
100 Signal();
106 }
101 }
107 }
102 }
108
103
109 protected abstract void SignalHandler(THandler handler, int signal);
104 protected abstract void SignalHandler(THandler handler, int signal);
110
105
111 void Signal() {
106 void Signal() {
112 var hp = m_handlerPointer;
107 var hp = m_handlerPointer;
113 var slot = hp +1 ;
108 var slot = hp +1 ;
114 while (slot < m_handlersCommited) {
109 while (slot < m_handlersCommited) {
115 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
110 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
116 SignalHandler(m_handlers[slot], m_state);
111 SignalHandler(m_handlers[slot], m_state);
117 }
112 }
118 hp = m_handlerPointer;
113 hp = m_handlerPointer;
119 slot = hp +1 ;
114 slot = hp +1 ;
120 }
115 }
121
116
122
117
123 if (m_extraHandlers != null) {
118 if (m_extraHandlers != null) {
124 THandler handler;
119 THandler handler;
125 while (m_extraHandlers.TryDequeue(out handler))
120 while (m_extraHandlers.TryDequeue(out handler))
126 SignalHandler(handler, m_state);
121 SignalHandler(handler, m_state);
127 }
122 }
128 }
123 }
129
124
130 #endregion
125 #endregion
131
126
132 protected abstract Signal GetResolveSignal();
127 protected abstract Signal GetResolveSignal();
133
128
134 #region synchronization traits
129 #region synchronization traits
135 protected void WaitResult(int timeout) {
130 protected void WaitResult(int timeout) {
136 if (!(IsResolved || GetResolveSignal().Wait(timeout)))
131 if (!(IsResolved || GetResolveSignal().Wait(timeout)))
137 throw new TimeoutException();
132 throw new TimeoutException();
138
133
139 switch (m_state) {
134 switch (m_state) {
140 case SUCCEEDED_STATE:
135 case SUCCEEDED_STATE:
141 return;
136 return;
142 case CANCELLED_STATE:
137 case CANCELLED_STATE:
143 throw new OperationCanceledException();
138 throw new OperationCanceledException("The operation has been cancelled", m_error);
144 case REJECTED_STATE:
139 case REJECTED_STATE:
145 throw new TargetInvocationException(m_error);
140 throw new TargetInvocationException(m_error);
146 default:
141 default:
147 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
142 throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state));
148 }
143 }
149 }
144 }
150 #endregion
145 #endregion
151
146
152 #region handlers managment
147 #region handlers managment
153
148
154 protected void AddHandler(THandler handler) {
149 protected void AddHandler(THandler handler) {
155
150
156 if (m_state > 1) {
151 if (m_state > 1) {
157 // the promise is in the resolved state, just invoke the handler
152 // the promise is in the resolved state, just invoke the handler
158 SignalHandler(handler, m_state);
153 SignalHandler(handler, m_state);
159 } else {
154 } else {
160 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
155 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
161
156
162 if (slot < RESERVED_HANDLERS_COUNT) {
157 if (slot < RESERVED_HANDLERS_COUNT) {
163
158
164 if (slot == 0) {
159 if (slot == 0) {
165 m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
160 m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
166 } else {
161 } else {
167 while (m_handlers == null)
162 while (m_handlers == null)
168 Thread.MemoryBarrier();
163 Thread.MemoryBarrier();
169 }
164 }
170
165
171 m_handlers[slot] = handler;
166 m_handlers[slot] = handler;
172
167
173 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
168 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
174 }
169 }
175
170
176 if (m_state > 1) {
171 if (m_state > 1) {
177 do {
172 do {
178 var hp = m_handlerPointer;
173 var hp = m_handlerPointer;
179 slot = hp + 1;
174 slot = hp + 1;
180 if (slot < m_handlersCommited) {
175 if (slot < m_handlersCommited) {
181 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
176 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
182 continue;
177 continue;
183 SignalHandler(m_handlers[slot], m_state);
178 SignalHandler(m_handlers[slot], m_state);
184 }
179 }
185 break;
180 break;
186 } while(true);
181 } while(true);
187 }
182 }
188 } else {
183 } else {
189 if (slot == RESERVED_HANDLERS_COUNT) {
184 if (slot == RESERVED_HANDLERS_COUNT) {
190 m_extraHandlers = new MTQueue<THandler>();
185 m_extraHandlers = new MTQueue<THandler>();
191 } else {
186 } else {
192 while (m_extraHandlers == null)
187 while (m_extraHandlers == null)
193 Thread.MemoryBarrier();
188 Thread.MemoryBarrier();
194 }
189 }
195
190
196 m_extraHandlers.Enqueue(handler);
191 m_extraHandlers.Enqueue(handler);
197
192
198 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
193 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
199 // if the promise have been resolved while we was adding the handler to the queue
194 // if the promise have been resolved while we was adding the handler to the queue
200 // we can't guarantee that someone is still processing it
195 // we can't guarantee that someone is still processing it
201 // therefore we need to fetch a handler from the queue and execute it
196 // therefore we need to fetch a handler from the queue and execute it
202 // note that fetched handler may be not the one that we have added
197 // note that fetched handler may be not the one that we have added
203 // even we can fetch no handlers at all :)
198 // even we can fetch no handlers at all :)
204 SignalHandler(handler, m_state);
199 SignalHandler(handler, m_state);
205 }
200 }
206 }
201 }
207 }
202 }
208
203
209 #endregion
204 #endregion
210
205
211 #region IPromise implementation
206 #region IPromise implementation
212
207
213 public bool IsResolved {
208 public bool IsResolved {
214 get {
209 get {
215 Thread.MemoryBarrier();
210 Thread.MemoryBarrier();
216 return m_state > 1;
211 return m_state > 1;
217 }
212 }
218 }
213 }
219
214
220 public bool IsCancelled {
215 public bool IsCancelled {
221 get {
216 get {
222 Thread.MemoryBarrier();
217 Thread.MemoryBarrier();
223 return m_state == CANCELLED_STATE;
218 return m_state == CANCELLED_STATE;
224 }
219 }
225 }
220 }
226
221
227 #endregion
222 #endregion
228
223
229 public Exception Error {
224 public Exception Error {
230 get {
225 get {
231 return m_error;
226 return m_error;
232 }
227 }
233 }
228 }
234
229
235 public bool CancelOperationIfRequested() {
230 public bool CancelOperationIfRequested() {
236 if (IsCancellationRequested) {
231 if (IsCancellationRequested) {
237 CancelOperation(CancellationReason);
232 CancelOperation(CancellationReason);
238 return true;
233 return true;
239 }
234 }
240 return false;
235 return false;
241 }
236 }
242
237
243 public virtual void CancelOperation(Exception reason) {
238 public virtual void CancelOperation(Exception reason) {
244 SetCancelled(reason);
239 SetCancelled(reason);
245 }
240 }
246
241
247 public void CancellationRequested(Action<Exception> handler) {
242 public void CancellationRequested(Action<Exception> handler) {
248 Safe.ArgumentNotNull(handler, "handler");
243 Safe.ArgumentNotNull(handler, "handler");
249 if (IsCancellationRequested)
244 if (IsCancellationRequested)
250 handler(CancellationReason);
245 handler(CancellationReason);
251
246
252 if (m_cancelationHandlers == null)
247 if (m_cancelationHandlers == null)
253 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
248 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
254
249
255 m_cancelationHandlers.Enqueue(handler);
250 m_cancelationHandlers.Enqueue(handler);
256
251
257 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
252 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
258 // TryDeque implies MemoryBarrier()
253 // TryDeque implies MemoryBarrier()
259 handler(m_cancelationReason);
254 handler(m_cancelationReason);
260 }
255 }
261
256
262 public bool IsCancellationRequested {
257 public bool IsCancellationRequested {
263 get {
258 get {
264 do {
259 do {
265 if (m_cancelRequest == CANCEL_NOT_REQUESTED)
260 if (m_cancelRequest == CANCEL_NOT_REQUESTED)
266 return false;
261 return false;
267 if (m_cancelRequest == CANCEL_REQUESTED)
262 if (m_cancelRequest == CANCEL_REQUESTED)
268 return true;
263 return true;
269 Thread.MemoryBarrier();
264 Thread.MemoryBarrier();
270 } while(true);
265 } while(true);
271 }
266 }
272 }
267 }
273
268
274 public Exception CancellationReason {
269 public Exception CancellationReason {
275 get {
270 get {
276 do {
271 do {
277 Thread.MemoryBarrier();
272 Thread.MemoryBarrier();
278 } while(m_cancelRequest == CANCEL_REQUESTING);
273 } while(m_cancelRequest == CANCEL_REQUESTING);
279
274
280 return m_cancelationReason;
275 return m_cancelationReason;
281 }
276 }
282 }
277 }
283
278
284 #region ICancellable implementation
279 #region ICancellable implementation
285
280
286 public void Cancel() {
281 public void Cancel() {
287 Cancel(null);
282 Cancel(null);
288 }
283 }
289
284
290 public void Cancel(Exception reason) {
285 public void Cancel(Exception reason) {
291 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
286 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
292 m_cancelationReason = reason;
287 m_cancelationReason = reason;
293 m_cancelRequest = CANCEL_REQUESTED;
288 m_cancelRequest = CANCEL_REQUESTED;
294 if (m_cancelationHandlers != null) {
289 if (m_cancelationHandlers != null) {
295 Action<Exception> handler;
290 Action<Exception> handler;
296 while (m_cancelationHandlers.TryDequeue(out handler))
291 while (m_cancelationHandlers.TryDequeue(out handler))
297 handler(m_cancelationReason);
292 handler(m_cancelationReason);
298 }
293 }
299 }
294 }
300 }
295 }
301
296
302 #endregion
297 #endregion
303 }
298 }
304 }
299 }
305
300
@@ -1,142 +1,142
1 using System;
1 using System;
2 using Implab.Parallels;
2 using Implab.Parallels;
3
3
4 namespace Implab {
4 namespace Implab {
5 public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise {
5 public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise {
6 public struct HandlerDescriptor {
6 public struct HandlerDescriptor {
7 readonly Action m_handler;
7 readonly Action m_handler;
8 readonly Action<Exception> m_error;
8 readonly Action<Exception> m_error;
9 readonly Action<Exception> m_cancel;
9 readonly Action<Exception> m_cancel;
10 readonly PromiseEventType m_mask;
10 readonly PromiseEventType m_mask;
11
11
12 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
12 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
13 m_handler = success;
13 m_handler = success;
14 m_error = error;
14 m_error = error;
15 m_cancel = cancel;
15 m_cancel = cancel;
16 m_mask = PromiseEventType.Success;
16 m_mask = PromiseEventType.Success;
17 }
17 }
18
18
19 public HandlerDescriptor(Action handler, PromiseEventType mask) {
19 public HandlerDescriptor(Action handler, PromiseEventType mask) {
20 m_handler = handler;
20 m_handler = handler;
21 m_error = null;
21 m_error = null;
22 m_cancel = null;
22 m_cancel = null;
23 m_mask = mask;
23 m_mask = mask;
24 }
24 }
25
25
26 public void SignalSuccess() {
26 public void SignalSuccess() {
27 if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
27 if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) {
28 try {
28 try {
29 m_handler();
29 m_handler();
30 } catch (Exception err) {
30 } catch (Exception err) {
31 // avoid calling handler twice in case of error
31 // avoid calling handler twice in case of error
32 if (m_error != null)
32 if (m_error != null)
33 SignalError(err);
33 SignalError(err);
34 }
34 }
35 }
35 }
36 }
36 }
37
37
38 public void SignalError(Exception err) {
38 public void SignalError(Exception err) {
39 if (m_error != null) {
39 if (m_error != null) {
40 try {
40 try {
41 m_error(err);
41 m_error(err);
42 // Analysis disable once EmptyGeneralCatchClause
42 // Analysis disable once EmptyGeneralCatchClause
43 } catch {
43 } catch {
44 }
44 }
45 } else if ((m_mask & PromiseEventType.Error ) != 0 && m_handler != null) {
45 } else if ((m_mask & PromiseEventType.Error ) != 0 && m_handler != null) {
46 try {
46 try {
47 m_handler();
47 m_handler();
48 // Analysis disable once EmptyGeneralCatchClause
48 // Analysis disable once EmptyGeneralCatchClause
49 } catch {
49 } catch {
50 }
50 }
51 }
51 }
52 }
52 }
53
53
54 public void SignalCancel(Exception reason) {
54 public void SignalCancel(Exception reason) {
55 if (m_cancel != null) {
55 if (m_cancel != null) {
56 try {
56 try {
57 m_cancel(reason);
57 m_cancel(reason);
58 } catch (Exception err) {
58 } catch (Exception err) {
59 SignalError(err);
59 SignalError(err);
60 }
60 }
61 } else if ( (m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
61 } else if ( (m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) {
62 try {
62 try {
63 m_handler();
63 m_handler();
64 // Analysis disable once EmptyGeneralCatchClause
64 // Analysis disable once EmptyGeneralCatchClause
65 } catch {
65 } catch {
66 }
66 }
67 }
67 }
68 }
68 }
69 }
69 }
70
70
71
71
72 #region implemented abstract members of AbstractPromise
72 #region implemented abstract members of AbstractPromise
73
73
74 protected override void SignalHandler(HandlerDescriptor handler, int signal) {
74 protected override void SignalHandler(HandlerDescriptor handler, int signal) {
75 switch (signal) {
75 switch (signal) {
76 case SUCCEEDED_STATE:
76 case SUCCEEDED_STATE:
77 handler.SignalSuccess();
77 handler.SignalSuccess();
78 break;
78 break;
79 case REJECTED_STATE:
79 case REJECTED_STATE:
80 handler.SignalError(Error);
80 handler.SignalError(Error);
81 break;
81 break;
82 case CANCELLED_STATE:
82 case CANCELLED_STATE:
83 handler.SignalCancel(CancellationReason);
83 handler.SignalCancel(CancellationReason);
84 break;
84 break;
85 default:
85 default:
86 throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal));
86 throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal));
87 }
87 }
88 }
88 }
89
89
90 protected override Signal GetResolveSignal() {
90 protected override Signal GetResolveSignal() {
91 var signal = new Signal();
91 var signal = new Signal();
92 On(signal.Set, PromiseEventType.All);
92 On(signal.Set, PromiseEventType.All);
93 return signal;
93 return signal;
94 }
94 }
95
95
96 #endregion
96 #endregion
97
97
98 public Type PromiseType {
98 public Type PromiseType {
99 get {
99 get {
100 return typeof(void);
100 return typeof(void);
101 }
101 }
102 }
102 }
103
103
104 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
104 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
105 AddHandler(new HandlerDescriptor(success, error, cancel));
105 AddHandler(new HandlerDescriptor(success, error, cancel));
106 return this;
106 return this;
107 }
107 }
108
108
109 public IPromise On(Action success, Action<Exception> error) {
109 public IPromise On(Action success, Action<Exception> error) {
110 AddHandler(new HandlerDescriptor(success, error, null));
110 AddHandler(new HandlerDescriptor(success, error, null));
111 return this;
111 return this;
112 }
112 }
113
113
114 public IPromise On(Action success) {
114 public IPromise On(Action success) {
115 AddHandler(new HandlerDescriptor(success, null, null));
115 AddHandler(new HandlerDescriptor(success, null, null));
116 return this;
116 return this;
117 }
117 }
118
118
119 public IPromise On(Action handler, PromiseEventType events) {
119 public IPromise On(Action handler, PromiseEventType events) {
120 AddHandler(new HandlerDescriptor(handler,events));
120 AddHandler(new HandlerDescriptor(handler,events));
121 return this;
121 return this;
122 }
122 }
123
123
124 public IPromise<T> Cast<T>() {
124 public IPromise<T> Cast<T>() {
125 throw new InvalidCastException();
125 throw new InvalidCastException();
126 }
126 }
127
127
128 public void Join() {
128 public void Join() {
129 WaitResult(-1);
129 WaitResult(-1);
130 }
130 }
131
131
132 public void Join(int timeout) {
132 public void Join(int timeout) {
133 WaitResult(timeout);
133 WaitResult(timeout);
134 }
134 }
135
135
136 protected void SetResult() {
136 protected void SetResult() {
137 BeginSetResult();
137 if(BeginSetResult())
138 EndSetResult();
138 EndSetResult();
139 }
139 }
140 }
140 }
141 }
141 }
142
142
@@ -1,58 +1,75
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3
3
4 namespace Implab {
4 namespace Implab {
5 public class ActionChainTaskBase : AbstractPromise {
5 public class ActionChainTaskBase : AbstractPromise {
6 readonly Func<Exception, IPromise> m_error;
6 readonly Func<Exception, IPromise> m_error;
7 readonly Func<Exception, IPromise> m_cancel;
7 readonly Func<Exception, IPromise> m_cancel;
8
8
9 int m_cancelationLock;
9 int m_cancelationLock;
10
10
11 protected ActionChainTaskBase(Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) {
11 protected ActionChainTaskBase(Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) {
12 m_error = error;
12 m_error = error;
13 m_cancel = cancel;
13 m_cancel = cancel;
14 if (autoCancellable)
14 if (autoCancellable)
15 CancellationRequested(CancelOperation);
15 CancellationRequested(CancelOperation);
16 }
16 }
17
17
18 public void Reject(Exception error) {
18 public void Reject(Exception error) {
19 if (LockCancelation())
19 if (LockCancelation())
20 HandleErrorInternal(error);
20 HandleErrorInternal(error);
21 }
21 }
22
22
23
24
25 public override void CancelOperation(Exception reason) {
23 public override void CancelOperation(Exception reason) {
26 if (LockCancelation()) {
24 if (LockCancelation()) {
25 if (!(reason is OperationCanceledException))
26 reason = reason != null ? new OperationCanceledException(null, reason) : new OperationCanceledException();
27
27 if (m_cancel != null) {
28 if (m_cancel != null) {
28 try {
29 try {
29 m_cancel(reason).On(SetResult, SetError, SetCancelled);
30 m_cancel(reason).On(SetResult, HandleErrorInternal, HandleCancelInternal);
30 } catch (Exception err) {
31 } catch (Exception err) {
31 HandleErrorInternal(err);
32 HandleErrorInternal(err);
32 }
33 }
33 } else {
34 } else {
34 SetCancelled(reason);
35 HandleErrorInternal(reason);
35 }
36 }
36 }
37 }
37 }
38 }
38
39
39 protected void HandleErrorInternal(Exception error) {
40 void HandleCancelInternal(Exception reason) {
41 if (!(reason is OperationCanceledException))
42 reason = reason != null ? new OperationCanceledException(null, reason) : new OperationCanceledException();
43 HandleErrorInternal(reason);
44 }
45
46 void HandleErrorInternal(Exception error) {
40 if (m_error != null) {
47 if (m_error != null) {
41 try {
48 try {
42 var p = m_error(error);
49 var p = m_error(error);
43 p.On(SetResult,SetError,SetCancelled);
50 p.On(SetResult, SetError, SetCancelled);
44 CancellationRequested(p.Cancel);
51 CancellationRequested(p.Cancel);
45 } catch (Exception err) {
52 } catch (Exception err) {
46 SetError(err);
53 error = err;
47 }
54 }
48 } else {
55 } else {
56 SetErrorInternal(error);
57 }
58 }
59
60 void SetErrorInternal(Exception error) {
61 while (error is PromiseTransientException)
62 error = error.InnerException;
63
64 if (error is OperationCanceledException)
65 SetCancelled(error);
66 else
49 SetError(error);
67 SetError(error);
50 }
51 }
68 }
52
69
53 protected bool LockCancelation() {
70 protected bool LockCancelation() {
54 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
71 return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
55 }
72 }
56 }
73 }
57 }
74 }
58
75
@@ -1,262 +1,255
1 using System;
1 using System;
2
2
3 namespace Implab.Components {
3 namespace Implab.Components {
4 public abstract class RunnableComponent : IDisposable, IRunnable, IInitializable {
4 public abstract class RunnableComponent : IDisposable, IRunnable, IInitializable {
5 enum Commands {
5 enum Commands {
6 Ok = 0,
6 Ok = 0,
7 Fail,
7 Fail,
8 Init,
8 Init,
9 Start,
9 Start,
10 Stop,
10 Stop,
11 Dispose,
11 Dispose,
12 Last = Dispose
12 Last = Dispose
13 }
13 }
14
14
15 class StateMachine {
15 class StateMachine {
16 static readonly ExecutionState[,] _transitions;
16 static readonly ExecutionState[,] _transitions;
17
17
18 static StateMachine() {
18 static StateMachine() {
19 _transitions = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1];
19 _transitions = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1];
20
20
21 Edge(ExecutionState.Created, ExecutionState.Initializing, Commands.Init);
21 Edge(ExecutionState.Created, ExecutionState.Initializing, Commands.Init);
22 Edge(ExecutionState.Created, ExecutionState.Disposed, Commands.Dispose);
22 Edge(ExecutionState.Created, ExecutionState.Disposed, Commands.Dispose);
23
23
24 Edge(ExecutionState.Initializing, ExecutionState.Ready, Commands.Ok);
24 Edge(ExecutionState.Initializing, ExecutionState.Ready, Commands.Ok);
25 Edge(ExecutionState.Initializing, ExecutionState.Failed, Commands.Fail);
25 Edge(ExecutionState.Initializing, ExecutionState.Failed, Commands.Fail);
26
26
27 Edge(ExecutionState.Ready, ExecutionState.Starting, Commands.Start);
27 Edge(ExecutionState.Ready, ExecutionState.Starting, Commands.Start);
28 Edge(ExecutionState.Ready, ExecutionState.Disposed, Commands.Dispose);
28 Edge(ExecutionState.Ready, ExecutionState.Disposed, Commands.Dispose);
29
29
30 Edge(ExecutionState.Starting, ExecutionState.Running, Commands.Ok);
30 Edge(ExecutionState.Starting, ExecutionState.Running, Commands.Ok);
31 Edge(ExecutionState.Starting, ExecutionState.Failed, Commands.Fail);
31 Edge(ExecutionState.Starting, ExecutionState.Failed, Commands.Fail);
32 Edge(ExecutionState.Starting, ExecutionState.Stopping, Commands.Stop);
32 Edge(ExecutionState.Starting, ExecutionState.Stopping, Commands.Stop);
33 Edge(ExecutionState.Starting, ExecutionState.Disposed, Commands.Dispose);
33 Edge(ExecutionState.Starting, ExecutionState.Disposed, Commands.Dispose);
34
34
35 Edge(ExecutionState.Running, ExecutionState.Failed, Commands.Fail);
35 Edge(ExecutionState.Running, ExecutionState.Failed, Commands.Fail);
36 Edge(ExecutionState.Running, ExecutionState.Stopping, Commands.Stop);
36 Edge(ExecutionState.Running, ExecutionState.Stopping, Commands.Stop);
37 Edge(ExecutionState.Running, ExecutionState.Disposed, Commands.Dispose);
37 Edge(ExecutionState.Running, ExecutionState.Disposed, Commands.Dispose);
38
38
39 Edge(ExecutionState.Stopping, ExecutionState.Failed, Commands.Fail);
39 Edge(ExecutionState.Stopping, ExecutionState.Failed, Commands.Fail);
40 Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Ok);
40 Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Ok);
41
41
42 Edge(ExecutionState.Failed, ExecutionState.Disposed, Commands.Dispose);
42 Edge(ExecutionState.Failed, ExecutionState.Disposed, Commands.Dispose);
43 }
43 }
44
44
45 static void Edge(ExecutionState s1, ExecutionState s2, Commands cmd) {
45 static void Edge(ExecutionState s1, ExecutionState s2, Commands cmd) {
46 _transitions[(int)s1, (int)cmd] = s2;
46 _transitions[(int)s1, (int)cmd] = s2;
47 }
47 }
48
48
49 public ExecutionState State {
49 public ExecutionState State {
50 get;
50 get;
51 private set;
51 private set;
52 }
52 }
53
53
54 public StateMachine(ExecutionState initial) {
54 public StateMachine(ExecutionState initial) {
55 State = initial;
55 State = initial;
56 }
56 }
57
57
58 public bool Move(Commands cmd) {
58 public bool Move(Commands cmd) {
59 var next = _transitions[(int)State, (int)cmd];
59 var next = _transitions[(int)State, (int)cmd];
60 if (next == ExecutionState.Undefined)
60 if (next == ExecutionState.Undefined)
61 return false;
61 return false;
62 State = next;
62 State = next;
63 return true;
63 return true;
64 }
64 }
65 }
65 }
66
66
67 IPromise m_pending;
67 IPromise m_pending;
68 Exception m_lastError;
68 Exception m_lastError;
69
69
70 readonly StateMachine m_stateMachine;
70 readonly StateMachine m_stateMachine;
71
71
72 protected RunnableComponent(bool initialized) {
72 protected RunnableComponent(bool initialized) {
73 m_stateMachine = new StateMachine(initialized ? ExecutionState.Ready : ExecutionState.Created);
73 m_stateMachine = new StateMachine(initialized ? ExecutionState.Ready : ExecutionState.Created);
74 }
74 }
75
75
76 protected virtual int DisposeTimeout {
76 protected virtual int DisposeTimeout {
77 get {
77 get {
78 return 10000;
78 return 10000;
79 }
79 }
80 }
80 }
81
81
82 void ThrowInvalidCommand(Commands cmd) {
82 void ThrowInvalidCommand(Commands cmd) {
83 if (m_stateMachine.State == ExecutionState.Disposed)
83 if (m_stateMachine.State == ExecutionState.Disposed)
84 throw new ObjectDisposedException(ToString());
84 throw new ObjectDisposedException(ToString());
85
85
86 throw new InvalidOperationException(String.Format("Commnd {0} is not allowed in the state {1}", cmd, m_stateMachine.State));
86 throw new InvalidOperationException(String.Format("Commnd {0} is not allowed in the state {1}", cmd, m_stateMachine.State));
87 }
87 }
88
88
89 void Move(Commands cmd) {
89 void Move(Commands cmd) {
90 if (!m_stateMachine.Move(cmd))
90 if (!m_stateMachine.Move(cmd))
91 ThrowInvalidCommand(cmd);
91 ThrowInvalidCommand(cmd);
92 }
92 }
93
93
94 void Invoke(Commands cmd, Action action) {
94 void Invoke(Commands cmd, Action action) {
95 lock (m_stateMachine)
95 lock (m_stateMachine)
96 Move(cmd);
96 Move(cmd);
97
97
98 try {
98 try {
99 action();
99 action();
100 lock(m_stateMachine)
100 lock(m_stateMachine)
101 Move(Commands.Ok);
101 Move(Commands.Ok);
102
102
103 } catch (Exception err) {
103 } catch (Exception err) {
104 lock (m_stateMachine) {
104 lock (m_stateMachine) {
105 Move(Commands.Fail);
105 Move(Commands.Fail);
106 m_lastError = err;
106 m_lastError = err;
107 }
107 }
108 throw;
108 throw;
109 }
109 }
110 }
110 }
111
111
112 IPromise InvokeAsync(Commands cmd, Func<IPromise> action, Action<IPromise, IDeferred> chain) {
112 IPromise InvokeAsync(Commands cmd, Func<IPromise> action, Action<IPromise, IDeferred> chain) {
113 IPromise promise = null;
113 IPromise promise = null;
114 IPromise prev;
114 IPromise prev;
115
115
116 var task = new ActionChainTask(action, null, null, true);
116 var task = new ActionChainTask(action, null, null, true);
117
117
118 lock (m_stateMachine) {
118 lock (m_stateMachine) {
119 Move(cmd);
119 Move(cmd);
120
120
121 prev = m_pending;
121 prev = m_pending;
122
122
123 promise = task.Then(
123 promise = task.Then(
124 () => {
124 () => {
125 lock(m_stateMachine) {
125 lock(m_stateMachine) {
126 if (m_pending == promise) {
126 if (m_pending == promise) {
127 Move(Commands.Ok);
127 Move(Commands.Ok);
128 m_pending = null;
128 m_pending = null;
129 }
129 }
130 }
130 }
131 }, e => {
131 }, e => {
132 lock(m_stateMachine) {
132 lock(m_stateMachine) {
133 if (m_pending == promise) {
133 if (m_pending == promise) {
134 Move(Commands.Fail);
134 Move(Commands.Fail);
135 m_pending = null;
135 m_pending = null;
136 m_lastError = e;
136 m_lastError = e;
137 }
137 }
138 }
138 }
139 throw new PromiseTransientException(e);
139 throw new PromiseTransientException(e);
140 },
140 },
141 r => {
141 r => {
142 lock(m_stateMachine) {
142 // handle cancellation as exception
143 if (m_pending == promise) {
144 Move(Commands.Fail);
145 m_pending = null;
146 m_lastError = new OperationCanceledException("The operation has been cancelled", r);
147 }
148
149 }
150 throw new OperationCanceledException("The operation has been cancelled", r);
143 throw new OperationCanceledException("The operation has been cancelled", r);
151 }
144 }
152 );
145 );
153
146
154 m_pending = promise;
147 m_pending = promise;
155 }
148 }
156
149
157 if (prev == null)
150 if (prev == null)
158 task.Resolve();
151 task.Resolve();
159 else
152 else
160 chain(prev, task);
153 chain(prev, task);
161
154
162 return promise;
155 return promise;
163 }
156 }
164
157
165
158
166 #region IInitializable implementation
159 #region IInitializable implementation
167
160
168 public void Init() {
161 public void Init() {
169 Invoke(Commands.Init, OnInitialize);
162 Invoke(Commands.Init, OnInitialize);
170 }
163 }
171
164
172 protected virtual void OnInitialize() {
165 protected virtual void OnInitialize() {
173 }
166 }
174
167
175 #endregion
168 #endregion
176
169
177 #region IRunnable implementation
170 #region IRunnable implementation
178
171
179 public IPromise Start() {
172 public IPromise Start() {
180 return InvokeAsync(Commands.Start, OnStart, null);
173 return InvokeAsync(Commands.Start, OnStart, null);
181 }
174 }
182
175
183 protected virtual IPromise OnStart() {
176 protected virtual IPromise OnStart() {
184 return Promise.SUCCESS;
177 return Promise.SUCCESS;
185 }
178 }
186
179
187 public IPromise Stop() {
180 public IPromise Stop() {
188 return InvokeAsync(Commands.Stop, OnStop, StopPending).Then(Dispose);
181 return InvokeAsync(Commands.Stop, OnStop, StopPending).Then(Dispose);
189 }
182 }
190
183
191 protected virtual IPromise OnStop() {
184 protected virtual IPromise OnStop() {
192 return Promise.SUCCESS;
185 return Promise.SUCCESS;
193 }
186 }
194
187
195 /// <summary>
188 /// <summary>
196 /// Stops the current operation if one exists.
189 /// Stops the current operation if one exists.
197 /// </summary>
190 /// </summary>
198 /// <param name="current">Current.</param>
191 /// <param name="current">Current.</param>
199 /// <param name="stop">Stop.</param>
192 /// <param name="stop">Stop.</param>
200 protected virtual void StopPending(IPromise current, IDeferred stop) {
193 protected virtual void StopPending(IPromise current, IDeferred stop) {
201 if (current == null) {
194 if (current == null) {
202 stop.Resolve();
195 stop.Resolve();
203 } else {
196 } else {
204 current.On(stop.Resolve, stop.Reject, stop.CancelOperation);
197 current.On(stop.Resolve, stop.Reject, e => stop.Resolve());
205 current.Cancel();
198 current.Cancel();
206 }
199 }
207 }
200 }
208
201
209 public ExecutionState State {
202 public ExecutionState State {
210 get {
203 get {
211 return m_stateMachine.State;
204 return m_stateMachine.State;
212 }
205 }
213 }
206 }
214
207
215 public Exception LastError {
208 public Exception LastError {
216 get {
209 get {
217 return m_lastError;
210 return m_lastError;
218 }
211 }
219 }
212 }
220
213
221 #endregion
214 #endregion
222
215
223 #region IDisposable implementation
216 #region IDisposable implementation
224
217
225 public void Dispose() {
218 public void Dispose() {
226 IPromise pending;
219 IPromise pending;
227 lock (m_stateMachine) {
220 lock (m_stateMachine) {
228 if (m_stateMachine.State == ExecutionState.Disposed)
221 if (m_stateMachine.State == ExecutionState.Disposed)
229 return;
222 return;
230
223
231 Move(Commands.Dispose);
224 Move(Commands.Dispose);
232
225
233 GC.SuppressFinalize(this);
226 GC.SuppressFinalize(this);
234
227
235 pending = m_pending;
228 pending = m_pending;
236 m_pending = null;
229 m_pending = null;
237 }
230 }
238 if (pending != null) {
231 if (pending != null) {
239 pending.Cancel();
232 pending.Cancel();
240 pending.Timeout(DisposeTimeout).On(
233 pending.Timeout(DisposeTimeout).On(
241 () => Dispose(true, null),
234 () => Dispose(true, null),
242 err => Dispose(true, err),
235 err => Dispose(true, err),
243 reason => Dispose(true, new OperationCanceledException("The operation is cancelled", reason))
236 reason => Dispose(true, new OperationCanceledException("The operation is cancelled", reason))
244 );
237 );
245 } else {
238 } else {
246 Dispose(true, m_lastError);
239 Dispose(true, m_lastError);
247 }
240 }
248 }
241 }
249
242
250 ~RunnableComponent() {
243 ~RunnableComponent() {
251 Dispose(false, null);
244 Dispose(false, null);
252 }
245 }
253
246
254 #endregion
247 #endregion
255
248
256 protected virtual void Dispose(bool disposing, Exception lastError) {
249 protected virtual void Dispose(bool disposing, Exception lastError) {
257
250
258 }
251 }
259
252
260 }
253 }
261 }
254 }
262
255
General Comments 0
You need to be logged in to leave comments. Login now