##// END OF EJS Templates
fixed AsyncQueue iterator
cin -
r127:d86da8d2d4c3 v2
parent child
Show More
@@ -1,232 +1,230
1 1 <?xml version="1.0" encoding="utf-8"?>
2 2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <PropertyGroup>
4 4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 6 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
7 7 <OutputType>Library</OutputType>
8 8 <RootNamespace>Implab</RootNamespace>
9 9 <AssemblyName>Implab</AssemblyName>
10 <ProductVersion>8.0.30703</ProductVersion>
11 <SchemaVersion>2.0</SchemaVersion>
12 10 </PropertyGroup>
13 11 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
14 12 <DebugSymbols>true</DebugSymbols>
15 13 <DebugType>full</DebugType>
16 14 <Optimize>false</Optimize>
17 15 <OutputPath>bin\Debug</OutputPath>
18 16 <DefineConstants>TRACE;DEBUG;</DefineConstants>
19 17 <ErrorReport>prompt</ErrorReport>
20 18 <WarningLevel>4</WarningLevel>
21 19 <ConsolePause>false</ConsolePause>
22 20 <RunCodeAnalysis>true</RunCodeAnalysis>
23 21 </PropertyGroup>
24 22 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
25 23 <DebugType>full</DebugType>
26 24 <Optimize>true</Optimize>
27 25 <OutputPath>bin\Release</OutputPath>
28 26 <ErrorReport>prompt</ErrorReport>
29 27 <WarningLevel>4</WarningLevel>
30 28 <ConsolePause>false</ConsolePause>
31 29 </PropertyGroup>
32 30 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug 4.5|AnyCPU' ">
33 31 <DebugSymbols>true</DebugSymbols>
34 32 <DebugType>full</DebugType>
35 33 <Optimize>false</Optimize>
36 34 <OutputPath>bin\Debug</OutputPath>
37 35 <DefineConstants>TRACE;DEBUG;NET_4_5</DefineConstants>
38 36 <ErrorReport>prompt</ErrorReport>
39 37 <WarningLevel>4</WarningLevel>
40 38 <RunCodeAnalysis>true</RunCodeAnalysis>
41 39 <ConsolePause>false</ConsolePause>
42 40 </PropertyGroup>
43 41 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release 4.5|AnyCPU' ">
44 42 <Optimize>true</Optimize>
45 43 <OutputPath>bin\Release</OutputPath>
46 44 <ErrorReport>prompt</ErrorReport>
47 45 <WarningLevel>4</WarningLevel>
48 46 <ConsolePause>false</ConsolePause>
49 47 <DefineConstants>NET_4_5</DefineConstants>
50 48 </PropertyGroup>
51 49 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'DebugMono|AnyCPU' ">
52 50 <DebugSymbols>true</DebugSymbols>
53 51 <DebugType>full</DebugType>
54 52 <Optimize>false</Optimize>
55 53 <OutputPath>bin\Debug</OutputPath>
56 54 <DefineConstants>TRACE;DEBUG;NET_4_5;MONO</DefineConstants>
57 55 <ErrorReport>prompt</ErrorReport>
58 56 <WarningLevel>4</WarningLevel>
59 57 <RunCodeAnalysis>true</RunCodeAnalysis>
60 58 <ConsolePause>false</ConsolePause>
61 59 </PropertyGroup>
62 60 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'ReleaseMono|AnyCPU' ">
63 61 <Optimize>true</Optimize>
64 62 <OutputPath>bin\Release</OutputPath>
65 63 <DefineConstants>NET_4_5;MONO;</DefineConstants>
66 64 <ErrorReport>prompt</ErrorReport>
67 65 <WarningLevel>4</WarningLevel>
68 66 <ConsolePause>false</ConsolePause>
69 67 </PropertyGroup>
70 68 <ItemGroup>
71 69 <Reference Include="System" />
72 70 <Reference Include="System.Xml" />
73 71 </ItemGroup>
74 72 <ItemGroup>
75 73 <Compile Include="Component.cs" />
76 74 <Compile Include="CustomEqualityComparer.cs" />
77 75 <Compile Include="Diagnostics\ConsoleTraceListener.cs" />
78 76 <Compile Include="Diagnostics\EventText.cs" />
79 77 <Compile Include="Diagnostics\IEventTextFormatter.cs" />
80 78 <Compile Include="Diagnostics\LogChannel.cs" />
81 79 <Compile Include="Diagnostics\LogicalOperation.cs" />
82 80 <Compile Include="Diagnostics\TextFileListener.cs" />
83 81 <Compile Include="Diagnostics\TextListenerBase.cs" />
84 82 <Compile Include="Diagnostics\TraceLog.cs" />
85 83 <Compile Include="Diagnostics\TraceEvent.cs" />
86 84 <Compile Include="Diagnostics\TraceEventType.cs" />
87 85 <Compile Include="Disposable.cs" />
88 86 <Compile Include="ICancellable.cs" />
89 87 <Compile Include="IProgressHandler.cs" />
90 88 <Compile Include="IProgressNotifier.cs" />
91 89 <Compile Include="IPromiseT.cs" />
92 90 <Compile Include="IPromise.cs" />
93 91 <Compile Include="IServiceLocator.cs" />
94 92 <Compile Include="ITaskController.cs" />
95 93 <Compile Include="JSON\JSONElementContext.cs" />
96 94 <Compile Include="JSON\JSONElementType.cs" />
97 95 <Compile Include="JSON\JSONGrammar.cs" />
98 96 <Compile Include="JSON\JSONParser.cs" />
99 97 <Compile Include="JSON\JSONScanner.cs" />
100 98 <Compile Include="JSON\JsonTokenType.cs" />
101 99 <Compile Include="JSON\JSONWriter.cs" />
102 100 <Compile Include="JSON\JSONXmlReader.cs" />
103 101 <Compile Include="JSON\JSONXmlReaderOptions.cs" />
104 102 <Compile Include="JSON\StringTranslator.cs" />
105 103 <Compile Include="Parallels\DispatchPool.cs" />
106 104 <Compile Include="Parallels\ArrayTraits.cs" />
107 105 <Compile Include="Parallels\MTQueue.cs" />
108 106 <Compile Include="Parallels\WorkerPool.cs" />
109 107 <Compile Include="Parsing\Alphabet.cs" />
110 108 <Compile Include="Parsing\AlphabetBase.cs" />
111 109 <Compile Include="Parsing\AltToken.cs" />
112 110 <Compile Include="Parsing\BinaryToken.cs" />
113 111 <Compile Include="Parsing\CatToken.cs" />
114 112 <Compile Include="Parsing\CDFADefinition.cs" />
115 113 <Compile Include="Parsing\DFABuilder.cs" />
116 114 <Compile Include="Parsing\DFADefinitionBase.cs" />
117 115 <Compile Include="Parsing\DFAStateDescriptor.cs" />
118 116 <Compile Include="Parsing\DFAutomaton.cs" />
119 117 <Compile Include="Parsing\EDFADefinition.cs" />
120 118 <Compile Include="Parsing\EmptyToken.cs" />
121 119 <Compile Include="Parsing\EndToken.cs" />
122 120 <Compile Include="Parsing\EnumAlphabet.cs" />
123 121 <Compile Include="Parsing\Grammar.cs" />
124 122 <Compile Include="Parsing\IAlphabet.cs" />
125 123 <Compile Include="Parsing\IDFADefinition.cs" />
126 124 <Compile Include="Parsing\IVisitor.cs" />
127 125 <Compile Include="Parsing\ParserException.cs" />
128 126 <Compile Include="Parsing\Scanner.cs" />
129 127 <Compile Include="Parsing\StarToken.cs" />
130 128 <Compile Include="Parsing\SymbolToken.cs" />
131 129 <Compile Include="Parsing\Token.cs" />
132 130 <Compile Include="ServiceLocator.cs" />
133 131 <Compile Include="TaskController.cs" />
134 132 <Compile Include="ProgressInitEventArgs.cs" />
135 133 <Compile Include="Properties\AssemblyInfo.cs" />
136 134 <Compile Include="Parallels\AsyncPool.cs" />
137 135 <Compile Include="Safe.cs" />
138 136 <Compile Include="ValueEventArgs.cs" />
139 137 <Compile Include="PromiseExtensions.cs" />
140 138 <Compile Include="SyncContextPromise.cs" />
141 139 <Compile Include="Diagnostics\OperationContext.cs" />
142 140 <Compile Include="Diagnostics\TraceContext.cs" />
143 141 <Compile Include="Diagnostics\LogEventArgs.cs" />
144 142 <Compile Include="Diagnostics\LogEventArgsT.cs" />
145 143 <Compile Include="Diagnostics\Extensions.cs" />
146 144 <Compile Include="IComponentContainer.cs" />
147 145 <Compile Include="PromiseEventType.cs" />
148 146 <Compile Include="Parallels\MTCustomQueue.cs" />
149 147 <Compile Include="Parallels\MTCustomQueueNode.cs" />
150 148 <Compile Include="ComponentContainer.cs" />
151 149 <Compile Include="DisposablePool.cs" />
152 150 <Compile Include="ObjectPool.cs" />
153 151 <Compile Include="Parallels\AsyncQueue.cs" />
154 152 <Compile Include="PromiseT.cs" />
155 153 <Compile Include="IDeferred.cs" />
156 154 <Compile Include="IDeferredT.cs" />
157 155 <Compile Include="AbstractPromise.cs" />
158 156 <Compile Include="Promise.cs" />
159 157 <Compile Include="PromiseTransientException.cs" />
160 158 </ItemGroup>
161 159 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
162 160 <ItemGroup />
163 161 <ProjectExtensions>
164 162 <MonoDevelop>
165 163 <Properties>
166 164 <Policies>
167 165 <CSharpFormattingPolicy IndentSwitchBody="True" NamespaceBraceStyle="EndOfLine" ClassBraceStyle="EndOfLine" InterfaceBraceStyle="EndOfLine" StructBraceStyle="EndOfLine" EnumBraceStyle="EndOfLine" MethodBraceStyle="EndOfLine" ConstructorBraceStyle="EndOfLine" DestructorBraceStyle="EndOfLine" BeforeMethodDeclarationParentheses="False" BeforeMethodCallParentheses="False" BeforeConstructorDeclarationParentheses="False" NewLineBeforeConstructorInitializerColon="NewLine" NewLineAfterConstructorInitializerColon="SameLine" BeforeIndexerDeclarationBracket="False" BeforeDelegateDeclarationParentheses="False" NewParentheses="False" SpacesBeforeBrackets="False" inheritsSet="Mono" inheritsScope="text/x-csharp" scope="text/x-csharp" />
168 166 <TextStylePolicy FileWidth="120" EolMarker="Unix" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/x-csharp" />
169 167 <DotNetNamingPolicy DirectoryNamespaceAssociation="PrefixedHierarchical" ResourceNamePolicy="MSBuild" />
170 168 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="application/xml" />
171 169 <XmlFormattingPolicy inheritsSet="Mono" inheritsScope="application/xml" scope="application/xml" />
172 170 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/plain" />
173 171 <NameConventionPolicy>
174 172 <Rules>
175 173 <NamingRule Name="Namespaces" AffectedEntity="Namespace" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
176 174 <NamingRule Name="Types" AffectedEntity="Class, Struct, Enum, Delegate" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
177 175 <NamingRule Name="Interfaces" AffectedEntity="Interface" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
178 176 <RequiredPrefixes>
179 177 <String>I</String>
180 178 </RequiredPrefixes>
181 179 </NamingRule>
182 180 <NamingRule Name="Attributes" AffectedEntity="CustomAttributes" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
183 181 <RequiredSuffixes>
184 182 <String>Attribute</String>
185 183 </RequiredSuffixes>
186 184 </NamingRule>
187 185 <NamingRule Name="Event Arguments" AffectedEntity="CustomEventArgs" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
188 186 <RequiredSuffixes>
189 187 <String>EventArgs</String>
190 188 </RequiredSuffixes>
191 189 </NamingRule>
192 190 <NamingRule Name="Exceptions" AffectedEntity="CustomExceptions" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
193 191 <RequiredSuffixes>
194 192 <String>Exception</String>
195 193 </RequiredSuffixes>
196 194 </NamingRule>
197 195 <NamingRule Name="Methods" AffectedEntity="Methods" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
198 196 <NamingRule Name="Static Readonly Fields" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Protected, Public" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True" />
199 197 <NamingRule Name="Fields (Non Private)" AffectedEntity="Field" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
200 198 <NamingRule Name="ReadOnly Fields (Non Private)" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False" />
201 199 <NamingRule Name="Fields (Private)" AffectedEntity="Field, ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
202 200 <RequiredPrefixes>
203 201 <String>m_</String>
204 202 </RequiredPrefixes>
205 203 </NamingRule>
206 204 <NamingRule Name="Static Fields (Private)" AffectedEntity="Field" VisibilityMask="Private" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True">
207 205 <RequiredPrefixes>
208 206 <String>_</String>
209 207 </RequiredPrefixes>
210 208 </NamingRule>
211 209 <NamingRule Name="ReadOnly Fields (Private)" AffectedEntity="ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
212 210 <RequiredPrefixes>
213 211 <String>m_</String>
214 212 </RequiredPrefixes>
215 213 </NamingRule>
216 214 <NamingRule Name="Constant Fields" AffectedEntity="ConstantField" VisibilityMask="VisibilityMask" NamingStyle="AllUpper" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
217 215 <NamingRule Name="Properties" AffectedEntity="Property" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
218 216 <NamingRule Name="Events" AffectedEntity="Event" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
219 217 <NamingRule Name="Enum Members" AffectedEntity="EnumMember" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
220 218 <NamingRule Name="Parameters" AffectedEntity="Parameter, LocalVariable" VisibilityMask="VisibilityMask" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
221 219 <NamingRule Name="Type Parameters" AffectedEntity="TypeParameter" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
222 220 <RequiredPrefixes>
223 221 <String>T</String>
224 222 </RequiredPrefixes>
225 223 </NamingRule>
226 224 </Rules>
227 225 </NameConventionPolicy>
228 226 </Policies>
229 227 </Properties>
230 228 </MonoDevelop>
231 229 </ProjectExtensions>
232 230 </Project> No newline at end of file
@@ -1,619 +1,629
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 5 using System.Diagnostics;
6 6
7 7 namespace Implab.Parallels {
8 8 public class AsyncQueue<T> : IEnumerable<T> {
9 9 class Chunk {
10 10 public Chunk next;
11 11
12 12 int m_low;
13 13 int m_hi;
14 14 int m_alloc;
15 15 readonly int m_size;
16 16 readonly T[] m_data;
17 17
18 18 public Chunk(int size) {
19 19 m_size = size;
20 20 m_data = new T[size];
21 21 }
22 22
23 23 public Chunk(int size, T value) {
24 24 m_size = size;
25 25 m_hi = 1;
26 26 m_alloc = 1;
27 27 m_data = new T[size];
28 28 m_data[0] = value;
29 29 }
30 30
31 31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
32 32 m_size = size;
33 33 m_hi = length;
34 34 m_alloc = alloc;
35 35 m_data = new T[size];
36 36 Array.Copy(data, offset, m_data, 0, length);
37 37 }
38 38
39 39 public int Low {
40 40 get { return m_low; }
41 41 }
42 42
43 43 public int Hi {
44 44 get { return m_hi; }
45 45 }
46 46
47 public int Size {
48 get { return m_size; }
49 }
50
47 51 public bool TryEnqueue(T value, out bool extend) {
48 52 var alloc = Interlocked.Increment(ref m_alloc) - 1;
49 53
50 54 if (alloc >= m_size) {
51 55 extend = alloc == m_size;
52 56 return false;
53 57 }
54 58
55 59 extend = false;
56 60 m_data[alloc] = value;
57 61
58 62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
59 63 // spin wait for commit
60 64 }
61 65 return true;
62 66 }
63 67
64 68 /// <summary>
65 69 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
66 70 /// </summary>
67 71 public void Commit() {
68 72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
69 73
70 74 while (m_hi != actual)
71 75 Thread.MemoryBarrier();
72 76 }
73 77
74 78 public bool TryDequeue(out T value, out bool recycle) {
75 79 int low;
76 80 do {
77 81 low = m_low;
78 82 if (low >= m_hi) {
79 83 value = default(T);
80 84 recycle = (low == m_size);
81 85 return false;
82 86 }
83 87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
84 88
85 89 recycle = (low == m_size - 1);
86 90 value = m_data[low];
87 91
88 92 return true;
89 93 }
90 94
91 95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
92 96 //int alloc;
93 97 //int allocSize;
94 98
95 99 var alloc = Interlocked.Add(ref m_alloc, length) - length;
96 100 if (alloc > m_size) {
97 101 // the chunk is full and someone already
98 102 // creating the new one
99 103 enqueued = 0; // nothing was added
100 104 extend = false; // the caller shouldn't try to extend the queue
101 105 return false; // nothing was added
102 106 }
103 107
104 108 enqueued = Math.Min(m_size - alloc, length);
105 109 extend = length > enqueued;
106 110
107 111 if (enqueued == 0)
108 112 return false;
109 113
110 114
111 115 Array.Copy(batch, offset, m_data, alloc, enqueued);
112 116
113 117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
114 118 // spin wait for commit
115 119 }
116 120
117 121 return true;
118 122 }
119 123
120 124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
121 125 int low, hi, batchSize;
122 126
123 127 do {
124 128 low = m_low;
125 129 hi = m_hi;
126 130 if (low >= hi) {
127 131 dequeued = 0;
128 132 recycle = (low == m_size); // recycling could be restarted and we need to signal again
129 133 return false;
130 134 }
131 135 batchSize = Math.Min(hi - low, length);
132 136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
133 137
134 138 recycle = (low == m_size - batchSize);
135 139 dequeued = batchSize;
136 140
137 141 Array.Copy(m_data, low, buffer, offset, batchSize);
138 142
139 143 return true;
140 144 }
141 145
142 146 public T GetAt(int pos) {
143 147 return m_data[pos];
144 148 }
145 149 }
146 150
147 151 public const int DEFAULT_CHUNK_SIZE = 32;
148 152 public const int MAX_CHUNK_SIZE = 262144;
149 153
150 154 Chunk m_first;
151 155 Chunk m_last;
152 156
153 157 /// <summary>
154 158 /// Adds the specified value to the queue.
155 159 /// </summary>
156 160 /// <param name="value">Tha value which will be added to the queue.</param>
157 161 public void Enqueue(T value) {
158 162 var last = m_last;
159 163 // spin wait to the new chunk
160 164 bool extend = true;
161 165 while (last == null || !last.TryEnqueue(value, out extend)) {
162 166 // try to extend queue
163 167 if (extend || last == null) {
164 168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
165 169 if (EnqueueChunk(last, chunk))
166 170 break; // success! exit!
167 171 last = m_last;
168 172 } else {
169 173 while (last == m_last) {
170 174 Thread.MemoryBarrier();
171 175 }
172 176 last = m_last;
173 177 }
174 178 }
175 179 }
176 180
177 181 /// <summary>
178 182 /// Adds the specified data to the queue.
179 183 /// </summary>
180 184 /// <param name="data">The buffer which contains the data to be enqueued.</param>
181 185 /// <param name="offset">The offset of the data in the buffer.</param>
182 186 /// <param name="length">The size of the data to read from the buffer.</param>
183 187 public void EnqueueRange(T[] data, int offset, int length) {
184 188 if (data == null)
185 189 throw new ArgumentNullException("data");
186 190 if (offset < 0)
187 191 throw new ArgumentOutOfRangeException("offset");
188 192 if (length < 1 || offset + length > data.Length)
189 193 throw new ArgumentOutOfRangeException("length");
190 194
191 195 var last = m_last;
192 196
193 197 bool extend;
194 198 int enqueued;
195 199
196 200 while (length > 0) {
197 201 extend = true;
198 202 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
199 203 length -= enqueued;
200 204 offset += enqueued;
201 205 }
202 206
203 207 if (extend) {
204 208 // there was no enough space in the chunk
205 209 // or there was no chunks in the queue
206 210
207 211 while (length > 0) {
208 212
209 213 var size = Math.Min(length, MAX_CHUNK_SIZE);
210 214
211 215 var chunk = new Chunk(
212 216 Math.Max(size, DEFAULT_CHUNK_SIZE),
213 217 data,
214 218 offset,
215 219 size,
216 220 length // length >= size
217 221 );
218 222
219 223 if (!EnqueueChunk(last, chunk)) {
220 224 // looks like the queue has been updated then proceed from the beginning
221 225 last = m_last;
222 226 break;
223 227 }
224 228
225 229 // we have successfully added the new chunk
226 230 last = chunk;
227 231 length -= size;
228 232 offset += size;
229 233 }
230 234 } else {
231 235 // we don't need to extend the queue, if we successfully enqueued data
232 236 if (length == 0)
233 237 break;
234 238
235 239 // if we need to wait while someone is extending the queue
236 240 // spinwait
237 241 while (last == m_last) {
238 242 Thread.MemoryBarrier();
239 243 }
240 244
241 245 last = m_last;
242 246 }
243 247 }
244 248 }
245 249
246 250 /// <summary>
247 251 /// Tries to retrieve the first element from the queue.
248 252 /// </summary>
249 253 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
250 254 /// <param name="value">The value of the dequeued element.</param>
251 255 public bool TryDequeue(out T value) {
252 256 var chunk = m_first;
253 257 bool recycle;
254 258 while (chunk != null) {
255 259
256 260 var result = chunk.TryDequeue(out value, out recycle);
257 261
258 262 if (recycle) // this chunk is waste
259 263 RecycleFirstChunk(chunk);
260 264 else
261 265 return result; // this chunk is usable and returned actual result
262 266
263 267 if (result) // this chunk is waste but the true result is always actual
264 268 return true;
265 269
266 270 // try again
267 271 chunk = m_first;
268 272 }
269 273
270 274 // the queue is empty
271 275 value = default(T);
272 276 return false;
273 277 }
274 278
275 279 /// <summary>
276 280 /// Tries to dequeue the specified amount of data from the queue.
277 281 /// </summary>
278 282 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
279 283 /// <param name="buffer">The buffer to which the data will be written.</param>
280 284 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
281 285 /// <param name="length">The maximum amount of data to be retrieved.</param>
282 286 /// <param name="dequeued">The actual amout of the retrieved data.</param>
283 287 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
284 288 if (buffer == null)
285 289 throw new ArgumentNullException("buffer");
286 290 if (offset < 0)
287 291 throw new ArgumentOutOfRangeException("offset");
288 292 if (length < 1 || offset + length > buffer.Length)
289 293 throw new ArgumentOutOfRangeException("length");
290 294
291 295 var chunk = m_first;
292 296 bool recycle;
293 297 dequeued = 0;
294 298 while (chunk != null) {
295 299
296 300 int actual;
297 301 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
298 302 offset += actual;
299 303 length -= actual;
300 304 dequeued += actual;
301 305 }
302 306
303 307 if (recycle) // this chunk is waste
304 308 RecycleFirstChunk(chunk);
305 309 else if (actual == 0)
306 310 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
307 311
308 312 if (length == 0)
309 313 return true;
310 314
311 315 // we still may dequeue something
312 316 // try again
313 317 chunk = m_first;
314 318 }
315 319
316 320 return dequeued != 0;
317 321 }
318 322
319 323 /// <summary>
320 324 /// Tries to dequeue all remaining data in the first chunk.
321 325 /// </summary>
322 326 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
323 327 /// <param name="buffer">The buffer to which the data will be written.</param>
324 328 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
325 329 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
326 330 /// <param name="dequeued">The actual amount of the dequeued data.</param>
327 331 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
328 332 if (buffer == null)
329 333 throw new ArgumentNullException("buffer");
330 334 if (offset < 0)
331 335 throw new ArgumentOutOfRangeException("offset");
332 336 if (length < 1 || offset + length > buffer.Length)
333 337 throw new ArgumentOutOfRangeException("length");
334 338
335 339 var chunk = m_first;
336 340 bool recycle;
337 341 dequeued = 0;
338 342
339 343 while (chunk != null) {
340 344
341 345 int actual;
342 346 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
343 347 dequeued = actual;
344 348 }
345 349
346 350 if (recycle) // this chunk is waste
347 351 RecycleFirstChunk(chunk);
348 352
349 353 // if we have dequeued any data, then return
350 354 if (dequeued != 0)
351 355 return true;
352 356
353 357 // we still may dequeue something
354 358 // try again
355 359 chunk = m_first;
356 360 }
357 361
358 362 return false;
359 363 }
360 364
361 365 bool EnqueueChunk(Chunk last, Chunk chunk) {
362 366 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
363 367 return false;
364 368
365 369 if (last != null)
366 370 last.next = chunk;
367 371 else {
368 372 m_first = chunk;
369 373 }
370 374 return true;
371 375 }
372 376
373 377 void RecycleFirstChunk(Chunk first) {
374 378 var next = first.next;
375 379
376 380 if (first != Interlocked.CompareExchange(ref m_first, next, first))
377 381 return;
378 382
379 383 if (next == null) {
380 384
381 385 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
382 386 /*while (first.next == null)
383 387 Thread.MemoryBarrier();*/
384 388
385 389 // race
386 390 // someone already updated the tail, restore the pointer to the queue head
387 391 m_first = first;
388 392 }
389 393 // the tail is updated
390 394 }
391 395
392 396 // we need to update the head
393 397 //Interlocked.CompareExchange(ref m_first, next, first);
394 398 // if the head is already updated then give up
395 399 //return;
396 400
397 401 }
398 402
399 403 public void Clear() {
400 404 // start the new queue
401 405 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
402 406
403 407 do {
404 408 Thread.MemoryBarrier();
405 409 var first = m_first;
406 410 var last = m_last;
407 411
408 412 if (last == null) // nothing to clear
409 413 return;
410 414
411 415 if (first == null || (first.next == null && first != last)) // inconcistency
412 416 continue;
413 417
414 418 // here we will create inconsistency which will force others to spin
415 419 // and prevent from fetching. chunk.next = null
416 420 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
417 421 continue;// inconsistent
418 422
419 423 m_last = chunk;
420 424
421 425 return;
422 426
423 427 } while(true);
424 428 }
425 429
426 430 public T[] Drain() {
427 431 // start the new queue
428 432 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
429 433
430 434 do {
431 435 Thread.MemoryBarrier();
432 436 var first = m_first;
433 437 var last = m_last;
434 438
435 439 if (last == null)
436 440 return new T[0];
437 441
438 442 if (first == null || (first.next == null && first != last))
439 443 continue;
440 444
441 445 // here we will create inconsistency which will force others to spin
442 446 // and prevent from fetching. chunk.next = null
443 447 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
444 448 continue;// inconsistent
445 449
446 450 last = Interlocked.Exchange(ref m_last, chunk);
447 451
448 452 return ReadChunks(first, last);
449 453
450 454 } while(true);
451 455 }
452 456
453 457 T[] ReadChunks(Chunk chunk, object last) {
454 458 var result = new List<T>();
455 459 var buffer = new T[DEFAULT_CHUNK_SIZE];
456 460 int actual;
457 461 bool recycle;
458 462 while (chunk != null) {
459 463 // ensure all write operations on the chunk are complete
460 464 chunk.Commit();
461 465
462 466 // we need to read the chunk using this way
463 467 // since some client still may completing the dequeue
464 468 // operation, such clients most likely won't get results
465 469 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
466 470 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
467 471
468 472 if (chunk == last) {
469 473 chunk = null;
470 474 } else {
471 475 while (chunk.next == null)
472 476 Thread.MemoryBarrier();
473 477 chunk = chunk.next;
474 478 }
475 479 }
476 480
477 481 return result.ToArray();
478 482 }
479 483
480 484 struct ArraySegmentCollection : ICollection<T> {
481 485 readonly T[] m_data;
482 486 readonly int m_offset;
483 487 readonly int m_length;
484 488
485 489 public ArraySegmentCollection(T[] data, int offset, int length) {
486 490 m_data = data;
487 491 m_offset = offset;
488 492 m_length = length;
489 493 }
490 494
491 495 #region ICollection implementation
492 496
493 497 public void Add(T item) {
494 498 throw new InvalidOperationException();
495 499 }
496 500
497 501 public void Clear() {
498 502 throw new InvalidOperationException();
499 503 }
500 504
501 505 public bool Contains(T item) {
502 506 return false;
503 507 }
504 508
505 509 public void CopyTo(T[] array, int arrayIndex) {
506 510 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
507 511 }
508 512
509 513 public bool Remove(T item) {
510 514 throw new NotImplementedException();
511 515 }
512 516
513 517 public int Count {
514 518 get {
515 519 return m_length;
516 520 }
517 521 }
518 522
519 523 public bool IsReadOnly {
520 524 get {
521 525 return true;
522 526 }
523 527 }
524 528
525 529 #endregion
526 530
527 531 #region IEnumerable implementation
528 532
529 533 public IEnumerator<T> GetEnumerator() {
530 534 for (int i = m_offset; i < m_length + m_offset; i++)
531 535 yield return m_data[i];
532 536 }
533 537
534 538 #endregion
535 539
536 540 #region IEnumerable implementation
537 541
538 542 IEnumerator IEnumerable.GetEnumerator() {
539 543 return GetEnumerator();
540 544 }
541 545
542 546 #endregion
543 547 }
544 548
545 549 #region IEnumerable implementation
546 550
547 551 class Enumerator : IEnumerator<T> {
548 552 Chunk m_current;
549 553 int m_pos = -1;
550 554
551 555 public Enumerator(Chunk fisrt) {
552 556 m_current = fisrt;
553 557 }
554 558
555 559 #region IEnumerator implementation
556 560
557 561 public bool MoveNext() {
558 562 if (m_current == null)
559 563 return false;
560 564
561 565 if (m_pos == -1)
562 566 m_pos = m_current.Low;
563 567 else
564 568 m_pos++;
569
565 570 if (m_pos == m_current.Hi) {
571
572 m_current = m_pos == m_current.Size ? m_current.next : null;
573
566 574 m_pos = 0;
567 m_current = m_current.next;
575
576 if (m_current == null)
577 return false;
568 578 }
569 579
570 580 return true;
571 581 }
572 582
573 583 public void Reset() {
574 584 throw new NotSupportedException();
575 585 }
576 586
577 587 object IEnumerator.Current {
578 588 get {
579 589 return Current;
580 590 }
581 591 }
582 592
583 593 #endregion
584 594
585 595 #region IDisposable implementation
586 596
587 597 public void Dispose() {
588 598 }
589 599
590 600 #endregion
591 601
592 602 #region IEnumerator implementation
593 603
594 604 public T Current {
595 605 get {
596 606 if (m_pos == -1 || m_current == null)
597 607 throw new InvalidOperationException();
598 608 return m_current.GetAt(m_pos);
599 609 }
600 610 }
601 611
602 612 #endregion
603 613 }
604 614
605 615 public IEnumerator<T> GetEnumerator() {
606 616 return new Enumerator(m_first);
607 617 }
608 618
609 619 #endregion
610 620
611 621 #region IEnumerable implementation
612 622
613 623 IEnumerator IEnumerable.GetEnumerator() {
614 624 return GetEnumerator();
615 625 }
616 626
617 627 #endregion
618 628 }
619 629 }
General Comments 0
You need to be logged in to leave comments. Login now