AbstractEvent.cs
304 lines
| 10.6 KiB
| text/x-csharp
|
CSharpLexer
/ Implab / AbstractEvent.cs
|
|
r144 | using System; | ||
| using Implab.Parallels; | ||||
| using System.Threading; | ||||
| using System.Reflection; | ||||
| namespace Implab { | ||||
|
|
r145 | public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable { | ||
|
|
r144 | |||
| const int UNRESOLVED_SATE = 0; | ||||
| const int TRANSITIONAL_STATE = 1; | ||||
|
|
r156 | protected const int SUCCEEDED_STATE = 2; | ||
| protected const int REJECTED_STATE = 3; | ||||
| protected const int CANCELLED_STATE = 4; | ||||
|
|
r144 | |||
| const int CANCEL_NOT_REQUESTED = 0; | ||||
| const int CANCEL_REQUESTING = 1; | ||||
| const int CANCEL_REQUESTED = 2; | ||||
| const int RESERVED_HANDLERS_COUNT = 4; | ||||
| int m_state; | ||||
| Exception m_error; | ||||
| int m_handlersCount; | ||||
|
|
r156 | //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | ||
| THandler[] m_handlers; | ||||
|
|
r144 | MTQueue<THandler> m_extraHandlers; | ||
| int m_handlerPointer = -1; | ||||
| int m_handlersCommited; | ||||
| int m_cancelRequest; | ||||
| Exception m_cancelationReason; | ||||
| MTQueue<Action<Exception>> m_cancelationHandlers; | ||||
| #region state managment | ||||
| bool BeginTransit() { | ||||
| return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | ||||
| } | ||||
| void CompleteTransit(int state) { | ||||
| if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | ||||
| throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | ||||
| } | ||||
| void WaitTransition() { | ||||
| while (m_state == TRANSITIONAL_STATE) { | ||||
| Thread.MemoryBarrier(); | ||||
| } | ||||
| } | ||||
| protected bool BeginSetResult() { | ||||
| if (!BeginTransit()) { | ||||
| WaitTransition(); | ||||
| if (m_state != CANCELLED_STATE) | ||||
| throw new InvalidOperationException("The promise is already resolved"); | ||||
| return false; | ||||
| } | ||||
| return true; | ||||
| } | ||||
| protected void EndSetResult() { | ||||
| CompleteTransit(SUCCEEDED_STATE); | ||||
|
|
r156 | Signal(); | ||
|
|
r144 | } | ||
| /// <summary> | ||||
| /// Выполняет обещание, сообщая об ошибке | ||||
| /// </summary> | ||||
| /// <remarks> | ||||
| /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | ||||
| /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | ||||
| /// будут проигнорированы. | ||||
| /// </remarks> | ||||
| /// <param name="error">Исключение возникшее при выполнении операции</param> | ||||
| /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | ||||
| protected void SetError(Exception error) { | ||||
| if (BeginTransit()) { | ||||
| if (error is OperationCanceledException) { | ||||
|
|
r156 | m_error = error.InnerException; | ||
|
|
r144 | CompleteTransit(CANCELLED_STATE); | ||
| } else { | ||||
| m_error = error is PromiseTransientException ? error.InnerException : error; | ||||
| CompleteTransit(REJECTED_STATE); | ||||
| } | ||||
|
|
r156 | Signal(); | ||
|
|
r144 | } else { | ||
| WaitTransition(); | ||||
| if (m_state == SUCCEEDED_STATE) | ||||
| throw new InvalidOperationException("The promise is already resolved"); | ||||
| } | ||||
| } | ||||
| /// <summary> | ||||
| /// Отменяет операцию, если это возможно. | ||||
| /// </summary> | ||||
| /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | ||||
| protected void SetCancelled(Exception reason) { | ||||
| if (BeginTransit()) { | ||||
| m_error = reason; | ||||
| CompleteTransit(CANCELLED_STATE); | ||||
|
|
r156 | Signal(); | ||
|
|
r144 | } | ||
| } | ||||
|
|
r156 | protected abstract void SignalHandler(THandler handler, int signal); | ||
|
|
r144 | |||
|
|
r156 | void Signal() { | ||
|
|
r144 | var hp = m_handlerPointer; | ||
| var slot = hp +1 ; | ||||
| while (slot < m_handlersCommited) { | ||||
| if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | ||||
|
|
r156 | SignalHandler(m_handlers[slot], m_state); | ||
|
|
r144 | } | ||
| hp = m_handlerPointer; | ||||
| slot = hp +1 ; | ||||
| } | ||||
| if (m_extraHandlers != null) { | ||||
| THandler handler; | ||||
| while (m_extraHandlers.TryDequeue(out handler)) | ||||
|
|
r156 | SignalHandler(handler, m_state); | ||
|
|
r144 | } | ||
| } | ||||
| #endregion | ||||
| protected abstract Signal GetResolveSignal(); | ||||
| #region synchronization traits | ||||
| protected void WaitResult(int timeout) { | ||||
|
|
r148 | if (!(IsResolved || GetResolveSignal().Wait(timeout))) | ||
| throw new TimeoutException(); | ||||
|
|
r144 | |||
| switch (m_state) { | ||||
| case SUCCEEDED_STATE: | ||||
| return; | ||||
| case CANCELLED_STATE: | ||||
| throw new OperationCanceledException(); | ||||
| case REJECTED_STATE: | ||||
| throw new TargetInvocationException(m_error); | ||||
| default: | ||||
| throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); | ||||
| } | ||||
| } | ||||
| #endregion | ||||
| #region handlers managment | ||||
| protected void AddHandler(THandler handler) { | ||||
| if (m_state > 1) { | ||||
| // the promise is in the resolved state, just invoke the handler | ||||
|
|
r156 | SignalHandler(handler, m_state); | ||
|
|
r144 | } else { | ||
| var slot = Interlocked.Increment(ref m_handlersCount) - 1; | ||||
| if (slot < RESERVED_HANDLERS_COUNT) { | ||||
|
|
r160 | if (slot == 0) { | ||
| m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | ||||
| } else { | ||||
| while (m_handlers == null) | ||||
| Thread.MemoryBarrier(); | ||||
| } | ||||
|
|
r156 | |||
|
|
r144 | m_handlers[slot] = handler; | ||
| while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | ||||
| } | ||||
| if (m_state > 1) { | ||||
| do { | ||||
| var hp = m_handlerPointer; | ||||
| slot = hp + 1; | ||||
| if (slot < m_handlersCommited) { | ||||
| if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | ||||
| continue; | ||||
|
|
r156 | SignalHandler(m_handlers[slot], m_state); | ||
|
|
r144 | } | ||
| break; | ||||
| } while(true); | ||||
| } | ||||
| } else { | ||||
| if (slot == RESERVED_HANDLERS_COUNT) { | ||||
| m_extraHandlers = new MTQueue<THandler>(); | ||||
| } else { | ||||
| while (m_extraHandlers == null) | ||||
| Thread.MemoryBarrier(); | ||||
| } | ||||
| m_extraHandlers.Enqueue(handler); | ||||
| if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | ||||
| // if the promise have been resolved while we was adding the handler to the queue | ||||
| // we can't guarantee that someone is still processing it | ||||
| // therefore we need to fetch a handler from the queue and execute it | ||||
| // note that fetched handler may be not the one that we have added | ||||
| // even we can fetch no handlers at all :) | ||||
|
|
r156 | SignalHandler(handler, m_state); | ||
|
|
r144 | } | ||
| } | ||||
| } | ||||
| #endregion | ||||
| #region IPromise implementation | ||||
| public bool IsResolved { | ||||
| get { | ||||
| Thread.MemoryBarrier(); | ||||
| return m_state > 1; | ||||
| } | ||||
| } | ||||
| public bool IsCancelled { | ||||
| get { | ||||
| Thread.MemoryBarrier(); | ||||
| return m_state == CANCELLED_STATE; | ||||
| } | ||||
| } | ||||
| #endregion | ||||
| public Exception Error { | ||||
| get { | ||||
| return m_error; | ||||
| } | ||||
| } | ||||
|
|
r145 | public bool CancelOperationIfRequested() { | ||
| if (IsCancellationRequested) { | ||||
| CancelOperation(CancellationReason); | ||||
| return true; | ||||
| } | ||||
| return false; | ||||
|
|
r144 | } | ||
| public virtual void CancelOperation(Exception reason) { | ||||
| SetCancelled(reason); | ||||
| } | ||||
|
|
r145 | public void CancellationRequested(Action<Exception> handler) { | ||
|
|
r144 | Safe.ArgumentNotNull(handler, "handler"); | ||
|
|
r145 | if (IsCancellationRequested) | ||
| handler(CancellationReason); | ||||
|
|
r144 | |||
| if (m_cancelationHandlers == null) | ||||
| Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null); | ||||
| m_cancelationHandlers.Enqueue(handler); | ||||
|
|
r145 | if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler)) | ||
|
|
r144 | // TryDeque implies MemoryBarrier() | ||
| handler(m_cancelationReason); | ||||
| } | ||||
|
|
r145 | public bool IsCancellationRequested { | ||
|
|
r144 | get { | ||
| do { | ||||
| if (m_cancelRequest == CANCEL_NOT_REQUESTED) | ||||
| return false; | ||||
| if (m_cancelRequest == CANCEL_REQUESTED) | ||||
| return true; | ||||
| Thread.MemoryBarrier(); | ||||
| } while(true); | ||||
| } | ||||
| } | ||||
|
|
r145 | public Exception CancellationReason { | ||
|
|
r144 | get { | ||
| do { | ||||
| Thread.MemoryBarrier(); | ||||
| } while(m_cancelRequest == CANCEL_REQUESTING); | ||||
| return m_cancelationReason; | ||||
| } | ||||
| } | ||||
| #region ICancellable implementation | ||||
| public void Cancel() { | ||||
| Cancel(null); | ||||
| } | ||||
| public void Cancel(Exception reason) { | ||||
|
|
r145 | if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { | ||
|
|
r144 | m_cancelationReason = reason; | ||
| m_cancelRequest = CANCEL_REQUESTED; | ||||
| if (m_cancelationHandlers != null) { | ||||
| Action<Exception> handler; | ||||
| while (m_cancelationHandlers.TryDequeue(out handler)) | ||||
| handler(m_cancelationReason); | ||||
| } | ||||
| } | ||||
| } | ||||
| #endregion | ||||
| } | ||||
| } | ||||
