##// END OF EJS Templates
refactoring, sync
cin -
r20:1c3b3d518480 promises
parent child
Show More
@@ -101,18 +101,18 namespace Implab.Test {
101 public void WorkerPoolSizeTest() {
101 public void WorkerPoolSizeTest() {
102 var pool = new WorkerPool(5, 10, 0);
102 var pool = new WorkerPool(5, 10, 0);
103
103
104 Assert.AreEqual(5, pool.ThreadCount);
104 Assert.AreEqual(5, pool.PoolSize);
105
105
106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
109
109
110 Assert.AreEqual(5, pool.ThreadCount);
110 Assert.AreEqual(5, pool.PoolSize);
111
111
112 for (int i = 0; i < 100; i++)
112 for (int i = 0; i < 100; i++)
113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
114 Thread.Sleep(100);
114 Thread.Sleep(100);
115 Assert.AreEqual(10, pool.ThreadCount);
115 Assert.AreEqual(10, pool.PoolSize);
116
116
117 pool.Dispose();
117 pool.Dispose();
118 }
118 }
@@ -149,10 +149,10 namespace Implab.Test {
149 [TestMethod]
149 [TestMethod]
150 public void WorkerPoolDisposeTest() {
150 public void WorkerPoolDisposeTest() {
151 var pool = new WorkerPool(5, 20);
151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
152 Assert.AreEqual(5, pool.PoolSize);
153 pool.Dispose();
153 pool.Dispose();
154 Thread.Sleep(100);
154 Thread.Sleep(200);
155 Assert.AreEqual(0, pool.ThreadCount);
155 Assert.AreEqual(0, pool.PoolSize);
156 pool.Dispose();
156 pool.Dispose();
157 }
157 }
158
158
@@ -244,7 +244,7 namespace Implab.Test {
244 [TestMethod]
244 [TestMethod]
245 public void ChainedMapTest() {
245 public void ChainedMapTest() {
246
246
247 using (var pool = new WorkerPool(4,4,0)) {
247 using (var pool = new WorkerPool(0,100,0)) {
248 int count = 10000;
248 int count = 10000;
249
249
250 double[] args = new double[count];
250 double[] args = new double[count];
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -9,10 +9,12 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
12 int m_createdThreads = 0;
13 int m_activeThreads = 0;
14 int m_sleepingThreads = 0;
13 int m_maxRunningThreads = 0;
15 int m_maxRunningThreads = 0;
14 int m_suspended = 0;
15 int m_exitRequired = 0;
16 int m_exitRequired = 0;
17 int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
18 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17
19
18 protected DispatchPool(int min, int max) {
20 protected DispatchPool(int min, int max) {
@@ -44,9 +46,15 namespace Implab.Parallels {
44 StartWorker();
46 StartWorker();
45 }
47 }
46
48
47 public int ThreadCount {
49 public int PoolSize {
48 get {
50 get {
49 return m_runningThreads;
51 return m_createdThreads;
52 }
53 }
54
55 public int ActiveThreads {
56 get {
57 return m_activeThreads;
50 }
58 }
51 }
59 }
52
60
@@ -65,11 +73,18 namespace Implab.Parallels {
65 protected abstract bool TryDequeue(out TUnit unit);
73 protected abstract bool TryDequeue(out TUnit unit);
66
74
67 protected virtual bool ExtendPool() {
75 protected virtual bool ExtendPool() {
68 if (m_suspended > 0) {
76 if (m_sleepingThreads == 0)
69 m_hasTasks.Set();
77 // no sleeping workers are available
78 // try create one
79 return StartWorker();
80 else {
81 // we can get here a race condition when several threads asks to extend pool
82 // and some sleaping threads are exited due timeout but they are still counted as sleeping
83 // in that case all of this threads could exit except one
84 WakePool();
70 return true;
85 return true;
71 } else
86 }
72 return StartWorker();
87
73 }
88 }
74
89
75 /// <summary>
90 /// <summary>
@@ -79,14 +94,50 namespace Implab.Parallels {
79 m_hasTasks.Set(); // wake sleeping thread;
94 m_hasTasks.Set(); // wake sleeping thread;
80
95
81 if (AllocateThreadSlot(1)) {
96 if (AllocateThreadSlot(1)) {
97 // if there were no threads in the pool
82 var worker = new Thread(this.Worker);
98 var worker = new Thread(this.Worker);
83 worker.IsBackground = true;
99 worker.IsBackground = true;
84 worker.Start();
100 worker.Start();
85 }
101 }
86 }
102 }
87
103
88 protected virtual void Suspend() {
104 bool Sleep(int timeout) {
89 m_hasTasks.WaitOne();
105 Interlocked.Increment(ref m_sleepingThreads);
106 var result = m_hasTasks.WaitOne(timeout);
107 Interlocked.Decrement(ref m_sleepingThreads);
108 return result;
109 }
110
111 protected virtual bool Suspend() {
112 //no tasks left, exit if the thread is no longer needed
113 bool last;
114 bool requestExit;
115
116 if (m_releaseTimeout > 0)
117 requestExit = !Sleep(m_releaseTimeout);
118 else
119 requestExit = true;
120
121
122 if (requestExit && ReleaseThreadSlot(out last)) {
123 // in case at the moment the last thread was being released
124 // a new task was added to the queue, we need to try
125 // to revoke the thread to avoid the situation when the task is left unprocessed
126 if (last && m_hasTasks.WaitOne(0)) {
127 if (AllocateThreadSlot(1))
128 return true; // spin again...
129 else
130 // we failed to reallocate the first slot for this thread
131 // therefore we need to release the event
132 m_hasTasks.Set();
133 }
134
135 return false;
136 }
137
138 Sleep(-1);
139
140 return true;
90 }
141 }
91
142
92 #region thread slots traits
143 #region thread slots traits
@@ -95,11 +146,11 namespace Implab.Parallels {
95 int current;
146 int current;
96 // use spins to allocate slot for the new thread
147 // use spins to allocate slot for the new thread
97 do {
148 do {
98 current = m_runningThreads;
149 current = m_createdThreads;
99 if (current >= m_maxThreads || m_exitRequired != 0)
150 if (current >= m_maxThreads || m_exitRequired != 0)
100 // no more slots left or the pool has been disposed
151 // no more slots left or the pool has been disposed
101 return false;
152 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
153 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
103
154
104 UpdateMaxThreads(current + 1);
155 UpdateMaxThreads(current + 1);
105
156
@@ -107,7 +158,7 namespace Implab.Parallels {
107 }
158 }
108
159
109 bool AllocateThreadSlot(int desired) {
160 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
161 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
111 return false;
162 return false;
112
163
113 UpdateMaxThreads(desired);
164 UpdateMaxThreads(desired);
@@ -120,11 +171,11 namespace Implab.Parallels {
120 int current;
171 int current;
121 // use spins to release slot for the new thread
172 // use spins to release slot for the new thread
122 do {
173 do {
123 current = m_runningThreads;
174 current = m_createdThreads;
124 if (current <= m_minThreads && m_exitRequired == 0)
175 if (current <= m_minThreads && m_exitRequired == 0)
125 // the thread is reserved
176 // the thread is reserved
126 return false;
177 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
178 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
128
179
129 last = (current == 1);
180 last = (current == 1);
130
181
@@ -136,7 +187,7 namespace Implab.Parallels {
136 /// </summary>
187 /// </summary>
137 /// <returns>true - no more threads left</returns>
188 /// <returns>true - no more threads left</returns>
138 bool ReleaseThreadSlotAnyway() {
189 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads);
190 var left = Interlocked.Decrement(ref m_createdThreads);
140 return left == 0;
191 return left == 0;
141 }
192 }
142
193
@@ -169,6 +220,7 namespace Implab.Parallels {
169 // exit if requested
220 // exit if requested
170 if (m_exitRequired != 0) {
221 if (m_exitRequired != 0) {
171 // release the thread slot
222 // release the thread slot
223 Interlocked.Decrement(ref m_activeThreads);
172 if (ReleaseThreadSlotAnyway()) // it was the last worker
224 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 m_hasTasks.Dispose();
225 m_hasTasks.Dispose();
174 else
226 else
@@ -183,26 +235,14 namespace Implab.Parallels {
183 return true;
235 return true;
184 }
236 }
185
237
186 //no tasks left, exit if the thread is no longer needed
238 Interlocked.Decrement(ref m_activeThreads);
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
197
198 return false;
199 }
200
239
201 // entering suspend state
240 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 // keep this thread and wait
241 // keep this thread and wait
204 Suspend();
242 if (!Suspend())
205 Interlocked.Decrement(ref m_suspended);
243 return false;
244
245 Interlocked.Increment(ref m_activeThreads);
206 } while (true);
246 } while (true);
207 }
247 }
208
248
@@ -210,6 +250,7 namespace Implab.Parallels {
210
250
211 void Worker() {
251 void Worker() {
212 TUnit unit;
252 TUnit unit;
253 Interlocked.Increment(ref m_activeThreads);
213 while (FetchTask(out unit))
254 while (FetchTask(out unit))
214 InvokeUnit(unit);
255 InvokeUnit(unit);
215 }
256 }
@@ -57,12 +57,11 namespace Implab.Parallels {
57 var len = Interlocked.Increment(ref m_queueLength);
57 var len = Interlocked.Increment(ref m_queueLength);
58 m_queue.Enqueue(unit);
58 m_queue.Enqueue(unit);
59
59
60 if(!ExtendPool())
60 ExtendPool();
61 WakePool();
62 }
61 }
63
62
64 protected override bool ExtendPool() {
63 protected override bool ExtendPool() {
65 if (m_queueLength <= m_threshold*ThreadCount)
64 if (m_queueLength <= m_threshold*ActiveThreads)
66 // in this case we are in active thread and it request for additional workers
65 // in this case we are in active thread and it request for additional workers
67 // satisfy it only when queue is longer than threshold
66 // satisfy it only when queue is longer than threshold
68 return false;
67 return false;
@@ -81,9 +80,11 namespace Implab.Parallels {
81 unit();
80 unit();
82 }
81 }
83
82
84 protected override void Suspend() {
83 protected override bool Suspend() {
85 if (m_queueLength == 0)
84 if (m_queueLength == 0)
86 base.Suspend();
85 return base.Suspend();
86 else
87 return true; // spin again without locks...
87 }
88 }
88 }
89 }
89 }
90 }
General Comments 0
You need to be logged in to leave comments. Login now