##// END OF EJS Templates
sync
cin -
r16:5a4b735ba669 promises
parent child
Show More
@@ -242,6 +242,38 namespace Implab.Test {
242 }
242 }
243
243
244 [TestMethod]
244 [TestMethod]
245 public void ChainedMapTest() {
246
247 using (var pool = new WorkerPool(1,10)) {
248 int count = 10000;
249
250 double[] args = new double[count];
251 var rand = new Random();
252
253 for (int i = 0; i < count; i++)
254 args[i] = rand.NextDouble();
255
256 var t = Environment.TickCount;
257 var res = args
258 .ChainedMap(
259 x => pool.Invoke(
260 () => Math.Sin(x * x)
261 ),
262 4
263 )
264 .Join();
265
266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
267
268 t = Environment.TickCount;
269 for (int i = 0; i < count; i++)
270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 }
274 }
275
276 [TestMethod]
245 public void ParallelForEachTest() {
277 public void ParallelForEachTest() {
246
278
247 int count = 100000;
279 int count = 100000;
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -39,26 +39,14 namespace Implab.Parallels {
39 }
39 }
40
40
41 protected override bool TryDequeue(out int unit) {
41 protected override bool TryDequeue(out int unit) {
42 int index;
42 unit = Interlocked.Increment(ref m_next) - 1;
43 unit = -1;
43 return unit >= m_source.Length ? false : true;
44 do {
45 index = m_next;
46 if (index >= m_source.Length)
47 return false;
48 } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
49
50 unit = index;
51 return true;
52 }
44 }
53
45
54 protected override void InvokeUnit(int unit) {
46 protected override void InvokeUnit(int unit) {
55 try {
47 try {
56 m_action(m_source[unit]);
48 m_action(m_source[unit]);
57 int pending;
49 var pending = Interlocked.Decrement(ref m_pending);
58 do {
59 pending = m_pending;
60 } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending));
61 pending--;
62 if (pending == 0)
50 if (pending == 0)
63 m_promise.Resolve(m_source.Length);
51 m_promise.Resolve(m_source.Length);
64 } catch (Exception e) {
52 } catch (Exception e) {
@@ -101,26 +89,14 namespace Implab.Parallels {
101 }
89 }
102
90
103 protected override bool TryDequeue(out int unit) {
91 protected override bool TryDequeue(out int unit) {
104 int index;
92 unit = Interlocked.Increment(ref m_next) - 1;
105 unit = -1;
93 return unit >= m_source.Length ? false : true;
106 do {
107 index = m_next;
108 if (index >= m_source.Length)
109 return false;
110 } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
111
112 unit = index;
113 return true;
114 }
94 }
115
95
116 protected override void InvokeUnit(int unit) {
96 protected override void InvokeUnit(int unit) {
117 try {
97 try {
118 m_dest[unit] = m_transform(m_source[unit]);
98 m_dest[unit] = m_transform(m_source[unit]);
119 int pending;
99 var pending = Interlocked.Decrement(ref m_pending);
120 do {
121 pending = m_pending;
122 } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending));
123 pending --;
124 if (pending == 0)
100 if (pending == 0)
125 m_promise.Resolve(m_dest);
101 m_promise.Resolve(m_dest);
126 } catch (Exception e) {
102 } catch (Exception e) {
@@ -148,5 +124,48 namespace Implab.Parallels {
148 var iter = new ArrayIterator<TSrc>(source, action, threads);
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
149 return iter.Promise;
125 return iter.Promise;
150 }
126 }
127
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 if (source == null)
130 throw new ArgumentNullException("source");
131 if (transform == null)
132 throw new ArgumentNullException("transform");
133 if (threads <= 0)
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135
136 var promise = new Promise<TDst[]>();
137 var res = new TDst[source.Length];
138 var pending = source.Length;
139 var semaphore = new Semaphore(threads, threads);
140
141 AsyncPool.InvokeNewThread(() => {
142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
144 break; // stop processing in case of error or cancellation
145 var idx = i;
146 semaphore.WaitOne();
147 try {
148 var p1 = transform(source[i]);
149 p1.Anyway(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
151 p1.Then(
152 x => {
153 res[idx] = x;
154 var left = Interlocked.Decrement(ref pending);
155 if (left == 0)
156 promise.Resolve(res);
157 },
158 e => promise.Reject(e)
159 );
160
161 } catch (Exception e) {
162 promise.Reject(e);
151 }
163 }
152 }
164 }
165 return 0;
166 });
167
168 return promise.Anyway(() => semaphore.Dispose());
169 }
170 }
171 }
@@ -36,7 +36,6 namespace Implab.Parallels {
36 }
36 }
37 });
37 });
38 worker.IsBackground = true;
38 worker.IsBackground = true;
39
40 worker.Start();
39 worker.Start();
41
40
42 return p;
41 return p;
@@ -63,9 +63,10 namespace Implab.Parallels {
63 }
63 }
64
64
65 bool StartWorker() {
65 bool StartWorker() {
66 var current = m_runningThreads;
66 int current;
67 // use spins to allocate slot for the new thread
67 // use spins to allocate slot for the new thread
68 do {
68 do {
69 current = m_runningThreads;
69 if (current >= m_maxThreads || m_exitRequired != 0)
70 if (current >= m_maxThreads || m_exitRequired != 0)
70 // no more slots left or the pool has been disposed
71 // no more slots left or the pool has been disposed
71 return false;
72 return false;
@@ -84,24 +85,33 namespace Implab.Parallels {
84
85
85 protected abstract bool TryDequeue(out TUnit unit);
86 protected abstract bool TryDequeue(out TUnit unit);
86
87
87 protected virtual void WakeNewWorker() {
88 protected virtual void WakeNewWorker(bool extend) {
88 if (m_suspended > 0)
89 if (m_suspended > 0)
89 m_hasTasks.Set();
90 m_hasTasks.Set();
90 else
91 else
91 StartWorker();
92 StartWorker();
92 }
93 }
93
94
95 /// <summary>
96 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
97 /// </summary>
98 protected void StartIfIdle() {
99 int threads;
100 do {
101
102 }
103 }
104
105 protected virtual void Suspend() {
106 m_hasTasks.WaitOne();
107 }
108
94 bool FetchTask(out TUnit unit) {
109 bool FetchTask(out TUnit unit) {
95 do {
110 do {
96 // exit if requested
111 // exit if requested
97 if (m_exitRequired != 0) {
112 if (m_exitRequired != 0) {
98 // release the thread slot
113 // release the thread slot
99 int running;
114 var running = Interlocked.Decrement(ref m_runningThreads);
100 do {
101 running = m_runningThreads;
102 } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running));
103 running--;
104
105 if (running == 0) // it was the last worker
115 if (running == 0) // it was the last worker
106 m_hasTasks.Dispose();
116 m_hasTasks.Dispose();
107 else
117 else
@@ -112,7 +122,7 namespace Implab.Parallels {
112
122
113 // fetch task
123 // fetch task
114 if (TryDequeue(out unit)) {
124 if (TryDequeue(out unit)) {
115 WakeNewWorker();
125 WakeNewWorker(true);
116 return true;
126 return true;
117 }
127 }
118
128
@@ -122,19 +132,21 namespace Implab.Parallels {
122 do {
132 do {
123 runningThreads = m_runningThreads;
133 runningThreads = m_runningThreads;
124 if (runningThreads <= m_minThreads) {
134 if (runningThreads <= m_minThreads) {
135 // check wheather this is the last thread and we have tasks
136
125 exit = false;
137 exit = false;
126 break;
138 break;
127 }
139 }
128 } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads));
140 } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads));
129
141
130 if (exit) {
142 if (exit) {
131 Interlocked.Decrement(ref m_runningThreads);
132 return false;
143 return false;
133 }
144 }
134
145
135 // keep this thread and wait
146 // entering suspend state
136 Interlocked.Increment(ref m_suspended);
147 Interlocked.Increment(ref m_suspended);
137 m_hasTasks.WaitOne();
148 // keep this thread and wait
149 Suspend();
138 Interlocked.Decrement(ref m_suspended);
150 Interlocked.Decrement(ref m_suspended);
139 } while (true);
151 } while (true);
140 }
152 }
@@ -10,9 +10,16 namespace Implab.Parallels {
10
10
11 MTQueue<Action> m_queue = new MTQueue<Action>();
11 MTQueue<Action> m_queue = new MTQueue<Action>();
12 int m_queueLength = 0;
12 int m_queueLength = 0;
13 readonly int m_threshold = 1;
13
14
14 public WorkerPool(int minThreads, int maxThreads)
15 public WorkerPool(int minThreads, int maxThreads, int threshold)
15 : base(minThreads, maxThreads) {
16 : base(minThreads, maxThreads) {
17 m_threshold = threshold;
18 InitPool();
19 }
20
21 public WorkerPool(int minThreads, int maxThreads) :
22 base(minThreads, maxThreads) {
16 InitPool();
23 InitPool();
17 }
24 }
18
25
@@ -47,11 +54,20 namespace Implab.Parallels {
47
54
48 protected void EnqueueTask(Action unit) {
55 protected void EnqueueTask(Action unit) {
49 Debug.Assert(unit != null);
56 Debug.Assert(unit != null);
50 Interlocked.Increment(ref m_queueLength);
57 var len = Interlocked.Increment(ref m_queueLength);
51 m_queue.Enqueue(unit);
58 m_queue.Enqueue(unit);
52 // if there are sleeping threads in the pool wake one
59
53 // probably this will lead a dry run
60 if (ThreadCount == 0)
54 WakeNewWorker();
61 // force to start
62 WakeNewWorker(false);
63 }
64
65 protected override void WakeNewWorker(bool extend) {
66 if (extend && m_queueLength <= m_threshold)
67 // in this case we are in active thread and it request for additional workers
68 // satisfy it only when queue is longer than threshold
69 return;
70 base.WakeNewWorker(extend);
55 }
71 }
56
72
57 protected override bool TryDequeue(out Action unit) {
73 protected override bool TryDequeue(out Action unit) {
@@ -65,5 +81,10 namespace Implab.Parallels {
65 protected override void InvokeUnit(Action unit) {
81 protected override void InvokeUnit(Action unit) {
66 unit();
82 unit();
67 }
83 }
84
85 protected override void Suspend() {
86 if (m_queueLength == 0)
87 base.Suspend();
68 }
88 }
69 }
89 }
90 }
@@ -103,11 +103,16 namespace Implab {
103 /// <summary>
103 /// <summary>
104 /// Выполняет обещание, сообщая об ошибке
104 /// Выполняет обещание, сообщая об ошибке
105 /// </summary>
105 /// </summary>
106 /// <remarks>
107 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
108 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
109 /// будут проигнорированы.
110 /// </remarks>
106 /// <param name="error">Исключение возникшее при выполнении операции</param>
111 /// <param name="error">Исключение возникшее при выполнении операции</param>
107 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
112 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
108 public void Reject(Exception error) {
113 public void Reject(Exception error) {
109 lock (m_lock) {
114 lock (m_lock) {
110 if (m_state == PromiseState.Cancelled)
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
111 return;
116 return;
112 if (m_state != PromiseState.Unresolved)
117 if (m_state != PromiseState.Unresolved)
113 throw new InvalidOperationException("The promise is already resolved");
118 throw new InvalidOperationException("The promise is already resolved");
General Comments 0
You need to be logged in to leave comments. Login now