##// END OF EJS Templates
added the blocking queue
cin -
r137:238e15580926 v2
parent child
Show More
@@ -0,0 +1,87
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
5 public class BlockingQueue<T> : AsyncQueue<T> {
6 readonly object m_lock = new object();
7
8 public override void Enqueue(T value) {
9 base.Enqueue(value);
10 lock (m_lock)
11 Monitor.Pulse(m_lock);
12 }
13
14 public override void EnqueueRange(T[] data, int offset, int length) {
15 base.EnqueueRange(data, offset, length);
16 if (length > 1)
17 lock (m_lock)
18 Monitor.PulseAll(m_lock);
19 else
20 lock (m_lock)
21 Monitor.Pulse(m_lock);
22 }
23
24 public T GetItem(int timeout) {
25 T item;
26 var t1 = Environment.TickCount;
27 var dt = timeout;
28 while (!TryDequeue(out item)) {
29 lock (m_lock)
30 if (!Monitor.Wait(m_lock, dt))
31 throw new TimeoutException();
32 if (timeout >= 0) {
33 dt = timeout - Environment.TickCount + t1;
34 if (dt < 0)
35 throw new TimeoutException();
36 }
37 }
38 return item;
39 }
40
41 public T GetItem() {
42 T item;
43 while (!TryDequeue(out item))
44 lock (m_lock)
45 Monitor.Wait(m_lock);
46 return item;
47 }
48
49 public T[] GetRange(int max, int timeout) {
50 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
51
52 var buffer = new T[max];
53 int actual;
54 var t1 = Environment.TickCount;
55 var dt = timeout;
56 while (!TryDequeueRange(buffer,0,max,out actual)) {
57 lock (m_lock)
58 if (!Monitor.Wait(m_lock, dt))
59 throw new TimeoutException();
60 if (timeout >= 0) {
61 dt = timeout - Environment.TickCount + t1;
62 if (dt < 0)
63 throw new TimeoutException();
64 }
65 }
66
67 var data = new T[actual];
68 Array.Copy(buffer, data, actual);
69 return data;
70 }
71
72 public T[] GetRange(int max) {
73 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
74
75 var buffer = new T[max];
76 int actual;
77 while (!TryDequeueRange(buffer, 0, max, out actual))
78 lock (m_lock)
79 Monitor.Wait(m_lock);
80
81 var data = new T[actual];
82 Array.Copy(buffer, data, actual);
83 return data;
84 }
85 }
86 }
87
@@ -1,230 +1,231
1 ο»Ώ<?xml version="1.0" encoding="utf-8"?>
1 ο»Ώ<?xml version="1.0" encoding="utf-8"?>
2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 <PropertyGroup>
3 <PropertyGroup>
4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
6 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
7 <OutputType>Library</OutputType>
7 <OutputType>Library</OutputType>
8 <RootNamespace>Implab</RootNamespace>
8 <RootNamespace>Implab</RootNamespace>
9 <AssemblyName>Implab</AssemblyName>
9 <AssemblyName>Implab</AssemblyName>
10 </PropertyGroup>
10 </PropertyGroup>
11 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
11 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
12 <DebugSymbols>true</DebugSymbols>
12 <DebugSymbols>true</DebugSymbols>
13 <DebugType>full</DebugType>
13 <DebugType>full</DebugType>
14 <Optimize>false</Optimize>
14 <Optimize>false</Optimize>
15 <OutputPath>bin\Debug</OutputPath>
15 <OutputPath>bin\Debug</OutputPath>
16 <DefineConstants>TRACE;DEBUG;</DefineConstants>
16 <DefineConstants>TRACE;DEBUG;</DefineConstants>
17 <ErrorReport>prompt</ErrorReport>
17 <ErrorReport>prompt</ErrorReport>
18 <WarningLevel>4</WarningLevel>
18 <WarningLevel>4</WarningLevel>
19 <ConsolePause>false</ConsolePause>
19 <ConsolePause>false</ConsolePause>
20 <RunCodeAnalysis>true</RunCodeAnalysis>
20 <RunCodeAnalysis>true</RunCodeAnalysis>
21 </PropertyGroup>
21 </PropertyGroup>
22 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
22 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
23 <DebugType>full</DebugType>
23 <DebugType>full</DebugType>
24 <Optimize>true</Optimize>
24 <Optimize>true</Optimize>
25 <OutputPath>bin\Release</OutputPath>
25 <OutputPath>bin\Release</OutputPath>
26 <ErrorReport>prompt</ErrorReport>
26 <ErrorReport>prompt</ErrorReport>
27 <WarningLevel>4</WarningLevel>
27 <WarningLevel>4</WarningLevel>
28 <ConsolePause>false</ConsolePause>
28 <ConsolePause>false</ConsolePause>
29 </PropertyGroup>
29 </PropertyGroup>
30 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug 4.5|AnyCPU' ">
30 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug 4.5|AnyCPU' ">
31 <DebugSymbols>true</DebugSymbols>
31 <DebugSymbols>true</DebugSymbols>
32 <DebugType>full</DebugType>
32 <DebugType>full</DebugType>
33 <Optimize>false</Optimize>
33 <Optimize>false</Optimize>
34 <OutputPath>bin\Debug</OutputPath>
34 <OutputPath>bin\Debug</OutputPath>
35 <DefineConstants>TRACE;DEBUG;NET_4_5</DefineConstants>
35 <DefineConstants>TRACE;DEBUG;NET_4_5</DefineConstants>
36 <ErrorReport>prompt</ErrorReport>
36 <ErrorReport>prompt</ErrorReport>
37 <WarningLevel>4</WarningLevel>
37 <WarningLevel>4</WarningLevel>
38 <RunCodeAnalysis>true</RunCodeAnalysis>
38 <RunCodeAnalysis>true</RunCodeAnalysis>
39 <ConsolePause>false</ConsolePause>
39 <ConsolePause>false</ConsolePause>
40 </PropertyGroup>
40 </PropertyGroup>
41 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release 4.5|AnyCPU' ">
41 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release 4.5|AnyCPU' ">
42 <Optimize>true</Optimize>
42 <Optimize>true</Optimize>
43 <OutputPath>bin\Release</OutputPath>
43 <OutputPath>bin\Release</OutputPath>
44 <ErrorReport>prompt</ErrorReport>
44 <ErrorReport>prompt</ErrorReport>
45 <WarningLevel>4</WarningLevel>
45 <WarningLevel>4</WarningLevel>
46 <ConsolePause>false</ConsolePause>
46 <ConsolePause>false</ConsolePause>
47 <DefineConstants>NET_4_5</DefineConstants>
47 <DefineConstants>NET_4_5</DefineConstants>
48 </PropertyGroup>
48 </PropertyGroup>
49 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'DebugMono|AnyCPU' ">
49 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'DebugMono|AnyCPU' ">
50 <DebugSymbols>true</DebugSymbols>
50 <DebugSymbols>true</DebugSymbols>
51 <DebugType>full</DebugType>
51 <DebugType>full</DebugType>
52 <Optimize>false</Optimize>
52 <Optimize>false</Optimize>
53 <OutputPath>bin\Debug</OutputPath>
53 <OutputPath>bin\Debug</OutputPath>
54 <DefineConstants>TRACE;DEBUG;NET_4_5;MONO</DefineConstants>
54 <DefineConstants>TRACE;DEBUG;NET_4_5;MONO</DefineConstants>
55 <ErrorReport>prompt</ErrorReport>
55 <ErrorReport>prompt</ErrorReport>
56 <WarningLevel>4</WarningLevel>
56 <WarningLevel>4</WarningLevel>
57 <RunCodeAnalysis>true</RunCodeAnalysis>
57 <RunCodeAnalysis>true</RunCodeAnalysis>
58 <ConsolePause>false</ConsolePause>
58 <ConsolePause>false</ConsolePause>
59 </PropertyGroup>
59 </PropertyGroup>
60 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'ReleaseMono|AnyCPU' ">
60 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'ReleaseMono|AnyCPU' ">
61 <Optimize>true</Optimize>
61 <Optimize>true</Optimize>
62 <OutputPath>bin\Release</OutputPath>
62 <OutputPath>bin\Release</OutputPath>
63 <DefineConstants>NET_4_5;MONO;</DefineConstants>
63 <DefineConstants>NET_4_5;MONO;</DefineConstants>
64 <ErrorReport>prompt</ErrorReport>
64 <ErrorReport>prompt</ErrorReport>
65 <WarningLevel>4</WarningLevel>
65 <WarningLevel>4</WarningLevel>
66 <ConsolePause>false</ConsolePause>
66 <ConsolePause>false</ConsolePause>
67 </PropertyGroup>
67 </PropertyGroup>
68 <ItemGroup>
68 <ItemGroup>
69 <Reference Include="System" />
69 <Reference Include="System" />
70 <Reference Include="System.Xml" />
70 <Reference Include="System.Xml" />
71 </ItemGroup>
71 </ItemGroup>
72 <ItemGroup>
72 <ItemGroup>
73 <Compile Include="Component.cs" />
73 <Compile Include="Component.cs" />
74 <Compile Include="CustomEqualityComparer.cs" />
74 <Compile Include="CustomEqualityComparer.cs" />
75 <Compile Include="Diagnostics\ConsoleTraceListener.cs" />
75 <Compile Include="Diagnostics\ConsoleTraceListener.cs" />
76 <Compile Include="Diagnostics\EventText.cs" />
76 <Compile Include="Diagnostics\EventText.cs" />
77 <Compile Include="Diagnostics\LogChannel.cs" />
77 <Compile Include="Diagnostics\LogChannel.cs" />
78 <Compile Include="Diagnostics\LogicalOperation.cs" />
78 <Compile Include="Diagnostics\LogicalOperation.cs" />
79 <Compile Include="Diagnostics\TextFileListener.cs" />
79 <Compile Include="Diagnostics\TextFileListener.cs" />
80 <Compile Include="Diagnostics\TraceLog.cs" />
80 <Compile Include="Diagnostics\TraceLog.cs" />
81 <Compile Include="Diagnostics\TraceEvent.cs" />
81 <Compile Include="Diagnostics\TraceEvent.cs" />
82 <Compile Include="Diagnostics\TraceEventType.cs" />
82 <Compile Include="Diagnostics\TraceEventType.cs" />
83 <Compile Include="Disposable.cs" />
83 <Compile Include="Disposable.cs" />
84 <Compile Include="ICancellable.cs" />
84 <Compile Include="ICancellable.cs" />
85 <Compile Include="IProgressHandler.cs" />
85 <Compile Include="IProgressHandler.cs" />
86 <Compile Include="IProgressNotifier.cs" />
86 <Compile Include="IProgressNotifier.cs" />
87 <Compile Include="IPromiseT.cs" />
87 <Compile Include="IPromiseT.cs" />
88 <Compile Include="IPromise.cs" />
88 <Compile Include="IPromise.cs" />
89 <Compile Include="IServiceLocator.cs" />
89 <Compile Include="IServiceLocator.cs" />
90 <Compile Include="ITaskController.cs" />
90 <Compile Include="ITaskController.cs" />
91 <Compile Include="JSON\JSONElementContext.cs" />
91 <Compile Include="JSON\JSONElementContext.cs" />
92 <Compile Include="JSON\JSONElementType.cs" />
92 <Compile Include="JSON\JSONElementType.cs" />
93 <Compile Include="JSON\JSONGrammar.cs" />
93 <Compile Include="JSON\JSONGrammar.cs" />
94 <Compile Include="JSON\JSONParser.cs" />
94 <Compile Include="JSON\JSONParser.cs" />
95 <Compile Include="JSON\JSONScanner.cs" />
95 <Compile Include="JSON\JSONScanner.cs" />
96 <Compile Include="JSON\JsonTokenType.cs" />
96 <Compile Include="JSON\JsonTokenType.cs" />
97 <Compile Include="JSON\JSONWriter.cs" />
97 <Compile Include="JSON\JSONWriter.cs" />
98 <Compile Include="JSON\JSONXmlReader.cs" />
98 <Compile Include="JSON\JSONXmlReader.cs" />
99 <Compile Include="JSON\JSONXmlReaderOptions.cs" />
99 <Compile Include="JSON\JSONXmlReaderOptions.cs" />
100 <Compile Include="JSON\StringTranslator.cs" />
100 <Compile Include="JSON\StringTranslator.cs" />
101 <Compile Include="Parallels\DispatchPool.cs" />
101 <Compile Include="Parallels\DispatchPool.cs" />
102 <Compile Include="Parallels\ArrayTraits.cs" />
102 <Compile Include="Parallels\ArrayTraits.cs" />
103 <Compile Include="Parallels\MTQueue.cs" />
103 <Compile Include="Parallels\MTQueue.cs" />
104 <Compile Include="Parallels\WorkerPool.cs" />
104 <Compile Include="Parallels\WorkerPool.cs" />
105 <Compile Include="Parsing\Alphabet.cs" />
105 <Compile Include="Parsing\Alphabet.cs" />
106 <Compile Include="Parsing\AlphabetBase.cs" />
106 <Compile Include="Parsing\AlphabetBase.cs" />
107 <Compile Include="Parsing\AltToken.cs" />
107 <Compile Include="Parsing\AltToken.cs" />
108 <Compile Include="Parsing\BinaryToken.cs" />
108 <Compile Include="Parsing\BinaryToken.cs" />
109 <Compile Include="Parsing\CatToken.cs" />
109 <Compile Include="Parsing\CatToken.cs" />
110 <Compile Include="Parsing\CDFADefinition.cs" />
110 <Compile Include="Parsing\CDFADefinition.cs" />
111 <Compile Include="Parsing\DFABuilder.cs" />
111 <Compile Include="Parsing\DFABuilder.cs" />
112 <Compile Include="Parsing\DFADefinitionBase.cs" />
112 <Compile Include="Parsing\DFADefinitionBase.cs" />
113 <Compile Include="Parsing\DFAStateDescriptor.cs" />
113 <Compile Include="Parsing\DFAStateDescriptor.cs" />
114 <Compile Include="Parsing\DFAutomaton.cs" />
114 <Compile Include="Parsing\DFAutomaton.cs" />
115 <Compile Include="Parsing\EDFADefinition.cs" />
115 <Compile Include="Parsing\EDFADefinition.cs" />
116 <Compile Include="Parsing\EmptyToken.cs" />
116 <Compile Include="Parsing\EmptyToken.cs" />
117 <Compile Include="Parsing\EndToken.cs" />
117 <Compile Include="Parsing\EndToken.cs" />
118 <Compile Include="Parsing\EnumAlphabet.cs" />
118 <Compile Include="Parsing\EnumAlphabet.cs" />
119 <Compile Include="Parsing\Grammar.cs" />
119 <Compile Include="Parsing\Grammar.cs" />
120 <Compile Include="Parsing\IAlphabet.cs" />
120 <Compile Include="Parsing\IAlphabet.cs" />
121 <Compile Include="Parsing\IDFADefinition.cs" />
121 <Compile Include="Parsing\IDFADefinition.cs" />
122 <Compile Include="Parsing\IVisitor.cs" />
122 <Compile Include="Parsing\IVisitor.cs" />
123 <Compile Include="Parsing\ParserException.cs" />
123 <Compile Include="Parsing\ParserException.cs" />
124 <Compile Include="Parsing\Scanner.cs" />
124 <Compile Include="Parsing\Scanner.cs" />
125 <Compile Include="Parsing\StarToken.cs" />
125 <Compile Include="Parsing\StarToken.cs" />
126 <Compile Include="Parsing\SymbolToken.cs" />
126 <Compile Include="Parsing\SymbolToken.cs" />
127 <Compile Include="Parsing\Token.cs" />
127 <Compile Include="Parsing\Token.cs" />
128 <Compile Include="ServiceLocator.cs" />
128 <Compile Include="ServiceLocator.cs" />
129 <Compile Include="TaskController.cs" />
129 <Compile Include="TaskController.cs" />
130 <Compile Include="ProgressInitEventArgs.cs" />
130 <Compile Include="ProgressInitEventArgs.cs" />
131 <Compile Include="Properties\AssemblyInfo.cs" />
131 <Compile Include="Properties\AssemblyInfo.cs" />
132 <Compile Include="Parallels\AsyncPool.cs" />
132 <Compile Include="Parallels\AsyncPool.cs" />
133 <Compile Include="Safe.cs" />
133 <Compile Include="Safe.cs" />
134 <Compile Include="ValueEventArgs.cs" />
134 <Compile Include="ValueEventArgs.cs" />
135 <Compile Include="PromiseExtensions.cs" />
135 <Compile Include="PromiseExtensions.cs" />
136 <Compile Include="SyncContextPromise.cs" />
136 <Compile Include="SyncContextPromise.cs" />
137 <Compile Include="Diagnostics\OperationContext.cs" />
137 <Compile Include="Diagnostics\OperationContext.cs" />
138 <Compile Include="Diagnostics\TraceContext.cs" />
138 <Compile Include="Diagnostics\TraceContext.cs" />
139 <Compile Include="Diagnostics\LogEventArgs.cs" />
139 <Compile Include="Diagnostics\LogEventArgs.cs" />
140 <Compile Include="Diagnostics\LogEventArgsT.cs" />
140 <Compile Include="Diagnostics\LogEventArgsT.cs" />
141 <Compile Include="Diagnostics\Extensions.cs" />
141 <Compile Include="Diagnostics\Extensions.cs" />
142 <Compile Include="IComponentContainer.cs" />
142 <Compile Include="IComponentContainer.cs" />
143 <Compile Include="PromiseEventType.cs" />
143 <Compile Include="PromiseEventType.cs" />
144 <Compile Include="ComponentContainer.cs" />
144 <Compile Include="ComponentContainer.cs" />
145 <Compile Include="DisposablePool.cs" />
145 <Compile Include="DisposablePool.cs" />
146 <Compile Include="ObjectPool.cs" />
146 <Compile Include="ObjectPool.cs" />
147 <Compile Include="Parallels\AsyncQueue.cs" />
147 <Compile Include="Parallels\AsyncQueue.cs" />
148 <Compile Include="PromiseT.cs" />
148 <Compile Include="PromiseT.cs" />
149 <Compile Include="IDeferred.cs" />
149 <Compile Include="IDeferred.cs" />
150 <Compile Include="IDeferredT.cs" />
150 <Compile Include="IDeferredT.cs" />
151 <Compile Include="AbstractPromise.cs" />
151 <Compile Include="AbstractPromise.cs" />
152 <Compile Include="Promise.cs" />
152 <Compile Include="Promise.cs" />
153 <Compile Include="PromiseTransientException.cs" />
153 <Compile Include="PromiseTransientException.cs" />
154 <Compile Include="Parallels\Signal.cs" />
154 <Compile Include="Parallels\Signal.cs" />
155 <Compile Include="Parallels\SharedLock.cs" />
155 <Compile Include="Parallels\SharedLock.cs" />
156 <Compile Include="Diagnostics\ILogWriter.cs" />
156 <Compile Include="Diagnostics\ILogWriter.cs" />
157 <Compile Include="Diagnostics\ListenerBase.cs" />
157 <Compile Include="Diagnostics\ListenerBase.cs" />
158 <Compile Include="Parallels\BlockingQueue.cs" />
158 </ItemGroup>
159 </ItemGroup>
159 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
160 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
160 <ItemGroup />
161 <ItemGroup />
161 <ProjectExtensions>
162 <ProjectExtensions>
162 <MonoDevelop>
163 <MonoDevelop>
163 <Properties>
164 <Properties>
164 <Policies>
165 <Policies>
165 <CSharpFormattingPolicy IndentSwitchBody="True" NamespaceBraceStyle="EndOfLine" ClassBraceStyle="EndOfLine" InterfaceBraceStyle="EndOfLine" StructBraceStyle="EndOfLine" EnumBraceStyle="EndOfLine" MethodBraceStyle="EndOfLine" ConstructorBraceStyle="EndOfLine" DestructorBraceStyle="EndOfLine" BeforeMethodDeclarationParentheses="False" BeforeMethodCallParentheses="False" BeforeConstructorDeclarationParentheses="False" NewLineBeforeConstructorInitializerColon="NewLine" NewLineAfterConstructorInitializerColon="SameLine" BeforeIndexerDeclarationBracket="False" BeforeDelegateDeclarationParentheses="False" NewParentheses="False" SpacesBeforeBrackets="False" inheritsSet="Mono" inheritsScope="text/x-csharp" scope="text/x-csharp" />
166 <CSharpFormattingPolicy IndentSwitchBody="True" NamespaceBraceStyle="EndOfLine" ClassBraceStyle="EndOfLine" InterfaceBraceStyle="EndOfLine" StructBraceStyle="EndOfLine" EnumBraceStyle="EndOfLine" MethodBraceStyle="EndOfLine" ConstructorBraceStyle="EndOfLine" DestructorBraceStyle="EndOfLine" BeforeMethodDeclarationParentheses="False" BeforeMethodCallParentheses="False" BeforeConstructorDeclarationParentheses="False" NewLineBeforeConstructorInitializerColon="NewLine" NewLineAfterConstructorInitializerColon="SameLine" BeforeIndexerDeclarationBracket="False" BeforeDelegateDeclarationParentheses="False" NewParentheses="False" SpacesBeforeBrackets="False" inheritsSet="Mono" inheritsScope="text/x-csharp" scope="text/x-csharp" />
166 <TextStylePolicy FileWidth="120" EolMarker="Unix" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/x-csharp" />
167 <TextStylePolicy FileWidth="120" EolMarker="Unix" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/x-csharp" />
167 <DotNetNamingPolicy DirectoryNamespaceAssociation="PrefixedHierarchical" ResourceNamePolicy="MSBuild" />
168 <DotNetNamingPolicy DirectoryNamespaceAssociation="PrefixedHierarchical" ResourceNamePolicy="MSBuild" />
168 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="application/xml" />
169 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="application/xml" />
169 <XmlFormattingPolicy inheritsSet="Mono" inheritsScope="application/xml" scope="application/xml" />
170 <XmlFormattingPolicy inheritsSet="Mono" inheritsScope="application/xml" scope="application/xml" />
170 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/plain" />
171 <TextStylePolicy FileWidth="120" TabsToSpaces="False" inheritsSet="VisualStudio" inheritsScope="text/plain" scope="text/plain" />
171 <NameConventionPolicy>
172 <NameConventionPolicy>
172 <Rules>
173 <Rules>
173 <NamingRule Name="Namespaces" AffectedEntity="Namespace" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
174 <NamingRule Name="Namespaces" AffectedEntity="Namespace" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
174 <NamingRule Name="Types" AffectedEntity="Class, Struct, Enum, Delegate" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
175 <NamingRule Name="Types" AffectedEntity="Class, Struct, Enum, Delegate" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
175 <NamingRule Name="Interfaces" AffectedEntity="Interface" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
176 <NamingRule Name="Interfaces" AffectedEntity="Interface" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
176 <RequiredPrefixes>
177 <RequiredPrefixes>
177 <String>I</String>
178 <String>I</String>
178 </RequiredPrefixes>
179 </RequiredPrefixes>
179 </NamingRule>
180 </NamingRule>
180 <NamingRule Name="Attributes" AffectedEntity="CustomAttributes" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
181 <NamingRule Name="Attributes" AffectedEntity="CustomAttributes" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
181 <RequiredSuffixes>
182 <RequiredSuffixes>
182 <String>Attribute</String>
183 <String>Attribute</String>
183 </RequiredSuffixes>
184 </RequiredSuffixes>
184 </NamingRule>
185 </NamingRule>
185 <NamingRule Name="Event Arguments" AffectedEntity="CustomEventArgs" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
186 <NamingRule Name="Event Arguments" AffectedEntity="CustomEventArgs" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
186 <RequiredSuffixes>
187 <RequiredSuffixes>
187 <String>EventArgs</String>
188 <String>EventArgs</String>
188 </RequiredSuffixes>
189 </RequiredSuffixes>
189 </NamingRule>
190 </NamingRule>
190 <NamingRule Name="Exceptions" AffectedEntity="CustomExceptions" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
191 <NamingRule Name="Exceptions" AffectedEntity="CustomExceptions" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
191 <RequiredSuffixes>
192 <RequiredSuffixes>
192 <String>Exception</String>
193 <String>Exception</String>
193 </RequiredSuffixes>
194 </RequiredSuffixes>
194 </NamingRule>
195 </NamingRule>
195 <NamingRule Name="Methods" AffectedEntity="Methods" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
196 <NamingRule Name="Methods" AffectedEntity="Methods" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
196 <NamingRule Name="Static Readonly Fields" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Protected, Public" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True" />
197 <NamingRule Name="Static Readonly Fields" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Protected, Public" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True" />
197 <NamingRule Name="Fields (Non Private)" AffectedEntity="Field" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
198 <NamingRule Name="Fields (Non Private)" AffectedEntity="Field" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
198 <NamingRule Name="ReadOnly Fields (Non Private)" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False" />
199 <NamingRule Name="ReadOnly Fields (Non Private)" AffectedEntity="ReadonlyField" VisibilityMask="Internal, Public" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False" />
199 <NamingRule Name="Fields (Private)" AffectedEntity="Field, ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
200 <NamingRule Name="Fields (Private)" AffectedEntity="Field, ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
200 <RequiredPrefixes>
201 <RequiredPrefixes>
201 <String>m_</String>
202 <String>m_</String>
202 </RequiredPrefixes>
203 </RequiredPrefixes>
203 </NamingRule>
204 </NamingRule>
204 <NamingRule Name="Static Fields (Private)" AffectedEntity="Field" VisibilityMask="Private" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True">
205 <NamingRule Name="Static Fields (Private)" AffectedEntity="Field" VisibilityMask="Private" NamingStyle="CamelCase" IncludeInstanceMembers="False" IncludeStaticEntities="True">
205 <RequiredPrefixes>
206 <RequiredPrefixes>
206 <String>_</String>
207 <String>_</String>
207 </RequiredPrefixes>
208 </RequiredPrefixes>
208 </NamingRule>
209 </NamingRule>
209 <NamingRule Name="ReadOnly Fields (Private)" AffectedEntity="ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
210 <NamingRule Name="ReadOnly Fields (Private)" AffectedEntity="ReadonlyField" VisibilityMask="Private, Protected" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="False">
210 <RequiredPrefixes>
211 <RequiredPrefixes>
211 <String>m_</String>
212 <String>m_</String>
212 </RequiredPrefixes>
213 </RequiredPrefixes>
213 </NamingRule>
214 </NamingRule>
214 <NamingRule Name="Constant Fields" AffectedEntity="ConstantField" VisibilityMask="VisibilityMask" NamingStyle="AllUpper" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
215 <NamingRule Name="Constant Fields" AffectedEntity="ConstantField" VisibilityMask="VisibilityMask" NamingStyle="AllUpper" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
215 <NamingRule Name="Properties" AffectedEntity="Property" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
216 <NamingRule Name="Properties" AffectedEntity="Property" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
216 <NamingRule Name="Events" AffectedEntity="Event" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
217 <NamingRule Name="Events" AffectedEntity="Event" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
217 <NamingRule Name="Enum Members" AffectedEntity="EnumMember" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
218 <NamingRule Name="Enum Members" AffectedEntity="EnumMember" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
218 <NamingRule Name="Parameters" AffectedEntity="Parameter, LocalVariable" VisibilityMask="VisibilityMask" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
219 <NamingRule Name="Parameters" AffectedEntity="Parameter, LocalVariable" VisibilityMask="VisibilityMask" NamingStyle="CamelCase" IncludeInstanceMembers="True" IncludeStaticEntities="True" />
219 <NamingRule Name="Type Parameters" AffectedEntity="TypeParameter" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
220 <NamingRule Name="Type Parameters" AffectedEntity="TypeParameter" VisibilityMask="VisibilityMask" NamingStyle="PascalCase" IncludeInstanceMembers="True" IncludeStaticEntities="True">
220 <RequiredPrefixes>
221 <RequiredPrefixes>
221 <String>T</String>
222 <String>T</String>
222 </RequiredPrefixes>
223 </RequiredPrefixes>
223 </NamingRule>
224 </NamingRule>
224 </Rules>
225 </Rules>
225 </NameConventionPolicy>
226 </NameConventionPolicy>
226 </Policies>
227 </Policies>
227 </Properties>
228 </Properties>
228 </MonoDevelop>
229 </MonoDevelop>
229 </ProjectExtensions>
230 </ProjectExtensions>
230 </Project> No newline at end of file
231 </Project>
@@ -1,631 +1,631
1 using System.Threading;
1 using System.Threading;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System;
3 using System;
4 using System.Collections;
4 using System.Collections;
5 using System.Diagnostics;
5 using System.Diagnostics;
6
6
7 namespace Implab.Parallels {
7 namespace Implab.Parallels {
8 public class AsyncQueue<T> : IEnumerable<T> {
8 public class AsyncQueue<T> : IEnumerable<T> {
9 class Chunk {
9 class Chunk {
10 public Chunk next;
10 public Chunk next;
11
11
12 int m_low;
12 int m_low;
13 int m_hi;
13 int m_hi;
14 int m_alloc;
14 int m_alloc;
15 readonly int m_size;
15 readonly int m_size;
16 readonly T[] m_data;
16 readonly T[] m_data;
17
17
18 public Chunk(int size) {
18 public Chunk(int size) {
19 m_size = size;
19 m_size = size;
20 m_data = new T[size];
20 m_data = new T[size];
21 }
21 }
22
22
23 public Chunk(int size, T value) {
23 public Chunk(int size, T value) {
24 m_size = size;
24 m_size = size;
25 m_hi = 1;
25 m_hi = 1;
26 m_alloc = 1;
26 m_alloc = 1;
27 m_data = new T[size];
27 m_data = new T[size];
28 m_data[0] = value;
28 m_data[0] = value;
29 }
29 }
30
30
31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
32 m_size = size;
32 m_size = size;
33 m_hi = length;
33 m_hi = length;
34 m_alloc = alloc;
34 m_alloc = alloc;
35 m_data = new T[size];
35 m_data = new T[size];
36 Array.Copy(data, offset, m_data, 0, length);
36 Array.Copy(data, offset, m_data, 0, length);
37 }
37 }
38
38
39 public int Low {
39 public int Low {
40 get { return m_low; }
40 get { return m_low; }
41 }
41 }
42
42
43 public int Hi {
43 public int Hi {
44 get { return m_hi; }
44 get { return m_hi; }
45 }
45 }
46
46
47 public int Size {
47 public int Size {
48 get { return m_size; }
48 get { return m_size; }
49 }
49 }
50
50
51 public bool TryEnqueue(T value, out bool extend) {
51 public bool TryEnqueue(T value, out bool extend) {
52 var alloc = Interlocked.Increment(ref m_alloc) - 1;
52 var alloc = Interlocked.Increment(ref m_alloc) - 1;
53
53
54 if (alloc >= m_size) {
54 if (alloc >= m_size) {
55 extend = alloc == m_size;
55 extend = alloc == m_size;
56 return false;
56 return false;
57 }
57 }
58
58
59 extend = false;
59 extend = false;
60 m_data[alloc] = value;
60 m_data[alloc] = value;
61
61
62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
63 // spin wait for commit
63 // spin wait for commit
64 }
64 }
65 return true;
65 return true;
66 }
66 }
67
67
68 /// <summary>
68 /// <summary>
69 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
69 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
70 /// </summary>
70 /// </summary>
71 public void Commit() {
71 public void Commit() {
72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
73
73
74 while (m_hi != actual)
74 while (m_hi != actual)
75 Thread.MemoryBarrier();
75 Thread.MemoryBarrier();
76 }
76 }
77
77
78 public bool TryDequeue(out T value, out bool recycle) {
78 public bool TryDequeue(out T value, out bool recycle) {
79 int low;
79 int low;
80 do {
80 do {
81 low = m_low;
81 low = m_low;
82 if (low >= m_hi) {
82 if (low >= m_hi) {
83 value = default(T);
83 value = default(T);
84 recycle = (low == m_size);
84 recycle = (low == m_size);
85 return false;
85 return false;
86 }
86 }
87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
88
88
89 recycle = (low == m_size - 1);
89 recycle = (low == m_size - 1);
90 value = m_data[low];
90 value = m_data[low];
91
91
92 return true;
92 return true;
93 }
93 }
94
94
95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
96 //int alloc;
96 //int alloc;
97 //int allocSize;
97 //int allocSize;
98
98
99 var alloc = Interlocked.Add(ref m_alloc, length) - length;
99 var alloc = Interlocked.Add(ref m_alloc, length) - length;
100 if (alloc > m_size) {
100 if (alloc > m_size) {
101 // the chunk is full and someone already
101 // the chunk is full and someone already
102 // creating the new one
102 // creating the new one
103 enqueued = 0; // nothing was added
103 enqueued = 0; // nothing was added
104 extend = false; // the caller shouldn't try to extend the queue
104 extend = false; // the caller shouldn't try to extend the queue
105 return false; // nothing was added
105 return false; // nothing was added
106 }
106 }
107
107
108 enqueued = Math.Min(m_size - alloc, length);
108 enqueued = Math.Min(m_size - alloc, length);
109 extend = length > enqueued;
109 extend = length > enqueued;
110
110
111 if (enqueued == 0)
111 if (enqueued == 0)
112 return false;
112 return false;
113
113
114
114
115 Array.Copy(batch, offset, m_data, alloc, enqueued);
115 Array.Copy(batch, offset, m_data, alloc, enqueued);
116
116
117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
118 // spin wait for commit
118 // spin wait for commit
119 }
119 }
120
120
121 return true;
121 return true;
122 }
122 }
123
123
124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
125 int low, hi, batchSize;
125 int low, hi, batchSize;
126
126
127 do {
127 do {
128 low = m_low;
128 low = m_low;
129 hi = m_hi;
129 hi = m_hi;
130 if (low >= hi) {
130 if (low >= hi) {
131 dequeued = 0;
131 dequeued = 0;
132 recycle = (low == m_size); // recycling could be restarted and we need to signal again
132 recycle = (low == m_size); // recycling could be restarted and we need to signal again
133 return false;
133 return false;
134 }
134 }
135 batchSize = Math.Min(hi - low, length);
135 batchSize = Math.Min(hi - low, length);
136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
137
137
138 recycle = (low == m_size - batchSize);
138 recycle = (low == m_size - batchSize);
139 dequeued = batchSize;
139 dequeued = batchSize;
140
140
141 Array.Copy(m_data, low, buffer, offset, batchSize);
141 Array.Copy(m_data, low, buffer, offset, batchSize);
142
142
143 return true;
143 return true;
144 }
144 }
145
145
146 public T GetAt(int pos) {
146 public T GetAt(int pos) {
147 return m_data[pos];
147 return m_data[pos];
148 }
148 }
149 }
149 }
150
150
151 public const int DEFAULT_CHUNK_SIZE = 32;
151 public const int DEFAULT_CHUNK_SIZE = 32;
152 public const int MAX_CHUNK_SIZE = 262144;
152 public const int MAX_CHUNK_SIZE = 262144;
153
153
154 Chunk m_first;
154 Chunk m_first;
155 Chunk m_last;
155 Chunk m_last;
156
156
157 /// <summary>
157 /// <summary>
158 /// Adds the specified value to the queue.
158 /// Adds the specified value to the queue.
159 /// </summary>
159 /// </summary>
160 /// <param name="value">Tha value which will be added to the queue.</param>
160 /// <param name="value">Tha value which will be added to the queue.</param>
161 public void Enqueue(T value) {
161 public virtual void Enqueue(T value) {
162 var last = m_last;
162 var last = m_last;
163 // spin wait to the new chunk
163 // spin wait to the new chunk
164 bool extend = true;
164 bool extend = true;
165 while (last == null || !last.TryEnqueue(value, out extend)) {
165 while (last == null || !last.TryEnqueue(value, out extend)) {
166 // try to extend queue
166 // try to extend queue
167 if (extend || last == null) {
167 if (extend || last == null) {
168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
169 if (EnqueueChunk(last, chunk))
169 if (EnqueueChunk(last, chunk))
170 break; // success! exit!
170 break; // success! exit!
171 last = m_last;
171 last = m_last;
172 } else {
172 } else {
173 while (last == m_last) {
173 while (last == m_last) {
174 Thread.MemoryBarrier();
174 Thread.MemoryBarrier();
175 }
175 }
176 last = m_last;
176 last = m_last;
177 }
177 }
178 }
178 }
179 }
179 }
180
180
181 /// <summary>
181 /// <summary>
182 /// Adds the specified data to the queue.
182 /// Adds the specified data to the queue.
183 /// </summary>
183 /// </summary>
184 /// <param name="data">The buffer which contains the data to be enqueued.</param>
184 /// <param name="data">The buffer which contains the data to be enqueued.</param>
185 /// <param name="offset">The offset of the data in the buffer.</param>
185 /// <param name="offset">The offset of the data in the buffer.</param>
186 /// <param name="length">The size of the data to read from the buffer.</param>
186 /// <param name="length">The size of the data to read from the buffer.</param>
187 public void EnqueueRange(T[] data, int offset, int length) {
187 public virtual void EnqueueRange(T[] data, int offset, int length) {
188 if (data == null)
188 if (data == null)
189 throw new ArgumentNullException("data");
189 throw new ArgumentNullException("data");
190 if (length == 0)
190 if (length == 0)
191 return;
191 return;
192 if (offset < 0)
192 if (offset < 0)
193 throw new ArgumentOutOfRangeException("offset");
193 throw new ArgumentOutOfRangeException("offset");
194 if (length < 1 || offset + length > data.Length)
194 if (length < 1 || offset + length > data.Length)
195 throw new ArgumentOutOfRangeException("length");
195 throw new ArgumentOutOfRangeException("length");
196
196
197 var last = m_last;
197 var last = m_last;
198
198
199 bool extend;
199 bool extend;
200 int enqueued;
200 int enqueued;
201
201
202 while (length > 0) {
202 while (length > 0) {
203 extend = true;
203 extend = true;
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
205 length -= enqueued;
205 length -= enqueued;
206 offset += enqueued;
206 offset += enqueued;
207 }
207 }
208
208
209 if (extend) {
209 if (extend) {
210 // there was no enough space in the chunk
210 // there was no enough space in the chunk
211 // or there was no chunks in the queue
211 // or there was no chunks in the queue
212
212
213 while (length > 0) {
213 while (length > 0) {
214
214
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
216
216
217 var chunk = new Chunk(
217 var chunk = new Chunk(
218 Math.Max(size, DEFAULT_CHUNK_SIZE),
218 Math.Max(size, DEFAULT_CHUNK_SIZE),
219 data,
219 data,
220 offset,
220 offset,
221 size,
221 size,
222 length // length >= size
222 length // length >= size
223 );
223 );
224
224
225 if (!EnqueueChunk(last, chunk)) {
225 if (!EnqueueChunk(last, chunk)) {
226 // looks like the queue has been updated then proceed from the beginning
226 // looks like the queue has been updated then proceed from the beginning
227 last = m_last;
227 last = m_last;
228 break;
228 break;
229 }
229 }
230
230
231 // we have successfully added the new chunk
231 // we have successfully added the new chunk
232 last = chunk;
232 last = chunk;
233 length -= size;
233 length -= size;
234 offset += size;
234 offset += size;
235 }
235 }
236 } else {
236 } else {
237 // we don't need to extend the queue, if we successfully enqueued data
237 // we don't need to extend the queue, if we successfully enqueued data
238 if (length == 0)
238 if (length == 0)
239 break;
239 break;
240
240
241 // if we need to wait while someone is extending the queue
241 // if we need to wait while someone is extending the queue
242 // spinwait
242 // spinwait
243 while (last == m_last) {
243 while (last == m_last) {
244 Thread.MemoryBarrier();
244 Thread.MemoryBarrier();
245 }
245 }
246
246
247 last = m_last;
247 last = m_last;
248 }
248 }
249 }
249 }
250 }
250 }
251
251
252 /// <summary>
252 /// <summary>
253 /// Tries to retrieve the first element from the queue.
253 /// Tries to retrieve the first element from the queue.
254 /// </summary>
254 /// </summary>
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
256 /// <param name="value">The value of the dequeued element.</param>
256 /// <param name="value">The value of the dequeued element.</param>
257 public bool TryDequeue(out T value) {
257 public bool TryDequeue(out T value) {
258 var chunk = m_first;
258 var chunk = m_first;
259 bool recycle;
259 bool recycle;
260 while (chunk != null) {
260 while (chunk != null) {
261
261
262 var result = chunk.TryDequeue(out value, out recycle);
262 var result = chunk.TryDequeue(out value, out recycle);
263
263
264 if (recycle) // this chunk is waste
264 if (recycle) // this chunk is waste
265 RecycleFirstChunk(chunk);
265 RecycleFirstChunk(chunk);
266 else
266 else
267 return result; // this chunk is usable and returned actual result
267 return result; // this chunk is usable and returned actual result
268
268
269 if (result) // this chunk is waste but the true result is always actual
269 if (result) // this chunk is waste but the true result is always actual
270 return true;
270 return true;
271
271
272 // try again
272 // try again
273 chunk = m_first;
273 chunk = m_first;
274 }
274 }
275
275
276 // the queue is empty
276 // the queue is empty
277 value = default(T);
277 value = default(T);
278 return false;
278 return false;
279 }
279 }
280
280
281 /// <summary>
281 /// <summary>
282 /// Tries to dequeue the specified amount of data from the queue.
282 /// Tries to dequeue the specified amount of data from the queue.
283 /// </summary>
283 /// </summary>
284 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
284 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
285 /// <param name="buffer">The buffer to which the data will be written.</param>
285 /// <param name="buffer">The buffer to which the data will be written.</param>
286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
287 /// <param name="length">The maximum amount of data to be retrieved.</param>
287 /// <param name="length">The maximum amount of data to be retrieved.</param>
288 /// <param name="dequeued">The actual amout of the retrieved data.</param>
288 /// <param name="dequeued">The actual amout of the retrieved data.</param>
289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
290 if (buffer == null)
290 if (buffer == null)
291 throw new ArgumentNullException("buffer");
291 throw new ArgumentNullException("buffer");
292 if (offset < 0)
292 if (offset < 0)
293 throw new ArgumentOutOfRangeException("offset");
293 throw new ArgumentOutOfRangeException("offset");
294 if (length < 1 || offset + length > buffer.Length)
294 if (length < 1 || offset + length > buffer.Length)
295 throw new ArgumentOutOfRangeException("length");
295 throw new ArgumentOutOfRangeException("length");
296
296
297 var chunk = m_first;
297 var chunk = m_first;
298 bool recycle;
298 bool recycle;
299 dequeued = 0;
299 dequeued = 0;
300 while (chunk != null) {
300 while (chunk != null) {
301
301
302 int actual;
302 int actual;
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 offset += actual;
304 offset += actual;
305 length -= actual;
305 length -= actual;
306 dequeued += actual;
306 dequeued += actual;
307 }
307 }
308
308
309 if (recycle) // this chunk is waste
309 if (recycle) // this chunk is waste
310 RecycleFirstChunk(chunk);
310 RecycleFirstChunk(chunk);
311 else if (actual == 0)
311 else if (actual == 0)
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
313
313
314 if (length == 0)
314 if (length == 0)
315 return true;
315 return true;
316
316
317 // we still may dequeue something
317 // we still may dequeue something
318 // try again
318 // try again
319 chunk = m_first;
319 chunk = m_first;
320 }
320 }
321
321
322 return dequeued != 0;
322 return dequeued != 0;
323 }
323 }
324
324
325 /// <summary>
325 /// <summary>
326 /// Tries to dequeue all remaining data in the first chunk.
326 /// Tries to dequeue all remaining data in the first chunk.
327 /// </summary>
327 /// </summary>
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 /// <param name="buffer">The buffer to which the data will be written.</param>
329 /// <param name="buffer">The buffer to which the data will be written.</param>
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
334 if (buffer == null)
334 if (buffer == null)
335 throw new ArgumentNullException("buffer");
335 throw new ArgumentNullException("buffer");
336 if (offset < 0)
336 if (offset < 0)
337 throw new ArgumentOutOfRangeException("offset");
337 throw new ArgumentOutOfRangeException("offset");
338 if (length < 1 || offset + length > buffer.Length)
338 if (length < 1 || offset + length > buffer.Length)
339 throw new ArgumentOutOfRangeException("length");
339 throw new ArgumentOutOfRangeException("length");
340
340
341 var chunk = m_first;
341 var chunk = m_first;
342 bool recycle;
342 bool recycle;
343 dequeued = 0;
343 dequeued = 0;
344
344
345 while (chunk != null) {
345 while (chunk != null) {
346
346
347 int actual;
347 int actual;
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
349 dequeued = actual;
349 dequeued = actual;
350 }
350 }
351
351
352 if (recycle) // this chunk is waste
352 if (recycle) // this chunk is waste
353 RecycleFirstChunk(chunk);
353 RecycleFirstChunk(chunk);
354
354
355 // if we have dequeued any data, then return
355 // if we have dequeued any data, then return
356 if (dequeued != 0)
356 if (dequeued != 0)
357 return true;
357 return true;
358
358
359 // we still may dequeue something
359 // we still may dequeue something
360 // try again
360 // try again
361 chunk = m_first;
361 chunk = m_first;
362 }
362 }
363
363
364 return false;
364 return false;
365 }
365 }
366
366
367 bool EnqueueChunk(Chunk last, Chunk chunk) {
367 bool EnqueueChunk(Chunk last, Chunk chunk) {
368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
369 return false;
369 return false;
370
370
371 if (last != null)
371 if (last != null)
372 last.next = chunk;
372 last.next = chunk;
373 else {
373 else {
374 m_first = chunk;
374 m_first = chunk;
375 }
375 }
376 return true;
376 return true;
377 }
377 }
378
378
379 void RecycleFirstChunk(Chunk first) {
379 void RecycleFirstChunk(Chunk first) {
380 var next = first.next;
380 var next = first.next;
381
381
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 return;
383 return;
384
384
385 if (next == null) {
385 if (next == null) {
386
386
387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388 /*while (first.next == null)
388 /*while (first.next == null)
389 Thread.MemoryBarrier();*/
389 Thread.MemoryBarrier();*/
390
390
391 // race
391 // race
392 // someone already updated the tail, restore the pointer to the queue head
392 // someone already updated the tail, restore the pointer to the queue head
393 m_first = first;
393 m_first = first;
394 }
394 }
395 // the tail is updated
395 // the tail is updated
396 }
396 }
397
397
398 // we need to update the head
398 // we need to update the head
399 //Interlocked.CompareExchange(ref m_first, next, first);
399 //Interlocked.CompareExchange(ref m_first, next, first);
400 // if the head is already updated then give up
400 // if the head is already updated then give up
401 //return;
401 //return;
402
402
403 }
403 }
404
404
405 public void Clear() {
405 public void Clear() {
406 // start the new queue
406 // start the new queue
407 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
407 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
408
408
409 do {
409 do {
410 Thread.MemoryBarrier();
410 Thread.MemoryBarrier();
411 var first = m_first;
411 var first = m_first;
412 var last = m_last;
412 var last = m_last;
413
413
414 if (last == null) // nothing to clear
414 if (last == null) // nothing to clear
415 return;
415 return;
416
416
417 if (first == null || (first.next == null && first != last)) // inconcistency
417 if (first == null || (first.next == null && first != last)) // inconcistency
418 continue;
418 continue;
419
419
420 // here we will create inconsistency which will force others to spin
420 // here we will create inconsistency which will force others to spin
421 // and prevent from fetching. chunk.next = null
421 // and prevent from fetching. chunk.next = null
422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
423 continue;// inconsistent
423 continue;// inconsistent
424
424
425 m_last = chunk;
425 m_last = chunk;
426
426
427 return;
427 return;
428
428
429 } while(true);
429 } while(true);
430 }
430 }
431
431
432 public T[] Drain() {
432 public T[] Drain() {
433 // start the new queue
433 // start the new queue
434 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
434 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
435
435
436 do {
436 do {
437 Thread.MemoryBarrier();
437 Thread.MemoryBarrier();
438 var first = m_first;
438 var first = m_first;
439 var last = m_last;
439 var last = m_last;
440
440
441 if (last == null)
441 if (last == null)
442 return new T[0];
442 return new T[0];
443
443
444 if (first == null || (first.next == null && first != last))
444 if (first == null || (first.next == null && first != last))
445 continue;
445 continue;
446
446
447 // here we will create inconsistency which will force others to spin
447 // here we will create inconsistency which will force others to spin
448 // and prevent from fetching. chunk.next = null
448 // and prevent from fetching. chunk.next = null
449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
450 continue;// inconsistent
450 continue;// inconsistent
451
451
452 last = Interlocked.Exchange(ref m_last, chunk);
452 last = Interlocked.Exchange(ref m_last, chunk);
453
453
454 return ReadChunks(first, last);
454 return ReadChunks(first, last);
455
455
456 } while(true);
456 } while(true);
457 }
457 }
458
458
459 static T[] ReadChunks(Chunk chunk, object last) {
459 static T[] ReadChunks(Chunk chunk, object last) {
460 var result = new List<T>();
460 var result = new List<T>();
461 var buffer = new T[DEFAULT_CHUNK_SIZE];
461 var buffer = new T[DEFAULT_CHUNK_SIZE];
462 int actual;
462 int actual;
463 bool recycle;
463 bool recycle;
464 while (chunk != null) {
464 while (chunk != null) {
465 // ensure all write operations on the chunk are complete
465 // ensure all write operations on the chunk are complete
466 chunk.Commit();
466 chunk.Commit();
467
467
468 // we need to read the chunk using this way
468 // we need to read the chunk using this way
469 // since some client still may completing the dequeue
469 // since some client still may completing the dequeue
470 // operation, such clients most likely won't get results
470 // operation, such clients most likely won't get results
471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
473
473
474 if (chunk == last) {
474 if (chunk == last) {
475 chunk = null;
475 chunk = null;
476 } else {
476 } else {
477 while (chunk.next == null)
477 while (chunk.next == null)
478 Thread.MemoryBarrier();
478 Thread.MemoryBarrier();
479 chunk = chunk.next;
479 chunk = chunk.next;
480 }
480 }
481 }
481 }
482
482
483 return result.ToArray();
483 return result.ToArray();
484 }
484 }
485
485
486 struct ArraySegmentCollection : ICollection<T> {
486 struct ArraySegmentCollection : ICollection<T> {
487 readonly T[] m_data;
487 readonly T[] m_data;
488 readonly int m_offset;
488 readonly int m_offset;
489 readonly int m_length;
489 readonly int m_length;
490
490
491 public ArraySegmentCollection(T[] data, int offset, int length) {
491 public ArraySegmentCollection(T[] data, int offset, int length) {
492 m_data = data;
492 m_data = data;
493 m_offset = offset;
493 m_offset = offset;
494 m_length = length;
494 m_length = length;
495 }
495 }
496
496
497 #region ICollection implementation
497 #region ICollection implementation
498
498
499 public void Add(T item) {
499 public void Add(T item) {
500 throw new NotSupportedException();
500 throw new NotSupportedException();
501 }
501 }
502
502
503 public void Clear() {
503 public void Clear() {
504 throw new NotSupportedException();
504 throw new NotSupportedException();
505 }
505 }
506
506
507 public bool Contains(T item) {
507 public bool Contains(T item) {
508 return false;
508 return false;
509 }
509 }
510
510
511 public void CopyTo(T[] array, int arrayIndex) {
511 public void CopyTo(T[] array, int arrayIndex) {
512 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
512 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
513 }
513 }
514
514
515 public bool Remove(T item) {
515 public bool Remove(T item) {
516 throw new NotSupportedException();
516 throw new NotSupportedException();
517 }
517 }
518
518
519 public int Count {
519 public int Count {
520 get {
520 get {
521 return m_length;
521 return m_length;
522 }
522 }
523 }
523 }
524
524
525 public bool IsReadOnly {
525 public bool IsReadOnly {
526 get {
526 get {
527 return true;
527 return true;
528 }
528 }
529 }
529 }
530
530
531 #endregion
531 #endregion
532
532
533 #region IEnumerable implementation
533 #region IEnumerable implementation
534
534
535 public IEnumerator<T> GetEnumerator() {
535 public IEnumerator<T> GetEnumerator() {
536 for (int i = m_offset; i < m_length + m_offset; i++)
536 for (int i = m_offset; i < m_length + m_offset; i++)
537 yield return m_data[i];
537 yield return m_data[i];
538 }
538 }
539
539
540 #endregion
540 #endregion
541
541
542 #region IEnumerable implementation
542 #region IEnumerable implementation
543
543
544 IEnumerator IEnumerable.GetEnumerator() {
544 IEnumerator IEnumerable.GetEnumerator() {
545 return GetEnumerator();
545 return GetEnumerator();
546 }
546 }
547
547
548 #endregion
548 #endregion
549 }
549 }
550
550
551 #region IEnumerable implementation
551 #region IEnumerable implementation
552
552
553 class Enumerator : IEnumerator<T> {
553 class Enumerator : IEnumerator<T> {
554 Chunk m_current;
554 Chunk m_current;
555 int m_pos = -1;
555 int m_pos = -1;
556
556
557 public Enumerator(Chunk fisrt) {
557 public Enumerator(Chunk fisrt) {
558 m_current = fisrt;
558 m_current = fisrt;
559 }
559 }
560
560
561 #region IEnumerator implementation
561 #region IEnumerator implementation
562
562
563 public bool MoveNext() {
563 public bool MoveNext() {
564 if (m_current == null)
564 if (m_current == null)
565 return false;
565 return false;
566
566
567 if (m_pos == -1)
567 if (m_pos == -1)
568 m_pos = m_current.Low;
568 m_pos = m_current.Low;
569 else
569 else
570 m_pos++;
570 m_pos++;
571
571
572 if (m_pos == m_current.Hi) {
572 if (m_pos == m_current.Hi) {
573
573
574 m_current = m_pos == m_current.Size ? m_current.next : null;
574 m_current = m_pos == m_current.Size ? m_current.next : null;
575
575
576 m_pos = 0;
576 m_pos = 0;
577
577
578 if (m_current == null)
578 if (m_current == null)
579 return false;
579 return false;
580 }
580 }
581
581
582 return true;
582 return true;
583 }
583 }
584
584
585 public void Reset() {
585 public void Reset() {
586 throw new NotSupportedException();
586 throw new NotSupportedException();
587 }
587 }
588
588
589 object IEnumerator.Current {
589 object IEnumerator.Current {
590 get {
590 get {
591 return Current;
591 return Current;
592 }
592 }
593 }
593 }
594
594
595 #endregion
595 #endregion
596
596
597 #region IDisposable implementation
597 #region IDisposable implementation
598
598
599 public void Dispose() {
599 public void Dispose() {
600 }
600 }
601
601
602 #endregion
602 #endregion
603
603
604 #region IEnumerator implementation
604 #region IEnumerator implementation
605
605
606 public T Current {
606 public T Current {
607 get {
607 get {
608 if (m_pos == -1 || m_current == null)
608 if (m_pos == -1 || m_current == null)
609 throw new InvalidOperationException();
609 throw new InvalidOperationException();
610 return m_current.GetAt(m_pos);
610 return m_current.GetAt(m_pos);
611 }
611 }
612 }
612 }
613
613
614 #endregion
614 #endregion
615 }
615 }
616
616
617 public IEnumerator<T> GetEnumerator() {
617 public IEnumerator<T> GetEnumerator() {
618 return new Enumerator(m_first);
618 return new Enumerator(m_first);
619 }
619 }
620
620
621 #endregion
621 #endregion
622
622
623 #region IEnumerable implementation
623 #region IEnumerable implementation
624
624
625 IEnumerator IEnumerable.GetEnumerator() {
625 IEnumerator IEnumerable.GetEnumerator() {
626 return GetEnumerator();
626 return GetEnumerator();
627 }
627 }
628
628
629 #endregion
629 #endregion
630 }
630 }
631 }
631 }
General Comments 0
You need to be logged in to leave comments. Login now