##// END OF EJS Templates
Bound promise to CancellationToken...
Bound promise to CancellationToken Added new states to ExecutionSate enum. Added Safe.Guard() method to handle cleanup of the result of the promise

File last commit:

r209:a867536c68fc v2
r209:a867536c68fc v2
Show More
PromiseExtensions.cs
475 lines | 17.9 KiB | text/x-csharp | CSharpLexer
/ Implab / PromiseExtensions.cs
using System.Threading;
using System;
using Implab.Diagnostics;
using System.Collections.Generic;
using System.Linq;
namespace Implab {
public static class PromiseExtensions {
public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
Safe.ArgumentNotNull(that, "that");
var context = SynchronizationContext.Current;
if (context == null)
return that;
var p = new SyncContextPromise<T>(context);
p.CancellationRequested(that.Cancel);
that.On(
p.Resolve,
p.Reject,
p.CancelOperation
);
return p;
}
public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
Safe.ArgumentNotNull(that, "that");
Safe.ArgumentNotNull(context, "context");
var p = new SyncContextPromise<T>(context);
p.CancellationRequested(that.Cancel);
that.On(
p.Resolve,
p.Reject,
p.CancelOperation
);
return p;
}
/// <summary>
/// Ensures the dispatched.
/// </summary>
/// <returns>The dispatched.</returns>
/// <param name="that">That.</param>
/// <param name="head">Head.</param>
/// <param name="cleanup">Cleanup.</param>
/// <typeparam name="TPromise">The 1st type parameter.</typeparam>
/// <typeparam name="T">The 2nd type parameter.</typeparam>
public static TPromise EnsureDispatched<TPromise, T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise {
Safe.ArgumentNotNull(that, "that");
Safe.ArgumentNotNull(head, "head");
that.On(() => head.On(cleanup), PromiseEventType.Cancelled);
return that;
}
/// <summary>
/// Adds a cancellation point to the chain of promises. When a cancellation request reaches the cancellation point the operation is
/// cancelled immediatelly, and the request is passed towards. If the operation at the higher level can not be cancelled is't result
/// will be collected with <paramref name="cleanup"/> callback.
/// </summary>
/// <typeparam name="T">The type of the promise result.</typeparam>
/// <param name="that">The promise to which the cancellation point should be attached.</param>
/// <param name="cleanup">The callback which is used to cleanup the result of the operation if the cancellation point is cancelled already.</param>
/// <returns>The promise</returns>
public static IPromise<T> CancellationPoint<T>(this IPromise<T> that, Action<T> cleanup) {
var meduim = new Promise<T>();
that.On(meduim.Resolve, meduim.Reject, meduim.CancelOperation);
meduim.CancellationRequested(that.Cancel);
meduim.CancellationRequested(meduim.CancelOperation);
if (cleanup != null)
meduim.On((Action<T>)null, null, (e) => {
that.On(cleanup);
});
return meduim;
}
public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult, T> callback) {
Safe.ArgumentNotNull(that, "that");
Safe.ArgumentNotNull(callback, "callback");
var op = TraceContext.Instance.CurrentOperation;
return ar => {
TraceContext.Instance.EnterLogicalOperation(op, false);
try {
that.Resolve(callback(ar));
} catch (Exception err) {
that.Reject(err);
} finally {
TraceContext.Instance.Leave();
}
};
}
static void CancelByTimeoutCallback(object cookie) {
((ICancellable)cookie).Cancel(new TimeoutException());
}
/// <summary>
/// Cancells promise after the specified timeout is elapsed.
/// </summary>
/// <param name="that">The promise to cancel on timeout.</param>
/// <param name="milliseconds">The timeout in milliseconds.</param>
/// <typeparam name="TPromise">The 1st type parameter.</typeparam>
public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
Safe.ArgumentNotNull(that, "that");
var timer = new Timer(CancelByTimeoutCallback, that, milliseconds, -1);
that.On(timer.Dispose, PromiseEventType.All);
return that;
}
public static IPromise PromiseAll(this IEnumerable<IPromise> that) {
Safe.ArgumentNotNull(that, "that");
return PromiseAll(that.ToList());
}
public static IPromise<T[]> PromiseAll<T>(this IEnumerable<IPromise<T>> that) {
return PromiseAll(that, null);
}
public static IPromise<T[]> PromiseAll<T>(this IEnumerable<IPromise<T>> that, Action<T> cleanup) {
Safe.ArgumentNotNull(that, "that");
return PromiseAll(that.ToList(), cleanup);
}
public static IPromise PromiseAll(this ICollection<IPromise> that) {
Safe.ArgumentNotNull(that, "that");
int count = that.Count;
int errors = 0;
var medium = new Promise();
if (count == 0) {
medium.Resolve();
return medium;
}
medium.On(() => {
foreach (var p2 in that)
p2.Cancel();
}, PromiseEventType.ErrorOrCancel);
foreach (var p in that)
p.On(
() => {
if (Interlocked.Decrement(ref count) == 0)
medium.Resolve();
},
error => {
if (Interlocked.Increment(ref errors) == 1)
medium.Reject(
new Exception("The dependency promise is failed", error)
);
},
reason => {
if (Interlocked.Increment(ref errors) == 1)
medium.Cancel(
new Exception("The dependency promise is cancelled")
);
}
);
return medium;
}
public static IPromise<T[]> PromiseAll<T>(this ICollection<IPromise<T>> that) {
return PromiseAll(that, null);
}
/// <summary>
/// Creates a new promise which will be satisfied when all promises are satisfied.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="that"></param>
/// <param name="cleanup">A callback used to cleanup already resolved promises in case of an error</param>
/// <returns></returns>
public static IPromise<T[]> PromiseAll<T>(this ICollection<IPromise<T>> that, Action<T> cleanup) {
Safe.ArgumentNotNull(that, "that");
int count = that.Count;
if (count == 0)
return Promise<T[]>.FromResult(new T[0]);
int errors = 0;
var medium = new Promise<T[]>();
var results = new T[that.Count];
medium.On(() => {
foreach (var p2 in that) {
p2.Cancel();
if (cleanup != null)
p2.On(cleanup);
}
}, PromiseEventType.ErrorOrCancel);
int i = 0;
foreach (var p in that) {
var idx = i;
p.On(
x => {
results[idx] = x;
if (Interlocked.Decrement(ref count) == 0)
medium.Resolve(results);
},
error => {
if (Interlocked.Increment(ref errors) == 1)
medium.Reject(
new Exception("The dependency promise is failed", error)
);
},
reason => {
if (Interlocked.Increment(ref errors) == 1)
medium.Cancel(
new Exception("The dependency promise is cancelled", reason)
);
}
);
i++;
}
return medium;
}
public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
Safe.ArgumentNotNull(that, "that");
var d = new ActionTask(success, error, cancel, false);
that.On(d.Resolve, d.Reject, d.CancelOperation);
d.CancellationRequested(that.Cancel);
return d;
}
public static IPromise Then(this IPromise that, Action success, Action<Exception> error) {
return Then(that, success, error, null);
}
public static IPromise Then(this IPromise that, Action success) {
return Then(that, success, null, null);
}
public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
Safe.ArgumentNotNull(that, "that");
var d = new FuncTask<T>(success, error, cancel, false);
that.On(d.Resolve, d.Reject, d.CancelOperation);
d.CancellationRequested(that.Cancel);
return d;
}
public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error) {
return Then(that, success, error, null);
}
public static IPromise<T> Then<T>(this IPromise that, Func<T> success) {
return Then(that, success, null, null);
}
public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
Safe.ArgumentNotNull(that, "that");
Safe.ArgumentNotNull(success, "success");
var d = new FuncTask<T, T2>(success, error, cancel, false);
that.On(d.Resolve, d.Reject, d.CancelOperation);
d.CancellationRequested(that.Cancel);
return d;
}
public static IPromise<T> Then<T>(this IPromise<T> that, Action<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
Safe.ArgumentNotNull(that, "that");
var d = new FuncTask<T, T>(
x => {
success(x);
return x;
},
error,
cancel,
false
);
that.On(d.Resolve, d.Reject, d.CancelOperation);
d.CancellationRequested(that.Cancel);
return d;
}
public static IPromise<T> Then<T>(this IPromise<T> that, Action<T> success, Func<Exception, T> error) {
return Then(that, success, error, null);
}
public static IPromise<T> Then<T>(this IPromise<T> that, Action<T> success) {
return Then(that, success, null, null);
}
public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error) {
return Then(that, success, error, null);
}
public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success) {
return Then(that, success, null, null);
}
public static IPromise<T> Always<T>(this IPromise<T> that, Action handler) {
Func<Exception, T> errorOrCancel;
if (handler != null)
errorOrCancel = e => {
handler();
throw new PromiseTransientException(e);
};
else
errorOrCancel = null;
return Then(
that,
x => {
handler();
return x;
},
errorOrCancel,
errorOrCancel);
}
public static IPromise Always(this IPromise that, Action handler) {
Action<Exception> errorOrCancel;
if (handler != null)
errorOrCancel = e => {
handler();
throw new PromiseTransientException(e);
};
else
errorOrCancel = null;
return Then(
that,
handler,
errorOrCancel,
errorOrCancel);
}
public static IPromise Error(this IPromise that, Action<Exception> handler, bool handleCancellation) {
Action<Exception> errorOrCancel;
if (handler != null)
errorOrCancel = e => {
handler(e);
throw new PromiseTransientException(e);
};
else
errorOrCancel = null;
return Then(that, null, errorOrCancel, handleCancellation ? errorOrCancel : null);
}
public static IPromise Error(this IPromise that, Action<Exception> handler) {
return Error(that, handler, false);
}
public static IPromise<T> Error<T>(this IPromise<T> that, Action<Exception> handler, bool handleCancellation) {
Func<Exception, T> errorOrCancel;
if (handler != null)
errorOrCancel = e => {
handler(e);
throw new PromiseTransientException(e);
};
else
errorOrCancel = null;
return Then(that, null, errorOrCancel, handleCancellation ? errorOrCancel : null);
}
public static IPromise<T> Error<T>(this IPromise<T> that, Action<Exception> handler) {
return Error(that, handler, false);
}
#region chain traits
public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) {
Safe.ArgumentNotNull(that, "that");
var d = new ActionChainTask(success, error, cancel, false);
that.On(d.Resolve, d.Reject, d.CancelOperation);
d.CancellationRequested(that.Cancel);
return d;
}
public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception, IPromise> error) {
return Chain(that, success, error, null);
}
public static IPromise Chain(this IPromise that, Func<IPromise> success) {
return Chain(that, success, null, null);
}
public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
Safe.ArgumentNotNull(that, "that");
var d = new FuncChainTask<T>(success, error, cancel, false);
that.On(d.Resolve, d.Reject, d.CancelOperation);
if (success != null)
d.CancellationRequested(that.Cancel);
return d;
}
public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error) {
return Chain(that, success, error, null);
}
public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success) {
return Chain(that, success, null, null);
}
public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
Safe.ArgumentNotNull(that, "that");
var d = new FuncChainTask<T, T2>(success, error, cancel, false);
that.On(d.Resolve, d.Reject, d.CancelOperation);
if (success != null)
d.CancellationRequested(that.Cancel);
return d;
}
public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error) {
return Chain(that, success, error, null);
}
public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success) {
return Chain(that, success, null, null);
}
#endregion
public static IPromise<T2> Guard<T, T2>(this IPromise<T> that, Func<IPromise<T>, IPromise<T2>> continuation, Action<T> cleanup) {
Safe.ArgumentNotNull(that, "that");
Safe.ArgumentNotNull(continuation, "continuation");
return continuation(that).Error((err) => {
that.On(cleanup);
}, true);
}
#if NET_4_5
public static PromiseAwaiter<T> GetAwaiter<T>(this IPromise<T> that) {
Safe.ArgumentNotNull(that, "that");
return new PromiseAwaiter<T>(that);
}
public static PromiseAwaiter GetAwaiter(this IPromise that) {
Safe.ArgumentNotNull(that, "that");
return new PromiseAwaiter(that);
}
public static IPromise BoundCancellationToken(this IPromise that, CancellationToken ct) {
Safe.ArgumentNotNull(that, "that");
ct.Register(that.Cancel);
return that.Then(null, null, (err) => {
ct.ThrowIfCancellationRequested();
throw new PromiseTransientException(err);
});
}
public static IPromise<T> BoundCancellationToken<T>(this IPromise<T> that, CancellationToken ct) {
Safe.ArgumentNotNull(that, "that");
ct.Register(that.Cancel);
return that.Then(null, null, (err) => {
ct.ThrowIfCancellationRequested();
throw new PromiseTransientException(err);
});
}
#endif
}
}