import { Observable, Sink, Subscribable, observe } from "../observable"; const noop = () => { }; /** Joins multiple subscribers to the single one. The resulting subscriber * will be created when the first client subscribes and will be released * with the the last client unsubscribed. * * Use this wrapper to prevent spawning multiple producers. * * @param source The source observable * @returns The wrapped producer */ export const subject = (source: Subscribable): Observable => { let subscribers: Sink[] = []; let subscription = { unsubscribe: noop }; // cleanup method to release resources held by this subscription const cleanup = (cb: (item: Sink) => void) => { const _subscribers = subscribers; subscribers = []; _subscribers.forEach(cb); subscription.unsubscribe(); }; const sink: Sink = { isClosed: () => false, complete: () => cleanup(s => s.complete()), error: e => cleanup(s => s.error(e)), next: v => subscribers.forEach(s => s.next(v)) }; return observe(client => { const _subscribers = subscribers; subscribers.push(client); if (subscribers.length === 1) subscription = source.subscribe(sink); return () => { if (_subscribers === subscribers) { const pos = subscribers.indexOf(client); if (pos >= 0) subscribers.splice(pos, 1); if (!subscribers.length) subscription.unsubscribe(); } }; }); };