##// END OF EJS Templates
added while, until methods to the observable interface....
cin -
r124:fbe158a5752a default
parent child
Show More
@@ -0,0 +1,27
1 import { observe, stateful } from "./observable";
2 import * as t from "tap";
3
4 interface CounterState {
5 count: number;
6
7 label: "low" | "mid" | "high"
8 }
9
10 let set: (v: CounterState) => void = () => void (0);
11 const initial: CounterState = { count: 0, label: "low" };
12 let value = initial;
13
14 const obs = observe(stateful<CounterState>(({ next }) => {
15 next(initial);
16 set = next;
17 }));
18
19 set({ count: 10, label: "mid" });
20
21 obs.subscribe({
22 next: v => value = v
23 });
24
25 t.equal(value.count, 10, "State should update");
26
27 set({ count: 20, label: "high" });
@@ -66,6 +66,8 export interface Subscribable<T> {
66 66
67 67 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
68 68
69 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
70
69 71 /** The observable source of items. */
70 72 export interface Observable<T> extends Subscribable<T> {
71 73 /** Transforms elements of the sequence with the specified mapper
@@ -81,6 +83,17 export interface Observable<T> extends S
81 83 */
82 84 filter(predicate: (value: T) => boolean): Observable<T>;
83 85
86 /** Completes the sequence once the condition is met.
87 * @param predicate The condition which should be met to complete the sequence
88 */
89 until(predicate: (value: T) => boolean): Observable<T>;
90
91 /** Keeps the sequence running while elements satisfy the condition.
92 *
93 * @param predicate The condition which should be met to continue.
94 */
95 while(predicate: (value: T) => boolean): Observable<T>;
96
84 97 /** Applies accumulator to each value in the sequence and
85 98 * emits the accumulated value for each source element
86 99 *
@@ -102,13 +115,26 export interface Observable<T> extends S
102 115 /** Concatenates the specified sequences with this observable
103 116 *
104 117 * @param seq sequences to concatenate with the current observable
118 *
119 * The concatenation doesn't accumulate values from the specified sequences,
120 * The result of the concatenation is the new observable which will switch
121 * to the next observable after the previous one completes. Values emitted
122 * before the next observable being active are lost.
105 123 */
106 124 cat(...seq: Subscribable<T>[]): Observable<T>;
107 125
126
108 127 /** Pipes the specified operator to produce the new observable
109 * @param op The operator which consumes this observable and produces a new one
128 * @param op The operator consumes this observable and produces a new one
129 *
130 * The operator is a higher order function which takes a source observable
131 * and returns a producer for the new observable.
132 *
133 * This function can be used to create a complex mapping between source and
134 * resulting observables. The operator may have a state (or a side effect)
135 * and can be connected to multiple observables.
110 136 */
111 pipe<U>(op: (source: Observable<T>) => Producer<U>): Observable<U>;
137 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
112 138
113 139 /** Waits for the next event to occur and returns a promise for the next value
114 140 * @param ct Cancellation token to
@@ -142,6 +168,8 const fuse = <T>(producer: Producer<T>)
142 168 void (0) :
143 169 (done = true, cleanup(), fn(...args));
144 170
171 const _fin0 = () => done ? void (0) : (done = true, cleanup());
172
145 173 const safeSink = {
146 174 next: (value: T) => { !done && next(value); },
147 175 error: _fin(error),
@@ -149,9 +177,7 const fuse = <T>(producer: Producer<T>)
149 177 isClosed: () => done
150 178 };
151 179 cleanup = producer(safeSink) ?? noop;
152 return done ?
153 (cleanup(), noop) :
154 _fin(noop);
180 return done ? cleanup() : _fin0;
155 181 };
156 182
157 183 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
@@ -173,6 +199,22 const _observe = <T>(producer: Producer<
173 199 })
174 200 ),
175 201
202 until: predicate => _observe(({ next, complete, ...rest }) =>
203 producer({
204 next: v => predicate(v) ? complete() : next(v),
205 complete,
206 ...rest
207 })
208 ),
209
210 while: predicate => _observe(({ next, complete, ...rest }) =>
211 producer({
212 next: v => predicate(v) ? next(v) : complete(),
213 complete,
214 ...rest
215 })
216 ),
217
176 218 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
177 219 if (args.length === 1) {
178 220 const [accumulator] = args;
@@ -293,7 +335,10 export const streamArray = <T>(items: T[
293 335 );
294 336
295 337 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
296 ({next, error, complete}) => void promise.then(v => (next(v), complete()), error)
338 ({ next, error, complete }) => void promise.then(
339 v => (next(v), complete()),
340 error
341 )
297 342 );
298 343
299 344 export const of = <T>(...items: T[]) => _observe<T>(
@@ -303,4 +348,90 export const of = <T>(...items: T[]) =>
303 348 )
304 349 );
305 350
306 export const empty = _observe<never>(({ complete }) => complete()); No newline at end of file
351 export const empty = _observe<never>(({ complete }) => complete());
352
353 /**
354 * Creates a mutable state and the observable for the stored value.
355 *
356 * @param value The initial value for the state
357 * @returns an array of three elements `[observable, setter, getter]`
358 *
359 * The returned observable keeps the actual value and will emit it as the next
360 * element each time a consumer subscribes the observable.
361 *
362 * Calling the setter will update the stored value in the observable and notify
363 * all consumers.
364 */
365 export const stateful = <T>(producer: Producer<T>): Producer<T> => {
366 const fusedProducer = fuse(producer);
367 type Status = "active" | "complete" | "error";
368
369 let lastValue: T;
370 let hasValue = false;
371 let status: Status = "active";
372 let lastError: unknown;
373 let subscribers: Sink<T>[] = [];
374
375 const sink: Sink<T> = {
376 isClosed: () => status !== "active",
377 complete: () => {
378 if (status === "active") {
379 status = "complete";
380 const _subscribers = subscribers;
381 subscribers = [];
382 _subscribers.forEach(s => s.complete());
383 }
384 },
385 error: e => {
386 if (status === "active") {
387 status = "error";
388 lastError = e;
389 const _subscribers = subscribers;
390 subscribers = [];
391 _subscribers.forEach(s => s.error(e));
392 }
393 },
394 next: v => {
395 if (status === "active") {
396 hasValue = true;
397 lastValue = v;
398 const _subscribers = subscribers;
399 _subscribers.forEach(s => s.next(v));
400 }
401 }
402 };
403
404 fusedProducer(sink);
405
406 return (s: Sink<T>) => {
407 const _subscribers = subscribers;
408 switch (status) {
409 case "active":
410 if (hasValue)
411 s.next(lastValue); // if hasValue is true,
412 // lastValue has a valid value
413 subscribers.push(s);
414 return () => {
415 if (_subscribers === subscribers) {
416 const pos = subscribers.indexOf(s);
417 if (pos >= 0)
418 subscribers.splice(pos, 1);
419 }
420 };
421 case "complete":
422 s.complete();
423 break;
424 case "error":
425 s.error(lastError);
426 break;
427 }
428 };
429 };
430
431 const subject = <T>(producer: Producer<T>): Producer<T> => {
432 const fusedProducer = fuse(producer);
433
434 return () => {
435
436 };
437 }; No newline at end of file
@@ -1,2 +1,3
1 1 import "./declare-tests";
2 import "./observable-tests"; No newline at end of file
2 import "./observable-tests";
3 import "./state-tests"; No newline at end of file
@@ -30,7 +30,6
30 30 },
31 31 "../djx/build/npm/package": {
32 32 "name": "@implab/djx",
33 "version": "1.6.3",
34 33 "dev": true,
35 34 "license": "BSD-2-Clause",
36 35 "peerDependencies": {
General Comments 0
You need to be logged in to leave comments. Login now