diff --git a/djx/readme.md b/djx/readme.md --- a/djx/readme.md +++ b/djx/readme.md @@ -2,7 +2,7 @@ ## SYNOPSIS -```tsx +```jsx import { djbase, djclass, bind, prototype, AbstractConstructor } from "@implab/djx/declare"; import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase"; @@ -175,7 +175,7 @@ This is the base class for the djx widge This class extends `dijit/_WidgetBase` and contains logic from `_AttachMixin` thus it is capable to handle `data-dojo-attach-*` attributes from the rendered markup. -```tsx +```jsx @djclass export class MyFirstWidget extends djbase(DjxWidgetBase) { render() { @@ -199,7 +199,7 @@ Add to your `tsconfig.json` the followin "experimentalDecorators": true, "jsxFactory": "createElement", "jsx": "react", - "target": "ES5", // minimal supported version + "target": "ES5", "lib": ["ES2015", "DOM"] } } @@ -225,7 +225,7 @@ Dojo 1.x adds some standard options to d The typical implementation of this technique would look like -```tsx +```jsx import { createElement } from "@implab/djx/tsx"; import {djclass, djbase, bind} from "@implab/djx/declare"; @@ -251,7 +251,7 @@ 2. Attribute mappings support only simpl This library helps you to get both goals with special trait `watch(...)` -```tsx +```jsx import { createElement } from "@implab/djx/tsx"; import { djclass, djbase} from "@implab/djx/declare" @@ -274,7 +274,7 @@ contents according to the value changes The key feature of this approach that the rendering function within `watch` may return a complex rendition. -```tsx +```jsx // inside some widget render() { return
@@ -304,3 +304,34 @@ The `render` callback may return almost * DOM Nodes and widgets are left intact, * any other kind of value will cause an error. +The watch method allows to observe a single value, for the large sets of data +this isn't suitable well and may lead to performance issues. Dojo provides +observable stores to being able to track individual changes. The library provides +`watchFor(observable, render)` method to render observable query results and +handle changes on per item basis. + +```jsx +// inside some widget +staff = new Observable(new Memory()), + +getStuff() { + return this.staff.query(); +} + +addEmployee(employee: Employee) { + this.staff.add(employee); // the rendition will update automatically +} + +render() { + return + + + + + {watchFor(this.getStaff(), ({name, position, salary}) => + + )} + +
NamePositionSalary
{name}{position}{salary}
+} +``` diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -1,5 +1,6 @@ import { Cancellation } from "@implab/core-amd/Cancellation"; import { ICancellation } from "@implab/core-amd/interfaces"; +import { isPromise } from "@implab/core-amd/safe"; /** * The interface for the consumer of an observable sequence @@ -137,9 +138,12 @@ export interface Observable extends S pipe(op: OperatorFn): Observable; /** Waits for the next event to occur and returns a promise for the next value - * @param ct Cancellation token to + * @param ct Cancellation token */ next(ct?: ICancellation): Promise; + + /** Collects items of the sequence to the array. */ + collect(ct?: ICancellation): Promise; } const noop = () => { }; @@ -292,61 +296,97 @@ const _observe = (producer: Producer< pipe: (op: (source: Observable) => Producer) => observe(op(_observe(producer))), - next: (ct?: ICancellation) => { - const _ct = ct ?? Cancellation.none; - return new Promise((resolve, reject) => { - // wrap the producer to handle only single event - const once = fuse(({ next, complete, error, isClosed }) => { - const h = _ct.register(error); + next: collect( + producer, + ({ next, complete, error, isClosed }) => ({ + next: v => (next(v), complete()), + complete: () => error(new Error("The sequence is empty")), + error, + isClosed + }) + ), + + collect: collect( + producer, + ({ next, complete, ...rest }) => { + const data: T[] = []; + return { + next: v => data.push(v), + complete: () => (next(data), complete()), + ...rest + }; + } + ) +}); - // is the _ct fires it will call error() and isClosed() will return true - const cleanup = !isClosed() ? - producer({ - next: v => (next(v), complete()), - complete: () => error(new Error("The sequence is empty")), - error, - isClosed - }) ?? noop : - noop; +const collect = ( + producer: Producer, + collector: (result: Sink) => Sink +) => (ct = Cancellation.none) => new Promise((resolve, reject) => { + const fused = fuse(({ next, complete, error, isClosed }) => { + const h = ct.register(error); + const cleanup = !isClosed() ? + producer(collector({ next, complete, error, isClosed })) ?? noop : + noop; - return () => { - h.destroy(); - cleanup(); - }; - }); + return () => { + h.destroy(); + cleanup(); + }; + }); - once({ - next: resolve, - error: reject, - complete: noop, - isClosed: () => false - }); - }); - } + fused({ + next: resolve, + error: reject, + complete: noop, + isClosed: () => false + }); }); export const observe = (producer: Producer) => _observe(fuse(producer)); -export const streamArray = (items: T[]) => _observe( +export const ofArray = (items: T[]) => _observe( ({ next, complete }) => ( items.forEach(next), complete() ) ); -export const streamPromise = (promise: PromiseLike) => observe( - ({ next, error, complete }) => void promise.then( - v => (next(v), complete()), - error - ) +const of1 = (item: T | PromiseLike) => observe( + ({ next, error, complete }) => + isPromise(item) ? + void item.then( + v => (next(v), complete()), + error + ) : + (next(item), complete()) ); -export const of = (...items: T[]) => _observe( - ({ next, complete }) => ( - items.forEach(next), - complete() - ) -); +export const of = (...items: (T | PromiseLike)[]) => items.length === 1 ? + of1(items[0]) : + observe( + ({ next, error, complete, isClosed }) => { + const n = items.length; + + const _next = (start: number) => { + if (start > 0 && isClosed()) // when resumed + return; + + for (let i = start; i < n; i++) { + const r = items[i]; + if (isPromise(r)) { + r.then(v => (next(v), _next(i + 1)), error); + return; // suspend + } else { + next(r); + } + } + complete(); + }; + + _next(0); + } + ); export const empty = _observe(({ complete }) => complete()); @@ -474,7 +514,7 @@ export const subject = (producer: Pro if (_subscribers === subscribers) { const pos = subscribers.indexOf(client); if (pos >= 0) - subscribers.splice(pos,1); + subscribers.splice(pos, 1); if (!subscribers.length) cleanup(); } diff --git a/djx/src/main/ts/store.ts b/djx/src/main/ts/store.ts --- a/djx/src/main/ts/store.ts +++ b/djx/src/main/ts/store.ts @@ -35,13 +35,16 @@ export const isDjObservableResults = export const query = (store: Queryable, includeUpdates = true) => (query?: Q, options?: O & { observe?: boolean }) => { return observe>(({ next, complete, error, isClosed }) => { + + const processResults = (items: T[]) => + items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); + try { const results = store.query(query, options); if (isPromise(results)) { - results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }))) - .then(undefined, error); + results.then(processResults).then(undefined, error); } else { - results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); + processResults(results); } if (!isClosed() && (options?.observe !== false) && isDjObservableResults(results)) { diff --git a/djx/src/main/typings/index.d.ts b/djx/src/main/typings/index.d.ts --- a/djx/src/main/typings/index.d.ts +++ b/djx/src/main/typings/index.d.ts @@ -1,6 +1,7 @@ // eslint-disable-next-line @typescript-eslint/triple-slash-reference /// +import _WidgetBase = require("dijit/_WidgetBase"); import { Rendition } from "./tsx/traits"; declare global { @@ -8,7 +9,7 @@ declare global { type Ref = ((value: T | undefined) => void); - type Element = Rendition; + type Element = Rendition | _WidgetBase; interface DjxIntrinsicAttributes { /** alias for className */ diff --git a/djx/src/test/ts/observable-tests.ts b/djx/src/test/ts/observable-tests.ts --- a/djx/src/test/ts/observable-tests.ts +++ b/djx/src/test/ts/observable-tests.ts @@ -1,6 +1,7 @@ -import { empty, observe } from "./observable"; -import * as t from "tap"; +import { empty, observe, of } from "./observable"; +import * as tap from "tap"; import { Cancellation } from "@implab/core-amd/Cancellation"; +import { delay } from "@implab/core-amd/safe"; const subj1 = observe(({ next, complete }) => { next(1); @@ -16,10 +17,10 @@ const consumer1 = { }; subj1.subscribe(consumer1); -t.equal(consumer1.sum, 1, "Should get only one value"); +tap.equal(consumer1.sum, 1, "Should get only one value"); subj1.subscribe(consumer1); -t.equal(consumer1.sum, 2, "Should get the value again"); +tap.equal(consumer1.sum, 2, "Should get the value again"); const consumer2 = { value: 0, @@ -32,31 +33,31 @@ let maps = 0; subj1 .map(v => { - t.comment(`map1: ${v * 2}`); + tap.comment(`map1: ${v * 2}`); maps++; return v * 2; }) .map(v => { - t.comment(`map2: ${v * 2}`); + tap.comment(`map2: ${v * 2}`); maps++; return v * 2; }) .map(v => { - t.comment(`map3: ${v * 2}`); + tap.comment(`map3: ${v * 2}`); maps++; return v * 2; }) .subscribe(consumer2); -t.equal(consumer2.value, 8, "Should map"); -t.equal(maps, 3, "The map chain should not be executed after completion"); -t.ok(consumer2.completed, "The completion signal should pass through"); +tap.equal(consumer2.value, 8, "Should map"); +tap.equal(maps, 3, "The map chain should not be executed after completion"); +tap.ok(consumer2.completed, "The completion signal should pass through"); const subj2 = observe(({ next, complete }) => { [1, 2, 3, 4, 5].forEach(next); complete(); return () => { - t.comment("subj2: unsubscribe"); + tap.comment("subj2: unsubscribe"); }; }); @@ -67,7 +68,7 @@ const consumer3 = { subscribed: 0, unsubscribed: 0, next(v: "even" | "odd") { - this[v] ++; + this[v]++; }, complete() { this.completed = true; @@ -96,7 +97,7 @@ const subj3 = subj2.pipe<"even" | "odd"> complete, error }); - return () =>{ + return () => { consumer3.unsubscribe(); h.unsubscribe(); }; @@ -104,24 +105,46 @@ const subj3 = subj2.pipe<"even" | "odd"> subj3.subscribe(consumer3); -t.equal(consumer3.odd, 2, "Should get 2 odd elements"); -t.equal(consumer3.even, 2, "Should get 2 even elements"); -t.ok(consumer3.completed, "The sequence should completed"); -t.equal(consumer3.subscribed, 1, "The subscription should be done once"); -t.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion"); +tap.equal(consumer3.odd, 2, "Should get 2 odd elements"); +tap.equal(consumer3.even, 2, "Should get 2 even elements"); +tap.ok(consumer3.completed, "The sequence should completed"); +tap.equal(consumer3.subscribed, 1, "The subscription should be done once"); +tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion"); -subj2.reduce((a,b) => a + b).subscribe({ - next: val => t.comment("subj2: reduce =", val), - complete: () => t.comment("subj2: complete") +subj2.reduce((a, b) => a + b).subscribe({ + next: val => tap.comment("subj2: reduce =", val), + complete: () => tap.comment("subj2: complete") }); -subj2.reduce((a,b) => a + b).next() - .then(value => t.comment("subj2: next reduce=", value)) - .catch(() => {}); +tap.test("of(...) tests", async t => { + await subj2.reduce((a, b) => a + b).next() + .then(value => t.comment("subj2: next reduce=", value)); + + await subj2.next().then(val => t.equal(val, 1, "Should peek the first element")); + + const cancelled = new Cancellation(cancel => cancel()); + await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail"); + + await t.rejects(empty.next(), "Empty sequence should fail to get next element"); + + await of(delay(1).then(() => 1), Promise.resolve(2), 3) + .reduce((a, x) => [...a, x], []) + .next() + .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order")); -subj2.next().then(val => t.equal(val, 1, "Should peek the first element")).catch(() => {}); + const rejected = Promise.reject("DIE!"); + rejected.catch(() => { }); // SAFE AND SOUND -const cancelled = new Cancellation(cancel => cancel()); -t.rejects(subj2.next(cancelled), "Cancelled next() method should fail").catch(() => {}); + await t.resolves( + of(Promise.resolve(1), rejected).next(), + "of(...) should emit non-rejected items" + ); + await t.rejects( + of(1, Promise.reject("DIE!")).reduce((a) => a).next(), + "of(...) should terminate with error when a parameter is rejected" + ); -t.rejects(empty.next(), "Empty sequence should fail to get next element").catch(() => {}); \ No newline at end of file + t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence"); + await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation"); + +}).catch(() => { }); \ No newline at end of file diff --git a/djx/src/test/ts/plan.ts b/djx/src/test/ts/plan.ts --- a/djx/src/test/ts/plan.ts +++ b/djx/src/test/ts/plan.ts @@ -1,4 +1,4 @@ import "./declare-tests"; import "./observable-tests"; import "./state-tests"; -import "./subject-tests"; \ No newline at end of file +import "./subject-tests";