##// END OF EJS Templates
Слияние с promises
cin -
r18:0c924dff5498 merge default
parent child
Show More
@@ -0,0 +1,17
1 using Implab.Parallels;
2 using System;
3 using System.Collections.Generic;
4 using System.Linq;
5 using System.Text;
6 using System.Threading;
7
8 namespace Implab.Test {
9 class PromiseHelper {
10 public static Promise<T> Sleep<T>(int timeout, T retVal) {
11 return AsyncPool.Invoke(() => {
12 Thread.Sleep(timeout);
13 return retVal;
14 });
15 }
16 }
17 }
1 NO CONTENT: new file 100644, binary diff hidden
NO CONTENT: new file 100644, binary diff hidden
@@ -0,0 +1,10
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ICancellable {
8 bool Cancel();
9 }
10 }
@@ -0,0 +1,18
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface IProgressHandler {
8 string Message {
9 get;
10 set;
11 }
12 float CurrentProgress {
13 get;
14 set;
15 }
16 void InitProgress(float current, float max, string message);
17 }
18 }
@@ -0,0 +1,14
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public interface IProgressNotifier
9 {
10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
12 event EventHandler<ProgressInitEventArgs> ProgressInit;
13 }
14 }
@@ -0,0 +1,33
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public interface IPromise: ICancellable
9 {
10 /// <summary>
11 /// Check whereather the promise has no more than one dependent promise.
12 /// </summary>
13 bool IsExclusive
14 {
15 get;
16 }
17
18 /// <summary>
19 /// The current state of the promise.
20 /// </summary>
21 PromiseState State
22 {
23 get;
24 }
25
26 /// <summary>
27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
28 /// handler will be invoked immediatelly.
29 /// </summary>
30 /// <param name="handler">The handler</param>
31 void HandleCancelled(Action handler);
32 }
33 }
@@ -0,0 +1,12
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ITaskController: IProgressHandler {
8 bool Cancelled {
9 get;
10 }
11 }
12 }
@@ -0,0 +1,11
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7
8 /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
9
10 }*/
11 }
@@ -0,0 +1,171
1 using System;
2 using System.Collections.Generic;
3 using System.Diagnostics;
4 using System.Linq;
5 using System.Text;
6 using System.Threading;
7
8 namespace Implab.Parallels {
9 public static class ArrayTraits {
10 class ArrayIterator<TSrc> : DispatchPool<int> {
11 readonly Action<TSrc> m_action;
12 readonly TSrc[] m_source;
13 readonly Promise<int> m_promise = new Promise<int>();
14
15 int m_pending;
16 int m_next;
17
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 : base(threads) {
20
21 Debug.Assert(source != null);
22 Debug.Assert(action != null);
23
24 m_next = 0;
25 m_source = source;
26 m_pending = source.Length;
27 m_action = action;
28
29 m_promise.Anyway(() => Dispose());
30 m_promise.Cancelled(() => Dispose());
31
32 InitPool();
33 }
34
35 public Promise<int> Promise {
36 get {
37 return m_promise;
38 }
39 }
40
41 protected override bool TryDequeue(out int unit) {
42 unit = Interlocked.Increment(ref m_next) - 1;
43 return unit >= m_source.Length ? false : true;
44 }
45
46 protected override void InvokeUnit(int unit) {
47 try {
48 m_action(m_source[unit]);
49 var pending = Interlocked.Decrement(ref m_pending);
50 if (pending == 0)
51 m_promise.Resolve(m_source.Length);
52 } catch (Exception e) {
53 m_promise.Reject(e);
54 }
55 }
56 }
57
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 readonly Func<TSrc, TDst> m_transform;
60 readonly TSrc[] m_source;
61 readonly TDst[] m_dest;
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
63
64 int m_pending;
65 int m_next;
66
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 : base(threads) {
69
70 Debug.Assert (source != null);
71 Debug.Assert( transform != null);
72
73 m_next = 0;
74 m_source = source;
75 m_dest = new TDst[source.Length];
76 m_pending = source.Length;
77 m_transform = transform;
78
79 m_promise.Anyway(() => Dispose());
80 m_promise.Cancelled(() => Dispose());
81
82 InitPool();
83 }
84
85 public Promise<TDst[]> Promise {
86 get {
87 return m_promise;
88 }
89 }
90
91 protected override bool TryDequeue(out int unit) {
92 unit = Interlocked.Increment(ref m_next) - 1;
93 return unit >= m_source.Length ? false : true;
94 }
95
96 protected override void InvokeUnit(int unit) {
97 try {
98 m_dest[unit] = m_transform(m_source[unit]);
99 var pending = Interlocked.Decrement(ref m_pending);
100 if (pending == 0)
101 m_promise.Resolve(m_dest);
102 } catch (Exception e) {
103 m_promise.Reject(e);
104 }
105 }
106 }
107
108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 if (source == null)
110 throw new ArgumentNullException("source");
111 if (transform == null)
112 throw new ArgumentNullException("transform");
113
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 return mapper.Promise;
116 }
117
118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 if (source == null)
120 throw new ArgumentNullException("source");
121 if (action == null)
122 throw new ArgumentNullException("action");
123
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 return iter.Promise;
126 }
127
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 if (source == null)
130 throw new ArgumentNullException("source");
131 if (transform == null)
132 throw new ArgumentNullException("transform");
133 if (threads <= 0)
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135
136 var promise = new Promise<TDst[]>();
137 var res = new TDst[source.Length];
138 var pending = source.Length;
139 var semaphore = new Semaphore(threads, threads);
140
141 AsyncPool.InvokeNewThread(() => {
142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
144 break; // stop processing in case of error or cancellation
145 var idx = i;
146 semaphore.WaitOne();
147 try {
148 var p1 = transform(source[i]);
149 p1.Anyway(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
151 p1.Then(
152 x => {
153 res[idx] = x;
154 var left = Interlocked.Decrement(ref pending);
155 if (left == 0)
156 promise.Resolve(res);
157 },
158 e => promise.Reject(e)
159 );
160
161 } catch (Exception e) {
162 promise.Reject(e);
163 }
164 }
165 return 0;
166 });
167
168 return promise.Anyway(() => semaphore.Dispose());
169 }
170 }
171 }
@@ -0,0 +1,44
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
5 /// <summary>
6 /// Класс для распаралеливания задач.
7 /// </summary>
8 /// <remarks>
9 /// Используя данный класс и лямда выражения можно распараллелить
10 /// вычисления, для этого используется концепция обещаний.
11 /// </remarks>
12 public static class AsyncPool {
13
14 public static Promise<T> Invoke<T>(Func<T> func) {
15 var p = new Promise<T>();
16
17 ThreadPool.QueueUserWorkItem(param => {
18 try {
19 p.Resolve(func());
20 } catch(Exception e) {
21 p.Reject(e);
22 }
23 });
24
25 return p;
26 }
27
28 public static Promise<T> InvokeNewThread<T>(Func<T> func) {
29 var p = new Promise<T>();
30
31 var worker = new Thread(() => {
32 try {
33 p.Resolve(func());
34 } catch (Exception e) {
35 p.Reject(e);
36 }
37 });
38 worker.IsBackground = true;
39 worker.Start();
40
41 return p;
42 }
43 }
44 }
@@ -0,0 +1,238
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
13 int m_maxRunningThreads = 0;
14 int m_suspended = 0;
15 int m_exitRequired = 0;
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17
18 protected DispatchPool(int min, int max) {
19 if (min < 0)
20 throw new ArgumentOutOfRangeException("min");
21 if (max <= 0)
22 throw new ArgumentOutOfRangeException("max");
23
24 if (min > max)
25 min = max;
26 m_minThreads = min;
27 m_maxThreads = max;
28 }
29
30 protected DispatchPool(int threads)
31 : this(threads, threads) {
32 }
33
34 protected DispatchPool() {
35 int maxThreads, maxCP;
36 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
37
38 m_minThreads = 0;
39 m_maxThreads = maxThreads;
40 }
41
42 protected void InitPool() {
43 for (int i = 0; i < m_minThreads; i++)
44 StartWorker();
45 }
46
47 public int ThreadCount {
48 get {
49 return m_runningThreads;
50 }
51 }
52
53 public int MaxRunningThreads {
54 get {
55 return m_maxRunningThreads;
56 }
57 }
58
59 protected bool IsDisposed {
60 get {
61 return m_exitRequired != 0;
62 }
63 }
64
65 protected abstract bool TryDequeue(out TUnit unit);
66
67 protected virtual bool ExtendPool() {
68 if (m_suspended > 0) {
69 m_hasTasks.Set();
70 return true;
71 } else
72 return StartWorker();
73 }
74
75 /// <summary>
76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
77 /// </summary>
78 protected void WakePool() {
79 m_hasTasks.Set(); // wake sleeping thread;
80
81 if (AllocateThreadSlot(1)) {
82 var worker = new Thread(this.Worker);
83 worker.IsBackground = true;
84 worker.Start();
85 }
86 }
87
88 protected virtual void Suspend() {
89 m_hasTasks.WaitOne();
90 }
91
92 #region thread slots traits
93
94 bool AllocateThreadSlot() {
95 int current;
96 // use spins to allocate slot for the new thread
97 do {
98 current = m_runningThreads;
99 if (current >= m_maxThreads || m_exitRequired != 0)
100 // no more slots left or the pool has been disposed
101 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
103
104 UpdateMaxThreads(current + 1);
105
106 return true;
107 }
108
109 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
111 return false;
112
113 UpdateMaxThreads(desired);
114
115 return true;
116 }
117
118 bool ReleaseThreadSlot(out bool last) {
119 last = false;
120 int current;
121 // use spins to release slot for the new thread
122 do {
123 current = m_runningThreads;
124 if (current <= m_minThreads && m_exitRequired == 0)
125 // the thread is reserved
126 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
128
129 last = (current == 1);
130
131 return true;
132 }
133
134 /// <summary>
135 /// releases thread slot unconditionally, used during cleanup
136 /// </summary>
137 /// <returns>true - no more threads left</returns>
138 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads);
140 return left == 0;
141 }
142
143 void UpdateMaxThreads(int count) {
144 int max;
145 do {
146 max = m_maxRunningThreads;
147 if (max >= count)
148 break;
149 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
150 }
151
152 #endregion
153
154 bool StartWorker() {
155 if (AllocateThreadSlot()) {
156 // slot successfully allocated
157 var worker = new Thread(this.Worker);
158 worker.IsBackground = true;
159 worker.Start();
160
161 return true;
162 } else {
163 return false;
164 }
165 }
166
167 bool FetchTask(out TUnit unit) {
168 do {
169 // exit if requested
170 if (m_exitRequired != 0) {
171 // release the thread slot
172 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 m_hasTasks.Dispose();
174 else
175 m_hasTasks.Set(); // wake next worker
176 unit = default(TUnit);
177 return false;
178 }
179
180 // fetch task
181 if (TryDequeue(out unit)) {
182 ExtendPool();
183 return true;
184 }
185
186 //no tasks left, exit if the thread is no longer needed
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
197
198 return false;
199 }
200
201 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 // keep this thread and wait
204 Suspend();
205 Interlocked.Decrement(ref m_suspended);
206 } while (true);
207 }
208
209 protected abstract void InvokeUnit(TUnit unit);
210
211 void Worker() {
212 TUnit unit;
213 while (FetchTask(out unit))
214 InvokeUnit(unit);
215 }
216
217 protected virtual void Dispose(bool disposing) {
218 if (disposing) {
219 if (m_exitRequired == 0) {
220 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
221 return;
222
223 // wake sleeping threads
224 m_hasTasks.Set();
225 GC.SuppressFinalize(this);
226 }
227 }
228 }
229
230 public void Dispose() {
231 Dispose(true);
232 }
233
234 ~DispatchPool() {
235 Dispose(false);
236 }
237 }
238 }
@@ -0,0 +1,74
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab.Parallels {
8 public class MTQueue<T> {
9 class Node {
10 public Node(T value) {
11 this.value = value;
12 }
13 public readonly T value;
14 public Node next;
15 }
16
17 Node m_first;
18 Node m_last;
19
20 public void Enqueue(T value) {
21 var last = m_last;
22 var next = new Node(value);
23
24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
25 last = m_last;
26
27 if (last != null)
28 last.next = next;
29 else
30 m_first = next;
31 }
32
33 public bool TryDequeue(out T value) {
34 Node first;
35 Node next = null;
36 value = default(T);
37
38 do {
39 first = m_first;
40 if (first == null)
41 return false;
42 next = first.next;
43 if (next == null) {
44 // this is the last element,
45 // then try to update tail
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
48 if (m_last == null)
49 return false;
50 // tail has been changed, that means that we need to restart
51 continue;
52 }
53
54 // tail succesfully updated and first.next will never be changed
55 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
56 // but the writer may update the m_first since the m_last is null
57
58 // so we need to fix inconsistency by setting m_first to null, but if it already has been
59 // updated by a writer then we should just give up
60 Interlocked.CompareExchange(ref m_first, null, first);
61 break;
62
63 } else {
64 if (first == Interlocked.CompareExchange(ref m_first, next, first))
65 // head succesfully updated
66 break;
67 }
68 } while (true);
69
70 value = first.value;
71 return true;
72 }
73 }
74 }
@@ -0,0 +1,89
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public class WorkerPool : DispatchPool<Action> {
10
11 MTQueue<Action> m_queue = new MTQueue<Action>();
12 int m_queueLength = 0;
13 readonly int m_threshold = 1;
14
15 public WorkerPool(int minThreads, int maxThreads, int threshold)
16 : base(minThreads, maxThreads) {
17 m_threshold = threshold;
18 InitPool();
19 }
20
21 public WorkerPool(int minThreads, int maxThreads) :
22 base(minThreads, maxThreads) {
23 InitPool();
24 }
25
26 public WorkerPool(int threads)
27 : base(threads) {
28 InitPool();
29 }
30
31 public WorkerPool()
32 : base() {
33 InitPool();
34 }
35
36 public Promise<T> Invoke<T>(Func<T> task) {
37 if (task == null)
38 throw new ArgumentNullException("task");
39 if (IsDisposed)
40 throw new ObjectDisposedException(ToString());
41
42 var promise = new Promise<T>();
43
44 EnqueueTask(delegate() {
45 try {
46 promise.Resolve(task());
47 } catch (Exception e) {
48 promise.Reject(e);
49 }
50 });
51
52 return promise;
53 }
54
55 protected void EnqueueTask(Action unit) {
56 Debug.Assert(unit != null);
57 var len = Interlocked.Increment(ref m_queueLength);
58 m_queue.Enqueue(unit);
59
60 if(!ExtendPool())
61 WakePool();
62 }
63
64 protected override bool ExtendPool() {
65 if (m_queueLength <= m_threshold*ThreadCount)
66 // in this case we are in active thread and it request for additional workers
67 // satisfy it only when queue is longer than threshold
68 return false;
69 return base.ExtendPool();
70 }
71
72 protected override bool TryDequeue(out Action unit) {
73 if (m_queue.TryDequeue(out unit)) {
74 Interlocked.Decrement(ref m_queueLength);
75 return true;
76 }
77 return false;
78 }
79
80 protected override void InvokeUnit(Action unit) {
81 unit();
82 }
83
84 protected override void Suspend() {
85 if (m_queueLength == 0)
86 base.Suspend();
87 }
88 }
89 }
@@ -0,0 +1,36
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 [Serializable]
9 public class ProgressInitEventArgs: EventArgs
10 {
11 public float MaxProgress
12 {
13 get;
14 private set;
15 }
16
17 public float CurrentProgress
18 {
19 get;
20 private set;
21 }
22
23 public string Message
24 {
25 get;
26 private set;
27 }
28
29 public ProgressInitEventArgs(float current, float max, string message)
30 {
31 this.MaxProgress = max;
32 this.CurrentProgress = current;
33 this.Message = message;
34 }
35 }
36 }
@@ -0,0 +1,15
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public enum PromiseState
9 {
10 Unresolved,
11 Resolved,
12 Cancelled,
13 Rejected
14 }
15 }
@@ -0,0 +1,132
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab
8 {
9 /// <summary>
10 /// This class allows to interact with asyncronuos task.
11 /// </summary>
12 /// <remarks>
13 /// Members of this object are thread safe.
14 /// </remarks>
15 class TaskController: IProgressNotifier, ITaskController, ICancellable
16 {
17 readonly object m_lock;
18 string m_message;
19
20 float m_current;
21 float m_max;
22
23 bool m_cancelled;
24
25 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
26 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
27 public event EventHandler<ProgressInitEventArgs> ProgressInit;
28
29 public TaskController()
30 {
31 m_lock = new Object();
32 }
33
34 public string Message
35 {
36 get
37 {
38 lock (m_lock)
39 return m_message;
40 }
41 set
42 {
43 lock (m_lock)
44 {
45 m_message = value;
46 OnMessageUpdated();
47 }
48 }
49 }
50
51 public float CurrentProgress
52 {
53 get
54 {
55 lock (m_lock)
56 return m_current;
57 }
58 set
59 {
60 lock (m_lock)
61 {
62 var prev = m_current;
63 m_current = value;
64 if (m_current >= m_max)
65 m_current = m_max;
66 if (m_current != prev)
67 OnProgressUpdated();
68 }
69 }
70 }
71
72 public void InitProgress(float current, float max, string message)
73 {
74 if (max < 0)
75 throw new ArgumentOutOfRangeException("max");
76 if (current < 0 || current > max)
77 throw new ArgumentOutOfRangeException("current");
78
79 lock(m_lock) {
80 m_current = current;
81 m_max = max;
82 m_message = message;
83 OnProgressInit();
84 }
85 }
86
87 public bool Cancelled {
88 get {
89 lock (m_lock)
90 return m_cancelled;
91 }
92 }
93
94 public bool Cancel() {
95 lock (m_lock) {
96 if (!m_cancelled) {
97 m_cancelled = true;
98 return true;
99 } else {
100 return false;
101 }
102 }
103 }
104
105 protected virtual void OnMessageUpdated()
106 {
107 var temp = MessageUpdated;
108 if (temp != null)
109 {
110 temp(this, new ValueEventArgs<string>(m_message));
111 }
112 }
113
114 protected virtual void OnProgressUpdated()
115 {
116 var temp = ProgressUpdated;
117 if (temp != null)
118 {
119 temp(this,new ValueEventArgs<float>(m_current));
120 }
121 }
122
123 protected virtual void OnProgressInit()
124 {
125 var temp = ProgressInit;
126 if (temp != null)
127 {
128 temp(this, new ProgressInitEventArgs(m_current,m_max, m_message));
129 }
130 }
131 }
132 }
@@ -0,0 +1,21
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 [Serializable]
9 public class ValueEventArgs<T>: EventArgs
10 {
11 public ValueEventArgs(T value)
12 {
13 this.Value = value;
14 }
15 public T Value
16 {
17 get;
18 private set;
19 }
20 }
21 }
@@ -10,3 +10,4 Implab.Fx/obj/
10 Implab.Fx/bin/
10 Implab.Fx/bin/
11 Implab.Fx.Test/bin/
11 Implab.Fx.Test/bin/
12 Implab.Fx.Test/obj/
12 Implab.Fx.Test/obj/
13 _ReSharper.Implab/
@@ -1,101 +1,333
1 using System;
1 using System;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using Implab;
4 using System.Reflection;
3 using System.Reflection;
5 using System.Threading;
4 using System.Threading;
5 using Implab.Parallels;
6
6
7 namespace Implab.Tests
7 namespace Implab.Test {
8 {
8 [TestClass]
9 [TestClass]
9 public class AsyncTests {
10 public class AsyncTests
10 [TestMethod]
11 {
11 public void ResolveTest() {
12 [TestMethod]
12 int res = -1;
13 public void ResolveTest ()
13 var p = new Promise<int>();
14 {
14 p.Then(x => res = x);
15 int res = -1;
15 p.Resolve(100);
16 var p = new Promise<int> ();
16
17 p.Then (x => res = x);
17 Assert.AreEqual(res, 100);
18 p.Resolve (100);
18 }
19
20 [TestMethod]
21 public void RejectTest() {
22 int res = -1;
23 Exception err = null;
24
25 var p = new Promise<int>();
26 p.Then(x => res = x, e => err = e);
27 p.Reject(new ApplicationException("error"));
28
29 Assert.AreEqual(res, -1);
30 Assert.AreEqual(err.Message, "error");
31
32 }
33
34 [TestMethod]
35 public void JoinSuccessTest() {
36 var p = new Promise<int>();
37 p.Resolve(100);
38 Assert.AreEqual(p.Join(), 100);
39 }
19
40
20 Assert.AreEqual (res, 100);
41 [TestMethod]
21 }
42 public void JoinFailTest() {
43 var p = new Promise<int>();
44 p.Reject(new ApplicationException("failed"));
45
46 try {
47 p.Join();
48 throw new ApplicationException("WRONG!");
49 } catch (TargetInvocationException err) {
50 Assert.AreEqual(err.InnerException.Message, "failed");
51 } catch {
52 Assert.Fail("Got wrong excaption");
53 }
54 }
55
56 [TestMethod]
57 public void MapTest() {
58 var p = new Promise<int>();
59
60 var p2 = p.Map(x => x.ToString());
61 p.Resolve(100);
62
63 Assert.AreEqual(p2.Join(), "100");
64 }
65
66 [TestMethod]
67 public void FixErrorTest() {
68 var p = new Promise<int>();
69
70 var p2 = p.Error(e => 101);
71
72 p.Reject(new Exception());
73
74 Assert.AreEqual(p2.Join(), 101);
75 }
22
76
23 [TestMethod]
77 [TestMethod]
24 public void RejectTest ()
78 public void ChainTest() {
25 {
79 var p1 = new Promise<int>();
26 int res = -1;
80
27 Exception err = null;
81 var p3 = p1.Chain(x => {
82 var p2 = new Promise<string>();
83 p2.Resolve(x.ToString());
84 return p2;
85 });
86
87 p1.Resolve(100);
28
88
29 var p = new Promise<int> ();
89 Assert.AreEqual(p3.Join(), "100");
30 p.Then (x => res = x, e => err = e);
90 }
31 p.Reject (new ApplicationException ("error"));
32
91
33 Assert.AreEqual (res, -1);
92 [TestMethod]
34 Assert.AreEqual (err.Message, "error");
93 public void PoolTest() {
94 var pid = Thread.CurrentThread.ManagedThreadId;
95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
35
96
36 }
97 Assert.AreNotEqual(pid, p.Join());
98 }
37
99
38 [TestMethod]
100 [TestMethod]
39 public void JoinSuccessTest ()
101 public void WorkerPoolSizeTest() {
40 {
102 var pool = new WorkerPool(5, 10, 0);
41 var p = new Promise<int> ();
103
42 p.Resolve (100);
104 Assert.AreEqual(5, pool.ThreadCount);
43 Assert.AreEqual (p.Join (), 100);
105
44 }
106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
109
110 Assert.AreEqual(5, pool.ThreadCount);
111
112 for (int i = 0; i < 100; i++)
113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
114 Thread.Sleep(100);
115 Assert.AreEqual(10, pool.ThreadCount);
116
117 pool.Dispose();
118 }
119
120 [TestMethod]
121 public void WorkerPoolCorrectTest() {
122 var pool = new WorkerPool(0,1000,100);
123
124 int iterations = 1000;
125 int pending = iterations;
126 var stop = new ManualResetEvent(false);
127
128 var count = 0;
129 for (int i = 0; i < iterations; i++) {
130 pool
131 .Invoke(() => 1)
132 .Then(x => Interlocked.Add(ref count, x))
133 .Then(x => Math.Log10(x))
134 .Anyway(() => {
135 Interlocked.Decrement(ref pending);
136 if (pending == 0)
137 stop.Set();
138 });
139 }
140
141 stop.WaitOne();
142
143 Assert.AreEqual(iterations, count);
144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
145 pool.Dispose();
146
147 }
148
149 [TestMethod]
150 public void WorkerPoolDisposeTest() {
151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
153 pool.Dispose();
154 Thread.Sleep(100);
155 Assert.AreEqual(0, pool.ThreadCount);
156 pool.Dispose();
157 }
45
158
46 [TestMethod]
159 [TestMethod]
47 public void JoinFailTest ()
160 public void MTQueueTest() {
48 {
161 var queue = new MTQueue<int>();
49 var p = new Promise<int> ();
162 int res;
50 p.Reject (new ApplicationException ("failed"));
163
164 queue.Enqueue(10);
165 Assert.IsTrue(queue.TryDequeue(out res));
166 Assert.AreEqual(10, res);
167 Assert.IsFalse(queue.TryDequeue(out res));
168
169 for (int i = 0; i < 1000; i++)
170 queue.Enqueue(i);
171
172 for (int i = 0; i < 1000; i++) {
173 queue.TryDequeue(out res);
174 Assert.AreEqual(i, res);
175 }
176
177 int writers = 0;
178 int readers = 0;
179 var stop = new ManualResetEvent(false);
180 int total = 0;
181
182 int itemsPerWriter = 1000;
183 int writersCount = 3;
51
184
52 try {
185 for (int i = 0; i < writersCount; i++) {
53 p.Join ();
186 Interlocked.Increment(ref writers);
54 throw new ApplicationException ("WRONG!");
187 var wn = i;
55 } catch (TargetInvocationException err) {
188 AsyncPool
56 Assert.AreEqual (err.InnerException.Message, "failed");
189 .InvokeNewThread(() => {
57 } catch {
190 for (int ii = 0; ii < itemsPerWriter; ii++) {
58 Assert.Fail ("Got wrong excaption");
191 queue.Enqueue(1);
59 }
192 }
60 }
193 return 1;
194 })
195 .Anyway(() => Interlocked.Decrement(ref writers));
196 }
197
198 for (int i = 0; i < 10; i++) {
199 Interlocked.Increment(ref readers);
200 var wn = i;
201 AsyncPool
202 .InvokeNewThread(() => {
203 int t;
204 do {
205 while (queue.TryDequeue(out t))
206 Interlocked.Add(ref total, t);
207 } while (writers > 0);
208 return 1;
209 })
210 .Anyway(() => {
211 Interlocked.Decrement(ref readers);
212 if (readers == 0)
213 stop.Set();
214 });
215 }
216
217 stop.WaitOne();
218
219 Assert.AreEqual(itemsPerWriter * writersCount, total);
220 }
61
221
62 [TestMethod]
222 [TestMethod]
63 public void MapTest ()
223 public void ParallelMapTest() {
64 {
224
65 var p = new Promise<int> ();
225 int count = 100000;
226
227 double[] args = new double[count];
228 var rand = new Random();
229
230 for (int i = 0; i < count; i++)
231 args[i] = rand.NextDouble();
66
232
67 var p2 = p.Map (x => x.ToString ());
233 var t = Environment.TickCount;
68 p.Resolve (100);
234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
235
236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
69
237
70 Assert.AreEqual (p2.Join (), "100");
238 t = Environment.TickCount;
71 }
239 for (int i = 0; i < count; i++)
240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
242 }
72
243
73 [TestMethod]
244 [TestMethod]
74 public void ChainTest ()
245 public void ChainedMapTest() {
75 {
246
76 var p1 = new Promise<int> ();
247 using (var pool = new WorkerPool(8,100,0)) {
248 int count = 10000;
249
250 double[] args = new double[count];
251 var rand = new Random();
252
253 for (int i = 0; i < count; i++)
254 args[i] = rand.NextDouble();
77
255
78 var p3 = p1.Chain (x => {
256 var t = Environment.TickCount;
79 var p2 = new Promise<string> ();
257 var res = args
80 p2.Resolve (x.ToString ());
258 .ChainedMap(
81 return p2;
259 x => pool.Invoke(
82 });
260 () => Math.Sin(x * x)
261 ),
262 4
263 )
264 .Join();
83
265
84 p1.Resolve (100);
266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
85
267
86 Assert.AreEqual (p3.Join (), "100");
268 t = Environment.TickCount;
87 }
269 for (int i = 0; i < count; i++)
270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 }
274 }
88
275
89 [TestMethod]
276 [TestMethod]
90 public void PoolTest ()
277 public void ParallelForEachTest() {
91 {
278
92 var pid = Thread.CurrentThread.ManagedThreadId;
279 int count = 100000;
93 var p = AsyncPool.Invoke (() => {
280
94 return Thread.CurrentThread.ManagedThreadId;
281 int[] args = new int[count];
95 });
282 var rand = new Random();
283
284 for (int i = 0; i < count; i++)
285 args[i] = (int)(rand.NextDouble() * 100);
286
287 int result = 0;
288
289 var t = Environment.TickCount;
290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
291
292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
293
294 int result2 = 0;
295
296 t = Environment.TickCount;
297 for (int i = 0; i < count; i++)
298 result2 += args[i];
299 Assert.AreEqual(result2, result);
300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
301 }
96
302
97 Assert.AreNotEqual (pid, p.Join ());
303 [TestMethod]
98 }
304 public void ComplexCase1Test() {
99 }
305 var flags = new bool[3];
306
307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
308
309 var p = PromiseHelper
310 .Sleep(200, "Alan")
311 .Cancelled(() => flags[0] = true)
312 .Chain(x =>
313 PromiseHelper
314 .Sleep(200, "Hi, " + x)
315 .Map(y => y)
316 .Cancelled(() => flags[1] = true)
317 )
318 .Cancelled(() => flags[2] = true);
319 Thread.Sleep(300);
320 p.Cancel();
321 try {
322 Assert.AreEqual(p.Join(), "Hi, Alan");
323 Assert.Fail("Shouldn't get here");
324 } catch (OperationCanceledException) {
325 }
326
327 Assert.IsFalse(flags[0]);
328 Assert.IsTrue(flags[1]);
329 Assert.IsTrue(flags[2]);
330 }
331 }
100 }
332 }
101
333
@@ -46,6 +46,7
46 </ItemGroup>
46 </ItemGroup>
47 <ItemGroup>
47 <ItemGroup>
48 <Compile Include="AsyncTests.cs" />
48 <Compile Include="AsyncTests.cs" />
49 <Compile Include="PromiseHelper.cs" />
49 <Compile Include="Properties\AssemblyInfo.cs" />
50 <Compile Include="Properties\AssemblyInfo.cs" />
50 </ItemGroup>
51 </ItemGroup>
51 <ItemGroup>
52 <ItemGroup>
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -32,13 +32,25
32 <Reference Include="System" />
32 <Reference Include="System" />
33 </ItemGroup>
33 </ItemGroup>
34 <ItemGroup>
34 <ItemGroup>
35 <Compile Include="ICancellable.cs" />
36 <Compile Include="IProgressHandler.cs" />
37 <Compile Include="IProgressNotifier.cs" />
38 <Compile Include="IPromise.cs" />
39 <Compile Include="ITaskController.cs" />
40 <Compile Include="ManagedPromise.cs" />
41 <Compile Include="Parallels\DispatchPool.cs" />
42 <Compile Include="Parallels\ArrayTraits.cs" />
43 <Compile Include="Parallels\MTQueue.cs" />
44 <Compile Include="Parallels\WorkerPool.cs" />
45 <Compile Include="PromiseState.cs" />
46 <Compile Include="TaskController.cs" />
47 <Compile Include="ProgressInitEventArgs.cs" />
35 <Compile Include="Properties\AssemblyInfo.cs" />
48 <Compile Include="Properties\AssemblyInfo.cs" />
36 <Compile Include="Promise.cs" />
49 <Compile Include="Promise.cs" />
37 <Compile Include="AsyncPool.cs" />
50 <Compile Include="Parallels\AsyncPool.cs" />
38 <Compile Include="Safe.cs" />
51 <Compile Include="Safe.cs" />
52 <Compile Include="ValueEventArgs.cs" />
39 </ItemGroup>
53 </ItemGroup>
40 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
54 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
41 <ItemGroup>
55 <ItemGroup />
42 <Folder Include="Parallels\" />
43 </ItemGroup>
44 </Project> No newline at end of file
56 </Project>
@@ -1,18 +1,16
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Reflection;
3 using System.Reflection;
5 using System.Text;
6 using System.Diagnostics;
4 using System.Diagnostics;
7 using System.Threading;
5 using System.Threading;
8
6
9 namespace Implab {
7 namespace Implab {
10
8
11 public delegate void ErrorHandler(Exception e);
9 public delegate void ErrorHandler(Exception e);
12
10 public delegate T ErrorHandler<out T>(Exception e);
13 public delegate void ResultHandler<T>(T result);
11 public delegate void ResultHandler<in T>(T result);
14 public delegate TNew ResultMapper<TSrc, TNew>(TSrc result);
12 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
15 public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result);
13 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
16
14
17 /// <summary>
15 /// <summary>
18 /// Класс для асинхронного получения результатов. Так называемое "обещание".
16 /// Класс для асинхронного получения результатов. Так называемое "обещание".
@@ -48,23 +46,23 namespace Implab {
48 /// только инициатор обещания иначе могут возникнуть противоречия.
46 /// только инициатор обещания иначе могут возникнуть противоречия.
49 /// </para>
47 /// </para>
50 /// </remarks>
48 /// </remarks>
51 public class Promise<T> {
49 public class Promise<T> : IPromise {
52
50
53 struct ResultHandlerInfo {
51 struct ResultHandlerInfo {
54 public ResultHandler<T> resultHandler;
52 public ResultHandler<T> resultHandler;
55 public ErrorHandler errorHandler;
53 public ErrorHandler errorHandler;
56 }
54 }
57
55
58 enum State {
56 readonly IPromise m_parent;
59 Unresolved,
57
60 Resolving,
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
61 Resolved,
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
62 Cancelled
63 }
64
60
65 LinkedList<ResultHandlerInfo> m_handlersChain = new LinkedList<ResultHandlerInfo>();
61 readonly object m_lock = new Object();
66 State m_state;
62 readonly bool m_cancellable;
67 bool m_cancellable;
63 int m_childrenCount = 0;
64
65 PromiseState m_state;
68 T m_result;
66 T m_result;
69 Exception m_error;
67 Exception m_error;
70
68
@@ -72,13 +70,17 namespace Implab {
72 m_cancellable = true;
70 m_cancellable = true;
73 }
71 }
74
72
75 /// <summary>
73 public Promise(IPromise parent, bool cancellable) {
76 /// Событие, возникающее при отмене асинхронной операции.
74 m_cancellable = cancellable;
77 /// </summary>
75 m_parent = parent;
78 /// <description>
76 if (parent != null)
79 /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить.
77 parent.HandleCancelled(InternalCancel);
80 /// </description>
78 }
81 public event EventHandler Cancelled;
79
80 void InternalCancel() {
81 // don't try to cancel parent :)
82 Cancel(false);
83 }
82
84
83 /// <summary>
85 /// <summary>
84 /// Выполняет обещание, сообщая об успешном выполнении.
86 /// Выполняет обещание, сообщая об успешном выполнении.
@@ -86,38 +88,39 namespace Implab {
86 /// <param name="result">Результат выполнения.</param>
88 /// <param name="result">Результат выполнения.</param>
87 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
88 public void Resolve(T result) {
90 public void Resolve(T result) {
89 lock (this) {
91 lock (m_lock) {
90 if (m_state == State.Cancelled)
92 if (m_state == PromiseState.Cancelled)
91 return;
93 return;
92 if (m_state != State.Unresolved)
94 if (m_state != PromiseState.Unresolved)
93 throw new InvalidOperationException("The promise is already resolved");
95 throw new InvalidOperationException("The promise is already resolved");
94 m_result = result;
96 m_result = result;
95 m_state = State.Resolving;
97 m_state = PromiseState.Resolved;
96 }
98 }
97
99
98 ResultHandlerInfo handler;
100 OnStateChanged();
99 while (FetchNextHandler(out handler))
100 InvokeHandler(handler);
101 }
101 }
102
102
103 /// <summary>
103 /// <summary>
104 /// Выполняет обещание, сообщая об ошибке
104 /// Выполняет обещание, сообщая об ошибке
105 /// </summary>
105 /// </summary>
106 /// <remarks>
107 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
108 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
109 /// будут проигнорированы.
110 /// </remarks>
106 /// <param name="error">Исключение возникшее при выполнении операции</param>
111 /// <param name="error">Исключение возникшее при выполнении операции</param>
107 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
112 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
108 public void Reject(Exception error) {
113 public void Reject(Exception error) {
109 lock (this) {
114 lock (m_lock) {
110 if (m_state == State.Cancelled)
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
111 return;
116 return;
112 if (m_state != State.Unresolved)
117 if (m_state != PromiseState.Unresolved)
113 throw new InvalidOperationException("The promise is already resolved");
118 throw new InvalidOperationException("The promise is already resolved");
114 m_error = error;
119 m_error = error;
115 m_state = State.Resolving;
120 m_state = PromiseState.Rejected;
116 }
121 }
117
122
118 ResultHandlerInfo handler;
123 OnStateChanged();
119 while (FetchNextHandler(out handler))
120 InvokeHandler(handler);
121 }
124 }
122
125
123 /// <summary>
126 /// <summary>
@@ -125,47 +128,31 namespace Implab {
125 /// </summary>
128 /// </summary>
126 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
129 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
127 public bool Cancel() {
130 public bool Cancel() {
128 lock (this) {
131 return Cancel(true);
129 if (m_state == State.Unresolved && m_cancellable) {
130 m_state = State.Cancelled;
131 EventHandler temp = Cancelled;
132
133 if (temp != null)
134 temp(this, new EventArgs());
135
136 return true;
137 } else
138 return false;
139 }
140 }
132 }
141
133
142 /// <summary>
134 /// <summary>
143 /// Добавляет обработчики событий выполнения обещания.
135 /// Adds new handlers to this promise.
144 /// </summary>
136 /// </summary>
145 /// <param name="success">Обработчик успешного выполнения обещания.
137 /// <param name="success">The handler of the successfully completed operation.
146 /// Данному обработчику будет передан результат выполнения операции.</param>
138 /// This handler will recieve an operation result as a parameter.</param>
147 /// <param name="error">Обработчик ошибки. Данный обработчик получит
139 /// <param name="error">Handles an exception that may occur during the operation.</param>
148 /// исключение возникшее при выполнении операции.</param>
140 /// <returns>The new promise chained to this one.</returns>
149 /// <returns>Само обещание</returns>
150 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
141 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
151 if (success == null && error == null)
142 if (success == null && error == null)
152 return this;
143 return this;
153
144
154 var medium = new Promise<T>();
145 var medium = new Promise<T>(this, true);
155
146
156 var handlerInfo = new ResultHandlerInfo();
147 var handlerInfo = new ResultHandlerInfo();
157
148
158 if (success != null)
149 if (success != null)
159 handlerInfo.resultHandler = x => {
150 handlerInfo.resultHandler = x => {
160 try {
151 success(x);
161 success(x);
152 medium.Resolve(x);
162 medium.Resolve(x);
163 } catch (Exception e) {
164 medium.Reject(e);
165 }
166 };
153 };
167 else
154 else
168 handlerInfo.resultHandler = x => medium.Resolve(x);
155 handlerInfo.resultHandler = medium.Resolve;
169
156
170 if (error != null)
157 if (error != null)
171 handlerInfo.errorHandler = x => {
158 handlerInfo.errorHandler = x => {
@@ -175,21 +162,106 namespace Implab {
175 medium.Reject(x);
162 medium.Reject(x);
176 };
163 };
177 else
164 else
178 handlerInfo.errorHandler = x => medium.Reject(x);
165 handlerInfo.errorHandler = medium.Reject;
166
167 AddHandler(handlerInfo);
168
169 return medium;
170 }
171
172 /// <summary>
173 /// Adds new handlers to this promise.
174 /// </summary>
175 /// <param name="success">The handler of the successfully completed operation.
176 /// This handler will recieve an operation result as a parameter.</param>
177 /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
178 /// <returns>The new promise chained to this one.</returns>
179 public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
180 if (success == null && error == null)
181 return this;
182
183 var medium = new Promise<T>(this, true);
184
185 var handlerInfo = new ResultHandlerInfo();
186
187 if (success != null)
188 handlerInfo.resultHandler = x => {
189 success(x);
190 medium.Resolve(x);
191 };
192 else
193 handlerInfo.resultHandler = medium.Resolve;
194
195 if (error != null)
196 handlerInfo.errorHandler = x => {
197 try {
198 medium.Resolve(error(x));
199 } catch { }
200 medium.Reject(x);
201 };
202 else
203 handlerInfo.errorHandler = medium.Reject;
179
204
180 AddHandler(handlerInfo);
205 AddHandler(handlerInfo);
181
206
182 return medium;
207 return medium;
183 }
208 }
184
209
210
185 public Promise<T> Then(ResultHandler<T> success) {
211 public Promise<T> Then(ResultHandler<T> success) {
186 return Then(success, null);
212 if (success == null)
213 return this;
214
215 var medium = new Promise<T>(this, true);
216
217 var handlerInfo = new ResultHandlerInfo();
218
219 if (success != null)
220 handlerInfo.resultHandler = x => {
221 success(x);
222 medium.Resolve(x);
223 };
224 else
225 handlerInfo.resultHandler = medium.Resolve;
226
227 handlerInfo.errorHandler = medium.Reject;
228
229 AddHandler(handlerInfo);
230
231 return medium;
187 }
232 }
188
233
189 public Promise<T> Error(ErrorHandler error) {
234 public Promise<T> Error(ErrorHandler error) {
190 return Then(null, error);
235 return Then(null, error);
191 }
236 }
192
237
238 /// <summary>
239 /// Handles error and allows to keep the promise.
240 /// </summary>
241 /// <remarks>
242 /// If the specified handler throws an exception, this exception will be used to reject the promise.
243 /// </remarks>
244 /// <param name="handler">The error handler which returns the result of the promise.</param>
245 /// <returns>New promise.</returns>
246 public Promise<T> Error(ErrorHandler<T> handler) {
247 if (handler == null)
248 return this;
249
250 var medium = new Promise<T>(this, true);
251
252 AddHandler(new ResultHandlerInfo {
253 errorHandler = e => {
254 try {
255 medium.Resolve(handler(e));
256 } catch (Exception e2) {
257 medium.Reject(e2);
258 }
259 }
260 });
261
262 return medium;
263 }
264
193 public Promise<T> Anyway(Action handler) {
265 public Promise<T> Anyway(Action handler) {
194 if (handler == null)
266 if (handler == null)
195 return this;
267 return this;
@@ -198,6 +270,7 namespace Implab {
198
270
199 AddHandler(new ResultHandlerInfo {
271 AddHandler(new ResultHandlerInfo {
200 resultHandler = x => {
272 resultHandler = x => {
273 // to avoid handler being called multiple times we handle exception by ourselfs
201 try {
274 try {
202 handler();
275 handler();
203 medium.Resolve(x);
276 medium.Resolve(x);
@@ -229,20 +302,15 namespace Implab {
229 throw new ArgumentNullException("mapper");
302 throw new ArgumentNullException("mapper");
230
303
231 // создаем прицепленное обещание
304 // создаем прицепленное обещание
232 Promise<TNew> chained = new Promise<TNew>();
305 var chained = new Promise<TNew>();
233
306
234 AddHandler(new ResultHandlerInfo() {
307 AddHandler(new ResultHandlerInfo() {
235 resultHandler = delegate(T result) {
308 resultHandler = result => chained.Resolve(mapper(result)),
236 try {
237 // если преобразование выдаст исключение, то сработает reject сцепленного deferred
238 chained.Resolve(mapper(result));
239 } catch (Exception e) {
240 chained.Reject(e);
241 }
242 },
243 errorHandler = delegate(Exception e) {
309 errorHandler = delegate(Exception e) {
244 if (error != null)
310 if (error != null)
245 error(e);
311 try {
312 error(e);
313 } catch { }
246 // в случае ошибки нужно передать исключение дальше по цепочке
314 // в случае ошибки нужно передать исключение дальше по цепочке
247 chained.Reject(e);
315 chained.Reject(e);
248 }
316 }
@@ -271,19 +339,21 namespace Implab {
271 // создать посредника, к которому будут подвызяваться следующие обработчики.
339 // создать посредника, к которому будут подвызяваться следующие обработчики.
272 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
340 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
273 // передать через него результаты работы.
341 // передать через него результаты работы.
274 Promise<TNew> medium = new Promise<TNew>();
342 var medium = new Promise<TNew>(this, true);
275
343
276 AddHandler(new ResultHandlerInfo() {
344 AddHandler(new ResultHandlerInfo {
277 resultHandler = delegate(T result) {
345 resultHandler = delegate(T result) {
278 try {
346 if (medium.State == PromiseState.Cancelled)
279 chained(result).Then(
347 return;
280 x => medium.Resolve(x),
348
281 e => medium.Reject(e)
349 var promise = chained(result);
282 );
350
283 } catch (Exception e) {
351 // notify chained operation that it's not needed
284 // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке
352 medium.Cancelled(() => promise.Cancel());
285 medium.Reject(e);
353 promise.Then(
286 }
354 x => medium.Resolve(x),
355 e => medium.Reject(e)
356 );
287 },
357 },
288 errorHandler = delegate(Exception e) {
358 errorHandler = delegate(Exception e) {
289 if (error != null)
359 if (error != null)
@@ -300,6 +370,22 namespace Implab {
300 return Chain(chained, null);
370 return Chain(chained, null);
301 }
371 }
302
372
373 public Promise<T> Cancelled(Action handler) {
374 if (handler == null)
375 return this;
376 lock (m_lock) {
377 if (m_state == PromiseState.Unresolved)
378 m_cancelHandlers.AddLast(handler);
379 else if (m_state == PromiseState.Cancelled)
380 handler();
381 }
382 return this;
383 }
384
385 public void HandleCancelled(Action handler) {
386 Cancelled(handler);
387 }
388
303 /// <summary>
389 /// <summary>
304 /// Дожидается отложенного обещания и в случае успеха, возвращает
390 /// Дожидается отложенного обещания и в случае успеха, возвращает
305 /// его, результат, в противном случае бросает исключение.
391 /// его, результат, в противном случае бросает исключение.
@@ -322,52 +408,37 namespace Implab {
322 /// <param name="timeout">Время ожидания</param>
408 /// <param name="timeout">Время ожидания</param>
323 /// <returns>Результат выполнения обещания</returns>
409 /// <returns>Результат выполнения обещания</returns>
324 public T Join(int timeout) {
410 public T Join(int timeout) {
325 ManualResetEvent evt = new ManualResetEvent(false);
411 var evt = new ManualResetEvent(false);
326 Anyway(() => evt.Set());
412 Anyway(() => evt.Set());
413 Cancelled(() => evt.Set());
327
414
328 if (!evt.WaitOne(timeout, true))
415 if (!evt.WaitOne(timeout, true))
329 throw new TimeoutException();
416 throw new TimeoutException();
330
417
331 if (m_error != null)
418 switch (State) {
332 throw new TargetInvocationException(m_error);
419 case PromiseState.Resolved:
333 else
420 return m_result;
334 return m_result;
421 case PromiseState.Cancelled:
422 throw new OperationCanceledException();
423 case PromiseState.Rejected:
424 throw new TargetInvocationException(m_error);
425 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
427 }
335 }
428 }
336
429
337 public T Join() {
430 public T Join() {
338 return Join(Timeout.Infinite);
431 return Join(Timeout.Infinite);
339 }
432 }
340
433
341 /// <summary>
342 /// Данный метод последовательно извлекает обработчики обещания и когда
343 /// их больше не осталось - ставит состояние "разрешено".
344 /// </summary>
345 /// <param name="handler">Информация об обработчике</param>
346 /// <returns>Признак того, что еще остались обработчики в очереди</returns>
347 bool FetchNextHandler(out ResultHandlerInfo handler) {
348 handler = default(ResultHandlerInfo);
349
350 lock (this) {
351 Debug.Assert(m_state == State.Resolving);
352
353 if (m_handlersChain.Count > 0) {
354 handler = m_handlersChain.First.Value;
355 m_handlersChain.RemoveFirst();
356 return true;
357 } else {
358 m_state = State.Resolved;
359 return false;
360 }
361 }
362 }
363
364 void AddHandler(ResultHandlerInfo handler) {
434 void AddHandler(ResultHandlerInfo handler) {
365 bool invokeRequired = false;
435 bool invokeRequired = false;
366
436
367 lock (this) {
437 lock (m_lock) {
368 if (m_state != State.Resolved)
438 m_childrenCount++;
369 m_handlersChain.AddLast(handler);
439 if (m_state == PromiseState.Unresolved) {
370 else
440 m_resultHandlers.AddLast(handler);
441 } else
371 invokeRequired = true;
442 invokeRequired = true;
372 }
443 }
373
444
@@ -377,21 +448,102 namespace Implab {
377 }
448 }
378
449
379 void InvokeHandler(ResultHandlerInfo handler) {
450 void InvokeHandler(ResultHandlerInfo handler) {
380 if (m_error == null) {
451 switch (m_state) {
381 try {
452 case PromiseState.Resolved:
382 if (handler.resultHandler != null)
453 try {
383 handler.resultHandler(m_result);
454 if (handler.resultHandler != null)
384 } catch { }
455 handler.resultHandler(m_result);
385 }
456 } catch (Exception e) {
386
457 try {
387 if (m_error != null) {
458 if (handler.errorHandler != null)
388 try {
459 handler.errorHandler(e);
389 if (handler.errorHandler != null)
460 } catch { }
390 handler.errorHandler(m_error);
461 }
391 } catch { }
462 break;
463 case PromiseState.Rejected:
464 try {
465 if (handler.errorHandler != null)
466 handler.errorHandler(m_error);
467 } catch { }
468 break;
469 default:
470 // do nothing
471 return;
392 }
472 }
393 }
473 }
394
474
475 protected virtual void OnStateChanged() {
476 switch (m_state) {
477 case PromiseState.Resolved:
478 foreach (var resultHandlerInfo in m_resultHandlers)
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
506 }
507
508
509
510 public bool IsExclusive {
511 get {
512 lock (m_lock) {
513 return m_childrenCount <= 1;
514 }
515 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
523 }
524 }
525
526 protected bool Cancel(bool dependencies) {
527 bool result;
528
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
539 OnStateChanged();
540
541 if (dependencies && m_parent != null && m_parent.IsExclusive) {
542 m_parent.Cancel();
543 }
544
545 return result;
546 }
395
547
396 }
548 }
397 }
549 }
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now