AsyncQueue.cs
271 lines
| 7.7 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r119 | using System.Threading; | ||
using System.Collections.Generic; | ||||
using System; | ||||
using System.Collections; | ||||
namespace Implab.Parallels { | ||||
public class AsyncQueue<T> : IEnumerable<T> { | ||||
class Chunk { | ||||
public Chunk next; | ||||
int m_low; | ||||
int m_hi; | ||||
int m_alloc; | ||||
readonly int m_size; | ||||
readonly T[] m_data; | ||||
public Chunk(int size) { | ||||
m_size = size; | ||||
m_data = new T[size]; | ||||
} | ||||
public Chunk(int size, T value) { | ||||
m_size = size; | ||||
m_hi = 1; | ||||
m_alloc = 1; | ||||
m_data = new T[size]; | ||||
m_data[0] = value; | ||||
} | ||||
public int Low { | ||||
get { return m_low; } | ||||
} | ||||
public int Hi { | ||||
get { return m_hi; } | ||||
} | ||||
public bool TryEnqueue(T value,out bool extend) { | ||||
cin
|
r120 | var alloc = Interlocked.Increment(ref m_alloc) - 1; | ||
cin
|
r119 | |||
cin
|
r120 | if (alloc >= m_size) { | ||
extend = alloc == m_size; | ||||
cin
|
r119 | return false; | ||
} | ||||
cin
|
r120 | |||
extend = false; | ||||
cin
|
r119 | m_data[alloc] = value; | ||
while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { | ||||
// spin wait for commit | ||||
} | ||||
return true; | ||||
} | ||||
public bool TryDequeue(out T value,out bool recycle) { | ||||
int low; | ||||
do { | ||||
low = m_low; | ||||
if (low >= m_hi) { | ||||
value = default(T); | ||||
recycle = (low == m_size); | ||||
return false; | ||||
} | ||||
} while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); | ||||
recycle = (low == m_size - 1); | ||||
value = m_data[low]; | ||||
return true; | ||||
} | ||||
cin
|
r120 | public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { | ||
int alloc; | ||||
int allocSize; | ||||
do { | ||||
alloc = m_alloc; | ||||
if (alloc > m_size) { | ||||
enqueued = 0; | ||||
extend = false; | ||||
return false; | ||||
} | ||||
allocSize = Math.Min(m_size - m_alloc, length); | ||||
} while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc)); | ||||
if (alloc == m_size) { | ||||
enqueued = 0; | ||||
extend = true; | ||||
return false; | ||||
} | ||||
Array.Copy(batch, offset, m_data, alloc, allocSize); | ||||
enqueued = allocSize; | ||||
extend = false; | ||||
while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { | ||||
// spin wait for commit | ||||
} | ||||
return true; | ||||
} | ||||
cin
|
r119 | public T GetAt(int pos) { | ||
return m_data[pos]; | ||||
} | ||||
} | ||||
public const int DEFAULT_CHUNK_SIZE = 32; | ||||
readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; | ||||
Chunk m_first; | ||||
Chunk m_last; | ||||
public AsyncQueue() { | ||||
m_last = m_first = new Chunk(m_chunkSize); | ||||
} | ||||
public void Enqueue(T value) { | ||||
var last = m_last; | ||||
// spin wait to the new chunk | ||||
bool extend = true; | ||||
while(last == null || !last.TryEnqueue(value, out extend)) { | ||||
// try to extend queue | ||||
if (extend || last == null) { | ||||
var chunk = new Chunk(m_chunkSize, value); | ||||
if (EnqueueChunk(last, chunk)) | ||||
break; | ||||
last = m_last; | ||||
} else { | ||||
while (last != m_last) { | ||||
Thread.MemoryBarrier(); | ||||
last = m_last; | ||||
} | ||||
} | ||||
} | ||||
} | ||||
public bool TryDequeue(out T value) { | ||||
var chunk = m_first; | ||||
bool recycle; | ||||
while (chunk != null) { | ||||
var result = chunk.TryDequeue(out value, out recycle); | ||||
if (recycle) // this chunk is waste | ||||
RecycleFirstChunk(chunk); | ||||
else | ||||
return result; // this chunk is usable and returned actual result | ||||
if (result) // this chunk is waste but the true result is always actual | ||||
return true; | ||||
// try again | ||||
chunk = m_first; | ||||
} | ||||
// the queue is empty | ||||
value = default(T); | ||||
return false; | ||||
} | ||||
bool EnqueueChunk(Chunk last, Chunk chunk) { | ||||
if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) | ||||
return false; | ||||
if (last != null) | ||||
last.next = chunk; | ||||
else | ||||
m_first = chunk; | ||||
return true; | ||||
} | ||||
void RecycleFirstChunk(Chunk first) { | ||||
var next = first.next; | ||||
if (next == null) { | ||||
// looks like this is the last chunk | ||||
if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | ||||
// race | ||||
// maybe someone already recycled this chunk | ||||
// or a new chunk has been appedned to the queue | ||||
return; // give up | ||||
} | ||||
// the tail is updated | ||||
} | ||||
// we need to update the head | ||||
Interlocked.CompareExchange(ref m_first, next, first); | ||||
// if the head is already updated then give up | ||||
return; | ||||
} | ||||
#region IEnumerable implementation | ||||
class Enumerator : IEnumerator<T> { | ||||
Chunk m_current; | ||||
int m_pos = -1; | ||||
public Enumerator(Chunk fisrt) { | ||||
m_current = fisrt; | ||||
} | ||||
#region IEnumerator implementation | ||||
public bool MoveNext() { | ||||
if (m_current == null) | ||||
return false; | ||||
if (m_pos == -1) | ||||
m_pos = m_current.Low; | ||||
else | ||||
m_pos++; | ||||
if (m_pos == m_current.Hi) { | ||||
m_pos = 0; | ||||
m_current = m_current.next; | ||||
} | ||||
return true; | ||||
} | ||||
public void Reset() { | ||||
throw new NotSupportedException(); | ||||
} | ||||
object IEnumerator.Current { | ||||
get { | ||||
return Current; | ||||
} | ||||
} | ||||
#endregion | ||||
#region IDisposable implementation | ||||
public void Dispose() { | ||||
} | ||||
#endregion | ||||
#region IEnumerator implementation | ||||
public T Current { | ||||
get { | ||||
if (m_pos == -1 || m_current == null) | ||||
throw new InvalidOperationException(); | ||||
return m_current.GetAt(m_pos); | ||||
} | ||||
} | ||||
#endregion | ||||
} | ||||
public IEnumerator<T> GetEnumerator() { | ||||
return new Enumerator(m_first); | ||||
} | ||||
#endregion | ||||
#region IEnumerable implementation | ||||
IEnumerator IEnumerable.GetEnumerator() { | ||||
return GetEnumerator(); | ||||
} | ||||
#endregion | ||||
} | ||||
} | ||||