##// END OF EJS Templates
working on runnable components
cin -
r257:440801d88019 v3
parent child
Show More
@@ -1,290 +1,331
1 using System;
1 using System;
2 using System.Diagnostics;
2 using System.Diagnostics;
3 using System.Threading;
3 using System.Threading;
4 using System.Threading.Tasks;
4 using System.Threading.Tasks;
5
5
6 namespace Implab.Components {
6 namespace Implab.Components {
7 /// <summary>
7 /// <summary>
8 /// Base class for implementing components which support start and stop operations,
8 /// Base class for implementing components which support start and stop operations,
9 /// such components may represent running services.
9 /// such components may represent running services.
10 /// </summary>
10 /// </summary>
11 /// <remarks>
11 /// <remarks>
12 /// This class provides a basic lifecycle from the creation to the
12 /// This class provides a basic lifecycle from the creation to the
13 /// termination of the component.
13 /// termination of the component.
14 /// </remarks>
14 /// </remarks>
15 public class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {
15 public abstract class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {
16
16
17 /// <summary>
17 /// <summary>
18 /// This class bounds <see cref="CancellationTokenSource"/> lifetime to the task,
18 /// This class bounds <see cref="CancellationTokenSource"/> lifetime to the task,
19 /// when the task completes the associated token source will be disposed.
19 /// when the task completes the associated token source will be disposed.
20 /// </summary>
20 /// </summary>
21 class AsyncOperationDescriptor {
21 class AsyncOperationDescriptor {
22
22
23 public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();
23 public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();
24
24
25 readonly CancellationTokenSource m_cts;
25 readonly CancellationTokenSource m_cts;
26
26
27 bool m_done;
27 bool m_done;
28
28
29 public CancellationToken Token {
29 public CancellationToken Token {
30 get { return m_cts == null ? CancellationToken.None : m_cts.Token; }
30 get { return m_cts == null ? CancellationToken.None : m_cts.Token; }
31 }
31 }
32
32
33 public Task Task { get; private set; }
33 public Task Task { get; private set; }
34
34
35 private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
35 private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
36 m_cts = cts;
36 m_cts = cts;
37 Task = Chain(task);
37 Task = Chain(task);
38 }
38 }
39
39
40 private AsyncOperationDescriptor() {
40 private AsyncOperationDescriptor() {
41 Task = Task.CompletedTask;
41 Task = Task.CompletedTask;
42 }
42 }
43
43
44 public void Cancel() {
44 public void Cancel() {
45 if (m_cts != null) {
45 if (m_cts != null) {
46 lock (m_cts) {
46 lock (m_cts) {
47 if (!m_done)
47 if (!m_done)
48 m_cts.Cancel();
48 m_cts.Cancel();
49 }
49 }
50 }
50 }
51 }
51 }
52
52
53 void Done() {
53 void Done() {
54 if (m_cts != null) {
54 if (m_cts != null) {
55 lock (m_cts) {
55 lock (m_cts) {
56 m_done = true;
56 m_done = true;
57 m_cts.Dispose();
57 m_cts.Dispose();
58 }
58 }
59 } else {
59 } else {
60 m_done = true;
60 m_done = true;
61 }
61 }
62 }
62 }
63
63
64 async Task Chain(Task other) {
64 async Task Chain(Task other) {
65 try {
65 try {
66 await other;
66 await other;
67 } finally {
67 } finally {
68 Done();
68 Done();
69 }
69 }
70 }
70 }
71
71
72 public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
72 public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
73 var cts = ct.CanBeCanceled ?
73 var cts = ct.CanBeCanceled ?
74 CancellationTokenSource.CreateLinkedTokenSource(ct) :
74 CancellationTokenSource.CreateLinkedTokenSource(ct) :
75 new CancellationTokenSource();
75 new CancellationTokenSource();
76
76
77 return new AsyncOperationDescriptor(factory(cts.Token), cts);
77 return new AsyncOperationDescriptor(factory(cts.Token), cts);
78 }
78 }
79
79
80 }
80 }
81
81
82 // this lock is used to synchronize state flow of the component during
82 // this lock is used to synchronize state flow of the component during
83 // processing calls from a client and internal processes.
83 // processing calls from a client and internal processes.
84 readonly object m_lock = new object();
84 readonly object m_lock = new object();
85
85
86 // current operation cookie, used to check wheather a call to
86 // current operation cookie, used to check wheather a call to
87 // MoveSuccess/MoveFailed method belongs to the current
87 // MoveSuccess/MoveFailed method belongs to the current
88 // operation, if cookies didn't match ignore completion result.
88 // operation, if cookies didn't match ignore completion result.
89 object m_cookie;
89 object m_cookie;
90
90
91 // AsyncOperationDscriptor aggregates a task and it's cancellation token
91 // AsyncOperationDscriptor aggregates a task and it's cancellation token
92 AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
92 AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
93
93
94 ExecutionState m_state;
94 ExecutionState m_state;
95
95
96 /// <summary>
97 /// ΠžΠ±ΡŠΠ΅ΠΊΡ‚ синхронизации ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ для обСспСчСния совмСстного доступа
98 /// ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π° ΠΊΠΎΠΌΠΏΠΎΠ½Π΅Π½Ρ‚Ρ‹ ΠΈ процСссов, ΠΏΡ€ΠΎΡ‚Π΅ΠΊΠ°ΡŽΡ‰ΠΈΡ… Π²Π½ΡƒΡ‚Ρ€ΠΈ ΠΊΠΎΠΌΠΏΠΎΠ½Π΅Π½Ρ‚Ρ‹, ΠΊ ΠΎΠ±Ρ‰Π΅ΠΌΡƒ
99 /// ΡΠΎΡΡ‚ΠΎΡΠ½ΠΈΡŽ, Ρ‚.Π΅.true Ρ‚Π°ΠΊΠΈΠΌ свойствам, ΠΊΠ°ΠΊ <see cref="State"/>,
100 /// <see cref="LastError"/>. ΠžΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ события <see cref="StateChanged"/>
101 /// Π²Ρ‹Π·Ρ‹Π²Π°ΡŽΡ‚ΡΡ ΡƒΠΆΠ΅ с установлСнной Π±Π»ΠΎΠΊΠΈΡ€ΠΎΠ²ΠΊΠΎΠΉ, поэтому Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡ‚Π΅Π»ΡŒΠ½Π°Ρ
102 /// синхронизация Π½Π΅ трСбуСтся.
103 /// </summary>
104 public object SynchronizationObject { get { return m_lock; } }
96
105
97 protected RunnableComponent(bool initialized) {
106 protected RunnableComponent(bool initialized) {
98 State = initialized ? ExecutionState.Ready : ExecutionState.Created;
107 State = initialized ? ExecutionState.Ready : ExecutionState.Created;
99 }
108 }
100
109
101 public Task Completion {
110 public Task Completion {
102 get { return m_current.Task; }
111 get { return m_current.Task; }
103 }
112 }
104
113
105 public ExecutionState State {
114 public ExecutionState State {
106 get { return m_state; }
115 get { return m_state; }
107 private set {
116 private set {
108 if (m_state != value) {
117 if (m_state != value) {
109 m_state = value;
118 m_state = value;
110 StateChanged.DispatchEvent(this, new StateChangeEventArgs {
119 StateChanged.DispatchEvent(this, new StateChangeEventArgs {
111 State = value,
120 State = value,
112 LastError = LastError
121 LastError = LastError
113 });
122 });
114 }
123 }
115 }
124 }
116 }
125 }
117
126
118 public Exception LastError { get; private set; }
127 public Exception LastError { get; private set; }
119
128
129 /// <summary>
130 /// Π‘ΠΎΠ±Ρ‹Ρ‚ΠΈΠ΅ измСнСния состояния ΠΊΠΎΠΌΠΏΠΎΠ½Π΅Π½Ρ‚Ρ‹.see ΠžΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ Π΄Π°Π½Π½ΠΎΠ³ΠΎ события
131 /// Π²Ρ‹Π·Ρ‹Π²Π°ΡŽΡ‚ΡΡ Π²Π½ΡƒΡ‚Ρ€ΠΈ Π±Π»ΠΎΠΊΠΈΡ€ΠΎΠ²ΠΊΠΈ <see cref="SynchronizationObject"/> ΠΈ Π΄ΠΎΠ»ΠΆΠ½Ρ‹
132 /// Π²Ρ‹ΠΏΠΎΠ»Π½ΡΡ‚ΡŒΡΡ максимально быстро.
133 /// </summary>
120 public event EventHandler<StateChangeEventArgs> StateChanged;
134 public event EventHandler<StateChangeEventArgs> StateChanged;
121
135
122 /// <summary>
136 /// <summary>
123 /// Releases all resources used by the current component regardless of its
137 /// Releases all resources used by the current component regardless of its
124 /// execution state.
138 /// execution state.
125 /// </summary>
139 /// </summary>
126 /// <remarks>
140 /// <remarks>
127 /// Calling to this method may result unexpedted results if the component
141 /// Calling to this method may result unexpedted results if the component
128 /// isn't in the stopped state. Call this method after the component is
142 /// isn't in the stopped state. Call this method after the component is
129 /// stopped if needed or if the component is in the failed state.
143 /// stopped if needed or if the component is in the failed state.
130 /// </remarks>
144 /// </remarks>
131 public void Dispose() {
145 public void Dispose() {
132 bool dispose = false;
146 bool dispose = false;
147 lock (SynchronizationObject) {
148 if (m_state != ExecutionState.Disposed) {
149 dispose = true;
150 m_state = ExecutionState.Disposed;
151 m_cookie = new object();
152 }
153 }
133 if (dispose) {
154 if (dispose) {
134 Dispose(true);
155 Dispose(true);
135 GC.SuppressFinalize(this);
156 GC.SuppressFinalize(this);
136 }
157 }
137 }
158 }
138
159
139 ~RunnableComponent() {
160 ~RunnableComponent() {
140 Dispose(false);
161 Dispose(false);
141 }
162 }
142
163
143 /// <summary>
164 /// <summary>
144 /// Releases all resources used by the current component regardless of its
165 /// Releases all resources used by the current component regardless of its
145 /// execution state.
166 /// execution state.
146 /// </summary>
167 /// </summary>
147 /// <param name="disposing">Indicates that the component is disposed
168 /// <param name="disposing">Indicates that the component is disposed
148 /// during a normal disposing or during GC.</param>
169 /// during a normal disposing or during GC.</param>
149 protected virtual void Dispose(bool disposing) {
170 protected virtual void Dispose(bool disposing) {
150 }
171 }
151
172
152 public void Initialize() {
173 public void Initialize() {
153 var cookie = new object();
174 var cookie = new object();
154 if (MoveInitialize(cookie))
175 if (MoveInitialize(cookie))
155 ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie);
176 Safe.NoWait(ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie));
156 else
177 else
157 throw new InvalidOperationException();
178 throw new InvalidOperationException();
158 }
179 }
159
180
160 /// <summary>
181 /// <summary>
161 /// This method is used for initialization during a component creation.
182 /// This method is used for initialization during a component creation.
162 /// </summary>
183 /// </summary>
163 /// <param name="ct">A cancellation token for this operation</param>
184 /// <param name="ct">A cancellation token for this operation</param>
164 /// <remarks>
185 /// <remarks>
165 /// This method should be used for short and mostly syncronous operations,
186 /// This method should be used for short and mostly syncronous operations,
166 /// other operations which require time to run shoud be placed in
187 /// other operations which require time to run shoud be placed in
167 /// <see cref="StartInternalAsync(CancellationToken)"/> method.
188 /// <see cref="StartInternalAsync(CancellationToken)"/> method.
168 /// </remarks>
189 /// </remarks>
169 protected virtual Task InitializeInternalAsync(CancellationToken ct) {
190 protected virtual Task InitializeInternalAsync(CancellationToken ct) {
170 return Task.CompletedTask;
191 return Task.CompletedTask;
171 }
192 }
172
193
173 public void Start(CancellationToken ct) {
194 public void Start(CancellationToken ct) {
174 var cookie = new object();
195 var cookie = new object();
175 if (MoveStart(cookie))
196 if (MoveStart(cookie))
176 ScheduleTask(StartInternalAsync, ct, cookie);
197 Safe.NoWait(ScheduleStartAndRun(ct, cookie));
177 else
198 else
178 throw new InvalidOperationException();
199 throw new InvalidOperationException();
179 }
200 }
180
201
202 async Task ScheduleStartAndRun(CancellationToken ct, object cookie) {
203 try {
204 await ScheduleTask(StartInternalAsync, ct, cookie);
205 RunInternal();
206 } catch (Exception err) {
207 Fail(err);
208 }
209 }
210
181 protected virtual Task StartInternalAsync(CancellationToken ct) {
211 protected virtual Task StartInternalAsync(CancellationToken ct) {
182 return Task.CompletedTask;
212 return Task.CompletedTask;
183 }
213 }
184
214
215 /// <summary>
216 /// This method is called after the component is enetered running state,
217 /// use this method to
218 /// </summary>
219 protected virtual void RunInternal() {
220
221 }
222
185 public void Stop(CancellationToken ct) {
223 public void Stop(CancellationToken ct) {
186 var cookie = new object();
224 var cookie = new object();
187 if (MoveStop(cookie))
225 if (MoveStop(cookie))
188 ScheduleTask(StopAsync, ct, cookie);
226 Safe.NoWait(ScheduleTask(StopAsync, ct, cookie));
189 else
227 else
190 throw new InvalidOperationException();
228 throw new InvalidOperationException();
191 }
229 }
192
230
193 async Task StopAsync(CancellationToken ct) {
231 async Task StopAsync(CancellationToken ct) {
194 m_current.Cancel();
232 m_current.Cancel();
195 await Completion;
233 await Completion;
196
234
197 ct.ThrowIfCancellationRequested();
235 ct.ThrowIfCancellationRequested();
198
236
199 await StopInternalAsync(ct);
237 await StopInternalAsync(ct);
200 }
238 }
201
239
202 protected virtual Task StopInternalAsync(CancellationToken ct) {
240 protected virtual Task StopInternalAsync(CancellationToken ct) {
203 return Task.CompletedTask;
241 return Task.CompletedTask;
204 }
242 }
205
243
206 protected void Fail(Exception err) {
244 protected void Fail(Exception err) {
207 lock(m_lock) {
245 lock(m_lock) {
208 if (m_state != ExecutionState.Running)
246 if (m_state != ExecutionState.Running)
209 return;
247 return;
210 m_cookie = new object();
248 m_cookie = new object();
211 LastError = err;
249 LastError = err;
212 State = ExecutionState.Failed;
250 State = ExecutionState.Failed;
213 }
251 }
214 }
252 }
215
253
216
254
217 #region state management
255 #region state management
218
256
219 bool MoveInitialize(object cookie) {
257 bool MoveInitialize(object cookie) {
220 lock (m_lock) {
258 lock (m_lock) {
221 if (State != ExecutionState.Created)
259 if (State != ExecutionState.Created)
222 return false;
260 return false;
223 State = ExecutionState.Initializing;
261 State = ExecutionState.Initializing;
224 m_cookie = cookie;
262 m_cookie = cookie;
225 return true;
263 return true;
226 }
264 }
227 }
265 }
228
266
229 bool MoveStart(object cookie) {
267 bool MoveStart(object cookie) {
230 lock (m_lock) {
268 lock (m_lock) {
231 if (State != ExecutionState.Ready)
269 if (State != ExecutionState.Ready)
232 return false;
270 return false;
233 State = ExecutionState.Starting;
271 State = ExecutionState.Starting;
234 m_cookie = cookie;
272 m_cookie = cookie;
235 return true;
273 return true;
236 }
274 }
237 }
275 }
238
276
239 bool MoveStop(object cookie) {
277 bool MoveStop(object cookie) {
240 lock (m_lock) {
278 lock (m_lock) {
241 if (State != ExecutionState.Starting && State != ExecutionState.Running)
279 if (State != ExecutionState.Starting && State != ExecutionState.Running)
242 return false;
280 return false;
243 State = ExecutionState.Stopping;
281 State = ExecutionState.Stopping;
244 m_cookie = cookie;
282 m_cookie = cookie;
245 return true;
283 return true;
246 }
284 }
247 }
285 }
248
286
249 void MoveSuccess(object cookie) {
287 void MoveSuccess(object cookie) {
250 lock (m_lock) {
288 lock (m_lock) {
251 if (m_cookie != cookie)
289 if (m_cookie != cookie)
252 return;
290 return;
253 switch (State) {
291 switch (State) {
254 case ExecutionState.Initializing:
292 case ExecutionState.Initializing:
255 State = ExecutionState.Ready;
293 State = ExecutionState.Ready;
256 break;
294 break;
257 case ExecutionState.Starting:
295 case ExecutionState.Starting:
258 State = ExecutionState.Running;
296 State = ExecutionState.Running;
259 break;
297 break;
260 case ExecutionState.Stopping:
298 case ExecutionState.Stopping:
261 State = ExecutionState.Stopped;
299 State = ExecutionState.Stopped;
262 break;
300 break;
263 }
301 }
264 }
302 }
265 }
303 }
266
304
267 void MoveFailed(Exception err, object cookie) {
305 void MoveFailed(Exception err, object cookie) {
268 lock (m_lock) {
306 lock (m_lock) {
269 if (m_cookie != cookie)
307 if (m_cookie != cookie)
270 return;
308 return;
271 LastError = err;
309 LastError = err;
272 State = ExecutionState.Failed;
310 State = ExecutionState.Failed;
273 }
311 }
274 }
312 }
275
313
276 void ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
314 Task ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
277
315
278 m_current = AsyncOperationDescriptor.Create(async (x) => {
316 var op = AsyncOperationDescriptor.Create(async (x) => {
279 try {
317 try {
280 await next(x);
318 await next(x);
281 MoveSuccess(cookie);
319 MoveSuccess(cookie);
282 } catch (Exception e) {
320 } catch (Exception e) {
283 MoveFailed(e, cookie);
321 MoveFailed(e, cookie);
284 }
322 }
285 }, ct);
323 }, ct);
324
325 m_current = op;
326 return op.Task;
286 }
327 }
287
328
288 #endregion
329 #endregion
289 }
330 }
290 } No newline at end of file
331 }
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

ok, latest stable version should be in default

You need to be logged in to leave comments. Login now