##// END OF EJS Templates
code review, minor refactoring
cin -
r142:894b8239b953 v1.9.0-rc5 default
parent child
Show More
@@ -13,17 +13,17 export interface Observer<T> {
13 13 /**
14 14 * Called for the next element in the sequence
15 15 */
16 next?: (value: T) => void;
16 next?(value: T): void;
17 17
18 18 /**
19 19 * Called once when the error occurs in the sequence.
20 20 */
21 error?: (e: unknown) => void;
21 error?(e: unknown): void;
22 22
23 23 /**
24 24 * Called once at the end of the sequence.
25 25 */
26 complete?: () => void;
26 complete?(): void;
27 27 }
28 28
29 29 /**
@@ -55,6 +55,10 export type Sink<T> = {
55 55
56 56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
57 57
58 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
59
60 type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
61
58 62 export interface Unsubscribable {
59 63 unsubscribe(): void;
60 64 }
@@ -155,21 +159,24 const noop = () => { };
155 159 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
156 160
157 161 const sink = <T>(consumer: Observer<T>) => {
162 // eslint-disable-next-line @typescript-eslint/unbound-method
158 163 const { next, error, complete } = consumer;
159 164 return {
160 165 next: next ? next.bind(consumer) : noop,
161 error: error ? error.bind(consumer) : errorFallback,
162 complete: complete ? complete.bind(consumer) : noop,
163 isClosed: () => false
166 error: error ? error.bind(consumer) : errorFallback, // report unhandled errors
167 complete: complete ? complete.bind(consumer) : noop
164 168 };
165 169 };
166 170
167 171 /** Wraps the producer to handle tear down logic and subscription management
168 172 *
173 * The resulting producer will invoke cleanup logic on error or complete events
174 * and will prevent calling of any method from the sink.
175 *
169 176 * @param producer The producer to wrap
170 177 * @returns The wrapper producer
171 178 */
172 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
179 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
173 180 let done = false;
174 181 let cleanup = noop;
175 182
@@ -186,11 +193,16 const fuse = <T>(producer: Producer<T>)
186 193 complete: _fin(complete),
187 194 isClosed: () => done
188 195 };
196 // call the producer
189 197 cleanup = producer(safeSink) ?? noop;
198 // if the producer throws exception bypass it to the caller rather then to
199 // the sink. This is a feature.
200
201 // if the producer completed the sequence immediately call the cleanup in place
190 202 return done ? cleanup() : _fin0;
191 203 };
192 204
193 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
205 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
194 206 subscribe: (consumer: Observer<T>) => ({
195 207 unsubscribe: producer(sink(consumer)) ?? noop
196 208 }),
@@ -244,7 +256,7 const _observe = <T>(producer: Producer<
244 256 }
245 257 }),
246 258
247 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
259 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error }) => {
248 260 if (args.length === 1) {
249 261 const [accumulator] = args;
250 262 let _acc: T;
@@ -261,8 +273,7 const _observe = <T>(producer: Producer<
261 273 complete();
262 274 }
263 275 },
264 error,
265 ...rest
276 error
266 277 });
267 278 } else {
268 279 const [accumulator, initial] = args;
@@ -275,8 +286,7 const _observe = <T>(producer: Producer<
275 286 next(_acc);
276 287 complete();
277 288 },
278 error,
279 ...rest
289 error
280 290 });
281 291 }
282 292 }),
@@ -299,35 +309,34 const _observe = <T>(producer: Producer<
299 309
300 310 next: collect(
301 311 producer,
302 ({ next, complete, error, isClosed }) => ({
312 ({ next, complete, error }) => ({
303 313 next: v => (next(v), complete()),
304 314 complete: () => error(new Error("The sequence is empty")),
305 error,
306 isClosed
315 error
307 316 })
308 317 ),
309 318
310 319 collect: collect(
311 320 producer,
312 ({ next, complete, ...rest }) => {
321 ({ next, complete, error}) => {
313 322 const data: T[] = [];
314 323 return {
315 324 next: v => data.push(v),
316 325 complete: () => (next(data), complete()),
317 ...rest
326 error
318 327 };
319 328 }
320 329 )
321 330 });
322 331
323 332 const collect = <T, U>(
324 producer: Producer<T>,
325 collector: (result: Sink<U>) => Sink<T>
333 producer: FusedProducer<T>,
334 collector: (result: FusedSink<U>) => FusedSink<T>
326 335 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
327 336 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
328 337 const h = ct.register(error);
329 338 const cleanup = !isClosed() ?
330 producer(collector({ next, complete, error, isClosed })) ?? noop :
339 producer(collector({ next, complete, error })) ?? noop :
331 340 noop;
332 341
333 342 return () => {
@@ -339,8 +348,7 const collect = <T, U>(
339 348 fused({
340 349 next: resolve,
341 350 error: reject,
342 complete: noop,
343 isClosed: () => false
351 complete: noop
344 352 });
345 353 });
346 354
@@ -356,7 +364,7 export const ofArray = <T>(items: T[]) =
356 364
357 365 /** Converts a subscribable to the observable */
358 366 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
359 observe(sink => {
367 observe<T>(sink => {
360 368 const subscription = subscribable.subscribe(sink);
361 369 return () => subscription.unsubscribe();
362 370 });
@@ -2,26 +2,30 import { Observable, Sink, Subscribable,
2 2
3 3 const noop = () => { };
4 4
5 /** Joins multiple subscribers to the single one. The resulting subscriber
5 /** Connects multiple subscribers to the single producer. The producer
6 6 * will be created when the first client subscribes and will be released
7 7 * with the the last client unsubscribed.
8 8 *
9 9 * Use this wrapper to prevent spawning multiple producers.
10 10 *
11 * The emitted values are not cached therefore the new subscriber will not receive
12 * the values emitted before it has been subscribed.
13 *
11 14 * @param source The source observable
12 * @returns The wrapped producer
15 * @returns The new observable
13 16 */
14 17 export const subject = <T>(source: Subscribable<T>): Observable<T> => {
15 let subscribers: Sink<T>[] = [];
18 let subscribers: Sink<T>[] = []; // the list of active subscribers
16 19
17 let subscription = { unsubscribe: noop };
20 let subscription = { unsubscribe: noop }; // current subscription
18 21
19 22 // cleanup method to release resources held by this subscription
20 23 const cleanup = (cb: (item: Sink<T>) => void) => {
21 24 const _subscribers = subscribers;
22 subscribers = [];
25 subscribers = []; // this will prevent a client cleanup login to run
23 26 _subscribers.forEach(cb);
24 subscription.unsubscribe();
27 // we don't need subscription.unsubscribe(), because cleanup is called
28 // from complete or error methods.
25 29 };
26 30
27 31 const sink: Sink<T> = {
@@ -34,14 +38,20 export const subject = <T>(source: Subsc
34 38 return observe(client => {
35 39 const _subscribers = subscribers;
36 40 subscribers.push(client);
37 if (subscribers.length === 1)
38 subscription = source.subscribe(sink);
41 if (subscribers.length === 1) // this is the first client
42 subscription = source.subscribe(sink); // activate the producer
39 43
40 44 return () => {
45 // this is a cleanup logic for an individual client
41 46 if (_subscribers === subscribers) {
47 // is the current subscription to the producer is active
48
49 // find this client in the list of subscribers
42 50 const pos = subscribers.indexOf(client);
43 51 if (pos >= 0)
44 52 subscribers.splice(pos, 1);
53
54 // is this is the last subscriber we need to release the producer
45 55 if (!subscribers.length)
46 56 subscription.unsubscribe();
47 57 }
General Comments 0
You need to be logged in to leave comments. Login now