AbstractEvent.cs
170 lines
| 5.7 KiB
| text/x-csharp
|
CSharpLexer
/ Implab / AbstractEvent.cs
cin
|
r144 | using System; | ||
using Implab.Parallels; | ||||
using System.Threading; | ||||
using System.Reflection; | ||||
cin
|
r242 | using System.Diagnostics; | ||
cin
|
r144 | |||
namespace Implab { | ||||
cin
|
r242 | public abstract class AbstractEvent<THandler> where THandler : class { | ||
cin
|
r144 | |||
cin
|
r242 | const int PENDING_SATE = 0; | ||
protected const int TRANSITIONAL_STATE = 1; | ||||
cin
|
r156 | protected const int SUCCEEDED_STATE = 2; | ||
protected const int REJECTED_STATE = 3; | ||||
cin
|
r144 | |||
cin
|
r242 | volatile int m_state; | ||
cin
|
r144 | Exception m_error; | ||
cin
|
r242 | THandler m_handler; | ||
cin
|
r233 | SimpleAsyncQueue<THandler> m_extraHandlers; | ||
cin
|
r144 | |||
#region state managment | ||||
cin
|
r242 | protected bool BeginTransit() { | ||
return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE); | ||||
cin
|
r144 | } | ||
cin
|
r242 | protected void CompleteTransit(int state) { | ||
#if DEBUG | ||||
cin
|
r144 | if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | ||
throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | ||||
cin
|
r242 | #else | ||
m_state = state; | ||||
#endif | ||||
Signal(); | ||||
cin
|
r144 | } | ||
cin
|
r242 | protected void WaitTransition() { | ||
if (m_state == TRANSITIONAL_STATE) { | ||||
SpinWait spin; | ||||
do { | ||||
spin.SpinOnce(); | ||||
} while (m_state == TRANSITIONAL_STATE); | ||||
cin
|
r144 | } | ||
} | ||||
protected bool BeginSetResult() { | ||||
if (!BeginTransit()) { | ||||
WaitTransition(); | ||||
return false; | ||||
} | ||||
return true; | ||||
} | ||||
protected void EndSetResult() { | ||||
CompleteTransit(SUCCEEDED_STATE); | ||||
} | ||||
/// <summary> | ||||
/// Выполняет обещание, сообщая об ошибке | ||||
/// </summary> | ||||
/// <remarks> | ||||
/// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | ||||
/// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | ||||
/// будут проигнорированы. | ||||
/// </remarks> | ||||
/// <param name="error">Исключение возникшее при выполнении операции</param> | ||||
/// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | ||||
protected void SetError(Exception error) { | ||||
if (BeginTransit()) { | ||||
cin
|
r186 | m_error = error; | ||
CompleteTransit(REJECTED_STATE); | ||||
cin
|
r144 | } else { | ||
WaitTransition(); | ||||
cin
|
r186 | if (m_state == SUCCEEDED_STATE) | ||
cin
|
r144 | throw new InvalidOperationException("The promise is already resolved"); | ||
} | ||||
} | ||||
cin
|
r156 | protected abstract void SignalHandler(THandler handler, int signal); | ||
cin
|
r144 | |||
cin
|
r156 | void Signal() { | ||
cin
|
r242 | THandler handler; | ||
while (TryDequeueHandler(out handler)) | ||||
SignalHandler(handler, m_state); | ||||
cin
|
r144 | } | ||
#endregion | ||||
cin
|
r242 | protected abstract Signal GetFulfillSignal(); | ||
cin
|
r144 | |||
#region synchronization traits | ||||
protected void WaitResult(int timeout) { | ||||
cin
|
r242 | if (!(IsFulfilled || GetFulfillSignal().Wait(timeout))) | ||
cin
|
r148 | throw new TimeoutException(); | ||
cin
|
r144 | |||
cin
|
r242 | if (IsRejected) | ||
Rethrow(); | ||||
} | ||||
protected void Rethrow() { | ||||
Debug.Assert(m_error != null); | ||||
if (m_error is OperationCanceledException) | ||||
throw new OperationCanceledException("Operation cancelled", m_error); | ||||
else | ||||
throw new TargetInvocationException(m_error); | ||||
cin
|
r144 | } | ||
#endregion | ||||
#region handlers managment | ||||
protected void AddHandler(THandler handler) { | ||||
if (m_state > 1) { | ||||
// the promise is in the resolved state, just invoke the handler | ||||
cin
|
r156 | SignalHandler(handler, m_state); | ||
cin
|
r144 | } else { | ||
cin
|
r242 | if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) { | ||
if (m_extraHandlers == null) | ||||
// compare-exchange will fprotect from loosing already created queue | ||||
Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null); | ||||
m_extraHandlers.Enqueue(handler); | ||||
} | ||||
cin
|
r144 | |||
cin
|
r242 | if (m_state > 1 && TryDequeueHandler(out handler)) | ||
cin
|
r144 | // if the promise have been resolved while we was adding the handler to the queue | ||
// we can't guarantee that someone is still processing it | ||||
// therefore we need to fetch a handler from the queue and execute it | ||||
// note that fetched handler may be not the one that we have added | ||||
// even we can fetch no handlers at all :) | ||||
cin
|
r242 | SignalHandler(handler, m_state); | ||
cin
|
r144 | } | ||
cin
|
r242 | |||
} | ||||
bool TryDequeueHandler(out THandler handler) { | ||||
handler = Interlocked.Exchange(ref m_handler, null); | ||||
if (handler != null) | ||||
return true; | ||||
return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler); | ||||
cin
|
r144 | } | ||
#endregion | ||||
#region IPromise implementation | ||||
cin
|
r242 | public bool IsFulfilled { | ||
cin
|
r144 | get { | ||
cin
|
r242 | return m_state > TRANSITIONAL_STATE; | ||
cin
|
r144 | } | ||
} | ||||
cin
|
r242 | public bool IsRejected { | ||
cin
|
r144 | get { | ||
cin
|
r242 | return m_state == REJECTED_STATE; | ||
cin
|
r144 | } | ||
} | ||||
#endregion | ||||
cin
|
r242 | public Exception RejectReason { | ||
cin
|
r144 | get { | ||
return m_error; | ||||
} | ||||
} | ||||
} | ||||
} | ||||