diff --git a/Implab.Test/UnitTest1.cs b/Implab.Test/UnitTest1.cs --- a/Implab.Test/UnitTest1.cs +++ b/Implab.Test/UnitTest1.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading; using Xunit; @@ -9,9 +10,20 @@ namespace Implab.Test [Fact] public void Test1() { - using(var cts = new CancellationTokenSource(1000)) { - PromiseHelper.Sleep(10000, cts.Token).Join(); - } + var listener = new TextWriterTraceListener(Console.Out); + var source = new TraceSource("Custom",SourceLevels.ActivityTracing); + + source.Listeners.Add(listener); + + Trace.Listeners.Add(listener); + Trace.WriteLine("Hello!"); + Trace.CorrelationManager.StartLogicalOperation(); + Trace.WriteLine("Inner"); + foreach(var x in Trace.CorrelationManager.LogicalOperationStack) + Trace.WriteLine($"-{x}"); + source.TraceEvent(TraceEventType.Information, 1, "source event"); + source.TraceData(TraceEventType.Start, 1, DateTime.Now); + Trace.CorrelationManager.StopLogicalOperation(); } } } diff --git a/Implab/Automaton/DFATable.cs b/Implab/Automaton/DFATable.cs --- a/Implab/Automaton/DFATable.cs +++ b/Implab/Automaton/DFATable.cs @@ -20,7 +20,7 @@ namespace Implab.Automaton { #region IDFADefinition implementation public bool IsFinalState(int s) { - Safe.ArgumentInRange(s, 0, m_stateCount, "s"); + Safe.ArgumentInRange(s >= 0 && s < m_stateCount, nameof(s)); return m_finalStates.Contains(s); } @@ -46,7 +46,7 @@ namespace Implab.Automaton { #endregion public void SetInitialState(int s) { - Safe.ArgumentAssert(s >= 0, "s"); + Safe.ArgumentInRange(s >= 0, nameof(s)); m_stateCount = Math.Max(m_stateCount, s + 1); m_initialState = s; } @@ -57,9 +57,9 @@ namespace Implab.Automaton { } public void Add(AutomatonTransition item) { - Safe.ArgumentAssert(item.s1 >= 0, "item"); - Safe.ArgumentAssert(item.s2 >= 0, "item"); - Safe.ArgumentAssert(item.edge >= 0, "item"); + Safe.ArgumentAssert(item.s1 >= 0, nameof(item)); + Safe.ArgumentAssert(item.s2 >= 0, nameof(item)); + 
Safe.ArgumentAssert(item.edge >= 0, nameof(item)); m_stateCount = Math.Max(m_stateCount, Math.Max(item.s1, item.s2) + 1); m_symbolCount = Math.Max(m_symbolCount, item.edge + 1); diff --git a/Implab/Components/ExecutionState.cs b/Implab/Components/ExecutionState.cs --- a/Implab/Components/ExecutionState.cs +++ b/Implab/Components/ExecutionState.cs @@ -15,6 +15,8 @@ Stopping, + Stopped, + Failed, Disposed, diff --git a/Implab/Components/IInitializable.cs b/Implab/Components/IInitializable.cs --- a/Implab/Components/IInitializable.cs +++ b/Implab/Components/IInitializable.cs @@ -11,9 +11,16 @@ namespace Implab.Components { /// Completes initialization. /// /// + /// /// Normally virtual methods shouldn't be called from the constructor, due to the incomplete object state, but /// they can be called from this method. This method is also usefull when we constructing a complex grpah /// of components where cyclic references may take place. + /// + /// + /// In asynchronous patterns Initialize() can be called + /// to start initialization and the IRunnable.Completion + /// property can be used to track operation completion. + /// /// void Initialize(); } diff --git a/Implab/Components/IRunnable.cs b/Implab/Components/IRunnable.cs --- a/Implab/Components/IRunnable.cs +++ b/Implab/Components/IRunnable.cs @@ -6,23 +6,53 @@ namespace Implab.Components { /// /// Interface for the component which performs a long running task. /// - public interface IRunnable : IDisposable { + /// + /// The access to the runnable component should be sequential, the + /// component should support asynchronous completion of the initiated + /// operation but the operations themselves must be initiated sequentially.
+ /// + public interface IRunnable { /// /// Starts this instance /// + /// + /// This operation is cancellable; on cancellation it's expected to move to + /// the failed state or just ignore the cancellation request. + /// void Start(CancellationToken ct); /// - /// Stops this instance and releases all resources, after the instance is stopped it is moved to Disposed state and can't be reused. + /// Stops this instance and releases all resources, after the + /// instance is stopped it is moved to Disposed state and + /// can't be reused. /// + /// + /// If the component was in the starting state the pending operation + /// will be requested to cancel. The stop operation will be + /// performed only if the component is in the running state. + /// void Stop(CancellationToken ct); - Task Completion { get; } + /// + /// The result of the last started operation. This property reflects + /// only the result of the last started operation and therefore should + /// change only if a new operation is initiated. + /// + Task Completion { get; } + /// + /// Current state of the component + /// ExecutionState State { get; } + /// + /// Event to monitor the state of the component.
+ /// event EventHandler StateChanged; + /// + /// The last error + /// Exception LastError { get; } } } diff --git a/Implab/Components/ObjectPool.cs b/Implab/Components/ObjectPool.cs --- a/Implab/Components/ObjectPool.cs +++ b/Implab/Components/ObjectPool.cs @@ -26,7 +26,7 @@ namespace Implab.Components { } protected ObjectPool(int size) { - Safe.ArgumentInRange(size,1,size,"size"); + Safe.ArgumentInRange(size > 0, nameof(size)); m_size = size; } diff --git a/Implab/Components/RunnableComponent.cs b/Implab/Components/RunnableComponent.cs --- a/Implab/Components/RunnableComponent.cs +++ b/Implab/Components/RunnableComponent.cs @@ -1,57 +1,273 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; -namespace Implab.Components -{ - public class RunnableComponent : IRunnable { +namespace Implab.Components { + /// + /// Base class for implementing components which support start and stop operations, + /// such components may represent running services. + /// + /// + /// This class provides a basic lifecycle from the creation to the + /// termination of the component. + /// + public class RunnableComponent : IRunnable, IInitializable, IDisposable { + + /// + /// This class bound lifetime to the task, + /// when the task completes the associated token source will be disposed. + /// + class AsyncOperationDescriptor { + + public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor(); + + readonly CancellationTokenSource m_cts; + + bool m_done; + + public CancellationToken Token { + get { return m_cts == null ? 
CancellationToken.None : m_cts.Token; } + } + + public Task Task { get; private set; } + + private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) { + m_cts = cts; + Task = Chain(task); + } + + private AsyncOperationDescriptor() { + Task = Task.CompletedTask; + } + public void Cancel() { + if (m_cts != null) { + lock (m_cts) { + if (!m_done) + m_cts.Cancel(); + } + } + } + + void Done() { + if (m_cts != null) { + lock (m_cts) { + m_done = true; + m_cts.Dispose(); + } + } else { + m_done = true; + } + } + + async Task Chain(Task other) { + try { + await other; + } finally { + Done(); + } + } + + public static AsyncOperationDescriptor Create(Func factory, CancellationToken ct) { + var cts = ct.CanBeCanceled ? + CancellationTokenSource.CreateLinkedTokenSource(ct) : + new CancellationTokenSource(); + + return new AsyncOperationDescriptor(factory(cts.Token), cts); + } + + } + + // this lock is used to synchronize state flow of the component during + // completions of the operations. readonly object m_lock = new object(); - CancellationTokenSource m_cts; + // current operation cookie, used to check whether a call to + // MoveSuccess/MoveFailed method belongs to the current + // operation, if the cookies don't match the completion result is ignored. + object m_cookie; - public Task Completion { - get; - private set; + AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None; + + ExecutionState m_state; + + + protected RunnableComponent(bool initialized) { + State = initialized ?
ExecutionState.Ready : ExecutionState.Created; } - public ExecutionState State => throw new NotImplementedException(); + public Task Completion { + get { return m_current.Task; } + } - public Exception LastError => throw new NotImplementedException(); + public ExecutionState State { + get { return m_state; } + private set { + if (m_state != value) { + m_state = value; + StateChanged.DispatchEvent(this, new StateChangeEventArgs { + State = value, + LastError = LastError + }); + } + } + } + + public Exception LastError { get; private set; } public event EventHandler StateChanged; + /// + /// Releases all resources used by the current component regardless of its + /// execution state. + /// + /// + /// Calling this method may produce unexpected results if the component + /// isn't in the stopped state. Call this method after the component is + /// stopped if needed or if the component is in the failed state. + /// public void Dispose() { - lock(m_lock) { + bool dispose = false; + if (dispose) { Dispose(true); GC.SuppressFinalize(this); } } + ~RunnableComponent() { + Dispose(false); + } + + /// + /// Releases all resources used by the current component regardless of its + /// execution state. + /// + /// Indicates that the component is disposed + /// during a normal disposing or during GC. protected virtual void Dispose(bool disposing) { - if (disposing) { - Safe.Dispose(m_cts); - } + } + + public void Initialize() { + var cookie = new object(); + if (MoveInitialize(cookie)) + ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie); + } + + /// + /// This method is used for initialization during a component creation. + /// + /// A cancellation token for this operation + /// + /// This method should be used for short and mostly synchronous operations, + /// other operations which require time to run should be placed in + /// the StartInternal method.
+ /// + protected virtual Task InitializeInternalAsync(CancellationToken ct) { + return Task.CompletedTask; } public void Start(CancellationToken ct) { - lock(m_lock) { - switch (State) - { - - default: - throw new InvalidOperationException(); + var cookie = new object(); + if (MoveStart(cookie)) + ScheduleTask(StartInternal, ct, cookie); + } + + protected virtual Task StartInternal(CancellationToken ct) { + return Task.CompletedTask; + } + + public void Stop(CancellationToken ct) { + var cookie = new object(); + if (MoveStop(cookie)) + ScheduleTask(StopAsync, ct, cookie); + } + + async Task StopAsync(CancellationToken ct) { + m_current.Cancel(); + await Completion; + + ct.ThrowIfCancellationRequested(); + + await StopInternalAsync(ct); + } + + protected virtual Task StopInternalAsync(CancellationToken ct) { + return Task.CompletedTask; + } + + + #region state management + + bool MoveInitialize(object cookie) { + lock (m_lock) { + if (State != ExecutionState.Created) + return false; + State = ExecutionState.Initializing; + m_cookie = cookie; + return true; + } + } + + bool MoveStart(object cookie) { + lock (m_lock) { + if (State != ExecutionState.Ready) + return false; + State = ExecutionState.Starting; + m_cookie = cookie; + return true; + } + } + + bool MoveStop(object cookie) { + lock (m_lock) { + if (State != ExecutionState.Starting && State != ExecutionState.Running) + return false; + State = ExecutionState.Stopping; + m_cookie = cookie; + return true; + } + } + + void MoveSuccess(object cookie) { + lock (m_lock) { + if (m_cookie != cookie) + return; + switch (State) { + case ExecutionState.Initializing: + State = ExecutionState.Ready; + break; + case ExecutionState.Starting: + State = ExecutionState.Running; + break; + case ExecutionState.Stopping: + State = ExecutionState.Stopped; + break; } } } - public void Stop(CancellationToken ct) { - throw new NotImplementedException(); + void MoveFailed(Exception err, object cookie) { + lock (m_lock) { + if (m_cookie 
!= cookie) + return; + LastError = err; + State = ExecutionState.Failed; + } } - protected virtual Task StartImpl(CancellationToken ct) { + - return Task.CompletedTask; + protected async void ScheduleTask(Func next, CancellationToken ct, object cookie) { + try { + m_current = AsyncOperationDescriptor.Create(next, ct); + await m_current.Task; + MoveSuccess(cookie); + } catch (Exception e) { + MoveFailed(e, cookie); + } } + + #endregion } } \ No newline at end of file diff --git a/Implab/Deferred.cs b/Implab/Deferred.cs --- a/Implab/Deferred.cs +++ b/Implab/Deferred.cs @@ -1,5 +1,6 @@ using System; using System.Diagnostics; +using System.Threading.Tasks; namespace Implab { /// @@ -49,5 +50,15 @@ namespace Implab { } } + public virtual void Resolve(Task thenable) { + if (thenable == null) + Reject(new Exception("The promise or task are expected")); + try { + thenable.Then(this); + } catch(Exception err) { + Reject(err); + } + } + } } \ No newline at end of file diff --git a/Implab/Deferred`1.cs b/Implab/Deferred`1.cs --- a/Implab/Deferred`1.cs +++ b/Implab/Deferred`1.cs @@ -1,5 +1,6 @@ using System; using System.Diagnostics; +using System.Threading.Tasks; namespace Implab { public class Deferred : IResolvable { @@ -45,5 +46,16 @@ namespace Implab { Reject(err); } } + + public virtual void Resolve(Task thenable) { + if (thenable == null) + Reject(new Exception("The promise or task are expected")); + + try { + thenable.Then(this); + } catch (Exception err) { + Reject(err); + } + } } } \ No newline at end of file diff --git a/Implab/Formats/Json/JsonStringScanner.cs b/Implab/Formats/Json/JsonStringScanner.cs --- a/Implab/Formats/Json/JsonStringScanner.cs +++ b/Implab/Formats/Json/JsonStringScanner.cs @@ -43,11 +43,8 @@ namespace Implab.Formats.Json { public static JsonStringScanner Create(string data, int offset, int length) { Safe.ArgumentNotNull(data, nameof(data)); - Safe.ArgumentGreaterThan(offset, 0, nameof(offset)); - Safe.ArgumentGreaterThan(length, 0, 
nameof(length)); - - if (offset + length > data.Length) - throw new ArgumentOutOfRangeException("Specified offset and length are out of the string bounds"); + Safe.ArgumentInRange(offset >= 0 && offset < data.Length , nameof(offset)); + Safe.ArgumentInRange(length >= 0 && offset + length <= data.Length, nameof(length)); if (length <= _defaultBuffer) { var buffer = new char[length]; @@ -63,11 +60,8 @@ namespace Implab.Formats.Json { public static JsonStringScanner Create(char[] data, int offset, int length) { Safe.ArgumentNotNull(data, nameof(data)); - Safe.ArgumentGreaterThan(offset, 0, nameof(offset)); - Safe.ArgumentGreaterThan(length, 0, nameof(length)); - - if (offset + length > data.Length) - throw new ArgumentOutOfRangeException("Specified offset and length are out of the array bounds"); + Safe.ArgumentInRange(offset >= 0 && offset < data.Length , nameof(offset)); + Safe.ArgumentInRange(length >= 0 && offset + length <= data.Length, nameof(length)); return new JsonStringScanner(null, data, offset, offset + length, offset + length); diff --git a/Implab/Messaging/IConsumer.cs b/Implab/Messaging/IConsumer.cs --- a/Implab/Messaging/IConsumer.cs +++ b/Implab/Messaging/IConsumer.cs @@ -3,7 +3,9 @@ using System.Threading.Tasks; namespace Implab.Messaging { public interface IConsumer { - Task Receive(CancellationToken ct); + T Receive(CancellationToken ct); + + Task ReceiveAsync(CancellationToken ct); bool TryReceive(out T message); } diff --git a/Implab/Messaging/IProducer.cs b/Implab/Messaging/IProducer.cs --- a/Implab/Messaging/IProducer.cs +++ b/Implab/Messaging/IProducer.cs @@ -4,8 +4,12 @@ using System.Threading.Tasks; namespace Implab.Messaging { public interface IProducer { + void PostMessage(T message, CancellationToken ct); + Task PostMessageAsync(T message, CancellationToken ct); + void PostMessages(IEnumerable messages, CancellationToken ct); + Task PostMessagesAsync(IEnumerable messages, CancellationToken ct); } } \ No newline at end of file diff --git 
a/Implab/Messaging/ISession.cs b/Implab/Messaging/ISession.cs new file mode 100644 --- /dev/null +++ b/Implab/Messaging/ISession.cs @@ -0,0 +1,14 @@ +namespace Implab.Messaging { + public interface ISession { + /// + /// Starts message consumers, call this method after all adapters are ready + /// + void Start(); + + /// + /// Stops message consumers + /// + void Stop(); + + } +} \ No newline at end of file diff --git a/Implab/Parallels/BlockingQueue.cs b/Implab/Parallels/BlockingQueue.cs --- a/Implab/Parallels/BlockingQueue.cs +++ b/Implab/Parallels/BlockingQueue.cs @@ -54,7 +54,7 @@ namespace Implab.Parallels { } public T[] GetRange(int max, int timeout) { - Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + Safe.ArgumentInRange(max > 0 , nameof(max)); var buffer = new T[max]; int actual; @@ -83,7 +83,7 @@ namespace Implab.Parallels { } public T[] GetRange(int max) { - Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + Safe.ArgumentInRange(max > 0, nameof(max)); var buffer = new T[max]; int actual; diff --git a/Implab/Safe.cs b/Implab/Safe.cs --- a/Implab/Safe.cs +++ b/Implab/Safe.cs @@ -6,6 +6,7 @@ using System.Text.RegularExpressions; using System.Diagnostics; using System.Collections; using System.Runtime.CompilerServices; +using System.Threading.Tasks; #if NET_4_5 using System.Threading.Tasks; @@ -48,14 +49,14 @@ namespace Implab } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static void ArgumentGreaterThan(int value, int min, string paramName) { + internal static void ArgumentGreaterEqThan(int value, int min, string paramName) { if (value < min) throw new ArgumentOutOfRangeException(paramName); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void ArgumentInRange(int value, int min, int max, string paramName) { - if (value < min || value > max) + public static void ArgumentInRange(bool condition, string paramName) { + if (!condition) throw new ArgumentOutOfRangeException(paramName); } @@ -144,6 +145,12 @@ 
namespace Implab public static void NoWait(IPromise promise) { } + public static void NoWait(Task promise) { + } + + public static void NoWait(Task promise) { + } + [DebuggerStepThrough] public static IPromise Run(Func> action) { ArgumentNotNull(action, "action"); diff --git a/Implab/TaskHelpers.cs b/Implab/TaskHelpers.cs new file mode 100644 --- /dev/null +++ b/Implab/TaskHelpers.cs @@ -0,0 +1,70 @@ +using System; +using System.Threading.Tasks; + +namespace Implab { + public static class TaskHelpers { + + public static async Task Then(this Task that, Action fulfilled, Action rejected) { + Safe.ArgumentNotNull(that, nameof(that)); + if (rejected != null) { + try { + await that; + } catch (Exception e) { + rejected(e); + return; + } + } else { + await that; + } + + if (fulfilled != null) + fulfilled(); + } + + public static async Task Then(this Task that, Action fulfilled) { + Safe.ArgumentNotNull(that, nameof(that)); + await that; + if (fulfilled != null) + fulfilled(); + } + + public static async Task Then(this Task that, Func fulfilled) { + Safe.ArgumentNotNull(that, nameof(that)); + await that; + if (fulfilled != null) + await fulfilled(); + } + + public static async Task Finally(this Task that, Action handler) { + Safe.ArgumentNotNull(that, nameof(that)); + try { + await that; + } finally { + if (handler != null) + handler(); + } + } + + public static async void Then(this Task that, IResolvable next) { + try { + await that; + } catch (Exception e) { + next.Reject(e); + return; + } + next.Resolve(); + } + + public static async void Then(this Task that, IResolvable next) { + T result; + try { + result = await that; + } catch (Exception e) { + next.Reject(e); + return; + } + next.Resolve(result); + } + + } +} \ No newline at end of file diff --git a/Implab/Xml/JsonXmlReader.cs b/Implab/Xml/JsonXmlReader.cs --- a/Implab/Xml/JsonXmlReader.cs +++ b/Implab/Xml/JsonXmlReader.cs @@ -163,7 +163,7 @@ namespace Implab.Xml { } public override string GetAttribute(int i) { - 
Safe.ArgumentInRange(i, 0, AttributeCount - 1, nameof(i)); + Safe.ArgumentInRange(i >= 0 && i < AttributeCount, nameof(i)); return m_attributes[i].Value; } diff --git a/Implab/Xml/SerializationHelpers.cs b/Implab/Xml/SerializationHelpers.cs --- a/Implab/Xml/SerializationHelpers.cs +++ b/Implab/Xml/SerializationHelpers.cs @@ -38,6 +38,20 @@ namespace Implab.Xml { SerializersPool.Instance.Serialize(writer, obj); } + public static void SerializeToElementChild(XmlElement element, T obj) { + using(var writer = element.CreateNavigator().AppendChild()) + SerializersPool.Instance.Serialize(writer, obj); + } + + public static T Deserialize(XmlReader reader) { + return SerializersPool.Instance.Deserialize(reader); + } + + public static T DeserializeFromFile(string file) { + using(var reader = XmlReader.Create(File.OpenText(file))) + return Deserialize(reader); + } + public static T DeserializeFromString(string data) { return SerializersPool.Instance.DeserializeFromString(data); }