AbstractPromise.cs
        
        
            
                    308 lines
            
             | 10.4 KiB
            
                | text/x-csharp
            
             |
                CSharpLexer
            
          
        
             / Implab / AbstractPromise.cs
          
          
          
      |  | r119 | using System; | ||
| using Implab.Parallels; | ||||
| using System.Threading; | ||||
| using System.Reflection; | ||||
| namespace Implab { | ||||
| public abstract class AbstractPromise<THandler> { | ||||
// Promise life-cycle states. Every value greater than TRANSITIONAL_STATE denotes
// a resolved promise (see the `m_state > 1` checks in this class).
const int UNRESOLVED_SATE = 0;
const int TRANSITIONAL_STATE = 1;
const int SUCCEEDED_STATE = 2;
const int REJECTED_STATE = 3;
const int CANCELLED_STATE = 4;

// Number of handlers kept in the fixed array; further handlers go to m_extraHandlers.
const int RESERVED_HANDLERS_COUNT = 4;

// Current state, one of the *_STATE constants above; switched with Interlocked.CompareExchange.
int m_state;

// Error of a rejected promise or the reason of a cancellation.
Exception m_error;

// Fixed storage for the first RESERVED_HANDLERS_COUNT handlers.
readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];

// Total number of handlers registered so far; used to allocate a slot for a new handler.
int m_handlersCount;

// Number of slots in m_handlers which are already filled and may be read.
int m_handlersCommited;

// Index of the last slot in m_handlers claimed for invocation, -1 when none was claimed.
int m_handlerPointer = -1;

// Queue for the handlers which did not fit into m_handlers; created on demand.
MTQueue<THandler> m_extraHandlers;
|  | r119 | |||
| #region state managment | ||||
/// <summary>
/// Atomically switches the promise from the unresolved state to the transitional state.
/// </summary>
/// <returns><c>true</c> if this call performed the switch, <c>false</c> if the promise
/// was not in the unresolved state.</returns>
bool BeginTransit() {
    var previous = Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
    return previous == UNRESOLVED_SATE;
}
/// <summary>
/// Atomically replaces the transitional state with the specified final state.
/// </summary>
/// <param name="state">The state to set.</param>
/// <exception cref="InvalidOperationException">The promise is not in the transitional state.</exception>
void CompleteTransit(int state) {
    var previous = Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE);
    if (previous == TRANSITIONAL_STATE)
        return;

    throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
}
/// <summary>
/// Spins until the promise leaves the transitional state.
/// </summary>
void WaitTransition() {
    for (;;) {
        if (m_state != TRANSITIONAL_STATE)
            return;
        // issue a full fence before the state is checked again
        Thread.MemoryBarrier();
    }
}
/// <summary>
/// Tries to start resolving the promise with a result.
/// </summary>
/// <returns><c>true</c> if the promise has entered the transitional state and the caller
/// should finish with <see cref="EndSetResult"/>; <c>false</c> if the promise is cancelled.</returns>
/// <exception cref="InvalidOperationException">The promise is already resolved and isn't cancelled.</exception>
protected bool BeginSetResult() {
    if (BeginTransit())
        return true;

    // someone else is resolving or has resolved the promise, wait for the final state
    WaitTransition();
    if (m_state == CANCELLED_STATE)
        return false;

    throw new InvalidOperationException("The promise is already resolved");
}
/// <summary>
/// Completes the resolution started by <see cref="BeginSetResult"/>: marks the promise
/// as succeeded and signals the registered handlers.
/// </summary>
/// <exception cref="InvalidOperationException">The promise is not in the transitional state.</exception>
protected void EndSetResult() {
    // the final state is set first, handlers are notified afterwards
    CompleteTransit(SUCCEEDED_STATE);
    OnSuccess();
}
/// <summary>
/// Resolves the promise with an error.
/// </summary>
/// <remarks>
/// Since the promise is meant to work in a multithreaded environment, several threads may
/// report an error at the same time; only the first one is used as the result, the others
/// are ignored.
/// An <see cref="OperationCanceledException"/> cancels the promise and its
/// <see cref="Exception.InnerException"/> becomes the cancellation reason.
/// A <c>PromiseTransientException</c> is unwrapped and its inner exception is stored as the error.
/// </remarks>
/// <param name="error">The exception raised while performing the operation.</param>
/// <exception cref="InvalidOperationException">The promise is already resolved successfully.</exception>
protected void SetError(Exception error) {
    if (!BeginTransit()) {
        // the promise is being resolved or is already resolved by someone else
        WaitTransition();
        if (m_state == SUCCEEDED_STATE)
            throw new InvalidOperationException("The promise is already resolved");
        return;
    }

    if (error is OperationCanceledException) {
        // The reason must be stored before the final state is set: once the state
        // is CANCELLED_STATE other threads may read m_error (see InvokeHandler).
        m_error = error.InnerException;
        CompleteTransit(CANCELLED_STATE);
        OnCancelled();
    } else {
        m_error = error is PromiseTransientException ? error.InnerException : error;
        CompleteTransit(REJECTED_STATE);
        OnError();
    }
}
/// <summary>
/// Cancels the operation if it is still possible.
/// </summary>
/// <remarks>Use the <see cref="IsCancelled"/> property to find out whether the operation has been cancelled.</remarks>
/// <param name="reason">The reason of the cancellation, it is passed to the cancellation handlers.</param>
protected void SetCancelled(Exception reason) {
    // nothing to do when the promise is already resolved or is being resolved
    if (!BeginTransit())
        return;

    m_error = reason;
    CompleteTransit(CANCELLED_STATE);
    OnCancelled();
}
/// <summary>
/// Notifies the specified handler that the promise has succeeded.
/// </summary>
/// <param name="handler">The handler to notify.</param>
protected abstract void SignalSuccess(THandler handler);

/// <summary>
/// Notifies the specified handler that the promise has been rejected.
/// </summary>
/// <param name="handler">The handler to notify.</param>
/// <param name="error">The error stored in the promise.</param>
protected abstract void SignalError(THandler handler, Exception error);

/// <summary>
/// Notifies the specified handler that the promise has been cancelled.
/// </summary>
/// <param name="handler">The handler to notify.</param>
/// <param name="reason">The cancellation reason stored in the promise, can be <c>null</c>.</param>
protected abstract void SignalCancelled(THandler handler, Exception reason);
|  | r119 | |||
/// <summary>
/// Signals success to the registered handlers: first to the handlers in the reserved
/// slots, then to the handlers in the extra queue, if it exists.
/// </summary>
void OnSuccess() {
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            break;

        // only the caller which advances the pointer invokes the handler in that slot
        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed)
            SignalSuccess(m_handlers[next]);
    }

    if (m_extraHandlers == null)
        return;

    THandler queued;
    while (m_extraHandlers.TryDequeue(out queued))
        SignalSuccess(queued);
}
/// <summary>
/// Signals the stored error to the registered handlers: first to the handlers in the
/// reserved slots, then to the handlers in the extra queue, if it exists.
/// </summary>
void OnError() {
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            break;

        // only the caller which advances the pointer invokes the handler in that slot
        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed)
            SignalError(m_handlers[next], m_error);
    }

    if (m_extraHandlers == null)
        return;

    THandler queued;
    while (m_extraHandlers.TryDequeue(out queued))
        SignalError(queued, m_error);
}
/// <summary>
/// Signals the cancellation and its reason to the registered handlers: first to the
/// handlers in the reserved slots, then to the handlers in the extra queue, if it exists.
/// </summary>
void OnCancelled() {
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            break;

        // only the caller which advances the pointer invokes the handler in that slot
        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed)
            SignalCancelled(m_handlers[next], m_error);
    }

    if (m_extraHandlers == null)
        return;

    THandler queued;
    while (m_extraHandlers.TryDequeue(out queued))
        SignalCancelled(queued, m_error);
}
| #endregion | ||||
/// <summary>
/// Registers a callback for the specified promise events.
/// </summary>
/// <remarks>
/// <see cref="WaitResult"/> registers a callback for <c>PromiseEventType.All</c> through this
/// method to wake up the waiting thread.
/// </remarks>
/// <param name="events">The events the callback is registered for.</param>
/// <param name="handler">The callback to invoke.</param>
protected abstract void Listen(PromiseEventType events, Action handler);
| #region synchronization traits | ||||
/// <summary>
/// Blocks the calling thread until the promise is resolved or the timeout expires.
/// </summary>
/// <param name="timeout">The timeout in milliseconds passed to <see cref="Monitor.Wait(object,int)"/>;
/// <see cref="Join()"/> passes -1.</param>
/// <exception cref="TimeoutException">The promise wasn't resolved within the specified timeout.</exception>
/// <exception cref="OperationCanceledException">The promise is cancelled; the cancellation reason,
/// if any, is available as the inner exception.</exception>
/// <exception cref="TargetInvocationException">The promise is rejected; the error is available
/// as the inner exception.</exception>
/// <exception cref="ApplicationException">The promise is in an unexpected state.</exception>
protected void WaitResult(int timeout) {
    if (!IsResolved) {
        var lk = new object();

        // the callback wakes up this thread when the promise is resolved
        Listen(PromiseEventType.All, () => {
            lock (lk) {
                Monitor.Pulse(lk);
            }
        });

        lock (lk) {
            // the state is rechecked under the lock, the callback may have been
            // invoked before this thread started waiting
            while (!IsResolved) {
                if (!Monitor.Wait(lk, timeout))
                    throw new TimeoutException();
            }
        }
    }

    // read the state once, so the reported value matches the selected branch
    var state = m_state;
    switch (state) {
        case SUCCEEDED_STATE:
            return;
        case CANCELLED_STATE:
            // SetError takes the cancellation reason from InnerException, so the
            // reason is handed back to the caller the same way
            throw new OperationCanceledException("The operation was canceled.", m_error);
        case REJECTED_STATE:
            throw new TargetInvocationException(m_error);
        default:
            throw new ApplicationException(String.Format("Invalid promise state {0}", state));
    }
}
| #endregion | ||||
| #region handlers managment | ||||
/// <summary>
/// Registers a handler. If the promise is already resolved the handler is invoked
/// immediately, otherwise it is stored in a reserved slot or in the extra queue.
/// </summary>
/// <param name="handler">The handler to register.</param>
protected void AddHandler(THandler handler) {
    if (m_state > 1) {
        // the promise is in the resolved state, just invoke the handler
        InvokeHandler(handler);
        return;
    }

    var index = Interlocked.Increment(ref m_handlersCount) - 1;

    if (index >= RESERVED_HANDLERS_COUNT) {
        // no reserved slots left, the handler goes to the extra queue
        if (index == RESERVED_HANDLERS_COUNT) {
            // the first handler beyond the reserved slots creates the queue
            m_extraHandlers = new MTQueue<THandler>();
        } else {
            // wait until the queue is created
            while (m_extraHandlers == null)
                Thread.MemoryBarrier();
        }

        m_extraHandlers.Enqueue(handler);

        // If the promise has been resolved while the handler was being added to the
        // queue, there is no guarantee that someone is still processing the queue,
        // therefore a handler is fetched from the queue and executed here.
        // Note that the fetched handler may be not the one which was added above,
        // it is also possible that no handler is fetched at all.
        THandler pending;
        if (m_state > 1 && m_extraHandlers.TryDequeue(out pending))
            InvokeHandler(pending);
        return;
    }

    m_handlers[index] = handler;

    // mark the slot as filled, slots are committed strictly in the order of their indexes
    while (Interlocked.CompareExchange(ref m_handlersCommited, index + 1, index) != index) {
    }

    if (m_state <= 1)
        return;

    // the promise has been resolved meanwhile, try to claim and invoke one pending handler
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            break;

        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed) {
            InvokeHandler(m_handlers[next]);
            break;
        }
    }
}
/// <summary>
/// Notifies the handler according to the current state of the promise.
/// </summary>
/// <param name="handler">The handler to notify.</param>
/// <exception cref="InvalidOperationException">The promise isn't resolved.</exception>
protected void InvokeHandler(THandler handler) {
    // read the state once, so the reported value matches the selected branch
    var state = m_state;
    switch (state) {
        case SUCCEEDED_STATE:
            SignalSuccess(handler);
            break;
        case CANCELLED_STATE:
            SignalCancelled(handler, m_error);
            break;
        case REJECTED_STATE:
            SignalError(handler, m_error);
            break;
        default:
            throw new InvalidOperationException(String.Format("Invalid promise state {0}", state));
    }
}
| #endregion | ||||
| #region IPromise implementation | ||||
/// <summary>
/// Waits until the promise is resolved or the timeout expires, see <see cref="WaitResult"/>
/// for the exceptions which can be thrown.
/// </summary>
/// <param name="timeout">The timeout passed to <see cref="WaitResult"/>.</param>
public void Join(int timeout) {
    WaitResult(timeout);
}

/// <summary>
/// Waits until the promise is resolved, passes -1 as the timeout to <see cref="WaitResult"/>.
/// </summary>
public void Join() {
    WaitResult(-1);
}
/// <summary>
/// Gets a value indicating whether the promise is in one of the final states:
/// succeeded, rejected or cancelled.
/// </summary>
public bool IsResolved {
    get {
        Thread.MemoryBarrier();
        var state = m_state;
        return state >= SUCCEEDED_STATE;
    }
}
/// <summary>
/// Gets a value indicating whether the promise is in the cancelled state.
/// </summary>
public bool IsCancelled {
    get {
        Thread.MemoryBarrier();
        var state = m_state;
        return state == CANCELLED_STATE;
    }
}
| #endregion | ||||
| #region ICancellable implementation | ||||
/// <summary>
/// Cancels the promise without a reason, has no effect if the promise
/// is already resolved or is being resolved.
/// </summary>
public void Cancel() {
    SetCancelled(null);
}

/// <summary>
/// Cancels the promise with the specified reason, has no effect if the promise
/// is already resolved or is being resolved.
/// </summary>
/// <param name="reason">The reason of the cancellation.</param>
public void Cancel(Exception reason) {
    SetCancelled(reason);
}
| #endregion | ||||
|  | r138 | |||
/// <summary>
/// Gets the exception stored in the promise: the error of a rejected promise
/// or the reason of a cancellation.
/// </summary>
public Exception Error {
    get {
        var error = m_error;
        return error;
    }
}
|  | r119 | } | ||
| } | ||||
