##// END OF EJS Templates
async queue improvements
async queue improvements

File last commit:

r119:2573b562e328 v2
r123:f4d6ea6969cc v2
Show More
AbstractPromise.cs
219 lines | 7.2 KiB | text/x-csharp | CSharpLexer
/ Implab / AbstractPromise.cs
cin
Promises rewritten, added improved version of AsyncQueue
r119 using System;
using Implab.Parallels;
using System.Threading;
using System.Reflection;
namespace Implab {
public abstract class AbstractPromise<THandler> {
const int UNRESOLVED_SATE = 0;
const int TRANSITIONAL_STATE = 1;
const int SUCCEEDED_STATE = 2;
const int REJECTED_STATE = 3;
const int CANCELLED_STATE = 4;
int m_state;
Exception m_error;
readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
#region state managment
bool BeginTransit() {
return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
}
void CompleteTransit(int state) {
if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
}
void WaitTransition() {
while (m_state == TRANSITIONAL_STATE) {
Thread.MemoryBarrier();
}
}
protected void BeginSetResult() {
if (!BeginTransit()) {
WaitTransition();
if (m_state != CANCELLED_STATE)
throw new InvalidOperationException("The promise is already resolved");
}
}
protected void EndSetResult() {
CompleteTransit(SUCCEEDED_STATE);
OnSuccess();
}
/// <summary>
/// Выполняет обещание, сообщая об ошибке
/// </summary>
/// <remarks>
/// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
/// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
/// будут проигнорированы.
/// </remarks>
/// <param name="error">Исключение возникшее при выполнении операции</param>
/// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
protected void SetError(Exception error) {
if (BeginTransit()) {
m_error = error is PromiseTransientException ? error.InnerException : error;
CompleteTransit(REJECTED_STATE);
OnError();
} else {
WaitTransition();
if (m_state == SUCCEEDED_STATE)
throw new InvalidOperationException("The promise is already resolved");
}
}
/// <summary>
/// Отменяет операцию, если это возможно.
/// </summary>
/// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
protected void SetCancelled() {
if (BeginTransit()) {
CompleteTransit(CANCELLED_STATE);
OnCancelled();
}
}
protected abstract void SignalSuccess(THandler handler);
protected abstract void SignalError(THandler handler, Exception error);
protected abstract void SignalCancelled(THandler handler);
void OnSuccess() {
THandler handler;
while (m_handlers.TryDequeue(out handler))
SignalSuccess(handler);
}
void OnError() {
THandler handler;
while (m_handlers.TryDequeue(out handler))
SignalError(handler,m_error);
}
void OnCancelled() {
THandler handler;
while (m_handlers.TryDequeue(out handler))
SignalCancelled(handler);
}
#endregion
protected abstract void Listen(PromiseEventType events, Action handler);
#region synchronization traits
protected void WaitResult(int timeout) {
if (!IsResolved) {
var lk = new object();
Listen(PromiseEventType.All, () => {
lock(lk) {
Monitor.Pulse(lk);
}
});
lock (lk) {
while(!IsResolved) {
if(!Monitor.Wait(lk,timeout))
throw new TimeoutException();
}
}
}
switch (m_state) {
case SUCCEEDED_STATE:
return;
case CANCELLED_STATE:
throw new OperationCanceledException();
case REJECTED_STATE:
throw new TargetInvocationException(m_error);
default:
throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
}
}
#endregion
#region handlers managment
protected void AddHandler(THandler handler) {
if (IsResolved) {
InvokeHandler(handler);
} else {
// the promise is in the resolved state, just invoke the handler
m_handlers.Enqueue(handler);
if (IsResolved && m_handlers.TryDequeue(out handler))
// if the promise have been resolved while we was adding the handler to the queue
// we can't guarantee that someone is still processing it
// therefore we need to fetch a handler from the queue and execute it
// note that fetched handler may be not the one that we have added
// even we can fetch no handlers at all :)
InvokeHandler(handler);
}
}
protected void InvokeHandler(THandler handler) {
switch (m_state) {
case SUCCEEDED_STATE:
SignalSuccess(handler);
break;
case CANCELLED_STATE:
SignalCancelled(handler);
break;
case REJECTED_STATE:
SignalError(handler, m_error);
break;
default:
throw new Exception(String.Format("Invalid promise state {0}", m_state));
}
}
#endregion
#region IPromise implementation
public void Join(int timeout) {
WaitResult(timeout);
}
public void Join() {
WaitResult(-1);
}
public bool IsResolved {
get {
Thread.MemoryBarrier();
return m_state > 1;
}
}
public bool IsCancelled {
get {
Thread.MemoryBarrier();
return m_state == CANCELLED_STATE;
}
}
#endregion
#region ICancellable implementation
public void Cancel() {
SetCancelled();
}
#endregion
}
}