AsyncTests.cs
863 lines
| 28.5 KiB
| text/x-csharp
|
CSharpLexer
/ Implab.Test / AsyncTests.cs
|
|
r77 | using System; | ||
| using System.Reflection; | ||||
| using System.Threading; | ||||
| using Implab.Parallels; | ||||
| #if MONO | ||||
| using NUnit.Framework; | ||||
| using TestClassAttribute = NUnit.Framework.TestFixtureAttribute; | ||||
|
|
r145 | using TestMethodAttribute = NUnit.Framework.TestAttribute; | ||
|
|
r77 | |||
| #else | ||||
| using Microsoft.VisualStudio.TestTools.UnitTesting; | ||||
| #endif | ||||
| namespace Implab.Test { | ||||
| [TestClass] | ||||
| public class AsyncTests { | ||||
| [TestMethod] | ||||
| public void ResolveTest() { | ||||
| int res = -1; | ||||
| var p = new Promise<int>(); | ||||
| p.Then(x => res = x); | ||||
| p.Resolve(100); | ||||
| Assert.AreEqual(100, res); | ||||
| } | ||||
| [TestMethod] | ||||
| public void RejectTest() { | ||||
| int res = -1; | ||||
| Exception err = null; | ||||
| var p = new Promise<int>(); | ||||
| p.Then( | ||||
| x => res = x, | ||||
| e => { | ||||
| err = e; | ||||
| return -2; | ||||
| } | ||||
| ); | ||||
| p.Reject(new ApplicationException("error")); | ||||
| Assert.AreEqual(res, -1); | ||||
| Assert.AreEqual(err.Message, "error"); | ||||
| } | ||||
| [TestMethod] | ||||
| public void CancelExceptionTest() { | ||||
| var p = new Promise<bool>(); | ||||
|
|
r145 | p.CancelOperation(null); | ||
|
|
r77 | |||
|
|
r138 | var p2 = p.Then(x => x, null, reason => { | ||
|
|
r77 | throw new ApplicationException("CANCELLED"); | ||
| }); | ||||
| try { | ||||
| p2.Join(); | ||||
| Assert.Fail(); | ||||
| } catch (ApplicationException err) { | ||||
| Assert.AreEqual("CANCELLED", err.InnerException.Message); | ||||
| } | ||||
| } | ||||
| [TestMethod] | ||||
| public void ContinueOnCancelTest() { | ||||
| var p = new Promise<bool>(); | ||||
|
|
r145 | p.CancelOperation(null); | ||
|
|
r77 | |||
| var p2 = p | ||||
|
|
r145 | .Then(x => x, null, reason => { | ||
|
|
r77 | throw new ApplicationException("CANCELLED"); | ||
| }) | ||||
|
|
r138 | .Then(x => x, e => true); | ||
|
|
r77 | |||
| Assert.AreEqual(true, p2.Join()); | ||||
| } | ||||
| [TestMethod] | ||||
| public void JoinSuccessTest() { | ||||
| var p = new Promise<int>(); | ||||
| p.Resolve(100); | ||||
| Assert.AreEqual(p.Join(), 100); | ||||
| } | ||||
| [TestMethod] | ||||
| public void JoinFailTest() { | ||||
| var p = new Promise<int>(); | ||||
| p.Reject(new ApplicationException("failed")); | ||||
| try { | ||||
| p.Join(); | ||||
| throw new ApplicationException("WRONG!"); | ||||
| } catch (TargetInvocationException err) { | ||||
| Assert.AreEqual(err.InnerException.Message, "failed"); | ||||
| } catch { | ||||
| Assert.Fail("Got wrong excaption"); | ||||
| } | ||||
| } | ||||
| [TestMethod] | ||||
| public void MapTest() { | ||||
| var p = new Promise<int>(); | ||||
| var p2 = p.Then(x => x.ToString()); | ||||
| p.Resolve(100); | ||||
| Assert.AreEqual(p2.Join(), "100"); | ||||
| } | ||||
| [TestMethod] | ||||
| public void FixErrorTest() { | ||||
| var p = new Promise<int>(); | ||||
|
|
r138 | var p2 = p.Then(x => x, e => 101); | ||
|
|
r77 | |||
| p.Reject(new Exception()); | ||||
| Assert.AreEqual(p2.Join(), 101); | ||||
| } | ||||
| [TestMethod] | ||||
| public void ChainTest() { | ||||
| var p1 = new Promise<int>(); | ||||
| var p3 = p1.Chain(x => { | ||||
| var p2 = new Promise<string>(); | ||||
| p2.Resolve(x.ToString()); | ||||
| return p2; | ||||
| }); | ||||
| p1.Resolve(100); | ||||
| Assert.AreEqual(p3.Join(), "100"); | ||||
| } | ||||
| [TestMethod] | ||||
|
|
r105 | public void ChainFailTest() { | ||
| var p1 = new Promise<int>(); | ||||
| var p3 = p1.Chain(x => { | ||||
| var p2 = new Promise<string>(); | ||||
| p2.Reject(new Exception("DIE!!!")); | ||||
| return p2; | ||||
| }); | ||||
| p1.Resolve(100); | ||||
| Assert.IsTrue(p3.IsResolved); | ||||
| } | ||||
| [TestMethod] | ||||
|
|
r77 | public void PoolTest() { | ||
| var pid = Thread.CurrentThread.ManagedThreadId; | ||||
| var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId); | ||||
| Assert.AreNotEqual(pid, p.Join()); | ||||
| } | ||||
| [TestMethod] | ||||
| public void WorkerPoolSizeTest() { | ||||
|
|
r81 | var pool = new WorkerPool(5, 10, 1); | ||
|
|
r77 | |||
| Assert.AreEqual(5, pool.PoolSize); | ||||
| pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); | ||||
| pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); | ||||
| pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); | ||||
| Assert.AreEqual(5, pool.PoolSize); | ||||
| for (int i = 0; i < 100; i++) | ||||
| pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); | ||||
| Thread.Sleep(200); | ||||
| Assert.AreEqual(10, pool.PoolSize); | ||||
| pool.Dispose(); | ||||
| } | ||||
| [TestMethod] | ||||
| public void WorkerPoolCorrectTest() { | ||||
| var pool = new WorkerPool(0,1000,100); | ||||
| const int iterations = 1000; | ||||
| int pending = iterations; | ||||
| var stop = new ManualResetEvent(false); | ||||
| var count = 0; | ||||
| for (int i = 0; i < iterations; i++) { | ||||
| pool | ||||
| .Invoke(() => 1) | ||||
| .Then(x => Interlocked.Add(ref count, x)) | ||||
| .Then(x => Math.Log10(x)) | ||||
|
|
r119 | .On(() => { | ||
|
|
r77 | Interlocked.Decrement(ref pending); | ||
| if (pending == 0) | ||||
| stop.Set(); | ||||
|
|
r119 | }, PromiseEventType.All); | ||
|
|
r77 | } | ||
| stop.WaitOne(); | ||||
| Assert.AreEqual(iterations, count); | ||||
| Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads); | ||||
| pool.Dispose(); | ||||
| } | ||||
| [TestMethod] | ||||
| public void WorkerPoolDisposeTest() { | ||||
| var pool = new WorkerPool(5, 20); | ||||
| Assert.AreEqual(5, pool.PoolSize); | ||||
| pool.Dispose(); | ||||
| Thread.Sleep(500); | ||||
| Assert.AreEqual(0, pool.PoolSize); | ||||
| pool.Dispose(); | ||||
| } | ||||
| [TestMethod] | ||||
| public void MTQueueTest() { | ||||
| var queue = new MTQueue<int>(); | ||||
| int res; | ||||
| queue.Enqueue(10); | ||||
| Assert.IsTrue(queue.TryDequeue(out res)); | ||||
| Assert.AreEqual(10, res); | ||||
| Assert.IsFalse(queue.TryDequeue(out res)); | ||||
| for (int i = 0; i < 1000; i++) | ||||
| queue.Enqueue(i); | ||||
| for (int i = 0; i < 1000; i++) { | ||||
| queue.TryDequeue(out res); | ||||
| Assert.AreEqual(i, res); | ||||
| } | ||||
| int writers = 0; | ||||
| int readers = 0; | ||||
| var stop = new ManualResetEvent(false); | ||||
| int total = 0; | ||||
| const int itemsPerWriter = 10000; | ||||
| const int writersCount = 10; | ||||
| for (int i = 0; i < writersCount; i++) { | ||||
| Interlocked.Increment(ref writers); | ||||
| AsyncPool | ||||
|
|
r124 | .RunThread(() => { | ||
|
|
r77 | for (int ii = 0; ii < itemsPerWriter; ii++) { | ||
| queue.Enqueue(1); | ||||
| } | ||||
| return 1; | ||||
| }) | ||||
|
|
r119 | .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); | ||
|
|
r77 | } | ||
| for (int i = 0; i < 10; i++) { | ||||
| Interlocked.Increment(ref readers); | ||||
| AsyncPool | ||||
|
|
r124 | .RunThread(() => { | ||
|
|
r77 | int t; | ||
| do { | ||||
| while (queue.TryDequeue(out t)) | ||||
| Interlocked.Add(ref total, t); | ||||
| } while (writers > 0); | ||||
| return 1; | ||||
| }) | ||||
|
|
r119 | .On(() => { | ||
|
|
r77 | Interlocked.Decrement(ref readers); | ||
| if (readers == 0) | ||||
| stop.Set(); | ||||
|
|
r119 | }, PromiseEventType.All); | ||
| } | ||||
| stop.WaitOne(); | ||||
| Assert.AreEqual(100000, total); | ||||
| } | ||||
| [TestMethod] | ||||
| public void AsyncQueueTest() { | ||||
| var queue = new AsyncQueue<int>(); | ||||
| int res; | ||||
| queue.Enqueue(10); | ||||
| Assert.IsTrue(queue.TryDequeue(out res)); | ||||
| Assert.AreEqual(10, res); | ||||
| Assert.IsFalse(queue.TryDequeue(out res)); | ||||
| for (int i = 0; i < 1000; i++) | ||||
| queue.Enqueue(i); | ||||
| for (int i = 0; i < 1000; i++) { | ||||
| queue.TryDequeue(out res); | ||||
| Assert.AreEqual(i, res); | ||||
| } | ||||
|
|
r121 | const int count = 10000000; | ||
|
|
r119 | |||
|
|
r121 | int res1 = 0, res2 = 0; | ||
| var t1 = Environment.TickCount; | ||||
|
|
r119 | |||
|
|
r121 | AsyncPool.RunThread( | ||
| () => { | ||||
| for (var i = 0; i < count; i++) | ||||
| queue.Enqueue(1); | ||||
| Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| () => { | ||||
| for (var i = 0; i < count; i++) | ||||
| queue.Enqueue(2); | ||||
| Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| () => { | ||||
| int temp; | ||||
| int i = 0; | ||||
| while (i < count) | ||||
| if (queue.TryDequeue(out temp)) { | ||||
| i++; | ||||
| res1 += temp; | ||||
|
|
r119 | } | ||
|
|
r121 | Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); | ||
| }, | ||||
| () => { | ||||
| int temp; | ||||
| int i = 0; | ||||
| while (i < count) | ||||
| if (queue.TryDequeue(out temp)) { | ||||
| i++; | ||||
| res2 += temp; | ||||
| } | ||||
| Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | ||||
| } | ||||
| ) | ||||
|
|
r124 | .Bundle() | ||
|
|
r121 | .Join(); | ||
| Assert.AreEqual(count * 3, res1 + res2); | ||||
| Console.WriteLine( | ||||
| "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | ||||
| Environment.TickCount - t1, | ||||
| res1, | ||||
| res2, | ||||
| res1 + res2, | ||||
| count | ||||
| ); | ||||
| } | ||||
| [TestMethod] | ||||
| public void AsyncQueueBatchTest() { | ||||
| var queue = new AsyncQueue<int>(); | ||||
| const int wBatch = 29; | ||||
| const int wCount = 400000; | ||||
| const int total = wBatch * wCount * 2; | ||||
| const int summ = wBatch * wCount * 3; | ||||
|
|
r119 | |||
|
|
r121 | int r1 = 0, r2 = 0; | ||
| const int rBatch = 111; | ||||
| int read = 0; | ||||
| var t1 = Environment.TickCount; | ||||
| AsyncPool.RunThread( | ||||
| () => { | ||||
| var buffer = new int[wBatch]; | ||||
| for(int i = 0; i<wBatch; i++) | ||||
| buffer[i] = 1; | ||||
| for(int i =0; i < wCount; i++) | ||||
| queue.EnqueueRange(buffer,0,wBatch); | ||||
| Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| () => { | ||||
| var buffer = new int[wBatch]; | ||||
| for(int i = 0; i<wBatch; i++) | ||||
| buffer[i] = 2; | ||||
| for(int i =0; i < wCount; i++) | ||||
| queue.EnqueueRange(buffer,0,wBatch); | ||||
| Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| () => { | ||||
| var buffer = new int[rBatch]; | ||||
|
|
r77 | |||
|
|
r121 | while(read < total) { | ||
| int actual; | ||||
| if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | ||||
| for(int i=0; i< actual; i++) | ||||
| r1 += buffer[i]; | ||||
| Interlocked.Add(ref read, actual); | ||||
| } | ||||
| } | ||||
| Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| () => { | ||||
| var buffer = new int[rBatch]; | ||||
|
|
r77 | |||
|
|
r121 | while(read < total) { | ||
| int actual; | ||||
| if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | ||||
| for(int i=0; i< actual; i++) | ||||
| r2 += buffer[i]; | ||||
| Interlocked.Add(ref read, actual); | ||||
| } | ||||
| } | ||||
| Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | ||||
| } | ||||
| ) | ||||
|
|
r124 | .Bundle() | ||
|
|
r121 | .Join(); | ||
| Assert.AreEqual(summ , r1 + r2); | ||||
| Console.WriteLine( | ||||
| "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | ||||
| Environment.TickCount - t1, | ||||
| r1, | ||||
| r2, | ||||
| r1 + r2, | ||||
| total | ||||
| ); | ||||
|
|
r77 | } | ||
| [TestMethod] | ||||
|
|
r122 | public void AsyncQueueChunkDequeueTest() { | ||
| var queue = new AsyncQueue<int>(); | ||||
| const int wBatch = 31; | ||||
| const int wCount = 200000; | ||||
| const int total = wBatch * wCount * 3; | ||||
| const int summ = wBatch * wCount * 6; | ||||
| int r1 = 0, r2 = 0; | ||||
| const int rBatch = 1024; | ||||
| int read = 0; | ||||
| var t1 = Environment.TickCount; | ||||
| AsyncPool.RunThread( | ||||
| () => { | ||||
| var buffer = new int[wBatch]; | ||||
| for(int i = 0; i<wBatch; i++) | ||||
| buffer[i] = 1; | ||||
| for(int i =0; i < wCount; i++) | ||||
| queue.EnqueueRange(buffer,0,wBatch); | ||||
| Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| () => { | ||||
| var buffer = new int[wBatch]; | ||||
| for(int i = 0; i<wBatch; i++) | ||||
| buffer[i] = 2; | ||||
| for(int i =0; i < wCount; i++) | ||||
| queue.EnqueueRange(buffer,0,wBatch); | ||||
| Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| () => { | ||||
| var buffer = new int[wBatch]; | ||||
| for(int i = 0; i<wBatch; i++) | ||||
| buffer[i] = 3; | ||||
| for(int i =0; i < wCount; i++) | ||||
| queue.EnqueueRange(buffer,0,wBatch); | ||||
| Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| () => { | ||||
| var buffer = new int[rBatch]; | ||||
| int count = 1; | ||||
| double avgchunk = 0; | ||||
| while(read < total) { | ||||
| int actual; | ||||
| if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) { | ||||
| for(int i=0; i< actual; i++) | ||||
| r2 += buffer[i]; | ||||
| Interlocked.Add(ref read, actual); | ||||
| avgchunk = avgchunk*(count-1)/count + actual/(double)count; | ||||
| count ++; | ||||
| } | ||||
| } | ||||
| Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); | ||||
| } | ||||
| ) | ||||
|
|
r124 | .Bundle() | ||
| .Join(); | ||||
| Assert.AreEqual(summ , r1 + r2); | ||||
| Console.WriteLine( | ||||
| "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | ||||
| Environment.TickCount - t1, | ||||
| r1, | ||||
| r2, | ||||
| r1 + r2, | ||||
| total | ||||
| ); | ||||
| } | ||||
| [TestMethod] | ||||
| public void AsyncQueueDrainTest() { | ||||
| var queue = new AsyncQueue<int>(); | ||||
| const int wBatch = 11; | ||||
| const int wCount = 200000; | ||||
| const int total = wBatch * wCount * 3; | ||||
| const int summ = wBatch * wCount * 3; | ||||
| int r1 = 0, r2 = 0; | ||||
| const int rBatch = 11; | ||||
| int read = 0; | ||||
| var t1 = Environment.TickCount; | ||||
| AsyncPool.RunThread( | ||||
| () => { | ||||
| var buffer = new int[wBatch]; | ||||
| for(int i = 0; i<wBatch; i++) | ||||
| buffer[i] = 1; | ||||
| for(int i =0; i < wCount; i++) | ||||
| queue.EnqueueRange(buffer,0,wBatch); | ||||
| Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| () => { | ||||
| for(int i =0; i < wCount * wBatch; i++) | ||||
| queue.Enqueue(1); | ||||
| Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| () => { | ||||
| var buffer = new int[wBatch]; | ||||
| for(int i = 0; i<wBatch; i++) | ||||
| buffer[i] = 1; | ||||
| for(int i =0; i < wCount; i++) | ||||
| queue.EnqueueRange(buffer,0,wBatch); | ||||
| Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); | ||||
| }, | ||||
| /*() => { | ||||
| int temp; | ||||
| int count = 0; | ||||
| while (read < total) | ||||
| if (queue.TryDequeue(out temp)) { | ||||
| count++; | ||||
| r1 += temp; | ||||
| Interlocked.Increment(ref read); | ||||
| } | ||||
| Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count); | ||||
| },*/ | ||||
| /*() => { | ||||
| var buffer = new int[rBatch]; | ||||
| var count = 0; | ||||
| while(read < total) { | ||||
| int actual; | ||||
| if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | ||||
| for(int i=0; i< actual; i++) | ||||
| r1 += buffer[i]; | ||||
| Interlocked.Add(ref read, actual); | ||||
| count += actual; | ||||
| } | ||||
| } | ||||
| Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); | ||||
| },*/ | ||||
| () => { | ||||
| var count = 0; | ||||
| while(read < total) { | ||||
| var buffer = queue.Drain(); | ||||
| for(int i=0; i< buffer.Length; i++) | ||||
| r1 += buffer[i]; | ||||
| Interlocked.Add(ref read, buffer.Length); | ||||
| count += buffer.Length; | ||||
| } | ||||
| Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); | ||||
| }, | ||||
| () => { | ||||
| var count = 0; | ||||
| while(read < total) { | ||||
| var buffer = queue.Drain(); | ||||
| for(int i=0; i< buffer.Length; i++) | ||||
| r2 += buffer[i]; | ||||
| Interlocked.Add(ref read, buffer.Length); | ||||
| count += buffer.Length; | ||||
| } | ||||
| Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count); | ||||
| } | ||||
| ) | ||||
| .Bundle() | ||||
|
|
r122 | .Join(); | ||
| Assert.AreEqual(summ , r1 + r2); | ||||
| Console.WriteLine( | ||||
| "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | ||||
| Environment.TickCount - t1, | ||||
| r1, | ||||
| r2, | ||||
| r1 + r2, | ||||
| total | ||||
| ); | ||||
| } | ||||
| [TestMethod] | ||||
|
|
r77 | public void ParallelMapTest() { | ||
| const int count = 100000; | ||||
| var args = new double[count]; | ||||
| var rand = new Random(); | ||||
| for (int i = 0; i < count; i++) | ||||
| args[i] = rand.NextDouble(); | ||||
| var t = Environment.TickCount; | ||||
| var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join(); | ||||
| Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); | ||||
| t = Environment.TickCount; | ||||
| for (int i = 0; i < count; i++) | ||||
| Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); | ||||
| Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); | ||||
| } | ||||
| [TestMethod] | ||||
| public void ChainedMapTest() { | ||||
|
|
r125 | using (var pool = new WorkerPool()) { | ||
|
|
r77 | const int count = 10000; | ||
| var args = new double[count]; | ||||
| var rand = new Random(); | ||||
| for (int i = 0; i < count; i++) | ||||
| args[i] = rand.NextDouble(); | ||||
| var t = Environment.TickCount; | ||||
| var res = args | ||||
| .ChainedMap( | ||||
| // Analysis disable once AccessToDisposedClosure | ||||
| x => pool.Invoke( | ||||
| () => Math.Sin(x * x) | ||||
| ), | ||||
| 4 | ||||
| ) | ||||
| .Join(); | ||||
| Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); | ||||
| t = Environment.TickCount; | ||||
| for (int i = 0; i < count; i++) | ||||
| Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); | ||||
| Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); | ||||
| Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads); | ||||
| } | ||||
| } | ||||
| [TestMethod] | ||||
| public void ParallelForEachTest() { | ||||
| const int count = 100000; | ||||
| var args = new int[count]; | ||||
| var rand = new Random(); | ||||
| for (int i = 0; i < count; i++) | ||||
| args[i] = (int)(rand.NextDouble() * 100); | ||||
| int result = 0; | ||||
| var t = Environment.TickCount; | ||||
| args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join(); | ||||
| Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result); | ||||
| int result2 = 0; | ||||
| t = Environment.TickCount; | ||||
| for (int i = 0; i < count; i++) | ||||
| result2 += args[i]; | ||||
| Assert.AreEqual(result2, result); | ||||
| Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); | ||||
| } | ||||
| [TestMethod] | ||||
| public void ComplexCase1Test() { | ||||
| var flags = new bool[3]; | ||||
| // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map) | ||||
| var step1 = PromiseHelper | ||||
| .Sleep(200, "Alan") | ||||
|
|
r119 | .On(() => flags[0] = true, PromiseEventType.Cancelled); | ||
|
|
r77 | var p = step1 | ||
| .Chain(x => | ||||
| PromiseHelper | ||||
| .Sleep(200, "Hi, " + x) | ||||
| .Then(y => y) | ||||
|
|
r119 | .On(() => flags[1] = true, PromiseEventType.Cancelled) | ||
|
|
r77 | ) | ||
|
|
r119 | .On(() => flags[2] = true, PromiseEventType.Cancelled); | ||
|
|
r77 | step1.Join(); | ||
| p.Cancel(); | ||||
| try { | ||||
| Assert.AreEqual(p.Join(), "Hi, Alan"); | ||||
| Assert.Fail("Shouldn't get here"); | ||||
| } catch (OperationCanceledException) { | ||||
| } | ||||
| Assert.IsFalse(flags[0]); | ||||
| Assert.IsTrue(flags[1]); | ||||
| Assert.IsTrue(flags[2]); | ||||
| } | ||||
| [TestMethod] | ||||
| public void ChainedCancel1Test() { | ||||
| // при отмене сцепленной асинхронной операции все обещание должно | ||||
| // завершаться ошибкой OperationCanceledException | ||||
| var p = PromiseHelper | ||||
| .Sleep(1, "Hi, HAL!") | ||||
| .Then(x => { | ||||
| // запускаем две асинхронные операции | ||||
| var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!"); | ||||
| // вторая операция отменяет первую до завершения | ||||
| PromiseHelper | ||||
| .Sleep(100, "HAL, STOP!") | ||||
| .Then(result.Cancel); | ||||
| return result; | ||||
| }); | ||||
| try { | ||||
| p.Join(); | ||||
| } catch (TargetInvocationException err) { | ||||
| Assert.IsTrue(err.InnerException is OperationCanceledException); | ||||
| } | ||||
| } | ||||
| [TestMethod] | ||||
| public void ChainedCancel2Test() { | ||||
| // при отмене цепочки обещаний, вложенные операции также должны отменяться | ||||
| var pSurvive = new Promise<bool>(); | ||||
|
|
r149 | var hemStarted = new Signal(); | ||
|
|
r77 | var p = PromiseHelper | ||
| .Sleep(1, "Hi, HAL!") | ||||
|
|
r149 | .Chain(() => { | ||
|
|
r77 | hemStarted.Set(); | ||
| // запускаем две асинхронные операции | ||||
| var result = PromiseHelper | ||||
|
|
r149 | .Sleep(2000, "HEM ENABLED!!!") | ||
| .Then(() => pSurvive.Resolve(false)); | ||||
|
|
r77 | |||
| result | ||||
|
|
r138 | .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled); | ||
|
|
r149 | |||
|
|
r77 | return result; | ||
| }); | ||||
|
|
r149 | hemStarted.Wait(); | ||
|
|
r77 | p.Cancel(); | ||
| try { | ||||
| p.Join(); | ||||
|
|
r149 | Assert.Fail(); | ||
|
|
r77 | } catch (OperationCanceledException) { | ||
| } | ||||
|
|
r149 | Assert.IsTrue(pSurvive.Join()); | ||
|
|
r77 | } | ||
|
|
r136 | |||
| [TestMethod] | ||||
| public void SharedLockTest() { | ||||
| var l = new SharedLock(); | ||||
| int shared = 0; | ||||
| int exclusive = 0; | ||||
| var s1 = new Signal(); | ||||
| var log = new AsyncQueue<string>(); | ||||
| try { | ||||
| AsyncPool.RunThread( | ||||
| () => { | ||||
| log.Enqueue("Reader #1 started"); | ||||
| try { | ||||
| l.LockShared(); | ||||
| log.Enqueue("Reader #1 lock got"); | ||||
| if (Interlocked.Increment(ref shared) == 2) | ||||
| s1.Set(); | ||||
| s1.Wait(); | ||||
| log.Enqueue("Reader #1 finished"); | ||||
| Interlocked.Decrement(ref shared); | ||||
| } finally { | ||||
| l.Release(); | ||||
| log.Enqueue("Reader #1 lock released"); | ||||
| } | ||||
| }, | ||||
| () => { | ||||
| log.Enqueue("Reader #2 started"); | ||||
| try { | ||||
| l.LockShared(); | ||||
| log.Enqueue("Reader #2 lock got"); | ||||
| if (Interlocked.Increment(ref shared) == 2) | ||||
| s1.Set(); | ||||
| s1.Wait(); | ||||
| log.Enqueue("Reader #2 upgrading to writer"); | ||||
| Interlocked.Decrement(ref shared); | ||||
| l.Upgrade(); | ||||
| log.Enqueue("Reader #2 upgraded"); | ||||
| Assert.AreEqual(1, Interlocked.Increment(ref exclusive)); | ||||
| Assert.AreEqual(0, shared); | ||||
| log.Enqueue("Reader #2 finished"); | ||||
| Interlocked.Decrement(ref exclusive); | ||||
| } finally { | ||||
| l.Release(); | ||||
| log.Enqueue("Reader #2 lock released"); | ||||
| } | ||||
| }, | ||||
| () => { | ||||
| log.Enqueue("Writer #1 started"); | ||||
| try { | ||||
| l.LockExclusive(); | ||||
| log.Enqueue("Writer #1 got the lock"); | ||||
| Assert.AreEqual(1, Interlocked.Increment(ref exclusive)); | ||||
| Interlocked.Decrement(ref exclusive); | ||||
| log.Enqueue("Writer #1 is finished"); | ||||
| } finally { | ||||
| l.Release(); | ||||
| log.Enqueue("Writer #1 lock released"); | ||||
| } | ||||
| } | ||||
| ).Bundle().Join(1000); | ||||
| log.Enqueue("Done"); | ||||
| } catch(Exception error) { | ||||
| log.Enqueue(error.Message); | ||||
| throw; | ||||
| } finally { | ||||
| foreach (var m in log) | ||||
| Console.WriteLine(m); | ||||
| } | ||||
| } | ||||
|
|
r151 | |||
| #if NET_4_5 | ||||
| [TestMethod] | ||||
| public async void TaskInteropTest() { | ||||
| var promise = new Promise<int>(); | ||||
| promise.Resolve(10); | ||||
| var res = await promise; | ||||
| Assert.AreEqual(10, res); | ||||
| } | ||||
| #endif | ||||
|
|
r77 | } | ||
| } | ||||
