##// END OF EJS Templates
Added TraceContext support to array traits
cin -
r41:2fc0fbe7d58b default
parent child
Show More
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -1,163 +1,163
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6 using System.Threading.Tasks;
6 using System.Threading.Tasks;
7
7
8 namespace Implab.Diagnostics {
8 namespace Implab.Diagnostics {
9 /// <summary>
9 /// <summary>
10 /// Контекст трассировки, привязывается к потоку и содержит в себе информацию о стеке логических операций.
10 /// Контекст трассировки, привязывается к потоку и содержит в себе информацию о стеке логических операций.
11 /// </summary>
11 /// </summary>
12 /// <remarks>
12 /// <remarks>
13 /// Контекст трассировки передается слушателям событий для определения места, где возникло событие.
13 /// Контекст трассировки передается слушателям событий для определения места, где возникло событие.
14 /// </remarks>
14 /// </remarks>
15 public class TraceContext {
15 public class TraceContext {
16 LogicalOperation m_currentOperation;
16 LogicalOperation m_currentOperation;
17 readonly LogicalOperation m_bound;
17 readonly LogicalOperation m_bound;
18 readonly int m_threadId;
18 readonly int m_threadId;
19
19
20 [ThreadStatic]
20 [ThreadStatic]
21 static TraceContext _current;
21 static TraceContext _current;
22
22
23 /// <summary>
23 /// <summary>
24 /// Текущий контекст трассировки для потока, создается астоматически при первом обращении.
24 /// Текущий контекст трассировки для потока, создается астоматически при первом обращении.
25 /// </summary>
25 /// </summary>
26 public static TraceContext Current {
26 public static TraceContext Current {
27 get {
27 get {
28 if (_current == null)
28 if (_current == null)
29 _current = new TraceContext();
29 _current = new TraceContext();
30 return _current;
30 return _current;
31 }
31 }
32 }
32 }
33
33
34 TraceContext(TraceContext context) {
34 TraceContext(TraceContext context) {
35 if (context == null)
35 if (context == null)
36 throw new ArgumentNullException("context");
36 throw new ArgumentNullException("context");
37
37
38 m_currentOperation = context.CurrentOperation;
38 m_currentOperation = context.CurrentOperation;
39 m_bound = context.CurrentOperation;
39 m_bound = context.CurrentOperation;
40 m_threadId = Thread.CurrentThread.ManagedThreadId;
40 m_threadId = Thread.CurrentThread.ManagedThreadId;
41 }
41 }
42
42
43 TraceContext() {
43 TraceContext() {
44 m_currentOperation = new LogicalOperation();
44 m_currentOperation = new LogicalOperation();
45 m_bound = m_currentOperation;
45 m_bound = m_currentOperation;
46 m_threadId = Thread.CurrentThread.ManagedThreadId;
46 m_threadId = Thread.CurrentThread.ManagedThreadId;
47 }
47 }
48
48
49 /// <summary>
49 /// <summary>
50 /// При необходимости копирует состояние контекста трассивровки в текущий поток.
50 /// При необходимости копирует состояние контекста трассивровки в текущий поток.
51 /// </summary>
51 /// </summary>
52 /// <param name="from">Исходный контекст трассировки, который передается.</param>
52 /// <param name="from">Исходный контекст трассировки, который передается.</param>
53 /// <remarks>
53 /// <remarks>
54 /// <para>
54 /// <para>
55 /// Копирование происходит за счет создания нового контекста трассировки и заполнением его
55 /// Копирование происходит за счет создания нового контекста трассировки и заполнением его
56 /// состояния из переданного контекста. При этом копируется стек операций, однако в новом
56 /// состояния из переданного контекста. При этом копируется стек операций, однако в новом
57 /// контексте ранее начатые логические операции не могут быть завершены.
57 /// контексте ранее начатые логические операции не могут быть завершены.
58 /// </para>
58 /// </para>
59 /// <para>
59 /// <para>
60 /// Если передача состояния состоялась, то вызывается событие трассировки <see cref="TraceEventType.Transfer"/>.
60 /// Если передача состояния состоялась, то вызывается событие трассировки <see cref="TraceEventType.Transfer"/>.
61 /// </para>
61 /// </para>
62 /// </remarks>
62 /// </remarks>
63 public static void Transfer(TraceContext from) {
63 public static void Transfer(TraceContext from) {
64 if (_current == from)
64 if (_current == from)
65 return;
65 return;
66 if (from != null) {
66 if (from != null) {
67 var context = new TraceContext(from);
67 var context = new TraceContext(from);
68 context.LogEvent(TraceEventType.Transfer, "[{0}]-->[{1}]",from.ThreadId, context.ThreadId);
68 context.LogEvent(TraceEventType.Transfer, "[{0}]-->[{1}]",from.ThreadId, context.ThreadId);
69 _current = context;
69 _current = context;
70 } else {
70 } else {
71 _current = new TraceContext();
71 _current = new TraceContext();
72 }
72 }
73 }
73 }
74
74
75 /// <summary>
75 /// <summary>
76 /// Создает постоянную копию текущего контекста, данную копию можно хранить и использовать для передачи через <see cref="Transfer(TraceContext)"/>
76 /// Создает постоянную копию текущего контекста, данную копию можно хранить и использовать для передачи через <see cref="Transfer(TraceContext)"/>
77 /// </summary>
77 /// </summary>
78 /// <returns>Копия текущего контекста трассировки или <c>null</c> если таковой не был создан.</returns>
78 /// <returns>Копия текущего контекста трассировки.</returns>
79 public static TraceContext Snapshot() {
79 public static TraceContext Snapshot() {
80 return _current == null ? null : new TraceContext(_current);
80 return _current == null ? new TraceContext() : new TraceContext(_current);
81 }
81 }
82
82
83 /// <summary>
83 /// <summary>
84 /// Выполняет переданное действие в указанном контексте трассировки, по окончании восстанавливает предыдущий контекст трассировки потока.
84 /// Выполняет переданное действие в указанном контексте трассировки, по окончании восстанавливает предыдущий контекст трассировки потока.
85 /// </summary>
85 /// </summary>
86 /// <param name="action"></param>
86 /// <param name="action"></param>
87 public void Invoke(Action action) {
87 public void Invoke(Action action) {
88 if (action == null)
88 if (action == null)
89 throw new ArgumentNullException("action");
89 throw new ArgumentNullException("action");
90 var old = _current;
90 var old = _current;
91 Transfer(this);
91 Transfer(this);
92 try {
92 try {
93 action();
93 action();
94 } finally {
94 } finally {
95 _current = old;
95 _current = old;
96 }
96 }
97 }
97 }
98
98
99 /// <summary>
99 /// <summary>
100 /// Текущая логическая операция.
100 /// Текущая логическая операция.
101 /// </summary>
101 /// </summary>
102 public LogicalOperation CurrentOperation {
102 public LogicalOperation CurrentOperation {
103 get {
103 get {
104 return m_currentOperation;
104 return m_currentOperation;
105 }
105 }
106 }
106 }
107
107
108 /// <summary>
108 /// <summary>
109 /// Операция ниже которой нельзя опускаться в стеке логических операций, т.е. она не может быть завершена в текущем контексте.
109 /// Операция ниже которой нельзя опускаться в стеке логических операций, т.е. она не может быть завершена в текущем контексте.
110 /// </summary>
110 /// </summary>
111 public LogicalOperation BoundOperation {
111 public LogicalOperation BoundOperation {
112 get {
112 get {
113 return m_bound;
113 return m_bound;
114 }
114 }
115 }
115 }
116
116
117 /// <summary>
117 /// <summary>
118 /// Поток, в котором создан контекст трассировки.
118 /// Поток, в котором создан контекст трассировки.
119 /// </summary>
119 /// </summary>
120 public int ThreadId {
120 public int ThreadId {
121 get {
121 get {
122 return m_threadId;
122 return m_threadId;
123 }
123 }
124 }
124 }
125
125
126 /// <summary>
126 /// <summary>
127 /// Начинает безымянную логическую операцию.
127 /// Начинает безымянную логическую операцию.
128 /// </summary>
128 /// </summary>
129 public void StartLogicalOperation() {
129 public void StartLogicalOperation() {
130 StartLogicalOperation(null);
130 StartLogicalOperation(null);
131 }
131 }
132
132
133 /// <summary>
133 /// <summary>
134 /// Начинает логическую операцию с указанным именем. Созданная операция будет добвалена в стек логических операций контекста, затем будет создано соответсвующее событие.
134 /// Начинает логическую операцию с указанным именем. Созданная операция будет добвалена в стек логических операций контекста, затем будет создано соответсвующее событие.
135 /// </summary>
135 /// </summary>
136 /// <param name="name">Имя начинаемой операции.</param>
136 /// <param name="name">Имя начинаемой операции.</param>
137 public void StartLogicalOperation(string name) {
137 public void StartLogicalOperation(string name) {
138 m_currentOperation = new LogicalOperation(name, m_currentOperation);
138 m_currentOperation = new LogicalOperation(name, m_currentOperation);
139 LogEvent(TraceEventType.OperationStarted, name);
139 LogEvent(TraceEventType.OperationStarted, name);
140 }
140 }
141
141
142 /// <summary>
142 /// <summary>
143 /// Заканчивает логическую операцию начатую в текущем контексте. Операции, начатые в других контекстах не могут быть закончены в текущем контексте.
143 /// Заканчивает логическую операцию начатую в текущем контексте. Операции, начатые в других контекстах не могут быть закончены в текущем контексте.
144 /// </summary>
144 /// </summary>
145 /// <remarks>
145 /// <remarks>
146 /// При вызове данного метода создается событие журнала трассировки, либо о завершении операции, либо об ошибки, поскольку данная операция
146 /// При вызове данного метода создается событие журнала трассировки, либо о завершении операции, либо об ошибки, поскольку данная операция
147 /// начата в другом контексте.
147 /// начата в другом контексте.
148 /// </remarks>
148 /// </remarks>
149 public void EndLogicalOperation() {
149 public void EndLogicalOperation() {
150 if (m_bound == m_currentOperation) {
150 if (m_bound == m_currentOperation) {
151 LogEvent(TraceEventType.Error, "Trying to end the operation which isn't belongs to current trace");
151 LogEvent(TraceEventType.Error, "Trying to end the operation which isn't belongs to current trace");
152 } else {
152 } else {
153 var op = m_currentOperation;
153 var op = m_currentOperation;
154 LogEvent(TraceEventType.OperationCompleted, "{0} {1} ms", op.Name, op.Duration);
154 LogEvent(TraceEventType.OperationCompleted, "{0} {1} ms", op.Name, op.Duration);
155 m_currentOperation = m_currentOperation.Parent;
155 m_currentOperation = m_currentOperation.Parent;
156 }
156 }
157 }
157 }
158
158
159 void LogEvent(TraceEventType type, string format, params object[] args) {
159 void LogEvent(TraceEventType type, string format, params object[] args) {
160 LogChannel<TraceEvent>.Default.LogEvent(this, TraceEvent.Create(type, format, args));
160 LogChannel<TraceEvent>.Default.LogEvent(this, TraceEvent.Create(type, format, args));
161 }
161 }
162 }
162 }
163 }
163 }
@@ -1,176 +1,191
1 using System;
1 using Implab.Diagnostics;
2 using System;
2 using System.Collections.Generic;
3 using System.Collections.Generic;
3 using System.Diagnostics;
4 using System.Diagnostics;
4 using System.Linq;
5 using System.Linq;
5 using System.Text;
6 using System.Text;
6 using System.Threading;
7 using System.Threading;
7
8
8 namespace Implab.Parallels {
9 namespace Implab.Parallels {
9 public static class ArrayTraits {
10 public static class ArrayTraits {
10 class ArrayIterator<TSrc> : DispatchPool<int> {
11 class ArrayIterator<TSrc> : DispatchPool<int> {
11 readonly Action<TSrc> m_action;
12 readonly Action<TSrc> m_action;
12 readonly TSrc[] m_source;
13 readonly TSrc[] m_source;
13 readonly Promise<int> m_promise = new Promise<int>();
14 readonly Promise<int> m_promise = new Promise<int>();
15 readonly TraceContext m_traceContext;
14
16
15 int m_pending;
17 int m_pending;
16 int m_next;
18 int m_next;
17
19
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
20 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 : base(threads) {
21 : base(threads) {
20
22
21 Debug.Assert(source != null);
23 Debug.Assert(source != null);
22 Debug.Assert(action != null);
24 Debug.Assert(action != null);
23
25
26 m_traceContext = TraceContext.Snapshot();
24 m_next = 0;
27 m_next = 0;
25 m_source = source;
28 m_source = source;
26 m_pending = source.Length;
29 m_pending = source.Length;
27 m_action = action;
30 m_action = action;
28
31
29 m_promise.Anyway(() => Dispose());
32 m_promise.Anyway(() => Dispose());
30 m_promise.Cancelled(() => Dispose());
33 m_promise.Cancelled(() => Dispose());
31
34
32 InitPool();
35 InitPool();
33 }
36 }
34
37
35 public Promise<int> Promise {
38 public Promise<int> Promise {
36 get {
39 get {
37 return m_promise;
40 return m_promise;
38 }
41 }
39 }
42 }
40
43
44 protected override void Worker() {
45 TraceContext.Transfer(m_traceContext);
46 base.Worker();
47 }
48
41 protected override bool TryDequeue(out int unit) {
49 protected override bool TryDequeue(out int unit) {
42 unit = Interlocked.Increment(ref m_next) - 1;
50 unit = Interlocked.Increment(ref m_next) - 1;
43 return unit >= m_source.Length ? false : true;
51 return unit >= m_source.Length ? false : true;
44 }
52 }
45
53
46 protected override void InvokeUnit(int unit) {
54 protected override void InvokeUnit(int unit) {
47 try {
55 try {
48 m_action(m_source[unit]);
56 m_action(m_source[unit]);
49 var pending = Interlocked.Decrement(ref m_pending);
57 var pending = Interlocked.Decrement(ref m_pending);
50 if (pending == 0)
58 if (pending == 0)
51 m_promise.Resolve(m_source.Length);
59 m_promise.Resolve(m_source.Length);
52 } catch (Exception e) {
60 } catch (Exception e) {
53 m_promise.Reject(e);
61 m_promise.Reject(e);
54 }
62 }
55 }
63 }
56 }
64 }
57
65
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 readonly Func<TSrc, TDst> m_transform;
67 readonly Func<TSrc, TDst> m_transform;
60 readonly TSrc[] m_source;
68 readonly TSrc[] m_source;
61 readonly TDst[] m_dest;
69 readonly TDst[] m_dest;
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
71 readonly TraceContext m_traceContext;
63
72
64 int m_pending;
73 int m_pending;
65 int m_next;
74 int m_next;
66
75
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 : base(threads) {
77 : base(threads) {
69
78
70 Debug.Assert (source != null);
79 Debug.Assert (source != null);
71 Debug.Assert( transform != null);
80 Debug.Assert( transform != null);
72
81
73 m_next = 0;
82 m_next = 0;
74 m_source = source;
83 m_source = source;
75 m_dest = new TDst[source.Length];
84 m_dest = new TDst[source.Length];
76 m_pending = source.Length;
85 m_pending = source.Length;
77 m_transform = transform;
86 m_transform = transform;
87 m_traceContext = TraceContext.Snapshot();
78
88
79 m_promise.Anyway(() => Dispose());
89 m_promise.Anyway(() => Dispose());
80 m_promise.Cancelled(() => Dispose());
90 m_promise.Cancelled(() => Dispose());
81
91
82 InitPool();
92 InitPool();
83 }
93 }
84
94
85 public Promise<TDst[]> Promise {
95 public Promise<TDst[]> Promise {
86 get {
96 get {
87 return m_promise;
97 return m_promise;
88 }
98 }
89 }
99 }
90
100
101 protected override void Worker() {
102 TraceContext.Transfer(m_traceContext);
103 base.Worker();
104 }
105
91 protected override bool TryDequeue(out int unit) {
106 protected override bool TryDequeue(out int unit) {
92 unit = Interlocked.Increment(ref m_next) - 1;
107 unit = Interlocked.Increment(ref m_next) - 1;
93 return unit >= m_source.Length ? false : true;
108 return unit >= m_source.Length ? false : true;
94 }
109 }
95
110
96 protected override void InvokeUnit(int unit) {
111 protected override void InvokeUnit(int unit) {
97 try {
112 try {
98 m_dest[unit] = m_transform(m_source[unit]);
113 m_dest[unit] = m_transform(m_source[unit]);
99 var pending = Interlocked.Decrement(ref m_pending);
114 var pending = Interlocked.Decrement(ref m_pending);
100 if (pending == 0)
115 if (pending == 0)
101 m_promise.Resolve(m_dest);
116 m_promise.Resolve(m_dest);
102 } catch (Exception e) {
117 } catch (Exception e) {
103 m_promise.Reject(e);
118 m_promise.Reject(e);
104 }
119 }
105 }
120 }
106 }
121 }
107
122
108 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
123 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 if (source == null)
124 if (source == null)
110 throw new ArgumentNullException("source");
125 throw new ArgumentNullException("source");
111 if (transform == null)
126 if (transform == null)
112 throw new ArgumentNullException("transform");
127 throw new ArgumentNullException("transform");
113
128
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
129 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 return mapper.Promise;
130 return mapper.Promise;
116 }
131 }
117
132
118 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
133 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 if (source == null)
134 if (source == null)
120 throw new ArgumentNullException("source");
135 throw new ArgumentNullException("source");
121 if (action == null)
136 if (action == null)
122 throw new ArgumentNullException("action");
137 throw new ArgumentNullException("action");
123
138
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
139 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 return iter.Promise;
140 return iter.Promise;
126 }
141 }
127
142
128 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
143 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 if (source == null)
144 if (source == null)
130 throw new ArgumentNullException("source");
145 throw new ArgumentNullException("source");
131 if (transform == null)
146 if (transform == null)
132 throw new ArgumentNullException("transform");
147 throw new ArgumentNullException("transform");
133 if (threads <= 0)
148 if (threads <= 0)
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
149 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135
150
136 if (source.Length == 0)
151 if (source.Length == 0)
137 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
152 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
138
153
139 var promise = new Promise<TDst[]>();
154 var promise = new Promise<TDst[]>();
140 var res = new TDst[source.Length];
155 var res = new TDst[source.Length];
141 var pending = source.Length;
156 var pending = source.Length;
142
157
143 var semaphore = new Semaphore(threads, threads);
158 var semaphore = new Semaphore(threads, threads);
144
159
145 AsyncPool.InvokeNewThread(() => {
160 AsyncPool.InvokeNewThread(() => {
146 for (int i = 0; i < source.Length; i++) {
161 for (int i = 0; i < source.Length; i++) {
147 if(promise.IsResolved)
162 if(promise.IsResolved)
148 break; // stop processing in case of error or cancellation
163 break; // stop processing in case of error or cancellation
149 var idx = i;
164 var idx = i;
150 semaphore.WaitOne();
165 semaphore.WaitOne();
151 try {
166 try {
152 var p1 = transform(source[i]);
167 var p1 = transform(source[i]);
153 p1.Anyway(() => semaphore.Release());
168 p1.Anyway(() => semaphore.Release());
154 p1.Cancelled(() => semaphore.Release());
169 p1.Cancelled(() => semaphore.Release());
155 p1.Then(
170 p1.Then(
156 x => {
171 x => {
157 res[idx] = x;
172 res[idx] = x;
158 var left = Interlocked.Decrement(ref pending);
173 var left = Interlocked.Decrement(ref pending);
159 if (left == 0)
174 if (left == 0)
160 promise.Resolve(res);
175 promise.Resolve(res);
161 },
176 },
162 e => promise.Reject(e)
177 e => promise.Reject(e)
163 );
178 );
164
179
165 } catch (Exception e) {
180 } catch (Exception e) {
166 promise.Reject(e);
181 promise.Reject(e);
167 }
182 }
168 }
183 }
169 return 0;
184 return 0;
170 });
185 });
171
186
172 return promise.Anyway(() => semaphore.Dispose());
187 return promise.Anyway(() => semaphore.Dispose());
173 }
188 }
174
189
175 }
190 }
176 }
191 }
@@ -1,334 +1,334
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6 using System.Diagnostics;
6 using System.Diagnostics;
7
7
8 namespace Implab.Parallels {
8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
11 readonly int m_maxThreads;
12
12
13 int m_createdThreads = 0; // the current size of the pool
13 int m_createdThreads = 0; // the current size of the pool
14 int m_activeThreads = 0; // the count of threads which are active
14 int m_activeThreads = 0; // the count of threads which are active
15 int m_sleepingThreads = 0; // the count of currently inactive threads
15 int m_sleepingThreads = 0; // the count of currently inactive threads
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
19 int m_wakeEvents = 0; // the count of wake events
19 int m_wakeEvents = 0; // the count of wake events
20
20
21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
22
22
23 protected DispatchPool(int min, int max) {
23 protected DispatchPool(int min, int max) {
24 if (min < 0)
24 if (min < 0)
25 throw new ArgumentOutOfRangeException("min");
25 throw new ArgumentOutOfRangeException("min");
26 if (max <= 0)
26 if (max <= 0)
27 throw new ArgumentOutOfRangeException("max");
27 throw new ArgumentOutOfRangeException("max");
28
28
29 if (min > max)
29 if (min > max)
30 min = max;
30 min = max;
31 m_minThreads = min;
31 m_minThreads = min;
32 m_maxThreads = max;
32 m_maxThreads = max;
33 }
33 }
34
34
35 protected DispatchPool(int threads)
35 protected DispatchPool(int threads)
36 : this(threads, threads) {
36 : this(threads, threads) {
37 }
37 }
38
38
39 protected DispatchPool() {
39 protected DispatchPool() {
40 int maxThreads, maxCP;
40 int maxThreads, maxCP;
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
42
42
43 m_minThreads = 0;
43 m_minThreads = 0;
44 m_maxThreads = maxThreads;
44 m_maxThreads = maxThreads;
45 }
45 }
46
46
47 protected void InitPool() {
47 protected void InitPool() {
48 for (int i = 0; i < m_minThreads; i++)
48 for (int i = 0; i < m_minThreads; i++)
49 StartWorker();
49 StartWorker();
50 }
50 }
51
51
52 public int PoolSize {
52 public int PoolSize {
53 get {
53 get {
54 return m_createdThreads;
54 return m_createdThreads;
55 }
55 }
56 }
56 }
57
57
58 public int ActiveThreads {
58 public int ActiveThreads {
59 get {
59 get {
60 return m_activeThreads;
60 return m_activeThreads;
61 }
61 }
62 }
62 }
63
63
64 public int MaxRunningThreads {
64 public int MaxRunningThreads {
65 get {
65 get {
66 return m_maxRunningThreads;
66 return m_maxRunningThreads;
67 }
67 }
68 }
68 }
69
69
70 protected bool IsDisposed {
70 protected bool IsDisposed {
71 get {
71 get {
72 return m_exitRequired != 0;
72 return m_exitRequired != 0;
73 }
73 }
74 }
74 }
75
75
76 protected abstract bool TryDequeue(out TUnit unit);
76 protected abstract bool TryDequeue(out TUnit unit);
77
77
78 #region thread execution traits
78 #region thread execution traits
79 int SignalThread() {
79 int SignalThread() {
80 var signals = Interlocked.Increment(ref m_wakeEvents);
80 var signals = Interlocked.Increment(ref m_wakeEvents);
81 if(signals == 1)
81 if(signals == 1)
82 m_hasTasks.Set();
82 m_hasTasks.Set();
83 return signals;
83 return signals;
84 }
84 }
85
85
86 bool FetchSignalOrWait(int timeout) {
86 bool FetchSignalOrWait(int timeout) {
87 var start = Environment.TickCount;
87 var start = Environment.TickCount;
88
88
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен
90 // ее вернуть, чтобы другой ожидающий поток смог
90 // ее вернуть, чтобы другой ожидающий поток смог
91 bool hasLock = false;
91 bool hasLock = false;
92 do {
92 do {
93 int signals;
93 int signals;
94 do {
94 do {
95 signals = m_wakeEvents;
95 signals = m_wakeEvents;
96 if (signals == 0)
96 if (signals == 0)
97 break;
97 break;
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99
99
100 if (signals >= 1) {
100 if (signals >= 1) {
101 if (signals > 1 && hasLock)
101 if (signals > 1 && hasLock)
102 m_hasTasks.Set();
102 m_hasTasks.Set();
103 return true;
103 return true;
104 }
104 }
105
105
106 if (timeout != -1)
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108
108
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
110 // и уйдет на пустой цикл, после чего заблокируется
110 // и уйдет на пустой цикл, после чего заблокируется
111
111
112 hasLock = true;
112 hasLock = true;
113 } while (m_hasTasks.WaitOne(timeout));
113 } while (m_hasTasks.WaitOne(timeout));
114
114
115 return false;
115 return false;
116 }
116 }
117
117
118 bool Sleep(int timeout) {
118 bool Sleep(int timeout) {
119 Interlocked.Increment(ref m_sleepingThreads);
119 Interlocked.Increment(ref m_sleepingThreads);
120 if (FetchSignalOrWait(timeout)) {
120 if (FetchSignalOrWait(timeout)) {
121 Interlocked.Decrement(ref m_sleepingThreads);
121 Interlocked.Decrement(ref m_sleepingThreads);
122 return true;
122 return true;
123 } else {
123 } else {
124 Interlocked.Decrement(ref m_sleepingThreads);
124 Interlocked.Decrement(ref m_sleepingThreads);
125 return false;
125 return false;
126 }
126 }
127 }
127 }
128 #endregion
128 #endregion
129
129
130 /// <summary>
130 /// <summary>
131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
132 /// </summary>
132 /// </summary>
133 protected void GrowPool() {
133 protected void GrowPool() {
134 if (m_exitRequired != 0)
134 if (m_exitRequired != 0)
135 return;
135 return;
136 if (m_sleepingThreads > m_wakeEvents) {
136 if (m_sleepingThreads > m_wakeEvents) {
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138
138
139 // all sleeping threads may gone
139 // all sleeping threads may gone
140 SignalThread(); // wake a sleeping thread;
140 SignalThread(); // wake a sleeping thread;
141
141
142 // we can't check whether signal has been processed
142 // we can't check whether signal has been processed
143 // anyway it may take some time for the thread to start
143 // anyway it may take some time for the thread to start
144 // we will ensure that at least one thread is running
144 // we will ensure that at least one thread is running
145
145
146 EnsurePoolIsAlive();
146 EnsurePoolIsAlive();
147 } else {
147 } else {
148 // if there is no sleeping threads in the pool
148 // if there is no sleeping threads in the pool
149 if (!StartWorker()) {
149 if (!StartWorker()) {
150 // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue
150 // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue
151 // send it a signal to spin again
151 // send it a signal to spin again
152 SignalThread();
152 SignalThread();
153 EnsurePoolIsAlive();
153 EnsurePoolIsAlive();
154 }
154 }
155 }
155 }
156 }
156 }
157
157
158 protected void EnsurePoolIsAlive() {
158 protected void EnsurePoolIsAlive() {
159 if (AllocateThreadSlot(1)) {
159 if (AllocateThreadSlot(1)) {
160 // if there were no threads in the pool
160 // if there were no threads in the pool
161 var worker = new Thread(this.Worker);
161 var worker = new Thread(this.Worker);
162 worker.IsBackground = true;
162 worker.IsBackground = true;
163 worker.Start();
163 worker.Start();
164 }
164 }
165 }
165 }
166
166
167 protected virtual bool Suspend() {
167 protected virtual bool Suspend() {
168 //no tasks left, exit if the thread is no longer needed
168 //no tasks left, exit if the thread is no longer needed
169 bool last;
169 bool last;
170 bool requestExit;
170 bool requestExit;
171
171
172 // if threads have a timeout before releasing
172 // if threads have a timeout before releasing
173 if (m_releaseTimeout > 0)
173 if (m_releaseTimeout > 0)
174 requestExit = !Sleep(m_releaseTimeout);
174 requestExit = !Sleep(m_releaseTimeout);
175 else
175 else
176 requestExit = true;
176 requestExit = true;
177
177
178 if (!requestExit)
178 if (!requestExit)
179 return true;
179 return true;
180
180
181 // release unsused thread
181 // release unsused thread
182 if (requestExit && ReleaseThreadSlot(out last)) {
182 if (requestExit && ReleaseThreadSlot(out last)) {
183 // in case at the moment the last thread was being released
183 // in case at the moment the last thread was being released
184 // a new task was added to the queue, we need to try
184 // a new task was added to the queue, we need to try
185 // to revoke the thread to avoid the situation when the task is left unprocessed
185 // to revoke the thread to avoid the situation when the task is left unprocessed
186 if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false
186 if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false
187 SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it
187 SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it
188 return AllocateThreadSlot(1); // ensure that at least one thread is alive
188 return AllocateThreadSlot(1); // ensure that at least one thread is alive
189 }
189 }
190
190
191 return false;
191 return false;
192 }
192 }
193
193
194 // wait till infinity
194 // wait till infinity
195 Sleep(-1);
195 Sleep(-1);
196
196
197 return true;
197 return true;
198 }
198 }
199
199
200 #region thread slots traits
200 #region thread slots traits
201
201
202 bool AllocateThreadSlot() {
202 bool AllocateThreadSlot() {
203 int current;
203 int current;
204 // use spins to allocate slot for the new thread
204 // use spins to allocate slot for the new thread
205 do {
205 do {
206 current = m_createdThreads;
206 current = m_createdThreads;
207 if (current >= m_maxThreads || m_exitRequired != 0)
207 if (current >= m_maxThreads || m_exitRequired != 0)
208 // no more slots left or the pool has been disposed
208 // no more slots left or the pool has been disposed
209 return false;
209 return false;
210 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
210 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
211
211
212 UpdateMaxThreads(current + 1);
212 UpdateMaxThreads(current + 1);
213
213
214 return true;
214 return true;
215 }
215 }
216
216
217 bool AllocateThreadSlot(int desired) {
217 bool AllocateThreadSlot(int desired) {
218 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
218 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
219 return false;
219 return false;
220
220
221 UpdateMaxThreads(desired);
221 UpdateMaxThreads(desired);
222
222
223 return true;
223 return true;
224 }
224 }
225
225
226 bool ReleaseThreadSlot(out bool last) {
226 bool ReleaseThreadSlot(out bool last) {
227 last = false;
227 last = false;
228 int current;
228 int current;
229 // use spins to release slot for the new thread
229 // use spins to release slot for the new thread
230 do {
230 do {
231 current = m_createdThreads;
231 current = m_createdThreads;
232 if (current <= m_minThreads && m_exitRequired == 0)
232 if (current <= m_minThreads && m_exitRequired == 0)
233 // the thread is reserved
233 // the thread is reserved
234 return false;
234 return false;
235 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
235 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
236
236
237 last = (current == 1);
237 last = (current == 1);
238
238
239 return true;
239 return true;
240 }
240 }
241
241
242 /// <summary>
242 /// <summary>
243 /// releases thread slot unconditionally, used during cleanup
243 /// releases thread slot unconditionally, used during cleanup
244 /// </summary>
244 /// </summary>
245 /// <returns>true - no more threads left</returns>
245 /// <returns>true - no more threads left</returns>
246 bool ReleaseThreadSlotAnyway() {
246 bool ReleaseThreadSlotAnyway() {
247 var left = Interlocked.Decrement(ref m_createdThreads);
247 var left = Interlocked.Decrement(ref m_createdThreads);
248 return left == 0;
248 return left == 0;
249 }
249 }
250
250
251 void UpdateMaxThreads(int count) {
251 void UpdateMaxThreads(int count) {
252 int max;
252 int max;
253 do {
253 do {
254 max = m_maxRunningThreads;
254 max = m_maxRunningThreads;
255 if (max >= count)
255 if (max >= count)
256 break;
256 break;
257 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
257 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
258 }
258 }
259
259
260 #endregion
260 #endregion
261
261
262 bool StartWorker() {
262 bool StartWorker() {
263 if (AllocateThreadSlot()) {
263 if (AllocateThreadSlot()) {
264 // slot successfully allocated
264 // slot successfully allocated
265 var worker = new Thread(this.Worker);
265 var worker = new Thread(this.Worker);
266 worker.IsBackground = true;
266 worker.IsBackground = true;
267 worker.Start();
267 worker.Start();
268
268
269 return true;
269 return true;
270 } else {
270 } else {
271 return false;
271 return false;
272 }
272 }
273 }
273 }
274
274
275 protected abstract void InvokeUnit(TUnit unit);
275 protected abstract void InvokeUnit(TUnit unit);
276
276
277 void Worker() {
277 protected virtual void Worker() {
278 TUnit unit;
278 TUnit unit;
279 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
279 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
280 Interlocked.Increment(ref m_activeThreads);
280 Interlocked.Increment(ref m_activeThreads);
281 do {
281 do {
282 // exit if requested
282 // exit if requested
283 if (m_exitRequired != 0) {
283 if (m_exitRequired != 0) {
284 // release the thread slot
284 // release the thread slot
285 Interlocked.Decrement(ref m_activeThreads);
285 Interlocked.Decrement(ref m_activeThreads);
286 if (ReleaseThreadSlotAnyway()) // it was the last worker
286 if (ReleaseThreadSlotAnyway()) // it was the last worker
287 m_hasTasks.Dispose();
287 m_hasTasks.Dispose();
288 else
288 else
289 SignalThread(); // wake next worker
289 SignalThread(); // wake next worker
290 break;
290 break;
291 }
291 }
292
292
293 // fetch task
293 // fetch task
294 if (TryDequeue(out unit)) {
294 if (TryDequeue(out unit)) {
295 InvokeUnit(unit);
295 InvokeUnit(unit);
296 continue;
296 continue;
297 }
297 }
298 Interlocked.Decrement(ref m_activeThreads);
298 Interlocked.Decrement(ref m_activeThreads);
299
299
300 // entering suspend state
300 // entering suspend state
301 // keep this thread and wait
301 // keep this thread and wait
302 if (!Suspend())
302 if (!Suspend())
303 break;
303 break;
304 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
304 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
305 Interlocked.Increment(ref m_activeThreads);
305 Interlocked.Increment(ref m_activeThreads);
306 } while (true);
306 } while (true);
307 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
307 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
308 }
308 }
309
309
310 protected virtual void Dispose(bool disposing) {
310 protected virtual void Dispose(bool disposing) {
311 if (disposing) {
311 if (disposing) {
312 if (m_exitRequired == 0) {
312 if (m_exitRequired == 0) {
313 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
313 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
314 return;
314 return;
315
315
316 // wake sleeping threads
316 // wake sleeping threads
317 if (m_createdThreads > 0)
317 if (m_createdThreads > 0)
318 SignalThread();
318 SignalThread();
319 else
319 else
320 m_hasTasks.Dispose();
320 m_hasTasks.Dispose();
321 GC.SuppressFinalize(this);
321 GC.SuppressFinalize(this);
322 }
322 }
323 }
323 }
324 }
324 }
325
325
326 public void Dispose() {
326 public void Dispose() {
327 Dispose(true);
327 Dispose(true);
328 }
328 }
329
329
330 ~DispatchPool() {
330 ~DispatchPool() {
331 Dispose(false);
331 Dispose(false);
332 }
332 }
333 }
333 }
334 }
334 }
General Comments 0
You need to be logged in to leave comments. Login now