@@ -0,0 +1,74 | |||||
|
1 | using System; | |||
|
2 | using System.Collections.Generic; | |||
|
3 | using System.Linq; | |||
|
4 | using System.Text; | |||
|
5 | using System.Threading; | |||
|
6 | ||||
|
7 | namespace Implab.Parallels { | |||
|
8 | public class MTQueue<T> { | |||
|
9 | class Node { | |||
|
10 | public Node(T value) { | |||
|
11 | this.value = value; | |||
|
12 | } | |||
|
13 | public readonly T value; | |||
|
14 | public Node next; | |||
|
15 | } | |||
|
16 | ||||
|
17 | Node m_first; | |||
|
18 | Node m_last; | |||
|
19 | ||||
|
20 | public void Enqueue(T value) { | |||
|
21 | var last = m_last; | |||
|
22 | var next = new Node(value); | |||
|
23 | ||||
|
24 | while (last != Interlocked.CompareExchange(ref m_last, next, last)) | |||
|
25 | last = m_last; | |||
|
26 | ||||
|
27 | if (last != null) | |||
|
28 | last.next = next; | |||
|
29 | else | |||
|
30 | m_first = next; | |||
|
31 | } | |||
|
32 | ||||
|
33 | public bool TryDequeue(out T value) { | |||
|
34 | Node first; | |||
|
35 | Node next = null; | |||
|
36 | value = default(T); | |||
|
37 | ||||
|
38 | do { | |||
|
39 | first = m_first; | |||
|
40 | if (first == null) | |||
|
41 | return false; | |||
|
42 | next = first.next; | |||
|
43 | if (next == null) { | |||
|
44 | // this is the last element, | |||
|
45 | // then try to update tail | |||
|
46 | if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | |||
|
47 | // this is inconsistent situation which means that the queue is empty | |||
|
48 | if (m_last == null) | |||
|
49 | return false; | |||
|
50 | // tail has been changed, that means that we need to restart | |||
|
51 | continue; | |||
|
52 | } | |||
|
53 | ||||
|
54 | // tail succesfully updated and first.next will never be changed | |||
|
55 | // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null | |||
|
56 | // but the writer may update the m_first since the m_last is null | |||
|
57 | ||||
|
58 | // so we need to fix inconsistency by setting m_first to null, but if it already has been | |||
|
59 | // updated by a writer then we should just give up | |||
|
60 | Interlocked.CompareExchange(ref m_first, null, first); | |||
|
61 | break; | |||
|
62 | ||||
|
63 | } else { | |||
|
64 | if (first == Interlocked.CompareExchange(ref m_first, next, first)) | |||
|
65 | // head succesfully updated | |||
|
66 | break; | |||
|
67 | } | |||
|
68 | } while (true); | |||
|
69 | ||||
|
70 | value = first.value; | |||
|
71 | return true; | |||
|
72 | } | |||
|
73 | } | |||
|
74 | } |
@@ -137,6 +137,77 namespace Implab.Test | |||||
137 | } |
|
137 | } | |
138 |
|
138 | |||
139 | [TestMethod] |
|
139 | [TestMethod] | |
|
140 | public void MTQueueTest() { | |||
|
141 | var queue = new MTQueue<int>(); | |||
|
142 | var pool = new WorkerPool(5, 20); | |||
|
143 | ||||
|
144 | int res; | |||
|
145 | ||||
|
146 | queue.Enqueue(10); | |||
|
147 | Assert.IsTrue(queue.TryDequeue(out res)); | |||
|
148 | Assert.AreEqual(10, res); | |||
|
149 | Assert.IsFalse(queue.TryDequeue(out res)); | |||
|
150 | ||||
|
151 | for (int i = 0; i < 1000; i++) | |||
|
152 | queue.Enqueue(i); | |||
|
153 | ||||
|
154 | for (int i = 0; i < 1000; i++) { | |||
|
155 | queue.TryDequeue(out res); | |||
|
156 | Assert.AreEqual(i, res); | |||
|
157 | } | |||
|
158 | ||||
|
159 | int writers = 0; | |||
|
160 | int readers = 0; | |||
|
161 | var stop = new ManualResetEvent(false); | |||
|
162 | int total = 0; | |||
|
163 | ||||
|
164 | int itemsPerWriter = 1000; | |||
|
165 | int writersCount = 3; | |||
|
166 | ||||
|
167 | for (int i = 0; i < writersCount; i++) { | |||
|
168 | Interlocked.Increment(ref writers); | |||
|
169 | var wn = i; | |||
|
170 | AsyncPool | |||
|
171 | .InvokeNewThread(() => { | |||
|
172 | Console.WriteLine("Started writer: {0}", wn); | |||
|
173 | for (int ii = 0; ii < itemsPerWriter; ii++) { | |||
|
174 | queue.Enqueue(1); | |||
|
175 | Thread.Sleep(1); | |||
|
176 | } | |||
|
177 | Console.WriteLine("Stopped writer: {0}", wn); | |||
|
178 | return 1; | |||
|
179 | }) | |||
|
180 | .Then(x => Interlocked.Decrement(ref writers) ); | |||
|
181 | } | |||
|
182 | ||||
|
183 | for (int i = 0; i < 10; i++) { | |||
|
184 | Interlocked.Increment(ref readers); | |||
|
185 | var wn = i; | |||
|
186 | AsyncPool | |||
|
187 | .InvokeNewThread(() => { | |||
|
188 | int t; | |||
|
189 | Console.WriteLine("Started reader: {0}", wn); | |||
|
190 | do { | |||
|
191 | while (queue.TryDequeue(out t)) | |||
|
192 | Interlocked.Add(ref total, t); | |||
|
193 | Thread.Sleep(0); | |||
|
194 | } while (writers > 0); | |||
|
195 | Console.WriteLine("Stopped reader: {0}", wn); | |||
|
196 | return 1; | |||
|
197 | }) | |||
|
198 | .Then(x => { | |||
|
199 | Interlocked.Decrement(ref readers); | |||
|
200 | if (readers == 0) | |||
|
201 | stop.Set(); | |||
|
202 | }); | |||
|
203 | } | |||
|
204 | ||||
|
205 | stop.WaitOne(); | |||
|
206 | ||||
|
207 | Assert.AreEqual(itemsPerWriter * writersCount, total); | |||
|
208 | } | |||
|
209 | ||||
|
210 | [TestMethod] | |||
140 | public void ComplexCase1Test() { |
|
211 | public void ComplexCase1Test() { | |
141 | var flags = new bool[3]; |
|
212 | var flags = new bool[3]; | |
142 |
|
213 |
1 | NO CONTENT: modified file, binary diff hidden |
|
NO CONTENT: modified file, binary diff hidden |
@@ -38,6 +38,7 | |||||
38 | <Compile Include="IPromise.cs" /> |
|
38 | <Compile Include="IPromise.cs" /> | |
39 | <Compile Include="ITaskController.cs" /> |
|
39 | <Compile Include="ITaskController.cs" /> | |
40 | <Compile Include="ManagedPromise.cs" /> |
|
40 | <Compile Include="ManagedPromise.cs" /> | |
|
41 | <Compile Include="Parallels\MTQueue.cs" /> | |||
41 | <Compile Include="Parallels\WorkerPool.cs" /> |
|
42 | <Compile Include="Parallels\WorkerPool.cs" /> | |
42 | <Compile Include="PromiseState.cs" /> |
|
43 | <Compile Include="PromiseState.cs" /> | |
43 | <Compile Include="TaskController.cs" /> |
|
44 | <Compile Include="TaskController.cs" /> |
@@ -24,5 +24,22 namespace Implab.Parallels { | |||||
24 |
|
24 | |||
25 | return p; |
|
25 | return p; | |
26 | } |
|
26 | } | |
|
27 | ||||
|
28 | public static Promise<T> InvokeNewThread<T>(Func<T> func) { | |||
|
29 | var p = new Promise<T>(); | |||
|
30 | ||||
|
31 | var worker = new Thread(() => { | |||
|
32 | try { | |||
|
33 | p.Resolve(func()); | |||
|
34 | } catch (Exception e) { | |||
|
35 | p.Reject(e); | |||
|
36 | } | |||
|
37 | }); | |||
|
38 | worker.IsBackground = true; | |||
|
39 | ||||
|
40 | worker.Start(); | |||
|
41 | ||||
|
42 | return p; | |||
27 | } |
|
43 | } | |
28 | } |
|
44 | } | |
|
45 | } |
@@ -88,7 +88,7 namespace Implab { | |||||
88 | /// <param name="result">Результат выполнения.</param> |
|
88 | /// <param name="result">Результат выполнения.</param> | |
89 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> |
|
89 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
90 | public void Resolve(T result) { |
|
90 | public void Resolve(T result) { | |
91 |
lock ( |
|
91 | lock (m_lock) { | |
92 | if (m_state == PromiseState.Cancelled) |
|
92 | if (m_state == PromiseState.Cancelled) | |
93 | return; |
|
93 | return; | |
94 | if (m_state != PromiseState.Unresolved) |
|
94 | if (m_state != PromiseState.Unresolved) | |
@@ -106,7 +106,7 namespace Implab { | |||||
106 | /// <param name="error">Исключение возникшее при выполнении операции</param> |
|
106 | /// <param name="error">Исключение возникшее при выполнении операции</param> | |
107 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> |
|
107 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
108 | public void Reject(Exception error) { |
|
108 | public void Reject(Exception error) { | |
109 |
lock ( |
|
109 | lock (m_lock) { | |
110 | if (m_state == PromiseState.Cancelled) |
|
110 | if (m_state == PromiseState.Cancelled) | |
111 | return; |
|
111 | return; | |
112 | if (m_state != PromiseState.Unresolved) |
|
112 | if (m_state != PromiseState.Unresolved) |
General Comments 0
You need to be logged in to leave comments.
Login now