##// END OF EJS Templates
added observable.collect() method to collect a sequnce to the array...
cin -
r129:66546e709732 v1.8.0 default
parent child
Show More
@@ -2,7 +2,7
2 2
3 3 ## SYNOPSIS
4 4
5 ```tsx
5 ```jsx
6 6 import { djbase, djclass, bind, prototype, AbstractConstructor } from "@implab/djx/declare";
7 7
8 8 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
@@ -175,7 +175,7 This is the base class for the djx widge
175 175 This class extends `dijit/_WidgetBase` and contains logic from `_AttachMixin` thus
176 176 it is capable to handle `data-dojo-attach-*` attributes from the rendered markup.
177 177
178 ```tsx
178 ```jsx
179 179 @djclass
180 180 export class MyFirstWidget extends djbase(DjxWidgetBase) {
181 181 render() {
@@ -199,7 +199,7 Add to your `tsconfig.json` the followin
199 199 "experimentalDecorators": true,
200 200 "jsxFactory": "createElement",
201 201 "jsx": "react",
202 "target": "ES5", // minimal supported version
202 "target": "ES5",
203 203 "lib": ["ES2015", "DOM"]
204 204 }
205 205 }
@@ -225,7 +225,7 Dojo 1.x adds some standard options to d
225 225
226 226 The typical implementation of this technique would look like
227 227
228 ```tsx
228 ```jsx
229 229 import { createElement } from "@implab/djx/tsx";
230 230 import {djclass, djbase, bind} from "@implab/djx/declare";
231 231
@@ -251,7 +251,7 2. Attribute mappings support only simpl
251 251
252 252 This library helps you to get both goals with special trait `watch(...)`
253 253
254 ```tsx
254 ```jsx
255 255 import { createElement } from "@implab/djx/tsx";
256 256 import { djclass, djbase} from "@implab/djx/declare"
257 257
@@ -274,7 +274,7 contents according to the value changes
274 274 The key feature of this approach that the rendering function within `watch` may
275 275 return a complex rendition.
276 276
277 ```tsx
277 ```jsx
278 278 // inside some widget
279 279 render() {
280 280 return <section>
@@ -304,3 +304,34 The `render` callback may return almost
304 304 * DOM Nodes and widgets are left intact,
305 305 * any other kind of value will cause an error.
306 306
307 The watch method allows to observe a single value, for the large sets of data
308 this isn't suitable well and may lead to performance issues. Dojo provides
309 observable stores to being able to track individual changes. The library provides
310 `watchFor(observable, render)` method to render observable query results and
311 handle changes on per item basis.
312
313 ```jsx
314 // inside some widget
315 staff = new Observable(new Memory<Employee>()),
316
317 getStuff() {
318 return this.staff.query();
319 }
320
321 addEmployee(employee: Employee) {
322 this.staff.add(employee); // the rendition will update automatically
323 }
324
325 render() {
326 return <table>
327 <thead>
328 <tr><th>Name</th><th>Position</th><th>Salary</th></tr>
329 </thead>
330 <tbody>
331 {watchFor(this.getStaff(), ({name, position, salary}) =>
332 <tr><td>{name}</td><td>{position}</td><td>{salary}</td></tr>
333 )}
334 </tbody>
335 </table>
336 }
337 ```
@@ -1,5 +1,6
1 1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 2 import { ICancellation } from "@implab/core-amd/interfaces";
3 import { isPromise } from "@implab/core-amd/safe";
3 4
4 5 /**
5 6 * The interface for the consumer of an observable sequence
@@ -137,9 +138,12 export interface Observable<T> extends S
137 138 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
138 139
139 140 /** Waits for the next event to occur and returns a promise for the next value
140 * @param ct Cancellation token to
141 * @param ct Cancellation token
141 142 */
142 143 next(ct?: ICancellation): Promise<T>;
144
145 /** Collects items of the sequence to the array. */
146 collect(ct?: ICancellation): Promise<T[]>;
143 147 }
144 148
145 149 const noop = () => { };
@@ -292,61 +296,97 const _observe = <T>(producer: Producer<
292 296
293 297 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
294 298
295 next: (ct?: ICancellation) => {
296 const _ct = ct ?? Cancellation.none;
297 return new Promise<T>((resolve, reject) => {
298 // wrap the producer to handle only single event
299 const once = fuse<T>(({ next, complete, error, isClosed }) => {
300 const h = _ct.register(error);
299 next: collect(
300 producer,
301 ({ next, complete, error, isClosed }) => ({
302 next: v => (next(v), complete()),
303 complete: () => error(new Error("The sequence is empty")),
304 error,
305 isClosed
306 })
307 ),
308
309 collect: collect(
310 producer,
311 ({ next, complete, ...rest }) => {
312 const data: T[] = [];
313 return {
314 next: v => data.push(v),
315 complete: () => (next(data), complete()),
316 ...rest
317 };
318 }
319 )
320 });
301 321
302 // is the _ct fires it will call error() and isClosed() will return true
303 const cleanup = !isClosed() ?
304 producer({
305 next: v => (next(v), complete()),
306 complete: () => error(new Error("The sequence is empty")),
307 error,
308 isClosed
309 }) ?? noop :
310 noop;
322 const collect = <T, U>(
323 producer: Producer<T>,
324 collector: (result: Sink<U>) => Sink<T>
325 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
326 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
327 const h = ct.register(error);
328 const cleanup = !isClosed() ?
329 producer(collector({ next, complete, error, isClosed })) ?? noop :
330 noop;
311 331
312 return () => {
313 h.destroy();
314 cleanup();
315 };
316 });
332 return () => {
333 h.destroy();
334 cleanup();
335 };
336 });
317 337
318 once({
319 next: resolve,
320 error: reject,
321 complete: noop,
322 isClosed: () => false
323 });
324 });
325 }
338 fused({
339 next: resolve,
340 error: reject,
341 complete: noop,
342 isClosed: () => false
343 });
326 344 });
327 345
328 346 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
329 347
330 export const streamArray = <T>(items: T[]) => _observe<T>(
348 export const ofArray = <T>(items: T[]) => _observe<T>(
331 349 ({ next, complete }) => (
332 350 items.forEach(next),
333 351 complete()
334 352 )
335 353 );
336 354
337 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
338 ({ next, error, complete }) => void promise.then(
339 v => (next(v), complete()),
340 error
341 )
355 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
356 ({ next, error, complete }) =>
357 isPromise(item) ?
358 void item.then(
359 v => (next(v), complete()),
360 error
361 ) :
362 (next(item), complete())
342 363 );
343 364
344 export const of = <T>(...items: T[]) => _observe<T>(
345 ({ next, complete }) => (
346 items.forEach(next),
347 complete()
348 )
349 );
365 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
366 of1(items[0]) :
367 observe<T>(
368 ({ next, error, complete, isClosed }) => {
369 const n = items.length;
370
371 const _next = (start: number) => {
372 if (start > 0 && isClosed()) // when resumed
373 return;
374
375 for (let i = start; i < n; i++) {
376 const r = items[i];
377 if (isPromise(r)) {
378 r.then(v => (next(v), _next(i + 1)), error);
379 return; // suspend
380 } else {
381 next(r);
382 }
383 }
384 complete();
385 };
386
387 _next(0);
388 }
389 );
350 390
351 391 export const empty = _observe<never>(({ complete }) => complete());
352 392
@@ -474,7 +514,7 export const subject = <T>(producer: Pro
474 514 if (_subscribers === subscribers) {
475 515 const pos = subscribers.indexOf(client);
476 516 if (pos >= 0)
477 subscribers.splice(pos,1);
517 subscribers.splice(pos, 1);
478 518 if (!subscribers.length)
479 519 cleanup();
480 520 }
@@ -35,13 +35,16 export const isDjObservableResults = <T>
35 35 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
36 36 (query?: Q, options?: O & { observe?: boolean }) => {
37 37 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
38
39 const processResults = (items: T[]) =>
40 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
41
38 42 try {
39 43 const results = store.query(query, options);
40 44 if (isPromise(results)) {
41 results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })))
42 .then(undefined, error);
45 results.then(processResults).then(undefined, error);
43 46 } else {
44 results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
47 processResults(results);
45 48 }
46 49
47 50 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
@@ -1,6 +1,7
1 1 // eslint-disable-next-line @typescript-eslint/triple-slash-reference
2 2 /// <reference path="./css-plugin.d.ts"/>
3 3
4 import _WidgetBase = require("dijit/_WidgetBase");
4 5 import { Rendition } from "./tsx/traits";
5 6
6 7 declare global {
@@ -8,7 +9,7 declare global {
8 9
9 10 type Ref<T> = ((value: T | undefined) => void);
10 11
11 type Element = Rendition;
12 type Element = Rendition | _WidgetBase;
12 13
13 14 interface DjxIntrinsicAttributes<E> {
14 15 /** alias for className */
@@ -1,6 +1,7
1 import { empty, observe } from "./observable";
2 import * as t from "tap";
1 import { empty, observe, of } from "./observable";
2 import * as tap from "tap";
3 3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 import { delay } from "@implab/core-amd/safe";
4 5
5 6 const subj1 = observe<number>(({ next, complete }) => {
6 7 next(1);
@@ -16,10 +17,10 const consumer1 = {
16 17 };
17 18
18 19 subj1.subscribe(consumer1);
19 t.equal(consumer1.sum, 1, "Should get only one value");
20 tap.equal(consumer1.sum, 1, "Should get only one value");
20 21
21 22 subj1.subscribe(consumer1);
22 t.equal(consumer1.sum, 2, "Should get the value again");
23 tap.equal(consumer1.sum, 2, "Should get the value again");
23 24
24 25 const consumer2 = {
25 26 value: 0,
@@ -32,31 +33,31 let maps = 0;
32 33
33 34 subj1
34 35 .map(v => {
35 t.comment(`map1: ${v * 2}`);
36 tap.comment(`map1: ${v * 2}`);
36 37 maps++;
37 38 return v * 2;
38 39 })
39 40 .map(v => {
40 t.comment(`map2: ${v * 2}`);
41 tap.comment(`map2: ${v * 2}`);
41 42 maps++;
42 43 return v * 2;
43 44 })
44 45 .map(v => {
45 t.comment(`map3: ${v * 2}`);
46 tap.comment(`map3: ${v * 2}`);
46 47 maps++;
47 48 return v * 2;
48 49 })
49 50 .subscribe(consumer2);
50 51
51 t.equal(consumer2.value, 8, "Should map");
52 t.equal(maps, 3, "The map chain should not be executed after completion");
53 t.ok(consumer2.completed, "The completion signal should pass through");
52 tap.equal(consumer2.value, 8, "Should map");
53 tap.equal(maps, 3, "The map chain should not be executed after completion");
54 tap.ok(consumer2.completed, "The completion signal should pass through");
54 55
55 56 const subj2 = observe<number>(({ next, complete }) => {
56 57 [1, 2, 3, 4, 5].forEach(next);
57 58 complete();
58 59 return () => {
59 t.comment("subj2: unsubscribe");
60 tap.comment("subj2: unsubscribe");
60 61 };
61 62 });
62 63
@@ -67,7 +68,7 const consumer3 = {
67 68 subscribed: 0,
68 69 unsubscribed: 0,
69 70 next(v: "even" | "odd") {
70 this[v] ++;
71 this[v]++;
71 72 },
72 73 complete() {
73 74 this.completed = true;
@@ -96,7 +97,7 const subj3 = subj2.pipe<"even" | "odd">
96 97 complete,
97 98 error
98 99 });
99 return () =>{
100 return () => {
100 101 consumer3.unsubscribe();
101 102 h.unsubscribe();
102 103 };
@@ -104,24 +105,46 const subj3 = subj2.pipe<"even" | "odd">
104 105
105 106 subj3.subscribe(consumer3);
106 107
107 t.equal(consumer3.odd, 2, "Should get 2 odd elements");
108 t.equal(consumer3.even, 2, "Should get 2 even elements");
109 t.ok(consumer3.completed, "The sequence should completed");
110 t.equal(consumer3.subscribed, 1, "The subscription should be done once");
111 t.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
110 tap.ok(consumer3.completed, "The sequence should completed");
111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
112 113
113 subj2.reduce((a,b) => a + b).subscribe({
114 next: val => t.comment("subj2: reduce =", val),
115 complete: () => t.comment("subj2: complete")
114 subj2.reduce((a, b) => a + b).subscribe({
115 next: val => tap.comment("subj2: reduce =", val),
116 complete: () => tap.comment("subj2: complete")
116 117 });
117 118
118 subj2.reduce((a,b) => a + b).next()
119 .then(value => t.comment("subj2: next reduce=", value))
120 .catch(() => {});
119 tap.test("of(...) tests", async t => {
120 await subj2.reduce((a, b) => a + b).next()
121 .then(value => t.comment("subj2: next reduce=", value));
122
123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124
125 const cancelled = new Cancellation(cancel => cancel());
126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127
128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129
130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 .reduce<number[]>((a, x) => [...a, x], [])
132 .next()
133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
121 134
122 subj2.next().then(val => t.equal(val, 1, "Should peek the first element")).catch(() => {});
135 const rejected = Promise.reject("DIE!");
136 rejected.catch(() => { }); // SAFE AND SOUND
123 137
124 const cancelled = new Cancellation(cancel => cancel());
125 t.rejects(subj2.next(cancelled), "Cancelled next() method should fail").catch(() => {});
138 await t.resolves(
139 of(Promise.resolve(1), rejected).next(),
140 "of(...) should emit non-rejected items"
141 );
142 await t.rejects(
143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 "of(...) should terminate with error when a parameter is rejected"
145 );
126 146
127 t.rejects(empty.next(), "Empty sequence should fail to get next element").catch(() => {}); No newline at end of file
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149
150 }).catch(() => { }); No newline at end of file
@@ -1,4 +1,4
1 1 import "./declare-tests";
2 2 import "./observable-tests";
3 3 import "./state-tests";
4 import "./subject-tests"; No newline at end of file
4 import "./subject-tests";
General Comments 0
You need to be logged in to leave comments. Login now