observable.ts
233 lines
| 7.1 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r110 | import { PromiseOrValue } from "@implab/core-amd/interfaces"; | ||
import { isPromise } from "@implab/core-amd/safe"; | ||||
cin
|
/**
 * The interface for the consumer of an observable sequence.
 */
export interface Observer<T> {
    /**
     * Called for the next element in the sequence.
     */
    next: (value: T) => void;

    /**
     * Called once when an error occurs in the sequence.
     */
    error: (e: unknown) => void;

    /**
     * Called once at the end of the sequence.
     */
    complete: () => void;
}
cin
|
/**
 * The group of functions to feed an observable. These methods are provided to
 * the producer to generate a stream of events.
 */
export type Sink<T> = {
    /**
     * Call to send the next element in the sequence.
     */
    next: (value: T) => void;

    /**
     * Call to notify about an error that occurred in the sequence.
     */
    error: (e: unknown) => void;

    /**
     * Call to signal the end of the sequence.
     */
    complete: () => void;

    /**
     * Reports whether the sink is closed, i.e. no longer accepts new
     * elements. It's safe to send elements to a closed sink.
     */
    isClosed: () => boolean;
};
cin
|
r96 | |||
/**
 * A function which feeds the specified sink with events. It may return a
 * function which will be called to release the resources held by the
 * producer (on unsubscribe or when the sequence ends).
 */
export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
cin
|
/** The handle of a subscription, used to cancel it. */
export interface Unsubscribable {
    /** Cancels the subscription. */
    unsubscribe(): void;
}
/**
 * Type guard: checks that the value is neither `null` nor `undefined` and has
 * an `unsubscribe` method.
 *
 * NOTE: the identifier is misspelled ("Unsubsribable"); the name is kept as is
 * because it is part of the exported API.
 */
export const isUnsubsribable = (v: unknown): v is Unsubscribable =>
    v != null && typeof (v as Unsubscribable).unsubscribe === "function";
cin
|
/**
 * Type guard: checks that the value is neither `null` nor `undefined` and has
 * a `subscribe` method. The element type `T` is not verified at run time.
 *
 * NOTE: the identifier is misspelled ("Subsribable"); the name is kept as is
 * because it is part of the exported API.
 */
export const isSubsribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
    v != null && typeof (v as Subscribable<unknown>).subscribe === "function";
cin
|
r96 | |||
cin
|
/** The source of events to which a consumer can subscribe. */
export interface Subscribable<T> {
    /**
     * Subscribes the consumer to the events of this source.
     *
     * @param consumer The observer, any of its handlers may be omitted.
     * @returns The handle used to cancel the subscription.
     */
    subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
}
/** The observable source of items. */
export interface Observable<T> extends Subscribable<T> {
    /** Transforms elements of the sequence with the specified mapper
     *
     * @param mapper The mapper used to transform the values
     */
    map<T2>(mapper: (value: T) => T2): Observable<T2>;

    /** Filters elements of the sequence. The resulting sequence will
     * contain only elements which match the specified predicate.
     *
     * @param predicate The filter predicate.
     */
    filter(predicate: (value: T) => boolean): Observable<T>;

    /** Applies accumulator to each value in the sequence and
     * emits the accumulated value for each source element
     *
     * @param accumulator The function combining the accumulated value with
     *  the next element of the sequence.
     * @param initial The initial accumulated value.
     */
    scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>;

    /** Concatenates the specified sequences to this one. Each next sequence
     * is subscribed when the previous one completes.
     *
     * @param seq The sequences to append, in order.
     */
    cat(...seq: Subscribable<T>[]): Observable<T>;

    /** Creates a new observable from the producer returned by `f`.
     *
     * @param f The function which receives this observable as the source and
     *  returns the producer of the resulting sequence.
     */
    pipe<U>(f: (source: Observable<T>) => Producer<U>): Observable<U>;
}
/**
 * Does nothing. Used as the placeholder for missing handlers; the code in this
 * module compares handlers with this function by identity.
 */
const noop = () => { };
/**
 * Adapts a partial observer to a complete sink. The handlers provided by the
 * consumer are bound to it, the missing handlers are replaced with `noop`.
 * The `isClosed` of the resulting sink always returns `false`.
 *
 * @param consumer The observer, any of its handlers may be omitted.
 * @returns The sink which passes events to the consumer.
 */
const sink = <T>(consumer: Partial<Observer<T>>) => {
    const { next, error, complete } = consumer;

    return {
        next: next ? next.bind(consumer) : noop,
        error: error ? error.bind(consumer) : noop,
        complete: complete ? complete.bind(consumer) : noop,
        isClosed: () => false
    };
};
cin
|
/** Wraps the producer to handle tear down logic and subscription management.
 *
 * The wrapper guarantees that:
 * - no elements are delivered after the sequence has ended or the
 *   subscription has been cancelled;
 * - only the first `error` or `complete` is delivered;
 * - the cleanup returned by the producer is called once, when the sequence
 *   ends or when the subscription is cancelled.
 *
 * @param producer The producer to wrap
 * @returns The wrapper producer
 */
const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
    let done = false;
    let cleanup = noop;

    // Creates a one-shot wrapper: the first call of any wrapper closes the
    // sink, runs the cleanup and then calls `fn`; later calls are ignored.
    const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
        (...args: A) => {
            if (done)
                return;
            done = true;
            cleanup();
            fn(...args);
        };

    const safeSink = {
        next: (value: T) => {
            if (!done)
                next(value);
        },
        error: _fin(error),
        complete: _fin(complete),
        isClosed: () => done
    };

    // Accept only a function as the cleanup. Any other result is ignored
    // instead of failing later, when the cleanup is invoked.
    const teardown = producer(safeSink);
    cleanup = typeof teardown === "function" ? teardown : noop;

    if (done) {
        // The producer has ended the sequence synchronously, its cleanup
        // wasn't available at that moment, so run it now.
        cleanup();
        return noop;
    }

    return _fin(noop);
};
cin
|
r96 | |||
cin
|
/**
 * Creates the observable on top of the specified producer. The producer is
 * called for every subscription with the sink made from the consumer.
 *
 * The operators `map`, `filter` and `scan` don't install their handler when
 * the consumer has no `next` handler (the `noop` is passed through instead).
 *
 * @param producer The producer used to feed the subscribers.
 * @returns The observable.
 */
const _observe = <T>(producer: Producer<T>): Observable<T> => ({
    subscribe: (consumer: Partial<Observer<T>>) => {
        const teardown = producer(sink(consumer));
        // only a function can be used to unsubscribe
        return { unsubscribe: typeof teardown === "function" ? teardown : noop };
    },

    map: (mapper) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ? (v: T) => next(mapper(v)) : noop,
            ...rest
        })
    ),

    filter: (predicate) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ?
                (v: T) => {
                    if (predicate(v))
                        next(v);
                } :
                noop,
            ...rest
        })
    ),

    scan: (accumulator, initial) => _observe(({ next, ...rest }) => {
        // the accumulated value is kept per subscription
        let _acc = initial;
        return producer({
            next: next !== noop ?
                (v: T) => {
                    _acc = accumulator(_acc, v);
                    next(_acc);
                } :
                noop,
            ...rest
        });
    }),

    cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
        // The position in `seq` is kept per subscription, `seq` itself is
        // shared between all subscriptions and must not be modified.
        let index = 0;
        let cleanup: () => void = noop;

        const complete = () => {
            const continuation = seq[index++];
            if (continuation) {
                // if we have a next sequence, subscribe to it
                const current = index;
                const subscription = continuation.subscribe({ next, complete, ...rest });
                // The continuation may complete synchronously and start the
                // following sequence, in this case `cleanup` already refers
                // to the newer subscription and must be left intact.
                if (index === current)
                    cleanup = subscription.unsubscribe.bind(subscription);
            } else {
                // otherwise notify the consumer about completion
                final();
            }
        };

        const teardown = producer({ next, complete, ...rest });
        // If the source has completed synchronously, `cleanup` belongs to
        // one of the continuations, don't overwrite it.
        if (index === 0)
            cleanup = typeof teardown === "function" ? teardown : noop;

        return () => cleanup();
    }),

    pipe: <U>(f: (source: Observable<T>) => Producer<U>) => observe(f(_observe(producer)))
});
cin
|
r102 | |||
cin
|
/** Describes the change of the position of an item in the ordered results. */
export interface OrderUpdate<T> {
    /** The item being updated */
    item: T;

    /** The previous index of the item, -1 in case it is inserted */
    prevIndex: number;

    /** The new index of the item, -1 in case it is deleted */
    newIndex: number;
}
/** The results of a query which can be observed for changes. */
interface ObservableResults<T> {
    /**
     * Allows observation of results
     *
     * @param listener The callback which receives the changed object with its
     *  previous and new indices.
     * @param includeUpdates Optional flag passed to the results
     *  implementation. NOTE(review): presumably requests notifications about
     *  in-place updates of the objects — confirm with the store in use.
     * @returns The handle, its `remove` method stops the observation.
     */
    observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
        remove(): void;
    };
}
/** The store which can be queried for items. */
interface Queryable<T, A extends unknown[]> {
    /**
     * Runs the query.
     *
     * @param args The arguments of the query.
     * @returns The array of items or the promise of it.
     */
    query(...args: A): PromiseOrValue<T[]>;
}
/**
 * Type guard: checks that the value has an `observe` method. Always returns
 * a boolean, `null` and `undefined` (possible from untyped callers) give
 * `false`.
 */
export const isObservableResults = <T>(v: object): v is ObservableResults<T> =>
    v != null && typeof (v as { observe?: unknown; }).observe === "function";
/**
 * Creates the observable for the specified producer. The producer is wrapped
 * with `fuse` before it is passed to `_observe`.
 *
 * @param producer The producer used to feed the subscribers.
 */
export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
/** The observable which completes as soon as it is subscribed, without emitting elements. */
export const empty = observe<never>(({ complete }) => complete());
/**
 * Creates the function which runs the query against the specified store and
 * returns the observable of the order updates.
 *
 * Each item of the results is emitted as the inserted one (`prevIndex` is
 * `-1`). If the results are observable the changes reported by the results
 * are emitted as well and the sequence stays open until unsubscribed,
 * otherwise the sequence completes after the last item has been emitted. This
 * also applies to results delivered through a promise: the completion is
 * postponed until the promise is resolved. Errors thrown by the store and
 * rejections of the promise are passed to `error`.
 *
 * @param store The store to query.
 * @returns The function which accepts the arguments of the query and returns
 *  the observable.
 */
export const query = <T, A extends unknown[]>(store: Queryable<T, A>) =>
    (...args: A) => {
        return observe<OrderUpdate<T>>(({ next, complete, error }) => {
            try {
                const results = store.query(...args);
                const live = isObservableResults<T>(results) ? results : undefined;

                // Emits the queried items. When there are no live updates
                // to wait for, the sequence ends after the last item.
                const emit = (items: T[]) => {
                    items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
                    if (!live)
                        complete();
                };

                if (isPromise(results)) {
                    results.then(emit).then(undefined, error);
                } else {
                    emit(results);
                }

                if (live) {
                    const h = live.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }));
                    return () => h.remove();
                }
            } catch (err) {
                error(err);
            }
        });
    };