##// END OF EJS Templates
refactoring, code cleanup
refactoring, code cleanup

File last commit:

r156:97fbbf816844 v2
r158:130781364799 v2
Show More
AbstractEvent.cs
300 lines | 10.5 KiB | text/x-csharp | CSharpLexer
cin
DRAFT: refactoring
r144 using System;
using Implab.Parallels;
using System.Threading;
using System.Reflection;
namespace Implab {
cin
RC: cancellation support for promises + tests
r145 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
cin
DRAFT: refactoring
r144
const int UNRESOLVED_SATE = 0;
const int TRANSITIONAL_STATE = 1;
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 protected const int SUCCEEDED_STATE = 2;
protected const int REJECTED_STATE = 3;
protected const int CANCELLED_STATE = 4;
cin
DRAFT: refactoring
r144
const int CANCEL_NOT_REQUESTED = 0;
const int CANCEL_REQUESTING = 1;
const int CANCEL_REQUESTED = 2;
const int RESERVED_HANDLERS_COUNT = 4;
int m_state;
Exception m_error;
int m_handlersCount;
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
THandler[] m_handlers;
cin
DRAFT: refactoring
r144 MTQueue<THandler> m_extraHandlers;
int m_handlerPointer = -1;
int m_handlersCommited;
int m_cancelRequest;
Exception m_cancelationReason;
MTQueue<Action<Exception>> m_cancelationHandlers;
#region state managment
bool BeginTransit() {
return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
}
void CompleteTransit(int state) {
if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
}
void WaitTransition() {
while (m_state == TRANSITIONAL_STATE) {
Thread.MemoryBarrier();
}
}
protected bool BeginSetResult() {
if (!BeginTransit()) {
WaitTransition();
if (m_state != CANCELLED_STATE)
throw new InvalidOperationException("The promise is already resolved");
return false;
}
return true;
}
protected void EndSetResult() {
CompleteTransit(SUCCEEDED_STATE);
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 Signal();
cin
DRAFT: refactoring
r144 }
/// <summary>
/// Выполняет обещание, сообщая об ошибке
/// </summary>
/// <remarks>
/// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
/// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
/// будут проигнорированы.
/// </remarks>
/// <param name="error">Исключение возникшее при выполнении операции</param>
/// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
protected void SetError(Exception error) {
if (BeginTransit()) {
if (error is OperationCanceledException) {
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 m_error = error.InnerException;
cin
DRAFT: refactoring
r144 CompleteTransit(CANCELLED_STATE);
} else {
m_error = error is PromiseTransientException ? error.InnerException : error;
CompleteTransit(REJECTED_STATE);
}
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 Signal();
cin
DRAFT: refactoring
r144 } else {
WaitTransition();
if (m_state == SUCCEEDED_STATE)
throw new InvalidOperationException("The promise is already resolved");
}
}
/// <summary>
/// Отменяет операцию, если это возможно.
/// </summary>
/// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
protected void SetCancelled(Exception reason) {
if (BeginTransit()) {
m_error = reason;
CompleteTransit(CANCELLED_STATE);
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 Signal();
cin
DRAFT: refactoring
r144 }
}
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 protected abstract void SignalHandler(THandler handler, int signal);
cin
DRAFT: refactoring
r144
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 void Signal() {
cin
DRAFT: refactoring
r144 var hp = m_handlerPointer;
var slot = hp +1 ;
while (slot < m_handlersCommited) {
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 SignalHandler(m_handlers[slot], m_state);
cin
DRAFT: refactoring
r144 }
hp = m_handlerPointer;
slot = hp +1 ;
}
if (m_extraHandlers != null) {
THandler handler;
while (m_extraHandlers.TryDequeue(out handler))
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 SignalHandler(handler, m_state);
cin
DRAFT: refactoring
r144 }
}
#endregion
protected abstract Signal GetResolveSignal();
#region synchronization traits
protected void WaitResult(int timeout) {
cin
fixed timeout handling in promises
r148 if (!(IsResolved || GetResolveSignal().Wait(timeout)))
throw new TimeoutException();
cin
DRAFT: refactoring
r144
switch (m_state) {
case SUCCEEDED_STATE:
return;
case CANCELLED_STATE:
throw new OperationCanceledException();
case REJECTED_STATE:
throw new TargetInvocationException(m_error);
default:
throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
}
}
#endregion
#region handlers managment
protected void AddHandler(THandler handler) {
if (m_state > 1) {
// the promise is in the resolved state, just invoke the handler
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 SignalHandler(handler, m_state);
cin
DRAFT: refactoring
r144 } else {
var slot = Interlocked.Increment(ref m_handlersCount) - 1;
if (slot < RESERVED_HANDLERS_COUNT) {
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 if (slot == 0)
Interlocked.CompareExchange(ref m_handlers, new THandler[RESERVED_HANDLERS_COUNT], null);
cin
DRAFT: refactoring
r144 m_handlers[slot] = handler;
while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
}
if (m_state > 1) {
do {
var hp = m_handlerPointer;
slot = hp + 1;
if (slot < m_handlersCommited) {
if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
continue;
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 SignalHandler(m_handlers[slot], m_state);
cin
DRAFT: refactoring
r144 }
break;
} while(true);
}
} else {
if (slot == RESERVED_HANDLERS_COUNT) {
m_extraHandlers = new MTQueue<THandler>();
} else {
while (m_extraHandlers == null)
Thread.MemoryBarrier();
}
m_extraHandlers.Enqueue(handler);
if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
// if the promise have been resolved while we was adding the handler to the queue
// we can't guarantee that someone is still processing it
// therefore we need to fetch a handler from the queue and execute it
// note that fetched handler may be not the one that we have added
// even we can fetch no handlers at all :)
cin
Promises: SignalXXX methods merged into SignalHandler method....
r156 SignalHandler(handler, m_state);
cin
DRAFT: refactoring
r144 }
}
}
#endregion
#region IPromise implementation
public bool IsResolved {
get {
Thread.MemoryBarrier();
return m_state > 1;
}
}
public bool IsCancelled {
get {
Thread.MemoryBarrier();
return m_state == CANCELLED_STATE;
}
}
#endregion
public Exception Error {
get {
return m_error;
}
}
cin
RC: cancellation support for promises + tests
r145 public bool CancelOperationIfRequested() {
if (IsCancellationRequested) {
CancelOperation(CancellationReason);
return true;
}
return false;
cin
DRAFT: refactoring
r144 }
public virtual void CancelOperation(Exception reason) {
SetCancelled(reason);
}
cin
RC: cancellation support for promises + tests
r145 public void CancellationRequested(Action<Exception> handler) {
cin
DRAFT: refactoring
r144 Safe.ArgumentNotNull(handler, "handler");
cin
RC: cancellation support for promises + tests
r145 if (IsCancellationRequested)
handler(CancellationReason);
cin
DRAFT: refactoring
r144
if (m_cancelationHandlers == null)
Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
m_cancelationHandlers.Enqueue(handler);
cin
RC: cancellation support for promises + tests
r145 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
cin
DRAFT: refactoring
r144 // TryDeque implies MemoryBarrier()
handler(m_cancelationReason);
}
cin
RC: cancellation support for promises + tests
r145 public bool IsCancellationRequested {
cin
DRAFT: refactoring
r144 get {
do {
if (m_cancelRequest == CANCEL_NOT_REQUESTED)
return false;
if (m_cancelRequest == CANCEL_REQUESTED)
return true;
Thread.MemoryBarrier();
} while(true);
}
}
cin
RC: cancellation support for promises + tests
r145 public Exception CancellationReason {
cin
DRAFT: refactoring
r144 get {
do {
Thread.MemoryBarrier();
} while(m_cancelRequest == CANCEL_REQUESTING);
return m_cancelationReason;
}
}
#region ICancellable implementation
public void Cancel() {
Cancel(null);
}
public void Cancel(Exception reason) {
cin
RC: cancellation support for promises + tests
r145 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
cin
DRAFT: refactoring
r144 m_cancelationReason = reason;
m_cancelRequest = CANCEL_REQUESTED;
if (m_cancelationHandlers != null) {
Action<Exception> handler;
while (m_cancelationHandlers.TryDequeue(out handler))
handler(m_cancelationReason);
}
}
}
#endregion
}
}