##// END OF EJS Templates
Added tag v1.9.0 for changeset af4f8424e83d
Added tag v1.9.0 for changeset af4f8424e83d

File last commit:

r144:63215d91ae4b v1.9.0-rc6 default
r147:ae34865a484b default
Show More
store.ts
124 lines | 4.2 KiB | video/mp2t | TypeScriptLexer
import { PromiseOrValue } from "@implab/core-amd/interfaces";
import { isCancellable, isPromise } from "@implab/core-amd/safe";
import { observe, Observable, empty } from "./observable";
import { after } from "dojo/aspect";
export interface OrderedUpdate<T> {
/** The item is being updated */
readonly item: T;
/** The previous index of the item, -1 in case it is inserted */
readonly prevIndex: number;
/** The new index of the item, -1 in case it is deleted */
readonly newIndex: number;
}
export type QueryResults<T> = Observable<OrderedUpdate<T>>;
interface DjObservableResults<T> {
/**
* Allows observation of results
*/
observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
remove(): void;
};
}
interface Queryable<T, Q, O> {
query(query?: Q, options?: O): PromiseOrValue<T[]>;
}
export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
v && (typeof (v as { observe?: unknown; }).observe === "function");
export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
(query?: Q, options?: O & { observe?: boolean }) => {
return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
const processResults = (items: T[]) =>
items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
try {
const results = store.query(query, options);
if (isPromise(results)) {
results.then(processResults).then(undefined, error);
} else {
processResults(results);
}
if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
return () => h.remove();
} else {
complete();
}
} catch (err) {
error(err);
}
});
};
interface IndexedStore<T> {
get(id: string | number): PromiseLike<T> | T | null | undefined;
}
interface Notifications<T> {
notify(item: T | undefined, id: string | number | undefined): void;
}
const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
interface GetOpts {
observe?: boolean;
}
type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
[item: undefined | null, id: string | number];
const filterItem = (itemId: string | number) =>
<T>(source: Observable<Change<T>>) =>
observe<T>(({ next, complete, error }) => {
const subscription = source
.filter(([, id]) => id === itemId)
.subscribe({
next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
complete,
error
});
return () => subscription.unsubscribe();
});
export const get = <T>(store: IndexedStore<T>) => {
const changes = hasNotifications<T>(store) ?
observe<Change<T>>(({ next }) => {
const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
return () => handle.remove();
}) : empty;
return (id: string | number, opts: GetOpts = {}) =>
observe<T>(({ next, complete, error }) => {
try {
const result = store.get(id);
const handle = (x: T | null | undefined) => {
if (x !== null && x !== undefined)
next(x);
complete();
};
if (isPromise(result)) {
result.then(handle).then(undefined, error);
if (isCancellable(result))
return () => result.cancel();
} else {
handle(result);
}
} catch (e) {
error(e);
}
}).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
};