using System;
using Implab.Parallels;
using System.Threading;
using System.Reflection;

namespace Implab {
    /// <summary>
    /// Base class for a one-shot event (promise-like object) that moves from the
    /// unresolved state to exactly one of: succeeded, rejected or cancelled, and
    /// notifies the registered handlers about the outcome.
    /// </summary>
    /// <remarks>
    /// State changes and handler registration are coordinated with
    /// <see cref="Interlocked"/> operations and spin-waits instead of locks.
    /// The class also tracks a separate cancellation request (see <see cref="Cancel()"/>),
    /// which is independent of the resolution state.
    /// </remarks>
    /// <typeparam name="THandler">
    /// Type of the handler record stored by the event and passed to
    /// <see cref="SignalSuccess"/>, <see cref="SignalError"/> and <see cref="SignalCancelled"/>.
    /// </typeparam>
    public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {

        // Resolution states. Every value greater than TRANSITIONAL_STATE is a final state,
        // which is what the 'm_state > 1' checks below rely on.
        const int UNRESOLVED_STATE = 0;
        const int TRANSITIONAL_STATE = 1;
        const int SUCCEEDED_STATE = 2;
        const int REJECTED_STATE = 3;
        const int CANCELLED_STATE = 4;

        // States of the cancellation request.
        const int CANCEL_NOT_REQUESTED = 0;
        const int CANCEL_REQUESTING = 1;
        const int CANCEL_REQUESTED = 2;

        // Number of handlers stored in the fixed array before the overflow queue is used.
        const int RESERVED_HANDLERS_COUNT = 4;

        int m_state;
        Exception m_error;

        // Number of handler slots handed out so far (may exceed RESERVED_HANDLERS_COUNT).
        int m_handlersCount;

        readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];

        // Overflow storage, created by the thread that receives slot RESERVED_HANDLERS_COUNT.
        MTQueue<THandler> m_extraHandlers;

        // Index of the last handler from m_handlers that has been claimed for invocation.
        int m_handlerPointer = -1;

        // Number of leading slots of m_handlers that are fully written.
        int m_handlersCommited;

        int m_cancelRequest;
        Exception m_cancelationReason;
        MTQueue<Action<Exception>> m_cancelationHandlers;

        #region state managment

        /// <summary>
        /// Tries to move the event from the unresolved state to the transitional state.
        /// </summary>
        /// <returns><c>true</c> if the calling thread performed the transition.</returns>
        bool BeginTransit() {
            return UNRESOLVED_STATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_STATE);
        }

        /// <summary>
        /// Moves the event from the transitional state to the specified final state.
        /// </summary>
        /// <param name="state">The final state to publish.</param>
        /// <exception cref="InvalidOperationException">The event isn't in the transitional state.</exception>
        void CompleteTransit(int state) {
            if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
        }

        /// <summary>
        /// Spins until the event leaves the transitional state.
        /// </summary>
        void WaitTransition() {
            while (m_state == TRANSITIONAL_STATE) {
                Thread.MemoryBarrier();
            }
        }

        /// <summary>
        /// Starts setting the successful result.
        /// </summary>
        /// <returns>
        /// <c>true</c> if the caller owns the transition and must finish it with
        /// <see cref="EndSetResult"/>; <c>false</c> if the event is already cancelled.
        /// </returns>
        /// <exception cref="InvalidOperationException">
        /// The event is already resolved with a state other than cancelled.
        /// </exception>
        protected bool BeginSetResult() {
            if (!BeginTransit()) {
                // someone else is resolving (or has resolved) the event, wait for the final state
                WaitTransition();
                if (m_state != CANCELLED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
                return false;
            }
            return true;
        }

        /// <summary>
        /// Completes the transition started by <see cref="BeginSetResult"/> and
        /// notifies the handlers about the success.
        /// </summary>
        protected void EndSetResult() {
            CompleteTransit(SUCCEEDED_STATE);
            OnSuccess();
        }

        /// <summary>
        /// Resolves the promise by reporting an error.
        /// </summary>
        /// <remarks>
        /// Since the promise has to work in a multithreaded environment, several threads
        /// may report an error at the same time; only the first one is used as the result,
        /// the others are ignored.
        /// An <see cref="OperationCanceledException"/> moves the event to the cancelled state
        /// with its inner exception as the reason. For a <c>PromiseTransientException</c>
        /// the inner exception is stored instead of the wrapper.
        /// </remarks>
        /// <param name="error">The exception raised while performing the operation.</param>
        /// <exception cref="InvalidOperationException">The promise is already resolved successfully.</exception>
        protected void SetError(Exception error) {
            if (BeginTransit()) {
                if (error is OperationCanceledException) {
                    // store the reason BEFORE publishing the final state, as the other
                    // branches do, so that nobody observes the cancelled state without it
                    m_error = error.InnerException;
                    CompleteTransit(CANCELLED_STATE);
                    OnCancelled();
                } else {
                    m_error = error is PromiseTransientException ? error.InnerException : error;
                    CompleteTransit(REJECTED_STATE);
                    OnError();
                }
            } else {
                WaitTransition();
                // errors reported after a rejection or a cancellation are silently ignored
                if (m_state == SUCCEEDED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
            }
        }

        /// <summary>
        /// Cancels the operation if it is possible.
        /// </summary>
        /// <remarks>
        /// Does nothing when the event is already resolved or is being resolved.
        /// Use the <see cref="IsCancelled"/> property to find out whether the operation was cancelled.
        /// </remarks>
        /// <param name="reason">The reason of the cancellation, stored as the error of the event.</param>
        protected void SetCancelled(Exception reason) {
            if (BeginTransit()) {
                m_error = reason;
                CompleteTransit(CANCELLED_STATE);
                OnCancelled();
            }
        }

        /// <summary>Notifies the specified handler about the success.</summary>
        protected abstract void SignalSuccess(THandler handler);

        /// <summary>Notifies the specified handler about the error.</summary>
        protected abstract void SignalError(THandler handler, Exception error);

        /// <summary>Notifies the specified handler about the cancellation.</summary>
        protected abstract void SignalCancelled(THandler handler, Exception reason);

        /// <summary>
        /// Invokes <see cref="SignalSuccess"/> for the committed handlers which haven't
        /// been claimed yet and then for the handlers from the overflow queue.
        /// </summary>
        void OnSuccess() {
            var hp = m_handlerPointer;
            var slot = hp + 1;
            while (slot < m_handlersCommited) {
                // claim the slot; if another thread wins the race it will invoke the handler
                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
                    SignalSuccess(m_handlers[slot]);
                }
                hp = m_handlerPointer;
                slot = hp + 1;
            }

            if (m_extraHandlers != null) {
                THandler handler;
                while (m_extraHandlers.TryDequeue(out handler))
                    SignalSuccess(handler);
            }
        }

        /// <summary>
        /// Invokes <see cref="SignalError"/> for the committed handlers which haven't
        /// been claimed yet and then for the handlers from the overflow queue.
        /// </summary>
        void OnError() {
            var hp = m_handlerPointer;
            var slot = hp + 1;
            while (slot < m_handlersCommited) {
                // claim the slot; if another thread wins the race it will invoke the handler
                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
                    SignalError(m_handlers[slot], m_error);
                }
                hp = m_handlerPointer;
                slot = hp + 1;
            }

            if (m_extraHandlers != null) {
                THandler handler;
                while (m_extraHandlers.TryDequeue(out handler))
                    SignalError(handler, m_error);
            }
        }

        /// <summary>
        /// Invokes <see cref="SignalCancelled"/> for the committed handlers which haven't
        /// been claimed yet and then for the handlers from the overflow queue.
        /// </summary>
        void OnCancelled() {
            var hp = m_handlerPointer;
            var slot = hp + 1;
            while (slot < m_handlersCommited) {
                // claim the slot; if another thread wins the race it will invoke the handler
                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
                    SignalCancelled(m_handlers[slot], m_error);
                }
                hp = m_handlerPointer;
                slot = hp + 1;
            }

            if (m_extraHandlers != null) {
                THandler handler;
                while (m_extraHandlers.TryDequeue(out handler))
                    SignalCancelled(handler, m_error);
            }
        }

        #endregion

        /// <summary>
        /// Returns the signal on which <see cref="WaitResult"/> waits for the resolution.
        /// </summary>
        protected abstract Signal GetResolveSignal();

        #region synchronization traits

        /// <summary>
        /// Waits for the event to be resolved and converts the final state to the outcome.
        /// </summary>
        /// <param name="timeout">The timeout passed to the wait operation of the resolve signal.</param>
        /// <exception cref="OperationCanceledException">The event is cancelled.</exception>
        /// <exception cref="TargetInvocationException">The event is rejected; wraps the stored error.</exception>
        /// <exception cref="ApplicationException">The event isn't in a final state after the wait.</exception>
        protected void WaitResult(int timeout) {
            // NOTE(review): if Signal.Wait(timeout) can return without the event being
            // resolved (instead of throwing), the default branch below reports it as an
            // "Invalid promise state" - confirm against the Signal implementation.
            if (!IsResolved)
                GetResolveSignal().Wait(timeout);

            switch (m_state) {
                case SUCCEEDED_STATE:
                    return;
                case CANCELLED_STATE:
                    throw new OperationCanceledException();
                case REJECTED_STATE:
                    throw new TargetInvocationException(m_error);
                default:
                    throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
            }
        }

        #endregion

        #region handlers managment

        /// <summary>
        /// Registers the handler. If the event is already resolved, the handler is
        /// invoked immediately on the calling thread.
        /// </summary>
        /// <param name="handler">The handler to register.</param>
        protected void AddHandler(THandler handler) {
            if (m_state > 1) {
                // the promise is in the resolved state, just invoke the handler
                InvokeHandler(handler);
            } else {
                var slot = Interlocked.Increment(ref m_handlersCount) - 1;

                if (slot < RESERVED_HANDLERS_COUNT) {
                    m_handlers[slot] = handler;

                    // commit the slot: spin until all preceding slots are committed
                    while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
                    }

                    if (m_state > 1) {
                        // the promise has been resolved while the handler was being added,
                        // try to claim the next pending handler and invoke it
                        while (true) {
                            var hp = m_handlerPointer;
                            slot = hp + 1;
                            if (slot < m_handlersCommited) {
                                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
                                    continue;
                                InvokeHandler(m_handlers[slot]);
                            }
                            break;
                        }
                    }
                } else {
                    if (slot == RESERVED_HANDLERS_COUNT) {
                        // the first overflowing handler creates the queue
                        m_extraHandlers = new MTQueue<THandler>();
                    } else {
                        // wait until the queue is created by the thread above
                        while (m_extraHandlers == null)
                            Thread.MemoryBarrier();
                    }

                    m_extraHandlers.Enqueue(handler);

                    if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
                        // if the promise has been resolved while we were adding the handler to the queue
                        // we can't guarantee that someone is still processing it,
                        // therefore we need to fetch a handler from the queue and execute it.
                        // Note that the fetched handler may be not the one that we have added,
                        // we may even fetch no handlers at all.
                        InvokeHandler(handler);
                }
            }
        }

        /// <summary>
        /// Notifies the handler according to the current final state of the event.
        /// </summary>
        /// <param name="handler">The handler to notify.</param>
        /// <exception cref="Exception">The event isn't in a final state.</exception>
        protected void InvokeHandler(THandler handler) {
            switch (m_state) {
                case SUCCEEDED_STATE:
                    SignalSuccess(handler);
                    break;
                case CANCELLED_STATE:
                    SignalCancelled(handler, m_error);
                    break;
                case REJECTED_STATE:
                    SignalError(handler, m_error);
                    break;
                default:
                    throw new Exception(String.Format("Invalid promise state {0}", m_state));
            }
        }

        #endregion

        #region IPromise implementation

        /// <summary>
        /// Gets a value indicating whether the event is in one of the final states.
        /// </summary>
        public bool IsResolved {
            get {
                Thread.MemoryBarrier();
                return m_state > 1;
            }
        }

        /// <summary>
        /// Gets a value indicating whether the event is in the cancelled state.
        /// </summary>
        public bool IsCancelled {
            get {
                Thread.MemoryBarrier();
                return m_state == CANCELLED_STATE;
            }
        }

        #endregion

        /// <summary>
        /// Gets the stored error: the rejection error or the cancellation reason.
        /// </summary>
        public Exception Error {
            get { return m_error; }
        }

        /// <summary>
        /// Cancels the operation if the cancellation has been requested.
        /// </summary>
        /// <returns><c>true</c> if the cancellation was requested.</returns>
        public bool CancelOperationIfRequested() {
            if (IsCancellationRequested) {
                CancelOperation(CancellationReason);
                return true;
            }
            return false;
        }

        /// <summary>
        /// Moves the event to the cancelled state with the specified reason, if it isn't resolved yet.
        /// </summary>
        /// <param name="reason">The reason of the cancellation.</param>
        public virtual void CancelOperation(Exception reason) {
            SetCancelled(reason);
        }

        /// <summary>
        /// Registers the handler which is invoked when the cancellation is requested.
        /// If the cancellation is already requested, the handler is invoked immediately.
        /// </summary>
        /// <param name="handler">The handler which receives the reason of the cancellation.</param>
        public void CancellationRequested(Action<Exception> handler) {
            Safe.ArgumentNotNull(handler, "handler");

            if (IsCancellationRequested) {
                // the request is final, so the handler must not also be queued,
                // otherwise it would be invoked a second time below
                handler(CancellationReason);
                return;
            }

            if (m_cancelationHandlers == null)
                Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);

            m_cancelationHandlers.Enqueue(handler);

            // The cancellation might have been requested while the handler was being queued
            // and the cancelling thread might have already drained the queue, therefore
            // fetch a handler (not necessarily the one added above) and invoke it.
            // Original author's note: TryDequeue implies MemoryBarrier().
            if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
                handler(m_cancelationReason);
        }

        /// <summary>
        /// Gets a value indicating whether the cancellation has been requested.
        /// Spins while a request is being registered.
        /// </summary>
        public bool IsCancellationRequested {
            get {
                do {
                    if (m_cancelRequest == CANCEL_NOT_REQUESTED)
                        return false;
                    if (m_cancelRequest == CANCEL_REQUESTED)
                        return true;
                    Thread.MemoryBarrier();
                } while (true);
            }
        }

        /// <summary>
        /// Gets the reason passed with the cancellation request.
        /// Spins while a request is being registered.
        /// </summary>
        public Exception CancellationReason {
            get {
                do {
                    Thread.MemoryBarrier();
                } while (m_cancelRequest == CANCEL_REQUESTING);

                return m_cancelationReason;
            }
        }

        #region ICancellable implementation

        /// <summary>
        /// Requests the cancellation without a reason.
        /// </summary>
        public void Cancel() {
            Cancel(null);
        }

        /// <summary>
        /// Requests the cancellation and invokes the queued cancellation handlers.
        /// Only the first request takes effect, subsequent calls do nothing.
        /// </summary>
        /// <param name="reason">The reason of the cancellation, passed to the handlers.</param>
        public void Cancel(Exception reason) {
            if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
                // the reason is stored before the request is published as completed
                m_cancelationReason = reason;
                m_cancelRequest = CANCEL_REQUESTED;

                if (m_cancelationHandlers != null) {
                    Action<Exception> handler;
                    while (m_cancelationHandlers.TryDequeue(out handler))
                        handler(m_cancelationReason);
                }
            }
        }

        #endregion
    }
}