@@ -244,7 +244,7 namespace Implab.Test { | |||
|
244 | 244 | [TestMethod] |
|
245 | 245 | public void ChainedMapTest() { |
|
246 | 246 | |
|
247 |
using (var pool = new WorkerPool( |
|
|
247 | using (var pool = new WorkerPool(0,100,0)) { | |
|
248 | 248 | int count = 10000; |
|
249 | 249 | |
|
250 | 250 | double[] args = new double[count]; |
@@ -255,7 +255,7 namespace Implab.Test { | |||
|
255 | 255 | |
|
256 | 256 | var t = Environment.TickCount; |
|
257 | 257 | var res = args |
|
258 | .ChainedMap( | |
|
258 | .ChainedMap2( | |
|
259 | 259 | x => pool.Invoke( |
|
260 | 260 | () => Math.Sin(x * x) |
|
261 | 261 | ), |
|
1 | NO CONTENT: modified file, binary diff hidden |
@@ -167,5 +167,46 namespace Implab.Parallels { | |||
|
167 | 167 | |
|
168 | 168 | return promise.Anyway(() => semaphore.Dispose()); |
|
169 | 169 | } |
|
170 | ||
|
171 | /* | |
|
172 | this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is | |
|
173 | be chained, in this case the syncronous callback invocation will occur | |
|
174 | ||
|
175 | public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { | |
|
176 | if (source == null) | |
|
177 | throw new ArgumentNullException("source"); | |
|
178 | if (transform == null) | |
|
179 | throw new ArgumentNullException("transform"); | |
|
180 | if (threads <= 0) | |
|
181 | throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | |
|
182 | ||
|
183 | var promise = new Promise<TDst[]>(); | |
|
184 | var res = new TDst[source.Length]; | |
|
185 | var index = -1; // we will start with increment | |
|
186 | var len = source.Length; | |
|
187 | var pending = len; | |
|
188 | ||
|
189 | Action<int> callback = null; | |
|
190 | callback = (current) => { | |
|
191 | if (current < len) { | |
|
192 | transform(source[current]) | |
|
193 | .Then( | |
|
194 | x => { | |
|
195 | res[current] = x; | |
|
196 | if (Interlocked.Decrement(ref pending) == 0) | |
|
197 | promise.Resolve(res); | |
|
198 | else | |
|
199 | callback(Interlocked.Increment(ref index)); | |
|
200 | }, | |
|
201 | e => promise.Reject(e) | |
|
202 | ); | |
|
203 | } | |
|
204 | }; | |
|
205 | ||
|
206 | for (int i = 0; i < threads; i++) | |
|
207 | callback(Interlocked.Increment(ref index)); | |
|
208 | return promise; | |
|
209 | } | |
|
210 | */ | |
|
170 | 211 | } |
|
171 | 212 | } |
@@ -151,7 +151,10 namespace Implab.Parallels { | |||
|
151 | 151 | } |
|
152 | 152 | } else { |
|
153 | 153 | // if there is no sleeping threads in the pool |
|
154 |
StartWorker() |
|
|
154 | if (!StartWorker()) | |
|
155 | // we haven't started a new thread, but the current can be on the way and it can't process the queue | |
|
156 | // send it a signal to spin again | |
|
157 | SignalThread(); | |
|
155 | 158 | } |
|
156 | 159 | } |
|
157 | 160 | |
@@ -281,7 +284,6 namespace Implab.Parallels { | |||
|
281 | 284 | m_hasTasks.Dispose(); |
|
282 | 285 | else |
|
283 | 286 | SignalThread(); // wake next worker |
|
284 | unit = default(TUnit); | |
|
285 | 287 | break; |
|
286 | 288 | } |
|
287 | 289 |
@@ -44,7 +44,7 namespace Implab.Parallels { | |||
|
44 | 44 | // this is the last element, |
|
45 | 45 | // then try to update the tail |
|
46 | 46 | if (first != Interlocked.CompareExchange(ref m_last, null, first)) { |
|
47 | // this is a ace condition | |
|
47 | // this is a race condition | |
|
48 | 48 | if (m_last == null) |
|
49 | 49 | // the queue is empty |
|
50 | 50 | return false; |
General Comments 0
You need to be logged in to leave comments.
Login now