|
|
using System;
|
|
|
using System.Reflection;
|
|
|
using System.Threading;
|
|
|
using Implab.Parallels;
|
|
|
|
|
|
#if MONO
|
|
|
|
|
|
using NUnit.Framework;
|
|
|
using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
|
|
|
using TestMethodAttribute = NUnit.Framework.TestAttribute;
|
|
|
|
|
|
#else
|
|
|
|
|
|
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
|
|
|
|
|
#endif
|
|
|
|
|
|
namespace Implab.Test {
|
|
|
[TestClass]
|
|
|
public class AsyncTests {
|
|
|
[TestMethod]
|
|
|
public void ResolveTest() {
|
|
|
int res = -1;
|
|
|
var p = new Promise<int>();
|
|
|
p.Then(x => res = x);
|
|
|
p.Resolve(100);
|
|
|
|
|
|
Assert.AreEqual(100, res);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void RejectTest() {
|
|
|
int res = -1;
|
|
|
Exception err = null;
|
|
|
|
|
|
var p = new Promise<int>();
|
|
|
p.Then(
|
|
|
x => res = x,
|
|
|
e => {
|
|
|
err = e;
|
|
|
return -2;
|
|
|
}
|
|
|
);
|
|
|
p.Reject(new ApplicationException("error"));
|
|
|
|
|
|
Assert.AreEqual(res, -1);
|
|
|
Assert.AreEqual(err.Message, "error");
|
|
|
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void CancelExceptionTest() {
|
|
|
var p = new Promise<bool>();
|
|
|
p.CancelOperation(null);
|
|
|
|
|
|
var p2 = p.Then(x => x, null, reason => {
|
|
|
throw new ApplicationException("CANCELLED");
|
|
|
});
|
|
|
|
|
|
try {
|
|
|
p2.Join();
|
|
|
Assert.Fail();
|
|
|
} catch (ApplicationException err) {
|
|
|
Assert.AreEqual("CANCELLED", err.InnerException.Message);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void ContinueOnCancelTest() {
|
|
|
var p = new Promise<bool>();
|
|
|
p.CancelOperation(null);
|
|
|
|
|
|
var p2 = p
|
|
|
.Then(x => x, null, reason => {
|
|
|
throw new ApplicationException("CANCELLED");
|
|
|
})
|
|
|
.Then(x => x, e => true);
|
|
|
|
|
|
Assert.AreEqual(true, p2.Join());
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void JoinSuccessTest() {
|
|
|
var p = new Promise<int>();
|
|
|
p.Resolve(100);
|
|
|
Assert.AreEqual(p.Join(), 100);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void JoinFailTest() {
|
|
|
var p = new Promise<int>();
|
|
|
p.Reject(new ApplicationException("failed"));
|
|
|
|
|
|
try {
|
|
|
p.Join();
|
|
|
throw new ApplicationException("WRONG!");
|
|
|
} catch (TargetInvocationException err) {
|
|
|
Assert.AreEqual(err.InnerException.Message, "failed");
|
|
|
} catch {
|
|
|
Assert.Fail("Got wrong excaption");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void MapTest() {
|
|
|
var p = new Promise<int>();
|
|
|
|
|
|
var p2 = p.Then(x => x.ToString());
|
|
|
p.Resolve(100);
|
|
|
|
|
|
Assert.AreEqual(p2.Join(), "100");
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void FixErrorTest() {
|
|
|
var p = new Promise<int>();
|
|
|
|
|
|
var p2 = p.Then(x => x, e => 101);
|
|
|
|
|
|
p.Reject(new Exception());
|
|
|
|
|
|
Assert.AreEqual(p2.Join(), 101);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void ChainTest() {
|
|
|
var p1 = new Promise<int>();
|
|
|
|
|
|
var p3 = p1.Chain(x => {
|
|
|
var p2 = new Promise<string>();
|
|
|
p2.Resolve(x.ToString());
|
|
|
return p2;
|
|
|
});
|
|
|
|
|
|
p1.Resolve(100);
|
|
|
|
|
|
Assert.AreEqual(p3.Join(), "100");
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void ChainFailTest() {
|
|
|
var p1 = new Promise<int>();
|
|
|
|
|
|
var p3 = p1.Chain(x => {
|
|
|
var p2 = new Promise<string>();
|
|
|
p2.Reject(new Exception("DIE!!!"));
|
|
|
return p2;
|
|
|
});
|
|
|
|
|
|
p1.Resolve(100);
|
|
|
|
|
|
Assert.IsTrue(p3.IsResolved);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void PoolTest() {
|
|
|
var pid = Thread.CurrentThread.ManagedThreadId;
|
|
|
var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
|
|
|
|
|
|
Assert.AreNotEqual(pid, p.Join());
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void WorkerPoolSizeTest() {
|
|
|
var pool = new WorkerPool(5, 10, 1);
|
|
|
|
|
|
Assert.AreEqual(5, pool.PoolSize);
|
|
|
|
|
|
pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
|
|
|
pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
|
|
|
pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
|
|
|
|
|
|
Assert.AreEqual(5, pool.PoolSize);
|
|
|
|
|
|
for (int i = 0; i < 100; i++)
|
|
|
pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
|
|
|
Thread.Sleep(200);
|
|
|
Assert.AreEqual(10, pool.PoolSize);
|
|
|
|
|
|
pool.Dispose();
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void WorkerPoolCorrectTest() {
|
|
|
var pool = new WorkerPool(0,1000,100);
|
|
|
|
|
|
const int iterations = 1000;
|
|
|
int pending = iterations;
|
|
|
var stop = new ManualResetEvent(false);
|
|
|
|
|
|
var count = 0;
|
|
|
for (int i = 0; i < iterations; i++) {
|
|
|
pool
|
|
|
.Invoke(() => 1)
|
|
|
.Then(x => Interlocked.Add(ref count, x))
|
|
|
.Then(x => Math.Log10(x))
|
|
|
.On(() => {
|
|
|
Interlocked.Decrement(ref pending);
|
|
|
if (pending == 0)
|
|
|
stop.Set();
|
|
|
}, PromiseEventType.All);
|
|
|
}
|
|
|
|
|
|
stop.WaitOne();
|
|
|
|
|
|
Assert.AreEqual(iterations, count);
|
|
|
Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
|
|
|
pool.Dispose();
|
|
|
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void WorkerPoolDisposeTest() {
|
|
|
var pool = new WorkerPool(5, 20);
|
|
|
Assert.AreEqual(5, pool.PoolSize);
|
|
|
pool.Dispose();
|
|
|
Thread.Sleep(500);
|
|
|
Assert.AreEqual(0, pool.PoolSize);
|
|
|
pool.Dispose();
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void MTQueueTest() {
|
|
|
var queue = new MTQueue<int>();
|
|
|
int res;
|
|
|
|
|
|
queue.Enqueue(10);
|
|
|
Assert.IsTrue(queue.TryDequeue(out res));
|
|
|
Assert.AreEqual(10, res);
|
|
|
Assert.IsFalse(queue.TryDequeue(out res));
|
|
|
|
|
|
for (int i = 0; i < 1000; i++)
|
|
|
queue.Enqueue(i);
|
|
|
|
|
|
for (int i = 0; i < 1000; i++) {
|
|
|
queue.TryDequeue(out res);
|
|
|
Assert.AreEqual(i, res);
|
|
|
}
|
|
|
|
|
|
int writers = 0;
|
|
|
int readers = 0;
|
|
|
var stop = new ManualResetEvent(false);
|
|
|
int total = 0;
|
|
|
|
|
|
const int itemsPerWriter = 10000;
|
|
|
const int writersCount = 10;
|
|
|
|
|
|
for (int i = 0; i < writersCount; i++) {
|
|
|
Interlocked.Increment(ref writers);
|
|
|
AsyncPool
|
|
|
.RunThread(() => {
|
|
|
for (int ii = 0; ii < itemsPerWriter; ii++) {
|
|
|
queue.Enqueue(1);
|
|
|
}
|
|
|
return 1;
|
|
|
})
|
|
|
.On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
|
|
|
}
|
|
|
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
Interlocked.Increment(ref readers);
|
|
|
AsyncPool
|
|
|
.RunThread(() => {
|
|
|
int t;
|
|
|
do {
|
|
|
while (queue.TryDequeue(out t))
|
|
|
Interlocked.Add(ref total, t);
|
|
|
} while (writers > 0);
|
|
|
return 1;
|
|
|
})
|
|
|
.On(() => {
|
|
|
Interlocked.Decrement(ref readers);
|
|
|
if (readers == 0)
|
|
|
stop.Set();
|
|
|
}, PromiseEventType.All);
|
|
|
}
|
|
|
|
|
|
stop.WaitOne();
|
|
|
|
|
|
Assert.AreEqual(100000, total);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void AsyncQueueTest() {
|
|
|
var queue = new AsyncQueue<int>();
|
|
|
int res;
|
|
|
|
|
|
queue.Enqueue(10);
|
|
|
Assert.IsTrue(queue.TryDequeue(out res));
|
|
|
Assert.AreEqual(10, res);
|
|
|
Assert.IsFalse(queue.TryDequeue(out res));
|
|
|
|
|
|
for (int i = 0; i < 1000; i++)
|
|
|
queue.Enqueue(i);
|
|
|
|
|
|
for (int i = 0; i < 1000; i++) {
|
|
|
queue.TryDequeue(out res);
|
|
|
Assert.AreEqual(i, res);
|
|
|
}
|
|
|
|
|
|
const int count = 10000000;
|
|
|
|
|
|
int res1 = 0, res2 = 0;
|
|
|
var t1 = Environment.TickCount;
|
|
|
|
|
|
AsyncPool.RunThread(
|
|
|
() => {
|
|
|
for (var i = 0; i < count; i++)
|
|
|
queue.Enqueue(1);
|
|
|
Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
for (var i = 0; i < count; i++)
|
|
|
queue.Enqueue(2);
|
|
|
Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
int temp;
|
|
|
int i = 0;
|
|
|
while (i < count)
|
|
|
if (queue.TryDequeue(out temp)) {
|
|
|
i++;
|
|
|
res1 += temp;
|
|
|
}
|
|
|
Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
int temp;
|
|
|
int i = 0;
|
|
|
while (i < count)
|
|
|
if (queue.TryDequeue(out temp)) {
|
|
|
i++;
|
|
|
res2 += temp;
|
|
|
}
|
|
|
Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
|
|
|
}
|
|
|
)
|
|
|
.PromiseAll()
|
|
|
.Join();
|
|
|
|
|
|
Assert.AreEqual(count * 3, res1 + res2);
|
|
|
|
|
|
Console.WriteLine(
|
|
|
"done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
|
|
|
Environment.TickCount - t1,
|
|
|
res1,
|
|
|
res2,
|
|
|
res1 + res2,
|
|
|
count
|
|
|
);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void AsyncQueueBatchTest() {
|
|
|
var queue = new AsyncQueue<int>();
|
|
|
|
|
|
const int wBatch = 29;
|
|
|
const int wCount = 400000;
|
|
|
const int total = wBatch * wCount * 2;
|
|
|
const int summ = wBatch * wCount * 3;
|
|
|
|
|
|
int r1 = 0, r2 = 0;
|
|
|
const int rBatch = 111;
|
|
|
int read = 0;
|
|
|
|
|
|
var t1 = Environment.TickCount;
|
|
|
|
|
|
AsyncPool.RunThread(
|
|
|
() => {
|
|
|
var buffer = new int[wBatch];
|
|
|
for(int i = 0; i<wBatch; i++)
|
|
|
buffer[i] = 1;
|
|
|
|
|
|
for(int i =0; i < wCount; i++)
|
|
|
queue.EnqueueRange(buffer,0,wBatch);
|
|
|
Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
var buffer = new int[wBatch];
|
|
|
for(int i = 0; i<wBatch; i++)
|
|
|
buffer[i] = 2;
|
|
|
|
|
|
for(int i =0; i < wCount; i++)
|
|
|
queue.EnqueueRange(buffer,0,wBatch);
|
|
|
Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
var buffer = new int[rBatch];
|
|
|
|
|
|
while(read < total) {
|
|
|
int actual;
|
|
|
if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
|
|
|
for(int i=0; i< actual; i++)
|
|
|
r1 += buffer[i];
|
|
|
Interlocked.Add(ref read, actual);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
var buffer = new int[rBatch];
|
|
|
|
|
|
while(read < total) {
|
|
|
int actual;
|
|
|
if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
|
|
|
for(int i=0; i< actual; i++)
|
|
|
r2 += buffer[i];
|
|
|
Interlocked.Add(ref read, actual);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
|
|
|
}
|
|
|
)
|
|
|
.PromiseAll()
|
|
|
.Join();
|
|
|
|
|
|
Assert.AreEqual(summ , r1 + r2);
|
|
|
|
|
|
Console.WriteLine(
|
|
|
"done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
|
|
|
Environment.TickCount - t1,
|
|
|
r1,
|
|
|
r2,
|
|
|
r1 + r2,
|
|
|
total
|
|
|
);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void AsyncQueueChunkDequeueTest() {
|
|
|
var queue = new AsyncQueue<int>();
|
|
|
|
|
|
const int wBatch = 31;
|
|
|
const int wCount = 200000;
|
|
|
const int total = wBatch * wCount * 3;
|
|
|
const int summ = wBatch * wCount * 6;
|
|
|
|
|
|
int r1 = 0, r2 = 0;
|
|
|
const int rBatch = 1024;
|
|
|
int read = 0;
|
|
|
|
|
|
var t1 = Environment.TickCount;
|
|
|
|
|
|
AsyncPool.RunThread(
|
|
|
() => {
|
|
|
var buffer = new int[wBatch];
|
|
|
for(int i = 0; i<wBatch; i++)
|
|
|
buffer[i] = 1;
|
|
|
|
|
|
for(int i =0; i < wCount; i++)
|
|
|
queue.EnqueueRange(buffer,0,wBatch);
|
|
|
Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
var buffer = new int[wBatch];
|
|
|
for(int i = 0; i<wBatch; i++)
|
|
|
buffer[i] = 2;
|
|
|
|
|
|
for(int i =0; i < wCount; i++)
|
|
|
queue.EnqueueRange(buffer,0,wBatch);
|
|
|
Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
var buffer = new int[wBatch];
|
|
|
for(int i = 0; i<wBatch; i++)
|
|
|
buffer[i] = 3;
|
|
|
|
|
|
for(int i =0; i < wCount; i++)
|
|
|
queue.EnqueueRange(buffer,0,wBatch);
|
|
|
Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
var buffer = new int[rBatch];
|
|
|
int count = 1;
|
|
|
double avgchunk = 0;
|
|
|
while(read < total) {
|
|
|
int actual;
|
|
|
if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
|
|
|
for(int i=0; i< actual; i++)
|
|
|
r2 += buffer[i];
|
|
|
Interlocked.Add(ref read, actual);
|
|
|
avgchunk = avgchunk*(count-1)/count + actual/(double)count;
|
|
|
count ++;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
|
|
|
}
|
|
|
)
|
|
|
.PromiseAll()
|
|
|
.Join();
|
|
|
|
|
|
Assert.AreEqual(summ , r1 + r2);
|
|
|
|
|
|
Console.WriteLine(
|
|
|
"done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
|
|
|
Environment.TickCount - t1,
|
|
|
r1,
|
|
|
r2,
|
|
|
r1 + r2,
|
|
|
total
|
|
|
);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void AsyncQueueDrainTest() {
|
|
|
var queue = new AsyncQueue<int>();
|
|
|
|
|
|
const int wBatch = 11;
|
|
|
const int wCount = 200000;
|
|
|
const int total = wBatch * wCount * 3;
|
|
|
const int summ = wBatch * wCount * 3;
|
|
|
|
|
|
int r1 = 0, r2 = 0;
|
|
|
const int rBatch = 11;
|
|
|
int read = 0;
|
|
|
|
|
|
var t1 = Environment.TickCount;
|
|
|
|
|
|
AsyncPool.RunThread(
|
|
|
() => {
|
|
|
var buffer = new int[wBatch];
|
|
|
for(int i = 0; i<wBatch; i++)
|
|
|
buffer[i] = 1;
|
|
|
|
|
|
for(int i =0; i < wCount; i++)
|
|
|
queue.EnqueueRange(buffer,0,wBatch);
|
|
|
Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
for(int i =0; i < wCount * wBatch; i++)
|
|
|
queue.Enqueue(1);
|
|
|
Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
() => {
|
|
|
var buffer = new int[wBatch];
|
|
|
for(int i = 0; i<wBatch; i++)
|
|
|
buffer[i] = 1;
|
|
|
|
|
|
for(int i =0; i < wCount; i++)
|
|
|
queue.EnqueueRange(buffer,0,wBatch);
|
|
|
Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
|
|
|
},
|
|
|
/*() => {
|
|
|
int temp;
|
|
|
int count = 0;
|
|
|
while (read < total)
|
|
|
if (queue.TryDequeue(out temp)) {
|
|
|
count++;
|
|
|
r1 += temp;
|
|
|
Interlocked.Increment(ref read);
|
|
|
}
|
|
|
Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
|
|
|
},*/
|
|
|
/*() => {
|
|
|
var buffer = new int[rBatch];
|
|
|
var count = 0;
|
|
|
while(read < total) {
|
|
|
int actual;
|
|
|
if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
|
|
|
for(int i=0; i< actual; i++)
|
|
|
r1 += buffer[i];
|
|
|
Interlocked.Add(ref read, actual);
|
|
|
count += actual;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
|
|
|
},*/
|
|
|
() => {
|
|
|
var count = 0;
|
|
|
while(read < total) {
|
|
|
var buffer = queue.Drain();
|
|
|
for(int i=0; i< buffer.Length; i++)
|
|
|
r1 += buffer[i];
|
|
|
Interlocked.Add(ref read, buffer.Length);
|
|
|
count += buffer.Length;
|
|
|
}
|
|
|
Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
|
|
|
},
|
|
|
() => {
|
|
|
var count = 0;
|
|
|
while(read < total) {
|
|
|
var buffer = queue.Drain();
|
|
|
for(int i=0; i< buffer.Length; i++)
|
|
|
r2 += buffer[i];
|
|
|
Interlocked.Add(ref read, buffer.Length);
|
|
|
count += buffer.Length;
|
|
|
}
|
|
|
Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
|
|
|
}
|
|
|
)
|
|
|
.PromiseAll()
|
|
|
.Join();
|
|
|
|
|
|
Assert.AreEqual(summ , r1 + r2);
|
|
|
|
|
|
Console.WriteLine(
|
|
|
"done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
|
|
|
Environment.TickCount - t1,
|
|
|
r1,
|
|
|
r2,
|
|
|
r1 + r2,
|
|
|
total
|
|
|
);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void ParallelMapTest() {
|
|
|
|
|
|
const int count = 100000;
|
|
|
|
|
|
var args = new double[count];
|
|
|
var rand = new Random();
|
|
|
|
|
|
for (int i = 0; i < count; i++)
|
|
|
args[i] = rand.NextDouble();
|
|
|
|
|
|
var t = Environment.TickCount;
|
|
|
var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
|
|
|
|
|
|
Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
|
|
|
|
|
|
t = Environment.TickCount;
|
|
|
for (int i = 0; i < count; i++)
|
|
|
Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
|
|
|
Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void ChainedMapTest() {
|
|
|
|
|
|
using (var pool = new WorkerPool()) {
|
|
|
const int count = 10000;
|
|
|
|
|
|
var args = new double[count];
|
|
|
var rand = new Random();
|
|
|
|
|
|
for (int i = 0; i < count; i++)
|
|
|
args[i] = rand.NextDouble();
|
|
|
|
|
|
var t = Environment.TickCount;
|
|
|
var res = args
|
|
|
.ChainedMap(
|
|
|
// Analysis disable once AccessToDisposedClosure
|
|
|
x => pool.Invoke(
|
|
|
() => Math.Sin(x * x)
|
|
|
),
|
|
|
4
|
|
|
)
|
|
|
.Join();
|
|
|
|
|
|
Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
|
|
|
|
|
|
t = Environment.TickCount;
|
|
|
for (int i = 0; i < count; i++)
|
|
|
Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
|
|
|
Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
|
|
|
Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void ParallelForEachTest() {
|
|
|
|
|
|
const int count = 100000;
|
|
|
|
|
|
var args = new int[count];
|
|
|
var rand = new Random();
|
|
|
|
|
|
for (int i = 0; i < count; i++)
|
|
|
args[i] = (int)(rand.NextDouble() * 100);
|
|
|
|
|
|
int result = 0;
|
|
|
|
|
|
var t = Environment.TickCount;
|
|
|
args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
|
|
|
|
|
|
Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
|
|
|
|
|
|
int result2 = 0;
|
|
|
|
|
|
t = Environment.TickCount;
|
|
|
for (int i = 0; i < count; i++)
|
|
|
result2 += args[i];
|
|
|
Assert.AreEqual(result2, result);
|
|
|
Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void ComplexCase1Test() {
|
|
|
var flags = new bool[3];
|
|
|
|
|
|
// op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
|
|
|
|
|
|
var step1 = PromiseHelper
|
|
|
.Sleep(200, "Alan")
|
|
|
.On(() => flags[0] = true, PromiseEventType.Cancelled);
|
|
|
var p = step1
|
|
|
.Chain(x =>
|
|
|
PromiseHelper
|
|
|
.Sleep(200, "Hi, " + x)
|
|
|
.Then(y => y)
|
|
|
.On(() => flags[1] = true, PromiseEventType.Cancelled)
|
|
|
)
|
|
|
.On(() => flags[2] = true, PromiseEventType.Cancelled);
|
|
|
step1.Join();
|
|
|
p.Cancel();
|
|
|
try {
|
|
|
Assert.AreEqual(p.Join(), "Hi, Alan");
|
|
|
Assert.Fail("Shouldn't get here");
|
|
|
} catch (OperationCanceledException) {
|
|
|
}
|
|
|
|
|
|
Assert.IsFalse(flags[0]);
|
|
|
Assert.IsTrue(flags[1]);
|
|
|
Assert.IsTrue(flags[2]);
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void ChainedCancel1Test() {
|
|
|
// при отмене сцепленной асинхронной операции все обещание должно
|
|
|
// завершаться ошибкой OperationCanceledException
|
|
|
var p = PromiseHelper
|
|
|
.Sleep(1, "Hi, HAL!")
|
|
|
.Then(x => {
|
|
|
// запускаем две асинхронные операции
|
|
|
var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
|
|
|
// вторая операция отменяет первую до завершения
|
|
|
PromiseHelper
|
|
|
.Sleep(100, "HAL, STOP!")
|
|
|
.Then(result.Cancel);
|
|
|
return result;
|
|
|
});
|
|
|
try {
|
|
|
p.Join();
|
|
|
} catch (TargetInvocationException err) {
|
|
|
Assert.IsTrue(err.InnerException is OperationCanceledException);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void ChainedCancel2Test() {
|
|
|
// при отмене цепочки обещаний, вложенные операции также должны отменяться
|
|
|
var pSurvive = new Promise<bool>();
|
|
|
var hemStarted = new Signal();
|
|
|
var p = PromiseHelper
|
|
|
.Sleep(1, "Hi, HAL!")
|
|
|
.Chain(() => {
|
|
|
hemStarted.Set();
|
|
|
// запускаем две асинхронные операции
|
|
|
var result = PromiseHelper
|
|
|
.Sleep(2000, "HEM ENABLED!!!")
|
|
|
.Then(() => pSurvive.Resolve(false));
|
|
|
|
|
|
result
|
|
|
.On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
|
|
|
|
|
|
return result;
|
|
|
});
|
|
|
|
|
|
hemStarted.Wait();
|
|
|
p.Cancel();
|
|
|
|
|
|
try {
|
|
|
p.Join();
|
|
|
Assert.Fail();
|
|
|
} catch (OperationCanceledException) {
|
|
|
}
|
|
|
Assert.IsTrue(pSurvive.Join());
|
|
|
}
|
|
|
|
|
|
[TestMethod]
|
|
|
public void SharedLockTest() {
|
|
|
var l = new SharedLock();
|
|
|
int shared = 0;
|
|
|
int exclusive = 0;
|
|
|
var s1 = new Signal();
|
|
|
var log = new AsyncQueue<string>();
|
|
|
|
|
|
try {
|
|
|
AsyncPool.RunThread(
|
|
|
() => {
|
|
|
log.Enqueue("Reader #1 started");
|
|
|
try {
|
|
|
l.LockShared();
|
|
|
log.Enqueue("Reader #1 lock got");
|
|
|
if (Interlocked.Increment(ref shared) == 2)
|
|
|
s1.Set();
|
|
|
s1.Wait();
|
|
|
log.Enqueue("Reader #1 finished");
|
|
|
Interlocked.Decrement(ref shared);
|
|
|
} finally {
|
|
|
l.Release();
|
|
|
log.Enqueue("Reader #1 lock released");
|
|
|
}
|
|
|
},
|
|
|
() => {
|
|
|
log.Enqueue("Reader #2 started");
|
|
|
|
|
|
try {
|
|
|
l.LockShared();
|
|
|
log.Enqueue("Reader #2 lock got");
|
|
|
|
|
|
if (Interlocked.Increment(ref shared) == 2)
|
|
|
s1.Set();
|
|
|
s1.Wait();
|
|
|
log.Enqueue("Reader #2 upgrading to writer");
|
|
|
Interlocked.Decrement(ref shared);
|
|
|
l.Upgrade();
|
|
|
log.Enqueue("Reader #2 upgraded");
|
|
|
|
|
|
Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
|
|
|
Assert.AreEqual(0, shared);
|
|
|
log.Enqueue("Reader #2 finished");
|
|
|
Interlocked.Decrement(ref exclusive);
|
|
|
} finally {
|
|
|
l.Release();
|
|
|
log.Enqueue("Reader #2 lock released");
|
|
|
}
|
|
|
},
|
|
|
() => {
|
|
|
log.Enqueue("Writer #1 started");
|
|
|
try {
|
|
|
l.LockExclusive();
|
|
|
log.Enqueue("Writer #1 got the lock");
|
|
|
Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
|
|
|
Interlocked.Decrement(ref exclusive);
|
|
|
log.Enqueue("Writer #1 is finished");
|
|
|
} finally {
|
|
|
l.Release();
|
|
|
log.Enqueue("Writer #1 lock released");
|
|
|
}
|
|
|
}
|
|
|
).PromiseAll().Join(1000);
|
|
|
log.Enqueue("Done");
|
|
|
} catch(Exception error) {
|
|
|
log.Enqueue(error.Message);
|
|
|
throw;
|
|
|
} finally {
|
|
|
foreach (var m in log)
|
|
|
Console.WriteLine(m);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#if NET_4_5
|
|
|
|
|
|
[TestMethod]
|
|
|
public async void TaskInteropTest() {
|
|
|
var promise = new Promise<int>();
|
|
|
promise.Resolve(10);
|
|
|
var res = await promise;
|
|
|
|
|
|
Assert.AreEqual(10, res);
|
|
|
}
|
|
|
|
|
|
#endif
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|