RunnableComponent.cs
272 lines
| 8.6 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r250 | using System; | |
cin
|
r251 | using System.Diagnostics; | |
cin
|
r250 | using System.Threading; | |
using System.Threading.Tasks; | |||
cin
|
namespace Implab.Components {
    /// <summary>
    /// Base class for implementing components which support start and stop operations,
    /// such components may represent running services.
    /// </summary>
    /// <remarks>
    /// This class provides a basic lifecycle from the creation to the
    /// termination of the component.
    /// </remarks>
    public class RunnableComponent : IRunnable, IInitializable, IDisposable {

        /// <summary>
        /// Binds the lifetime of a <see cref="CancellationTokenSource"/> to a task:
        /// when the task completes, the associated token source is disposed.
        /// </summary>
        class AsyncOperationDescriptor {

            /// <summary>
            /// A descriptor which has no token source and whose task is already completed.
            /// </summary>
            public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();

            readonly CancellationTokenSource m_cts;

            // becomes true when the wrapped task has finished; when a token source
            // is present the flag is accessed under a lock on that token source.
            bool m_done;

            /// <summary>
            /// The token of the owned token source, or <see cref="CancellationToken.None"/>
            /// when the descriptor has no token source.
            /// </summary>
            public CancellationToken Token {
                get { return m_cts == null ? CancellationToken.None : m_cts.Token; }
            }

            /// <summary>
            /// The task which completes after the wrapped operation has finished and
            /// the token source has been released.
            /// </summary>
            public Task Task { get; private set; }

            private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
                m_cts = cts;
                Task = Chain(task);
            }

            private AsyncOperationDescriptor() {
                Task = Task.CompletedTask;
            }

            /// <summary>
            /// Requests the cancellation of the operation, does nothing if the
            /// operation is already completed or the descriptor has no token source.
            /// </summary>
            public void Cancel() {
                if (m_cts == null)
                    return;

                lock (m_cts) {
                    // the token source is disposed once the operation is done,
                    // it must not be touched after that.
                    if (!m_done)
                        m_cts.Cancel();
                }
            }

            /// <summary>
            /// Marks the operation as completed and disposes the token source, if any.
            /// </summary>
            void Done() {
                if (m_cts == null) {
                    m_done = true;
                    return;
                }

                lock (m_cts) {
                    m_done = true;
                    m_cts.Dispose();
                }
            }

            /// <summary>
            /// Awaits the specified task and calls <see cref="Done"/> regardless of
            /// the way the task has completed.
            /// </summary>
            async Task Chain(Task other) {
                try {
                    await other;
                } finally {
                    Done();
                }
            }

            /// <summary>
            /// Creates a token source, starts the operation using the specified factory
            /// and wraps the resulting task into a descriptor.
            /// </summary>
            /// <param name="factory">The factory which starts the operation, it receives
            /// the token of the created token source.</param>
            /// <param name="ct">The external cancellation token, if it can be canceled
            /// the created token source is linked to it.</param>
            /// <returns>The descriptor of the started operation.</returns>
            public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
                var cts = ct.CanBeCanceled ?
                    CancellationTokenSource.CreateLinkedTokenSource(ct) :
                    new CancellationTokenSource();

                Task task;
                try {
                    task = factory(cts.Token);
                } catch {
                    // the factory has failed synchronously, no descriptor will own
                    // the token source, release it here to avoid the leak.
                    cts.Dispose();
                    throw;
                }

                return new AsyncOperationDescriptor(task, cts);
            }
        }

        // this lock is used to synchronize the state flow of the component
        // during completions of the operations.
        readonly object m_lock = new object();

        // current operation cookie, used to check whether a call to the
        // MoveSuccess/MoveFailed method belongs to the current operation,
        // if cookies don't match the completion result is ignored.
        object m_cookie;

        AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;

        ExecutionState m_state;

        // 0 - Dispose() wasn't called yet, 1 - Dispose() has been called.
        int m_disposed;

        /// <summary>
        /// Creates the component.
        /// </summary>
        /// <param name="initialized">When <c>true</c> the component starts in the
        /// <see cref="ExecutionState.Ready"/> state, otherwise in the
        /// <see cref="ExecutionState.Created"/> state.</param>
        protected RunnableComponent(bool initialized) {
            State = initialized ? ExecutionState.Ready : ExecutionState.Created;
        }

        /// <summary>
        /// The task of the most recently scheduled operation.
        /// </summary>
        public Task Completion {
            get { return m_current.Task; }
        }

        /// <summary>
        /// The current execution state, <see cref="StateChanged"/> is dispatched
        /// every time the value actually changes.
        /// </summary>
        public ExecutionState State {
            get { return m_state; }
            private set {
                if (m_state != value) {
                    m_state = value;
                    StateChanged.DispatchEvent(this, new StateChangeEventArgs {
                        State = value,
                        LastError = LastError
                    });
                }
            }
        }

        /// <summary>
        /// The error which has been recorded when an operation failed.
        /// </summary>
        public Exception LastError { get; private set; }

        public event EventHandler<StateChangeEventArgs> StateChanged;

        /// <summary>
        /// Releases all resources used by the current component regardless of its
        /// execution state.
        /// </summary>
        /// <remarks>
        /// Calling this method may lead to unexpected results if the component
        /// isn't in the stopped state. Call this method after the component is
        /// stopped if needed or if the component is in the failed state.
        /// Only the first call has an effect, subsequent calls are ignored.
        /// </remarks>
        public void Dispose() {
            // let only the first caller through, this makes the method idempotent
            if (Interlocked.Exchange(ref m_disposed, 1) != 0)
                return;

            Dispose(true);
            GC.SuppressFinalize(this);
        }

        ~RunnableComponent() {
            Dispose(false);
        }

        /// <summary>
        /// Releases all resources used by the current component regardless of its
        /// execution state.
        /// </summary>
        /// <param name="disposing">Indicates that the component is disposed
        /// during a normal disposing or during GC.</param>
        protected virtual void Dispose(bool disposing) {
        }

        /// <summary>
        /// Starts the initialization of the component, the call is ignored if the
        /// component isn't in the <see cref="ExecutionState.Created"/> state.
        /// </summary>
        public void Initialize() {
            var cookie = new object();
            if (MoveInitialize(cookie))
                ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie);
        }

        /// <summary>
        /// This method is used for initialization during a component creation.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        /// <remarks>
        /// This method should be used for short and mostly synchronous operations,
        /// other operations which require time to run should be placed in
        /// <see cref="StartInternal(CancellationToken)"/> method.
        /// </remarks>
        protected virtual Task InitializeInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        /// <summary>
        /// Starts the component, the call is ignored if the component isn't in the
        /// <see cref="ExecutionState.Ready"/> state.
        /// </summary>
        /// <param name="ct">A cancellation token for the start operation</param>
        public void Start(CancellationToken ct) {
            var cookie = new object();
            if (MoveStart(cookie))
                ScheduleTask(StartInternal, ct, cookie);
        }

        /// <summary>
        /// The start operation of the component, the default implementation
        /// completes immediately.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        protected virtual Task StartInternal(CancellationToken ct) {
            return Task.CompletedTask;
        }

        /// <summary>
        /// Stops the component, the call is ignored if the component is neither in
        /// the <see cref="ExecutionState.Starting"/> nor in the
        /// <see cref="ExecutionState.Running"/> state.
        /// </summary>
        /// <param name="ct">A cancellation token for the stop operation</param>
        public void Stop(CancellationToken ct) {
            var cookie = new object();
            if (MoveStop(cookie))
                ScheduleTask(StopAsync, ct, cookie);
        }

        /// <summary>
        /// Cancels the current operation, waits for it to complete and then runs
        /// <see cref="StopInternalAsync(CancellationToken)"/>.
        /// </summary>
        async Task StopAsync(CancellationToken ct) {
            // this method begins to run while the descriptor of the stop operation
            // is being created, m_current still refers to the previous operation.
            m_current.Cancel();
            await Completion;

            ct.ThrowIfCancellationRequested();

            await StopInternalAsync(ct);
        }

        /// <summary>
        /// The stop operation of the component, the default implementation
        /// completes immediately.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        protected virtual Task StopInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        #region state management

        /// <summary>
        /// Moves the component from Created to Initializing and remembers the cookie.
        /// </summary>
        /// <returns><c>true</c> if the transition has been made.</returns>
        bool MoveInitialize(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Created)
                    return false;
                State = ExecutionState.Initializing;
                m_cookie = cookie;
                return true;
            }
        }

        /// <summary>
        /// Moves the component from Ready to Starting and remembers the cookie.
        /// </summary>
        /// <returns><c>true</c> if the transition has been made.</returns>
        bool MoveStart(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Ready)
                    return false;
                State = ExecutionState.Starting;
                m_cookie = cookie;
                return true;
            }
        }

        /// <summary>
        /// Moves the component from Starting or Running to Stopping and remembers
        /// the cookie.
        /// </summary>
        /// <returns><c>true</c> if the transition has been made.</returns>
        bool MoveStop(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Starting && State != ExecutionState.Running)
                    return false;
                State = ExecutionState.Stopping;
                m_cookie = cookie;
                return true;
            }
        }

        /// <summary>
        /// Completes the transition started by the operation identified by the
        /// cookie, the call is ignored if the cookie doesn't match the current one.
        /// </summary>
        void MoveSuccess(object cookie) {
            lock (m_lock) {
                if (m_cookie != cookie)
                    return;

                switch (State) {
                    case ExecutionState.Initializing:
                        State = ExecutionState.Ready;
                        break;
                    case ExecutionState.Starting:
                        State = ExecutionState.Running;
                        break;
                    case ExecutionState.Stopping:
                        State = ExecutionState.Stopped;
                        break;
                }
            }
        }

        /// <summary>
        /// Records the error and moves the component to the Failed state, the call
        /// is ignored if the cookie doesn't match the current one.
        /// </summary>
        void MoveFailed(Exception err, object cookie) {
            lock (m_lock) {
                if (m_cookie != cookie)
                    return;

                // the error is stored first, the state change event carries it
                LastError = err;
                State = ExecutionState.Failed;
            }
        }

        /// <summary>
        /// Starts the specified operation, makes it the current one and reports
        /// its outcome to <see cref="MoveSuccess"/> or <see cref="MoveFailed"/>.
        /// </summary>
        /// <param name="next">The factory which starts the operation</param>
        /// <param name="ct">The external cancellation token for the operation</param>
        /// <param name="cookie">The cookie which identifies the operation</param>
        /// <remarks>
        /// Errors of the operation, including the ones thrown synchronously by the
        /// factory, are caught here and passed to <see cref="MoveFailed"/>.
        /// </remarks>
        protected async void ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
            try {
                m_current = AsyncOperationDescriptor.Create(next, ct);
                await m_current.Task;
                MoveSuccess(cookie);
            } catch (Exception e) {
                MoveFailed(e, cookie);
            }
        }

        #endregion
    }
}