AsyncTests.cs
452 lines
| 13.5 KiB
| text/x-csharp
|
CSharpLexer
/ Implab.Test / AsyncTests.cs
cin
|
r77 | using System; | ||
using System.Reflection; | ||||
using System.Threading; | ||||
using Implab.Parallels; | ||||
#if MONO | ||||
using NUnit.Framework; | ||||
using TestClassAttribute = NUnit.Framework.TestFixtureAttribute; | ||||
using TestMethod = NUnit.Framework.TestAttribute; | ||||
#else | ||||
using Microsoft.VisualStudio.TestTools.UnitTesting; | ||||
#endif | ||||
namespace Implab.Test { | ||||
[TestClass] | ||||
public class AsyncTests { | ||||
/// <summary>
/// Checks that a success handler attached with <c>Then</c> has received the
/// value by the time <c>Resolve</c> returns.
/// </summary>
[TestMethod]
public void ResolveTest() {
    // Sentinel that the handler is expected to overwrite.
    int received = -1;
    var promise = new Promise<int>();

    promise.Then(value => received = value);
    promise.Resolve(100);

    Assert.AreEqual(100, received);
}
/// <summary>
/// Checks that rejecting a promise invokes the error handler passed to
/// <c>Then</c> and leaves the success handler untouched.
/// </summary>
[TestMethod]
public void RejectTest() {
    int res = -1;
    Exception err = null;
    var p = new Promise<int>();

    p.Then(
        x => res = x,
        e => {
            err = e;
            return -2;
        }
    );
    p.Reject(new ApplicationException("error"));

    // Expected value goes first so that failure messages read correctly.
    Assert.AreEqual(-1, res);
    // Fail with a clear message instead of a NullReferenceException when
    // the error handler was never called.
    Assert.IsNotNull(err, "The error handler was not invoked");
    Assert.AreEqual("error", err.Message);
}
/// <summary>
/// Checks that an exception thrown from a <c>Cancelled</c> handler is
/// reported by <c>Join</c> of the resulting promise as the inner exception
/// of an <see cref="ApplicationException"/>.
/// </summary>
[TestMethod]
public void CancelExceptionTest() {
    var source = new Promise<bool>();
    source.Cancel();

    var afterCancel = source.Cancelled(() => {
        throw new ApplicationException("CANCELLED");
    });

    try {
        afterCancel.Join();
        // Join must not return normally.
        Assert.Fail();
    } catch (ApplicationException failure) {
        // The handler's exception is expected to arrive wrapped.
        Assert.AreEqual("CANCELLED", failure.InnerException.Message);
    }
}
/// <summary>
/// Checks that an exception thrown from a <c>Cancelled</c> handler can be
/// handled by a following <c>Error</c> handler, whose return value becomes
/// the result of <c>Join</c>.
/// </summary>
[TestMethod]
public void ContinueOnCancelTest() {
    var source = new Promise<bool>();
    source.Cancel();

    // Step 1: the cancellation handler fails.
    var failing = source.Cancelled(() => {
        throw new ApplicationException("CANCELLED");
    });
    // Step 2: the error handler replaces the failure with a value.
    var recovered = failing.Error(e => true);

    Assert.AreEqual(true, recovered.Join());
}
/// <summary>
/// Checks that <c>Join</c> on an already resolved promise returns the
/// resolved value.
/// </summary>
[TestMethod]
public void JoinSuccessTest() {
    var p = new Promise<int>();
    p.Resolve(100);

    // Expected value goes first so that failure messages read correctly.
    Assert.AreEqual(100, p.Join());
}
/// <summary>
/// Checks that <c>Join</c> on a rejected promise throws a
/// <see cref="TargetInvocationException"/> whose inner exception is the
/// rejection reason.
/// </summary>
[TestMethod]
public void JoinFailTest() {
    var p = new Promise<int>();
    p.Reject(new ApplicationException("failed"));

    // Capture whatever Join throws and assert outside the try block, so
    // that "nothing was thrown" and "wrong type thrown" are reported as
    // distinct failures and assertion exceptions are never swallowed.
    Exception caught = null;
    try {
        p.Join();
    } catch (Exception e) {
        caught = e;
    }

    Assert.IsNotNull(caught, "Join returned normally for a rejected promise");
    Assert.IsTrue(
        caught is TargetInvocationException,
        "Got wrong exception: " + caught.GetType().FullName
    );
    Assert.IsNotNull(caught.InnerException, "The rejection reason was not preserved");
    Assert.AreEqual("failed", caught.InnerException.Message);
}
/// <summary>
/// Checks that <c>Then</c> with a mapping function produces a promise
/// carrying the mapped value.
/// </summary>
[TestMethod]
public void MapTest() {
    var p = new Promise<int>();
    var p2 = p.Then(x => x.ToString());

    p.Resolve(100);

    // Expected value goes first so that failure messages read correctly.
    Assert.AreEqual("100", p2.Join());
}
/// <summary>
/// Checks that an <c>Error</c> handler can replace a rejection with a
/// value, which <c>Join</c> of the resulting promise then returns.
/// </summary>
[TestMethod]
public void FixErrorTest() {
    var p = new Promise<int>();
    var p2 = p.Error(e => 101);

    p.Reject(new Exception());

    // Expected value goes first so that failure messages read correctly.
    Assert.AreEqual(101, p2.Join());
}
/// <summary>
/// Checks that <c>Chain</c> yields the result of the promise returned by
/// the chained function.
/// </summary>
[TestMethod]
public void ChainTest() {
    var p1 = new Promise<int>();
    var p3 = p1.Chain(x => {
        // The inner promise is already resolved when it is returned.
        var p2 = new Promise<string>();
        p2.Resolve(x.ToString());
        return p2;
    });

    p1.Resolve(100);

    // Expected value goes first so that failure messages read correctly.
    Assert.AreEqual("100", p3.Join());
}
/// <summary>
/// Checks that a chained promise reports <c>IsResolved</c> after the
/// promise returned by the chained function has been rejected.
/// </summary>
[TestMethod]
public void ChainFailTest() {
    var outer = new Promise<int>();

    var chained = outer.Chain(ignored => {
        var inner = new Promise<string>();
        inner.Reject(new Exception("DIE!!!"));
        return inner;
    });

    outer.Resolve(100);

    // NOTE(review): presumably IsResolved is true for any completed state,
    // including rejection - confirm against the Promise implementation.
    Assert.IsTrue(chained.IsResolved);
}
/// <summary>
/// Checks that work submitted through <c>AsyncPool.Invoke</c> runs on a
/// thread other than the calling one.
/// </summary>
[TestMethod]
public void PoolTest() {
    var callerThreadId = Thread.CurrentThread.ManagedThreadId;

    var workerThreadId = AsyncPool
        .Invoke(() => Thread.CurrentThread.ManagedThreadId)
        .Join();

    Assert.AreNotEqual(callerThreadId, workerThreadId);
}
/// <summary>
/// Checks how <c>PoolSize</c> changes under load: it stays at 5 after
/// three blocking jobs and is 10 after a hundred more have been queued.
/// </summary>
[TestMethod]
public void WorkerPoolSizeTest() {
    // NOTE(review): the constructor arguments look like (min threads,
    // max threads, queue threshold) - confirm against WorkerPool.
    // The using block disposes the pool even when an assertion fails, so
    // the blocked worker threads do not outlive the test.
    using (var pool = new WorkerPool(5, 10, 1)) {
        Assert.AreEqual(5, pool.PoolSize);

        // Three jobs that never finish within the test's lifetime.
        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

        Assert.AreEqual(5, pool.PoolSize);

        // Flood the pool with far more blocking jobs than it can run.
        for (int i = 0; i < 100; i++)
            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

        // Give the pool time to start additional workers.
        Thread.Sleep(200);
        Assert.AreEqual(10, pool.PoolSize);
    }
}
/// <summary>
/// Runs 1000 small jobs through a <c>WorkerPool</c> and checks that every
/// one of them contributed to the shared counter.
/// </summary>
[TestMethod]
public void WorkerPoolCorrectTest() {
    // The using block disposes the pool even when the assertion fails.
    using (var pool = new WorkerPool(0, 1000, 100)) {
        const int iterations = 1000;
        int pending = iterations;
        var stop = new ManualResetEvent(false);
        var count = 0;

        for (int i = 0; i < iterations; i++) {
            pool
                .Invoke(() => 1)
                .Then(x => Interlocked.Add(ref count, x))
                .Then(x => Math.Log10(x))
                .Anyway(() => {
                    // Use the value returned by Decrement rather than
                    // re-reading the shared variable, so exactly one
                    // callback observes zero and signals completion.
                    if (Interlocked.Decrement(ref pending) == 0)
                        stop.Set();
                });
        }

        stop.WaitOne();

        Assert.AreEqual(iterations, count);
        Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
    }
}
/// <summary>
/// Checks that <c>PoolSize</c> is 0 half a second after the pool has been
/// disposed, and then calls <c>Dispose</c> a second time.
/// </summary>
[TestMethod]
public void WorkerPoolDisposeTest() {
    // Time given to the pool before PoolSize is checked again.
    const int shutdownDelayMs = 500;

    var workers = new WorkerPool(5, 20);
    Assert.AreEqual(5, workers.PoolSize);

    workers.Dispose();
    Thread.Sleep(shutdownDelayMs);
    Assert.AreEqual(0, workers.PoolSize);

    // Second call on the already disposed pool.
    workers.Dispose();
}
/// <summary>
/// Exercises <c>MTQueue</c> in three phases: a single enqueue/dequeue
/// round trip, an ordered drain of 1000 items, and a concurrent phase
/// with 10 writer threads and 10 reader threads in which the sum of all
/// dequeued items must equal the number of items enqueued.
/// </summary>
[TestMethod]
public void MTQueueTest() {
    var queue = new MTQueue<int>();
    int res;

    // Phase 1: single item round trip.
    queue.Enqueue(10);
    Assert.IsTrue(queue.TryDequeue(out res));
    Assert.AreEqual(10, res);
    Assert.IsFalse(queue.TryDequeue(out res));

    // Phase 2: items come out in the order they were put in.
    for (int i = 0; i < 1000; i++)
        queue.Enqueue(i);
    for (int i = 0; i < 1000; i++) {
        // A failed dequeue must fail the test rather than compare
        // against a stale or default value.
        Assert.IsTrue(queue.TryDequeue(out res));
        Assert.AreEqual(i, res);
    }

    // Phase 3: concurrent writers and readers.
    const int itemsPerWriter = 10000;
    const int writersCount = 10;
    const int readersCount = 10;

    // Both counters are fully initialised before any thread starts, so a
    // thread that finishes early cannot drive a counter to zero while
    // the remaining threads are still being launched.
    int writers = writersCount;
    int readers = readersCount;
    int total = 0;
    var stop = new ManualResetEvent(false);

    for (int i = 0; i < writersCount; i++) {
        AsyncPool
            .InvokeNewThread(() => {
                for (int ii = 0; ii < itemsPerWriter; ii++) {
                    queue.Enqueue(1);
                }
                return 1;
            })
            .Anyway(() => Interlocked.Decrement(ref writers));
    }

    for (int i = 0; i < readersCount; i++) {
        AsyncPool
            .InvokeNewThread(() => {
                int t;
                bool writersActive;
                do {
                    // Sample the writer count BEFORE draining: when it
                    // is already zero every item has been enqueued, so
                    // the drain below cannot miss a late item.
                    writersActive = Interlocked.CompareExchange(ref writers, 0, 0) > 0;
                    while (queue.TryDequeue(out t))
                        Interlocked.Add(ref total, t);
                } while (writersActive);
                return 1;
            })
            .Anyway(() => {
                // Only the callback that brings the counter to zero
                // signals completion.
                if (Interlocked.Decrement(ref readers) == 0)
                    stop.Set();
            });
    }

    stop.WaitOne();

    Assert.AreEqual(itemsPerWriter * writersCount, total);
}
/// <summary>
/// Maps 100000 random doubles with <c>ParallelMap</c> and checks every
/// result against the same function computed sequentially.
/// </summary>
[TestMethod]
public void ParallelMapTest() {
    const int count = 100000;

    // Random input data.
    var rand = new Random();
    var args = new double[count];
    for (int i = 0; i < count; i++)
        args[i] = rand.NextDouble();

    // NOTE(review): the second argument (4) is presumably the number of
    // worker threads - confirm against ParallelMap.
    var started = Environment.TickCount;
    var mapping = args.ParallelMap(x => Math.Sin(x*x), 4);
    var res = mapping.Join();
    Console.WriteLine("Map complete in {0} ms", Environment.TickCount - started);

    // Verify each element against the sequential computation.
    started = Environment.TickCount;
    for (int i = 0; i < count; i++) {
        var expected = Math.Sin(args[i] * args[i]);
        Assert.AreEqual(expected, res[i]);
    }
    Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
}
/// <summary>
/// Maps 10000 random doubles with <c>ChainedMap</c>, computing each
/// element as a separate job on a <c>WorkerPool</c>, and checks every
/// result against the same function computed sequentially.
/// </summary>
[TestMethod]
public void ChainedMapTest() {
    using (var pool = new WorkerPool(0,10,1)) {
        const int count = 10000;

        // Random input data.
        var rand = new Random();
        var args = new double[count];
        for (int i = 0; i < count; i++)
            args[i] = rand.NextDouble();

        var started = Environment.TickCount;
        var mapping = args.ChainedMap(
            // Analysis disable once AccessToDisposedClosure
            x => pool.Invoke(() => Math.Sin(x * x)),
            4
        );
        var res = mapping.Join();
        Console.WriteLine("Map complete in {0} ms", Environment.TickCount - started);

        // Verify each element against the sequential computation.
        started = Environment.TickCount;
        for (int i = 0; i < count; i++) {
            var expected = Math.Sin(args[i] * args[i]);
            Assert.AreEqual(expected, res[i]);
        }
        Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
        Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
    }
}
/// <summary>
/// Sums 100000 random integers with <c>ParallelForEach</c> and checks the
/// total against a sequential sum of the same array.
/// </summary>
[TestMethod]
public void ParallelForEachTest() {
    const int count = 100000;

    // Random input data: integers in the range [0, 100).
    var rand = new Random();
    var args = new int[count];
    for (int i = 0; i < count; i++)
        args[i] = (int)(rand.NextDouble() * 100);

    // Parallel accumulation into a shared counter.
    int result = 0;
    var started = Environment.TickCount;
    var iteration = args.ParallelForEach(x => Interlocked.Add(ref result, x), 4);
    iteration.Join();
    Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - started, result);

    // Sequential reference sum.
    int result2 = 0;
    started = Environment.TickCount;
    foreach (var item in args)
        result2 += item;

    Assert.AreEqual(result2, result);
    Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
}
/// <summary>
/// Cancels a two-step chain after its first step has completed and checks
/// which <c>Cancelled</c> handlers ran: the one on the completed first step
/// must not, while the ones on the second step and on the chain must.
/// </summary>
[TestMethod]
public void ComplexCase1Test() {
    // One flag per Cancelled handler, set when that handler runs.
    var flags = new bool[3];

    // op1 (async 200ms) => op2 (async 200ms) => op3 (sync map)

    var step1 = PromiseHelper
        .Sleep(200, "Alan")
        .Cancelled(() => flags[0] = true);
    var p = step1
        .Chain(x =>
            PromiseHelper
                .Sleep(200, "Hi, " + x)
                .Then(y => y)
                .Cancelled(() => flags[1] = true)
        )
        .Cancelled(() => flags[2] = true);

    // Wait for the first step to finish, then cancel the whole chain.
    step1.Join();
    p.Cancel();

    try {
        // Expected value goes first so that failure messages read correctly.
        Assert.AreEqual("Hi, Alan", p.Join());
        Assert.Fail("Shouldn't get here");
    } catch (OperationCanceledException) {
        // Expected: joining a cancelled promise throws.
    }

    Assert.IsFalse(flags[0]);
    Assert.IsTrue(flags[1]);
    Assert.IsTrue(flags[2]);
}
/// <summary>
/// When a chained asynchronous operation is cancelled, the whole promise
/// is expected to fail with an <see cref="OperationCanceledException"/>.
/// </summary>
[TestMethod]
public void ChainedCancel1Test() {
    var greeting = PromiseHelper.Sleep(1, "Hi, HAL!");

    var p = greeting.Then(x => {
        // Start two asynchronous operations.
        var longRunning = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");

        // The second operation cancels the first one before it completes.
        var canceller = PromiseHelper.Sleep(100, "HAL, STOP!");
        canceller.Then(longRunning.Cancel);

        return longRunning;
    });

    // NOTE(review): nothing is asserted when Join returns normally, so
    // the test passes in that case too - confirm whether that is intended.
    try {
        p.Join();
    } catch (TargetInvocationException err) {
        var cancelled = err.InnerException is OperationCanceledException;
        Assert.IsTrue(cancelled);
    }
}
/// <summary>
/// When a promise chain is cancelled, the nested operations must be
/// cancelled as well.
/// </summary>
[TestMethod]
public void ChainedCancel2Test() {
    // Resolved with true by the nested operation's Cancelled handler and
    // with false if the nested operation runs to completion.
    var pSurvive = new Promise<bool>();
    // Signalled once the chained function has started executing.
    var hemStarted = new ManualResetEvent(false);

    var p = PromiseHelper
        .Sleep(1, "Hi, HAL!")
        .Chain(x => {
            hemStarted.Set();

            // Start the long-running nested operation.
            var result = PromiseHelper
                .Sleep(100000000, "HEM ENABLED!!!")
                .Then(s => {
                    pSurvive.Resolve(false);
                    return s;
                });

            result
                .Cancelled(() => pSurvive.Resolve(true));

            return result;
        });

    hemStarted.WaitOne();
    p.Cancel();

    try {
        p.Join();
        // Without this the test would pass silently if Join returned
        // normally; a cancelled chain is expected to throw.
        Assert.Fail("Join should throw for a cancelled promise");
    } catch (OperationCanceledException) {
        // Expected: the chain was cancelled.
    }

    // The nested operation must have observed the cancellation.
    Assert.IsTrue(pSurvive.Join());
}
} | ||||
} | ||||