##// END OF EJS Templates
improved asyncpool usability...
cin -
r120:f1b897999260 v2
parent child
Show More
@@ -1,77 +1,86
1 using Implab.Diagnostics;
1 using Implab.Diagnostics;
2 using System;
2 using System;
3 using System.Threading;
3 using System.Threading;
4 using System.Linq;
4
5
5 namespace Implab.Parallels {
6 namespace Implab.Parallels {
6 /// <summary>
7 /// <summary>
7 /// Класс для распаралСливания Π·Π°Π΄Π°Ρ‡.
8 /// Класс для распаралСливания Π·Π°Π΄Π°Ρ‡.
8 /// </summary>
9 /// </summary>
9 /// <remarks>
10 /// <remarks>
10 /// Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Π΄Π°Π½Π½Ρ‹ΠΉ класс ΠΈ лямда выраТСния ΠΌΠΎΠΆΠ½ΠΎ Ρ€Π°ΡΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΠΈΡ‚ΡŒ
11 /// Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Π΄Π°Π½Π½Ρ‹ΠΉ класс ΠΈ лямда выраТСния ΠΌΠΎΠΆΠ½ΠΎ Ρ€Π°ΡΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΠΈΡ‚ΡŒ
11 /// вычислСния, для этого ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ концСпция ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.
12 /// вычислСния, для этого ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ концСпция ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.
12 /// </remarks>
13 /// </remarks>
13 public static class AsyncPool {
14 public static class AsyncPool {
14
15
15 public static IPromise<T> Invoke<T>(Func<T> func) {
16 public static IPromise<T> Invoke<T>(Func<T> func) {
16 var p = new Promise<T>();
17 var p = new Promise<T>();
17 var caller = TraceContext.Instance.CurrentOperation;
18 var caller = TraceContext.Instance.CurrentOperation;
18
19
19 ThreadPool.QueueUserWorkItem(param => {
20 ThreadPool.QueueUserWorkItem(param => {
20 TraceContext.Instance.EnterLogicalOperation(caller,false);
21 TraceContext.Instance.EnterLogicalOperation(caller,false);
21 try {
22 try {
22 p.Resolve(func());
23 p.Resolve(func());
23 } catch(Exception e) {
24 } catch(Exception e) {
24 p.Reject(e);
25 p.Reject(e);
25 } finally {
26 } finally {
26 TraceContext.Instance.Leave();
27 TraceContext.Instance.Leave();
27 }
28 }
28 });
29 });
29
30
30 return p;
31 return p;
31 }
32 }
32
33
33 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
34 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
34 var p = new Promise<T>();
35 var p = new Promise<T>();
35
36
36 var caller = TraceContext.Instance.CurrentOperation;
37 var caller = TraceContext.Instance.CurrentOperation;
37
38
38 var worker = new Thread(() => {
39 var worker = new Thread(() => {
39 TraceContext.Instance.EnterLogicalOperation(caller,false);
40 TraceContext.Instance.EnterLogicalOperation(caller,false);
40 try {
41 try {
41 p.Resolve(func());
42 p.Resolve(func());
42 } catch (Exception e) {
43 } catch (Exception e) {
43 p.Reject(e);
44 p.Reject(e);
44 } finally {
45 } finally {
45 TraceContext.Instance.Leave();
46 TraceContext.Instance.Leave();
46 }
47 }
47 });
48 });
48 worker.IsBackground = true;
49 worker.IsBackground = true;
49 worker.Start();
50 worker.Start();
50
51
51 return p;
52 return p;
52 }
53 }
53
54
54
55
55 public static IPromise InvokeNewThread(Action func) {
56 public static IPromise InvokeNewThread(Action func) {
56 var p = new Promise();
57 var p = new Promise();
57
58
58 var caller = TraceContext.Instance.CurrentOperation;
59 var caller = TraceContext.Instance.CurrentOperation;
59
60
60 var worker = new Thread(() => {
61 var worker = new Thread(() => {
61 TraceContext.Instance.EnterLogicalOperation(caller,false);
62 TraceContext.Instance.EnterLogicalOperation(caller,false);
62 try {
63 try {
63 func();
64 func();
64 p.Resolve();
65 p.Resolve();
65 } catch (Exception e) {
66 } catch (Exception e) {
66 p.Reject(e);
67 p.Reject(e);
67 } finally {
68 } finally {
68 TraceContext.Instance.Leave();
69 TraceContext.Instance.Leave();
69 }
70 }
70 });
71 });
71 worker.IsBackground = true;
72 worker.IsBackground = true;
72 worker.Start();
73 worker.Start();
73
74
74 return p;
75 return p;
75 }
76 }
77
78 public static IPromise[] ThreadRun(params Action[] func) {
79 return func.Select(f => InvokeNewThread(f)).ToArray();
80 }
81
82 public static IPromise<T>[] ThreadRun<T>(params Func<T>[] func) {
83 return func.Select(f => InvokeNewThread(f)).ToArray();
76 }
84 }
77 }
85 }
86 }
@@ -1,244 +1,271
1 using System.Threading;
1 using System.Threading;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System;
3 using System;
4 using System.Collections;
4 using System.Collections;
5
5
6 namespace Implab.Parallels {
6 namespace Implab.Parallels {
7 public class AsyncQueue<T> : IEnumerable<T> {
7 public class AsyncQueue<T> : IEnumerable<T> {
8 class Chunk {
8 class Chunk {
9 public Chunk next;
9 public Chunk next;
10
10
11 int m_low;
11 int m_low;
12 int m_hi;
12 int m_hi;
13 int m_alloc;
13 int m_alloc;
14 readonly int m_size;
14 readonly int m_size;
15 readonly T[] m_data;
15 readonly T[] m_data;
16
16
17 public Chunk(int size) {
17 public Chunk(int size) {
18 m_size = size;
18 m_size = size;
19 m_data = new T[size];
19 m_data = new T[size];
20 }
20 }
21
21
22 public Chunk(int size, T value) {
22 public Chunk(int size, T value) {
23 m_size = size;
23 m_size = size;
24 m_hi = 1;
24 m_hi = 1;
25 m_alloc = 1;
25 m_alloc = 1;
26 m_data = new T[size];
26 m_data = new T[size];
27 m_data[0] = value;
27 m_data[0] = value;
28 }
28 }
29
29
30 public int Low {
30 public int Low {
31 get { return m_low; }
31 get { return m_low; }
32 }
32 }
33
33
34 public int Hi {
34 public int Hi {
35 get { return m_hi; }
35 get { return m_hi; }
36 }
36 }
37
37
38 public bool TryEnqueue(T value,out bool extend) {
38 public bool TryEnqueue(T value,out bool extend) {
39 extend = false;
39 var alloc = Interlocked.Increment(ref m_alloc) - 1;
40 int alloc;
41 do {
42 alloc = m_alloc;
43 if (alloc > m_size)
44 return false;
45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
46
40
47 if (alloc == m_size) {
41 if (alloc >= m_size) {
48 extend = true;
42 extend = alloc == m_size;
49 return false;
43 return false;
50 }
44 }
51
45
46 extend = false;
52 m_data[alloc] = value;
47 m_data[alloc] = value;
53
48
54 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
49 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
55 // spin wait for commit
50 // spin wait for commit
56 }
51 }
57 return true;
52 return true;
58 }
53 }
59
54
60 public bool TryDequeue(out T value,out bool recycle) {
55 public bool TryDequeue(out T value,out bool recycle) {
61 int low;
56 int low;
62 do {
57 do {
63 low = m_low;
58 low = m_low;
64 if (low >= m_hi) {
59 if (low >= m_hi) {
65 value = default(T);
60 value = default(T);
66 recycle = (low == m_size);
61 recycle = (low == m_size);
67 return false;
62 return false;
68 }
63 }
69 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
64 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
70
65
71 recycle = (low == m_size - 1);
66 recycle = (low == m_size - 1);
72 value = m_data[low];
67 value = m_data[low];
73
68
74 return true;
69 return true;
75 }
70 }
76
71
72 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
73 int alloc;
74 int allocSize;
75
76 do {
77 alloc = m_alloc;
78
79 if (alloc > m_size) {
80 enqueued = 0;
81 extend = false;
82 return false;
83 }
84
85 allocSize = Math.Min(m_size - m_alloc, length);
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
87
88 if (alloc == m_size) {
89 enqueued = 0;
90 extend = true;
91 return false;
92 }
93
94 Array.Copy(batch, offset, m_data, alloc, allocSize);
95 enqueued = allocSize;
96 extend = false;
97
98 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
99 // spin wait for commit
100 }
101 return true;
102 }
103
77 public T GetAt(int pos) {
104 public T GetAt(int pos) {
78 return m_data[pos];
105 return m_data[pos];
79 }
106 }
80 }
107 }
81
108
82 public const int DEFAULT_CHUNK_SIZE = 32;
109 public const int DEFAULT_CHUNK_SIZE = 32;
83
110
84 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
111 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
85
112
86 Chunk m_first;
113 Chunk m_first;
87 Chunk m_last;
114 Chunk m_last;
88
115
89 public AsyncQueue() {
116 public AsyncQueue() {
90 m_last = m_first = new Chunk(m_chunkSize);
117 m_last = m_first = new Chunk(m_chunkSize);
91 }
118 }
92
119
93 public void Enqueue(T value) {
120 public void Enqueue(T value) {
94 var last = m_last;
121 var last = m_last;
95 // spin wait to the new chunk
122 // spin wait to the new chunk
96 bool extend = true;
123 bool extend = true;
97 while(last == null || !last.TryEnqueue(value, out extend)) {
124 while(last == null || !last.TryEnqueue(value, out extend)) {
98 // try to extend queue
125 // try to extend queue
99 if (extend || last == null) {
126 if (extend || last == null) {
100 var chunk = new Chunk(m_chunkSize, value);
127 var chunk = new Chunk(m_chunkSize, value);
101 if (EnqueueChunk(last, chunk))
128 if (EnqueueChunk(last, chunk))
102 break;
129 break;
103 last = m_last;
130 last = m_last;
104 } else {
131 } else {
105 while (last != m_last) {
132 while (last != m_last) {
106 Thread.MemoryBarrier();
133 Thread.MemoryBarrier();
107 last = m_last;
134 last = m_last;
108 }
135 }
109 }
136 }
110 }
137 }
111 }
138 }
112
139
113 public bool TryDequeue(out T value) {
140 public bool TryDequeue(out T value) {
114 var chunk = m_first;
141 var chunk = m_first;
115 bool recycle;
142 bool recycle;
116 while (chunk != null) {
143 while (chunk != null) {
117
144
118 var result = chunk.TryDequeue(out value, out recycle);
145 var result = chunk.TryDequeue(out value, out recycle);
119
146
120 if (recycle) // this chunk is waste
147 if (recycle) // this chunk is waste
121 RecycleFirstChunk(chunk);
148 RecycleFirstChunk(chunk);
122 else
149 else
123 return result; // this chunk is usable and returned actual result
150 return result; // this chunk is usable and returned actual result
124
151
125 if (result) // this chunk is waste but the true result is always actual
152 if (result) // this chunk is waste but the true result is always actual
126 return true;
153 return true;
127
154
128 // try again
155 // try again
129 chunk = m_first;
156 chunk = m_first;
130 }
157 }
131
158
132 // the queue is empty
159 // the queue is empty
133 value = default(T);
160 value = default(T);
134 return false;
161 return false;
135 }
162 }
136
163
137 bool EnqueueChunk(Chunk last, Chunk chunk) {
164 bool EnqueueChunk(Chunk last, Chunk chunk) {
138 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
165 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
139 return false;
166 return false;
140
167
141 if (last != null)
168 if (last != null)
142 last.next = chunk;
169 last.next = chunk;
143 else
170 else
144 m_first = chunk;
171 m_first = chunk;
145 return true;
172 return true;
146 }
173 }
147
174
148 void RecycleFirstChunk(Chunk first) {
175 void RecycleFirstChunk(Chunk first) {
149 var next = first.next;
176 var next = first.next;
150
177
151 if (next == null) {
178 if (next == null) {
152 // looks like this is the last chunk
179 // looks like this is the last chunk
153 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
180 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
154 // race
181 // race
155 // maybe someone already recycled this chunk
182 // maybe someone already recycled this chunk
156 // or a new chunk has been appedned to the queue
183 // or a new chunk has been appedned to the queue
157
184
158 return; // give up
185 return; // give up
159 }
186 }
160 // the tail is updated
187 // the tail is updated
161 }
188 }
162
189
163 // we need to update the head
190 // we need to update the head
164 Interlocked.CompareExchange(ref m_first, next, first);
191 Interlocked.CompareExchange(ref m_first, next, first);
165 // if the head is already updated then give up
192 // if the head is already updated then give up
166 return;
193 return;
167
194
168 }
195 }
169
196
170 #region IEnumerable implementation
197 #region IEnumerable implementation
171
198
172 class Enumerator : IEnumerator<T> {
199 class Enumerator : IEnumerator<T> {
173 Chunk m_current;
200 Chunk m_current;
174 int m_pos = -1;
201 int m_pos = -1;
175
202
176 public Enumerator(Chunk fisrt) {
203 public Enumerator(Chunk fisrt) {
177 m_current = fisrt;
204 m_current = fisrt;
178 }
205 }
179
206
180 #region IEnumerator implementation
207 #region IEnumerator implementation
181
208
182 public bool MoveNext() {
209 public bool MoveNext() {
183 if (m_current == null)
210 if (m_current == null)
184 return false;
211 return false;
185
212
186 if (m_pos == -1)
213 if (m_pos == -1)
187 m_pos = m_current.Low;
214 m_pos = m_current.Low;
188 else
215 else
189 m_pos++;
216 m_pos++;
190 if (m_pos == m_current.Hi) {
217 if (m_pos == m_current.Hi) {
191 m_pos = 0;
218 m_pos = 0;
192 m_current = m_current.next;
219 m_current = m_current.next;
193 }
220 }
194
221
195 return true;
222 return true;
196 }
223 }
197
224
198 public void Reset() {
225 public void Reset() {
199 throw new NotSupportedException();
226 throw new NotSupportedException();
200 }
227 }
201
228
202 object IEnumerator.Current {
229 object IEnumerator.Current {
203 get {
230 get {
204 return Current;
231 return Current;
205 }
232 }
206 }
233 }
207
234
208 #endregion
235 #endregion
209
236
210 #region IDisposable implementation
237 #region IDisposable implementation
211
238
212 public void Dispose() {
239 public void Dispose() {
213 }
240 }
214
241
215 #endregion
242 #endregion
216
243
217 #region IEnumerator implementation
244 #region IEnumerator implementation
218
245
219 public T Current {
246 public T Current {
220 get {
247 get {
221 if (m_pos == -1 || m_current == null)
248 if (m_pos == -1 || m_current == null)
222 throw new InvalidOperationException();
249 throw new InvalidOperationException();
223 return m_current.GetAt(m_pos);
250 return m_current.GetAt(m_pos);
224 }
251 }
225 }
252 }
226
253
227 #endregion
254 #endregion
228 }
255 }
229
256
230 public IEnumerator<T> GetEnumerator() {
257 public IEnumerator<T> GetEnumerator() {
231 return new Enumerator(m_first);
258 return new Enumerator(m_first);
232 }
259 }
233
260
234 #endregion
261 #endregion
235
262
236 #region IEnumerable implementation
263 #region IEnumerable implementation
237
264
238 IEnumerator IEnumerable.GetEnumerator() {
265 IEnumerator IEnumerable.GetEnumerator() {
239 return GetEnumerator();
266 return GetEnumerator();
240 }
267 }
241
268
242 #endregion
269 #endregion
243 }
270 }
244 }
271 }
@@ -1,110 +1,110
1 using System;
1 using System;
2 using Implab.Diagnostics;
2 using Implab.Diagnostics;
3 using Implab.Parallels;
3 using Implab.Parallels;
4 using Implab;
4 using Implab;
5 using System.Collections.Generic;
5 using System.Collections.Generic;
6 using System.Collections.Concurrent;
6 using System.Collections.Concurrent;
7
7
8 namespace MonoPlay {
8 namespace MonoPlay {
9 class MainClass {
9 class MainClass {
10 public static void Main(string[] args) {
10 public static void Main(string[] args) {
11 if (args == null)
11 if (args == null)
12 throw new ArgumentNullException("args");
12 throw new ArgumentNullException("args");
13
13
14 var q1 = new AsyncQueue<int>();
14 var q1 = new AsyncQueue<int>();
15 var q2 = new Queue<int>();
15 var q2 = new Queue<int>();
16
16
17 const int count = 10000000;
17 const int count = 10000000;
18
18
19
19
20 var t1 = Environment.TickCount;
20 var t1 = Environment.TickCount;
21
21
22 new [] {
22 AsyncPool.ThreadRun(
23 AsyncPool.InvokeNewThread(() => {
23 () => {
24 for (var i = 0; i < count; i++)
24 for (var i = 0; i < count; i++)
25 q1.Enqueue(i);
25 q1.Enqueue(i);
26 }),
26 },
27 AsyncPool.InvokeNewThread(() => {
27 () => {
28 for (var i = 0; i < count; i++)
28 for (var i = 0; i < count; i++)
29 q1.Enqueue(i);
29 q1.Enqueue(i);
30 }),
30 },
31 AsyncPool.InvokeNewThread(() => {
31 () => {
32 int temp = 0;
32 int temp = 0;
33 int i = 0;
33 int i = 0;
34 while (i < count)
34 while (i < count)
35 if (q1.TryDequeue(out temp))
35 if (q1.TryDequeue(out temp))
36 i++;
36 i++;
37 }),
37 },
38 AsyncPool.InvokeNewThread(() => {
38 () => {
39 int temp = 0;
39 int temp = 0;
40 int i = 0;
40 int i = 0;
41 while (i < count)
41 while (i < count)
42 if (q1.TryDequeue(out temp))
42 if (q1.TryDequeue(out temp))
43 i++;
43 i++;
44 })
45 }
44 }
45 )
46 .Combine()
46 .Combine()
47 .Join();
47 .Join();
48
48
49 var t2 = Environment.TickCount;
49 var t2 = Environment.TickCount;
50 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
50 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
51
51
52 t1 = Environment.TickCount;
52 t1 = Environment.TickCount;
53
53
54 for (var i = 0; i < count * 2; i++)
54 for (var i = 0; i < count * 2; i++)
55 q2.Enqueue(i);
55 q2.Enqueue(i);
56
56
57 for (var i = 0; i < count * 2; i++)
57 for (var i = 0; i < count * 2; i++)
58 q2.Dequeue();
58 q2.Dequeue();
59
59
60 t2 = Environment.TickCount;
60 t2 = Environment.TickCount;
61 Console.WriteLine("Queue: {0} ms", t2 - t1);
61 Console.WriteLine("Queue: {0} ms", t2 - t1);
62
62
63 q2 = new Queue<int>();
63 q2 = new Queue<int>();
64
64
65 t1 = Environment.TickCount;
65 t1 = Environment.TickCount;
66
66
67
67
68 new [] {
68 AsyncPool.ThreadRun(
69 AsyncPool.InvokeNewThread(() => {
69 () => {
70 for (var i = 0; i < count; i++)
70 for (var i = 0; i < count; i++)
71 lock (q2)
71 lock (q2)
72 q2.Enqueue(i);
72 q2.Enqueue(i);
73 }),
73 },
74 AsyncPool.InvokeNewThread(() => {
74 () => {
75 for (var i = 0; i < count; i++)
75 for (var i = 0; i < count; i++)
76 lock (q2)
76 lock (q2)
77 q2.Enqueue(i);
77 q2.Enqueue(i);
78 }),
78 },
79 AsyncPool.InvokeNewThread(() => {
79 () => {
80 for (int i = 0; i < count ;)
80 for (int i = 0; i < count ;)
81 lock (q2) {
81 lock (q2) {
82 if (q2.Count == 0)
82 if (q2.Count == 0)
83 continue;
83 continue;
84 q2.Dequeue();
84 q2.Dequeue();
85 i++;
85 i++;
86 }
86 }
87
87
88 }),
88 },
89 AsyncPool.InvokeNewThread(() => {
89 () => {
90 for (int i = 0; i < count ;)
90 for (int i = 0; i < count ;)
91 lock (q2) {
91 lock (q2) {
92 if (q2.Count == 0)
92 if (q2.Count == 0)
93 continue;
93 continue;
94 q2.Dequeue();
94 q2.Dequeue();
95 i++;
95 i++;
96 }
96 }
97
97
98 })
99 }
98 }
99 )
100 .Combine()
100 .Combine()
101 .Join();
101 .Join();
102
102
103
103
104
104
105 t2 = Environment.TickCount;
105 t2 = Environment.TickCount;
106 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
106 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
107
107
108 }
108 }
109 }
109 }
110 }
110 }
General Comments 0
You need to be logged in to leave comments. Login now