|
|
using System;
|
|
|
using Implab.Parallels;
|
|
|
using System.Threading;
|
|
|
using System.Reflection;
|
|
|
|
|
|
namespace Implab {
|
|
|
public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
|
|
|
|
|
|
const int UNRESOLVED_SATE = 0;
|
|
|
const int TRANSITIONAL_STATE = 1;
|
|
|
protected const int SUCCEEDED_STATE = 2;
|
|
|
protected const int REJECTED_STATE = 3;
|
|
|
protected const int CANCELLED_STATE = 4;
|
|
|
|
|
|
const int CANCEL_NOT_REQUESTED = 0;
|
|
|
const int CANCEL_REQUESTING = 1;
|
|
|
const int CANCEL_REQUESTED = 2;
|
|
|
|
|
|
const int RESERVED_HANDLERS_COUNT = 4;
|
|
|
|
|
|
int m_state;
|
|
|
Exception m_error;
|
|
|
int m_handlersCount;
|
|
|
|
|
|
//readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
|
|
|
THandler[] m_handlers;
|
|
|
MTQueue<THandler> m_extraHandlers;
|
|
|
int m_handlerPointer = -1;
|
|
|
int m_handlersCommited;
|
|
|
|
|
|
int m_cancelRequest;
|
|
|
Exception m_cancelationReason;
|
|
|
MTQueue<Action<Exception>> m_cancelationHandlers;
|
|
|
|
|
|
|
|
|
#region state managment
|
|
|
bool BeginTransit() {
|
|
|
return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
|
|
|
}
|
|
|
|
|
|
void CompleteTransit(int state) {
|
|
|
if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
|
|
|
throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
|
|
|
}
|
|
|
|
|
|
void WaitTransition() {
|
|
|
while (m_state == TRANSITIONAL_STATE) {
|
|
|
Thread.MemoryBarrier();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
protected bool BeginSetResult() {
|
|
|
if (!BeginTransit()) {
|
|
|
WaitTransition();
|
|
|
if (m_state != CANCELLED_STATE)
|
|
|
throw new InvalidOperationException("The promise is already resolved");
|
|
|
return false;
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
protected void EndSetResult() {
|
|
|
CompleteTransit(SUCCEEDED_STATE);
|
|
|
Signal();
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
/// Выполняет обещание, сообщая об ошибке
|
|
|
/// </summary>
|
|
|
/// <remarks>
|
|
|
/// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
|
|
|
/// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
|
|
|
/// будут проигнорированы.
|
|
|
/// </remarks>
|
|
|
/// <param name="error">Исключение возникшее при выполнении операции</param>
|
|
|
/// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
|
|
|
protected void SetError(Exception error) {
|
|
|
while (error is PromiseTransientException)
|
|
|
error = error.InnerException;
|
|
|
|
|
|
var isCancel = error is OperationCanceledException;
|
|
|
|
|
|
if (BeginTransit()) {
|
|
|
m_error = isCancel ? error.InnerException : error;
|
|
|
CompleteTransit(isCancel ? CANCELLED_STATE : REJECTED_STATE);
|
|
|
|
|
|
Signal();
|
|
|
} else {
|
|
|
WaitTransition();
|
|
|
if (!isCancel || m_state == SUCCEEDED_STATE)
|
|
|
throw new InvalidOperationException("The promise is already resolved");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Отменяет операцию, если это возможно.
|
|
|
/// </summary>
|
|
|
/// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
|
|
|
protected void SetCancelled(Exception reason) {
|
|
|
if (BeginTransit()) {
|
|
|
m_error = reason;
|
|
|
CompleteTransit(CANCELLED_STATE);
|
|
|
Signal();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
protected abstract void SignalHandler(THandler handler, int signal);
|
|
|
|
|
|
void Signal() {
|
|
|
var hp = m_handlerPointer;
|
|
|
var slot = hp +1 ;
|
|
|
while (slot < m_handlersCommited) {
|
|
|
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
|
|
|
SignalHandler(m_handlers[slot], m_state);
|
|
|
}
|
|
|
hp = m_handlerPointer;
|
|
|
slot = hp +1 ;
|
|
|
}
|
|
|
|
|
|
|
|
|
if (m_extraHandlers != null) {
|
|
|
THandler handler;
|
|
|
while (m_extraHandlers.TryDequeue(out handler))
|
|
|
SignalHandler(handler, m_state);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
|
|
|
protected abstract Signal GetResolveSignal();
|
|
|
|
|
|
#region synchronization traits
|
|
|
protected void WaitResult(int timeout) {
|
|
|
if (!(IsResolved || GetResolveSignal().Wait(timeout)))
|
|
|
throw new TimeoutException();
|
|
|
|
|
|
switch (m_state) {
|
|
|
case SUCCEEDED_STATE:
|
|
|
return;
|
|
|
case CANCELLED_STATE:
|
|
|
throw new OperationCanceledException();
|
|
|
case REJECTED_STATE:
|
|
|
throw new TargetInvocationException(m_error);
|
|
|
default:
|
|
|
throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
|
|
|
}
|
|
|
}
|
|
|
#endregion
|
|
|
|
|
|
#region handlers managment
|
|
|
|
|
|
protected void AddHandler(THandler handler) {
|
|
|
|
|
|
if (m_state > 1) {
|
|
|
// the promise is in the resolved state, just invoke the handler
|
|
|
SignalHandler(handler, m_state);
|
|
|
} else {
|
|
|
var slot = Interlocked.Increment(ref m_handlersCount) - 1;
|
|
|
|
|
|
if (slot < RESERVED_HANDLERS_COUNT) {
|
|
|
|
|
|
if (slot == 0) {
|
|
|
m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
|
|
|
} else {
|
|
|
while (m_handlers == null)
|
|
|
Thread.MemoryBarrier();
|
|
|
}
|
|
|
|
|
|
m_handlers[slot] = handler;
|
|
|
|
|
|
while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
|
|
|
}
|
|
|
|
|
|
if (m_state > 1) {
|
|
|
do {
|
|
|
var hp = m_handlerPointer;
|
|
|
slot = hp + 1;
|
|
|
if (slot < m_handlersCommited) {
|
|
|
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
|
|
|
continue;
|
|
|
SignalHandler(m_handlers[slot], m_state);
|
|
|
}
|
|
|
break;
|
|
|
} while(true);
|
|
|
}
|
|
|
} else {
|
|
|
if (slot == RESERVED_HANDLERS_COUNT) {
|
|
|
m_extraHandlers = new MTQueue<THandler>();
|
|
|
} else {
|
|
|
while (m_extraHandlers == null)
|
|
|
Thread.MemoryBarrier();
|
|
|
}
|
|
|
|
|
|
m_extraHandlers.Enqueue(handler);
|
|
|
|
|
|
if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
|
|
|
// if the promise have been resolved while we was adding the handler to the queue
|
|
|
// we can't guarantee that someone is still processing it
|
|
|
// therefore we need to fetch a handler from the queue and execute it
|
|
|
// note that fetched handler may be not the one that we have added
|
|
|
// even we can fetch no handlers at all :)
|
|
|
SignalHandler(handler, m_state);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
|
|
|
#region IPromise implementation
|
|
|
|
|
|
public bool IsResolved {
|
|
|
get {
|
|
|
Thread.MemoryBarrier();
|
|
|
return m_state > 1;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public bool IsCancelled {
|
|
|
get {
|
|
|
Thread.MemoryBarrier();
|
|
|
return m_state == CANCELLED_STATE;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
|
|
|
public Exception Error {
|
|
|
get {
|
|
|
return m_error;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public bool CancelOperationIfRequested() {
|
|
|
if (IsCancellationRequested) {
|
|
|
CancelOperation(CancellationReason);
|
|
|
return true;
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
public virtual void CancelOperation(Exception reason) {
|
|
|
SetCancelled(reason);
|
|
|
}
|
|
|
|
|
|
public void CancellationRequested(Action<Exception> handler) {
|
|
|
Safe.ArgumentNotNull(handler, "handler");
|
|
|
if (IsCancellationRequested)
|
|
|
handler(CancellationReason);
|
|
|
|
|
|
if (m_cancelationHandlers == null)
|
|
|
Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
|
|
|
|
|
|
m_cancelationHandlers.Enqueue(handler);
|
|
|
|
|
|
if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
|
|
|
// TryDeque implies MemoryBarrier()
|
|
|
handler(m_cancelationReason);
|
|
|
}
|
|
|
|
|
|
public bool IsCancellationRequested {
|
|
|
get {
|
|
|
do {
|
|
|
if (m_cancelRequest == CANCEL_NOT_REQUESTED)
|
|
|
return false;
|
|
|
if (m_cancelRequest == CANCEL_REQUESTED)
|
|
|
return true;
|
|
|
Thread.MemoryBarrier();
|
|
|
} while(true);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public Exception CancellationReason {
|
|
|
get {
|
|
|
do {
|
|
|
Thread.MemoryBarrier();
|
|
|
} while(m_cancelRequest == CANCEL_REQUESTING);
|
|
|
|
|
|
return m_cancelationReason;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#region ICancellable implementation
|
|
|
|
|
|
public void Cancel() {
|
|
|
Cancel(null);
|
|
|
}
|
|
|
|
|
|
public void Cancel(Exception reason) {
|
|
|
if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
|
|
|
m_cancelationReason = reason;
|
|
|
m_cancelRequest = CANCEL_REQUESTED;
|
|
|
if (m_cancelationHandlers != null) {
|
|
|
Action<Exception> handler;
|
|
|
while (m_cancelationHandlers.TryDequeue(out handler))
|
|
|
handler(m_cancelationReason);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|