##// END OF EJS Templates
fixed Resove method bug when calling it on already cancelled promise
cin -
r130:671f60cd0250 v2
parent child
Show More
@@ -1,122 +1,122
1 using Implab.Parallels;
1 using Implab.Parallels;
2 using System;
2 using System;
3 using System.Collections.Generic;
3 using System.Collections.Generic;
4 using System.Linq;
4 using System.Linq;
5 using System.Text;
5 using System.Text;
6 using System.Threading;
6 using System.Threading;
7 using System.Threading.Tasks;
7 using System.Threading.Tasks;
8 using System.Windows.Forms;
8 using System.Windows.Forms;
9
9
10 namespace Implab.Diagnostics.Interactive
10 namespace Implab.Diagnostics.Interactive
11 {
11 {
12 public class InteractiveListener: TextListenerBase
12 public class InteractiveListener: TextListenerBase
13 {
13 {
14 TraceForm m_form;
14 TraceForm m_form;
15
15
16 SynchronizationContext m_syncGuiThread;
16 SynchronizationContext m_syncGuiThread;
17 readonly Promise<object> m_guiStarted = new Promise<object>();
17 readonly Promise m_guiStarted = new Promise();
18
18
19 readonly IPromise m_guiFinished;
19 readonly IPromise m_guiFinished;
20 // readonly IPromise m_workerFinished = new Promise<object>();
20 // readonly IPromise m_workerFinished = new Promise<object>();
21
21
22 readonly MTQueue<TraceViewItem> m_queue = new MTQueue<TraceViewItem>();
22 readonly MTQueue<TraceViewItem> m_queue = new MTQueue<TraceViewItem>();
23 readonly AutoResetEvent m_queueEvent = new AutoResetEvent(false);
23 readonly AutoResetEvent m_queueEvent = new AutoResetEvent(false);
24
24
25 int m_queueLength;
25 int m_queueLength;
26 bool m_exitPending;
26 bool m_exitPending;
27
27
28 readonly object m_pauseLock = new object();
28 readonly object m_pauseLock = new object();
29 bool m_paused;
29 bool m_paused;
30 readonly ManualResetEvent m_pauseEvent = new ManualResetEvent(true);
30 readonly ManualResetEvent m_pauseEvent = new ManualResetEvent(true);
31
31
32 public InteractiveListener(bool global) : base(global) {
32 public InteractiveListener(bool global) : base(global) {
33 m_guiFinished = AsyncPool.InvokeNewThread(GuiThread);
33 m_guiFinished = AsyncPool.RunThread(GuiThread);
34 /*m_workerFinished = */AsyncPool.InvokeNewThread(QueueThread);
34 /*m_workerFinished = */AsyncPool.RunThread(QueueThread);
35
35
36 m_guiStarted.Join();
36 m_guiStarted.Join();
37 }
37 }
38
38
39 void GuiThread() {
39 void GuiThread() {
40 m_form = new TraceForm(); // will create SynchronizationContext
40 m_form = new TraceForm(); // will create SynchronizationContext
41
41
42 m_form.PauseEvents += (s,a) => Pause();
42 m_form.PauseEvents += (s,a) => Pause();
43 m_form.ResumeEvents += (s, a) => Resume();
43 m_form.ResumeEvents += (s, a) => Resume();
44
44
45 m_syncGuiThread = SynchronizationContext.Current;
45 m_syncGuiThread = SynchronizationContext.Current;
46 m_guiStarted.Resolve();
46 m_guiStarted.Resolve();
47 Application.Run();
47 Application.Run();
48 }
48 }
49
49
50 void QueueThread() {
50 void QueueThread() {
51 while (!m_exitPending) {
51 while (!m_exitPending) {
52 if (m_paused)
52 if (m_paused)
53 m_pauseEvent.WaitOne();
53 m_pauseEvent.WaitOne();
54
54
55 TraceViewItem item;
55 TraceViewItem item;
56 if (m_queue.TryDequeue(out item)) {
56 if (m_queue.TryDequeue(out item)) {
57 Interlocked.Decrement(ref m_queueLength);
57 Interlocked.Decrement(ref m_queueLength);
58
58
59 m_syncGuiThread.Send(x => m_form.AddTraceEvent(item),null);
59 m_syncGuiThread.Send(x => m_form.AddTraceEvent(item),null);
60 } else {
60 } else {
61 m_queueEvent.WaitOne();
61 m_queueEvent.WaitOne();
62 }
62 }
63 }
63 }
64 }
64 }
65
65
66 public void Pause() {
66 public void Pause() {
67 // for consistency we need to set this properties atomically
67 // for consistency we need to set this properties atomically
68 lock (m_pauseLock) {
68 lock (m_pauseLock) {
69 m_pauseEvent.Reset();
69 m_pauseEvent.Reset();
70 m_paused = true;
70 m_paused = true;
71 }
71 }
72 }
72 }
73
73
74 public void Resume() {
74 public void Resume() {
75 // for consistency we need to set this properties atomically
75 // for consistency we need to set this properties atomically
76 lock (m_pauseLock) {
76 lock (m_pauseLock) {
77 m_paused = false;
77 m_paused = false;
78 m_pauseEvent.Set();
78 m_pauseEvent.Set();
79 }
79 }
80 }
80 }
81
81
82 void Enqueue(TraceViewItem item) {
82 void Enqueue(TraceViewItem item) {
83 m_queue.Enqueue(item);
83 m_queue.Enqueue(item);
84 if (Interlocked.Increment(ref m_queueLength) == 1)
84 if (Interlocked.Increment(ref m_queueLength) == 1)
85 m_queueEvent.Set();
85 m_queueEvent.Set();
86 }
86 }
87
87
88 public void ShowForm() {
88 public void ShowForm() {
89 m_syncGuiThread.Post(x => m_form.Show(), null);
89 m_syncGuiThread.Post(x => m_form.Show(), null);
90 }
90 }
91
91
92 public void HideForm() {
92 public void HideForm() {
93 m_syncGuiThread.Post(x => m_form.Hide(), null);
93 m_syncGuiThread.Post(x => m_form.Hide(), null);
94 }
94 }
95
95
96 void Terminate() {
96 void Terminate() {
97 m_exitPending = true;
97 m_exitPending = true;
98 Resume();
98 Resume();
99 m_syncGuiThread.Post(x => Application.ExitThread(), null);
99 m_syncGuiThread.Post(x => Application.ExitThread(), null);
100 }
100 }
101
101
102 protected override void Dispose(bool disposing) {
102 protected override void Dispose(bool disposing) {
103 if (disposing) {
103 if (disposing) {
104 Terminate();
104 Terminate();
105 m_guiFinished.Join();
105 m_guiFinished.Join();
106 }
106 }
107 base.Dispose(disposing);
107 base.Dispose(disposing);
108 }
108 }
109
109
110 protected override void WriteEntry(LogEventArgs args, EventText text, string channel) {
110 protected override void WriteEntry(LogEventArgs args, EventText text, string channel) {
111 var item = new TraceViewItem {
111 var item = new TraceViewItem {
112 Indent = text.indent,
112 Indent = text.indent,
113 Message = text.content,
113 Message = text.content,
114 Thread = args.ThreadId,
114 Thread = args.ThreadId,
115 Channel = channel,
115 Channel = channel,
116 Timestamp = Environment.TickCount
116 Timestamp = Environment.TickCount
117 };
117 };
118
118
119 Enqueue(item);
119 Enqueue(item);
120 }
120 }
121 }
121 }
122 }
122 }
@@ -1,289 +1,291
1 using System;
1 using System;
2 using Implab.Parallels;
2 using Implab.Parallels;
3 using System.Threading;
3 using System.Threading;
4 using System.Reflection;
4 using System.Reflection;
5
5
6 namespace Implab {
6 namespace Implab {
7 public abstract class AbstractPromise<THandler> {
7 public abstract class AbstractPromise<THandler> {
8
8
9 const int UNRESOLVED_SATE = 0;
9 const int UNRESOLVED_SATE = 0;
10 const int TRANSITIONAL_STATE = 1;
10 const int TRANSITIONAL_STATE = 1;
11 const int SUCCEEDED_STATE = 2;
11 const int SUCCEEDED_STATE = 2;
12 const int REJECTED_STATE = 3;
12 const int REJECTED_STATE = 3;
13 const int CANCELLED_STATE = 4;
13 const int CANCELLED_STATE = 4;
14
14
15 const int RESERVED_HANDLERS_COUNT = 4;
15 const int RESERVED_HANDLERS_COUNT = 4;
16
16
17 int m_state;
17 int m_state;
18 Exception m_error;
18 Exception m_error;
19 int m_handlersCount;
19 int m_handlersCount;
20
20
21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
22 MTQueue<THandler> m_extraHandlers;
22 MTQueue<THandler> m_extraHandlers;
23 int m_handlerPointer = -1;
23 int m_handlerPointer = -1;
24 int m_handlersCommited;
24 int m_handlersCommited;
25
25
26 #region state managment
26 #region state managment
27 bool BeginTransit() {
27 bool BeginTransit() {
28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
29 }
29 }
30
30
31 void CompleteTransit(int state) {
31 void CompleteTransit(int state) {
32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
34 }
34 }
35
35
36 void WaitTransition() {
36 void WaitTransition() {
37 while (m_state == TRANSITIONAL_STATE) {
37 while (m_state == TRANSITIONAL_STATE) {
38 Thread.MemoryBarrier();
38 Thread.MemoryBarrier();
39 }
39 }
40 }
40 }
41
41
42 protected void BeginSetResult() {
42 protected bool BeginSetResult() {
43 if (!BeginTransit()) {
43 if (!BeginTransit()) {
44 WaitTransition();
44 WaitTransition();
45 if (m_state != CANCELLED_STATE)
45 if (m_state != CANCELLED_STATE)
46 throw new InvalidOperationException("The promise is already resolved");
46 throw new InvalidOperationException("The promise is already resolved");
47 return false;
47 }
48 }
49 return true;
48 }
50 }
49
51
50 protected void EndSetResult() {
52 protected void EndSetResult() {
51 CompleteTransit(SUCCEEDED_STATE);
53 CompleteTransit(SUCCEEDED_STATE);
52 OnSuccess();
54 OnSuccess();
53 }
55 }
54
56
55
57
56
58
57 /// <summary>
59 /// <summary>
58 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
60 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
59 /// </summary>
61 /// </summary>
60 /// <remarks>
62 /// <remarks>
61 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
63 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
62 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
64 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
63 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
65 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
64 /// </remarks>
66 /// </remarks>
65 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
67 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
66 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
68 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
67 protected void SetError(Exception error) {
69 protected void SetError(Exception error) {
68 if (BeginTransit()) {
70 if (BeginTransit()) {
69 m_error = error is PromiseTransientException ? error.InnerException : error;
71 m_error = error is PromiseTransientException ? error.InnerException : error;
70 CompleteTransit(REJECTED_STATE);
72 CompleteTransit(REJECTED_STATE);
71 OnError();
73 OnError();
72 } else {
74 } else {
73 WaitTransition();
75 WaitTransition();
74 if (m_state == SUCCEEDED_STATE)
76 if (m_state == SUCCEEDED_STATE)
75 throw new InvalidOperationException("The promise is already resolved");
77 throw new InvalidOperationException("The promise is already resolved");
76 }
78 }
77 }
79 }
78
80
79 /// <summary>
81 /// <summary>
80 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
82 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
81 /// </summary>
83 /// </summary>
82 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
84 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
83 protected void SetCancelled() {
85 protected void SetCancelled() {
84 if (BeginTransit()) {
86 if (BeginTransit()) {
85 CompleteTransit(CANCELLED_STATE);
87 CompleteTransit(CANCELLED_STATE);
86 OnCancelled();
88 OnCancelled();
87 }
89 }
88 }
90 }
89
91
90 protected abstract void SignalSuccess(THandler handler);
92 protected abstract void SignalSuccess(THandler handler);
91
93
92 protected abstract void SignalError(THandler handler, Exception error);
94 protected abstract void SignalError(THandler handler, Exception error);
93
95
94 protected abstract void SignalCancelled(THandler handler);
96 protected abstract void SignalCancelled(THandler handler);
95
97
96 void OnSuccess() {
98 void OnSuccess() {
97 var hp = m_handlerPointer;
99 var hp = m_handlerPointer;
98 var slot = hp +1 ;
100 var slot = hp +1 ;
99 while (slot < m_handlersCommited) {
101 while (slot < m_handlersCommited) {
100 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
102 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
101 SignalSuccess(m_handlers[slot]);
103 SignalSuccess(m_handlers[slot]);
102 }
104 }
103 hp = m_handlerPointer;
105 hp = m_handlerPointer;
104 slot = hp +1 ;
106 slot = hp +1 ;
105 }
107 }
106
108
107
109
108 if (m_extraHandlers != null) {
110 if (m_extraHandlers != null) {
109 THandler handler;
111 THandler handler;
110 while (m_extraHandlers.TryDequeue(out handler))
112 while (m_extraHandlers.TryDequeue(out handler))
111 SignalSuccess(handler);
113 SignalSuccess(handler);
112 }
114 }
113 }
115 }
114
116
115 void OnError() {
117 void OnError() {
116 var hp = m_handlerPointer;
118 var hp = m_handlerPointer;
117 var slot = hp +1 ;
119 var slot = hp +1 ;
118 while (slot < m_handlersCommited) {
120 while (slot < m_handlersCommited) {
119 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
121 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
120 SignalError(m_handlers[slot],m_error);
122 SignalError(m_handlers[slot],m_error);
121 }
123 }
122 hp = m_handlerPointer;
124 hp = m_handlerPointer;
123 slot = hp +1 ;
125 slot = hp +1 ;
124 }
126 }
125
127
126 if (m_extraHandlers != null) {
128 if (m_extraHandlers != null) {
127 THandler handler;
129 THandler handler;
128 while (m_extraHandlers.TryDequeue(out handler))
130 while (m_extraHandlers.TryDequeue(out handler))
129 SignalError(handler, m_error);
131 SignalError(handler, m_error);
130 }
132 }
131 }
133 }
132
134
133 void OnCancelled() {
135 void OnCancelled() {
134 var hp = m_handlerPointer;
136 var hp = m_handlerPointer;
135 var slot = hp +1 ;
137 var slot = hp +1 ;
136 while (slot < m_handlersCommited) {
138 while (slot < m_handlersCommited) {
137 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
139 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
138 SignalCancelled(m_handlers[slot]);
140 SignalCancelled(m_handlers[slot]);
139 }
141 }
140 hp = m_handlerPointer;
142 hp = m_handlerPointer;
141 slot = hp +1 ;
143 slot = hp +1 ;
142 }
144 }
143
145
144 if (m_extraHandlers != null) {
146 if (m_extraHandlers != null) {
145 THandler handler;
147 THandler handler;
146 while (m_extraHandlers.TryDequeue(out handler))
148 while (m_extraHandlers.TryDequeue(out handler))
147 SignalCancelled(handler);
149 SignalCancelled(handler);
148 }
150 }
149 }
151 }
150
152
151 #endregion
153 #endregion
152
154
153 protected abstract void Listen(PromiseEventType events, Action handler);
155 protected abstract void Listen(PromiseEventType events, Action handler);
154
156
155 #region synchronization traits
157 #region synchronization traits
156 protected void WaitResult(int timeout) {
158 protected void WaitResult(int timeout) {
157 if (!IsResolved) {
159 if (!IsResolved) {
158 var lk = new object();
160 var lk = new object();
159
161
160 Listen(PromiseEventType.All, () => {
162 Listen(PromiseEventType.All, () => {
161 lock(lk) {
163 lock(lk) {
162 Monitor.Pulse(lk);
164 Monitor.Pulse(lk);
163 }
165 }
164 });
166 });
165
167
166 lock (lk) {
168 lock (lk) {
167 while(!IsResolved) {
169 while(!IsResolved) {
168 if(!Monitor.Wait(lk,timeout))
170 if(!Monitor.Wait(lk,timeout))
169 throw new TimeoutException();
171 throw new TimeoutException();
170 }
172 }
171 }
173 }
172
174
173 }
175 }
174 switch (m_state) {
176 switch (m_state) {
175 case SUCCEEDED_STATE:
177 case SUCCEEDED_STATE:
176 return;
178 return;
177 case CANCELLED_STATE:
179 case CANCELLED_STATE:
178 throw new OperationCanceledException();
180 throw new OperationCanceledException();
179 case REJECTED_STATE:
181 case REJECTED_STATE:
180 throw new TargetInvocationException(m_error);
182 throw new TargetInvocationException(m_error);
181 default:
183 default:
182 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
184 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
183 }
185 }
184 }
186 }
185 #endregion
187 #endregion
186
188
187 #region handlers managment
189 #region handlers managment
188
190
189 protected void AddHandler(THandler handler) {
191 protected void AddHandler(THandler handler) {
190
192
191 if (m_state > 1) {
193 if (m_state > 1) {
192 // the promise is in the resolved state, just invoke the handler
194 // the promise is in the resolved state, just invoke the handler
193 InvokeHandler(handler);
195 InvokeHandler(handler);
194 } else {
196 } else {
195 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
197 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
196
198
197 if (slot < RESERVED_HANDLERS_COUNT) {
199 if (slot < RESERVED_HANDLERS_COUNT) {
198 m_handlers[slot] = handler;
200 m_handlers[slot] = handler;
199
201
200 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
202 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
201 }
203 }
202
204
203 if (m_state > 1) {
205 if (m_state > 1) {
204 do {
206 do {
205 var hp = m_handlerPointer;
207 var hp = m_handlerPointer;
206 slot = hp + 1;
208 slot = hp + 1;
207 if (slot < m_handlersCommited) {
209 if (slot < m_handlersCommited) {
208 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
210 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
209 continue;
211 continue;
210 InvokeHandler(m_handlers[slot]);
212 InvokeHandler(m_handlers[slot]);
211 }
213 }
212 break;
214 break;
213 } while(true);
215 } while(true);
214 }
216 }
215 } else {
217 } else {
216 if (slot == RESERVED_HANDLERS_COUNT) {
218 if (slot == RESERVED_HANDLERS_COUNT) {
217 m_extraHandlers = new MTQueue<THandler>();
219 m_extraHandlers = new MTQueue<THandler>();
218 } else {
220 } else {
219 while (m_extraHandlers == null)
221 while (m_extraHandlers == null)
220 Thread.MemoryBarrier();
222 Thread.MemoryBarrier();
221 }
223 }
222
224
223 m_extraHandlers.Enqueue(handler);
225 m_extraHandlers.Enqueue(handler);
224
226
225 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
227 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
226 // if the promise have been resolved while we was adding the handler to the queue
228 // if the promise have been resolved while we was adding the handler to the queue
227 // we can't guarantee that someone is still processing it
229 // we can't guarantee that someone is still processing it
228 // therefore we need to fetch a handler from the queue and execute it
230 // therefore we need to fetch a handler from the queue and execute it
229 // note that fetched handler may be not the one that we have added
231 // note that fetched handler may be not the one that we have added
230 // even we can fetch no handlers at all :)
232 // even we can fetch no handlers at all :)
231 InvokeHandler(handler);
233 InvokeHandler(handler);
232 }
234 }
233 }
235 }
234 }
236 }
235
237
236 protected void InvokeHandler(THandler handler) {
238 protected void InvokeHandler(THandler handler) {
237 switch (m_state) {
239 switch (m_state) {
238 case SUCCEEDED_STATE:
240 case SUCCEEDED_STATE:
239 SignalSuccess(handler);
241 SignalSuccess(handler);
240 break;
242 break;
241 case CANCELLED_STATE:
243 case CANCELLED_STATE:
242 SignalCancelled(handler);
244 SignalCancelled(handler);
243 break;
245 break;
244 case REJECTED_STATE:
246 case REJECTED_STATE:
245 SignalError(handler, m_error);
247 SignalError(handler, m_error);
246 break;
248 break;
247 default:
249 default:
248 throw new Exception(String.Format("Invalid promise state {0}", m_state));
250 throw new Exception(String.Format("Invalid promise state {0}", m_state));
249 }
251 }
250 }
252 }
251
253
252 #endregion
254 #endregion
253
255
254 #region IPromise implementation
256 #region IPromise implementation
255
257
256 public void Join(int timeout) {
258 public void Join(int timeout) {
257 WaitResult(timeout);
259 WaitResult(timeout);
258 }
260 }
259
261
260 public void Join() {
262 public void Join() {
261 WaitResult(-1);
263 WaitResult(-1);
262 }
264 }
263
265
264 public bool IsResolved {
266 public bool IsResolved {
265 get {
267 get {
266 Thread.MemoryBarrier();
268 Thread.MemoryBarrier();
267 return m_state > 1;
269 return m_state > 1;
268 }
270 }
269 }
271 }
270
272
271 public bool IsCancelled {
273 public bool IsCancelled {
272 get {
274 get {
273 Thread.MemoryBarrier();
275 Thread.MemoryBarrier();
274 return m_state == CANCELLED_STATE;
276 return m_state == CANCELLED_STATE;
275 }
277 }
276 }
278 }
277
279
278 #endregion
280 #endregion
279
281
280 #region ICancellable implementation
282 #region ICancellable implementation
281
283
282 public void Cancel() {
284 public void Cancel() {
283 SetCancelled();
285 SetCancelled();
284 }
286 }
285
287
286 #endregion
288 #endregion
287 }
289 }
288 }
290 }
289
291
@@ -1,629 +1,631
1 using System.Threading;
1 using System.Threading;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System;
3 using System;
4 using System.Collections;
4 using System.Collections;
5 using System.Diagnostics;
5 using System.Diagnostics;
6
6
7 namespace Implab.Parallels {
7 namespace Implab.Parallels {
8 public class AsyncQueue<T> : IEnumerable<T> {
8 public class AsyncQueue<T> : IEnumerable<T> {
9 class Chunk {
9 class Chunk {
10 public Chunk next;
10 public Chunk next;
11
11
12 int m_low;
12 int m_low;
13 int m_hi;
13 int m_hi;
14 int m_alloc;
14 int m_alloc;
15 readonly int m_size;
15 readonly int m_size;
16 readonly T[] m_data;
16 readonly T[] m_data;
17
17
18 public Chunk(int size) {
18 public Chunk(int size) {
19 m_size = size;
19 m_size = size;
20 m_data = new T[size];
20 m_data = new T[size];
21 }
21 }
22
22
23 public Chunk(int size, T value) {
23 public Chunk(int size, T value) {
24 m_size = size;
24 m_size = size;
25 m_hi = 1;
25 m_hi = 1;
26 m_alloc = 1;
26 m_alloc = 1;
27 m_data = new T[size];
27 m_data = new T[size];
28 m_data[0] = value;
28 m_data[0] = value;
29 }
29 }
30
30
31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
32 m_size = size;
32 m_size = size;
33 m_hi = length;
33 m_hi = length;
34 m_alloc = alloc;
34 m_alloc = alloc;
35 m_data = new T[size];
35 m_data = new T[size];
36 Array.Copy(data, offset, m_data, 0, length);
36 Array.Copy(data, offset, m_data, 0, length);
37 }
37 }
38
38
39 public int Low {
39 public int Low {
40 get { return m_low; }
40 get { return m_low; }
41 }
41 }
42
42
43 public int Hi {
43 public int Hi {
44 get { return m_hi; }
44 get { return m_hi; }
45 }
45 }
46
46
47 public int Size {
47 public int Size {
48 get { return m_size; }
48 get { return m_size; }
49 }
49 }
50
50
51 public bool TryEnqueue(T value, out bool extend) {
51 public bool TryEnqueue(T value, out bool extend) {
52 var alloc = Interlocked.Increment(ref m_alloc) - 1;
52 var alloc = Interlocked.Increment(ref m_alloc) - 1;
53
53
54 if (alloc >= m_size) {
54 if (alloc >= m_size) {
55 extend = alloc == m_size;
55 extend = alloc == m_size;
56 return false;
56 return false;
57 }
57 }
58
58
59 extend = false;
59 extend = false;
60 m_data[alloc] = value;
60 m_data[alloc] = value;
61
61
62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
63 // spin wait for commit
63 // spin wait for commit
64 }
64 }
65 return true;
65 return true;
66 }
66 }
67
67
68 /// <summary>
68 /// <summary>
69 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
69 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
70 /// </summary>
70 /// </summary>
71 public void Commit() {
71 public void Commit() {
72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
73
73
74 while (m_hi != actual)
74 while (m_hi != actual)
75 Thread.MemoryBarrier();
75 Thread.MemoryBarrier();
76 }
76 }
77
77
78 public bool TryDequeue(out T value, out bool recycle) {
78 public bool TryDequeue(out T value, out bool recycle) {
79 int low;
79 int low;
80 do {
80 do {
81 low = m_low;
81 low = m_low;
82 if (low >= m_hi) {
82 if (low >= m_hi) {
83 value = default(T);
83 value = default(T);
84 recycle = (low == m_size);
84 recycle = (low == m_size);
85 return false;
85 return false;
86 }
86 }
87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
88
88
89 recycle = (low == m_size - 1);
89 recycle = (low == m_size - 1);
90 value = m_data[low];
90 value = m_data[low];
91
91
92 return true;
92 return true;
93 }
93 }
94
94
95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
96 //int alloc;
96 //int alloc;
97 //int allocSize;
97 //int allocSize;
98
98
99 var alloc = Interlocked.Add(ref m_alloc, length) - length;
99 var alloc = Interlocked.Add(ref m_alloc, length) - length;
100 if (alloc > m_size) {
100 if (alloc > m_size) {
101 // the chunk is full and someone already
101 // the chunk is full and someone already
102 // creating the new one
102 // creating the new one
103 enqueued = 0; // nothing was added
103 enqueued = 0; // nothing was added
104 extend = false; // the caller shouldn't try to extend the queue
104 extend = false; // the caller shouldn't try to extend the queue
105 return false; // nothing was added
105 return false; // nothing was added
106 }
106 }
107
107
108 enqueued = Math.Min(m_size - alloc, length);
108 enqueued = Math.Min(m_size - alloc, length);
109 extend = length > enqueued;
109 extend = length > enqueued;
110
110
111 if (enqueued == 0)
111 if (enqueued == 0)
112 return false;
112 return false;
113
113
114
114
115 Array.Copy(batch, offset, m_data, alloc, enqueued);
115 Array.Copy(batch, offset, m_data, alloc, enqueued);
116
116
117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
118 // spin wait for commit
118 // spin wait for commit
119 }
119 }
120
120
121 return true;
121 return true;
122 }
122 }
123
123
124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
125 int low, hi, batchSize;
125 int low, hi, batchSize;
126
126
127 do {
127 do {
128 low = m_low;
128 low = m_low;
129 hi = m_hi;
129 hi = m_hi;
130 if (low >= hi) {
130 if (low >= hi) {
131 dequeued = 0;
131 dequeued = 0;
132 recycle = (low == m_size); // recycling could be restarted and we need to signal again
132 recycle = (low == m_size); // recycling could be restarted and we need to signal again
133 return false;
133 return false;
134 }
134 }
135 batchSize = Math.Min(hi - low, length);
135 batchSize = Math.Min(hi - low, length);
136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
137
137
138 recycle = (low == m_size - batchSize);
138 recycle = (low == m_size - batchSize);
139 dequeued = batchSize;
139 dequeued = batchSize;
140
140
141 Array.Copy(m_data, low, buffer, offset, batchSize);
141 Array.Copy(m_data, low, buffer, offset, batchSize);
142
142
143 return true;
143 return true;
144 }
144 }
145
145
146 public T GetAt(int pos) {
146 public T GetAt(int pos) {
147 return m_data[pos];
147 return m_data[pos];
148 }
148 }
149 }
149 }
150
150
151 public const int DEFAULT_CHUNK_SIZE = 32;
151 public const int DEFAULT_CHUNK_SIZE = 32;
152 public const int MAX_CHUNK_SIZE = 262144;
152 public const int MAX_CHUNK_SIZE = 262144;
153
153
154 Chunk m_first;
154 Chunk m_first;
155 Chunk m_last;
155 Chunk m_last;
156
156
157 /// <summary>
157 /// <summary>
158 /// Adds the specified value to the queue.
158 /// Adds the specified value to the queue.
159 /// </summary>
159 /// </summary>
160 /// <param name="value">Tha value which will be added to the queue.</param>
160 /// <param name="value">Tha value which will be added to the queue.</param>
161 public void Enqueue(T value) {
161 public void Enqueue(T value) {
162 var last = m_last;
162 var last = m_last;
163 // spin wait to the new chunk
163 // spin wait to the new chunk
164 bool extend = true;
164 bool extend = true;
165 while (last == null || !last.TryEnqueue(value, out extend)) {
165 while (last == null || !last.TryEnqueue(value, out extend)) {
166 // try to extend queue
166 // try to extend queue
167 if (extend || last == null) {
167 if (extend || last == null) {
168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
169 if (EnqueueChunk(last, chunk))
169 if (EnqueueChunk(last, chunk))
170 break; // success! exit!
170 break; // success! exit!
171 last = m_last;
171 last = m_last;
172 } else {
172 } else {
173 while (last == m_last) {
173 while (last == m_last) {
174 Thread.MemoryBarrier();
174 Thread.MemoryBarrier();
175 }
175 }
176 last = m_last;
176 last = m_last;
177 }
177 }
178 }
178 }
179 }
179 }
180
180
181 /// <summary>
181 /// <summary>
182 /// Adds the specified data to the queue.
182 /// Adds the specified data to the queue.
183 /// </summary>
183 /// </summary>
184 /// <param name="data">The buffer which contains the data to be enqueued.</param>
184 /// <param name="data">The buffer which contains the data to be enqueued.</param>
185 /// <param name="offset">The offset of the data in the buffer.</param>
185 /// <param name="offset">The offset of the data in the buffer.</param>
186 /// <param name="length">The size of the data to read from the buffer.</param>
186 /// <param name="length">The size of the data to read from the buffer.</param>
187 public void EnqueueRange(T[] data, int offset, int length) {
187 public void EnqueueRange(T[] data, int offset, int length) {
188 if (data == null)
188 if (data == null)
189 throw new ArgumentNullException("data");
189 throw new ArgumentNullException("data");
190 if (length == 0)
191 return;
190 if (offset < 0)
192 if (offset < 0)
191 throw new ArgumentOutOfRangeException("offset");
193 throw new ArgumentOutOfRangeException("offset");
192 if (length < 1 || offset + length > data.Length)
194 if (length < 1 || offset + length > data.Length)
193 throw new ArgumentOutOfRangeException("length");
195 throw new ArgumentOutOfRangeException("length");
194
196
195 var last = m_last;
197 var last = m_last;
196
198
197 bool extend;
199 bool extend;
198 int enqueued;
200 int enqueued;
199
201
200 while (length > 0) {
202 while (length > 0) {
201 extend = true;
203 extend = true;
202 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
203 length -= enqueued;
205 length -= enqueued;
204 offset += enqueued;
206 offset += enqueued;
205 }
207 }
206
208
207 if (extend) {
209 if (extend) {
208 // there was no enough space in the chunk
210 // there was no enough space in the chunk
209 // or there was no chunks in the queue
211 // or there was no chunks in the queue
210
212
211 while (length > 0) {
213 while (length > 0) {
212
214
213 var size = Math.Min(length, MAX_CHUNK_SIZE);
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
214
216
215 var chunk = new Chunk(
217 var chunk = new Chunk(
216 Math.Max(size, DEFAULT_CHUNK_SIZE),
218 Math.Max(size, DEFAULT_CHUNK_SIZE),
217 data,
219 data,
218 offset,
220 offset,
219 size,
221 size,
220 length // length >= size
222 length // length >= size
221 );
223 );
222
224
223 if (!EnqueueChunk(last, chunk)) {
225 if (!EnqueueChunk(last, chunk)) {
224 // looks like the queue has been updated then proceed from the beginning
226 // looks like the queue has been updated then proceed from the beginning
225 last = m_last;
227 last = m_last;
226 break;
228 break;
227 }
229 }
228
230
229 // we have successfully added the new chunk
231 // we have successfully added the new chunk
230 last = chunk;
232 last = chunk;
231 length -= size;
233 length -= size;
232 offset += size;
234 offset += size;
233 }
235 }
234 } else {
236 } else {
235 // we don't need to extend the queue, if we successfully enqueued data
237 // we don't need to extend the queue, if we successfully enqueued data
236 if (length == 0)
238 if (length == 0)
237 break;
239 break;
238
240
239 // if we need to wait while someone is extending the queue
241 // if we need to wait while someone is extending the queue
240 // spinwait
242 // spinwait
241 while (last == m_last) {
243 while (last == m_last) {
242 Thread.MemoryBarrier();
244 Thread.MemoryBarrier();
243 }
245 }
244
246
245 last = m_last;
247 last = m_last;
246 }
248 }
247 }
249 }
248 }
250 }
249
251
250 /// <summary>
252 /// <summary>
251 /// Tries to retrieve the first element from the queue.
253 /// Tries to retrieve the first element from the queue.
252 /// </summary>
254 /// </summary>
253 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
254 /// <param name="value">The value of the dequeued element.</param>
256 /// <param name="value">The value of the dequeued element.</param>
255 public bool TryDequeue(out T value) {
257 public bool TryDequeue(out T value) {
256 var chunk = m_first;
258 var chunk = m_first;
257 bool recycle;
259 bool recycle;
258 while (chunk != null) {
260 while (chunk != null) {
259
261
260 var result = chunk.TryDequeue(out value, out recycle);
262 var result = chunk.TryDequeue(out value, out recycle);
261
263
262 if (recycle) // this chunk is waste
264 if (recycle) // this chunk is waste
263 RecycleFirstChunk(chunk);
265 RecycleFirstChunk(chunk);
264 else
266 else
265 return result; // this chunk is usable and returned actual result
267 return result; // this chunk is usable and returned actual result
266
268
267 if (result) // this chunk is waste but the true result is always actual
269 if (result) // this chunk is waste but the true result is always actual
268 return true;
270 return true;
269
271
270 // try again
272 // try again
271 chunk = m_first;
273 chunk = m_first;
272 }
274 }
273
275
274 // the queue is empty
276 // the queue is empty
275 value = default(T);
277 value = default(T);
276 return false;
278 return false;
277 }
279 }
278
280
279 /// <summary>
281 /// <summary>
280 /// Tries to dequeue the specified amount of data from the queue.
282 /// Tries to dequeue the specified amount of data from the queue.
281 /// </summary>
283 /// </summary>
282 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
284 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
283 /// <param name="buffer">The buffer to which the data will be written.</param>
285 /// <param name="buffer">The buffer to which the data will be written.</param>
284 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
285 /// <param name="length">The maximum amount of data to be retrieved.</param>
287 /// <param name="length">The maximum amount of data to be retrieved.</param>
286 /// <param name="dequeued">The actual amout of the retrieved data.</param>
288 /// <param name="dequeued">The actual amout of the retrieved data.</param>
287 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
288 if (buffer == null)
290 if (buffer == null)
289 throw new ArgumentNullException("buffer");
291 throw new ArgumentNullException("buffer");
290 if (offset < 0)
292 if (offset < 0)
291 throw new ArgumentOutOfRangeException("offset");
293 throw new ArgumentOutOfRangeException("offset");
292 if (length < 1 || offset + length > buffer.Length)
294 if (length < 1 || offset + length > buffer.Length)
293 throw new ArgumentOutOfRangeException("length");
295 throw new ArgumentOutOfRangeException("length");
294
296
295 var chunk = m_first;
297 var chunk = m_first;
296 bool recycle;
298 bool recycle;
297 dequeued = 0;
299 dequeued = 0;
298 while (chunk != null) {
300 while (chunk != null) {
299
301
300 int actual;
302 int actual;
301 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
302 offset += actual;
304 offset += actual;
303 length -= actual;
305 length -= actual;
304 dequeued += actual;
306 dequeued += actual;
305 }
307 }
306
308
307 if (recycle) // this chunk is waste
309 if (recycle) // this chunk is waste
308 RecycleFirstChunk(chunk);
310 RecycleFirstChunk(chunk);
309 else if (actual == 0)
311 else if (actual == 0)
310 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
311
313
312 if (length == 0)
314 if (length == 0)
313 return true;
315 return true;
314
316
315 // we still may dequeue something
317 // we still may dequeue something
316 // try again
318 // try again
317 chunk = m_first;
319 chunk = m_first;
318 }
320 }
319
321
320 return dequeued != 0;
322 return dequeued != 0;
321 }
323 }
322
324
323 /// <summary>
325 /// <summary>
324 /// Tries to dequeue all remaining data in the first chunk.
326 /// Tries to dequeue all remaining data in the first chunk.
325 /// </summary>
327 /// </summary>
326 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
327 /// <param name="buffer">The buffer to which the data will be written.</param>
329 /// <param name="buffer">The buffer to which the data will be written.</param>
328 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
329 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
330 /// <param name="dequeued">The actual amount of the dequeued data.</param>
332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
331 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
332 if (buffer == null)
334 if (buffer == null)
333 throw new ArgumentNullException("buffer");
335 throw new ArgumentNullException("buffer");
334 if (offset < 0)
336 if (offset < 0)
335 throw new ArgumentOutOfRangeException("offset");
337 throw new ArgumentOutOfRangeException("offset");
336 if (length < 1 || offset + length > buffer.Length)
338 if (length < 1 || offset + length > buffer.Length)
337 throw new ArgumentOutOfRangeException("length");
339 throw new ArgumentOutOfRangeException("length");
338
340
339 var chunk = m_first;
341 var chunk = m_first;
340 bool recycle;
342 bool recycle;
341 dequeued = 0;
343 dequeued = 0;
342
344
343 while (chunk != null) {
345 while (chunk != null) {
344
346
345 int actual;
347 int actual;
346 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
347 dequeued = actual;
349 dequeued = actual;
348 }
350 }
349
351
350 if (recycle) // this chunk is waste
352 if (recycle) // this chunk is waste
351 RecycleFirstChunk(chunk);
353 RecycleFirstChunk(chunk);
352
354
353 // if we have dequeued any data, then return
355 // if we have dequeued any data, then return
354 if (dequeued != 0)
356 if (dequeued != 0)
355 return true;
357 return true;
356
358
357 // we still may dequeue something
359 // we still may dequeue something
358 // try again
360 // try again
359 chunk = m_first;
361 chunk = m_first;
360 }
362 }
361
363
362 return false;
364 return false;
363 }
365 }
364
366
365 bool EnqueueChunk(Chunk last, Chunk chunk) {
367 bool EnqueueChunk(Chunk last, Chunk chunk) {
366 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
367 return false;
369 return false;
368
370
369 if (last != null)
371 if (last != null)
370 last.next = chunk;
372 last.next = chunk;
371 else {
373 else {
372 m_first = chunk;
374 m_first = chunk;
373 }
375 }
374 return true;
376 return true;
375 }
377 }
376
378
377 void RecycleFirstChunk(Chunk first) {
379 void RecycleFirstChunk(Chunk first) {
378 var next = first.next;
380 var next = first.next;
379
381
380 if (first != Interlocked.CompareExchange(ref m_first, next, first))
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
381 return;
383 return;
382
384
383 if (next == null) {
385 if (next == null) {
384
386
385 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
386 /*while (first.next == null)
388 /*while (first.next == null)
387 Thread.MemoryBarrier();*/
389 Thread.MemoryBarrier();*/
388
390
389 // race
391 // race
390 // someone already updated the tail, restore the pointer to the queue head
392 // someone already updated the tail, restore the pointer to the queue head
391 m_first = first;
393 m_first = first;
392 }
394 }
393 // the tail is updated
395 // the tail is updated
394 }
396 }
395
397
396 // we need to update the head
398 // we need to update the head
397 //Interlocked.CompareExchange(ref m_first, next, first);
399 //Interlocked.CompareExchange(ref m_first, next, first);
398 // if the head is already updated then give up
400 // if the head is already updated then give up
399 //return;
401 //return;
400
402
401 }
403 }
402
404
403 public void Clear() {
405 public void Clear() {
404 // start the new queue
406 // start the new queue
405 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
407 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
406
408
407 do {
409 do {
408 Thread.MemoryBarrier();
410 Thread.MemoryBarrier();
409 var first = m_first;
411 var first = m_first;
410 var last = m_last;
412 var last = m_last;
411
413
412 if (last == null) // nothing to clear
414 if (last == null) // nothing to clear
413 return;
415 return;
414
416
415 if (first == null || (first.next == null && first != last)) // inconcistency
417 if (first == null || (first.next == null && first != last)) // inconcistency
416 continue;
418 continue;
417
419
418 // here we will create inconsistency which will force others to spin
420 // here we will create inconsistency which will force others to spin
419 // and prevent from fetching. chunk.next = null
421 // and prevent from fetching. chunk.next = null
420 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
421 continue;// inconsistent
423 continue;// inconsistent
422
424
423 m_last = chunk;
425 m_last = chunk;
424
426
425 return;
427 return;
426
428
427 } while(true);
429 } while(true);
428 }
430 }
429
431
430 public T[] Drain() {
432 public T[] Drain() {
431 // start the new queue
433 // start the new queue
432 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
434 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
433
435
434 do {
436 do {
435 Thread.MemoryBarrier();
437 Thread.MemoryBarrier();
436 var first = m_first;
438 var first = m_first;
437 var last = m_last;
439 var last = m_last;
438
440
439 if (last == null)
441 if (last == null)
440 return new T[0];
442 return new T[0];
441
443
442 if (first == null || (first.next == null && first != last))
444 if (first == null || (first.next == null && first != last))
443 continue;
445 continue;
444
446
445 // here we will create inconsistency which will force others to spin
447 // here we will create inconsistency which will force others to spin
446 // and prevent from fetching. chunk.next = null
448 // and prevent from fetching. chunk.next = null
447 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
448 continue;// inconsistent
450 continue;// inconsistent
449
451
450 last = Interlocked.Exchange(ref m_last, chunk);
452 last = Interlocked.Exchange(ref m_last, chunk);
451
453
452 return ReadChunks(first, last);
454 return ReadChunks(first, last);
453
455
454 } while(true);
456 } while(true);
455 }
457 }
456
458
457 static T[] ReadChunks(Chunk chunk, object last) {
459 static T[] ReadChunks(Chunk chunk, object last) {
458 var result = new List<T>();
460 var result = new List<T>();
459 var buffer = new T[DEFAULT_CHUNK_SIZE];
461 var buffer = new T[DEFAULT_CHUNK_SIZE];
460 int actual;
462 int actual;
461 bool recycle;
463 bool recycle;
462 while (chunk != null) {
464 while (chunk != null) {
463 // ensure all write operations on the chunk are complete
465 // ensure all write operations on the chunk are complete
464 chunk.Commit();
466 chunk.Commit();
465
467
466 // we need to read the chunk using this way
468 // we need to read the chunk using this way
467 // since some client still may completing the dequeue
469 // since some client still may completing the dequeue
468 // operation, such clients most likely won't get results
470 // operation, such clients most likely won't get results
469 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
470 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
471
473
472 if (chunk == last) {
474 if (chunk == last) {
473 chunk = null;
475 chunk = null;
474 } else {
476 } else {
475 while (chunk.next == null)
477 while (chunk.next == null)
476 Thread.MemoryBarrier();
478 Thread.MemoryBarrier();
477 chunk = chunk.next;
479 chunk = chunk.next;
478 }
480 }
479 }
481 }
480
482
481 return result.ToArray();
483 return result.ToArray();
482 }
484 }
483
485
484 struct ArraySegmentCollection : ICollection<T> {
486 struct ArraySegmentCollection : ICollection<T> {
485 readonly T[] m_data;
487 readonly T[] m_data;
486 readonly int m_offset;
488 readonly int m_offset;
487 readonly int m_length;
489 readonly int m_length;
488
490
489 public ArraySegmentCollection(T[] data, int offset, int length) {
491 public ArraySegmentCollection(T[] data, int offset, int length) {
490 m_data = data;
492 m_data = data;
491 m_offset = offset;
493 m_offset = offset;
492 m_length = length;
494 m_length = length;
493 }
495 }
494
496
495 #region ICollection implementation
497 #region ICollection implementation
496
498
497 public void Add(T item) {
499 public void Add(T item) {
498 throw new NotSupportedException();
500 throw new NotSupportedException();
499 }
501 }
500
502
501 public void Clear() {
503 public void Clear() {
502 throw new NotSupportedException();
504 throw new NotSupportedException();
503 }
505 }
504
506
505 public bool Contains(T item) {
507 public bool Contains(T item) {
506 return false;
508 return false;
507 }
509 }
508
510
509 public void CopyTo(T[] array, int arrayIndex) {
511 public void CopyTo(T[] array, int arrayIndex) {
510 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
512 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
511 }
513 }
512
514
513 public bool Remove(T item) {
515 public bool Remove(T item) {
514 throw new NotSupportedException();
516 throw new NotSupportedException();
515 }
517 }
516
518
517 public int Count {
519 public int Count {
518 get {
520 get {
519 return m_length;
521 return m_length;
520 }
522 }
521 }
523 }
522
524
523 public bool IsReadOnly {
525 public bool IsReadOnly {
524 get {
526 get {
525 return true;
527 return true;
526 }
528 }
527 }
529 }
528
530
529 #endregion
531 #endregion
530
532
531 #region IEnumerable implementation
533 #region IEnumerable implementation
532
534
533 public IEnumerator<T> GetEnumerator() {
535 public IEnumerator<T> GetEnumerator() {
534 for (int i = m_offset; i < m_length + m_offset; i++)
536 for (int i = m_offset; i < m_length + m_offset; i++)
535 yield return m_data[i];
537 yield return m_data[i];
536 }
538 }
537
539
538 #endregion
540 #endregion
539
541
540 #region IEnumerable implementation
542 #region IEnumerable implementation
541
543
542 IEnumerator IEnumerable.GetEnumerator() {
544 IEnumerator IEnumerable.GetEnumerator() {
543 return GetEnumerator();
545 return GetEnumerator();
544 }
546 }
545
547
546 #endregion
548 #endregion
547 }
549 }
548
550
549 #region IEnumerable implementation
551 #region IEnumerable implementation
550
552
551 class Enumerator : IEnumerator<T> {
553 class Enumerator : IEnumerator<T> {
552 Chunk m_current;
554 Chunk m_current;
553 int m_pos = -1;
555 int m_pos = -1;
554
556
555 public Enumerator(Chunk fisrt) {
557 public Enumerator(Chunk fisrt) {
556 m_current = fisrt;
558 m_current = fisrt;
557 }
559 }
558
560
559 #region IEnumerator implementation
561 #region IEnumerator implementation
560
562
561 public bool MoveNext() {
563 public bool MoveNext() {
562 if (m_current == null)
564 if (m_current == null)
563 return false;
565 return false;
564
566
565 if (m_pos == -1)
567 if (m_pos == -1)
566 m_pos = m_current.Low;
568 m_pos = m_current.Low;
567 else
569 else
568 m_pos++;
570 m_pos++;
569
571
570 if (m_pos == m_current.Hi) {
572 if (m_pos == m_current.Hi) {
571
573
572 m_current = m_pos == m_current.Size ? m_current.next : null;
574 m_current = m_pos == m_current.Size ? m_current.next : null;
573
575
574 m_pos = 0;
576 m_pos = 0;
575
577
576 if (m_current == null)
578 if (m_current == null)
577 return false;
579 return false;
578 }
580 }
579
581
580 return true;
582 return true;
581 }
583 }
582
584
583 public void Reset() {
585 public void Reset() {
584 throw new NotSupportedException();
586 throw new NotSupportedException();
585 }
587 }
586
588
587 object IEnumerator.Current {
589 object IEnumerator.Current {
588 get {
590 get {
589 return Current;
591 return Current;
590 }
592 }
591 }
593 }
592
594
593 #endregion
595 #endregion
594
596
595 #region IDisposable implementation
597 #region IDisposable implementation
596
598
597 public void Dispose() {
599 public void Dispose() {
598 }
600 }
599
601
600 #endregion
602 #endregion
601
603
602 #region IEnumerator implementation
604 #region IEnumerator implementation
603
605
604 public T Current {
606 public T Current {
605 get {
607 get {
606 if (m_pos == -1 || m_current == null)
608 if (m_pos == -1 || m_current == null)
607 throw new InvalidOperationException();
609 throw new InvalidOperationException();
608 return m_current.GetAt(m_pos);
610 return m_current.GetAt(m_pos);
609 }
611 }
610 }
612 }
611
613
612 #endregion
614 #endregion
613 }
615 }
614
616
615 public IEnumerator<T> GetEnumerator() {
617 public IEnumerator<T> GetEnumerator() {
616 return new Enumerator(m_first);
618 return new Enumerator(m_first);
617 }
619 }
618
620
619 #endregion
621 #endregion
620
622
621 #region IEnumerable implementation
623 #region IEnumerable implementation
622
624
623 IEnumerator IEnumerable.GetEnumerator() {
625 IEnumerator IEnumerable.GetEnumerator() {
624 return GetEnumerator();
626 return GetEnumerator();
625 }
627 }
626
628
627 #endregion
629 #endregion
628 }
630 }
629 }
631 }
@@ -1,75 +1,76
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3 using System.Diagnostics;
3 using System.Diagnostics;
4
4
5 namespace Implab.Parallels {
5 namespace Implab.Parallels {
6 /// <summary>
6 /// <summary>
7 /// Implements a lightweight mechanism to aquire a shared or an exclusive lock.
7 /// Implements a lightweight mechanism to aquire a shared or an exclusive lock.
8 /// </summary>
8 /// </summary>
9 public class SharedLock {
9 public class SharedLock {
10 readonly object m_lock = new object();
10 readonly object m_lock = new object();
11 int m_locks;
11 int m_locks;
12 bool m_exclusive;
12 bool m_exclusive;
13
13
14 public bool LockExclusive(int timeout) {
14 public bool LockExclusive(int timeout) {
15 lock (m_lock) {
15 lock (m_lock) {
16 if (m_locks > 0 && !Monitor.Wait(m_lock, timeout))
16 if (m_locks > 0 && !Monitor.Wait(m_lock, timeout))
17 return false;
17 return false;
18 m_exclusive = true;
18 m_exclusive = true;
19 m_locks = 1;
19 m_locks = 1;
20 return true;
20 }
21 }
21 }
22 }
22
23
23 public void LockExclusive() {
24 public void LockExclusive() {
24 LockExclusive(-1);
25 LockExclusive(-1);
25 }
26 }
26
27
27 public bool LockShared(int timeout) {
28 public bool LockShared(int timeout) {
28 lock (m_lock) {
29 lock (m_lock) {
29 if (!m_exclusive) {
30 if (!m_exclusive) {
30 m_locks++;
31 m_locks++;
31 return true;
32 return true;
32 }
33 }
33
34
34 if (m_lock == 0) {
35 if (m_locks == 0) {
35 m_exclusive = false;
36 m_exclusive = false;
36 m_locks = 1;
37 m_locks = 1;
37 return true;
38 return true;
38 }
39 }
39
40
40 if (Monitor.Wait(m_lock, timeout)) {
41 if (Monitor.Wait(m_lock, timeout)) {
41 Debug.Assert(m_locks == 0);
42 Debug.Assert(m_locks == 0);
42 m_locks = 1;
43 m_locks = 1;
43 m_exclusive = false;
44 m_exclusive = false;
44 return true;
45 return true;
45 }
46 }
46 return false;
47 return false;
47 }
48 }
48 }
49 }
49
50
50 public void LockShared() {
51 public void LockShared() {
51 LockShared(-1);
52 LockShared(-1);
52 }
53 }
53
54
54 public void ReleaseShared() {
55 public void ReleaseShared() {
55 lock (m_lock) {
56 lock (m_lock) {
56 if (m_exclusive || m_locks <= 0)
57 if (m_exclusive || m_locks <= 0)
57 throw new InvalidOperationException();
58 throw new InvalidOperationException();
58 m_locks--;
59 m_locks--;
59 if (m_locks == 0)
60 if (m_locks == 0)
60 Monitor.PulseAll(m_lock);
61 Monitor.PulseAll(m_lock);
61 }
62 }
62 }
63 }
63
64
64 public void ReleaseExclusive() {
65 public void ReleaseExclusive() {
65 lock (m_lock) {
66 lock (m_lock) {
66 if (!m_exclusive && m_locks != 1)
67 if (!m_exclusive && m_locks != 1)
67 throw new InvalidOperationException();
68 throw new InvalidOperationException();
68 m_locks = 0;
69 m_locks = 0;
69 Monitor.PulseAll(m_lock);
70 Monitor.PulseAll(m_lock);
70 }
71 }
71 }
72 }
72
73
73 }
74 }
74 }
75 }
75
76
@@ -1,621 +1,622
1 using System;
1 using System;
2 using System.Diagnostics;
2 using System.Diagnostics;
3
3
4 namespace Implab {
4 namespace Implab {
5
5
6 /// <summary>
6 /// <summary>
7 /// Класс для асинхронного получСния Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ². Π’Π°ΠΊ Π½Π°Π·Ρ‹Π²Π°Π΅ΠΌΠΎΠ΅ "ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅".
7 /// Класс для асинхронного получСния Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ². Π’Π°ΠΊ Π½Π°Π·Ρ‹Π²Π°Π΅ΠΌΠΎΠ΅ "ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅".
8 /// </summary>
8 /// </summary>
9 /// <typeparam name="T">Π’ΠΈΠΏ ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌΠΎΠ³ΠΎ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°</typeparam>
9 /// <typeparam name="T">Π’ΠΈΠΏ ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌΠΎΠ³ΠΎ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°</typeparam>
10 /// <remarks>
10 /// <remarks>
11 /// <para>БСрвис ΠΏΡ€ΠΈ ΠΎΠ±Ρ€Π°Ρ‰Π΅Π½ΠΈΠΈ ΠΊ Π΅Π³ΠΎ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρƒ Π΄Π°Π΅Ρ‚ ΠΎΠ±Π΅Ρ‰Π°ΠΈΠ½ΠΈΠ΅ ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ,
11 /// <para>БСрвис ΠΏΡ€ΠΈ ΠΎΠ±Ρ€Π°Ρ‰Π΅Π½ΠΈΠΈ ΠΊ Π΅Π³ΠΎ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρƒ Π΄Π°Π΅Ρ‚ ΠΎΠ±Π΅Ρ‰Π°ΠΈΠ½ΠΈΠ΅ ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ,
12 /// ΠΊΠ»ΠΈΠ΅Π½Ρ‚ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ² Ρ‚Π°ΠΊΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ ΡƒΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ ряд ΠΎΠ±Ρ€Π°Ρ‚Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎ для получСния
12 /// ΠΊΠ»ΠΈΠ΅Π½Ρ‚ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ² Ρ‚Π°ΠΊΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ ΡƒΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ ряд ΠΎΠ±Ρ€Π°Ρ‚Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎ для получСния
13 /// событий выполнСния обСщания, Ρ‚ΠΎΠ΅ΡΡ‚ΡŒ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ ΠΈ прСдоставлСнии Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ².</para>
13 /// событий выполнСния обСщания, Ρ‚ΠΎΠ΅ΡΡ‚ΡŒ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ ΠΈ прСдоставлСнии Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠ².</para>
14 /// <para>
14 /// <para>
15 /// ΠžΠ±Π΅Ρ‰Π΅Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ ΠΊΠ°ΠΊ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ, Ρ‚Π°ΠΊ ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ с ошибкой. Для подписки Π½Π°
15 /// ΠžΠ±Π΅Ρ‰Π΅Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ ΠΊΠ°ΠΊ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ, Ρ‚Π°ΠΊ ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ с ошибкой. Для подписки Π½Π°
16 /// Π΄Π°Π½Π½Ρ‹Π΅ события ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π΄ΠΎΠ»ΠΆΠ΅Π½ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Then</c>.
16 /// Π΄Π°Π½Π½Ρ‹Π΅ события ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π΄ΠΎΠ»ΠΆΠ΅Π½ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Then</c>.
17 /// </para>
17 /// </para>
18 /// <para>
18 /// <para>
19 /// БСрвис, Π² свою ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ, ΠΏΠΎ ΠΎΠΊΠΎΠ½Ρ‡Π°Π½ΠΈΡŽ выполнСния ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ (Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ с ошибкой),
19 /// БСрвис, Π² свою ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ, ΠΏΠΎ ΠΎΠΊΠΎΠ½Ρ‡Π°Π½ΠΈΡŽ выполнСния ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ (Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ с ошибкой),
20 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Resolve</c> Π»ΠΈΠ±ΠΎ <c>Reject</c> для оповСщСния ΠΊΠ»ΠΈΠ΅Ρ‚Π½Π° ΠΎ
20 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹ <c>Resolve</c> Π»ΠΈΠ±ΠΎ <c>Reject</c> для оповСщСния ΠΊΠ»ΠΈΠ΅Ρ‚Π½Π° ΠΎ
21 /// Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ обСщания.
21 /// Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ обСщания.
22 /// </para>
22 /// </para>
23 /// <para>
23 /// <para>
24 /// Если сСрвСр успСл Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΅Ρ‰Π΅ Π΄ΠΎ Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π½Π° Π½Π΅Π³ΠΎ подписался,
24 /// Если сСрвСр успСл Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΅Ρ‰Π΅ Π΄ΠΎ Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π½Π° Π½Π΅Π³ΠΎ подписался,
25 /// Ρ‚ΠΎ Π² ΠΌΠΎΠΌΠ΅Π½Ρ‚ подписки ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π° Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΠ²ΡƒΡŽΡ‰ΠΈΠ΅ события Π² синхронном
25 /// Ρ‚ΠΎ Π² ΠΌΠΎΠΌΠ΅Π½Ρ‚ подписки ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π° Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΠ²ΡƒΡŽΡ‰ΠΈΠ΅ события Π² синхронном
26 /// Ρ€Π΅ΠΆΠΈΠΌΠ΅ ΠΈ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π±ΡƒΠ΄Π΅Ρ‚ ΠΎΠΏΠΎΠ²Π΅Ρ‰Π΅Π½ Π² любом случаС. Π˜Π½Π°Ρ‡Π΅, ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ Π΄ΠΎΠ±Π°Π²Π»ΡΡŽΡ‚ΡΡ Π²
26 /// Ρ€Π΅ΠΆΠΈΠΌΠ΅ ΠΈ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ Π±ΡƒΠ΄Π΅Ρ‚ ΠΎΠΏΠΎΠ²Π΅Ρ‰Π΅Π½ Π² любом случаС. Π˜Π½Π°Ρ‡Π΅, ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΈ Π΄ΠΎΠ±Π°Π²Π»ΡΡŽΡ‚ΡΡ Π²
27 /// список Π² порядкС подписания ΠΈ Π² этом ΠΆΠ΅ порядкС ΠΎΠ½ΠΈ Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ
27 /// список Π² порядкС подписания ΠΈ Π² этом ΠΆΠ΅ порядкС ΠΎΠ½ΠΈ Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π·Π²Π°Π½Ρ‹ ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ
28 /// обСщания.
28 /// обСщания.
29 /// </para>
29 /// </para>
30 /// <para>
30 /// <para>
31 /// ΠžΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Ρ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ обСщания ΠΌΠΎΠΆΠ½ΠΎ ΠΏΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΠΎΠ²Ρ‹Π²Π°Ρ‚ΡŒ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ Π»ΠΈΠ±ΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ
31 /// ΠžΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Ρ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ обСщания ΠΌΠΎΠΆΠ½ΠΎ ΠΏΡ€Π΅ΠΎΠ±Ρ€Π°Π·ΠΎΠ²Ρ‹Π²Π°Ρ‚ΡŒ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ Π»ΠΈΠ±ΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ
32 /// связанныС асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Ρ‚Π°ΠΊΠΆΠ΅ Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°ΡŽΡ‚ обСщания. Для этого слСдуСт
32 /// связанныС асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Ρ‚Π°ΠΊΠΆΠ΅ Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°ΡŽΡ‚ обСщания. Для этого слСдуСт
33 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΡƒΡŽ Ρ„ΠΎΡ€ΠΌΡƒ ΠΌΠ΅Ρ‚ΠΎΠ΄Π΅ <c>Then</c>.
33 /// ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΡƒΡŽ Ρ„ΠΎΡ€ΠΌΡƒ ΠΌΠ΅Ρ‚ΠΎΠ΄Π΅ <c>Then</c>.
34 /// </para>
34 /// </para>
35 /// <para>
35 /// <para>
36 /// Π’Π°ΠΊΠΆΠ΅ Ρ…ΠΎΡ€ΠΎΡˆΠΈΠΌ ΠΏΡ€Π°Π²ΠΈΠ»ΠΎΠΌ являСтся Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ <c>Resolve</c> ΠΈ <c>Reject</c> Π΄ΠΎΠ»ΠΆΠ΅Π½ Π²Ρ‹Π·Ρ‹Π²Π°Ρ‚ΡŒ
36 /// Π’Π°ΠΊΠΆΠ΅ Ρ…ΠΎΡ€ΠΎΡˆΠΈΠΌ ΠΏΡ€Π°Π²ΠΈΠ»ΠΎΠΌ являСтся Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ <c>Resolve</c> ΠΈ <c>Reject</c> Π΄ΠΎΠ»ΠΆΠ΅Π½ Π²Ρ‹Π·Ρ‹Π²Π°Ρ‚ΡŒ
37 /// Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠ°Ρ‚ΠΎΡ€ обСщания ΠΈΠ½Π°Ρ‡Π΅ ΠΌΠΎΠ³ΡƒΡ‚ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΡƒΡ‚ΡŒ противорСчия.
37 /// Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΈΠ½ΠΈΡ†ΠΈΠ°Ρ‚ΠΎΡ€ обСщания ΠΈΠ½Π°Ρ‡Π΅ ΠΌΠΎΠ³ΡƒΡ‚ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΡƒΡ‚ΡŒ противорСчия.
38 /// </para>
38 /// </para>
39 /// </remarks>
39 /// </remarks>
40 public class Promise<T> : AbstractPromise<IDeferred<T>>, IPromise<T>, IDeferred<T> {
40 public class Promise<T> : AbstractPromise<IDeferred<T>>, IPromise<T>, IDeferred<T> {
41
41
42 class StubDeferred : IDeferred<T> {
42 class StubDeferred : IDeferred<T> {
43 public static readonly StubDeferred instance = new StubDeferred();
43 public static readonly StubDeferred instance = new StubDeferred();
44
44
45 StubDeferred() {
45 StubDeferred() {
46 }
46 }
47
47
48 #region IDeferred implementation
48 #region IDeferred implementation
49
49
50 public void Resolve(T value) {
50 public void Resolve(T value) {
51 }
51 }
52
52
53 public void Reject(Exception error) {
53 public void Reject(Exception error) {
54 }
54 }
55
55
56 #endregion
56 #endregion
57
57
58 #region ICancellable implementation
58 #region ICancellable implementation
59
59
60 public void Cancel() {
60 public void Cancel() {
61 }
61 }
62
62
63 #endregion
63 #endregion
64
64
65
65
66 }
66 }
67
67
68 class RemapDescriptor<T2> : IDeferred<T> {
68 class RemapDescriptor<T2> : IDeferred<T> {
69 readonly Func<T,T2> m_remap;
69 readonly Func<T,T2> m_remap;
70 readonly Func<Exception,T2> m_failed;
70 readonly Func<Exception,T2> m_failed;
71 readonly Func<T2> m_cancel;
71 readonly Func<T2> m_cancel;
72 readonly IDeferred<T2> m_deferred;
72 readonly IDeferred<T2> m_deferred;
73
73
74 public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<T2> cancel, IDeferred<T2> deferred ) {
74 public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<T2> cancel, IDeferred<T2> deferred ) {
75 Debug.Assert(deferred != null);
75 Debug.Assert(deferred != null);
76 m_remap = remap;
76 m_remap = remap;
77 m_failed = failed;
77 m_failed = failed;
78 m_cancel = cancel;
78 m_cancel = cancel;
79 m_deferred = deferred;
79 m_deferred = deferred;
80 }
80 }
81
81
82
82
83
83
84 #region IDeferred implementation
84 #region IDeferred implementation
85
85
86 public void Resolve(T value) {
86 public void Resolve(T value) {
87 if (m_remap != null) {
87 if (m_remap != null) {
88 try {
88 try {
89 m_deferred.Resolve(m_remap(value));
89 m_deferred.Resolve(m_remap(value));
90 } catch (Exception ex) {
90 } catch (Exception ex) {
91 Reject(ex);
91 Reject(ex);
92 }
92 }
93 }
93 }
94 }
94 }
95
95
96 public void Reject(Exception error) {
96 public void Reject(Exception error) {
97 if (m_failed != null) {
97 if (m_failed != null) {
98 try {
98 try {
99 m_deferred.Resolve(m_failed(error));
99 m_deferred.Resolve(m_failed(error));
100 } catch (Exception ex) {
100 } catch (Exception ex) {
101 m_deferred.Reject(ex);
101 m_deferred.Reject(ex);
102 }
102 }
103 } else {
103 } else {
104 m_deferred.Reject(error);
104 m_deferred.Reject(error);
105 }
105 }
106 }
106 }
107
107
108
108
109 #endregion
109 #endregion
110
110
111 #region ICancellable implementation
111 #region ICancellable implementation
112
112
113 public void Cancel() {
113 public void Cancel() {
114 if (m_cancel != null) {
114 if (m_cancel != null) {
115 try {
115 try {
116 m_deferred.Resolve(m_cancel());
116 m_deferred.Resolve(m_cancel());
117 } catch (Exception ex) {
117 } catch (Exception ex) {
118 Reject(ex);
118 Reject(ex);
119 }
119 }
120 } else {
120 } else {
121 m_deferred.Cancel();
121 m_deferred.Cancel();
122 }
122 }
123 }
123 }
124
124
125 #endregion
125 #endregion
126 }
126 }
127
127
128 class ListenerDescriptor : IDeferred<T> {
128 class ListenerDescriptor : IDeferred<T> {
129 readonly Action m_handler;
129 readonly Action m_handler;
130 readonly PromiseEventType m_events;
130 readonly PromiseEventType m_events;
131
131
132 public ListenerDescriptor(Action handler, PromiseEventType events) {
132 public ListenerDescriptor(Action handler, PromiseEventType events) {
133 Debug.Assert(handler != null);
133 Debug.Assert(handler != null);
134
134
135 m_handler = handler;
135 m_handler = handler;
136 m_events = events;
136 m_events = events;
137 }
137 }
138
138
139 #region IDeferred implementation
139 #region IDeferred implementation
140
140
141 public void Resolve(T value) {
141 public void Resolve(T value) {
142 if (m_events.HasFlag(PromiseEventType.Success)) {
142 if (m_events.HasFlag(PromiseEventType.Success)) {
143 try {
143 try {
144 m_handler();
144 m_handler();
145 // Analysis disable once EmptyGeneralCatchClause
145 // Analysis disable once EmptyGeneralCatchClause
146 } catch {
146 } catch {
147 }
147 }
148 }
148 }
149 }
149 }
150
150
151 public void Reject(Exception error) {
151 public void Reject(Exception error) {
152 if (m_events.HasFlag(PromiseEventType.Error)){
152 if (m_events.HasFlag(PromiseEventType.Error)){
153 try {
153 try {
154 m_handler();
154 m_handler();
155 // Analysis disable once EmptyGeneralCatchClause
155 // Analysis disable once EmptyGeneralCatchClause
156 } catch {
156 } catch {
157 }
157 }
158 }
158 }
159 }
159 }
160
160
161 #endregion
161 #endregion
162
162
163 #region ICancellable implementation
163 #region ICancellable implementation
164
164
165 public void Cancel() {
165 public void Cancel() {
166 if (m_events.HasFlag(PromiseEventType.Cancelled)){
166 if (m_events.HasFlag(PromiseEventType.Cancelled)){
167 try {
167 try {
168 m_handler();
168 m_handler();
169 // Analysis disable once EmptyGeneralCatchClause
169 // Analysis disable once EmptyGeneralCatchClause
170 } catch {
170 } catch {
171 }
171 }
172 }
172 }
173 }
173 }
174
174
175 #endregion
175 #endregion
176 }
176 }
177
177
178 class ValueEventDescriptor : IDeferred<T> {
178 class ValueEventDescriptor : IDeferred<T> {
179 readonly Action<T> m_success;
179 readonly Action<T> m_success;
180 readonly Action<Exception> m_failed;
180 readonly Action<Exception> m_failed;
181 readonly Action m_cancelled;
181 readonly Action m_cancelled;
182 readonly IDeferred<T> m_deferred;
182 readonly IDeferred<T> m_deferred;
183
183
184 public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
184 public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
185 Debug.Assert(deferred != null);
185 Debug.Assert(deferred != null);
186
186
187 m_success = success;
187 m_success = success;
188 m_failed = failed;
188 m_failed = failed;
189 m_cancelled = cancelled;
189 m_cancelled = cancelled;
190 m_deferred = deferred;
190 m_deferred = deferred;
191 }
191 }
192
192
193 #region IDeferred implementation
193 #region IDeferred implementation
194
194
195 public void Resolve(T value) {
195 public void Resolve(T value) {
196 if (m_success != null) {
196 if (m_success != null) {
197 try {
197 try {
198 m_success(value);
198 m_success(value);
199 m_deferred.Resolve(value);
199 m_deferred.Resolve(value);
200 } catch (Exception ex) {
200 } catch (Exception ex) {
201 Reject(ex);
201 Reject(ex);
202 }
202 }
203 }
203 }
204 }
204 }
205
205
206 public void Reject(Exception error) {
206 public void Reject(Exception error) {
207 if (m_failed != null) {
207 if (m_failed != null) {
208 try {
208 try {
209 m_failed(error);
209 m_failed(error);
210 m_deferred.Resolve(default(T));
210 m_deferred.Resolve(default(T));
211 } catch(Exception ex) {
211 } catch(Exception ex) {
212 m_deferred.Reject(ex);
212 m_deferred.Reject(ex);
213 }
213 }
214 } else {
214 } else {
215 m_deferred.Reject(error);
215 m_deferred.Reject(error);
216 }
216 }
217 }
217 }
218
218
219 #endregion
219 #endregion
220
220
221 #region ICancellable implementation
221 #region ICancellable implementation
222
222
223 public void Cancel() {
223 public void Cancel() {
224 if (m_cancelled != null) {
224 if (m_cancelled != null) {
225 try {
225 try {
226 m_cancelled();
226 m_cancelled();
227 m_deferred.Resolve(default(T));
227 m_deferred.Resolve(default(T));
228 } catch(Exception ex) {
228 } catch(Exception ex) {
229 Reject(ex);
229 Reject(ex);
230 }
230 }
231 } else {
231 } else {
232 m_deferred.Cancel();
232 m_deferred.Cancel();
233 }
233 }
234 }
234 }
235
235
236 #endregion
236 #endregion
237 }
237 }
238
238
239 public class EventDescriptor : IDeferred<T> {
239 public class EventDescriptor : IDeferred<T> {
240 readonly Action m_success;
240 readonly Action m_success;
241 readonly Action<Exception> m_failed;
241 readonly Action<Exception> m_failed;
242 readonly Action m_cancelled;
242 readonly Action m_cancelled;
243 readonly IDeferred<T> m_deferred;
243 readonly IDeferred<T> m_deferred;
244
244
245 public EventDescriptor(Action success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
245 public EventDescriptor(Action success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) {
246 Debug.Assert(deferred != null);
246 Debug.Assert(deferred != null);
247
247
248 m_success = success;
248 m_success = success;
249 m_failed = failed;
249 m_failed = failed;
250 m_cancelled = cancelled;
250 m_cancelled = cancelled;
251 m_deferred = deferred;
251 m_deferred = deferred;
252 }
252 }
253
253
254 #region IDeferred implementation
254 #region IDeferred implementation
255
255
256 public void Resolve(T value) {
256 public void Resolve(T value) {
257 if (m_success != null) {
257 if (m_success != null) {
258 try {
258 try {
259 m_success();
259 m_success();
260 m_deferred.Resolve(value);
260 m_deferred.Resolve(value);
261 } catch (Exception ex) {
261 } catch (Exception ex) {
262 Reject(ex);
262 Reject(ex);
263 }
263 }
264 }
264 }
265 }
265 }
266
266
267 public void Reject(Exception error) {
267 public void Reject(Exception error) {
268 if (m_failed != null) {
268 if (m_failed != null) {
269 try {
269 try {
270 m_failed(error);
270 m_failed(error);
271 m_deferred.Resolve(default(T));
271 m_deferred.Resolve(default(T));
272 }catch (Exception ex)
272 }catch (Exception ex)
273 {
273 {
274 m_deferred.Reject(ex);
274 m_deferred.Reject(ex);
275 }
275 }
276 } else {
276 } else {
277 m_deferred.Reject(error);
277 m_deferred.Reject(error);
278 }
278 }
279
279
280 }
280 }
281
281
282 #endregion
282 #endregion
283
283
284 #region ICancellable implementation
284 #region ICancellable implementation
285
285
286 public void Cancel() {
286 public void Cancel() {
287 if (m_cancelled != null) {
287 if (m_cancelled != null) {
288 try {
288 try {
289 m_cancelled();
289 m_cancelled();
290 m_deferred.Resolve(default(T));
290 m_deferred.Resolve(default(T));
291 } catch (Exception ex) {
291 } catch (Exception ex) {
292 Reject(ex);
292 Reject(ex);
293 }
293 }
294 } else {
294 } else {
295 m_deferred.Cancel();
295 m_deferred.Cancel();
296 }
296 }
297 }
297 }
298
298
299 #endregion
299 #endregion
300 }
300 }
301
301
302 T m_result;
302 T m_result;
303
303
304 public virtual void Resolve(T value) {
304 public virtual void Resolve(T value) {
305 BeginSetResult();
305 if (BeginSetResult()) {
306 m_result = value;
306 m_result = value;
307 EndSetResult();
307 EndSetResult();
308 }
308 }
309 }
309
310
310 public void Reject(Exception error) {
311 public void Reject(Exception error) {
311 SetError(error);
312 SetError(error);
312 }
313 }
313
314
314 public Type PromiseType {
315 public Type PromiseType {
315 get {
316 get {
316 return typeof(T);
317 return typeof(T);
317 }
318 }
318 }
319 }
319
320
320 public new T Join() {
321 public new T Join() {
321 WaitResult(-1);
322 WaitResult(-1);
322 return m_result;
323 return m_result;
323 }
324 }
324 public new T Join(int timeout) {
325 public new T Join(int timeout) {
325 WaitResult(timeout);
326 WaitResult(timeout);
326 return m_result;
327 return m_result;
327 }
328 }
328
329
329 public IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel) {
330 public IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel) {
330 AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance));
331 AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance));
331 return this;
332 return this;
332 }
333 }
333
334
334 public IPromise<T> On(Action<T> success, Action<Exception> error) {
335 public IPromise<T> On(Action<T> success, Action<Exception> error) {
335 AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance));
336 AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance));
336 return this;
337 return this;
337 }
338 }
338
339
339 public IPromise<T> On(Action<T> success) {
340 public IPromise<T> On(Action<T> success) {
340 AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance));
341 AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance));
341 return this;
342 return this;
342 }
343 }
343
344
344 public IPromise<T> On(Action handler, PromiseEventType events) {
345 public IPromise<T> On(Action handler, PromiseEventType events) {
345 Listen(events, handler);
346 Listen(events, handler);
346 return this;
347 return this;
347 }
348 }
348
349
349 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<T2> cancel) {
350 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<T2> cancel) {
350 var promise = new Promise<T2>();
351 var promise = new Promise<T2>();
351 AddHandler(new RemapDescriptor<T2>(mapper, error, cancel, promise));
352 AddHandler(new RemapDescriptor<T2>(mapper, error, cancel, promise));
352 return promise;
353 return promise;
353 }
354 }
354
355
355 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error) {
356 public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error) {
356 var promise = new Promise<T2>();
357 var promise = new Promise<T2>();
357 AddHandler(new RemapDescriptor<T2>(mapper, error, null, promise));
358 AddHandler(new RemapDescriptor<T2>(mapper, error, null, promise));
358 return promise;
359 return promise;
359 }
360 }
360
361
361 public IPromise<T2> Then<T2>(Func<T, T2> mapper) {
362 public IPromise<T2> Then<T2>(Func<T, T2> mapper) {
362 var promise = new Promise<T2>();
363 var promise = new Promise<T2>();
363 AddHandler(new RemapDescriptor<T2>(mapper, null, null, promise));
364 AddHandler(new RemapDescriptor<T2>(mapper, null, null, promise));
364 return promise;
365 return promise;
365 }
366 }
366
367
367 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<IPromise<T2>> cancel) {
368 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<IPromise<T2>> cancel) {
368 // this promise will be resolved when an asyc operation is started
369 // this promise will be resolved when an asyc operation is started
369 var promise = new Promise<IPromise<T2>>();
370 var promise = new Promise<IPromise<T2>>();
370
371
371 AddHandler(new RemapDescriptor<IPromise<T2>>(
372 AddHandler(new RemapDescriptor<IPromise<T2>>(
372 chained,
373 chained,
373 error,
374 error,
374 cancel,
375 cancel,
375 promise
376 promise
376 ));
377 ));
377
378
378 var medium = new Promise<T2>();
379 var medium = new Promise<T2>();
379
380
380 if (chained != null)
381 if (chained != null)
381 medium.On(Cancel, PromiseEventType.Cancelled);
382 medium.On(Cancel, PromiseEventType.Cancelled);
382
383
383 // we need to connect started async operation with the medium
384 // we need to connect started async operation with the medium
384 // if the async operation hasn't been started by the some reason
385 // if the async operation hasn't been started by the some reason
385 // report is to the medium
386 // report is to the medium
386 promise.On(
387 promise.On(
387 result => ConnectPromise<T2>(result, medium),
388 result => ConnectPromise<T2>(result, medium),
388 medium.Reject,
389 medium.Reject,
389 medium.Cancel
390 medium.Cancel
390 );
391 );
391
392
392 return medium;
393 return medium;
393 }
394 }
394
395
395 static void ConnectPromise<T2>(IPromise<T2> result, Promise<T2> medium) {
396 static void ConnectPromise<T2>(IPromise<T2> result, Promise<T2> medium) {
396 if (result != null) {
397 if (result != null) {
397 result.On(
398 result.On(
398 medium.Resolve,
399 medium.Resolve,
399 medium.Reject,
400 medium.Reject,
400 () => medium.Reject(new OperationCanceledException())
401 () => medium.Reject(new OperationCanceledException())
401 );
402 );
402 medium.On(result.Cancel, PromiseEventType.Cancelled);
403 medium.On(result.Cancel, PromiseEventType.Cancelled);
403 } else {
404 } else {
404 medium.Reject(
405 medium.Reject(
405 new NullReferenceException(
406 new NullReferenceException(
406 "The chained asynchronous operation returned" +
407 "The chained asynchronous operation returned" +
407 " 'null' where the promise instance is expected"
408 " 'null' where the promise instance is expected"
408 )
409 )
409 );
410 );
410 }
411 }
411 }
412 }
412
413
413 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error) {
414 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error) {
414 return Chain(chained, error, null);
415 return Chain(chained, error, null);
415 }
416 }
416
417
417 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained) {
418 public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained) {
418 return Chain(chained, null, null);
419 return Chain(chained, null, null);
419 }
420 }
420
421
421 public IPromise<T2> Error<T2>(Func<Exception, T2> error) {
422 public IPromise<T2> Error<T2>(Func<Exception, T2> error) {
422 var promise = new Promise<T2>();
423 var promise = new Promise<T2>();
423 if (error != null)
424 if (error != null)
424 On(
425 On(
425 (Action<T>)null,
426 (Action<T>)null,
426 ex => {
427 ex => {
427 try {
428 try {
428 promise.Resolve(error(ex));
429 promise.Resolve(error(ex));
429 } catch (Exception ex2) {
430 } catch (Exception ex2) {
430 promise.Reject(ex2);
431 promise.Reject(ex2);
431 }
432 }
432 }
433 }
433 );
434 );
434 else
435 else
435 Listen(PromiseEventType.Error, () => promise.Resolve(default(T2)));
436 Listen(PromiseEventType.Error, () => promise.Resolve(default(T2)));
436 return promise;
437 return promise;
437 }
438 }
438
439
439 public IPromise<T2> Cancelled<T2>(Func<T2> handler) {
440 public IPromise<T2> Cancelled<T2>(Func<T2> handler) {
440 var promise = new Promise<T2>();
441 var promise = new Promise<T2>();
441 if (handler != null)
442 if (handler != null)
442 On(
443 On(
443 (Action<T>)null,
444 (Action<T>)null,
444 null,
445 null,
445 () => {
446 () => {
446 try {
447 try {
447 promise.Resolve(handler());
448 promise.Resolve(handler());
448 } catch (Exception ex) {
449 } catch (Exception ex) {
449 promise.Reject(ex);
450 promise.Reject(ex);
450 }
451 }
451 });
452 });
452 else
453 else
453 Listen(PromiseEventType.Cancelled, () => promise.Resolve(default(T2)));
454 Listen(PromiseEventType.Cancelled, () => promise.Resolve(default(T2)));
454 return promise;
455 return promise;
455 }
456 }
456
457
457 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
458 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
458 var promise = new Promise<T>();
459 var promise = new Promise<T>();
459 if (success != null)
460 if (success != null)
460 promise.On(Cancel, PromiseEventType.Cancelled);
461 promise.On(Cancel, PromiseEventType.Cancelled);
461
462
462 AddHandler(new EventDescriptor(success, error, cancel, promise));
463 AddHandler(new EventDescriptor(success, error, cancel, promise));
463
464
464 return promise;
465 return promise;
465 }
466 }
466
467
467 public IPromise Then(Action success, Action<Exception> error) {
468 public IPromise Then(Action success, Action<Exception> error) {
468 return Then(success, error, null);
469 return Then(success, error, null);
469 }
470 }
470
471
471 public IPromise Then(Action success) {
472 public IPromise Then(Action success) {
472 return Then(success, null, null);
473 return Then(success, null, null);
473 }
474 }
474
475
475 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
476 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
476 var promise = new Promise<IPromise>();
477 var promise = new Promise<IPromise>();
477
478
478 AddHandler(
479 AddHandler(
479 new RemapDescriptor<IPromise>(
480 new RemapDescriptor<IPromise>(
480 x => chained(),
481 x => chained(),
481 error,
482 error,
482 cancel,
483 cancel,
483 promise
484 promise
484 )
485 )
485 );
486 );
486
487
487 var medium = new Promise();
488 var medium = new Promise();
488 if (chained != null)
489 if (chained != null)
489 medium.On(Cancel, PromiseEventType.Cancelled);
490 medium.On(Cancel, PromiseEventType.Cancelled);
490
491
491 promise.On(
492 promise.On(
492 result => ConnectPromise(result, medium),
493 result => ConnectPromise(result, medium),
493 medium.Reject,
494 medium.Reject,
494 medium.Cancel
495 medium.Cancel
495 );
496 );
496
497
497 return medium;
498 return medium;
498 }
499 }
499
500
500 static void ConnectPromise(IPromise result, Promise medium) {
501 static void ConnectPromise(IPromise result, Promise medium) {
501 if (result != null) {
502 if (result != null) {
502 result.On(
503 result.On(
503 medium.Resolve,
504 medium.Resolve,
504 medium.Reject,
505 medium.Reject,
505 () => medium.Reject(new OperationCanceledException())
506 () => medium.Reject(new OperationCanceledException())
506 );
507 );
507 medium.On(result.Cancel, PromiseEventType.Cancelled);
508 medium.On(result.Cancel, PromiseEventType.Cancelled);
508 } else {
509 } else {
509 medium.Reject(
510 medium.Reject(
510 new NullReferenceException(
511 new NullReferenceException(
511 "The chained asynchronous operation returned" +
512 "The chained asynchronous operation returned" +
512 " 'null' where the promise instance is expected"
513 " 'null' where the promise instance is expected"
513 )
514 )
514 );
515 );
515 }
516 }
516 }
517 }
517
518
518 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
519 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
519 return Chain(chained, error, null);
520 return Chain(chained, error, null);
520 }
521 }
521
522
522 public IPromise Chain(Func<IPromise> chained) {
523 public IPromise Chain(Func<IPromise> chained) {
523 return Chain(chained, null, null);
524 return Chain(chained, null, null);
524 }
525 }
525
526
526 public IPromise On(Action success, Action<Exception> error, Action cancel) {
527 public IPromise On(Action success, Action<Exception> error, Action cancel) {
527 AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance));
528 AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance));
528 return this;
529 return this;
529 }
530 }
530
531
531 public IPromise On(Action success, Action<Exception> error) {
532 public IPromise On(Action success, Action<Exception> error) {
532 AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance));
533 AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance));
533 return this;
534 return this;
534 }
535 }
535
536
536 public IPromise On(Action success) {
537 public IPromise On(Action success) {
537 Listen(PromiseEventType.Success, success);
538 Listen(PromiseEventType.Success, success);
538 return this;
539 return this;
539 }
540 }
540
541
541 IPromise IPromise.On(Action handler, PromiseEventType events) {
542 IPromise IPromise.On(Action handler, PromiseEventType events) {
542 Listen(events,handler);
543 Listen(events,handler);
543 return this;
544 return this;
544 }
545 }
545
546
546 public IPromise Error(Action<Exception> error) {
547 public IPromise Error(Action<Exception> error) {
547 var promise = new Promise();
548 var promise = new Promise();
548 if (error != null)
549 if (error != null)
549 On(
550 On(
550 (Action<T>)null,
551 (Action<T>)null,
551 ex => {
552 ex => {
552 try {
553 try {
553 error(ex);
554 error(ex);
554 promise.Resolve();
555 promise.Resolve();
555 } catch (Exception ex2) {
556 } catch (Exception ex2) {
556 promise.Reject(ex2);
557 promise.Reject(ex2);
557 }
558 }
558 });
559 });
559 else
560 else
560 Listen(PromiseEventType.Error, promise.Resolve);
561 Listen(PromiseEventType.Error, promise.Resolve);
561 return promise;
562 return promise;
562 }
563 }
563
564
564 public IPromise Cancelled(Action handler) {
565 public IPromise Cancelled(Action handler) {
565 var promise = new Promise();
566 var promise = new Promise();
566 if (handler != null)
567 if (handler != null)
567 On(
568 On(
568 (Action<T>)null,
569 (Action<T>)null,
569 null,
570 null,
570 () => {
571 () => {
571 try {
572 try {
572 handler();
573 handler();
573 promise.Resolve();
574 promise.Resolve();
574 } catch (Exception ex) {
575 } catch (Exception ex) {
575 promise.Reject(ex);
576 promise.Reject(ex);
576 }
577 }
577 });
578 });
578 else
579 else
579 Listen(PromiseEventType.Cancelled, promise.Resolve);
580 Listen(PromiseEventType.Cancelled, promise.Resolve);
580 return promise;
581 return promise;
581 }
582 }
582
583
583 public IPromise<T2> Cast<T2>() {
584 public IPromise<T2> Cast<T2>() {
584 return (IPromise<T2>)this;
585 return (IPromise<T2>)this;
585 }
586 }
586
587
587 #region implemented abstract members of AbstractPromise
588 #region implemented abstract members of AbstractPromise
588
589
589 protected override void SignalSuccess(IDeferred<T> handler) {
590 protected override void SignalSuccess(IDeferred<T> handler) {
590 handler.Resolve(m_result);
591 handler.Resolve(m_result);
591 }
592 }
592
593
593 protected override void SignalError(IDeferred<T> handler, Exception error) {
594 protected override void SignalError(IDeferred<T> handler, Exception error) {
594 handler.Reject(error);
595 handler.Reject(error);
595 }
596 }
596
597
597 protected override void SignalCancelled(IDeferred<T> handler) {
598 protected override void SignalCancelled(IDeferred<T> handler) {
598 handler.Cancel();
599 handler.Cancel();
599 }
600 }
600
601
601 protected override void Listen(PromiseEventType events, Action handler) {
602 protected override void Listen(PromiseEventType events, Action handler) {
602 if (handler != null)
603 if (handler != null)
603 AddHandler(new ListenerDescriptor(handler, events));
604 AddHandler(new ListenerDescriptor(handler, events));
604 }
605 }
605
606
606 #endregion
607 #endregion
607
608
608 public static IPromise<T> ResultToPromise(T value) {
609 public static IPromise<T> ResultToPromise(T value) {
609 var p = new Promise<T>();
610 var p = new Promise<T>();
610 p.Resolve(value);
611 p.Resolve(value);
611 return p;
612 return p;
612 }
613 }
613
614
614 public static IPromise<T> ExceptionToPromise(Exception error) {
615 public static IPromise<T> ExceptionToPromise(Exception error) {
615 var p = new Promise<T>();
616 var p = new Promise<T>();
616 p.Reject(error);
617 p.Reject(error);
617 return p;
618 return p;
618 }
619 }
619
620
620 }
621 }
621 }
622 }
General Comments 0
You need to be logged in to leave comments. Login now