##// END OF EJS Templates
Merge pull request !2 from ImplabNet v3...
Merge pull request !2 from ImplabNet v3 Changes from branch: V3

File last commit:

r289:95896f882995 v3.0.14 v3
r294:abef3ebaa230 merge default
Show More
RunnableComponent.cs
349 lines | 11.7 KiB | text/x-csharp | CSharpLexer
/ Implab / src / Components / RunnableComponent.cs
cin
Added tests for Implab.ServiceHost.Unity configuration loader.
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Implab.Components {
/// <summary>
/// Base class for implementing components which support start and stop operations,
/// such components may represent running services.
/// </summary>
/// <remarks>
/// This class provides a basic lifecycle from the creation to the
/// termination of the component.
/// </remarks>
public abstract class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {
/// <summary>
/// Binds the lifetime of a <see cref="CancellationTokenSource"/> to a task:
/// when the task completes the associated token source is disposed.
/// </summary>
class AsyncOperationDescriptor {
    /// <summary>
    /// A descriptor of an already completed operation which has no token source.
    /// </summary>
    public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();

    readonly CancellationTokenSource m_cts;

    // The token is captured while the source is still alive: reading
    // CancellationTokenSource.Token after Dispose() throws
    // ObjectDisposedException, and the source is disposed as soon as the
    // task completes.
    readonly CancellationToken m_token;

    // Set once the task has completed; when a token source exists this flag
    // is read and written under lock (m_cts).
    bool m_done;

    /// <summary>
    /// The cancellation token of this operation, or
    /// <see cref="CancellationToken.None"/> when there is no token source.
    /// The token stays readable after the operation has completed.
    /// </summary>
    public CancellationToken Token {
        get { return m_token; }
    }

    /// <summary>
    /// The task of the operation; it completes after the token source has
    /// been disposed.
    /// </summary>
    public Task Task { get; private set; }

    private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
        m_cts = cts;
        // must be captured before Chain(): the task may already be completed,
        // in which case Chain() disposes the source synchronously
        m_token = cts == null ? CancellationToken.None : cts.Token;
        Task = Chain(task);
    }

    private AsyncOperationDescriptor() {
        Task = Task.CompletedTask;
    }

    /// <summary>
    /// Requests cancellation of the operation. Does nothing when the
    /// operation has already completed or when there is no token source.
    /// </summary>
    public void Cancel() {
        if (m_cts != null) {
            lock (m_cts) {
                // the source is disposed once m_done is set
                if (!m_done)
                    m_cts.Cancel();
            }
        }
    }

    // Marks the operation as completed and disposes the token source.
    void Done() {
        if (m_cts != null) {
            lock (m_cts) {
                m_done = true;
                m_cts.Dispose();
            }
        } else {
            m_done = true;
        }
    }

    // Awaits the specified task and releases the token source regardless of
    // the outcome; the result or error of the task is propagated to the caller.
    async Task Chain(Task other) {
        try {
            await other;
        } finally {
            Done();
        }
    }

    /// <summary>
    /// Creates a token source (linked to <paramref name="ct"/> when it can be
    /// canceled), starts the operation using the specified factory and wraps
    /// the resulting task.
    /// </summary>
    /// <param name="factory">Starts the operation with the supplied token.</param>
    /// <param name="ct">An external cancellation token for the operation.</param>
    /// <returns>The descriptor of the started operation.</returns>
    /// <remarks>
    /// If the factory throws synchronously the created token source is
    /// disposed and the exception is rethrown.
    /// </remarks>
    public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
        var cts = ct.CanBeCanceled ?
            CancellationTokenSource.CreateLinkedTokenSource(ct) :
            new CancellationTokenSource();

        Task task;
        try {
            task = factory(cts.Token);
        } catch {
            // no descriptor will own the source, release it here
            cts.Dispose();
            throw;
        }
        return new AsyncOperationDescriptor(task, cts);
    }
}
// this lock is used to synchronize state flow of the component during
// processing calls from a client and internal processes.
readonly object m_lock = new object();
// current operation cookie, used to check whether a call to the
// MoveSuccess/MoveFailed method belongs to the current
// operation; if the cookies don't match the completion result is ignored.
object m_cookie;
// AsyncOperationDescriptor aggregates a task and its cancellation token
AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
// current execution state of the component, exposed through the State property
ExecutionState m_state;
/// <summary>
/// The synchronization object used to coordinate access of the component's
/// client and of the processes running inside the component to the shared
/// state, i.e. to properties such as <see cref="State"/> and
/// <see cref="LastError"/>. Handlers of the <see cref="StateChanged"/> event
/// are invoked with this lock already held, therefore they don't need any
/// additional synchronization.
/// </summary>
public object SynchronizationObject => m_lock;
/// <summary>
/// Creates the component in its initial state.
/// </summary>
/// <param name="initialized"><c>true</c> to start in the
/// <see cref="ExecutionState.Ready"/> state, <c>false</c> to start in the
/// <see cref="ExecutionState.Created"/> state.</param>
protected RunnableComponent(bool initialized) {
    if (initialized)
        State = ExecutionState.Ready;
    else
        State = ExecutionState.Created;
}
/// <summary>
/// The task of the operation currently stored as the current one.
/// </summary>
public Task Completion => m_current.Task;
/// <summary>
/// The current execution state of the component. Assigning a different
/// value dispatches the <see cref="StateChanged"/> event; assigning the
/// same value does nothing.
/// </summary>
public ExecutionState State {
    get { return m_state; }
    private set {
        // nothing changes, no notification
        if (m_state == value)
            return;

        m_state = value;
        StateChanged.DispatchEvent(
            this,
            new StateChangeEventArgs { State = value, LastError = LastError }
        );
    }
}
/// <summary>
/// The error which moved the component to the failed state; assigned by
/// <see cref="Fail(Exception)"/> and when a scheduled operation fails.
/// </summary>
public Exception LastError { get; private set; }
/// <summary>
/// The event raised when the state of the component changes. Handlers of
/// this event are invoked inside the <see cref="SynchronizationObject"/>
/// lock and should complete as quickly as possible.
/// </summary>
public event EventHandler<StateChangeEventArgs> StateChanged;
/// <summary>
/// Releases all resources used by the current component regardless of its
/// execution state.
/// </summary>
/// <remarks>
/// Calling this method may lead to unexpected results if the component
/// isn't in the stopped state. Call this method after the component is
/// stopped if needed or if the component is in the failed state.
/// Repeated calls are ignored.
/// </remarks>
public void Dispose() {
    lock (SynchronizationObject) {
        // already disposed, nothing to do
        if (m_state == ExecutionState.Disposed)
            return;

        m_state = ExecutionState.Disposed;
        // a fresh cookie makes completions of pending operations ignored
        m_cookie = new object();
    }

    // the actual cleanup runs outside of the lock
    Dispose(true);
    GC.SuppressFinalize(this);
}
// Finalizer: releases resources through Dispose(false) when the component
// wasn't disposed explicitly (Dispose() suppresses the finalization).
~RunnableComponent() {
Dispose(false);
}
/// <summary>
/// Releases all resources used by the current component regardless of its
/// execution state. The base implementation does nothing.
/// </summary>
/// <param name="disposing"><c>true</c> when called from
/// <see cref="Dispose()"/>, <c>false</c> when called from the finalizer
/// during GC.</param>
protected virtual void Dispose(bool disposing) {
}
/// <summary>
/// Starts the initialization of the component without an external
/// cancellation token.
/// </summary>
public void Initialize() => Initialize(CancellationToken.None);

/// <summary>
/// Starts the initialization of the component.
/// </summary>
/// <param name="ct">A cancellation token for this operation</param>
/// <exception cref="InvalidOperationException">The component isn't in the
/// <see cref="ExecutionState.Created"/> state.</exception>
public void Initialize(CancellationToken ct) {
    var operationCookie = new object();

    if (!MoveInitialize(operationCookie))
        throw new InvalidOperationException();

    // the operation isn't awaited here, its task is handed to Safe.NoWait
    Safe.NoWait(ScheduleTask(InitializeInternalAsync, ct, operationCookie));
}
/// <summary>
/// This method is used for initialization during a component creation.
/// The base implementation returns a completed task.
/// </summary>
/// <param name="ct">A cancellation token for this operation</param>
/// <remarks>
/// This method should be used for short and mostly synchronous operations,
/// other operations which require time to run should be placed in the
/// <see cref="StartInternalAsync(CancellationToken)"/> method.
/// </remarks>
protected virtual Task InitializeInternalAsync(CancellationToken ct) => Task.CompletedTask;
/// <summary>
/// Starts the component without an external cancellation token.
/// </summary>
public void Start() => Start(CancellationToken.None);

/// <summary>
/// Starts the component.
/// </summary>
/// <param name="ct">A cancellation token for this operation</param>
/// <exception cref="InvalidOperationException">The component isn't in the
/// <see cref="ExecutionState.Ready"/> state.</exception>
public void Start(CancellationToken ct) {
    var operationCookie = new object();

    if (!MoveStart(operationCookie))
        throw new InvalidOperationException();

    // the operation isn't awaited here, its task is handed to Safe.NoWait
    Safe.NoWait(ScheduleStartAndRun(ct, operationCookie));
}
// Runs the start sequence: schedules StartInternalAsync as the current
// operation, awaits it and then calls RunInternal. An exception from either
// step is passed to Fail, which changes the state only when the component
// is in the Running state.
async Task ScheduleStartAndRun(CancellationToken ct, object cookie) {
try {
await ScheduleTask(StartInternalAsync, ct, cookie);
// NOTE(review): RunInternal is invoked whenever the start task completes
// without an error, even if MoveSuccess ignored the completion because the
// cookie has been replaced (e.g. by Stop or Dispose) - confirm this is intended.
RunInternal();
} catch (Exception err) {
Fail(err);
}
}
/// <summary>
/// Performs the actual start of the component, it is scheduled as the
/// start operation. The base implementation returns a completed task.
/// </summary>
/// <param name="ct">A cancellation token for this operation</param>
protected virtual Task StartInternalAsync(CancellationToken ct) => Task.CompletedTask;
/// <summary>
/// This method is called by the start sequence after the task returned by
/// <see cref="StartInternalAsync(CancellationToken)"/> has completed without
/// an error. An exception thrown from this method is passed to
/// <see cref="Fail(Exception)"/>. The base implementation does nothing.
/// </summary>
/// <remarks>
/// NOTE(review): the original summary ended with "use this method to" - the
/// intended usage should be confirmed and documented here.
/// </remarks>
protected virtual void RunInternal() {
}
/// <summary>
/// Stops the component without an external cancellation token.
/// </summary>
public void Stop() => Stop(CancellationToken.None);

/// <summary>
/// Stops the component.
/// </summary>
/// <param name="ct">A cancellation token for this operation</param>
/// <exception cref="InvalidOperationException">The component is neither in
/// the <see cref="ExecutionState.Starting"/> nor in the
/// <see cref="ExecutionState.Running"/> state.</exception>
public void Stop(CancellationToken ct) {
    var operationCookie = new object();

    if (!MoveStop(operationCookie))
        throw new InvalidOperationException();

    // the operation isn't awaited here, its task is handed to Safe.NoWait
    Safe.NoWait(ScheduleTask(StopAsync, ct, operationCookie));
}
// The body of the stop operation: cancels the previous operation, waits for
// it to finish and then runs StopInternalAsync.
//
// ScheduleTask assigns m_current only after the operation delegate has been
// invoked, so when the first statements of this method run m_current and
// Completion still refer to the previously scheduled operation, not to the
// stop operation itself. Don't reorder these statements.
async Task StopAsync(CancellationToken ct) {
m_current.Cancel();
try {
await Completion;
} catch(OperationCanceledException) {
// OK
// the previous operation was canceled, which is the expected outcome
// of the Cancel() call above; other errors are propagated
}
// don't proceed with the stop if the stop operation itself is canceled
ct.ThrowIfCancellationRequested();
await StopInternalAsync(ct);
}
/// <summary>
/// Performs the actual stop of the component, it is awaited by the stop
/// operation after the previous operation has finished. The base
/// implementation returns a completed task.
/// </summary>
/// <param name="ct">A cancellation token for this operation</param>
protected virtual Task StopInternalAsync(CancellationToken ct) => Task.CompletedTask;
/// <summary>
/// Moves the running component to the failed state. The call is ignored
/// when the component isn't in the <see cref="ExecutionState.Running"/>
/// state.
/// </summary>
/// <param name="err">The error to store in <see cref="LastError"/>.</param>
protected void Fail(Exception err) {
    lock (m_lock) {
        if (m_state == ExecutionState.Running) {
            // a fresh cookie makes completions of pending operations ignored
            m_cookie = new object();
            LastError = err;
            State = ExecutionState.Failed;
        }
    }
}
#region state management
// Tries to move the component from Created to Initializing and to make the
// specified cookie the current one. Returns false and changes nothing when
// the component is in any other state.
bool MoveInitialize(object cookie) {
    lock (m_lock) {
        var allowed = State == ExecutionState.Created;
        if (allowed) {
            State = ExecutionState.Initializing;
            m_cookie = cookie;
        }
        return allowed;
    }
}
// Tries to move the component from Ready to Starting and to make the
// specified cookie the current one. Returns false and changes nothing when
// the component is in any other state.
bool MoveStart(object cookie) {
    lock (m_lock) {
        var allowed = State == ExecutionState.Ready;
        if (allowed) {
            State = ExecutionState.Starting;
            m_cookie = cookie;
        }
        return allowed;
    }
}
// Tries to move the component from Starting or Running to Stopping and to
// make the specified cookie the current one. Returns false and changes
// nothing when the component is in any other state.
bool MoveStop(object cookie) {
    lock (m_lock) {
        var stoppable = State == ExecutionState.Starting || State == ExecutionState.Running;
        if (!stoppable)
            return false;

        State = ExecutionState.Stopping;
        m_cookie = cookie;
        return true;
    }
}
// Completes the transition started by the operation which owns the
// specified cookie: Initializing -> Ready, Starting -> Running,
// Stopping -> Stopped. The call is ignored when the cookie isn't the
// current one; other states are left unchanged.
void MoveSuccess(object cookie) {
    lock (m_lock) {
        if (ReferenceEquals(m_cookie, cookie)) {
            var current = State;
            if (current == ExecutionState.Initializing)
                State = ExecutionState.Ready;
            else if (current == ExecutionState.Starting)
                State = ExecutionState.Running;
            else if (current == ExecutionState.Stopping)
                State = ExecutionState.Stopped;
        }
    }
}
// Moves the component to the Failed state and stores the error when the
// specified cookie is the current one. Returns false and changes nothing
// when the cookie belongs to another operation.
bool MoveFailed(Exception err, object cookie) {
    lock (m_lock) {
        var isCurrent = ReferenceEquals(m_cookie, cookie);
        if (isCurrent) {
            // the error is stored first, the state change notification reads it
            LastError = err;
            State = ExecutionState.Failed;
        }
        return isCurrent;
    }
}
// Starts the specified operation and makes it the current one. The outcome
// of the operation is reported through MoveSuccess/MoveFailed together with
// the cookie, a failure is also propagated through the returned task.
//
// The operation delegate is invoked inside AsyncOperationDescriptor.Create,
// i.e. before m_current is replaced, so its first synchronous statements
// still observe the previous operation in m_current.
Task ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
    Func<CancellationToken, Task> tracked = async token => {
        try {
            await next(token);
            MoveSuccess(cookie);
        } catch (Exception error) {
            MoveFailed(error, cookie);
            throw;
        }
    };

    var descriptor = AsyncOperationDescriptor.Create(tracked, ct);
    m_current = descriptor;
    return descriptor.Task;
}
#endregion
}
}