AsyncQueue.cs
623 lines
| 20.3 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r119 | using System.Threading; | ||
using System.Collections.Generic; | ||||
using System; | ||||
using System.Collections; | ||||
cin
|
r124 | using System.Diagnostics; | ||
cin
|
r119 | |||
namespace Implab.Parallels { | ||||
public class AsyncQueue<T> : IEnumerable<T> { | ||||
/// <summary>
/// A fixed-capacity segment of the queue. Writers reserve slots through
/// <c>m_alloc</c>, publish them by advancing <c>m_hi</c>, and readers consume
/// them by advancing <c>m_low</c>; all three counters are moved with
/// <see cref="Interlocked"/> operations.
/// </summary>
class Chunk {
    /// <summary>The chunk that follows this one in the queue, if any.</summary>
    public Chunk next;

    // index of the next slot to be read
    int m_low;
    // upper bound (exclusive) of the slots that are written and readable
    int m_hi;
    // number of slots reserved by writers (may exceed m_size once the chunk is full)
    int m_alloc;
    readonly int m_size;
    readonly T[] m_data;

    /// <summary>Creates an empty chunk with the specified capacity.</summary>
    /// <param name="size">The capacity of the chunk.</param>
    public Chunk(int size) {
        m_size = size;
        m_data = new T[size];
    }

    /// <summary>Creates a chunk which already holds a single value.</summary>
    /// <param name="size">The capacity of the chunk.</param>
    /// <param name="value">The value stored in the first slot.</param>
    public Chunk(int size, T value)
        : this(size) {
        m_data[0] = value;
        m_hi = 1;
        m_alloc = 1;
    }

    /// <summary>Creates a chunk pre-filled with a copy of a buffer segment.</summary>
    /// <param name="size">The capacity of the chunk.</param>
    /// <param name="data">The source buffer.</param>
    /// <param name="offset">The index in <paramref name="data"/> to start copying from.</param>
    /// <param name="length">The number of items to copy; becomes the readable bound.</param>
    /// <param name="alloc">The initial value of the reservation counter.</param>
    public Chunk(int size, T[] data, int offset, int length, int alloc)
        : this(size) {
        Array.Copy(data, offset, m_data, 0, length);
        m_hi = length;
        m_alloc = alloc;
    }

    /// <summary>Gets the index of the next slot to be read.</summary>
    public int Low {
        get {
            return m_low;
        }
    }

    /// <summary>Gets the upper bound (exclusive) of the readable slots.</summary>
    public int Hi {
        get {
            return m_hi;
        }
    }

    /// <summary>Gets the capacity of the chunk.</summary>
    public int Size {
        get {
            return m_size;
        }
    }

    /// <summary>Tries to append a single value to the chunk.</summary>
    /// <param name="value">The value to store.</param>
    /// <param name="extend">
    /// Set to <c>true</c> only when the reserved index equals the chunk capacity,
    /// i.e. for the first writer that finds the chunk full.
    /// </param>
    /// <returns><c>true</c> if the value was stored, <c>false</c> if the chunk is full.</returns>
    public bool TryEnqueue(T value, out bool extend) {
        // reserve a slot: the index is the counter value before the increment
        int slot = Interlocked.Increment(ref m_alloc) - 1;

        if (slot < m_size) {
            extend = false;
            m_data[slot] = value;
            // publish the slot; succeeds only once all lower slots are published
            while (Interlocked.CompareExchange(ref m_hi, slot + 1, slot) != slot) {
                // spin wait for commit
            }
            return true;
        }

        extend = (slot == m_size);
        return false;
    }

    /// <summary>
    /// Prevents from allocating new space in the chunk and waits for all write operations to complete
    /// </summary>
    public void Commit() {
        // push the reservation counter past the capacity so new writers are rejected
        int reserved = Interlocked.Exchange(ref m_alloc, m_size + 1);
        int expected = Math.Min(reserved, m_size);
        while (m_hi != expected)
            Thread.MemoryBarrier();
    }

    /// <summary>Tries to take a single value from the chunk.</summary>
    /// <param name="value">The dequeued value, or the default value when nothing was taken.</param>
    /// <param name="recycle">
    /// Set to <c>true</c> when the read position has reached the end of the chunk
    /// (either by this call or before it).
    /// </param>
    /// <returns><c>true</c> if a value was taken, <c>false</c> otherwise.</returns>
    public bool TryDequeue(out T value, out bool recycle) {
        while (true) {
            int pos = m_low;
            if (pos >= m_hi) {
                value = default(T);
                recycle = (pos == m_size);
                return false;
            }
            // claim the slot; retry when another reader got it first
            if (Interlocked.CompareExchange(ref m_low, pos + 1, pos) == pos) {
                recycle = (pos == m_size - 1);
                value = m_data[pos];
                return true;
            }
        }
    }

    /// <summary>Tries to append a range of values to the chunk.</summary>
    /// <param name="batch">The source buffer.</param>
    /// <param name="offset">The index of the first item in <paramref name="batch"/>.</param>
    /// <param name="length">The number of items to append.</param>
    /// <param name="enqueued">The number of items actually stored.</param>
    /// <param name="extend">
    /// Set to <c>true</c> when not all items fitted and the caller should extend the queue.
    /// </param>
    /// <returns><c>true</c> if at least one item was stored, <c>false</c> otherwise.</returns>
    public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
        // reserve the whole range at once; start is the counter value before the addition
        int start = Interlocked.Add(ref m_alloc, length) - length;

        if (start > m_size) {
            // the chunk is full and someone is already creating the new one:
            // nothing was added and the caller shouldn't try to extend the queue
            enqueued = 0;
            extend = false;
            return false;
        }

        int room = m_size - start;
        enqueued = room < length ? room : length;
        extend = enqueued < length;

        if (enqueued == 0)
            return false;

        Array.Copy(batch, offset, m_data, start, enqueued);
        while (Interlocked.CompareExchange(ref m_hi, start + enqueued, start) != start) {
            // spin wait for commit
        }

        return true;
    }

    /// <summary>Tries to take up to <paramref name="length"/> values from the chunk.</summary>
    /// <param name="buffer">The destination buffer.</param>
    /// <param name="offset">The index in <paramref name="buffer"/> to start writing at.</param>
    /// <param name="length">The maximum number of items to take.</param>
    /// <param name="dequeued">The number of items actually taken.</param>
    /// <param name="recycle">
    /// Set to <c>true</c> when the read position has reached the end of the chunk
    /// (either by this call or before it).
    /// </param>
    /// <returns><c>true</c> if any items were taken, <c>false</c> otherwise.</returns>
    public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
        while (true) {
            int begin = m_low;
            int end = m_hi;
            if (begin >= end) {
                dequeued = 0;
                // recycling could be restarted and we need to signal again
                recycle = (begin == m_size);
                return false;
            }

            int count = Math.Min(end - begin, length);
            if (Interlocked.CompareExchange(ref m_low, begin + count, begin) != begin)
                continue; // another reader moved the position, retry

            recycle = (begin == m_size - count);
            dequeued = count;
            Array.Copy(m_data, begin, buffer, offset, count);
            return true;
        }
    }

    /// <summary>Returns the item stored at the specified slot without any checks.</summary>
    /// <param name="pos">The slot index.</param>
    public T GetAt(int pos) {
        return m_data[pos];
    }
}
public const int DEFAULT_CHUNK_SIZE = 32; | ||||
cin
|
r121 | public const int MAX_CHUNK_SIZE = 262144; | ||
cin
|
r119 | |||
Chunk m_first; | ||||
Chunk m_last; | ||||
cin
|
/// <summary>
/// Adds the specified value to the queue.
/// </summary>
/// <param name="value">The value which will be added to the queue.</param>
public virtual void Enqueue(T value) {
    var tail = m_last;
    var extend = true;

    for (;;) {
        // fast path: the current tail chunk accepts the value
        if (tail != null && tail.TryEnqueue(value, out extend))
            return;

        if (tail == null || extend) {
            // the queue is empty or this writer must append a new chunk
            if (EnqueueChunk(tail, new Chunk(DEFAULT_CHUNK_SIZE, value)))
                return; // success! exit!
        } else {
            // spin wait until the tail is replaced
            while (tail == m_last)
                Thread.MemoryBarrier();
        }

        tail = m_last;
    }
}
cin
|
/// <summary>
/// Adds the specified data to the queue.
/// </summary>
/// <param name="data">The buffer which contains the data to be enqueued.</param>
/// <param name="offset">The offset of the data in the buffer.</param>
/// <param name="length">The size of the data to read from the buffer.</param>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is negative
/// or exceeds the space available in <paramref name="data"/> after <paramref name="offset"/>.
/// </exception>
public virtual void EnqueueRange(T[] data, int offset, int length) {
    if (data == null)
        throw new ArgumentNullException("data");
    if (length == 0)
        return;
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space rather than computing offset + length:
    // the sum can wrap around to a negative number and slip past the check
    if (length < 1 || length > data.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var last = m_last;
    bool extend;
    int enqueued;

    while (length > 0) {
        extend = true;
        if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
            length -= enqueued;
            offset += enqueued;
        }

        if (extend) {
            // there was not enough space in the chunk
            // or there were no chunks in the queue
            while (length > 0) {
                var size = Math.Min(length, MAX_CHUNK_SIZE);
                var chunk = new Chunk(
                    Math.Max(size, DEFAULT_CHUNK_SIZE),
                    data,
                    offset,
                    size,
                    length // length >= size
                );

                if (!EnqueueChunk(last, chunk)) {
                    // looks like the queue has been updated, proceed from the beginning
                    last = m_last;
                    break;
                }

                // we have successfully added the new chunk
                last = chunk;
                length -= size;
                offset += size;
            }
        } else {
            // we don't need to extend the queue if we successfully enqueued the data
            if (length == 0)
                break;

            // someone else is extending the queue, spin wait for the new tail
            while (last == m_last) {
                Thread.MemoryBarrier();
            }
            last = m_last;
        }
    }
}
/// <summary>
/// Tries to retrieve the first element from the queue.
/// </summary>
/// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
/// <param name="value">The value of the dequeued element.</param>
public bool TryDequeue(out T value) {
    for (var head = m_first; head != null; head = m_first) {
        bool exhausted;
        bool taken = head.TryDequeue(out value, out exhausted);

        // the chunk is still usable, so its answer is the actual result
        if (!exhausted)
            return taken;

        // the chunk is used up: unlink it from the head of the queue
        RecycleFirstChunk(head);

        // a positive result stays valid even though the chunk is recycled
        if (taken)
            return true;

        // otherwise try again with the new head
    }

    // the queue is empty
    value = default(T);
    return false;
}
cin
|
/// <summary>
/// Tries to dequeue the specified amount of data from the queue.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of data to be retrieved.</param>
/// <param name="dequeued">The actual amount of the retrieved data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than one
/// or exceeds the space available in <paramref name="buffer"/> after <paramref name="offset"/>.
/// </exception>
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space rather than computing offset + length:
    // the sum can wrap around to a negative number and slip past the check
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;

    while (chunk != null) {
        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            offset += actual;
            length -= actual;
            dequeued += actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);
        else if (actual == 0)
            break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)

        if (length == 0)
            return true;

        // we still may dequeue something
        // try again
        chunk = m_first;
    }

    return dequeued != 0;
}
/// <summary>
/// Tries to dequeue all remaining data in the first chunk.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of the data to be dequeued.</param>
/// <param name="dequeued">The actual amount of the dequeued data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than one
/// or exceeds the space available in <paramref name="buffer"/> after <paramref name="offset"/>.
/// </exception>
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space rather than computing offset + length:
    // the sum can wrap around to a negative number and slip past the check
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;

    while (chunk != null) {
        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            dequeued = actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);

        // if we have dequeued any data, then return
        if (dequeued != 0)
            return true;

        // the chunk is usable but holds no data: there is nothing to dequeue right now.
        // Without this exit the loop would re-read the same first chunk and spin
        // for as long as the queue stays empty.
        if (!recycle)
            return false;

        // the exhausted chunk has been recycled, try the new head
        chunk = m_first;
    }

    return false;
}
cin
|
/// <summary>
/// Tries to make <paramref name="chunk"/> the new tail of the queue.
/// </summary>
/// <param name="last">The chunk the caller observed as the tail, or <c>null</c> for an empty queue.</param>
/// <param name="chunk">The chunk to append.</param>
/// <returns>
/// <c>true</c> if the tail was replaced; <c>false</c> if the tail was no longer
/// <paramref name="last"/> and nothing was changed.
/// </returns>
bool EnqueueChunk(Chunk last, Chunk chunk) {
    var observed = Interlocked.CompareExchange(ref m_last, chunk, last);
    if (observed != last)
        return false;

    if (last == null) {
        // the queue was empty, the new chunk is also the head
        m_first = chunk;
    } else {
        // link the previous tail to the new chunk
        last.next = chunk;
    }

    return true;
}
/// <summary>
/// Removes the specified chunk from the head of the queue, if it is still the head.
/// </summary>
/// <param name="first">The chunk the caller observed as the head.</param>
void RecycleFirstChunk(Chunk first) {
    var successor = first.next;

    // move the head forward; give up if someone else has already changed it
    if (Interlocked.CompareExchange(ref m_first, successor, first) != first)
        return;

    // there is a following chunk, the tail doesn't need to be touched
    if (successor != null)
        return;

    // the removed chunk looked like the only one: try to reset the tail as well
    if (Interlocked.CompareExchange(ref m_last, null, first) != first) {
        // race
        // someone already updated the tail, restore the pointer to the queue head
        m_first = first;
    }
    // otherwise the tail is updated
}
cin
|
/// <summary>
/// Discards the current contents of the queue by replacing its chunks with a new empty chunk.
/// </summary>
public void Clear() {
    // start the new queue
    var replacement = new Chunk(DEFAULT_CHUNK_SIZE);

    while (true) {
        Thread.MemoryBarrier();
        var head = m_first;
        var tail = m_last;

        if (tail == null) // nothing to clear
            return;

        // the head and tail must describe a consistent state,
        // otherwise the pointers are being updated and we retry
        bool consistent = head != null && (head.next != null || head == tail);
        if (!consistent)
            continue;

        // here we will create inconsistency which will force others to spin
        // and prevent from fetching. replacement.next = null
        if (Interlocked.CompareExchange(ref m_first, replacement, head) != head)
            continue; // inconsistent

        m_last = replacement;
        return;
    }
}
/// <summary>
/// Detaches the current chunks from the queue, replacing them with a new empty chunk,
/// and returns the items read from the detached chunks.
/// </summary>
/// <returns>The items read from the detached chunks; an empty array if the queue has no chunks.</returns>
public T[] Drain() {
    // start the new queue
    var replacement = new Chunk(DEFAULT_CHUNK_SIZE);

    for (;;) {
        Thread.MemoryBarrier();
        var head = m_first;
        var tail = m_last;

        if (tail == null)
            return new T[0];

        // the pointers are being updated by someone else, retry
        if (head == null)
            continue;
        if (head.next == null && head != tail)
            continue;

        // here we will create inconsistency which will force others to spin
        // and prevent from fetching. replacement.next = null
        if (Interlocked.CompareExchange(ref m_first, replacement, head) != head)
            continue; // inconsistent

        tail = Interlocked.Exchange(ref m_last, replacement);
        return ReadChunks(head, tail);
    }
}
cin
|
/// <summary>
/// Reads the remaining items from a list of chunks, starting at <paramref name="chunk"/>
/// and stopping after the chunk referenced by <paramref name="last"/>.
/// </summary>
/// <param name="chunk">The first chunk to read.</param>
/// <param name="last">The last chunk to read (compared by reference).</param>
/// <returns>The items read from the chunks.</returns>
static T[] ReadChunks(Chunk chunk, object last) {
    var items = new List<T>();
    var scratch = new T[DEFAULT_CHUNK_SIZE];

    var current = chunk;
    while (current != null) {
        // ensure all write operations on the chunk are complete
        current.Commit();

        // the chunk is read through the regular dequeue operation
        // since some clients may still be completing their dequeue
        // operations, such clients most likely won't get results
        int count;
        bool recycle;
        while (current.TryDequeueBatch(scratch, 0, scratch.Length, out count, out recycle))
            items.AddRange(new ArraySegmentCollection(scratch, 0, count));

        if (ReferenceEquals(current, last))
            break;

        // wait until the link to the following chunk is set
        while (current.next == null)
            Thread.MemoryBarrier();
        current = current.next;
    }

    return items.ToArray();
}
/// <summary>
/// A read-only <see cref="ICollection{T}"/> view over a segment of an array.
/// The data is not copied; modifying operations throw <see cref="NotSupportedException"/>.
/// </summary>
struct ArraySegmentCollection : ICollection<T> {
    readonly T[] m_data;
    readonly int m_offset;
    readonly int m_length;

    /// <summary>Creates a view over the specified segment.</summary>
    /// <param name="data">The underlying array.</param>
    /// <param name="offset">The index of the first item of the segment.</param>
    /// <param name="length">The number of items in the segment.</param>
    public ArraySegmentCollection(T[] data, int offset, int length) {
        m_data = data;
        m_offset = offset;
        m_length = length;
    }

    #region ICollection implementation

    /// <summary>Not supported, the collection is read-only.</summary>
    public void Add(T item) {
        throw new NotSupportedException();
    }

    /// <summary>Not supported, the collection is read-only.</summary>
    public void Clear() {
        throw new NotSupportedException();
    }

    /// <summary>Determines whether the segment contains the specified item.</summary>
    /// <returns><c>true</c> if the item is found within the segment, <c>false</c> otherwise.</returns>
    public bool Contains(T item) {
        // search only the segment, not the whole underlying array
        return Array.IndexOf(m_data, item, m_offset, m_length) >= 0;
    }

    /// <summary>Copies the items of the segment to the specified array.</summary>
    /// <param name="array">The destination array.</param>
    /// <param name="arrayIndex">The index in the destination at which copying begins.</param>
    public void CopyTo(T[] array, int arrayIndex) {
        Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
    }

    /// <summary>Not supported, the collection is read-only.</summary>
    public bool Remove(T item) {
        throw new NotSupportedException();
    }

    /// <summary>Gets the number of items in the segment.</summary>
    public int Count {
        get {
            return m_length;
        }
    }

    /// <summary>Always <c>true</c>.</summary>
    public bool IsReadOnly {
        get {
            return true;
        }
    }

    #endregion

    #region IEnumerable implementation

    /// <summary>Enumerates the items of the segment in index order.</summary>
    public IEnumerator<T> GetEnumerator() {
        for (int i = m_offset; i < m_length + m_offset; i++)
            yield return m_data[i];
    }

    IEnumerator IEnumerable.GetEnumerator() {
        return GetEnumerator();
    }

    #endregion
}
cin
|
r119 | #region IEnumerable implementation | ||
/// <summary>
/// Walks the chunks of the queue starting from the chunk passed to the constructor,
/// reading items in place without dequeuing them.
/// </summary>
class Enumerator : IEnumerator<T> {
    Chunk m_current;
    // -1 until the first call to MoveNext
    int m_pos = -1;

    /// <summary>Creates the enumerator positioned before the first item.</summary>
    /// <param name="fisrt">The chunk to start from; may be <c>null</c>.</param>
    public Enumerator(Chunk fisrt) {
        m_current = fisrt;
    }

    #region IEnumerator implementation

    /// <summary>Advances to the next item.</summary>
    /// <returns><c>true</c> if there is an item to read, <c>false</c> when the enumeration is over.</returns>
    public bool MoveNext() {
        if (m_current == null)
            return false;

        // the first step starts at the read position of the chunk
        m_pos = (m_pos == -1) ? m_current.Low : m_pos + 1;

        if (m_pos != m_current.Hi)
            return true;

        // the end of the readable area: continue with the following chunk
        // only when the current one is filled up, otherwise stop
        if (m_pos == m_current.Size)
            m_current = m_current.next;
        else
            m_current = null;
        m_pos = 0;

        return m_current != null;
    }

    /// <summary>Not supported.</summary>
    public void Reset() {
        throw new NotSupportedException();
    }

    object IEnumerator.Current {
        get {
            return Current;
        }
    }

    /// <summary>Gets the item at the current position.</summary>
    /// <exception cref="InvalidOperationException">
    /// The enumerator is positioned before the first item or the enumeration is over.
    /// </exception>
    public T Current {
        get {
            if (m_current != null && m_pos != -1)
                return m_current.GetAt(m_pos);
            throw new InvalidOperationException();
        }
    }

    #endregion

    #region IDisposable implementation

    /// <summary>Does nothing.</summary>
    public void Dispose() {
    }

    #endregion
}
/// <summary>
/// Returns an enumerator which starts from the current first chunk of the queue.
/// </summary>
public IEnumerator<T> GetEnumerator() {
    var head = m_first;
    return new Enumerator(head);
}
#endregion | ||||
#region IEnumerable implementation | ||||
/// <summary>
/// Non-generic counterpart which forwards to <see cref="GetEnumerator"/>.
/// </summary>
IEnumerator IEnumerable.GetEnumerator() {
    IEnumerator enumerator = GetEnumerator();
    return enumerator;
}
#endregion | ||||
} | ||||
} | ||||