##// END OF EJS Templates
fixed dispatch pool race condition
cin -
r24:ee04e1fa78da default
parent child
Show More
@@ -244,7 +244,7 namespace Implab.Test {
244 [TestMethod]
244 [TestMethod]
245 public void ChainedMapTest() {
245 public void ChainedMapTest() {
246
246
247 using (var pool = new WorkerPool(4,4,0)) {
247 using (var pool = new WorkerPool(0,100,0)) {
248 int count = 10000;
248 int count = 10000;
249
249
250 double[] args = new double[count];
250 double[] args = new double[count];
@@ -255,7 +255,7 namespace Implab.Test {
255
255
256 var t = Environment.TickCount;
256 var t = Environment.TickCount;
257 var res = args
257 var res = args
258 .ChainedMap(
258 .ChainedMap2(
259 x => pool.Invoke(
259 x => pool.Invoke(
260 () => Math.Sin(x * x)
260 () => Math.Sin(x * x)
261 ),
261 ),
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -167,5 +167,46 namespace Implab.Parallels {
167
167
168 return promise.Anyway(() => semaphore.Dispose());
168 return promise.Anyway(() => semaphore.Dispose());
169 }
169 }
170
171 /*
172 this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is
173 be chained, in this case the syncronous callback invocation will occur
174
175 public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
176 if (source == null)
177 throw new ArgumentNullException("source");
178 if (transform == null)
179 throw new ArgumentNullException("transform");
180 if (threads <= 0)
181 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
182
183 var promise = new Promise<TDst[]>();
184 var res = new TDst[source.Length];
185 var index = -1; // we will start with increment
186 var len = source.Length;
187 var pending = len;
188
189 Action<int> callback = null;
190 callback = (current) => {
191 if (current < len) {
192 transform(source[current])
193 .Then(
194 x => {
195 res[current] = x;
196 if (Interlocked.Decrement(ref pending) == 0)
197 promise.Resolve(res);
198 else
199 callback(Interlocked.Increment(ref index));
200 },
201 e => promise.Reject(e)
202 );
203 }
204 };
205
206 for (int i = 0; i < threads; i++)
207 callback(Interlocked.Increment(ref index));
208 return promise;
209 }
210 */
170 }
211 }
171 }
212 }
@@ -151,7 +151,10 namespace Implab.Parallels {
151 }
151 }
152 } else {
152 } else {
153 // if there is no sleeping threads in the pool
153 // if there is no sleeping threads in the pool
154 StartWorker();
154 if (!StartWorker())
155 // we haven't started a new thread, but the current can be on the way and it can't process the queue
156 // send it a signal to spin again
157 SignalThread();
155 }
158 }
156 }
159 }
157
160
@@ -281,7 +284,6 namespace Implab.Parallels {
281 m_hasTasks.Dispose();
284 m_hasTasks.Dispose();
282 else
285 else
283 SignalThread(); // wake next worker
286 SignalThread(); // wake next worker
284 unit = default(TUnit);
285 break;
287 break;
286 }
288 }
287
289
@@ -44,7 +44,7 namespace Implab.Parallels {
44 // this is the last element,
44 // this is the last element,
45 // then try to update the tail
45 // then try to update the tail
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is a ace condition
47 // this is a race condition
48 if (m_last == null)
48 if (m_last == null)
49 // the queue is empty
49 // the queue is empty
50 return false;
50 return false;
General Comments 0
You need to be logged in to leave comments. Login now