SimpleAsyncQueue.cs
115 lines
| 3.8 KiB
| text/x-csharp
|
CSharpLexer
|
|
r233 | using System.Threading; | ||
| using System.Collections.Generic; | ||||
| using System; | ||||
| using System.Collections; | ||||
| namespace Implab.Parallels { | ||||
|
|
r285 | |||
/// <summary>
/// Very simple thread-safe FIFO queue based on a singly linked list.
/// </summary>
/// <typeparam name="T">The type of the elements stored in the queue.</typeparam>
/// <remarks>
/// This queue uses interlocked operations to add and remove nodes;
/// each node stores a single value. The queue provides average performance
/// with moderate overhead and is suitable for a small number of elements.
/// </remarks>
|
|
public class SimpleAsyncQueue<T> : IEnumerable<T> {
    /// <summary>
    /// A single link of the list: one stored value plus a reference to the
    /// node that was enqueued right after it.
    /// </summary>
    class Node {
        public Node(T value) {
            this.value = value;
        }
        public readonly T value;
        // Assigned by the enqueuing thread only after this node has been
        // replaced as the tail; stays null until then.
        public volatile Node next;
    }

    // The reader and the writer are maintained completely independently:
    // the reader can read the next item when m_first.next is not null;
    // the writer creates a new node, moves m_last to this node and
    // only after that sets the reference from the previous node,
    // making the reader able to read the new node.

    volatile Node m_first; // positioned on the node which has already been read (sentinel)
    volatile Node m_last;  // positioned on the node which has already been written (tail)

    /// <summary>
    /// Creates an empty queue. A single placeholder node serves both as the
    /// "already read" head and as the tail.
    /// </summary>
    public SimpleAsyncQueue() {
        m_first = m_last = new Node(default(T));
    }

    /// <summary>
    /// Appends the specified value to the end of the queue.
    /// </summary>
    /// <param name="value">The value to store.</param>
    public void Enqueue(T value) {
        var next = new Node(value);
        // Interlocked.Exchange implies a full memory barrier, which ensures
        // that the new node is completely constructed before it is published
        // as the tail.
        var last = Interlocked.Exchange(ref m_last, next);
        // Volatile write: linking the previous tail to the new node is what
        // makes the node reachable for readers.
        last.next = next;
    }

    /// <summary>
    /// Tries to remove the value at the head of the queue.
    /// </summary>
    /// <param name="value">
    /// Receives the removed value, or <c>default(T)</c> when nothing could be read.
    /// </param>
    /// <returns>
    /// <c>true</c> if a value has been removed; <c>false</c> if the head node
    /// has no linked successor yet.
    /// </returns>
    public bool TryDequeue(out T value) {
        var first = m_first;
        var spin = new SpinWait();

        while (true) {
            var next = first.next;
            if (next == null) {
                // nothing is linked after the head: the queue is empty or
                // a writer has not finished linking its node yet
                value = default(T);
                return false;
            }

            // try to advance the head; the call returns the head it has actually seen
            var observed = Interlocked.CompareExchange(ref m_first, next, first);
            if (observed == first) {
                value = next.value;
                return true;
            }

            // the head has been moved by someone else, retry from the observed head
            first = observed;
            spin.SpinOnce();
        }
    }

    /// <summary>
    /// Creates a thin copy of the current linked list.
    /// </summary>
    /// <remarks>
    /// The boundaries of the snapshot are captured when this method is called,
    /// not when the result is enumerated. Iterating over the snapshot is
    /// thread safe and will produce repeatable results. Each snapshot stores
    /// only two references: one for the first and one for the last element of
    /// the list.
    /// </remarks>
    /// <returns>Enumerable collection.</returns>
    public IEnumerable<T> Snapshot() {
        // This method is intentionally not an iterator: the fields must be
        // read eagerly. The head is read before the tail, so the captured
        // tail can never precede the captured head.
        var first = m_first;
        var last = m_last;
        return EnumerateRange(first, last);
    }

    /// <summary>
    /// Yields the values of the nodes following <paramref name="first"/> up to
    /// and including <paramref name="last"/>.
    /// </summary>
    static IEnumerable<T> EnumerateRange(Node first, Node last) {
        var current = first;
        while (current != last) {
            var next = current.next;
            if (next == null) {
                // A writer has already published a later tail but has not
                // linked it to its predecessor yet (see Enqueue); wait for
                // the link instead of dereferencing null.
                var spin = new SpinWait();
                do {
                    spin.SpinOnce();
                    next = current.next;
                } while (next == null);
            }
            current = next;
            yield return current.value;
        }
    }

    /// <summary>
    /// Enumerates the values which are currently linked after the head node.
    /// </summary>
    /// <remarks>
    /// Unlike <see cref="Snapshot"/>, the head is read when the enumeration
    /// starts and the enumeration follows the links until it reaches a node
    /// without a successor.
    /// </remarks>
    public IEnumerator<T> GetEnumerator() {
        for (var current = m_first.next; current != null; current = current.next) {
            yield return current.value;
        }
    }

    IEnumerator IEnumerable.GetEnumerator() {
        return GetEnumerator();
    }
}
| } | ||||
