diff --git a/Implab.Playground/App.config b/Implab.Playground/App.config
--- a/Implab.Playground/App.config
+++ b/Implab.Playground/App.config
@@ -1,6 +1,6 @@
-
+
-
+
-
\ No newline at end of file
+
diff --git a/Implab.Playground/Implab.Playground.csproj b/Implab.Playground/Implab.Playground.csproj
--- a/Implab.Playground/Implab.Playground.csproj
+++ b/Implab.Playground/Implab.Playground.csproj
@@ -9,9 +9,10 @@
Properties
Implab.Playground
Implab.Playground
- v4.5.2
+ v4.5
512
true
+
AnyCPU
diff --git a/Implab.Playground/Program.cs b/Implab.Playground/Program.cs
--- a/Implab.Playground/Program.cs
+++ b/Implab.Playground/Program.cs
@@ -1,10 +1,13 @@
using Implab.Formats.Json;
+using Implab.Parallels;
using Implab.Xml;
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Serialization;
@@ -12,29 +15,167 @@ using System.Xml.Serialization;
namespace Implab.Playground {
public class Program {
- [XmlRoot(Namespace = "XmlSimpleData")]
- public class XmlSimpleModel {
- [XmlElement]
- public string Name { get; set; }
+ static void EnqueueRange(ConcurrentQueue q, T[] data, int offset, int len) {
+ for (var i = offset; i < offset + len; i++)
+ q.Enqueue(data[i]);
+ }
+
+ static bool TryDequeueRange(ConcurrentQueue q,T[] buffer,int offset, int len, out int actual) {
+ actual = 0;
+ T res;
+ while(q.TryDequeue(out res)) {
+ buffer[offset + actual] = res;
+ actual++;
+ if (actual == len)
+ break;
+ }
+ return actual != 0;
+ }
+
+ static void EnqueueRange(SimpleAsyncQueue q, T[] data, int offset, int len) {
+ for (var i = offset; i < offset + len; i++)
+ q.Enqueue(data[i]);
+ }
- [XmlElement]
- public int Order { get; set; }
+ static bool TryDequeueRange(SimpleAsyncQueue q, T[] buffer, int offset, int len, out int actual) {
+ actual = 0;
+ T res;
+ while (q.TryDequeue(out res)) {
+ buffer[offset + actual] = res;
+ actual++;
+ if (actual == len)
+ break;
+ }
+ return actual != 0;
+ }
- [XmlElement]
- public string[] Items { get; set; }
+ /*
+ static void EnqueueRange(AsyncQueue q, T[] data, int offset, int len) {
+ for (var i = offset; i < offset + len; i++)
+ q.Enqueue(data[i]);
+ }
+ static bool TryDequeueRange(AsyncQueue q, T[] buffer, int offset, int len, out int actual) {
+ actual = 0;
+ T res;
+ while (q.TryDequeue(out res)) {
+ buffer[offset + actual] = res;
+ actual++;
+ if (actual == len)
+ break;
+ }
+ return actual != 0;
+ }
+ */
+
+ static void EnqueueRange(AsyncQueue q, T[] data, int offset, int len) {
+ q.EnqueueRange(data, offset, len);
}
+ static bool TryDequeueRange(AsyncQueue q, T[] buffer, int offset, int len, out int actual) {
+ return q.TryDequeueRange(buffer, offset, len, out actual);
+ }
+
+
static void Main(string[] args) {
- var model = new XmlSimpleModel {
- Name = "Tablet",
- Order = 10,
- Items = new string[] { "z1", "z2", "z3" }
- };
- var doc = SerializationHelpers.SerializeAsXmlDocument(model);
-
- var m2 = SerializationHelpers.DeserializeFromXmlNode(doc.DocumentElement);
+ //var queue = new ConcurrentQueue();
+ var queue = new AsyncQueue();
+ //var queue = new SimpleAsyncQueue();
+
+ const int wBatch = 32;
+ const long wCount = 1000000;
+ const long total = wBatch * wCount * 3;
+
+ long r1 = 0, r2 = 0, r3 = 0;
+ const int rBatch = 1000;
+ long read = 0;
+
+ var t1 = Environment.TickCount;
+
+ AsyncPool.RunThread(
+ () => {
+ var buffer = new int[wBatch];
+ for (int i = 0; i < wBatch; i++)
+ buffer[i] = 1;
+
+ for (int i = 0; i < wCount; i++)
+ EnqueueRange(queue, buffer, 0, wBatch);
+ Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+ },
+ () => {
+ var buffer = new int[wBatch];
+ for (int i = 0; i < wBatch; i++)
+ buffer[i] = 1;
+
+ for (int i = 0; i < wCount; i++)
+ EnqueueRange(queue, buffer, 0, wBatch);
+ Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+ },
+ () => {
+ var buffer = new int[wBatch];
+ for (int i = 0; i < wBatch; i++)
+ buffer[i] = 1;
+
+ for (int i = 0; i < wCount; i++)
+ EnqueueRange(queue, buffer, 0, wBatch);
+ Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
+ },
+ () => {
+ var buffer = new int[rBatch];
+
+ while (read < total) {
+ int actual;
+ if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
+ for (int i = 0; i < actual; i++)
+ r1 += buffer[i];
+ Interlocked.Add(ref read, actual);
+ }
+ }
+
+ Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
+ }/*,
+ () => {
+ var buffer = new int[rBatch];
+
+ while (read < total) {
+ int actual;
+ if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
+ for (int i = 0; i < actual; i++)
+ r2 += buffer[i];
+ Interlocked.Add(ref read, actual);
+ }
+ }
+
+ Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
+ }*//*,
+ () => {
+ var buffer = new int[rBatch];
+
+ while (read < total) {
+ int actual;
+ if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
+ for (int i = 0; i < actual; i++)
+ r3 += buffer[i];
+ Interlocked.Add(ref read, actual);
+ }
+ }
+
+ Console.WriteLine("done reader #3: {0} ms", Environment.TickCount - t1);
+ }*/
+ )
+ .PromiseAll()
+ .Join();
+
+
+ Console.WriteLine(
+ "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+ Environment.TickCount - t1,
+ r1,
+ r2,
+ r1 + r2 + r3,
+ total
+ );
Console.WriteLine("done");
}
diff --git a/Implab.Playground2.psess b/Implab.Playground2.psess
--- a/Implab.Playground2.psess
+++ b/Implab.Playground2.psess
@@ -48,8 +48,8 @@
true
false
Executable
- Implab.Playground\bin\Release\Implab.Playground.exe
- Implab.Playground\bin\Release\
+ Implab.Playground\bin\Debug\Implab.Playground.exe
+ Implab.Playground\bin\Debug\
IIS
@@ -67,9 +67,4 @@
Implab.Playground
-
-
- :PB:{100DFEB0-75BE-436F-ADDF-1F46EF433F46}|Implab.Playground\Implab.Playground.csproj
-
-
\ No newline at end of file
diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs
--- a/Implab.Test/AsyncTests.cs
+++ b/Implab.Test/AsyncTests.cs
@@ -222,9 +222,9 @@ namespace Implab.Test {
[TestMethod]
public void MTQueueTest() {
- var queue = new MTQueue();
+ var queue = new SimpleAsyncQueue();
int res;
-
+
queue.Enqueue(10);
Assert.IsTrue(queue.TryDequeue(out res));
Assert.AreEqual(10, res);
@@ -242,8 +242,9 @@ namespace Implab.Test {
int readers = 0;
var stop = new ManualResetEvent(false);
int total = 0;
+ var ticks = Environment.TickCount;
- const int itemsPerWriter = 10000;
+ const int itemsPerWriter = 1000000;
const int writersCount = 10;
for (int i = 0; i < writersCount; i++) {
@@ -278,7 +279,9 @@ namespace Implab.Test {
stop.WaitOne();
- Assert.AreEqual(100000, total);
+ Console.WriteLine("{0} in {1}ms", total, Environment.TickCount - ticks);
+
+ Assert.AreEqual(itemsPerWriter * writersCount, total);
}
[TestMethod]
@@ -509,13 +512,12 @@ namespace Implab.Test {
public void AsyncQueueDrainTest() {
var queue = new AsyncQueue();
- const int wBatch = 11;
+ const int wBatch = 32;
const int wCount = 200000;
const int total = wBatch * wCount * 3;
const int summ = wBatch * wCount * 3;
int r1 = 0, r2 = 0;
- const int rBatch = 11;
int read = 0;
var t1 = Environment.TickCount;
@@ -531,8 +533,12 @@ namespace Implab.Test {
Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
},
() => {
- for(int i =0; i < wCount * wBatch; i++)
- queue.Enqueue(1);
+ var buffer = new int[wBatch];
+ for (int i = 0; i < wBatch; i++)
+ buffer[i] = 1;
+
+ for (int i = 0; i < wCount; i++)
+ queue.EnqueueRange(buffer, 0, wBatch);
Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
},
() => {
@@ -572,25 +578,34 @@ namespace Implab.Test {
},*/
() => {
var count = 0;
- while(read < total) {
+ int emptyDrains = 0;
+
+ while (read < total) {
var buffer = queue.Drain();
- for(int i=0; i< buffer.Length; i++)
+ if (buffer.Count == 0)
+ emptyDrains++;
+ for(int i=0; i< buffer.Count; i++)
r1 += buffer[i];
- Interlocked.Add(ref read, buffer.Length);
- count += buffer.Length;
+ Interlocked.Add(ref read, buffer.Count);
+ count += buffer.Count;
}
- Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
+ Console.WriteLine("done reader #1: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
},
() => {
- var count = 0;
- while(read < total) {
+ var count = 0;
+ int emptyDrains = 0;
+
+ while (read < total) {
var buffer = queue.Drain();
- for(int i=0; i< buffer.Length; i++)
+ if (buffer.Count == 0)
+ emptyDrains++;
+
+ for (int i=0; i< buffer.Count; i++)
r2 += buffer[i];
- Interlocked.Add(ref read, buffer.Length);
- count += buffer.Length;
+ Interlocked.Add(ref read, buffer.Count);
+ count += buffer.Count;
}
- Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
+ Console.WriteLine("done reader #2: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
}
)
.PromiseAll()
diff --git a/Implab.sln b/Implab.sln
--- a/Implab.sln
+++ b/Implab.sln
@@ -14,8 +14,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test", "Implab.Test\Implab.Test.csproj", "{63F92C0C-61BF-48C0-A377-8D67C3C661D0}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Fx", "Implab.Fx\Implab.Fx.csproj", "{06E706F8-6881-43EB-927E-FFC503AF6ABC}"
-EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Format.Test", "Implab.Format.Test\Implab.Format.Test.csproj", "{4D364996-7ECD-4193-8F90-F223FFEA49DA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Playground", "Implab.Playground\Implab.Playground.csproj", "{100DFEB0-75BE-436F-ADDF-1F46EF433F46}"
@@ -47,14 +45,6 @@ Global
{63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU
{63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.Build.0 = Release|Any CPU
- {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug 4.5|Any CPU.ActiveCfg = Debug 4.5|Any CPU
- {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug 4.5|Any CPU.Build.0 = Debug 4.5|Any CPU
- {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release 4.5|Any CPU.ActiveCfg = Release 4.5|Any CPU
- {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU
- {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release|Any CPU.Build.0 = Release|Any CPU
{4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.ActiveCfg = Debug|Any CPU
{4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.Build.0 = Debug|Any CPU
{4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
diff --git a/Implab/AbstractEvent.cs b/Implab/AbstractEvent.cs
--- a/Implab/AbstractEvent.cs
+++ b/Implab/AbstractEvent.cs
@@ -24,13 +24,13 @@ namespace Implab {
//readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
THandler[] m_handlers;
- MTQueue m_extraHandlers;
+ SimpleAsyncQueue m_extraHandlers;
int m_handlerPointer = -1;
int m_handlersCommited;
int m_cancelRequest;
Exception m_cancelationReason;
- MTQueue> m_cancelationHandlers;
+ SimpleAsyncQueue> m_cancelationHandlers;
#region state managment
@@ -182,7 +182,7 @@ namespace Implab {
}
} else {
if (slot == RESERVED_HANDLERS_COUNT) {
- m_extraHandlers = new MTQueue();
+ m_extraHandlers = new SimpleAsyncQueue();
} else {
while (m_extraHandlers == null)
Thread.MemoryBarrier();
@@ -245,7 +245,7 @@ namespace Implab {
handler(CancellationReason);
if (m_cancelationHandlers == null)
- Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue>(), null);
+ Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue>(), null);
m_cancelationHandlers.Enqueue(handler);
diff --git a/Implab/Automaton/AutomatonTransition.cs b/Implab/Automaton/AutomatonTransition.cs
--- a/Implab/Automaton/AutomatonTransition.cs
+++ b/Implab/Automaton/AutomatonTransition.cs
@@ -1,7 +1,7 @@
using System;
namespace Implab.Automaton {
- public struct AutomatonTransition : IEquatable {
+ public class AutomatonTransition : IEquatable {
public readonly int s1;
public readonly int s2;
public readonly int edge;
diff --git a/Implab/Implab.csproj b/Implab/Implab.csproj
--- a/Implab/Implab.csproj
+++ b/Implab/Implab.csproj
@@ -1,5 +1,5 @@
-
+
Debug
AnyCPU
@@ -8,6 +8,7 @@
Implab
Implab
v4.5
+
true
@@ -97,7 +98,7 @@
-
+
diff --git a/Implab/Parallels/AsyncQueue.cs b/Implab/Parallels/AsyncQueue.cs
--- a/Implab/Parallels/AsyncQueue.cs
+++ b/Implab/Parallels/AsyncQueue.cs
@@ -7,11 +7,11 @@ using System.Diagnostics;
namespace Implab.Parallels {
public class AsyncQueue : IEnumerable {
class Chunk {
- public Chunk next;
+ public volatile Chunk next;
- int m_low;
- int m_hi;
- int m_alloc;
+ volatile int m_low;
+ volatile int m_hi;
+ volatile int m_alloc;
readonly int m_size;
readonly T[] m_data;
@@ -28,12 +28,15 @@ namespace Implab.Parallels {
m_data[0] = value;
}
- public Chunk(int size, T[] data, int offset, int length, int alloc) {
+ public Chunk(int size, int allocated) {
m_size = size;
- m_hi = length;
- m_alloc = alloc;
+ m_hi = allocated;
+ m_alloc = allocated;
m_data = new T[size];
- Array.Copy(data, offset, m_data, 0, length);
+ }
+
+ public void WriteData(T[] data, int offset, int dest, int length) {
+ Array.Copy(data, offset, m_data, dest, length);
}
public int Low {
@@ -48,31 +51,36 @@ namespace Implab.Parallels {
get { return m_size; }
}
- public bool TryEnqueue(T value, out bool extend) {
- var alloc = Interlocked.Increment(ref m_alloc) - 1;
-
- if (alloc >= m_size) {
- extend = alloc == m_size;
- return false;
- }
-
- extend = false;
+ public bool TryEnqueue(T value) {
+ int alloc;
+ do {
+ alloc = m_alloc;
+ if (alloc >= m_size)
+ return false;
+ } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));
+
m_data[alloc] = value;
- while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
+ SpinWait spin = new SpinWait();
+ // m_hi is volatile
+ while (alloc != m_hi) {
// spin wait for commit
+ spin.SpinOnce();
}
+ m_hi = alloc + 1;
+
return true;
}
///
/// Prevents from allocating new space in the chunk and waits for all write operations to complete
///
- public void Commit() {
- var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
-
- while (m_hi != actual)
- Thread.MemoryBarrier();
+ public void Seal() {
+ var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
+ SpinWait spin = new SpinWait();
+ while (m_hi != actual) {
+ spin.SpinOnce();
+ }
}
public bool TryDequeue(out T value, out bool recycle) {
@@ -84,44 +92,38 @@ namespace Implab.Parallels {
recycle = (low == m_size);
return false;
}
- } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
+ } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));
- recycle = (low == m_size - 1);
+ recycle = (low + 1 == m_size);
value = m_data[low];
return true;
}
- public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
- //int alloc;
- //int allocSize;
+ public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) {
+ int alloc;
+ do {
+ alloc = m_alloc;
+ if (alloc >= m_size) {
+ enqueued = 0;
+ return false;
+ } else {
+ enqueued = Math.Min(length, m_size - alloc);
+ }
+ } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc));
+
+ Array.Copy(batch, offset, m_data, alloc, enqueued);
- var alloc = Interlocked.Add(ref m_alloc, length) - length;
- if (alloc > m_size) {
- // the chunk is full and someone already
- // creating the new one
- enqueued = 0; // nothing was added
- extend = false; // the caller shouldn't try to extend the queue
- return false; // nothing was added
+ SpinWait spin = new SpinWait();
+ while (alloc != m_hi) {
+ spin.SpinOnce();
}
- enqueued = Math.Min(m_size - alloc, length);
- extend = length > enqueued;
-
- if (enqueued == 0)
- return false;
-
-
- Array.Copy(batch, offset, m_data, alloc, enqueued);
-
- while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
- // spin wait for commit
- }
-
+ m_hi = alloc + enqueued;
return true;
}
- public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
+ public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
int low, hi, batchSize;
do {
@@ -129,15 +131,14 @@ namespace Implab.Parallels {
hi = m_hi;
if (low >= hi) {
dequeued = 0;
- recycle = (low == m_size); // recycling could be restarted and we need to signal again
+ recycle = (low == m_size);
return false;
}
batchSize = Math.Min(hi - low, length);
- } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
+ } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
- recycle = (low == m_size - batchSize);
dequeued = batchSize;
-
+ recycle = (low + batchSize == m_size);
Array.Copy(m_data, low, buffer, offset, batchSize);
return true;
@@ -149,32 +150,33 @@ namespace Implab.Parallels {
}
public const int DEFAULT_CHUNK_SIZE = 32;
- public const int MAX_CHUNK_SIZE = 262144;
+ public const int MAX_CHUNK_SIZE = 256;
Chunk m_first;
Chunk m_last;
+ public AsyncQueue() {
+ m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE);
+ }
+
///
/// Adds the specified value to the queue.
///
/// Tha value which will be added to the queue.
- public virtual void Enqueue(T value) {
+ public void Enqueue(T value) {
var last = m_last;
- // spin wait to the new chunk
- bool extend = true;
- while (last == null || !last.TryEnqueue(value, out extend)) {
+ SpinWait spin = new SpinWait();
+ while (!last.TryEnqueue(value)) {
// try to extend queue
- if (extend || last == null) {
- var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
- if (EnqueueChunk(last, chunk))
- break; // success! exit!
- last = m_last;
+ var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
+ var t = Interlocked.CompareExchange(ref m_last, chunk, last);
+ if (t == last) {
+ last.next = chunk;
+ break;
} else {
- while (last == m_last) {
- Thread.MemoryBarrier();
- }
- last = m_last;
+ last = t;
}
+ spin.SpinOnce();
}
}
@@ -184,67 +186,54 @@ namespace Implab.Parallels {
/// The buffer which contains the data to be enqueued.
/// The offset of the data in the buffer.
/// The size of the data to read from the buffer.
- public virtual void EnqueueRange(T[] data, int offset, int length) {
+ public void EnqueueRange(T[] data, int offset, int length) {
if (data == null)
throw new ArgumentNullException("data");
- if (length == 0)
- return;
if (offset < 0)
throw new ArgumentOutOfRangeException("offset");
if (length < 1 || offset + length > data.Length)
throw new ArgumentOutOfRangeException("length");
- var last = m_last;
+ while (length > 0) {
+ var last = m_last;
+ int enqueued;
- bool extend;
- int enqueued;
-
- while (length > 0) {
- extend = true;
- if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
+ if (last.TryEnqueueBatch(data, offset, length, out enqueued)) {
length -= enqueued;
offset += enqueued;
}
- if (extend) {
- // there was no enough space in the chunk
- // or there was no chunks in the queue
+ if (length > 0) {
+ // we have something to enqueue
- while (length > 0) {
-
- var size = Math.Min(length, MAX_CHUNK_SIZE);
+ var tail = length % MAX_CHUNK_SIZE;
- var chunk = new Chunk(
- Math.Max(size, DEFAULT_CHUNK_SIZE),
- data,
- offset,
- size,
- length // length >= size
- );
+ var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail);
+
+ if (last != Interlocked.CompareExchange(ref m_last, chunk, last))
+ continue; // we wasn't able to catch the writer, roundtrip
- if (!EnqueueChunk(last, chunk)) {
- // looks like the queue has been updated then proceed from the beginning
- last = m_last;
- break;
- }
+ // we are lucky
+ // we can exclusively write our batch, the other writers will continue their work
+
+ length -= tail;
- // we have successfully added the new chunk
- last = chunk;
- length -= size;
- offset += size;
- }
- } else {
- // we don't need to extend the queue, if we successfully enqueued data
- if (length == 0)
- break;
-
- // if we need to wait while someone is extending the queue
- // spinwait
- while (last == m_last) {
- Thread.MemoryBarrier();
+
+ for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) {
+ var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
+ node.WriteData(data, offset, 0, MAX_CHUNK_SIZE);
+ offset += MAX_CHUNK_SIZE;
+ // fence last.next is volatile
+ last.next = node;
+ last = node;
}
- last = m_last;
+ if (tail > 0)
+ chunk.WriteData(data, offset, 0, tail);
+
+ // fence last.next is volatile
+ last.next = chunk;
+ return;
}
}
}
@@ -256,26 +245,21 @@ namespace Implab.Parallels {
/// The value of the dequeued element.
public bool TryDequeue(out T value) {
var chunk = m_first;
- bool recycle;
- while (chunk != null) {
+ do {
+ bool recycle;
var result = chunk.TryDequeue(out value, out recycle);
- if (recycle) // this chunk is waste
- RecycleFirstChunk(chunk);
- else
+ if (recycle && chunk.next != null) {
+ // this chunk is waste
+ chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
+ } else {
return result; // this chunk is usable and returned actual result
+ }
if (result) // this chunk is waste but the true result is always actual
return true;
-
- // try again
- chunk = m_first;
- }
-
- // the queue is empty
- value = default(T);
- return false;
+ } while (true);
}
///
@@ -295,10 +279,9 @@ namespace Implab.Parallels {
throw new ArgumentOutOfRangeException("length");
var chunk = m_first;
- bool recycle;
dequeued = 0;
- while (chunk != null) {
-
+ do {
+ bool recycle;
int actual;
if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
offset += actual;
@@ -306,18 +289,16 @@ namespace Implab.Parallels {
dequeued += actual;
}
- if (recycle) // this chunk is waste
- RecycleFirstChunk(chunk);
- else if (actual == 0)
- break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
+ if (recycle && chunk.next != null) {
+ // this chunk is waste
+ chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
+ } else {
+ chunk = null;
+ }
if (length == 0)
return true;
-
- // we still may dequeue something
- // try again
- chunk = m_first;
- }
+ } while (chunk != null);
return dequeued != 0;
}
@@ -339,123 +320,81 @@ namespace Implab.Parallels {
throw new ArgumentOutOfRangeException("length");
var chunk = m_first;
- bool recycle;
- dequeued = 0;
-
- while (chunk != null) {
+ do {
+ bool recycle;
+ chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle);
- int actual;
- if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
- dequeued = actual;
+ if (recycle && chunk.next != null) {
+ // this chunk is waste
+ chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
+ } else {
+ chunk = null;
}
- if (recycle) // this chunk is waste
- RecycleFirstChunk(chunk);
-
// if we have dequeued any data, then return
if (dequeued != 0)
return true;
- // we still may dequeue something
- // try again
- chunk = m_first;
- }
+ } while (chunk != null);
return false;
}
-
- bool EnqueueChunk(Chunk last, Chunk chunk) {
- if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
- return false;
-
- if (last != null)
- last.next = chunk;
- else {
- m_first = chunk;
- }
- return true;
- }
-
- void RecycleFirstChunk(Chunk first) {
- var next = first.next;
-
- if (first != Interlocked.CompareExchange(ref m_first, next, first))
- return;
-
- if (next == null) {
-
- if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
-
- // race
- // someone already updated the tail, restore the pointer to the queue head
- m_first = first;
- }
- // the tail is updated
- }
- }
+
public void Clear() {
// start the new queue
var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
-
do {
- Thread.MemoryBarrier();
var first = m_first;
- var last = m_last;
-
- if (last == null) // nothing to clear
- return;
-
- if (first == null || (first.next == null && first != last)) // inconcistency
+ if (first.next == null && first != m_last) {
continue;
-
- // here we will create inconsistency which will force others to spin
- // and prevent from fetching. chunk.next = null
- if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
- continue;// inconsistent
-
- m_last = chunk;
-
- return;
-
- } while(true);
- }
-
- public T[] Drain() {
- // start the new queue
- var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
-
- do {
- Thread.MemoryBarrier();
- var first = m_first;
- var last = m_last;
-
- if (last == null)
- return new T[0];
-
- if (first == null || (first.next == null && first != last))
- continue;
+ }
// here we will create inconsistency which will force others to spin
// and prevent from fetching. chunk.next = null
if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
continue;// inconsistent
- last = Interlocked.Exchange(ref m_last, chunk);
+ m_last = chunk;
+ return;
+ } while (true);
+ }
+
+ public List Drain() {
+ // start the new queue
+ var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
+
+ do {
+ var first = m_first;
+ // first.next is volatile
+ if (first.next == null) {
+ if (first != m_last)
+ continue;
+ else if (first.Hi == first.Low)
+ return new List();
+ }
+
+ // here we will create inconsistency which will force others to spin
+ // and prevent from fetching. chunk.next = null
+ if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
+ continue;// inconsistent
+
+ var last = Interlocked.Exchange(ref m_last, chunk);
return ReadChunks(first, last);
- } while(true);
+ } while (true);
}
-
- static T[] ReadChunks(Chunk chunk, object last) {
+
+ static List ReadChunks(Chunk chunk, object last) {
var result = new List();
- var buffer = new T[DEFAULT_CHUNK_SIZE];
+ var buffer = new T[MAX_CHUNK_SIZE];
int actual;
bool recycle;
+ SpinWait spin = new SpinWait();
while (chunk != null) {
// ensure all write operations on the chunk are complete
- chunk.Commit();
+ chunk.Seal();
// we need to read the chunk using this way
// since some client still may completing the dequeue
@@ -467,12 +406,12 @@ namespace Implab.Parallels {
chunk = null;
} else {
while (chunk.next == null)
- Thread.MemoryBarrier();
+ spin.SpinOnce();
chunk = chunk.next;
}
}
- return result.ToArray();
+ return result;
}
struct ArraySegmentCollection : ICollection {
@@ -501,7 +440,7 @@ namespace Implab.Parallels {
}
public void CopyTo(T[] array, int arrayIndex) {
- Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
+ Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
}
public bool Remove(T item) {
diff --git a/Implab/Parallels/BlockingQueue.cs b/Implab/Parallels/BlockingQueue.cs
--- a/Implab/Parallels/BlockingQueue.cs
+++ b/Implab/Parallels/BlockingQueue.cs
@@ -5,13 +5,13 @@ namespace Implab.Parallels {
public class BlockingQueue : AsyncQueue {
readonly object m_lock = new object();
- public override void Enqueue(T value) {
+ public void EnqueuePulse(T value) {
base.Enqueue(value);
lock (m_lock)
Monitor.Pulse(m_lock);
}
- public override void EnqueueRange(T[] data, int offset, int length) {
+ public void EnqueueRangePulse(T[] data, int offset, int length) {
base.EnqueueRange(data, offset, length);
if (length > 1)
lock (m_lock)
diff --git a/Implab/Parallels/MTQueue.cs b/Implab/Parallels/MTQueue.cs
deleted file mode 100644
--- a/Implab/Parallels/MTQueue.cs
+++ /dev/null
@@ -1,143 +0,0 @@
-using System.Threading;
-using System.Collections.Generic;
-using System;
-using System.Collections;
-
-namespace Implab.Parallels {
- public class MTQueue : IEnumerable {
- class Node {
- public Node(T value) {
- this.value = value;
- }
- public readonly T value;
- public Node next;
- }
-
- Node m_first;
- Node m_last;
-
- public void Enqueue(T value) {
- Thread.MemoryBarrier();
-
- var last = m_last;
- var next = new Node(value);
-
- // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
- // to ensure that the next node is completely constructed
- while (last != Interlocked.CompareExchange(ref m_last, next, last))
- last = m_last;
-
- if (last != null)
- last.next = next;
- else
- m_first = next;
- }
-
- public bool TryDequeue(out T value) {
- Node first;
- Node next;
- value = default(T);
-
- Thread.MemoryBarrier();
- do {
- first = m_first;
- if (first == null)
- return false;
- next = first.next;
- if (next == null) {
- // this is the last element,
- // then try to update the tail
- if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
- // this is the race condition
- if (m_last == null)
- // the queue is empty
- return false;
- // tail has been changed, we need to restart
- continue;
- }
-
- // tail succesfully updated and first.next will never be changed
- // other readers will fail due to inconsistency m_last != m_fist && m_first.next == null
- // however the parallel writer may update the m_first since the m_last is null
-
- // so we need to fix inconsistency by setting m_first to null or if it has been
- // updated by the writer already then we should just to give up
- Interlocked.CompareExchange(ref m_first, null, first);
- break;
-
- }
- if (first == Interlocked.CompareExchange(ref m_first, next, first))
- // head succesfully updated
- break;
- } while (true);
-
- value = first.value;
- return true;
- }
-
- #region IEnumerable implementation
-
- class Enumerator : IEnumerator {
- Node m_current;
- Node m_first;
-
- public Enumerator(Node first) {
- m_first = first;
- }
-
- #region IEnumerator implementation
-
- public bool MoveNext() {
- m_current = m_current == null ? m_first : m_current.next;
- return m_current != null;
- }
-
- public void Reset() {
- m_current = null;
- }
-
- object IEnumerator.Current {
- get {
- if (m_current == null)
- throw new InvalidOperationException();
- return m_current.value;
- }
- }
-
- #endregion
-
- #region IDisposable implementation
-
- public void Dispose() {
- }
-
- #endregion
-
- #region IEnumerator implementation
-
- public T Current {
- get {
- if (m_current == null)
- throw new InvalidOperationException();
- return m_current.value;
- }
- }
-
- #endregion
- }
-
- public IEnumerator GetEnumerator() {
- return new Enumerator(m_first);
- }
-
- #endregion
-
- #region IEnumerable implementation
-
- IEnumerator IEnumerable.GetEnumerator() {
- return GetEnumerator();
- }
-
- #endregion
- }
-}
diff --git a/Implab/Parallels/SimpleAsyncQueue.cs b/Implab/Parallels/SimpleAsyncQueue.cs
new file mode 100644
--- /dev/null
+++ b/Implab/Parallels/SimpleAsyncQueue.cs
@@ -0,0 +1,131 @@
+using System.Threading;
+using System.Collections.Generic;
+using System;
+using System.Collections;
+
+namespace Implab.Parallels {
+ public class SimpleAsyncQueue : IEnumerable {
+ class Node {
+ public Node(T value) {
+ this.value = value;
+ }
+ public readonly T value;
+ public volatile Node next;
+ }
+
+ // the reader and the writer are mainteined completely independent,
+ // the reader can read next item when m_first.next is not null
+ // the writer creates the a new node, moves m_last to this node and
+ // only after that restores the reference from the previous node
+ // making available the reader to read the new node.
+
+ Node m_first; // position on the node which is already read
+ Node m_last; // position on the node which is already written
+
+ public SimpleAsyncQueue() {
+ m_first = m_last = new Node(default(T));
+ }
+
+ public void Enqueue(T value) {
+ var next = new Node(value);
+
+ // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
+ // to ensure that the next node is completely constructed
+ var last = Interlocked.Exchange(ref m_last, next);
+
+ // release-fence
+ last.next = next;
+
+ }
+
+ public bool TryDequeue(out T value) {
+ Node first;
+ Node next;
+
+ Thread.MemoryBarrier(); // ensure m_first is fresh
+ SpinWait spin = new SpinWait();
+ do {
+ first = m_first;
+ // aquire-fence
+ next = first.next;
+ if (next == null) {
+ value = default(T);
+ return false;
+ }
+
+ if (first == Interlocked.CompareExchange(ref m_first, next, first))
+ // head succesfully updated
+ break;
+ spin.SpinOnce();
+ } while (true);
+
+ value = next.value;
+ return true;
+ }
+
+ #region IEnumerable implementation
+
+ class Enumerator : IEnumerator {
+ Node m_current;
+ Node m_first;
+
+ public Enumerator(Node first) {
+ m_first = first;
+ }
+
+ #region IEnumerator implementation
+
+ public bool MoveNext() {
+ m_current = m_current == null ? m_first : m_current.next;
+ return m_current != null;
+ }
+
+ public void Reset() {
+ m_current = null;
+ }
+
+ object IEnumerator.Current {
+ get {
+ if (m_current == null)
+ throw new InvalidOperationException();
+ return m_current.value;
+ }
+ }
+
+ #endregion
+
+ #region IDisposable implementation
+
+ public void Dispose() {
+ }
+
+ #endregion
+
+ #region IEnumerator implementation
+
+ public T Current {
+ get {
+ if (m_current == null)
+ throw new InvalidOperationException();
+ return m_current.value;
+ }
+ }
+
+ #endregion
+ }
+
+ public IEnumerator GetEnumerator() {
+ return new Enumerator(m_first);
+ }
+
+ #endregion
+
+ #region IEnumerable implementation
+
+ IEnumerator IEnumerable.GetEnumerator() {
+ return GetEnumerator();
+ }
+
+ #endregion
+ }
+}