# HG changeset patch # User cin # Date 2023-09-30 21:53:27 # Node ID f9518367061a0fc344eecc34563d613c1b1765ba # Parent 46e0da3cebdf9d503e451e6aa4f320f0e7784828 WIP observables diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -1,5 +1,5 @@ import { isPromise } from "@implab/core-amd/safe"; -import { Unsubscribable, Producer, FusedSink, Observable, Subscribable } from "./observable/interfaces"; +import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces"; import { ObservableImpl } from "./observable/ObservableImpl"; export const isUnsubscribable = (v: unknown): v is Unsubscribable => @@ -8,46 +8,10 @@ export const isUnsubscribable = (v: unkn export const isSubscribable = (v: unknown): v is Subscribable => v !== null && v !== undefined && typeof (v as Subscribable).subscribe === "function"; -const noop = () => { }; - -/** Wraps the producer to handle tear down logic and subscription management - * - * The resulting producer will invoke cleanup logic on error or complete events - * and will prevent calling of any method from the sink. - * - * @param producer The producer to wrap - * @returns The wrapper producer - */ -const fuse = (producer: Producer) => ({ next, error, complete }: FusedSink) => { - let done = false; - let cleanup = noop; - - const _fin = (fn: (...args: A) => void) => - (...args: A) => done ? - void (0) : - (done = true, cleanup(), fn(...args)); - - const _fin0 = () => done ? void (0) : (done = true, cleanup()); - - const safeSink = { - next: (value: T) => { !done && next(value); }, - error: _fin(error), - complete: _fin(complete), - isClosed: () => done - }; - // call the producer - cleanup = producer(safeSink) ?? noop; - // if the producer throws exception bypass it to the caller rather then to - // the sink. This is a feature. - - // if the producer completed the sequence immediately call the cleanup in place - return done ? cleanup() : _fin0; -}; - -export const observe = (producer: Producer): Observable => new ObservableImpl(fuse(producer)); +export const observe = (producer: Producer): Observable => new ObservableImpl(producer); /** Converts an array to the observable sequence of its elements. */ -export const ofArray = (items: T[]) => new ObservableImpl( +export const ofArray = (items: T[]): Observable => new ObservableImpl( ({ next, complete }) => ( items.forEach(next), complete() @@ -100,6 +64,6 @@ export const of = (...items: (T | Pro } ); -export const empty = new ObservableImpl(({ complete }) => complete()); +export const empty: Observable = new ObservableImpl(({ complete }) => complete()); export type * from "./observable/interfaces"; \ No newline at end of file diff --git a/djx/src/main/ts/observable/ObservableImpl.ts b/djx/src/main/ts/observable/ObservableImpl.ts --- a/djx/src/main/ts/observable/ObservableImpl.ts +++ b/djx/src/main/ts/observable/ObservableImpl.ts @@ -1,5 +1,5 @@ import { id as mid } from "module"; -import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Subscribable } from "./interfaces"; +import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces"; import { TraceSource } from "@implab/core-amd/log/TraceSource"; import { Predicate } from "@implab/core-amd/interfaces"; import { Cancellation } from "@implab/core-amd/Cancellation"; @@ -10,7 +10,7 @@ const noop = () => { }; const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); -const subscription = () => { +const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => { let done = false; let cleanup = noop; @@ -23,30 +23,60 @@ const subscription = () => { cb(); } else { const _prev = cleanup; - cleanup = () => (_prev(), cb()); + cleanup = _prev === noop ? cb : () => (_prev(), cb()); } } }; - return { finalize, isClosed, teardown }; + const error = (err: unknown) => finalize() && reject(err); + + if (ct.isSupported()) { + const h = ct.register(error); + teardown(() => h.destroy()); + } + + return { finalize, isClosed, teardown, error }; }; +type FusedSink = Omit, "isClosed">; + +export const fuse = (producer: Producer) => (consumer: FusedSink) => { + + const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); + const { teardown, isClosed, finalize, error } = subscription(reject); + + const next = consumer.next ? consumer.next.bind(consumer) : noop; + + teardown(producer({ + next: next !== noop ? (v => isClosed() || next(v)) : noop, + error, + complete: () => finalize() && consumer.complete && consumer.complete(), + isClosed + })); + + return { unsubscribe: finalize }; +}; + +type FusedProducer = (consumer: FusedSink) => void | (() => void); + export class ObservableImpl implements Observable { - private readonly _producer: Producer; + private readonly _producer: FusedProducer; - constructor(producer: Producer) { + constructor(producer: FusedProducer) { this._producer = producer; } subscribe(consumer: Observer = {}) { - const { teardown, isClosed, finalize } = subscription(); + const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); + const next = consumer.next ? consumer.next.bind(consumer) : noop; + + const { teardown, isClosed, finalize, error } = subscription(reject); - teardown(this._producer({ - next: consumer.next ? consumer.next.bind(consumer) : noop, - error: err => finalize() && (consumer.error ? consumer.error(err) : errorFallback(err)), - complete: () => finalize() && consumer.complete && consumer.complete(), - isClosed + teardown((0, this._producer)({ + next: next !== noop ? (v => isClosed() || next(v)) : noop, + error, + complete: () => finalize() && consumer.complete && consumer.complete() })); return { unsubscribe: finalize }; @@ -61,12 +91,11 @@ export class ObservableImpl implement } tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer) { - return new ObservableImpl(({ next, complete, error, isClosed }) => + return new ObservableImpl(({ next, complete, error }) => this._producer({ next: tapNext ? (v => (tapNext(v), next(v))) : next, complete: tapComplete ? (() => (tapComplete(), complete())) : complete, - error: tapError ? (e => (tapError(e), error(e))) : error, - isClosed + error: tapError ? (e => (tapError(e), error(e))) : error })); } @@ -99,6 +128,8 @@ export class ObservableImpl implement ); } + scan(accumulator: AccumulatorFn, initial: A): Observable; + scan(accumulator: AccumulatorFn): Observable; scan(...args: [AccumulatorFn, A] | [AccumulatorFn]) { if (args.length === 1) { return new ObservableImpl(({ next, ...rest }) => { @@ -122,10 +153,12 @@ export class ObservableImpl implement } } + reduce(accumulator: AccumulatorFn, initial: A): Observable; + reduce(accumulator: AccumulatorFn): Observable; reduce(...args: [AccumulatorFn, A] | [AccumulatorFn]) { if (args.length === 1) { - return new ObservableImpl(({ next, complete, error, isClosed }) => { + return new ObservableImpl(({ next, complete, error }) => { const [accumulator] = args; let _acc: T; let index = 0; @@ -141,12 +174,11 @@ export class ObservableImpl implement complete(); } }, - error, - isClosed + error }); }); } else { - return new ObservableImpl(({ next, complete, error, isClosed }) => { + return new ObservableImpl(({ next, complete, error }) => { const [accumulator, initial] = args; let _acc = initial; return this._producer({ @@ -157,8 +189,7 @@ export class ObservableImpl implement next(_acc); complete(); }, - error, - isClosed + error }); }); } @@ -188,11 +219,7 @@ export class ObservableImpl implement return new Promise((resolve, reject) => { ct.throwIfRequested(); - const { teardown, finalize, isClosed } = subscription(); - - const error = (err: unknown) => finalize() && reject(err); - const h = ct.register(error); - teardown(() => h.destroy()); + const { teardown, finalize, isClosed, error } = subscription(reject, ct); teardown((0, this._producer)({ next: (item: T) => finalize() && resolve(item), complete: () => finalize() && reject(new Error("The sequence is empty")), @@ -209,11 +236,7 @@ export class ObservableImpl implement const acc: T[] = []; - const { teardown, finalize, isClosed } = subscription(); - const error = (err: unknown) => finalize() && reject(err); - - const h = ct.register(error); - teardown(() => h.destroy()); + const { teardown, finalize, isClosed, error } = subscription(reject, ct); teardown((0, this._producer)({ next: (item: T) => isClosed() ? void (0) : acc.push(item), complete: () => finalize() && resolve(acc), @@ -223,4 +246,19 @@ export class ObservableImpl implement }); } + forEach(next: (item: T) => void, ct = Cancellation.none): Promise { + return new Promise((resolve, reject) => { + ct.throwIfRequested(); + + const { teardown, finalize, isClosed, error } = subscription(reject, ct); + + teardown((0, this._producer)({ + next: item => !isClosed() && next(item), + complete: () => finalize() && resolve(), + error, + isClosed + })); + }); + } + } \ No newline at end of file diff --git a/djx/src/main/typings/observable/interfaces.d.ts b/djx/src/main/typings/observable/interfaces.d.ts --- a/djx/src/main/typings/observable/interfaces.d.ts +++ b/djx/src/main/typings/observable/interfaces.d.ts @@ -52,10 +52,6 @@ export type Sink = { export type Producer = (sink: Sink) => (void | (() => void)); -export type FusedSink = Omit, "isClosed">; - -export type FusedProducer = (sink: FusedSink) => (void | (() => void)); - export interface Unsubscribable { unsubscribe(): void; } @@ -153,4 +149,16 @@ export interface Observable extends S /** Collects items of the sequence to the array. */ collect(ct?: ICancellation): Promise; + + /** + * Iterates through the elements in this observable until the end of the + * sequence is reached or the operation is cancelled. + * + * @param next The callback for the next item in the sequence + * @param ct The optional cancellation token for this operation + * + * @returns A promise which will be fulfilled when the operation is completed. + * In case of the cancellation this promise will be rejected. + */ + forEach(next: (item: T) => void, ct?: ICancellation): Promise; } \ No newline at end of file diff --git a/djx/src/test/ts/observable-tests.ts b/djx/src/test/ts/observable-tests.ts --- a/djx/src/test/ts/observable-tests.ts +++ b/djx/src/test/ts/observable-tests.ts @@ -3,10 +3,13 @@ import * as tap from "tap"; import { Cancellation } from "@implab/core-amd/Cancellation"; import { delay } from "@implab/core-amd/safe"; -const subj1 = observe(({ next, complete }) => { +const subj1 = observe(({ next, error, complete }) => { next(1); complete(); next(2); + next(3); + error(new Error("This error should be ignored")); + next(4); }); const consumer1 = {