##// END OF EJS Templates
Fixed promise chaining behavior: the error handler doesn't handle exceptions thrown by result or cancellation handlers; these exceptions are propagated to the next handlers.
Fixed promise chaining behavior: the error handler doesn't handle exceptions thrown by result or cancellation handlers; these exceptions are propagated to the next handlers.

File last commit:

r137:238e15580926 v2
r196:40d7fed4a09e default
Show More
AsyncQueue.cs
631 lines | 20.6 KiB | text/x-csharp | CSharpLexer
cin
Promises rewritten, added improved version of AsyncQueue
r119 using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;
cin
major update, added Drain mathod to AsyncQueue class
r124 using System.Diagnostics;
cin
Promises rewritten, added improved version of AsyncQueue
r119
namespace Implab.Parallels {
public class AsyncQueue<T> : IEnumerable<T> {
class Chunk {
// The chunk linked after this one; written by AsyncQueue.EnqueueChunk after the tail pointer has been swapped.
public Chunk next;
// Index of the next slot to read; advanced with a compare-and-swap in TryDequeue / TryDequeueBatch.
int m_low;
// Upper bound (exclusive) of the slots whose writes have been published; readers never go past it.
int m_hi;
// Reservation counter: writers atomically bump it to claim slots before they store data.
int m_alloc;
// Capacity of the chunk (the length passed to the constructor and used to allocate m_data).
readonly int m_size;
// Storage for the items of this chunk.
readonly T[] m_data;
/// <summary>
/// Creates an empty chunk with room for <paramref name="size"/> items.
/// </summary>
/// <param name="size">The capacity of the chunk.</param>
public Chunk(int size) {
    m_data = new T[size];
    m_size = size;
}
/// <summary>
/// Creates a chunk with room for <paramref name="size"/> items that already
/// contains <paramref name="value"/> in its first slot.
/// </summary>
/// <param name="size">The capacity of the chunk.</param>
/// <param name="value">The item stored at index 0.</param>
public Chunk(int size, T value) : this(size) {
    // the first slot is both reserved and published from the start
    m_data[0] = value;
    m_alloc = 1;
    m_hi = 1;
}
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Creates a chunk with room for <paramref name="size"/> items and fills it with
/// <paramref name="length"/> items copied from <paramref name="data"/>.
/// </summary>
/// <param name="size">The capacity of the chunk.</param>
/// <param name="data">The source array.</param>
/// <param name="offset">The index in <paramref name="data"/> of the first item to copy.</param>
/// <param name="length">The number of items to copy; becomes the published upper bound.</param>
/// <param name="alloc">The initial value of the reservation counter.</param>
public Chunk(int size, T[] data, int offset, int length, int alloc) : this(size) {
    Array.Copy(data, offset, m_data, 0, length);
    m_alloc = alloc;
    m_hi = length;
}
cin
Promises rewritten, added improved version of AsyncQueue
/// <summary>
/// Gets the index of the next slot that will be read from this chunk.
/// </summary>
public int Low {
    get {
        return m_low;
    }
}
/// <summary>
/// Gets the exclusive upper bound of the slots that have been published to readers.
/// </summary>
public int Hi {
    get {
        return m_hi;
    }
}
cin
fixed AsyncQueue iterator
/// <summary>
/// Gets the capacity of this chunk.
/// </summary>
public int Size {
    get {
        return m_size;
    }
}
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Tries to append a single item to this chunk.
/// </summary>
/// <param name="value">The item to store.</param>
/// <param name="extend">
/// Set to <c>true</c> only for the caller whose reservation landed exactly one slot past
/// the end of the chunk; <c>false</c> in every other case.
/// </param>
/// <returns><c>true</c> if the item was stored and published, <c>false</c> if the chunk is full.</returns>
public bool TryEnqueue(T value, out bool extend) {
    // reserve a slot; Increment returns the new counter value, so step back by one
    int slot = Interlocked.Increment(ref m_alloc) - 1;

    if (slot >= m_size) {
        extend = (slot == m_size);
        return false;
    }

    extend = false;
    m_data[slot] = value;

    // publish the slot: m_hi may only move from 'slot' to 'slot + 1', so this spins
    // until every earlier reservation has been published
    while (Interlocked.CompareExchange(ref m_hi, slot + 1, slot) != slot) {
        // spin wait for commit
    }

    return true;
}
cin
major update, added Drain mathod to AsyncQueue class
/// <summary>
/// Closes the chunk for further reservations and waits until every write
/// that was already reserved has been published.
/// </summary>
public void Commit() {
    // push the reservation counter past the end so that later writers are rejected
    int reserved = Interlocked.Exchange(ref m_alloc, m_size + 1);
    // reservations beyond the capacity never produce a write
    int expected = Math.Min(reserved, m_size);

    while (m_hi != expected) {
        Thread.MemoryBarrier();
    }
}
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Tries to take a single item from this chunk.
/// </summary>
/// <param name="value">The dequeued item, or <c>default(T)</c> when nothing was taken.</param>
/// <param name="recycle">
/// <c>true</c> when the read position has reached (or, with this read, reaches) the end of the chunk.
/// </param>
/// <returns><c>true</c> if an item was taken, <c>false</c> if no published item is available.</returns>
public bool TryDequeue(out T value, out bool recycle) {
    for (;;) {
        int head = m_low;

        if (head >= m_hi) {
            // nothing published beyond the read position
            value = default(T);
            recycle = (head == m_size);
            return false;
        }

        // claim the slot; retry when another reader got there first
        if (Interlocked.CompareExchange(ref m_low, head + 1, head) != head)
            continue;

        recycle = (head == m_size - 1);
        value = m_data[head];
        return true;
    }
}
cin
improved asyncpool usability...
/// <summary>
/// Tries to append up to <paramref name="length"/> items from <paramref name="batch"/> to this chunk.
/// </summary>
/// <param name="batch">The source array.</param>
/// <param name="offset">The index in <paramref name="batch"/> of the first item to store.</param>
/// <param name="length">The number of items the caller wants to store.</param>
/// <param name="enqueued">The number of items actually stored.</param>
/// <param name="extend">
/// <c>true</c> when fewer than <paramref name="length"/> items fitted and the reservation
/// did not start beyond the end of the chunk.
/// </param>
/// <returns><c>true</c> if at least one item was stored, <c>false</c> otherwise.</returns>
public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
    // reserve 'length' slots; Add returns the new counter value, so step back to the start
    int start = Interlocked.Add(ref m_alloc, length) - length;

    if (start > m_size) {
        // the reservation starts past the end of the chunk: nothing is stored
        // and this caller is not asked to extend the queue
        enqueued = 0;
        extend = false;
        return false;
    }

    int room = m_size - start;
    enqueued = length < room ? length : room;
    extend = enqueued < length;

    if (enqueued == 0)
        return false;

    Array.Copy(batch, offset, m_data, start, enqueued);

    // publish the range: m_hi may only move from 'start' to 'end', so this spins
    // until every earlier reservation has been published
    int end = start + enqueued;
    while (Interlocked.CompareExchange(ref m_hi, end, start) != start) {
        // spin wait for commit
    }

    return true;
}
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Tries to take up to <paramref name="length"/> items from this chunk.
/// </summary>
/// <param name="buffer">The destination array.</param>
/// <param name="offset">The index in <paramref name="buffer"/> at which the items are written.</param>
/// <param name="length">The maximum number of items to take.</param>
/// <param name="dequeued">The number of items actually taken.</param>
/// <param name="recycle">
/// <c>true</c> when the read position has reached (or, with this read, reaches) the end of the chunk.
/// </param>
/// <returns><c>true</c> if items were taken, <c>false</c> if no published item is available.</returns>
public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
    for (;;) {
        int head = m_low;
        int published = m_hi;

        if (head >= published) {
            dequeued = 0;
            // the caller may have to be told again that this chunk is exhausted
            recycle = (head == m_size);
            return false;
        }

        int count = Math.Min(published - head, length);

        // claim the range; retry when another reader moved the read position
        if (Interlocked.CompareExchange(ref m_low, head + count, head) != head)
            continue;

        recycle = (head == m_size - count);
        dequeued = count;
        Array.Copy(m_data, head, buffer, offset, count);
        return true;
    }
}
cin
Promises rewritten, added improved version of AsyncQueue
/// <summary>
/// Reads the slot at the specified index without changing the state of the chunk.
/// </summary>
/// <param name="pos">The index of the slot.</param>
/// <returns>The content of the slot.</returns>
public T GetAt(int pos) {
    T item = m_data[pos];
    return item;
}
}
// Capacity of the chunks created by Enqueue, Clear and Drain, the lower bound for chunks
// created by EnqueueRange, and the size of the scratch buffer used by ReadChunks.
public const int DEFAULT_CHUNK_SIZE = 32;
cin
working version of AsyncQueue and batch operations...
r121 public const int MAX_CHUNK_SIZE = 262144;
cin
Promises rewritten, added improved version of AsyncQueue
r119
// Head of the chunk list: the chunk the TryDequeue* methods read from; may be null.
Chunk m_first;
// Tail of the chunk list: the chunk the Enqueue* methods write to; swapped with a CAS in EnqueueChunk.
Chunk m_last;
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Adds the specified value to the queue.
/// </summary>
/// <param name="value">The value which will be added to the queue.</param>
public virtual void Enqueue(T value) {
    var tail = m_last;

    for (;;) {
        bool extend = true;

        // fast path: the current tail chunk still has room
        if (tail != null && tail.TryEnqueue(value, out extend))
            return;

        if (tail == null || extend) {
            // this thread is the one that should append a new chunk carrying the value
            if (EnqueueChunk(tail, new Chunk(DEFAULT_CHUNK_SIZE, value)))
                return;
        } else {
            // another thread is appending a new chunk: wait until the tail changes
            while (tail == m_last) {
                Thread.MemoryBarrier();
            }
        }

        tail = m_last;
    }
}
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Adds the specified data to the queue.
/// </summary>
/// <param name="data">The buffer which contains the data to be enqueued.</param>
/// <param name="offset">The offset of the data in the buffer.</param>
/// <param name="length">The size of the data to read from the buffer.</param>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is negative or
/// exceeds the number of items available in <paramref name="data"/> after <paramref name="offset"/>.
/// </exception>
public virtual void EnqueueRange(T[] data, int offset, int length) {
    if (data == null)
        throw new ArgumentNullException("data");
    if (length == 0)
        return;
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space rather than computing offset + length,
    // which can overflow and let an invalid range slip through
    if (length < 1 || length > data.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var last = m_last;

    bool extend;
    int enqueued;

    while (length > 0) {
        extend = true;
        if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
            length -= enqueued;
            offset += enqueued;
        }

        if (extend) {
            // there was not enough space in the chunk
            // or there were no chunks in the queue

            while (length > 0) {
                var size = Math.Min(length, MAX_CHUNK_SIZE);

                var chunk = new Chunk(
                    Math.Max(size, DEFAULT_CHUNK_SIZE),
                    data,
                    offset,
                    size,
                    length // length >= size
                );

                if (!EnqueueChunk(last, chunk)) {
                    // the queue has been updated by someone else: start over with the new tail
                    last = m_last;
                    break;
                }

                // the new chunk has been appended successfully
                last = chunk;
                length -= size;
                offset += size;
            }
        } else {
            // no need to extend the queue when all the data has been enqueued
            if (length == 0)
                break;

            // someone else is extending the queue: spin until the tail changes
            while (last == m_last) {
                Thread.MemoryBarrier();
            }

            last = m_last;
        }
    }
}
/// <summary>
/// Tries to retrieve the first element from the queue.
/// </summary>
/// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
/// <param name="value">The value of the dequeued element.</param>
public bool TryDequeue(out T value) {
    for (var head = m_first; head != null; head = m_first) {
        bool exhausted;
        bool taken = head.TryDequeue(out value, out exhausted);

        // the head chunk is still usable, so its answer is final
        if (!exhausted)
            return taken;

        // the head chunk is used up: unlink it
        RecycleFirstChunk(head);

        // a successful read stays valid even though the chunk is now waste
        if (taken)
            return true;

        // otherwise retry with the new head
    }

    // the queue is empty
    value = default(T);
    return false;
}
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Tries to dequeue the specified amount of data from the queue.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of data to be retrieved.</param>
/// <param name="dequeued">The actual amount of the retrieved data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than 1 or
/// exceeds the space available in <paramref name="buffer"/> after <paramref name="offset"/>.
/// </exception>
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space rather than computing offset + length,
    // which can overflow and let an invalid range slip through
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;

    while (chunk != null) {
        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            offset += actual;
            length -= actual;
            dequeued += actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);
        else if (actual == 0)
            break; // the chunk is usable but holds no data (it is the last chunk in the queue)

        if (length == 0)
            return true;

        // something may still be available: try again with the current head
        chunk = m_first;
    }

    return dequeued != 0;
}
/// <summary>
/// Tries to dequeue all remaining data in the first chunk.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of the data to be dequeued.</param>
/// <param name="dequeued">The actual amount of the dequeued data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than 1 or
/// exceeds the space available in <paramref name="buffer"/> after <paramref name="offset"/>.
/// </exception>
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space rather than computing offset + length,
    // which can overflow and let an invalid range slip through
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;

    while (chunk != null) {
        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            dequeued = actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);

        // if any data has been dequeued, then return
        if (dequeued != 0)
            return true;

        // the head chunk is usable but holds no data, i.e. the queue is empty;
        // without this exit the loop would re-read the same chunk forever
        // (for instance right after Clear() or Drain())
        if (!recycle)
            return false;

        // the exhausted head has been recycled: try again with the new head
        chunk = m_first;
    }

    return false;
}
cin
Promises rewritten, added improved version of AsyncQueue
/// <summary>
/// Tries to make <paramref name="chunk"/> the new tail of the queue.
/// </summary>
/// <param name="last">The chunk the caller believes to be the current tail (may be <c>null</c>).</param>
/// <param name="chunk">The chunk to append.</param>
/// <returns>
/// <c>true</c> if the tail was swapped and the chunk was linked, <c>false</c> if the tail
/// was no longer <paramref name="last"/>.
/// </returns>
bool EnqueueChunk(Chunk last, Chunk chunk) {
    var observed = Interlocked.CompareExchange(ref m_last, chunk, last);
    if (observed != last)
        return false;

    if (last == null) {
        // there was no tail: the new chunk also becomes the head
        m_first = chunk;
    } else {
        last.next = chunk;
    }

    return true;
}
/// <summary>
/// Unlinks the specified chunk from the head of the queue.
/// </summary>
/// <param name="first">The chunk expected to be the current head.</param>
void RecycleFirstChunk(Chunk first) {
    var successor = first.next;

    // move the head forward; give up when the head is no longer 'first'
    if (Interlocked.CompareExchange(ref m_first, successor, first) != first)
        return;

    // a successor exists, so the tail does not need to be touched
    if (successor != null)
        return;

    // 'first' had no successor: try to reset the tail as well
    if (Interlocked.CompareExchange(ref m_last, null, first) != first) {
        // race: the tail has already been replaced, so restore the head pointer
        m_first = first;
    }
}
cin
async queue improvements
/// <summary>
/// Discards the current content of the queue by replacing its chunk list with a new empty chunk.
/// </summary>
public void Clear() {
    // the chunk that starts the new queue
    var fresh = new Chunk(DEFAULT_CHUNK_SIZE);

    while (true) {
        Thread.MemoryBarrier();
        var head = m_first;
        var tail = m_last;

        // nothing to clear
        if (tail == null)
            return;

        // the head must exist and either have a successor or be the tail itself;
        // anything else is an inconsistent snapshot, so read again
        bool consistent = head != null && (head.next != null || head == tail);
        if (!consistent)
            continue;

        // swap the head first; until the tail is replaced below, the head and the tail
        // do not belong to the same list
        if (Interlocked.CompareExchange(ref m_first, fresh, head) != head)
            continue; // the head has changed, read again

        m_last = fresh;
        return;
    }
}
/// <summary>
/// Detaches the current chunk list from the queue, replaces it with a new empty chunk
/// and returns the items read from the detached chunks.
/// </summary>
/// <returns>The items read from the detached chunks; an empty array when the queue has no chunks.</returns>
public T[] Drain() {
    // the chunk that starts the new queue
    var fresh = new Chunk(DEFAULT_CHUNK_SIZE);

    while (true) {
        Thread.MemoryBarrier();
        var head = m_first;
        var tail = m_last;

        if (tail == null)
            return new T[0];

        // the head must exist and either have a successor or be the tail itself;
        // anything else is an inconsistent snapshot, so read again
        bool consistent = head != null && (head.next != null || head == tail);
        if (!consistent)
            continue;

        // swap the head first; until the tail is replaced below, the head and the tail
        // do not belong to the same list
        if (Interlocked.CompareExchange(ref m_first, fresh, head) != head)
            continue; // the head has changed, read again

        // take the tail that was actually current at the moment of the swap
        tail = Interlocked.Exchange(ref m_last, fresh);
        return ReadChunks(head, tail);
    }
}
cin
Added Signal class a lightweight alternative to ManualResetEvent
/// <summary>
/// Reads every item from a list of chunks, starting at <paramref name="chunk"/> and
/// stopping after <paramref name="last"/>.
/// </summary>
/// <param name="chunk">The first chunk to read.</param>
/// <param name="last">The chunk after which reading stops (compared by reference).</param>
/// <returns>The items that were read, in the order of reading.</returns>
static T[] ReadChunks(Chunk chunk, object last) {
    var items = new List<T>();
    var scratch = new T[DEFAULT_CHUNK_SIZE];

    var current = chunk;
    while (current != null) {
        // ensure all write operations on the chunk are complete
        current.Commit();

        // the chunk is read through the regular dequeue operation because other clients
        // may still be in the middle of dequeuing from it
        int count;
        bool exhausted;
        while (current.TryDequeueBatch(scratch, 0, scratch.Length, out count, out exhausted)) {
            items.AddRange(new ArraySegmentCollection(scratch, 0, count));
        }

        if (object.ReferenceEquals(current, last))
            break;

        // the link to the following chunk may not be written yet: wait for it
        while (current.next == null) {
            Thread.MemoryBarrier();
        }
        current = current.next;
    }

    return items.ToArray();
}
/// <summary>
/// A read-only <see cref="ICollection{T}"/> view over a segment of an array. It lets
/// <c>List&lt;T&gt;.AddRange</c> take a part of a buffer through <see cref="CopyTo"/>.
/// </summary>
struct ArraySegmentCollection : ICollection<T> {
    readonly T[] m_data;
    readonly int m_offset;
    readonly int m_length;

    /// <summary>
    /// Creates a view over <paramref name="length"/> items of <paramref name="data"/>
    /// starting at <paramref name="offset"/>.
    /// </summary>
    /// <param name="data">The underlying array.</param>
    /// <param name="offset">The index of the first item of the segment.</param>
    /// <param name="length">The number of items in the segment.</param>
    public ArraySegmentCollection(T[] data, int offset, int length) {
        m_data = data;
        m_offset = offset;
        m_length = length;
    }

    #region ICollection implementation

    /// <summary>Not supported: the collection is read-only.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Add(T item) {
        throw new NotSupportedException();
    }

    /// <summary>Not supported: the collection is read-only.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Clear() {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Determines whether the segment contains the specified item.
    /// </summary>
    /// <param name="item">The item to look for.</param>
    /// <returns><c>true</c> if the item is found inside the segment, <c>false</c> otherwise.</returns>
    public bool Contains(T item) {
        // search only the [m_offset, m_offset + m_length) window of the array;
        // a default-initialized instance has no array and therefore contains nothing
        return m_data != null && Array.IndexOf(m_data, item, m_offset, m_length) >= 0;
    }

    /// <summary>
    /// Copies the segment to <paramref name="array"/> starting at <paramref name="arrayIndex"/>.
    /// </summary>
    /// <param name="array">The destination array.</param>
    /// <param name="arrayIndex">The index in the destination at which copying begins.</param>
    public void CopyTo(T[] array, int arrayIndex) {
        Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
    }

    /// <summary>Not supported: the collection is read-only.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public bool Remove(T item) {
        throw new NotSupportedException();
    }

    /// <summary>Gets the number of items in the segment.</summary>
    public int Count {
        get {
            return m_length;
        }
    }

    /// <summary>Always <c>true</c>.</summary>
    public bool IsReadOnly {
        get {
            return true;
        }
    }

    #endregion

    #region IEnumerable implementation

    /// <summary>Enumerates the items of the segment in index order.</summary>
    public IEnumerator<T> GetEnumerator() {
        for (int i = m_offset; i < m_length + m_offset; i++)
            yield return m_data[i];
    }

    IEnumerator IEnumerable.GetEnumerator() {
        return GetEnumerator();
    }

    #endregion
}
cin
Promises rewritten, added improved version of AsyncQueue
r119 #region IEnumerable implementation
/// <summary>
/// Walks the chunk list starting at a given chunk and exposes the published items
/// without removing them from the queue.
/// </summary>
class Enumerator : IEnumerator<T> {
    // the chunk currently being read; null once the enumeration is over
    Chunk m_current;
    // position inside m_current; -1 until the first call to MoveNext
    int m_pos = -1;

    /// <summary>
    /// Creates an enumerator which starts at the specified chunk.
    /// </summary>
    /// <param name="fisrt">The chunk to start from (may be <c>null</c>).</param>
    public Enumerator(Chunk fisrt) {
        m_current = fisrt;
    }

    #region IEnumerator implementation

    /// <summary>
    /// Advances to the next published item.
    /// </summary>
    /// <returns><c>true</c> if an item is available, <c>false</c> when the enumeration is over.</returns>
    public bool MoveNext() {
        if (m_current == null)
            return false;

        // the very first step starts at the chunk's read position
        m_pos = (m_pos == -1) ? m_current.Low : m_pos + 1;

        if (m_pos != m_current.Hi)
            return true;

        // the published end of the chunk is reached: continue with the next chunk
        // only when this one is completely filled, otherwise stop
        Chunk following = null;
        if (m_pos == m_current.Size)
            following = m_current.next;

        m_current = following;
        m_pos = 0;

        return m_current != null;
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Reset() {
        throw new NotSupportedException();
    }

    object IEnumerator.Current {
        get {
            return Current;
        }
    }

    /// <summary>
    /// Gets the item at the current position.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The enumerator has not been advanced yet or the enumeration is over.
    /// </exception>
    public T Current {
        get {
            bool positioned = m_pos != -1 && m_current != null;
            if (!positioned)
                throw new InvalidOperationException();
            return m_current.GetAt(m_pos);
        }
    }

    #endregion

    #region IDisposable implementation

    /// <summary>Does nothing.</summary>
    public void Dispose() {
    }

    #endregion
}
/// <summary>
/// Returns an enumerator which starts at the current head chunk of the queue.
/// </summary>
public IEnumerator<T> GetEnumerator() {
    var head = m_first;
    return new Enumerator(head);
}
#endregion
#region IEnumerable implementation
/// <summary>
/// Non-generic counterpart of <see cref="GetEnumerator"/>.
/// </summary>
IEnumerator IEnumerable.GetEnumerator() {
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
#endregion
}
}