diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -13,17 +13,17 @@ export interface Observer { /** * Called for the next element in the sequence */ - next?: (value: T) => void; + next?(value: T): void; /** * Called once when the error occurs in the sequence. */ - error?: (e: unknown) => void; + error?(e: unknown): void; /** * Called once at the end of the sequence. */ - complete?: () => void; + complete?(): void; } /** @@ -55,6 +55,10 @@ export type Sink = { export type Producer = (sink: Sink) => (void | (() => void)); +type FusedSink = Omit, "isClosed">; + +type FusedProducer = (sink: FusedSink) => (void | (() => void)); + export interface Unsubscribable { unsubscribe(): void; } @@ -155,21 +159,24 @@ const noop = () => { }; const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); const sink = (consumer: Observer) => { + // eslint-disable-next-line @typescript-eslint/unbound-method const { next, error, complete } = consumer; return { next: next ? next.bind(consumer) : noop, - error: error ? error.bind(consumer) : errorFallback, - complete: complete ? complete.bind(consumer) : noop, - isClosed: () => false + error: error ? error.bind(consumer) : errorFallback, // report unhandled errors + complete: complete ? complete.bind(consumer) : noop }; }; /** Wraps the producer to handle tear down logic and subscription management * + * The resulting producer will invoke cleanup logic on error or complete events + * and will prevent calling of any method from the sink. + * * @param producer The producer to wrap * @returns The wrapper producer */ -const fuse = (producer: Producer) => ({ next, error, complete }: Sink) => { +const fuse = (producer: Producer) => ({ next, error, complete }: FusedSink) => { let done = false; let cleanup = noop; @@ -186,11 +193,16 @@ const fuse = (producer: Producer) complete: _fin(complete), isClosed: () => done }; + // call the producer cleanup = producer(safeSink) ?? noop; + // if the producer throws exception bypass it to the caller rather then to + // the sink. This is a feature. + + // if the producer completed the sequence immediately call the cleanup in place return done ? cleanup() : _fin0; }; -const _observe = (producer: Producer): Observable => ({ +const _observe = (producer: FusedProducer): Observable => ({ subscribe: (consumer: Observer) => ({ unsubscribe: producer(sink(consumer)) ?? noop }), @@ -244,7 +256,7 @@ const _observe = (producer: Producer< } }), - reduce: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, complete, error, ...rest }) => { + reduce: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, complete, error }) => { if (args.length === 1) { const [accumulator] = args; let _acc: T; @@ -261,8 +273,7 @@ const _observe = (producer: Producer< complete(); } }, - error, - ...rest + error }); } else { const [accumulator, initial] = args; @@ -275,8 +286,7 @@ const _observe = (producer: Producer< next(_acc); complete(); }, - error, - ...rest + error }); } }), @@ -299,35 +309,34 @@ const _observe = (producer: Producer< next: collect( producer, - ({ next, complete, error, isClosed }) => ({ + ({ next, complete, error }) => ({ next: v => (next(v), complete()), complete: () => error(new Error("The sequence is empty")), - error, - isClosed + error }) ), collect: collect( producer, - ({ next, complete, ...rest }) => { + ({ next, complete, error}) => { const data: T[] = []; return { next: v => data.push(v), complete: () => (next(data), complete()), - ...rest + error }; } ) }); const collect = ( - producer: Producer, - collector: (result: Sink) => Sink + producer: FusedProducer, + collector: (result: FusedSink) => FusedSink ) => (ct = Cancellation.none) => new Promise((resolve, reject) => { const fused = fuse(({ next, complete, error, isClosed }) => { const h = ct.register(error); const cleanup = !isClosed() ? - producer(collector({ next, complete, error, isClosed })) ?? noop : + producer(collector({ next, complete, error })) ?? noop : noop; return () => { @@ -339,8 +348,7 @@ const collect = ( fused({ next: resolve, error: reject, - complete: noop, - isClosed: () => false + complete: noop }); }); @@ -356,7 +364,7 @@ export const ofArray = (items: T[]) = /** Converts a subscribable to the observable */ export const ofSubscribable = (subscribable: Subscribable) => - observe(sink => { + observe(sink => { const subscription = subscribable.subscribe(sink); return () => subscription.unsubscribe(); }); diff --git a/djx/src/main/ts/operators/subject.ts b/djx/src/main/ts/operators/subject.ts --- a/djx/src/main/ts/operators/subject.ts +++ b/djx/src/main/ts/operators/subject.ts @@ -2,26 +2,30 @@ import { Observable, Sink, Subscribable, const noop = () => { }; -/** Joins multiple subscribers to the single one. The resulting subscriber +/** Connects multiple subscribers to the single producer. The producer * will be created when the first client subscribes and will be released * with the the last client unsubscribed. * * Use this wrapper to prevent spawning multiple producers. * + * The emitted values are not cached therefore the new subscriber will not receive + * the values emitted before it has been subscribed. + * * @param source The source observable - * @returns The wrapped producer + * @returns The new observable */ export const subject = (source: Subscribable): Observable => { - let subscribers: Sink[] = []; + let subscribers: Sink[] = []; // the list of active subscribers - let subscription = { unsubscribe: noop }; + let subscription = { unsubscribe: noop }; // current subscription // cleanup method to release resources held by this subscription const cleanup = (cb: (item: Sink) => void) => { const _subscribers = subscribers; - subscribers = []; + subscribers = []; // this will prevent a client cleanup login to run _subscribers.forEach(cb); - subscription.unsubscribe(); + // we don't need subscription.unsubscribe(), because cleanup is called + // from complete or error methods. }; const sink: Sink = { @@ -34,14 +38,20 @@ export const subject = (source: Subsc return observe(client => { const _subscribers = subscribers; subscribers.push(client); - if (subscribers.length === 1) - subscription = source.subscribe(sink); + if (subscribers.length === 1) // this is the first client + subscription = source.subscribe(sink); // activate the producer return () => { + // this is a cleanup logic for an individual client if (_subscribers === subscribers) { + // is the current subscription to the producer is active + + // find this client in the list of subscribers const pos = subscribers.indexOf(client); if (pos >= 0) subscribers.splice(pos, 1); + + // is this is the last subscriber we need to release the producer if (!subscribers.length) subscription.unsubscribe(); }