##// END OF EJS Templates
Bound promise to CancellationToken...
Bound promise to CancellationToken Added new states to ExecutionSate enum. Added Safe.Guard() method to handle cleanup of the result of the promise

File last commit:

r209:a867536c68fc v2
r209:a867536c68fc v2
Show More
PromiseExtensions.cs
475 lines | 17.9 KiB | text/x-csharp | CSharpLexer
/ Implab / PromiseExtensions.cs
cin
promises refactoring
r72 using System.Threading;
cin
major refactoring, added tasks support
r75 using System;
cin
minor fixes
r109 using Implab.Diagnostics;
cin
Promises rewritten, added improved version of AsyncQueue
r119 using System.Collections.Generic;
cin
Added ResetState to RunnableComponent to reset in case of failure...
r205 using System.Linq;
cin
promises refactoring
r72 namespace Implab {
public static class PromiseExtensions {
public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
cin
major refactoring, added tasks support
r75 Safe.ArgumentNotNull(that, "that");
cin
promises refactoring
r72 var context = SynchronizationContext.Current;
if (context == null)
return that;
cin
Promises rewritten, added improved version of AsyncQueue
r119 var p = new SyncContextPromise<T>(context);
cin
runnable component, work in progress
r185 p.CancellationRequested(that.Cancel);
cin
promises refactoring
r72
cin
renamed Promise.Last -> Promise.On...
r104 that.On(
cin
Refactoring of the IPromise<T> interface...
r76 p.Resolve,
p.Reject,
cin
runnable component, work in progress
r185 p.CancelOperation
cin
promises refactoring
r72 );
return p;
}
public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
cin
major refactoring, added tasks support
r75 Safe.ArgumentNotNull(that, "that");
cin
promises refactoring
r72 Safe.ArgumentNotNull(context, "context");
cin
Promises rewritten, added improved version of AsyncQueue
r119 var p = new SyncContextPromise<T>(context);
cin
runnable component, work in progress
r185 p.CancellationRequested(that.Cancel);
cin
promises refactoring
r72
cin
renamed Promise.Last -> Promise.On...
r104 that.On(
cin
Refactoring of the IPromise<T> interface...
r76 p.Resolve,
p.Reject,
cin
runnable component, work in progress
r185 p.CancelOperation
cin
promises refactoring
r72 );
return p;
}
cin
major refactoring, added tasks support
r75
cin
code cleanup...
r101 /// <summary>
/// Ensures the dispatched.
/// </summary>
/// <returns>The dispatched.</returns>
/// <param name="that">That.</param>
/// <param name="head">Head.</param>
/// <param name="cleanup">Cleanup.</param>
/// <typeparam name="TPromise">The 1st type parameter.</typeparam>
/// <typeparam name="T">The 2nd type parameter.</typeparam>
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 public static TPromise EnsureDispatched<TPromise, T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise {
cin
code cleanup...
r101 Safe.ArgumentNotNull(that, "that");
Safe.ArgumentNotNull(head, "head");
cin
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
r138 that.On(() => head.On(cleanup), PromiseEventType.Cancelled);
cin
code cleanup...
r101
return that;
}
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 /// <summary>
/// Adds a cancellation point to the chain of promises. When a cancellation request reaches the cancellation point the operation is
/// cancelled immediatelly, and the request is passed towards. If the operation at the higher level can not be cancelled is't result
/// will be collected with <paramref name="cleanup"/> callback.
/// </summary>
/// <typeparam name="T">The type of the promise result.</typeparam>
/// <param name="that">The promise to which the cancellation point should be attached.</param>
/// <param name="cleanup">The callback which is used to cleanup the result of the operation if the cancellation point is cancelled already.</param>
/// <returns>The promise</returns>
public static IPromise<T> CancellationPoint<T>(this IPromise<T> that, Action<T> cleanup) {
var meduim = new Promise<T>();
that.On(meduim.Resolve, meduim.Reject, meduim.CancelOperation);
meduim.CancellationRequested(that.Cancel);
meduim.CancellationRequested(meduim.CancelOperation);
if (cleanup != null)
meduim.On((Action<T>)null, null, (e) => {
that.On(cleanup);
});
return meduim;
}
public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult, T> callback) {
cin
major refactoring, added tasks support
r75 Safe.ArgumentNotNull(that, "that");
Safe.ArgumentNotNull(callback, "callback");
cin
minor fixes
r109 var op = TraceContext.Instance.CurrentOperation;
cin
major refactoring, added tasks support
r75 return ar => {
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 TraceContext.Instance.EnterLogicalOperation(op, false);
cin
major refactoring, added tasks support
r75 try {
that.Resolve(callback(ar));
} catch (Exception err) {
that.Reject(err);
cin
minor fixes
r109 } finally {
TraceContext.Instance.Leave();
cin
major refactoring, added tasks support
r75 }
};
}
cin
added promise timeout helper
r110
cin
runnable component, work in progress
r185 static void CancelByTimeoutCallback(object cookie) {
((ICancellable)cookie).Cancel(new TimeoutException());
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 }
/// <summary>
cin
added promise timeout helper
r110 /// Cancells promise after the specified timeout is elapsed.
/// </summary>
/// <param name="that">The promise to cancel on timeout.</param>
/// <param name="milliseconds">The timeout in milliseconds.</param>
/// <typeparam name="TPromise">The 1st type parameter.</typeparam>
public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
Safe.ArgumentNotNull(that, "that");
cin
runnable component, work in progress
r185 var timer = new Timer(CancelByTimeoutCallback, that, milliseconds, -1);
cin
added promise timeout helper
r110 that.On(timer.Dispose, PromiseEventType.All);
return that;
}
cin
Promises rewritten, added improved version of AsyncQueue
r119
cin
Added ResetState to RunnableComponent to reset in case of failure...
r205 public static IPromise PromiseAll(this IEnumerable<IPromise> that) {
Safe.ArgumentNotNull(that, "that");
return PromiseAll(that.ToList());
}
public static IPromise<T[]> PromiseAll<T>(this IEnumerable<IPromise<T>> that) {
cin
RunnableComponent.Dispose(bool,Exception) changed to standart Dispose(bool)...
r208 return PromiseAll(that, null);
}
public static IPromise<T[]> PromiseAll<T>(this IEnumerable<IPromise<T>> that, Action<T> cleanup) {
cin
Added ResetState to RunnableComponent to reset in case of failure...
r205 Safe.ArgumentNotNull(that, "that");
cin
RunnableComponent.Dispose(bool,Exception) changed to standart Dispose(bool)...
r208 return PromiseAll(that.ToList(), cleanup);
cin
Added ResetState to RunnableComponent to reset in case of failure...
r205 }
public static IPromise PromiseAll(this ICollection<IPromise> that) {
cin
Promises rewritten, added improved version of AsyncQueue
r119 Safe.ArgumentNotNull(that, "that");
int count = that.Count;
cin
major update, added Drain mathod to AsyncQueue class
r124 int errors = 0;
cin
Promises rewritten, added improved version of AsyncQueue
r119 var medium = new Promise();
cin
shared locks + tests
r136 if (count == 0) {
medium.Resolve();
return medium;
}
cin
major update, added Drain mathod to AsyncQueue class
r124 medium.On(() => {
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 foreach (var p2 in that)
cin
major update, added Drain mathod to AsyncQueue class
r124 p2.Cancel();
}, PromiseEventType.ErrorOrCancel);
cin
Promises rewritten, added improved version of AsyncQueue
r119 foreach (var p in that)
p.On(
() => {
if (Interlocked.Decrement(ref count) == 0)
medium.Resolve();
},
error => {
cin
major update, added Drain mathod to AsyncQueue class
r124 if (Interlocked.Increment(ref errors) == 1)
medium.Reject(
new Exception("The dependency promise is failed", error)
);
cin
Promises rewritten, added improved version of AsyncQueue
r119 },
cin
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
r138 reason => {
cin
major update, added Drain mathod to AsyncQueue class
r124 if (Interlocked.Increment(ref errors) == 1)
cin
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
r138 medium.Cancel(
cin
major update, added Drain mathod to AsyncQueue class
r124 new Exception("The dependency promise is cancelled")
);
cin
Promises rewritten, added improved version of AsyncQueue
r119 }
);
return medium;
}
cin
major update, added Drain mathod to AsyncQueue class
r124
cin
RunnableComponent.Dispose(bool,Exception) changed to standart Dispose(bool)...
r208 public static IPromise<T[]> PromiseAll<T>(this ICollection<IPromise<T>> that) {
return PromiseAll(that, null);
}
/// <summary>
/// Creates a new promise which will be satisfied when all promises are satisfied.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="that"></param>
/// <param name="cleanup">A callback used to cleanup already resolved promises in case of an error</param>
/// <returns></returns>
public static IPromise<T[]> PromiseAll<T>(this ICollection<IPromise<T>> that, Action<T> cleanup) {
cin
Bound promise to CancellationToken...
r209 Safe.ArgumentNotNull(that, "that");
cin
RunnableComponent.Dispose(bool,Exception) changed to standart Dispose(bool)...
r208 int count = that.Count;
cin
major update, added Drain mathod to AsyncQueue class
r124
cin
RunnableComponent.Dispose(bool,Exception) changed to standart Dispose(bool)...
r208 if (count == 0)
return Promise<T[]>.FromResult(new T[0]);
cin
major update, added Drain mathod to AsyncQueue class
r124 int errors = 0;
var medium = new Promise<T[]>();
var results = new T[that.Count];
medium.On(() => {
cin
RunnableComponent.Dispose(bool,Exception) changed to standart Dispose(bool)...
r208 foreach (var p2 in that) {
p2.Cancel();
if (cleanup != null)
p2.On(cleanup);
}
cin
major update, added Drain mathod to AsyncQueue class
r124 }, PromiseEventType.ErrorOrCancel);
int i = 0;
foreach (var p in that) {
var idx = i;
p.On(
x => {
results[idx] = x;
if (Interlocked.Decrement(ref count) == 0)
medium.Resolve(results);
},
error => {
if (Interlocked.Increment(ref errors) == 1)
medium.Reject(
new Exception("The dependency promise is failed", error)
);
},
cin
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
r138 reason => {
cin
major update, added Drain mathod to AsyncQueue class
r124 if (Interlocked.Increment(ref errors) == 1)
cin
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
r138 medium.Cancel(
new Exception("The dependency promise is cancelled", reason)
cin
major update, added Drain mathod to AsyncQueue class
r124 );
}
);
i++;
}
return medium;
}
cin
RC: cancellation support for promises + tests
r145
public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
Safe.ArgumentNotNull(that, "that");
cin
fixed promises cancellation
r149 var d = new ActionTask(success, error, cancel, false);
cin
RC: cancellation support for promises + tests
r145 that.On(d.Resolve, d.Reject, d.CancelOperation);
cin
runnable component, work in progress
r185 d.CancellationRequested(that.Cancel);
cin
RC: cancellation support for promises + tests
r145 return d;
}
public static IPromise Then(this IPromise that, Action success, Action<Exception> error) {
return Then(that, success, error, null);
}
public static IPromise Then(this IPromise that, Action success) {
return Then(that, success, null, null);
}
public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
Safe.ArgumentNotNull(that, "that");
cin
fixed promises cancellation
r149 var d = new FuncTask<T>(success, error, cancel, false);
cin
RC: cancellation support for promises + tests
r145 that.On(d.Resolve, d.Reject, d.CancelOperation);
cin
runnable component, work in progress
r185 d.CancellationRequested(that.Cancel);
cin
RC: cancellation support for promises + tests
r145 return d;
}
public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error) {
return Then(that, success, error, null);
}
public static IPromise<T> Then<T>(this IPromise that, Func<T> success) {
return Then(that, success, null, null);
}
public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
Safe.ArgumentNotNull(that, "that");
cin
Bound promise to CancellationToken...
r209 Safe.ArgumentNotNull(success, "success");
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 var d = new FuncTask<T, T2>(success, error, cancel, false);
cin
RC: cancellation support for promises + tests
r145 that.On(d.Resolve, d.Reject, d.CancelOperation);
cin
runnable component, work in progress
r185 d.CancellationRequested(that.Cancel);
cin
RC: cancellation support for promises + tests
r145 return d;
}
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 public static IPromise<T> Then<T>(this IPromise<T> that, Action<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
Safe.ArgumentNotNull(that, "that");
var d = new FuncTask<T, T>(
x => {
success(x);
return x;
},
cin
Bound promise to CancellationToken...
r209 error,
cancel,
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 false
);
that.On(d.Resolve, d.Reject, d.CancelOperation);
d.CancellationRequested(that.Cancel);
return d;
}
public static IPromise<T> Then<T>(this IPromise<T> that, Action<T> success, Func<Exception, T> error) {
return Then(that, success, error, null);
}
public static IPromise<T> Then<T>(this IPromise<T> that, Action<T> success) {
return Then(that, success, null, null);
}
cin
RC: cancellation support for promises + tests
r145 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error) {
return Then(that, success, error, null);
}
public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success) {
return Then(that, success, null, null);
}
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 public static IPromise<T> Always<T>(this IPromise<T> that, Action handler) {
Func<Exception, T> errorOrCancel;
if (handler != null)
errorOrCancel = e => {
handler();
throw new PromiseTransientException(e);
};
else
errorOrCancel = null;
return Then(
that,
x => {
handler();
return x;
},
errorOrCancel,
errorOrCancel);
}
public static IPromise Always(this IPromise that, Action handler) {
Action<Exception> errorOrCancel;
if (handler != null)
errorOrCancel = e => {
handler();
throw new PromiseTransientException(e);
};
else
errorOrCancel = null;
return Then(
that,
handler,
errorOrCancel,
errorOrCancel);
}
public static IPromise Error(this IPromise that, Action<Exception> handler, bool handleCancellation) {
Action<Exception> errorOrCancel;
if (handler != null)
errorOrCancel = e => {
handler(e);
throw new PromiseTransientException(e);
};
else
errorOrCancel = null;
return Then(that, null, errorOrCancel, handleCancellation ? errorOrCancel : null);
}
public static IPromise Error(this IPromise that, Action<Exception> handler) {
return Error(that, handler, false);
}
public static IPromise<T> Error<T>(this IPromise<T> that, Action<Exception> handler, bool handleCancellation) {
Func<Exception, T> errorOrCancel;
if (handler != null)
errorOrCancel = e => {
handler(e);
throw new PromiseTransientException(e);
};
else
errorOrCancel = null;
return Then(that, null, errorOrCancel, handleCancellation ? errorOrCancel : null);
}
public static IPromise<T> Error<T>(this IPromise<T> that, Action<Exception> handler) {
return Error(that, handler, false);
}
cin
RC: cancellation support for promises + tests
r145 #region chain traits
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) {
cin
RC: cancellation support for promises + tests
r145 Safe.ArgumentNotNull(that, "that");
cin
fixed promises cancellation
r149 var d = new ActionChainTask(success, error, cancel, false);
cin
RC: cancellation support for promises + tests
r145 that.On(d.Resolve, d.Reject, d.CancelOperation);
cin
runnable component, work in progress
r185 d.CancellationRequested(that.Cancel);
cin
RC: cancellation support for promises + tests
r145 return d;
}
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception, IPromise> error) {
cin
RC: cancellation support for promises + tests
r145 return Chain(that, success, error, null);
}
public static IPromise Chain(this IPromise that, Func<IPromise> success) {
return Chain(that, success, null, null);
}
public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
Safe.ArgumentNotNull(that, "that");
cin
fixed promises cancellation
r149 var d = new FuncChainTask<T>(success, error, cancel, false);
cin
RC: cancellation support for promises + tests
r145 that.On(d.Resolve, d.Reject, d.CancelOperation);
if (success != null)
d.CancellationRequested(that.Cancel);
return d;
}
public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error) {
return Chain(that, success, error, null);
}
public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success) {
return Chain(that, success, null, null);
}
public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
Safe.ArgumentNotNull(that, "that");
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 var d = new FuncChainTask<T, T2>(success, error, cancel, false);
cin
RC: cancellation support for promises + tests
r145 that.On(d.Resolve, d.Reject, d.CancelOperation);
if (success != null)
d.CancellationRequested(that.Cancel);
return d;
}
public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error) {
return Chain(that, success, error, null);
}
public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success) {
return Chain(that, success, null, null);
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 }
cin
RC: cancellation support for promises + tests
r145 #endregion
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207
cin
Bound promise to CancellationToken...
r209 public static IPromise<T2> Guard<T, T2>(this IPromise<T> that, Func<IPromise<T>, IPromise<T2>> continuation, Action<T> cleanup) {
Safe.ArgumentNotNull(that, "that");
Safe.ArgumentNotNull(continuation, "continuation");
return continuation(that).Error((err) => {
that.On(cleanup);
}, true);
}
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207
#if NET_4_5
cin
Added support for 'await' operator to promises
r151 public static PromiseAwaiter<T> GetAwaiter<T>(this IPromise<T> that) {
cin
major refactoring, added tasks support
r75 Safe.ArgumentNotNull(that, "that");
cin
Added support for 'await' operator to promises
r151 return new PromiseAwaiter<T>(that);
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 }
cin
RunnableComponent.Dispose(bool,Exception) changed to standart Dispose(bool)...
r208 public static PromiseAwaiter GetAwaiter(this IPromise that) {
Safe.ArgumentNotNull(that, "that");
return new PromiseAwaiter(that);
}
cin
Bound promise to CancellationToken...
r209 public static IPromise BoundCancellationToken(this IPromise that, CancellationToken ct) {
Safe.ArgumentNotNull(that, "that");
ct.Register(that.Cancel);
return that.Then(null, null, (err) => {
ct.ThrowIfCancellationRequested();
throw new PromiseTransientException(err);
});
}
public static IPromise<T> BoundCancellationToken<T>(this IPromise<T> that, CancellationToken ct) {
Safe.ArgumentNotNull(that, "that");
ct.Register(that.Cancel);
return that.Then(null, null, (err) => {
ct.ThrowIfCancellationRequested();
throw new PromiseTransientException(err);
});
}
cin
added Safe.DispatchEvent() a legacy equivalent for '?.Invoke()'...
r207 #endif
cin
promises refactoring
r72 }
}