|
|
using System.Threading;
|
|
|
using System.Collections.Generic;
|
|
|
using System;
|
|
|
using System.Collections;
|
|
|
using System.Diagnostics;
|
|
|
|
|
|
namespace Implab.Parallels {
|
|
|
public class AsyncQueue<T> : IEnumerable<T> {
|
|
|
class Chunk {
|
|
|
public Chunk next;
|
|
|
|
|
|
int m_low;
|
|
|
int m_hi;
|
|
|
int m_alloc;
|
|
|
readonly int m_size;
|
|
|
readonly T[] m_data;
|
|
|
|
|
|
public Chunk(int size) {
|
|
|
m_size = size;
|
|
|
m_data = new T[size];
|
|
|
}
|
|
|
|
|
|
public Chunk(int size, T value) {
|
|
|
m_size = size;
|
|
|
m_hi = 1;
|
|
|
m_alloc = 1;
|
|
|
m_data = new T[size];
|
|
|
m_data[0] = value;
|
|
|
}
|
|
|
|
|
|
public Chunk(int size, T[] data, int offset, int length, int alloc) {
|
|
|
m_size = size;
|
|
|
m_hi = length;
|
|
|
m_alloc = alloc;
|
|
|
m_data = new T[size];
|
|
|
Array.Copy(data, offset, m_data, 0, length);
|
|
|
}
|
|
|
|
|
|
public int Low {
|
|
|
get { return m_low; }
|
|
|
}
|
|
|
|
|
|
public int Hi {
|
|
|
get { return m_hi; }
|
|
|
}
|
|
|
|
|
|
public int Size {
|
|
|
get { return m_size; }
|
|
|
}
|
|
|
|
|
|
public bool TryEnqueue(T value, out bool extend) {
|
|
|
var alloc = Interlocked.Increment(ref m_alloc) - 1;
|
|
|
|
|
|
if (alloc >= m_size) {
|
|
|
extend = alloc == m_size;
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
extend = false;
|
|
|
m_data[alloc] = value;
|
|
|
|
|
|
while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
|
|
|
// spin wait for commit
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Prevents from allocating new space in the chunk and waits for all write operations to complete
|
|
|
/// </summary>
|
|
|
public void Commit() {
|
|
|
var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
|
|
|
|
|
|
while (m_hi != actual)
|
|
|
Thread.MemoryBarrier();
|
|
|
}
|
|
|
|
|
|
public bool TryDequeue(out T value, out bool recycle) {
|
|
|
int low;
|
|
|
do {
|
|
|
low = m_low;
|
|
|
if (low >= m_hi) {
|
|
|
value = default(T);
|
|
|
recycle = (low == m_size);
|
|
|
return false;
|
|
|
}
|
|
|
} while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
|
|
|
|
|
|
recycle = (low == m_size - 1);
|
|
|
value = m_data[low];
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
|
|
|
//int alloc;
|
|
|
//int allocSize;
|
|
|
|
|
|
var alloc = Interlocked.Add(ref m_alloc, length) - length;
|
|
|
if (alloc > m_size) {
|
|
|
// the chunk is full and someone already
|
|
|
// creating the new one
|
|
|
enqueued = 0; // nothing was added
|
|
|
extend = false; // the caller shouldn't try to extend the queue
|
|
|
return false; // nothing was added
|
|
|
}
|
|
|
|
|
|
enqueued = Math.Min(m_size - alloc, length);
|
|
|
extend = length > enqueued;
|
|
|
|
|
|
if (enqueued == 0)
|
|
|
return false;
|
|
|
|
|
|
|
|
|
Array.Copy(batch, offset, m_data, alloc, enqueued);
|
|
|
|
|
|
while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
|
|
|
// spin wait for commit
|
|
|
}
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
|
|
|
int low, hi, batchSize;
|
|
|
|
|
|
do {
|
|
|
low = m_low;
|
|
|
hi = m_hi;
|
|
|
if (low >= hi) {
|
|
|
dequeued = 0;
|
|
|
recycle = (low == m_size); // recycling could be restarted and we need to signal again
|
|
|
return false;
|
|
|
}
|
|
|
batchSize = Math.Min(hi - low, length);
|
|
|
} while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
|
|
|
|
|
|
recycle = (low == m_size - batchSize);
|
|
|
dequeued = batchSize;
|
|
|
|
|
|
Array.Copy(m_data, low, buffer, offset, batchSize);
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
public T GetAt(int pos) {
|
|
|
return m_data[pos];
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public const int DEFAULT_CHUNK_SIZE = 32;
|
|
|
public const int MAX_CHUNK_SIZE = 262144;
|
|
|
|
|
|
Chunk m_first;
|
|
|
Chunk m_last;
|
|
|
|
|
|
/// <summary>
|
|
|
/// Adds the specified value to the queue.
|
|
|
/// </summary>
|
|
|
/// <param name="value">Tha value which will be added to the queue.</param>
|
|
|
public void Enqueue(T value) {
|
|
|
var last = m_last;
|
|
|
// spin wait to the new chunk
|
|
|
bool extend = true;
|
|
|
while (last == null || !last.TryEnqueue(value, out extend)) {
|
|
|
// try to extend queue
|
|
|
if (extend || last == null) {
|
|
|
var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
|
|
|
if (EnqueueChunk(last, chunk))
|
|
|
break; // success! exit!
|
|
|
last = m_last;
|
|
|
} else {
|
|
|
while (last == m_last) {
|
|
|
Thread.MemoryBarrier();
|
|
|
}
|
|
|
last = m_last;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Adds the specified data to the queue.
|
|
|
/// </summary>
|
|
|
/// <param name="data">The buffer which contains the data to be enqueued.</param>
|
|
|
/// <param name="offset">The offset of the data in the buffer.</param>
|
|
|
/// <param name="length">The size of the data to read from the buffer.</param>
|
|
|
public void EnqueueRange(T[] data, int offset, int length) {
|
|
|
if (data == null)
|
|
|
throw new ArgumentNullException("data");
|
|
|
if (offset < 0)
|
|
|
throw new ArgumentOutOfRangeException("offset");
|
|
|
if (length < 1 || offset + length > data.Length)
|
|
|
throw new ArgumentOutOfRangeException("length");
|
|
|
|
|
|
var last = m_last;
|
|
|
|
|
|
bool extend;
|
|
|
int enqueued;
|
|
|
|
|
|
while (length > 0) {
|
|
|
extend = true;
|
|
|
if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
|
|
|
length -= enqueued;
|
|
|
offset += enqueued;
|
|
|
}
|
|
|
|
|
|
if (extend) {
|
|
|
// there was no enough space in the chunk
|
|
|
// or there was no chunks in the queue
|
|
|
|
|
|
while (length > 0) {
|
|
|
|
|
|
var size = Math.Min(length, MAX_CHUNK_SIZE);
|
|
|
|
|
|
var chunk = new Chunk(
|
|
|
Math.Max(size, DEFAULT_CHUNK_SIZE),
|
|
|
data,
|
|
|
offset,
|
|
|
size,
|
|
|
length // length >= size
|
|
|
);
|
|
|
|
|
|
if (!EnqueueChunk(last, chunk)) {
|
|
|
// looks like the queue has been updated then proceed from the beginning
|
|
|
last = m_last;
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
// we have successfully added the new chunk
|
|
|
last = chunk;
|
|
|
length -= size;
|
|
|
offset += size;
|
|
|
}
|
|
|
} else {
|
|
|
// we don't need to extend the queue, if we successfully enqueued data
|
|
|
if (length == 0)
|
|
|
break;
|
|
|
|
|
|
// if we need to wait while someone is extending the queue
|
|
|
// spinwait
|
|
|
while (last == m_last) {
|
|
|
Thread.MemoryBarrier();
|
|
|
}
|
|
|
|
|
|
last = m_last;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Tries to retrieve the first element from the queue.
|
|
|
/// </summary>
|
|
|
/// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
|
|
|
/// <param name="value">The value of the dequeued element.</param>
|
|
|
public bool TryDequeue(out T value) {
|
|
|
var chunk = m_first;
|
|
|
bool recycle;
|
|
|
while (chunk != null) {
|
|
|
|
|
|
var result = chunk.TryDequeue(out value, out recycle);
|
|
|
|
|
|
if (recycle) // this chunk is waste
|
|
|
RecycleFirstChunk(chunk);
|
|
|
else
|
|
|
return result; // this chunk is usable and returned actual result
|
|
|
|
|
|
if (result) // this chunk is waste but the true result is always actual
|
|
|
return true;
|
|
|
|
|
|
// try again
|
|
|
chunk = m_first;
|
|
|
}
|
|
|
|
|
|
// the queue is empty
|
|
|
value = default(T);
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Tries to dequeue the specified amount of data from the queue.
|
|
|
/// </summary>
|
|
|
/// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
|
|
|
/// <param name="buffer">The buffer to which the data will be written.</param>
|
|
|
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
|
|
|
/// <param name="length">The maximum amount of data to be retrieved.</param>
|
|
|
/// <param name="dequeued">The actual amout of the retrieved data.</param>
|
|
|
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
|
|
|
if (buffer == null)
|
|
|
throw new ArgumentNullException("buffer");
|
|
|
if (offset < 0)
|
|
|
throw new ArgumentOutOfRangeException("offset");
|
|
|
if (length < 1 || offset + length > buffer.Length)
|
|
|
throw new ArgumentOutOfRangeException("length");
|
|
|
|
|
|
var chunk = m_first;
|
|
|
bool recycle;
|
|
|
dequeued = 0;
|
|
|
while (chunk != null) {
|
|
|
|
|
|
int actual;
|
|
|
if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
|
|
|
offset += actual;
|
|
|
length -= actual;
|
|
|
dequeued += actual;
|
|
|
}
|
|
|
|
|
|
if (recycle) // this chunk is waste
|
|
|
RecycleFirstChunk(chunk);
|
|
|
else if (actual == 0)
|
|
|
break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
|
|
|
|
|
|
if (length == 0)
|
|
|
return true;
|
|
|
|
|
|
// we still may dequeue something
|
|
|
// try again
|
|
|
chunk = m_first;
|
|
|
}
|
|
|
|
|
|
return dequeued != 0;
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Tries to dequeue all remaining data in the first chunk.
|
|
|
/// </summary>
|
|
|
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
|
|
|
/// <param name="buffer">The buffer to which the data will be written.</param>
|
|
|
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
|
|
|
/// <param name="length">Tha maximum amount of the data to be dequeued.</param>
|
|
|
/// <param name="dequeued">The actual amount of the dequeued data.</param>
|
|
|
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
|
|
|
if (buffer == null)
|
|
|
throw new ArgumentNullException("buffer");
|
|
|
if (offset < 0)
|
|
|
throw new ArgumentOutOfRangeException("offset");
|
|
|
if (length < 1 || offset + length > buffer.Length)
|
|
|
throw new ArgumentOutOfRangeException("length");
|
|
|
|
|
|
var chunk = m_first;
|
|
|
bool recycle;
|
|
|
dequeued = 0;
|
|
|
|
|
|
while (chunk != null) {
|
|
|
|
|
|
int actual;
|
|
|
if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
|
|
|
dequeued = actual;
|
|
|
}
|
|
|
|
|
|
if (recycle) // this chunk is waste
|
|
|
RecycleFirstChunk(chunk);
|
|
|
|
|
|
// if we have dequeued any data, then return
|
|
|
if (dequeued != 0)
|
|
|
return true;
|
|
|
|
|
|
// we still may dequeue something
|
|
|
// try again
|
|
|
chunk = m_first;
|
|
|
}
|
|
|
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
bool EnqueueChunk(Chunk last, Chunk chunk) {
|
|
|
if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
|
|
|
return false;
|
|
|
|
|
|
if (last != null)
|
|
|
last.next = chunk;
|
|
|
else {
|
|
|
m_first = chunk;
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
void RecycleFirstChunk(Chunk first) {
|
|
|
var next = first.next;
|
|
|
|
|
|
if (first != Interlocked.CompareExchange(ref m_first, next, first))
|
|
|
return;
|
|
|
|
|
|
if (next == null) {
|
|
|
|
|
|
if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
|
|
|
/*while (first.next == null)
|
|
|
Thread.MemoryBarrier();*/
|
|
|
|
|
|
// race
|
|
|
// someone already updated the tail, restore the pointer to the queue head
|
|
|
m_first = first;
|
|
|
}
|
|
|
// the tail is updated
|
|
|
}
|
|
|
|
|
|
// we need to update the head
|
|
|
//Interlocked.CompareExchange(ref m_first, next, first);
|
|
|
// if the head is already updated then give up
|
|
|
//return;
|
|
|
|
|
|
}
|
|
|
|
|
|
public void Clear() {
|
|
|
// start the new queue
|
|
|
var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
|
|
|
|
|
|
do {
|
|
|
Thread.MemoryBarrier();
|
|
|
var first = m_first;
|
|
|
var last = m_last;
|
|
|
|
|
|
if (last == null) // nothing to clear
|
|
|
return;
|
|
|
|
|
|
if (first == null || (first.next == null && first != last)) // inconcistency
|
|
|
continue;
|
|
|
|
|
|
// here we will create inconsistency which will force others to spin
|
|
|
// and prevent from fetching. chunk.next = null
|
|
|
if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
|
|
|
continue;// inconsistent
|
|
|
|
|
|
m_last = chunk;
|
|
|
|
|
|
return;
|
|
|
|
|
|
} while(true);
|
|
|
}
|
|
|
|
|
|
public T[] Drain() {
|
|
|
// start the new queue
|
|
|
var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
|
|
|
|
|
|
do {
|
|
|
Thread.MemoryBarrier();
|
|
|
var first = m_first;
|
|
|
var last = m_last;
|
|
|
|
|
|
if (last == null)
|
|
|
return new T[0];
|
|
|
|
|
|
if (first == null || (first.next == null && first != last))
|
|
|
continue;
|
|
|
|
|
|
// here we will create inconsistency which will force others to spin
|
|
|
// and prevent from fetching. chunk.next = null
|
|
|
if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
|
|
|
continue;// inconsistent
|
|
|
|
|
|
last = Interlocked.Exchange(ref m_last, chunk);
|
|
|
|
|
|
return ReadChunks(first, last);
|
|
|
|
|
|
} while(true);
|
|
|
}
|
|
|
|
|
|
T[] ReadChunks(Chunk chunk, object last) {
|
|
|
var result = new List<T>();
|
|
|
var buffer = new T[DEFAULT_CHUNK_SIZE];
|
|
|
int actual;
|
|
|
bool recycle;
|
|
|
while (chunk != null) {
|
|
|
// ensure all write operations on the chunk are complete
|
|
|
chunk.Commit();
|
|
|
|
|
|
// we need to read the chunk using this way
|
|
|
// since some client still may completing the dequeue
|
|
|
// operation, such clients most likely won't get results
|
|
|
while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
|
|
|
result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
|
|
|
|
|
|
if (chunk == last) {
|
|
|
chunk = null;
|
|
|
} else {
|
|
|
while (chunk.next == null)
|
|
|
Thread.MemoryBarrier();
|
|
|
chunk = chunk.next;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return result.ToArray();
|
|
|
}
|
|
|
|
|
|
struct ArraySegmentCollection : ICollection<T> {
|
|
|
readonly T[] m_data;
|
|
|
readonly int m_offset;
|
|
|
readonly int m_length;
|
|
|
|
|
|
public ArraySegmentCollection(T[] data, int offset, int length) {
|
|
|
m_data = data;
|
|
|
m_offset = offset;
|
|
|
m_length = length;
|
|
|
}
|
|
|
|
|
|
#region ICollection implementation
|
|
|
|
|
|
public void Add(T item) {
|
|
|
throw new InvalidOperationException();
|
|
|
}
|
|
|
|
|
|
public void Clear() {
|
|
|
throw new InvalidOperationException();
|
|
|
}
|
|
|
|
|
|
public bool Contains(T item) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
public void CopyTo(T[] array, int arrayIndex) {
|
|
|
Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
|
|
|
}
|
|
|
|
|
|
public bool Remove(T item) {
|
|
|
throw new NotImplementedException();
|
|
|
}
|
|
|
|
|
|
public int Count {
|
|
|
get {
|
|
|
return m_length;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public bool IsReadOnly {
|
|
|
get {
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
|
|
|
#region IEnumerable implementation
|
|
|
|
|
|
public IEnumerator<T> GetEnumerator() {
|
|
|
for (int i = m_offset; i < m_length + m_offset; i++)
|
|
|
yield return m_data[i];
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
|
|
|
#region IEnumerable implementation
|
|
|
|
|
|
IEnumerator IEnumerable.GetEnumerator() {
|
|
|
return GetEnumerator();
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
}
|
|
|
|
|
|
#region IEnumerable implementation
|
|
|
|
|
|
class Enumerator : IEnumerator<T> {
|
|
|
Chunk m_current;
|
|
|
int m_pos = -1;
|
|
|
|
|
|
public Enumerator(Chunk fisrt) {
|
|
|
m_current = fisrt;
|
|
|
}
|
|
|
|
|
|
#region IEnumerator implementation
|
|
|
|
|
|
public bool MoveNext() {
|
|
|
if (m_current == null)
|
|
|
return false;
|
|
|
|
|
|
if (m_pos == -1)
|
|
|
m_pos = m_current.Low;
|
|
|
else
|
|
|
m_pos++;
|
|
|
|
|
|
if (m_pos == m_current.Hi) {
|
|
|
|
|
|
m_current = m_pos == m_current.Size ? m_current.next : null;
|
|
|
|
|
|
m_pos = 0;
|
|
|
|
|
|
if (m_current == null)
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
public void Reset() {
|
|
|
throw new NotSupportedException();
|
|
|
}
|
|
|
|
|
|
object IEnumerator.Current {
|
|
|
get {
|
|
|
return Current;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
|
|
|
#region IDisposable implementation
|
|
|
|
|
|
public void Dispose() {
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
|
|
|
#region IEnumerator implementation
|
|
|
|
|
|
public T Current {
|
|
|
get {
|
|
|
if (m_pos == -1 || m_current == null)
|
|
|
throw new InvalidOperationException();
|
|
|
return m_current.GetAt(m_pos);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
}
|
|
|
|
|
|
public IEnumerator<T> GetEnumerator() {
|
|
|
return new Enumerator(m_first);
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
|
|
|
#region IEnumerable implementation
|
|
|
|
|
|
IEnumerator IEnumerable.GetEnumerator() {
|
|
|
return GetEnumerator();
|
|
|
}
|
|
|
|
|
|
#endregion
|
|
|
}
|
|
|
}
|
|
|
|