##// END OF EJS Templates
fixed blocking queue
fixed blocking queue

File last commit:

r138:f75cfa58e3d4 v2
r139:041b77711262 v2
Show More
PromiseExtensions.cs
192 lines | 6.4 KiB | text/x-csharp | CSharpLexer
/ Implab / PromiseExtensions.cs
using System.Threading;
using System;
using Implab.Diagnostics;
using System.Collections.Generic;
#if NET_4_5
using System.Threading.Tasks;
#endif
namespace Implab {
public static class PromiseExtensions {
/// <summary>
/// Wraps the promise in a <c>SyncContextPromise&lt;T&gt;</c> bound to the
/// <see cref="SynchronizationContext"/> that is current at the time of the call.
/// </summary>
/// <remarks>
/// If there is no current synchronization context, the original promise is
/// returned unchanged. Otherwise the outcome of <paramref name="that"/>
/// (resolve, reject, cancel) is forwarded to the new promise, and cancelling
/// the new promise cancels <paramref name="that"/>.
/// NOTE(review): presumably <c>SyncContextPromise&lt;T&gt;</c> invokes its handlers
/// through the captured context - confirm against its implementation.
/// </remarks>
/// <param name="that">The source promise; validated with <c>Safe.ArgumentNotNull</c>.</param>
/// <typeparam name="T">The type of the promise result.</typeparam>
/// <returns>The context-bound promise, or <paramref name="that"/> when no context is current.</returns>
public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
    Safe.ArgumentNotNull(that, "that");

    var current = SynchronizationContext.Current;
    if (current != null) {
        var dispatched = new SyncContextPromise<T>(current);

        // Cancelling the returned promise propagates back to the source.
        dispatched.On(that.Cancel, PromiseEventType.Cancelled);

        // Whatever happens to the source is mirrored on the returned promise.
        that.On(dispatched.Resolve, dispatched.Reject, dispatched.Cancel);

        return dispatched;
    }

    // No ambient context: hand the source promise back untouched.
    return that;
}
/// <summary>
/// Wraps the promise in a <c>SyncContextPromise&lt;T&gt;</c> bound to the
/// specified <see cref="SynchronizationContext"/>.
/// </summary>
/// <remarks>
/// The outcome of <paramref name="that"/> (resolve, reject, cancel) is forwarded
/// to the new promise, and cancelling the new promise cancels <paramref name="that"/>.
/// NOTE(review): presumably <c>SyncContextPromise&lt;T&gt;</c> invokes its handlers
/// through <paramref name="context"/> - confirm against its implementation.
/// </remarks>
/// <param name="that">The source promise; validated with <c>Safe.ArgumentNotNull</c>.</param>
/// <param name="context">The target context; validated with <c>Safe.ArgumentNotNull</c>.</param>
/// <typeparam name="T">The type of the promise result.</typeparam>
/// <returns>The newly created context-bound promise.</returns>
public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
    Safe.ArgumentNotNull(that, "that");
    Safe.ArgumentNotNull(context, "context");

    var dispatched = new SyncContextPromise<T>(context);

    // Cancelling the returned promise propagates back to the source.
    dispatched.On(that.Cancel, PromiseEventType.Cancelled);

    // Whatever happens to the source is mirrored on the returned promise.
    that.On(dispatched.Resolve, dispatched.Reject, dispatched.Cancel);

    return dispatched;
}
/// <summary>
/// Arranges for <paramref name="cleanup"/> to be attached to <paramref name="head"/>
/// when <paramref name="that"/> is cancelled.
/// </summary>
/// <remarks>
/// On the <c>PromiseEventType.Cancelled</c> event of <paramref name="that"/> the
/// method calls <c>head.On(cleanup)</c>.
/// NOTE(review): presumably this lets the caller release a result of
/// <paramref name="head"/> that nobody will consume after the cancellation - confirm
/// against the callers.
/// </remarks>
/// <returns>The same <paramref name="that"/> instance, to allow chaining.</returns>
/// <param name="that">The promise whose cancellation is observed.</param>
/// <param name="head">The promise to which <paramref name="cleanup"/> is attached.</param>
/// <param name="cleanup">The handler passed to <c>head.On</c>; it is not null-checked here.</param>
/// <typeparam name="TPromise">The concrete type of <paramref name="that"/>.</typeparam>
/// <typeparam name="T">The result type of <paramref name="head"/>.</typeparam>
public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
    Safe.ArgumentNotNull(that, "that");
    Safe.ArgumentNotNull(head, "head");

    that.On(
        () => {
            // The subscription to head is made lazily, only once 'that' is cancelled.
            head.On(cleanup);
        },
        PromiseEventType.Cancelled
    );

    return that;
}
/// <summary>
/// Creates an <see cref="System.AsyncCallback"/> that completes the promise with
/// the value produced by <paramref name="callback"/>.
/// </summary>
/// <remarks>
/// The current trace operation is captured when this method is called and is
/// entered again around the callback; <c>TraceContext.Instance.Leave()</c> is
/// always called afterwards. Any exception thrown by <paramref name="callback"/>
/// (or by <c>Resolve</c>) is passed to <c>that.Reject</c>.
/// </remarks>
/// <param name="that">The promise to resolve or reject.</param>
/// <param name="callback">Converts the <see cref="IAsyncResult"/> into the promise result.</param>
/// <typeparam name="T">The type of the promise result.</typeparam>
/// <returns>A delegate suitable for passing to a Begin/End style asynchronous API.</returns>
public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
    Safe.ArgumentNotNull(that, "that");
    Safe.ArgumentNotNull(callback, "callback");

    // Captured now, at creation time, so it can be re-entered when the callback fires.
    var capturedOperation = TraceContext.Instance.CurrentOperation;

    return asyncResult => {
        TraceContext.Instance.EnterLogicalOperation(capturedOperation, false);
        try {
            var result = callback(asyncResult);
            that.Resolve(result);
        } catch (Exception failure) {
            that.Reject(failure);
        } finally {
            TraceContext.Instance.Leave();
        }
    };
}
/// <summary>
/// Timer callback that cancels the object passed as the timer state.
/// </summary>
/// <param name="cookie">The timer state; it is cast to <c>ICancellable</c>.</param>
static void CancelCallback(object cookie) {
    var target = (ICancellable)cookie;
    target.Cancel();
}
/// <summary>
/// Cancels the promise after the specified timeout has elapsed.
/// </summary>
/// <remarks>
/// A one-shot <see cref="Timer"/> is started with the promise as its state, and
/// the timer is disposed on any event of the promise (<c>PromiseEventType.All</c>).
/// </remarks>
/// <param name="that">The promise to cancel on timeout.</param>
/// <param name="milliseconds">The timeout in milliseconds.</param>
/// <typeparam name="TPromise">The concrete type of the promise.</typeparam>
/// <returns>The same <paramref name="that"/> instance, to allow chaining.</returns>
public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
    Safe.ArgumentNotNull(that, "that");

    // Fully qualified because the simple name 'Timeout' refers to this method here.
    // An infinite period means the timer fires once and never repeats.
    var watchdog = new Timer(CancelCallback, that, milliseconds, System.Threading.Timeout.Infinite);

    // Release the timer as soon as the promise reaches any outcome.
    that.On(watchdog.Dispose, PromiseEventType.All);

    return that;
}
/// <summary>
/// Combines a collection of promises into a single promise that is resolved
/// once every promise in the collection has been resolved.
/// </summary>
/// <remarks>
/// <para>An empty collection yields an already resolved promise.</para>
/// <para>
/// The first dependency that fails or is cancelled rejects or cancels the
/// returned promise; the original error or cancellation reason is attached
/// as the inner exception. Later failures and cancellations are ignored.
/// </para>
/// <para>
/// When the returned promise is rejected or cancelled, <c>Cancel()</c> is called
/// on every promise in the collection.
/// </para>
/// </remarks>
/// <param name="that">The promises to wait for; validated with <c>Safe.ArgumentNotNull</c>.</param>
/// <returns>The promise representing the whole bundle.</returns>
public static IPromise Bundle(this ICollection<IPromise> that) {
    Safe.ArgumentNotNull(that, "that");

    int count = that.Count;
    int errors = 0;
    var medium = new Promise();

    // Nothing to wait for: complete immediately.
    if (count == 0) {
        medium.Resolve();
        return medium;
    }

    // If the bundle fails or is cancelled, cancel every dependency.
    medium.On(() => {
        foreach (var p2 in that) {
            p2.Cancel();
        }
    }, PromiseEventType.ErrorOrCancel);

    foreach (var p in that) {
        p.On(
            () => {
                // The last dependency to succeed completes the bundle.
                if (Interlocked.Decrement(ref count) == 0) {
                    medium.Resolve();
                }
            },
            error => {
                // Failures and cancellations share one counter: only the first is reported.
                if (Interlocked.Increment(ref errors) == 1) {
                    medium.Reject(
                        new Exception("The dependency promise is failed", error)
                    );
                }
            },
            reason => {
                if (Interlocked.Increment(ref errors) == 1) {
                    // Keep the cancellation reason as the inner exception,
                    // the same way the generic Bundle<T> overload does.
                    medium.Cancel(
                        new Exception("The dependency promise is cancelled", reason)
                    );
                }
            }
        );
    }

    return medium;
}
/// <summary>
/// Combines a collection of promises into a single promise that is resolved
/// with the array of their results once every promise has been resolved.
/// </summary>
/// <remarks>
/// <para>
/// The results are stored in the enumeration order of the collection.
/// An empty collection yields a promise already resolved with an empty array.
/// </para>
/// <para>
/// The first dependency that fails or is cancelled rejects or cancels the
/// returned promise; the original error or cancellation reason is attached
/// as the inner exception. Later failures and cancellations are ignored.
/// </para>
/// <para>
/// When the returned promise is rejected or cancelled, <c>Cancel()</c> is called
/// on every promise in the collection.
/// </para>
/// </remarks>
/// <param name="that">The promises to wait for; validated with <c>Safe.ArgumentNotNull</c>.</param>
/// <typeparam name="T">The result type of the bundled promises.</typeparam>
/// <returns>The promise of the array of results.</returns>
public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
    Safe.ArgumentNotNull(that, "that");

    int count = that.Count;
    int errors = 0;
    var medium = new Promise<T[]>();
    var results = new T[count];

    // Nothing to wait for: without this guard no handler would ever run and
    // the returned promise would stay pending forever.
    if (count == 0) {
        medium.Resolve(results);
        return medium;
    }

    // If the bundle fails or is cancelled, cancel every dependency.
    medium.On(() => {
        foreach (var p2 in that) {
            p2.Cancel();
        }
    }, PromiseEventType.ErrorOrCancel);

    int i = 0;
    foreach (var p in that) {
        // Copy the index so each closure writes to its own slot.
        var idx = i;
        p.On(
            x => {
                results[idx] = x;
                // The last dependency to succeed completes the bundle.
                if (Interlocked.Decrement(ref count) == 0) {
                    medium.Resolve(results);
                }
            },
            error => {
                // Failures and cancellations share one counter: only the first is reported.
                if (Interlocked.Increment(ref errors) == 1) {
                    medium.Reject(
                        new Exception("The dependency promise is failed", error)
                    );
                }
            },
            reason => {
                if (Interlocked.Increment(ref errors) == 1) {
                    medium.Cancel(
                        new Exception("The dependency promise is cancelled", reason)
                    );
                }
            }
        );
        i++;
    }

    return medium;
}
#if NET_4_5
/// <summary>
/// Exposes the promise as a <see cref="Task{TResult}"/>.
/// </summary>
/// <remarks>
/// Resolution maps to <c>SetResult</c>, rejection to <c>SetException</c> and
/// cancellation to <c>SetCanceled</c>; the cancellation reason is discarded.
/// </remarks>
/// <param name="that">The promise to adapt; validated with <c>Safe.ArgumentNotNull</c>.</param>
/// <typeparam name="T">The type of the promise result.</typeparam>
/// <returns>A task that completes together with the promise.</returns>
public static Task<T> GetTask<T>(this IPromise<T> that) {
    Safe.ArgumentNotNull(that, "that");

    var completion = new TaskCompletionSource<T>();
    that.On(
        completion.SetResult,
        completion.SetException,
        cancelReason => completion.SetCanceled()
    );

    return completion.Task;
}
#endif
}
}