BlockingQueue.cs
101 lines
| 3.1 KiB
| text/x-csharp
|
CSharpLexer
cin
|
r137 | using System; | ||
using System.Threading; | ||||
namespace Implab.Parallels { | ||||
public class BlockingQueue<T> : AsyncQueue<T> { | ||||
readonly object m_lock = new object(); | ||||
cin
|
r233 | public void EnqueuePulse(T value) { | ||
cin
|
r137 | base.Enqueue(value); | ||
lock (m_lock) | ||||
Monitor.Pulse(m_lock); | ||||
} | ||||
cin
|
r233 | public void EnqueueRangePulse(T[] data, int offset, int length) { | ||
cin
|
r137 | base.EnqueueRange(data, offset, length); | ||
if (length > 1) | ||||
lock (m_lock) | ||||
Monitor.PulseAll(m_lock); | ||||
else | ||||
lock (m_lock) | ||||
Monitor.Pulse(m_lock); | ||||
} | ||||
public T GetItem(int timeout) { | ||||
T item; | ||||
cin
|
r139 | |||
if (!TryDequeue(out item)) { | ||||
var t1 = Environment.TickCount; | ||||
var dt = timeout; | ||||
lock (m_lock) { | ||||
while (!TryDequeue(out item)) { | ||||
if (!Monitor.Wait(m_lock, dt)) | ||||
throw new TimeoutException(); | ||||
if (timeout >= 0) { | ||||
dt = timeout - Environment.TickCount + t1; | ||||
if (dt < 0) | ||||
throw new TimeoutException(); | ||||
} | ||||
} | ||||
cin
|
r137 | } | ||
} | ||||
return item; | ||||
} | ||||
public T GetItem() { | ||||
T item; | ||||
cin
|
r139 | if (!TryDequeue(out item)) | ||
lock (m_lock) { | ||||
while (!TryDequeue(out item)) | ||||
Monitor.Wait(m_lock); | ||||
} | ||||
cin
|
r137 | return item; | ||
} | ||||
public T[] GetRange(int max, int timeout) { | ||||
Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); | ||||
var buffer = new T[max]; | ||||
int actual; | ||||
cin
|
r139 | if (!TryDequeueRange(buffer, 0, max, out actual)) { | ||
var t1 = Environment.TickCount; | ||||
var dt = timeout; | ||||
lock (m_lock) { | ||||
while (!TryDequeueRange(buffer, 0, max, out actual)) { | ||||
if (!Monitor.Wait(m_lock, dt)) | ||||
throw new TimeoutException(); | ||||
if (timeout >= 0) { | ||||
dt = timeout - Environment.TickCount + t1; | ||||
if (dt < 0) | ||||
throw new TimeoutException(); | ||||
} | ||||
} | ||||
cin
|
r137 | } | ||
} | ||||
var data = new T[actual]; | ||||
Array.Copy(buffer, data, actual); | ||||
return data; | ||||
} | ||||
public T[] GetRange(int max) { | ||||
Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); | ||||
var buffer = new T[max]; | ||||
int actual; | ||||
cin
|
r139 | if (!TryDequeueRange(buffer, 0, max, out actual)) | ||
cin
|
r137 | lock (m_lock) | ||
cin
|
r139 | while (!TryDequeueRange(buffer, 0, max, out actual)) | ||
Monitor.Wait(m_lock); | ||||
cin
|
r137 | |||
var data = new T[actual]; | ||||
Array.Copy(buffer, data, actual); | ||||
return data; | ||||
} | ||||
} | ||||
} | ||||