##// END OF EJS Templates
improved performance of promises
cin -
r125:f803565868a4 v2
parent child
Show More
@@ -633,7 +633,7 namespace Implab.Test {
633 633 [TestMethod]
634 634 public void ChainedMapTest() {
635 635
636 using (var pool = new WorkerPool(0,10,1)) {
636 using (var pool = new WorkerPool()) {
637 637 const int count = 10000;
638 638
639 639 var args = new double[count];
@@ -12,10 +12,16 namespace Implab {
12 12 const int REJECTED_STATE = 3;
13 13 const int CANCELLED_STATE = 4;
14 14
15 const int RESERVED_HANDLERS_COUNT = 4;
16
15 17 int m_state;
16 18 Exception m_error;
19 int m_handlersCount;
17 20
18 readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
22 MTQueue<THandler> m_extraHandlers;
23 int m_handlerPointer = -1;
24 int m_handlersCommited;
19 25
20 26 #region state managment
21 27 bool BeginTransit() {
@@ -88,21 +94,58 namespace Implab {
88 94 protected abstract void SignalCancelled(THandler handler);
89 95
90 96 void OnSuccess() {
91 THandler handler;
92 while (m_handlers.TryDequeue(out handler))
93 SignalSuccess(handler);
97 var hp = m_handlerPointer;
98 var slot = hp +1 ;
99 while (slot < m_handlersCommited) {
100 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
101 SignalSuccess(m_handlers[slot]);
102 }
103 hp = m_handlerPointer;
104 slot = hp +1 ;
105 }
106
107
108 if (m_extraHandlers != null) {
109 THandler handler;
110 while (m_extraHandlers.TryDequeue(out handler))
111 SignalSuccess(handler);
112 }
94 113 }
95 114
96 115 void OnError() {
97 THandler handler;
98 while (m_handlers.TryDequeue(out handler))
99 SignalError(handler,m_error);
116 var hp = m_handlerPointer;
117 var slot = hp +1 ;
118 while (slot < m_handlersCommited) {
119 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
120 SignalError(m_handlers[slot],m_error);
121 }
122 hp = m_handlerPointer;
123 slot = hp +1 ;
124 }
125
126 if (m_extraHandlers != null) {
127 THandler handler;
128 while (m_extraHandlers.TryDequeue(out handler))
129 SignalError(handler, m_error);
130 }
100 131 }
101 132
102 133 void OnCancelled() {
103 THandler handler;
104 while (m_handlers.TryDequeue(out handler))
105 SignalCancelled(handler);
134 var hp = m_handlerPointer;
135 var slot = hp +1 ;
136 while (slot < m_handlersCommited) {
137 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
138 SignalCancelled(m_handlers[slot]);
139 }
140 hp = m_handlerPointer;
141 slot = hp +1 ;
142 }
143
144 if (m_extraHandlers != null) {
145 THandler handler;
146 while (m_extraHandlers.TryDequeue(out handler))
147 SignalCancelled(handler);
148 }
106 149 }
107 150
108 151 #endregion
@@ -145,21 +188,48 namespace Implab {
145 188
146 189 protected void AddHandler(THandler handler) {
147 190
148 if (IsResolved) {
149 InvokeHandler(handler);
150
151 } else {
191 if (m_state > 1) {
152 192 // the promise is in the resolved state, just invoke the handler
153 m_handlers.Enqueue(handler);
193 InvokeHandler(handler);
194 } else {
195 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
196
197 if (slot < RESERVED_HANDLERS_COUNT) {
198 m_handlers[slot] = handler;
199
200 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
201 }
154 202
203 if (m_state > 1) {
204 do {
205 var hp = m_handlerPointer;
206 slot = hp + 1;
207 if (slot < m_handlersCommited) {
208 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
209 continue;
210 InvokeHandler(m_handlers[slot]);
211 }
212 break;
213 } while(true);
214 }
215 } else {
216 if (slot == RESERVED_HANDLERS_COUNT) {
217 m_extraHandlers = new MTQueue<THandler>();
218 } else {
219 while (m_extraHandlers == null)
220 Thread.MemoryBarrier();
221 }
155 222
156 if (IsResolved && m_handlers.TryDequeue(out handler))
223 m_extraHandlers.Enqueue(handler);
224
225 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
157 226 // if the promise have been resolved while we was adding the handler to the queue
158 227 // we can't guarantee that someone is still processing it
159 228 // therefore we need to fetch a handler from the queue and execute it
160 229 // note that fetched handler may be not the one that we have added
161 230 // even we can fetch no handlers at all :)
162 231 InvokeHandler(handler);
232 }
163 233 }
164 234 }
165 235
@@ -147,15 +147,9 namespace Implab.Parallels {
147 147 public const int DEFAULT_CHUNK_SIZE = 32;
148 148 public const int MAX_CHUNK_SIZE = 262144;
149 149
150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
151
152 150 Chunk m_first;
153 151 Chunk m_last;
154 152
155 public AsyncQueue() {
156 m_last = m_first = new Chunk(m_chunkSize);
157 }
158
159 153 /// <summary>
160 154 /// Adds the specified value to the queue.
161 155 /// </summary>
@@ -167,7 +161,7 namespace Implab.Parallels {
167 161 while (last == null || !last.TryEnqueue(value, out extend)) {
168 162 // try to extend queue
169 163 if (extend || last == null) {
170 var chunk = new Chunk(m_chunkSize, value);
164 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
171 165 if (EnqueueChunk(last, chunk))
172 166 break; // success! exit!
173 167 last = m_last;
@@ -215,7 +209,7 namespace Implab.Parallels {
215 209 var size = Math.Min(length, MAX_CHUNK_SIZE);
216 210
217 211 var chunk = new Chunk(
218 Math.Max(size, m_chunkSize),
212 Math.Max(size, DEFAULT_CHUNK_SIZE),
219 213 data,
220 214 offset,
221 215 size,
@@ -404,7 +398,7 namespace Implab.Parallels {
404 398
405 399 public void Clear() {
406 400 // start the new queue
407 var chunk = new Chunk(m_chunkSize);
401 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
408 402
409 403 do {
410 404 Thread.MemoryBarrier();
@@ -431,7 +425,7 namespace Implab.Parallels {
431 425
432 426 public T[] Drain() {
433 427 // start the new queue
434 var chunk = new Chunk(m_chunkSize);
428 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
435 429
436 430 do {
437 431 Thread.MemoryBarrier();
@@ -458,7 +452,7 namespace Implab.Parallels {
458 452
459 453 T[] ReadChunks(Chunk chunk, object last) {
460 454 var result = new List<T>();
461 var buffer = new T[m_chunkSize];
455 var buffer = new T[DEFAULT_CHUNK_SIZE];
462 456 int actual;
463 457 bool recycle;
464 458 while (chunk != null) {
@@ -30,11 +30,9 namespace Implab.Parallels {
30 30 }
31 31
32 32 protected DispatchPool() {
33 int maxThreads, maxCP;
34 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
35 33
36 34 m_minThreadsLimit = 0;
37 m_maxThreadsLimit = maxThreads;
35 m_maxThreadsLimit = Environment.ProcessorCount;
38 36 }
39 37
40 38 protected void InitPool() {
@@ -11,109 +11,23 namespace MonoPlay {
11 11 if (args == null)
12 12 throw new ArgumentNullException("args");
13 13
14 var q1 = new AsyncQueue<int>();
15 var q2 = new Queue<int>();
16
17 14 const int count = 10000000;
18 15
19 int res1 = 0, res2 = 0;
20 16 var t1 = Environment.TickCount;
21 17
22 AsyncPool.RunThread(
23 () => {
24 for (var i = 0; i < count; i++)
25 q1.Enqueue(1);
26 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
27 },
28 () => {
29 for (var i = 0; i < count; i++)
30 q1.Enqueue(2);
31 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
32 },
33 () => {
34 int temp = 0;
35 int i = 0;
36 while (i < count)
37 if (q1.TryDequeue(out temp)) {
38 i++;
39 res1 += temp;
40 }
41 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
42 },
43 () => {
44 int temp = 0;
45 int i = 0;
46 while (i < count)
47 if (q1.TryDequeue(out temp)) {
48 i++;
49 res2 += temp;
50 }
51 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
52 }
53 )
54 .Bundle()
55 .Join();
18 for (int i = 0; i < count; i++) {
19 var p = new Promise<int>();
20
21 p.On(x => {}).On(x => {});
56 22
57 Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
23 p.Resolve(i);
24
25 }
26
27
58 28
59 29 var t2 = Environment.TickCount;
60 Console.WriteLine("MTQueue: {0} ms", t2 - t1);
61
62 t1 = Environment.TickCount;
63
64 for (var i = 0; i < count * 2; i++)
65 q2.Enqueue(i);
66
67 for (var i = 0; i < count * 2; i++)
68 q2.Dequeue();
69
70 t2 = Environment.TickCount;
71 Console.WriteLine("Queue: {0} ms", t2 - t1);
72
73 q2 = new Queue<int>();
74
75 t1 = Environment.TickCount;
76
77
78 AsyncPool.RunThread(
79 () => {
80 for (var i = 0; i < count; i++)
81 lock (q2)
82 q2.Enqueue(i);
83 },
84 () => {
85 for (var i = 0; i < count; i++)
86 lock (q2)
87 q2.Enqueue(i);
88 },
89 () => {
90 for (int i = 0; i < count ;)
91 lock (q2) {
92 if (q2.Count == 0)
93 continue;
94 q2.Dequeue();
95 i++;
96 }
97
98 },
99 () => {
100 for (int i = 0; i < count ;)
101 lock (q2) {
102 if (q2.Count == 0)
103 continue;
104 q2.Dequeue();
105 i++;
106 }
107
108 }
109 )
110 .Bundle()
111 .Join();
112
113
114
115 t2 = Environment.TickCount;
116 Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
30 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
117 31
118 32 }
119 33 }
General Comments 0
You need to be logged in to leave comments. Login now