AsyncQueue.cs
563 lines
| 18.7 KiB
| text/x-csharp
|
CSharpLexer
|
|
r119 | using System.Threading; | ||
| using System.Collections.Generic; | ||||
| using System; | ||||
| using System.Collections; | ||||
|
|
r124 | using System.Diagnostics; | ||
|
|
r234 | using System.Runtime.CompilerServices; | ||
|
|
r119 | |||
| namespace Implab.Parallels { | ||||
| public class AsyncQueue<T> : IEnumerable<T> { | ||||
| class Chunk { | ||||
|
|
r233 | public volatile Chunk next; | ||
|
|
r119 | |||
|
|
r233 | volatile int m_low; | ||
| volatile int m_hi; | ||||
| volatile int m_alloc; | ||||
|
|
r119 | readonly int m_size; | ||
| readonly T[] m_data; | ||||
| public Chunk(int size) { | ||||
| m_size = size; | ||||
| m_data = new T[size]; | ||||
| } | ||||
| public Chunk(int size, T value) { | ||||
| m_size = size; | ||||
| m_hi = 1; | ||||
| m_alloc = 1; | ||||
| m_data = new T[size]; | ||||
| m_data[0] = value; | ||||
| } | ||||
|
|
r233 | public Chunk(int size, int allocated) { | ||
|
|
r121 | m_size = size; | ||
|
|
r233 | m_hi = allocated; | ||
| m_alloc = allocated; | ||||
|
|
r121 | m_data = new T[size]; | ||
|
|
r233 | } | ||
| public void WriteData(T[] data, int offset, int dest, int length) { | ||||
| Array.Copy(data, offset, m_data, dest, length); | ||||
|
|
r121 | } | ||
|
|
r119 | public int Low { | ||
| get { return m_low; } | ||||
| } | ||||
| public int Hi { | ||||
| get { return m_hi; } | ||||
| } | ||||
|
|
r127 | public int Size { | ||
| get { return m_size; } | ||||
| } | ||||
|
|
r234 | [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| void AwaitWrites(int mark) { | ||||
| if (m_hi != mark) { | ||||
| SpinWait spin = new SpinWait(); | ||||
| do { | ||||
| spin.SpinOnce(); | ||||
| } while (m_hi != mark); | ||||
| } | ||||
| } | ||||
|
|
r233 | public bool TryEnqueue(T value) { | ||
| int alloc; | ||||
| do { | ||||
| alloc = m_alloc; | ||||
| if (alloc >= m_size) | ||||
| return false; | ||||
| } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc)); | ||||
|
|
r119 | m_data[alloc] = value; | ||
|
|
r234 | AwaitWrites(alloc); | ||
|
|
r233 | m_hi = alloc + 1; | ||
|
|
r119 | return true; | ||
| } | ||||
|
|
r124 | /// <summary> | ||
| /// Prevents from allocating new space in the chunk and waits for all write operations to complete | ||||
| /// </summary> | ||||
|
|
r233 | public void Seal() { | ||
| var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size); | ||||
|
|
r234 | AwaitWrites(actual); | ||
|
|
r124 | } | ||
|
|
r121 | public bool TryDequeue(out T value, out bool recycle) { | ||
|
|
r119 | int low; | ||
| do { | ||||
| low = m_low; | ||||
| if (low >= m_hi) { | ||||
| value = default(T); | ||||
| recycle = (low == m_size); | ||||
| return false; | ||||
| } | ||||
|
|
r233 | } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low)); | ||
|
|
r119 | |||
|
|
r233 | recycle = (low + 1 == m_size); | ||
|
|
r119 | value = m_data[low]; | ||
| return true; | ||||
| } | ||||
|
|
r233 | public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) { | ||
| int alloc; | ||||
| do { | ||||
| alloc = m_alloc; | ||||
| if (alloc >= m_size) { | ||||
| enqueued = 0; | ||||
| return false; | ||||
| } else { | ||||
| enqueued = Math.Min(length, m_size - alloc); | ||||
| } | ||||
| } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc)); | ||||
| Array.Copy(batch, offset, m_data, alloc, enqueued); | ||||
|
|
r120 | |||
|
|
r234 | AwaitWrites(alloc); | ||
|
|
r233 | m_hi = alloc + enqueued; | ||
|
|
r120 | return true; | ||
| } | ||||
|
|
r233 | public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) { | ||
|
|
r121 | int low, hi, batchSize; | ||
| do { | ||||
| low = m_low; | ||||
| hi = m_hi; | ||||
| if (low >= hi) { | ||||
| dequeued = 0; | ||||
|
|
r233 | recycle = (low == m_size); | ||
|
|
r121 | return false; | ||
| } | ||||
| batchSize = Math.Min(hi - low, length); | ||||
|
|
r233 | } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); | ||
|
|
r121 | |||
| dequeued = batchSize; | ||||
|
|
r233 | recycle = (low + batchSize == m_size); | ||
|
|
r121 | Array.Copy(m_data, low, buffer, offset, batchSize); | ||
| return true; | ||||
| } | ||||
|
|
r119 | public T GetAt(int pos) { | ||
| return m_data[pos]; | ||||
| } | ||||
| } | ||||
| public const int DEFAULT_CHUNK_SIZE = 32; | ||||
|
|
r233 | public const int MAX_CHUNK_SIZE = 256; | ||
|
|
r119 | |||
| Chunk m_first; | ||||
| Chunk m_last; | ||||
|
|
r233 | public AsyncQueue() { | ||
| m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE); | ||||
| } | ||||
|
|
r121 | /// <summary> | ||
| /// Adds the specified value to the queue. | ||||
| /// </summary> | ||||
| /// <param name="value">Tha value which will be added to the queue.</param> | ||||
|
|
r233 | public void Enqueue(T value) { | ||
|
|
r119 | var last = m_last; | ||
|
|
r233 | SpinWait spin = new SpinWait(); | ||
| while (!last.TryEnqueue(value)) { | ||||
|
|
r119 | // try to extend queue | ||
|
|
r233 | var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value); | ||
| var t = Interlocked.CompareExchange(ref m_last, chunk, last); | ||||
| if (t == last) { | ||||
| last.next = chunk; | ||||
| break; | ||||
|
|
r119 | } else { | ||
|
|
r233 | last = t; | ||
|
|
r119 | } | ||
|
|
r233 | spin.SpinOnce(); | ||
|
|
r119 | } | ||
| } | ||||
|
|
r121 | /// <summary> | ||
| /// Adds the specified data to the queue. | ||||
| /// </summary> | ||||
| /// <param name="data">The buffer which contains the data to be enqueued.</param> | ||||
| /// <param name="offset">The offset of the data in the buffer.</param> | ||||
| /// <param name="length">The size of the data to read from the buffer.</param> | ||||
|
|
r233 | public void EnqueueRange(T[] data, int offset, int length) { | ||
|
|
r121 | if (data == null) | ||
| throw new ArgumentNullException("data"); | ||||
| if (offset < 0) | ||||
| throw new ArgumentOutOfRangeException("offset"); | ||||
| if (length < 1 || offset + length > data.Length) | ||||
| throw new ArgumentOutOfRangeException("length"); | ||||
|
|
r233 | while (length > 0) { | ||
| var last = m_last; | ||||
| int enqueued; | ||||
|
|
r121 | |||
|
|
r233 | if (last.TryEnqueueBatch(data, offset, length, out enqueued)) { | ||
|
|
r121 | length -= enqueued; | ||
| offset += enqueued; | ||||
| } | ||||
|
|
r233 | if (length > 0) { | ||
| // we have something to enqueue | ||||
|
|
r121 | |||
|
|
r233 | var tail = length % MAX_CHUNK_SIZE; | ||
|
|
r121 | |||
|
|
r233 | var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail); | ||
| if (last != Interlocked.CompareExchange(ref m_last, chunk, last)) | ||||
| continue; // we wasn't able to catch the writer, roundtrip | ||||
|
|
r121 | |||
|
|
r233 | // we are lucky | ||
| // we can exclusively write our batch, the other writers will continue their work | ||||
| length -= tail; | ||||
|
|
r121 | |||
|
|
r233 | |||
| for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) { | ||||
| var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE); | ||||
| node.WriteData(data, offset, 0, MAX_CHUNK_SIZE); | ||||
| offset += MAX_CHUNK_SIZE; | ||||
| // fence last.next is volatile | ||||
| last.next = node; | ||||
| last = node; | ||||
|
|
r121 | } | ||
|
|
r233 | if (tail > 0) | ||
| chunk.WriteData(data, offset, 0, tail); | ||||
| // fence last.next is volatile | ||||
| last.next = chunk; | ||||
| return; | ||||
|
|
r121 | } | ||
| } | ||||
| } | ||||
| /// <summary> | ||||
| /// Tries to retrieve the first element from the queue. | ||||
| /// </summary> | ||||
| /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns> | ||||
| /// <param name="value">The value of the dequeued element.</param> | ||||
|
|
r119 | public bool TryDequeue(out T value) { | ||
| var chunk = m_first; | ||||
|
|
r233 | do { | ||
| bool recycle; | ||||
|
|
r119 | |||
| var result = chunk.TryDequeue(out value, out recycle); | ||||
|
|
r233 | if (recycle && chunk.next != null) { | ||
| // this chunk is waste | ||||
| chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk); | ||||
| } else { | ||||
|
|
r119 | return result; // this chunk is usable and returned actual result | ||
|
|
r233 | } | ||
|
|
r119 | |||
| if (result) // this chunk is waste but the true result is always actual | ||||
| return true; | ||||
|
|
r233 | } while (true); | ||
|
|
r119 | } | ||
|
|
r121 | /// <summary> | ||
| /// Tries to dequeue the specified amount of data from the queue. | ||||
| /// </summary> | ||||
| /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns> | ||||
| /// <param name="buffer">The buffer to which the data will be written.</param> | ||||
| /// <param name="offset">The offset in the buffer at which the data will be written.</param> | ||||
| /// <param name="length">The maximum amount of data to be retrieved.</param> | ||||
| /// <param name="dequeued">The actual amout of the retrieved data.</param> | ||||
| public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) { | ||||
| if (buffer == null) | ||||
| throw new ArgumentNullException("buffer"); | ||||
| if (offset < 0) | ||||
| throw new ArgumentOutOfRangeException("offset"); | ||||
| if (length < 1 || offset + length > buffer.Length) | ||||
| throw new ArgumentOutOfRangeException("length"); | ||||
| var chunk = m_first; | ||||
| dequeued = 0; | ||||
|
|
r233 | do { | ||
| bool recycle; | ||||
|
|
r121 | int actual; | ||
| if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { | ||||
| offset += actual; | ||||
| length -= actual; | ||||
| dequeued += actual; | ||||
| } | ||||
|
|
r233 | if (recycle && chunk.next != null) { | ||
| // this chunk is waste | ||||
| chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk); | ||||
| } else { | ||||
| chunk = null; | ||||
| } | ||||
|
|
r121 | |||
| if (length == 0) | ||||
| return true; | ||||
|
|
r233 | } while (chunk != null); | ||
|
|
r121 | |||
| return dequeued != 0; | ||||
| } | ||||
| /// <summary> | ||||
| /// Tries to dequeue all remaining data in the first chunk. | ||||
| /// </summary> | ||||
| /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> | ||||
|
|
r122 | /// <param name="buffer">The buffer to which the data will be written.</param> | ||
|
|
r121 | /// <param name="offset">The offset in the buffer at which the data will be written.</param> | ||
| /// <param name="length">Tha maximum amount of the data to be dequeued.</param> | ||||
| /// <param name="dequeued">The actual amount of the dequeued data.</param> | ||||
| public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { | ||||
| if (buffer == null) | ||||
| throw new ArgumentNullException("buffer"); | ||||
| if (offset < 0) | ||||
| throw new ArgumentOutOfRangeException("offset"); | ||||
| if (length < 1 || offset + length > buffer.Length) | ||||
| throw new ArgumentOutOfRangeException("length"); | ||||
| var chunk = m_first; | ||||
|
|
r233 | do { | ||
| bool recycle; | ||||
| chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle); | ||||
|
|
r121 | |||
|
|
r233 | if (recycle && chunk.next != null) { | ||
| // this chunk is waste | ||||
| chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk); | ||||
| } else { | ||||
| chunk = null; | ||||
|
|
r121 | } | ||
| // if we have dequeued any data, then return | ||||
| if (dequeued != 0) | ||||
| return true; | ||||
|
|
r233 | } while (chunk != null); | ||
|
|
r121 | |||
| return false; | ||||
| } | ||||
|
|
r233 | |||
|
|
r119 | |||
|
|
r123 | public void Clear() { | ||
| // start the new queue | ||||
|
|
r125 | var chunk = new Chunk(DEFAULT_CHUNK_SIZE); | ||
|
|
r124 | do { | ||
| var first = m_first; | ||||
|
|
r233 | if (first.next == null && first != m_last) { | ||
|
|
r124 | continue; | ||
|
|
r233 | } | ||
|
|
r123 | |||
|
|
r124 | // here we will create inconsistency which will force others to spin | ||
| // and prevent from fetching. chunk.next = null | ||||
| if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | ||||
| continue;// inconsistent | ||||
|
|
r123 | |||
|
|
r233 | m_last = chunk; | ||
| return; | ||||
| } while (true); | ||||
| } | ||||
| public List<T> Drain() { | ||||
|
|
r234 | Chunk chunk = null; | ||
|
|
r233 | do { | ||
| var first = m_first; | ||||
| // first.next is volatile | ||||
| if (first.next == null) { | ||||
| if (first != m_last) | ||||
| continue; | ||||
| else if (first.Hi == first.Low) | ||||
| return new List<T>(); | ||||
| } | ||||
|
|
r234 | // start the new queue | ||
| if (chunk == null) | ||||
| chunk = new Chunk(DEFAULT_CHUNK_SIZE); | ||||
|
|
r233 | // here we will create inconsistency which will force others to spin | ||
| // and prevent from fetching. chunk.next = null | ||||
| if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | ||||
| continue;// inconsistent | ||||
| var last = Interlocked.Exchange(ref m_last, chunk); | ||||
|
|
r124 | |||
| return ReadChunks(first, last); | ||||
|
|
r233 | } while (true); | ||
|
|
r123 | } | ||
|
|
r233 | |||
| static List<T> ReadChunks(Chunk chunk, object last) { | ||||
|
|
r123 | var result = new List<T>(); | ||
|
|
r233 | var buffer = new T[MAX_CHUNK_SIZE]; | ||
|
|
r123 | int actual; | ||
| bool recycle; | ||||
|
|
r233 | SpinWait spin = new SpinWait(); | ||
|
|
r123 | while (chunk != null) { | ||
|
|
r124 | // ensure all write operations on the chunk are complete | ||
|
|
r233 | chunk.Seal(); | ||
|
|
r124 | |||
|
|
r123 | // we need to read the chunk using this way | ||
| // since some client still may completing the dequeue | ||||
| // operation, such clients most likely won't get results | ||||
| while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) | ||||
| result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); | ||||
|
|
r124 | if (chunk == last) { | ||
| chunk = null; | ||||
| } else { | ||||
| while (chunk.next == null) | ||||
|
|
r233 | spin.SpinOnce(); | ||
|
|
r124 | chunk = chunk.next; | ||
| } | ||||
|
|
r123 | } | ||
|
|
r233 | return result; | ||
|
|
r123 | } | ||
| struct ArraySegmentCollection : ICollection<T> { | ||||
| readonly T[] m_data; | ||||
| readonly int m_offset; | ||||
| readonly int m_length; | ||||
| public ArraySegmentCollection(T[] data, int offset, int length) { | ||||
| m_data = data; | ||||
| m_offset = offset; | ||||
| m_length = length; | ||||
| } | ||||
| #region ICollection implementation | ||||
| public void Add(T item) { | ||||
|
|
r129 | throw new NotSupportedException(); | ||
|
|
r123 | } | ||
| public void Clear() { | ||||
|
|
r129 | throw new NotSupportedException(); | ||
|
|
r123 | } | ||
| public bool Contains(T item) { | ||||
| return false; | ||||
| } | ||||
| public void CopyTo(T[] array, int arrayIndex) { | ||||
|
|
r233 | Array.Copy(m_data, m_offset, array, arrayIndex, m_length); | ||
|
|
r123 | } | ||
| public bool Remove(T item) { | ||||
|
|
r129 | throw new NotSupportedException(); | ||
|
|
r123 | } | ||
| public int Count { | ||||
| get { | ||||
| return m_length; | ||||
| } | ||||
| } | ||||
| public bool IsReadOnly { | ||||
| get { | ||||
| return true; | ||||
| } | ||||
| } | ||||
| #endregion | ||||
| #region IEnumerable implementation | ||||
| public IEnumerator<T> GetEnumerator() { | ||||
| for (int i = m_offset; i < m_length + m_offset; i++) | ||||
| yield return m_data[i]; | ||||
| } | ||||
| #endregion | ||||
| #region IEnumerable implementation | ||||
| IEnumerator IEnumerable.GetEnumerator() { | ||||
| return GetEnumerator(); | ||||
| } | ||||
| #endregion | ||||
| } | ||||
|
|
r119 | #region IEnumerable implementation | ||
| class Enumerator : IEnumerator<T> { | ||||
| Chunk m_current; | ||||
| int m_pos = -1; | ||||
| public Enumerator(Chunk fisrt) { | ||||
| m_current = fisrt; | ||||
| } | ||||
| #region IEnumerator implementation | ||||
| public bool MoveNext() { | ||||
| if (m_current == null) | ||||
| return false; | ||||
| if (m_pos == -1) | ||||
| m_pos = m_current.Low; | ||||
| else | ||||
| m_pos++; | ||||
|
|
r127 | |||
|
|
r119 | if (m_pos == m_current.Hi) { | ||
|
|
r127 | |||
| m_current = m_pos == m_current.Size ? m_current.next : null; | ||||
|
|
r119 | m_pos = 0; | ||
|
|
r127 | |||
| if (m_current == null) | ||||
| return false; | ||||
|
|
r119 | } | ||
| return true; | ||||
| } | ||||
| public void Reset() { | ||||
| throw new NotSupportedException(); | ||||
| } | ||||
| object IEnumerator.Current { | ||||
| get { | ||||
| return Current; | ||||
| } | ||||
| } | ||||
| #endregion | ||||
| #region IDisposable implementation | ||||
| public void Dispose() { | ||||
| } | ||||
| #endregion | ||||
| #region IEnumerator implementation | ||||
| public T Current { | ||||
| get { | ||||
| if (m_pos == -1 || m_current == null) | ||||
| throw new InvalidOperationException(); | ||||
| return m_current.GetAt(m_pos); | ||||
| } | ||||
| } | ||||
| #endregion | ||||
| } | ||||
| public IEnumerator<T> GetEnumerator() { | ||||
| return new Enumerator(m_first); | ||||
| } | ||||
| #endregion | ||||
| #region IEnumerable implementation | ||||
| IEnumerator IEnumerable.GetEnumerator() { | ||||
| return GetEnumerator(); | ||||
| } | ||||
| #endregion | ||||
| } | ||||
| } | ||||
