diff --git a/Implab/AbstractEvent.cs b/Implab/AbstractEvent.cs new file mode 100644 --- /dev/null +++ b/Implab/AbstractEvent.cs @@ -0,0 +1,354 @@ +using System; +using Implab.Parallels; +using System.Threading; +using System.Reflection; + +namespace Implab { + public abstract class AbstractEvent : ICancelationToken, ICancellable { + + const int UNRESOLVED_SATE = 0; + const int TRANSITIONAL_STATE = 1; + const int SUCCEEDED_STATE = 2; + const int REJECTED_STATE = 3; + const int CANCELLED_STATE = 4; + + const int CANCEL_NOT_REQUESTED = 0; + const int CANCEL_REQUESTING = 1; + const int CANCEL_REQUESTED = 2; + + const int RESERVED_HANDLERS_COUNT = 4; + + int m_state; + Exception m_error; + int m_handlersCount; + + readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; + MTQueue m_extraHandlers; + int m_handlerPointer = -1; + int m_handlersCommited; + + int m_cancelRequest; + Exception m_cancelationReason; + MTQueue> m_cancelationHandlers; + + + #region state managment + bool BeginTransit() { + return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); + } + + void CompleteTransit(int state) { + if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) + throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); + } + + void WaitTransition() { + while (m_state == TRANSITIONAL_STATE) { + Thread.MemoryBarrier(); + } + } + + protected bool BeginSetResult() { + if (!BeginTransit()) { + WaitTransition(); + if (m_state != CANCELLED_STATE) + throw new InvalidOperationException("The promise is already resolved"); + return false; + } + return true; + } + + protected void EndSetResult() { + CompleteTransit(SUCCEEDED_STATE); + OnSuccess(); + } + + + + /// + /// Completes the promise by reporting an error. + /// + /// + /// Since the promise has to work in a multithreaded environment, several threads may try to + /// report
an error at the same time; only the first error is used as the result, the + /// others are ignored. + /// + /// The exception raised while performing the operation. + /// The promise is already resolved. + protected void SetError(Exception error) { + if (BeginTransit()) { + if (error is OperationCanceledException) { + m_error = error.InnerException; /* store the reason before the cancelled state becomes visible to other threads */ + CompleteTransit(CANCELLED_STATE); + OnCancelled(); + } else { + m_error = error is PromiseTransientException ? error.InnerException : error; + CompleteTransit(REJECTED_STATE); + OnError(); + } + } else { + WaitTransition(); + if (m_state == SUCCEEDED_STATE) + throw new InvalidOperationException("The promise is already resolved"); + } + } + + /// + /// Cancels the operation, if possible. + /// + /// To find out whether the operation was actually cancelled, check the corresponding property (the reference is missing here; presumably IsCancelled). + protected void SetCancelled(Exception reason) { + if (BeginTransit()) { + m_error = reason; + CompleteTransit(CANCELLED_STATE); + OnCancelled(); + } + } + + protected abstract void SignalSuccess(THandler handler); + + protected abstract void SignalError(THandler handler, Exception error); + + protected abstract void SignalCancelled(THandler handler, Exception reason); + + void OnSuccess() { + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalSuccess(m_handlers[slot]); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalSuccess(handler); + } + } + + void OnError() { + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalError(m_handlers[slot],m_error); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out 
handler)) + SignalError(handler, m_error); + } + } + + void OnCancelled() { + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalCancelled(m_handlers[slot], m_error); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalCancelled(handler, m_error); + } + } + + #endregion + + protected abstract Signal GetResolveSignal(); + + #region synchronization traits + protected void WaitResult(int timeout) { + if (!IsResolved) + GetResolveSignal().Wait(timeout); + + switch (m_state) { + case SUCCEEDED_STATE: + return; + case CANCELLED_STATE: + throw new OperationCanceledException(); + case REJECTED_STATE: + throw new TargetInvocationException(m_error); + default: + throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); + } + } + #endregion + + #region handlers managment + + protected void AddHandler(THandler handler) { + + if (m_state > 1) { + // the promise is in the resolved state, just invoke the handler + InvokeHandler(handler); + } else { + var slot = Interlocked.Increment(ref m_handlersCount) - 1; + + if (slot < RESERVED_HANDLERS_COUNT) { + + m_handlers[slot] = handler; + + while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { + } + + if (m_state > 1) { + do { + var hp = m_handlerPointer; + slot = hp + 1; + if (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) + continue; + InvokeHandler(m_handlers[slot]); + } + break; + } while(true); + } + } else { + if (slot == RESERVED_HANDLERS_COUNT) { + m_extraHandlers = new MTQueue(); + } else { + while (m_extraHandlers == null) + Thread.MemoryBarrier(); + } + + m_extraHandlers.Enqueue(handler); + + if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) + // if the promise has been 
resolved while we were adding the handler to the queue + // we can't guarantee that someone is still processing it + // therefore we need to fetch a handler from the queue and execute it + // note that fetched handler may be not the one that we have added + // even we can fetch no handlers at all :) + InvokeHandler(handler); + } + } + } + + protected void InvokeHandler(THandler handler) { + switch (m_state) { + case SUCCEEDED_STATE: + SignalSuccess(handler); + break; + case CANCELLED_STATE: + SignalCancelled(handler, m_error); + break; + case REJECTED_STATE: + SignalError(handler, m_error); + break; + default: + throw new Exception(String.Format("Invalid promise state {0}", m_state)); + } + } + + #endregion + + #region IPromise implementation + + public bool IsResolved { + get { + Thread.MemoryBarrier(); + return m_state > 1; + } + } + + public bool IsCancelled { + get { + Thread.MemoryBarrier(); + return m_state == CANCELLED_STATE; + } + } + + #endregion + + public Exception Error { + get { + return m_error; + } + } + + public bool AcceptIfRequested() { + if (!IsCancelRequested) + return false; + CancelOperation(CancelReason); + return true; /* tells the caller that a pending cancellation request was accepted */ + } + + public virtual void CancelOperation(Exception reason) { + SetCancelled(reason); + } + + public void CancelationRequested(Action handler) { + Safe.ArgumentNotNull(handler, "handler"); + if (IsCancelRequested) { + handler(CancelReason); + return; /* already notified: enqueueing it as well would let the handler run a second time */ + } + + if (m_cancelationHandlers == null) + Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue>(), null); + + m_cancelationHandlers.Enqueue(handler); + + if (IsCancelRequested && m_cancelationHandlers.TryDequeue(out handler)) + // TryDeque implies MemoryBarrier() + handler(m_cancelationReason); + } + + public bool IsCancelRequested { + get { + do { + if (m_cancelRequest == CANCEL_NOT_REQUESTED) + return false; + if (m_cancelRequest == CANCEL_REQUESTED) + return true; + Thread.MemoryBarrier(); + } while(true); + } + } + + public Exception CancelReason { + get { + do { + Thread.MemoryBarrier(); + } 
while(m_cancelRequest == CANCEL_REQUESTING); + + return m_cancelationReason; + } + } + + #region ICancellable implementation + + public void Cancel() { + Cancel(null); + } + + public void Cancel(Exception reason) { + if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { /* only the caller that moves NOT_REQUESTED to REQUESTING publishes the reason */ + m_cancelationReason = reason; + m_cancelRequest = CANCEL_REQUESTED; + if (m_cancelationHandlers != null) { + Action handler; + while (m_cancelationHandlers.TryDequeue(out handler)) + handler(m_cancelationReason); + } + } + } + + #endregion + } +} + diff --git a/Implab/AbstractPromise.cs b/Implab/AbstractPromise.cs --- a/Implab/AbstractPromise.cs +++ b/Implab/AbstractPromise.cs @@ -1,295 +1,135 @@ using System; using Implab.Parallels; -using System.Threading; -using System.Reflection; namespace Implab { - public abstract class AbstractPromise { + public abstract class AbstractPromise : AbstractEvent, IPromise { + public struct HandlerDescriptor { + readonly Action m_handler; + readonly Action m_error; + readonly Action m_cancel; + readonly PromiseEventType m_mask; - const int UNRESOLVED_SATE = 0; - const int TRANSITIONAL_STATE = 1; - const int SUCCEEDED_STATE = 2; - const int REJECTED_STATE = 3; - const int CANCELLED_STATE = 4; - - const int RESERVED_HANDLERS_COUNT = 4; + public HandlerDescriptor(Action success, Action error, Action cancel) { + m_handler = success; + m_error = error; + m_cancel = cancel; + m_mask = PromiseEventType.Success; + } - int m_state; - Exception m_error; - int m_handlersCount; + public HandlerDescriptor(Action handler, PromiseEventType mask) : this() { /* this() zero-initialises m_error and m_cancel, which are not assigned below */ + m_handler = handler; + m_mask = mask; + } - readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; - MTQueue m_extraHandlers; - int m_handlerPointer = -1; - int m_handlersCommited; - - #region state managment - bool BeginTransit() { - return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); - } + public void SignalSuccess() 
{ + if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) { /* the flag test must yield a bool before it can be combined with && */ + try { + m_handler(); + } catch (Exception err) { + // avoid calling handler twice in case of error + if (m_error != null) + SignalError(err); + } + } + } - void CompleteTransit(int state) { - if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) - throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); - } - - void WaitTransition() { - while (m_state == TRANSITIONAL_STATE) { - Thread.MemoryBarrier(); + public void SignalError(Exception err) { + if (m_error != null) { + try { + m_error(err); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } else if ((m_mask & PromiseEventType.Error) != 0 && m_handler != null) { + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } } - } - protected bool BeginSetResult() { - if (!BeginTransit()) { - WaitTransition(); - if (m_state != CANCELLED_STATE) - throw new InvalidOperationException("The promise is already resolved"); - return false; + public void SignalCancel(Exception reason) { + if (m_cancel != null) { + try { + m_cancel(reason); + } catch (Exception err) { + SignalError(err); + } + } else if ((m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) { + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } } - return true; - } - - protected void EndSetResult() { - CompleteTransit(SUCCEEDED_STATE); - OnSuccess(); } + #region implemented abstract members of AbstractPromise - /// - /// Выполняет обещание, сообщая об ошибке - /// - /// - /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков - /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные - /// будут проигнорированы. 
- /// - /// Исключение возникшее при выполнении операции - /// Данное обещание уже выполнено - protected void SetError(Exception error) { - if (BeginTransit()) { - if (error is OperationCanceledException) { - CompleteTransit(CANCELLED_STATE); - m_error = error.InnerException; - OnCancelled(); - } else { - m_error = error is PromiseTransientException ? error.InnerException : error; - CompleteTransit(REJECTED_STATE); - OnError(); - } - } else { - WaitTransition(); - if (m_state == SUCCEEDED_STATE) - throw new InvalidOperationException("The promise is already resolved"); - } - } - - /// - /// Отменяет операцию, если это возможно. - /// - /// Для определения была ли операция отменена следует использовать свойство . - protected void SetCancelled(Exception reason) { - if (BeginTransit()) { - m_error = reason; - CompleteTransit(CANCELLED_STATE); - OnCancelled(); - } + protected override void SignalSuccess(HandlerDescriptor handler) { + handler.SignalSuccess(); } - protected abstract void SignalSuccess(THandler handler); - - protected abstract void SignalError(THandler handler, Exception error); - - protected abstract void SignalCancelled(THandler handler, Exception reason); - - void OnSuccess() { - var hp = m_handlerPointer; - var slot = hp +1 ; - while (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { - SignalSuccess(m_handlers[slot]); - } - hp = m_handlerPointer; - slot = hp +1 ; - } - - - if (m_extraHandlers != null) { - THandler handler; - while (m_extraHandlers.TryDequeue(out handler)) - SignalSuccess(handler); - } + protected override void SignalError(HandlerDescriptor handler, Exception error) { + handler.SignalError(error); } - void OnError() { - var hp = m_handlerPointer; - var slot = hp +1 ; - while (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { - SignalError(m_handlers[slot],m_error); - } - hp = m_handlerPointer; - slot = hp +1 ; - } - - if 
(m_extraHandlers != null) { - THandler handler; - while (m_extraHandlers.TryDequeue(out handler)) - SignalError(handler, m_error); - } + protected override void SignalCancelled(HandlerDescriptor handler, Exception reason) { + handler.SignalCancel(reason); } - void OnCancelled() { - var hp = m_handlerPointer; - var slot = hp +1 ; - while (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { - SignalCancelled(m_handlers[slot], m_error); - } - hp = m_handlerPointer; - slot = hp +1 ; - } - - if (m_extraHandlers != null) { - THandler handler; - while (m_extraHandlers.TryDequeue(out handler)) - SignalCancelled(handler, m_error); - } + protected override Signal GetResolveSignal() { + var signal = new Signal(); + On(signal.Set, PromiseEventType.All); return signal; /* the signal is set by the handler registered above and must be handed back to the caller */ } #endregion - protected abstract void Listen(PromiseEventType events, Action handler); - #region synchronization traits - protected void WaitResult(int timeout) { - if (!IsResolved) { - var lk = new object(); - - Listen(PromiseEventType.All, () => { - lock(lk) { - Monitor.Pulse(lk); - } - }); - - lock (lk) { - while(!IsResolved) { - if(!Monitor.Wait(lk,timeout)) - throw new TimeoutException(); - } - } - - } - switch (m_state) { - case SUCCEEDED_STATE: - return; - case CANCELLED_STATE: - throw new OperationCanceledException(); - case REJECTED_STATE: - throw new TargetInvocationException(m_error); - default: - throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); - } - } - #endregion - - #region handlers managment - - protected void AddHandler(THandler handler) { - - if (m_state > 1) { - // the promise is in the resolved state, just invoke the handler - InvokeHandler(handler); - } else { - var slot = Interlocked.Increment(ref m_handlersCount) - 1; - - if (slot < RESERVED_HANDLERS_COUNT) { - m_handlers[slot] = handler; - - while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { - } - - if (m_state > 1) { - do { - var 
hp = m_handlerPointer; - slot = hp + 1; - if (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) - continue; - InvokeHandler(m_handlers[slot]); - } - break; - } while(true); - } - } else { - if (slot == RESERVED_HANDLERS_COUNT) { - m_extraHandlers = new MTQueue(); - } else { - while (m_extraHandlers == null) - Thread.MemoryBarrier(); - } - - m_extraHandlers.Enqueue(handler); - - if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) - // if the promise have been resolved while we was adding the handler to the queue - // we can't guarantee that someone is still processing it - // therefore we need to fetch a handler from the queue and execute it - // note that fetched handler may be not the one that we have added - // even we can fetch no handlers at all :) - InvokeHandler(handler); - } + public Type PromiseType { + get { + return typeof(void); } } - protected void InvokeHandler(THandler handler) { - switch (m_state) { - case SUCCEEDED_STATE: - SignalSuccess(handler); - break; - case CANCELLED_STATE: - SignalCancelled(handler, m_error); - break; - case REJECTED_STATE: - SignalError(handler, m_error); - break; - default: - throw new Exception(String.Format("Invalid promise state {0}", m_state)); - } + public IPromise On(Action success, Action error, Action cancel) { + AddHandler(new HandlerDescriptor(success, error, cancel)); + return this; + } + + public IPromise On(Action success, Action error) { + AddHandler(new HandlerDescriptor(success, error, null)); + return this; } - #endregion + public IPromise On(Action success) { + AddHandler(new HandlerDescriptor(success, null, null)); + return this; + } - #region IPromise implementation + public IPromise On(Action handler, PromiseEventType events) { + AddHandler(new HandlerDescriptor(handler,events)); + return this; + } - public void Join(int timeout) { - WaitResult(timeout); + public IPromise Cast() { + throw new InvalidCastException(); } public void Join() { 
WaitResult(-1); } - public bool IsResolved { - get { - Thread.MemoryBarrier(); - return m_state > 1; - } + public void Join(int timeout) { + WaitResult(timeout); } - public bool IsCancelled { - get { - Thread.MemoryBarrier(); - return m_state == CANCELLED_STATE; - } - } - - #endregion - - public Exception Error { - get { - return m_error; - } + protected void SetResult() { + if (BeginSetResult()) /* false means the promise was cancelled first; completing the transition then would throw */ + EndSetResult(); } } } diff --git a/Implab/AbstractPromiseT.cs b/Implab/AbstractPromiseT.cs new file mode 100644 --- /dev/null +++ b/Implab/AbstractPromiseT.cs @@ -0,0 +1,185 @@ +using System; +using Implab.Parallels; + +namespace Implab { + public abstract class AbstractPromise : AbstractEvent.HandlerDescriptor>, IPromise { + public struct HandlerDescriptor { + readonly Action m_handler; + readonly Action m_success; + readonly Action m_error; + readonly Action m_cancel; + readonly PromiseEventType m_mask; + + public HandlerDescriptor(Action success, Action error, Action cancel) : this() { /* this() zero-initialises the fields not assigned below */ + m_success = success; + m_error = error; + m_cancel = cancel; + } + + public HandlerDescriptor(Action success, Action error, Action cancel) : this() { + m_handler = success; + m_error = error; + m_cancel = cancel; + m_mask = PromiseEventType.Success; + } + + public HandlerDescriptor(Action handler, PromiseEventType mask) : this() { + m_handler = handler; + m_mask = mask; + } + + public void SignalSuccess(T result) { + if (m_success != null) { + try { + m_success(result); + } catch(Exception err) { + SignalError(err); + } + } else if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) { /* the flag test must yield a bool before it can be combined with && */ + try { + m_handler(); + } catch(Exception err) { + // avoid calling handler twice in case of error + if (m_error != null) + SignalError(err); + } + } + } + + public void SignalError(Exception err) { + if (m_error != null) { + try { + m_error(err); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } else if ((m_mask & PromiseEventType.Error) != 0 && m_handler != null) { + try { + m_handler(); + // Analysis disable 
once EmptyGeneralCatchClause + } catch { + } + } + } + + public void SignalCancel(Exception reason) { + if (m_cancel != null) { + try { + m_cancel(reason); + } catch (Exception err) { + SignalError(err); + } + } else if ((m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) { /* the flag test must yield a bool before it can be combined with && */ + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } + } + } + + + + public Type PromiseType { + get { + return typeof(T); + } + } + + public new T Join() { + WaitResult(-1); + return m_result; + } + public new T Join(int timeout) { + WaitResult(timeout); + return m_result; + } + + public IPromise On(Action success, Action error, Action cancel) { + AddHandler(new HandlerDescriptor(success, error, cancel)); + return this; + } + + public IPromise On(Action success, Action error) { + AddHandler(new HandlerDescriptor(success, error, null)); + return this; + } + + public IPromise On(Action success) { + AddHandler(new HandlerDescriptor(success, null, null)); + return this; + } + + public IPromise On(Action handler, PromiseEventType events) { + AddHandler(new HandlerDescriptor(handler, events)); + return this; + } + + public IPromise On(Action success, Action error, Action cancel) { + AddHandler(new HandlerDescriptor(success, error, cancel)); + return this; + } + + public IPromise On(Action success, Action error) { + AddHandler(new HandlerDescriptor(success, error, null)); + return this; + } + + public IPromise On(Action success) { + AddHandler(new HandlerDescriptor(success, null, null)); + return this; + } + + IPromise IPromise.On(Action success, Action error, Action cancel) { + AddHandler(new HandlerDescriptor(success, error, cancel)); + return this; + } + + IPromise IPromise.On(Action success, Action error) { + AddHandler(new HandlerDescriptor(success, error, null)); + return this; + } + + IPromise IPromise.On(Action success) { + AddHandler(new HandlerDescriptor(success, null, null)); + return this; + } + + public IPromise Cast() { + return 
(IPromise)this; + } + + #region implemented abstract members of AbstractPromise + + protected override Signal GetResolveSignal() { + var signal = new Signal(); + AddHandler(new HandlerDescriptor(signal.Set, PromiseEventType.All)); + return signal; + } + + protected override void SignalSuccess(HandlerDescriptor handler) { + handler.SignalSuccess(m_result); + } + + protected override void SignalError(HandlerDescriptor handler, Exception error) { + handler.SignalError(error); + } + + protected override void SignalCancelled(HandlerDescriptor handler, Exception reason) { + handler.SignalCancel(reason); + } + + #endregion + + T m_result; + + protected void SetResult(T value) { + if (BeginSetResult()) { + m_result = value; + EndSetResult(); + } + } + } +} + diff --git a/Implab/ChainTask.cs b/Implab/ChainTask.cs new file mode 100644 --- /dev/null +++ b/Implab/ChainTask.cs @@ -0,0 +1,54 @@ +using System; +using System.Threading; + +namespace Implab { + public class ChainTask : AbstractPromise, IDeferred { + readonly Func m_task; + readonly Action m_error; + readonly Action m_cancel; + + int m_cancelationLock; + + public ChainTask(Func task, Func error, Func cancel) { + m_task = task; + } + + public void Resolve() { + if (m_task != null && LockCancelation()) { + try { + var operation = m_task(); + if (operation == null) + throw new NullReferenceException("The task returned null promise"); + + operation.On(SetResult, SetError, SetCancelled); + + CancelationRequested(operation.Cancel); + } catch(Exception err) { + HandleErrorInternal(err); + } + } + } + + public void Reject(Exception error) { + throw new NotImplementedException(); + } + + protected void HandleErrorInternal(Exception error) { + if (m_error != null) { + try { + m_error(error); + SetResult(); + } catch(Exception err) { + SetError(err); + } + } else { + SetError(error); + } + } + + protected bool LockCancelation() { + return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0); + } + } +} + diff --git 
a/Implab/FuncTask.cs b/Implab/FuncTask.cs new file mode 100644 --- /dev/null +++ b/Implab/FuncTask.cs @@ -0,0 +1,23 @@ +using System; +using System.Threading; + +namespace Implab { + public class FuncTask : FuncTaskBase, IDeferred { + readonly Func m_task; + + public FuncTask(Func task, Func error, Func cancel) : base(error,cancel) { + m_task = task; + } + + public void Resolve() { + if (m_task != null && LockCancelation()) { + try { + SetResult(m_task()); + } catch(Exception err) { + HandleErrorInternal(err); + } + } + } + } +} + diff --git a/Implab/FuncTaskBase.cs b/Implab/FuncTaskBase.cs new file mode 100644 --- /dev/null +++ b/Implab/FuncTaskBase.cs @@ -0,0 +1,53 @@ +using System; +using System.Threading; + +namespace Implab { + public class FuncTaskBase : AbstractPromise { + readonly Func m_cancel; + readonly Func m_error; + + int m_cancelationLock; + + protected FuncTaskBase( Func error, Func cancel) { + m_error = error; + m_cancel = cancel; + } + + public void Reject(Exception error) { + Safe.ArgumentNotNull(error, "error"); + if (LockCancelation()) + HandleErrorInternal(error); + } + + protected void HandleErrorInternal(Exception error) { + if (m_error != null) { + try { + SetResult(m_error(error)); + } catch(Exception err) { + SetError(err); + } + } else { + SetError(error); + } + } + + public override void CancelOperation(Exception reason) { + if (LockCancelation()) { + if (m_cancel != null) { + try { + SetResult(m_cancel(reason)); + } catch (Exception err) { + HandleErrorInternal(err); + } + } else { + SetCancelled(reason); + } + } + } + + protected bool LockCancelation() { + return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0); + } + } +} + diff --git a/Implab/FuncTaskT.cs b/Implab/FuncTaskT.cs new file mode 100644 --- /dev/null +++ b/Implab/FuncTaskT.cs @@ -0,0 +1,22 @@ +using System; + +namespace Implab { + public class FuncTask : FuncTaskBase, IDeferred { + readonly Func m_task; + + public FuncTask(Func task, Func error,Func cancel) : 
base(error,cancel) { + m_task = task; + } + + public void Resolve(TArg value) { + if (m_task != null && LockCancelation()) { + try { + SetResult(m_task(value)); + } catch (Exception err) { + HandleErrorInternal(err); + } + } + } + } +} + diff --git a/Implab/ICancelationToken.cs b/Implab/ICancelationToken.cs --- a/Implab/ICancelationToken.cs +++ b/Implab/ICancelationToken.cs @@ -22,14 +22,14 @@ namespace Implab { /// Sets the token to cancelled state. /// /// The reason why the operation was cancelled. - void SetCancelled(Exception reason); + void CancelOperation(Exception reason); /// /// Adds the listener for the cancellation request, is the cancellation was requested the /// is executed immediatelly. /// /// The handler which will be executed if the cancel occurs. - void CancellationRequested(Action handler); + void CancelationRequested(Action handler); } } diff --git a/Implab/IPromise.cs b/Implab/IPromise.cs --- a/Implab/IPromise.cs +++ b/Implab/IPromise.cs @@ -58,13 +58,13 @@ namespace Implab { /// exception it will be passed to the dependent promise. /// /// - IPromise Then(Action success, Action error, Action cancel); + /* IPromise Then(Action success, Action error, Action cancel); IPromise Then(Action success, Action error); IPromise Then(Action success); IPromise Chain(Func chained, Func error, Func cancel); IPromise Chain(Func chained, Func error); - IPromise Chain(Func chained); + IPromise Chain(Func chained);*/ /// /// Adds specified listeners to the current promise. 
diff --git a/Implab/IPromiseT.cs b/Implab/IPromiseT.cs --- a/Implab/IPromiseT.cs +++ b/Implab/IPromiseT.cs @@ -3,28 +3,23 @@ namespace Implab { public interface IPromise : IPromise { - new T Join(); - - new T Join(int timeout); - IPromise On(Action success, Action error, Action cancel); IPromise On(Action success, Action error); IPromise On(Action success); + new T Join(); + + new T Join(int timeout); + + new IPromise On(Action success, Action error, Action cancel); + + new IPromise On(Action success, Action error); + + new IPromise On(Action success); + new IPromise On(Action handler, PromiseEventType events); - IPromise Then(Func mapper, Func error, Func cancel); - - IPromise Then(Func mapper, Func error); - - IPromise Then(Func mapper); - - IPromise Chain(Func> chained, Func> error, Func> cancel); - - IPromise Chain(Func> chained, Func> error); - - IPromise Chain(Func> chained); } } diff --git a/Implab/Implab.csproj b/Implab/Implab.csproj --- a/Implab/Implab.csproj +++ b/Implab/Implab.csproj @@ -150,7 +150,6 @@ - @@ -159,6 +158,13 @@ + + + + + + + diff --git a/Implab/Promise.cs b/Implab/Promise.cs --- a/Implab/Promise.cs +++ b/Implab/Promise.cs @@ -1,222 +1,22 @@ using System; -using System.Diagnostics; +using Implab.Parallels; namespace Implab { - public class Promise : AbstractPromise, IPromise, IDeferred { - - public struct HandlerDescriptor { - readonly Action m_success; - readonly Action m_error; - readonly Action m_cancel; - readonly IDeferred m_deferred; - - public HandlerDescriptor(Action success, Action error, Action cancel, IDeferred deferred) { - m_success = success; - m_error = error; - m_cancel = cancel; - m_deferred = deferred; - } - - public void SignalSuccess() { - if (m_success != null) { - try { - m_success(); - if (m_deferred != null) - m_deferred.Resolve(); - } catch (Exception err) { - SignalError(err); - } - } - } + public class Promise : AbstractPromise, IDeferred { + public static readonly Promise SUCCESS; - public void 
SignalError(Exception err) { - if (m_error != null) { - try { - m_error(err); - if (m_deferred != null) - m_deferred.Resolve(); - } catch (Exception err2) { - if (m_deferred != null) - m_deferred.Reject(err2); - } - } else { - if (m_deferred != null) - m_deferred.Reject(err); - } - } - - public void SignalCancel(Exception reason) { - if (m_cancel != null) { - try { - m_cancel(reason); - if (m_deferred != null) - m_deferred.Resolve(); - } catch (Exception err) { - SignalError(err); - } - } else if (reason != null && m_error != null) { - try { - m_error(new OperationCanceledException("The operation was canceled.", reason)); - if (m_deferred != null) - m_deferred.Resolve(); - } catch (Exception err) { - SignalError(err); - } - } else { - if (m_deferred != null) - m_deferred.Cancel(reason); - } - } + static Promise() { + SUCCESS = new Promise(); + SUCCESS.Resolve(); } public void Resolve() { - BeginSetResult(); - EndSetResult(); + SetResult(); } public void Reject(Exception error) { SetError(error); } - - #region implemented abstract members of AbstractPromise - - protected override void SignalSuccess(HandlerDescriptor handler) { - handler.SignalSuccess(); - } - - protected override void SignalError(HandlerDescriptor handler, Exception error) { - handler.SignalError(error); - } - - protected override void SignalCancelled(HandlerDescriptor handler, Exception reason) { - handler.SignalCancel(reason); - } - - protected override void Listen(PromiseEventType events, Action handler) { - AddHandler(new HandlerDescriptor( - events.HasFlag(PromiseEventType.Success) ? handler : null, - events.HasFlag(PromiseEventType.Error) ? new Action(err => handler()) : null, - events.HasFlag(PromiseEventType.Cancelled) ? 
new Action(reason => handler()) : null, - null - )); - } - - #endregion - - - public Type PromiseType { - get { - return typeof(void); - } - } - - public IPromise Then(Action success, Action error, Action cancel) { - var promise = new Promise(); - if (success != null) - promise.On(Cancel, PromiseEventType.Cancelled); - - AddHandler(new HandlerDescriptor(success, error, cancel, promise)); - - return promise; - } - - public IPromise Then(Action success, Action error) { - return Then(success, error, null); - } - - public IPromise Then(Action success) { - return Then(success, null, null); - } - - public IPromise On(Action success, Action error, Action cancel) { - AddHandler(new HandlerDescriptor(success, error, cancel, null)); - return this; - } - - public IPromise On(Action success, Action error) { - return On(success, error, null); - } - - public IPromise On(Action success) { - return On(success, null, null); - } - - public IPromise On(Action handler, PromiseEventType events) { - return On( - events.HasFlag(PromiseEventType.Success) ? handler : null, - events.HasFlag(PromiseEventType.Error) ? new Action(err => handler()) : null, - events.HasFlag(PromiseEventType.Cancelled) ? 
new Action(reason => handler()) : null - ); - } - - public IPromise Cast() { - throw new InvalidCastException(); - } - - public IPromise Chain(Func chained, Func error, Func cancel) { - var medium = new Promise(); - - On( - () => { - if (medium.IsCancelled) - return; - if (chained != null) - ConnectPromise(chained(), medium); - }, - ex => { - if (medium.IsCancelled) - return; - if (error != null) { - try { - ConnectPromise(error(ex), medium); - } catch (Exception ex2) { - medium.Reject(ex2); - } - } else { - medium.Reject(ex); - } - }, - reason => { - if (medium.IsCancelled) - return; - if (cancel != null) - ConnectPromise(cancel(reason), medium); - else - medium.Cancel(reason); - } - ); - - if (chained != null) - medium.On(Cancel, PromiseEventType.Cancelled); - - return medium; - } - - static void ConnectPromise(IPromise result, Promise medium) { - if (result != null) { - result.On( - medium.Resolve, - medium.Reject, - medium.Cancel - ); - medium.On(null,null,result.Cancel); - } else { - medium.Reject( - new NullReferenceException( - "The chained asynchronous operation returned" + - " 'null' where the promise instance is expected" - ) - ); - } - } - - public IPromise Chain(Func chained, Func error) { - return Chain(chained, error, null); - } - - public IPromise Chain(Func chained) { - return Chain(chained, null, null); - } } } diff --git a/Implab/PromiseT.cs b/Implab/PromiseT.cs --- a/Implab/PromiseT.cs +++ b/Implab/PromiseT.cs @@ -1,5 +1,6 @@ using System; using System.Diagnostics; +using Implab.Parallels; namespace Implab { @@ -37,538 +38,26 @@ namespace Implab { /// только инициатор обещания иначе могут возникнуть противоречия. 
/// /// - public class Promise : AbstractPromise>, IPromise, IDeferred { - - class StubDeferred : IDeferred, IDeferred { - public static readonly StubDeferred instance = new StubDeferred(); - - StubDeferred() { - } - - #region IDeferred implementation - - public void Resolve(T value) { - } - - public void Resolve() { - } - - public void Reject(Exception error) { - } - - #endregion - - #region ICancellable implementation - - public void Cancel() { - } - - public void Cancel(Exception reason) { - } - - #endregion - - - } - - class RemapDescriptor : IDeferred { - readonly Func m_remap; - readonly Func m_failed; - readonly Func m_cancel; - readonly IDeferred m_deferred; - - public RemapDescriptor(Func remap, Func failed, Func cancel, IDeferred deferred ) { - Debug.Assert(deferred != null); - m_remap = remap; - m_failed = failed; - m_cancel = cancel; - m_deferred = deferred; - } + public class Promise : AbstractPromise, IDeferred { - - - #region IDeferred implementation - - public void Resolve(T value) { - if (m_remap != null) { - try { - m_deferred.Resolve(m_remap(value)); - } catch (Exception ex) { - Reject(ex); - } - } - } - - public void Reject(Exception error) { - if (m_failed != null) { - try { - m_deferred.Resolve(m_failed(error)); - } catch (Exception ex) { - m_deferred.Reject(ex); - } - } else { - m_deferred.Reject(error); - } - } - - - #endregion - - #region ICancellable implementation - - public void Cancel(Exception reason) { - if (m_cancel != null) { - try { - m_deferred.Resolve(m_cancel(reason)); - } catch (Exception ex) { - Reject(ex); - } - } else { - m_deferred.Cancel(reason); - } - } - - public void Cancel() { - Cancel(null); - } - #endregion + public static IPromise FromResult(T value) { + var p = new Promise(); + p.Resolve(value); + return p; } - class ListenerDescriptor : IDeferred { - readonly Action m_handler; - readonly PromiseEventType m_events; - - public ListenerDescriptor(Action handler, PromiseEventType events) { - Debug.Assert(handler != 
null); - - m_handler = handler; - m_events = events; - } - - #region IDeferred implementation - - public void Resolve(T value) { - if (m_events.HasFlag(PromiseEventType.Success)) { - try { - m_handler(); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } - } - } - - public void Reject(Exception error) { - if (m_events.HasFlag(PromiseEventType.Error)){ - try { - m_handler(); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } - } - } - - #endregion - - #region ICancellable implementation - - public void Cancel() { - Cancel(null); - } - - public void Cancel(Exception reason) { - if (m_events.HasFlag(PromiseEventType.Cancelled)){ - try { - m_handler(); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } - } - } - - #endregion + public static IPromise FromException(Exception error) { + var p = new Promise(); + p.Reject(error); + return p; } - class ValueEventDescriptor : IDeferred { - readonly Action m_success; - readonly Action m_failed; - readonly Action m_cancelled; - readonly IDeferred m_deferred; - - public ValueEventDescriptor(Action success, Action failed, Action cancelled, IDeferred deferred) { - Debug.Assert(deferred != null); - - m_success = success; - m_failed = failed; - m_cancelled = cancelled; - m_deferred = deferred; - } - - #region IDeferred implementation - - public void Resolve(T value) { - if (m_success != null) { - try { - m_success(value); - m_deferred.Resolve(value); - } catch (Exception ex) { - Reject(ex); - } - } - } - - public void Reject(Exception error) { - if (m_failed != null) { - try { - m_failed(error); - m_deferred.Resolve(default(T)); - } catch(Exception ex) { - m_deferred.Reject(ex); - } - } else { - m_deferred.Reject(error); - } - } - - #endregion - - #region ICancellable implementation - - public void Cancel(Exception reason) { - if (m_cancelled != null) { - try { - m_cancelled(reason); - m_deferred.Resolve(default(T)); - } catch (Exception ex) { - Reject(ex); - } - } else { - 
m_deferred.Cancel(reason); - } - } - - public void Cancel() { - Cancel(null); - } - - #endregion - } - - public class EventDescriptor : IDeferred { - readonly Action m_success; - readonly Action m_failed; - readonly Action m_cancelled; - readonly IDeferred m_deferred; - - public EventDescriptor(Action success, Action failed, Action cancelled, IDeferred deferred) { - Debug.Assert(deferred != null); - - m_success = success; - m_failed = failed; - m_cancelled = cancelled; - m_deferred = deferred; - } - - #region IDeferred implementation - - public void Resolve(T value) { - if (m_success != null) { - try { - m_success(); - m_deferred.Resolve(); - } catch (Exception ex) { - Reject(ex); - } - } - } - - public void Reject(Exception error) { - if (m_failed != null) { - try { - m_failed(error); - m_deferred.Resolve(); - } catch (Exception ex) { - m_deferred.Reject(ex); - } - } else { - m_deferred.Reject(error); - } - } - - #endregion - - #region ICancellable implementation - - public void Cancel(Exception reason) { - if (m_cancelled != null) { - try { - m_cancelled(reason); - m_deferred.Resolve(); - } catch (Exception ex) { - Reject(ex); - } - } else { - m_deferred.Cancel(reason); - } - } - - public void Cancel() { - Cancel(null); - } - - #endregion - } - - T m_result; - public virtual void Resolve(T value) { - if (BeginSetResult()) { - m_result = value; - EndSetResult(); - } + SetResult(value); } public void Reject(Exception error) { SetError(error); } - - public Type PromiseType { - get { - return typeof(T); - } - } - - public new T Join() { - WaitResult(-1); - return m_result; - } - public new T Join(int timeout) { - WaitResult(timeout); - return m_result; - } - - public IPromise On(Action success, Action error, Action cancel) { - AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance)); - return this; - } - - public IPromise On(Action success, Action error) { - AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance)); 
- return this; - } - - public IPromise On(Action success) { - AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance)); - return this; - } - - public IPromise On(Action handler, PromiseEventType events) { - Listen(events, handler); - return this; - } - - public IPromise Then(Func mapper, Func error, Func cancel) { - var promise = new Promise(); - if (mapper != null) - promise.On((Action)null, null, Cancel); - AddHandler(new RemapDescriptor(mapper, error, cancel, promise)); - return promise; - } - - public IPromise Then(Func mapper, Func error) { - var promise = new Promise(); - if (mapper != null) - promise.On((Action)null, null, Cancel); - AddHandler(new RemapDescriptor(mapper, error, null, promise)); - return promise; - } - - public IPromise Then(Func mapper) { - var promise = new Promise(); - if (mapper != null) - promise.On((Action)null, null, Cancel); - AddHandler(new RemapDescriptor(mapper, null, null, promise)); - return promise; - } - - public IPromise Chain(Func> chained, Func> error, Func> cancel) { - // this promise will be resolved when an asyc operation is started - var promise = new Promise>(); - - AddHandler(new RemapDescriptor>( - chained, - error, - cancel, - promise - )); - - var medium = new Promise(); - - if (chained != null) - medium.On(Cancel, PromiseEventType.Cancelled); - - // we need to connect started async operation with the medium - // if the async operation hasn't been started by the some reason - // report is to the medium - promise.On( - result => ConnectPromise(result, medium), - medium.Reject, - medium.Cancel - ); - - return medium; - } - - static void ConnectPromise(IPromise result, Promise medium) { - if (result != null) { - result.On( - medium.Resolve, - medium.Reject, - medium.Cancel - ); - medium.On(result.Cancel, PromiseEventType.Cancelled); - } else { - medium.Reject( - new NullReferenceException( - "The chained asynchronous operation returned" + - " 'null' where the promise instance is expected" - ) - 
); - } - } - - public IPromise Chain(Func> chained, Func> error) { - return Chain(chained, error, null); - } - - public IPromise Chain(Func> chained) { - return Chain(chained, null, null); - } - - public IPromise Then(Action success, Action error, Action cancel) { - var promise = new Promise(); - if (success != null) - promise.On(null, null, Cancel); - - AddHandler(new EventDescriptor(success, error, cancel, promise)); - - return promise; - } - - public IPromise Then(Action success, Action error) { - return Then(success, error, null); - } - - public IPromise Then(Action success) { - return Then(success, null, null); - } - - public IPromise Chain(Func chained, Func error, Func cancel) { - var promise = new Promise(); - - AddHandler( - new RemapDescriptor( - x => chained(), - error, - cancel, - promise - ) - ); - - var medium = new Promise(); - if (chained != null) - medium.On(null, null, Cancel); - - promise.On( - result => ConnectPromise(result, medium), - medium.Reject, - medium.Cancel - ); - - return medium; - } - - static void ConnectPromise(IPromise result, Promise medium) { - if (result != null) { - result.On( - medium.Resolve, - medium.Reject, - medium.Cancel - ); - medium.On(null, null, result.Cancel); - } else { - medium.Reject( - new NullReferenceException( - "The chained asynchronous operation returned" + - " 'null' where the promise instance is expected" - ) - ); - } - } - - public IPromise Chain(Func chained, Func error) { - return Chain(chained, error, null); - } - - public IPromise Chain(Func chained) { - return Chain(chained, null, null); - } - - public IPromise On(Action success, Action error, Action cancel) { - AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance)); - return this; - } - - public IPromise On(Action success, Action error) { - AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance)); - return this; - } - - public IPromise On(Action success) { - Listen(PromiseEventType.Success, success); - 
return this; - } - - IPromise IPromise.On(Action handler, PromiseEventType events) { - Listen(events,handler); - return this; - } - - public IPromise Cast() { - return (IPromise)this; - } - - #region implemented abstract members of AbstractPromise - - protected override void SignalSuccess(IDeferred handler) { - handler.Resolve(m_result); - } - - protected override void SignalError(IDeferred handler, Exception error) { - handler.Reject(error); - } - - protected override void SignalCancelled(IDeferred handler, Exception reason) { - handler.Cancel(reason); - } - - protected override void Listen(PromiseEventType events, Action handler) { - if (handler != null) - AddHandler(new ListenerDescriptor(handler, events)); - } - - #endregion - - public static IPromise ResultToPromise(T value) { - var p = new Promise(); - p.Resolve(value); - return p; - } - - public static IPromise ExceptionToPromise(Exception error) { - var p = new Promise(); - p.Reject(error); - return p; - } - } }