|  |  | import { PromiseOrValue } from "@implab/core-amd/interfaces"; | 
                        
    
    
        |  |  | import { isPromise } from "@implab/core-amd/safe"; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** | 
                        
    
    
        |  |  | * The interface for the consumer of an observable sequence | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | export interface Observer<T> { | 
                        
    
    
        |  |  | /** | 
                        
    
    
        |  |  | * Called for the next element in the sequence | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | next: (value: T) => void; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** | 
                        
    
    
        |  |  | * Called once when the error occurs in the sequence. | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | error: (e: unknown) => void; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** | 
                        
    
    
        |  |  | * Called once at the end of the sequence. | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | complete: () => void; | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** | 
                        
    
    
        |  |  | * The group of functions to feed an observable. These methods are provided to | 
                        
    
    
        |  |  | * the producer to generate a stream of events. | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | export type Sink<T> = { | 
                        
    
    
        |  |  | /** | 
                        
    
    
        |  |  | * Call to send the next element in the sequence | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | next: (value: T) => void; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** | 
                        
    
    
        |  |  | * Call to notify about the error occurred in the sequence. | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | error: (e: unknown) => void; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** | 
                        
    
    
        |  |  | * Call to signal the end of the sequence. | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | complete: () => void; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** | 
                        
    
    
        |  |  | * Checks whether the sink is accepting new elements. It's safe to | 
                        
    
    
        |  |  | * send elements to the closed sink. | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | isClosed: () => boolean; | 
                        
    
    
        |  |  | }; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | export type Producer<T> = (sink: Sink<T>) => (void | (() => void)); | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | export interface Unsubscribable { | 
                        
    
    
        |  |  | unsubscribe(): void; | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | export const isUnsubsribable = (v: unknown): v is Unsubscribable => | 
                        
    
    
        |  |  | v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | export const isSubsribable = <T = unknown>(v: unknown): v is Subscribable<T> => | 
                        
    
    
        |  |  | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | export interface Subscribable<T> { | 
                        
    
    
        |  |  | subscribe(consumer: Partial<Observer<T>>): Unsubscribable; | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** The observable source of items. */ | 
                        
    
    
        |  |  | export interface Observable<T> extends Subscribable<T> { | 
                        
    
    
        |  |  | /** Transforms elements of the sequence with the specified mapper | 
                        
    
    
        |  |  | * | 
                        
    
    
        |  |  | * @param mapper The mapper used to transform the values | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | map<T2>(mapper: (value: T) => T2): Observable<T2>; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** Filters elements of the sequence. The resulting sequence will | 
                        
    
    
        |  |  | * contain only elements which match the specified predicate. | 
                        
    
    
        |  |  | * | 
                        
    
    
        |  |  | * @param predicate The filter predicate. | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | filter(predicate: (value: T) => boolean): Observable<T>; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** Applies accumulator to each value in the sequence and | 
                        
    
    
        |  |  | * emits the accumulated value for each source element | 
                        
    
    
        |  |  | * | 
                        
    
    
        |  |  | * @param accumulator | 
                        
    
    
        |  |  | * @param initial | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | cat(...seq: Subscribable<T>[]): Observable<T>; | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | const noop = () => { }; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | const sink = <T>(consumer: Partial<Observer<T>>) => { | 
                        
    
    
        |  |  | const { next, error, complete } = consumer; | 
                        
    
    
        |  |  | return { | 
                        
    
    
        |  |  | next: next ? next.bind(consumer) : noop, | 
                        
    
    
        |  |  | error: error ? error.bind(consumer) : noop, | 
                        
    
    
        |  |  | complete: complete ? complete.bind(consumer) : noop, | 
                        
    
    
        |  |  | isClosed: () => false | 
                        
    
    
        |  |  | }; | 
                        
    
    
        |  |  | }; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** Wraps the producer to handle tear down logic and subscription management | 
                        
    
    
        |  |  | * | 
                        
    
    
        |  |  | * @param producer The producer to wrap | 
                        
    
    
        |  |  | * @returns The wrapper producer | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => { | 
                        
    
    
        |  |  | let done = false; | 
                        
    
    
        |  |  | let cleanup = noop; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | const _fin = <A extends unknown[]>(fn: (...args: A) => void) => | 
                        
    
    
        |  |  | (...args: A) => done ? | 
                        
    
    
        |  |  | void (0) : | 
                        
    
    
        |  |  | (done = true, cleanup(), fn(...args)); | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | const safeSink = { | 
                        
    
    
        |  |  | next: (value: T) => { !done && next(value); }, | 
                        
    
    
        |  |  | error: _fin(error), | 
                        
    
    
        |  |  | complete: _fin(complete), | 
                        
    
    
        |  |  | isClosed: () => done | 
                        
    
    
        |  |  | }; | 
                        
    
    
        |  |  | cleanup = producer(safeSink) ?? noop; | 
                        
    
    
        |  |  | return done ? | 
                        
    
    
        |  |  | (cleanup(), noop) : | 
                        
    
    
        |  |  | _fin(noop); | 
                        
    
    
        |  |  | }; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | const _observe = <T>(producer: Producer<T>): Observable<T> => ({ | 
                        
    
    
        |  |  | subscribe: (consumer: Partial<Observer<T>>) => ({ | 
                        
    
    
        |  |  | unsubscribe: producer(sink(consumer)) ?? noop | 
                        
    
    
        |  |  | }), | 
                        
    
    
        |  |  | map: (mapper) => _observe(({ next, ...rest }) => | 
                        
    
    
        |  |  | producer({ | 
                        
    
    
        |  |  | next: next !== noop ? (v: T) => next(mapper(v)) : noop, | 
                        
    
    
        |  |  | ...rest | 
                        
    
    
        |  |  | }) | 
                        
    
    
        |  |  | ), | 
                        
    
    
        |  |  | filter: (predicate) => _observe(({ next, ...rest }) => | 
                        
    
    
        |  |  | producer({ | 
                        
    
    
        |  |  | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, | 
                        
    
    
        |  |  | ...rest | 
                        
    
    
        |  |  | }) | 
                        
    
    
        |  |  | ), | 
                        
    
    
        |  |  | scan: (accumulator, initial) => _observe(({ next, ...rest }) => { | 
                        
    
    
        |  |  | let _acc = initial; | 
                        
    
    
        |  |  | return producer({ | 
                        
    
    
        |  |  | next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, | 
                        
    
    
        |  |  | ...rest | 
                        
    
    
        |  |  | }); | 
                        
    
    
        |  |  | }), | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | cat: (...seq) => _observe(({ next, complete: final, ...rest }) => { | 
                        
    
    
        |  |  | let cleanup: () => void; | 
                        
    
    
        |  |  | const complete = () => { | 
                        
    
    
        |  |  | const continuation = seq.shift(); | 
                        
    
    
        |  |  | if (continuation) { | 
                        
    
    
        |  |  | // if we have a next sequence, subscribe to it | 
                        
    
    
        |  |  | const subscription = continuation.subscribe({ next, complete, ...rest }); | 
                        
    
    
        |  |  | cleanup = subscription.unsubscribe.bind(subscription); | 
                        
    
    
        |  |  | } else { | 
                        
    
    
        |  |  | // otherwise notify the consumer about completion | 
                        
    
    
        |  |  | final(); | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  | }; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | cleanup = producer({ next, complete, ...rest }) ?? noop; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | return () => cleanup(); | 
                        
    
    
        |  |  | }) | 
                        
    
    
        |  |  | }); | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | export interface OrderUpdate<T> { | 
                        
    
    
        |  |  | /** The item is being updated */ | 
                        
    
    
        |  |  | item: T; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** The previous index of the item, -1 in case it is inserted */ | 
                        
    
    
        |  |  | prevIndex: number; | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | /** The new index of the item, -1 in case it is deleted */ | 
                        
    
    
        |  |  | newIndex: number; | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | interface ObservableResults<T> { | 
                        
    
    
        |  |  | /** | 
                        
    
    
        |  |  | * Allows observation of results | 
                        
    
    
        |  |  | */ | 
                        
    
    
        |  |  | observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): { | 
                        
    
    
        |  |  | remove(): void; | 
                        
    
    
        |  |  | }; | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | interface Queryable<T, A extends unknown[]> { | 
                        
    
    
        |  |  | query(...args: A): PromiseOrValue<T[]>; | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | export const isObservableResults = <T>(v: object): v is ObservableResults<T> => | 
                        
    
    
        |  |  | v && (typeof (v as { observe?: unknown; }).observe === "function"); | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer)); | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | export const empty = observe<never>(({ complete }) => complete()); | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | export const query = <T, A extends unknown[]>(store: Queryable<T, A>) => | 
                        
    
    
        |  |  | (...args: A) => { | 
                        
    
    
        |  |  | return observe<OrderUpdate<T>>(({ next, complete, error }) => { | 
                        
    
    
        |  |  | try { | 
                        
    
    
        |  |  | const results = store.query(...args); | 
                        
    
    
        |  |  | if (isPromise(results)) { | 
                        
    
    
        |  |  | results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }))) | 
                        
    
    
        |  |  | .then(undefined, error); | 
                        
    
    
        |  |  | } else { | 
                        
    
    
        |  |  | results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | if (isObservableResults<T>(results)) { | 
                        
    
    
        |  |  | const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex })); | 
                        
    
    
        |  |  | return () => h.remove(); | 
                        
    
    
        |  |  | } else { | 
                        
    
    
        |  |  | complete(); | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  | } catch (err) { | 
                        
    
    
        |  |  | error(err); | 
                        
    
    
        |  |  | } | 
                        
    
    
        |  |  | }); | 
                        
    
    
        |  |  |  | 
                        
    
    
        |  |  | }; | 
                        
    
    
        |  |  |  |