using System.Threading; using System.Collections.Generic; using System; using System.Collections; using System.Diagnostics; namespace Implab.Parallels { public class AsyncQueue : IEnumerable { class Chunk { public Chunk next; int m_low; int m_hi; int m_alloc; readonly int m_size; readonly T[] m_data; public Chunk(int size) { m_size = size; m_data = new T[size]; } public Chunk(int size, T value) { m_size = size; m_hi = 1; m_alloc = 1; m_data = new T[size]; m_data[0] = value; } public Chunk(int size, T[] data, int offset, int length, int alloc) { m_size = size; m_hi = length; m_alloc = alloc; m_data = new T[size]; Array.Copy(data, offset, m_data, 0, length); } public int Low { get { return m_low; } } public int Hi { get { return m_hi; } } public bool TryEnqueue(T value, out bool extend) { var alloc = Interlocked.Increment(ref m_alloc) - 1; if (alloc >= m_size) { extend = alloc == m_size; return false; } extend = false; m_data[alloc] = value; while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { // spin wait for commit } return true; } /// /// Prevents from allocating new space in the chunk and waits for all write operations to complete /// public void Commit() { var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); while (m_hi != actual) Thread.MemoryBarrier(); } public bool TryDequeue(out T value, out bool recycle) { int low; do { low = m_low; if (low >= m_hi) { value = default(T); recycle = (low == m_size); return false; } } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); recycle = (low == m_size - 1); value = m_data[low]; return true; } public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { //int alloc; //int allocSize; var alloc = Interlocked.Add(ref m_alloc, length) - length; if (alloc > m_size) { // the chunk is full and someone already // creating the new one enqueued = 0; // nothing was added extend = false; // the caller shouldn't try to extend the queue return false; // nothing was added } enqueued = Math.Min(m_size - alloc, length); extend = length > enqueued; if (enqueued == 0) return false; Array.Copy(batch, offset, m_data, alloc, enqueued); while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { // spin wait for commit } return true; } public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { int low, hi, batchSize; do { low = m_low; hi = m_hi; if (low >= hi) { dequeued = 0; recycle = (low == m_size); // recycling could be restarted and we need to signal again return false; } batchSize = Math.Min(hi - low, length); } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); recycle = (low == m_size - batchSize); dequeued = batchSize; Array.Copy(m_data, low, buffer, offset, batchSize); return true; } public T GetAt(int pos) { return m_data[pos]; } } public const int DEFAULT_CHUNK_SIZE = 32; public const int MAX_CHUNK_SIZE = 262144; readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; Chunk m_first; Chunk m_last; public AsyncQueue() { m_last = m_first = new Chunk(m_chunkSize); } /// /// Adds the specified value to the queue. /// /// Tha value which will be added to the queue. public void Enqueue(T value) { var last = m_last; // spin wait to the new chunk bool extend = true; while (last == null || !last.TryEnqueue(value, out extend)) { // try to extend queue if (extend || last == null) { var chunk = new Chunk(m_chunkSize, value); if (EnqueueChunk(last, chunk)) break; // success! exit! last = m_last; } else { while (last == m_last) { Thread.MemoryBarrier(); } last = m_last; } } } /// /// Adds the specified data to the queue. /// /// The buffer which contains the data to be enqueued. /// The offset of the data in the buffer. /// The size of the data to read from the buffer. public void EnqueueRange(T[] data, int offset, int length) { if (data == null) throw new ArgumentNullException("data"); if (offset < 0) throw new ArgumentOutOfRangeException("offset"); if (length < 1 || offset + length > data.Length) throw new ArgumentOutOfRangeException("length"); var last = m_last; bool extend; int enqueued; while (length > 0) { extend = true; if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) { length -= enqueued; offset += enqueued; } if (extend) { // there was no enough space in the chunk // or there was no chunks in the queue while (length > 0) { var size = Math.Min(length, MAX_CHUNK_SIZE); var chunk = new Chunk( Math.Max(size, m_chunkSize), data, offset, size, length // length >= size ); if (!EnqueueChunk(last, chunk)) { // looks like the queue has been updated then proceed from the beginning last = m_last; break; } // we have successfully added the new chunk last = chunk; length -= size; offset += size; } } else { // we don't need to extend the queue, if we successfully enqueued data if (length == 0) break; // if we need to wait while someone is extending the queue // spinwait while (last == m_last) { Thread.MemoryBarrier(); } last = m_last; } } } /// /// Tries to retrieve the first element from the queue. /// /// true, if element is dequeued, false otherwise. /// The value of the dequeued element. public bool TryDequeue(out T value) { var chunk = m_first; bool recycle; while (chunk != null) { var result = chunk.TryDequeue(out value, out recycle); if (recycle) // this chunk is waste RecycleFirstChunk(chunk); else return result; // this chunk is usable and returned actual result if (result) // this chunk is waste but the true result is always actual return true; // try again chunk = m_first; } // the queue is empty value = default(T); return false; } /// /// Tries to dequeue the specified amount of data from the queue. /// /// true, if data was deuqueued, false otherwise. /// The buffer to which the data will be written. /// The offset in the buffer at which the data will be written. /// The maximum amount of data to be retrieved. /// The actual amout of the retrieved data. public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) { if (buffer == null) throw new ArgumentNullException("buffer"); if (offset < 0) throw new ArgumentOutOfRangeException("offset"); if (length < 1 || offset + length > buffer.Length) throw new ArgumentOutOfRangeException("length"); var chunk = m_first; bool recycle; dequeued = 0; while (chunk != null) { int actual; if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { offset += actual; length -= actual; dequeued += actual; } if (recycle) // this chunk is waste RecycleFirstChunk(chunk); else if (actual == 0) break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue) if (length == 0) return true; // we still may dequeue something // try again chunk = m_first; } return dequeued != 0; } /// /// Tries to dequeue all remaining data in the first chunk. /// /// true, if data was dequeued, false otherwise. /// The buffer to which the data will be written. /// The offset in the buffer at which the data will be written. /// Tha maximum amount of the data to be dequeued. /// The actual amount of the dequeued data. public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { if (buffer == null) throw new ArgumentNullException("buffer"); if (offset < 0) throw new ArgumentOutOfRangeException("offset"); if (length < 1 || offset + length > buffer.Length) throw new ArgumentOutOfRangeException("length"); var chunk = m_first; bool recycle; dequeued = 0; while (chunk != null) { int actual; if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { dequeued = actual; } if (recycle) // this chunk is waste RecycleFirstChunk(chunk); // if we have dequeued any data, then return if (dequeued != 0) return true; // we still may dequeue something // try again chunk = m_first; } return false; } bool EnqueueChunk(Chunk last, Chunk chunk) { if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) return false; if (last != null) last.next = chunk; else { m_first = chunk; } return true; } void RecycleFirstChunk(Chunk first) { var next = first.next; if (first != Interlocked.CompareExchange(ref m_first, next, first)) return; if (next == null) { if (first != Interlocked.CompareExchange(ref m_last, null, first)) { /*while (first.next == null) Thread.MemoryBarrier();*/ // race // someone already updated the tail, restore the pointer to the queue head m_first = first; } // the tail is updated } // we need to update the head //Interlocked.CompareExchange(ref m_first, next, first); // if the head is already updated then give up //return; } public void Clear() { // start the new queue var chunk = new Chunk(m_chunkSize); do { Thread.MemoryBarrier(); var first = m_first; var last = m_last; if (last == null) // nothing to clear return; if (first == null || (first.next == null && first != last)) // inconcistency continue; // here we will create inconsistency which will force others to spin // and prevent from fetching. chunk.next = null if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) continue;// inconsistent m_last = chunk; return; } while(true); } public T[] Drain() { // start the new queue var chunk = new Chunk(m_chunkSize); do { Thread.MemoryBarrier(); var first = m_first; var last = m_last; if (last == null) return new T[0]; if (first == null || (first.next == null && first != last)) continue; // here we will create inconsistency which will force others to spin // and prevent from fetching. chunk.next = null if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) continue;// inconsistent last = Interlocked.Exchange(ref m_last, chunk); return ReadChunks(first, last); } while(true); } T[] ReadChunks(Chunk chunk, object last) { var result = new List(); var buffer = new T[m_chunkSize]; int actual; bool recycle; while (chunk != null) { // ensure all write operations on the chunk are complete chunk.Commit(); // we need to read the chunk using this way // since some client still may completing the dequeue // operation, such clients most likely won't get results while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); if (chunk == last) { chunk = null; } else { while (chunk.next == null) Thread.MemoryBarrier(); chunk = chunk.next; } } return result.ToArray(); } struct ArraySegmentCollection : ICollection { readonly T[] m_data; readonly int m_offset; readonly int m_length; public ArraySegmentCollection(T[] data, int offset, int length) { m_data = data; m_offset = offset; m_length = length; } #region ICollection implementation public void Add(T item) { throw new InvalidOperationException(); } public void Clear() { throw new InvalidOperationException(); } public bool Contains(T item) { return false; } public void CopyTo(T[] array, int arrayIndex) { Array.Copy(m_data,m_offset,array,arrayIndex, m_length); } public bool Remove(T item) { throw new NotImplementedException(); } public int Count { get { return m_length; } } public bool IsReadOnly { get { return true; } } #endregion #region IEnumerable implementation public IEnumerator GetEnumerator() { for (int i = m_offset; i < m_length + m_offset; i++) yield return m_data[i]; } #endregion #region IEnumerable implementation IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } #endregion } #region IEnumerable implementation class Enumerator : IEnumerator { Chunk m_current; int m_pos = -1; public Enumerator(Chunk fisrt) { m_current = fisrt; } #region IEnumerator implementation public bool MoveNext() { if (m_current == null) return false; if (m_pos == -1) m_pos = m_current.Low; else m_pos++; if (m_pos == m_current.Hi) { m_pos = 0; m_current = m_current.next; } return true; } public void Reset() { throw new NotSupportedException(); } object IEnumerator.Current { get { return Current; } } #endregion #region IDisposable implementation public void Dispose() { } #endregion #region IEnumerator implementation public T Current { get { if (m_pos == -1 || m_current == null) throw new InvalidOperationException(); return m_current.GetAt(m_pos); } } #endregion } public IEnumerator GetEnumerator() { return new Enumerator(m_first); } #endregion #region IEnumerable implementation IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } #endregion } }