##// END OF EJS Templates
fixed dispatch pool race condition
cin -
r24:ee04e1fa78da default
parent child
Show More
@@ -1,333 +1,333
1 1 using System;
2 2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 3 using System.Reflection;
4 4 using System.Threading;
5 5 using Implab.Parallels;
6 6
7 7 namespace Implab.Test {
8 8 [TestClass]
9 9 public class AsyncTests {
10 10 [TestMethod]
11 11 public void ResolveTest() {
12 12 int res = -1;
13 13 var p = new Promise<int>();
14 14 p.Then(x => res = x);
15 15 p.Resolve(100);
16 16
17 17 Assert.AreEqual(100, res);
18 18 }
19 19
20 20 [TestMethod]
21 21 public void RejectTest() {
22 22 int res = -1;
23 23 Exception err = null;
24 24
25 25 var p = new Promise<int>();
26 26 p.Then(x => res = x, e => err = e);
27 27 p.Reject(new ApplicationException("error"));
28 28
29 29 Assert.AreEqual(res, -1);
30 30 Assert.AreEqual(err.Message, "error");
31 31
32 32 }
33 33
34 34 [TestMethod]
35 35 public void JoinSuccessTest() {
36 36 var p = new Promise<int>();
37 37 p.Resolve(100);
38 38 Assert.AreEqual(p.Join(), 100);
39 39 }
40 40
41 41 [TestMethod]
42 42 public void JoinFailTest() {
43 43 var p = new Promise<int>();
44 44 p.Reject(new ApplicationException("failed"));
45 45
46 46 try {
47 47 p.Join();
48 48 throw new ApplicationException("WRONG!");
49 49 } catch (TargetInvocationException err) {
50 50 Assert.AreEqual(err.InnerException.Message, "failed");
51 51 } catch {
52 52 Assert.Fail("Got wrong excaption");
53 53 }
54 54 }
55 55
56 56 [TestMethod]
57 57 public void MapTest() {
58 58 var p = new Promise<int>();
59 59
60 60 var p2 = p.Map(x => x.ToString());
61 61 p.Resolve(100);
62 62
63 63 Assert.AreEqual(p2.Join(), "100");
64 64 }
65 65
66 66 [TestMethod]
67 67 public void FixErrorTest() {
68 68 var p = new Promise<int>();
69 69
70 70 var p2 = p.Error(e => 101);
71 71
72 72 p.Reject(new Exception());
73 73
74 74 Assert.AreEqual(p2.Join(), 101);
75 75 }
76 76
77 77 [TestMethod]
78 78 public void ChainTest() {
79 79 var p1 = new Promise<int>();
80 80
81 81 var p3 = p1.Chain(x => {
82 82 var p2 = new Promise<string>();
83 83 p2.Resolve(x.ToString());
84 84 return p2;
85 85 });
86 86
87 87 p1.Resolve(100);
88 88
89 89 Assert.AreEqual(p3.Join(), "100");
90 90 }
91 91
92 92 [TestMethod]
93 93 public void PoolTest() {
94 94 var pid = Thread.CurrentThread.ManagedThreadId;
95 95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
96 96
97 97 Assert.AreNotEqual(pid, p.Join());
98 98 }
99 99
100 100 [TestMethod]
101 101 public void WorkerPoolSizeTest() {
102 102 var pool = new WorkerPool(5, 10, 0);
103 103
104 104 Assert.AreEqual(5, pool.PoolSize);
105 105
106 106 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
107 107 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
108 108 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
109 109
110 110 Assert.AreEqual(5, pool.PoolSize);
111 111
112 112 for (int i = 0; i < 100; i++)
113 113 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
114 114 Thread.Sleep(200);
115 115 Assert.AreEqual(10, pool.PoolSize);
116 116
117 117 pool.Dispose();
118 118 }
119 119
120 120 [TestMethod]
121 121 public void WorkerPoolCorrectTest() {
122 122 var pool = new WorkerPool(0,1000,100);
123 123
124 124 int iterations = 1000;
125 125 int pending = iterations;
126 126 var stop = new ManualResetEvent(false);
127 127
128 128 var count = 0;
129 129 for (int i = 0; i < iterations; i++) {
130 130 pool
131 131 .Invoke(() => 1)
132 132 .Then(x => Interlocked.Add(ref count, x))
133 133 .Then(x => Math.Log10(x))
134 134 .Anyway(() => {
135 135 Interlocked.Decrement(ref pending);
136 136 if (pending == 0)
137 137 stop.Set();
138 138 });
139 139 }
140 140
141 141 stop.WaitOne();
142 142
143 143 Assert.AreEqual(iterations, count);
144 144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
145 145 pool.Dispose();
146 146
147 147 }
148 148
149 149 [TestMethod]
150 150 public void WorkerPoolDisposeTest() {
151 151 var pool = new WorkerPool(5, 20);
152 152 Assert.AreEqual(5, pool.PoolSize);
153 153 pool.Dispose();
154 154 Thread.Sleep(500);
155 155 Assert.AreEqual(0, pool.PoolSize);
156 156 pool.Dispose();
157 157 }
158 158
159 159 [TestMethod]
160 160 public void MTQueueTest() {
161 161 var queue = new MTQueue<int>();
162 162 int res;
163 163
164 164 queue.Enqueue(10);
165 165 Assert.IsTrue(queue.TryDequeue(out res));
166 166 Assert.AreEqual(10, res);
167 167 Assert.IsFalse(queue.TryDequeue(out res));
168 168
169 169 for (int i = 0; i < 1000; i++)
170 170 queue.Enqueue(i);
171 171
172 172 for (int i = 0; i < 1000; i++) {
173 173 queue.TryDequeue(out res);
174 174 Assert.AreEqual(i, res);
175 175 }
176 176
177 177 int writers = 0;
178 178 int readers = 0;
179 179 var stop = new ManualResetEvent(false);
180 180 int total = 0;
181 181
182 182 int itemsPerWriter = 1000;
183 183 int writersCount = 3;
184 184
185 185 for (int i = 0; i < writersCount; i++) {
186 186 Interlocked.Increment(ref writers);
187 187 var wn = i;
188 188 AsyncPool
189 189 .InvokeNewThread(() => {
190 190 for (int ii = 0; ii < itemsPerWriter; ii++) {
191 191 queue.Enqueue(1);
192 192 }
193 193 return 1;
194 194 })
195 195 .Anyway(() => Interlocked.Decrement(ref writers));
196 196 }
197 197
198 198 for (int i = 0; i < 10; i++) {
199 199 Interlocked.Increment(ref readers);
200 200 var wn = i;
201 201 AsyncPool
202 202 .InvokeNewThread(() => {
203 203 int t;
204 204 do {
205 205 while (queue.TryDequeue(out t))
206 206 Interlocked.Add(ref total, t);
207 207 } while (writers > 0);
208 208 return 1;
209 209 })
210 210 .Anyway(() => {
211 211 Interlocked.Decrement(ref readers);
212 212 if (readers == 0)
213 213 stop.Set();
214 214 });
215 215 }
216 216
217 217 stop.WaitOne();
218 218
219 219 Assert.AreEqual(itemsPerWriter * writersCount, total);
220 220 }
221 221
222 222 [TestMethod]
223 223 public void ParallelMapTest() {
224 224
225 225 int count = 100000;
226 226
227 227 double[] args = new double[count];
228 228 var rand = new Random();
229 229
230 230 for (int i = 0; i < count; i++)
231 231 args[i] = rand.NextDouble();
232 232
233 233 var t = Environment.TickCount;
234 234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
235 235
236 236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
237 237
238 238 t = Environment.TickCount;
239 239 for (int i = 0; i < count; i++)
240 240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
241 241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
242 242 }
243 243
244 244 [TestMethod]
245 245 public void ChainedMapTest() {
246 246
247 using (var pool = new WorkerPool(4,4,0)) {
247 using (var pool = new WorkerPool(0,100,0)) {
248 248 int count = 10000;
249 249
250 250 double[] args = new double[count];
251 251 var rand = new Random();
252 252
253 253 for (int i = 0; i < count; i++)
254 254 args[i] = rand.NextDouble();
255 255
256 256 var t = Environment.TickCount;
257 257 var res = args
258 .ChainedMap(
258 .ChainedMap2(
259 259 x => pool.Invoke(
260 260 () => Math.Sin(x * x)
261 261 ),
262 262 4
263 263 )
264 264 .Join();
265 265
266 266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
267 267
268 268 t = Environment.TickCount;
269 269 for (int i = 0; i < count; i++)
270 270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 273 }
274 274 }
275 275
276 276 [TestMethod]
277 277 public void ParallelForEachTest() {
278 278
279 279 int count = 100000;
280 280
281 281 int[] args = new int[count];
282 282 var rand = new Random();
283 283
284 284 for (int i = 0; i < count; i++)
285 285 args[i] = (int)(rand.NextDouble() * 100);
286 286
287 287 int result = 0;
288 288
289 289 var t = Environment.TickCount;
290 290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
291 291
292 292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
293 293
294 294 int result2 = 0;
295 295
296 296 t = Environment.TickCount;
297 297 for (int i = 0; i < count; i++)
298 298 result2 += args[i];
299 299 Assert.AreEqual(result2, result);
300 300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
301 301 }
302 302
303 303 [TestMethod]
304 304 public void ComplexCase1Test() {
305 305 var flags = new bool[3];
306 306
307 307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
308 308
309 309 var p = PromiseHelper
310 310 .Sleep(200, "Alan")
311 311 .Cancelled(() => flags[0] = true)
312 312 .Chain(x =>
313 313 PromiseHelper
314 314 .Sleep(200, "Hi, " + x)
315 315 .Map(y => y)
316 316 .Cancelled(() => flags[1] = true)
317 317 )
318 318 .Cancelled(() => flags[2] = true);
319 319 Thread.Sleep(300);
320 320 p.Cancel();
321 321 try {
322 322 Assert.AreEqual(p.Join(), "Hi, Alan");
323 323 Assert.Fail("Shouldn't get here");
324 324 } catch (OperationCanceledException) {
325 325 }
326 326
327 327 Assert.IsFalse(flags[0]);
328 328 Assert.IsTrue(flags[1]);
329 329 Assert.IsTrue(flags[2]);
330 330 }
331 331 }
332 332 }
333 333
1 NO CONTENT: modified file, binary diff hidden
@@ -1,171 +1,212
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Diagnostics;
4 4 using System.Linq;
5 5 using System.Text;
6 6 using System.Threading;
7 7
8 8 namespace Implab.Parallels {
9 9 public static class ArrayTraits {
10 10 class ArrayIterator<TSrc> : DispatchPool<int> {
11 11 readonly Action<TSrc> m_action;
12 12 readonly TSrc[] m_source;
13 13 readonly Promise<int> m_promise = new Promise<int>();
14 14
15 15 int m_pending;
16 16 int m_next;
17 17
18 18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 19 : base(threads) {
20 20
21 21 Debug.Assert(source != null);
22 22 Debug.Assert(action != null);
23 23
24 24 m_next = 0;
25 25 m_source = source;
26 26 m_pending = source.Length;
27 27 m_action = action;
28 28
29 29 m_promise.Anyway(() => Dispose());
30 30 m_promise.Cancelled(() => Dispose());
31 31
32 32 InitPool();
33 33 }
34 34
35 35 public Promise<int> Promise {
36 36 get {
37 37 return m_promise;
38 38 }
39 39 }
40 40
41 41 protected override bool TryDequeue(out int unit) {
42 42 unit = Interlocked.Increment(ref m_next) - 1;
43 43 return unit >= m_source.Length ? false : true;
44 44 }
45 45
46 46 protected override void InvokeUnit(int unit) {
47 47 try {
48 48 m_action(m_source[unit]);
49 49 var pending = Interlocked.Decrement(ref m_pending);
50 50 if (pending == 0)
51 51 m_promise.Resolve(m_source.Length);
52 52 } catch (Exception e) {
53 53 m_promise.Reject(e);
54 54 }
55 55 }
56 56 }
57 57
58 58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 59 readonly Func<TSrc, TDst> m_transform;
60 60 readonly TSrc[] m_source;
61 61 readonly TDst[] m_dest;
62 62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
63 63
64 64 int m_pending;
65 65 int m_next;
66 66
67 67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 68 : base(threads) {
69 69
70 70 Debug.Assert (source != null);
71 71 Debug.Assert( transform != null);
72 72
73 73 m_next = 0;
74 74 m_source = source;
75 75 m_dest = new TDst[source.Length];
76 76 m_pending = source.Length;
77 77 m_transform = transform;
78 78
79 79 m_promise.Anyway(() => Dispose());
80 80 m_promise.Cancelled(() => Dispose());
81 81
82 82 InitPool();
83 83 }
84 84
85 85 public Promise<TDst[]> Promise {
86 86 get {
87 87 return m_promise;
88 88 }
89 89 }
90 90
91 91 protected override bool TryDequeue(out int unit) {
92 92 unit = Interlocked.Increment(ref m_next) - 1;
93 93 return unit >= m_source.Length ? false : true;
94 94 }
95 95
96 96 protected override void InvokeUnit(int unit) {
97 97 try {
98 98 m_dest[unit] = m_transform(m_source[unit]);
99 99 var pending = Interlocked.Decrement(ref m_pending);
100 100 if (pending == 0)
101 101 m_promise.Resolve(m_dest);
102 102 } catch (Exception e) {
103 103 m_promise.Reject(e);
104 104 }
105 105 }
106 106 }
107 107
108 108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 109 if (source == null)
110 110 throw new ArgumentNullException("source");
111 111 if (transform == null)
112 112 throw new ArgumentNullException("transform");
113 113
114 114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 115 return mapper.Promise;
116 116 }
117 117
118 118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 119 if (source == null)
120 120 throw new ArgumentNullException("source");
121 121 if (action == null)
122 122 throw new ArgumentNullException("action");
123 123
124 124 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 125 return iter.Promise;
126 126 }
127 127
128 128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 129 if (source == null)
130 130 throw new ArgumentNullException("source");
131 131 if (transform == null)
132 132 throw new ArgumentNullException("transform");
133 133 if (threads <= 0)
134 134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135 135
136 136 var promise = new Promise<TDst[]>();
137 137 var res = new TDst[source.Length];
138 138 var pending = source.Length;
139 139 var semaphore = new Semaphore(threads, threads);
140 140
141 141 AsyncPool.InvokeNewThread(() => {
142 142 for (int i = 0; i < source.Length; i++) {
143 143 if(promise.IsResolved)
144 144 break; // stop processing in case of error or cancellation
145 145 var idx = i;
146 146 semaphore.WaitOne();
147 147 try {
148 148 var p1 = transform(source[i]);
149 149 p1.Anyway(() => semaphore.Release());
150 150 p1.Cancelled(() => semaphore.Release());
151 151 p1.Then(
152 152 x => {
153 153 res[idx] = x;
154 154 var left = Interlocked.Decrement(ref pending);
155 155 if (left == 0)
156 156 promise.Resolve(res);
157 157 },
158 158 e => promise.Reject(e)
159 159 );
160 160
161 161 } catch (Exception e) {
162 162 promise.Reject(e);
163 163 }
164 164 }
165 165 return 0;
166 166 });
167 167
168 168 return promise.Anyway(() => semaphore.Dispose());
169 169 }
170
171 /*
172 this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is
173 be chained, in this case the syncronous callback invocation will occur
174
175 public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
176 if (source == null)
177 throw new ArgumentNullException("source");
178 if (transform == null)
179 throw new ArgumentNullException("transform");
180 if (threads <= 0)
181 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
182
183 var promise = new Promise<TDst[]>();
184 var res = new TDst[source.Length];
185 var index = -1; // we will start with increment
186 var len = source.Length;
187 var pending = len;
188
189 Action<int> callback = null;
190 callback = (current) => {
191 if (current < len) {
192 transform(source[current])
193 .Then(
194 x => {
195 res[current] = x;
196 if (Interlocked.Decrement(ref pending) == 0)
197 promise.Resolve(res);
198 else
199 callback(Interlocked.Increment(ref index));
200 },
201 e => promise.Reject(e)
202 );
203 }
204 };
205
206 for (int i = 0; i < threads; i++)
207 callback(Interlocked.Increment(ref index));
208 return promise;
209 }
210 */
170 211 }
171 212 }
@@ -1,330 +1,332
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab.Parallels {
9 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 10 readonly int m_minThreads;
11 11 readonly int m_maxThreads;
12 12
13 13 int m_createdThreads = 0; // the current size of the pool
14 14 int m_activeThreads = 0; // the count of threads which are active
15 15 int m_sleepingThreads = 0; // the count of currently inactive threads
16 16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
17 17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
19 19 int m_wakeEvents = 0; // the count of wake events
20 20
21 21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
22 22
23 23 protected DispatchPool(int min, int max) {
24 24 if (min < 0)
25 25 throw new ArgumentOutOfRangeException("min");
26 26 if (max <= 0)
27 27 throw new ArgumentOutOfRangeException("max");
28 28
29 29 if (min > max)
30 30 min = max;
31 31 m_minThreads = min;
32 32 m_maxThreads = max;
33 33 }
34 34
35 35 protected DispatchPool(int threads)
36 36 : this(threads, threads) {
37 37 }
38 38
39 39 protected DispatchPool() {
40 40 int maxThreads, maxCP;
41 41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
42 42
43 43 m_minThreads = 0;
44 44 m_maxThreads = maxThreads;
45 45 }
46 46
47 47 protected void InitPool() {
48 48 for (int i = 0; i < m_minThreads; i++)
49 49 StartWorker();
50 50 }
51 51
52 52 public int PoolSize {
53 53 get {
54 54 return m_createdThreads;
55 55 }
56 56 }
57 57
58 58 public int ActiveThreads {
59 59 get {
60 60 return m_activeThreads;
61 61 }
62 62 }
63 63
64 64 public int MaxRunningThreads {
65 65 get {
66 66 return m_maxRunningThreads;
67 67 }
68 68 }
69 69
70 70 protected bool IsDisposed {
71 71 get {
72 72 return m_exitRequired != 0;
73 73 }
74 74 }
75 75
76 76 protected abstract bool TryDequeue(out TUnit unit);
77 77
78 78 #region thread execution traits
79 79 int SignalThread() {
80 80 var signals = Interlocked.Increment(ref m_wakeEvents);
81 81 if(signals == 1)
82 82 m_hasTasks.Set();
83 83 return signals;
84 84 }
85 85
86 86 bool FetchSignalOrWait(int timeout) {
87 87 var start = Environment.TickCount;
88 88
89 89 // ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ ΠΏΠΎΡ‚ΠΎΠΊ Π²Π»Π°Π΄Π΅Π΅Ρ‚ Π±Π»ΠΎΠΊΠΈΡ€ΠΎΠ²ΠΊΠΎΠΉ ΠΈ ΠΏΡ€ΠΈ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎΠΌ ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΈΠΈ сигнала Π΄ΠΎΠ»ΠΆΠ΅Π½
90 90 // Π΅Π΅ Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π΄Ρ€ΡƒΠ³ΠΎΠΉ ΠΎΠΆΠΈΠ΄Π°ΡŽΡ‰ΠΈΠΉ ΠΏΠΎΡ‚ΠΎΠΊ смог
91 91 bool hasLock = false;
92 92 do {
93 93 int signals;
94 94 do {
95 95 signals = m_wakeEvents;
96 96 if (signals == 0)
97 97 break;
98 98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99 99
100 100 if (signals >= 1) {
101 101 if (signals > 1 && hasLock)
102 102 m_hasTasks.Set();
103 103 return true;
104 104 }
105 105
106 106 if (timeout != -1)
107 107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108 108
109 109 // Ссли сигналов большС Π½Π΅ ΠΎΡΡ‚Π°Π»ΠΎΡΡŒ, Ρ‚ΠΎ ΠΏΠ΅Ρ€Π²Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ дошСл сюда сбросит событиС
110 110 // ΠΈ ΡƒΠΉΠ΄Π΅Ρ‚ Π½Π° пустой Ρ†ΠΈΠΊΠ», послС Ρ‡Π΅Π³ΠΎ заблокируСтся
111 111
112 112 hasLock = true;
113 113 } while (m_hasTasks.WaitOne(timeout));
114 114
115 115 return false;
116 116 }
117 117
118 118 bool Sleep(int timeout) {
119 119 Interlocked.Increment(ref m_sleepingThreads);
120 120 if (FetchSignalOrWait(timeout)) {
121 121 Interlocked.Decrement(ref m_sleepingThreads);
122 122 return true;
123 123 } else {
124 124 Interlocked.Decrement(ref m_sleepingThreads);
125 125 return false;
126 126 }
127 127 }
128 128 #endregion
129 129
130 130 /// <summary>
131 131 /// ЗапускаСт Π»ΠΈΠ±ΠΎ Π½ΠΎΠ²Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ, Ссли Ρ€Π°Π½ΡŒΡˆΠ΅ Π½Π΅ Π±Ρ‹Π»ΠΎ Π½ΠΈ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΏΠΎΡ‚ΠΎΠΊΠ°, Π»ΠΈΠ±ΠΎ устанавливаСт событиС ΠΏΡ€ΠΎΠ±ΡƒΠΆΠ΄Π΅Π½ΠΈΠ΅ ΠΎΠ΄Π½ΠΎΠ³ΠΎ спящСго ΠΏΠΎΡ‚ΠΎΠΊΠ°
132 132 /// </summary>
133 133 protected void GrowPool() {
134 134 if (m_exitRequired != 0)
135 135 return;
136 136 if (m_sleepingThreads > m_wakeEvents) {
137 137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138 138
139 139 // all sleeping threads may gone
140 140 SignalThread(); // wake a sleeping thread;
141 141
142 142 // we can't check whether signal has been processed
143 143 // anyway it may take some time for the thread to start
144 144 // we will ensure that at least one thread is running
145 145
146 146 if (AllocateThreadSlot(1)) {
147 147 // if there were no threads in the pool
148 148 var worker = new Thread(this.Worker);
149 149 worker.IsBackground = true;
150 150 worker.Start();
151 151 }
152 152 } else {
153 153 // if there is no sleeping threads in the pool
154 StartWorker();
154 if (!StartWorker())
155 // we haven't started a new thread, but the current can be on the way and it can't process the queue
156 // send it a signal to spin again
157 SignalThread();
155 158 }
156 159 }
157 160
158 161 private bool Suspend() {
159 162 //no tasks left, exit if the thread is no longer needed
160 163 bool last;
161 164 bool requestExit;
162 165
163 166 // if threads have a timeout before releasing
164 167 if (m_releaseTimeout > 0)
165 168 requestExit = !Sleep(m_releaseTimeout);
166 169 else
167 170 requestExit = true;
168 171
169 172 if (!requestExit)
170 173 return true;
171 174
172 175 // release unsused thread
173 176 if (requestExit && ReleaseThreadSlot(out last)) {
174 177 // in case at the moment the last thread was being released
175 178 // a new task was added to the queue, we need to try
176 179 // to revoke the thread to avoid the situation when the task is left unprocessed
177 180 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
178 181 if (AllocateThreadSlot(1))
179 182 return true; // spin again...
180 183 else
181 184 SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
182 185
183 186 }
184 187
185 188 return false;
186 189 }
187 190
188 191 // wait till infinity
189 192 Sleep(-1);
190 193
191 194 return true;
192 195 }
193 196
194 197 #region thread slots traits
195 198
196 199 bool AllocateThreadSlot() {
197 200 int current;
198 201 // use spins to allocate slot for the new thread
199 202 do {
200 203 current = m_createdThreads;
201 204 if (current >= m_maxThreads || m_exitRequired != 0)
202 205 // no more slots left or the pool has been disposed
203 206 return false;
204 207 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
205 208
206 209 UpdateMaxThreads(current + 1);
207 210
208 211 return true;
209 212 }
210 213
211 214 bool AllocateThreadSlot(int desired) {
212 215 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
213 216 return false;
214 217
215 218 UpdateMaxThreads(desired);
216 219
217 220 return true;
218 221 }
219 222
220 223 bool ReleaseThreadSlot(out bool last) {
221 224 last = false;
222 225 int current;
223 226 // use spins to release slot for the new thread
224 227 do {
225 228 current = m_createdThreads;
226 229 if (current <= m_minThreads && m_exitRequired == 0)
227 230 // the thread is reserved
228 231 return false;
229 232 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
230 233
231 234 last = (current == 1);
232 235
233 236 return true;
234 237 }
235 238
236 239 /// <summary>
237 240 /// releases thread slot unconditionally, used during cleanup
238 241 /// </summary>
239 242 /// <returns>true - no more threads left</returns>
240 243 bool ReleaseThreadSlotAnyway() {
241 244 var left = Interlocked.Decrement(ref m_createdThreads);
242 245 return left == 0;
243 246 }
244 247
245 248 void UpdateMaxThreads(int count) {
246 249 int max;
247 250 do {
248 251 max = m_maxRunningThreads;
249 252 if (max >= count)
250 253 break;
251 254 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
252 255 }
253 256
254 257 #endregion
255 258
256 259 bool StartWorker() {
257 260 if (AllocateThreadSlot()) {
258 261 // slot successfully allocated
259 262 var worker = new Thread(this.Worker);
260 263 worker.IsBackground = true;
261 264 worker.Start();
262 265
263 266 return true;
264 267 } else {
265 268 return false;
266 269 }
267 270 }
268 271
269 272 protected abstract void InvokeUnit(TUnit unit);
270 273
271 274 void Worker() {
272 275 TUnit unit;
273 276 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
274 277 Interlocked.Increment(ref m_activeThreads);
275 278 do {
276 279 // exit if requested
277 280 if (m_exitRequired != 0) {
278 281 // release the thread slot
279 282 Interlocked.Decrement(ref m_activeThreads);
280 283 if (ReleaseThreadSlotAnyway()) // it was the last worker
281 284 m_hasTasks.Dispose();
282 285 else
283 286 SignalThread(); // wake next worker
284 unit = default(TUnit);
285 287 break;
286 288 }
287 289
288 290 // fetch task
289 291 if (TryDequeue(out unit)) {
290 292 InvokeUnit(unit);
291 293 continue;
292 294 }
293 295
294 296 Interlocked.Decrement(ref m_activeThreads);
295 297
296 298 // entering suspend state
297 299 // keep this thread and wait
298 300 if (!Suspend())
299 301 break;
300 302 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
301 303 Interlocked.Increment(ref m_activeThreads);
302 304 } while (true);
303 305 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
304 306 }
305 307
306 308 protected virtual void Dispose(bool disposing) {
307 309 if (disposing) {
308 310 if (m_exitRequired == 0) {
309 311 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
310 312 return;
311 313
312 314 // wake sleeping threads
313 315 if (m_createdThreads > 0)
314 316 SignalThread();
315 317 else
316 318 m_hasTasks.Dispose();
317 319 GC.SuppressFinalize(this);
318 320 }
319 321 }
320 322 }
321 323
322 324 public void Dispose() {
323 325 Dispose(true);
324 326 }
325 327
326 328 ~DispatchPool() {
327 329 Dispose(false);
328 330 }
329 331 }
330 332 }
@@ -1,75 +1,75
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6
7 7 namespace Implab.Parallels {
8 8 public class MTQueue<T> {
9 9 class Node {
10 10 public Node(T value) {
11 11 this.value = value;
12 12 }
13 13 public readonly T value;
14 14 public Node next;
15 15 }
16 16
17 17 Node m_first;
18 18 Node m_last;
19 19
20 20 public void Enqueue(T value) {
21 21 var last = m_last;
22 22 var next = new Node(value);
23 23
24 24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
25 25 last = m_last;
26 26
27 27 if (last != null)
28 28 last.next = next;
29 29 else
30 30 m_first = next;
31 31 }
32 32
33 33 public bool TryDequeue(out T value) {
34 34 Node first;
35 35 Node next = null;
36 36 value = default(T);
37 37
38 38 do {
39 39 first = m_first;
40 40 if (first == null)
41 41 return false;
42 42 next = first.next;
43 43 if (next == null) {
44 44 // this is the last element,
45 45 // then try to update the tail
46 46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is a ace condition
47 // this is a race condition
48 48 if (m_last == null)
49 49 // the queue is empty
50 50 return false;
51 51 // tail has been changed, than we need to restart
52 52 continue;
53 53 }
54 54
55 55 // tail succesfully updated and first.next will never be changed
56 56 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
57 57 // but the writer may update the m_first since the m_last is null
58 58
59 59 // so we need to fix inconsistency by setting m_first to null, but if it already has been
60 60 // updated by a writer then we should just give up
61 61 Interlocked.CompareExchange(ref m_first, null, first);
62 62 break;
63 63
64 64 } else {
65 65 if (first == Interlocked.CompareExchange(ref m_first, next, first))
66 66 // head succesfully updated
67 67 break;
68 68 }
69 69 } while (true);
70 70
71 71 value = first.value;
72 72 return true;
73 73 }
74 74 }
75 75 }
General Comments 0
You need to be logged in to leave comments. Login now