|
|
import { Observable, Sink, Subscribable, observe } from "../observable";
|
|
|
|
|
|
const noop = () => { };
|
|
|
|
|
|
/** Joins multiple subscribers to the single one. The resulting subscriber
|
|
|
* will be created when the first client subscribes and will be released
|
|
|
* with the the last client unsubscribed.
|
|
|
*
|
|
|
* Use this wrapper to prevent spawning multiple producers.
|
|
|
*
|
|
|
* @param source The source observable
|
|
|
* @returns The wrapped producer
|
|
|
*/
|
|
|
export const subject = <T>(source: Subscribable<T>): Observable<T> => {
|
|
|
let subscribers: Sink<T>[] = [];
|
|
|
|
|
|
let subscription = { unsubscribe: noop };
|
|
|
|
|
|
// cleanup method to release resources held by this subscription
|
|
|
const cleanup = (cb: (item: Sink<T>) => void) => {
|
|
|
const _subscribers = subscribers;
|
|
|
subscribers = [];
|
|
|
_subscribers.forEach(cb);
|
|
|
subscription.unsubscribe();
|
|
|
};
|
|
|
|
|
|
const sink: Sink<T> = {
|
|
|
isClosed: () => false,
|
|
|
complete: () => cleanup(s => s.complete()),
|
|
|
error: e => cleanup(s => s.error(e)),
|
|
|
next: v => subscribers.forEach(s => s.next(v))
|
|
|
};
|
|
|
|
|
|
return observe(client => {
|
|
|
const _subscribers = subscribers;
|
|
|
subscribers.push(client);
|
|
|
if (subscribers.length === 1)
|
|
|
subscription = source.subscribe(sink);
|
|
|
|
|
|
return () => {
|
|
|
if (_subscribers === subscribers) {
|
|
|
const pos = subscribers.indexOf(client);
|
|
|
if (pos >= 0)
|
|
|
subscribers.splice(pos, 1);
|
|
|
if (!subscribers.length)
|
|
|
subscription.unsubscribe();
|
|
|
}
|
|
|
};
|
|
|
});
|
|
|
};
|
|
|
|