##// END OF EJS Templates
code review, minor refactoring
cin -
r142:894b8239b953 v1.9.0-rc5 default
parent child
Show More
@@ -1,403 +1,411
1 import { id as mid} from "module";
1 import { id as mid} from "module";
2 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { Cancellation } from "@implab/core-amd/Cancellation";
3 import { ICancellation } from "@implab/core-amd/interfaces";
3 import { ICancellation } from "@implab/core-amd/interfaces";
4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
5 import { isPromise } from "@implab/core-amd/safe";
5 import { isPromise } from "@implab/core-amd/safe";
6
6
7 const trace = TraceSource.get(mid);
7 const trace = TraceSource.get(mid);
8
8
9 /**
9 /**
10 * The interface for the consumer of an observable sequence
10 * The interface for the consumer of an observable sequence
11 */
11 */
12 export interface Observer<T> {
12 export interface Observer<T> {
13 /**
13 /**
14 * Called for the next element in the sequence
14 * Called for the next element in the sequence
15 */
15 */
16 next?: (value: T) => void;
16 next?(value: T): void;
17
17
18 /**
18 /**
19 * Called once when the error occurs in the sequence.
19 * Called once when the error occurs in the sequence.
20 */
20 */
21 error?: (e: unknown) => void;
21 error?(e: unknown): void;
22
22
23 /**
23 /**
24 * Called once at the end of the sequence.
24 * Called once at the end of the sequence.
25 */
25 */
26 complete?: () => void;
26 complete?(): void;
27 }
27 }
28
28
29 /**
29 /**
30 * The group of functions to feed an observable. These methods are provided to
30 * The group of functions to feed an observable. These methods are provided to
31 * the producer to generate a stream of events.
31 * the producer to generate a stream of events.
32 */
32 */
33 export type Sink<T> = {
33 export type Sink<T> = {
34 /**
34 /**
35 * Call to send the next element in the sequence
35 * Call to send the next element in the sequence
36 */
36 */
37 next: (value: T) => void;
37 next: (value: T) => void;
38
38
39 /**
39 /**
40 * Call to notify about the error occurred in the sequence.
40 * Call to notify about the error occurred in the sequence.
41 */
41 */
42 error: (e: unknown) => void;
42 error: (e: unknown) => void;
43
43
44 /**
44 /**
45 * Call to signal the end of the sequence.
45 * Call to signal the end of the sequence.
46 */
46 */
47 complete: () => void;
47 complete: () => void;
48
48
49 /**
49 /**
50 * Checks whether the sink is accepting new elements. It's safe to
50 * Checks whether the sink is accepting new elements. It's safe to
51 * send elements to the closed sink.
51 * send elements to the closed sink.
52 */
52 */
53 isClosed: () => boolean;
53 isClosed: () => boolean;
54 };
54 };
55
55
56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
57
57
58 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
59
60 type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
61
58 export interface Unsubscribable {
62 export interface Unsubscribable {
59 unsubscribe(): void;
63 unsubscribe(): void;
60 }
64 }
61
65
62 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
66 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
63 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
67 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
64
68
65 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
69 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
66 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
67
71
68 export interface Subscribable<T> {
72 export interface Subscribable<T> {
69 subscribe(consumer: Observer<T>): Unsubscribable;
73 subscribe(consumer: Observer<T>): Unsubscribable;
70 }
74 }
71
75
72 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
76 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
73
77
74 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
78 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
75
79
76 /** The observable source of items. */
80 /** The observable source of items. */
77 export interface Observable<T> extends Subscribable<T> {
81 export interface Observable<T> extends Subscribable<T> {
78 /** Transforms elements of the sequence with the specified mapper
82 /** Transforms elements of the sequence with the specified mapper
79 *
83 *
80 * @param mapper The mapper used to transform the values
84 * @param mapper The mapper used to transform the values
81 */
85 */
82 map<T2>(mapper: (value: T) => T2): Observable<T2>;
86 map<T2>(mapper: (value: T) => T2): Observable<T2>;
83
87
84 /** Filters elements of the sequence. The resulting sequence will
88 /** Filters elements of the sequence. The resulting sequence will
85 * contain only elements which match the specified predicate.
89 * contain only elements which match the specified predicate.
86 *
90 *
87 * @param predicate The filter predicate.
91 * @param predicate The filter predicate.
88 */
92 */
89 filter(predicate: (value: T) => boolean): Observable<T>;
93 filter(predicate: (value: T) => boolean): Observable<T>;
90
94
91 /** Completes the sequence once the condition is met.
95 /** Completes the sequence once the condition is met.
92 * @param predicate The condition which should be met to complete the sequence
96 * @param predicate The condition which should be met to complete the sequence
93 */
97 */
94 until(predicate: (value: T) => boolean): Observable<T>;
98 until(predicate: (value: T) => boolean): Observable<T>;
95
99
96 /** Keeps the sequence running while elements satisfy the condition.
100 /** Keeps the sequence running while elements satisfy the condition.
97 *
101 *
98 * @param predicate The condition which should be met to continue.
102 * @param predicate The condition which should be met to continue.
99 */
103 */
100 while(predicate: (value: T) => boolean): Observable<T>;
104 while(predicate: (value: T) => boolean): Observable<T>;
101
105
102 /** Applies accumulator to each value in the sequence and
106 /** Applies accumulator to each value in the sequence and
103 * emits the accumulated value for each source element
107 * emits the accumulated value for each source element
104 *
108 *
105 * @param accumulator
109 * @param accumulator
106 * @param initial
110 * @param initial
107 */
111 */
108 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
112 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
109 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
113 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
110
114
111 /** Applies accumulator to each value in the sequence and
115 /** Applies accumulator to each value in the sequence and
112 * emits the accumulated value at the end of the sequence
116 * emits the accumulated value at the end of the sequence
113 *
117 *
114 * @param accumulator
118 * @param accumulator
115 * @param initial
119 * @param initial
116 */
120 */
117 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
121 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
118 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
122 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
119
123
120 /** Concatenates the specified sequences with this observable
124 /** Concatenates the specified sequences with this observable
121 *
125 *
122 * @param seq sequences to concatenate with the current observable
126 * @param seq sequences to concatenate with the current observable
123 *
127 *
124 * The concatenation doesn't accumulate values from the specified sequences,
128 * The concatenation doesn't accumulate values from the specified sequences,
125 * The result of the concatenation is the new observable which will switch
129 * The result of the concatenation is the new observable which will switch
126 * to the next observable after the previous one completes. Values emitted
130 * to the next observable after the previous one completes. Values emitted
127 * before the next observable being active are lost.
131 * before the next observable being active are lost.
128 */
132 */
129 cat(...seq: Subscribable<T>[]): Observable<T>;
133 cat(...seq: Subscribable<T>[]): Observable<T>;
130
134
131
135
132 /** Pipes the specified operator to produce the new observable
136 /** Pipes the specified operator to produce the new observable
133 * @param op The operator consumes this observable and produces a new one
137 * @param op The operator consumes this observable and produces a new one
134 *
138 *
135 * The operator is a higher order function which takes a source observable
139 * The operator is a higher order function which takes a source observable
136 * and returns a producer for the new observable.
140 * and returns a producer for the new observable.
137 *
141 *
138 * This function can be used to create a complex mapping between source and
142 * This function can be used to create a complex mapping between source and
139 * resulting observables. The operator may have a state (or a side effect)
143 * resulting observables. The operator may have a state (or a side effect)
140 * and can be connected to multiple observables.
144 * and can be connected to multiple observables.
141 */
145 */
142 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
146 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
143
147
144 /** Waits for the next event to occur and returns a promise for the next value
148 /** Waits for the next event to occur and returns a promise for the next value
145 * @param ct Cancellation token
149 * @param ct Cancellation token
146 */
150 */
147 next(ct?: ICancellation): Promise<T>;
151 next(ct?: ICancellation): Promise<T>;
148
152
149 /** Collects items of the sequence to the array. */
153 /** Collects items of the sequence to the array. */
150 collect(ct?: ICancellation): Promise<T[]>;
154 collect(ct?: ICancellation): Promise<T[]>;
151 }
155 }
152
156
153 const noop = () => { };
157 const noop = () => { };
154
158
155 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
159 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
156
160
157 const sink = <T>(consumer: Observer<T>) => {
161 const sink = <T>(consumer: Observer<T>) => {
162 // eslint-disable-next-line @typescript-eslint/unbound-method
158 const { next, error, complete } = consumer;
163 const { next, error, complete } = consumer;
159 return {
164 return {
160 next: next ? next.bind(consumer) : noop,
165 next: next ? next.bind(consumer) : noop,
161 error: error ? error.bind(consumer) : errorFallback,
166 error: error ? error.bind(consumer) : errorFallback, // report unhandled errors
162 complete: complete ? complete.bind(consumer) : noop,
167 complete: complete ? complete.bind(consumer) : noop
163 isClosed: () => false
164 };
168 };
165 };
169 };
166
170
167 /** Wraps the producer to handle tear down logic and subscription management
171 /** Wraps the producer to handle tear down logic and subscription management
168 *
172 *
173 * The resulting producer will invoke cleanup logic on error or complete events
174 * and will prevent calling of any method from the sink.
175 *
169 * @param producer The producer to wrap
176 * @param producer The producer to wrap
170 * @returns The wrapper producer
177 * @returns The wrapper producer
171 */
178 */
172 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
179 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
173 let done = false;
180 let done = false;
174 let cleanup = noop;
181 let cleanup = noop;
175
182
176 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
183 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
177 (...args: A) => done ?
184 (...args: A) => done ?
178 void (0) :
185 void (0) :
179 (done = true, cleanup(), fn(...args));
186 (done = true, cleanup(), fn(...args));
180
187
181 const _fin0 = () => done ? void (0) : (done = true, cleanup());
188 const _fin0 = () => done ? void (0) : (done = true, cleanup());
182
189
183 const safeSink = {
190 const safeSink = {
184 next: (value: T) => { !done && next(value); },
191 next: (value: T) => { !done && next(value); },
185 error: _fin(error),
192 error: _fin(error),
186 complete: _fin(complete),
193 complete: _fin(complete),
187 isClosed: () => done
194 isClosed: () => done
188 };
195 };
196 // call the producer
189 cleanup = producer(safeSink) ?? noop;
197 cleanup = producer(safeSink) ?? noop;
198 // if the producer throws exception bypass it to the caller rather then to
199 // the sink. This is a feature.
200
201 // if the producer completed the sequence immediately call the cleanup in place
190 return done ? cleanup() : _fin0;
202 return done ? cleanup() : _fin0;
191 };
203 };
192
204
193 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
205 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
194 subscribe: (consumer: Observer<T>) => ({
206 subscribe: (consumer: Observer<T>) => ({
195 unsubscribe: producer(sink(consumer)) ?? noop
207 unsubscribe: producer(sink(consumer)) ?? noop
196 }),
208 }),
197
209
198 map: (mapper) => _observe(({ next, ...rest }) =>
210 map: (mapper) => _observe(({ next, ...rest }) =>
199 producer({
211 producer({
200 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
212 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
201 ...rest
213 ...rest
202 })
214 })
203 ),
215 ),
204
216
205 filter: (predicate) => _observe(({ next, ...rest }) =>
217 filter: (predicate) => _observe(({ next, ...rest }) =>
206 producer({
218 producer({
207 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
219 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
208 ...rest
220 ...rest
209 })
221 })
210 ),
222 ),
211
223
212 until: predicate => _observe(({ next, complete, ...rest }) =>
224 until: predicate => _observe(({ next, complete, ...rest }) =>
213 producer({
225 producer({
214 next: v => predicate(v) ? complete() : next(v),
226 next: v => predicate(v) ? complete() : next(v),
215 complete,
227 complete,
216 ...rest
228 ...rest
217 })
229 })
218 ),
230 ),
219
231
220 while: predicate => _observe(({ next, complete, ...rest }) =>
232 while: predicate => _observe(({ next, complete, ...rest }) =>
221 producer({
233 producer({
222 next: v => predicate(v) ? next(v) : complete(),
234 next: v => predicate(v) ? next(v) : complete(),
223 complete,
235 complete,
224 ...rest
236 ...rest
225 })
237 })
226 ),
238 ),
227
239
228 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
240 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
229 if (args.length === 1) {
241 if (args.length === 1) {
230 const [accumulator] = args;
242 const [accumulator] = args;
231 let _acc: T;
243 let _acc: T;
232 let index = 0;
244 let index = 0;
233 return producer({
245 return producer({
234 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
246 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
235 ...rest
247 ...rest
236 });
248 });
237 } else {
249 } else {
238 const [accumulator, initial] = args;
250 const [accumulator, initial] = args;
239 let _acc = initial;
251 let _acc = initial;
240 return producer({
252 return producer({
241 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
253 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
242 ...rest
254 ...rest
243 });
255 });
244 }
256 }
245 }),
257 }),
246
258
247 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
259 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error }) => {
248 if (args.length === 1) {
260 if (args.length === 1) {
249 const [accumulator] = args;
261 const [accumulator] = args;
250 let _acc: T;
262 let _acc: T;
251 let index = 0;
263 let index = 0;
252 return producer({
264 return producer({
253 next: next !== noop ? (v: T) => {
265 next: next !== noop ? (v: T) => {
254 _acc = index++ === 0 ? v : accumulator(_acc, v);
266 _acc = index++ === 0 ? v : accumulator(_acc, v);
255 } : noop,
267 } : noop,
256 complete: () => {
268 complete: () => {
257 if (index === 0) {
269 if (index === 0) {
258 error(new Error("The sequence can't be empty"));
270 error(new Error("The sequence can't be empty"));
259 } else {
271 } else {
260 next(_acc);
272 next(_acc);
261 complete();
273 complete();
262 }
274 }
263 },
275 },
264 error,
276 error
265 ...rest
266 });
277 });
267 } else {
278 } else {
268 const [accumulator, initial] = args;
279 const [accumulator, initial] = args;
269 let _acc = initial;
280 let _acc = initial;
270 return producer({
281 return producer({
271 next: next !== noop ? (v: T) => {
282 next: next !== noop ? (v: T) => {
272 _acc = accumulator(_acc, v);
283 _acc = accumulator(_acc, v);
273 } : noop,
284 } : noop,
274 complete: () => {
285 complete: () => {
275 next(_acc);
286 next(_acc);
276 complete();
287 complete();
277 },
288 },
278 error,
289 error
279 ...rest
280 });
290 });
281 }
291 }
282 }),
292 }),
283
293
284 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
294 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
285 let cleanup: () => void;
295 let cleanup: () => void;
286 const len = seq.length;
296 const len = seq.length;
287 const complete = (i: number) => i < len ?
297 const complete = (i: number) => i < len ?
288 () => {
298 () => {
289 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
299 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
290 cleanup = subscription.unsubscribe.bind(subscription);
300 cleanup = subscription.unsubscribe.bind(subscription);
291 } : final;
301 } : final;
292
302
293 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
303 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
294
304
295 return () => cleanup();
305 return () => cleanup();
296 }),
306 }),
297
307
298 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
308 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
299
309
300 next: collect(
310 next: collect(
301 producer,
311 producer,
302 ({ next, complete, error, isClosed }) => ({
312 ({ next, complete, error }) => ({
303 next: v => (next(v), complete()),
313 next: v => (next(v), complete()),
304 complete: () => error(new Error("The sequence is empty")),
314 complete: () => error(new Error("The sequence is empty")),
305 error,
315 error
306 isClosed
307 })
316 })
308 ),
317 ),
309
318
310 collect: collect(
319 collect: collect(
311 producer,
320 producer,
312 ({ next, complete, ...rest }) => {
321 ({ next, complete, error}) => {
313 const data: T[] = [];
322 const data: T[] = [];
314 return {
323 return {
315 next: v => data.push(v),
324 next: v => data.push(v),
316 complete: () => (next(data), complete()),
325 complete: () => (next(data), complete()),
317 ...rest
326 error
318 };
327 };
319 }
328 }
320 )
329 )
321 });
330 });
322
331
323 const collect = <T, U>(
332 const collect = <T, U>(
324 producer: Producer<T>,
333 producer: FusedProducer<T>,
325 collector: (result: Sink<U>) => Sink<T>
334 collector: (result: FusedSink<U>) => FusedSink<T>
326 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
335 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
327 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
336 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
328 const h = ct.register(error);
337 const h = ct.register(error);
329 const cleanup = !isClosed() ?
338 const cleanup = !isClosed() ?
330 producer(collector({ next, complete, error, isClosed })) ?? noop :
339 producer(collector({ next, complete, error })) ?? noop :
331 noop;
340 noop;
332
341
333 return () => {
342 return () => {
334 h.destroy();
343 h.destroy();
335 cleanup();
344 cleanup();
336 };
345 };
337 });
346 });
338
347
339 fused({
348 fused({
340 next: resolve,
349 next: resolve,
341 error: reject,
350 error: reject,
342 complete: noop,
351 complete: noop
343 isClosed: () => false
344 });
352 });
345 });
353 });
346
354
347 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
355 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
348
356
349 /** Converts an array to the observable sequence of its elements. */
357 /** Converts an array to the observable sequence of its elements. */
350 export const ofArray = <T>(items: T[]) => _observe<T>(
358 export const ofArray = <T>(items: T[]) => _observe<T>(
351 ({ next, complete }) => (
359 ({ next, complete }) => (
352 items.forEach(next),
360 items.forEach(next),
353 complete()
361 complete()
354 )
362 )
355 );
363 );
356
364
357 /** Converts a subscribable to the observable */
365 /** Converts a subscribable to the observable */
358 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
366 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
359 observe(sink => {
367 observe<T>(sink => {
360 const subscription = subscribable.subscribe(sink);
368 const subscription = subscribable.subscribe(sink);
361 return () => subscription.unsubscribe();
369 return () => subscription.unsubscribe();
362 });
370 });
363
371
364 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
372 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
365 ({ next, error, complete }) =>
373 ({ next, error, complete }) =>
366 isPromise(item) ?
374 isPromise(item) ?
367 void item.then(
375 void item.then(
368 v => (next(v), complete()),
376 v => (next(v), complete()),
369 error
377 error
370 ) :
378 ) :
371 (next(item), complete())
379 (next(item), complete())
372 );
380 );
373
381
374 /** Converts a list of parameter values to the observable sequence. The
382 /** Converts a list of parameter values to the observable sequence. The
375 * order of elements in the list will be preserved in the resulting sequence.
383 * order of elements in the list will be preserved in the resulting sequence.
376 */
384 */
377 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
385 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
378 of1(items[0]) :
386 of1(items[0]) :
379 observe<T>(
387 observe<T>(
380 ({ next, error, complete, isClosed }) => {
388 ({ next, error, complete, isClosed }) => {
381 const n = items.length;
389 const n = items.length;
382
390
383 const _next = (start: number) => {
391 const _next = (start: number) => {
384 if (start > 0 && isClosed()) // when resumed
392 if (start > 0 && isClosed()) // when resumed
385 return;
393 return;
386
394
387 for (let i = start; i < n; i++) {
395 for (let i = start; i < n; i++) {
388 const r = items[i];
396 const r = items[i];
389 if (isPromise(r)) {
397 if (isPromise(r)) {
390 r.then(v => (next(v), _next(i + 1)), error);
398 r.then(v => (next(v), _next(i + 1)), error);
391 return; // suspend
399 return; // suspend
392 } else {
400 } else {
393 next(r);
401 next(r);
394 }
402 }
395 }
403 }
396 complete();
404 complete();
397 };
405 };
398
406
399 _next(0);
407 _next(0);
400 }
408 }
401 );
409 );
402
410
403 export const empty = _observe<never>(({ complete }) => complete());
411 export const empty = _observe<never>(({ complete }) => complete());
@@ -1,50 +1,60
1 import { Observable, Sink, Subscribable, observe } from "../observable";
1 import { Observable, Sink, Subscribable, observe } from "../observable";
2
2
3 const noop = () => { };
3 const noop = () => { };
4
4
5 /** Joins multiple subscribers to the single one. The resulting subscriber
5 /** Connects multiple subscribers to the single producer. The producer
6 * will be created when the first client subscribes and will be released
6 * will be created when the first client subscribes and will be released
7 * with the the last client unsubscribed.
7 * with the the last client unsubscribed.
8 *
8 *
9 * Use this wrapper to prevent spawning multiple producers.
9 * Use this wrapper to prevent spawning multiple producers.
10 *
10 *
11 * The emitted values are not cached therefore the new subscriber will not receive
12 * the values emitted before it has been subscribed.
13 *
11 * @param source The source observable
14 * @param source The source observable
12 * @returns The wrapped producer
15 * @returns The new observable
13 */
16 */
14 export const subject = <T>(source: Subscribable<T>): Observable<T> => {
17 export const subject = <T>(source: Subscribable<T>): Observable<T> => {
15 let subscribers: Sink<T>[] = [];
18 let subscribers: Sink<T>[] = []; // the list of active subscribers
16
19
17 let subscription = { unsubscribe: noop };
20 let subscription = { unsubscribe: noop }; // current subscription
18
21
19 // cleanup method to release resources held by this subscription
22 // cleanup method to release resources held by this subscription
20 const cleanup = (cb: (item: Sink<T>) => void) => {
23 const cleanup = (cb: (item: Sink<T>) => void) => {
21 const _subscribers = subscribers;
24 const _subscribers = subscribers;
22 subscribers = [];
25 subscribers = []; // this will prevent a client cleanup login to run
23 _subscribers.forEach(cb);
26 _subscribers.forEach(cb);
24 subscription.unsubscribe();
27 // we don't need subscription.unsubscribe(), because cleanup is called
28 // from complete or error methods.
25 };
29 };
26
30
27 const sink: Sink<T> = {
31 const sink: Sink<T> = {
28 isClosed: () => false,
32 isClosed: () => false,
29 complete: () => cleanup(s => s.complete()),
33 complete: () => cleanup(s => s.complete()),
30 error: e => cleanup(s => s.error(e)),
34 error: e => cleanup(s => s.error(e)),
31 next: v => subscribers.forEach(s => s.next(v))
35 next: v => subscribers.forEach(s => s.next(v))
32 };
36 };
33
37
34 return observe(client => {
38 return observe(client => {
35 const _subscribers = subscribers;
39 const _subscribers = subscribers;
36 subscribers.push(client);
40 subscribers.push(client);
37 if (subscribers.length === 1)
41 if (subscribers.length === 1) // this is the first client
38 subscription = source.subscribe(sink);
42 subscription = source.subscribe(sink); // activate the producer
39
43
40 return () => {
44 return () => {
45 // this is a cleanup logic for an individual client
41 if (_subscribers === subscribers) {
46 if (_subscribers === subscribers) {
47 // is the current subscription to the producer is active
48
49 // find this client in the list of subscribers
42 const pos = subscribers.indexOf(client);
50 const pos = subscribers.indexOf(client);
43 if (pos >= 0)
51 if (pos >= 0)
44 subscribers.splice(pos, 1);
52 subscribers.splice(pos, 1);
53
54 // is this is the last subscriber we need to release the producer
45 if (!subscribers.length)
55 if (!subscribers.length)
46 subscription.unsubscribe();
56 subscription.unsubscribe();
47 }
57 }
48 };
58 };
49 });
59 });
50 };
60 };
General Comments 0
You need to be logged in to leave comments. Login now