##// END OF EJS Templates
improved asyncpool usability...
cin -
r120:f1b897999260 v2
parent child
Show More
@@ -1,6 +1,7
1 1 using Implab.Diagnostics;
2 2 using System;
3 3 using System.Threading;
4 using System.Linq;
4 5
5 6 namespace Implab.Parallels {
6 7 /// <summary>
@@ -73,5 +74,13 namespace Implab.Parallels {
73 74
74 75 return p;
75 76 }
77
78 public static IPromise[] ThreadRun(params Action[] func) {
79 return func.Select(f => InvokeNewThread(f)).ToArray();
80 }
81
82 public static IPromise<T>[] ThreadRun<T>(params Func<T>[] func) {
83 return func.Select(f => InvokeNewThread(f)).ToArray();
84 }
76 85 }
77 86 }
@@ -36,19 +36,14 namespace Implab.Parallels {
36 36 }
37 37
38 38 public bool TryEnqueue(T value,out bool extend) {
39 extend = false;
40 int alloc;
41 do {
42 alloc = m_alloc;
43 if (alloc > m_size)
44 return false;
45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
39 var alloc = Interlocked.Increment(ref m_alloc) - 1;
46 40
47 if (alloc == m_size) {
48 extend = true;
41 if (alloc >= m_size) {
42 extend = alloc == m_size;
49 43 return false;
50 44 }
51
45
46 extend = false;
52 47 m_data[alloc] = value;
53 48
54 49 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
@@ -74,6 +69,38 namespace Implab.Parallels {
74 69 return true;
75 70 }
76 71
72 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
73 int alloc;
74 int allocSize;
75
76 do {
77 alloc = m_alloc;
78
79 if (alloc > m_size) {
80 enqueued = 0;
81 extend = false;
82 return false;
83 }
84
85 allocSize = Math.Min(m_size - m_alloc, length);
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
87
88 if (alloc == m_size) {
89 enqueued = 0;
90 extend = true;
91 return false;
92 }
93
94 Array.Copy(batch, offset, m_data, alloc, allocSize);
95 enqueued = allocSize;
96 extend = false;
97
98 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
99 // spin wait for commit
100 }
101 return true;
102 }
103
77 104 public T GetAt(int pos) {
78 105 return m_data[pos];
79 106 }
@@ -19,30 +19,30 namespace MonoPlay {
19 19
20 20 var t1 = Environment.TickCount;
21 21
22 new [] {
23 AsyncPool.InvokeNewThread(() => {
22 AsyncPool.ThreadRun(
23 () => {
24 24 for (var i = 0; i < count; i++)
25 25 q1.Enqueue(i);
26 }),
27 AsyncPool.InvokeNewThread(() => {
26 },
27 () => {
28 28 for (var i = 0; i < count; i++)
29 29 q1.Enqueue(i);
30 }),
31 AsyncPool.InvokeNewThread(() => {
30 },
31 () => {
32 32 int temp = 0;
33 33 int i = 0;
34 34 while (i < count)
35 35 if (q1.TryDequeue(out temp))
36 36 i++;
37 }),
38 AsyncPool.InvokeNewThread(() => {
37 },
38 () => {
39 39 int temp = 0;
40 40 int i = 0;
41 41 while (i < count)
42 42 if (q1.TryDequeue(out temp))
43 43 i++;
44 })
45 }
44 }
45 )
46 46 .Combine()
47 47 .Join();
48 48
@@ -65,18 +65,18 namespace MonoPlay {
65 65 t1 = Environment.TickCount;
66 66
67 67
68 new [] {
69 AsyncPool.InvokeNewThread(() => {
68 AsyncPool.ThreadRun(
69 () => {
70 70 for (var i = 0; i < count; i++)
71 71 lock (q2)
72 72 q2.Enqueue(i);
73 }),
74 AsyncPool.InvokeNewThread(() => {
73 },
74 () => {
75 75 for (var i = 0; i < count; i++)
76 76 lock (q2)
77 77 q2.Enqueue(i);
78 }),
79 AsyncPool.InvokeNewThread(() => {
78 },
79 () => {
80 80 for (int i = 0; i < count ;)
81 81 lock (q2) {
82 82 if (q2.Count == 0)
@@ -85,8 +85,8 namespace MonoPlay {
85 85 i++;
86 86 }
87 87
88 }),
89 AsyncPool.InvokeNewThread(() => {
88 },
89 () => {
90 90 for (int i = 0; i < count ;)
91 91 lock (q2) {
92 92 if (q2.Count == 0)
@@ -95,8 +95,8 namespace MonoPlay {
95 95 i++;
96 96 }
97 97
98 })
99 }
98 }
99 )
100 100 .Combine()
101 101 .Join();
102 102
General Comments 0
You need to be logged in to leave comments. Login now