##// END OF EJS Templates
dispatch pool rewritten
cin -
r81:2c5631b43c7d v2
parent child
Show More
@@ -1,434 +1,434
1 1 using System;
2 2 using System.Reflection;
3 3 using System.Threading;
4 4 using Implab.Parallels;
5 5
6 6 #if MONO
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 10 using TestMethod = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
14 14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15 15
16 16 #endif
17 17
18 18 namespace Implab.Test {
19 19 [TestClass]
20 20 public class AsyncTests {
21 21 [TestMethod]
22 22 public void ResolveTest() {
23 23 int res = -1;
24 24 var p = new Promise<int>();
25 25 p.Then(x => res = x);
26 26 p.Resolve(100);
27 27
28 28 Assert.AreEqual(100, res);
29 29 }
30 30
31 31 [TestMethod]
32 32 public void RejectTest() {
33 33 int res = -1;
34 34 Exception err = null;
35 35
36 36 var p = new Promise<int>();
37 37 p.Then(
38 38 x => res = x,
39 39 e => {
40 40 err = e;
41 41 return -2;
42 42 }
43 43 );
44 44 p.Reject(new ApplicationException("error"));
45 45
46 46 Assert.AreEqual(res, -1);
47 47 Assert.AreEqual(err.Message, "error");
48 48
49 49 }
50 50
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 54 p.Cancel();
55 55
56 56 var p2 = p.Cancelled(() => {
57 57 throw new ApplicationException("CANCELLED");
58 58 });
59 59
60 60 try {
61 61 p2.Join();
62 62 Assert.Fail();
63 63 } catch (ApplicationException err) {
64 64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 65 }
66 66
67 67 }
68 68
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 72 p.Cancel();
73 73
74 74 var p2 = p
75 75 .Cancelled(() => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Error(e => true);
79 79
80 80 Assert.AreEqual(true, p2.Join());
81 81 }
82 82
83 83 [TestMethod]
84 84 public void JoinSuccessTest() {
85 85 var p = new Promise<int>();
86 86 p.Resolve(100);
87 87 Assert.AreEqual(p.Join(), 100);
88 88 }
89 89
90 90 [TestMethod]
91 91 public void JoinFailTest() {
92 92 var p = new Promise<int>();
93 93 p.Reject(new ApplicationException("failed"));
94 94
95 95 try {
96 96 p.Join();
97 97 throw new ApplicationException("WRONG!");
98 98 } catch (TargetInvocationException err) {
99 99 Assert.AreEqual(err.InnerException.Message, "failed");
100 100 } catch {
101 101 Assert.Fail("Got wrong excaption");
102 102 }
103 103 }
104 104
105 105 [TestMethod]
106 106 public void MapTest() {
107 107 var p = new Promise<int>();
108 108
109 109 var p2 = p.Then(x => x.ToString());
110 110 p.Resolve(100);
111 111
112 112 Assert.AreEqual(p2.Join(), "100");
113 113 }
114 114
115 115 [TestMethod]
116 116 public void FixErrorTest() {
117 117 var p = new Promise<int>();
118 118
119 119 var p2 = p.Error(e => 101);
120 120
121 121 p.Reject(new Exception());
122 122
123 123 Assert.AreEqual(p2.Join(), 101);
124 124 }
125 125
126 126 [TestMethod]
127 127 public void ChainTest() {
128 128 var p1 = new Promise<int>();
129 129
130 130 var p3 = p1.Chain(x => {
131 131 var p2 = new Promise<string>();
132 132 p2.Resolve(x.ToString());
133 133 return p2;
134 134 });
135 135
136 136 p1.Resolve(100);
137 137
138 138 Assert.AreEqual(p3.Join(), "100");
139 139 }
140 140
141 141 [TestMethod]
142 142 public void PoolTest() {
143 143 var pid = Thread.CurrentThread.ManagedThreadId;
144 144 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
145 145
146 146 Assert.AreNotEqual(pid, p.Join());
147 147 }
148 148
149 149 [TestMethod]
150 150 public void WorkerPoolSizeTest() {
151 var pool = new WorkerPool(5, 10, 0);
151 var pool = new WorkerPool(5, 10, 1);
152 152
153 153 Assert.AreEqual(5, pool.PoolSize);
154 154
155 155 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
156 156 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
157 157 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
158 158
159 159 Assert.AreEqual(5, pool.PoolSize);
160 160
161 161 for (int i = 0; i < 100; i++)
162 162 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
163 163 Thread.Sleep(200);
164 164 Assert.AreEqual(10, pool.PoolSize);
165 165
166 166 pool.Dispose();
167 167 }
168 168
169 169 [TestMethod]
170 170 public void WorkerPoolCorrectTest() {
171 171 var pool = new WorkerPool(0,1000,100);
172 172
173 173 const int iterations = 1000;
174 174 int pending = iterations;
175 175 var stop = new ManualResetEvent(false);
176 176
177 177 var count = 0;
178 178 for (int i = 0; i < iterations; i++) {
179 179 pool
180 180 .Invoke(() => 1)
181 181 .Then(x => Interlocked.Add(ref count, x))
182 182 .Then(x => Math.Log10(x))
183 183 .Anyway(() => {
184 184 Interlocked.Decrement(ref pending);
185 185 if (pending == 0)
186 186 stop.Set();
187 187 });
188 188 }
189 189
190 190 stop.WaitOne();
191 191
192 192 Assert.AreEqual(iterations, count);
193 193 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
194 194 pool.Dispose();
195 195
196 196 }
197 197
198 198 [TestMethod]
199 199 public void WorkerPoolDisposeTest() {
200 200 var pool = new WorkerPool(5, 20);
201 201 Assert.AreEqual(5, pool.PoolSize);
202 202 pool.Dispose();
203 203 Thread.Sleep(500);
204 204 Assert.AreEqual(0, pool.PoolSize);
205 205 pool.Dispose();
206 206 }
207 207
208 208 [TestMethod]
209 209 public void MTQueueTest() {
210 210 var queue = new MTQueue<int>();
211 211 int res;
212 212
213 213 queue.Enqueue(10);
214 214 Assert.IsTrue(queue.TryDequeue(out res));
215 215 Assert.AreEqual(10, res);
216 216 Assert.IsFalse(queue.TryDequeue(out res));
217 217
218 218 for (int i = 0; i < 1000; i++)
219 219 queue.Enqueue(i);
220 220
221 221 for (int i = 0; i < 1000; i++) {
222 222 queue.TryDequeue(out res);
223 223 Assert.AreEqual(i, res);
224 224 }
225 225
226 226 int writers = 0;
227 227 int readers = 0;
228 228 var stop = new ManualResetEvent(false);
229 229 int total = 0;
230 230
231 231 const int itemsPerWriter = 10000;
232 232 const int writersCount = 10;
233 233
234 234 for (int i = 0; i < writersCount; i++) {
235 235 Interlocked.Increment(ref writers);
236 236 AsyncPool
237 237 .InvokeNewThread(() => {
238 238 for (int ii = 0; ii < itemsPerWriter; ii++) {
239 239 queue.Enqueue(1);
240 240 }
241 241 return 1;
242 242 })
243 243 .Anyway(() => Interlocked.Decrement(ref writers));
244 244 }
245 245
246 246 for (int i = 0; i < 10; i++) {
247 247 Interlocked.Increment(ref readers);
248 248 AsyncPool
249 249 .InvokeNewThread(() => {
250 250 int t;
251 251 do {
252 252 while (queue.TryDequeue(out t))
253 253 Interlocked.Add(ref total, t);
254 254 } while (writers > 0);
255 255 return 1;
256 256 })
257 257 .Anyway(() => {
258 258 Interlocked.Decrement(ref readers);
259 259 if (readers == 0)
260 260 stop.Set();
261 261 });
262 262 }
263 263
264 264 stop.WaitOne();
265 265
266 266 Assert.AreEqual(itemsPerWriter * writersCount, total);
267 267 }
268 268
269 269 [TestMethod]
270 270 public void ParallelMapTest() {
271 271
272 272 const int count = 100000;
273 273
274 274 var args = new double[count];
275 275 var rand = new Random();
276 276
277 277 for (int i = 0; i < count; i++)
278 278 args[i] = rand.NextDouble();
279 279
280 280 var t = Environment.TickCount;
281 281 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
282 282
283 283 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
284 284
285 285 t = Environment.TickCount;
286 286 for (int i = 0; i < count; i++)
287 287 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
288 288 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
289 289 }
290 290
291 291 [TestMethod]
292 292 public void ChainedMapTest() {
293 293
294 using (var pool = new WorkerPool(0,10,100)) {
294 using (var pool = new WorkerPool(0,10,1)) {
295 295 const int count = 10000;
296 296
297 297 var args = new double[count];
298 298 var rand = new Random();
299 299
300 300 for (int i = 0; i < count; i++)
301 301 args[i] = rand.NextDouble();
302 302
303 303 var t = Environment.TickCount;
304 304 var res = args
305 305 .ChainedMap(
306 306 // Analysis disable once AccessToDisposedClosure
307 307 x => pool.Invoke(
308 308 () => Math.Sin(x * x)
309 309 ),
310 310 4
311 311 )
312 312 .Join();
313 313
314 314 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
315 315
316 316 t = Environment.TickCount;
317 317 for (int i = 0; i < count; i++)
318 318 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
319 319 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
320 320 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
321 321 }
322 322 }
323 323
324 324 [TestMethod]
325 325 public void ParallelForEachTest() {
326 326
327 327 const int count = 100000;
328 328
329 329 var args = new int[count];
330 330 var rand = new Random();
331 331
332 332 for (int i = 0; i < count; i++)
333 333 args[i] = (int)(rand.NextDouble() * 100);
334 334
335 335 int result = 0;
336 336
337 337 var t = Environment.TickCount;
338 338 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
339 339
340 340 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
341 341
342 342 int result2 = 0;
343 343
344 344 t = Environment.TickCount;
345 345 for (int i = 0; i < count; i++)
346 346 result2 += args[i];
347 347 Assert.AreEqual(result2, result);
348 348 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
349 349 }
350 350
351 351 [TestMethod]
352 352 public void ComplexCase1Test() {
353 353 var flags = new bool[3];
354 354
355 355 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
356 356
357 357 var step1 = PromiseHelper
358 358 .Sleep(200, "Alan")
359 359 .Cancelled(() => flags[0] = true);
360 360 var p = step1
361 361 .Chain(x =>
362 362 PromiseHelper
363 363 .Sleep(200, "Hi, " + x)
364 364 .Then(y => y)
365 365 .Cancelled(() => flags[1] = true)
366 366 )
367 367 .Cancelled(() => flags[2] = true);
368 368 step1.Join();
369 369 p.Cancel();
370 370 try {
371 371 Assert.AreEqual(p.Join(), "Hi, Alan");
372 372 Assert.Fail("Shouldn't get here");
373 373 } catch (OperationCanceledException) {
374 374 }
375 375
376 376 Assert.IsFalse(flags[0]);
377 377 Assert.IsTrue(flags[1]);
378 378 Assert.IsTrue(flags[2]);
379 379 }
380 380
381 381 [TestMethod]
382 382 public void ChainedCancel1Test() {
383 383 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
384 384 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
385 385 var p = PromiseHelper
386 386 .Sleep(1, "Hi, HAL!")
387 387 .Then(x => {
388 388 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
389 389 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
390 390 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
391 391 PromiseHelper
392 392 .Sleep(100, "HAL, STOP!")
393 393 .Then(result.Cancel);
394 394 return result;
395 395 });
396 396 try {
397 397 p.Join();
398 398 } catch (TargetInvocationException err) {
399 399 Assert.IsTrue(err.InnerException is OperationCanceledException);
400 400 }
401 401 }
402 402
403 403 [TestMethod]
404 404 public void ChainedCancel2Test() {
405 405 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
406 406 var pSurvive = new Promise<bool>();
407 407 var hemStarted = new ManualResetEvent(false);
408 408 var p = PromiseHelper
409 409 .Sleep(1, "Hi, HAL!")
410 410 .Chain(x => {
411 411 hemStarted.Set();
412 412 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
413 413 var result = PromiseHelper
414 414 .Sleep(10000, "HEM ENABLED!!!")
415 415 .Then(s => pSurvive.Resolve(false));
416 416
417 417 result
418 418 .Cancelled(() => pSurvive.Resolve(true));
419 419
420 420 return result;
421 421 });
422 422
423 423 hemStarted.WaitOne();
424 424 p.Cancel();
425 425
426 426 try {
427 427 p.Join();
428 428 } catch (OperationCanceledException) {
429 429 Assert.IsTrue(pSurvive.Join());
430 430 }
431 431 }
432 432 }
433 433 }
434 434
@@ -1,343 +1,204
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab.Parallels {
9 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
12 readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
10 readonly int m_minThreadsLimit;
11 readonly int m_maxThreadsLimit;
12 readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit
13 13
14 int m_createdThreads = 0; // the current size of the pool
15 int m_activeThreads = 0; // the count of threads which are active
16 int m_sleepingThreads = 0; // the count of currently inactive threads
14 int m_threads = 0; // the current size of the pool
17 15 int m_maxRunningThreads = 0; // the meximum reached size of the pool
18 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
16 int m_exit = 0; // the pool is going to shutdown, all unused workers are released
19 17
20 int m_wakeEvents = 0; // the count of wake events
21
22 readonly object m_signalLocker = new object();
18 readonly object m_signal = new object(); // used to pulse waiting threads
23 19
24 20 protected DispatchPool(int min, int max) {
25 21 if (min < 0)
26 22 throw new ArgumentOutOfRangeException("min");
27 23 if (max <= 0)
28 24 throw new ArgumentOutOfRangeException("max");
29 25
30 26 if (min > max)
31 27 min = max;
32 m_minThreads = min;
33 m_maxThreads = max;
28 m_minThreadsLimit = min;
29 m_maxThreadsLimit = max;
34 30 }
35 31
36 32 protected DispatchPool(int threads)
37 33 : this(threads, threads) {
38 34 }
39 35
40 36 protected DispatchPool() {
41 37 int maxThreads, maxCP;
42 38 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
43 39
44 m_minThreads = 0;
45 m_maxThreads = maxThreads;
40 m_minThreadsLimit = 0;
41 m_maxThreadsLimit = maxThreads;
46 42 }
47 43
48 44 protected void InitPool() {
49 for (int i = 0; i < m_minThreads; i++)
45 for (int i = 0; i < m_minThreadsLimit; i++)
50 46 StartWorker();
51 47 }
52 48
53 49 public int PoolSize {
54 50 get {
55 51 Thread.MemoryBarrier();
56 return m_createdThreads;
52 return m_threads;
57 53 }
58 54 }
59
60 public int ActiveThreads {
61 get {
62 Thread.MemoryBarrier();
63 return m_activeThreads;
64 }
65 }
66
55
67 56 public int MaxRunningThreads {
68 57 get {
69 58 Thread.MemoryBarrier();
70 59 return m_maxRunningThreads;
71 60 }
72 61 }
73 62
74 63 protected bool IsDisposed {
75 64 get {
76 65 Thread.MemoryBarrier();
77 return m_exitRequired == 1;
66 return m_exit == 1;
78 67 }
79 68 }
80 69
81 70 protected abstract bool TryDequeue(out TUnit unit);
82 71
83 #region thread signaling traits
84 int SignalThread() {
85 var signals = Interlocked.Increment(ref m_wakeEvents);
86 if(signals == 1)
87 lock(m_signalLocker)
88 Monitor.Pulse(m_signalLocker);
89 return signals;
90 }
91
92 bool FetchSignalOrWait(int timeout) {
93 var start = Environment.TickCount;
94 int signals;
95 Thread.MemoryBarrier(); // m_wakeEvents volatile first read
96 do {
97 signals = m_wakeEvents;
98 if (signals == 0)
99 break;
100 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
101
102 if (signals == 0) {
103 // no signal is fetched
104 lock(m_signalLocker) {
105 while(m_wakeEvents == 0) {
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108 if(!Monitor.Wait(m_signalLocker,timeout))
109 return false; // timeout
72 private bool Dequeue(out TUnit unit, int timeout) {
73 int ts = Environment.TickCount;
74 if (TryDequeue(out unit))
75 return true;
76 lock (m_signal) {
77 while (!TryDequeue(out unit) && m_exit == 0)
78 if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
79 // timeout
80 return false;
110 81 }
111 // m_wakeEvents > 0
112 if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized
113 Monitor.Pulse(m_signalLocker);
114
115 // signal fetched
116 return true;
117 }
118
119 } else {
120 // signal fetched
121 return true;
82 // queue item or terminate
83 Monitor.Pulse(m_signal);
84 if (m_exit == 1)
85 return false;
122 86 }
123
124
87 return true;
125 88 }
126 89
127 bool Sleep(int timeout) {
128 Interlocked.Increment(ref m_sleepingThreads);
129 if (FetchSignalOrWait(timeout)) {
130 Interlocked.Decrement(ref m_sleepingThreads);
131 return true;
132 } else {
133 Interlocked.Decrement(ref m_sleepingThreads);
134 return false;
135 }
136 }
137 #endregion
138
139 /// <summary>
140 /// ЗапускаСт Π»ΠΈΠ±ΠΎ Π½ΠΎΠ²Ρ‹ΠΉ ΠΏΠΎΡ‚ΠΎΠΊ, Ссли Ρ€Π°Π½ΡŒΡˆΠ΅ Π½Π΅ Π±Ρ‹Π»ΠΎ Π½ΠΈ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΏΠΎΡ‚ΠΎΠΊΠ°, Π»ΠΈΠ±ΠΎ устанавливаСт событиС ΠΏΡ€ΠΎΠ±ΡƒΠΆΠ΄Π΅Π½ΠΈΠ΅ ΠΎΠ΄Π½ΠΎΠ³ΠΎ спящСго ΠΏΠΎΡ‚ΠΎΠΊΠ°
141 /// </summary>
142 protected void GrowPool() {
143 Thread.MemoryBarrier();
144 if (m_exitRequired == 1)
145 return;
146 if (m_sleepingThreads > m_wakeEvents) {
147 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
148
149 // all sleeping threads may gone
150 SignalThread(); // wake a sleeping thread;
151
152 // we can't check whether signal has been processed
153 // anyway it may take some time for the thread to start
154 // we will ensure that at least one thread is running
155
156 EnsurePoolIsAlive();
157 } else {
158 // if there is no sleeping threads in the pool
159 if (!StartWorker()) {
160 // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue
161 // send it a signal to spin again
162 SignalThread();
163 EnsurePoolIsAlive();
164 }
90 protected void SignalThread() {
91 lock (m_signal) {
92 Monitor.Pulse(m_signal);
165 93 }
166 94 }
167 95
168 protected void EnsurePoolIsAlive() {
169 if (AllocateThreadSlot(1)) {
170 // if there were no threads in the pool
171 var worker = new Thread(this.Worker);
172 worker.IsBackground = true;
173 worker.Start();
174 }
175 }
176
177 protected virtual bool Suspend() {
178 //no tasks left, exit if the thread is no longer needed
179 bool last;
180 bool requestExit;
181
182 // if threads have a timeout before releasing
183 if (m_releaseTimeout > 0)
184 requestExit = !Sleep(m_releaseTimeout);
185 else
186 requestExit = true;
187
188 if (!requestExit)
189 return true;
190
191 // release unsused thread
192 if (requestExit && ReleaseThreadSlot(out last)) {
193 // in case at the moment the last thread was being released
194 // a new task was added to the queue, we need to try
195 // to revoke the thread to avoid the situation when the task is left unprocessed
196 if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false
197 SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it
198 return AllocateThreadSlot(1); // ensure that at least one thread is alive
199 }
200
201 return false;
202 }
203
204 // wait till infinity
205 Sleep(-1);
206
207 return true;
208 }
209
210 96 #region thread slots traits
211 97
212 98 bool AllocateThreadSlot() {
213 99 int current;
214 100 // use spins to allocate slot for the new thread
215 101 do {
216 current = m_createdThreads;
217 if (current >= m_maxThreads || m_exitRequired == 1)
102 current = m_threads;
103 if (current >= m_maxThreadsLimit || m_exit == 1)
218 104 // no more slots left or the pool has been disposed
219 105 return false;
220 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
106 } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));
221 107
222 108 UpdateMaxThreads(current + 1);
223 109
224 110 return true;
225 111 }
226 112
227 113 bool AllocateThreadSlot(int desired) {
228 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
114 if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
229 115 return false;
230 116
231 117 UpdateMaxThreads(desired);
232 118
233 119 return true;
234 120 }
235 121
236 122 bool ReleaseThreadSlot(out bool last) {
237 123 last = false;
238 124 int current;
239 125 // use spins to release slot for the new thread
240 126 Thread.MemoryBarrier();
241 127 do {
242 current = m_createdThreads;
243 if (current <= m_minThreads && m_exitRequired == 0)
128 current = m_threads;
129 if (current <= m_minThreadsLimit && m_exit == 0)
244 130 // the thread is reserved
245 131 return false;
246 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
132 } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));
247 133
248 134 last = (current == 1);
249 135
250 136 return true;
251 137 }
252 138
253 /// <summary>
254 /// releases thread slot unconditionally, used during cleanup
255 /// </summary>
256 /// <returns>true - no more threads left</returns>
257 bool ReleaseThreadSlotAnyway() {
258 var left = Interlocked.Decrement(ref m_createdThreads);
259 return left == 0;
260 }
261
262 139 void UpdateMaxThreads(int count) {
263 140 int max;
264 141 do {
265 142 max = m_maxRunningThreads;
266 143 if (max >= count)
267 144 break;
268 145 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
269 146 }
270 147
271 148 #endregion
272 149
273 bool StartWorker() {
150 protected bool StartWorker() {
274 151 if (AllocateThreadSlot()) {
275 152 // slot successfully allocated
276 153 var worker = new Thread(this.Worker);
277 154 worker.IsBackground = true;
278 Interlocked.Increment(ref m_activeThreads);
279 155 worker.Start();
280 156
281 157 return true;
282 158 } else {
283 159 return false;
284 160 }
285 161 }
286 162
287 163 protected abstract void InvokeUnit(TUnit unit);
288 164
289 165 protected virtual void Worker() {
290 166 TUnit unit;
291 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
292 int count = 0;;
293 Thread.MemoryBarrier();
167 bool last;
294 168 do {
295 // exit if requested
296 if (m_exitRequired == 1) {
297 // release the thread slot
298 Interlocked.Decrement(ref m_activeThreads);
299 if (!ReleaseThreadSlotAnyway()) // it was the last worker
300 SignalThread(); // wake next worker
301 break;
302 }
303
304 // fetch task
305 if (TryDequeue(out unit)) {
169 while (Dequeue(out unit, m_releaseTimeout)) {
306 170 InvokeUnit(unit);
307 count ++;
171 }
172 if(!ReleaseThreadSlot(out last))
308 173 continue;
174 // queue may be not empty
175 if (last && TryDequeue(out unit)) {
176 InvokeUnit(unit);
177 if (AllocateThreadSlot(1))
178 continue;
179 // we can safely exit since pool is alive
309 180 }
310 Interlocked.Decrement(ref m_activeThreads);
181 break;
182 } while(true);
183 }
311 184
312 Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count);
313 // entering suspend state
314 // keep this thread and wait
315 if (!Suspend())
316 break;
317 count = 0;
318 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
319 Interlocked.Increment(ref m_activeThreads);
320 } while (true);
321 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
322 }
323 185
324 186 protected virtual void Dispose(bool disposing) {
325 187 if (disposing) {
326 if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier
188 if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
327 189 // wake sleeping threads
328 if (m_createdThreads > 0)
329 SignalThread();
190 SignalThread();
330 191 GC.SuppressFinalize(this);
331 192 }
332 193 }
333 194 }
334 195
335 196 public void Dispose() {
336 197 Dispose(true);
337 198 }
338 199
339 200 ~DispatchPool() {
340 201 Dispose(false);
341 202 }
342 203 }
343 204 }
@@ -1,106 +1,89
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6 using System.Diagnostics;
7 7 using Implab.Diagnostics;
8 8
9 9 namespace Implab.Parallels {
10 10 public class WorkerPool : DispatchPool<Action> {
11 11
12 12 MTQueue<Action> m_queue = new MTQueue<Action>();
13 13 int m_queueLength = 0;
14 14 readonly int m_threshold = 1;
15 15 int m_workers = 0;
16 16
17 17 public WorkerPool(int minThreads, int maxThreads, int threshold)
18 18 : base(minThreads, maxThreads) {
19 19 m_threshold = threshold;
20 20 m_workers = minThreads;
21 21 InitPool();
22 22 }
23 23
24 24 public WorkerPool(int minThreads, int maxThreads) :
25 25 base(minThreads, maxThreads) {
26 26 m_workers = minThreads;
27 27 InitPool();
28 28 }
29 29
30 30 public WorkerPool(int threads)
31 31 : base(threads) {
32 32 m_workers = threads;
33 33 InitPool();
34 34 }
35 35
36 36 public WorkerPool()
37 37 : base() {
38 38 InitPool();
39 39 }
40 40
41 41 public Promise<T> Invoke<T>(Func<T> task) {
42 42 if (task == null)
43 43 throw new ArgumentNullException("task");
44 44 if (IsDisposed)
45 45 throw new ObjectDisposedException(ToString());
46 46
47 47 var promise = new Promise<T>();
48 48
49 49 var caller = TraceContext.Snapshot();
50 50
51 51 EnqueueTask(delegate() {
52 52 caller.Invoke(delegate() {
53 53 try {
54 54 promise.Resolve(task());
55 55 } catch (Exception e) {
56 56 promise.Reject(e);
57 57 }
58 58 });
59 59 });
60 60
61 61 return promise;
62 62 }
63 63
64 64 protected void EnqueueTask(Action unit) {
65 65 Debug.Assert(unit != null);
66 66 var len = Interlocked.Increment(ref m_queueLength);
67 67 m_queue.Enqueue(unit);
68 68
69 if (len > m_threshold * m_workers) {
70 Interlocked.Increment(ref m_workers);
71 GrowPool();
69 if (len > m_threshold * PoolSize) {
70 StartWorker();
72 71 }
72
73 SignalThread();
73 74 }
74 75
75 76 protected override bool TryDequeue(out Action unit) {
76 77 if (m_queue.TryDequeue(out unit)) {
77 78 Interlocked.Decrement(ref m_queueLength);
78 79 return true;
79 80 }
80 81 return false;
81 82 }
82 83
83 protected override bool Suspend() {
84 // This override solves race condition
85 // WORKER CLIENT
86 // ---------------------------------------
87 // TryDeque == false
88 // Enqueue(unit), queueLen++
89 // GrowPool? == NO
90 // ActiveThreads--
91 // Suspend
92 // queueLength > 0
93 // continue
94 Thread.MemoryBarrier();
95 if (m_queueLength > 0)
96 return true;
97 Interlocked.Decrement(ref m_workers);
98 return base.Suspend();
99 }
100
101 84 protected override void InvokeUnit(Action unit) {
102 85 unit();
103 86 }
104 87
105 88 }
106 89 }
General Comments 0
You need to be logged in to leave comments. Login now