store.ts
61 lines
| 2.1 KiB
| video/mp2t
|
TypeScriptLexer
|
|
r116 | import { PromiseOrValue } from "@implab/core-amd/interfaces"; | ||
| import { isPromise } from "@implab/core-amd/safe"; | ||||
| import { observe, Observable } from "./observable"; | ||||
| export interface OrderedUpdate<T> { | ||||
| /** The item is being updated */ | ||||
| readonly item: T; | ||||
| /** The previous index of the item, -1 in case it is inserted */ | ||||
| readonly prevIndex: number; | ||||
| /** The new index of the item, -1 in case it is deleted */ | ||||
| readonly newIndex: number; | ||||
| } | ||||
| export type QueryResults<T> = Observable<OrderedUpdate<T>>; | ||||
| interface DjObservableResults<T> { | ||||
| /** | ||||
| * Allows observation of results | ||||
| */ | ||||
| observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): { | ||||
| remove(): void; | ||||
| }; | ||||
| } | ||||
|
|
r118 | interface Queryable<T, Q, O> { | ||
| query(query?: Q, options?: O): PromiseOrValue<T[]>; | ||||
|
|
r116 | } | ||
|
|
r118 | export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> => | ||
|
|
r116 | v && (typeof (v as { observe?: unknown; }).observe === "function"); | ||
|
|
r118 | export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => | ||
|
|
r120 | (query?: Q, options?: O & { observe?: boolean }) => { | ||
|
|
r116 | return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => { | ||
|
|
r129 | |||
| const processResults = (items: T[]) => | ||||
| items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); | ||||
|
|
r116 | try { | ||
|
|
r118 | const results = store.query(query, options); | ||
|
|
r116 | if (isPromise(results)) { | ||
|
|
r129 | results.then(processResults).then(undefined, error); | ||
|
|
r116 | } else { | ||
|
|
r129 | processResults(results); | ||
|
|
r116 | } | ||
|
|
r118 | if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) { | ||
|
|
r116 | const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); | ||
| return () => h.remove(); | ||||
| } else { | ||||
| complete(); | ||||
| } | ||||
| } catch (err) { | ||||
| error(err); | ||||
| } | ||||
| }); | ||||
| }; | ||||
