observable.ts
104 lines
| 3.6 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r129 | import { isPromise } from "@implab/core-amd/safe"; | |
cin
|
r155 | import { Unsubscribable, Producer, FusedSink, Observable, Subscribable } from "./observable/interfaces"; | |
import { ObservableImpl } from "./observable/ObservableImpl"; | |||
cin
|
r102 | ||
cin
|
/** Checks whether the specified value exposes an `unsubscribe` method.
 *
 * @param v The value to test
 * @returns `true` when `v` is neither `null` nor `undefined` and its
 *  `unsubscribe` member is a function
 */
export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
    v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
cin
|
/** Checks whether the specified value exposes a `subscribe` method.
 *
 * Only the presence of the method is verified, the type of the elements
 * `T` can't be checked at run time and is taken on trust.
 *
 * @param v The value to test
 * @returns `true` when `v` is neither `null` nor `undefined` and its
 *  `subscribe` member is a function
 */
export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
    v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
cin
|
r96 | ||
cin
|
/** A function which does nothing, used as the default cleanup handler. */
const noop = () => { };
cin
|
/** Wraps the producer to handle tear down logic and subscription management
 *
 * The resulting producer will invoke the cleanup logic on error or complete
 * events and after that will prevent calling of any method of the sink.
 *
 * @param producer The producer to wrap
 * @returns The wrapper producer. When called with a sink it returns a function
 *  which closes the sink and runs the cleanup, or the result of the cleanup
 *  call when the sequence has already finished while the producer was running.
 */
const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
    // set on the first error, complete or tear down, never reset
    let done = false;

    // stays a no-op until the producer returns, therefore a terminal event
    // emitted synchronously from the producer runs no real cleanup here
    let cleanup = noop;

    // wraps a terminal handler: the first call closes the sink, runs the cleanup
    // and then forwards the arguments, all subsequent calls are ignored
    const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
        (...args: A) => {
            if (done)
                return;
            done = true;
            cleanup();
            fn(...args);
        };

    // tear down without notifying the sink, runs the cleanup at most once
    const _fin0 = () => {
        if (done)
            return;
        done = true;
        cleanup();
    };

    const safeSink = {
        next: (value: T) => {
            if (!done)
                next(value);
        },
        error: _fin(error),
        complete: _fin(complete),
        isClosed: () => done
    };

    // call the producer
    cleanup = producer(safeSink) ?? noop;

    // if the producer throws an exception it is passed to the caller rather
    // than to the sink. This is a feature.
    // if the producer completed the sequence immediately call the cleanup in place
    return done ? cleanup() : _fin0;
};
cin
|
r96 | ||
cin
|
/** Creates the observable from the specified producer.
 *
 * The producer is wrapped with `fuse` before it is passed to the
 * `ObservableImpl`.
 *
 * @param producer The producer of the sequence
 * @returns The observable backed by the fused producer
 */
export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer));
cin
|
r110 | ||
cin
|
/** Converts an array to the observable sequence of its elements.
 *
 * @param items The array whose elements are passed to the sink in their order
 * @returns The observable which emits every element and then completes
 */
export const ofArray = <T>(items: readonly T[]) => new ObservableImpl<T>(
    ({ next, complete }) => {
        // forward only the element, forEach also supplies the index and the
        // array which must not leak into the sink as extra arguments
        items.forEach(item => next(item));
        complete();
    }
);
cin
|
r110 | ||
cin
|
/** Converts a subscribable to the observable
 *
 * The sink is subscribed to the `subscribable` and the cleanup of the
 * resulting observable cancels that subscription.
 *
 * @param subscribable The source of the sequence
 * @returns The observable created with `observe`
 */
export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
    observe<T>(sink => {
        const subscription = subscribable.subscribe(sink);
        return () => subscription.unsubscribe();
    });
cin
|
/** Creates the observable of a single value or of a promise of the value.
 *
 * A plain value is emitted and the sequence is completed right away. For a
 * promise the value is emitted and the sequence is completed when the promise
 * is fulfilled, a rejection is passed to the `error` method of the sink.
 *
 * @param item The value or the promise of the value
 * @returns The observable created with `observe`
 */
const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
    ({ next, error, complete }) => {
        if (isPromise(item)) {
            // the result of then() is intentionally dropped
            void item.then(
                v => {
                    next(v);
                    complete();
                },
                error
            );
        } else {
            next(item);
            complete();
        }
    }
);
cin
|
r110 | ||
cin
|
/** Converts a list of parameter values to the observable sequence. The
 * order of elements in the list will be preserved in the resulting sequence.
 *
 * Promises in the list are awaited one at a time: the emission is suspended
 * on a promise and resumed from the next element when it is fulfilled, a
 * rejection is passed to the `error` method of the sink.
 *
 * @param items The values or the promises of values to emit
 * @returns The observable created with `observe`, a single element list is
 *  handled by `of1`
 */
export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
    of1(items[0]) :
    observe<T>(
        ({ next, error, complete, isClosed }) => {
            const n = items.length;

            // emits the elements starting from the specified index
            const _next = (start: number) => {
                if (start > 0 && isClosed()) // when resumed
                    return;

                for (let i = start; i < n; i++) {
                    const r = items[i];
                    if (isPromise(r)) {
                        // emit the resolved value and continue with the rest
                        r.then(v => (next(v), _next(i + 1)), error);
                        return; // suspend
                    } else {
                        next(r);
                    }
                }

                // all elements have been emitted
                complete();
            };

            _next(0);
        }
    );
cin
|
r110 | ||
cin
|
/** The observable without elements, its producer only completes the sequence. */
export const empty = new ObservableImpl<never>(({ complete }) => complete());
export type * from "./observable/interfaces"; |