@@ -242,6 +242,38 namespace Implab.Test { | |||
|
242 | 242 | } |
|
243 | 243 | |
|
244 | 244 | [TestMethod] |
|
245 | public void ChainedMapTest() { | |
|
246 | ||
|
247 | using (var pool = new WorkerPool(1,10)) { | |
|
248 | int count = 10000; | |
|
249 | ||
|
250 | double[] args = new double[count]; | |
|
251 | var rand = new Random(); | |
|
252 | ||
|
253 | for (int i = 0; i < count; i++) | |
|
254 | args[i] = rand.NextDouble(); | |
|
255 | ||
|
256 | var t = Environment.TickCount; | |
|
257 | var res = args | |
|
258 | .ChainedMap( | |
|
259 | x => pool.Invoke( | |
|
260 | () => Math.Sin(x * x) | |
|
261 | ), | |
|
262 | 4 | |
|
263 | ) | |
|
264 | .Join(); | |
|
265 | ||
|
266 | Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); | |
|
267 | ||
|
268 | t = Environment.TickCount; | |
|
269 | for (int i = 0; i < count; i++) | |
|
270 | Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); | |
|
271 | Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); | |
|
272 | Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads); | |
|
273 | } | |
|
274 | } | |
|
275 | ||
|
276 | [TestMethod] | |
|
245 | 277 | public void ParallelForEachTest() { |
|
246 | 278 | |
|
247 | 279 | int count = 100000; |
|
1 | NO CONTENT: modified file, binary diff hidden |
@@ -39,26 +39,14 namespace Implab.Parallels { | |||
|
39 | 39 | } |
|
40 | 40 | |
|
41 | 41 | protected override bool TryDequeue(out int unit) { |
|
42 | int index; | |
|
43 | unit = -1; | |
|
44 | do { | |
|
45 | index = m_next; | |
|
46 | if (index >= m_source.Length) | |
|
47 | return false; | |
|
48 | } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); | |
|
49 | ||
|
50 | unit = index; | |
|
51 | return true; | |
|
42 | unit = Interlocked.Increment(ref m_next) - 1; | |
|
43 | return unit >= m_source.Length ? false : true; | |
|
52 | 44 | } |
|
53 | 45 | |
|
54 | 46 | protected override void InvokeUnit(int unit) { |
|
55 | 47 | try { |
|
56 | 48 | m_action(m_source[unit]); |
|
57 | int pending; | |
|
58 | do { | |
|
59 | pending = m_pending; | |
|
60 | } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); | |
|
61 | pending--; | |
|
49 | var pending = Interlocked.Decrement(ref m_pending); | |
|
62 | 50 | if (pending == 0) |
|
63 | 51 | m_promise.Resolve(m_source.Length); |
|
64 | 52 | } catch (Exception e) { |
@@ -101,26 +89,14 namespace Implab.Parallels { | |||
|
101 | 89 | } |
|
102 | 90 | |
|
103 | 91 | protected override bool TryDequeue(out int unit) { |
|
104 | int index; | |
|
105 | unit = -1; | |
|
106 | do { | |
|
107 | index = m_next; | |
|
108 | if (index >= m_source.Length) | |
|
109 | return false; | |
|
110 | } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); | |
|
111 | ||
|
112 | unit = index; | |
|
113 | return true; | |
|
92 | unit = Interlocked.Increment(ref m_next) - 1; | |
|
93 | return unit >= m_source.Length ? false : true; | |
|
114 | 94 | } |
|
115 | 95 | |
|
116 | 96 | protected override void InvokeUnit(int unit) { |
|
117 | 97 | try { |
|
118 | 98 | m_dest[unit] = m_transform(m_source[unit]); |
|
119 | int pending; | |
|
120 | do { | |
|
121 | pending = m_pending; | |
|
122 | } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); | |
|
123 | pending --; | |
|
99 | var pending = Interlocked.Decrement(ref m_pending); | |
|
124 | 100 | if (pending == 0) |
|
125 | 101 | m_promise.Resolve(m_dest); |
|
126 | 102 | } catch (Exception e) { |
@@ -148,5 +124,48 namespace Implab.Parallels { | |||
|
148 | 124 | var iter = new ArrayIterator<TSrc>(source, action, threads); |
|
149 | 125 | return iter.Promise; |
|
150 | 126 | } |
|
127 | ||
|
128 | public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { | |
|
129 | if (source == null) | |
|
130 | throw new ArgumentNullException("source"); | |
|
131 | if (transform == null) | |
|
132 | throw new ArgumentNullException("transform"); | |
|
133 | if (threads <= 0) | |
|
134 | throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | |
|
135 | ||
|
136 | var promise = new Promise<TDst[]>(); | |
|
137 | var res = new TDst[source.Length]; | |
|
138 | var pending = source.Length; | |
|
139 | var semaphore = new Semaphore(threads, threads); | |
|
140 | ||
|
141 | AsyncPool.InvokeNewThread(() => { | |
|
142 | for (int i = 0; i < source.Length; i++) { | |
|
143 | if(promise.State != PromiseState.Unresolved) | |
|
144 | break; // stop processing in case of error or cancellation | |
|
145 | var idx = i; | |
|
146 | semaphore.WaitOne(); | |
|
147 | try { | |
|
148 | var p1 = transform(source[i]); | |
|
149 | p1.Anyway(() => semaphore.Release()); | |
|
150 | p1.Cancelled(() => semaphore.Release()); | |
|
151 | p1.Then( | |
|
152 | x => { | |
|
153 | res[idx] = x; | |
|
154 | var left = Interlocked.Decrement(ref pending); | |
|
155 | if (left == 0) | |
|
156 | promise.Resolve(res); | |
|
157 | }, | |
|
158 | e => promise.Reject(e) | |
|
159 | ); | |
|
160 | ||
|
161 | } catch (Exception e) { | |
|
162 | promise.Reject(e); | |
|
163 | } | |
|
164 | } | |
|
165 | return 0; | |
|
166 | }); | |
|
167 | ||
|
168 | return promise.Anyway(() => semaphore.Dispose()); | |
|
169 | } | |
|
151 | 170 | } |
|
152 | 171 | } |
@@ -36,7 +36,6 namespace Implab.Parallels { | |||
|
36 | 36 | } |
|
37 | 37 | }); |
|
38 | 38 | worker.IsBackground = true; |
|
39 | ||
|
40 | 39 | worker.Start(); |
|
41 | 40 | |
|
42 | 41 | return p; |
@@ -63,9 +63,10 namespace Implab.Parallels { | |||
|
63 | 63 | } |
|
64 | 64 | |
|
65 | 65 | bool StartWorker() { |
|
66 |
|
|
|
66 | int current; | |
|
67 | 67 | // use spins to allocate slot for the new thread |
|
68 | 68 | do { |
|
69 | current = m_runningThreads; | |
|
69 | 70 | if (current >= m_maxThreads || m_exitRequired != 0) |
|
70 | 71 | // no more slots left or the pool has been disposed |
|
71 | 72 | return false; |
@@ -84,24 +85,33 namespace Implab.Parallels { | |||
|
84 | 85 | |
|
85 | 86 | protected abstract bool TryDequeue(out TUnit unit); |
|
86 | 87 | |
|
87 | protected virtual void WakeNewWorker() { | |
|
88 | protected virtual void WakeNewWorker(bool extend) { | |
|
88 | 89 | if (m_suspended > 0) |
|
89 | 90 | m_hasTasks.Set(); |
|
90 | 91 | else |
|
91 | 92 | StartWorker(); |
|
92 | 93 | } |
|
93 | 94 | |
|
95 | /// <summary> | |
|
96 | /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | |
|
97 | /// </summary> | |
|
98 | protected void StartIfIdle() { | |
|
99 | int threads; | |
|
100 | do { | |
|
101 | ||
|
102 | } | |
|
103 | } | |
|
104 | ||
|
105 | protected virtual void Suspend() { | |
|
106 | m_hasTasks.WaitOne(); | |
|
107 | } | |
|
108 | ||
|
94 | 109 | bool FetchTask(out TUnit unit) { |
|
95 | 110 | do { |
|
96 | 111 | // exit if requested |
|
97 | 112 | if (m_exitRequired != 0) { |
|
98 | 113 | // release the thread slot |
|
99 | int running; | |
|
100 | do { | |
|
101 | running = m_runningThreads; | |
|
102 | } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); | |
|
103 | running--; | |
|
104 | ||
|
114 | var running = Interlocked.Decrement(ref m_runningThreads); | |
|
105 | 115 | if (running == 0) // it was the last worker |
|
106 | 116 | m_hasTasks.Dispose(); |
|
107 | 117 | else |
@@ -112,7 +122,7 namespace Implab.Parallels { | |||
|
112 | 122 | |
|
113 | 123 | // fetch task |
|
114 | 124 | if (TryDequeue(out unit)) { |
|
115 | WakeNewWorker(); | |
|
125 | WakeNewWorker(true); | |
|
116 | 126 | return true; |
|
117 | 127 | } |
|
118 | 128 | |
@@ -122,19 +132,21 namespace Implab.Parallels { | |||
|
122 | 132 | do { |
|
123 | 133 | runningThreads = m_runningThreads; |
|
124 | 134 | if (runningThreads <= m_minThreads) { |
|
135 | // check wheather this is the last thread and we have tasks | |
|
136 | ||
|
125 | 137 | exit = false; |
|
126 | 138 | break; |
|
127 | 139 | } |
|
128 | 140 | } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); |
|
129 | 141 | |
|
130 | 142 | if (exit) { |
|
131 | Interlocked.Decrement(ref m_runningThreads); | |
|
132 | 143 | return false; |
|
133 | 144 | } |
|
134 | 145 | |
|
135 |
// |
|
|
146 | // entering suspend state | |
|
136 | 147 | Interlocked.Increment(ref m_suspended); |
|
137 | m_hasTasks.WaitOne(); | |
|
148 | // keep this thread and wait | |
|
149 | Suspend(); | |
|
138 | 150 | Interlocked.Decrement(ref m_suspended); |
|
139 | 151 | } while (true); |
|
140 | 152 | } |
@@ -10,20 +10,27 namespace Implab.Parallels { | |||
|
10 | 10 | |
|
11 | 11 | MTQueue<Action> m_queue = new MTQueue<Action>(); |
|
12 | 12 | int m_queueLength = 0; |
|
13 | readonly int m_threshold = 1; | |
|
13 | 14 | |
|
14 | public WorkerPool(int minThreads, int maxThreads) | |
|
15 | public WorkerPool(int minThreads, int maxThreads, int threshold) | |
|
15 | 16 | : base(minThreads, maxThreads) { |
|
16 | InitPool(); | |
|
17 | m_threshold = threshold; | |
|
18 | InitPool(); | |
|
19 | } | |
|
20 | ||
|
21 | public WorkerPool(int minThreads, int maxThreads) : | |
|
22 | base(minThreads, maxThreads) { | |
|
23 | InitPool(); | |
|
17 | 24 | } |
|
18 | 25 | |
|
19 | 26 | public WorkerPool(int threads) |
|
20 | 27 | : base(threads) { |
|
21 |
|
|
|
28 | InitPool(); | |
|
22 | 29 | } |
|
23 | 30 | |
|
24 | 31 | public WorkerPool() |
|
25 | 32 | : base() { |
|
26 |
|
|
|
33 | InitPool(); | |
|
27 | 34 | } |
|
28 | 35 | |
|
29 | 36 | public Promise<T> Invoke<T>(Func<T> task) { |
@@ -47,11 +54,20 namespace Implab.Parallels { | |||
|
47 | 54 | |
|
48 | 55 | protected void EnqueueTask(Action unit) { |
|
49 | 56 | Debug.Assert(unit != null); |
|
50 | Interlocked.Increment(ref m_queueLength); | |
|
57 | var len = Interlocked.Increment(ref m_queueLength); | |
|
51 | 58 | m_queue.Enqueue(unit); |
|
52 | // if there are sleeping threads in the pool wake one | |
|
53 | // probably this will lead a dry run | |
|
54 | WakeNewWorker(); | |
|
59 | ||
|
60 | if (ThreadCount == 0) | |
|
61 | // force to start | |
|
62 | WakeNewWorker(false); | |
|
63 | } | |
|
64 | ||
|
65 | protected override void WakeNewWorker(bool extend) { | |
|
66 | if (extend && m_queueLength <= m_threshold) | |
|
67 | // in this case we are in active thread and it request for additional workers | |
|
68 | // satisfy it only when queue is longer than threshold | |
|
69 | return; | |
|
70 | base.WakeNewWorker(extend); | |
|
55 | 71 | } |
|
56 | 72 | |
|
57 | 73 | protected override bool TryDequeue(out Action unit) { |
@@ -65,5 +81,10 namespace Implab.Parallels { | |||
|
65 | 81 | protected override void InvokeUnit(Action unit) { |
|
66 | 82 | unit(); |
|
67 | 83 | } |
|
84 | ||
|
85 | protected override void Suspend() { | |
|
86 | if (m_queueLength == 0) | |
|
87 | base.Suspend(); | |
|
88 | } | |
|
68 | 89 | } |
|
69 | 90 | } |
@@ -103,11 +103,16 namespace Implab { | |||
|
103 | 103 | /// <summary> |
|
104 | 104 | /// Выполняет обещание, сообщая об ошибке |
|
105 | 105 | /// </summary> |
|
106 | /// <remarks> | |
|
107 | /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |
|
108 | /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |
|
109 | /// будут проигнорированы. | |
|
110 | /// </remarks> | |
|
106 | 111 | /// <param name="error">Исключение возникшее при выполнении операции</param> |
|
107 | 112 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> |
|
108 | 113 | public void Reject(Exception error) { |
|
109 | 114 | lock (m_lock) { |
|
110 | if (m_state == PromiseState.Cancelled) | |
|
115 | if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected) | |
|
111 | 116 | return; |
|
112 | 117 | if (m_state != PromiseState.Unresolved) |
|
113 | 118 | throw new InvalidOperationException("The promise is already resolved"); |
General Comments 0
You need to be logged in to leave comments.
Login now