BlockingQueue.cs
101 lines
| 3.0 KiB
| text/x-csharp
|
CSharpLexer
|
|
r137 | using System; | ||
| using System.Threading; | ||||
| namespace Implab.Parallels { | ||||
| public class BlockingQueue<T> : AsyncQueue<T> { | ||||
| readonly object m_lock = new object(); | ||||
|
|
r233 | public void EnqueuePulse(T value) { | ||
|
|
r137 | base.Enqueue(value); | ||
| lock (m_lock) | ||||
| Monitor.Pulse(m_lock); | ||||
| } | ||||
|
|
r233 | public void EnqueueRangePulse(T[] data, int offset, int length) { | ||
|
|
r137 | base.EnqueueRange(data, offset, length); | ||
| if (length > 1) | ||||
| lock (m_lock) | ||||
| Monitor.PulseAll(m_lock); | ||||
| else | ||||
| lock (m_lock) | ||||
| Monitor.Pulse(m_lock); | ||||
| } | ||||
| public T GetItem(int timeout) { | ||||
| T item; | ||||
|
|
r139 | |||
| if (!TryDequeue(out item)) { | ||||
| var t1 = Environment.TickCount; | ||||
| var dt = timeout; | ||||
| lock (m_lock) { | ||||
| while (!TryDequeue(out item)) { | ||||
| if (!Monitor.Wait(m_lock, dt)) | ||||
| throw new TimeoutException(); | ||||
| if (timeout >= 0) { | ||||
| dt = timeout - Environment.TickCount + t1; | ||||
| if (dt < 0) | ||||
| throw new TimeoutException(); | ||||
| } | ||||
| } | ||||
|
|
r137 | } | ||
| } | ||||
| return item; | ||||
| } | ||||
| public T GetItem() { | ||||
| T item; | ||||
|
|
r139 | if (!TryDequeue(out item)) | ||
| lock (m_lock) { | ||||
| while (!TryDequeue(out item)) | ||||
| Monitor.Wait(m_lock); | ||||
| } | ||||
|
|
r137 | return item; | ||
| } | ||||
| public T[] GetRange(int max, int timeout) { | ||||
|
|
r251 | Safe.ArgumentInRange(max > 0 , nameof(max)); | ||
|
|
r137 | |||
| var buffer = new T[max]; | ||||
| int actual; | ||||
|
|
r139 | if (!TryDequeueRange(buffer, 0, max, out actual)) { | ||
| var t1 = Environment.TickCount; | ||||
| var dt = timeout; | ||||
| lock (m_lock) { | ||||
| while (!TryDequeueRange(buffer, 0, max, out actual)) { | ||||
| if (!Monitor.Wait(m_lock, dt)) | ||||
| throw new TimeoutException(); | ||||
| if (timeout >= 0) { | ||||
| dt = timeout - Environment.TickCount + t1; | ||||
| if (dt < 0) | ||||
| throw new TimeoutException(); | ||||
| } | ||||
| } | ||||
|
|
r137 | } | ||
| } | ||||
| var data = new T[actual]; | ||||
| Array.Copy(buffer, data, actual); | ||||
| return data; | ||||
| } | ||||
| public T[] GetRange(int max) { | ||||
|
|
r251 | Safe.ArgumentInRange(max > 0, nameof(max)); | ||
|
|
r137 | |||
| var buffer = new T[max]; | ||||
| int actual; | ||||
|
|
r139 | if (!TryDequeueRange(buffer, 0, max, out actual)) | ||
|
|
r137 | lock (m_lock) | ||
|
|
r139 | while (!TryDequeueRange(buffer, 0, max, out actual)) | ||
| Monitor.Wait(m_lock); | ||||
|
|
r137 | |||
| var data = new T[actual]; | ||||
| Array.Copy(buffer, data, actual); | ||||
| return data; | ||||
| } | ||||
| } | ||||
| } | ||||
