store.ts
122 lines
| 4.1 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r116 | import { PromiseOrValue } from "@implab/core-amd/interfaces"; | |
cin
|
r136 | import { isCancellable, isPromise } from "@implab/core-amd/safe"; | |
import { observe, Observable, empty } from "./observable"; | |||
import { after } from "dojo/aspect"; | |||
import { subject } from "./operators/subject"; | |||
cin
|
r116 | ||
export interface OrderedUpdate<T> { | |||
/** The item is being updated */ | |||
readonly item: T; | |||
/** The previous index of the item, -1 in case it is inserted */ | |||
readonly prevIndex: number; | |||
/** The new index of the item, -1 in case it is deleted */ | |||
readonly newIndex: number; | |||
} | |||
export type QueryResults<T> = Observable<OrderedUpdate<T>>; | |||
interface DjObservableResults<T> { | |||
/** | |||
* Allows observation of results | |||
*/ | |||
observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): { | |||
remove(): void; | |||
}; | |||
} | |||
cin
|
r118 | interface Queryable<T, Q, O> { | |
query(query?: Q, options?: O): PromiseOrValue<T[]>; | |||
cin
|
r116 | } | |
cin
|
r118 | export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> => | |
cin
|
r116 | v && (typeof (v as { observe?: unknown; }).observe === "function"); | |
cin
|
r118 | export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => | |
cin
|
r120 | (query?: Q, options?: O & { observe?: boolean }) => { | |
cin
|
r116 | return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => { | |
cin
|
r129 | ||
const processResults = (items: T[]) => | |||
items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); | |||
cin
|
r116 | try { | |
cin
|
r118 | const results = store.query(query, options); | |
cin
|
r116 | if (isPromise(results)) { | |
cin
|
r129 | results.then(processResults).then(undefined, error); | |
cin
|
r116 | } else { | |
cin
|
r129 | processResults(results); | |
cin
|
r116 | } | |
cin
|
r118 | if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) { | |
cin
|
r116 | const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); | |
return () => h.remove(); | |||
} else { | |||
complete(); | |||
} | |||
} catch (err) { | |||
error(err); | |||
} | |||
}); | |||
}; | |||
cin
|
r136 | ||
interface IndexedStore<T> { | |||
get(id: string | number): PromiseLike<T> | T | null | undefined; | |||
} | |||
interface Notifications<T> { | |||
notify(item: T | undefined, id: string | number | undefined): void; | |||
} | |||
const hasNotifications = <T>(x: unknown): x is Notifications<T> => | |||
typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function"); | |||
interface GetOpts { | |||
observe?: boolean; | |||
} | |||
type Change<T> = [item: NonNullable<T>, id: string | number | undefined] | | |||
[item: undefined | null, id: string | number]; | |||
const filterItem = (itemId: string | number) => | |||
<T>(source: Observable<Change<T>>) => | |||
observe<T>(({ next, complete, error }) => { | |||
const subscription = source | |||
.filter(([, id]) => id === itemId) | |||
.subscribe({ | |||
next: ([item]) => item !== null && item !== undefined ? next(item) : complete(), | |||
complete, | |||
error | |||
}); | |||
return () => subscription.unsubscribe(); | |||
}); | |||
export const get = <T>(store: IndexedStore<T>) => { | |||
const changes = hasNotifications<T>(store) ? | |||
observe<Change<T>>(({ next }) => { | |||
const handle = after(store, "notify", (...args: Change<T>) => next(args), true); | |||
return () => handle.remove(); | |||
}).pipe(subject) : empty; | |||
return (id: string | number, opts: GetOpts = {}) => | |||
observe<T>(({ next, complete, error }) => { | |||
const result = store.get(id); | |||
const handle = (x: T | null | undefined) => { | |||
if (x !== null && x !== undefined) | |||
next(x); | |||
complete(); | |||
}; | |||
if (isPromise(result)) { | |||
result.then(handle, error); | |||
if (isCancellable(result)) | |||
return () => result.cancel(); | |||
} else { | |||
handle(result); | |||
} | |||
}).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty); | |||
}; |