##// END OF EJS Templates
minor fixes and improvements of AsyncQueue, additional tests
minor fixes and improvements of AsyncQueue, additional tests

File last commit:

r122:0c8685c8b56b v2
r122:0c8685c8b56b v2
Show More
AsyncQueue.cs
463 lines | 15.4 KiB | text/x-csharp | CSharpLexer
cin
Promises rewritten, added improved version of AsyncQueue
r119 using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;
namespace Implab.Parallels {
public class AsyncQueue<T> : IEnumerable<T> {
class Chunk {
public Chunk next;
int m_low;
int m_hi;
int m_alloc;
readonly int m_size;
readonly T[] m_data;
/// <summary>
/// Creates an empty chunk whose backing array holds <paramref name="size"/> items.
/// </summary>
/// <param name="size">The capacity of the chunk.</param>
public Chunk(int size) {
    m_data = new T[size];
    m_size = size;
}
/// <summary>
/// Creates a chunk of the given capacity that already contains one item.
/// </summary>
/// <param name="size">The capacity of the chunk.</param>
/// <param name="value">The item stored in the first slot.</param>
public Chunk(int size, T value)
    : this(size) {
    // slot 0 is taken: one slot is both reserved (m_alloc) and committed (m_hi)
    m_data[0] = value;
    m_alloc = 1;
    m_hi = 1;
}
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Creates a chunk pre-filled with a copy of a segment of <paramref name="data"/>.
/// </summary>
/// <param name="size">The capacity of the chunk.</param>
/// <param name="data">The array to copy the items from.</param>
/// <param name="offset">The index in <paramref name="data"/> of the first item to copy.</param>
/// <param name="length">The number of items to copy; it also becomes the initial commit position.</param>
/// <param name="alloc">The initial value of the slot reservation counter.</param>
public Chunk(int size, T[] data, int offset, int length, int alloc) {
    var items = new T[size];
    Array.Copy(data, offset, items, 0, length);
    m_data = items;
    m_size = size;
    m_alloc = alloc;
    m_hi = length;
}
cin
Promises rewritten, added improved version of AsyncQueue
/// <summary>
/// Gets the read position: the index of the next item to be dequeued from this chunk.
/// </summary>
public int Low {
    get {
        return m_low;
    }
}
/// <summary>
/// Gets the commit position: slots with indexes below this value have been written.
/// </summary>
public int Hi {
    get {
        return m_hi;
    }
}
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Tries to append a single item to this chunk.
/// </summary>
/// <param name="value">The item to store.</param>
/// <param name="extend">
/// Set to <c>true</c> only when the slot reserved by this call is exactly the first
/// index past the end of the chunk; otherwise <c>false</c>.
/// </param>
/// <returns><c>true</c> if the item was stored, <c>false</c> if the chunk has no free slot.</returns>
public bool TryEnqueue(T value, out bool extend) {
    // reserve a slot index; the counter keeps growing even when the chunk is full
    var slot = Interlocked.Increment(ref m_alloc) - 1;

    if (slot < m_size) {
        extend = false;
        m_data[slot] = value;
        // commit in reservation order: wait until all lower slots are committed,
        // then move the commit position past our slot
        while (Interlocked.CompareExchange(ref m_hi, slot + 1, slot) != slot) {
            // spin wait for commit
        }
        return true;
    }

    extend = slot == m_size;
    return false;
}
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Tries to take a single item from this chunk.
/// </summary>
/// <param name="value">The dequeued item, or <c>default(T)</c> when nothing was taken.</param>
/// <param name="recycle">
/// On success, <c>true</c> when the item taken was in the last slot of the chunk.
/// On failure, <c>true</c> when the read position has reached the chunk capacity.
/// </param>
/// <returns><c>true</c> if an item was taken, <c>false</c> otherwise.</returns>
public bool TryDequeue(out T value, out bool recycle) {
    while (true) {
        var index = m_low;
        if (index >= m_hi) {
            // nothing committed beyond the read position
            value = default(T);
            recycle = index == m_size;
            return false;
        }
        // claim the slot; retry if the read position has moved in the meantime
        if (Interlocked.CompareExchange(ref m_low, index + 1, index) != index)
            continue;
        recycle = index == m_size - 1;
        value = m_data[index];
        return true;
    }
}
cin
improved asyncpool usability...
/// <summary>
/// Tries to append up to <paramref name="length"/> items from <paramref name="batch"/> to this chunk.
/// </summary>
/// <param name="batch">The array containing the items.</param>
/// <param name="offset">The index in <paramref name="batch"/> of the first item.</param>
/// <param name="length">The number of items the caller wants to append.</param>
/// <param name="enqueued">The number of items actually copied into the chunk.</param>
/// <param name="extend">
/// <c>true</c> when fewer than <paramref name="length"/> items fit and the reservation
/// did not start past the end of the chunk; otherwise <c>false</c>.
/// </param>
/// <returns><c>true</c> if at least one item was stored, <c>false</c> otherwise.</returns>
public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
    // reserve `length` slots at once; `start` is the first reserved index
    var start = Interlocked.Add(ref m_alloc, length) - length;

    if (start > m_size) {
        // the chunk is full and someone is already creating the new one
        enqueued = 0;   // nothing was added
        extend = false; // the caller shouldn't try to extend the queue
        return false;
    }

    // only the part of the reservation that lies inside the chunk can be used
    var room = m_size - start;
    enqueued = room < length ? room : length;
    extend = enqueued < length;

    if (enqueued == 0)
        return false;

    Array.Copy(batch, offset, m_data, start, enqueued);

    // commit in reservation order
    var end = start + enqueued;
    while (Interlocked.CompareExchange(ref m_hi, end, start) != start) {
        // spin wait for commit
    }

    return true;
}
cin
working version of AsyncQueue and batch operations...
/// <summary>
/// Tries to take up to <paramref name="length"/> committed items from this chunk.
/// </summary>
/// <param name="buffer">The array that receives the items.</param>
/// <param name="offset">The index in <paramref name="buffer"/> at which writing starts.</param>
/// <param name="length">The maximum number of items to take.</param>
/// <param name="dequeued">The number of items actually copied.</param>
/// <param name="recycle">
/// On success, <c>true</c> when the items taken reach the end of the chunk.
/// On failure, <c>true</c> when the read position has reached the chunk capacity.
/// </param>
/// <returns><c>true</c> if at least one item was taken, <c>false</c> otherwise.</returns>
public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
    while (true) {
        var start = m_low;
        var end = m_hi;
        if (start >= end) {
            dequeued = 0;
            // recycling could be restarted and we need to signal again
            recycle = start == m_size;
            return false;
        }
        var count = Math.Min(end - start, length);
        // claim the range; retry if the read position has moved in the meantime
        if (Interlocked.CompareExchange(ref m_low, start + count, start) != start)
            continue;
        recycle = start == m_size - count;
        dequeued = count;
        Array.Copy(m_data, start, buffer, offset, count);
        return true;
    }
}
cin
Promises rewritten, added improved version of AsyncQueue
/// <summary>
/// Returns the item stored in the slot with the specified index.
/// </summary>
/// <param name="pos">The index of the slot in the chunk's backing array.</param>
public T GetAt(int pos) {
    var item = m_data[pos];
    return item;
}
}
public const int DEFAULT_CHUNK_SIZE = 32;
cin
working version of AsyncQueue and batch operations...
r121 public const int MAX_CHUNK_SIZE = 262144;
cin
Promises rewritten, added improved version of AsyncQueue
r119
readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
Chunk m_first;
Chunk m_last;
/// <summary>
/// Creates a queue that starts with a single empty chunk of the default size.
/// </summary>
public AsyncQueue() {
    var initial = new Chunk(m_chunkSize);
    m_first = initial;
    m_last = initial;
}
cin
working version of AsyncQueue and batch operations...
r121 /// <summary>
/// Adds the specified value to the queue.
/// </summary>
/// <param name="value">The value which will be added to the queue.</param>
cin
Promises rewritten, added improved version of AsyncQueue
public void Enqueue(T value) {
    var tail = m_last;
    while (true) {
        // with no tail chunk at all, the queue has to be extended
        bool extend = true;
        if (tail != null && tail.TryEnqueue(value, out extend))
            return;

        if (tail == null || extend) {
            // try to extend the queue with a new chunk that already carries the value
            if (EnqueueChunk(tail, new Chunk(m_chunkSize, value)))
                return; // success! exit!
        } else {
            // the chunk is full and another caller is responsible for extending
            // the queue: spin until the tail changes
            while (tail == m_last) {
                Thread.MemoryBarrier();
            }
        }

        tail = m_last;
    }
}
cin
working version of AsyncQueue and batch operations...
r121 /// <summary>
/// Adds the specified data to the queue.
/// </summary>
/// <param name="data">The buffer which contains the data to be enqueued.</param>
/// <param name="offset">The offset of the data in the buffer.</param>
/// <param name="length">The size of the data to read from the buffer.</param>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than 1
/// or exceeds the number of items available after <paramref name="offset"/>.
/// </exception>
public void EnqueueRange(T[] data, int offset, int length) {
    if (data == null)
        throw new ArgumentNullException("data");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining item count: offset + length may overflow
    // and wrap to a negative value that would slip past the check
    if (length < 1 || length > data.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var last = m_last;
    bool extend;
    int enqueued;

    while (length > 0) {
        // with no tail chunk at all, the queue has to be extended
        extend = true;
        if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
            length -= enqueued;
            offset += enqueued;
        }

        if (extend) {
            // there was not enough space in the chunk
            // or there were no chunks in the queue
            while (length > 0) {
                var size = Math.Min(length, MAX_CHUNK_SIZE);
                var chunk = new Chunk(
                    Math.Max(size, m_chunkSize),
                    data,
                    offset,
                    size,
                    length // length >= size
                );
                if (!EnqueueChunk(last, chunk)) {
                    // looks like the queue has been updated, then proceed from the beginning
                    last = m_last;
                    break;
                }
                // we have successfully added the new chunk
                last = chunk;
                length -= size;
                offset += size;
            }
        } else {
            // we don't need to extend the queue if we successfully enqueued all the data
            if (length == 0)
                break;
            // otherwise someone else is extending the queue:
            // spin until the tail changes
            while (last == m_last) {
                Thread.MemoryBarrier();
            }
            last = m_last;
        }
    }
}
/// <summary>
/// Tries to retrieve the first element from the queue.
/// </summary>
/// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
/// <param name="value">The value of the dequeued element.</param>
cin
Promises rewritten, added improved version of AsyncQueue
public bool TryDequeue(out T value) {
    for (var head = m_first; head != null; head = m_first) {
        bool exhausted;
        var taken = head.TryDequeue(out value, out exhausted);

        // this chunk is usable and returned the actual result
        if (!exhausted)
            return taken;

        // this chunk is waste
        RecycleFirstChunk(head);

        // a successful result stays valid even though the chunk is waste
        if (taken)
            return true;

        // try again with the current head
    }

    // the queue is empty
    value = default(T);
    return false;
}
cin
working version of AsyncQueue and batch operations...
r121 /// <summary>
/// Tries to dequeue the specified amount of data from the queue.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of data to be retrieved.</param>
/// <param name="dequeued">The actual amount of the retrieved data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than 1
/// or exceeds the space available after <paramref name="offset"/>.
/// </exception>
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space: offset + length may overflow
    // and wrap to a negative value that would slip past the check
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;

    while (chunk != null) {
        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            offset += actual;
            length -= actual;
            dequeued += actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);
        else if (actual == 0)
            break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)

        if (length == 0)
            return true;

        // we still may dequeue something
        // try again
        chunk = m_first;
    }

    return dequeued != 0;
}
/// <summary>
/// Tries to dequeue all remaining data in the first chunk.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
cin
minor fixes and improvements of AsyncQueue, additional tests
r122 /// <param name="buffer">The buffer to which the data will be written.</param>
cin
working version of AsyncQueue and batch operations...
r121 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of the data to be dequeued.</param>
/// <param name="dequeued">The actual amount of the dequeued data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than 1
/// or exceeds the space available after <paramref name="offset"/>.
/// </exception>
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space: offset + length may overflow
    // and wrap to a negative value that would slip past the check
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    dequeued = 0;
    var chunk = m_first;

    while (chunk != null) {
        bool recycle;
        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle))
            dequeued = actual;

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);

        // if we have dequeued any data, then return
        if (dequeued != 0)
            return true;

        // The first chunk is still usable but has nothing to read, so there is
        // nothing to dequeue right now. Without this exit the loop would re-read
        // the same chunk and spin until another thread enqueues something.
        if (!recycle)
            return false;

        // the exhausted chunk was handed over for recycling,
        // try again with the current head
        chunk = m_first;
    }

    return false;
}
cin
Promises rewritten, added improved version of AsyncQueue
/// <summary>
/// Tries to make <paramref name="chunk"/> the new tail of the queue.
/// </summary>
/// <param name="last">The tail the caller has observed; may be <c>null</c>.</param>
/// <param name="chunk">The chunk to append.</param>
/// <returns>
/// <c>true</c> if the tail was still <paramref name="last"/> and has been replaced,
/// <c>false</c> if the tail had already changed.
/// </returns>
bool EnqueueChunk(Chunk last, Chunk chunk) {
    var observed = Interlocked.CompareExchange(ref m_last, chunk, last);
    if (observed != last)
        return false;

    if (last == null) {
        // there was no tail, so the new chunk also becomes the head
        m_first = chunk;
    } else {
        // link the previous tail to the new chunk
        last.next = chunk;
    }
    return true;
}
/// <summary>
/// Tries to remove <paramref name="first"/> from the head of the queue.
/// </summary>
/// <param name="first">The chunk the caller has observed as the head.</param>
void RecycleFirstChunk(Chunk first) {
    var successor = first.next;

    // With no successor this looks like the last chunk, so the tail has to be
    // cleared first. If the tail is not this chunk any more, there is a race:
    // maybe someone already recycled this chunk, or a new chunk has been
    // appended to the queue. Give up in that case.
    if (successor == null && Interlocked.CompareExchange(ref m_last, null, first) != first) {
        return;
    }

    // update the head; if the head has already been updated, this does nothing
    Interlocked.CompareExchange(ref m_first, successor, first);
}
#region IEnumerable implementation
/// <summary>
/// Walks the chain of chunks starting from the chunk passed to the constructor and
/// yields the items between each chunk's read position and commit position.
/// </summary>
// NOTE(review): the enumerator takes no snapshot and is not synchronized with
// concurrent Enqueue/Dequeue calls — confirm what callers expect under contention.
class Enumerator : IEnumerator<T> {
    Chunk m_current;
    // -1 means that MoveNext has not been called yet
    int m_pos = -1;

    public Enumerator(Chunk first) {
        m_current = first;
    }

    #region IEnumerator implementation
    /// <summary>
    /// Advances to the next item.
    /// </summary>
    /// <returns><c>true</c> if positioned on an item, <c>false</c> when there are no more items.</returns>
    public bool MoveNext() {
        if (m_current == null)
            return false;

        if (m_pos == -1)
            m_pos = m_current.Low;
        else
            m_pos++;

        // Skip every chunk which has nothing left to read. When the chain ends,
        // report the end of the sequence instead of returning true with no
        // current chunk (which made Current throw, even for an empty queue).
        while (m_pos >= m_current.Hi) {
            m_current = m_current.next;
            if (m_current == null)
                return false;
            // start from the read position, the same way as for the first chunk
            m_pos = m_current.Low;
        }

        return true;
    }

    public void Reset() {
        throw new NotSupportedException();
    }

    object IEnumerator.Current {
        get {
            return Current;
        }
    }
    #endregion

    #region IDisposable implementation
    public void Dispose() {
    }
    #endregion

    #region IEnumerator implementation
    /// <summary>
    /// Gets the item at the current position.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The enumerator is positioned before the first item or after the last one.
    /// </exception>
    public T Current {
        get {
            if (m_pos == -1 || m_current == null)
                throw new InvalidOperationException();
            return m_current.GetAt(m_pos);
        }
    }
    #endregion
}
/// <summary>
/// Returns an enumerator which starts from the current first chunk of the queue.
/// </summary>
public IEnumerator<T> GetEnumerator() {
    var head = m_first;
    return new Enumerator(head);
}
#endregion
#region IEnumerable implementation
/// <summary>
/// Non-generic entry point; delegates to the generic <c>GetEnumerator</c>.
/// </summary>
IEnumerator IEnumerable.GetEnumerator() {
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
#endregion
}
}