using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Implab.Components {
/// <summary>
/// Base class for implementing components which support start and stop operations;
/// such components may represent running services.
/// </summary>
/// <remarks>
/// This class provides a basic lifecycle from the creation to the
/// termination of the component.
/// </remarks>
public abstract class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {
/// <summary>
/// Binds the lifetime of a cancellation token source to a task:
/// when the task completes the associated token source is disposed.
/// </summary>
class AsyncOperationDescriptor {
    /// <summary>
    /// A descriptor of "no operation": its task is already completed and it has
    /// no token source, so <see cref="Cancel"/> does nothing.
    /// </summary>
    public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();

    readonly CancellationTokenSource m_cts;

    // set once the task has finished; guarded by the lock on m_cts when a source exists
    bool m_done;

    /// <summary>
    /// The cancellation token passed to the operation, or
    /// <see cref="CancellationToken.None"/> when there is no token source.
    /// </summary>
    /// <remarks>
    /// The token is captured once at construction, therefore it stays readable after the
    /// operation has completed and the token source has been disposed.
    /// </remarks>
    public CancellationToken Token { get; }

    /// <summary>
    /// The task which completes when the operation completes and the token source is released.
    /// </summary>
    public Task Task { get; private set; }

    private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
        m_cts = cts;
        // Capture the token before chaining: when the task is already completed
        // Chain() disposes the source synchronously and cts.Token would throw afterwards.
        Token = cts.Token;
        Task = Chain(task);
    }

    private AsyncOperationDescriptor() {
        Task = Task.CompletedTask;
    }

    /// <summary>
    /// Requests the cancellation of the operation unless it has already finished.
    /// </summary>
    public void Cancel() {
        if (m_cts != null) {
            // the same lock is taken in Done(), so the source is never
            // cancelled after it has been disposed
            lock (m_cts) {
                if (!m_done)
                    m_cts.Cancel();
            }
        }
    }

    /// <summary>
    /// Marks the operation as finished and disposes the token source, if any.
    /// </summary>
    void Done() {
        if (m_cts != null) {
            lock (m_cts) {
                m_done = true;
                m_cts.Dispose();
            }
        } else {
            m_done = true;
        }
    }

    /// <summary>
    /// Awaits the specified task and releases the token source regardless of
    /// the way the task has finished; the outcome of the task is propagated.
    /// </summary>
    async Task Chain(Task other) {
        try {
            await other;
        } finally {
            Done();
        }
    }

    /// <summary>
    /// Starts an operation and creates a descriptor for it.
    /// </summary>
    /// <param name="factory">The function which starts the operation, it receives
    /// the token that is signalled by <see cref="Cancel"/> or by <paramref name="ct"/>.</param>
    /// <param name="ct">An external cancellation token; when it can be cancelled
    /// the token of the operation is linked to it.</param>
    /// <returns>The descriptor of the started operation.</returns>
    public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
        var cts = ct.CanBeCanceled ?
            CancellationTokenSource.CreateLinkedTokenSource(ct) :
            new CancellationTokenSource();
        try {
            return new AsyncOperationDescriptor(factory(cts.Token), cts);
        } catch {
            // the factory has failed synchronously: no task exists which
            // would release the source later, so release it here
            cts.Dispose();
            throw;
        }
    }
}
// This lock is used to synchronize the state flow of the component while
// processing calls from a client and internal processes.
readonly object m_lock = new object();
// Current operation cookie, used to check whether a call to the
// MoveSuccess/MoveFailed method belongs to the current
// operation; if the cookies don't match the completion result is ignored.
object m_cookie;
// AsyncOperationDescriptor aggregates a task and its cancellation token.
AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
// Current lifecycle state; written by the State property setter and directly by Dispose().
ExecutionState m_state;
/// <summary>
/// The synchronization object used to coordinate the access of the component's client
/// and of the processes running inside the component to the shared state, i.e. to
/// such properties as <see cref="State"/> and <see cref="LastError"/>. Handlers of the
/// <see cref="StateChanged"/> event are invoked with this lock already taken,
/// therefore no additional synchronization is required in them.
/// </summary>
public object SynchronizationObject => m_lock;
/// <summary>
/// Creates the component either in the ready or in the created state.
/// </summary>
/// <param name="initialized"><c>true</c> to begin in <see cref="ExecutionState.Ready"/>,
/// <c>false</c> to begin in <see cref="ExecutionState.Created"/>.</param>
protected RunnableComponent(bool initialized) {
    if (initialized)
        State = ExecutionState.Ready;
    else
        State = ExecutionState.Created;
}
/// <summary>
/// The task of the most recently scheduled lifecycle operation.
/// </summary>
public Task Completion => m_current.Task;
/// <summary>
/// The current execution state of the component.
/// </summary>
/// <remarks>
/// Assigning a different value raises <see cref="StateChanged"/> with the new state
/// and the current <see cref="LastError"/>; assigning the same value does nothing.
/// </remarks>
public ExecutionState State {
    get { return m_state; }
    private set {
        if (m_state == value)
            return;

        m_state = value;
        var args = new StateChangeEventArgs {
            State = value,
            LastError = LastError
        };
        StateChanged.DispatchEvent(this, args);
    }
}
/// <summary>
/// The last error recorded when an operation has failed or when
/// <see cref="Fail"/> has been called; <c>null</c> if nothing has failed so far.
/// </summary>
public Exception LastError { get; private set; }
// NOTE(review): the generic argument of the handler type seems to be lost
// (probably EventHandler<StateChangeEventArgs>) - confirm against the project sources.
/// <summary>
/// The component state change event. Handlers of this event are invoked
/// inside the lock (see <see cref="SynchronizationObject"/>) and must
/// complete as quickly as possible.
/// </summary>
public event EventHandler StateChanged;
/// <summary>
/// Releases all resources used by the current component regardless of its
/// execution state.
/// </summary>
/// <remarks>
/// Calling this method may lead to unexpected results if the component
/// isn't in the stopped state. Call this method after the component is
/// stopped, if needed, or if the component is in the failed state.
/// Repeated calls are ignored.
/// </remarks>
public void Dispose() {
    lock (SynchronizationObject) {
        // already disposed, nothing to do
        if (m_state == ExecutionState.Disposed)
            return;

        // the state is switched directly, without raising the StateChanged event
        m_state = ExecutionState.Disposed;
        // a fresh cookie makes results of a pending operation be ignored
        m_cookie = new object();
    }

    Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Finalizer, calls <see cref="Dispose(bool)"/> with <c>false</c>; it is suppressed
/// by <see cref="Dispose()"/> once the component has been disposed explicitly.
/// </summary>
~RunnableComponent() {
Dispose(false);
}
/// <summary>
/// Releases all resources used by the current component regardless of its
/// execution state. The base implementation does nothing.
/// </summary>
/// <param name="disposing">Indicates whether the component is disposed
/// during a normal disposing (<c>true</c>) or during GC (<c>false</c>).</param>
protected virtual void Dispose(bool disposing) {
}
/// <summary>
/// Begins the initialization of the component without an external cancellation token.
/// </summary>
/// <exception cref="InvalidOperationException">The component isn't in the created state.</exception>
public void Initialize() => Initialize(CancellationToken.None);
/// <summary>
/// Begins the initialization of the component; the method doesn't wait for the
/// operation, its task is available through <see cref="Completion"/>.
/// </summary>
/// <param name="ct">A cancellation token for the initialization.</param>
/// <exception cref="InvalidOperationException">The component isn't in the created state.</exception>
public void Initialize(CancellationToken ct) {
    var operationCookie = new object();

    if (!MoveInitialize(operationCookie))
        throw new InvalidOperationException();

    Safe.NoWait(ScheduleTask(InitializeInternalAsync, ct, operationCookie));
}
/// <summary>
/// This method is used for initialization during a component creation.
/// </summary>
/// <param name="ct">A cancellation token for this operation</param>
/// <remarks>
/// This method should be used for short and mostly synchronous operations,
/// other operations which require time to run should be placed in the
/// start routine (presumably <see cref="StartInternalAsync"/>, the original
/// reference is lost - TODO confirm).
/// The base implementation returns an already completed task.
/// </remarks>
protected virtual Task InitializeInternalAsync(CancellationToken ct) {
return Task.CompletedTask;
}
/// <summary>
/// Begins starting the component without an external cancellation token.
/// </summary>
/// <exception cref="InvalidOperationException">The component isn't in the ready state.</exception>
public void Start() => Start(CancellationToken.None);
/// <summary>
/// Begins starting the component; the method doesn't wait for the
/// operation, its task is available through <see cref="Completion"/>.
/// </summary>
/// <param name="ct">A cancellation token for the start operation.</param>
/// <exception cref="InvalidOperationException">The component isn't in the ready state.</exception>
public void Start(CancellationToken ct) {
    var operationCookie = new object();

    if (!MoveStart(operationCookie))
        throw new InvalidOperationException();

    Safe.NoWait(ScheduleStartAndRun(ct, operationCookie));
}
/// <summary>
/// Runs the start operation and, when the component has actually entered the
/// running state as the result of this operation, calls <see cref="RunInternal"/>.
/// </summary>
/// <param name="ct">A cancellation token for the start operation.</param>
/// <param name="cookie">The cookie of the start operation.</param>
/// <remarks>
/// Errors of the start operation and of <see cref="RunInternal"/> are passed to
/// <see cref="Fail"/>, the returned task itself doesn't fail because of them.
/// </remarks>
async Task ScheduleStartAndRun(CancellationToken ct, object cookie) {
    try {
        await ScheduleTask(StartInternalAsync, ct, cookie);

        // The start operation may complete normally after it has been superseded
        // (Stop() or Dispose() called meanwhile), in this case its result is ignored
        // and the component isn't running, so RunInternal() must not be called.
        bool running;
        lock (m_lock)
            running = m_cookie == cookie && m_state == ExecutionState.Running;

        // the virtual method is intentionally called outside the lock
        if (running)
            RunInternal();
    } catch (Exception err) {
        Fail(err);
    }
}
/// <summary>
/// The body of the start operation, override it to perform the actual startup.
/// The base implementation returns an already completed task.
/// </summary>
/// <param name="ct">A cancellation token for this operation</param>
protected virtual Task StartInternalAsync(CancellationToken ct) {
return Task.CompletedTask;
}
/// <summary>
/// This method is called by the start sequence after the start operation has
/// completed, i.e. after the component has entered the running state.
/// The base implementation does nothing.
/// </summary>
protected virtual void RunInternal() {
}
/// <summary>
/// Begins stopping the component without an external cancellation token.
/// </summary>
/// <exception cref="InvalidOperationException">The component is neither starting nor running.</exception>
public void Stop() => Stop(CancellationToken.None);
/// <summary>
/// Begins stopping the component; the method doesn't wait for the
/// operation, its task is available through <see cref="Completion"/>.
/// </summary>
/// <param name="ct">A cancellation token for the stop operation.</param>
/// <exception cref="InvalidOperationException">The component is neither starting nor running.</exception>
public void Stop(CancellationToken ct) {
    var operationCookie = new object();

    if (!MoveStop(operationCookie))
        throw new InvalidOperationException();

    Safe.NoWait(ScheduleTask(StopAsync, ct, operationCookie));
}
/// <summary>
/// The body of the stop operation: cancels the previous operation, waits for it
/// to finish and then runs <see cref="StopInternalAsync"/>.
/// </summary>
/// <param name="ct">A cancellation token for the stop operation.</param>
async Task StopAsync(CancellationToken ct) {
// ScheduleTask assigns m_current only after this method has been entered, so here
// m_current and Completion still refer to the previous operation, not to the stop itself.
m_current.Cancel();
try {
await Completion;
} catch(OperationCanceledException) {
// OK: the previous operation has been cancelled as requested above;
// any other error of the previous operation propagates and fails the stop
}
// don't begin the stop routine if the stop operation itself has been cancelled meanwhile
ct.ThrowIfCancellationRequested();
await StopInternalAsync(ct);
}
/// <summary>
/// The body of the stop operation, override it to perform the actual shutdown.
/// The base implementation returns an already completed task.
/// </summary>
/// <param name="ct">A cancellation token for this operation</param>
protected virtual Task StopInternalAsync(CancellationToken ct) {
return Task.CompletedTask;
}
/// <summary>
/// Moves the running component to the failed state; does nothing when the
/// component isn't in the running state.
/// </summary>
/// <param name="err">The error to store in <see cref="LastError"/>.</param>
protected void Fail(Exception err) {
    lock (m_lock) {
        if (m_state == ExecutionState.Running) {
            // a fresh cookie makes results of a pending operation be ignored
            m_cookie = new object();
            // the error is stored first since the state change event reports it
            LastError = err;
            State = ExecutionState.Failed;
        }
    }
}
#region state management
/// <summary>
/// Tries to move the component from the created to the initializing state.
/// </summary>
/// <param name="cookie">The cookie of the operation which becomes the current one.</param>
/// <returns><c>true</c> if the transition has been made, otherwise <c>false</c>.</returns>
bool MoveInitialize(object cookie) {
    lock (m_lock) {
        var allowed = State == ExecutionState.Created;
        if (allowed) {
            State = ExecutionState.Initializing;
            m_cookie = cookie;
        }
        return allowed;
    }
}
/// <summary>
/// Tries to move the component from the ready to the starting state.
/// </summary>
/// <param name="cookie">The cookie of the operation which becomes the current one.</param>
/// <returns><c>true</c> if the transition has been made, otherwise <c>false</c>.</returns>
bool MoveStart(object cookie) {
    lock (m_lock) {
        var allowed = State == ExecutionState.Ready;
        if (allowed) {
            State = ExecutionState.Starting;
            m_cookie = cookie;
        }
        return allowed;
    }
}
/// <summary>
/// Tries to move the component from the starting or the running state to the stopping state.
/// </summary>
/// <param name="cookie">The cookie of the operation which becomes the current one.</param>
/// <returns><c>true</c> if the transition has been made, otherwise <c>false</c>.</returns>
bool MoveStop(object cookie) {
    lock (m_lock) {
        var allowed = State == ExecutionState.Starting || State == ExecutionState.Running;
        if (allowed) {
            State = ExecutionState.Stopping;
            m_cookie = cookie;
        }
        return allowed;
    }
}
/// <summary>
/// Completes the current operation successfully: initializing becomes ready,
/// starting becomes running and stopping becomes stopped.
/// </summary>
/// <param name="cookie">The cookie of the completed operation, the call is
/// ignored when it isn't the cookie of the current operation.</param>
void MoveSuccess(object cookie) {
    lock (m_lock) {
        if (!ReferenceEquals(m_cookie, cookie))
            return;

        var current = State;
        if (current == ExecutionState.Initializing)
            State = ExecutionState.Ready;
        else if (current == ExecutionState.Starting)
            State = ExecutionState.Running;
        else if (current == ExecutionState.Stopping)
            State = ExecutionState.Stopped;
    }
}
/// <summary>
/// Completes the current operation with an error and moves the component to the failed state.
/// </summary>
/// <param name="err">The error to store in <see cref="LastError"/>.</param>
/// <param name="cookie">The cookie of the failed operation.</param>
/// <returns><c>false</c> if the cookie doesn't belong to the current operation
/// and the error has been ignored, otherwise <c>true</c>.</returns>
bool MoveFailed(Exception err, object cookie) {
    lock (m_lock) {
        var isCurrent = ReferenceEquals(m_cookie, cookie);
        if (isCurrent) {
            // the error is stored first since the state change event reports it
            LastError = err;
            State = ExecutionState.Failed;
        }
        return isCurrent;
    }
}
/// <summary>
/// Starts the specified routine as the current operation of the component and
/// reports its outcome to the state machine.
/// </summary>
/// <param name="next">The routine to run, it receives the cancellation token of the operation.</param>
/// <param name="ct">An external cancellation token for the operation.</param>
/// <param name="cookie">The cookie of the operation, passed to
/// MoveSuccess/MoveFailed when the routine completes.</param>
/// <returns>The task of the operation; it fails with the error of the routine.</returns>
Task ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
    var op = AsyncOperationDescriptor.Create(async (x) => {
        try {
            await next(x);
            MoveSuccess(cookie);
        } catch (Exception e) {
            MoveFailed(e, cookie);
            throw;
        }
    }, ct);
    // The routine has already been entered at this point; the descriptor is published
    // only now, so while starting the routine still sees the previous operation in m_current.
    m_current = op;
    return op.Task;
}
#endregion
}
}