##// END OF EJS Templates
added observable.collect() method to collect a sequnce to the array...
cin -
r129:66546e709732 v1.8.0 default
parent child
Show More
@@ -2,7 +2,7
2
2
3 ## SYNOPSIS
3 ## SYNOPSIS
4
4
5 ```tsx
5 ```jsx
6 import { djbase, djclass, bind, prototype, AbstractConstructor } from "@implab/djx/declare";
6 import { djbase, djclass, bind, prototype, AbstractConstructor } from "@implab/djx/declare";
7
7
8 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
8 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
@@ -175,7 +175,7 This is the base class for the djx widge
175 This class extends `dijit/_WidgetBase` and contains logic from `_AttachMixin` thus
175 This class extends `dijit/_WidgetBase` and contains logic from `_AttachMixin` thus
176 it is capable to handle `data-dojo-attach-*` attributes from the rendered markup.
176 it is capable to handle `data-dojo-attach-*` attributes from the rendered markup.
177
177
178 ```tsx
178 ```jsx
179 @djclass
179 @djclass
180 export class MyFirstWidget extends djbase(DjxWidgetBase) {
180 export class MyFirstWidget extends djbase(DjxWidgetBase) {
181 render() {
181 render() {
@@ -199,7 +199,7 Add to your `tsconfig.json` the followin
199 "experimentalDecorators": true,
199 "experimentalDecorators": true,
200 "jsxFactory": "createElement",
200 "jsxFactory": "createElement",
201 "jsx": "react",
201 "jsx": "react",
202 "target": "ES5", // minimal supported version
202 "target": "ES5",
203 "lib": ["ES2015", "DOM"]
203 "lib": ["ES2015", "DOM"]
204 }
204 }
205 }
205 }
@@ -225,7 +225,7 Dojo 1.x adds some standard options to d
225
225
226 The typical implementation of this technique would look like
226 The typical implementation of this technique would look like
227
227
228 ```tsx
228 ```jsx
229 import { createElement } from "@implab/djx/tsx";
229 import { createElement } from "@implab/djx/tsx";
230 import {djclass, djbase, bind} from "@implab/djx/declare";
230 import {djclass, djbase, bind} from "@implab/djx/declare";
231
231
@@ -251,7 +251,7 2. Attribute mappings support only simpl
251
251
252 This library helps you to get both goals with special trait `watch(...)`
252 This library helps you to get both goals with special trait `watch(...)`
253
253
254 ```tsx
254 ```jsx
255 import { createElement } from "@implab/djx/tsx";
255 import { createElement } from "@implab/djx/tsx";
256 import { djclass, djbase} from "@implab/djx/declare"
256 import { djclass, djbase} from "@implab/djx/declare"
257
257
@@ -274,7 +274,7 contents according to the value changes
274 The key feature of this approach that the rendering function within `watch` may
274 The key feature of this approach that the rendering function within `watch` may
275 return a complex rendition.
275 return a complex rendition.
276
276
277 ```tsx
277 ```jsx
278 // inside some widget
278 // inside some widget
279 render() {
279 render() {
280 return <section>
280 return <section>
@@ -304,3 +304,34 The `render` callback may return almost
304 * DOM Nodes and widgets are left intact,
304 * DOM Nodes and widgets are left intact,
305 * any other kind of value will cause an error.
305 * any other kind of value will cause an error.
306
306
307 The watch method allows to observe a single value, for the large sets of data
308 this isn't suitable well and may lead to performance issues. Dojo provides
309 observable stores to being able to track individual changes. The library provides
310 `watchFor(observable, render)` method to render observable query results and
311 handle changes on per item basis.
312
313 ```jsx
314 // inside some widget
315 staff = new Observable(new Memory<Employee>()),
316
317 getStuff() {
318 return this.staff.query();
319 }
320
321 addEmployee(employee: Employee) {
322 this.staff.add(employee); // the rendition will update automatically
323 }
324
325 render() {
326 return <table>
327 <thead>
328 <tr><th>Name</th><th>Position</th><th>Salary</th></tr>
329 </thead>
330 <tbody>
331 {watchFor(this.getStaff(), ({name, position, salary}) =>
332 <tr><td>{name}</td><td>{position}</td><td>{salary}</td></tr>
333 )}
334 </tbody>
335 </table>
336 }
337 ```
@@ -1,5 +1,6
1 import { Cancellation } from "@implab/core-amd/Cancellation";
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { ICancellation } from "@implab/core-amd/interfaces";
2 import { ICancellation } from "@implab/core-amd/interfaces";
3 import { isPromise } from "@implab/core-amd/safe";
3
4
4 /**
5 /**
5 * The interface for the consumer of an observable sequence
6 * The interface for the consumer of an observable sequence
@@ -137,9 +138,12 export interface Observable<T> extends S
137 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
138 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
138
139
139 /** Waits for the next event to occur and returns a promise for the next value
140 /** Waits for the next event to occur and returns a promise for the next value
140 * @param ct Cancellation token to
141 * @param ct Cancellation token
141 */
142 */
142 next(ct?: ICancellation): Promise<T>;
143 next(ct?: ICancellation): Promise<T>;
144
145 /** Collects items of the sequence to the array. */
146 collect(ct?: ICancellation): Promise<T[]>;
143 }
147 }
144
148
145 const noop = () => { };
149 const noop = () => { };
@@ -292,61 +296,97 const _observe = <T>(producer: Producer<
292
296
293 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
297 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
294
298
295 next: (ct?: ICancellation) => {
299 next: collect(
296 const _ct = ct ?? Cancellation.none;
300 producer,
297 return new Promise<T>((resolve, reject) => {
301 ({ next, complete, error, isClosed }) => ({
298 // wrap the producer to handle only single event
302 next: v => (next(v), complete()),
299 const once = fuse<T>(({ next, complete, error, isClosed }) => {
303 complete: () => error(new Error("The sequence is empty")),
300 const h = _ct.register(error);
304 error,
305 isClosed
306 })
307 ),
308
309 collect: collect(
310 producer,
311 ({ next, complete, ...rest }) => {
312 const data: T[] = [];
313 return {
314 next: v => data.push(v),
315 complete: () => (next(data), complete()),
316 ...rest
317 };
318 }
319 )
320 });
301
321
302 // is the _ct fires it will call error() and isClosed() will return true
322 const collect = <T, U>(
303 const cleanup = !isClosed() ?
323 producer: Producer<T>,
304 producer({
324 collector: (result: Sink<U>) => Sink<T>
305 next: v => (next(v), complete()),
325 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
306 complete: () => error(new Error("The sequence is empty")),
326 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
307 error,
327 const h = ct.register(error);
308 isClosed
328 const cleanup = !isClosed() ?
309 }) ?? noop :
329 producer(collector({ next, complete, error, isClosed })) ?? noop :
310 noop;
330 noop;
311
331
312 return () => {
332 return () => {
313 h.destroy();
333 h.destroy();
314 cleanup();
334 cleanup();
315 };
335 };
316 });
336 });
317
337
318 once({
338 fused({
319 next: resolve,
339 next: resolve,
320 error: reject,
340 error: reject,
321 complete: noop,
341 complete: noop,
322 isClosed: () => false
342 isClosed: () => false
323 });
343 });
324 });
325 }
326 });
344 });
327
345
328 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
346 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
329
347
330 export const streamArray = <T>(items: T[]) => _observe<T>(
348 export const ofArray = <T>(items: T[]) => _observe<T>(
331 ({ next, complete }) => (
349 ({ next, complete }) => (
332 items.forEach(next),
350 items.forEach(next),
333 complete()
351 complete()
334 )
352 )
335 );
353 );
336
354
337 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
355 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
338 ({ next, error, complete }) => void promise.then(
356 ({ next, error, complete }) =>
339 v => (next(v), complete()),
357 isPromise(item) ?
340 error
358 void item.then(
341 )
359 v => (next(v), complete()),
360 error
361 ) :
362 (next(item), complete())
342 );
363 );
343
364
344 export const of = <T>(...items: T[]) => _observe<T>(
365 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
345 ({ next, complete }) => (
366 of1(items[0]) :
346 items.forEach(next),
367 observe<T>(
347 complete()
368 ({ next, error, complete, isClosed }) => {
348 )
369 const n = items.length;
349 );
370
371 const _next = (start: number) => {
372 if (start > 0 && isClosed()) // when resumed
373 return;
374
375 for (let i = start; i < n; i++) {
376 const r = items[i];
377 if (isPromise(r)) {
378 r.then(v => (next(v), _next(i + 1)), error);
379 return; // suspend
380 } else {
381 next(r);
382 }
383 }
384 complete();
385 };
386
387 _next(0);
388 }
389 );
350
390
351 export const empty = _observe<never>(({ complete }) => complete());
391 export const empty = _observe<never>(({ complete }) => complete());
352
392
@@ -474,7 +514,7 export const subject = <T>(producer: Pro
474 if (_subscribers === subscribers) {
514 if (_subscribers === subscribers) {
475 const pos = subscribers.indexOf(client);
515 const pos = subscribers.indexOf(client);
476 if (pos >= 0)
516 if (pos >= 0)
477 subscribers.splice(pos,1);
517 subscribers.splice(pos, 1);
478 if (!subscribers.length)
518 if (!subscribers.length)
479 cleanup();
519 cleanup();
480 }
520 }
@@ -35,13 +35,16 export const isDjObservableResults = <T>
35 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
35 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
36 (query?: Q, options?: O & { observe?: boolean }) => {
36 (query?: Q, options?: O & { observe?: boolean }) => {
37 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
37 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
38
39 const processResults = (items: T[]) =>
40 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
41
38 try {
42 try {
39 const results = store.query(query, options);
43 const results = store.query(query, options);
40 if (isPromise(results)) {
44 if (isPromise(results)) {
41 results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })))
45 results.then(processResults).then(undefined, error);
42 .then(undefined, error);
43 } else {
46 } else {
44 results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
47 processResults(results);
45 }
48 }
46
49
47 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
50 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
@@ -1,6 +1,7
1 // eslint-disable-next-line @typescript-eslint/triple-slash-reference
1 // eslint-disable-next-line @typescript-eslint/triple-slash-reference
2 /// <reference path="./css-plugin.d.ts"/>
2 /// <reference path="./css-plugin.d.ts"/>
3
3
4 import _WidgetBase = require("dijit/_WidgetBase");
4 import { Rendition } from "./tsx/traits";
5 import { Rendition } from "./tsx/traits";
5
6
6 declare global {
7 declare global {
@@ -8,7 +9,7 declare global {
8
9
9 type Ref<T> = ((value: T | undefined) => void);
10 type Ref<T> = ((value: T | undefined) => void);
10
11
11 type Element = Rendition;
12 type Element = Rendition | _WidgetBase;
12
13
13 interface DjxIntrinsicAttributes<E> {
14 interface DjxIntrinsicAttributes<E> {
14 /** alias for className */
15 /** alias for className */
@@ -1,6 +1,7
1 import { empty, observe } from "./observable";
1 import { empty, observe, of } from "./observable";
2 import * as t from "tap";
2 import * as tap from "tap";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 import { delay } from "@implab/core-amd/safe";
4
5
5 const subj1 = observe<number>(({ next, complete }) => {
6 const subj1 = observe<number>(({ next, complete }) => {
6 next(1);
7 next(1);
@@ -16,10 +17,10 const consumer1 = {
16 };
17 };
17
18
18 subj1.subscribe(consumer1);
19 subj1.subscribe(consumer1);
19 t.equal(consumer1.sum, 1, "Should get only one value");
20 tap.equal(consumer1.sum, 1, "Should get only one value");
20
21
21 subj1.subscribe(consumer1);
22 subj1.subscribe(consumer1);
22 t.equal(consumer1.sum, 2, "Should get the value again");
23 tap.equal(consumer1.sum, 2, "Should get the value again");
23
24
24 const consumer2 = {
25 const consumer2 = {
25 value: 0,
26 value: 0,
@@ -32,31 +33,31 let maps = 0;
32
33
33 subj1
34 subj1
34 .map(v => {
35 .map(v => {
35 t.comment(`map1: ${v * 2}`);
36 tap.comment(`map1: ${v * 2}`);
36 maps++;
37 maps++;
37 return v * 2;
38 return v * 2;
38 })
39 })
39 .map(v => {
40 .map(v => {
40 t.comment(`map2: ${v * 2}`);
41 tap.comment(`map2: ${v * 2}`);
41 maps++;
42 maps++;
42 return v * 2;
43 return v * 2;
43 })
44 })
44 .map(v => {
45 .map(v => {
45 t.comment(`map3: ${v * 2}`);
46 tap.comment(`map3: ${v * 2}`);
46 maps++;
47 maps++;
47 return v * 2;
48 return v * 2;
48 })
49 })
49 .subscribe(consumer2);
50 .subscribe(consumer2);
50
51
51 t.equal(consumer2.value, 8, "Should map");
52 tap.equal(consumer2.value, 8, "Should map");
52 t.equal(maps, 3, "The map chain should not be executed after completion");
53 tap.equal(maps, 3, "The map chain should not be executed after completion");
53 t.ok(consumer2.completed, "The completion signal should pass through");
54 tap.ok(consumer2.completed, "The completion signal should pass through");
54
55
55 const subj2 = observe<number>(({ next, complete }) => {
56 const subj2 = observe<number>(({ next, complete }) => {
56 [1, 2, 3, 4, 5].forEach(next);
57 [1, 2, 3, 4, 5].forEach(next);
57 complete();
58 complete();
58 return () => {
59 return () => {
59 t.comment("subj2: unsubscribe");
60 tap.comment("subj2: unsubscribe");
60 };
61 };
61 });
62 });
62
63
@@ -67,7 +68,7 const consumer3 = {
67 subscribed: 0,
68 subscribed: 0,
68 unsubscribed: 0,
69 unsubscribed: 0,
69 next(v: "even" | "odd") {
70 next(v: "even" | "odd") {
70 this[v] ++;
71 this[v]++;
71 },
72 },
72 complete() {
73 complete() {
73 this.completed = true;
74 this.completed = true;
@@ -96,7 +97,7 const subj3 = subj2.pipe<"even" | "odd">
96 complete,
97 complete,
97 error
98 error
98 });
99 });
99 return () =>{
100 return () => {
100 consumer3.unsubscribe();
101 consumer3.unsubscribe();
101 h.unsubscribe();
102 h.unsubscribe();
102 };
103 };
@@ -104,24 +105,46 const subj3 = subj2.pipe<"even" | "odd">
104
105
105 subj3.subscribe(consumer3);
106 subj3.subscribe(consumer3);
106
107
107 t.equal(consumer3.odd, 2, "Should get 2 odd elements");
108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
108 t.equal(consumer3.even, 2, "Should get 2 even elements");
109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
109 t.ok(consumer3.completed, "The sequence should completed");
110 tap.ok(consumer3.completed, "The sequence should completed");
110 t.equal(consumer3.subscribed, 1, "The subscription should be done once");
111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
111 t.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
112
113
113 subj2.reduce((a,b) => a + b).subscribe({
114 subj2.reduce((a, b) => a + b).subscribe({
114 next: val => t.comment("subj2: reduce =", val),
115 next: val => tap.comment("subj2: reduce =", val),
115 complete: () => t.comment("subj2: complete")
116 complete: () => tap.comment("subj2: complete")
116 });
117 });
117
118
118 subj2.reduce((a,b) => a + b).next()
119 tap.test("of(...) tests", async t => {
119 .then(value => t.comment("subj2: next reduce=", value))
120 await subj2.reduce((a, b) => a + b).next()
120 .catch(() => {});
121 .then(value => t.comment("subj2: next reduce=", value));
122
123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124
125 const cancelled = new Cancellation(cancel => cancel());
126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127
128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129
130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 .reduce<number[]>((a, x) => [...a, x], [])
132 .next()
133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
121
134
122 subj2.next().then(val => t.equal(val, 1, "Should peek the first element")).catch(() => {});
135 const rejected = Promise.reject("DIE!");
136 rejected.catch(() => { }); // SAFE AND SOUND
123
137
124 const cancelled = new Cancellation(cancel => cancel());
138 await t.resolves(
125 t.rejects(subj2.next(cancelled), "Cancelled next() method should fail").catch(() => {});
139 of(Promise.resolve(1), rejected).next(),
140 "of(...) should emit non-rejected items"
141 );
142 await t.rejects(
143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 "of(...) should terminate with error when a parameter is rejected"
145 );
126
146
127 t.rejects(empty.next(), "Empty sequence should fail to get next element").catch(() => {}); No newline at end of file
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149
150 }).catch(() => { }); No newline at end of file
@@ -1,4 +1,4
1 import "./declare-tests";
1 import "./declare-tests";
2 import "./observable-tests";
2 import "./observable-tests";
3 import "./state-tests";
3 import "./state-tests";
4 import "./subject-tests"; No newline at end of file
4 import "./subject-tests";
General Comments 0
You need to be logged in to leave comments. Login now