##// END OF EJS Templates
improved performance of a chained map operation
cin -
r89:ce0171cacec4 v2
parent child
Show More
@@ -1,200 +1,202
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Collections.Generic;
4 4 using System.Diagnostics;
5 5 using System.Linq;
6 6 using System.Text;
7 7 using System.Threading;
8 8
9 9 namespace Implab.Parallels {
10 10 public static class ArrayTraits {
11 11 class ArrayIterator<TSrc> : DispatchPool<int> {
12 12 readonly Action<TSrc> m_action;
13 13 readonly TSrc[] m_source;
14 14 readonly Promise<int> m_promise = new Promise<int>();
15 15 readonly TraceContext m_traceContext;
16 16
17 17 int m_pending;
18 18 int m_next;
19 19
20 20 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
21 21 : base(threads) {
22 22
23 23 Debug.Assert(source != null);
24 24 Debug.Assert(action != null);
25 25
26 26 m_traceContext = TraceContext.Snapshot();
27 27 m_next = 0;
28 28 m_source = source;
29 29 m_pending = source.Length;
30 30 m_action = action;
31 31
32 32 m_promise.Anyway(Dispose);
33 33
34 34 InitPool();
35 35 }
36 36
37 37 public Promise<int> Promise {
38 38 get {
39 39 return m_promise;
40 40 }
41 41 }
42 42
43 43 protected override void Worker() {
44 44 TraceContext.Fork(m_traceContext);
45 45 base.Worker();
46 46 }
47 47
48 48 protected override bool TryDequeue(out int unit) {
49 49 unit = Interlocked.Increment(ref m_next) - 1;
50 50 return unit < m_source.Length;
51 51 }
52 52
53 53 protected override void InvokeUnit(int unit) {
54 54 try {
55 55 m_action(m_source[unit]);
56 56 var pending = Interlocked.Decrement(ref m_pending);
57 57 if (pending == 0)
58 58 m_promise.Resolve(m_source.Length);
59 59 } catch (Exception e) {
60 60 m_promise.Reject(e);
61 61 }
62 62 }
63 63 }
64 64
65 65 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
66 66 readonly Func<TSrc, TDst> m_transform;
67 67 readonly TSrc[] m_source;
68 68 readonly TDst[] m_dest;
69 69 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
70 70 readonly TraceContext m_traceContext;
71 71
72 72 int m_pending;
73 73 int m_next;
74 74
75 75 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
76 76 : base(threads) {
77 77
78 78 Debug.Assert (source != null);
79 79 Debug.Assert( transform != null);
80 80
81 81 m_next = 0;
82 82 m_source = source;
83 83 m_dest = new TDst[source.Length];
84 84 m_pending = source.Length;
85 85 m_transform = transform;
86 86 m_traceContext = TraceContext.Snapshot();
87 87
88 88 m_promise.Anyway(Dispose);
89 89
90 90 InitPool();
91 91 }
92 92
93 93 public Promise<TDst[]> Promise {
94 94 get {
95 95 return m_promise;
96 96 }
97 97 }
98 98
99 99 protected override void Worker() {
100 100 TraceContext.Fork(m_traceContext);
101 101 base.Worker();
102 102 }
103 103
104 104 protected override bool TryDequeue(out int unit) {
105 105 unit = Interlocked.Increment(ref m_next) - 1;
106 106 return unit >= m_source.Length ? false : true;
107 107 }
108 108
109 109 protected override void InvokeUnit(int unit) {
110 110 try {
111 111 m_dest[unit] = m_transform(m_source[unit]);
112 112 var pending = Interlocked.Decrement(ref m_pending);
113 113 if (pending == 0)
114 114 m_promise.Resolve(m_dest);
115 115 } catch (Exception e) {
116 116 m_promise.Reject(e);
117 117 }
118 118 }
119 119 }
120 120
121 121 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
122 122 if (source == null)
123 123 throw new ArgumentNullException("source");
124 124 if (transform == null)
125 125 throw new ArgumentNullException("transform");
126 126
127 127 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
128 128 return mapper.Promise;
129 129 }
130 130
131 131 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
132 132 if (source == null)
133 133 throw new ArgumentNullException("source");
134 134 if (action == null)
135 135 throw new ArgumentNullException("action");
136 136
137 137 var iter = new ArrayIterator<TSrc>(source, action, threads);
138 138 return iter.Promise;
139 139 }
140 140
141 141 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ResultMapper<TSrc, IPromise<TDst>> transform, int threads) {
142 142 if (source == null)
143 143 throw new ArgumentNullException("source");
144 144 if (transform == null)
145 145 throw new ArgumentNullException("transform");
146 146 if (threads <= 0)
147 147 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
148 148
149 149 if (source.Length == 0)
150 150 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
151 151
152 152 var promise = new Promise<TDst[]>();
153 153 var res = new TDst[source.Length];
154 154 var pending = source.Length;
155 155
156 156 object locker = new object();
157 157 int slots = threads;
158 158
159 159 // Analysis disable AccessToDisposedClosure
160 160 AsyncPool.InvokeNewThread(() => {
161 161 for (int i = 0; i < source.Length; i++) {
162 162 if(promise.IsResolved)
163 163 break; // stop processing in case of error or cancellation
164 164 var idx = i;
165 165
166 lock(locker) {
167 while(slots == 0)
168 Monitor.Wait(locker);
169 slots--;
166 if (Interlocked.Decrement(ref slots) < 0) {
167 lock(locker) {
168 while(slots < 0)
169 Monitor.Wait(locker);
170 }
170 171 }
172
171 173 try {
172 174 transform(source[i])
173 175 .Anyway(() => {
174 lock(locker) {
175 slots ++;
176 Interlocked.Increment(ref slots);
177 lock (locker) {
176 178 Monitor.Pulse(locker);
177 179 }
178 180 })
179 181 .Last(
180 182 x => {
181 183 res[idx] = x;
182 184 var left = Interlocked.Decrement(ref pending);
183 185 if (left == 0)
184 186 promise.Resolve(res);
185 187 },
186 e => promise.Reject(e)
188 promise.Reject
187 189 );
188 190
189 191 } catch (Exception e) {
190 192 promise.Reject(e);
191 193 }
192 194 }
193 195 return 0;
194 196 });
195 197
196 198 return promise;
197 199 }
198 200
199 201 }
200 202 }
@@ -1,204 +1,204
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab.Parallels {
9 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 10 readonly int m_minThreadsLimit;
11 11 readonly int m_maxThreadsLimit;
12 12 readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit
13 13
14 14 int m_threads = 0; // the current size of the pool
15 15 int m_maxRunningThreads = 0; // the meximum reached size of the pool
16 16 int m_exit = 0; // the pool is going to shutdown, all unused workers are released
17 17
18 18 readonly object m_signal = new object(); // used to pulse waiting threads
19 19
20 20 protected DispatchPool(int min, int max) {
21 21 if (min < 0)
22 22 throw new ArgumentOutOfRangeException("min");
23 23 if (max <= 0)
24 24 throw new ArgumentOutOfRangeException("max");
25 25
26 26 if (min > max)
27 27 min = max;
28 28 m_minThreadsLimit = min;
29 29 m_maxThreadsLimit = max;
30 30 }
31 31
32 32 protected DispatchPool(int threads)
33 33 : this(threads, threads) {
34 34 }
35 35
36 36 protected DispatchPool() {
37 37 int maxThreads, maxCP;
38 38 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
39 39
40 40 m_minThreadsLimit = 0;
41 41 m_maxThreadsLimit = maxThreads;
42 42 }
43 43
44 44 protected void InitPool() {
45 45 for (int i = 0; i < m_minThreadsLimit; i++)
46 46 StartWorker();
47 47 }
48 48
49 49 public int PoolSize {
50 50 get {
51 51 Thread.MemoryBarrier();
52 52 return m_threads;
53 53 }
54 54 }
55 55
56 56 public int MaxRunningThreads {
57 57 get {
58 58 Thread.MemoryBarrier();
59 59 return m_maxRunningThreads;
60 60 }
61 61 }
62 62
63 63 protected bool IsDisposed {
64 64 get {
65 65 Thread.MemoryBarrier();
66 66 return m_exit == 1;
67 67 }
68 68 }
69 69
70 70 protected abstract bool TryDequeue(out TUnit unit);
71 71
72 private bool Dequeue(out TUnit unit, int timeout) {
72 bool Dequeue(out TUnit unit, int timeout) {
73 73 int ts = Environment.TickCount;
74 74 if (TryDequeue(out unit))
75 75 return true;
76 76 lock (m_signal) {
77 77 while (!TryDequeue(out unit) && m_exit == 0)
78 78 if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
79 79 // timeout
80 80 return false;
81 81 }
82 82 // queue item or terminate
83 83 Monitor.Pulse(m_signal);
84 84 if (m_exit == 1)
85 85 return false;
86 86 }
87 87 return true;
88 88 }
89 89
90 90 protected void SignalThread() {
91 91 lock (m_signal) {
92 92 Monitor.Pulse(m_signal);
93 93 }
94 94 }
95 95
96 96 #region thread slots traits
97 97
98 98 bool AllocateThreadSlot() {
99 99 int current;
100 100 // use spins to allocate slot for the new thread
101 101 do {
102 102 current = m_threads;
103 103 if (current >= m_maxThreadsLimit || m_exit == 1)
104 104 // no more slots left or the pool has been disposed
105 105 return false;
106 106 } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));
107 107
108 108 UpdateMaxThreads(current + 1);
109 109
110 110 return true;
111 111 }
112 112
113 113 bool AllocateThreadSlot(int desired) {
114 114 if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
115 115 return false;
116 116
117 117 UpdateMaxThreads(desired);
118 118
119 119 return true;
120 120 }
121 121
122 122 bool ReleaseThreadSlot(out bool last) {
123 123 last = false;
124 124 int current;
125 125 // use spins to release slot for the new thread
126 126 Thread.MemoryBarrier();
127 127 do {
128 128 current = m_threads;
129 129 if (current <= m_minThreadsLimit && m_exit == 0)
130 130 // the thread is reserved
131 131 return false;
132 132 } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));
133 133
134 134 last = (current == 1);
135 135
136 136 return true;
137 137 }
138 138
139 139 void UpdateMaxThreads(int count) {
140 140 int max;
141 141 do {
142 142 max = m_maxRunningThreads;
143 143 if (max >= count)
144 144 break;
145 145 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
146 146 }
147 147
148 148 #endregion
149 149
150 150 protected bool StartWorker() {
151 151 if (AllocateThreadSlot()) {
152 152 // slot successfully allocated
153 153 var worker = new Thread(this.Worker);
154 154 worker.IsBackground = true;
155 155 worker.Start();
156 156
157 157 return true;
158 158 } else {
159 159 return false;
160 160 }
161 161 }
162 162
163 163 protected abstract void InvokeUnit(TUnit unit);
164 164
165 165 protected virtual void Worker() {
166 166 TUnit unit;
167 167 bool last;
168 168 do {
169 169 while (Dequeue(out unit, m_releaseTimeout)) {
170 170 InvokeUnit(unit);
171 171 }
172 172 if(!ReleaseThreadSlot(out last))
173 173 continue;
174 174 // queue may be not empty
175 175 if (last && TryDequeue(out unit)) {
176 176 InvokeUnit(unit);
177 177 if (AllocateThreadSlot(1))
178 178 continue;
179 179 // we can safely exit since pool is alive
180 180 }
181 181 break;
182 182 } while(true);
183 183 }
184 184
185 185
186 186 protected virtual void Dispose(bool disposing) {
187 187 if (disposing) {
188 188 if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
189 189 // wake sleeping threads
190 190 SignalThread();
191 191 GC.SuppressFinalize(this);
192 192 }
193 193 }
194 194 }
195 195
196 196 public void Dispose() {
197 197 Dispose(true);
198 198 }
199 199
200 200 ~DispatchPool() {
201 201 Dispose(false);
202 202 }
203 203 }
204 204 }
General Comments 0
You need to be logged in to leave comments. Login now