| @@ -299,47 +299,134 namespace Implab.Test { | |||||
| 299 | Assert.AreEqual(i, res); |  | 299 | Assert.AreEqual(i, res); | |
| 300 | } |  | 300 | } | |
| 301 |  | 301 | |||
| 302 | int |  | 302 | const int count = 10000000; | |
| 303 | int readers = 0; |  | |||
| 304 | var stop = new ManualResetEvent(false); |  | |||
| 305 | int total = 0; |  | |||
| 306 |  | 303 | |||
| 307 |  |  | 304 | int res1 = 0, res2 = 0; | |
| 308 | const int writersCount = 10; |  | 305 | var t1 = Environment.TickCount; | |
| 309 |  | 306 | |||
| 310 | for (int i = 0; i < writersCount; i++) { |  | 307 | AsyncPool.RunThread( | |
| 311 | Interlocked.Increment(ref writers); |  | 308 | () => { | |
| 312 | AsyncPool |  | 309 | for (var i = 0; i < count; i++) | |
| 313 | . |  | 310 | queue.Enqueue(1); | |
| 314 | for (int ii = 0; ii < itemsPerWriter; ii++) { |  | 311 | Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |
| 315 | queue.Enqueue(1); |  | 312 | }, | |
|  | 313 | () => { | |||
|  | 314 | for (var i = 0; i < count; i++) | |||
|  | 315 | queue.Enqueue(2); | |||
|  | 316 | Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |||
|  | 317 | }, | |||
|  | 318 | () => { | |||
|  | 319 | int temp; | |||
|  | 320 | int i = 0; | |||
|  | 321 | while (i < count) | |||
|  | 322 | if (queue.TryDequeue(out temp)) { | |||
|  | 323 | i++; | |||
|  | 324 | res1 += temp; | |||
| 316 | } |  | 325 | } | |
| 317 | return 1; |  | 326 | Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); | |
| 318 |  |  | 327 | }, | |
| 319 | .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); |  | 328 | () => { | |
| 320 | } |  | 329 | int temp; | |
|  | 330 | int i = 0; | |||
|  | 331 | while (i < count) | |||
|  | 332 | if (queue.TryDequeue(out temp)) { | |||
|  | 333 | i++; | |||
|  | 334 | res2 += temp; | |||
|  | 335 | } | |||
|  | 336 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | |||
|  | 337 | } | |||
|  | 338 | ) | |||
|  | 339 | .Combine() | |||
|  | 340 | .Join(); | |||
|  | 341 | ||||
|  | 342 | Assert.AreEqual(count * 3, res1 + res2); | |||
|  | 343 | ||||
|  | 344 | Console.WriteLine( | |||
|  | 345 | "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | |||
|  | 346 | Environment.TickCount - t1, | |||
|  | 347 | res1, | |||
|  | 348 | res2, | |||
|  | 349 | res1 + res2, | |||
|  | 350 | count | |||
|  | 351 | ); | |||
|  | 352 | } | |||
|  | 353 | ||||
|  | 354 | [TestMethod] | |||
|  | 355 | public void AsyncQueueBatchTest() { | |||
|  | 356 | var queue = new AsyncQueue<int>(); | |||
|  | 357 | ||||
|  | 358 | const int wBatch = 29; | |||
|  | 359 | const int wCount = 400000; | |||
|  | 360 | const int total = wBatch * wCount * 2; | |||
|  | 361 | const int summ = wBatch * wCount * 3; | |||
| 321 |  | 362 | |||
| 322 | for (int i = 0; i < 10; i++) { |  | 363 | int r1 = 0, r2 = 0; | |
| 323 | Interlocked.Increment(ref readers); |  | 364 | const int rBatch = 111; | |
| 324 |  |  | 365 | int read = 0; | |
| 325 | .InvokeNewThread(() => { |  | 366 | ||
| 326 | int t; |  | 367 | var t1 = Environment.TickCount; | |
| 327 | do { |  | 368 | ||
| 328 | while (queue.TryDequeue(out t)) |  | 369 | AsyncPool.RunThread( | |
| 329 | Interlocked.Add(ref total, t); |  | 370 | () => { | |
| 330 | } while (writers > 0); |  | 371 | var buffer = new int[wBatch]; | |
| 331 |  |  | 372 | for(int i = 0; i<wBatch; i++) | |
| 332 |  |  | 373 | buffer[i] = 1; | |
| 333 | .On(() => { |  | 374 | ||
| 334 | Interlocked.Decrement(ref readers); |  | 375 | for(int i =0; i < wCount; i++) | |
| 335 | if (readers == 0) |  | 376 | queue.EnqueueRange(buffer,0,wBatch); | |
| 336 | stop.Set(); |  | 377 | Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |
| 337 | }, PromiseEventType.All); |  | 378 | }, | |
| 338 |  |  | 379 | () => { | |
|  | 380 | var buffer = new int[wBatch]; | |||
|  | 381 | for(int i = 0; i<wBatch; i++) | |||
|  | 382 | buffer[i] = 2; | |||
|  | 383 | ||||
|  | 384 | for(int i =0; i < wCount; i++) | |||
|  | 385 | queue.EnqueueRange(buffer,0,wBatch); | |||
|  | 386 | Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |||
|  | 387 | }, | |||
|  | 388 | () => { | |||
|  | 389 | var buffer = new int[rBatch]; | |||
| 339 |  | 390 | |||
| 340 | stop.WaitOne(); |  | 391 | while(read < total) { | |
|  | 392 | int actual; | |||
|  | 393 | if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | |||
|  | 394 | for(int i=0; i< actual; i++) | |||
|  | 395 | r1 += buffer[i]; | |||
|  | 396 | Interlocked.Add(ref read, actual); | |||
|  | 397 | } | |||
|  | 398 | } | |||
|  | 399 | ||||
|  | 400 | Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); | |||
|  | 401 | }, | |||
|  | 402 | () => { | |||
|  | 403 | var buffer = new int[rBatch]; | |||
| 341 |  | 404 | |||
| 342 | Assert.AreEqual(itemsPerWriter * writersCount, total); |  | 405 | while(read < total) { | |
|  | 406 | int actual; | |||
|  | 407 | if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | |||
|  | 408 | for(int i=0; i< actual; i++) | |||
|  | 409 | r2 += buffer[i]; | |||
|  | 410 | Interlocked.Add(ref read, actual); | |||
|  | 411 | } | |||
|  | 412 | } | |||
|  | 413 | ||||
|  | 414 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | |||
|  | 415 | } | |||
|  | 416 | ) | |||
|  | 417 | .Combine() | |||
|  | 418 | .Join(); | |||
|  | 419 | ||||
|  | 420 | Assert.AreEqual(summ , r1 + r2); | |||
|  | 421 | ||||
|  | 422 | Console.WriteLine( | |||
|  | 423 | "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | |||
|  | 424 | Environment.TickCount - t1, | |||
|  | 425 | r1, | |||
|  | 426 | r2, | |||
|  | 427 | r1 + r2, | |||
|  | 428 | total | |||
|  | 429 | ); | |||
| 343 | } |  | 430 | } | |
| 344 |  | 431 | |||
| 345 | [TestMethod] |  | 432 | [TestMethod] | |
| @@ -75,11 +75,11 namespace Implab.Parallels { | |||||
| 75 | return p; |  | 75 | return p; | |
| 76 | } |  | 76 | } | |
| 77 |  | 77 | |||
| 78 | public static IPromise[] Thread |  | 78 | public static IPromise[] RunThread(params Action[] func) { | |
| 79 | return func.Select(f => InvokeNewThread(f)).ToArray(); |  | 79 | return func.Select(f => InvokeNewThread(f)).ToArray(); | |
| 80 | } |  | 80 | } | |
| 81 |  | 81 | |||
| 82 | public static IPromise<T>[] Thread |  | 82 | public static IPromise<T>[] RunThread<T>(params Func<T>[] func) { | |
| 83 | return func.Select(f => InvokeNewThread(f)).ToArray(); |  | 83 | return func.Select(f => InvokeNewThread(f)).ToArray(); | |
| 84 | } |  | 84 | } | |
| 85 | } |  | 85 | } | |
| @@ -27,6 +27,14 namespace Implab.Parallels { | |||||
| 27 | m_data[0] = value; |  | 27 | m_data[0] = value; | |
| 28 | } |  | 28 | } | |
| 29 |  | 29 | |||
|  | 30 | public Chunk(int size, T[] data, int offset, int length, int alloc) { | |||
|  | 31 | m_size = size; | |||
|  | 32 | m_hi = length; | |||
|  | 33 | m_alloc = alloc; | |||
|  | 34 | m_data = new T[size]; | |||
|  | 35 | Array.Copy(data, offset, m_data, 0, length); | |||
|  | 36 | } | |||
|  | 37 | ||||
| 30 | public int Low { |  | 38 | public int Low { | |
| 31 | get { return m_low; } |  | 39 | get { return m_low; } | |
| 32 | } |  | 40 | } | |
| @@ -35,7 +43,7 namespace Implab.Parallels { | |||||
| 35 | get { return m_hi; } |  | 43 | get { return m_hi; } | |
| 36 | } |  | 44 | } | |
| 37 |  | 45 | |||
| 38 | public bool TryEnqueue(T value,out bool extend) { |  | 46 | public bool TryEnqueue(T value, out bool extend) { | |
| 39 | var alloc = Interlocked.Increment(ref m_alloc) - 1; |  | 47 | var alloc = Interlocked.Increment(ref m_alloc) - 1; | |
| 40 |  | 48 | |||
| 41 | if (alloc >= m_size) { |  | 49 | if (alloc >= m_size) { | |
| @@ -52,7 +60,7 namespace Implab.Parallels { | |||||
| 52 | return true; |  | 60 | return true; | |
| 53 | } |  | 61 | } | |
| 54 |  | 62 | |||
| 55 | public bool TryDequeue(out T value,out bool recycle) { |  | 63 | public bool TryDequeue(out T value, out bool recycle) { | |
| 56 | int low; |  | 64 | int low; | |
| 57 | do { |  | 65 | do { | |
| 58 | low = m_low; |  | 66 | low = m_low; | |
| @@ -73,27 +81,35 namespace Implab.Parallels { | |||||
| 73 | int alloc; |  | 81 | int alloc; | |
| 74 | int allocSize; |  | 82 | int allocSize; | |
| 75 |  | 83 | |||
|  | 84 | // in case the batch size is larger than a free space in chunk | |||
|  | 85 | // tailGap is used to over allocate the space in the chunk to | |||
|  | 86 | // get exclusive permission on creation of the next one. | |||
|  | 87 | int tailGap = 0; | |||
|  | 88 | ||||
| 76 | do { |  | 89 | do { | |
| 77 | alloc = m_alloc; |  | 90 | alloc = m_alloc; | |
| 78 |  | 91 | |||
| 79 | if (alloc > m_size) { |  | 92 | if (alloc > m_size) { | |
| 80 | enqueued = 0; |  | 93 | // the chunk is full and someone already | |
| 81 |  |  | 94 | // creating the new one | |
| 82 |  |  | 95 | enqueued = 0; // nothing was added | |
|  | 96 | extend = false; // the caller shouldn't try to extend the queue | |||
|  | 97 | return false; // nothing was added | |||
| 83 | } |  | 98 | } | |
| 84 |  | 99 | |||
| 85 | allocSize = Math.Min(m_size - |  | 100 | allocSize = Math.Min(m_size - alloc, length); | |
| 86 | } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc)); |  | 101 | if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch | |
|  | 102 | tailGap = 1; // overallocate space to get exclusive permission to extend queue | |||
|  | 103 | } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); | |||
|  | 104 | ||||
|  | 105 | extend = tailGap != 0; | |||
|  | 106 | enqueued = allocSize; | |||
| 87 |  | 107 | |||
| 88 | if (alloc == m_size) { |  | 108 | // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 | |
| 89 | enqueued = 0; |  | 109 | if (alloc == m_size) | |
| 90 | extend = true; |  | |||
| 91 | return false; |  | 110 | return false; | |
| 92 | } |  | |||
| 93 |  | 111 | |||
| 94 | Array.Copy(batch, offset, m_data, alloc, allocSize); |  | 112 | Array.Copy(batch, offset, m_data, alloc, allocSize); | |
| 95 | enqueued = allocSize; |  | |||
| 96 | extend = false; |  | |||
| 97 |  | 113 | |||
| 98 | while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { |  | 114 | while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { | |
| 99 | // spin wait for commit |  | 115 | // spin wait for commit | |
| @@ -101,12 +117,35 namespace Implab.Parallels { | |||||
| 101 | return true; |  | 117 | return true; | |
| 102 | } |  | 118 | } | |
| 103 |  | 119 | |||
|  | 120 | public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { | |||
|  | 121 | int low, hi, batchSize; | |||
|  | 122 | ||||
|  | 123 | do { | |||
|  | 124 | low = m_low; | |||
|  | 125 | hi = m_hi; | |||
|  | 126 | if (low >= hi) { | |||
|  | 127 | dequeued = 0; | |||
|  | 128 | recycle = (low == m_size); // recycling could be restarted and we need to signal again | |||
|  | 129 | return false; | |||
|  | 130 | } | |||
|  | 131 | batchSize = Math.Min(hi - low, length); | |||
|  | 132 | } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); | |||
|  | 133 | ||||
|  | 134 | recycle = (low == m_size - batchSize); | |||
|  | 135 | dequeued = batchSize; | |||
|  | 136 | ||||
|  | 137 | Array.Copy(m_data, low, buffer, offset, batchSize); | |||
|  | 138 | ||||
|  | 139 | return true; | |||
|  | 140 | } | |||
|  | 141 | ||||
| 104 | public T GetAt(int pos) { |  | 142 | public T GetAt(int pos) { | |
| 105 | return m_data[pos]; |  | 143 | return m_data[pos]; | |
| 106 | } |  | 144 | } | |
| 107 | } |  | 145 | } | |
| 108 |  | 146 | |||
| 109 | public const int DEFAULT_CHUNK_SIZE = 32; |  | 147 | public const int DEFAULT_CHUNK_SIZE = 32; | |
|  | 148 | public const int MAX_CHUNK_SIZE = 262144; | |||
| 110 |  | 149 | |||
| 111 | readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; |  | 150 | readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; | |
| 112 |  | 151 | |||
| @@ -117,11 +156,15 namespace Implab.Parallels { | |||||
| 117 | m_last = m_first = new Chunk(m_chunkSize); |  | 156 | m_last = m_first = new Chunk(m_chunkSize); | |
| 118 | } |  | 157 | } | |
| 119 |  | 158 | |||
|  | 159 | /// <summary> | |||
|  | 160 | /// Adds the specified value to the queue. | |||
|  | 161 | /// </summary> | |||
|  | 162 | /// <param name="value">Tha value which will be added to the queue.</param> | |||
| 120 | public void Enqueue(T value) { |  | 163 | public void Enqueue(T value) { | |
| 121 | var last = m_last; |  | 164 | var last = m_last; | |
| 122 | // spin wait to the new chunk |  | 165 | // spin wait to the new chunk | |
| 123 | bool extend = true; |  | 166 | bool extend = true; | |
| 124 | while(last == null || !last.TryEnqueue(value, out extend)) { |  | 167 | while (last == null || !last.TryEnqueue(value, out extend)) { | |
| 125 | // try to extend queue |  | 168 | // try to extend queue | |
| 126 | if (extend || last == null) { |  | 169 | if (extend || last == null) { | |
| 127 | var chunk = new Chunk(m_chunkSize, value); |  | 170 | var chunk = new Chunk(m_chunkSize, value); | |
| @@ -129,14 +172,88 namespace Implab.Parallels { | |||||
| 129 | break; |  | 172 | break; | |
| 130 | last = m_last; |  | 173 | last = m_last; | |
| 131 | } else { |  | 174 | } else { | |
| 132 | while (last |  | 175 | while (last == m_last) { | |
| 133 | Thread.MemoryBarrier(); |  | 176 | Thread.MemoryBarrier(); | |
| 134 | last = m_last; |  | |||
| 135 | } |  | 177 | } | |
|  | 178 | last = m_last; | |||
| 136 | } |  | 179 | } | |
| 137 | } |  | 180 | } | |
| 138 | } |  | 181 | } | |
| 139 |  | 182 | |||
|  | 183 | /// <summary> | |||
|  | 184 | /// Adds the specified data to the queue. | |||
|  | 185 | /// </summary> | |||
|  | 186 | /// <param name="data">The buffer which contains the data to be enqueued.</param> | |||
|  | 187 | /// <param name="offset">The offset of the data in the buffer.</param> | |||
|  | 188 | /// <param name="length">The size of the data to read from the buffer.</param> | |||
|  | 189 | public void EnqueueRange(T[] data, int offset, int length) { | |||
|  | 190 | if (data == null) | |||
|  | 191 | throw new ArgumentNullException("data"); | |||
|  | 192 | if (offset < 0) | |||
|  | 193 | throw new ArgumentOutOfRangeException("offset"); | |||
|  | 194 | if (length < 1 || offset + length > data.Length) | |||
|  | 195 | throw new ArgumentOutOfRangeException("length"); | |||
|  | 196 | ||||
|  | 197 | var last = m_last; | |||
|  | 198 | ||||
|  | 199 | bool extend; | |||
|  | 200 | int enqueued; | |||
|  | 201 | ||||
|  | 202 | while (length > 0) { | |||
|  | 203 | extend = true; | |||
|  | 204 | if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) { | |||
|  | 205 | length -= enqueued; | |||
|  | 206 | offset += enqueued; | |||
|  | 207 | } | |||
|  | 208 | ||||
|  | 209 | if (extend) { | |||
|  | 210 | // there was no enough space in the chunk | |||
|  | 211 | // or there was no chunks in the queue | |||
|  | 212 | ||||
|  | 213 | while (length > 0) { | |||
|  | 214 | ||||
|  | 215 | var size = Math.Min(length, MAX_CHUNK_SIZE); | |||
|  | 216 | ||||
|  | 217 | var chunk = new Chunk( | |||
|  | 218 | Math.Max(size, m_chunkSize), | |||
|  | 219 | data, | |||
|  | 220 | offset, | |||
|  | 221 | size, | |||
|  | 222 | length // length >= size | |||
|  | 223 | ); | |||
|  | 224 | ||||
|  | 225 | if (!EnqueueChunk(last, chunk)) { | |||
|  | 226 | // looks like the queue has been updated then proceed from the beginning | |||
|  | 227 | last = m_last; | |||
|  | 228 | break; | |||
|  | 229 | } | |||
|  | 230 | ||||
|  | 231 | // we have successfully added the new chunk | |||
|  | 232 | last = chunk; | |||
|  | 233 | length -= size; | |||
|  | 234 | offset += size; | |||
|  | 235 | } | |||
|  | 236 | } else { | |||
|  | 237 | // we don't need to extend the queue, if we successfully enqueued data | |||
|  | 238 | if (length == 0) | |||
|  | 239 | break; | |||
|  | 240 | ||||
|  | 241 | // if we need to wait while someone is extending the queue | |||
|  | 242 | // spinwait | |||
|  | 243 | while (last == m_last) { | |||
|  | 244 | Thread.MemoryBarrier(); | |||
|  | 245 | } | |||
|  | 246 | ||||
|  | 247 | last = m_last; | |||
|  | 248 | } | |||
|  | 249 | } | |||
|  | 250 | } | |||
|  | 251 | ||||
|  | 252 | /// <summary> | |||
|  | 253 | /// Tries to retrieve the first element from the queue. | |||
|  | 254 | /// </summary> | |||
|  | 255 | /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns> | |||
|  | 256 | /// <param name="value">The value of the dequeued element.</param> | |||
| 140 | public bool TryDequeue(out T value) { |  | 257 | public bool TryDequeue(out T value) { | |
| 141 | var chunk = m_first; |  | 258 | var chunk = m_first; | |
| 142 | bool recycle; |  | 259 | bool recycle; | |
| @@ -161,6 +278,92 namespace Implab.Parallels { | |||||
| 161 | return false; |  | 278 | return false; | |
| 162 | } |  | 279 | } | |
| 163 |  | 280 | |||
|  | 281 | /// <summary> | |||
|  | 282 | /// Tries to dequeue the specified amount of data from the queue. | |||
|  | 283 | /// </summary> | |||
|  | 284 | /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns> | |||
|  | 285 | /// <param name="buffer">The buffer to which the data will be written.</param> | |||
|  | 286 | /// <param name="offset">The offset in the buffer at which the data will be written.</param> | |||
|  | 287 | /// <param name="length">The maximum amount of data to be retrieved.</param> | |||
|  | 288 | /// <param name="dequeued">The actual amout of the retrieved data.</param> | |||
|  | 289 | public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) { | |||
|  | 290 | if (buffer == null) | |||
|  | 291 | throw new ArgumentNullException("buffer"); | |||
|  | 292 | if (offset < 0) | |||
|  | 293 | throw new ArgumentOutOfRangeException("offset"); | |||
|  | 294 | if (length < 1 || offset + length > buffer.Length) | |||
|  | 295 | throw new ArgumentOutOfRangeException("length"); | |||
|  | 296 | ||||
|  | 297 | var chunk = m_first; | |||
|  | 298 | bool recycle; | |||
|  | 299 | dequeued = 0; | |||
|  | 300 | while (chunk != null) { | |||
|  | 301 | ||||
|  | 302 | int actual; | |||
|  | 303 | if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { | |||
|  | 304 | offset += actual; | |||
|  | 305 | length -= actual; | |||
|  | 306 | dequeued += actual; | |||
|  | 307 | } | |||
|  | 308 | ||||
|  | 309 | if (recycle) // this chunk is waste | |||
|  | 310 | RecycleFirstChunk(chunk); | |||
|  | 311 | else if (actual == 0) | |||
|  | 312 | break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue) | |||
|  | 313 | ||||
|  | 314 | if (length == 0) | |||
|  | 315 | return true; | |||
|  | 316 | ||||
|  | 317 | // we still may dequeue something | |||
|  | 318 | // try again | |||
|  | 319 | chunk = m_first; | |||
|  | 320 | } | |||
|  | 321 | ||||
|  | 322 | return dequeued != 0; | |||
|  | 323 | } | |||
|  | 324 | ||||
|  | 325 | /// <summary> | |||
|  | 326 | /// Tries to dequeue all remaining data in the first chunk. | |||
|  | 327 | /// </summary> | |||
|  | 328 | /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> | |||
|  | 329 | /// <param name="buffer">The buffer to which data will be written.</param> | |||
|  | 330 | /// <param name="offset">The offset in the buffer at which the data will be written.</param> | |||
|  | 331 | /// <param name="length">Tha maximum amount of the data to be dequeued.</param> | |||
|  | 332 | /// <param name="dequeued">The actual amount of the dequeued data.</param> | |||
|  | 333 | public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { | |||
|  | 334 | if (buffer == null) | |||
|  | 335 | throw new ArgumentNullException("buffer"); | |||
|  | 336 | if (offset < 0) | |||
|  | 337 | throw new ArgumentOutOfRangeException("offset"); | |||
|  | 338 | if (length < 1 || offset + length > buffer.Length) | |||
|  | 339 | throw new ArgumentOutOfRangeException("length"); | |||
|  | 340 | ||||
|  | 341 | var chunk = m_first; | |||
|  | 342 | bool recycle; | |||
|  | 343 | dequeued = 0; | |||
|  | 344 | ||||
|  | 345 | while (chunk != null) { | |||
|  | 346 | ||||
|  | 347 | int actual; | |||
|  | 348 | if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { | |||
|  | 349 | dequeued = actual; | |||
|  | 350 | } | |||
|  | 351 | ||||
|  | 352 | if (recycle) // this chunk is waste | |||
|  | 353 | RecycleFirstChunk(chunk); | |||
|  | 354 | ||||
|  | 355 | // if we have dequeued any data, then return | |||
|  | 356 | if (dequeued != 0) | |||
|  | 357 | return true; | |||
|  | 358 | ||||
|  | 359 | // we still may dequeue something | |||
|  | 360 | // try again | |||
|  | 361 | chunk = m_first; | |||
|  | 362 | } | |||
|  | 363 | ||||
|  | 364 | return false; | |||
|  | 365 | } | |||
|  | 366 | ||||
| 164 | bool EnqueueChunk(Chunk last, Chunk chunk) { |  | 367 | bool EnqueueChunk(Chunk last, Chunk chunk) { | |
| 165 | if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) |  | 368 | if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) | |
| 166 | return false; |  | 369 | return false; | |
| @@ -9,31 +9,31 namespace Implab | |||||
| 9 | { |  | 9 | { | |
| 10 | public static class Safe |  | 10 | public static class Safe | |
| 11 | { |  | 11 | { | |
| 12 | public static void ArgumentMatch(string |  | 12 | public static void ArgumentMatch(string value, string paramName, Regex rx) { | |
| 13 | if (rx == null) |  | 13 | if (rx == null) | |
| 14 | throw new ArgumentNullException("rx"); |  | 14 | throw new ArgumentNullException("rx"); | |
| 15 | if (!rx.IsMatch( |  | 15 | if (!rx.IsMatch(value)) | |
| 16 | throw new ArgumentException(String.Format("The prameter value must match {0}", rx), |  | 16 | throw new ArgumentException(String.Format("The prameter value must match {0}", rx), paramName); | |
| 17 | } |  | 17 | } | |
| 18 |  | 18 | |||
| 19 | public static void ArgumentNotEmpty(string |  | 19 | public static void ArgumentNotEmpty(string value, string paramName) { | |
| 20 | if (String.IsNullOrEmpty( |  | 20 | if (String.IsNullOrEmpty(value)) | |
| 21 | throw new ArgumentException("The parameter can't be empty", |  | 21 | throw new ArgumentException("The parameter can't be empty", paramName); | |
| 22 | } |  | 22 | } | |
| 23 |  | 23 | |||
| 24 | public static void ArgumentNotEmpty<T>(T[] |  | 24 | public static void ArgumentNotEmpty<T>(T[] value, string paramName) { | |
| 25 | if ( |  | 25 | if (value == null || value.Length == 0) | |
| 26 | throw new ArgumentException("The array must be not emty", |  | 26 | throw new ArgumentException("The array must be not emty", paramName); | |
| 27 | } |  | 27 | } | |
| 28 |  | 28 | |||
| 29 | public static void ArgumentNotNull(object |  | 29 | public static void ArgumentNotNull(object value, string paramName) { | |
| 30 | if ( |  | 30 | if (value == null) | |
| 31 | throw new ArgumentNullException( |  | 31 | throw new ArgumentNullException(paramName); | |
| 32 | } |  | 32 | } | |
| 33 |  | 33 | |||
| 34 | public static void ArgumentInRange(int |  | 34 | public static void ArgumentInRange(int value, int min, int max, string paramName) { | |
| 35 | if ( |  | 35 | if (value < min || value > max) | |
| 36 | throw new ArgumentOutOfRangeException( |  | 36 | throw new ArgumentOutOfRangeException(paramName); | |
| 37 | } |  | 37 | } | |
| 38 |  | 38 | |||
| 39 | public static void Dispose<T>(T obj) where T : class |  | 39 | public static void Dispose<T>(T obj) where T : class | |
| @@ -16,36 +16,46 namespace MonoPlay { | |||||
| 16 |  | 16 | |||
| 17 | const int count = 10000000; |  | 17 | const int count = 10000000; | |
| 18 |  | 18 | |||
| 19 |  | 19 | int res1 = 0, res2 = 0; | ||
| 20 | var t1 = Environment.TickCount; |  | 20 | var t1 = Environment.TickCount; | |
| 21 |  | 21 | |||
| 22 | AsyncPool.Thread |  | 22 | AsyncPool.RunThread( | |
| 23 | () => { |  | 23 | () => { | |
| 24 | for (var i = 0; i < count; i++) |  | 24 | for (var i = 0; i < count; i++) | |
| 25 | q1.Enqueue( |  | 25 | q1.Enqueue(1); | |
|  | 26 | Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |||
| 26 | }, |  | 27 | }, | |
| 27 | () => { |  | 28 | () => { | |
| 28 | for (var i = 0; i < count; i++) |  | 29 | for (var i = 0; i < count; i++) | |
| 29 | q1.Enqueue( |  | 30 | q1.Enqueue(2); | |
|  | 31 | Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |||
| 30 | }, |  | 32 | }, | |
| 31 | () => { |  | 33 | () => { | |
| 32 | int temp = 0; |  | 34 | int temp = 0; | |
| 33 | int i = 0; |  | 35 | int i = 0; | |
| 34 | while (i < count) |  | 36 | while (i < count) | |
| 35 | if (q1.TryDequeue(out temp)) |  | 37 | if (q1.TryDequeue(out temp)) { | |
| 36 | i++; |  | 38 | i++; | |
|  | 39 | res1 += temp; | |||
|  | 40 | } | |||
|  | 41 | Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); | |||
| 37 | }, |  | 42 | }, | |
| 38 | () => { |  | 43 | () => { | |
| 39 | int temp = 0; |  | 44 | int temp = 0; | |
| 40 | int i = 0; |  | 45 | int i = 0; | |
| 41 | while (i < count) |  | 46 | while (i < count) | |
| 42 | if (q1.TryDequeue(out temp)) |  | 47 | if (q1.TryDequeue(out temp)) { | |
| 43 | i++; |  | 48 | i++; | |
|  | 49 | res2 += temp; | |||
|  | 50 | } | |||
|  | 51 | Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | |||
| 44 | } |  | 52 | } | |
| 45 | ) |  | 53 | ) | |
| 46 | .Combine() |  | 54 | .Combine() | |
| 47 | .Join(); |  | 55 | .Join(); | |
| 48 |  | 56 | |||
|  | 57 | Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2); | |||
|  | 58 | ||||
| 49 | var t2 = Environment.TickCount; |  | 59 | var t2 = Environment.TickCount; | |
| 50 | Console.WriteLine("MTQueue: {0} ms", t2 - t1); |  | 60 | Console.WriteLine("MTQueue: {0} ms", t2 - t1); | |
| 51 |  | 61 | |||
| @@ -65,7 +75,7 namespace MonoPlay { | |||||
| 65 | t1 = Environment.TickCount; |  | 75 | t1 = Environment.TickCount; | |
| 66 |  | 76 | |||
| 67 |  | 77 | |||
| 68 | AsyncPool.Thread |  | 78 | AsyncPool.RunThread( | |
| 69 | () => { |  | 79 | () => { | |
| 70 | for (var i = 0; i < count; i++) |  | 80 | for (var i = 0; i < count; i++) | |
| 71 | lock (q2) |  | 81 | lock (q2) | |
        
        General Comments 0
    
    
  
  
                      You need to be logged in to leave comments.
                      Login now
                    
                