##// END OF EJS Templates
Слияние с promises
cin -
r18:0c924dff5498 merge default
parent child
Show More
@@ -0,0 +1,17
1 using Implab.Parallels;
2 using System;
3 using System.Collections.Generic;
4 using System.Linq;
5 using System.Text;
6 using System.Threading;
7
8 namespace Implab.Test {
9 class PromiseHelper {
10 public static Promise<T> Sleep<T>(int timeout, T retVal) {
11 return AsyncPool.Invoke(() => {
12 Thread.Sleep(timeout);
13 return retVal;
14 });
15 }
16 }
17 }
1 NO CONTENT: new file 100644, binary diff hidden
@@ -0,0 +1,10
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ICancellable {
8 bool Cancel();
9 }
10 }
@@ -0,0 +1,18
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface IProgressHandler {
8 string Message {
9 get;
10 set;
11 }
12 float CurrentProgress {
13 get;
14 set;
15 }
16 void InitProgress(float current, float max, string message);
17 }
18 }
@@ -0,0 +1,14
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public interface IProgressNotifier
9 {
10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
12 event EventHandler<ProgressInitEventArgs> ProgressInit;
13 }
14 }
@@ -0,0 +1,33
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public interface IPromise: ICancellable
9 {
10 /// <summary>
11 /// Check whereather the promise has no more than one dependent promise.
12 /// </summary>
13 bool IsExclusive
14 {
15 get;
16 }
17
18 /// <summary>
19 /// The current state of the promise.
20 /// </summary>
21 PromiseState State
22 {
23 get;
24 }
25
26 /// <summary>
27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
28 /// handler will be invoked immediatelly.
29 /// </summary>
30 /// <param name="handler">The handler</param>
31 void HandleCancelled(Action handler);
32 }
33 }
@@ -0,0 +1,12
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ITaskController: IProgressHandler {
8 bool Cancelled {
9 get;
10 }
11 }
12 }
@@ -0,0 +1,11
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7
8 /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
9
10 }*/
11 }
@@ -0,0 +1,171
1 using System;
2 using System.Collections.Generic;
3 using System.Diagnostics;
4 using System.Linq;
5 using System.Text;
6 using System.Threading;
7
8 namespace Implab.Parallels {
9 public static class ArrayTraits {
10 class ArrayIterator<TSrc> : DispatchPool<int> {
11 readonly Action<TSrc> m_action;
12 readonly TSrc[] m_source;
13 readonly Promise<int> m_promise = new Promise<int>();
14
15 int m_pending;
16 int m_next;
17
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 : base(threads) {
20
21 Debug.Assert(source != null);
22 Debug.Assert(action != null);
23
24 m_next = 0;
25 m_source = source;
26 m_pending = source.Length;
27 m_action = action;
28
29 m_promise.Anyway(() => Dispose());
30 m_promise.Cancelled(() => Dispose());
31
32 InitPool();
33 }
34
35 public Promise<int> Promise {
36 get {
37 return m_promise;
38 }
39 }
40
41 protected override bool TryDequeue(out int unit) {
42 unit = Interlocked.Increment(ref m_next) - 1;
43 return unit >= m_source.Length ? false : true;
44 }
45
46 protected override void InvokeUnit(int unit) {
47 try {
48 m_action(m_source[unit]);
49 var pending = Interlocked.Decrement(ref m_pending);
50 if (pending == 0)
51 m_promise.Resolve(m_source.Length);
52 } catch (Exception e) {
53 m_promise.Reject(e);
54 }
55 }
56 }
57
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 readonly Func<TSrc, TDst> m_transform;
60 readonly TSrc[] m_source;
61 readonly TDst[] m_dest;
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
63
64 int m_pending;
65 int m_next;
66
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 : base(threads) {
69
70 Debug.Assert (source != null);
71 Debug.Assert( transform != null);
72
73 m_next = 0;
74 m_source = source;
75 m_dest = new TDst[source.Length];
76 m_pending = source.Length;
77 m_transform = transform;
78
79 m_promise.Anyway(() => Dispose());
80 m_promise.Cancelled(() => Dispose());
81
82 InitPool();
83 }
84
85 public Promise<TDst[]> Promise {
86 get {
87 return m_promise;
88 }
89 }
90
91 protected override bool TryDequeue(out int unit) {
92 unit = Interlocked.Increment(ref m_next) - 1;
93 return unit >= m_source.Length ? false : true;
94 }
95
96 protected override void InvokeUnit(int unit) {
97 try {
98 m_dest[unit] = m_transform(m_source[unit]);
99 var pending = Interlocked.Decrement(ref m_pending);
100 if (pending == 0)
101 m_promise.Resolve(m_dest);
102 } catch (Exception e) {
103 m_promise.Reject(e);
104 }
105 }
106 }
107
108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 if (source == null)
110 throw new ArgumentNullException("source");
111 if (transform == null)
112 throw new ArgumentNullException("transform");
113
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 return mapper.Promise;
116 }
117
118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 if (source == null)
120 throw new ArgumentNullException("source");
121 if (action == null)
122 throw new ArgumentNullException("action");
123
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 return iter.Promise;
126 }
127
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 if (source == null)
130 throw new ArgumentNullException("source");
131 if (transform == null)
132 throw new ArgumentNullException("transform");
133 if (threads <= 0)
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135
136 var promise = new Promise<TDst[]>();
137 var res = new TDst[source.Length];
138 var pending = source.Length;
139 var semaphore = new Semaphore(threads, threads);
140
141 AsyncPool.InvokeNewThread(() => {
142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
144 break; // stop processing in case of error or cancellation
145 var idx = i;
146 semaphore.WaitOne();
147 try {
148 var p1 = transform(source[i]);
149 p1.Anyway(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
151 p1.Then(
152 x => {
153 res[idx] = x;
154 var left = Interlocked.Decrement(ref pending);
155 if (left == 0)
156 promise.Resolve(res);
157 },
158 e => promise.Reject(e)
159 );
160
161 } catch (Exception e) {
162 promise.Reject(e);
163 }
164 }
165 return 0;
166 });
167
168 return promise.Anyway(() => semaphore.Dispose());
169 }
170 }
171 }
@@ -0,0 +1,44
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
5 /// <summary>
6 /// Класс для распаралеливания задач.
7 /// </summary>
8 /// <remarks>
9 /// Используя данный класс и лямда выражения можно распараллелить
10 /// вычисления, для этого используется концепция обещаний.
11 /// </remarks>
12 public static class AsyncPool {
13
14 public static Promise<T> Invoke<T>(Func<T> func) {
15 var p = new Promise<T>();
16
17 ThreadPool.QueueUserWorkItem(param => {
18 try {
19 p.Resolve(func());
20 } catch(Exception e) {
21 p.Reject(e);
22 }
23 });
24
25 return p;
26 }
27
28 public static Promise<T> InvokeNewThread<T>(Func<T> func) {
29 var p = new Promise<T>();
30
31 var worker = new Thread(() => {
32 try {
33 p.Resolve(func());
34 } catch (Exception e) {
35 p.Reject(e);
36 }
37 });
38 worker.IsBackground = true;
39 worker.Start();
40
41 return p;
42 }
43 }
44 }
@@ -0,0 +1,238
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
13 int m_maxRunningThreads = 0;
14 int m_suspended = 0;
15 int m_exitRequired = 0;
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17
18 protected DispatchPool(int min, int max) {
19 if (min < 0)
20 throw new ArgumentOutOfRangeException("min");
21 if (max <= 0)
22 throw new ArgumentOutOfRangeException("max");
23
24 if (min > max)
25 min = max;
26 m_minThreads = min;
27 m_maxThreads = max;
28 }
29
30 protected DispatchPool(int threads)
31 : this(threads, threads) {
32 }
33
34 protected DispatchPool() {
35 int maxThreads, maxCP;
36 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
37
38 m_minThreads = 0;
39 m_maxThreads = maxThreads;
40 }
41
42 protected void InitPool() {
43 for (int i = 0; i < m_minThreads; i++)
44 StartWorker();
45 }
46
47 public int ThreadCount {
48 get {
49 return m_runningThreads;
50 }
51 }
52
53 public int MaxRunningThreads {
54 get {
55 return m_maxRunningThreads;
56 }
57 }
58
59 protected bool IsDisposed {
60 get {
61 return m_exitRequired != 0;
62 }
63 }
64
65 protected abstract bool TryDequeue(out TUnit unit);
66
67 protected virtual bool ExtendPool() {
68 if (m_suspended > 0) {
69 m_hasTasks.Set();
70 return true;
71 } else
72 return StartWorker();
73 }
74
75 /// <summary>
76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
77 /// </summary>
78 protected void WakePool() {
79 m_hasTasks.Set(); // wake sleeping thread;
80
81 if (AllocateThreadSlot(1)) {
82 var worker = new Thread(this.Worker);
83 worker.IsBackground = true;
84 worker.Start();
85 }
86 }
87
88 protected virtual void Suspend() {
89 m_hasTasks.WaitOne();
90 }
91
92 #region thread slots traits
93
94 bool AllocateThreadSlot() {
95 int current;
96 // use spins to allocate slot for the new thread
97 do {
98 current = m_runningThreads;
99 if (current >= m_maxThreads || m_exitRequired != 0)
100 // no more slots left or the pool has been disposed
101 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
103
104 UpdateMaxThreads(current + 1);
105
106 return true;
107 }
108
109 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
111 return false;
112
113 UpdateMaxThreads(desired);
114
115 return true;
116 }
117
118 bool ReleaseThreadSlot(out bool last) {
119 last = false;
120 int current;
121 // use spins to release slot for the new thread
122 do {
123 current = m_runningThreads;
124 if (current <= m_minThreads && m_exitRequired == 0)
125 // the thread is reserved
126 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
128
129 last = (current == 1);
130
131 return true;
132 }
133
134 /// <summary>
135 /// releases thread slot unconditionally, used during cleanup
136 /// </summary>
137 /// <returns>true - no more threads left</returns>
138 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads);
140 return left == 0;
141 }
142
143 void UpdateMaxThreads(int count) {
144 int max;
145 do {
146 max = m_maxRunningThreads;
147 if (max >= count)
148 break;
149 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
150 }
151
152 #endregion
153
154 bool StartWorker() {
155 if (AllocateThreadSlot()) {
156 // slot successfully allocated
157 var worker = new Thread(this.Worker);
158 worker.IsBackground = true;
159 worker.Start();
160
161 return true;
162 } else {
163 return false;
164 }
165 }
166
167 bool FetchTask(out TUnit unit) {
168 do {
169 // exit if requested
170 if (m_exitRequired != 0) {
171 // release the thread slot
172 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 m_hasTasks.Dispose();
174 else
175 m_hasTasks.Set(); // wake next worker
176 unit = default(TUnit);
177 return false;
178 }
179
180 // fetch task
181 if (TryDequeue(out unit)) {
182 ExtendPool();
183 return true;
184 }
185
186 //no tasks left, exit if the thread is no longer needed
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
197
198 return false;
199 }
200
201 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 // keep this thread and wait
204 Suspend();
205 Interlocked.Decrement(ref m_suspended);
206 } while (true);
207 }
208
209 protected abstract void InvokeUnit(TUnit unit);
210
211 void Worker() {
212 TUnit unit;
213 while (FetchTask(out unit))
214 InvokeUnit(unit);
215 }
216
217 protected virtual void Dispose(bool disposing) {
218 if (disposing) {
219 if (m_exitRequired == 0) {
220 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
221 return;
222
223 // wake sleeping threads
224 m_hasTasks.Set();
225 GC.SuppressFinalize(this);
226 }
227 }
228 }
229
230 public void Dispose() {
231 Dispose(true);
232 }
233
234 ~DispatchPool() {
235 Dispose(false);
236 }
237 }
238 }
@@ -0,0 +1,74
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab.Parallels {
8 public class MTQueue<T> {
9 class Node {
10 public Node(T value) {
11 this.value = value;
12 }
13 public readonly T value;
14 public Node next;
15 }
16
17 Node m_first;
18 Node m_last;
19
20 public void Enqueue(T value) {
21 var last = m_last;
22 var next = new Node(value);
23
24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
25 last = m_last;
26
27 if (last != null)
28 last.next = next;
29 else
30 m_first = next;
31 }
32
33 public bool TryDequeue(out T value) {
34 Node first;
35 Node next = null;
36 value = default(T);
37
38 do {
39 first = m_first;
40 if (first == null)
41 return false;
42 next = first.next;
43 if (next == null) {
44 // this is the last element,
45 // then try to update tail
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
48 if (m_last == null)
49 return false;
50 // tail has been changed, that means that we need to restart
51 continue;
52 }
53
54 // tail succesfully updated and first.next will never be changed
55 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
56 // but the writer may update the m_first since the m_last is null
57
58 // so we need to fix inconsistency by setting m_first to null, but if it already has been
59 // updated by a writer then we should just give up
60 Interlocked.CompareExchange(ref m_first, null, first);
61 break;
62
63 } else {
64 if (first == Interlocked.CompareExchange(ref m_first, next, first))
65 // head succesfully updated
66 break;
67 }
68 } while (true);
69
70 value = first.value;
71 return true;
72 }
73 }
74 }
@@ -0,0 +1,89
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public class WorkerPool : DispatchPool<Action> {
10
11 MTQueue<Action> m_queue = new MTQueue<Action>();
12 int m_queueLength = 0;
13 readonly int m_threshold = 1;
14
15 public WorkerPool(int minThreads, int maxThreads, int threshold)
16 : base(minThreads, maxThreads) {
17 m_threshold = threshold;
18 InitPool();
19 }
20
21 public WorkerPool(int minThreads, int maxThreads) :
22 base(minThreads, maxThreads) {
23 InitPool();
24 }
25
26 public WorkerPool(int threads)
27 : base(threads) {
28 InitPool();
29 }
30
31 public WorkerPool()
32 : base() {
33 InitPool();
34 }
35
36 public Promise<T> Invoke<T>(Func<T> task) {
37 if (task == null)
38 throw new ArgumentNullException("task");
39 if (IsDisposed)
40 throw new ObjectDisposedException(ToString());
41
42 var promise = new Promise<T>();
43
44 EnqueueTask(delegate() {
45 try {
46 promise.Resolve(task());
47 } catch (Exception e) {
48 promise.Reject(e);
49 }
50 });
51
52 return promise;
53 }
54
55 protected void EnqueueTask(Action unit) {
56 Debug.Assert(unit != null);
57 var len = Interlocked.Increment(ref m_queueLength);
58 m_queue.Enqueue(unit);
59
60 if(!ExtendPool())
61 WakePool();
62 }
63
64 protected override bool ExtendPool() {
65 if (m_queueLength <= m_threshold*ThreadCount)
66 // in this case we are in active thread and it request for additional workers
67 // satisfy it only when queue is longer than threshold
68 return false;
69 return base.ExtendPool();
70 }
71
72 protected override bool TryDequeue(out Action unit) {
73 if (m_queue.TryDequeue(out unit)) {
74 Interlocked.Decrement(ref m_queueLength);
75 return true;
76 }
77 return false;
78 }
79
80 protected override void InvokeUnit(Action unit) {
81 unit();
82 }
83
84 protected override void Suspend() {
85 if (m_queueLength == 0)
86 base.Suspend();
87 }
88 }
89 }
@@ -0,0 +1,36
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 [Serializable]
9 public class ProgressInitEventArgs: EventArgs
10 {
11 public float MaxProgress
12 {
13 get;
14 private set;
15 }
16
17 public float CurrentProgress
18 {
19 get;
20 private set;
21 }
22
23 public string Message
24 {
25 get;
26 private set;
27 }
28
29 public ProgressInitEventArgs(float current, float max, string message)
30 {
31 this.MaxProgress = max;
32 this.CurrentProgress = current;
33 this.Message = message;
34 }
35 }
36 }
@@ -0,0 +1,15
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public enum PromiseState
9 {
10 Unresolved,
11 Resolved,
12 Cancelled,
13 Rejected
14 }
15 }
@@ -0,0 +1,132
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab
8 {
9 /// <summary>
10 /// This class allows to interact with asyncronuos task.
11 /// </summary>
12 /// <remarks>
13 /// Members of this object are thread safe.
14 /// </remarks>
15 class TaskController: IProgressNotifier, ITaskController, ICancellable
16 {
17 readonly object m_lock;
18 string m_message;
19
20 float m_current;
21 float m_max;
22
23 bool m_cancelled;
24
25 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
26 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
27 public event EventHandler<ProgressInitEventArgs> ProgressInit;
28
29 public TaskController()
30 {
31 m_lock = new Object();
32 }
33
34 public string Message
35 {
36 get
37 {
38 lock (m_lock)
39 return m_message;
40 }
41 set
42 {
43 lock (m_lock)
44 {
45 m_message = value;
46 OnMessageUpdated();
47 }
48 }
49 }
50
51 public float CurrentProgress
52 {
53 get
54 {
55 lock (m_lock)
56 return m_current;
57 }
58 set
59 {
60 lock (m_lock)
61 {
62 var prev = m_current;
63 m_current = value;
64 if (m_current >= m_max)
65 m_current = m_max;
66 if (m_current != prev)
67 OnProgressUpdated();
68 }
69 }
70 }
71
72 public void InitProgress(float current, float max, string message)
73 {
74 if (max < 0)
75 throw new ArgumentOutOfRangeException("max");
76 if (current < 0 || current > max)
77 throw new ArgumentOutOfRangeException("current");
78
79 lock(m_lock) {
80 m_current = current;
81 m_max = max;
82 m_message = message;
83 OnProgressInit();
84 }
85 }
86
87 public bool Cancelled {
88 get {
89 lock (m_lock)
90 return m_cancelled;
91 }
92 }
93
94 public bool Cancel() {
95 lock (m_lock) {
96 if (!m_cancelled) {
97 m_cancelled = true;
98 return true;
99 } else {
100 return false;
101 }
102 }
103 }
104
105 protected virtual void OnMessageUpdated()
106 {
107 var temp = MessageUpdated;
108 if (temp != null)
109 {
110 temp(this, new ValueEventArgs<string>(m_message));
111 }
112 }
113
114 protected virtual void OnProgressUpdated()
115 {
116 var temp = ProgressUpdated;
117 if (temp != null)
118 {
119 temp(this,new ValueEventArgs<float>(m_current));
120 }
121 }
122
123 protected virtual void OnProgressInit()
124 {
125 var temp = ProgressInit;
126 if (temp != null)
127 {
128 temp(this, new ProgressInitEventArgs(m_current,m_max, m_message));
129 }
130 }
131 }
132 }
@@ -0,0 +1,21
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 [Serializable]
9 public class ValueEventArgs<T>: EventArgs
10 {
11 public ValueEventArgs(T value)
12 {
13 this.Value = value;
14 }
15 public T Value
16 {
17 get;
18 private set;
19 }
20 }
21 }
@@ -10,3 +10,4 Implab.Fx/obj/
10 10 Implab.Fx/bin/
11 11 Implab.Fx.Test/bin/
12 12 Implab.Fx.Test/obj/
13 _ReSharper.Implab/
@@ -1,17 +1,14
1 1 using System;
2 2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using Implab;
4 3 using System.Reflection;
5 4 using System.Threading;
5 using Implab.Parallels;
6 6
7 namespace Implab.Tests
8 {
7 namespace Implab.Test {
9 8 [TestClass]
10 public class AsyncTests
11 {
9 public class AsyncTests {
12 10 [TestMethod]
13 public void ResolveTest ()
14 {
11 public void ResolveTest() {
15 12 int res = -1;
16 13 var p = new Promise<int> ();
17 14 p.Then (x => res = x);
@@ -21,8 +18,7 namespace Implab.Tests
21 18 }
22 19
23 20 [TestMethod]
24 public void RejectTest ()
25 {
21 public void RejectTest() {
26 22 int res = -1;
27 23 Exception err = null;
28 24
@@ -36,16 +32,14 namespace Implab.Tests
36 32 }
37 33
38 34 [TestMethod]
39 public void JoinSuccessTest ()
40 {
35 public void JoinSuccessTest() {
41 36 var p = new Promise<int> ();
42 37 p.Resolve (100);
43 38 Assert.AreEqual (p.Join (), 100);
44 39 }
45 40
46 41 [TestMethod]
47 public void JoinFailTest ()
48 {
42 public void JoinFailTest() {
49 43 var p = new Promise<int> ();
50 44 p.Reject (new ApplicationException ("failed"));
51 45
@@ -60,8 +54,7 namespace Implab.Tests
60 54 }
61 55
62 56 [TestMethod]
63 public void MapTest ()
64 {
57 public void MapTest() {
65 58 var p = new Promise<int> ();
66 59
67 60 var p2 = p.Map (x => x.ToString ());
@@ -71,8 +64,18 namespace Implab.Tests
71 64 }
72 65
73 66 [TestMethod]
74 public void ChainTest ()
75 {
67 public void FixErrorTest() {
68 var p = new Promise<int>();
69
70 var p2 = p.Error(e => 101);
71
72 p.Reject(new Exception());
73
74 Assert.AreEqual(p2.Join(), 101);
75 }
76
77 [TestMethod]
78 public void ChainTest() {
76 79 var p1 = new Promise<int> ();
77 80
78 81 var p3 = p1.Chain (x => {
@@ -87,15 +90,244 namespace Implab.Tests
87 90 }
88 91
89 92 [TestMethod]
90 public void PoolTest ()
91 {
93 public void PoolTest() {
92 94 var pid = Thread.CurrentThread.ManagedThreadId;
93 var p = AsyncPool.Invoke (() => {
94 return Thread.CurrentThread.ManagedThreadId;
95 });
95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
96 96
97 97 Assert.AreNotEqual (pid, p.Join ());
98 98 }
99
100 [TestMethod]
101 public void WorkerPoolSizeTest() {
102 var pool = new WorkerPool(5, 10, 0);
103
104 Assert.AreEqual(5, pool.ThreadCount);
105
106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
109
110 Assert.AreEqual(5, pool.ThreadCount);
111
112 for (int i = 0; i < 100; i++)
113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
114 Thread.Sleep(100);
115 Assert.AreEqual(10, pool.ThreadCount);
116
117 pool.Dispose();
118 }
119
120 [TestMethod]
121 public void WorkerPoolCorrectTest() {
122 var pool = new WorkerPool(0,1000,100);
123
124 int iterations = 1000;
125 int pending = iterations;
126 var stop = new ManualResetEvent(false);
127
128 var count = 0;
129 for (int i = 0; i < iterations; i++) {
130 pool
131 .Invoke(() => 1)
132 .Then(x => Interlocked.Add(ref count, x))
133 .Then(x => Math.Log10(x))
134 .Anyway(() => {
135 Interlocked.Decrement(ref pending);
136 if (pending == 0)
137 stop.Set();
138 });
139 }
140
141 stop.WaitOne();
142
143 Assert.AreEqual(iterations, count);
144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
145 pool.Dispose();
146
147 }
148
149 [TestMethod]
150 public void WorkerPoolDisposeTest() {
151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
153 pool.Dispose();
154 Thread.Sleep(100);
155 Assert.AreEqual(0, pool.ThreadCount);
156 pool.Dispose();
157 }
158
159 [TestMethod]
160 public void MTQueueTest() {
161 var queue = new MTQueue<int>();
162 int res;
163
164 queue.Enqueue(10);
165 Assert.IsTrue(queue.TryDequeue(out res));
166 Assert.AreEqual(10, res);
167 Assert.IsFalse(queue.TryDequeue(out res));
168
169 for (int i = 0; i < 1000; i++)
170 queue.Enqueue(i);
171
172 for (int i = 0; i < 1000; i++) {
173 queue.TryDequeue(out res);
174 Assert.AreEqual(i, res);
175 }
176
177 int writers = 0;
178 int readers = 0;
179 var stop = new ManualResetEvent(false);
180 int total = 0;
181
182 int itemsPerWriter = 1000;
183 int writersCount = 3;
184
185 for (int i = 0; i < writersCount; i++) {
186 Interlocked.Increment(ref writers);
187 var wn = i;
188 AsyncPool
189 .InvokeNewThread(() => {
190 for (int ii = 0; ii < itemsPerWriter; ii++) {
191 queue.Enqueue(1);
192 }
193 return 1;
194 })
195 .Anyway(() => Interlocked.Decrement(ref writers));
196 }
197
198 for (int i = 0; i < 10; i++) {
199 Interlocked.Increment(ref readers);
200 var wn = i;
201 AsyncPool
202 .InvokeNewThread(() => {
203 int t;
204 do {
205 while (queue.TryDequeue(out t))
206 Interlocked.Add(ref total, t);
207 } while (writers > 0);
208 return 1;
209 })
210 .Anyway(() => {
211 Interlocked.Decrement(ref readers);
212 if (readers == 0)
213 stop.Set();
214 });
215 }
216
217 stop.WaitOne();
218
219 Assert.AreEqual(itemsPerWriter * writersCount, total);
220 }
221
222 [TestMethod]
223 public void ParallelMapTest() {
224
225 int count = 100000;
226
227 double[] args = new double[count];
228 var rand = new Random();
229
230 for (int i = 0; i < count; i++)
231 args[i] = rand.NextDouble();
232
233 var t = Environment.TickCount;
234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
235
236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
237
238 t = Environment.TickCount;
239 for (int i = 0; i < count; i++)
240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
242 }
243
244 [TestMethod]
245 public void ChainedMapTest() {
246
247 using (var pool = new WorkerPool(8,100,0)) {
248 int count = 10000;
249
250 double[] args = new double[count];
251 var rand = new Random();
252
253 for (int i = 0; i < count; i++)
254 args[i] = rand.NextDouble();
255
256 var t = Environment.TickCount;
257 var res = args
258 .ChainedMap(
259 x => pool.Invoke(
260 () => Math.Sin(x * x)
261 ),
262 4
263 )
264 .Join();
265
266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
267
268 t = Environment.TickCount;
269 for (int i = 0; i < count; i++)
270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 }
274 }
275
276 [TestMethod]
277 public void ParallelForEachTest() {
278
279 int count = 100000;
280
281 int[] args = new int[count];
282 var rand = new Random();
283
284 for (int i = 0; i < count; i++)
285 args[i] = (int)(rand.NextDouble() * 100);
286
287 int result = 0;
288
289 var t = Environment.TickCount;
290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
291
292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
293
294 int result2 = 0;
295
296 t = Environment.TickCount;
297 for (int i = 0; i < count; i++)
298 result2 += args[i];
299 Assert.AreEqual(result2, result);
300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
301 }
302
303 [TestMethod]
304 public void ComplexCase1Test() {
305 var flags = new bool[3];
306
307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
308
309 var p = PromiseHelper
310 .Sleep(200, "Alan")
311 .Cancelled(() => flags[0] = true)
312 .Chain(x =>
313 PromiseHelper
314 .Sleep(200, "Hi, " + x)
315 .Map(y => y)
316 .Cancelled(() => flags[1] = true)
317 )
318 .Cancelled(() => flags[2] = true);
319 Thread.Sleep(300);
320 p.Cancel();
321 try {
322 Assert.AreEqual(p.Join(), "Hi, Alan");
323 Assert.Fail("Shouldn't get here");
324 } catch (OperationCanceledException) {
325 }
326
327 Assert.IsFalse(flags[0]);
328 Assert.IsTrue(flags[1]);
329 Assert.IsTrue(flags[2]);
330 }
99 331 }
100 332 }
101 333
@@ -46,6 +46,7
46 46 </ItemGroup>
47 47 <ItemGroup>
48 48 <Compile Include="AsyncTests.cs" />
49 <Compile Include="PromiseHelper.cs" />
49 50 <Compile Include="Properties\AssemblyInfo.cs" />
50 51 </ItemGroup>
51 52 <ItemGroup>
1 NO CONTENT: modified file, binary diff hidden
@@ -32,13 +32,25
32 32 <Reference Include="System" />
33 33 </ItemGroup>
34 34 <ItemGroup>
35 <Compile Include="ICancellable.cs" />
36 <Compile Include="IProgressHandler.cs" />
37 <Compile Include="IProgressNotifier.cs" />
38 <Compile Include="IPromise.cs" />
39 <Compile Include="ITaskController.cs" />
40 <Compile Include="ManagedPromise.cs" />
41 <Compile Include="Parallels\DispatchPool.cs" />
42 <Compile Include="Parallels\ArrayTraits.cs" />
43 <Compile Include="Parallels\MTQueue.cs" />
44 <Compile Include="Parallels\WorkerPool.cs" />
45 <Compile Include="PromiseState.cs" />
46 <Compile Include="TaskController.cs" />
47 <Compile Include="ProgressInitEventArgs.cs" />
35 48 <Compile Include="Properties\AssemblyInfo.cs" />
36 49 <Compile Include="Promise.cs" />
37 <Compile Include="AsyncPool.cs" />
50 <Compile Include="Parallels\AsyncPool.cs" />
38 51 <Compile Include="Safe.cs" />
52 <Compile Include="ValueEventArgs.cs" />
39 53 </ItemGroup>
40 54 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
41 <ItemGroup>
42 <Folder Include="Parallels\" />
43 </ItemGroup>
55 <ItemGroup />
44 56 </Project> No newline at end of file
@@ -1,18 +1,16
1 1 using System;
2 2 using System.Collections.Generic;
3 using System.Linq;
4 3 using System.Reflection;
5 using System.Text;
6 4 using System.Diagnostics;
7 5 using System.Threading;
8 6
9 7 namespace Implab {
10 8
11 9 public delegate void ErrorHandler(Exception e);
12
13 public delegate void ResultHandler<T>(T result);
14 public delegate TNew ResultMapper<TSrc, TNew>(TSrc result);
15 public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result);
10 public delegate T ErrorHandler<out T>(Exception e);
11 public delegate void ResultHandler<in T>(T result);
12 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
13 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
16 14
17 15 /// <summary>
18 16 /// Класс для асинхронного получения результатов. Так называемое "обещание".
@@ -48,23 +46,23 namespace Implab {
48 46 /// только инициатор обещания иначе могут возникнуть противоречия.
49 47 /// </para>
50 48 /// </remarks>
51 public class Promise<T> {
49 public class Promise<T> : IPromise {
52 50
53 51 struct ResultHandlerInfo {
54 52 public ResultHandler<T> resultHandler;
55 53 public ErrorHandler errorHandler;
56 54 }
57 55
58 enum State {
59 Unresolved,
60 Resolving,
61 Resolved,
62 Cancelled
63 }
56 readonly IPromise m_parent;
57
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
64 60
65 LinkedList<ResultHandlerInfo> m_handlersChain = new LinkedList<ResultHandlerInfo>();
66 State m_state;
67 bool m_cancellable;
61 readonly object m_lock = new Object();
62 readonly bool m_cancellable;
63 int m_childrenCount = 0;
64
65 PromiseState m_state;
68 66 T m_result;
69 67 Exception m_error;
70 68
@@ -72,13 +70,17 namespace Implab {
72 70 m_cancellable = true;
73 71 }
74 72
75 /// <summary>
76 /// Событие, возникающее при отмене асинхронной операции.
77 /// </summary>
78 /// <description>
79 /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить.
80 /// </description>
81 public event EventHandler Cancelled;
73 public Promise(IPromise parent, bool cancellable) {
74 m_cancellable = cancellable;
75 m_parent = parent;
76 if (parent != null)
77 parent.HandleCancelled(InternalCancel);
78 }
79
80 void InternalCancel() {
81 // don't try to cancel parent :)
82 Cancel(false);
83 }
82 84
83 85 /// <summary>
84 86 /// Выполняет обещание, сообщая об успешном выполнении.
@@ -86,38 +88,39 namespace Implab {
86 88 /// <param name="result">Результат выполнения.</param>
87 89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
88 90 public void Resolve(T result) {
89 lock (this) {
90 if (m_state == State.Cancelled)
91 lock (m_lock) {
92 if (m_state == PromiseState.Cancelled)
91 93 return;
92 if (m_state != State.Unresolved)
94 if (m_state != PromiseState.Unresolved)
93 95 throw new InvalidOperationException("The promise is already resolved");
94 96 m_result = result;
95 m_state = State.Resolving;
97 m_state = PromiseState.Resolved;
96 98 }
97 99
98 ResultHandlerInfo handler;
99 while (FetchNextHandler(out handler))
100 InvokeHandler(handler);
100 OnStateChanged();
101 101 }
102 102
103 103 /// <summary>
104 104 /// Выполняет обещание, сообщая об ошибке
105 105 /// </summary>
106 /// <remarks>
107 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
108 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
109 /// будут проигнорированы.
110 /// </remarks>
106 111 /// <param name="error">Исключение возникшее при выполнении операции</param>
107 112 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
108 113 public void Reject(Exception error) {
109 lock (this) {
110 if (m_state == State.Cancelled)
114 lock (m_lock) {
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
111 116 return;
112 if (m_state != State.Unresolved)
117 if (m_state != PromiseState.Unresolved)
113 118 throw new InvalidOperationException("The promise is already resolved");
114 119 m_error = error;
115 m_state = State.Resolving;
120 m_state = PromiseState.Rejected;
116 121 }
117 122
118 ResultHandlerInfo handler;
119 while (FetchNextHandler(out handler))
120 InvokeHandler(handler);
123 OnStateChanged();
121 124 }
122 125
123 126 /// <summary>
@@ -125,47 +128,31 namespace Implab {
125 128 /// </summary>
126 129 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
127 130 public bool Cancel() {
128 lock (this) {
129 if (m_state == State.Unresolved && m_cancellable) {
130 m_state = State.Cancelled;
131 EventHandler temp = Cancelled;
132
133 if (temp != null)
134 temp(this, new EventArgs());
135
136 return true;
137 } else
138 return false;
139 }
131 return Cancel(true);
140 132 }
141 133
142 134 /// <summary>
143 /// Добавляет обработчики событий выполнения обещания.
135 /// Adds new handlers to this promise.
144 136 /// </summary>
145 /// <param name="success">Обработчик успешного выполнения обещания.
146 /// Данному обработчику будет передан результат выполнения операции.</param>
147 /// <param name="error">Обработчик ошибки. Данный обработчик получит
148 /// исключение возникшее при выполнении операции.</param>
149 /// <returns>Само обещание</returns>
137 /// <param name="success">The handler of the successfully completed operation.
138 /// This handler will recieve an operation result as a parameter.</param>
139 /// <param name="error">Handles an exception that may occur during the operation.</param>
140 /// <returns>The new promise chained to this one.</returns>
150 141 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
151 142 if (success == null && error == null)
152 143 return this;
153 144
154 var medium = new Promise<T>();
145 var medium = new Promise<T>(this, true);
155 146
156 147 var handlerInfo = new ResultHandlerInfo();
157 148
158 149 if (success != null)
159 150 handlerInfo.resultHandler = x => {
160 try {
161 151 success(x);
162 152 medium.Resolve(x);
163 } catch (Exception e) {
164 medium.Reject(e);
165 }
166 153 };
167 154 else
168 handlerInfo.resultHandler = x => medium.Resolve(x);
155 handlerInfo.resultHandler = medium.Resolve;
169 156
170 157 if (error != null)
171 158 handlerInfo.errorHandler = x => {
@@ -175,21 +162,106 namespace Implab {
175 162 medium.Reject(x);
176 163 };
177 164 else
178 handlerInfo.errorHandler = x => medium.Reject(x);
165 handlerInfo.errorHandler = medium.Reject;
166
167 AddHandler(handlerInfo);
168
169 return medium;
170 }
171
172 /// <summary>
173 /// Adds new handlers to this promise.
174 /// </summary>
175 /// <param name="success">The handler of the successfully completed operation.
176 /// This handler will recieve an operation result as a parameter.</param>
177 /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
178 /// <returns>The new promise chained to this one.</returns>
179 public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
180 if (success == null && error == null)
181 return this;
182
183 var medium = new Promise<T>(this, true);
184
185 var handlerInfo = new ResultHandlerInfo();
186
187 if (success != null)
188 handlerInfo.resultHandler = x => {
189 success(x);
190 medium.Resolve(x);
191 };
192 else
193 handlerInfo.resultHandler = medium.Resolve;
194
195 if (error != null)
196 handlerInfo.errorHandler = x => {
197 try {
198 medium.Resolve(error(x));
199 } catch { }
200 medium.Reject(x);
201 };
202 else
203 handlerInfo.errorHandler = medium.Reject;
179 204
180 205 AddHandler(handlerInfo);
181 206
182 207 return medium;
183 208 }
184 209
210
185 211 public Promise<T> Then(ResultHandler<T> success) {
186 return Then(success, null);
212 if (success == null)
213 return this;
214
215 var medium = new Promise<T>(this, true);
216
217 var handlerInfo = new ResultHandlerInfo();
218
219 if (success != null)
220 handlerInfo.resultHandler = x => {
221 success(x);
222 medium.Resolve(x);
223 };
224 else
225 handlerInfo.resultHandler = medium.Resolve;
226
227 handlerInfo.errorHandler = medium.Reject;
228
229 AddHandler(handlerInfo);
230
231 return medium;
187 232 }
188 233
189 234 public Promise<T> Error(ErrorHandler error) {
190 235 return Then(null, error);
191 236 }
192 237
238 /// <summary>
239 /// Handles error and allows to keep the promise.
240 /// </summary>
241 /// <remarks>
242 /// If the specified handler throws an exception, this exception will be used to reject the promise.
243 /// </remarks>
244 /// <param name="handler">The error handler which returns the result of the promise.</param>
245 /// <returns>New promise.</returns>
246 public Promise<T> Error(ErrorHandler<T> handler) {
247 if (handler == null)
248 return this;
249
250 var medium = new Promise<T>(this, true);
251
252 AddHandler(new ResultHandlerInfo {
253 errorHandler = e => {
254 try {
255 medium.Resolve(handler(e));
256 } catch (Exception e2) {
257 medium.Reject(e2);
258 }
259 }
260 });
261
262 return medium;
263 }
264
193 265 public Promise<T> Anyway(Action handler) {
194 266 if (handler == null)
195 267 return this;
@@ -198,6 +270,7 namespace Implab {
198 270
199 271 AddHandler(new ResultHandlerInfo {
200 272 resultHandler = x => {
273 // to avoid handler being called multiple times we handle exception by ourselfs
201 274 try {
202 275 handler();
203 276 medium.Resolve(x);
@@ -229,20 +302,15 namespace Implab {
229 302 throw new ArgumentNullException("mapper");
230 303
231 304 // создаем прицепленное обещание
232 Promise<TNew> chained = new Promise<TNew>();
305 var chained = new Promise<TNew>();
233 306
234 307 AddHandler(new ResultHandlerInfo() {
235 resultHandler = delegate(T result) {
236 try {
237 // если преобразование выдаст исключение, то сработает reject сцепленного deferred
238 chained.Resolve(mapper(result));
239 } catch (Exception e) {
240 chained.Reject(e);
241 }
242 },
308 resultHandler = result => chained.Resolve(mapper(result)),
243 309 errorHandler = delegate(Exception e) {
244 310 if (error != null)
311 try {
245 312 error(e);
313 } catch { }
246 314 // в случае ошибки нужно передать исключение дальше по цепочке
247 315 chained.Reject(e);
248 316 }
@@ -271,19 +339,21 namespace Implab {
271 339 // создать посредника, к которому будут подвызяваться следующие обработчики.
272 340 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
273 341 // передать через него результаты работы.
274 Promise<TNew> medium = new Promise<TNew>();
342 var medium = new Promise<TNew>(this, true);
275 343
276 AddHandler(new ResultHandlerInfo() {
344 AddHandler(new ResultHandlerInfo {
277 345 resultHandler = delegate(T result) {
278 try {
279 chained(result).Then(
346 if (medium.State == PromiseState.Cancelled)
347 return;
348
349 var promise = chained(result);
350
351 // notify chained operation that it's not needed
352 medium.Cancelled(() => promise.Cancel());
353 promise.Then(
280 354 x => medium.Resolve(x),
281 355 e => medium.Reject(e)
282 356 );
283 } catch (Exception e) {
284 // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке
285 medium.Reject(e);
286 }
287 357 },
288 358 errorHandler = delegate(Exception e) {
289 359 if (error != null)
@@ -300,6 +370,22 namespace Implab {
300 370 return Chain(chained, null);
301 371 }
302 372
373 public Promise<T> Cancelled(Action handler) {
374 if (handler == null)
375 return this;
376 lock (m_lock) {
377 if (m_state == PromiseState.Unresolved)
378 m_cancelHandlers.AddLast(handler);
379 else if (m_state == PromiseState.Cancelled)
380 handler();
381 }
382 return this;
383 }
384
385 public void HandleCancelled(Action handler) {
386 Cancelled(handler);
387 }
388
303 389 /// <summary>
304 390 /// Дожидается отложенного обещания и в случае успеха, возвращает
305 391 /// его, результат, в противном случае бросает исключение.
@@ -322,52 +408,37 namespace Implab {
322 408 /// <param name="timeout">Время ожидания</param>
323 409 /// <returns>Результат выполнения обещания</returns>
324 410 public T Join(int timeout) {
325 ManualResetEvent evt = new ManualResetEvent(false);
411 var evt = new ManualResetEvent(false);
326 412 Anyway(() => evt.Set());
413 Cancelled(() => evt.Set());
327 414
328 415 if (!evt.WaitOne(timeout, true))
329 416 throw new TimeoutException();
330 417
331 if (m_error != null)
418 switch (State) {
419 case PromiseState.Resolved:
420 return m_result;
421 case PromiseState.Cancelled:
422 throw new OperationCanceledException();
423 case PromiseState.Rejected:
332 424 throw new TargetInvocationException(m_error);
333 else
334 return m_result;
425 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
427 }
335 428 }
336 429
337 430 public T Join() {
338 431 return Join(Timeout.Infinite);
339 432 }
340 433
341 /// <summary>
342 /// Данный метод последовательно извлекает обработчики обещания и когда
343 /// их больше не осталось - ставит состояние "разрешено".
344 /// </summary>
345 /// <param name="handler">Информация об обработчике</param>
346 /// <returns>Признак того, что еще остались обработчики в очереди</returns>
347 bool FetchNextHandler(out ResultHandlerInfo handler) {
348 handler = default(ResultHandlerInfo);
349
350 lock (this) {
351 Debug.Assert(m_state == State.Resolving);
352
353 if (m_handlersChain.Count > 0) {
354 handler = m_handlersChain.First.Value;
355 m_handlersChain.RemoveFirst();
356 return true;
357 } else {
358 m_state = State.Resolved;
359 return false;
360 }
361 }
362 }
363
364 434 void AddHandler(ResultHandlerInfo handler) {
365 435 bool invokeRequired = false;
366 436
367 lock (this) {
368 if (m_state != State.Resolved)
369 m_handlersChain.AddLast(handler);
370 else
437 lock (m_lock) {
438 m_childrenCount++;
439 if (m_state == PromiseState.Unresolved) {
440 m_resultHandlers.AddLast(handler);
441 } else
371 442 invokeRequired = true;
372 443 }
373 444
@@ -377,21 +448,102 namespace Implab {
377 448 }
378 449
379 450 void InvokeHandler(ResultHandlerInfo handler) {
380 if (m_error == null) {
451 switch (m_state) {
452 case PromiseState.Resolved:
381 453 try {
382 454 if (handler.resultHandler != null)
383 455 handler.resultHandler(m_result);
456 } catch (Exception e) {
457 try {
458 if (handler.errorHandler != null)
459 handler.errorHandler(e);
384 460 } catch { }
385 461 }
386
387 if (m_error != null) {
462 break;
463 case PromiseState.Rejected:
388 464 try {
389 465 if (handler.errorHandler != null)
390 466 handler.errorHandler(m_error);
391 467 } catch { }
468 break;
469 default:
470 // do nothing
471 return;
392 472 }
393 473 }
394 474
475 protected virtual void OnStateChanged() {
476 switch (m_state) {
477 case PromiseState.Resolved:
478 foreach (var resultHandlerInfo in m_resultHandlers)
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
506 }
507
508
509
510 public bool IsExclusive {
511 get {
512 lock (m_lock) {
513 return m_childrenCount <= 1;
514 }
515 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
523 }
524 }
525
526 protected bool Cancel(bool dependencies) {
527 bool result;
528
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
539 OnStateChanged();
540
541 if (dependencies && m_parent != null && m_parent.IsExclusive) {
542 m_parent.Cancel();
543 }
544
545 return result;
546 }
395 547
396 548 }
397 549 }
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now