# HG changeset patch # User cin # Date 2023-08-07 23:34:36 # Node ID 63215d91ae4b711eeb2145b9685240e1afada904 # Parent 62b21ca84dcebbbdecb38c5c0f6495be48fcd82d added tap() method to observable added queryEx() function to store diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -70,7 +70,10 @@ export const isSubscribable = ).subscribe === "function"; export interface Subscribable { - subscribe(consumer: Observer): Unsubscribable; + /** Subscribes a consumer to events. If a consumer isn't specified + * this method activates the producer to achieve side affects if any. + */ + subscribe(consumer?: Observer): Unsubscribable; } export type AccumulatorFn = (acc: A, value: T) => A; @@ -85,6 +88,13 @@ export interface Observable extends S */ map(mapper: (value: T) => T2): Observable; + /** Injects the specified observer into the each producer to consumer chain. + * The method is used to add side effect to the events processing. + * + * @param observer The consumer for the events + */ + tap(observer: Observer): Observable; + /** Filters elements of the sequence. The resulting sequence will * contain only elements which match the specified predicate. * @@ -203,7 +213,7 @@ const fuse = (producer: Producer) }; const _observe = (producer: FusedProducer): Observable => ({ - subscribe: (consumer: Observer) => ({ + subscribe: (consumer: Observer = {}) => ({ unsubscribe: producer(sink(consumer)) ?? noop }), @@ -214,6 +224,14 @@ const _observe = (producer: FusedProd }) ), + tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) => + producer({ + next: tapNext ? (v => (tapNext(v), next(v))) : next, + complete: tapComplete ? (() => (tapComplete(), complete())): complete, + error: tapError ? (e => (tapError(e), error(e))) : error + }) + ), + filter: (predicate) => _observe(({ next, ...rest }) => producer({ next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, diff --git a/djx/src/main/ts/store.ts b/djx/src/main/ts/store.ts --- a/djx/src/main/ts/store.ts +++ b/djx/src/main/ts/store.ts @@ -33,34 +33,61 @@ interface Queryable { export const isDjObservableResults = (v: object): v is DjObservableResults => v && (typeof (v as { observe?: unknown; }).observe === "function"); -export const query = (store: Queryable, includeUpdates = true) => - (query?: Q, options?: O & { observe?: boolean }) => { - return observe>(({ next, complete, error, isClosed }) => { +export const query = (store: Queryable, includeUpdates = true) => { + const q = queryEx(store, includeUpdates); + return (query?: Q, options?: O & { observe?: boolean }) => { + const [data, updates] = q(query, options); + + return options?.observe === false ? data : data.cat(updates); + }; +}; +export const queryEx = (store: Queryable, includeUpdates = true) => + (query?: Q, options?: O): [data: QueryResults, updates: QueryResults] => { + + const pending: T[] = []; + + let results: PromiseOrValue = pending; + + const data = observe>(({ next, complete, error }) => { const processResults = (items: T[]) => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); try { - const results = store.query(query, options); + if (results === pending) + results = store.query(query, options); + if (isPromise(results)) { - results.then(processResults).then(undefined, error); + results.then(processResults).then(complete, error); + + if (isCancellable(results)) + return results.cancel.bind(results); } else { processResults(results); + complete(); } + } catch (e) { + error(e); + } + }); - if (!isClosed() && (options?.observe !== false) && isDjObservableResults(results)) { + const updates = observe>(({ next, complete, error, isClosed }) => { + try { + if (!isClosed() && isDjObservableResults(results)) { const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); return () => h.remove(); } else { complete(); } - } catch (err) { - error(err); + } catch (e) { + error(e); } }); + return [data, updates]; }; + interface IndexedStore { get(id: string | number): PromiseLike | T | null | undefined; } @@ -76,11 +103,11 @@ interface GetOpts { observe?: boolean; } -type Change = [item: NonNullable, id: string | number | undefined] | +export type ItemUpdate = [item: NonNullable, id: string | number | undefined] | [item: undefined | null, id: string | number]; const filterItem = (itemId: string | number) => - (source: Observable>) => + (source: Observable>) => observe(({ next, complete, error }) => { const subscription = source .filter(([, id]) => id === itemId) @@ -94,8 +121,8 @@ const filterItem = (itemId: string | num export const get = (store: IndexedStore) => { const changes = hasNotifications(store) ? - observe>(({ next }) => { - const handle = after(store, "notify", (...args: Change) => next(args), true); + observe>(({ next }) => { + const handle = after(store, "notify", (...args: ItemUpdate) => next(args), true); return () => handle.remove(); }) : empty; diff --git a/djx/src/test/ts/observable-store-tests.ts b/djx/src/test/ts/observable-store-tests.ts --- a/djx/src/test/ts/observable-store-tests.ts +++ b/djx/src/test/ts/observable-store-tests.ts @@ -1,6 +1,6 @@ import Memory = require("dojo/store/Memory"); import Observerable = require("dojo/store/Observable"); -import { get } from "./store"; +import { get, queryEx } from "./store"; import tap = require("tap"); interface Person { @@ -61,5 +61,29 @@ tap.test("store::get(...) tests", async samSubscription.unsubscribe(); } + store.put({ id: samId, name: "Sam", age: 29}); + + const [data, updates] = queryEx(store)({ age: 29}, { sort: [{attribute: "id"}] }); + + const dump: string[] = []; + + const subscription = data + .tap({ + complete: () => dump.push("eof") + }) + .cat(updates) + .tap({ + next: ({item: {id}}) => dump.push(id), + complete: () => dump.push("eof") + }) + .subscribe({}); + + t.same(dump, ["id:peter", "id:sam", "eof"]); + + store.put({ id: "id:mary", name: "Mary", age: 29}); + + t.same(dump, ["id:peter", "id:sam", "eof", "id:mary"]); + + subscription.unsubscribe(); }).catch(() => { }); \ No newline at end of file diff --git a/djx/src/test/ts/observable-tests.ts b/djx/src/test/ts/observable-tests.ts --- a/djx/src/test/ts/observable-tests.ts +++ b/djx/src/test/ts/observable-tests.ts @@ -147,4 +147,27 @@ tap.test("of(...) tests", async t => { t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence"); await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation"); -}).catch(() => { }); \ No newline at end of file +}).catch(() => { }); + +tap.test(".tap() tests", async t => { + const side: number[] = []; + + of(1,2) + .tap({next: v => side.push(v), complete: () => side.push(0)}) + .tap({next: v => side.push(v*v)}) + .subscribe({}); + + t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration"); + + side.length = 0; + + await new Promise(resolve => { + of(1,2,delay(1).then(() => 3)) + .tap({next: v => side.push(v)}) + .tap({ next: v => v === 1 && resolve()}) + .subscribe({}); + }); + + t.same(side, [1,2], ".tap() should be processed synchronously"); + +}).catch(() => {}); \ No newline at end of file