observable.ts
522 lines
| 15.9 KiB
| video/mp2t
|
TypeScriptLexer
|
|
r116 | import { Cancellation } from "@implab/core-amd/Cancellation"; | |
| import { ICancellation } from "@implab/core-amd/interfaces"; | |||
|
|
r129 | import { isPromise } from "@implab/core-amd/safe"; | |
|
|
r110 | ||
|
|
/**
 * The consumer side of an observable sequence: a set of callbacks which
 * receive the events of the sequence.
 */
export interface Observer<T> {
    /** Receives every element of the sequence. */
    next: (value: T) => void;

    /** Receives the error which terminated the sequence, called once. */
    error: (e: unknown) => void;

    /** Signals the normal end of the sequence, called once. */
    complete: () => void;
}
|
|
/**
 * The producer side of an observable sequence. The functions of the sink are
 * handed to the producer, which calls them to generate a stream of events.
 */
export type Sink<T> = {
    /** Sends the next element of the sequence. */
    next: (value: T) => void;

    /** Reports the error which occurred in the sequence. */
    error: (e: unknown) => void;

    /** Signals the end of the sequence. */
    complete: () => void;

    /**
     * Tells whether the sink has stopped accepting new elements. Sending
     * elements to the closed sink is safe.
     */
    isClosed: () => boolean;
};
|
|
r96 | ||
/**
 * The function which generates the events of a sequence. It receives the sink
 * to feed and may return a function to be used as the cleanup (tear down)
 * routine for the resources it has allocated.
 */
export type Producer<T> =
    (sink: Sink<T>) => void | (() => void);
|
|
/** The handle of a subscription which allows cancelling it. */
export interface Unsubscribable {
    /** Cancels the subscription. */
    unsubscribe(): void;
}
|
|
/**
 * Type guard: checks that the value has a callable `unsubscribe` member.
 *
 * @param v The value to test
 * @returns `true` when `v` is neither `null` nor `undefined` and its
 * `unsubscribe` member is a function.
 */
export const isUnsubscribable = (v: unknown): v is Unsubscribable => {
    if (v === null || v === undefined)
        return false;

    const candidate = v as Unsubscribable;
    return typeof candidate.unsubscribe === "function";
};
|
|
/**
 * Type guard: checks that the value has a callable `subscribe` member.
 *
 * Only the presence of the method is verified, the element type `T` is
 * taken on trust from the caller.
 *
 * @param v The value to test
 * @returns `true` when `v` is neither `null` nor `undefined` and its
 * `subscribe` member is a function.
 */
export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> => {
    if (v === null || v === undefined)
        return false;

    const candidate = v as Subscribable<unknown>;
    return typeof candidate.subscribe === "function";
};
|
|
r96 | ||
|
|
/** A source of events which consumers can attach to. */
export interface Subscribable<T> {
    /**
     * Attaches the consumer to the source.
     *
     * @param consumer The set of handlers, every handler is optional
     * @returns The handle used to cancel the subscription
     */
    subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
}
|
|
/**
 * The function which folds the next value of the sequence into the
 * accumulated result and returns the new accumulated result.
 */
export type AccumulatorFn<T, A> =
    (acc: A, value: T) => A;
|
|
/**
 * The operator for `Observable.pipe`: takes the source observable and returns
 * the producer of the resulting sequence.
 */
export type OperatorFn<T, U> =
    (source: Observable<T>) => Producer<U>;
|
|
/** The observable source of items. */
export interface Observable<T> extends Subscribable<T> {
    /** Produces a sequence whose elements are the results of the mapper
     * applied to the elements of this sequence.
     *
     * @param mapper The function used to transform the values
     */
    map<T2>(mapper: (value: T) => T2): Observable<T2>;

    /** Produces a sequence which contains only the elements matching the
     * specified predicate.
     *
     * @param predicate The filter predicate.
     */
    filter(predicate: (value: T) => boolean): Observable<T>;

    /** Completes the sequence once the condition is met.
     *
     * @param predicate The condition which should be met to complete the
     * sequence
     */
    until(predicate: (value: T) => boolean): Observable<T>;

    /** Keeps the sequence running while elements satisfy the condition.
     *
     * @param predicate The condition which should be met to continue.
     */
    while(predicate: (value: T) => boolean): Observable<T>;

    /** Applies the accumulator to each value of the sequence and emits the
     * accumulated value for each source element.
     *
     * @param accumulator The function folding a value into the result
     * @param initial The initial accumulated value. When omitted, the first
     * element of the sequence is used.
     */
    scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    scan(accumulator: AccumulatorFn<T, T>): Observable<T>;

    /** Applies the accumulator to each value of the sequence and emits the
     * accumulated value once, at the end of the sequence.
     *
     * @param accumulator The function folding a value into the result
     * @param initial The initial accumulated value. When omitted, the first
     * element of the sequence is used.
     */
    reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;

    /** Concatenates the specified sequences with this observable.
     *
     * @param seq sequences to concatenate with the current observable
     *
     * The concatenation doesn't accumulate values from the specified
     * sequences. The result is a new observable which switches to the next
     * observable after the previous one completes. Values emitted by an
     * observable before it becomes active are lost.
     */
    cat(...seq: Subscribable<T>[]): Observable<T>;

    /** Pipes the specified operator to produce the new observable.
     *
     * @param op The operator which consumes this observable and produces a
     * new one
     *
     * The operator is a higher order function which takes a source observable
     * and returns a producer for the new observable.
     *
     * This function can be used to create a complex mapping between source
     * and resulting observables. The operator may have a state (or a side
     * effect) and can be connected to multiple observables.
     */
    pipe<U>(op: OperatorFn<T, U>): Observable<U>;

    /** Waits for the next event to occur and returns a promise for the next
     * value.
     *
     * @param ct Cancellation token
     */
    next(ct?: ICancellation): Promise<T>;

    /** Collects the items of the sequence into an array.
     *
     * @param ct Cancellation token
     */
    collect(ct?: ICancellation): Promise<T[]>;
}
/**
 * Shared do-nothing callback. It is also used as a marker: operators compare
 * a handler with `noop` by identity to detect that nobody listens.
 */
const noop = (): void => {
    // intentionally empty
};
/** Turns a (possibly partial) consumer into a complete sink.
 *
 * Handlers present in the consumer are bound to it, missing handlers are
 * replaced with the shared `noop` (the very same function object, so the
 * absence of a handler can be detected by identity). The resulting sink never
 * reports itself as closed.
 *
 * @param consumer The consumer to adapt
 */
const sink = <T>(consumer: Partial<Observer<T>>) => {
    const onNext = consumer.next;
    const onError = consumer.error;
    const onComplete = consumer.complete;
    const neverClosed = () => false;

    return {
        next: onNext ? onNext.bind(consumer) : noop,
        error: onError ? onError.bind(consumer) : noop,
        complete: onComplete ? onComplete.bind(consumer) : noop,
        isClosed: neverClosed
    };
};
|
|
/** Wraps the producer to handle tear down logic and subscription management.
 *
 * The wrapper guarantees that:
 *  - `next` is ignored once the sequence is finished;
 *  - only the first `error`/`complete` is delivered, the cleanup returned by
 *    the producer runs right before it;
 *  - the returned unsubscribe function runs the cleanup at most once.
 *
 * @param producer The producer to wrap
 * @returns The wrapper producer
 */
const fuse = <T>(producer: Producer<T>) => (target: Sink<T>) => {
    const { next, error, complete } = target;

    let closed = false;
    // replaced with the producer's own cleanup as soon as the producer returns
    let teardown = noop;

    // makes a terminal handler: close, clean up, then notify the consumer
    const terminal = <A extends unknown[]>(notify: (...args: A) => void) =>
        (...args: A) => {
            if (closed)
                return;
            closed = true;
            teardown();
            return notify(...args);
        };

    // unsubscribe: close and clean up without notifying the consumer
    const unsubscribe = () => {
        if (closed)
            return;
        closed = true;
        return teardown();
    };

    const guarded = {
        next: (value: T) => {
            if (!closed)
                next(value);
        },
        error: terminal(error),
        complete: terminal(complete),
        isClosed: () => closed
    };

    teardown = producer(guarded) ?? noop;

    // The producer may finish synchronously, before its cleanup is known. In
    // that case the cleanup hasn't been called yet and has to run right now.
    if (closed)
        return teardown();

    return unsubscribe;
};
|
|
r96 | ||
|
|
/** Creates the observable on top of the specified producer.
 *
 * The producer is used as is: it is invoked once per subscription and no
 * additional guards are installed (see `observe` for the guarded variant).
 * Every operator builds a new producer which feeds a transformed sink to this
 * producer and wraps it with `_observe` again.
 *
 * @param producer The producer which generates events of the sequence
 * @returns The observable
 */
const _observe = <T>(producer: Producer<T>): Observable<T> => ({
    subscribe: (consumer: Partial<Observer<T>>) => ({
        unsubscribe: producer(sink(consumer)) ?? noop
    }),

    map: (mapper) => _observe(({ next, ...rest }) =>
        producer({
            // nobody listens for the values, don't bother the mapper
            next: next !== noop ? (v: T) => next(mapper(v)) : noop,
            ...rest
        })
    ),

    filter: (predicate) => _observe(({ next, ...rest }) =>
        producer({
            // nobody listens for the values, don't bother the predicate
            next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
            ...rest
        })
    ),

    // `until` and `while` terminate the sequence on their own, therefore they
    // are fused: the first `complete()` closes the sink, runs the cleanup of
    // the source and makes all subsequent values ignored.
    until: predicate => _observe(fuse<T>(({ next, complete, ...rest }) =>
        producer({
            next: v => predicate(v) ? complete() : next(v),
            complete,
            ...rest
        })
    )),

    while: predicate => _observe(fuse<T>(({ next, complete, ...rest }) =>
        producer({
            next: v => predicate(v) ? next(v) : complete(),
            complete,
            ...rest
        })
    )),

    scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
        if (args.length === 1) {
            // no seed: the first element becomes the initial accumulated value
            const [accumulator] = args;
            let _acc: T;
            let index = 0;
            return producer({
                next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
                ...rest
            });
        } else {
            const [accumulator, initial] = args;
            let _acc = initial;
            return producer({
                next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
                ...rest
            });
        }
    }),

    reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
        if (args.length === 1) {
            // no seed: the first element becomes the initial accumulated
            // value, an empty sequence is an error
            const [accumulator] = args;
            let _acc: T;
            let hasValue = false;
            return producer({
                next: next !== noop ?
                    (v: T) => {
                        const first = !hasValue;
                        hasValue = true;
                        _acc = first ? v : accumulator(_acc, v);
                    } :
                    // Nobody listens for the result, so the accumulator is
                    // skipped, but elements still must be counted, otherwise
                    // a non-empty sequence would be reported as empty.
                    () => { hasValue = true; },
                complete: () => {
                    if (!hasValue) {
                        error(new Error("The sequence can't be empty"));
                    } else {
                        next(_acc);
                        complete();
                    }
                },
                error,
                ...rest
            });
        } else {
            const [accumulator, initial] = args;
            let _acc = initial;
            return producer({
                next: next !== noop ? (v: T) => {
                    _acc = accumulator(_acc, v);
                } : noop,
                complete: () => {
                    next(_acc);
                    complete();
                },
                error,
                ...rest
            });
        }
    }),

    cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
        // the cleanup of the currently active source
        let cleanup = noop;
        // The number of completed sources, which is also the index of the
        // continuation to switch to. The counter is local to the
        // subscription, `seq` itself is never modified, so the resulting
        // observable can be subscribed many times.
        let stage = 0;

        const complete = () => {
            const current = ++stage;
            const continuation = seq[current - 1];
            // the source which has just completed has nothing to clean up
            cleanup = noop;
            if (continuation) {
                // if we have a next sequence, subscribe to it
                const subscription = continuation.subscribe({ next, complete, ...rest });
                // When the continuation has completed synchronously a later
                // stage is already in charge of the cleanup.
                if (stage === current)
                    cleanup = subscription.unsubscribe.bind(subscription);
            } else {
                // otherwise notify the consumer about completion
                final();
            }
        };

        const initial = producer({ next, complete, ...rest }) ?? noop;
        if (stage === 0)
            cleanup = initial;
        else
            // The source has completed synchronously: release it right away
            // and keep the cleanup installed by the active continuation.
            initial();

        return () => cleanup();
    }),

    pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),

    // resolves with the first element, an empty sequence rejects the promise
    next: collect(
        producer,
        ({ next, complete, error, isClosed }) => ({
            next: v => (next(v), complete()),
            complete: () => error(new Error("The sequence is empty")),
            error,
            isClosed
        })
    ),

    // resolves with all elements once the sequence is complete
    collect: collect(
        producer,
        ({ next, complete, ...rest }) => {
            const data: T[] = [];
            return {
                next: v => data.push(v),
                complete: () => (next(data), complete()),
                ...rest
            };
        }
    )
});
|
|
r110 | ||
|
|
/** Creates the function which runs the producer and returns a promise of the
 * result.
 *
 * @param producer The source producer
 * @param collector Adapts the sink of the result to the sink consumed by
 * the producer. The value passed to `next` of the result sink resolves the
 * promise, the value passed to `error` rejects it.
 * @returns The function which takes an optional cancellation token (defaults
 * to `Cancellation.none`) and returns the promise. The token is registered
 * with the `error` handler of the result sink.
 */
const collect = <T, U>(
    producer: Producer<T>,
    collector: (result: Sink<U>) => Sink<T>
) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
    const run = fuse<U>(result => {
        const { next, complete, error, isClosed } = result;

        const registration = ct.register(error);

        // the registration may have closed the sink already, in this case
        // the producer isn't started at all
        let stop = noop;
        if (!isClosed())
            stop = producer(collector({ next, complete, error, isClosed })) ?? noop;

        return () => {
            registration.destroy();
            stop();
        };
    });

    run({
        next: resolve,
        error: reject,
        complete: noop,
        isClosed: () => false
    });
});
|
|
r110 | ||
/** Creates the observable for the specified producer. The producer is fused
 * first, i.e. wrapped with the guards handling the end of the sequence and
 * the cleanup.
 *
 * @param producer The producer which generates events of the sequence
 */
export const observe = <T>(producer: Producer<T>) => {
    const guarded = fuse(producer);
    return _observe(guarded);
};
|
|
/** Creates the observable which emits the items of the array and completes.
 *
 * The items are emitted synchronously on every subscription.
 *
 * @param items The items to emit, the array isn't modified
 */
export const ofArray = <T>(items: readonly T[]) => _observe<T>(
    ({ next, complete }) => {
        // `forEach` passes (item, index, array) to the callback, hand over
        // the item only to keep extra arguments away from the consumer
        items.forEach(item => next(item));
        return complete();
    }
);
|
|
r110 | ||
|
|
/** Creates the observable of a single item.
 *
 * @param item The value to emit or the promise of such value. The settled
 * promise emits its value and completes the sequence, the rejected promise
 * ends the sequence with the error.
 */
const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
    ({ next, error, complete }) => {
        if (isPromise(item)) {
            // the sequence is finished later, when the promise settles
            void item.then(
                value => {
                    next(value);
                    return complete();
                },
                error
            );
            return;
        }

        next(item);
        return complete();
    }
);
|
|
r110 | ||
|
|
/** Creates the observable which emits the specified items in order and
 * completes.
 *
 * @param items The values or the promises of values. A promise suspends the
 * emission until it is settled, a rejected promise ends the sequence with the
 * error.
 */
export const of = <T>(...items: (T | PromiseLike<T>)[]) => {
    if (items.length === 1)
        return of1(items[0]);

    return observe<T>(
        ({ next, error, complete, isClosed }) => {
            const count = items.length;

            const emitFrom = (start: number) => {
                // resumed after a promise: the consumer may be gone already
                if (start > 0 && isClosed())
                    return;

                let pos = start;
                while (pos < count) {
                    const current = items[pos];
                    if (isPromise(current)) {
                        // suspend, continue with the rest once settled
                        const resumeAt = pos + 1;
                        current.then(v => (next(v), emitFrom(resumeAt)), error);
                        return;
                    }
                    next(current);
                    pos++;
                }
                complete();
            };

            emitFrom(0);
        }
    );
};
|
|
r110 | ||
|
|
/** The observable without elements, it completes as soon as it is subscribed. */
export const empty = _observe<never>(
    ({ complete: finish }) => finish()
);
/**
 * Wraps the producer to share it between subscribers and to keep its state.
 *
 * The source producer is fused and started immediately, exactly once, when
 * this function is called. Its cleanup function is not used by the wrapper.
 *
 * The wrapper remembers the last emitted value and the final status of the
 * sequence:
 *  - a consumer subscribed to an active sequence immediately receives the last
 *    value (if any) and then all further events;
 *  - a consumer subscribed to a finished sequence immediately receives
 *    `complete()` or `error()` with the stored error.
 *
 * @param producer The source producer
 * @returns The producer to be used to attach consumers to the shared state
 */
export const stateful = <T>(producer: Producer<T>): Producer<T> => {
    const fusedProducer = fuse(producer);
    type Status = "active" | "complete" | "error";
    let lastValue: T;
    let hasValue = false;
    let status: Status = "active";
    let lastError: unknown;
    // The array is replaced with a new one when the sequence ends, handles of
    // old subscriptions detect this by comparing the references.
    let subscribers: Sink<T>[] = [];
    const sink: Sink<T> = {
        isClosed: () => status !== "active",
        complete: () => {
            if (status === "active") {
                status = "complete";
                const _subscribers = subscribers;
                subscribers = [];
                _subscribers.forEach(s => s.complete());
            }
        },
        error: e => {
            if (status === "active") {
                status = "error";
                lastError = e;
                const _subscribers = subscribers;
                subscribers = [];
                _subscribers.forEach(s => s.error(e));
            }
        },
        next: v => {
            if (status === "active") {
                hasValue = true;
                lastValue = v;
                // Iterate over a snapshot: a handler may unsubscribe during
                // the dispatch, which removes an element from `subscribers`
                // and would make forEach skip the next subscriber.
                subscribers.slice().forEach(s => s.next(v));
            }
        }
    };
    fusedProducer(sink);
    return (s: Sink<T>) => {
        const _subscribers = subscribers;
        switch (status) {
            case "active":
                if (hasValue)
                    s.next(lastValue); // if hasValue is true,
                // lastValue has a valid value
                subscribers.push(s);
                return () => {
                    // when the references differ the sequence has ended and
                    // the subscriber is already removed
                    if (_subscribers === subscribers) {
                        const pos = subscribers.indexOf(s);
                        if (pos >= 0)
                            subscribers.splice(pos, 1);
                    }
                };
            case "complete":
                s.complete();
                break;
            case "error":
                s.error(lastError);
                break;
        }
    };
};
|
|
/** Creates the producer which will be called once when the first subscriber
 * is attached, next subscribers would share the same producer. When all
 * subscribers are removed the producer will be cleaned up.
 *
 * Use this wrapper to prevent spawning multiple producers.
 *
 * When the source ends (`complete` or `error`) all current subscribers are
 * notified and detached. A consumer subscribed after that becomes the first
 * subscriber again and the source producer is started anew.
 *
 * @param producer The source producer
 * @returns The wrapped producer
 */
export const subject = <T>(producer: Producer<T>): Producer<T> => {
    const fusedProducer = fuse(producer);
    // The array is replaced with a new one when the sequence ends, handles of
    // old subscriptions detect this by comparing the references.
    let subscribers: Sink<T>[] = [];
    let cleanup = noop;

    const sink: Sink<T> = {
        isClosed: () => false,
        complete: () => {
            const _subscribers = subscribers;
            subscribers = [];
            _subscribers.forEach(s => s.complete());
            cleanup();
        },
        error: e => {
            const _subscribers = subscribers;
            subscribers = [];
            _subscribers.forEach(s => s.error(e));
            cleanup();
        },
        next: v => {
            // Iterate over a snapshot: a handler may unsubscribe during the
            // dispatch, which removes an element from `subscribers` and would
            // make forEach skip the next subscriber.
            subscribers.slice().forEach(s => s.next(v));
        }
    };

    return client => {
        const _subscribers = subscribers;
        subscribers.push(client);
        // the first subscriber starts the source
        if (subscribers.length === 1)
            cleanup = fusedProducer(sink) ?? noop;

        return () => {
            // the sequence has ended, the client is already detached
            if (_subscribers !== subscribers)
                return;

            const pos = subscribers.indexOf(client);
            // repeated unsubscribe, nothing to do
            if (pos < 0)
                return;

            subscribers.splice(pos, 1);
            // the last subscriber is gone, stop the source
            if (!subscribers.length)
                cleanup();
        };
    };
};
