##// END OF EJS Templates
Added TraceContext support to array traits
cin -
r41:2fc0fbe7d58b default
parent child
Show More
1 NO CONTENT: modified file, binary diff hidden
@@ -1,163 +1,163
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6 using System.Threading.Tasks;
7 7
8 8 namespace Implab.Diagnostics {
9 9 /// <summary>
10 10 /// Контекст трассировки, привязывается к потоку и содержит в себе информацию о стеке логических операций.
11 11 /// </summary>
12 12 /// <remarks>
13 13 /// Контекст трассировки передается слушателям событий для определения места, где возникло событие.
14 14 /// </remarks>
15 15 public class TraceContext {
16 16 LogicalOperation m_currentOperation;
17 17 readonly LogicalOperation m_bound;
18 18 readonly int m_threadId;
19 19
20 20 [ThreadStatic]
21 21 static TraceContext _current;
22 22
23 23 /// <summary>
24 24 /// Текущий контекст трассировки для потока, создается астоматически при первом обращении.
25 25 /// </summary>
26 26 public static TraceContext Current {
27 27 get {
28 28 if (_current == null)
29 29 _current = new TraceContext();
30 30 return _current;
31 31 }
32 32 }
33 33
34 34 TraceContext(TraceContext context) {
35 35 if (context == null)
36 36 throw new ArgumentNullException("context");
37 37
38 38 m_currentOperation = context.CurrentOperation;
39 39 m_bound = context.CurrentOperation;
40 40 m_threadId = Thread.CurrentThread.ManagedThreadId;
41 41 }
42 42
43 43 TraceContext() {
44 44 m_currentOperation = new LogicalOperation();
45 45 m_bound = m_currentOperation;
46 46 m_threadId = Thread.CurrentThread.ManagedThreadId;
47 47 }
48 48
49 49 /// <summary>
50 50 /// При необходимости копирует состояние контекста трассивровки в текущий поток.
51 51 /// </summary>
52 52 /// <param name="from">Исходный контекст трассировки, который передается.</param>
53 53 /// <remarks>
54 54 /// <para>
55 55 /// Копирование происходит за счет создания нового контекста трассировки и заполнением его
56 56 /// состояния из переданного контекста. При этом копируется стек операций, однако в новом
57 57 /// контексте ранее начатые логические операции не могут быть завершены.
58 58 /// </para>
59 59 /// <para>
60 60 /// Если передача состояния состоялась, то вызывается событие трассировки <see cref="TraceEventType.Transfer"/>.
61 61 /// </para>
62 62 /// </remarks>
63 63 public static void Transfer(TraceContext from) {
64 64 if (_current == from)
65 65 return;
66 66 if (from != null) {
67 67 var context = new TraceContext(from);
68 68 context.LogEvent(TraceEventType.Transfer, "[{0}]-->[{1}]",from.ThreadId, context.ThreadId);
69 69 _current = context;
70 70 } else {
71 71 _current = new TraceContext();
72 72 }
73 73 }
74 74
75 75 /// <summary>
76 76 /// Создает постоянную копию текущего контекста, данную копию можно хранить и использовать для передачи через <see cref="Transfer(TraceContext)"/>
77 77 /// </summary>
78 /// <returns>Копия текущего контекста трассировки или <c>null</c> если таковой не был создан.</returns>
78 /// <returns>Копия текущего контекста трассировки.</returns>
79 79 public static TraceContext Snapshot() {
80 return _current == null ? null : new TraceContext(_current);
80 return _current == null ? new TraceContext() : new TraceContext(_current);
81 81 }
82 82
83 83 /// <summary>
84 84 /// Выполняет переданное действие в указанном контексте трассировки, по окончании восстанавливает предыдущий контекст трассировки потока.
85 85 /// </summary>
86 86 /// <param name="action"></param>
87 87 public void Invoke(Action action) {
88 88 if (action == null)
89 89 throw new ArgumentNullException("action");
90 90 var old = _current;
91 91 Transfer(this);
92 92 try {
93 93 action();
94 94 } finally {
95 95 _current = old;
96 96 }
97 97 }
98 98
99 99 /// <summary>
100 100 /// Текущая логическая операция.
101 101 /// </summary>
102 102 public LogicalOperation CurrentOperation {
103 103 get {
104 104 return m_currentOperation;
105 105 }
106 106 }
107 107
108 108 /// <summary>
109 109 /// Операция ниже которой нельзя опускаться в стеке логических операций, т.е. она не может быть завершена в текущем контексте.
110 110 /// </summary>
111 111 public LogicalOperation BoundOperation {
112 112 get {
113 113 return m_bound;
114 114 }
115 115 }
116 116
117 117 /// <summary>
118 118 /// Поток, в котором создан контекст трассировки.
119 119 /// </summary>
120 120 public int ThreadId {
121 121 get {
122 122 return m_threadId;
123 123 }
124 124 }
125 125
126 126 /// <summary>
127 127 /// Начинает безымянную логическую операцию.
128 128 /// </summary>
129 129 public void StartLogicalOperation() {
130 130 StartLogicalOperation(null);
131 131 }
132 132
133 133 /// <summary>
134 134 /// Начинает логическую операцию с указанным именем. Созданная операция будет добвалена в стек логических операций контекста, затем будет создано соответсвующее событие.
135 135 /// </summary>
136 136 /// <param name="name">Имя начинаемой операции.</param>
137 137 public void StartLogicalOperation(string name) {
138 138 m_currentOperation = new LogicalOperation(name, m_currentOperation);
139 139 LogEvent(TraceEventType.OperationStarted, name);
140 140 }
141 141
142 142 /// <summary>
143 143 /// Заканчивает логическую операцию начатую в текущем контексте. Операции, начатые в других контекстах не могут быть закончены в текущем контексте.
144 144 /// </summary>
145 145 /// <remarks>
146 146 /// При вызове данного метода создается событие журнала трассировки, либо о завершении операции, либо об ошибки, поскольку данная операция
147 147 /// начата в другом контексте.
148 148 /// </remarks>
149 149 public void EndLogicalOperation() {
150 150 if (m_bound == m_currentOperation) {
151 151 LogEvent(TraceEventType.Error, "Trying to end the operation which isn't belongs to current trace");
152 152 } else {
153 153 var op = m_currentOperation;
154 154 LogEvent(TraceEventType.OperationCompleted, "{0} {1} ms", op.Name, op.Duration);
155 155 m_currentOperation = m_currentOperation.Parent;
156 156 }
157 157 }
158 158
159 159 void LogEvent(TraceEventType type, string format, params object[] args) {
160 160 LogChannel<TraceEvent>.Default.LogEvent(this, TraceEvent.Create(type, format, args));
161 161 }
162 162 }
163 163 }
@@ -1,176 +1,191
1 using System;
1 using Implab.Diagnostics;
2 using System;
2 3 using System.Collections.Generic;
3 4 using System.Diagnostics;
4 5 using System.Linq;
5 6 using System.Text;
6 7 using System.Threading;
7 8
8 9 namespace Implab.Parallels {
9 10 public static class ArrayTraits {
10 11 class ArrayIterator<TSrc> : DispatchPool<int> {
11 12 readonly Action<TSrc> m_action;
12 13 readonly TSrc[] m_source;
13 14 readonly Promise<int> m_promise = new Promise<int>();
15 readonly TraceContext m_traceContext;
14 16
15 17 int m_pending;
16 18 int m_next;
17 19
18 20 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 21 : base(threads) {
20 22
21 23 Debug.Assert(source != null);
22 24 Debug.Assert(action != null);
23 25
26 m_traceContext = TraceContext.Snapshot();
24 27 m_next = 0;
25 28 m_source = source;
26 29 m_pending = source.Length;
27 30 m_action = action;
28 31
29 32 m_promise.Anyway(() => Dispose());
30 33 m_promise.Cancelled(() => Dispose());
31 34
32 35 InitPool();
33 36 }
34 37
35 38 public Promise<int> Promise {
36 39 get {
37 40 return m_promise;
38 41 }
39 42 }
40 43
44 protected override void Worker() {
45 TraceContext.Transfer(m_traceContext);
46 base.Worker();
47 }
48
41 49 protected override bool TryDequeue(out int unit) {
42 50 unit = Interlocked.Increment(ref m_next) - 1;
43 51 return unit >= m_source.Length ? false : true;
44 52 }
45 53
46 54 protected override void InvokeUnit(int unit) {
47 55 try {
48 56 m_action(m_source[unit]);
49 57 var pending = Interlocked.Decrement(ref m_pending);
50 58 if (pending == 0)
51 59 m_promise.Resolve(m_source.Length);
52 60 } catch (Exception e) {
53 61 m_promise.Reject(e);
54 62 }
55 63 }
56 64 }
57 65
58 66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 67 readonly Func<TSrc, TDst> m_transform;
60 68 readonly TSrc[] m_source;
61 69 readonly TDst[] m_dest;
62 70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
71 readonly TraceContext m_traceContext;
63 72
64 73 int m_pending;
65 74 int m_next;
66 75
67 76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 77 : base(threads) {
69 78
70 79 Debug.Assert (source != null);
71 80 Debug.Assert( transform != null);
72 81
73 82 m_next = 0;
74 83 m_source = source;
75 84 m_dest = new TDst[source.Length];
76 85 m_pending = source.Length;
77 86 m_transform = transform;
87 m_traceContext = TraceContext.Snapshot();
78 88
79 89 m_promise.Anyway(() => Dispose());
80 90 m_promise.Cancelled(() => Dispose());
81 91
82 92 InitPool();
83 93 }
84 94
85 95 public Promise<TDst[]> Promise {
86 96 get {
87 97 return m_promise;
88 98 }
89 99 }
90 100
101 protected override void Worker() {
102 TraceContext.Transfer(m_traceContext);
103 base.Worker();
104 }
105
91 106 protected override bool TryDequeue(out int unit) {
92 107 unit = Interlocked.Increment(ref m_next) - 1;
93 108 return unit >= m_source.Length ? false : true;
94 109 }
95 110
96 111 protected override void InvokeUnit(int unit) {
97 112 try {
98 113 m_dest[unit] = m_transform(m_source[unit]);
99 114 var pending = Interlocked.Decrement(ref m_pending);
100 115 if (pending == 0)
101 116 m_promise.Resolve(m_dest);
102 117 } catch (Exception e) {
103 118 m_promise.Reject(e);
104 119 }
105 120 }
106 121 }
107 122
108 123 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 124 if (source == null)
110 125 throw new ArgumentNullException("source");
111 126 if (transform == null)
112 127 throw new ArgumentNullException("transform");
113 128
114 129 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 130 return mapper.Promise;
116 131 }
117 132
118 133 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 134 if (source == null)
120 135 throw new ArgumentNullException("source");
121 136 if (action == null)
122 137 throw new ArgumentNullException("action");
123 138
124 139 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 140 return iter.Promise;
126 141 }
127 142
128 143 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 144 if (source == null)
130 145 throw new ArgumentNullException("source");
131 146 if (transform == null)
132 147 throw new ArgumentNullException("transform");
133 148 if (threads <= 0)
134 149 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135 150
136 151 if (source.Length == 0)
137 152 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
138 153
139 154 var promise = new Promise<TDst[]>();
140 155 var res = new TDst[source.Length];
141 156 var pending = source.Length;
142 157
143 158 var semaphore = new Semaphore(threads, threads);
144 159
145 160 AsyncPool.InvokeNewThread(() => {
146 161 for (int i = 0; i < source.Length; i++) {
147 162 if(promise.IsResolved)
148 163 break; // stop processing in case of error or cancellation
149 164 var idx = i;
150 165 semaphore.WaitOne();
151 166 try {
152 167 var p1 = transform(source[i]);
153 168 p1.Anyway(() => semaphore.Release());
154 169 p1.Cancelled(() => semaphore.Release());
155 170 p1.Then(
156 171 x => {
157 172 res[idx] = x;
158 173 var left = Interlocked.Decrement(ref pending);
159 174 if (left == 0)
160 175 promise.Resolve(res);
161 176 },
162 177 e => promise.Reject(e)
163 178 );
164 179
165 180 } catch (Exception e) {
166 181 promise.Reject(e);
167 182 }
168 183 }
169 184 return 0;
170 185 });
171 186
172 187 return promise.Anyway(() => semaphore.Dispose());
173 188 }
174 189
175 190 }
176 191 }
@@ -1,334 +1,334
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab.Parallels {
9 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 10 readonly int m_minThreads;
11 11 readonly int m_maxThreads;
12 12
13 13 int m_createdThreads = 0; // the current size of the pool
14 14 int m_activeThreads = 0; // the count of threads which are active
15 15 int m_sleepingThreads = 0; // the count of currently inactive threads
16 16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
17 17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
19 19 int m_wakeEvents = 0; // the count of wake events
20 20
21 21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
22 22
23 23 protected DispatchPool(int min, int max) {
24 24 if (min < 0)
25 25 throw new ArgumentOutOfRangeException("min");
26 26 if (max <= 0)
27 27 throw new ArgumentOutOfRangeException("max");
28 28
29 29 if (min > max)
30 30 min = max;
31 31 m_minThreads = min;
32 32 m_maxThreads = max;
33 33 }
34 34
35 35 protected DispatchPool(int threads)
36 36 : this(threads, threads) {
37 37 }
38 38
39 39 protected DispatchPool() {
40 40 int maxThreads, maxCP;
41 41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
42 42
43 43 m_minThreads = 0;
44 44 m_maxThreads = maxThreads;
45 45 }
46 46
47 47 protected void InitPool() {
48 48 for (int i = 0; i < m_minThreads; i++)
49 49 StartWorker();
50 50 }
51 51
52 52 public int PoolSize {
53 53 get {
54 54 return m_createdThreads;
55 55 }
56 56 }
57 57
58 58 public int ActiveThreads {
59 59 get {
60 60 return m_activeThreads;
61 61 }
62 62 }
63 63
64 64 public int MaxRunningThreads {
65 65 get {
66 66 return m_maxRunningThreads;
67 67 }
68 68 }
69 69
70 70 protected bool IsDisposed {
71 71 get {
72 72 return m_exitRequired != 0;
73 73 }
74 74 }
75 75
76 76 protected abstract bool TryDequeue(out TUnit unit);
77 77
78 78 #region thread execution traits
79 79 int SignalThread() {
80 80 var signals = Interlocked.Increment(ref m_wakeEvents);
81 81 if(signals == 1)
82 82 m_hasTasks.Set();
83 83 return signals;
84 84 }
85 85
86 86 bool FetchSignalOrWait(int timeout) {
87 87 var start = Environment.TickCount;
88 88
89 89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен
90 90 // ее вернуть, чтобы другой ожидающий поток смог
91 91 bool hasLock = false;
92 92 do {
93 93 int signals;
94 94 do {
95 95 signals = m_wakeEvents;
96 96 if (signals == 0)
97 97 break;
98 98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99 99
100 100 if (signals >= 1) {
101 101 if (signals > 1 && hasLock)
102 102 m_hasTasks.Set();
103 103 return true;
104 104 }
105 105
106 106 if (timeout != -1)
107 107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108 108
109 109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
110 110 // и уйдет на пустой цикл, после чего заблокируется
111 111
112 112 hasLock = true;
113 113 } while (m_hasTasks.WaitOne(timeout));
114 114
115 115 return false;
116 116 }
117 117
118 118 bool Sleep(int timeout) {
119 119 Interlocked.Increment(ref m_sleepingThreads);
120 120 if (FetchSignalOrWait(timeout)) {
121 121 Interlocked.Decrement(ref m_sleepingThreads);
122 122 return true;
123 123 } else {
124 124 Interlocked.Decrement(ref m_sleepingThreads);
125 125 return false;
126 126 }
127 127 }
128 128 #endregion
129 129
130 130 /// <summary>
131 131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
132 132 /// </summary>
133 133 protected void GrowPool() {
134 134 if (m_exitRequired != 0)
135 135 return;
136 136 if (m_sleepingThreads > m_wakeEvents) {
137 137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138 138
139 139 // all sleeping threads may gone
140 140 SignalThread(); // wake a sleeping thread;
141 141
142 142 // we can't check whether signal has been processed
143 143 // anyway it may take some time for the thread to start
144 144 // we will ensure that at least one thread is running
145 145
146 146 EnsurePoolIsAlive();
147 147 } else {
148 148 // if there is no sleeping threads in the pool
149 149 if (!StartWorker()) {
150 150 // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue
151 151 // send it a signal to spin again
152 152 SignalThread();
153 153 EnsurePoolIsAlive();
154 154 }
155 155 }
156 156 }
157 157
158 158 protected void EnsurePoolIsAlive() {
159 159 if (AllocateThreadSlot(1)) {
160 160 // if there were no threads in the pool
161 161 var worker = new Thread(this.Worker);
162 162 worker.IsBackground = true;
163 163 worker.Start();
164 164 }
165 165 }
166 166
167 167 protected virtual bool Suspend() {
168 168 //no tasks left, exit if the thread is no longer needed
169 169 bool last;
170 170 bool requestExit;
171 171
172 172 // if threads have a timeout before releasing
173 173 if (m_releaseTimeout > 0)
174 174 requestExit = !Sleep(m_releaseTimeout);
175 175 else
176 176 requestExit = true;
177 177
178 178 if (!requestExit)
179 179 return true;
180 180
181 181 // release unsused thread
182 182 if (requestExit && ReleaseThreadSlot(out last)) {
183 183 // in case at the moment the last thread was being released
184 184 // a new task was added to the queue, we need to try
185 185 // to revoke the thread to avoid the situation when the task is left unprocessed
186 186 if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false
187 187 SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it
188 188 return AllocateThreadSlot(1); // ensure that at least one thread is alive
189 189 }
190 190
191 191 return false;
192 192 }
193 193
194 194 // wait till infinity
195 195 Sleep(-1);
196 196
197 197 return true;
198 198 }
199 199
200 200 #region thread slots traits
201 201
202 202 bool AllocateThreadSlot() {
203 203 int current;
204 204 // use spins to allocate slot for the new thread
205 205 do {
206 206 current = m_createdThreads;
207 207 if (current >= m_maxThreads || m_exitRequired != 0)
208 208 // no more slots left or the pool has been disposed
209 209 return false;
210 210 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
211 211
212 212 UpdateMaxThreads(current + 1);
213 213
214 214 return true;
215 215 }
216 216
217 217 bool AllocateThreadSlot(int desired) {
218 218 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
219 219 return false;
220 220
221 221 UpdateMaxThreads(desired);
222 222
223 223 return true;
224 224 }
225 225
226 226 bool ReleaseThreadSlot(out bool last) {
227 227 last = false;
228 228 int current;
229 229 // use spins to release slot for the new thread
230 230 do {
231 231 current = m_createdThreads;
232 232 if (current <= m_minThreads && m_exitRequired == 0)
233 233 // the thread is reserved
234 234 return false;
235 235 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
236 236
237 237 last = (current == 1);
238 238
239 239 return true;
240 240 }
241 241
242 242 /// <summary>
243 243 /// releases thread slot unconditionally, used during cleanup
244 244 /// </summary>
245 245 /// <returns>true - no more threads left</returns>
246 246 bool ReleaseThreadSlotAnyway() {
247 247 var left = Interlocked.Decrement(ref m_createdThreads);
248 248 return left == 0;
249 249 }
250 250
251 251 void UpdateMaxThreads(int count) {
252 252 int max;
253 253 do {
254 254 max = m_maxRunningThreads;
255 255 if (max >= count)
256 256 break;
257 257 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
258 258 }
259 259
260 260 #endregion
261 261
262 262 bool StartWorker() {
263 263 if (AllocateThreadSlot()) {
264 264 // slot successfully allocated
265 265 var worker = new Thread(this.Worker);
266 266 worker.IsBackground = true;
267 267 worker.Start();
268 268
269 269 return true;
270 270 } else {
271 271 return false;
272 272 }
273 273 }
274 274
275 275 protected abstract void InvokeUnit(TUnit unit);
276 276
277 void Worker() {
277 protected virtual void Worker() {
278 278 TUnit unit;
279 279 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
280 280 Interlocked.Increment(ref m_activeThreads);
281 281 do {
282 282 // exit if requested
283 283 if (m_exitRequired != 0) {
284 284 // release the thread slot
285 285 Interlocked.Decrement(ref m_activeThreads);
286 286 if (ReleaseThreadSlotAnyway()) // it was the last worker
287 287 m_hasTasks.Dispose();
288 288 else
289 289 SignalThread(); // wake next worker
290 290 break;
291 291 }
292 292
293 293 // fetch task
294 294 if (TryDequeue(out unit)) {
295 295 InvokeUnit(unit);
296 296 continue;
297 297 }
298 298 Interlocked.Decrement(ref m_activeThreads);
299 299
300 300 // entering suspend state
301 301 // keep this thread and wait
302 302 if (!Suspend())
303 303 break;
304 304 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
305 305 Interlocked.Increment(ref m_activeThreads);
306 306 } while (true);
307 307 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
308 308 }
309 309
310 310 protected virtual void Dispose(bool disposing) {
311 311 if (disposing) {
312 312 if (m_exitRequired == 0) {
313 313 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
314 314 return;
315 315
316 316 // wake sleeping threads
317 317 if (m_createdThreads > 0)
318 318 SignalThread();
319 319 else
320 320 m_hasTasks.Dispose();
321 321 GC.SuppressFinalize(this);
322 322 }
323 323 }
324 324 }
325 325
326 326 public void Dispose() {
327 327 Dispose(true);
328 328 }
329 329
330 330 ~DispatchPool() {
331 331 Dispose(false);
332 332 }
333 333 }
334 334 }
General Comments 0
You need to be logged in to leave comments. Login now