##// END OF EJS Templates
Improved AsyncQueue...
cin -
r233:d6fe09f5592c v2
parent child
Show More
@@ -0,0 +1,131
1 using System.Threading;
2 using System.Collections.Generic;
3 using System;
4 using System.Collections;
5
6 namespace Implab.Parallels {
7 public class SimpleAsyncQueue<T> : IEnumerable<T> {
8 class Node {
9 public Node(T value) {
10 this.value = value;
11 }
12 public readonly T value;
13 public volatile Node next;
14 }
15
16 // the reader and the writer are mainteined completely independent,
17 // the reader can read next item when m_first.next is not null
18 // the writer creates the a new node, moves m_last to this node and
19 // only after that restores the reference from the previous node
20 // making available the reader to read the new node.
21
22 Node m_first; // position on the node which is already read
23 Node m_last; // position on the node which is already written
24
25 public SimpleAsyncQueue() {
26 m_first = m_last = new Node(default(T));
27 }
28
29 public void Enqueue(T value) {
30 var next = new Node(value);
31
32 // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
33 // to ensure that the next node is completely constructed
34 var last = Interlocked.Exchange(ref m_last, next);
35
36 // release-fence
37 last.next = next;
38
39 }
40
41 public bool TryDequeue(out T value) {
42 Node first;
43 Node next;
44
45 Thread.MemoryBarrier(); // ensure m_first is fresh
46 SpinWait spin = new SpinWait();
47 do {
48 first = m_first;
49 // aquire-fence
50 next = first.next;
51 if (next == null) {
52 value = default(T);
53 return false;
54 }
55
56 if (first == Interlocked.CompareExchange(ref m_first, next, first))
57 // head succesfully updated
58 break;
59 spin.SpinOnce();
60 } while (true);
61
62 value = next.value;
63 return true;
64 }
65
66 #region IEnumerable implementation
67
68 class Enumerator : IEnumerator<T> {
69 Node m_current;
70 Node m_first;
71
72 public Enumerator(Node first) {
73 m_first = first;
74 }
75
76 #region IEnumerator implementation
77
78 public bool MoveNext() {
79 m_current = m_current == null ? m_first : m_current.next;
80 return m_current != null;
81 }
82
83 public void Reset() {
84 m_current = null;
85 }
86
87 object IEnumerator.Current {
88 get {
89 if (m_current == null)
90 throw new InvalidOperationException();
91 return m_current.value;
92 }
93 }
94
95 #endregion
96
97 #region IDisposable implementation
98
99 public void Dispose() {
100 }
101
102 #endregion
103
104 #region IEnumerator implementation
105
106 public T Current {
107 get {
108 if (m_current == null)
109 throw new InvalidOperationException();
110 return m_current.value;
111 }
112 }
113
114 #endregion
115 }
116
117 public IEnumerator<T> GetEnumerator() {
118 return new Enumerator(m_first);
119 }
120
121 #endregion
122
123 #region IEnumerable implementation
124
125 IEnumerator IEnumerable.GetEnumerator() {
126 return GetEnumerator();
127 }
128
129 #endregion
130 }
131 }
@@ -1,6 +1,6
1 <?xml version="1.0" encoding="utf-8" ?>
1 <?xml version="1.0" encoding="utf-8"?>
2 2 <configuration>
3 3 <startup>
4 <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
4 <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5"/>
5 5 </startup>
6 </configuration> No newline at end of file
6 </configuration>
@@ -1,68 +1,69
1 1 <?xml version="1.0" encoding="utf-8"?>
2 2 <Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
4 4 <PropertyGroup>
5 5 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
6 6 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
7 7 <ProjectGuid>{100DFEB0-75BE-436F-ADDF-1F46EF433F46}</ProjectGuid>
8 8 <OutputType>Exe</OutputType>
9 9 <AppDesignerFolder>Properties</AppDesignerFolder>
10 10 <RootNamespace>Implab.Playground</RootNamespace>
11 11 <AssemblyName>Implab.Playground</AssemblyName>
12 <TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
12 <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
13 13 <FileAlignment>512</FileAlignment>
14 14 <AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
15 <TargetFrameworkProfile />
15 16 </PropertyGroup>
16 17 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
17 18 <PlatformTarget>AnyCPU</PlatformTarget>
18 19 <DebugSymbols>true</DebugSymbols>
19 20 <DebugType>full</DebugType>
20 21 <Optimize>false</Optimize>
21 22 <OutputPath>bin\Debug\</OutputPath>
22 23 <DefineConstants>DEBUG;TRACE</DefineConstants>
23 24 <ErrorReport>prompt</ErrorReport>
24 25 <WarningLevel>4</WarningLevel>
25 26 </PropertyGroup>
26 27 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
27 28 <PlatformTarget>AnyCPU</PlatformTarget>
28 29 <DebugType>pdbonly</DebugType>
29 30 <Optimize>true</Optimize>
30 31 <OutputPath>bin\Release\</OutputPath>
31 32 <DefineConstants>TRACE</DefineConstants>
32 33 <ErrorReport>prompt</ErrorReport>
33 34 <WarningLevel>4</WarningLevel>
34 35 <Prefer32Bit>true</Prefer32Bit>
35 36 <DebugSymbols>true</DebugSymbols>
36 37 </PropertyGroup>
37 38 <ItemGroup>
38 39 <Reference Include="System" />
39 40 <Reference Include="System.Core" />
40 41 <Reference Include="System.Xml.Linq" />
41 42 <Reference Include="System.Data.DataSetExtensions" />
42 43 <Reference Include="Microsoft.CSharp" />
43 44 <Reference Include="System.Data" />
44 45 <Reference Include="System.Net.Http" />
45 46 <Reference Include="System.Xml" />
46 47 </ItemGroup>
47 48 <ItemGroup>
48 49 <Compile Include="Program.cs" />
49 50 <Compile Include="Properties\AssemblyInfo.cs" />
50 51 </ItemGroup>
51 52 <ItemGroup>
52 53 <None Include="App.config" />
53 54 </ItemGroup>
54 55 <ItemGroup>
55 56 <ProjectReference Include="..\Implab\Implab.csproj">
56 57 <Project>{f550f1f8-8746-4ad0-9614-855f4c4b7f05}</Project>
57 58 <Name>Implab</Name>
58 59 </ProjectReference>
59 60 </ItemGroup>
60 61 <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
61 62 <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
62 63 Other similar extension points exist, see Microsoft.Common.targets.
63 64 <Target Name="BeforeBuild">
64 65 </Target>
65 66 <Target Name="AfterBuild">
66 67 </Target>
67 68 -->
68 69 </Project> No newline at end of file
@@ -1,42 +1,183
1 1 using Implab.Formats.Json;
2 using Implab.Parallels;
2 3 using Implab.Xml;
3 4 using System;
5 using System.Collections.Concurrent;
4 6 using System.Collections.Generic;
5 7 using System.IO;
6 8 using System.Linq;
7 9 using System.Text;
10 using System.Threading;
8 11 using System.Threading.Tasks;
9 12 using System.Xml;
10 13 using System.Xml.Serialization;
11 14
12 15 namespace Implab.Playground {
13 16 public class Program {
14 17
15 [XmlRoot(Namespace = "XmlSimpleData")]
16 public class XmlSimpleModel {
17 [XmlElement]
18 public string Name { get; set; }
18 static void EnqueueRange<T>(ConcurrentQueue<T> q, T[] data, int offset, int len) {
19 for (var i = offset; i < offset + len; i++)
20 q.Enqueue(data[i]);
21 }
22
23 static bool TryDequeueRange<T>(ConcurrentQueue<T> q,T[] buffer,int offset, int len, out int actual) {
24 actual = 0;
25 T res;
26 while(q.TryDequeue(out res)) {
27 buffer[offset + actual] = res;
28 actual++;
29 if (actual == len)
30 break;
31 }
32 return actual != 0;
33 }
34
35 static void EnqueueRange<T>(SimpleAsyncQueue<T> q, T[] data, int offset, int len) {
36 for (var i = offset; i < offset + len; i++)
37 q.Enqueue(data[i]);
38 }
19 39
20 [XmlElement]
21 public int Order { get; set; }
40 static bool TryDequeueRange<T>(SimpleAsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
41 actual = 0;
42 T res;
43 while (q.TryDequeue(out res)) {
44 buffer[offset + actual] = res;
45 actual++;
46 if (actual == len)
47 break;
48 }
49 return actual != 0;
50 }
22 51
23 [XmlElement]
24 public string[] Items { get; set; }
52 /*
53 static void EnqueueRange<T>(AsyncQueue<T> q, T[] data, int offset, int len) {
54 for (var i = offset; i < offset + len; i++)
55 q.Enqueue(data[i]);
56 }
25 57
58 static bool TryDequeueRange<T>(AsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
59 actual = 0;
60 T res;
61 while (q.TryDequeue(out res)) {
62 buffer[offset + actual] = res;
63 actual++;
64 if (actual == len)
65 break;
66 }
67 return actual != 0;
68 }
69 */
70
71 static void EnqueueRange<T>(AsyncQueue<T> q, T[] data, int offset, int len) {
72 q.EnqueueRange(data, offset, len);
26 73 }
27 74
75 static bool TryDequeueRange<T>(AsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
76 return q.TryDequeueRange(buffer, offset, len, out actual);
77 }
78
79
28 80 static void Main(string[] args) {
29 var model = new XmlSimpleModel {
30 Name = "Tablet",
31 Order = 10,
32 Items = new string[] { "z1", "z2", "z3" }
33 };
34 81
35 var doc = SerializationHelpers.SerializeAsXmlDocument(model);
36
37 var m2 = SerializationHelpers.DeserializeFromXmlNode<XmlSimpleModel>(doc.DocumentElement);
82 //var queue = new ConcurrentQueue<int>();
83 var queue = new AsyncQueue<int>();
84 //var queue = new SimpleAsyncQueue<int>();
85
86 const int wBatch = 32;
87 const long wCount = 1000000;
88 const long total = wBatch * wCount * 3;
89
90 long r1 = 0, r2 = 0, r3 = 0;
91 const int rBatch = 1000;
92 long read = 0;
93
94 var t1 = Environment.TickCount;
95
96 AsyncPool.RunThread(
97 () => {
98 var buffer = new int[wBatch];
99 for (int i = 0; i < wBatch; i++)
100 buffer[i] = 1;
101
102 for (int i = 0; i < wCount; i++)
103 EnqueueRange(queue, buffer, 0, wBatch);
104 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
105 },
106 () => {
107 var buffer = new int[wBatch];
108 for (int i = 0; i < wBatch; i++)
109 buffer[i] = 1;
110
111 for (int i = 0; i < wCount; i++)
112 EnqueueRange(queue, buffer, 0, wBatch);
113 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
114 },
115 () => {
116 var buffer = new int[wBatch];
117 for (int i = 0; i < wBatch; i++)
118 buffer[i] = 1;
119
120 for (int i = 0; i < wCount; i++)
121 EnqueueRange(queue, buffer, 0, wBatch);
122 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
123 },
124 () => {
125 var buffer = new int[rBatch];
126
127 while (read < total) {
128 int actual;
129 if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
130 for (int i = 0; i < actual; i++)
131 r1 += buffer[i];
132 Interlocked.Add(ref read, actual);
133 }
134 }
135
136 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
137 }/*,
138 () => {
139 var buffer = new int[rBatch];
140
141 while (read < total) {
142 int actual;
143 if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
144 for (int i = 0; i < actual; i++)
145 r2 += buffer[i];
146 Interlocked.Add(ref read, actual);
147 }
148 }
149
150 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
151 }*//*,
152 () => {
153 var buffer = new int[rBatch];
154
155 while (read < total) {
156 int actual;
157 if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
158 for (int i = 0; i < actual; i++)
159 r3 += buffer[i];
160 Interlocked.Add(ref read, actual);
161 }
162 }
163
164 Console.WriteLine("done reader #3: {0} ms", Environment.TickCount - t1);
165 }*/
166 )
167 .PromiseAll()
168 .Join();
169
170
171 Console.WriteLine(
172 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
173 Environment.TickCount - t1,
174 r1,
175 r2,
176 r1 + r2 + r3,
177 total
178 );
38 179
39 180 Console.WriteLine("done");
40 181 }
41 182 }
42 183 }
@@ -1,75 +1,70
1 1 <?xml version="1.0" encoding="UTF-8"?>
2 2 <VSPerformanceSession Version="1.00">
3 3 <Options>
4 4 <Solution>Implab.sln</Solution>
5 5 <CollectionMethod>Sampling</CollectionMethod>
6 6 <AllocationMethod>None</AllocationMethod>
7 7 <AddReport>true</AddReport>
8 8 <ResourceBasedAnalysisSelected>true</ResourceBasedAnalysisSelected>
9 9 <UniqueReport>Timestamp</UniqueReport>
10 10 <SamplingMethod>Cycles</SamplingMethod>
11 11 <CycleCount>50000</CycleCount>
12 12 <PageFaultCount>10</PageFaultCount>
13 13 <SysCallCount>10</SysCallCount>
14 14 <SamplingCounter Name="" ReloadValue="00000000000f4240" DisplayName="" />
15 15 <RelocateBinaries>false</RelocateBinaries>
16 16 <HardwareCounters EnableHWCounters="false" />
17 17 <EtwSettings />
18 18 <PdhSettings>
19 19 <PdhCountersEnabled>false</PdhCountersEnabled>
20 20 <PdhCountersRate>500</PdhCountersRate>
21 21 <PdhCounters>
22 22 <PdhCounter>\Память\Обмен страниц/с</PdhCounter>
23 23 <PdhCounter>\Процессор(_Total)\% загруженности процессора</PdhCounter>
24 24 <PdhCounter>\Физический диск(_Total)\Средняя длина очереди диска</PdhCounter>
25 25 </PdhCounters>
26 26 </PdhSettings>
27 27 </Options>
28 28 <ExcludeSmallFuncs>true</ExcludeSmallFuncs>
29 29 <InteractionProfilingEnabled>false</InteractionProfilingEnabled>
30 30 <JScriptProfilingEnabled>false</JScriptProfilingEnabled>
31 31 <PreinstrumentEvent>
32 32 <InstrEventExclude>false</InstrEventExclude>
33 33 </PreinstrumentEvent>
34 34 <PostinstrumentEvent>
35 35 <InstrEventExclude>false</InstrEventExclude>
36 36 </PostinstrumentEvent>
37 37 <Binaries>
38 38 <ProjBinary>
39 39 <Path>Implab.Playground\obj\Debug\Implab.Playground.exe</Path>
40 40 <ArgumentTimestamp>01/01/0001 00:00:00</ArgumentTimestamp>
41 41 <Instrument>true</Instrument>
42 42 <Sample>true</Sample>
43 43 <ExternalWebsite>false</ExternalWebsite>
44 44 <InteractionProfilingEnabled>false</InteractionProfilingEnabled>
45 45 <IsLocalJavascript>false</IsLocalJavascript>
46 46 <IsWindowsStoreApp>false</IsWindowsStoreApp>
47 47 <IsWWA>false</IsWWA>
48 48 <LaunchProject>true</LaunchProject>
49 49 <OverrideProjectSettings>false</OverrideProjectSettings>
50 50 <LaunchMethod>Executable</LaunchMethod>
51 <ExecutablePath>Implab.Playground\bin\Release\Implab.Playground.exe</ExecutablePath>
52 <StartupDirectory>Implab.Playground\bin\Release\</StartupDirectory>
51 <ExecutablePath>Implab.Playground\bin\Debug\Implab.Playground.exe</ExecutablePath>
52 <StartupDirectory>Implab.Playground\bin\Debug\</StartupDirectory>
53 53 <Arguments>
54 54 </Arguments>
55 55 <NetAppHost>IIS</NetAppHost>
56 56 <NetBrowser>InternetExplorer</NetBrowser>
57 57 <ExcludeSmallFuncs>true</ExcludeSmallFuncs>
58 58 <JScriptProfilingEnabled>false</JScriptProfilingEnabled>
59 59 <PreinstrumentEvent>
60 60 <InstrEventExclude>false</InstrEventExclude>
61 61 </PreinstrumentEvent>
62 62 <PostinstrumentEvent>
63 63 <InstrEventExclude>false</InstrEventExclude>
64 64 </PostinstrumentEvent>
65 65 <ProjRef>{100DFEB0-75BE-436F-ADDF-1F46EF433F46}|Implab.Playground\Implab.Playground.csproj</ProjRef>
66 66 <ProjPath>Implab.Playground\Implab.Playground.csproj</ProjPath>
67 67 <ProjName>Implab.Playground</ProjName>
68 68 </ProjBinary>
69 69 </Binaries>
70 <Launches>
71 <ProjBinary>
72 <Path>:PB:{100DFEB0-75BE-436F-ADDF-1F46EF433F46}|Implab.Playground\Implab.Playground.csproj</Path>
73 </ProjBinary>
74 </Launches>
75 70 </VSPerformanceSession> No newline at end of file
@@ -1,863 +1,878
1 1 using System;
2 2 using System.Reflection;
3 3 using System.Threading;
4 4 using Implab.Parallels;
5 5
6 6 #if MONO
7 7
8 8 using NUnit.Framework;
9 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 10 using TestMethodAttribute = NUnit.Framework.TestAttribute;
11 11
12 12 #else
13 13
14 14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15 15
16 16 #endif
17 17
18 18 namespace Implab.Test {
19 19 [TestClass]
20 20 public class AsyncTests {
21 21 [TestMethod]
22 22 public void ResolveTest() {
23 23 int res = -1;
24 24 var p = new Promise<int>();
25 25 p.Then(x => res = x);
26 26 p.Resolve(100);
27 27
28 28 Assert.AreEqual(100, res);
29 29 }
30 30
31 31 [TestMethod]
32 32 public void RejectTest() {
33 33 int res = -1;
34 34 Exception err = null;
35 35
36 36 var p = new Promise<int>();
37 37 p.Then(
38 38 x => res = x,
39 39 e => {
40 40 err = e;
41 41 return -2;
42 42 }
43 43 );
44 44 p.Reject(new ApplicationException("error"));
45 45
46 46 Assert.AreEqual(res, -1);
47 47 Assert.AreEqual(err.Message, "error");
48 48
49 49 }
50 50
51 51 [TestMethod]
52 52 public void CancelExceptionTest() {
53 53 var p = new Promise<bool>();
54 54 p.CancelOperation(null);
55 55
56 56 var p2 = p.Then(x => x, null, reason => {
57 57 throw new ApplicationException("CANCELLED");
58 58 });
59 59
60 60 try {
61 61 p2.Join();
62 62 Assert.Fail();
63 63 } catch (ApplicationException err) {
64 64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 65 }
66 66
67 67 }
68 68
69 69 [TestMethod]
70 70 public void ContinueOnCancelTest() {
71 71 var p = new Promise<bool>();
72 72 p.CancelOperation(null);
73 73
74 74 var p2 = p
75 75 .Then(x => x, null, reason => {
76 76 throw new ApplicationException("CANCELLED");
77 77 })
78 78 .Then(x => x, e => true);
79 79
80 80 Assert.AreEqual(true, p2.Join());
81 81 }
82 82
83 83 [TestMethod]
84 84 public void JoinSuccessTest() {
85 85 var p = new Promise<int>();
86 86 p.Resolve(100);
87 87 Assert.AreEqual(p.Join(), 100);
88 88 }
89 89
90 90 [TestMethod]
91 91 public void JoinFailTest() {
92 92 var p = new Promise<int>();
93 93 p.Reject(new ApplicationException("failed"));
94 94
95 95 try {
96 96 p.Join();
97 97 throw new ApplicationException("WRONG!");
98 98 } catch (TargetInvocationException err) {
99 99 Assert.AreEqual(err.InnerException.Message, "failed");
100 100 } catch {
101 101 Assert.Fail("Got wrong excaption");
102 102 }
103 103 }
104 104
105 105 [TestMethod]
106 106 public void MapTest() {
107 107 var p = new Promise<int>();
108 108
109 109 var p2 = p.Then(x => x.ToString());
110 110 p.Resolve(100);
111 111
112 112 Assert.AreEqual(p2.Join(), "100");
113 113 }
114 114
115 115 [TestMethod]
116 116 public void FixErrorTest() {
117 117 var p = new Promise<int>();
118 118
119 119 var p2 = p.Then(x => x, e => 101);
120 120
121 121 p.Reject(new Exception());
122 122
123 123 Assert.AreEqual(p2.Join(), 101);
124 124 }
125 125
126 126 [TestMethod]
127 127 public void ChainTest() {
128 128 var p1 = new Promise<int>();
129 129
130 130 var p3 = p1.Chain(x => {
131 131 var p2 = new Promise<string>();
132 132 p2.Resolve(x.ToString());
133 133 return p2;
134 134 });
135 135
136 136 p1.Resolve(100);
137 137
138 138 Assert.AreEqual(p3.Join(), "100");
139 139 }
140 140
141 141 [TestMethod]
142 142 public void ChainFailTest() {
143 143 var p1 = new Promise<int>();
144 144
145 145 var p3 = p1.Chain(x => {
146 146 var p2 = new Promise<string>();
147 147 p2.Reject(new Exception("DIE!!!"));
148 148 return p2;
149 149 });
150 150
151 151 p1.Resolve(100);
152 152
153 153 Assert.IsTrue(p3.IsResolved);
154 154 }
155 155
156 156 [TestMethod]
157 157 public void PoolTest() {
158 158 var pid = Thread.CurrentThread.ManagedThreadId;
159 159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160 160
161 161 Assert.AreNotEqual(pid, p.Join());
162 162 }
163 163
164 164 [TestMethod]
165 165 public void WorkerPoolSizeTest() {
166 166 var pool = new WorkerPool(5, 10, 1);
167 167
168 168 Assert.AreEqual(5, pool.PoolSize);
169 169
170 170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173 173
174 174 Assert.AreEqual(5, pool.PoolSize);
175 175
176 176 for (int i = 0; i < 100; i++)
177 177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 178 Thread.Sleep(200);
179 179 Assert.AreEqual(10, pool.PoolSize);
180 180
181 181 pool.Dispose();
182 182 }
183 183
184 184 [TestMethod]
185 185 public void WorkerPoolCorrectTest() {
186 186 var pool = new WorkerPool(0,1000,100);
187 187
188 188 const int iterations = 1000;
189 189 int pending = iterations;
190 190 var stop = new ManualResetEvent(false);
191 191
192 192 var count = 0;
193 193 for (int i = 0; i < iterations; i++) {
194 194 pool
195 195 .Invoke(() => 1)
196 196 .Then(x => Interlocked.Add(ref count, x))
197 197 .Then(x => Math.Log10(x))
198 198 .On(() => {
199 199 Interlocked.Decrement(ref pending);
200 200 if (pending == 0)
201 201 stop.Set();
202 202 }, PromiseEventType.All);
203 203 }
204 204
205 205 stop.WaitOne();
206 206
207 207 Assert.AreEqual(iterations, count);
208 208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 209 pool.Dispose();
210 210
211 211 }
212 212
213 213 [TestMethod]
214 214 public void WorkerPoolDisposeTest() {
215 215 var pool = new WorkerPool(5, 20);
216 216 Assert.AreEqual(5, pool.PoolSize);
217 217 pool.Dispose();
218 218 Thread.Sleep(500);
219 219 Assert.AreEqual(0, pool.PoolSize);
220 220 pool.Dispose();
221 221 }
222 222
223 223 [TestMethod]
224 224 public void MTQueueTest() {
225 var queue = new MTQueue<int>();
225 var queue = new SimpleAsyncQueue<int>();
226 226 int res;
227
227
228 228 queue.Enqueue(10);
229 229 Assert.IsTrue(queue.TryDequeue(out res));
230 230 Assert.AreEqual(10, res);
231 231 Assert.IsFalse(queue.TryDequeue(out res));
232 232
233 233 for (int i = 0; i < 1000; i++)
234 234 queue.Enqueue(i);
235 235
236 236 for (int i = 0; i < 1000; i++) {
237 237 queue.TryDequeue(out res);
238 238 Assert.AreEqual(i, res);
239 239 }
240 240
241 241 int writers = 0;
242 242 int readers = 0;
243 243 var stop = new ManualResetEvent(false);
244 244 int total = 0;
245 var ticks = Environment.TickCount;
245 246
246 const int itemsPerWriter = 10000;
247 const int itemsPerWriter = 1000000;
247 248 const int writersCount = 10;
248 249
249 250 for (int i = 0; i < writersCount; i++) {
250 251 Interlocked.Increment(ref writers);
251 252 AsyncPool
252 253 .RunThread(() => {
253 254 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 255 queue.Enqueue(1);
255 256 }
256 257 return 1;
257 258 })
258 259 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 260 }
260 261
261 262 for (int i = 0; i < 10; i++) {
262 263 Interlocked.Increment(ref readers);
263 264 AsyncPool
264 265 .RunThread(() => {
265 266 int t;
266 267 do {
267 268 while (queue.TryDequeue(out t))
268 269 Interlocked.Add(ref total, t);
269 270 } while (writers > 0);
270 271 return 1;
271 272 })
272 273 .On(() => {
273 274 Interlocked.Decrement(ref readers);
274 275 if (readers == 0)
275 276 stop.Set();
276 277 }, PromiseEventType.All);
277 278 }
278 279
279 280 stop.WaitOne();
280 281
281 Assert.AreEqual(100000, total);
282 Console.WriteLine("{0} in {1}ms", total, Environment.TickCount - ticks);
283
284 Assert.AreEqual(itemsPerWriter * writersCount, total);
282 285 }
283 286
284 287 [TestMethod]
285 288 public void AsyncQueueTest() {
286 289 var queue = new AsyncQueue<int>();
287 290 int res;
288 291
289 292 queue.Enqueue(10);
290 293 Assert.IsTrue(queue.TryDequeue(out res));
291 294 Assert.AreEqual(10, res);
292 295 Assert.IsFalse(queue.TryDequeue(out res));
293 296
294 297 for (int i = 0; i < 1000; i++)
295 298 queue.Enqueue(i);
296 299
297 300 for (int i = 0; i < 1000; i++) {
298 301 queue.TryDequeue(out res);
299 302 Assert.AreEqual(i, res);
300 303 }
301 304
302 305 const int count = 10000000;
303 306
304 307 int res1 = 0, res2 = 0;
305 308 var t1 = Environment.TickCount;
306 309
307 310 AsyncPool.RunThread(
308 311 () => {
309 312 for (var i = 0; i < count; i++)
310 313 queue.Enqueue(1);
311 314 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 315 },
313 316 () => {
314 317 for (var i = 0; i < count; i++)
315 318 queue.Enqueue(2);
316 319 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 320 },
318 321 () => {
319 322 int temp;
320 323 int i = 0;
321 324 while (i < count)
322 325 if (queue.TryDequeue(out temp)) {
323 326 i++;
324 327 res1 += temp;
325 328 }
326 329 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 330 },
328 331 () => {
329 332 int temp;
330 333 int i = 0;
331 334 while (i < count)
332 335 if (queue.TryDequeue(out temp)) {
333 336 i++;
334 337 res2 += temp;
335 338 }
336 339 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 340 }
338 341 )
339 342 .PromiseAll()
340 343 .Join();
341 344
342 345 Assert.AreEqual(count * 3, res1 + res2);
343 346
344 347 Console.WriteLine(
345 348 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 349 Environment.TickCount - t1,
347 350 res1,
348 351 res2,
349 352 res1 + res2,
350 353 count
351 354 );
352 355 }
353 356
354 357 [TestMethod]
355 358 public void AsyncQueueBatchTest() {
356 359 var queue = new AsyncQueue<int>();
357 360
358 361 const int wBatch = 29;
359 362 const int wCount = 400000;
360 363 const int total = wBatch * wCount * 2;
361 364 const int summ = wBatch * wCount * 3;
362 365
363 366 int r1 = 0, r2 = 0;
364 367 const int rBatch = 111;
365 368 int read = 0;
366 369
367 370 var t1 = Environment.TickCount;
368 371
369 372 AsyncPool.RunThread(
370 373 () => {
371 374 var buffer = new int[wBatch];
372 375 for(int i = 0; i<wBatch; i++)
373 376 buffer[i] = 1;
374 377
375 378 for(int i =0; i < wCount; i++)
376 379 queue.EnqueueRange(buffer,0,wBatch);
377 380 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 381 },
379 382 () => {
380 383 var buffer = new int[wBatch];
381 384 for(int i = 0; i<wBatch; i++)
382 385 buffer[i] = 2;
383 386
384 387 for(int i =0; i < wCount; i++)
385 388 queue.EnqueueRange(buffer,0,wBatch);
386 389 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 390 },
388 391 () => {
389 392 var buffer = new int[rBatch];
390 393
391 394 while(read < total) {
392 395 int actual;
393 396 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 397 for(int i=0; i< actual; i++)
395 398 r1 += buffer[i];
396 399 Interlocked.Add(ref read, actual);
397 400 }
398 401 }
399 402
400 403 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 404 },
402 405 () => {
403 406 var buffer = new int[rBatch];
404 407
405 408 while(read < total) {
406 409 int actual;
407 410 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 411 for(int i=0; i< actual; i++)
409 412 r2 += buffer[i];
410 413 Interlocked.Add(ref read, actual);
411 414 }
412 415 }
413 416
414 417 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 418 }
416 419 )
417 420 .PromiseAll()
418 421 .Join();
419 422
420 423 Assert.AreEqual(summ , r1 + r2);
421 424
422 425 Console.WriteLine(
423 426 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 427 Environment.TickCount - t1,
425 428 r1,
426 429 r2,
427 430 r1 + r2,
428 431 total
429 432 );
430 433 }
431 434
432 435 [TestMethod]
433 436 public void AsyncQueueChunkDequeueTest() {
434 437 var queue = new AsyncQueue<int>();
435 438
436 439 const int wBatch = 31;
437 440 const int wCount = 200000;
438 441 const int total = wBatch * wCount * 3;
439 442 const int summ = wBatch * wCount * 6;
440 443
441 444 int r1 = 0, r2 = 0;
442 445 const int rBatch = 1024;
443 446 int read = 0;
444 447
445 448 var t1 = Environment.TickCount;
446 449
447 450 AsyncPool.RunThread(
448 451 () => {
449 452 var buffer = new int[wBatch];
450 453 for(int i = 0; i<wBatch; i++)
451 454 buffer[i] = 1;
452 455
453 456 for(int i =0; i < wCount; i++)
454 457 queue.EnqueueRange(buffer,0,wBatch);
455 458 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 459 },
457 460 () => {
458 461 var buffer = new int[wBatch];
459 462 for(int i = 0; i<wBatch; i++)
460 463 buffer[i] = 2;
461 464
462 465 for(int i =0; i < wCount; i++)
463 466 queue.EnqueueRange(buffer,0,wBatch);
464 467 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 468 },
466 469 () => {
467 470 var buffer = new int[wBatch];
468 471 for(int i = 0; i<wBatch; i++)
469 472 buffer[i] = 3;
470 473
471 474 for(int i =0; i < wCount; i++)
472 475 queue.EnqueueRange(buffer,0,wBatch);
473 476 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 477 },
475 478 () => {
476 479 var buffer = new int[rBatch];
477 480 int count = 1;
478 481 double avgchunk = 0;
479 482 while(read < total) {
480 483 int actual;
481 484 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 485 for(int i=0; i< actual; i++)
483 486 r2 += buffer[i];
484 487 Interlocked.Add(ref read, actual);
485 488 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 489 count ++;
487 490 }
488 491 }
489 492
490 493 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 494 }
492 495 )
493 496 .PromiseAll()
494 497 .Join();
495 498
496 499 Assert.AreEqual(summ , r1 + r2);
497 500
498 501 Console.WriteLine(
499 502 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 503 Environment.TickCount - t1,
501 504 r1,
502 505 r2,
503 506 r1 + r2,
504 507 total
505 508 );
506 509 }
507 510
508 511 [TestMethod]
509 512 public void AsyncQueueDrainTest() {
510 513 var queue = new AsyncQueue<int>();
511 514
512 const int wBatch = 11;
515 const int wBatch = 32;
513 516 const int wCount = 200000;
514 517 const int total = wBatch * wCount * 3;
515 518 const int summ = wBatch * wCount * 3;
516 519
517 520 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
519 521 int read = 0;
520 522
521 523 var t1 = Environment.TickCount;
522 524
523 525 AsyncPool.RunThread(
524 526 () => {
525 527 var buffer = new int[wBatch];
526 528 for(int i = 0; i<wBatch; i++)
527 529 buffer[i] = 1;
528 530
529 531 for(int i =0; i < wCount; i++)
530 532 queue.EnqueueRange(buffer,0,wBatch);
531 533 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 534 },
533 535 () => {
534 for(int i =0; i < wCount * wBatch; i++)
535 queue.Enqueue(1);
536 var buffer = new int[wBatch];
537 for (int i = 0; i < wBatch; i++)
538 buffer[i] = 1;
539
540 for (int i = 0; i < wCount; i++)
541 queue.EnqueueRange(buffer, 0, wBatch);
536 542 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 543 },
538 544 () => {
539 545 var buffer = new int[wBatch];
540 546 for(int i = 0; i<wBatch; i++)
541 547 buffer[i] = 1;
542 548
543 549 for(int i =0; i < wCount; i++)
544 550 queue.EnqueueRange(buffer,0,wBatch);
545 551 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 552 },
547 553 /*() => {
548 554 int temp;
549 555 int count = 0;
550 556 while (read < total)
551 557 if (queue.TryDequeue(out temp)) {
552 558 count++;
553 559 r1 += temp;
554 560 Interlocked.Increment(ref read);
555 561 }
556 562 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 563 },*/
558 564 /*() => {
559 565 var buffer = new int[rBatch];
560 566 var count = 0;
561 567 while(read < total) {
562 568 int actual;
563 569 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 570 for(int i=0; i< actual; i++)
565 571 r1 += buffer[i];
566 572 Interlocked.Add(ref read, actual);
567 573 count += actual;
568 574 }
569 575 }
570 576
571 577 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 578 },*/
573 579 () => {
574 580 var count = 0;
575 while(read < total) {
581 int emptyDrains = 0;
582
583 while (read < total) {
576 584 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
585 if (buffer.Count == 0)
586 emptyDrains++;
587 for(int i=0; i< buffer.Count; i++)
578 588 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
580 count += buffer.Length;
589 Interlocked.Add(ref read, buffer.Count);
590 count += buffer.Count;
581 591 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
592 Console.WriteLine("done reader #1: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
583 593 },
584 594 () => {
585 var count = 0;
586 while(read < total) {
595 var count = 0;
596 int emptyDrains = 0;
597
598 while (read < total) {
587 599 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
600 if (buffer.Count == 0)
601 emptyDrains++;
602
603 for (int i=0; i< buffer.Count; i++)
589 604 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
591 count += buffer.Length;
605 Interlocked.Add(ref read, buffer.Count);
606 count += buffer.Count;
592 607 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
608 Console.WriteLine("done reader #2: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
594 609 }
595 610 )
596 611 .PromiseAll()
597 612 .Join();
598 613
599 614 Assert.AreEqual(summ , r1 + r2);
600 615
601 616 Console.WriteLine(
602 617 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
603 618 Environment.TickCount - t1,
604 619 r1,
605 620 r2,
606 621 r1 + r2,
607 622 total
608 623 );
609 624 }
610 625
611 626 [TestMethod]
612 627 public void ParallelMapTest() {
613 628
614 629 const int count = 100000;
615 630
616 631 var args = new double[count];
617 632 var rand = new Random();
618 633
619 634 for (int i = 0; i < count; i++)
620 635 args[i] = rand.NextDouble();
621 636
622 637 var t = Environment.TickCount;
623 638 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
624 639
625 640 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
626 641
627 642 t = Environment.TickCount;
628 643 for (int i = 0; i < count; i++)
629 644 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
630 645 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
631 646 }
632 647
633 648 [TestMethod]
634 649 public void ChainedMapTest() {
635 650
636 651 using (var pool = new WorkerPool()) {
637 652 const int count = 10000;
638 653
639 654 var args = new double[count];
640 655 var rand = new Random();
641 656
642 657 for (int i = 0; i < count; i++)
643 658 args[i] = rand.NextDouble();
644 659
645 660 var t = Environment.TickCount;
646 661 var res = args
647 662 .ChainedMap(
648 663 // Analysis disable once AccessToDisposedClosure
649 664 x => pool.Invoke(
650 665 () => Math.Sin(x * x)
651 666 ),
652 667 4
653 668 )
654 669 .Join();
655 670
656 671 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
657 672
658 673 t = Environment.TickCount;
659 674 for (int i = 0; i < count; i++)
660 675 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
661 676 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
662 677 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
663 678 }
664 679 }
665 680
666 681 [TestMethod]
667 682 public void ParallelForEachTest() {
668 683
669 684 const int count = 100000;
670 685
671 686 var args = new int[count];
672 687 var rand = new Random();
673 688
674 689 for (int i = 0; i < count; i++)
675 690 args[i] = (int)(rand.NextDouble() * 100);
676 691
677 692 int result = 0;
678 693
679 694 var t = Environment.TickCount;
680 695 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
681 696
682 697 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
683 698
684 699 int result2 = 0;
685 700
686 701 t = Environment.TickCount;
687 702 for (int i = 0; i < count; i++)
688 703 result2 += args[i];
689 704 Assert.AreEqual(result2, result);
690 705 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
691 706 }
692 707
693 708 [TestMethod]
694 709 public void ComplexCase1Test() {
695 710 var flags = new bool[3];
696 711
697 712 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
698 713
699 714 var step1 = PromiseHelper
700 715 .Sleep(200, "Alan")
701 716 .On(() => flags[0] = true, PromiseEventType.Cancelled);
702 717 var p = step1
703 718 .Chain(x =>
704 719 PromiseHelper
705 720 .Sleep(200, "Hi, " + x)
706 721 .Then(y => y)
707 722 .On(() => flags[1] = true, PromiseEventType.Cancelled)
708 723 )
709 724 .On(() => flags[2] = true, PromiseEventType.Cancelled);
710 725 step1.Join();
711 726 p.Cancel();
712 727 try {
713 728 Assert.AreEqual(p.Join(), "Hi, Alan");
714 729 Assert.Fail("Shouldn't get here");
715 730 } catch (OperationCanceledException) {
716 731 }
717 732
718 733 Assert.IsFalse(flags[0]);
719 734 Assert.IsTrue(flags[1]);
720 735 Assert.IsTrue(flags[2]);
721 736 }
722 737
723 738 [TestMethod]
724 739 public void ChainedCancel1Test() {
725 740 // при отмене сцепленной асинхронной операции все обещание должно
726 741 // завершаться ошибкой OperationCanceledException
727 742 var p = PromiseHelper
728 743 .Sleep(1, "Hi, HAL!")
729 744 .Then(x => {
730 745 // запускаем две асинхронные операции
731 746 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
732 747 // вторая операция отменяет первую до завершения
733 748 PromiseHelper
734 749 .Sleep(100, "HAL, STOP!")
735 750 .Then(result.Cancel);
736 751 return result;
737 752 });
738 753 try {
739 754 p.Join();
740 755 } catch (TargetInvocationException err) {
741 756 Assert.IsTrue(err.InnerException is OperationCanceledException);
742 757 }
743 758 }
744 759
745 760 [TestMethod]
746 761 public void ChainedCancel2Test() {
747 762 // при отмене цепочки обещаний, вложенные операции также должны отменяться
748 763 var pSurvive = new Promise<bool>();
749 764 var hemStarted = new Signal();
750 765 var p = PromiseHelper
751 766 .Sleep(1, "Hi, HAL!")
752 767 .Chain(() => {
753 768 hemStarted.Set();
754 769 // запускаем две асинхронные операции
755 770 var result = PromiseHelper
756 771 .Sleep(2000, "HEM ENABLED!!!")
757 772 .Then(() => pSurvive.Resolve(false));
758 773
759 774 result
760 775 .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
761 776
762 777 return result;
763 778 });
764 779
765 780 hemStarted.Wait();
766 781 p.Cancel();
767 782
768 783 try {
769 784 p.Join();
770 785 Assert.Fail();
771 786 } catch (OperationCanceledException) {
772 787 }
773 788 Assert.IsTrue(pSurvive.Join());
774 789 }
775 790
776 791 [TestMethod]
777 792 public void SharedLockTest() {
778 793 var l = new SharedLock();
779 794 int shared = 0;
780 795 int exclusive = 0;
781 796 var s1 = new Signal();
782 797 var log = new AsyncQueue<string>();
783 798
784 799 try {
785 800 AsyncPool.RunThread(
786 801 () => {
787 802 log.Enqueue("Reader #1 started");
788 803 try {
789 804 l.LockShared();
790 805 log.Enqueue("Reader #1 lock got");
791 806 if (Interlocked.Increment(ref shared) == 2)
792 807 s1.Set();
793 808 s1.Wait();
794 809 log.Enqueue("Reader #1 finished");
795 810 Interlocked.Decrement(ref shared);
796 811 } finally {
797 812 l.Release();
798 813 log.Enqueue("Reader #1 lock released");
799 814 }
800 815 },
801 816 () => {
802 817 log.Enqueue("Reader #2 started");
803 818
804 819 try {
805 820 l.LockShared();
806 821 log.Enqueue("Reader #2 lock got");
807 822
808 823 if (Interlocked.Increment(ref shared) == 2)
809 824 s1.Set();
810 825 s1.Wait();
811 826 log.Enqueue("Reader #2 upgrading to writer");
812 827 Interlocked.Decrement(ref shared);
813 828 l.Upgrade();
814 829 log.Enqueue("Reader #2 upgraded");
815 830
816 831 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
817 832 Assert.AreEqual(0, shared);
818 833 log.Enqueue("Reader #2 finished");
819 834 Interlocked.Decrement(ref exclusive);
820 835 } finally {
821 836 l.Release();
822 837 log.Enqueue("Reader #2 lock released");
823 838 }
824 839 },
825 840 () => {
826 841 log.Enqueue("Writer #1 started");
827 842 try {
828 843 l.LockExclusive();
829 844 log.Enqueue("Writer #1 got the lock");
830 845 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
831 846 Interlocked.Decrement(ref exclusive);
832 847 log.Enqueue("Writer #1 is finished");
833 848 } finally {
834 849 l.Release();
835 850 log.Enqueue("Writer #1 lock released");
836 851 }
837 852 }
838 853 ).PromiseAll().Join(1000);
839 854 log.Enqueue("Done");
840 855 } catch(Exception error) {
841 856 log.Enqueue(error.Message);
842 857 throw;
843 858 } finally {
844 859 foreach (var m in log)
845 860 Console.WriteLine(m);
846 861 }
847 862 }
848 863
849 864 #if NET_4_5
850 865
851 866 [TestMethod]
852 867 public async void TaskInteropTest() {
853 868 var promise = new Promise<int>();
854 869 promise.Resolve(10);
855 870 var res = await promise;
856 871
857 872 Assert.AreEqual(10, res);
858 873 }
859 874
860 875 #endif
861 876 }
862 877 }
863 878
@@ -1,263 +1,253
1 1 
2 2 Microsoft Visual Studio Solution File, Format Version 12.00
3 3 # Visual Studio 14
4 4 VisualStudioVersion = 14.0.25420.1
5 5 MinimumVisualStudioVersion = 10.0.40219.1
6 6 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab", "Implab\Implab.csproj", "{F550F1F8-8746-4AD0-9614-855F4C4B7F05}"
7 7 EndProject
8 8 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{CE8D8D18-437A-445C-B662-4C2CE79A76F6}"
9 9 ProjectSection(SolutionItems) = preProject
10 10 Implab.vsmdi = Implab.vsmdi
11 11 Local.testsettings = Local.testsettings
12 12 TraceAndTestImpact.testsettings = TraceAndTestImpact.testsettings
13 13 EndProjectSection
14 14 EndProject
15 15 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test", "Implab.Test\Implab.Test.csproj", "{63F92C0C-61BF-48C0-A377-8D67C3C661D0}"
16 16 EndProject
17 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Fx", "Implab.Fx\Implab.Fx.csproj", "{06E706F8-6881-43EB-927E-FFC503AF6ABC}"
18 EndProject
19 17 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Format.Test", "Implab.Format.Test\Implab.Format.Test.csproj", "{4D364996-7ECD-4193-8F90-F223FFEA49DA}"
20 18 EndProject
21 19 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Playground", "Implab.Playground\Implab.Playground.csproj", "{100DFEB0-75BE-436F-ADDF-1F46EF433F46}"
22 20 EndProject
23 21 Global
24 22 GlobalSection(Performance) = preSolution
25 23 HasPerformanceSessions = true
26 24 EndGlobalSection
27 25 GlobalSection(SolutionConfigurationPlatforms) = preSolution
28 26 Debug 4.5|Any CPU = Debug 4.5|Any CPU
29 27 Debug|Any CPU = Debug|Any CPU
30 28 Release 4.5|Any CPU = Release 4.5|Any CPU
31 29 Release|Any CPU = Release|Any CPU
32 30 EndGlobalSection
33 31 GlobalSection(ProjectConfigurationPlatforms) = postSolution
34 32 {F550F1F8-8746-4AD0-9614-855F4C4B7F05}.Debug 4.5|Any CPU.ActiveCfg = Debug 4.5|Any CPU
35 33 {F550F1F8-8746-4AD0-9614-855F4C4B7F05}.Debug 4.5|Any CPU.Build.0 = Debug 4.5|Any CPU
36 34 {F550F1F8-8746-4AD0-9614-855F4C4B7F05}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
37 35 {F550F1F8-8746-4AD0-9614-855F4C4B7F05}.Debug|Any CPU.Build.0 = Debug|Any CPU
38 36 {F550F1F8-8746-4AD0-9614-855F4C4B7F05}.Release 4.5|Any CPU.ActiveCfg = Release 4.5|Any CPU
39 37 {F550F1F8-8746-4AD0-9614-855F4C4B7F05}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU
40 38 {F550F1F8-8746-4AD0-9614-855F4C4B7F05}.Release|Any CPU.ActiveCfg = Release|Any CPU
41 39 {F550F1F8-8746-4AD0-9614-855F4C4B7F05}.Release|Any CPU.Build.0 = Release|Any CPU
42 40 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Debug 4.5|Any CPU.ActiveCfg = Debug 4.5|Any CPU
43 41 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Debug 4.5|Any CPU.Build.0 = Debug 4.5|Any CPU
44 42 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
45 43 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
46 44 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release 4.5|Any CPU.ActiveCfg = Release 4.5|Any CPU
47 45 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU
48 46 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
49 47 {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.Build.0 = Release|Any CPU
50 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug 4.5|Any CPU.ActiveCfg = Debug 4.5|Any CPU
51 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug 4.5|Any CPU.Build.0 = Debug 4.5|Any CPU
52 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
53 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug|Any CPU.Build.0 = Debug|Any CPU
54 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release 4.5|Any CPU.ActiveCfg = Release 4.5|Any CPU
55 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU
56 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release|Any CPU.ActiveCfg = Release|Any CPU
57 {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release|Any CPU.Build.0 = Release|Any CPU
58 48 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.ActiveCfg = Debug|Any CPU
59 49 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.Build.0 = Debug|Any CPU
60 50 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
61 51 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug|Any CPU.Build.0 = Debug|Any CPU
62 52 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Release 4.5|Any CPU.ActiveCfg = Release|Any CPU
63 53 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Release 4.5|Any CPU.Build.0 = Release|Any CPU
64 54 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Release|Any CPU.ActiveCfg = Release|Any CPU
65 55 {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Release|Any CPU.Build.0 = Release|Any CPU
66 56 {100DFEB0-75BE-436F-ADDF-1F46EF433F46}.Debug 4.5|Any CPU.ActiveCfg = Debug|Any CPU
67 57 {100DFEB0-75BE-436F-ADDF-1F46EF433F46}.Debug 4.5|Any CPU.Build.0 = Debug|Any CPU
68 58 {100DFEB0-75BE-436F-ADDF-1F46EF433F46}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
69 59 {100DFEB0-75BE-436F-ADDF-1F46EF433F46}.Debug|Any CPU.Build.0 = Debug|Any CPU
70 60 {100DFEB0-75BE-436F-ADDF-1F46EF433F46}.Release 4.5|Any CPU.ActiveCfg = Release|Any CPU
71 61 {100DFEB0-75BE-436F-ADDF-1F46EF433F46}.Release 4.5|Any CPU.Build.0 = Release|Any CPU
72 62 {100DFEB0-75BE-436F-ADDF-1F46EF433F46}.Release|Any CPU.ActiveCfg = Release|Any CPU
73 63 {100DFEB0-75BE-436F-ADDF-1F46EF433F46}.Release|Any CPU.Build.0 = Release|Any CPU
74 64 EndGlobalSection
75 65 GlobalSection(SolutionProperties) = preSolution
76 66 HideSolutionNode = FALSE
77 67 EndGlobalSection
78 68 GlobalSection(MonoDevelopProperties) = preSolution
79 69 StartupItem = Implab\Implab.csproj
80 70 Policies = $0
81 71 $0.CSharpFormattingPolicy = $1
82 72 $1.IndentSwitchBody = True
83 73 $1.NamespaceBraceStyle = EndOfLine
84 74 $1.ClassBraceStyle = EndOfLine
85 75 $1.InterfaceBraceStyle = EndOfLine
86 76 $1.StructBraceStyle = EndOfLine
87 77 $1.EnumBraceStyle = EndOfLine
88 78 $1.MethodBraceStyle = EndOfLine
89 79 $1.ConstructorBraceStyle = EndOfLine
90 80 $1.DestructorBraceStyle = EndOfLine
91 81 $1.BeforeMethodDeclarationParentheses = False
92 82 $1.BeforeMethodCallParentheses = False
93 83 $1.BeforeConstructorDeclarationParentheses = False
94 84 $1.NewLineBeforeConstructorInitializerColon = NewLine
95 85 $1.NewLineAfterConstructorInitializerColon = SameLine
96 86 $1.BeforeIndexerDeclarationBracket = False
97 87 $1.BeforeDelegateDeclarationParentheses = False
98 88 $1.NewParentheses = False
99 89 $1.SpacesBeforeBrackets = False
100 90 $1.inheritsSet = Mono
101 91 $1.inheritsScope = text/x-csharp
102 92 $1.scope = text/x-csharp
103 93 $0.TextStylePolicy = $6
104 94 $2.FileWidth = 120
105 95 $2.EolMarker = Unix
106 96 $2.inheritsSet = VisualStudio
107 97 $2.inheritsScope = text/plain
108 98 $2.scope = text/x-csharp
109 99 $0.DotNetNamingPolicy = $3
110 100 $3.DirectoryNamespaceAssociation = PrefixedHierarchical
111 101 $3.ResourceNamePolicy = MSBuild
112 102 $4.FileWidth = 120
113 103 $4.TabsToSpaces = False
114 104 $4.inheritsSet = VisualStudio
115 105 $4.inheritsScope = text/plain
116 106 $4.scope = application/xml
117 107 $0.XmlFormattingPolicy = $5
118 108 $5.inheritsSet = Mono
119 109 $5.inheritsScope = application/xml
120 110 $5.scope = application/xml
121 111 $6.FileWidth = 120
122 112 $6.TabsToSpaces = False
123 113 $6.inheritsSet = VisualStudio
124 114 $6.inheritsScope = text/plain
125 115 $6.scope = text/plain
126 116 $0.NameConventionPolicy = $7
127 117 $7.Rules = $8
128 118 $8.NamingRule = $34
129 119 $9.Name = Namespaces
130 120 $9.AffectedEntity = Namespace
131 121 $9.VisibilityMask = VisibilityMask
132 122 $9.NamingStyle = PascalCase
133 123 $9.IncludeInstanceMembers = True
134 124 $9.IncludeStaticEntities = True
135 125 $10.Name = Types
136 126 $10.AffectedEntity = Class, Struct, Enum, Delegate
137 127 $10.VisibilityMask = VisibilityMask
138 128 $10.NamingStyle = PascalCase
139 129 $10.IncludeInstanceMembers = True
140 130 $10.IncludeStaticEntities = True
141 131 $11.Name = Interfaces
142 132 $11.RequiredPrefixes = $12
143 133 $12.String = I
144 134 $11.AffectedEntity = Interface
145 135 $11.VisibilityMask = VisibilityMask
146 136 $11.NamingStyle = PascalCase
147 137 $11.IncludeInstanceMembers = True
148 138 $11.IncludeStaticEntities = True
149 139 $13.Name = Attributes
150 140 $13.RequiredSuffixes = $14
151 141 $14.String = Attribute
152 142 $13.AffectedEntity = CustomAttributes
153 143 $13.VisibilityMask = VisibilityMask
154 144 $13.NamingStyle = PascalCase
155 145 $13.IncludeInstanceMembers = True
156 146 $13.IncludeStaticEntities = True
157 147 $15.Name = Event Arguments
158 148 $15.RequiredSuffixes = $16
159 149 $16.String = EventArgs
160 150 $15.AffectedEntity = CustomEventArgs
161 151 $15.VisibilityMask = VisibilityMask
162 152 $15.NamingStyle = PascalCase
163 153 $15.IncludeInstanceMembers = True
164 154 $15.IncludeStaticEntities = True
165 155 $17.Name = Exceptions
166 156 $17.RequiredSuffixes = $18
167 157 $18.String = Exception
168 158 $17.AffectedEntity = CustomExceptions
169 159 $17.VisibilityMask = VisibilityMask
170 160 $17.NamingStyle = PascalCase
171 161 $17.IncludeInstanceMembers = True
172 162 $17.IncludeStaticEntities = True
173 163 $19.Name = Methods
174 164 $19.AffectedEntity = Methods
175 165 $19.VisibilityMask = VisibilityMask
176 166 $19.NamingStyle = PascalCase
177 167 $19.IncludeInstanceMembers = True
178 168 $19.IncludeStaticEntities = True
179 169 $20.Name = Static Readonly Fields
180 170 $20.AffectedEntity = ReadonlyField
181 171 $20.VisibilityMask = Internal, Protected, Public
182 172 $20.NamingStyle = CamelCase
183 173 $20.IncludeInstanceMembers = False
184 174 $20.IncludeStaticEntities = True
185 175 $21.Name = Fields (Non Private)
186 176 $21.AffectedEntity = Field
187 177 $21.VisibilityMask = Internal, Public
188 178 $21.NamingStyle = CamelCase
189 179 $21.IncludeInstanceMembers = True
190 180 $21.IncludeStaticEntities = True
191 181 $22.Name = ReadOnly Fields (Non Private)
192 182 $22.AffectedEntity = ReadonlyField
193 183 $22.VisibilityMask = Internal, Public
194 184 $22.NamingStyle = CamelCase
195 185 $22.IncludeInstanceMembers = True
196 186 $22.IncludeStaticEntities = False
197 187 $23.Name = Fields (Private)
198 188 $23.RequiredPrefixes = $24
199 189 $24.String = m_
200 190 $23.AffectedEntity = Field, ReadonlyField
201 191 $23.VisibilityMask = Private, Protected
202 192 $23.NamingStyle = CamelCase
203 193 $23.IncludeInstanceMembers = True
204 194 $23.IncludeStaticEntities = False
205 195 $25.Name = Static Fields (Private)
206 196 $25.RequiredPrefixes = $26
207 197 $26.String = _
208 198 $25.AffectedEntity = Field
209 199 $25.VisibilityMask = Private
210 200 $25.NamingStyle = CamelCase
211 201 $25.IncludeInstanceMembers = False
212 202 $25.IncludeStaticEntities = True
213 203 $27.Name = ReadOnly Fields (Private)
214 204 $27.RequiredPrefixes = $28
215 205 $28.String = m_
216 206 $27.AffectedEntity = ReadonlyField
217 207 $27.VisibilityMask = Private, Protected
218 208 $27.NamingStyle = CamelCase
219 209 $27.IncludeInstanceMembers = True
220 210 $27.IncludeStaticEntities = False
221 211 $29.Name = Constant Fields
222 212 $29.AffectedEntity = ConstantField
223 213 $29.VisibilityMask = VisibilityMask
224 214 $29.NamingStyle = AllUpper
225 215 $29.IncludeInstanceMembers = True
226 216 $29.IncludeStaticEntities = True
227 217 $30.Name = Properties
228 218 $30.AffectedEntity = Property
229 219 $30.VisibilityMask = VisibilityMask
230 220 $30.NamingStyle = PascalCase
231 221 $30.IncludeInstanceMembers = True
232 222 $30.IncludeStaticEntities = True
233 223 $31.Name = Events
234 224 $31.AffectedEntity = Event
235 225 $31.VisibilityMask = VisibilityMask
236 226 $31.NamingStyle = PascalCase
237 227 $31.IncludeInstanceMembers = True
238 228 $31.IncludeStaticEntities = True
239 229 $32.Name = Enum Members
240 230 $32.AffectedEntity = EnumMember
241 231 $32.VisibilityMask = VisibilityMask
242 232 $32.NamingStyle = PascalCase
243 233 $32.IncludeInstanceMembers = True
244 234 $32.IncludeStaticEntities = True
245 235 $33.Name = Parameters
246 236 $33.AffectedEntity = Parameter, LocalVariable
247 237 $33.VisibilityMask = VisibilityMask
248 238 $33.NamingStyle = CamelCase
249 239 $33.IncludeInstanceMembers = True
250 240 $33.IncludeStaticEntities = True
251 241 $34.Name = Type Parameters
252 242 $34.RequiredPrefixes = $35
253 243 $35.String = T
254 244 $34.AffectedEntity = TypeParameter
255 245 $34.VisibilityMask = VisibilityMask
256 246 $34.NamingStyle = PascalCase
257 247 $34.IncludeInstanceMembers = True
258 248 $34.IncludeStaticEntities = True
259 249 EndGlobalSection
260 250 GlobalSection(TestCaseManagementSettings) = postSolution
261 251 CategoryFile = Implab.vsmdi
262 252 EndGlobalSection
263 253 EndGlobal
@@ -1,300 +1,300
1 1 using System;
2 2 using Implab.Parallels;
3 3 using System.Threading;
4 4 using System.Reflection;
5 5
6 6 namespace Implab {
7 7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
8 8
9 9 const int UNRESOLVED_SATE = 0;
10 10 const int TRANSITIONAL_STATE = 1;
11 11 protected const int SUCCEEDED_STATE = 2;
12 12 protected const int REJECTED_STATE = 3;
13 13 protected const int CANCELLED_STATE = 4;
14 14
15 15 const int CANCEL_NOT_REQUESTED = 0;
16 16 const int CANCEL_REQUESTING = 1;
17 17 const int CANCEL_REQUESTED = 2;
18 18
19 19 const int RESERVED_HANDLERS_COUNT = 4;
20 20
21 21 int m_state;
22 22 Exception m_error;
23 23 int m_handlersCount;
24 24
25 25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
26 26 THandler[] m_handlers;
27 MTQueue<THandler> m_extraHandlers;
27 SimpleAsyncQueue<THandler> m_extraHandlers;
28 28 int m_handlerPointer = -1;
29 29 int m_handlersCommited;
30 30
31 31 int m_cancelRequest;
32 32 Exception m_cancelationReason;
33 MTQueue<Action<Exception>> m_cancelationHandlers;
33 SimpleAsyncQueue<Action<Exception>> m_cancelationHandlers;
34 34
35 35
36 36 #region state managment
37 37 bool BeginTransit() {
38 38 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
39 39 }
40 40
41 41 void CompleteTransit(int state) {
42 42 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
43 43 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
44 44 }
45 45
46 46 void WaitTransition() {
47 47 while (m_state == TRANSITIONAL_STATE) {
48 48 Thread.MemoryBarrier();
49 49 }
50 50 }
51 51
52 52 protected bool BeginSetResult() {
53 53 if (!BeginTransit()) {
54 54 WaitTransition();
55 55 if (m_state != CANCELLED_STATE)
56 56 throw new InvalidOperationException("The promise is already resolved");
57 57 return false;
58 58 }
59 59 return true;
60 60 }
61 61
62 62 protected void EndSetResult() {
63 63 CompleteTransit(SUCCEEDED_STATE);
64 64 Signal();
65 65 }
66 66
67 67
68 68
69 69 /// <summary>
70 70 /// Выполняет обещание, сообщая об ошибке
71 71 /// </summary>
72 72 /// <remarks>
73 73 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
74 74 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
75 75 /// будут проигнорированы.
76 76 /// </remarks>
77 77 /// <param name="error">Исключение возникшее при выполнении операции</param>
78 78 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
79 79 protected void SetError(Exception error) {
80 80 if (BeginTransit()) {
81 81 m_error = error;
82 82 CompleteTransit(REJECTED_STATE);
83 83
84 84 Signal();
85 85 } else {
86 86 WaitTransition();
87 87 if (m_state == SUCCEEDED_STATE)
88 88 throw new InvalidOperationException("The promise is already resolved");
89 89 }
90 90 }
91 91
92 92 /// <summary>
93 93 /// Отменяет операцию, если это возможно.
94 94 /// </summary>
95 95 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
96 96 protected void SetCancelled(Exception reason) {
97 97 if (BeginTransit()) {
98 98 m_error = reason;
99 99 CompleteTransit(CANCELLED_STATE);
100 100 Signal();
101 101 }
102 102 }
103 103
104 104 protected abstract void SignalHandler(THandler handler, int signal);
105 105
106 106 void Signal() {
107 107 var hp = m_handlerPointer;
108 108 var slot = hp +1 ;
109 109 while (slot < m_handlersCommited) {
110 110 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
111 111 SignalHandler(m_handlers[slot], m_state);
112 112 }
113 113 hp = m_handlerPointer;
114 114 slot = hp +1 ;
115 115 }
116 116
117 117
118 118 if (m_extraHandlers != null) {
119 119 THandler handler;
120 120 while (m_extraHandlers.TryDequeue(out handler))
121 121 SignalHandler(handler, m_state);
122 122 }
123 123 }
124 124
125 125 #endregion
126 126
127 127 protected abstract Signal GetResolveSignal();
128 128
129 129 #region synchronization traits
130 130 protected void WaitResult(int timeout) {
131 131 if (!(IsResolved || GetResolveSignal().Wait(timeout)))
132 132 throw new TimeoutException();
133 133
134 134 switch (m_state) {
135 135 case SUCCEEDED_STATE:
136 136 return;
137 137 case CANCELLED_STATE:
138 138 throw new OperationCanceledException("The operation has been cancelled", m_error);
139 139 case REJECTED_STATE:
140 140 throw new TargetInvocationException(m_error);
141 141 default:
142 142 throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state));
143 143 }
144 144 }
145 145 #endregion
146 146
147 147 #region handlers managment
148 148
149 149 protected void AddHandler(THandler handler) {
150 150
151 151 if (m_state > 1) {
152 152 // the promise is in the resolved state, just invoke the handler
153 153 SignalHandler(handler, m_state);
154 154 } else {
155 155 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
156 156
157 157 if (slot < RESERVED_HANDLERS_COUNT) {
158 158
159 159 if (slot == 0) {
160 160 m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
161 161 } else {
162 162 while (m_handlers == null)
163 163 Thread.MemoryBarrier();
164 164 }
165 165
166 166 m_handlers[slot] = handler;
167 167
168 168 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
169 169 }
170 170
171 171 if (m_state > 1) {
172 172 do {
173 173 var hp = m_handlerPointer;
174 174 slot = hp + 1;
175 175 if (slot < m_handlersCommited) {
176 176 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
177 177 continue;
178 178 SignalHandler(m_handlers[slot], m_state);
179 179 }
180 180 break;
181 181 } while(true);
182 182 }
183 183 } else {
184 184 if (slot == RESERVED_HANDLERS_COUNT) {
185 m_extraHandlers = new MTQueue<THandler>();
185 m_extraHandlers = new SimpleAsyncQueue<THandler>();
186 186 } else {
187 187 while (m_extraHandlers == null)
188 188 Thread.MemoryBarrier();
189 189 }
190 190
191 191 m_extraHandlers.Enqueue(handler);
192 192
193 193 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
194 194 // if the promise have been resolved while we was adding the handler to the queue
195 195 // we can't guarantee that someone is still processing it
196 196 // therefore we need to fetch a handler from the queue and execute it
197 197 // note that fetched handler may be not the one that we have added
198 198 // even we can fetch no handlers at all :)
199 199 SignalHandler(handler, m_state);
200 200 }
201 201 }
202 202 }
203 203
204 204 #endregion
205 205
206 206 #region IPromise implementation
207 207
208 208 public bool IsResolved {
209 209 get {
210 210 Thread.MemoryBarrier();
211 211 return m_state > 1;
212 212 }
213 213 }
214 214
215 215 public bool IsCancelled {
216 216 get {
217 217 Thread.MemoryBarrier();
218 218 return m_state == CANCELLED_STATE;
219 219 }
220 220 }
221 221
222 222 #endregion
223 223
224 224 public Exception Error {
225 225 get {
226 226 return m_error;
227 227 }
228 228 }
229 229
230 230 public bool CancelOperationIfRequested() {
231 231 if (IsCancellationRequested) {
232 232 CancelOperation(CancellationReason);
233 233 return true;
234 234 }
235 235 return false;
236 236 }
237 237
238 238 public virtual void CancelOperation(Exception reason) {
239 239 SetCancelled(reason);
240 240 }
241 241
242 242 public void CancellationRequested(Action<Exception> handler) {
243 243 Safe.ArgumentNotNull(handler, "handler");
244 244 if (IsCancellationRequested)
245 245 handler(CancellationReason);
246 246
247 247 if (m_cancelationHandlers == null)
248 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
248 Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null);
249 249
250 250 m_cancelationHandlers.Enqueue(handler);
251 251
252 252 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
253 253 // TryDeque implies MemoryBarrier()
254 254 handler(m_cancelationReason);
255 255 }
256 256
257 257 public bool IsCancellationRequested {
258 258 get {
259 259 do {
260 260 if (m_cancelRequest == CANCEL_NOT_REQUESTED)
261 261 return false;
262 262 if (m_cancelRequest == CANCEL_REQUESTED)
263 263 return true;
264 264 Thread.MemoryBarrier();
265 265 } while(true);
266 266 }
267 267 }
268 268
269 269 public Exception CancellationReason {
270 270 get {
271 271 do {
272 272 Thread.MemoryBarrier();
273 273 } while(m_cancelRequest == CANCEL_REQUESTING);
274 274
275 275 return m_cancelationReason;
276 276 }
277 277 }
278 278
279 279 #region ICancellable implementation
280 280
281 281 public void Cancel() {
282 282 Cancel(null);
283 283 }
284 284
285 285 public void Cancel(Exception reason) {
286 286 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
287 287 m_cancelationReason = reason;
288 288 m_cancelRequest = CANCEL_REQUESTED;
289 289 if (m_cancelationHandlers != null) {
290 290 Action<Exception> handler;
291 291 while (m_cancelationHandlers.TryDequeue(out handler))
292 292 handler(m_cancelationReason);
293 293 }
294 294 }
295 295 }
296 296
297 297 #endregion
298 298 }
299 299 }
300 300
@@ -1,41 +1,41
1 1 using System;
2 2
3 3 namespace Implab.Automaton {
4 public struct AutomatonTransition : IEquatable<AutomatonTransition> {
4 public class AutomatonTransition : IEquatable<AutomatonTransition> {
5 5 public readonly int s1;
6 6 public readonly int s2;
7 7 public readonly int edge;
8 8
9 9 public AutomatonTransition(int s1, int s2, int edge) {
10 10 this.s1 = s1;
11 11 this.s2 = s2;
12 12 this.edge = edge;
13 13 }
14 14
15 15
16 16 #region IEquatable implementation
17 17 public bool Equals(AutomatonTransition other) {
18 18 return other.s1 == s1 && other.s2 == s2 && other.edge == edge ;
19 19 }
20 20 #endregion
21 21
22 22 public override bool Equals(object obj) {
23 23 if (obj is AutomatonTransition)
24 24 return Equals((AutomatonTransition)obj);
25 25 return base.Equals(obj);
26 26 }
27 27
28 28 public override int GetHashCode() {
29 29 return s1 + s2 + edge;
30 30 }
31 31
32 32 public static bool operator == (AutomatonTransition rv, AutomatonTransition lv) {
33 33 return rv.Equals(lv);
34 34 }
35 35
36 36 public static bool operator !=(AutomatonTransition rv, AutomatonTransition lv) {
37 37 return rv.Equals(lv);
38 38 }
39 39 }
40 40 }
41 41
@@ -1,282 +1,283
1 1 <?xml version="1.0" encoding="utf-8"?>
2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
2 <Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <PropertyGroup>
4 4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 6 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
7 7 <OutputType>Library</OutputType>
8 8 <RootNamespace>Implab</RootNamespace>
9 9 <AssemblyName>Implab</AssemblyName>
10 10 <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
11 <TargetFrameworkProfile />
11 12 </PropertyGroup>
12 13 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
13 14 <DebugSymbols>true</DebugSymbols>
14 15 <DebugType>full</DebugType>
15 16 <Optimize>false</Optimize>
16 17 <OutputPath>bin\Debug</OutputPath>
17 18 <DefineConstants>TRACE;DEBUG;</DefineConstants>
18 19 <ErrorReport>prompt</ErrorReport>
19 20 <WarningLevel>4</WarningLevel>
20 21 <ConsolePause>false</ConsolePause>
21 22 <RunCodeAnalysis>true</RunCodeAnalysis>
22 23 </PropertyGroup>
23 24 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
24 25 <DebugType>full</DebugType>
25 26 <Optimize>true</Optimize>
26 27 <OutputPath>bin\Release</OutputPath>
27 28 <ErrorReport>prompt</ErrorReport>
28 29 <WarningLevel>4</WarningLevel>
29 30 <ConsolePause>false</ConsolePause>
30 31 </PropertyGroup>
31 32 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug 4.5|AnyCPU' ">
32 33 <DebugSymbols>true</DebugSymbols>
33 34 <DebugType>full</DebugType>
34 35 <Optimize>false</Optimize>
35 36 <OutputPath>bin\Debug</OutputPath>
36 37 <DefineConstants>TRACE;DEBUG;NET_4_5</DefineConstants>
37 38 <ErrorReport>prompt</ErrorReport>
38 39 <WarningLevel>4</WarningLevel>
39 40 <RunCodeAnalysis>true</RunCodeAnalysis>
40 41 <ConsolePause>false</ConsolePause>
41 42 </PropertyGroup>
42 43 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release 4.5|AnyCPU' ">
43 44 <Optimize>true</Optimize>
44 45 <OutputPath>bin\Release</OutputPath>
45 46 <ErrorReport>prompt</ErrorReport>
46 47 <WarningLevel>4</WarningLevel>
47 48 <ConsolePause>false</ConsolePause>
48 49 <DefineConstants>NET_4_5</DefineConstants>
49 50 </PropertyGroup>
50 51 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'DebugMono|AnyCPU' ">
51 52 <DebugSymbols>true</DebugSymbols>
52 53 <DebugType>full</DebugType>
53 54 <Optimize>false</Optimize>
54 55 <OutputPath>bin\Debug</OutputPath>
55 56 <DefineConstants>TRACE;DEBUG;NET_4_5;MONO</DefineConstants>
56 57 <ErrorReport>prompt</ErrorReport>
57 58 <WarningLevel>4</WarningLevel>
58 59 <RunCodeAnalysis>true</RunCodeAnalysis>
59 60 <ConsolePause>false</ConsolePause>
60 61 </PropertyGroup>
61 62 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'ReleaseMono|AnyCPU' ">
62 63 <Optimize>true</Optimize>
63 64 <OutputPath>bin\Release</OutputPath>
64 65 <DefineConstants>NET_4_5;MONO;</DefineConstants>
65 66 <ErrorReport>prompt</ErrorReport>
66 67 <WarningLevel>4</WarningLevel>
67 68 <ConsolePause>false</ConsolePause>
68 69 </PropertyGroup>
69 70 <ItemGroup>
70 71 <Reference Include="System" />
71 72 <Reference Include="System.Xml" />
72 73 <Reference Include="mscorlib" />
73 74 <Reference Include="System.Xml.Linq" />
74 75 </ItemGroup>
75 76 <ItemGroup>
76 77 <Compile Include="Components\StateChangeEventArgs.cs" />
77 78 <Compile Include="CustomEqualityComparer.cs" />
78 79 <Compile Include="Diagnostics\ConsoleTraceListener.cs" />
79 80 <Compile Include="Diagnostics\LogChannel.cs" />
80 81 <Compile Include="Diagnostics\LogicalOperation.cs" />
81 82 <Compile Include="Diagnostics\TextFileListener.cs" />
82 83 <Compile Include="Diagnostics\Trace.cs" />
83 84 <Compile Include="Diagnostics\TraceLog.cs" />
84 85 <Compile Include="Diagnostics\TraceEvent.cs" />
85 86 <Compile Include="Diagnostics\TraceEventType.cs" />
86 87 <Compile Include="Diagnostics\TraceSourceAttribute.cs" />
87 88 <Compile Include="Formats\CharMap.cs" />
88 89 <Compile Include="Formats\InputScanner.cs" />
89 90 <Compile Include="Formats\Json\JsonStringScanner.cs" />
90 91 <Compile Include="Formats\Json\JsonTextScanner.cs" />
91 92 <Compile Include="ICancellable.cs" />
92 93 <Compile Include="IProgressHandler.cs" />
93 94 <Compile Include="IProgressNotifier.cs" />
94 95 <Compile Include="IPromiseT.cs" />
95 96 <Compile Include="IPromise.cs" />
96 97 <Compile Include="IServiceLocator.cs" />
97 98 <Compile Include="ITaskController.cs" />
98 99 <Compile Include="Parallels\DispatchPool.cs" />
99 100 <Compile Include="Parallels\ArrayTraits.cs" />
100 <Compile Include="Parallels\MTQueue.cs" />
101 <Compile Include="Parallels\SimpleAsyncQueue.cs" />
101 102 <Compile Include="Parallels\WorkerPool.cs" />
102 103 <Compile Include="ProgressInitEventArgs.cs" />
103 104 <Compile Include="Properties\AssemblyInfo.cs" />
104 105 <Compile Include="Parallels\AsyncPool.cs" />
105 106 <Compile Include="Safe.cs" />
106 107 <Compile Include="SyncContextPromise.cs" />
107 108 <Compile Include="ValueEventArgs.cs" />
108 109 <Compile Include="PromiseExtensions.cs" />
109 110 <Compile Include="SyncContextPromiseT.cs" />
110 111 <Compile Include="Diagnostics\OperationContext.cs" />
111 112 <Compile Include="Diagnostics\TraceContext.cs" />
112 113 <Compile Include="Diagnostics\LogEventArgs.cs" />
113 114 <Compile Include="Diagnostics\LogEventArgsT.cs" />
114 115 <Compile Include="Diagnostics\Extensions.cs" />
115 116 <Compile Include="PromiseEventType.cs" />
116 117 <Compile Include="Parallels\AsyncQueue.cs" />
117 118 <Compile Include="PromiseT.cs" />
118 119 <Compile Include="IDeferred.cs" />
119 120 <Compile Include="IDeferredT.cs" />
120 121 <Compile Include="Promise.cs" />
121 122 <Compile Include="PromiseTransientException.cs" />
122 123 <Compile Include="Parallels\Signal.cs" />
123 124 <Compile Include="Parallels\SharedLock.cs" />
124 125 <Compile Include="Diagnostics\ILogWriter.cs" />
125 126 <Compile Include="Diagnostics\ListenerBase.cs" />
126 127 <Compile Include="Parallels\BlockingQueue.cs" />
127 128 <Compile Include="AbstractEvent.cs" />
128 129 <Compile Include="AbstractPromise.cs" />
129 130 <Compile Include="AbstractPromiseT.cs" />
130 131 <Compile Include="FuncTask.cs" />
131 132 <Compile Include="FuncTaskBase.cs" />
132 133 <Compile Include="FuncTaskT.cs" />
133 134 <Compile Include="ActionChainTaskBase.cs" />
134 135 <Compile Include="ActionChainTask.cs" />
135 136 <Compile Include="ActionChainTaskT.cs" />
136 137 <Compile Include="FuncChainTaskBase.cs" />
137 138 <Compile Include="FuncChainTask.cs" />
138 139 <Compile Include="FuncChainTaskT.cs" />
139 140 <Compile Include="ActionTaskBase.cs" />
140 141 <Compile Include="ActionTask.cs" />
141 142 <Compile Include="ActionTaskT.cs" />
142 143 <Compile Include="ICancellationToken.cs" />
143 144 <Compile Include="SuccessPromise.cs" />
144 145 <Compile Include="SuccessPromiseT.cs" />
145 146 <Compile Include="PromiseAwaiterT.cs" />
146 147 <Compile Include="PromiseAwaiter.cs" />
147 148 <Compile Include="Components\ComponentContainer.cs" />
148 149 <Compile Include="Components\Disposable.cs" />
149 150 <Compile Include="Components\DisposablePool.cs" />
150 151 <Compile Include="Components\ObjectPool.cs" />
151 152 <Compile Include="Components\ServiceLocator.cs" />
152 153 <Compile Include="Components\IInitializable.cs" />
153 154 <Compile Include="TaskController.cs" />
154 155 <Compile Include="Components\App.cs" />
155 156 <Compile Include="Components\IRunnable.cs" />
156 157 <Compile Include="Components\ExecutionState.cs" />
157 158 <Compile Include="Components\RunnableComponent.cs" />
158 159 <Compile Include="Components\IFactory.cs" />
159 160 <Compile Include="Automaton\IAlphabet.cs" />
160 161 <Compile Include="Automaton\ParserException.cs" />
161 162 <Compile Include="Automaton\IndexedAlphabetBase.cs" />
162 163 <Compile Include="Automaton\IAlphabetBuilder.cs" />
163 164 <Compile Include="Automaton\RegularExpressions\AltToken.cs" />
164 165 <Compile Include="Automaton\RegularExpressions\BinaryToken.cs" />
165 166 <Compile Include="Automaton\RegularExpressions\CatToken.cs" />
166 167 <Compile Include="Automaton\RegularExpressions\StarToken.cs" />
167 168 <Compile Include="Automaton\RegularExpressions\SymbolToken.cs" />
168 169 <Compile Include="Automaton\RegularExpressions\EmptyToken.cs" />
169 170 <Compile Include="Automaton\RegularExpressions\Token.cs" />
170 171 <Compile Include="Automaton\RegularExpressions\IVisitor.cs" />
171 172 <Compile Include="Automaton\AutomatonTransition.cs" />
172 173 <Compile Include="Formats\Json\JsonElementContext.cs" />
173 174 <Compile Include="Formats\Json\JsonElementType.cs" />
174 175 <Compile Include="Formats\Json\JsonGrammar.cs" />
175 176 <Compile Include="Formats\Json\JsonReader.cs" />
176 177 <Compile Include="Formats\Json\JsonScanner.cs" />
177 178 <Compile Include="Formats\Json\JsonTokenType.cs" />
178 179 <Compile Include="Formats\Json\JsonWriter.cs" />
179 180 <Compile Include="Formats\Json\StringTranslator.cs" />
180 181 <Compile Include="Automaton\MapAlphabet.cs" />
181 182 <Compile Include="Formats\CharAlphabet.cs" />
182 183 <Compile Include="Formats\ByteAlphabet.cs" />
183 184 <Compile Include="Automaton\IDFATable.cs" />
184 185 <Compile Include="Automaton\IDFATableBuilder.cs" />
185 186 <Compile Include="Automaton\DFATable.cs" />
186 187 <Compile Include="Automaton\RegularExpressions\RegularExpressionVisitor.cs" />
187 188 <Compile Include="Automaton\RegularExpressions\ITaggedDFABuilder.cs" />
188 189 <Compile Include="Formats\Grammar.cs" />
189 190 <Compile Include="Automaton\RegularExpressions\EndTokenT.cs" />
190 191 <Compile Include="Automaton\RegularExpressions\EndToken.cs" />
191 192 <Compile Include="Automaton\RegularExpressions\RegularExpressionVisitorT.cs" />
192 193 <Compile Include="Automaton\AutomatonConst.cs" />
193 194 <Compile Include="Automaton\RegularExpressions\RegularDFA.cs" />
194 195 <Compile Include="Components\LazyAndWeak.cs" />
195 196 <Compile Include="AbstractTask.cs" />
196 197 <Compile Include="AbstractTaskT.cs" />
197 198 <Compile Include="FailedPromise.cs" />
198 199 <Compile Include="FailedPromiseT.cs" />
199 200 <Compile Include="Components\PollingComponent.cs" />
200 201 <Compile Include="Xml\JsonXmlReader.cs" />
201 202 <Compile Include="Xml\JsonXmlReaderOptions.cs" />
202 203 <Compile Include="Xml\JsonXmlReaderPosition.cs" />
203 204 <Compile Include="Xml\SerializationHelpers.cs" />
204 205 <Compile Include="Xml\SerializersPool.cs" />
205 206 <Compile Include="Xml\XmlSimpleAttribute.cs" />
206 207 <Compile Include="Xml\XmlNameContext.cs" />
207 208 </ItemGroup>
208 209 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
209 210 <ItemGroup />
210 211 <ProjectExtensions>
211 212 <MonoDevelop>
212 213 <Properties>
213 214 <Policies>
214 215 <CSharpFormattingPolicy IndentBlock="True" IndentBraces="False" IndentSwitchSection="False" IndentSwitchCaseSection="True" LabelPositioning="OneLess" NewLinesForBracesInProperties="False" NewLinesForBracesInAccessors="False" NewLinesForBracesInAnonymousMethods="False" NewLinesForBracesInControlBlocks="False" NewLinesForBracesInAnonymousTypes="False" NewLinesForBracesInObjectCollectionArrayInitializers="False" NewLinesForBracesInLambdaExpressionBody="False" NewLineForElse="False" NewLineForCatch="False" NewLineForFinally="False" NewLineForMembersInObjectInit="False" NewLineForMembersInAnonymousTypes="False" NewLineForClausesInQuery="False" SpaceWithinMethodDeclarationParenthesis="False" SpaceBetweenEmptyMethodDeclarationParentheses="False" SpaceWithinMethodCallParentheses="False" SpaceBetweenEmptyMethodCallParentheses="False" SpaceAfterControlFlowStatementKeyword="True" SpaceWithinExpressionParentheses="False" SpaceWithinCastParentheses="False" SpaceWithinOtherParentheses="False" SpaceAfterCast="False" SpacesIgnoreAroundVariableDeclaration="False" SpaceBetweenEmptySquareBrackets="False" SpaceWithinSquareBrackets="False" SpaceAfterColonInBaseTypeDeclaration="True" SpaceAfterComma="True" SpaceAfterDot="False" SpaceAfterSemicolonsInForStatement="True" SpaceBeforeComma="False" SpaceBeforeDot="False" SpaceBeforeSemicolonsInForStatement="False" SpacingAroundBinaryOperator="Single" WrappingPreserveSingleLine="True" WrappingKeepStatementsOnSingleLine="True" PlaceSystemDirectiveFirst="True" NewLinesForBracesInTypes="True" NewLinesForBracesInMethods="True" SpacingAfterMethodDeclarationName="True" SpaceAfterMethodCallName="True" SpaceBeforeOpenSquareBracket="True" SpaceBeforeColonInBaseTypeDeclaration="True" scope="text/x-csharp" />
215 216 <TextStylePolicy FileWidth="120" TabWidth="4" IndentWidth="4" RemoveTrailingWhitespace="True" NoTabsAfterNonTabs="False" TabsToSpaces="True" EolMarker="Unix" scope="text/x-csharp" />
216 217 <DotNetNamingPolicy DirectoryNamespaceAssociation="PrefixedHierarchical" ResourceNamePolicy="MSBuild" />
217 218 <TextStylePolicy FileWidth="120" TabWidth="4" TabsToSpaces="False" IndentWidth="4" RemoveTrailingWhitespace="True" NoTabsAfterNonTabs="False" EolMarker="Native" scope="application/xml" />
218 219 <XmlFormattingPolicy scope="application/xml">
219 220 <DefaultFormat OmitXmlDeclaration="False" NewLineChars="&#xA;" IndentContent="True" ContentIndentString=" " AttributesInNewLine="False" MaxAttributesPerLine="10" AttributesIndentString=" " WrapAttributes="False" AlignAttributes="False" AlignAttributeValues="False" QuoteChar="&quot;" SpacesBeforeAssignment="0" SpacesAfterAssignment="0" EmptyLinesBeforeStart="0" EmptyLinesAfterStart="0" EmptyLinesBeforeEnd="0" EmptyLinesAfterEnd="0" />
220 221 </XmlFormattingPolicy>
221 222 <TextStylePolicy FileWidth="120" TabWidth="4" TabsToSpaces="False" IndentWidth="4" RemoveTrailingWhitespace="True" NoTabsAfterNonTabs="False" EolMarker="Native" scope="text/plain" />
222 223 <NameConventionPolicy>
223 224 <Rules>
224 225 <NamingRule Name="Namespaces" AffectedEntity="Namespace" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
225 226 <NamingRule Name="Types" AffectedEntity="Class, Struct, Enum, Delegate" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
226 227 <NamingRule Name="Interfaces" AffectedEntity="Interface" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
227 228 <RequiredPrefixes>
228 229 <String>I</String>
229 230 </RequiredPrefixes>
230 231 </NamingRule>
231 232 <NamingRule Name="Attributes" AffectedEntity="CustomAttributes" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
232 233 <RequiredSuffixes>
233 234 <String>Attribute</String>
234 235 </RequiredSuffixes>
235 236 </NamingRule>
236 237 <NamingRule Name="Event Arguments" AffectedEntity="CustomEventArgs" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
237 238 <RequiredSuffixes>
238 239 <String>EventArgs</String>
239 240 </RequiredSuffixes>
240 241 </NamingRule>
241 242 <NamingRule Name="Exceptions" AffectedEntity="CustomExceptions" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
242 243 <RequiredSuffixes>
243 244 <String>Exception</String>
244 245 </RequiredSuffixes>
245 246 </NamingRule>
246 247 <NamingRule Name="Methods" AffectedEntity="Methods" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
247 248 <NamingRule Name="Static Readonly Fields" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Protected, Public" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True" />
248 249 <NamingRule Name="Fields (Non Private)" AffectedEntity="Field" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
249 250 <NamingRule Name="ReadOnly Fields (Non Private)" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False" />
250 251 <NamingRule Name="Fields (Private)" AffectedEntity="Field, ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
251 252 <RequiredPrefixes>
252 253 <String>m_</String>
253 254 </RequiredPrefixes>
254 255 </NamingRule>
255 256 <NamingRule Name="Static Fields (Private)" AffectedEntity="Field" VisibilityMask="Private" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True">
256 257 <RequiredPrefixes>
257 258 <String>_</String>
258 259 </RequiredPrefixes>
259 260 </NamingRule>
260 261 <NamingRule Name="ReadOnly Fields (Private)" AffectedEntity="ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
261 262 <RequiredPrefixes>
262 263 <String>m_</String>
263 264 </RequiredPrefixes>
264 265 </NamingRule>
265 266 <NamingRule Name="Constant Fields" AffectedEntity="ConstantField" VisibilityMask="VisibilityMask" NamingStyle="AllUpper" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
266 267 <NamingRule Name="Properties" AffectedEntity="Property" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
267 268 <NamingRule Name="Events" AffectedEntity="Event" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
268 269 <NamingRule Name="Enum Members" AffectedEntity="EnumMember" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
269 270 <NamingRule Name="Parameters" AffectedEntity="Parameter, LocalVariable" VisibilityMask="VisibilityMask" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
270 271 <NamingRule Name="Type Parameters" AffectedEntity="TypeParameter" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
271 272 <RequiredPrefixes>
272 273 <String>T</String>
273 274 </RequiredPrefixes>
274 275 </NamingRule>
275 276 </Rules>
276 277 </NameConventionPolicy>
277 278 </Policies>
278 279 </Properties>
279 280 </MonoDevelop>
280 281 </ProjectExtensions>
281 282 <ItemGroup />
282 283 </Project> No newline at end of file
@@ -1,623 +1,562
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 5 using System.Diagnostics;
6 6
7 7 namespace Implab.Parallels {
8 8 public class AsyncQueue<T> : IEnumerable<T> {
9 9 class Chunk {
10 public Chunk next;
10 public volatile Chunk next;
11 11
12 int m_low;
13 int m_hi;
14 int m_alloc;
12 volatile int m_low;
13 volatile int m_hi;
14 volatile int m_alloc;
15 15 readonly int m_size;
16 16 readonly T[] m_data;
17 17
18 18 public Chunk(int size) {
19 19 m_size = size;
20 20 m_data = new T[size];
21 21 }
22 22
23 23 public Chunk(int size, T value) {
24 24 m_size = size;
25 25 m_hi = 1;
26 26 m_alloc = 1;
27 27 m_data = new T[size];
28 28 m_data[0] = value;
29 29 }
30 30
31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 public Chunk(int size, int allocated) {
32 32 m_size = size;
33 m_hi = length;
34 m_alloc = alloc;
33 m_hi = allocated;
34 m_alloc = allocated;
35 35 m_data = new T[size];
36 Array.Copy(data, offset, m_data, 0, length);
36 }
37
38 public void WriteData(T[] data, int offset, int dest, int length) {
39 Array.Copy(data, offset, m_data, dest, length);
37 40 }
38 41
39 42 public int Low {
40 43 get { return m_low; }
41 44 }
42 45
43 46 public int Hi {
44 47 get { return m_hi; }
45 48 }
46 49
47 50 public int Size {
48 51 get { return m_size; }
49 52 }
50 53
51 public bool TryEnqueue(T value, out bool extend) {
52 var alloc = Interlocked.Increment(ref m_alloc) - 1;
53
54 if (alloc >= m_size) {
55 extend = alloc == m_size;
56 return false;
57 }
58
59 extend = false;
54 public bool TryEnqueue(T value) {
55 int alloc;
56 do {
57 alloc = m_alloc;
58 if (alloc >= m_size)
59 return false;
60 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));
61
60 62 m_data[alloc] = value;
61 63
62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
64 SpinWait spin = new SpinWait();
65 // m_hi is volatile
66 while (alloc != m_hi) {
63 67 // spin wait for commit
68 spin.SpinOnce();
64 69 }
70 m_hi = alloc + 1;
71
65 72 return true;
66 73 }
67 74
68 75 /// <summary>
69 76 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
70 77 /// </summary>
71 public void Commit() {
72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
73
74 while (m_hi != actual)
75 Thread.MemoryBarrier();
78 public void Seal() {
79 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
80 SpinWait spin = new SpinWait();
81 while (m_hi != actual) {
82 spin.SpinOnce();
83 }
76 84 }
77 85
78 86 public bool TryDequeue(out T value, out bool recycle) {
79 87 int low;
80 88 do {
81 89 low = m_low;
82 90 if (low >= m_hi) {
83 91 value = default(T);
84 92 recycle = (low == m_size);
85 93 return false;
86 94 }
87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
95 } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));
88 96
89 recycle = (low == m_size - 1);
97 recycle = (low + 1 == m_size);
90 98 value = m_data[low];
91 99
92 100 return true;
93 101 }
94 102
95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
96 //int alloc;
97 //int allocSize;
103 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) {
104 int alloc;
105 do {
106 alloc = m_alloc;
107 if (alloc >= m_size) {
108 enqueued = 0;
109 return false;
110 } else {
111 enqueued = Math.Min(length, m_size - alloc);
112 }
113 } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc));
114
115 Array.Copy(batch, offset, m_data, alloc, enqueued);
98 116
99 var alloc = Interlocked.Add(ref m_alloc, length) - length;
100 if (alloc > m_size) {
101 // the chunk is full and someone already
102 // creating the new one
103 enqueued = 0; // nothing was added
104 extend = false; // the caller shouldn't try to extend the queue
105 return false; // nothing was added
117 SpinWait spin = new SpinWait();
118 while (alloc != m_hi) {
119 spin.SpinOnce();
106 120 }
107 121
108 enqueued = Math.Min(m_size - alloc, length);
109 extend = length > enqueued;
110
111 if (enqueued == 0)
112 return false;
113
114
115 Array.Copy(batch, offset, m_data, alloc, enqueued);
116
117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
118 // spin wait for commit
119 }
120
122 m_hi = alloc + enqueued;
121 123 return true;
122 124 }
123 125
124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
126 public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
125 127 int low, hi, batchSize;
126 128
127 129 do {
128 130 low = m_low;
129 131 hi = m_hi;
130 132 if (low >= hi) {
131 133 dequeued = 0;
132 recycle = (low == m_size); // recycling could be restarted and we need to signal again
134 recycle = (low == m_size);
133 135 return false;
134 136 }
135 137 batchSize = Math.Min(hi - low, length);
136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
138 } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
137 139
138 recycle = (low == m_size - batchSize);
139 140 dequeued = batchSize;
140
141 recycle = (low + batchSize == m_size);
141 142 Array.Copy(m_data, low, buffer, offset, batchSize);
142 143
143 144 return true;
144 145 }
145 146
146 147 public T GetAt(int pos) {
147 148 return m_data[pos];
148 149 }
149 150 }
150 151
151 152 public const int DEFAULT_CHUNK_SIZE = 32;
152 public const int MAX_CHUNK_SIZE = 262144;
153 public const int MAX_CHUNK_SIZE = 256;
153 154
154 155 Chunk m_first;
155 156 Chunk m_last;
156 157
158 public AsyncQueue() {
159 m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE);
160 }
161
157 162 /// <summary>
158 163 /// Adds the specified value to the queue.
159 164 /// </summary>
160 165 /// <param name="value">Tha value which will be added to the queue.</param>
161 public virtual void Enqueue(T value) {
166 public void Enqueue(T value) {
162 167 var last = m_last;
163 // spin wait to the new chunk
164 bool extend = true;
165 while (last == null || !last.TryEnqueue(value, out extend)) {
168 SpinWait spin = new SpinWait();
169 while (!last.TryEnqueue(value)) {
166 170 // try to extend queue
167 if (extend || last == null) {
168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
169 if (EnqueueChunk(last, chunk))
170 break; // success! exit!
171 last = m_last;
171 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
172 var t = Interlocked.CompareExchange(ref m_last, chunk, last);
173 if (t == last) {
174 last.next = chunk;
175 break;
172 176 } else {
173 while (last == m_last) {
174 Thread.MemoryBarrier();
175 }
176 last = m_last;
177 last = t;
177 178 }
179 spin.SpinOnce();
178 180 }
179 181 }
180 182
181 183 /// <summary>
182 184 /// Adds the specified data to the queue.
183 185 /// </summary>
184 186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
185 187 /// <param name="offset">The offset of the data in the buffer.</param>
186 188 /// <param name="length">The size of the data to read from the buffer.</param>
187 public virtual void EnqueueRange(T[] data, int offset, int length) {
189 public void EnqueueRange(T[] data, int offset, int length) {
188 190 if (data == null)
189 191 throw new ArgumentNullException("data");
190 if (length == 0)
191 return;
192 192 if (offset < 0)
193 193 throw new ArgumentOutOfRangeException("offset");
194 194 if (length < 1 || offset + length > data.Length)
195 195 throw new ArgumentOutOfRangeException("length");
196 196
197 var last = m_last;
197 while (length > 0) {
198 var last = m_last;
199 int enqueued;
198 200
199 bool extend;
200 int enqueued;
201
202 while (length > 0) {
203 extend = true;
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
201 if (last.TryEnqueueBatch(data, offset, length, out enqueued)) {
205 202 length -= enqueued;
206 203 offset += enqueued;
207 204 }
208 205
209 if (extend) {
210 // there was no enough space in the chunk
211 // or there was no chunks in the queue
206 if (length > 0) {
207 // we have something to enqueue
212 208
213 while (length > 0) {
214
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
209 var tail = length % MAX_CHUNK_SIZE;
216 210
217 var chunk = new Chunk(
218 Math.Max(size, DEFAULT_CHUNK_SIZE),
219 data,
220 offset,
221 size,
222 length // length >= size
223 );
211 var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail);
212
213 if (last != Interlocked.CompareExchange(ref m_last, chunk, last))
214 continue; // we wasn't able to catch the writer, roundtrip
224 215
225 if (!EnqueueChunk(last, chunk)) {
226 // looks like the queue has been updated then proceed from the beginning
227 last = m_last;
228 break;
229 }
216 // we are lucky
217 // we can exclusively write our batch, the other writers will continue their work
218
219 length -= tail;
230 220
231 // we have successfully added the new chunk
232 last = chunk;
233 length -= size;
234 offset += size;
235 }
236 } else {
237 // we don't need to extend the queue, if we successfully enqueued data
238 if (length == 0)
239 break;
240
241 // if we need to wait while someone is extending the queue
242 // spinwait
243 while (last == m_last) {
244 Thread.MemoryBarrier();
221
222 for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) {
223 var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
224 node.WriteData(data, offset, 0, MAX_CHUNK_SIZE);
225 offset += MAX_CHUNK_SIZE;
226 // fence last.next is volatile
227 last.next = node;
228 last = node;
245 229 }
246 230
247 last = m_last;
231 if (tail > 0)
232 chunk.WriteData(data, offset, 0, tail);
233
234 // fence last.next is volatile
235 last.next = chunk;
236 return;
248 237 }
249 238 }
250 239 }
251 240
252 241 /// <summary>
253 242 /// Tries to retrieve the first element from the queue.
254 243 /// </summary>
255 244 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
256 245 /// <param name="value">The value of the dequeued element.</param>
257 246 public bool TryDequeue(out T value) {
258 247 var chunk = m_first;
259 bool recycle;
260 while (chunk != null) {
248 do {
249 bool recycle;
261 250
262 251 var result = chunk.TryDequeue(out value, out recycle);
263 252
264 if (recycle) // this chunk is waste
265 RecycleFirstChunk(chunk);
266 else
253 if (recycle && chunk.next != null) {
254 // this chunk is waste
255 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
256 } else {
267 257 return result; // this chunk is usable and returned actual result
258 }
268 259
269 260 if (result) // this chunk is waste but the true result is always actual
270 261 return true;
271
272 // try again
273 chunk = m_first;
274 }
275
276 // the queue is empty
277 value = default(T);
278 return false;
262 } while (true);
279 263 }
280 264
281 265 /// <summary>
282 266 /// Tries to dequeue the specified amount of data from the queue.
283 267 /// </summary>
284 268 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
285 269 /// <param name="buffer">The buffer to which the data will be written.</param>
286 270 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
287 271 /// <param name="length">The maximum amount of data to be retrieved.</param>
288 272 /// <param name="dequeued">The actual amout of the retrieved data.</param>
289 273 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
290 274 if (buffer == null)
291 275 throw new ArgumentNullException("buffer");
292 276 if (offset < 0)
293 277 throw new ArgumentOutOfRangeException("offset");
294 278 if (length < 1 || offset + length > buffer.Length)
295 279 throw new ArgumentOutOfRangeException("length");
296 280
297 281 var chunk = m_first;
298 bool recycle;
299 282 dequeued = 0;
300 while (chunk != null) {
301
283 do {
284 bool recycle;
302 285 int actual;
303 286 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 287 offset += actual;
305 288 length -= actual;
306 289 dequeued += actual;
307 290 }
308 291
309 if (recycle) // this chunk is waste
310 RecycleFirstChunk(chunk);
311 else if (actual == 0)
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
292 if (recycle && chunk.next != null) {
293 // this chunk is waste
294 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
295 } else {
296 chunk = null;
297 }
313 298
314 299 if (length == 0)
315 300 return true;
316
317 // we still may dequeue something
318 // try again
319 chunk = m_first;
320 }
301 } while (chunk != null);
321 302
322 303 return dequeued != 0;
323 304 }
324 305
325 306 /// <summary>
326 307 /// Tries to dequeue all remaining data in the first chunk.
327 308 /// </summary>
328 309 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 310 /// <param name="buffer">The buffer to which the data will be written.</param>
330 311 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 312 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
332 313 /// <param name="dequeued">The actual amount of the dequeued data.</param>
333 314 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
334 315 if (buffer == null)
335 316 throw new ArgumentNullException("buffer");
336 317 if (offset < 0)
337 318 throw new ArgumentOutOfRangeException("offset");
338 319 if (length < 1 || offset + length > buffer.Length)
339 320 throw new ArgumentOutOfRangeException("length");
340 321
341 322 var chunk = m_first;
342 bool recycle;
343 dequeued = 0;
344
345 while (chunk != null) {
323 do {
324 bool recycle;
325 chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle);
346 326
347 int actual;
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
349 dequeued = actual;
327 if (recycle && chunk.next != null) {
328 // this chunk is waste
329 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
330 } else {
331 chunk = null;
350 332 }
351 333
352 if (recycle) // this chunk is waste
353 RecycleFirstChunk(chunk);
354
355 334 // if we have dequeued any data, then return
356 335 if (dequeued != 0)
357 336 return true;
358 337
359 // we still may dequeue something
360 // try again
361 chunk = m_first;
362 }
338 } while (chunk != null);
363 339
364 340 return false;
365 341 }
366
367 bool EnqueueChunk(Chunk last, Chunk chunk) {
368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
369 return false;
370
371 if (last != null)
372 last.next = chunk;
373 else {
374 m_first = chunk;
375 }
376 return true;
377 }
378
379 void RecycleFirstChunk(Chunk first) {
380 var next = first.next;
381
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 return;
384
385 if (next == null) {
386
387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388
389 // race
390 // someone already updated the tail, restore the pointer to the queue head
391 m_first = first;
392 }
393 // the tail is updated
394 }
395 }
342
396 343
397 344 public void Clear() {
398 345 // start the new queue
399 346 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
400
401 347 do {
402 Thread.MemoryBarrier();
403 348 var first = m_first;
404 var last = m_last;
405
406 if (last == null) // nothing to clear
407 return;
408
409 if (first == null || (first.next == null && first != last)) // inconcistency
349 if (first.next == null && first != m_last) {
410 350 continue;
411
412 // here we will create inconsistency which will force others to spin
413 // and prevent from fetching. chunk.next = null
414 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
415 continue;// inconsistent
416
417 m_last = chunk;
418
419 return;
420
421 } while(true);
422 }
423
424 public T[] Drain() {
425 // start the new queue
426 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
427
428 do {
429 Thread.MemoryBarrier();
430 var first = m_first;
431 var last = m_last;
432
433 if (last == null)
434 return new T[0];
435
436 if (first == null || (first.next == null && first != last))
437 continue;
351 }
438 352
439 353 // here we will create inconsistency which will force others to spin
440 354 // and prevent from fetching. chunk.next = null
441 355 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
442 356 continue;// inconsistent
443 357
444 last = Interlocked.Exchange(ref m_last, chunk);
358 m_last = chunk;
359 return;
360 } while (true);
361 }
362
363 public List<T> Drain() {
364 // start the new queue
365 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
366
367 do {
368 var first = m_first;
369 // first.next is volatile
370 if (first.next == null) {
371 if (first != m_last)
372 continue;
373 else if (first.Hi == first.Low)
374 return new List<T>();
375 }
376
377 // here we will create inconsistency which will force others to spin
378 // and prevent from fetching. chunk.next = null
379 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
380 continue;// inconsistent
381
382 var last = Interlocked.Exchange(ref m_last, chunk);
445 383
446 384 return ReadChunks(first, last);
447 385
448 } while(true);
386 } while (true);
449 387 }
450
451 static T[] ReadChunks(Chunk chunk, object last) {
388
389 static List<T> ReadChunks(Chunk chunk, object last) {
452 390 var result = new List<T>();
453 var buffer = new T[DEFAULT_CHUNK_SIZE];
391 var buffer = new T[MAX_CHUNK_SIZE];
454 392 int actual;
455 393 bool recycle;
394 SpinWait spin = new SpinWait();
456 395 while (chunk != null) {
457 396 // ensure all write operations on the chunk are complete
458 chunk.Commit();
397 chunk.Seal();
459 398
460 399 // we need to read the chunk using this way
461 400 // since some client still may completing the dequeue
462 401 // operation, such clients most likely won't get results
463 402 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
464 403 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
465 404
466 405 if (chunk == last) {
467 406 chunk = null;
468 407 } else {
469 408 while (chunk.next == null)
470 Thread.MemoryBarrier();
409 spin.SpinOnce();
471 410 chunk = chunk.next;
472 411 }
473 412 }
474 413
475 return result.ToArray();
414 return result;
476 415 }
477 416
478 417 struct ArraySegmentCollection : ICollection<T> {
479 418 readonly T[] m_data;
480 419 readonly int m_offset;
481 420 readonly int m_length;
482 421
483 422 public ArraySegmentCollection(T[] data, int offset, int length) {
484 423 m_data = data;
485 424 m_offset = offset;
486 425 m_length = length;
487 426 }
488 427
489 428 #region ICollection implementation
490 429
491 430 public void Add(T item) {
492 431 throw new NotSupportedException();
493 432 }
494 433
495 434 public void Clear() {
496 435 throw new NotSupportedException();
497 436 }
498 437
499 438 public bool Contains(T item) {
500 439 return false;
501 440 }
502 441
503 442 public void CopyTo(T[] array, int arrayIndex) {
504 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
443 Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
505 444 }
506 445
507 446 public bool Remove(T item) {
508 447 throw new NotSupportedException();
509 448 }
510 449
511 450 public int Count {
512 451 get {
513 452 return m_length;
514 453 }
515 454 }
516 455
517 456 public bool IsReadOnly {
518 457 get {
519 458 return true;
520 459 }
521 460 }
522 461
523 462 #endregion
524 463
525 464 #region IEnumerable implementation
526 465
527 466 public IEnumerator<T> GetEnumerator() {
528 467 for (int i = m_offset; i < m_length + m_offset; i++)
529 468 yield return m_data[i];
530 469 }
531 470
532 471 #endregion
533 472
534 473 #region IEnumerable implementation
535 474
536 475 IEnumerator IEnumerable.GetEnumerator() {
537 476 return GetEnumerator();
538 477 }
539 478
540 479 #endregion
541 480 }
542 481
543 482 #region IEnumerable implementation
544 483
545 484 class Enumerator : IEnumerator<T> {
546 485 Chunk m_current;
547 486 int m_pos = -1;
548 487
549 488 public Enumerator(Chunk fisrt) {
550 489 m_current = fisrt;
551 490 }
552 491
553 492 #region IEnumerator implementation
554 493
555 494 public bool MoveNext() {
556 495 if (m_current == null)
557 496 return false;
558 497
559 498 if (m_pos == -1)
560 499 m_pos = m_current.Low;
561 500 else
562 501 m_pos++;
563 502
564 503 if (m_pos == m_current.Hi) {
565 504
566 505 m_current = m_pos == m_current.Size ? m_current.next : null;
567 506
568 507 m_pos = 0;
569 508
570 509 if (m_current == null)
571 510 return false;
572 511 }
573 512
574 513 return true;
575 514 }
576 515
577 516 public void Reset() {
578 517 throw new NotSupportedException();
579 518 }
580 519
581 520 object IEnumerator.Current {
582 521 get {
583 522 return Current;
584 523 }
585 524 }
586 525
587 526 #endregion
588 527
589 528 #region IDisposable implementation
590 529
591 530 public void Dispose() {
592 531 }
593 532
594 533 #endregion
595 534
596 535 #region IEnumerator implementation
597 536
598 537 public T Current {
599 538 get {
600 539 if (m_pos == -1 || m_current == null)
601 540 throw new InvalidOperationException();
602 541 return m_current.GetAt(m_pos);
603 542 }
604 543 }
605 544
606 545 #endregion
607 546 }
608 547
609 548 public IEnumerator<T> GetEnumerator() {
610 549 return new Enumerator(m_first);
611 550 }
612 551
613 552 #endregion
614 553
615 554 #region IEnumerable implementation
616 555
617 556 IEnumerator IEnumerable.GetEnumerator() {
618 557 return GetEnumerator();
619 558 }
620 559
621 560 #endregion
622 561 }
623 562 }
@@ -1,101 +1,101
1 1 using System;
2 2 using System.Threading;
3 3
4 4 namespace Implab.Parallels {
5 5 public class BlockingQueue<T> : AsyncQueue<T> {
6 6 readonly object m_lock = new object();
7 7
8 public override void Enqueue(T value) {
8 public void EnqueuePulse(T value) {
9 9 base.Enqueue(value);
10 10 lock (m_lock)
11 11 Monitor.Pulse(m_lock);
12 12 }
13 13
14 public override void EnqueueRange(T[] data, int offset, int length) {
14 public void EnqueueRangePulse(T[] data, int offset, int length) {
15 15 base.EnqueueRange(data, offset, length);
16 16 if (length > 1)
17 17 lock (m_lock)
18 18 Monitor.PulseAll(m_lock);
19 19 else
20 20 lock (m_lock)
21 21 Monitor.Pulse(m_lock);
22 22 }
23 23
24 24 public T GetItem(int timeout) {
25 25 T item;
26 26
27 27 if (!TryDequeue(out item)) {
28 28 var t1 = Environment.TickCount;
29 29 var dt = timeout;
30 30
31 31 lock (m_lock) {
32 32 while (!TryDequeue(out item)) {
33 33 if (!Monitor.Wait(m_lock, dt))
34 34 throw new TimeoutException();
35 35 if (timeout >= 0) {
36 36 dt = timeout - Environment.TickCount + t1;
37 37 if (dt < 0)
38 38 throw new TimeoutException();
39 39 }
40 40 }
41 41 }
42 42 }
43 43 return item;
44 44 }
45 45
46 46 public T GetItem() {
47 47 T item;
48 48 if (!TryDequeue(out item))
49 49 lock (m_lock) {
50 50 while (!TryDequeue(out item))
51 51 Monitor.Wait(m_lock);
52 52 }
53 53 return item;
54 54 }
55 55
56 56 public T[] GetRange(int max, int timeout) {
57 57 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
58 58
59 59 var buffer = new T[max];
60 60 int actual;
61 61 if (!TryDequeueRange(buffer, 0, max, out actual)) {
62 62 var t1 = Environment.TickCount;
63 63 var dt = timeout;
64 64
65 65 lock (m_lock) {
66 66 while (!TryDequeueRange(buffer, 0, max, out actual)) {
67 67
68 68 if (!Monitor.Wait(m_lock, dt))
69 69 throw new TimeoutException();
70 70
71 71 if (timeout >= 0) {
72 72 dt = timeout - Environment.TickCount + t1;
73 73 if (dt < 0)
74 74 throw new TimeoutException();
75 75 }
76 76 }
77 77 }
78 78 }
79 79
80 80 var data = new T[actual];
81 81 Array.Copy(buffer, data, actual);
82 82 return data;
83 83 }
84 84
85 85 public T[] GetRange(int max) {
86 86 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
87 87
88 88 var buffer = new T[max];
89 89 int actual;
90 90 if (!TryDequeueRange(buffer, 0, max, out actual))
91 91 lock (m_lock)
92 92 while (!TryDequeueRange(buffer, 0, max, out actual))
93 93 Monitor.Wait(m_lock);
94 94
95 95 var data = new T[actual];
96 96 Array.Copy(buffer, data, actual);
97 97 return data;
98 98 }
99 99 }
100 100 }
101 101
1 NO CONTENT: file was removed
General Comments 3
Under Review
author

Auto status change to "Under Review"

Approved
author

ok, latest stable version should be in default

You need to be logged in to leave comments. Login now