|
|
using System;
|
|
|
using System.Threading;
|
|
|
|
|
|
namespace Implab.Parallels {
|
|
|
public class BlockingQueue<T> : AsyncQueue<T> {
|
|
|
readonly object m_lock = new object();
|
|
|
|
|
|
public void EnqueuePulse(T value) {
|
|
|
base.Enqueue(value);
|
|
|
lock (m_lock)
|
|
|
Monitor.Pulse(m_lock);
|
|
|
}
|
|
|
|
|
|
public void EnqueueRangePulse(T[] data, int offset, int length) {
|
|
|
base.EnqueueRange(data, offset, length);
|
|
|
if (length > 1)
|
|
|
lock (m_lock)
|
|
|
Monitor.PulseAll(m_lock);
|
|
|
else
|
|
|
lock (m_lock)
|
|
|
Monitor.Pulse(m_lock);
|
|
|
}
|
|
|
|
|
|
public T GetItem(int timeout) {
|
|
|
T item;
|
|
|
|
|
|
if (!TryDequeue(out item)) {
|
|
|
var t1 = Environment.TickCount;
|
|
|
var dt = timeout;
|
|
|
|
|
|
lock (m_lock) {
|
|
|
while (!TryDequeue(out item)) {
|
|
|
if (!Monitor.Wait(m_lock, dt))
|
|
|
throw new TimeoutException();
|
|
|
if (timeout >= 0) {
|
|
|
dt = timeout - Environment.TickCount + t1;
|
|
|
if (dt < 0)
|
|
|
throw new TimeoutException();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return item;
|
|
|
}
|
|
|
|
|
|
public T GetItem() {
|
|
|
T item;
|
|
|
if (!TryDequeue(out item))
|
|
|
lock (m_lock) {
|
|
|
while (!TryDequeue(out item))
|
|
|
Monitor.Wait(m_lock);
|
|
|
}
|
|
|
return item;
|
|
|
}
|
|
|
|
|
|
public T[] GetRange(int max, int timeout) {
|
|
|
Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
|
|
|
|
|
|
var buffer = new T[max];
|
|
|
int actual;
|
|
|
if (!TryDequeueRange(buffer, 0, max, out actual)) {
|
|
|
var t1 = Environment.TickCount;
|
|
|
var dt = timeout;
|
|
|
|
|
|
lock (m_lock) {
|
|
|
while (!TryDequeueRange(buffer, 0, max, out actual)) {
|
|
|
|
|
|
if (!Monitor.Wait(m_lock, dt))
|
|
|
throw new TimeoutException();
|
|
|
|
|
|
if (timeout >= 0) {
|
|
|
dt = timeout - Environment.TickCount + t1;
|
|
|
if (dt < 0)
|
|
|
throw new TimeoutException();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
var data = new T[actual];
|
|
|
Array.Copy(buffer, data, actual);
|
|
|
return data;
|
|
|
}
|
|
|
|
|
|
public T[] GetRange(int max) {
|
|
|
Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
|
|
|
|
|
|
var buffer = new T[max];
|
|
|
int actual;
|
|
|
if (!TryDequeueRange(buffer, 0, max, out actual))
|
|
|
lock (m_lock)
|
|
|
while (!TryDequeueRange(buffer, 0, max, out actual))
|
|
|
Monitor.Wait(m_lock);
|
|
|
|
|
|
var data = new T[actual];
|
|
|
Array.Copy(buffer, data, actual);
|
|
|
return data;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|