@@ -0,0 +1,27 | |||
|
1 | import { observe, stateful } from "./observable"; | |
|
2 | import * as t from "tap"; | |
|
3 | ||
|
4 | interface CounterState { | |
|
5 | count: number; | |
|
6 | ||
|
7 | label: "low" | "mid" | "high" | |
|
8 | } | |
|
9 | ||
|
10 | let set: (v: CounterState) => void = () => void (0); | |
|
11 | const initial: CounterState = { count: 0, label: "low" }; | |
|
12 | let value = initial; | |
|
13 | ||
|
14 | const obs = observe(stateful<CounterState>(({ next }) => { | |
|
15 | next(initial); | |
|
16 | set = next; | |
|
17 | })); | |
|
18 | ||
|
19 | set({ count: 10, label: "mid" }); | |
|
20 | ||
|
21 | obs.subscribe({ | |
|
22 | next: v => value = v | |
|
23 | }); | |
|
24 | ||
|
25 | t.equal(value.count, 10, "State should update"); | |
|
26 | ||
|
27 | set({ count: 20, label: "high" }); |
@@ -66,6 +66,8 export interface Subscribable<T> { | |||
|
66 | 66 | |
|
67 | 67 | export type AccumulatorFn<T, A> = (acc: A, value: T) => A; |
|
68 | 68 | |
|
69 | export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>; | |
|
70 | ||
|
69 | 71 | /** The observable source of items. */ |
|
70 | 72 | export interface Observable<T> extends Subscribable<T> { |
|
71 | 73 | /** Transforms elements of the sequence with the specified mapper |
@@ -81,6 +83,17 export interface Observable<T> extends S | |||
|
81 | 83 | */ |
|
82 | 84 | filter(predicate: (value: T) => boolean): Observable<T>; |
|
83 | 85 | |
|
86 | /** Completes the sequence once the condition is met. | |
|
87 | * @param predicate The condition which should be met to complete the sequence | |
|
88 | */ | |
|
89 | until(predicate: (value: T) => boolean): Observable<T>; | |
|
90 | ||
|
91 | /** Keeps the sequence running while elements satisfy the condition. | |
|
92 | * | |
|
93 | * @param predicate The condition which should be met to continue. | |
|
94 | */ | |
|
95 | while(predicate: (value: T) => boolean): Observable<T>; | |
|
96 | ||
|
84 | 97 | /** Applies accumulator to each value in the sequence and |
|
85 | 98 | * emits the accumulated value for each source element |
|
86 | 99 | * |
@@ -102,13 +115,26 export interface Observable<T> extends S | |||
|
102 | 115 | /** Concatenates the specified sequences with this observable |
|
103 | 116 | * |
|
104 | 117 | * @param seq sequences to concatenate with the current observable |
|
118 | * | |
|
119 | * The concatenation doesn't accumulate values from the specified sequences, | |
|
120 | * The result of the concatenation is the new observable which will switch | |
|
121 | * to the next observable after the previous one completes. Values emitted | |
|
122 | * before the next observable being active are lost. | |
|
105 | 123 | */ |
|
106 | 124 | cat(...seq: Subscribable<T>[]): Observable<T>; |
|
107 | 125 | |
|
126 | ||
|
108 | 127 | /** Pipes the specified operator to produce the new observable |
|
109 |
* @param op The operator |
|
|
128 | * @param op The operator consumes this observable and produces a new one | |
|
129 | * | |
|
130 | * The operator is a higher order function which takes a source observable | |
|
131 | * and returns a producer for the new observable. | |
|
132 | * | |
|
133 | * This function can be used to create a complex mapping between source and | |
|
134 | * resulting observables. The operator may have a state (or a side effect) | |
|
135 | * and can be connected to multiple observables. | |
|
110 | 136 | */ |
|
111 |
pipe<U>(op |
|
|
137 | pipe<U>(op: OperatorFn<T, U>): Observable<U>; | |
|
112 | 138 | |
|
113 | 139 | /** Waits for the next event to occur and returns a promise for the next value |
|
114 | 140 | * @param ct Cancellation token to |
@@ -142,6 +168,8 const fuse = <T>(producer: Producer<T>) | |||
|
142 | 168 | void (0) : |
|
143 | 169 | (done = true, cleanup(), fn(...args)); |
|
144 | 170 | |
|
171 | const _fin0 = () => done ? void (0) : (done = true, cleanup()); | |
|
172 | ||
|
145 | 173 | const safeSink = { |
|
146 | 174 | next: (value: T) => { !done && next(value); }, |
|
147 | 175 | error: _fin(error), |
@@ -149,9 +177,7 const fuse = <T>(producer: Producer<T>) | |||
|
149 | 177 | isClosed: () => done |
|
150 | 178 | }; |
|
151 | 179 | cleanup = producer(safeSink) ?? noop; |
|
152 | return done ? | |
|
153 | (cleanup(), noop) : | |
|
154 | _fin(noop); | |
|
180 | return done ? cleanup() : _fin0; | |
|
155 | 181 | }; |
|
156 | 182 | |
|
157 | 183 | const _observe = <T>(producer: Producer<T>): Observable<T> => ({ |
@@ -173,6 +199,22 const _observe = <T>(producer: Producer< | |||
|
173 | 199 | }) |
|
174 | 200 | ), |
|
175 | 201 | |
|
202 | until: predicate => _observe(({ next, complete, ...rest }) => | |
|
203 | producer({ | |
|
204 | next: v => predicate(v) ? complete() : next(v), | |
|
205 | complete, | |
|
206 | ...rest | |
|
207 | }) | |
|
208 | ), | |
|
209 | ||
|
210 | while: predicate => _observe(({ next, complete, ...rest }) => | |
|
211 | producer({ | |
|
212 | next: v => predicate(v) ? next(v) : complete(), | |
|
213 | complete, | |
|
214 | ...rest | |
|
215 | }) | |
|
216 | ), | |
|
217 | ||
|
176 | 218 | scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => { |
|
177 | 219 | if (args.length === 1) { |
|
178 | 220 | const [accumulator] = args; |
@@ -293,7 +335,10 export const streamArray = <T>(items: T[ | |||
|
293 | 335 | ); |
|
294 | 336 | |
|
295 | 337 | export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>( |
|
296 |
({next, error, complete}) => void promise.then( |
|
|
338 | ({ next, error, complete }) => void promise.then( | |
|
339 | v => (next(v), complete()), | |
|
340 | error | |
|
341 | ) | |
|
297 | 342 | ); |
|
298 | 343 | |
|
299 | 344 | export const of = <T>(...items: T[]) => _observe<T>( |
@@ -303,4 +348,90 export const of = <T>(...items: T[]) => | |||
|
303 | 348 | ) |
|
304 | 349 | ); |
|
305 | 350 | |
|
306 | export const empty = _observe<never>(({ complete }) => complete()); No newline at end of file | |
|
351 | export const empty = _observe<never>(({ complete }) => complete()); | |
|
352 | ||
|
353 | /** | |
|
354 | * Creates a mutable state and the observable for the stored value. | |
|
355 | * | |
|
356 | * @param value The initial value for the state | |
|
357 | * @returns an array of three elements `[observable, setter, getter]` | |
|
358 | * | |
|
359 | * The returned observable keeps the actual value and will emit it as the next | |
|
360 | * element each time a consumer subscribes the observable. | |
|
361 | * | |
|
362 | * Calling the setter will update the stored value in the observable and notify | |
|
363 | * all consumers. | |
|
364 | */ | |
|
365 | export const stateful = <T>(producer: Producer<T>): Producer<T> => { | |
|
366 | const fusedProducer = fuse(producer); | |
|
367 | type Status = "active" | "complete" | "error"; | |
|
368 | ||
|
369 | let lastValue: T; | |
|
370 | let hasValue = false; | |
|
371 | let status: Status = "active"; | |
|
372 | let lastError: unknown; | |
|
373 | let subscribers: Sink<T>[] = []; | |
|
374 | ||
|
375 | const sink: Sink<T> = { | |
|
376 | isClosed: () => status !== "active", | |
|
377 | complete: () => { | |
|
378 | if (status === "active") { | |
|
379 | status = "complete"; | |
|
380 | const _subscribers = subscribers; | |
|
381 | subscribers = []; | |
|
382 | _subscribers.forEach(s => s.complete()); | |
|
383 | } | |
|
384 | }, | |
|
385 | error: e => { | |
|
386 | if (status === "active") { | |
|
387 | status = "error"; | |
|
388 | lastError = e; | |
|
389 | const _subscribers = subscribers; | |
|
390 | subscribers = []; | |
|
391 | _subscribers.forEach(s => s.error(e)); | |
|
392 | } | |
|
393 | }, | |
|
394 | next: v => { | |
|
395 | if (status === "active") { | |
|
396 | hasValue = true; | |
|
397 | lastValue = v; | |
|
398 | const _subscribers = subscribers; | |
|
399 | _subscribers.forEach(s => s.next(v)); | |
|
400 | } | |
|
401 | } | |
|
402 | }; | |
|
403 | ||
|
404 | fusedProducer(sink); | |
|
405 | ||
|
406 | return (s: Sink<T>) => { | |
|
407 | const _subscribers = subscribers; | |
|
408 | switch (status) { | |
|
409 | case "active": | |
|
410 | if (hasValue) | |
|
411 | s.next(lastValue); // if hasValue is true, | |
|
412 | // lastValue has a valid value | |
|
413 | subscribers.push(s); | |
|
414 | return () => { | |
|
415 | if (_subscribers === subscribers) { | |
|
416 | const pos = subscribers.indexOf(s); | |
|
417 | if (pos >= 0) | |
|
418 | subscribers.splice(pos, 1); | |
|
419 | } | |
|
420 | }; | |
|
421 | case "complete": | |
|
422 | s.complete(); | |
|
423 | break; | |
|
424 | case "error": | |
|
425 | s.error(lastError); | |
|
426 | break; | |
|
427 | } | |
|
428 | }; | |
|
429 | }; | |
|
430 | ||
|
431 | const subject = <T>(producer: Producer<T>): Producer<T> => { | |
|
432 | const fusedProducer = fuse(producer); | |
|
433 | ||
|
434 | return () => { | |
|
435 | ||
|
436 | }; | |
|
437 | }; No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now