##// END OF EJS Templates
shared locks + tests
cin -
r136:e9e7940c7d98 v2
parent child
Show More
@@ -1,779 +1,852
1 using System;
1 using System;
2 using System.Reflection;
2 using System.Reflection;
3 using System.Threading;
3 using System.Threading;
4 using Implab.Parallels;
4 using Implab.Parallels;
5
5
6 #if MONO
6 #if MONO
7
7
8 using NUnit.Framework;
8 using NUnit.Framework;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
10 using TestMethod = NUnit.Framework.TestAttribute;
11
11
12 #else
12 #else
13
13
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15
15
16 #endif
16 #endif
17
17
18 namespace Implab.Test {
18 namespace Implab.Test {
19 [TestClass]
19 [TestClass]
20 public class AsyncTests {
20 public class AsyncTests {
21 [TestMethod]
21 [TestMethod]
22 public void ResolveTest() {
22 public void ResolveTest() {
23 int res = -1;
23 int res = -1;
24 var p = new Promise<int>();
24 var p = new Promise<int>();
25 p.Then(x => res = x);
25 p.Then(x => res = x);
26 p.Resolve(100);
26 p.Resolve(100);
27
27
28 Assert.AreEqual(100, res);
28 Assert.AreEqual(100, res);
29 }
29 }
30
30
31 [TestMethod]
31 [TestMethod]
32 public void RejectTest() {
32 public void RejectTest() {
33 int res = -1;
33 int res = -1;
34 Exception err = null;
34 Exception err = null;
35
35
36 var p = new Promise<int>();
36 var p = new Promise<int>();
37 p.Then(
37 p.Then(
38 x => res = x,
38 x => res = x,
39 e => {
39 e => {
40 err = e;
40 err = e;
41 return -2;
41 return -2;
42 }
42 }
43 );
43 );
44 p.Reject(new ApplicationException("error"));
44 p.Reject(new ApplicationException("error"));
45
45
46 Assert.AreEqual(res, -1);
46 Assert.AreEqual(res, -1);
47 Assert.AreEqual(err.Message, "error");
47 Assert.AreEqual(err.Message, "error");
48
48
49 }
49 }
50
50
51 [TestMethod]
51 [TestMethod]
52 public void CancelExceptionTest() {
52 public void CancelExceptionTest() {
53 var p = new Promise<bool>();
53 var p = new Promise<bool>();
54 p.Cancel();
54 p.Cancel();
55
55
56 var p2 = p.Cancelled(() => {
56 var p2 = p.Cancelled(() => {
57 throw new ApplicationException("CANCELLED");
57 throw new ApplicationException("CANCELLED");
58 });
58 });
59
59
60 try {
60 try {
61 p2.Join();
61 p2.Join();
62 Assert.Fail();
62 Assert.Fail();
63 } catch (ApplicationException err) {
63 } catch (ApplicationException err) {
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
64 Assert.AreEqual("CANCELLED", err.InnerException.Message);
65 }
65 }
66
66
67 }
67 }
68
68
69 [TestMethod]
69 [TestMethod]
70 public void ContinueOnCancelTest() {
70 public void ContinueOnCancelTest() {
71 var p = new Promise<bool>();
71 var p = new Promise<bool>();
72 p.Cancel();
72 p.Cancel();
73
73
74 var p2 = p
74 var p2 = p
75 .Cancelled<bool>(() => {
75 .Cancelled<bool>(() => {
76 throw new ApplicationException("CANCELLED");
76 throw new ApplicationException("CANCELLED");
77 })
77 })
78 .Error(e => true);
78 .Error(e => true);
79
79
80 Assert.AreEqual(true, p2.Join());
80 Assert.AreEqual(true, p2.Join());
81 }
81 }
82
82
83 [TestMethod]
83 [TestMethod]
84 public void JoinSuccessTest() {
84 public void JoinSuccessTest() {
85 var p = new Promise<int>();
85 var p = new Promise<int>();
86 p.Resolve(100);
86 p.Resolve(100);
87 Assert.AreEqual(p.Join(), 100);
87 Assert.AreEqual(p.Join(), 100);
88 }
88 }
89
89
90 [TestMethod]
90 [TestMethod]
91 public void JoinFailTest() {
91 public void JoinFailTest() {
92 var p = new Promise<int>();
92 var p = new Promise<int>();
93 p.Reject(new ApplicationException("failed"));
93 p.Reject(new ApplicationException("failed"));
94
94
95 try {
95 try {
96 p.Join();
96 p.Join();
97 throw new ApplicationException("WRONG!");
97 throw new ApplicationException("WRONG!");
98 } catch (TargetInvocationException err) {
98 } catch (TargetInvocationException err) {
99 Assert.AreEqual(err.InnerException.Message, "failed");
99 Assert.AreEqual(err.InnerException.Message, "failed");
100 } catch {
100 } catch {
101 Assert.Fail("Got wrong excaption");
101 Assert.Fail("Got wrong excaption");
102 }
102 }
103 }
103 }
104
104
105 [TestMethod]
105 [TestMethod]
106 public void MapTest() {
106 public void MapTest() {
107 var p = new Promise<int>();
107 var p = new Promise<int>();
108
108
109 var p2 = p.Then(x => x.ToString());
109 var p2 = p.Then(x => x.ToString());
110 p.Resolve(100);
110 p.Resolve(100);
111
111
112 Assert.AreEqual(p2.Join(), "100");
112 Assert.AreEqual(p2.Join(), "100");
113 }
113 }
114
114
115 [TestMethod]
115 [TestMethod]
116 public void FixErrorTest() {
116 public void FixErrorTest() {
117 var p = new Promise<int>();
117 var p = new Promise<int>();
118
118
119 var p2 = p.Error(e => 101);
119 var p2 = p.Error(e => 101);
120
120
121 p.Reject(new Exception());
121 p.Reject(new Exception());
122
122
123 Assert.AreEqual(p2.Join(), 101);
123 Assert.AreEqual(p2.Join(), 101);
124 }
124 }
125
125
126 [TestMethod]
126 [TestMethod]
127 public void ChainTest() {
127 public void ChainTest() {
128 var p1 = new Promise<int>();
128 var p1 = new Promise<int>();
129
129
130 var p3 = p1.Chain(x => {
130 var p3 = p1.Chain(x => {
131 var p2 = new Promise<string>();
131 var p2 = new Promise<string>();
132 p2.Resolve(x.ToString());
132 p2.Resolve(x.ToString());
133 return p2;
133 return p2;
134 });
134 });
135
135
136 p1.Resolve(100);
136 p1.Resolve(100);
137
137
138 Assert.AreEqual(p3.Join(), "100");
138 Assert.AreEqual(p3.Join(), "100");
139 }
139 }
140
140
141 [TestMethod]
141 [TestMethod]
142 public void ChainFailTest() {
142 public void ChainFailTest() {
143 var p1 = new Promise<int>();
143 var p1 = new Promise<int>();
144
144
145 var p3 = p1.Chain(x => {
145 var p3 = p1.Chain(x => {
146 var p2 = new Promise<string>();
146 var p2 = new Promise<string>();
147 p2.Reject(new Exception("DIE!!!"));
147 p2.Reject(new Exception("DIE!!!"));
148 return p2;
148 return p2;
149 });
149 });
150
150
151 p1.Resolve(100);
151 p1.Resolve(100);
152
152
153 Assert.IsTrue(p3.IsResolved);
153 Assert.IsTrue(p3.IsResolved);
154 }
154 }
155
155
156 [TestMethod]
156 [TestMethod]
157 public void PoolTest() {
157 public void PoolTest() {
158 var pid = Thread.CurrentThread.ManagedThreadId;
158 var pid = Thread.CurrentThread.ManagedThreadId;
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
159 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
160
160
161 Assert.AreNotEqual(pid, p.Join());
161 Assert.AreNotEqual(pid, p.Join());
162 }
162 }
163
163
164 [TestMethod]
164 [TestMethod]
165 public void WorkerPoolSizeTest() {
165 public void WorkerPoolSizeTest() {
166 var pool = new WorkerPool(5, 10, 1);
166 var pool = new WorkerPool(5, 10, 1);
167
167
168 Assert.AreEqual(5, pool.PoolSize);
168 Assert.AreEqual(5, pool.PoolSize);
169
169
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
172 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
173
173
174 Assert.AreEqual(5, pool.PoolSize);
174 Assert.AreEqual(5, pool.PoolSize);
175
175
176 for (int i = 0; i < 100; i++)
176 for (int i = 0; i < 100; i++)
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
177 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
178 Thread.Sleep(200);
178 Thread.Sleep(200);
179 Assert.AreEqual(10, pool.PoolSize);
179 Assert.AreEqual(10, pool.PoolSize);
180
180
181 pool.Dispose();
181 pool.Dispose();
182 }
182 }
183
183
184 [TestMethod]
184 [TestMethod]
185 public void WorkerPoolCorrectTest() {
185 public void WorkerPoolCorrectTest() {
186 var pool = new WorkerPool(0,1000,100);
186 var pool = new WorkerPool(0,1000,100);
187
187
188 const int iterations = 1000;
188 const int iterations = 1000;
189 int pending = iterations;
189 int pending = iterations;
190 var stop = new ManualResetEvent(false);
190 var stop = new ManualResetEvent(false);
191
191
192 var count = 0;
192 var count = 0;
193 for (int i = 0; i < iterations; i++) {
193 for (int i = 0; i < iterations; i++) {
194 pool
194 pool
195 .Invoke(() => 1)
195 .Invoke(() => 1)
196 .Then(x => Interlocked.Add(ref count, x))
196 .Then(x => Interlocked.Add(ref count, x))
197 .Then(x => Math.Log10(x))
197 .Then(x => Math.Log10(x))
198 .On(() => {
198 .On(() => {
199 Interlocked.Decrement(ref pending);
199 Interlocked.Decrement(ref pending);
200 if (pending == 0)
200 if (pending == 0)
201 stop.Set();
201 stop.Set();
202 }, PromiseEventType.All);
202 }, PromiseEventType.All);
203 }
203 }
204
204
205 stop.WaitOne();
205 stop.WaitOne();
206
206
207 Assert.AreEqual(iterations, count);
207 Assert.AreEqual(iterations, count);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
208 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
209 pool.Dispose();
209 pool.Dispose();
210
210
211 }
211 }
212
212
213 [TestMethod]
213 [TestMethod]
214 public void WorkerPoolDisposeTest() {
214 public void WorkerPoolDisposeTest() {
215 var pool = new WorkerPool(5, 20);
215 var pool = new WorkerPool(5, 20);
216 Assert.AreEqual(5, pool.PoolSize);
216 Assert.AreEqual(5, pool.PoolSize);
217 pool.Dispose();
217 pool.Dispose();
218 Thread.Sleep(500);
218 Thread.Sleep(500);
219 Assert.AreEqual(0, pool.PoolSize);
219 Assert.AreEqual(0, pool.PoolSize);
220 pool.Dispose();
220 pool.Dispose();
221 }
221 }
222
222
223 [TestMethod]
223 [TestMethod]
224 public void MTQueueTest() {
224 public void MTQueueTest() {
225 var queue = new MTQueue<int>();
225 var queue = new MTQueue<int>();
226 int res;
226 int res;
227
227
228 queue.Enqueue(10);
228 queue.Enqueue(10);
229 Assert.IsTrue(queue.TryDequeue(out res));
229 Assert.IsTrue(queue.TryDequeue(out res));
230 Assert.AreEqual(10, res);
230 Assert.AreEqual(10, res);
231 Assert.IsFalse(queue.TryDequeue(out res));
231 Assert.IsFalse(queue.TryDequeue(out res));
232
232
233 for (int i = 0; i < 1000; i++)
233 for (int i = 0; i < 1000; i++)
234 queue.Enqueue(i);
234 queue.Enqueue(i);
235
235
236 for (int i = 0; i < 1000; i++) {
236 for (int i = 0; i < 1000; i++) {
237 queue.TryDequeue(out res);
237 queue.TryDequeue(out res);
238 Assert.AreEqual(i, res);
238 Assert.AreEqual(i, res);
239 }
239 }
240
240
241 int writers = 0;
241 int writers = 0;
242 int readers = 0;
242 int readers = 0;
243 var stop = new ManualResetEvent(false);
243 var stop = new ManualResetEvent(false);
244 int total = 0;
244 int total = 0;
245
245
246 const int itemsPerWriter = 10000;
246 const int itemsPerWriter = 10000;
247 const int writersCount = 10;
247 const int writersCount = 10;
248
248
249 for (int i = 0; i < writersCount; i++) {
249 for (int i = 0; i < writersCount; i++) {
250 Interlocked.Increment(ref writers);
250 Interlocked.Increment(ref writers);
251 AsyncPool
251 AsyncPool
252 .RunThread(() => {
252 .RunThread(() => {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 queue.Enqueue(1);
254 queue.Enqueue(1);
255 }
255 }
256 return 1;
256 return 1;
257 })
257 })
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
259 }
259 }
260
260
261 for (int i = 0; i < 10; i++) {
261 for (int i = 0; i < 10; i++) {
262 Interlocked.Increment(ref readers);
262 Interlocked.Increment(ref readers);
263 AsyncPool
263 AsyncPool
264 .RunThread(() => {
264 .RunThread(() => {
265 int t;
265 int t;
266 do {
266 do {
267 while (queue.TryDequeue(out t))
267 while (queue.TryDequeue(out t))
268 Interlocked.Add(ref total, t);
268 Interlocked.Add(ref total, t);
269 } while (writers > 0);
269 } while (writers > 0);
270 return 1;
270 return 1;
271 })
271 })
272 .On(() => {
272 .On(() => {
273 Interlocked.Decrement(ref readers);
273 Interlocked.Decrement(ref readers);
274 if (readers == 0)
274 if (readers == 0)
275 stop.Set();
275 stop.Set();
276 }, PromiseEventType.All);
276 }, PromiseEventType.All);
277 }
277 }
278
278
279 stop.WaitOne();
279 stop.WaitOne();
280
280
281 Assert.AreEqual(100000, total);
281 Assert.AreEqual(100000, total);
282 }
282 }
283
283
284 [TestMethod]
284 [TestMethod]
285 public void AsyncQueueTest() {
285 public void AsyncQueueTest() {
286 var queue = new AsyncQueue<int>();
286 var queue = new AsyncQueue<int>();
287 int res;
287 int res;
288
288
289 queue.Enqueue(10);
289 queue.Enqueue(10);
290 Assert.IsTrue(queue.TryDequeue(out res));
290 Assert.IsTrue(queue.TryDequeue(out res));
291 Assert.AreEqual(10, res);
291 Assert.AreEqual(10, res);
292 Assert.IsFalse(queue.TryDequeue(out res));
292 Assert.IsFalse(queue.TryDequeue(out res));
293
293
294 for (int i = 0; i < 1000; i++)
294 for (int i = 0; i < 1000; i++)
295 queue.Enqueue(i);
295 queue.Enqueue(i);
296
296
297 for (int i = 0; i < 1000; i++) {
297 for (int i = 0; i < 1000; i++) {
298 queue.TryDequeue(out res);
298 queue.TryDequeue(out res);
299 Assert.AreEqual(i, res);
299 Assert.AreEqual(i, res);
300 }
300 }
301
301
302 const int count = 10000000;
302 const int count = 10000000;
303
303
304 int res1 = 0, res2 = 0;
304 int res1 = 0, res2 = 0;
305 var t1 = Environment.TickCount;
305 var t1 = Environment.TickCount;
306
306
307 AsyncPool.RunThread(
307 AsyncPool.RunThread(
308 () => {
308 () => {
309 for (var i = 0; i < count; i++)
309 for (var i = 0; i < count; i++)
310 queue.Enqueue(1);
310 queue.Enqueue(1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
312 },
312 },
313 () => {
313 () => {
314 for (var i = 0; i < count; i++)
314 for (var i = 0; i < count; i++)
315 queue.Enqueue(2);
315 queue.Enqueue(2);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
317 },
317 },
318 () => {
318 () => {
319 int temp;
319 int temp;
320 int i = 0;
320 int i = 0;
321 while (i < count)
321 while (i < count)
322 if (queue.TryDequeue(out temp)) {
322 if (queue.TryDequeue(out temp)) {
323 i++;
323 i++;
324 res1 += temp;
324 res1 += temp;
325 }
325 }
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
327 },
327 },
328 () => {
328 () => {
329 int temp;
329 int temp;
330 int i = 0;
330 int i = 0;
331 while (i < count)
331 while (i < count)
332 if (queue.TryDequeue(out temp)) {
332 if (queue.TryDequeue(out temp)) {
333 i++;
333 i++;
334 res2 += temp;
334 res2 += temp;
335 }
335 }
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 }
337 }
338 )
338 )
339 .Bundle()
339 .Bundle()
340 .Join();
340 .Join();
341
341
342 Assert.AreEqual(count * 3, res1 + res2);
342 Assert.AreEqual(count * 3, res1 + res2);
343
343
344 Console.WriteLine(
344 Console.WriteLine(
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
346 Environment.TickCount - t1,
346 Environment.TickCount - t1,
347 res1,
347 res1,
348 res2,
348 res2,
349 res1 + res2,
349 res1 + res2,
350 count
350 count
351 );
351 );
352 }
352 }
353
353
354 [TestMethod]
354 [TestMethod]
355 public void AsyncQueueBatchTest() {
355 public void AsyncQueueBatchTest() {
356 var queue = new AsyncQueue<int>();
356 var queue = new AsyncQueue<int>();
357
357
358 const int wBatch = 29;
358 const int wBatch = 29;
359 const int wCount = 400000;
359 const int wCount = 400000;
360 const int total = wBatch * wCount * 2;
360 const int total = wBatch * wCount * 2;
361 const int summ = wBatch * wCount * 3;
361 const int summ = wBatch * wCount * 3;
362
362
363 int r1 = 0, r2 = 0;
363 int r1 = 0, r2 = 0;
364 const int rBatch = 111;
364 const int rBatch = 111;
365 int read = 0;
365 int read = 0;
366
366
367 var t1 = Environment.TickCount;
367 var t1 = Environment.TickCount;
368
368
369 AsyncPool.RunThread(
369 AsyncPool.RunThread(
370 () => {
370 () => {
371 var buffer = new int[wBatch];
371 var buffer = new int[wBatch];
372 for(int i = 0; i<wBatch; i++)
372 for(int i = 0; i<wBatch; i++)
373 buffer[i] = 1;
373 buffer[i] = 1;
374
374
375 for(int i =0; i < wCount; i++)
375 for(int i =0; i < wCount; i++)
376 queue.EnqueueRange(buffer,0,wBatch);
376 queue.EnqueueRange(buffer,0,wBatch);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
378 },
378 },
379 () => {
379 () => {
380 var buffer = new int[wBatch];
380 var buffer = new int[wBatch];
381 for(int i = 0; i<wBatch; i++)
381 for(int i = 0; i<wBatch; i++)
382 buffer[i] = 2;
382 buffer[i] = 2;
383
383
384 for(int i =0; i < wCount; i++)
384 for(int i =0; i < wCount; i++)
385 queue.EnqueueRange(buffer,0,wBatch);
385 queue.EnqueueRange(buffer,0,wBatch);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
387 },
387 },
388 () => {
388 () => {
389 var buffer = new int[rBatch];
389 var buffer = new int[rBatch];
390
390
391 while(read < total) {
391 while(read < total) {
392 int actual;
392 int actual;
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
394 for(int i=0; i< actual; i++)
394 for(int i=0; i< actual; i++)
395 r1 += buffer[i];
395 r1 += buffer[i];
396 Interlocked.Add(ref read, actual);
396 Interlocked.Add(ref read, actual);
397 }
397 }
398 }
398 }
399
399
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
401 },
401 },
402 () => {
402 () => {
403 var buffer = new int[rBatch];
403 var buffer = new int[rBatch];
404
404
405 while(read < total) {
405 while(read < total) {
406 int actual;
406 int actual;
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
408 for(int i=0; i< actual; i++)
408 for(int i=0; i< actual; i++)
409 r2 += buffer[i];
409 r2 += buffer[i];
410 Interlocked.Add(ref read, actual);
410 Interlocked.Add(ref read, actual);
411 }
411 }
412 }
412 }
413
413
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 }
415 }
416 )
416 )
417 .Bundle()
417 .Bundle()
418 .Join();
418 .Join();
419
419
420 Assert.AreEqual(summ , r1 + r2);
420 Assert.AreEqual(summ , r1 + r2);
421
421
422 Console.WriteLine(
422 Console.WriteLine(
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
424 Environment.TickCount - t1,
424 Environment.TickCount - t1,
425 r1,
425 r1,
426 r2,
426 r2,
427 r1 + r2,
427 r1 + r2,
428 total
428 total
429 );
429 );
430 }
430 }
431
431
432 [TestMethod]
432 [TestMethod]
433 public void AsyncQueueChunkDequeueTest() {
433 public void AsyncQueueChunkDequeueTest() {
434 var queue = new AsyncQueue<int>();
434 var queue = new AsyncQueue<int>();
435
435
436 const int wBatch = 31;
436 const int wBatch = 31;
437 const int wCount = 200000;
437 const int wCount = 200000;
438 const int total = wBatch * wCount * 3;
438 const int total = wBatch * wCount * 3;
439 const int summ = wBatch * wCount * 6;
439 const int summ = wBatch * wCount * 6;
440
440
441 int r1 = 0, r2 = 0;
441 int r1 = 0, r2 = 0;
442 const int rBatch = 1024;
442 const int rBatch = 1024;
443 int read = 0;
443 int read = 0;
444
444
445 var t1 = Environment.TickCount;
445 var t1 = Environment.TickCount;
446
446
447 AsyncPool.RunThread(
447 AsyncPool.RunThread(
448 () => {
448 () => {
449 var buffer = new int[wBatch];
449 var buffer = new int[wBatch];
450 for(int i = 0; i<wBatch; i++)
450 for(int i = 0; i<wBatch; i++)
451 buffer[i] = 1;
451 buffer[i] = 1;
452
452
453 for(int i =0; i < wCount; i++)
453 for(int i =0; i < wCount; i++)
454 queue.EnqueueRange(buffer,0,wBatch);
454 queue.EnqueueRange(buffer,0,wBatch);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
455 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
456 },
456 },
457 () => {
457 () => {
458 var buffer = new int[wBatch];
458 var buffer = new int[wBatch];
459 for(int i = 0; i<wBatch; i++)
459 for(int i = 0; i<wBatch; i++)
460 buffer[i] = 2;
460 buffer[i] = 2;
461
461
462 for(int i =0; i < wCount; i++)
462 for(int i =0; i < wCount; i++)
463 queue.EnqueueRange(buffer,0,wBatch);
463 queue.EnqueueRange(buffer,0,wBatch);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
464 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
465 },
465 },
466 () => {
466 () => {
467 var buffer = new int[wBatch];
467 var buffer = new int[wBatch];
468 for(int i = 0; i<wBatch; i++)
468 for(int i = 0; i<wBatch; i++)
469 buffer[i] = 3;
469 buffer[i] = 3;
470
470
471 for(int i =0; i < wCount; i++)
471 for(int i =0; i < wCount; i++)
472 queue.EnqueueRange(buffer,0,wBatch);
472 queue.EnqueueRange(buffer,0,wBatch);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
473 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
474 },
474 },
475 () => {
475 () => {
476 var buffer = new int[rBatch];
476 var buffer = new int[rBatch];
477 int count = 1;
477 int count = 1;
478 double avgchunk = 0;
478 double avgchunk = 0;
479 while(read < total) {
479 while(read < total) {
480 int actual;
480 int actual;
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
481 if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
482 for(int i=0; i< actual; i++)
482 for(int i=0; i< actual; i++)
483 r2 += buffer[i];
483 r2 += buffer[i];
484 Interlocked.Add(ref read, actual);
484 Interlocked.Add(ref read, actual);
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
485 avgchunk = avgchunk*(count-1)/count + actual/(double)count;
486 count ++;
486 count ++;
487 }
487 }
488 }
488 }
489
489
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 }
491 }
492 )
492 )
493 .Bundle()
493 .Bundle()
494 .Join();
494 .Join();
495
495
496 Assert.AreEqual(summ , r1 + r2);
496 Assert.AreEqual(summ , r1 + r2);
497
497
498 Console.WriteLine(
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
500 Environment.TickCount - t1,
501 r1,
501 r1,
502 r2,
502 r2,
503 r1 + r2,
503 r1 + r2,
504 total
504 total
505 );
505 );
506 }
506 }
507
507
508 [TestMethod]
508 [TestMethod]
509 public void AsyncQueueDrainTest() {
509 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>();
510 var queue = new AsyncQueue<int>();
511
511
512 const int wBatch = 11;
512 const int wBatch = 11;
513 const int wCount = 200000;
513 const int wCount = 200000;
514 const int total = wBatch * wCount * 3;
514 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
516
516
517 int r1 = 0, r2 = 0;
517 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
518 const int rBatch = 11;
519 int read = 0;
519 int read = 0;
520
520
521 var t1 = Environment.TickCount;
521 var t1 = Environment.TickCount;
522
522
523 AsyncPool.RunThread(
523 AsyncPool.RunThread(
524 () => {
524 () => {
525 var buffer = new int[wBatch];
525 var buffer = new int[wBatch];
526 for(int i = 0; i<wBatch; i++)
526 for(int i = 0; i<wBatch; i++)
527 buffer[i] = 1;
527 buffer[i] = 1;
528
528
529 for(int i =0; i < wCount; i++)
529 for(int i =0; i < wCount; i++)
530 queue.EnqueueRange(buffer,0,wBatch);
530 queue.EnqueueRange(buffer,0,wBatch);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 },
532 },
533 () => {
533 () => {
534 for(int i =0; i < wCount * wBatch; i++)
534 for(int i =0; i < wCount * wBatch; i++)
535 queue.Enqueue(1);
535 queue.Enqueue(1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 },
537 },
538 () => {
538 () => {
539 var buffer = new int[wBatch];
539 var buffer = new int[wBatch];
540 for(int i = 0; i<wBatch; i++)
540 for(int i = 0; i<wBatch; i++)
541 buffer[i] = 1;
541 buffer[i] = 1;
542
542
543 for(int i =0; i < wCount; i++)
543 for(int i =0; i < wCount; i++)
544 queue.EnqueueRange(buffer,0,wBatch);
544 queue.EnqueueRange(buffer,0,wBatch);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 },
546 },
547 /*() => {
547 /*() => {
548 int temp;
548 int temp;
549 int count = 0;
549 int count = 0;
550 while (read < total)
550 while (read < total)
551 if (queue.TryDequeue(out temp)) {
551 if (queue.TryDequeue(out temp)) {
552 count++;
552 count++;
553 r1 += temp;
553 r1 += temp;
554 Interlocked.Increment(ref read);
554 Interlocked.Increment(ref read);
555 }
555 }
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 },*/
557 },*/
558 /*() => {
558 /*() => {
559 var buffer = new int[rBatch];
559 var buffer = new int[rBatch];
560 var count = 0;
560 var count = 0;
561 while(read < total) {
561 while(read < total) {
562 int actual;
562 int actual;
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 for(int i=0; i< actual; i++)
564 for(int i=0; i< actual; i++)
565 r1 += buffer[i];
565 r1 += buffer[i];
566 Interlocked.Add(ref read, actual);
566 Interlocked.Add(ref read, actual);
567 count += actual;
567 count += actual;
568 }
568 }
569 }
569 }
570
570
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 },*/
572 },*/
573 () => {
573 () => {
574 var count = 0;
574 var count = 0;
575 while(read < total) {
575 while(read < total) {
576 var buffer = queue.Drain();
576 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
577 for(int i=0; i< buffer.Length; i++)
578 r1 += buffer[i];
578 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
579 Interlocked.Add(ref read, buffer.Length);
580 count += buffer.Length;
580 count += buffer.Length;
581 }
581 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 },
583 },
584 () => {
584 () => {
585 var count = 0;
585 var count = 0;
586 while(read < total) {
586 while(read < total) {
587 var buffer = queue.Drain();
587 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
588 for(int i=0; i< buffer.Length; i++)
589 r2 += buffer[i];
589 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
590 Interlocked.Add(ref read, buffer.Length);
591 count += buffer.Length;
591 count += buffer.Length;
592 }
592 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 }
594 }
595 )
595 )
596 .Bundle()
596 .Bundle()
597 .Join();
597 .Join();
598
598
599 Assert.AreEqual(summ , r1 + r2);
599 Assert.AreEqual(summ , r1 + r2);
600
600
601 Console.WriteLine(
601 Console.WriteLine(
602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
602 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
603 Environment.TickCount - t1,
603 Environment.TickCount - t1,
604 r1,
604 r1,
605 r2,
605 r2,
606 r1 + r2,
606 r1 + r2,
607 total
607 total
608 );
608 );
609 }
609 }
610
610
611 [TestMethod]
611 [TestMethod]
612 public void ParallelMapTest() {
612 public void ParallelMapTest() {
613
613
614 const int count = 100000;
614 const int count = 100000;
615
615
616 var args = new double[count];
616 var args = new double[count];
617 var rand = new Random();
617 var rand = new Random();
618
618
619 for (int i = 0; i < count; i++)
619 for (int i = 0; i < count; i++)
620 args[i] = rand.NextDouble();
620 args[i] = rand.NextDouble();
621
621
622 var t = Environment.TickCount;
622 var t = Environment.TickCount;
623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
623 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
624
624
625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
625 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
626
626
627 t = Environment.TickCount;
627 t = Environment.TickCount;
628 for (int i = 0; i < count; i++)
628 for (int i = 0; i < count; i++)
629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
629 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
630 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
631 }
631 }
632
632
633 [TestMethod]
633 [TestMethod]
634 public void ChainedMapTest() {
634 public void ChainedMapTest() {
635
635
636 using (var pool = new WorkerPool()) {
636 using (var pool = new WorkerPool()) {
637 const int count = 10000;
637 const int count = 10000;
638
638
639 var args = new double[count];
639 var args = new double[count];
640 var rand = new Random();
640 var rand = new Random();
641
641
642 for (int i = 0; i < count; i++)
642 for (int i = 0; i < count; i++)
643 args[i] = rand.NextDouble();
643 args[i] = rand.NextDouble();
644
644
645 var t = Environment.TickCount;
645 var t = Environment.TickCount;
646 var res = args
646 var res = args
647 .ChainedMap(
647 .ChainedMap(
648 // Analysis disable once AccessToDisposedClosure
648 // Analysis disable once AccessToDisposedClosure
649 x => pool.Invoke(
649 x => pool.Invoke(
650 () => Math.Sin(x * x)
650 () => Math.Sin(x * x)
651 ),
651 ),
652 4
652 4
653 )
653 )
654 .Join();
654 .Join();
655
655
656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
656 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
657
657
658 t = Environment.TickCount;
658 t = Environment.TickCount;
659 for (int i = 0; i < count; i++)
659 for (int i = 0; i < count; i++)
660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
660 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
661 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
662 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
663 }
663 }
664 }
664 }
665
665
666 [TestMethod]
666 [TestMethod]
667 public void ParallelForEachTest() {
667 public void ParallelForEachTest() {
668
668
669 const int count = 100000;
669 const int count = 100000;
670
670
671 var args = new int[count];
671 var args = new int[count];
672 var rand = new Random();
672 var rand = new Random();
673
673
674 for (int i = 0; i < count; i++)
674 for (int i = 0; i < count; i++)
675 args[i] = (int)(rand.NextDouble() * 100);
675 args[i] = (int)(rand.NextDouble() * 100);
676
676
677 int result = 0;
677 int result = 0;
678
678
679 var t = Environment.TickCount;
679 var t = Environment.TickCount;
680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
680 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
681
681
682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
682 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
683
683
684 int result2 = 0;
684 int result2 = 0;
685
685
686 t = Environment.TickCount;
686 t = Environment.TickCount;
687 for (int i = 0; i < count; i++)
687 for (int i = 0; i < count; i++)
688 result2 += args[i];
688 result2 += args[i];
689 Assert.AreEqual(result2, result);
689 Assert.AreEqual(result2, result);
690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
690 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
691 }
691 }
692
692
693 [TestMethod]
693 [TestMethod]
694 public void ComplexCase1Test() {
694 public void ComplexCase1Test() {
695 var flags = new bool[3];
695 var flags = new bool[3];
696
696
697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
698
698
699 var step1 = PromiseHelper
699 var step1 = PromiseHelper
700 .Sleep(200, "Alan")
700 .Sleep(200, "Alan")
701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
702 var p = step1
702 var p = step1
703 .Chain(x =>
703 .Chain(x =>
704 PromiseHelper
704 PromiseHelper
705 .Sleep(200, "Hi, " + x)
705 .Sleep(200, "Hi, " + x)
706 .Then(y => y)
706 .Then(y => y)
707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
708 )
708 )
709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
710 step1.Join();
710 step1.Join();
711 p.Cancel();
711 p.Cancel();
712 try {
712 try {
713 Assert.AreEqual(p.Join(), "Hi, Alan");
713 Assert.AreEqual(p.Join(), "Hi, Alan");
714 Assert.Fail("Shouldn't get here");
714 Assert.Fail("Shouldn't get here");
715 } catch (OperationCanceledException) {
715 } catch (OperationCanceledException) {
716 }
716 }
717
717
718 Assert.IsFalse(flags[0]);
718 Assert.IsFalse(flags[0]);
719 Assert.IsTrue(flags[1]);
719 Assert.IsTrue(flags[1]);
720 Assert.IsTrue(flags[2]);
720 Assert.IsTrue(flags[2]);
721 }
721 }
722
722
723 [TestMethod]
723 [TestMethod]
724 public void ChainedCancel1Test() {
724 public void ChainedCancel1Test() {
725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
725 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ сцСплСнной асинхронной ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ всС ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠ΅ Π΄ΠΎΠ»ΠΆΠ½ΠΎ
726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
726 // Π·Π°Π²Π΅Ρ€ΡˆΠ°Ρ‚ΡŒΡΡ ошибкой OperationCanceledException
727 var p = PromiseHelper
727 var p = PromiseHelper
728 .Sleep(1, "Hi, HAL!")
728 .Sleep(1, "Hi, HAL!")
729 .Then(x => {
729 .Then(x => {
730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
730 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
732 // вторая опСрация отмСняСт ΠΏΠ΅Ρ€Π²ΡƒΡŽ Π΄ΠΎ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΡ
733 PromiseHelper
733 PromiseHelper
734 .Sleep(100, "HAL, STOP!")
734 .Sleep(100, "HAL, STOP!")
735 .Then(result.Cancel);
735 .Then(result.Cancel);
736 return result;
736 return result;
737 });
737 });
738 try {
738 try {
739 p.Join();
739 p.Join();
740 } catch (TargetInvocationException err) {
740 } catch (TargetInvocationException err) {
741 Assert.IsTrue(err.InnerException is OperationCanceledException);
741 Assert.IsTrue(err.InnerException is OperationCanceledException);
742 }
742 }
743 }
743 }
744
744
745 [TestMethod]
745 [TestMethod]
746 public void ChainedCancel2Test() {
746 public void ChainedCancel2Test() {
747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
747 // ΠΏΡ€ΠΈ ΠΎΡ‚ΠΌΠ΅Π½Π΅ Ρ†Π΅ΠΏΠΎΡ‡ΠΊΠΈ ΠΎΠ±Π΅Ρ‰Π°Π½ΠΈΠΉ, Π²Π»ΠΎΠΆΠ΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Ρ‚Π°ΠΊΠΆΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ ΠΎΡ‚ΠΌΠ΅Π½ΡΡ‚ΡŒΡΡ
748 var pSurvive = new Promise<bool>();
748 var pSurvive = new Promise<bool>();
749 var hemStarted = new ManualResetEvent(false);
749 var hemStarted = new ManualResetEvent(false);
750 var p = PromiseHelper
750 var p = PromiseHelper
751 .Sleep(1, "Hi, HAL!")
751 .Sleep(1, "Hi, HAL!")
752 .Chain(x => {
752 .Chain(x => {
753 hemStarted.Set();
753 hemStarted.Set();
754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
754 // запускаСм Π΄Π²Π΅ асинхронныС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ
755 var result = PromiseHelper
755 var result = PromiseHelper
756 .Sleep(100000000, "HEM ENABLED!!!")
756 .Sleep(100000000, "HEM ENABLED!!!")
757 .Then(s => {
757 .Then(s => {
758 pSurvive.Resolve(false);
758 pSurvive.Resolve(false);
759 return s;
759 return s;
760 });
760 });
761
761
762 result
762 result
763 .Cancelled(() => pSurvive.Resolve(true));
763 .Cancelled(() => pSurvive.Resolve(true));
764
764
765 return result;
765 return result;
766 });
766 });
767
767
768 hemStarted.WaitOne();
768 hemStarted.WaitOne();
769 p.Cancel();
769 p.Cancel();
770
770
771 try {
771 try {
772 p.Join();
772 p.Join();
773 } catch (OperationCanceledException) {
773 } catch (OperationCanceledException) {
774 Assert.IsTrue(pSurvive.Join());
774 Assert.IsTrue(pSurvive.Join());
775 }
775 }
776 }
776 }
777
778 [TestMethod]
779 public void SharedLockTest() {
780 var l = new SharedLock();
781 int shared = 0;
782 int exclusive = 0;
783 var s1 = new Signal();
784 var log = new AsyncQueue<string>();
785
786 try {
787 AsyncPool.RunThread(
788 () => {
789 log.Enqueue("Reader #1 started");
790 try {
791 l.LockShared();
792 log.Enqueue("Reader #1 lock got");
793 if (Interlocked.Increment(ref shared) == 2)
794 s1.Set();
795 s1.Wait();
796 log.Enqueue("Reader #1 finished");
797 Interlocked.Decrement(ref shared);
798 } finally {
799 l.Release();
800 log.Enqueue("Reader #1 lock released");
801 }
802 },
803 () => {
804 log.Enqueue("Reader #2 started");
805
806 try {
807 l.LockShared();
808 log.Enqueue("Reader #2 lock got");
809
810 if (Interlocked.Increment(ref shared) == 2)
811 s1.Set();
812 s1.Wait();
813 log.Enqueue("Reader #2 upgrading to writer");
814 Interlocked.Decrement(ref shared);
815 l.Upgrade();
816 log.Enqueue("Reader #2 upgraded");
817
818 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
819 Assert.AreEqual(0, shared);
820 log.Enqueue("Reader #2 finished");
821 Interlocked.Decrement(ref exclusive);
822 } finally {
823 l.Release();
824 log.Enqueue("Reader #2 lock released");
825 }
826 },
827 () => {
828 log.Enqueue("Writer #1 started");
829 try {
830 l.LockExclusive();
831 log.Enqueue("Writer #1 got the lock");
832 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
833 Interlocked.Decrement(ref exclusive);
834 log.Enqueue("Writer #1 is finished");
835 } finally {
836 l.Release();
837 log.Enqueue("Writer #1 lock released");
838 }
839 }
840 ).Bundle().Join(1000);
841 log.Enqueue("Done");
842 } catch(Exception error) {
843 log.Enqueue(error.Message);
844 throw;
845 } finally {
846 foreach (var m in log)
847 Console.WriteLine(m);
848 }
849 }
777 }
850 }
778 }
851 }
779
852
@@ -1,76 +1,202
1 using System;
1 using System;
2 using System.Threading;
2 using System.Threading;
3 using System.Diagnostics;
3 using System.Diagnostics;
4
4
5 namespace Implab.Parallels {
5 namespace Implab.Parallels {
6 /// <summary>
6 /// <summary>
7 /// Implements a lightweight mechanism to aquire a shared or an exclusive lock.
7 /// Implements a lightweight mechanism to aquire a shared or an exclusive lock.
8 /// </summary>
8 /// </summary>
9 public class SharedLock {
9 public class SharedLock {
10 readonly object m_lock = new object();
10 readonly object m_lock = new object();
11 // the count of locks currently acquired by clients
11 int m_locks;
12 int m_locks;
13 // the count of pending requests for upgrade
14 int m_upgrades;
12 bool m_exclusive;
15 bool m_exclusive;
13
16
14 public bool LockExclusive(int timeout) {
17 public bool LockExclusive(int timeout) {
15 lock (m_lock) {
18 lock (m_lock) {
16 if (m_locks > 0 && !Monitor.Wait(m_lock, timeout))
19 var dt = timeout;
17 return false;
20 if (m_locks > m_upgrades) {
21 var t1 = Environment.TickCount;
22 do {
23 if (!Monitor.Wait(m_lock, timeout))
24 return false;
25
26 if (m_locks == m_upgrades)
27 break;
28
29 if (timeout > 0) {
30 dt = timeout - Environment.TickCount + t1;
31 if (dt < 0)
32 return false;
33 }
34 } while(true);
35 }
18 m_exclusive = true;
36 m_exclusive = true;
19 m_locks = 1;
37 m_locks ++;
20 return true;
38 return true;
21 }
39 }
22 }
40 }
23
41
24 public void LockExclusive() {
42 public void LockExclusive() {
25 LockExclusive(-1);
43 lock (m_lock) {
44
45 while (m_locks > m_upgrades)
46 Monitor.Wait(m_lock);
47
48 m_exclusive = true;
49 m_locks ++;
50 }
26 }
51 }
27
52
53 /// <summary>
54 /// Acquires a shared lock.
55 /// </summary>
56 /// <returns><c>true</c>, if the shared lock was acquired, <c>false</c> if the specified timeout was expired.</returns>
57 /// <param name="timeout">Timeout.</param>
28 public bool LockShared(int timeout) {
58 public bool LockShared(int timeout) {
29 lock (m_lock) {
59 lock (m_lock) {
30 if (!m_exclusive) {
60 if (!m_exclusive) {
31 m_locks++;
61 m_locks++;
32 return true;
62 return true;
33 }
63 }
34
64
35 if (m_locks == 0) {
65 if (m_locks == m_upgrades) {
36 m_exclusive = false;
66 m_exclusive = false;
37 m_locks = 1;
67 m_locks = 1;
38 return true;
68 return true;
39 }
69 }
40
70
41 if (Monitor.Wait(m_lock, timeout)) {
71 var t1 = Environment.TickCount;
42 Debug.Assert(m_locks == 0);
72 var dt = timeout;
43 m_locks = 1;
73 do {
74 if (!Monitor.Wait(m_lock, dt))
75 return false;
76
77 if (m_locks == m_upgrades || !m_exclusive)
78 break;
79
80 if (timeout >= 0) {
81 dt = timeout - Environment.TickCount + t1;
82 if (dt < 0)
83 return false;
84 }
85 } while(true);
86
87 m_locks ++;
88 m_exclusive = false;
89 return true;
90 }
91 }
92
93 /// <summary>
94 /// Acquires the shared lock.
95 /// </summary>
96 public void LockShared() {
97 lock (m_lock) {
98 if (!m_exclusive) {
99 m_locks++;
100 } else if (m_locks == m_upgrades) {
44 m_exclusive = false;
101 m_exclusive = false;
45 return true;
102 m_locks++;
103 } else {
104 while (m_exclusive && m_locks > m_upgrades)
105 Monitor.Wait(m_lock);
106
107 m_locks++;
108 m_exclusive = false;
46 }
109 }
47 return false;
48 }
110 }
49 }
111 }
50
112
51 public void LockShared() {
113 /// <summary>
52 LockShared(-1);
114 /// Upgrades the current lock to exclusive level.
53 }
115 /// </summary>
54
116 /// <remarks>If the current lock is exclusive already the method does nothing.</remarks>
55 public void ReleaseShared() {
117 public void Upgrade() {
56 lock (m_lock) {
118 lock (m_lock) {
57 if (m_exclusive || m_locks <= 0)
119 if (!m_exclusive) {
58 throw new InvalidOperationException();
120
59 m_locks--;
121 if (m_locks <= m_upgrades)
60 if (m_locks == 0)
122 throw new InvalidOperationException();
61 Monitor.PulseAll(m_lock);
123
124 if (m_locks - m_upgrades == 1) {
125 m_exclusive = true;
126 } else {
127 m_upgrades++;
128
129 while (m_locks > m_upgrades)
130 Monitor.Wait(m_lock);
131
132 m_upgrades--;
133 m_exclusive = true;
134 }
135 }
62 }
136 }
63 }
137 }
64
138
65 public void ReleaseExclusive() {
139 /// <summary>
140 /// Upgrades the current lock to exclusive level.
141 /// </summary>
142 /// <param name="timeout">Timeout.</param>
143 /// <returns><c>true</c> if the current lock was updated, <c>false</c> the specified timeout was expired.</returns>
144 /// <remarks>If the current lock is exclusive already the method does nothing.</remarks>
145 public bool Upgrade(int timeout) {
66 lock (m_lock) {
146 lock (m_lock) {
67 if (!m_exclusive && m_locks != 1)
147 if (m_exclusive)
148 return true;
149 if (m_locks <= m_upgrades)
68 throw new InvalidOperationException();
150 throw new InvalidOperationException();
69 m_locks = 0;
151
70 Monitor.PulseAll(m_lock);
152 if (m_locks - m_upgrades == 1) {
153 m_exclusive = true;
154 } else {
155 var t1 = Environment.TickCount;
156 var dt = timeout;
157 m_upgrades++;
158 do {
159 if (!Monitor.Wait(m_lock, dt)) {
160 m_upgrades--;
161 return false;
162 }
163
164 // we may get there but the shared lock already aquired
165 if (m_locks == m_upgrades)
166 break;
167
168 if (timeout >= 0) {
169 dt = timeout - Environment.TickCount + t1;
170 if (dt < 0) {
171 m_upgrades--;
172 return false;
173 }
174 }
175 } while(true);
176 m_upgrades--;
177 m_exclusive = true;
178 }
179 return true;
71 }
180 }
72 }
181 }
73
182
183 /// <summary>
184 /// Downgrades this lock to shared level.
185 /// </summary>
186 public void Downgrade() {
187 lock (m_lock)
188 m_exclusive = false;
189 }
190
191 /// <summary>
192 /// Releases the current lock.
193 /// </summary>
194 public void Release() {
195 lock (m_lock)
196 // if no more running threads left
197 if (--m_locks == m_upgrades)
198 Monitor.PulseAll(m_lock);
199 }
74 }
200 }
75 }
201 }
76
202
@@ -1,187 +1,192
1 using System.Threading;
1 using System.Threading;
2 using System;
2 using System;
3 using Implab.Diagnostics;
3 using Implab.Diagnostics;
4 using System.Collections.Generic;
4 using System.Collections.Generic;
5
5
6
6
7 #if NET_4_5
7 #if NET_4_5
8 using System.Threading.Tasks;
8 using System.Threading.Tasks;
9 #endif
9 #endif
10
10
11 namespace Implab {
11 namespace Implab {
12 public static class PromiseExtensions {
12 public static class PromiseExtensions {
13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
14 Safe.ArgumentNotNull(that, "that");
14 Safe.ArgumentNotNull(that, "that");
15 var context = SynchronizationContext.Current;
15 var context = SynchronizationContext.Current;
16 if (context == null)
16 if (context == null)
17 return that;
17 return that;
18
18
19 var p = new SyncContextPromise<T>(context);
19 var p = new SyncContextPromise<T>(context);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
20 p.On(that.Cancel, PromiseEventType.Cancelled);
21
21
22 that.On(
22 that.On(
23 p.Resolve,
23 p.Resolve,
24 p.Reject,
24 p.Reject,
25 p.Cancel
25 p.Cancel
26 );
26 );
27 return p;
27 return p;
28 }
28 }
29
29
30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
31 Safe.ArgumentNotNull(that, "that");
31 Safe.ArgumentNotNull(that, "that");
32 Safe.ArgumentNotNull(context, "context");
32 Safe.ArgumentNotNull(context, "context");
33
33
34 var p = new SyncContextPromise<T>(context);
34 var p = new SyncContextPromise<T>(context);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
35 p.On(that.Cancel, PromiseEventType.Cancelled);
36
36
37
37
38 that.On(
38 that.On(
39 p.Resolve,
39 p.Resolve,
40 p.Reject,
40 p.Reject,
41 p.Cancel
41 p.Cancel
42 );
42 );
43 return p;
43 return p;
44 }
44 }
45
45
46 /// <summary>
46 /// <summary>
47 /// Ensures the dispatched.
47 /// Ensures the dispatched.
48 /// </summary>
48 /// </summary>
49 /// <returns>The dispatched.</returns>
49 /// <returns>The dispatched.</returns>
50 /// <param name="that">That.</param>
50 /// <param name="that">That.</param>
51 /// <param name="head">Head.</param>
51 /// <param name="head">Head.</param>
52 /// <param name="cleanup">Cleanup.</param>
52 /// <param name="cleanup">Cleanup.</param>
53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
54 /// <typeparam name="T">The 2nd type parameter.</typeparam>
55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
56 Safe.ArgumentNotNull(that, "that");
56 Safe.ArgumentNotNull(that, "that");
57 Safe.ArgumentNotNull(head, "head");
57 Safe.ArgumentNotNull(head, "head");
58
58
59 that.On(null,null,() => head.On(cleanup));
59 that.On(null,null,() => head.On(cleanup));
60
60
61 return that;
61 return that;
62 }
62 }
63
63
64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
65 Safe.ArgumentNotNull(that, "that");
65 Safe.ArgumentNotNull(that, "that");
66 Safe.ArgumentNotNull(callback, "callback");
66 Safe.ArgumentNotNull(callback, "callback");
67 var op = TraceContext.Instance.CurrentOperation;
67 var op = TraceContext.Instance.CurrentOperation;
68 return ar => {
68 return ar => {
69 TraceContext.Instance.EnterLogicalOperation(op,false);
69 TraceContext.Instance.EnterLogicalOperation(op,false);
70 try {
70 try {
71 that.Resolve(callback(ar));
71 that.Resolve(callback(ar));
72 } catch (Exception err) {
72 } catch (Exception err) {
73 that.Reject(err);
73 that.Reject(err);
74 } finally {
74 } finally {
75 TraceContext.Instance.Leave();
75 TraceContext.Instance.Leave();
76 }
76 }
77 };
77 };
78 }
78 }
79
79
80 static void CancelCallback(object cookie) {
80 static void CancelCallback(object cookie) {
81 ((ICancellable)cookie).Cancel();
81 ((ICancellable)cookie).Cancel();
82 }
82 }
83
83
84 /// <summary>
84 /// <summary>
85 /// Cancells promise after the specified timeout is elapsed.
85 /// Cancells promise after the specified timeout is elapsed.
86 /// </summary>
86 /// </summary>
87 /// <param name="that">The promise to cancel on timeout.</param>
87 /// <param name="that">The promise to cancel on timeout.</param>
88 /// <param name="milliseconds">The timeout in milliseconds.</param>
88 /// <param name="milliseconds">The timeout in milliseconds.</param>
89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
91 Safe.ArgumentNotNull(that, "that");
91 Safe.ArgumentNotNull(that, "that");
92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
92 var timer = new Timer(CancelCallback, that, milliseconds, -1);
93 that.On(timer.Dispose, PromiseEventType.All);
93 that.On(timer.Dispose, PromiseEventType.All);
94 return that;
94 return that;
95 }
95 }
96
96
97 public static IPromise Bundle(this ICollection<IPromise> that) {
97 public static IPromise Bundle(this ICollection<IPromise> that) {
98 Safe.ArgumentNotNull(that, "that");
98 Safe.ArgumentNotNull(that, "that");
99
99
100 int count = that.Count;
100 int count = that.Count;
101 int errors = 0;
101 int errors = 0;
102 var medium = new Promise();
102 var medium = new Promise();
103
103
104 if (count == 0) {
105 medium.Resolve();
106 return medium;
107 }
108
104 medium.On(() => {
109 medium.On(() => {
105 foreach(var p2 in that)
110 foreach(var p2 in that)
106 p2.Cancel();
111 p2.Cancel();
107 }, PromiseEventType.ErrorOrCancel);
112 }, PromiseEventType.ErrorOrCancel);
108
113
109 foreach (var p in that)
114 foreach (var p in that)
110 p.On(
115 p.On(
111 () => {
116 () => {
112 if (Interlocked.Decrement(ref count) == 0)
117 if (Interlocked.Decrement(ref count) == 0)
113 medium.Resolve();
118 medium.Resolve();
114 },
119 },
115 error => {
120 error => {
116 if (Interlocked.Increment(ref errors) == 1)
121 if (Interlocked.Increment(ref errors) == 1)
117 medium.Reject(
122 medium.Reject(
118 new Exception("The dependency promise is failed", error)
123 new Exception("The dependency promise is failed", error)
119 );
124 );
120 },
125 },
121 () => {
126 () => {
122 if (Interlocked.Increment(ref errors) == 1)
127 if (Interlocked.Increment(ref errors) == 1)
123 medium.Reject(
128 medium.Reject(
124 new Exception("The dependency promise is cancelled")
129 new Exception("The dependency promise is cancelled")
125 );
130 );
126 }
131 }
127 );
132 );
128
133
129 return medium;
134 return medium;
130 }
135 }
131
136
132 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
137 public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
133 Safe.ArgumentNotNull(that, "that");
138 Safe.ArgumentNotNull(that, "that");
134
139
135 int count = that.Count;
140 int count = that.Count;
136 int errors = 0;
141 int errors = 0;
137 var medium = new Promise<T[]>();
142 var medium = new Promise<T[]>();
138 var results = new T[that.Count];
143 var results = new T[that.Count];
139
144
140 medium.On(() => {
145 medium.On(() => {
141 foreach(var p2 in that)
146 foreach(var p2 in that)
142 p2.Cancel();
147 p2.Cancel();
143 }, PromiseEventType.ErrorOrCancel);
148 }, PromiseEventType.ErrorOrCancel);
144
149
145 int i = 0;
150 int i = 0;
146 foreach (var p in that) {
151 foreach (var p in that) {
147 var idx = i;
152 var idx = i;
148 p.On(
153 p.On(
149 x => {
154 x => {
150 results[idx] = x;
155 results[idx] = x;
151 if (Interlocked.Decrement(ref count) == 0)
156 if (Interlocked.Decrement(ref count) == 0)
152 medium.Resolve(results);
157 medium.Resolve(results);
153 },
158 },
154 error => {
159 error => {
155 if (Interlocked.Increment(ref errors) == 1)
160 if (Interlocked.Increment(ref errors) == 1)
156 medium.Reject(
161 medium.Reject(
157 new Exception("The dependency promise is failed", error)
162 new Exception("The dependency promise is failed", error)
158 );
163 );
159 },
164 },
160 () => {
165 () => {
161 if (Interlocked.Increment(ref errors) == 1)
166 if (Interlocked.Increment(ref errors) == 1)
162 medium.Reject(
167 medium.Reject(
163 new Exception("The dependency promise is cancelled")
168 new Exception("The dependency promise is cancelled")
164 );
169 );
165 }
170 }
166 );
171 );
167 i++;
172 i++;
168 }
173 }
169
174
170 return medium;
175 return medium;
171 }
176 }
172
177
173 #if NET_4_5
178 #if NET_4_5
174
179
175 public static Task<T> GetTask<T>(this IPromise<T> that) {
180 public static Task<T> GetTask<T>(this IPromise<T> that) {
176 Safe.ArgumentNotNull(that, "that");
181 Safe.ArgumentNotNull(that, "that");
177 var tcs = new TaskCompletionSource<T>();
182 var tcs = new TaskCompletionSource<T>();
178
183
179 that.On(tcs.SetResult, tcs.SetException, tcs.SetCanceled);
184 that.On(tcs.SetResult, tcs.SetException, tcs.SetCanceled);
180
185
181 return tcs.Task;
186 return tcs.Task;
182 }
187 }
183
188
184 #endif
189 #endif
185 }
190 }
186 }
191 }
187
192
@@ -1,34 +1,93
1 using System;
1 using System;
2 using Implab.Diagnostics;
2 using Implab.Diagnostics;
3 using Implab.Parallels;
3 using Implab.Parallels;
4 using Implab;
4 using Implab;
5 using System.Collections.Generic;
5 using System.Collections.Generic;
6 using System.Collections.Concurrent;
6 using System.Collections.Concurrent;
7 using System.Threading;
7
8
8 namespace MonoPlay {
9 namespace MonoPlay {
9 class MainClass {
10 class MainClass {
10 public static void Main(string[] args) {
11 public static void Main(string[] args) {
11 if (args == null)
12 if (args == null)
12 throw new ArgumentNullException("args");
13 throw new ArgumentNullException("args");
13
14
14 const int count = 10000000;
15
16 var t1 = Environment.TickCount;
15 var t1 = Environment.TickCount;
17
16
18 for (int i = 0; i < count; i++) {
17 const int reads = 100000;
19 var p = new Promise<int>();
18 const int writes = 1000;
19 const int readThreads = 8;
20 const int writeThreads = 0;
21
22 var l = new SharedLock();
23 var st = new HashSet<int>();
20
24
21 p.On(x => {}).On(x => {});
25 Action reader1 = () => {
26 for (int i =0; i < reads; i++) {
27 try {
28 l.LockShared();
29 st.Contains(i % 1000);
30 Thread.Sleep(0);
31 } finally {
32 l.Release();
33 }
34 }
35 };
36
37 Action reader2 = () => {
38 for(var i = 0; i < reads; i++)
39 lock(st) {
40 st.Contains(i % 1000);
41 Thread.Sleep(0);
42 }
43 };
22
44
23 p.Resolve(i);
45 Action writer1 = () => {
46 var rnd = new Random(Environment.TickCount);
47 for (int i = 0; i < writes; i++) {
48 try {
49 l.LockExclusive();
50 st.Add(rnd.Next(1000));
51 //Thread.Sleep(1);
52 } finally {
53 l.Release();
54 }
55 }
56 };
24
57
25 }
58 Action writer2 = () => {
59 var rnd = new Random(Environment.TickCount);
60 for (int i = 0; i < writes; i++) {
61 lock (st) {
62 st.Add(rnd.Next(1000));
63 //Thread.Sleep(1);
64 }
65 }
66 };
67
68
26
69
27
70 var readers = new IPromise[readThreads];
71 for (int i = 0; i < readThreads; i++)
72 readers[i] = AsyncPool.RunThread(reader1);
73
74 var writers = new IPromise[writeThreads];
75 for (int i = 0; i < writeThreads; i++)
76 writers[i] = AsyncPool.RunThread(writer1);
77
78
79 new [] {
80 readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)),
81 writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1))
82 }.Bundle().Join();
83
84
28
85
29 var t2 = Environment.TickCount;
86 var t2 = Environment.TickCount;
30 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
87 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
31
88
32 }
89 }
90
91
33 }
92 }
34 }
93 }
General Comments 0
You need to be logged in to leave comments. Login now