observable.ts
68 lines
| 2.4 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r129 | import { isPromise } from "@implab/core-amd/safe"; | |
cin
|
r157 | import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces"; | |
cin
|
r158 | import { ObservableImpl, fuse } from "./observable/ObservableImpl"; | |
cin
|
r102 | ||
cin
|
/**
 * Type guard for {@link Unsubscribable}.
 *
 * @param v The value to test.
 * @returns `true` when `v` is neither `null` nor `undefined` and its
 * `unsubscribe` member is a function, `false` otherwise.
 */
export const isUnsubscribable = (v: unknown): v is Unsubscribable => {
    // reject nullish values first, a property access on them would throw
    if (v === null || v === undefined)
        return false;

    return typeof (v as Unsubscribable).unsubscribe === "function";
};
cin
|
/**
 * Type guard for {@link Subscribable}.
 *
 * Only the presence of a `subscribe` function is checked, the element type
 * `T` can't be verified at runtime.
 *
 * @param v The value to test.
 * @returns `true` when `v` is neither `null` nor `undefined` and its
 * `subscribe` member is a function, `false` otherwise.
 */
export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> => {
    // reject nullish values first, a property access on them would throw
    if (v === null || v === undefined)
        return false;

    return typeof (v as Subscribable<unknown>).subscribe === "function";
};
cin
|
r96 | ||
cin
|
/**
 * Creates an observable from the specified producer.
 *
 * The producer isn't used directly: it is wrapped with `fuse` and the
 * wrapped producer is handed to `ObservableImpl`.
 *
 * @param producer The function which feeds the sink with values.
 * @returns The observable backed by the wrapped producer.
 */
export const observe = <T>(producer: Producer<T>): Observable<T> => {
    const fused = fuse(producer);
    return new ObservableImpl<T>(fused);
};
cin
|
r110 | ||
cin
|
/**
 * Converts an array to the observable sequence of its elements.
 *
 * The elements are passed to `next` through `Array.prototype.forEach`, after
 * the last element `complete` is called.
 *
 * @param items The array whose elements form the sequence.
 * @returns The observable which emits the elements of `items`.
 */
export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>(
    ({ next, complete }) => {
        // `next` is handed to forEach as is, so it is invoked the way
        // forEach invokes its callback
        items.forEach(next);
        // the producer yields whatever `complete` yields
        return complete();
    }
);
cin
|
r110 | ||
cin
|
/**
 * Converts a subscribable to the observable.
 *
 * The producer subscribes the sink to `subscribable` and returns a function
 * which calls `unsubscribe()` on the resulting subscription.
 *
 * @param subscribable The source to subscribe to.
 * @returns The observable created by `observe` for this source.
 */
export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
    observe<T>(target => {
        const handle = subscribable.subscribe(target);
        // returned from the producer to release the underlying subscription
        const teardown = () => handle.unsubscribe();
        return teardown;
    });
cin
|
/**
 * Creates the observable for a single item.
 *
 * A plain value is emitted and the sequence is completed right away. When
 * `isPromise(item)` is true the value is emitted and the sequence is
 * completed once the promise is fulfilled, the rejection is passed to `error`.
 *
 * @param item The value or the promise of the value.
 */
const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
    ({ next, error, complete }) => {
        if (isPromise(item)) {
            // `void` discards the derived promise, nothing is returned
            // from the producer in this branch
            void item.then(
                value => {
                    next(value);
                    return complete();
                },
                error
            );
            return;
        }

        next(item);
        return complete();
    }
);
cin
|
r110 | ||
cin
|
/**
 * Converts a list of parameter values to the observable sequence. The
 * order of elements in the list will be preserved in the resulting sequence.
 *
 * Plain values are emitted synchronously. When an item satisfies `isPromise`
 * the emission is suspended until the promise is fulfilled, then its value is
 * emitted and the emission continues from the following item. A rejection is
 * passed to `error` and the emission isn't resumed. A single item is
 * delegated to `of1`.
 *
 * @param items The values or the promises of the values to emit.
 */
export const of = <T>(...items: (T | PromiseLike<T>)[]) => {
    if (items.length === 1)
        return of1(items[0]);

    return observe<T>(({ next, error, complete, isClosed }) => {
        const total = items.length;

        // emits the items beginning at `start` until the end of the list
        // or until a promise is met
        const emitFrom = (start: number): void => {
            // when resumed after a promise the sink may be closed already
            if (start > 0 && isClosed())
                return;

            let index = start;
            while (index < total) {
                const item = items[index];
                if (isPromise(item)) {
                    // fix the position to continue from, `index` is mutable
                    const resumeAt = index + 1;
                    item.then(
                        value => {
                            next(value);
                            emitFrom(resumeAt);
                        },
                        error
                    );
                    return; // suspend
                }
                next(item);
                index += 1;
            }

            complete();
        };

        emitFrom(0);
    });
};
cin
|
r110 | ||
cin
|
/**
 * The observable without elements: its producer emits nothing and calls
 * `complete` immediately.
 */
export const empty: Observable<never> = new ObservableImpl<never>(
    ({ complete }) => {
        return complete();
    }
);
cin
|
r155 | ||
export type * from "./observable/interfaces"; |