|
|
import { PromiseOrValue } from "@implab/core-amd/interfaces";
|
|
|
import { isCancellable, isPromise } from "@implab/core-amd/safe";
|
|
|
import { observe, Observable, empty } from "./observable";
|
|
|
import { after } from "dojo/aspect";
|
|
|
|
|
|
export interface OrderedUpdate<T> {
|
|
|
/** The item is being updated */
|
|
|
readonly item: T;
|
|
|
|
|
|
/** The previous index of the item, -1 in case it is inserted */
|
|
|
readonly prevIndex: number;
|
|
|
|
|
|
/** The new index of the item, -1 in case it is deleted */
|
|
|
readonly newIndex: number;
|
|
|
|
|
|
}
|
|
|
|
|
|
export type QueryResults<T> = Observable<OrderedUpdate<T>>;
|
|
|
|
|
|
interface DjObservableResults<T> {
|
|
|
/**
|
|
|
* Allows observation of results
|
|
|
*/
|
|
|
observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
|
|
|
remove(): void;
|
|
|
};
|
|
|
}
|
|
|
|
|
|
interface Queryable<T, Q, O> {
|
|
|
query(query?: Q, options?: O): PromiseOrValue<T[]>;
|
|
|
}
|
|
|
|
|
|
export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
|
|
|
v && (typeof (v as { observe?: unknown; }).observe === "function");
|
|
|
|
|
|
export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => {
|
|
|
const q = queryEx(store, includeUpdates);
|
|
|
return (query?: Q, options?: O & { observe?: boolean }) => {
|
|
|
const [data, updates] = q(query, options);
|
|
|
|
|
|
return options?.observe === false ? data : data.cat(updates);
|
|
|
};
|
|
|
};
|
|
|
|
|
|
/**
|
|
|
* Wraps the query method of the store, the resulting method takes a query
|
|
|
* expression and returns two observable sequences. The first sequence represents
|
|
|
* the results of the query, the second sequence provides the updates to the
|
|
|
* query results.
|
|
|
*
|
|
|
* @param store The store used to query data
|
|
|
* @param includeUpdates The flag to include item updates not only additions and
|
|
|
* deletions. By default this flag is set to true.
|
|
|
* @returns Two observable sequences
|
|
|
*/
|
|
|
export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
|
|
|
(query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => {
|
|
|
|
|
|
/** count active observers */
|
|
|
let listeners = 0;
|
|
|
let results: PromiseOrValue<T[]> = [];
|
|
|
|
|
|
const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => {
|
|
|
const processResults = (items: T[]) =>
|
|
|
items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
|
|
|
|
|
|
try {
|
|
|
// is there are no active observers here, we need to query actual
|
|
|
// data from the store.
|
|
|
if (listeners === 0)
|
|
|
results = store.query(query, options);
|
|
|
|
|
|
if (isPromise(results)) {
|
|
|
results.then(processResults).then(complete, error);
|
|
|
|
|
|
if (isCancellable(results))
|
|
|
return results.cancel.bind(results);
|
|
|
} else {
|
|
|
processResults(results);
|
|
|
complete();
|
|
|
}
|
|
|
} catch (e) {
|
|
|
error(e);
|
|
|
}
|
|
|
});
|
|
|
|
|
|
const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
|
|
|
try {
|
|
|
if (!isClosed() && isDjObservableResults<T>(results)) {
|
|
|
// subscribe fot the changes
|
|
|
listeners++;
|
|
|
const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
|
|
|
return () => {
|
|
|
// unsubscribe from changes
|
|
|
listeners--;
|
|
|
h.remove();
|
|
|
};
|
|
|
} else {
|
|
|
complete();
|
|
|
}
|
|
|
} catch (e) {
|
|
|
error(e);
|
|
|
}
|
|
|
});
|
|
|
|
|
|
return [data, updates];
|
|
|
};
|
|
|
|
|
|
|
|
|
interface IndexedStore<T> {
|
|
|
get(id: string | number): PromiseLike<T> | T | null | undefined;
|
|
|
}
|
|
|
|
|
|
interface Notifications<T> {
|
|
|
notify(item: T | undefined, id: string | number | undefined): void;
|
|
|
}
|
|
|
|
|
|
const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
|
|
|
typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
|
|
|
|
|
|
interface GetOpts {
|
|
|
observe?: boolean;
|
|
|
}
|
|
|
|
|
|
export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] |
|
|
|
[item: undefined | null, id: string | number];
|
|
|
|
|
|
const filterItem = (itemId: string | number) =>
|
|
|
<T>(source: Observable<ItemUpdate<T>>) =>
|
|
|
observe<T>(({ next, complete, error }) => {
|
|
|
const subscription = source
|
|
|
.filter(([, id]) => id === itemId)
|
|
|
.subscribe({
|
|
|
next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
|
|
|
complete,
|
|
|
error
|
|
|
});
|
|
|
return () => subscription.unsubscribe();
|
|
|
});
|
|
|
|
|
|
export const get = <T>(store: IndexedStore<T>) => {
|
|
|
const changes = hasNotifications<T>(store) ?
|
|
|
observe<ItemUpdate<T>>(({ next }) => {
|
|
|
const handle = after(store, "notify", (...args: ItemUpdate<T>) => next(args), true);
|
|
|
return () => handle.remove();
|
|
|
}) : empty;
|
|
|
|
|
|
return (id: string | number, opts: GetOpts = {}) =>
|
|
|
observe<T>(({ next, complete, error }) => {
|
|
|
try {
|
|
|
const result = store.get(id);
|
|
|
|
|
|
const handle = (x: T | null | undefined) => {
|
|
|
if (x !== null && x !== undefined)
|
|
|
next(x);
|
|
|
complete();
|
|
|
};
|
|
|
|
|
|
if (isPromise(result)) {
|
|
|
result.then(handle).then(undefined, error);
|
|
|
|
|
|
if (isCancellable(result))
|
|
|
return () => result.cancel();
|
|
|
} else {
|
|
|
handle(result);
|
|
|
}
|
|
|
} catch (e) {
|
|
|
error(e);
|
|
|
}
|
|
|
}).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
|
|
|
};
|