##// END OF EJS Templates
added while, until methods to the observable interface....
cin -
r124:fbe158a5752a default
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,27
1 import { observe, stateful } from "./observable";
2 import * as t from "tap";
3
4 interface CounterState {
5 count: number;
6
7 label: "low" | "mid" | "high"
8 }
9
10 let set: (v: CounterState) => void = () => void (0);
11 const initial: CounterState = { count: 0, label: "low" };
12 let value = initial;
13
14 const obs = observe(stateful<CounterState>(({ next }) => {
15 next(initial);
16 set = next;
17 }));
18
19 set({ count: 10, label: "mid" });
20
21 obs.subscribe({
22 next: v => value = v
23 });
24
25 t.equal(value.count, 10, "State should update");
26
27 set({ count: 20, label: "high" });
@@ -1,306 +1,437
1 1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 2 import { ICancellation } from "@implab/core-amd/interfaces";
3 3
4 4 /**
5 5 * The interface for the consumer of an observable sequence
6 6 */
7 7 export interface Observer<T> {
8 8 /**
9 9 * Called for the next element in the sequence
10 10 */
11 11 next: (value: T) => void;
12 12
13 13 /**
14 14 * Called once when the error occurs in the sequence.
15 15 */
16 16 error: (e: unknown) => void;
17 17
18 18 /**
19 19 * Called once at the end of the sequence.
20 20 */
21 21 complete: () => void;
22 22 }
23 23
24 24 /**
25 25 * The group of functions to feed an observable. These methods are provided to
26 26 * the producer to generate a stream of events.
27 27 */
28 28 export type Sink<T> = {
29 29 /**
30 30 * Call to send the next element in the sequence
31 31 */
32 32 next: (value: T) => void;
33 33
34 34 /**
35 35 * Call to notify about the error occurred in the sequence.
36 36 */
37 37 error: (e: unknown) => void;
38 38
39 39 /**
40 40 * Call to signal the end of the sequence.
41 41 */
42 42 complete: () => void;
43 43
44 44 /**
45 45 * Checks whether the sink is accepting new elements. It's safe to
46 46 * send elements to the closed sink.
47 47 */
48 48 isClosed: () => boolean;
49 49 };
50 50
51 51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
52 52
53 53 export interface Unsubscribable {
54 54 unsubscribe(): void;
55 55 }
56 56
57 57 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
58 58 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
59 59
60 60 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
61 61 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
62 62
63 63 export interface Subscribable<T> {
64 64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 65 }
66 66
67 67 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
68 68
69 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
70
69 71 /** The observable source of items. */
70 72 export interface Observable<T> extends Subscribable<T> {
71 73 /** Transforms elements of the sequence with the specified mapper
72 74 *
73 75 * @param mapper The mapper used to transform the values
74 76 */
75 77 map<T2>(mapper: (value: T) => T2): Observable<T2>;
76 78
77 79 /** Filters elements of the sequence. The resulting sequence will
78 80 * contain only elements which match the specified predicate.
79 81 *
80 82 * @param predicate The filter predicate.
81 83 */
82 84 filter(predicate: (value: T) => boolean): Observable<T>;
83 85
86 /** Completes the sequence once the condition is met.
87 * @param predicate The condition which should be met to complete the sequence
88 */
89 until(predicate: (value: T) => boolean): Observable<T>;
90
91 /** Keeps the sequence running while elements satisfy the condition.
92 *
93 * @param predicate The condition which should be met to continue.
94 */
95 while(predicate: (value: T) => boolean): Observable<T>;
96
84 97 /** Applies accumulator to each value in the sequence and
85 98 * emits the accumulated value for each source element
86 99 *
87 100 * @param accumulator
88 101 * @param initial
89 102 */
90 103 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
91 104 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
92 105
93 106 /** Applies accumulator to each value in the sequence and
94 107 * emits the accumulated value at the end of the sequence
95 108 *
96 109 * @param accumulator
97 110 * @param initial
98 111 */
99 112 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
100 113 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
101 114
102 115 /** Concatenates the specified sequences with this observable
103 116 *
104 117 * @param seq sequences to concatenate with the current observable
118 *
119 * The concatenation doesn't accumulate values from the specified sequences,
120 * The result of the concatenation is the new observable which will switch
121 * to the next observable after the previous one completes. Values emitted
122 * before the next observable being active are lost.
105 123 */
106 124 cat(...seq: Subscribable<T>[]): Observable<T>;
107 125
126
108 127 /** Pipes the specified operator to produce the new observable
109 * @param op The operator which consumes this observable and produces a new one
128 * @param op The operator consumes this observable and produces a new one
129 *
130 * The operator is a higher order function which takes a source observable
131 * and returns a producer for the new observable.
132 *
133 * This function can be used to create a complex mapping between source and
134 * resulting observables. The operator may have a state (or a side effect)
135 * and can be connected to multiple observables.
110 136 */
111 pipe<U>(op: (source: Observable<T>) => Producer<U>): Observable<U>;
137 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
112 138
113 139 /** Waits for the next event to occur and returns a promise for the next value
114 140 * @param ct Cancellation token to
115 141 */
116 142 next(ct?: ICancellation): Promise<T>;
117 143 }
118 144
119 145 const noop = () => { };
120 146
121 147 const sink = <T>(consumer: Partial<Observer<T>>) => {
122 148 const { next, error, complete } = consumer;
123 149 return {
124 150 next: next ? next.bind(consumer) : noop,
125 151 error: error ? error.bind(consumer) : noop,
126 152 complete: complete ? complete.bind(consumer) : noop,
127 153 isClosed: () => false
128 154 };
129 155 };
130 156
131 157 /** Wraps the producer to handle tear down logic and subscription management
132 158 *
133 159 * @param producer The producer to wrap
134 160 * @returns The wrapper producer
135 161 */
136 162 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
137 163 let done = false;
138 164 let cleanup = noop;
139 165
140 166 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
141 167 (...args: A) => done ?
142 168 void (0) :
143 169 (done = true, cleanup(), fn(...args));
144 170
171 const _fin0 = () => done ? void (0) : (done = true, cleanup());
172
145 173 const safeSink = {
146 174 next: (value: T) => { !done && next(value); },
147 175 error: _fin(error),
148 176 complete: _fin(complete),
149 177 isClosed: () => done
150 178 };
151 179 cleanup = producer(safeSink) ?? noop;
152 return done ?
153 (cleanup(), noop) :
154 _fin(noop);
180 return done ? cleanup() : _fin0;
155 181 };
156 182
157 183 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
158 184 subscribe: (consumer: Partial<Observer<T>>) => ({
159 185 unsubscribe: producer(sink(consumer)) ?? noop
160 186 }),
161 187
162 188 map: (mapper) => _observe(({ next, ...rest }) =>
163 189 producer({
164 190 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
165 191 ...rest
166 192 })
167 193 ),
168 194
169 195 filter: (predicate) => _observe(({ next, ...rest }) =>
170 196 producer({
171 197 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
172 198 ...rest
173 199 })
174 200 ),
175 201
202 until: predicate => _observe(({ next, complete, ...rest }) =>
203 producer({
204 next: v => predicate(v) ? complete() : next(v),
205 complete,
206 ...rest
207 })
208 ),
209
210 while: predicate => _observe(({ next, complete, ...rest }) =>
211 producer({
212 next: v => predicate(v) ? next(v) : complete(),
213 complete,
214 ...rest
215 })
216 ),
217
176 218 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
177 219 if (args.length === 1) {
178 220 const [accumulator] = args;
179 221 let _acc: T;
180 222 let index = 0;
181 223 return producer({
182 224 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
183 225 ...rest
184 226 });
185 227 } else {
186 228 const [accumulator, initial] = args;
187 229 let _acc = initial;
188 230 return producer({
189 231 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
190 232 ...rest
191 233 });
192 234 }
193 235 }),
194 236
195 237 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
196 238 if (args.length === 1) {
197 239 const [accumulator] = args;
198 240 let _acc: T;
199 241 let index = 0;
200 242 return producer({
201 243 next: next !== noop ? (v: T) => {
202 244 _acc = index++ === 0 ? v : accumulator(_acc, v);
203 245 } : noop,
204 246 complete: () => {
205 247 if (index === 0) {
206 248 error(new Error("The sequence can't be empty"));
207 249 } else {
208 250 next(_acc);
209 251 complete();
210 252 }
211 253 },
212 254 error,
213 255 ...rest
214 256 });
215 257 } else {
216 258 const [accumulator, initial] = args;
217 259 let _acc = initial;
218 260 return producer({
219 261 next: next !== noop ? (v: T) => {
220 262 _acc = accumulator(_acc, v);
221 263 } : noop,
222 264 complete: () => {
223 265 next(_acc);
224 266 complete();
225 267 },
226 268 error,
227 269 ...rest
228 270 });
229 271 }
230 272 }),
231 273
232 274 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
233 275 let cleanup: () => void;
234 276 const complete = () => {
235 277 const continuation = seq.shift();
236 278 if (continuation) {
237 279 // if we have a next sequence, subscribe to it
238 280 const subscription = continuation.subscribe({ next, complete, ...rest });
239 281 cleanup = subscription.unsubscribe.bind(subscription);
240 282 } else {
241 283 // otherwise notify the consumer about completion
242 284 final();
243 285 }
244 286 };
245 287
246 288 cleanup = producer({ next, complete, ...rest }) ?? noop;
247 289
248 290 return () => cleanup();
249 291 }),
250 292
251 293 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
252 294
253 295 next: (ct?: ICancellation) => {
254 296 const _ct = ct ?? Cancellation.none;
255 297 return new Promise<T>((resolve, reject) => {
256 298 // wrap the producer to handle only single event
257 299 const once = fuse<T>(({ next, complete, error, isClosed }) => {
258 300 const h = _ct.register(error);
259 301
260 302 // is the _ct fires it will call error() and isClosed() will return true
261 303 const cleanup = !isClosed() ?
262 304 producer({
263 305 next: v => (next(v), complete()),
264 306 complete: () => error(new Error("The sequence is empty")),
265 307 error,
266 308 isClosed
267 309 }) ?? noop :
268 310 noop;
269 311
270 312 return () => {
271 313 h.destroy();
272 314 cleanup();
273 315 };
274 316 });
275 317
276 318 once({
277 319 next: resolve,
278 320 error: reject,
279 321 complete: noop,
280 322 isClosed: () => false
281 323 });
282 324 });
283 325 }
284 326 });
285 327
286 328 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
287 329
288 330 export const streamArray = <T>(items: T[]) => _observe<T>(
289 331 ({ next, complete }) => (
290 332 items.forEach(next),
291 333 complete()
292 334 )
293 335 );
294 336
295 337 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
296 ({next, error, complete}) => void promise.then(v => (next(v), complete()), error)
338 ({ next, error, complete }) => void promise.then(
339 v => (next(v), complete()),
340 error
341 )
297 342 );
298 343
299 344 export const of = <T>(...items: T[]) => _observe<T>(
300 345 ({ next, complete }) => (
301 346 items.forEach(next),
302 347 complete()
303 348 )
304 349 );
305 350
306 export const empty = _observe<never>(({ complete }) => complete()); No newline at end of file
351 export const empty = _observe<never>(({ complete }) => complete());
352
353 /**
354 * Creates a mutable state and the observable for the stored value.
355 *
356 * @param value The initial value for the state
357 * @returns an array of three elements `[observable, setter, getter]`
358 *
359 * The returned observable keeps the actual value and will emit it as the next
360 * element each time a consumer subscribes the observable.
361 *
362 * Calling the setter will update the stored value in the observable and notify
363 * all consumers.
364 */
365 export const stateful = <T>(producer: Producer<T>): Producer<T> => {
366 const fusedProducer = fuse(producer);
367 type Status = "active" | "complete" | "error";
368
369 let lastValue: T;
370 let hasValue = false;
371 let status: Status = "active";
372 let lastError: unknown;
373 let subscribers: Sink<T>[] = [];
374
375 const sink: Sink<T> = {
376 isClosed: () => status !== "active",
377 complete: () => {
378 if (status === "active") {
379 status = "complete";
380 const _subscribers = subscribers;
381 subscribers = [];
382 _subscribers.forEach(s => s.complete());
383 }
384 },
385 error: e => {
386 if (status === "active") {
387 status = "error";
388 lastError = e;
389 const _subscribers = subscribers;
390 subscribers = [];
391 _subscribers.forEach(s => s.error(e));
392 }
393 },
394 next: v => {
395 if (status === "active") {
396 hasValue = true;
397 lastValue = v;
398 const _subscribers = subscribers;
399 _subscribers.forEach(s => s.next(v));
400 }
401 }
402 };
403
404 fusedProducer(sink);
405
406 return (s: Sink<T>) => {
407 const _subscribers = subscribers;
408 switch (status) {
409 case "active":
410 if (hasValue)
411 s.next(lastValue); // if hasValue is true,
412 // lastValue has a valid value
413 subscribers.push(s);
414 return () => {
415 if (_subscribers === subscribers) {
416 const pos = subscribers.indexOf(s);
417 if (pos >= 0)
418 subscribers.splice(pos, 1);
419 }
420 };
421 case "complete":
422 s.complete();
423 break;
424 case "error":
425 s.error(lastError);
426 break;
427 }
428 };
429 };
430
431 const subject = <T>(producer: Producer<T>): Producer<T> => {
432 const fusedProducer = fuse(producer);
433
434 return () => {
435
436 };
437 }; No newline at end of file
@@ -1,2 +1,3
1 1 import "./declare-tests";
2 import "./observable-tests"; No newline at end of file
2 import "./observable-tests";
3 import "./state-tests"; No newline at end of file
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now