##// END OF EJS Templates
runnable component, work in progress
cin -
r185:822aab37b107 ref20160224
parent child
Show More
@@ -0,0 +1,194
1 using System;
2 using System.Reflection;
3 using System.Threading;
4 using Implab.Parallels;
5 using Implab.Components;
6
7 #if MONO
8
9 using NUnit.Framework;
10 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
11 using TestMethodAttribute = NUnit.Framework.TestAttribute;
12
13 #else
14
15 using Microsoft.VisualStudio.TestTools.UnitTesting;
16
17 #endif
18
19 namespace Implab.Test {
20 [TestClass]
21 public class RunnableComponentTests {
22
23 static void ShouldThrow(Action action) {
24 try {
25 action();
26 Assert.Fail();
27 } catch(AssertionException) {
28 throw;
29 } catch {
30 }
31 }
32
33 class Runnable : RunnableComponent {
34 public Runnable(bool initialized) : base(initialized) {
35 }
36
37 public Action MockInit {
38 get;
39 set;
40 }
41
42 public Func<IPromise> MockStart {
43 get;
44 set;
45 }
46
47 public Func<IPromise> MockStop {
48 get;
49 set;
50 }
51
52 protected override IPromise OnStart() {
53 return MockStart != null ? MockStart() : base.OnStart();
54 }
55
56 protected override IPromise OnStop() {
57 return MockStop != null ? MockStop() : base.OnStart();
58 }
59
60 protected override void OnInitialize() {
61 if (MockInit != null)
62 MockInit();
63 }
64 }
65
66 [TestMethod]
67 public void NormalFlowTest() {
68 var comp = new Runnable(false);
69
70 Assert.AreEqual(ExecutionState.Created, comp.State);
71
72 comp.Init();
73
74 Assert.AreEqual(ExecutionState.Ready, comp.State);
75
76 comp.Start().Join(1000);
77
78 Assert.AreEqual(ExecutionState.Running, comp.State);
79
80 comp.Stop().Join(1000);
81
82 Assert.AreEqual(ExecutionState.Disposed, comp.State);
83
84 }
85
86 [TestMethod]
87 public void InitFailTest() {
88 var comp = new Runnable(false) {
89 MockInit = () => {
90 throw new Exception("BAD");
91 }
92 };
93
94 ShouldThrow(() => comp.Start());
95 ShouldThrow(() => comp.Stop());
96 Assert.AreEqual(ExecutionState.Created, comp.State);
97
98 ShouldThrow(comp.Init);
99
100 Assert.AreEqual(ExecutionState.Failed, comp.State);
101
102 ShouldThrow(() => comp.Start());
103 ShouldThrow(() => comp.Stop());
104 Assert.AreEqual(ExecutionState.Failed, comp.State);
105
106 comp.Dispose();
107 Assert.AreEqual(ExecutionState.Disposed, comp.State);
108 }
109
110 [TestMethod]
111 public void DisposedTest() {
112
113 var comp = new Runnable(false);
114 comp.Dispose();
115
116 ShouldThrow(() => comp.Start());
117 ShouldThrow(() => comp.Stop());
118 ShouldThrow(comp.Init);
119
120 Assert.AreEqual(ExecutionState.Disposed, comp.State);
121 }
122
123 [TestMethod]
124 public void StartCancelTest() {
125 var comp = new Runnable(true) {
126 MockStart = () => PromiseHelper.Sleep(100000, 0)
127 };
128
129 var p = comp.Start();
130 Assert.AreEqual(ExecutionState.Starting, comp.State);
131 p.Cancel();
132 ShouldThrow(() => p.Join(1000));
133 Assert.AreEqual(ExecutionState.Failed, comp.State);
134 Assert.IsInstanceOfType(typeof(OperationCanceledException), comp.LastError);
135
136 comp.Dispose();
137 }
138
139 [TestMethod]
140 public void StartStopTest() {
141 var stop = new Signal();
142 var comp = new Runnable(true) {
143 MockStart = () => PromiseHelper.Sleep(100000, 0),
144 MockStop = () => AsyncPool.RunThread(stop.Wait)
145 };
146
147 var p1 = comp.Start();
148 var p2 = comp.Stop();
149 // should enter stopping state
150
151 ShouldThrow(p1.Join);
152 Assert.IsTrue(p1.IsCancelled);
153 Assert.AreEqual(ExecutionState.Stopping, comp.State);
154
155 stop.Set();
156 p2.Join(1000);
157 Assert.AreEqual(ExecutionState.Disposed, comp.State);
158 }
159
160 [TestMethod]
161 public void StartStopFailTest() {
162 var comp = new Runnable(true) {
163 MockStart = () => PromiseHelper.Sleep(100000, 0).Then(null,null,x => { throw new Exception("I'm dead"); })
164 };
165
166 comp.Start();
167 var p = comp.Stop();
168 // if Start fails to cancel, should fail to stop
169 ShouldThrow(() => p.Join(1000));
170 Assert.AreEqual(ExecutionState.Failed, comp.State);
171 Assert.IsNotNull(comp.LastError);
172 Assert.AreEqual("I'm dead", comp.LastError.Message);
173 }
174
175 [TestMethod]
176 public void StopCancelTest() {
177 var comp = new Runnable(true) {
178 MockStop = () => PromiseHelper.Sleep(100000, 0)
179 };
180
181 comp.Start();
182 var p = comp.Stop();
183 Assert.AreEqual(ExecutionState.Stopping, comp.State);
184 p.Cancel();
185 ShouldThrow(() => p.Join(1000));
186 Assert.AreEqual(ExecutionState.Failed, comp.State);
187 Assert.IsInstanceOfType(typeof(OperationCanceledException), comp.LastError);
188
189 comp.Dispose();
190 }
191
192 }
193 }
194
@@ -1,68 +1,69
1 1 <?xml version="1.0" encoding="utf-8"?>
2 2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <PropertyGroup>
4 4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 6 <ProductVersion>8.0.30703</ProductVersion>
7 7 <SchemaVersion>2.0</SchemaVersion>
8 8 <ProjectGuid>{2BD05F84-E067-4B87-9477-FDC2676A21C6}</ProjectGuid>
9 9 <OutputType>Library</OutputType>
10 10 <RootNamespace>Implab.Test</RootNamespace>
11 11 <AssemblyName>Implab.Test</AssemblyName>
12 12 <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
13 13 <ReleaseVersion>0.2</ReleaseVersion>
14 14 </PropertyGroup>
15 15 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
16 16 <DebugSymbols>true</DebugSymbols>
17 17 <DebugType>full</DebugType>
18 18 <Optimize>false</Optimize>
19 19 <OutputPath>bin\Debug</OutputPath>
20 20 <DefineConstants>DEBUG;MONO</DefineConstants>
21 21 <ErrorReport>prompt</ErrorReport>
22 22 <WarningLevel>4</WarningLevel>
23 23 <ConsolePause>false</ConsolePause>
24 24 </PropertyGroup>
25 25 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
26 26 <Optimize>true</Optimize>
27 27 <OutputPath>bin\Release</OutputPath>
28 28 <ErrorReport>prompt</ErrorReport>
29 29 <WarningLevel>4</WarningLevel>
30 30 <ConsolePause>false</ConsolePause>
31 31 <DefineConstants>MONO</DefineConstants>
32 32 </PropertyGroup>
33 33 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug 4.5|AnyCPU' ">
34 34 <DebugSymbols>true</DebugSymbols>
35 35 <DebugType>full</DebugType>
36 36 <Optimize>false</Optimize>
37 37 <OutputPath>bin\Debug</OutputPath>
38 38 <DefineConstants>DEBUG;TRACE;NET_4_5;MONO</DefineConstants>
39 39 <ErrorReport>prompt</ErrorReport>
40 40 <WarningLevel>4</WarningLevel>
41 41 <ConsolePause>false</ConsolePause>
42 42 </PropertyGroup>
43 43 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release 4.5|AnyCPU' ">
44 44 <Optimize>true</Optimize>
45 45 <OutputPath>bin\Release</OutputPath>
46 46 <DefineConstants>NET_4_5;MONO</DefineConstants>
47 47 <ErrorReport>prompt</ErrorReport>
48 48 <WarningLevel>4</WarningLevel>
49 49 <ConsolePause>false</ConsolePause>
50 50 </PropertyGroup>
51 51 <ItemGroup>
52 52 <Reference Include="System" />
53 53 <Reference Include="nunit.framework" />
54 54 </ItemGroup>
55 55 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
56 56 <ItemGroup>
57 57 <Compile Include="AsyncTests.cs" />
58 58 <Compile Include="PromiseHelper.cs" />
59 59 <Compile Include="Properties\AssemblyInfo.cs" />
60 60 <Compile Include="CancelationTests.cs" />
61 <Compile Include="RunnableComponentTests.cs" />
61 62 </ItemGroup>
62 63 <ItemGroup>
63 64 <ProjectReference Include="..\Implab\Implab.csproj">
64 65 <Project>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</Project>
65 66 <Name>Implab</Name>
66 67 </ProjectReference>
67 68 </ItemGroup>
68 69 </Project> No newline at end of file
@@ -1,304 +1,305
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Reflection;
5 5
6 6 namespace Implab {
7 7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
8 8
9 9 const int UNRESOLVED_SATE = 0;
10 10 const int TRANSITIONAL_STATE = 1;
11 11 protected const int SUCCEEDED_STATE = 2;
12 12 protected const int REJECTED_STATE = 3;
13 13 protected const int CANCELLED_STATE = 4;
14 14
15 15 const int CANCEL_NOT_REQUESTED = 0;
16 16 const int CANCEL_REQUESTING = 1;
17 17 const int CANCEL_REQUESTED = 2;
18 18
19 19 const int RESERVED_HANDLERS_COUNT = 4;
20 20
21 21 int m_state;
22 22 Exception m_error;
23 23 int m_handlersCount;
24 24
25 25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
26 26 THandler[] m_handlers;
27 27 MTQueue<THandler> m_extraHandlers;
28 28 int m_handlerPointer = -1;
29 29 int m_handlersCommited;
30 30
31 31 int m_cancelRequest;
32 32 Exception m_cancelationReason;
33 33 MTQueue<Action<Exception>> m_cancelationHandlers;
34 34
35 35
36 36 #region state managment
37 37 bool BeginTransit() {
38 38 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
39 39 }
40 40
41 41 void CompleteTransit(int state) {
42 42 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
43 43 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
44 44 }
45 45
46 46 void WaitTransition() {
47 47 while (m_state == TRANSITIONAL_STATE) {
48 48 Thread.MemoryBarrier();
49 49 }
50 50 }
51 51
52 52 protected bool BeginSetResult() {
53 53 if (!BeginTransit()) {
54 54 WaitTransition();
55 55 if (m_state != CANCELLED_STATE)
56 56 throw new InvalidOperationException("The promise is already resolved");
57 57 return false;
58 58 }
59 59 return true;
60 60 }
61 61
62 62 protected void EndSetResult() {
63 63 CompleteTransit(SUCCEEDED_STATE);
64 64 Signal();
65 65 }
66 66
67 67
68 68
69 69 /// <summary>
70 70 /// ВыполняСт ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅, сообщая ΠΎΠ± ошибкС
71 71 /// </summary>
72 72 /// <remarks>
73 73 /// ΠŸΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π² ΠΌΠ½ΠΎΠ³ΠΎΠΏΡ‚ΠΎΡ‡Π½ΠΎΠΉ срСдС, ΠΏΡ€ΠΈ Π΅Π³ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ сразу нСсколько ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ²
74 74 /// ΠΌΠΎΠ³Ρƒ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡˆΠΈΠ±ΠΊΡƒ, ΠΏΡ€ΠΈ этом Ρ‚ΠΎΠ»ΡŒΠΊΠΎ пСрвая Π±ΡƒΠ΄Π΅Ρ‚ использована Π² качСствС Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π°, ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅
75 75 /// Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΡ€ΠΎΠΈΠ³Π½ΠΎΡ€ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹.
76 76 /// </remarks>
77 77 /// <param name="error">Π˜ΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ возникшСС ΠΏΡ€ΠΈ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠΈ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ</param>
78 78 /// <exception cref="InvalidOperationException">Π”Π°Π½Π½ΠΎΠ΅ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ ΡƒΠΆΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΎ</exception>
79 79 protected void SetError(Exception error) {
80 while (error is PromiseTransientException)
81 error = error.InnerException;
82
83 var isCancel = error is OperationCanceledException;
84
80 85 if (BeginTransit()) {
81 if (error is OperationCanceledException) {
82 m_error = error.InnerException;
83 CompleteTransit(CANCELLED_STATE);
84 } else {
85 m_error = error is PromiseTransientException ? error.InnerException : error;
86 CompleteTransit(REJECTED_STATE);
87 }
86 m_error = isCancel ? error.InnerException : error;
87 CompleteTransit(isCancel ? CANCELLED_STATE : REJECTED_STATE);
88
88 89 Signal();
89 90 } else {
90 91 WaitTransition();
91 if (m_state == SUCCEEDED_STATE)
92 if (!isCancel || m_state == SUCCEEDED_STATE)
92 93 throw new InvalidOperationException("The promise is already resolved");
93 94 }
94 95 }
95 96
96 97 /// <summary>
97 98 /// ΠžΡ‚ΠΌΠ΅Π½ΡΠ΅Ρ‚ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΡŽ, Ссли это Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ.
98 99 /// </summary>
99 100 /// <remarks>Для опрСдСлСния Π±Ρ‹Π»Π° Π»ΠΈ опСрация ΠΎΡ‚ΠΌΠ΅Π½Π΅Π½Π° слСдуСт ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ свойство <see cref="IsCancelled"/>.</remarks>
100 101 protected void SetCancelled(Exception reason) {
101 102 if (BeginTransit()) {
102 103 m_error = reason;
103 104 CompleteTransit(CANCELLED_STATE);
104 105 Signal();
105 106 }
106 107 }
107 108
108 109 protected abstract void SignalHandler(THandler handler, int signal);
109 110
110 111 void Signal() {
111 112 var hp = m_handlerPointer;
112 113 var slot = hp +1 ;
113 114 while (slot < m_handlersCommited) {
114 115 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
115 116 SignalHandler(m_handlers[slot], m_state);
116 117 }
117 118 hp = m_handlerPointer;
118 119 slot = hp +1 ;
119 120 }
120 121
121 122
122 123 if (m_extraHandlers != null) {
123 124 THandler handler;
124 125 while (m_extraHandlers.TryDequeue(out handler))
125 126 SignalHandler(handler, m_state);
126 127 }
127 128 }
128 129
129 130 #endregion
130 131
131 132 protected abstract Signal GetResolveSignal();
132 133
133 134 #region synchronization traits
134 135 protected void WaitResult(int timeout) {
135 136 if (!(IsResolved || GetResolveSignal().Wait(timeout)))
136 137 throw new TimeoutException();
137 138
138 139 switch (m_state) {
139 140 case SUCCEEDED_STATE:
140 141 return;
141 142 case CANCELLED_STATE:
142 143 throw new OperationCanceledException();
143 144 case REJECTED_STATE:
144 145 throw new TargetInvocationException(m_error);
145 146 default:
146 147 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
147 148 }
148 149 }
149 150 #endregion
150 151
151 152 #region handlers managment
152 153
153 154 protected void AddHandler(THandler handler) {
154 155
155 156 if (m_state > 1) {
156 157 // the promise is in the resolved state, just invoke the handler
157 158 SignalHandler(handler, m_state);
158 159 } else {
159 160 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
160 161
161 162 if (slot < RESERVED_HANDLERS_COUNT) {
162 163
163 164 if (slot == 0) {
164 165 m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
165 166 } else {
166 167 while (m_handlers == null)
167 168 Thread.MemoryBarrier();
168 169 }
169 170
170 171 m_handlers[slot] = handler;
171 172
172 173 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
173 174 }
174 175
175 176 if (m_state > 1) {
176 177 do {
177 178 var hp = m_handlerPointer;
178 179 slot = hp + 1;
179 180 if (slot < m_handlersCommited) {
180 181 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
181 182 continue;
182 183 SignalHandler(m_handlers[slot], m_state);
183 184 }
184 185 break;
185 186 } while(true);
186 187 }
187 188 } else {
188 189 if (slot == RESERVED_HANDLERS_COUNT) {
189 190 m_extraHandlers = new MTQueue<THandler>();
190 191 } else {
191 192 while (m_extraHandlers == null)
192 193 Thread.MemoryBarrier();
193 194 }
194 195
195 196 m_extraHandlers.Enqueue(handler);
196 197
197 198 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
198 199 // if the promise have been resolved while we was adding the handler to the queue
199 200 // we can't guarantee that someone is still processing it
200 201 // therefore we need to fetch a handler from the queue and execute it
201 202 // note that fetched handler may be not the one that we have added
202 203 // even we can fetch no handlers at all :)
203 204 SignalHandler(handler, m_state);
204 205 }
205 206 }
206 207 }
207 208
208 209 #endregion
209 210
210 211 #region IPromise implementation
211 212
212 213 public bool IsResolved {
213 214 get {
214 215 Thread.MemoryBarrier();
215 216 return m_state > 1;
216 217 }
217 218 }
218 219
219 220 public bool IsCancelled {
220 221 get {
221 222 Thread.MemoryBarrier();
222 223 return m_state == CANCELLED_STATE;
223 224 }
224 225 }
225 226
226 227 #endregion
227 228
228 229 public Exception Error {
229 230 get {
230 231 return m_error;
231 232 }
232 233 }
233 234
234 235 public bool CancelOperationIfRequested() {
235 236 if (IsCancellationRequested) {
236 237 CancelOperation(CancellationReason);
237 238 return true;
238 239 }
239 240 return false;
240 241 }
241 242
242 243 public virtual void CancelOperation(Exception reason) {
243 244 SetCancelled(reason);
244 245 }
245 246
246 247 public void CancellationRequested(Action<Exception> handler) {
247 248 Safe.ArgumentNotNull(handler, "handler");
248 249 if (IsCancellationRequested)
249 250 handler(CancellationReason);
250 251
251 252 if (m_cancelationHandlers == null)
252 253 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
253 254
254 255 m_cancelationHandlers.Enqueue(handler);
255 256
256 257 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
257 258 // TryDeque implies MemoryBarrier()
258 259 handler(m_cancelationReason);
259 260 }
260 261
261 262 public bool IsCancellationRequested {
262 263 get {
263 264 do {
264 265 if (m_cancelRequest == CANCEL_NOT_REQUESTED)
265 266 return false;
266 267 if (m_cancelRequest == CANCEL_REQUESTED)
267 268 return true;
268 269 Thread.MemoryBarrier();
269 270 } while(true);
270 271 }
271 272 }
272 273
273 274 public Exception CancellationReason {
274 275 get {
275 276 do {
276 277 Thread.MemoryBarrier();
277 278 } while(m_cancelRequest == CANCEL_REQUESTING);
278 279
279 280 return m_cancelationReason;
280 281 }
281 282 }
282 283
283 284 #region ICancellable implementation
284 285
285 286 public void Cancel() {
286 287 Cancel(null);
287 288 }
288 289
289 290 public void Cancel(Exception reason) {
290 291 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
291 292 m_cancelationReason = reason;
292 293 m_cancelRequest = CANCEL_REQUESTED;
293 294 if (m_cancelationHandlers != null) {
294 295 Action<Exception> handler;
295 296 while (m_cancelationHandlers.TryDequeue(out handler))
296 297 handler(m_cancelationReason);
297 298 }
298 299 }
299 300 }
300 301
301 302 #endregion
302 303 }
303 304 }
304 305
@@ -1,25 +1,34
1 1 using System;
2 2
3 3 namespace Implab {
4 4 public class ActionChainTask : ActionChainTaskBase, IDeferred {
5 5 readonly Func<IPromise> m_task;
6 6
7 /// <summary>
8 /// Initializes a new instance of the <see cref="Implab.ActionChainTask"/> class.
9 /// </summary>
10 /// <param name="task">The operation which will be performed when the <see cref="Resolve()"/> is called.</param>
11 /// <param name="error">The error handler which will invoke when the <see cref="Reject(Exception)"/> is called or when the task fails with an error.</param>
12 /// <param name="cancel">The cancellation handler.</param>
13 /// <param name="autoCancellable">If set to <c>true</c> will automatically accept
14 /// all cancel requests before the task is started with <see cref="Resolve()"/>,
15 /// after that all requests are directed to the task.</param>
7 16 public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
8 17 m_task = task;
9 18 }
10 19
11 20 public void Resolve() {
12 21 if (m_task != null && LockCancelation()) {
13 22 try {
14 23 var p = m_task();
15 24 p.On(SetResult, HandleErrorInternal, SetCancelled);
16 25 CancellationRequested(p.Cancel);
17 26 } catch(Exception err) {
18 27 HandleErrorInternal(err);
19 28 }
20 29 }
21 30 }
22 31
23 32 }
24 33 }
25 34
@@ -1,195 +1,262
1 1 using System;
2 using Implab.Formats;
3 2
4 3 namespace Implab.Components {
5 public class RunnableComponent : Disposable, IRunnable, IInitializable {
4 public abstract class RunnableComponent : IDisposable, IRunnable, IInitializable {
6 5 enum Commands {
7 6 Ok = 0,
8 7 Fail,
9 8 Init,
10 9 Start,
11 10 Stop,
12 11 Dispose,
13 12 Last = Dispose
14 13 }
15 14
16 15 class StateMachine {
17 16 static readonly ExecutionState[,] _transitions;
18 17
19 18 static StateMachine() {
20 19 _transitions = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1];
21 20
22 Edge(ExecutionState.Created, ExecutionState.Ready, Commands.Ok);
23 Edge(ExecutionState.Created, ExecutionState.Failed, Commands.Fail);
21 Edge(ExecutionState.Created, ExecutionState.Initializing, Commands.Init);
22 Edge(ExecutionState.Created, ExecutionState.Disposed, Commands.Dispose);
23
24 Edge(ExecutionState.Initializing, ExecutionState.Ready, Commands.Ok);
25 Edge(ExecutionState.Initializing, ExecutionState.Failed, Commands.Fail);
24 26
25 27 Edge(ExecutionState.Ready, ExecutionState.Starting, Commands.Start);
26 28 Edge(ExecutionState.Ready, ExecutionState.Disposed, Commands.Dispose);
27 29
28 30 Edge(ExecutionState.Starting, ExecutionState.Running, Commands.Ok);
29 31 Edge(ExecutionState.Starting, ExecutionState.Failed, Commands.Fail);
30 32 Edge(ExecutionState.Starting, ExecutionState.Stopping, Commands.Stop);
31 33 Edge(ExecutionState.Starting, ExecutionState.Disposed, Commands.Dispose);
32 34
33 35 Edge(ExecutionState.Running, ExecutionState.Failed, Commands.Fail);
34 36 Edge(ExecutionState.Running, ExecutionState.Stopping, Commands.Stop);
35 37 Edge(ExecutionState.Running, ExecutionState.Disposed, Commands.Dispose);
36 38
37 39 Edge(ExecutionState.Stopping, ExecutionState.Failed, Commands.Fail);
38 40 Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Ok);
39 Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Dispose);
41
42 Edge(ExecutionState.Failed, ExecutionState.Disposed, Commands.Dispose);
40 43 }
41 44
42 45 static void Edge(ExecutionState s1, ExecutionState s2, Commands cmd) {
43 46 _transitions[(int)s1, (int)cmd] = s2;
44 47 }
45 48
46 49 public ExecutionState State {
47 50 get;
48 51 private set;
49 52 }
50 53
51 54 public StateMachine(ExecutionState initial) {
52 55 State = initial;
53 56 }
54 57
55 58 public bool Move(Commands cmd) {
56 59 var next = _transitions[(int)State, (int)cmd];
57 60 if (next == ExecutionState.Undefined)
58 61 return false;
59 62 State = next;
60 63 return true;
61 64 }
62 65 }
63 66
64 67 IPromise m_pending;
65 68 Exception m_lastError;
66 69
67 70 readonly StateMachine m_stateMachine;
68 71
69 72 protected RunnableComponent(bool initialized) {
70 73 m_stateMachine = new StateMachine(initialized ? ExecutionState.Ready : ExecutionState.Created);
71 74 }
72 75
76 protected virtual int DisposeTimeout {
77 get {
78 return 10000;
79 }
80 }
81
73 82 void ThrowInvalidCommand(Commands cmd) {
83 if (m_stateMachine.State == ExecutionState.Disposed)
84 throw new ObjectDisposedException(ToString());
85
74 86 throw new InvalidOperationException(String.Format("Commnd {0} is not allowed in the state {1}", cmd, m_stateMachine.State));
75 87 }
76 88
77 protected void Move(Commands cmd) {
78 lock (m_stateMachine)
79 if (!m_stateMachine.Move(cmd))
80 ThrowInvalidCommand(cmd);
89 void Move(Commands cmd) {
90 if (!m_stateMachine.Move(cmd))
91 ThrowInvalidCommand(cmd);
81 92 }
82 93
83 protected void Fail(Exception err) {
84 lock (m_stateMachine) {
85 if (!m_stateMachine.Move(Commands.Fail))
86 ThrowInvalidCommand(Commands.Fail);
87
88 m_lastError = err;
89 }
90 }
91
92 protected void Success() {
93 Move(Commands.Ok);
94 }
95
96 protected void Invoke(Commands cmd, Action action) {
97 Move(cmd);
94 void Invoke(Commands cmd, Action action) {
95 lock (m_stateMachine)
96 Move(cmd);
97
98 98 try {
99 99 action();
100 Move(Commands.Ok);
100 lock(m_stateMachine)
101 Move(Commands.Ok);
102
101 103 } catch (Exception err) {
102 Fail(err);
104 lock (m_stateMachine) {
105 Move(Commands.Fail);
106 m_lastError = err;
107 }
103 108 throw;
104 109 }
105 110 }
106 111
107 protected IPromise InvokeAsync(Commands cmd, Func<IPromise> action) {
108 Move(cmd);
109 var medium = new Promise();
112 IPromise InvokeAsync(Commands cmd, Func<IPromise> action, Action<IPromise, IDeferred> chain) {
113 IPromise promise = null;
114 IPromise prev;
110 115
111 IPromise promise = null;
116 var task = new ActionChainTask(action, null, null, true);
117
118 lock (m_stateMachine) {
119 Move(cmd);
120
121 prev = m_pending;
112 122
113 promise = medium.Then(
114 () => {
115 lock(m_stateMachine) {
116 if (m_pending == promise) {
117 m_pending = null;
118 Move(Commands.Ok);
123 promise = task.Then(
124 () => {
125 lock(m_stateMachine) {
126 if (m_pending == promise) {
127 Move(Commands.Ok);
128 m_pending = null;
129 }
119 130 }
131 }, e => {
132 lock(m_stateMachine) {
133 if (m_pending == promise) {
134 Move(Commands.Fail);
135 m_pending = null;
136 m_lastError = e;
137 }
138 }
139 throw new PromiseTransientException(e);
140 },
141 r => {
142 lock(m_stateMachine) {
143 if (m_pending == promise) {
144 Move(Commands.Fail);
145 m_pending = null;
146 m_lastError = new OperationCanceledException("The operation has been cancelled", r);
147 }
148
149 }
150 throw new OperationCanceledException("The operation has been cancelled", r);
120 151 }
121 }, e => {
122 if (m_pending == promise) {
123 m_pending = null;
124 Fail(
125 }
126 }
127 );
152 );
128 153
154 m_pending = promise;
155 }
129 156
157 if (prev == null)
158 task.Resolve();
159 else
160 chain(prev, task);
130 161
131 return Safe.InvokePromise(action).Then(
132 Success,
133 Fail
134 );
135 }
136
137 void AddPending(IPromise result) {
138
162 return promise;
139 163 }
140 164
141 165
142 166 #region IInitializable implementation
143 167
144 168 public void Init() {
145 169 Invoke(Commands.Init, OnInitialize);
146 170 }
147 171
148 172 protected virtual void OnInitialize() {
149 173 }
150 174
151 175 #endregion
152 176
153 177 #region IRunnable implementation
154 178
155 179 public IPromise Start() {
156 Move(Commands.Start);
157
158 return Safe.InvokePromise(OnStart).Then(
159 () => {
160 Move(Commands.Ok);
161 Run();
162 },
163 () => {
164 Move(Commands.Fail);
165 }
166 );
180 return InvokeAsync(Commands.Start, OnStart, null);
167 181 }
168 182
169 183 protected virtual IPromise OnStart() {
170 184 return Promise.SUCCESS;
171 185 }
172 186
173 protected virtual void Run() {
187 public IPromise Stop() {
188 return InvokeAsync(Commands.Stop, OnStop, StopPending).Then(Dispose);
189 }
190
191 protected virtual IPromise OnStop() {
192 return Promise.SUCCESS;
174 193 }
175 194
176 public IPromise Stop() {
177 throw new NotImplementedException();
195 /// <summary>
196 /// Stops the current operation if one exists.
197 /// </summary>
198 /// <param name="current">Current.</param>
199 /// <param name="stop">Stop.</param>
200 protected virtual void StopPending(IPromise current, IDeferred stop) {
201 if (current == null) {
202 stop.Resolve();
203 } else {
204 current.On(stop.Resolve, stop.Reject, stop.CancelOperation);
205 current.Cancel();
206 }
178 207 }
179 208
180 209 public ExecutionState State {
181 210 get {
182 throw new NotImplementedException();
211 return m_stateMachine.State;
183 212 }
184 213 }
185 214
186 215 public Exception LastError {
187 216 get {
188 throw new NotImplementedException();
217 return m_lastError;
189 218 }
190 219 }
191 220
192 221 #endregion
222
223 #region IDisposable implementation
224
225 public void Dispose() {
226 IPromise pending;
227 lock (m_stateMachine) {
228 if (m_stateMachine.State == ExecutionState.Disposed)
229 return;
230
231 Move(Commands.Dispose);
232
233 GC.SuppressFinalize(this);
234
235 pending = m_pending;
236 m_pending = null;
237 }
238 if (pending != null) {
239 pending.Cancel();
240 pending.Timeout(DisposeTimeout).On(
241 () => Dispose(true, null),
242 err => Dispose(true, err),
243 reason => Dispose(true, new OperationCanceledException("The operation is cancelled", reason))
244 );
245 } else {
246 Dispose(true, m_lastError);
247 }
248 }
249
250 ~RunnableComponent() {
251 Dispose(false, null);
252 }
253
254 #endregion
255
256 protected virtual void Dispose(bool disposing, Exception lastError) {
257
258 }
259
193 260 }
194 261 }
195 262
@@ -1,299 +1,289
1 1 using System.Threading;
2 2 using System;
3 3 using Implab.Diagnostics;
4 4 using System.Collections.Generic;
5 5
6
7 #if NET_4_5
8 using System.Threading.Tasks;
9 #endif
10
11 6 namespace Implab {
12 7 public static class PromiseExtensions {
13 8 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
14 9 Safe.ArgumentNotNull(that, "that");
15 10 var context = SynchronizationContext.Current;
16 11 if (context == null)
17 12 return that;
18 13
19 14 var p = new SyncContextPromise<T>(context);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
15 p.CancellationRequested(that.Cancel);
21 16
22 17 that.On(
23 18 p.Resolve,
24 19 p.Reject,
25 p.Cancel
20 p.CancelOperation
26 21 );
27 22 return p;
28 23 }
29 24
30 25 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
31 26 Safe.ArgumentNotNull(that, "that");
32 27 Safe.ArgumentNotNull(context, "context");
33 28
34 29 var p = new SyncContextPromise<T>(context);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
36
30 p.CancellationRequested(that.Cancel);
37 31
38 32 that.On(
39 33 p.Resolve,
40 34 p.Reject,
41 p.Cancel
35 p.CancelOperation
42 36 );
43 37 return p;
44 38 }
45 39
46 40 /// <summary>
47 41 /// Ensures the dispatched.
48 42 /// </summary>
49 43 /// <returns>The dispatched.</returns>
50 44 /// <param name="that">That.</param>
51 45 /// <param name="head">Head.</param>
52 46 /// <param name="cleanup">Cleanup.</param>
53 47 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
54 48 /// <typeparam name="T">The 2nd type parameter.</typeparam>
55 49 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
56 50 Safe.ArgumentNotNull(that, "that");
57 51 Safe.ArgumentNotNull(head, "head");
58 52
59 53 that.On(() => head.On(cleanup), PromiseEventType.Cancelled);
60 54
61 55 return that;
62 56 }
63 57
64 58 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
65 59 Safe.ArgumentNotNull(that, "that");
66 60 Safe.ArgumentNotNull(callback, "callback");
67 61 var op = TraceContext.Instance.CurrentOperation;
68 62 return ar => {
69 63 TraceContext.Instance.EnterLogicalOperation(op,false);
70 64 try {
71 65 that.Resolve(callback(ar));
72 66 } catch (Exception err) {
73 67 that.Reject(err);
74 68 } finally {
75 69 TraceContext.Instance.Leave();
76 70 }
77 71 };
78 72 }
79 73
80 static void CancelCallback(object cookie) {
81 ((ICancellable)cookie).Cancel();
74 static void CancelByTimeoutCallback(object cookie) {
75 ((ICancellable)cookie).Cancel(new TimeoutException());
82 76 }
83 77
84 78 /// <summary>
85 79 /// Cancells promise after the specified timeout is elapsed.
86 80 /// </summary>
87 81 /// <param name="that">The promise to cancel on timeout.</param>
88 82 /// <param name="milliseconds">The timeout in milliseconds.</param>
89 83 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
90 84 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
91 85 Safe.ArgumentNotNull(that, "that");
92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
86 var timer = new Timer(CancelByTimeoutCallback, that, milliseconds, -1);
93 87 that.On(timer.Dispose, PromiseEventType.All);
94 88 return that;
95 89 }
96 90
97 91 public static IPromise Bundle(this ICollection<IPromise> that) {
98 92 Safe.ArgumentNotNull(that, "that");
99 93
100 94 int count = that.Count;
101 95 int errors = 0;
102 96 var medium = new Promise();
103 97
104 98 if (count == 0) {
105 99 medium.Resolve();
106 100 return medium;
107 101 }
108 102
109 103 medium.On(() => {
110 104 foreach(var p2 in that)
111 105 p2.Cancel();
112 106 }, PromiseEventType.ErrorOrCancel);
113 107
114 108 foreach (var p in that)
115 109 p.On(
116 110 () => {
117 111 if (Interlocked.Decrement(ref count) == 0)
118 112 medium.Resolve();
119 113 },
120 114 error => {
121 115 if (Interlocked.Increment(ref errors) == 1)
122 116 medium.Reject(
123 117 new Exception("The dependency promise is failed", error)
124 118 );
125 119 },
126 120 reason => {
127 121 if (Interlocked.Increment(ref errors) == 1)
128 122 medium.Cancel(
129 123 new Exception("The dependency promise is cancelled")
130 124 );
131 125 }
132 126 );
133 127
134 128 return medium;
135 129 }
136 130
137 131 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
138 132 Safe.ArgumentNotNull(that, "that");
139 133
140 134 int count = that.Count;
141 135 int errors = 0;
142 136 var medium = new Promise<T[]>();
143 137 var results = new T[that.Count];
144 138
145 139 medium.On(() => {
146 140 foreach(var p2 in that)
147 141 p2.Cancel();
148 142 }, PromiseEventType.ErrorOrCancel);
149 143
150 144 int i = 0;
151 145 foreach (var p in that) {
152 146 var idx = i;
153 147 p.On(
154 148 x => {
155 149 results[idx] = x;
156 150 if (Interlocked.Decrement(ref count) == 0)
157 151 medium.Resolve(results);
158 152 },
159 153 error => {
160 154 if (Interlocked.Increment(ref errors) == 1)
161 155 medium.Reject(
162 156 new Exception("The dependency promise is failed", error)
163 157 );
164 158 },
165 159 reason => {
166 160 if (Interlocked.Increment(ref errors) == 1)
167 161 medium.Cancel(
168 162 new Exception("The dependency promise is cancelled", reason)
169 163 );
170 164 }
171 165 );
172 166 i++;
173 167 }
174 168
175 169 return medium;
176 170 }
177 171
178 172 public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
179 173 Safe.ArgumentNotNull(that, "that");
180 174
181 175 var d = new ActionTask(success, error, cancel, false);
182 176 that.On(d.Resolve, d.Reject, d.CancelOperation);
183 if (success != null)
184 d.CancellationRequested(that.Cancel);
177 d.CancellationRequested(that.Cancel);
185 178 return d;
186 179 }
187 180
188 181 public static IPromise Then(this IPromise that, Action success, Action<Exception> error) {
189 182 return Then(that, success, error, null);
190 183 }
191 184
192 185 public static IPromise Then(this IPromise that, Action success) {
193 186 return Then(that, success, null, null);
194 187 }
195 188
196 189 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
197 190 Safe.ArgumentNotNull(that, "that");
198 191
199 192 var d = new FuncTask<T>(success, error, cancel, false);
200 193 that.On(d.Resolve, d.Reject, d.CancelOperation);
201 if (success != null)
202 d.CancellationRequested(that.Cancel);
194 d.CancellationRequested(that.Cancel);
203 195 return d;
204 196 }
205 197
206 198 public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error) {
207 199 return Then(that, success, error, null);
208 200 }
209 201
210 202 public static IPromise<T> Then<T>(this IPromise that, Func<T> success) {
211 203 return Then(that, success, null, null);
212 204 }
213 205
214 206 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
215 207 Safe.ArgumentNotNull(that, "that");
216 208 var d = new FuncTask<T,T2>(success, error, cancel, false);
217 209 that.On(d.Resolve, d.Reject, d.CancelOperation);
218 if (success != null)
219 d.CancellationRequested(that.Cancel);
210 d.CancellationRequested(that.Cancel);
220 211 return d;
221 212 }
222 213
223 214 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error) {
224 215 return Then(that, success, error, null);
225 216 }
226 217
227 218 public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success) {
228 219 return Then(that, success, null, null);
229 220 }
230 221
231 222 #region chain traits
232 223 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error, Func<Exception,IPromise> cancel) {
233 224 Safe.ArgumentNotNull(that, "that");
234 225
235 226 var d = new ActionChainTask(success, error, cancel, false);
236 227 that.On(d.Resolve, d.Reject, d.CancelOperation);
237 if (success != null)
238 d.CancellationRequested(that.Cancel);
228 d.CancellationRequested(that.Cancel);
239 229 return d;
240 230 }
241 231
242 232 public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error) {
243 233 return Chain(that, success, error, null);
244 234 }
245 235
246 236 public static IPromise Chain(this IPromise that, Func<IPromise> success) {
247 237 return Chain(that, success, null, null);
248 238 }
249 239
250 240 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
251 241 Safe.ArgumentNotNull(that, "that");
252 242
253 243 var d = new FuncChainTask<T>(success, error, cancel, false);
254 244 that.On(d.Resolve, d.Reject, d.CancelOperation);
255 245 if (success != null)
256 246 d.CancellationRequested(that.Cancel);
257 247 return d;
258 248 }
259 249
260 250 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error) {
261 251 return Chain(that, success, error, null);
262 252 }
263 253
264 254 public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success) {
265 255 return Chain(that, success, null, null);
266 256 }
267 257
268 258 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
269 259 Safe.ArgumentNotNull(that, "that");
270 260 var d = new FuncChainTask<T,T2>(success, error, cancel, false);
271 261 that.On(d.Resolve, d.Reject, d.CancelOperation);
272 262 if (success != null)
273 263 d.CancellationRequested(that.Cancel);
274 264 return d;
275 265 }
276 266
277 267 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error) {
278 268 return Chain(that, success, error, null);
279 269 }
280 270
281 271 public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success) {
282 272 return Chain(that, success, null, null);
283 273 }
284 274
285 275 #endregion
286 276
287 277
288 278 #if NET_4_5
289 279
290 280 public static PromiseAwaiter<T> GetAwaiter<T>(this IPromise<T> that) {
291 281 Safe.ArgumentNotNull(that, "that");
292 282
293 283 return new PromiseAwaiter<T>(that);
294 284 }
295 285
296 286 #endif
297 287 }
298 288 }
299 289
General Comments 0
You need to be logged in to leave comments. Login now