store.ts
58 lines
| 2.1 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r116 | import { PromiseOrValue } from "@implab/core-amd/interfaces"; | ||
import { isPromise } from "@implab/core-amd/safe"; | ||||
import { observe, Observable } from "./observable"; | ||||
/**
 * Describes a single change of an item's position within an ordered
 * sequence of results.
 */
export interface OrderedUpdate<T> {
    /** The item being updated. */
    readonly item: T;

    /** The previous index of the item, or -1 if the item has just been inserted. */
    readonly prevIndex: number;

    /** The new index of the item, or -1 if the item has been deleted. */
    readonly newIndex: number;
}
/**
 * The results of a query exposed as an observable sequence of
 * {@link OrderedUpdate} notifications.
 */
export type QueryResults<T> = Observable<OrderedUpdate<T>>;
/**
 * The part of a store's query results which supports change notifications
 * through an `observe` method.
 */
interface DjObservableResults<T> {
    /**
     * Allows observation of results.
     *
     * @param listener Callback receiving the affected object together with
     * its previous and new indices.
     * @param includeUpdates Optional flag forwarded to the underlying results;
     * presumably it requests notifications for in-place updates as well
     * (confirm against the store implementation).
     * @returns A handle whose `remove()` method is used to stop the observation.
     */
    observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
        remove(): void;
    };
}
cin
|
/**
 * The minimal store contract required to run a query.
 *
 * @typeParam T The type of the items returned by the query.
 * @typeParam Q The type of the query expression.
 * @typeParam O The type of the query options.
 */
interface Queryable<T, Q, O> {
    /**
     * Runs the query and returns the matching items, either directly or
     * wrapped in a promise.
     *
     * @param query The optional query expression.
     * @param options The optional query options.
     */
    query(query?: Q, options?: O): PromiseOrValue<T[]>;
}
cin
|
/**
 * Checks whether the specified value exposes an `observe` method and can
 * therefore be treated as observable query results.
 *
 * Only the presence of a function-valued `observe` member is verified; the
 * signature of that function is not (and cannot be) checked at runtime.
 *
 * @param v The value to check.
 * @returns `true` if `v` has an `observe` function, `false` otherwise
 * (including when a nullish value slips through at runtime).
 */
export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
    // The explicit nullish comparison keeps the result a strict boolean even
    // when an untyped caller passes null or undefined.
    v != null && typeof (v as { observe?: unknown }).observe === "function";
cin
|
/**
 * Creates a function which runs queries against the specified store and
 * exposes the results as an observable sequence of {@link OrderedUpdate}
 * notifications.
 *
 * Every item of the initial result set is reported as an insertion
 * (`prevIndex` is -1). When the results returned by the store are observable
 * and observation is not disabled through `options.observe === false`, later
 * changes reported by the store are forwarded to the subscriber and the
 * sequence stays open until the subscriber unsubscribes. Otherwise the
 * sequence is completed once the initial items have been delivered.
 *
 * @param store The store to query.
 * @param includeUpdates The flag passed to the `observe` method of the
 * results, defaults to `true`.
 * @returns A function accepting the query and its options and returning the
 * observable sequence of updates.
 */
export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
    (query?: Q, options?: O & { observe?: boolean }) => {
        return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
            // Reports the initial result set, every item is a new insertion.
            const emitInitial = (items: T[]) =>
                items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));

            try {
                const results = store.query(query, options);

                // The results to watch for later changes, if there are any
                // and the caller has not opted out of the observation.
                const live = (options?.observe !== false) && isDjObservableResults<T>(results) ?
                    results :
                    undefined;

                if (isPromise(results)) {
                    results
                        .then(items => {
                            emitInitial(items);
                            // Without a live subscription the sequence ends
                            // here. Completing earlier (synchronously) would
                            // close the sequence before the asynchronous
                            // items are delivered - assuming, as usual for
                            // observables, that nothing is delivered after
                            // the completion.
                            if (!live)
                                complete();
                        })
                        .then(undefined, error);
                } else {
                    emitInitial(results);
                }

                // The subscriber may have unsubscribed while the initial
                // items were being delivered.
                if (live && !isClosed()) {
                    const h = live.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
                    return () => h.remove();
                }

                // Synchronous results are fully delivered at this point;
                // promised results are completed by the handler above.
                if (!isPromise(results))
                    complete();
            } catch (err) {
                error(err);
            }
        });
    };