##// END OF EJS Templates
added 'buffer' and 'subject' observable operators
cin -
r133:a3fba6b6c42e default
parent child
Show More
@@ -0,0 +1,105
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { CancelledError } from "@implab/core-amd/CancelledError";
3 import { Producer, Sink, Subscribable } from "../observable";
4 import { Scope } from "../tsx/Scope";
5
6 /**
7 * Creates a buffer with the specified length. The buffer will immediately
8 * subscribe to the source observable and start accumulating values.
9 *
10 * The resulting observable will repeat the buffered values for each new subscriber.
11 *
12 * @param length The number of elements to store.
13 * @param ct Cancellation token to unsubscribe from the original observable.
14 *
15 */
16 export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Producer<T> => {
17 type Status = "active" | "complete" | "error";
18
19 // ring-buffer, wpos will rotate in range (0...length-1)
20 let wpos = 0;
21 const buffer: T[] = [];
22
23 // writes the next value to the buffer
24 const write = (value: T) => {
25 buffer[wpos] = value;
26 wpos = (wpos + 1) % length;
27 };
28
29 // reads contents of the buffer
30 // cb will be called for each value in the buffer
31 const read = (cb: (item: T) => void) => {
32 const start = wpos + length - buffer.length;
33 const end = wpos + length;
34
35 for(let pos = start; pos < end; pos++ )
36 cb(buffer[pos % length]);
37 };
38
39 let status: Status = "active";
40 let lastError: unknown;
41 let subscribers: Sink<T>[] = [];
42
43 const scope = new Scope();
44
45 // cleanup method to release resources held by this subscription
46 const cleanup = (cb: (item: Sink<T>) => void) => {
47 scope.destroy();
48 const _subscribers = subscribers;
49 subscribers = [];
50 _subscribers.forEach(cb);
51 };
52
53 const sink: Sink<T> = {
54 isClosed: () => status !== "active",
55 complete: () => {
56 if (status === "active") {
57 status = "complete";
58 cleanup(s => s.complete());
59 }
60 },
61 error: e => {
62 if (status === "active") {
63 status = "error";
64 lastError = e;
65 cleanup(s => s.error(e));
66 }
67 },
68 next: v => {
69 if (status === "active") {
70 write(v);
71 const _subscribers = subscribers;
72 _subscribers.forEach(s => s.next(v));
73 }
74 }
75 };
76
77 if (ct.isRequested()) {
78 sink.error(new CancelledError("The operation was cancelled", ct));
79 } else {
80 scope.own(source.subscribe(sink));
81 scope.own(ct.register(e => sink.error(e)));
82 }
83
84 return (s: Sink<T>) => {
85 const _subscribers = subscribers;
86 read(s.next);
87 switch (status) {
88 case "active":
89 subscribers.push(s);
90 return () => {
91 if (_subscribers === subscribers) {
92 const pos = subscribers.indexOf(s);
93 if (pos >= 0)
94 subscribers.splice(pos, 1);
95 }
96 };
97 case "complete":
98 s.complete();
99 break;
100 case "error":
101 s.error(lastError);
102 break;
103 }
104 };
105 }; No newline at end of file
@@ -0,0 +1,50
1 import { Producer, Sink, Subscribable } from "../observable";
2
3 const noop = () => { };
4
5 /** Joins multiple subscribers to the single one. The resulting subscriber
6 * will be created when the first client subscribes and will be released
7 * with the the last client unsubscribed.
8 *
9 * Use this wrapper to prevent spawning multiple producers.
10 *
11 * @param source The source observable
12 * @returns The wrapped producer
13 */
14 export const subject = <T>(source: Subscribable<T>): Producer<T> => {
15 let subscribers: Sink<T>[] = [];
16
17 let subscription = { unsubscribe: noop };
18
19 // cleanup method to release resources held by this subscription
20 const cleanup = (cb: (item: Sink<T>) => void) => {
21 const _subscribers = subscribers;
22 subscribers = [];
23 _subscribers.forEach(cb);
24 subscription.unsubscribe();
25 };
26
27 const sink: Sink<T> = {
28 isClosed: () => false,
29 complete: () => cleanup(s => s.complete()),
30 error: e => cleanup(s => s.error(e)),
31 next: v => subscribers.forEach(s => s.next(v))
32 };
33
34 return client => {
35 const _subscribers = subscribers;
36 subscribers.push(client);
37 if (subscribers.length === 1)
38 subscription = source.subscribe(sink);
39
40 return () => {
41 if (_subscribers === subscribers) {
42 const pos = subscribers.indexOf(client);
43 if (pos >= 0)
44 subscribers.splice(pos, 1);
45 if (!subscribers.length)
46 subscription.unsubscribe();
47 }
48 };
49 };
50 };
@@ -10,6 +10,7
10 10 "dijit",
11 11 "djbase",
12 12 "djclass",
13 "Unsubscribable"
13 "Unsubscribable",
14 "wpos"
14 15 ]
15 16 } No newline at end of file
@@ -9,17 +9,17 export interface Observer<T> {
9 9 /**
10 10 * Called for the next element in the sequence
11 11 */
12 next: (value: T) => void;
12 next?: (value: T) => void;
13 13
14 14 /**
15 15 * Called once when the error occurs in the sequence.
16 16 */
17 error: (e: unknown) => void;
17 error?: (e: unknown) => void;
18 18
19 19 /**
20 20 * Called once at the end of the sequence.
21 21 */
22 complete: () => void;
22 complete?: () => void;
23 23 }
24 24
25 25 /**
@@ -62,7 +62,7 export const isSubscribable = <T = unkno
62 62 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
63 63
64 64 export interface Subscribable<T> {
65 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 subscribe(consumer: Observer<T>): Unsubscribable;
66 66 }
67 67
68 68 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
@@ -148,7 +148,7 export interface Observable<T> extends S
148 148
149 149 const noop = () => { };
150 150
151 const sink = <T>(consumer: Partial<Observer<T>>) => {
151 const sink = <T>(consumer: Observer<T>) => {
152 152 const { next, error, complete } = consumer;
153 153 return {
154 154 next: next ? next.bind(consumer) : noop,
@@ -185,7 +185,7 const fuse = <T>(producer: Producer<T>)
185 185 };
186 186
187 187 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
188 subscribe: (consumer: Partial<Observer<T>>) => ({
188 subscribe: (consumer: Observer<T>) => ({
189 189 unsubscribe: producer(sink(consumer)) ?? noop
190 190 }),
191 191
@@ -294,7 +294,7 const _observe = <T>(producer: Producer<
294 294 return () => cleanup();
295 295 }),
296 296
297 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
297 pipe: <U>(op: OperatorFn<T,U>) => observe(op(_observe(producer))),
298 298
299 299 next: collect(
300 300 producer,
@@ -345,6 +345,7 const collect = <T, U>(
345 345
346 346 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
347 347
348 /** Converts an array to the observable sequence of its elements. */
348 349 export const ofArray = <T>(items: T[]) => _observe<T>(
349 350 ({ next, complete }) => (
350 351 items.forEach(next),
@@ -352,6 +353,13 export const ofArray = <T>(items: T[]) =
352 353 )
353 354 );
354 355
356 /** Converts a subscribable to the observable */
357 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
358 observe(sink => {
359 const subscription = subscribable.subscribe(sink);
360 return () => subscription.unsubscribe();
361 });
362
355 363 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
356 364 ({ next, error, complete }) =>
357 365 isPromise(item) ?
@@ -362,6 +370,9 const of1 = <T>(item: T | PromiseLike<T>
362 370 (next(item), complete())
363 371 );
364 372
373 /** Converts a list of parameter values to the observable sequence. The
374 * order of elements in the list will be preserved in the resulting sequence.
375 */
365 376 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
366 377 of1(items[0]) :
367 378 observe<T>(
@@ -389,135 +400,3 export const of = <T>(...items: (T | Pro
389 400 );
390 401
391 402 export const empty = _observe<never>(({ complete }) => complete());
392
393 /**
394 * Creates a mutable state and the observable for the stored value.
395 *
396 * @param value The initial value for the state
397 * @returns an array of three elements `[observable, setter, getter]`
398 *
399 * The returned observable keeps the actual value and will emit it as the next
400 * element each time a consumer subscribes the observable.
401 *
402 * Calling the setter will update the stored value in the observable and notify
403 * all consumers.
404 */
405 export const stateful = <T>(producer: Producer<T>): Producer<T> => {
406 const fusedProducer = fuse(producer);
407 type Status = "active" | "complete" | "error";
408
409 let lastValue: T;
410 let hasValue = false;
411 let status: Status = "active";
412 let lastError: unknown;
413 let subscribers: Sink<T>[] = [];
414
415 const sink: Sink<T> = {
416 isClosed: () => status !== "active",
417 complete: () => {
418 if (status === "active") {
419 status = "complete";
420 const _subscribers = subscribers;
421 subscribers = [];
422 _subscribers.forEach(s => s.complete());
423 }
424 },
425 error: e => {
426 if (status === "active") {
427 status = "error";
428 lastError = e;
429 const _subscribers = subscribers;
430 subscribers = [];
431 _subscribers.forEach(s => s.error(e));
432 }
433 },
434 next: v => {
435 if (status === "active") {
436 hasValue = true;
437 lastValue = v;
438 const _subscribers = subscribers;
439 _subscribers.forEach(s => s.next(v));
440 }
441 }
442 };
443
444 fusedProducer(sink);
445
446 return (s: Sink<T>) => {
447 const _subscribers = subscribers;
448 switch (status) {
449 case "active":
450 if (hasValue)
451 s.next(lastValue); // if hasValue is true,
452 // lastValue has a valid value
453 subscribers.push(s);
454 return () => {
455 if (_subscribers === subscribers) {
456 const pos = subscribers.indexOf(s);
457 if (pos >= 0)
458 subscribers.splice(pos, 1);
459 }
460 };
461 case "complete":
462 s.complete();
463 break;
464 case "error":
465 s.error(lastError);
466 break;
467 }
468 };
469 };
470
471 /** Create the producer which will be called once when the first subscriber is
472 * attached, next subscribers would share the same producer. When all
473 * subscribers are removed the producer will be cleaned up.
474 *
475 * Use this wrapper to prevent spawning multiple producers.
476 *
477 * @param producer The source producer
478 * @returns The wrapped producer
479 */
480 export const subject = <T>(producer: Producer<T>): Producer<T> => {
481 const fusedProducer = fuse(producer);
482
483 let subscribers: Sink<T>[] = [];
484
485 let cleanup = noop;
486
487 const sink: Sink<T> = {
488 isClosed: () => false,
489 complete: () => {
490 const _subscribers = subscribers;
491 subscribers = [];
492 _subscribers.forEach(s => s.complete());
493 cleanup();
494 },
495 error: e => {
496 const _subscribers = subscribers;
497 subscribers = [];
498 _subscribers.forEach(s => s.error(e));
499 cleanup();
500 },
501 next: v => {
502 const _subscribers = subscribers;
503 _subscribers.forEach(s => s.next(v));
504 }
505 };
506
507 return client => {
508 const _subscribers = subscribers;
509 subscribers.push(client);
510 if (subscribers.length === 1)
511 cleanup = fusedProducer(sink) ?? noop;
512
513 return () => {
514 if (_subscribers === subscribers) {
515 const pos = subscribers.indexOf(client);
516 if (pos >= 0)
517 subscribers.splice(pos, 1);
518 if (!subscribers.length)
519 cleanup();
520 }
521 };
522 };
523 }; No newline at end of file
@@ -113,6 +113,7 const endRender = (prev: Context, curren
113 113
114 114 // called when the first beginRender is called for this iteration
115 115 const onRendering = () => {
116 trace.log("Rendering started");
116 117 setTimeout(() => {
117 118 if (_renderCount !== 0)
118 119 trace.error("Rendering tasks aren't finished, currently running = {0}", _renderCount);
@@ -121,6 +122,7 const onRendering = () => {
121 122
122 123 // called when all render operations are complete
123 124 const onRendered = () => {
125 trace.log("Rendering compete");
124 126 _renderedHooks.forEach(guard);
125 127 _renderedHooks = [];
126 128 };
@@ -1,4 +1,5
1 import { observe, stateful } from "./observable";
1 import { observe } from "./observable";
2 import { buffer } from "./operators/buffer";
2 3 import * as t from "tap";
3 4
4 5 interface CounterState {
@@ -11,10 +12,10 let set: (v: CounterState) => void = ()
11 12 const initial: CounterState = { count: 0, label: "low" };
12 13 let value = initial;
13 14
14 const obs = observe(stateful<CounterState>(({ next }) => {
15 const obs = observe<CounterState>(({ next }) => {
15 16 next(initial);
16 17 set = next;
17 }));
18 }).pipe(buffer(2));
18 19
19 20 set({ count: 10, label: "mid" });
20 21
@@ -1,11 +1,12
1 import { observe, subject } from "./observable";
1 import { observe } from "./observable";
2 import { subject } from "./operators/subject";
2 3 import * as tap from "tap";
3 4
4 5 tap.test("Subject tests", t => {
5 6
6 7 let nextEvent: (value: string) => void = () => void (0);
7 8
8 const subj1 = observe(subject<string>(({ next }) => {
9 const subj1 = observe<string>(({ next }) => {
9 10 t.comment("Start subject");
10 11
11 12 nextEvent = next;
@@ -14,7 +15,7 tap.test("Subject tests", t => {
14 15 nextEvent = () => void (0);
15 16 t.comment("Stop subject");
16 17 };
17 }));
18 }).pipe(subject);
18 19
19 20 const h1 = subj1.subscribe({
20 21 next: v => t.comment(`h1 next: ${v}`)
@@ -43,7 +44,7 tap.test("Subject tests #2", t => {
43 44
44 45 let nextEvent: (value: string) => void = () => void (0);
45 46
46 const subj1 = observe(subject<string>(({ next, complete }) => {
47 const subj1 = observe<string>(({ next, complete }) => {
47 48 t.comment("Start subject");
48 49
49 50 complete();
@@ -53,7 +54,7 tap.test("Subject tests #2", t => {
53 54 nextEvent = () => void (0);
54 55 t.comment("Stop subject");
55 56 };
56 }));
57 }).pipe(subject);
57 58
58 59 const h1 = subj1.subscribe({
59 60 next: v => t.comment(`h1 next: ${v}`)
General Comments 0
You need to be logged in to leave comments. Login now