##// END OF EJS Templates
Improved AsyncQueue...
Improved AsyncQueue Removed ImplabFx

File last commit:

r233:d6fe09f5592c v2
r233:d6fe09f5592c v2
Show More
AsyncQueue.cs
562 lines | 18.6 KiB | text/x-csharp | CSharpLexer
cin
Promises rewritten, added improved version of AsyncQueue
r119 using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;
cin
major update, added Drain mathod to AsyncQueue class
r124 using System.Diagnostics;
cin
Promises rewritten, added improved version of AsyncQueue
r119
namespace Implab.Parallels {
public class AsyncQueue<T> : IEnumerable<T> {
class Chunk {
cin
Improved AsyncQueue...
r233 public volatile Chunk next;
cin
Promises rewritten, added improved version of AsyncQueue
r119
cin
Improved AsyncQueue...
r233 volatile int m_low;
volatile int m_hi;
volatile int m_alloc;
cin
Promises rewritten, added improved version of AsyncQueue
r119 readonly int m_size;
readonly T[] m_data;
public Chunk(int size) {
m_size = size;
m_data = new T[size];
}
public Chunk(int size, T value) {
m_size = size;
m_hi = 1;
m_alloc = 1;
m_data = new T[size];
m_data[0] = value;
}
cin
Improved AsyncQueue...
r233 public Chunk(int size, int allocated) {
cin
working version of AsyncQueue and batch operations...
r121 m_size = size;
cin
Improved AsyncQueue...
r233 m_hi = allocated;
m_alloc = allocated;
cin
working version of AsyncQueue and batch operations...
r121 m_data = new T[size];
cin
Improved AsyncQueue...
r233 }
public void WriteData(T[] data, int offset, int dest, int length) {
Array.Copy(data, offset, m_data, dest, length);
cin
working version of AsyncQueue and batch operations...
r121 }
cin
Promises rewritten, added improved version of AsyncQueue
r119 public int Low {
get { return m_low; }
}
public int Hi {
get { return m_hi; }
}
cin
fixed AsyncQueue iterator
r127 public int Size {
get { return m_size; }
}
cin
Improved AsyncQueue...
r233 public bool TryEnqueue(T value) {
int alloc;
do {
alloc = m_alloc;
if (alloc >= m_size)
return false;
} while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));
cin
Promises rewritten, added improved version of AsyncQueue
r119 m_data[alloc] = value;
cin
Improved AsyncQueue...
r233 SpinWait spin = new SpinWait();
// m_hi is volatile
while (alloc != m_hi) {
cin
Promises rewritten, added improved version of AsyncQueue
r119 // spin wait for commit
cin
Improved AsyncQueue...
r233 spin.SpinOnce();
cin
Promises rewritten, added improved version of AsyncQueue
r119 }
cin
Improved AsyncQueue...
r233 m_hi = alloc + 1;
cin
Promises rewritten, added improved version of AsyncQueue
r119 return true;
}
cin
major update, added Drain mathod to AsyncQueue class
r124 /// <summary>
/// Prevents from allocating new space in the chunk and waits for all write operations to complete
/// </summary>
cin
Improved AsyncQueue...
r233 public void Seal() {
var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
SpinWait spin = new SpinWait();
while (m_hi != actual) {
spin.SpinOnce();
}
cin
major update, added Drain mathod to AsyncQueue class
r124 }
cin
working version of AsyncQueue and batch operations...
r121 public bool TryDequeue(out T value, out bool recycle) {
cin
Promises rewritten, added improved version of AsyncQueue
r119 int low;
do {
low = m_low;
if (low >= m_hi) {
value = default(T);
recycle = (low == m_size);
return false;
}
cin
Improved AsyncQueue...
r233 } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));
cin
Promises rewritten, added improved version of AsyncQueue
r119
cin
Improved AsyncQueue...
r233 recycle = (low + 1 == m_size);
cin
Promises rewritten, added improved version of AsyncQueue
r119 value = m_data[low];
return true;
}
cin
Improved AsyncQueue...
r233 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) {
int alloc;
do {
alloc = m_alloc;
if (alloc >= m_size) {
enqueued = 0;
return false;
} else {
enqueued = Math.Min(length, m_size - alloc);
}
} while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc));
Array.Copy(batch, offset, m_data, alloc, enqueued);
cin
improved asyncpool usability...
r120
cin
Improved AsyncQueue...
r233 SpinWait spin = new SpinWait();
while (alloc != m_hi) {
spin.SpinOnce();
cin
minor fixes and improvements of AsyncQueue, additional tests
r122 }
cin
improved asyncpool usability...
r120
cin
Improved AsyncQueue...
r233 m_hi = alloc + enqueued;
cin
improved asyncpool usability...
r120 return true;
}
cin
Improved AsyncQueue...
r233 public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
cin
working version of AsyncQueue and batch operations...
r121 int low, hi, batchSize;
do {
low = m_low;
hi = m_hi;
if (low >= hi) {
dequeued = 0;
cin
Improved AsyncQueue...
r233 recycle = (low == m_size);
cin
working version of AsyncQueue and batch operations...
r121 return false;
}
batchSize = Math.Min(hi - low, length);
cin
Improved AsyncQueue...
r233 } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
cin
working version of AsyncQueue and batch operations...
r121
dequeued = batchSize;
cin
Improved AsyncQueue...
r233 recycle = (low + batchSize == m_size);
cin
working version of AsyncQueue and batch operations...
r121 Array.Copy(m_data, low, buffer, offset, batchSize);
return true;
}
cin
Promises rewritten, added improved version of AsyncQueue
r119 public T GetAt(int pos) {
return m_data[pos];
}
}
public const int DEFAULT_CHUNK_SIZE = 32;
cin
Improved AsyncQueue...
r233 public const int MAX_CHUNK_SIZE = 256;
cin
Promises rewritten, added improved version of AsyncQueue
r119
Chunk m_first;
Chunk m_last;
cin
Improved AsyncQueue...
r233 public AsyncQueue() {
m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE);
}
cin
working version of AsyncQueue and batch operations...
r121 /// <summary>
/// Adds the specified value to the queue.
/// </summary>
/// <param name="value">Tha value which will be added to the queue.</param>
cin
Improved AsyncQueue...
r233 public void Enqueue(T value) {
cin
Promises rewritten, added improved version of AsyncQueue
r119 var last = m_last;
cin
Improved AsyncQueue...
r233 SpinWait spin = new SpinWait();
while (!last.TryEnqueue(value)) {
cin
Promises rewritten, added improved version of AsyncQueue
r119 // try to extend queue
cin
Improved AsyncQueue...
r233 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
var t = Interlocked.CompareExchange(ref m_last, chunk, last);
if (t == last) {
last.next = chunk;
break;
cin
Promises rewritten, added improved version of AsyncQueue
r119 } else {
cin
Improved AsyncQueue...
r233 last = t;
cin
Promises rewritten, added improved version of AsyncQueue
r119 }
cin
Improved AsyncQueue...
r233 spin.SpinOnce();
cin
Promises rewritten, added improved version of AsyncQueue
r119 }
}
cin
working version of AsyncQueue and batch operations...
r121 /// <summary>
/// Adds the specified data to the queue.
/// </summary>
/// <param name="data">The buffer which contains the data to be enqueued.</param>
/// <param name="offset">The offset of the data in the buffer.</param>
/// <param name="length">The size of the data to read from the buffer.</param>
cin
Improved AsyncQueue...
r233 public void EnqueueRange(T[] data, int offset, int length) {
cin
working version of AsyncQueue and batch operations...
r121 if (data == null)
throw new ArgumentNullException("data");
if (offset < 0)
throw new ArgumentOutOfRangeException("offset");
if (length < 1 || offset + length > data.Length)
throw new ArgumentOutOfRangeException("length");
cin
Improved AsyncQueue...
r233 while (length > 0) {
var last = m_last;
int enqueued;
cin
working version of AsyncQueue and batch operations...
r121
cin
Improved AsyncQueue...
r233 if (last.TryEnqueueBatch(data, offset, length, out enqueued)) {
cin
working version of AsyncQueue and batch operations...
r121 length -= enqueued;
offset += enqueued;
}
cin
Improved AsyncQueue...
r233 if (length > 0) {
// we have something to enqueue
cin
working version of AsyncQueue and batch operations...
r121
cin
Improved AsyncQueue...
r233 var tail = length % MAX_CHUNK_SIZE;
cin
working version of AsyncQueue and batch operations...
r121
cin
Improved AsyncQueue...
r233 var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail);
if (last != Interlocked.CompareExchange(ref m_last, chunk, last))
continue; // we wasn't able to catch the writer, roundtrip
cin
working version of AsyncQueue and batch operations...
r121
cin
Improved AsyncQueue...
r233 // we are lucky
// we can exclusively write our batch, the other writers will continue their work
length -= tail;
cin
working version of AsyncQueue and batch operations...
r121
cin
Improved AsyncQueue...
r233
for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) {
var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
node.WriteData(data, offset, 0, MAX_CHUNK_SIZE);
offset += MAX_CHUNK_SIZE;
// fence last.next is volatile
last.next = node;
last = node;
cin
working version of AsyncQueue and batch operations...
r121 }
cin
Improved AsyncQueue...
r233 if (tail > 0)
chunk.WriteData(data, offset, 0, tail);
// fence last.next is volatile
last.next = chunk;
return;
cin
working version of AsyncQueue and batch operations...
r121 }
}
}
/// <summary>
/// Tries to retrieve the first element from the queue.
/// </summary>
/// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
/// <param name="value">The value of the dequeued element.</param>
cin
Promises rewritten, added improved version of AsyncQueue
r119 public bool TryDequeue(out T value) {
var chunk = m_first;
cin
Improved AsyncQueue...
r233 do {
bool recycle;
cin
Promises rewritten, added improved version of AsyncQueue
r119
var result = chunk.TryDequeue(out value, out recycle);
cin
Improved AsyncQueue...
r233 if (recycle && chunk.next != null) {
// this chunk is waste
chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
} else {
cin
Promises rewritten, added improved version of AsyncQueue
r119 return result; // this chunk is usable and returned actual result
cin
Improved AsyncQueue...
r233 }
cin
Promises rewritten, added improved version of AsyncQueue
r119
if (result) // this chunk is waste but the true result is always actual
return true;
cin
Improved AsyncQueue...
r233 } while (true);
cin
Promises rewritten, added improved version of AsyncQueue
r119 }
cin
working version of AsyncQueue and batch operations...
r121 /// <summary>
/// Tries to dequeue the specified amount of data from the queue.
/// </summary>
/// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of data to be retrieved.</param>
/// <param name="dequeued">The actual amout of the retrieved data.</param>
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
if (buffer == null)
throw new ArgumentNullException("buffer");
if (offset < 0)
throw new ArgumentOutOfRangeException("offset");
if (length < 1 || offset + length > buffer.Length)
throw new ArgumentOutOfRangeException("length");
var chunk = m_first;
dequeued = 0;
cin
Improved AsyncQueue...
r233 do {
bool recycle;
cin
working version of AsyncQueue and batch operations...
r121 int actual;
if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
offset += actual;
length -= actual;
dequeued += actual;
}
cin
Improved AsyncQueue...
r233 if (recycle && chunk.next != null) {
// this chunk is waste
chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
} else {
chunk = null;
}
cin
working version of AsyncQueue and batch operations...
r121
if (length == 0)
return true;
cin
Improved AsyncQueue...
r233 } while (chunk != null);
cin
working version of AsyncQueue and batch operations...
r121
return dequeued != 0;
}
/// <summary>
/// Tries to dequeue all remaining data in the first chunk.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
cin
minor fixes and improvements of AsyncQueue, additional tests
r122 /// <param name="buffer">The buffer to which the data will be written.</param>
cin
working version of AsyncQueue and batch operations...
r121 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">Tha maximum amount of the data to be dequeued.</param>
/// <param name="dequeued">The actual amount of the dequeued data.</param>
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
if (buffer == null)
throw new ArgumentNullException("buffer");
if (offset < 0)
throw new ArgumentOutOfRangeException("offset");
if (length < 1 || offset + length > buffer.Length)
throw new ArgumentOutOfRangeException("length");
var chunk = m_first;
cin
Improved AsyncQueue...
r233 do {
bool recycle;
chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle);
cin
working version of AsyncQueue and batch operations...
r121
cin
Improved AsyncQueue...
r233 if (recycle && chunk.next != null) {
// this chunk is waste
chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
} else {
chunk = null;
cin
working version of AsyncQueue and batch operations...
r121 }
// if we have dequeued any data, then return
if (dequeued != 0)
return true;
cin
Improved AsyncQueue...
r233 } while (chunk != null);
cin
working version of AsyncQueue and batch operations...
r121
return false;
}
cin
Improved AsyncQueue...
r233
cin
Promises rewritten, added improved version of AsyncQueue
r119
cin
async queue improvements
r123 public void Clear() {
// start the new queue
cin
improved performance of promises
r125 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
cin
major update, added Drain mathod to AsyncQueue class
r124 do {
var first = m_first;
cin
Improved AsyncQueue...
r233 if (first.next == null && first != m_last) {
cin
major update, added Drain mathod to AsyncQueue class
r124 continue;
cin
Improved AsyncQueue...
r233 }
cin
async queue improvements
r123
cin
major update, added Drain mathod to AsyncQueue class
r124 // here we will create inconsistency which will force others to spin
// and prevent from fetching. chunk.next = null
if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
continue;// inconsistent
cin
async queue improvements
r123
cin
Improved AsyncQueue...
r233 m_last = chunk;
return;
} while (true);
}
public List<T> Drain() {
// start the new queue
var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
do {
var first = m_first;
// first.next is volatile
if (first.next == null) {
if (first != m_last)
continue;
else if (first.Hi == first.Low)
return new List<T>();
}
// here we will create inconsistency which will force others to spin
// and prevent from fetching. chunk.next = null
if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
continue;// inconsistent
var last = Interlocked.Exchange(ref m_last, chunk);
cin
major update, added Drain mathod to AsyncQueue class
r124
return ReadChunks(first, last);
cin
Improved AsyncQueue...
r233 } while (true);
cin
async queue improvements
r123 }
cin
Improved AsyncQueue...
r233
static List<T> ReadChunks(Chunk chunk, object last) {
cin
async queue improvements
r123 var result = new List<T>();
cin
Improved AsyncQueue...
r233 var buffer = new T[MAX_CHUNK_SIZE];
cin
async queue improvements
r123 int actual;
bool recycle;
cin
Improved AsyncQueue...
r233 SpinWait spin = new SpinWait();
cin
async queue improvements
r123 while (chunk != null) {
cin
major update, added Drain mathod to AsyncQueue class
r124 // ensure all write operations on the chunk are complete
cin
Improved AsyncQueue...
r233 chunk.Seal();
cin
major update, added Drain mathod to AsyncQueue class
r124
cin
async queue improvements
r123 // we need to read the chunk using this way
// since some client still may completing the dequeue
// operation, such clients most likely won't get results
while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
cin
major update, added Drain mathod to AsyncQueue class
r124 if (chunk == last) {
chunk = null;
} else {
while (chunk.next == null)
cin
Improved AsyncQueue...
r233 spin.SpinOnce();
cin
major update, added Drain mathod to AsyncQueue class
r124 chunk = chunk.next;
}
cin
async queue improvements
r123 }
cin
Improved AsyncQueue...
r233 return result;
cin
async queue improvements
r123 }
struct ArraySegmentCollection : ICollection<T> {
readonly T[] m_data;
readonly int m_offset;
readonly int m_length;
public ArraySegmentCollection(T[] data, int offset, int length) {
m_data = data;
m_offset = offset;
m_length = length;
}
#region ICollection implementation
public void Add(T item) {
cin
Added SharedLock to synchronization routines
r129 throw new NotSupportedException();
cin
async queue improvements
r123 }
public void Clear() {
cin
Added SharedLock to synchronization routines
r129 throw new NotSupportedException();
cin
async queue improvements
r123 }
public bool Contains(T item) {
return false;
}
public void CopyTo(T[] array, int arrayIndex) {
cin
Improved AsyncQueue...
r233 Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
cin
async queue improvements
r123 }
public bool Remove(T item) {
cin
Added SharedLock to synchronization routines
r129 throw new NotSupportedException();
cin
async queue improvements
r123 }
public int Count {
get {
return m_length;
}
}
public bool IsReadOnly {
get {
return true;
}
}
#endregion
#region IEnumerable implementation
public IEnumerator<T> GetEnumerator() {
for (int i = m_offset; i < m_length + m_offset; i++)
yield return m_data[i];
}
#endregion
#region IEnumerable implementation
IEnumerator IEnumerable.GetEnumerator() {
return GetEnumerator();
}
#endregion
}
cin
Promises rewritten, added improved version of AsyncQueue
r119 #region IEnumerable implementation
class Enumerator : IEnumerator<T> {
Chunk m_current;
int m_pos = -1;
public Enumerator(Chunk fisrt) {
m_current = fisrt;
}
#region IEnumerator implementation
public bool MoveNext() {
if (m_current == null)
return false;
if (m_pos == -1)
m_pos = m_current.Low;
else
m_pos++;
cin
fixed AsyncQueue iterator
r127
cin
Promises rewritten, added improved version of AsyncQueue
r119 if (m_pos == m_current.Hi) {
cin
fixed AsyncQueue iterator
r127
m_current = m_pos == m_current.Size ? m_current.next : null;
cin
Promises rewritten, added improved version of AsyncQueue
r119 m_pos = 0;
cin
fixed AsyncQueue iterator
r127
if (m_current == null)
return false;
cin
Promises rewritten, added improved version of AsyncQueue
r119 }
return true;
}
public void Reset() {
throw new NotSupportedException();
}
object IEnumerator.Current {
get {
return Current;
}
}
#endregion
#region IDisposable implementation
public void Dispose() {
}
#endregion
#region IEnumerator implementation
public T Current {
get {
if (m_pos == -1 || m_current == null)
throw new InvalidOperationException();
return m_current.GetAt(m_pos);
}
}
#endregion
}
public IEnumerator<T> GetEnumerator() {
return new Enumerator(m_first);
}
#endregion
#region IEnumerable implementation
IEnumerator IEnumerable.GetEnumerator() {
return GetEnumerator();
}
#endregion
}
}