##// END OF EJS Templates
Implemented interllocked queue...
cin -
r14:e943453e5039 promises
parent child
Show More
@@ -0,0 +1,74
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab.Parallels {
8 public class MTQueue<T> {
9 class Node {
10 public Node(T value) {
11 this.value = value;
12 }
13 public readonly T value;
14 public Node next;
15 }
16
17 Node m_first;
18 Node m_last;
19
20 public void Enqueue(T value) {
21 var last = m_last;
22 var next = new Node(value);
23
24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
25 last = m_last;
26
27 if (last != null)
28 last.next = next;
29 else
30 m_first = next;
31 }
32
33 public bool TryDequeue(out T value) {
34 Node first;
35 Node next = null;
36 value = default(T);
37
38 do {
39 first = m_first;
40 if (first == null)
41 return false;
42 next = first.next;
43 if (next == null) {
44 // this is the last element,
45 // then try to update tail
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
48 if (m_last == null)
49 return false;
50 // tail has been changed, that means that we need to restart
51 continue;
52 }
53
54 // tail succesfully updated and first.next will never be changed
55 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
56 // but the writer may update the m_first since the m_last is null
57
58 // so we need to fix inconsistency by setting m_first to null, but if it already has been
59 // updated by a writer then we should just give up
60 Interlocked.CompareExchange(ref m_first, null, first);
61 break;
62
63 } else {
64 if (first == Interlocked.CompareExchange(ref m_first, next, first))
65 // head succesfully updated
66 break;
67 }
68 } while (true);
69
70 value = first.value;
71 return true;
72 }
73 }
74 }
@@ -137,6 +137,77 namespace Implab.Test
137 }
137 }
138
138
139 [TestMethod]
139 [TestMethod]
140 public void MTQueueTest() {
141 var queue = new MTQueue<int>();
142 var pool = new WorkerPool(5, 20);
143
144 int res;
145
146 queue.Enqueue(10);
147 Assert.IsTrue(queue.TryDequeue(out res));
148 Assert.AreEqual(10, res);
149 Assert.IsFalse(queue.TryDequeue(out res));
150
151 for (int i = 0; i < 1000; i++)
152 queue.Enqueue(i);
153
154 for (int i = 0; i < 1000; i++) {
155 queue.TryDequeue(out res);
156 Assert.AreEqual(i, res);
157 }
158
159 int writers = 0;
160 int readers = 0;
161 var stop = new ManualResetEvent(false);
162 int total = 0;
163
164 int itemsPerWriter = 1000;
165 int writersCount = 3;
166
167 for (int i = 0; i < writersCount; i++) {
168 Interlocked.Increment(ref writers);
169 var wn = i;
170 AsyncPool
171 .InvokeNewThread(() => {
172 Console.WriteLine("Started writer: {0}", wn);
173 for (int ii = 0; ii < itemsPerWriter; ii++) {
174 queue.Enqueue(1);
175 Thread.Sleep(1);
176 }
177 Console.WriteLine("Stopped writer: {0}", wn);
178 return 1;
179 })
180 .Then(x => Interlocked.Decrement(ref writers) );
181 }
182
183 for (int i = 0; i < 10; i++) {
184 Interlocked.Increment(ref readers);
185 var wn = i;
186 AsyncPool
187 .InvokeNewThread(() => {
188 int t;
189 Console.WriteLine("Started reader: {0}", wn);
190 do {
191 while (queue.TryDequeue(out t))
192 Interlocked.Add(ref total, t);
193 Thread.Sleep(0);
194 } while (writers > 0);
195 Console.WriteLine("Stopped reader: {0}", wn);
196 return 1;
197 })
198 .Then(x => {
199 Interlocked.Decrement(ref readers);
200 if (readers == 0)
201 stop.Set();
202 });
203 }
204
205 stop.WaitOne();
206
207 Assert.AreEqual(itemsPerWriter * writersCount, total);
208 }
209
210 [TestMethod]
140 public void ComplexCase1Test() {
211 public void ComplexCase1Test() {
141 var flags = new bool[3];
212 var flags = new bool[3];
142
213
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -38,6 +38,7
38 <Compile Include="IPromise.cs" />
38 <Compile Include="IPromise.cs" />
39 <Compile Include="ITaskController.cs" />
39 <Compile Include="ITaskController.cs" />
40 <Compile Include="ManagedPromise.cs" />
40 <Compile Include="ManagedPromise.cs" />
41 <Compile Include="Parallels\MTQueue.cs" />
41 <Compile Include="Parallels\WorkerPool.cs" />
42 <Compile Include="Parallels\WorkerPool.cs" />
42 <Compile Include="PromiseState.cs" />
43 <Compile Include="PromiseState.cs" />
43 <Compile Include="TaskController.cs" />
44 <Compile Include="TaskController.cs" />
@@ -24,5 +24,22 namespace Implab.Parallels {
24
24
25 return p;
25 return p;
26 }
26 }
27
28 public static Promise<T> InvokeNewThread<T>(Func<T> func) {
29 var p = new Promise<T>();
30
31 var worker = new Thread(() => {
32 try {
33 p.Resolve(func());
34 } catch (Exception e) {
35 p.Reject(e);
36 }
37 });
38 worker.IsBackground = true;
39
40 worker.Start();
41
42 return p;
27 }
43 }
28 }
44 }
45 }
@@ -88,7 +88,7 namespace Implab {
88 /// <param name="result">Результат выполнения.</param>
88 /// <param name="result">Результат выполнения.</param>
89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
90 public void Resolve(T result) {
90 public void Resolve(T result) {
91 lock (this) {
91 lock (m_lock) {
92 if (m_state == PromiseState.Cancelled)
92 if (m_state == PromiseState.Cancelled)
93 return;
93 return;
94 if (m_state != PromiseState.Unresolved)
94 if (m_state != PromiseState.Unresolved)
@@ -106,7 +106,7 namespace Implab {
106 /// <param name="error">Исключение возникшее при выполнении операции</param>
106 /// <param name="error">Исключение возникшее при выполнении операции</param>
107 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
107 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
108 public void Reject(Exception error) {
108 public void Reject(Exception error) {
109 lock (this) {
109 lock (m_lock) {
110 if (m_state == PromiseState.Cancelled)
110 if (m_state == PromiseState.Cancelled)
111 return;
111 return;
112 if (m_state != PromiseState.Unresolved)
112 if (m_state != PromiseState.Unresolved)
General Comments 0
You need to be logged in to leave comments. Login now