SimpleAsyncQueue.cs
115 lines
| 3.8 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r289 | using System.Threading; | ||
using System.Collections.Generic; | ||||
using System; | ||||
using System.Collections; | ||||
namespace Implab.Parallels { | ||||
/// <summary> | ||||
/// Very simple thred-safe FIFO queue based on the sinle linked list. | ||||
/// </summary> | ||||
/// <typeparam name="T"></typeparam> | ||||
/// <remarks> | ||||
/// This queue uses interlocked operations to add and remove nodes, | ||||
/// each node stores a single value. The queue provides mean performance, | ||||
/// moderate overhead and situable for a small amount of elements. | ||||
/// </remarks> | ||||
public class SimpleAsyncQueue<T> : IEnumerable<T> { | ||||
class Node { | ||||
public Node(T value) { | ||||
this.value = value; | ||||
} | ||||
public readonly T value; | ||||
public volatile Node next; | ||||
} | ||||
// the reader and the writer are maintained completely independent, | ||||
// the reader can read next item when m_first.next is not null | ||||
// the writer creates a new node, moves m_last to this node and | ||||
// only after that restores the reference from the previous node | ||||
// making the reader able to read the new node. | ||||
volatile Node m_first; // position on the node which is already read | ||||
volatile Node m_last; // position on the node which is already written | ||||
public SimpleAsyncQueue() { | ||||
m_first = m_last = new Node(default(T)); | ||||
} | ||||
public void Enqueue(T value) { | ||||
var next = new Node(value); | ||||
// Interlocaked.CompareExchange implies Thread.MemoryBarrier(); | ||||
// to ensure that the next node is completely constructed | ||||
var last = Interlocked.Exchange(ref m_last, next); | ||||
// release-fence | ||||
last.next = next; | ||||
} | ||||
public bool TryDequeue(out T value) { | ||||
Node first = m_first; | ||||
Node next = first.next; | ||||
if (next == null) { | ||||
value = default(T); | ||||
return false; | ||||
} | ||||
var first2 = Interlocked.CompareExchange(ref m_first, next, first); | ||||
if (first != first2) { | ||||
// head is updated by someone else | ||||
SpinWait spin = new SpinWait(); | ||||
do { | ||||
first = first2; | ||||
next = first.next; | ||||
if (next == null) { | ||||
value = default(T); | ||||
return false; | ||||
} | ||||
first2 = Interlocked.CompareExchange(ref m_first, next, first); | ||||
if (first == first2) | ||||
break; | ||||
spin.SpinOnce(); | ||||
} while (true); | ||||
} | ||||
value = next.value; | ||||
return true; | ||||
} | ||||
/// <summary> | ||||
/// Creates a thin copy of the current linked list. | ||||
/// </summary> | ||||
/// <remarks>Iterating over the snapshot is thread safe and | ||||
/// will produce repeatble results. Each snapshot stores only | ||||
/// two references one for the first and one for last elements | ||||
/// from list. | ||||
/// <returns>Enumerable collection.</returns> | ||||
public IEnumerable<T> Snapshot() { | ||||
var first = m_first; | ||||
var last = m_last; | ||||
var current = m_first; | ||||
while(current != m_last) { | ||||
current = current.next; | ||||
yield return current.value; | ||||
} | ||||
} | ||||
public IEnumerator<T> GetEnumerator() { | ||||
for (var current = m_first.next; current != null; current = current.next) { | ||||
yield return current.value; | ||||
} | ||||
} | ||||
IEnumerator IEnumerable.GetEnumerator() { | ||||
return GetEnumerator(); | ||||
} | ||||
} | ||||
} | ||||