@@ -242,6 +242,38 namespace Implab.Test { | |||||
242 | } |
|
242 | } | |
243 |
|
243 | |||
244 | [TestMethod] |
|
244 | [TestMethod] | |
|
245 | public void ChainedMapTest() { | |||
|
246 | ||||
|
247 | using (var pool = new WorkerPool(1,10)) { | |||
|
248 | int count = 10000; | |||
|
249 | ||||
|
250 | double[] args = new double[count]; | |||
|
251 | var rand = new Random(); | |||
|
252 | ||||
|
253 | for (int i = 0; i < count; i++) | |||
|
254 | args[i] = rand.NextDouble(); | |||
|
255 | ||||
|
256 | var t = Environment.TickCount; | |||
|
257 | var res = args | |||
|
258 | .ChainedMap( | |||
|
259 | x => pool.Invoke( | |||
|
260 | () => Math.Sin(x * x) | |||
|
261 | ), | |||
|
262 | 4 | |||
|
263 | ) | |||
|
264 | .Join(); | |||
|
265 | ||||
|
266 | Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); | |||
|
267 | ||||
|
268 | t = Environment.TickCount; | |||
|
269 | for (int i = 0; i < count; i++) | |||
|
270 | Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); | |||
|
271 | Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); | |||
|
272 | Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads); | |||
|
273 | } | |||
|
274 | } | |||
|
275 | ||||
|
276 | [TestMethod] | |||
245 | public void ParallelForEachTest() { |
|
277 | public void ParallelForEachTest() { | |
246 |
|
278 | |||
247 | int count = 100000; |
|
279 | int count = 100000; |
1 | NO CONTENT: modified file, binary diff hidden |
|
NO CONTENT: modified file, binary diff hidden |
@@ -39,26 +39,14 namespace Implab.Parallels { | |||||
39 | } |
|
39 | } | |
40 |
|
40 | |||
41 | protected override bool TryDequeue(out int unit) { |
|
41 | protected override bool TryDequeue(out int unit) { | |
42 | int index; |
|
42 | unit = Interlocked.Increment(ref m_next) - 1; | |
43 | unit = -1; |
|
43 | return unit >= m_source.Length ? false : true; | |
44 | do { |
|
|||
45 | index = m_next; |
|
|||
46 | if (index >= m_source.Length) |
|
|||
47 | return false; |
|
|||
48 | } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); |
|
|||
49 |
|
||||
50 | unit = index; |
|
|||
51 | return true; |
|
|||
52 | } |
|
44 | } | |
53 |
|
45 | |||
54 | protected override void InvokeUnit(int unit) { |
|
46 | protected override void InvokeUnit(int unit) { | |
55 | try { |
|
47 | try { | |
56 | m_action(m_source[unit]); |
|
48 | m_action(m_source[unit]); | |
57 | int pending; |
|
49 | var pending = Interlocked.Decrement(ref m_pending); | |
58 | do { |
|
|||
59 | pending = m_pending; |
|
|||
60 | } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); |
|
|||
61 | pending--; |
|
|||
62 | if (pending == 0) |
|
50 | if (pending == 0) | |
63 | m_promise.Resolve(m_source.Length); |
|
51 | m_promise.Resolve(m_source.Length); | |
64 | } catch (Exception e) { |
|
52 | } catch (Exception e) { | |
@@ -101,26 +89,14 namespace Implab.Parallels { | |||||
101 | } |
|
89 | } | |
102 |
|
90 | |||
103 | protected override bool TryDequeue(out int unit) { |
|
91 | protected override bool TryDequeue(out int unit) { | |
104 | int index; |
|
92 | unit = Interlocked.Increment(ref m_next) - 1; | |
105 | unit = -1; |
|
93 | return unit >= m_source.Length ? false : true; | |
106 | do { |
|
|||
107 | index = m_next; |
|
|||
108 | if (index >= m_source.Length) |
|
|||
109 | return false; |
|
|||
110 | } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); |
|
|||
111 |
|
||||
112 | unit = index; |
|
|||
113 | return true; |
|
|||
114 | } |
|
94 | } | |
115 |
|
95 | |||
116 | protected override void InvokeUnit(int unit) { |
|
96 | protected override void InvokeUnit(int unit) { | |
117 | try { |
|
97 | try { | |
118 | m_dest[unit] = m_transform(m_source[unit]); |
|
98 | m_dest[unit] = m_transform(m_source[unit]); | |
119 | int pending; |
|
99 | var pending = Interlocked.Decrement(ref m_pending); | |
120 | do { |
|
|||
121 | pending = m_pending; |
|
|||
122 | } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); |
|
|||
123 | pending --; |
|
|||
124 | if (pending == 0) |
|
100 | if (pending == 0) | |
125 | m_promise.Resolve(m_dest); |
|
101 | m_promise.Resolve(m_dest); | |
126 | } catch (Exception e) { |
|
102 | } catch (Exception e) { | |
@@ -148,5 +124,48 namespace Implab.Parallels { | |||||
148 | var iter = new ArrayIterator<TSrc>(source, action, threads); |
|
124 | var iter = new ArrayIterator<TSrc>(source, action, threads); | |
149 | return iter.Promise; |
|
125 | return iter.Promise; | |
150 | } |
|
126 | } | |
|
127 | ||||
|
128 | public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { | |||
|
129 | if (source == null) | |||
|
130 | throw new ArgumentNullException("source"); | |||
|
131 | if (transform == null) | |||
|
132 | throw new ArgumentNullException("transform"); | |||
|
133 | if (threads <= 0) | |||
|
134 | throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | |||
|
135 | ||||
|
136 | var promise = new Promise<TDst[]>(); | |||
|
137 | var res = new TDst[source.Length]; | |||
|
138 | var pending = source.Length; | |||
|
139 | var semaphore = new Semaphore(threads, threads); | |||
|
140 | ||||
|
141 | AsyncPool.InvokeNewThread(() => { | |||
|
142 | for (int i = 0; i < source.Length; i++) { | |||
|
143 | if(promise.State != PromiseState.Unresolved) | |||
|
144 | break; // stop processing in case of error or cancellation | |||
|
145 | var idx = i; | |||
|
146 | semaphore.WaitOne(); | |||
|
147 | try { | |||
|
148 | var p1 = transform(source[i]); | |||
|
149 | p1.Anyway(() => semaphore.Release()); | |||
|
150 | p1.Cancelled(() => semaphore.Release()); | |||
|
151 | p1.Then( | |||
|
152 | x => { | |||
|
153 | res[idx] = x; | |||
|
154 | var left = Interlocked.Decrement(ref pending); | |||
|
155 | if (left == 0) | |||
|
156 | promise.Resolve(res); | |||
|
157 | }, | |||
|
158 | e => promise.Reject(e) | |||
|
159 | ); | |||
|
160 | ||||
|
161 | } catch (Exception e) { | |||
|
162 | promise.Reject(e); | |||
151 | } |
|
163 | } | |
152 | } |
|
164 | } | |
|
165 | return 0; | |||
|
166 | }); | |||
|
167 | ||||
|
168 | return promise.Anyway(() => semaphore.Dispose()); | |||
|
169 | } | |||
|
170 | } | |||
|
171 | } |
@@ -36,7 +36,6 namespace Implab.Parallels { | |||||
36 | } |
|
36 | } | |
37 | }); |
|
37 | }); | |
38 | worker.IsBackground = true; |
|
38 | worker.IsBackground = true; | |
39 |
|
||||
40 | worker.Start(); |
|
39 | worker.Start(); | |
41 |
|
40 | |||
42 | return p; |
|
41 | return p; |
@@ -63,9 +63,10 namespace Implab.Parallels { | |||||
63 | } |
|
63 | } | |
64 |
|
64 | |||
65 | bool StartWorker() { |
|
65 | bool StartWorker() { | |
66 |
|
|
66 | int current; | |
67 | // use spins to allocate slot for the new thread |
|
67 | // use spins to allocate slot for the new thread | |
68 | do { |
|
68 | do { | |
|
69 | current = m_runningThreads; | |||
69 | if (current >= m_maxThreads || m_exitRequired != 0) |
|
70 | if (current >= m_maxThreads || m_exitRequired != 0) | |
70 | // no more slots left or the pool has been disposed |
|
71 | // no more slots left or the pool has been disposed | |
71 | return false; |
|
72 | return false; | |
@@ -84,24 +85,33 namespace Implab.Parallels { | |||||
84 |
|
85 | |||
85 | protected abstract bool TryDequeue(out TUnit unit); |
|
86 | protected abstract bool TryDequeue(out TUnit unit); | |
86 |
|
87 | |||
87 | protected virtual void WakeNewWorker() { |
|
88 | protected virtual void WakeNewWorker(bool extend) { | |
88 | if (m_suspended > 0) |
|
89 | if (m_suspended > 0) | |
89 | m_hasTasks.Set(); |
|
90 | m_hasTasks.Set(); | |
90 | else |
|
91 | else | |
91 | StartWorker(); |
|
92 | StartWorker(); | |
92 | } |
|
93 | } | |
93 |
|
94 | |||
|
95 | /// <summary> | |||
|
96 | /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | |||
|
97 | /// </summary> | |||
|
98 | protected void StartIfIdle() { | |||
|
99 | int threads; | |||
|
100 | do { | |||
|
101 | ||||
|
102 | } | |||
|
103 | } | |||
|
104 | ||||
|
105 | protected virtual void Suspend() { | |||
|
106 | m_hasTasks.WaitOne(); | |||
|
107 | } | |||
|
108 | ||||
94 | bool FetchTask(out TUnit unit) { |
|
109 | bool FetchTask(out TUnit unit) { | |
95 | do { |
|
110 | do { | |
96 | // exit if requested |
|
111 | // exit if requested | |
97 | if (m_exitRequired != 0) { |
|
112 | if (m_exitRequired != 0) { | |
98 | // release the thread slot |
|
113 | // release the thread slot | |
99 | int running; |
|
114 | var running = Interlocked.Decrement(ref m_runningThreads); | |
100 | do { |
|
|||
101 | running = m_runningThreads; |
|
|||
102 | } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); |
|
|||
103 | running--; |
|
|||
104 |
|
||||
105 | if (running == 0) // it was the last worker |
|
115 | if (running == 0) // it was the last worker | |
106 | m_hasTasks.Dispose(); |
|
116 | m_hasTasks.Dispose(); | |
107 | else |
|
117 | else | |
@@ -112,7 +122,7 namespace Implab.Parallels { | |||||
112 |
|
122 | |||
113 | // fetch task |
|
123 | // fetch task | |
114 | if (TryDequeue(out unit)) { |
|
124 | if (TryDequeue(out unit)) { | |
115 | WakeNewWorker(); |
|
125 | WakeNewWorker(true); | |
116 | return true; |
|
126 | return true; | |
117 | } |
|
127 | } | |
118 |
|
128 | |||
@@ -122,19 +132,21 namespace Implab.Parallels { | |||||
122 | do { |
|
132 | do { | |
123 | runningThreads = m_runningThreads; |
|
133 | runningThreads = m_runningThreads; | |
124 | if (runningThreads <= m_minThreads) { |
|
134 | if (runningThreads <= m_minThreads) { | |
|
135 | // check wheather this is the last thread and we have tasks | |||
|
136 | ||||
125 | exit = false; |
|
137 | exit = false; | |
126 | break; |
|
138 | break; | |
127 | } |
|
139 | } | |
128 | } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); |
|
140 | } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); | |
129 |
|
141 | |||
130 | if (exit) { |
|
142 | if (exit) { | |
131 | Interlocked.Decrement(ref m_runningThreads); |
|
|||
132 | return false; |
|
143 | return false; | |
133 | } |
|
144 | } | |
134 |
|
145 | |||
135 |
// |
|
146 | // entering suspend state | |
136 | Interlocked.Increment(ref m_suspended); |
|
147 | Interlocked.Increment(ref m_suspended); | |
137 | m_hasTasks.WaitOne(); |
|
148 | // keep this thread and wait | |
|
149 | Suspend(); | |||
138 | Interlocked.Decrement(ref m_suspended); |
|
150 | Interlocked.Decrement(ref m_suspended); | |
139 | } while (true); |
|
151 | } while (true); | |
140 | } |
|
152 | } |
@@ -10,9 +10,16 namespace Implab.Parallels { | |||||
10 |
|
10 | |||
11 | MTQueue<Action> m_queue = new MTQueue<Action>(); |
|
11 | MTQueue<Action> m_queue = new MTQueue<Action>(); | |
12 | int m_queueLength = 0; |
|
12 | int m_queueLength = 0; | |
|
13 | readonly int m_threshold = 1; | |||
13 |
|
14 | |||
14 | public WorkerPool(int minThreads, int maxThreads) |
|
15 | public WorkerPool(int minThreads, int maxThreads, int threshold) | |
15 | : base(minThreads, maxThreads) { |
|
16 | : base(minThreads, maxThreads) { | |
|
17 | m_threshold = threshold; | |||
|
18 | InitPool(); | |||
|
19 | } | |||
|
20 | ||||
|
21 | public WorkerPool(int minThreads, int maxThreads) : | |||
|
22 | base(minThreads, maxThreads) { | |||
16 |
|
|
23 | InitPool(); | |
17 | } |
|
24 | } | |
18 |
|
25 | |||
@@ -47,11 +54,20 namespace Implab.Parallels { | |||||
47 |
|
54 | |||
48 | protected void EnqueueTask(Action unit) { |
|
55 | protected void EnqueueTask(Action unit) { | |
49 | Debug.Assert(unit != null); |
|
56 | Debug.Assert(unit != null); | |
50 | Interlocked.Increment(ref m_queueLength); |
|
57 | var len = Interlocked.Increment(ref m_queueLength); | |
51 | m_queue.Enqueue(unit); |
|
58 | m_queue.Enqueue(unit); | |
52 | // if there are sleeping threads in the pool wake one |
|
59 | ||
53 | // probably this will lead a dry run |
|
60 | if (ThreadCount == 0) | |
54 | WakeNewWorker(); |
|
61 | // force to start | |
|
62 | WakeNewWorker(false); | |||
|
63 | } | |||
|
64 | ||||
|
65 | protected override void WakeNewWorker(bool extend) { | |||
|
66 | if (extend && m_queueLength <= m_threshold) | |||
|
67 | // in this case we are in active thread and it request for additional workers | |||
|
68 | // satisfy it only when queue is longer than threshold | |||
|
69 | return; | |||
|
70 | base.WakeNewWorker(extend); | |||
55 | } |
|
71 | } | |
56 |
|
72 | |||
57 | protected override bool TryDequeue(out Action unit) { |
|
73 | protected override bool TryDequeue(out Action unit) { | |
@@ -65,5 +81,10 namespace Implab.Parallels { | |||||
65 | protected override void InvokeUnit(Action unit) { |
|
81 | protected override void InvokeUnit(Action unit) { | |
66 | unit(); |
|
82 | unit(); | |
67 | } |
|
83 | } | |
|
84 | ||||
|
85 | protected override void Suspend() { | |||
|
86 | if (m_queueLength == 0) | |||
|
87 | base.Suspend(); | |||
68 | } |
|
88 | } | |
69 | } |
|
89 | } | |
|
90 | } |
@@ -103,11 +103,16 namespace Implab { | |||||
103 | /// <summary> |
|
103 | /// <summary> | |
104 | /// Выполняет обещание, сообщая об ошибке |
|
104 | /// Выполняет обещание, сообщая об ошибке | |
105 | /// </summary> |
|
105 | /// </summary> | |
|
106 | /// <remarks> | |||
|
107 | /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |||
|
108 | /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |||
|
109 | /// будут проигнорированы. | |||
|
110 | /// </remarks> | |||
106 | /// <param name="error">Исключение возникшее при выполнении операции</param> |
|
111 | /// <param name="error">Исключение возникшее при выполнении операции</param> | |
107 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> |
|
112 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
108 | public void Reject(Exception error) { |
|
113 | public void Reject(Exception error) { | |
109 | lock (m_lock) { |
|
114 | lock (m_lock) { | |
110 | if (m_state == PromiseState.Cancelled) |
|
115 | if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected) | |
111 | return; |
|
116 | return; | |
112 | if (m_state != PromiseState.Unresolved) |
|
117 | if (m_state != PromiseState.Unresolved) | |
113 | throw new InvalidOperationException("The promise is already resolved"); |
|
118 | throw new InvalidOperationException("The promise is already resolved"); |
General Comments 0
You need to be logged in to leave comments.
Login now