# HG changeset patch # User cin # Date 2018-01-24 00:03:21 # Node ID cbe10ac0731e4edb4d43386f1a3fc3454f6f925c # Parent fa6cbf4d8841204bd901308afe0e594dc61387e5 Working on promises diff --git a/Implab.Fx/StaApartment.cs b/Implab.Fx/StaApartment.cs --- a/Implab.Fx/StaApartment.cs +++ b/Implab.Fx/StaApartment.cs @@ -195,7 +195,7 @@ namespace Implab.Fx { protected override void Dispose(bool disposing) { if (disposing) { - if (!m_threadTerminated.IsResolved) + if (!m_threadTerminated.IsFulfilled) m_syncContext.Post(x => Application.ExitThread(), null); } base.Dispose(disposing); diff --git a/Implab.Test/CancelationTests.cs b/Implab.Test/CancelationTests.cs --- a/Implab.Test/CancelationTests.cs +++ b/Implab.Test/CancelationTests.cs @@ -40,7 +40,7 @@ namespace Implab.Test { // cancel the promise Assert.IsTrue(p.CancelOperationIfRequested()); Assert.IsTrue(p.IsCancelled); - Assert.AreSame(reason, p.Error); + Assert.AreSame(reason, p.RejectReason); } [TestMethod] @@ -76,7 +76,7 @@ namespace Implab.Test { task.Cancel(); Assert.IsTrue(task.IsCancellationRequested); Assert.IsFalse(task.IsCancelled); - Assert.IsFalse(task.IsResolved); + Assert.IsFalse(task.IsFulfilled); finish.Set(); task.Join(1000); diff --git a/Implab/AbstractEvent.cs b/Implab/AbstractEvent.cs --- a/Implab/AbstractEvent.cs +++ b/Implab/AbstractEvent.cs @@ -2,56 +2,50 @@ using Implab.Parallels; using System.Threading; using System.Reflection; +using System.Diagnostics; namespace Implab { - public abstract class AbstractEvent : ICancellable { + public abstract class AbstractEvent where THandler : class { - const int UNRESOLVED_SATE = 0; - const int TRANSITIONAL_STATE = 1; + const int PENDING_SATE = 0; + protected const int TRANSITIONAL_STATE = 1; + protected const int SUCCEEDED_STATE = 2; protected const int REJECTED_STATE = 3; - protected const int CANCELLED_STATE = 4; - const int CANCEL_NOT_REQUESTED = 0; - const int CANCEL_REQUESTING = 1; - const int CANCEL_REQUESTED = 2; - - const int RESERVED_HANDLERS_COUNT = 4; - - int m_state; + volatile int m_state; Exception m_error; - int m_handlersCount; - //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; - THandler[] m_handlers; + THandler m_handler; SimpleAsyncQueue m_extraHandlers; - int m_handlerPointer = -1; - int m_handlersCommited; - - int m_cancelRequest; - Exception m_cancelationReason; #region state managment - bool BeginTransit() { - return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); + protected bool BeginTransit() { + return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE); } - void CompleteTransit(int state) { + protected void CompleteTransit(int state) { +#if DEBUG if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); +#else + m_state = state; +#endif + Signal(); } - void WaitTransition() { - while (m_state == TRANSITIONAL_STATE) { - Thread.MemoryBarrier(); + protected void WaitTransition() { + if (m_state == TRANSITIONAL_STATE) { + SpinWait spin; + do { + spin.SpinOnce(); + } while (m_state == TRANSITIONAL_STATE); } } protected bool BeginSetResult() { if (!BeginTransit()) { WaitTransition(); - if (m_state != CANCELLED_STATE) - throw new InvalidOperationException("The promise is already resolved"); return false; } return true; @@ -59,7 +53,6 @@ namespace Implab { protected void EndSetResult() { CompleteTransit(SUCCEEDED_STATE); - Signal(); } @@ -78,8 +71,6 @@ namespace Implab { if (BeginTransit()) { m_error = error; CompleteTransit(REJECTED_STATE); - - Signal(); } else { WaitTransition(); if (m_state == SUCCEEDED_STATE) @@ -87,58 +78,33 @@ namespace Implab { } } - /// - /// Отменяет операцию, если это возможно. - /// - /// Для определения была ли операция отменена следует использовать свойство . - protected void SetCancelled(Exception reason) { - if (BeginTransit()) { - m_error = reason; - CompleteTransit(CANCELLED_STATE); - Signal(); - } - } - protected abstract void SignalHandler(THandler handler, int signal); void Signal() { - var hp = m_handlerPointer; - var slot = hp +1 ; - while (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { - SignalHandler(m_handlers[slot], m_state); - } - hp = m_handlerPointer; - slot = hp +1 ; - } - - - if (m_extraHandlers != null) { - THandler handler; - while (m_extraHandlers.TryDequeue(out handler)) - SignalHandler(handler, m_state); - } + THandler handler; + while (TryDequeueHandler(out handler)) + SignalHandler(handler, m_state); } #endregion - protected abstract Signal GetResolveSignal(); + protected abstract Signal GetFulfillSignal(); #region synchronization traits protected void WaitResult(int timeout) { - if (!(IsResolved || GetResolveSignal().Wait(timeout))) + if (!(IsFulfilled || GetFulfillSignal().Wait(timeout))) throw new TimeoutException(); - switch (m_state) { - case SUCCEEDED_STATE: - return; - case CANCELLED_STATE: - throw new OperationCanceledException("The operation has been cancelled", m_error); - case REJECTED_STATE: - throw new TargetInvocationException(m_error); - default: - throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state)); - } + if (IsRejected) + Rethrow(); + } + + protected void Rethrow() { + Debug.Assert(m_error != null); + if (m_error is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", m_error); + else + throw new TargetInvocationException(m_error); } #endregion @@ -150,149 +116,55 @@ namespace Implab { // the promise is in the resolved state, just invoke the handler SignalHandler(handler, m_state); } else { - var slot = Interlocked.Increment(ref m_handlersCount) - 1; - - if (slot < RESERVED_HANDLERS_COUNT) { - - if (slot == 0) { - m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; - } else { - while (m_handlers == null) - Thread.MemoryBarrier(); - } - - m_handlers[slot] = handler; - - while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { - } + if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) { + if (m_extraHandlers == null) + // compare-exchange will fprotect from loosing already created queue + Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue(), null); + m_extraHandlers.Enqueue(handler); + } - if (m_state > 1) { - do { - var hp = m_handlerPointer; - slot = hp + 1; - if (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) - continue; - SignalHandler(m_handlers[slot], m_state); - } - break; - } while(true); - } - } else { - if (slot == RESERVED_HANDLERS_COUNT) { - m_extraHandlers = new SimpleAsyncQueue(); - } else { - while (m_extraHandlers == null) - Thread.MemoryBarrier(); - } - - m_extraHandlers.Enqueue(handler); - - if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) + if (m_state > 1 && TryDequeueHandler(out handler)) // if the promise have been resolved while we was adding the handler to the queue // we can't guarantee that someone is still processing it // therefore we need to fetch a handler from the queue and execute it // note that fetched handler may be not the one that we have added // even we can fetch no handlers at all :) - SignalHandler(handler, m_state); - } + SignalHandler(handler, m_state); } + + } + + bool TryDequeueHandler(out THandler handler) { + handler = Interlocked.Exchange(ref m_handler, null); + if (handler != null) + return true; + return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler); } #endregion #region IPromise implementation - public bool IsResolved { + public bool IsFulfilled { get { - Thread.MemoryBarrier(); - return m_state > 1; + return m_state > TRANSITIONAL_STATE; } } - public bool IsCancelled { + public bool IsRejected { get { - Thread.MemoryBarrier(); - return m_state == CANCELLED_STATE; + return m_state == REJECTED_STATE; } } #endregion - public Exception Error { + public Exception RejectReason { get { return m_error; } } - public bool CancelOperationIfRequested() { - if (IsCancellationRequested) { - CancelOperation(CancellationReason); - return true; - } - return false; - } - - public virtual void CancelOperation(Exception reason) { - SetCancelled(reason); - } - - public void CancellationRequested(Action handler) { - Safe.ArgumentNotNull(handler, "handler"); - if (IsCancellationRequested) - handler(CancellationReason); - - if (m_cancelationHandlers == null) - Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue>(), null); - - m_cancelationHandlers.Enqueue(handler); - - if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler)) - // TryDeque implies MemoryBarrier() - handler(m_cancelationReason); - } - - public bool IsCancellationRequested { - get { - do { - if (m_cancelRequest == CANCEL_NOT_REQUESTED) - return false; - if (m_cancelRequest == CANCEL_REQUESTED) - return true; - Thread.MemoryBarrier(); - } while(true); - } - } - - public Exception CancellationReason { - get { - do { - Thread.MemoryBarrier(); - } while(m_cancelRequest == CANCEL_REQUESTING); - - return m_cancelationReason; - } - } - - #region ICancellable implementation - - public void Cancel() { - Cancel(null); - } - - public void Cancel(Exception reason) { - if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { - m_cancelationReason = reason; - m_cancelRequest = CANCEL_REQUESTED; - if (m_cancelationHandlers != null) { - Action handler; - while (m_cancelationHandlers.TryDequeue(out handler)) - handler(m_cancelationReason); - } - } - } - - #endregion } } diff --git a/Implab/AbstractPromise.cs b/Implab/AbstractPromise.cs --- a/Implab/AbstractPromise.cs +++ b/Implab/AbstractPromise.cs @@ -3,28 +3,16 @@ using Implab.Parallels; namespace Implab { public abstract class AbstractPromise : AbstractEvent, IPromise { - public struct HandlerDescriptor { + public class HandlerDescriptor { readonly Action m_handler; readonly Action m_error; - readonly Action m_cancel; - readonly PromiseEventType m_mask; - - public HandlerDescriptor(Action success, Action error, Action cancel) { + public HandlerDescriptor(Action success, Action error) { m_handler = success; m_error = error; - m_cancel = cancel; - m_mask = PromiseEventType.Success; - } - - public HandlerDescriptor(Action handler, PromiseEventType mask) { - m_handler = handler; - m_error = null; - m_cancel = null; - m_mask = mask; } public void SignalSuccess() { - if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) { + if (m_handler != null) { try { m_handler(); // Analysis disable once EmptyGeneralCatchClause @@ -40,28 +28,6 @@ namespace Implab { // Analysis disable once EmptyGeneralCatchClause } catch { } - } else if ((m_mask & PromiseEventType.Error ) != 0 && m_handler != null) { - try { - m_handler(); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } - } - } - - public void SignalCancel(Exception reason) { - if (m_cancel != null) { - try { - m_cancel(reason); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } - } else if ( (m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) { - try { - m_handler(); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } } } } @@ -75,48 +41,29 @@ namespace Implab { handler.SignalSuccess(); break; case REJECTED_STATE: - handler.SignalError(Error); - break; - case CANCELLED_STATE: - handler.SignalCancel(CancellationReason); + handler.SignalError(RejectReason); break; default: throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal)); } } - protected override Signal GetResolveSignal() { + protected override Signal GetFulfillSignal() { var signal = new Signal(); - On(signal.Set, PromiseEventType.All); + On(signal.Set, e => signal.Set()); return signal; } #endregion - public Type PromiseType { + public Type ResultType { get { return typeof(void); } } - public IPromise On(Action success, Action error, Action cancel) { - AddHandler(new HandlerDescriptor(success, error, cancel)); - return this; - } - - public IPromise On(Action success, Action error) { - AddHandler(new HandlerDescriptor(success, error, null)); - return this; - } - - public IPromise On(Action success) { - AddHandler(new HandlerDescriptor(success, null, null)); - return this; - } - - public IPromise On(Action handler, PromiseEventType events) { - AddHandler(new HandlerDescriptor(handler,events)); - return this; + public void On(Action success, Action error) { + AddHandler(new HandlerDescriptor(success, error)); } public IPromise Cast() { diff --git a/Implab/AbstractPromiseT.cs b/Implab/AbstractPromiseT.cs --- a/Implab/AbstractPromiseT.cs +++ b/Implab/AbstractPromiseT.cs @@ -84,7 +84,7 @@ namespace Implab { } } - public Type PromiseType { + public Type ResultType { get { return typeof(T); } @@ -167,7 +167,7 @@ namespace Implab { #region implemented abstract members of AbstractPromise - protected override Signal GetResolveSignal() { + protected override Signal GetFulfillSignal() { var signal = new Signal(); AddHandler(new HandlerDescriptor(signal.Set, PromiseEventType.All)); return signal; @@ -179,7 +179,7 @@ namespace Implab { handler.SignalSuccess(m_result); break; case REJECTED_STATE: - handler.SignalError(Error); + handler.SignalError(RejectReason); break; case CANCELLED_STATE: handler.SignalCancel(CancellationReason); diff --git a/Implab/FailedPromise.cs b/Implab/FailedPromise.cs --- a/Implab/FailedPromise.cs +++ b/Implab/FailedPromise.cs @@ -53,20 +53,20 @@ namespace Implab { } public void Join() { - throw new TargetInvocationException(Error); + throw new TargetInvocationException(RejectReason); } public void Join(int timeout) { - throw new TargetInvocationException(Error); + throw new TargetInvocationException(RejectReason); } - public virtual Type PromiseType { + public virtual Type ResultType { get { return typeof(void); } } - public bool IsResolved { + public bool IsFulfilled { get { return true; } @@ -78,7 +78,7 @@ namespace Implab { } } - public Exception Error { + public Exception RejectReason { get { return m_error; } diff --git a/Implab/FailedPromiseT.cs b/Implab/FailedPromiseT.cs --- a/Implab/FailedPromiseT.cs +++ b/Implab/FailedPromiseT.cs @@ -9,7 +9,7 @@ namespace Implab { public IPromise On(Action success, Action error, Action cancel) { if (error != null) { try { - error(Error); + error(RejectReason); // Analysis disable once EmptyGeneralCatchClause } catch { } @@ -20,7 +20,7 @@ namespace Implab { public IPromise On(Action success, Action error) { if (error != null) { try { - error(Error); + error(RejectReason); // Analysis disable once EmptyGeneralCatchClause } catch { } @@ -33,11 +33,11 @@ namespace Implab { } T IPromise.Join() { - throw new TargetInvocationException(Error); + throw new TargetInvocationException(RejectReason); } T IPromise.Join(int timeout) { - throw new TargetInvocationException(Error); + throw new TargetInvocationException(RejectReason); } diff --git a/Implab/IPromise.cs b/Implab/IPromise.cs --- a/Implab/IPromise.cs +++ b/Implab/IPromise.cs @@ -9,42 +9,29 @@ namespace Implab { /// /// Тип результата, получаемого через данное обещание. /// - Type PromiseType { get; } + Type ResultType { get; } /// /// Обещание является выполненым, либо успешно, либо с ошибкой, либо отменено. /// - bool IsResolved { get; } + bool IsFulfilled { get; } - /// - /// Обещание было отменено. - /// - bool IsCancelled { get; } + bool IsRejected { get; } + + bool IsResolved { get; } /// /// Исключение возникшее в результате выполнения обещания, либо причина отмены. /// - Exception Error { get; } + Exception RejectReason { get; } /// /// Adds specified listeners to the current promise. /// /// The handler called on the successful promise completion. /// The handler is called if an error while completing the promise occurred. - /// The handler is called in case of promise cancellation. /// The current promise. - IPromise On(Action success, Action error, Action cancel); - IPromise On(Action success, Action error); - IPromise On(Action success); - - /// - /// Adds specified listeners to the current promise. - /// - /// The handler called on the specified events. - /// The combination of flags denoting the events for which the - /// handler shoud be called. - /// The current promise. - IPromise On(Action handler, PromiseEventType events); + void On(Action success, Action error); /// /// Преобразует результат обещания к заданному типу и возвращает новое обещание. diff --git a/Implab/IPromiseT.cs b/Implab/IPromiseT.cs --- a/Implab/IPromiseT.cs +++ b/Implab/IPromiseT.cs @@ -3,23 +3,10 @@ namespace Implab { public interface IPromise : IPromise { - IPromise On(Action success, Action error, Action cancel); - - IPromise On(Action success, Action error); - - IPromise On(Action success); + void On(Action success, Action error); new T Join(); new T Join(int timeout); - - new IPromise On(Action success, Action error, Action cancel); - - new IPromise On(Action success, Action error); - - new IPromise On(Action success); - - new IPromise On(Action handler, PromiseEventType events); - } } diff --git a/Implab/Parallels/ArrayTraits.cs b/Implab/Parallels/ArrayTraits.cs --- a/Implab/Parallels/ArrayTraits.cs +++ b/Implab/Parallels/ArrayTraits.cs @@ -164,7 +164,7 @@ namespace Implab.Parallels { // Analysis disable AccessToDisposedClosure AsyncPool.RunThread(() => { for (int i = 0; i < source.Length; i++) { - if(promise.IsResolved) + if(promise.IsFulfilled) break; // stop processing in case of error or cancellation var idx = i; diff --git a/Implab/Parallels/SimpleAsyncQueue.cs b/Implab/Parallels/SimpleAsyncQueue.cs --- a/Implab/Parallels/SimpleAsyncQueue.cs +++ b/Implab/Parallels/SimpleAsyncQueue.cs @@ -15,12 +15,12 @@ namespace Implab.Parallels { // the reader and the writer are mainteined completely independent, // the reader can read next item when m_first.next is not null - // the writer creates the a new node, moves m_last to this node and + // the writer creates a new node, moves m_last to this node and // only after that restores the reference from the previous node - // making available the reader to read the new node. + // making the reader be able to read the new node. - Node m_first; // position on the node which is already read - Node m_last; // position on the node which is already written + volatile Node m_first; // position on the node which is already read + volatile Node m_last; // position on the node which is already written public SimpleAsyncQueue() { m_first = m_last = new Node(default(T)); @@ -35,29 +35,38 @@ namespace Implab.Parallels { // release-fence last.next = next; - + } public bool TryDequeue(out T value) { - Node first; - Node next; + Node first = m_first; ; + Node next = first.next; ; + + if (next == null) { + value = default(T); + return false; + } + + var first2 = Interlocked.CompareExchange(ref m_first, next, first); + + if (first != first2) { + // head is updated by someone else - Thread.MemoryBarrier(); // ensure m_first is fresh - SpinWait spin = new SpinWait(); - do { - first = m_first; - // aquire-fence - next = first.next; - if (next == null) { - value = default(T); - return false; - } - - if (first == Interlocked.CompareExchange(ref m_first, next, first)) - // head succesfully updated - break; - spin.SpinOnce(); - } while (true); + SpinWait spin = new SpinWait(); + do { + first = first2; + next = first.next; + if (next == null) { + value = default(T); + return false; + } + + first2 = Interlocked.CompareExchange(ref m_first, next, first); + if (first == first2) + break; + spin.SpinOnce(); + } while (true); + } value = next.value; return true; diff --git a/Implab/PromiseAwaiter.cs b/Implab/PromiseAwaiter.cs --- a/Implab/PromiseAwaiter.cs +++ b/Implab/PromiseAwaiter.cs @@ -20,7 +20,7 @@ namespace Implab { public bool IsCompleted { get { - return m_promise.IsResolved; + return m_promise.IsFulfilled; } } } diff --git a/Implab/PromiseAwaiterT.cs b/Implab/PromiseAwaiterT.cs --- a/Implab/PromiseAwaiterT.cs +++ b/Implab/PromiseAwaiterT.cs @@ -20,7 +20,7 @@ namespace Implab { public bool IsCompleted { get { - return m_promise.IsResolved; + return m_promise.IsFulfilled; } } } diff --git a/Implab/SuccessPromise.cs b/Implab/SuccessPromise.cs --- a/Implab/SuccessPromise.cs +++ b/Implab/SuccessPromise.cs @@ -58,13 +58,13 @@ namespace Implab { public void Join(int timeout) { } - public Type PromiseType { + public Type ResultType { get { return typeof(void); } } - public bool IsResolved { + public bool IsFulfilled { get { return true; } @@ -76,7 +76,7 @@ namespace Implab { } } - public Exception Error { + public Exception RejectReason { get { return null; } diff --git a/Implab/SuccessPromiseT.cs b/Implab/SuccessPromiseT.cs --- a/Implab/SuccessPromiseT.cs +++ b/Implab/SuccessPromiseT.cs @@ -119,13 +119,13 @@ namespace Implab { void IPromise.Join(int timeout) { } - public Type PromiseType { + public Type ResultType { get { return typeof(T); } } - public bool IsResolved { + public bool IsFulfilled { get { return true; } @@ -137,7 +137,7 @@ namespace Implab { } } - public Exception Error { + public Exception RejectReason { get { return null; } diff --git a/packages/repositories.config b/packages/repositories.config --- a/packages/repositories.config +++ b/packages/repositories.config @@ -1,5 +1,6 @@  + \ No newline at end of file