observable.ts
305 lines
| 9.4 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r116 | import { Cancellation } from "@implab/core-amd/Cancellation"; | |
import { ICancellation } from "@implab/core-amd/interfaces"; | |||
cin
|
r110 | ||
cin
|
/**
 * The interface for the consumer of an observable sequence
 */
export interface Observer<T> {
    /**
     * Called for the next element in the sequence
     */
    next: (value: T) => void;

    /**
     * Called once when the error occurs in the sequence.
     */
    error: (e: unknown) => void;

    /**
     * Called once at the end of the sequence.
     */
    complete: () => void;
}
cin
|
/**
 * The group of functions to feed an observable. These methods are provided to
 * the producer to generate a stream of events.
 */
export type Sink<T> = {
    /**
     * Call to send the next element in the sequence
     */
    next: (value: T) => void;

    /**
     * Call to notify about the error occurred in the sequence.
     */
    error: (e: unknown) => void;

    /**
     * Call to signal the end of the sequence.
     */
    complete: () => void;

    /**
     * Returns `true` when the sink no longer accepts new elements. It's
     * safe to send elements to the closed sink.
     */
    isClosed: () => boolean;
};
cin
|
r96 | ||
/**
 * The function which generates events and feeds them to the specified sink.
 * It may return a tear down function; when nothing is returned no tear down
 * is performed.
 */
export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
cin
|
/** The handle used to stop receiving events from a subscription. */
export interface Unsubscribable {
    unsubscribe(): void;
}

/**
 * Checks whether the specified value has the `unsubscribe` method.
 *
 * @param v The value to check
 * @returns `true` if `v` is neither `null` nor `undefined` and its
 *  `unsubscribe` member is a function.
 */
export const isUnsubsribable = (v: unknown): v is Unsubscribable =>
    v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
cin
|
/**
 * Checks whether the specified value has the `subscribe` method.
 *
 * @param v The value to check
 * @returns `true` if `v` is neither `null` nor `undefined` and its
 *  `subscribe` member is a function. The element type `T` is not checked
 *  at runtime.
 */
export const isSubsribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
    v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
cin
|
r96 | ||
cin
|
/** The source of events to which a consumer can be attached. */
export interface Subscribable<T> {
    /**
     * Attaches the consumer to the source.
     *
     * @param consumer The consumer of the events, any of its callbacks may
     *  be omitted.
     * @returns The handle used to cancel the subscription.
     */
    subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
}
cin
|
/**
 * The function which combines the accumulated value with the next element
 * of the sequence and returns the new accumulated value.
 */
export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
cin
|
/** The observable source of items. */
export interface Observable<T> extends Subscribable<T> {
    /** Transforms elements of the sequence with the specified mapper
     *
     * @param mapper The mapper used to transform the values
     */
    map<T2>(mapper: (value: T) => T2): Observable<T2>;

    /** Filters elements of the sequence. The resulting sequence will
     * contain only elements which match the specified predicate.
     *
     * @param predicate The filter predicate.
     */
    filter(predicate: (value: T) => boolean): Observable<T>;

    /** Applies accumulator to each value in the sequence and
     * emits the accumulated value for each source element
     *
     * @param accumulator The function which produces the next accumulated value
     * @param initial The initial accumulated value. When omitted the first
     *  element of the sequence is used.
     */
    scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    scan(accumulator: AccumulatorFn<T, T>): Observable<T>;

    /** Applies accumulator to each value in the sequence and
     * emits the accumulated value at the end of the sequence
     *
     * @param accumulator The function which produces the next accumulated value
     * @param initial The initial accumulated value. When omitted the first
     *  element of the sequence is used and the sequence must not be empty.
     */
    reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;

    /** Concatenates the specified sequences with this observable
     *
     * @param seq sequences to concatenate with the current observable
     */
    cat(...seq: Subscribable<T>[]): Observable<T>;

    /** Pipes the specified operator to produce the new observable
     * @param op The operator which consumes this observable and produces a new one
     */
    pipe<U>(op: (source: Observable<T>) => Producer<U>): Observable<U>;

    /** Waits for the next event to occur and returns a promise for the next value
     * @param ct Cancellation token used to stop waiting for the value
     */
    next(ct?: ICancellation): Promise<T>;
}
/**
 * The function which does nothing. It is used as the placeholder for missing
 * callbacks and is compared by identity to detect that a callback is absent.
 */
const noop = () => { };
/** Creates the sink which forwards events to the specified consumer.
 *
 * Callbacks present in the consumer are bound to it, the missing ones are
 * replaced with `noop`. The created sink never reports itself as closed.
 *
 * @param consumer The consumer of the events
 * @returns The sink for the specified consumer
 */
const sink = <T>(consumer: Partial<Observer<T>>) => {
    const { next, error, complete } = consumer;
    return {
        next: next ? next.bind(consumer) : noop,
        error: error ? error.bind(consumer) : noop,
        complete: complete ? complete.bind(consumer) : noop,
        isClosed: () => false
    };
};
cin
|
/** Wraps the producer to handle tear down logic and subscription management
 *
 * The sink passed to the wrapped producer stops forwarding elements after
 * the first `error()` or `complete()` call or after the returned tear down
 * function has been called. The producer's tear down is invoked once at
 * that moment.
 *
 * @param producer The producer to wrap
 * @returns The wrapper producer
 */
const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
    let done = false;
    // replaced with the actual tear down as soon as the producer returns
    let cleanup = noop;

    // makes a terminal handler: the first call marks the sink as closed,
    // runs the tear down and then calls `fn`, subsequent calls are ignored
    const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
        (...args: A) => done ?
            void (0) :
            (done = true, cleanup(), fn(...args));

    const safeSink = {
        next: (value: T) => { !done && next(value); },
        error: _fin(error),
        complete: _fin(complete),
        isClosed: () => done
    };

    cleanup = producer(safeSink) ?? noop;

    // if the producer has terminated synchronously its tear down wasn't
    // known at that moment, run it now; otherwise return the function
    // which closes the sink and runs the tear down
    return done ?
        (cleanup(), noop) :
        _fin(noop);
};
cin
|
r96 | ||
cin
|
/** Creates the observable for the specified producer.
 *
 * The producer is called for every subscription and is used as is, the
 * operators wrap the sink passed to the producer.
 *
 * @param producer The producer of the events
 * @returns The observable backed by the producer
 */
const _observe = <T>(producer: Producer<T>): Observable<T> => ({
    subscribe: (consumer: Partial<Observer<T>>) => ({
        unsubscribe: producer(sink(consumer)) ?? noop
    }),

    // when the consumer has no `next` callback the mapper isn't invoked
    map: (mapper) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ? (v: T) => next(mapper(v)) : noop,
            ...rest
        })
    ),

    // when the consumer has no `next` callback the predicate isn't invoked
    filter: (predicate) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
            ...rest
        })
    ),

    scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
        if (args.length === 1) {
            // no initial value: the first element seeds the accumulator
            const [accumulator] = args;
            let _acc: T;
            let index = 0;
            return producer({
                next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
                ...rest
            });
        } else {
            const [accumulator, initial] = args;
            let _acc = initial;
            return producer({
                next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
                ...rest
            });
        }
    }),

    reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
        if (args.length === 1) {
            const [accumulator] = args;
            let _acc: T;
            let index = 0;
            return producer({
                // elements are always counted, even when the consumer has
                // no `next` callback, otherwise a non-empty sequence would
                // be reported as empty on completion
                next: (v: T) => {
                    _acc = index++ === 0 ? v : accumulator(_acc, v);
                },
                complete: () => {
                    if (index === 0) {
                        error(new Error("The sequence can't be empty"));
                    } else {
                        next(_acc);
                        complete();
                    }
                },
                error,
                ...rest
            });
        } else {
            const [accumulator, initial] = args;
            let _acc = initial;
            return producer({
                // the result is discarded when the consumer has no `next`
                // callback, therefore the accumulation can be skipped
                next: next !== noop ? (v: T) => {
                    _acc = accumulator(_acc, v);
                } : noop,
                complete: () => {
                    next(_acc);
                    complete();
                },
                error,
                ...rest
            });
        }
    }),

    cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
        // the position of the next sequence, it is kept per subscription
        // and `seq` is never modified, thus the resulting observable can
        // be subscribed more than once
        let pos = 0;
        // identifies the most recently started source
        let stage = 0;
        let cleanup: () => void = noop;

        // starts the source and remembers its tear down unless a later
        // source has been started while this one was running, which
        // happens when the source completes synchronously
        const start = (run: () => void | (() => void)) => {
            const id = ++stage;
            const teardown = run() ?? noop;
            if (id === stage)
                cleanup = teardown;
        };

        const complete = () => {
            const continuation = seq[pos++];
            if (continuation) {
                // if we have a next sequence, subscribe to it
                start(() => {
                    const subscription = continuation.subscribe({ next, complete, ...rest });
                    return () => subscription.unsubscribe();
                });
            } else {
                // otherwise notify the consumer about completion
                final();
            }
        };

        start(() => producer({ next, complete, ...rest }));
        return () => cleanup();
    }),

    pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),

    next: (ct?: ICancellation) => {
        const _ct = ct ?? Cancellation.none;
        return new Promise<T>((resolve, reject) => {
            // wrap the producer to handle only single event
            const once = fuse<T>(({ next, complete, error, isClosed }) => {
                const h = _ct.register(error);

                // if the _ct fires it will call error() and isClosed() will return true
                const cleanup = !isClosed() ?
                    producer({
                        next: v => (next(v), complete()),
                        complete: () => error(new Error("The sequence is empty")),
                        error,
                        isClosed
                    }) ?? noop :
                    noop;

                return () => {
                    h.destroy();
                    cleanup();
                };
            });

            once({
                next: resolve,
                error: reject,
                complete: noop,
                isClosed: () => false
            });
        });
    }
});
cin
|
r110 | ||
/** Creates the observable for the specified producer.
 *
 * The producer is wrapped with `fuse`, so the events sent after the end of
 * the sequence are ignored and the tear down returned by the producer is
 * handled by the wrapper.
 *
 * @param producer The producer of the events
 * @returns The observable backed by the producer
 */
export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
cin
|
/** Creates the observable which emits the items of the specified array and
 * then completes. The items are emitted synchronously on every subscription.
 *
 * @param items The items to emit
 * @returns The observable sequence of the items
 */
export const streamArray = <T>(items: readonly T[]) => _observe<T>(
    ({ next, complete }) => {
        // call next() with the item only, forEach would also pass the index
        // and the array itself to the consumer callback
        for (const item of items)
            next(item);
        complete();
    }
);
cin
|
r110 | ||
cin
|
/** Creates the observable from the specified promise.
 *
 * When the promise is fulfilled the observable emits its value and
 * completes, when the promise is rejected the observable reports the error.
 *
 * @param promise The promise to observe
 * @returns The observable sequence with the result of the promise
 */
export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
    ({ next, error, complete }) => void promise.then(v => (next(v), complete()), error)
);
cin
|
r110 | ||
cin
|
/** Creates the observable which emits the specified items and then
 * completes. The items are emitted synchronously on every subscription.
 *
 * @param items The items to emit
 * @returns The observable sequence of the items
 */
export const of = <T>(...items: T[]) => _observe<T>(
    ({ next, complete }) => {
        // call next() with the item only, forEach would also pass the index
        // and the array itself to the consumer callback
        for (const item of items)
            next(item);
        complete();
    }
);
cin
|
r110 | ||
cin
|
/** The observable which emits no elements and completes immediately on
 * subscription.
 */
export const empty = _observe<never>(({ complete }) => complete());