##// END OF EJS Templates
fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.
fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.

File last commit:

r137:238e15580926 v2
r196:40d7fed4a09e default
Show More
AsyncQueue.cs
631 lines | 20.6 KiB | text/x-csharp | CSharpLexer
using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;
using System.Diagnostics;
namespace Implab.Parallels {
public class AsyncQueue<T> : IEnumerable<T> {
class Chunk {
public Chunk next;
int m_low;
int m_hi;
int m_alloc;
readonly int m_size;
readonly T[] m_data;
public Chunk(int size) {
m_size = size;
m_data = new T[size];
}
public Chunk(int size, T value) {
m_size = size;
m_hi = 1;
m_alloc = 1;
m_data = new T[size];
m_data[0] = value;
}
public Chunk(int size, T[] data, int offset, int length, int alloc) {
m_size = size;
m_hi = length;
m_alloc = alloc;
m_data = new T[size];
Array.Copy(data, offset, m_data, 0, length);
}
public int Low {
get { return m_low; }
}
public int Hi {
get { return m_hi; }
}
public int Size {
get { return m_size; }
}
public bool TryEnqueue(T value, out bool extend) {
var alloc = Interlocked.Increment(ref m_alloc) - 1;
if (alloc >= m_size) {
extend = alloc == m_size;
return false;
}
extend = false;
m_data[alloc] = value;
while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
// spin wait for commit
}
return true;
}
/// <summary>
/// Prevents from allocating new space in the chunk and waits for all write operations to complete
/// </summary>
public void Commit() {
var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
while (m_hi != actual)
Thread.MemoryBarrier();
}
public bool TryDequeue(out T value, out bool recycle) {
int low;
do {
low = m_low;
if (low >= m_hi) {
value = default(T);
recycle = (low == m_size);
return false;
}
} while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
recycle = (low == m_size - 1);
value = m_data[low];
return true;
}
public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
//int alloc;
//int allocSize;
var alloc = Interlocked.Add(ref m_alloc, length) - length;
if (alloc > m_size) {
// the chunk is full and someone already
// creating the new one
enqueued = 0; // nothing was added
extend = false; // the caller shouldn't try to extend the queue
return false; // nothing was added
}
enqueued = Math.Min(m_size - alloc, length);
extend = length > enqueued;
if (enqueued == 0)
return false;
Array.Copy(batch, offset, m_data, alloc, enqueued);
while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
// spin wait for commit
}
return true;
}
public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
int low, hi, batchSize;
do {
low = m_low;
hi = m_hi;
if (low >= hi) {
dequeued = 0;
recycle = (low == m_size); // recycling could be restarted and we need to signal again
return false;
}
batchSize = Math.Min(hi - low, length);
} while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
recycle = (low == m_size - batchSize);
dequeued = batchSize;
Array.Copy(m_data, low, buffer, offset, batchSize);
return true;
}
public T GetAt(int pos) {
return m_data[pos];
}
}
public const int DEFAULT_CHUNK_SIZE = 32;
public const int MAX_CHUNK_SIZE = 262144;
Chunk m_first;
Chunk m_last;
/// <summary>
/// Adds the specified value to the queue.
/// </summary>
/// <param name="value">Tha value which will be added to the queue.</param>
public virtual void Enqueue(T value) {
var last = m_last;
// spin wait to the new chunk
bool extend = true;
while (last == null || !last.TryEnqueue(value, out extend)) {
// try to extend queue
if (extend || last == null) {
var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
if (EnqueueChunk(last, chunk))
break; // success! exit!
last = m_last;
} else {
while (last == m_last) {
Thread.MemoryBarrier();
}
last = m_last;
}
}
}
/// <summary>
/// Adds the specified data to the queue.
/// </summary>
/// <param name="data">The buffer which contains the data to be enqueued.</param>
/// <param name="offset">The offset of the data in the buffer.</param>
/// <param name="length">The size of the data to read from the buffer.</param>
public virtual void EnqueueRange(T[] data, int offset, int length) {
if (data == null)
throw new ArgumentNullException("data");
if (length == 0)
return;
if (offset < 0)
throw new ArgumentOutOfRangeException("offset");
if (length < 1 || offset + length > data.Length)
throw new ArgumentOutOfRangeException("length");
var last = m_last;
bool extend;
int enqueued;
while (length > 0) {
extend = true;
if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
length -= enqueued;
offset += enqueued;
}
if (extend) {
// there was no enough space in the chunk
// or there was no chunks in the queue
while (length > 0) {
var size = Math.Min(length, MAX_CHUNK_SIZE);
var chunk = new Chunk(
Math.Max(size, DEFAULT_CHUNK_SIZE),
data,
offset,
size,
length // length >= size
);
if (!EnqueueChunk(last, chunk)) {
// looks like the queue has been updated then proceed from the beginning
last = m_last;
break;
}
// we have successfully added the new chunk
last = chunk;
length -= size;
offset += size;
}
} else {
// we don't need to extend the queue, if we successfully enqueued data
if (length == 0)
break;
// if we need to wait while someone is extending the queue
// spinwait
while (last == m_last) {
Thread.MemoryBarrier();
}
last = m_last;
}
}
}
/// <summary>
/// Tries to retrieve the first element from the queue.
/// </summary>
/// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
/// <param name="value">The value of the dequeued element.</param>
public bool TryDequeue(out T value) {
var chunk = m_first;
bool recycle;
while (chunk != null) {
var result = chunk.TryDequeue(out value, out recycle);
if (recycle) // this chunk is waste
RecycleFirstChunk(chunk);
else
return result; // this chunk is usable and returned actual result
if (result) // this chunk is waste but the true result is always actual
return true;
// try again
chunk = m_first;
}
// the queue is empty
value = default(T);
return false;
}
/// <summary>
/// Tries to dequeue the specified amount of data from the queue.
/// </summary>
/// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of data to be retrieved.</param>
/// <param name="dequeued">The actual amout of the retrieved data.</param>
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
if (buffer == null)
throw new ArgumentNullException("buffer");
if (offset < 0)
throw new ArgumentOutOfRangeException("offset");
if (length < 1 || offset + length > buffer.Length)
throw new ArgumentOutOfRangeException("length");
var chunk = m_first;
bool recycle;
dequeued = 0;
while (chunk != null) {
int actual;
if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
offset += actual;
length -= actual;
dequeued += actual;
}
if (recycle) // this chunk is waste
RecycleFirstChunk(chunk);
else if (actual == 0)
break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
if (length == 0)
return true;
// we still may dequeue something
// try again
chunk = m_first;
}
return dequeued != 0;
}
/// <summary>
/// Tries to dequeue all remaining data in the first chunk.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">Tha maximum amount of the data to be dequeued.</param>
/// <param name="dequeued">The actual amount of the dequeued data.</param>
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
if (buffer == null)
throw new ArgumentNullException("buffer");
if (offset < 0)
throw new ArgumentOutOfRangeException("offset");
if (length < 1 || offset + length > buffer.Length)
throw new ArgumentOutOfRangeException("length");
var chunk = m_first;
bool recycle;
dequeued = 0;
while (chunk != null) {
int actual;
if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
dequeued = actual;
}
if (recycle) // this chunk is waste
RecycleFirstChunk(chunk);
// if we have dequeued any data, then return
if (dequeued != 0)
return true;
// we still may dequeue something
// try again
chunk = m_first;
}
return false;
}
bool EnqueueChunk(Chunk last, Chunk chunk) {
if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
return false;
if (last != null)
last.next = chunk;
else {
m_first = chunk;
}
return true;
}
void RecycleFirstChunk(Chunk first) {
var next = first.next;
if (first != Interlocked.CompareExchange(ref m_first, next, first))
return;
if (next == null) {
if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
/*while (first.next == null)
Thread.MemoryBarrier();*/
// race
// someone already updated the tail, restore the pointer to the queue head
m_first = first;
}
// the tail is updated
}
// we need to update the head
//Interlocked.CompareExchange(ref m_first, next, first);
// if the head is already updated then give up
//return;
}
public void Clear() {
// start the new queue
var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
do {
Thread.MemoryBarrier();
var first = m_first;
var last = m_last;
if (last == null) // nothing to clear
return;
if (first == null || (first.next == null && first != last)) // inconcistency
continue;
// here we will create inconsistency which will force others to spin
// and prevent from fetching. chunk.next = null
if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
continue;// inconsistent
m_last = chunk;
return;
} while(true);
}
public T[] Drain() {
// start the new queue
var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
do {
Thread.MemoryBarrier();
var first = m_first;
var last = m_last;
if (last == null)
return new T[0];
if (first == null || (first.next == null && first != last))
continue;
// here we will create inconsistency which will force others to spin
// and prevent from fetching. chunk.next = null
if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
continue;// inconsistent
last = Interlocked.Exchange(ref m_last, chunk);
return ReadChunks(first, last);
} while(true);
}
static T[] ReadChunks(Chunk chunk, object last) {
var result = new List<T>();
var buffer = new T[DEFAULT_CHUNK_SIZE];
int actual;
bool recycle;
while (chunk != null) {
// ensure all write operations on the chunk are complete
chunk.Commit();
// we need to read the chunk using this way
// since some client still may completing the dequeue
// operation, such clients most likely won't get results
while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
if (chunk == last) {
chunk = null;
} else {
while (chunk.next == null)
Thread.MemoryBarrier();
chunk = chunk.next;
}
}
return result.ToArray();
}
struct ArraySegmentCollection : ICollection<T> {
readonly T[] m_data;
readonly int m_offset;
readonly int m_length;
public ArraySegmentCollection(T[] data, int offset, int length) {
m_data = data;
m_offset = offset;
m_length = length;
}
#region ICollection implementation
public void Add(T item) {
throw new NotSupportedException();
}
public void Clear() {
throw new NotSupportedException();
}
public bool Contains(T item) {
return false;
}
public void CopyTo(T[] array, int arrayIndex) {
Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
}
public bool Remove(T item) {
throw new NotSupportedException();
}
public int Count {
get {
return m_length;
}
}
public bool IsReadOnly {
get {
return true;
}
}
#endregion
#region IEnumerable implementation
public IEnumerator<T> GetEnumerator() {
for (int i = m_offset; i < m_length + m_offset; i++)
yield return m_data[i];
}
#endregion
#region IEnumerable implementation
IEnumerator IEnumerable.GetEnumerator() {
return GetEnumerator();
}
#endregion
}
#region IEnumerable implementation
class Enumerator : IEnumerator<T> {
Chunk m_current;
int m_pos = -1;
public Enumerator(Chunk fisrt) {
m_current = fisrt;
}
#region IEnumerator implementation
public bool MoveNext() {
if (m_current == null)
return false;
if (m_pos == -1)
m_pos = m_current.Low;
else
m_pos++;
if (m_pos == m_current.Hi) {
m_current = m_pos == m_current.Size ? m_current.next : null;
m_pos = 0;
if (m_current == null)
return false;
}
return true;
}
public void Reset() {
throw new NotSupportedException();
}
object IEnumerator.Current {
get {
return Current;
}
}
#endregion
#region IDisposable implementation
public void Dispose() {
}
#endregion
#region IEnumerator implementation
public T Current {
get {
if (m_pos == -1 || m_current == null)
throw new InvalidOperationException();
return m_current.GetAt(m_pos);
}
}
#endregion
}
public IEnumerator<T> GetEnumerator() {
return new Enumerator(m_first);
}
#endregion
#region IEnumerable implementation
IEnumerator IEnumerable.GetEnumerator() {
return GetEnumerator();
}
#endregion
}
}