@@ -0,0 +1,74 | |||
|
1 | using System; | |
|
2 | using System.Collections.Generic; | |
|
3 | using System.Linq; | |
|
4 | using System.Text; | |
|
5 | using System.Threading; | |
|
6 | ||
|
7 | namespace Implab.Parallels { | |
|
8 | public class MTQueue<T> { | |
|
9 | class Node { | |
|
10 | public Node(T value) { | |
|
11 | this.value = value; | |
|
12 | } | |
|
13 | public readonly T value; | |
|
14 | public Node next; | |
|
15 | } | |
|
16 | ||
|
17 | Node m_first; | |
|
18 | Node m_last; | |
|
19 | ||
|
20 | public void Enqueue(T value) { | |
|
21 | var last = m_last; | |
|
22 | var next = new Node(value); | |
|
23 | ||
|
24 | while (last != Interlocked.CompareExchange(ref m_last, next, last)) | |
|
25 | last = m_last; | |
|
26 | ||
|
27 | if (last != null) | |
|
28 | last.next = next; | |
|
29 | else | |
|
30 | m_first = next; | |
|
31 | } | |
|
32 | ||
|
33 | public bool TryDequeue(out T value) { | |
|
34 | Node first; | |
|
35 | Node next = null; | |
|
36 | value = default(T); | |
|
37 | ||
|
38 | do { | |
|
39 | first = m_first; | |
|
40 | if (first == null) | |
|
41 | return false; | |
|
42 | next = first.next; | |
|
43 | if (next == null) { | |
|
44 | // this is the last element, | |
|
45 | // then try to update tail | |
|
46 | if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | |
|
47 | // this is inconsistent situation which means that the queue is empty | |
|
48 | if (m_last == null) | |
|
49 | return false; | |
|
50 | // tail has been changed, that means that we need to restart | |
|
51 | continue; | |
|
52 | } | |
|
53 | ||
|
54 | // tail succesfully updated and first.next will never be changed | |
|
55 | // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null | |
|
56 | // but the writer may update the m_first since the m_last is null | |
|
57 | ||
|
58 | // so we need to fix inconsistency by setting m_first to null, but if it already has been | |
|
59 | // updated by a writer then we should just give up | |
|
60 | Interlocked.CompareExchange(ref m_first, null, first); | |
|
61 | break; | |
|
62 | ||
|
63 | } else { | |
|
64 | if (first == Interlocked.CompareExchange(ref m_first, next, first)) | |
|
65 | // head succesfully updated | |
|
66 | break; | |
|
67 | } | |
|
68 | } while (true); | |
|
69 | ||
|
70 | value = first.value; | |
|
71 | return true; | |
|
72 | } | |
|
73 | } | |
|
74 | } |
@@ -137,6 +137,77 namespace Implab.Test | |||
|
137 | 137 | } |
|
138 | 138 | |
|
139 | 139 | [TestMethod] |
|
140 | public void MTQueueTest() { | |
|
141 | var queue = new MTQueue<int>(); | |
|
142 | var pool = new WorkerPool(5, 20); | |
|
143 | ||
|
144 | int res; | |
|
145 | ||
|
146 | queue.Enqueue(10); | |
|
147 | Assert.IsTrue(queue.TryDequeue(out res)); | |
|
148 | Assert.AreEqual(10, res); | |
|
149 | Assert.IsFalse(queue.TryDequeue(out res)); | |
|
150 | ||
|
151 | for (int i = 0; i < 1000; i++) | |
|
152 | queue.Enqueue(i); | |
|
153 | ||
|
154 | for (int i = 0; i < 1000; i++) { | |
|
155 | queue.TryDequeue(out res); | |
|
156 | Assert.AreEqual(i, res); | |
|
157 | } | |
|
158 | ||
|
159 | int writers = 0; | |
|
160 | int readers = 0; | |
|
161 | var stop = new ManualResetEvent(false); | |
|
162 | int total = 0; | |
|
163 | ||
|
164 | int itemsPerWriter = 1000; | |
|
165 | int writersCount = 3; | |
|
166 | ||
|
167 | for (int i = 0; i < writersCount; i++) { | |
|
168 | Interlocked.Increment(ref writers); | |
|
169 | var wn = i; | |
|
170 | AsyncPool | |
|
171 | .InvokeNewThread(() => { | |
|
172 | Console.WriteLine("Started writer: {0}", wn); | |
|
173 | for (int ii = 0; ii < itemsPerWriter; ii++) { | |
|
174 | queue.Enqueue(1); | |
|
175 | Thread.Sleep(1); | |
|
176 | } | |
|
177 | Console.WriteLine("Stopped writer: {0}", wn); | |
|
178 | return 1; | |
|
179 | }) | |
|
180 | .Then(x => Interlocked.Decrement(ref writers) ); | |
|
181 | } | |
|
182 | ||
|
183 | for (int i = 0; i < 10; i++) { | |
|
184 | Interlocked.Increment(ref readers); | |
|
185 | var wn = i; | |
|
186 | AsyncPool | |
|
187 | .InvokeNewThread(() => { | |
|
188 | int t; | |
|
189 | Console.WriteLine("Started reader: {0}", wn); | |
|
190 | do { | |
|
191 | while (queue.TryDequeue(out t)) | |
|
192 | Interlocked.Add(ref total, t); | |
|
193 | Thread.Sleep(0); | |
|
194 | } while (writers > 0); | |
|
195 | Console.WriteLine("Stopped reader: {0}", wn); | |
|
196 | return 1; | |
|
197 | }) | |
|
198 | .Then(x => { | |
|
199 | Interlocked.Decrement(ref readers); | |
|
200 | if (readers == 0) | |
|
201 | stop.Set(); | |
|
202 | }); | |
|
203 | } | |
|
204 | ||
|
205 | stop.WaitOne(); | |
|
206 | ||
|
207 | Assert.AreEqual(itemsPerWriter * writersCount, total); | |
|
208 | } | |
|
209 | ||
|
210 | [TestMethod] | |
|
140 | 211 | public void ComplexCase1Test() { |
|
141 | 212 | var flags = new bool[3]; |
|
142 | 213 |
|
1 | NO CONTENT: modified file, binary diff hidden |
@@ -38,6 +38,7 | |||
|
38 | 38 | <Compile Include="IPromise.cs" /> |
|
39 | 39 | <Compile Include="ITaskController.cs" /> |
|
40 | 40 | <Compile Include="ManagedPromise.cs" /> |
|
41 | <Compile Include="Parallels\MTQueue.cs" /> | |
|
41 | 42 | <Compile Include="Parallels\WorkerPool.cs" /> |
|
42 | 43 | <Compile Include="PromiseState.cs" /> |
|
43 | 44 | <Compile Include="TaskController.cs" /> |
@@ -16,13 +16,30 namespace Implab.Parallels { | |||
|
16 | 16 | |
|
17 | 17 | ThreadPool.QueueUserWorkItem(param => { |
|
18 | 18 | try { |
|
19 | p.Resolve(func()); | |
|
19 | p.Resolve(func()); | |
|
20 | 20 | } catch(Exception e) { |
|
21 | 21 | p.Reject(e); |
|
22 | 22 | } |
|
23 | 23 | }); |
|
24 | 24 | |
|
25 | 25 | return p; |
|
26 | } | |
|
26 | } | |
|
27 | ||
|
28 | public static Promise<T> InvokeNewThread<T>(Func<T> func) { | |
|
29 | var p = new Promise<T>(); | |
|
30 | ||
|
31 | var worker = new Thread(() => { | |
|
32 | try { | |
|
33 | p.Resolve(func()); | |
|
34 | } catch (Exception e) { | |
|
35 | p.Reject(e); | |
|
36 | } | |
|
37 | }); | |
|
38 | worker.IsBackground = true; | |
|
39 | ||
|
40 | worker.Start(); | |
|
41 | ||
|
42 | return p; | |
|
43 | } | |
|
27 | 44 | } |
|
28 | 45 | } |
@@ -88,7 +88,7 namespace Implab { | |||
|
88 | 88 | /// <param name="result">Результат выполнения.</param> |
|
89 | 89 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> |
|
90 | 90 | public void Resolve(T result) { |
|
91 |
lock ( |
|
|
91 | lock (m_lock) { | |
|
92 | 92 | if (m_state == PromiseState.Cancelled) |
|
93 | 93 | return; |
|
94 | 94 | if (m_state != PromiseState.Unresolved) |
@@ -106,7 +106,7 namespace Implab { | |||
|
106 | 106 | /// <param name="error">Исключение возникшее при выполнении операции</param> |
|
107 | 107 | /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> |
|
108 | 108 | public void Reject(Exception error) { |
|
109 |
lock ( |
|
|
109 | lock (m_lock) { | |
|
110 | 110 | if (m_state == PromiseState.Cancelled) |
|
111 | 111 | return; |
|
112 | 112 | if (m_state != PromiseState.Unresolved) |
General Comments 0
You need to be logged in to leave comments.
Login now