Promise.cs
221 lines
| 6.0 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r289 | using System; | ||
using System.Collections.Generic; | ||||
using System.Diagnostics; | ||||
using System.Reflection; | ||||
using System.Threading; | ||||
using Implab.Parallels; | ||||
namespace Implab { | ||||
/// <summary>
/// A promise that carries no result value (<see cref="ResultType"/> is <c>void</c>).
/// It can be settled once as either fulfilled or rejected and notifies the
/// <see cref="IResolvable"/> handlers registered through <see cref="Then"/>.
/// </summary>
public class Promise : AbstractEvent<IResolvable>, IPromise {
    /// <summary>
    /// The dispatcher used by default; always <see cref="ThreadPoolDispatcher.Instance"/>.
    /// </summary>
    public static IDispatcher DefaultDispatcher => ThreadPoolDispatcher.Instance;

    /// <summary>
    /// A handler that sets a <see cref="Implab.Parallels.Signal"/> as soon as the promise
    /// is settled, regardless of the outcome. Used to block a thread in
    /// <see cref="WaitResult"/>.
    /// </summary>
    class ResolvableSignal : IResolvable {
        public Signal Signal { get; private set; }

        public ResolvableSignal() {
            Signal = new Signal();
        }

        // The error is deliberately ignored: the waiter only needs to be woken up
        // and reads the outcome from the promise itself afterwards.
        public void Reject(Exception error) {
            Signal.Set();
        }

        public void Resolve() {
            Signal.Set();
        }
    }

    // Outcome of the promise; written only inside a BeginTransit/CompleteTransit pair.
    PromiseState m_state;

    // Rejection reason; assigned only by RejectPromise.
    Exception m_error;

    /// <summary>True when the promise state is <see cref="PromiseState.Rejected"/>.</summary>
    public bool IsRejected => m_state == PromiseState.Rejected;

    /// <summary>True when the promise state is <see cref="PromiseState.Fulfilled"/>.</summary>
    public bool IsFulfilled => m_state == PromiseState.Fulfilled;

    /// <summary>
    /// The exception passed to <see cref="RejectPromise"/>, or <c>null</c> when the
    /// promise has not been rejected.
    /// </summary>
    public Exception RejectReason => m_error;

    internal Promise() {
    }

    /// <summary>
    /// Marks the promise as fulfilled. Does nothing when <c>BeginTransit()</c>
    /// returns <c>false</c> (presumably because the promise is already settled or
    /// being settled; confirm in AbstractEvent).
    /// </summary>
    internal void ResolvePromise() {
        if (!BeginTransit())
            return;

        m_state = PromiseState.Fulfilled;
        CompleteTransit();
    }

    /// <summary>
    /// Marks the promise as rejected with the specified reason. Does nothing when
    /// <c>BeginTransit()</c> returns <c>false</c>.
    /// </summary>
    /// <param name="reason">The exception that becomes <see cref="RejectReason"/>.</param>
    internal void RejectPromise(Exception reason) {
        if (!BeginTransit())
            return;

        // The reason is stored before the state so it is already in place once
        // the state reads as Rejected.
        m_error = reason;
        m_state = PromiseState.Rejected;
        CompleteTransit();
    }

    #region implemented abstract members of AbstractEvent

    /// <summary>
    /// Delivers the outcome of the promise to a single handler.
    /// </summary>
    /// <param name="handler">The handler to notify.</param>
    /// <exception cref="InvalidOperationException">
    /// The promise is neither fulfilled nor rejected.
    /// </exception>
    protected override void SignalHandler(IResolvable handler) {
        switch (m_state) {
            case PromiseState.Fulfilled:
                handler.Resolve();
                break;
            case PromiseState.Rejected:
                handler.Reject(RejectReason);
                break;
            default:
                throw new InvalidOperationException($"Invalid promise signal: {m_state}");
        }
    }

    #endregion

    /// <summary>
    /// Blocks the calling thread until the promise is resolved or the timeout expires.
    /// </summary>
    /// <param name="timeout">
    /// The timeout passed to <c>Signal.Wait</c>; <see cref="Join()"/> passes -1 to
    /// wait indefinitely.
    /// </param>
    /// <exception cref="TimeoutException">
    /// The promise was not resolved and the wait returned <c>false</c>.
    /// </exception>
    protected void WaitResult(int timeout) {
        // The signal is only allocated when the promise is not resolved yet.
        if (IsResolved)
            return;

        if (!GetFulfillSignal().Wait(timeout))
            throw new TimeoutException();
    }

    /// <summary>
    /// Registers a new signalling handler on this promise and returns its signal,
    /// which is set when the promise is either fulfilled or rejected.
    /// </summary>
    protected Signal GetFulfillSignal() {
        var next = new ResolvableSignal();
        Then(next);
        return next.Signal;
    }

    /// <summary>The result type of this promise; always <c>typeof(void)</c>.</summary>
    public Type ResultType => typeof(void);

    /// <summary>
    /// Throws an exception that wraps the stored rejection reason.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// The rejection reason is an <see cref="OperationCanceledException"/>.
    /// </exception>
    /// <exception cref="TargetInvocationException">Any other rejection reason.</exception>
    protected void Rethrow() {
        Debug.Assert(m_error != null);

        // The original error is always passed as the inner exception, so its
        // stack trace is preserved.
        if (m_error is OperationCanceledException)
            throw new OperationCanceledException("Operation cancelled", m_error);

        throw new TargetInvocationException(m_error);
    }

    /// <summary>
    /// Registers a handler to be notified about the outcome of the promise.
    /// </summary>
    /// <param name="next">The handler to register.</param>
    public void Then(IResolvable next) {
        AddHandler(next);
    }

    /// <summary>
    /// Not supported for this promise type: it carries no value, so the call
    /// always throws.
    /// </summary>
    /// <exception cref="InvalidCastException">Always.</exception>
    public IPromise<T> Cast<T>() {
        throw new InvalidCastException();
    }

    /// <summary>
    /// Waits indefinitely for the promise to be settled and throws if it was rejected
    /// (see <see cref="Rethrow"/> for the exception types).
    /// </summary>
    public void Join() {
        Join(Timeout.Infinite);
    }

    /// <summary>
    /// Waits for the promise to be settled and throws if it was rejected
    /// (see <see cref="Rethrow"/> for the exception types).
    /// </summary>
    /// <param name="timeout">The timeout passed to <see cref="WaitResult"/>.</param>
    /// <exception cref="TimeoutException">The promise was not settled in time.</exception>
    public void Join(int timeout) {
        WaitResult(timeout);
        if (IsRejected)
            Rethrow();
    }

    /// <summary>Creates a new <see cref="ResolvedPromise"/>.</summary>
    public static ResolvedPromise Resolve() {
        return new ResolvedPromise();
    }

    /// <summary>Creates a new <see cref="ResolvedPromise{T}"/> from the specified result.</summary>
    /// <param name="result">The value passed to the promise constructor.</param>
    public static ResolvedPromise<T> Resolve<T>(T result) {
        return new ResolvedPromise<T>(result);
    }

    /// <summary>Creates a new <see cref="RejectedPromise"/> from the specified reason.</summary>
    /// <param name="reason">The exception passed to the promise constructor.</param>
    public static RejectedPromise Reject(Exception reason) {
        return new RejectedPromise(reason);
    }

    /// <summary>Creates a new <see cref="RejectedPromise{T}"/> from the specified reason.</summary>
    /// <param name="reason">The exception passed to the promise constructor.</param>
    public static RejectedPromise<T> Reject<T>(Exception reason) {
        return new RejectedPromise<T>(reason);
    }

    /// <summary>
    /// Creates a promise and immediately runs the executor with its deferred.
    /// Equivalent to passing <see cref="CancellationToken.None"/> to
    /// <see cref="Create(PromiseExecutor, CancellationToken)"/>.
    /// </summary>
    /// <param name="executor">The delegate that receives the deferred.</param>
    public static IPromise Create(PromiseExecutor executor) {
        return Create(executor, CancellationToken.None);
    }

    /// <summary>
    /// Creates a promise and immediately runs the executor with its deferred.
    /// </summary>
    /// <param name="executor">
    /// The delegate that receives the deferred. It is not invoked when cancellation
    /// has already been requested. An exception thrown by it rejects the deferred.
    /// </param>
    /// <param name="ct">
    /// A token whose cancellation invokes <c>Cancel</c> on the deferred.
    /// </param>
    /// <returns>The promise of the created deferred.</returns>
    public static IPromise Create(PromiseExecutor executor, CancellationToken ct) {
        Safe.ArgumentNotNull(executor, nameof(executor));

        var d = new Deferred();

        // Do NOT delegate to Create(executor) for a non-cancelable token: that
        // overload forwards back here with CancellationToken.None, which would
        // recurse until the stack overflows. Skipping the registration is enough.
        if (ct.CanBeCanceled)
            ct.Register(d.Cancel);

        try {
            if (!ct.IsCancellationRequested)
                executor(d);
        } catch (Exception e) {
            d.Reject(e);
        }

        return d.Promise;
    }

    /// <summary>
    /// Creates a promise of <typeparamref name="T"/> and immediately runs the executor
    /// with its deferred. Equivalent to passing <see cref="CancellationToken.None"/> to
    /// <see cref="Create{T}(PromiseExecutor{T}, CancellationToken)"/>.
    /// </summary>
    /// <param name="executor">The delegate that receives the deferred.</param>
    public static IPromise<T> Create<T>(PromiseExecutor<T> executor) {
        return Create(executor, CancellationToken.None);
    }

    /// <summary>
    /// Creates a promise of <typeparamref name="T"/> and immediately runs the executor
    /// with its deferred.
    /// </summary>
    /// <param name="executor">
    /// The delegate that receives the deferred. It is not invoked when cancellation
    /// has already been requested. An exception thrown by it rejects the deferred.
    /// </param>
    /// <param name="ct">
    /// A token whose cancellation invokes <c>Cancel</c> on the deferred.
    /// </param>
    /// <returns>The promise of the created deferred.</returns>
    public static IPromise<T> Create<T>(PromiseExecutor<T> executor, CancellationToken ct) {
        Safe.ArgumentNotNull(executor, nameof(executor));

        var d = new Deferred<T>();

        // Registering on a token that can never be canceled is pointless, skip it.
        if (ct.CanBeCanceled)
            ct.Register(d.Cancel);

        try {
            if (!ct.IsCancellationRequested)
                executor(d);
        } catch (Exception e) {
            d.Reject(e);
        }

        return d.Promise;
    }

    /// <summary>
    /// Combines the specified promises using <see cref="PromiseAll"/>.
    /// </summary>
    /// <param name="promises">
    /// The promises to combine. Enumeration stops early once the aggregate reports
    /// <c>Done</c>.
    /// </param>
    /// <returns>The <c>ResultPromise</c> of the aggregate.</returns>
    public static IPromise All(IEnumerable<IPromise> promises) {
        Safe.ArgumentNotNull(promises, nameof(promises));

        var d = new Deferred();
        var all = new PromiseAll(d);

        foreach (var promise in promises) {
            all.AddPromise(promise);
            // No point in subscribing to the remaining promises once the
            // aggregate is already done.
            if (all.Done)
                break;
        }

        all.Complete();
        return all.ResultPromise;
    }

    /// <summary>
    /// Combines the specified promises using <see cref="PromiseAll{T}"/> into a promise
    /// of an array of results.
    /// </summary>
    /// <param name="promises">
    /// The promises to combine. Enumeration stops early once the aggregate reports
    /// <c>Done</c>.
    /// </param>
    /// <param name="cleanup">
    /// Optional callback forwarded to <see cref="PromiseAll{T}"/>
    /// (NOTE(review): presumably used to release results already obtained; confirm there).
    /// </param>
    /// <param name="cancel">
    /// Optional callback forwarded to <see cref="PromiseAll{T}"/>
    /// (NOTE(review): presumably invoked on cancellation; confirm there).
    /// </param>
    /// <returns>The <c>ResultPromise</c> of the aggregate.</returns>
    public static IPromise<T[]> All<T>(IEnumerable<IPromise<T>> promises, Func<T, IPromise> cleanup = null, Action cancel = null) {
        Safe.ArgumentNotNull(promises, nameof(promises));

        var d = new Deferred<T[]>();
        var all = new PromiseAll<T>(d, cleanup, cancel);

        foreach (var promise in promises) {
            all.AddPromise(promise);
            // No point in subscribing to the remaining promises once the
            // aggregate is already done.
            if (all.Done)
                break;
        }

        all.Complete();
        return all.ResultPromise;
    }
}
} | ||||