##// END OF EJS Templates
refactoring, added WorkerPool
cin -
r12:eb418ba8275b promises
parent child
Show More
@@ -0,0 +1,10
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ICancellable {
8 bool Cancel();
9 }
10 }
@@ -0,0 +1,18
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface IProgressHandler {
8 string Message {
9 get;
10 set;
11 }
12 float CurrentProgress {
13 get;
14 set;
15 }
16 void InitProgress(float current, float max, string message);
17 }
18 }
@@ -0,0 +1,14
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public interface IProgressNotifier
9 {
10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
12 EventHandler<ProgressInitEventArgs> ProgressInit;
13 }
14 }
@@ -0,0 +1,12
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ITaskController: IProgressHandler {
8 bool Cancelled {
9 get;
10 }
11 }
12 }
@@ -0,0 +1,11
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7
8 public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
9
10 }
11 }
@@ -0,0 +1,131
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public class WorkerPool : IDisposable {
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads;
13 object m_lock = new object();
14
15 bool m_disposed = false;
16 ManualResetEvent m_hasTasks = new ManualResetEvent(false);
17 Queue<Action> m_queue = new Queue<Action>();
18
19 public WorkerPool(int min, int max) {
20 if (min < 0)
21 throw new ArgumentOutOfRangeException("min");
22 if (min > max)
23 min = max;
24 m_minThreads = min;
25 m_maxThreads = max;
26
27 for (int i = 0; i < m_minThreads; i++)
28 StartWorker();
29 }
30
31 public Promise<T> Invoke<T>(Func<T> task) {
32 if (m_disposed)
33 throw new ObjectDisposedException(ToString());
34 if (task == null)
35 throw new ArgumentNullException("task");
36
37 var promise = new Promise<T>();
38
39
40
41 return promise;
42 }
43
44 bool StartWorker() {
45 var current = m_runningThreads;
46 // use spins to allocate slot for the new thread
47 do {
48 if (current >= m_maxThreads)
49 // no more slots left
50 return false;
51 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
52
53 // slot successfully allocated
54
55 var worker = new Thread(this.Worker);
56 worker.Start();
57
58 return true;
59 }
60
61 void EnqueueTask(Action task) {
62 Debug.Assert(task != null);
63 lock (m_queue) {
64 m_queue.Enqueue(task);
65 m_hasTasks.Set();
66 }
67 }
68
69 bool FetchTask(out Action task) {
70 task = null;
71
72 while (true) {
73
74 m_hasTasks.WaitOne();
75
76 if (m_disposed)
77 return false;
78
79 lock (m_queue) {
80 if (m_queue.Count > 0) {
81 task = m_queue.Dequeue();
82 return true;
83 }
84
85 // no tasks left
86 // signal that no more tasks left, lock ensures that this event won't suppress newly added task
87 m_hasTasks.Reset();
88 }
89
90 bool exit = true;
91
92 var current = m_runningThreads;
93 do {
94 if (current <= m_minThreads) {
95 exit = false; // this thread should return and wait for the new events
96 break;
97 }
98 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
99
100 if (exit)
101 return false;
102 }
103 }
104
105 void Worker() {
106 Action task;
107 while (FetchTask(out task))
108 task();
109 }
110
111 protected virtual void Dispose(bool disposing) {
112 if (disposing) {
113 lock (m_lock) {
114 if (m_disposed)
115 return;
116 m_disposed = true;
117 }
118 m_hasTasks.Set();
119 GC.SuppressFinalize(this);
120 }
121 }
122
123 public void Dispose() {
124 Dispose(true);
125 }
126
127 ~WorkerPool() {
128 Dispose(false);
129 }
130 }
131 }
1 NO CONTENT: modified file, binary diff hidden
@@ -1,39 +1,33
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab
7 7 {
8 public interface IPromise
8 public interface IPromise: ICancellable
9 9 {
10 10 /// <summary>
11 11 /// Check whereather the promise has no more than one dependent promise.
12 12 /// </summary>
13 13 bool IsExclusive
14 14 {
15 15 get;
16 16 }
17 17
18 18 /// <summary>
19 19 /// The current state of the promise.
20 20 /// </summary>
21 21 PromiseState State
22 22 {
23 23 get;
24 24 }
25 25
26 26 /// <summary>
27 /// Tries to cancel the the complete chain of promises.
28 /// </summary>
29 /// <returns><c>true</c> - if the promise has been cancelled, otherwise the promise will be resolved (or resolved already).</returns>
30 bool Cancel();
31
32 /// <summary>
33 27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
34 28 /// handler will be invoked immediatelly.
35 29 /// </summary>
36 30 /// <param name="handler">The handler</param>
37 31 void HandleCancelled(Action handler);
38 32 }
39 33 }
@@ -1,47 +1,53
1 1 <?xml version="1.0" encoding="utf-8"?>
2 2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <PropertyGroup>
4 4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 6 <ProductVersion>10.0.0</ProductVersion>
7 7 <SchemaVersion>2.0</SchemaVersion>
8 8 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
9 9 <OutputType>Library</OutputType>
10 10 <RootNamespace>Implab</RootNamespace>
11 11 <AssemblyName>Implab</AssemblyName>
12 12 </PropertyGroup>
13 13 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
14 14 <DebugSymbols>true</DebugSymbols>
15 15 <DebugType>full</DebugType>
16 16 <Optimize>false</Optimize>
17 17 <OutputPath>bin\Debug</OutputPath>
18 18 <DefineConstants>DEBUG;</DefineConstants>
19 19 <ErrorReport>prompt</ErrorReport>
20 20 <WarningLevel>4</WarningLevel>
21 21 <ConsolePause>false</ConsolePause>
22 22 </PropertyGroup>
23 23 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
24 24 <DebugType>full</DebugType>
25 25 <Optimize>true</Optimize>
26 26 <OutputPath>bin\Release</OutputPath>
27 27 <ErrorReport>prompt</ErrorReport>
28 28 <WarningLevel>4</WarningLevel>
29 29 <ConsolePause>false</ConsolePause>
30 30 </PropertyGroup>
31 31 <ItemGroup>
32 32 <Reference Include="System" />
33 33 </ItemGroup>
34 34 <ItemGroup>
35 <Compile Include="ICancellable.cs" />
36 <Compile Include="IProgressHandler.cs" />
37 <Compile Include="IProgressNotifier.cs" />
35 38 <Compile Include="IPromise.cs" />
39 <Compile Include="ITaskController.cs" />
40 <Compile Include="ManagedPromise.cs" />
41 <Compile Include="Parallels\WorkerPool.cs" />
36 42 <Compile Include="PromiseState.cs" />
37 43 <Compile Include="TaskController.cs" />
38 44 <Compile Include="ProgressInitEventArgs.cs" />
39 45 <Compile Include="Properties\AssemblyInfo.cs" />
40 46 <Compile Include="Promise.cs" />
41 47 <Compile Include="Parallels\AsyncPool.cs" />
42 48 <Compile Include="Safe.cs" />
43 49 <Compile Include="ValueEventArgs.cs" />
44 50 </ItemGroup>
45 51 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
46 52 <ItemGroup />
47 53 </Project> No newline at end of file
@@ -1,36 +1,36
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab
7 7 {
8
8 [Serializable]
9 9 public class ProgressInitEventArgs: EventArgs
10 10 {
11 11 public float MaxProgress
12 12 {
13 13 get;
14 14 private set;
15 15 }
16 16
17 17 public float CurrentProgress
18 18 {
19 19 get;
20 20 private set;
21 21 }
22 22
23 23 public string Message
24 24 {
25 25 get;
26 26 private set;
27 27 }
28 28
29 29 public ProgressInitEventArgs(float current, float max, string message)
30 30 {
31 31 this.MaxProgress = max;
32 32 this.CurrentProgress = current;
33 33 this.Message = message;
34 34 }
35 35 }
36 36 }
@@ -1,112 +1,132
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5 using System.Threading;
6 6
7 7 namespace Implab
8 8 {
9 9 /// <summary>
10 10 /// This class allows to interact with asyncronuos task.
11 11 /// </summary>
12 12 /// <remarks>
13 13 /// Members of this object are thread safe.
14 14 /// </remarks>
15 class TaskController
15 class TaskController: IProgressNotifier, ITaskController, ICancellable
16 16 {
17 17 readonly object m_lock;
18 18 string m_message;
19 19
20 20 float m_current;
21 21 float m_max;
22 22
23 bool m_cancelled;
24
23 25 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
24 26 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
25 27 public event EventHandler<ProgressInitEventArgs> ProgressInit;
26 28
27 29 public TaskController()
28 30 {
29 31 m_lock = new Object();
30 32 }
31 33
32 34 public string Message
33 35 {
34 36 get
35 37 {
36 38 lock (m_lock)
37 39 return m_message;
38 40 }
39 41 set
40 42 {
41 43 lock (m_lock)
42 44 {
43 45 m_message = value;
44 46 OnMessageUpdated();
45 47 }
46 48 }
47 49 }
48 50
49 51 public float CurrentProgress
50 52 {
51 53 get
52 54 {
53 55 lock (m_lock)
54 56 return m_current;
55 57 }
56 58 set
57 59 {
58 60 lock (m_lock)
59 61 {
60 62 var prev = m_current;
61 63 m_current = value;
62 64 if (m_current >= m_max)
63 65 m_current = m_max;
64 66 if (m_current != prev)
65 67 OnProgressUpdated();
66 68 }
67 69 }
68 70 }
69 71
70 72 public void InitProgress(float current, float max, string message)
71 73 {
72 74 if (max < 0)
73 75 throw new ArgumentOutOfRangeException("max");
74 76 if (current < 0 || current > max)
75 77 throw new ArgumentOutOfRangeException("current");
76 78
77 79 lock(m_lock) {
78 80 m_current = current;
79 81 m_max = max;
80 82 m_message = message;
81 83 OnProgressInit();
82 84 }
83 85 }
84 86
87 public bool Cancelled {
88 get {
89 lock (m_lock)
90 return m_cancelled;
91 }
92 }
93
94 public bool Cancel() {
95 lock (m_lock) {
96 if (!m_cancelled) {
97 m_cancelled = true;
98 return true;
99 } else {
100 return false;
101 }
102 }
103 }
104
85 105 protected virtual void OnMessageUpdated()
86 106 {
87 107 var temp = MessageUpdated;
88 108 if (temp != null)
89 109 {
90 110 temp(this, new ValueEventArgs<string>(m_message));
91 111 }
92 112 }
93 113
94 114 protected virtual void OnProgressUpdated()
95 115 {
96 116 var temp = ProgressUpdated;
97 117 if (temp != null)
98 118 {
99 119 temp(this,new ValueEventArgs<float>(m_current));
100 120 }
101 121 }
102 122
103 123 protected virtual void OnProgressInit()
104 124 {
105 125 var temp = ProgressInit;
106 126 if (temp != null)
107 127 {
108 128 temp(this, new ProgressInitEventArgs(m_current,m_max, m_message));
109 129 }
110 130 }
111 131 }
112 132 }
@@ -1,20 +1,21
1 1 using System;
2 2 using System.Collections.Generic;
3 3 using System.Linq;
4 4 using System.Text;
5 5
6 6 namespace Implab
7 7 {
8 [Serializable]
8 9 public class ValueEventArgs<T>: EventArgs
9 10 {
10 11 public ValueEventArgs(T value)
11 12 {
12 13 this.Value = value;
13 14 }
14 15 public T Value
15 16 {
16 17 get;
17 18 private set;
18 19 }
19 20 }
20 21 }
General Comments 0
You need to be logged in to leave comments. Login now