observable.ts
229 lines
| 6.9 KiB
| video/mp2t
|
TypeScriptLexer
|
|
r110 | import { PromiseOrValue } from "@implab/core-amd/interfaces"; | ||
| import { isPromise } from "@implab/core-amd/safe"; | ||||
|
|
r102 | /** | ||
| * The interface for the consumer of an observable sequence | ||||
| */ | ||||
| export interface Observer<T> { | ||||
| /** | ||||
| * Called for the next element in the sequence | ||||
| */ | ||||
| next: (value: T) => void; | ||||
|
|
r96 | |||
|
|
r102 | /** | ||
| * Called once when the error occurs in the sequence. | ||||
| */ | ||||
|
|
r96 | error: (e: unknown) => void; | ||
|
|
r102 | |||
| /** | ||||
| * Called once at the end of the sequence. | ||||
| */ | ||||
|
|
r96 | complete: () => void; | ||
| } | ||||
|
|
r102 | /** | ||
|
|
r110 | * The group of functions to feed an observable. These methods are provided to | ||
|
|
r102 | * the producer to generate a stream of events. | ||
| */ | ||||
| export type Sink<T> = { | ||||
|
|
r110 | /** | ||
| * Call to send the next element in the sequence | ||||
| */ | ||||
| next: (value: T) => void; | ||||
| /** | ||||
| * Call to notify about the error occurred in the sequence. | ||||
| */ | ||||
| error: (e: unknown) => void; | ||||
| /** | ||||
| * Call to signal the end of the sequence. | ||||
| */ | ||||
| complete: () => void; | ||||
| /** | ||||
| * Checks whether the sink is accepting new elements. It's safe to | ||||
| * send elements to the closed sink. | ||||
| */ | ||||
| isClosed: () => boolean; | ||||
|
|
r102 | }; | ||
|
|
r96 | |||
| export type Producer<T> = (sink: Sink<T>) => (void | (() => void)); | ||||
|
|
r102 | export interface Unsubscribable { | ||
| unsubscribe(): void; | ||||
| } | ||||
| export const isUnsubsribable = (v: unknown): v is Unsubscribable => | ||||
| v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; | ||||
|
|
r107 | export const isSubsribable = <T = unknown>(v: unknown): v is Subscribable<T> => | ||
|
|
r102 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; | ||
|
|
r96 | |||
|
|
r102 | export interface Subscribable<T> { | ||
| subscribe(consumer: Partial<Observer<T>>): Unsubscribable; | ||||
| } | ||||
| /** The observable source of items. */ | ||||
| export interface Observable<T> extends Subscribable<T> { | ||||
| /** Transforms elements of the sequence with the specified mapper | ||||
| * | ||||
| * @param mapper The mapper used to transform the values | ||||
| */ | ||||
| map<T2>(mapper: (value: T) => T2): Observable<T2>; | ||||
|
|
r96 | |||
|
|
r102 | /** Filters elements of the sequence. The resulting sequence will | ||
| * contain only elements which match the specified predicate. | ||||
| * | ||||
| * @param predicate The filter predicate. | ||||
| */ | ||||
| filter(predicate: (value: T) => boolean): Observable<T>; | ||||
|
|
r96 | |||
|
|
r102 | /** Applies accumulator to each value in the sequence and | ||
| * emits the accumulated value for each source element | ||||
| * | ||||
| * @param accumulator | ||||
| * @param initial | ||||
| */ | ||||
| scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>; | ||||
|
|
r110 | |||
| cat(...seq: Subscribable<T>[]): Observable<T>; | ||||
|
|
r102 | } | ||
| const noop = () => { }; | ||||
| const sink = <T>(consumer: Partial<Observer<T>>) => { | ||||
| const { next, error, complete } = consumer; | ||||
|
|
r96 | return { | ||
|
|
r102 | next: next ? next.bind(consumer) : noop, | ||
| error: error ? error.bind(consumer) : noop, | ||||
|
|
r110 | complete: complete ? complete.bind(consumer) : noop, | ||
| isClosed: () => false | ||||
|
|
r109 | }; | ||
|
|
r102 | }; | ||
|
|
r110 | /** Wraps the producer to handle tear down logic and subscription management | ||
| * | ||||
| * @param producer The producer to wrap | ||||
| * @returns The wrapper producer | ||||
| */ | ||||
| const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => { | ||||
|
|
r102 | let done = false; | ||
|
|
r110 | let cleanup = noop; | ||
| const _fin = <A extends unknown[]>(fn: (...args: A) => void) => | ||||
| (...args: A) => done ? | ||||
| void (0) : | ||||
| (done = true, cleanup(), fn(...args)); | ||||
| const safeSink = { | ||||
|
|
r109 | next: (value: T) => { !done && next(value); }, | ||
|
|
r110 | error: _fin(error), | ||
| complete: _fin(complete), | ||||
| isClosed: () => done | ||||
|
|
r109 | }; | ||
|
|
r110 | cleanup = producer(safeSink) ?? noop; | ||
| return done ? | ||||
| (cleanup(), noop) : | ||||
| _fin(noop); | ||||
|
|
r109 | }; | ||
|
|
r96 | |||
|
|
r102 | const _observe = <T>(producer: Producer<T>): Observable<T> => ({ | ||
| subscribe: (consumer: Partial<Observer<T>>) => ({ | ||||
| unsubscribe: producer(sink(consumer)) ?? noop | ||||
| }), | ||||
|
|
r110 | map: (mapper) => _observe(({ next, ...rest }) => | ||
|
|
r102 | producer({ | ||
| next: next !== noop ? (v: T) => next(mapper(v)) : noop, | ||||
|
|
r110 | ...rest | ||
| }) | ||||
| ), | ||||
| filter: (predicate) => _observe(({ next, ...rest }) => | ||||
| producer({ | ||||
| next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, | ||||
| ...rest | ||||
|
|
r102 | }) | ||
| ), | ||||
|
|
r110 | scan: (accumulator, initial) => _observe(({ next, ...rest }) => { | ||
|
|
r102 | let _acc = initial; | ||
| return producer({ | ||||
|
|
r110 | next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, | ||
| ...rest | ||||
|
|
r102 | }); | ||
|
|
r110 | }), | ||
| cat: (...seq) => _observe(({ next, complete: final, ...rest }) => { | ||||
| let cleanup: () => void; | ||||
| const complete = () => { | ||||
| const continuation = seq.shift(); | ||||
| if (continuation) { | ||||
| // if we have a next sequence, subscribe to it | ||||
| const subscription = continuation.subscribe({ next, complete, ...rest }); | ||||
| cleanup = subscription.unsubscribe.bind(subscription); | ||||
| } else { | ||||
| // otherwise notify the consumer about completion | ||||
| final(); | ||||
| } | ||||
| }; | ||||
| cleanup = producer({ next, complete, ...rest }) ?? noop; | ||||
| return () => cleanup(); | ||||
|
|
r96 | }) | ||
| }); | ||||
|
|
r102 | |||
|
|
r110 | export interface OrderUpdate<T> { | ||
| /** The item is being updated */ | ||||
| item: T; | ||||
| /** The previous index of the item, -1 in case it is inserted */ | ||||
| prevIndex: number; | ||||
| /** The new index of the item, -1 in case it is deleted */ | ||||
| newIndex: number; | ||||
| } | ||||
| interface ObservableResults<T> { | ||||
| /** | ||||
| * Allows observation of results | ||||
| */ | ||||
| observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): { | ||||
| remove(): void; | ||||
| }; | ||||
| } | ||||
| interface Queryable<T, A extends unknown[]> { | ||||
| query(...args: A): PromiseOrValue<T[]>; | ||||
| } | ||||
| export const isObservableResults = <T>(v: object): v is ObservableResults<T> => | ||||
| v && (typeof (v as { observe?: unknown; }).observe === "function"); | ||||
| export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer)); | ||||
| export const empty = observe<never>(({ complete }) => complete()); | ||||
| export const query = <T, A extends unknown[]>(store: Queryable<T, A>) => | ||||
| (...args: A) => { | ||||
| return observe<OrderUpdate<T>>(({ next, complete, error }) => { | ||||
| try { | ||||
| const results = store.query(...args); | ||||
| if (isPromise(results)) { | ||||
| results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }))) | ||||
| .then(undefined, error); | ||||
| } else { | ||||
| results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); | ||||
| } | ||||
| if (isObservableResults<T>(results)) { | ||||
| const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex })); | ||||
| return () => h.remove(); | ||||
| } else { | ||||
| complete(); | ||||
| } | ||||
| } catch (err) { | ||||
| error(err); | ||||
| } | ||||
| }); | ||||
| }; | ||||
