##// END OF EJS Templates
Prerelease version of RunnableComponent...
cin -
r251:7c7e9ad6fe4a v3
parent child
Show More
@@ -0,0 +1,14
1 namespace Implab.Messaging {
2 public interface ISession {
3 /// <summary>
4 /// Starts message consumers, call this method after all adapters are ready
5 /// </summary>
6 void Start();
7
8 /// <summary>
9 /// Stops message consumers
10 /// </summary>
11 void Stop();
12
13 }
14 } No newline at end of file
@@ -0,0 +1,70
1 using System;
2 using System.Threading.Tasks;
3
4 namespace Implab {
5 public static class TaskHelpers {
6
7 public static async Task Then(this Task that, Action fulfilled, Action<Exception> rejected) {
8 Safe.ArgumentNotNull(that, nameof(that));
9 if (rejected != null) {
10 try {
11 await that;
12 } catch (Exception e) {
13 rejected(e);
14 return;
15 }
16 } else {
17 await that;
18 }
19
20 if (fulfilled != null)
21 fulfilled();
22 }
23
24 public static async Task Then(this Task that, Action fulfilled) {
25 Safe.ArgumentNotNull(that, nameof(that));
26 await that;
27 if (fulfilled != null)
28 fulfilled();
29 }
30
31 public static async Task Then(this Task that, Func<Task> fulfilled) {
32 Safe.ArgumentNotNull(that, nameof(that));
33 await that;
34 if (fulfilled != null)
35 await fulfilled();
36 }
37
38 public static async Task Finally(this Task that, Action handler) {
39 Safe.ArgumentNotNull(that, nameof(that));
40 try {
41 await that;
42 } finally {
43 if (handler != null)
44 handler();
45 }
46 }
47
48 public static async void Then(this Task that, IResolvable next) {
49 try {
50 await that;
51 } catch (Exception e) {
52 next.Reject(e);
53 return;
54 }
55 next.Resolve();
56 }
57
58 public static async void Then<T>(this Task<T> that, IResolvable<T> next) {
59 T result;
60 try {
61 result = await that;
62 } catch (Exception e) {
63 next.Reject(e);
64 return;
65 }
66 next.Resolve(result);
67 }
68
69 }
70 } No newline at end of file
@@ -1,4 +1,5
1 1 using System;
2 using System.Diagnostics;
2 3 using System.Threading;
3 4 using Xunit;
4 5
@@ -9,9 +10,20 namespace Implab.Test
9 10 [Fact]
10 11 public void Test1()
11 12 {
12 using(var cts = new CancellationTokenSource(1000)) {
13 PromiseHelper.Sleep(10000, cts.Token).Join();
13 var listener = new TextWriterTraceListener(Console.Out);
14 var source = new TraceSource("Custom",SourceLevels.ActivityTracing);
15
16 source.Listeners.Add(listener);
17
18 Trace.Listeners.Add(listener);
19 Trace.WriteLine("Hello!");
20 Trace.CorrelationManager.StartLogicalOperation();
21 Trace.WriteLine("Inner");
22 foreach(var x in Trace.CorrelationManager.LogicalOperationStack)
23 Trace.WriteLine($"-{x}");
24 source.TraceEvent(TraceEventType.Information, 1, "source event");
25 source.TraceData(TraceEventType.Start, 1, DateTime.Now);
26 Trace.CorrelationManager.StopLogicalOperation();
14 27 }
15 28 }
16 29 }
17 }
@@ -20,7 +20,7 namespace Implab.Automaton {
20 20 #region IDFADefinition implementation
21 21
22 22 public bool IsFinalState(int s) {
23 Safe.ArgumentInRange(s, 0, m_stateCount, "s");
23 Safe.ArgumentInRange(s >= 0 && s < m_stateCount, nameof(s));
24 24
25 25 return m_finalStates.Contains(s);
26 26 }
@@ -46,7 +46,7 namespace Implab.Automaton {
46 46 #endregion
47 47
48 48 public void SetInitialState(int s) {
49 Safe.ArgumentAssert(s >= 0, "s");
49 Safe.ArgumentInRange(s >= 0, nameof(s));
50 50 m_stateCount = Math.Max(m_stateCount, s + 1);
51 51 m_initialState = s;
52 52 }
@@ -57,9 +57,9 namespace Implab.Automaton {
57 57 }
58 58
59 59 public void Add(AutomatonTransition item) {
60 Safe.ArgumentAssert(item.s1 >= 0, "item");
61 Safe.ArgumentAssert(item.s2 >= 0, "item");
62 Safe.ArgumentAssert(item.edge >= 0, "item");
60 Safe.ArgumentAssert(item.s1 >= 0, nameof(item));
61 Safe.ArgumentAssert(item.s2 >= 0, nameof(item));
62 Safe.ArgumentAssert(item.edge >= 0, nameof(item));
63 63
64 64 m_stateCount = Math.Max(m_stateCount, Math.Max(item.s1, item.s2) + 1);
65 65 m_symbolCount = Math.Max(m_symbolCount, item.edge + 1);
@@ -15,6 +15,8
15 15
16 16 Stopping,
17 17
18 Stopped,
19
18 20 Failed,
19 21
20 22 Disposed,
@@ -11,9 +11,16 namespace Implab.Components {
11 11 /// Completes initialization.
12 12 /// </summary>
13 13 /// <remarks>
14 /// <para>
14 15 /// Normally virtual methods shouldn't be called from the constructor, due to the incomplete object state, but
15 16 /// they can be called from this method. This method is also usefull when we constructing a complex grpah
16 17 /// of components where cyclic references may take place.
18 /// </para>
19 /// <para>
20 /// In asyncronous patterns <see cref="Initialize()"/> can be called
21 /// to start initialization and the <see cref="IRunnable.Completion"/>
22 /// property can be used to track operation completion.
23 /// </para>
17 24 /// </remarks>
18 25 void Initialize();
19 26 }
@@ -6,23 +6,53 namespace Implab.Components {
6 6 /// <summary>
7 7 /// Interface for the component which performs a long running task.
8 8 /// </summary>
9 public interface IRunnable : IDisposable {
9 /// <remarks>
10 /// The access to the runnable component should be sequential, the
11 /// componet should support asynchronous completion of the initiated
12 /// operation but operations itself must be initiated sequentially.
13 /// </remarks>
14 public interface IRunnable {
10 15 /// <summary>
11 16 /// Starts this instance
12 17 /// </summary>
18 /// <remarks>
19 /// This operation is cancellable and it's expected to move to
20 /// the failed state or just ignore the cancellation request,
21 /// </remarks>
13 22 void Start(CancellationToken ct);
14 23
15 24 /// <summary>
16 /// Stops this instance and releases all resources, after the instance is stopped it is moved to Disposed state and can't be reused.
25 /// Stops this instance and releases all resources, after the
26 /// instance is stopped it is moved to Disposed state and
27 /// can't be reused.
17 28 /// </summary>
29 /// <remarks>
30 /// If the componet was in the starting state the pending operation
31 /// will be requested to cancel. The stop operatin will be
32 /// performed only if the component in the running state.
33 /// </remarks>
18 34 void Stop(CancellationToken ct);
19 35
20 Task<ExecutionState> Completion { get; }
36 /// <summary>
37 /// The result of the last started operation. This property reflects
38 /// only the result of the last started operation and therefore should
39 /// change only if a new operation is initiated.
40 /// </summary>
41 Task Completion { get; }
21 42
43 /// <summary>
44 /// Current state of the componenet
45 /// </summary>
22 46 ExecutionState State { get; }
23 47
48 /// <summary>
49 /// Event to monitor the state of the component.
50 /// </summary>
24 51 event EventHandler<StateChangeEventArgs> StateChanged;
25 52
53 /// <summary>
54 /// The last error
55 /// </summary>
26 56 Exception LastError { get; }
27 57 }
28 58 }
@@ -26,7 +26,7 namespace Implab.Components {
26 26 }
27 27
28 28 protected ObjectPool(int size) {
29 Safe.ArgumentInRange(size,1,size,"size");
29 Safe.ArgumentInRange(size > 0, nameof(size));
30 30
31 31 m_size = size;
32 32 }
@@ -1,57 +1,273
1 1 using System;
2 using System.Diagnostics;
2 3 using System.Threading;
3 4 using System.Threading.Tasks;
4 5
5 namespace Implab.Components
6 {
7 public class RunnableComponent : IRunnable {
6 namespace Implab.Components {
7 /// <summary>
8 /// Base class for implementing components which support start and stop operations,
9 /// such components may represent running services.
10 /// </summary>
11 /// <remarks>
12 /// This class provides a basic lifecycle from the creation to the
13 /// termination of the component.
14 /// </remarks>
15 public class RunnableComponent : IRunnable, IInitializable, IDisposable {
16
17 /// <summary>
18 /// This class bound <see cref="CancellationTokenSource"/> lifetime to the task,
19 /// when the task completes the associated token source will be disposed.
20 /// </summary>
21 class AsyncOperationDescriptor {
22
23 public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();
24
25 readonly CancellationTokenSource m_cts;
26
27 bool m_done;
28
29 public CancellationToken Token {
30 get { return m_cts == null ? CancellationToken.None : m_cts.Token; }
31 }
32
33 public Task Task { get; private set; }
34
35 private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
36 m_cts = cts;
37 Task = Chain(task);
38 }
39
40 private AsyncOperationDescriptor() {
41 Task = Task.CompletedTask;
42 }
8 43
44 public void Cancel() {
45 if (m_cts != null) {
46 lock (m_cts) {
47 if (!m_done)
48 m_cts.Cancel();
49 }
50 }
51 }
52
53 void Done() {
54 if (m_cts != null) {
55 lock (m_cts) {
56 m_done = true;
57 m_cts.Dispose();
58 }
59 } else {
60 m_done = true;
61 }
62 }
63
64 async Task Chain(Task other) {
65 try {
66 await other;
67 } finally {
68 Done();
69 }
70 }
71
72 public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
73 var cts = ct.CanBeCanceled ?
74 CancellationTokenSource.CreateLinkedTokenSource(ct) :
75 new CancellationTokenSource();
76
77 return new AsyncOperationDescriptor(factory(cts.Token), cts);
78 }
79
80 }
81
82 // this lock is used to synchronize state flow of the component during
83 // completions or the operations.
9 84 readonly object m_lock = new object();
10 85
11 CancellationTokenSource m_cts;
86 // current operation cookie, used to check wheather a call to
87 // MoveSuccess/MoveFailed method belongs to the current
88 // operation, if cookies didn't match ignore completion result.
89 object m_cookie;
12 90
13 public Task<ExecutionState> Completion {
14 get;
15 private set;
91 AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
92
93 ExecutionState m_state;
94
95
96 protected RunnableComponent(bool initialized) {
97 State = initialized ? ExecutionState.Ready : ExecutionState.Created;
16 98 }
17 99
18 public ExecutionState State => throw new NotImplementedException();
100 public Task Completion {
101 get { return m_current.Task; }
102 }
19 103
20 public Exception LastError => throw new NotImplementedException();
104 public ExecutionState State {
105 get { return m_state; }
106 private set {
107 if (m_state != value) {
108 m_state = value;
109 StateChanged.DispatchEvent(this, new StateChangeEventArgs {
110 State = value,
111 LastError = LastError
112 });
113 }
114 }
115 }
116
117 public Exception LastError { get; private set; }
21 118
22 119 public event EventHandler<StateChangeEventArgs> StateChanged;
23 120
121 /// <summary>
122 /// Releases all resources used by the current component regardless of its
123 /// execution state.
124 /// </summary>
125 /// <remarks>
126 /// Calling to this method may result unexpedted results if the component
127 /// isn't in the stopped state. Call this method after the component is
128 /// stopped if needed or if the component is in the failed state.
129 /// </remarks>
24 130 public void Dispose() {
25 lock(m_lock) {
131 bool dispose = false;
132 if (dispose) {
26 133 Dispose(true);
27 134 GC.SuppressFinalize(this);
28 135 }
29 136 }
30 137
138 ~RunnableComponent() {
139 Dispose(false);
140 }
141
142 /// <summary>
143 /// Releases all resources used by the current component regardless of its
144 /// execution state.
145 /// </summary>
146 /// <param name="disposing">Indicates that the component is disposed
147 /// during a normal disposing or during GC.</param>
31 148 protected virtual void Dispose(bool disposing) {
32 if (disposing) {
33 Safe.Dispose(m_cts);
149 }
150
151 public void Initialize() {
152 var cookie = new object();
153 if (MoveInitialize(cookie))
154 ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie);
34 155 }
156
157 /// <summary>
158 /// This method is used for initialization during a component creation.
159 /// </summary>
160 /// <param name="ct">A cancellation token for this operation</param>
161 /// <remarks>
162 /// This method should be used for short and mostly syncronous operations,
163 /// other operations which require time to run shoud be placed in
164 /// <see cref="StartInternal(CancellationToken)"/> method.
165 /// </remarks>
166 protected virtual Task InitializeInternalAsync(CancellationToken ct) {
167 return Task.CompletedTask;
35 168 }
36 169
37 170 public void Start(CancellationToken ct) {
171 var cookie = new object();
172 if (MoveStart(cookie))
173 ScheduleTask(StartInternal, ct, cookie);
174 }
175
176 protected virtual Task StartInternal(CancellationToken ct) {
177 return Task.CompletedTask;
178 }
179
180 public void Stop(CancellationToken ct) {
181 var cookie = new object();
182 if (MoveStop(cookie))
183 ScheduleTask(StopAsync, ct, cookie);
184 }
185
186 async Task StopAsync(CancellationToken ct) {
187 m_current.Cancel();
188 await Completion;
189
190 ct.ThrowIfCancellationRequested();
191
192 await StopInternalAsync(ct);
193 }
194
195 protected virtual Task StopInternalAsync(CancellationToken ct) {
196 return Task.CompletedTask;
197 }
198
199
200 #region state management
201
202 bool MoveInitialize(object cookie) {
38 203 lock(m_lock) {
39 switch (State)
40 {
204 if (State != ExecutionState.Created)
205 return false;
206 State = ExecutionState.Initializing;
207 m_cookie = cookie;
208 return true;
209 }
210 }
211
212 bool MoveStart(object cookie) {
213 lock (m_lock) {
214 if (State != ExecutionState.Ready)
215 return false;
216 State = ExecutionState.Starting;
217 m_cookie = cookie;
218 return true;
219 }
220 }
41 221
42 default:
43 throw new InvalidOperationException();
222 bool MoveStop(object cookie) {
223 lock (m_lock) {
224 if (State != ExecutionState.Starting && State != ExecutionState.Running)
225 return false;
226 State = ExecutionState.Stopping;
227 m_cookie = cookie;
228 return true;
229 }
230 }
231
232 void MoveSuccess(object cookie) {
233 lock (m_lock) {
234 if (m_cookie != cookie)
235 return;
236 switch (State) {
237 case ExecutionState.Initializing:
238 State = ExecutionState.Ready;
239 break;
240 case ExecutionState.Starting:
241 State = ExecutionState.Running;
242 break;
243 case ExecutionState.Stopping:
244 State = ExecutionState.Stopped;
245 break;
44 246 }
45 247 }
46 248 }
47 249
48 public void Stop(CancellationToken ct) {
49 throw new NotImplementedException();
250 void MoveFailed(Exception err, object cookie) {
251 lock (m_lock) {
252 if (m_cookie != cookie)
253 return;
254 LastError = err;
255 State = ExecutionState.Failed;
256 }
50 257 }
51 258
52 protected virtual Task StartImpl(CancellationToken ct) {
259
53 260
54 return Task.CompletedTask;
261 protected async void ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
262 try {
263 m_current = AsyncOperationDescriptor.Create(next, ct);
264 await m_current.Task;
265 MoveSuccess(cookie);
266 } catch (Exception e) {
267 MoveFailed(e, cookie);
55 268 }
56 269 }
270
271 #endregion
272 }
57 273 } No newline at end of file
@@ -1,5 +1,6
1 1 using System;
2 2 using System.Diagnostics;
3 using System.Threading.Tasks;
3 4
4 5 namespace Implab {
5 6 /// <summary>
@@ -49,5 +50,15 namespace Implab {
49 50 }
50 51 }
51 52
53 public virtual void Resolve(Task thenable) {
54 if (thenable == null)
55 Reject(new Exception("The promise or task are expected"));
56 try {
57 thenable.Then(this);
58 } catch(Exception err) {
59 Reject(err);
60 }
61 }
62
52 63 }
53 64 } No newline at end of file
@@ -1,5 +1,6
1 1 using System;
2 2 using System.Diagnostics;
3 using System.Threading.Tasks;
3 4
4 5 namespace Implab {
5 6 public class Deferred<T> : IResolvable<T> {
@@ -45,5 +46,16 namespace Implab {
45 46 Reject(err);
46 47 }
47 48 }
49
50 public virtual void Resolve(Task<T> thenable) {
51 if (thenable == null)
52 Reject(new Exception("The promise or task are expected"));
53
54 try {
55 thenable.Then(this);
56 } catch (Exception err) {
57 Reject(err);
58 }
59 }
48 60 }
49 61 } No newline at end of file
@@ -43,11 +43,8 namespace Implab.Formats.Json {
43 43
44 44 public static JsonStringScanner Create(string data, int offset, int length) {
45 45 Safe.ArgumentNotNull(data, nameof(data));
46 Safe.ArgumentGreaterThan(offset, 0, nameof(offset));
47 Safe.ArgumentGreaterThan(length, 0, nameof(length));
48
49 if (offset + length > data.Length)
50 throw new ArgumentOutOfRangeException("Specified offset and length are out of the string bounds");
46 Safe.ArgumentInRange(offset >= 0 && offset < data.Length , nameof(offset));
47 Safe.ArgumentInRange(length >= 0 && offset + length <= data.Length, nameof(length));
51 48
52 49 if (length <= _defaultBuffer) {
53 50 var buffer = new char[length];
@@ -63,11 +60,8 namespace Implab.Formats.Json {
63 60
64 61 public static JsonStringScanner Create(char[] data, int offset, int length) {
65 62 Safe.ArgumentNotNull(data, nameof(data));
66 Safe.ArgumentGreaterThan(offset, 0, nameof(offset));
67 Safe.ArgumentGreaterThan(length, 0, nameof(length));
68
69 if (offset + length > data.Length)
70 throw new ArgumentOutOfRangeException("Specified offset and length are out of the array bounds");
63 Safe.ArgumentInRange(offset >= 0 && offset < data.Length , nameof(offset));
64 Safe.ArgumentInRange(length >= 0 && offset + length <= data.Length, nameof(length));
71 65
72 66 return new JsonStringScanner(null, data, offset, offset + length, offset + length);
73 67
@@ -3,7 +3,9 using System.Threading.Tasks;
3 3
4 4 namespace Implab.Messaging {
5 5 public interface IConsumer<T> {
6 Task<T> Receive(CancellationToken ct);
6 T Receive(CancellationToken ct);
7
8 Task<T> ReceiveAsync(CancellationToken ct);
7 9
8 10 bool TryReceive(out T message);
9 11 }
@@ -4,8 +4,12 using System.Threading.Tasks;
4 4
5 5 namespace Implab.Messaging {
6 6 public interface IProducer<T> {
7 void PostMessage(T message, CancellationToken ct);
8
7 9 Task PostMessageAsync(T message, CancellationToken ct);
8 10
11 void PostMessages(IEnumerable<T> messages, CancellationToken ct);
12
9 13 Task PostMessagesAsync(IEnumerable<T> messages, CancellationToken ct);
10 14 }
11 15 } No newline at end of file
@@ -54,7 +54,7 namespace Implab.Parallels {
54 54 }
55 55
56 56 public T[] GetRange(int max, int timeout) {
57 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
57 Safe.ArgumentInRange(max > 0 , nameof(max));
58 58
59 59 var buffer = new T[max];
60 60 int actual;
@@ -83,7 +83,7 namespace Implab.Parallels {
83 83 }
84 84
85 85 public T[] GetRange(int max) {
86 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
86 Safe.ArgumentInRange(max > 0, nameof(max));
87 87
88 88 var buffer = new T[max];
89 89 int actual;
@@ -6,6 +6,7 using System.Text.RegularExpressions;
6 6 using System.Diagnostics;
7 7 using System.Collections;
8 8 using System.Runtime.CompilerServices;
9 using System.Threading.Tasks;
9 10
10 11 #if NET_4_5
11 12 using System.Threading.Tasks;
@@ -48,14 +49,14 namespace Implab
48 49 }
49 50
50 51 [MethodImpl(MethodImplOptions.AggressiveInlining)]
51 internal static void ArgumentGreaterThan(int value, int min, string paramName) {
52 internal static void ArgumentGreaterEqThan(int value, int min, string paramName) {
52 53 if (value < min)
53 54 throw new ArgumentOutOfRangeException(paramName);
54 55 }
55 56
56 57 [MethodImpl(MethodImplOptions.AggressiveInlining)]
57 public static void ArgumentInRange(int value, int min, int max, string paramName) {
58 if (value < min || value > max)
58 public static void ArgumentInRange(bool condition, string paramName) {
59 if (!condition)
59 60 throw new ArgumentOutOfRangeException(paramName);
60 61 }
61 62
@@ -144,6 +145,12 namespace Implab
144 145 public static void NoWait(IPromise promise) {
145 146 }
146 147
148 public static void NoWait(Task promise) {
149 }
150
151 public static void NoWait<T>(Task<T> promise) {
152 }
153
147 154 [DebuggerStepThrough]
148 155 public static IPromise<T> Run<T>(Func<IPromise<T>> action) {
149 156 ArgumentNotNull(action, "action");
@@ -163,7 +163,7 namespace Implab.Xml {
163 163 }
164 164
165 165 public override string GetAttribute(int i) {
166 Safe.ArgumentInRange(i, 0, AttributeCount - 1, nameof(i));
166 Safe.ArgumentInRange(i >= 0 && i < AttributeCount, nameof(i));
167 167 return m_attributes[i].Value;
168 168 }
169 169
@@ -38,6 +38,20 namespace Implab.Xml {
38 38 SerializersPool<T>.Instance.Serialize(writer, obj);
39 39 }
40 40
41 public static void SerializeToElementChild<T>(XmlElement element, T obj) {
42 using(var writer = element.CreateNavigator().AppendChild())
43 SerializersPool<T>.Instance.Serialize(writer, obj);
44 }
45
46 public static T Deserialize<T>(XmlReader reader) {
47 return SerializersPool<T>.Instance.Deserialize(reader);
48 }
49
50 public static T DeserializeFromFile<T>(string file) {
51 using(var reader = XmlReader.Create(File.OpenText(file)))
52 return Deserialize<T>(reader);
53 }
54
41 55 public static T DeserializeFromString<T>(string data) {
42 56 return SerializersPool<T>.Instance.DeserializeFromString(data);
43 57 }
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

ok, latest stable version should be in default

You need to be logged in to leave comments. Login now