##// END OF EJS Templates
Implemented interllocked queue...
cin -
r14:e943453e5039 promises
parent child
Show More
@@ -0,0 +1,74
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab.Parallels {
8 public class MTQueue<T> {
9 class Node {
10 public Node(T value) {
11 this.value = value;
12 }
13 public readonly T value;
14 public Node next;
15 }
16
17 Node m_first;
18 Node m_last;
19
20 public void Enqueue(T value) {
21 var last = m_last;
22 var next = new Node(value);
23
24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
25 last = m_last;
26
27 if (last != null)
28 last.next = next;
29 else
30 m_first = next;
31 }
32
33 public bool TryDequeue(out T value) {
34 Node first;
35 Node next = null;
36 value = default(T);
37
38 do {
39 first = m_first;
40 if (first == null)
41 return false;
42 next = first.next;
43 if (next == null) {
44 // this is the last element,
45 // then try to update tail
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
48 if (m_last == null)
49 return false;
50 // tail has been changed, that means that we need to restart
51 continue;
52 }
53
54 // tail succesfully updated and first.next will never be changed
55 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
56 // but the writer may update the m_first since the m_last is null
57
58 // so we need to fix inconsistency by setting m_first to null, but if it already has been
59 // updated by a writer then we should just give up
60 Interlocked.CompareExchange(ref m_first, null, first);
61 break;
62
63 } else {
64 if (first == Interlocked.CompareExchange(ref m_first, next, first))
65 // head succesfully updated
66 break;
67 }
68 } while (true);
69
70 value = first.value;
71 return true;
72 }
73 }
74 }
@@ -137,6 +137,77 namespace Implab.Test
137 137 }
138 138
139 139 [TestMethod]
140 public void MTQueueTest() {
141 var queue = new MTQueue<int>();
142 var pool = new WorkerPool(5, 20);
143
144 int res;
145
146 queue.Enqueue(10);
147 Assert.IsTrue(queue.TryDequeue(out res));
148 Assert.AreEqual(10, res);
149 Assert.IsFalse(queue.TryDequeue(out res));
150
151 for (int i = 0; i < 1000; i++)
152 queue.Enqueue(i);
153
154 for (int i = 0; i < 1000; i++) {
155 queue.TryDequeue(out res);
156 Assert.AreEqual(i, res);
157 }
158
159 int writers = 0;
160 int readers = 0;
161 var stop = new ManualResetEvent(false);
162 int total = 0;
163
164 int itemsPerWriter = 1000;
165 int writersCount = 3;
166
167 for (int i = 0; i < writersCount; i++) {
168 Interlocked.Increment(ref writers);
169 var wn = i;
170 AsyncPool
171 .InvokeNewThread(() => {
172 Console.WriteLine("Started writer: {0}", wn);
173 for (int ii = 0; ii < itemsPerWriter; ii++) {
174 queue.Enqueue(1);
175 Thread.Sleep(1);
176 }
177 Console.WriteLine("Stopped writer: {0}", wn);
178 return 1;
179 })
180 .Then(x => Interlocked.Decrement(ref writers) );
181 }
182
183 for (int i = 0; i < 10; i++) {
184 Interlocked.Increment(ref readers);
185 var wn = i;
186 AsyncPool
187 .InvokeNewThread(() => {
188 int t;
189 Console.WriteLine("Started reader: {0}", wn);
190 do {
191 while (queue.TryDequeue(out t))
192 Interlocked.Add(ref total, t);
193 Thread.Sleep(0);
194 } while (writers > 0);
195 Console.WriteLine("Stopped reader: {0}", wn);
196 return 1;
197 })
198 .Then(x => {
199 Interlocked.Decrement(ref readers);
200 if (readers == 0)
201 stop.Set();
202 });
203 }
204
205 stop.WaitOne();
206
207 Assert.AreEqual(itemsPerWriter * writersCount, total);
208 }
209
210 [TestMethod]
140 211 public void ComplexCase1Test() {
141 212 var flags = new bool[3];
142 213
1 NO CONTENT: modified file, binary diff hidden
@@ -38,6 +38,7
38 38 <Compile Include="IPromise.cs" />
39 39 <Compile Include="ITaskController.cs" />
40 40 <Compile Include="ManagedPromise.cs" />
41 <Compile Include="Parallels\MTQueue.cs" />
41 42 <Compile Include="Parallels\WorkerPool.cs" />
42 43 <Compile Include="PromiseState.cs" />
43 44 <Compile Include="TaskController.cs" />
@@ -16,13 +16,30 namespace Implab.Parallels {
16 16
17 17 ThreadPool.QueueUserWorkItem(param => {
18 18 try {
19 p.Resolve(func());
19 p.Resolve(func());
20 20 } catch(Exception e) {
21 21 p.Reject(e);
22 22 }
23 23 });
24 24
25 25 return p;
26 }
26 }
27
28 public static Promise<T> InvokeNewThread<T>(Func<T> func) {
29 var p = new Promise<T>();
30
31 var worker = new Thread(() => {
32 try {
33 p.Resolve(func());
34 } catch (Exception e) {
35 p.Reject(e);
36 }
37 });
38 worker.IsBackground = true;
39
40 worker.Start();
41
42 return p;
43 }
27 44 }
28 45 }
@@ -88,7 +88,7 namespace Implab {
88 88 /// <param name="result">Результат выполнения.</param>
89 89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
90 90 public void Resolve(T result) {
91 lock (this) {
91 lock (m_lock) {
92 92 if (m_state == PromiseState.Cancelled)
93 93 return;
94 94 if (m_state != PromiseState.Unresolved)
@@ -106,7 +106,7 namespace Implab {
106 106 /// <param name="error">Исключение возникшее при выполнении операции</param>
107 107 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
108 108 public void Reject(Exception error) {
109 lock (this) {
109 lock (m_lock) {
110 110 if (m_state == PromiseState.Cancelled)
111 111 return;
112 112 if (m_state != PromiseState.Unresolved)
General Comments 0
You need to be logged in to leave comments. Login now