##// END OF EJS Templates
fixed dispatch pool race condition
cin -
r24:ee04e1fa78da default
parent child
Show More
@@ -244,7 +244,7 namespace Implab.Test {
244 244 [TestMethod]
245 245 public void ChainedMapTest() {
246 246
247 using (var pool = new WorkerPool(4,4,0)) {
247 using (var pool = new WorkerPool(0,100,0)) {
248 248 int count = 10000;
249 249
250 250 double[] args = new double[count];
@@ -255,7 +255,7 namespace Implab.Test {
255 255
256 256 var t = Environment.TickCount;
257 257 var res = args
258 .ChainedMap(
258 .ChainedMap2(
259 259 x => pool.Invoke(
260 260 () => Math.Sin(x * x)
261 261 ),
1 NO CONTENT: modified file, binary diff hidden
@@ -167,5 +167,46 namespace Implab.Parallels {
167 167
168 168 return promise.Anyway(() => semaphore.Dispose());
169 169 }
170
171 /*
172 this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is
173 be chained, in this case the syncronous callback invocation will occur
174
175 public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
176 if (source == null)
177 throw new ArgumentNullException("source");
178 if (transform == null)
179 throw new ArgumentNullException("transform");
180 if (threads <= 0)
181 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
182
183 var promise = new Promise<TDst[]>();
184 var res = new TDst[source.Length];
185 var index = -1; // we will start with increment
186 var len = source.Length;
187 var pending = len;
188
189 Action<int> callback = null;
190 callback = (current) => {
191 if (current < len) {
192 transform(source[current])
193 .Then(
194 x => {
195 res[current] = x;
196 if (Interlocked.Decrement(ref pending) == 0)
197 promise.Resolve(res);
198 else
199 callback(Interlocked.Increment(ref index));
200 },
201 e => promise.Reject(e)
202 );
203 }
204 };
205
206 for (int i = 0; i < threads; i++)
207 callback(Interlocked.Increment(ref index));
208 return promise;
209 }
210 */
170 211 }
171 212 }
@@ -151,7 +151,10 namespace Implab.Parallels {
151 151 }
152 152 } else {
153 153 // if there is no sleeping threads in the pool
154 StartWorker();
154 if (!StartWorker())
155 // we haven't started a new thread, but the current can be on the way and it can't process the queue
156 // send it a signal to spin again
157 SignalThread();
155 158 }
156 159 }
157 160
@@ -281,7 +284,6 namespace Implab.Parallels {
281 284 m_hasTasks.Dispose();
282 285 else
283 286 SignalThread(); // wake next worker
284 unit = default(TUnit);
285 287 break;
286 288 }
287 289
@@ -44,7 +44,7 namespace Implab.Parallels {
44 44 // this is the last element,
45 45 // then try to update the tail
46 46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is a ace condition
47 // this is a race condition
48 48 if (m_last == null)
49 49 // the queue is empty
50 50 return false;
General Comments 0
You need to be logged in to leave comments. Login now