##// END OF EJS Templates
Added Signal class a lightweight alternative to ManualResetEvent
cin -
r128:6241bff0cd64 v2
parent child
Show More
@@ -0,0 +1,31
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
5 /// <summary>
6 /// Implements simple signalling logic using <see cref="Monitor.PulseAll(object)"/>.
7 /// </summary>
8 public class Signal {
9 readonly object m_lock = new object();
10 bool m_state;
11
12 public void Set() {
13 lock(m_lock) {
14 m_state = true;
15 Monitor.PulseAll(m_lock);
16 }
17 }
18
19 public void Wait() {
20 lock (m_lock)
21 if (!m_state)
22 Monitor.Wait(m_lock);
23 }
24
25 public bool Wait(int timeout) {
26 lock (m_lock)
27 return m_state || Monitor.Wait(m_lock, timeout);
28 }
29 }
30 }
31
@@ -1,230 +1,231
1 1 ο»Ώ<?xml version="1.0" encoding="utf-8"?>
2 2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <PropertyGroup>
4 4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 6 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
7 7 <OutputType>Library</OutputType>
8 8 <RootNamespace>Implab</RootNamespace>
9 9 <AssemblyName>Implab</AssemblyName>
10 10 </PropertyGroup>
11 11 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
12 12 <DebugSymbols>true</DebugSymbols>
13 13 <DebugType>full</DebugType>
14 14 <Optimize>false</Optimize>
15 15 <OutputPath>bin\Debug</OutputPath>
16 16 <DefineConstants>TRACE;DEBUG;</DefineConstants>
17 17 <ErrorReport>prompt</ErrorReport>
18 18 <WarningLevel>4</WarningLevel>
19 19 <ConsolePause>false</ConsolePause>
20 20 <RunCodeAnalysis>true</RunCodeAnalysis>
21 21 </PropertyGroup>
22 22 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
23 23 <DebugType>full</DebugType>
24 24 <Optimize>true</Optimize>
25 25 <OutputPath>bin\Release</OutputPath>
26 26 <ErrorReport>prompt</ErrorReport>
27 27 <WarningLevel>4</WarningLevel>
28 28 <ConsolePause>false</ConsolePause>
29 29 </PropertyGroup>
30 30 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug 4.5|AnyCPU' ">
31 31 <DebugSymbols>true</DebugSymbols>
32 32 <DebugType>full</DebugType>
33 33 <Optimize>false</Optimize>
34 34 <OutputPath>bin\Debug</OutputPath>
35 35 <DefineConstants>TRACE;DEBUG;NET_4_5</DefineConstants>
36 36 <ErrorReport>prompt</ErrorReport>
37 37 <WarningLevel>4</WarningLevel>
38 38 <RunCodeAnalysis>true</RunCodeAnalysis>
39 39 <ConsolePause>false</ConsolePause>
40 40 </PropertyGroup>
41 41 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release 4.5|AnyCPU' ">
42 42 <Optimize>true</Optimize>
43 43 <OutputPath>bin\Release</OutputPath>
44 44 <ErrorReport>prompt</ErrorReport>
45 45 <WarningLevel>4</WarningLevel>
46 46 <ConsolePause>false</ConsolePause>
47 47 <DefineConstants>NET_4_5</DefineConstants>
48 48 </PropertyGroup>
49 49 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'DebugMono|AnyCPU' ">
50 50 <DebugSymbols>true</DebugSymbols>
51 51 <DebugType>full</DebugType>
52 52 <Optimize>false</Optimize>
53 53 <OutputPath>bin\Debug</OutputPath>
54 54 <DefineConstants>TRACE;DEBUG;NET_4_5;MONO</DefineConstants>
55 55 <ErrorReport>prompt</ErrorReport>
56 56 <WarningLevel>4</WarningLevel>
57 57 <RunCodeAnalysis>true</RunCodeAnalysis>
58 58 <ConsolePause>false</ConsolePause>
59 59 </PropertyGroup>
60 60 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'ReleaseMono|AnyCPU' ">
61 61 <Optimize>true</Optimize>
62 62 <OutputPath>bin\Release</OutputPath>
63 63 <DefineConstants>NET_4_5;MONO;</DefineConstants>
64 64 <ErrorReport>prompt</ErrorReport>
65 65 <WarningLevel>4</WarningLevel>
66 66 <ConsolePause>false</ConsolePause>
67 67 </PropertyGroup>
68 68 <ItemGroup>
69 69 <Reference Include="System" />
70 70 <Reference Include="System.Xml" />
71 71 </ItemGroup>
72 72 <ItemGroup>
73 73 <Compile Include="Component.cs" />
74 74 <Compile Include="CustomEqualityComparer.cs" />
75 75 <Compile Include="Diagnostics\ConsoleTraceListener.cs" />
76 76 <Compile Include="Diagnostics\EventText.cs" />
77 77 <Compile Include="Diagnostics\IEventTextFormatter.cs" />
78 78 <Compile Include="Diagnostics\LogChannel.cs" />
79 79 <Compile Include="Diagnostics\LogicalOperation.cs" />
80 80 <Compile Include="Diagnostics\TextFileListener.cs" />
81 81 <Compile Include="Diagnostics\TextListenerBase.cs" />
82 82 <Compile Include="Diagnostics\TraceLog.cs" />
83 83 <Compile Include="Diagnostics\TraceEvent.cs" />
84 84 <Compile Include="Diagnostics\TraceEventType.cs" />
85 85 <Compile Include="Disposable.cs" />
86 86 <Compile Include="ICancellable.cs" />
87 87 <Compile Include="IProgressHandler.cs" />
88 88 <Compile Include="IProgressNotifier.cs" />
89 89 <Compile Include="IPromiseT.cs" />
90 90 <Compile Include="IPromise.cs" />
91 91 <Compile Include="IServiceLocator.cs" />
92 92 <Compile Include="ITaskController.cs" />
93 93 <Compile Include="JSON\JSONElementContext.cs" />
94 94 <Compile Include="JSON\JSONElementType.cs" />
95 95 <Compile Include="JSON\JSONGrammar.cs" />
96 96 <Compile Include="JSON\JSONParser.cs" />
97 97 <Compile Include="JSON\JSONScanner.cs" />
98 98 <Compile Include="JSON\JsonTokenType.cs" />
99 99 <Compile Include="JSON\JSONWriter.cs" />
100 100 <Compile Include="JSON\JSONXmlReader.cs" />
101 101 <Compile Include="JSON\JSONXmlReaderOptions.cs" />
102 102 <Compile Include="JSON\StringTranslator.cs" />
103 103 <Compile Include="Parallels\DispatchPool.cs" />
104 104 <Compile Include="Parallels\ArrayTraits.cs" />
105 105 <Compile Include="Parallels\MTQueue.cs" />
106 106 <Compile Include="Parallels\WorkerPool.cs" />
107 107 <Compile Include="Parsing\Alphabet.cs" />
108 108 <Compile Include="Parsing\AlphabetBase.cs" />
109 109 <Compile Include="Parsing\AltToken.cs" />
110 110 <Compile Include="Parsing\BinaryToken.cs" />
111 111 <Compile Include="Parsing\CatToken.cs" />
112 112 <Compile Include="Parsing\CDFADefinition.cs" />
113 113 <Compile Include="Parsing\DFABuilder.cs" />
114 114 <Compile Include="Parsing\DFADefinitionBase.cs" />
115 115 <Compile Include="Parsing\DFAStateDescriptor.cs" />
116 116 <Compile Include="Parsing\DFAutomaton.cs" />
117 117 <Compile Include="Parsing\EDFADefinition.cs" />
118 118 <Compile Include="Parsing\EmptyToken.cs" />
119 119 <Compile Include="Parsing\EndToken.cs" />
120 120 <Compile Include="Parsing\EnumAlphabet.cs" />
121 121 <Compile Include="Parsing\Grammar.cs" />
122 122 <Compile Include="Parsing\IAlphabet.cs" />
123 123 <Compile Include="Parsing\IDFADefinition.cs" />
124 124 <Compile Include="Parsing\IVisitor.cs" />
125 125 <Compile Include="Parsing\ParserException.cs" />
126 126 <Compile Include="Parsing\Scanner.cs" />
127 127 <Compile Include="Parsing\StarToken.cs" />
128 128 <Compile Include="Parsing\SymbolToken.cs" />
129 129 <Compile Include="Parsing\Token.cs" />
130 130 <Compile Include="ServiceLocator.cs" />
131 131 <Compile Include="TaskController.cs" />
132 132 <Compile Include="ProgressInitEventArgs.cs" />
133 133 <Compile Include="Properties\AssemblyInfo.cs" />
134 134 <Compile Include="Parallels\AsyncPool.cs" />
135 135 <Compile Include="Safe.cs" />
136 136 <Compile Include="ValueEventArgs.cs" />
137 137 <Compile Include="PromiseExtensions.cs" />
138 138 <Compile Include="SyncContextPromise.cs" />
139 139 <Compile Include="Diagnostics\OperationContext.cs" />
140 140 <Compile Include="Diagnostics\TraceContext.cs" />
141 141 <Compile Include="Diagnostics\LogEventArgs.cs" />
142 142 <Compile Include="Diagnostics\LogEventArgsT.cs" />
143 143 <Compile Include="Diagnostics\Extensions.cs" />
144 144 <Compile Include="IComponentContainer.cs" />
145 145 <Compile Include="PromiseEventType.cs" />
146 146 <Compile Include="Parallels\MTCustomQueue.cs" />
147 147 <Compile Include="Parallels\MTCustomQueueNode.cs" />
148 148 <Compile Include="ComponentContainer.cs" />
149 149 <Compile Include="DisposablePool.cs" />
150 150 <Compile Include="ObjectPool.cs" />
151 151 <Compile Include="Parallels\AsyncQueue.cs" />
152 152 <Compile Include="PromiseT.cs" />
153 153 <Compile Include="IDeferred.cs" />
154 154 <Compile Include="IDeferredT.cs" />
155 155 <Compile Include="AbstractPromise.cs" />
156 156 <Compile Include="Promise.cs" />
157 157 <Compile Include="PromiseTransientException.cs" />
158 <Compile Include="Parallels\Signal.cs" />
158 159 </ItemGroup>
159 160 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
160 161 <ItemGroup />
161 162 <ProjectExtensions>
162 163 <MonoDevelop>
163 164 <Properties>
164 165 <Policies>
165 166 <CSharpFormattingPolicy IndentSwitchBody="True" NamespaceBraceStyle="EndOfLine" ClassBraceStyle="EndOfLine" InterfaceBraceStyle="EndOfLine" StructBraceStyle="EndOfLine" EnumBraceStyle="EndOfLine" MethodBraceStyle="EndOfLine" ConstructorBraceStyle="EndOfLine" DestructorBraceStyle="EndOfLine" BeforeMethodDeclarationParentheses="False" BeforeMethodCallParentheses="False" BeforeConstructorDeclarationParentheses="False" NewLineBeforeConstructorInitializerColon="NewLine" NewLineAfterConstructorInitializerColon="SameLine" BeforeIndexerDeclarationBracket="False" BeforeDelegateDeclarationParentheses="False" NewParentheses="False" SpacesBeforeBrackets="False" inheritsSet="Mono" inheritsScope="text/x-csharp" scope="text/x-csharp" />
166 167 <TextStylePolicy FileWidth="120" EolMarker="Unix" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/x-csharp" />
167 168 <DotNetNamingPolicy DirectoryNamespaceAssociation="PrefixedHierarchical" ResourceNamePolicy="MSBuild" />
168 169 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="application/xml" />
169 170 <XmlFormattingPolicy inheritsSet="Mono" inheritsScope="application/xml" scope="application/xml" />
170 171 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/plain" />
171 172 <NameConventionPolicy>
172 173 <Rules>
173 174 <NamingRule Name="Namespaces" AffectedEntity="Namespace" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
174 175 <NamingRule Name="Types" AffectedEntity="Class, Struct, Enum, Delegate" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
175 176 <NamingRule Name="Interfaces" AffectedEntity="Interface" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
176 177 <RequiredPrefixes>
177 178 <String>I</String>
178 179 </RequiredPrefixes>
179 180 </NamingRule>
180 181 <NamingRule Name="Attributes" AffectedEntity="CustomAttributes" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
181 182 <RequiredSuffixes>
182 183 <String>Attribute</String>
183 184 </RequiredSuffixes>
184 185 </NamingRule>
185 186 <NamingRule Name="Event Arguments" AffectedEntity="CustomEventArgs" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
186 187 <RequiredSuffixes>
187 188 <String>EventArgs</String>
188 189 </RequiredSuffixes>
189 190 </NamingRule>
190 191 <NamingRule Name="Exceptions" AffectedEntity="CustomExceptions" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
191 192 <RequiredSuffixes>
192 193 <String>Exception</String>
193 194 </RequiredSuffixes>
194 195 </NamingRule>
195 196 <NamingRule Name="Methods" AffectedEntity="Methods" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
196 197 <NamingRule Name="Static Readonly Fields" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Protected, Public" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True" />
197 198 <NamingRule Name="Fields (Non Private)" AffectedEntity="Field" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
198 199 <NamingRule Name="ReadOnly Fields (Non Private)" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False" />
199 200 <NamingRule Name="Fields (Private)" AffectedEntity="Field, ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
200 201 <RequiredPrefixes>
201 202 <String>m_</String>
202 203 </RequiredPrefixes>
203 204 </NamingRule>
204 205 <NamingRule Name="Static Fields (Private)" AffectedEntity="Field" VisibilityMask="Private" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True">
205 206 <RequiredPrefixes>
206 207 <String>_</String>
207 208 </RequiredPrefixes>
208 209 </NamingRule>
209 210 <NamingRule Name="ReadOnly Fields (Private)" AffectedEntity="ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
210 211 <RequiredPrefixes>
211 212 <String>m_</String>
212 213 </RequiredPrefixes>
213 214 </NamingRule>
214 215 <NamingRule Name="Constant Fields" AffectedEntity="ConstantField" VisibilityMask="VisibilityMask" NamingStyle="AllUpper" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
215 216 <NamingRule Name="Properties" AffectedEntity="Property" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
216 217 <NamingRule Name="Events" AffectedEntity="Event" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
217 218 <NamingRule Name="Enum Members" AffectedEntity="EnumMember" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
218 219 <NamingRule Name="Parameters" AffectedEntity="Parameter, LocalVariable" VisibilityMask="VisibilityMask" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
219 220 <NamingRule Name="Type Parameters" AffectedEntity="TypeParameter" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
220 221 <RequiredPrefixes>
221 222 <String>T</String>
222 223 </RequiredPrefixes>
223 224 </NamingRule>
224 225 </Rules>
225 226 </NameConventionPolicy>
226 227 </Policies>
227 228 </Properties>
228 229 </MonoDevelop>
229 230 </ProjectExtensions>
230 231 </Project> No newline at end of file
@@ -1,629 +1,629
1 1 using System.Threading;
2 2 using System.Collections.Generic;
3 3 using System;
4 4 using System.Collections;
5 5 using System.Diagnostics;
6 6
7 7 namespace Implab.Parallels {
8 8 public class AsyncQueue<T> : IEnumerable<T> {
9 9 class Chunk {
10 10 public Chunk next;
11 11
12 12 int m_low;
13 13 int m_hi;
14 14 int m_alloc;
15 15 readonly int m_size;
16 16 readonly T[] m_data;
17 17
18 18 public Chunk(int size) {
19 19 m_size = size;
20 20 m_data = new T[size];
21 21 }
22 22
23 23 public Chunk(int size, T value) {
24 24 m_size = size;
25 25 m_hi = 1;
26 26 m_alloc = 1;
27 27 m_data = new T[size];
28 28 m_data[0] = value;
29 29 }
30 30
31 31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
32 32 m_size = size;
33 33 m_hi = length;
34 34 m_alloc = alloc;
35 35 m_data = new T[size];
36 36 Array.Copy(data, offset, m_data, 0, length);
37 37 }
38 38
39 39 public int Low {
40 40 get { return m_low; }
41 41 }
42 42
43 43 public int Hi {
44 44 get { return m_hi; }
45 45 }
46 46
47 47 public int Size {
48 48 get { return m_size; }
49 49 }
50 50
51 51 public bool TryEnqueue(T value, out bool extend) {
52 52 var alloc = Interlocked.Increment(ref m_alloc) - 1;
53 53
54 54 if (alloc >= m_size) {
55 55 extend = alloc == m_size;
56 56 return false;
57 57 }
58 58
59 59 extend = false;
60 60 m_data[alloc] = value;
61 61
62 62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
63 63 // spin wait for commit
64 64 }
65 65 return true;
66 66 }
67 67
68 68 /// <summary>
69 69 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
70 70 /// </summary>
71 71 public void Commit() {
72 72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
73 73
74 74 while (m_hi != actual)
75 75 Thread.MemoryBarrier();
76 76 }
77 77
78 78 public bool TryDequeue(out T value, out bool recycle) {
79 79 int low;
80 80 do {
81 81 low = m_low;
82 82 if (low >= m_hi) {
83 83 value = default(T);
84 84 recycle = (low == m_size);
85 85 return false;
86 86 }
87 87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
88 88
89 89 recycle = (low == m_size - 1);
90 90 value = m_data[low];
91 91
92 92 return true;
93 93 }
94 94
95 95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
96 96 //int alloc;
97 97 //int allocSize;
98 98
99 99 var alloc = Interlocked.Add(ref m_alloc, length) - length;
100 100 if (alloc > m_size) {
101 101 // the chunk is full and someone already
102 102 // creating the new one
103 103 enqueued = 0; // nothing was added
104 104 extend = false; // the caller shouldn't try to extend the queue
105 105 return false; // nothing was added
106 106 }
107 107
108 108 enqueued = Math.Min(m_size - alloc, length);
109 109 extend = length > enqueued;
110 110
111 111 if (enqueued == 0)
112 112 return false;
113 113
114 114
115 115 Array.Copy(batch, offset, m_data, alloc, enqueued);
116 116
117 117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
118 118 // spin wait for commit
119 119 }
120 120
121 121 return true;
122 122 }
123 123
124 124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
125 125 int low, hi, batchSize;
126 126
127 127 do {
128 128 low = m_low;
129 129 hi = m_hi;
130 130 if (low >= hi) {
131 131 dequeued = 0;
132 132 recycle = (low == m_size); // recycling could be restarted and we need to signal again
133 133 return false;
134 134 }
135 135 batchSize = Math.Min(hi - low, length);
136 136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
137 137
138 138 recycle = (low == m_size - batchSize);
139 139 dequeued = batchSize;
140 140
141 141 Array.Copy(m_data, low, buffer, offset, batchSize);
142 142
143 143 return true;
144 144 }
145 145
146 146 public T GetAt(int pos) {
147 147 return m_data[pos];
148 148 }
149 149 }
150 150
151 151 public const int DEFAULT_CHUNK_SIZE = 32;
152 152 public const int MAX_CHUNK_SIZE = 262144;
153 153
154 154 Chunk m_first;
155 155 Chunk m_last;
156 156
157 157 /// <summary>
158 158 /// Adds the specified value to the queue.
159 159 /// </summary>
160 160 /// <param name="value">Tha value which will be added to the queue.</param>
161 161 public void Enqueue(T value) {
162 162 var last = m_last;
163 163 // spin wait to the new chunk
164 164 bool extend = true;
165 165 while (last == null || !last.TryEnqueue(value, out extend)) {
166 166 // try to extend queue
167 167 if (extend || last == null) {
168 168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
169 169 if (EnqueueChunk(last, chunk))
170 170 break; // success! exit!
171 171 last = m_last;
172 172 } else {
173 173 while (last == m_last) {
174 174 Thread.MemoryBarrier();
175 175 }
176 176 last = m_last;
177 177 }
178 178 }
179 179 }
180 180
181 181 /// <summary>
182 182 /// Adds the specified data to the queue.
183 183 /// </summary>
184 184 /// <param name="data">The buffer which contains the data to be enqueued.</param>
185 185 /// <param name="offset">The offset of the data in the buffer.</param>
186 186 /// <param name="length">The size of the data to read from the buffer.</param>
187 187 public void EnqueueRange(T[] data, int offset, int length) {
188 188 if (data == null)
189 189 throw new ArgumentNullException("data");
190 190 if (offset < 0)
191 191 throw new ArgumentOutOfRangeException("offset");
192 192 if (length < 1 || offset + length > data.Length)
193 193 throw new ArgumentOutOfRangeException("length");
194 194
195 195 var last = m_last;
196 196
197 197 bool extend;
198 198 int enqueued;
199 199
200 200 while (length > 0) {
201 201 extend = true;
202 202 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
203 203 length -= enqueued;
204 204 offset += enqueued;
205 205 }
206 206
207 207 if (extend) {
208 208 // there was no enough space in the chunk
209 209 // or there was no chunks in the queue
210 210
211 211 while (length > 0) {
212 212
213 213 var size = Math.Min(length, MAX_CHUNK_SIZE);
214 214
215 215 var chunk = new Chunk(
216 216 Math.Max(size, DEFAULT_CHUNK_SIZE),
217 217 data,
218 218 offset,
219 219 size,
220 220 length // length >= size
221 221 );
222 222
223 223 if (!EnqueueChunk(last, chunk)) {
224 224 // looks like the queue has been updated then proceed from the beginning
225 225 last = m_last;
226 226 break;
227 227 }
228 228
229 229 // we have successfully added the new chunk
230 230 last = chunk;
231 231 length -= size;
232 232 offset += size;
233 233 }
234 234 } else {
235 235 // we don't need to extend the queue, if we successfully enqueued data
236 236 if (length == 0)
237 237 break;
238 238
239 239 // if we need to wait while someone is extending the queue
240 240 // spinwait
241 241 while (last == m_last) {
242 242 Thread.MemoryBarrier();
243 243 }
244 244
245 245 last = m_last;
246 246 }
247 247 }
248 248 }
249 249
250 250 /// <summary>
251 251 /// Tries to retrieve the first element from the queue.
252 252 /// </summary>
253 253 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
254 254 /// <param name="value">The value of the dequeued element.</param>
255 255 public bool TryDequeue(out T value) {
256 256 var chunk = m_first;
257 257 bool recycle;
258 258 while (chunk != null) {
259 259
260 260 var result = chunk.TryDequeue(out value, out recycle);
261 261
262 262 if (recycle) // this chunk is waste
263 263 RecycleFirstChunk(chunk);
264 264 else
265 265 return result; // this chunk is usable and returned actual result
266 266
267 267 if (result) // this chunk is waste but the true result is always actual
268 268 return true;
269 269
270 270 // try again
271 271 chunk = m_first;
272 272 }
273 273
274 274 // the queue is empty
275 275 value = default(T);
276 276 return false;
277 277 }
278 278
279 279 /// <summary>
280 280 /// Tries to dequeue the specified amount of data from the queue.
281 281 /// </summary>
282 282 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
283 283 /// <param name="buffer">The buffer to which the data will be written.</param>
284 284 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
285 285 /// <param name="length">The maximum amount of data to be retrieved.</param>
286 286 /// <param name="dequeued">The actual amout of the retrieved data.</param>
287 287 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
288 288 if (buffer == null)
289 289 throw new ArgumentNullException("buffer");
290 290 if (offset < 0)
291 291 throw new ArgumentOutOfRangeException("offset");
292 292 if (length < 1 || offset + length > buffer.Length)
293 293 throw new ArgumentOutOfRangeException("length");
294 294
295 295 var chunk = m_first;
296 296 bool recycle;
297 297 dequeued = 0;
298 298 while (chunk != null) {
299 299
300 300 int actual;
301 301 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
302 302 offset += actual;
303 303 length -= actual;
304 304 dequeued += actual;
305 305 }
306 306
307 307 if (recycle) // this chunk is waste
308 308 RecycleFirstChunk(chunk);
309 309 else if (actual == 0)
310 310 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
311 311
312 312 if (length == 0)
313 313 return true;
314 314
315 315 // we still may dequeue something
316 316 // try again
317 317 chunk = m_first;
318 318 }
319 319
320 320 return dequeued != 0;
321 321 }
322 322
323 323 /// <summary>
324 324 /// Tries to dequeue all remaining data in the first chunk.
325 325 /// </summary>
326 326 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
327 327 /// <param name="buffer">The buffer to which the data will be written.</param>
328 328 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
329 329 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
330 330 /// <param name="dequeued">The actual amount of the dequeued data.</param>
331 331 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
332 332 if (buffer == null)
333 333 throw new ArgumentNullException("buffer");
334 334 if (offset < 0)
335 335 throw new ArgumentOutOfRangeException("offset");
336 336 if (length < 1 || offset + length > buffer.Length)
337 337 throw new ArgumentOutOfRangeException("length");
338 338
339 339 var chunk = m_first;
340 340 bool recycle;
341 341 dequeued = 0;
342 342
343 343 while (chunk != null) {
344 344
345 345 int actual;
346 346 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
347 347 dequeued = actual;
348 348 }
349 349
350 350 if (recycle) // this chunk is waste
351 351 RecycleFirstChunk(chunk);
352 352
353 353 // if we have dequeued any data, then return
354 354 if (dequeued != 0)
355 355 return true;
356 356
357 357 // we still may dequeue something
358 358 // try again
359 359 chunk = m_first;
360 360 }
361 361
362 362 return false;
363 363 }
364 364
365 365 bool EnqueueChunk(Chunk last, Chunk chunk) {
366 366 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
367 367 return false;
368 368
369 369 if (last != null)
370 370 last.next = chunk;
371 371 else {
372 372 m_first = chunk;
373 373 }
374 374 return true;
375 375 }
376 376
377 377 void RecycleFirstChunk(Chunk first) {
378 378 var next = first.next;
379 379
380 380 if (first != Interlocked.CompareExchange(ref m_first, next, first))
381 381 return;
382 382
383 383 if (next == null) {
384 384
385 385 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
386 386 /*while (first.next == null)
387 387 Thread.MemoryBarrier();*/
388 388
389 389 // race
390 390 // someone already updated the tail, restore the pointer to the queue head
391 391 m_first = first;
392 392 }
393 393 // the tail is updated
394 394 }
395 395
396 396 // we need to update the head
397 397 //Interlocked.CompareExchange(ref m_first, next, first);
398 398 // if the head is already updated then give up
399 399 //return;
400 400
401 401 }
402 402
403 403 public void Clear() {
404 404 // start the new queue
405 405 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
406 406
407 407 do {
408 408 Thread.MemoryBarrier();
409 409 var first = m_first;
410 410 var last = m_last;
411 411
412 412 if (last == null) // nothing to clear
413 413 return;
414 414
415 415 if (first == null || (first.next == null && first != last)) // inconcistency
416 416 continue;
417 417
418 418 // here we will create inconsistency which will force others to spin
419 419 // and prevent from fetching. chunk.next = null
420 420 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
421 421 continue;// inconsistent
422 422
423 423 m_last = chunk;
424 424
425 425 return;
426 426
427 427 } while(true);
428 428 }
429 429
430 430 public T[] Drain() {
431 431 // start the new queue
432 432 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
433 433
434 434 do {
435 435 Thread.MemoryBarrier();
436 436 var first = m_first;
437 437 var last = m_last;
438 438
439 439 if (last == null)
440 440 return new T[0];
441 441
442 442 if (first == null || (first.next == null && first != last))
443 443 continue;
444 444
445 445 // here we will create inconsistency which will force others to spin
446 446 // and prevent from fetching. chunk.next = null
447 447 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
448 448 continue;// inconsistent
449 449
450 450 last = Interlocked.Exchange(ref m_last, chunk);
451 451
452 452 return ReadChunks(first, last);
453 453
454 454 } while(true);
455 455 }
456 456
457 T[] ReadChunks(Chunk chunk, object last) {
457 static T[] ReadChunks(Chunk chunk, object last) {
458 458 var result = new List<T>();
459 459 var buffer = new T[DEFAULT_CHUNK_SIZE];
460 460 int actual;
461 461 bool recycle;
462 462 while (chunk != null) {
463 463 // ensure all write operations on the chunk are complete
464 464 chunk.Commit();
465 465
466 466 // we need to read the chunk using this way
467 467 // since some client still may completing the dequeue
468 468 // operation, such clients most likely won't get results
469 469 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
470 470 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
471 471
472 472 if (chunk == last) {
473 473 chunk = null;
474 474 } else {
475 475 while (chunk.next == null)
476 476 Thread.MemoryBarrier();
477 477 chunk = chunk.next;
478 478 }
479 479 }
480 480
481 481 return result.ToArray();
482 482 }
483 483
484 484 struct ArraySegmentCollection : ICollection<T> {
485 485 readonly T[] m_data;
486 486 readonly int m_offset;
487 487 readonly int m_length;
488 488
489 489 public ArraySegmentCollection(T[] data, int offset, int length) {
490 490 m_data = data;
491 491 m_offset = offset;
492 492 m_length = length;
493 493 }
494 494
495 495 #region ICollection implementation
496 496
497 497 public void Add(T item) {
498 498 throw new InvalidOperationException();
499 499 }
500 500
501 501 public void Clear() {
502 502 throw new InvalidOperationException();
503 503 }
504 504
505 505 public bool Contains(T item) {
506 506 return false;
507 507 }
508 508
509 509 public void CopyTo(T[] array, int arrayIndex) {
510 510 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
511 511 }
512 512
513 513 public bool Remove(T item) {
514 514 throw new NotImplementedException();
515 515 }
516 516
517 517 public int Count {
518 518 get {
519 519 return m_length;
520 520 }
521 521 }
522 522
523 523 public bool IsReadOnly {
524 524 get {
525 525 return true;
526 526 }
527 527 }
528 528
529 529 #endregion
530 530
531 531 #region IEnumerable implementation
532 532
533 533 public IEnumerator<T> GetEnumerator() {
534 534 for (int i = m_offset; i < m_length + m_offset; i++)
535 535 yield return m_data[i];
536 536 }
537 537
538 538 #endregion
539 539
540 540 #region IEnumerable implementation
541 541
542 542 IEnumerator IEnumerable.GetEnumerator() {
543 543 return GetEnumerator();
544 544 }
545 545
546 546 #endregion
547 547 }
548 548
549 549 #region IEnumerable implementation
550 550
551 551 class Enumerator : IEnumerator<T> {
552 552 Chunk m_current;
553 553 int m_pos = -1;
554 554
555 555 public Enumerator(Chunk fisrt) {
556 556 m_current = fisrt;
557 557 }
558 558
559 559 #region IEnumerator implementation
560 560
561 561 public bool MoveNext() {
562 562 if (m_current == null)
563 563 return false;
564 564
565 565 if (m_pos == -1)
566 566 m_pos = m_current.Low;
567 567 else
568 568 m_pos++;
569 569
570 570 if (m_pos == m_current.Hi) {
571 571
572 572 m_current = m_pos == m_current.Size ? m_current.next : null;
573 573
574 574 m_pos = 0;
575 575
576 576 if (m_current == null)
577 577 return false;
578 578 }
579 579
580 580 return true;
581 581 }
582 582
583 583 public void Reset() {
584 584 throw new NotSupportedException();
585 585 }
586 586
587 587 object IEnumerator.Current {
588 588 get {
589 589 return Current;
590 590 }
591 591 }
592 592
593 593 #endregion
594 594
595 595 #region IDisposable implementation
596 596
597 597 public void Dispose() {
598 598 }
599 599
600 600 #endregion
601 601
602 602 #region IEnumerator implementation
603 603
604 604 public T Current {
605 605 get {
606 606 if (m_pos == -1 || m_current == null)
607 607 throw new InvalidOperationException();
608 608 return m_current.GetAt(m_pos);
609 609 }
610 610 }
611 611
612 612 #endregion
613 613 }
614 614
615 615 public IEnumerator<T> GetEnumerator() {
616 616 return new Enumerator(m_first);
617 617 }
618 618
619 619 #endregion
620 620
621 621 #region IEnumerable implementation
622 622
623 623 IEnumerator IEnumerable.GetEnumerator() {
624 624 return GetEnumerator();
625 625 }
626 626
627 627 #endregion
628 628 }
629 629 }
@@ -1,86 +1,99
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Text.RegularExpressions;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab
9 9 {
10 10 public static class Safe
11 11 {
12 12 public static void ArgumentMatch(string value, string paramName, Regex rx) {
13 13 if (rx == null)
14 14 throw new ArgumentNullException("rx");
15 15 if (!rx.IsMatch(value))
16 16 throw new ArgumentException(String.Format("The prameter value must match {0}", rx), paramName);
17 17 }
18 18
19 19 public static void ArgumentNotEmpty(string value, string paramName) {
20 20 if (String.IsNullOrEmpty(value))
21 21 throw new ArgumentException("The parameter can't be empty", paramName);
22 22 }
23 23
24 24 public static void ArgumentNotEmpty<T>(T[] value, string paramName) {
25 25 if (value == null || value.Length == 0)
26 26 throw new ArgumentException("The array must be not emty", paramName);
27 27 }
28 28
29 29 public static void ArgumentNotNull(object value, string paramName) {
30 30 if (value == null)
31 31 throw new ArgumentNullException(paramName);
32 32 }
33 33
34 34 public static void ArgumentInRange(int value, int min, int max, string paramName) {
35 35 if (value < min || value > max)
36 36 throw new ArgumentOutOfRangeException(paramName);
37 37 }
38 38
39 public static void Dispose(params IDisposable[] objects)
40 {
41 foreach(var d in objects)
39 public static void Dispose(params IDisposable[] objects) {
40 foreach (var d in objects)
42 41 if (d != null)
43 42 d.Dispose();
44 43 }
45 44
45 public static void Dispose(params object[] objects) {
46 foreach (var obj in objects) {
47 var d = obj as IDisposable;
48 if (d != null)
49 d.Dispose();
50 }
51 }
52
53 public static void Dispose(object obj) {
54 var d = obj as IDisposable;
55 if (d != null)
56 d.Dispose();
57 }
58
46 59 [DebuggerStepThrough]
47 60 public static IPromise<T> InvokePromise<T>(Func<T> action) {
48 61 ArgumentNotNull(action, "action");
49 62
50 63 var p = new Promise<T>();
51 64 try {
52 65 p.Resolve(action());
53 66 } catch (Exception err) {
54 67 p.Reject(err);
55 68 }
56 69
57 70 return p;
58 71 }
59 72
60 73 [DebuggerStepThrough]
61 74 public static IPromise InvokePromise(Action action) {
62 75 ArgumentNotNull(action, "action");
63 76
64 77 var p = new Promise();
65 78 try {
66 79 action();
67 80 p.Resolve();
68 81 } catch (Exception err) {
69 82 p.Reject(err);
70 83 }
71 84
72 85 return p;
73 86 }
74 87
75 88 [DebuggerStepThrough]
76 89 public static IPromise<T> InvokePromise<T>(Func<IPromise<T>> action) {
77 90 ArgumentNotNull(action, "action");
78 91
79 92 try {
80 93 return action() ?? Promise<T>.ExceptionToPromise(new Exception("The action returned null"));
81 94 } catch (Exception err) {
82 95 return Promise<T>.ExceptionToPromise(err);
83 96 }
84 97 }
85 98 }
86 99 }
General Comments 0
You need to be logged in to leave comments. Login now