##// END OF EJS Templates
added 'buffer' and 'subject' observable operators
added 'buffer' and 'subject' observable operators

File last commit:

r133:a3fba6b6c42e default
r133:a3fba6b6c42e default
Show More
subject.ts
50 lines | 1.5 KiB | video/mp2t | TypeScriptLexer
cin
added 'buffer' and 'subject' observable operators
r133 import { Producer, Sink, Subscribable } from "../observable";
const noop = () => { };

/**
 * Joins multiple subscribers to a single subscription on the source.
 *
 * The source is subscribed when the first client subscribes and the
 * subscription is released when the last client has unsubscribed, or when
 * the source signals `complete` or `error`.
 *
 * Use this wrapper to prevent spawning multiple producers.
 *
 * @param source The source observable
 * @returns The wrapped producer. Calling it registers a client sink and
 * returns a function which unsubscribes that client; calling that function
 * more than once has no further effect.
 */
export const subject = <T>(source: Subscribable<T>): Producer<T> => {
    // Clients of the current "generation". The array is replaced (never
    // reused) when the source terminates, which lets stale unsubscribe
    // handles detect that they belong to a finished generation.
    let subscribers: Sink<T>[] = [];
    let subscription = { unsubscribe: noop };

    // Detaches every current client, notifies each of them through `cb`
    // and then releases the upstream subscription.
    // NOTE(review): if the source terminates synchronously inside
    // `subscribe()`, the subscription it returns is stored only after this
    // function has run and therefore is not released here.
    const cleanup = (cb: (item: Sink<T>) => void) => {
        const detached = subscribers;
        subscribers = [];
        detached.forEach(cb);
        subscription.unsubscribe();
    };

    // The single sink attached to the source; fans events out to clients.
    const sink: Sink<T> = {
        // always reports the shared sink as open
        isClosed: () => false,
        complete: () => cleanup(s => s.complete()),
        error: e => cleanup(s => s.error(e)),
        next: v => {
            // Dispatch over a snapshot: a client may unsubscribe from within
            // its own `next()`, which splices the live array and would make
            // `forEach` skip the following client. The membership check
            // keeps clients that were removed during this dispatch (or
            // detached by `cleanup`) from receiving the value.
            subscribers.slice().forEach(s => {
                if (subscribers.indexOf(s) >= 0)
                    s.next(v);
            });
        }
    };

    return client => {
        const generation = subscribers;
        subscribers.push(client);
        // the first client of a generation triggers the upstream subscription
        if (subscribers.length === 1)
            subscription = source.subscribe(sink);

        return () => {
            // the generation has already been terminated by `cleanup`
            if (generation !== subscribers)
                return;

            const pos = subscribers.indexOf(client);
            // already unsubscribed: do not release the upstream again
            if (pos < 0)
                return;

            subscribers.splice(pos, 1);
            if (!subscribers.length)
                subscription.unsubscribe();
        };
    };
};