##// END OF EJS Templates
Fixed promise rejection when there is not specified error handler in the reaction....
Fixed promise rejection when there is not specified error handler in the reaction. FIXED SPELLING IN THE XML CONTAINER CONFIGURATION signleton->singleton Code cleanup Update tests make them working on dotnet core

File last commit:

r289:95896f882995 v3.0.14 v3
r295:28af686e24f7 default
Show More
RunnableComponent.cs
349 lines | 11.7 KiB | text/x-csharp | CSharpLexer
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Implab.Components {
/// <summary>
/// Base class for implementing components which support start and stop operations,
/// such components may represent running services.
/// </summary>
/// <remarks>
/// This class provides a basic lifecycle from the creation to the
/// termination of the component.
/// </remarks>
public abstract class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {
/// <summary>
/// This class bounds <see cref="CancellationTokenSource"/> lifetime to the task,
/// when the task completes the associated token source will be disposed.
/// </summary>
class AsyncOperationDescriptor {
public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();
readonly CancellationTokenSource m_cts;
bool m_done;
public CancellationToken Token {
get { return m_cts == null ? CancellationToken.None : m_cts.Token; }
}
public Task Task { get; private set; }
private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
m_cts = cts;
Task = Chain(task);
}
private AsyncOperationDescriptor() {
Task = Task.CompletedTask;
}
public void Cancel() {
if (m_cts != null) {
lock (m_cts) {
if (!m_done)
m_cts.Cancel();
}
}
}
void Done() {
if (m_cts != null) {
lock (m_cts) {
m_done = true;
m_cts.Dispose();
}
} else {
m_done = true;
}
}
async Task Chain(Task other) {
try {
await other;
} finally {
Done();
}
}
public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
var cts = ct.CanBeCanceled ?
CancellationTokenSource.CreateLinkedTokenSource(ct) :
new CancellationTokenSource();
return new AsyncOperationDescriptor(factory(cts.Token), cts);
}
}
// this lock is used to synchronize state flow of the component during
// processing calls from a client and internal processes.
readonly object m_lock = new object();
// current operation cookie, used to check wheather a call to
// MoveSuccess/MoveFailed method belongs to the current
// operation, if cookies didn't match ignore completion result.
object m_cookie;
// AsyncOperationDscriptor aggregates a task and it's cancellation token
AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
ExecutionState m_state;
/// <summary>
/// Объект синхронизации используется для обеспечения совместного доступа
/// клиента компоненты и процессов, протекающих внутри компоненты, к общему
/// состоянию, т.е.true таким свойствам, как <see cref="State"/>,
/// <see cref="LastError"/>. Обработчики события <see cref="StateChanged"/>
/// вызываются уже с установленной блокировкой, поэтому дополнительная
/// синхронизация не требуется.
/// </summary>
public object SynchronizationObject { get { return m_lock; } }
protected RunnableComponent(bool initialized) {
State = initialized ? ExecutionState.Ready : ExecutionState.Created;
}
public Task Completion {
get { return m_current.Task; }
}
public ExecutionState State {
get { return m_state; }
private set {
if (m_state != value) {
m_state = value;
StateChanged.DispatchEvent(this, new StateChangeEventArgs {
State = value,
LastError = LastError
});
}
}
}
public Exception LastError { get; private set; }
/// <summary>
/// Событие изменения состояния компоненты.see Обработчики данного события
/// вызываются внутри блокировки <see cref="SynchronizationObject"/> и должны
/// выполняться максимально быстро.
/// </summary>
public event EventHandler<StateChangeEventArgs> StateChanged;
/// <summary>
/// Releases all resources used by the current component regardless of its
/// execution state.
/// </summary>
/// <remarks>
/// Calling to this method may result unexpedted results if the component
/// isn't in the stopped state. Call this method after the component is
/// stopped if needed or if the component is in the failed state.
/// </remarks>
public void Dispose() {
bool dispose = false;
lock (SynchronizationObject) {
if (m_state != ExecutionState.Disposed) {
dispose = true;
m_state = ExecutionState.Disposed;
m_cookie = new object();
}
}
if (dispose) {
Dispose(true);
GC.SuppressFinalize(this);
}
}
~RunnableComponent() {
Dispose(false);
}
/// <summary>
/// Releases all resources used by the current component regardless of its
/// execution state.
/// </summary>
/// <param name="disposing">Indicates that the component is disposed
/// during a normal disposing or during GC.</param>
protected virtual void Dispose(bool disposing) {
}
public void Initialize() {
Initialize(CancellationToken.None);
}
public void Initialize(CancellationToken ct) {
var cookie = new object();
if (MoveInitialize(cookie))
Safe.NoWait(ScheduleTask(InitializeInternalAsync, ct, cookie));
else
throw new InvalidOperationException();
}
/// <summary>
/// This method is used for initialization during a component creation.
/// </summary>
/// <param name="ct">A cancellation token for this operation</param>
/// <remarks>
/// This method should be used for short and mostly syncronous operations,
/// other operations which require time to run shoud be placed in
/// <see cref="StartInternalAsync(CancellationToken)"/> method.
/// </remarks>
protected virtual Task InitializeInternalAsync(CancellationToken ct) {
return Task.CompletedTask;
}
public void Start() {
Start(CancellationToken.None);
}
public void Start(CancellationToken ct) {
var cookie = new object();
if (MoveStart(cookie))
Safe.NoWait(ScheduleStartAndRun(ct, cookie));
else
throw new InvalidOperationException();
}
async Task ScheduleStartAndRun(CancellationToken ct, object cookie) {
try {
await ScheduleTask(StartInternalAsync, ct, cookie);
RunInternal();
} catch (Exception err) {
Fail(err);
}
}
protected virtual Task StartInternalAsync(CancellationToken ct) {
return Task.CompletedTask;
}
/// <summary>
/// This method is called after the component is enetered running state,
/// use this method to
/// </summary>
protected virtual void RunInternal() {
}
public void Stop() {
Stop(CancellationToken.None);
}
public void Stop(CancellationToken ct) {
var cookie = new object();
if (MoveStop(cookie))
Safe.NoWait(ScheduleTask(StopAsync, ct, cookie));
else
throw new InvalidOperationException();
}
async Task StopAsync(CancellationToken ct) {
m_current.Cancel();
try {
await Completion;
} catch(OperationCanceledException) {
// OK
}
ct.ThrowIfCancellationRequested();
await StopInternalAsync(ct);
}
protected virtual Task StopInternalAsync(CancellationToken ct) {
return Task.CompletedTask;
}
protected void Fail(Exception err) {
lock(m_lock) {
if (m_state != ExecutionState.Running)
return;
m_cookie = new object();
LastError = err;
State = ExecutionState.Failed;
}
}
#region state management
bool MoveInitialize(object cookie) {
lock (m_lock) {
if (State != ExecutionState.Created)
return false;
State = ExecutionState.Initializing;
m_cookie = cookie;
return true;
}
}
bool MoveStart(object cookie) {
lock (m_lock) {
if (State != ExecutionState.Ready)
return false;
State = ExecutionState.Starting;
m_cookie = cookie;
return true;
}
}
bool MoveStop(object cookie) {
lock (m_lock) {
if (State != ExecutionState.Starting && State != ExecutionState.Running)
return false;
State = ExecutionState.Stopping;
m_cookie = cookie;
return true;
}
}
void MoveSuccess(object cookie) {
lock (m_lock) {
if (m_cookie != cookie)
return;
switch (State) {
case ExecutionState.Initializing:
State = ExecutionState.Ready;
break;
case ExecutionState.Starting:
State = ExecutionState.Running;
break;
case ExecutionState.Stopping:
State = ExecutionState.Stopped;
break;
}
}
}
bool MoveFailed(Exception err, object cookie) {
lock (m_lock) {
if (m_cookie != cookie)
return false;
LastError = err;
State = ExecutionState.Failed;
return true;
}
}
Task ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
var op = AsyncOperationDescriptor.Create(async (x) => {
try {
await next(x);
MoveSuccess(cookie);
} catch (Exception e) {
MoveFailed(e, cookie);
throw;
}
}, ct);
m_current = op;
return op.Task;
}
#endregion
}
}