##// END OF EJS Templates
fixed blocking queue
cin -
r139:041b77711262 v2
parent child
Show More
@@ -1,87 +1,101
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3
3
4 namespace Implab.Parallels {
4 namespace Implab.Parallels {
5 public class BlockingQueue<T> : AsyncQueue<T> {
5 public class BlockingQueue<T> : AsyncQueue<T> {
6 readonly object m_lock = new object();
6 readonly object m_lock = new object();
7
7
8 public override void Enqueue(T value) {
8 public override void Enqueue(T value) {
9 base.Enqueue(value);
9 base.Enqueue(value);
10 lock (m_lock)
10 lock (m_lock)
11 Monitor.Pulse(m_lock);
11 Monitor.Pulse(m_lock);
12 }
12 }
13
13
14 public override void EnqueueRange(T[] data, int offset, int length) {
14 public override void EnqueueRange(T[] data, int offset, int length) {
15 base.EnqueueRange(data, offset, length);
15 base.EnqueueRange(data, offset, length);
16 if (length > 1)
16 if (length > 1)
17 lock (m_lock)
17 lock (m_lock)
18 Monitor.PulseAll(m_lock);
18 Monitor.PulseAll(m_lock);
19 else
19 else
20 lock (m_lock)
20 lock (m_lock)
21 Monitor.Pulse(m_lock);
21 Monitor.Pulse(m_lock);
22 }
22 }
23
23
24 public T GetItem(int timeout) {
24 public T GetItem(int timeout) {
25 T item;
25 T item;
26
27 if (!TryDequeue(out item)) {
26 var t1 = Environment.TickCount;
28 var t1 = Environment.TickCount;
27 var dt = timeout;
29 var dt = timeout;
30
31 lock (m_lock) {
28 while (!TryDequeue(out item)) {
32 while (!TryDequeue(out item)) {
29 lock (m_lock)
30 if (!Monitor.Wait(m_lock, dt))
33 if (!Monitor.Wait(m_lock, dt))
31 throw new TimeoutException();
34 throw new TimeoutException();
32 if (timeout >= 0) {
35 if (timeout >= 0) {
33 dt = timeout - Environment.TickCount + t1;
36 dt = timeout - Environment.TickCount + t1;
34 if (dt < 0)
37 if (dt < 0)
35 throw new TimeoutException();
38 throw new TimeoutException();
36 }
39 }
37 }
40 }
41 }
42 }
38 return item;
43 return item;
39 }
44 }
40
45
41 public T GetItem() {
46 public T GetItem() {
42 T item;
47 T item;
48 if (!TryDequeue(out item))
49 lock (m_lock) {
43 while (!TryDequeue(out item))
50 while (!TryDequeue(out item))
44 lock (m_lock)
45 Monitor.Wait(m_lock);
51 Monitor.Wait(m_lock);
52 }
46 return item;
53 return item;
47 }
54 }
48
55
49 public T[] GetRange(int max, int timeout) {
56 public T[] GetRange(int max, int timeout) {
50 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
57 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
51
58
52 var buffer = new T[max];
59 var buffer = new T[max];
53 int actual;
60 int actual;
61 if (!TryDequeueRange(buffer, 0, max, out actual)) {
54 var t1 = Environment.TickCount;
62 var t1 = Environment.TickCount;
55 var dt = timeout;
63 var dt = timeout;
64
65 lock (m_lock) {
56 while (!TryDequeueRange(buffer,0,max,out actual)) {
66 while (!TryDequeueRange(buffer, 0, max, out actual)) {
57 lock (m_lock)
67
58 if (!Monitor.Wait(m_lock, dt))
68 if (!Monitor.Wait(m_lock, dt))
59 throw new TimeoutException();
69 throw new TimeoutException();
70
60 if (timeout >= 0) {
71 if (timeout >= 0) {
61 dt = timeout - Environment.TickCount + t1;
72 dt = timeout - Environment.TickCount + t1;
62 if (dt < 0)
73 if (dt < 0)
63 throw new TimeoutException();
74 throw new TimeoutException();
64 }
75 }
65 }
76 }
77 }
78 }
66
79
67 var data = new T[actual];
80 var data = new T[actual];
68 Array.Copy(buffer, data, actual);
81 Array.Copy(buffer, data, actual);
69 return data;
82 return data;
70 }
83 }
71
84
72 public T[] GetRange(int max) {
85 public T[] GetRange(int max) {
73 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
86 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
74
87
75 var buffer = new T[max];
88 var buffer = new T[max];
76 int actual;
89 int actual;
90 if (!TryDequeueRange(buffer, 0, max, out actual))
91 lock (m_lock)
77 while (!TryDequeueRange(buffer, 0, max, out actual))
92 while (!TryDequeueRange(buffer, 0, max, out actual))
78 lock (m_lock)
79 Monitor.Wait(m_lock);
93 Monitor.Wait(m_lock);
80
94
81 var data = new T[actual];
95 var data = new T[actual];
82 Array.Copy(buffer, data, actual);
96 Array.Copy(buffer, data, actual);
83 return data;
97 return data;
84 }
98 }
85 }
99 }
86 }
100 }
87
101
General Comments 0
You need to be logged in to leave comments. Login now