import { PromiseOrValue } from "@implab/core-amd/interfaces"; import { isPromise } from "@implab/core-amd/safe"; /** * The interface for the consumer of an observable sequence */ export interface Observer { /** * Called for the next element in the sequence */ next: (value: T) => void; /** * Called once when the error occurs in the sequence. */ error: (e: unknown) => void; /** * Called once at the end of the sequence. */ complete: () => void; } /** * The group of functions to feed an observable. These methods are provided to * the producer to generate a stream of events. */ export type Sink = { /** * Call to send the next element in the sequence */ next: (value: T) => void; /** * Call to notify about the error occurred in the sequence. */ error: (e: unknown) => void; /** * Call to signal the end of the sequence. */ complete: () => void; /** * Checks whether the sink is accepting new elements. It's safe to * send elements to the closed sink. */ isClosed: () => boolean; }; export type Producer = (sink: Sink) => (void | (() => void)); export interface Unsubscribable { unsubscribe(): void; } export const isUnsubsribable = (v: unknown): v is Unsubscribable => v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; export const isSubsribable = (v: unknown): v is Subscribable => v !== null && v !== undefined && typeof (v as Subscribable).subscribe === "function"; export interface Subscribable { subscribe(consumer: Partial>): Unsubscribable; } /** The observable source of items. */ export interface Observable extends Subscribable { /** Transforms elements of the sequence with the specified mapper * * @param mapper The mapper used to transform the values */ map(mapper: (value: T) => T2): Observable; /** Filters elements of the sequence. The resulting sequence will * contain only elements which match the specified predicate. * * @param predicate The filter predicate. */ filter(predicate: (value: T) => boolean): Observable; /** Applies accumulator to each value in the sequence and * emits the accumulated value for each source element * * @param accumulator * @param initial */ scan(accumulator: (acc: A, value: T) => A, initial: A): Observable; cat(...seq: Subscribable[]): Observable; pipe(f: (source: Observable) => Producer): Observable; } const noop = () => { }; const sink = (consumer: Partial>) => { const { next, error, complete } = consumer; return { next: next ? next.bind(consumer) : noop, error: error ? error.bind(consumer) : noop, complete: complete ? complete.bind(consumer) : noop, isClosed: () => false }; }; /** Wraps the producer to handle tear down logic and subscription management * * @param producer The producer to wrap * @returns The wrapper producer */ const fuse = (producer: Producer) => ({ next, error, complete }: Sink) => { let done = false; let cleanup = noop; const _fin = (fn: (...args: A) => void) => (...args: A) => done ? void (0) : (done = true, cleanup(), fn(...args)); const safeSink = { next: (value: T) => { !done && next(value); }, error: _fin(error), complete: _fin(complete), isClosed: () => done }; cleanup = producer(safeSink) ?? noop; return done ? (cleanup(), noop) : _fin(noop); }; const _observe = (producer: Producer): Observable => ({ subscribe: (consumer: Partial>) => ({ unsubscribe: producer(sink(consumer)) ?? noop }), map: (mapper) => _observe(({ next, ...rest }) => producer({ next: next !== noop ? (v: T) => next(mapper(v)) : noop, ...rest }) ), filter: (predicate) => _observe(({ next, ...rest }) => producer({ next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, ...rest }) ), scan: (accumulator, initial) => _observe(({ next, ...rest }) => { let _acc = initial; return producer({ next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, ...rest }); }), cat: (...seq) => _observe(({ next, complete: final, ...rest }) => { let cleanup: () => void; const complete = () => { const continuation = seq.shift(); if (continuation) { // if we have a next sequence, subscribe to it const subscription = continuation.subscribe({ next, complete, ...rest }); cleanup = subscription.unsubscribe.bind(subscription); } else { // otherwise notify the consumer about completion final(); } }; cleanup = producer({ next, complete, ...rest }) ?? noop; return () => cleanup(); }), pipe: (f: (source: Observable) => Producer) => observe(f(_observe(producer))) }); export interface OrderUpdate { /** The item is being updated */ item: T; /** The previous index of the item, -1 in case it is inserted */ prevIndex: number; /** The new index of the item, -1 in case it is deleted */ newIndex: number; } interface ObservableResults { /** * Allows observation of results */ observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): { remove(): void; }; } interface Queryable { query(...args: A): PromiseOrValue; } export const isObservableResults = (v: object): v is ObservableResults => v && (typeof (v as { observe?: unknown; }).observe === "function"); export const observe = (producer: Producer) => _observe(fuse(producer)); export const empty = observe(({ complete }) => complete()); export const query = (store: Queryable) => (...args: A) => { return observe>(({ next, complete, error }) => { try { const results = store.query(...args); if (isPromise(results)) { results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }))) .then(undefined, error); } else { results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); } if (isObservableResults(results)) { const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex })); return () => h.remove(); } else { complete(); } } catch (err) { error(err); } }); };