##// END OF EJS Templates
fixed Resove method bug when calling it on already cancelled promise
cin -
r130:671f60cd0250 v2
parent child
Show More
@@ -1,122 +1,122
1 1 using Implab.Parallels;
2 2 using System;
3 3 using System.Collections.Generic;
4 4 using System.Linq;
5 5 using System.Text;
6 6 using System.Threading;
7 7 using System.Threading.Tasks;
8 8 using System.Windows.Forms;
9 9
10 10 namespace Implab.Diagnostics.Interactive
11 11 {
12 12 public class InteractiveListener: TextListenerBase
13 13 {
14 14 TraceForm m_form;
15 15
16 16 SynchronizationContext m_syncGuiThread;
17 readonly Promise<object> m_guiStarted = new Promise<object>();
17 readonly Promise m_guiStarted = new Promise();
18 18
19 19 readonly IPromise m_guiFinished;
20 20 // readonly IPromise m_workerFinished = new Promise<object>();
21 21
22 22 readonly MTQueue<TraceViewItem> m_queue = new MTQueue<TraceViewItem>();
23 23 readonly AutoResetEvent m_queueEvent = new AutoResetEvent(false);
24 24
25 25 int m_queueLength;
26 26 bool m_exitPending;
27 27
28 28 readonly object m_pauseLock = new object();
29 29 bool m_paused;
30 30 readonly ManualResetEvent m_pauseEvent = new ManualResetEvent(true);
31 31
32 32 public InteractiveListener(bool global) : base(global) {
33 m_guiFinished = AsyncPool.InvokeNewThread(GuiThread);
34 /*m_workerFinished = */AsyncPool.InvokeNewThread(QueueThread);
33 m_guiFinished = AsyncPool.RunThread(GuiThread);
34 /*m_workerFinished = */AsyncPool.RunThread(QueueThread);
35 35
36 36 m_guiStarted.Join();
37 37 }
38 38
39 39 void GuiThread() {
40 40 m_form = new TraceForm(); // will create SynchronizationContext
41 41
42 42 m_form.PauseEvents += (s,a) => Pause();
43 43 m_form.ResumeEvents += (s, a) => Resume();
44 44
45 45 m_syncGuiThread = SynchronizationContext.Current;
46 46 m_guiStarted.Resolve();
47 47 Application.Run();
48 48 }
49 49
50 50 void QueueThread() {
51 51 while (!m_exitPending) {
52 52 if (m_paused)
53 53 m_pauseEvent.WaitOne();
54 54
55 55 TraceViewItem item;
56 56 if (m_queue.TryDequeue(out item)) {
57 57 Interlocked.Decrement(ref m_queueLength);
58 58
59 59 m_syncGuiThread.Send(x => m_form.AddTraceEvent(item),null);
60 60 } else {
61 61 m_queueEvent.WaitOne();
62 62 }
63 63 }
64 64 }
65 65
66 66 public void Pause() {
67 67 // for consistency we need to set this properties atomically
68 68 lock (m_pauseLock) {
69 69 m_pauseEvent.Reset();
70 70 m_paused = true;
71 71 }
72 72 }
73 73
74 74 public void Resume() {
75 75 // for consistency we need to set this properties atomically
76 76 lock (m_pauseLock) {
77 77 m_paused = false;
78 78 m_pauseEvent.Set();
79 79 }
80 80 }
81 81
82 82 void Enqueue(TraceViewItem item) {
83 83 m_queue.Enqueue(item);
84 84 if (Interlocked.Increment(ref m_queueLength) == 1)
85 85 m_queueEvent.Set();
86 86 }
87 87
88 88 public void ShowForm() {
89 89 m_syncGuiThread.Post(x => m_form.Show(), null);
90 90 }
91 91
92 92 public void HideForm() {
93 93 m_syncGuiThread.Post(x => m_form.Hide(), null);
94 94 }
95 95
96 96 void Terminate() {
97 97 m_exitPending = true;
98 98 Resume();
99 99 m_syncGuiThread.Post(x => Application.ExitThread(), null);
100 100 }
101 101
102 102 protected override void Dispose(bool disposing) {
103 103 if (disposing) {
104 104 Terminate();
105 105 m_guiFinished.Join();
106 106 }
107 107 base.Dispose(disposing);
108 108 }
109 109
110 110 protected override void WriteEntry(LogEventArgs args, EventText text, string channel) {
111 111 var item = new TraceViewItem {
112 112 Indent = text.indent,
113 113 Message = text.content,
114 114 Thread = args.ThreadId,
115 115 Channel = channel,
116 116 Timestamp = Environment.TickCount
117 117 };
118 118
119 119 Enqueue(item);
120 120 }
121 121 }
122 122 }
@@ -1,289 +1,291
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Reflection;
5 5
6 6 namespace Implab {
7 7 public abstract class AbstractPromise<THandler> {
8 8
9 9 const int UNRESOLVED_SATE = 0;
10 10 const int TRANSITIONAL_STATE = 1;
11 11 const int SUCCEEDED_STATE = 2;
12 12 const int REJECTED_STATE = 3;
13 13 const int CANCELLED_STATE = 4;
14 14
15 15 const int RESERVED_HANDLERS_COUNT = 4;
16 16
17 17 int m_state;
18 18 Exception m_error;
19 19 int m_handlersCount;
20 20
21 21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
22 22 MTQueue<THandler> m_extraHandlers;
23 23 int m_handlerPointer = -1;
24 24 int m_handlersCommited;
25 25
26 26 #region state managment
27 27 bool BeginTransit() {
28 28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
29 29 }
30 30
31 31 void CompleteTransit(int state) {
32 32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
33 33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
34 34 }
35 35
36 36 void WaitTransition() {
37 37 while (m_state == TRANSITIONAL_STATE) {
38 38 Thread.MemoryBarrier();
39 39 }
40 40 }
41 41
42 protected void BeginSetResult() {
42 protected bool BeginSetResult() {
43 43 if (!BeginTransit()) {
44 44 WaitTransition();
45 45 if (m_state != CANCELLED_STATE)
46 46 throw new InvalidOperationException("The promise is already resolved");
47 return false;
47 48 }
49 return true;
48 50 }
49 51
50 52 protected void EndSetResult() {
51 53 CompleteTransit(SUCCEEDED_STATE);
52 54 OnSuccess();
53 55 }
54 56
55 57
56 58
57 59 /// <summary>
58 60 /// Выполняет обещание, сообщая об ошибке
59 61 /// </summary>
60 62 /// <remarks>
61 63 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
62 64 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
63 65 /// будут проигнорированы.
64 66 /// </remarks>
65 67 /// <param name="error">Исключение возникшее при выполнении операции</param>
66 68 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
67 69 protected void SetError(Exception error) {
68 70 if (BeginTransit()) {
69 71 m_error = error is PromiseTransientException ? error.InnerException : error;
70 72 CompleteTransit(REJECTED_STATE);
71 73 OnError();
72 74 } else {
73 75 WaitTransition();
74 76 if (m_state == SUCCEEDED_STATE)
75 77 throw new InvalidOperationException("The promise is already resolved");
76 78 }
77 79 }
78 80
79 81 /// <summary>
80 82 /// Отменяет операцию, если это возможно.
81 83 /// </summary>
82 84 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
83 85 protected void SetCancelled() {
84 86 if (BeginTransit()) {
85 87 CompleteTransit(CANCELLED_STATE);
86 88 OnCancelled();
87 89 }
88 90 }
89 91
90 92 protected abstract void SignalSuccess(THandler handler);
91 93
92 94 protected abstract void SignalError(THandler handler, Exception error);
93 95
94 96 protected abstract void SignalCancelled(THandler handler);
95 97
96 98 void OnSuccess() {
97 99 var hp = m_handlerPointer;
98 100 var slot = hp +1 ;
99 101 while (slot < m_handlersCommited) {
100 102 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
101 103 SignalSuccess(m_handlers[slot]);
102 104 }
103 105 hp = m_handlerPointer;
104 106 slot = hp +1 ;
105 107 }
106 108
107 109
108 110 if (m_extraHandlers != null) {
109 111 THandler handler;
110 112 while (m_extraHandlers.TryDequeue(out handler))
111 113 SignalSuccess(handler);
112 114 }
113 115 }
114 116
115 117 void OnError() {
116 118 var hp = m_handlerPointer;
117 119 var slot = hp +1 ;
118 120 while (slot < m_handlersCommited) {
119 121 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
120 122 SignalError(m_handlers[slot],m_error);
121 123 }
122 124 hp = m_handlerPointer;
123 125 slot = hp +1 ;
124 126 }
125 127
126 128 if (m_extraHandlers != null) {
127 129 THandler handler;
128 130 while (m_extraHandlers.TryDequeue(out handler))
129 131 SignalError(handler, m_error);
130 132 }
131 133 }
132 134
133 135 void OnCancelled() {
134 136 var hp = m_handlerPointer;
135 137 var slot = hp +1 ;
136 138 while (slot < m_handlersCommited) {
137 139 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
138 140 SignalCancelled(m_handlers[slot]);
139 141 }
140 142 hp = m_handlerPointer;
141 143 slot = hp +1 ;
142 144 }
143 145
144 146 if (m_extraHandlers != null) {
145 147 THandler handler;
146 148 while (m_extraHandlers.TryDequeue(out handler))
147 149 SignalCancelled(handler);
148 150 }
149 151 }
150 152
151 153 #endregion
152 154
153 155 protected abstract void Listen(PromiseEventType events, Action handler);
154 156
155 157 #region synchronization traits
156 158 protected void WaitResult(int timeout) {
157 159 if (!IsResolved) {
158 160 var lk = new object();
159 161
160 162 Listen(PromiseEventType.All, () => {
161 163 lock(lk) {
162 164 Monitor.Pulse(lk);
163 165 }
164 166 });
165 167
166 168 lock (lk) {
167 169 while(!IsResolved) {
168 170 if(!Monitor.Wait(lk,timeout))
169 171 throw new TimeoutException();
170 172 }
171 173 }
172 174
173 175 }
174 176 switch (m_state) {
175 177 case SUCCEEDED_STATE:
176 178 return;
177 179 case CANCELLED_STATE:
178 180 throw new OperationCanceledException();
179 181 case REJECTED_STATE:
180 182 throw new TargetInvocationException(m_error);
181 183 default:
182 184 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
183 185 }
184 186 }
185 187 #endregion
186 188
187 189 #region handlers managment
188 190
189 191 protected void AddHandler(THandler handler) {
190 192
191 193 if (m_state > 1) {
192 194 // the promise is in the resolved state, just invoke the handler
193 195 InvokeHandler(handler);
194 196 } else {
195 197 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
196 198
197 199 if (slot < RESERVED_HANDLERS_COUNT) {
198 200 m_handlers[slot] = handler;
199 201
200 202 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
201 203 }
202 204
203 205 if (m_state > 1) {
204 206 do {
205 207 var hp = m_handlerPointer;
206 208 slot = hp + 1;
207 209 if (slot < m_handlersCommited) {
208 210 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
209 211 continue;
210 212 InvokeHandler(m_handlers[slot]);
211 213 }
212 214 break;
213 215 } while(true);
214 216 }
215 217 } else {
216 218 if (slot == RESERVED_HANDLERS_COUNT) {
217 219 m_extraHandlers = new MTQueue<THandler>();
218 220 } else {
219 221 while (m_extraHandlers == null)
220 222 Thread.MemoryBarrier();
221 223 }
222 224
223 225 m_extraHandlers.Enqueue(handler);
224 226
225 227 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
226 228 // if the promise have been resolved while we was adding the handler to the queue
227 229 // we can't guarantee that someone is still processing it
228 230 // therefore we need to fetch a handler from the queue and execute it
229 231 // note that fetched handler may be not the one that we have added
230 232 // even we can fetch no handlers at all :)
231 233 InvokeHandler(handler);
232 234 }
233 235 }
234 236 }
235 237
236 238 protected void InvokeHandler(THandler handler) {
237 239 switch (m_state) {
238 240 case SUCCEEDED_STATE:
239 241 SignalSuccess(handler);
240 242 break;
241 243 case CANCELLED_STATE:
242 244 SignalCancelled(handler);
243 245 break;
244 246 case REJECTED_STATE:
245 247 SignalError(handler, m_error);
246 248 break;
247 249 default:
248 250 throw new Exception(String.Format("Invalid promise state {0}", m_state));
249 251 }
250 252 }
251 253
252 254 #endregion
253 255
254 256 #region IPromise implementation
255 257
256 258 public void Join(int timeout) {
257 259 WaitResult(timeout);
258 260 }
259 261
260 262 public void Join() {
261 263 WaitResult(-1);
262 264 }
263 265
264 266 public bool IsResolved {
265 267 get {
266 268 Thread.MemoryBarrier();
267 269 return m_state > 1;
268 270 }
269 271 }
270 272
271 273 public bool IsCancelled {
272 274 get {
273 275 Thread.MemoryBarrier();
274 276 return m_state == CANCELLED_STATE;
275 277 }
276 278 }
277 279
278 280 #endregion
279 281
280 282 #region ICancellable implementation
281 283
282 284 public void Cancel() {
283 285 SetCancelled();
284 286 }
285 287
286 288 #endregion
287 289 }
288 290 }
289 291
@@ -1,629 +1,631
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 5 using System.Diagnostics;
6 6
7 7 namespace Implab.Parallels {
8 8 public class AsyncQueue<T> : IEnumerable<T> {
9 9 class Chunk {
10 10 public Chunk next;
11 11
12 12 int m_low;
13 13 int m_hi;
14 14 int m_alloc;
15 15 readonly int m_size;
16 16 readonly T[] m_data;
17 17
18 18 public Chunk(int size) {
19 19 m_size = size;
20 20 m_data = new T[size];
21 21 }
22 22
23 23 public Chunk(int size, T value) {
24 24 m_size = size;
25 25 m_hi = 1;
26 26 m_alloc = 1;
27 27 m_data = new T[size];
28 28 m_data[0] = value;
29 29 }
30 30
31 31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
32 32 m_size = size;
33 33 m_hi = length;
34 34 m_alloc = alloc;
35 35 m_data = new T[size];
36 36 Array.Copy(data, offset, m_data, 0, length);
37 37 }
38 38
39 39 public int Low {
40 40 get { return m_low; }
41 41 }
42 42
43 43 public int Hi {
44 44 get { return m_hi; }
45 45 }
46 46
47 47 public int Size {
48 48 get { return m_size; }
49 49 }
50 50
51 51 public bool TryEnqueue(T value, out bool extend) {
52 52 var alloc = Interlocked.Increment(ref m_alloc) - 1;
53 53
54 54 if (alloc >= m_size) {
55 55 extend = alloc == m_size;
56 56 return false;
57 57 }
58 58
59 59 extend = false;
60 60 m_data[alloc] = value;
61 61
62 62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
63 63 // spin wait for commit
64 64 }
65 65 return true;
66 66 }
67 67
68 68 /// <summary>
69 69 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
70 70 /// </summary>
71 71 public void Commit() {
72 72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
73 73
74 74 while (m_hi != actual)
75 75 Thread.MemoryBarrier();
76 76 }
77 77
78 78 public bool TryDequeue(out T value, out bool recycle) {
79 79 int low;
80 80 do {
81 81 low = m_low;
82 82 if (low >= m_hi) {
83 83 value = default(T);
84 84 recycle = (low == m_size);
85 85 return false;
86 86 }
87 87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
88 88
89 89 recycle = (low == m_size - 1);
90 90 value = m_data[low];
91 91
92 92 return true;
93 93 }
94 94
95 95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
96 96 //int alloc;
97 97 //int allocSize;
98 98
99 99 var alloc = Interlocked.Add(ref m_alloc, length) - length;
100 100 if (alloc > m_size) {
101 101 // the chunk is full and someone already
102 102 // creating the new one
103 103 enqueued = 0; // nothing was added
104 104 extend = false; // the caller shouldn't try to extend the queue
105 105 return false; // nothing was added
106 106 }
107 107
108 108 enqueued = Math.Min(m_size - alloc, length);
109 109 extend = length > enqueued;
110 110
111 111 if (enqueued == 0)
112 112 return false;
113 113
114 114
115 115 Array.Copy(batch, offset, m_data, alloc, enqueued);
116 116
117 117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
118 118 // spin wait for commit
119 119 }
120 120
121 121 return true;
122 122 }
123 123
124 124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
125 125 int low, hi, batchSize;
126 126
127 127 do {
128 128 low = m_low;
129 129 hi = m_hi;
130 130 if (low >= hi) {
131 131 dequeued = 0;
132 132 recycle = (low == m_size); // recycling could be restarted and we need to signal again
133 133 return false;
134 134 }
135 135 batchSize = Math.Min(hi - low, length);
136 136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
137 137
138 138 recycle = (low == m_size - batchSize);
139 139 dequeued = batchSize;
140 140
141 141 Array.Copy(m_data, low, buffer, offset, batchSize);
142 142
143 143 return true;
144 144 }
145 145
146 146 public T GetAt(int pos) {
147 147 return m_data[pos];
148 148 }
149 149 }
150 150
151 151 public const int DEFAULT_CHUNK_SIZE = 32;
152 152 public const int MAX_CHUNK_SIZE = 262144;
153 153
154 154 Chunk m_first;
155 155 Chunk m_last;
156 156
157 157 /// <summary>
158 158 /// Adds the specified value to the queue.
159 159 /// </summary>
160 160 /// <param name="value">Tha value which will be added to the queue.</param>
161 161 public void Enqueue(T value) {
162 162 var last = m_last;
163 163 // spin wait to the new chunk
164 164 bool extend = true;
165 165 while (last == null || !last.TryEnqueue(value, out extend)) {
166 166 // try to extend queue
167 167 if (extend || last == null) {
168 168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
169 169 if (EnqueueChunk(last, chunk))
170 170 break; // success! exit!
171 171 last = m_last;
172 172 } else {
173 173 while (last == m_last) {
174 174 Thread.MemoryBarrier();
175 175 }
176 176 last = m_last;
177 177 }
178 178 }
179 179 }
180 180
181 181 /// <summary>
182 182 /// Adds the specified data to the queue.
183 183 /// </summary>
184 184 /// <param name="data">The buffer which contains the data to be enqueued.</param>
185 185 /// <param name="offset">The offset of the data in the buffer.</param>
186 186 /// <param name="length">The size of the data to read from the buffer.</param>
187 187 public void EnqueueRange(T[] data, int offset, int length) {
188 188 if (data == null)
189 189 throw new ArgumentNullException("data");
190 if (length == 0)
191 return;
190 192 if (offset < 0)
191 193 throw new ArgumentOutOfRangeException("offset");
192 194 if (length < 1 || offset + length > data.Length)
193 195 throw new ArgumentOutOfRangeException("length");
194 196
195 197 var last = m_last;
196 198
197 199 bool extend;
198 200 int enqueued;
199 201
200 202 while (length > 0) {
201 203 extend = true;
202 204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
203 205 length -= enqueued;
204 206 offset += enqueued;
205 207 }
206 208
207 209 if (extend) {
208 210 // there was no enough space in the chunk
209 211 // or there was no chunks in the queue
210 212
211 213 while (length > 0) {
212 214
213 215 var size = Math.Min(length, MAX_CHUNK_SIZE);
214 216
215 217 var chunk = new Chunk(
216 218 Math.Max(size, DEFAULT_CHUNK_SIZE),
217 219 data,
218 220 offset,
219 221 size,
220 222 length // length >= size
221 223 );
222 224
223 225 if (!EnqueueChunk(last, chunk)) {
224 226 // looks like the queue has been updated then proceed from the beginning
225 227 last = m_last;
226 228 break;
227 229 }
228 230
229 231 // we have successfully added the new chunk
230 232 last = chunk;
231 233 length -= size;
232 234 offset += size;
233 235 }
234 236 } else {
235 237 // we don't need to extend the queue, if we successfully enqueued data
236 238 if (length == 0)
237 239 break;
238 240
239 241 // if we need to wait while someone is extending the queue
240 242 // spinwait
241 243 while (last == m_last) {
242 244 Thread.MemoryBarrier();
243 245 }
244 246
245 247 last = m_last;
246 248 }
247 249 }
248 250 }
249 251
250 252 /// <summary>
251 253 /// Tries to retrieve the first element from the queue.
252 254 /// </summary>
253 255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
254 256 /// <param name="value">The value of the dequeued element.</param>
255 257 public bool TryDequeue(out T value) {
256 258 var chunk = m_first;
257 259 bool recycle;
258 260 while (chunk != null) {
259 261
260 262 var result = chunk.TryDequeue(out value, out recycle);
261 263
262 264 if (recycle) // this chunk is waste
263 265 RecycleFirstChunk(chunk);
264 266 else
265 267 return result; // this chunk is usable and returned actual result
266 268
267 269 if (result) // this chunk is waste but the true result is always actual
268 270 return true;
269 271
270 272 // try again
271 273 chunk = m_first;
272 274 }
273 275
274 276 // the queue is empty
275 277 value = default(T);
276 278 return false;
277 279 }
278 280
279 281 /// <summary>
280 282 /// Tries to dequeue the specified amount of data from the queue.
281 283 /// </summary>
282 284 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
283 285 /// <param name="buffer">The buffer to which the data will be written.</param>
284 286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
285 287 /// <param name="length">The maximum amount of data to be retrieved.</param>
286 288 /// <param name="dequeued">The actual amout of the retrieved data.</param>
287 289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
288 290 if (buffer == null)
289 291 throw new ArgumentNullException("buffer");
290 292 if (offset < 0)
291 293 throw new ArgumentOutOfRangeException("offset");
292 294 if (length < 1 || offset + length > buffer.Length)
293 295 throw new ArgumentOutOfRangeException("length");
294 296
295 297 var chunk = m_first;
296 298 bool recycle;
297 299 dequeued = 0;
298 300 while (chunk != null) {
299 301
300 302 int actual;
301 303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
302 304 offset += actual;
303 305 length -= actual;
304 306 dequeued += actual;
305 307 }
306 308
307 309 if (recycle) // this chunk is waste
308 310 RecycleFirstChunk(chunk);
309 311 else if (actual == 0)
310 312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
311 313
312 314 if (length == 0)
313 315 return true;
314 316
315 317 // we still may dequeue something
316 318 // try again
317 319 chunk = m_first;
318 320 }
319 321
320 322 return dequeued != 0;
321 323 }
322 324
323 325 /// <summary>
324 326 /// Tries to dequeue all remaining data in the first chunk.
325 327 /// </summary>
326 328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
327 329 /// <param name="buffer">The buffer to which the data will be written.</param>
328 330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
329 331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
330 332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
331 333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
332 334 if (buffer == null)
333 335 throw new ArgumentNullException("buffer");
334 336 if (offset < 0)
335 337 throw new ArgumentOutOfRangeException("offset");
336 338 if (length < 1 || offset + length > buffer.Length)
337 339 throw new ArgumentOutOfRangeException("length");
338 340
339 341 var chunk = m_first;
340 342 bool recycle;
341 343 dequeued = 0;
342 344
343 345 while (chunk != null) {
344 346
345 347 int actual;
346 348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
347 349 dequeued = actual;
348 350 }
349 351
350 352 if (recycle) // this chunk is waste
351 353 RecycleFirstChunk(chunk);
352 354
353 355 // if we have dequeued any data, then return
354 356 if (dequeued != 0)
355 357 return true;
356 358
357 359 // we still may dequeue something
358 360 // try again
359 361 chunk = m_first;
360 362 }
361 363
362 364 return false;
363 365 }
364 366
365 367 bool EnqueueChunk(Chunk last, Chunk chunk) {
366 368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
367 369 return false;
368 370
369 371 if (last != null)
370 372 last.next = chunk;
371 373 else {
372 374 m_first = chunk;
373 375 }
374 376 return true;
375 377 }
376 378
377 379 void RecycleFirstChunk(Chunk first) {
378 380 var next = first.next;
379 381
380 382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
381 383 return;
382 384
383 385 if (next == null) {
384 386
385 387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
386 388 /*while (first.next == null)
387 389 Thread.MemoryBarrier();*/
388 390
389 391 // race
390 392 // someone already updated the tail, restore the pointer to the queue head
391 393 m_first = first;
392 394 }
393 395 // the tail is updated
394 396 }
395 397
396 398 // we need to update the head
397 399 //Interlocked.CompareExchange(ref m_first, next, first);
398 400 // if the head is already updated then give up
399 401 //return;
400 402
401 403 }
402 404
403 405 public void Clear() {
404 406 // start the new queue
405 407 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
406 408
407 409 do {
408 410 Thread.MemoryBarrier();
409 411 var first = m_first;
410 412 var last = m_last;
411 413
412 414 if (last == null) // nothing to clear
413 415 return;
414 416
415 417 if (first == null || (first.next == null && first != last)) // inconcistency
416 418 continue;
417 419
418 420 // here we will create inconsistency which will force others to spin
419 421 // and prevent from fetching. chunk.next = null
420 422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
421 423 continue;// inconsistent
422 424
423 425 m_last = chunk;
424 426
425 427 return;
426 428
427 429 } while(true);
428 430 }
429 431
430 432 public T[] Drain() {
431 433 // start the new queue
432 434 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
433 435
434 436 do {
435 437 Thread.MemoryBarrier();
436 438 var first = m_first;
437 439 var last = m_last;
438 440
439 441 if (last == null)
440 442 return new T[0];
441 443
442 444 if (first == null || (first.next == null && first != last))
443 445 continue;
444 446
445 447 // here we will create inconsistency which will force others to spin
446 448 // and prevent from fetching. chunk.next = null
447 449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
448 450 continue;// inconsistent
449 451
450 452 last = Interlocked.Exchange(ref m_last, chunk);
451 453
452 454 return ReadChunks(first, last);
453 455
454 456 } while(true);
455 457 }
456 458
457 459 static T[] ReadChunks(Chunk chunk, object last) {
458 460 var result = new List<T>();
459 461 var buffer = new T[DEFAULT_CHUNK_SIZE];
460 462 int actual;
461 463 bool recycle;
462 464 while (chunk != null) {
463 465 // ensure all write operations on the chunk are complete
464 466 chunk.Commit();
465 467
466 468 // we need to read the chunk using this way
467 469 // since some client still may completing the dequeue
468 470 // operation, such clients most likely won't get results
469 471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
470 472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
471 473
472 474 if (chunk == last) {
473 475 chunk = null;
474 476 } else {
475 477 while (chunk.next == null)
476 478 Thread.MemoryBarrier();
477 479 chunk = chunk.next;
478 480 }
479 481 }
480 482
481 483 return result.ToArray();
482 484 }
483 485
484 486 struct ArraySegmentCollection : ICollection<T> {
485 487 readonly T[] m_data;
486 488 readonly int m_offset;
487 489 readonly int m_length;
488 490
489 491 public ArraySegmentCollection(T[] data, int offset, int length) {
490 492 m_data = data;
491 493 m_offset = offset;
492 494 m_length = length;
493 495 }
494 496
495 497 #region ICollection implementation
496 498
497 499 public void Add(T item) {
498 500 throw new NotSupportedException();
499 501 }
500 502
501 503 public void Clear() {
502 504 throw new NotSupportedException();
503 505 }
504 506
505 507 public bool Contains(T item) {
506 508 return false;
507 509 }
508 510
509 511 public void CopyTo(T[] array, int arrayIndex) {
510 512 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
511 513 }
512 514
513 515 public bool Remove(T item) {
514 516 throw new NotSupportedException();
515 517 }
516 518
517 519 public int Count {
518 520 get {
519 521 return m_length;
520 522 }
521 523 }
522 524
523 525 public bool IsReadOnly {
524 526 get {
525 527 return true;
526 528 }
527 529 }
528 530
529 531 #endregion
530 532
531 533 #region IEnumerable implementation
532 534
533 535 public IEnumerator<T> GetEnumerator() {
534 536 for (int i = m_offset; i < m_length + m_offset; i++)
535 537 yield return m_data[i];
536 538 }
537 539
538 540 #endregion
539 541
540 542 #region IEnumerable implementation
541 543
542 544 IEnumerator IEnumerable.GetEnumerator() {
543 545 return GetEnumerator();
544 546 }
545 547
546 548 #endregion
547 549 }
548 550
549 551 #region IEnumerable implementation
550 552
551 553 class Enumerator : IEnumerator<T> {
552 554 Chunk m_current;
553 555 int m_pos = -1;
554 556
555 557 public Enumerator(Chunk fisrt) {
556 558 m_current = fisrt;
557 559 }
558 560
559 561 #region IEnumerator implementation
560 562
561 563 public bool MoveNext() {
562 564 if (m_current == null)
563 565 return false;
564 566
565 567 if (m_pos == -1)
566 568 m_pos = m_current.Low;
567 569 else
568 570 m_pos++;
569 571
570 572 if (m_pos == m_current.Hi) {
571 573
572 574 m_current = m_pos == m_current.Size ? m_current.next : null;
573 575
574 576 m_pos = 0;
575 577
576 578 if (m_current == null)
577 579 return false;
578 580 }
579 581
580 582 return true;
581 583 }
582 584
583 585 public void Reset() {
584 586 throw new NotSupportedException();
585 587 }
586 588
587 589 object IEnumerator.Current {
588 590 get {
589 591 return Current;
590 592 }
591 593 }
592 594
593 595 #endregion
594 596
595 597 #region IDisposable implementation
596 598
597 599 public void Dispose() {
598 600 }
599 601
600 602 #endregion
601 603
602 604 #region IEnumerator implementation
603 605
604 606 public T Current {
605 607 get {
606 608 if (m_pos == -1 || m_current == null)
607 609 throw new InvalidOperationException();
608 610 return m_current.GetAt(m_pos);
609 611 }
610 612 }
611 613
612 614 #endregion
613 615 }
614 616
615 617 public IEnumerator<T> GetEnumerator() {
616 618 return new Enumerator(m_first);
617 619 }
618 620
619 621 #endregion
620 622
621 623 #region IEnumerable implementation
622 624
623 625 IEnumerator IEnumerable.GetEnumerator() {
624 626 return GetEnumerator();
625 627 }
626 628
627 629 #endregion
628 630 }
629 631 }
@@ -1,75 +1,76
1 1 using System;
2 2 using System.Threading;
3 3 using System.Diagnostics;
4 4
5 5 namespace Implab.Parallels {
6 6 /// <summary>
7 7 /// Implements a lightweight mechanism to aquire a shared or an exclusive lock.
8 8 /// </summary>
9 9 public class SharedLock {
10 10 readonly object m_lock = new object();
11 11 int m_locks;
12 12 bool m_exclusive;
13 13
14 14 public bool LockExclusive(int timeout) {
15 15 lock (m_lock) {
16 16 if (m_locks > 0 && !Monitor.Wait(m_lock, timeout))
17 17 return false;
18 18 m_exclusive = true;
19 19 m_locks = 1;
20 return true;
20 21 }
21 22 }
22 23
23 24 public void LockExclusive() {
24 25 LockExclusive(-1);
25 26 }
26 27
27 28 public bool LockShared(int timeout) {
28 29 lock (m_lock) {
29 30 if (!m_exclusive) {
30 31 m_locks++;
31 32 return true;
32 33 }
33 34
34 if (m_lock == 0) {
35 if (m_locks == 0) {
35 36 m_exclusive = false;
36 37 m_locks = 1;
37 38 return true;
38 39 }
39 40
40 41 if (Monitor.Wait(m_lock, timeout)) {
41 42 Debug.Assert(m_locks == 0);
42 43 m_locks = 1;
43 44 m_exclusive = false;
44 45 return true;
45 46 }
46 47 return false;
47 48 }
48 49 }
49 50
50 51 public void LockShared() {
51 52 LockShared(-1);
52 53 }
53 54
54 55 public void ReleaseShared() {
55 56 lock (m_lock) {
56 57 if (m_exclusive || m_locks <= 0)
57 58 throw new InvalidOperationException();
58 59 m_locks--;
59 60 if (m_locks == 0)
60 61 Monitor.PulseAll(m_lock);
61 62 }
62 63 }
63 64
64 65 public void ReleaseExclusive() {
65 66 lock (m_lock) {
66 67 if (!m_exclusive && m_locks != 1)
67 68 throw new InvalidOperationException();
68 69 m_locks = 0;
69 70 Monitor.PulseAll(m_lock);
70 71 }
71 72 }
72 73
73 74 }
74 75 }
75 76
@@ -1,621 +1,622
1 1 using System;
2 2 using System.Diagnostics;
3 3
4 4 namespace Implab {
5 5
6 6 /// <summary>
7 7 /// Класс для асинхронного получения результатов. Так называемое "обещание".
8 8 /// </summary>
9 9 /// <typeparam name="T">Тип получаемого результата</typeparam>
10 10 /// <remarks>
11 11 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
12 12 /// клиент получив такое обещание может установить ряд обратных вызово для получения
13 13 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
14 14 /// <para>
15 15 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
16 16 /// данные события клиент должен использовать методы <c>Then</c>.
17 17 /// </para>
18 18 /// <para>
19 19 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
20 20 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
21 21 /// выполнении обещания.
22 22 /// </para>
23 23 /// <para>
24 24 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
25 25 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
26 26 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
27 27 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
28 28 /// обещания.
29 29 /// </para>
30 30 /// <para>
31 31 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
32 32 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
33 33 /// использовать соответствующую форму методе <c>Then</c>.
34 34 /// </para>
35 35 /// <para>
36 36 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
37 37 /// только инициатор обещания иначе могут возникнуть противоречия.
38 38 /// </para>
39 39 /// </remarks>
40 40 public class Promise<T> : AbstractPromise<IDeferred<T>>, IPromise<T>, IDeferred<T> {
41 41
42 42 class StubDeferred : IDeferred<T> {
43 43 public static readonly StubDeferred instance = new StubDeferred();
44 44
45 45 StubDeferred() {
46 46 }
47 47
48 48 #region IDeferred implementation
49 49
50 50 public void Resolve(T value) {
51 51 }
52 52
53 53 public void Reject(Exception error) {
54 54 }
55 55
56 56 #endregion
57 57
58 58 #region ICancellable implementation
59 59
60 60 public void Cancel() {
61 61 }
62 62
63 63 #endregion
64 64
65 65
66 66 }
67 67
68 68 class RemapDescriptor<T2> : IDeferred<T> {
69 69 readonly Func<T,T2> m_remap;
70 70 readonly Func<Exception,T2> m_failed;
71 71 readonly Func<T2> m_cancel;
72 72 readonly IDeferred<T2> m_deferred;
73 73
74 74 public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<T2> cancel, IDeferred<T2> deferred ) {
75 75 Debug.Assert(deferred != null);
76 76 m_remap = remap;
77 77 m_failed = failed;
78 78 m_cancel = cancel;
79 79 m_deferred = deferred;
80 80 }
81 81
82 82
83 83
84 84 #region IDeferred implementation
85 85
86 86 public void Resolve(T value) {
87 87 if (m_remap != null) {
88 88 try {
89 89 m_deferred.Resolve(m_remap(value));
90 90 } catch (Exception ex) {
91 91 Reject(ex);
92 92 }
93 93 }
94 94 }
95 95
96 96 public void Reject(Exception error) {
97 97 if (m_failed != null) {
98 98 try {
99 99 m_deferred.Resolve(m_failed(error));
100 100 } catch (Exception ex) {
101 101 m_deferred.Reject(ex);
102 102 }
103 103 } else {
104 104 m_deferred.Reject(error);
105 105 }
106 106 }
107 107
108 108
109 109 #endregion
110 110
111 111 #region ICancellable implementation
112 112
113 113 public void Cancel() {
114 114 if (m_cancel != null) {
115 115 try {
116 116 m_deferred.Resolve(m_cancel());
117 117 } catch (Exception ex) {
118 118 Reject(ex);
119 119 }
120 120 } else {
121 121 m_deferred.Cancel();
122 122 }
123 123 }
124 124
125 125 #endregion
126 126 }
127 127
128 128 class ListenerDescriptor : IDeferred<T> {
129 129 readonly Action m_handler;
130 130 readonly PromiseEventType m_events;
131 131
132 132 public ListenerDescriptor(Action handler, PromiseEventType events) {
133 133 Debug.Assert(handler != null);
134 134
135 135 m_handler = handler;
136 136 m_events = events;
137 137 }
138 138
139 139 #region IDeferred implementation
140 140
141 141 public void Resolve(T value) {
142 142 if (m_events.HasFlag(PromiseEventType.Success)) {
143 143 try {
144 144 m_handler();
145 145 // Analysis disable once EmptyGeneralCatchClause
146 146 } catch {
147 147 }
148 148 }
149 149 }
150 150
151 151 public void Reject(Exception error) {
152 152 if (m_events.HasFlag(PromiseEventType.Error)){
153 153 try {
154 154 m_handler();
155 155 // Analysis disable once EmptyGeneralCatchClause
156 156 } catch {
157 157 }
158 158 }
159 159 }
160 160
161 161 #endregion
162 162
163 163 #region ICancellable implementation
164 164
165 165 public void Cancel() {
166 166 if (m_events.HasFlag(PromiseEventType.Cancelled)){
167 167 try {
168 168 m_handler();
169 169 // Analysis disable once EmptyGeneralCatchClause
170 170 } catch {
171 171 }
172 172 }
173 173 }
174 174
175 175 #endregion
176 176 }
177 177
178 178 class ValueEventDescriptor : IDeferred<T> {
179 179 readonly Action<T> m_success;
180 180 readonly Action<Exception> m_failed;
181 181 readonly Action m_cancelled;
182 182 readonly IDeferred<T> m_deferred;
183 183
184 184 public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
185 185 Debug.Assert(deferred != null);
186 186
187 187 m_success = success;
188 188 m_failed = failed;
189 189 m_cancelled = cancelled;
190 190 m_deferred = deferred;
191 191 }
192 192
193 193 #region IDeferred implementation
194 194
195 195 public void Resolve(T value) {
196 196 if (m_success != null) {
197 197 try {
198 198 m_success(value);
199 199 m_deferred.Resolve(value);
200 200 } catch (Exception ex) {
201 201 Reject(ex);
202 202 }
203 203 }
204 204 }
205 205
206 206 public void Reject(Exception error) {
207 207 if (m_failed != null) {
208 208 try {
209 209 m_failed(error);
210 210 m_deferred.Resolve(default(T));
211 211 } catch(Exception ex) {
212 212 m_deferred.Reject(ex);
213 213 }
214 214 } else {
215 215 m_deferred.Reject(error);
216 216 }
217 217 }
218 218
219 219 #endregion
220 220
221 221 #region ICancellable implementation
222 222
223 223 public void Cancel() {
224 224 if (m_cancelled != null) {
225 225 try {
226 226 m_cancelled();
227 227 m_deferred.Resolve(default(T));
228 228 } catch(Exception ex) {
229 229 Reject(ex);
230 230 }
231 231 } else {
232 232 m_deferred.Cancel();
233 233 }
234 234 }
235 235
236 236 #endregion
237 237 }
238 238
239 239 public class EventDescriptor : IDeferred<T> {
240 240 readonly Action m_success;
241 241 readonly Action<Exception> m_failed;
242 242 readonly Action m_cancelled;
243 243 readonly IDeferred<T> m_deferred;
244 244
245 245 public EventDescriptor(Action success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
246 246 Debug.Assert(deferred != null);
247 247
248 248 m_success = success;
249 249 m_failed = failed;
250 250 m_cancelled = cancelled;
251 251 m_deferred = deferred;
252 252 }
253 253
254 254 #region IDeferred implementation
255 255
256 256 public void Resolve(T value) {
257 257 if (m_success != null) {
258 258 try {
259 259 m_success();
260 260 m_deferred.Resolve(value);
261 261 } catch (Exception ex) {
262 262 Reject(ex);
263 263 }
264 264 }
265 265 }
266 266
267 267 public void Reject(Exception error) {
268 268 if (m_failed != null) {
269 269 try {
270 270 m_failed(error);
271 271 m_deferred.Resolve(default(T));
272 272 }catch (Exception ex)
273 273 {
274 274 m_deferred.Reject(ex);
275 275 }
276 276 } else {
277 277 m_deferred.Reject(error);
278 278 }
279 279
280 280 }
281 281
282 282 #endregion
283 283
284 284 #region ICancellable implementation
285 285
286 286 public void Cancel() {
287 287 if (m_cancelled != null) {
288 288 try {
289 289 m_cancelled();
290 290 m_deferred.Resolve(default(T));
291 291 } catch (Exception ex) {
292 292 Reject(ex);
293 293 }
294 294 } else {
295 295 m_deferred.Cancel();
296 296 }
297 297 }
298 298
299 299 #endregion
300 300 }
301 301
302 302 T m_result;
303 303
304 304 public virtual void Resolve(T value) {
305 BeginSetResult();
306 m_result = value;
307 EndSetResult();
305 if (BeginSetResult()) {
306 m_result = value;
307 EndSetResult();
308 }
308 309 }
309 310
310 311 public void Reject(Exception error) {
311 312 SetError(error);
312 313 }
313 314
314 315 public Type PromiseType {
315 316 get {
316 317 return typeof(T);
317 318 }
318 319 }
319 320
320 321 public new T Join() {
321 322 WaitResult(-1);
322 323 return m_result;
323 324 }
324 325 public new T Join(int timeout) {
325 326 WaitResult(timeout);
326 327 return m_result;
327 328 }
328 329
329 330 public IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel) {
330 331 AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance));
331 332 return this;
332 333 }
333 334
334 335 public IPromise<T> On(Action<T> success, Action<Exception> error) {
335 336 AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance));
336 337 return this;
337 338 }
338 339
339 340 public IPromise<T> On(Action<T> success) {
340 341 AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance));
341 342 return this;
342 343 }
343 344
344 345 public IPromise<T> On(Action handler, PromiseEventType events) {
345 346 Listen(events, handler);
346 347 return this;
347 348 }
348 349
349 350 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<T2> cancel) {
350 351 var promise = new Promise<T2>();
351 352 AddHandler(new RemapDescriptor<T2>(mapper, error, cancel, promise));
352 353 return promise;
353 354 }
354 355
355 356 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error) {
356 357 var promise = new Promise<T2>();
357 358 AddHandler(new RemapDescriptor<T2>(mapper, error, null, promise));
358 359 return promise;
359 360 }
360 361
361 362 public IPromise<T2> Then<T2>(Func<T, T2> mapper) {
362 363 var promise = new Promise<T2>();
363 364 AddHandler(new RemapDescriptor<T2>(mapper, null, null, promise));
364 365 return promise;
365 366 }
366 367
367 368 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<IPromise<T2>> cancel) {
368 369 // this promise will be resolved when an asyc operation is started
369 370 var promise = new Promise<IPromise<T2>>();
370 371
371 372 AddHandler(new RemapDescriptor<IPromise<T2>>(
372 373 chained,
373 374 error,
374 375 cancel,
375 376 promise
376 377 ));
377 378
378 379 var medium = new Promise<T2>();
379 380
380 381 if (chained != null)
381 382 medium.On(Cancel, PromiseEventType.Cancelled);
382 383
383 384 // we need to connect started async operation with the medium
384 385 // if the async operation hasn't been started by the some reason
385 386 // report is to the medium
386 387 promise.On(
387 388 result => ConnectPromise<T2>(result, medium),
388 389 medium.Reject,
389 390 medium.Cancel
390 391 );
391 392
392 393 return medium;
393 394 }
394 395
395 396 static void ConnectPromise<T2>(IPromise<T2> result, Promise<T2> medium) {
396 397 if (result != null) {
397 398 result.On(
398 399 medium.Resolve,
399 400 medium.Reject,
400 401 () => medium.Reject(new OperationCanceledException())
401 402 );
402 403 medium.On(result.Cancel, PromiseEventType.Cancelled);
403 404 } else {
404 405 medium.Reject(
405 406 new NullReferenceException(
406 407 "The chained asynchronous operation returned" +
407 408 " 'null' where the promise instance is expected"
408 409 )
409 410 );
410 411 }
411 412 }
412 413
413 414 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error) {
414 415 return Chain(chained, error, null);
415 416 }
416 417
417 418 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained) {
418 419 return Chain(chained, null, null);
419 420 }
420 421
421 422 public IPromise<T2> Error<T2>(Func<Exception, T2> error) {
422 423 var promise = new Promise<T2>();
423 424 if (error != null)
424 425 On(
425 426 (Action<T>)null,
426 427 ex => {
427 428 try {
428 429 promise.Resolve(error(ex));
429 430 } catch (Exception ex2) {
430 431 promise.Reject(ex2);
431 432 }
432 433 }
433 434 );
434 435 else
435 436 Listen(PromiseEventType.Error, () => promise.Resolve(default(T2)));
436 437 return promise;
437 438 }
438 439
439 440 public IPromise<T2> Cancelled<T2>(Func<T2> handler) {
440 441 var promise = new Promise<T2>();
441 442 if (handler != null)
442 443 On(
443 444 (Action<T>)null,
444 445 null,
445 446 () => {
446 447 try {
447 448 promise.Resolve(handler());
448 449 } catch (Exception ex) {
449 450 promise.Reject(ex);
450 451 }
451 452 });
452 453 else
453 454 Listen(PromiseEventType.Cancelled, () => promise.Resolve(default(T2)));
454 455 return promise;
455 456 }
456 457
457 458 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
458 459 var promise = new Promise<T>();
459 460 if (success != null)
460 461 promise.On(Cancel, PromiseEventType.Cancelled);
461 462
462 463 AddHandler(new EventDescriptor(success, error, cancel, promise));
463 464
464 465 return promise;
465 466 }
466 467
467 468 public IPromise Then(Action success, Action<Exception> error) {
468 469 return Then(success, error, null);
469 470 }
470 471
471 472 public IPromise Then(Action success) {
472 473 return Then(success, null, null);
473 474 }
474 475
475 476 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
476 477 var promise = new Promise<IPromise>();
477 478
478 479 AddHandler(
479 480 new RemapDescriptor<IPromise>(
480 481 x => chained(),
481 482 error,
482 483 cancel,
483 484 promise
484 485 )
485 486 );
486 487
487 488 var medium = new Promise();
488 489 if (chained != null)
489 490 medium.On(Cancel, PromiseEventType.Cancelled);
490 491
491 492 promise.On(
492 493 result => ConnectPromise(result, medium),
493 494 medium.Reject,
494 495 medium.Cancel
495 496 );
496 497
497 498 return medium;
498 499 }
499 500
500 501 static void ConnectPromise(IPromise result, Promise medium) {
501 502 if (result != null) {
502 503 result.On(
503 504 medium.Resolve,
504 505 medium.Reject,
505 506 () => medium.Reject(new OperationCanceledException())
506 507 );
507 508 medium.On(result.Cancel, PromiseEventType.Cancelled);
508 509 } else {
509 510 medium.Reject(
510 511 new NullReferenceException(
511 512 "The chained asynchronous operation returned" +
512 513 " 'null' where the promise instance is expected"
513 514 )
514 515 );
515 516 }
516 517 }
517 518
518 519 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
519 520 return Chain(chained, error, null);
520 521 }
521 522
522 523 public IPromise Chain(Func<IPromise> chained) {
523 524 return Chain(chained, null, null);
524 525 }
525 526
526 527 public IPromise On(Action success, Action<Exception> error, Action cancel) {
527 528 AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance));
528 529 return this;
529 530 }
530 531
531 532 public IPromise On(Action success, Action<Exception> error) {
532 533 AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance));
533 534 return this;
534 535 }
535 536
536 537 public IPromise On(Action success) {
537 538 Listen(PromiseEventType.Success, success);
538 539 return this;
539 540 }
540 541
541 542 IPromise IPromise.On(Action handler, PromiseEventType events) {
542 543 Listen(events,handler);
543 544 return this;
544 545 }
545 546
546 547 public IPromise Error(Action<Exception> error) {
547 548 var promise = new Promise();
548 549 if (error != null)
549 550 On(
550 551 (Action<T>)null,
551 552 ex => {
552 553 try {
553 554 error(ex);
554 555 promise.Resolve();
555 556 } catch (Exception ex2) {
556 557 promise.Reject(ex2);
557 558 }
558 559 });
559 560 else
560 561 Listen(PromiseEventType.Error, promise.Resolve);
561 562 return promise;
562 563 }
563 564
564 565 public IPromise Cancelled(Action handler) {
565 566 var promise = new Promise();
566 567 if (handler != null)
567 568 On(
568 569 (Action<T>)null,
569 570 null,
570 571 () => {
571 572 try {
572 573 handler();
573 574 promise.Resolve();
574 575 } catch (Exception ex) {
575 576 promise.Reject(ex);
576 577 }
577 578 });
578 579 else
579 580 Listen(PromiseEventType.Cancelled, promise.Resolve);
580 581 return promise;
581 582 }
582 583
583 584 public IPromise<T2> Cast<T2>() {
584 585 return (IPromise<T2>)this;
585 586 }
586 587
587 588 #region implemented abstract members of AbstractPromise
588 589
589 590 protected override void SignalSuccess(IDeferred<T> handler) {
590 591 handler.Resolve(m_result);
591 592 }
592 593
593 594 protected override void SignalError(IDeferred<T> handler, Exception error) {
594 595 handler.Reject(error);
595 596 }
596 597
597 598 protected override void SignalCancelled(IDeferred<T> handler) {
598 599 handler.Cancel();
599 600 }
600 601
601 602 protected override void Listen(PromiseEventType events, Action handler) {
602 603 if (handler != null)
603 604 AddHandler(new ListenerDescriptor(handler, events));
604 605 }
605 606
606 607 #endregion
607 608
608 609 public static IPromise<T> ResultToPromise(T value) {
609 610 var p = new Promise<T>();
610 611 p.Resolve(value);
611 612 return p;
612 613 }
613 614
614 615 public static IPromise<T> ExceptionToPromise(Exception error) {
615 616 var p = new Promise<T>();
616 617 p.Reject(error);
617 618 return p;
618 619 }
619 620
620 621 }
621 622 }
General Comments 0
You need to be logged in to leave comments. Login now