import { PromiseOrValue } from "@implab/core-amd/interfaces"; import { isCancellable, isPromise } from "@implab/core-amd/safe"; import { observe, Observable, empty } from "./observable"; import { after } from "dojo/aspect"; import { subject } from "./operators/subject"; export interface OrderedUpdate { /** The item is being updated */ readonly item: T; /** The previous index of the item, -1 in case it is inserted */ readonly prevIndex: number; /** The new index of the item, -1 in case it is deleted */ readonly newIndex: number; } export type QueryResults = Observable>; interface DjObservableResults { /** * Allows observation of results */ observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): { remove(): void; }; } interface Queryable { query(query?: Q, options?: O): PromiseOrValue; } export const isDjObservableResults = (v: object): v is DjObservableResults => v && (typeof (v as { observe?: unknown; }).observe === "function"); export const query = (store: Queryable, includeUpdates = true) => (query?: Q, options?: O & { observe?: boolean }) => { return observe>(({ next, complete, error, isClosed }) => { const processResults = (items: T[]) => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); try { const results = store.query(query, options); if (isPromise(results)) { results.then(processResults).then(undefined, error); } else { processResults(results); } if (!isClosed() && (options?.observe !== false) && isDjObservableResults(results)) { const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); return () => h.remove(); } else { complete(); } } catch (err) { error(err); } }); }; interface IndexedStore { get(id: string | number): PromiseLike | T | null | undefined; } interface Notifications { notify(item: T | undefined, id: string | number | undefined): void; } const hasNotifications = (x: unknown): x is Notifications => typeof x === "object" && x !== null && (typeof (x as Notifications).notify === "function"); interface GetOpts { observe?: boolean; } type Change = [item: NonNullable, id: string | number | undefined] | [item: undefined | null, id: string | number]; const filterItem = (itemId: string | number) => (source: Observable>) => observe(({ next, complete, error }) => { const subscription = source .filter(([, id]) => id === itemId) .subscribe({ next: ([item]) => item !== null && item !== undefined ? next(item) : complete(), complete, error }); return () => subscription.unsubscribe(); }); export const get = (store: IndexedStore) => { const changes = hasNotifications(store) ? observe>(({ next }) => { const handle = after(store, "notify", (...args: Change) => next(args), true); return () => handle.remove(); }).pipe(subject) : empty; return (id: string | number, opts: GetOpts = {}) => observe(({ next, complete, error }) => { const result = store.get(id); const handle = (x: T | null | undefined) => { if (x !== null && x !== undefined) next(x); complete(); }; if (isPromise(result)) { result.then(handle, error); if (isCancellable(result)) return () => result.cancel(); } else { handle(result); } }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty); };