# HG changeset patch # User cin # Date 2018-01-29 22:37:17 # Node ID 5cb4826c2c2a97b8ce89dd5ba957c9a0ead10477 # Parent fb70574741a1d7ddb6542f4403a96e4273727b90 Added awaiters to promises Added static methods to Promise Resolve, Reject, All. Updated promise helpers diff --git a/Implab/AbstractEvent.cs b/Implab/AbstractEvent.cs --- a/Implab/AbstractEvent.cs +++ b/Implab/AbstractEvent.cs @@ -69,7 +69,7 @@ namespace Implab { protected void WaitTransition() { if (m_state == TransitionalState) { - SpinWait spin; + SpinWait spin = new SpinWait(); do { spin.SpinOnce(); } while (m_state == TransitionalState); @@ -87,17 +87,6 @@ namespace Implab { #endregion - protected abstract Signal GetFulfillSignal(); - - #region synchronization traits - protected void WaitResult(int timeout) { - if (!(IsResolved || GetFulfillSignal().Wait(timeout))) - throw new TimeoutException(); - } - - - #endregion - #region handlers managment protected void AddHandler(THandler handler) { diff --git a/Implab/AbstractPromise.cs b/Implab/AbstractPromise.cs deleted file mode 100644 --- a/Implab/AbstractPromise.cs +++ /dev/null @@ -1,126 +0,0 @@ -using System; -using System.Diagnostics; -using System.Reflection; -using Implab.Parallels; - -namespace Implab { - public class AbstractPromise : AbstractEvent, IPromise { - - class ResolvableSignal : IResolvable { - public Signal Signal { get; private set; } - public ResolvableSignal() { - Signal = new Signal(); - } - - - public void Reject(Exception error) { - Signal.Set(); - } - - public void Resolve() { - Signal.Set(); - } - } - - PromiseState m_state; - - Exception m_error; - - public bool IsRejected { - get { - return m_state == PromiseState.Rejected; - } - } - - public bool IsFulfilled { - get { - return m_state == PromiseState.Fulfilled; - } - } - - public Exception RejectReason { - get { - return m_error; - } - } - - - internal void Resolve() { - if (BeginTransit()) - CompleteResolve(); - } - - internal void Reject(Exception reason) { - if (BeginTransit()) { - m_error = reason; - m_state = PromiseState.Rejected; - CompleteTransit(); - } - } - - - #region implemented abstract members of AbstractPromise - - protected override void SignalHandler(IResolvable handler) { - switch (m_state) { - case PromiseState.Fulfilled: - handler.Resolve(); - break; - case PromiseState.Rejected: - handler.Reject(RejectReason); - break; - default: - throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); - } - } - - protected override Signal GetFulfillSignal() { - var next = new ResolvableSignal(); - Then(next); - return next.Signal; - } - - #endregion - - protected void CompleteResolve() { - m_state = PromiseState.Fulfilled; - CompleteTransit(); - } - - public Type ResultType { - get { - return typeof(void); - } - } - - - protected void Rethrow() { - Debug.Assert(m_error != null); - if (m_error is OperationCanceledException) - throw new OperationCanceledException("Operation cancelled", m_error); - else - throw new TargetInvocationException(m_error); - } - - public void Then(IResolvable next) { - AddHandler(next); - } - - public IPromise Cast() { - throw new InvalidCastException(); - } - - public void Join() { - WaitResult(-1); - if (IsRejected) - Rethrow(); - } - - public void Join(int timeout) { - WaitResult(timeout); - if (IsRejected) - Rethrow(); - } - } -} - diff --git a/Implab/AbstractPromise`1.cs b/Implab/AbstractPromise`1.cs deleted file mode 100644 --- a/Implab/AbstractPromise`1.cs +++ /dev/null @@ -1,161 +0,0 @@ -using System; -using System.Diagnostics; -using System.Reflection; -using Implab.Parallels; - -namespace Implab { - public class AbstractPromise : AbstractEvent>, IPromise { - - class ResolvableSignal : IResolvable { - public Signal Signal { get; private set; } - public ResolvableSignal() { - Signal = new Signal(); - } - - - public void Reject(Exception error) { - Signal.Set(); - } - - public void Resolve(T result) { - Signal.Set(); - } - } - - class ResolvableWrapper : IResolvable { - readonly IResolvable m_resolvable; - public ResolvableWrapper(IResolvable resolvable) { - - } - - public void Reject(Exception reason) { - m_resolvable.Reject(reason); - } - - public void Resolve(T value) { - m_resolvable.Resolve(); - } - } - - PromiseState m_state; - - T m_result; - - Exception m_error; - - public bool IsRejected { - get { - return m_state == PromiseState.Rejected; - } - } - - public bool IsFulfilled { - get { - return m_state == PromiseState.Fulfilled; - } - } - - public Exception RejectReason { - get { - return m_error; - } - } - - - internal void Resolve(T result) { - if (BeginTransit()) - CompleteResolve(); - } - - internal void Reject(Exception reason) { - if (BeginTransit()) { - m_error = reason; - m_state = PromiseState.Rejected; - CompleteTransit(); - } - } - - - #region implemented abstract members of AbstractPromise - - protected override void SignalHandler(IResolvable handler) { - switch (m_state) { - case PromiseState.Fulfilled: - handler.Resolve(m_result); - break; - case PromiseState.Rejected: - handler.Reject(RejectReason); - break; - default: - throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); - } - } - - protected override Signal GetFulfillSignal() { - var next = new ResolvableSignal(); - Then(next); - return next.Signal; - } - - #endregion - - protected void CompleteResolve() { - m_state = PromiseState.Fulfilled; - CompleteTransit(); - } - - public Type ResultType { - get { - return typeof(void); - } - } - - - protected void Rethrow() { - Debug.Assert(m_error != null); - if (m_error is OperationCanceledException) - throw new OperationCanceledException("Operation cancelled", m_error); - else - throw new TargetInvocationException(m_error); - } - - public void Then(IResolvable next) { - AddHandler(next); - } - - public void Then(IResolvable next) { - AddHandler(new ResolvableWrapper(next)); - } - - public IPromise Cast() { - return (IPromise)this; - } - - void IPromise.Join() { - WaitResult(-1); - if (IsRejected) - Rethrow(); - } - - void IPromise.Join(int timeout) { - WaitResult(timeout); - if (IsRejected) - Rethrow(); - } - - public T Join() { - WaitResult(-1); - if (IsRejected) - Rethrow(); - return m_result; - } - - public T Join(int timeout) { - WaitResult(timeout); - if (IsRejected) - Rethrow(); - return m_result; - } - } -} - diff --git a/Implab/CancellationToken.cs b/Implab/CancellationToken.cs --- a/Implab/CancellationToken.cs +++ b/Implab/CancellationToken.cs @@ -3,63 +3,26 @@ using System.Threading; using Implab.Parallels; namespace Implab { - public class CancellationToken : ICancellationToken { - const int CANCEL_NOT_REQUESTED = 0; - const int CANCEL_REQUESTING = 1; - const int CANCEL_REQUESTED = 2; - - volatile int m_state = CANCEL_NOT_REQUESTED; - - Action m_handler; - - Parallels.SimpleAsyncQueue> m_handlers; - - public bool IsCancellationRequested { - get { return m_state == CANCEL_REQUESTED; } + /// + /// The cancellation token signals to the worker that cancellation has been + /// requested, after the signal is received the worker decides wheather to + /// cancel its work or to continue. + /// + public class CancellationToken : AbstractEvent> { + public CancellationToken() { + } - - public Exception CancellationReason { - get; set; - } + + public void RequestCancellation() { - public void CancellationRequested(Action handler) { - Safe.ArgumentNotNull(handler, nameof(handler)); - if (IsCancellationRequested) { - handler(CancellationReason); - } else { - EnqueueHandler(handler); - if (IsCancellationRequested && TryDequeueHandler(out handler)) - handler(CancellationReason); - } } - bool TryDequeueHandler(out Action handler) { - handler = Interlocked.Exchange(ref m_handler, null); - if (handler != null) - return true; - else if (m_handlers != null) - return m_handlers.TryDequeue(out handler); - else - return false; - } + public void RequestCancellation(Exception reason) { - void EnqueueHandler(Action handler) { - if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) { - if (m_handlers == null) - // compare-exchange will fprotect from loosing already created queue - Interlocked.CompareExchange(ref m_handlers, new SimpleAsyncQueue>(), null); - m_handlers.Enqueue(handler); - } - } - - void RequestCancellation(Exception reason) { - if (Interlocked.CompareExchange(ref m_state, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED) == CANCEL_NOT_REQUESTED) { - if (reason == null) - reason = new OperationCanceledException(); - CancellationReason = reason; - m_state = CANCEL_REQUESTED; - } } + protected override void SignalHandler(Action handler) { + throw new NotImplementedException(); + } } } \ No newline at end of file diff --git a/Implab/Components/PollingComponent.cs b/Implab/Components/PollingComponent.cs deleted file mode 100644 --- a/Implab/Components/PollingComponent.cs +++ /dev/null @@ -1,155 +0,0 @@ -using System; -using System.Threading; -using Implab.Diagnostics; - -namespace Implab.Components { - public class PollingComponent : RunnableComponent { - readonly Timer m_timer; - readonly Func, IPromise> m_dispatcher; - readonly TimeSpan m_interval; - - readonly object m_lock = new object(); - - ActionTask m_pending; - - protected PollingComponent(TimeSpan interval, Func, IPromise> dispatcher, bool initialized) : base(initialized) { - m_timer = new Timer(OnInternalTick); - - m_interval = interval; - m_dispatcher = dispatcher; - } - - protected override IPromise OnStart() { - m_timer.Change(TimeSpan.Zero, m_interval); - - return base.OnStart(); - } - - void OnInternalTick(object state) { - if (StartTick()) { - try { - if (m_dispatcher != null) { - var result = m_dispatcher(OnTick); - m_pending.CancellationRequested(result.Cancel); - AwaitTick(result); - } else { - AwaitTick(OnTick(m_pending)); - } - } catch (Exception error) { - HandleTickError(error); - } - } - } - - /// - /// Checks wheather there is no running handler in the component and marks that the handler is starting. - /// - /// boolean value, true - the new tick handler may be invoked, false - a tick handler is already running or a component isn't running. - /// - /// If the component is stopping no new handlers can be run. Every successful call to this method must be completed with either AwaitTick or HandleTickError handlers. - /// - protected virtual bool StartTick() { - lock (m_lock) { - if (State != ExecutionState.Running || m_pending != null) - return false; - // actually the component may be in a not running state here (stopping, disposed or what ever), - // but OnStop method also locks on the same object and will handle operation in m_pending - m_pending = new ActionTask( - () => { - // only one operation is running, it's safe to assing m_pending from it - m_pending = null; - }, - ex => { - try { - OnTickError(ex); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } finally { - m_pending = null; - } - // suppress error - }, - ex => { - try { - OnTickCancel(ex); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } finally { - m_pending = null; - } - // supress cancellation - }, - false - ); - return true; - } - } - - /// - /// Awaits the tick. - /// - /// Tick. - /// - /// This method is called only after StartTick method and m_pending will hold the promise which should be fulfilled. - /// - void AwaitTick(IPromise tick) { - if (tick == null) { - m_pending.Resolve(); - } else { - tick.On( - m_pending.Resolve, - m_pending.Reject, - m_pending.CancelOperation - ); - } - } - - /// - /// Handles the tick error. - /// - /// - /// This method is called only after StartTick method and m_pending will hold the promise which should be fulfilled. - /// - void HandleTickError(Exception error) { - m_pending.Reject(error); - } - - protected virtual void OnTickError(Exception error) { - } - - protected virtual void OnTickCancel(Exception error) { - } - - /// - /// Invoked when the timer ticks, use this method to implement your logic - /// - protected virtual IPromise OnTick(ICancellationToken cancellationToken) { - return Promise.Success; - } - - protected override IPromise OnStop() { - m_timer.Change(-1, -1); - - // the component is in the stopping state - lock (m_lock) { - // after this lock no more pending operations could be created - var pending = m_pending; - // m_pending could be fulfilled and set to null already - if (pending != null) { - pending.Cancel(); - return pending.Then(base.OnStop); - } - } - - return base.OnStop(); - } - - protected override void Dispose(bool disposing) { - if (disposing) - m_timer.Dispose(); - - base.Dispose(disposing); - } - } -} - diff --git a/Implab/Components/RunnableComponent.cs b/Implab/Components/RunnableComponent.cs deleted file mode 100644 --- a/Implab/Components/RunnableComponent.cs +++ /dev/null @@ -1,411 +0,0 @@ -using System; -using System.Diagnostics.CodeAnalysis; - -namespace Implab.Components { - public abstract class RunnableComponent : IDisposable, IRunnable, IInitializable { - enum Commands { - Ok = 0, - Fail, - Init, - Start, - Stop, - Dispose, - Reset, - Last = Reset - } - - class StateMachine { - public static readonly ExecutionState[,] ReusableTransitions; - public static readonly ExecutionState[,] NonreusableTransitions; - - class StateBuilder { - readonly ExecutionState[,] m_states; - - public ExecutionState[,] States { - get { return m_states; } - } - public StateBuilder(ExecutionState[,] states) { - m_states = states; - } - - public StateBuilder() { - m_states = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1]; - } - - public StateBuilder Edge(ExecutionState s1, ExecutionState s2, Commands cmd) { - m_states[(int)s1, (int)cmd] = s2; - return this; - } - - public StateBuilder Clone() { - return new StateBuilder((ExecutionState[,])m_states.Clone()); - } - } - - static StateMachine() { - ReusableTransitions = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1]; - - var common = new StateBuilder() - .Edge(ExecutionState.Created, ExecutionState.Initializing, Commands.Init) - .Edge(ExecutionState.Created, ExecutionState.Disposed, Commands.Dispose) - - .Edge(ExecutionState.Initializing, ExecutionState.Ready, Commands.Ok) - .Edge(ExecutionState.Initializing, ExecutionState.Failed, Commands.Fail) - - .Edge(ExecutionState.Ready, ExecutionState.Starting, Commands.Start) - .Edge(ExecutionState.Ready, ExecutionState.Disposed, Commands.Dispose) - - .Edge(ExecutionState.Starting, ExecutionState.Running, Commands.Ok) - .Edge(ExecutionState.Starting, ExecutionState.Failed, Commands.Fail) - .Edge(ExecutionState.Starting, ExecutionState.Stopping, Commands.Stop) - .Edge(ExecutionState.Starting, ExecutionState.Disposed, Commands.Dispose) - - .Edge(ExecutionState.Running, ExecutionState.Failed, Commands.Fail) - .Edge(ExecutionState.Running, ExecutionState.Stopping, Commands.Stop) - .Edge(ExecutionState.Running, ExecutionState.Disposed, Commands.Dispose) - - .Edge(ExecutionState.Failed, ExecutionState.Disposed, Commands.Dispose) - .Edge(ExecutionState.Failed, ExecutionState.Initializing, Commands.Reset) - - .Edge(ExecutionState.Stopping, ExecutionState.Failed, Commands.Fail) - .Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Dispose) - - .Edge(ExecutionState.Disposed, ExecutionState.Disposed, Commands.Dispose); - - var reusable = common - .Clone() - .Edge(ExecutionState.Stopping, ExecutionState.Ready, Commands.Ok); - - var nonreusable = common - .Clone() - .Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Ok); - - NonreusableTransitions = nonreusable.States; - ReusableTransitions = reusable.States; - - } - - readonly ExecutionState[,] m_states; - - public ExecutionState State { - get; - private set; - } - - public StateMachine(ExecutionState[,] states, ExecutionState initial) { - State = initial; - m_states = states; - } - - public bool Move(Commands cmd) { - var next = m_states[(int)State, (int)cmd]; - if (next == ExecutionState.Undefined) - return false; - State = next; - return true; - } - } - - IPromise m_pending; - Exception m_lastError; - - readonly StateMachine m_stateMachine; - readonly bool m_reusable; - public event EventHandler StateChanged; - - /// - /// Initializes component state. - /// - /// If set, the component initial state is and the component is ready to start, otherwise initialization is required. - /// If set, the component may start after it has been stopped, otherwise the component is disposed after being stopped. - protected RunnableComponent(bool initialized, bool reusable) { - m_stateMachine = new StateMachine( - reusable ? StateMachine.ReusableTransitions : StateMachine.NonreusableTransitions, - initialized ? ExecutionState.Ready : ExecutionState.Created - ); - m_reusable = reusable; - } - - /// - /// Initializes component state. The component created with this constructor is not reusable, i.e. it will be disposed after stop. - /// - /// If set, the component initial state is and the component is ready to start, otherwise initialization is required. - protected RunnableComponent(bool initialized) : this(initialized, false) { - } - - void ThrowInvalidCommand(Commands cmd) { - if (m_stateMachine.State == ExecutionState.Disposed) - throw new ObjectDisposedException(ToString()); - - throw new InvalidOperationException(String.Format("Command {0} is not allowed in the state {1}", cmd, m_stateMachine.State)); - } - - bool MoveIfInState(Commands cmd, IPromise pending, Exception error, ExecutionState state) { - ExecutionState prev, current; - lock (m_stateMachine) { - if (m_stateMachine.State != state) - return false; - - prev = m_stateMachine.State; - if (!m_stateMachine.Move(cmd)) - ThrowInvalidCommand(cmd); - current = m_stateMachine.State; - - m_pending = pending; - m_lastError = error; - } - if (prev != current) - OnStateChanged(prev, current, error); - return true; - } - - bool MoveIfPending(Commands cmd, IPromise pending, Exception error, IPromise expected) { - ExecutionState prev, current; - lock (m_stateMachine) { - if (m_pending != expected) - return false; - prev = m_stateMachine.State; - if (!m_stateMachine.Move(cmd)) - ThrowInvalidCommand(cmd); - current = m_stateMachine.State; - m_pending = pending; - m_lastError = error; - } - if (prev != current) - OnStateChanged(prev, current, error); - return true; - } - - IPromise Move(Commands cmd, IPromise pending, Exception error) { - ExecutionState prev, current; - IPromise ret; - lock (m_stateMachine) { - prev = m_stateMachine.State; - if (!m_stateMachine.Move(cmd)) - ThrowInvalidCommand(cmd); - current = m_stateMachine.State; - - ret = m_pending; - m_pending = pending; - m_lastError = error; - - } - if (prev != current) - OnStateChanged(prev, current, error); - return ret; - } - - /// - /// Handles the state of the component change event, raises the event, handles - /// the transition to the state (calls method). - /// - /// The previous state - /// The current state - /// The last error if any. - /// - /// - /// If the previous state and the current state are same this method isn't called, such situiation is treated - /// as the component hasn't changed it's state. - /// - /// - /// When overriding this method ensure the call is made to the base implementation, otherwise it will lead to - /// the wrong behavior of the component. - /// - /// - protected virtual void OnStateChanged(ExecutionState previous, ExecutionState current, Exception error) { - StateChanged.DispatchEvent( - this, - new StateChangeEventArgs { - State = current, - LastError = error - } - ); - if (current == ExecutionState.Disposed) { - GC.SuppressFinalize(this); - Dispose(true); - } - } - - /// - /// Moves the component from running to failed state. - /// - /// The exception which is describing the error. - protected bool Fail(Exception error) { - return MoveIfInState(Commands.Fail, null, error, ExecutionState.Running); - } - - /// - /// Tries to reset state to . - /// - /// True if component is reset to , false if the componet wasn't - /// in state. - /// - /// This method checks the current state of the component and if it's in - /// moves component to . - /// The is called and if this method completes succesfully the component moved - /// to state, otherwise the component is moved to - /// state. If throws an exception it will be propagated by this method to the caller. - /// - protected bool ResetState() { - if (!MoveIfInState(Commands.Reset, null, null, ExecutionState.Failed)) - return false; - - try { - OnResetState(); - Move(Commands.Ok, null, null); - return true; - } catch (Exception err) { - Move(Commands.Fail, null, err); - throw; - } - } - - /// - /// This method is called by to reinitialize component in the failed state. - /// - /// - /// Default implementation throws which will cause the component - /// fail to reset it's state and it left in state. - /// If this method doesn't throw exceptions the component is moved to state. - /// - protected virtual void OnResetState() { - throw new NotImplementedException(); - } - - IPromise InvokeAsync(Commands cmd, Func action, Action chain) { - IPromise promise = null; - IPromise prev; - - var task = new ActionChainTask(action, null, null, true); - - Action errorOrCancel = e => { - if (e == null) - e = new OperationCanceledException(); - MoveIfPending(Commands.Fail, null, e, promise); - throw new PromiseTransientException(e); - }; - - promise = task.Then( - () => MoveIfPending(Commands.Ok, null, null, promise), - errorOrCancel, - errorOrCancel - ); - - prev = Move(cmd, promise, null); - - if (prev == null) - task.Resolve(); - else - chain(prev, task); - - return promise; - } - - - #region IInitializable implementation - - public void Initialize() { - Move(Commands.Init, null, null); - - try { - OnInitialize(); - Move(Commands.Ok, null, null); - } catch (Exception err) { - Move(Commands.Fail, null, err); - throw; - } - } - - protected virtual void OnInitialize() { - } - - #endregion - - #region IRunnable implementation - - public IPromise Start() { - return InvokeAsync(Commands.Start, OnStart, null); - } - - protected virtual IPromise OnStart() { - return Promise.Success; - } - - public IPromise Stop() { - return InvokeAsync(Commands.Stop, OnStop, StopPending); - } - - protected virtual IPromise OnStop() { - return Promise.Success; - } - - /// - /// Stops the current operation if one exists. - /// - /// Current. - /// Stop. - protected virtual void StopPending(IPromise current, IResolvable stop) { - if (current == null) { - stop.Resolve(); - } else { - // связваем текущую операцию с операцией остановки - current.On( - stop.Resolve, // если текущая операция заверщилась, то можно начинать остановку - stop.Reject, // если текущая операция дала ошибку - то все плохо, нельзя продолжать - e => stop.Resolve() // если текущая отменилась, то можно начинать остановку - ); - // посылаем текущей операции сигнал остановки - current.Cancel(); - } - } - - public ExecutionState State { - get { - return m_stateMachine.State; - } - } - - public Exception LastError { - get { - return m_lastError; - } - } - - #endregion - - #region IDisposable implementation - - /// - /// Releases all resource used by the object. - /// - /// - /// Will not try to stop the component, it will just release all resources. - /// To cleanup the component gracefully use method. - /// - /// In normal cases the method shouldn't be called, the call to the - /// method is sufficient to cleanup the component. Call only to cleanup after errors, - /// especially if method is failed. Using this method insted of may - /// lead to the data loss by the component. - /// - [SuppressMessage("Microsoft.Design", "CA1063:ImplementIDisposableCorrectly", Justification = "Dipose(bool) and GC.SuppessFinalize are called")] - public void Dispose() { - Move(Commands.Dispose, null, null); - } - - ~RunnableComponent() { - Dispose(false); - } - - #endregion - - /// - /// Releases all resources used by the component, called automatically, override this method to implement your cleanup. - /// - /// true if this method is called during normal dispose process. - /// The operation which is currenty pending - protected virtual void Dispose(bool disposing) { - } - - } -} - diff --git a/Implab/Deferred.cs b/Implab/Deferred.cs --- a/Implab/Deferred.cs +++ b/Implab/Deferred.cs @@ -7,10 +7,13 @@ namespace Implab { /// public class Deferred : IResolvable { - readonly AbstractPromise m_promise; + readonly Promise m_promise; readonly IDispatcher m_dispatcher; - internal Deferred(AbstractPromise promise, IDispatcher dispatcher) { + internal Deferred(IDispatcher dispatcher) : this(new Promise(), dispatcher) { + } + + internal Deferred(Promise promise, IDispatcher dispatcher) { Debug.Assert(promise != null); m_promise = promise; m_dispatcher = dispatcher; @@ -21,11 +24,14 @@ namespace Implab { } public void Reject(Exception error) { - m_promise.Reject(error); + if (error is PromiseTransientException) + error = ((PromiseTransientException)error).InnerException; + + m_promise.RejectPromise(error); } public void Resolve() { - m_promise.Resolve(); + m_promise.ResolvePromise(); } public void Resolve(IPromise thenable) { @@ -36,7 +42,7 @@ namespace Implab { else if (m_dispatcher != null) // dispatch (see ecma-262/6.0: 25.4.1.3.2 Promise Resolve Functions) - m_dispatcher.Enqueue(() => Chain(thenable)); + m_dispatcher.Enqueue(Chain, thenable); else Chain(thenable); } diff --git a/Implab/Deferred`1.cs b/Implab/Deferred`1.cs --- a/Implab/Deferred`1.cs +++ b/Implab/Deferred`1.cs @@ -3,10 +3,13 @@ using System.Diagnostics; namespace Implab { public class Deferred : IResolvable { - readonly AbstractPromise m_promise; + readonly Promise m_promise; readonly IDispatcher m_dispatcher; - internal Deferred(AbstractPromise promise, IDispatcher dispatcher) { + internal Deferred(IDispatcher dispatcher) : this(new Promise(), dispatcher) { + } + + internal Deferred(Promise promise, IDispatcher dispatcher) { Debug.Assert(promise != null); m_promise = promise; m_dispatcher = dispatcher; @@ -17,11 +20,14 @@ namespace Implab { } public void Reject(Exception error) { - m_promise.Reject(error); + if (error is PromiseTransientException) + error = ((PromiseTransientException)error).InnerException; + + m_promise.RejectPromise(error); } public void Resolve(T value) { - m_promise.Resolve(value); + m_promise.ResolvePromise(value); } public void Resolve(IPromise thenable) { @@ -32,7 +38,7 @@ namespace Implab { else if (m_dispatcher != null) // dispatch (see ecma-262/6.0: 25.4.1.3.2 Promise Resolve Functions) - m_dispatcher.Enqueue(() => Chain(thenable)); + m_dispatcher.Enqueue(Chain, thenable); else Chain(thenable); } diff --git a/Implab/Diagnostics/Extensions.cs b/Implab/Diagnostics/Extensions.cs deleted file mode 100644 --- a/Implab/Diagnostics/Extensions.cs +++ /dev/null @@ -1,41 +0,0 @@ -namespace Implab.Diagnostics { - public static class Extensions { - public static IPromise EndLogicalOperation(this IPromise promise) { - Safe.ArgumentNotNull(promise, "promise"); - var op = TraceContext.Instance.DetachLogicalOperation(); - - return promise.On( - x => { - TraceContext.Instance.EnterLogicalOperation(op,true); - TraceLog.TraceInformation("promise = {0}", x); - TraceLog.EndLogicalOperation(); - TraceContext.Instance.Leave(); - }, - err =>{ - TraceContext.Instance.EnterLogicalOperation(op,true); - TraceLog.TraceError("promise died {0}", err); - TraceLog.EndLogicalOperation(); - TraceContext.Instance.Leave(); - }, - reason => { - TraceContext.Instance.EnterLogicalOperation(op,true); - TraceLog.TraceInformation("promise cancelled {0}", reason == null ? "" : reason.Message); - TraceLog.EndLogicalOperation(); - TraceContext.Instance.Leave(); - } - ); - } - - public static IPromise EndLogicalOperation(this IPromise promise) { - Safe.ArgumentNotNull(promise, "promise"); - var op = TraceContext.Instance.DetachLogicalOperation(); - - return promise.On(() => { - TraceContext.Instance.EnterLogicalOperation(op,true); - TraceLog.EndLogicalOperation(); - TraceContext.Instance.Leave(); - }, PromiseEventType.All); - } - } -} - diff --git a/Implab/ExceptionHelpers.cs b/Implab/ExceptionHelpers.cs new file mode 100644 --- /dev/null +++ b/Implab/ExceptionHelpers.cs @@ -0,0 +1,18 @@ +using System; +using System.Reflection; +using System.Runtime.ExceptionServices; + +namespace Implab { + public static class ExceptionHelpers { + public static void Rethrow(this Exception that) { + ExceptionDispatchInfo.Capture(that).Throw(); + } + + public static void ThrowInvocationException(this Exception that) { + if (that is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", that); + else + throw new TargetInvocationException(that); + } + } +} \ No newline at end of file diff --git a/Implab/IDispatcher.cs b/Implab/IDispatcher.cs --- a/Implab/IDispatcher.cs +++ b/Implab/IDispatcher.cs @@ -3,5 +3,7 @@ using System; namespace Implab { public interface IDispatcher { void Enqueue(Action job); + + void Enqueue(Action job, T arg); } } \ No newline at end of file diff --git a/Implab/Parallels/ArrayTraits.cs b/Implab/Parallels/ArrayTraits.cs deleted file mode 100644 --- a/Implab/Parallels/ArrayTraits.cs +++ /dev/null @@ -1,207 +0,0 @@ -using Implab.Diagnostics; -using System; -using System.Diagnostics; -using System.Threading; - -namespace Implab.Parallels { - public static class ArrayTraits { - class ArrayIterator : DispatchPool { - readonly Action m_action; - readonly TSrc[] m_source; - readonly Promise m_promise = new Promise(); - readonly LogicalOperation m_logicalOperation; - - int m_pending; - int m_next; - - public ArrayIterator(TSrc[] source, Action action, int threads) - : base(threads) { - - Debug.Assert(source != null); - Debug.Assert(action != null); - - m_logicalOperation = TraceContext.Instance.CurrentOperation; - m_next = 0; - m_source = source; - m_pending = source.Length; - m_action = action; - - m_promise.On(Dispose, PromiseEventType.All); - - InitPool(); - } - - public Promise Promise { - get { - return m_promise; - } - } - - protected override void Worker() { - TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false); - try { - base.Worker(); - } finally { - TraceContext.Instance.Leave(); - } - } - - protected override bool TryDequeue(out int unit) { - unit = Interlocked.Increment(ref m_next) - 1; - return unit < m_source.Length; - } - - protected override void InvokeUnit(int unit) { - try { - m_action(m_source[unit]); - var pending = Interlocked.Decrement(ref m_pending); - if (pending == 0) - m_promise.Resolve(m_source.Length); - } catch (Exception e) { - m_promise.Reject(e); - } - } - } - - class ArrayMapper: DispatchPool { - readonly Func m_transform; - readonly TSrc[] m_source; - readonly TDst[] m_dest; - readonly Promise m_promise = new Promise(); - readonly LogicalOperation m_logicalOperation; - - int m_pending; - int m_next; - - public ArrayMapper(TSrc[] source, Func transform, int threads) - : base(threads) { - - Debug.Assert (source != null); - Debug.Assert( transform != null); - - m_next = 0; - m_source = source; - m_dest = new TDst[source.Length]; - m_pending = source.Length; - m_transform = transform; - m_logicalOperation = TraceContext.Instance.CurrentOperation; - - m_promise.On(Dispose, PromiseEventType.All); - - InitPool(); - } - - public Promise Promise { - get { - return m_promise; - } - } - - protected override void Worker() { - TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false); - try { - base.Worker(); - } finally { - TraceContext.Instance.Leave(); - } - } - - protected override bool TryDequeue(out int unit) { - unit = Interlocked.Increment(ref m_next) - 1; - return unit < m_source.Length; - } - - protected override void InvokeUnit(int unit) { - try { - m_dest[unit] = m_transform(m_source[unit]); - var pending = Interlocked.Decrement(ref m_pending); - if (pending == 0) - m_promise.Resolve(m_dest); - } catch (Exception e) { - m_promise.Reject(e); - } - } - } - - public static IPromise ParallelMap (this TSrc[] source, Func transform, int threads) { - if (source == null) - throw new ArgumentNullException("source"); - if (transform == null) - throw new ArgumentNullException("transform"); - - var mapper = new ArrayMapper(source, transform, threads); - return mapper.Promise; - } - - public static IPromise ParallelForEach(this TSrc[] source, Action action, int threads) { - if (source == null) - throw new ArgumentNullException("source"); - if (action == null) - throw new ArgumentNullException("action"); - - var iter = new ArrayIterator(source, action, threads); - return iter.Promise; - } - - public static IPromise ChainedMap(this TSrc[] source, Func> transform, int threads) { - if (source == null) - throw new ArgumentNullException("source"); - if (transform == null) - throw new ArgumentNullException("transform"); - if (threads <= 0) - throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero"); - - if (source.Length == 0) - return Promise.FromResult(new TDst[0]); - - var promise = new Promise(); - var res = new TDst[source.Length]; - var pending = source.Length; - - object locker = new object(); - int slots = threads; - - // Analysis disable AccessToDisposedClosure - AsyncPool.RunThread(() => { - for (int i = 0; i < source.Length; i++) { - if(promise.IsFulfilled) - break; // stop processing in case of error or cancellation - var idx = i; - - if (Interlocked.Decrement(ref slots) < 0) { - lock(locker) { - while(slots < 0) - Monitor.Wait(locker); - } - } - - try { - transform(source[i]) - .On( x => { - Interlocked.Increment(ref slots); - lock (locker) { - Monitor.Pulse(locker); - } - }) - .On( - x => { - res[idx] = x; - var left = Interlocked.Decrement(ref pending); - if (left == 0) - promise.Resolve(res); - }, - promise.Reject - ); - - } catch (Exception e) { - promise.Reject(e); - } - } - return 0; - }); - - return promise; - } - - } -} diff --git a/Implab/Parallels/AsyncPool.cs b/Implab/Parallels/AsyncPool.cs deleted file mode 100644 --- a/Implab/Parallels/AsyncPool.cs +++ /dev/null @@ -1,155 +0,0 @@ -using Implab.Diagnostics; -using System; -using System.Threading; -using System.Linq; - -namespace Implab.Parallels { - /// - /// Класс для распаралеливания задач. - /// - /// - /// Используя данный класс и лямда выражения можно распараллелить - /// вычисления, для этого используется концепция обещаний. - /// - public static class AsyncPool { - - public static IPromise Invoke(Func func) { - var p = new Promise(); - var caller = TraceContext.Instance.CurrentOperation; - - ThreadPool.QueueUserWorkItem(param => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - p.Resolve(func()); - } catch(Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - - return p; - } - - public static IPromise Invoke(Func func) { - var p = new Promise(); - var caller = TraceContext.Instance.CurrentOperation; - - ThreadPool.QueueUserWorkItem(param => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - p.Resolve(func(p)); - } catch(Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - - return p; - } - - public static IPromise RunThread(Func func) { - var p = new Promise(); - - var caller = TraceContext.Instance.CurrentOperation; - - var worker = new Thread(() => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - p.Resolve(func()); - } catch (Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - worker.IsBackground = true; - worker.Start(); - - return p; - } - - public static IPromise RunThread(Func func) { - var p = new Promise(); - - var caller = TraceContext.Instance.CurrentOperation; - - var worker = new Thread(() => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - p.Resolve(func(p)); - } catch (Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - worker.IsBackground = true; - worker.Start(); - - return p; - } - - - public static IPromise RunThread(Action func) { - var p = new Promise(); - - var caller = TraceContext.Instance.CurrentOperation; - - var worker = new Thread(() => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - func(); - p.Resolve(); - } catch (Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - worker.IsBackground = true; - worker.Start(); - - return p; - } - - public static IPromise RunThread(Action func) { - var p = new Promise(); - - var caller = TraceContext.Instance.CurrentOperation; - - var worker = new Thread(() => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - func(p); - p.Resolve(); - } catch (Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - worker.IsBackground = true; - worker.Start(); - - return p; - } - - public static IPromise[] RunThread(params Action[] func) { - return func.Select(f => RunThread(f)).ToArray(); - } - - public static IPromise[] RunThread(params Action[] func) { - return func.Select(f => RunThread(f)).ToArray(); - } - - public static IPromise[] RunThread(params Func[] func) { - return func.Select(f => RunThread(f)).ToArray(); - } - - public static IPromise[] RunThread(params Func[] func) { - return func.Select(f => RunThread(f)).ToArray(); - } - } -} diff --git a/Implab/Parallels/SyncContextDispatcher.cs b/Implab/Parallels/SyncContextDispatcher.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/SyncContextDispatcher.cs @@ -0,0 +1,20 @@ +using System; +using System.Threading; + +namespace Implab { + public class SyncContextDispatcher : IDispatcher { + SynchronizationContext m_context; + public SyncContextDispatcher(SynchronizationContext context) { + Safe.ArgumentNotNull(context, nameof(context)); + m_context = context; + } + + public void Enqueue(Action job) { + m_context.Post((o) => job(), null); + } + + public void Enqueue(Action job, T arg) { + m_context.Post((o) => job((T)o), arg); + } + } +} \ No newline at end of file diff --git a/Implab/Parallels/ThreadPoolDispatcher.cs b/Implab/Parallels/ThreadPoolDispatcher.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/ThreadPoolDispatcher.cs @@ -0,0 +1,20 @@ +using System; +using System.Threading; + +namespace Implab.Parallels { + public class ThreadPoolDispatcher : IDispatcher { + + public static ThreadPoolDispatcher Instance { get; private set; } = new ThreadPoolDispatcher(); + + private ThreadPoolDispatcher() { + } + + public void Enqueue(Action job) { + ThreadPool.QueueUserWorkItem((o) => job(), null); + } + + public void Enqueue(Action job, T arg) { + ThreadPool.QueueUserWorkItem((o) => job((T)o), arg); + } + } +} \ No newline at end of file diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs deleted file mode 100644 --- a/Implab/Parallels/WorkerPool.cs +++ /dev/null @@ -1,152 +0,0 @@ -using System; -using System.Threading; -using System.Diagnostics; -using Implab.Diagnostics; - -namespace Implab.Parallels { - public class WorkerPool : DispatchPool { - - AsyncQueue m_queue = new AsyncQueue(); - int m_queueLength; - readonly int m_threshold = 1; - - public WorkerPool(int minThreads, int maxThreads, int threshold) - : base(minThreads, maxThreads) { - m_threshold = threshold; - InitPool(); - } - - public WorkerPool(int minThreads, int maxThreads) : - base(minThreads, maxThreads) { - InitPool(); - } - - public WorkerPool(int threads) - : base(threads) { - InitPool(); - } - - public WorkerPool() { - InitPool(); - } - - public IPromise Invoke(Func task) { - if (task == null) - throw new ArgumentNullException("task"); - if (IsDisposed) - throw new ObjectDisposedException(ToString()); - - var promise = new FuncTask(task, null, null, true); - - var lop = TraceContext.Instance.CurrentOperation; - - EnqueueTask(delegate { - TraceContext.Instance.EnterLogicalOperation(lop, false); - - promise.Resolve(); - - TraceContext.Instance.Leave(); - }); - - return promise; - } - - public IPromise Invoke(Action task) { - if (task == null) - throw new ArgumentNullException("task"); - if (IsDisposed) - throw new ObjectDisposedException(ToString()); - - var promise = new ActionTask(task, null, null, true); - - var lop = TraceContext.Instance.CurrentOperation; - - EnqueueTask(delegate { - TraceContext.Instance.EnterLogicalOperation(lop, false); - - promise.Resolve(); - - TraceContext.Instance.Leave(); - }); - - return promise; - } - - public IPromise Invoke(Func task) { - if (task == null) - throw new ArgumentNullException("task"); - if (IsDisposed) - throw new ObjectDisposedException(ToString()); - - var promise = new Promise(); - - var lop = TraceContext.Instance.CurrentOperation; - - EnqueueTask(delegate { - TraceContext.Instance.EnterLogicalOperation(lop, false); - try { - if (!promise.CancelOperationIfRequested()) - promise.Resolve(task(promise)); - } catch (Exception e) { - promise.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - - return promise; - } - - public IPromise Invoke(Action task) { - if (task == null) - throw new ArgumentNullException("task"); - if (IsDisposed) - throw new ObjectDisposedException(ToString()); - - var promise = new Promise(); - - var lop = TraceContext.Instance.CurrentOperation; - - EnqueueTask(delegate { - TraceContext.Instance.EnterLogicalOperation(lop, false); - try { - if (!promise.CancelOperationIfRequested()) { - task(promise); - promise.Resolve(); - } - } catch (Exception e) { - promise.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - - return promise; - } - - protected void EnqueueTask(Action unit) { - Debug.Assert(unit != null); - var len = Interlocked.Increment(ref m_queueLength); - m_queue.Enqueue(unit); - - if (len > m_threshold * PoolSize) { - StartWorker(); - } - - SignalThread(); - } - - protected override bool TryDequeue(out Action unit) { - if (m_queue.TryDequeue(out unit)) { - Interlocked.Decrement(ref m_queueLength); - return true; - } - return false; - } - - protected override void InvokeUnit(Action unit) { - unit(); - } - - } -} diff --git a/Implab/Promise.cs b/Implab/Promise.cs new file mode 100644 --- /dev/null +++ b/Implab/Promise.cs @@ -0,0 +1,209 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Reflection; +using System.Threading.Tasks; +using Implab.Parallels; + +namespace Implab { + public class Promise : AbstractEvent, IPromise { + public static IDispatcher DefaultDispatcher { + get { + return ThreadPoolDispatcher.Instance; + } + } + + class ResolvableSignal : IResolvable { + public Signal Signal { get; private set; } + public ResolvableSignal() { + Signal = new Signal(); + } + + + public void Reject(Exception error) { + Signal.Set(); + } + + public void Resolve() { + Signal.Set(); + } + } + + PromiseState m_state; + + Exception m_error; + + public bool IsRejected { + get { + return m_state == PromiseState.Rejected; + } + } + + public bool IsFulfilled { + get { + return m_state == PromiseState.Fulfilled; + } + } + + public Exception RejectReason { + get { + return m_error; + } + } + + internal Promise() { + + } + + internal void ResolvePromise() { + if (BeginTransit()) { + m_state = PromiseState.Fulfilled; + CompleteTransit(); + } + } + + internal void RejectPromise(Exception reason) { + if (BeginTransit()) { + m_error = reason; + m_state = PromiseState.Rejected; + CompleteTransit(); + } + } + + + #region implemented abstract members of AbstractPromise + + protected override void SignalHandler(IResolvable handler) { + switch (m_state) { + case PromiseState.Fulfilled: + handler.Resolve(); + break; + case PromiseState.Rejected: + handler.Reject(RejectReason); + break; + default: + throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); + } + } + + protected void WaitResult(int timeout) { + if (!(IsResolved || GetFulfillSignal().Wait(timeout))) + throw new TimeoutException(); + } + + protected Signal GetFulfillSignal() { + var next = new ResolvableSignal(); + Then(next); + return next.Signal; + } + + #endregion + + + public Type ResultType { + get { + return typeof(void); + } + } + + + protected void Rethrow() { + Debug.Assert(m_error != null); + if (m_error is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", m_error); + else + throw new TargetInvocationException(m_error); + } + + public void Then(IResolvable next) { + AddHandler(next); + } + + public IPromise Cast() { + throw new InvalidCastException(); + } + + public void Join() { + WaitResult(-1); + if (IsRejected) + Rethrow(); + } + + public void Join(int timeout) { + WaitResult(timeout); + if (IsRejected) + Rethrow(); + } + + public static ResolvedPromise Resolve() { + return new ResolvedPromise(); + } + + public static ResolvedPromise Resolve(T result) { + return new ResolvedPromise(result); + } + + public static RejectedPromise Reject(Exception reason) { + return new RejectedPromise(reason); + } + + public static RejectedPromise Reject(Exception reason) { + return new RejectedPromise(reason); + } + + public static IPromise Create(PromiseExecutor executor) { + Safe.ArgumentNotNull(executor, nameof(executor)); + + var p = new Promise(); + var d = new Deferred(p, DefaultDispatcher); + + try { + executor(d); + } catch (Exception e) { + d.Reject(e); + } + + return d.Promise; + } + + public static IPromise Create(PromiseExecutor executor) { + Safe.ArgumentNotNull(executor, nameof(executor)); + + var p = new Promise(); + var d = new Deferred(p, DefaultDispatcher); + + try { + executor(d); + } catch (Exception e) { + d.Reject(e); + } + + return d.Promise; + } + + public static IPromise All(IEnumerable promises) { + var d = new Deferred(DefaultDispatcher); + var all = new PromiseAll(d); + foreach (var promise in promises) { + all.AddPromise(promise); + if (all.Done) + break; + } + all.Complete(); + return all.ResultPromise; + } + + public static IPromise All(IEnumerable> promises, Func cleanup, Action cancel) { + var d = new Deferred(DefaultDispatcher); + var all = new PromiseAll(d, cleanup, cancel); + foreach (var promise in promises) { + all.AddPromise(promise); + if (all.Done) + break; + } + all.Complete(); + return all.ResultPromise; + } + } +} + diff --git a/Implab/PromiseActionReaction.cs b/Implab/PromiseActionReaction.cs --- a/Implab/PromiseActionReaction.cs +++ b/Implab/PromiseActionReaction.cs @@ -3,63 +3,48 @@ using System.Diagnostics; namespace Implab { class PromiseActionReaction : PromiseReaction { - readonly Action m_fulfilled; - - readonly Action m_rejected; readonly Deferred m_next; - public PromiseActionReaction(Action fulfilled, Action rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public IPromise Promise { + get { return m_next.Promise; } + } + + public PromiseActionReaction(Action fulfilled, Action rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = () => { - fulfilled(); - next.Resolve(); - }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (x) => { - rejected(x); - next.Resolve(); - }; - m_next = next; - } - - public PromiseActionReaction(Func fulfilled, Func rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { - if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; - if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseActionReaction(Action fulfilled, Func rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseActionReaction(Func fulfilled, Func rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = () => { - fulfilled(); - next.Resolve(); - }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseActionReaction(Func fulfilled, Action rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseActionReaction(Action fulfilled, Func rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (x) => { - rejected(x); - next.Resolve(); - }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } + public PromiseActionReaction(Func fulfilled, Action rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); - protected override bool HasFulfilHandler => m_fulfilled != null; - - protected override bool HasRejectHandler => m_rejected != null; + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } protected override void DefaultReject(Exception reason) { m_next.Reject(reason); @@ -68,21 +53,5 @@ namespace Implab { protected override void DefaultResolve() { m_next.Resolve(); } - - protected override void RejectImpl(Exception reason) { - try { - m_rejected(reason); - } catch (Exception e){ - m_next.Reject(e); - } - } - - protected override void ResolveImpl() { - try { - m_fulfilled(); - } catch (Exception e){ - m_next.Reject(e); - } - } } } \ No newline at end of file diff --git a/Implab/PromiseActionReaction`1.cs b/Implab/PromiseActionReaction`1.cs --- a/Implab/PromiseActionReaction`1.cs +++ b/Implab/PromiseActionReaction`1.cs @@ -3,62 +3,47 @@ using System.Diagnostics; namespace Implab { class PromiseActionReaction : PromiseReaction { - readonly Action m_fulfilled; - - readonly Action m_rejected; - readonly Deferred m_next; - public PromiseActionReaction(Action fulfilled, Action rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public IPromise Promise { + get { return m_next.Promise; } + } + + public PromiseActionReaction(Action fulfilled, Action rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = (x) => { - fulfilled(x); - next.Resolve(); - }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (x) => { - rejected(x); - next.Resolve(); - }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseActionReaction(Func fulfilled, Func rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseActionReaction(Func fulfilled, Func rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = (x) => { next.Resolve(fulfilled(x)); }; - if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; - } - - public PromiseActionReaction(Action fulfilled, Func rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { - if (fulfilled != null) - m_fulfilled = (x) => { - fulfilled(x); - next.Resolve(); - }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseActionReaction(Func fulfilled, Action rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseActionReaction(Action fulfilled, Func rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = (x) => { next.Resolve(fulfilled(x)); }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (x) => { - rejected(x); - next.Resolve(); - }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - protected override bool HasFulfilHandler => m_fulfilled != null; + public PromiseActionReaction(Func fulfilled, Action rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); - protected override bool HasRejectHandler => m_rejected != null; + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } protected override void DefaultReject(Exception reason) { m_next.Reject(reason); @@ -67,21 +52,5 @@ namespace Implab { protected override void DefaultResolve(T result) { m_next.Resolve(); } - - protected override void RejectImpl(Exception reason) { - try { - m_rejected(reason); - } catch (Exception e) { - m_next.Reject(e); - } - } - - protected override void ResolveImpl(T result) { - try { - m_fulfilled(result); - } catch (Exception e) { - m_next.Reject(e); - } - } } } \ No newline at end of file diff --git a/Implab/PromiseAll.cs b/Implab/PromiseAll.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseAll.cs @@ -0,0 +1,40 @@ +using System; +using System.Threading; + +namespace Implab +{ + class PromiseAll : IResolvable { + int m_count; + + readonly Deferred m_deferred; + + public bool Done { + get { return m_deferred.Promise.IsResolved; } + } + + public IPromise ResultPromise { + get { return m_deferred.Promise; } + } + + public void AddPromise(IPromise promise) { + Interlocked.Increment(ref m_count); + } + + public PromiseAll(Deferred deferred) { + m_deferred = deferred; + } + + public void Resolve() { + if (Interlocked.Decrement(ref m_count) == 0) + m_deferred.Resolve(); + } + + public void Complete() { + Resolve(); + } + + public void Reject(Exception error) { + m_deferred.Reject(error); + } + } +} \ No newline at end of file diff --git a/Implab/PromiseAll`1.cs b/Implab/PromiseAll`1.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseAll`1.cs @@ -0,0 +1,90 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; + +namespace Implab { + class PromiseAll : IResolvable { + + int m_count; + + readonly List> m_promises = new List>(); + + readonly Deferred m_deferred; + + IPromise m_result; + + readonly Func m_cleanup; + + readonly Action m_cancel; + + public bool Done { + get { return m_deferred.Promise.IsResolved && m_cleanup == null; } + } + + public IPromise ResultPromise { + get { return m_result; } + } + + public void AddPromise(IPromise promise) { + Interlocked.Increment(ref m_count); + promise.Then(this); + } + + public PromiseAll(Deferred deferred, Func cleanup, Action cancel) { + m_deferred = deferred; + m_cancel = cancel; + m_cleanup = cleanup; + } + + public void Resolve() { + if (Interlocked.Decrement(ref m_count) == 0) + m_deferred.Resolve(GetResults()); + } + + public void Reject(Exception error) { + m_deferred.Reject(error); + } + + public void Complete() { + if (m_cancel != null || m_cleanup != null) + m_result = m_deferred.Promise.Catch(CleanupResults); + else + m_result = m_deferred.Promise; + } + + IPromise CleanupResults(Exception reason) { + var errors = new List(); + errors.Add(reason); + + if (m_cancel != null) + try { + m_cancel(); + } catch (Exception e) { + errors.Add(e); + } + + if (m_cleanup != null) { + return Promise.All( + m_promises.Select(p => p + .Then(m_cleanup, e => { }) + .Catch(e => { + errors.Add(e); + }) + ) + ).Then(new Func(() => { + throw new AggregateException(errors); + }), (Func)null); + } else { + return Promise.Reject(errors.Count > 1 ? new AggregateException(errors) : reason); + } + } + + T[] GetResults() { + var results = new T[m_promises.Count]; + for (var i = 0; i < results.Length; i++) + results[i] = m_promises[i].Join(); + return results; + } + } +} \ No newline at end of file diff --git a/Implab/PromiseAwaiter.cs b/Implab/PromiseAwaiter.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseAwaiter.cs @@ -0,0 +1,62 @@ +using System; +using System.Runtime.CompilerServices; +using System.Threading; +using Implab.Parallels; + +namespace Implab +{ + public struct PromiseAwaiter : INotifyCompletion { + class PromiseEvent : IResolvable { + IDispatcher m_dispatcher; + + Action m_handler; + + public PromiseEvent(Action handler, IDispatcher dispatcher) { + m_handler = handler; + m_dispatcher = dispatcher; + } + + public void Resolve() { + m_dispatcher.Enqueue(m_handler); + } + + public void Reject(Exception error) { + m_dispatcher.Enqueue(m_handler); + } + } + + readonly IPromise m_promise; + readonly IDispatcher m_dispatcher; + + public PromiseAwaiter(IPromise promise, IDispatcher dispatcher) { + m_promise = promise; + m_dispatcher = dispatcher; + } + + public PromiseAwaiter(IPromise promise) { + m_promise = promise; + m_dispatcher = GetDispatcher(); + } + + public void OnCompleted (Action continuation) { + if (m_promise != null) + m_promise.Then(new PromiseEvent(continuation, GetDispatcher())); + } + + public void GetResult() { + m_promise.Join(); + } + + static IDispatcher GetDispatcher() { + if(SynchronizationContext.Current == null) + return ThreadPoolDispatcher.Instance; + return new SyncContextDispatcher(SynchronizationContext.Current); + } + + public bool IsCompleted { + get { + return m_promise.IsResolved; + } + } + } +} \ No newline at end of file diff --git a/Implab/PromiseAwaiter`1.cs b/Implab/PromiseAwaiter`1.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseAwaiter`1.cs @@ -0,0 +1,62 @@ +using System; +using System.Runtime.CompilerServices; +using System.Threading; +using Implab.Parallels; + +namespace Implab { + public struct PromiseAwaiter : INotifyCompletion { + class PromiseEvent : IResolvable { + IDispatcher m_dispatcher; + + Action m_handler; + + public PromiseEvent(Action handler, IDispatcher dispatcher) { + m_handler = handler; + m_dispatcher = dispatcher; + } + + public void Resolve(T result) { + m_dispatcher.Enqueue(m_handler); + } + + public void Reject(Exception error) { + m_dispatcher.Enqueue(m_handler); + } + } + + readonly IPromise m_promise; + + readonly IDispatcher m_dispatcher; + + public PromiseAwaiter(IPromise promise) { + m_promise = promise; + m_dispatcher = GetDispatcher(); + } + + public PromiseAwaiter(IPromise promise, IDispatcher dispatcher) { + m_promise = promise; + m_dispatcher = dispatcher; + } + + public void OnCompleted(Action continuation) { + if (m_promise != null) + m_promise.Then(new PromiseEvent(continuation, GetDispatcher())); + } + + public T GetResult() { + return m_promise.Join(); + } + + static IDispatcher GetDispatcher() { + if (SynchronizationContext.Current == null) + return ThreadPoolDispatcher.Instance; + return new SyncContextDispatcher(SynchronizationContext.Current); + } + + public bool IsCompleted { + get { + return m_promise.IsResolved; + } + } + } +} \ No newline at end of file diff --git a/Implab/PromiseExecutor.cs b/Implab/PromiseExecutor.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseExecutor.cs @@ -0,0 +1,3 @@ +namespace Implab { + public delegate void PromiseExecutor(Deferred deferred); +} \ No newline at end of file diff --git a/Implab/PromiseExecutor`1.cs b/Implab/PromiseExecutor`1.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseExecutor`1.cs @@ -0,0 +1,3 @@ +namespace Implab { + public delegate void PromiseExecutor(Deferred d); +} \ No newline at end of file diff --git a/Implab/PromiseExtensions.cs b/Implab/PromiseExtensions.cs --- a/Implab/PromiseExtensions.cs +++ b/Implab/PromiseExtensions.cs @@ -2,474 +2,130 @@ using System; using Implab.Diagnostics; using System.Collections.Generic; -using System.Linq; - +using System.Linq; + namespace Implab { public static class PromiseExtensions { - public static IPromise DispatchToCurrentContext(this IPromise that) { - Safe.ArgumentNotNull(that, "that"); - var context = SynchronizationContext.Current; - if (context == null) - return that; - var p = new SyncContextPromise(context); - p.CancellationRequested(that.Cancel); - - that.On( - p.Resolve, - p.Reject, - p.CancelOperation - ); - return p; - } - - public static IPromise DispatchToContext(this IPromise that, SynchronizationContext context) { - Safe.ArgumentNotNull(that, "that"); - Safe.ArgumentNotNull(context, "context"); - - var p = new SyncContextPromise(context); - p.CancellationRequested(that.Cancel); - - that.On( - p.Resolve, - p.Reject, - p.CancelOperation - ); - return p; - } - - /// - /// Ensures the dispatched. - /// - /// The dispatched. - /// That. - /// Head. - /// Cleanup. - /// The 1st type parameter. - /// The 2nd type parameter. - public static TPromise EnsureDispatched(this TPromise that, IPromise head, Action cleanup) where TPromise : IPromise { - Safe.ArgumentNotNull(that, "that"); - Safe.ArgumentNotNull(head, "head"); - - that.On(() => head.On(cleanup), PromiseEventType.Cancelled); - - return that; + public static IPromise Then(this IPromise that, Action fulfilled, Action rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - /// - /// Adds a cancellation point to the chain of promises. When a cancellation request reaches the cancellation point the operation is - /// cancelled immediatelly, and the request is passed towards. If the operation at the higher level can not be cancelled is't result - /// will be collected with callback. - /// - /// The type of the promise result. - /// The promise to which the cancellation point should be attached. - /// The callback which is used to cleanup the result of the operation if the cancellation point is cancelled already. - /// The promise - public static IPromise CancellationPoint(this IPromise that, Action cleanup) { - var meduim = new Promise(); - - that.On(meduim.Resolve, meduim.Reject, meduim.CancelOperation); - - meduim.CancellationRequested(that.Cancel); - meduim.CancellationRequested(meduim.CancelOperation); - - if (cleanup != null) - meduim.On((Action)null, null, (e) => { - that.On(cleanup); - }); - - return meduim; + public static IPromise Then(this IPromise that, Action fulfilled, Func rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static AsyncCallback AsyncCallback(this Promise that, Func callback) { - Safe.ArgumentNotNull(that, "that"); - Safe.ArgumentNotNull(callback, "callback"); - var op = TraceContext.Instance.CurrentOperation; - return ar => { - TraceContext.Instance.EnterLogicalOperation(op, false); - try { - that.Resolve(callback(ar)); - } catch (Exception err) { - that.Reject(err); - } finally { - TraceContext.Instance.Leave(); - } - }; + public static IPromise Then(this IPromise that, Func fulfilled, Action rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - static void CancelByTimeoutCallback(object cookie) { - ((ICancellable)cookie).Cancel(new TimeoutException()); - } - - /// - /// Cancells promise after the specified timeout is elapsed. - /// - /// The promise to cancel on timeout. - /// The timeout in milliseconds. - /// The 1st type parameter. - public static TPromise Timeout(this TPromise that, int milliseconds) where TPromise : IPromise { - Safe.ArgumentNotNull(that, "that"); - var timer = new Timer(CancelByTimeoutCallback, that, milliseconds, -1); - that.On(timer.Dispose, PromiseEventType.All); - return that; + public static IPromise Then(this IPromise that, Func fulfilled, Func rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise PromiseAll(this IEnumerable that) { - Safe.ArgumentNotNull(that, "that"); - return PromiseAll(that.ToList()); - } - - public static IPromise PromiseAll(this IEnumerable> that) { - return PromiseAll(that, null); - } - - public static IPromise PromiseAll(this IEnumerable> that, Action cleanup) { - Safe.ArgumentNotNull(that, "that"); - return PromiseAll(that.ToList(), cleanup); + public static IPromise Then(this IPromise that, Action fulfilled, Action rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise PromiseAll(this ICollection that) { - Safe.ArgumentNotNull(that, "that"); - - int count = that.Count; - int errors = 0; - var medium = new Promise(); - - if (count == 0) { - medium.Resolve(); - return medium; - } - - medium.On(() => { - foreach (var p2 in that) - p2.Cancel(); - }, PromiseEventType.ErrorOrCancel); - - foreach (var p in that) - p.On( - () => { - if (Interlocked.Decrement(ref count) == 0) - medium.Resolve(); - }, - error => { - if (Interlocked.Increment(ref errors) == 1) - medium.Reject( - new Exception("The dependency promise is failed", error) - ); - }, - reason => { - if (Interlocked.Increment(ref errors) == 1) - medium.Cancel( - new Exception("The dependency promise is cancelled") - ); - } - ); - - return medium; - } - - public static IPromise PromiseAll(this ICollection> that) { - return PromiseAll(that, null); + public static IPromise Then(this IPromise that, Action fulfilled, Func rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - /// - /// Creates a new promise which will be satisfied when all promises are satisfied. - /// - /// - /// - /// A callback used to cleanup already resolved promises in case of an error - /// - public static IPromise PromiseAll(this ICollection> that, Action cleanup) { - Safe.ArgumentNotNull(that, "that"); - - int count = that.Count; - - if (count == 0) - return Promise.FromResult(new T[0]); - - int errors = 0; - var medium = new Promise(); - var results = new T[that.Count]; - - medium.On(() => { - foreach (var p2 in that) { - p2.Cancel(); - if (cleanup != null) - p2.On(cleanup); - } - }, PromiseEventType.ErrorOrCancel); - - int i = 0; - foreach (var p in that) { - var idx = i; - p.On( - x => { - results[idx] = x; - if (Interlocked.Decrement(ref count) == 0) - medium.Resolve(results); - }, - error => { - if (Interlocked.Increment(ref errors) == 1) - medium.Reject( - new Exception("The dependency promise is failed", error) - ); - }, - reason => { - if (Interlocked.Increment(ref errors) == 1) - medium.Cancel( - new Exception("The dependency promise is cancelled", reason) - ); - } - ); - i++; - } - - return medium; + public static IPromise Then(this IPromise that, Func fulfilled, Action rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Then(this IPromise that, Action success, Action error, Action cancel) { - Safe.ArgumentNotNull(that, "that"); + public static IPromise Then(this IPromise that, Func fulfilled, Func rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; + } - var d = new ActionTask(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - d.CancellationRequested(that.Cancel); - return d; + public static IPromise Then(this IPromise that, Func fulfilled, Func rejected) { + var reaction = new PromiseFuncReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Then(this IPromise that, Action success, Action error) { - return Then(that, success, error, null); - } - - public static IPromise Then(this IPromise that, Action success) { - return Then(that, success, null, null); - } - - public static IPromise Then(this IPromise that, Func success, Func error, Func cancel) { - Safe.ArgumentNotNull(that, "that"); - - var d = new FuncTask(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - d.CancellationRequested(that.Cancel); - return d; - } - - public static IPromise Then(this IPromise that, Func success, Func error) { - return Then(that, success, error, null); - } - - public static IPromise Then(this IPromise that, Func success) { - return Then(that, success, null, null); + public static IPromise Then(this IPromise that, Func fulfilled, Func> rejected) { + var reaction = new PromiseFuncReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Then(this IPromise that, Func success, Func error, Func cancel) { - Safe.ArgumentNotNull(that, "that"); - Safe.ArgumentNotNull(success, "success"); - - var d = new FuncTask(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - d.CancellationRequested(that.Cancel); - return d; - } - - public static IPromise Then(this IPromise that, Action success, Func error, Func cancel) { - Safe.ArgumentNotNull(that, "that"); - var d = new FuncTask( - x => { - success(x); - return x; - }, - error, - cancel, - false - ); - that.On(d.Resolve, d.Reject, d.CancelOperation); - d.CancellationRequested(that.Cancel); - return d; - } - - public static IPromise Then(this IPromise that, Action success, Func error) { - return Then(that, success, error, null); + public static IPromise Then(this IPromise that, Func> fulfilled, Func rejected) { + var reaction = new PromiseFuncReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Then(this IPromise that, Action success) { - return Then(that, success, null, null); - } - - public static IPromise Then(this IPromise that, Func success, Func error) { - return Then(that, success, error, null); - } - - public static IPromise Then(this IPromise that, Func success) { - return Then(that, success, null, null); + public static IPromise Then(this IPromise that, Func> fulfilled, Func> rejected) { + var reaction = new PromiseFuncReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Always(this IPromise that, Action handler) { - Func errorOrCancel; - if (handler != null) - errorOrCancel = e => { - handler(); - throw new PromiseTransientException(e); - }; - else - errorOrCancel = null; - - return Then( - that, - x => { - handler(); - return x; - }, - errorOrCancel, - errorOrCancel); + public static IPromise Then(this IPromise that, Func fulfilled, Func rejected) { + var reaction = new PromiseFuncReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Always(this IPromise that, Action handler) { - Action errorOrCancel; - if (handler != null) - errorOrCancel = e => { - handler(); - throw new PromiseTransientException(e); - }; - else - errorOrCancel = null; - - return Then( - that, - handler, - errorOrCancel, - errorOrCancel); - } - - public static IPromise Error(this IPromise that, Action handler, bool handleCancellation) { - Action errorOrCancel; - if (handler != null) - errorOrCancel = e => { - handler(e); - throw new PromiseTransientException(e); - }; - else - errorOrCancel = null; - - return Then(that, null, errorOrCancel, handleCancellation ? errorOrCancel : null); + public static IPromise Then(this IPromise that, Func fulfilled, Func> rejected) { + var reaction = new PromiseFuncReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Error(this IPromise that, Action handler) { - return Error(that, handler, false); - } - - public static IPromise Error(this IPromise that, Action handler, bool handleCancellation) { - Func errorOrCancel; - if (handler != null) - errorOrCancel = e => { - handler(e); - throw new PromiseTransientException(e); - }; - else - errorOrCancel = null; - - return Then(that, null, errorOrCancel, handleCancellation ? errorOrCancel : null); + public static IPromise Then(this IPromise that, Func> fulfilled, Func rejected) { + var reaction = new PromiseFuncReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Error(this IPromise that, Action handler) { - return Error(that, handler, false); - } - - #region chain traits - public static IPromise Chain(this IPromise that, Func success, Func error, Func cancel) { - Safe.ArgumentNotNull(that, "that"); - - var d = new ActionChainTask(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - d.CancellationRequested(that.Cancel); - return d; - } - - public static IPromise Chain(this IPromise that, Func success, Func error) { - return Chain(that, success, error, null); + public static IPromise Then(this IPromise that, Func> fulfilled, Func> rejected) { + var reaction = new PromiseFuncReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Chain(this IPromise that, Func success) { - return Chain(that, success, null, null); - } - - public static IPromise Chain(this IPromise that, Func> success, Func> error, Func> cancel) { - Safe.ArgumentNotNull(that, "that"); - - var d = new FuncChainTask(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - if (success != null) - d.CancellationRequested(that.Cancel); - return d; - } - - public static IPromise Chain(this IPromise that, Func> success, Func> error) { - return Chain(that, success, error, null); - } - - public static IPromise Chain(this IPromise that, Func> success) { - return Chain(that, success, null, null); + public static IPromise Catch(this IPromise that, Action rejected) { + return Then(that, null, rejected); } - public static IPromise Chain(this IPromise that, Func> success, Func> error, Func> cancel) { - Safe.ArgumentNotNull(that, "that"); - var d = new FuncChainTask(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - if (success != null) - d.CancellationRequested(that.Cancel); - return d; + public static IPromise Catch(this IPromise that, Func rejected) { + return Then(that, null, rejected); } - public static IPromise Chain(this IPromise that, Func> success, Func> error) { - return Chain(that, success, error, null); + public static IPromise Catch(this IPromise that, Func rejected) { + return Then(that, (Func)null, rejected); } - public static IPromise Chain(this IPromise that, Func> success) { - return Chain(that, success, null, null); - } - - #endregion - - public static IPromise Guard(this IPromise that, Func, IPromise> continuation, Action cleanup) { - Safe.ArgumentNotNull(that, "that"); - Safe.ArgumentNotNull(continuation, "continuation"); - return continuation(that).Error((err) => { - that.On(cleanup); - }, true); - } - -#if NET_4_5 - - public static PromiseAwaiter GetAwaiter(this IPromise that) { - Safe.ArgumentNotNull(that, "that"); + public static IPromise Catch(this IPromise that, Func> rejected) { + return Then(that, (Func)null, rejected); + } - return new PromiseAwaiter(that); - } - - public static PromiseAwaiter GetAwaiter(this IPromise that) { - Safe.ArgumentNotNull(that, "that"); + public static IPromise Catch(this IPromise that, Func rejected) { + return Then(that, (Func)null, rejected); + } - return new PromiseAwaiter(that); - } - - public static IPromise BoundCancellationToken(this IPromise that, CancellationToken ct) { - Safe.ArgumentNotNull(that, "that"); - ct.Register(that.Cancel); - return that.Then(null, null, (err) => { - ct.ThrowIfCancellationRequested(); - throw new PromiseTransientException(err); - }); - } - - public static IPromise BoundCancellationToken(this IPromise that, CancellationToken ct) { - Safe.ArgumentNotNull(that, "that"); - ct.Register(that.Cancel); - return that.Then(null, null, (err) => { - ct.ThrowIfCancellationRequested(); - throw new PromiseTransientException(err); - }); - } - -#endif + public static IPromise Catch(this IPromise that, Func> rejected) { + return Then(that, (Func)null, rejected); + } } } diff --git a/Implab/PromiseFuncReaction`1.cs b/Implab/PromiseFuncReaction`1.cs --- a/Implab/PromiseFuncReaction`1.cs +++ b/Implab/PromiseFuncReaction`1.cs @@ -3,51 +3,47 @@ using System.Diagnostics; namespace Implab { class PromiseFuncReaction : PromiseReaction { - readonly Action m_fulfilled; - - readonly Action m_rejected; - readonly Deferred m_next; - public PromiseFuncReaction(Func fulfilled, Func rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public IPromise Promise { + get { return m_next.Promise; } + } + + public PromiseFuncReaction(Func fulfilled, Func rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; - } - - public PromiseFuncReaction(Func> fulfilled, Func> rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { - if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; - if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseFuncReaction(Func fulfilled, Func> rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseFuncReaction(Func> fulfilled, Func> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseFuncReaction(Func> fulfilled, Func rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseFuncReaction(Func fulfilled, Func> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } + public PromiseFuncReaction(Func> fulfilled, Func rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); - protected override bool HasFulfilHandler => m_fulfilled != null; - - protected override bool HasRejectHandler => m_rejected != null; + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } protected override void DefaultReject(Exception reason) { m_next.Reject(reason); @@ -56,21 +52,5 @@ namespace Implab { protected override void DefaultResolve() { throw new NotImplementedException(); } - - protected override void RejectImpl(Exception reason) { - try { - m_rejected(reason); - } catch (Exception e){ - m_next.Reject(e); - } - } - - protected override void ResolveImpl() { - try { - m_fulfilled(); - } catch (Exception e){ - m_next.Reject(e); - } - } } } \ No newline at end of file diff --git a/Implab/PromiseFuncReaction`2.cs b/Implab/PromiseFuncReaction`2.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseFuncReaction`2.cs @@ -0,0 +1,56 @@ +using System; +using System.Diagnostics; + +namespace Implab { + class PromiseFuncReaction : PromiseReaction { + readonly Deferred m_next; + + public IPromise Promise { + get { return m_next.Promise; } + } + + public PromiseFuncReaction(Func fulfilled, Func rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } + + public PromiseFuncReaction(Func> fulfilled, Func> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } + + public PromiseFuncReaction(Func fulfilled, Func> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } + + public PromiseFuncReaction(Func> fulfilled, Func rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } + + protected override void DefaultReject(Exception reason) { + m_next.Reject(reason); + } + + protected override void DefaultResolve(TIn result) { + m_next.Resolve((TRet)(object)result); + } + } +} \ No newline at end of file diff --git a/Implab/PromiseHandler.cs b/Implab/PromiseHandler.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseHandler.cs @@ -0,0 +1,102 @@ +using System; +using System.Diagnostics; + +namespace Implab { + class PromiseHandler { + public static Action Create(Action handler, Deferred next) { + Debug.Assert(handler != null); + + return (v) => { + try { + handler(v); + next.Resolve(); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create(Func handler, Deferred next) { + Debug.Assert(handler != null); + + return (v) => { + try { + next.Resolve(handler(v)); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create(Func handler, Deferred next) { + Debug.Assert(handler != null); + + return (v) => { + try { + next.Resolve(handler(v)); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create(Func> handler, Deferred next) { + Debug.Assert(handler != null); + return (v) => { + try { + next.Resolve(handler(v)); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create(Action handler, Deferred next) { + Debug.Assert(handler != null); + + return () => { + try { + handler(); + next.Resolve(); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create(Func handler, Deferred next) { + Debug.Assert(handler != null); + + return () => { + try { + next.Resolve(handler()); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create(Func handler, Deferred next) { + Debug.Assert(handler != null); + + return () => { + try { + next.Resolve(handler()); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create(Func> handler, Deferred next) { + Debug.Assert(handler != null); + return () => { + try { + next.Resolve(handler()); + } catch (Exception err) { + next.Reject(err); + } + }; + } + } +} \ No newline at end of file diff --git a/Implab/PromiseReaction.cs b/Implab/PromiseReaction.cs --- a/Implab/PromiseReaction.cs +++ b/Implab/PromiseReaction.cs @@ -1,6 +1,11 @@ using System; namespace Implab { + /// + /// Базовыйй класс для создания обработчиков результов выполнения обещаний. + /// Данный объект связывает обработчик и обешание, при этом для выполнения + /// обработчика будет использоваться диспетчер. + /// abstract class PromiseReaction : IResolvable { readonly IDispatcher m_dispatcher; @@ -8,36 +13,28 @@ namespace Implab { m_dispatcher = dispatcher; } - protected abstract bool HasFulfilHandler { - get; - } + protected Action FulfilHandler { get; set; } - protected abstract bool HasRejectHandler { - get; - } + protected Action RejectHandler { get; set; } public void Reject(Exception error) { - if (!HasRejectHandler) + if (RejectHandler == null) DefaultReject(error); else if (m_dispatcher != null) - m_dispatcher.Enqueue(() => RejectImpl(error)); + m_dispatcher.Enqueue(RejectHandler, error); else - RejectImpl(error); + RejectHandler(error); } public void Resolve() { - if (!HasFulfilHandler) + if (FulfilHandler == null) DefaultResolve(); else if (m_dispatcher != null) - m_dispatcher.Enqueue(ResolveImpl); + m_dispatcher.Enqueue(FulfilHandler); else - ResolveImpl(); + FulfilHandler(); } - protected abstract void ResolveImpl(); - - protected abstract void RejectImpl(Exception reason); - protected abstract void DefaultResolve(); protected abstract void DefaultReject(Exception reason); diff --git a/Implab/PromiseReaction`1.cs b/Implab/PromiseReaction`1.cs --- a/Implab/PromiseReaction`1.cs +++ b/Implab/PromiseReaction`1.cs @@ -1,6 +1,11 @@ using System; namespace Implab { + /// + /// Базовыйй класс для создания обработчиков результов выполнения обещаний. + /// Данный объект связывает обработчик и обешание, при этом для выполнения + /// обработчика будет использоваться диспетчер. + /// abstract class PromiseReaction : IResolvable { readonly IDispatcher m_dispatcher; @@ -8,36 +13,28 @@ namespace Implab { m_dispatcher = dispatcher; } - protected abstract bool HasFulfilHandler { - get; - } + protected Action FulfilHandler { get; set; } - protected abstract bool HasRejectHandler { - get; - } + protected Action RejectHandler { get; set; } public void Reject(Exception error) { - if (!HasRejectHandler) + if (RejectHandler == null) DefaultReject(error); else if (m_dispatcher != null) - m_dispatcher.Enqueue(() => RejectImpl(error)); + m_dispatcher.Enqueue(RejectHandler, error); else - RejectImpl(error); + RejectHandler(error); } public void Resolve(T result) { - if (!HasFulfilHandler) + if (FulfilHandler == null) DefaultResolve(result); else if (m_dispatcher != null) - m_dispatcher.Enqueue(() => ResolveImpl(result)); + m_dispatcher.Enqueue(FulfilHandler, result); else - ResolveImpl(result); + FulfilHandler(result); } - protected abstract void ResolveImpl(T result); - - protected abstract void RejectImpl(Exception reason); - protected abstract void DefaultResolve(T result); protected abstract void DefaultReject(Exception reason); diff --git a/Implab/Promise`1.cs b/Implab/Promise`1.cs new file mode 100644 --- /dev/null +++ b/Implab/Promise`1.cs @@ -0,0 +1,159 @@ +using System; +using System.Diagnostics; +using System.Reflection; +using Implab.Parallels; + +namespace Implab { + public class Promise : AbstractEvent>, IPromise { + + class ResolvableSignal : IResolvable { + public Signal Signal { get; private set; } + public ResolvableSignal() { + Signal = new Signal(); + } + + + public void Reject(Exception error) { + Signal.Set(); + } + + public void Resolve(T result) { + Signal.Set(); + } + } + + class ResolvableWrapper : IResolvable { + readonly IResolvable m_resolvable; + public ResolvableWrapper(IResolvable resolvable) { + m_resolvable = resolvable; + } + + public void Reject(Exception reason) { + m_resolvable.Reject(reason); + } + + public void Resolve(T value) { + m_resolvable.Resolve(); + } + } + + PromiseState m_state; + + T m_result; + + Exception m_error; + + public bool IsRejected { + get { + return m_state == PromiseState.Rejected; + } + } + + public bool IsFulfilled { + get { + return m_state == PromiseState.Fulfilled; + } + } + + public Exception RejectReason { + get { + return m_error; + } + } + + + internal void ResolvePromise(T result) { + if (BeginTransit()) { + m_result = result; + m_state = PromiseState.Fulfilled; + CompleteTransit(); + } + } + + internal void RejectPromise(Exception reason) { + if (BeginTransit()) { + m_error = reason; + m_state = PromiseState.Rejected; + CompleteTransit(); + } + } + + + #region implemented abstract members of AbstractPromise + + protected override void SignalHandler(IResolvable handler) { + switch (m_state) { + case PromiseState.Fulfilled: + handler.Resolve(m_result); + break; + case PromiseState.Rejected: + handler.Reject(RejectReason); + break; + default: + throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); + } + } + + protected void WaitResult(int timeout) { + if (!(IsResolved || GetFulfillSignal().Wait(timeout))) + throw new TimeoutException(); + } + + protected Signal GetFulfillSignal() { + var next = new ResolvableSignal(); + Then(next); + return next.Signal; + } + + #endregion + + public Type ResultType { + get { + return typeof(void); + } + } + + + protected void Rethrow() { + if (m_error is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", m_error); + else + throw new TargetInvocationException(m_error); + } + + public void Then(IResolvable next) { + AddHandler(next); + } + + public void Then(IResolvable next) { + AddHandler(new ResolvableWrapper(next)); + } + + public IPromise Cast() { + return (IPromise)this; + } + + void IPromise.Join() { + Join(); + } + + void IPromise.Join(int timeout) { + Join(timeout); + } + + public T Join() { + WaitResult(-1); + if (IsRejected) + Rethrow(); + return m_result; + } + + public T Join(int timeout) { + WaitResult(timeout); + if (IsRejected) + Rethrow(); + return m_result; + } + } +} + diff --git a/Implab/RejectedPromise.cs b/Implab/RejectedPromise.cs new file mode 100644 --- /dev/null +++ b/Implab/RejectedPromise.cs @@ -0,0 +1,38 @@ +using System; + +namespace Implab +{ + public struct RejectedPromise : IPromise { + readonly Exception m_reason; + + public Type ResultType => typeof(void); + + public bool IsResolved => true; + + public bool IsRejected => true; + + public bool IsFulfilled => false; + + public Exception RejectReason => m_reason; + + public RejectedPromise(Exception reason) { + m_reason = reason; + } + + public IPromise Cast() { + throw new InvalidCastException(); + } + + public void Join() { + m_reason.ThrowInvocationException(); + } + + public void Join(int timeout) { + m_reason.ThrowInvocationException(); + } + + public void Then(IResolvable next) { + next.Reject(m_reason); + } + } +} \ No newline at end of file diff --git a/Implab/RejectedPromise`1.cs b/Implab/RejectedPromise`1.cs new file mode 100644 --- /dev/null +++ b/Implab/RejectedPromise`1.cs @@ -0,0 +1,52 @@ +using System; + +namespace Implab +{ + public struct RejectedPromise : IPromise { + readonly Exception m_reason; + + public Type ResultType => typeof(void); + + public bool IsResolved => true; + + public bool IsRejected => true; + + public bool IsFulfilled => false; + + public Exception RejectReason => m_reason; + + public RejectedPromise(Exception reason) { + m_reason = reason; + } + + public IPromise Cast() { + return (IPromise)(IPromise)this; + } + + void IPromise.Join() { + m_reason.ThrowInvocationException(); + } + + void IPromise.Join(int timeout) { + m_reason.ThrowInvocationException(); + } + + public T Join() { + m_reason.ThrowInvocationException(); + throw new Exception(); // unreachable code + } + + public T Join(int timeout) { + m_reason.ThrowInvocationException(); + throw new Exception(); // unreachable code + } + + public void Then(IResolvable next) { + next.Reject(m_reason); + } + + public void Then(IResolvable next) { + next.Reject(m_reason); + } + } +} \ No newline at end of file diff --git a/Implab/ResolvedPromise.cs b/Implab/ResolvedPromise.cs new file mode 100644 --- /dev/null +++ b/Implab/ResolvedPromise.cs @@ -0,0 +1,30 @@ +using System; + +namespace Implab +{ + public struct ResolvedPromise : IPromise { + public Type ResultType => typeof(void); + + public bool IsResolved => true; + + public bool IsRejected => false; + + public bool IsFulfilled => true; + + public Exception RejectReason => null; + + public IPromise Cast() { + throw new InvalidCastException(); + } + + public void Join() { + } + + public void Join(int timeout) { + } + + public void Then(IResolvable next) { + next.Resolve(); + } + } +} \ No newline at end of file diff --git a/Implab/ResolvedPromise`1.cs b/Implab/ResolvedPromise`1.cs new file mode 100644 --- /dev/null +++ b/Implab/ResolvedPromise`1.cs @@ -0,0 +1,47 @@ +using System; + +namespace Implab { + public struct ResolvedPromise : IPromise { + T m_result; + + public Type ResultType => typeof(T); + + public bool IsResolved => true; + + public bool IsRejected => false; + + public bool IsFulfilled => true; + + public Exception RejectReason => null; + + public ResolvedPromise(T result) { + m_result = result; + } + + public IPromise Cast() { + return (IPromise)(IPromise)this; + } + + void IPromise.Join() { + } + + void IPromise.Join(int timeout) { + } + + public T Join() { + return m_result; + } + + public T Join(int timeout) { + return m_result; + } + + public void Then(IResolvable next) { + next.Resolve(m_result); + } + + public void Then(IResolvable next) { + next.Resolve(); + } + } +} \ No newline at end of file diff --git a/Implab/Safe.cs b/Implab/Safe.cs --- a/Implab/Safe.cs +++ b/Implab/Safe.cs @@ -112,9 +112,9 @@ namespace Implab ArgumentNotNull(action, "action"); try { - return Promise.FromResult(action()); + return Promise.Resolve(action()); } catch (Exception err) { - return Promise.FromException(err); + return Promise.Reject(err); } } @@ -124,9 +124,9 @@ namespace Implab try { action(); - return Promise.Success; + return Promise.Resolve(); } catch (Exception err) { - return new FailedPromise(err); + return Promise.Reject(err); } } @@ -135,9 +135,9 @@ namespace Implab ArgumentNotNull(action, "action"); try { - return action() ?? new FailedPromise(new Exception("The action returned null")); + return action() ?? Promise.Reject(new Exception("The action returned null")); } catch (Exception err) { - return new FailedPromise(err); + return Promise.Reject(err); } } @@ -149,9 +149,9 @@ namespace Implab ArgumentNotNull(action, "action"); try { - return action() ?? Promise.FromException(new Exception("The action returned null")); + return action() ?? Promise.Reject(new Exception("The action returned null")); } catch (Exception err) { - return Promise.FromException(err); + return Promise.Reject(err); } }