import { id as mid} from "module"; import { Cancellation } from "@implab/core-amd/Cancellation"; import { TraceSource } from "@implab/core-amd/log/TraceSource"; import { isPromise } from "@implab/core-amd/safe"; import { Unsubscribable, Observer, Producer, FusedSink, FusedProducer, AccumulatorFn, Observable, OperatorFn, Subscribable } from "./observable/interfaces"; export const isUnsubscribable = (v: unknown): v is Unsubscribable => v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; export const isSubscribable = (v: unknown): v is Subscribable => v !== null && v !== undefined && typeof (v as Subscribable).subscribe === "function"; const noop = () => { }; const sink = (consumer: Observer) => { // eslint-disable-next-line @typescript-eslint/unbound-method const { next, error, complete } = consumer; return { next: next ? next.bind(consumer) : noop, error: error ? error.bind(consumer) : errorFallback, // report unhandled errors complete: complete ? complete.bind(consumer) : noop }; }; /** Wraps the producer to handle tear down logic and subscription management * * The resulting producer will invoke cleanup logic on error or complete events * and will prevent calling of any method from the sink. * * @param producer The producer to wrap * @returns The wrapper producer */ const fuse = (producer: Producer) => ({ next, error, complete }: FusedSink) => { let done = false; let cleanup = noop; const _fin = (fn: (...args: A) => void) => (...args: A) => done ? void (0) : (done = true, cleanup(), fn(...args)); const _fin0 = () => done ? void (0) : (done = true, cleanup()); const safeSink = { next: (value: T) => { !done && next(value); }, error: _fin(error), complete: _fin(complete), isClosed: () => done }; // call the producer cleanup = producer(safeSink) ?? noop; // if the producer throws exception bypass it to the caller rather then to // the sink. This is a feature. // if the producer completed the sequence immediately call the cleanup in place return done ? cleanup() : _fin0; }; const _observe = (producer: FusedProducer): Observable => ({ subscribe: (consumer: Observer = {}) => ({ unsubscribe: producer(sink(consumer)) ?? noop }), map: (mapper) => _observe(({ next, ...rest }) => producer({ next: next !== noop ? (v: T) => next(mapper(v)) : noop, ...rest }) ), tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) => producer({ next: tapNext ? (v => (tapNext(v), next(v))) : next, complete: tapComplete ? (() => (tapComplete(), complete())): complete, error: tapError ? (e => (tapError(e), error(e))) : error }) ), filter: (predicate) => _observe(({ next, ...rest }) => producer({ next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, ...rest }) ), until: predicate => _observe(({ next, complete, ...rest }) => producer({ next: v => predicate(v) ? complete() : next(v), complete, ...rest }) ), while: predicate => _observe(({ next, complete, ...rest }) => producer({ next: v => predicate(v) ? next(v) : complete(), complete, ...rest }) ), scan: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, ...rest }) => { if (args.length === 1) { const [accumulator] = args; let _acc: T; let index = 0; return producer({ next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop, ...rest }); } else { const [accumulator, initial] = args; let _acc = initial; return producer({ next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, ...rest }); } }), reduce: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, complete, error }) => { if (args.length === 1) { const [accumulator] = args; let _acc: T; let index = 0; return producer({ next: next !== noop ? (v: T) => { _acc = index++ === 0 ? v : accumulator(_acc, v); } : noop, complete: () => { if (index === 0) { error(new Error("The sequence can't be empty")); } else { next(_acc); complete(); } }, error }); } else { const [accumulator, initial] = args; let _acc = initial; return producer({ next: next !== noop ? (v: T) => { _acc = accumulator(_acc, v); } : noop, complete: () => { next(_acc); complete(); }, error }); } }), cat: (...seq) => _observe(({ next, complete: final, ...rest }) => { let cleanup: () => void; const len = seq.length; const complete = (i: number) => i < len ? () => { const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest }); cleanup = subscription.unsubscribe.bind(subscription); } : final; cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop; return () => cleanup(); }), pipe: (op: OperatorFn) => op(_observe(producer)), next: collect( producer, ({ next, complete, error }) => ({ next: v => (next(v), complete()), complete: () => error(new Error("The sequence is empty")), error }) ), collect: collect( producer, ({ next, complete, error}) => { const data: T[] = []; return { next: v => data.push(v), complete: () => (next(data), complete()), error }; } ) }); const collect = ( producer: FusedProducer, collector: (result: FusedSink) => FusedSink ) => (ct = Cancellation.none) => new Promise((resolve, reject) => { const fused = fuse(({ next, complete, error, isClosed }) => { const h = ct.register(error); const cleanup = !isClosed() ? producer(collector({ next, complete, error })) ?? noop : noop; return () => { h.destroy(); cleanup(); }; }); fused({ next: resolve, error: reject, complete: noop }); }); export const observe = (producer: Producer) => _observe(fuse(producer)); /** Converts an array to the observable sequence of its elements. */ export const ofArray = (items: T[]) => _observe( ({ next, complete }) => ( items.forEach(next), complete() ) ); /** Converts a subscribable to the observable */ export const ofSubscribable = (subscribable: Subscribable) => observe(sink => { const subscription = subscribable.subscribe(sink); return () => subscription.unsubscribe(); }); const of1 = (item: T | PromiseLike) => observe( ({ next, error, complete }) => isPromise(item) ? void item.then( v => (next(v), complete()), error ) : (next(item), complete()) ); /** Converts a list of parameter values to the observable sequence. The * order of elements in the list will be preserved in the resulting sequence. */ export const of = (...items: (T | PromiseLike)[]) => items.length === 1 ? of1(items[0]) : observe( ({ next, error, complete, isClosed }) => { const n = items.length; const _next = (start: number) => { if (start > 0 && isClosed()) // when resumed return; for (let i = start; i < n; i++) { const r = items[i]; if (isPromise(r)) { r.then(v => (next(v), _next(i + 1)), error); return; // suspend } else { next(r); } } complete(); }; _next(0); } ); export const empty = _observe(({ complete }) => complete());