##// END OF EJS Templates
Added 'Fail' method to RunnableComponent which allows component to move from...
Added 'Fail' method to RunnableComponent which allows component to move from Running to Failed state. Added PollingComponent a timer based runnable component More tests Added FailPromise a thin class to wrap exceptions Fixed error handling in SuccessPromise classes.

File last commit:

r186:75103928da09 ref20160224
r203:4d9830a9bbb8 v2
Show More
AbstractEvent.cs
300 lines | 10.4 KiB | text/x-csharp | CSharpLexer
cin
DRAFT: refactoring
r144 using System;
using Implab.Parallels;
using System.Threading;
using System.Reflection;
namespace Implab {
cin
RC: cancellation support for promises + tests
r145 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
cin
DRAFT: refactoring
r144
const int UNRESOLVED_SATE = 0;
const int TRANSITIONAL_STATE = 1;
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 protected const int SUCCEEDED_STATE = 2;
protected const int REJECTED_STATE = 3;
protected const int CANCELLED_STATE = 4;
cin
DRAFT: refactoring
r144
const int CANCEL_NOT_REQUESTED = 0;
const int CANCEL_REQUESTING = 1;
const int CANCEL_REQUESTED = 2;
const int RESERVED_HANDLERS_COUNT = 4;
int m_state;
Exception m_error;
int m_handlersCount;
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
THandler[] m_handlers;
cin
DRAFT: refactoring
r144 MTQueue<THandler> m_extraHandlers;
int m_handlerPointer = -1;
int m_handlersCommited;
int m_cancelRequest;
Exception m_cancelationReason;
MTQueue<Action<Exception>> m_cancelationHandlers;
#region state managment
bool BeginTransit() {
return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
}
void CompleteTransit(int state) {
if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
}
void WaitTransition() {
while (m_state == TRANSITIONAL_STATE) {
Thread.MemoryBarrier();
}
}
protected bool BeginSetResult() {
if (!BeginTransit()) {
WaitTransition();
if (m_state != CANCELLED_STATE)
throw new InvalidOperationException("The promise is already resolved");
return false;
}
return true;
}
protected void EndSetResult() {
CompleteTransit(SUCCEEDED_STATE);
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 Signal();
cin
DRAFT: refactoring
r144 }
/// <summary>
/// Выполняет обещание, сообщая об ошибке
/// </summary>
/// <remarks>
/// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
/// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
/// будут проигнорированы.
/// </remarks>
/// <param name="error">Исключение возникшее при выполнении операции</param>
/// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
protected void SetError(Exception error) {
if (BeginTransit()) {
cin
working on cancelation and error handling
r186 m_error = error;
CompleteTransit(REJECTED_STATE);
cin
runnable component, work in progress
r185
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 Signal();
cin
DRAFT: refactoring
r144 } else {
WaitTransition();
cin
working on cancelation and error handling
r186 if (m_state == SUCCEEDED_STATE)
cin
DRAFT: refactoring
r144 throw new InvalidOperationException("The promise is already resolved");
}
}
/// <summary>
/// Отменяет операцию, если это возможно.
/// </summary>
/// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
protected void SetCancelled(Exception reason) {
if (BeginTransit()) {
m_error = reason;
CompleteTransit(CANCELLED_STATE);
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 Signal();
cin
DRAFT: refactoring
r144 }
}
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 protected abstract void SignalHandler(THandler handler, int signal);
cin
DRAFT: refactoring
r144
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 void Signal() {
cin
DRAFT: refactoring
r144 var hp = m_handlerPointer;
var slot = hp +1 ;
while (slot < m_handlersCommited) {
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 SignalHandler(m_handlers[slot], m_state);
cin
DRAFT: refactoring
r144 }
hp = m_handlerPointer;
slot = hp +1 ;
}
if (m_extraHandlers != null) {
THandler handler;
while (m_extraHandlers.TryDequeue(out handler))
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 SignalHandler(handler, m_state);
cin
DRAFT: refactoring
r144 }
}
#endregion
protected abstract Signal GetResolveSignal();
#region synchronization traits
protected void WaitResult(int timeout) {
cin
fixed timeout handling in promises
r148 if (!(IsResolved || GetResolveSignal().Wait(timeout)))
throw new TimeoutException();
cin
DRAFT: refactoring
r144
switch (m_state) {
case SUCCEEDED_STATE:
return;
case CANCELLED_STATE:
cin
working on cancelation and error handling
r186 throw new OperationCanceledException("The operation has been cancelled", m_error);
cin
DRAFT: refactoring
r144 case REJECTED_STATE:
throw new TargetInvocationException(m_error);
default:
cin
working on cancelation and error handling
r186 throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state));
cin
DRAFT: refactoring
r144 }
}
#endregion
#region handlers managment
protected void AddHandler(THandler handler) {
if (m_state > 1) {
// the promise is in the resolved state, just invoke the handler
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 SignalHandler(handler, m_state);
cin
DRAFT: refactoring
r144 } else {
var slot = Interlocked.Increment(ref m_handlersCount) - 1;
if (slot < RESERVED_HANDLERS_COUNT) {
cin
fixed regression: race condition in Promise...
r160 if (slot == 0) {
m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
} else {
while (m_handlers == null)
Thread.MemoryBarrier();
}
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156
cin
DRAFT: refactoring
r144 m_handlers[slot] = handler;
while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
}
if (m_state > 1) {
do {
var hp = m_handlerPointer;
slot = hp + 1;
if (slot < m_handlersCommited) {
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
continue;
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 SignalHandler(m_handlers[slot], m_state);
cin
DRAFT: refactoring
r144 }
break;
} while(true);
}
} else {
if (slot == RESERVED_HANDLERS_COUNT) {
m_extraHandlers = new MTQueue<THandler>();
} else {
while (m_extraHandlers == null)
Thread.MemoryBarrier();
}
m_extraHandlers.Enqueue(handler);
if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
// if the promise have been resolved while we was adding the handler to the queue
// we can't guarantee that someone is still processing it
// therefore we need to fetch a handler from the queue and execute it
// note that fetched handler may be not the one that we have added
// even we can fetch no handlers at all :)
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 SignalHandler(handler, m_state);
cin
DRAFT: refactoring
r144 }
}
}
#endregion
#region IPromise implementation
public bool IsResolved {
get {
Thread.MemoryBarrier();
return m_state > 1;
}
}
public bool IsCancelled {
get {
Thread.MemoryBarrier();
return m_state == CANCELLED_STATE;
}
}
#endregion
public Exception Error {
get {
return m_error;
}
}
cin
RC: cancellation support for promises + tests
r145 public bool CancelOperationIfRequested() {
if (IsCancellationRequested) {
CancelOperation(CancellationReason);
return true;
}
return false;
cin
DRAFT: refactoring
r144 }
public virtual void CancelOperation(Exception reason) {
SetCancelled(reason);
}
cin
RC: cancellation support for promises + tests
r145 public void CancellationRequested(Action<Exception> handler) {
cin
DRAFT: refactoring
r144 Safe.ArgumentNotNull(handler, "handler");
cin
RC: cancellation support for promises + tests
r145 if (IsCancellationRequested)
handler(CancellationReason);
cin
DRAFT: refactoring
r144
if (m_cancelationHandlers == null)
Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
m_cancelationHandlers.Enqueue(handler);
cin
RC: cancellation support for promises + tests
r145 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
cin
DRAFT: refactoring
r144 // TryDeque implies MemoryBarrier()
handler(m_cancelationReason);
}
cin
RC: cancellation support for promises + tests
r145 public bool IsCancellationRequested {
cin
DRAFT: refactoring
r144 get {
do {
if (m_cancelRequest == CANCEL_NOT_REQUESTED)
return false;
if (m_cancelRequest == CANCEL_REQUESTED)
return true;
Thread.MemoryBarrier();
} while(true);
}
}
cin
RC: cancellation support for promises + tests
r145 public Exception CancellationReason {
cin
DRAFT: refactoring
r144 get {
do {
Thread.MemoryBarrier();
} while(m_cancelRequest == CANCEL_REQUESTING);
return m_cancelationReason;
}
}
#region ICancellable implementation
public void Cancel() {
Cancel(null);
}
public void Cancel(Exception reason) {
cin
RC: cancellation support for promises + tests
r145 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
cin
DRAFT: refactoring
r144 m_cancelationReason = reason;
m_cancelRequest = CANCEL_REQUESTED;
if (m_cancelationHandlers != null) {
Action<Exception> handler;
while (m_cancelationHandlers.TryDequeue(out handler))
handler(m_cancelationReason);
}
}
}
#endregion
}
}