##// END OF EJS Templates
Added tag v1.6.0 for changeset aac297dda27d
Added tag v1.6.0 for changeset aac297dda27d

File last commit:

r116:aac297dda27d v1.6.0 default
r117:df9fa867d799 default
Show More
store.ts
58 lines | 2.0 KiB | video/mp2t | TypeScriptLexer
cin
added reduce() and next() methods to observable...
r116 import { PromiseOrValue } from "@implab/core-amd/interfaces";
import { isPromise } from "@implab/core-amd/safe";
import { observe, Observable } from "./observable";
/**
 * A single notification about an item of an ordered collection: the item
 * itself together with the position it had before and the position it has
 * after the change.
 */
export interface OrderedUpdate<T> {
    /** The item this update is about. */
    readonly item: T;

    /** Index of the item before the update, `-1` when the item is inserted. */
    readonly prevIndex: number;

    /** Index of the item after the update, `-1` when the item is deleted. */
    readonly newIndex: number;
}
/** Results of a query exposed as an observable sequence of {@link OrderedUpdate} notifications. */
export type QueryResults<T> = Observable<OrderedUpdate<T>>;
/**
 * The part of observable query results this module relies on: an object
 * which lets a listener watch changes of the results.
 */
interface DjObservableResults<T> {
    /**
     * Starts observation of the results.
     *
     * @param listener Called with the affected item, its previous index and
     *  its new index.
     * @param includeUpdates Optional flag handed over to the results
     *  implementation; presumably requests notifications about in-place
     *  updates of items as well (to be confirmed against the store docs).
     * @returns A handle whose `remove()` is meant to stop the observation.
     */
    observe(
        listener: (item: T, prevIndex: number, nextIndex: number) => void,
        includeUpdates?: boolean
    ): { remove(): void };
}
/**
 * A store which can be asked for a list of items.
 *
 * `T` is the type of the items, `A` is the tuple of the arguments accepted by
 * the `query` method.
 */
interface Queryable<T, A extends unknown[]> {
    /**
     * Runs the query with the specified arguments.
     *
     * @returns The list of items or a promise of it.
     */
    query(...params: A): PromiseOrValue<T[]>;
}
/**
 * Checks whether the specified value can be observed, i.e. whether it has
 * a callable `observe` member. Only the presence of the method is checked,
 * its signature can't be verified at run time.
 *
 * @param v The value to check.
 * @returns `true` when `v` has an `observe` function, otherwise `false`.
 *  A strict boolean is returned even when an untyped caller passes
 *  `null` or `undefined`.
 */
export const isObservableResults = <T>(v: object): v is DjObservableResults<T> =>
    v != null && typeof (v as { observe?: unknown; }).observe === "function";
/**
 * Creates a function which queries the `store` and exposes the results as an
 * observable sequence of {@link OrderedUpdate} notifications.
 *
 * Each item of the initial results is reported as an insertion
 * (`prevIndex` is `-1`, `newIndex` is the position in the results). When the
 * results can be observed (see `isObservableResults`) a listener is attached
 * to them, later changes are forwarded to the sequence and the returned
 * cleanup removes the listener. Otherwise the sequence is completed once the
 * initial items have been delivered; for promised results this happens after
 * the promise is resolved, so the items and a possible rejection aren't lost.
 *
 * @param store The store to query.
 * @param includeUpdates Passed as is to the `observe` method of the
 *  observable results, defaults to `true`.
 * @returns The function which accepts the query arguments and returns the
 *  observable sequence of updates.
 */
export const query = <T, A extends unknown[]>(store: Queryable<T, A>, includeUpdates = true) =>
    (...args: A) => {
        return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
            // the initial results are reported as insertions at their positions
            const emitInitial = (items: T[]) =>
                items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));

            try {
                const results = store.query(...args);

                // observable results stay open, plain results are completed
                // right after the initial items are delivered
                const live = isObservableResults<T>(results);

                if (isPromise(results)) {
                    results
                        .then(items => {
                            emitInitial(items);
                            // completing earlier would close the sequence
                            // before the items (or a rejection) arrive
                            if (!live)
                                complete();
                        })
                        .then(undefined, error);
                } else {
                    emitInitial(results);
                    if (!live)
                        complete();
                }

                if (isObservableResults<T>(results)) {
                    if (!isClosed()) {
                        const h = results.observe(
                            (item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }),
                            includeUpdates
                        );
                        return () => h.remove();
                    }
                    // the sequence was closed while the initial items were
                    // being delivered, there is nothing to observe
                    complete();
                }
            } catch (err) {
                error(err);
            }
        });
    };