diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -67,7 +67,7 @@ export interface Subscribable { export type AccumulatorFn = (acc: A, value: T) => A; -export type OperatorFn = (source: Observable) => Producer; +export type OperatorFn = (source: Observable) => Observable; /** The observable source of items. */ export interface Observable extends Subscribable { @@ -277,24 +277,19 @@ const _observe = (producer: Producer< cat: (...seq) => _observe(({ next, complete: final, ...rest }) => { let cleanup: () => void; - const complete = () => { - const continuation = seq.shift(); - if (continuation) { - // if we have a next sequence, subscribe to it - const subscription = continuation.subscribe({ next, complete, ...rest }); + const len = seq.length; + const complete = (i: number) => i < len ? + () => { + const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest }); cleanup = subscription.unsubscribe.bind(subscription); - } else { - // otherwise notify the consumer about completion - final(); - } - }; + } : final; - cleanup = producer({ next, complete, ...rest }) ?? noop; + cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop; return () => cleanup(); }), - pipe: (op: OperatorFn) => observe(op(_observe(producer))), + pipe: (op: OperatorFn) => op(_observe(producer)), next: collect( producer, diff --git a/djx/src/main/ts/operators/buffer.ts b/djx/src/main/ts/operators/buffer.ts --- a/djx/src/main/ts/operators/buffer.ts +++ b/djx/src/main/ts/operators/buffer.ts @@ -1,6 +1,6 @@ import { Cancellation } from "@implab/core-amd/Cancellation"; import { CancelledError } from "@implab/core-amd/CancelledError"; -import { Producer, Sink, Subscribable } from "../observable"; +import { Observable, Sink, Subscribable, observe } from "../observable"; import { Scope } from "../tsx/Scope"; /** @@ -13,7 +13,7 @@ import { Scope } from "../tsx/Scope"; * @param ct Cancellation token to unsubscribe from the original observable. * */ -export const buffer = (length: number, ct = Cancellation.none) => (source: Subscribable): Producer => { +export const buffer = (length: number, ct = Cancellation.none) => (source: Subscribable): Observable => { type Status = "active" | "complete" | "error"; // ring-buffer, wpos will rotate in range (0...length-1) @@ -81,7 +81,7 @@ export const buffer = (length: number, c scope.own(ct.register(e => sink.error(e))); } - return (s: Sink) => { + return observe( s => { const _subscribers = subscribers; read(s.next); switch (status) { @@ -101,5 +101,5 @@ export const buffer = (length: number, c s.error(lastError); break; } - }; + }); }; \ No newline at end of file diff --git a/djx/src/main/ts/operators/subject.ts b/djx/src/main/ts/operators/subject.ts --- a/djx/src/main/ts/operators/subject.ts +++ b/djx/src/main/ts/operators/subject.ts @@ -1,4 +1,4 @@ -import { Producer, Sink, Subscribable } from "../observable"; +import { Observable, Sink, Subscribable, observe } from "../observable"; const noop = () => { }; @@ -11,7 +11,7 @@ const noop = () => { }; * @param source The source observable * @returns The wrapped producer */ -export const subject = (source: Subscribable): Producer => { +export const subject = (source: Subscribable): Observable => { let subscribers: Sink[] = []; let subscription = { unsubscribe: noop }; @@ -31,7 +31,7 @@ export const subject = (source: Subsc next: v => subscribers.forEach(s => s.next(v)) }; - return client => { + return observe(client => { const _subscribers = subscribers; subscribers.push(client); if (subscribers.length === 1) @@ -46,5 +46,5 @@ export const subject = (source: Subsc subscription.unsubscribe(); } }; - }; + }); }; diff --git a/djx/src/main/ts/store.ts b/djx/src/main/ts/store.ts --- a/djx/src/main/ts/store.ts +++ b/djx/src/main/ts/store.ts @@ -1,6 +1,8 @@ import { PromiseOrValue } from "@implab/core-amd/interfaces"; -import { isPromise } from "@implab/core-amd/safe"; -import { observe, Observable } from "./observable"; +import { isCancellable, isPromise } from "@implab/core-amd/safe"; +import { observe, Observable, empty } from "./observable"; +import { after } from "dojo/aspect"; +import { subject } from "./operators/subject"; export interface OrderedUpdate { /** The item is being updated */ @@ -59,3 +61,63 @@ export const query = (store: Qu }); }; + +interface IndexedStore { + get(id: string | number): PromiseLike | T | null | undefined; +} + +interface Notifications { + notify(item: T | undefined, id: string | number | undefined): void; +} + +const hasNotifications = (x: unknown): x is Notifications => + typeof x === "object" && x !== null && (typeof (x as Notifications).notify === "function"); + +interface GetOpts { + observe?: boolean; +} + +type Change = [item: NonNullable, id: string | number | undefined] | +[item: undefined | null, id: string | number]; + +const filterItem = (itemId: string | number) => + (source: Observable>) => + observe(({ next, complete, error }) => { + const subscription = source + .filter(([, id]) => id === itemId) + .subscribe({ + next: ([item]) => item !== null && item !== undefined ? next(item) : complete(), + complete, + error + }); + return () => subscription.unsubscribe(); + }); + +export const get = (store: IndexedStore) => { + const changes = hasNotifications(store) ? + observe>(({ next }) => { + const handle = after(store, "notify", (...args: Change) => next(args), true); + return () => handle.remove(); + }).pipe(subject) : empty; + + + return (id: string | number, opts: GetOpts = {}) => + observe(({ next, complete, error }) => { + const result = store.get(id); + + const handle = (x: T | null | undefined) => { + if (x !== null && x !== undefined) + next(x); + complete(); + }; + + if (isPromise(result)) { + result.then(handle, error); + + if (isCancellable(result)) + return () => result.cancel(); + } else { + handle(result); + } + }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty); +}; \ No newline at end of file diff --git a/djx/src/test/ts/observable-store-tests.ts b/djx/src/test/ts/observable-store-tests.ts new file mode 100644 --- /dev/null +++ b/djx/src/test/ts/observable-store-tests.ts @@ -0,0 +1,65 @@ +import Memory = require("dojo/store/Memory"); +import Observerable = require("dojo/store/Observable"); +import { get } from "./store"; +import tap = require("tap"); + +interface Person { + id: string; + + name: string; + + age: number; +} + +tap.test("store::get(...) tests", async t => { + const store = new Observerable(new Memory()); + + const getPerson = get(store); + + const peterId = "id:peter"; + + const samId = "id:sam"; + + const peter = getPerson(peterId); + const sam = getPerson(samId); + + const seq1 = await getPerson(peterId, { observe: false }).collect(); + + t.ok(seq1.length === 0, "Should be empty sequence"); + + let peterChangeCount = 0; + let samChangeCount = 0; + let peterDeleted = 0; + + const peterSubscription = peter.subscribe({ + next: () => peterChangeCount++, + complete: () => peterDeleted++ + }); + const samSubscription = sam.subscribe({ + next: () => samChangeCount++ + }); + + try { + t.equal(peterChangeCount, 0, "Should be no changes recorded"); + + store.put({id: peterId, name: "Peter", age: 30 }); + + t.equal(peterChangeCount, 1, "Should record 1 object change"); + t.equal(samChangeCount, 0, "Should not record other object changes"); + + store.remove(peterId); + + t.equal(peterDeleted, 1, "Should complete sequence"); + t.equal(peterChangeCount, 1, "Should not record remove operations"); + + store.put({id: peterId, name: "Peter", age: 29}); + + t.equal(peterChangeCount, 1, "Should not record changes after completion"); + + } finally { + peterSubscription.unsubscribe(); + samSubscription.unsubscribe(); + } + + +}).catch(() => { }); \ No newline at end of file diff --git a/djx/src/test/ts/observable-tests.ts b/djx/src/test/ts/observable-tests.ts --- a/djx/src/test/ts/observable-tests.ts +++ b/djx/src/test/ts/observable-tests.ts @@ -82,7 +82,7 @@ const consumer3 = { }; -const subj3 = subj2.pipe<"even" | "odd">(self => ({ next, complete, error }) => { +const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => { consumer3.subscribe(); let count = 0; const h = self.subscribe({ @@ -101,7 +101,7 @@ const subj3 = subj2.pipe<"even" | "odd"> consumer3.unsubscribe(); h.unsubscribe(); }; -}); +})); subj3.subscribe(consumer3); diff --git a/djx/src/test/ts/plan.ts b/djx/src/test/ts/plan.ts --- a/djx/src/test/ts/plan.ts +++ b/djx/src/test/ts/plan.ts @@ -2,3 +2,4 @@ import "./declare-tests"; import "./observable-tests"; import "./state-tests"; import "./subject-tests"; +import "./observable-store-tests"; diff --git a/playground/src/main/ts/view/NewAppointment.tsx b/playground/src/main/ts/view/NewAppointment.tsx --- a/playground/src/main/ts/view/NewAppointment.tsx +++ b/playground/src/main/ts/view/NewAppointment.tsx @@ -1,9 +1,8 @@ import { id as mid } from "module"; import { djbase, djclass } from "@implab/djx/declare"; -import { attach, bind, createElement, prop } from "@implab/djx/tsx"; +import { bind, createElement, prop } from "@implab/djx/tsx"; import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase"; import Form from "@implab/djx/form/Form"; -import { LocalDateTime } from "@js-joda/core"; import { TraceSource } from "@implab/core-amd/log/TraceSource"; import DateTextBox = require("dijit/form/DateTextBox"); import Button = require("dijit/form/Button");