# HG changeset patch # User cin # Date 2022-10-11 22:51:19 # Node ID aac297dda27dac1e86bc93617ba172a2c92415f5 # Parent b4bb7c903a747f14237294c29577ca8c040ba5d5 added reduce() and next() methods to observable query traits are moved to store.ts diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,19 @@ +{ + // Используйте IntelliSense, чтобы узнать о возможных атрибутах. + // Наведите указатель мыши, чтобы просмотреть описания существующих атрибутов. + // Для получения дополнительной информации посетите: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "Launch tests", + "skipFiles": [ + "/**" + ], + "program": "${workspaceFolder}/djx/build/test/index.js", + "cwd": "${workspaceFolder}/djx/build/test", + "console": "integratedTerminal" + } + ] +} \ No newline at end of file diff --git a/djx/package-lock.json b/djx/package-lock.json --- a/djx/package-lock.json +++ b/djx/package-lock.json @@ -9,7 +9,7 @@ "version": "0.0.1-dev", "license": "BSD-2-Clause", "devDependencies": { - "@implab/core-amd": "^1.4.0", + "@implab/core-amd": "^1.4.6", "@implab/dojo-typings": "1.0.3", "@types/chai": "4.1.3", "@types/requirejs": "2.1.31", @@ -31,7 +31,7 @@ "yaml": "~1.7.2" }, "peerDependencies": { - "@implab/core-amd": "^1.4.0", + "@implab/core-amd": "^1.4.6", "dojo": "^1.10.0" } }, @@ -453,10 +453,13 @@ "dev": true }, "node_modules/@implab/core-amd": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/@implab/core-amd/-/core-amd-1.4.0.tgz", - "integrity": "sha512-gaJX1mhri7YpmXDTAYELZnmTznzXYpk2AI7Decsttdi6xY+bqGgH24q0AFcKrx8RY2jfsFXxDdf0fITz2HpBbw==", - "dev": true + "version": "1.4.6", + "resolved": "https://registry.npmjs.org/@implab/core-amd/-/core-amd-1.4.6.tgz", + "integrity": "sha512-I1RwUAxeiodePpiBzveoHaehMSAyk7NFPPPEvDqfphHBC8yXoXWAaUrp7EcOKEzjXAs7lJQVhNpmjCjIqoj6BQ==", + "dev": true, + "peerDependencies": { + "dojo": "^1.10.0" + } }, "node_modules/@implab/dojo-typings": { "version": "1.0.3", @@ -7040,10 +7043,11 @@ "dev": true }, "@implab/core-amd": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/@implab/core-amd/-/core-amd-1.4.0.tgz", - "integrity": "sha512-gaJX1mhri7YpmXDTAYELZnmTznzXYpk2AI7Decsttdi6xY+bqGgH24q0AFcKrx8RY2jfsFXxDdf0fITz2HpBbw==", - "dev": true + "version": "1.4.6", + "resolved": "https://registry.npmjs.org/@implab/core-amd/-/core-amd-1.4.6.tgz", + "integrity": "sha512-I1RwUAxeiodePpiBzveoHaehMSAyk7NFPPPEvDqfphHBC8yXoXWAaUrp7EcOKEzjXAs7lJQVhNpmjCjIqoj6BQ==", + "dev": true, + "requires": {} }, "@implab/dojo-typings": { "version": "1.0.3", diff --git a/djx/package.json b/djx/package.json --- a/djx/package.json +++ b/djx/package.json @@ -15,11 +15,11 @@ "access": "public" }, "peerDependencies": { - "@implab/core-amd": "^1.4.0", + "@implab/core-amd": "^1.4.6", "dojo": "^1.10.0" }, "devDependencies": { - "@implab/core-amd": "^1.4.0", + "@implab/core-amd": "^1.4.6", "@types/chai": "4.1.3", "@types/requirejs": "2.1.31", "@types/yaml": "1.2.0", diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -1,5 +1,5 @@ -import { PromiseOrValue } from "@implab/core-amd/interfaces"; -import { isPromise } from "@implab/core-amd/safe"; +import { Cancellation } from "@implab/core-amd/Cancellation"; +import { ICancellation } from "@implab/core-amd/interfaces"; /** * The interface for the consumer of an observable sequence @@ -29,23 +29,23 @@ export type Sink = { /** * Call to send the next element in the sequence */ - next: (value: T) => void; + next: (value: T) => void; + + /** + * Call to notify about the error occurred in the sequence. + */ + error: (e: unknown) => void; - /** - * Call to notify about the error occurred in the sequence. - */ - error: (e: unknown) => void; - - /** - * Call to signal the end of the sequence. - */ - complete: () => void; + /** + * Call to signal the end of the sequence. + */ + complete: () => void; - /** - * Checks whether the sink is accepting new elements. It's safe to - * send elements to the closed sink. - */ - isClosed: () => boolean; + /** + * Checks whether the sink is accepting new elements. It's safe to + * send elements to the closed sink. + */ + isClosed: () => boolean; }; export type Producer = (sink: Sink) => (void | (() => void)); @@ -64,6 +64,8 @@ export interface Subscribable { subscribe(consumer: Partial>): Unsubscribable; } +export type AccumulatorFn = (acc: A, value: T) => A; + /** The observable source of items. */ export interface Observable extends Subscribable { /** Transforms elements of the sequence with the specified mapper @@ -85,11 +87,33 @@ export interface Observable extends S * @param accumulator * @param initial */ - scan(accumulator: (acc: A, value: T) => A, initial: A): Observable; + scan(accumulator: AccumulatorFn, initial: A): Observable; + scan(accumulator: AccumulatorFn): Observable; + /** Applies accumulator to each value in the sequence and + * emits the accumulated value at the end of the sequence + * + * @param accumulator + * @param initial + */ + reduce(accumulator: AccumulatorFn, initial: A): Observable; + reduce(accumulator: AccumulatorFn): Observable; + + /** Concatenates the specified sequences with this observable + * + * @param seq sequences to concatenate with the current observable + */ cat(...seq: Subscribable[]): Observable; - pipe(f: (source: Observable) => Producer): Observable; + /** Pipes the specified operator to produce the new observable + * @param op The operator which consumes this observable and produces a new one + */ + pipe(op: (source: Observable) => Producer): Observable; + + /** Waits for the next event to occur and returns a promise for the next value + * @param ct Cancellation token to + */ + next(ct?: ICancellation): Promise; } const noop = () => { }; @@ -134,24 +158,75 @@ const _observe = (producer: Producer< subscribe: (consumer: Partial>) => ({ unsubscribe: producer(sink(consumer)) ?? noop }), + map: (mapper) => _observe(({ next, ...rest }) => producer({ next: next !== noop ? (v: T) => next(mapper(v)) : noop, ...rest }) ), + filter: (predicate) => _observe(({ next, ...rest }) => producer({ next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, ...rest }) ), - scan: (accumulator, initial) => _observe(({ next, ...rest }) => { - let _acc = initial; - return producer({ - next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, - ...rest - }); + + scan: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, ...rest }) => { + if (args.length === 1) { + const [accumulator] = args; + let _acc: T; + let index = 0; + return producer({ + next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop, + ...rest + }); + } else { + const [accumulator, initial] = args; + let _acc = initial; + return producer({ + next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, + ...rest + }); + } + }), + + reduce: (...args: [AccumulatorFn, A] | [AccumulatorFn]) => _observe(({ next, complete, error, ...rest }) => { + if (args.length === 1) { + const [accumulator] = args; + let _acc: T; + let index = 0; + return producer({ + next: next !== noop ? (v: T) => { + _acc = index++ === 0 ? v : accumulator(_acc, v); + } : noop, + complete: () => { + if (index === 0) { + error(new Error("The sequence can't be empty")); + } else { + next(_acc); + complete(); + } + }, + error, + ...rest + }); + } else { + const [accumulator, initial] = args; + let _acc = initial; + return producer({ + next: next !== noop ? (v: T) => { + _acc = accumulator(_acc, v); + } : noop, + complete: () => { + next(_acc); + complete(); + }, + error, + ...rest + }); + } }), cat: (...seq) => _observe(({ next, complete: final, ...rest }) => { @@ -173,61 +248,59 @@ const _observe = (producer: Producer< return () => cleanup(); }), - pipe: (f: (source: Observable) => Producer) => observe(f(_observe(producer))) -}); - -export interface OrderUpdate { - /** The item is being updated */ - item: T; + pipe: (op: (source: Observable) => Producer) => observe(op(_observe(producer))), - /** The previous index of the item, -1 in case it is inserted */ - prevIndex: number; - - /** The new index of the item, -1 in case it is deleted */ - newIndex: number; -} + next: (ct?: ICancellation) => { + const _ct = ct ?? Cancellation.none; + return new Promise((resolve, reject) => { + // wrap the producer to handle only single event + const once = fuse(({ next, complete, error, isClosed }) => { + const h = _ct.register(error); -interface ObservableResults { - /** - * Allows observation of results - */ - observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): { - remove(): void; - }; -} + // is the _ct fires it will call error() and isClosed() will return true + const cleanup = !isClosed() ? + producer({ + next: v => (next(v), complete()), + complete: () => error(new Error("The sequence is empty")), + error, + isClosed + }) ?? noop : + noop; -interface Queryable { - query(...args: A): PromiseOrValue; -} + return () => { + h.destroy(); + cleanup(); + }; + }); -export const isObservableResults = (v: object): v is ObservableResults => - v && (typeof (v as { observe?: unknown; }).observe === "function"); + once({ + next: resolve, + error: reject, + complete: noop, + isClosed: () => false + }); + }); + } +}); export const observe = (producer: Producer) => _observe(fuse(producer)); -export const empty = observe(({ complete }) => complete()); +export const streamArray = (items: T[]) => _observe( + ({ next, complete }) => ( + items.forEach(next), + complete() + ) +); -export const query = (store: Queryable) => - (...args: A) => { - return observe>(({ next, complete, error }) => { - try { - const results = store.query(...args); - if (isPromise(results)) { - results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }))) - .then(undefined, error); - } else { - results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); - } +export const streamPromise = (promise: PromiseLike) => observe( + ({next, error, complete}) => void promise.then(v => (next(v), complete()), error) +); - if (isObservableResults(results)) { - const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex })); - return () => h.remove(); - } else { - complete(); - } - } catch (err) { - error(err); - } - }); +export const of = (...items: T[]) => _observe( + ({ next, complete }) => ( + items.forEach(next), + complete() + ) +); - }; +export const empty = _observe(({ complete }) => complete()); \ No newline at end of file diff --git a/djx/src/main/ts/store.ts b/djx/src/main/ts/store.ts new file mode 100644 --- /dev/null +++ b/djx/src/main/ts/store.ts @@ -0,0 +1,58 @@ +import { PromiseOrValue } from "@implab/core-amd/interfaces"; +import { isPromise } from "@implab/core-amd/safe"; +import { observe, Observable } from "./observable"; + +export interface OrderedUpdate { + /** The item is being updated */ + readonly item: T; + + /** The previous index of the item, -1 in case it is inserted */ + readonly prevIndex: number; + + /** The new index of the item, -1 in case it is deleted */ + readonly newIndex: number; + +} + +export type QueryResults = Observable>; + +interface DjObservableResults { + /** + * Allows observation of results + */ + observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): { + remove(): void; + }; +} + +interface Queryable { + query(...args: A): PromiseOrValue; +} + +export const isObservableResults = (v: object): v is DjObservableResults => + v && (typeof (v as { observe?: unknown; }).observe === "function"); + +export const query = (store: Queryable, includeUpdates = true) => + (...args: A) => { + return observe>(({ next, complete, error, isClosed }) => { + try { + const results = store.query(...args); + if (isPromise(results)) { + results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }))) + .then(undefined, error); + } else { + results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); + } + + if (!isClosed() && isObservableResults(results)) { + const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); + return () => h.remove(); + } else { + complete(); + } + } catch (err) { + error(err); + } + }); + + }; diff --git a/djx/src/main/ts/tsx.ts b/djx/src/main/ts/tsx.ts --- a/djx/src/main/ts/tsx.ts +++ b/djx/src/main/ts/tsx.ts @@ -7,10 +7,11 @@ import Stateful = require("dojo/Stateful import _WidgetBase = require("dijit/_WidgetBase"); import { DjxWidgetBase } from "./tsx/DjxWidgetBase"; import { WatchRendition } from "./tsx/WatchRendition"; -import { Observable, observe, OrderUpdate, Subscribable } from "./observable"; +import { Observable, observe, Subscribable } from "./observable"; import djAttr = require("dojo/dom-attr"); import djClass = require("dojo/dom-class"); import { AnimationAttrs, WatchForRendition } from "./tsx/WatchForRendition"; +import { OrderedUpdate } from "./store"; export function createElement Element)>(elementType: T, ...args: unknown[]): Rendition { if (typeof elementType === "string") { @@ -101,7 +102,7 @@ export function watch( } } -export const watchFor = (source: T[] | Subscribable>, render: (item: T, index: number) => unknown, opts: AnimationAttrs = {}) => { +export const watchFor = (source: T[] | Subscribable>, render: (item: T, index: number) => unknown, opts: AnimationAttrs = {}) => { return new WatchForRendition({ ...opts, subject: source, diff --git a/djx/src/main/ts/tsx/WatchForRendition.ts b/djx/src/main/ts/tsx/WatchForRendition.ts --- a/djx/src/main/ts/tsx/WatchForRendition.ts +++ b/djx/src/main/ts/tsx/WatchForRendition.ts @@ -9,7 +9,8 @@ import { collectNodes, destroy as safeDe import { IDestroyable } from "@implab/core-amd/interfaces"; import { play } from "../play"; import * as fx from "dojo/fx"; -import { isObservableResults, isSubsribable, OrderUpdate, Subscribable } from "../observable"; +import { isSubsribable, Subscribable } from "../observable"; +import { isObservableResults, OrderedUpdate } from "../store"; const trace = TraceSource.get(mid); @@ -21,7 +22,7 @@ interface ItemRendition { destroy(): void; } -interface RenderTask extends OrderUpdate { +interface RenderTask extends OrderedUpdate { animate: boolean; } @@ -34,7 +35,7 @@ export interface AnimationAttrs { } export interface WatchForRenditionAttrs extends AnimationAttrs { - subject: T[] | Subscribable>; + subject: T[] | Subscribable>; component: (arg: T, index: number) => unknown; } @@ -60,7 +61,7 @@ export class WatchForRendition extend private readonly _itemRenditions: ItemRendition[] = []; - private readonly _subject: T[] | Subscribable>; + private readonly _subject: T[] | Subscribable>; private readonly _renderTasks: RenderTask[] = []; @@ -97,7 +98,7 @@ export class WatchForRendition extend const result = this._subject; if (result) { - if (isSubsribable>(result)) { + if (isSubsribable>(result)) { let animate = false; const subscription = result.subscribe({ next: ({ item, prevIndex, newIndex }) => this._onItemUpdated({ item, prevIndex, newIndex, animate }) diff --git a/djx/src/test/ts/observable-tests.ts b/djx/src/test/ts/observable-tests.ts --- a/djx/src/test/ts/observable-tests.ts +++ b/djx/src/test/ts/observable-tests.ts @@ -1,5 +1,6 @@ -import { observe } from "./observable"; +import { empty, observe } from "./observable"; import * as t from "tap"; +import { Cancellation } from "@implab/core-amd/Cancellation"; const subj1 = observe(({ next, complete }) => { next(1); @@ -54,29 +55,73 @@ t.ok(consumer2.completed, "The completio const subj2 = observe(({ next, complete }) => { [1, 2, 3, 4, 5].forEach(next); complete(); -}).pipe(self => ({ next, complete, error }) => { - t.comment("subj2: subscribe"); + return () => { + t.comment("subj2: unsubscribe"); + }; +}); + +const consumer3 = { + even: 0, + odd: 0, + completed: false, + subscribed: 0, + unsubscribed: 0, + next(v: "even" | "odd") { + this[v] ++; + }, + complete() { + this.completed = true; + }, + subscribe() { + this.subscribed++; + }, + unsubscribe() { + this.unsubscribed++; + } +}; + + +const subj3 = subj2.pipe<"even" | "odd">(self => ({ next, complete, error }) => { + consumer3.subscribe(); + let count = 0; const h = self.subscribe({ next: val => { if (val % 2 === 0) next("odd"); else next("even"); + if (++count === 4) + complete(); }, complete, error }); return () =>{ - t.comment("subj2: unsubscribe"); + consumer3.unsubscribe(); h.unsubscribe(); }; }); -subj2.subscribe({ - next: val => t.comment("subj2: ", val), +subj3.subscribe(consumer3); + +t.equal(consumer3.odd, 2, "Should get 2 odd elements"); +t.equal(consumer3.even, 2, "Should get 2 even elements"); +t.ok(consumer3.completed, "The sequence should completed"); +t.equal(consumer3.subscribed, 1, "The subscription should be done once"); +t.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion"); + +subj2.reduce((a,b) => a + b).subscribe({ + next: val => t.comment("subj2: reduce =", val), complete: () => t.comment("subj2: complete") }); -subj2.subscribe({ - next: val => t.comment("subj2: ", val), - complete: () => t.comment("subj2: complete") -}); \ No newline at end of file + +subj2.reduce((a,b) => a + b).next() + .then(value => t.comment("subj2: next reduce=", value)) + .catch(() => {}); + +subj2.next().then(val => t.equal(val, 1, "Should peek the first element")).catch(() => {}); + +const cancelled = new Cancellation(cancel => cancel()); +t.rejects(subj2.next(cancelled), "Cancelled next() method should fail").catch(() => {}); + +t.rejects(empty.next(), "Empty sequence should fail to get next element").catch(() => {}); \ No newline at end of file diff --git a/djx/src/tsconfig.json b/djx/src/tsconfig.json --- a/djx/src/tsconfig.json +++ b/djx/src/tsconfig.json @@ -9,6 +9,7 @@ "target": "ES5", "jsx": "react", "lib": ["es5", "es2015.promise", "es2015.symbol", "es2015.iterable", "dom", "scripthost"], - "noUnusedLocals": true + "noUnusedLocals": true, + "downlevelIteration": true } } \ No newline at end of file diff --git a/playground/package-lock.json b/playground/package-lock.json --- a/playground/package-lock.json +++ b/playground/package-lock.json @@ -33,7 +33,7 @@ "dev": true, "license": "BSD-2-Clause", "peerDependencies": { - "@implab/core-amd": "^1.4.0", + "@implab/core-amd": "^1.4.6", "dojo": "^1.10.0" } }, diff --git a/playground/src/main/ts/model/Appointment.ts b/playground/src/main/ts/model/Appointment.ts --- a/playground/src/main/ts/model/Appointment.ts +++ b/playground/src/main/ts/model/Appointment.ts @@ -1,4 +1,5 @@ import { Contact } from "./Contact"; +import { QueryResults } from "@implab/djx/store"; export type AppointmentRole = "organizer" | "speaker" | "participant"; @@ -16,5 +17,5 @@ export interface Appointment { /** Duration in minutes */ duration: number; - getMembers(role?: AppointmentRole): Member[]; + getMembers(role?: AppointmentRole): QueryResults; } \ No newline at end of file diff --git a/playground/src/main/ts/model/MainContext.ts b/playground/src/main/ts/model/MainContext.ts --- a/playground/src/main/ts/model/MainContext.ts +++ b/playground/src/main/ts/model/MainContext.ts @@ -3,9 +3,9 @@ import Observable = require("dojo/store/ import { Appointment, AppointmentRole, Member } from "./Appointment"; import { Contact } from "./Contact"; import { Uuid } from "@implab/core-amd/Uuid"; -import { query } from "@implab/djx/observable"; import { IDestroyable } from "@implab/core-amd/interfaces"; import { delay } from "@implab/core-amd/safe"; +import { query } from "@implab/djx/store"; type AppointmentRecord = Omit & { id: string }; @@ -41,8 +41,12 @@ export class MainContext implements IDes ); } + private readonly _queryAppointmentsRx = query(this._appointments); + + private readonly _queryMembersRx = query(this._members); + queryAppointments({ dateFrom, dateTo }: { dateFrom?: Date; dateTo?: Date; } = {}) { - return query(this._appointments)(({ startAt }) => + return this._queryAppointmentsRx(({ startAt }) => (!dateFrom || dateFrom <= startAt) && (!dateTo || startAt <= dateTo) ).map(item(this._mapAppointment)); @@ -61,7 +65,7 @@ export class MainContext implements IDes title, startAt, duration, - getMembers: (role?: AppointmentRole) => this._members.query(role ? { appointmentId: id, role } : { appointmentId: id }) + getMembers: (role?: AppointmentRole) => this._queryMembersRx(role ? { appointmentId: id, role } : { appointmentId: id }) }); destroy() { diff --git a/playground/src/main/ts/model/MainModel.ts b/playground/src/main/ts/model/MainModel.ts --- a/playground/src/main/ts/model/MainModel.ts +++ b/playground/src/main/ts/model/MainModel.ts @@ -1,7 +1,8 @@ import { id as mid } from "module"; import { BehaviorSubject, Observer, Unsubscribable } from "rxjs"; import { IDestroyable } from "@implab/core-amd/interfaces"; -import { OrderUpdate, Observable } from "@implab/djx/observable"; +import { Observable } from "@implab/djx/observable"; +import { OrderedUpdate } from "@implab/djx/store"; import { Appointment, Member } from "./Appointment"; import { MainContext } from "./MainContext"; import { LocalDate } from "@js-joda/core"; @@ -11,7 +12,7 @@ import { TraceSource } from "@implab/cor const trace = TraceSource.get(mid); export interface State { - appointments: Observable>; + appointments: Observable>; dateTo: LocalDate; diff --git a/playground/src/main/ts/view/MainWidget.tsx b/playground/src/main/ts/view/MainWidget.tsx --- a/playground/src/main/ts/view/MainWidget.tsx +++ b/playground/src/main/ts/view/MainWidget.tsx @@ -2,7 +2,8 @@ import { djbase, djclass } from "@implab import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase"; import { bind, createElement, prop, watch, watchFor } from "@implab/djx/tsx"; import MainModel from "../model/MainModel"; -import { OrderUpdate, Observable } from "@implab/djx/observable"; +import { Observable } from "@implab/djx/observable"; +import { OrderedUpdate } from "@implab/djx/store"; import { Appointment } from "../model/Appointment"; import { LocalDate } from "@js-joda/core"; import Button = require("dijit/form/Button"); @@ -10,7 +11,7 @@ import Button = require("dijit/form/Butt @djclass export default class MainWidget extends djbase(DjxWidgetBase) { - appointments?: Observable>; + appointments?: Observable>; model: MainModel;