##// END OF EJS Templates
improved asyncpool usability...
cin -
r120:f1b897999260 v2
parent child
Show More
@@ -1,77 +1,86
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Threading;
4 using System.Linq;
4 5
5 6 namespace Implab.Parallels {
6 7 /// <summary>
7 8 /// Класс для распаралСливания Π·Π°Π΄Π°Ρ‡.
8 9 /// </summary>
9 10 /// <remarks>
10 11 /// Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Π΄Π°Π½Π½Ρ‹ΠΉ класс ΠΈ лямда выраТСния ΠΌΠΎΠΆΠ½ΠΎ Ρ€Π°ΡΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΠΈΡ‚ΡŒ
11 12 /// вычислСния, для этого ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ концСпция ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ.
12 13 /// </remarks>
13 14 public static class AsyncPool {
14 15
15 16 public static IPromise<T> Invoke<T>(Func<T> func) {
16 17 var p = new Promise<T>();
17 18 var caller = TraceContext.Instance.CurrentOperation;
18 19
19 20 ThreadPool.QueueUserWorkItem(param => {
20 21 TraceContext.Instance.EnterLogicalOperation(caller,false);
21 22 try {
22 23 p.Resolve(func());
23 24 } catch(Exception e) {
24 25 p.Reject(e);
25 26 } finally {
26 27 TraceContext.Instance.Leave();
27 28 }
28 29 });
29 30
30 31 return p;
31 32 }
32 33
33 34 public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
34 35 var p = new Promise<T>();
35 36
36 37 var caller = TraceContext.Instance.CurrentOperation;
37 38
38 39 var worker = new Thread(() => {
39 40 TraceContext.Instance.EnterLogicalOperation(caller,false);
40 41 try {
41 42 p.Resolve(func());
42 43 } catch (Exception e) {
43 44 p.Reject(e);
44 45 } finally {
45 46 TraceContext.Instance.Leave();
46 47 }
47 48 });
48 49 worker.IsBackground = true;
49 50 worker.Start();
50 51
51 52 return p;
52 53 }
53 54
54 55
55 56 public static IPromise InvokeNewThread(Action func) {
56 57 var p = new Promise();
57 58
58 59 var caller = TraceContext.Instance.CurrentOperation;
59 60
60 61 var worker = new Thread(() => {
61 62 TraceContext.Instance.EnterLogicalOperation(caller,false);
62 63 try {
63 64 func();
64 65 p.Resolve();
65 66 } catch (Exception e) {
66 67 p.Reject(e);
67 68 } finally {
68 69 TraceContext.Instance.Leave();
69 70 }
70 71 });
71 72 worker.IsBackground = true;
72 73 worker.Start();
73 74
74 75 return p;
75 76 }
77
78 public static IPromise[] ThreadRun(params Action[] func) {
79 return func.Select(f => InvokeNewThread(f)).ToArray();
80 }
81
82 public static IPromise<T>[] ThreadRun<T>(params Func<T>[] func) {
83 return func.Select(f => InvokeNewThread(f)).ToArray();
84 }
76 85 }
77 86 }
@@ -1,244 +1,271
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 5
6 6 namespace Implab.Parallels {
7 7 public class AsyncQueue<T> : IEnumerable<T> {
8 8 class Chunk {
9 9 public Chunk next;
10 10
11 11 int m_low;
12 12 int m_hi;
13 13 int m_alloc;
14 14 readonly int m_size;
15 15 readonly T[] m_data;
16 16
17 17 public Chunk(int size) {
18 18 m_size = size;
19 19 m_data = new T[size];
20 20 }
21 21
22 22 public Chunk(int size, T value) {
23 23 m_size = size;
24 24 m_hi = 1;
25 25 m_alloc = 1;
26 26 m_data = new T[size];
27 27 m_data[0] = value;
28 28 }
29 29
30 30 public int Low {
31 31 get { return m_low; }
32 32 }
33 33
34 34 public int Hi {
35 35 get { return m_hi; }
36 36 }
37 37
38 38 public bool TryEnqueue(T value,out bool extend) {
39 extend = false;
40 int alloc;
41 do {
42 alloc = m_alloc;
43 if (alloc > m_size)
44 return false;
45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
39 var alloc = Interlocked.Increment(ref m_alloc) - 1;
46 40
47 if (alloc == m_size) {
48 extend = true;
41 if (alloc >= m_size) {
42 extend = alloc == m_size;
49 43 return false;
50 44 }
51
45
46 extend = false;
52 47 m_data[alloc] = value;
53 48
54 49 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
55 50 // spin wait for commit
56 51 }
57 52 return true;
58 53 }
59 54
60 55 public bool TryDequeue(out T value,out bool recycle) {
61 56 int low;
62 57 do {
63 58 low = m_low;
64 59 if (low >= m_hi) {
65 60 value = default(T);
66 61 recycle = (low == m_size);
67 62 return false;
68 63 }
69 64 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
70 65
71 66 recycle = (low == m_size - 1);
72 67 value = m_data[low];
73 68
74 69 return true;
75 70 }
76 71
72 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
73 int alloc;
74 int allocSize;
75
76 do {
77 alloc = m_alloc;
78
79 if (alloc > m_size) {
80 enqueued = 0;
81 extend = false;
82 return false;
83 }
84
85 allocSize = Math.Min(m_size - m_alloc, length);
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
87
88 if (alloc == m_size) {
89 enqueued = 0;
90 extend = true;
91 return false;
92 }
93
94 Array.Copy(batch, offset, m_data, alloc, allocSize);
95 enqueued = allocSize;
96 extend = false;
97
98 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
99 // spin wait for commit
100 }
101 return true;
102 }
103
77 104 public T GetAt(int pos) {
78 105 return m_data[pos];
79 106 }
80 107 }
81 108
82 109 public const int DEFAULT_CHUNK_SIZE = 32;
83 110
84 111 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
85 112
86 113 Chunk m_first;
87 114 Chunk m_last;
88 115
89 116 public AsyncQueue() {
90 117 m_last = m_first = new Chunk(m_chunkSize);
91 118 }
92 119
93 120 public void Enqueue(T value) {
94 121 var last = m_last;
95 122 // spin wait to the new chunk
96 123 bool extend = true;
97 124 while(last == null || !last.TryEnqueue(value, out extend)) {
98 125 // try to extend queue
99 126 if (extend || last == null) {
100 127 var chunk = new Chunk(m_chunkSize, value);
101 128 if (EnqueueChunk(last, chunk))
102 129 break;
103 130 last = m_last;
104 131 } else {
105 132 while (last != m_last) {
106 133 Thread.MemoryBarrier();
107 134 last = m_last;
108 135 }
109 136 }
110 137 }
111 138 }
112 139
113 140 public bool TryDequeue(out T value) {
114 141 var chunk = m_first;
115 142 bool recycle;
116 143 while (chunk != null) {
117 144
118 145 var result = chunk.TryDequeue(out value, out recycle);
119 146
120 147 if (recycle) // this chunk is waste
121 148 RecycleFirstChunk(chunk);
122 149 else
123 150 return result; // this chunk is usable and returned actual result
124 151
125 152 if (result) // this chunk is waste but the true result is always actual
126 153 return true;
127 154
128 155 // try again
129 156 chunk = m_first;
130 157 }
131 158
132 159 // the queue is empty
133 160 value = default(T);
134 161 return false;
135 162 }
136 163
137 164 bool EnqueueChunk(Chunk last, Chunk chunk) {
138 165 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
139 166 return false;
140 167
141 168 if (last != null)
142 169 last.next = chunk;
143 170 else
144 171 m_first = chunk;
145 172 return true;
146 173 }
147 174
148 175 void RecycleFirstChunk(Chunk first) {
149 176 var next = first.next;
150 177
151 178 if (next == null) {
152 179 // looks like this is the last chunk
153 180 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
154 181 // race
155 182 // maybe someone already recycled this chunk
156 183 // or a new chunk has been appedned to the queue
157 184
158 185 return; // give up
159 186 }
160 187 // the tail is updated
161 188 }
162 189
163 190 // we need to update the head
164 191 Interlocked.CompareExchange(ref m_first, next, first);
165 192 // if the head is already updated then give up
166 193 return;
167 194
168 195 }
169 196
170 197 #region IEnumerable implementation
171 198
172 199 class Enumerator : IEnumerator<T> {
173 200 Chunk m_current;
174 201 int m_pos = -1;
175 202
176 203 public Enumerator(Chunk fisrt) {
177 204 m_current = fisrt;
178 205 }
179 206
180 207 #region IEnumerator implementation
181 208
182 209 public bool MoveNext() {
183 210 if (m_current == null)
184 211 return false;
185 212
186 213 if (m_pos == -1)
187 214 m_pos = m_current.Low;
188 215 else
189 216 m_pos++;
190 217 if (m_pos == m_current.Hi) {
191 218 m_pos = 0;
192 219 m_current = m_current.next;
193 220 }
194 221
195 222 return true;
196 223 }
197 224
198 225 public void Reset() {
199 226 throw new NotSupportedException();
200 227 }
201 228
202 229 object IEnumerator.Current {
203 230 get {
204 231 return Current;
205 232 }
206 233 }
207 234
208 235 #endregion
209 236
210 237 #region IDisposable implementation
211 238
212 239 public void Dispose() {
213 240 }
214 241
215 242 #endregion
216 243
217 244 #region IEnumerator implementation
218 245
219 246 public T Current {
220 247 get {
221 248 if (m_pos == -1 || m_current == null)
222 249 throw new InvalidOperationException();
223 250 return m_current.GetAt(m_pos);
224 251 }
225 252 }
226 253
227 254 #endregion
228 255 }
229 256
230 257 public IEnumerator<T> GetEnumerator() {
231 258 return new Enumerator(m_first);
232 259 }
233 260
234 261 #endregion
235 262
236 263 #region IEnumerable implementation
237 264
238 265 IEnumerator IEnumerable.GetEnumerator() {
239 266 return GetEnumerator();
240 267 }
241 268
242 269 #endregion
243 270 }
244 271 }
@@ -1,110 +1,110
1 1 using System;
2 2 using Implab.Diagnostics;
3 3 using Implab.Parallels;
4 4 using Implab;
5 5 using System.Collections.Generic;
6 6 using System.Collections.Concurrent;
7 7
8 8 namespace MonoPlay {
9 9 class MainClass {
10 10 public static void Main(string[] args) {
11 11 if (args == null)
12 12 throw new ArgumentNullException("args");
13 13
14 14 var q1 = new AsyncQueue<int>();
15 15 var q2 = new Queue<int>();
16 16
17 17 const int count = 10000000;
18 18
19 19
20 20 var t1 = Environment.TickCount;
21 21
22 new [] {
23 AsyncPool.InvokeNewThread(() => {
22 AsyncPool.ThreadRun(
23 () => {
24 24 for (var i = 0; i < count; i++)
25 25 q1.Enqueue(i);
26 }),
27 AsyncPool.InvokeNewThread(() => {
26 },
27 () => {
28 28 for (var i = 0; i < count; i++)
29 29 q1.Enqueue(i);
30 }),
31 AsyncPool.InvokeNewThread(() => {
30 },
31 () => {
32 32 int temp = 0;
33 33 int i = 0;
34 34 while (i < count)
35 35 if (q1.TryDequeue(out temp))
36 36 i++;
37 }),
38 AsyncPool.InvokeNewThread(() => {
37 },
38 () => {
39 39 int temp = 0;
40 40 int i = 0;
41 41 while (i < count)
42 42 if (q1.TryDequeue(out temp))
43 43 i++;
44 })
45 }
44 }
45 )
46 46 .Combine()
47 47 .Join();
48 48
49 49 var t2 = Environment.TickCount;
50 50 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
51 51
52 52 t1 = Environment.TickCount;
53 53
54 54 for (var i = 0; i < count * 2; i++)
55 55 q2.Enqueue(i);
56 56
57 57 for (var i = 0; i < count * 2; i++)
58 58 q2.Dequeue();
59 59
60 60 t2 = Environment.TickCount;
61 61 Console.WriteLine("Queue: {0} ms", t2 - t1);
62 62
63 63 q2 = new Queue<int>();
64 64
65 65 t1 = Environment.TickCount;
66 66
67 67
68 new [] {
69 AsyncPool.InvokeNewThread(() => {
68 AsyncPool.ThreadRun(
69 () => {
70 70 for (var i = 0; i < count; i++)
71 71 lock (q2)
72 72 q2.Enqueue(i);
73 }),
74 AsyncPool.InvokeNewThread(() => {
73 },
74 () => {
75 75 for (var i = 0; i < count; i++)
76 76 lock (q2)
77 77 q2.Enqueue(i);
78 }),
79 AsyncPool.InvokeNewThread(() => {
78 },
79 () => {
80 80 for (int i = 0; i < count ;)
81 81 lock (q2) {
82 82 if (q2.Count == 0)
83 83 continue;
84 84 q2.Dequeue();
85 85 i++;
86 86 }
87 87
88 }),
89 AsyncPool.InvokeNewThread(() => {
88 },
89 () => {
90 90 for (int i = 0; i < count ;)
91 91 lock (q2) {
92 92 if (q2.Count == 0)
93 93 continue;
94 94 q2.Dequeue();
95 95 i++;
96 96 }
97 97
98 })
99 }
98 }
99 )
100 100 .Combine()
101 101 .Join();
102 102
103 103
104 104
105 105 t2 = Environment.TickCount;
106 106 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
107 107
108 108 }
109 109 }
110 110 }
General Comments 0
You need to be logged in to leave comments. Login now