@@ -244,7 +244,7 namespace Implab.Test { | |||||
244 | [TestMethod] |
|
244 | [TestMethod] | |
245 | public void ChainedMapTest() { |
|
245 | public void ChainedMapTest() { | |
246 |
|
246 | |||
247 |
using (var pool = new WorkerPool( |
|
247 | using (var pool = new WorkerPool(0,100,0)) { | |
248 | int count = 10000; |
|
248 | int count = 10000; | |
249 |
|
249 | |||
250 | double[] args = new double[count]; |
|
250 | double[] args = new double[count]; | |
@@ -255,7 +255,7 namespace Implab.Test { | |||||
255 |
|
255 | |||
256 | var t = Environment.TickCount; |
|
256 | var t = Environment.TickCount; | |
257 | var res = args |
|
257 | var res = args | |
258 | .ChainedMap( |
|
258 | .ChainedMap2( | |
259 | x => pool.Invoke( |
|
259 | x => pool.Invoke( | |
260 | () => Math.Sin(x * x) |
|
260 | () => Math.Sin(x * x) | |
261 | ), |
|
261 | ), |
1 | NO CONTENT: modified file, binary diff hidden |
|
NO CONTENT: modified file, binary diff hidden |
@@ -167,5 +167,46 namespace Implab.Parallels { | |||||
167 |
|
167 | |||
168 | return promise.Anyway(() => semaphore.Dispose()); |
|
168 | return promise.Anyway(() => semaphore.Dispose()); | |
169 | } |
|
169 | } | |
|
170 | ||||
|
171 | /* | |||
|
172 | this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is | |||
|
173 | be chained, in this case the syncronous callback invocation will occur | |||
|
174 | ||||
|
175 | public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { | |||
|
176 | if (source == null) | |||
|
177 | throw new ArgumentNullException("source"); | |||
|
178 | if (transform == null) | |||
|
179 | throw new ArgumentNullException("transform"); | |||
|
180 | if (threads <= 0) | |||
|
181 | throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | |||
|
182 | ||||
|
183 | var promise = new Promise<TDst[]>(); | |||
|
184 | var res = new TDst[source.Length]; | |||
|
185 | var index = -1; // we will start with increment | |||
|
186 | var len = source.Length; | |||
|
187 | var pending = len; | |||
|
188 | ||||
|
189 | Action<int> callback = null; | |||
|
190 | callback = (current) => { | |||
|
191 | if (current < len) { | |||
|
192 | transform(source[current]) | |||
|
193 | .Then( | |||
|
194 | x => { | |||
|
195 | res[current] = x; | |||
|
196 | if (Interlocked.Decrement(ref pending) == 0) | |||
|
197 | promise.Resolve(res); | |||
|
198 | else | |||
|
199 | callback(Interlocked.Increment(ref index)); | |||
|
200 | }, | |||
|
201 | e => promise.Reject(e) | |||
|
202 | ); | |||
|
203 | } | |||
|
204 | }; | |||
|
205 | ||||
|
206 | for (int i = 0; i < threads; i++) | |||
|
207 | callback(Interlocked.Increment(ref index)); | |||
|
208 | return promise; | |||
|
209 | } | |||
|
210 | */ | |||
170 | } |
|
211 | } | |
171 | } |
|
212 | } |
@@ -151,7 +151,10 namespace Implab.Parallels { | |||||
151 | } |
|
151 | } | |
152 | } else { |
|
152 | } else { | |
153 | // if there is no sleeping threads in the pool |
|
153 | // if there is no sleeping threads in the pool | |
154 |
StartWorker() |
|
154 | if (!StartWorker()) | |
|
155 | // we haven't started a new thread, but the current can be on the way and it can't process the queue | |||
|
156 | // send it a signal to spin again | |||
|
157 | SignalThread(); | |||
155 | } |
|
158 | } | |
156 | } |
|
159 | } | |
157 |
|
160 | |||
@@ -281,7 +284,6 namespace Implab.Parallels { | |||||
281 | m_hasTasks.Dispose(); |
|
284 | m_hasTasks.Dispose(); | |
282 | else |
|
285 | else | |
283 | SignalThread(); // wake next worker |
|
286 | SignalThread(); // wake next worker | |
284 | unit = default(TUnit); |
|
|||
285 | break; |
|
287 | break; | |
286 | } |
|
288 | } | |
287 |
|
289 |
@@ -44,7 +44,7 namespace Implab.Parallels { | |||||
44 | // this is the last element, |
|
44 | // this is the last element, | |
45 | // then try to update the tail |
|
45 | // then try to update the tail | |
46 | if (first != Interlocked.CompareExchange(ref m_last, null, first)) { |
|
46 | if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | |
47 | // this is a ace condition |
|
47 | // this is a race condition | |
48 | if (m_last == null) |
|
48 | if (m_last == null) | |
49 | // the queue is empty |
|
49 | // the queue is empty | |
50 | return false; |
|
50 | return false; |
General Comments 0
You need to be logged in to leave comments.
Login now