AbstractEvent.cs
304 lines
| 10.6 KiB
| text/x-csharp
|
CSharpLexer
/ Implab / AbstractEvent.cs
cin
|
r144 | using System; | ||
using Implab.Parallels; | ||||
using System.Threading; | ||||
using System.Reflection; | ||||
namespace Implab { | ||||
cin
|
r145 | public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable { | ||
cin
|
r144 | |||
const int UNRESOLVED_SATE = 0; | ||||
const int TRANSITIONAL_STATE = 1; | ||||
cin
|
r156 | protected const int SUCCEEDED_STATE = 2; | ||
protected const int REJECTED_STATE = 3; | ||||
protected const int CANCELLED_STATE = 4; | ||||
cin
|
r144 | |||
const int CANCEL_NOT_REQUESTED = 0; | ||||
const int CANCEL_REQUESTING = 1; | ||||
const int CANCEL_REQUESTED = 2; | ||||
const int RESERVED_HANDLERS_COUNT = 4; | ||||
int m_state; | ||||
Exception m_error; | ||||
int m_handlersCount; | ||||
cin
|
r156 | //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | ||
THandler[] m_handlers; | ||||
cin
|
r144 | MTQueue<THandler> m_extraHandlers; | ||
int m_handlerPointer = -1; | ||||
int m_handlersCommited; | ||||
int m_cancelRequest; | ||||
Exception m_cancelationReason; | ||||
MTQueue<Action<Exception>> m_cancelationHandlers; | ||||
#region state managment | ||||
bool BeginTransit() { | ||||
return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | ||||
} | ||||
void CompleteTransit(int state) { | ||||
if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | ||||
throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | ||||
} | ||||
void WaitTransition() { | ||||
while (m_state == TRANSITIONAL_STATE) { | ||||
Thread.MemoryBarrier(); | ||||
} | ||||
} | ||||
protected bool BeginSetResult() { | ||||
if (!BeginTransit()) { | ||||
WaitTransition(); | ||||
if (m_state != CANCELLED_STATE) | ||||
throw new InvalidOperationException("The promise is already resolved"); | ||||
return false; | ||||
} | ||||
return true; | ||||
} | ||||
protected void EndSetResult() { | ||||
CompleteTransit(SUCCEEDED_STATE); | ||||
cin
|
r156 | Signal(); | ||
cin
|
r144 | } | ||
/// <summary> | ||||
/// Выполняет обещание, сообщая об ошибке | ||||
/// </summary> | ||||
/// <remarks> | ||||
/// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | ||||
/// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | ||||
/// будут проигнорированы. | ||||
/// </remarks> | ||||
/// <param name="error">Исключение возникшее при выполнении операции</param> | ||||
/// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | ||||
protected void SetError(Exception error) { | ||||
if (BeginTransit()) { | ||||
if (error is OperationCanceledException) { | ||||
cin
|
r156 | m_error = error.InnerException; | ||
cin
|
r144 | CompleteTransit(CANCELLED_STATE); | ||
} else { | ||||
m_error = error is PromiseTransientException ? error.InnerException : error; | ||||
CompleteTransit(REJECTED_STATE); | ||||
} | ||||
cin
|
r156 | Signal(); | ||
cin
|
r144 | } else { | ||
WaitTransition(); | ||||
if (m_state == SUCCEEDED_STATE) | ||||
throw new InvalidOperationException("The promise is already resolved"); | ||||
} | ||||
} | ||||
/// <summary> | ||||
/// Отменяет операцию, если это возможно. | ||||
/// </summary> | ||||
/// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | ||||
protected void SetCancelled(Exception reason) { | ||||
if (BeginTransit()) { | ||||
m_error = reason; | ||||
CompleteTransit(CANCELLED_STATE); | ||||
cin
|
r156 | Signal(); | ||
cin
|
r144 | } | ||
} | ||||
cin
|
r156 | protected abstract void SignalHandler(THandler handler, int signal); | ||
cin
|
r144 | |||
cin
|
r156 | void Signal() { | ||
cin
|
r144 | var hp = m_handlerPointer; | ||
var slot = hp +1 ; | ||||
while (slot < m_handlersCommited) { | ||||
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | ||||
cin
|
r156 | SignalHandler(m_handlers[slot], m_state); | ||
cin
|
r144 | } | ||
hp = m_handlerPointer; | ||||
slot = hp +1 ; | ||||
} | ||||
if (m_extraHandlers != null) { | ||||
THandler handler; | ||||
while (m_extraHandlers.TryDequeue(out handler)) | ||||
cin
|
r156 | SignalHandler(handler, m_state); | ||
cin
|
r144 | } | ||
} | ||||
#endregion | ||||
protected abstract Signal GetResolveSignal(); | ||||
#region synchronization traits | ||||
protected void WaitResult(int timeout) { | ||||
cin
|
r148 | if (!(IsResolved || GetResolveSignal().Wait(timeout))) | ||
throw new TimeoutException(); | ||||
cin
|
r144 | |||
switch (m_state) { | ||||
case SUCCEEDED_STATE: | ||||
return; | ||||
case CANCELLED_STATE: | ||||
throw new OperationCanceledException(); | ||||
case REJECTED_STATE: | ||||
throw new TargetInvocationException(m_error); | ||||
default: | ||||
throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); | ||||
} | ||||
} | ||||
#endregion | ||||
#region handlers managment | ||||
protected void AddHandler(THandler handler) { | ||||
if (m_state > 1) { | ||||
// the promise is in the resolved state, just invoke the handler | ||||
cin
|
r156 | SignalHandler(handler, m_state); | ||
cin
|
r144 | } else { | ||
var slot = Interlocked.Increment(ref m_handlersCount) - 1; | ||||
if (slot < RESERVED_HANDLERS_COUNT) { | ||||
cin
|
r160 | if (slot == 0) { | ||
m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | ||||
} else { | ||||
while (m_handlers == null) | ||||
Thread.MemoryBarrier(); | ||||
} | ||||
cin
|
r156 | |||
cin
|
r144 | m_handlers[slot] = handler; | ||
while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | ||||
} | ||||
if (m_state > 1) { | ||||
do { | ||||
var hp = m_handlerPointer; | ||||
slot = hp + 1; | ||||
if (slot < m_handlersCommited) { | ||||
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | ||||
continue; | ||||
cin
|
r156 | SignalHandler(m_handlers[slot], m_state); | ||
cin
|
r144 | } | ||
break; | ||||
} while(true); | ||||
} | ||||
} else { | ||||
if (slot == RESERVED_HANDLERS_COUNT) { | ||||
m_extraHandlers = new MTQueue<THandler>(); | ||||
} else { | ||||
while (m_extraHandlers == null) | ||||
Thread.MemoryBarrier(); | ||||
} | ||||
m_extraHandlers.Enqueue(handler); | ||||
if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | ||||
// if the promise have been resolved while we was adding the handler to the queue | ||||
// we can't guarantee that someone is still processing it | ||||
// therefore we need to fetch a handler from the queue and execute it | ||||
// note that fetched handler may be not the one that we have added | ||||
// even we can fetch no handlers at all :) | ||||
cin
|
r156 | SignalHandler(handler, m_state); | ||
cin
|
r144 | } | ||
} | ||||
} | ||||
#endregion | ||||
#region IPromise implementation | ||||
public bool IsResolved { | ||||
get { | ||||
Thread.MemoryBarrier(); | ||||
return m_state > 1; | ||||
} | ||||
} | ||||
public bool IsCancelled { | ||||
get { | ||||
Thread.MemoryBarrier(); | ||||
return m_state == CANCELLED_STATE; | ||||
} | ||||
} | ||||
#endregion | ||||
public Exception Error { | ||||
get { | ||||
return m_error; | ||||
} | ||||
} | ||||
cin
|
r145 | public bool CancelOperationIfRequested() { | ||
if (IsCancellationRequested) { | ||||
CancelOperation(CancellationReason); | ||||
return true; | ||||
} | ||||
return false; | ||||
cin
|
r144 | } | ||
public virtual void CancelOperation(Exception reason) { | ||||
SetCancelled(reason); | ||||
} | ||||
cin
|
r145 | public void CancellationRequested(Action<Exception> handler) { | ||
cin
|
r144 | Safe.ArgumentNotNull(handler, "handler"); | ||
cin
|
r145 | if (IsCancellationRequested) | ||
handler(CancellationReason); | ||||
cin
|
r144 | |||
if (m_cancelationHandlers == null) | ||||
Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null); | ||||
m_cancelationHandlers.Enqueue(handler); | ||||
cin
|
r145 | if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler)) | ||
cin
|
r144 | // TryDeque implies MemoryBarrier() | ||
handler(m_cancelationReason); | ||||
} | ||||
cin
|
r145 | public bool IsCancellationRequested { | ||
cin
|
r144 | get { | ||
do { | ||||
if (m_cancelRequest == CANCEL_NOT_REQUESTED) | ||||
return false; | ||||
if (m_cancelRequest == CANCEL_REQUESTED) | ||||
return true; | ||||
Thread.MemoryBarrier(); | ||||
} while(true); | ||||
} | ||||
} | ||||
cin
|
r145 | public Exception CancellationReason { | ||
cin
|
r144 | get { | ||
do { | ||||
Thread.MemoryBarrier(); | ||||
} while(m_cancelRequest == CANCEL_REQUESTING); | ||||
return m_cancelationReason; | ||||
} | ||||
} | ||||
#region ICancellable implementation | ||||
public void Cancel() { | ||||
Cancel(null); | ||||
} | ||||
public void Cancel(Exception reason) { | ||||
cin
|
r145 | if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { | ||
cin
|
r144 | m_cancelationReason = reason; | ||
m_cancelRequest = CANCEL_REQUESTED; | ||||
if (m_cancelationHandlers != null) { | ||||
Action<Exception> handler; | ||||
while (m_cancelationHandlers.TryDequeue(out handler)) | ||||
handler(m_cancelationReason); | ||||
} | ||||
} | ||||
} | ||||
#endregion | ||||
} | ||||
} | ||||