##// END OF EJS Templates
sync
cin -
r16:5a4b735ba669 promises
parent child
Show More
@@ -242,6 +242,38 namespace Implab.Test {
242 242 }
243 243
244 244 [TestMethod]
245 public void ChainedMapTest() {
246
247 using (var pool = new WorkerPool(1,10)) {
248 int count = 10000;
249
250 double[] args = new double[count];
251 var rand = new Random();
252
253 for (int i = 0; i < count; i++)
254 args[i] = rand.NextDouble();
255
256 var t = Environment.TickCount;
257 var res = args
258 .ChainedMap(
259 x => pool.Invoke(
260 () => Math.Sin(x * x)
261 ),
262 4
263 )
264 .Join();
265
266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
267
268 t = Environment.TickCount;
269 for (int i = 0; i < count; i++)
270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 }
274 }
275
276 [TestMethod]
245 277 public void ParallelForEachTest() {
246 278
247 279 int count = 100000;
1 NO CONTENT: modified file, binary diff hidden
@@ -39,26 +39,14 namespace Implab.Parallels {
39 39 }
40 40
41 41 protected override bool TryDequeue(out int unit) {
42 int index;
43 unit = -1;
44 do {
45 index = m_next;
46 if (index >= m_source.Length)
47 return false;
48 } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
49
50 unit = index;
51 return true;
42 unit = Interlocked.Increment(ref m_next) - 1;
43 return unit >= m_source.Length ? false : true;
52 44 }
53 45
54 46 protected override void InvokeUnit(int unit) {
55 47 try {
56 48 m_action(m_source[unit]);
57 int pending;
58 do {
59 pending = m_pending;
60 } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending));
61 pending--;
49 var pending = Interlocked.Decrement(ref m_pending);
62 50 if (pending == 0)
63 51 m_promise.Resolve(m_source.Length);
64 52 } catch (Exception e) {
@@ -101,26 +89,14 namespace Implab.Parallels {
101 89 }
102 90
103 91 protected override bool TryDequeue(out int unit) {
104 int index;
105 unit = -1;
106 do {
107 index = m_next;
108 if (index >= m_source.Length)
109 return false;
110 } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
111
112 unit = index;
113 return true;
92 unit = Interlocked.Increment(ref m_next) - 1;
93 return unit >= m_source.Length ? false : true;
114 94 }
115 95
116 96 protected override void InvokeUnit(int unit) {
117 97 try {
118 98 m_dest[unit] = m_transform(m_source[unit]);
119 int pending;
120 do {
121 pending = m_pending;
122 } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending));
123 pending --;
99 var pending = Interlocked.Decrement(ref m_pending);
124 100 if (pending == 0)
125 101 m_promise.Resolve(m_dest);
126 102 } catch (Exception e) {
@@ -148,5 +124,48 namespace Implab.Parallels {
148 124 var iter = new ArrayIterator<TSrc>(source, action, threads);
149 125 return iter.Promise;
150 126 }
127
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 if (source == null)
130 throw new ArgumentNullException("source");
131 if (transform == null)
132 throw new ArgumentNullException("transform");
133 if (threads <= 0)
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135
136 var promise = new Promise<TDst[]>();
137 var res = new TDst[source.Length];
138 var pending = source.Length;
139 var semaphore = new Semaphore(threads, threads);
140
141 AsyncPool.InvokeNewThread(() => {
142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
144 break; // stop processing in case of error or cancellation
145 var idx = i;
146 semaphore.WaitOne();
147 try {
148 var p1 = transform(source[i]);
149 p1.Anyway(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
151 p1.Then(
152 x => {
153 res[idx] = x;
154 var left = Interlocked.Decrement(ref pending);
155 if (left == 0)
156 promise.Resolve(res);
157 },
158 e => promise.Reject(e)
159 );
160
161 } catch (Exception e) {
162 promise.Reject(e);
163 }
164 }
165 return 0;
166 });
167
168 return promise.Anyway(() => semaphore.Dispose());
169 }
151 170 }
152 171 }
@@ -36,7 +36,6 namespace Implab.Parallels {
36 36 }
37 37 });
38 38 worker.IsBackground = true;
39
40 39 worker.Start();
41 40
42 41 return p;
@@ -63,9 +63,10 namespace Implab.Parallels {
63 63 }
64 64
65 65 bool StartWorker() {
66 var current = m_runningThreads;
66 int current;
67 67 // use spins to allocate slot for the new thread
68 68 do {
69 current = m_runningThreads;
69 70 if (current >= m_maxThreads || m_exitRequired != 0)
70 71 // no more slots left or the pool has been disposed
71 72 return false;
@@ -84,24 +85,33 namespace Implab.Parallels {
84 85
85 86 protected abstract bool TryDequeue(out TUnit unit);
86 87
87 protected virtual void WakeNewWorker() {
88 protected virtual void WakeNewWorker(bool extend) {
88 89 if (m_suspended > 0)
89 90 m_hasTasks.Set();
90 91 else
91 92 StartWorker();
92 93 }
93 94
95 /// <summary>
96 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
97 /// </summary>
98 protected void StartIfIdle() {
99 int threads;
100 do {
101
102 }
103 }
104
105 protected virtual void Suspend() {
106 m_hasTasks.WaitOne();
107 }
108
94 109 bool FetchTask(out TUnit unit) {
95 110 do {
96 111 // exit if requested
97 112 if (m_exitRequired != 0) {
98 113 // release the thread slot
99 int running;
100 do {
101 running = m_runningThreads;
102 } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running));
103 running--;
104
114 var running = Interlocked.Decrement(ref m_runningThreads);
105 115 if (running == 0) // it was the last worker
106 116 m_hasTasks.Dispose();
107 117 else
@@ -112,7 +122,7 namespace Implab.Parallels {
112 122
113 123 // fetch task
114 124 if (TryDequeue(out unit)) {
115 WakeNewWorker();
125 WakeNewWorker(true);
116 126 return true;
117 127 }
118 128
@@ -122,19 +132,21 namespace Implab.Parallels {
122 132 do {
123 133 runningThreads = m_runningThreads;
124 134 if (runningThreads <= m_minThreads) {
135 // check wheather this is the last thread and we have tasks
136
125 137 exit = false;
126 138 break;
127 139 }
128 140 } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads));
129 141
130 142 if (exit) {
131 Interlocked.Decrement(ref m_runningThreads);
132 143 return false;
133 144 }
134 145
135 // keep this thread and wait
146 // entering suspend state
136 147 Interlocked.Increment(ref m_suspended);
137 m_hasTasks.WaitOne();
148 // keep this thread and wait
149 Suspend();
138 150 Interlocked.Decrement(ref m_suspended);
139 151 } while (true);
140 152 }
@@ -10,20 +10,27 namespace Implab.Parallels {
10 10
11 11 MTQueue<Action> m_queue = new MTQueue<Action>();
12 12 int m_queueLength = 0;
13 readonly int m_threshold = 1;
13 14
14 public WorkerPool(int minThreads, int maxThreads)
15 public WorkerPool(int minThreads, int maxThreads, int threshold)
15 16 : base(minThreads, maxThreads) {
16 InitPool();
17 m_threshold = threshold;
18 InitPool();
19 }
20
21 public WorkerPool(int minThreads, int maxThreads) :
22 base(minThreads, maxThreads) {
23 InitPool();
17 24 }
18 25
19 26 public WorkerPool(int threads)
20 27 : base(threads) {
21 InitPool();
28 InitPool();
22 29 }
23 30
24 31 public WorkerPool()
25 32 : base() {
26 InitPool();
33 InitPool();
27 34 }
28 35
29 36 public Promise<T> Invoke<T>(Func<T> task) {
@@ -47,11 +54,20 namespace Implab.Parallels {
47 54
48 55 protected void EnqueueTask(Action unit) {
49 56 Debug.Assert(unit != null);
50 Interlocked.Increment(ref m_queueLength);
57 var len = Interlocked.Increment(ref m_queueLength);
51 58 m_queue.Enqueue(unit);
52 // if there are sleeping threads in the pool wake one
53 // probably this will lead a dry run
54 WakeNewWorker();
59
60 if (ThreadCount == 0)
61 // force to start
62 WakeNewWorker(false);
63 }
64
65 protected override void WakeNewWorker(bool extend) {
66 if (extend && m_queueLength <= m_threshold)
67 // in this case we are in active thread and it request for additional workers
68 // satisfy it only when queue is longer than threshold
69 return;
70 base.WakeNewWorker(extend);
55 71 }
56 72
57 73 protected override bool TryDequeue(out Action unit) {
@@ -65,5 +81,10 namespace Implab.Parallels {
65 81 protected override void InvokeUnit(Action unit) {
66 82 unit();
67 83 }
84
85 protected override void Suspend() {
86 if (m_queueLength == 0)
87 base.Suspend();
88 }
68 89 }
69 90 }
@@ -103,11 +103,16 namespace Implab {
103 103 /// <summary>
104 104 /// Выполняет обещание, сообщая об ошибке
105 105 /// </summary>
106 /// <remarks>
107 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
108 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
109 /// будут проигнорированы.
110 /// </remarks>
106 111 /// <param name="error">Исключение возникшее при выполнении операции</param>
107 112 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
108 113 public void Reject(Exception error) {
109 114 lock (m_lock) {
110 if (m_state == PromiseState.Cancelled)
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
111 116 return;
112 117 if (m_state != PromiseState.Unresolved)
113 118 throw new InvalidOperationException("The promise is already resolved");
General Comments 0
You need to be logged in to leave comments. Login now