##// END OF EJS Templates
Removed obsolete App, ComponentContainer...
cin -
r256:c52691faaf21 v3
parent child
Show More
@@ -0,0 +1,37
1 using System;
2 using System.Threading.Tasks;
3
4 namespace Implab.Components {
5 /// <summary>
6 /// An interface for asynchronous components.
7 /// </summary>
8 /// <remarks>
9 /// <para>
10 /// Асинхронные компоненты не предназначены для одновременного использования несколькими клиентами,
11 /// однако существуют внутренние процессы, изменяющее состояние компонент без участия клиента.
12 /// Данный интерфейс определяет протокол взаимодействия с компонентой, при которм компоненте
13 /// посылаются сигналы от клиента, в ответ на которые компонента меняет свойство <see cref="Completion"/>,
14 /// данное свойство содержит в себе новую задачу, выполняемую компонентой и данное свойство
15 /// может измениться только при получении нового сигнала от клиента.
16 /// </para>
17 /// <para>
18 /// В дополнение к <see cref="Completion"/> компонента может определять другие свойства, в
19 /// которых будет передаваться информация о результате выполнения операции.
20 /// </para>
21 /// <para>
22 /// Особое внимание следует уделить реализации <see cref="IDisposable"/>, который по своей природе
23 /// синхронный, данное правило безусловно можно нарушить, но тогда могут возникнуть проблемы с
24 /// тем, что ресурсы еще не освободились, а ход программы продолжается, что приведет к ошибкам,
25 /// например при попытке получить ресуср другим объектом, либо при заврешении программы.
26 /// </para>
27 /// <seealso href="https://blog.stephencleary.com/2013/01/async-oop-0-introduction.html"/>
28 /// </remarks>
29 public interface IAsyncComponent {
30 /// <summary>
31 /// The result of the last started operation. This property reflects
32 /// only the result of the last started operation and therefore should
33 /// change only if a new operation is initiated.
34 /// </summary>
35 Task Completion { get; }
36 }
37 } No newline at end of file
@@ -1,17 +1,17
1 1 <Project Sdk="Microsoft.NET.Sdk">
2 2
3 3 <PropertyGroup>
4 4 <TargetFramework>net46</TargetFramework>
5 <FrameworkPathOverride Condition="'$(TargetFramework)'=='net45' and '$(OSTYPE)'=='linux'">/usr/lib/mono/4.5/</FrameworkPathOverride>
5 <FrameworkPathOverride Condition="'$(TargetFramework)'=='net46' and '$(OSTYPE)'=='linux'">/usr/lib/mono/4.5/</FrameworkPathOverride>
6 6
7 7 <IsPackable>false</IsPackable>
8 8 </PropertyGroup>
9 9 <ItemGroup>
10 10 <ProjectReference Include="../Implab/Implab.csproj" />
11 11 <PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
12 12 <PackageReference Include="xunit" Version="2.3.1" />
13 13 <PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
14 14 <DotNetCliToolReference Include="dotnet-xunit" Version="2.3.1" />
15 15 </ItemGroup>
16 16
17 17 </Project>
@@ -1,29 +1,40
1 1 using System;
2 2 using System.Diagnostics;
3 3 using System.Threading;
4 4 using Implab.Diagnostics;
5 using System.Linq;
5 6 using Xunit;
6 7
7 8 namespace Implab.Test {
8 9 using System.Threading.Tasks;
9 10 using static Trace<UnitTest1>;
10 11 public class UnitTest1 {
11 12 [Fact]
12 13 public async Task Test1() {
13 14 var listener = new SimpleTraceListener(Console.Out);
15 listener.TraceOutputOptions |= TraceOptions.ThreadId;
14 16
15 17 var source = TraceSource;
16 18 source.Switch.Level = SourceLevels.All;
17 19
18 20 source.Listeners.Add(listener);
19 21
20 using (var op = LogicalOperation(nameof(Test1)))
21 using (LogicalOperation("InnerOperation")){
22 using (LogicalOperation("Test1")){
22 23 await Task.Yield();
23 Log("Inner");
24 await Task.Yield();
25 Log("source event");
24 Log(String.Join(", ", Trace.CorrelationManager.LogicalOperationStack.Cast<object>().Select(x => x.ToString())));
25 await AsyncDummy();
26 Log(String.Join(", ", Trace.CorrelationManager.LogicalOperationStack.Cast<object>().Select(x => x.ToString())));
27 }
28 }
29
30 async Task AsyncDummy() {
31 using(LogicalOperation("OuterDummy"))
32 using(LogicalOperation("InnerDummy")) {
33 Log(String.Join(", ", Trace.CorrelationManager.LogicalOperationStack.Cast<object>().Select(x => x.ToString())));
34 await Task.Delay(1);
35 Log(String.Join(", ", Trace.CorrelationManager.LogicalOperationStack.Cast<object>().Select(x => x.ToString())));
36 }
37 Log(String.Join(", ", Trace.CorrelationManager.LogicalOperationStack.Cast<object>().Select(x => x.ToString())));
26 38 }
27 39 }
28 40 }
29 }
@@ -1,102 +1,102
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Diagnostics;
5 5 using System.Diagnostics.CodeAnalysis;
6 6
7 7 namespace Implab.Components {
8 8 /// <summary>
9 /// The base class for implementing pools of disposable objects.
9 /// The base class for implementing thread-safe pools of disposable objects.
10 10 /// </summary>
11 11 /// <remarks>
12 /// <para>This class maintains a set of pre-created objects and which are frequently allocated and released
12 /// <para>This class maintains a set of pre-created objects which are frequently allocated and released
13 13 /// by clients. The pool maintains maximum number of unsued object, any object above this limit is disposed,
14 14 /// if the pool is empty it will create new objects on demand.</para>
15 15 /// <para>Instances of this class are thread-safe.</para>
16 16 /// </remarks>
17 17 public abstract class DisposablePool<T> : IDisposable {
18 18 readonly int m_size;
19 19 readonly AsyncQueue<T> m_queue = new AsyncQueue<T>();
20 20
21 21 [SuppressMessage("Microsoft.Design", "CA1000:DoNotDeclareStaticMembersOnGenericTypes")]
22 22 static readonly bool _isValueType = typeof(T).IsValueType;
23 23
24 24 bool m_disposed;
25 25
26 26 int m_count;
27 27
28 28 protected DisposablePool(int size) {
29 29 m_size = size;
30 30 }
31 31
32 32 protected DisposablePool() : this(Environment.ProcessorCount+1) {
33 33 }
34 34
35 35 public T Allocate() {
36 36 if (m_disposed)
37 37 throw new ObjectDisposedException(ToString());
38 38
39 39 T instance;
40 40 if (m_queue.TryDequeue(out instance)) {
41 41 Interlocked.Decrement(ref m_count);
42 42 } else {
43 43 instance = CreateInstance();
44 44 Debug.Assert(!Object.Equals(instance, default(T)) || _isValueType);
45 45 }
46 46 return instance;
47 47 }
48 48
49 49 protected abstract T CreateInstance();
50 50
51 51 protected virtual void CleanupInstance(T instance) {
52 52 }
53 53
54 54 public void Release(T instance) {
55 55 if ( Object.Equals(instance,default(T)) && !_isValueType)
56 56 return;
57 57
58 58 Thread.MemoryBarrier();
59 59 if (m_count < m_size && !m_disposed) {
60 60 Interlocked.Increment(ref m_count);
61 61
62 62 CleanupInstance(instance);
63 63
64 64 m_queue.Enqueue(instance);
65 65
66 66 // пока элемент возвращался в кеш, была начата операция освобождения всего кеша
67 67 // и возможно уже законцена, в таком случае следует извлечь элемент обратно и
68 68 // освободить его. Если операция освобождения кеша еще не заврешилась, то будет
69 69 // изъят и освобожден произвольный элемен, что не повлияет на ход всего процесса.
70 70 if (m_disposed && m_queue.TryDequeue(out instance) && instance is IDisposable)
71 71 ((IDisposable)instance).Dispose() ;
72 72
73 73 } else {
74 74 var disposable = instance as IDisposable;
75 75 if (disposable != null)
76 76 disposable.Dispose();
77 77 }
78 78 }
79 79
80 80 protected virtual void Dispose(bool disposing) {
81 81 if (disposing) {
82 82 m_disposed = true;
83 83 T instance;
84 84 while (m_queue.TryDequeue(out instance)) {
85 85 var disposable = instance as IDisposable;
86 86 if (disposable != null)
87 87 disposable.Dispose();
88 88 }
89 89 }
90 90 }
91 91
92 92 #region IDisposable implementation
93 93
94 94 public void Dispose() {
95 95 Dispose(true);
96 96 GC.SuppressFinalize(this);
97 97 }
98 98
99 99 #endregion
100 100 }
101 101 }
102 102
@@ -1,59 +1,52
1 1 using System;
2 2 using System.Threading;
3 3 using System.Threading.Tasks;
4 4
5 5 namespace Implab.Components {
6 6 /// <summary>
7 7 /// Interface for the component which performs a long running task.
8 8 /// </summary>
9 9 /// <remarks>
10 10 /// The access to the runnable component should be sequential, the
11 11 /// componet should support asynchronous completion of the initiated
12 12 /// operation but operations itself must be initiated sequentially.
13 13 /// </remarks>
14 14 public interface IRunnable {
15 15 /// <summary>
16 16 /// Starts this instance
17 17 /// </summary>
18 18 /// <remarks>
19 19 /// This operation is cancellable and it's expected to move to
20 20 /// the failed state or just ignore the cancellation request,
21 21 /// </remarks>
22 22 void Start(CancellationToken ct);
23 23
24 24 /// <summary>
25 25 /// Stops this instance and releases all resources, after the
26 26 /// instance is stopped it is moved to Disposed state and
27 27 /// can't be reused.
28 28 /// </summary>
29 29 /// <remarks>
30 30 /// If the componet was in the starting state the pending operation
31 31 /// will be requested to cancel. The stop operatin will be
32 32 /// performed only if the component in the running state.
33 33 /// </remarks>
34 34 void Stop(CancellationToken ct);
35 35
36 36 /// <summary>
37 /// The result of the last started operation. This property reflects
38 /// only the result of the last started operation and therefore should
39 /// change only if a new operation is initiated.
40 /// </summary>
41 Task Completion { get; }
42
43 /// <summary>
44 /// Current state of the componenet
37 /// Current state of the componenet, dynamically reflects the current state.
45 38 /// </summary>
46 39 ExecutionState State { get; }
47 40
48 41 /// <summary>
49 42 /// Event to monitor the state of the component.
50 43 /// </summary>
51 44 event EventHandler<StateChangeEventArgs> StateChanged;
52 45
53 46 /// <summary>
54 47 /// The last error
55 48 /// </summary>
56 49 Exception LastError { get; }
57 50 }
58 51 }
59 52
@@ -1,273 +1,290
1 1 using System;
2 2 using System.Diagnostics;
3 3 using System.Threading;
4 4 using System.Threading.Tasks;
5 5
6 6 namespace Implab.Components {
7 7 /// <summary>
8 8 /// Base class for implementing components which support start and stop operations,
9 9 /// such components may represent running services.
10 10 /// </summary>
11 11 /// <remarks>
12 12 /// This class provides a basic lifecycle from the creation to the
13 13 /// termination of the component.
14 14 /// </remarks>
15 public class RunnableComponent : IRunnable, IInitializable, IDisposable {
15 public class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {
16 16
17 17 /// <summary>
18 18 /// This class bounds <see cref="CancellationTokenSource"/> lifetime to the task,
19 19 /// when the task completes the associated token source will be disposed.
20 20 /// </summary>
21 21 class AsyncOperationDescriptor {
22 22
23 23 public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();
24 24
25 25 readonly CancellationTokenSource m_cts;
26 26
27 27 bool m_done;
28 28
29 29 public CancellationToken Token {
30 30 get { return m_cts == null ? CancellationToken.None : m_cts.Token; }
31 31 }
32 32
33 33 public Task Task { get; private set; }
34 34
35 35 private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
36 36 m_cts = cts;
37 37 Task = Chain(task);
38 38 }
39 39
40 40 private AsyncOperationDescriptor() {
41 41 Task = Task.CompletedTask;
42 42 }
43 43
44 44 public void Cancel() {
45 45 if (m_cts != null) {
46 46 lock (m_cts) {
47 47 if (!m_done)
48 48 m_cts.Cancel();
49 49 }
50 50 }
51 51 }
52 52
53 53 void Done() {
54 54 if (m_cts != null) {
55 55 lock (m_cts) {
56 56 m_done = true;
57 57 m_cts.Dispose();
58 58 }
59 59 } else {
60 60 m_done = true;
61 61 }
62 62 }
63 63
64 64 async Task Chain(Task other) {
65 65 try {
66 66 await other;
67 67 } finally {
68 68 Done();
69 69 }
70 70 }
71 71
72 72 public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
73 73 var cts = ct.CanBeCanceled ?
74 74 CancellationTokenSource.CreateLinkedTokenSource(ct) :
75 75 new CancellationTokenSource();
76 76
77 77 return new AsyncOperationDescriptor(factory(cts.Token), cts);
78 78 }
79 79
80 80 }
81 81
82 82 // this lock is used to synchronize state flow of the component during
83 // completions or the operations.
83 // processing calls from a client and internal processes.
84 84 readonly object m_lock = new object();
85 85
86 86 // current operation cookie, used to check wheather a call to
87 87 // MoveSuccess/MoveFailed method belongs to the current
88 88 // operation, if cookies didn't match ignore completion result.
89 89 object m_cookie;
90 90
91 // AsyncOperationDscriptor aggregates a task and it's cancellation token
91 92 AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
92 93
93 94 ExecutionState m_state;
94 95
95 96
96 97 protected RunnableComponent(bool initialized) {
97 98 State = initialized ? ExecutionState.Ready : ExecutionState.Created;
98 99 }
99 100
100 101 public Task Completion {
101 102 get { return m_current.Task; }
102 103 }
103 104
104 105 public ExecutionState State {
105 106 get { return m_state; }
106 107 private set {
107 108 if (m_state != value) {
108 109 m_state = value;
109 110 StateChanged.DispatchEvent(this, new StateChangeEventArgs {
110 111 State = value,
111 112 LastError = LastError
112 113 });
113 114 }
114 115 }
115 116 }
116 117
117 118 public Exception LastError { get; private set; }
118 119
119 120 public event EventHandler<StateChangeEventArgs> StateChanged;
120 121
121 122 /// <summary>
122 123 /// Releases all resources used by the current component regardless of its
123 124 /// execution state.
124 125 /// </summary>
125 126 /// <remarks>
126 127 /// Calling to this method may result unexpedted results if the component
127 128 /// isn't in the stopped state. Call this method after the component is
128 129 /// stopped if needed or if the component is in the failed state.
129 130 /// </remarks>
130 131 public void Dispose() {
131 132 bool dispose = false;
132 133 if (dispose) {
133 134 Dispose(true);
134 135 GC.SuppressFinalize(this);
135 136 }
136 137 }
137 138
138 139 ~RunnableComponent() {
139 140 Dispose(false);
140 141 }
141 142
142 143 /// <summary>
143 144 /// Releases all resources used by the current component regardless of its
144 145 /// execution state.
145 146 /// </summary>
146 147 /// <param name="disposing">Indicates that the component is disposed
147 148 /// during a normal disposing or during GC.</param>
148 149 protected virtual void Dispose(bool disposing) {
149 150 }
150 151
151 152 public void Initialize() {
152 153 var cookie = new object();
153 154 if (MoveInitialize(cookie))
154 155 ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie);
156 else
157 throw new InvalidOperationException();
155 158 }
156 159
157 160 /// <summary>
158 161 /// This method is used for initialization during a component creation.
159 162 /// </summary>
160 163 /// <param name="ct">A cancellation token for this operation</param>
161 164 /// <remarks>
162 165 /// This method should be used for short and mostly syncronous operations,
163 166 /// other operations which require time to run shoud be placed in
164 167 /// <see cref="StartInternalAsync(CancellationToken)"/> method.
165 168 /// </remarks>
166 169 protected virtual Task InitializeInternalAsync(CancellationToken ct) {
167 170 return Task.CompletedTask;
168 171 }
169 172
170 173 public void Start(CancellationToken ct) {
171 174 var cookie = new object();
172 175 if (MoveStart(cookie))
173 176 ScheduleTask(StartInternalAsync, ct, cookie);
177 else
178 throw new InvalidOperationException();
174 179 }
175 180
176 181 protected virtual Task StartInternalAsync(CancellationToken ct) {
177 182 return Task.CompletedTask;
178 183 }
179 184
180 185 public void Stop(CancellationToken ct) {
181 186 var cookie = new object();
182 187 if (MoveStop(cookie))
183 188 ScheduleTask(StopAsync, ct, cookie);
189 else
190 throw new InvalidOperationException();
184 191 }
185 192
186 193 async Task StopAsync(CancellationToken ct) {
187 194 m_current.Cancel();
188 195 await Completion;
189 196
190 197 ct.ThrowIfCancellationRequested();
191 198
192 199 await StopInternalAsync(ct);
193 200 }
194 201
195 202 protected virtual Task StopInternalAsync(CancellationToken ct) {
196 203 return Task.CompletedTask;
197 204 }
198 205
206 protected void Fail(Exception err) {
207 lock(m_lock) {
208 if (m_state != ExecutionState.Running)
209 return;
210 m_cookie = new object();
211 LastError = err;
212 State = ExecutionState.Failed;
213 }
214 }
215
199 216
200 217 #region state management
201 218
202 219 bool MoveInitialize(object cookie) {
203 220 lock (m_lock) {
204 221 if (State != ExecutionState.Created)
205 222 return false;
206 223 State = ExecutionState.Initializing;
207 224 m_cookie = cookie;
208 225 return true;
209 226 }
210 227 }
211 228
212 229 bool MoveStart(object cookie) {
213 230 lock (m_lock) {
214 231 if (State != ExecutionState.Ready)
215 232 return false;
216 233 State = ExecutionState.Starting;
217 234 m_cookie = cookie;
218 235 return true;
219 236 }
220 237 }
221 238
222 239 bool MoveStop(object cookie) {
223 240 lock (m_lock) {
224 241 if (State != ExecutionState.Starting && State != ExecutionState.Running)
225 242 return false;
226 243 State = ExecutionState.Stopping;
227 244 m_cookie = cookie;
228 245 return true;
229 246 }
230 247 }
231 248
232 249 void MoveSuccess(object cookie) {
233 250 lock (m_lock) {
234 251 if (m_cookie != cookie)
235 252 return;
236 253 switch (State) {
237 254 case ExecutionState.Initializing:
238 255 State = ExecutionState.Ready;
239 256 break;
240 257 case ExecutionState.Starting:
241 258 State = ExecutionState.Running;
242 259 break;
243 260 case ExecutionState.Stopping:
244 261 State = ExecutionState.Stopped;
245 262 break;
246 263 }
247 264 }
248 265 }
249 266
250 267 void MoveFailed(Exception err, object cookie) {
251 268 lock (m_lock) {
252 269 if (m_cookie != cookie)
253 270 return;
254 271 LastError = err;
255 272 State = ExecutionState.Failed;
256 273 }
257 274 }
258 275
259
276 void ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
260 277
261 protected async void ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {
278 m_current = AsyncOperationDescriptor.Create(async (x) => {
262 279 try {
263 m_current = AsyncOperationDescriptor.Create(next, ct);
264 await m_current.Task;
280 await next(x);
265 281 MoveSuccess(cookie);
266 282 } catch (Exception e) {
267 283 MoveFailed(e, cookie);
268 284 }
285 }, ct);
269 286 }
270 287
271 288 #endregion
272 289 }
273 290 } No newline at end of file
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

ok, latest stable version should be in default

You need to be logged in to leave comments. Login now