# HG changeset patch
# User cin
# Date 2014-09-25 23:32:34
# Node ID 4f20870d0816a16b35008113dc90205a7b180cef
# Parent 05e6468f066fe4b05297d6a473d3f4b50db6167d
added memory barriers
diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs
--- a/Implab.Test/AsyncTests.cs
+++ b/Implab.Test/AsyncTests.cs
@@ -291,7 +291,7 @@ namespace Implab.Test {
[TestMethod]
public void ChainedMapTest() {
- using (var pool = new WorkerPool(0,100,100)) {
+ using (var pool = new WorkerPool(0,10,100)) {
const int count = 10000;
var args = new double[count];
diff --git a/Implab.mono.sln b/Implab.mono.sln
--- a/Implab.mono.sln
+++ b/Implab.mono.sln
@@ -16,8 +16,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test.mono", "Implab.Test\Implab.Test.mono.csproj", "{2BD05F84-E067-4B87-9477-FDC2676A21C6}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Fx.Test.mono", "Implab.Fx.Test\Implab.Fx.Test.mono.csproj", "{2BD05F84-E067-4B87-9477-FDC2676A21C6}"
-EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -77,7 +75,6 @@ Global
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{2BD05F84-E067-4B87-9477-FDC2676A21C6} = {BCA337C3-BFDC-4825-BBDB-E6D467E4E452}
- {2BD05F84-E067-4B87-9477-FDC2676A21C6} = {BCA337C3-BFDC-4825-BBDB-E6D467E4E452}
EndGlobalSection
GlobalSection(MonoDevelopProperties) = preSolution
StartupItem = Implab\Implab.csproj
diff --git a/Implab/Implab.csproj b/Implab/Implab.csproj
--- a/Implab/Implab.csproj
+++ b/Implab/Implab.csproj
@@ -7,6 +7,8 @@
Library
Implab
Implab
+ 8.0.30703
+ 2.0
true
@@ -121,6 +123,8 @@
+
+
diff --git a/Implab/JSON/JSONXmlReaderOptions.cs b/Implab/JSON/JSONXmlReaderOptions.cs
--- a/Implab/JSON/JSONXmlReaderOptions.cs
+++ b/Implab/JSON/JSONXmlReaderOptions.cs
@@ -19,7 +19,9 @@ namespace Implab.JSON {
}
///
- /// Интерпретировать массивы как множественные элементы (убирает один уровень вложенности)
+ /// Интерпретировать массивы как множественные элементы (убирает один уровень вложенности), иначе массив
+ /// представляется в виде узла, дочерними элементами которого являются элементы массива, имена дочерних элементов
+ /// определяются свойством . По умолчанию false.
///
public bool FlattenArrays {
get;
@@ -44,6 +46,7 @@ namespace Implab.JSON {
///
/// Имя элемента для массивов, если не включена опция .
+ /// По умолчанию item.
///
public string ArrayItemName {
get;
diff --git a/Implab/Parallels/ArrayTraits.cs b/Implab/Parallels/ArrayTraits.cs
--- a/Implab/Parallels/ArrayTraits.cs
+++ b/Implab/Parallels/ArrayTraits.cs
@@ -153,7 +153,8 @@ namespace Implab.Parallels {
var res = new TDst[source.Length];
var pending = source.Length;
- var semaphore = new Semaphore(threads, threads);
+ object locker = new object();
+ int slots = threads;
// Analysis disable AccessToDisposedClosure
AsyncPool.InvokeNewThread(() => {
@@ -162,22 +163,28 @@ namespace Implab.Parallels {
break; // stop processing in case of error or cancellation
var idx = i;
- semaphore.WaitOne();
+ lock(locker) {
+ while(slots == 0)
+ Monitor.Wait(locker);
+ slots--;
+ }
try {
- var p1 = transform(source[i]);
- p1.Anyway(() => semaphore.Release());
- p1.Then(
- x => {
- res[idx] = x;
- var left = Interlocked.Decrement(ref pending);
- if (left == 0)
- promise.Resolve(res);
- },
- e => {
- promise.Reject(e);
- throw new TransientPromiseException(e);
- }
- );
+ transform(source[i])
+ .Anyway(() => {
+ lock(locker) {
+ slots ++;
+ Monitor.Pulse(locker);
+ }
+ })
+ .Last(
+ x => {
+ res[idx] = x;
+ var left = Interlocked.Decrement(ref pending);
+ if (left == 0)
+ promise.Resolve(res);
+ },
+ e => promise.Reject(e)
+ );
} catch (Exception e) {
promise.Reject(e);
@@ -186,7 +193,7 @@ namespace Implab.Parallels {
return 0;
});
- return promise.Anyway(semaphore.Dispose);
+ return promise;
}
}
diff --git a/Implab/Parallels/DispatchPool.cs b/Implab/Parallels/DispatchPool.cs
--- a/Implab/Parallels/DispatchPool.cs
+++ b/Implab/Parallels/DispatchPool.cs
@@ -9,16 +9,17 @@ namespace Implab.Parallels {
public abstract class DispatchPool : IDisposable {
readonly int m_minThreads;
readonly int m_maxThreads;
+ readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
int m_createdThreads = 0; // the current size of the pool
int m_activeThreads = 0; // the count of threads which are active
int m_sleepingThreads = 0; // the count of currently inactive threads
int m_maxRunningThreads = 0; // the meximum reached size of the pool
int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
- int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
+
int m_wakeEvents = 0; // the count of wake events
- AutoResetEvent m_hasTasks = new AutoResetEvent(false);
+ readonly object m_signalLocker = new object();
protected DispatchPool(int min, int max) {
if (min < 0)
@@ -51,68 +52,76 @@ namespace Implab.Parallels {
public int PoolSize {
get {
+ Thread.MemoryBarrier();
return m_createdThreads;
}
}
public int ActiveThreads {
get {
+ Thread.MemoryBarrier();
return m_activeThreads;
}
}
public int MaxRunningThreads {
get {
+ Thread.MemoryBarrier();
return m_maxRunningThreads;
}
}
protected bool IsDisposed {
get {
- return m_exitRequired != 0;
+ Thread.MemoryBarrier();
+ return m_exitRequired == 1;
}
}
protected abstract bool TryDequeue(out TUnit unit);
- #region thread execution traits
+ #region thread signaling traits
int SignalThread() {
var signals = Interlocked.Increment(ref m_wakeEvents);
if(signals == 1)
- m_hasTasks.Set();
+ lock(m_signalLocker)
+ Monitor.Pulse(m_signalLocker);
return signals;
}
bool FetchSignalOrWait(int timeout) {
var start = Environment.TickCount;
-
- // означает, что поток владеет блокировкой и при успешном получении сигнала должен
- // ее вернуть, чтобы другой ожидающий поток смог
- bool hasLock = false;
+ int signals;
+ Thread.MemoryBarrier(); // m_wakeEvents volatile first read
do {
- int signals;
- do {
- signals = m_wakeEvents;
- if (signals == 0)
- break;
- } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
+ signals = m_wakeEvents;
+ if (signals == 0)
+ break;
+ } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
- if (signals >= 1) {
- if (signals > 1 && hasLock)
- m_hasTasks.Set();
+ if (signals == 0) {
+ // no signal is fetched
+ lock(m_signalLocker) {
+ while(m_wakeEvents == 0) {
+ if (timeout != -1)
+ timeout = Math.Max(0, timeout - (Environment.TickCount - start));
+ if(!Monitor.Wait(m_signalLocker,timeout))
+ return false; // timeout
+ }
+ // m_wakeEvents > 0
+ if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized
+ Monitor.Pulse(m_signalLocker);
+
+ // signal fetched
return true;
}
- if (timeout != -1)
- timeout = Math.Max(0, timeout - (Environment.TickCount - start));
+ } else {
+ // signal fetched
+ return true;
+ }
- // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
- // и уйдет на пустой цикл, после чего заблокируется
- hasLock = true;
- } while (m_hasTasks.WaitOne(timeout));
-
- return false;
}
bool Sleep(int timeout) {
@@ -131,7 +140,8 @@ namespace Implab.Parallels {
/// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
///
protected void GrowPool() {
- if (m_exitRequired != 0)
+ Thread.MemoryBarrier();
+ if (m_exitRequired == 1)
return;
if (m_sleepingThreads > m_wakeEvents) {
//Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
@@ -204,7 +214,7 @@ namespace Implab.Parallels {
// use spins to allocate slot for the new thread
do {
current = m_createdThreads;
- if (current >= m_maxThreads || m_exitRequired != 0)
+ if (current >= m_maxThreads || m_exitRequired == 1)
// no more slots left or the pool has been disposed
return false;
} while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
@@ -227,6 +237,7 @@ namespace Implab.Parallels {
last = false;
int current;
// use spins to release slot for the new thread
+ Thread.MemoryBarrier();
do {
current = m_createdThreads;
if (current <= m_minThreads && m_exitRequired == 0)
@@ -264,6 +275,7 @@ namespace Implab.Parallels {
// slot successfully allocated
var worker = new Thread(this.Worker);
worker.IsBackground = true;
+ Interlocked.Increment(ref m_activeThreads);
worker.Start();
return true;
@@ -277,15 +289,14 @@ namespace Implab.Parallels {
protected virtual void Worker() {
TUnit unit;
//Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
- Interlocked.Increment(ref m_activeThreads);
+ int count = 0;;
+ Thread.MemoryBarrier();
do {
// exit if requested
- if (m_exitRequired != 0) {
+ if (m_exitRequired == 1) {
// release the thread slot
Interlocked.Decrement(ref m_activeThreads);
- if (ReleaseThreadSlotAnyway()) // it was the last worker
- m_hasTasks.Dispose();
- else
+ if (!ReleaseThreadSlotAnyway()) // it was the last worker
SignalThread(); // wake next worker
break;
}
@@ -293,14 +304,17 @@ namespace Implab.Parallels {
// fetch task
if (TryDequeue(out unit)) {
InvokeUnit(unit);
+ count ++;
continue;
}
Interlocked.Decrement(ref m_activeThreads);
+ Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count);
// entering suspend state
// keep this thread and wait
if (!Suspend())
break;
+ count = 0;
//Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
Interlocked.Increment(ref m_activeThreads);
} while (true);
@@ -309,15 +323,10 @@ namespace Implab.Parallels {
protected virtual void Dispose(bool disposing) {
if (disposing) {
- if (m_exitRequired == 0) {
- if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
- return;
-
+ if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier
// wake sleeping threads
if (m_createdThreads > 0)
SignalThread();
- else
- m_hasTasks.Dispose();
GC.SuppressFinalize(this);
}
}
diff --git a/Implab/Parallels/MTQueue.cs b/Implab/Parallels/MTQueue.cs
--- a/Implab/Parallels/MTQueue.cs
+++ b/Implab/Parallels/MTQueue.cs
@@ -18,6 +18,8 @@ namespace Implab.Parallels {
Node m_last;
public void Enqueue(T value) {
+ Thread.MemoryBarrier();
+
var last = m_last;
var next = new Node(value);
@@ -35,6 +37,7 @@ namespace Implab.Parallels {
Node next = null;
value = default(T);
+ Thread.MemoryBarrier();
do {
first = m_first;
if (first == null)
diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs
--- a/Implab/Parallels/WorkerPool.cs
+++ b/Implab/Parallels/WorkerPool.cs
@@ -12,20 +12,24 @@ namespace Implab.Parallels {
MTQueue m_queue = new MTQueue();
int m_queueLength = 0;
readonly int m_threshold = 1;
+ int m_workers = 0;
public WorkerPool(int minThreads, int maxThreads, int threshold)
: base(minThreads, maxThreads) {
m_threshold = threshold;
+ m_workers = minThreads;
InitPool();
}
public WorkerPool(int minThreads, int maxThreads) :
base(minThreads, maxThreads) {
+ m_workers = minThreads;
InitPool();
}
public WorkerPool(int threads)
: base(threads) {
+ m_workers = threads;
InitPool();
}
@@ -62,8 +66,10 @@ namespace Implab.Parallels {
var len = Interlocked.Increment(ref m_queueLength);
m_queue.Enqueue(unit);
- if (len > m_threshold*ActiveThreads)
+ if (len > m_threshold * m_workers) {
+ Interlocked.Increment(ref m_workers);
GrowPool();
+ }
}
protected override bool TryDequeue(out Action unit) {
@@ -85,8 +91,10 @@ namespace Implab.Parallels {
// Suspend
// queueLength > 0
// continue
+ Thread.MemoryBarrier();
if (m_queueLength > 0)
return true;
+ Interlocked.Decrement(ref m_workers);
return base.Suspend();
}
diff --git a/Implab/Promise.cs b/Implab/Promise.cs
--- a/Implab/Promise.cs
+++ b/Implab/Promise.cs
@@ -142,18 +142,20 @@ namespace Implab {
void WaitTransition() {
while (m_state == TRANSITIONAL_STATE) {
- /* noop */
+ Thread.MemoryBarrier();
}
}
public bool IsResolved {
get {
+ Thread.MemoryBarrier();
return m_state > 1;
}
}
public bool IsCancelled {
get {
+ Thread.MemoryBarrier();
return m_state == CANCELLED_STATE;
}
}
diff --git a/Implab/SyncPool.cs b/Implab/SyncPool.cs
new file mode 100644
--- /dev/null
+++ b/Implab/SyncPool.cs
@@ -0,0 +1,85 @@
+using System;
+using Implab.Parallels;
+using System.Threading;
+
+namespace Implab {
+ /*public class SyncPool : IDisposable {
+ readonly Func m_factory;
+ readonly Action m_cleanup;
+ readonly int m_size;
+ readonly MTQueue m_queue = new MTQueue();
+
+ volatile bool m_disposed;
+
+ volatile int m_count;
+
+ public SyncPool(Func factory, Action cleanup, int size) {
+ Safe.ArgumentNotNull(factory, "factory");
+ Safe.ArgumentInRange(size, 1, size, "size");
+
+ m_factory = factory;
+ m_cleanup = cleanup;
+ m_size = size;
+ }
+
+ public SyncPool(Func factory, Action cleanup) : this(factory,cleanup,Environment.ProcessorCount+1) {
+ }
+
+ public SyncPool(Func factory) : this(factory,null,Environment.ProcessorCount+1) {
+ }
+
+ public SyncPoolWrapper Allocate() {
+ if (m_disposed)
+ throw new ObjectDisposedException(this.ToString());
+
+ T instance;
+ if (m_queue.TryDequeue(out instance)) {
+ Interlocked.Decrement(ref m_count);
+ return instance;
+ } else {
+ instance = m_factory();
+ }
+ return new SyncPoolWrapper(instance, this);
+ }
+
+ public void Release(T instance) {
+ if (m_count < m_size && !m_disposed) {
+ Interlocked.Increment(ref m_count);
+
+ if (m_cleanup != null)
+ m_cleanup(instance);
+
+ m_queue.Enqueue(instance);
+
+ // пока элемент возвращался в кеш, была начата операция освобождения всего кеша
+ // и возможно уже законцена, в таком случае следует извлечь элемент обратно и
+ // освободить его. Если операция освобождения кеша еще не заврешилась, то будет
+ // изъят и освобожден произвольный элемен, что не повлияет на ход всего процесса.
+ if (m_disposed && m_queue.TryDequeue(out instance))
+ Safe.Dispose(instance);
+
+ } else {
+ Safe.Dispose(instance);
+ }
+ }
+
+ protected virtual void Dispose(bool disposing) {
+ if (disposing) {
+ m_disposed = true;
+ T instance;
+ while (m_queue.TryDequeue(out instance))
+ Safe.Dispose(instance);
+ }
+ }
+
+ #region IDisposable implementation
+
+ public void Dispose() {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ #endregion
+ }*/
+}
+
diff --git a/Implab/SyncPoolWrapper.cs b/Implab/SyncPoolWrapper.cs
new file mode 100644
--- /dev/null
+++ b/Implab/SyncPoolWrapper.cs
@@ -0,0 +1,24 @@
+using System;
+
+namespace Implab {
+ /*public struct SyncPoolWrapper : IDisposable {
+ readonly T m_value;
+ readonly SyncPool m_pool;
+
+ internal SyncPoolWrapper(T value, SyncPool pool) {
+ m_value = value;
+ m_pool = pool;
+ }
+
+ public T Value {
+ get { return m_value; }
+ }
+
+ #region IDisposable implementation
+ public void Dispose() {
+ m_pool.Release(m_value);
+ }
+ #endregion
+ }*/
+}
+