##// END OF EJS Templates
refactoring, added WorkerPool
cin -
r12:eb418ba8275b promises
parent child
Show More
@@ -0,0 +1,10
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ICancellable {
8 bool Cancel();
9 }
10 }
@@ -0,0 +1,18
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface IProgressHandler {
8 string Message {
9 get;
10 set;
11 }
12 float CurrentProgress {
13 get;
14 set;
15 }
16 void InitProgress(float current, float max, string message);
17 }
18 }
@@ -0,0 +1,14
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public interface IProgressNotifier
9 {
10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
12 EventHandler<ProgressInitEventArgs> ProgressInit;
13 }
14 }
@@ -0,0 +1,12
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ITaskController: IProgressHandler {
8 bool Cancelled {
9 get;
10 }
11 }
12 }
@@ -0,0 +1,11
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7
8 public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
9
10 }
11 }
@@ -0,0 +1,131
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public class WorkerPool : IDisposable {
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads;
13 object m_lock = new object();
14
15 bool m_disposed = false;
16 ManualResetEvent m_hasTasks = new ManualResetEvent(false);
17 Queue<Action> m_queue = new Queue<Action>();
18
19 public WorkerPool(int min, int max) {
20 if (min < 0)
21 throw new ArgumentOutOfRangeException("min");
22 if (min > max)
23 min = max;
24 m_minThreads = min;
25 m_maxThreads = max;
26
27 for (int i = 0; i < m_minThreads; i++)
28 StartWorker();
29 }
30
31 public Promise<T> Invoke<T>(Func<T> task) {
32 if (m_disposed)
33 throw new ObjectDisposedException(ToString());
34 if (task == null)
35 throw new ArgumentNullException("task");
36
37 var promise = new Promise<T>();
38
39
40
41 return promise;
42 }
43
44 bool StartWorker() {
45 var current = m_runningThreads;
46 // use spins to allocate slot for the new thread
47 do {
48 if (current >= m_maxThreads)
49 // no more slots left
50 return false;
51 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
52
53 // slot successfully allocated
54
55 var worker = new Thread(this.Worker);
56 worker.Start();
57
58 return true;
59 }
60
61 void EnqueueTask(Action task) {
62 Debug.Assert(task != null);
63 lock (m_queue) {
64 m_queue.Enqueue(task);
65 m_hasTasks.Set();
66 }
67 }
68
69 bool FetchTask(out Action task) {
70 task = null;
71
72 while (true) {
73
74 m_hasTasks.WaitOne();
75
76 if (m_disposed)
77 return false;
78
79 lock (m_queue) {
80 if (m_queue.Count > 0) {
81 task = m_queue.Dequeue();
82 return true;
83 }
84
85 // no tasks left
86 // signal that no more tasks left, lock ensures that this event won't suppress newly added task
87 m_hasTasks.Reset();
88 }
89
90 bool exit = true;
91
92 var current = m_runningThreads;
93 do {
94 if (current <= m_minThreads) {
95 exit = false; // this thread should return and wait for the new events
96 break;
97 }
98 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
99
100 if (exit)
101 return false;
102 }
103 }
104
105 void Worker() {
106 Action task;
107 while (FetchTask(out task))
108 task();
109 }
110
111 protected virtual void Dispose(bool disposing) {
112 if (disposing) {
113 lock (m_lock) {
114 if (m_disposed)
115 return;
116 m_disposed = true;
117 }
118 m_hasTasks.Set();
119 GC.SuppressFinalize(this);
120 }
121 }
122
123 public void Dispose() {
124 Dispose(true);
125 }
126
127 ~WorkerPool() {
128 Dispose(false);
129 }
130 }
131 }
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -5,7 +5,7 using System.Text;
5
5
6 namespace Implab
6 namespace Implab
7 {
7 {
8 public interface IPromise
8 public interface IPromise: ICancellable
9 {
9 {
10 /// <summary>
10 /// <summary>
11 /// Check whereather the promise has no more than one dependent promise.
11 /// Check whereather the promise has no more than one dependent promise.
@@ -24,12 +24,6 namespace Implab
24 }
24 }
25
25
26 /// <summary>
26 /// <summary>
27 /// Tries to cancel the the complete chain of promises.
28 /// </summary>
29 /// <returns><c>true</c> - if the promise has been cancelled, otherwise the promise will be resolved (or resolved already).</returns>
30 bool Cancel();
31
32 /// <summary>
33 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
34 /// handler will be invoked immediatelly.
28 /// handler will be invoked immediatelly.
35 /// </summary>
29 /// </summary>
@@ -32,7 +32,13
32 <Reference Include="System" />
32 <Reference Include="System" />
33 </ItemGroup>
33 </ItemGroup>
34 <ItemGroup>
34 <ItemGroup>
35 <Compile Include="ICancellable.cs" />
36 <Compile Include="IProgressHandler.cs" />
37 <Compile Include="IProgressNotifier.cs" />
35 <Compile Include="IPromise.cs" />
38 <Compile Include="IPromise.cs" />
39 <Compile Include="ITaskController.cs" />
40 <Compile Include="ManagedPromise.cs" />
41 <Compile Include="Parallels\WorkerPool.cs" />
36 <Compile Include="PromiseState.cs" />
42 <Compile Include="PromiseState.cs" />
37 <Compile Include="TaskController.cs" />
43 <Compile Include="TaskController.cs" />
38 <Compile Include="ProgressInitEventArgs.cs" />
44 <Compile Include="ProgressInitEventArgs.cs" />
@@ -5,7 +5,7 using System.Text;
5
5
6 namespace Implab
6 namespace Implab
7 {
7 {
8
8 [Serializable]
9 public class ProgressInitEventArgs: EventArgs
9 public class ProgressInitEventArgs: EventArgs
10 {
10 {
11 public float MaxProgress
11 public float MaxProgress
@@ -12,7 +12,7 namespace Implab
12 /// <remarks>
12 /// <remarks>
13 /// Members of this object are thread safe.
13 /// Members of this object are thread safe.
14 /// </remarks>
14 /// </remarks>
15 class TaskController
15 class TaskController: IProgressNotifier, ITaskController, ICancellable
16 {
16 {
17 readonly object m_lock;
17 readonly object m_lock;
18 string m_message;
18 string m_message;
@@ -20,6 +20,8 namespace Implab
20 float m_current;
20 float m_current;
21 float m_max;
21 float m_max;
22
22
23 bool m_cancelled;
24
23 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
25 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
24 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
26 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
25 public event EventHandler<ProgressInitEventArgs> ProgressInit;
27 public event EventHandler<ProgressInitEventArgs> ProgressInit;
@@ -82,6 +84,24 namespace Implab
82 }
84 }
83 }
85 }
84
86
87 public bool Cancelled {
88 get {
89 lock (m_lock)
90 return m_cancelled;
91 }
92 }
93
94 public bool Cancel() {
95 lock (m_lock) {
96 if (!m_cancelled) {
97 m_cancelled = true;
98 return true;
99 } else {
100 return false;
101 }
102 }
103 }
104
85 protected virtual void OnMessageUpdated()
105 protected virtual void OnMessageUpdated()
86 {
106 {
87 var temp = MessageUpdated;
107 var temp = MessageUpdated;
@@ -5,6 +5,7 using System.Text;
5
5
6 namespace Implab
6 namespace Implab
7 {
7 {
8 [Serializable]
8 public class ValueEventArgs<T>: EventArgs
9 public class ValueEventArgs<T>: EventArgs
9 {
10 {
10 public ValueEventArgs(T value)
11 public ValueEventArgs(T value)
General Comments 0
You need to be logged in to leave comments. Login now