@@ -1,139 +1,169 | |||
|
1 | 1 | using System; |
|
2 | 2 | using Microsoft.VisualStudio.TestTools.UnitTesting; |
|
3 | 3 | using System.Reflection; |
|
4 | 4 | using System.Threading; |
|
5 | 5 | using Implab.Parallels; |
|
6 | 6 | |
|
7 | 7 | namespace Implab.Test |
|
8 | 8 | { |
|
9 | 9 | [TestClass] |
|
10 | 10 | public class AsyncTests |
|
11 | 11 | { |
|
12 | 12 | [TestMethod] |
|
13 | 13 | public void ResolveTest () |
|
14 | 14 | { |
|
15 | 15 | int res = -1; |
|
16 | 16 | var p = new Promise<int> (); |
|
17 | 17 | p.Then (x => res = x); |
|
18 | 18 | p.Resolve (100); |
|
19 | 19 | |
|
20 | 20 | Assert.AreEqual (res, 100); |
|
21 | 21 | } |
|
22 | 22 | |
|
23 | 23 | [TestMethod] |
|
24 | 24 | public void RejectTest () |
|
25 | 25 | { |
|
26 | 26 | int res = -1; |
|
27 | 27 | Exception err = null; |
|
28 | 28 | |
|
29 | 29 | var p = new Promise<int> (); |
|
30 | 30 | p.Then (x => res = x, e => err = e); |
|
31 | 31 | p.Reject (new ApplicationException ("error")); |
|
32 | 32 | |
|
33 | 33 | Assert.AreEqual (res, -1); |
|
34 | 34 | Assert.AreEqual (err.Message, "error"); |
|
35 | 35 | |
|
36 | 36 | } |
|
37 | 37 | |
|
38 | 38 | [TestMethod] |
|
39 | 39 | public void JoinSuccessTest () |
|
40 | 40 | { |
|
41 | 41 | var p = new Promise<int> (); |
|
42 | 42 | p.Resolve (100); |
|
43 | 43 | Assert.AreEqual (p.Join (), 100); |
|
44 | 44 | } |
|
45 | 45 | |
|
46 | 46 | [TestMethod] |
|
47 | 47 | public void JoinFailTest () |
|
48 | 48 | { |
|
49 | 49 | var p = new Promise<int> (); |
|
50 | 50 | p.Reject (new ApplicationException ("failed")); |
|
51 | 51 | |
|
52 | 52 | try { |
|
53 | 53 | p.Join (); |
|
54 | 54 | throw new ApplicationException ("WRONG!"); |
|
55 | 55 | } catch (TargetInvocationException err) { |
|
56 | 56 | Assert.AreEqual (err.InnerException.Message, "failed"); |
|
57 | 57 | } catch { |
|
58 | 58 | Assert.Fail ("Got wrong excaption"); |
|
59 | 59 | } |
|
60 | 60 | } |
|
61 | 61 | |
|
62 | 62 | [TestMethod] |
|
63 | 63 | public void MapTest () |
|
64 | 64 | { |
|
65 | 65 | var p = new Promise<int> (); |
|
66 | 66 | |
|
67 | 67 | var p2 = p.Map (x => x.ToString ()); |
|
68 | 68 | p.Resolve (100); |
|
69 | 69 | |
|
70 | 70 | Assert.AreEqual (p2.Join (), "100"); |
|
71 | 71 | } |
|
72 | 72 | |
|
73 | 73 | [TestMethod] |
|
74 | 74 | public void FixErrorTest() { |
|
75 | 75 | var p = new Promise<int>(); |
|
76 | 76 | |
|
77 | 77 | var p2 = p.Error(e => 101); |
|
78 | 78 | |
|
79 | 79 | p.Reject(new Exception()); |
|
80 | 80 | |
|
81 | 81 | Assert.AreEqual(p2.Join(), 101); |
|
82 | 82 | } |
|
83 | 83 | |
|
84 | 84 | [TestMethod] |
|
85 | 85 | public void ChainTest () |
|
86 | 86 | { |
|
87 | 87 | var p1 = new Promise<int> (); |
|
88 | 88 | |
|
89 | 89 | var p3 = p1.Chain (x => { |
|
90 | 90 | var p2 = new Promise<string> (); |
|
91 | 91 | p2.Resolve (x.ToString ()); |
|
92 | 92 | return p2; |
|
93 | 93 | }); |
|
94 | 94 | |
|
95 | 95 | p1.Resolve (100); |
|
96 | 96 | |
|
97 | 97 | Assert.AreEqual (p3.Join (), "100"); |
|
98 | 98 | } |
|
99 | 99 | |
|
100 | 100 | [TestMethod] |
|
101 | 101 | public void PoolTest () |
|
102 | 102 | { |
|
103 | 103 | var pid = Thread.CurrentThread.ManagedThreadId; |
|
104 | 104 | var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId); |
|
105 | 105 | |
|
106 | 106 | Assert.AreNotEqual (pid, p.Join ()); |
|
107 | 107 | } |
|
108 | 108 | |
|
109 | 109 | [TestMethod] |
|
110 | public void WorkerPoolSizeTest() { | |
|
111 | var pool = new WorkerPool(5,10); | |
|
112 | ||
|
113 | Assert.AreEqual(5, pool.ThreadCount); | |
|
114 | ||
|
115 | pool.Invoke(() => { Thread.Sleep(1000); return 10; }); | |
|
116 | pool.Invoke(() => { Thread.Sleep(1000); return 10; }); | |
|
117 | pool.Invoke(() => { Thread.Sleep(1000); return 10; }); | |
|
118 | ||
|
119 | Assert.AreEqual(5, pool.ThreadCount); | |
|
120 | ||
|
121 | for (int i = 0; i < 100; i++) | |
|
122 | pool.Invoke(() => { Thread.Sleep(1000); return 10; }); | |
|
123 | Assert.AreEqual(10, pool.ThreadCount); | |
|
124 | } | |
|
125 | ||
|
126 | [TestMethod] | |
|
127 | public void WorkerPoolCorrectTest() { | |
|
128 | var pool = new WorkerPool(5, 20); | |
|
129 | ||
|
130 | var count = 0; | |
|
131 | for (int i = 0; i < 1000; i++) | |
|
132 | pool | |
|
133 | .Invoke(() => 1) | |
|
134 | .Then(x => Interlocked.Add(ref count, x)); | |
|
135 | ||
|
136 | Assert.AreEqual(1000, count); | |
|
137 | } | |
|
138 | ||
|
139 | [TestMethod] | |
|
110 | 140 | public void ComplexCase1Test() { |
|
111 | 141 | var flags = new bool[3]; |
|
112 | 142 | |
|
113 | 143 | // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map) |
|
114 | 144 | |
|
115 | 145 | var p = PromiseHelper |
|
116 | 146 | .Sleep(200, "Alan") |
|
117 | 147 | .Cancelled(() => flags[0] = true) |
|
118 | 148 | .Chain(x => |
|
119 | 149 | PromiseHelper |
|
120 | 150 | .Sleep(200, "Hi, " + x) |
|
121 | 151 | .Map( y => y ) |
|
122 | 152 | .Cancelled(() => flags[1] = true) |
|
123 | 153 | ) |
|
124 | 154 | .Cancelled(() => flags[2] = true); |
|
125 | 155 | Thread.Sleep(300); |
|
126 | 156 | p.Cancel(); |
|
127 | 157 | try { |
|
128 | 158 | Assert.AreEqual(p.Join(), "Hi, Alan"); |
|
129 | 159 | Assert.Fail("Shouldn't get here"); |
|
130 | 160 | } catch(OperationCanceledException) { |
|
131 | 161 | } |
|
132 | 162 | |
|
133 | 163 | Assert.IsFalse(flags[0]); |
|
134 | 164 | Assert.IsTrue(flags[1]); |
|
135 | 165 | Assert.IsTrue(flags[2]); |
|
136 | 166 | } |
|
137 | 167 | } |
|
138 | 168 | } |
|
139 | 169 |
@@ -1,14 +1,14 | |||
|
1 | 1 | using System; |
|
2 | 2 | using System.Collections.Generic; |
|
3 | 3 | using System.Linq; |
|
4 | 4 | using System.Text; |
|
5 | 5 | |
|
6 | 6 | namespace Implab |
|
7 | 7 | { |
|
8 | 8 | public interface IProgressNotifier |
|
9 | 9 | { |
|
10 | 10 | event EventHandler<ValueEventArgs<string>> MessageUpdated; |
|
11 | 11 | event EventHandler<ValueEventArgs<float>> ProgressUpdated; |
|
12 | EventHandler<ProgressInitEventArgs> ProgressInit; | |
|
12 | event EventHandler<ProgressInitEventArgs> ProgressInit; | |
|
13 | 13 | } |
|
14 | 14 | } |
@@ -1,11 +1,11 | |||
|
1 | 1 | using System; |
|
2 | 2 | using System.Collections.Generic; |
|
3 | 3 | using System.Linq; |
|
4 | 4 | using System.Text; |
|
5 | 5 | |
|
6 | 6 | namespace Implab { |
|
7 | 7 | |
|
8 | public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier { | |
|
8 | /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier { | |
|
9 | 9 |
|
|
10 |
|
|
|
10 | }*/ | |
|
11 | 11 | } |
@@ -1,131 +1,171 | |||
|
1 | 1 | using System; |
|
2 | 2 | using System.Collections.Generic; |
|
3 | 3 | using System.Linq; |
|
4 | 4 | using System.Text; |
|
5 | 5 | using System.Threading; |
|
6 | 6 | using System.Diagnostics; |
|
7 | 7 | |
|
8 | 8 | namespace Implab.Parallels { |
|
9 | 9 | public class WorkerPool : IDisposable { |
|
10 | 10 | readonly int m_minThreads; |
|
11 | 11 | readonly int m_maxThreads; |
|
12 | 12 | int m_runningThreads; |
|
13 | 13 | object m_lock = new object(); |
|
14 | 14 | |
|
15 | 15 | bool m_disposed = false; |
|
16 | ||
|
17 | // this event will signal that workers can try to fetch a task from queue or the pool has been disposed | |
|
16 | 18 | ManualResetEvent m_hasTasks = new ManualResetEvent(false); |
|
17 | 19 | Queue<Action> m_queue = new Queue<Action>(); |
|
18 | 20 | |
|
19 | 21 | public WorkerPool(int min, int max) { |
|
20 | 22 | if (min < 0) |
|
21 | 23 | throw new ArgumentOutOfRangeException("min"); |
|
24 | if (max <= 0) | |
|
25 | throw new ArgumentOutOfRangeException("max"); | |
|
26 | ||
|
22 | 27 | if (min > max) |
|
23 | 28 | min = max; |
|
24 | 29 | m_minThreads = min; |
|
25 | 30 | m_maxThreads = max; |
|
26 | 31 | |
|
32 | InitPool(); | |
|
33 | } | |
|
34 | ||
|
35 | public WorkerPool(int max) | |
|
36 | : this(0, max) { | |
|
37 | } | |
|
38 | ||
|
39 | public WorkerPool() { | |
|
40 | int maxThreads, maxCP; | |
|
41 | ThreadPool.GetMaxThreads(out maxThreads, out maxCP); | |
|
42 | ||
|
43 | m_minThreads = 0; | |
|
44 | m_maxThreads = maxThreads; | |
|
45 | ||
|
46 | InitPool(); | |
|
47 | } | |
|
48 | ||
|
49 | void InitPool() { | |
|
27 | 50 | for (int i = 0; i < m_minThreads; i++) |
|
28 | 51 | StartWorker(); |
|
29 | 52 | } |
|
30 | 53 | |
|
54 | public int ThreadCount { | |
|
55 | get { | |
|
56 | return m_runningThreads; | |
|
57 | } | |
|
58 | } | |
|
59 | ||
|
31 | 60 | public Promise<T> Invoke<T>(Func<T> task) { |
|
32 | 61 | if (m_disposed) |
|
33 | 62 | throw new ObjectDisposedException(ToString()); |
|
34 | 63 | if (task == null) |
|
35 | 64 | throw new ArgumentNullException("task"); |
|
36 | 65 | |
|
37 | 66 | var promise = new Promise<T>(); |
|
38 | 67 | |
|
68 | var queueLen = EnqueueTask(delegate() { | |
|
69 | try { | |
|
70 | promise.Resolve(task()); | |
|
71 | } catch (Exception e) { | |
|
72 | promise.Reject(e); | |
|
73 | } | |
|
74 | }); | |
|
39 | 75 | |
|
76 | if (queueLen > 1) | |
|
77 | StartWorker(); | |
|
40 | 78 | |
|
41 | 79 | return promise; |
|
42 | 80 | } |
|
43 | 81 | |
|
44 | 82 | bool StartWorker() { |
|
45 | 83 | var current = m_runningThreads; |
|
46 | 84 | // use spins to allocate slot for the new thread |
|
47 | 85 | do { |
|
48 | 86 | if (current >= m_maxThreads) |
|
49 | 87 | // no more slots left |
|
50 | 88 | return false; |
|
51 | 89 | } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); |
|
52 | 90 | |
|
53 | 91 | // slot successfully allocated |
|
54 | 92 | |
|
55 | 93 | var worker = new Thread(this.Worker); |
|
94 | worker.IsBackground = true; | |
|
56 | 95 | worker.Start(); |
|
57 | 96 | |
|
58 | 97 | return true; |
|
59 | 98 | } |
|
60 | 99 | |
|
61 |
|
|
|
100 | int EnqueueTask(Action task) { | |
|
62 | 101 | Debug.Assert(task != null); |
|
63 | 102 | lock (m_queue) { |
|
64 | 103 | m_queue.Enqueue(task); |
|
65 | 104 | m_hasTasks.Set(); |
|
105 | return m_queue.Count; | |
|
66 | 106 | } |
|
67 | 107 | } |
|
68 | 108 | |
|
69 | 109 | bool FetchTask(out Action task) { |
|
70 | 110 | task = null; |
|
71 | 111 | |
|
72 | 112 | while (true) { |
|
73 | 113 | |
|
74 | 114 | m_hasTasks.WaitOne(); |
|
75 | 115 | |
|
76 | 116 | if (m_disposed) |
|
77 | 117 | return false; |
|
78 | 118 | |
|
79 | 119 | lock (m_queue) { |
|
80 | 120 | if (m_queue.Count > 0) { |
|
81 | 121 | task = m_queue.Dequeue(); |
|
82 | 122 | return true; |
|
83 | 123 | } |
|
84 | 124 | |
|
85 | 125 | // no tasks left |
|
86 | // signal that no more tasks left, lock ensures that this event won't suppress newly added task | |
|
126 | // signal that no more tasks left, current lock ensures that this event won't suppress newly added task | |
|
87 | 127 | m_hasTasks.Reset(); |
|
88 | 128 | } |
|
89 | 129 | |
|
90 | 130 | bool exit = true; |
|
91 | 131 | |
|
92 | 132 | var current = m_runningThreads; |
|
93 | 133 | do { |
|
94 | 134 | if (current <= m_minThreads) { |
|
95 | 135 | exit = false; // this thread should return and wait for the new events |
|
96 | 136 | break; |
|
97 | 137 | } |
|
98 | 138 | } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); |
|
99 | 139 | |
|
100 | 140 | if (exit) |
|
101 | 141 | return false; |
|
102 | 142 | } |
|
103 | 143 | } |
|
104 | 144 | |
|
105 | 145 | void Worker() { |
|
106 | 146 | Action task; |
|
107 | 147 | while (FetchTask(out task)) |
|
108 | 148 | task(); |
|
109 | 149 | } |
|
110 | 150 | |
|
111 | 151 | protected virtual void Dispose(bool disposing) { |
|
112 | 152 | if (disposing) { |
|
113 | 153 | lock (m_lock) { |
|
114 | 154 | if (m_disposed) |
|
115 | 155 | return; |
|
116 | 156 | m_disposed = true; |
|
117 | 157 | } |
|
118 | 158 | m_hasTasks.Set(); |
|
119 | 159 | GC.SuppressFinalize(this); |
|
120 | 160 | } |
|
121 | 161 | } |
|
122 | 162 | |
|
123 | 163 | public void Dispose() { |
|
124 | 164 | Dispose(true); |
|
125 | 165 | } |
|
126 | 166 | |
|
127 | 167 | ~WorkerPool() { |
|
128 | 168 | Dispose(false); |
|
129 | 169 | } |
|
130 | 170 | } |
|
131 | 171 | } |
General Comments 0
You need to be logged in to leave comments.
Login now