async queue improvements
r123:f4d6ea6969cc v2
@@ -1,463 +1,576 @@
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 5
6 6 namespace Implab.Parallels {
7 7 public class AsyncQueue<T> : IEnumerable<T> {
8 8 class Chunk {
9 9 public Chunk next;
10 10
11 11 int m_low;
12 12 int m_hi;
13 13 int m_alloc;
14 14 readonly int m_size;
15 15 readonly T[] m_data;
16 16
17 17 public Chunk(int size) {
18 18 m_size = size;
19 19 m_data = new T[size];
20 20 }
21 21
22 22 public Chunk(int size, T value) {
23 23 m_size = size;
24 24 m_hi = 1;
25 25 m_alloc = 1;
26 26 m_data = new T[size];
27 27 m_data[0] = value;
28 28 }
29 29
30 30 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 31 m_size = size;
32 32 m_hi = length;
33 33 m_alloc = alloc;
34 34 m_data = new T[size];
35 35 Array.Copy(data, offset, m_data, 0, length);
36 36 }
37 37
38 38 public int Low {
39 39 get { return m_low; }
40 40 }
41 41
42 42 public int Hi {
43 43 get { return m_hi; }
44 44 }
45 45
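// Lock-free single-item append: a slot is reserved by atomically
// incrementing m_alloc, the value is written, and the write is then
// published by advancing m_hi with a CAS. Because m_hi is advanced
// strictly in slot order, readers never observe an uncommitted slot.
// 'extend' becomes true only for the single caller that hits the end
// of the chunk, so exactly one thread allocates the next chunk.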
46 46 public bool TryEnqueue(T value, out bool extend) {
47 47 var alloc = Interlocked.Increment(ref m_alloc) - 1;
48 48
49 49 if (alloc >= m_size) {
50 50 extend = alloc == m_size;
51 51 return false;
52 52 }
53 53
54 54 extend = false;
55 55 m_data[alloc] = value;
56 56
57 57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
58 58 // spin wait for commit
59 59 }
60 60 return true;
61 61 }
62 62
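// Lock-free single-item removal: the head index m_low is advanced with
// a CAS loop; 'recycle' tells the caller that this chunk has been fully
// consumed and may be detached from the queue.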
63 63 public bool TryDequeue(out T value, out bool recycle) {
64 64 int low;
65 65 do {
66 66 low = m_low;
67 67 if (low >= m_hi) {
68 68 value = default(T);
69 69 recycle = (low == m_size);
70 70 return false;
71 71 }
72 72 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
73 73
74 74 recycle = (low == m_size - 1);
75 75 value = m_data[low];
76 76
77 77 return true;
78 78 }
79 79
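// Batch append: 'length' consecutive slots are reserved with a single
// Interlocked.Add; if only part of the batch fits, 'enqueued' reports
// how many items were stored and 'extend' tells the caller it is the
// one responsible for creating the next chunk.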
80 80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
81 81 //int alloc;
82 82 //int allocSize;
83 83
84 84 var alloc = Interlocked.Add(ref m_alloc, length) - length;
85 85 if (alloc > m_size) {
86 86 // the chunk is full and someone is already
87 87 // creating a new one
88 88 enqueued = 0; // nothing was added
89 89 extend = false; // the caller shouldn't try to extend the queue
90 90 return false; // nothing was added
91 91 }
92 92
93 93 enqueued = Math.Min(m_size - alloc, length);
94 94 extend = length > enqueued;
95 95
96 96 if (enqueued == 0)
97 97 return false;
98 98
99 99
100 100 Array.Copy(batch, offset, m_data, alloc, enqueued);
101 101
102 102 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
103 103 // spin wait for commit
104 104 }
105 105
106 106 return true;
107 107 }
108 108
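// Batch removal: a snapshot of (m_low, m_hi) bounds the batch, then
// m_low is advanced by the batch size with a CAS; 'recycle' is set when
// the batch consumes the final slots of the chunk.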
109 109 public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
110 110 int low, hi, batchSize;
111 111
112 112 do {
113 113 low = m_low;
114 114 hi = m_hi;
115 115 if (low >= hi) {
116 116 dequeued = 0;
117 117 recycle = (low == m_size); // recycling could be restarted and we need to signal again
118 118 return false;
119 119 }
120 120 batchSize = Math.Min(hi - low, length);
121 121 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
122 122
123 123 recycle = (low == m_size - batchSize);
124 124 dequeued = batchSize;
125 125
126 126 Array.Copy(m_data, low, buffer, offset, batchSize);
127 127
128 128 return true;
129 129 }
130 130
131 131 public T GetAt(int pos) {
132 132 return m_data[pos];
133 133 }
134 134 }
135 135
136 136 public const int DEFAULT_CHUNK_SIZE = 32;
137 137 public const int MAX_CHUNK_SIZE = 262144;
138 138
139 139 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
140 140
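// The queue is a singly-linked list of chunks: m_first points to the
// chunk items are dequeued from (the head), m_last to the chunk items
// are appended to (the tail).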
141 141 Chunk m_first;
142 142 Chunk m_last;
143 143
144 144 public AsyncQueue() {
145 145 m_last = m_first = new Chunk(m_chunkSize);
146 146 }
147 147
148 148 /// <summary>
149 149 /// Adds the specified value to the queue.
150 150 /// </summary>
151 151 /// <param name="value">The value which will be added to the queue.</param>
152 152 public void Enqueue(T value) {
153 153 var last = m_last;
154 154 // spin wait for the new chunk
155 155 bool extend = true;
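// if TryEnqueue reports extend == true, this thread is responsible for
// appending the next chunk; otherwise another thread is already
// extending the queue and we only wait for m_last to change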
156 156 while (last == null || !last.TryEnqueue(value, out extend)) {
157 157 // try to extend queue
158 158 if (extend || last == null) {
159 159 var chunk = new Chunk(m_chunkSize, value);
160 160 if (EnqueueChunk(last, chunk))
161 161 break; // success! exit!
162 162 last = m_last;
163 163 } else {
164 164 while (last == m_last) {
165 165 Thread.MemoryBarrier();
166 166 }
167 167 last = m_last;
168 168 }
169 169 }
170 170 }
171 171
172 172 /// <summary>
173 173 /// Adds the specified data to the queue.
174 174 /// </summary>
175 175 /// <param name="data">The buffer which contains the data to be enqueued.</param>
176 176 /// <param name="offset">The offset of the data in the buffer.</param>
177 177 /// <param name="length">The size of the data to read from the buffer.</param>
178 178 public void EnqueueRange(T[] data, int offset, int length) {
179 179 if (data == null)
180 180 throw new ArgumentNullException("data");
181 181 if (offset < 0)
182 182 throw new ArgumentOutOfRangeException("offset");
183 183 if (length < 1 || offset + length > data.Length)
184 184 throw new ArgumentOutOfRangeException("length");
185 185
186 186 var last = m_last;
187 187
188 188 bool extend;
189 189 int enqueued;
190 190
191 191 while (length > 0) {
192 192 extend = true;
193 193 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
194 194 length -= enqueued;
195 195 offset += enqueued;
196 196 }
197 197
198 198 if (extend) {
199 199 // there was not enough space in the chunk
200 200 // or there were no chunks in the queue
201 201
202 202 while (length > 0) {
203 203
204 204 var size = Math.Min(length, MAX_CHUNK_SIZE);
205 205
206 206 var chunk = new Chunk(
207 207 Math.Max(size, m_chunkSize),
208 208 data,
209 209 offset,
210 210 size,
211 211 length // length >= size
212 212 );
213 213
214 214 if (!EnqueueChunk(last, chunk)) {
215 215 // looks like the queue has been updated, proceed from the beginning
216 216 last = m_last;
217 217 break;
218 218 }
219 219
220 220 // we have successfully added the new chunk
221 221 last = chunk;
222 222 length -= size;
223 223 offset += size;
224 224 }
225 225 } else {
226 226 // no need to extend the queue; if all the data has been enqueued, we are done
227 227 if (length == 0)
228 228 break;
229 229
230 230 // we need to wait while someone is extending the queue
231 231 // spinwait
232 232 while (last == m_last) {
233 233 Thread.MemoryBarrier();
234 234 }
235 235
236 236 last = m_last;
237 237 }
238 238 }
239 239 }
240 240
241 241 /// <summary>
242 242 /// Tries to retrieve the first element from the queue.
243 243 /// </summary>
244 244 /// <returns><c>true</c>, if an element was dequeued, <c>false</c> otherwise.</returns>
245 245 /// <param name="value">The value of the dequeued element.</param>
246 246 public bool TryDequeue(out T value) {
247 247 var chunk = m_first;
248 248 bool recycle;
249 249 while (chunk != null) {
250 250
251 251 var result = chunk.TryDequeue(out value, out recycle);
252 252
253 253 if (recycle) // this chunk is waste
254 254 RecycleFirstChunk(chunk);
255 255 else
256 256 return result; // this chunk is usable and returned actual result
257 257
258 258 if (result) // this chunk is waste but the true result is always actual
259 259 return true;
260 260
261 261 // try again
262 262 chunk = m_first;
263 263 }
264 264
265 265 // the queue is empty
266 266 value = default(T);
267 267 return false;
268 268 }
269 269
270 270 /// <summary>
271 271 /// Tries to dequeue the specified amount of data from the queue.
272 272 /// </summary>
273 273 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
274 274 /// <param name="buffer">The buffer to which the data will be written.</param>
275 275 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
276 276 /// <param name="length">The maximum amount of data to be retrieved.</param>
277 277 /// <param name="dequeued">The actual amount of the retrieved data.</param>
278 278 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
279 279 if (buffer == null)
280 280 throw new ArgumentNullException("buffer");
281 281 if (offset < 0)
282 282 throw new ArgumentOutOfRangeException("offset");
283 283 if (length < 1 || offset + length > buffer.Length)
284 284 throw new ArgumentOutOfRangeException("length");
285 285
286 286 var chunk = m_first;
287 287 bool recycle;
288 288 dequeued = 0;
289 289 while (chunk != null) {
290 290
291 291 int actual;
292 292 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
293 293 offset += actual;
294 294 length -= actual;
295 295 dequeued += actual;
296 296 }
297 297
298 298 if (recycle) // this chunk is waste
299 299 RecycleFirstChunk(chunk);
300 300 else if (actual == 0)
301 301 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
302 302
303 303 if (length == 0)
304 304 return true;
305 305
306 306 // we may still dequeue something
307 307 // try again
308 308 chunk = m_first;
309 309 }
310 310
311 311 return dequeued != 0;
312 312 }
313 313
314 314 /// <summary>
315 315 /// Tries to dequeue all remaining data in the first chunk.
316 316 /// </summary>
317 317 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
318 318 /// <param name="buffer">The buffer to which the data will be written.</param>
319 319 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
320 320 /// <param name="length">The maximum amount of data to be dequeued.</param>
321 321 /// <param name="dequeued">The actual amount of the dequeued data.</param>
322 322 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
323 323 if (buffer == null)
324 324 throw new ArgumentNullException("buffer");
325 325 if (offset < 0)
326 326 throw new ArgumentOutOfRangeException("offset");
327 327 if (length < 1 || offset + length > buffer.Length)
328 328 throw new ArgumentOutOfRangeException("length");
329 329
330 330 var chunk = m_first;
331 331 bool recycle;
332 332 dequeued = 0;
333 333
334 334 while (chunk != null) {
335 335
336 336 int actual;
337 337 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
338 338 dequeued = actual;
339 339 }
340 340
341 341 if (recycle) // this chunk is waste
342 342 RecycleFirstChunk(chunk);
343 343
344 344 // if we have dequeued any data, then return
345 345 if (dequeued != 0)
346 346 return true;
347 347
348 348 // we may still dequeue something
349 349 // try again
350 350 chunk = m_first;
351 351 }
352 352
353 353 return false;
354 354 }
355 355
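// Appends a new chunk to the tail: the CAS on m_last ensures only one
// competing thread wins; the winner then links the chunk from the old
// tail (or publishes it as the head when the queue was empty).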
356 356 bool EnqueueChunk(Chunk last, Chunk chunk) {
357 357 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
358 358 return false;
359 359
360 360 if (last != null)
361 361 last.next = chunk;
362 362 else
363 363 m_first = chunk;
364 364 return true;
365 365 }
366 366
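// Detaches a fully consumed chunk from the head of the queue. If the
// chunk is also the tail, m_last is cleared first; a failed CAS means
// another thread has already recycled it or appended a new chunk, in
// which case we simply give up.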
367 367 void RecycleFirstChunk(Chunk first) {
368 368 var next = first.next;
369 369
370 370 if (next == null) {
371 371 // looks like this is the last chunk
372 372 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
373 373 // race
374 374 // maybe someone already recycled this chunk
375 375 // or a new chunk has been appended to the queue
376 376
377 377 return; // give up
378 378 }
379 379 // the tail is updated
380 380 }
381 381
382 382 // we need to update the head
383 383 Interlocked.CompareExchange(ref m_first, next, first);
384 384 // if the head is already updated then give up
385 385 return;
386 386
387 387 }
388 388
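// Resets the queue by installing a fresh empty chunk as both tail and
// head. Threads already holding a reference to an old chunk may still
// finish their current operation on it; subsequent operations see an
// empty queue.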
389 public void Clear() {
390 // start the new queue
391 var t = new Chunk(m_chunkSize);
392 Thread.MemoryBarrier();
393 m_last = t;
394 Thread.MemoryBarrier();
395
396 // make the new queue available to the readers, and stop the old one
397 m_first = t;
398 Thread.MemoryBarrier();
399 }
400
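// Like Clear, but the old head is detached with a CAS on m_first so the
// remaining items can be collected from the detached chunks via
// ReadChunks and returned to the caller.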
401 public T[] Drain() {
402 // start the new queue
403 var t = new Chunk(m_chunkSize);
404 Thread.MemoryBarrier();
405 m_last = t;
406 Thread.MemoryBarrier();
407
408 // make the new queue available to the readers, and stop the old one
409 Chunk first;
410
411 do {
412 first = m_first;
413 } while(first != Interlocked.CompareExchange(ref m_first, t, first));
414 Thread.MemoryBarrier();
415
416 return ReadChunks(first);
417 }
418
419 T[] ReadChunks(Chunk chunk) {
420 var result = new List<T>();
421 var buffer = new T[m_chunkSize];
422 int actual;
423 bool recycle;
424 while (chunk != null) {
425 // we have to read the chunks this way
426 // since some clients may still be completing a dequeue
427 // operation; such clients most likely won't get any results
428 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
429 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
430
431 chunk = chunk.next;
432 }
433
434 return result.ToArray();
435 }
436
437 struct ArraySegmentCollection : ICollection<T> {
438 readonly T[] m_data;
439 readonly int m_offset;
440 readonly int m_length;
441
442 public ArraySegmentCollection(T[] data, int offset, int length) {
443 m_data = data;
444 m_offset = offset;
445 m_length = length;
446 }
447
448 #region ICollection implementation
449
450 public void Add(T item) {
451 throw new InvalidOperationException();
452 }
453
454 public void Clear() {
455 throw new InvalidOperationException();
456 }
457
458 public bool Contains(T item) {
459 return false;
460 }
461
462 public void CopyTo(T[] array, int arrayIndex) {
463 Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
464 }
465
466 public bool Remove(T item) {
467 throw new NotImplementedException();
468 }
469
470 public int Count {
471 get {
472 return m_length;
473 }
474 }
475
476 public bool IsReadOnly {
477 get {
478 return true;
479 }
480 }
481
482 #endregion
483
484 #region IEnumerable implementation
485
486 public IEnumerator<T> GetEnumerator() {
487 for (int i = m_offset; i < m_length + m_offset; i++)
488 yield return m_data[i];
489 }
490
491 #endregion
492
493 #region IEnumerable implementation
494
495 IEnumerator IEnumerable.GetEnumerator() {
496 return GetEnumerator();
497 }
498
499 #endregion
500 }
501
389 502 #region IEnumerable implementation
390 503
391 504 class Enumerator : IEnumerator<T> {
392 505 Chunk m_current;
393 506 int m_pos = -1;
394 507
395 508 public Enumerator(Chunk first) {
396 509 m_current = first;
397 510 }
398 511
399 512 #region IEnumerator implementation
400 513
401 514 public bool MoveNext() {
402 515 if (m_current == null)
403 516 return false;
404 517
405 518 if (m_pos == -1)
406 519 m_pos = m_current.Low;
407 520 else
408 521 m_pos++;
409 522 if (m_pos == m_current.Hi) {
410 523 m_pos = 0;
411 524 m_current = m_current.next;
412 525 }
413 526
414 527 return true;
415 528 }
416 529
417 530 public void Reset() {
418 531 throw new NotSupportedException();
419 532 }
420 533
421 534 object IEnumerator.Current {
422 535 get {
423 536 return Current;
424 537 }
425 538 }
426 539
427 540 #endregion
428 541
429 542 #region IDisposable implementation
430 543
431 544 public void Dispose() {
432 545 }
433 546
434 547 #endregion
435 548
436 549 #region IEnumerator implementation
437 550
438 551 public T Current {
439 552 get {
440 553 if (m_pos == -1 || m_current == null)
441 554 throw new InvalidOperationException();
442 555 return m_current.GetAt(m_pos);
443 556 }
444 557 }
445 558
446 559 #endregion
447 560 }
448 561
449 562 public IEnumerator<T> GetEnumerator() {
450 563 return new Enumerator(m_first);
451 564 }
452 565
453 566 #endregion
454 567
455 568 #region IEnumerable implementation
456 569
457 570 IEnumerator IEnumerable.GetEnumerator() {
458 571 return GetEnumerator();
459 572 }
460 573
461 574 #endregion
462 575 }
463 576 }