diff --git a/Implab.Test/UnitTest1.cs b/Implab.Test/UnitTest1.cs
--- a/Implab.Test/UnitTest1.cs
+++ b/Implab.Test/UnitTest1.cs
@@ -1,4 +1,5 @@
using System;
+using System.Diagnostics;
using System.Threading;
using Xunit;
@@ -9,9 +10,20 @@ namespace Implab.Test
[Fact]
public void Test1()
{
- using(var cts = new CancellationTokenSource(1000)) {
- PromiseHelper.Sleep(10000, cts.Token).Join();
- }
+ var listener = new TextWriterTraceListener(Console.Out);
+ var source = new TraceSource("Custom",SourceLevels.ActivityTracing);
+
+ source.Listeners.Add(listener);
+
+ Trace.Listeners.Add(listener);
+ Trace.WriteLine("Hello!");
+ Trace.CorrelationManager.StartLogicalOperation();
+ Trace.WriteLine("Inner");
+ foreach(var x in Trace.CorrelationManager.LogicalOperationStack)
+ Trace.WriteLine($"-{x}");
+ source.TraceEvent(TraceEventType.Information, 1, "source event");
+ source.TraceData(TraceEventType.Start, 1, DateTime.Now);
+ Trace.CorrelationManager.StopLogicalOperation();
}
}
}
diff --git a/Implab/Automaton/DFATable.cs b/Implab/Automaton/DFATable.cs
--- a/Implab/Automaton/DFATable.cs
+++ b/Implab/Automaton/DFATable.cs
@@ -20,7 +20,7 @@ namespace Implab.Automaton {
#region IDFADefinition implementation
public bool IsFinalState(int s) {
- Safe.ArgumentInRange(s, 0, m_stateCount, "s");
+ Safe.ArgumentInRange(s >= 0 && s < m_stateCount, nameof(s));
return m_finalStates.Contains(s);
}
@@ -46,7 +46,7 @@ namespace Implab.Automaton {
#endregion
public void SetInitialState(int s) {
- Safe.ArgumentAssert(s >= 0, "s");
+ Safe.ArgumentInRange(s >= 0, nameof(s));
m_stateCount = Math.Max(m_stateCount, s + 1);
m_initialState = s;
}
@@ -57,9 +57,9 @@ namespace Implab.Automaton {
}
public void Add(AutomatonTransition item) {
- Safe.ArgumentAssert(item.s1 >= 0, "item");
- Safe.ArgumentAssert(item.s2 >= 0, "item");
- Safe.ArgumentAssert(item.edge >= 0, "item");
+ Safe.ArgumentAssert(item.s1 >= 0, nameof(item));
+ Safe.ArgumentAssert(item.s2 >= 0, nameof(item));
+ Safe.ArgumentAssert(item.edge >= 0, nameof(item));
m_stateCount = Math.Max(m_stateCount, Math.Max(item.s1, item.s2) + 1);
m_symbolCount = Math.Max(m_symbolCount, item.edge + 1);
diff --git a/Implab/Components/ExecutionState.cs b/Implab/Components/ExecutionState.cs
--- a/Implab/Components/ExecutionState.cs
+++ b/Implab/Components/ExecutionState.cs
@@ -15,6 +15,8 @@
Stopping,
+ Stopped,
+
Failed,
Disposed,
diff --git a/Implab/Components/IInitializable.cs b/Implab/Components/IInitializable.cs
--- a/Implab/Components/IInitializable.cs
+++ b/Implab/Components/IInitializable.cs
@@ -11,9 +11,16 @@ namespace Implab.Components {
/// Completes initialization.
///
///
+ ///
/// Normally virtual methods shouldn't be called from the constructor, due to the incomplete object state, but
/// they can be called from this method. This method is also usefull when we constructing a complex grpah
/// of components where cyclic references may take place.
+ ///
+ ///
+ /// In asyncronous patterns can be called
+ /// to start initialization and the
+ /// property can be used to track operation completion.
+ ///
///
void Initialize();
}
diff --git a/Implab/Components/IRunnable.cs b/Implab/Components/IRunnable.cs
--- a/Implab/Components/IRunnable.cs
+++ b/Implab/Components/IRunnable.cs
@@ -6,23 +6,53 @@ namespace Implab.Components {
///
/// Interface for the component which performs a long running task.
///
- public interface IRunnable : IDisposable {
+ ///
+ /// The access to the runnable component should be sequential, the
+ /// componet should support asynchronous completion of the initiated
+ /// operation but operations itself must be initiated sequentially.
+ ///
+ public interface IRunnable {
///
/// Starts this instance
///
+ ///
+ /// This operation is cancellable and it's expected to move to
+ /// the failed state or just ignore the cancellation request,
+ ///
void Start(CancellationToken ct);
///
- /// Stops this instance and releases all resources, after the instance is stopped it is moved to Disposed state and can't be reused.
+ /// Stops this instance and releases all resources, after the
+ /// instance is stopped it is moved to Disposed state and
+ /// can't be reused.
///
+ ///
+ /// If the componet was in the starting state the pending operation
+ /// will be requested to cancel. The stop operatin will be
+ /// performed only if the component in the running state.
+ ///
void Stop(CancellationToken ct);
- Task Completion { get; }
+ ///
+ /// The result of the last started operation. This property reflects
+ /// only the result of the last started operation and therefore should
+ /// change only if a new operation is initiated.
+ ///
+ Task Completion { get; }
+ ///
+ /// Current state of the componenet
+ ///
ExecutionState State { get; }
+ ///
+ /// Event to monitor the state of the component.
+ ///
event EventHandler StateChanged;
+ ///
+ /// The last error
+ ///
Exception LastError { get; }
}
}
diff --git a/Implab/Components/ObjectPool.cs b/Implab/Components/ObjectPool.cs
--- a/Implab/Components/ObjectPool.cs
+++ b/Implab/Components/ObjectPool.cs
@@ -26,7 +26,7 @@ namespace Implab.Components {
}
protected ObjectPool(int size) {
- Safe.ArgumentInRange(size,1,size,"size");
+ Safe.ArgumentInRange(size > 0, nameof(size));
m_size = size;
}
diff --git a/Implab/Components/RunnableComponent.cs b/Implab/Components/RunnableComponent.cs
--- a/Implab/Components/RunnableComponent.cs
+++ b/Implab/Components/RunnableComponent.cs
@@ -1,57 +1,273 @@
using System;
+using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
-namespace Implab.Components
-{
- public class RunnableComponent : IRunnable {
+namespace Implab.Components {
+ ///
+ /// Base class for implementing components which support start and stop operations,
+ /// such components may represent running services.
+ ///
+ ///
+ /// This class provides a basic lifecycle from the creation to the
+ /// termination of the component.
+ ///
+ public class RunnableComponent : IRunnable, IInitializable, IDisposable {
+
+ ///
+ /// This class bound lifetime to the task,
+ /// when the task completes the associated token source will be disposed.
+ ///
+ class AsyncOperationDescriptor {
+
+ public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();
+
+ readonly CancellationTokenSource m_cts;
+
+ bool m_done;
+
+ public CancellationToken Token {
+ get { return m_cts == null ? CancellationToken.None : m_cts.Token; }
+ }
+
+ public Task Task { get; private set; }
+
+ private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
+ m_cts = cts;
+ Task = Chain(task);
+ }
+
+ private AsyncOperationDescriptor() {
+ Task = Task.CompletedTask;
+ }
+ public void Cancel() {
+ if (m_cts != null) {
+ lock (m_cts) {
+ if (!m_done)
+ m_cts.Cancel();
+ }
+ }
+ }
+
+ void Done() {
+ if (m_cts != null) {
+ lock (m_cts) {
+ m_done = true;
+ m_cts.Dispose();
+ }
+ } else {
+ m_done = true;
+ }
+ }
+
+ async Task Chain(Task other) {
+ try {
+ await other;
+ } finally {
+ Done();
+ }
+ }
+
+ public static AsyncOperationDescriptor Create(Func factory, CancellationToken ct) {
+ var cts = ct.CanBeCanceled ?
+ CancellationTokenSource.CreateLinkedTokenSource(ct) :
+ new CancellationTokenSource();
+
+ return new AsyncOperationDescriptor(factory(cts.Token), cts);
+ }
+
+ }
+
+ // this lock is used to synchronize state flow of the component during
+ // completions or the operations.
readonly object m_lock = new object();
- CancellationTokenSource m_cts;
+ // current operation cookie, used to check wheather a call to
+ // MoveSuccess/MoveFailed method belongs to the current
+ // operation, if cookies didn't match ignore completion result.
+ object m_cookie;
- public Task Completion {
- get;
- private set;
+ AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
+
+ ExecutionState m_state;
+
+
+ protected RunnableComponent(bool initialized) {
+ State = initialized ? ExecutionState.Ready : ExecutionState.Created;
}
- public ExecutionState State => throw new NotImplementedException();
+ public Task Completion {
+ get { return m_current.Task; }
+ }
- public Exception LastError => throw new NotImplementedException();
+ public ExecutionState State {
+ get { return m_state; }
+ private set {
+ if (m_state != value) {
+ m_state = value;
+ StateChanged.DispatchEvent(this, new StateChangeEventArgs {
+ State = value,
+ LastError = LastError
+ });
+ }
+ }
+ }
+
+ public Exception LastError { get; private set; }
public event EventHandler StateChanged;
+ ///
+ /// Releases all resources used by the current component regardless of its
+ /// execution state.
+ ///
+ ///
+ /// Calling to this method may result unexpedted results if the component
+ /// isn't in the stopped state. Call this method after the component is
+ /// stopped if needed or if the component is in the failed state.
+ ///
public void Dispose() {
- lock(m_lock) {
+ bool dispose = false;
+ if (dispose) {
Dispose(true);
GC.SuppressFinalize(this);
}
}
+ ~RunnableComponent() {
+ Dispose(false);
+ }
+
+ ///
+ /// Releases all resources used by the current component regardless of its
+ /// execution state.
+ ///
+ /// Indicates that the component is disposed
+ /// during a normal disposing or during GC.
protected virtual void Dispose(bool disposing) {
- if (disposing) {
- Safe.Dispose(m_cts);
- }
+ }
+
+ public void Initialize() {
+ var cookie = new object();
+ if (MoveInitialize(cookie))
+ ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie);
+ }
+
+ ///
+ /// This method is used for initialization during a component creation.
+ ///
+ /// A cancellation token for this operation
+ ///
+ /// This method should be used for short and mostly syncronous operations,
+ /// other operations which require time to run shoud be placed in
+ /// method.
+ ///
+ protected virtual Task InitializeInternalAsync(CancellationToken ct) {
+ return Task.CompletedTask;
}
public void Start(CancellationToken ct) {
- lock(m_lock) {
- switch (State)
- {
-
- default:
- throw new InvalidOperationException();
+ var cookie = new object();
+ if (MoveStart(cookie))
+ ScheduleTask(StartInternal, ct, cookie);
+ }
+
+ protected virtual Task StartInternal(CancellationToken ct) {
+ return Task.CompletedTask;
+ }
+
+ public void Stop(CancellationToken ct) {
+ var cookie = new object();
+ if (MoveStop(cookie))
+ ScheduleTask(StopAsync, ct, cookie);
+ }
+
+ async Task StopAsync(CancellationToken ct) {
+ m_current.Cancel();
+ await Completion;
+
+ ct.ThrowIfCancellationRequested();
+
+ await StopInternalAsync(ct);
+ }
+
+ protected virtual Task StopInternalAsync(CancellationToken ct) {
+ return Task.CompletedTask;
+ }
+
+
+ #region state management
+
+ bool MoveInitialize(object cookie) {
+ lock (m_lock) {
+ if (State != ExecutionState.Created)
+ return false;
+ State = ExecutionState.Initializing;
+ m_cookie = cookie;
+ return true;
+ }
+ }
+
+ bool MoveStart(object cookie) {
+ lock (m_lock) {
+ if (State != ExecutionState.Ready)
+ return false;
+ State = ExecutionState.Starting;
+ m_cookie = cookie;
+ return true;
+ }
+ }
+
+ bool MoveStop(object cookie) {
+ lock (m_lock) {
+ if (State != ExecutionState.Starting && State != ExecutionState.Running)
+ return false;
+ State = ExecutionState.Stopping;
+ m_cookie = cookie;
+ return true;
+ }
+ }
+
+ void MoveSuccess(object cookie) {
+ lock (m_lock) {
+ if (m_cookie != cookie)
+ return;
+ switch (State) {
+ case ExecutionState.Initializing:
+ State = ExecutionState.Ready;
+ break;
+ case ExecutionState.Starting:
+ State = ExecutionState.Running;
+ break;
+ case ExecutionState.Stopping:
+ State = ExecutionState.Stopped;
+ break;
}
}
}
- public void Stop(CancellationToken ct) {
- throw new NotImplementedException();
+ void MoveFailed(Exception err, object cookie) {
+ lock (m_lock) {
+ if (m_cookie != cookie)
+ return;
+ LastError = err;
+ State = ExecutionState.Failed;
+ }
}
- protected virtual Task StartImpl(CancellationToken ct) {
+
- return Task.CompletedTask;
+ protected async void ScheduleTask(Func next, CancellationToken ct, object cookie) {
+ try {
+ m_current = AsyncOperationDescriptor.Create(next, ct);
+ await m_current.Task;
+ MoveSuccess(cookie);
+ } catch (Exception e) {
+ MoveFailed(e, cookie);
+ }
}
+
+ #endregion
}
}
\ No newline at end of file
diff --git a/Implab/Deferred.cs b/Implab/Deferred.cs
--- a/Implab/Deferred.cs
+++ b/Implab/Deferred.cs
@@ -1,5 +1,6 @@
using System;
using System.Diagnostics;
+using System.Threading.Tasks;
namespace Implab {
///
@@ -49,5 +50,15 @@ namespace Implab {
}
}
+ public virtual void Resolve(Task thenable) {
+ if (thenable == null)
+ Reject(new Exception("The promise or task are expected"));
+ try {
+ thenable.Then(this);
+ } catch(Exception err) {
+ Reject(err);
+ }
+ }
+
}
}
\ No newline at end of file
diff --git a/Implab/Deferred`1.cs b/Implab/Deferred`1.cs
--- a/Implab/Deferred`1.cs
+++ b/Implab/Deferred`1.cs
@@ -1,5 +1,6 @@
using System;
using System.Diagnostics;
+using System.Threading.Tasks;
namespace Implab {
public class Deferred : IResolvable {
@@ -45,5 +46,16 @@ namespace Implab {
Reject(err);
}
}
+
+ public virtual void Resolve(Task thenable) {
+ if (thenable == null)
+ Reject(new Exception("The promise or task are expected"));
+
+ try {
+ thenable.Then(this);
+ } catch (Exception err) {
+ Reject(err);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/Implab/Formats/Json/JsonStringScanner.cs b/Implab/Formats/Json/JsonStringScanner.cs
--- a/Implab/Formats/Json/JsonStringScanner.cs
+++ b/Implab/Formats/Json/JsonStringScanner.cs
@@ -43,11 +43,8 @@ namespace Implab.Formats.Json {
public static JsonStringScanner Create(string data, int offset, int length) {
Safe.ArgumentNotNull(data, nameof(data));
- Safe.ArgumentGreaterThan(offset, 0, nameof(offset));
- Safe.ArgumentGreaterThan(length, 0, nameof(length));
-
- if (offset + length > data.Length)
- throw new ArgumentOutOfRangeException("Specified offset and length are out of the string bounds");
+ Safe.ArgumentInRange(offset >= 0 && offset < data.Length , nameof(offset));
+ Safe.ArgumentInRange(length >= 0 && offset + length <= data.Length, nameof(length));
if (length <= _defaultBuffer) {
var buffer = new char[length];
@@ -63,11 +60,8 @@ namespace Implab.Formats.Json {
public static JsonStringScanner Create(char[] data, int offset, int length) {
Safe.ArgumentNotNull(data, nameof(data));
- Safe.ArgumentGreaterThan(offset, 0, nameof(offset));
- Safe.ArgumentGreaterThan(length, 0, nameof(length));
-
- if (offset + length > data.Length)
- throw new ArgumentOutOfRangeException("Specified offset and length are out of the array bounds");
+ Safe.ArgumentInRange(offset >= 0 && offset < data.Length , nameof(offset));
+ Safe.ArgumentInRange(length >= 0 && offset + length <= data.Length, nameof(length));
return new JsonStringScanner(null, data, offset, offset + length, offset + length);
diff --git a/Implab/Messaging/IConsumer.cs b/Implab/Messaging/IConsumer.cs
--- a/Implab/Messaging/IConsumer.cs
+++ b/Implab/Messaging/IConsumer.cs
@@ -3,7 +3,9 @@ using System.Threading.Tasks;
namespace Implab.Messaging {
public interface IConsumer {
- Task Receive(CancellationToken ct);
+ T Receive(CancellationToken ct);
+
+ Task ReceiveAsync(CancellationToken ct);
bool TryReceive(out T message);
}
diff --git a/Implab/Messaging/IProducer.cs b/Implab/Messaging/IProducer.cs
--- a/Implab/Messaging/IProducer.cs
+++ b/Implab/Messaging/IProducer.cs
@@ -4,8 +4,12 @@ using System.Threading.Tasks;
namespace Implab.Messaging {
public interface IProducer {
+ void PostMessage(T message, CancellationToken ct);
+
Task PostMessageAsync(T message, CancellationToken ct);
+ void PostMessages(IEnumerable messages, CancellationToken ct);
+
Task PostMessagesAsync(IEnumerable messages, CancellationToken ct);
}
}
\ No newline at end of file
diff --git a/Implab/Messaging/ISession.cs b/Implab/Messaging/ISession.cs
new file mode 100644
--- /dev/null
+++ b/Implab/Messaging/ISession.cs
@@ -0,0 +1,14 @@
+namespace Implab.Messaging {
+ public interface ISession {
+ ///
+ /// Starts message consumers, call this method after all adapters are ready
+ ///
+ void Start();
+
+ ///
+ /// Stops message consumers
+ ///
+ void Stop();
+
+ }
+}
\ No newline at end of file
diff --git a/Implab/Parallels/BlockingQueue.cs b/Implab/Parallels/BlockingQueue.cs
--- a/Implab/Parallels/BlockingQueue.cs
+++ b/Implab/Parallels/BlockingQueue.cs
@@ -54,7 +54,7 @@ namespace Implab.Parallels {
}
public T[] GetRange(int max, int timeout) {
- Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
+ Safe.ArgumentInRange(max > 0 , nameof(max));
var buffer = new T[max];
int actual;
@@ -83,7 +83,7 @@ namespace Implab.Parallels {
}
public T[] GetRange(int max) {
- Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
+ Safe.ArgumentInRange(max > 0, nameof(max));
var buffer = new T[max];
int actual;
diff --git a/Implab/Safe.cs b/Implab/Safe.cs
--- a/Implab/Safe.cs
+++ b/Implab/Safe.cs
@@ -6,6 +6,7 @@ using System.Text.RegularExpressions;
using System.Diagnostics;
using System.Collections;
using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
#if NET_4_5
using System.Threading.Tasks;
@@ -48,14 +49,14 @@ namespace Implab
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- internal static void ArgumentGreaterThan(int value, int min, string paramName) {
+ internal static void ArgumentGreaterEqThan(int value, int min, string paramName) {
if (value < min)
throw new ArgumentOutOfRangeException(paramName);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public static void ArgumentInRange(int value, int min, int max, string paramName) {
- if (value < min || value > max)
+ public static void ArgumentInRange(bool condition, string paramName) {
+ if (!condition)
throw new ArgumentOutOfRangeException(paramName);
}
@@ -144,6 +145,12 @@ namespace Implab
public static void NoWait(IPromise promise) {
}
+ public static void NoWait(Task promise) {
+ }
+
+ public static void NoWait(Task promise) {
+ }
+
[DebuggerStepThrough]
public static IPromise Run(Func> action) {
ArgumentNotNull(action, "action");
diff --git a/Implab/TaskHelpers.cs b/Implab/TaskHelpers.cs
new file mode 100644
--- /dev/null
+++ b/Implab/TaskHelpers.cs
@@ -0,0 +1,70 @@
+using System;
+using System.Threading.Tasks;
+
+namespace Implab {
+ public static class TaskHelpers {
+
+ public static async Task Then(this Task that, Action fulfilled, Action rejected) {
+ Safe.ArgumentNotNull(that, nameof(that));
+ if (rejected != null) {
+ try {
+ await that;
+ } catch (Exception e) {
+ rejected(e);
+ return;
+ }
+ } else {
+ await that;
+ }
+
+ if (fulfilled != null)
+ fulfilled();
+ }
+
+ public static async Task Then(this Task that, Action fulfilled) {
+ Safe.ArgumentNotNull(that, nameof(that));
+ await that;
+ if (fulfilled != null)
+ fulfilled();
+ }
+
+ public static async Task Then(this Task that, Func fulfilled) {
+ Safe.ArgumentNotNull(that, nameof(that));
+ await that;
+ if (fulfilled != null)
+ await fulfilled();
+ }
+
+ public static async Task Finally(this Task that, Action handler) {
+ Safe.ArgumentNotNull(that, nameof(that));
+ try {
+ await that;
+ } finally {
+ if (handler != null)
+ handler();
+ }
+ }
+
+ public static async void Then(this Task that, IResolvable next) {
+ try {
+ await that;
+ } catch (Exception e) {
+ next.Reject(e);
+ return;
+ }
+ next.Resolve();
+ }
+
+ public static async void Then(this Task that, IResolvable next) {
+ T result;
+ try {
+ result = await that;
+ } catch (Exception e) {
+ next.Reject(e);
+ return;
+ }
+ next.Resolve(result);
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/Implab/Xml/JsonXmlReader.cs b/Implab/Xml/JsonXmlReader.cs
--- a/Implab/Xml/JsonXmlReader.cs
+++ b/Implab/Xml/JsonXmlReader.cs
@@ -163,7 +163,7 @@ namespace Implab.Xml {
}
public override string GetAttribute(int i) {
- Safe.ArgumentInRange(i, 0, AttributeCount - 1, nameof(i));
+ Safe.ArgumentInRange(i >= 0 && i < AttributeCount, nameof(i));
return m_attributes[i].Value;
}
diff --git a/Implab/Xml/SerializationHelpers.cs b/Implab/Xml/SerializationHelpers.cs
--- a/Implab/Xml/SerializationHelpers.cs
+++ b/Implab/Xml/SerializationHelpers.cs
@@ -38,6 +38,20 @@ namespace Implab.Xml {
SerializersPool.Instance.Serialize(writer, obj);
}
+ public static void SerializeToElementChild(XmlElement element, T obj) {
+ using(var writer = element.CreateNavigator().AppendChild())
+ SerializersPool.Instance.Serialize(writer, obj);
+ }
+
+ public static T Deserialize(XmlReader reader) {
+ return SerializersPool.Instance.Deserialize(reader);
+ }
+
+ public static T DeserializeFromFile(string file) {
+ using(var reader = XmlReader.Create(File.OpenText(file)))
+ return Deserialize(reader);
+ }
+
public static T DeserializeFromString(string data) {
return SerializersPool.Instance.DeserializeFromString(data);
}