##// END OF EJS Templates
added while, until methods to the observable interface....
cin -
r124:fbe158a5752a default
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,27
1 import { observe, stateful } from "./observable";
2 import * as t from "tap";
3
4 interface CounterState {
5 count: number;
6
7 label: "low" | "mid" | "high"
8 }
9
10 let set: (v: CounterState) => void = () => void (0);
11 const initial: CounterState = { count: 0, label: "low" };
12 let value = initial;
13
14 const obs = observe(stateful<CounterState>(({ next }) => {
15 next(initial);
16 set = next;
17 }));
18
19 set({ count: 10, label: "mid" });
20
21 obs.subscribe({
22 next: v => value = v
23 });
24
25 t.equal(value.count, 10, "State should update");
26
27 set({ count: 20, label: "high" });
@@ -1,306 +1,437
1 import { Cancellation } from "@implab/core-amd/Cancellation";
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { ICancellation } from "@implab/core-amd/interfaces";
2 import { ICancellation } from "@implab/core-amd/interfaces";
3
3
4 /**
4 /**
5 * The interface for the consumer of an observable sequence
5 * The interface for the consumer of an observable sequence
6 */
6 */
7 export interface Observer<T> {
7 export interface Observer<T> {
8 /**
8 /**
9 * Called for the next element in the sequence
9 * Called for the next element in the sequence
10 */
10 */
11 next: (value: T) => void;
11 next: (value: T) => void;
12
12
13 /**
13 /**
14 * Called once when the error occurs in the sequence.
14 * Called once when the error occurs in the sequence.
15 */
15 */
16 error: (e: unknown) => void;
16 error: (e: unknown) => void;
17
17
18 /**
18 /**
19 * Called once at the end of the sequence.
19 * Called once at the end of the sequence.
20 */
20 */
21 complete: () => void;
21 complete: () => void;
22 }
22 }
23
23
24 /**
24 /**
25 * The group of functions to feed an observable. These methods are provided to
25 * The group of functions to feed an observable. These methods are provided to
26 * the producer to generate a stream of events.
26 * the producer to generate a stream of events.
27 */
27 */
28 export type Sink<T> = {
28 export type Sink<T> = {
29 /**
29 /**
30 * Call to send the next element in the sequence
30 * Call to send the next element in the sequence
31 */
31 */
32 next: (value: T) => void;
32 next: (value: T) => void;
33
33
34 /**
34 /**
35 * Call to notify about the error occurred in the sequence.
35 * Call to notify about the error occurred in the sequence.
36 */
36 */
37 error: (e: unknown) => void;
37 error: (e: unknown) => void;
38
38
39 /**
39 /**
40 * Call to signal the end of the sequence.
40 * Call to signal the end of the sequence.
41 */
41 */
42 complete: () => void;
42 complete: () => void;
43
43
44 /**
44 /**
45 * Checks whether the sink is accepting new elements. It's safe to
45 * Checks whether the sink is accepting new elements. It's safe to
46 * send elements to the closed sink.
46 * send elements to the closed sink.
47 */
47 */
48 isClosed: () => boolean;
48 isClosed: () => boolean;
49 };
49 };
50
50
51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
52
52
53 export interface Unsubscribable {
53 export interface Unsubscribable {
54 unsubscribe(): void;
54 unsubscribe(): void;
55 }
55 }
56
56
57 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
57 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
58 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
58 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
59
59
60 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
60 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
61 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
61 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
62
62
63 export interface Subscribable<T> {
63 export interface Subscribable<T> {
64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 }
65 }
66
66
67 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
67 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
68
68
69 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
70
69 /** The observable source of items. */
71 /** The observable source of items. */
70 export interface Observable<T> extends Subscribable<T> {
72 export interface Observable<T> extends Subscribable<T> {
71 /** Transforms elements of the sequence with the specified mapper
73 /** Transforms elements of the sequence with the specified mapper
72 *
74 *
73 * @param mapper The mapper used to transform the values
75 * @param mapper The mapper used to transform the values
74 */
76 */
75 map<T2>(mapper: (value: T) => T2): Observable<T2>;
77 map<T2>(mapper: (value: T) => T2): Observable<T2>;
76
78
77 /** Filters elements of the sequence. The resulting sequence will
79 /** Filters elements of the sequence. The resulting sequence will
78 * contain only elements which match the specified predicate.
80 * contain only elements which match the specified predicate.
79 *
81 *
80 * @param predicate The filter predicate.
82 * @param predicate The filter predicate.
81 */
83 */
82 filter(predicate: (value: T) => boolean): Observable<T>;
84 filter(predicate: (value: T) => boolean): Observable<T>;
83
85
86 /** Completes the sequence once the condition is met.
87 * @param predicate The condition which should be met to complete the sequence
88 */
89 until(predicate: (value: T) => boolean): Observable<T>;
90
91 /** Keeps the sequence running while elements satisfy the condition.
92 *
93 * @param predicate The condition which should be met to continue.
94 */
95 while(predicate: (value: T) => boolean): Observable<T>;
96
84 /** Applies accumulator to each value in the sequence and
97 /** Applies accumulator to each value in the sequence and
85 * emits the accumulated value for each source element
98 * emits the accumulated value for each source element
86 *
99 *
87 * @param accumulator
100 * @param accumulator
88 * @param initial
101 * @param initial
89 */
102 */
90 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
103 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
91 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
104 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
92
105
93 /** Applies accumulator to each value in the sequence and
106 /** Applies accumulator to each value in the sequence and
94 * emits the accumulated value at the end of the sequence
107 * emits the accumulated value at the end of the sequence
95 *
108 *
96 * @param accumulator
109 * @param accumulator
97 * @param initial
110 * @param initial
98 */
111 */
99 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
112 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
100 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
113 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
101
114
102 /** Concatenates the specified sequences with this observable
115 /** Concatenates the specified sequences with this observable
103 *
116 *
104 * @param seq sequences to concatenate with the current observable
117 * @param seq sequences to concatenate with the current observable
118 *
119 * The concatenation doesn't accumulate values from the specified sequences,
120 * The result of the concatenation is the new observable which will switch
121 * to the next observable after the previous one completes. Values emitted
122 * before the next observable being active are lost.
105 */
123 */
106 cat(...seq: Subscribable<T>[]): Observable<T>;
124 cat(...seq: Subscribable<T>[]): Observable<T>;
107
125
126
108 /** Pipes the specified operator to produce the new observable
127 /** Pipes the specified operator to produce the new observable
109 * @param op The operator which consumes this observable and produces a new one
128 * @param op The operator consumes this observable and produces a new one
129 *
130 * The operator is a higher order function which takes a source observable
131 * and returns a producer for the new observable.
132 *
133 * This function can be used to create a complex mapping between source and
134 * resulting observables. The operator may have a state (or a side effect)
135 * and can be connected to multiple observables.
110 */
136 */
111 pipe<U>(op: (source: Observable<T>) => Producer<U>): Observable<U>;
137 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
112
138
113 /** Waits for the next event to occur and returns a promise for the next value
139 /** Waits for the next event to occur and returns a promise for the next value
114 * @param ct Cancellation token to
140 * @param ct Cancellation token to
115 */
141 */
116 next(ct?: ICancellation): Promise<T>;
142 next(ct?: ICancellation): Promise<T>;
117 }
143 }
118
144
119 const noop = () => { };
145 const noop = () => { };
120
146
121 const sink = <T>(consumer: Partial<Observer<T>>) => {
147 const sink = <T>(consumer: Partial<Observer<T>>) => {
122 const { next, error, complete } = consumer;
148 const { next, error, complete } = consumer;
123 return {
149 return {
124 next: next ? next.bind(consumer) : noop,
150 next: next ? next.bind(consumer) : noop,
125 error: error ? error.bind(consumer) : noop,
151 error: error ? error.bind(consumer) : noop,
126 complete: complete ? complete.bind(consumer) : noop,
152 complete: complete ? complete.bind(consumer) : noop,
127 isClosed: () => false
153 isClosed: () => false
128 };
154 };
129 };
155 };
130
156
131 /** Wraps the producer to handle tear down logic and subscription management
157 /** Wraps the producer to handle tear down logic and subscription management
132 *
158 *
133 * @param producer The producer to wrap
159 * @param producer The producer to wrap
134 * @returns The wrapper producer
160 * @returns The wrapper producer
135 */
161 */
136 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
162 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
137 let done = false;
163 let done = false;
138 let cleanup = noop;
164 let cleanup = noop;
139
165
140 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
166 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
141 (...args: A) => done ?
167 (...args: A) => done ?
142 void (0) :
168 void (0) :
143 (done = true, cleanup(), fn(...args));
169 (done = true, cleanup(), fn(...args));
144
170
171 const _fin0 = () => done ? void (0) : (done = true, cleanup());
172
145 const safeSink = {
173 const safeSink = {
146 next: (value: T) => { !done && next(value); },
174 next: (value: T) => { !done && next(value); },
147 error: _fin(error),
175 error: _fin(error),
148 complete: _fin(complete),
176 complete: _fin(complete),
149 isClosed: () => done
177 isClosed: () => done
150 };
178 };
151 cleanup = producer(safeSink) ?? noop;
179 cleanup = producer(safeSink) ?? noop;
152 return done ?
180 return done ? cleanup() : _fin0;
153 (cleanup(), noop) :
154 _fin(noop);
155 };
181 };
156
182
157 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
183 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
158 subscribe: (consumer: Partial<Observer<T>>) => ({
184 subscribe: (consumer: Partial<Observer<T>>) => ({
159 unsubscribe: producer(sink(consumer)) ?? noop
185 unsubscribe: producer(sink(consumer)) ?? noop
160 }),
186 }),
161
187
162 map: (mapper) => _observe(({ next, ...rest }) =>
188 map: (mapper) => _observe(({ next, ...rest }) =>
163 producer({
189 producer({
164 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
190 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
165 ...rest
191 ...rest
166 })
192 })
167 ),
193 ),
168
194
169 filter: (predicate) => _observe(({ next, ...rest }) =>
195 filter: (predicate) => _observe(({ next, ...rest }) =>
170 producer({
196 producer({
171 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
197 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
172 ...rest
198 ...rest
173 })
199 })
174 ),
200 ),
175
201
202 until: predicate => _observe(({ next, complete, ...rest }) =>
203 producer({
204 next: v => predicate(v) ? complete() : next(v),
205 complete,
206 ...rest
207 })
208 ),
209
210 while: predicate => _observe(({ next, complete, ...rest }) =>
211 producer({
212 next: v => predicate(v) ? next(v) : complete(),
213 complete,
214 ...rest
215 })
216 ),
217
176 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
218 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
177 if (args.length === 1) {
219 if (args.length === 1) {
178 const [accumulator] = args;
220 const [accumulator] = args;
179 let _acc: T;
221 let _acc: T;
180 let index = 0;
222 let index = 0;
181 return producer({
223 return producer({
182 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
224 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
183 ...rest
225 ...rest
184 });
226 });
185 } else {
227 } else {
186 const [accumulator, initial] = args;
228 const [accumulator, initial] = args;
187 let _acc = initial;
229 let _acc = initial;
188 return producer({
230 return producer({
189 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
231 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
190 ...rest
232 ...rest
191 });
233 });
192 }
234 }
193 }),
235 }),
194
236
195 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
237 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
196 if (args.length === 1) {
238 if (args.length === 1) {
197 const [accumulator] = args;
239 const [accumulator] = args;
198 let _acc: T;
240 let _acc: T;
199 let index = 0;
241 let index = 0;
200 return producer({
242 return producer({
201 next: next !== noop ? (v: T) => {
243 next: next !== noop ? (v: T) => {
202 _acc = index++ === 0 ? v : accumulator(_acc, v);
244 _acc = index++ === 0 ? v : accumulator(_acc, v);
203 } : noop,
245 } : noop,
204 complete: () => {
246 complete: () => {
205 if (index === 0) {
247 if (index === 0) {
206 error(new Error("The sequence can't be empty"));
248 error(new Error("The sequence can't be empty"));
207 } else {
249 } else {
208 next(_acc);
250 next(_acc);
209 complete();
251 complete();
210 }
252 }
211 },
253 },
212 error,
254 error,
213 ...rest
255 ...rest
214 });
256 });
215 } else {
257 } else {
216 const [accumulator, initial] = args;
258 const [accumulator, initial] = args;
217 let _acc = initial;
259 let _acc = initial;
218 return producer({
260 return producer({
219 next: next !== noop ? (v: T) => {
261 next: next !== noop ? (v: T) => {
220 _acc = accumulator(_acc, v);
262 _acc = accumulator(_acc, v);
221 } : noop,
263 } : noop,
222 complete: () => {
264 complete: () => {
223 next(_acc);
265 next(_acc);
224 complete();
266 complete();
225 },
267 },
226 error,
268 error,
227 ...rest
269 ...rest
228 });
270 });
229 }
271 }
230 }),
272 }),
231
273
232 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
274 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
233 let cleanup: () => void;
275 let cleanup: () => void;
234 const complete = () => {
276 const complete = () => {
235 const continuation = seq.shift();
277 const continuation = seq.shift();
236 if (continuation) {
278 if (continuation) {
237 // if we have a next sequence, subscribe to it
279 // if we have a next sequence, subscribe to it
238 const subscription = continuation.subscribe({ next, complete, ...rest });
280 const subscription = continuation.subscribe({ next, complete, ...rest });
239 cleanup = subscription.unsubscribe.bind(subscription);
281 cleanup = subscription.unsubscribe.bind(subscription);
240 } else {
282 } else {
241 // otherwise notify the consumer about completion
283 // otherwise notify the consumer about completion
242 final();
284 final();
243 }
285 }
244 };
286 };
245
287
246 cleanup = producer({ next, complete, ...rest }) ?? noop;
288 cleanup = producer({ next, complete, ...rest }) ?? noop;
247
289
248 return () => cleanup();
290 return () => cleanup();
249 }),
291 }),
250
292
251 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
293 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
252
294
253 next: (ct?: ICancellation) => {
295 next: (ct?: ICancellation) => {
254 const _ct = ct ?? Cancellation.none;
296 const _ct = ct ?? Cancellation.none;
255 return new Promise<T>((resolve, reject) => {
297 return new Promise<T>((resolve, reject) => {
256 // wrap the producer to handle only single event
298 // wrap the producer to handle only single event
257 const once = fuse<T>(({ next, complete, error, isClosed }) => {
299 const once = fuse<T>(({ next, complete, error, isClosed }) => {
258 const h = _ct.register(error);
300 const h = _ct.register(error);
259
301
260 // is the _ct fires it will call error() and isClosed() will return true
302 // is the _ct fires it will call error() and isClosed() will return true
261 const cleanup = !isClosed() ?
303 const cleanup = !isClosed() ?
262 producer({
304 producer({
263 next: v => (next(v), complete()),
305 next: v => (next(v), complete()),
264 complete: () => error(new Error("The sequence is empty")),
306 complete: () => error(new Error("The sequence is empty")),
265 error,
307 error,
266 isClosed
308 isClosed
267 }) ?? noop :
309 }) ?? noop :
268 noop;
310 noop;
269
311
270 return () => {
312 return () => {
271 h.destroy();
313 h.destroy();
272 cleanup();
314 cleanup();
273 };
315 };
274 });
316 });
275
317
276 once({
318 once({
277 next: resolve,
319 next: resolve,
278 error: reject,
320 error: reject,
279 complete: noop,
321 complete: noop,
280 isClosed: () => false
322 isClosed: () => false
281 });
323 });
282 });
324 });
283 }
325 }
284 });
326 });
285
327
286 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
328 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
287
329
288 export const streamArray = <T>(items: T[]) => _observe<T>(
330 export const streamArray = <T>(items: T[]) => _observe<T>(
289 ({ next, complete }) => (
331 ({ next, complete }) => (
290 items.forEach(next),
332 items.forEach(next),
291 complete()
333 complete()
292 )
334 )
293 );
335 );
294
336
295 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
337 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
296 ({next, error, complete}) => void promise.then(v => (next(v), complete()), error)
338 ({ next, error, complete }) => void promise.then(
339 v => (next(v), complete()),
340 error
341 )
297 );
342 );
298
343
299 export const of = <T>(...items: T[]) => _observe<T>(
344 export const of = <T>(...items: T[]) => _observe<T>(
300 ({ next, complete }) => (
345 ({ next, complete }) => (
301 items.forEach(next),
346 items.forEach(next),
302 complete()
347 complete()
303 )
348 )
304 );
349 );
305
350
306 export const empty = _observe<never>(({ complete }) => complete()); No newline at end of file
351 export const empty = _observe<never>(({ complete }) => complete());
352
353 /**
354 * Creates a mutable state and the observable for the stored value.
355 *
356 * @param value The initial value for the state
357 * @returns an array of three elements `[observable, setter, getter]`
358 *
359 * The returned observable keeps the actual value and will emit it as the next
360 * element each time a consumer subscribes the observable.
361 *
362 * Calling the setter will update the stored value in the observable and notify
363 * all consumers.
364 */
365 export const stateful = <T>(producer: Producer<T>): Producer<T> => {
366 const fusedProducer = fuse(producer);
367 type Status = "active" | "complete" | "error";
368
369 let lastValue: T;
370 let hasValue = false;
371 let status: Status = "active";
372 let lastError: unknown;
373 let subscribers: Sink<T>[] = [];
374
375 const sink: Sink<T> = {
376 isClosed: () => status !== "active",
377 complete: () => {
378 if (status === "active") {
379 status = "complete";
380 const _subscribers = subscribers;
381 subscribers = [];
382 _subscribers.forEach(s => s.complete());
383 }
384 },
385 error: e => {
386 if (status === "active") {
387 status = "error";
388 lastError = e;
389 const _subscribers = subscribers;
390 subscribers = [];
391 _subscribers.forEach(s => s.error(e));
392 }
393 },
394 next: v => {
395 if (status === "active") {
396 hasValue = true;
397 lastValue = v;
398 const _subscribers = subscribers;
399 _subscribers.forEach(s => s.next(v));
400 }
401 }
402 };
403
404 fusedProducer(sink);
405
406 return (s: Sink<T>) => {
407 const _subscribers = subscribers;
408 switch (status) {
409 case "active":
410 if (hasValue)
411 s.next(lastValue); // if hasValue is true,
412 // lastValue has a valid value
413 subscribers.push(s);
414 return () => {
415 if (_subscribers === subscribers) {
416 const pos = subscribers.indexOf(s);
417 if (pos >= 0)
418 subscribers.splice(pos, 1);
419 }
420 };
421 case "complete":
422 s.complete();
423 break;
424 case "error":
425 s.error(lastError);
426 break;
427 }
428 };
429 };
430
431 const subject = <T>(producer: Producer<T>): Producer<T> => {
432 const fusedProducer = fuse(producer);
433
434 return () => {
435
436 };
437 }; No newline at end of file
@@ -1,2 +1,3
1 import "./declare-tests";
1 import "./declare-tests";
2 import "./observable-tests"; No newline at end of file
2 import "./observable-tests";
3 import "./state-tests"; No newline at end of file
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now