##// END OF EJS Templates
Removed obsolete App, ComponentContainer...
Removed obsolete App, ComponentContainer Extracted IAsyncComponent interface Working on RunnableComponent

File last commit:

r256:c52691faaf21 v3
r256:c52691faaf21 v3
Show More
RunnableComponent.cs
289 lines | 9.3 KiB | text/x-csharp | CSharpLexer
cin
Working on runnable component
r250 using System;
cin
Prerelease version of RunnableComponent...
r251 using System.Diagnostics;
cin
Working on runnable component
r250 using System.Threading;
using System.Threading.Tasks;
cin
Prerelease version of RunnableComponent...
r251 namespace Implab.Components {
/// <summary>
/// Base class for implementing components which support start and stop operations,
/// such components may represent running services.
/// </summary>
/// <remarks>
/// This class provides a basic lifecycle from the creation to the
/// termination of the component.
/// </remarks>
cin
Removed obsolete App, ComponentContainer...
r256 public class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {
cin
Prerelease version of RunnableComponent...
r251
/// <summary>
/// Binds the lifetime of a <see cref="CancellationTokenSource"/> to a task:
/// when the task completes the associated token source is disposed.
/// </summary>
class AsyncOperationDescriptor {
    /// <summary>
    /// An already completed operation which has no cancellation source.
    /// </summary>
    public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();

    readonly CancellationTokenSource m_cts;

    // The token is captured once at construction. Reading
    // CancellationTokenSource.Token after the source is disposed throws
    // ObjectDisposedException, and the source is disposed as soon as the
    // task completes, so the getter must not touch m_cts.
    readonly CancellationToken m_token;

    // Set once the task has completed and m_cts has been disposed;
    // guarded by lock(m_cts) when a token source exists.
    bool m_done;

    /// <summary>
    /// The token observed by the operation, or <see cref="CancellationToken.None"/>
    /// when the descriptor has no token source. Safe to read after completion.
    /// </summary>
    public CancellationToken Token {
        get { return m_token; }
    }

    /// <summary>
    /// The task which completes together with the wrapped operation, after
    /// the token source has been disposed.
    /// </summary>
    public Task Task { get; private set; }

    private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
        m_cts = cts;
        // must be read before Chain() is called: the chain may complete
        // synchronously and dispose the source
        m_token = cts.Token;
        Task = Chain(task);
    }

    private AsyncOperationDescriptor() {
        Task = Task.CompletedTask;
    }

    /// <summary>
    /// Requests cancellation of the operation; does nothing when there is no
    /// token source or the operation has already completed.
    /// </summary>
    public void Cancel() {
        if (m_cts != null) {
            lock (m_cts) {
                // the source is disposed once m_done is set
                if (!m_done)
                    m_cts.Cancel();
            }
        }
    }

    // Marks the operation as completed and releases the token source.
    void Done() {
        if (m_cts != null) {
            lock (m_cts) {
                m_done = true;
                m_cts.Dispose();
            }
        } else {
            m_done = true;
        }
    }

    // Awaits the operation and releases the token source whatever the outcome.
    async Task Chain(Task other) {
        try {
            await other;
        } finally {
            Done();
        }
    }

    /// <summary>
    /// Starts an operation using the specified factory and wraps it into a descriptor.
    /// </summary>
    /// <param name="factory">Creates the task, it receives the token owned by the descriptor.</param>
    /// <param name="ct">An external token; when it can be canceled the descriptor's token is linked to it.</param>
    /// <returns>The descriptor of the started operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
    public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
        if (factory == null)
            throw new ArgumentNullException(nameof(factory));

        var cts = ct.CanBeCanceled ?
            CancellationTokenSource.CreateLinkedTokenSource(ct) :
            new CancellationTokenSource();

        Task task;
        try {
            task = factory(cts.Token);
        } catch {
            // the factory failed synchronously, nobody else will release the source
            cts.Dispose();
            throw;
        }

        return new AsyncOperationDescriptor(task, cts);
    }
}
// this lock is used to synchronize state flow of the component during
cin
Removed obsolete App, ComponentContainer...
r256 // processing calls from a client and internal processes.
cin
Working on runnable component
r250 readonly object m_lock = new object();
cin
Prerelease version of RunnableComponent...
r251 // current operation cookie, used to check whether a call to
// MoveSuccess/MoveFailed method belongs to the current
// operation; if the cookies don't match, the completion result is ignored.
object m_cookie;
cin
Working on runnable component
r250
cin
Removed obsolete App, ComponentContainer...
r256 // AsyncOperationDescriptor aggregates a task and its cancellation token
cin
Prerelease version of RunnableComponent...
r251 AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
ExecutionState m_state;
/// <summary>
/// Creates the component in either the ready or the created state.
/// </summary>
/// <param name="initialized">When true the component starts in
/// <c>ExecutionState.Ready</c>, otherwise in <c>ExecutionState.Created</c>.</param>
protected RunnableComponent(bool initialized) {
    if (initialized)
        State = ExecutionState.Ready;
    else
        State = ExecutionState.Created;
}
cin
Prerelease version of RunnableComponent...
/// <summary>
/// The task of the most recently scheduled operation.
/// </summary>
public Task Completion => m_current.Task;
cin
Working on runnable component
r250
cin
Prerelease version of RunnableComponent...
/// <summary>
/// The current execution state. Assigning a different value stores it and
/// dispatches <see cref="StateChanged"/> with the new state and the current
/// <see cref="LastError"/>; assigning the same value does nothing.
/// </summary>
public ExecutionState State {
    get { return m_state; }
    private set {
        // no transition, no notification
        if (m_state == value)
            return;

        m_state = value;

        var handler = StateChanged;
        handler.DispatchEvent(this, new StateChangeEventArgs {
            State = value,
            LastError = LastError
        });
    }
}
public Exception LastError { get; private set; }
cin
Working on runnable component
r250
public event EventHandler<StateChangeEventArgs> StateChanged;
cin
Prerelease version of RunnableComponent...
// Set by the first call to Dispose(), guarded by m_lock.
bool m_disposed;

/// <summary>
/// Releases all resources used by the current component regardless of its
/// execution state.
/// </summary>
/// <remarks>
/// Calling this method may lead to unexpected results if the component
/// isn't in the stopped state. Call this method after the component is
/// stopped if needed or if the component is in the failed state.
/// Only the first call releases the resources, subsequent calls do nothing.
/// </remarks>
public void Dispose() {
    bool dispose;

    // decide under the lock whether this call is the one that disposes
    lock (m_lock) {
        dispose = !m_disposed;
        m_disposed = true;
    }

    if (dispose) {
        Dispose(true);
        // the resources are released, the finalizer has nothing left to do
        GC.SuppressFinalize(this);
    }
}
cin
Prerelease version of RunnableComponent...
// Finalizer: runs the cleanup routine with disposing set to false.
~RunnableComponent() {
    Dispose(disposing: false);
}
/// <summary>
/// Cleanup hook for derived classes, invoked both from <see cref="Dispose()"/>
/// and from the finalizer. The base implementation does nothing.
/// </summary>
/// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>,
/// <c>false</c> when called from the finalizer during GC.</param>
protected virtual void Dispose(bool disposing) {
    // nothing to release in the base class
}
/// <summary>
/// Begins the initialization of the component.
/// </summary>
/// <exception cref="InvalidOperationException">The component isn't in the
/// created state.</exception>
public void Initialize() {
    var token = new object();

    // the transition is rejected unless the component has just been created
    if (!MoveInitialize(token))
        throw new InvalidOperationException();

    ScheduleTask(InitializeInternalAsync, CancellationToken.None, token);
}
/// <summary>
/// Performs the initialization of the component, scheduled by
/// <see cref="Initialize()"/>. The base implementation completes immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
/// <remarks>
/// Intended for short and mostly synchronous operations; operations which
/// require time to run should be placed in the
/// <see cref="StartInternalAsync(CancellationToken)"/> method.
/// </remarks>
protected virtual Task InitializeInternalAsync(CancellationToken ct) => Task.CompletedTask;
/// <summary>
/// Begins starting the component.
/// </summary>
/// <param name="ct">A token passed to the scheduled start operation.</param>
/// <exception cref="InvalidOperationException">The component isn't in the
/// ready state.</exception>
public void Start(CancellationToken ct) {
    var token = new object();

    // only a ready component can be started
    if (!MoveStart(token))
        throw new InvalidOperationException();

    ScheduleTask(StartInternalAsync, ct, token);
}
cin
removed absolete Diagnostics classes
/// <summary>
/// Performs the start of the component, scheduled by
/// <see cref="Start(CancellationToken)"/>. The base implementation completes
/// immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
protected virtual Task StartInternalAsync(CancellationToken ct) => Task.CompletedTask;
/// <summary>
/// Begins stopping the component.
/// </summary>
/// <param name="ct">A token passed to the scheduled stop operation.</param>
/// <exception cref="InvalidOperationException">The component is neither
/// starting nor running.</exception>
public void Stop(CancellationToken ct) {
    var token = new object();

    // stopping is allowed only while starting or running
    if (!MoveStop(token))
        throw new InvalidOperationException();

    ScheduleTask(StopAsync, ct, token);
}
// Stop sequence: cancels the operation referenced by m_current, waits for it
// to finish and then runs StopInternalAsync.
//
// NOTE(review): when scheduled through ScheduleTask this method starts running
// inside AsyncOperationDescriptor.Create, i.e. before the new descriptor is
// assigned to m_current, so m_current/Completion below appear to still refer
// to the previously scheduled operation. Keep the statement order intact.
async Task StopAsync(CancellationToken ct) {
// request cancellation of the operation currently stored in m_current
m_current.Cancel();
// wait until that operation has finished
await Completion;
// give up if the stop itself has been cancelled in the meantime
ct.ThrowIfCancellationRequested();
await StopInternalAsync(ct);
}
/// <summary>
/// Performs the stop of the component, awaited at the end of the stop
/// sequence. The base implementation completes immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
protected virtual Task StopInternalAsync(CancellationToken ct) => Task.CompletedTask;
cin
Removed obsolete App, ComponentContainer...
/// <summary>
/// Moves a running component to the failed state.
/// </summary>
/// <param name="err">The error to store in <see cref="LastError"/>.</param>
/// <remarks>
/// Does nothing unless the component is running. The operation cookie is
/// replaced, so completions reported with an earlier cookie are ignored.
/// </remarks>
protected void Fail(Exception err) {
    lock (m_lock) {
        if (m_state == ExecutionState.Running) {
            // a fresh cookie invalidates whatever operation is still pending
            m_cookie = new object();
            LastError = err;
            State = ExecutionState.Failed;
        }
    }
}
cin
Prerelease version of RunnableComponent...
r251
#region state management
// Created -> Initializing. Returns false and changes nothing when the
// component isn't in the created state; otherwise stores the cookie of the
// new operation.
bool MoveInitialize(object cookie) {
    lock (m_lock) {
        var allowed = State == ExecutionState.Created;
        if (allowed) {
            State = ExecutionState.Initializing;
            m_cookie = cookie;
        }
        return allowed;
    }
}
// Ready -> Starting. Returns false and changes nothing when the component
// isn't in the ready state; otherwise stores the cookie of the new operation.
bool MoveStart(object cookie) {
    lock (m_lock) {
        var allowed = State == ExecutionState.Ready;
        if (allowed) {
            State = ExecutionState.Starting;
            m_cookie = cookie;
        }
        return allowed;
    }
}
// Starting|Running -> Stopping. Returns false and changes nothing from any
// other state; otherwise stores the cookie of the new operation.
bool MoveStop(object cookie) {
    lock (m_lock) {
        var current = State;
        if (current == ExecutionState.Starting || current == ExecutionState.Running) {
            State = ExecutionState.Stopping;
            m_cookie = cookie;
            return true;
        }
        return false;
    }
}
// Completes the transition of the operation identified by the cookie:
// Initializing -> Ready, Starting -> Running, Stopping -> Stopped.
// A cookie which doesn't match the current one is ignored; other states
// are left unchanged.
void MoveSuccess(object cookie) {
    lock (m_lock) {
        // the result belongs to an operation which is no longer current
        if (!ReferenceEquals(m_cookie, cookie))
            return;

        var current = State;
        if (current == ExecutionState.Initializing)
            State = ExecutionState.Ready;
        else if (current == ExecutionState.Starting)
            State = ExecutionState.Running;
        else if (current == ExecutionState.Stopping)
            State = ExecutionState.Stopped;
    }
}
cin
Prerelease version of RunnableComponent...
// Records the failure of the operation identified by the cookie and moves
// the component to the failed state. A cookie which doesn't match the
// current one is ignored.
void MoveFailed(Exception err, object cookie) {
    lock (m_lock) {
        if (ReferenceEquals(m_cookie, cookie)) {
            LastError = err;
            State = ExecutionState.Failed;
        }
    }
}
cin
Removed obsolete App, ComponentContainer...
// Starts the specified operation and makes it the current one. The outcome
// is reported through MoveSuccess/MoveFailed together with the cookie, so
// the wrapped task itself never fails.
void ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
    Func<CancellationToken, Task> guarded = async token => {
        try {
            await next(token);
            MoveSuccess(cookie);
        } catch (Exception error) {
            MoveFailed(error, cookie);
        }
    };

    // m_current is replaced only after Create() has started the operation
    m_current = AsyncOperationDescriptor.Create(guarded, ct);
}
cin
Prerelease version of RunnableComponent...
r251
#endregion
cin
Working on runnable component
r250 }
}