store.ts
170 lines
| 5.9 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r116 | import { PromiseOrValue } from "@implab/core-amd/interfaces"; | |
cin
|
r136 | import { isCancellable, isPromise } from "@implab/core-amd/safe"; | |
import { observe, Observable, empty } from "./observable"; | |||
import { after } from "dojo/aspect"; | |||
cin
|
r116 | ||
/**
 * Describes a single change of an item's position within an ordered set of
 * query results.
 */
export interface OrderedUpdate<T> {
    /** The item this update refers to. */
    readonly item: T;

    /** Index the item had before the change; -1 when the item is inserted. */
    readonly prevIndex: number;

    /** Index the item has after the change; -1 when the item is deleted. */
    readonly newIndex: number;
}
/** An observable sequence of ordered updates describing query results. */
export type QueryResults<T> = Observable<OrderedUpdate<T>>;
/**
 * Query results which can report their changes to a listener.
 * NOTE(review): presumably this mirrors results produced by an observable
 * dojo store — confirm against the stores actually used.
 */
interface DjObservableResults<T> {
    /**
     * Registers a listener for changes of the results.
     *
     * @param listener Receives the affected item together with its previous
     * and new indices.
     * @param includeUpdates Optional flag handed to the results object as is.
     * @returns A handle whose `remove()` method detaches the listener.
     */
    observe(
        listener: (object: T, previousIndex: number, newIndex: number) => void,
        includeUpdates?: boolean
    ): {
        remove(): void;
    };
}
cin
|
/**
 * The minimal store contract required to run queries: a `query` method which
 * returns the matching items either directly or as a promise.
 */
interface Queryable<T, Q, O> {
    /**
     * @param query The query expression, may be omitted.
     * @param options The query options, may be omitted.
     * @returns The array of items or a promise of it.
     */
    query(query?: Q, options?: O): PromiseOrValue<T[]>;
}
cin
|
/**
 * Checks whether the specified value exposes an `observe` method and therefore
 * can be treated as observable query results.
 *
 * The result is always a strict boolean: `null` and `undefined` (which may
 * still arrive from untyped callers) yield `false` rather than being returned
 * as is.
 *
 * @param v The value to check
 * @returns `true` if `v` has a function-valued `observe` member
 */
export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
    v !== null && v !== undefined &&
    typeof (v as { observe?: unknown; }).observe === "function";
cin
|
/**
 * Wraps the query method of the store. The resulting function runs the query
 * and returns a single observable sequence: the query results, followed by
 * the updates to these results unless `options.observe` is set to `false`.
 *
 * @param store The store used to query data
 * @param includeUpdates Passed to `queryEx` unchanged, `true` by default
 * @returns The function which accepts a query expression and options
 */
export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => {
    const run = queryEx(store, includeUpdates);

    return (query?: Q, options?: O & { observe?: boolean }) => {
        const [initial, changes] = run(query, options);

        // the caller explicitly opted out of the live updates
        if (options?.observe === false)
            return initial;

        return initial.cat(changes);
    };
};
cin
|
r129 | ||
cin
|
/**
 * Wraps the query method of the store, the resulting method takes a query
 * expression and returns two observable sequences. The first sequence represents
 * the results of the query, the second sequence provides the updates to the
 * query results.
 *
 * Every subscription to the first sequence emits one update per item
 * (`prevIndex` is -1, `newIndex` is the position of the item) and then
 * completes. The store is queried again only when there are no active
 * subscriptions to the second sequence, otherwise the last results are reused.
 *
 * The second sequence completes immediately when the last results do not
 * support observation.
 *
 * @param store The store used to query data
 * @param includeUpdates The flag to include item updates not only additions and
 * deletions. By default this flag is set to true.
 * @returns Two observable sequences
 */
export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
    (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => {

        /** count active observers */
        let listeners = 0;
        let results: PromiseOrValue<T[]> = [];

        const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => {
            const processResults = (items: T[]) =>
                items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));

            try {
                // if there are no active observers here, we need to query actual
                // data from the store.
                if (listeners === 0)
                    results = store.query(query, options);

                if (isPromise(results)) {
                    results.then(processResults).then(complete, error);

                    // hand the cancellation of the pending query over as the cleanup
                    if (isCancellable(results))
                        return results.cancel.bind(results);
                } else {
                    processResults(results);
                    complete();
                }
            } catch (e) {
                error(e);
            }
        });

        const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
            try {
                if (!isClosed() && isDjObservableResults<T>(results)) {
                    // subscribe for the changes
                    const h = results.observe(
                        (item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }),
                        includeUpdates
                    );

                    // count the observer only after it has been registered,
                    // otherwise a failure in observe() would leave the counter
                    // incremented forever and the store would never be
                    // queried again.
                    listeners++;

                    let released = false;
                    return () => {
                        // a repeated cleanup must not decrement the counter twice
                        if (released)
                            return;
                        released = true;

                        // unsubscribe from changes
                        listeners--;
                        h.remove();
                    };
                } else {
                    complete();
                }
            } catch (e) {
                error(e);
            }
        });

        return [data, updates];
    };
cin
|
r136 | ||
cin
|
r144 | ||
cin
|
/** A store which provides access to its items by their identifiers. */
interface IndexedStore<T> {
    /**
     * Looks up the item with the specified identifier.
     *
     * @param id The identifier of the item
     * @returns The item, a promise-like of the item, or `null`/`undefined`
     */
    get(id: string | number): PromiseLike<T> | T | null | undefined;
}
/** A store which announces changes of its items through a `notify` method. */
interface Notifications<T> {
    /**
     * @param item The item the notification is about, may be `undefined`
     * @param id The identifier of the item, may be `undefined`
     */
    notify(item: T | undefined, id: string | number | undefined): void;
}
/**
 * Checks whether the specified value is a non-null object with a
 * function-valued `notify` member.
 *
 * @param x The value to check
 * @returns `true` when `x` can be used as `Notifications<T>`
 */
const hasNotifications = <T>(x: unknown): x is Notifications<T> => {
    if (typeof x !== "object" || x === null)
        return false;

    return typeof (x as Notifications<T>).notify === "function";
};
/** Options accepted by the function returned from `get`. */
interface GetOpts {
    /**
     * Set to `false` to receive only the current item without subsequent
     * updates; any other value keeps the updates enabled.
     */
    observe?: boolean;
}
cin
|
/**
 * A notification about an item: either a non-null item with an optional
 * identifier, or an empty (`null`/`undefined`) item together with the
 * mandatory identifier of the item it refers to.
 */
export type ItemUpdate<T> =
    | [item: NonNullable<T>, id: string | number | undefined]
    | [item: undefined | null, id: string | number];
/**
 * Creates an operator which turns a sequence of item notifications into the
 * sequence of states of a single item.
 *
 * Only notifications whose identifier strictly equals `itemId` are taken into
 * account. A notification with a non-empty item emits that item, a
 * notification with a `null` or `undefined` item completes the sequence.
 *
 * @param itemId The identifier of the item to follow
 * @returns The function transforming the source sequence
 */
const filterItem = (itemId: string | number) =>
    <T>(source: Observable<ItemUpdate<T>>) =>
        observe<T>(({ next, complete, error }) => {
            const matching = source.filter(([, id]) => id === itemId);

            const subscription = matching.subscribe({
                next: ([item]) => {
                    if (item === null || item === undefined)
                        complete();
                    else
                        next(item);
                },
                complete,
                error
            });

            return () => subscription.unsubscribe();
        });
/**
 * Wraps the `get` method of the store. The resulting function takes the
 * identifier of an item and returns an observable sequence which emits the
 * item when the store returns a non-empty value and is then continued, unless
 * `opts.observe` is `false`, with the changes of this item.
 *
 * The changes are collected by attaching an "after" advice to the `notify`
 * method of the store; a store without such a method yields no changes.
 *
 * @param store The store used to fetch the items
 * @returns The function `(id, opts?)` producing the sequence for one item
 */
export const get = <T>(store: IndexedStore<T>) => {
    const changes = hasNotifications<T>(store) ?
        observe<ItemUpdate<T>>(({ next }) => {
            // the advice receives the original arguments of notify()
            const advice = after(store, "notify", (...update: ItemUpdate<T>) => next(update), true);
            return () => advice.remove();
        }) :
        empty;

    return (id: string | number, opts: GetOpts = {}) => {
        const current = observe<T>(({ next, complete, error }) => {
            // emits the value if there is one and finishes the sequence
            const emit = (value: T | null | undefined) => {
                if (value !== null && value !== undefined)
                    next(value);
                complete();
            };

            try {
                const found = store.get(id);

                if (!isPromise(found)) {
                    emit(found);
                } else {
                    found.then(emit).then(undefined, error);

                    if (isCancellable(found))
                        return () => found.cancel();
                }
            } catch (e) {
                error(e);
            }
        });

        const tail = opts.observe !== false ? changes.pipe(filterItem(id)) : empty;

        return current.cat(tail);
    };
};