improved performance of a chained map operation
cin
r89:ce0171cacec4 v2
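This commit touches ChainedMap in ArrayTraits: it applies an asynchronous transform to each element of an array while keeping at most `threads` operations in flight. A minimal usage sketch, assuming only the promise API visible in the diffs below; LoadUserAsync is a hypothetical callback returning IPromise<string>:

// Hypothetical call site, not part of this commit.
int[] ids = { 1, 2, 3, 4 };
IPromise<string[]> names = ids.ChainedMap<int, string>(
    id => LoadUserAsync(id),   // each call returns IPromise<string>
    2                          // at most two transforms run concurrently
);
// 'names' resolves with the full result array once every chained promise completes.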
@@ -1,200 +1,202 @@
using Implab.Diagnostics;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;

namespace Implab.Parallels {
    public static class ArrayTraits {
        class ArrayIterator<TSrc> : DispatchPool<int> {
            readonly Action<TSrc> m_action;
            readonly TSrc[] m_source;
            readonly Promise<int> m_promise = new Promise<int>();
            readonly TraceContext m_traceContext;

            int m_pending;
            int m_next;

            public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
                : base(threads) {

                Debug.Assert(source != null);
                Debug.Assert(action != null);

                m_traceContext = TraceContext.Snapshot();
                m_next = 0;
                m_source = source;
                m_pending = source.Length;
                m_action = action;

                m_promise.Anyway(Dispose);

                InitPool();
            }

            public Promise<int> Promise {
                get {
                    return m_promise;
                }
            }

            protected override void Worker() {
                TraceContext.Fork(m_traceContext);
                base.Worker();
            }

            protected override bool TryDequeue(out int unit) {
                unit = Interlocked.Increment(ref m_next) - 1;
                return unit < m_source.Length;
            }

            protected override void InvokeUnit(int unit) {
                try {
                    m_action(m_source[unit]);
                    var pending = Interlocked.Decrement(ref m_pending);
                    if (pending == 0)
                        m_promise.Resolve(m_source.Length);
                } catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
            readonly Func<TSrc, TDst> m_transform;
            readonly TSrc[] m_source;
            readonly TDst[] m_dest;
            readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
            readonly TraceContext m_traceContext;

            int m_pending;
            int m_next;

            public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
                : base(threads) {

                Debug.Assert(source != null);
                Debug.Assert(transform != null);

                m_next = 0;
                m_source = source;
                m_dest = new TDst[source.Length];
                m_pending = source.Length;
                m_transform = transform;
                m_traceContext = TraceContext.Snapshot();

                m_promise.Anyway(Dispose);

                InitPool();
            }

            public Promise<TDst[]> Promise {
                get {
                    return m_promise;
                }
            }

            protected override void Worker() {
                TraceContext.Fork(m_traceContext);
                base.Worker();
            }

            protected override bool TryDequeue(out int unit) {
                unit = Interlocked.Increment(ref m_next) - 1;
                return unit >= m_source.Length ? false : true;
            }

            protected override void InvokeUnit(int unit) {
                try {
                    m_dest[unit] = m_transform(m_source[unit]);
                    var pending = Interlocked.Decrement(ref m_pending);
                    if (pending == 0)
                        m_promise.Resolve(m_dest);
                } catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        public static IPromise<TDst[]> ParallelMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, TDst> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");

            var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
            return mapper.Promise;
        }

        public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (action == null)
                throw new ArgumentNullException("action");

            var iter = new ArrayIterator<TSrc>(source, action, threads);
            return iter.Promise;
        }

        public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ResultMapper<TSrc, IPromise<TDst>> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");
            if (threads <= 0)
                throw new ArgumentOutOfRangeException("Threads number must be greater than zero");

            if (source.Length == 0)
                return Promise<TDst[]>.ResultToPromise(new TDst[0]);

            var promise = new Promise<TDst[]>();
            var res = new TDst[source.Length];
            var pending = source.Length;

            object locker = new object();
            int slots = threads;

            // Analysis disable AccessToDisposedClosure
            AsyncPool.InvokeNewThread(() => {
                for (int i = 0; i < source.Length; i++) {
                    if (promise.IsResolved)
                        break; // stop processing in case of error or cancellation
                    var idx = i;

-                   lock (locker) {
-                       while (slots == 0)
-                           Monitor.Wait(locker);
-                       slots--;
-                   }
+                   if (Interlocked.Decrement(ref slots) < 0) {
+                       lock (locker) {
+                           while (slots < 0)
+                               Monitor.Wait(locker);
+                       }
+                   }
+
                    try {
                        transform(source[i])
                            .Anyway(() => {
-                               lock (locker) {
-                                   slots++;
+                               Interlocked.Increment(ref slots);
+                               lock (locker) {
                                    Monitor.Pulse(locker);
                                }
                            })
                            .Last(
                                x => {
                                    res[idx] = x;
                                    var left = Interlocked.Decrement(ref pending);
                                    if (left == 0)
                                        promise.Resolve(res);
                                },
-                               e => promise.Reject(e)
+                               promise.Reject
                            );

                    } catch (Exception e) {
                        promise.Reject(e);
                    }
                }
                return 0;
            });

            return promise;
        }

    }
}
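The hunk above is where the performance gain comes from. Previously the producer loop entered the monitor on every iteration to decrement the slot counter, and every completion callback entered it again to increment and pulse. After the change the counter is maintained with Interlocked operations, and the monitor is taken only when the producer actually has to wait for a free slot, and on completion just long enough to pulse a possible waiter. A self-contained sketch of the same pattern, with hypothetical names, assuming a single producer thread as in ChainedMap:

using System.Threading;

// Bounded-concurrency throttle mirroring the updated ChainedMap code.
class SlotThrottle {
    readonly object m_locker = new object();
    int m_slots;

    public SlotThrottle(int slots) {
        m_slots = slots;
    }

    // Producer side: reserve a slot, blocking only when none is free.
    public void Acquire() {
        if (Interlocked.Decrement(ref m_slots) < 0) {
            lock (m_locker) {
                while (m_slots < 0)          // a completion will make this non-negative
                    Monitor.Wait(m_locker);
            }
        }
    }

    // Completion side: return the slot and wake the producer if it is waiting.
    public void Release() {
        Interlocked.Increment(ref m_slots);
        lock (m_locker) {
            Monitor.Pulse(m_locker);
        }
    }
}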
@@ -1,204 +1,204 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;

namespace Implab.Parallels {
    public abstract class DispatchPool<TUnit> : IDisposable {
        readonly int m_minThreadsLimit;
        readonly int m_maxThreadsLimit;
        readonly int m_releaseTimeout = 1000; // how long a working thread waits for new tasks before it exits

        int m_threads = 0; // the current size of the pool
        int m_maxRunningThreads = 0; // the maximum reached size of the pool
        int m_exit = 0; // the pool is going to shut down, all unused workers are released

        readonly object m_signal = new object(); // used to pulse waiting threads

        protected DispatchPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;
            m_minThreadsLimit = min;
            m_maxThreadsLimit = max;
        }

        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        protected DispatchPool() {
            int maxThreads, maxCP;
            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);

            m_minThreadsLimit = 0;
            m_maxThreadsLimit = maxThreads;
        }

        protected void InitPool() {
            for (int i = 0; i < m_minThreadsLimit; i++)
                StartWorker();
        }

        public int PoolSize {
            get {
                Thread.MemoryBarrier();
                return m_threads;
            }
        }

        public int MaxRunningThreads {
            get {
                Thread.MemoryBarrier();
                return m_maxRunningThreads;
            }
        }

        protected bool IsDisposed {
            get {
                Thread.MemoryBarrier();
                return m_exit == 1;
            }
        }

        protected abstract bool TryDequeue(out TUnit unit);

-       private bool Dequeue(out TUnit unit, int timeout) {
+       bool Dequeue(out TUnit unit, int timeout) {
            int ts = Environment.TickCount;
            if (TryDequeue(out unit))
                return true;
            lock (m_signal) {
                while (!TryDequeue(out unit) && m_exit == 0)
                    if (!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
                        // timeout
                        return false;
                    }
                // queue item or terminate
                Monitor.Pulse(m_signal);
                if (m_exit == 1)
                    return false;
            }
            return true;
        }

        protected void SignalThread() {
            lock (m_signal) {
                Monitor.Pulse(m_signal);
            }
        }

        #region thread slots traits

        bool AllocateThreadSlot() {
            int current;
            // use spins to allocate a slot for the new thread
            do {
                current = m_threads;
                if (current >= m_maxThreadsLimit || m_exit == 1)
                    // no more slots left or the pool has been disposed
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));

            UpdateMaxThreads(current + 1);

            return true;
        }

        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
                return false;

            UpdateMaxThreads(desired);

            return true;
        }

        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;
            // use spins to release the thread slot
            Thread.MemoryBarrier();
            do {
                current = m_threads;
                if (current <= m_minThreadsLimit && m_exit == 0)
                    // the thread is reserved
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));

            last = (current == 1);

            return true;
        }

        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count)
                    break;
            } while (max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        protected bool StartWorker() {
            if (AllocateThreadSlot()) {
                // slot successfully allocated
                var worker = new Thread(this.Worker);
                worker.IsBackground = true;
                worker.Start();

                return true;
            } else {
                return false;
            }
        }

        protected abstract void InvokeUnit(TUnit unit);

        protected virtual void Worker() {
            TUnit unit;
            bool last;
            do {
                while (Dequeue(out unit, m_releaseTimeout)) {
                    InvokeUnit(unit);
                }
                if (!ReleaseThreadSlot(out last))
                    continue;
                // the queue may not be empty
                if (last && TryDequeue(out unit)) {
                    InvokeUnit(unit);
                    if (AllocateThreadSlot(1))
                        continue;
                    // we can safely exit since the pool is alive
                }
                break;
            } while (true);
        }


        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                    // wake sleeping threads
                    SignalThread();
                    GC.SuppressFinalize(this);
                }
            }
        }

        public void Dispose() {
            Dispose(true);
        }

        ~DispatchPool() {
            Dispose(false);
        }
    }
}
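The DispatchPool change itself is cosmetic (the redundant private modifier on Dequeue is dropped), but since the whole class is shown above, here is a minimal sketch of how it is meant to be specialized. The subclass is hypothetical and relies only on members visible in this diff (the thread-count constructor, InitPool, StartWorker, SignalThread, TryDequeue, InvokeUnit); ConcurrentQueue is assumed to be available on the target framework.

using System;
using System.Collections.Concurrent;
using Implab.Parallels;

// Hypothetical worker pool that executes queued actions.
class ActionDispatchPool : DispatchPool<Action> {
    readonly ConcurrentQueue<Action> m_queue = new ConcurrentQueue<Action>();

    public ActionDispatchPool(int threads)
        : base(threads) {
        InitPool(); // start the initial workers
    }

    public void Enqueue(Action work) {
        m_queue.Enqueue(work);
        if (!StartWorker())  // no free slot for a new thread...
            SignalThread();  // ...so wake a sleeping worker instead
    }

    protected override bool TryDequeue(out Action unit) {
        return m_queue.TryDequeue(out unit);
    }

    protected override void InvokeUnit(Action unit) {
        unit();
    }
}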