observable.ts
482 lines
| 15.0 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r116 | import { Cancellation } from "@implab/core-amd/Cancellation"; | |
import { ICancellation } from "@implab/core-amd/interfaces"; | |||
cin
|
r110 | ||
cin
|
/**
 * The interface for the consumer of an observable sequence
 */
export interface Observer<T> {
    /**
     * Called for the next element in the sequence
     */
    next: (value: T) => void;

    /**
     * Called once when the error occurs in the sequence.
     */
    error: (e: unknown) => void;

    /**
     * Called once at the end of the sequence.
     */
    complete: () => void;
}
cin
|
/**
 * The group of functions to feed an observable. These methods are provided to
 * the producer to generate a stream of events.
 */
export type Sink<T> = {
    /**
     * Call to send the next element in the sequence
     */
    next: (value: T) => void;

    /**
     * Call to notify about the error occurred in the sequence.
     */
    error: (e: unknown) => void;

    /**
     * Call to signal the end of the sequence.
     */
    complete: () => void;

    /**
     * Checks whether the sink is closed, i.e. whether it has stopped
     * accepting new elements. It's safe to send elements to the closed sink.
     */
    isClosed: () => boolean;
};
cin
|
r96 | ||
/**
 * The function which generates the events of a sequence. It receives the sink
 * to feed and may return a tear down function which releases the resources
 * held by the producer.
 */
export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
cin
|
/** The handle of a subscription, used to cancel the subscription. */
export interface Unsubscribable {
    unsubscribe(): void;
}
cin
|
/** Checks whether the value has the `unsubscribe` method.
 *
 * @param v The value to check
 * @returns `true` if `v` is neither `null` nor `undefined` and its
 *  `unsubscribe` member is a function
 */
export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
    v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
cin
|
/** Checks whether the value has the `subscribe` method.
 *
 * Only the presence of the method is verified, the type of the elements
 * can't be checked at run time.
 *
 * @param v The value to check
 * @returns `true` if `v` is neither `null` nor `undefined` and its
 *  `subscribe` member is a function
 */
export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
    v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
cin
|
r96 | ||
cin
|
/** The source of events to which a consumer can be attached. */
export interface Subscribable<T> {
    /** Attaches the consumer to the source.
     *
     * @param consumer The consumer of events, any of the handlers may be
     *  omitted.
     * @returns The handle used to cancel the subscription.
     */
    subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
}
cin
|
/** The function which combines the accumulated value with the next element
 * of the sequence and returns the new accumulated value.
 */
export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
cin
|
/** The operator takes the source observable and returns the producer for
 * the resulting sequence.
 */
export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
cin
|
/** The observable source of items. */
export interface Observable<T> extends Subscribable<T> {
    /** Transforms elements of the sequence with the specified mapper
     *
     * @param mapper The mapper used to transform the values
     */
    map<T2>(mapper: (value: T) => T2): Observable<T2>;

    /** Filters elements of the sequence. The resulting sequence will
     * contain only elements which match the specified predicate.
     *
     * @param predicate The filter predicate.
     */
    filter(predicate: (value: T) => boolean): Observable<T>;

    /** Completes the sequence once the condition is met.
     *
     * @param predicate The condition which should be met to complete the sequence
     */
    until(predicate: (value: T) => boolean): Observable<T>;

    /** Keeps the sequence running while elements satisfy the condition.
     *
     * @param predicate The condition which should be met to continue.
     */
    while(predicate: (value: T) => boolean): Observable<T>;

    /** Applies accumulator to each value in the sequence and
     * emits the accumulated value for each source element
     *
     * @param accumulator The function which combines the accumulated value
     *  with the next element.
     * @param initial The initial accumulated value. When omitted the first
     *  element of the sequence is used as the initial value.
     */
    scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    scan(accumulator: AccumulatorFn<T, T>): Observable<T>;

    /** Applies accumulator to each value in the sequence and
     * emits the accumulated value at the end of the sequence
     *
     * @param accumulator The function which combines the accumulated value
     *  with the next element.
     * @param initial The initial accumulated value. When omitted the first
     *  element of the sequence is used as the initial value.
     */
    reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;

    /** Concatenates the specified sequences with this observable
     *
     * @param seq sequences to concatenate with the current observable
     *
     * The concatenation doesn't accumulate values from the specified sequences.
     * The result of the concatenation is the new observable which will switch
     * to the next observable after the previous one completes. Values emitted
     * before the next observable becomes active are lost.
     */
    cat(...seq: Subscribable<T>[]): Observable<T>;

    /** Pipes the specified operator to produce the new observable
     *
     * @param op The operator consumes this observable and produces a new one
     *
     * The operator is a higher order function which takes a source observable
     * and returns a producer for the new observable.
     *
     * This function can be used to create a complex mapping between source and
     * resulting observables. The operator may have a state (or a side effect)
     * and can be connected to multiple observables.
     */
    pipe<U>(op: OperatorFn<T, U>): Observable<U>;

    /** Waits for the next event to occur and returns a promise for the next value
     *
     * @param ct Cancellation token used to stop waiting for the value.
     */
    next(ct?: ICancellation): Promise<T>;
}
/** The shared empty handler. It is compared by identity (`next !== noop`) to
 * detect that nothing listens for the values, so the same instance must be
 * used everywhere in this module.
 */
const noop = () => { };
/** Creates the sink for the specified consumer.
 *
 * The handlers of the consumer are bound to the consumer itself, missing
 * handlers are replaced with `noop`.
 *
 * @param consumer The consumer with optional handlers
 * @returns The sink which never reports itself as closed
 */
const sink = <T>(consumer: Partial<Observer<T>>) => {
    const { next, error, complete } = consumer;
    return {
        next: next ? next.bind(consumer) : noop,
        error: error ? error.bind(consumer) : noop,
        complete: complete ? complete.bind(consumer) : noop,
        isClosed: () => false
    };
};
cin
|
/** Wraps the producer to handle tear down logic and subscription management
 *
 * The wrapped producer receives a guarded sink: values are dropped after the
 * sequence is finished, `error` and `complete` are delivered at most once and
 * run the tear down of the producer before notifying the consumer.
 *
 * @param producer The producer to wrap
 * @returns The wrapper producer
 */
const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
    let done = false;
    // replaced with the tear down returned by the producer once it is known
    let cleanup = noop;

    // creates the terminal handler: marks the sequence as finished, runs
    // the tear down and then forwards the call to the consumer
    const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
        (...args: A) => {
            if (done)
                return;
            done = true;
            cleanup();
            fn(...args);
        };

    // the tear down handed to the subscriber, doesn't notify the consumer
    const _fin0 = () => {
        if (done)
            return;
        done = true;
        cleanup();
    };

    const safeSink = {
        next: (value: T) => {
            if (!done)
                next(value);
        },
        error: _fin(error),
        complete: _fin(complete),
        isClosed: () => done
    };

    cleanup = producer(safeSink) ?? noop;

    // when the producer has finished synchronously its tear down wasn't
    // available to the terminal handler, run it now
    return done ? cleanup() : _fin0;
};
cin
|
r96 | ||
cin
|
/** Creates the observable on top of the specified producer.
 *
 * The producer is invoked for every subscription. The operators create a new
 * observable whose producer wraps the sink of the consumer and passes it to
 * the original producer.
 *
 * @param producer The producer used to feed the subscribers
 * @returns The observable
 */
const _observe = <T>(producer: Producer<T>): Observable<T> => ({
    subscribe: (consumer: Partial<Observer<T>>) => ({
        unsubscribe: producer(sink(consumer)) ?? noop
    }),

    // when nothing listens for the values (`next === noop`) the mapper isn't
    // installed at all
    map: (mapper) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ? (v: T) => next(mapper(v)) : noop,
            ...rest
        })
    ),

    filter: (predicate) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ?
                (v: T) => {
                    if (predicate(v))
                        next(v);
                } :
                noop,
            ...rest
        })
    ),

    // The sink is fused: once the operator completes the sequence the
    // following values are dropped, the consumer is notified only once and
    // the source is torn down.
    until: predicate => _observe(fuse<T>(({ next, complete, error, isClosed }) =>
        producer({
            next: v => {
                if (isClosed())
                    return;
                if (predicate(v))
                    complete();
                else
                    next(v);
            },
            complete,
            error,
            isClosed
        })
    )),

    // fused for the same reason as `until`
    while: predicate => _observe(fuse<T>(({ next, complete, error, isClosed }) =>
        producer({
            next: v => {
                if (isClosed())
                    return;
                if (predicate(v))
                    next(v);
                else
                    complete();
            },
            complete,
            error,
            isClosed
        })
    )),

    scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
        if (args.length === 1) {
            // no initial value, the first element starts the accumulation
            const [accumulator] = args;
            let _acc: T;
            let index = 0;
            return producer({
                next: next !== noop ?
                    (v: T) => next(_acc = index++ === 0 ? v : accumulator(_acc, v)) :
                    noop,
                ...rest
            });
        } else {
            const [accumulator, initial] = args;
            let _acc = initial;
            return producer({
                next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
                ...rest
            });
        }
    }),

    reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
        if (args.length === 1) {
            // no initial value, the first element starts the accumulation
            const [accumulator] = args;
            let _acc: T;
            let index = 0;
            return producer({
                // The elements are counted even when nothing listens for the
                // result, otherwise a non-empty sequence would be reported
                // as the empty one.
                next: next !== noop ?
                    (v: T) => {
                        _acc = index++ === 0 ? v : accumulator(_acc, v);
                    } :
                    () => {
                        index++;
                    },
                complete: () => {
                    if (index === 0) {
                        error(new Error("The sequence can't be empty"));
                    } else {
                        next(_acc);
                        complete();
                    }
                },
                error,
                ...rest
            });
        } else {
            const [accumulator, initial] = args;
            let _acc = initial;
            return producer({
                next: next !== noop ?
                    (v: T) => {
                        _acc = accumulator(_acc, v);
                    } :
                    noop,
                complete: () => {
                    next(_acc);
                    complete();
                },
                error,
                ...rest
            });
        }
    }),

    cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
        // the tear down of the sequence which is currently active
        let cleanup: () => void = noop;
        // The number of the sequences from `seq` already taken by this
        // subscription. `seq` itself is left intact since it is shared
        // between all subscriptions.
        let stage = 0;

        const complete = () => {
            const continuation = seq[stage++];
            if (continuation) {
                // if we have a next sequence, subscribe to it
                const started = stage;
                const subscription = continuation.subscribe({ next, complete, ...rest });
                if (stage === started)
                    cleanup = subscription.unsubscribe.bind(subscription);
                else
                    // the sequence has completed synchronously and a later
                    // one owns the cleanup, release this one right away
                    subscription.unsubscribe();
            } else {
                // otherwise notify the consumer about completion
                final();
            }
        };

        const head = producer({ next, complete, ...rest }) ?? noop;
        if (stage === 0)
            cleanup = head;
        else
            // the head has completed synchronously, keep the cleanup of the
            // continuation and release the head
            head();

        return () => cleanup();
    }),

    pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),

    next: (ct?: ICancellation) => {
        const _ct = ct ?? Cancellation.none;
        return new Promise<T>((resolve, reject) => {
            // wrap the producer to handle only single event
            const once = fuse<T>(({ next, complete, error, isClosed }) => {
                const h = _ct.register(error);

                // if the _ct fires it will call error() and isClosed() will return true
                const cleanup = !isClosed() ?
                    (producer({
                        next: v => (next(v), complete()),
                        complete: () => error(new Error("The sequence is empty")),
                        error,
                        isClosed
                    }) ?? noop) :
                    noop;

                return () => {
                    h.destroy();
                    cleanup();
                };
            });

            once({
                next: resolve,
                error: reject,
                complete: noop,
                isClosed: () => false
            });
        });
    }
});
cin
|
r110 | ||
/** Creates the observable for the specified producer. The producer is fused,
 * i.e. wrapped to handle the tear down logic and to guard the sink after the
 * end of the sequence.
 *
 * @param producer The producer of the sequence
 * @returns The observable
 */
export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
cin
|
/** Creates the observable which emits the elements of the array to every
 * subscriber and then completes.
 *
 * @param items The elements to emit
 * @returns The observable
 */
export const streamArray = <T>(items: T[]) => _observe<T>(
    ({ next, complete }) => {
        // `forEach(next)` would pass the index and the array to the consumer
        // as extra arguments
        items.forEach(item => next(item));
        complete();
    }
);
cin
|
r110 | ||
cin
|
/** Creates the observable from the promise. The resolved value is emitted
 * as the single element followed by the completion, the rejection is passed
 * as the error.
 *
 * @param promise The promise to observe
 * @returns The observable
 */
export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
    ({ next, error, complete }) => void promise.then(
        v => {
            next(v);
            complete();
        },
        error
    )
);
cin
|
r110 | ||
cin
|
/** Creates the observable which emits the specified values to every
 * subscriber and then completes.
 *
 * @param items The values to emit
 * @returns The observable
 */
export const of = <T>(...items: T[]) => _observe<T>(
    ({ next, complete }) => {
        // `forEach(next)` would pass the index and the array to the consumer
        // as extra arguments
        items.forEach(item => next(item));
        complete();
    }
);
cin
|
r110 | ||
cin
|
/** The observable which completes immediately without emitting any elements. */
export const empty = _observe<never>(({ complete }) => complete());
/**
 * Creates the producer which keeps the last value emitted by the source
 * producer and shares the source between all subscribers.
 *
 * @param producer The source producer, it is started immediately when this
 *  function is called.
 * @returns The producer to attach subscribers to the shared source.
 *
 * While the source is active every new subscriber receives the last emitted
 * value (if any) and then all following values. Once the source is completed
 * or failed every new subscriber is notified immediately with the completion
 * or with the stored error.
 */
export const stateful = <T>(producer: Producer<T>): Producer<T> => {
    const fusedProducer = fuse(producer);
    type Status = "active" | "complete" | "error";

    let lastValue: T;
    let hasValue = false;
    let status: Status = "active";
    let lastError: unknown;
    let subscribers: Sink<T>[] = [];

    const sink: Sink<T> = {
        isClosed: () => status !== "active",
        complete: () => {
            if (status === "active") {
                status = "complete";
                // detach all subscribers before notifying them
                const _subscribers = subscribers;
                subscribers = [];
                _subscribers.forEach(s => s.complete());
            }
        },
        error: e => {
            if (status === "active") {
                status = "error";
                lastError = e;
                // detach all subscribers before notifying them
                const _subscribers = subscribers;
                subscribers = [];
                _subscribers.forEach(s => s.error(e));
            }
        },
        next: v => {
            if (status === "active") {
                hasValue = true;
                lastValue = v;
                // Iterate over the copy: a subscriber may unsubscribe while
                // handling the value and removing it from the live array
                // would make the following subscriber miss this value.
                subscribers.slice().forEach(s => {
                    // skip subscribers detached during this dispatch
                    if (subscribers.indexOf(s) >= 0)
                        s.next(v);
                });
            }
        }
    };

    fusedProducer(sink);

    return (s: Sink<T>): void | (() => void) => {
        const _subscribers = subscribers;
        switch (status) {
            case "active":
                if (hasValue)
                    s.next(lastValue); // if hasValue is true,
                                       // lastValue has a valid value
                subscribers.push(s);
                return () => {
                    // the array is replaced when the source is finished,
                    // in this case the subscriber is already detached
                    if (_subscribers === subscribers) {
                        const pos = subscribers.indexOf(s);
                        if (pos >= 0)
                            subscribers.splice(pos, 1);
                    }
                };
            case "complete":
                s.complete();
                return undefined;
            case "error":
                s.error(lastError);
                return undefined;
        }
    };
};
cin
|
/** Creates the producer which will be called once when the first subscriber
 * is attached, next subscribers would share the same producer. When all
 * subscribers are removed the producer will be cleaned up.
 *
 * Use this wrapper to prevent spawning multiple producers.
 *
 * @param producer The source producer
 * @returns The wrapped producer
 */
export const subject = <T>(producer: Producer<T>): Producer<T> => {
    const fusedProducer = fuse(producer);
    let subscribers: Sink<T>[] = [];
    let cleanup = noop;

    const sink: Sink<T> = {
        isClosed: () => false,
        complete: () => {
            // detach all subscribers before notifying them
            const _subscribers = subscribers;
            subscribers = [];
            _subscribers.forEach(s => s.complete());
            cleanup();
        },
        error: e => {
            // detach all subscribers before notifying them
            const _subscribers = subscribers;
            subscribers = [];
            _subscribers.forEach(s => s.error(e));
            cleanup();
        },
        next: v => {
            // Iterate over the copy: a subscriber may unsubscribe while
            // handling the value and removing it from the live array would
            // make the following subscriber miss this value.
            subscribers.slice().forEach(s => {
                // skip subscribers detached during this dispatch
                if (subscribers.indexOf(s) >= 0)
                    s.next(v);
            });
        }
    };

    return client => {
        const _subscribers = subscribers;
        subscribers.push(client);
        // the first subscriber starts the source producer
        if (subscribers.length === 1)
            cleanup = fusedProducer(sink) ?? noop;
        return () => {
            // the array is replaced when the source is finished, in this
            // case the subscriber is already detached
            if (_subscribers === subscribers) {
                const pos = subscribers.indexOf(client);
                if (pos >= 0)
                    subscribers.splice(pos, 1);
                // the last subscriber is gone, stop the source producer
                if (!subscribers.length)
                    cleanup();
            }
        };
    };
};