import { Observable, Sink, Subscribable, observe } from "../observable"; const noop = () => { }; /** Connects multiple subscribers to the single producer. The producer * will be created when the first client subscribes and will be released * with the the last client unsubscribed. * * Use this wrapper to prevent spawning multiple producers. * * The emitted values are not cached therefore the new subscriber will not receive * the values emitted before it has been subscribed. * * @param source The source observable * @returns The new observable */ export const subject = (source: Subscribable): Observable => { let subscribers: Sink[] = []; // the list of active subscribers let subscription = { unsubscribe: noop }; // current subscription // cleanup method to release resources held by this subscription const cleanup = (cb: (item: Sink) => void) => { const _subscribers = subscribers; subscribers = []; // this will prevent a client cleanup login to run _subscribers.forEach(cb); // we don't need subscription.unsubscribe(), because cleanup is called // from complete or error methods. }; const sink: Sink = { isClosed: () => false, complete: () => cleanup(s => s.complete()), error: e => cleanup(s => s.error(e)), next: v => subscribers.forEach(s => s.next(v)) }; return observe(client => { const _subscribers = subscribers; subscribers.push(client); if (subscribers.length === 1) // this is the first client subscription = source.subscribe(sink); // activate the producer return () => { // this is a cleanup logic for an individual client if (_subscribers === subscribers) { // is the current subscription to the producer is active // find this client in the list of subscribers const pos = subscribers.indexOf(client); if (pos >= 0) subscribers.splice(pos, 1); // is this is the last subscriber we need to release the producer if (!subscribers.length) subscription.unsubscribe(); } }; }); };