AbstractPromise.cs
289 lines
| 9.8 KiB
| text/x-csharp
|
CSharpLexer
/ Implab / AbstractPromise.cs
cin
|
r119 | using System; | ||
using Implab.Parallels; | ||||
using System.Threading; | ||||
using System.Reflection; | ||||
namespace Implab { | ||||
public abstract class AbstractPromise<THandler> { | ||||
const int UNRESOLVED_SATE = 0; | ||||
const int TRANSITIONAL_STATE = 1; | ||||
const int SUCCEEDED_STATE = 2; | ||||
const int REJECTED_STATE = 3; | ||||
const int CANCELLED_STATE = 4; | ||||
cin
|
r125 | const int RESERVED_HANDLERS_COUNT = 4; | ||
cin
|
r119 | int m_state; | ||
Exception m_error; | ||||
cin
|
r125 | int m_handlersCount; | ||
cin
|
r119 | |||
cin
|
r125 | readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | ||
MTQueue<THandler> m_extraHandlers; | ||||
int m_handlerPointer = -1; | ||||
int m_handlersCommited; | ||||
cin
|
r119 | |||
#region state managment | ||||
bool BeginTransit() { | ||||
return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | ||||
} | ||||
void CompleteTransit(int state) { | ||||
if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | ||||
throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | ||||
} | ||||
void WaitTransition() { | ||||
while (m_state == TRANSITIONAL_STATE) { | ||||
Thread.MemoryBarrier(); | ||||
} | ||||
} | ||||
protected void BeginSetResult() { | ||||
if (!BeginTransit()) { | ||||
WaitTransition(); | ||||
if (m_state != CANCELLED_STATE) | ||||
throw new InvalidOperationException("The promise is already resolved"); | ||||
} | ||||
} | ||||
protected void EndSetResult() { | ||||
CompleteTransit(SUCCEEDED_STATE); | ||||
OnSuccess(); | ||||
} | ||||
/// <summary> | ||||
/// Выполняет обещание, сообщая об ошибке | ||||
/// </summary> | ||||
/// <remarks> | ||||
/// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | ||||
/// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | ||||
/// будут проигнорированы. | ||||
/// </remarks> | ||||
/// <param name="error">Исключение возникшее при выполнении операции</param> | ||||
/// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | ||||
protected void SetError(Exception error) { | ||||
if (BeginTransit()) { | ||||
m_error = error is PromiseTransientException ? error.InnerException : error; | ||||
CompleteTransit(REJECTED_STATE); | ||||
OnError(); | ||||
} else { | ||||
WaitTransition(); | ||||
if (m_state == SUCCEEDED_STATE) | ||||
throw new InvalidOperationException("The promise is already resolved"); | ||||
} | ||||
} | ||||
/// <summary> | ||||
/// Отменяет операцию, если это возможно. | ||||
/// </summary> | ||||
/// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | ||||
protected void SetCancelled() { | ||||
if (BeginTransit()) { | ||||
CompleteTransit(CANCELLED_STATE); | ||||
OnCancelled(); | ||||
} | ||||
} | ||||
protected abstract void SignalSuccess(THandler handler); | ||||
protected abstract void SignalError(THandler handler, Exception error); | ||||
protected abstract void SignalCancelled(THandler handler); | ||||
void OnSuccess() { | ||||
cin
|
r125 | var hp = m_handlerPointer; | ||
var slot = hp +1 ; | ||||
while (slot < m_handlersCommited) { | ||||
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | ||||
SignalSuccess(m_handlers[slot]); | ||||
} | ||||
hp = m_handlerPointer; | ||||
slot = hp +1 ; | ||||
} | ||||
if (m_extraHandlers != null) { | ||||
THandler handler; | ||||
while (m_extraHandlers.TryDequeue(out handler)) | ||||
SignalSuccess(handler); | ||||
} | ||||
cin
|
r119 | } | ||
void OnError() { | ||||
cin
|
r125 | var hp = m_handlerPointer; | ||
var slot = hp +1 ; | ||||
while (slot < m_handlersCommited) { | ||||
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | ||||
SignalError(m_handlers[slot],m_error); | ||||
} | ||||
hp = m_handlerPointer; | ||||
slot = hp +1 ; | ||||
} | ||||
if (m_extraHandlers != null) { | ||||
THandler handler; | ||||
while (m_extraHandlers.TryDequeue(out handler)) | ||||
SignalError(handler, m_error); | ||||
} | ||||
cin
|
r119 | } | ||
void OnCancelled() { | ||||
cin
|
r125 | var hp = m_handlerPointer; | ||
var slot = hp +1 ; | ||||
while (slot < m_handlersCommited) { | ||||
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | ||||
SignalCancelled(m_handlers[slot]); | ||||
} | ||||
hp = m_handlerPointer; | ||||
slot = hp +1 ; | ||||
} | ||||
if (m_extraHandlers != null) { | ||||
THandler handler; | ||||
while (m_extraHandlers.TryDequeue(out handler)) | ||||
SignalCancelled(handler); | ||||
} | ||||
cin
|
r119 | } | ||
#endregion | ||||
protected abstract void Listen(PromiseEventType events, Action handler); | ||||
#region synchronization traits | ||||
protected void WaitResult(int timeout) { | ||||
if (!IsResolved) { | ||||
var lk = new object(); | ||||
Listen(PromiseEventType.All, () => { | ||||
lock(lk) { | ||||
Monitor.Pulse(lk); | ||||
} | ||||
}); | ||||
lock (lk) { | ||||
while(!IsResolved) { | ||||
if(!Monitor.Wait(lk,timeout)) | ||||
throw new TimeoutException(); | ||||
} | ||||
} | ||||
} | ||||
switch (m_state) { | ||||
case SUCCEEDED_STATE: | ||||
return; | ||||
case CANCELLED_STATE: | ||||
throw new OperationCanceledException(); | ||||
case REJECTED_STATE: | ||||
throw new TargetInvocationException(m_error); | ||||
default: | ||||
throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); | ||||
} | ||||
} | ||||
#endregion | ||||
#region handlers managment | ||||
protected void AddHandler(THandler handler) { | ||||
cin
|
r125 | if (m_state > 1) { | ||
cin
|
r119 | // the promise is in the resolved state, just invoke the handler | ||
cin
|
r125 | InvokeHandler(handler); | ||
} else { | ||||
var slot = Interlocked.Increment(ref m_handlersCount) - 1; | ||||
if (slot < RESERVED_HANDLERS_COUNT) { | ||||
m_handlers[slot] = handler; | ||||
while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | ||||
} | ||||
cin
|
r119 | |||
cin
|
r125 | if (m_state > 1) { | ||
do { | ||||
var hp = m_handlerPointer; | ||||
slot = hp + 1; | ||||
if (slot < m_handlersCommited) { | ||||
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | ||||
continue; | ||||
InvokeHandler(m_handlers[slot]); | ||||
} | ||||
break; | ||||
} while(true); | ||||
} | ||||
} else { | ||||
if (slot == RESERVED_HANDLERS_COUNT) { | ||||
m_extraHandlers = new MTQueue<THandler>(); | ||||
} else { | ||||
while (m_extraHandlers == null) | ||||
Thread.MemoryBarrier(); | ||||
} | ||||
cin
|
r119 | |||
cin
|
r125 | m_extraHandlers.Enqueue(handler); | ||
if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | ||||
cin
|
r119 | // if the promise have been resolved while we was adding the handler to the queue | ||
// we can't guarantee that someone is still processing it | ||||
// therefore we need to fetch a handler from the queue and execute it | ||||
// note that fetched handler may be not the one that we have added | ||||
// even we can fetch no handlers at all :) | ||||
InvokeHandler(handler); | ||||
cin
|
r125 | } | ||
cin
|
r119 | } | ||
} | ||||
protected void InvokeHandler(THandler handler) { | ||||
switch (m_state) { | ||||
case SUCCEEDED_STATE: | ||||
SignalSuccess(handler); | ||||
break; | ||||
case CANCELLED_STATE: | ||||
SignalCancelled(handler); | ||||
break; | ||||
case REJECTED_STATE: | ||||
SignalError(handler, m_error); | ||||
break; | ||||
default: | ||||
throw new Exception(String.Format("Invalid promise state {0}", m_state)); | ||||
} | ||||
} | ||||
#endregion | ||||
#region IPromise implementation | ||||
public void Join(int timeout) { | ||||
WaitResult(timeout); | ||||
} | ||||
public void Join() { | ||||
WaitResult(-1); | ||||
} | ||||
public bool IsResolved { | ||||
get { | ||||
Thread.MemoryBarrier(); | ||||
return m_state > 1; | ||||
} | ||||
} | ||||
public bool IsCancelled { | ||||
get { | ||||
Thread.MemoryBarrier(); | ||||
return m_state == CANCELLED_STATE; | ||||
} | ||||
} | ||||
#endregion | ||||
#region ICancellable implementation | ||||
public void Cancel() { | ||||
SetCancelled(); | ||||
} | ||||
#endregion | ||||
} | ||||
} | ||||