subject.ts
50 lines
| 1.6 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r136 | import { Observable, Sink, Subscribable, observe } from "../observable"; | ||
cin
|
r133 | |||
/** Shared do-nothing callback; serves as `unsubscribe` while no source subscription is held. */
const noop = (): void => { };
/**
 * Joins multiple subscribers into a single subscription to the source.
 *
 * The source is subscribed when the first client subscribes, and that
 * subscription is released when the last client unsubscribes or when the
 * source completes or fails. A client that subscribes after that point
 * causes the source to be subscribed again.
 *
 * Use this wrapper to prevent spawning multiple producers.
 *
 * @param source The source observable
 * @returns The wrapped producer
 */
export const subject = <T>(source: Subscribable<T>): Observable<T> => {
    // Placeholder used while there is no live subscription to the source.
    const idle = { unsubscribe: noop };

    let subscribers: Sink<T>[] = [];
    let subscription = idle;

    // Detaches the current source subscription (if any) and unsubscribes it.
    // The handle is reset first, so calling this repeatedly is harmless.
    const release = () => {
        const active = subscription;
        subscription = idle;
        active.unsubscribe();
    };

    // Terminates the current generation of subscribers: every one of them is
    // notified through `cb`, then the source subscription is released.
    // Both the list and the subscription are swapped out BEFORE the callbacks
    // run, so a client that re-subscribes from its `complete`/`error` handler
    // starts a fresh generation which is not torn down by this cleanup.
    const cleanup = (cb: (item: Sink<T>) => void) => {
        const detached = subscribers;
        const active = subscription;
        subscribers = [];
        subscription = idle;
        detached.forEach(cb);
        active.unsubscribe();
    };

    const sink: Sink<T> = {
        isClosed: () => false,
        complete: () => cleanup(s => s.complete()),
        error: e => cleanup(s => s.error(e)),
        // Dispatch over a snapshot: a subscriber may unsubscribe (splice the
        // live array) while handling the value, which would otherwise make
        // forEach skip the subscriber that follows it. A subscriber removed
        // during the dispatch may still be handed the current value.
        next: v => subscribers.slice().forEach(s => s.next(v))
    };

    return observe(client => {
        // The array instance identifies the generation this client belongs
        // to; cleanup() replaces the array when the source terminates.
        const generation = subscribers;
        generation.push(client);

        if (generation.length === 1) {
            const started = source.subscribe(sink);
            if (generation === subscribers) {
                subscription = started;
            } else {
                // The source terminated synchronously while subscribing, the
                // generation is already closed: do not keep the handle (it
                // could overwrite a newer live subscription), just release it.
                started.unsubscribe();
            }
        }

        return () => {
            // The generation was already closed by complete/error.
            if (generation !== subscribers)
                return;

            const pos = subscribers.indexOf(client);
            if (pos >= 0)
                subscribers.splice(pos, 1);
            // The last client is gone, stop the producer.
            if (!subscribers.length)
                release();
        };
    });
};