##// END OF EJS Templates
Слияние с promises
cin -
r18:0c924dff5498 merge default
parent child
Show More
@@ -0,0 +1,17
1 using Implab.Parallels;
2 using System;
3 using System.Collections.Generic;
4 using System.Linq;
5 using System.Text;
6 using System.Threading;
7
8 namespace Implab.Test {
9 class PromiseHelper {
10 public static Promise<T> Sleep<T>(int timeout, T retVal) {
11 return AsyncPool.Invoke(() => {
12 Thread.Sleep(timeout);
13 return retVal;
14 });
15 }
16 }
17 }
1 NO CONTENT: new file 100644, binary diff hidden
NO CONTENT: new file 100644, binary diff hidden
@@ -0,0 +1,10
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ICancellable {
8 bool Cancel();
9 }
10 }
@@ -0,0 +1,18
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface IProgressHandler {
8 string Message {
9 get;
10 set;
11 }
12 float CurrentProgress {
13 get;
14 set;
15 }
16 void InitProgress(float current, float max, string message);
17 }
18 }
@@ -0,0 +1,14
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public interface IProgressNotifier
9 {
10 event EventHandler<ValueEventArgs<string>> MessageUpdated;
11 event EventHandler<ValueEventArgs<float>> ProgressUpdated;
12 event EventHandler<ProgressInitEventArgs> ProgressInit;
13 }
14 }
@@ -0,0 +1,33
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public interface IPromise: ICancellable
9 {
10 /// <summary>
11 /// Check whereather the promise has no more than one dependent promise.
12 /// </summary>
13 bool IsExclusive
14 {
15 get;
16 }
17
18 /// <summary>
19 /// The current state of the promise.
20 /// </summary>
21 PromiseState State
22 {
23 get;
24 }
25
26 /// <summary>
27 /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
28 /// handler will be invoked immediatelly.
29 /// </summary>
30 /// <param name="handler">The handler</param>
31 void HandleCancelled(Action handler);
32 }
33 }
@@ -0,0 +1,12
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7 public interface ITaskController: IProgressHandler {
8 bool Cancelled {
9 get;
10 }
11 }
12 }
@@ -0,0 +1,11
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab {
7
8 /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
9
10 }*/
11 }
@@ -0,0 +1,171
1 using System;
2 using System.Collections.Generic;
3 using System.Diagnostics;
4 using System.Linq;
5 using System.Text;
6 using System.Threading;
7
8 namespace Implab.Parallels {
9 public static class ArrayTraits {
10 class ArrayIterator<TSrc> : DispatchPool<int> {
11 readonly Action<TSrc> m_action;
12 readonly TSrc[] m_source;
13 readonly Promise<int> m_promise = new Promise<int>();
14
15 int m_pending;
16 int m_next;
17
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 : base(threads) {
20
21 Debug.Assert(source != null);
22 Debug.Assert(action != null);
23
24 m_next = 0;
25 m_source = source;
26 m_pending = source.Length;
27 m_action = action;
28
29 m_promise.Anyway(() => Dispose());
30 m_promise.Cancelled(() => Dispose());
31
32 InitPool();
33 }
34
35 public Promise<int> Promise {
36 get {
37 return m_promise;
38 }
39 }
40
41 protected override bool TryDequeue(out int unit) {
42 unit = Interlocked.Increment(ref m_next) - 1;
43 return unit >= m_source.Length ? false : true;
44 }
45
46 protected override void InvokeUnit(int unit) {
47 try {
48 m_action(m_source[unit]);
49 var pending = Interlocked.Decrement(ref m_pending);
50 if (pending == 0)
51 m_promise.Resolve(m_source.Length);
52 } catch (Exception e) {
53 m_promise.Reject(e);
54 }
55 }
56 }
57
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 readonly Func<TSrc, TDst> m_transform;
60 readonly TSrc[] m_source;
61 readonly TDst[] m_dest;
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
63
64 int m_pending;
65 int m_next;
66
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
68 : base(threads) {
69
70 Debug.Assert (source != null);
71 Debug.Assert( transform != null);
72
73 m_next = 0;
74 m_source = source;
75 m_dest = new TDst[source.Length];
76 m_pending = source.Length;
77 m_transform = transform;
78
79 m_promise.Anyway(() => Dispose());
80 m_promise.Cancelled(() => Dispose());
81
82 InitPool();
83 }
84
85 public Promise<TDst[]> Promise {
86 get {
87 return m_promise;
88 }
89 }
90
91 protected override bool TryDequeue(out int unit) {
92 unit = Interlocked.Increment(ref m_next) - 1;
93 return unit >= m_source.Length ? false : true;
94 }
95
96 protected override void InvokeUnit(int unit) {
97 try {
98 m_dest[unit] = m_transform(m_source[unit]);
99 var pending = Interlocked.Decrement(ref m_pending);
100 if (pending == 0)
101 m_promise.Resolve(m_dest);
102 } catch (Exception e) {
103 m_promise.Reject(e);
104 }
105 }
106 }
107
108 public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
109 if (source == null)
110 throw new ArgumentNullException("source");
111 if (transform == null)
112 throw new ArgumentNullException("transform");
113
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
115 return mapper.Promise;
116 }
117
118 public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
119 if (source == null)
120 throw new ArgumentNullException("source");
121 if (action == null)
122 throw new ArgumentNullException("action");
123
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
125 return iter.Promise;
126 }
127
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 if (source == null)
130 throw new ArgumentNullException("source");
131 if (transform == null)
132 throw new ArgumentNullException("transform");
133 if (threads <= 0)
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135
136 var promise = new Promise<TDst[]>();
137 var res = new TDst[source.Length];
138 var pending = source.Length;
139 var semaphore = new Semaphore(threads, threads);
140
141 AsyncPool.InvokeNewThread(() => {
142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
144 break; // stop processing in case of error or cancellation
145 var idx = i;
146 semaphore.WaitOne();
147 try {
148 var p1 = transform(source[i]);
149 p1.Anyway(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
151 p1.Then(
152 x => {
153 res[idx] = x;
154 var left = Interlocked.Decrement(ref pending);
155 if (left == 0)
156 promise.Resolve(res);
157 },
158 e => promise.Reject(e)
159 );
160
161 } catch (Exception e) {
162 promise.Reject(e);
163 }
164 }
165 return 0;
166 });
167
168 return promise.Anyway(() => semaphore.Dispose());
169 }
170 }
171 }
@@ -0,0 +1,44
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
5 /// <summary>
6 /// Класс для распаралеливания задач.
7 /// </summary>
8 /// <remarks>
9 /// Используя данный класс и лямда выражения можно распараллелить
10 /// вычисления, для этого используется концепция обещаний.
11 /// </remarks>
12 public static class AsyncPool {
13
14 public static Promise<T> Invoke<T>(Func<T> func) {
15 var p = new Promise<T>();
16
17 ThreadPool.QueueUserWorkItem(param => {
18 try {
19 p.Resolve(func());
20 } catch(Exception e) {
21 p.Reject(e);
22 }
23 });
24
25 return p;
26 }
27
28 public static Promise<T> InvokeNewThread<T>(Func<T> func) {
29 var p = new Promise<T>();
30
31 var worker = new Thread(() => {
32 try {
33 p.Resolve(func());
34 } catch (Exception e) {
35 p.Reject(e);
36 }
37 });
38 worker.IsBackground = true;
39 worker.Start();
40
41 return p;
42 }
43 }
44 }
@@ -0,0 +1,238
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
13 int m_maxRunningThreads = 0;
14 int m_suspended = 0;
15 int m_exitRequired = 0;
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17
18 protected DispatchPool(int min, int max) {
19 if (min < 0)
20 throw new ArgumentOutOfRangeException("min");
21 if (max <= 0)
22 throw new ArgumentOutOfRangeException("max");
23
24 if (min > max)
25 min = max;
26 m_minThreads = min;
27 m_maxThreads = max;
28 }
29
30 protected DispatchPool(int threads)
31 : this(threads, threads) {
32 }
33
34 protected DispatchPool() {
35 int maxThreads, maxCP;
36 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
37
38 m_minThreads = 0;
39 m_maxThreads = maxThreads;
40 }
41
42 protected void InitPool() {
43 for (int i = 0; i < m_minThreads; i++)
44 StartWorker();
45 }
46
47 public int ThreadCount {
48 get {
49 return m_runningThreads;
50 }
51 }
52
53 public int MaxRunningThreads {
54 get {
55 return m_maxRunningThreads;
56 }
57 }
58
59 protected bool IsDisposed {
60 get {
61 return m_exitRequired != 0;
62 }
63 }
64
65 protected abstract bool TryDequeue(out TUnit unit);
66
67 protected virtual bool ExtendPool() {
68 if (m_suspended > 0) {
69 m_hasTasks.Set();
70 return true;
71 } else
72 return StartWorker();
73 }
74
75 /// <summary>
76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
77 /// </summary>
78 protected void WakePool() {
79 m_hasTasks.Set(); // wake sleeping thread;
80
81 if (AllocateThreadSlot(1)) {
82 var worker = new Thread(this.Worker);
83 worker.IsBackground = true;
84 worker.Start();
85 }
86 }
87
88 protected virtual void Suspend() {
89 m_hasTasks.WaitOne();
90 }
91
92 #region thread slots traits
93
94 bool AllocateThreadSlot() {
95 int current;
96 // use spins to allocate slot for the new thread
97 do {
98 current = m_runningThreads;
99 if (current >= m_maxThreads || m_exitRequired != 0)
100 // no more slots left or the pool has been disposed
101 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
103
104 UpdateMaxThreads(current + 1);
105
106 return true;
107 }
108
109 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
111 return false;
112
113 UpdateMaxThreads(desired);
114
115 return true;
116 }
117
118 bool ReleaseThreadSlot(out bool last) {
119 last = false;
120 int current;
121 // use spins to release slot for the new thread
122 do {
123 current = m_runningThreads;
124 if (current <= m_minThreads && m_exitRequired == 0)
125 // the thread is reserved
126 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
128
129 last = (current == 1);
130
131 return true;
132 }
133
134 /// <summary>
135 /// releases thread slot unconditionally, used during cleanup
136 /// </summary>
137 /// <returns>true - no more threads left</returns>
138 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads);
140 return left == 0;
141 }
142
143 void UpdateMaxThreads(int count) {
144 int max;
145 do {
146 max = m_maxRunningThreads;
147 if (max >= count)
148 break;
149 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
150 }
151
152 #endregion
153
154 bool StartWorker() {
155 if (AllocateThreadSlot()) {
156 // slot successfully allocated
157 var worker = new Thread(this.Worker);
158 worker.IsBackground = true;
159 worker.Start();
160
161 return true;
162 } else {
163 return false;
164 }
165 }
166
167 bool FetchTask(out TUnit unit) {
168 do {
169 // exit if requested
170 if (m_exitRequired != 0) {
171 // release the thread slot
172 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 m_hasTasks.Dispose();
174 else
175 m_hasTasks.Set(); // wake next worker
176 unit = default(TUnit);
177 return false;
178 }
179
180 // fetch task
181 if (TryDequeue(out unit)) {
182 ExtendPool();
183 return true;
184 }
185
186 //no tasks left, exit if the thread is no longer needed
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
197
198 return false;
199 }
200
201 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 // keep this thread and wait
204 Suspend();
205 Interlocked.Decrement(ref m_suspended);
206 } while (true);
207 }
208
209 protected abstract void InvokeUnit(TUnit unit);
210
211 void Worker() {
212 TUnit unit;
213 while (FetchTask(out unit))
214 InvokeUnit(unit);
215 }
216
217 protected virtual void Dispose(bool disposing) {
218 if (disposing) {
219 if (m_exitRequired == 0) {
220 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
221 return;
222
223 // wake sleeping threads
224 m_hasTasks.Set();
225 GC.SuppressFinalize(this);
226 }
227 }
228 }
229
230 public void Dispose() {
231 Dispose(true);
232 }
233
234 ~DispatchPool() {
235 Dispose(false);
236 }
237 }
238 }
@@ -0,0 +1,74
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab.Parallels {
8 public class MTQueue<T> {
9 class Node {
10 public Node(T value) {
11 this.value = value;
12 }
13 public readonly T value;
14 public Node next;
15 }
16
17 Node m_first;
18 Node m_last;
19
20 public void Enqueue(T value) {
21 var last = m_last;
22 var next = new Node(value);
23
24 while (last != Interlocked.CompareExchange(ref m_last, next, last))
25 last = m_last;
26
27 if (last != null)
28 last.next = next;
29 else
30 m_first = next;
31 }
32
33 public bool TryDequeue(out T value) {
34 Node first;
35 Node next = null;
36 value = default(T);
37
38 do {
39 first = m_first;
40 if (first == null)
41 return false;
42 next = first.next;
43 if (next == null) {
44 // this is the last element,
45 // then try to update tail
46 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
47 // this is inconsistent situation which means that the queue is empty
48 if (m_last == null)
49 return false;
50 // tail has been changed, that means that we need to restart
51 continue;
52 }
53
54 // tail succesfully updated and first.next will never be changed
55 // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
56 // but the writer may update the m_first since the m_last is null
57
58 // so we need to fix inconsistency by setting m_first to null, but if it already has been
59 // updated by a writer then we should just give up
60 Interlocked.CompareExchange(ref m_first, null, first);
61 break;
62
63 } else {
64 if (first == Interlocked.CompareExchange(ref m_first, next, first))
65 // head succesfully updated
66 break;
67 }
68 } while (true);
69
70 value = first.value;
71 return true;
72 }
73 }
74 }
@@ -0,0 +1,89
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public class WorkerPool : DispatchPool<Action> {
10
11 MTQueue<Action> m_queue = new MTQueue<Action>();
12 int m_queueLength = 0;
13 readonly int m_threshold = 1;
14
15 public WorkerPool(int minThreads, int maxThreads, int threshold)
16 : base(minThreads, maxThreads) {
17 m_threshold = threshold;
18 InitPool();
19 }
20
21 public WorkerPool(int minThreads, int maxThreads) :
22 base(minThreads, maxThreads) {
23 InitPool();
24 }
25
26 public WorkerPool(int threads)
27 : base(threads) {
28 InitPool();
29 }
30
31 public WorkerPool()
32 : base() {
33 InitPool();
34 }
35
36 public Promise<T> Invoke<T>(Func<T> task) {
37 if (task == null)
38 throw new ArgumentNullException("task");
39 if (IsDisposed)
40 throw new ObjectDisposedException(ToString());
41
42 var promise = new Promise<T>();
43
44 EnqueueTask(delegate() {
45 try {
46 promise.Resolve(task());
47 } catch (Exception e) {
48 promise.Reject(e);
49 }
50 });
51
52 return promise;
53 }
54
55 protected void EnqueueTask(Action unit) {
56 Debug.Assert(unit != null);
57 var len = Interlocked.Increment(ref m_queueLength);
58 m_queue.Enqueue(unit);
59
60 if(!ExtendPool())
61 WakePool();
62 }
63
64 protected override bool ExtendPool() {
65 if (m_queueLength <= m_threshold*ThreadCount)
66 // in this case we are in active thread and it request for additional workers
67 // satisfy it only when queue is longer than threshold
68 return false;
69 return base.ExtendPool();
70 }
71
72 protected override bool TryDequeue(out Action unit) {
73 if (m_queue.TryDequeue(out unit)) {
74 Interlocked.Decrement(ref m_queueLength);
75 return true;
76 }
77 return false;
78 }
79
80 protected override void InvokeUnit(Action unit) {
81 unit();
82 }
83
84 protected override void Suspend() {
85 if (m_queueLength == 0)
86 base.Suspend();
87 }
88 }
89 }
@@ -0,0 +1,36
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 [Serializable]
9 public class ProgressInitEventArgs: EventArgs
10 {
11 public float MaxProgress
12 {
13 get;
14 private set;
15 }
16
17 public float CurrentProgress
18 {
19 get;
20 private set;
21 }
22
23 public string Message
24 {
25 get;
26 private set;
27 }
28
29 public ProgressInitEventArgs(float current, float max, string message)
30 {
31 this.MaxProgress = max;
32 this.CurrentProgress = current;
33 this.Message = message;
34 }
35 }
36 }
@@ -0,0 +1,15
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 public enum PromiseState
9 {
10 Unresolved,
11 Resolved,
12 Cancelled,
13 Rejected
14 }
15 }
@@ -0,0 +1,132
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6
7 namespace Implab
8 {
9 /// <summary>
10 /// This class allows to interact with asyncronuos task.
11 /// </summary>
12 /// <remarks>
13 /// Members of this object are thread safe.
14 /// </remarks>
15 class TaskController: IProgressNotifier, ITaskController, ICancellable
16 {
17 readonly object m_lock;
18 string m_message;
19
20 float m_current;
21 float m_max;
22
23 bool m_cancelled;
24
25 public event EventHandler<ValueEventArgs<string>> MessageUpdated;
26 public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
27 public event EventHandler<ProgressInitEventArgs> ProgressInit;
28
29 public TaskController()
30 {
31 m_lock = new Object();
32 }
33
34 public string Message
35 {
36 get
37 {
38 lock (m_lock)
39 return m_message;
40 }
41 set
42 {
43 lock (m_lock)
44 {
45 m_message = value;
46 OnMessageUpdated();
47 }
48 }
49 }
50
51 public float CurrentProgress
52 {
53 get
54 {
55 lock (m_lock)
56 return m_current;
57 }
58 set
59 {
60 lock (m_lock)
61 {
62 var prev = m_current;
63 m_current = value;
64 if (m_current >= m_max)
65 m_current = m_max;
66 if (m_current != prev)
67 OnProgressUpdated();
68 }
69 }
70 }
71
72 public void InitProgress(float current, float max, string message)
73 {
74 if (max < 0)
75 throw new ArgumentOutOfRangeException("max");
76 if (current < 0 || current > max)
77 throw new ArgumentOutOfRangeException("current");
78
79 lock(m_lock) {
80 m_current = current;
81 m_max = max;
82 m_message = message;
83 OnProgressInit();
84 }
85 }
86
87 public bool Cancelled {
88 get {
89 lock (m_lock)
90 return m_cancelled;
91 }
92 }
93
94 public bool Cancel() {
95 lock (m_lock) {
96 if (!m_cancelled) {
97 m_cancelled = true;
98 return true;
99 } else {
100 return false;
101 }
102 }
103 }
104
105 protected virtual void OnMessageUpdated()
106 {
107 var temp = MessageUpdated;
108 if (temp != null)
109 {
110 temp(this, new ValueEventArgs<string>(m_message));
111 }
112 }
113
114 protected virtual void OnProgressUpdated()
115 {
116 var temp = ProgressUpdated;
117 if (temp != null)
118 {
119 temp(this,new ValueEventArgs<float>(m_current));
120 }
121 }
122
123 protected virtual void OnProgressInit()
124 {
125 var temp = ProgressInit;
126 if (temp != null)
127 {
128 temp(this, new ProgressInitEventArgs(m_current,m_max, m_message));
129 }
130 }
131 }
132 }
@@ -0,0 +1,21
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5
6 namespace Implab
7 {
8 [Serializable]
9 public class ValueEventArgs<T>: EventArgs
10 {
11 public ValueEventArgs(T value)
12 {
13 this.Value = value;
14 }
15 public T Value
16 {
17 get;
18 private set;
19 }
20 }
21 }
@@ -1,12 +1,13
1 syntax: glob
1 syntax: glob
2 Implab.Test/bin/
2 Implab.Test/bin/
3 *.user
3 *.user
4 Implab.Test/obj/
4 Implab.Test/obj/
5 *.userprefs
5 *.userprefs
6 Implab/bin/
6 Implab/bin/
7 Implab/obj/
7 Implab/obj/
8 TestResults/
8 TestResults/
9 Implab.Fx/obj/
9 Implab.Fx/obj/
10 Implab.Fx/bin/
10 Implab.Fx/bin/
11 Implab.Fx.Test/bin/
11 Implab.Fx.Test/bin/
12 Implab.Fx.Test/obj/
12 Implab.Fx.Test/obj/
13 _ReSharper.Implab/
@@ -1,101 +1,333
1 using System;
1 using System;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using Implab;
4 using System.Reflection;
3 using System.Reflection;
5 using System.Threading;
4 using System.Threading;
5 using Implab.Parallels;
6
6
7 namespace Implab.Tests
7 namespace Implab.Test {
8 {
8 [TestClass]
9 [TestClass]
9 public class AsyncTests {
10 public class AsyncTests
10 [TestMethod]
11 {
11 public void ResolveTest() {
12 [TestMethod]
12 int res = -1;
13 public void ResolveTest ()
13 var p = new Promise<int>();
14 {
14 p.Then(x => res = x);
15 int res = -1;
15 p.Resolve(100);
16 var p = new Promise<int> ();
16
17 p.Then (x => res = x);
17 Assert.AreEqual(res, 100);
18 p.Resolve (100);
18 }
19
20 [TestMethod]
21 public void RejectTest() {
22 int res = -1;
23 Exception err = null;
24
25 var p = new Promise<int>();
26 p.Then(x => res = x, e => err = e);
27 p.Reject(new ApplicationException("error"));
28
29 Assert.AreEqual(res, -1);
30 Assert.AreEqual(err.Message, "error");
31
32 }
33
34 [TestMethod]
35 public void JoinSuccessTest() {
36 var p = new Promise<int>();
37 p.Resolve(100);
38 Assert.AreEqual(p.Join(), 100);
39 }
19
40
20 Assert.AreEqual (res, 100);
41 [TestMethod]
21 }
42 public void JoinFailTest() {
43 var p = new Promise<int>();
44 p.Reject(new ApplicationException("failed"));
45
46 try {
47 p.Join();
48 throw new ApplicationException("WRONG!");
49 } catch (TargetInvocationException err) {
50 Assert.AreEqual(err.InnerException.Message, "failed");
51 } catch {
52 Assert.Fail("Got wrong excaption");
53 }
54 }
55
56 [TestMethod]
57 public void MapTest() {
58 var p = new Promise<int>();
59
60 var p2 = p.Map(x => x.ToString());
61 p.Resolve(100);
62
63 Assert.AreEqual(p2.Join(), "100");
64 }
65
66 [TestMethod]
67 public void FixErrorTest() {
68 var p = new Promise<int>();
69
70 var p2 = p.Error(e => 101);
71
72 p.Reject(new Exception());
73
74 Assert.AreEqual(p2.Join(), 101);
75 }
22
76
23 [TestMethod]
77 [TestMethod]
24 public void RejectTest ()
78 public void ChainTest() {
25 {
79 var p1 = new Promise<int>();
26 int res = -1;
80
27 Exception err = null;
81 var p3 = p1.Chain(x => {
82 var p2 = new Promise<string>();
83 p2.Resolve(x.ToString());
84 return p2;
85 });
86
87 p1.Resolve(100);
28
88
29 var p = new Promise<int> ();
89 Assert.AreEqual(p3.Join(), "100");
30 p.Then (x => res = x, e => err = e);
90 }
31 p.Reject (new ApplicationException ("error"));
32
91
33 Assert.AreEqual (res, -1);
92 [TestMethod]
34 Assert.AreEqual (err.Message, "error");
93 public void PoolTest() {
94 var pid = Thread.CurrentThread.ManagedThreadId;
95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
35
96
36 }
97 Assert.AreNotEqual(pid, p.Join());
98 }
37
99
38 [TestMethod]
100 [TestMethod]
39 public void JoinSuccessTest ()
101 public void WorkerPoolSizeTest() {
40 {
102 var pool = new WorkerPool(5, 10, 0);
41 var p = new Promise<int> ();
103
42 p.Resolve (100);
104 Assert.AreEqual(5, pool.ThreadCount);
43 Assert.AreEqual (p.Join (), 100);
105
44 }
106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
109
110 Assert.AreEqual(5, pool.ThreadCount);
111
112 for (int i = 0; i < 100; i++)
113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
114 Thread.Sleep(100);
115 Assert.AreEqual(10, pool.ThreadCount);
116
117 pool.Dispose();
118 }
119
120 [TestMethod]
121 public void WorkerPoolCorrectTest() {
122 var pool = new WorkerPool(0,1000,100);
123
124 int iterations = 1000;
125 int pending = iterations;
126 var stop = new ManualResetEvent(false);
127
128 var count = 0;
129 for (int i = 0; i < iterations; i++) {
130 pool
131 .Invoke(() => 1)
132 .Then(x => Interlocked.Add(ref count, x))
133 .Then(x => Math.Log10(x))
134 .Anyway(() => {
135 Interlocked.Decrement(ref pending);
136 if (pending == 0)
137 stop.Set();
138 });
139 }
140
141 stop.WaitOne();
142
143 Assert.AreEqual(iterations, count);
144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
145 pool.Dispose();
146
147 }
148
149 [TestMethod]
150 public void WorkerPoolDisposeTest() {
151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
153 pool.Dispose();
154 Thread.Sleep(100);
155 Assert.AreEqual(0, pool.ThreadCount);
156 pool.Dispose();
157 }
45
158
46 [TestMethod]
159 [TestMethod]
47 public void JoinFailTest ()
160 public void MTQueueTest() {
48 {
161 var queue = new MTQueue<int>();
49 var p = new Promise<int> ();
162 int res;
50 p.Reject (new ApplicationException ("failed"));
163
164 queue.Enqueue(10);
165 Assert.IsTrue(queue.TryDequeue(out res));
166 Assert.AreEqual(10, res);
167 Assert.IsFalse(queue.TryDequeue(out res));
168
169 for (int i = 0; i < 1000; i++)
170 queue.Enqueue(i);
171
172 for (int i = 0; i < 1000; i++) {
173 queue.TryDequeue(out res);
174 Assert.AreEqual(i, res);
175 }
176
177 int writers = 0;
178 int readers = 0;
179 var stop = new ManualResetEvent(false);
180 int total = 0;
181
182 int itemsPerWriter = 1000;
183 int writersCount = 3;
51
184
52 try {
185 for (int i = 0; i < writersCount; i++) {
53 p.Join ();
186 Interlocked.Increment(ref writers);
54 throw new ApplicationException ("WRONG!");
187 var wn = i;
55 } catch (TargetInvocationException err) {
188 AsyncPool
56 Assert.AreEqual (err.InnerException.Message, "failed");
189 .InvokeNewThread(() => {
57 } catch {
190 for (int ii = 0; ii < itemsPerWriter; ii++) {
58 Assert.Fail ("Got wrong excaption");
191 queue.Enqueue(1);
59 }
192 }
60 }
193 return 1;
194 })
195 .Anyway(() => Interlocked.Decrement(ref writers));
196 }
197
198 for (int i = 0; i < 10; i++) {
199 Interlocked.Increment(ref readers);
200 var wn = i;
201 AsyncPool
202 .InvokeNewThread(() => {
203 int t;
204 do {
205 while (queue.TryDequeue(out t))
206 Interlocked.Add(ref total, t);
207 } while (writers > 0);
208 return 1;
209 })
210 .Anyway(() => {
211 Interlocked.Decrement(ref readers);
212 if (readers == 0)
213 stop.Set();
214 });
215 }
216
217 stop.WaitOne();
218
219 Assert.AreEqual(itemsPerWriter * writersCount, total);
220 }
61
221
62 [TestMethod]
222 [TestMethod]
63 public void MapTest ()
223 public void ParallelMapTest() {
64 {
224
65 var p = new Promise<int> ();
225 int count = 100000;
226
227 double[] args = new double[count];
228 var rand = new Random();
229
230 for (int i = 0; i < count; i++)
231 args[i] = rand.NextDouble();
66
232
67 var p2 = p.Map (x => x.ToString ());
233 var t = Environment.TickCount;
68 p.Resolve (100);
234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
235
236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
69
237
70 Assert.AreEqual (p2.Join (), "100");
238 t = Environment.TickCount;
71 }
239 for (int i = 0; i < count; i++)
240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
242 }
72
243
73 [TestMethod]
244 [TestMethod]
74 public void ChainTest ()
245 public void ChainedMapTest() {
75 {
246
76 var p1 = new Promise<int> ();
247 using (var pool = new WorkerPool(8,100,0)) {
248 int count = 10000;
249
250 double[] args = new double[count];
251 var rand = new Random();
252
253 for (int i = 0; i < count; i++)
254 args[i] = rand.NextDouble();
77
255
78 var p3 = p1.Chain (x => {
256 var t = Environment.TickCount;
79 var p2 = new Promise<string> ();
257 var res = args
80 p2.Resolve (x.ToString ());
258 .ChainedMap(
81 return p2;
259 x => pool.Invoke(
82 });
260 () => Math.Sin(x * x)
261 ),
262 4
263 )
264 .Join();
83
265
84 p1.Resolve (100);
266 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
85
267
86 Assert.AreEqual (p3.Join (), "100");
268 t = Environment.TickCount;
87 }
269 for (int i = 0; i < count; i++)
270 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
271 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
272 Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
273 }
274 }
88
275
89 [TestMethod]
276 [TestMethod]
90 public void PoolTest ()
277 public void ParallelForEachTest() {
91 {
278
92 var pid = Thread.CurrentThread.ManagedThreadId;
279 int count = 100000;
93 var p = AsyncPool.Invoke (() => {
280
94 return Thread.CurrentThread.ManagedThreadId;
281 int[] args = new int[count];
95 });
282 var rand = new Random();
283
284 for (int i = 0; i < count; i++)
285 args[i] = (int)(rand.NextDouble() * 100);
286
287 int result = 0;
288
289 var t = Environment.TickCount;
290 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
291
292 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
293
294 int result2 = 0;
295
296 t = Environment.TickCount;
297 for (int i = 0; i < count; i++)
298 result2 += args[i];
299 Assert.AreEqual(result2, result);
300 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
301 }
96
302
97 Assert.AreNotEqual (pid, p.Join ());
303 [TestMethod]
98 }
304 public void ComplexCase1Test() {
99 }
305 var flags = new bool[3];
306
307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
308
309 var p = PromiseHelper
310 .Sleep(200, "Alan")
311 .Cancelled(() => flags[0] = true)
312 .Chain(x =>
313 PromiseHelper
314 .Sleep(200, "Hi, " + x)
315 .Map(y => y)
316 .Cancelled(() => flags[1] = true)
317 )
318 .Cancelled(() => flags[2] = true);
319 Thread.Sleep(300);
320 p.Cancel();
321 try {
322 Assert.AreEqual(p.Join(), "Hi, Alan");
323 Assert.Fail("Shouldn't get here");
324 } catch (OperationCanceledException) {
325 }
326
327 Assert.IsFalse(flags[0]);
328 Assert.IsTrue(flags[1]);
329 Assert.IsTrue(flags[2]);
330 }
331 }
100 }
332 }
101
333
@@ -1,65 +1,66
1 <?xml version="1.0" encoding="utf-8"?>
1 <?xml version="1.0" encoding="utf-8"?>
2 <Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
2 <Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 <PropertyGroup>
3 <PropertyGroup>
4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 <ProductVersion>
6 <ProductVersion>
7 </ProductVersion>
7 </ProductVersion>
8 <SchemaVersion>2.0</SchemaVersion>
8 <SchemaVersion>2.0</SchemaVersion>
9 <ProjectGuid>{63F92C0C-61BF-48C0-A377-8D67C3C661D0}</ProjectGuid>
9 <ProjectGuid>{63F92C0C-61BF-48C0-A377-8D67C3C661D0}</ProjectGuid>
10 <OutputType>Library</OutputType>
10 <OutputType>Library</OutputType>
11 <AppDesignerFolder>Properties</AppDesignerFolder>
11 <AppDesignerFolder>Properties</AppDesignerFolder>
12 <RootNamespace>Implab.Test</RootNamespace>
12 <RootNamespace>Implab.Test</RootNamespace>
13 <AssemblyName>Implab.Test</AssemblyName>
13 <AssemblyName>Implab.Test</AssemblyName>
14 <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
14 <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
15 <FileAlignment>512</FileAlignment>
15 <FileAlignment>512</FileAlignment>
16 <ProjectTypeGuids>{3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
16 <ProjectTypeGuids>{3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
17 </PropertyGroup>
17 </PropertyGroup>
18 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
18 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
19 <DebugSymbols>true</DebugSymbols>
19 <DebugSymbols>true</DebugSymbols>
20 <DebugType>full</DebugType>
20 <DebugType>full</DebugType>
21 <Optimize>false</Optimize>
21 <Optimize>false</Optimize>
22 <OutputPath>bin\Debug\</OutputPath>
22 <OutputPath>bin\Debug\</OutputPath>
23 <DefineConstants>DEBUG;TRACE</DefineConstants>
23 <DefineConstants>DEBUG;TRACE</DefineConstants>
24 <ErrorReport>prompt</ErrorReport>
24 <ErrorReport>prompt</ErrorReport>
25 <WarningLevel>4</WarningLevel>
25 <WarningLevel>4</WarningLevel>
26 </PropertyGroup>
26 </PropertyGroup>
27 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
27 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
28 <DebugType>pdbonly</DebugType>
28 <DebugType>pdbonly</DebugType>
29 <Optimize>true</Optimize>
29 <Optimize>true</Optimize>
30 <OutputPath>bin\Release\</OutputPath>
30 <OutputPath>bin\Release\</OutputPath>
31 <DefineConstants>TRACE</DefineConstants>
31 <DefineConstants>TRACE</DefineConstants>
32 <ErrorReport>prompt</ErrorReport>
32 <ErrorReport>prompt</ErrorReport>
33 <WarningLevel>4</WarningLevel>
33 <WarningLevel>4</WarningLevel>
34 </PropertyGroup>
34 </PropertyGroup>
35 <ItemGroup>
35 <ItemGroup>
36 <Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
36 <Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
37 <Reference Include="System" />
37 <Reference Include="System" />
38 <Reference Include="System.Core">
38 <Reference Include="System.Core">
39 <RequiredTargetFramework>3.5</RequiredTargetFramework>
39 <RequiredTargetFramework>3.5</RequiredTargetFramework>
40 </Reference>
40 </Reference>
41 </ItemGroup>
41 </ItemGroup>
42 <ItemGroup>
42 <ItemGroup>
43 <CodeAnalysisDependentAssemblyPaths Condition=" '$(VS100COMNTOOLS)' != '' " Include="$(VS100COMNTOOLS)..\IDE\PrivateAssemblies">
43 <CodeAnalysisDependentAssemblyPaths Condition=" '$(VS100COMNTOOLS)' != '' " Include="$(VS100COMNTOOLS)..\IDE\PrivateAssemblies">
44 <Visible>False</Visible>
44 <Visible>False</Visible>
45 </CodeAnalysisDependentAssemblyPaths>
45 </CodeAnalysisDependentAssemblyPaths>
46 </ItemGroup>
46 </ItemGroup>
47 <ItemGroup>
47 <ItemGroup>
48 <Compile Include="AsyncTests.cs" />
48 <Compile Include="AsyncTests.cs" />
49 <Compile Include="PromiseHelper.cs" />
49 <Compile Include="Properties\AssemblyInfo.cs" />
50 <Compile Include="Properties\AssemblyInfo.cs" />
50 </ItemGroup>
51 </ItemGroup>
51 <ItemGroup>
52 <ItemGroup>
52 <ProjectReference Include="..\Implab\Implab.csproj">
53 <ProjectReference Include="..\Implab\Implab.csproj">
53 <Project>{99B95D0D-9CF9-4F70-8ADF-F4D0AA5CB0D9}</Project>
54 <Project>{99B95D0D-9CF9-4F70-8ADF-F4D0AA5CB0D9}</Project>
54 <Name>Implab</Name>
55 <Name>Implab</Name>
55 </ProjectReference>
56 </ProjectReference>
56 </ItemGroup>
57 </ItemGroup>
57 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
58 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
58 <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
59 <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
59 Other similar extension points exist, see Microsoft.Common.targets.
60 Other similar extension points exist, see Microsoft.Common.targets.
60 <Target Name="BeforeBuild">
61 <Target Name="BeforeBuild">
61 </Target>
62 </Target>
62 <Target Name="AfterBuild">
63 <Target Name="AfterBuild">
63 </Target>
64 </Target>
64 -->
65 -->
65 </Project> No newline at end of file
66 </Project>
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -1,44 +1,56
1 <?xml version="1.0" encoding="utf-8"?>
1 <?xml version="1.0" encoding="utf-8"?>
2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
2 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3 <PropertyGroup>
3 <PropertyGroup>
4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
4 <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
5 <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
6 <ProductVersion>10.0.0</ProductVersion>
6 <ProductVersion>10.0.0</ProductVersion>
7 <SchemaVersion>2.0</SchemaVersion>
7 <SchemaVersion>2.0</SchemaVersion>
8 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
8 <ProjectGuid>{F550F1F8-8746-4AD0-9614-855F4C4B7F05}</ProjectGuid>
9 <OutputType>Library</OutputType>
9 <OutputType>Library</OutputType>
10 <RootNamespace>Implab</RootNamespace>
10 <RootNamespace>Implab</RootNamespace>
11 <AssemblyName>Implab</AssemblyName>
11 <AssemblyName>Implab</AssemblyName>
12 </PropertyGroup>
12 </PropertyGroup>
13 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
13 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
14 <DebugSymbols>true</DebugSymbols>
14 <DebugSymbols>true</DebugSymbols>
15 <DebugType>full</DebugType>
15 <DebugType>full</DebugType>
16 <Optimize>false</Optimize>
16 <Optimize>false</Optimize>
17 <OutputPath>bin\Debug</OutputPath>
17 <OutputPath>bin\Debug</OutputPath>
18 <DefineConstants>DEBUG;</DefineConstants>
18 <DefineConstants>DEBUG;</DefineConstants>
19 <ErrorReport>prompt</ErrorReport>
19 <ErrorReport>prompt</ErrorReport>
20 <WarningLevel>4</WarningLevel>
20 <WarningLevel>4</WarningLevel>
21 <ConsolePause>false</ConsolePause>
21 <ConsolePause>false</ConsolePause>
22 </PropertyGroup>
22 </PropertyGroup>
23 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
23 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
24 <DebugType>full</DebugType>
24 <DebugType>full</DebugType>
25 <Optimize>true</Optimize>
25 <Optimize>true</Optimize>
26 <OutputPath>bin\Release</OutputPath>
26 <OutputPath>bin\Release</OutputPath>
27 <ErrorReport>prompt</ErrorReport>
27 <ErrorReport>prompt</ErrorReport>
28 <WarningLevel>4</WarningLevel>
28 <WarningLevel>4</WarningLevel>
29 <ConsolePause>false</ConsolePause>
29 <ConsolePause>false</ConsolePause>
30 </PropertyGroup>
30 </PropertyGroup>
31 <ItemGroup>
31 <ItemGroup>
32 <Reference Include="System" />
32 <Reference Include="System" />
33 </ItemGroup>
33 </ItemGroup>
34 <ItemGroup>
34 <ItemGroup>
35 <Compile Include="ICancellable.cs" />
36 <Compile Include="IProgressHandler.cs" />
37 <Compile Include="IProgressNotifier.cs" />
38 <Compile Include="IPromise.cs" />
39 <Compile Include="ITaskController.cs" />
40 <Compile Include="ManagedPromise.cs" />
41 <Compile Include="Parallels\DispatchPool.cs" />
42 <Compile Include="Parallels\ArrayTraits.cs" />
43 <Compile Include="Parallels\MTQueue.cs" />
44 <Compile Include="Parallels\WorkerPool.cs" />
45 <Compile Include="PromiseState.cs" />
46 <Compile Include="TaskController.cs" />
47 <Compile Include="ProgressInitEventArgs.cs" />
35 <Compile Include="Properties\AssemblyInfo.cs" />
48 <Compile Include="Properties\AssemblyInfo.cs" />
36 <Compile Include="Promise.cs" />
49 <Compile Include="Promise.cs" />
37 <Compile Include="AsyncPool.cs" />
50 <Compile Include="Parallels\AsyncPool.cs" />
38 <Compile Include="Safe.cs" />
51 <Compile Include="Safe.cs" />
52 <Compile Include="ValueEventArgs.cs" />
39 </ItemGroup>
53 </ItemGroup>
40 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
54 <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
41 <ItemGroup>
55 <ItemGroup />
42 <Folder Include="Parallels\" />
43 </ItemGroup>
44 </Project> No newline at end of file
56 </Project>
@@ -1,397 +1,549
1 using System;
1 using System;
2 using System.Collections.Generic;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Reflection;
3 using System.Reflection;
5 using System.Text;
6 using System.Diagnostics;
4 using System.Diagnostics;
7 using System.Threading;
5 using System.Threading;
8
6
9 namespace Implab {
7 namespace Implab {
10
8
11 public delegate void ErrorHandler(Exception e);
9 public delegate void ErrorHandler(Exception e);
12
10 public delegate T ErrorHandler<out T>(Exception e);
13 public delegate void ResultHandler<T>(T result);
11 public delegate void ResultHandler<in T>(T result);
14 public delegate TNew ResultMapper<TSrc, TNew>(TSrc result);
12 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
15 public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result);
13 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
16
14
17 /// <summary>
15 /// <summary>
18 /// Класс для асинхронного получения результатов. Так называемое "обещание".
16 /// Класс для асинхронного получения результатов. Так называемое "обещание".
19 /// </summary>
17 /// </summary>
20 /// <typeparam name="T">Тип получаемого результата</typeparam>
18 /// <typeparam name="T">Тип получаемого результата</typeparam>
21 /// <remarks>
19 /// <remarks>
22 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
20 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
23 /// клиент получив такое обещание может установить ряд обратных вызово для получения
21 /// клиент получив такое обещание может установить ряд обратных вызово для получения
24 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
22 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
25 /// <para>
23 /// <para>
26 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
24 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
27 /// данные события клиент должен использовать методы <c>Then</c>.
25 /// данные события клиент должен использовать методы <c>Then</c>.
28 /// </para>
26 /// </para>
29 /// <para>
27 /// <para>
30 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
28 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
31 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
29 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
32 /// выполнении обещания.
30 /// выполнении обещания.
33 /// </para>
31 /// </para>
34 /// <para>
32 /// <para>
35 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
33 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
36 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
34 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
37 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
35 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
38 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
36 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
39 /// обещания.
37 /// обещания.
40 /// </para>
38 /// </para>
41 /// <para>
39 /// <para>
42 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
40 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
43 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
41 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
44 /// использовать соответствующую форму методе <c>Then</c>.
42 /// использовать соответствующую форму методе <c>Then</c>.
45 /// </para>
43 /// </para>
46 /// <para>
44 /// <para>
47 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
45 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
48 /// только инициатор обещания иначе могут возникнуть противоречия.
46 /// только инициатор обещания иначе могут возникнуть противоречия.
49 /// </para>
47 /// </para>
50 /// </remarks>
48 /// </remarks>
51 public class Promise<T> {
49 public class Promise<T> : IPromise {
52
50
53 struct ResultHandlerInfo {
51 struct ResultHandlerInfo {
54 public ResultHandler<T> resultHandler;
52 public ResultHandler<T> resultHandler;
55 public ErrorHandler errorHandler;
53 public ErrorHandler errorHandler;
56 }
54 }
57
55
58 enum State {
56 readonly IPromise m_parent;
59 Unresolved,
57
60 Resolving,
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
61 Resolved,
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
62 Cancelled
63 }
64
60
65 LinkedList<ResultHandlerInfo> m_handlersChain = new LinkedList<ResultHandlerInfo>();
61 readonly object m_lock = new Object();
66 State m_state;
62 readonly bool m_cancellable;
67 bool m_cancellable;
63 int m_childrenCount = 0;
64
65 PromiseState m_state;
68 T m_result;
66 T m_result;
69 Exception m_error;
67 Exception m_error;
70
68
71 public Promise() {
69 public Promise() {
72 m_cancellable = true;
70 m_cancellable = true;
73 }
71 }
74
72
75 /// <summary>
73 public Promise(IPromise parent, bool cancellable) {
76 /// Событие, возникающее при отмене асинхронной операции.
74 m_cancellable = cancellable;
77 /// </summary>
75 m_parent = parent;
78 /// <description>
76 if (parent != null)
79 /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить.
77 parent.HandleCancelled(InternalCancel);
80 /// </description>
78 }
81 public event EventHandler Cancelled;
79
80 void InternalCancel() {
81 // don't try to cancel parent :)
82 Cancel(false);
83 }
82
84
83 /// <summary>
85 /// <summary>
84 /// Выполняет обещание, сообщая об успешном выполнении.
86 /// Выполняет обещание, сообщая об успешном выполнении.
85 /// </summary>
87 /// </summary>
86 /// <param name="result">Результат выполнения.</param>
88 /// <param name="result">Результат выполнения.</param>
87 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
88 public void Resolve(T result) {
90 public void Resolve(T result) {
89 lock (this) {
91 lock (m_lock) {
90 if (m_state == State.Cancelled)
92 if (m_state == PromiseState.Cancelled)
91 return;
93 return;
92 if (m_state != State.Unresolved)
94 if (m_state != PromiseState.Unresolved)
93 throw new InvalidOperationException("The promise is already resolved");
95 throw new InvalidOperationException("The promise is already resolved");
94 m_result = result;
96 m_result = result;
95 m_state = State.Resolving;
97 m_state = PromiseState.Resolved;
96 }
98 }
97
99
98 ResultHandlerInfo handler;
100 OnStateChanged();
99 while (FetchNextHandler(out handler))
100 InvokeHandler(handler);
101 }
101 }
102
102
103 /// <summary>
103 /// <summary>
104 /// Выполняет обещание, сообщая об ошибке
104 /// Выполняет обещание, сообщая об ошибке
105 /// </summary>
105 /// </summary>
106 /// <remarks>
107 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
108 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
109 /// будут проигнорированы.
110 /// </remarks>
106 /// <param name="error">Исключение возникшее при выполнении операции</param>
111 /// <param name="error">Исключение возникшее при выполнении операции</param>
107 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
112 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
108 public void Reject(Exception error) {
113 public void Reject(Exception error) {
109 lock (this) {
114 lock (m_lock) {
110 if (m_state == State.Cancelled)
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
111 return;
116 return;
112 if (m_state != State.Unresolved)
117 if (m_state != PromiseState.Unresolved)
113 throw new InvalidOperationException("The promise is already resolved");
118 throw new InvalidOperationException("The promise is already resolved");
114 m_error = error;
119 m_error = error;
115 m_state = State.Resolving;
120 m_state = PromiseState.Rejected;
116 }
121 }
117
122
118 ResultHandlerInfo handler;
123 OnStateChanged();
119 while (FetchNextHandler(out handler))
120 InvokeHandler(handler);
121 }
124 }
122
125
123 /// <summary>
126 /// <summary>
124 /// Отменяет операцию, если это возможно.
127 /// Отменяет операцию, если это возможно.
125 /// </summary>
128 /// </summary>
126 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
129 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
127 public bool Cancel() {
130 public bool Cancel() {
128 lock (this) {
131 return Cancel(true);
129 if (m_state == State.Unresolved && m_cancellable) {
130 m_state = State.Cancelled;
131 EventHandler temp = Cancelled;
132
133 if (temp != null)
134 temp(this, new EventArgs());
135
136 return true;
137 } else
138 return false;
139 }
140 }
132 }
141
133
142 /// <summary>
134 /// <summary>
143 /// Добавляет обработчики событий выполнения обещания.
135 /// Adds new handlers to this promise.
144 /// </summary>
136 /// </summary>
145 /// <param name="success">Обработчик успешного выполнения обещания.
137 /// <param name="success">The handler of the successfully completed operation.
146 /// Данному обработчику будет передан результат выполнения операции.</param>
138 /// This handler will recieve an operation result as a parameter.</param>
147 /// <param name="error">Обработчик ошибки. Данный обработчик получит
139 /// <param name="error">Handles an exception that may occur during the operation.</param>
148 /// исключение возникшее при выполнении операции.</param>
140 /// <returns>The new promise chained to this one.</returns>
149 /// <returns>Само обещание</returns>
150 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
141 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
151 if (success == null && error == null)
142 if (success == null && error == null)
152 return this;
143 return this;
153
144
154 var medium = new Promise<T>();
145 var medium = new Promise<T>(this, true);
155
146
156 var handlerInfo = new ResultHandlerInfo();
147 var handlerInfo = new ResultHandlerInfo();
157
148
158 if (success != null)
149 if (success != null)
159 handlerInfo.resultHandler = x => {
150 handlerInfo.resultHandler = x => {
160 try {
151 success(x);
161 success(x);
152 medium.Resolve(x);
162 medium.Resolve(x);
163 } catch (Exception e) {
164 medium.Reject(e);
165 }
166 };
153 };
167 else
154 else
168 handlerInfo.resultHandler = x => medium.Resolve(x);
155 handlerInfo.resultHandler = medium.Resolve;
169
156
170 if (error != null)
157 if (error != null)
171 handlerInfo.errorHandler = x => {
158 handlerInfo.errorHandler = x => {
172 try {
159 try {
173 error(x);
160 error(x);
174 } catch { }
161 } catch { }
175 medium.Reject(x);
162 medium.Reject(x);
176 };
163 };
177 else
164 else
178 handlerInfo.errorHandler = x => medium.Reject(x);
165 handlerInfo.errorHandler = medium.Reject;
166
167 AddHandler(handlerInfo);
168
169 return medium;
170 }
171
172 /// <summary>
173 /// Adds new handlers to this promise.
174 /// </summary>
175 /// <param name="success">The handler of the successfully completed operation.
176 /// This handler will recieve an operation result as a parameter.</param>
177 /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
178 /// <returns>The new promise chained to this one.</returns>
179 public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
180 if (success == null && error == null)
181 return this;
182
183 var medium = new Promise<T>(this, true);
184
185 var handlerInfo = new ResultHandlerInfo();
186
187 if (success != null)
188 handlerInfo.resultHandler = x => {
189 success(x);
190 medium.Resolve(x);
191 };
192 else
193 handlerInfo.resultHandler = medium.Resolve;
194
195 if (error != null)
196 handlerInfo.errorHandler = x => {
197 try {
198 medium.Resolve(error(x));
199 } catch { }
200 medium.Reject(x);
201 };
202 else
203 handlerInfo.errorHandler = medium.Reject;
179
204
180 AddHandler(handlerInfo);
205 AddHandler(handlerInfo);
181
206
182 return medium;
207 return medium;
183 }
208 }
184
209
210
185 public Promise<T> Then(ResultHandler<T> success) {
211 public Promise<T> Then(ResultHandler<T> success) {
186 return Then(success, null);
212 if (success == null)
213 return this;
214
215 var medium = new Promise<T>(this, true);
216
217 var handlerInfo = new ResultHandlerInfo();
218
219 if (success != null)
220 handlerInfo.resultHandler = x => {
221 success(x);
222 medium.Resolve(x);
223 };
224 else
225 handlerInfo.resultHandler = medium.Resolve;
226
227 handlerInfo.errorHandler = medium.Reject;
228
229 AddHandler(handlerInfo);
230
231 return medium;
187 }
232 }
188
233
189 public Promise<T> Error(ErrorHandler error) {
234 public Promise<T> Error(ErrorHandler error) {
190 return Then(null, error);
235 return Then(null, error);
191 }
236 }
192
237
238 /// <summary>
239 /// Handles error and allows to keep the promise.
240 /// </summary>
241 /// <remarks>
242 /// If the specified handler throws an exception, this exception will be used to reject the promise.
243 /// </remarks>
244 /// <param name="handler">The error handler which returns the result of the promise.</param>
245 /// <returns>New promise.</returns>
246 public Promise<T> Error(ErrorHandler<T> handler) {
247 if (handler == null)
248 return this;
249
250 var medium = new Promise<T>(this, true);
251
252 AddHandler(new ResultHandlerInfo {
253 errorHandler = e => {
254 try {
255 medium.Resolve(handler(e));
256 } catch (Exception e2) {
257 medium.Reject(e2);
258 }
259 }
260 });
261
262 return medium;
263 }
264
193 public Promise<T> Anyway(Action handler) {
265 public Promise<T> Anyway(Action handler) {
194 if (handler == null)
266 if (handler == null)
195 return this;
267 return this;
196
268
197 var medium = new Promise<T>();
269 var medium = new Promise<T>();
198
270
199 AddHandler(new ResultHandlerInfo {
271 AddHandler(new ResultHandlerInfo {
200 resultHandler = x => {
272 resultHandler = x => {
273 // to avoid handler being called multiple times we handle exception by ourselfs
201 try {
274 try {
202 handler();
275 handler();
203 medium.Resolve(x);
276 medium.Resolve(x);
204 } catch (Exception e) {
277 } catch (Exception e) {
205 medium.Reject(e);
278 medium.Reject(e);
206 }
279 }
207 },
280 },
208 errorHandler = x => {
281 errorHandler = x => {
209 try {
282 try {
210 handler();
283 handler();
211 } catch { }
284 } catch { }
212 medium.Reject(x);
285 medium.Reject(x);
213 }
286 }
214 });
287 });
215
288
216 return medium;
289 return medium;
217 }
290 }
218
291
219 /// <summary>
292 /// <summary>
220 /// Позволяет преобразовать результат выполения операции к новому типу.
293 /// Позволяет преобразовать результат выполения операции к новому типу.
221 /// </summary>
294 /// </summary>
222 /// <typeparam name="TNew">Новый тип результата.</typeparam>
295 /// <typeparam name="TNew">Новый тип результата.</typeparam>
223 /// <param name="mapper">Преобразование результата к новому типу.</param>
296 /// <param name="mapper">Преобразование результата к новому типу.</param>
224 /// <param name="error">Обработчик ошибки. Данный обработчик получит
297 /// <param name="error">Обработчик ошибки. Данный обработчик получит
225 /// исключение возникшее при выполнении операции.</param>
298 /// исключение возникшее при выполнении операции.</param>
226 /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
299 /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
227 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
300 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
228 if (mapper == null)
301 if (mapper == null)
229 throw new ArgumentNullException("mapper");
302 throw new ArgumentNullException("mapper");
230
303
231 // создаем прицепленное обещание
304 // создаем прицепленное обещание
232 Promise<TNew> chained = new Promise<TNew>();
305 var chained = new Promise<TNew>();
233
306
234 AddHandler(new ResultHandlerInfo() {
307 AddHandler(new ResultHandlerInfo() {
235 resultHandler = delegate(T result) {
308 resultHandler = result => chained.Resolve(mapper(result)),
236 try {
237 // если преобразование выдаст исключение, то сработает reject сцепленного deferred
238 chained.Resolve(mapper(result));
239 } catch (Exception e) {
240 chained.Reject(e);
241 }
242 },
243 errorHandler = delegate(Exception e) {
309 errorHandler = delegate(Exception e) {
244 if (error != null)
310 if (error != null)
245 error(e);
311 try {
312 error(e);
313 } catch { }
246 // в случае ошибки нужно передать исключение дальше по цепочке
314 // в случае ошибки нужно передать исключение дальше по цепочке
247 chained.Reject(e);
315 chained.Reject(e);
248 }
316 }
249 });
317 });
250
318
251 return chained;
319 return chained;
252 }
320 }
253
321
254 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) {
322 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) {
255 return Map(mapper, null);
323 return Map(mapper, null);
256 }
324 }
257
325
258 /// <summary>
326 /// <summary>
259 /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
327 /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
260 /// выполнения текущей, а результат текущей операции может быть использован для инициализации
328 /// выполнения текущей, а результат текущей операции может быть использован для инициализации
261 /// новой операции.
329 /// новой операции.
262 /// </summary>
330 /// </summary>
263 /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
331 /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
264 /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
332 /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
265 /// <param name="error">Обработчик ошибки. Данный обработчик получит
333 /// <param name="error">Обработчик ошибки. Данный обработчик получит
266 /// исключение возникшее при выполнении текуещй операции.</param>
334 /// исключение возникшее при выполнении текуещй операции.</param>
267 /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
335 /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
268 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained, ErrorHandler error) {
336 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained, ErrorHandler error) {
269
337
270 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
338 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
271 // создать посредника, к которому будут подвызяваться следующие обработчики.
339 // создать посредника, к которому будут подвызяваться следующие обработчики.
272 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
340 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
273 // передать через него результаты работы.
341 // передать через него результаты работы.
274 Promise<TNew> medium = new Promise<TNew>();
342 var medium = new Promise<TNew>(this, true);
275
343
276 AddHandler(new ResultHandlerInfo() {
344 AddHandler(new ResultHandlerInfo {
277 resultHandler = delegate(T result) {
345 resultHandler = delegate(T result) {
278 try {
346 if (medium.State == PromiseState.Cancelled)
279 chained(result).Then(
347 return;
280 x => medium.Resolve(x),
348
281 e => medium.Reject(e)
349 var promise = chained(result);
282 );
350
283 } catch (Exception e) {
351 // notify chained operation that it's not needed
284 // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке
352 medium.Cancelled(() => promise.Cancel());
285 medium.Reject(e);
353 promise.Then(
286 }
354 x => medium.Resolve(x),
355 e => medium.Reject(e)
356 );
287 },
357 },
288 errorHandler = delegate(Exception e) {
358 errorHandler = delegate(Exception e) {
289 if (error != null)
359 if (error != null)
290 error(e);
360 error(e);
291 // в случае ошибки нужно передать исключение дальше по цепочке
361 // в случае ошибки нужно передать исключение дальше по цепочке
292 medium.Reject(e);
362 medium.Reject(e);
293 }
363 }
294 });
364 });
295
365
296 return medium;
366 return medium;
297 }
367 }
298
368
299 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
369 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
300 return Chain(chained, null);
370 return Chain(chained, null);
301 }
371 }
302
372
373 public Promise<T> Cancelled(Action handler) {
374 if (handler == null)
375 return this;
376 lock (m_lock) {
377 if (m_state == PromiseState.Unresolved)
378 m_cancelHandlers.AddLast(handler);
379 else if (m_state == PromiseState.Cancelled)
380 handler();
381 }
382 return this;
383 }
384
385 public void HandleCancelled(Action handler) {
386 Cancelled(handler);
387 }
388
303 /// <summary>
389 /// <summary>
304 /// Дожидается отложенного обещания и в случае успеха, возвращает
390 /// Дожидается отложенного обещания и в случае успеха, возвращает
305 /// его, результат, в противном случае бросает исключение.
391 /// его, результат, в противном случае бросает исключение.
306 /// </summary>
392 /// </summary>
307 /// <remarks>
393 /// <remarks>
308 /// <para>
394 /// <para>
309 /// Если ожидание обещания было прервано по таймауту, это не значит,
395 /// Если ожидание обещания было прервано по таймауту, это не значит,
310 /// что обещание было отменено или что-то в этом роде, это только
396 /// что обещание было отменено или что-то в этом роде, это только
311 /// означает, что мы его не дождались, однако все зарегистрированные
397 /// означает, что мы его не дождались, однако все зарегистрированные
312 /// обработчики, как были так остались и они будут вызваны, когда
398 /// обработчики, как были так остались и они будут вызваны, когда
313 /// обещание будет выполнено.
399 /// обещание будет выполнено.
314 /// </para>
400 /// </para>
315 /// <para>
401 /// <para>
316 /// Такое поведение вполне оправдано поскольку таймаут может истечь
402 /// Такое поведение вполне оправдано поскольку таймаут может истечь
317 /// в тот момент, когда началась обработка цепочки обработчиков, и
403 /// в тот момент, когда началась обработка цепочки обработчиков, и
318 /// к тому же текущее обещание может стоять в цепочке обещаний и его
404 /// к тому же текущее обещание может стоять в цепочке обещаний и его
319 /// отклонение может привести к непрогнозируемому результату.
405 /// отклонение может привести к непрогнозируемому результату.
320 /// </para>
406 /// </para>
321 /// </remarks>
407 /// </remarks>
322 /// <param name="timeout">Время ожидания</param>
408 /// <param name="timeout">Время ожидания</param>
323 /// <returns>Результат выполнения обещания</returns>
409 /// <returns>Результат выполнения обещания</returns>
324 public T Join(int timeout) {
410 public T Join(int timeout) {
325 ManualResetEvent evt = new ManualResetEvent(false);
411 var evt = new ManualResetEvent(false);
326 Anyway(() => evt.Set());
412 Anyway(() => evt.Set());
413 Cancelled(() => evt.Set());
327
414
328 if (!evt.WaitOne(timeout, true))
415 if (!evt.WaitOne(timeout, true))
329 throw new TimeoutException();
416 throw new TimeoutException();
330
417
331 if (m_error != null)
418 switch (State) {
332 throw new TargetInvocationException(m_error);
419 case PromiseState.Resolved:
333 else
420 return m_result;
334 return m_result;
421 case PromiseState.Cancelled:
422 throw new OperationCanceledException();
423 case PromiseState.Rejected:
424 throw new TargetInvocationException(m_error);
425 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
427 }
335 }
428 }
336
429
337 public T Join() {
430 public T Join() {
338 return Join(Timeout.Infinite);
431 return Join(Timeout.Infinite);
339 }
432 }
340
433
341 /// <summary>
342 /// Данный метод последовательно извлекает обработчики обещания и когда
343 /// их больше не осталось - ставит состояние "разрешено".
344 /// </summary>
345 /// <param name="handler">Информация об обработчике</param>
346 /// <returns>Признак того, что еще остались обработчики в очереди</returns>
347 bool FetchNextHandler(out ResultHandlerInfo handler) {
348 handler = default(ResultHandlerInfo);
349
350 lock (this) {
351 Debug.Assert(m_state == State.Resolving);
352
353 if (m_handlersChain.Count > 0) {
354 handler = m_handlersChain.First.Value;
355 m_handlersChain.RemoveFirst();
356 return true;
357 } else {
358 m_state = State.Resolved;
359 return false;
360 }
361 }
362 }
363
364 void AddHandler(ResultHandlerInfo handler) {
434 void AddHandler(ResultHandlerInfo handler) {
365 bool invokeRequired = false;
435 bool invokeRequired = false;
366
436
367 lock (this) {
437 lock (m_lock) {
368 if (m_state != State.Resolved)
438 m_childrenCount++;
369 m_handlersChain.AddLast(handler);
439 if (m_state == PromiseState.Unresolved) {
370 else
440 m_resultHandlers.AddLast(handler);
441 } else
371 invokeRequired = true;
442 invokeRequired = true;
372 }
443 }
373
444
374 // обработчики не должны блокировать сам объект
445 // обработчики не должны блокировать сам объект
375 if (invokeRequired)
446 if (invokeRequired)
376 InvokeHandler(handler);
447 InvokeHandler(handler);
377 }
448 }
378
449
379 void InvokeHandler(ResultHandlerInfo handler) {
450 void InvokeHandler(ResultHandlerInfo handler) {
380 if (m_error == null) {
451 switch (m_state) {
381 try {
452 case PromiseState.Resolved:
382 if (handler.resultHandler != null)
453 try {
383 handler.resultHandler(m_result);
454 if (handler.resultHandler != null)
384 } catch { }
455 handler.resultHandler(m_result);
385 }
456 } catch (Exception e) {
386
457 try {
387 if (m_error != null) {
458 if (handler.errorHandler != null)
388 try {
459 handler.errorHandler(e);
389 if (handler.errorHandler != null)
460 } catch { }
390 handler.errorHandler(m_error);
461 }
391 } catch { }
462 break;
463 case PromiseState.Rejected:
464 try {
465 if (handler.errorHandler != null)
466 handler.errorHandler(m_error);
467 } catch { }
468 break;
469 default:
470 // do nothing
471 return;
392 }
472 }
393 }
473 }
394
474
475 protected virtual void OnStateChanged() {
476 switch (m_state) {
477 case PromiseState.Resolved:
478 foreach (var resultHandlerInfo in m_resultHandlers)
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
506 }
507
508
509
510 public bool IsExclusive {
511 get {
512 lock (m_lock) {
513 return m_childrenCount <= 1;
514 }
515 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
523 }
524 }
525
526 protected bool Cancel(bool dependencies) {
527 bool result;
528
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
539 OnStateChanged();
540
541 if (dependencies && m_parent != null && m_parent.IsExclusive) {
542 m_parent.Cancel();
543 }
544
545 return result;
546 }
395
547
396 }
548 }
397 }
549 }
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now