##// END OF EJS Templates
refactoring
refactoring

File last commit:

r262:f1696cdc3d7a v3.0.8 v3
r275:6fefd5811b9b v3
Show More
RunnableComponent.cs
349 lines | 11.7 KiB | text/x-csharp | CSharpLexer
cin
Working on runnable component
r250 using System;
cin
Prerelease version of RunnableComponent...
r251 using System.Diagnostics;
cin
Working on runnable component
r250 using System.Threading;
using System.Threading.Tasks;
cin
Prerelease version of RunnableComponent...
r251 namespace Implab.Components {
/// <summary>
/// Base class for implementing components which support start and stop operations,
/// such components may represent running services.
/// </summary>
/// <remarks>
/// This class provides a basic lifecycle from the creation to the
/// termination of the component.
/// </remarks>
cin
working on runnable components
r257 public abstract class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {
cin
Prerelease version of RunnableComponent...
r251
/// <summary>
/// Binds the lifetime of a <see cref="CancellationTokenSource"/> to a task:
/// when the task completes, the associated token source is disposed.
/// </summary>
class AsyncOperationDescriptor {
    /// <summary>
    /// A descriptor that represents "no operation": its task is already
    /// completed and it has no token source, so <see cref="Cancel"/> does nothing.
    /// </summary>
    public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();

    // Null only for None. The instance also serves as the lock that guards m_done.
    readonly CancellationTokenSource m_cts;

    // Token captured while the source is still alive. Reading
    // CancellationTokenSource.Token after Dispose() throws
    // ObjectDisposedException, so the token is cached up front instead.
    readonly CancellationToken m_token;

    // Set once the task has finished and the token source has been disposed.
    bool m_done;

    /// <summary>
    /// The cancellation token passed to the operation, or
    /// <see cref="CancellationToken.None"/> for <see cref="None"/>.
    /// Safe to read after the operation has completed.
    /// </summary>
    public CancellationToken Token {
        get { return m_token; }
    }

    /// <summary>
    /// A task that completes with the outcome of the wrapped task, after the
    /// token source has been disposed.
    /// </summary>
    public Task Task { get; private set; }

    private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
        m_cts = cts;
        // Must be read before Chain(): an already completed task makes Chain
        // run Done() (and dispose the source) synchronously.
        m_token = cts.Token;
        Task = Chain(task);
    }

    private AsyncOperationDescriptor() {
        Task = Task.CompletedTask;
    }

    /// <summary>
    /// Requests cancellation of the operation. Does nothing once the
    /// operation has completed or when there is no token source.
    /// </summary>
    public void Cancel() {
        if (m_cts != null) {
            lock (m_cts) {
                // The source is disposed as soon as m_done is set, so it
                // must not be touched afterwards.
                if (!m_done)
                    m_cts.Cancel();
            }
        }
    }

    // Marks the operation as finished and releases the token source.
    void Done() {
        if (m_cts != null) {
            lock (m_cts) {
                m_done = true;
                m_cts.Dispose();
            }
        } else {
            m_done = true;
        }
    }

    // Awaits the wrapped task and releases the token source regardless of
    // the outcome; the outcome itself (result, fault or cancellation) is
    // propagated to the returned task.
    async Task Chain(Task other) {
        try {
            await other;
        } finally {
            Done();
        }
    }

    /// <summary>
    /// Creates a token source (linked to <paramref name="ct"/> when that token
    /// can be cancelled), invokes <paramref name="factory"/> with its token and
    /// wraps the returned task.
    /// </summary>
    /// <param name="factory">Starts the operation; receives the operation's token.</param>
    /// <param name="ct">External token which, when cancelled, also cancels the operation.</param>
    /// <returns>The descriptor of the started operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
    public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
        if (factory == null)
            throw new ArgumentNullException(nameof(factory));

        var cts = ct.CanBeCanceled ?
            CancellationTokenSource.CreateLinkedTokenSource(ct) :
            new CancellationTokenSource();

        Task task;
        try {
            task = factory(cts.Token);
        } catch {
            // The factory failed synchronously, so no descriptor will ever own
            // the source. Dispose it here; a linked source would otherwise
            // stay registered with the parent token.
            cts.Dispose();
            throw;
        }

        return new AsyncOperationDescriptor(task, cts);
    }
}
// this lock is used to synchronize state flow of the component during
cin
Removed obsolete App, ComponentContainer...
r256 // processing calls from a client and internal processes.
cin
Working on runnable component
r250 readonly object m_lock = new object();
cin
Prerelease version of RunnableComponent...
// Cookie of the current operation. It is used to check whether a call to
// MoveSuccess/MoveFailed belongs to the current operation; if the cookies
// don't match, the completion result is ignored.
object m_cookie;
cin
Working on runnable component
r250
cin
Removed obsolete App, ComponentContainer...
// AsyncOperationDescriptor aggregates a task and its cancellation token
cin
Prerelease version of RunnableComponent...
r251 AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
ExecutionState m_state;
cin
working on runnable components
/// <summary>
/// The synchronization object used to coordinate access to the shared state
/// (properties such as <see cref="State"/> and <see cref="LastError"/>)
/// between the component's client and the processes running inside the
/// component. Handlers of the <see cref="StateChanged"/> event are invoked
/// with this lock already held, so they need no additional synchronization.
/// </summary>
public object SynchronizationObject => m_lock;
cin
Prerelease version of RunnableComponent...
r251
/// <summary>
/// Creates the component in either the ready or the created state.
/// </summary>
/// <param name="initialized">
/// <c>true</c> to start in <see cref="ExecutionState.Ready"/>;
/// <c>false</c> to start in <see cref="ExecutionState.Created"/>.
/// </param>
protected RunnableComponent(bool initialized) {
    if (initialized)
        State = ExecutionState.Ready;
    else
        State = ExecutionState.Created;
}
cin
Prerelease version of RunnableComponent...
/// <summary>
/// The task of the most recently scheduled operation.
/// </summary>
public Task Completion => m_current.Task;
cin
Working on runnable component
r250
cin
Prerelease version of RunnableComponent...
/// <summary>
/// The current execution state of the component. Assigning a different value
/// stores it and dispatches <see cref="StateChanged"/> with the new state and
/// the current <see cref="LastError"/>; assigning the same value does nothing.
/// </summary>
public ExecutionState State {
    get { return m_state; }
    private set {
        // No transition, no notification.
        if (m_state == value)
            return;

        m_state = value;
        var args = new StateChangeEventArgs {
            State = value,
            LastError = LastError
        };
        StateChanged.DispatchEvent(this, args);
    }
}
/// <summary>
/// The exception recorded by the latest failure. It is assigned by
/// <c>Fail</c> and <c>MoveFailed</c> right before the state is switched to
/// <see cref="ExecutionState.Failed"/>.
/// </summary>
public Exception LastError { get; private set; }
cin
Working on runnable component
r250
cin
working on runnable components
/// <summary>
/// Raised when the state of the component changes. Handlers of this event
/// are invoked while the <see cref="SynchronizationObject"/> lock is held
/// and should therefore complete as quickly as possible.
/// </summary>
cin
Working on runnable component
r250 public event EventHandler<StateChangeEventArgs> StateChanged;
cin
Prerelease version of RunnableComponent...
/// <summary>
/// Releases all resources used by the component, whatever its current
/// execution state is.
/// </summary>
/// <remarks>
/// Disposing a component which isn't stopped may lead to unexpected results.
/// Call this method once the component has stopped, or when it is in the
/// failed state. Only the first call has any effect.
/// </remarks>
public void Dispose() {
    lock (SynchronizationObject) {
        // Already disposed: nothing left to do.
        if (m_state == ExecutionState.Disposed)
            return;

        // The field is written directly, so StateChanged is not raised here.
        m_state = ExecutionState.Disposed;
        // A fresh cookie makes any pending MoveSuccess/MoveFailed a no-op.
        m_cookie = new object();
    }

    // The actual cleanup runs outside of the lock.
    Dispose(true);
    GC.SuppressFinalize(this);
}
cin
Prerelease version of RunnableComponent...
// Finalizer: runs the cleanup in "not disposing" mode when Dispose() has not
// been called (Dispose() suppresses finalization).
~RunnableComponent() {
    Dispose(false);
}
/// <summary>
/// Cleanup hook for derived classes; the base implementation does nothing.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <see cref="Dispose()"/>, <c>false</c> when
/// called from the finalizer.
/// </param>
protected virtual void Dispose(bool disposing) {
}
/// <summary>
/// Starts the initialization of the component without an external
/// cancellation token.
/// </summary>
public void Initialize() => Initialize(CancellationToken.None);
/// <summary>
/// Moves the component to the initializing state and schedules
/// <see cref="InitializeInternalAsync(CancellationToken)"/>. The call does
/// not wait for the initialization to complete.
/// </summary>
/// <param name="ct">A cancellation token for the initialization.</param>
/// <exception cref="InvalidOperationException">
/// The component isn't in the <see cref="ExecutionState.Created"/> state.
/// </exception>
public void Initialize(CancellationToken ct) {
    var cookie = new object();

    if (!MoveInitialize(cookie))
        throw new InvalidOperationException();

    Safe.NoWait(ScheduleTask(InitializeInternalAsync, ct, cookie));
}
/// <summary>
/// Performs the initialization of the component; the default implementation
/// completes immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
/// <remarks>
/// Intended for short and mostly synchronous work. Operations which take
/// time to run should be placed in
/// <see cref="StartInternalAsync(CancellationToken)"/> instead.
/// </remarks>
protected virtual Task InitializeInternalAsync(CancellationToken ct) => Task.CompletedTask;
cin
Added IInitializable.Initialize() overload...
/// <summary>
/// Starts the component without an external cancellation token.
/// </summary>
public void Start() => Start(CancellationToken.None);
cin
Working on runnable component
/// <summary>
/// Moves the component to the starting state and schedules the start
/// operation. The call does not wait for the start to complete.
/// </summary>
/// <param name="ct">A cancellation token for the start operation.</param>
/// <exception cref="InvalidOperationException">
/// The component isn't in the <see cref="ExecutionState.Ready"/> state.
/// </exception>
public void Start(CancellationToken ct) {
    var cookie = new object();

    if (!MoveStart(cookie))
        throw new InvalidOperationException();

    Safe.NoWait(ScheduleStartAndRun(ct, cookie));
}
cin
working on runnable components
/// <summary>
/// Runs the start operation and, if the component has actually reached the
/// running state as the result of this very operation, calls
/// <see cref="RunInternal"/>. Any exception is passed to <see cref="Fail"/>.
/// </summary>
/// <param name="ct">A cancellation token for the start operation.</param>
/// <param name="cookie">The cookie identifying this start operation.</param>
async Task ScheduleStartAndRun(CancellationToken ct, object cookie) {
    try {
        await ScheduleTask(StartInternalAsync, ct, cookie);

        // The start may have been superseded while it was in progress (stop,
        // dispose or failure replaces the cookie). MoveSuccess ignores the
        // result in that case, so the component is not running and
        // RunInternal must not be invoked.
        bool running;
        lock (m_lock) {
            running = m_cookie == cookie && m_state == ExecutionState.Running;
        }

        if (running)
            RunInternal();
    } catch (Exception err) {
        Fail(err);
    }
}
cin
removed absolete Diagnostics classes
/// <summary>
/// Performs the actual start of the component; the default implementation
/// completes immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
protected virtual Task StartInternalAsync(CancellationToken ct) => Task.CompletedTask;
cin
working on runnable components
/// <summary>
/// Called after the start operation has completed without throwing. The
/// default implementation does nothing; override it to run the component's
/// own logic. An exception thrown from this method is passed to
/// <see cref="Fail"/>.
/// </summary>
protected virtual void RunInternal() {
}
cin
Added IInitializable.Initialize() overload...
/// <summary>
/// Stops the component without an external cancellation token.
/// </summary>
public void Stop() => Stop(CancellationToken.None);
cin
Prerelease version of RunnableComponent...
/// <summary>
/// Moves the component to the stopping state and schedules the stop
/// operation. The call does not wait for the stop to complete.
/// </summary>
/// <param name="ct">A cancellation token for the stop operation.</param>
/// <exception cref="InvalidOperationException">
/// The component is neither starting nor running.
/// </exception>
public void Stop(CancellationToken ct) {
    var cookie = new object();

    if (!MoveStop(cookie))
        throw new InvalidOperationException();

    Safe.NoWait(ScheduleTask(StopAsync, ct, cookie));
}
/// <summary>
/// Cancels the operation which is currently in progress, waits for it to
/// finish and then runs <see cref="StopInternalAsync(CancellationToken)"/>.
/// </summary>
/// <param name="ct">A cancellation token for the stop operation.</param>
async Task StopAsync(CancellationToken ct) {
    // This part runs synchronously inside ScheduleTask, before m_current is
    // replaced with the stop operation itself, so m_current still refers to
    // the operation being stopped.
    var previous = m_current;
    previous.Cancel();

    try {
        await previous.Task;
    } catch (OperationCanceledException) {
        // Expected: the previous operation has just been cancelled.
    }

    ct.ThrowIfCancellationRequested();
    await StopInternalAsync(ct);
}
/// <summary>
/// Performs the actual stop of the component; the default implementation
/// completes immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
protected virtual Task StopInternalAsync(CancellationToken ct) => Task.CompletedTask;
cin
Removed obsolete App, ComponentContainer...
/// <summary>
/// Switches a running component to the failed state. The call is ignored
/// when the component isn't in the <see cref="ExecutionState.Running"/> state.
/// </summary>
/// <param name="err">The error to store in <see cref="LastError"/>.</param>
protected void Fail(Exception err) {
    lock (m_lock) {
        if (m_state == ExecutionState.Running) {
            // Invalidate the cookie of any operation still in flight.
            m_cookie = new object();
            // The error is stored first so that the StateChanged event raised
            // by the State setter already carries it.
            LastError = err;
            State = ExecutionState.Failed;
        }
    }
}
cin
Prerelease version of RunnableComponent...
r251
#region state management
// Created -> Initializing. Records the cookie of the new operation and
// returns true; returns false without changes from any other state.
bool MoveInitialize(object cookie) {
    lock (m_lock) {
        var allowed = State == ExecutionState.Created;
        if (allowed) {
            State = ExecutionState.Initializing;
            m_cookie = cookie;
        }
        return allowed;
    }
}
// Ready -> Starting. Records the cookie of the new operation and returns
// true; returns false without changes from any other state.
bool MoveStart(object cookie) {
    lock (m_lock) {
        var allowed = State == ExecutionState.Ready;
        if (allowed) {
            State = ExecutionState.Starting;
            m_cookie = cookie;
        }
        return allowed;
    }
}
// Starting or Running -> Stopping. Records the cookie of the new operation
// and returns true; returns false without changes from any other state.
bool MoveStop(object cookie) {
    lock (m_lock) {
        var allowed = State == ExecutionState.Starting || State == ExecutionState.Running;
        if (allowed) {
            State = ExecutionState.Stopping;
            m_cookie = cookie;
        }
        return allowed;
    }
}
// Completes the operation identified by the cookie:
// Initializing -> Ready, Starting -> Running, Stopping -> Stopped.
// Ignored when the cookie doesn't belong to the current operation or the
// component is in any other state.
void MoveSuccess(object cookie) {
    lock (m_lock) {
        if (m_cookie == cookie) {
            var current = State;
            if (current == ExecutionState.Initializing)
                State = ExecutionState.Ready;
            else if (current == ExecutionState.Starting)
                State = ExecutionState.Running;
            else if (current == ExecutionState.Stopping)
                State = ExecutionState.Stopped;
        }
    }
}
cin
Added IInitializable.Initialize() overload...
// Fails the operation identified by the cookie: stores the error and moves
// the component to the Failed state. Returns false, leaving everything as
// is, when the cookie doesn't belong to the current operation.
bool MoveFailed(Exception err, object cookie) {
    lock (m_lock) {
        var current = m_cookie == cookie;
        if (current) {
            // The error is stored first so that the StateChanged event raised
            // by the State setter already carries it.
            LastError = err;
            State = ExecutionState.Failed;
        }
        return current;
    }
}
cin
working on runnable components
// Starts the operation 'next' and makes it the current one. On completion
// the state is advanced with MoveSuccess; on an exception MoveFailed is
// called and the exception is rethrown, so the returned task carries it.
Task ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
    Func<CancellationToken, Task> tracked = async token => {
        try {
            await next(token);
            MoveSuccess(cookie);
        } catch (Exception e) {
            MoveFailed(e, cookie);
            throw;
        }
    };

    // Create() invokes the delegate right away, so the synchronous part of
    // 'next' still observes the previous operation in m_current (StopAsync
    // relies on this). m_current is replaced only afterwards.
    var descriptor = AsyncOperationDescriptor.Create(tracked, ct);
    m_current = descriptor;
    return descriptor.Task;
}
cin
Prerelease version of RunnableComponent...
r251
#endregion
cin
Working on runnable component
r250 }
}