##// END OF EJS Templates
small fixes, WorkerPool still incomplete
cin -
r13:b0feb5b9ad1c promises
parent child
Show More
@@ -1,139 +1,169
1 1 using System;
2 2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 3 using System.Reflection;
4 4 using System.Threading;
5 5 using Implab.Parallels;
6 6
7 7 namespace Implab.Test
8 8 {
9 9 [TestClass]
10 10 public class AsyncTests
11 11 {
12 12 [TestMethod]
13 13 public void ResolveTest ()
14 14 {
15 15 int res = -1;
16 16 var p = new Promise<int> ();
17 17 p.Then (x => res = x);
18 18 p.Resolve (100);
19 19
20 20 Assert.AreEqual (res, 100);
21 21 }
22 22
23 23 [TestMethod]
24 24 public void RejectTest ()
25 25 {
26 26 int res = -1;
27 27 Exception err = null;
28 28
29 29 var p = new Promise<int> ();
30 30 p.Then (x => res = x, e => err = e);
31 31 p.Reject (new ApplicationException ("error"));
32 32
33 33 Assert.AreEqual (res, -1);
34 34 Assert.AreEqual (err.Message, "error");
35 35
36 36 }
37 37
38 38 [TestMethod]
39 39 public void JoinSuccessTest ()
40 40 {
41 41 var p = new Promise<int> ();
42 42 p.Resolve (100);
43 43 Assert.AreEqual (p.Join (), 100);
44 44 }
45 45
46 46 [TestMethod]
47 47 public void JoinFailTest ()
48 48 {
49 49 var p = new Promise<int> ();
50 50 p.Reject (new ApplicationException ("failed"));
51 51
52 52 try {
53 53 p.Join ();
54 54 throw new ApplicationException ("WRONG!");
55 55 } catch (TargetInvocationException err) {
56 56 Assert.AreEqual (err.InnerException.Message, "failed");
57 57 } catch {
58 58 Assert.Fail ("Got wrong excaption");
59 59 }
60 60 }
61 61
62 62 [TestMethod]
63 63 public void MapTest ()
64 64 {
65 65 var p = new Promise<int> ();
66 66
67 67 var p2 = p.Map (x => x.ToString ());
68 68 p.Resolve (100);
69 69
70 70 Assert.AreEqual (p2.Join (), "100");
71 71 }
72 72
73 73 [TestMethod]
74 74 public void FixErrorTest() {
75 75 var p = new Promise<int>();
76 76
77 77 var p2 = p.Error(e => 101);
78 78
79 79 p.Reject(new Exception());
80 80
81 81 Assert.AreEqual(p2.Join(), 101);
82 82 }
83 83
84 84 [TestMethod]
85 85 public void ChainTest ()
86 86 {
87 87 var p1 = new Promise<int> ();
88 88
89 89 var p3 = p1.Chain (x => {
90 90 var p2 = new Promise<string> ();
91 91 p2.Resolve (x.ToString ());
92 92 return p2;
93 93 });
94 94
95 95 p1.Resolve (100);
96 96
97 97 Assert.AreEqual (p3.Join (), "100");
98 98 }
99 99
100 100 [TestMethod]
101 101 public void PoolTest ()
102 102 {
103 103 var pid = Thread.CurrentThread.ManagedThreadId;
104 104 var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId);
105 105
106 106 Assert.AreNotEqual (pid, p.Join ());
107 107 }
108 108
109 109 [TestMethod]
110 public void WorkerPoolSizeTest() {
111 var pool = new WorkerPool(5,10);
112
113 Assert.AreEqual(5, pool.ThreadCount);
114
115 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
116 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
117 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
118
119 Assert.AreEqual(5, pool.ThreadCount);
120
121 for (int i = 0; i < 100; i++)
122 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
123 Assert.AreEqual(10, pool.ThreadCount);
124 }
125
126 [TestMethod]
127 public void WorkerPoolCorrectTest() {
128 var pool = new WorkerPool(5, 20);
129
130 var count = 0;
131 for (int i = 0; i < 1000; i++)
132 pool
133 .Invoke(() => 1)
134 .Then(x => Interlocked.Add(ref count, x));
135
136 Assert.AreEqual(1000, count);
137 }
138
139 [TestMethod]
110 140 public void ComplexCase1Test() {
111 141 var flags = new bool[3];
112 142
113 143 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
114 144
115 145 var p = PromiseHelper
116 146 .Sleep(200, "Alan")
117 147 .Cancelled(() => flags[0] = true)
118 148 .Chain(x =>
119 149 PromiseHelper
120 150 .Sleep(200, "Hi, " + x)
121 151 .Map( y => y )
122 152 .Cancelled(() => flags[1] = true)
123 153 )
124 154 .Cancelled(() => flags[2] = true);
125 155 Thread.Sleep(300);
126 156 p.Cancel();
127 157 try {
128 158 Assert.AreEqual(p.Join(), "Hi, Alan");
129 159 Assert.Fail("Shouldn't get here");
130 160 } catch(OperationCanceledException) {
131 161 }
132 162
133 163 Assert.IsFalse(flags[0]);
134 164 Assert.IsTrue(flags[1]);
135 165 Assert.IsTrue(flags[2]);
136 166 }
137 167 }
138 168 }
139 169
@@ -1,14 +1,14
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab
7 7 {
8 8 public interface IProgressNotifier
9 9 {
10 10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
11 11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
12 EventHandler<ProgressInitEventArgs> ProgressInit;
12 event EventHandler<ProgressInitEventArgs> ProgressInit;
13 13 }
14 14 }
@@ -1,11 +1,11
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab {
7 7
8 public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
8 /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
9 9
10 }
10 }*/
11 11 }
@@ -1,131 +1,171
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6 using System.Diagnostics;
7 7
8 8 namespace Implab.Parallels {
9 9 public class WorkerPool : IDisposable {
10 10 readonly int m_minThreads;
11 11 readonly int m_maxThreads;
12 12 int m_runningThreads;
13 13 object m_lock = new object();
14 14
15 15 bool m_disposed = false;
16
17 // this event will signal that workers can try to fetch a task from queue or the pool has been disposed
16 18 ManualResetEvent m_hasTasks = new ManualResetEvent(false);
17 19 Queue<Action> m_queue = new Queue<Action>();
18 20
19 21 public WorkerPool(int min, int max) {
20 22 if (min < 0)
21 23 throw new ArgumentOutOfRangeException("min");
24 if (max <= 0)
25 throw new ArgumentOutOfRangeException("max");
26
22 27 if (min > max)
23 28 min = max;
24 29 m_minThreads = min;
25 30 m_maxThreads = max;
26 31
32 InitPool();
33 }
34
35 public WorkerPool(int max)
36 : this(0, max) {
37 }
38
39 public WorkerPool() {
40 int maxThreads, maxCP;
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
42
43 m_minThreads = 0;
44 m_maxThreads = maxThreads;
45
46 InitPool();
47 }
48
49 void InitPool() {
27 50 for (int i = 0; i < m_minThreads; i++)
28 51 StartWorker();
29 52 }
30 53
54 public int ThreadCount {
55 get {
56 return m_runningThreads;
57 }
58 }
59
31 60 public Promise<T> Invoke<T>(Func<T> task) {
32 61 if (m_disposed)
33 62 throw new ObjectDisposedException(ToString());
34 63 if (task == null)
35 64 throw new ArgumentNullException("task");
36 65
37 66 var promise = new Promise<T>();
38 67
68 var queueLen = EnqueueTask(delegate() {
69 try {
70 promise.Resolve(task());
71 } catch (Exception e) {
72 promise.Reject(e);
73 }
74 });
39 75
76 if (queueLen > 1)
77 StartWorker();
40 78
41 79 return promise;
42 80 }
43 81
44 82 bool StartWorker() {
45 83 var current = m_runningThreads;
46 84 // use spins to allocate slot for the new thread
47 85 do {
48 86 if (current >= m_maxThreads)
49 87 // no more slots left
50 88 return false;
51 89 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
52 90
53 91 // slot successfully allocated
54 92
55 93 var worker = new Thread(this.Worker);
94 worker.IsBackground = true;
56 95 worker.Start();
57 96
58 97 return true;
59 98 }
60 99
61 void EnqueueTask(Action task) {
100 int EnqueueTask(Action task) {
62 101 Debug.Assert(task != null);
63 102 lock (m_queue) {
64 103 m_queue.Enqueue(task);
65 104 m_hasTasks.Set();
105 return m_queue.Count;
66 106 }
67 107 }
68 108
69 109 bool FetchTask(out Action task) {
70 110 task = null;
71 111
72 112 while (true) {
73 113
74 114 m_hasTasks.WaitOne();
75 115
76 116 if (m_disposed)
77 117 return false;
78 118
79 119 lock (m_queue) {
80 120 if (m_queue.Count > 0) {
81 121 task = m_queue.Dequeue();
82 122 return true;
83 123 }
84 124
85 125 // no tasks left
86 // signal that no more tasks left, lock ensures that this event won't suppress newly added task
126 // signal that no more tasks left, current lock ensures that this event won't suppress newly added task
87 127 m_hasTasks.Reset();
88 128 }
89 129
90 130 bool exit = true;
91 131
92 132 var current = m_runningThreads;
93 133 do {
94 134 if (current <= m_minThreads) {
95 135 exit = false; // this thread should return and wait for the new events
96 136 break;
97 137 }
98 138 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
99 139
100 140 if (exit)
101 141 return false;
102 142 }
103 143 }
104 144
105 145 void Worker() {
106 146 Action task;
107 147 while (FetchTask(out task))
108 148 task();
109 149 }
110 150
111 151 protected virtual void Dispose(bool disposing) {
112 152 if (disposing) {
113 153 lock (m_lock) {
114 154 if (m_disposed)
115 155 return;
116 156 m_disposed = true;
117 157 }
118 158 m_hasTasks.Set();
119 159 GC.SuppressFinalize(this);
120 160 }
121 161 }
122 162
123 163 public void Dispose() {
124 164 Dispose(true);
125 165 }
126 166
127 167 ~WorkerPool() {
128 168 Dispose(false);
129 169 }
130 170 }
131 171 }
General Comments 0
You need to be logged in to leave comments. Login now