using System;
using System.Threading;

namespace Implab.Parallels {
    /// <summary>
    /// Extends <see cref="AsyncQueue{T}"/> with blocking consumer operations.
    /// Consumers that find the queue empty wait on a private monitor and are
    /// woken by the <c>*Pulse</c> enqueue methods of this class.
    /// </summary>
    /// <remarks>
    /// Only <see cref="EnqueuePulse"/> and <see cref="EnqueueRangePulse"/> signal
    /// the monitor used by the blocking getters; nothing else in this class does.
    /// </remarks>
    /// <typeparam name="T">The type of the queued items.</typeparam>
    public class BlockingQueue<T> : AsyncQueue<T> {
        // Private monitor used solely for the wait/pulse handshake between
        // producers and blocked consumers.
        readonly object m_lock = new object();

        /// <summary>
        /// Enqueues a single item and wakes one waiting consumer, if any.
        /// </summary>
        /// <param name="value">The item to enqueue.</param>
        public void EnqueuePulse(T value) {
            base.Enqueue(value);
            lock (m_lock) {
                Monitor.Pulse(m_lock);
            }
        }

        /// <summary>
        /// Enqueues a range of items and wakes waiting consumers: all of them when
        /// more than one item was requested, otherwise a single one.
        /// </summary>
        /// <param name="data">The array containing the items to enqueue.</param>
        /// <param name="offset">The index of the first item in <paramref name="data"/>.</param>
        /// <param name="length">The number of items to enqueue.</param>
        public void EnqueueRangePulse(T[] data, int offset, int length) {
            base.EnqueueRange(data, offset, length);
            lock (m_lock) {
                if (length > 1)
                    Monitor.PulseAll(m_lock);
                else
                    Monitor.Pulse(m_lock);
            }
        }

        /// <summary>
        /// Dequeues one item, blocking for at most <paramref name="timeout"/>
        /// milliseconds while the queue is empty.
        /// </summary>
        /// <param name="timeout">
        /// The maximum time to wait, in milliseconds. A negative value is passed to
        /// <see cref="Monitor.Wait(object, int)"/> unchanged, so -1 waits indefinitely.
        /// </param>
        /// <returns>The dequeued item.</returns>
        /// <exception cref="TimeoutException">
        /// No item became available before the timeout elapsed.
        /// </exception>
        public T GetItem(int timeout) {
            T item;
            if (TryDequeue(out item))
                return item;

            var started = Environment.TickCount;
            lock (m_lock) {
                // The queue is re-checked after every wake-up and before the deadline
                // is examined, so a pulse delivered to this thread is never discarded
                // while an item is available.
                while (!TryDequeue(out item)) {
                    var remaining = GetRemainingTimeout(timeout, started);
                    if (Monitor.Wait(m_lock, remaining))
                        continue;

                    // Wait reports false when the lock was reacquired after the
                    // interval elapsed; an item may still have arrived meanwhile.
                    if (TryDequeue(out item))
                        break;
                    throw new TimeoutException();
                }
            }
            return item;
        }

        /// <summary>
        /// Dequeues one item, blocking indefinitely while the queue is empty.
        /// </summary>
        /// <returns>The dequeued item.</returns>
        public T GetItem() {
            T item;
            if (TryDequeue(out item))
                return item;

            lock (m_lock) {
                while (!TryDequeue(out item))
                    Monitor.Wait(m_lock);
            }
            return item;
        }

        /// <summary>
        /// Dequeues up to <paramref name="max"/> items, blocking for at most
        /// <paramref name="timeout"/> milliseconds while nothing can be dequeued.
        /// </summary>
        /// <param name="max">The maximum number of items to return; validated against the range 1..Int32.MaxValue.</param>
        /// <param name="timeout">
        /// The maximum time to wait, in milliseconds. A negative value is passed to
        /// <see cref="Monitor.Wait(object, int)"/> unchanged, so -1 waits indefinitely.
        /// </param>
        /// <returns>An array holding exactly the items that were dequeued.</returns>
        /// <exception cref="TimeoutException">
        /// Nothing could be dequeued before the timeout elapsed.
        /// </exception>
        public T[] GetRange(int max, int timeout) {
            Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");

            var buffer = new T[max];
            int actual;
            if (!TryDequeueRange(buffer, 0, max, out actual)) {
                var started = Environment.TickCount;
                lock (m_lock) {
                    // Same protocol as GetItem(int): retry first, check the deadline second.
                    while (!TryDequeueRange(buffer, 0, max, out actual)) {
                        var remaining = GetRemainingTimeout(timeout, started);
                        if (Monitor.Wait(m_lock, remaining))
                            continue;

                        if (TryDequeueRange(buffer, 0, max, out actual))
                            break;
                        throw new TimeoutException();
                    }
                }
            }
            return TrimBuffer(buffer, actual);
        }

        /// <summary>
        /// Dequeues up to <paramref name="max"/> items, blocking indefinitely while
        /// nothing can be dequeued.
        /// </summary>
        /// <param name="max">The maximum number of items to return; validated against the range 1..Int32.MaxValue.</param>
        /// <returns>An array holding exactly the items that were dequeued.</returns>
        public T[] GetRange(int max) {
            Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");

            var buffer = new T[max];
            int actual;
            if (!TryDequeueRange(buffer, 0, max, out actual)) {
                lock (m_lock) {
                    while (!TryDequeueRange(buffer, 0, max, out actual))
                        Monitor.Wait(m_lock);
                }
            }
            return TrimBuffer(buffer, actual);
        }

        // Computes how long a timed wait may still block. Negative timeouts are
        // returned unchanged so Monitor.Wait interprets (or rejects) them itself.
        static int GetRemainingTimeout(int timeout, int started) {
            if (timeout < 0)
                return timeout;

            // unchecked: Environment.TickCount wraps around, and the subtraction
            // must keep working even if the assembly is compiled with overflow checks.
            var remaining = unchecked(timeout - (Environment.TickCount - started));
            if (remaining < 0)
                throw new TimeoutException();
            return remaining;
        }

        // Returns an array containing exactly the first `count` items of `buffer`,
        // reusing the buffer itself when it is completely filled.
        static T[] TrimBuffer(T[] buffer, int count) {
            if (count == buffer.Length)
                return buffer;

            var data = new T[count];
            Array.Copy(buffer, data, count);
            return data;
        }
    }
}