MTQueue.cs
143 lines
| 4.2 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r93 | using System.Threading; | ||
cin
|
r97 | using System.Collections.Generic; | ||
using System; | ||||
using System.Collections; | ||||
cin
|
r14 | |||
namespace Implab.Parallels { | ||||
cin
|
r97 | public class MTQueue<T> : IEnumerable<T> { | ||
cin
|
r14 | class Node { | ||
public Node(T value) { | ||||
this.value = value; | ||||
} | ||||
public readonly T value; | ||||
public Node next; | ||||
} | ||||
Node m_first; | ||||
Node m_last; | ||||
public void Enqueue(T value) { | ||||
cin
|
r80 | Thread.MemoryBarrier(); | ||
cin
|
r14 | var last = m_last; | ||
var next = new Node(value); | ||||
cin
|
r97 | // Interlocaked.CompareExchange implies Thread.MemoryBarrier(); | ||
// to ensure that the next node is completely constructed | ||||
cin
|
r14 | while (last != Interlocked.CompareExchange(ref m_last, next, last)) | ||
last = m_last; | ||||
if (last != null) | ||||
last.next = next; | ||||
else | ||||
m_first = next; | ||||
} | ||||
public bool TryDequeue(out T value) { | ||||
Node first; | ||||
cin
|
r93 | Node next; | ||
cin
|
r14 | value = default(T); | ||
cin
|
r80 | Thread.MemoryBarrier(); | ||
cin
|
r14 | do { | ||
first = m_first; | ||||
if (first == null) | ||||
return false; | ||||
next = first.next; | ||||
if (next == null) { | ||||
// this is the last element, | ||||
cin
|
r19 | // then try to update the tail | ||
cin
|
r14 | if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | ||
cin
|
r71 | // this is the race condition | ||
cin
|
r14 | if (m_last == null) | ||
cin
|
r19 | // the queue is empty | ||
cin
|
r14 | return false; | ||
cin
|
r71 | // tail has been changed, we need to restart | ||
cin
|
r14 | continue; | ||
} | ||||
// tail succesfully updated and first.next will never be changed | ||||
cin
|
r71 | // other readers will fail due to inconsistency m_last != m_fist && m_first.next == null | ||
// however the parallel writer may update the m_first since the m_last is null | ||||
cin
|
r14 | |||
cin
|
r71 | // so we need to fix inconsistency by setting m_first to null or if it has been | ||
// updated by the writer already then we should just to give up | ||||
cin
|
r14 | Interlocked.CompareExchange(ref m_first, null, first); | ||
break; | ||||
} | ||||
cin
|
r93 | if (first == Interlocked.CompareExchange(ref m_first, next, first)) | ||
// head succesfully updated | ||||
break; | ||||
cin
|
r14 | } while (true); | ||
value = first.value; | ||||
return true; | ||||
} | ||||
cin
|
r97 | |||
#region IEnumerable implementation | ||||
class Enumerator : IEnumerator<T> { | ||||
Node m_current; | ||||
Node m_first; | ||||
public Enumerator(Node first) { | ||||
m_first = first; | ||||
} | ||||
#region IEnumerator implementation | ||||
public bool MoveNext() { | ||||
m_current = m_current == null ? m_first : m_current.next; | ||||
return m_current != null; | ||||
} | ||||
public void Reset() { | ||||
m_current = null; | ||||
} | ||||
cin
|
r98 | object IEnumerator.Current { | ||
cin
|
r97 | get { | ||
if (m_current == null) | ||||
throw new InvalidOperationException(); | ||||
return m_current.value; | ||||
} | ||||
} | ||||
#endregion | ||||
#region IDisposable implementation | ||||
public void Dispose() { | ||||
} | ||||
#endregion | ||||
#region IEnumerator implementation | ||||
public T Current { | ||||
get { | ||||
if (m_current == null) | ||||
throw new InvalidOperationException(); | ||||
return m_current.value; | ||||
} | ||||
} | ||||
#endregion | ||||
} | ||||
public IEnumerator<T> GetEnumerator() { | ||||
return new Enumerator(m_first); | ||||
} | ||||
#endregion | ||||
#region IEnumerable implementation | ||||
IEnumerator IEnumerable.GetEnumerator() { | ||||
return GetEnumerator(); | ||||
} | ||||
#endregion | ||||
cin
|
r14 | } | ||
} | ||||