##// END OF EJS Templates
Implemented subscription SubscriptionImpl, fixed subscription resource management
Implemented subscription SubscriptionImpl, fixed subscription resource management

File last commit:

r150:d9c99ae7dec8 v1.10.1 default
r158:078eca3dc271 v1.10.3 default
Show More
store.ts
170 lines | 5.9 KiB | video/mp2t | TypeScriptLexer
import { PromiseOrValue } from "@implab/core-amd/interfaces";
import { isCancellable, isPromise } from "@implab/core-amd/safe";
import { observe, Observable, empty } from "./observable";
import { after } from "dojo/aspect";
/**
 * Describes a single change of an item position within an ordered sequence of
 * results: an insertion (`prevIndex` is -1), a deletion (`newIndex` is -1) or
 * a move/update of the item (both indices are set).
 */
export interface OrderedUpdate<T> {
/** The item is being updated */
readonly item: T;
/** The previous index of the item, -1 in case it is inserted */
readonly prevIndex: number;
/** The new index of the item, -1 in case it is deleted */
readonly newIndex: number;
}
/** An observable sequence of ordered updates which represents query results. */
export type QueryResults<T> = Observable<OrderedUpdate<T>>;
/**
 * The minimal shape of query results which can notify about their changes.
 * NOTE(review): presumably matches the results produced by dojo observable
 * stores - confirm against the stores actually used.
 */
interface DjObservableResults<T> {
/**
* Allows observation of results
*/
observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
remove(): void;
};
}
/**
 * A store which can be queried for a list of items. The results are returned
 * either synchronously or as a promise.
 */
interface Queryable<T, Q, O> {
/** Runs the query, both the query expression and the options are optional */
query(query?: Q, options?: O): PromiseOrValue<T[]>;
}
/**
 * Checks whether the specified value provides the `observe` method and
 * therefore can be used to subscribe to changes of query results.
 *
 * The guard always returns a boolean: `null` and `undefined` are rejected
 * with `false` instead of being passed through as the result.
 *
 * @param v The value to test.
 * @returns `true` if `v` has a method named `observe`, otherwise `false`.
 */
export const isDjObservableResults = <T>(v: unknown): v is DjObservableResults<T> =>
    v !== null && v !== undefined &&
    typeof (v as { observe?: unknown; }).observe === "function";
/**
 * Wraps the query method of the store, the resulting method returns a single
 * observable sequence: the results of the query followed by the updates to
 * these results.
 *
 * @param store The store used to query data
 * @param includeUpdates The flag passed to `queryEx`, by default it is `true`.
 * @returns The function which takes a query expression and options. When
 * `options.observe` is `false` only the query results are returned, without
 * the subsequent updates.
 */
export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => {
    const select = queryEx(store, includeUpdates);

    return (query?: Q, options?: O & { observe?: boolean }) => {
        const [initial, changes] = select(query, options);

        // the caller explicitly opted out of the live updates
        if (options?.observe === false)
            return initial;

        return initial.cat(changes);
    };
};
/**
 * Wraps the query method of the store, the resulting method takes a query
 * expression and returns two observable sequences. The first sequence represents
 * the results of the query, the second sequence provides the updates to the
 * query results.
 *
 * The store is queried every time the first sequence is subscribed while there
 * are no active subscriptions to the updates; otherwise the results of the
 * last query are replayed. The second sequence completes immediately when the
 * last results don't support observation.
 *
 * @param store The store used to query data
 * @param includeUpdates The flag to include item updates not only additions and
 * deletions. By default this flag is set to true.
 * @returns Two observable sequences
 */
export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
    (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => {

        /** count active observers */
        let listeners = 0;
        /** the results of the last query, shared between both sequences */
        let results: PromiseOrValue<T[]> = [];

        const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => {
            // every item of the results is reported as an insertion
            const processResults = (items: T[]) =>
                items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));

            try {
                // if there are no active observers here, we need to query actual
                // data from the store.
                if (listeners === 0)
                    results = store.query(query, options);

                if (isPromise(results)) {
                    results.then(processResults).then(complete, error);

                    // unsubscribing cancels the pending request when possible
                    if (isCancellable(results))
                        return results.cancel.bind(results);
                } else {
                    processResults(results);
                    complete();
                }
            } catch (e) {
                error(e);
            }
        });

        const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
            try {
                if (!isClosed() && isDjObservableResults<T>(results)) {
                    // subscribe for the changes
                    listeners++;
                    try {
                        const h = results.observe(
                            (item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }),
                            includeUpdates
                        );

                        // the cleanup must affect the counter only once,
                        // otherwise it may become negative and the store will
                        // never be queried again
                        let active = true;
                        return () => {
                            if (!active)
                                return;
                            active = false;
                            // unsubscribe from changes
                            listeners--;
                            h.remove();
                        };
                    } catch (e) {
                        // the subscription has failed, this observer isn't
                        // active and must not be counted
                        listeners--;
                        throw e;
                    }
                } else {
                    complete();
                }
            } catch (e) {
                error(e);
            }
        });

        return [data, updates];
    };
/**
 * A store which provides access to items by their identifiers.
 */
interface IndexedStore<T> {
/**
* Retrieves the item with the specified identifier, the result is either the
* item (or a promise of it) or `null`/`undefined`.
*/
get(id: string | number): PromiseLike<T> | T | null | undefined;
}
/**
 * A store which exposes the `notify` method. This module intercepts calls to
 * this method to track changes of the store items.
 */
interface Notifications<T> {
/**
* Reports a change of the item. Within this module an `undefined` (or `null`)
* item ends the observation of the specified identifier.
*/
notify(item: T | undefined, id: string | number | undefined): void;
}
/**
 * Checks whether the specified value is an object which has the `notify`
 * method.
 *
 * @param x The value to test
 * @returns `true` if `x` is a non-null object with the `notify` method
 */
const hasNotifications = <T>(x: unknown): x is Notifications<T> => {
    // only non-null objects are accepted
    if (x === null || typeof x !== "object")
        return false;

    const { notify } = x as { notify?: unknown; };
    return typeof notify === "function";
};
/** Options of the function returned by `get` */
interface GetOpts {
/**
* When set to `false` the resulting sequence will not include the changes of
* the item which follow the initial value. By default the changes are included.
*/
observe?: boolean;
}
/**
 * The arguments of a store notification represented as a tuple: either
 * an existing item with an optional identifier or an empty (`null` or
 * `undefined`) item with the mandatory identifier.
 */
export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] |
[item: undefined | null, id: string | number];
/**
 * Creates an operator which selects from the sequence of store notifications
 * the ones related to the specified item and emits the items themselves.
 * A notification with an empty (`null` or `undefined`) item completes the
 * resulting sequence.
 *
 * @param itemId The identifier of the item to track, it is compared strictly
 * with the identifiers of the notifications.
 * @returns The operator to be applied to the sequence of notifications
 */
const filterItem = (itemId: string | number) =>
    <T>(source: Observable<ItemUpdate<T>>) =>
        observe<T>(({ next: emit, complete: finish, error: fail }) => {
            // keep only the notifications about the requested item
            const matching = source.filter(update => update[1] === itemId);

            const subscription = matching.subscribe({
                // an empty item ends the sequence
                next: ([item]) => item === null || item === undefined ? finish() : emit(item),
                complete: finish,
                error: fail
            });

            return () => subscription.unsubscribe();
        });
/**
 * Wraps the `get` method of the store, the resulting function returns an
 * observable sequence for the requested item.
 *
 * The sequence starts with the item returned by the store (an empty result
 * produces no value). If the store has the `notify` method and the `observe`
 * option isn't `false`, the sequence continues with the items reported through
 * `notify` for the same identifier.
 *
 * @param store The store used to retrieve items
 * @returns The function which takes an item identifier and options and returns
 * the observable sequence
 */
export const get = <T>(store: IndexedStore<T>) => {
    // the notifications are captured by attaching an advice after the
    // `notify` method of the store, the advice is removed on unsubscribe
    const changes = hasNotifications<T>(store) ?
        observe<ItemUpdate<T>>(({ next }) => {
            const advice = after(store, "notify", (...update: ItemUpdate<T>) => next(update), true);
            return () => advice.remove();
        }) :
        empty;

    return (id: string | number, opts: GetOpts = {}) => {
        const current = observe<T>(({ next: emit, complete: finish, error: fail }) => {
            // emits the item if there is one and completes the sequence
            const deliver = (value: T | null | undefined) => {
                if (value !== null && value !== undefined)
                    emit(value);
                finish();
            };

            try {
                const result = store.get(id);

                if (!isPromise(result)) {
                    deliver(result);
                    return;
                }

                result.then(deliver).then(undefined, fail);

                // unsubscribing cancels the pending request when possible
                if (isCancellable(result))
                    return () => result.cancel();
            } catch (e) {
                fail(e);
            }
        });

        const tail = opts.observe !== false ? changes.pipe(filterItem(id)) : empty;
        return current.cat(tail);
    };
};