observable.ts
68 lines
| 2.4 KiB
| video/mp2t
|
TypeScriptLexer
|
|
r129 | import { isPromise } from "@implab/core-amd/safe"; | |
|
|
r157 | import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces"; | |
|
|
r158 | import { ObservableImpl, fuse } from "./observable/ObservableImpl"; | |
|
|
r102 | ||
|
|
/**
 * Type guard which checks whether the value exposes an `unsubscribe` method.
 *
 * @param v The value to test, may be anything including `null`/`undefined`.
 * @returns `true` when `v` is neither `null` nor `undefined` and its
 * `unsubscribe` member is a function.
 */
export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
    v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
|
|
/**
 * Type guard which checks whether the value exposes a `subscribe` method.
 *
 * Only the presence of the method is verified; the element type `T` is a
 * compile-time hint supplied by the caller and is not checked at runtime.
 *
 * @param v The value to test, may be anything including `null`/`undefined`.
 * @returns `true` when `v` is neither `null` nor `undefined` and its
 * `subscribe` member is a function.
 */
export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
    v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
|
|
r96 | ||
|
|
/**
 * Creates an observable from the specified producer.
 *
 * The producer is wrapped with `fuse` before it is handed to
 * `ObservableImpl`.
 *
 * @param producer The function which feeds the sink of the observable.
 * @returns The new observable backed by the fused producer.
 */
export const observe = <T>(producer: Producer<T>): Observable<T> =>
    new ObservableImpl<T>(fuse(producer));
|
|
r110 | ||
|
|
/**
 * Converts an array to the observable sequence of its elements.
 *
 * When the producer runs, every element is passed to `next` in array order
 * and then `complete()` is called once. The array is only read, so readonly
 * arrays are accepted as well.
 *
 * NOTE(review): unlike `observe`, the producer is passed to `ObservableImpl`
 * directly, without `fuse` — presumably intentional, confirm.
 *
 * @param items The elements to emit.
 * @returns The observable sequence of the array elements.
 */
export const ofArray = <T>(items: readonly T[]): Observable<T> => new ObservableImpl<T>(
    ({ next, complete }) => (
        // pass only the element, forEach would also supply the index and the array
        items.forEach(item => next(item)),
        complete()
    )
);
|
|
r110 | ||
|
|
/**
 * Converts a subscribable to the observable.
 *
 * The producer subscribes the sink to the specified subscribable and returns
 * a cleanup function which unsubscribes the obtained subscription.
 *
 * @param subscribable The source to wrap.
 * @returns The observable created with `observe` over the source.
 */
export const ofSubscribable = <T>(subscribable: Subscribable<T>): Observable<T> =>
    observe<T>(sink => {
        const subscription = subscribable.subscribe(sink);
        return () => subscription.unsubscribe();
    });
|
|
/**
 * Creates an observable for a single item which may be a plain value or a
 * promise-like.
 *
 * - When `isPromise(item)` holds, the fulfilled value is passed to `next`
 *   followed by `complete()`; a rejection is passed to `error`.
 * - Otherwise the item is passed to `next` immediately, followed by
 *   `complete()`.
 *
 * @param item The value or the promise-like of the value to emit.
 * @returns The observable created with `observe`.
 */
const of1 = <T>(item: T | PromiseLike<T>): Observable<T> => observe<T>(
    ({ next, error, complete }) =>
        isPromise(item) ?
            // `void` marks the chained promise as intentionally not awaited
            void item.then(
                v => (next(v), complete()),
                error
            ) :
            (next(item), complete())
);
|
|
r110 | ||
|
|
/** Converts a list of parameter values to the observable sequence. The
 * order of elements in the list will be preserved in the resulting sequence.
 *
 * Plain values are passed to `next` synchronously. When an item satisfies
 * `isPromise`, the emission is suspended until the item is fulfilled, then
 * the fulfilled value is emitted and the emission continues with the
 * following item. A rejection is passed to `error` and the remaining items
 * are not emitted. `complete()` is called after the last item.
 *
 * A single argument is delegated to `of1`.
 *
 * @param items The values or promise-likes to emit.
 * @returns The observable sequence of the specified items.
 */
export const of = <T>(...items: (T | PromiseLike<T>)[]): Observable<T> => items.length === 1 ?
    of1(items[0]) :
    observe<T>(
        ({ next, error, complete, isClosed }) => {
            const n = items.length;
            // emits items starting from the specified index until the end of
            // the list or until the first promise-like item
            const emitFrom = (start: number) => {
                if (start > 0 && isClosed()) // when resumed
                    return;
                for (let i = start; i < n; i++) {
                    const r = items[i];
                    if (isPromise(r)) {
                        // `i` is block scoped, the callback resumes right after this item;
                        // `void` marks the chained promise as intentionally not awaited
                        void r.then(v => (next(v), emitFrom(i + 1)), error);
                        return; // suspend
                    } else {
                        next(r);
                    }
                }
                complete();
            };
            emitFrom(0);
        }
    );
|
|
r110 | ||
|
|
/**
 * The observable which emits no values: its producer only calls
 * `complete()`.
 */
export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete());
|
|
r155 | ||
| export type * from "./observable/interfaces"; |
