@@ -0,0 +1,58 | |||
|
1 | import { ICancellation } from "@implab/core-amd/interfaces"; | |
|
2 | ||
|
3 | export interface Subscription { | |
|
4 | finalize(): boolean; | |
|
5 | ||
|
6 | isClosed(): boolean; | |
|
7 | ||
|
8 | teardown(cleanup: void | (() => void)): void; | |
|
9 | ||
|
10 | around<T>(next: (item: T) => void): (item: T) => void; | |
|
11 | ||
|
12 | unsubscribe(): void; | |
|
13 | } | |
|
14 | ||
|
15 | const noop = () => {}; | |
|
16 | ||
|
17 | ||
|
18 | export class SubscriptionImpl implements Subscription { | |
|
19 | ||
|
20 | private _done = false; | |
|
21 | ||
|
22 | private _cleanup = noop; | |
|
23 | ||
|
24 | finalize() { | |
|
25 | return this._done ? false : ((0,this._cleanup)(), this._done = true); | |
|
26 | } | |
|
27 | ||
|
28 | isClosed() { | |
|
29 | return this._done; | |
|
30 | } | |
|
31 | ||
|
32 | teardown(cleanup: void | (() => void)) { | |
|
33 | if (cleanup) { | |
|
34 | if (this._done) { | |
|
35 | cleanup(); | |
|
36 | } else { | |
|
37 | const _prev = this._cleanup; | |
|
38 | this._cleanup = _prev === noop ? cleanup : () => (_prev(), cleanup()); | |
|
39 | } | |
|
40 | } | |
|
41 | } | |
|
42 | ||
|
43 | cancellable(ct: ICancellation, reject: (err: unknown) => void) { | |
|
44 | if (ct.isSupported()) { | |
|
45 | const h = ct.register(reason => this.finalize() && reject(reason)); | |
|
46 | this.teardown(() => h.destroy()); | |
|
47 | } | |
|
48 | } | |
|
49 | ||
|
50 | unsubscribe() { | |
|
51 | this.finalize(); | |
|
52 | } | |
|
53 | ||
|
54 | around<T>(next: (item: T) => void): (item: T) => void { | |
|
55 | return (item) => this._done ? void(0) : next(item); | |
|
56 | } | |
|
57 | ||
|
58 | } No newline at end of file |
@@ -1,6 +1,6 | |||
|
1 | 1 | import { isPromise } from "@implab/core-amd/safe"; |
|
2 | 2 | import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces"; |
|
3 | import { ObservableImpl } from "./observable/ObservableImpl"; | |
|
3 | import { ObservableImpl, fuse } from "./observable/ObservableImpl"; | |
|
4 | 4 | |
|
5 | 5 | export const isUnsubscribable = (v: unknown): v is Unsubscribable => |
|
6 | 6 | v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; |
@@ -8,7 +8,7 export const isUnsubscribable = (v: unkn | |||
|
8 | 8 | export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> => |
|
9 | 9 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; |
|
10 | 10 | |
|
11 | export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(producer); | |
|
11 | export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer)); | |
|
12 | 12 | |
|
13 | 13 | /** Converts an array to the observable sequence of its elements. */ |
|
14 | 14 | export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>( |
@@ -3,6 +3,7 import { AccumulatorFn, Observable, Obse | |||
|
3 | 3 | import { TraceSource } from "@implab/core-amd/log/TraceSource"; |
|
4 | 4 | import { Predicate } from "@implab/core-amd/interfaces"; |
|
5 | 5 | import { Cancellation } from "@implab/core-amd/Cancellation"; |
|
6 | import { Subscription, SubscriptionImpl } from "./SubscriptionImpl"; | |
|
6 | 7 | |
|
7 | 8 | const trace = TraceSource.get(mid); |
|
8 | 9 | |
@@ -10,54 +11,23 const noop = () => { }; | |||
|
10 | 11 | |
|
11 | 12 | const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); |
|
12 | 13 | |
|
13 | const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => { | |
|
14 | let done = false; | |
|
15 | let cleanup = noop; | |
|
16 | ||
|
17 | const finalize = () => done ? false : (cleanup(), done = true); | |
|
18 | const isClosed = () => done; | |
|
19 | ||
|
20 | const teardown = (cb: void | (() => void)) => { | |
|
21 | if (cb) { | |
|
22 | if (done) { | |
|
23 | cb(); | |
|
24 | } else { | |
|
25 | const _prev = cleanup; | |
|
26 | cleanup = _prev === noop ? cb : () => (_prev(), cb()); | |
|
27 | } | |
|
28 | } | |
|
29 | }; | |
|
30 | ||
|
31 | const error = (err: unknown) => finalize() && reject(err); | |
|
32 | ||
|
33 | if (ct.isSupported()) { | |
|
34 | const h = ct.register(error); | |
|
35 | teardown(() => h.destroy()); | |
|
36 | } | |
|
37 | ||
|
38 | return { finalize, isClosed, teardown, error }; | |
|
39 | }; | |
|
40 | 14 | |
|
41 | 15 | type FusedSink<T> = Omit<Sink<T>, "isClosed">; |
|
42 | 16 | |
|
43 |
export const fuse = <T>(producer: Producer<T>) => |
|
|
44 | ||
|
45 | const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); | |
|
46 | const { teardown, isClosed, finalize, error } = subscription(reject); | |
|
17 | export const fuse = <T>(producer: Producer<T>) => | |
|
18 | (sink: FusedSink<T>, subscription: Subscription): Unsubscribable => { | |
|
47 | 19 | |
|
48 | const next = consumer.next ? consumer.next.bind(consumer) : noop; | |
|
49 | ||
|
50 | teardown(producer({ | |
|
51 | next: next !== noop ? (v => isClosed() || next(v)) : noop, | |
|
52 | error, | |
|
53 | complete: () => finalize() && consumer.complete && consumer.complete(), | |
|
54 | isClosed | |
|
20 | subscription.teardown(producer({ | |
|
21 | next: subscription.around(sink.next), | |
|
22 | complete: () => subscription.finalize() && sink.complete(), | |
|
23 | error: err => subscription.finalize() && sink.error(err), | |
|
24 | isClosed: subscription.isClosed.bind(subscription) | |
|
55 | 25 | })); |
|
56 | 26 | |
|
57 |
return |
|
|
27 | return subscription; | |
|
58 | 28 | }; |
|
59 | 29 | |
|
60 |
type FusedProducer<T> = ( |
|
|
30 | type FusedProducer<T> = (sink: FusedSink<T>, subscription: Subscription) => void; | |
|
61 | 31 | |
|
62 | 32 | export class ObservableImpl<T> implements Observable<T> { |
|
63 | 33 | |
@@ -67,64 +37,68 export class ObservableImpl<T> implement | |||
|
67 | 37 | this._producer = producer; |
|
68 | 38 | } |
|
69 | 39 | |
|
70 | subscribe(consumer: Observer<T> = {}) { | |
|
71 |
const |
|
|
40 | subscribe(consumer: Observer<T> = {}): Unsubscribable { | |
|
41 | const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); | |
|
72 | 42 | const next = consumer.next ? consumer.next.bind(consumer) : noop; |
|
73 | ||
|
74 | const { teardown, isClosed, finalize, error } = subscription(reject); | |
|
43 | const complete = () => consumer.complete ? consumer.complete() : void (0); | |
|
75 | 44 | |
|
76 | teardown((0, this._producer)({ | |
|
77 | next: next !== noop ? (v => isClosed() || next(v)) : noop, | |
|
78 | error, | |
|
79 | complete: () => finalize() && consumer.complete && consumer.complete() | |
|
80 | })); | |
|
45 | const subscription = new SubscriptionImpl(); | |
|
81 | 46 | |
|
82 | return { unsubscribe: finalize }; | |
|
47 | this._producer({ next, error, complete }, subscription); | |
|
48 | ||
|
49 | return subscription; | |
|
83 | 50 | } |
|
84 | 51 | |
|
85 | 52 | map<T2>(mapper: (value: T) => T2) { |
|
86 | return new ObservableImpl<T2>(({ next, ...rest }) => | |
|
87 |
this._producer( |
|
|
53 | return new ObservableImpl<T2>(({ next, ...rest }, subscription) => | |
|
54 | (0, this._producer)( | |
|
55 | { | |
|
88 | 56 | next: next !== noop ? (v: T) => next(mapper(v)) : noop, |
|
89 | 57 | ...rest |
|
90 | })); | |
|
58 | }, subscription | |
|
59 | )); | |
|
91 | 60 | } |
|
92 | 61 | |
|
93 | 62 | tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) { |
|
94 | return new ObservableImpl<T>(({ next, complete, error }) => | |
|
95 |
this._producer( |
|
|
63 | return new ObservableImpl<T>(({ next, complete, error }, subscription) => | |
|
64 | (0, this._producer)( | |
|
65 | { | |
|
96 | 66 | next: tapNext ? (v => (tapNext(v), next(v))) : next, |
|
97 | 67 | complete: tapComplete ? (() => (tapComplete(), complete())) : complete, |
|
98 | 68 | error: tapError ? (e => (tapError(e), error(e))) : error |
|
99 |
} |
|
|
69 | }, | |
|
70 | subscription | |
|
71 | )); | |
|
100 | 72 | } |
|
101 | 73 | |
|
102 | 74 | filter(predicate: Predicate<T>) { |
|
103 | return new ObservableImpl<T>(({ next, ...rest }) => | |
|
104 |
this._producer( |
|
|
75 | return new ObservableImpl<T>(({ next, ...rest }, subscription) => | |
|
76 | (0, this._producer)( | |
|
77 | { | |
|
105 | 78 | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, |
|
106 | 79 | ...rest |
|
107 |
} |
|
|
108 | ); | |
|
80 | }, | |
|
81 | subscription | |
|
82 | )); | |
|
109 | 83 | } |
|
110 | 84 | |
|
111 | 85 | until(predicate: Predicate<T>) { |
|
112 | return new ObservableImpl<T>(({ next, complete, ...rest }) => | |
|
113 | this._producer({ | |
|
114 | next: v => predicate(v) ? complete() : next(v), | |
|
86 | return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) => | |
|
87 | (0, this._producer)({ | |
|
88 | next: v => predicate(v) ? subscription.finalize() && complete() : next(v), | |
|
115 | 89 | complete, |
|
116 | 90 | ...rest |
|
117 | }) | |
|
91 | }, subscription) | |
|
118 | 92 | ); |
|
119 | 93 | } |
|
120 | 94 | |
|
121 | 95 | while(predicate: Predicate<T>) { |
|
122 | return new ObservableImpl<T>(({ next, complete, ...rest }) => | |
|
123 | this._producer({ | |
|
124 | next: v => predicate(v) ? next(v) : complete(), | |
|
96 | return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) => | |
|
97 | (0, this._producer)({ | |
|
98 | next: v => predicate(v) ? next(v) : subscription.finalize() && complete(), | |
|
125 | 99 | complete, |
|
126 | 100 | ...rest |
|
127 | }) | |
|
101 | }, subscription) | |
|
128 | 102 | ); |
|
129 | 103 | } |
|
130 | 104 | |
@@ -132,23 +106,26 export class ObservableImpl<T> implement | |||
|
132 | 106 | scan(accumulator: AccumulatorFn<T, T>): Observable<T>; |
|
133 | 107 | scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { |
|
134 | 108 | if (args.length === 1) { |
|
135 | return new ObservableImpl<T | A>(({ next, ...rest }) => { | |
|
136 | 109 |
|
|
110 | ||
|
111 | return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => { | |
|
137 | 112 | let _acc: T; |
|
138 | 113 | let index = 0; |
|
139 |
|
|
|
140 |
next: next !== noop ? (v: T) => next( |
|
|
114 | (0, this._producer)({ | |
|
115 | next: next !== noop ? (v: T) => next( | |
|
116 | index++ === 0 ? _acc = v : _acc = accumulator(_acc, v) | |
|
117 | ) : noop, | |
|
141 | 118 | ...rest |
|
142 | }); | |
|
119 | }, subscription); | |
|
143 | 120 | }); |
|
144 | 121 | } else { |
|
145 | return new ObservableImpl<T | A>(({ next, ...rest }) => { | |
|
146 | 122 |
|
|
123 | return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => { | |
|
147 | 124 | let _acc = initial; |
|
148 |
|
|
|
125 | (0, this._producer)({ | |
|
149 | 126 | next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, |
|
150 | 127 | ...rest |
|
151 | }); | |
|
128 | }, subscription); | |
|
152 | 129 | }); |
|
153 | 130 | } |
|
154 | 131 | } |
@@ -158,11 +135,11 export class ObservableImpl<T> implement | |||
|
158 | 135 | reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { |
|
159 | 136 | |
|
160 | 137 | if (args.length === 1) { |
|
161 | return new ObservableImpl<T | A>(({ next, complete, error }) => { | |
|
162 | 138 |
|
|
139 | return new ObservableImpl<T>(({ next, complete, error }, subscription) => { | |
|
163 | 140 | let _acc: T; |
|
164 | 141 | let index = 0; |
|
165 |
|
|
|
142 | (0, this._producer)({ | |
|
166 | 143 | next: next !== noop ? (v: T) => { |
|
167 | 144 | _acc = index++ === 0 ? v : accumulator(_acc, v); |
|
168 | 145 | } : noop, |
@@ -175,13 +152,13 export class ObservableImpl<T> implement | |||
|
175 | 152 | } |
|
176 | 153 | }, |
|
177 | 154 | error |
|
178 | }); | |
|
155 | }, subscription); | |
|
179 | 156 | }); |
|
180 | 157 | } else { |
|
181 | return new ObservableImpl<T | A>(({ next, complete, error }) => { | |
|
182 | 158 |
|
|
159 | return new ObservableImpl<A>(({ next, complete, error }, subscription) => { | |
|
183 | 160 | let _acc = initial; |
|
184 |
|
|
|
161 | (0, this._producer)({ | |
|
185 | 162 | next: next !== noop ? (v: T) => { |
|
186 | 163 | _acc = accumulator(_acc, v); |
|
187 | 164 | } : noop, |
@@ -190,24 +167,23 export class ObservableImpl<T> implement | |||
|
190 | 167 | complete(); |
|
191 | 168 | }, |
|
192 | 169 | error |
|
193 | }); | |
|
170 | }, subscription); | |
|
194 | 171 | }); |
|
195 | 172 | } |
|
196 | 173 | } |
|
197 | 174 | |
|
198 | 175 | cat(...seq: Subscribable<T>[]) { |
|
199 | return new ObservableImpl<T>(({ next, complete: final, ...rest }) => { | |
|
200 | let cleanup: () => void; | |
|
176 | return new ObservableImpl<T>(({ next, complete: final, ...rest }, subscription) => { | |
|
201 | 177 | const len = seq.length; |
|
202 | 178 | const complete = (i: number) => i < len ? |
|
203 | 179 | () => { |
|
204 |
|
|
|
205 |
|
|
|
206 | } : final; | |
|
180 | h = seq[i].subscribe({ next, complete: complete(i + 1), ...rest }); | |
|
181 | } : () => subscription.finalize() && final(); | |
|
182 | ||
|
207 | 183 | |
|
208 |
|
|
|
184 | let h = this.subscribe({ next, complete: complete(0), ...rest }); | |
|
185 | subscription.teardown(() => h.unsubscribe()); | |
|
209 | 186 | |
|
210 | return () => cleanup(); | |
|
211 | 187 | }); |
|
212 | 188 | } |
|
213 | 189 | |
@@ -219,13 +195,14 export class ObservableImpl<T> implement | |||
|
219 | 195 | return new Promise<T>((resolve, reject) => { |
|
220 | 196 | ct.throwIfRequested(); |
|
221 | 197 | |
|
222 | const { teardown, finalize, isClosed, error } = subscription(reject, ct); | |
|
223 | teardown((0, this._producer)({ | |
|
224 | next: (item: T) => finalize() && resolve(item), | |
|
225 | complete: () => finalize() && reject(new Error("The sequence is empty")), | |
|
226 |
|
|
|
227 | isClosed | |
|
228 | })); | |
|
198 | const subscription = new SubscriptionImpl(); | |
|
199 | subscription.cancellable(ct, reject); | |
|
200 | ||
|
201 | (0, this._producer)({ | |
|
202 | next: resolve, | |
|
203 | complete: () => reject(new Error("The sequence is empty")), | |
|
204 | error: reject, | |
|
205 | }, subscription); | |
|
229 | 206 | |
|
230 | 207 | }); |
|
231 | 208 | } |
@@ -234,15 +211,16 export class ObservableImpl<T> implement | |||
|
234 | 211 | return new Promise<T[]>((resolve, reject) => { |
|
235 | 212 | ct.throwIfRequested(); |
|
236 | 213 | |
|
214 | const subscription = new SubscriptionImpl(); | |
|
215 | subscription.cancellable(ct, reject); | |
|
216 | ||
|
237 | 217 | const acc: T[] = []; |
|
238 | 218 | |
|
239 | const { teardown, finalize, isClosed, error } = subscription(reject, ct); | |
|
240 | teardown((0, this._producer)({ | |
|
241 | next: (item: T) => isClosed() ? void (0) : acc.push(item), | |
|
242 | complete: () => finalize() && resolve(acc), | |
|
243 | error, | |
|
244 | isClosed | |
|
245 | })); | |
|
219 | (0, this._producer)({ | |
|
220 | next: (item: T) => acc.push(item), | |
|
221 | complete: () => resolve(acc), | |
|
222 | error: reject | |
|
223 | }, subscription); | |
|
246 | 224 | }); |
|
247 | 225 | } |
|
248 | 226 | |
@@ -250,14 +228,14 export class ObservableImpl<T> implement | |||
|
250 | 228 | return new Promise<void>((resolve, reject) => { |
|
251 | 229 | ct.throwIfRequested(); |
|
252 | 230 | |
|
253 | const { teardown, finalize, isClosed, error } = subscription(reject, ct); | |
|
231 | const subscription = new SubscriptionImpl(); | |
|
232 | subscription.cancellable(ct, reject); | |
|
254 | 233 | |
|
255 |
|
|
|
256 | next: item => !isClosed() && next(item), | |
|
257 |
complete: |
|
|
258 |
error |
|
|
259 | isClosed | |
|
260 | })); | |
|
234 | (0, this._producer)({ | |
|
235 | next, | |
|
236 | complete: resolve, | |
|
237 | error: reject | |
|
238 | }, subscription); | |
|
261 | 239 | }); |
|
262 | 240 | } |
|
263 | 241 |
General Comments 0
You need to be logged in to leave comments.
Login now