import { id as mid } from "module";
import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces";
import { TraceSource } from "@implab/core-amd/log/TraceSource";
import { Predicate } from "@implab/core-amd/interfaces";
import { Cancellation } from "@implab/core-amd/Cancellation";
import { Subscription, SubscriptionImpl } from "./SubscriptionImpl";

const trace = TraceSource.get(mid);

/**
 * Shared do-nothing handler. `subscribe` installs this exact function when the
 * consumer has no `next` handler, and several operators compare against it by
 * identity to skip work nobody would observe.
 */
const noop = () => { };

/** Logs an error that reached a consumer which did not supply an `error` handler. */
const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);

/** A sink without `isClosed`; the closed state is carried by the accompanying subscription. */
type FusedSink<T> = Omit<Sink<T>, "isClosed">;

/**
 * Adapts a plain producer to the fused form used by `ObservableImpl`.
 *
 * The producer receives a sink whose `next` is wrapped by `subscription.around`,
 * whose `complete`/`error` finalize the subscription before forwarding, and whose
 * `isClosed` is delegated to the subscription. Whatever the producer returns is
 * registered as the subscription teardown.
 *
 * @param producer The producer to adapt.
 * @returns A fused producer which returns the subscription it was given.
 */
export const fuse = <T>(producer: Producer<T>) =>
    (sink: FusedSink<T>, subscription: Subscription): Unsubscribable => {
        subscription.teardown(producer({
            next: subscription.around(sink.next),
            complete: () => subscription.finalize() && sink.complete(),
            error: err => subscription.finalize() && sink.error(err),
            isClosed: subscription.isClosed.bind(subscription)
        }));
        return subscription;
    };

/** A producer which is handed the subscription controlling its lifetime. */
type FusedProducer<T> = (sink: FusedSink<T>, subscription: Subscription) => void;

/**
 * Observable built around a fused producer. Each operator returns a new
 * `ObservableImpl` whose producer wraps the sink and hands it, together with
 * the same subscription, to the upstream producer.
 */
export class ObservableImpl<T> implements Observable<T> {

    private readonly _producer: FusedProducer<T>;

    /**
     * @param producer The fused producer invoked for every subscription.
     */
    constructor(producer: FusedProducer<T>) {
        this._producer = producer;
    }

    /**
     * Starts the producer for the specified consumer.
     *
     * Missing handlers are substituted: `next` becomes the shared `noop`,
     * `error` is routed to the trace log and `complete` does nothing.
     *
     * @param consumer The observer; every handler is optional.
     * @returns The subscription created for this consumer.
     */
    subscribe(consumer: Observer<T> = {}): Unsubscribable {
        const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
        const next = consumer.next ? consumer.next.bind(consumer) : noop;
        const complete = () => consumer.complete ? consumer.complete() : void (0);

        const subscription = new SubscriptionImpl();

        this._producer({ next, error, complete }, subscription);

        return subscription;
    }

    /**
     * Transforms every element with `mapper`.
     *
     * The mapper is not invoked at all when the downstream `next` is `noop`.
     *
     * @param mapper The function applied to each element.
     */
    map<T2>(mapper: (value: T) => T2) {
        return new ObservableImpl<T2>(({ next, ...rest }, subscription) =>
            (0, this._producer)(
                {
                    next: next !== noop ? (v: T) => next(mapper(v)) : noop,
                    ...rest
                },
                subscription
            ));
    }

    /**
     * Invokes the supplied handlers before the corresponding downstream
     * handlers. Handlers that are not supplied leave the signal untouched.
     *
     * @param param0 The observer whose handlers are called first.
     */
    tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
        return new ObservableImpl<T>(({ next, complete, error }, subscription) =>
            (0, this._producer)(
                {
                    next: tapNext ? (v => (tapNext(v), next(v))) : next,
                    complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
                    error: tapError ? (e => (tapError(e), error(e))) : error
                },
                subscription
            ));
    }

    /**
     * Passes only the elements for which `predicate` returns a truthy value.
     *
     * The predicate is not invoked at all when the downstream `next` is `noop`.
     *
     * @param predicate The test applied to each element.
     */
    filter(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, ...rest }, subscription) =>
            (0, this._producer)(
                {
                    next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
                    ...rest
                },
                subscription
            ));
    }

    /**
     * Emits elements until `predicate` matches; on the first matching element
     * the subscription is finalized and the sequence completes. The matching
     * element itself is not emitted.
     *
     * @param predicate The stop condition.
     */
    until(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
            (0, this._producer)({
                next: v => predicate(v) ? subscription.finalize() && complete() : next(v),
                complete,
                ...rest
            }, subscription)
        );
    }

    /**
     * Emits elements while `predicate` matches; on the first element which
     * does not match the subscription is finalized and the sequence completes.
     * The non-matching element itself is not emitted.
     *
     * @param predicate The continue condition.
     */
    while(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
            (0, this._producer)({
                next: v => predicate(v) ? next(v) : subscription.finalize() && complete(),
                complete,
                ...rest
            }, subscription)
        );
    }

    /**
     * Emits the running accumulation for every element.
     *
     * Without `initial` the first element is emitted as is and becomes the
     * starting accumulator value. The accumulator is not invoked when the
     * downstream `next` is `noop`.
     *
     * @param accumulator Combines the current accumulator value with an element.
     * @param initial Optional starting accumulator value.
     */
    scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
    scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
        if (args.length === 1) {
            const [accumulator] = args;

            return new ObservableImpl<T>(({ next, ...rest }, subscription) => {
                let acc: T;
                let index = 0;
                (0, this._producer)({
                    next: next !== noop ?
                        (v: T) => next(index++ === 0 ? acc = v : acc = accumulator(acc, v)) :
                        noop,
                    ...rest
                }, subscription);
            });
        } else {
            const [accumulator, initial] = args;

            return new ObservableImpl<A>(({ next, ...rest }, subscription) => {
                let acc = initial;
                (0, this._producer)({
                    next: next !== noop ?
                        (v: T) => next(acc = accumulator(acc, v)) :
                        noop,
                    ...rest
                }, subscription);
            });
        }
    }

    /**
     * Folds the whole sequence into a single value which is emitted right
     * before completion.
     *
     * Without `initial` the first element is the starting accumulator value
     * and an empty sequence is reported through `error` with
     * "The sequence can't be empty". The accumulator is not invoked when the
     * downstream `next` is `noop`.
     *
     * @param accumulator Combines the current accumulator value with an element.
     * @param initial Optional starting accumulator value.
     */
    reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
    reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
        if (args.length === 1) {
            const [accumulator] = args;

            return new ObservableImpl<T>(({ next, complete, error }, subscription) => {
                let acc: T;
                let index = 0;
                (0, this._producer)({
                    next: next !== noop ?
                        (v: T) => {
                            acc = index++ === 0 ? v : accumulator(acc, v);
                        } :
                        // Nobody consumes the result, so the accumulator is skipped,
                        // but elements must still be counted: otherwise a non-empty
                        // sequence would be reported as empty on completion.
                        () => {
                            index++;
                        },
                    complete: () => {
                        if (index === 0) {
                            error(new Error("The sequence can't be empty"));
                        } else {
                            next(acc);
                            complete();
                        }
                    },
                    error
                }, subscription);
            });
        } else {
            const [accumulator, initial] = args;

            return new ObservableImpl<A>(({ next, complete, error }, subscription) => {
                let acc = initial;
                (0, this._producer)({
                    next: next !== noop ?
                        (v: T) => {
                            acc = accumulator(acc, v);
                        } :
                        noop,
                    complete: () => {
                        next(acc);
                        complete();
                    },
                    error
                }, subscription);
            });
        }
    }

    /**
     * Concatenates this sequence with the specified ones: each source is
     * subscribed only after the previous one completes, and the resulting
     * sequence completes after the last source does.
     *
     * @param seq The sequences appended after this one, in order.
     */
    cat(...seq: Subscribable<T>[]) {
        return new ObservableImpl<T>(({ next, complete: final, ...rest }, subscription) => {
            const sources: Subscribable<T>[] = [this, ...seq];

            // Index of the source which is currently expected to produce elements.
            let position = 0;
            // Handle of the source at `position`, if it is still running.
            let active: Unsubscribable | undefined;

            const subscribeCurrent = (): void => {
                const index = position;
                if (index >= sources.length) {
                    if (subscription.finalize())
                        final();
                    return;
                }

                const handle = sources[index].subscribe({
                    next,
                    complete: () => {
                        position = index + 1;
                        subscribeCurrent();
                    },
                    ...rest
                });

                // A source may complete synchronously inside `subscribe`; by then a
                // later source has already taken over and its handle must not be
                // overwritten by the handle of the finished one.
                if (position === index)
                    active = handle;
            };

            subscribeCurrent();

            subscription.teardown(() => {
                if (active)
                    active.unsubscribe();
            });
        });
    }

    /**
     * Applies the operator to this observable.
     *
     * @param op The operator function.
     * @returns Whatever the operator returns for this observable.
     */
    pipe<U>(op: OperatorFn<T, U>): Observable<U> {
        return op(this);
    }

    /**
     * Waits for the first element of the sequence.
     *
     * The promise is rejected with "The sequence is empty" when the sequence
     * completes without elements, and with the error when the sequence fails.
     *
     * @param ct Cancellation token; checked before subscribing and attached to
     *  the subscription together with the promise's `reject`.
     * @returns A promise resolved with the first element.
     */
    next(ct = Cancellation.none) {
        return new Promise<T>((resolve, reject) => {
            ct.throwIfRequested();

            const subscription = new SubscriptionImpl();
            subscription.cancellable(ct, reject);
            (0, this._producer)({
                next: value => {
                    // Only the first element is needed: finalize the subscription
                    // so the source does not keep producing after the promise is
                    // settled.
                    subscription.finalize();
                    resolve(value);
                },
                complete: () => reject(new Error("The sequence is empty")),
                error: reject,
            }, subscription);
        });
    }

    /**
     * Gathers all elements of the sequence into an array.
     *
     * @param ct Cancellation token; checked before subscribing and attached to
     *  the subscription together with the promise's `reject`.
     * @returns A promise resolved with the collected elements on completion
     *  and rejected when the sequence fails.
     */
    collect(ct = Cancellation.none) {
        return new Promise<T[]>((resolve, reject) => {
            ct.throwIfRequested();

            const subscription = new SubscriptionImpl();
            subscription.cancellable(ct, reject);

            const acc: T[] = [];

            (0, this._producer)({
                next: (item: T) => acc.push(item),
                complete: () => resolve(acc),
                error: reject
            }, subscription);
        });
    }

    /**
     * Invokes `next` for every element of the sequence.
     *
     * @param next The callback receiving each element.
     * @param ct Cancellation token; checked before subscribing and attached to
     *  the subscription together with the promise's `reject`.
     * @returns A promise resolved on completion and rejected when the
     *  sequence fails.
     */
    forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            ct.throwIfRequested();

            const subscription = new SubscriptionImpl();
            subscription.cancellable(ct, reject);

            (0, this._producer)({
                next,
                complete: resolve,
                error: reject
            }, subscription);
        });
    }
}