# HG changeset patch # User cin # Date 2015-05-06 14:11:27 # Node ID eb793fbbe4ea5d051ef9b4fa446cb957826eda9c # Parent e6d4b41f0101bb778d4e6eabb0b0d3af6bd90337 fixed promises cancellation diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -746,33 +746,31 @@ namespace Implab.Test { public void ChainedCancel2Test() { // при отмене цепочки обещаний, вложенные операции также должны отменяться var pSurvive = new Promise(); - var hemStarted = new ManualResetEvent(false); + var hemStarted = new Signal(); var p = PromiseHelper .Sleep(1, "Hi, HAL!") - .Chain(x => { + .Chain(() => { hemStarted.Set(); // запускаем две асинхронные операции var result = PromiseHelper - .Sleep(100000000, "HEM ENABLED!!!") - .Then(s => { - pSurvive.Resolve(false); - return s; - }); + .Sleep(2000, "HEM ENABLED!!!") + .Then(() => pSurvive.Resolve(false)); result .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled); - + return result; }); - hemStarted.WaitOne(); + hemStarted.Wait(); p.Cancel(); try { p.Join(); + Assert.Fail(); } catch (OperationCanceledException) { - Assert.IsTrue(pSurvive.Join()); } + Assert.IsTrue(pSurvive.Join()); } [TestMethod] diff --git a/Implab.Test/CancelationTests.cs b/Implab.Test/CancelationTests.cs --- a/Implab.Test/CancelationTests.cs +++ b/Implab.Test/CancelationTests.cs @@ -48,7 +48,7 @@ namespace Implab.Test { bool run = false; var task = new ActionTask(() => { run = true; - }, null, null); + }, null, null, true); // request cancelation task.Cancel(); @@ -65,7 +65,7 @@ namespace Implab.Test { var task = new ActionTask(() => { started.Set(); finish.Wait(); - }, null, null); + }, null, null, true); AsyncPool.RunThread(() => { task.Resolve(); @@ -85,14 +85,18 @@ namespace Implab.Test { [TestMethod] public void CancelTaskChainFromBottom() { + var started = new Signal(); var check1 = new Signal(); var requested = false; var p1 = AsyncPool.RunThread(token => { token.CancellationRequested(reason => requested = true); + started.Set(); check1.Wait(); token.CancelOperationIfRequested(); }); + started.Wait(); + var p2 = p1.Then(() => { }); diff --git a/Implab.Test/PromiseHelper.cs b/Implab.Test/PromiseHelper.cs --- a/Implab.Test/PromiseHelper.cs +++ b/Implab.Test/PromiseHelper.cs @@ -4,7 +4,8 @@ using System.Threading; namespace Implab.Test { static class PromiseHelper { public static IPromise Sleep(int timeout, T retVal) { - return AsyncPool.Invoke(() => { + return AsyncPool.Invoke((ct) => { + ct.CancellationRequested(ct.CancelOperation); Thread.Sleep(timeout); return retVal; }); diff --git a/Implab/ActionChainTask.cs b/Implab/ActionChainTask.cs --- a/Implab/ActionChainTask.cs +++ b/Implab/ActionChainTask.cs @@ -4,14 +4,16 @@ namespace Implab { public class ActionChainTask : ActionChainTaskBase, IDeferred { readonly Func m_task; - public ActionChainTask(Func task, Func error, Func cancel) : base(error,cancel) { + public ActionChainTask(Func task, Func error, Func cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; } public void Resolve() { if (m_task != null && LockCancelation()) { try { - Observe(m_task()); + var p = m_task(); + p.On(SetResult, HandleErrorInternal, SetCancelled); + CancellationRequested(p.Cancel); } catch(Exception err) { HandleErrorInternal(err); } diff --git a/Implab/ActionChainTaskBase.cs b/Implab/ActionChainTaskBase.cs --- a/Implab/ActionChainTaskBase.cs +++ b/Implab/ActionChainTaskBase.cs @@ -8,9 +8,11 @@ namespace Implab { int m_cancelationLock; - protected ActionChainTaskBase( Func error, Func cancel) { + protected ActionChainTaskBase(Func error, Func cancel, bool autoCancellable) { m_error = error; m_cancel = cancel; + if (autoCancellable) + CancellationRequested(CancelOperation); } public void Reject(Exception error) { @@ -21,21 +23,26 @@ namespace Implab { public override void CancelOperation(Exception reason) { - if (m_cancel != null && LockCancelation()) { - try { - Observe(m_cancel(reason)); - } catch(Exception err) { - HandleErrorInternal(err); + if (LockCancelation()) { + if (m_cancel != null) { + try { + m_cancel(reason).On(SetResult, SetError, SetCancelled); + } catch (Exception err) { + HandleErrorInternal(err); + } + } else { + SetCancelled(reason); } } - } protected void HandleErrorInternal(Exception error) { if (m_error != null) { try { - Observe(m_error(error)); - } catch(Exception err) { + var p = m_error(error); + p.On(SetResult,SetError,SetCancelled); + CancellationRequested(p.Cancel); + } catch (Exception err) { SetError(err); } } else { @@ -43,17 +50,6 @@ namespace Implab { } } - protected void Observe(IPromise operation) { - if (operation == null) - throw new NullReferenceException("The task returned null promise"); - - // pass operation results to the current promise - operation.On(SetResult, SetError, SetCancelled); - - // pass the cancelation request - CancellationRequested(operation.Cancel); - } - protected bool LockCancelation() { return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0); } diff --git a/Implab/ActionChainTaskT.cs b/Implab/ActionChainTaskT.cs --- a/Implab/ActionChainTaskT.cs +++ b/Implab/ActionChainTaskT.cs @@ -4,14 +4,16 @@ namespace Implab { public class ActionChainTask : ActionChainTaskBase, IDeferred { readonly Func m_task; - public ActionChainTask(Func task, Func error, Func cancel) : base(error,cancel) { + public ActionChainTask(Func task, Func error, Func cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; } public void Resolve(T value) { if (m_task != null && LockCancelation()) { try { - Observe(m_task(value)); + var p = m_task(value); + p.On(SetResult, HandleErrorInternal, SetCancelled); + CancellationRequested(p.Cancel); } catch(Exception err) { HandleErrorInternal(err); } diff --git a/Implab/ActionTask.cs b/Implab/ActionTask.cs --- a/Implab/ActionTask.cs +++ b/Implab/ActionTask.cs @@ -3,7 +3,7 @@ namespace Implab { public class ActionTask : ActionTaskBase, IDeferred { readonly Action m_task; - public ActionTask(Action task, Action error, Action cancel) : base(error,cancel) { + public ActionTask(Action task, Action error, Action cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; } diff --git a/Implab/ActionTaskBase.cs b/Implab/ActionTaskBase.cs --- a/Implab/ActionTaskBase.cs +++ b/Implab/ActionTaskBase.cs @@ -8,9 +8,11 @@ namespace Implab { int m_cancelationLock; - protected ActionTaskBase( Action error, Action cancel) { + protected ActionTaskBase( Action error, Action cancel, bool autoCancellable) { m_error = error; m_cancel = cancel; + if (autoCancellable) + CancellationRequested(CancelOperation); } public void Reject(Exception error) { diff --git a/Implab/ActionTaskT.cs b/Implab/ActionTaskT.cs --- a/Implab/ActionTaskT.cs +++ b/Implab/ActionTaskT.cs @@ -3,7 +3,7 @@ namespace Implab { public class ActionTask : ActionTaskBase, IDeferred { readonly Action m_task; - public ActionTask(Action task, Action error, Action cancel) : base(error,cancel) { + public ActionTask(Action task, Action error, Action cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; } diff --git a/Implab/FuncChainTask.cs b/Implab/FuncChainTask.cs --- a/Implab/FuncChainTask.cs +++ b/Implab/FuncChainTask.cs @@ -4,14 +4,17 @@ namespace Implab { public class FuncChainTask : FuncChainTaskBase, IDeferred { readonly Func> m_task; - public FuncChainTask(Func> task, Func> error, Func> cancel) : base(error, cancel){ + public FuncChainTask(Func> task, Func> error, Func> cancel, bool autoCancellable) + : base(error, cancel, autoCancellable) { m_task = task; } public void Resolve() { if (m_task != null && LockCancelation()) { try { - Observe(m_task()); + var operation = m_task(); + operation.On(SetResult, HandleErrorInternal, SetCancelled); + CancellationRequested(operation.Cancel); } catch (Exception err) { HandleErrorInternal(err); } diff --git a/Implab/FuncChainTaskBase.cs b/Implab/FuncChainTaskBase.cs --- a/Implab/FuncChainTaskBase.cs +++ b/Implab/FuncChainTaskBase.cs @@ -8,9 +8,11 @@ namespace Implab { int m_cancelationLock; - protected FuncChainTaskBase( Func> error, Func> cancel) { + protected FuncChainTaskBase( Func> error, Func> cancel, bool autoCancellable) { m_error = error; m_cancel = cancel; + if (autoCancellable) + CancellationRequested(CancelOperation); } public void Reject(Exception error) { @@ -19,11 +21,15 @@ namespace Implab { } public override void CancelOperation(Exception reason) { - if (m_cancel != null && LockCancelation()) { - try { - Observe(m_cancel(reason)); - } catch(Exception err) { - HandleErrorInternal(err); + if (LockCancelation()) { + if (m_cancel != null) { + try { + m_cancel(reason).On(SetResult, HandleErrorInternal, SetCancelled); + } catch (Exception err) { + HandleErrorInternal(err); + } + } else { + SetCancelled(reason); } } @@ -32,7 +38,10 @@ namespace Implab { protected void HandleErrorInternal(Exception error) { if (m_error != null) { try { - Observe(m_error(error)); + var operation = m_error(error); + + operation.On(SetResult, SetError, SetCancelled); + CancellationRequested(operation.Cancel); } catch(Exception err) { SetError(err); } @@ -41,17 +50,6 @@ namespace Implab { } } - protected void Observe(IPromise operation) { - if (operation == null) - throw new NullReferenceException("The task returned null promise"); - - // pass operation results to the current promise - operation.On(SetResult, SetError, SetCancelled); - - // pass the cancelation request - CancellationRequested(operation.Cancel); - } - protected bool LockCancelation() { return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0); } diff --git a/Implab/FuncChainTaskT.cs b/Implab/FuncChainTaskT.cs --- a/Implab/FuncChainTaskT.cs +++ b/Implab/FuncChainTaskT.cs @@ -4,14 +4,16 @@ namespace Implab { public class FuncChainTask : FuncChainTaskBase, IDeferred { readonly Func> m_task; - public FuncChainTask(Func> task, Func> error, Func> cancel) : base(error, cancel){ + public FuncChainTask(Func> task, Func> error, Func> cancel, bool autoCancellable) : base(error, cancel, autoCancellable){ m_task = task; } public void Resolve(TArg value) { if (m_task != null && LockCancelation()) { try { - Observe(m_task(value)); + var operation = m_task(value); + operation.On(SetResult, HandleErrorInternal, SetCancelled); + CancellationRequested(operation.Cancel); } catch (Exception err) { HandleErrorInternal(err); } diff --git a/Implab/FuncTask.cs b/Implab/FuncTask.cs --- a/Implab/FuncTask.cs +++ b/Implab/FuncTask.cs @@ -5,7 +5,7 @@ namespace Implab { public class FuncTask : FuncTaskBase, IDeferred { readonly Func m_task; - public FuncTask(Func task, Func error, Func cancel) : base(error,cancel) { + public FuncTask(Func task, Func error, Func cancel, bool autoCancellable) : base(error, cancel, autoCancellable) { m_task = task; } diff --git a/Implab/FuncTaskBase.cs b/Implab/FuncTaskBase.cs --- a/Implab/FuncTaskBase.cs +++ b/Implab/FuncTaskBase.cs @@ -8,9 +8,11 @@ namespace Implab { int m_cancelationLock; - protected FuncTaskBase( Func error, Func cancel) { + protected FuncTaskBase( Func error, Func cancel, bool autoCancellable) { m_error = error; m_cancel = cancel; + if (autoCancellable) + CancellationRequested(CancelOperation); } public void Reject(Exception error) { diff --git a/Implab/FuncTaskT.cs b/Implab/FuncTaskT.cs --- a/Implab/FuncTaskT.cs +++ b/Implab/FuncTaskT.cs @@ -4,7 +4,7 @@ namespace Implab { public class FuncTask : FuncTaskBase, IDeferred { readonly Func m_task; - public FuncTask(Func task, Func error,Func cancel) : base(error,cancel) { + public FuncTask(Func task, Func error,Func cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; } diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs +++ b/Implab/Parallels/WorkerPool.cs @@ -30,7 +30,49 @@ namespace Implab.Parallels { InitPool(); } - public Promise Invoke(Func task) { + public IPromise Invoke(Func task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new FuncTask(task, null, null, true); + + var lop = TraceContext.Instance.CurrentOperation; + + EnqueueTask(delegate { + TraceContext.Instance.EnterLogicalOperation(lop, false); + + promise.Resolve(); + + TraceContext.Instance.Leave(); + }); + + return promise; + } + + public IPromise Invoke(Action task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new ActionTask(task, null, null, true); + + var lop = TraceContext.Instance.CurrentOperation; + + EnqueueTask(delegate { + TraceContext.Instance.EnterLogicalOperation(lop, false); + + promise.Resolve(); + + TraceContext.Instance.Leave(); + }); + + return promise; + } + + public IPromise Invoke(Func task) { if (task == null) throw new ArgumentNullException("task"); if (IsDisposed) @@ -43,7 +85,35 @@ namespace Implab.Parallels { EnqueueTask(delegate { TraceContext.Instance.EnterLogicalOperation(lop, false); try { - promise.Resolve(task()); + if (!promise.CancelOperationIfRequested()) + promise.Resolve(task(promise)); + } catch (Exception e) { + promise.Reject(e); + } finally { + TraceContext.Instance.Leave(); + } + }); + + return promise; + } + + public IPromise Invoke(Action task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new Promise(); + + var lop = TraceContext.Instance.CurrentOperation; + + EnqueueTask(delegate { + TraceContext.Instance.EnterLogicalOperation(lop, false); + try { + if (!promise.CancelOperationIfRequested()) { + task(promise); + promise.Resolve(); + } } catch (Exception e) { promise.Reject(e); } finally { diff --git a/Implab/PromiseExtensions.cs b/Implab/PromiseExtensions.cs --- a/Implab/PromiseExtensions.cs +++ b/Implab/PromiseExtensions.cs @@ -178,7 +178,7 @@ namespace Implab { public static IPromise Then(this IPromise that, Action success, Action error, Action cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new ActionTask(success, error, cancel); + var d = new ActionTask(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel); @@ -196,7 +196,7 @@ namespace Implab { public static IPromise Then(this IPromise that, Func success, Func error, Func cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new FuncTask(success, error, cancel); + var d = new FuncTask(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel); @@ -213,7 +213,7 @@ namespace Implab { public static IPromise Then(this IPromise that, Func success, Func error, Func cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new FuncTask(success, error, cancel); + var d = new FuncTask(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel); @@ -232,7 +232,7 @@ namespace Implab { public static IPromise Chain(this IPromise that, Func success, Func error, Func cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new ActionChainTask(success, error, cancel); + var d = new ActionChainTask(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel); @@ -250,7 +250,7 @@ namespace Implab { public static IPromise Chain(this IPromise that, Func> success, Func> error, Func> cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new FuncChainTask(success, error, cancel); + var d = new FuncChainTask(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel); @@ -267,7 +267,7 @@ namespace Implab { public static IPromise Chain(this IPromise that, Func> success, Func> error, Func> cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new FuncChainTask(success, error, cancel); + var d = new FuncChainTask(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel);