cin - added memory barriers
r80:4f20870d0816 v2
@@ -0,0 +1,85
1 using System;
2 using Implab.Parallels;
3 using System.Threading;
4
5 namespace Implab {
6 /*public class SyncPool<T> : IDisposable {
7 readonly Func<T> m_factory;
8 readonly Action<T> m_cleanup;
9 readonly int m_size;
10 readonly MTQueue<T> m_queue = new MTQueue<T>();
11
12 volatile bool m_disposed;
13
14 volatile int m_count;
15
16 public SyncPool(Func<T> factory, Action<T> cleanup, int size) {
17 Safe.ArgumentNotNull(factory, "factory");
18 Safe.ArgumentInRange(size, 1, size, "size");
19
20 m_factory = factory;
21 m_cleanup = cleanup;
22 m_size = size;
23 }
24
25 public SyncPool(Func<T> factory, Action<T> cleanup) : this(factory,cleanup,Environment.ProcessorCount+1) {
26 }
27
28 public SyncPool(Func<T> factory) : this(factory,null,Environment.ProcessorCount+1) {
29 }
30
31 public SyncPoolWrapper<T> Allocate() {
32 if (m_disposed)
33 throw new ObjectDisposedException(this.ToString());
34
35 T instance;
36 if (m_queue.TryDequeue(out instance)) {
37 Interlocked.Decrement(ref m_count);
38 return new SyncPoolWrapper<T>(instance, this);
39 } else {
40 instance = m_factory();
41 }
42 return new SyncPoolWrapper<T>(instance, this);
43 }
44
45 public void Release(T instance) {
46 if (m_count < m_size && !m_disposed) {
47 Interlocked.Increment(ref m_count);
48
49 if (m_cleanup != null)
50 m_cleanup(instance);
51
52 m_queue.Enqueue(instance);
53
54 // while the item was being returned to the cache, a dispose of the whole cache may have
55 // started and possibly already completed; in that case the item must be dequeued back
56 // and disposed. If the cache dispose has not completed yet, some arbitrary item will be
57 // dequeued and disposed, which does not affect the overall process.
58 if (m_disposed && m_queue.TryDequeue(out instance))
59 Safe.Dispose(instance);
60
61 } else {
62 Safe.Dispose(instance);
63 }
64 }
65
66 protected virtual void Dispose(bool disposing) {
67 if (disposing) {
68 m_disposed = true;
69 T instance;
70 while (m_queue.TryDequeue(out instance))
71 Safe.Dispose(instance);
72 }
73 }
74
75 #region IDisposable implementation
76
77 public void Dispose() {
78 Dispose(true);
79 GC.SuppressFinalize(this);
80 }
81
82 #endregion
83 }*/
84 }
85
@@ -0,0 +1,24
1 using System;
2
3 namespace Implab {
4 /*public struct SyncPoolWrapper<T> : IDisposable {
5 readonly T m_value;
6 readonly SyncPool<T> m_pool;
7
8 internal SyncPoolWrapper(T value, SyncPool<T> pool) {
9 m_value = value;
10 m_pool = pool;
11 }
12
13 public T Value {
14 get { return m_value; }
15 }
16
17 #region IDisposable implementation
18 public void Dispose() {
19 m_pool.Release(m_value);
20 }
21 #endregion
22 }*/
23 }
24
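Both classes land commented out in this commit, but the intended pattern is visible from the wrapper's Dispose forwarding to Release. A minimal usage sketch, assuming the classes get uncommented as-is (MemoryStream is just an arbitrary disposable payload):

using System.IO;
using Implab;

class SyncPoolUsageExample {
    static void Main() {
        using (var pool = new SyncPool<MemoryStream>(
            () => new MemoryStream(),    // factory
            ms => ms.SetLength(0),       // cleanup applied before the instance re-enters the pool
            4)) {                        // pool size
            using (var lease = pool.Allocate()) {   // SyncPoolWrapper<MemoryStream> : IDisposable
                lease.Value.WriteByte(42);          // work with the pooled instance
            }                                       // lease.Dispose() returns it via pool.Release(...)
        }
    }
}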
@@ -291,7 +291,7 namespace Implab.Test {
291 291 [TestMethod]
292 292 public void ChainedMapTest() {
293 293
294 using (var pool = new WorkerPool(0,100,100)) {
294 using (var pool = new WorkerPool(0,10,100)) {
295 295 const int count = 10000;
296 296
297 297 var args = new double[count];
@@ -16,8 +16,6 Project("{2150E333-8FDC-42A3-9474-1A3956
16 16 EndProject
17 17 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test.mono", "Implab.Test\Implab.Test.mono.csproj", "{2BD05F84-E067-4B87-9477-FDC2676A21C6}"
18 18 EndProject
19 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Fx.Test.mono", "Implab.Fx.Test\Implab.Fx.Test.mono.csproj", "{2BD05F84-E067-4B87-9477-FDC2676A21C6}"
20 EndProject
21 19 Global
22 20 GlobalSection(SolutionConfigurationPlatforms) = preSolution
23 21 Debug|Any CPU = Debug|Any CPU
@@ -77,7 +75,6 Global
77 75 EndGlobalSection
78 76 GlobalSection(NestedProjects) = preSolution
79 77 {2BD05F84-E067-4B87-9477-FDC2676A21C6} = {BCA337C3-BFDC-4825-BBDB-E6D467E4E452}
80 {2BD05F84-E067-4B87-9477-FDC2676A21C6} = {BCA337C3-BFDC-4825-BBDB-E6D467E4E452}
81 78 EndGlobalSection
82 79 GlobalSection(MonoDevelopProperties) = preSolution
83 80 StartupItem = Implab\Implab.csproj
@@ -7,6 +7,8
7 7 <OutputType>Library</OutputType>
8 8 <RootNamespace>Implab</RootNamespace>
9 9 <AssemblyName>Implab</AssemblyName>
10 <ProductVersion>8.0.30703</ProductVersion>
11 <SchemaVersion>2.0</SchemaVersion>
10 12 </PropertyGroup>
11 13 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
12 14 <DebugSymbols>true</DebugSymbols>
@@ -121,6 +123,8
121 123 <Compile Include="PromiseExtensions.cs" />
122 124 <Compile Include="TransientPromiseException.cs" />
123 125 <Compile Include="SyncContextPromise.cs" />
126 <Compile Include="SyncPool.cs" />
127 <Compile Include="SyncPoolWrapper.cs" />
124 128 </ItemGroup>
125 129 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
126 130 <ItemGroup />
@@ -19,7 +19,9 namespace Implab.JSON {
19 19 }
20 20
21 21 /// <summary>
22 /// Interpret arrays as repeated elements (removes one level of nesting)
22 /// Interpret arrays as repeated elements (removes one level of nesting); otherwise an array
23 /// is represented as a node whose children are the array items, with the child element names
24 /// defined by the <see cref="ArrayItemName"/> property. Defaults to <c>false</c>.
23 25 /// </summary>
24 26 public bool FlattenArrays {
25 27 get;
@@ -44,6 +46,7 namespace Implab.JSON {
44 46
45 47 /// <summary>
46 48 /// The element name used for array items when the <see cref="FlattenArrays"/> option is not enabled.
49 /// Defaults to <c>item</c>.
47 50 /// </summary>
48 51 public string ArrayItemName {
49 52 get;
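To make the two documented modes concrete, here is an illustration of the intended mapping; the exact XML shapes are an assumption, only the flatten-vs-wrapper distinction and the default "item" name come from the comments above:

// JSON input:            { "tags": [ "a", "b" ] }
//
// FlattenArrays = true:  <tags>a</tags>
//                        <tags>b</tags>
//
// FlattenArrays = false: <tags>
//                            <item>a</item>   <!-- names taken from ArrayItemName, "item" by default -->
//                            <item>b</item>
//                        </tags>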
@@ -153,7 +153,8 namespace Implab.Parallels {
153 153 var res = new TDst[source.Length];
154 154 var pending = source.Length;
155 155
156 var semaphore = new Semaphore(threads, threads);
156 object locker = new object();
157 int slots = threads;
157 158
158 159 // Analysis disable AccessToDisposedClosure
159 160 AsyncPool.InvokeNewThread(() => {
@@ -162,22 +163,28 namespace Implab.Parallels {
162 163 break; // stop processing in case of error or cancellation
163 164 var idx = i;
164 165
165 semaphore.WaitOne();
166 lock(locker) {
167 while(slots == 0)
168 Monitor.Wait(locker);
169 slots--;
170 }
166 171 try {
167 var p1 = transform(source[i]);
168 p1.Anyway(() => semaphore.Release());
169 p1.Then(
170 x => {
171 res[idx] = x;
172 var left = Interlocked.Decrement(ref pending);
173 if (left == 0)
174 promise.Resolve(res);
175 },
176 e => {
177 promise.Reject(e);
178 throw new TransientPromiseException(e);
179 }
180 );
172 transform(source[i])
173 .Anyway(() => {
174 lock(locker) {
175 slots ++;
176 Monitor.Pulse(locker);
177 }
178 })
179 .Last(
180 x => {
181 res[idx] = x;
182 var left = Interlocked.Decrement(ref pending);
183 if (left == 0)
184 promise.Resolve(res);
185 },
186 e => promise.Reject(e)
187 );
181 188
182 189 } catch (Exception e) {
183 190 promise.Reject(e);
@@ -186,7 +193,7 namespace Implab.Parallels {
186 193 return 0;
187 194 });
188 195
189 return promise.Anyway(semaphore.Dispose);
196 return promise;
190 197 }
191 198
192 199 }
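The Semaphore is replaced here with a plain monitor guarding a slot counter, so nothing needs to be disposed after the promise settles. Extracted into a standalone helper, the pattern looks roughly like this (a sketch for illustration, not part of the commit):

using System.Threading;

// Monitor-based bounded-concurrency gate, equivalent to the slots/locker pair above.
class SlotGate {
    readonly object m_locker = new object();
    int m_slots;

    public SlotGate(int slots) {
        m_slots = slots;
    }

    public void Enter() {
        lock (m_locker) {
            while (m_slots == 0)
                Monitor.Wait(m_locker);   // block until somebody releases a slot
            m_slots--;
        }
    }

    public void Exit() {
        lock (m_locker) {
            m_slots++;
            Monitor.Pulse(m_locker);      // wake one waiting thread
        }
    }
}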
@@ -9,16 +9,17 namespace Implab.Parallels {
9 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 10 readonly int m_minThreads;
11 11 readonly int m_maxThreads;
12 readonly int m_releaseTimeout = 100; // how long an idle worker waits for new tasks before exiting
12 13
13 14 int m_createdThreads = 0; // the current size of the pool
14 15 int m_activeThreads = 0; // the count of threads which are active
15 16 int m_sleepingThreads = 0; // the count of currently inactive threads
16 17 int m_maxRunningThreads = 0; // the maximum reached size of the pool
17 18 int m_exitRequired = 0; // the pool is shutting down, all unused workers are released
18 int m_releaseTimeout = 100; // how long an idle worker waits for new tasks before exiting
19
19 20 int m_wakeEvents = 0; // the count of wake events
20 21
21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
22 readonly object m_signalLocker = new object();
22 23
23 24 protected DispatchPool(int min, int max) {
24 25 if (min < 0)
@@ -51,68 +52,76 namespace Implab.Parallels {
51 52
52 53 public int PoolSize {
53 54 get {
55 Thread.MemoryBarrier();
54 56 return m_createdThreads;
55 57 }
56 58 }
57 59
58 60 public int ActiveThreads {
59 61 get {
62 Thread.MemoryBarrier();
60 63 return m_activeThreads;
61 64 }
62 65 }
63 66
64 67 public int MaxRunningThreads {
65 68 get {
69 Thread.MemoryBarrier();
66 70 return m_maxRunningThreads;
67 71 }
68 72 }
69 73
70 74 protected bool IsDisposed {
71 75 get {
72 return m_exitRequired != 0;
76 Thread.MemoryBarrier();
77 return m_exitRequired == 1;
73 78 }
74 79 }
75 80
76 81 protected abstract bool TryDequeue(out TUnit unit);
77 82
78 #region thread execution traits
83 #region thread signaling traits
79 84 int SignalThread() {
80 85 var signals = Interlocked.Increment(ref m_wakeEvents);
81 86 if(signals == 1)
82 m_hasTasks.Set();
87 lock(m_signalLocker)
88 Monitor.Pulse(m_signalLocker);
83 89 return signals;
84 90 }
85 91
86 92 bool FetchSignalOrWait(int timeout) {
87 93 var start = Environment.TickCount;
88
89 // means that the thread owns the lock and, once it successfully fetches a signal,
90 // must give the lock back so that another waiting thread can proceed
91 bool hasLock = false;
94 int signals;
95 Thread.MemoryBarrier(); // m_wakeEvents volatile first read
92 96 do {
93 int signals;
94 do {
95 signals = m_wakeEvents;
96 if (signals == 0)
97 break;
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
97 signals = m_wakeEvents;
98 if (signals == 0)
99 break;
100 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99 101
100 if (signals >= 1) {
101 if (signals > 1 && hasLock)
102 m_hasTasks.Set();
102 if (signals == 0) {
103 // no signal is fetched
104 lock(m_signalLocker) {
105 while(m_wakeEvents == 0) {
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108 if(!Monitor.Wait(m_signalLocker,timeout))
109 return false; // timeout
110 }
111 // m_wakeEvents > 0
112 if (Interlocked.Decrement(ref m_wakeEvents) > 0) // synchronized
113 Monitor.Pulse(m_signalLocker);
114
115 // signal fetched
103 116 return true;
104 117 }
105 118
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
119 } else {
120 // signal fetched
121 return true;
122 }
108 123
109 // if no signals are left, the first thread to get here resets the event
110 // and goes through one empty loop iteration, after which it blocks
111 124
112 hasLock = true;
113 } while (m_hasTasks.WaitOne(timeout));
114
115 return false;
116 125 }
117 126
118 127 bool Sleep(int timeout) {
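The AutoResetEvent is gone: wake-ups are now counted in m_wakeEvents, and a consumer either grabs a pending signal with a CAS loop or falls back to Monitor.Wait. The same idea in isolation, as a simplified sketch that leaves out the timeout bookkeeping of FetchSignalOrWait:

using System.Threading;

// Counted signal: Signal() is never lost even if no thread is waiting yet.
class CountedSignal {
    readonly object m_locker = new object();
    int m_signals;

    public void Signal() {
        Interlocked.Increment(ref m_signals);
        lock (m_locker)
            Monitor.Pulse(m_locker);              // wake one waiting consumer
    }

    public void Wait() {
        // fast path: try to take a pending signal without blocking
        int current;
        do {
            current = m_signals;
            if (current == 0)
                break;
        } while (Interlocked.CompareExchange(ref m_signals, current - 1, current) != current);
        if (current > 0)
            return;                               // signal fetched

        // slow path: block until a signal arrives
        lock (m_locker) {
            while (m_signals == 0)
                Monitor.Wait(m_locker);
            if (Interlocked.Decrement(ref m_signals) > 0)
                Monitor.Pulse(m_locker);          // more signals pending, wake another waiter
        }
    }
}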
@@ -131,7 +140,8 namespace Implab.Parallels {
131 140 /// Either starts a new thread, if there were no threads before, or sets the wake-up event for one sleeping thread
132 141 /// </summary>
133 142 protected void GrowPool() {
134 if (m_exitRequired != 0)
143 Thread.MemoryBarrier();
144 if (m_exitRequired == 1)
135 145 return;
136 146 if (m_sleepingThreads > m_wakeEvents) {
137 147 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
@@ -204,7 +214,7 namespace Implab.Parallels {
204 214 // use spins to allocate slot for the new thread
205 215 do {
206 216 current = m_createdThreads;
207 if (current >= m_maxThreads || m_exitRequired != 0)
217 if (current >= m_maxThreads || m_exitRequired == 1)
208 218 // no more slots left or the pool has been disposed
209 219 return false;
210 220 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
@@ -227,6 +237,7 namespace Implab.Parallels {
227 237 last = false;
228 238 int current;
229 239 // use spins to release slot for the new thread
240 Thread.MemoryBarrier();
230 241 do {
231 242 current = m_createdThreads;
232 243 if (current <= m_minThreads && m_exitRequired == 0)
@@ -264,6 +275,7 namespace Implab.Parallels {
264 275 // slot successfully allocated
265 276 var worker = new Thread(this.Worker);
266 277 worker.IsBackground = true;
278 Interlocked.Increment(ref m_activeThreads);
267 279 worker.Start();
268 280
269 281 return true;
@@ -277,15 +289,14 namespace Implab.Parallels {
277 289 protected virtual void Worker() {
278 290 TUnit unit;
279 291 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
280 Interlocked.Increment(ref m_activeThreads);
292 int count = 0;
293 Thread.MemoryBarrier();
281 294 do {
282 295 // exit if requested
283 if (m_exitRequired != 0) {
296 if (m_exitRequired == 1) {
284 297 // release the thread slot
285 298 Interlocked.Decrement(ref m_activeThreads);
286 if (ReleaseThreadSlotAnyway()) // it was the last worker
287 m_hasTasks.Dispose();
288 else
299 if (!ReleaseThreadSlotAnyway()) // it was the last worker
289 300 SignalThread(); // wake next worker
290 301 break;
291 302 }
@@ -293,14 +304,17 namespace Implab.Parallels {
293 304 // fetch task
294 305 if (TryDequeue(out unit)) {
295 306 InvokeUnit(unit);
307 count ++;
296 308 continue;
297 309 }
298 310 Interlocked.Decrement(ref m_activeThreads);
299 311
312 Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count);
300 313 // entering suspend state
301 314 // keep this thread and wait
302 315 if (!Suspend())
303 316 break;
317 count = 0;
304 318 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
305 319 Interlocked.Increment(ref m_activeThreads);
306 320 } while (true);
@@ -309,15 +323,10 namespace Implab.Parallels {
309 323
310 324 protected virtual void Dispose(bool disposing) {
311 325 if (disposing) {
312 if (m_exitRequired == 0) {
313 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
314 return;
315
326 if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier
316 327 // wake sleeping threads
317 328 if (m_createdThreads > 0)
318 329 SignalThread();
319 else
320 m_hasTasks.Dispose();
321 330 GC.SuppressFinalize(this);
322 331 }
323 332 }
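Switching the guard to a single Interlocked.CompareExchange makes Dispose idempotent and, as the inline comment notes, doubles as a full memory barrier. The guard on its own, as a minimal sketch:

using System;
using System.Threading;

class DisposeOnceExample : IDisposable {
    int m_disposed;   // 0 = live, 1 = disposed

    public void Dispose() {
        // only the first caller gets past this check; CompareExchange is also a full fence
        if (Interlocked.CompareExchange(ref m_disposed, 1, 0) == 0) {
            // release resources exactly once here
            GC.SuppressFinalize(this);
        }
    }
}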
@@ -18,6 +18,8 namespace Implab.Parallels {
18 18 Node m_last;
19 19
20 20 public void Enqueue(T value) {
21 Thread.MemoryBarrier();
22
21 23 var last = m_last;
22 24 var next = new Node(value);
23 25
@@ -35,6 +37,7 namespace Implab.Parallels {
35 37 Node next = null;
36 38 value = default(T);
37 39
40 Thread.MemoryBarrier();
38 41 do {
39 42 first = m_first;
40 43 if (first == null)
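The explicit Thread.MemoryBarrier() calls force a fresh read of m_first/m_last before the lock-free loops run. Where the target framework allows it, the same intent can be expressed per field; a sketch, assuming Volatile (.NET 4.5+) is acceptable, which may not hold for the Mono targets in this solution:

using System.Threading;

// Acquire/release access to a shared reference instead of a full fence before every read.
class PublishedReference<T> where T : class {
    T m_value;

    public void Publish(T value) {
        Volatile.Write(ref m_value, value);   // release: the object is fully built before the reference becomes visible
    }

    public T Read() {
        return Volatile.Read(ref m_value);    // acquire: always observe the latest published reference
    }
}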
@@ -12,20 +12,24 namespace Implab.Parallels {
12 12 MTQueue<Action> m_queue = new MTQueue<Action>();
13 13 int m_queueLength = 0;
14 14 readonly int m_threshold = 1;
15 int m_workers = 0;
15 16
16 17 public WorkerPool(int minThreads, int maxThreads, int threshold)
17 18 : base(minThreads, maxThreads) {
18 19 m_threshold = threshold;
20 m_workers = minThreads;
19 21 InitPool();
20 22 }
21 23
22 24 public WorkerPool(int minThreads, int maxThreads) :
23 25 base(minThreads, maxThreads) {
26 m_workers = minThreads;
24 27 InitPool();
25 28 }
26 29
27 30 public WorkerPool(int threads)
28 31 : base(threads) {
32 m_workers = threads;
29 33 InitPool();
30 34 }
31 35
@@ -62,8 +66,10 namespace Implab.Parallels {
62 66 var len = Interlocked.Increment(ref m_queueLength);
63 67 m_queue.Enqueue(unit);
64 68
65 if (len > m_threshold*ActiveThreads)
69 if (len > m_threshold * m_workers) {
70 Interlocked.Increment(ref m_workers);
66 71 GrowPool();
72 }
67 73 }
68 74
69 75 protected override bool TryDequeue(out Action unit) {
@@ -85,8 +91,10 namespace Implab.Parallels {
85 91 // Suspend
86 92 // queueLength > 0
87 93 // continue
94 Thread.MemoryBarrier();
88 95 if (m_queueLength > 0)
89 96 return true;
97 Interlocked.Decrement(ref m_workers);
90 98 return base.Suspend();
91 99 }
92 100
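GrowPool is now driven by an optimistic m_workers counter instead of ActiveThreads, so a burst of enqueues does not over-provision threads while freshly started workers are still ramping up. Pulled out as a pure function, the decision is simply (sketch):

// Grow while the backlog exceeds `threshold` queued items per (optimistically counted) worker.
static class PoolGrowthHeuristic {
    public static bool ShouldGrow(int queueLength, int threshold, int workers) {
        return queueLength > threshold * workers;
    }
}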
@@ -142,18 +142,20 namespace Implab {
142 142
143 143 void WaitTransition() {
144 144 while (m_state == TRANSITIONAL_STATE) {
145 /* noop */
145 Thread.MemoryBarrier();
146 146 }
147 147 }
148 148
149 149 public bool IsResolved {
150 150 get {
151 Thread.MemoryBarrier();
151 152 return m_state > 1;
152 153 }
153 154 }
154 155
155 156 public bool IsCancelled {
156 157 get {
158 Thread.MemoryBarrier();
157 159 return m_state == CANCELLED_STATE;
158 160 }
159 161 }
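The busy-wait on TRANSITIONAL_STATE now re-reads m_state through a full fence on every iteration instead of spinning on a stale value. A gentler variant of the same wait, sketched with BCL helpers the commit itself does not use (SpinWait, Volatile.Read):

using System.Threading;

static class TransitionWait {
    // Spin until the published state leaves the transitional value, backing off progressively.
    public static void WaitTransition(ref int state, int transitionalState) {
        var spin = new SpinWait();
        while (Volatile.Read(ref state) == transitionalState)
            spin.SpinOnce();   // yields the CPU more and more aggressively as it spins
    }
}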