import { Cancellation } from "@implab/core-amd/Cancellation"; import { ICancellation } from "@implab/core-amd/interfaces"; /** * The interface for the consumer of an observable sequence */ export interface Observer { /** * Called for the next element in the sequence */ next: (value: T) => void; /** * Called once when the error occurs in the sequence. */ error: (e: unknown) => void; /** * Called once at the end of the sequence. */ complete: () => void; } /** * The group of functions to feed an observable. These methods are provided to * the producer to generate a stream of events. */ export type Sink = { /** * Call to send the next element in the sequence */ next: (value: T) => void; /** * Call to notify about the error occurred in the sequence. */ error: (e: unknown) => void; /** * Call to signal the end of the sequence. */ complete: () => void; /** * Checks whether the sink is accepting new elements. It's safe to * send elements to the closed sink. */ isClosed: () => boolean; }; export type Producer = (sink: Sink) => (void | (() => void)); export interface Unsubscribable { unsubscribe(): void; } export const isUnsubscribable = (v: unknown): v is Unsubscribable => v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; export const isSubscribable = (v: unknown): v is Subscribable => v !== null && v !== undefined && typeof (v as Subscribable).subscribe === "function"; export interface Subscribable { subscribe(consumer: Partial>): Unsubscribable; } export type AccumulatorFn = (acc: A, value: T) => A; export type OperatorFn = (source: Observable) => Producer; /** The observable source of items. */ export interface Observable extends Subscribable { /** Transforms elements of the sequence with the specified mapper * * @param mapper The mapper used to transform the values */ map(mapper: (value: T) => T2): Observable; /** Filters elements of the sequence. The resulting sequence will * contain only elements which match the specified predicate. * * @param predicate The filter predicate. */ filter(predicate: (value: T) => boolean): Observable; /** Completes the sequence once the condition is met. * @param predicate The condition which should be met to complete the sequence */ until(predicate: (value: T) => boolean): Observable; /** Keeps the sequence running while elements satisfy the condition. * * @param predicate The condition which should be met to continue. */ while(predicate: (value: T) => boolean): Observable; /** Applies accumulator to each value in the sequence and * emits the accumulated value for each source element * * @param accumulator * @param initial */ scan(accumulator: AccumulatorFn, initial: A): Observable; scan(accumulator: AccumulatorFn): Observable; /** Applies accumulator to each value in the sequence and * emits the accumulated value at the end of the sequence * * @param accumulator * @param initial */ reduce(accumulator: AccumulatorFn, initial: A): Observable; reduce(accumulator: AccumulatorFn): Observable; /** Concatenates the specified sequences with this observable * * @param seq sequences to concatenate with the current observable * * The concatenation doesn't accumulate values from the specified sequences, * The result of the concatenation is the new observable which will switch * to the next observable after the previous one completes. Values emitted * before the next observable being active are lost. */ cat(...seq: Subscribable[]): Observable; /** Pipes the specified operator to produce the new observable * @param op The operator consumes this observable and produces a new one * * The operator is a higher order function which takes a source observable * and returns a producer for the new observable. * * This function can be used to create a complex mapping between source and * resulting observables. The operator may have a state (or a side effect) * and can be connected to multiple observables. */ pipe(op: OperatorFn): Observable; /** Waits for the next event to occur and returns a promise for the next value * @param ct Cancellation token to */ next(ct?: ICancellation): Promise; } const noop = () => { }; const sink = (consumer: Partial>) => { const { next, error, complete } = consumer; return { next: next ? next.bind(consumer) : noop, error: error ? error.bind(consumer) : noop, complete: complete ? complete.bind(consumer) : noop, isClosed: () => false }; }; /** Wraps the producer to handle tear down logic and subscription management * * @param producer The producer to wrap * @returns The wrapper producer */ const fuse = (producer: Producer) => ({ next, error, complete }: Sink) => { let done = false; let cleanup = noop; const _fin = (fn: (...args: A) => void) => (...args: A) => done ? void (0) : (done = true, cleanup(), fn(...args)); const _fin0 = () => done ? void (0) : (done = true, cleanup()); const safeSink = { next: (value: T) => { !done && next(value); }, error: _fin(error), complete: _fin(complete), isClosed: () => done }; cleanup = producer(safeSink) ?? noop; return done ? cleanup() : _fin0; }; const _observe = (producer: Producer): Observable => ({ subscribe: (consumer: Partial>) => ({ unsubscribe: producer(sink(consumer)) ?? noop }), map: (mapper) => _observe(({ next, ...rest }) => producer({ next: next !== noop ? (v: T) => next(mapper(v)) : noop, ...rest }) ), filter: (predicate) => _observe(({ next, ...rest }) => producer({ next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, ...rest }) ), until: predicate => _observe(({ next, complete, ...rest }) => producer({ next: v => predicate(v) ? complete() : next(v), complete, ...rest }) ), while: predicate => _observe(({ next, complete, ...rest }) => producer({ next: v => predicate(v) ? next(v) : complete(), complete, ...rest }) ), scan: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, ...rest }) => { if (args.length === 1) { const [accumulator] = args; let _acc: T; let index = 0; return producer({ next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop, ...rest }); } else { const [accumulator, initial] = args; let _acc = initial; return producer({ next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, ...rest }); } }), reduce: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, complete, error, ...rest }) => { if (args.length === 1) { const [accumulator] = args; let _acc: T; let index = 0; return producer({ next: next !== noop ? (v: T) => { _acc = index++ === 0 ? v : accumulator(_acc, v); } : noop, complete: () => { if (index === 0) { error(new Error("The sequence can't be empty")); } else { next(_acc); complete(); } }, error, ...rest }); } else { const [accumulator, initial] = args; let _acc = initial; return producer({ next: next !== noop ? (v: T) => { _acc = accumulator(_acc, v); } : noop, complete: () => { next(_acc); complete(); }, error, ...rest }); } }), cat: (...seq) => _observe(({ next, complete: final, ...rest }) => { let cleanup: () => void; const complete = () => { const continuation = seq.shift(); if (continuation) { // if we have a next sequence, subscribe to it const subscription = continuation.subscribe({ next, complete, ...rest }); cleanup = subscription.unsubscribe.bind(subscription); } else { // otherwise notify the consumer about completion final(); } }; cleanup = producer({ next, complete, ...rest }) ?? noop; return () => cleanup(); }), pipe: (op: (source: Observable) => Producer) => observe(op(_observe(producer))), next: (ct?: ICancellation) => { const _ct = ct ?? Cancellation.none; return new Promise((resolve, reject) => { // wrap the producer to handle only single event const once = fuse(({ next, complete, error, isClosed }) => { const h = _ct.register(error); // is the _ct fires it will call error() and isClosed() will return true const cleanup = !isClosed() ? producer({ next: v => (next(v), complete()), complete: () => error(new Error("The sequence is empty")), error, isClosed }) ?? noop : noop; return () => { h.destroy(); cleanup(); }; }); once({ next: resolve, error: reject, complete: noop, isClosed: () => false }); }); } }); export const observe = (producer: Producer) => _observe(fuse(producer)); export const streamArray = (items: T[]) => _observe( ({ next, complete }) => ( items.forEach(next), complete() ) ); export const streamPromise = (promise: PromiseLike) => observe( ({ next, error, complete }) => void promise.then( v => (next(v), complete()), error ) ); export const of = (...items: T[]) => _observe( ({ next, complete }) => ( items.forEach(next), complete() ) ); export const empty = _observe(({ complete }) => complete()); /** * Creates a mutable state and the observable for the stored value. * * @param value The initial value for the state * @returns an array of three elements `[observable, setter, getter]` * * The returned observable keeps the actual value and will emit it as the next * element each time a consumer subscribes the observable. * * Calling the setter will update the stored value in the observable and notify * all consumers. */ export const stateful = (producer: Producer): Producer => { const fusedProducer = fuse(producer); type Status = "active" | "complete" | "error"; let lastValue: T; let hasValue = false; let status: Status = "active"; let lastError: unknown; let subscribers: Sink[] = []; const sink: Sink = { isClosed: () => status !== "active", complete: () => { if (status === "active") { status = "complete"; const _subscribers = subscribers; subscribers = []; _subscribers.forEach(s => s.complete()); } }, error: e => { if (status === "active") { status = "error"; lastError = e; const _subscribers = subscribers; subscribers = []; _subscribers.forEach(s => s.error(e)); } }, next: v => { if (status === "active") { hasValue = true; lastValue = v; const _subscribers = subscribers; _subscribers.forEach(s => s.next(v)); } } }; fusedProducer(sink); return (s: Sink) => { const _subscribers = subscribers; switch (status) { case "active": if (hasValue) s.next(lastValue); // if hasValue is true, // lastValue has a valid value subscribers.push(s); return () => { if (_subscribers === subscribers) { const pos = subscribers.indexOf(s); if (pos >= 0) subscribers.splice(pos, 1); } }; case "complete": s.complete(); break; case "error": s.error(lastError); break; } }; }; const subject = (producer: Producer): Producer => { const fusedProducer = fuse(producer); return () => { }; };