##// END OF EJS Templates
added 'buffer' and 'subject' observable operators
cin -
r133:a3fba6b6c42e default
parent child
Show More
@@ -0,0 +1,105
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { CancelledError } from "@implab/core-amd/CancelledError";
3 import { Producer, Sink, Subscribable } from "../observable";
4 import { Scope } from "../tsx/Scope";
5
6 /**
7 * Creates a buffer with the specified length. The buffer will immediately
8 * subscribe to the source observable and start accumulating values.
9 *
10 * The resulting observable will repeat the buffered values for each new subscriber.
11 *
12 * @param length The number of elements to store.
13 * @param ct Cancellation token to unsubscribe from the original observable.
14 *
15 */
16 export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Producer<T> => {
17 type Status = "active" | "complete" | "error";
18
19 // ring-buffer, wpos will rotate in range (0...length-1)
20 let wpos = 0;
21 const buffer: T[] = [];
22
23 // writes the next value to the buffer
24 const write = (value: T) => {
25 buffer[wpos] = value;
26 wpos = (wpos + 1) % length;
27 };
28
29 // reads contents of the buffer
30 // cb will be called for each value in the buffer
31 const read = (cb: (item: T) => void) => {
32 const start = wpos + length - buffer.length;
33 const end = wpos + length;
34
35 for(let pos = start; pos < end; pos++ )
36 cb(buffer[pos % length]);
37 };
38
39 let status: Status = "active";
40 let lastError: unknown;
41 let subscribers: Sink<T>[] = [];
42
43 const scope = new Scope();
44
45 // cleanup method to release resources held by this subscription
46 const cleanup = (cb: (item: Sink<T>) => void) => {
47 scope.destroy();
48 const _subscribers = subscribers;
49 subscribers = [];
50 _subscribers.forEach(cb);
51 };
52
53 const sink: Sink<T> = {
54 isClosed: () => status !== "active",
55 complete: () => {
56 if (status === "active") {
57 status = "complete";
58 cleanup(s => s.complete());
59 }
60 },
61 error: e => {
62 if (status === "active") {
63 status = "error";
64 lastError = e;
65 cleanup(s => s.error(e));
66 }
67 },
68 next: v => {
69 if (status === "active") {
70 write(v);
71 const _subscribers = subscribers;
72 _subscribers.forEach(s => s.next(v));
73 }
74 }
75 };
76
77 if (ct.isRequested()) {
78 sink.error(new CancelledError("The operation was cancelled", ct));
79 } else {
80 scope.own(source.subscribe(sink));
81 scope.own(ct.register(e => sink.error(e)));
82 }
83
84 return (s: Sink<T>) => {
85 const _subscribers = subscribers;
86 read(s.next);
87 switch (status) {
88 case "active":
89 subscribers.push(s);
90 return () => {
91 if (_subscribers === subscribers) {
92 const pos = subscribers.indexOf(s);
93 if (pos >= 0)
94 subscribers.splice(pos, 1);
95 }
96 };
97 case "complete":
98 s.complete();
99 break;
100 case "error":
101 s.error(lastError);
102 break;
103 }
104 };
105 }; No newline at end of file
@@ -0,0 +1,50
1 import { Producer, Sink, Subscribable } from "../observable";
2
3 const noop = () => { };
4
5 /** Joins multiple subscribers to the single one. The resulting subscriber
6 * will be created when the first client subscribes and will be released
7 * with the the last client unsubscribed.
8 *
9 * Use this wrapper to prevent spawning multiple producers.
10 *
11 * @param source The source observable
12 * @returns The wrapped producer
13 */
14 export const subject = <T>(source: Subscribable<T>): Producer<T> => {
15 let subscribers: Sink<T>[] = [];
16
17 let subscription = { unsubscribe: noop };
18
19 // cleanup method to release resources held by this subscription
20 const cleanup = (cb: (item: Sink<T>) => void) => {
21 const _subscribers = subscribers;
22 subscribers = [];
23 _subscribers.forEach(cb);
24 subscription.unsubscribe();
25 };
26
27 const sink: Sink<T> = {
28 isClosed: () => false,
29 complete: () => cleanup(s => s.complete()),
30 error: e => cleanup(s => s.error(e)),
31 next: v => subscribers.forEach(s => s.next(v))
32 };
33
34 return client => {
35 const _subscribers = subscribers;
36 subscribers.push(client);
37 if (subscribers.length === 1)
38 subscription = source.subscribe(sink);
39
40 return () => {
41 if (_subscribers === subscribers) {
42 const pos = subscribers.indexOf(client);
43 if (pos >= 0)
44 subscribers.splice(pos, 1);
45 if (!subscribers.length)
46 subscription.unsubscribe();
47 }
48 };
49 };
50 };
@@ -1,15 +1,16
1 1 {
2 2 "java.configuration.updateBuildConfiguration": "automatic",
3 3 "files.exclude": {
4 4 "**/.classpath": true,
5 5 "**/.project": true,
6 6 "**/.settings": true,
7 7 "**/.factorypath": true
8 8 },
9 9 "cSpell.words": [
10 10 "dijit",
11 11 "djbase",
12 12 "djclass",
13 "Unsubscribable"
13 "Unsubscribable",
14 "wpos"
14 15 ]
15 16 } No newline at end of file
@@ -1,523 +1,402
1 1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 2 import { ICancellation } from "@implab/core-amd/interfaces";
3 3 import { isPromise } from "@implab/core-amd/safe";
4 4
5 5 /**
6 6 * The interface for the consumer of an observable sequence
7 7 */
8 8 export interface Observer<T> {
9 9 /**
10 10 * Called for the next element in the sequence
11 11 */
12 next: (value: T) => void;
12 next?: (value: T) => void;
13 13
14 14 /**
15 15 * Called once when the error occurs in the sequence.
16 16 */
17 error: (e: unknown) => void;
17 error?: (e: unknown) => void;
18 18
19 19 /**
20 20 * Called once at the end of the sequence.
21 21 */
22 complete: () => void;
22 complete?: () => void;
23 23 }
24 24
25 25 /**
26 26 * The group of functions to feed an observable. These methods are provided to
27 27 * the producer to generate a stream of events.
28 28 */
29 29 export type Sink<T> = {
30 30 /**
31 31 * Call to send the next element in the sequence
32 32 */
33 33 next: (value: T) => void;
34 34
35 35 /**
36 36 * Call to notify about the error occurred in the sequence.
37 37 */
38 38 error: (e: unknown) => void;
39 39
40 40 /**
41 41 * Call to signal the end of the sequence.
42 42 */
43 43 complete: () => void;
44 44
45 45 /**
46 46 * Checks whether the sink is accepting new elements. It's safe to
47 47 * send elements to the closed sink.
48 48 */
49 49 isClosed: () => boolean;
50 50 };
51 51
52 52 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
53 53
54 54 export interface Unsubscribable {
55 55 unsubscribe(): void;
56 56 }
57 57
58 58 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
59 59 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
60 60
61 61 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
62 62 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
63 63
64 64 export interface Subscribable<T> {
65 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 subscribe(consumer: Observer<T>): Unsubscribable;
66 66 }
67 67
68 68 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
69 69
70 70 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
71 71
72 72 /** The observable source of items. */
73 73 export interface Observable<T> extends Subscribable<T> {
74 74 /** Transforms elements of the sequence with the specified mapper
75 75 *
76 76 * @param mapper The mapper used to transform the values
77 77 */
78 78 map<T2>(mapper: (value: T) => T2): Observable<T2>;
79 79
80 80 /** Filters elements of the sequence. The resulting sequence will
81 81 * contain only elements which match the specified predicate.
82 82 *
83 83 * @param predicate The filter predicate.
84 84 */
85 85 filter(predicate: (value: T) => boolean): Observable<T>;
86 86
87 87 /** Completes the sequence once the condition is met.
88 88 * @param predicate The condition which should be met to complete the sequence
89 89 */
90 90 until(predicate: (value: T) => boolean): Observable<T>;
91 91
92 92 /** Keeps the sequence running while elements satisfy the condition.
93 93 *
94 94 * @param predicate The condition which should be met to continue.
95 95 */
96 96 while(predicate: (value: T) => boolean): Observable<T>;
97 97
98 98 /** Applies accumulator to each value in the sequence and
99 99 * emits the accumulated value for each source element
100 100 *
101 101 * @param accumulator
102 102 * @param initial
103 103 */
104 104 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
105 105 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
106 106
107 107 /** Applies accumulator to each value in the sequence and
108 108 * emits the accumulated value at the end of the sequence
109 109 *
110 110 * @param accumulator
111 111 * @param initial
112 112 */
113 113 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
114 114 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
115 115
116 116 /** Concatenates the specified sequences with this observable
117 117 *
118 118 * @param seq sequences to concatenate with the current observable
119 119 *
120 120 * The concatenation doesn't accumulate values from the specified sequences,
121 121 * The result of the concatenation is the new observable which will switch
122 122 * to the next observable after the previous one completes. Values emitted
123 123 * before the next observable being active are lost.
124 124 */
125 125 cat(...seq: Subscribable<T>[]): Observable<T>;
126 126
127 127
128 128 /** Pipes the specified operator to produce the new observable
129 129 * @param op The operator consumes this observable and produces a new one
130 130 *
131 131 * The operator is a higher order function which takes a source observable
132 132 * and returns a producer for the new observable.
133 133 *
134 134 * This function can be used to create a complex mapping between source and
135 135 * resulting observables. The operator may have a state (or a side effect)
136 136 * and can be connected to multiple observables.
137 137 */
138 138 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
139 139
140 140 /** Waits for the next event to occur and returns a promise for the next value
141 141 * @param ct Cancellation token
142 142 */
143 143 next(ct?: ICancellation): Promise<T>;
144 144
145 145 /** Collects items of the sequence to the array. */
146 146 collect(ct?: ICancellation): Promise<T[]>;
147 147 }
148 148
149 149 const noop = () => { };
150 150
151 const sink = <T>(consumer: Partial<Observer<T>>) => {
151 const sink = <T>(consumer: Observer<T>) => {
152 152 const { next, error, complete } = consumer;
153 153 return {
154 154 next: next ? next.bind(consumer) : noop,
155 155 error: error ? error.bind(consumer) : noop,
156 156 complete: complete ? complete.bind(consumer) : noop,
157 157 isClosed: () => false
158 158 };
159 159 };
160 160
161 161 /** Wraps the producer to handle tear down logic and subscription management
162 162 *
163 163 * @param producer The producer to wrap
164 164 * @returns The wrapper producer
165 165 */
166 166 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
167 167 let done = false;
168 168 let cleanup = noop;
169 169
170 170 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
171 171 (...args: A) => done ?
172 172 void (0) :
173 173 (done = true, cleanup(), fn(...args));
174 174
175 175 const _fin0 = () => done ? void (0) : (done = true, cleanup());
176 176
177 177 const safeSink = {
178 178 next: (value: T) => { !done && next(value); },
179 179 error: _fin(error),
180 180 complete: _fin(complete),
181 181 isClosed: () => done
182 182 };
183 183 cleanup = producer(safeSink) ?? noop;
184 184 return done ? cleanup() : _fin0;
185 185 };
186 186
187 187 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
188 subscribe: (consumer: Partial<Observer<T>>) => ({
188 subscribe: (consumer: Observer<T>) => ({
189 189 unsubscribe: producer(sink(consumer)) ?? noop
190 190 }),
191 191
192 192 map: (mapper) => _observe(({ next, ...rest }) =>
193 193 producer({
194 194 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
195 195 ...rest
196 196 })
197 197 ),
198 198
199 199 filter: (predicate) => _observe(({ next, ...rest }) =>
200 200 producer({
201 201 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
202 202 ...rest
203 203 })
204 204 ),
205 205
206 206 until: predicate => _observe(({ next, complete, ...rest }) =>
207 207 producer({
208 208 next: v => predicate(v) ? complete() : next(v),
209 209 complete,
210 210 ...rest
211 211 })
212 212 ),
213 213
214 214 while: predicate => _observe(({ next, complete, ...rest }) =>
215 215 producer({
216 216 next: v => predicate(v) ? next(v) : complete(),
217 217 complete,
218 218 ...rest
219 219 })
220 220 ),
221 221
222 222 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
223 223 if (args.length === 1) {
224 224 const [accumulator] = args;
225 225 let _acc: T;
226 226 let index = 0;
227 227 return producer({
228 228 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
229 229 ...rest
230 230 });
231 231 } else {
232 232 const [accumulator, initial] = args;
233 233 let _acc = initial;
234 234 return producer({
235 235 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
236 236 ...rest
237 237 });
238 238 }
239 239 }),
240 240
241 241 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
242 242 if (args.length === 1) {
243 243 const [accumulator] = args;
244 244 let _acc: T;
245 245 let index = 0;
246 246 return producer({
247 247 next: next !== noop ? (v: T) => {
248 248 _acc = index++ === 0 ? v : accumulator(_acc, v);
249 249 } : noop,
250 250 complete: () => {
251 251 if (index === 0) {
252 252 error(new Error("The sequence can't be empty"));
253 253 } else {
254 254 next(_acc);
255 255 complete();
256 256 }
257 257 },
258 258 error,
259 259 ...rest
260 260 });
261 261 } else {
262 262 const [accumulator, initial] = args;
263 263 let _acc = initial;
264 264 return producer({
265 265 next: next !== noop ? (v: T) => {
266 266 _acc = accumulator(_acc, v);
267 267 } : noop,
268 268 complete: () => {
269 269 next(_acc);
270 270 complete();
271 271 },
272 272 error,
273 273 ...rest
274 274 });
275 275 }
276 276 }),
277 277
278 278 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
279 279 let cleanup: () => void;
280 280 const complete = () => {
281 281 const continuation = seq.shift();
282 282 if (continuation) {
283 283 // if we have a next sequence, subscribe to it
284 284 const subscription = continuation.subscribe({ next, complete, ...rest });
285 285 cleanup = subscription.unsubscribe.bind(subscription);
286 286 } else {
287 287 // otherwise notify the consumer about completion
288 288 final();
289 289 }
290 290 };
291 291
292 292 cleanup = producer({ next, complete, ...rest }) ?? noop;
293 293
294 294 return () => cleanup();
295 295 }),
296 296
297 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
297 pipe: <U>(op: OperatorFn<T,U>) => observe(op(_observe(producer))),
298 298
299 299 next: collect(
300 300 producer,
301 301 ({ next, complete, error, isClosed }) => ({
302 302 next: v => (next(v), complete()),
303 303 complete: () => error(new Error("The sequence is empty")),
304 304 error,
305 305 isClosed
306 306 })
307 307 ),
308 308
309 309 collect: collect(
310 310 producer,
311 311 ({ next, complete, ...rest }) => {
312 312 const data: T[] = [];
313 313 return {
314 314 next: v => data.push(v),
315 315 complete: () => (next(data), complete()),
316 316 ...rest
317 317 };
318 318 }
319 319 )
320 320 });
321 321
322 322 const collect = <T, U>(
323 323 producer: Producer<T>,
324 324 collector: (result: Sink<U>) => Sink<T>
325 325 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
326 326 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
327 327 const h = ct.register(error);
328 328 const cleanup = !isClosed() ?
329 329 producer(collector({ next, complete, error, isClosed })) ?? noop :
330 330 noop;
331 331
332 332 return () => {
333 333 h.destroy();
334 334 cleanup();
335 335 };
336 336 });
337 337
338 338 fused({
339 339 next: resolve,
340 340 error: reject,
341 341 complete: noop,
342 342 isClosed: () => false
343 343 });
344 344 });
345 345
346 346 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
347 347
348 /** Converts an array to the observable sequence of its elements. */
348 349 export const ofArray = <T>(items: T[]) => _observe<T>(
349 350 ({ next, complete }) => (
350 351 items.forEach(next),
351 352 complete()
352 353 )
353 354 );
354 355
356 /** Converts a subscribable to the observable */
357 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
358 observe(sink => {
359 const subscription = subscribable.subscribe(sink);
360 return () => subscription.unsubscribe();
361 });
362
355 363 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
356 364 ({ next, error, complete }) =>
357 365 isPromise(item) ?
358 366 void item.then(
359 367 v => (next(v), complete()),
360 368 error
361 369 ) :
362 370 (next(item), complete())
363 371 );
364 372
373 /** Converts a list of parameter values to the observable sequence. The
374 * order of elements in the list will be preserved in the resulting sequence.
375 */
365 376 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
366 377 of1(items[0]) :
367 378 observe<T>(
368 379 ({ next, error, complete, isClosed }) => {
369 380 const n = items.length;
370 381
371 382 const _next = (start: number) => {
372 383 if (start > 0 && isClosed()) // when resumed
373 384 return;
374 385
375 386 for (let i = start; i < n; i++) {
376 387 const r = items[i];
377 388 if (isPromise(r)) {
378 389 r.then(v => (next(v), _next(i + 1)), error);
379 390 return; // suspend
380 391 } else {
381 392 next(r);
382 393 }
383 394 }
384 395 complete();
385 396 };
386 397
387 398 _next(0);
388 399 }
389 400 );
390 401
391 402 export const empty = _observe<never>(({ complete }) => complete());
392
393 /**
394 * Creates a mutable state and the observable for the stored value.
395 *
396 * @param value The initial value for the state
397 * @returns an array of three elements `[observable, setter, getter]`
398 *
399 * The returned observable keeps the actual value and will emit it as the next
400 * element each time a consumer subscribes the observable.
401 *
402 * Calling the setter will update the stored value in the observable and notify
403 * all consumers.
404 */
405 export const stateful = <T>(producer: Producer<T>): Producer<T> => {
406 const fusedProducer = fuse(producer);
407 type Status = "active" | "complete" | "error";
408
409 let lastValue: T;
410 let hasValue = false;
411 let status: Status = "active";
412 let lastError: unknown;
413 let subscribers: Sink<T>[] = [];
414
415 const sink: Sink<T> = {
416 isClosed: () => status !== "active",
417 complete: () => {
418 if (status === "active") {
419 status = "complete";
420 const _subscribers = subscribers;
421 subscribers = [];
422 _subscribers.forEach(s => s.complete());
423 }
424 },
425 error: e => {
426 if (status === "active") {
427 status = "error";
428 lastError = e;
429 const _subscribers = subscribers;
430 subscribers = [];
431 _subscribers.forEach(s => s.error(e));
432 }
433 },
434 next: v => {
435 if (status === "active") {
436 hasValue = true;
437 lastValue = v;
438 const _subscribers = subscribers;
439 _subscribers.forEach(s => s.next(v));
440 }
441 }
442 };
443
444 fusedProducer(sink);
445
446 return (s: Sink<T>) => {
447 const _subscribers = subscribers;
448 switch (status) {
449 case "active":
450 if (hasValue)
451 s.next(lastValue); // if hasValue is true,
452 // lastValue has a valid value
453 subscribers.push(s);
454 return () => {
455 if (_subscribers === subscribers) {
456 const pos = subscribers.indexOf(s);
457 if (pos >= 0)
458 subscribers.splice(pos, 1);
459 }
460 };
461 case "complete":
462 s.complete();
463 break;
464 case "error":
465 s.error(lastError);
466 break;
467 }
468 };
469 };
470
471 /** Create the producer which will be called once when the first subscriber is
472 * attached, next subscribers would share the same producer. When all
473 * subscribers are removed the producer will be cleaned up.
474 *
475 * Use this wrapper to prevent spawning multiple producers.
476 *
477 * @param producer The source producer
478 * @returns The wrapped producer
479 */
480 export const subject = <T>(producer: Producer<T>): Producer<T> => {
481 const fusedProducer = fuse(producer);
482
483 let subscribers: Sink<T>[] = [];
484
485 let cleanup = noop;
486
487 const sink: Sink<T> = {
488 isClosed: () => false,
489 complete: () => {
490 const _subscribers = subscribers;
491 subscribers = [];
492 _subscribers.forEach(s => s.complete());
493 cleanup();
494 },
495 error: e => {
496 const _subscribers = subscribers;
497 subscribers = [];
498 _subscribers.forEach(s => s.error(e));
499 cleanup();
500 },
501 next: v => {
502 const _subscribers = subscribers;
503 _subscribers.forEach(s => s.next(v));
504 }
505 };
506
507 return client => {
508 const _subscribers = subscribers;
509 subscribers.push(client);
510 if (subscribers.length === 1)
511 cleanup = fusedProducer(sink) ?? noop;
512
513 return () => {
514 if (_subscribers === subscribers) {
515 const pos = subscribers.indexOf(client);
516 if (pos >= 0)
517 subscribers.splice(pos, 1);
518 if (!subscribers.length)
519 cleanup();
520 }
521 };
522 };
523 }; No newline at end of file
@@ -1,198 +1,200
1 1 import { TraceSource } from "@implab/core-amd/log/TraceSource";
2 2 import { isPromise } from "@implab/core-amd/safe";
3 3 import { id as mid } from "module";
4 4 import { IScope, Scope } from "./Scope";
5 5 import { isNode, isRendition, isWidget } from "./traits";
6 6
7 7 const trace = TraceSource.get(mid);
8 8
9 9 interface Context {
10 10 readonly scope: IScope;
11 11
12 12 readonly hooks?: (() => void)[];
13 13 }
14 14
15 15 let _context: Context = {
16 16 scope: Scope.dummy
17 17 };
18 18
19 19 let _renderCount = 0;
20 20 let _renderId = 1;
21 21 let _renderedHooks: (() => void)[] = [];
22 22
23 23
24 24 const guard = (cb: () => unknown) => {
25 25 try {
26 26 const result = cb();
27 27 if (isPromise(result)) {
28 28 const warn = (ret: unknown) => trace.error("The callback {0} competed asynchronously. result = {1}", cb, ret);
29 29 result.then(warn, warn);
30 30 }
31 31 } catch (e) {
32 32 trace.error(e);
33 33 }
34 34 };
35 35
36 36 /**
37 37 *
38 38 * @param scope
39 39 * @returns
40 40 */
41 41 export const beginRender = (scope = getScope()) => {
42 42 const prev = _context;
43 43 _renderCount++;
44 44 const renderId = _renderId++;
45 45 trace.debug("beginRender [{0}], pending = {1}", renderId, _renderCount);
46 46 if (_renderCount === 1)
47 47 onRendering();
48 48
49 49 _context = {
50 50 scope,
51 51 hooks: []
52 52 };
53 53 return endRender(prev, _context, renderId);
54 54 };
55 55
56 56 /**
57 57 * Method for a deferred rendering. Returns a promise with `beginRender()` function.
58 58 * Call to `scheduleRender` will save the current context, and will increment pending
59 59 * operations counter.
60 60 *
61 61 * @example
62 62 *
63 63 * const begin = await scheduleRender();
64 64 * const end = begin();
65 65 * try {
66 66 * // do some DOM manipulations
67 67 * } finally {
68 68 * end();
69 69 * }
70 70 *
71 71 * @param scope
72 72 * @returns
73 73 */
74 74 export const scheduleRender = async (scope = getScope()) => {
75 75 _renderCount++;
76 76 const renderId = _renderId ++;
77 77 trace.debug("scheduleRender [{0}], pending = {1}", renderId, _renderCount);
78 78 if (_renderCount === 1)
79 79 onRendering();
80 80
81 81 await Promise.resolve();
82 82
83 83 return () => {
84 84 trace.debug("beginRender [{0}], pending = {1}", renderId, _renderCount);
85 85 const prev = _context;
86 86
87 87 _context = {
88 88 scope,
89 89 hooks: []
90 90 };
91 91 return endRender(prev, _context, renderId);
92 92 };
93 93 };
94 94
95 95 /**
96 96 * Completes render operation
97 97 */
98 98 const endRender = (prev: Context, current: Context, renderId: number) => () => {
99 99 if (_context !== current)
100 100 trace.error("endRender mismatched beginRender call");
101 101
102 102 const { hooks } = _context;
103 103 if (hooks)
104 104 hooks.forEach(guard);
105 105
106 106 _renderCount--;
107 107 _context = prev;
108 108
109 109 trace.debug("endRender [{0}], pending = {1}", renderId, _renderCount);
110 110 if (_renderCount === 0)
111 111 onRendered();
112 112 };
113 113
114 114 // called when the first beginRender is called for this iteration
115 115 const onRendering = () => {
116 trace.log("Rendering started");
116 117 setTimeout(() => {
117 118 if (_renderCount !== 0)
118 119 trace.error("Rendering tasks aren't finished, currently running = {0}", _renderCount);
119 120 });
120 121 };
121 122
122 123 // called when all render operations are complete
123 124 const onRendered = () => {
125 trace.log("Rendering compete");
124 126 _renderedHooks.forEach(guard);
125 127 _renderedHooks = [];
126 128 };
127 129
128 130 export const whenRendered = () => new Promise<void>((resolve) => {
129 131 if (_renderCount)
130 132 _renderedHooks.push(resolve);
131 133 else
132 134 resolve();
133 135 });
134 136
135 137 export const renderHook = (hook: () => void) => {
136 138 const { hooks } = _context;
137 139 if (hooks)
138 140 hooks.push(hook);
139 141 else
140 142 guard(hook);
141 143 };
142 144
143 145 export const refHook = <T>(value: T, ref: JSX.Ref<T>) => {
144 146 const { hooks, scope } = _context;
145 147 if (hooks)
146 148 hooks.push(() => ref(value));
147 149 else
148 150 guard(() => ref(value));
149 151
150 152 scope.own(() => ref(undefined));
151 153 };
152 154
153 155 /** Returns the current scope */
154 156 export const getScope = () => _context.scope;
155 157
156 158 /** Schedules the rendition to be rendered to the DOM Node
157 159 * @param rendition The rendition to be rendered
158 160 * @param scope The scope
159 161 */
160 162 export const render = (rendition: unknown, scope = Scope.dummy) => {
161 163 const complete = beginRender(scope);
162 164 try {
163 165 return getItemDom(rendition);
164 166 } finally {
165 167 complete();
166 168 }
167 169 };
168 170
169 171 const emptyFragment = document.createDocumentFragment();
170 172
171 173 /** Renders DOM element for different types of the argument. */
172 174 export const getItemDom = (v: unknown) => {
173 175 if (typeof v === "string" || typeof v === "number" || v instanceof RegExp || v instanceof Date) {
174 176 // primitive types converted to the text nodes
175 177 return document.createTextNode(v.toString());
176 178 } else if (isNode(v)) {
177 179 // nodes are kept as is
178 180 return v;
179 181 } else if (isRendition(v)) {
180 182 // renditions are instantiated
181 183 return v.getDomNode();
182 184 } else if (isWidget(v)) {
183 185 // widgets are converted to it's markup
184 186 return v.domNode;
185 187 } else if (typeof v === "boolean" || v === null || v === undefined) {
186 188 // null | undefined | boolean are removed
187 189 return emptyFragment;
188 190 } else if (v instanceof Array) {
189 191 // arrays will be translated to document fragments
190 192 const fragment = document.createDocumentFragment();
191 193 v.map(item => getItemDom(item))
192 194 .forEach(node => fragment.appendChild(node));
193 195 return fragment;
194 196 } else {
195 197 // bug: explicit error otherwise
196 198 throw new Error(`Invalid parameter: ${String(v)}`);
197 199 }
198 200 };
@@ -1,27 +1,28
1 import { observe, stateful } from "./observable";
1 import { observe } from "./observable";
2 import { buffer } from "./operators/buffer";
2 3 import * as t from "tap";
3 4
4 5 interface CounterState {
5 6 count: number;
6 7
7 8 label: "low" | "mid" | "high"
8 9 }
9 10
10 11 let set: (v: CounterState) => void = () => void (0);
11 12 const initial: CounterState = { count: 0, label: "low" };
12 13 let value = initial;
13 14
14 const obs = observe(stateful<CounterState>(({ next }) => {
15 const obs = observe<CounterState>(({ next }) => {
15 16 next(initial);
16 17 set = next;
17 }));
18 }).pipe(buffer(2));
18 19
19 20 set({ count: 10, label: "mid" });
20 21
21 22 obs.subscribe({
22 23 next: v => value = v
23 24 });
24 25
25 26 t.equal(value.count, 10, "State should update");
26 27
27 28 set({ count: 20, label: "high" });
@@ -1,78 +1,79
1 import { observe, subject } from "./observable";
1 import { observe } from "./observable";
2 import { subject } from "./operators/subject";
2 3 import * as tap from "tap";
3 4
4 5 tap.test("Subject tests", t => {
5 6
6 7 let nextEvent: (value: string) => void = () => void (0);
7 8
8 const subj1 = observe(subject<string>(({ next }) => {
9 const subj1 = observe<string>(({ next }) => {
9 10 t.comment("Start subject");
10 11
11 12 nextEvent = next;
12 13
13 14 return () => {
14 15 nextEvent = () => void (0);
15 16 t.comment("Stop subject");
16 17 };
17 }));
18 }).pipe(subject);
18 19
19 20 const h1 = subj1.subscribe({
20 21 next: v => t.comment(`h1 next: ${v}`)
21 22 });
22 23
23 24 nextEvent("first");
24 25
25 26 const h2 = subj1.subscribe({
26 27 next: v => t.comment(`h2 next: ${v}`)
27 28 });
28 29
29 30 nextEvent("second");
30 31
31 32 h1.unsubscribe();
32 33
33 34 nextEvent("third");
34 35
35 36 h2.unsubscribe();
36 37
37 38 t.pass("Subject finished");
38 39 t.end();
39 40 }).catch(e => console.error(e));
40 41
41 42
42 43 tap.test("Subject tests #2", t => {
43 44
44 45 let nextEvent: (value: string) => void = () => void (0);
45 46
46 const subj1 = observe(subject<string>(({ next, complete }) => {
47 const subj1 = observe<string>(({ next, complete }) => {
47 48 t.comment("Start subject");
48 49
49 50 complete();
50 51 nextEvent = next;
51 52
52 53 return () => {
53 54 nextEvent = () => void (0);
54 55 t.comment("Stop subject");
55 56 };
56 }));
57 }).pipe(subject);
57 58
58 59 const h1 = subj1.subscribe({
59 60 next: v => t.comment(`h1 next: ${v}`)
60 61 });
61 62
62 63 nextEvent("first");
63 64
64 65 const h2 = subj1.subscribe({
65 66 next: v => t.comment(`h2 next: ${v}`)
66 67 });
67 68
68 69 nextEvent("second");
69 70
70 71 h1.unsubscribe();
71 72
72 73 nextEvent("third");
73 74
74 75 h2.unsubscribe();
75 76
76 77 t.pass("Subject finished");
77 78 t.end();
78 79 }).catch(e => console.error(e)); No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now