##// END OF EJS Templates
improved performance of promises
cin -
r125:f803565868a4 v2
parent child
Show More
@@ -633,7 +633,7 namespace Implab.Test {
633 [TestMethod]
633 [TestMethod]
634 public void ChainedMapTest() {
634 public void ChainedMapTest() {
635
635
636 using (var pool = new WorkerPool(0,10,1)) {
636 using (var pool = new WorkerPool()) {
637 const int count = 10000;
637 const int count = 10000;
638
638
639 var args = new double[count];
639 var args = new double[count];
@@ -12,10 +12,16 namespace Implab {
12 const int REJECTED_STATE = 3;
12 const int REJECTED_STATE = 3;
13 const int CANCELLED_STATE = 4;
13 const int CANCELLED_STATE = 4;
14
14
15 const int RESERVED_HANDLERS_COUNT = 4;
16
15 int m_state;
17 int m_state;
16 Exception m_error;
18 Exception m_error;
19 int m_handlersCount;
17
20
18 readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
22 MTQueue<THandler> m_extraHandlers;
23 int m_handlerPointer = -1;
24 int m_handlersCommited;
19
25
20 #region state managment
26 #region state managment
21 bool BeginTransit() {
27 bool BeginTransit() {
@@ -88,22 +94,59 namespace Implab {
88 protected abstract void SignalCancelled(THandler handler);
94 protected abstract void SignalCancelled(THandler handler);
89
95
90 void OnSuccess() {
96 void OnSuccess() {
97 var hp = m_handlerPointer;
98 var slot = hp +1 ;
99 while (slot < m_handlersCommited) {
100 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
101 SignalSuccess(m_handlers[slot]);
102 }
103 hp = m_handlerPointer;
104 slot = hp +1 ;
105 }
106
107
108 if (m_extraHandlers != null) {
91 THandler handler;
109 THandler handler;
92 while (m_handlers.TryDequeue(out handler))
110 while (m_extraHandlers.TryDequeue(out handler))
93 SignalSuccess(handler);
111 SignalSuccess(handler);
94 }
112 }
113 }
95
114
96 void OnError() {
115 void OnError() {
116 var hp = m_handlerPointer;
117 var slot = hp +1 ;
118 while (slot < m_handlersCommited) {
119 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
120 SignalError(m_handlers[slot],m_error);
121 }
122 hp = m_handlerPointer;
123 slot = hp +1 ;
124 }
125
126 if (m_extraHandlers != null) {
97 THandler handler;
127 THandler handler;
98 while (m_handlers.TryDequeue(out handler))
128 while (m_extraHandlers.TryDequeue(out handler))
99 SignalError(handler,m_error);
129 SignalError(handler, m_error);
100 }
130 }
131 }
101
132
102 void OnCancelled() {
133 void OnCancelled() {
134 var hp = m_handlerPointer;
135 var slot = hp +1 ;
136 while (slot < m_handlersCommited) {
137 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
138 SignalCancelled(m_handlers[slot]);
139 }
140 hp = m_handlerPointer;
141 slot = hp +1 ;
142 }
143
144 if (m_extraHandlers != null) {
103 THandler handler;
145 THandler handler;
104 while (m_handlers.TryDequeue(out handler))
146 while (m_extraHandlers.TryDequeue(out handler))
105 SignalCancelled(handler);
147 SignalCancelled(handler);
106 }
148 }
149 }
107
150
108 #endregion
151 #endregion
109
152
@@ -145,15 +188,41 namespace Implab {
145
188
146 protected void AddHandler(THandler handler) {
189 protected void AddHandler(THandler handler) {
147
190
148 if (IsResolved) {
191 if (m_state > 1) {
149 InvokeHandler(handler);
150
151 } else {
152 // the promise is in the resolved state, just invoke the handler
192 // the promise is in the resolved state, just invoke the handler
153 m_handlers.Enqueue(handler);
193 InvokeHandler(handler);
194 } else {
195 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
196
197 if (slot < RESERVED_HANDLERS_COUNT) {
198 m_handlers[slot] = handler;
199
200 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
201 }
154
202
203 if (m_state > 1) {
204 do {
205 var hp = m_handlerPointer;
206 slot = hp + 1;
207 if (slot < m_handlersCommited) {
208 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
209 continue;
210 InvokeHandler(m_handlers[slot]);
211 }
212 break;
213 } while(true);
214 }
215 } else {
216 if (slot == RESERVED_HANDLERS_COUNT) {
217 m_extraHandlers = new MTQueue<THandler>();
218 } else {
219 while (m_extraHandlers == null)
220 Thread.MemoryBarrier();
221 }
155
222
156 if (IsResolved && m_handlers.TryDequeue(out handler))
223 m_extraHandlers.Enqueue(handler);
224
225 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
157 // if the promise have been resolved while we was adding the handler to the queue
226 // if the promise have been resolved while we was adding the handler to the queue
158 // we can't guarantee that someone is still processing it
227 // we can't guarantee that someone is still processing it
159 // therefore we need to fetch a handler from the queue and execute it
228 // therefore we need to fetch a handler from the queue and execute it
@@ -162,6 +231,7 namespace Implab {
162 InvokeHandler(handler);
231 InvokeHandler(handler);
163 }
232 }
164 }
233 }
234 }
165
235
166 protected void InvokeHandler(THandler handler) {
236 protected void InvokeHandler(THandler handler) {
167 switch (m_state) {
237 switch (m_state) {
@@ -147,15 +147,9 namespace Implab.Parallels {
147 public const int DEFAULT_CHUNK_SIZE = 32;
147 public const int DEFAULT_CHUNK_SIZE = 32;
148 public const int MAX_CHUNK_SIZE = 262144;
148 public const int MAX_CHUNK_SIZE = 262144;
149
149
150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
151
152 Chunk m_first;
150 Chunk m_first;
153 Chunk m_last;
151 Chunk m_last;
154
152
155 public AsyncQueue() {
156 m_last = m_first = new Chunk(m_chunkSize);
157 }
158
159 /// <summary>
153 /// <summary>
160 /// Adds the specified value to the queue.
154 /// Adds the specified value to the queue.
161 /// </summary>
155 /// </summary>
@@ -167,7 +161,7 namespace Implab.Parallels {
167 while (last == null || !last.TryEnqueue(value, out extend)) {
161 while (last == null || !last.TryEnqueue(value, out extend)) {
168 // try to extend queue
162 // try to extend queue
169 if (extend || last == null) {
163 if (extend || last == null) {
170 var chunk = new Chunk(m_chunkSize, value);
164 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
171 if (EnqueueChunk(last, chunk))
165 if (EnqueueChunk(last, chunk))
172 break; // success! exit!
166 break; // success! exit!
173 last = m_last;
167 last = m_last;
@@ -215,7 +209,7 namespace Implab.Parallels {
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
209 var size = Math.Min(length, MAX_CHUNK_SIZE);
216
210
217 var chunk = new Chunk(
211 var chunk = new Chunk(
218 Math.Max(size, m_chunkSize),
212 Math.Max(size, DEFAULT_CHUNK_SIZE),
219 data,
213 data,
220 offset,
214 offset,
221 size,
215 size,
@@ -404,7 +398,7 namespace Implab.Parallels {
404
398
405 public void Clear() {
399 public void Clear() {
406 // start the new queue
400 // start the new queue
407 var chunk = new Chunk(m_chunkSize);
401 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
408
402
409 do {
403 do {
410 Thread.MemoryBarrier();
404 Thread.MemoryBarrier();
@@ -431,7 +425,7 namespace Implab.Parallels {
431
425
432 public T[] Drain() {
426 public T[] Drain() {
433 // start the new queue
427 // start the new queue
434 var chunk = new Chunk(m_chunkSize);
428 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
435
429
436 do {
430 do {
437 Thread.MemoryBarrier();
431 Thread.MemoryBarrier();
@@ -458,7 +452,7 namespace Implab.Parallels {
458
452
459 T[] ReadChunks(Chunk chunk, object last) {
453 T[] ReadChunks(Chunk chunk, object last) {
460 var result = new List<T>();
454 var result = new List<T>();
461 var buffer = new T[m_chunkSize];
455 var buffer = new T[DEFAULT_CHUNK_SIZE];
462 int actual;
456 int actual;
463 bool recycle;
457 bool recycle;
464 while (chunk != null) {
458 while (chunk != null) {
@@ -30,11 +30,9 namespace Implab.Parallels {
30 }
30 }
31
31
32 protected DispatchPool() {
32 protected DispatchPool() {
33 int maxThreads, maxCP;
34 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
35
33
36 m_minThreadsLimit = 0;
34 m_minThreadsLimit = 0;
37 m_maxThreadsLimit = maxThreads;
35 m_maxThreadsLimit = Environment.ProcessorCount;
38 }
36 }
39
37
40 protected void InitPool() {
38 protected void InitPool() {
@@ -11,109 +11,23 namespace MonoPlay {
11 if (args == null)
11 if (args == null)
12 throw new ArgumentNullException("args");
12 throw new ArgumentNullException("args");
13
13
14 var q1 = new AsyncQueue<int>();
15 var q2 = new Queue<int>();
16
17 const int count = 10000000;
14 const int count = 10000000;
18
15
19 int res1 = 0, res2 = 0;
20 var t1 = Environment.TickCount;
16 var t1 = Environment.TickCount;
21
17
22 AsyncPool.RunThread(
18 for (int i = 0; i < count; i++) {
23 () => {
19 var p = new Promise<int>();
24 for (var i = 0; i < count; i++)
25 q1.Enqueue(1);
26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
27 },
28 () => {
29 for (var i = 0; i < count; i++)
30 q1.Enqueue(2);
31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
32 },
33 () => {
34 int temp = 0;
35 int i = 0;
36 while (i < count)
37 if (q1.TryDequeue(out temp)) {
38 i++;
39 res1 += temp;
40 }
41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
42 },
43 () => {
44 int temp = 0;
45 int i = 0;
46 while (i < count)
47 if (q1.TryDequeue(out temp)) {
48 i++;
49 res2 += temp;
50 }
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
52 }
53 )
54 .Bundle()
55 .Join();
56
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
58
59 var t2 = Environment.TickCount;
60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
61
62 t1 = Environment.TickCount;
63
20
64 for (var i = 0; i < count * 2; i++)
21 p.On(x => {}).On(x => {});
65 q2.Enqueue(i);
66
67 for (var i = 0; i < count * 2; i++)
68 q2.Dequeue();
69
70 t2 = Environment.TickCount;
71 Console.WriteLine("Queue: {0} ms", t2 - t1);
72
73 q2 = new Queue<int>();
74
75 t1 = Environment.TickCount;
76
77
22
78 AsyncPool.RunThread(
23 p.Resolve(i);
79 () => {
80 for (var i = 0; i < count; i++)
81 lock (q2)
82 q2.Enqueue(i);
83 },
84 () => {
85 for (var i = 0; i < count; i++)
86 lock (q2)
87 q2.Enqueue(i);
88 },
89 () => {
90 for (int i = 0; i < count ;)
91 lock (q2) {
92 if (q2.Count == 0)
93 continue;
94 q2.Dequeue();
95 i++;
96 }
97
98 },
99 () => {
100 for (int i = 0; i < count ;)
101 lock (q2) {
102 if (q2.Count == 0)
103 continue;
104 q2.Dequeue();
105 i++;
106 }
107
24
108 }
25 }
109 )
110 .Bundle()
111 .Join();
112
26
113
27
114
28
115 t2 = Environment.TickCount;
29 var t2 = Environment.TickCount;
116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
30 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
117
31
118 }
32 }
119 }
33 }
General Comments 0
You need to be logged in to leave comments. Login now