/** * The interface for the consumer of an observable sequence */ export interface Observer { /** * Called for the next element in the sequence */ next: (value: T) => void; /** * Called once when the error occurs in the sequence. */ error: (e: unknown) => void; /** * Called once at the end of the sequence. */ complete: () => void; } /** * The group of functions to feed an observable. This methods are provided to * the producer to generate a stream of events. */ export type Sink = { [k in keyof Observer]: (this: void, ...args: Parameters[k]>) => void; }; export type Producer = (sink: Sink) => (void | (() => void)); export interface Unsubscribable { unsubscribe(): void; } export const isUnsubsribable = (v: unknown): v is Unsubscribable => v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; export const isSubsribable = (v: unknown): v is Subscribable => v !== null && v !== undefined && typeof (v as Subscribable).subscribe === "function"; export interface Subscribable { subscribe(consumer: Partial>): Unsubscribable; } /** The observable source of items. */ export interface Observable extends Subscribable { /** Transforms elements of the sequence with the specified mapper * * @param mapper The mapper used to transform the values */ map(mapper: (value: T) => T2): Observable; /** Filters elements of the sequence. The resulting sequence will * contain only elements which match the specified predicate. * * @param predicate The filter predicate. */ filter(predicate: (value: T) => boolean): Observable; /** Applies accumulator to each value in the sequence and * emits the accumulated value for each source element * * @param accumulator * @param initial */ scan(accumulator: (acc: A, value: T) => A, initial: A): Observable; } const noop = () => { }; const sink = (consumer: Partial>) => { const { next, error, complete } = consumer; return { next: next ? next.bind(consumer) : noop, error: error ? error.bind(consumer) : noop, complete: complete ? complete.bind(consumer) : noop } }; const fuse = ({ next, error, complete }: Sink) => { let done = false; return { next: (value: T) => { !done && next(value) }, error: (e: unknown) => { !done && (done = true, error(e)) }, complete: () => { !done && (done = true, complete()) } } } const _observe = (producer: Producer): Observable => ({ subscribe: (consumer: Partial>) => ({ unsubscribe: producer(sink(consumer)) ?? noop }), map: (mapper) => _observe(({ next, error, complete }) => producer({ next: next !== noop ? (v: T) => next(mapper(v)) : noop, error, complete }) ), filter: (predicate) => _observe(({ next, error, complete }) => producer({ next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, error, complete }) ), scan: (accumulator, initial) => _observe(({ next, error, complete }) => { let _acc = initial; return producer({ next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, error, complete }); }) }); export const observe = (producer: Producer): Observable => ({ subscribe: (consumer: Partial>) => ({ unsubscribe: producer(fuse(sink(consumer))) ?? noop }), map: (mapper) => _observe(({ next, error, complete }) => producer(fuse({ next: next !== noop ? (v: T) => next(mapper(v)) : noop, error, complete })) ), filter: (predicate) => _observe(({ next, error, complete }) => producer(fuse({ next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, error, complete })) ), scan: (accumulator, initial) => observe(({ next, error, complete }) => { let _acc = initial; return producer(fuse({ next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, error, complete })); }) });