##// END OF EJS Templates
code review, minor refactoring
cin -
r142:894b8239b953 v1.9.0-rc5 default
parent child
Show More
@@ -13,17 +13,17 export interface Observer<T> {
13 /**
13 /**
14 * Called for the next element in the sequence
14 * Called for the next element in the sequence
15 */
15 */
16 next?: (value: T) => void;
16 next?(value: T): void;
17
17
18 /**
18 /**
19 * Called once when the error occurs in the sequence.
19 * Called once when the error occurs in the sequence.
20 */
20 */
21 error?: (e: unknown) => void;
21 error?(e: unknown): void;
22
22
23 /**
23 /**
24 * Called once at the end of the sequence.
24 * Called once at the end of the sequence.
25 */
25 */
26 complete?: () => void;
26 complete?(): void;
27 }
27 }
28
28
29 /**
29 /**
@@ -55,6 +55,10 export type Sink<T> = {
55
55
56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
57
57
58 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
59
60 type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
61
58 export interface Unsubscribable {
62 export interface Unsubscribable {
59 unsubscribe(): void;
63 unsubscribe(): void;
60 }
64 }
@@ -155,21 +159,24 const noop = () => { };
155 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
159 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
156
160
157 const sink = <T>(consumer: Observer<T>) => {
161 const sink = <T>(consumer: Observer<T>) => {
162 // eslint-disable-next-line @typescript-eslint/unbound-method
158 const { next, error, complete } = consumer;
163 const { next, error, complete } = consumer;
159 return {
164 return {
160 next: next ? next.bind(consumer) : noop,
165 next: next ? next.bind(consumer) : noop,
161 error: error ? error.bind(consumer) : errorFallback,
166 error: error ? error.bind(consumer) : errorFallback, // report unhandled errors
162 complete: complete ? complete.bind(consumer) : noop,
167 complete: complete ? complete.bind(consumer) : noop
163 isClosed: () => false
164 };
168 };
165 };
169 };
166
170
167 /** Wraps the producer to handle tear down logic and subscription management
171 /** Wraps the producer to handle tear down logic and subscription management
168 *
172 *
173 * The resulting producer will invoke cleanup logic on error or complete events
174 * and will prevent calling of any method from the sink.
175 *
169 * @param producer The producer to wrap
176 * @param producer The producer to wrap
170 * @returns The wrapper producer
177 * @returns The wrapper producer
171 */
178 */
172 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
179 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
173 let done = false;
180 let done = false;
174 let cleanup = noop;
181 let cleanup = noop;
175
182
@@ -186,11 +193,16 const fuse = <T>(producer: Producer<T>)
186 complete: _fin(complete),
193 complete: _fin(complete),
187 isClosed: () => done
194 isClosed: () => done
188 };
195 };
196 // call the producer
189 cleanup = producer(safeSink) ?? noop;
197 cleanup = producer(safeSink) ?? noop;
198 // if the producer throws exception bypass it to the caller rather then to
199 // the sink. This is a feature.
200
201 // if the producer completed the sequence immediately call the cleanup in place
190 return done ? cleanup() : _fin0;
202 return done ? cleanup() : _fin0;
191 };
203 };
192
204
193 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
205 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
194 subscribe: (consumer: Observer<T>) => ({
206 subscribe: (consumer: Observer<T>) => ({
195 unsubscribe: producer(sink(consumer)) ?? noop
207 unsubscribe: producer(sink(consumer)) ?? noop
196 }),
208 }),
@@ -244,7 +256,7 const _observe = <T>(producer: Producer<
244 }
256 }
245 }),
257 }),
246
258
247 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
259 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error }) => {
248 if (args.length === 1) {
260 if (args.length === 1) {
249 const [accumulator] = args;
261 const [accumulator] = args;
250 let _acc: T;
262 let _acc: T;
@@ -261,8 +273,7 const _observe = <T>(producer: Producer<
261 complete();
273 complete();
262 }
274 }
263 },
275 },
264 error,
276 error
265 ...rest
266 });
277 });
267 } else {
278 } else {
268 const [accumulator, initial] = args;
279 const [accumulator, initial] = args;
@@ -275,8 +286,7 const _observe = <T>(producer: Producer<
275 next(_acc);
286 next(_acc);
276 complete();
287 complete();
277 },
288 },
278 error,
289 error
279 ...rest
280 });
290 });
281 }
291 }
282 }),
292 }),
@@ -299,35 +309,34 const _observe = <T>(producer: Producer<
299
309
300 next: collect(
310 next: collect(
301 producer,
311 producer,
302 ({ next, complete, error, isClosed }) => ({
312 ({ next, complete, error }) => ({
303 next: v => (next(v), complete()),
313 next: v => (next(v), complete()),
304 complete: () => error(new Error("The sequence is empty")),
314 complete: () => error(new Error("The sequence is empty")),
305 error,
315 error
306 isClosed
307 })
316 })
308 ),
317 ),
309
318
310 collect: collect(
319 collect: collect(
311 producer,
320 producer,
312 ({ next, complete, ...rest }) => {
321 ({ next, complete, error}) => {
313 const data: T[] = [];
322 const data: T[] = [];
314 return {
323 return {
315 next: v => data.push(v),
324 next: v => data.push(v),
316 complete: () => (next(data), complete()),
325 complete: () => (next(data), complete()),
317 ...rest
326 error
318 };
327 };
319 }
328 }
320 )
329 )
321 });
330 });
322
331
323 const collect = <T, U>(
332 const collect = <T, U>(
324 producer: Producer<T>,
333 producer: FusedProducer<T>,
325 collector: (result: Sink<U>) => Sink<T>
334 collector: (result: FusedSink<U>) => FusedSink<T>
326 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
335 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
327 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
336 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
328 const h = ct.register(error);
337 const h = ct.register(error);
329 const cleanup = !isClosed() ?
338 const cleanup = !isClosed() ?
330 producer(collector({ next, complete, error, isClosed })) ?? noop :
339 producer(collector({ next, complete, error })) ?? noop :
331 noop;
340 noop;
332
341
333 return () => {
342 return () => {
@@ -339,8 +348,7 const collect = <T, U>(
339 fused({
348 fused({
340 next: resolve,
349 next: resolve,
341 error: reject,
350 error: reject,
342 complete: noop,
351 complete: noop
343 isClosed: () => false
344 });
352 });
345 });
353 });
346
354
@@ -356,7 +364,7 export const ofArray = <T>(items: T[]) =
356
364
357 /** Converts a subscribable to the observable */
365 /** Converts a subscribable to the observable */
358 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
366 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
359 observe(sink => {
367 observe<T>(sink => {
360 const subscription = subscribable.subscribe(sink);
368 const subscription = subscribable.subscribe(sink);
361 return () => subscription.unsubscribe();
369 return () => subscription.unsubscribe();
362 });
370 });
@@ -2,26 +2,30 import { Observable, Sink, Subscribable,
2
2
3 const noop = () => { };
3 const noop = () => { };
4
4
5 /** Joins multiple subscribers to the single one. The resulting subscriber
5 /** Connects multiple subscribers to the single producer. The producer
6 * will be created when the first client subscribes and will be released
6 * will be created when the first client subscribes and will be released
7 * with the the last client unsubscribed.
7 * with the the last client unsubscribed.
8 *
8 *
9 * Use this wrapper to prevent spawning multiple producers.
9 * Use this wrapper to prevent spawning multiple producers.
10 *
10 *
11 * The emitted values are not cached therefore the new subscriber will not receive
12 * the values emitted before it has been subscribed.
13 *
11 * @param source The source observable
14 * @param source The source observable
12 * @returns The wrapped producer
15 * @returns The new observable
13 */
16 */
14 export const subject = <T>(source: Subscribable<T>): Observable<T> => {
17 export const subject = <T>(source: Subscribable<T>): Observable<T> => {
15 let subscribers: Sink<T>[] = [];
18 let subscribers: Sink<T>[] = []; // the list of active subscribers
16
19
17 let subscription = { unsubscribe: noop };
20 let subscription = { unsubscribe: noop }; // current subscription
18
21
19 // cleanup method to release resources held by this subscription
22 // cleanup method to release resources held by this subscription
20 const cleanup = (cb: (item: Sink<T>) => void) => {
23 const cleanup = (cb: (item: Sink<T>) => void) => {
21 const _subscribers = subscribers;
24 const _subscribers = subscribers;
22 subscribers = [];
25 subscribers = []; // this will prevent a client cleanup login to run
23 _subscribers.forEach(cb);
26 _subscribers.forEach(cb);
24 subscription.unsubscribe();
27 // we don't need subscription.unsubscribe(), because cleanup is called
28 // from complete or error methods.
25 };
29 };
26
30
27 const sink: Sink<T> = {
31 const sink: Sink<T> = {
@@ -34,14 +38,20 export const subject = <T>(source: Subsc
34 return observe(client => {
38 return observe(client => {
35 const _subscribers = subscribers;
39 const _subscribers = subscribers;
36 subscribers.push(client);
40 subscribers.push(client);
37 if (subscribers.length === 1)
41 if (subscribers.length === 1) // this is the first client
38 subscription = source.subscribe(sink);
42 subscription = source.subscribe(sink); // activate the producer
39
43
40 return () => {
44 return () => {
45 // this is a cleanup logic for an individual client
41 if (_subscribers === subscribers) {
46 if (_subscribers === subscribers) {
47 // is the current subscription to the producer is active
48
49 // find this client in the list of subscribers
42 const pos = subscribers.indexOf(client);
50 const pos = subscribers.indexOf(client);
43 if (pos >= 0)
51 if (pos >= 0)
44 subscribers.splice(pos, 1);
52 subscribers.splice(pos, 1);
53
54 // is this is the last subscriber we need to release the producer
45 if (!subscribers.length)
55 if (!subscribers.length)
46 subscription.unsubscribe();
56 subscription.unsubscribe();
47 }
57 }
General Comments 0
You need to be logged in to leave comments. Login now