observable.ts
402 lines
| 12.4 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r116 | import { Cancellation } from "@implab/core-amd/Cancellation"; | ||
import { ICancellation } from "@implab/core-amd/interfaces"; | ||||
cin
|
r129 | import { isPromise } from "@implab/core-amd/safe"; | ||
cin
|
r110 | |||
cin
|
/**
 * The interface for the consumer of an observable sequence.
 *
 * Every handler is optional, a consumer declares only the notifications it
 * wants to receive.
 */
export interface Observer<T> {
    /**
     * Called for the next element in the sequence.
     */
    next?: (value: T) => void;

    /**
     * Called once when the error occurs in the sequence.
     */
    error?: (e: unknown) => void;

    /**
     * Called once at the end of the sequence.
     */
    complete?: () => void;
}
cin
|
/**
 * The group of functions to feed an observable. These methods are provided to
 * the producer to generate a stream of events.
 *
 * Unlike {@link Observer} all members are always present.
 */
export type Sink<T> = {
    /**
     * Call to send the next element in the sequence.
     */
    next: (value: T) => void;

    /**
     * Call to notify about the error occurred in the sequence.
     */
    error: (e: unknown) => void;

    /**
     * Call to signal the end of the sequence.
     */
    complete: () => void;

    /**
     * Checks whether the sink is accepting new elements. It's safe to
     * send elements to the closed sink.
     */
    isClosed: () => boolean;
};
cin
|
r96 | |||
/**
 * The function which generates events and sends them to the specified sink.
 *
 * The producer may return a tear down function, it is called to release the
 * resources of the producer when the sequence is finished or the consumer
 * unsubscribes.
 */
export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
cin
|
/** The handle of the subscription, used to stop receiving events. */
export interface Unsubscribable {
    /** Cancels the subscription. */
    unsubscribe(): void;
}
cin
|
/** Checks whether the specified value provides the `unsubscribe` method.
 *
 * @param v The value to check, `null` and `undefined` are rejected.
 */
export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
    v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
cin
|
/** Checks whether the specified value provides the `subscribe` method.
 *
 * Only the presence of the method is verified, the type of the elements
 * can't be checked at runtime and is asserted by the caller through `T`.
 *
 * @param v The value to check, `null` and `undefined` are rejected.
 */
export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
    v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
cin
|
r96 | |||
cin
|
/** The source of events which accepts consumers. */
export interface Subscribable<T> {
    /** Connects the consumer to the source.
     *
     * @param consumer The handlers to receive events from the source.
     * @returns The handle used to cancel the subscription.
     */
    subscribe(consumer: Observer<T>): Unsubscribable;
}
cin
|
/** The function which combines the accumulated value with the next element
 * of the sequence and returns the new accumulated value.
 */
export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
cin
|
/** The higher order function which takes the source observable and returns
 * the producer for the resulting observable.
 */
export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
cin
|
/** The observable source of items. */
export interface Observable<T> extends Subscribable<T> {
    /** Transforms elements of the sequence with the specified mapper
     *
     * @param mapper The mapper used to transform the values
     */
    map<T2>(mapper: (value: T) => T2): Observable<T2>;

    /** Filters elements of the sequence. The resulting sequence will
     * contain only elements which match the specified predicate.
     *
     * @param predicate The filter predicate.
     */
    filter(predicate: (value: T) => boolean): Observable<T>;

    /** Completes the sequence once the condition is met. The element which
     * meets the condition isn't emitted.
     *
     * @param predicate The condition which should be met to complete the sequence
     */
    until(predicate: (value: T) => boolean): Observable<T>;

    /** Keeps the sequence running while elements satisfy the condition. The
     * first element which doesn't satisfy the condition isn't emitted and
     * completes the sequence.
     *
     * @param predicate The condition which should be met to continue.
     */
    while(predicate: (value: T) => boolean): Observable<T>;

    /** Applies accumulator to each value in the sequence and
     * emits the accumulated value for each source element
     *
     * @param accumulator The function which combines the accumulated value
     * with the next element.
     * @param initial The initial accumulated value. When omitted the first
     * element of the sequence is emitted as is and becomes the accumulated
     * value.
     */
    scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    scan(accumulator: AccumulatorFn<T, T>): Observable<T>;

    /** Applies accumulator to each value in the sequence and
     * emits the accumulated value at the end of the sequence
     *
     * @param accumulator The function which combines the accumulated value
     * with the next element.
     * @param initial The initial accumulated value, it is emitted when the
     * sequence has no elements. When omitted the first element of the
     * sequence becomes the accumulated value and the empty sequence results
     * in the error.
     */
    reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;

    /** Concatenates the specified sequences with this observable
     *
     * @param seq sequences to concatenate with the current observable
     *
     * The concatenation doesn't accumulate values from the specified sequences,
     * The result of the concatenation is the new observable which will switch
     * to the next observable after the previous one completes. Values emitted
     * before the next observable being active are lost.
     */
    cat(...seq: Subscribable<T>[]): Observable<T>;

    /** Pipes the specified operator to produce the new observable
     * @param op The operator consumes this observable and produces a new one
     *
     * The operator is a higher order function which takes a source observable
     * and returns a producer for the new observable.
     *
     * This function can be used to create a complex mapping between source and
     * resulting observables. The operator may have a state (or a side effect)
     * and can be connected to multiple observables.
     */
    pipe<U>(op: OperatorFn<T, U>): Observable<U>;

    /** Waits for the next event to occur and returns a promise for the next value
     *
     * The promise is rejected when the sequence fails or completes before
     * an element is received.
     *
     * @param ct Cancellation token
     */
    next(ct?: ICancellation): Promise<T>;

    /** Collects items of the sequence to the array. The promise is resolved
     * when the sequence completes.
     *
     * @param ct Cancellation token
     */
    collect(ct?: ICancellation): Promise<T[]>;
}
/** The shared handler which does nothing. It substitutes missing handlers of
 * a consumer, operators compare the handler with this very reference to find
 * out whether the consumer handles the event.
 */
const noop = () => { };
cin
|
/** Creates the sink which delivers events to the specified consumer.
 *
 * Handlers are bound to the consumer, so they can be passed around as plain
 * functions. Missing handlers are replaced with `noop`. The resulting sink
 * never reports itself as closed.
 *
 * @param consumer The consumer to receive events
 * @returns The sink bound to the consumer
 */
const sink = <T>(consumer: Observer<T>): Sink<T> => {
    const { next, error, complete } = consumer;
    return {
        next: next ? next.bind(consumer) : noop,
        error: error ? error.bind(consumer) : noop,
        complete: complete ? complete.bind(consumer) : noop,
        isClosed: () => false
    };
};
cin
|
/** Wraps the producer to handle tear down logic and subscription management
 *
 * The wrapper guarantees that after the first `error` or `complete`, or after
 * the subscription is cancelled, no more events are delivered to the sink and
 * that the tear down function of the producer is called only once.
 *
 * @param producer The producer to wrap
 * @returns The wrapper producer
 */
const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
    let done = false;
    let cleanup: () => void = noop;

    // wraps a terminal handler: the first call closes the sink, releases the
    // producer and only then notifies the consumer, further calls are ignored
    const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
        (...args: A) => done ?
            void (0) :
            (done = true, cleanup(), fn(...args));

    // cancels the subscription, the consumer isn't notified
    const _fin0 = () => done ? void (0) : (done = true, cleanup());

    const safeSink = {
        next: (value: T) => {
            if (!done)
                next(value);
        },
        error: _fin(error),
        complete: _fin(complete),
        isClosed: () => done
    };

    cleanup = producer(safeSink) ?? noop;

    // when the producer has finished before it returned the tear down
    // function, the terminal handler has seen only the noop, therefore the
    // actual tear down is called here
    return done ? cleanup() : _fin0;
};
cin
|
r96 | |||
cin
|
/** Creates the observable on top of the specified producer.
 *
 * The producer is used as is and is started for each subscription, use
 * `observe` to get the producer guarded with `fuse`.
 *
 * Operators which only transform elements check the `next` handler against
 * `noop` and don't install the transformation when the consumer doesn't
 * handle elements.
 *
 * @param producer The producer which feeds subscribers of the observable
 * @returns The observable
 */
const _observe = <T>(producer: Producer<T>): Observable<T> => ({
    subscribe: (consumer: Observer<T>) => ({
        unsubscribe: producer(sink(consumer)) ?? noop
    }),

    map: (mapper) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ? (v: T) => next(mapper(v)) : noop,
            ...rest
        })
    ),

    filter: (predicate) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ?
                (v: T) => {
                    if (predicate(v))
                        next(v);
                } :
                noop,
            ...rest
        })
    ),

    // until/while complete the resulting sequence on their own while the
    // source is still running, therefore the sink must be fused: elements and
    // the completion sent by the source afterwards are dropped and the source
    // is released.
    until: predicate => observe<T>(({ next, complete, ...rest }) =>
        producer({
            next: v => predicate(v) ? complete() : next(v),
            complete,
            ...rest
        })
    ),

    while: predicate => observe<T>(({ next, complete, ...rest }) =>
        producer({
            next: v => predicate(v) ? next(v) : complete(),
            complete,
            ...rest
        })
    ),

    scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
        if (args.length === 1) {
            const [accumulator] = args;
            let _acc: T;
            let index = 0;
            return producer({
                // the first element is emitted as is and seeds the accumulator
                next: next !== noop ?
                    (v: T) => next(index++ === 0 ? (_acc = v) : (_acc = accumulator(_acc, v))) :
                    noop,
                ...rest
            });
        } else {
            const [accumulator, initial] = args;
            let _acc = initial;
            return producer({
                next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
                ...rest
            });
        }
    }),

    reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
        if (args.length === 1) {
            const [accumulator] = args;
            let _acc: T;
            let index = 0;
            return producer({
                // elements are always counted, even if the consumer doesn't
                // handle them, otherwise a non-empty sequence would be
                // reported as the empty one
                next: (v: T) => {
                    _acc = index++ === 0 ? v : accumulator(_acc, v);
                },
                complete: () => {
                    if (index === 0) {
                        error(new Error("The sequence can't be empty"));
                    } else {
                        next(_acc);
                        complete();
                    }
                },
                error,
                ...rest
            });
        } else {
            const [accumulator, initial] = args;
            let _acc = initial;
            return producer({
                next: next !== noop ? (v: T) => {
                    _acc = accumulator(_acc, v);
                } : noop,
                complete: () => {
                    next(_acc);
                    complete();
                },
                error,
                ...rest
            });
        }
    }),

    cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
        // each subscription walks through its own copy of the list, the
        // original list stays intact for the next subscribers
        const pending = [...seq];
        let cleanup: () => void = noop;
        // incremented every time a sequence becomes the active one
        let stage = 0;

        const activate = (start: () => () => void) => {
            const current = ++stage;
            const dispose = start();
            // a sequence which has completed while it was starting has
            // already activated the following one, the tear down of the
            // finished sequence must not replace the newer one
            if (current === stage)
                cleanup = dispose;
        };

        const complete = () => {
            const continuation = pending.shift();
            if (continuation) {
                // if we have a next sequence, subscribe to it
                activate(() => {
                    const subscription = continuation.subscribe({ next, complete, ...rest });
                    return () => subscription.unsubscribe();
                });
            } else {
                // otherwise notify the consumer about completion
                final();
            }
        };

        activate(() => producer({ next, complete, ...rest }) ?? noop);
        return () => cleanup();
    }),

    pipe: <U>(op: OperatorFn<T, U>) => observe(op(_observe(producer))),

    next: collect(
        producer,
        ({ next, complete, error, isClosed }) => ({
            // the first element is the result, nothing else is expected
            next: v => (next(v), complete()),
            complete: () => error(new Error("The sequence is empty")),
            error,
            isClosed
        })
    ),

    collect: collect(
        producer,
        ({ next, complete, ...rest }) => {
            const data: T[] = [];
            return {
                next: v => data.push(v),
                complete: () => (next(data), complete()),
                ...rest
            };
        }
    )
});
cin
|
r110 | |||
cin
|
/** Creates the function which runs the producer and returns the promise for
 * the result computed by the collector.
 *
 * @param producer The producer of the sequence to consume
 * @param collector Receives the sink of the result and returns the sink to be
 * fed by the producer. The value sent to `next` of the result sink resolves
 * the promise, the value sent to `error` rejects it.
 * @returns The function which accepts the optional cancellation token and
 * starts the producer on each call.
 */
const collect = <T, U>(
    producer: Producer<T>,
    collector: (result: Sink<U>) => Sink<T>
) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
    const fused = fuse<U>(({ next, complete, error, isClosed }) => {
        // the cancellation is reported as the error of the sequence
        const h = ct.register(error);

        // the registration may have closed the sink already (presumably when
        // the cancellation was requested before the call), the producer isn't
        // started in this case
        const cleanup = !isClosed() ?
            producer(collector({ next, complete, error, isClosed })) ?? noop :
            noop;

        return () => {
            h.destroy();
            cleanup();
        };
    });

    fused({
        next: resolve,
        error: reject,
        complete: noop,
        isClosed: () => false
    });
});
cin
|
r110 | |||
/** Creates the observable from the specified producer.
 *
 * The producer is wrapped with `fuse`, thus events sent after the sequence is
 * finished or the subscription is cancelled are ignored.
 *
 * @param producer The producer which is started for each subscription
 */
export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
cin
|
/** Converts an array to the observable sequence of its elements.
 *
 * Elements are sent synchronously on each subscription, then the sequence
 * completes.
 *
 * @param items The elements of the sequence, the array isn't modified.
 */
export const ofArray = <T>(items: readonly T[]) => _observe<T>(
    ({ next, complete }) => {
        // the handler is called with the element only, passing it directly
        // to forEach would leak the index and the array to the consumer
        items.forEach(item => next(item));
        complete();
    }
);
cin
|
r110 | |||
cin
|
/** Converts a subscribable to the observable
 *
 * The source is subscribed for each subscription of the resulting observable
 * and unsubscribed when the subscription is finished or cancelled.
 *
 * @param subscribable The source of events
 */
export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
    // the type argument is explicit, it can't be inferred from the callback
    // and would fall back to unknown
    observe<T>(target => {
        const subscription = subscribable.subscribe(target);
        return () => subscription.unsubscribe();
    });
cin
|
/** Creates the observable of the single element.
 *
 * @param item The element or the promise of the element. The value is sent
 * immediately on subscription, the promise is awaited first and its rejection
 * is reported as the error of the sequence.
 */
const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
    ({ next, error, complete }) =>
        isPromise(item) ?
            void item.then(
                v => (next(v), complete()),
                error
            ) :
            (next(item), complete())
);
cin
|
r110 | |||
cin
|
/** Converts a list of parameter values to the observable sequence. The
 * order of elements in the list will be preserved in the resulting sequence.
 *
 * @param items Elements of the sequence, values and promises can be mixed.
 * A promise suspends the sequence until it is settled, its rejection is
 * reported as the error of the sequence.
 */
export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
    of1(items[0]) :
    observe<T>(
        ({ next, error, complete, isClosed }) => {
            const n = items.length;

            // sends elements beginning from the specified index until the
            // end of the list or the next promise
            const _next = (start: number) => {
                if (start > 0 && isClosed()) // when resumed
                    return;

                for (let i = start; i < n; i++) {
                    const r = items[i];
                    if (isPromise(r)) {
                        // continue with the following element when resolved
                        r.then(v => (next(v), _next(i + 1)), error);
                        return; // suspend
                    } else {
                        next(r);
                    }
                }
                complete();
            };

            _next(0);
        }
    );
cin
|
r110 | |||
cin
|
/** The observable which has no elements, it completes immediately on each
 * subscription.
 */
export const empty = _observe<never>(({ complete }) => complete());