diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -1,67 +1,10 @@ import { id as mid} from "module"; import { Cancellation } from "@implab/core-amd/Cancellation"; -import { ICancellation } from "@implab/core-amd/interfaces"; import { TraceSource } from "@implab/core-amd/log/TraceSource"; import { isPromise } from "@implab/core-amd/safe"; - -const trace = TraceSource.get(mid); - -/** - * The interface for the consumer of an observable sequence - */ -export interface Observer { - /** - * Called for the next element in the sequence - */ - next?(value: T): void; - - /** - * Called once when the error occurs in the sequence. - */ - error?(e: unknown): void; - - /** - * Called once at the end of the sequence. - */ - complete?(): void; -} +import { Unsubscribable, Observer, Producer, FusedSink, FusedProducer, AccumulatorFn, Observable, OperatorFn, Subscribable } from "./observable/interfaces"; -/** - * The group of functions to feed an observable. These methods are provided to - * the producer to generate a stream of events. - */ -export type Sink = { - /** - * Call to send the next element in the sequence - */ - next: (value: T) => void; - /** - * Call to notify about the error occurred in the sequence. - */ - error: (e: unknown) => void; - - /** - * Call to signal the end of the sequence. - */ - complete: () => void; - - /** - * Checks whether the sink is accepting new elements. It's safe to - * send elements to the closed sink. 
- */ - isClosed: () => boolean; -}; - -export type Producer = (sink: Sink) => (void | (() => void)); - -type FusedSink = Omit, "isClosed">; - -type FusedProducer = (sink: FusedSink) => (void | (() => void)); - -export interface Unsubscribable { - unsubscribe(): void; -} export const isUnsubscribable = (v: unknown): v is Unsubscribable => v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; @@ -69,104 +12,11 @@ export const isUnsubscribable = (v: unkn export const isSubscribable = (v: unknown): v is Subscribable => v !== null && v !== undefined && typeof (v as Subscribable).subscribe === "function"; -export interface Subscribable { - /** Subscribes a consumer to events. If a consumer isn't specified - * this method activates the producer to achieve side affects if any. - */ - subscribe(consumer?: Observer): Unsubscribable; -} -export type AccumulatorFn = (acc: A, value: T) => A; - -export type OperatorFn = (source: Observable) => Observable; - -/** The observable source of items. */ -export interface Observable extends Subscribable { - /** Transforms elements of the sequence with the specified mapper - * - * @param mapper The mapper used to transform the values - */ - map(mapper: (value: T) => T2): Observable; - - /** Injects the specified observer into the each producer to consumer chain. - * The method is used to add side effect to the events processing. - * - * @param observer The consumer for the events - */ - tap(observer: Observer): Observable; - - /** Filters elements of the sequence. The resulting sequence will - * contain only elements which match the specified predicate. - * - * @param predicate The filter predicate. - */ - filter(predicate: (value: T) => boolean): Observable; - - /** Completes the sequence once the condition is met. 
- * @param predicate The condition which should be met to complete the sequence - */ - until(predicate: (value: T) => boolean): Observable; - - /** Keeps the sequence running while elements satisfy the condition. - * - * @param predicate The condition which should be met to continue. - */ - while(predicate: (value: T) => boolean): Observable; - - /** Applies accumulator to each value in the sequence and - * emits the accumulated value for each source element - * - * @param accumulator - * @param initial - */ - scan(accumulator: AccumulatorFn, initial: A): Observable; - scan(accumulator: AccumulatorFn): Observable; - - /** Applies accumulator to each value in the sequence and - * emits the accumulated value at the end of the sequence - * - * @param accumulator - * @param initial - */ - reduce(accumulator: AccumulatorFn, initial: A): Observable; - reduce(accumulator: AccumulatorFn): Observable; - - /** Concatenates the specified sequences with this observable - * - * @param seq sequences to concatenate with the current observable - * - * The concatenation doesn't accumulate values from the specified sequences, - * The result of the concatenation is the new observable which will switch - * to the next observable after the previous one completes. Values emitted - * before the next observable being active are lost. - */ - cat(...seq: Subscribable[]): Observable; - - - /** Pipes the specified operator to produce the new observable - * @param op The operator consumes this observable and produces a new one - * - * The operator is a higher order function which takes a source observable - * and returns a producer for the new observable. - * - * This function can be used to create a complex mapping between source and - * resulting observables. The operator may have a state (or a side effect) - * and can be connected to multiple observables. 
- */ - pipe(op: OperatorFn): Observable; - - /** Waits for the next event to occur and returns a promise for the next value - * @param ct Cancellation token - */ - next(ct?: ICancellation): Promise; - - /** Collects items of the sequence to the array. */ - collect(ct?: ICancellation): Promise; -} const noop = () => { }; -const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); + const sink = (consumer: Observer) => { // eslint-disable-next-line @typescript-eslint/unbound-method diff --git a/djx/src/main/ts/observable/ObservableImpl.ts b/djx/src/main/ts/observable/ObservableImpl.ts new file mode 100644 --- /dev/null +++ b/djx/src/main/ts/observable/ObservableImpl.ts @@ -0,0 +1,169 @@ +import { id as mid } from "module"; +import { AccumulatorFn, FusedProducer, Observable, Observer, OperatorFn } from "./interfaces"; +import { TraceSource } from "@implab/core-amd/log/TraceSource"; +import { scan } from "dojo/parser"; +import { filter } from "dojo/query"; + +const trace = TraceSource.get(mid); + +const noop = () => { }; + +const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); + +const sink = (consumer: Observer) => { + // eslint-disable-next-line @typescript-eslint/unbound-method + const { next, error, complete } = consumer; + return { + next: next ? next.bind(consumer) : noop, + error: error ? error.bind(consumer) : errorFallback, // report unhandled errors + complete: complete ? complete.bind(consumer) : noop + }; +}; + +const + +export class ObservableImpl implements Observable { + + private readonly _producer: FusedProducer; + + constructor(producer: FusedProducer) { + this._producer = producer; + } + + subscribe(consumer: Observer = {}) { + return { + unsubscribe: this._producer(sink(consumer)) ?? noop + } + } + + map(mapper: (value: T) => T2) { + return new ObservableImpl(({ next, ...rest }) => + this._producer({ + next: next !== noop ? 
(v: T) => next(mapper(v)) : noop, + ...rest + })); + } + + tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer) { + return new ObservableImpl(({ next, complete, error }) => + this._producer({ + next: tapNext ? (v => (tapNext(v), next(v))) : next, + complete: tapComplete ? (() => (tapComplete(), complete())) : complete, + error: tapError ? (e => (tapError(e), error(e))) : error + })); + } + + filter: (predicate) => _observe(({ next, ...rest }) => + producer({ + next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, + ...rest + }) + ), + +until: predicate => _observe(({ next, complete, ...rest }) => + producer({ + next: v => predicate(v) ? complete() : next(v), + complete, + ...rest + }) +), + + while: predicate => _observe(({ next, complete, ...rest }) => + producer({ + next: v => predicate(v) ? next(v) : complete(), + complete, + ...rest + }) +), + + scan: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, ...rest }) => { + if (args.length === 1) { + const [accumulator] = args; + let _acc: T; + let index = 0; + return producer({ + next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop, + ...rest + }); + } else { + const [accumulator, initial] = args; + let _acc = initial; + return producer({ + next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, + ...rest + }); + } + }), + + reduce: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, complete, error }) => { + if (args.length === 1) { + const [accumulator] = args; + let _acc: T; + let index = 0; + return producer({ + next: next !== noop ? (v: T) => { + _acc = index++ === 0 ? v : accumulator(_acc, v); + } : noop, + complete: () => { + if (index === 0) { + error(new Error("The sequence can't be empty")); + } else { + next(_acc); + complete(); + } + }, + error + }); + } else { + const [accumulator, initial] = args; + let _acc = initial; + return producer({ + next: next !== noop ? 
(v: T) => { + _acc = accumulator(_acc, v); + } : noop, + complete: () => { + next(_acc); + complete(); + }, + error + }); + } + }), + + cat: (...seq) => _observe(({ next, complete: final, ...rest }) => { + let cleanup: () => void; + const len = seq.length; + const complete = (i: number) => i < len ? + () => { + const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest }); + cleanup = subscription.unsubscribe.bind(subscription); + } : final; + + cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop; + + return () => cleanup(); + }), + + pipe: (op: OperatorFn) => op(_observe(producer)), + + next: collect( + producer, + ({ next, complete, error }) => ({ + next: v => (next(v), complete()), + complete: () => error(new Error("The sequence is empty")), + error + }) + ), + + collect: collect( + producer, + ({ next, complete, error }) => { + const data: T[] = []; + return { + next: v => data.push(v), + complete: () => (next(data), complete()), + error + }; + } + ) +} \ No newline at end of file diff --git a/djx/src/main/ts/observable/interfaces.d.ts b/djx/src/main/ts/observable/interfaces.d.ts new file mode 100644 --- /dev/null +++ b/djx/src/main/ts/observable/interfaces.d.ts @@ -0,0 +1,153 @@ +import { ICancellation } from "@implab/core-amd/interfaces"; + +/** + * The interface for the consumer of an observable sequence + */ +export interface Observer { + /** + * Called for the next element in the sequence + */ + next?(value: T): void; + + /** + * Called once when the error occurs in the sequence. + */ + error?(e: unknown): void; + + /** + * Called once at the end of the sequence. + */ + complete?(): void; +} + +/** + * The group of functions to feed an observable. These methods are provided to + * the producer to generate a stream of events. + */ +export type Sink = { + /** + * Call to send the next element in the sequence + */ + next: (value: T) => void; + + /** + * Call to notify about the error occurred in the sequence. 
+ */ + error: (e: unknown) => void; + + /** + * Call to signal the end of the sequence. + */ + complete: () => void; + + /** + * Checks whether the sink is accepting new elements. It's safe to + * send elements to the closed sink. + */ + isClosed: () => boolean; +}; + +export type Producer = (sink: Sink) => (void | (() => void)); + +export type FusedSink = Omit, "isClosed">; + +export type FusedProducer = (sink: FusedSink) => (void | (() => void)); + +export interface Unsubscribable { + unsubscribe(): void; +} + +export interface Subscribable { + /** Subscribes a consumer to events. If a consumer isn't specified + * this method activates the producer to achieve side effects if any. + */ + subscribe(consumer?: Observer): Unsubscribable; +} + +export type AccumulatorFn = (acc: A, value: T) => A; + +export type OperatorFn = (source: Observable) => Observable; + +/** The observable source of items. */ +export interface Observable extends Subscribable { + /** Transforms elements of the sequence with the specified mapper + * + * @param mapper The mapper used to transform the values + */ + map(mapper: (value: T) => T2): Observable; + + /** Injects the specified observer into each producer-to-consumer chain. + * The method is used to add side effects to event processing. + * + * @param observer The consumer for the events + */ + tap(observer: Observer): Observable; + + /** Filters elements of the sequence. The resulting sequence will + * contain only elements which match the specified predicate. + * + * @param predicate The filter predicate. + */ + filter(predicate: (value: T) => boolean): Observable; + + /** Completes the sequence once the condition is met. + * @param predicate The condition which should be met to complete the sequence + */ + until(predicate: (value: T) => boolean): Observable; + + /** Keeps the sequence running while elements satisfy the condition. + * + * @param predicate The condition which should be met to continue. 
+ */ + while(predicate: (value: T) => boolean): Observable; + + /** Applies accumulator to each value in the sequence and + * emits the accumulated value for each source element + * + * @param accumulator + * @param initial + */ + scan(accumulator: AccumulatorFn, initial: A): Observable; + scan(accumulator: AccumulatorFn): Observable; + + /** Applies accumulator to each value in the sequence and + * emits the accumulated value at the end of the sequence + * + * @param accumulator + * @param initial + */ + reduce(accumulator: AccumulatorFn, initial: A): Observable; + reduce(accumulator: AccumulatorFn): Observable; + + /** Concatenates the specified sequences with this observable + * + * @param seq sequences to concatenate with the current observable + * + * The concatenation doesn't accumulate values from the specified sequences, + * The result of the concatenation is the new observable which will switch + * to the next observable after the previous one completes. Values emitted + * before the next observable being active are lost. + */ + cat(...seq: Subscribable[]): Observable; + + + /** Pipes the specified operator to produce the new observable + * @param op The operator consumes this observable and produces a new one + * + * The operator is a higher order function which takes a source observable + * and returns a producer for the new observable. + * + * This function can be used to create a complex mapping between source and + * resulting observables. The operator may have a state (or a side effect) + * and can be connected to multiple observables. + */ + pipe(op: OperatorFn): Observable; + + /** Waits for the next event to occur and returns a promise for the next value + * @param ct Cancellation token + */ + next(ct?: ICancellation): Promise; + + /** Collects items of the sequence to the array. 
*/ + collect(ct?: ICancellation): Promise; +} \ No newline at end of file diff --git a/playground/package-lock.json b/playground/package-lock.json --- a/playground/package-lock.json +++ b/playground/package-lock.json @@ -30,6 +30,7 @@ }, "../djx/build/npm/package": { "name": "@implab/djx", + "version": "1.10.0", "dev": true, "license": "BSD-2-Clause", "peerDependencies": {