##// END OF EJS Templates
refactoring, sync
cin -
r20:1c3b3d518480 promises
parent child
Show More
@@ -101,18 +101,18 namespace Implab.Test {
101 101 public void WorkerPoolSizeTest() {
102 102 var pool = new WorkerPool(5, 10, 0);
103 103
104 Assert.AreEqual(5, pool.ThreadCount);
104 Assert.AreEqual(5, pool.PoolSize);
105 105
106 106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
109 109
110 Assert.AreEqual(5, pool.ThreadCount);
110 Assert.AreEqual(5, pool.PoolSize);
111 111
112 112 for (int i = 0; i < 100; i++)
113 113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
114 114 Thread.Sleep(100);
115 Assert.AreEqual(10, pool.ThreadCount);
115 Assert.AreEqual(10, pool.PoolSize);
116 116
117 117 pool.Dispose();
118 118 }
@@ -149,10 +149,10 namespace Implab.Test {
149 149 [TestMethod]
150 150 public void WorkerPoolDisposeTest() {
151 151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
152 Assert.AreEqual(5, pool.PoolSize);
153 153 pool.Dispose();
154 Thread.Sleep(100);
155 Assert.AreEqual(0, pool.ThreadCount);
154 Thread.Sleep(200);
155 Assert.AreEqual(0, pool.PoolSize);
156 156 pool.Dispose();
157 157 }
158 158
@@ -244,7 +244,7 namespace Implab.Test {
244 244 [TestMethod]
245 245 public void ChainedMapTest() {
246 246
247 using (var pool = new WorkerPool(4,4,0)) {
247 using (var pool = new WorkerPool(0,100,0)) {
248 248 int count = 10000;
249 249
250 250 double[] args = new double[count];
1 NO CONTENT: modified file, binary diff hidden
@@ -9,10 +9,12 namespace Implab.Parallels {
9 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 10 readonly int m_minThreads;
11 11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
12 int m_createdThreads = 0;
13 int m_activeThreads = 0;
14 int m_sleepingThreads = 0;
13 15 int m_maxRunningThreads = 0;
14 int m_suspended = 0;
15 16 int m_exitRequired = 0;
17 int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit
16 18 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17 19
18 20 protected DispatchPool(int min, int max) {
@@ -44,9 +46,15 namespace Implab.Parallels {
44 46 StartWorker();
45 47 }
46 48
47 public int ThreadCount {
49 public int PoolSize {
48 50 get {
49 return m_runningThreads;
51 return m_createdThreads;
52 }
53 }
54
55 public int ActiveThreads {
56 get {
57 return m_activeThreads;
50 58 }
51 59 }
52 60
@@ -65,11 +73,18 namespace Implab.Parallels {
65 73 protected abstract bool TryDequeue(out TUnit unit);
66 74
67 75 protected virtual bool ExtendPool() {
68 if (m_suspended > 0) {
69 m_hasTasks.Set();
76 if (m_sleepingThreads == 0)
77 // no sleeping workers are available
78 // try create one
79 return StartWorker();
80 else {
81 // we can get here a race condition when several threads asks to extend pool
82 // and some sleaping threads are exited due timeout but they are still counted as sleeping
83 // in that case all of this threads could exit except one
84 WakePool();
70 85 return true;
71 } else
72 return StartWorker();
86 }
87
73 88 }
74 89
75 90 /// <summary>
@@ -79,14 +94,50 namespace Implab.Parallels {
79 94 m_hasTasks.Set(); // wake sleeping thread;
80 95
81 96 if (AllocateThreadSlot(1)) {
97 // if there were no threads in the pool
82 98 var worker = new Thread(this.Worker);
83 99 worker.IsBackground = true;
84 100 worker.Start();
85 101 }
86 102 }
87 103
88 protected virtual void Suspend() {
89 m_hasTasks.WaitOne();
104 bool Sleep(int timeout) {
105 Interlocked.Increment(ref m_sleepingThreads);
106 var result = m_hasTasks.WaitOne(timeout);
107 Interlocked.Decrement(ref m_sleepingThreads);
108 return result;
109 }
110
111 protected virtual bool Suspend() {
112 //no tasks left, exit if the thread is no longer needed
113 bool last;
114 bool requestExit;
115
116 if (m_releaseTimeout > 0)
117 requestExit = !Sleep(m_releaseTimeout);
118 else
119 requestExit = true;
120
121
122 if (requestExit && ReleaseThreadSlot(out last)) {
123 // in case at the moment the last thread was being released
124 // a new task was added to the queue, we need to try
125 // to revoke the thread to avoid the situation when the task is left unprocessed
126 if (last && m_hasTasks.WaitOne(0)) {
127 if (AllocateThreadSlot(1))
128 return true; // spin again...
129 else
130 // we failed to reallocate the first slot for this thread
131 // therefore we need to release the event
132 m_hasTasks.Set();
133 }
134
135 return false;
136 }
137
138 Sleep(-1);
139
140 return true;
90 141 }
91 142
92 143 #region thread slots traits
@@ -95,11 +146,11 namespace Implab.Parallels {
95 146 int current;
96 147 // use spins to allocate slot for the new thread
97 148 do {
98 current = m_runningThreads;
149 current = m_createdThreads;
99 150 if (current >= m_maxThreads || m_exitRequired != 0)
100 151 // no more slots left or the pool has been disposed
101 152 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
153 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
103 154
104 155 UpdateMaxThreads(current + 1);
105 156
@@ -107,7 +158,7 namespace Implab.Parallels {
107 158 }
108 159
109 160 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
161 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
111 162 return false;
112 163
113 164 UpdateMaxThreads(desired);
@@ -120,11 +171,11 namespace Implab.Parallels {
120 171 int current;
121 172 // use spins to release slot for the new thread
122 173 do {
123 current = m_runningThreads;
174 current = m_createdThreads;
124 175 if (current <= m_minThreads && m_exitRequired == 0)
125 176 // the thread is reserved
126 177 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
178 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
128 179
129 180 last = (current == 1);
130 181
@@ -136,7 +187,7 namespace Implab.Parallels {
136 187 /// </summary>
137 188 /// <returns>true - no more threads left</returns>
138 189 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads);
190 var left = Interlocked.Decrement(ref m_createdThreads);
140 191 return left == 0;
141 192 }
142 193
@@ -169,6 +220,7 namespace Implab.Parallels {
169 220 // exit if requested
170 221 if (m_exitRequired != 0) {
171 222 // release the thread slot
223 Interlocked.Decrement(ref m_activeThreads);
172 224 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 225 m_hasTasks.Dispose();
174 226 else
@@ -183,26 +235,14 namespace Implab.Parallels {
183 235 return true;
184 236 }
185 237
186 //no tasks left, exit if the thread is no longer needed
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
197
198 return false;
199 }
238 Interlocked.Decrement(ref m_activeThreads);
200 239
201 240 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 241 // keep this thread and wait
204 Suspend();
205 Interlocked.Decrement(ref m_suspended);
242 if (!Suspend())
243 return false;
244
245 Interlocked.Increment(ref m_activeThreads);
206 246 } while (true);
207 247 }
208 248
@@ -210,6 +250,7 namespace Implab.Parallels {
210 250
211 251 void Worker() {
212 252 TUnit unit;
253 Interlocked.Increment(ref m_activeThreads);
213 254 while (FetchTask(out unit))
214 255 InvokeUnit(unit);
215 256 }
@@ -57,12 +57,11 namespace Implab.Parallels {
57 57 var len = Interlocked.Increment(ref m_queueLength);
58 58 m_queue.Enqueue(unit);
59 59
60 if(!ExtendPool())
61 WakePool();
60 ExtendPool();
62 61 }
63 62
64 63 protected override bool ExtendPool() {
65 if (m_queueLength <= m_threshold*ThreadCount)
64 if (m_queueLength <= m_threshold*ActiveThreads)
66 65 // in this case we are in active thread and it request for additional workers
67 66 // satisfy it only when queue is longer than threshold
68 67 return false;
@@ -81,9 +80,11 namespace Implab.Parallels {
81 80 unit();
82 81 }
83 82
84 protected override void Suspend() {
83 protected override bool Suspend() {
85 84 if (m_queueLength == 0)
86 base.Suspend();
85 return base.Suspend();
86 else
87 return true; // spin again without locks...
87 88 }
88 89 }
89 90 }
General Comments 0
You need to be logged in to leave comments. Login now