subject.ts
60 lines
| 2.2 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r136 | import { Observable, Sink, Subscribable, observe } from "../observable"; | ||
cin
|
r133 | |||
/** Does nothing. Serves as the placeholder `unsubscribe` until a producer is activated. */
const noop = (): void => {
    // intentionally empty
};
cin
|
/** Connects multiple subscribers to a single producer. The producer
 * is created when the first client subscribes and is released
 * when the last client unsubscribes.
 *
 * Use this wrapper to prevent spawning multiple producers.
 *
 * The emitted values are not cached, therefore a new subscriber will not receive
 * the values emitted before it has been subscribed.
 *
 * A subscriber may unsubscribe (itself or another client) while a value is being
 * dispatched: the remaining subscribers still receive that value, the removed
 * ones do not.
 *
 * @param source The source observable
 * @returns The new observable
 */
export const subject = <T>(source: Subscribable<T>): Observable<T> => {
    let subscribers: Sink<T>[] = []; // the list of active subscribers

    let subscription = { unsubscribe: noop }; // current subscription

    // Terminates the current generation of subscribers: detaches the list and
    // notifies every sink that was in it through `cb`.
    const cleanup = (cb: (item: Sink<T>) => void) => {
        const terminated = subscribers;
        // Replacing the array (instead of clearing it) makes the per-client
        // cleanup logic a no-op, since it compares the array identity.
        subscribers = [];
        terminated.forEach(cb);
        // we don't need subscription.unsubscribe(), because cleanup is called
        // from complete or error methods.
    };

    // The single sink handed to the producer; fans every signal out to the clients.
    const sink: Sink<T> = {
        isClosed: () => false,
        complete: () => cleanup(s => s.complete()),
        error: e => cleanup(s => s.error(e)),
        next: v => {
            const current = subscribers;
            // Iterate over a snapshot: a client unsubscribing from inside its
            // own `next` splices the live array, which would otherwise shift
            // the following subscriber under the iterator and skip it.
            current.slice().forEach(s => {
                // Skip sinks removed during this dispatch and stop delivering
                // once the generation has been terminated (complete/error).
                if (subscribers === current && current.indexOf(s) >= 0)
                    s.next(v);
            });
        }
    };

    return observe(client => {
        // The array instance identifies the generation this client belongs to.
        const generation = subscribers;
        subscribers.push(client);
        if (subscribers.length === 1) // this is the first client
            subscription = source.subscribe(sink); // activate the producer

        // cleanup logic for an individual client
        return () => {
            // The generation was already terminated by complete/error.
            if (generation !== subscribers)
                return;

            // find this client in the list of subscribers
            const pos = subscribers.indexOf(client);
            if (pos < 0)
                return; // already removed, nothing to release

            subscribers.splice(pos, 1);

            // if this was the last subscriber we need to release the producer
            if (!subscribers.length)
                subscription.unsubscribe();
        };
    });
};