# HG changeset patch # User cin # Date 2015-01-11 16:13:02 # Node ID 2573b562e3284452b36869a55cfbddec843e7139 # Parent e046a94eecb108afb038bee28ded899e56a16ce7 Promises rewritten, added improved version of AsyncQueue diff --git a/Implab.Fx/ControlBoundPromise.cs b/Implab.Fx/ControlBoundPromise.cs --- a/Implab.Fx/ControlBoundPromise.cs +++ b/Implab.Fx/ControlBoundPromise.cs @@ -12,19 +12,27 @@ namespace Implab.Fx { m_target = target; } - public ControlBoundPromise(Control target, IPromise parent) - : base(parent) { - Safe.ArgumentNotNull(target, "target"); - - m_target = target; + protected override void SignalSuccess(IDeferred handler) { + if (m_target.InvokeRequired) + m_target.BeginInvoke(new Action>(base.SignalSuccess), handler); + else + base.SignalSuccess(handler); } - protected override void InvokeHandler(AbstractHandler handler) { + protected override void SignalCancelled(IDeferred handler) { if (m_target.InvokeRequired) - m_target.BeginInvoke(new Action(base.InvokeHandler), handler); + m_target.BeginInvoke(new Action>(base.SignalCancelled), handler); else - base.InvokeHandler(handler); + base.SignalCancelled(handler); } + + protected override void SignalError(IDeferred handler, Exception error) { + if (m_target.InvokeRequired) + m_target.BeginInvoke(new Action,Exception>(base.SignalError), handler, error); + else + base.SignalError(handler, error); + } + } } diff --git a/Implab.Fx/PromiseHelpers.cs b/Implab.Fx/PromiseHelpers.cs --- a/Implab.Fx/PromiseHelpers.cs +++ b/Implab.Fx/PromiseHelpers.cs @@ -27,7 +27,9 @@ namespace Implab.Fx Safe.ArgumentNotNull(that, "that"); Safe.ArgumentNotNull(ctl, "ctl"); - var directed = new ControlBoundPromise(ctl,that); + var directed = new ControlBoundPromise(ctl); + + directed.On(that.Cancel, PromiseEventType.Cancelled); that.On( directed.Resolve, diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -72,7 +72,7 @@ namespace Implab.Test { p.Cancel(); var p2 = p - .Cancelled(() => { + .Cancelled(() => { throw new ApplicationException("CANCELLED"); }) .Error(e => true); @@ -195,11 +195,11 @@ namespace Implab.Test { .Invoke(() => 1) .Then(x => Interlocked.Add(ref count, x)) .Then(x => Math.Log10(x)) - .Anyway(() => { + .On(() => { Interlocked.Decrement(ref pending); if (pending == 0) stop.Set(); - }); + }, PromiseEventType.All); } stop.WaitOne(); @@ -255,7 +255,7 @@ namespace Implab.Test { } return 1; }) - .Anyway(() => Interlocked.Decrement(ref writers)); + .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); } for (int i = 0; i < 10; i++) { @@ -269,11 +269,72 @@ namespace Implab.Test { } while (writers > 0); return 1; }) - .Anyway(() => { + .On(() => { Interlocked.Decrement(ref readers); if (readers == 0) stop.Set(); - }); + }, PromiseEventType.All); + } + + stop.WaitOne(); + + Assert.AreEqual(100000, total); + } + + [TestMethod] + public void AsyncQueueTest() { + var queue = new AsyncQueue(); + int res; + + queue.Enqueue(10); + Assert.IsTrue(queue.TryDequeue(out res)); + Assert.AreEqual(10, res); + Assert.IsFalse(queue.TryDequeue(out res)); + + for (int i = 0; i < 1000; i++) + queue.Enqueue(i); + + for (int i = 0; i < 1000; i++) { + queue.TryDequeue(out res); + Assert.AreEqual(i, res); + } + + int writers = 0; + int readers = 0; + var stop = new ManualResetEvent(false); + int total = 0; + + const int itemsPerWriter = 10000; + const int writersCount = 10; + + for (int i = 0; i < writersCount; i++) { + Interlocked.Increment(ref writers); + AsyncPool + .InvokeNewThread(() => { + for (int ii = 0; ii < itemsPerWriter; ii++) { + queue.Enqueue(1); + } + return 1; + }) + .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); + } + + for (int i = 0; i < 10; i++) { + Interlocked.Increment(ref readers); + AsyncPool + .InvokeNewThread(() => { + int t; + do { + while (queue.TryDequeue(out t)) + Interlocked.Add(ref total, t); + } while (writers > 0); + return 1; + }) + .On(() => { + Interlocked.Decrement(ref readers); + if (readers == 0) + stop.Set(); + }, PromiseEventType.All); } stop.WaitOne(); @@ -371,15 +432,15 @@ namespace Implab.Test { var step1 = PromiseHelper .Sleep(200, "Alan") - .Cancelled(() => flags[0] = true); + .On(() => flags[0] = true, PromiseEventType.Cancelled); var p = step1 .Chain(x => PromiseHelper .Sleep(200, "Hi, " + x) .Then(y => y) - .Cancelled(() => flags[1] = true) + .On(() => flags[1] = true, PromiseEventType.Cancelled) ) - .Cancelled(() => flags[2] = true); + .On(() => flags[2] = true, PromiseEventType.Cancelled); step1.Join(); p.Cancel(); try { diff --git a/Implab/AbstractPromise.cs b/Implab/AbstractPromise.cs new file mode 100644 --- /dev/null +++ b/Implab/AbstractPromise.cs @@ -0,0 +1,219 @@ +using System; +using Implab.Parallels; +using System.Threading; +using System.Reflection; + +namespace Implab { + public abstract class AbstractPromise { + + const int UNRESOLVED_SATE = 0; + const int TRANSITIONAL_STATE = 1; + const int SUCCEEDED_STATE = 2; + const int REJECTED_STATE = 3; + const int CANCELLED_STATE = 4; + + int m_state; + Exception m_error; + + readonly AsyncQueue m_handlers = new AsyncQueue(); + + #region state managment + bool BeginTransit() { + return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); + } + + void CompleteTransit(int state) { + if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) + throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); + } + + void WaitTransition() { + while (m_state == TRANSITIONAL_STATE) { + Thread.MemoryBarrier(); + } + } + + protected void BeginSetResult() { + if (!BeginTransit()) { + WaitTransition(); + if (m_state != CANCELLED_STATE) + throw new InvalidOperationException("The promise is already resolved"); + } + } + + protected void EndSetResult() { + CompleteTransit(SUCCEEDED_STATE); + OnSuccess(); + } + + + + /// + /// Выполняет обещание, сообщая об ошибке + /// + /// + /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков + /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные + /// будут проигнорированы. + /// + /// Исключение возникшее при выполнении операции + /// Данное обещание уже выполнено + protected void SetError(Exception error) { + if (BeginTransit()) { + m_error = error is PromiseTransientException ? error.InnerException : error; + CompleteTransit(REJECTED_STATE); + OnError(); + } else { + WaitTransition(); + if (m_state == SUCCEEDED_STATE) + throw new InvalidOperationException("The promise is already resolved"); + } + } + + /// + /// Отменяет операцию, если это возможно. + /// + /// Для определения была ли операция отменена следует использовать свойство . + protected void SetCancelled() { + if (BeginTransit()) { + CompleteTransit(CANCELLED_STATE); + OnCancelled(); + } + } + + protected abstract void SignalSuccess(THandler handler); + + protected abstract void SignalError(THandler handler, Exception error); + + protected abstract void SignalCancelled(THandler handler); + + void OnSuccess() { + THandler handler; + while (m_handlers.TryDequeue(out handler)) + SignalSuccess(handler); + } + + void OnError() { + THandler handler; + while (m_handlers.TryDequeue(out handler)) + SignalError(handler,m_error); + } + + void OnCancelled() { + THandler handler; + while (m_handlers.TryDequeue(out handler)) + SignalCancelled(handler); + } + + #endregion + + protected abstract void Listen(PromiseEventType events, Action handler); + + #region synchronization traits + protected void WaitResult(int timeout) { + if (!IsResolved) { + var lk = new object(); + + Listen(PromiseEventType.All, () => { + lock(lk) { + Monitor.Pulse(lk); + } + }); + + lock (lk) { + while(!IsResolved) { + if(!Monitor.Wait(lk,timeout)) + throw new TimeoutException(); + } + } + + } + switch (m_state) { + case SUCCEEDED_STATE: + return; + case CANCELLED_STATE: + throw new OperationCanceledException(); + case REJECTED_STATE: + throw new TargetInvocationException(m_error); + default: + throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); + } + } + #endregion + + #region handlers managment + + protected void AddHandler(THandler handler) { + + if (IsResolved) { + InvokeHandler(handler); + + } else { + // the promise is in the resolved state, just invoke the handler + m_handlers.Enqueue(handler); + + + if (IsResolved && m_handlers.TryDequeue(out handler)) + // if the promise have been resolved while we was adding the handler to the queue + // we can't guarantee that someone is still processing it + // therefore we need to fetch a handler from the queue and execute it + // note that fetched handler may be not the one that we have added + // even we can fetch no handlers at all :) + InvokeHandler(handler); + } + } + + protected void InvokeHandler(THandler handler) { + switch (m_state) { + case SUCCEEDED_STATE: + SignalSuccess(handler); + break; + case CANCELLED_STATE: + SignalCancelled(handler); + break; + case REJECTED_STATE: + SignalError(handler, m_error); + break; + default: + throw new Exception(String.Format("Invalid promise state {0}", m_state)); + } + } + + #endregion + + #region IPromise implementation + + public void Join(int timeout) { + WaitResult(timeout); + } + + public void Join() { + WaitResult(-1); + } + + public bool IsResolved { + get { + Thread.MemoryBarrier(); + return m_state > 1; + } + } + + public bool IsCancelled { + get { + Thread.MemoryBarrier(); + return m_state == CANCELLED_STATE; + } + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + SetCancelled(); + } + + #endregion + } +} + diff --git a/Implab/ComponentContainer.cs b/Implab/ComponentContainer.cs --- a/Implab/ComponentContainer.cs +++ b/Implab/ComponentContainer.cs @@ -18,7 +18,7 @@ namespace Implab { } bool m_disposed; - readonly MTQueue m_components = new MTQueue(); + readonly AsyncQueue m_components = new AsyncQueue(); public void Add(IDisposable item) { Safe.ArgumentNotNull(item, "item"); diff --git a/Implab/Diagnostics/Extensions.cs b/Implab/Diagnostics/Extensions.cs --- a/Implab/Diagnostics/Extensions.cs +++ b/Implab/Diagnostics/Extensions.cs @@ -1,23 +1,23 @@ -namespace Implab.Diagnostics { +using System; + +namespace Implab.Diagnostics { public static class Extensions { public static IPromise EndLogicalOperation(this IPromise promise) { Safe.ArgumentNotNull(promise, "promise"); var op = TraceContext.Instance.DetachLogicalOperation(); - return promise.Then( - x => { + return promise.On( + x => { TraceContext.Instance.EnterLogicalOperation(op,true); TraceLog.TraceInformation("promise = {0}", x); TraceLog.EndLogicalOperation(); TraceContext.Instance.Leave(); - return x; }, err =>{ TraceContext.Instance.EnterLogicalOperation(op,true); TraceLog.TraceError("promise died {0}", err); TraceLog.EndLogicalOperation(); TraceContext.Instance.Leave(); - throw new TransientPromiseException(err); }, () => { TraceContext.Instance.EnterLogicalOperation(op,true); @@ -32,11 +32,11 @@ Safe.ArgumentNotNull(promise, "promise"); var op = TraceContext.Instance.DetachLogicalOperation(); - return promise.Anyway(() => { + return promise.On(() => { TraceContext.Instance.EnterLogicalOperation(op,true); TraceLog.EndLogicalOperation(); TraceContext.Instance.Leave(); - }); + }, PromiseEventType.All); } } } diff --git a/Implab/Diagnostics/TraceContext.cs b/Implab/Diagnostics/TraceContext.cs --- a/Implab/Diagnostics/TraceContext.cs +++ b/Implab/Diagnostics/TraceContext.cs @@ -74,7 +74,7 @@ namespace Implab.Diagnostics { m_current = m_stack.Pop(); LogChannel.Default.LogEvent(new TraceEvent(TraceEventType.Leave, String.Format("{0} -> {1}", prev.Name, CurrentOperation.Name))); } else { - TraceLog.TraceWarning("Attemtp to leave the last operation context"); + TraceLog.TraceWarning("Attempt to leave the last operation context"); m_current = OperationContext.EMPTY; } } diff --git a/Implab/DisposablePool.cs b/Implab/DisposablePool.cs --- a/Implab/DisposablePool.cs +++ b/Implab/DisposablePool.cs @@ -7,7 +7,7 @@ using System.Diagnostics.CodeAnalysis; namespace Implab { public abstract class DisposablePool : IDisposable { readonly int m_size; - readonly MTQueue m_queue = new MTQueue(); + readonly AsyncQueue m_queue = new AsyncQueue(); [SuppressMessage("Microsoft.Design", "CA1000:DoNotDeclareStaticMembersOnGenericTypes")] static readonly bool _isValueType = typeof(T).IsValueType; diff --git a/Implab/IDeferred.cs b/Implab/IDeferred.cs new file mode 100644 --- /dev/null +++ b/Implab/IDeferred.cs @@ -0,0 +1,14 @@ +using System; + +namespace Implab { + /// + /// Deferred result, usually used by asynchronous services as the service part of the promise. + /// + public interface IDeferred : ICancellable { + + void Resolve(); + + void Reject(Exception error); + } +} + diff --git a/Implab/IDeferredT.cs b/Implab/IDeferredT.cs new file mode 100644 --- /dev/null +++ b/Implab/IDeferredT.cs @@ -0,0 +1,10 @@ +using System; + +namespace Implab { + public interface IDeferred : ICancellable { + void Resolve(T value); + + void Reject(Exception error); + } +} + diff --git a/Implab/IPromise.cs b/Implab/IPromise.cs --- a/Implab/IPromise.cs +++ b/Implab/IPromise.cs @@ -5,12 +5,6 @@ using System.Text; namespace Implab { public interface IPromise: ICancellable { - /// - /// Check whereather the promise has no more than one dependent promise. - /// - bool IsExclusive { - get; - } /// /// Тип результата, получаемого через данное обещание. @@ -27,44 +21,98 @@ namespace Implab { /// bool IsCancelled { get; } + /// + /// Creates a new promise dependend on the current one and resolved on + /// executing the specified handlers. + /// + /// The handler called on the successful promise completion. + /// The handler is called if an error while completing the promise occurred. + /// The handler is called in case of promise cancellation. + /// The newly created dependant promise. + /// + /// + /// If the success handler is specified the dependend promise will be resolved after the handler is + /// executed and the dependent promise will be linked to the current one, i.e. the cancellation + /// of the dependent property will lead to the cancellation of the current promise. If the + /// success handler isn't specified the dependent promise will not be linked to and + /// will not be resolved after the successfull resolution of the current one. + /// + /// + /// When the error handler is specified, the exception raised during the current promise completion + /// will be passed to it as the parameter. If the error handler returns without raising an + /// exception then the dependant promise will be resolved successfully, otherwise the exception + /// raised by the handler will be transmitted to the dependent promise. If the handler wants + /// to passthrough the original exception it needs to wrap the exception with + /// the . + /// + /// + /// If the cancelation handler is specified and the current promise is cancelled then the dependent + /// promise will be resolved after the handler is executed. If the cancelation hendler raises the + /// exception it will be passed to the dependent promise. + /// + /// IPromise Then(Action success, Action error, Action cancel); IPromise Then(Action success, Action error); IPromise Then(Action success); - IPromise Chain(Func chained, Func error, Action cancel); + IPromise Chain(Func chained, Func error, Func cancel); IPromise Chain(Func chained, Func error); IPromise Chain(Func chained); /// - /// Добавляет последнй обработчик в цепочку обещаний, не создает промежуточных обещаний. + /// Adds specified listeners to the current promise. /// - /// Success. - /// Error. - /// Cancel. - void On(Action success, Action error, Action cancel); - void On(Action success, Action error); - void On(Action success); - void On(Action success, PromiseEventType events); + /// The handler called on the successful promise completion. + /// The handler is called if an error while completing the promise occurred. + /// The handler is called in case of promise cancellation. + /// The current promise. + IPromise On(Action success, Action error, Action cancel); + IPromise On(Action success, Action error); + IPromise On(Action success); - IPromise Error(Action error); /// - /// Обрабатывает либо ошибку, либо результат, либо отмену. + /// Adds specified listeners to the current promise. + /// + /// The handler called on the specified events. + /// The combination of flags denoting the events for which the + /// handler shoud be called. + /// The current promise. + IPromise On(Action handler, PromiseEventType events); + + /// + /// Adds the specified error handler to the current promise + /// and creates the new dependant promise. /// - /// Обработчик. - /// После обработке ошибки, она передается дальше. + /// + /// The error handler. If the error handler returns without + /// an error the dependant promise will be successfully resolved. + /// + /// + /// The new dependant promise which will be resolved after the error + /// handler is executed. + /// + /// + /// The successfull result of the current promise will be ignored. + /// + IPromise Error(Action error); + /// - /// Обрабатывает либо ошибку, либо результат, либо отмену обещания. + /// Adds the specified cncellation handler to the current promise + /// and creates the new dependant promise. /// - /// Обработчик. - /// После обработке ошибки, она передается дальше. - IPromise Anyway(Action handler); - /// - /// Обработчик для регистрации отмены обещания. - /// - /// Новое обещание, связанное с текущим, выполнится после указанного обработчика. - /// Обработчик события. - /// Если обработчик вызывает исключение, то оно передается обработчику ошибки, результат работы - /// которого будет передан связанному обещанию + /// + /// The new dependant promise which will be resolved after the cancellation + /// handler is executed. + /// + /// + /// The cancellation handler. + /// + /// + /// If the cancellation handler is executed without an error the dependent + /// promise will be successfully resolved, otherwise the raised exception + /// will be passed to the dependant promise. The successful result of the + /// current promise will be ignored. + /// IPromise Cancelled(Action handler); /// diff --git a/Implab/IPromiseT.cs b/Implab/IPromiseT.cs --- a/Implab/IPromiseT.cs +++ b/Implab/IPromiseT.cs @@ -1,34 +1,34 @@ using System; namespace Implab { - public interface IPromise : IPromise { + public interface IPromise : IPromise { new T Join(); new T Join(int timeout); - void On(Action success, Action error, Action cancel); + IPromise On(Action success, Action error, Action cancel); - void On(Action success, Action error); + IPromise On(Action success, Action error); - void On(Action success); + IPromise On(Action success); - IPromise Then(Func mapper, Func error, Action cancel); + new IPromise On(Action handler, PromiseEventType events); + + IPromise Then(Func mapper, Func error, Func cancel); IPromise Then(Func mapper, Func error); IPromise Then(Func mapper); - IPromise Chain(Func> chained, Func> error, Action cancel); + IPromise Chain(Func> chained, Func> error, Func> cancel); IPromise Chain(Func> chained, Func> error); IPromise Chain(Func> chained); - IPromise Error(Func error); + IPromise Error(Func error); - new IPromise Cancelled(Action handler); - - new IPromise Anyway(Action handler); + IPromise Cancelled(Func handler); } } diff --git a/Implab/Implab.csproj b/Implab/Implab.csproj --- a/Implab/Implab.csproj +++ b/Implab/Implab.csproj @@ -7,6 +7,8 @@ Library Implab Implab + 8.0.30703 + 2.0 true @@ -131,12 +133,10 @@ - - @@ -150,6 +150,13 @@ + + + + + + + diff --git a/Implab/JSON/JSONWriter.cs b/Implab/JSON/JSONWriter.cs --- a/Implab/JSON/JSONWriter.cs +++ b/Implab/JSON/JSONWriter.cs @@ -1,9 +1,6 @@ using System; using System.Collections.Generic; using System.IO; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace Implab.JSON { public class JSONWriter { @@ -23,7 +20,6 @@ namespace Implab.JSON { _escapeCR, _escapeNL, _escapeTAB, - _escapeSLASH, _escapeBSLASH, _escapeQ; @@ -34,7 +30,6 @@ namespace Implab.JSON { _escapeNL = "\\n".ToCharArray(); _escapeTAB = "\\t".ToCharArray(); _escapeBSLASH = "\\\\".ToCharArray(); - _escapeSLASH = "\\/".ToCharArray(); _escapeQ = "\\\"".ToCharArray(); } @@ -205,6 +200,7 @@ namespace Implab.JSON { var chars = value.ToCharArray(); m_writer.Write('"'); + // Analysis disable once ForCanBeConvertedToForeach for (int i = 0; i < chars.Length; i++) { var ch = chars[i]; diff --git a/Implab/ObjectPool.cs b/Implab/ObjectPool.cs --- a/Implab/ObjectPool.cs +++ b/Implab/ObjectPool.cs @@ -18,7 +18,7 @@ namespace Implab { /// Пул поддерживает обращения сразу из нескольких потоков. /// public abstract class ObjectPool where T : class { - readonly MTQueue m_queue = new MTQueue(); + readonly AsyncQueue m_queue = new AsyncQueue(); readonly int m_size; int m_count = 0; diff --git a/Implab/Parallels/ArrayTraits.cs b/Implab/Parallels/ArrayTraits.cs --- a/Implab/Parallels/ArrayTraits.cs +++ b/Implab/Parallels/ArrayTraits.cs @@ -26,7 +26,7 @@ namespace Implab.Parallels { m_pending = source.Length; m_action = action; - m_promise.Anyway(Dispose); + m_promise.On(Dispose, PromiseEventType.All); InitPool(); } @@ -86,7 +86,7 @@ namespace Implab.Parallels { m_transform = transform; m_logicalOperation = TraceContext.Instance.CurrentOperation; - m_promise.Anyway(Dispose); + m_promise.On(Dispose, PromiseEventType.All); InitPool(); } @@ -162,7 +162,7 @@ namespace Implab.Parallels { int slots = threads; // Analysis disable AccessToDisposedClosure - AsyncPool.InvokeNewThread(() => { + AsyncPool.InvokeNewThread(() => { for (int i = 0; i < source.Length; i++) { if(promise.IsResolved) break; // stop processing in case of error or cancellation @@ -177,7 +177,7 @@ namespace Implab.Parallels { try { transform(source[i]) - .Anyway(() => { + .On( x => { Interlocked.Increment(ref slots); lock (locker) { Monitor.Pulse(locker); diff --git a/Implab/Parallels/AsyncPool.cs b/Implab/Parallels/AsyncPool.cs --- a/Implab/Parallels/AsyncPool.cs +++ b/Implab/Parallels/AsyncPool.cs @@ -53,7 +53,7 @@ namespace Implab.Parallels { public static IPromise InvokeNewThread(Action func) { - var p = new Promise(); + var p = new Promise(); var caller = TraceContext.Instance.CurrentOperation; diff --git a/Implab/Parallels/AsyncQueue.cs b/Implab/Parallels/AsyncQueue.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/AsyncQueue.cs @@ -0,0 +1,244 @@ +using System.Threading; +using System.Collections.Generic; +using System; +using System.Collections; + +namespace Implab.Parallels { + public class AsyncQueue : IEnumerable { + class Chunk { + public Chunk next; + + int m_low; + int m_hi; + int m_alloc; + readonly int m_size; + readonly T[] m_data; + + public Chunk(int size) { + m_size = size; + m_data = new T[size]; + } + + public Chunk(int size, T value) { + m_size = size; + m_hi = 1; + m_alloc = 1; + m_data = new T[size]; + m_data[0] = value; + } + + public int Low { + get { return m_low; } + } + + public int Hi { + get { return m_hi; } + } + + public bool TryEnqueue(T value,out bool extend) { + extend = false; + int alloc; + do { + alloc = m_alloc; + if (alloc > m_size) + return false; + } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc)); + + if (alloc == m_size) { + extend = true; + return false; + } + + m_data[alloc] = value; + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { + // spin wait for commit + } + return true; + } + + public bool TryDequeue(out T value,out bool recycle) { + int low; + do { + low = m_low; + if (low >= m_hi) { + value = default(T); + recycle = (low == m_size); + return false; + } + } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); + + recycle = (low == m_size - 1); + value = m_data[low]; + + return true; + } + + public T GetAt(int pos) { + return m_data[pos]; + } + } + + public const int DEFAULT_CHUNK_SIZE = 32; + + readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; + + Chunk m_first; + Chunk m_last; + + public AsyncQueue() { + m_last = m_first = new Chunk(m_chunkSize); + } + + public void Enqueue(T value) { + var last = m_last; + // spin wait to the new chunk + bool extend = true; + while(last == null || !last.TryEnqueue(value, out extend)) { + // try to extend queue + if (extend || last == null) { + var chunk = new Chunk(m_chunkSize, value); + if (EnqueueChunk(last, chunk)) + break; + last = m_last; + } else { + while (last != m_last) { + Thread.MemoryBarrier(); + last = m_last; + } + } + } + } + + public bool TryDequeue(out T value) { + var chunk = m_first; + bool recycle; + while (chunk != null) { + + var result = chunk.TryDequeue(out value, out recycle); + + if (recycle) // this chunk is waste + RecycleFirstChunk(chunk); + else + return result; // this chunk is usable and returned actual result + + if (result) // this chunk is waste but the true result is always actual + return true; + + // try again + chunk = m_first; + } + + // the queue is empty + value = default(T); + return false; + } + + bool EnqueueChunk(Chunk last, Chunk chunk) { + if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) + return false; + + if (last != null) + last.next = chunk; + else + m_first = chunk; + return true; + } + + void RecycleFirstChunk(Chunk first) { + var next = first.next; + + if (next == null) { + // looks like this is the last chunk + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + // race + // maybe someone already recycled this chunk + // or a new chunk has been appedned to the queue + + return; // give up + } + // the tail is updated + } + + // we need to update the head + Interlocked.CompareExchange(ref m_first, next, first); + // if the head is already updated then give up + return; + + } + + #region IEnumerable implementation + + class Enumerator : IEnumerator { + Chunk m_current; + int m_pos = -1; + + public Enumerator(Chunk fisrt) { + m_current = fisrt; + } + + #region IEnumerator implementation + + public bool MoveNext() { + if (m_current == null) + return false; + + if (m_pos == -1) + m_pos = m_current.Low; + else + m_pos++; + if (m_pos == m_current.Hi) { + m_pos = 0; + m_current = m_current.next; + } + + return true; + } + + public void Reset() { + throw new NotSupportedException(); + } + + object IEnumerator.Current { + get { + return Current; + } + } + + #endregion + + #region IDisposable implementation + + public void Dispose() { + } + + #endregion + + #region IEnumerator implementation + + public T Current { + get { + if (m_pos == -1 || m_current == null) + throw new InvalidOperationException(); + return m_current.GetAt(m_pos); + } + } + + #endregion + } + + public IEnumerator GetEnumerator() { + return new Enumerator(m_first); + } + + #endregion + + #region IEnumerable implementation + + IEnumerator IEnumerable.GetEnumerator() { + return GetEnumerator(); + } + + #endregion + } +} diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs +++ b/Implab/Parallels/WorkerPool.cs @@ -6,7 +6,7 @@ using Implab.Diagnostics; namespace Implab.Parallels { public class WorkerPool : DispatchPool { - MTQueue m_queue = new MTQueue(); + AsyncQueue m_queue = new AsyncQueue(); int m_queueLength = 0; readonly int m_threshold = 1; diff --git a/Implab/Promise.cs b/Implab/Promise.cs --- a/Implab/Promise.cs +++ b/Implab/Promise.cs @@ -1,954 +1,258 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Threading; -using Implab.Parallels; - -namespace Implab { - - /// - /// Класс для асинхронного получения результатов. Так называемое "обещание". - /// - /// Тип получаемого результата - /// - /// Сервис при обращении к его методу дает обещаиние о выполнении операции, - /// клиент получив такое обещание может установить ряд обратных вызово для получения - /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов. - /// - /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на - /// данные события клиент должен использовать методы Then. - /// - /// - /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой), - /// использует методы Resolve либо Reject для оповещения клиетна о - /// выполнении обещания. - /// - /// - /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался, - /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном - /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в - /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении - /// обещания. - /// - /// - /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать - /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует - /// использовать соответствующую форму методе Then. - /// - /// - /// Также хорошим правилом является то, что Resolve и Reject должен вызывать - /// только инициатор обещания иначе могут возникнуть противоречия. - /// - /// - public class Promise : IPromise { - - protected abstract class AbstractHandler : MTCustomQueueNode { - public abstract void Resolve(T result); - public abstract void Reject(Exception error); - public abstract void Cancel(); - } - - protected class RemapDescriptor : AbstractHandler { - - readonly Func m_resultHandler; - readonly Func m_errorHandler; - readonly Action m_cancellHandler; - readonly Promise m_medium; - - public RemapDescriptor(Func resultHandler, Func errorHandler, Action cancelHandler, Promise medium) { - m_resultHandler = resultHandler; - m_errorHandler = errorHandler; - m_cancellHandler = cancelHandler; - m_medium = medium; - } - - public override void Resolve(T result) { - if (m_resultHandler != null) { - try { - if (m_medium != null) - m_medium.Resolve(m_resultHandler(result)); - else - m_resultHandler(result); - } catch (Exception e) { - Reject(e); - } - } else if(m_medium != null) - m_medium.Resolve(default(T2)); - } - - public override void Reject(Exception error) { - if (m_errorHandler != null) { - try { - var res = m_errorHandler(error); - if (m_medium != null) - m_medium.Resolve(res); - } catch (Exception err2) { - if (m_medium != null) - m_medium.Reject(err2); - } - } else if (m_medium != null) - m_medium.Reject(error); - } - - public override void Cancel() { - if (m_cancellHandler != null) { - try { - m_cancellHandler(); - } catch (Exception err) { - Reject(err); - return; - } - } - if (m_medium != null) - m_medium.Cancel(); - } - } - - protected class HandlerDescriptor : AbstractHandler { - - readonly Action m_resultHandler; - readonly Action m_errorHandler; - readonly Action m_cancellHandler; - readonly Promise m_medium; - - public HandlerDescriptor(Action resultHandler, Action errorHandler, Action cancelHandler, Promise medium) { - m_resultHandler = resultHandler; - m_errorHandler = errorHandler; - m_cancellHandler = cancelHandler; - m_medium = medium; - } - - public override void Resolve(T result) { - if (m_resultHandler != null) { - try { - m_resultHandler(result); - } catch (Exception e) { - Reject(e); - return; - } - } - if(m_medium != null) - m_medium.Resolve(result); - } - - public override void Reject(Exception error) { - if (m_errorHandler != null) { - try { - m_errorHandler(error); - if (m_medium != null) - m_medium.Resolve(default(T)); - } catch (Exception err2) { - if (m_medium != null) - m_medium.Reject(err2); - } - } else if (m_medium != null) - m_medium.Reject(error); - } - - public override void Cancel() { - if (m_cancellHandler != null) { - try { - m_cancellHandler(); - } catch (Exception err) { - Reject(err); - return; - } - } - if (m_medium != null) - m_medium.Cancel(); - } - } - - const int UNRESOLVED_SATE = 0; - const int TRANSITIONAL_STATE = 1; - const int SUCCEEDED_STATE = 2; - const int REJECTED_STATE = 3; - const int CANCELLED_STATE = 4; - - int m_childrenCount; - int m_state; - T m_result; - Exception m_error; - - readonly MTCustomQueue m_handlers = new MTCustomQueue(); - //readonly MTQueue m_handlers = new MTQueue(); - - public Promise() { - } - - public Promise(IPromise parent) { - if (parent != null) - AddHandler( - null, - null, - () => { - if (parent.IsExclusive) - parent.Cancel(); - }, - null, - false - ); - } - - bool BeginTransit() { - return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); - } - - void CompleteTransit(int state) { - if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) - throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); - } - - void WaitTransition() { - while (m_state == TRANSITIONAL_STATE) { - Thread.MemoryBarrier(); - } - } - - public bool IsResolved { - get { - Thread.MemoryBarrier(); - return m_state > 1; - } - } - - public bool IsCancelled { - get { - Thread.MemoryBarrier(); - return m_state == CANCELLED_STATE; - } - } - - public Type PromiseType { - get { return typeof(T); } - } - - /// - /// Выполняет обещание, сообщая об успешном выполнении. - /// - /// Результат выполнения. - /// Данное обещание уже выполнено - public void Resolve(T result) { - if (BeginTransit()) { - m_result = result; - CompleteTransit(SUCCEEDED_STATE); - OnStateChanged(); - } else { - WaitTransition(); - if (m_state != CANCELLED_STATE) - throw new InvalidOperationException("The promise is already resolved"); - } - } - - /// - /// Выполняет обещание, сообщая об успешном выполнении. Результатом выполнения будет пустое значения. - /// - /// - /// Данный вариант удобен в случаях, когда интересен факт выполнения операции, нежели полученное значение. - /// - public void Resolve() { - Resolve(default(T)); - } - - /// - /// Выполняет обещание, сообщая об ошибке - /// - /// - /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков - /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные - /// будут проигнорированы. - /// - /// Исключение возникшее при выполнении операции - /// Данное обещание уже выполнено - public void Reject(Exception error) { - if (BeginTransit()) { - m_error = error is TransientPromiseException ? error.InnerException : error; - CompleteTransit(REJECTED_STATE); - OnStateChanged(); - } else { - WaitTransition(); - if (m_state == SUCCEEDED_STATE) - throw new InvalidOperationException("The promise is already resolved"); - } - } - - /// - /// Отменяет операцию, если это возможно. - /// - /// Для определения была ли операция отменена следует использовать свойство . - public void Cancel() { - if (BeginTransit()) { - CompleteTransit(CANCELLED_STATE); - OnStateChanged(); - } - } - - /// - /// Последний обработчик в цепочки обещаний. - /// - /// - /// - /// - /// - /// - /// Данный метод не создает связанного с текущим обещания и предназначен для окончания - /// фсинхронной цепочки. - /// - /// - /// Если данный метод вызвать несколько раз, либо добавить другие обработчики, то цепочка - /// не будет одиночной и, как следствие, будет невозможна отмена - /// всей цепи обещаний снизу (с самого последнего обещания). - /// - /// - public void On(Action success, Action error, Action cancel) { - if (success == null && error == null && cancel == null) - return; - - AddHandler(success, error, cancel, null, false); - } - - public void On(Action success, Action error) { - AddHandler(success, error, null, null, false); - } - - public void On(Action success) { - AddHandler(success, null, null, null, false); - } - - public void On(Action handler, PromiseEventType events) { - Safe.ArgumentNotNull(handler, "handler"); - - - AddHandler( - events.HasFlag(PromiseEventType.Success) ? new Action(x => handler()) : null, - events.HasFlag(PromiseEventType.Error) ? new Action( x => handler()) : null, - events.HasFlag(PromiseEventType.Cancelled) ? handler : null, - null, - false - ); - } - - public IPromise Error(Action error) { - if (error == null) - return this; - - var medium = new Promise(this); - - AddMappers( - x => x, - e => { - error(e); - return default(T); - }, - null, - medium, - true - ); - - return medium; - } - - /// - /// Handles error and allows to keep the promise. - /// - /// - /// If the specified handler throws an exception, this exception will be used to reject the promise. - /// - /// The error handler which returns the result of the promise. - /// New promise. - public IPromise Error(Func handler) { - if (handler == null) - return this; - - var medium = new Promise(this); - - AddMappers(x => x, handler, null, medium, true); - - return medium; - } - - /// - /// Позволяет преобразовать результат выполения операции к новому типу. - /// - /// Новый тип результата. - /// Преобразование результата к новому типу. - /// Обработчик ошибки. Данный обработчик получит - /// исключение возникшее при выполнении операции. - /// Новое обещание, которое будет выполнено при выполнении исходного обещания. - /// - public IPromise Then(Func mapper, Func error, Action cancel) { - Safe.ArgumentNotNull(mapper, "mapper"); - - // создаем прицепленное обещание - var medium = new Promise(this); - - AddMappers( - mapper, - error, - cancel, - medium, - true - ); - - return medium; - } - - public IPromise Then(Func mapper, Func error) { - return Then(mapper, error, null); - } - - public IPromise Then(Func mapper) { - return Then(mapper, null, null); - } - - /// - /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после - /// выполнения текущей, а результат текущей операции может быть использован для инициализации - /// новой операции. - /// - /// Тип результата указанной асинхронной операции. - /// Асинхронная операция, которая должна будет начаться после выполнения текущей. - /// Обработчик ошибки. Данный обработчик получит - /// исключение возникшее при выполнении текуещй операции. - /// Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции. - /// - public IPromise Chain(Func> chained, Func> error, Action cancel) { - - Safe.ArgumentNotNull(chained, "chained"); - - // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно - // создать посредника, к которому будут подвызяваться следующие обработчики. - // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы - // передать через него результаты работы. - var medium = new Promise(this); - - Func resultHandler = delegate(T result) { - if (medium.IsCancelled) - return default(T); - - var promise = chained(result); - - promise.On( - medium.Resolve, - medium.Reject, - () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка - ); - - // notify chained operation that it's not needed anymore - // порядок вызова Then, Cancelled важен, поскольку от этого - // зависит IsExclusive - medium.On( - null, - null, - () => { - if (promise.IsExclusive) - promise.Cancel(); - } - ); - - return default(T); - }; - - Func errorHandler; - - if (error != null) - errorHandler = delegate(Exception e) { - try { - var promise = error(e); - - promise.On( - medium.Resolve, - medium.Reject, - () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка - ); - - // notify chained operation that it's not needed anymore - // порядок вызова Then, Cancelled важен, поскольку от этого - // зависит IsExclusive - medium.Cancelled(() => { - if (promise.IsExclusive) - promise.Cancel(); - }); - } catch (Exception e2) { - medium.Reject(e2); - } - return default(T); - }; - else - errorHandler = err => { - medium.Reject(err); - return default(T); - }; - - - Action cancelHandler; - if (cancel != null) - cancelHandler = () => { - if (cancel != null) - cancel(); - medium.Cancel(); - }; - else - cancelHandler = medium.Cancel; - - AddMappers( - resultHandler, - errorHandler, - cancelHandler, - null, - true - ); - - return medium; - } - - public IPromise Chain(Func> chained, Func> error) { - return Chain(chained, error, null); - } - - public IPromise Chain(Func> chained) { - return Chain(chained, null, null); - } - - public IPromise Cancelled(Action handler) { - var medium = new Promise(this); - AddHandler(null, null, handler, medium, false); - return medium; - } - - /// - /// Adds the specified handler for all cases (success, error, cancel) - /// - /// The handler that will be called anyway - /// self - public IPromise Anyway(Action handler) { - Safe.ArgumentNotNull(handler, "handler"); - - var medium = new Promise(this); - - AddHandler( - x => handler(), - e => { - handler(); - throw new TransientPromiseException(e); - }, - handler, - medium, - true - ); - - return medium; - } - - /// - /// Преобразует результат обещания к нужному типу - /// - /// - /// - public IPromise Cast() { - return Then(x => (T2)(object)x, null); - } - - /// - /// Дожидается отложенного обещания и в случае успеха, возвращает - /// его, результат, в противном случае бросает исключение. - /// - /// - /// - /// Если ожидание обещания было прервано по таймауту, это не значит, - /// что обещание было отменено или что-то в этом роде, это только - /// означает, что мы его не дождались, однако все зарегистрированные - /// обработчики, как были так остались и они будут вызваны, когда - /// обещание будет выполнено. - /// - /// - /// Такое поведение вполне оправдано поскольку таймаут может истечь - /// в тот момент, когда началась обработка цепочки обработчиков, и - /// к тому же текущее обещание может стоять в цепочке обещаний и его - /// отклонение может привести к непрогнозируемому результату. - /// - /// - /// Время ожидания - /// Результат выполнения обещания - public T Join(int timeout) { - var evt = new ManualResetEvent(false); - Anyway(() => evt.Set()); - - if (!evt.WaitOne(timeout, true)) - throw new TimeoutException(); - - switch (m_state) { - case SUCCEEDED_STATE: - return m_result; - case CANCELLED_STATE: - throw new OperationCanceledException(); - case REJECTED_STATE: - throw new TargetInvocationException(m_error); - default: - throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); - } - } - - public T Join() { - return Join(Timeout.Infinite); - } - - void AddMappers(Func success, Func error, Action cancel, Promise medium, bool inc) { - if (inc) - Interlocked.Increment(ref m_childrenCount); - - AbstractHandler handler = new RemapDescriptor(success, error, cancel, medium); - - bool queued; - - if (!IsResolved) { - m_handlers.Enqueue(handler); - queued = true; - } else { - // the promise is in resolved state, just invoke the handled with minimum overhead - queued = false; - InvokeHandler(handler); - } - - if (queued && IsResolved && m_handlers.TryDequeue(out handler)) - // if the promise have been resolved while we was adding handler to the queue - // we can't guarantee that someone is still processing it - // therefore we will fetch a handler from the queue and execute it - // note that fetched handler may be not the one that we have added - // even we can fetch no handlers at all :) - InvokeHandler(handler); - } - - void AddHandler(Action success, Action error, Action cancel, Promise medium, bool inc) { - if (inc) - Interlocked.Increment(ref m_childrenCount); - - AbstractHandler handler = new HandlerDescriptor(success, error, cancel, medium); - - bool queued; - - if (!IsResolved) { - m_handlers.Enqueue(handler); - queued = true; - } else { - // the promise is in resolved state, just invoke the handled with minimum overhead - queued = false; - InvokeHandler(handler); - } - - if (queued && IsResolved && m_handlers.TryDequeue(out handler)) - // if the promise have been resolved while we was adding handler to the queue - // we can't guarantee that someone is still processing it - // therefore we will fetch a handler from the queue and execute it - // note that fetched handler may be not the one that we have added - // even we can fetch no handlers at all :) - InvokeHandler(handler); - } - - protected virtual void InvokeHandler(AbstractHandler handler) { - switch (m_state) { - case SUCCEEDED_STATE: - handler.Resolve(m_result); - break; - case REJECTED_STATE: - handler.Reject(m_error); - break; - case CANCELLED_STATE: - handler.Cancel(); - break; - default: - // do nothing - return; - } - } - - void OnStateChanged() { - AbstractHandler handler; - while (m_handlers.TryDequeue(out handler)) - InvokeHandler(handler); - } - - public bool IsExclusive { - get { - return m_childrenCount <= 1; - } - } - - /// - /// Объединяет несколько обещаний в одно, результатом которого является массив результатов других обещаний. - /// Если хотябы одно из переданных обещаний не будет выполнено, то новое обещение тоже не будет выполнено. - /// При отмене нового обещания, переданные обещания также будут отменены, если никто больше на них не подписан. - /// - /// Список обещаний. Если список пустой, то результирующее обещание возвращается уже выполненным. - /// Обещание объединяющее в себе результат переданных обещаний. - /// не может быть null - public static IPromise CreateComposite(IList> promises) { - if (promises == null) - throw new ArgumentNullException(); - - // создаем аккумулятор для результатов и результирующее обещание - var result = new T[promises.Count]; - var promise = new Promise(); - - // special case - if (promises.Count == 0) { - promise.Resolve(result); - return promise; - } - - int pending = promises.Count; - - for (int i = 0; i < promises.Count; i++) { - var dest = i; - - if (promises[i] != null) { - promises[i].On( - x => { - result[dest] = x; - if (Interlocked.Decrement(ref pending) == 0) - promise.Resolve(result); - }, - promise.Reject - ); - } else { - if (Interlocked.Decrement(ref pending) == 0) - promise.Resolve(result); - } - } - - promise.Cancelled( - () => { - foreach (var d in promises) - if (d != null && d.IsExclusive) - d.Cancel(); - } - ); - - return promise; - } - - /// - /// Объединяет несколько обещаний в одно. Результирующее обещание будет выполнено при - /// выполнении всех указанных обещаний. При этом возвращаемые значения первичных обещаний - /// игнорируются. - /// - /// Коллекция первичных обещаний, которые будут объеденены в одно. - /// Новое обещание, объединяющее в себе переданные. - /// - /// Если в коллекции встречаюься null, то они воспринимаются как выполненные обещания. - /// - public static IPromise CreateComposite(ICollection promises) { - if (promises == null) - throw new ArgumentNullException(); - if (promises.Count == 0) - return Promise.ResultToPromise(null); - - int countdown = promises.Count; - - var result = new Promise(); - - foreach (var d in promises) { - if (d == null) { - if (Interlocked.Decrement(ref countdown) == 0) - result.Resolve(null); - } else { - d.Then(() => { - if (Interlocked.Decrement(ref countdown) == 0) - result.Resolve(null); - }); - } - } - - result.Cancelled(() => { - foreach (var d in promises) - if (d != null && d.IsExclusive) - d.Cancel(); - }); - - return result; - } - - public static Promise ResultToPromise(T result) { - var p = new Promise(); - p.Resolve(result); - return p; - } - - public static Promise ExceptionToPromise(Exception error) { - if (error == null) - throw new ArgumentNullException(); - - var p = new Promise(); - p.Reject(error); - return p; - } - - #region IPromiseBase explicit implementation - - IPromise IPromise.Then(Action success, Action error, Action cancel) { - return Then( - success != null ? new Func(x => { - success(); - return x; - }) : null, - error != null ? new Func(e => { - error(e); - return default(T); - }) : null, - cancel - ); - } - - IPromise IPromise.Then(Action success, Action error) { - return Then( - success != null ? new Func(x => { - success(); - return x; - }) : null, - error != null ? new Func(e => { - error(e); - return default(T); - }) : null - ); - } - - IPromise IPromise.Then(Action success) { - Safe.ArgumentNotNull(success, "success"); - return Then(x => { - success(); - return x; - }); - } - - IPromise IPromise.Chain(Func chained, Func error, Action cancel) { - return ChainNoResult(chained, error, cancel); - } - - IPromise ChainNoResult(Func chained, Func error, Action cancel) { - Safe.ArgumentNotNull(chained, "chained"); - - var medium = new Promise(this); - - Func resultHandler = delegate { - if (medium.IsCancelled) - return default(T); - - var promise = chained(); - - promise.On( - medium.Resolve, - medium.Reject, - () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка - ); - - // notify chained operation that it's not needed anymore - // порядок вызова Then, Cancelled важен, поскольку от этого - // зависит IsExclusive - medium.Cancelled(() => { - if (promise.IsExclusive) - promise.Cancel(); - }); - - return default(T); - }; - - Func errorHandler; - - if (error != null) - errorHandler = delegate(Exception e) { - try { - var promise = error(e); - - promise.On( - medium.Resolve, - medium.Reject, - () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка - ); - - // notify chained operation that it's not needed anymore - // порядок вызова Then, Cancelled важен, поскольку от этого - // зависит IsExclusive - medium.Cancelled(() => { - if (promise.IsExclusive) - promise.Cancel(); - }); - } catch (Exception e2) { - medium.Reject(e2); - } - return default(T); - }; - else - errorHandler = err => { - medium.Reject(err); - return default(T); - }; - - - Action cancelHandler; - if (cancel != null) - cancelHandler = () => { - if (cancel != null) - cancel(); - medium.Cancel(); - }; - else - cancelHandler = medium.Cancel; - - AddMappers( - resultHandler, - errorHandler, - cancelHandler, - null, - true - ); - - return medium; - } - - IPromise IPromise.Chain(Func chained, Func error) { - return ChainNoResult(chained, error, null); - } - - IPromise IPromise.Chain(Func chained) { - return ChainNoResult(chained, null, null); - } - - - void IPromise.On(Action success, Action error, Action cancel) { - On(success != null ? new Action(x => success()) : null, error, cancel); - } - - void IPromise.On(Action success, Action error) { - On(x => success(), error, null); - } - - void IPromise.On(Action success) { - On(x => success(), null, null); - } - - IPromise IPromise.Error(Action error) { - return Error(error); - } - - IPromise IPromise.Anyway(Action handler) { - return Anyway(handler); - } - - IPromise IPromise.Cancelled(Action handler) { - return Cancelled(handler); - } - - void IPromise.Join() { - Join(); - } - - void IPromise.Join(int timeout) { - Join(timeout); - } - - #endregion - - - - } -} +using System; +using System.Diagnostics; + +namespace Implab { + public class Promise : AbstractPromise, IPromise, IDeferred { + + public struct HandlerDescriptor { + readonly Action m_success; + readonly Action m_error; + readonly Action m_cancel; + readonly IDeferred m_deferred; + + public HandlerDescriptor(Action success, Action error, Action cancel, IDeferred deferred) { + m_success = success; + m_error = error; + m_cancel = cancel; + m_deferred = deferred; + } + + public void SignalSuccess() { + if (m_success != null) { + try { + m_success(); + if (m_deferred != null) + m_deferred.Resolve(); + } catch (Exception err) { + SignalError(err); + } + } + } + + public void SignalError(Exception err) { + if (m_error != null) { + try { + m_error(err); + if (m_deferred != null) + m_deferred.Resolve(); + } catch (Exception err2) { + if (m_deferred != null) + m_deferred.Reject(err2); + } + } else { + if (m_deferred != null) + m_deferred.Reject(err); + } + } + + public void SignalCancel() { + if (m_cancel != null) { + try { + m_cancel(); + if (m_deferred != null) + m_deferred.Resolve(); + } catch (Exception err) { + SignalError(err); + } + } else { + if (m_deferred != null) + m_deferred.Cancel(); + } + } + } + + public void Resolve() { + BeginSetResult(); + EndSetResult(); + } + + public void Reject(Exception error) { + SetError(error); + } + + #region implemented abstract members of AbstractPromise + + protected override void SignalSuccess(HandlerDescriptor handler) { + handler.SignalSuccess(); + } + + protected override void SignalError(HandlerDescriptor handler, Exception error) { + handler.SignalError(error); + } + + protected override void SignalCancelled(HandlerDescriptor handler) { + handler.SignalCancel(); + } + + protected override void Listen(PromiseEventType events, Action handler) { + AddHandler(new HandlerDescriptor( + events.HasFlag(PromiseEventType.Success) ? handler : null, + events.HasFlag(PromiseEventType.Error) ? new Action(err => handler()) : null, + events.HasFlag(PromiseEventType.Cancelled) ? handler : null, + null + )); + } + + #endregion + + + public Type PromiseType { + get { + return typeof(void); + } + } + + public IPromise Then(Action success, Action error, Action cancel) { + var promise = new Promise(); + if (success != null) + promise.On(Cancel, PromiseEventType.Cancelled); + + AddHandler(new HandlerDescriptor(success, error, cancel, promise)); + + return promise; + } + + public IPromise Then(Action success, Action error) { + return Then(success, error, null); + } + + public IPromise Then(Action success) { + return Then(success, null, null); + } + + public IPromise On(Action success, Action error, Action cancel) { + AddHandler(new HandlerDescriptor(success, error, cancel, null)); + return this; + } + + public IPromise On(Action success, Action error) { + return On(success, error, null); + } + + public IPromise On(Action success) { + return On(success, null, null); + } + + public IPromise On(Action handler, PromiseEventType events) { + return On( + events.HasFlag(PromiseEventType.Success) ? handler : null, + events.HasFlag(PromiseEventType.Error) ? new Action(err => handler()) : null, + events.HasFlag(PromiseEventType.Cancelled) ? handler : null + ); + } + + public IPromise Cast() { + throw new InvalidCastException(); + } + + public IPromise Chain(Func chained, Func error, Func cancel) { + var medium = new Promise(); + + On( + () => { + if (medium.IsCancelled) + return; + if (chained != null) + ConnectPromise(chained(), medium); + }, + ex => { + if (medium.IsCancelled) + return; + if (error != null) { + try { + ConnectPromise(error(ex), medium); + } catch (Exception ex2) { + medium.Reject(ex2); + } + } else { + medium.Reject(ex); + } + }, + () => { + if (medium.IsCancelled) + return; + if (cancel != null) + ConnectPromise(cancel(), medium); + else + medium.Cancel(); + } + ); + + if (chained != null) + medium.On(Cancel, PromiseEventType.Cancelled); + + return medium; + } + + static void ConnectPromise(IPromise result, Promise medium) { + if (result != null) { + result.On( + medium.Resolve, + medium.Reject, + () => medium.Reject(new OperationCanceledException()) + ); + medium.On(result.Cancel, PromiseEventType.Cancelled); + } else { + medium.Reject( + new NullReferenceException( + "The chained asynchronous operation returned" + + " 'null' where the promise instance is expected" + ) + ); + } + } + + public IPromise Chain(Func chained, Func error) { + return Chain(chained, error, null); + } + + public IPromise Chain(Func chained) { + return Chain(chained, null, null); + } + + public IPromise Error(Action error) { + var promise = new Promise(); + On( + null, + err => { + if (error != null) + try { + error(err); + promise.Resolve(); + } catch (Exception err2) { + promise.Reject(err2); + } + else + promise.Reject(err); + } + ); + + return promise; + } + + public IPromise Cancelled(Action handler) { + var promise = new Promise(); + On( + null, + null, + () => { + if (handler != null) { + try { + handler(); + promise.Resolve(); + } catch (Exception err) { + promise.Reject(err); + } + } else { + promise.Cancel(); + } + } + ); + + return promise; + } + + + } +} + diff --git a/Implab/PromiseExtensions.cs b/Implab/PromiseExtensions.cs --- a/Implab/PromiseExtensions.cs +++ b/Implab/PromiseExtensions.cs @@ -1,6 +1,7 @@ using System.Threading; using System; using Implab.Diagnostics; +using System.Collections.Generic; #if NET_4_5 @@ -15,7 +16,8 @@ namespace Implab { if (context == null) return that; - var p = new SyncContextPromise(context, that); + var p = new SyncContextPromise(context); + p.On(that.Cancel, PromiseEventType.Cancelled); that.On( p.Resolve, @@ -29,7 +31,9 @@ namespace Implab { Safe.ArgumentNotNull(that, "that"); Safe.ArgumentNotNull(context, "context"); - var p = new SyncContextPromise(context, that); + var p = new SyncContextPromise(context); + p.On(that.Cancel, PromiseEventType.Cancelled); + that.On( p.Resolve, @@ -89,6 +93,29 @@ namespace Implab { that.On(timer.Dispose, PromiseEventType.All); return that; } + + public static IPromise Combine(this ICollection that) { + Safe.ArgumentNotNull(that, "that"); + + int count = that.Count; + var medium = new Promise(); + + foreach (var p in that) + p.On( + () => { + if (Interlocked.Decrement(ref count) == 0) + medium.Resolve(); + }, + error => { + throw new Exception("The dependency promise is failed", error); + }, + () => { + throw new OperationCanceledException("The dependency promise is cancelled"); + } + ); + + return medium; + } #if NET_4_5 diff --git a/Implab/PromiseT.cs b/Implab/PromiseT.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseT.cs @@ -0,0 +1,621 @@ +using System; +using System.Diagnostics; + +namespace Implab { + + /// + /// Класс для асинхронного получения результатов. Так называемое "обещание". + /// + /// Тип получаемого результата + /// + /// Сервис при обращении к его методу дает обещаиние о выполнении операции, + /// клиент получив такое обещание может установить ряд обратных вызово для получения + /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов. + /// + /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на + /// данные события клиент должен использовать методы Then. + /// + /// + /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой), + /// использует методы Resolve либо Reject для оповещения клиетна о + /// выполнении обещания. + /// + /// + /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался, + /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном + /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в + /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении + /// обещания. + /// + /// + /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать + /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует + /// использовать соответствующую форму методе Then. + /// + /// + /// Также хорошим правилом является то, что Resolve и Reject должен вызывать + /// только инициатор обещания иначе могут возникнуть противоречия. + /// + /// + public class Promise : AbstractPromise>, IPromise, IDeferred { + + class StubDeferred : IDeferred { + public static readonly StubDeferred instance = new StubDeferred(); + + StubDeferred() { + } + + #region IDeferred implementation + + public void Resolve(T value) { + } + + public void Reject(Exception error) { + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + } + + #endregion + + + } + + class RemapDescriptor : IDeferred { + readonly Func m_remap; + readonly Func m_failed; + readonly Func m_cancel; + readonly IDeferred m_deferred; + + public RemapDescriptor(Func remap, Func failed, Func cancel, IDeferred deferred ) { + Debug.Assert(deferred != null); + m_remap = remap; + m_failed = failed; + m_cancel = cancel; + m_deferred = deferred; + } + + + + #region IDeferred implementation + + public void Resolve(T value) { + if (m_remap != null) { + try { + m_deferred.Resolve(m_remap(value)); + } catch (Exception ex) { + Reject(ex); + } + } + } + + public void Reject(Exception error) { + if (m_failed != null) { + try { + m_deferred.Resolve(m_failed(error)); + } catch (Exception ex) { + m_deferred.Reject(ex); + } + } else { + m_deferred.Reject(error); + } + } + + + #endregion + + #region ICancellable implementation + + public void Cancel() { + if (m_cancel != null) { + try { + m_deferred.Resolve(m_cancel()); + } catch (Exception ex) { + Reject(ex); + } + } else { + m_deferred.Cancel(); + } + } + + #endregion + } + + class ListenerDescriptor : IDeferred { + readonly Action m_handler; + readonly PromiseEventType m_events; + + public ListenerDescriptor(Action handler, PromiseEventType events) { + Debug.Assert(handler != null); + + m_handler = handler; + m_events = events; + } + + #region IDeferred implementation + + public void Resolve(T value) { + if (m_events.HasFlag(PromiseEventType.Success)) { + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } + } + + public void Reject(Exception error) { + if (m_events.HasFlag(PromiseEventType.Error)){ + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + if (m_events.HasFlag(PromiseEventType.Cancelled)){ + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } + } + + #endregion + } + + class ValueEventDescriptor : IDeferred { + readonly Action m_success; + readonly Action m_failed; + readonly Action m_cancelled; + readonly IDeferred m_deferred; + + public ValueEventDescriptor(Action success, Action failed, Action cancelled, IDeferred deferred) { + Debug.Assert(deferred != null); + + m_success = success; + m_failed = failed; + m_cancelled = cancelled; + m_deferred = deferred; + } + + #region IDeferred implementation + + public void Resolve(T value) { + if (m_success != null) { + try { + m_success(value); + m_deferred.Resolve(value); + } catch (Exception ex) { + Reject(ex); + } + } + } + + public void Reject(Exception error) { + if (m_failed != null) { + try { + m_failed(error); + m_deferred.Resolve(default(T)); + } catch(Exception ex) { + m_deferred.Reject(ex); + } + } else { + m_deferred.Reject(error); + } + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + if (m_cancelled != null) { + try { + m_cancelled(); + m_deferred.Resolve(default(T)); + } catch(Exception ex) { + Reject(ex); + } + } else { + m_deferred.Cancel(); + } + } + + #endregion + } + + public class EventDescriptor : IDeferred { + readonly Action m_success; + readonly Action m_failed; + readonly Action m_cancelled; + readonly IDeferred m_deferred; + + public EventDescriptor(Action success, Action failed, Action cancelled, IDeferred deferred) { + Debug.Assert(deferred != null); + + m_success = success; + m_failed = failed; + m_cancelled = cancelled; + m_deferred = deferred; + } + + #region IDeferred implementation + + public void Resolve(T value) { + if (m_success != null) { + try { + m_success(); + m_deferred.Resolve(value); + } catch (Exception ex) { + Reject(ex); + } + } + } + + public void Reject(Exception error) { + if (m_failed != null) { + try { + m_failed(error); + m_deferred.Resolve(default(T)); + }catch (Exception ex) + { + m_deferred.Reject(ex); + } + } else { + m_deferred.Reject(error); + } + + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + if (m_cancelled != null) { + try { + m_cancelled(); + m_deferred.Resolve(default(T)); + } catch (Exception ex) { + Reject(ex); + } + } else { + m_deferred.Cancel(); + } + } + + #endregion + } + + T m_result; + + public virtual void Resolve(T value) { + BeginSetResult(); + m_result = value; + EndSetResult(); + } + + public void Reject(Exception error) { + SetError(error); + } + + public Type PromiseType { + get { + return typeof(T); + } + } + + public new T Join() { + WaitResult(-1); + return m_result; + } + public new T Join(int timeout) { + WaitResult(timeout); + return m_result; + } + + public IPromise On(Action success, Action error, Action cancel) { + AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance)); + return this; + } + + public IPromise On(Action success, Action error) { + AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance)); + return this; + } + + public IPromise On(Action success) { + AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance)); + return this; + } + + public IPromise On(Action handler, PromiseEventType events) { + Listen(events, handler); + return this; + } + + public IPromise Then(Func mapper, Func error, Func cancel) { + var promise = new Promise(); + AddHandler(new RemapDescriptor(mapper, error, cancel, promise)); + return promise; + } + + public IPromise Then(Func mapper, Func error) { + var promise = new Promise(); + AddHandler(new RemapDescriptor(mapper, error, null, promise)); + return promise; + } + + public IPromise Then(Func mapper) { + var promise = new Promise(); + AddHandler(new RemapDescriptor(mapper, null, null, promise)); + return promise; + } + + public IPromise Chain(Func> chained, Func> error, Func> cancel) { + // this promise will be resolved when an asyc operation is started + var promise = new Promise>(); + + AddHandler(new RemapDescriptor>( + chained, + error, + cancel, + promise + )); + + var medium = new Promise(); + + if (chained != null) + medium.On(Cancel, PromiseEventType.Cancelled); + + // we need to connect started async operation with the medium + // if the async operation hasn't been started by the some reason + // report is to the medium + promise.On( + result => ConnectPromise(result, medium), + medium.Reject, + medium.Cancel + ); + + return medium; + } + + static void ConnectPromise(IPromise result, Promise medium) { + if (result != null) { + result.On( + medium.Resolve, + medium.Reject, + () => medium.Reject(new OperationCanceledException()) + ); + medium.On(result.Cancel, PromiseEventType.Cancelled); + } else { + medium.Reject( + new NullReferenceException( + "The chained asynchronous operation returned" + + " 'null' where the promise instance is expected" + ) + ); + } + } + + public IPromise Chain(Func> chained, Func> error) { + return Chain(chained, error, null); + } + + public IPromise Chain(Func> chained) { + return Chain(chained, null, null); + } + + public IPromise Error(Func error) { + var promise = new Promise(); + if (error != null) + On( + (Action)null, + ex => { + try { + promise.Resolve(error(ex)); + } catch (Exception ex2) { + promise.Reject(ex2); + } + } + ); + else + Listen(PromiseEventType.Error, () => promise.Resolve(default(T2))); + return promise; + } + + public IPromise Cancelled(Func handler) { + var promise = new Promise(); + if (handler != null) + On( + (Action)null, + null, + () => { + try { + promise.Resolve(handler()); + } catch (Exception ex) { + promise.Reject(ex); + } + }); + else + Listen(PromiseEventType.Cancelled, () => promise.Resolve(default(T2))); + return promise; + } + + public IPromise Then(Action success, Action error, Action cancel) { + var promise = new Promise(); + if (success != null) + promise.On(Cancel, PromiseEventType.Cancelled); + + AddHandler(new EventDescriptor(success, error, cancel, promise)); + + return promise; + } + + public IPromise Then(Action success, Action error) { + return Then(success, error, null); + } + + public IPromise Then(Action success) { + return Then(success, null, null); + } + + public IPromise Chain(Func chained, Func error, Func cancel) { + var promise = new Promise(); + + AddHandler( + new RemapDescriptor( + x => chained(), + error, + cancel, + promise + ) + ); + + var medium = new Promise(); + if (chained != null) + medium.On(Cancel, PromiseEventType.Cancelled); + + promise.On( + result => ConnectPromise(result, medium), + medium.Reject, + medium.Cancel + ); + + return medium; + } + + static void ConnectPromise(IPromise result, Promise medium) { + if (result != null) { + result.On( + medium.Resolve, + medium.Reject, + () => medium.Reject(new OperationCanceledException()) + ); + medium.On(result.Cancel, PromiseEventType.Cancelled); + } else { + medium.Reject( + new NullReferenceException( + "The chained asynchronous operation returned" + + " 'null' where the promise instance is expected" + ) + ); + } + } + + public IPromise Chain(Func chained, Func error) { + return Chain(chained, error, null); + } + + public IPromise Chain(Func chained) { + return Chain(chained, null, null); + } + + public IPromise On(Action success, Action error, Action cancel) { + AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance)); + return this; + } + + public IPromise On(Action success, Action error) { + AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance)); + return this; + } + + public IPromise On(Action success) { + Listen(PromiseEventType.Success, success); + return this; + } + + IPromise IPromise.On(Action handler, PromiseEventType events) { + Listen(events,handler); + return this; + } + + public IPromise Error(Action error) { + var promise = new Promise(); + if (error != null) + On( + (Action)null, + ex => { + try { + error(ex); + promise.Resolve(); + } catch (Exception ex2) { + promise.Reject(ex2); + } + }); + else + Listen(PromiseEventType.Error, promise.Resolve); + return promise; + } + + public IPromise Cancelled(Action handler) { + var promise = new Promise(); + if (handler != null) + On( + (Action)null, + null, + () => { + try { + handler(); + promise.Resolve(); + } catch (Exception ex) { + promise.Reject(ex); + } + }); + else + Listen(PromiseEventType.Cancelled, promise.Resolve); + return promise; + } + + public IPromise Cast() { + return (IPromise)this; + } + + #region implemented abstract members of AbstractPromise + + protected override void SignalSuccess(IDeferred handler) { + handler.Resolve(m_result); + } + + protected override void SignalError(IDeferred handler, Exception error) { + handler.Reject(error); + } + + protected override void SignalCancelled(IDeferred handler) { + handler.Cancel(); + } + + protected override void Listen(PromiseEventType events, Action handler) { + if (handler != null) + AddHandler(new ListenerDescriptor(handler, events)); + } + + #endregion + + public static IPromise ResultToPromise(T value) { + var p = new Promise(); + p.Resolve(value); + return p; + } + + public static IPromise ExceptionToPromise(Exception error) { + var p = new Promise(); + p.Reject(error); + return p; + } + + } +} diff --git a/Implab/PromiseTransientException.cs b/Implab/PromiseTransientException.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseTransientException.cs @@ -0,0 +1,33 @@ +using System; + +namespace Implab { + + [Serializable] + public class PromiseTransientException : Exception { + /// + /// Initializes a new instance of the class. + /// + /// The exception that is the cause of the current exception. + public PromiseTransientException(Exception inner) : base("The preceding promise has failed", inner) { + } + + /// + /// Initializes a new instance of the class + /// + /// A that describes the exception. + /// The exception that is the cause of the current exception. + public PromiseTransientException(string message, Exception inner) + : base(message, inner) { + } + + /// + /// Initializes a new instance of the class + /// + /// The contextual information about the source or destination. + /// The object that holds the serialized object data. + protected PromiseTransientException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) + : base(info, context) { + } + } +} + diff --git a/Implab/Safe.cs b/Implab/Safe.cs --- a/Implab/Safe.cs +++ b/Implab/Safe.cs @@ -23,7 +23,7 @@ namespace Implab public static void ArgumentNotEmpty(T[] param, string name) { if (param == null || param.Length == 0) - throw new ArgumentException("The array must be not emty"); + throw new ArgumentException("The array must be not emty", name); } public static void ArgumentNotNull(object param, string name) { @@ -61,7 +61,7 @@ namespace Implab public static IPromise InvokePromise(Action action) { ArgumentNotNull(action, "action"); - var p = new Promise(); + var p = new Promise(); try { action(); p.Resolve(); diff --git a/Implab/SyncContextPromise.cs b/Implab/SyncContextPromise.cs --- a/Implab/SyncContextPromise.cs +++ b/Implab/SyncContextPromise.cs @@ -9,13 +9,16 @@ namespace Implab { m_context = context; } - public SyncContextPromise(SynchronizationContext context, IPromise parent) - : base(parent) { - Safe.ArgumentNotNull(context, "context"); - m_context = context; + protected override void SignalSuccess(IDeferred handler) { + m_context.Post(x => base.SignalSuccess(handler), null); } - protected override void InvokeHandler(AbstractHandler handler) { - m_context.Post(x => base.InvokeHandler(handler),null); + + protected override void SignalError(IDeferred handler, System.Exception error) { + m_context.Post(x => base.SignalError(handler, error), null); + } + + protected override void SignalCancelled(IDeferred handler) { + m_context.Post(x => base.SignalCancelled(handler), null); } } } diff --git a/Implab/TransientPromiseException.cs b/Implab/TransientPromiseException.cs deleted file mode 100644 --- a/Implab/TransientPromiseException.cs +++ /dev/null @@ -1,33 +0,0 @@ -using System; - -namespace Implab { - - [Serializable] - public class TransientPromiseException : Exception { - /// - /// Initializes a new instance of the class. - /// - /// The exception that is the cause of the current exception. - public TransientPromiseException(Exception inner) : base("The preceding promise has failed", inner) { - } - - /// - /// Initializes a new instance of the class - /// - /// A that describes the exception. - /// The exception that is the cause of the current exception. - public TransientPromiseException(string message, Exception inner) - : base(message, inner) { - } - - /// - /// Initializes a new instance of the class - /// - /// The contextual information about the source or destination. - /// The object that holds the serialized object data. - protected TransientPromiseException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) - : base(info, context) { - } - } -} - diff --git a/MonoPlay/Program.cs b/MonoPlay/Program.cs --- a/MonoPlay/Program.cs +++ b/MonoPlay/Program.cs @@ -11,7 +11,7 @@ namespace MonoPlay { if (args == null) throw new ArgumentNullException("args"); - var q1 = new MTQueue(); + var q1 = new AsyncQueue(); var q2 = new Queue(); const int count = 10000000; @@ -19,60 +19,91 @@ namespace MonoPlay { var t1 = Environment.TickCount; - Promise.CreateComposite( - new [] { - AsyncPool.InvokeNewThread(() => { - for (var i = 0; i < count; i++) - q1.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { - int temp = 0; - for(int i =0 ; i< count ; i++) - while(!q1.TryDequeue(out temp)){ - } - }) - } - ).Join(); + new [] { + AsyncPool.InvokeNewThread(() => { + for (var i = 0; i < count; i++) + q1.Enqueue(i); + }), + AsyncPool.InvokeNewThread(() => { + for (var i = 0; i < count; i++) + q1.Enqueue(i); + }), + AsyncPool.InvokeNewThread(() => { + int temp = 0; + int i = 0; + while (i < count) + if (q1.TryDequeue(out temp)) + i++; + }), + AsyncPool.InvokeNewThread(() => { + int temp = 0; + int i = 0; + while (i < count) + if (q1.TryDequeue(out temp)) + i++; + }) + } + .Combine() + .Join(); var t2 = Environment.TickCount; Console.WriteLine("MTQueue: {0} ms", t2 - t1); t1 = Environment.TickCount; - for (var i = 0; i < count; i++) + for (var i = 0; i < count * 2; i++) q2.Enqueue(i); + for (var i = 0; i < count * 2; i++) + q2.Dequeue(); + t2 = Environment.TickCount; - Console.WriteLine("LinkedList: {0} ms", t2 - t1); + Console.WriteLine("Queue: {0} ms", t2 - t1); q2 = new Queue(); t1 = Environment.TickCount; - Promise.CreateComposite( - new [] { - AsyncPool.InvokeNewThread(() => { - for (var i = 0; i < count; i++) - lock (q2) - q2.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { - for(int i = 0 ; i< count ;) - lock(q2) { - if(q2.Count == 0) - continue; - q2.Dequeue(); - i++; - } + + new [] { + AsyncPool.InvokeNewThread(() => { + for (var i = 0; i < count; i++) + lock (q2) + q2.Enqueue(i); + }), + AsyncPool.InvokeNewThread(() => { + for (var i = 0; i < count; i++) + lock (q2) + q2.Enqueue(i); + }), + AsyncPool.InvokeNewThread(() => { + for (int i = 0; i < count ;) + lock (q2) { + if (q2.Count == 0) + continue; + q2.Dequeue(); + i++; + } - }) - } - ).Join(); + }), + AsyncPool.InvokeNewThread(() => { + for (int i = 0; i < count ;) + lock (q2) { + if (q2.Count == 0) + continue; + q2.Dequeue(); + i++; + } + + }) + } + .Combine() + .Join(); t2 = Environment.TickCount; - Console.WriteLine("LinkedList+Lock: {0} ms", t2 - t1); + Console.WriteLine("Queue+Lock: {0} ms", t2 - t1); } }