# HG changeset patch # User cin # Date 2023-04-29 20:10:05 # Node ID a3fba6b6c42e3c7daacf53969beb426e18d2a13c # Parent b92de241dece63ab5af761bc4e73614dd62d2d55 added 'buffer' and 'subject' observable operators diff --git a/.vscode/settings.json b/.vscode/settings.json --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,6 +10,7 @@ "dijit", "djbase", "djclass", - "Unsubscribable" + "Unsubscribable", + "wpos" ] } \ No newline at end of file diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -9,17 +9,17 @@ export interface Observer { /** * Called for the next element in the sequence */ - next: (value: T) => void; + next?: (value: T) => void; /** * Called once when the error occurs in the sequence. */ - error: (e: unknown) => void; + error?: (e: unknown) => void; /** * Called once at the end of the sequence. */ - complete: () => void; + complete?: () => void; } /** @@ -62,7 +62,7 @@ export const isSubscribable = ).subscribe === "function"; export interface Subscribable { - subscribe(consumer: Partial>): Unsubscribable; + subscribe(consumer: Observer): Unsubscribable; } export type AccumulatorFn = (acc: A, value: T) => A; @@ -148,7 +148,7 @@ export interface Observable extends S const noop = () => { }; -const sink = (consumer: Partial>) => { +const sink = (consumer: Observer) => { const { next, error, complete } = consumer; return { next: next ? next.bind(consumer) : noop, @@ -185,7 +185,7 @@ const fuse = (producer: Producer) }; const _observe = (producer: Producer): Observable => ({ - subscribe: (consumer: Partial>) => ({ + subscribe: (consumer: Observer) => ({ unsubscribe: producer(sink(consumer)) ?? noop }), @@ -294,7 +294,7 @@ const _observe = (producer: Producer< return () => cleanup(); }), - pipe: (op: (source: Observable) => Producer) => observe(op(_observe(producer))), + pipe: (op: OperatorFn) => observe(op(_observe(producer))), next: collect( producer, @@ -345,6 +345,7 @@ const collect = ( export const observe = (producer: Producer) => _observe(fuse(producer)); +/** Converts an array to the observable sequence of its elements. */ export const ofArray = (items: T[]) => _observe( ({ next, complete }) => ( items.forEach(next), @@ -352,6 +353,13 @@ export const ofArray = (items: T[]) = ) ); +/** Converts a subscribable to the observable */ +export const ofSubscribable = (subscribable: Subscribable) => + observe(sink => { + const subscription = subscribable.subscribe(sink); + return () => subscription.unsubscribe(); + }); + const of1 = (item: T | PromiseLike) => observe( ({ next, error, complete }) => isPromise(item) ? @@ -362,6 +370,9 @@ const of1 = (item: T | PromiseLike (next(item), complete()) ); +/** Converts a list of parameter values to the observable sequence. The + * order of elements in the list will be preserved in the resulting sequence. + */ export const of = (...items: (T | PromiseLike)[]) => items.length === 1 ? of1(items[0]) : observe( @@ -389,135 +400,3 @@ export const of = (...items: (T | Pro ); export const empty = _observe(({ complete }) => complete()); - -/** - * Creates a mutable state and the observable for the stored value. - * - * @param value The initial value for the state - * @returns an array of three elements `[observable, setter, getter]` - * - * The returned observable keeps the actual value and will emit it as the next - * element each time a consumer subscribes the observable. - * - * Calling the setter will update the stored value in the observable and notify - * all consumers. - */ -export const stateful = (producer: Producer): Producer => { - const fusedProducer = fuse(producer); - type Status = "active" | "complete" | "error"; - - let lastValue: T; - let hasValue = false; - let status: Status = "active"; - let lastError: unknown; - let subscribers: Sink[] = []; - - const sink: Sink = { - isClosed: () => status !== "active", - complete: () => { - if (status === "active") { - status = "complete"; - const _subscribers = subscribers; - subscribers = []; - _subscribers.forEach(s => s.complete()); - } - }, - error: e => { - if (status === "active") { - status = "error"; - lastError = e; - const _subscribers = subscribers; - subscribers = []; - _subscribers.forEach(s => s.error(e)); - } - }, - next: v => { - if (status === "active") { - hasValue = true; - lastValue = v; - const _subscribers = subscribers; - _subscribers.forEach(s => s.next(v)); - } - } - }; - - fusedProducer(sink); - - return (s: Sink) => { - const _subscribers = subscribers; - switch (status) { - case "active": - if (hasValue) - s.next(lastValue); // if hasValue is true, - // lastValue has a valid value - subscribers.push(s); - return () => { - if (_subscribers === subscribers) { - const pos = subscribers.indexOf(s); - if (pos >= 0) - subscribers.splice(pos, 1); - } - }; - case "complete": - s.complete(); - break; - case "error": - s.error(lastError); - break; - } - }; -}; - -/** Create the producer which will be called once when the first subscriber is - * attached, next subscribers would share the same producer. When all - * subscribers are removed the producer will be cleaned up. - * - * Use this wrapper to prevent spawning multiple producers. - * - * @param producer The source producer - * @returns The wrapped producer - */ -export const subject = (producer: Producer): Producer => { - const fusedProducer = fuse(producer); - - let subscribers: Sink[] = []; - - let cleanup = noop; - - const sink: Sink = { - isClosed: () => false, - complete: () => { - const _subscribers = subscribers; - subscribers = []; - _subscribers.forEach(s => s.complete()); - cleanup(); - }, - error: e => { - const _subscribers = subscribers; - subscribers = []; - _subscribers.forEach(s => s.error(e)); - cleanup(); - }, - next: v => { - const _subscribers = subscribers; - _subscribers.forEach(s => s.next(v)); - } - }; - - return client => { - const _subscribers = subscribers; - subscribers.push(client); - if (subscribers.length === 1) - cleanup = fusedProducer(sink) ?? noop; - - return () => { - if (_subscribers === subscribers) { - const pos = subscribers.indexOf(client); - if (pos >= 0) - subscribers.splice(pos, 1); - if (!subscribers.length) - cleanup(); - } - }; - }; -}; \ No newline at end of file diff --git a/djx/src/main/ts/operators/buffer.ts b/djx/src/main/ts/operators/buffer.ts new file mode 100644 --- /dev/null +++ b/djx/src/main/ts/operators/buffer.ts @@ -0,0 +1,105 @@ +import { Cancellation } from "@implab/core-amd/Cancellation"; +import { CancelledError } from "@implab/core-amd/CancelledError"; +import { Producer, Sink, Subscribable } from "../observable"; +import { Scope } from "../tsx/Scope"; + +/** + * Creates a buffer with the specified length. The buffer will immediately + * subscribe to the source observable and start accumulating values. + * + * The resulting observable will repeat the buffered values for each new subscriber. + * + * @param length The number of elements to store. + * @param ct Cancellation token to unsubscribe from the original observable. + * + */ +export const buffer = (length: number, ct = Cancellation.none) => (source: Subscribable): Producer => { + type Status = "active" | "complete" | "error"; + + // ring-buffer, wpos will rotate in range (0...length-1) + let wpos = 0; + const buffer: T[] = []; + + // writes the next value to the buffer + const write = (value: T) => { + buffer[wpos] = value; + wpos = (wpos + 1) % length; + }; + + // reads contents of the buffer + // cb will be called for each value in the buffer + const read = (cb: (item: T) => void) => { + const start = wpos + length - buffer.length; + const end = wpos + length; + + for(let pos = start; pos < end; pos++ ) + cb(buffer[pos % length]); + }; + + let status: Status = "active"; + let lastError: unknown; + let subscribers: Sink[] = []; + + const scope = new Scope(); + + // cleanup method to release resources held by this subscription + const cleanup = (cb: (item: Sink) => void) => { + scope.destroy(); + const _subscribers = subscribers; + subscribers = []; + _subscribers.forEach(cb); + }; + + const sink: Sink = { + isClosed: () => status !== "active", + complete: () => { + if (status === "active") { + status = "complete"; + cleanup(s => s.complete()); + } + }, + error: e => { + if (status === "active") { + status = "error"; + lastError = e; + cleanup(s => s.error(e)); + } + }, + next: v => { + if (status === "active") { + write(v); + const _subscribers = subscribers; + _subscribers.forEach(s => s.next(v)); + } + } + }; + + if (ct.isRequested()) { + sink.error(new CancelledError("The operation was cancelled", ct)); + } else { + scope.own(source.subscribe(sink)); + scope.own(ct.register(e => sink.error(e))); + } + + return (s: Sink) => { + const _subscribers = subscribers; + read(s.next); + switch (status) { + case "active": + subscribers.push(s); + return () => { + if (_subscribers === subscribers) { + const pos = subscribers.indexOf(s); + if (pos >= 0) + subscribers.splice(pos, 1); + } + }; + case "complete": + s.complete(); + break; + case "error": + s.error(lastError); + break; + } + }; +}; \ No newline at end of file diff --git a/djx/src/main/ts/operators/subject.ts b/djx/src/main/ts/operators/subject.ts new file mode 100644 --- /dev/null +++ b/djx/src/main/ts/operators/subject.ts @@ -0,0 +1,50 @@ +import { Producer, Sink, Subscribable } from "../observable"; + +const noop = () => { }; + +/** Joins multiple subscribers to the single one. The resulting subscriber + * will be created when the first client subscribes and will be released + * with the the last client unsubscribed. + * + * Use this wrapper to prevent spawning multiple producers. + * + * @param source The source observable + * @returns The wrapped producer + */ +export const subject = (source: Subscribable): Producer => { + let subscribers: Sink[] = []; + + let subscription = { unsubscribe: noop }; + + // cleanup method to release resources held by this subscription + const cleanup = (cb: (item: Sink) => void) => { + const _subscribers = subscribers; + subscribers = []; + _subscribers.forEach(cb); + subscription.unsubscribe(); + }; + + const sink: Sink = { + isClosed: () => false, + complete: () => cleanup(s => s.complete()), + error: e => cleanup(s => s.error(e)), + next: v => subscribers.forEach(s => s.next(v)) + }; + + return client => { + const _subscribers = subscribers; + subscribers.push(client); + if (subscribers.length === 1) + subscription = source.subscribe(sink); + + return () => { + if (_subscribers === subscribers) { + const pos = subscribers.indexOf(client); + if (pos >= 0) + subscribers.splice(pos, 1); + if (!subscribers.length) + subscription.unsubscribe(); + } + }; + }; +}; diff --git a/djx/src/main/ts/tsx/render.ts b/djx/src/main/ts/tsx/render.ts --- a/djx/src/main/ts/tsx/render.ts +++ b/djx/src/main/ts/tsx/render.ts @@ -113,6 +113,7 @@ const endRender = (prev: Context, curren // called when the first beginRender is called for this iteration const onRendering = () => { + trace.log("Rendering started"); setTimeout(() => { if (_renderCount !== 0) trace.error("Rendering tasks aren't finished, currently running = {0}", _renderCount); @@ -121,6 +122,7 @@ const onRendering = () => { // called when all render operations are complete const onRendered = () => { + trace.log("Rendering compete"); _renderedHooks.forEach(guard); _renderedHooks = []; }; diff --git a/djx/src/test/ts/state-tests.ts b/djx/src/test/ts/state-tests.ts --- a/djx/src/test/ts/state-tests.ts +++ b/djx/src/test/ts/state-tests.ts @@ -1,4 +1,5 @@ -import { observe, stateful } from "./observable"; +import { observe } from "./observable"; +import { buffer } from "./operators/buffer"; import * as t from "tap"; interface CounterState { @@ -11,10 +12,10 @@ let set: (v: CounterState) => void = () const initial: CounterState = { count: 0, label: "low" }; let value = initial; -const obs = observe(stateful(({ next }) => { +const obs = observe(({ next }) => { next(initial); set = next; -})); +}).pipe(buffer(2)); set({ count: 10, label: "mid" }); diff --git a/djx/src/test/ts/subject-tests.ts b/djx/src/test/ts/subject-tests.ts --- a/djx/src/test/ts/subject-tests.ts +++ b/djx/src/test/ts/subject-tests.ts @@ -1,11 +1,12 @@ -import { observe, subject } from "./observable"; +import { observe } from "./observable"; +import { subject } from "./operators/subject"; import * as tap from "tap"; tap.test("Subject tests", t => { let nextEvent: (value: string) => void = () => void (0); - const subj1 = observe(subject(({ next }) => { + const subj1 = observe(({ next }) => { t.comment("Start subject"); nextEvent = next; @@ -14,7 +15,7 @@ tap.test("Subject tests", t => { nextEvent = () => void (0); t.comment("Stop subject"); }; - })); + }).pipe(subject); const h1 = subj1.subscribe({ next: v => t.comment(`h1 next: ${v}`) @@ -43,7 +44,7 @@ tap.test("Subject tests #2", t => { let nextEvent: (value: string) => void = () => void (0); - const subj1 = observe(subject(({ next, complete }) => { + const subj1 = observe(({ next, complete }) => { t.comment("Start subject"); complete(); @@ -53,7 +54,7 @@ tap.test("Subject tests #2", t => { nextEvent = () => void (0); t.comment("Stop subject"); }; - })); + }).pipe(subject); const h1 = subj1.subscribe({ next: v => t.comment(`h1 next: ${v}`)