##// END OF EJS Templates
missing file
missing file

File last commit:

r251:7c7e9ad6fe4a v3
r258:d0876436d95d v3
Show More
BlockingQueue.cs
101 lines | 3.0 KiB | text/x-csharp | CSharpLexer
cin
added the blocking queue
r137 using System;
using System.Threading;
namespace Implab.Parallels {
public class BlockingQueue<T> : AsyncQueue<T> {
readonly object m_lock = new object();
cin
Improved AsyncQueue...
r233 public void EnqueuePulse(T value) {
cin
added the blocking queue
r137 base.Enqueue(value);
lock (m_lock)
Monitor.Pulse(m_lock);
}
cin
Improved AsyncQueue...
r233 public void EnqueueRangePulse(T[] data, int offset, int length) {
cin
added the blocking queue
r137 base.EnqueueRange(data, offset, length);
if (length > 1)
lock (m_lock)
Monitor.PulseAll(m_lock);
else
lock (m_lock)
Monitor.Pulse(m_lock);
}
public T GetItem(int timeout) {
T item;
cin
fixed blocking queue
r139
if (!TryDequeue(out item)) {
var t1 = Environment.TickCount;
var dt = timeout;
lock (m_lock) {
while (!TryDequeue(out item)) {
if (!Monitor.Wait(m_lock, dt))
throw new TimeoutException();
if (timeout >= 0) {
dt = timeout - Environment.TickCount + t1;
if (dt < 0)
throw new TimeoutException();
}
}
cin
added the blocking queue
r137 }
}
return item;
}
public T GetItem() {
T item;
cin
fixed blocking queue
r139 if (!TryDequeue(out item))
lock (m_lock) {
while (!TryDequeue(out item))
Monitor.Wait(m_lock);
}
cin
added the blocking queue
r137 return item;
}
public T[] GetRange(int max, int timeout) {
cin
Prerelease version of RunnableComponent...
r251 Safe.ArgumentInRange(max > 0 , nameof(max));
cin
added the blocking queue
r137
var buffer = new T[max];
int actual;
cin
fixed blocking queue
r139 if (!TryDequeueRange(buffer, 0, max, out actual)) {
var t1 = Environment.TickCount;
var dt = timeout;
lock (m_lock) {
while (!TryDequeueRange(buffer, 0, max, out actual)) {
if (!Monitor.Wait(m_lock, dt))
throw new TimeoutException();
if (timeout >= 0) {
dt = timeout - Environment.TickCount + t1;
if (dt < 0)
throw new TimeoutException();
}
}
cin
added the blocking queue
r137 }
}
var data = new T[actual];
Array.Copy(buffer, data, actual);
return data;
}
public T[] GetRange(int max) {
cin
Prerelease version of RunnableComponent...
r251 Safe.ArgumentInRange(max > 0, nameof(max));
cin
added the blocking queue
r137
var buffer = new T[max];
int actual;
cin
fixed blocking queue
r139 if (!TryDequeueRange(buffer, 0, max, out actual))
cin
added the blocking queue
r137 lock (m_lock)
cin
fixed blocking queue
r139 while (!TryDequeueRange(buffer, 0, max, out actual))
Monitor.Wait(m_lock);
cin
added the blocking queue
r137
var data = new T[actual];
Array.Copy(buffer, data, actual);
return data;
}
}
}