import { PromiseOrValue } from "@implab/core-amd/interfaces"; import { isPromise } from "@implab/core-amd/safe"; import { observe, Observable } from "./observable"; export interface OrderedUpdate { /** The item is being updated */ readonly item: T; /** The previous index of the item, -1 in case it is inserted */ readonly prevIndex: number; /** The new index of the item, -1 in case it is deleted */ readonly newIndex: number; } export type QueryResults = Observable>; interface DjObservableResults { /** * Allows observation of results */ observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): { remove(): void; }; } interface Queryable { query(query?: Q, options?: O): PromiseOrValue; } export const isDjObservableResults = (v: object): v is DjObservableResults => v && (typeof (v as { observe?: unknown; }).observe === "function"); export const query = (store: Queryable, includeUpdates = true) => (query?: Q, options?: O & { observe?: boolean }) => { return observe>(({ next, complete, error, isClosed }) => { const processResults = (items: T[]) => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); try { const results = store.query(query, options); if (isPromise(results)) { results.then(processResults).then(undefined, error); } else { processResults(results); } if (!isClosed() && (options?.observe !== false) && isDjObservableResults(results)) { const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); return () => h.remove(); } else { complete(); } } catch (err) { error(err); } }); };