|
|
using System.Threading;
|
|
|
using System.Collections.Generic;
|
|
|
using System;
|
|
|
using System.Collections;
|
|
|
|
|
|
namespace Implab.Parallels {
|
|
|
/// <summary>
/// A first-in/first-out queue stored as a singly linked list of fixed-size
/// chunks. Positions inside a chunk and the head/tail chunk references are
/// updated with <see cref="Interlocked"/> operations instead of locks.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class AsyncQueue<T> : IEnumerable<T> {

    /// <summary>
    /// A fixed-size segment of the queue. Slots in <c>[m_low, m_hi)</c> hold
    /// committed items that have not been dequeued yet; <c>m_alloc</c> counts
    /// the slots already reserved by writers.
    /// </summary>
    class Chunk {
        // The chunk that follows this one, or null when none has been linked yet.
        public Chunk next;

        int m_low;      // index of the next slot to dequeue
        int m_hi;       // index one past the last committed slot
        int m_alloc;    // number of reservations made by writers (may exceed m_size)
        readonly int m_size;
        readonly T[] m_data;

        /// <summary>Creates an empty chunk with <paramref name="size"/> slots.</summary>
        public Chunk(int size) {
            m_size = size;
            m_data = new T[size];
        }

        /// <summary>
        /// Creates a chunk with <paramref name="size"/> slots whose first slot
        /// already holds <paramref name="value"/> (reserved and committed).
        /// </summary>
        public Chunk(int size, T value) {
            m_size = size;
            m_hi = 1;
            m_alloc = 1;
            m_data = new T[size];
            m_data[0] = value;
        }

        /// <summary>Index of the next slot to dequeue.</summary>
        public int Low {
            get { return m_low; }
        }

        /// <summary>Index one past the last committed slot.</summary>
        public int Hi {
            get { return m_hi; }
        }

        /// <summary>
        /// Tries to store <paramref name="value"/> in this chunk.
        /// </summary>
        /// <param name="value">The item to store.</param>
        /// <param name="extend">
        /// Set to <c>true</c> only for the single caller whose reservation index
        /// equals the chunk size, i.e. the first one to find the chunk full.
        /// </param>
        /// <returns><c>true</c> if the value was stored; <c>false</c> if the chunk is full.</returns>
        public bool TryEnqueue(T value, out bool extend) {
            // reserve a slot; the index may run past the end of the chunk
            var alloc = Interlocked.Increment(ref m_alloc) - 1;

            if (alloc >= m_size) {
                extend = alloc == m_size;
                return false;
            }

            extend = false;
            m_data[alloc] = value;

            // publish the slot: m_hi advances strictly in reservation order, so
            // this waits until all earlier reservations have been committed
            while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
                // spin wait for commit
            }
            return true;
        }

        /// <summary>
        /// Tries to take the item at the head of this chunk.
        /// </summary>
        /// <param name="value">The dequeued item, or <c>default(T)</c> on failure.</param>
        /// <param name="recycle">
        /// <c>true</c> when this call took the last slot of the chunk, or when the
        /// chunk was found completely consumed; the chunk can then be dropped.
        /// </param>
        /// <returns><c>true</c> if an item was dequeued.</returns>
        public bool TryDequeue(out T value, out bool recycle) {
            int low;
            do {
                low = m_low;
                if (low >= m_hi) {
                    // nothing committed beyond the read position
                    value = default(T);
                    recycle = (low == m_size);
                    return false;
                }
            } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));

            recycle = (low == m_size - 1);
            value = m_data[low];

            return true;
        }

        /// <summary>
        /// Tries to store up to <paramref name="length"/> items from
        /// <paramref name="batch"/>, starting at <paramref name="offset"/>.
        /// </summary>
        /// <param name="batch">Source array.</param>
        /// <param name="offset">Index in <paramref name="batch"/> of the first item to copy.</param>
        /// <param name="length">Maximum number of items to copy.</param>
        /// <param name="enqueued">Number of items actually stored in this chunk.</param>
        /// <param name="extend">
        /// <c>true</c> when the chunk was found exactly full (reservation counter
        /// equal to the chunk size); <c>false</c> otherwise.
        /// </param>
        /// <returns><c>true</c> if the reservation was made inside the chunk and committed.</returns>
        public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
            int alloc;
            int allocSize;

            do {
                alloc = m_alloc;

                if (alloc > m_size) {
                    // the counter already ran past the end of the chunk
                    enqueued = 0;
                    extend = false;
                    return false;
                }

                // Size the reservation from the same snapshot that the CAS below
                // validates. Re-reading m_alloc here could observe a different
                // (even larger than m_size) value and yield a wrong or negative size.
                allocSize = Math.Min(m_size - alloc, length);
            } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));

            if (alloc == m_size) {
                enqueued = 0;
                extend = true;
                return false;
            }

            Array.Copy(batch, offset, m_data, alloc, allocSize);
            enqueued = allocSize;
            extend = false;

            // publish the copied range once all earlier reservations are committed
            while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
                // spin wait for commit
            }
            return true;
        }

        /// <summary>Returns the item stored at slot <paramref name="pos"/> without any range check of its own.</summary>
        public T GetAt(int pos) {
            return m_data[pos];
        }
    }

    /// <summary>Number of slots in every chunk allocated by the queue.</summary>
    public const int DEFAULT_CHUNK_SIZE = 32;

    readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;

    Chunk m_first;  // head chunk (dequeue side); null once the only chunk has been recycled
    Chunk m_last;   // tail chunk (enqueue side); null once the only chunk has been recycled

    /// <summary>Creates an empty queue with a single empty chunk.</summary>
    public AsyncQueue() {
        m_last = m_first = new Chunk(m_chunkSize);
    }

    /// <summary>
    /// Appends <paramref name="value"/> to the end of the queue, adding a new
    /// chunk when the current tail chunk is full or missing.
    /// </summary>
    /// <param name="value">The item to append.</param>
    public void Enqueue(T value) {
        var last = m_last;
        // spin wait to the new chunk
        bool extend = true;
        while (last == null || !last.TryEnqueue(value, out extend)) {
            // try to extend queue
            if (extend || last == null) {
                // the new chunk already carries the value in its first slot
                var chunk = new Chunk(m_chunkSize, value);
                if (EnqueueChunk(last, chunk))
                    break;
                // the tail changed in the meantime, retry with the current one
                last = m_last;
            } else {
                // the chunk is full and this caller was not told to extend:
                // pick up the current tail (if it has changed) and retry
                while (last != m_last) {
                    Thread.MemoryBarrier();
                    last = m_last;
                }
            }
        }
    }

    /// <summary>
    /// Tries to remove the item at the head of the queue.
    /// </summary>
    /// <param name="value">The removed item, or <c>default(T)</c> when nothing was removed.</param>
    /// <returns><c>true</c> if an item was removed; <c>false</c> if none was available.</returns>
    public bool TryDequeue(out T value) {
        var chunk = m_first;
        bool recycle;
        while (chunk != null) {

            var result = chunk.TryDequeue(out value, out recycle);

            if (recycle) // this chunk is waste
                RecycleFirstChunk(chunk);
            else
                return result; // this chunk is usable and returned actual result

            if (result) // this chunk is waste but the true result is always actual
                return true;

            // try again
            chunk = m_first;
        }

        // the queue is empty
        value = default(T);
        return false;
    }

    /// <summary>
    /// Tries to make <paramref name="chunk"/> the new tail, provided the tail is
    /// still <paramref name="last"/>.
    /// </summary>
    /// <returns><c>true</c> if the chunk was installed; <c>false</c> if the tail had changed.</returns>
    bool EnqueueChunk(Chunk last, Chunk chunk) {
        if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
            return false;

        // link the new tail: after the previous chunk, or as the head when
        // there was no chunk at all
        if (last != null)
            last.next = chunk;
        else
            m_first = chunk;
        return true;
    }

    /// <summary>
    /// Unlinks the fully consumed head chunk <paramref name="first"/>. When it has
    /// no successor the tail reference is cleared first; the method gives up
    /// silently if the tail is no longer <paramref name="first"/>.
    /// </summary>
    void RecycleFirstChunk(Chunk first) {
        var next = first.next;

        if (next == null) {
            // looks like this is the last chunk
            if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
                // race
                // maybe someone already recycled this chunk
                // or a new chunk has been appended to the queue

                return; // give up
            }
            // the tail is updated
        }

        // we need to update the head;
        // if the head is already updated then the exchange simply does nothing
        Interlocked.CompareExchange(ref m_first, next, first);
    }

    #region IEnumerable implementation

    /// <summary>
    /// Walks the chunks starting from the one supplied to the constructor and
    /// yields the items found between each chunk's <c>Low</c> and <c>Hi</c>
    /// positions, without removing them.
    /// </summary>
    class Enumerator : IEnumerator<T> {
        Chunk m_current;
        int m_pos = -1; // -1 until the first MoveNext call

        public Enumerator(Chunk first) {
            m_current = first;
        }

        /// <summary>
        /// Advances to the next stored item.
        /// </summary>
        /// <returns>
        /// <c>true</c> if <see cref="Current"/> now refers to an item;
        /// <c>false</c> once the chunks are exhausted.
        /// </returns>
        public bool MoveNext() {
            if (m_current == null)
                return false;

            if (m_pos == -1)
                m_pos = m_current.Low;
            else
                m_pos++;

            // Skip chunks with nothing left to read. Reaching the end of the list
            // must be reported as 'false'; otherwise Current is invoked without
            // a chunk and throws.
            while (m_pos >= m_current.Hi) {
                m_current = m_current.next;
                if (m_current == null)
                    return false;
                // start at the chunk's read position, not at slot 0, so slots
                // that were already dequeued are not yielded
                m_pos = m_current.Low;
            }

            return true;
        }

        /// <summary>Not supported.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public void Reset() {
            throw new NotSupportedException();
        }

        object IEnumerator.Current {
            get {
                return Current;
            }
        }

        /// <summary>Nothing to release.</summary>
        public void Dispose() {
        }

        /// <summary>The item at the current position.</summary>
        /// <exception cref="InvalidOperationException">
        /// The enumerator is positioned before the first item or after the last one.
        /// </exception>
        public T Current {
            get {
                if (m_pos == -1 || m_current == null)
                    throw new InvalidOperationException();
                return m_current.GetAt(m_pos);
            }
        }
    }

    /// <summary>Returns an enumerator that starts at the current head chunk.</summary>
    public IEnumerator<T> GetEnumerator() {
        return new Enumerator(m_first);
    }

    IEnumerator IEnumerable.GetEnumerator() {
        return GetEnumerator();
    }

    #endregion
}
|
|
|
}
|
|
|
|