using Implab.Formats.Json;
using Implab.Parallels;
using Implab.Xml;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Serialization;

namespace Implab.Playground {
    /// <summary>
    /// Playground benchmark: several writers push fixed-size batches of ones into a
    /// queue while a reader drains it in larger batches. Elapsed times and the sum of
    /// the consumed items are printed to the console.
    /// </summary>
    public class Program {

        /// <summary>
        /// Enqueues <paramref name="len"/> items of <paramref name="data"/>, starting at
        /// <paramref name="offset"/>, one item at a time.
        /// </summary>
        static void EnqueueRange<T>(ConcurrentQueue<T> q, T[] data, int offset, int len) {
            for (var i = offset; i < offset + len; i++) {
                q.Enqueue(data[i]);
            }
        }

        /// <summary>
        /// Dequeues up to <paramref name="len"/> items into <paramref name="buffer"/>
        /// starting at <paramref name="offset"/>.
        /// </summary>
        /// <param name="actual">Receives the number of items written to the buffer.</param>
        /// <returns><c>true</c> if at least one item was dequeued.</returns>
        static bool TryDequeueRange<T>(ConcurrentQueue<T> q, T[] buffer, int offset, int len, out int actual) {
            actual = 0;
            // Without this guard the "actual == len" exit below can never fire for
            // len <= 0, so the loop would drain the queue past the requested window.
            if (len <= 0) {
                return false;
            }
            T res;
            while (q.TryDequeue(out res)) {
                buffer[offset + actual] = res;
                actual++;
                if (actual == len) {
                    break;
                }
            }
            return actual != 0;
        }

        /// <summary>
        /// Enqueues <paramref name="len"/> items of <paramref name="data"/>, starting at
        /// <paramref name="offset"/>, one item at a time.
        /// </summary>
        static void EnqueueRange<T>(SimpleAsyncQueue<T> q, T[] data, int offset, int len) {
            for (var i = offset; i < offset + len; i++) {
                q.Enqueue(data[i]);
            }
        }

        /// <summary>
        /// Dequeues up to <paramref name="len"/> items into <paramref name="buffer"/>
        /// starting at <paramref name="offset"/>.
        /// </summary>
        /// <param name="actual">Receives the number of items written to the buffer.</param>
        /// <returns><c>true</c> if at least one item was dequeued.</returns>
        static bool TryDequeueRange<T>(SimpleAsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
            actual = 0;
            // See the ConcurrentQueue overload: len <= 0 must not enter the loop.
            if (len <= 0) {
                return false;
            }
            T res;
            while (q.TryDequeue(out res)) {
                buffer[offset + actual] = res;
                actual++;
                if (actual == len) {
                    break;
                }
            }
            return actual != 0;
        }

        /// <summary>
        /// Enqueues <paramref name="len"/> items of <paramref name="data"/>, starting at
        /// <paramref name="offset"/>, one item at a time.
        /// </summary>
        static void EnqueueRange<T>(AsyncQueue<T> q, T[] data, int offset, int len) {
            for (var i = offset; i < offset + len; i++) {
                q.Enqueue(data[i]);
            }
        }

        /// <summary>
        /// Dequeues up to <paramref name="len"/> items into <paramref name="buffer"/>
        /// starting at <paramref name="offset"/>.
        /// </summary>
        /// <param name="actual">Receives the number of items written to the buffer.</param>
        /// <returns><c>true</c> if at least one item was dequeued.</returns>
        static bool TryDequeueRange<T>(AsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
            actual = 0;
            // See the ConcurrentQueue overload: len <= 0 must not enter the loop.
            if (len <= 0) {
                return false;
            }
            T res;
            while (q.TryDequeue(out res)) {
                buffer[offset + actual] = res;
                actual++;
                if (actual == len) {
                    break;
                }
            }
            return actual != 0;
        }

        // Alternative AsyncQueue overloads that delegate to the queue's own range
        // operations; swap them with the two item-by-item overloads above to compare.
        /*static void EnqueueRange<T>(AsyncQueue<T> q, T[] data, int offset, int len) {
            q.EnqueueRange(data, offset, len);
        }

        static bool TryDequeueRange<T>(AsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
            return q.TryDequeueRange(buffer, offset, len, out actual);
        }*/

        /// <summary>
        /// Runs the benchmark: three writers each enqueue <c>wCount</c> batches of
        /// <c>wBatch</c> ones, and the reader(s) dequeue until <c>total</c> items have
        /// been consumed. The delegates are handed to <c>AsyncPool.RunThread</c> and the
        /// combined promise is joined before the summary is printed.
        /// </summary>
        static void Main(string[] args) {
            // Queue implementation under test; activate exactly one of these lines.
            //var queue = new ConcurrentQueue<int>();
            var queue = new AsyncQueue<int>();
            //var queue = new SimpleAsyncQueue<int>();

            const int writerCount = 3;
            const int wBatch = 32;
            const long wCount = 1000000;
            const long total = wBatch * wCount * writerCount;
            const int rBatch = 1000;

            // One slot per potential reader (#1..#3); slots of disabled readers stay 0.
            var sums = new long[3];
            long read = 0;

            // Stopwatch instead of Environment.TickCount: finer resolution, no wrap-around.
            var timer = System.Diagnostics.Stopwatch.StartNew();

            // Builds the body of writer #id: fills a batch with ones and enqueues it wCount times.
            Func<int, Action> writer = id => () => {
                var buffer = new int[wBatch];
                for (int i = 0; i < wBatch; i++) {
                    buffer[i] = 1;
                }
                for (long i = 0; i < wCount; i++) {
                    EnqueueRange(queue, buffer, 0, wBatch);
                }
                Console.WriteLine("done writer #{0}: {1} ms", id, timer.ElapsedMilliseconds);
            };

            // Builds the body of reader #id: spins on the queue until all readers together
            // have consumed `total` items, summing what this reader dequeued.
            Func<int, Action> reader = id => () => {
                var buffer = new int[rBatch];
                long sum = 0;
                // `read` is updated with Interlocked.Add, so it is polled with
                // Interlocked.Read to keep the 64-bit read atomic on 32-bit runtimes.
                while (Interlocked.Read(ref read) < total) {
                    int actual;
                    if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
                        for (int i = 0; i < actual; i++) {
                            sum += buffer[i];
                        }
                        Interlocked.Add(ref read, actual);
                    }
                }
                sums[id - 1] = sum;
                Console.WriteLine("done reader #{0}: {1} ms", id, timer.ElapsedMilliseconds);
            };

            AsyncPool.RunThread(
                writer(1),
                writer(2),
                writer(3),
                reader(1)
                // Uncomment to benchmark with additional readers:
                /*, reader(2)*/
                /*, reader(3)*/
            )
                .PromiseAll()
                .Join();

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                timer.ElapsedMilliseconds,
                sums[0],
                sums[1],
                sums[0] + sums[1] + sums[2],
                total
            );

            Console.WriteLine("done");
        }
    }
}