using System; using Microsoft.VisualStudio.TestTools.UnitTesting; using System.Reflection; using System.Threading; using Implab.Parallels; namespace Implab.Test { [TestClass] public class AsyncTests { [TestMethod] public void ResolveTest() { int res = -1; var p = new Promise(); p.Then(x => res = x); p.Resolve(100); Assert.AreEqual(100, res); } [TestMethod] public void RejectTest() { int res = -1; Exception err = null; var p = new Promise(); p.Then( x => res = x, e => { err = e; return -2; } ); p.Reject(new ApplicationException("error")); Assert.AreEqual(res, -1); Assert.AreEqual(err.Message, "error"); } [TestMethod] public void JoinSuccessTest() { var p = new Promise(); p.Resolve(100); Assert.AreEqual(p.Join(), 100); } [TestMethod] public void JoinFailTest() { var p = new Promise(); p.Reject(new ApplicationException("failed")); try { p.Join(); throw new ApplicationException("WRONG!"); } catch (TargetInvocationException err) { Assert.AreEqual(err.InnerException.Message, "failed"); } catch { Assert.Fail("Got wrong excaption"); } } [TestMethod] public void MapTest() { var p = new Promise(); var p2 = p.Map(x => x.ToString()); p.Resolve(100); Assert.AreEqual(p2.Join(), "100"); } [TestMethod] public void FixErrorTest() { var p = new Promise(); var p2 = p.Error(e => 101); p.Reject(new Exception()); Assert.AreEqual(p2.Join(), 101); } [TestMethod] public void ChainTest() { var p1 = new Promise(); var p3 = p1.Chain(x => { var p2 = new Promise(); p2.Resolve(x.ToString()); return p2; }); p1.Resolve(100); Assert.AreEqual(p3.Join(), "100"); } [TestMethod] public void PoolTest() { var pid = Thread.CurrentThread.ManagedThreadId; var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId); Assert.AreNotEqual(pid, p.Join()); } [TestMethod] public void WorkerPoolSizeTest() { var pool = new WorkerPool(5, 10, 0); Assert.AreEqual(5, pool.PoolSize); pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); Assert.AreEqual(5, pool.PoolSize); for (int i = 0; i < 100; i++) pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); Thread.Sleep(200); Assert.AreEqual(10, pool.PoolSize); pool.Dispose(); } [TestMethod] public void WorkerPoolCorrectTest() { var pool = new WorkerPool(0,1000,100); int iterations = 1000; int pending = iterations; var stop = new ManualResetEvent(false); var count = 0; for (int i = 0; i < iterations; i++) { pool .Invoke(() => 1) .Then(x => Interlocked.Add(ref count, x)) .Then(x => Math.Log10(x)) .Anyway(() => { Interlocked.Decrement(ref pending); if (pending == 0) stop.Set(); }); } stop.WaitOne(); Assert.AreEqual(iterations, count); Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads); pool.Dispose(); } [TestMethod] public void WorkerPoolDisposeTest() { var pool = new WorkerPool(5, 20); Assert.AreEqual(5, pool.PoolSize); pool.Dispose(); Thread.Sleep(500); Assert.AreEqual(0, pool.PoolSize); pool.Dispose(); } [TestMethod] public void MTQueueTest() { var queue = new MTQueue(); int res; queue.Enqueue(10); Assert.IsTrue(queue.TryDequeue(out res)); Assert.AreEqual(10, res); Assert.IsFalse(queue.TryDequeue(out res)); for (int i = 0; i < 1000; i++) queue.Enqueue(i); for (int i = 0; i < 1000; i++) { queue.TryDequeue(out res); Assert.AreEqual(i, res); } int writers = 0; int readers = 0; var stop = new ManualResetEvent(false); int total = 0; int itemsPerWriter = 1000; int writersCount = 3; for (int i = 0; i < writersCount; i++) { Interlocked.Increment(ref writers); var wn = i; AsyncPool .InvokeNewThread(() => { for (int ii = 0; ii < itemsPerWriter; ii++) { queue.Enqueue(1); } return 1; }) .Anyway(() => Interlocked.Decrement(ref writers)); } for (int i = 0; i < 10; i++) { Interlocked.Increment(ref readers); var wn = i; AsyncPool .InvokeNewThread(() => { int t; do { while (queue.TryDequeue(out t)) Interlocked.Add(ref total, t); } while (writers > 0); return 1; }) .Anyway(() => { Interlocked.Decrement(ref readers); if (readers == 0) stop.Set(); }); } stop.WaitOne(); Assert.AreEqual(itemsPerWriter * writersCount, total); } [TestMethod] public void ParallelMapTest() { int count = 100000; double[] args = new double[count]; var rand = new Random(); for (int i = 0; i < count; i++) args[i] = rand.NextDouble(); var t = Environment.TickCount; var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join(); Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); t = Environment.TickCount; for (int i = 0; i < count; i++) Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); } [TestMethod] public void ChainedMapTest() { using (var pool = new WorkerPool(0,100,100)) { int count = 10000; double[] args = new double[count]; var rand = new Random(); for (int i = 0; i < count; i++) args[i] = rand.NextDouble(); var t = Environment.TickCount; var res = args .ChainedMap( x => pool.Invoke( () => Math.Sin(x * x) ), 4 ) .Join(); Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); t = Environment.TickCount; for (int i = 0; i < count; i++) Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads); } } [TestMethod] public void ParallelForEachTest() { int count = 100000; int[] args = new int[count]; var rand = new Random(); for (int i = 0; i < count; i++) args[i] = (int)(rand.NextDouble() * 100); int result = 0; var t = Environment.TickCount; args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join(); Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result); int result2 = 0; t = Environment.TickCount; for (int i = 0; i < count; i++) result2 += args[i]; Assert.AreEqual(result2, result); Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); } [TestMethod] public void ComplexCase1Test() { var flags = new bool[3]; // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map) var p = PromiseHelper .Sleep(200, "Alan") .Cancelled(() => flags[0] = true) .Chain(x => PromiseHelper .Sleep(200, "Hi, " + x) .Map(y => y) .Cancelled(() => flags[1] = true) ) .Cancelled(() => flags[2] = true); Thread.Sleep(300); p.Cancel(); try { Assert.AreEqual(p.Join(), "Hi, Alan"); Assert.Fail("Shouldn't get here"); } catch (OperationCanceledException) { } Assert.IsFalse(flags[0]); Assert.IsTrue(flags[1]); Assert.IsTrue(flags[2]); } [TestMethod] public void ChainedCancel1Test() { // при отмене сцепленной асинхронной операции все обещание должно // завершаться ошибкой OperationCanceledException var p = PromiseHelper .Sleep(1, "Hi, HAL!") .Chain(x => { // запускаем две асинхронные операции var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!"); // вторая операция отменяет первую до завершения PromiseHelper .Sleep(100, "HAL, STOP!") .Then(() => result.Cancel()); return result; }); try { p.Join(); } catch (TargetInvocationException err) { Assert.IsTrue(err.InnerException is OperationCanceledException); } } [TestMethod] public void ChainedCancel2Test() { // при отмене цепочки обещаний, вложенные операции также должны отменяться IPromise p = null; var pSurvive = new Promise(); var hemStarted = new ManualResetEvent(false); p = PromiseHelper .Sleep(1, "Hi, HAL!") .Chain(x => { hemStarted.Set(); // запускаем две асинхронные операции var result = PromiseHelper .Sleep(1000, "HEM ENABLED!!!") .Then(s => pSurvive.Resolve(false)); result .Cancelled(() => pSurvive.Resolve(true)); return result; }); hemStarted.WaitOne(); p.Cancel(); try { p.Join(); } catch (OperationCanceledException) { Assert.IsTrue(pSurvive.Join()); } } } }