diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -1,6 +1,6 @@ import { isPromise } from "@implab/core-amd/safe"; import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces"; -import { ObservableImpl } from "./observable/ObservableImpl"; +import { ObservableImpl, fuse } from "./observable/ObservableImpl"; export const isUnsubscribable = (v: unknown): v is Unsubscribable => v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; @@ -8,7 +8,7 @@ export const isUnsubscribable = (v: unkn export const isSubscribable = (v: unknown): v is Subscribable => v !== null && v !== undefined && typeof (v as Subscribable).subscribe === "function"; -export const observe = (producer: Producer): Observable => new ObservableImpl(producer); +export const observe = (producer: Producer): Observable => new ObservableImpl(fuse(producer)); /** Converts an array to the observable sequence of its elements. */ export const ofArray = (items: T[]): Observable => new ObservableImpl( diff --git a/djx/src/main/ts/observable/ObservableImpl.ts b/djx/src/main/ts/observable/ObservableImpl.ts --- a/djx/src/main/ts/observable/ObservableImpl.ts +++ b/djx/src/main/ts/observable/ObservableImpl.ts @@ -3,6 +3,7 @@ import { AccumulatorFn, Observable, Obse import { TraceSource } from "@implab/core-amd/log/TraceSource"; import { Predicate } from "@implab/core-amd/interfaces"; import { Cancellation } from "@implab/core-amd/Cancellation"; +import { Subscription, SubscriptionImpl } from "./SubscriptionImpl"; const trace = TraceSource.get(mid); @@ -10,54 +11,23 @@ const noop = () => { }; const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); -const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => { - let done = false; - let cleanup = noop; - - const finalize = () => done ? false : (cleanup(), done = true); - const isClosed = () => done; - - const teardown = (cb: void | (() => void)) => { - if (cb) { - if (done) { - cb(); - } else { - const _prev = cleanup; - cleanup = _prev === noop ? cb : () => (_prev(), cb()); - } - } - }; - - const error = (err: unknown) => finalize() && reject(err); - - if (ct.isSupported()) { - const h = ct.register(error); - teardown(() => h.destroy()); - } - - return { finalize, isClosed, teardown, error }; -}; type FusedSink = Omit, "isClosed">; -export const fuse = (producer: Producer) => (consumer: FusedSink) => { - - const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); - const { teardown, isClosed, finalize, error } = subscription(reject); - - const next = consumer.next ? consumer.next.bind(consumer) : noop; +export const fuse = (producer: Producer) => + (sink: FusedSink, subscription: Subscription): Unsubscribable => { - teardown(producer({ - next: next !== noop ? (v => isClosed() || next(v)) : noop, - error, - complete: () => finalize() && consumer.complete && consumer.complete(), - isClosed - })); + subscription.teardown(producer({ + next: subscription.around(sink.next), + complete: () => subscription.finalize() && sink.complete(), + error: err => subscription.finalize() && sink.error(err), + isClosed: subscription.isClosed.bind(subscription) + })); - return { unsubscribe: finalize }; -}; + return subscription; + }; -type FusedProducer = (consumer: FusedSink) => void | (() => void); +type FusedProducer = (sink: FusedSink, subscription: Subscription) => void; export class ObservableImpl implements Observable { @@ -67,64 +37,68 @@ export class ObservableImpl implement this._producer = producer; } - subscribe(consumer: Observer = {}) { - const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); + subscribe(consumer: Observer = {}): Unsubscribable { + const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); const next = consumer.next ? consumer.next.bind(consumer) : noop; - - const { teardown, isClosed, finalize, error } = subscription(reject); + const complete = () => consumer.complete ? consumer.complete() : void (0); - teardown((0, this._producer)({ - next: next !== noop ? (v => isClosed() || next(v)) : noop, - error, - complete: () => finalize() && consumer.complete && consumer.complete() - })); + const subscription = new SubscriptionImpl(); - return { unsubscribe: finalize }; + this._producer({ next, error, complete }, subscription); + + return subscription; } map(mapper: (value: T) => T2) { - return new ObservableImpl(({ next, ...rest }) => - this._producer({ - next: next !== noop ? (v: T) => next(mapper(v)) : noop, - ...rest - })); + return new ObservableImpl(({ next, ...rest }, subscription) => + (0, this._producer)( + { + next: next !== noop ? (v: T) => next(mapper(v)) : noop, + ...rest + }, subscription + )); } tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer) { - return new ObservableImpl(({ next, complete, error }) => - this._producer({ - next: tapNext ? (v => (tapNext(v), next(v))) : next, - complete: tapComplete ? (() => (tapComplete(), complete())) : complete, - error: tapError ? (e => (tapError(e), error(e))) : error - })); + return new ObservableImpl(({ next, complete, error }, subscription) => + (0, this._producer)( + { + next: tapNext ? (v => (tapNext(v), next(v))) : next, + complete: tapComplete ? (() => (tapComplete(), complete())) : complete, + error: tapError ? (e => (tapError(e), error(e))) : error + }, + subscription + )); } filter(predicate: Predicate) { - return new ObservableImpl(({ next, ...rest }) => - this._producer({ - next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, - ...rest - }) - ); + return new ObservableImpl(({ next, ...rest }, subscription) => + (0, this._producer)( + { + next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, + ...rest + }, + subscription + )); } until(predicate: Predicate) { - return new ObservableImpl(({ next, complete, ...rest }) => - this._producer({ - next: v => predicate(v) ? complete() : next(v), + return new ObservableImpl(({ next, complete, ...rest }, subscription) => + (0, this._producer)({ + next: v => predicate(v) ? subscription.finalize() && complete() : next(v), complete, ...rest - }) + }, subscription) ); } while(predicate: Predicate) { - return new ObservableImpl(({ next, complete, ...rest }) => - this._producer({ - next: v => predicate(v) ? next(v) : complete(), + return new ObservableImpl(({ next, complete, ...rest }, subscription) => + (0, this._producer)({ + next: v => predicate(v) ? next(v) : subscription.finalize() && complete(), complete, ...rest - }) + }, subscription) ); } @@ -132,23 +106,26 @@ export class ObservableImpl implement scan(accumulator: AccumulatorFn): Observable; scan(...args: [AccumulatorFn, A] | [AccumulatorFn]) { if (args.length === 1) { - return new ObservableImpl(({ next, ...rest }) => { - const [accumulator] = args; + const [accumulator] = args; + + return new ObservableImpl(({ next, ...rest }, subscription) => { let _acc: T; let index = 0; - return this._producer({ - next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop, + (0, this._producer)({ + next: next !== noop ? (v: T) => next( + index++ === 0 ? _acc = v : _acc = accumulator(_acc, v) + ) : noop, ...rest - }); + }, subscription); }); } else { - return new ObservableImpl(({ next, ...rest }) => { - const [accumulator, initial] = args; + const [accumulator, initial] = args; + return new ObservableImpl(({ next, ...rest }, subscription) => { let _acc = initial; - return this._producer({ + (0, this._producer)({ next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, ...rest - }); + }, subscription); }); } } @@ -158,11 +135,11 @@ export class ObservableImpl implement reduce(...args: [AccumulatorFn, A] | [AccumulatorFn]) { if (args.length === 1) { - return new ObservableImpl(({ next, complete, error }) => { - const [accumulator] = args; + const [accumulator] = args; + return new ObservableImpl(({ next, complete, error }, subscription) => { let _acc: T; let index = 0; - return this._producer({ + (0, this._producer)({ next: next !== noop ? (v: T) => { _acc = index++ === 0 ? v : accumulator(_acc, v); } : noop, @@ -175,13 +152,13 @@ export class ObservableImpl implement } }, error - }); + }, subscription); }); } else { - return new ObservableImpl(({ next, complete, error }) => { - const [accumulator, initial] = args; + const [accumulator, initial] = args; + return new ObservableImpl(({ next, complete, error }, subscription) => { let _acc = initial; - return this._producer({ + (0, this._producer)({ next: next !== noop ? (v: T) => { _acc = accumulator(_acc, v); } : noop, @@ -190,24 +167,23 @@ export class ObservableImpl implement complete(); }, error - }); + }, subscription); }); } } cat(...seq: Subscribable[]) { - return new ObservableImpl(({ next, complete: final, ...rest }) => { - let cleanup: () => void; + return new ObservableImpl(({ next, complete: final, ...rest }, subscription) => { const len = seq.length; const complete = (i: number) => i < len ? () => { - const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest }); - cleanup = subscription.unsubscribe.bind(subscription); - } : final; + h = seq[i].subscribe({ next, complete: complete(i + 1), ...rest }); + } : () => subscription.finalize() && final(); + - cleanup = this._producer({ next, complete: complete(0), ...rest }) ?? noop; + let h = this.subscribe({ next, complete: complete(0), ...rest }); + subscription.teardown(() => h.unsubscribe()); - return () => cleanup(); }); } @@ -219,13 +195,14 @@ export class ObservableImpl implement return new Promise((resolve, reject) => { ct.throwIfRequested(); - const { teardown, finalize, isClosed, error } = subscription(reject, ct); - teardown((0, this._producer)({ - next: (item: T) => finalize() && resolve(item), - complete: () => finalize() && reject(new Error("The sequence is empty")), - error, - isClosed - })); + const subscription = new SubscriptionImpl(); + subscription.cancellable(ct, reject); + + (0, this._producer)({ + next: resolve, + complete: () => reject(new Error("The sequence is empty")), + error: reject, + }, subscription); }); } @@ -234,15 +211,16 @@ export class ObservableImpl implement return new Promise((resolve, reject) => { ct.throwIfRequested(); + const subscription = new SubscriptionImpl(); + subscription.cancellable(ct, reject); + const acc: T[] = []; - const { teardown, finalize, isClosed, error } = subscription(reject, ct); - teardown((0, this._producer)({ - next: (item: T) => isClosed() ? void (0) : acc.push(item), - complete: () => finalize() && resolve(acc), - error, - isClosed - })); + (0, this._producer)({ + next: (item: T) => acc.push(item), + complete: () => resolve(acc), + error: reject + }, subscription); }); } @@ -250,14 +228,14 @@ export class ObservableImpl implement return new Promise((resolve, reject) => { ct.throwIfRequested(); - const { teardown, finalize, isClosed, error } = subscription(reject, ct); + const subscription = new SubscriptionImpl(); + subscription.cancellable(ct, reject); - teardown((0, this._producer)({ - next: item => !isClosed() && next(item), - complete: () => finalize() && resolve(), - error, - isClosed - })); + (0, this._producer)({ + next, + complete: resolve, + error: reject + }, subscription); }); } diff --git a/djx/src/main/ts/observable/SubscriptionImpl.ts b/djx/src/main/ts/observable/SubscriptionImpl.ts new file mode 100644 --- /dev/null +++ b/djx/src/main/ts/observable/SubscriptionImpl.ts @@ -0,0 +1,58 @@ +import { ICancellation } from "@implab/core-amd/interfaces"; + +export interface Subscription { + finalize(): boolean; + + isClosed(): boolean; + + teardown(cleanup: void | (() => void)): void; + + around(next: (item: T) => void): (item: T) => void; + + unsubscribe(): void; +} + +const noop = () => {}; + + +export class SubscriptionImpl implements Subscription { + + private _done = false; + + private _cleanup = noop; + + finalize() { + return this._done ? false : ((0,this._cleanup)(), this._done = true); + } + + isClosed() { + return this._done; + } + + teardown(cleanup: void | (() => void)) { + if (cleanup) { + if (this._done) { + cleanup(); + } else { + const _prev = this._cleanup; + this._cleanup = _prev === noop ? cleanup : () => (_prev(), cleanup()); + } + } + } + + cancellable(ct: ICancellation, reject: (err: unknown) => void) { + if (ct.isSupported()) { + const h = ct.register(reason => this.finalize() && reject(reason)); + this.teardown(() => h.destroy()); + } + } + + unsubscribe() { + this.finalize(); + } + + around(next: (item: T) => void): (item: T) => void { + return (item) => this._done ? void(0) : next(item); + } + +} \ No newline at end of file