# HG changeset patch # User cin # Date 2023-01-07 21:36:35 # Node ID fbe158a5752ad4d38382a265d9db5a402e4f1f01 # Parent dd0b7cf0ce69501aca6e8c30f2ae4a1ca0540e21 added while, until methods to the observable interface. added stateful producer diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -66,6 +66,8 @@ export interface Subscribable { export type AccumulatorFn = (acc: A, value: T) => A; +export type OperatorFn = (source: Observable) => Producer; + /** The observable source of items. */ export interface Observable extends Subscribable { /** Transforms elements of the sequence with the specified mapper @@ -81,6 +83,17 @@ export interface Observable extends S */ filter(predicate: (value: T) => boolean): Observable; + /** Completes the sequence once the condition is met. + * @param predicate The condition which should be met to complete the sequence + */ + until(predicate: (value: T) => boolean): Observable; + + /** Keeps the sequence running while elements satisfy the condition. + * + * @param predicate The condition which should be met to continue. + */ + while(predicate: (value: T) => boolean): Observable; + /** Applies accumulator to each value in the sequence and * emits the accumulated value for each source element * @@ -102,13 +115,26 @@ export interface Observable extends S /** Concatenates the specified sequences with this observable * * @param seq sequences to concatenate with the current observable + * + * The concatenation doesn't accumulate values from the specified sequences. + * The result of the concatenation is a new observable which will switch + * to the next observable after the previous one completes. Values emitted + * before the next observable becomes active are lost.
*/ cat(...seq: Subscribable[]): Observable; + /** Pipes the specified operator to produce the new observable - * @param op The operator which consumes this observable and produces a new one + * @param op The operator consumes this observable and produces a new one + * + * The operator is a higher order function which takes a source observable + * and returns a producer for the new observable. + * + * This function can be used to create a complex mapping between source and + * resulting observables. The operator may have a state (or a side effect) + * and can be connected to multiple observables. */ - pipe(op: (source: Observable) => Producer): Observable; + pipe(op: OperatorFn): Observable; /** Waits for the next event to occur and returns a promise for the next value * @param ct Cancellation token to @@ -142,6 +168,8 @@ const fuse = (producer: Producer) void (0) : (done = true, cleanup(), fn(...args)); + const _fin0 = () => done ? void (0) : (done = true, cleanup()); + const safeSink = { next: (value: T) => { !done && next(value); }, error: _fin(error), @@ -149,9 +177,7 @@ const fuse = (producer: Producer) isClosed: () => done }; cleanup = producer(safeSink) ?? noop; - return done ? - (cleanup(), noop) : - _fin(noop); + return done ? cleanup() : _fin0; }; const _observe = (producer: Producer): Observable => ({ @@ -173,6 +199,22 @@ const _observe = (producer: Producer< }) ), + until: predicate => _observe(({ next, complete, ...rest }) => + producer({ + next: v => predicate(v) ? complete() : next(v), + complete, + ...rest + }) + ), + + while: predicate => _observe(({ next, complete, ...rest }) => + producer({ + next: v => predicate(v) ? 
next(v) : complete(), + complete, + ...rest + }) + ), + + scan: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, ...rest }) => { if (args.length === 1) { const [accumulator] = args; @@ -293,7 +335,10 @@ export const streamArray = (items: T[ ); export const streamPromise = (promise: PromiseLike) => observe( - ({next, error, complete}) => void promise.then(v => (next(v), complete()), error) + ({ next, error, complete }) => void promise.then( + v => (next(v), complete()), + error + ) ); export const of = (...items: T[]) => _observe( @@ -303,4 +348,90 @@ export const of = (...items: T[]) => ) ); -export const empty = _observe(({ complete }) => complete()); \ No newline at end of file +export const empty = _observe(({ complete }) => complete()); + +/** + * Wraps a producer so that the last value it emitted is stored and replayed. + * + * @param producer The source producer; it is started once, when `stateful` is called + * @returns a producer which can be subscribed to by multiple consumers + * + * The returned producer keeps the last emitted value and will emit it as the next + * element each time a consumer subscribes, provided a value has already been emitted. + * + * Each value emitted by the source updates the stored value and is passed to all current + * consumers; completion or an error is remembered and replayed to consumers subscribing later.
+ */ +export const stateful = (producer: Producer): Producer => { + const fusedProducer = fuse(producer); + type Status = "active" | "complete" | "error"; + + let lastValue: T; + let hasValue = false; + let status: Status = "active"; + let lastError: unknown; + let subscribers: Sink[] = []; + + const sink: Sink = { + isClosed: () => status !== "active", + complete: () => { + if (status === "active") { + status = "complete"; + const _subscribers = subscribers; + subscribers = []; + _subscribers.forEach(s => s.complete()); + } + }, + error: e => { + if (status === "active") { + status = "error"; + lastError = e; + const _subscribers = subscribers; + subscribers = []; + _subscribers.forEach(s => s.error(e)); + } + }, + next: v => { + if (status === "active") { + hasValue = true; + lastValue = v; + const _subscribers = subscribers; + _subscribers.forEach(s => s.next(v)); + } + } + }; + + fusedProducer(sink); + + return (s: Sink) => { + const _subscribers = subscribers; + switch (status) { + case "active": + if (hasValue) + s.next(lastValue); // if hasValue is true, + // lastValue has a valid value + subscribers.push(s); + return () => { + if (_subscribers === subscribers) { + const pos = subscribers.indexOf(s); + if (pos >= 0) + subscribers.splice(pos, 1); + } + }; + case "complete": + s.complete(); + break; + case "error": + s.error(lastError); + break; + } + }; +}; + +const subject = (producer: Producer): Producer => { + const fusedProducer = fuse(producer); + + return () => { + + }; +}; \ No newline at end of file diff --git a/djx/src/test/ts/plan.ts b/djx/src/test/ts/plan.ts --- a/djx/src/test/ts/plan.ts +++ b/djx/src/test/ts/plan.ts @@ -1,2 +1,3 @@ import "./declare-tests"; -import "./observable-tests"; \ No newline at end of file +import "./observable-tests"; +import "./state-tests"; \ No newline at end of file diff --git a/djx/src/test/ts/state-tests.ts b/djx/src/test/ts/state-tests.ts new file mode 100644 --- /dev/null +++ b/djx/src/test/ts/state-tests.ts @@ 
-0,0 +1,27 @@ +import { observe, stateful } from "./observable"; +import * as t from "tap"; + +interface CounterState { + count: number; + + label: "low" | "mid" | "high" +} + +let set: (v: CounterState) => void = () => void (0); +const initial: CounterState = { count: 0, label: "low" }; +let value = initial; + +const obs = observe(stateful(({ next }) => { + next(initial); + set = next; +})); + +set({ count: 10, label: "mid" }); + +obs.subscribe({ + next: v => value = v +}); + +t.equal(value.count, 10, "State should update"); + +set({ count: 20, label: "high" }); diff --git a/playground/package-lock.json b/playground/package-lock.json --- a/playground/package-lock.json +++ b/playground/package-lock.json @@ -30,7 +30,6 @@ }, "../djx/build/npm/package": { "name": "@implab/djx", - "version": "1.6.3", "dev": true, "license": "BSD-2-Clause", "peerDependencies": {