RunnableComponent.cs
349 lines
| 11.7 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r289 | using System; | |
using System.Diagnostics; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace Implab.Components { | |||
/// <summary> | |||
/// Base class for implementing components which support start and stop operations, | |||
/// such components may represent running services. | |||
/// </summary> | |||
/// <remarks> | |||
/// This class provides a basic lifecycle from the creation to the | |||
/// termination of the component. | |||
/// </remarks> | |||
public abstract class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable { | |||
/// <summary>
/// Binds the lifetime of a <see cref="CancellationTokenSource"/> to a task:
/// when the task completes the associated token source is disposed.
/// </summary>
class AsyncOperationDescriptor {
    /// <summary>
    /// A descriptor without a token source whose task is already completed.
    /// </summary>
    public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();

    // null only for the None descriptor; the instance is also used as the lock
    // which serializes Cancel() against the disposal performed by Done()
    readonly CancellationTokenSource m_cts;

    // becomes true when the wrapped task has completed
    bool m_done;

    /// <summary>
    /// The cancellation token of this operation, <see cref="CancellationToken.None"/>
    /// for a descriptor without a token source.
    /// </summary>
    /// <remarks>
    /// The token is captured at construction, reading it from the source on
    /// demand would throw <see cref="ObjectDisposedException"/> after the
    /// operation is completed and the source is disposed.
    /// </remarks>
    public CancellationToken Token { get; }

    /// <summary>
    /// The task which completes together with the wrapped task, after the
    /// token source has been disposed.
    /// </summary>
    public Task Task { get; private set; }

    private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
        m_cts = cts;
        // must be read before Chain(): an already completed task makes Chain
        // dispose the source synchronously
        Token = cts.Token;
        Task = Chain(task);
    }

    private AsyncOperationDescriptor() {
        Task = Task.CompletedTask;
    }

    /// <summary>
    /// Requests the cancellation of the operation, does nothing if the
    /// operation is already completed or the descriptor has no token source.
    /// </summary>
    public void Cancel() {
        if (m_cts != null) {
            lock (m_cts) {
                // the source is disposed once the operation is done
                if (!m_done)
                    m_cts.Cancel();
            }
        }
    }

    // Marks the operation as completed and releases the token source.
    void Done() {
        if (m_cts != null) {
            lock (m_cts) {
                m_done = true;
                m_cts.Dispose();
            }
        } else {
            m_done = true;
        }
    }

    // Awaits the specified task and releases the token source regardless of
    // the outcome, the outcome itself is propagated to the returned task.
    async Task Chain(Task other) {
        try {
            await other;
        } finally {
            Done();
        }
    }

    /// <summary>
    /// Creates a token source, starts the operation with its token and wraps
    /// the resulting task.
    /// </summary>
    /// <param name="factory">The delegate which starts the operation.</param>
    /// <param name="ct">The external token, when it can be cancelled the
    /// token of the operation is linked to it.</param>
    /// <returns>The descriptor of the started operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
    public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
        if (factory == null)
            throw new ArgumentNullException(nameof(factory));

        var cts = ct.CanBeCanceled ?
            CancellationTokenSource.CreateLinkedTokenSource(ct) :
            new CancellationTokenSource();

        Task task;
        try {
            task = factory(cts.Token);
        } catch {
            // the factory failed synchronously, no task will ever release
            // the source, so release it here
            cts.Dispose();
            throw;
        }

        return new AsyncOperationDescriptor(task, cts);
    }
}
// Guards the state flow of the component, it is taken both while processing
// calls from a client and while processing completions of internal operations.
private readonly object m_lock = new object();

// The cookie of the current operation. MoveSuccess/MoveFailed compare the
// cookie they receive with this value and ignore the completion result when
// the cookies don't match.
private object m_cookie;

// The descriptor of the most recently scheduled operation, it aggregates the
// task and its cancellation token source.
private AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;

// The backing field of the State property.
private ExecutionState m_state;
/// <summary>
/// The synchronization object used to share the component state, i.e.
/// properties such as <see cref="State"/> and <see cref="LastError"/>,
/// between the client of the component and the processes running inside the
/// component. Handlers of the <see cref="StateChanged"/> event are invoked
/// with this lock already taken, therefore they don't need any additional
/// synchronization.
/// </summary>
public object SynchronizationObject => m_lock;
/// <summary>
/// Creates the component either in the <see cref="ExecutionState.Ready"/>
/// or in the <see cref="ExecutionState.Created"/> state.
/// </summary>
/// <param name="initialized"><c>true</c> to create the component in the
/// ready state, <c>false</c> to create it in the created state.</param>
protected RunnableComponent(bool initialized) {
    if (initialized)
        State = ExecutionState.Ready;
    else
        State = ExecutionState.Created;
}
/// <summary>
/// The task of the most recently scheduled operation of the component.
/// </summary>
public Task Completion => m_current.Task;
/// <summary>
/// The current execution state of the component. Assigning a value which
/// differs from the current one raises <see cref="StateChanged"/>.
/// </summary>
public ExecutionState State {
    get { return m_state; }
    private set {
        // nothing changes, no notification is sent
        if (m_state == value)
            return;

        m_state = value;
        StateChanged.DispatchEvent(this, new StateChangeEventArgs {
            State = value,
            LastError = LastError
        });
    }
}
/// <summary>
/// The exception recorded when the component was moved to the
/// <see cref="ExecutionState.Failed"/> state.
/// </summary>
public Exception LastError { get; private set; }
/// <summary>
/// The event raised when the state of the component changes. Handlers of
/// this event are invoked inside the <see cref="SynchronizationObject"/>
/// lock and must complete as quickly as possible.
/// </summary>
public event EventHandler<StateChangeEventArgs> StateChanged;
/// <summary>
/// Releases all resources used by the current component regardless of its
/// execution state.
/// </summary>
/// <remarks>
/// Calling this method may lead to unexpected results if the component
/// isn't in the stopped state. Call this method after the component is
/// stopped, or when the component is in the failed state.
/// </remarks>
public void Dispose() {
    lock (SynchronizationObject) {
        // only the first call performs the disposal
        if (m_state == ExecutionState.Disposed)
            return;

        m_state = ExecutionState.Disposed;
        // the fresh cookie makes completions of pending operations ignored
        m_cookie = new object();
    }

    Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// The finalizer, runs the cleanup in the non-disposing mode.
/// </summary>
~RunnableComponent() {
    Dispose(false);
}
/// <summary>
/// Override this method to release the resources owned by the component,
/// the default implementation does nothing.
/// </summary>
/// <param name="disposing"><c>true</c> when the method is called from
/// <see cref="Dispose()"/>, <c>false</c> when it is called from the
/// finalizer.</param>
protected virtual void Dispose(bool disposing) {
}
/// <summary>
/// Starts the initialization of the component without an external
/// cancellation token.
/// </summary>
public void Initialize() => Initialize(CancellationToken.None);
/// <summary>
/// Moves the component to the initializing state and schedules
/// <see cref="InitializeInternalAsync(CancellationToken)"/>. The method
/// doesn't wait for the initialization to complete.
/// </summary>
/// <param name="ct">The token used to cancel the initialization.</param>
/// <exception cref="InvalidOperationException">The component isn't in the
/// <see cref="ExecutionState.Created"/> state.</exception>
public void Initialize(CancellationToken ct) {
    var cookie = new object();

    if (!MoveInitialize(cookie))
        throw new InvalidOperationException("The component can be initialized only in the Created state.");

    // the outcome is reported through the component state
    Safe.NoWait(ScheduleTask(InitializeInternalAsync, ct, cookie));
}
/// <summary>
/// Performs the initialization of the component, the default implementation
/// completes immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
/// <returns>The task which represents the initialization.</returns>
/// <remarks>
/// This method should be used for short and mostly synchronous operations,
/// operations which take time to run should be placed in the
/// <see cref="StartInternalAsync(CancellationToken)"/> method.
/// </remarks>
protected virtual Task InitializeInternalAsync(CancellationToken ct) => Task.CompletedTask;
/// <summary>
/// Starts the component without an external cancellation token.
/// </summary>
public void Start() => Start(CancellationToken.None);
/// <summary>
/// Moves the component to the starting state and schedules the start
/// operation. The method doesn't wait for the start to complete.
/// </summary>
/// <param name="ct">The token used to cancel the start operation.</param>
/// <exception cref="InvalidOperationException">The component isn't in the
/// <see cref="ExecutionState.Ready"/> state.</exception>
public void Start(CancellationToken ct) {
    var cookie = new object();

    if (!MoveStart(cookie))
        throw new InvalidOperationException("The component can be started only in the Ready state.");

    // the outcome is reported through the component state
    Safe.NoWait(ScheduleStartAndRun(ct, cookie));
}
// Runs the start operation and then RunInternal(). Errors are passed to
// Fail(), which records them only while the component is running.
async Task ScheduleStartAndRun(CancellationToken ct, object cookie) {
    try {
        await ScheduleTask(StartInternalAsync, ct, cookie);

        // The completion of the start operation is ignored when the component
        // was stopped or disposed meanwhile, in that case the component isn't
        // running and RunInternal() must not be called.
        lock (m_lock) {
            if (m_cookie != cookie || m_state != ExecutionState.Running)
                return;
        }

        RunInternal();
    } catch (Exception err) {
        Fail(err);
    }
}
/// <summary>
/// Performs the start of the component, the default implementation
/// completes immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
/// <returns>The task which represents the start operation.</returns>
protected virtual Task StartInternalAsync(CancellationToken ct) => Task.CompletedTask;
/// <summary>
/// This method is called after the start operation has completed
/// successfully, the default implementation does nothing. An exception
/// thrown from this method is passed to <see cref="Fail(Exception)"/>.
/// </summary>
protected virtual void RunInternal() {
}
/// <summary>
/// Stops the component without an external cancellation token.
/// </summary>
public void Stop() => Stop(CancellationToken.None);
/// <summary>
/// Moves the component to the stopping state and schedules the stop
/// operation. The method doesn't wait for the stop to complete.
/// </summary>
/// <param name="ct">The token used to cancel the stop operation.</param>
/// <exception cref="InvalidOperationException">The component is neither in
/// the <see cref="ExecutionState.Starting"/> nor in the
/// <see cref="ExecutionState.Running"/> state.</exception>
public void Stop(CancellationToken ct) {
    var cookie = new object();

    if (!MoveStop(cookie))
        throw new InvalidOperationException("The component can be stopped only in the Starting or Running state.");

    // the outcome is reported through the component state
    Safe.NoWait(ScheduleTask(StopAsync, ct, cookie));
}
// The stop sequence: cancels the current operation, waits for it to finish
// and then runs StopInternalAsync().
async Task StopAsync(CancellationToken ct) {
    m_current.Cancel();

    try {
        await Completion;
    } catch (OperationCanceledException) {
        // the cancellation has just been requested, this outcome is expected
    }

    // the stop itself could have been cancelled while waiting
    ct.ThrowIfCancellationRequested();

    await StopInternalAsync(ct);
}
/// <summary>
/// Performs the stop of the component, the default implementation
/// completes immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
/// <returns>The task which represents the stop operation.</returns>
protected virtual Task StopInternalAsync(CancellationToken ct) => Task.CompletedTask;
/// <summary>
/// Moves the running component to the <see cref="ExecutionState.Failed"/>
/// state, does nothing when the component isn't running.
/// </summary>
/// <param name="err">The error to store in <see cref="LastError"/>.</param>
protected void Fail(Exception err) {
    lock (m_lock) {
        if (m_state == ExecutionState.Running) {
            // the fresh cookie makes completions of pending operations ignored
            m_cookie = new object();
            // the error is set first to make it available to the handlers
            // of the state change event
            LastError = err;
            State = ExecutionState.Failed;
        }
    }
}
#region state management | |||
// Tries to move the component from Created to Initializing and to make the
// specified cookie the current one. Returns false when the component is in
// a different state, in this case nothing is changed.
bool MoveInitialize(object cookie) {
    lock (m_lock) {
        if (State != ExecutionState.Created)
            return false;

        // The cookie is assigned before the state since the state change
        // handlers are invoked synchronously: a transition started from a
        // handler installs its own cookie which must not be overwritten here.
        m_cookie = cookie;
        State = ExecutionState.Initializing;
        return true;
    }
}
// Tries to move the component from Ready to Starting and to make the
// specified cookie the current one. Returns false when the component is in
// a different state, in this case nothing is changed.
bool MoveStart(object cookie) {
    lock (m_lock) {
        if (State != ExecutionState.Ready)
            return false;

        // The cookie is assigned before the state since the state change
        // handlers are invoked synchronously: a transition started from a
        // handler installs its own cookie which must not be overwritten here.
        m_cookie = cookie;
        State = ExecutionState.Starting;
        return true;
    }
}
// Tries to move the component from Starting or Running to Stopping and to
// make the specified cookie the current one. Returns false when the
// component is in a different state, in this case nothing is changed.
bool MoveStop(object cookie) {
    lock (m_lock) {
        if (State != ExecutionState.Starting && State != ExecutionState.Running)
            return false;

        // The cookie is assigned before the state since the state change
        // handlers are invoked synchronously: a transition started from a
        // handler installs its own cookie which must not be overwritten here.
        m_cookie = cookie;
        State = ExecutionState.Stopping;
        return true;
    }
}
// Completes the current transition: Initializing -> Ready,
// Starting -> Running, Stopping -> Stopped. The call is ignored when the
// cookie doesn't belong to the current operation.
void MoveSuccess(object cookie) {
    lock (m_lock) {
        if (m_cookie == cookie) {
            var current = State;

            if (current == ExecutionState.Initializing)
                State = ExecutionState.Ready;
            else if (current == ExecutionState.Starting)
                State = ExecutionState.Running;
            else if (current == ExecutionState.Stopping)
                State = ExecutionState.Stopped;
        }
    }
}
// Stores the error and moves the component to the Failed state. Returns
// false and changes nothing when the cookie doesn't belong to the current
// operation.
bool MoveFailed(Exception err, object cookie) {
    lock (m_lock) {
        var matches = m_cookie == cookie;

        if (matches) {
            // the error is set first to make it available to the handlers
            // of the state change event
            LastError = err;
            State = ExecutionState.Failed;
        }

        return matches;
    }
}
// Starts the specified operation, reports its outcome to the state machine
// through MoveSuccess/MoveFailed and makes it the current operation.
Task ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
    Func<CancellationToken, Task> tracked = async token => {
        try {
            await next(token);
            MoveSuccess(cookie);
        } catch (Exception error) {
            MoveFailed(error, cookie);
            throw;
        }
    };

    // Create() invokes the delegate before it returns, so the synchronous
    // part of the operation still observes the previous value of m_current
    // (StopAsync relies on this to cancel the preceding operation).
    var descriptor = AsyncOperationDescriptor.Create(tracked, ct);
    m_current = descriptor;
    return descriptor.Task;
}
#endregion | |||
} | |||
} |