##// END OF EJS Templates
improved asyncpool usability...
cin -
r120:f1b897999260 v2
parent child
Show More
@@ -1,6 +1,7
1 using Implab.Diagnostics;
1 using Implab.Diagnostics;
2 using System;
2 using System;
3 using System.Threading;
3 using System.Threading;
4 using System.Linq;
4
5
5 namespace Implab.Parallels {
6 namespace Implab.Parallels {
6 /// <summary>
7 /// <summary>
@@ -73,5 +74,13 namespace Implab.Parallels {
73
74
74 return p;
75 return p;
75 }
76 }
77
78 public static IPromise[] ThreadRun(params Action[] func) {
79 return func.Select(f => InvokeNewThread(f)).ToArray();
80 }
81
82 public static IPromise<T>[] ThreadRun<T>(params Func<T>[] func) {
83 return func.Select(f => InvokeNewThread(f)).ToArray();
76 }
84 }
77 }
85 }
86 }
@@ -36,19 +36,14 namespace Implab.Parallels {
36 }
36 }
37
37
38 public bool TryEnqueue(T value,out bool extend) {
38 public bool TryEnqueue(T value,out bool extend) {
39 extend = false;
39 var alloc = Interlocked.Increment(ref m_alloc) - 1;
40 int alloc;
41 do {
42 alloc = m_alloc;
43 if (alloc > m_size)
44 return false;
45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
46
40
47 if (alloc == m_size) {
41 if (alloc >= m_size) {
48 extend = true;
42 extend = alloc == m_size;
49 return false;
43 return false;
50 }
44 }
51
45
46 extend = false;
52 m_data[alloc] = value;
47 m_data[alloc] = value;
53
48
54 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
49 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
@@ -74,6 +69,38 namespace Implab.Parallels {
74 return true;
69 return true;
75 }
70 }
76
71
72 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
73 int alloc;
74 int allocSize;
75
76 do {
77 alloc = m_alloc;
78
79 if (alloc > m_size) {
80 enqueued = 0;
81 extend = false;
82 return false;
83 }
84
85 allocSize = Math.Min(m_size - m_alloc, length);
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
87
88 if (alloc == m_size) {
89 enqueued = 0;
90 extend = true;
91 return false;
92 }
93
94 Array.Copy(batch, offset, m_data, alloc, allocSize);
95 enqueued = allocSize;
96 extend = false;
97
98 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
99 // spin wait for commit
100 }
101 return true;
102 }
103
77 public T GetAt(int pos) {
104 public T GetAt(int pos) {
78 return m_data[pos];
105 return m_data[pos];
79 }
106 }
@@ -19,30 +19,30 namespace MonoPlay {
19
19
20 var t1 = Environment.TickCount;
20 var t1 = Environment.TickCount;
21
21
22 new [] {
22 AsyncPool.ThreadRun(
23 AsyncPool.InvokeNewThread(() => {
23 () => {
24 for (var i = 0; i < count; i++)
24 for (var i = 0; i < count; i++)
25 q1.Enqueue(i);
25 q1.Enqueue(i);
26 }),
26 },
27 AsyncPool.InvokeNewThread(() => {
27 () => {
28 for (var i = 0; i < count; i++)
28 for (var i = 0; i < count; i++)
29 q1.Enqueue(i);
29 q1.Enqueue(i);
30 }),
30 },
31 AsyncPool.InvokeNewThread(() => {
31 () => {
32 int temp = 0;
32 int temp = 0;
33 int i = 0;
33 int i = 0;
34 while (i < count)
34 while (i < count)
35 if (q1.TryDequeue(out temp))
35 if (q1.TryDequeue(out temp))
36 i++;
36 i++;
37 }),
37 },
38 AsyncPool.InvokeNewThread(() => {
38 () => {
39 int temp = 0;
39 int temp = 0;
40 int i = 0;
40 int i = 0;
41 while (i < count)
41 while (i < count)
42 if (q1.TryDequeue(out temp))
42 if (q1.TryDequeue(out temp))
43 i++;
43 i++;
44 })
45 }
44 }
45 )
46 .Combine()
46 .Combine()
47 .Join();
47 .Join();
48
48
@@ -65,18 +65,18 namespace MonoPlay {
65 t1 = Environment.TickCount;
65 t1 = Environment.TickCount;
66
66
67
67
68 new [] {
68 AsyncPool.ThreadRun(
69 AsyncPool.InvokeNewThread(() => {
69 () => {
70 for (var i = 0; i < count; i++)
70 for (var i = 0; i < count; i++)
71 lock (q2)
71 lock (q2)
72 q2.Enqueue(i);
72 q2.Enqueue(i);
73 }),
73 },
74 AsyncPool.InvokeNewThread(() => {
74 () => {
75 for (var i = 0; i < count; i++)
75 for (var i = 0; i < count; i++)
76 lock (q2)
76 lock (q2)
77 q2.Enqueue(i);
77 q2.Enqueue(i);
78 }),
78 },
79 AsyncPool.InvokeNewThread(() => {
79 () => {
80 for (int i = 0; i < count ;)
80 for (int i = 0; i < count ;)
81 lock (q2) {
81 lock (q2) {
82 if (q2.Count == 0)
82 if (q2.Count == 0)
@@ -85,8 +85,8 namespace MonoPlay {
85 i++;
85 i++;
86 }
86 }
87
87
88 }),
88 },
89 AsyncPool.InvokeNewThread(() => {
89 () => {
90 for (int i = 0; i < count ;)
90 for (int i = 0; i < count ;)
91 lock (q2) {
91 lock (q2) {
92 if (q2.Count == 0)
92 if (q2.Count == 0)
@@ -95,8 +95,8 namespace MonoPlay {
95 i++;
95 i++;
96 }
96 }
97
97
98 })
99 }
98 }
99 )
100 .Combine()
100 .Combine()
101 .Join();
101 .Join();
102
102
General Comments 0
You need to be logged in to leave comments. Login now