import { id as mid } from "module"; import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces"; import { TraceSource } from "@implab/core-amd/log/TraceSource"; import { Predicate } from "@implab/core-amd/interfaces"; import { Cancellation } from "@implab/core-amd/Cancellation"; const trace = TraceSource.get(mid); const noop = () => { }; const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => { let done = false; let cleanup = noop; const finalize = () => done ? false : (cleanup(), done = true); const isClosed = () => done; const teardown = (cb: void | (() => void)) => { if (cb) { if (done) { cb(); } else { const _prev = cleanup; cleanup = _prev === noop ? cb : () => (_prev(), cb()); } } }; const error = (err: unknown) => finalize() && reject(err); if (ct.isSupported()) { const h = ct.register(error); teardown(() => h.destroy()); } return { finalize, isClosed, teardown, error }; }; type FusedSink = Omit, "isClosed">; export const fuse = (producer: Producer) => (consumer: FusedSink) => { const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); const { teardown, isClosed, finalize, error } = subscription(reject); const next = consumer.next ? consumer.next.bind(consumer) : noop; teardown(producer({ next: next !== noop ? (v => isClosed() || next(v)) : noop, error, complete: () => finalize() && consumer.complete && consumer.complete(), isClosed })); return { unsubscribe: finalize }; }; type FusedProducer = (consumer: FusedSink) => void | (() => void); export class ObservableImpl implements Observable { private readonly _producer: FusedProducer; constructor(producer: FusedProducer) { this._producer = producer; } subscribe(consumer: Observer = {}) { const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); const next = consumer.next ? consumer.next.bind(consumer) : noop; const { teardown, isClosed, finalize, error } = subscription(reject); teardown((0, this._producer)({ next: next !== noop ? (v => isClosed() || next(v)) : noop, error, complete: () => finalize() && consumer.complete && consumer.complete() })); return { unsubscribe: finalize }; } map(mapper: (value: T) => T2) { return new ObservableImpl(({ next, ...rest }) => this._producer({ next: next !== noop ? (v: T) => next(mapper(v)) : noop, ...rest })); } tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer) { return new ObservableImpl(({ next, complete, error }) => this._producer({ next: tapNext ? (v => (tapNext(v), next(v))) : next, complete: tapComplete ? (() => (tapComplete(), complete())) : complete, error: tapError ? (e => (tapError(e), error(e))) : error })); } filter(predicate: Predicate) { return new ObservableImpl(({ next, ...rest }) => this._producer({ next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, ...rest }) ); } until(predicate: Predicate) { return new ObservableImpl(({ next, complete, ...rest }) => this._producer({ next: v => predicate(v) ? complete() : next(v), complete, ...rest }) ); } while(predicate: Predicate) { return new ObservableImpl(({ next, complete, ...rest }) => this._producer({ next: v => predicate(v) ? next(v) : complete(), complete, ...rest }) ); } scan(accumulator: AccumulatorFn, initial: A): Observable; scan(accumulator: AccumulatorFn): Observable; scan(...args: [AccumulatorFn, A] | [AccumulatorFn]) { if (args.length === 1) { return new ObservableImpl(({ next, ...rest }) => { const [accumulator] = args; let _acc: T; let index = 0; return this._producer({ next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop, ...rest }); }); } else { return new ObservableImpl(({ next, ...rest }) => { const [accumulator, initial] = args; let _acc = initial; return this._producer({ next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, ...rest }); }); } } reduce(accumulator: AccumulatorFn, initial: A): Observable; reduce(accumulator: AccumulatorFn): Observable; reduce(...args: [AccumulatorFn, A] | [AccumulatorFn]) { if (args.length === 1) { return new ObservableImpl(({ next, complete, error }) => { const [accumulator] = args; let _acc: T; let index = 0; return this._producer({ next: next !== noop ? (v: T) => { _acc = index++ === 0 ? v : accumulator(_acc, v); } : noop, complete: () => { if (index === 0) { error(new Error("The sequence can't be empty")); } else { next(_acc); complete(); } }, error }); }); } else { return new ObservableImpl(({ next, complete, error }) => { const [accumulator, initial] = args; let _acc = initial; return this._producer({ next: next !== noop ? (v: T) => { _acc = accumulator(_acc, v); } : noop, complete: () => { next(_acc); complete(); }, error }); }); } } cat(...seq: Subscribable[]) { return new ObservableImpl(({ next, complete: final, ...rest }) => { let cleanup: () => void; const len = seq.length; const complete = (i: number) => i < len ? () => { const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest }); cleanup = subscription.unsubscribe.bind(subscription); } : final; cleanup = this._producer({ next, complete: complete(0), ...rest }) ?? noop; return () => cleanup(); }); } pipe(op: OperatorFn): Observable { return op(this); } next(ct = Cancellation.none) { return new Promise((resolve, reject) => { ct.throwIfRequested(); const { teardown, finalize, isClosed, error } = subscription(reject, ct); teardown((0, this._producer)({ next: (item: T) => finalize() && resolve(item), complete: () => finalize() && reject(new Error("The sequence is empty")), error, isClosed })); }); } collect(ct = Cancellation.none) { return new Promise((resolve, reject) => { ct.throwIfRequested(); const acc: T[] = []; const { teardown, finalize, isClosed, error } = subscription(reject, ct); teardown((0, this._producer)({ next: (item: T) => isClosed() ? void (0) : acc.push(item), complete: () => finalize() && resolve(acc), error, isClosed })); }); } forEach(next: (item: T) => void, ct = Cancellation.none): Promise { return new Promise((resolve, reject) => { ct.throwIfRequested(); const { teardown, finalize, isClosed, error } = subscription(reject, ct); teardown((0, this._producer)({ next: item => !isClosed() && next(item), complete: () => finalize() && resolve(), error, isClosed })); }); } }