##// END OF EJS Templates
Слияние с promises
cin -
r18:0c924dff5498 merge default
parent child
Show More
@@ -0,0 +1,17
1 using Implab.Parallels;
2 using System;
3 using System.Collections.Generic;
4 using System.Linq;
5 using System.Text;
6 using System.Threading;
7
8 namespace Implab.Test {
9 class PromiseHelper {
10 public static Promise<T> Sleep<T>(int timeout, T retVal) {
11 return AsyncPool.Invoke(() => {
12 Thread.Sleep(timeout);
13 return retVal;
14 });
15 }
16 }
17 }
1 NO CONTENT: new file 100644, binary diff hidden
@@ -0,0 +1,10
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ICancellable {
8 bool Cancel();
9 }
10 }
@@ -0,0 +1,18
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface IProgressHandler {
8 string Message {
9 get;
10 set;
11 }
12 float CurrentProgress {
13 get;
14 set;
15 }
16 void InitProgress(float current, float max, string message);
17 }
18 }
@@ -0,0 +1,14
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public interface IProgressNotifier
9 {
10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
12 event EventHandler<ProgressInitEventArgs> ProgressInit;
13 }
14 }
@@ -0,0 +1,33
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public interface IPromise: ICancellable
9 {
10 /// <summary>
11 /// Check whereather the promise has no more than one dependent promise.
12 /// </summary>
13 bool IsExclusive
14 {
15 get;
16 }
17
18 /// <summary>
19 /// The current state of the promise.
20 /// </summary>
21 PromiseState State
22 {
23 get;
24 }
25
26 /// <summary>
27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
28 /// handler will be invoked immediatelly.
29 /// </summary>
30 /// <param name="handler">The handler</param>
31 void HandleCancelled(Action handler);
32 }
33 }
@@ -0,0 +1,12
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ITaskController: IProgressHandler {
8 bool Cancelled {
9 get;
10 }
11 }
12 }
@@ -0,0 +1,11
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7
8 /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
9
10 }*/
11 }
@@ -0,0 +1,171
1 using System;
2 using System.Collections.Generic;
3 using System.Diagnostics;
4 using System.Linq;
5 using System.Text;
6 using System.Threading;
7
8 namespace Implab.Parallels {
9 public static class ArrayTraits {
10 class ArrayIterator<TSrc> : DispatchPool<int> {
11 readonly Action<TSrc> m_action;
12 readonly TSrc[] m_source;
13 readonly Promise<int> m_promise = new Promise<int>();
14
15 int m_pending;
16 int m_next;
17
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 : base(threads) {
20
21 Debug.Assert(source != null);
22 Debug.Assert(action != null);
23
24 m_next = 0;
25 m_source = source;
26 m_pending = source.Length;
27 m_action = action;
28
29 m_promise.Anyway(() => Dispose());
30 m_promise.Cancelled(() => Dispose());
31
32 InitPool();
33 }
34
35 public Promise<int> Promise {
36 get {
37 return m_promise;
38 }
39 }
40
41 protected override bool TryDequeue(out int unit) {
42 unit = Interlocked.Increment(ref m_next) - 1;
43 return unit >= m_source.Length ? false : true;
44 }
45
46 protected override void InvokeUnit(int unit) {
47 try {
48 m_action(m_source[unit]);
49 var pending = Interlocked.Decrement(ref m_pending);
50 if (pending == 0)
51 m_promise.Resolve(m_source.Length);
52 } catch (Exception e) {
53 m_promise.Reject(e);
54 }
55 }
56 }
57
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 readonly Func<TSrc, TDst> m_transform;
60 readonly TSrc[] m_source;
61 readonly TDst[] m_dest;
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
63
64 int m_pending;
65 int m_next;
66
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 : base(threads) {
69
70 Debug.Assert (source != null);
71 Debug.Assert( transform != null);
72
73 m_next = 0;
74 m_source = source;
75 m_dest = new TDst[source.Length];
76 m_pending = source.Length;
77 m_transform = transform;
78
79 m_promise.Anyway(() => Dispose());
80 m_promise.Cancelled(() => Dispose());
81
82 InitPool();
83 }
84
85 public Promise<TDst[]> Promise {
86 get {
87 return m_promise;
88 }
89 }
90
91 protected override bool TryDequeue(out int unit) {
92 unit = Interlocked.Increment(ref m_next) - 1;
93 return unit >= m_source.Length ? false : true;
94 }
95
96 protected override void InvokeUnit(int unit) {
97 try {
98 m_dest[unit] = m_transform(m_source[unit]);
99 var pending = Interlocked.Decrement(ref m_pending);
100 if (pending == 0)
101 m_promise.Resolve(m_dest);
102 } catch (Exception e) {
103 m_promise.Reject(e);
104 }
105 }
106 }
107
108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 if (source == null)
110 throw new ArgumentNullException("source");
111 if (transform == null)
112 throw new ArgumentNullException("transform");
113
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 return mapper.Promise;
116 }
117
118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 if (source == null)
120 throw new ArgumentNullException("source");
121 if (action == null)
122 throw new ArgumentNullException("action");
123
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 return iter.Promise;
126 }
127
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 if (source == null)
130 throw new ArgumentNullException("source");
131 if (transform == null)
132 throw new ArgumentNullException("transform");
133 if (threads <= 0)
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135
136 var promise = new Promise<TDst[]>();
137 var res = new TDst[source.Length];
138 var pending = source.Length;
139 var semaphore = new Semaphore(threads, threads);
140
141 AsyncPool.InvokeNewThread(() => {
142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
144 break; // stop processing in case of error or cancellation
145 var idx = i;
146 semaphore.WaitOne();
147 try {
148 var p1 = transform(source[i]);
149 p1.Anyway(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
151 p1.Then(
152 x => {
153 res[idx] = x;
154 var left = Interlocked.Decrement(ref pending);
155 if (left == 0)
156 promise.Resolve(res);
157 },
158 e => promise.Reject(e)
159 );
160
161 } catch (Exception e) {
162 promise.Reject(e);
163 }
164 }
165 return 0;
166 });
167
168 return promise.Anyway(() => semaphore.Dispose());
169 }
170 }
171 }
@@ -0,0 +1,44
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
5 /// <summary>
6 /// Класс для распаралеливания задач.
7 /// </summary>
8 /// <remarks>
9 /// Используя данный класс и лямда выражения можно распараллелить
10 /// вычисления, для этого используется концепция обещаний.
11 /// </remarks>
12 public static class AsyncPool {
13
14 public static Promise<T> Invoke<T>(Func<T> func) {
15 var p = new Promise<T>();
16
17 ThreadPool.QueueUserWorkItem(param => {
18 try {
19 p.Resolve(func());
20 } catch(Exception e) {
21 p.Reject(e);
22 }
23 });
24
25 return p;
26 }
27
28 public static Promise<T> InvokeNewThread<T>(Func<T> func) {
29 var p = new Promise<T>();
30
31 var worker = new Thread(() => {
32 try {
33 p.Resolve(func());
34 } catch (Exception e) {
35 p.Reject(e);
36 }
37 });
38 worker.IsBackground = true;
39 worker.Start();
40
41 return p;
42 }
43 }
44 }
@@ -0,0 +1,238
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
13 int m_maxRunningThreads = 0;
14 int m_suspended = 0;
15 int m_exitRequired = 0;
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17
18 protected DispatchPool(int min, int max) {
19 if (min < 0)
20 throw new ArgumentOutOfRangeException("min");
21 if (max <= 0)
22 throw new ArgumentOutOfRangeException("max");
23
24 if (min > max)
25 min = max;
26 m_minThreads = min;
27 m_maxThreads = max;
28 }
29
30 protected DispatchPool(int threads)
31 : this(threads, threads) {
32 }
33
34 protected DispatchPool() {
35 int maxThreads, maxCP;
36 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
37
38 m_minThreads = 0;
39 m_maxThreads = maxThreads;
40 }
41
42 protected void InitPool() {
43 for (int i = 0; i < m_minThreads; i++)
44 StartWorker();
45 }
46
47 public int ThreadCount {
48 get {
49 return m_runningThreads;
50 }
51 }
52
53 public int MaxRunningThreads {
54 get {
55 return m_maxRunningThreads;
56 }
57 }
58
59 protected bool IsDisposed {
60 get {
61 return m_exitRequired != 0;
62 }
63 }
64
65 protected abstract bool TryDequeue(out TUnit unit);
66
67 protected virtual bool ExtendPool() {
68 if (m_suspended > 0) {
69 m_hasTasks.Set();
70 return true;
71 } else
72 return StartWorker();
73 }
74
75 /// <summary>
76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
77 /// </summary>
78 protected void WakePool() {
79 m_hasTasks.Set(); // wake sleeping thread;
80
81 if (AllocateThreadSlot(1)) {
82 var worker = new Thread(this.Worker);
83 worker.IsBackground = true;
84 worker.Start();
85 }
86 }
87
88 protected virtual void Suspend() {
89 m_hasTasks.WaitOne();
90 }
91
92 #region thread slots traits
93
94 bool AllocateThreadSlot() {
95 int current;
96 // use spins to allocate slot for the new thread
97 do {
98 current = m_runningThreads;
99 if (current >= m_maxThreads || m_exitRequired != 0)
100 // no more slots left or the pool has been disposed
101 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
103
104 UpdateMaxThreads(current + 1);
105
106 return true;
107 }
108
109 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
111 return false;
112
113 UpdateMaxThreads(desired);
114
115 return true;
116 }
117
118 bool ReleaseThreadSlot(out bool last) {
119 last = false;
120 int current;
121 // use spins to release slot for the new thread
122 do {
123 current = m_runningThreads;
124 if (current <= m_minThreads && m_exitRequired == 0)
125 // the thread is reserved
126 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
128
129 last = (current == 1);
130
131 return true;
132 }
133
134 /// <summary>
135 /// releases thread slot unconditionally, used during cleanup
136 /// </summary>
137 /// <returns>true - no more threads left</returns>
138 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads);
140 return left == 0;
141 }
142
143 void UpdateMaxThreads(int count) {
144 int max;
145 do {
146 max = m_maxRunningThreads;
147 if (max >= count)
148 break;
149 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
150 }
151
152 #endregion
153
154 bool StartWorker() {
155 if (AllocateThreadSlot()) {
156 // slot successfully allocated
157 var worker = new Thread(this.Worker);
158 worker.IsBackground = true;
159 worker.Start();
160
161 return true;
162 } else {
163 return false;
164 }
165 }
166
167 bool FetchTask(out TUnit unit) {
168 do {
169 // exit if requested
170 if (m_exitRequired != 0) {
171 // release the thread slot
172 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 m_hasTasks.Dispose();
174 else
175 m_hasTasks.Set(); // wake next worker
176 unit = default(TUnit);
177 return false;
178 }
179
180 // fetch task
181 if (TryDequeue(out unit)) {
182 ExtendPool();
183 return true;
184 }
185
186 //no tasks left, exit if the thread is no longer needed
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
197
198 return false;
199 }
200
201 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 // keep this thread and wait
204 Suspend();
205 Interlocked.Decrement(ref m_suspended);
206 } while (true);
207 }
208
209 protected abstract void InvokeUnit(TUnit unit);
210
211 void Worker() {
212 TUnit unit;
213 while (FetchTask(out unit))
214 InvokeUnit(unit);
215 }
216
217 protected virtual void Dispose(bool disposing) {
218 if (disposing) {
219 if (m_exitRequired == 0) {
220 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
221 return;
222
223 // wake sleeping threads
224 m_hasTasks.Set();
225 GC.SuppressFinalize(this);
226 }
227 }
228 }
229
230 public void Dispose() {
231 Dispose(true);
232 }
233
234 ~DispatchPool() {
235 Dispose(false);
236 }
237 }
238 }
@@ -0,0 +1,74
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab.Parallels {
8 public class MTQueue<T> {
9 class Node {
10 public Node(T value) {
11 this.value = value;
12 }
13 public readonly T value;
14 public Node next;
15 }
16
17 Node m_first;
18 Node m_last;
19
20 public void Enqueue(T value) {
21 var last = m_last;
22 var next = new Node(value);
23
24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
25 last = m_last;
26
27 if (last != null)
28 last.next = next;
29 else
30 m_first = next;
31 }
32
33 public bool TryDequeue(out T value) {
34 Node first;
35 Node next = null;
36 value = default(T);
37
38 do {
39 first = m_first;
40 if (first == null)
41 return false;
42 next = first.next;
43 if (next == null) {
44 // this is the last element,
45 // then try to update tail
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
48 if (m_last == null)
49 return false;
50 // tail has been changed, that means that we need to restart
51 continue;
52 }
53
54 // tail succesfully updated and first.next will never be changed
55 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
56 // but the writer may update the m_first since the m_last is null
57
58 // so we need to fix inconsistency by setting m_first to null, but if it already has been
59 // updated by a writer then we should just give up
60 Interlocked.CompareExchange(ref m_first, null, first);
61 break;
62
63 } else {
64 if (first == Interlocked.CompareExchange(ref m_first, next, first))
65 // head succesfully updated
66 break;
67 }
68 } while (true);
69
70 value = first.value;
71 return true;
72 }
73 }
74 }
@@ -0,0 +1,89
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public class WorkerPool : DispatchPool<Action> {
10
11 MTQueue<Action> m_queue = new MTQueue<Action>();
12 int m_queueLength = 0;
13 readonly int m_threshold = 1;
14
15 public WorkerPool(int minThreads, int maxThreads, int threshold)
16 : base(minThreads, maxThreads) {
17 m_threshold = threshold;
18 InitPool();
19 }
20
21 public WorkerPool(int minThreads, int maxThreads) :
22 base(minThreads, maxThreads) {
23 InitPool();
24 }
25
26 public WorkerPool(int threads)
27 : base(threads) {
28 InitPool();
29 }
30
31 public WorkerPool()
32 : base() {
33 InitPool();
34 }
35
36 public Promise<T> Invoke<T>(Func<T> task) {
37 if (task == null)
38 throw new ArgumentNullException("task");
39 if (IsDisposed)
40 throw new ObjectDisposedException(ToString());
41
42 var promise = new Promise<T>();
43
44 EnqueueTask(delegate() {
45 try {
46 promise.Resolve(task());
47 } catch (Exception e) {
48 promise.Reject(e);
49 }
50 });
51
52 return promise;
53 }
54
55 protected void EnqueueTask(Action unit) {
56 Debug.Assert(unit != null);
57 var len = Interlocked.Increment(ref m_queueLength);
58 m_queue.Enqueue(unit);
59
60 if(!ExtendPool())
61 WakePool();
62 }
63
64 protected override bool ExtendPool() {
65 if (m_queueLength <= m_threshold*ThreadCount)
66 // in this case we are in active thread and it request for additional workers
67 // satisfy it only when queue is longer than threshold
68 return false;
69 return base.ExtendPool();
70 }
71
72 protected override bool TryDequeue(out Action unit) {
73 if (m_queue.TryDequeue(out unit)) {
74 Interlocked.Decrement(ref m_queueLength);
75 return true;
76 }
77 return false;
78 }
79
80 protected override void InvokeUnit(Action unit) {
81 unit();
82 }
83
84 protected override void Suspend() {
85 if (m_queueLength == 0)
86 base.Suspend();
87 }
88 }
89 }
@@ -0,0 +1,36
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 [Serializable]
9 public class ProgressInitEventArgs: EventArgs
10 {
11 public float MaxProgress
12 {
13 get;
14 private set;
15 }
16
17 public float CurrentProgress
18 {
19 get;
20 private set;
21 }
22
23 public string Message
24 {
25 get;
26 private set;
27 }
28
29 public ProgressInitEventArgs(float current, float max, string message)
30 {
31 this.MaxProgress = max;
32 this.CurrentProgress = current;
33 this.Message = message;
34 }
35 }
36 }
@@ -0,0 +1,15
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public enum PromiseState
9 {
10 Unresolved,
11 Resolved,
12 Cancelled,
13 Rejected
14 }
15 }
@@ -0,0 +1,132
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab
8 {
9 /// <summary>
10 /// This class allows to interact with asyncronuos task.
11 /// </summary>
12 /// <remarks>
13 /// Members of this object are thread safe.
14 /// </remarks>
15 class TaskController: IProgressNotifier, ITaskController, ICancellable
16 {
17 readonly object m_lock;
18 string m_message;
19
20 float m_current;
21 float m_max;
22
23 bool m_cancelled;
24
25 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
26 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
27 public event EventHandler<ProgressInitEventArgs> ProgressInit;
28
29 public TaskController()
30 {
31 m_lock = new Object();
32 }
33
34 public string Message
35 {
36 get
37 {
38 lock (m_lock)
39 return m_message;
40 }
41 set
42 {
43 lock (m_lock)
44 {
45 m_message = value;
46 OnMessageUpdated();
47 }
48 }
49 }
50
51 public float CurrentProgress
52 {
53 get
54 {
55 lock (m_lock)
56 return m_current;
57 }
58 set
59 {
60 lock (m_lock)
61 {
62 var prev = m_current;
63 m_current = value;
64 if (m_current >= m_max)
65 m_current = m_max;
66 if (m_current != prev)
67 OnProgressUpdated();
68 }
69 }
70 }
71
72 public void InitProgress(float current, float max, string message)
73 {
74 if (max < 0)
75 throw new ArgumentOutOfRangeException("max");
76 if (current < 0 || current > max)
77 throw new ArgumentOutOfRangeException("current");
78
79 lock(m_lock) {
80 m_current = current;
81 m_max = max;
82 m_message = message;
83 OnProgressInit();
84 }
85 }
86
87 public bool Cancelled {
88 get {
89 lock (m_lock)
90 return m_cancelled;
91 }
92 }
93
94 public bool Cancel() {
95 lock (m_lock) {
96 if (!m_cancelled) {
97 m_cancelled = true;
98 return true;
99 } else {
100 return false;
101 }
102 }
103 }
104
105 protected virtual void OnMessageUpdated()
106 {
107 var temp = MessageUpdated;
108 if (temp != null)
109 {
110 temp(this, new ValueEventArgs<string>(m_message));
111 }
112 }
113
114 protected virtual void OnProgressUpdated()
115 {
116 var temp = ProgressUpdated;
117 if (temp != null)
118 {
119 temp(this,new ValueEventArgs<float>(m_current));
120 }
121 }
122
123 protected virtual void OnProgressInit()
124 {
125 var temp = ProgressInit;
126 if (temp != null)
127 {
128 temp(this, new ProgressInitEventArgs(m_current,m_max, m_message));
129 }
130 }
131 }
132 }
@@ -0,0 +1,21
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 [Serializable]
9 public class ValueEventArgs<T>: EventArgs
10 {
11 public ValueEventArgs(T value)
12 {
13 this.Value = value;
14 }
15 public T Value
16 {
17 get;
18 private set;
19 }
20 }
21 }
@@ -1,12 +1,13
1 1 syntax: glob
2 2 Implab.Test/bin/
3 3 *.user
4 4 Implab.Test/obj/
5 5 *.userprefs
6 6 Implab/bin/
7 7 Implab/obj/
8 8 TestResults/
9 9 Implab.Fx/obj/
10 10 Implab.Fx/bin/
11 11 Implab.Fx.Test/bin/
12 12 Implab.Fx.Test/obj/
13 _ReSharper.Implab/
@@ -1,101 +1,333
1 1 using System;
2 2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using Implab;
4 3 using System.Reflection;
5 4 using System.Threading;
5 using Implab.Parallels;
6 6
7 namespace Implab.Tests
8 {
9 [TestClass]
10 public class AsyncTests
11 {
12 [TestMethod]
13 public void ResolveTest ()
14 {
15 int res = -1;
16 var p = new Promise<int> ();
17 p.Then (x => res = x);
18 p.Resolve (100);
7 namespace Implab.Test {
8 [TestClass]
9 public class AsyncTests {
10 [TestMethod]
11 public void ResolveTest() {
12 int res = -1;
13 var p = new Promise<int>();
14 p.Then(x => res = x);
15 p.Resolve(100);
16
17 Assert.AreEqual(res, 100);
18 }
19
20 [TestMethod]
21 public void RejectTest() {
22 int res = -1;
23 Exception err = null;
24
25 var p = new Promise<int>();
26 p.Then(x => res = x, e => err = e);
27 p.Reject(new ApplicationException("error"));
28
29 Assert.AreEqual(res, -1);
30 Assert.AreEqual(err.Message, "error");
31
32 }
33
34 [TestMethod]
35 public void JoinSuccessTest() {
36 var p = new Promise<int>();
37 p.Resolve(100);
38 Assert.AreEqual(p.Join(), 100);
39 }
19 40
20 Assert.AreEqual (res, 100);
21 }
41 [TestMethod]
42 public void JoinFailTest() {
43 var p = new Promise<int>();
44 p.Reject(new ApplicationException("failed"));
45
46 try {
47 p.Join();
48 throw new ApplicationException("WRONG!");
49 } catch (TargetInvocationException err) {
50 Assert.AreEqual(err.InnerException.Message, "failed");
51 } catch {
52 Assert.Fail("Got wrong excaption");
53 }
54 }
55
56 [TestMethod]
57 public void MapTest() {
58 var p = new Promise<int>();
59
60 var p2 = p.Map(x => x.ToString());
61 p.Resolve(100);
62
63 Assert.AreEqual(p2.Join(), "100");
64 }
65
66 [TestMethod]
67 public void FixErrorTest() {
68 var p = new Promise<int>();
69
70 var p2 = p.Error(e => 101);
71
72 p.Reject(new Exception());
73
74 Assert.AreEqual(p2.Join(), 101);
75 }
22 76
23 77 [TestMethod]
24 public void RejectTest ()
25 {
26 int res = -1;
27 Exception err = null;
78 public void ChainTest() {
79 var p1 = new Promise<int>();
80
81 var p3 = p1.Chain(x => {
82 var p2 = new Promise<string>();
83 p2.Resolve(x.ToString());
84 return p2;
85 });
86
87 p1.Resolve(100);
28 88
29 var p = new Promise<int> ();
30 p.Then (x => res = x, e => err = e);
31 p.Reject (new ApplicationException ("error"));
89 Assert.AreEqual(p3.Join(), "100");
90 }
32 91
33 Assert.AreEqual (res, -1);
34 Assert.AreEqual (err.Message, "error");
92 [TestMethod]
93 public void PoolTest() {
94 var pid = Thread.CurrentThread.ManagedThreadId;
95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
35 96
36 }
97 Assert.AreNotEqual(pid, p.Join());
98 }
37 99
38 100 [TestMethod]
39 public void JoinSuccessTest ()
40 {
41 var p = new Promise<int> ();
42 p.Resolve (100);
43 Assert.AreEqual (p.Join (), 100);
44 }
101 public void WorkerPoolSizeTest() {
102 var pool = new WorkerPool(5, 10, 0);
103
104 Assert.AreEqual(5, pool.ThreadCount);
105
106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
109
110 Assert.AreEqual(5, pool.ThreadCount);
111
112 for (int i = 0; i < 100; i++)
113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
114 Thread.Sleep(100);
115 Assert.AreEqual(10, pool.ThreadCount);
116
117 pool.Dispose();
118 }
119
120 [TestMethod]
121 public void WorkerPoolCorrectTest() {
122 var pool = new WorkerPool(0,1000,100);
123
124 int iterations = 1000;
125 int pending = iterations;
126 var stop = new ManualResetEvent(false);
127
128 var count = 0;
129 for (int i = 0; i < iterations; i++) {
130 pool
131 .Invoke(() => 1)
132 .Then(x => Interlocked.Add(ref count, x))
133 .Then(x => Math.Log10(x))
134 .Anyway(() => {
135 Interlocked.Decrement(ref pending);
136 if (pending == 0)
137 stop.Set();
138 });
139 }
140
141 stop.WaitOne();
142
143 Assert.AreEqual(iterations, count);
144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
145 pool.Dispose();
146
147 }
148
149 [TestMethod]
150 public void WorkerPoolDisposeTest() {
151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
153 pool.Dispose();
154 Thread.Sleep(100);
155 Assert.AreEqual(0, pool.ThreadCount);
156 pool.Dispose();
157 }
45 158
46 159 [TestMethod]
47 public void JoinFailTest ()
48 {
49 var p = new Promise<int> ();
50 p.Reject (new ApplicationException ("failed"));
160 public void MTQueueTest() {
161 var queue = new MTQueue<int>();
162 int res;
163
164 queue.Enqueue(10);
165 Assert.IsTrue(queue.TryDequeue(out res));
166 Assert.AreEqual(10, res);
167 Assert.IsFalse(queue.TryDequeue(out res));
168
169 for (int i = 0; i < 1000; i++)
170 queue.Enqueue(i);
171
172 for (int i = 0; i < 1000; i++) {
173 queue.TryDequeue(out res);
174 Assert.AreEqual(i, res);
175 }
176
177 int writers = 0;
178 int readers = 0;
179 var stop = new ManualResetEvent(false);
180 int total = 0;
181
182 int itemsPerWriter = 1000;
183 int writersCount = 3;
51 184
52 try {
53 p.Join ();
54 throw new ApplicationException ("WRONG!");
55 } catch (TargetInvocationException err) {
56 Assert.AreEqual (err.InnerException.Message, "failed");
57 } catch {
58 Assert.Fail ("Got wrong excaption");
59 }
60 }
185 for (int i = 0; i < writersCount; i++) {
186 Interlocked.Increment(ref writers);
187 var wn = i;
188 AsyncPool
189 .InvokeNewThread(() => {
190 for (int ii = 0; ii < itemsPerWriter; ii++) {
191 queue.Enqueue(1);
192 }
193 return 1;
194 })
195 .Anyway(() => Interlocked.Decrement(ref writers));
196 }
197
198 for (int i = 0; i < 10; i++) {
199 Interlocked.Increment(ref readers);
200 var wn = i;
201 AsyncPool
202 .InvokeNewThread(() => {
203 int t;
204 do {
205 while (queue.TryDequeue(out t))
206 Interlocked.Add(ref total, t);
207 } while (writers > 0);
208 return 1;
209 })
210 .Anyway(() => {
211 Interlocked.Decrement(ref readers);
212 if (readers == 0)
213 stop.Set();
214 });
215 }
216
217 stop.WaitOne();
218
219 Assert.AreEqual(itemsPerWriter * writersCount, total);
220 }
61 221
62 222 [TestMethod]
63 public void MapTest ()
64 {
65 var p = new Promise<int> ();
223 public void ParallelMapTest() {
224
225 int count = 100000;
226
227 double[] args = new double[count];
228 var rand = new Random();
229
230 for (int i = 0; i < count; i++)
231 args[i] = rand.NextDouble();
66 232
67 var p2 = p.Map (x => x.ToString ());
68 p.Resolve (100);
233 var t = Environment.TickCount;
234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
235
236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
69 237
70 Assert.AreEqual (p2.Join (), "100");
71 }
238 t = Environment.TickCount;
239 for (int i = 0; i < count; i++)
240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
242 }
72 243
73 244 [TestMethod]
74 public void ChainTest ()
75 {
76 var p1 = new Promise<int> ();
245 public void ChainedMapTest() {
246
247 using (var pool = new WorkerPool(8,100,0)) {
248 int count = 10000;
249
250 double[] args = new double[count];
251 var rand = new Random();
252
253 for (int i = 0; i < count; i++)
254 args[i] = rand.NextDouble();
77 255
78 var p3 = p1.Chain (x => {
79 var p2 = new Promise<string> ();
80 p2.Resolve (x.ToString ());
81 return p2;
82 });
256 var t = Environment.TickCount;
257 var res = args
258 .ChainedMap(
259 x => pool.Invoke(
260 () => Math.Sin(x * x)
261 ),
262 4
263 )
264 .Join();
83 265
84 p1.Resolve (100);
266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
85 267
86 Assert.AreEqual (p3.Join (), "100");
87 }
268 t = Environment.TickCount;
269 for (int i = 0; i < count; i++)
270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 }
274 }
88 275
89 276 [TestMethod]
90 public void PoolTest ()
91 {
92 var pid = Thread.CurrentThread.ManagedThreadId;
93 var p = AsyncPool.Invoke (() => {
94 return Thread.CurrentThread.ManagedThreadId;
95 });
277 public void ParallelForEachTest() {
278
279 int count = 100000;
280
281 int[] args = new int[count];
282 var rand = new Random();
283
284 for (int i = 0; i < count; i++)
285 args[i] = (int)(rand.NextDouble() * 100);
286
287 int result = 0;
288
289 var t = Environment.TickCount;
290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
291
292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
293
294 int result2 = 0;
295
296 t = Environment.TickCount;
297 for (int i = 0; i < count; i++)
298 result2 += args[i];
299 Assert.AreEqual(result2, result);
300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
301 }
96 302
97 Assert.AreNotEqual (pid, p.Join ());
98 }
99 }
303 [TestMethod]
304 public void ComplexCase1Test() {
305 var flags = new bool[3];
306
307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
308
309 var p = PromiseHelper
310 .Sleep(200, "Alan")
311 .Cancelled(() => flags[0] = true)
312 .Chain(x =>
313 PromiseHelper
314 .Sleep(200, "Hi, " + x)
315 .Map(y => y)
316 .Cancelled(() => flags[1] = true)
317 )
318 .Cancelled(() => flags[2] = true);
319 Thread.Sleep(300);
320 p.Cancel();
321 try {
322 Assert.AreEqual(p.Join(), "Hi, Alan");
323 Assert.Fail("Shouldn't get here");
324 } catch (OperationCanceledException) {
325 }
326
327 Assert.IsFalse(flags[0]);
328 Assert.IsTrue(flags[1]);
329 Assert.IsTrue(flags[2]);
330 }
331 }
100 332 }
101 333
@@ -1,65 +1,66
1 1 <?xml version="1.0" encoding="utf-8"?>
2 2 <Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <PropertyGroup>
4 4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 6 <ProductVersion>
7 7 </ProductVersion>
8 8 <SchemaVersion>2.0</SchemaVersion>
9 9 <ProjectGuid>{63F92C0C-61BF-48C0-A377-8D67C3C661D0}</ProjectGuid>
10 10 <OutputType>Library</OutputType>
11 11 <AppDesignerFolder>Properties</AppDesignerFolder>
12 12 <RootNamespace>Implab.Test</RootNamespace>
13 13 <AssemblyName>Implab.Test</AssemblyName>
14 14 <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
15 15 <FileAlignment>512</FileAlignment>
16 16 <ProjectTypeGuids>{3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
17 17 </PropertyGroup>
18 18 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
19 19 <DebugSymbols>true</DebugSymbols>
20 20 <DebugType>full</DebugType>
21 21 <Optimize>false</Optimize>
22 22 <OutputPath>bin\Debug\</OutputPath>
23 23 <DefineConstants>DEBUG;TRACE</DefineConstants>
24 24 <ErrorReport>prompt</ErrorReport>
25 25 <WarningLevel>4</WarningLevel>
26 26 </PropertyGroup>
27 27 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
28 28 <DebugType>pdbonly</DebugType>
29 29 <Optimize>true</Optimize>
30 30 <OutputPath>bin\Release\</OutputPath>
31 31 <DefineConstants>TRACE</DefineConstants>
32 32 <ErrorReport>prompt</ErrorReport>
33 33 <WarningLevel>4</WarningLevel>
34 34 </PropertyGroup>
35 35 <ItemGroup>
36 36 <Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
37 37 <Reference Include="System" />
38 38 <Reference Include="System.Core">
39 39 <RequiredTargetFramework>3.5</RequiredTargetFramework>
40 40 </Reference>
41 41 </ItemGroup>
42 42 <ItemGroup>
43 43 <CodeAnalysisDependentAssemblyPaths Condition=" '$(VS100COMNTOOLS)' != '' " Include="$(VS100COMNTOOLS)..\IDE\PrivateAssemblies">
44 44 <Visible>False</Visible>
45 45 </CodeAnalysisDependentAssemblyPaths>
46 46 </ItemGroup>
47 47 <ItemGroup>
48 48 <Compile Include="AsyncTests.cs" />
49 <Compile Include="PromiseHelper.cs" />
49 50 <Compile Include="Properties\AssemblyInfo.cs" />
50 51 </ItemGroup>
51 52 <ItemGroup>
52 53 <ProjectReference Include="..\Implab\Implab.csproj">
53 54 <Project>{99B95D0D-9CF9-4F70-8ADF-F4D0AA5CB0D9}</Project>
54 55 <Name>Implab</Name>
55 56 </ProjectReference>
56 57 </ItemGroup>
57 58 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
58 59 <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
59 60 Other similar extension points exist, see Microsoft.Common.targets.
60 61 <Target Name="BeforeBuild">
61 62 </Target>
62 63 <Target Name="AfterBuild">
63 64 </Target>
64 65 -->
65 66 </Project> No newline at end of file
1 NO CONTENT: modified file, binary diff hidden
@@ -1,44 +1,56
1 1 <?xml version="1.0" encoding="utf-8"?>
2 2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 3 <PropertyGroup>
4 4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 6 <ProductVersion>10.0.0</ProductVersion>
7 7 <SchemaVersion>2.0</SchemaVersion>
8 8 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
9 9 <OutputType>Library</OutputType>
10 10 <RootNamespace>Implab</RootNamespace>
11 11 <AssemblyName>Implab</AssemblyName>
12 12 </PropertyGroup>
13 13 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
14 14 <DebugSymbols>true</DebugSymbols>
15 15 <DebugType>full</DebugType>
16 16 <Optimize>false</Optimize>
17 17 <OutputPath>bin\Debug</OutputPath>
18 18 <DefineConstants>DEBUG;</DefineConstants>
19 19 <ErrorReport>prompt</ErrorReport>
20 20 <WarningLevel>4</WarningLevel>
21 21 <ConsolePause>false</ConsolePause>
22 22 </PropertyGroup>
23 23 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
24 24 <DebugType>full</DebugType>
25 25 <Optimize>true</Optimize>
26 26 <OutputPath>bin\Release</OutputPath>
27 27 <ErrorReport>prompt</ErrorReport>
28 28 <WarningLevel>4</WarningLevel>
29 29 <ConsolePause>false</ConsolePause>
30 30 </PropertyGroup>
31 31 <ItemGroup>
32 32 <Reference Include="System" />
33 33 </ItemGroup>
34 34 <ItemGroup>
35 <Compile Include="ICancellable.cs" />
36 <Compile Include="IProgressHandler.cs" />
37 <Compile Include="IProgressNotifier.cs" />
38 <Compile Include="IPromise.cs" />
39 <Compile Include="ITaskController.cs" />
40 <Compile Include="ManagedPromise.cs" />
41 <Compile Include="Parallels\DispatchPool.cs" />
42 <Compile Include="Parallels\ArrayTraits.cs" />
43 <Compile Include="Parallels\MTQueue.cs" />
44 <Compile Include="Parallels\WorkerPool.cs" />
45 <Compile Include="PromiseState.cs" />
46 <Compile Include="TaskController.cs" />
47 <Compile Include="ProgressInitEventArgs.cs" />
35 48 <Compile Include="Properties\AssemblyInfo.cs" />
36 49 <Compile Include="Promise.cs" />
37 <Compile Include="AsyncPool.cs" />
50 <Compile Include="Parallels\AsyncPool.cs" />
38 51 <Compile Include="Safe.cs" />
52 <Compile Include="ValueEventArgs.cs" />
39 53 </ItemGroup>
40 54 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
41 <ItemGroup>
42 <Folder Include="Parallels\" />
43 </ItemGroup>
55 <ItemGroup />
44 56 </Project> No newline at end of file
@@ -1,397 +1,549
1 1 using System;
2 2 using System.Collections.Generic;
3 using System.Linq;
4 3 using System.Reflection;
5 using System.Text;
6 4 using System.Diagnostics;
7 5 using System.Threading;
8 6
9 7 namespace Implab {
10 8
11 9 public delegate void ErrorHandler(Exception e);
12
13 public delegate void ResultHandler<T>(T result);
14 public delegate TNew ResultMapper<TSrc, TNew>(TSrc result);
15 public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result);
10 public delegate T ErrorHandler<out T>(Exception e);
11 public delegate void ResultHandler<in T>(T result);
12 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
13 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
16 14
17 15 /// <summary>
18 16 /// Класс для асинхронного получения результатов. Так называемое "обещание".
19 17 /// </summary>
20 18 /// <typeparam name="T">Тип получаемого результата</typeparam>
21 19 /// <remarks>
22 20 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
23 21 /// клиент получив такое обещание может установить ряд обратных вызово для получения
24 22 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
25 23 /// <para>
26 24 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
27 25 /// данные события клиент должен использовать методы <c>Then</c>.
28 26 /// </para>
29 27 /// <para>
30 28 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
31 29 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
32 30 /// выполнении обещания.
33 31 /// </para>
34 32 /// <para>
35 33 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
36 34 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
37 35 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
38 36 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
39 37 /// обещания.
40 38 /// </para>
41 39 /// <para>
42 40 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
43 41 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
44 42 /// использовать соответствующую форму методе <c>Then</c>.
45 43 /// </para>
46 44 /// <para>
47 45 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
48 46 /// только инициатор обещания иначе могут возникнуть противоречия.
49 47 /// </para>
50 48 /// </remarks>
51 public class Promise<T> {
49 public class Promise<T> : IPromise {
52 50
53 51 struct ResultHandlerInfo {
54 52 public ResultHandler<T> resultHandler;
55 53 public ErrorHandler errorHandler;
56 54 }
57 55
58 enum State {
59 Unresolved,
60 Resolving,
61 Resolved,
62 Cancelled
63 }
56 readonly IPromise m_parent;
57
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
64 60
65 LinkedList<ResultHandlerInfo> m_handlersChain = new LinkedList<ResultHandlerInfo>();
66 State m_state;
67 bool m_cancellable;
61 readonly object m_lock = new Object();
62 readonly bool m_cancellable;
63 int m_childrenCount = 0;
64
65 PromiseState m_state;
68 66 T m_result;
69 67 Exception m_error;
70 68
71 69 public Promise() {
72 70 m_cancellable = true;
73 71 }
74 72
75 /// <summary>
76 /// Событие, возникающее при отмене асинхронной операции.
77 /// </summary>
78 /// <description>
79 /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить.
80 /// </description>
81 public event EventHandler Cancelled;
73 public Promise(IPromise parent, bool cancellable) {
74 m_cancellable = cancellable;
75 m_parent = parent;
76 if (parent != null)
77 parent.HandleCancelled(InternalCancel);
78 }
79
80 void InternalCancel() {
81 // don't try to cancel parent :)
82 Cancel(false);
83 }
82 84
83 85 /// <summary>
84 86 /// Выполняет обещание, сообщая об успешном выполнении.
85 87 /// </summary>
86 88 /// <param name="result">Результат выполнения.</param>
87 89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
88 90 public void Resolve(T result) {
89 lock (this) {
90 if (m_state == State.Cancelled)
91 lock (m_lock) {
92 if (m_state == PromiseState.Cancelled)
91 93 return;
92 if (m_state != State.Unresolved)
94 if (m_state != PromiseState.Unresolved)
93 95 throw new InvalidOperationException("The promise is already resolved");
94 96 m_result = result;
95 m_state = State.Resolving;
97 m_state = PromiseState.Resolved;
96 98 }
97 99
98 ResultHandlerInfo handler;
99 while (FetchNextHandler(out handler))
100 InvokeHandler(handler);
100 OnStateChanged();
101 101 }
102 102
103 103 /// <summary>
104 104 /// Выполняет обещание, сообщая об ошибке
105 105 /// </summary>
106 /// <remarks>
107 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
108 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
109 /// будут проигнорированы.
110 /// </remarks>
106 111 /// <param name="error">Исключение возникшее при выполнении операции</param>
107 112 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
108 113 public void Reject(Exception error) {
109 lock (this) {
110 if (m_state == State.Cancelled)
114 lock (m_lock) {
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
111 116 return;
112 if (m_state != State.Unresolved)
117 if (m_state != PromiseState.Unresolved)
113 118 throw new InvalidOperationException("The promise is already resolved");
114 119 m_error = error;
115 m_state = State.Resolving;
120 m_state = PromiseState.Rejected;
116 121 }
117 122
118 ResultHandlerInfo handler;
119 while (FetchNextHandler(out handler))
120 InvokeHandler(handler);
123 OnStateChanged();
121 124 }
122 125
123 126 /// <summary>
124 127 /// Отменяет операцию, если это возможно.
125 128 /// </summary>
126 129 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
127 130 public bool Cancel() {
128 lock (this) {
129 if (m_state == State.Unresolved && m_cancellable) {
130 m_state = State.Cancelled;
131 EventHandler temp = Cancelled;
132
133 if (temp != null)
134 temp(this, new EventArgs());
135
136 return true;
137 } else
138 return false;
139 }
131 return Cancel(true);
140 132 }
141 133
142 134 /// <summary>
143 /// Добавляет обработчики событий выполнения обещания.
135 /// Adds new handlers to this promise.
144 136 /// </summary>
145 /// <param name="success">Обработчик успешного выполнения обещания.
146 /// Данному обработчику будет передан результат выполнения операции.</param>
147 /// <param name="error">Обработчик ошибки. Данный обработчик получит
148 /// исключение возникшее при выполнении операции.</param>
149 /// <returns>Само обещание</returns>
137 /// <param name="success">The handler of the successfully completed operation.
138 /// This handler will recieve an operation result as a parameter.</param>
139 /// <param name="error">Handles an exception that may occur during the operation.</param>
140 /// <returns>The new promise chained to this one.</returns>
150 141 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
151 142 if (success == null && error == null)
152 143 return this;
153 144
154 var medium = new Promise<T>();
145 var medium = new Promise<T>(this, true);
155 146
156 147 var handlerInfo = new ResultHandlerInfo();
157 148
158 149 if (success != null)
159 150 handlerInfo.resultHandler = x => {
160 try {
161 success(x);
162 medium.Resolve(x);
163 } catch (Exception e) {
164 medium.Reject(e);
165 }
151 success(x);
152 medium.Resolve(x);
166 153 };
167 154 else
168 handlerInfo.resultHandler = x => medium.Resolve(x);
155 handlerInfo.resultHandler = medium.Resolve;
169 156
170 157 if (error != null)
171 158 handlerInfo.errorHandler = x => {
172 159 try {
173 160 error(x);
174 161 } catch { }
175 162 medium.Reject(x);
176 163 };
177 164 else
178 handlerInfo.errorHandler = x => medium.Reject(x);
165 handlerInfo.errorHandler = medium.Reject;
166
167 AddHandler(handlerInfo);
168
169 return medium;
170 }
171
172 /// <summary>
173 /// Adds new handlers to this promise.
174 /// </summary>
175 /// <param name="success">The handler of the successfully completed operation.
176 /// This handler will recieve an operation result as a parameter.</param>
177 /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
178 /// <returns>The new promise chained to this one.</returns>
179 public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
180 if (success == null && error == null)
181 return this;
182
183 var medium = new Promise<T>(this, true);
184
185 var handlerInfo = new ResultHandlerInfo();
186
187 if (success != null)
188 handlerInfo.resultHandler = x => {
189 success(x);
190 medium.Resolve(x);
191 };
192 else
193 handlerInfo.resultHandler = medium.Resolve;
194
195 if (error != null)
196 handlerInfo.errorHandler = x => {
197 try {
198 medium.Resolve(error(x));
199 } catch { }
200 medium.Reject(x);
201 };
202 else
203 handlerInfo.errorHandler = medium.Reject;
179 204
180 205 AddHandler(handlerInfo);
181 206
182 207 return medium;
183 208 }
184 209
210
185 211 public Promise<T> Then(ResultHandler<T> success) {
186 return Then(success, null);
212 if (success == null)
213 return this;
214
215 var medium = new Promise<T>(this, true);
216
217 var handlerInfo = new ResultHandlerInfo();
218
219 if (success != null)
220 handlerInfo.resultHandler = x => {
221 success(x);
222 medium.Resolve(x);
223 };
224 else
225 handlerInfo.resultHandler = medium.Resolve;
226
227 handlerInfo.errorHandler = medium.Reject;
228
229 AddHandler(handlerInfo);
230
231 return medium;
187 232 }
188 233
189 234 public Promise<T> Error(ErrorHandler error) {
190 235 return Then(null, error);
191 236 }
192 237
238 /// <summary>
239 /// Handles error and allows to keep the promise.
240 /// </summary>
241 /// <remarks>
242 /// If the specified handler throws an exception, this exception will be used to reject the promise.
243 /// </remarks>
244 /// <param name="handler">The error handler which returns the result of the promise.</param>
245 /// <returns>New promise.</returns>
246 public Promise<T> Error(ErrorHandler<T> handler) {
247 if (handler == null)
248 return this;
249
250 var medium = new Promise<T>(this, true);
251
252 AddHandler(new ResultHandlerInfo {
253 errorHandler = e => {
254 try {
255 medium.Resolve(handler(e));
256 } catch (Exception e2) {
257 medium.Reject(e2);
258 }
259 }
260 });
261
262 return medium;
263 }
264
193 265 public Promise<T> Anyway(Action handler) {
194 266 if (handler == null)
195 267 return this;
196 268
197 269 var medium = new Promise<T>();
198 270
199 271 AddHandler(new ResultHandlerInfo {
200 272 resultHandler = x => {
273 // to avoid handler being called multiple times we handle exception by ourselfs
201 274 try {
202 275 handler();
203 276 medium.Resolve(x);
204 277 } catch (Exception e) {
205 278 medium.Reject(e);
206 279 }
207 280 },
208 281 errorHandler = x => {
209 282 try {
210 283 handler();
211 284 } catch { }
212 285 medium.Reject(x);
213 286 }
214 287 });
215 288
216 289 return medium;
217 290 }
218 291
219 292 /// <summary>
220 293 /// Позволяет преобразовать результат выполения операции к новому типу.
221 294 /// </summary>
222 295 /// <typeparam name="TNew">Новый тип результата.</typeparam>
223 296 /// <param name="mapper">Преобразование результата к новому типу.</param>
224 297 /// <param name="error">Обработчик ошибки. Данный обработчик получит
225 298 /// исключение возникшее при выполнении операции.</param>
226 299 /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
227 300 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
228 301 if (mapper == null)
229 302 throw new ArgumentNullException("mapper");
230 303
231 304 // создаем прицепленное обещание
232 Promise<TNew> chained = new Promise<TNew>();
305 var chained = new Promise<TNew>();
233 306
234 307 AddHandler(new ResultHandlerInfo() {
235 resultHandler = delegate(T result) {
236 try {
237 // если преобразование выдаст исключение, то сработает reject сцепленного deferred
238 chained.Resolve(mapper(result));
239 } catch (Exception e) {
240 chained.Reject(e);
241 }
242 },
308 resultHandler = result => chained.Resolve(mapper(result)),
243 309 errorHandler = delegate(Exception e) {
244 310 if (error != null)
245 error(e);
311 try {
312 error(e);
313 } catch { }
246 314 // в случае ошибки нужно передать исключение дальше по цепочке
247 315 chained.Reject(e);
248 316 }
249 317 });
250 318
251 319 return chained;
252 320 }
253 321
254 322 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) {
255 323 return Map(mapper, null);
256 324 }
257 325
258 326 /// <summary>
259 327 /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
260 328 /// выполнения текущей, а результат текущей операции может быть использован для инициализации
261 329 /// новой операции.
262 330 /// </summary>
263 331 /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
264 332 /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
265 333 /// <param name="error">Обработчик ошибки. Данный обработчик получит
266 334 /// исключение возникшее при выполнении текуещй операции.</param>
267 335 /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
268 336 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained, ErrorHandler error) {
269 337
270 338 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
271 339 // создать посредника, к которому будут подвызяваться следующие обработчики.
272 340 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
273 341 // передать через него результаты работы.
274 Promise<TNew> medium = new Promise<TNew>();
342 var medium = new Promise<TNew>(this, true);
275 343
276 AddHandler(new ResultHandlerInfo() {
344 AddHandler(new ResultHandlerInfo {
277 345 resultHandler = delegate(T result) {
278 try {
279 chained(result).Then(
280 x => medium.Resolve(x),
281 e => medium.Reject(e)
282 );
283 } catch (Exception e) {
284 // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке
285 medium.Reject(e);
286 }
346 if (medium.State == PromiseState.Cancelled)
347 return;
348
349 var promise = chained(result);
350
351 // notify chained operation that it's not needed
352 medium.Cancelled(() => promise.Cancel());
353 promise.Then(
354 x => medium.Resolve(x),
355 e => medium.Reject(e)
356 );
287 357 },
288 358 errorHandler = delegate(Exception e) {
289 359 if (error != null)
290 360 error(e);
291 361 // в случае ошибки нужно передать исключение дальше по цепочке
292 362 medium.Reject(e);
293 363 }
294 364 });
295 365
296 366 return medium;
297 367 }
298 368
299 369 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
300 370 return Chain(chained, null);
301 371 }
302 372
373 public Promise<T> Cancelled(Action handler) {
374 if (handler == null)
375 return this;
376 lock (m_lock) {
377 if (m_state == PromiseState.Unresolved)
378 m_cancelHandlers.AddLast(handler);
379 else if (m_state == PromiseState.Cancelled)
380 handler();
381 }
382 return this;
383 }
384
385 public void HandleCancelled(Action handler) {
386 Cancelled(handler);
387 }
388
303 389 /// <summary>
304 390 /// Дожидается отложенного обещания и в случае успеха, возвращает
305 391 /// его, результат, в противном случае бросает исключение.
306 392 /// </summary>
307 393 /// <remarks>
308 394 /// <para>
309 395 /// Если ожидание обещания было прервано по таймауту, это не значит,
310 396 /// что обещание было отменено или что-то в этом роде, это только
311 397 /// означает, что мы его не дождались, однако все зарегистрированные
312 398 /// обработчики, как были так остались и они будут вызваны, когда
313 399 /// обещание будет выполнено.
314 400 /// </para>
315 401 /// <para>
316 402 /// Такое поведение вполне оправдано поскольку таймаут может истечь
317 403 /// в тот момент, когда началась обработка цепочки обработчиков, и
318 404 /// к тому же текущее обещание может стоять в цепочке обещаний и его
319 405 /// отклонение может привести к непрогнозируемому результату.
320 406 /// </para>
321 407 /// </remarks>
322 408 /// <param name="timeout">Время ожидания</param>
323 409 /// <returns>Результат выполнения обещания</returns>
324 410 public T Join(int timeout) {
325 ManualResetEvent evt = new ManualResetEvent(false);
411 var evt = new ManualResetEvent(false);
326 412 Anyway(() => evt.Set());
413 Cancelled(() => evt.Set());
327 414
328 415 if (!evt.WaitOne(timeout, true))
329 416 throw new TimeoutException();
330 417
331 if (m_error != null)
332 throw new TargetInvocationException(m_error);
333 else
334 return m_result;
418 switch (State) {
419 case PromiseState.Resolved:
420 return m_result;
421 case PromiseState.Cancelled:
422 throw new OperationCanceledException();
423 case PromiseState.Rejected:
424 throw new TargetInvocationException(m_error);
425 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
427 }
335 428 }
336 429
337 430 public T Join() {
338 431 return Join(Timeout.Infinite);
339 432 }
340 433
341 /// <summary>
342 /// Данный метод последовательно извлекает обработчики обещания и когда
343 /// их больше не осталось - ставит состояние "разрешено".
344 /// </summary>
345 /// <param name="handler">Информация об обработчике</param>
346 /// <returns>Признак того, что еще остались обработчики в очереди</returns>
347 bool FetchNextHandler(out ResultHandlerInfo handler) {
348 handler = default(ResultHandlerInfo);
349
350 lock (this) {
351 Debug.Assert(m_state == State.Resolving);
352
353 if (m_handlersChain.Count > 0) {
354 handler = m_handlersChain.First.Value;
355 m_handlersChain.RemoveFirst();
356 return true;
357 } else {
358 m_state = State.Resolved;
359 return false;
360 }
361 }
362 }
363
364 434 void AddHandler(ResultHandlerInfo handler) {
365 435 bool invokeRequired = false;
366 436
367 lock (this) {
368 if (m_state != State.Resolved)
369 m_handlersChain.AddLast(handler);
370 else
437 lock (m_lock) {
438 m_childrenCount++;
439 if (m_state == PromiseState.Unresolved) {
440 m_resultHandlers.AddLast(handler);
441 } else
371 442 invokeRequired = true;
372 443 }
373 444
374 445 // обработчики не должны блокировать сам объект
375 446 if (invokeRequired)
376 447 InvokeHandler(handler);
377 448 }
378 449
379 450 void InvokeHandler(ResultHandlerInfo handler) {
380 if (m_error == null) {
381 try {
382 if (handler.resultHandler != null)
383 handler.resultHandler(m_result);
384 } catch { }
385 }
386
387 if (m_error != null) {
388 try {
389 if (handler.errorHandler != null)
390 handler.errorHandler(m_error);
391 } catch { }
451 switch (m_state) {
452 case PromiseState.Resolved:
453 try {
454 if (handler.resultHandler != null)
455 handler.resultHandler(m_result);
456 } catch (Exception e) {
457 try {
458 if (handler.errorHandler != null)
459 handler.errorHandler(e);
460 } catch { }
461 }
462 break;
463 case PromiseState.Rejected:
464 try {
465 if (handler.errorHandler != null)
466 handler.errorHandler(m_error);
467 } catch { }
468 break;
469 default:
470 // do nothing
471 return;
392 472 }
393 473 }
394 474
475 protected virtual void OnStateChanged() {
476 switch (m_state) {
477 case PromiseState.Resolved:
478 foreach (var resultHandlerInfo in m_resultHandlers)
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
506 }
507
508
509
510 public bool IsExclusive {
511 get {
512 lock (m_lock) {
513 return m_childrenCount <= 1;
514 }
515 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
523 }
524 }
525
526 protected bool Cancel(bool dependencies) {
527 bool result;
528
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
539 OnStateChanged();
540
541 if (dependencies && m_parent != null && m_parent.IsExclusive) {
542 m_parent.Cancel();
543 }
544
545 return result;
546 }
395 547
396 548 }
397 549 }
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now