##// END OF EJS Templates
added the blocking queue
cin -
r137:238e15580926 v2
parent child
Show More
@@ -0,0 +1,87
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
5 public class BlockingQueue<T> : AsyncQueue<T> {
6 readonly object m_lock = new object();
7
8 public override void Enqueue(T value) {
9 base.Enqueue(value);
10 lock (m_lock)
11 Monitor.Pulse(m_lock);
12 }
13
14 public override void EnqueueRange(T[] data, int offset, int length) {
15 base.EnqueueRange(data, offset, length);
16 if (length > 1)
17 lock (m_lock)
18 Monitor.PulseAll(m_lock);
19 else
20 lock (m_lock)
21 Monitor.Pulse(m_lock);
22 }
23
24 public T GetItem(int timeout) {
25 T item;
26 var t1 = Environment.TickCount;
27 var dt = timeout;
28 while (!TryDequeue(out item)) {
29 lock (m_lock)
30 if (!Monitor.Wait(m_lock, dt))
31 throw new TimeoutException();
32 if (timeout >= 0) {
33 dt = timeout - Environment.TickCount + t1;
34 if (dt < 0)
35 throw new TimeoutException();
36 }
37 }
38 return item;
39 }
40
41 public T GetItem() {
42 T item;
43 while (!TryDequeue(out item))
44 lock (m_lock)
45 Monitor.Wait(m_lock);
46 return item;
47 }
48
49 public T[] GetRange(int max, int timeout) {
50 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
51
52 var buffer = new T[max];
53 int actual;
54 var t1 = Environment.TickCount;
55 var dt = timeout;
56 while (!TryDequeueRange(buffer,0,max,out actual)) {
57 lock (m_lock)
58 if (!Monitor.Wait(m_lock, dt))
59 throw new TimeoutException();
60 if (timeout >= 0) {
61 dt = timeout - Environment.TickCount + t1;
62 if (dt < 0)
63 throw new TimeoutException();
64 }
65 }
66
67 var data = new T[actual];
68 Array.Copy(buffer, data, actual);
69 return data;
70 }
71
72 public T[] GetRange(int max) {
73 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
74
75 var buffer = new T[max];
76 int actual;
77 while (!TryDequeueRange(buffer, 0, max, out actual))
78 lock (m_lock)
79 Monitor.Wait(m_lock);
80
81 var data = new T[actual];
82 Array.Copy(buffer, data, actual);
83 return data;
84 }
85 }
86 }
87
@@ -155,6 +155,7
155 155 <Compile Include="Parallels\SharedLock.cs" />
156 156 <Compile Include="Diagnostics\ILogWriter.cs" />
157 157 <Compile Include="Diagnostics\ListenerBase.cs" />
158 <Compile Include="Parallels\BlockingQueue.cs" />
158 159 </ItemGroup>
159 160 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
160 161 <ItemGroup />
@@ -158,7 +158,7 namespace Implab.Parallels {
158 158 /// Adds the specified value to the queue.
159 159 /// </summary>
160 160 /// <param name="value">Tha value which will be added to the queue.</param>
161 public void Enqueue(T value) {
161 public virtual void Enqueue(T value) {
162 162 var last = m_last;
163 163 // spin wait to the new chunk
164 164 bool extend = true;
@@ -184,7 +184,7 namespace Implab.Parallels {
184 184 /// <param name="data">The buffer which contains the data to be enqueued.</param>
185 185 /// <param name="offset">The offset of the data in the buffer.</param>
186 186 /// <param name="length">The size of the data to read from the buffer.</param>
187 public void EnqueueRange(T[] data, int offset, int length) {
187 public virtual void EnqueueRange(T[] data, int offset, int length) {
188 188 if (data == null)
189 189 throw new ArgumentNullException("data");
190 190 if (length == 0)
General Comments 0
You need to be logged in to leave comments. Login now