import { PromiseOrValue } from "@implab/core-amd/interfaces"; import { isCancellable, isPromise } from "@implab/core-amd/safe"; import { observe, Observable, empty } from "./observable"; import { after } from "dojo/aspect"; export interface OrderedUpdate { /** The item is being updated */ readonly item: T; /** The previous index of the item, -1 in case it is inserted */ readonly prevIndex: number; /** The new index of the item, -1 in case it is deleted */ readonly newIndex: number; } export type QueryResults = Observable>; interface DjObservableResults { /** * Allows observation of results */ observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): { remove(): void; }; } interface Queryable { query(query?: Q, options?: O): PromiseOrValue; } export const isDjObservableResults = (v: object): v is DjObservableResults => v && (typeof (v as { observe?: unknown; }).observe === "function"); export const query = (store: Queryable, includeUpdates = true) => { const q = queryEx(store, includeUpdates); return (query?: Q, options?: O & { observe?: boolean }) => { const [data, updates] = q(query, options); return options?.observe === false ? data : data.cat(updates); }; }; export const queryEx = (store: Queryable, includeUpdates = true) => (query?: Q, options?: O): [data: QueryResults, updates: QueryResults] => { const pending: T[] = []; let results: PromiseOrValue = pending; const data = observe>(({ next, complete, error }) => { const processResults = (items: T[]) => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); try { if (results === pending) results = store.query(query, options); if (isPromise(results)) { results.then(processResults).then(complete, error); if (isCancellable(results)) return results.cancel.bind(results); } else { processResults(results); complete(); } } catch (e) { error(e); } }); const updates = observe>(({ next, complete, error, isClosed }) => { try { if (!isClosed() && isDjObservableResults(results)) { const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); return () => h.remove(); } else { complete(); } } catch (e) { error(e); } }); return [data, updates]; }; interface IndexedStore { get(id: string | number): PromiseLike | T | null | undefined; } interface Notifications { notify(item: T | undefined, id: string | number | undefined): void; } const hasNotifications = (x: unknown): x is Notifications => typeof x === "object" && x !== null && (typeof (x as Notifications).notify === "function"); interface GetOpts { observe?: boolean; } export type ItemUpdate = [item: NonNullable, id: string | number | undefined] | [item: undefined | null, id: string | number]; const filterItem = (itemId: string | number) => (source: Observable>) => observe(({ next, complete, error }) => { const subscription = source .filter(([, id]) => id === itemId) .subscribe({ next: ([item]) => item !== null && item !== undefined ? next(item) : complete(), complete, error }); return () => subscription.unsubscribe(); }); export const get = (store: IndexedStore) => { const changes = hasNotifications(store) ? observe>(({ next }) => { const handle = after(store, "notify", (...args: ItemUpdate) => next(args), true); return () => handle.remove(); }) : empty; return (id: string | number, opts: GetOpts = {}) => observe(({ next, complete, error }) => { try { const result = store.get(id); const handle = (x: T | null | undefined) => { if (x !== null && x !== undefined) next(x); complete(); }; if (isPromise(result)) { result.then(handle).then(undefined, error); if (isCancellable(result)) return () => result.cancel(); } else { handle(result); } } catch (e) { error(e); } }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty); };