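// store.ts — 122 lines, 4.1 KiB, TypeScript.
// NOTE: this text was captured from a repository annotate view. The viewer
// reported the MIME type "video/mp2t" (the ".ts" extension is shared with
// MPEG transport streams) and highlighted the file with "TypeScriptLexer".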
r116 | import { PromiseOrValue } from "@implab/core-amd/interfaces"; | |
|
|
r136 | import { isCancellable, isPromise } from "@implab/core-amd/safe"; | |
| import { observe, Observable, empty } from "./observable"; | |||
| import { after } from "dojo/aspect"; | |||
| import { subject } from "./operators/subject"; | |||
|
|
r116 | ||
/**
 * A single change in an ordered result set: the affected item together with
 * the position it had before the change and the position it has after it.
 */
export interface OrderedUpdate<T> {
    /** The item being updated */
    readonly item: T;

    /** The previous index of the item, -1 in case it is inserted */
    readonly prevIndex: number;

    /** The new index of the item, -1 in case it is deleted */
    readonly newIndex: number;
}
/** An observable sequence of {@link OrderedUpdate} records, as produced by `query`. */
export type QueryResults<T> = Observable<OrderedUpdate<T>>;
/**
 * The part of a store's query results that supports change observation:
 * an `observe` method that registers a listener and returns a removable handle.
 */
interface DjObservableResults<T> {
    /**
     * Allows observation of results
     *
     * @param listener Called with the affected object and its previous and
     * new indices.
     * @param includeUpdates Optional flag forwarded to the results object as is.
     * @returns A handle whose `remove()` unregisters the listener.
     */
    observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
        remove(): void;
    };
}
/**
 * Minimal shape of a store that can be queried.
 *
 * `T` is the item type, `Q` the query type and `O` the options type.
 */
interface Queryable<T, Q, O> {
    /**
     * Runs a query against the store.
     *
     * @returns The matching items, either directly or as a promise.
     */
    query(query?: Q, options?: O): PromiseOrValue<T[]>;
}
/**
 * Type guard: checks whether the value exposes an `observe` method and can
 * therefore be treated as {@link DjObservableResults}.
 *
 * Always returns a real boolean. `null` and `undefined` (which can slip in
 * from untyped callers) yield `false` rather than being returned as is.
 *
 * @param v The value to test, typically the result of a store query.
 */
export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
    v !== null && v !== undefined &&
    typeof (v as { observe?: unknown; }).observe === "function";
/**
 * Creates a query function bound to the specified store.
 *
 * The returned function wraps `store.query(query, options)` in an observable
 * of {@link OrderedUpdate} records:
 *
 * 1. every item of the initial results is emitted with its array index as
 *    `newIndex` and `prevIndex` set to -1;
 * 2. if the results expose `observe()` and `options.observe` is not `false`,
 *    subsequent changes reported by the results are forwarded and the
 *    sequence stays open until the consumer unsubscribes (the cleanup removes
 *    the observation handle);
 * 3. otherwise the sequence completes once the initial items are delivered.
 *
 * @param store The store to query.
 * @param includeUpdates Passed to `results.observe()` as its second argument.
 * @returns A function accepting the query and options, the options may carry
 * an additional `observe` flag to disable the observation of the results.
 */
export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
    (query?: Q, options?: O & { observe?: boolean }) => {
        return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {

            const processResults = (items: T[]) =>
                items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));

            try {
                const results = store.query(query, options);

                // Set only when the initial items are delivered asynchronously.
                let pending: PromiseLike<unknown> | undefined;
                if (isPromise(results)) {
                    pending = results.then(processResults);
                } else {
                    processResults(results);
                }

                // The consumer may have unsubscribed while the initial items were
                // being emitted, hence the isClosed() check after the emission.
                if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
                    pending?.then(undefined, error);
                    const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
                    return () => h.remove();
                } else if (pending) {
                    // Complete only after the promise has delivered its items.
                    // Completing right away would end the sequence before any item
                    // is emitted (assuming the sink ignores emissions after
                    // completion).
                    pending.then(complete, error);
                } else {
                    complete();
                }
            } catch (err) {
                error(err);
            }
        });
    };
/** Minimal shape of a store whose items can be looked up by identifier. */
interface IndexedStore<T> {
    /**
     * Looks up an item by its identifier.
     *
     * @returns The item, a promise-like of the item, or `null`/`undefined`.
     */
    get(id: string | number): PromiseLike<T> | T | null | undefined;
}
/** Shape of a store that exposes a `notify` method for reporting item changes. */
interface Notifications<T> {
    /**
     * Reports a change of an item.
     *
     * @param item The item, or `undefined`.
     * @param id The identifier of the item, or `undefined`.
     */
    notify(item: T | undefined, id: string | number | undefined): void;
}
/**
 * Type guard: checks whether the value is a non-null object with a `notify`
 * method, i.e. whether it can be treated as {@link Notifications}.
 *
 * @param x The value to test.
 */
const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
    typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
/** Options accepted by the function returned from `get`. */
interface GetOpts {
    /**
     * When set to `false` the changes of the item are not tracked; any other
     * value (including omission) leaves the tracking enabled.
     */
    observe?: boolean;
}
/**
 * Arguments of a single `notify(item, id)` call, captured as a tuple.
 *
 * Either the item is present (the id is optional), or the item is
 * `null`/`undefined` and the id is required.
 */
type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
    [item: undefined | null, id: string | number];
/**
 * Creates an operator which narrows a sequence of store changes to the
 * changes of a single item.
 *
 * Only the changes whose id is strictly equal to `itemId` are considered.
 * A change carrying an item emits that item; a change whose item is `null`
 * or `undefined` completes the resulting sequence. Completion and errors of
 * the source are propagated, unsubscribing releases the source subscription.
 *
 * @param itemId The identifier of the item to track.
 */
const filterItem = (itemId: string | number) =>
    <T>(source: Observable<Change<T>>) =>
        observe<T>(({ next, complete, error }) => {
            const subscription = source
                .filter(([, id]) => id === itemId)
                .subscribe({
                    next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
                    complete,
                    error
                });
            return () => subscription.unsubscribe();
        });
/**
 * Creates a function which retrieves an item from the store by its id and
 * returns it as an observable sequence.
 *
 * The sequence starts with the result of `store.get(id)` (nothing is emitted
 * when the result is `null` or `undefined`). Unless `opts.observe` is `false`
 * it continues with the changes of that item reported through the store's
 * `notify` method, when the store has one.
 *
 * Failures of `store.get`, whether thrown synchronously or delivered through
 * a rejected promise, are reported through the `error` channel of the
 * sequence, the same way `query` reports its failures.
 *
 * @param store The store to read items from.
 * @returns A function accepting the item id and optional {@link GetOpts}.
 */
export const get = <T>(store: IndexedStore<T>) => {
    // Calls to store.notify are intercepted with an "after" advice which
    // receives the original arguments (the last parameter of `after`).
    const changes = hasNotifications<T>(store) ?
        observe<Change<T>>(({ next }) => {
            const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
            return () => handle.remove();
        }).pipe(subject) : empty;

    return (id: string | number, opts: GetOpts = {}) =>
        observe<T>(({ next, complete, error }) => {
            // Emits the item, if there is one, and finishes the initial part.
            const handle = (x: T | null | undefined) => {
                if (x !== null && x !== undefined)
                    next(x);
                complete();
            };

            try {
                const result = store.get(id);

                if (isPromise(result)) {
                    result.then(handle, error);

                    // Unsubscribing cancels the pending request when supported.
                    if (isCancellable(result))
                        return () => result.cancel();
                } else {
                    handle(result);
                }
            } catch (err) {
                error(err);
            }
        }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
};
