##// END OF EJS Templates
working on promises
cin -
r243:b1e0ffdf3451 v3
parent child
Show More
@@ -0,0 +1,9
1 namespace Implab {
2 public enum PromiseState {
3 Pending,
4
5 Resolved,
6
7 Rejected
8 }
9 } No newline at end of file
@@ -1,170 +1,141
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Reflection;
5 5 using System.Diagnostics;
6 6
7 7 namespace Implab {
8 /// <summary>
9 /// Abstract class for creation of custom one-shot thread safe events.
10 /// </summary>
11 /// <remarks>
12 /// <para>
13 /// An event is something that should happen in the future and the
14 /// triggering of the event causes execution of some pending actions
15 /// which are formely event handlers. One-shot events occur only once
16 /// and any handler added after the event is triggered should run
17 /// without a delay.
18 /// </para>
19 /// <para>
20 /// The lifecycle of the one-shot event is tipically consists of following
21 /// phases.
22 /// <list>
23 /// <description>Pending state. This is the initial state of the event. Any
24 /// handler added to the event will be queued for the future execution.
25 /// </description>
26 /// <description>Transitional state. This is intermediate state between pending
27 /// and fulfilled states, during this state internal initialization and storing
28 /// of the result occurs.
29 /// </description>
30 /// <description>Fulfilled state. The event contains the result, all queued
31 /// handlers are signalled to run and newly added handlers are executed
32 /// immediatelly.
33 /// </description>
34 /// </list>
35 /// </para>
36 /// </remarks>
8 37 public abstract class AbstractEvent<THandler> where THandler : class {
9
10 38 const int PENDING_SATE = 0;
11 protected const int TRANSITIONAL_STATE = 1;
12 39
13 protected const int SUCCEEDED_STATE = 2;
14 protected const int REJECTED_STATE = 3;
40 const int TRANSITIONAL_STATE = 1;
41
42 const int FULFILLED_STATE = 2;
15 43
16 44 volatile int m_state;
17 Exception m_error;
18 45
19 46 THandler m_handler;
20 47 SimpleAsyncQueue<THandler> m_extraHandlers;
21 48
49 public bool IsFulfilled {
50 get {
51 return m_state > TRANSITIONAL_STATE;
52 }
53 }
54
22 55 #region state managment
23 56 protected bool BeginTransit() {
24 57 return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE);
25 58 }
26 59
27 protected void CompleteTransit(int state) {
60 protected void CompleteTransit() {
28 61 #if DEBUG
29 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
62 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, FULFILLED_STATE, TRANSITIONAL_STATE))
30 63 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
31 64 #else
32 65 m_state = state;
33 66 #endif
34 67 Signal();
35 68 }
36 69
37 70 protected void WaitTransition() {
38 71 if (m_state == TRANSITIONAL_STATE) {
39 72 SpinWait spin;
40 73 do {
41 74 spin.SpinOnce();
42 75 } while (m_state == TRANSITIONAL_STATE);
43 76 }
44 77 }
45 78
46 protected bool BeginSetResult() {
47 if (!BeginTransit()) {
48 WaitTransition();
49 return false;
50 }
51 return true;
52 }
53 79
54 protected void EndSetResult() {
55 CompleteTransit(SUCCEEDED_STATE);
56 }
57
58
59
60 /// <summary>
61 /// Выполняет обещание, сообщая об ошибке
62 /// </summary>
63 /// <remarks>
64 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
65 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
66 /// будут проигнорированы.
67 /// </remarks>
68 /// <param name="error">Исключение возникшее при выполнении операции</param>
69 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
70 protected void SetError(Exception error) {
71 if (BeginTransit()) {
72 m_error = error;
73 CompleteTransit(REJECTED_STATE);
74 } else {
75 WaitTransition();
76 if (m_state == SUCCEEDED_STATE)
77 throw new InvalidOperationException("The promise is already resolved");
78 }
79 }
80
81 protected abstract void SignalHandler(THandler handler, int signal);
80 protected abstract void SignalHandler(THandler handler);
82 81
83 82 void Signal() {
84 83 THandler handler;
85 84 while (TryDequeueHandler(out handler))
86 SignalHandler(handler, m_state);
85 SignalHandler(handler);
87 86 }
88 87
89 88 #endregion
90 89
91 90 protected abstract Signal GetFulfillSignal();
92 91
93 92 #region synchronization traits
94 93 protected void WaitResult(int timeout) {
95 94 if (!(IsFulfilled || GetFulfillSignal().Wait(timeout)))
96 95 throw new TimeoutException();
97
98 if (IsRejected)
99 Rethrow();
100 96 }
101 97
102 protected void Rethrow() {
103 Debug.Assert(m_error != null);
104 if (m_error is OperationCanceledException)
105 throw new OperationCanceledException("Operation cancelled", m_error);
106 else
107 throw new TargetInvocationException(m_error);
108 }
98
109 99 #endregion
110 100
111 101 #region handlers managment
112 102
113 103 protected void AddHandler(THandler handler) {
114 104
115 if (m_state > 1) {
105 if (IsFulfilled) {
116 106 // the promise is in the resolved state, just invoke the handler
117 SignalHandler(handler, m_state);
107 SignalHandler(handler);
118 108 } else {
119 if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) {
120 if (m_extraHandlers == null)
121 // compare-exchange will fprotect from loosing already created queue
122 Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null);
123 m_extraHandlers.Enqueue(handler);
124 }
109 EnqueueHandler(handler);
125 110
126 if (m_state > 1 && TryDequeueHandler(out handler))
111 if (IsFulfilled && TryDequeueHandler(out handler))
127 112 // if the promise have been resolved while we was adding the handler to the queue
128 113 // we can't guarantee that someone is still processing it
129 114 // therefore we need to fetch a handler from the queue and execute it
130 115 // note that fetched handler may be not the one that we have added
131 116 // even we can fetch no handlers at all :)
132 SignalHandler(handler, m_state);
117 SignalHandler(handler);
118 }
119
133 120 }
134 121
122 void EnqueueHandler(THandler handler) {
123 if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) {
124 if (m_extraHandlers == null)
125 // compare-exchange will protect from loosing already created queue
126 Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null);
127 m_extraHandlers.Enqueue(handler);
128 }
135 129 }
136 130
137 131 bool TryDequeueHandler(out THandler handler) {
138 132 handler = Interlocked.Exchange(ref m_handler, null);
139 133 if (handler != null)
140 134 return true;
141 135 return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler);
142 136 }
143 137
144 138 #endregion
145
146 #region IPromise implementation
147
148 public bool IsFulfilled {
149 get {
150 return m_state > TRANSITIONAL_STATE;
151 139 }
152 140 }
153 141
154 public bool IsRejected {
155 get {
156 return m_state == REJECTED_STATE;
157 }
158 }
159
160 #endregion
161
162 public Exception RejectReason {
163 get {
164 return m_error;
165 }
166 }
167
168 }
169 }
170
@@ -1,87 +1,146
1 1 using System;
2 using System.Diagnostics;
3 using System.Reflection;
2 4 using Implab.Parallels;
3 5
4 6 namespace Implab {
5 7 public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise {
6 8 public class HandlerDescriptor {
7 readonly Action m_handler;
8 readonly Action<Exception> m_error;
9 readonly Action m_resolve;
10 readonly Action<Exception> m_reject;
11
12 readonly IDeferred m_deferred;
9 13 public HandlerDescriptor(Action success, Action<Exception> error) {
10 m_handler = success;
11 m_error = error;
14 m_resolve = success;
15 m_reject = error;
12 16 }
13 17
14 18 public void SignalSuccess() {
15 if (m_handler != null) {
16 19 try {
17 m_handler();
18 // Analysis disable once EmptyGeneralCatchClause
19 } catch {
20 if (m_resolve != null)
21 m_resolve();
22 m_deferred.Resolve();
23 } catch (Exception ex) {
24 m_deferred.Reject(ex);
25 }
26 }
27
28 public void SignalError(Exception err) {
29 if (m_reject != null) {
30 try {
31 m_reject(err);
32 m_deferred.Resolve();
33 } catch (Exception ex) {
34 m_deferred.Reject(ex);
35 }
20 36 }
21 37 }
22 38 }
23 39
24 public void SignalError(Exception err) {
25 if (m_error != null) {
26 try {
27 m_error(err);
28 // Analysis disable once EmptyGeneralCatchClause
29 } catch {
40 PromiseState m_state;
41
42 Exception m_error;
43
44 public bool IsRejected {
45 get {
46 return m_state == PromiseState.Rejected;
30 47 }
31 48 }
49
50 public bool IsResolved {
51 get {
52 return m_state == PromiseState.Resolved;
53 }
54 }
55
56 public Exception RejectReason {
57 get {
58 return m_error;
32 59 }
33 60 }
34 61
35 62
36 63 #region implemented abstract members of AbstractPromise
37 64
38 protected override void SignalHandler(HandlerDescriptor handler, int signal) {
39 switch (signal) {
40 case SUCCEEDED_STATE:
65 protected override void SignalHandler(HandlerDescriptor handler) {
66 switch (m_state) {
67 case PromiseState.Resolved:
41 68 handler.SignalSuccess();
42 69 break;
43 case REJECTED_STATE:
70 case PromiseState.Rejected:
44 71 handler.SignalError(RejectReason);
45 72 break;
46 73 default:
47 throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal));
74 throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state));
48 75 }
49 76 }
50 77
51 78 protected override Signal GetFulfillSignal() {
52 79 var signal = new Signal();
53 80 On(signal.Set, e => signal.Set());
54 81 return signal;
55 82 }
56 83
57 84 #endregion
58 85
86 protected void CompleteResolve() {
87 m_state = PromiseState.Resolved;
88 CompleteTransit();
89 }
90
59 91 public Type ResultType {
60 92 get {
61 93 return typeof(void);
62 94 }
63 95 }
64 96
97 /// <summary>
98 /// Выполняет обещание, сообщая об ошибке
99 /// </summary>
100 /// <remarks>
101 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
102 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
103 /// будут проигнорированы.
104 /// </remarks>
105 /// <param name="error">Исключение возникшее при выполнении операции</param>
106 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
107 protected void SetError(Exception error) {
108 if (BeginTransit()) {
109 m_error = error;
110 m_state = PromiseState.Rejected;
111 CompleteTransit();
112 } else {
113 WaitTransition();
114 if (m_state == PromiseState.Resolved)
115 throw new InvalidOperationException("The promise is already resolved");
116 }
117 }
118
119 protected void Rethrow() {
120 Debug.Assert(m_error != null);
121 if (m_error is OperationCanceledException)
122 throw new OperationCanceledException("Operation cancelled", m_error);
123 else
124 throw new TargetInvocationException(m_error);
125 }
126
65 127 public void On(Action success, Action<Exception> error) {
66 128 AddHandler(new HandlerDescriptor(success, error));
67 129 }
68 130
69 131 public IPromise<T> Cast<T>() {
70 132 throw new InvalidCastException();
71 133 }
72 134
73 135 public void Join() {
74 136 WaitResult(-1);
137 if (IsRejected)
138 Rethrow();
75 139 }
76 140
77 141 public void Join(int timeout) {
78 142 WaitResult(timeout);
79 143 }
80
81 protected void SetResult() {
82 if(BeginSetResult())
83 EndSetResult();
84 }
85 144 }
86 145 }
87 146
@@ -1,53 +1,53
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab {
7 public interface IPromise: ICancellable {
7 public interface IPromise {
8 8
9 9 /// <summary>
10 10 /// Тип результата, получаемого через данное обещание.
11 11 /// </summary>
12 12 Type ResultType { get; }
13 13
14 14 /// <summary>
15 15 /// Обещание является выполненым, либо успешно, либо с ошибкой, либо отменено.
16 16 /// </summary>
17 17 bool IsFulfilled { get; }
18 18
19 19 bool IsRejected { get; }
20 20
21 21 bool IsResolved { get; }
22 22
23 23 /// <summary>
24 24 /// Исключение возникшее в результате выполнения обещания, либо причина отмены.
25 25 /// </summary>
26 26 Exception RejectReason { get; }
27 27
28 28 /// <summary>
29 29 /// Adds specified listeners to the current promise.
30 30 /// </summary>
31 31 /// <param name="success">The handler called on the successful promise completion.</param>
32 32 /// <param name="error">The handler is called if an error while completing the promise occurred.</param>
33 33 /// <returns>The current promise.</returns>
34 34 void On(Action success, Action<Exception> error);
35 35
36 36 /// <summary>
37 37 /// Преобразует результат обещания к заданному типу и возвращает новое обещание.
38 38 /// </summary>
39 39 IPromise<T> Cast<T>();
40 40
41 41 /// <summary>
42 42 /// Синхронизирует текущий поток с обещанием.
43 43 /// </summary>
44 44 void Join();
45 45 /// <summary>
46 46 /// Синхронизирует текущий поток с обещанием.
47 47 /// </summary>
48 48 /// <param name="timeout">Время ожидания, по его истечению возникнет исключение.</param>
49 49 /// <exception cref="TimeoutException">Превышено время ожидания.</exception>
50 50 void Join(int timeout);
51 51
52 52 }
53 53 }
@@ -1,25 +1,25
1 1 using System;
2 2 using Implab.Parallels;
3 3
4 4 namespace Implab {
5 public class Promise : AbstractPromise, IDeferred {
5 public class Promise : AbstractPromise {
6 6 public static readonly IPromise Success;
7 7
8 8 static Promise() {
9 9 Success = new SuccessPromise();
10 10 }
11 11
12 public void Resolve() {
12 internal void ResolvePromise() {
13 13 SetResult();
14 14 }
15 15
16 public void Reject(Exception error) {
16 internal void RejectPromise(Exception error) {
17 17 SetError(error);
18 18 }
19 19
20 public static IPromise FromException(Exception exception) {
20 public static IPromise Reject(Exception exception) {
21 21 return new FailedPromise(exception);
22 22 }
23 23 }
24 24 }
25 25
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

ok, latest stable version should be in default

You need to be logged in to leave comments. Login now