diff --git a/Implab/Implab.csproj b/Implab/Implab.csproj --- a/Implab/Implab.csproj +++ b/Implab/Implab.csproj @@ -155,6 +155,7 @@ + diff --git a/Implab/Parallels/AsyncQueue.cs b/Implab/Parallels/AsyncQueue.cs --- a/Implab/Parallels/AsyncQueue.cs +++ b/Implab/Parallels/AsyncQueue.cs @@ -158,7 +158,7 @@ namespace Implab.Parallels { /// Adds the specified value to the queue. /// /// Tha value which will be added to the queue. - public void Enqueue(T value) { + public virtual void Enqueue(T value) { var last = m_last; // spin wait to the new chunk bool extend = true; @@ -184,7 +184,7 @@ namespace Implab.Parallels { /// The buffer which contains the data to be enqueued. /// The offset of the data in the buffer. /// The size of the data to read from the buffer. - public void EnqueueRange(T[] data, int offset, int length) { + public virtual void EnqueueRange(T[] data, int offset, int length) { if (data == null) throw new ArgumentNullException("data"); if (length == 0) diff --git a/Implab/Parallels/BlockingQueue.cs b/Implab/Parallels/BlockingQueue.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/BlockingQueue.cs @@ -0,0 +1,87 @@ +using System; +using System.Threading; + +namespace Implab.Parallels { + public class BlockingQueue : AsyncQueue { + readonly object m_lock = new object(); + + public override void Enqueue(T value) { + base.Enqueue(value); + lock (m_lock) + Monitor.Pulse(m_lock); + } + + public override void EnqueueRange(T[] data, int offset, int length) { + base.EnqueueRange(data, offset, length); + if (length > 1) + lock (m_lock) + Monitor.PulseAll(m_lock); + else + lock (m_lock) + Monitor.Pulse(m_lock); + } + + public T GetItem(int timeout) { + T item; + var t1 = Environment.TickCount; + var dt = timeout; + while (!TryDequeue(out item)) { + lock (m_lock) + if (!Monitor.Wait(m_lock, dt)) + throw new TimeoutException(); + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + throw new TimeoutException(); + } + } + return item; + } + + public T GetItem() { + T item; + while (!TryDequeue(out item)) + lock (m_lock) + Monitor.Wait(m_lock); + return item; + } + + public T[] GetRange(int max, int timeout) { + Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + + var buffer = new T[max]; + int actual; + var t1 = Environment.TickCount; + var dt = timeout; + while (!TryDequeueRange(buffer,0,max,out actual)) { + lock (m_lock) + if (!Monitor.Wait(m_lock, dt)) + throw new TimeoutException(); + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + throw new TimeoutException(); + } + } + + var data = new T[actual]; + Array.Copy(buffer, data, actual); + return data; + } + + public T[] GetRange(int max) { + Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + + var buffer = new T[max]; + int actual; + while (!TryDequeueRange(buffer, 0, max, out actual)) + lock (m_lock) + Monitor.Wait(m_lock); + + var data = new T[actual]; + Array.Copy(buffer, data, actual); + return data; + } + } +} +