| @@ -242,6 +242,38 namespace Implab.Test { | |||||
| 242 | } |
|
242 | } | |
| 243 |
|
243 | |||
| 244 | [TestMethod] |
|
244 | [TestMethod] | |
|
|
245 | public void ChainedMapTest() { | |||
|
|
246 | ||||
|
|
247 | using (var pool = new WorkerPool(1,10)) { | |||
|
|
248 | int count = 10000; | |||
|
|
249 | ||||
|
|
250 | double[] args = new double[count]; | |||
|
|
251 | var rand = new Random(); | |||
|
|
252 | ||||
|
|
253 | for (int i = 0; i < count; i++) | |||
|
|
254 | args[i] = rand.NextDouble(); | |||
|
|
255 | ||||
|
|
256 | var t = Environment.TickCount; | |||
|
|
257 | var res = args | |||
|
|
258 | .ChainedMap( | |||
|
|
259 | x => pool.Invoke( | |||
|
|
260 | () => Math.Sin(x * x) | |||
|
|
261 | ), | |||
|
|
262 | 4 | |||
|
|
263 | ) | |||
|
|
264 | .Join(); | |||
|
|
265 | ||||
|
|
266 | Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); | |||
|
|
267 | ||||
|
|
268 | t = Environment.TickCount; | |||
|
|
269 | for (int i = 0; i < count; i++) | |||
|
|
270 | Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); | |||
|
|
271 | Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); | |||
|
|
272 | Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads); | |||
|
|
273 | } | |||
|
|
274 | } | |||
|
|
275 | ||||
|
|
276 | [TestMethod] | |||
| 245 | public void ParallelForEachTest() { |
|
277 | public void ParallelForEachTest() { | |
| 246 |
|
278 | |||
| 247 | int count = 100000; |
|
279 | int count = 100000; | |
| 1 | NO CONTENT: modified file, binary diff hidden |
|
NO CONTENT: modified file, binary diff hidden |
| @@ -39,26 +39,14 namespace Implab.Parallels { | |||||
| 39 | } |
|
39 | } | |
| 40 |
|
40 | |||
| 41 | protected override bool TryDequeue(out int unit) { |
|
41 | protected override bool TryDequeue(out int unit) { | |
| 42 | int index; |
|
42 | unit = Interlocked.Increment(ref m_next) - 1; | |
| 43 | unit = -1; |
|
43 | return unit >= m_source.Length ? false : true; | |
| 44 | do { |
|
|||
| 45 | index = m_next; |
|
|||
| 46 | if (index >= m_source.Length) |
|
|||
| 47 | return false; |
|
|||
| 48 | } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); |
|
|||
| 49 |
|
||||
| 50 | unit = index; |
|
|||
| 51 | return true; |
|
|||
| 52 | } |
|
44 | } | |
| 53 |
|
45 | |||
| 54 | protected override void InvokeUnit(int unit) { |
|
46 | protected override void InvokeUnit(int unit) { | |
| 55 | try { |
|
47 | try { | |
| 56 | m_action(m_source[unit]); |
|
48 | m_action(m_source[unit]); | |
| 57 | int pending; |
|
49 | var pending = Interlocked.Decrement(ref m_pending); | |
| 58 | do { |
|
|||
| 59 | pending = m_pending; |
|
|||
| 60 | } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); |
|
|||
| 61 | pending--; |
|
|||
| 62 | if (pending == 0) |
|
50 | if (pending == 0) | |
| 63 | m_promise.Resolve(m_source.Length); |
|
51 | m_promise.Resolve(m_source.Length); | |
| 64 | } catch (Exception e) { |
|
52 | } catch (Exception e) { | |
| @@ -101,26 +89,14 namespace Implab.Parallels { | |||||
| 101 | } |
|
89 | } | |
| 102 |
|
90 | |||
| 103 | protected override bool TryDequeue(out int unit) { |
|
91 | protected override bool TryDequeue(out int unit) { | |
| 104 | int index; |
|
92 | unit = Interlocked.Increment(ref m_next) - 1; | |
| 105 | unit = -1; |
|
93 | return unit >= m_source.Length ? false : true; | |
| 106 | do { |
|
|||
| 107 | index = m_next; |
|
|||
| 108 | if (index >= m_source.Length) |
|
|||
| 109 | return false; |
|
|||
| 110 | } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); |
|
|||
| 111 |
|
||||
| 112 | unit = index; |
|
|||
| 113 | return true; |
|
|||
| 114 | } |
|
94 | } | |
| 115 |
|
95 | |||
| 116 | protected override void InvokeUnit(int unit) { |
|
96 | protected override void InvokeUnit(int unit) { | |
| 117 | try { |
|
97 | try { | |
| 118 | m_dest[unit] = m_transform(m_source[unit]); |
|
98 | m_dest[unit] = m_transform(m_source[unit]); | |
| 119 | int pending; |
|
99 | var pending = Interlocked.Decrement(ref m_pending); | |
| 120 | do { |
|
|||
| 121 | pending = m_pending; |
|
|||
| 122 | } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); |
|
|||
| 123 | pending --; |
|
|||
| 124 | if (pending == 0) |
|
100 | if (pending == 0) | |
| 125 | m_promise.Resolve(m_dest); |
|
101 | m_promise.Resolve(m_dest); | |
| 126 | } catch (Exception e) { |
|
102 | } catch (Exception e) { | |
| @@ -148,5 +124,48 namespace Implab.Parallels { | |||||
| 148 | var iter = new ArrayIterator<TSrc>(source, action, threads); |
|
124 | var iter = new ArrayIterator<TSrc>(source, action, threads); | |
| 149 | return iter.Promise; |
|
125 | return iter.Promise; | |
| 150 | } |
|
126 | } | |
|
|
127 | ||||
|
|
128 | public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { | |||
|
|
129 | if (source == null) | |||
|
|
130 | throw new ArgumentNullException("source"); | |||
|
|
131 | if (transform == null) | |||
|
|
132 | throw new ArgumentNullException("transform"); | |||
|
|
133 | if (threads <= 0) | |||
|
|
134 | throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | |||
|
|
135 | ||||
|
|
136 | var promise = new Promise<TDst[]>(); | |||
|
|
137 | var res = new TDst[source.Length]; | |||
|
|
138 | var pending = source.Length; | |||
|
|
139 | var semaphore = new Semaphore(threads, threads); | |||
|
|
140 | ||||
|
|
141 | AsyncPool.InvokeNewThread(() => { | |||
|
|
142 | for (int i = 0; i < source.Length; i++) { | |||
|
|
143 | if(promise.State != PromiseState.Unresolved) | |||
|
|
144 | break; // stop processing in case of error or cancellation | |||
|
|
145 | var idx = i; | |||
|
|
146 | semaphore.WaitOne(); | |||
|
|
147 | try { | |||
|
|
148 | var p1 = transform(source[i]); | |||
|
|
149 | p1.Anyway(() => semaphore.Release()); | |||
|
|
150 | p1.Cancelled(() => semaphore.Release()); | |||
|
|
151 | p1.Then( | |||
|
|
152 | x => { | |||
|
|
153 | res[idx] = x; | |||
|
|
154 | var left = Interlocked.Decrement(ref pending); | |||
|
|
155 | if (left == 0) | |||
|
|
156 | promise.Resolve(res); | |||
|
|
157 | }, | |||
|
|
158 | e => promise.Reject(e) | |||
|
|
159 | ); | |||
|
|
160 | ||||
|
|
161 | } catch (Exception e) { | |||
|
|
162 | promise.Reject(e); | |||
|
|
163 | } | |||
|
|
164 | } | |||
|
|
165 | return 0; | |||
|
|
166 | }); | |||
|
|
167 | ||||
|
|
168 | return promise.Anyway(() => semaphore.Dispose()); | |||
|
|
169 | } | |||
| 151 | } |
|
170 | } | |
| 152 | } |
|
171 | } | |
| @@ -36,7 +36,6 namespace Implab.Parallels { | |||||
| 36 | } |
|
36 | } | |
| 37 | }); |
|
37 | }); | |
| 38 | worker.IsBackground = true; |
|
38 | worker.IsBackground = true; | |
| 39 |
|
||||
| 40 | worker.Start(); |
|
39 | worker.Start(); | |
| 41 |
|
40 | |||
| 42 | return p; |
|
41 | return p; | |
| @@ -63,9 +63,10 namespace Implab.Parallels { | |||||
| 63 | } |
|
63 | } | |
| 64 |
|
64 | |||
| 65 | bool StartWorker() { |
|
65 | bool StartWorker() { | |
| 66 |
|
|
66 | int current; | |
| 67 | // use spins to allocate slot for the new thread |
|
67 | // use spins to allocate slot for the new thread | |
| 68 | do { |
|
68 | do { | |
|
|
69 | current = m_runningThreads; | |||
| 69 | if (current >= m_maxThreads || m_exitRequired != 0) |
|
70 | if (current >= m_maxThreads || m_exitRequired != 0) | |
| 70 | // no more slots left or the pool has been disposed |
|
71 | // no more slots left or the pool has been disposed | |
| 71 | return false; |
|
72 | return false; | |
| @@ -84,24 +85,33 namespace Implab.Parallels { | |||||
| 84 |
|
85 | |||
| 85 | protected abstract bool TryDequeue(out TUnit unit); |
|
86 | protected abstract bool TryDequeue(out TUnit unit); | |
| 86 |
|
87 | |||
| 87 | protected virtual void WakeNewWorker() { |
|
88 | protected virtual void WakeNewWorker(bool extend) { | |
| 88 | if (m_suspended > 0) |
|
89 | if (m_suspended > 0) | |
| 89 | m_hasTasks.Set(); |
|
90 | m_hasTasks.Set(); | |
| 90 | else |
|
91 | else | |
| 91 | StartWorker(); |
|
92 | StartWorker(); | |
| 92 | } |
|
93 | } | |
| 93 |
|
94 | |||
|
|
95 | /// <summary> | |||
|
|
96 | /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | |||
|
|
97 | /// </summary> | |||
|
|
98 | protected void StartIfIdle() { | |||
|
|
99 | int threads; | |||
|
|
100 | do { | |||
|
|
101 | ||||
|
|
102 | } | |||
|
|
103 | } | |||
|
|
104 | ||||
|
|
105 | protected virtual void Suspend() { | |||
|
|
106 | m_hasTasks.WaitOne(); | |||
|
|
107 | } | |||
|
|
108 | ||||
| 94 | bool FetchTask(out TUnit unit) { |
|
109 | bool FetchTask(out TUnit unit) { | |
| 95 | do { |
|
110 | do { | |
| 96 | // exit if requested |
|
111 | // exit if requested | |
| 97 | if (m_exitRequired != 0) { |
|
112 | if (m_exitRequired != 0) { | |
| 98 | // release the thread slot |
|
113 | // release the thread slot | |
| 99 | int running; |
|
114 | var running = Interlocked.Decrement(ref m_runningThreads); | |
| 100 | do { |
|
|||
| 101 | running = m_runningThreads; |
|
|||
| 102 | } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); |
|
|||
| 103 | running--; |
|
|||
| 104 |
|
||||
| 105 | if (running == 0) // it was the last worker |
|
115 | if (running == 0) // it was the last worker | |
| 106 | m_hasTasks.Dispose(); |
|
116 | m_hasTasks.Dispose(); | |
| 107 | else |
|
117 | else | |
| @@ -112,7 +122,7 namespace Implab.Parallels { | |||||
| 112 |
|
122 | |||
| 113 | // fetch task |
|
123 | // fetch task | |
| 114 | if (TryDequeue(out unit)) { |
|
124 | if (TryDequeue(out unit)) { | |
| 115 | WakeNewWorker(); |
|
125 | WakeNewWorker(true); | |
| 116 | return true; |
|
126 | return true; | |
| 117 | } |
|
127 | } | |
| 118 |
|
128 | |||
| @@ -122,19 +132,21 namespace Implab.Parallels { | |||||
| 122 | do { |
|
132 | do { | |
| 123 | runningThreads = m_runningThreads; |
|
133 | runningThreads = m_runningThreads; | |
| 124 | if (runningThreads <= m_minThreads) { |
|
134 | if (runningThreads <= m_minThreads) { | |
|
|
135 | // check wheather this is the last thread and we have tasks | |||
|
|
136 | ||||
| 125 | exit = false; |
|
137 | exit = false; | |
| 126 | break; |
|
138 | break; | |
| 127 | } |
|
139 | } | |
| 128 | } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); |
|
140 | } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); | |
| 129 |
|
141 | |||
| 130 | if (exit) { |
|
142 | if (exit) { | |
| 131 | Interlocked.Decrement(ref m_runningThreads); |
|
|||
| 132 | return false; |
|
143 | return false; | |
| 133 | } |
|
144 | } | |
| 134 |
|
145 | |||
| 135 |
// |
|
146 | // entering suspend state | |
| 136 | Interlocked.Increment(ref m_suspended); |
|
147 | Interlocked.Increment(ref m_suspended); | |
| 137 | m_hasTasks.WaitOne(); |
|
148 | // keep this thread and wait | |
|
|
149 | Suspend(); | |||
| 138 | Interlocked.Decrement(ref m_suspended); |
|
150 | Interlocked.Decrement(ref m_suspended); | |
| 139 | } while (true); |
|
151 | } while (true); | |
| 140 | } |
|
152 | } | |
| @@ -10,20 +10,27 namespace Implab.Parallels { | |||||
| 10 |
|
10 | |||
| 11 | MTQueue<Action> m_queue = new MTQueue<Action>(); |
|
11 | MTQueue<Action> m_queue = new MTQueue<Action>(); | |
| 12 | int m_queueLength = 0; |
|
12 | int m_queueLength = 0; | |
|
|
13 | readonly int m_threshold = 1; | |||
| 13 |
|
14 | |||
| 14 | public WorkerPool(int minThreads, int maxThreads) |
|
15 | public WorkerPool(int minThreads, int maxThreads, int threshold) | |
| 15 | : base(minThreads, maxThreads) { |
|
16 | : base(minThreads, maxThreads) { | |
| 16 | InitPool(); |
|
17 | m_threshold = threshold; | |
|
|
18 | InitPool(); | |||
|
|
19 | } | |||
|
|
20 | ||||
|
|
21 | public WorkerPool(int minThreads, int maxThreads) : | |||
|
|
22 | base(minThreads, maxThreads) { | |||
|
|
23 | InitPool(); | |||
| 17 | } |
|
24 | } | |
| 18 |
|
25 | |||
| 19 | public WorkerPool(int threads) |
|
26 | public WorkerPool(int threads) | |
| 20 | : base(threads) { |
|
27 | : base(threads) { | |
| 21 |
|
|
28 | InitPool(); | |
| 22 | } |
|
29 | } | |
| 23 |
|
30 | |||
| 24 | public WorkerPool() |
|
31 | public WorkerPool() | |
| 25 | : base() { |
|
32 | : base() { | |
| 26 |
|
|
33 | InitPool(); | |
| 27 | } |
|
34 | } | |
| 28 |
|
35 | |||
| 29 | public Promise<T> Invoke<T>(Func<T> task) { |
|
36 | public Promise<T> Invoke<T>(Func<T> task) { | |
| @@ -47,11 +54,20 namespace Implab.Parallels { | |||||
| 47 |
|
54 | |||
| 48 | protected void EnqueueTask(Action unit) { |
|
55 | protected void EnqueueTask(Action unit) { | |
| 49 | Debug.Assert(unit != null); |
|
56 | Debug.Assert(unit != null); | |
| 50 | Interlocked.Increment(ref m_queueLength); |
|
57 | var len = Interlocked.Increment(ref m_queueLength); | |
| 51 | m_queue.Enqueue(unit); |
|
58 | m_queue.Enqueue(unit); | |
| 52 | // if there are sleeping threads in the pool wake one |
|
59 | ||
| 53 | // probably this will lead a dry run |
|
60 | if (ThreadCount == 0) | |
| 54 | WakeNewWorker(); |
|
61 | // force to start | |
|
|
62 | WakeNewWorker(false); | |||
|
|
63 | } | |||
|
|
64 | ||||
|
|
65 | protected override void WakeNewWorker(bool extend) { | |||
|
|
66 | if (extend && m_queueLength <= m_threshold) | |||
|
|
67 | // in this case we are in active thread and it request for additional workers | |||
|
|
68 | // satisfy it only when queue is longer than threshold | |||
|
|
69 | return; | |||
|
|
70 | base.WakeNewWorker(extend); | |||
| 55 | } |
|
71 | } | |
| 56 |
|
72 | |||
| 57 | protected override bool TryDequeue(out Action unit) { |
|
73 | protected override bool TryDequeue(out Action unit) { | |
| @@ -65,5 +81,10 namespace Implab.Parallels { | |||||
| 65 | protected override void InvokeUnit(Action unit) { |
|
81 | protected override void InvokeUnit(Action unit) { | |
| 66 | unit(); |
|
82 | unit(); | |
| 67 | } |
|
83 | } | |
|
|
84 | ||||
|
|
85 | protected override void Suspend() { | |||
|
|
86 | if (m_queueLength == 0) | |||
|
|
87 | base.Suspend(); | |||
|
|
88 | } | |||
| 68 | } |
|
89 | } | |
| 69 | } |
|
90 | } | |
| @@ -103,11 +103,16 namespace Implab { | |||||
| 103 | /// <summary> |
|
103 | /// <summary> | |
| 104 | /// Выполняет обещание, сообщая об ошибке |
|
104 | /// Выполняет обещание, сообщая об ошибке | |
| 105 | /// </summary> |
|
105 | /// </summary> | |
|
|
106 | /// <remarks> | |||
|
|
107 | /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |||
|
|
108 | /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |||
|
|
109 | /// будут проигнорированы. | |||
|
|
110 | /// </remarks> | |||
| 106 | /// <param name="error">Исключение возникшее при выполнении операции</param> |
|
111 | /// <param name="error">Исключение возникшее при выполнении операции</param> | |
| 107 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> |
|
112 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
| 108 | public void Reject(Exception error) { |
|
113 | public void Reject(Exception error) { | |
| 109 | lock (m_lock) { |
|
114 | lock (m_lock) { | |
| 110 | if (m_state == PromiseState.Cancelled) |
|
115 | if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected) | |
| 111 | return; |
|
116 | return; | |
| 112 | if (m_state != PromiseState.Unresolved) |
|
117 | if (m_state != PromiseState.Unresolved) | |
| 113 | throw new InvalidOperationException("The promise is already resolved"); |
|
118 | throw new InvalidOperationException("The promise is already resolved"); | |
General Comments 0
You need to be logged in to leave comments.
Login now
