##// END OF EJS Templates
code review, minor refactoring
cin -
r142:894b8239b953 v1.9.0-rc5 default
parent child
Show More
@@ -1,403 +1,411
1 1 import { id as mid} from "module";
2 2 import { Cancellation } from "@implab/core-amd/Cancellation";
3 3 import { ICancellation } from "@implab/core-amd/interfaces";
4 4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
5 5 import { isPromise } from "@implab/core-amd/safe";
6 6
7 7 const trace = TraceSource.get(mid);
8 8
9 9 /**
10 10 * The interface for the consumer of an observable sequence
11 11 */
12 12 export interface Observer<T> {
13 13 /**
14 14 * Called for the next element in the sequence
15 15 */
16 next?: (value: T) => void;
16 next?(value: T): void;
17 17
18 18 /**
19 19 * Called once when the error occurs in the sequence.
20 20 */
21 error?: (e: unknown) => void;
21 error?(e: unknown): void;
22 22
23 23 /**
24 24 * Called once at the end of the sequence.
25 25 */
26 complete?: () => void;
26 complete?(): void;
27 27 }
28 28
29 29 /**
30 30 * The group of functions to feed an observable. These methods are provided to
31 31 * the producer to generate a stream of events.
32 32 */
33 33 export type Sink<T> = {
34 34 /**
35 35 * Call to send the next element in the sequence
36 36 */
37 37 next: (value: T) => void;
38 38
39 39 /**
40 40 * Call to notify about the error occurred in the sequence.
41 41 */
42 42 error: (e: unknown) => void;
43 43
44 44 /**
45 45 * Call to signal the end of the sequence.
46 46 */
47 47 complete: () => void;
48 48
49 49 /**
50 50 * Checks whether the sink is accepting new elements. It's safe to
51 51 * send elements to the closed sink.
52 52 */
53 53 isClosed: () => boolean;
54 54 };
55 55
56 56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
57 57
58 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
59
60 type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
61
58 62 export interface Unsubscribable {
59 63 unsubscribe(): void;
60 64 }
61 65
62 66 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
63 67 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
64 68
65 69 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
66 70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
67 71
68 72 export interface Subscribable<T> {
69 73 subscribe(consumer: Observer<T>): Unsubscribable;
70 74 }
71 75
72 76 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
73 77
74 78 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
75 79
76 80 /** The observable source of items. */
77 81 export interface Observable<T> extends Subscribable<T> {
78 82 /** Transforms elements of the sequence with the specified mapper
79 83 *
80 84 * @param mapper The mapper used to transform the values
81 85 */
82 86 map<T2>(mapper: (value: T) => T2): Observable<T2>;
83 87
84 88 /** Filters elements of the sequence. The resulting sequence will
85 89 * contain only elements which match the specified predicate.
86 90 *
87 91 * @param predicate The filter predicate.
88 92 */
89 93 filter(predicate: (value: T) => boolean): Observable<T>;
90 94
91 95 /** Completes the sequence once the condition is met.
92 96 * @param predicate The condition which should be met to complete the sequence
93 97 */
94 98 until(predicate: (value: T) => boolean): Observable<T>;
95 99
96 100 /** Keeps the sequence running while elements satisfy the condition.
97 101 *
98 102 * @param predicate The condition which should be met to continue.
99 103 */
100 104 while(predicate: (value: T) => boolean): Observable<T>;
101 105
102 106 /** Applies accumulator to each value in the sequence and
103 107 * emits the accumulated value for each source element
104 108 *
105 109 * @param accumulator
106 110 * @param initial
107 111 */
108 112 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
109 113 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
110 114
111 115 /** Applies accumulator to each value in the sequence and
112 116 * emits the accumulated value at the end of the sequence
113 117 *
114 118 * @param accumulator
115 119 * @param initial
116 120 */
117 121 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
118 122 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
119 123
120 124 /** Concatenates the specified sequences with this observable
121 125 *
122 126 * @param seq sequences to concatenate with the current observable
123 127 *
124 128 * The concatenation doesn't accumulate values from the specified sequences,
125 129 * The result of the concatenation is the new observable which will switch
126 130 * to the next observable after the previous one completes. Values emitted
127 131 * before the next observable being active are lost.
128 132 */
129 133 cat(...seq: Subscribable<T>[]): Observable<T>;
130 134
131 135
132 136 /** Pipes the specified operator to produce the new observable
133 137 * @param op The operator consumes this observable and produces a new one
134 138 *
135 139 * The operator is a higher order function which takes a source observable
136 140 * and returns a producer for the new observable.
137 141 *
138 142 * This function can be used to create a complex mapping between source and
139 143 * resulting observables. The operator may have a state (or a side effect)
140 144 * and can be connected to multiple observables.
141 145 */
142 146 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
143 147
144 148 /** Waits for the next event to occur and returns a promise for the next value
145 149 * @param ct Cancellation token
146 150 */
147 151 next(ct?: ICancellation): Promise<T>;
148 152
149 153 /** Collects items of the sequence to the array. */
150 154 collect(ct?: ICancellation): Promise<T[]>;
151 155 }
152 156
153 157 const noop = () => { };
154 158
155 159 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
156 160
157 161 const sink = <T>(consumer: Observer<T>) => {
162 // eslint-disable-next-line @typescript-eslint/unbound-method
158 163 const { next, error, complete } = consumer;
159 164 return {
160 165 next: next ? next.bind(consumer) : noop,
161 error: error ? error.bind(consumer) : errorFallback,
162 complete: complete ? complete.bind(consumer) : noop,
163 isClosed: () => false
166 error: error ? error.bind(consumer) : errorFallback, // report unhandled errors
167 complete: complete ? complete.bind(consumer) : noop
164 168 };
165 169 };
166 170
167 171 /** Wraps the producer to handle tear down logic and subscription management
168 172 *
173 * The resulting producer will invoke cleanup logic on error or complete events
174 * and will prevent calling of any method from the sink.
175 *
169 176 * @param producer The producer to wrap
170 177 * @returns The wrapper producer
171 178 */
172 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
179 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
173 180 let done = false;
174 181 let cleanup = noop;
175 182
176 183 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
177 184 (...args: A) => done ?
178 185 void (0) :
179 186 (done = true, cleanup(), fn(...args));
180 187
181 188 const _fin0 = () => done ? void (0) : (done = true, cleanup());
182 189
183 190 const safeSink = {
184 191 next: (value: T) => { !done && next(value); },
185 192 error: _fin(error),
186 193 complete: _fin(complete),
187 194 isClosed: () => done
188 195 };
196 // call the producer
189 197 cleanup = producer(safeSink) ?? noop;
198 // if the producer throws exception bypass it to the caller rather then to
199 // the sink. This is a feature.
200
201 // if the producer completed the sequence immediately call the cleanup in place
190 202 return done ? cleanup() : _fin0;
191 203 };
192 204
193 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
205 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
194 206 subscribe: (consumer: Observer<T>) => ({
195 207 unsubscribe: producer(sink(consumer)) ?? noop
196 208 }),
197 209
198 210 map: (mapper) => _observe(({ next, ...rest }) =>
199 211 producer({
200 212 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
201 213 ...rest
202 214 })
203 215 ),
204 216
205 217 filter: (predicate) => _observe(({ next, ...rest }) =>
206 218 producer({
207 219 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
208 220 ...rest
209 221 })
210 222 ),
211 223
212 224 until: predicate => _observe(({ next, complete, ...rest }) =>
213 225 producer({
214 226 next: v => predicate(v) ? complete() : next(v),
215 227 complete,
216 228 ...rest
217 229 })
218 230 ),
219 231
220 232 while: predicate => _observe(({ next, complete, ...rest }) =>
221 233 producer({
222 234 next: v => predicate(v) ? next(v) : complete(),
223 235 complete,
224 236 ...rest
225 237 })
226 238 ),
227 239
228 240 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
229 241 if (args.length === 1) {
230 242 const [accumulator] = args;
231 243 let _acc: T;
232 244 let index = 0;
233 245 return producer({
234 246 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
235 247 ...rest
236 248 });
237 249 } else {
238 250 const [accumulator, initial] = args;
239 251 let _acc = initial;
240 252 return producer({
241 253 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
242 254 ...rest
243 255 });
244 256 }
245 257 }),
246 258
247 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
259 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error }) => {
248 260 if (args.length === 1) {
249 261 const [accumulator] = args;
250 262 let _acc: T;
251 263 let index = 0;
252 264 return producer({
253 265 next: next !== noop ? (v: T) => {
254 266 _acc = index++ === 0 ? v : accumulator(_acc, v);
255 267 } : noop,
256 268 complete: () => {
257 269 if (index === 0) {
258 270 error(new Error("The sequence can't be empty"));
259 271 } else {
260 272 next(_acc);
261 273 complete();
262 274 }
263 275 },
264 error,
265 ...rest
276 error
266 277 });
267 278 } else {
268 279 const [accumulator, initial] = args;
269 280 let _acc = initial;
270 281 return producer({
271 282 next: next !== noop ? (v: T) => {
272 283 _acc = accumulator(_acc, v);
273 284 } : noop,
274 285 complete: () => {
275 286 next(_acc);
276 287 complete();
277 288 },
278 error,
279 ...rest
289 error
280 290 });
281 291 }
282 292 }),
283 293
284 294 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
285 295 let cleanup: () => void;
286 296 const len = seq.length;
287 297 const complete = (i: number) => i < len ?
288 298 () => {
289 299 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
290 300 cleanup = subscription.unsubscribe.bind(subscription);
291 301 } : final;
292 302
293 303 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
294 304
295 305 return () => cleanup();
296 306 }),
297 307
298 308 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
299 309
300 310 next: collect(
301 311 producer,
302 ({ next, complete, error, isClosed }) => ({
312 ({ next, complete, error }) => ({
303 313 next: v => (next(v), complete()),
304 314 complete: () => error(new Error("The sequence is empty")),
305 error,
306 isClosed
315 error
307 316 })
308 317 ),
309 318
310 319 collect: collect(
311 320 producer,
312 ({ next, complete, ...rest }) => {
321 ({ next, complete, error}) => {
313 322 const data: T[] = [];
314 323 return {
315 324 next: v => data.push(v),
316 325 complete: () => (next(data), complete()),
317 ...rest
326 error
318 327 };
319 328 }
320 329 )
321 330 });
322 331
323 332 const collect = <T, U>(
324 producer: Producer<T>,
325 collector: (result: Sink<U>) => Sink<T>
333 producer: FusedProducer<T>,
334 collector: (result: FusedSink<U>) => FusedSink<T>
326 335 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
327 336 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
328 337 const h = ct.register(error);
329 338 const cleanup = !isClosed() ?
330 producer(collector({ next, complete, error, isClosed })) ?? noop :
339 producer(collector({ next, complete, error })) ?? noop :
331 340 noop;
332 341
333 342 return () => {
334 343 h.destroy();
335 344 cleanup();
336 345 };
337 346 });
338 347
339 348 fused({
340 349 next: resolve,
341 350 error: reject,
342 complete: noop,
343 isClosed: () => false
351 complete: noop
344 352 });
345 353 });
346 354
347 355 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
348 356
349 357 /** Converts an array to the observable sequence of its elements. */
350 358 export const ofArray = <T>(items: T[]) => _observe<T>(
351 359 ({ next, complete }) => (
352 360 items.forEach(next),
353 361 complete()
354 362 )
355 363 );
356 364
357 365 /** Converts a subscribable to the observable */
358 366 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
359 observe(sink => {
367 observe<T>(sink => {
360 368 const subscription = subscribable.subscribe(sink);
361 369 return () => subscription.unsubscribe();
362 370 });
363 371
364 372 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
365 373 ({ next, error, complete }) =>
366 374 isPromise(item) ?
367 375 void item.then(
368 376 v => (next(v), complete()),
369 377 error
370 378 ) :
371 379 (next(item), complete())
372 380 );
373 381
374 382 /** Converts a list of parameter values to the observable sequence. The
375 383 * order of elements in the list will be preserved in the resulting sequence.
376 384 */
377 385 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
378 386 of1(items[0]) :
379 387 observe<T>(
380 388 ({ next, error, complete, isClosed }) => {
381 389 const n = items.length;
382 390
383 391 const _next = (start: number) => {
384 392 if (start > 0 && isClosed()) // when resumed
385 393 return;
386 394
387 395 for (let i = start; i < n; i++) {
388 396 const r = items[i];
389 397 if (isPromise(r)) {
390 398 r.then(v => (next(v), _next(i + 1)), error);
391 399 return; // suspend
392 400 } else {
393 401 next(r);
394 402 }
395 403 }
396 404 complete();
397 405 };
398 406
399 407 _next(0);
400 408 }
401 409 );
402 410
403 411 export const empty = _observe<never>(({ complete }) => complete());
@@ -1,50 +1,60
1 1 import { Observable, Sink, Subscribable, observe } from "../observable";
2 2
3 3 const noop = () => { };
4 4
5 /** Joins multiple subscribers to the single one. The resulting subscriber
5 /** Connects multiple subscribers to the single producer. The producer
6 6 * will be created when the first client subscribes and will be released
7 7 * with the the last client unsubscribed.
8 8 *
9 9 * Use this wrapper to prevent spawning multiple producers.
10 10 *
11 * The emitted values are not cached therefore the new subscriber will not receive
12 * the values emitted before it has been subscribed.
13 *
11 14 * @param source The source observable
12 * @returns The wrapped producer
15 * @returns The new observable
13 16 */
14 17 export const subject = <T>(source: Subscribable<T>): Observable<T> => {
15 let subscribers: Sink<T>[] = [];
18 let subscribers: Sink<T>[] = []; // the list of active subscribers
16 19
17 let subscription = { unsubscribe: noop };
20 let subscription = { unsubscribe: noop }; // current subscription
18 21
19 22 // cleanup method to release resources held by this subscription
20 23 const cleanup = (cb: (item: Sink<T>) => void) => {
21 24 const _subscribers = subscribers;
22 subscribers = [];
25 subscribers = []; // this will prevent a client cleanup login to run
23 26 _subscribers.forEach(cb);
24 subscription.unsubscribe();
27 // we don't need subscription.unsubscribe(), because cleanup is called
28 // from complete or error methods.
25 29 };
26 30
27 31 const sink: Sink<T> = {
28 32 isClosed: () => false,
29 33 complete: () => cleanup(s => s.complete()),
30 34 error: e => cleanup(s => s.error(e)),
31 35 next: v => subscribers.forEach(s => s.next(v))
32 36 };
33 37
34 38 return observe(client => {
35 39 const _subscribers = subscribers;
36 40 subscribers.push(client);
37 if (subscribers.length === 1)
38 subscription = source.subscribe(sink);
41 if (subscribers.length === 1) // this is the first client
42 subscription = source.subscribe(sink); // activate the producer
39 43
40 44 return () => {
45 // this is a cleanup logic for an individual client
41 46 if (_subscribers === subscribers) {
47 // is the current subscription to the producer is active
48
49 // find this client in the list of subscribers
42 50 const pos = subscribers.indexOf(client);
43 51 if (pos >= 0)
44 52 subscribers.splice(pos, 1);
53
54 // is this is the last subscriber we need to release the producer
45 55 if (!subscribers.length)
46 56 subscription.unsubscribe();
47 57 }
48 58 };
49 59 });
50 60 };
General Comments 0
You need to be logged in to leave comments. Login now