##// END OF EJS Templates
small fixes, WorkerPool still incomplete
cin -
r13:b0feb5b9ad1c promises
parent child
Show More
@@ -1,139 +1,169
1 using System;
1 using System;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using System.Reflection;
3 using System.Reflection;
4 using System.Threading;
4 using System.Threading;
5 using Implab.Parallels;
5 using Implab.Parallels;
6
6
7 namespace Implab.Test
7 namespace Implab.Test
8 {
8 {
9 [TestClass]
9 [TestClass]
10 public class AsyncTests
10 public class AsyncTests
11 {
11 {
12 [TestMethod]
12 [TestMethod]
13 public void ResolveTest ()
13 public void ResolveTest ()
14 {
14 {
15 int res = -1;
15 int res = -1;
16 var p = new Promise<int> ();
16 var p = new Promise<int> ();
17 p.Then (x => res = x);
17 p.Then (x => res = x);
18 p.Resolve (100);
18 p.Resolve (100);
19
19
20 Assert.AreEqual (res, 100);
20 Assert.AreEqual (res, 100);
21 }
21 }
22
22
23 [TestMethod]
23 [TestMethod]
24 public void RejectTest ()
24 public void RejectTest ()
25 {
25 {
26 int res = -1;
26 int res = -1;
27 Exception err = null;
27 Exception err = null;
28
28
29 var p = new Promise<int> ();
29 var p = new Promise<int> ();
30 p.Then (x => res = x, e => err = e);
30 p.Then (x => res = x, e => err = e);
31 p.Reject (new ApplicationException ("error"));
31 p.Reject (new ApplicationException ("error"));
32
32
33 Assert.AreEqual (res, -1);
33 Assert.AreEqual (res, -1);
34 Assert.AreEqual (err.Message, "error");
34 Assert.AreEqual (err.Message, "error");
35
35
36 }
36 }
37
37
38 [TestMethod]
38 [TestMethod]
39 public void JoinSuccessTest ()
39 public void JoinSuccessTest ()
40 {
40 {
41 var p = new Promise<int> ();
41 var p = new Promise<int> ();
42 p.Resolve (100);
42 p.Resolve (100);
43 Assert.AreEqual (p.Join (), 100);
43 Assert.AreEqual (p.Join (), 100);
44 }
44 }
45
45
46 [TestMethod]
46 [TestMethod]
47 public void JoinFailTest ()
47 public void JoinFailTest ()
48 {
48 {
49 var p = new Promise<int> ();
49 var p = new Promise<int> ();
50 p.Reject (new ApplicationException ("failed"));
50 p.Reject (new ApplicationException ("failed"));
51
51
52 try {
52 try {
53 p.Join ();
53 p.Join ();
54 throw new ApplicationException ("WRONG!");
54 throw new ApplicationException ("WRONG!");
55 } catch (TargetInvocationException err) {
55 } catch (TargetInvocationException err) {
56 Assert.AreEqual (err.InnerException.Message, "failed");
56 Assert.AreEqual (err.InnerException.Message, "failed");
57 } catch {
57 } catch {
58 Assert.Fail ("Got wrong excaption");
58 Assert.Fail ("Got wrong excaption");
59 }
59 }
60 }
60 }
61
61
62 [TestMethod]
62 [TestMethod]
63 public void MapTest ()
63 public void MapTest ()
64 {
64 {
65 var p = new Promise<int> ();
65 var p = new Promise<int> ();
66
66
67 var p2 = p.Map (x => x.ToString ());
67 var p2 = p.Map (x => x.ToString ());
68 p.Resolve (100);
68 p.Resolve (100);
69
69
70 Assert.AreEqual (p2.Join (), "100");
70 Assert.AreEqual (p2.Join (), "100");
71 }
71 }
72
72
73 [TestMethod]
73 [TestMethod]
74 public void FixErrorTest() {
74 public void FixErrorTest() {
75 var p = new Promise<int>();
75 var p = new Promise<int>();
76
76
77 var p2 = p.Error(e => 101);
77 var p2 = p.Error(e => 101);
78
78
79 p.Reject(new Exception());
79 p.Reject(new Exception());
80
80
81 Assert.AreEqual(p2.Join(), 101);
81 Assert.AreEqual(p2.Join(), 101);
82 }
82 }
83
83
84 [TestMethod]
84 [TestMethod]
85 public void ChainTest ()
85 public void ChainTest ()
86 {
86 {
87 var p1 = new Promise<int> ();
87 var p1 = new Promise<int> ();
88
88
89 var p3 = p1.Chain (x => {
89 var p3 = p1.Chain (x => {
90 var p2 = new Promise<string> ();
90 var p2 = new Promise<string> ();
91 p2.Resolve (x.ToString ());
91 p2.Resolve (x.ToString ());
92 return p2;
92 return p2;
93 });
93 });
94
94
95 p1.Resolve (100);
95 p1.Resolve (100);
96
96
97 Assert.AreEqual (p3.Join (), "100");
97 Assert.AreEqual (p3.Join (), "100");
98 }
98 }
99
99
100 [TestMethod]
100 [TestMethod]
101 public void PoolTest ()
101 public void PoolTest ()
102 {
102 {
103 var pid = Thread.CurrentThread.ManagedThreadId;
103 var pid = Thread.CurrentThread.ManagedThreadId;
104 var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId);
104 var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId);
105
105
106 Assert.AreNotEqual (pid, p.Join ());
106 Assert.AreNotEqual (pid, p.Join ());
107 }
107 }
108
108
109 [TestMethod]
109 [TestMethod]
110 public void WorkerPoolSizeTest() {
111 var pool = new WorkerPool(5,10);
112
113 Assert.AreEqual(5, pool.ThreadCount);
114
115 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
116 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
117 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
118
119 Assert.AreEqual(5, pool.ThreadCount);
120
121 for (int i = 0; i < 100; i++)
122 pool.Invoke(() => { Thread.Sleep(1000); return 10; });
123 Assert.AreEqual(10, pool.ThreadCount);
124 }
125
126 [TestMethod]
127 public void WorkerPoolCorrectTest() {
128 var pool = new WorkerPool(5, 20);
129
130 var count = 0;
131 for (int i = 0; i < 1000; i++)
132 pool
133 .Invoke(() => 1)
134 .Then(x => Interlocked.Add(ref count, x));
135
136 Assert.AreEqual(1000, count);
137 }
138
139 [TestMethod]
110 public void ComplexCase1Test() {
140 public void ComplexCase1Test() {
111 var flags = new bool[3];
141 var flags = new bool[3];
112
142
113 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
143 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
114
144
115 var p = PromiseHelper
145 var p = PromiseHelper
116 .Sleep(200, "Alan")
146 .Sleep(200, "Alan")
117 .Cancelled(() => flags[0] = true)
147 .Cancelled(() => flags[0] = true)
118 .Chain(x =>
148 .Chain(x =>
119 PromiseHelper
149 PromiseHelper
120 .Sleep(200, "Hi, " + x)
150 .Sleep(200, "Hi, " + x)
121 .Map( y => y )
151 .Map( y => y )
122 .Cancelled(() => flags[1] = true)
152 .Cancelled(() => flags[1] = true)
123 )
153 )
124 .Cancelled(() => flags[2] = true);
154 .Cancelled(() => flags[2] = true);
125 Thread.Sleep(300);
155 Thread.Sleep(300);
126 p.Cancel();
156 p.Cancel();
127 try {
157 try {
128 Assert.AreEqual(p.Join(), "Hi, Alan");
158 Assert.AreEqual(p.Join(), "Hi, Alan");
129 Assert.Fail("Shouldn't get here");
159 Assert.Fail("Shouldn't get here");
130 } catch(OperationCanceledException) {
160 } catch(OperationCanceledException) {
131 }
161 }
132
162
133 Assert.IsFalse(flags[0]);
163 Assert.IsFalse(flags[0]);
134 Assert.IsTrue(flags[1]);
164 Assert.IsTrue(flags[1]);
135 Assert.IsTrue(flags[2]);
165 Assert.IsTrue(flags[2]);
136 }
166 }
137 }
167 }
138 }
168 }
139
169
@@ -1,14 +1,14
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5
5
6 namespace Implab
6 namespace Implab
7 {
7 {
8 public interface IProgressNotifier
8 public interface IProgressNotifier
9 {
9 {
10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
12 EventHandler<ProgressInitEventArgs> ProgressInit;
12 event EventHandler<ProgressInitEventArgs> ProgressInit;
13 }
13 }
14 }
14 }
@@ -1,11 +1,11
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5
5
6 namespace Implab {
6 namespace Implab {
7
7
8 public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
8 /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
9
9
10 }*/
10 }
11 }
11 }
@@ -1,131 +1,171
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
3 using System.Linq;
4 using System.Text;
4 using System.Text;
5 using System.Threading;
5 using System.Threading;
6 using System.Diagnostics;
6 using System.Diagnostics;
7
7
8 namespace Implab.Parallels {
8 namespace Implab.Parallels {
9 public class WorkerPool : IDisposable {
9 public class WorkerPool : IDisposable {
10 readonly int m_minThreads;
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads;
12 int m_runningThreads;
13 object m_lock = new object();
13 object m_lock = new object();
14
14
15 bool m_disposed = false;
15 bool m_disposed = false;
16
17 // this event will signal that workers can try to fetch a task from queue or the pool has been disposed
16 ManualResetEvent m_hasTasks = new ManualResetEvent(false);
18 ManualResetEvent m_hasTasks = new ManualResetEvent(false);
17 Queue<Action> m_queue = new Queue<Action>();
19 Queue<Action> m_queue = new Queue<Action>();
18
20
19 public WorkerPool(int min, int max) {
21 public WorkerPool(int min, int max) {
20 if (min < 0)
22 if (min < 0)
21 throw new ArgumentOutOfRangeException("min");
23 throw new ArgumentOutOfRangeException("min");
24 if (max <= 0)
25 throw new ArgumentOutOfRangeException("max");
26
22 if (min > max)
27 if (min > max)
23 min = max;
28 min = max;
24 m_minThreads = min;
29 m_minThreads = min;
25 m_maxThreads = max;
30 m_maxThreads = max;
26
31
32 InitPool();
33 }
34
35 public WorkerPool(int max)
36 : this(0, max) {
37 }
38
39 public WorkerPool() {
40 int maxThreads, maxCP;
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
42
43 m_minThreads = 0;
44 m_maxThreads = maxThreads;
45
46 InitPool();
47 }
48
49 void InitPool() {
27 for (int i = 0; i < m_minThreads; i++)
50 for (int i = 0; i < m_minThreads; i++)
28 StartWorker();
51 StartWorker();
29 }
52 }
30
53
54 public int ThreadCount {
55 get {
56 return m_runningThreads;
57 }
58 }
59
31 public Promise<T> Invoke<T>(Func<T> task) {
60 public Promise<T> Invoke<T>(Func<T> task) {
32 if (m_disposed)
61 if (m_disposed)
33 throw new ObjectDisposedException(ToString());
62 throw new ObjectDisposedException(ToString());
34 if (task == null)
63 if (task == null)
35 throw new ArgumentNullException("task");
64 throw new ArgumentNullException("task");
36
65
37 var promise = new Promise<T>();
66 var promise = new Promise<T>();
38
67
68 var queueLen = EnqueueTask(delegate() {
69 try {
70 promise.Resolve(task());
71 } catch (Exception e) {
72 promise.Reject(e);
73 }
74 });
39
75
76 if (queueLen > 1)
77 StartWorker();
40
78
41 return promise;
79 return promise;
42 }
80 }
43
81
44 bool StartWorker() {
82 bool StartWorker() {
45 var current = m_runningThreads;
83 var current = m_runningThreads;
46 // use spins to allocate slot for the new thread
84 // use spins to allocate slot for the new thread
47 do {
85 do {
48 if (current >= m_maxThreads)
86 if (current >= m_maxThreads)
49 // no more slots left
87 // no more slots left
50 return false;
88 return false;
51 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
89 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
52
90
53 // slot successfully allocated
91 // slot successfully allocated
54
92
55 var worker = new Thread(this.Worker);
93 var worker = new Thread(this.Worker);
94 worker.IsBackground = true;
56 worker.Start();
95 worker.Start();
57
96
58 return true;
97 return true;
59 }
98 }
60
99
61 void EnqueueTask(Action task) {
100 int EnqueueTask(Action task) {
62 Debug.Assert(task != null);
101 Debug.Assert(task != null);
63 lock (m_queue) {
102 lock (m_queue) {
64 m_queue.Enqueue(task);
103 m_queue.Enqueue(task);
65 m_hasTasks.Set();
104 m_hasTasks.Set();
105 return m_queue.Count;
66 }
106 }
67 }
107 }
68
108
69 bool FetchTask(out Action task) {
109 bool FetchTask(out Action task) {
70 task = null;
110 task = null;
71
111
72 while (true) {
112 while (true) {
73
113
74 m_hasTasks.WaitOne();
114 m_hasTasks.WaitOne();
75
115
76 if (m_disposed)
116 if (m_disposed)
77 return false;
117 return false;
78
118
79 lock (m_queue) {
119 lock (m_queue) {
80 if (m_queue.Count > 0) {
120 if (m_queue.Count > 0) {
81 task = m_queue.Dequeue();
121 task = m_queue.Dequeue();
82 return true;
122 return true;
83 }
123 }
84
124
85 // no tasks left
125 // no tasks left
86 // signal that no more tasks left, lock ensures that this event won't suppress newly added task
126 // signal that no more tasks left, current lock ensures that this event won't suppress newly added task
87 m_hasTasks.Reset();
127 m_hasTasks.Reset();
88 }
128 }
89
129
90 bool exit = true;
130 bool exit = true;
91
131
92 var current = m_runningThreads;
132 var current = m_runningThreads;
93 do {
133 do {
94 if (current <= m_minThreads) {
134 if (current <= m_minThreads) {
95 exit = false; // this thread should return and wait for the new events
135 exit = false; // this thread should return and wait for the new events
96 break;
136 break;
97 }
137 }
98 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
138 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
99
139
100 if (exit)
140 if (exit)
101 return false;
141 return false;
102 }
142 }
103 }
143 }
104
144
105 void Worker() {
145 void Worker() {
106 Action task;
146 Action task;
107 while (FetchTask(out task))
147 while (FetchTask(out task))
108 task();
148 task();
109 }
149 }
110
150
111 protected virtual void Dispose(bool disposing) {
151 protected virtual void Dispose(bool disposing) {
112 if (disposing) {
152 if (disposing) {
113 lock (m_lock) {
153 lock (m_lock) {
114 if (m_disposed)
154 if (m_disposed)
115 return;
155 return;
116 m_disposed = true;
156 m_disposed = true;
117 }
157 }
118 m_hasTasks.Set();
158 m_hasTasks.Set();
119 GC.SuppressFinalize(this);
159 GC.SuppressFinalize(this);
120 }
160 }
121 }
161 }
122
162
123 public void Dispose() {
163 public void Dispose() {
124 Dispose(true);
164 Dispose(true);
125 }
165 }
126
166
127 ~WorkerPool() {
167 ~WorkerPool() {
128 Dispose(false);
168 Dispose(false);
129 }
169 }
130 }
170 }
131 }
171 }
General Comments 0
You need to be logged in to leave comments. Login now