##// END OF EJS Templates
Improved AsyncQueue...
cin -
r233:d6fe09f5592c v2
parent child
Show More
@@ -0,0 +1,131
1 using System.Threading;
2 using System.Collections.Generic;
3 using System;
4 using System.Collections;
5
6 namespace Implab.Parallels {
7 public class SimpleAsyncQueue<T> : IEnumerable<T> {
8 class Node {
9 public Node(T value) {
10 this.value = value;
11 }
12 public readonly T value;
13 public volatile Node next;
14 }
15
16 // the reader and the writer are mainteined completely independent,
17 // the reader can read next item when m_first.next is not null
18 // the writer creates the a new node, moves m_last to this node and
19 // only after that restores the reference from the previous node
20 // making available the reader to read the new node.
21
22 Node m_first; // position on the node which is already read
23 Node m_last; // position on the node which is already written
24
25 public SimpleAsyncQueue() {
26 m_first = m_last = new Node(default(T));
27 }
28
29 public void Enqueue(T value) {
30 var next = new Node(value);
31
32 // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
33 // to ensure that the next node is completely constructed
34 var last = Interlocked.Exchange(ref m_last, next);
35
36 // release-fence
37 last.next = next;
38
39 }
40
41 public bool TryDequeue(out T value) {
42 Node first;
43 Node next;
44
45 Thread.MemoryBarrier(); // ensure m_first is fresh
46 SpinWait spin = new SpinWait();
47 do {
48 first = m_first;
49 // aquire-fence
50 next = first.next;
51 if (next == null) {
52 value = default(T);
53 return false;
54 }
55
56 if (first == Interlocked.CompareExchange(ref m_first, next, first))
57 // head succesfully updated
58 break;
59 spin.SpinOnce();
60 } while (true);
61
62 value = next.value;
63 return true;
64 }
65
66 #region IEnumerable implementation
67
68 class Enumerator : IEnumerator<T> {
69 Node m_current;
70 Node m_first;
71
72 public Enumerator(Node first) {
73 m_first = first;
74 }
75
76 #region IEnumerator implementation
77
78 public bool MoveNext() {
79 m_current = m_current == null ? m_first : m_current.next;
80 return m_current != null;
81 }
82
83 public void Reset() {
84 m_current = null;
85 }
86
87 object IEnumerator.Current {
88 get {
89 if (m_current == null)
90 throw new InvalidOperationException();
91 return m_current.value;
92 }
93 }
94
95 #endregion
96
97 #region IDisposable implementation
98
99 public void Dispose() {
100 }
101
102 #endregion
103
104 #region IEnumerator implementation
105
106 public T Current {
107 get {
108 if (m_current == null)
109 throw new InvalidOperationException();
110 return m_current.value;
111 }
112 }
113
114 #endregion
115 }
116
117 public IEnumerator<T> GetEnumerator() {
118 return new Enumerator(m_first);
119 }
120
121 #endregion
122
123 #region IEnumerable implementation
124
125 IEnumerator IEnumerable.GetEnumerator() {
126 return GetEnumerator();
127 }
128
129 #endregion
130 }
131 }
@@ -1,6 +1,6
1 <?xml version="1.0" encoding="utf-8" ?>
1 <?xml version="1.0" encoding="utf-8"?>
2 <configuration>
2 <configuration>
3 <startup>
3 <startup>
4 <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
4 <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5"/>
5 </startup>
5 </startup>
6 </configuration> No newline at end of file
6 </configuration>
@@ -9,9 +9,10
9 <AppDesignerFolder>Properties</AppDesignerFolder>
9 <AppDesignerFolder>Properties</AppDesignerFolder>
10 <RootNamespace>Implab.Playground</RootNamespace>
10 <RootNamespace>Implab.Playground</RootNamespace>
11 <AssemblyName>Implab.Playground</AssemblyName>
11 <AssemblyName>Implab.Playground</AssemblyName>
12 <TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
12 <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
13 <FileAlignment>512</FileAlignment>
13 <FileAlignment>512</FileAlignment>
14 <AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
14 <AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
15 <TargetFrameworkProfile />
15 </PropertyGroup>
16 </PropertyGroup>
16 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
17 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
17 <PlatformTarget>AnyCPU</PlatformTarget>
18 <PlatformTarget>AnyCPU</PlatformTarget>
@@ -1,10 +1,13
1 using Implab.Formats.Json;
1 using Implab.Formats.Json;
2 using Implab.Parallels;
2 using Implab.Xml;
3 using Implab.Xml;
3 using System;
4 using System;
5 using System.Collections.Concurrent;
4 using System.Collections.Generic;
6 using System.Collections.Generic;
5 using System.IO;
7 using System.IO;
6 using System.Linq;
8 using System.Linq;
7 using System.Text;
9 using System.Text;
10 using System.Threading;
8 using System.Threading.Tasks;
11 using System.Threading.Tasks;
9 using System.Xml;
12 using System.Xml;
10 using System.Xml.Serialization;
13 using System.Xml.Serialization;
@@ -12,29 +15,167 using System.Xml.Serialization;
12 namespace Implab.Playground {
15 namespace Implab.Playground {
13 public class Program {
16 public class Program {
14
17
15 [XmlRoot(Namespace = "XmlSimpleData")]
18 static void EnqueueRange<T>(ConcurrentQueue<T> q, T[] data, int offset, int len) {
16 public class XmlSimpleModel {
19 for (var i = offset; i < offset + len; i++)
17 [XmlElement]
20 q.Enqueue(data[i]);
18 public string Name { get; set; }
21 }
22
23 static bool TryDequeueRange<T>(ConcurrentQueue<T> q,T[] buffer,int offset, int len, out int actual) {
24 actual = 0;
25 T res;
26 while(q.TryDequeue(out res)) {
27 buffer[offset + actual] = res;
28 actual++;
29 if (actual == len)
30 break;
31 }
32 return actual != 0;
33 }
34
35 static void EnqueueRange<T>(SimpleAsyncQueue<T> q, T[] data, int offset, int len) {
36 for (var i = offset; i < offset + len; i++)
37 q.Enqueue(data[i]);
38 }
19
39
20 [XmlElement]
40 static bool TryDequeueRange<T>(SimpleAsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
21 public int Order { get; set; }
41 actual = 0;
42 T res;
43 while (q.TryDequeue(out res)) {
44 buffer[offset + actual] = res;
45 actual++;
46 if (actual == len)
47 break;
48 }
49 return actual != 0;
50 }
22
51
23 [XmlElement]
52 /*
24 public string[] Items { get; set; }
53 static void EnqueueRange<T>(AsyncQueue<T> q, T[] data, int offset, int len) {
54 for (var i = offset; i < offset + len; i++)
55 q.Enqueue(data[i]);
56 }
25
57
58 static bool TryDequeueRange<T>(AsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
59 actual = 0;
60 T res;
61 while (q.TryDequeue(out res)) {
62 buffer[offset + actual] = res;
63 actual++;
64 if (actual == len)
65 break;
66 }
67 return actual != 0;
68 }
69 */
70
71 static void EnqueueRange<T>(AsyncQueue<T> q, T[] data, int offset, int len) {
72 q.EnqueueRange(data, offset, len);
26 }
73 }
27
74
75 static bool TryDequeueRange<T>(AsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
76 return q.TryDequeueRange(buffer, offset, len, out actual);
77 }
78
79
28 static void Main(string[] args) {
80 static void Main(string[] args) {
29 var model = new XmlSimpleModel {
30 Name = "Tablet",
31 Order = 10,
32 Items = new string[] { "z1", "z2", "z3" }
33 };
34
81
35 var doc = SerializationHelpers.SerializeAsXmlDocument(model);
82 //var queue = new ConcurrentQueue<int>();
36
83 var queue = new AsyncQueue<int>();
37 var m2 = SerializationHelpers.DeserializeFromXmlNode<XmlSimpleModel>(doc.DocumentElement);
84 //var queue = new SimpleAsyncQueue<int>();
85
86 const int wBatch = 32;
87 const long wCount = 1000000;
88 const long total = wBatch * wCount * 3;
89
90 long r1 = 0, r2 = 0, r3 = 0;
91 const int rBatch = 1000;
92 long read = 0;
93
94 var t1 = Environment.TickCount;
95
96 AsyncPool.RunThread(
97 () => {
98 var buffer = new int[wBatch];
99 for (int i = 0; i < wBatch; i++)
100 buffer[i] = 1;
101
102 for (int i = 0; i < wCount; i++)
103 EnqueueRange(queue, buffer, 0, wBatch);
104 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
105 },
106 () => {
107 var buffer = new int[wBatch];
108 for (int i = 0; i < wBatch; i++)
109 buffer[i] = 1;
110
111 for (int i = 0; i < wCount; i++)
112 EnqueueRange(queue, buffer, 0, wBatch);
113 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
114 },
115 () => {
116 var buffer = new int[wBatch];
117 for (int i = 0; i < wBatch; i++)
118 buffer[i] = 1;
119
120 for (int i = 0; i < wCount; i++)
121 EnqueueRange(queue, buffer, 0, wBatch);
122 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
123 },
124 () => {
125 var buffer = new int[rBatch];
126
127 while (read < total) {
128 int actual;
129 if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
130 for (int i = 0; i < actual; i++)
131 r1 += buffer[i];
132 Interlocked.Add(ref read, actual);
133 }
134 }
135
136 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
137 }/*,
138 () => {
139 var buffer = new int[rBatch];
140
141 while (read < total) {
142 int actual;
143 if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
144 for (int i = 0; i < actual; i++)
145 r2 += buffer[i];
146 Interlocked.Add(ref read, actual);
147 }
148 }
149
150 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
151 }*//*,
152 () => {
153 var buffer = new int[rBatch];
154
155 while (read < total) {
156 int actual;
157 if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
158 for (int i = 0; i < actual; i++)
159 r3 += buffer[i];
160 Interlocked.Add(ref read, actual);
161 }
162 }
163
164 Console.WriteLine("done reader #3: {0} ms", Environment.TickCount - t1);
165 }*/
166 )
167 .PromiseAll()
168 .Join();
169
170
171 Console.WriteLine(
172 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
173 Environment.TickCount - t1,
174 r1,
175 r2,
176 r1 + r2 + r3,
177 total
178 );
38
179
39 Console.WriteLine("done");
180 Console.WriteLine("done");
40 }
181 }
@@ -48,8 +48,8
48 <LaunchProject>true</LaunchProject>
48 <LaunchProject>true</LaunchProject>
49 <OverrideProjectSettings>false</OverrideProjectSettings>
49 <OverrideProjectSettings>false</OverrideProjectSettings>
50 <LaunchMethod>Executable</LaunchMethod>
50 <LaunchMethod>Executable</LaunchMethod>
51 <ExecutablePath>Implab.Playground\bin\Release\Implab.Playground.exe</ExecutablePath>
51 <ExecutablePath>Implab.Playground\bin\Debug\Implab.Playground.exe</ExecutablePath>
52 <StartupDirectory>Implab.Playground\bin\Release\</StartupDirectory>
52 <StartupDirectory>Implab.Playground\bin\Debug\</StartupDirectory>
53 <Arguments>
53 <Arguments>
54 </Arguments>
54 </Arguments>
55 <NetAppHost>IIS</NetAppHost>
55 <NetAppHost>IIS</NetAppHost>
@@ -67,9 +67,4
67 <ProjName>Implab.Playground</ProjName>
67 <ProjName>Implab.Playground</ProjName>
68 </ProjBinary>
68 </ProjBinary>
69 </Binaries>
69 </Binaries>
70 <Launches>
71 <ProjBinary>
72 <Path>:PB:{100DFEB0-75BE-436F-ADDF-1F46EF433F46}|Implab.Playground\Implab.Playground.csproj</Path>
73 </ProjBinary>
74 </Launches>
75 </VSPerformanceSession> No newline at end of file
70 </VSPerformanceSession>
@@ -222,9 +222,9 namespace Implab.Test {
222
222
223 [TestMethod]
223 [TestMethod]
224 public void MTQueueTest() {
224 public void MTQueueTest() {
225 var queue = new MTQueue<int>();
225 var queue = new SimpleAsyncQueue<int>();
226 int res;
226 int res;
227
227
228 queue.Enqueue(10);
228 queue.Enqueue(10);
229 Assert.IsTrue(queue.TryDequeue(out res));
229 Assert.IsTrue(queue.TryDequeue(out res));
230 Assert.AreEqual(10, res);
230 Assert.AreEqual(10, res);
@@ -242,8 +242,9 namespace Implab.Test {
242 int readers = 0;
242 int readers = 0;
243 var stop = new ManualResetEvent(false);
243 var stop = new ManualResetEvent(false);
244 int total = 0;
244 int total = 0;
245 var ticks = Environment.TickCount;
245
246
246 const int itemsPerWriter = 10000;
247 const int itemsPerWriter = 1000000;
247 const int writersCount = 10;
248 const int writersCount = 10;
248
249
249 for (int i = 0; i < writersCount; i++) {
250 for (int i = 0; i < writersCount; i++) {
@@ -278,7 +279,9 namespace Implab.Test {
278
279
279 stop.WaitOne();
280 stop.WaitOne();
280
281
281 Assert.AreEqual(100000, total);
282 Console.WriteLine("{0} in {1}ms", total, Environment.TickCount - ticks);
283
284 Assert.AreEqual(itemsPerWriter * writersCount, total);
282 }
285 }
283
286
284 [TestMethod]
287 [TestMethod]
@@ -509,13 +512,12 namespace Implab.Test {
509 public void AsyncQueueDrainTest() {
512 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>();
513 var queue = new AsyncQueue<int>();
511
514
512 const int wBatch = 11;
515 const int wBatch = 32;
513 const int wCount = 200000;
516 const int wCount = 200000;
514 const int total = wBatch * wCount * 3;
517 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
518 const int summ = wBatch * wCount * 3;
516
519
517 int r1 = 0, r2 = 0;
520 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
519 int read = 0;
521 int read = 0;
520
522
521 var t1 = Environment.TickCount;
523 var t1 = Environment.TickCount;
@@ -531,8 +533,12 namespace Implab.Test {
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
533 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 },
534 },
533 () => {
535 () => {
534 for(int i =0; i < wCount * wBatch; i++)
536 var buffer = new int[wBatch];
535 queue.Enqueue(1);
537 for (int i = 0; i < wBatch; i++)
538 buffer[i] = 1;
539
540 for (int i = 0; i < wCount; i++)
541 queue.EnqueueRange(buffer, 0, wBatch);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
542 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 },
543 },
538 () => {
544 () => {
@@ -572,25 +578,34 namespace Implab.Test {
572 },*/
578 },*/
573 () => {
579 () => {
574 var count = 0;
580 var count = 0;
575 while(read < total) {
581 int emptyDrains = 0;
582
583 while (read < total) {
576 var buffer = queue.Drain();
584 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
585 if (buffer.Count == 0)
586 emptyDrains++;
587 for(int i=0; i< buffer.Count; i++)
578 r1 += buffer[i];
588 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
589 Interlocked.Add(ref read, buffer.Count);
580 count += buffer.Length;
590 count += buffer.Count;
581 }
591 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
592 Console.WriteLine("done reader #1: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
583 },
593 },
584 () => {
594 () => {
585 var count = 0;
595 var count = 0;
586 while(read < total) {
596 int emptyDrains = 0;
597
598 while (read < total) {
587 var buffer = queue.Drain();
599 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
600 if (buffer.Count == 0)
601 emptyDrains++;
602
603 for (int i=0; i< buffer.Count; i++)
589 r2 += buffer[i];
604 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
605 Interlocked.Add(ref read, buffer.Count);
591 count += buffer.Length;
606 count += buffer.Count;
592 }
607 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
608 Console.WriteLine("done reader #2: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
594 }
609 }
595 )
610 )
596 .PromiseAll()
611 .PromiseAll()
@@ -14,8 +14,6 Project("{2150E333-8FDC-42A3-9474-1A3956
14 EndProject
14 EndProject
15 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test", "Implab.Test\Implab.Test.csproj", "{63F92C0C-61BF-48C0-A377-8D67C3C661D0}"
15 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test", "Implab.Test\Implab.Test.csproj", "{63F92C0C-61BF-48C0-A377-8D67C3C661D0}"
16 EndProject
16 EndProject
17 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Fx", "Implab.Fx\Implab.Fx.csproj", "{06E706F8-6881-43EB-927E-FFC503AF6ABC}"
18 EndProject
19 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Format.Test", "Implab.Format.Test\Implab.Format.Test.csproj", "{4D364996-7ECD-4193-8F90-F223FFEA49DA}"
17 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Format.Test", "Implab.Format.Test\Implab.Format.Test.csproj", "{4D364996-7ECD-4193-8F90-F223FFEA49DA}"
20 EndProject
18 EndProject
21 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Playground", "Implab.Playground\Implab.Playground.csproj", "{100DFEB0-75BE-436F-ADDF-1F46EF433F46}"
19 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Playground", "Implab.Playground\Implab.Playground.csproj", "{100DFEB0-75BE-436F-ADDF-1F46EF433F46}"
@@ -47,14 +45,6 Global
47 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU
45 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU
48 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
46 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
49 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.Build.0 = Release|Any CPU
47 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.Build.0 = Release|Any CPU
50 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug 4.5|Any CPU.ActiveCfg = Debug 4.5|Any CPU
51 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug 4.5|Any CPU.Build.0 = Debug 4.5|Any CPU
52 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
53 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug|Any CPU.Build.0 = Debug|Any CPU
54 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release 4.5|Any CPU.ActiveCfg = Release 4.5|Any CPU
55 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU
56 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release|Any CPU.ActiveCfg = Release|Any CPU
57 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release|Any CPU.Build.0 = Release|Any CPU
58 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.ActiveCfg = Debug|Any CPU
48 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.ActiveCfg = Debug|Any CPU
59 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.Build.0 = Debug|Any CPU
49 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.Build.0 = Debug|Any CPU
60 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
50 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
@@ -24,13 +24,13 namespace Implab {
24
24
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
26 THandler[] m_handlers;
26 THandler[] m_handlers;
27 MTQueue<THandler> m_extraHandlers;
27 SimpleAsyncQueue<THandler> m_extraHandlers;
28 int m_handlerPointer = -1;
28 int m_handlerPointer = -1;
29 int m_handlersCommited;
29 int m_handlersCommited;
30
30
31 int m_cancelRequest;
31 int m_cancelRequest;
32 Exception m_cancelationReason;
32 Exception m_cancelationReason;
33 MTQueue<Action<Exception>> m_cancelationHandlers;
33 SimpleAsyncQueue<Action<Exception>> m_cancelationHandlers;
34
34
35
35
36 #region state managment
36 #region state managment
@@ -182,7 +182,7 namespace Implab {
182 }
182 }
183 } else {
183 } else {
184 if (slot == RESERVED_HANDLERS_COUNT) {
184 if (slot == RESERVED_HANDLERS_COUNT) {
185 m_extraHandlers = new MTQueue<THandler>();
185 m_extraHandlers = new SimpleAsyncQueue<THandler>();
186 } else {
186 } else {
187 while (m_extraHandlers == null)
187 while (m_extraHandlers == null)
188 Thread.MemoryBarrier();
188 Thread.MemoryBarrier();
@@ -245,7 +245,7 namespace Implab {
245 handler(CancellationReason);
245 handler(CancellationReason);
246
246
247 if (m_cancelationHandlers == null)
247 if (m_cancelationHandlers == null)
248 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
248 Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null);
249
249
250 m_cancelationHandlers.Enqueue(handler);
250 m_cancelationHandlers.Enqueue(handler);
251
251
@@ -1,7 +1,7
1 using System;
1 using System;
2
2
3 namespace Implab.Automaton {
3 namespace Implab.Automaton {
4 public struct AutomatonTransition : IEquatable<AutomatonTransition> {
4 public class AutomatonTransition : IEquatable<AutomatonTransition> {
5 public readonly int s1;
5 public readonly int s1;
6 public readonly int s2;
6 public readonly int s2;
7 public readonly int edge;
7 public readonly int edge;
@@ -1,5 +1,5
1 <?xml version="1.0" encoding="utf-8"?>
1 <?xml version="1.0" encoding="utf-8"?>
2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
2 <Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 <PropertyGroup>
3 <PropertyGroup>
4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
@@ -8,6 +8,7
8 <RootNamespace>Implab</RootNamespace>
8 <RootNamespace>Implab</RootNamespace>
9 <AssemblyName>Implab</AssemblyName>
9 <AssemblyName>Implab</AssemblyName>
10 <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
10 <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
11 <TargetFrameworkProfile />
11 </PropertyGroup>
12 </PropertyGroup>
12 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
13 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
13 <DebugSymbols>true</DebugSymbols>
14 <DebugSymbols>true</DebugSymbols>
@@ -97,7 +98,7
97 <Compile Include="ITaskController.cs" />
98 <Compile Include="ITaskController.cs" />
98 <Compile Include="Parallels\DispatchPool.cs" />
99 <Compile Include="Parallels\DispatchPool.cs" />
99 <Compile Include="Parallels\ArrayTraits.cs" />
100 <Compile Include="Parallels\ArrayTraits.cs" />
100 <Compile Include="Parallels\MTQueue.cs" />
101 <Compile Include="Parallels\SimpleAsyncQueue.cs" />
101 <Compile Include="Parallels\WorkerPool.cs" />
102 <Compile Include="Parallels\WorkerPool.cs" />
102 <Compile Include="ProgressInitEventArgs.cs" />
103 <Compile Include="ProgressInitEventArgs.cs" />
103 <Compile Include="Properties\AssemblyInfo.cs" />
104 <Compile Include="Properties\AssemblyInfo.cs" />
@@ -7,11 +7,11 using System.Diagnostics;
7 namespace Implab.Parallels {
7 namespace Implab.Parallels {
8 public class AsyncQueue<T> : IEnumerable<T> {
8 public class AsyncQueue<T> : IEnumerable<T> {
9 class Chunk {
9 class Chunk {
10 public Chunk next;
10 public volatile Chunk next;
11
11
12 int m_low;
12 volatile int m_low;
13 int m_hi;
13 volatile int m_hi;
14 int m_alloc;
14 volatile int m_alloc;
15 readonly int m_size;
15 readonly int m_size;
16 readonly T[] m_data;
16 readonly T[] m_data;
17
17
@@ -28,12 +28,15 namespace Implab.Parallels {
28 m_data[0] = value;
28 m_data[0] = value;
29 }
29 }
30
30
31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 public Chunk(int size, int allocated) {
32 m_size = size;
32 m_size = size;
33 m_hi = length;
33 m_hi = allocated;
34 m_alloc = alloc;
34 m_alloc = allocated;
35 m_data = new T[size];
35 m_data = new T[size];
36 Array.Copy(data, offset, m_data, 0, length);
36 }
37
38 public void WriteData(T[] data, int offset, int dest, int length) {
39 Array.Copy(data, offset, m_data, dest, length);
37 }
40 }
38
41
39 public int Low {
42 public int Low {
@@ -48,31 +51,36 namespace Implab.Parallels {
48 get { return m_size; }
51 get { return m_size; }
49 }
52 }
50
53
51 public bool TryEnqueue(T value, out bool extend) {
54 public bool TryEnqueue(T value) {
52 var alloc = Interlocked.Increment(ref m_alloc) - 1;
55 int alloc;
53
56 do {
54 if (alloc >= m_size) {
57 alloc = m_alloc;
55 extend = alloc == m_size;
58 if (alloc >= m_size)
56 return false;
59 return false;
57 }
60 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));
58
61
59 extend = false;
60 m_data[alloc] = value;
62 m_data[alloc] = value;
61
63
62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
64 SpinWait spin = new SpinWait();
65 // m_hi is volatile
66 while (alloc != m_hi) {
63 // spin wait for commit
67 // spin wait for commit
68 spin.SpinOnce();
64 }
69 }
70 m_hi = alloc + 1;
71
65 return true;
72 return true;
66 }
73 }
67
74
68 /// <summary>
75 /// <summary>
69 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
76 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
70 /// </summary>
77 /// </summary>
71 public void Commit() {
78 public void Seal() {
72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
79 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
73
80 SpinWait spin = new SpinWait();
74 while (m_hi != actual)
81 while (m_hi != actual) {
75 Thread.MemoryBarrier();
82 spin.SpinOnce();
83 }
76 }
84 }
77
85
78 public bool TryDequeue(out T value, out bool recycle) {
86 public bool TryDequeue(out T value, out bool recycle) {
@@ -84,44 +92,38 namespace Implab.Parallels {
84 recycle = (low == m_size);
92 recycle = (low == m_size);
85 return false;
93 return false;
86 }
94 }
87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
95 } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));
88
96
89 recycle = (low == m_size - 1);
97 recycle = (low + 1 == m_size);
90 value = m_data[low];
98 value = m_data[low];
91
99
92 return true;
100 return true;
93 }
101 }
94
102
95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
103 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) {
96 //int alloc;
104 int alloc;
97 //int allocSize;
105 do {
106 alloc = m_alloc;
107 if (alloc >= m_size) {
108 enqueued = 0;
109 return false;
110 } else {
111 enqueued = Math.Min(length, m_size - alloc);
112 }
113 } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc));
114
115 Array.Copy(batch, offset, m_data, alloc, enqueued);
98
116
99 var alloc = Interlocked.Add(ref m_alloc, length) - length;
117 SpinWait spin = new SpinWait();
100 if (alloc > m_size) {
118 while (alloc != m_hi) {
101 // the chunk is full and someone already
119 spin.SpinOnce();
102 // creating the new one
103 enqueued = 0; // nothing was added
104 extend = false; // the caller shouldn't try to extend the queue
105 return false; // nothing was added
106 }
120 }
107
121
108 enqueued = Math.Min(m_size - alloc, length);
122 m_hi = alloc + enqueued;
109 extend = length > enqueued;
110
111 if (enqueued == 0)
112 return false;
113
114
115 Array.Copy(batch, offset, m_data, alloc, enqueued);
116
117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
118 // spin wait for commit
119 }
120
121 return true;
123 return true;
122 }
124 }
123
125
124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
126 public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
125 int low, hi, batchSize;
127 int low, hi, batchSize;
126
128
127 do {
129 do {
@@ -129,15 +131,14 namespace Implab.Parallels {
129 hi = m_hi;
131 hi = m_hi;
130 if (low >= hi) {
132 if (low >= hi) {
131 dequeued = 0;
133 dequeued = 0;
132 recycle = (low == m_size); // recycling could be restarted and we need to signal again
134 recycle = (low == m_size);
133 return false;
135 return false;
134 }
136 }
135 batchSize = Math.Min(hi - low, length);
137 batchSize = Math.Min(hi - low, length);
136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
138 } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
137
139
138 recycle = (low == m_size - batchSize);
139 dequeued = batchSize;
140 dequeued = batchSize;
140
141 recycle = (low + batchSize == m_size);
141 Array.Copy(m_data, low, buffer, offset, batchSize);
142 Array.Copy(m_data, low, buffer, offset, batchSize);
142
143
143 return true;
144 return true;
@@ -149,32 +150,33 namespace Implab.Parallels {
149 }
150 }
150
151
151 public const int DEFAULT_CHUNK_SIZE = 32;
152 public const int DEFAULT_CHUNK_SIZE = 32;
152 public const int MAX_CHUNK_SIZE = 262144;
153 public const int MAX_CHUNK_SIZE = 256;
153
154
154 Chunk m_first;
155 Chunk m_first;
155 Chunk m_last;
156 Chunk m_last;
156
157
158 public AsyncQueue() {
159 m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE);
160 }
161
157 /// <summary>
162 /// <summary>
158 /// Adds the specified value to the queue.
163 /// Adds the specified value to the queue.
159 /// </summary>
164 /// </summary>
160 /// <param name="value">Tha value which will be added to the queue.</param>
165 /// <param name="value">Tha value which will be added to the queue.</param>
161 public virtual void Enqueue(T value) {
166 public void Enqueue(T value) {
162 var last = m_last;
167 var last = m_last;
163 // spin wait to the new chunk
168 SpinWait spin = new SpinWait();
164 bool extend = true;
169 while (!last.TryEnqueue(value)) {
165 while (last == null || !last.TryEnqueue(value, out extend)) {
166 // try to extend queue
170 // try to extend queue
167 if (extend || last == null) {
171 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
172 var t = Interlocked.CompareExchange(ref m_last, chunk, last);
169 if (EnqueueChunk(last, chunk))
173 if (t == last) {
170 break; // success! exit!
174 last.next = chunk;
171 last = m_last;
175 break;
172 } else {
176 } else {
173 while (last == m_last) {
177 last = t;
174 Thread.MemoryBarrier();
175 }
176 last = m_last;
177 }
178 }
179 spin.SpinOnce();
178 }
180 }
179 }
181 }
180
182
@@ -184,67 +186,54 namespace Implab.Parallels {
184 /// <param name="data">The buffer which contains the data to be enqueued.</param>
186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
185 /// <param name="offset">The offset of the data in the buffer.</param>
187 /// <param name="offset">The offset of the data in the buffer.</param>
186 /// <param name="length">The size of the data to read from the buffer.</param>
188 /// <param name="length">The size of the data to read from the buffer.</param>
187 public virtual void EnqueueRange(T[] data, int offset, int length) {
189 public void EnqueueRange(T[] data, int offset, int length) {
188 if (data == null)
190 if (data == null)
189 throw new ArgumentNullException("data");
191 throw new ArgumentNullException("data");
190 if (length == 0)
191 return;
192 if (offset < 0)
192 if (offset < 0)
193 throw new ArgumentOutOfRangeException("offset");
193 throw new ArgumentOutOfRangeException("offset");
194 if (length < 1 || offset + length > data.Length)
194 if (length < 1 || offset + length > data.Length)
195 throw new ArgumentOutOfRangeException("length");
195 throw new ArgumentOutOfRangeException("length");
196
196
197 var last = m_last;
197 while (length > 0) {
198 var last = m_last;
199 int enqueued;
198
200
199 bool extend;
201 if (last.TryEnqueueBatch(data, offset, length, out enqueued)) {
200 int enqueued;
201
202 while (length > 0) {
203 extend = true;
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
205 length -= enqueued;
202 length -= enqueued;
206 offset += enqueued;
203 offset += enqueued;
207 }
204 }
208
205
209 if (extend) {
206 if (length > 0) {
210 // there was no enough space in the chunk
207 // we have something to enqueue
211 // or there was no chunks in the queue
212
208
213 while (length > 0) {
209 var tail = length % MAX_CHUNK_SIZE;
214
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
216
210
217 var chunk = new Chunk(
211 var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail);
218 Math.Max(size, DEFAULT_CHUNK_SIZE),
212
219 data,
213 if (last != Interlocked.CompareExchange(ref m_last, chunk, last))
220 offset,
214 continue; // we wasn't able to catch the writer, roundtrip
221 size,
222 length // length >= size
223 );
224
215
225 if (!EnqueueChunk(last, chunk)) {
216 // we are lucky
226 // looks like the queue has been updated then proceed from the beginning
217 // we can exclusively write our batch, the other writers will continue their work
227 last = m_last;
218
228 break;
219 length -= tail;
229 }
230
220
231 // we have successfully added the new chunk
221
232 last = chunk;
222 for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) {
233 length -= size;
223 var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
234 offset += size;
224 node.WriteData(data, offset, 0, MAX_CHUNK_SIZE);
235 }
225 offset += MAX_CHUNK_SIZE;
236 } else {
226 // fence last.next is volatile
237 // we don't need to extend the queue, if we successfully enqueued data
227 last.next = node;
238 if (length == 0)
228 last = node;
239 break;
240
241 // if we need to wait while someone is extending the queue
242 // spinwait
243 while (last == m_last) {
244 Thread.MemoryBarrier();
245 }
229 }
246
230
247 last = m_last;
231 if (tail > 0)
232 chunk.WriteData(data, offset, 0, tail);
233
234 // fence last.next is volatile
235 last.next = chunk;
236 return;
248 }
237 }
249 }
238 }
250 }
239 }
@@ -256,26 +245,21 namespace Implab.Parallels {
256 /// <param name="value">The value of the dequeued element.</param>
245 /// <param name="value">The value of the dequeued element.</param>
257 public bool TryDequeue(out T value) {
246 public bool TryDequeue(out T value) {
258 var chunk = m_first;
247 var chunk = m_first;
259 bool recycle;
248 do {
260 while (chunk != null) {
249 bool recycle;
261
250
262 var result = chunk.TryDequeue(out value, out recycle);
251 var result = chunk.TryDequeue(out value, out recycle);
263
252
264 if (recycle) // this chunk is waste
253 if (recycle && chunk.next != null) {
265 RecycleFirstChunk(chunk);
254 // this chunk is waste
266 else
255 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
256 } else {
267 return result; // this chunk is usable and returned actual result
257 return result; // this chunk is usable and returned actual result
258 }
268
259
269 if (result) // this chunk is waste but the true result is always actual
260 if (result) // this chunk is waste but the true result is always actual
270 return true;
261 return true;
271
262 } while (true);
272 // try again
273 chunk = m_first;
274 }
275
276 // the queue is empty
277 value = default(T);
278 return false;
279 }
263 }
280
264
281 /// <summary>
265 /// <summary>
@@ -295,10 +279,9 namespace Implab.Parallels {
295 throw new ArgumentOutOfRangeException("length");
279 throw new ArgumentOutOfRangeException("length");
296
280
297 var chunk = m_first;
281 var chunk = m_first;
298 bool recycle;
299 dequeued = 0;
282 dequeued = 0;
300 while (chunk != null) {
283 do {
301
284 bool recycle;
302 int actual;
285 int actual;
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
286 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 offset += actual;
287 offset += actual;
@@ -306,18 +289,16 namespace Implab.Parallels {
306 dequeued += actual;
289 dequeued += actual;
307 }
290 }
308
291
309 if (recycle) // this chunk is waste
292 if (recycle && chunk.next != null) {
310 RecycleFirstChunk(chunk);
293 // this chunk is waste
311 else if (actual == 0)
294 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
295 } else {
296 chunk = null;
297 }
313
298
314 if (length == 0)
299 if (length == 0)
315 return true;
300 return true;
316
301 } while (chunk != null);
317 // we still may dequeue something
318 // try again
319 chunk = m_first;
320 }
321
302
322 return dequeued != 0;
303 return dequeued != 0;
323 }
304 }
@@ -339,123 +320,81 namespace Implab.Parallels {
339 throw new ArgumentOutOfRangeException("length");
320 throw new ArgumentOutOfRangeException("length");
340
321
341 var chunk = m_first;
322 var chunk = m_first;
342 bool recycle;
323 do {
343 dequeued = 0;
324 bool recycle;
344
325 chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle);
345 while (chunk != null) {
346
326
347 int actual;
327 if (recycle && chunk.next != null) {
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
328 // this chunk is waste
349 dequeued = actual;
329 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
330 } else {
331 chunk = null;
350 }
332 }
351
333
352 if (recycle) // this chunk is waste
353 RecycleFirstChunk(chunk);
354
355 // if we have dequeued any data, then return
334 // if we have dequeued any data, then return
356 if (dequeued != 0)
335 if (dequeued != 0)
357 return true;
336 return true;
358
337
359 // we still may dequeue something
338 } while (chunk != null);
360 // try again
361 chunk = m_first;
362 }
363
339
364 return false;
340 return false;
365 }
341 }
366
342
367 bool EnqueueChunk(Chunk last, Chunk chunk) {
368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
369 return false;
370
371 if (last != null)
372 last.next = chunk;
373 else {
374 m_first = chunk;
375 }
376 return true;
377 }
378
379 void RecycleFirstChunk(Chunk first) {
380 var next = first.next;
381
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 return;
384
385 if (next == null) {
386
387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388
389 // race
390 // someone already updated the tail, restore the pointer to the queue head
391 m_first = first;
392 }
393 // the tail is updated
394 }
395 }
396
343
397 public void Clear() {
344 public void Clear() {
398 // start the new queue
345 // start the new queue
399 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
346 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
400
401 do {
347 do {
402 Thread.MemoryBarrier();
403 var first = m_first;
348 var first = m_first;
404 var last = m_last;
349 if (first.next == null && first != m_last) {
405
406 if (last == null) // nothing to clear
407 return;
408
409 if (first == null || (first.next == null && first != last)) // inconcistency
410 continue;
350 continue;
411
351 }
412 // here we will create inconsistency which will force others to spin
413 // and prevent from fetching. chunk.next = null
414 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
415 continue;// inconsistent
416
417 m_last = chunk;
418
419 return;
420
421 } while(true);
422 }
423
424 public T[] Drain() {
425 // start the new queue
426 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
427
428 do {
429 Thread.MemoryBarrier();
430 var first = m_first;
431 var last = m_last;
432
433 if (last == null)
434 return new T[0];
435
436 if (first == null || (first.next == null && first != last))
437 continue;
438
352
439 // here we will create inconsistency which will force others to spin
353 // here we will create inconsistency which will force others to spin
440 // and prevent from fetching. chunk.next = null
354 // and prevent from fetching. chunk.next = null
441 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
355 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
442 continue;// inconsistent
356 continue;// inconsistent
443
357
444 last = Interlocked.Exchange(ref m_last, chunk);
358 m_last = chunk;
359 return;
360 } while (true);
361 }
362
363 public List<T> Drain() {
364 // start the new queue
365 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
366
367 do {
368 var first = m_first;
369 // first.next is volatile
370 if (first.next == null) {
371 if (first != m_last)
372 continue;
373 else if (first.Hi == first.Low)
374 return new List<T>();
375 }
376
377 // here we will create inconsistency which will force others to spin
378 // and prevent from fetching. chunk.next = null
379 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
380 continue;// inconsistent
381
382 var last = Interlocked.Exchange(ref m_last, chunk);
445
383
446 return ReadChunks(first, last);
384 return ReadChunks(first, last);
447
385
448 } while(true);
386 } while (true);
449 }
387 }
450
388
451 static T[] ReadChunks(Chunk chunk, object last) {
389 static List<T> ReadChunks(Chunk chunk, object last) {
452 var result = new List<T>();
390 var result = new List<T>();
453 var buffer = new T[DEFAULT_CHUNK_SIZE];
391 var buffer = new T[MAX_CHUNK_SIZE];
454 int actual;
392 int actual;
455 bool recycle;
393 bool recycle;
394 SpinWait spin = new SpinWait();
456 while (chunk != null) {
395 while (chunk != null) {
457 // ensure all write operations on the chunk are complete
396 // ensure all write operations on the chunk are complete
458 chunk.Commit();
397 chunk.Seal();
459
398
460 // we need to read the chunk using this way
399 // we need to read the chunk using this way
461 // since some client still may completing the dequeue
400 // since some client still may completing the dequeue
@@ -467,12 +406,12 namespace Implab.Parallels {
467 chunk = null;
406 chunk = null;
468 } else {
407 } else {
469 while (chunk.next == null)
408 while (chunk.next == null)
470 Thread.MemoryBarrier();
409 spin.SpinOnce();
471 chunk = chunk.next;
410 chunk = chunk.next;
472 }
411 }
473 }
412 }
474
413
475 return result.ToArray();
414 return result;
476 }
415 }
477
416
478 struct ArraySegmentCollection : ICollection<T> {
417 struct ArraySegmentCollection : ICollection<T> {
@@ -501,7 +440,7 namespace Implab.Parallels {
501 }
440 }
502
441
503 public void CopyTo(T[] array, int arrayIndex) {
442 public void CopyTo(T[] array, int arrayIndex) {
504 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
443 Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
505 }
444 }
506
445
507 public bool Remove(T item) {
446 public bool Remove(T item) {
@@ -5,13 +5,13 namespace Implab.Parallels {
5 public class BlockingQueue<T> : AsyncQueue<T> {
5 public class BlockingQueue<T> : AsyncQueue<T> {
6 readonly object m_lock = new object();
6 readonly object m_lock = new object();
7
7
8 public override void Enqueue(T value) {
8 public void EnqueuePulse(T value) {
9 base.Enqueue(value);
9 base.Enqueue(value);
10 lock (m_lock)
10 lock (m_lock)
11 Monitor.Pulse(m_lock);
11 Monitor.Pulse(m_lock);
12 }
12 }
13
13
14 public override void EnqueueRange(T[] data, int offset, int length) {
14 public void EnqueueRangePulse(T[] data, int offset, int length) {
15 base.EnqueueRange(data, offset, length);
15 base.EnqueueRange(data, offset, length);
16 if (length > 1)
16 if (length > 1)
17 lock (m_lock)
17 lock (m_lock)
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

ok, latest stable version should be in default

You need to be logged in to leave comments. Login now