# HG changeset patch
# User cin
# Date 2015-02-16 14:48:39
# Node ID 238e15580926ee5d0388c090e6e1602f20f68456
# Parent e9e7940c7d98aab2fe347fd4fd25e8ad5fe60a19
added the blocking queue
diff --git a/Implab/Implab.csproj b/Implab/Implab.csproj
--- a/Implab/Implab.csproj
+++ b/Implab/Implab.csproj
@@ -155,6 +155,7 @@
+
diff --git a/Implab/Parallels/AsyncQueue.cs b/Implab/Parallels/AsyncQueue.cs
--- a/Implab/Parallels/AsyncQueue.cs
+++ b/Implab/Parallels/AsyncQueue.cs
@@ -158,7 +158,7 @@ namespace Implab.Parallels {
/// Adds the specified value to the queue.
///
/// Tha value which will be added to the queue.
- public void Enqueue(T value) {
+ public virtual void Enqueue(T value) {
var last = m_last;
// spin wait to the new chunk
bool extend = true;
@@ -184,7 +184,7 @@ namespace Implab.Parallels {
/// The buffer which contains the data to be enqueued.
/// The offset of the data in the buffer.
/// The size of the data to read from the buffer.
- public void EnqueueRange(T[] data, int offset, int length) {
+ public virtual void EnqueueRange(T[] data, int offset, int length) {
if (data == null)
throw new ArgumentNullException("data");
if (length == 0)
diff --git a/Implab/Parallels/BlockingQueue.cs b/Implab/Parallels/BlockingQueue.cs
new file mode 100644
--- /dev/null
+++ b/Implab/Parallels/BlockingQueue.cs
@@ -0,0 +1,128 @@
+using System;
+using System.Threading;
+
+namespace Implab.Parallels {
+    /// <summary>
+    /// A queue whose consumers can block until the data becomes available.
+    /// </summary>
+    /// <remarks>
+    /// Producers add the data through the base implementation and then signal
+    /// the internal lock object. Consumers re-check the queue while holding that
+    /// lock before they wait on it, so a signal sent between the check and the
+    /// wait can't be lost.
+    /// </remarks>
+    public class BlockingQueue<T> : AsyncQueue<T> {
+        readonly object m_lock = new object();
+
+        /// <summary>
+        /// Adds the specified value to the queue and wakes up one waiting consumer.
+        /// </summary>
+        /// <param name="value">The value which will be added to the queue.</param>
+        public override void Enqueue(T value) {
+            base.Enqueue(value);
+            lock (m_lock)
+                Monitor.Pulse(m_lock);
+        }
+
+        /// <summary>
+        /// Adds the specified range of values to the queue and wakes up the waiting consumers.
+        /// </summary>
+        /// <param name="data">The buffer which contains the data to be enqueued.</param>
+        /// <param name="offset">The offset of the data in the buffer.</param>
+        /// <param name="length">The size of the data to read from the buffer.</param>
+        public override void EnqueueRange(T[] data, int offset, int length) {
+            base.EnqueueRange(data, offset, length);
+            lock (m_lock) {
+                // several items can serve several consumers, a single item only one
+                if (length > 1)
+                    Monitor.PulseAll(m_lock);
+                else
+                    Monitor.Pulse(m_lock);
+            }
+        }
+
+        /// <summary>
+        /// Dequeues an item, waits for the specified time if the queue is empty.
+        /// </summary>
+        /// <param name="timeout">The time to wait in milliseconds, <see cref="Timeout.Infinite"/> to wait indefinitely.</param>
+        /// <returns>The dequeued item.</returns>
+        /// <exception cref="TimeoutException">No item has been dequeued within the specified time.</exception>
+        public T GetItem(int timeout) {
+            T item;
+            var t1 = Environment.TickCount;
+            // fast path: the lock isn't taken when the data is already there
+            if (TryDequeue(out item))
+                return item;
+
+            lock (m_lock) {
+                // The queue is re-checked under the lock: producers pulse only while
+                // holding it, so no pulse can slip in between the check and the wait.
+                while (!TryDequeue(out item))
+                    WaitForData(timeout, t1);
+            }
+            return item;
+        }
+
+        /// <summary>
+        /// Dequeues an item, waits indefinitely if the queue is empty.
+        /// </summary>
+        /// <returns>The dequeued item.</returns>
+        public T GetItem() {
+            return GetItem(Timeout.Infinite);
+        }
+
+        /// <summary>
+        /// Dequeues up to <paramref name="max"/> items, waits for the specified time if the queue is empty.
+        /// </summary>
+        /// <param name="max">The maximum number of items to dequeue, must be at least 1.</param>
+        /// <param name="timeout">The time to wait in milliseconds, <see cref="Timeout.Infinite"/> to wait indefinitely.</param>
+        /// <returns>The array with the dequeued items, its length is the number of items actually dequeued.</returns>
+        /// <exception cref="TimeoutException">Nothing has been dequeued within the specified time.</exception>
+        public T[] GetRange(int max, int timeout) {
+            Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
+
+            var buffer = new T[max];
+            int actual;
+            var t1 = Environment.TickCount;
+            if (!TryDequeueRange(buffer, 0, max, out actual)) {
+                lock (m_lock) {
+                    // re-check under the lock, see GetItem(int)
+                    while (!TryDequeueRange(buffer, 0, max, out actual))
+                        WaitForData(timeout, t1);
+                }
+            }
+
+            // shrinks the buffer to the actual size, no copy is made when it's full
+            Array.Resize(ref buffer, actual);
+            return buffer;
+        }
+
+        /// <summary>
+        /// Dequeues up to <paramref name="max"/> items, waits indefinitely if the queue is empty.
+        /// </summary>
+        /// <param name="max">The maximum number of items to dequeue, must be at least 1.</param>
+        /// <returns>The array with the dequeued items, its length is the number of items actually dequeued.</returns>
+        public T[] GetRange(int max) {
+            return GetRange(max, Timeout.Infinite);
+        }
+
+        /// <summary>
+        /// Waits for a signal from a producer, must be called with <c>m_lock</c> held.
+        /// </summary>
+        /// <param name="timeout">The total time in milliseconds allowed for the operation, <see cref="Timeout.Infinite"/> means no limit.</param>
+        /// <param name="start">The value of <see cref="Environment.TickCount"/> when the operation started.</param>
+        /// <exception cref="TimeoutException">The allowed time has elapsed.</exception>
+        void WaitForData(int timeout, int start) {
+            var dt = timeout;
+            if (timeout >= 0) {
+                // unchecked subtraction stays correct when TickCount wraps around
+                dt = timeout - unchecked(Environment.TickCount - start);
+                if (dt < 0)
+                    throw new TimeoutException();
+            }
+            if (!Monitor.Wait(m_lock, dt))
+                throw new TimeoutException();
+        }
+    }
+}
+