##// END OF EJS Templates
added observable.collect() method to collect a sequnce to the array...
cin -
r129:66546e709732 v1.8.0 default
parent child
Show More
@@ -1,306 +1,337
1 1 # @implab/djx
2 2
3 3 ## SYNOPSIS
4 4
5 ```tsx
5 ```jsx
6 6 import { djbase, djclass, bind, prototype, AbstractConstructor } from "@implab/djx/declare";
7 7
8 8 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
9 9 import { createElement } from "@implab/djx/tsx";
10 10
11 11 interface MyWidgetAttrs {
12 12 title: string;
13 13
14 14 counter: number;
15 15 }
16 16
17 17 interface MyWidgetEvents {
18 18 "count-inc": Event;
19 19
20 20 "count-dec": Event;
21 21 }
22 22
23 23
24 24 @djclass
25 25 export class MyWidget extends djbase(
26 26 DjxWidgetBase as AbstractConstructor<DjxWidgetBase<MyWidgetAttrs, MyWidgetEvents>>
27 27 ) {
28 28
29 29 @bind({ node: "titleNode", type: "innerHTML" })
30 30 title = "";
31 31
32 32 @prototype()
33 33 counter = 0;
34 34
35 35 render() {
36 36 const Frame = (props: any) => <div>{props.children}</div>;
37 37 return <div
38 38 className="myWidget"
39 39 tabIndex={3}
40 40 style={ alignContent: "center", border: "1px solid" }
41 41 >
42 42 <h1 data-dojo-attach-point="titleNode"></h1>
43 43 <Frame>
44 44 <span class="up-button" onclick={e => this._onIncClick(e)}>[+]</span>
45 45 <span class="down-button" onclick={() => this._onDecClick()}>[-]</span>
46 46 </Frame>
47 47 </div>;
48 48 }
49 49
50 50 _onIncClick(e: MouseEvent) {
51 51 this.emit("count-inc", { bubbles: false });
52 52 }
53 53
54 54 _onDecClick() {
55 55 this.emit("count-dec", { bubbles: false });
56 56 }
57 57 }
58 58
59 59 ```
60 60
61 61 ## DESCRIPTION
62 62
63 63 This package provides you with the tools to glue your good-fellow dojo with modern
64 64 techniques of building the webapp. The core concept is to built around widgets and
65 65 using .tsx to write it. Here are some features:
66 66
67 67 * `djbase()`, `@djaclass` - traits to declare your classes with `dojo/_base/declare`
68 68 * `@implab/djx/tsx` - traits to build the rendering of your widgets with tsx
69 69 * `DjxWidgetBase` - abstract class which supports tsx markup and
70 70 `data-dojo-attach-*` attributes.
71 71 * `@bind(...)` - annotations provide an easy way of using standard dojo widget
72 72 attribute bindings.
73 73
74 74 ### djbase, @djclass
75 75
76 76 These two traits provide convenient way of using `dojo/_base/declare` in Typescript
77 77 for declaring your classes.
78 78
79 79 `djbase(...constructors)` - this method accepts a list of constructors in its
80 80 parameters and returns the **fake** base type which then can be used to derive
81 81 your own class. This allows you to provide the Typescript with the correct
82 82 information about the base type and even use `super`!. The only caveat of
83 83 this approach is that you **MUST** decorate your class with `@djclass` annotation.
84 84
85 85 Consider the following example:
86 86
87 87 ```ts
88 88 import { djbase, djclass } from "@implab/djx/declare";
89 89 import { FooMixin } from "./FooMixin";
90 90 import { BarMixin } from "./BarMixin";
91 91 import { BoxMixin } from "./BoxMixin";
92 92
93 93 @djclass
94 94 export class Baz extends djbase(FooMixin, BarMixin, BoxMixin) {
95 95 writeHello(out: string[]) {
96 96 out.push("-> Baz");
97 97
98 98 super.writeHello(out);
99 99
100 100 out.push("<- Baz");
101 101 }
102 102 }
103 103
104 104 ```
105 105
106 106 All mixins are declared like the one below:
107 107
108 108 ```ts
109 109 import { djclass, djbase } from "@implab/djx/declare";
110 110
111 111 interface Super {
112 112 writeHello(out: string[]): void;
113 113
114 114 }
115 115
116 116 @djclass
117 117 export class BarMixin extends djbase<Super>() {
118 118 writeHello(out: string[]) {
119 119 out.push("-> Bar");
120 120
121 121 super.writeHello(out);
122 122
123 123 out.push("<- Bar");
124 124 }
125 125 }
126 126 ```
127 127
128 128 finally create an instance and call the `writeHello` method
129 129
130 130 ```ts
131 131 const baz = new Baz();
132 132
133 133 const data: string[] = [];
134 134 baz.writeHello(data);
135 135
136 136 console.log(data.join("\n"));
137 137
138 138 ```
139 139
140 140 you will get the following output:
141 141
142 142 ```text
143 143 -> Baz
144 144 -> Box
145 145 -> Bar
146 146 -> Foo
147 147 <- Foo
148 148 <- Bar
149 149 <- Box
150 150 <- Baz
151 151 ```
152 152
153 153 Let's take a closer look at the `Baz` declaration it uses `djbase` to derive
154 154 from three mixins and the class is decorated with `@djclass` to accomplish the
155 155 declaration and make a real constructor.
156 156
157 157 To allow access to the next sibling method (in terms of multiple inheritance)
158 158 Dojo provides `this.inherited(arguments)` method but this approach leads to the
159 159 problem with 'strict' mode of ES5 and eliminates the type information about a
160 160 calling method. This library solves the problem calling inherited/next method by
161 161 utilizing `super` keyword. Under the hood there are proxy methods generated in
162 162 the prototype of the declared class which make calls to `this.inherited(...)`
163 163 method. This technique is compatible with 'strict' mode.
164 164
165 165 Mixins are declared similar, they also may have the base types although
166 166 the most common case is declaring the mixin without any base classes. To allow
167 167 the mixin to access the next method declare the interface with
168 168 desired methods and use the special form of `djbase<Super>()` without arguments.
169 169
170 170 ### DjxWidgetBase<Attrs, Events>
171 171
172 172 This is the base class for the djx widgets. It declares the abstract method
173 173 `render()` which is used to render the content of the widget, like `_TemplatedMixin`.
174 174
175 175 This class extends `dijit/_WidgetBase` and contains logic from `_AttachMixin` thus
176 176 it is capable to handle `data-dojo-attach-*` attributes from the rendered markup.
177 177
178 ```tsx
178 ```jsx
179 179 @djclass
180 180 export class MyFirstWidget extends djbase(DjxWidgetBase) {
181 181 render() {
182 182 return <h1>My first widget</h1>;
183 183 }
184 184 }
185 185 ```
186 186
187 187 ### Markup (.tsx)
188 188
189 189 Add to your `tsconfig.json` the following options
190 190
191 191 ```json
192 192 {
193 193 "compilerOptions": {
194 194 "types": [
195 195 "@implab/djx",
196 196 "@implab/dojo-typings"
197 197 ],
198 198 "skipLibCheck": true,
199 199 "experimentalDecorators": true,
200 200 "jsxFactory": "createElement",
201 201 "jsx": "react",
202 "target": "ES5", // minimal supported version
202 "target": "ES5",
203 203 "lib": ["ES2015", "DOM"]
204 204 }
205 205 }
206 206
207 207 ```
208 208
209 209 Import `createElement` into your `.tsx` file
210 210
211 211 ```ts
212 212 import { createElement } from "@implab/djx/tsx";
213 213 ```
214 214
215 215 You are ready to go!
216 216
217 217 ### Adding reactive behavior: refs, watch(...) and watchFor(...)
218 218
219 219 This library adds some reactive traits to update the generated DOM of the widget.
220 220 Dojo 1.x adds some standard options to deal with dynamic changes:
221 221
222 222 * `data-dojo-attach-point` allows to get reference to an element (or a nested widget)
223 223 * widget attribute mappings, allows to bind widget's property to a property of
224 224 the element, referenced by `data-dojo-attach-point`.
225 225
226 226 The typical implementation of this technique would look like
227 227
228 ```tsx
228 ```jsx
229 229 import { createElement } from "@implab/djx/tsx";
230 230 import {djclass, djbase, bind} from "@implab/djx/declare";
231 231
232 232 @djclass
233 233 export class MyFirstWidget extends djbase(DjxWidgetBase) {
234 234
235 235 // @bind will generate special attribute mapping
236 236 // _setCaptionAttr = { node: "captionNode", type: "innerHTML" }
237 237 @bind({ node: "captionNode", type: "innerHTML" })
238 238 caption = "My first widget";
239 239
240 240 render() {
241 241 return <h1 data-dojo-attach-point="captionNode"/>;
242 242 }
243 243 }
244 244 ```
245 245
246 246 Despite this is a natural way for the dojo it has some disadvantages:
247 247
248 248 1. The compiler doesn't check existence of the attach-point.
249 249 2. Attribute mappings support only simple mappings, it's difficult to update the
250 250 complex rendition.
251 251
252 252 This library helps you to get both goals with special trait `watch(...)`
253 253
254 ```tsx
254 ```jsx
255 255 import { createElement } from "@implab/djx/tsx";
256 256 import { djclass, djbase} from "@implab/djx/declare"
257 257
258 258 @djclass
259 259 export class MyFirstWidget extends djbase(DjxWidgetBase) {
260 260
261 261 caption = "My first widget";
262 262
263 263 render() {
264 264 return <h1>{watch(this,"caption", value => value)}</h1>;
265 265 }
266 266 }
267 267 ```
268 268
269 269 In this example we replaced attach-point with simple call to `watch` function
270 270 which renders string value to text representation (text node). It will create a
271 271 rendition which will observe the `caption` property of the widget and update its
272 272 contents according to the value changes of the property.
273 273
274 274 The key feature of this approach that the rendering function within `watch` may
275 275 return a complex rendition.
276 276
277 ```tsx
277 ```jsx
278 278 // inside some widget
279 279 render() {
280 280 return <section>
281 281 {watch(this,"user", value => value && [
282 282 <UserInfo user={value}/>,
283 283 <LogoutButton click={this._logoutClick}/>
284 284 ])}
285 285 </section>;
286 286 }
287 287
288 288 private readonly _logoutClick = () => { /* do logout */ }
289 289
290 290 ```
291 291
292 292 The `watch` function has two forms:
293 293
294 294 * `watch(stateful, prop, render)` - observes the specified property of the
295 295 `dojo/Stateful` object (or widget)
296 296 * `watch(observable, render)` - observes the specified observable. It supports
297 297 `rxjs` or `@implab/djx/observable` observables.
298 298
299 299 The `render` callback may return almost anything which will be converted to DOM:
300 300
301 301 * `boolean`, `null`, `undefined` - ignored,
302 302 * `string` - converted to text node,
303 303 * `array` - converted to DocumentFragment of its elements,
304 304 * DOM Nodes and widgets are left intact,
305 305 * any other kind of value will cause an error.
306 306
307 The watch method allows to observe a single value, for the large sets of data
308 this isn't suitable well and may lead to performance issues. Dojo provides
309 observable stores to being able to track individual changes. The library provides
310 `watchFor(observable, render)` method to render observable query results and
311 handle changes on per item basis.
312
313 ```jsx
314 // inside some widget
315 staff = new Observable(new Memory<Employee>()),
316
317 getStuff() {
318 return this.staff.query();
319 }
320
321 addEmployee(employee: Employee) {
322 this.staff.add(employee); // the rendition will update automatically
323 }
324
325 render() {
326 return <table>
327 <thead>
328 <tr><th>Name</th><th>Position</th><th>Salary</th></tr>
329 </thead>
330 <tbody>
331 {watchFor(this.getStaff(), ({name, position, salary}) =>
332 <tr><td>{name}</td><td>{position}</td><td>{salary}</td></tr>
333 )}
334 </tbody>
335 </table>
336 }
337 ```
@@ -1,483 +1,523
1 1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 2 import { ICancellation } from "@implab/core-amd/interfaces";
3 import { isPromise } from "@implab/core-amd/safe";
3 4
4 5 /**
5 6 * The interface for the consumer of an observable sequence
6 7 */
7 8 export interface Observer<T> {
8 9 /**
9 10 * Called for the next element in the sequence
10 11 */
11 12 next: (value: T) => void;
12 13
13 14 /**
14 15 * Called once when the error occurs in the sequence.
15 16 */
16 17 error: (e: unknown) => void;
17 18
18 19 /**
19 20 * Called once at the end of the sequence.
20 21 */
21 22 complete: () => void;
22 23 }
23 24
24 25 /**
25 26 * The group of functions to feed an observable. These methods are provided to
26 27 * the producer to generate a stream of events.
27 28 */
28 29 export type Sink<T> = {
29 30 /**
30 31 * Call to send the next element in the sequence
31 32 */
32 33 next: (value: T) => void;
33 34
34 35 /**
35 36 * Call to notify about the error occurred in the sequence.
36 37 */
37 38 error: (e: unknown) => void;
38 39
39 40 /**
40 41 * Call to signal the end of the sequence.
41 42 */
42 43 complete: () => void;
43 44
44 45 /**
45 46 * Checks whether the sink is accepting new elements. It's safe to
46 47 * send elements to the closed sink.
47 48 */
48 49 isClosed: () => boolean;
49 50 };
50 51
51 52 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
52 53
53 54 export interface Unsubscribable {
54 55 unsubscribe(): void;
55 56 }
56 57
57 58 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
58 59 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
59 60
60 61 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
61 62 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
62 63
63 64 export interface Subscribable<T> {
64 65 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 66 }
66 67
67 68 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
68 69
69 70 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
70 71
71 72 /** The observable source of items. */
72 73 export interface Observable<T> extends Subscribable<T> {
73 74 /** Transforms elements of the sequence with the specified mapper
74 75 *
75 76 * @param mapper The mapper used to transform the values
76 77 */
77 78 map<T2>(mapper: (value: T) => T2): Observable<T2>;
78 79
79 80 /** Filters elements of the sequence. The resulting sequence will
80 81 * contain only elements which match the specified predicate.
81 82 *
82 83 * @param predicate The filter predicate.
83 84 */
84 85 filter(predicate: (value: T) => boolean): Observable<T>;
85 86
86 87 /** Completes the sequence once the condition is met.
87 88 * @param predicate The condition which should be met to complete the sequence
88 89 */
89 90 until(predicate: (value: T) => boolean): Observable<T>;
90 91
91 92 /** Keeps the sequence running while elements satisfy the condition.
92 93 *
93 94 * @param predicate The condition which should be met to continue.
94 95 */
95 96 while(predicate: (value: T) => boolean): Observable<T>;
96 97
97 98 /** Applies accumulator to each value in the sequence and
98 99 * emits the accumulated value for each source element
99 100 *
100 101 * @param accumulator
101 102 * @param initial
102 103 */
103 104 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
104 105 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
105 106
106 107 /** Applies accumulator to each value in the sequence and
107 108 * emits the accumulated value at the end of the sequence
108 109 *
109 110 * @param accumulator
110 111 * @param initial
111 112 */
112 113 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
113 114 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
114 115
115 116 /** Concatenates the specified sequences with this observable
116 117 *
117 118 * @param seq sequences to concatenate with the current observable
118 119 *
119 120 * The concatenation doesn't accumulate values from the specified sequences,
120 121 * The result of the concatenation is the new observable which will switch
121 122 * to the next observable after the previous one completes. Values emitted
122 123 * before the next observable being active are lost.
123 124 */
124 125 cat(...seq: Subscribable<T>[]): Observable<T>;
125 126
126 127
127 128 /** Pipes the specified operator to produce the new observable
128 129 * @param op The operator consumes this observable and produces a new one
129 130 *
130 131 * The operator is a higher order function which takes a source observable
131 132 * and returns a producer for the new observable.
132 133 *
133 134 * This function can be used to create a complex mapping between source and
134 135 * resulting observables. The operator may have a state (or a side effect)
135 136 * and can be connected to multiple observables.
136 137 */
137 138 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
138 139
139 140 /** Waits for the next event to occur and returns a promise for the next value
140 * @param ct Cancellation token to
141 * @param ct Cancellation token
141 142 */
142 143 next(ct?: ICancellation): Promise<T>;
144
145 /** Collects items of the sequence to the array. */
146 collect(ct?: ICancellation): Promise<T[]>;
143 147 }
144 148
145 149 const noop = () => { };
146 150
147 151 const sink = <T>(consumer: Partial<Observer<T>>) => {
148 152 const { next, error, complete } = consumer;
149 153 return {
150 154 next: next ? next.bind(consumer) : noop,
151 155 error: error ? error.bind(consumer) : noop,
152 156 complete: complete ? complete.bind(consumer) : noop,
153 157 isClosed: () => false
154 158 };
155 159 };
156 160
157 161 /** Wraps the producer to handle tear down logic and subscription management
158 162 *
159 163 * @param producer The producer to wrap
160 164 * @returns The wrapper producer
161 165 */
162 166 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
163 167 let done = false;
164 168 let cleanup = noop;
165 169
166 170 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
167 171 (...args: A) => done ?
168 172 void (0) :
169 173 (done = true, cleanup(), fn(...args));
170 174
171 175 const _fin0 = () => done ? void (0) : (done = true, cleanup());
172 176
173 177 const safeSink = {
174 178 next: (value: T) => { !done && next(value); },
175 179 error: _fin(error),
176 180 complete: _fin(complete),
177 181 isClosed: () => done
178 182 };
179 183 cleanup = producer(safeSink) ?? noop;
180 184 return done ? cleanup() : _fin0;
181 185 };
182 186
183 187 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
184 188 subscribe: (consumer: Partial<Observer<T>>) => ({
185 189 unsubscribe: producer(sink(consumer)) ?? noop
186 190 }),
187 191
188 192 map: (mapper) => _observe(({ next, ...rest }) =>
189 193 producer({
190 194 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
191 195 ...rest
192 196 })
193 197 ),
194 198
195 199 filter: (predicate) => _observe(({ next, ...rest }) =>
196 200 producer({
197 201 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
198 202 ...rest
199 203 })
200 204 ),
201 205
202 206 until: predicate => _observe(({ next, complete, ...rest }) =>
203 207 producer({
204 208 next: v => predicate(v) ? complete() : next(v),
205 209 complete,
206 210 ...rest
207 211 })
208 212 ),
209 213
210 214 while: predicate => _observe(({ next, complete, ...rest }) =>
211 215 producer({
212 216 next: v => predicate(v) ? next(v) : complete(),
213 217 complete,
214 218 ...rest
215 219 })
216 220 ),
217 221
218 222 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
219 223 if (args.length === 1) {
220 224 const [accumulator] = args;
221 225 let _acc: T;
222 226 let index = 0;
223 227 return producer({
224 228 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
225 229 ...rest
226 230 });
227 231 } else {
228 232 const [accumulator, initial] = args;
229 233 let _acc = initial;
230 234 return producer({
231 235 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
232 236 ...rest
233 237 });
234 238 }
235 239 }),
236 240
237 241 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
238 242 if (args.length === 1) {
239 243 const [accumulator] = args;
240 244 let _acc: T;
241 245 let index = 0;
242 246 return producer({
243 247 next: next !== noop ? (v: T) => {
244 248 _acc = index++ === 0 ? v : accumulator(_acc, v);
245 249 } : noop,
246 250 complete: () => {
247 251 if (index === 0) {
248 252 error(new Error("The sequence can't be empty"));
249 253 } else {
250 254 next(_acc);
251 255 complete();
252 256 }
253 257 },
254 258 error,
255 259 ...rest
256 260 });
257 261 } else {
258 262 const [accumulator, initial] = args;
259 263 let _acc = initial;
260 264 return producer({
261 265 next: next !== noop ? (v: T) => {
262 266 _acc = accumulator(_acc, v);
263 267 } : noop,
264 268 complete: () => {
265 269 next(_acc);
266 270 complete();
267 271 },
268 272 error,
269 273 ...rest
270 274 });
271 275 }
272 276 }),
273 277
274 278 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
275 279 let cleanup: () => void;
276 280 const complete = () => {
277 281 const continuation = seq.shift();
278 282 if (continuation) {
279 283 // if we have a next sequence, subscribe to it
280 284 const subscription = continuation.subscribe({ next, complete, ...rest });
281 285 cleanup = subscription.unsubscribe.bind(subscription);
282 286 } else {
283 287 // otherwise notify the consumer about completion
284 288 final();
285 289 }
286 290 };
287 291
288 292 cleanup = producer({ next, complete, ...rest }) ?? noop;
289 293
290 294 return () => cleanup();
291 295 }),
292 296
293 297 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
294 298
295 next: (ct?: ICancellation) => {
296 const _ct = ct ?? Cancellation.none;
297 return new Promise<T>((resolve, reject) => {
298 // wrap the producer to handle only single event
299 const once = fuse<T>(({ next, complete, error, isClosed }) => {
300 const h = _ct.register(error);
299 next: collect(
300 producer,
301 ({ next, complete, error, isClosed }) => ({
302 next: v => (next(v), complete()),
303 complete: () => error(new Error("The sequence is empty")),
304 error,
305 isClosed
306 })
307 ),
308
309 collect: collect(
310 producer,
311 ({ next, complete, ...rest }) => {
312 const data: T[] = [];
313 return {
314 next: v => data.push(v),
315 complete: () => (next(data), complete()),
316 ...rest
317 };
318 }
319 )
320 });
301 321
302 // is the _ct fires it will call error() and isClosed() will return true
303 const cleanup = !isClosed() ?
304 producer({
305 next: v => (next(v), complete()),
306 complete: () => error(new Error("The sequence is empty")),
307 error,
308 isClosed
309 }) ?? noop :
310 noop;
322 const collect = <T, U>(
323 producer: Producer<T>,
324 collector: (result: Sink<U>) => Sink<T>
325 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
326 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
327 const h = ct.register(error);
328 const cleanup = !isClosed() ?
329 producer(collector({ next, complete, error, isClosed })) ?? noop :
330 noop;
311 331
312 return () => {
313 h.destroy();
314 cleanup();
315 };
316 });
332 return () => {
333 h.destroy();
334 cleanup();
335 };
336 });
317 337
318 once({
319 next: resolve,
320 error: reject,
321 complete: noop,
322 isClosed: () => false
323 });
324 });
325 }
338 fused({
339 next: resolve,
340 error: reject,
341 complete: noop,
342 isClosed: () => false
343 });
326 344 });
327 345
328 346 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
329 347
330 export const streamArray = <T>(items: T[]) => _observe<T>(
348 export const ofArray = <T>(items: T[]) => _observe<T>(
331 349 ({ next, complete }) => (
332 350 items.forEach(next),
333 351 complete()
334 352 )
335 353 );
336 354
337 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
338 ({ next, error, complete }) => void promise.then(
339 v => (next(v), complete()),
340 error
341 )
355 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
356 ({ next, error, complete }) =>
357 isPromise(item) ?
358 void item.then(
359 v => (next(v), complete()),
360 error
361 ) :
362 (next(item), complete())
342 363 );
343 364
344 export const of = <T>(...items: T[]) => _observe<T>(
345 ({ next, complete }) => (
346 items.forEach(next),
347 complete()
348 )
349 );
365 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
366 of1(items[0]) :
367 observe<T>(
368 ({ next, error, complete, isClosed }) => {
369 const n = items.length;
370
371 const _next = (start: number) => {
372 if (start > 0 && isClosed()) // when resumed
373 return;
374
375 for (let i = start; i < n; i++) {
376 const r = items[i];
377 if (isPromise(r)) {
378 r.then(v => (next(v), _next(i + 1)), error);
379 return; // suspend
380 } else {
381 next(r);
382 }
383 }
384 complete();
385 };
386
387 _next(0);
388 }
389 );
350 390
351 391 export const empty = _observe<never>(({ complete }) => complete());
352 392
353 393 /**
354 394 * Creates a mutable state and the observable for the stored value.
355 395 *
356 396 * @param value The initial value for the state
357 397 * @returns an array of three elements `[observable, setter, getter]`
358 398 *
359 399 * The returned observable keeps the actual value and will emit it as the next
360 400 * element each time a consumer subscribes the observable.
361 401 *
362 402 * Calling the setter will update the stored value in the observable and notify
363 403 * all consumers.
364 404 */
365 405 export const stateful = <T>(producer: Producer<T>): Producer<T> => {
366 406 const fusedProducer = fuse(producer);
367 407 type Status = "active" | "complete" | "error";
368 408
369 409 let lastValue: T;
370 410 let hasValue = false;
371 411 let status: Status = "active";
372 412 let lastError: unknown;
373 413 let subscribers: Sink<T>[] = [];
374 414
375 415 const sink: Sink<T> = {
376 416 isClosed: () => status !== "active",
377 417 complete: () => {
378 418 if (status === "active") {
379 419 status = "complete";
380 420 const _subscribers = subscribers;
381 421 subscribers = [];
382 422 _subscribers.forEach(s => s.complete());
383 423 }
384 424 },
385 425 error: e => {
386 426 if (status === "active") {
387 427 status = "error";
388 428 lastError = e;
389 429 const _subscribers = subscribers;
390 430 subscribers = [];
391 431 _subscribers.forEach(s => s.error(e));
392 432 }
393 433 },
394 434 next: v => {
395 435 if (status === "active") {
396 436 hasValue = true;
397 437 lastValue = v;
398 438 const _subscribers = subscribers;
399 439 _subscribers.forEach(s => s.next(v));
400 440 }
401 441 }
402 442 };
403 443
404 444 fusedProducer(sink);
405 445
406 446 return (s: Sink<T>) => {
407 447 const _subscribers = subscribers;
408 448 switch (status) {
409 449 case "active":
410 450 if (hasValue)
411 451 s.next(lastValue); // if hasValue is true,
412 452 // lastValue has a valid value
413 453 subscribers.push(s);
414 454 return () => {
415 455 if (_subscribers === subscribers) {
416 456 const pos = subscribers.indexOf(s);
417 457 if (pos >= 0)
418 458 subscribers.splice(pos, 1);
419 459 }
420 460 };
421 461 case "complete":
422 462 s.complete();
423 463 break;
424 464 case "error":
425 465 s.error(lastError);
426 466 break;
427 467 }
428 468 };
429 469 };
430 470
431 471 /** Create the producer which will be called once when the first subscriber is
432 472 * attached, next subscribers would share the same producer. When all
433 473 * subscribers are removed the producer will be cleaned up.
434 474 *
435 475 * Use this wrapper to prevent spawning multiple producers.
436 476 *
437 477 * @param producer The source producer
438 478 * @returns The wrapped producer
439 479 */
440 480 export const subject = <T>(producer: Producer<T>): Producer<T> => {
441 481 const fusedProducer = fuse(producer);
442 482
443 483 let subscribers: Sink<T>[] = [];
444 484
445 485 let cleanup = noop;
446 486
447 487 const sink: Sink<T> = {
448 488 isClosed: () => false,
449 489 complete: () => {
450 490 const _subscribers = subscribers;
451 491 subscribers = [];
452 492 _subscribers.forEach(s => s.complete());
453 493 cleanup();
454 494 },
455 495 error: e => {
456 496 const _subscribers = subscribers;
457 497 subscribers = [];
458 498 _subscribers.forEach(s => s.error(e));
459 499 cleanup();
460 500 },
461 501 next: v => {
462 502 const _subscribers = subscribers;
463 503 _subscribers.forEach(s => s.next(v));
464 504 }
465 505 };
466 506
467 507 return client => {
468 508 const _subscribers = subscribers;
469 509 subscribers.push(client);
470 510 if (subscribers.length === 1)
471 511 cleanup = fusedProducer(sink) ?? noop;
472 512
473 513 return () => {
474 514 if (_subscribers === subscribers) {
475 515 const pos = subscribers.indexOf(client);
476 516 if (pos >= 0)
477 subscribers.splice(pos,1);
517 subscribers.splice(pos, 1);
478 518 if (!subscribers.length)
479 519 cleanup();
480 520 }
481 521 };
482 522 };
483 523 }; No newline at end of file
@@ -1,58 +1,61
1 1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 2 import { isPromise } from "@implab/core-amd/safe";
3 3 import { observe, Observable } from "./observable";
4 4
5 5 export interface OrderedUpdate<T> {
6 6 /** The item is being updated */
7 7 readonly item: T;
8 8
9 9 /** The previous index of the item, -1 in case it is inserted */
10 10 readonly prevIndex: number;
11 11
12 12 /** The new index of the item, -1 in case it is deleted */
13 13 readonly newIndex: number;
14 14
15 15 }
16 16
17 17 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
18 18
19 19 interface DjObservableResults<T> {
20 20 /**
21 21 * Allows observation of results
22 22 */
23 23 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
24 24 remove(): void;
25 25 };
26 26 }
27 27
28 28 interface Queryable<T, Q, O> {
29 29 query(query?: Q, options?: O): PromiseOrValue<T[]>;
30 30 }
31 31
32 32 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
33 33 v && (typeof (v as { observe?: unknown; }).observe === "function");
34 34
35 35 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
36 36 (query?: Q, options?: O & { observe?: boolean }) => {
37 37 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
38
39 const processResults = (items: T[]) =>
40 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
41
38 42 try {
39 43 const results = store.query(query, options);
40 44 if (isPromise(results)) {
41 results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })))
42 .then(undefined, error);
45 results.then(processResults).then(undefined, error);
43 46 } else {
44 results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
47 processResults(results);
45 48 }
46 49
47 50 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
48 51 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
49 52 return () => h.remove();
50 53 } else {
51 54 complete();
52 55 }
53 56 } catch (err) {
54 57 error(err);
55 58 }
56 59 });
57 60
58 61 };
@@ -1,89 +1,90
1 1 // eslint-disable-next-line @typescript-eslint/triple-slash-reference
2 2 /// <reference path="./css-plugin.d.ts"/>
3 3
4 import _WidgetBase = require("dijit/_WidgetBase");
4 5 import { Rendition } from "./tsx/traits";
5 6
6 7 declare global {
7 8 namespace JSX {
8 9
9 10 type Ref<T> = ((value: T | undefined) => void);
10 11
11 type Element = Rendition;
12 type Element = Rendition | _WidgetBase;
12 13
13 14 interface DjxIntrinsicAttributes<E> {
14 15 /** alias for className */
15 16 class?: string;
16 17
17 18 /** specifies the name of the property in the widget where the the
18 19 * reference to the current object will be stored
19 20 */
20 21 "data-dojo-attach-point"?: string;
21 22
22 23 /** specifies handlers map for the events */
23 24 "data-dojo-attach-event"?: string;
24 25
25 26 ref?: Ref<E>;
26 27
27 28 /** @deprecated */
28 29 [attr: string]: unknown;
29 30 }
30 31
31 32 // eslint-disable-next-line @typescript-eslint/no-empty-interface
32 33 interface DjxIntrinsicElements {
33 34 }
34 35
35 36 type RecursivePartial<T> = T extends string | number | boolean | null | undefined | ((...args: unknown[]) => unknown) ?
36 37 T :
37 38 { [k in keyof T]?: RecursivePartial<T[k]> };
38 39
39 40 type MatchingMemberKeys<T, U> = {
40 41 [K in keyof T]: T[K] extends U ? K : never;
41 42 }[keyof T];
42 43 type NotMatchingMemberKeys<T, U> = {
43 44 [K in keyof T]: T[K] extends U ? never : K;
44 45 }[keyof T];
45 46
46 47 type ExtractMembers<T, U> = Pick<T, MatchingMemberKeys<T, U>>;
47 48
48 49 type ExcludeMembers<T, U> = Pick<T, NotMatchingMemberKeys<T, U>>;
49 50
50 51 type ElementAttrNames<E> = NotMatchingMemberKeys<E, (...args: unknown[]) => unknown>;
51 52
52 53 type ElementAttrType<E, K extends string | symbol> = K extends keyof E ? RecursivePartial<E[K]> : string;
53 54
54 55
55 56 type ElementAttrNamesBlacklist = "children" | "getRootNode" | keyof EventTarget;
56 57
57 58 /** This type extracts keys of the specified parameter E by the following rule:
58 59 * 1. skips all ElementAttrNamesBlacklist
59 60 * 2. skips all methods except with the signature of event handlers
60 61 */
61 62 type AssignableElementAttrNames<E> = {
62 63 [K in keyof E]: K extends ElementAttrNamesBlacklist ? never :
63 64 ((evt: Event) => unknown) extends E[K] ? K :
64 65 E[K] extends ((...args: unknown[]) => unknown) ? never :
65 66 K;
66 67 }[keyof E];
67 68
68 69 type LaxElement<E extends object> =
69 70 RecursivePartial<Pick<E, AssignableElementAttrNames<E>>> &
70 71 DjxIntrinsicAttributes<E>;
71 72
72 73 type LaxIntrinsicElementsMap = {
73 74 [tag in keyof HTMLElementTagNameMap]: LaxElement<HTMLElementTagNameMap[tag]>
74 75 } & DjxIntrinsicElements;
75 76
76 77 type IntrinsicElements = {
77 78 [tag in keyof LaxIntrinsicElementsMap]: LaxIntrinsicElementsMap[tag];
78 79 };
79 80
80 81 interface ElementChildrenAttribute {
81 82 children: unknown;
82 83 }
83 84
84 85 interface IntrinsicClassAttributes<T> {
85 86 ref?: Ref<T>;
86 87 children?: unknown;
87 88 }
88 89 }
89 90 } No newline at end of file
@@ -1,127 +1,150
1 import { empty, observe } from "./observable";
2 import * as t from "tap";
1 import { empty, observe, of } from "./observable";
2 import * as tap from "tap";
3 3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 import { delay } from "@implab/core-amd/safe";
4 5
5 6 const subj1 = observe<number>(({ next, complete }) => {
6 7 next(1);
7 8 complete();
8 9 next(2);
9 10 });
10 11
11 12 const consumer1 = {
12 13 sum: 0,
13 14 next(v: number) {
14 15 this.sum += v;
15 16 }
16 17 };
17 18
18 19 subj1.subscribe(consumer1);
19 t.equal(consumer1.sum, 1, "Should get only one value");
20 tap.equal(consumer1.sum, 1, "Should get only one value");
20 21
21 22 subj1.subscribe(consumer1);
22 t.equal(consumer1.sum, 2, "Should get the value again");
23 tap.equal(consumer1.sum, 2, "Should get the value again");
23 24
24 25 const consumer2 = {
25 26 value: 0,
26 27 completed: false,
27 28 next(v: number) { this.value = v; },
28 29 complete() { this.completed = true; }
29 30 };
30 31
31 32 let maps = 0;
32 33
33 34 subj1
34 35 .map(v => {
35 t.comment(`map1: ${v * 2}`);
36 tap.comment(`map1: ${v * 2}`);
36 37 maps++;
37 38 return v * 2;
38 39 })
39 40 .map(v => {
40 t.comment(`map2: ${v * 2}`);
41 tap.comment(`map2: ${v * 2}`);
41 42 maps++;
42 43 return v * 2;
43 44 })
44 45 .map(v => {
45 t.comment(`map3: ${v * 2}`);
46 tap.comment(`map3: ${v * 2}`);
46 47 maps++;
47 48 return v * 2;
48 49 })
49 50 .subscribe(consumer2);
50 51
51 t.equal(consumer2.value, 8, "Should map");
52 t.equal(maps, 3, "The map chain should not be executed after completion");
53 t.ok(consumer2.completed, "The completion signal should pass through");
52 tap.equal(consumer2.value, 8, "Should map");
53 tap.equal(maps, 3, "The map chain should not be executed after completion");
54 tap.ok(consumer2.completed, "The completion signal should pass through");
54 55
55 56 const subj2 = observe<number>(({ next, complete }) => {
56 57 [1, 2, 3, 4, 5].forEach(next);
57 58 complete();
58 59 return () => {
59 t.comment("subj2: unsubscribe");
60 tap.comment("subj2: unsubscribe");
60 61 };
61 62 });
62 63
63 64 const consumer3 = {
64 65 even: 0,
65 66 odd: 0,
66 67 completed: false,
67 68 subscribed: 0,
68 69 unsubscribed: 0,
69 70 next(v: "even" | "odd") {
70 this[v] ++;
71 this[v]++;
71 72 },
72 73 complete() {
73 74 this.completed = true;
74 75 },
75 76 subscribe() {
76 77 this.subscribed++;
77 78 },
78 79 unsubscribe() {
79 80 this.unsubscribed++;
80 81 }
81 82 };
82 83
83 84
84 85 const subj3 = subj2.pipe<"even" | "odd">(self => ({ next, complete, error }) => {
85 86 consumer3.subscribe();
86 87 let count = 0;
87 88 const h = self.subscribe({
88 89 next: val => {
89 90 if (val % 2 === 0)
90 91 next("odd");
91 92 else
92 93 next("even");
93 94 if (++count === 4)
94 95 complete();
95 96 },
96 97 complete,
97 98 error
98 99 });
99 return () =>{
100 return () => {
100 101 consumer3.unsubscribe();
101 102 h.unsubscribe();
102 103 };
103 104 });
104 105
105 106 subj3.subscribe(consumer3);
106 107
107 t.equal(consumer3.odd, 2, "Should get 2 odd elements");
108 t.equal(consumer3.even, 2, "Should get 2 even elements");
109 t.ok(consumer3.completed, "The sequence should completed");
110 t.equal(consumer3.subscribed, 1, "The subscription should be done once");
111 t.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
110 tap.ok(consumer3.completed, "The sequence should completed");
111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
112 113
113 subj2.reduce((a,b) => a + b).subscribe({
114 next: val => t.comment("subj2: reduce =", val),
115 complete: () => t.comment("subj2: complete")
114 subj2.reduce((a, b) => a + b).subscribe({
115 next: val => tap.comment("subj2: reduce =", val),
116 complete: () => tap.comment("subj2: complete")
116 117 });
117 118
118 subj2.reduce((a,b) => a + b).next()
119 .then(value => t.comment("subj2: next reduce=", value))
120 .catch(() => {});
119 tap.test("of(...) tests", async t => {
120 await subj2.reduce((a, b) => a + b).next()
121 .then(value => t.comment("subj2: next reduce=", value));
122
123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124
125 const cancelled = new Cancellation(cancel => cancel());
126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127
128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129
130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 .reduce<number[]>((a, x) => [...a, x], [])
132 .next()
133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
121 134
122 subj2.next().then(val => t.equal(val, 1, "Should peek the first element")).catch(() => {});
135 const rejected = Promise.reject("DIE!");
136 rejected.catch(() => { }); // SAFE AND SOUND
123 137
124 const cancelled = new Cancellation(cancel => cancel());
125 t.rejects(subj2.next(cancelled), "Cancelled next() method should fail").catch(() => {});
138 await t.resolves(
139 of(Promise.resolve(1), rejected).next(),
140 "of(...) should emit non-rejected items"
141 );
142 await t.rejects(
143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 "of(...) should terminate with error when a parameter is rejected"
145 );
126 146
127 t.rejects(empty.next(), "Empty sequence should fail to get next element").catch(() => {}); No newline at end of file
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149
150 }).catch(() => { }); No newline at end of file
@@ -1,4 +1,4
1 1 import "./declare-tests";
2 2 import "./observable-tests";
3 3 import "./state-tests";
4 import "./subject-tests"; No newline at end of file
4 import "./subject-tests";
General Comments 0
You need to be logged in to leave comments. Login now