AbstractPromise.cs
219 lines
| 7.2 KiB
| text/x-csharp
|
CSharpLexer
/ Implab / AbstractPromise.cs
cin
|
r119 | using System; | ||
using Implab.Parallels; | ||||
using System.Threading; | ||||
using System.Reflection; | ||||
namespace Implab { | ||||
public abstract class AbstractPromise<THandler> { | ||||
const int UNRESOLVED_SATE = 0; | ||||
const int TRANSITIONAL_STATE = 1; | ||||
const int SUCCEEDED_STATE = 2; | ||||
const int REJECTED_STATE = 3; | ||||
const int CANCELLED_STATE = 4; | ||||
int m_state; | ||||
Exception m_error; | ||||
readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>(); | ||||
#region state managment | ||||
bool BeginTransit() { | ||||
return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | ||||
} | ||||
void CompleteTransit(int state) { | ||||
if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | ||||
throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | ||||
} | ||||
void WaitTransition() { | ||||
while (m_state == TRANSITIONAL_STATE) { | ||||
Thread.MemoryBarrier(); | ||||
} | ||||
} | ||||
protected void BeginSetResult() { | ||||
if (!BeginTransit()) { | ||||
WaitTransition(); | ||||
if (m_state != CANCELLED_STATE) | ||||
throw new InvalidOperationException("The promise is already resolved"); | ||||
} | ||||
} | ||||
protected void EndSetResult() { | ||||
CompleteTransit(SUCCEEDED_STATE); | ||||
OnSuccess(); | ||||
} | ||||
/// <summary> | ||||
/// Выполняет обещание, сообщая об ошибке | ||||
/// </summary> | ||||
/// <remarks> | ||||
/// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | ||||
/// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | ||||
/// будут проигнорированы. | ||||
/// </remarks> | ||||
/// <param name="error">Исключение возникшее при выполнении операции</param> | ||||
/// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | ||||
protected void SetError(Exception error) { | ||||
if (BeginTransit()) { | ||||
m_error = error is PromiseTransientException ? error.InnerException : error; | ||||
CompleteTransit(REJECTED_STATE); | ||||
OnError(); | ||||
} else { | ||||
WaitTransition(); | ||||
if (m_state == SUCCEEDED_STATE) | ||||
throw new InvalidOperationException("The promise is already resolved"); | ||||
} | ||||
} | ||||
/// <summary> | ||||
/// Отменяет операцию, если это возможно. | ||||
/// </summary> | ||||
/// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | ||||
protected void SetCancelled() { | ||||
if (BeginTransit()) { | ||||
CompleteTransit(CANCELLED_STATE); | ||||
OnCancelled(); | ||||
} | ||||
} | ||||
protected abstract void SignalSuccess(THandler handler); | ||||
protected abstract void SignalError(THandler handler, Exception error); | ||||
protected abstract void SignalCancelled(THandler handler); | ||||
void OnSuccess() { | ||||
THandler handler; | ||||
while (m_handlers.TryDequeue(out handler)) | ||||
SignalSuccess(handler); | ||||
} | ||||
void OnError() { | ||||
THandler handler; | ||||
while (m_handlers.TryDequeue(out handler)) | ||||
SignalError(handler,m_error); | ||||
} | ||||
void OnCancelled() { | ||||
THandler handler; | ||||
while (m_handlers.TryDequeue(out handler)) | ||||
SignalCancelled(handler); | ||||
} | ||||
#endregion | ||||
protected abstract void Listen(PromiseEventType events, Action handler); | ||||
#region synchronization traits | ||||
protected void WaitResult(int timeout) { | ||||
if (!IsResolved) { | ||||
var lk = new object(); | ||||
Listen(PromiseEventType.All, () => { | ||||
lock(lk) { | ||||
Monitor.Pulse(lk); | ||||
} | ||||
}); | ||||
lock (lk) { | ||||
while(!IsResolved) { | ||||
if(!Monitor.Wait(lk,timeout)) | ||||
throw new TimeoutException(); | ||||
} | ||||
} | ||||
} | ||||
switch (m_state) { | ||||
case SUCCEEDED_STATE: | ||||
return; | ||||
case CANCELLED_STATE: | ||||
throw new OperationCanceledException(); | ||||
case REJECTED_STATE: | ||||
throw new TargetInvocationException(m_error); | ||||
default: | ||||
throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); | ||||
} | ||||
} | ||||
#endregion | ||||
#region handlers managment | ||||
protected void AddHandler(THandler handler) { | ||||
if (IsResolved) { | ||||
InvokeHandler(handler); | ||||
} else { | ||||
// the promise is in the resolved state, just invoke the handler | ||||
m_handlers.Enqueue(handler); | ||||
if (IsResolved && m_handlers.TryDequeue(out handler)) | ||||
// if the promise have been resolved while we was adding the handler to the queue | ||||
// we can't guarantee that someone is still processing it | ||||
// therefore we need to fetch a handler from the queue and execute it | ||||
// note that fetched handler may be not the one that we have added | ||||
// even we can fetch no handlers at all :) | ||||
InvokeHandler(handler); | ||||
} | ||||
} | ||||
protected void InvokeHandler(THandler handler) { | ||||
switch (m_state) { | ||||
case SUCCEEDED_STATE: | ||||
SignalSuccess(handler); | ||||
break; | ||||
case CANCELLED_STATE: | ||||
SignalCancelled(handler); | ||||
break; | ||||
case REJECTED_STATE: | ||||
SignalError(handler, m_error); | ||||
break; | ||||
default: | ||||
throw new Exception(String.Format("Invalid promise state {0}", m_state)); | ||||
} | ||||
} | ||||
#endregion | ||||
#region IPromise implementation | ||||
public void Join(int timeout) { | ||||
WaitResult(timeout); | ||||
} | ||||
public void Join() { | ||||
WaitResult(-1); | ||||
} | ||||
public bool IsResolved { | ||||
get { | ||||
Thread.MemoryBarrier(); | ||||
return m_state > 1; | ||||
} | ||||
} | ||||
public bool IsCancelled { | ||||
get { | ||||
Thread.MemoryBarrier(); | ||||
return m_state == CANCELLED_STATE; | ||||
} | ||||
} | ||||
#endregion | ||||
#region ICancellable implementation | ||||
public void Cancel() { | ||||
SetCancelled(); | ||||
} | ||||
#endregion | ||||
} | ||||
} | ||||