working version of AsyncQueue and batch operations...
r121:62d2f1e98c4e v2
@@ -299,47 +299,134 @@ namespace Implab.Test {
                 Assert.AreEqual(i, res);
             }

-            int writers = 0;
-            int readers = 0;
-            var stop = new ManualResetEvent(false);
-            int total = 0;
-
-            const int itemsPerWriter = 10000;
-            const int writersCount = 10;
-
-            for (int i = 0; i < writersCount; i++) {
-                Interlocked.Increment(ref writers);
-                AsyncPool
-                    .InvokeNewThread(() => {
-                        for (int ii = 0; ii < itemsPerWriter; ii++) {
-                            queue.Enqueue(1);
-                        }
-                        return 1;
-                    })
-                    .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
+            const int count = 10000000;
+
+            int res1 = 0, res2 = 0;
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    for (var i = 0; i < count; i++)
+                        queue.Enqueue(1);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    for (var i = 0; i < count; i++)
+                        queue.Enqueue(2);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    int temp;
+                    int i = 0;
+                    while (i < count)
+                        if (queue.TryDequeue(out temp)) {
+                            i++;
+                            res1 += temp;
+                        }
+                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    int temp;
+                    int i = 0;
+                    while (i < count)
+                        if (queue.TryDequeue(out temp)) {
+                            i++;
+                            res2 += temp;
+                        }
+                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
+                }
+            )
+                .Combine()
+                .Join();
+
+            Assert.AreEqual(count * 3, res1 + res2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                res1,
+                res2,
+                res1 + res2,
+                count
+            );
         }

-            for (int i = 0; i < 10; i++) {
-                Interlocked.Increment(ref readers);
-                AsyncPool
-                    .InvokeNewThread(() => {
-                        int t;
-                        do {
-                            while (queue.TryDequeue(out t))
-                                Interlocked.Add(ref total, t);
-                        } while (writers > 0);
-                        return 1;
-                    })
-                    .On(() => {
-                        Interlocked.Decrement(ref readers);
-                        if (readers == 0)
-                            stop.Set();
-                    }, PromiseEventType.All);
+        [TestMethod]
+        public void AsyncQueueBatchTest() {
+            var queue = new AsyncQueue<int>();
+
+            const int wBatch = 29;
+            const int wCount = 400000;
+            const int total = wBatch * wCount * 2;
+            const int summ = wBatch * wCount * 3;
+
+            int r1 = 0, r2 = 0;
+            const int rBatch = 111;
+            int read = 0;
+
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    var buffer = new int[wBatch];
+                    for (int i = 0; i < wBatch; i++)
+                        buffer[i] = 1;
+
+                    for (int i = 0; i < wCount; i++)
+                        queue.EnqueueRange(buffer, 0, wBatch);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for (int i = 0; i < wBatch; i++)
+                        buffer[i] = 2;
+
+                    for (int i = 0; i < wCount; i++)
+                        queue.EnqueueRange(buffer, 0, wBatch);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[rBatch];
+
+                    while (read < total) {
+                        int actual;
+                        if (queue.TryDequeueRange(buffer, 0, rBatch, out actual)) {
+                            for (int i = 0; i < actual; i++)
+                                r1 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                        }
+                    }
+
+                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[rBatch];
+
+                    while (read < total) {
+                        int actual;
+                        if (queue.TryDequeueRange(buffer, 0, rBatch, out actual)) {
+                            for (int i = 0; i < actual; i++)
+                                r2 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                        }
+                    }
+
+                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
+                }
+            )
+                .Combine()
+                .Join();
+
+            Assert.AreEqual(summ, r1 + r2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                r1,
+                r2,
+                r1 + r2,
+                total
+            );
         }

         [TestMethod]
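The new AsyncQueueBatchTest above exercises the queue exclusively through the batch API: two writers each push wCount buffers of wBatch items, two readers drain into rBatch-sized buffers until the expected total has been observed, and the final assert checks the sum. Below is a minimal stand-alone sketch of the same usage pattern; it assumes the Implab.Parallels.AsyncQueue<T> type with the EnqueueRange/TryDequeueRange signatures introduced in this changeset and uses a plain Thread instead of AsyncPool.

    // Sketch only: assumes Implab.Parallels.AsyncQueue<T> as declared in this changeset.
    using System;
    using System.Threading;
    using Implab.Parallels;

    class BatchQueueSketch {
        static void Main() {
            var queue = new AsyncQueue<int>();
            var batch = new int[29];
            for (int i = 0; i < batch.Length; i++)
                batch[i] = 1;

            // writer: enqueue the whole buffer in one call instead of 29 separate Enqueue calls
            var writer = new Thread(() => {
                for (int i = 0; i < 1000; i++)
                    queue.EnqueueRange(batch, 0, batch.Length);
            });
            writer.Start();

            // reader: drain into a differently sized buffer; TryDequeueRange reports how much was actually copied
            var buffer = new int[111];
            int sum = 0, read = 0;
            while (read < 29 * 1000) {
                int actual;
                if (queue.TryDequeueRange(buffer, 0, buffer.Length, out actual)) {
                    for (int i = 0; i < actual; i++)
                        sum += buffer[i];
                    read += actual;
                }
            }

            writer.Join();
            Console.WriteLine("read {0} items, sum {1}", read, sum);
        }
    }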
@@ -75,11 +75,11 @@ namespace Implab.Parallels {
            return p;
        }

-        public static IPromise[] ThreadRun(params Action[] func) {
+        public static IPromise[] RunThread(params Action[] func) {
            return func.Select(f => InvokeNewThread(f)).ToArray();
        }

-        public static IPromise<T>[] ThreadRun<T>(params Func<T>[] func) {
+        public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
            return func.Select(f => InvokeNewThread(f)).ToArray();
        }
    }
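The only change here is the rename of ThreadRun to RunThread; the body still hands every delegate to InvokeNewThread and returns the resulting promises, which callers chain with .Combine().Join() as in the tests above. For readers without Implab, a rough Task-based analogue of the same fan-out helper (not the library implementation, just an illustration) would be:

    using System;
    using System.Linq;
    using System.Threading.Tasks;

    static class PoolSketch {
        // analogue of AsyncPool.RunThread(params Action[]): one dedicated thread per delegate
        public static Task[] RunThread(params Action[] func) {
            return func
                .Select(f => Task.Factory.StartNew(f, TaskCreationOptions.LongRunning))
                .ToArray();
        }
    }

    // usage (Produce/Consume are placeholders): Task.WaitAll(PoolSketch.RunThread(() => Produce(), () => Consume()));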
@@ -27,6 +27,14 @@ namespace Implab.Parallels {
                m_data[0] = value;
            }

+            public Chunk(int size, T[] data, int offset, int length, int alloc) {
+                m_size = size;
+                m_hi = length;
+                m_alloc = alloc;
+                m_data = new T[size];
+                Array.Copy(data, offset, m_data, 0, length);
+            }
+
            public int Low {
                get { return m_low; }
            }
@@ -73,27 +81,35 @@ namespace Implab.Parallels {
                int alloc;
                int allocSize;

+                // in case the batch size is larger than the free space in the chunk,
+                // tailGap is used to over-allocate the space in the chunk to
+                // get exclusive permission to create the next one.
+                int tailGap = 0;
+
                do {
                    alloc = m_alloc;

                    if (alloc > m_size) {
-                        enqueued = 0;
-                        extend = false;
-                        return false;
+                        // the chunk is full and someone is already
+                        // creating the new one
+                        enqueued = 0;   // nothing was added
+                        extend = false; // the caller shouldn't try to extend the queue
+                        return false;   // nothing was added
                    }

-                    allocSize = Math.Min(m_size - m_alloc, length);
-                } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
+                    allocSize = Math.Min(m_size - alloc, length);
+                    if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
+                        tailGap = 1;        // over-allocate to get exclusive permission to extend the queue
+                } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));

-                if (alloc == m_size) {
-                    enqueued = 0;
-                    extend = true;
+                extend = tailGap != 0;
+                enqueued = allocSize;
+
+                // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
+                if (alloc == m_size)
                    return false;
-                }

                Array.Copy(batch, offset, m_data, alloc, allocSize);
-                enqueued = allocSize;
-                extend = false;

                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
                    // spin wait for commit
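The core of TryEnqueueBatch is the CAS loop on m_alloc: each writer reserves the slice [alloc, alloc + allocSize) for its own Array.Copy, and a writer whose batch does not fit also reserves one extra slot (tailGap) past the end of the chunk, so that exactly one caller comes back with extend == true and gets to create the next chunk. The following self-contained sketch shows the same reservation idea in isolation; it is a simplification for illustration, not the Implab code.

    using System;
    using System.Threading;

    // Simplified illustration of the reservation step used by TryEnqueueBatch above.
    class ReservationSketch {
        readonly int[] m_data = new int[32];
        int m_alloc; // next free slot; may exceed the array length by the tail gap

        // Reserves up to 'length' slots; 'extend' becomes true for exactly one caller,
        // the one that overflows the chunk and therefore must create the next chunk.
        public bool TryReserve(int length, out int start, out int reserved, out bool extend) {
            int alloc, size, tailGap;
            do {
                tailGap = 0;
                alloc = m_alloc;
                if (alloc > m_data.Length) { // someone has already over-allocated this chunk
                    start = 0; reserved = 0; extend = false;
                    return false;
                }
                size = Math.Min(m_data.Length - alloc, length);
                if (size < length)
                    tailGap = 1; // over-allocate to win the exclusive right to extend the queue
            } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + size + tailGap, alloc));

            start = alloc;
            reserved = size;
            extend = tailGap != 0;
            return size > 0; // false when the chunk was already full
        }
    }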
@@ -101,12 +117,35 @@ namespace Implab.Parallels {
                return true;
            }

+            public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
+                int low, hi, batchSize;
+
+                do {
+                    low = m_low;
+                    hi = m_hi;
+                    if (low >= hi) {
+                        dequeued = 0;
+                        recycle = (low == m_size); // recycling could be restarted and we need to signal again
+                        return false;
+                    }
+                    batchSize = Math.Min(hi - low, length);
+                } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
+
+                recycle = (low == m_size - batchSize);
+                dequeued = batchSize;
+
+                Array.Copy(m_data, low, buffer, offset, batchSize);
+
+                return true;
+            }
+
            public T GetAt(int pos) {
                return m_data[pos];
            }
        }

        public const int DEFAULT_CHUNK_SIZE = 32;
+        public const int MAX_CHUNK_SIZE = 262144;

        readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;

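TryDequeueBatch is the reader-side mirror: it reserves [low, low + batchSize) with a CAS on m_low and reports recycle = true once the last element of a full chunk has been claimed, which tells the queue the chunk can be dropped; MAX_CHUNK_SIZE caps how large a chunk EnqueueRange may allocate for an oversized batch. A compact, self-contained sketch of that reader-side reservation (again a simplification, not the library code):

    using System;
    using System.Threading;

    class DequeueReservationSketch {
        readonly int[] m_data = new int[32];
        int m_low;      // first unread slot
        int m_hi = 32;  // end of the committed data

        // Reserves up to 'length' items starting at 'start'; 'recycle' signals that
        // the chunk has been fully consumed and can be detached from the queue.
        public bool TryReserveRead(int length, out int start, out int taken, out bool recycle) {
            int low, hi;
            do {
                low = m_low;
                hi = m_hi;
                if (low >= hi) {
                    start = 0;
                    taken = 0;
                    recycle = (low == m_data.Length); // signal again in case recycling was restarted
                    return false;
                }
                taken = Math.Min(hi - low, length);
            } while (low != Interlocked.CompareExchange(ref m_low, low + taken, low));

            start = low;
            recycle = (low == m_data.Length - taken);
            return true;
        }
    }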
@@ -117,6 +156,10 @@ namespace Implab.Parallels {
            m_last = m_first = new Chunk(m_chunkSize);
        }

+        /// <summary>
+        /// Adds the specified value to the queue.
+        /// </summary>
+        /// <param name="value">The value which will be added to the queue.</param>
        public void Enqueue(T value) {
            var last = m_last;
            // spin wait to the new chunk
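For completeness, the single-item API documented above is the simplest way to use the queue; a minimal usage sketch (assuming the Implab.Parallels.AsyncQueue<T> from this changeset):

    using System;
    using Implab.Parallels;

    class EnqueueSketch {
        static void Main() {
            var queue = new AsyncQueue<string>();
            queue.Enqueue("hello");            // adds a single value, as documented above

            string item;
            if (queue.TryDequeue(out item))    // returns false when the queue is empty
                Console.WriteLine(item);
        }
    }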
@@ -129,14 +172,88 @@ namespace Implab.Parallels {
                        break;
                    last = m_last;
                } else {
-                    while (last != m_last) {
+                    while (last == m_last) {
                        Thread.MemoryBarrier();
+                    }
                    last = m_last;
                }
            }
        }

+        /// <summary>
+        /// Adds the specified data to the queue.
+        /// </summary>
+        /// <param name="data">The buffer which contains the data to be enqueued.</param>
+        /// <param name="offset">The offset of the data in the buffer.</param>
+        /// <param name="length">The size of the data to read from the buffer.</param>
+        public void EnqueueRange(T[] data, int offset, int length) {
+            if (data == null)
+                throw new ArgumentNullException("data");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > data.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var last = m_last;
+
+            bool extend;
+            int enqueued;
+
+            while (length > 0) {
+                extend = true;
+                if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
+                    length -= enqueued;
+                    offset += enqueued;
                }

+                if (extend) {
+                    // there was not enough space in the chunk
+                    // or there were no chunks in the queue
+
+                    while (length > 0) {
+
+                        var size = Math.Min(length, MAX_CHUNK_SIZE);
+
+                        var chunk = new Chunk(
+                            Math.Max(size, m_chunkSize),
+                            data,
+                            offset,
+                            size,
+                            length // length >= size
+                        );
+
+                        if (!EnqueueChunk(last, chunk)) {
+                            // looks like the queue has been updated, so proceed from the beginning
+                            last = m_last;
+                            break;
+                        }
+
+                        // we have successfully added the new chunk
+                        last = chunk;
+                        length -= size;
+                        offset += size;
+                    }
+                } else {
+                    // we don't need to extend the queue if we have successfully enqueued the data
+                    if (length == 0)
+                        break;
+
+                    // we need to wait while someone is extending the queue,
+                    // spin wait
+                    while (last == m_last) {
+                        Thread.MemoryBarrier();
+                    }
+
+                    last = m_last;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tries to retrieve the first element from the queue.
+        /// </summary>
+        /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="value">The value of the dequeued element.</param>
        public bool TryDequeue(out T value) {
            var chunk = m_first;
            bool recycle;
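EnqueueRange first offers the batch to the current tail chunk; the writer that comes back with extend == true packs the remainder into freshly allocated chunks of at most MAX_CHUNK_SIZE items (and at least m_chunkSize), while other writers spin until m_last changes. The splitting arithmetic can be illustrated with a small stand-alone helper (hypothetical, for illustration only; it mirrors the Math.Min/Math.Max expressions above):

    using System;
    using System.Collections.Generic;

    static class ChunkSplitSketch {
        const int DEFAULT_CHUNK_SIZE = 32;
        const int MAX_CHUNK_SIZE = 262144;

        // Returns the capacities of the chunks EnqueueRange would create for a leftover batch.
        public static List<int> SplitBatch(int length) {
            var sizes = new List<int>();
            while (length > 0) {
                var size = Math.Min(length, MAX_CHUNK_SIZE);    // items stored in this chunk
                sizes.Add(Math.Max(size, DEFAULT_CHUNK_SIZE));  // chunk capacity is never below the default
                length -= size;
            }
            return sizes;
        }
    }

    // SplitBatch(500000) -> [262144, 237856]: two chunks, the second holding the remaining items.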
@@ -161,6 +278,92 @@ namespace Implab.Parallels {
            return false;
        }

+        /// <summary>
+        /// Tries to dequeue the specified amount of data from the queue.
+        /// </summary>
+        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="buffer">The buffer to which the data will be written.</param>
+        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
+        /// <param name="length">The maximum amount of data to be retrieved.</param>
+        /// <param name="dequeued">The actual amount of the retrieved data.</param>
+        public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
+            if (buffer == null)
+                throw new ArgumentNullException("buffer");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > buffer.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var chunk = m_first;
+            bool recycle;
+            dequeued = 0;
+            while (chunk != null) {
+
+                int actual;
+                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
+                    offset += actual;
+                    length -= actual;
+                    dequeued += actual;
+                }
+
+                if (recycle) // this chunk is waste
+                    RecycleFirstChunk(chunk);
+                else if (actual == 0)
+                    break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
+
+                if (length == 0)
+                    return true;
+
+                // we still may dequeue something
+                // try again
+                chunk = m_first;
+            }
+
+            return dequeued != 0;
+        }
+
+        /// <summary>
+        /// Tries to dequeue all remaining data in the first chunk.
+        /// </summary>
+        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="buffer">The buffer to which the data will be written.</param>
+        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
+        /// <param name="length">The maximum amount of the data to be dequeued.</param>
+        /// <param name="dequeued">The actual amount of the dequeued data.</param>
+        public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
+            if (buffer == null)
+                throw new ArgumentNullException("buffer");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > buffer.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var chunk = m_first;
+            bool recycle;
+            dequeued = 0;
+
+            while (chunk != null) {
+
+                int actual;
+                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
+                    dequeued = actual;
+                }
+
+                if (recycle) // this chunk is waste
+                    RecycleFirstChunk(chunk);
+
+                // if we have dequeued any data, then return
+                if (dequeued != 0)
+                    return true;
+
+                // we still may dequeue something
+                // try again
+                chunk = m_first;
+            }
+
+            return false;
+        }
+
        bool EnqueueChunk(Chunk last, Chunk chunk) {
            if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
                return false;
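The two readers differ in how far they go: TryDequeueRange keeps pulling from successive chunks until the caller's buffer is full or the queue is drained, while TryDequeueChunk stops as soon as a single chunk has produced data. A hedged consumer sketch using the signatures shown above (assumes the Implab.Parallels assembly is referenced):

    using System;
    using Implab.Parallels;

    class DrainSketch {
        static void Main() {
            var queue = new AsyncQueue<int>();
            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);

            var buffer = new int[64];
            int actual;

            // fills the buffer across chunk boundaries whenever possible
            while (queue.TryDequeueRange(buffer, 0, buffer.Length, out actual))
                Console.WriteLine("range read: {0} items", actual);

            // TryDequeueChunk, by contrast, would return after the first chunk that yielded data,
            // which is handy when the caller wants to process the queue chunk by chunk.
        }
    }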
@@ -9,31 +9,31 @@ namespace Implab
{
    public static class Safe
    {
-        public static void ArgumentMatch(string param, string name, Regex rx) {
+        public static void ArgumentMatch(string value, string paramName, Regex rx) {
            if (rx == null)
                throw new ArgumentNullException("rx");
-            if (!rx.IsMatch(param))
-                throw new ArgumentException(String.Format("The prameter value must match {0}", rx), name);
+            if (!rx.IsMatch(value))
+                throw new ArgumentException(String.Format("The parameter value must match {0}", rx), paramName);
        }

-        public static void ArgumentNotEmpty(string param, string name) {
-            if (String.IsNullOrEmpty(param))
-                throw new ArgumentException("The parameter can't be empty", name);
+        public static void ArgumentNotEmpty(string value, string paramName) {
+            if (String.IsNullOrEmpty(value))
+                throw new ArgumentException("The parameter can't be empty", paramName);
        }

-        public static void ArgumentNotEmpty<T>(T[] param, string name) {
-            if (param == null || param.Length == 0)
-                throw new ArgumentException("The array must be not emty", name);
+        public static void ArgumentNotEmpty<T>(T[] value, string paramName) {
+            if (value == null || value.Length == 0)
+                throw new ArgumentException("The array must not be empty", paramName);
        }

-        public static void ArgumentNotNull(object param, string name) {
-            if (param == null)
-                throw new ArgumentNullException(name);
+        public static void ArgumentNotNull(object value, string paramName) {
+            if (value == null)
+                throw new ArgumentNullException(paramName);
        }

-        public static void ArgumentInRange(int arg, int min, int max, string name) {
-            if (arg < min || arg > max)
-                throw new ArgumentOutOfRangeException(name);
+        public static void ArgumentInRange(int value, int min, int max, string paramName) {
+            if (value < min || value > max)
+                throw new ArgumentOutOfRangeException(paramName);
        }

        public static void Dispose<T>(T obj) where T : class
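The renames make the calling convention explicit: the first argument is the value being validated and the second is the parameter name reported in the exception. A short usage sketch with the signatures shown above (the CopyInto method is hypothetical, only the Safe helpers come from the diff):

    using Implab;

    static class SafeUsageSketch {
        // hypothetical caller demonstrating the guard helpers
        public static void CopyInto(int[] data, string label, int count) {
            Safe.ArgumentNotNull(data, "data");                   // value first, parameter name second
            Safe.ArgumentNotEmpty(label, "label");
            Safe.ArgumentInRange(count, 0, data.Length, "count");
            // ... actual work ...
        }
    }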
@@ -16,36 +16,46 @@ namespace MonoPlay {

            const int count = 10000000;

-
+            int res1 = 0, res2 = 0;
            var t1 = Environment.TickCount;

-            AsyncPool.ThreadRun(
+            AsyncPool.RunThread(
                () => {
                    for (var i = 0; i < count; i++)
-                        q1.Enqueue(i);
+                        q1.Enqueue(1);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    for (var i = 0; i < count; i++)
-                        q1.Enqueue(i);
+                        q1.Enqueue(2);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    int temp = 0;
                    int i = 0;
                    while (i < count)
-                        if (q1.TryDequeue(out temp))
+                        if (q1.TryDequeue(out temp)) {
                            i++;
+                            res1 += temp;
+                        }
+                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    int temp = 0;
                    int i = 0;
                    while (i < count)
-                        if (q1.TryDequeue(out temp))
+                        if (q1.TryDequeue(out temp)) {
                            i++;
+                            res2 += temp;
+                        }
+                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                }
            )
                .Combine()
                .Join();

+            Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2);
+
            var t2 = Environment.TickCount;
            Console.WriteLine("MTQueue: {0} ms", t2 - t1);

@@ -65,7 +75,7 @@ namespace MonoPlay {
            t1 = Environment.TickCount;


-            AsyncPool.ThreadRun(
+            AsyncPool.RunThread(
                () => {
                    for (var i = 0; i < count; i++)
                        lock (q2)
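The MonoPlay program benchmarks AsyncQueue (q1) against a plain Queue<T> guarded by a lock (q2); the hunk above is the start of that lock-based section. A rough stand-alone sketch of such a baseline, with the same two-writer/two-reader shape as the AsyncQueue run (this is not code from the repository, just an illustration of what the timings are compared against):

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    class LockedQueueBenchSketch {
        static void Main() {
            const int count = 10000000;
            var q2 = new Queue<int>();
            var t1 = Environment.TickCount;

            Task.WaitAll(
                Task.Run(() => { for (var i = 0; i < count; i++) lock (q2) q2.Enqueue(1); }),
                Task.Run(() => { for (var i = 0; i < count; i++) lock (q2) q2.Enqueue(2); }),
                Task.Run(() => {
                    int read = 0;
                    while (read < count)
                        lock (q2)
                            if (q2.Count > 0) { q2.Dequeue(); read++; }
                }),
                Task.Run(() => {
                    int read = 0;
                    while (read < count)
                        lock (q2)
                            if (q2.Count > 0) { q2.Dequeue(); read++; }
                })
            );

            Console.WriteLine("Queue<T> + lock: {0} ms", Environment.TickCount - t1);
        }
    }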