##// END OF EJS Templates
Added a fallback error handler to observables, it will report unhandled errors
cin -
r138:98b2c550c676 v1.9.0-rc3 default
parent child
Show More
@@ -1,397 +1,403
1 import { id as mid} from "module";
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { ICancellation } from "@implab/core-amd/interfaces";
3 import { ICancellation } from "@implab/core-amd/interfaces";
4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
3 import { isPromise } from "@implab/core-amd/safe";
5 import { isPromise } from "@implab/core-amd/safe";
4
6
7 const trace = TraceSource.get(mid);
8
5 /**
9 /**
6 * The interface for the consumer of an observable sequence
10 * The interface for the consumer of an observable sequence
7 */
11 */
8 export interface Observer<T> {
12 export interface Observer<T> {
9 /**
13 /**
10 * Called for the next element in the sequence
14 * Called for the next element in the sequence
11 */
15 */
12 next?: (value: T) => void;
16 next?: (value: T) => void;
13
17
14 /**
18 /**
15 * Called once when the error occurs in the sequence.
19 * Called once when the error occurs in the sequence.
16 */
20 */
17 error?: (e: unknown) => void;
21 error?: (e: unknown) => void;
18
22
19 /**
23 /**
20 * Called once at the end of the sequence.
24 * Called once at the end of the sequence.
21 */
25 */
22 complete?: () => void;
26 complete?: () => void;
23 }
27 }
24
28
25 /**
29 /**
26 * The group of functions to feed an observable. These methods are provided to
30 * The group of functions to feed an observable. These methods are provided to
27 * the producer to generate a stream of events.
31 * the producer to generate a stream of events.
28 */
32 */
29 export type Sink<T> = {
33 export type Sink<T> = {
30 /**
34 /**
31 * Call to send the next element in the sequence
35 * Call to send the next element in the sequence
32 */
36 */
33 next: (value: T) => void;
37 next: (value: T) => void;
34
38
35 /**
39 /**
36 * Call to notify about the error occurred in the sequence.
40 * Call to notify about the error occurred in the sequence.
37 */
41 */
38 error: (e: unknown) => void;
42 error: (e: unknown) => void;
39
43
40 /**
44 /**
41 * Call to signal the end of the sequence.
45 * Call to signal the end of the sequence.
42 */
46 */
43 complete: () => void;
47 complete: () => void;
44
48
45 /**
49 /**
46 * Checks whether the sink is accepting new elements. It's safe to
50 * Checks whether the sink is accepting new elements. It's safe to
47 * send elements to the closed sink.
51 * send elements to the closed sink.
48 */
52 */
49 isClosed: () => boolean;
53 isClosed: () => boolean;
50 };
54 };
51
55
52 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
53
57
54 export interface Unsubscribable {
58 export interface Unsubscribable {
55 unsubscribe(): void;
59 unsubscribe(): void;
56 }
60 }
57
61
58 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
62 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
59 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
63 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
60
64
61 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
65 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
62 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
66 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
63
67
64 export interface Subscribable<T> {
68 export interface Subscribable<T> {
65 subscribe(consumer: Observer<T>): Unsubscribable;
69 subscribe(consumer: Observer<T>): Unsubscribable;
66 }
70 }
67
71
68 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
72 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
69
73
70 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
74 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
71
75
72 /** The observable source of items. */
76 /** The observable source of items. */
73 export interface Observable<T> extends Subscribable<T> {
77 export interface Observable<T> extends Subscribable<T> {
74 /** Transforms elements of the sequence with the specified mapper
78 /** Transforms elements of the sequence with the specified mapper
75 *
79 *
76 * @param mapper The mapper used to transform the values
80 * @param mapper The mapper used to transform the values
77 */
81 */
78 map<T2>(mapper: (value: T) => T2): Observable<T2>;
82 map<T2>(mapper: (value: T) => T2): Observable<T2>;
79
83
80 /** Filters elements of the sequence. The resulting sequence will
84 /** Filters elements of the sequence. The resulting sequence will
81 * contain only elements which match the specified predicate.
85 * contain only elements which match the specified predicate.
82 *
86 *
83 * @param predicate The filter predicate.
87 * @param predicate The filter predicate.
84 */
88 */
85 filter(predicate: (value: T) => boolean): Observable<T>;
89 filter(predicate: (value: T) => boolean): Observable<T>;
86
90
87 /** Completes the sequence once the condition is met.
91 /** Completes the sequence once the condition is met.
88 * @param predicate The condition which should be met to complete the sequence
92 * @param predicate The condition which should be met to complete the sequence
89 */
93 */
90 until(predicate: (value: T) => boolean): Observable<T>;
94 until(predicate: (value: T) => boolean): Observable<T>;
91
95
92 /** Keeps the sequence running while elements satisfy the condition.
96 /** Keeps the sequence running while elements satisfy the condition.
93 *
97 *
94 * @param predicate The condition which should be met to continue.
98 * @param predicate The condition which should be met to continue.
95 */
99 */
96 while(predicate: (value: T) => boolean): Observable<T>;
100 while(predicate: (value: T) => boolean): Observable<T>;
97
101
98 /** Applies accumulator to each value in the sequence and
102 /** Applies accumulator to each value in the sequence and
99 * emits the accumulated value for each source element
103 * emits the accumulated value for each source element
100 *
104 *
101 * @param accumulator
105 * @param accumulator
102 * @param initial
106 * @param initial
103 */
107 */
104 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
108 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
105 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
109 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
106
110
107 /** Applies accumulator to each value in the sequence and
111 /** Applies accumulator to each value in the sequence and
108 * emits the accumulated value at the end of the sequence
112 * emits the accumulated value at the end of the sequence
109 *
113 *
110 * @param accumulator
114 * @param accumulator
111 * @param initial
115 * @param initial
112 */
116 */
113 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
117 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
114 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
118 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
115
119
116 /** Concatenates the specified sequences with this observable
120 /** Concatenates the specified sequences with this observable
117 *
121 *
118 * @param seq sequences to concatenate with the current observable
122 * @param seq sequences to concatenate with the current observable
119 *
123 *
120 * The concatenation doesn't accumulate values from the specified sequences,
124 * The concatenation doesn't accumulate values from the specified sequences,
121 * The result of the concatenation is the new observable which will switch
125 * The result of the concatenation is the new observable which will switch
122 * to the next observable after the previous one completes. Values emitted
126 * to the next observable after the previous one completes. Values emitted
123 * before the next observable being active are lost.
127 * before the next observable being active are lost.
124 */
128 */
125 cat(...seq: Subscribable<T>[]): Observable<T>;
129 cat(...seq: Subscribable<T>[]): Observable<T>;
126
130
127
131
128 /** Pipes the specified operator to produce the new observable
132 /** Pipes the specified operator to produce the new observable
129 * @param op The operator consumes this observable and produces a new one
133 * @param op The operator consumes this observable and produces a new one
130 *
134 *
131 * The operator is a higher order function which takes a source observable
135 * The operator is a higher order function which takes a source observable
132 * and returns a producer for the new observable.
136 * and returns a producer for the new observable.
133 *
137 *
134 * This function can be used to create a complex mapping between source and
138 * This function can be used to create a complex mapping between source and
135 * resulting observables. The operator may have a state (or a side effect)
139 * resulting observables. The operator may have a state (or a side effect)
136 * and can be connected to multiple observables.
140 * and can be connected to multiple observables.
137 */
141 */
138 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
142 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
139
143
140 /** Waits for the next event to occur and returns a promise for the next value
144 /** Waits for the next event to occur and returns a promise for the next value
141 * @param ct Cancellation token
145 * @param ct Cancellation token
142 */
146 */
143 next(ct?: ICancellation): Promise<T>;
147 next(ct?: ICancellation): Promise<T>;
144
148
145 /** Collects items of the sequence to the array. */
149 /** Collects items of the sequence to the array. */
146 collect(ct?: ICancellation): Promise<T[]>;
150 collect(ct?: ICancellation): Promise<T[]>;
147 }
151 }
148
152
149 const noop = () => { };
153 const noop = () => { };
150
154
155 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
156
151 const sink = <T>(consumer: Observer<T>) => {
157 const sink = <T>(consumer: Observer<T>) => {
152 const { next, error, complete } = consumer;
158 const { next, error, complete } = consumer;
153 return {
159 return {
154 next: next ? next.bind(consumer) : noop,
160 next: next ? next.bind(consumer) : noop,
155 error: error ? error.bind(consumer) : noop,
161 error: error ? error.bind(consumer) : errorFallback,
156 complete: complete ? complete.bind(consumer) : noop,
162 complete: complete ? complete.bind(consumer) : noop,
157 isClosed: () => false
163 isClosed: () => false
158 };
164 };
159 };
165 };
160
166
161 /** Wraps the producer to handle tear down logic and subscription management
167 /** Wraps the producer to handle tear down logic and subscription management
162 *
168 *
163 * @param producer The producer to wrap
169 * @param producer The producer to wrap
164 * @returns The wrapper producer
170 * @returns The wrapper producer
165 */
171 */
166 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
172 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
167 let done = false;
173 let done = false;
168 let cleanup = noop;
174 let cleanup = noop;
169
175
170 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
176 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
171 (...args: A) => done ?
177 (...args: A) => done ?
172 void (0) :
178 void (0) :
173 (done = true, cleanup(), fn(...args));
179 (done = true, cleanup(), fn(...args));
174
180
175 const _fin0 = () => done ? void (0) : (done = true, cleanup());
181 const _fin0 = () => done ? void (0) : (done = true, cleanup());
176
182
177 const safeSink = {
183 const safeSink = {
178 next: (value: T) => { !done && next(value); },
184 next: (value: T) => { !done && next(value); },
179 error: _fin(error),
185 error: _fin(error),
180 complete: _fin(complete),
186 complete: _fin(complete),
181 isClosed: () => done
187 isClosed: () => done
182 };
188 };
183 cleanup = producer(safeSink) ?? noop;
189 cleanup = producer(safeSink) ?? noop;
184 return done ? cleanup() : _fin0;
190 return done ? cleanup() : _fin0;
185 };
191 };
186
192
187 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
193 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
188 subscribe: (consumer: Observer<T>) => ({
194 subscribe: (consumer: Observer<T>) => ({
189 unsubscribe: producer(sink(consumer)) ?? noop
195 unsubscribe: producer(sink(consumer)) ?? noop
190 }),
196 }),
191
197
192 map: (mapper) => _observe(({ next, ...rest }) =>
198 map: (mapper) => _observe(({ next, ...rest }) =>
193 producer({
199 producer({
194 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
200 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
195 ...rest
201 ...rest
196 })
202 })
197 ),
203 ),
198
204
199 filter: (predicate) => _observe(({ next, ...rest }) =>
205 filter: (predicate) => _observe(({ next, ...rest }) =>
200 producer({
206 producer({
201 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
207 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
202 ...rest
208 ...rest
203 })
209 })
204 ),
210 ),
205
211
206 until: predicate => _observe(({ next, complete, ...rest }) =>
212 until: predicate => _observe(({ next, complete, ...rest }) =>
207 producer({
213 producer({
208 next: v => predicate(v) ? complete() : next(v),
214 next: v => predicate(v) ? complete() : next(v),
209 complete,
215 complete,
210 ...rest
216 ...rest
211 })
217 })
212 ),
218 ),
213
219
214 while: predicate => _observe(({ next, complete, ...rest }) =>
220 while: predicate => _observe(({ next, complete, ...rest }) =>
215 producer({
221 producer({
216 next: v => predicate(v) ? next(v) : complete(),
222 next: v => predicate(v) ? next(v) : complete(),
217 complete,
223 complete,
218 ...rest
224 ...rest
219 })
225 })
220 ),
226 ),
221
227
222 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
228 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
223 if (args.length === 1) {
229 if (args.length === 1) {
224 const [accumulator] = args;
230 const [accumulator] = args;
225 let _acc: T;
231 let _acc: T;
226 let index = 0;
232 let index = 0;
227 return producer({
233 return producer({
228 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
234 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
229 ...rest
235 ...rest
230 });
236 });
231 } else {
237 } else {
232 const [accumulator, initial] = args;
238 const [accumulator, initial] = args;
233 let _acc = initial;
239 let _acc = initial;
234 return producer({
240 return producer({
235 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
241 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
236 ...rest
242 ...rest
237 });
243 });
238 }
244 }
239 }),
245 }),
240
246
241 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
247 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
242 if (args.length === 1) {
248 if (args.length === 1) {
243 const [accumulator] = args;
249 const [accumulator] = args;
244 let _acc: T;
250 let _acc: T;
245 let index = 0;
251 let index = 0;
246 return producer({
252 return producer({
247 next: next !== noop ? (v: T) => {
253 next: next !== noop ? (v: T) => {
248 _acc = index++ === 0 ? v : accumulator(_acc, v);
254 _acc = index++ === 0 ? v : accumulator(_acc, v);
249 } : noop,
255 } : noop,
250 complete: () => {
256 complete: () => {
251 if (index === 0) {
257 if (index === 0) {
252 error(new Error("The sequence can't be empty"));
258 error(new Error("The sequence can't be empty"));
253 } else {
259 } else {
254 next(_acc);
260 next(_acc);
255 complete();
261 complete();
256 }
262 }
257 },
263 },
258 error,
264 error,
259 ...rest
265 ...rest
260 });
266 });
261 } else {
267 } else {
262 const [accumulator, initial] = args;
268 const [accumulator, initial] = args;
263 let _acc = initial;
269 let _acc = initial;
264 return producer({
270 return producer({
265 next: next !== noop ? (v: T) => {
271 next: next !== noop ? (v: T) => {
266 _acc = accumulator(_acc, v);
272 _acc = accumulator(_acc, v);
267 } : noop,
273 } : noop,
268 complete: () => {
274 complete: () => {
269 next(_acc);
275 next(_acc);
270 complete();
276 complete();
271 },
277 },
272 error,
278 error,
273 ...rest
279 ...rest
274 });
280 });
275 }
281 }
276 }),
282 }),
277
283
278 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
284 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
279 let cleanup: () => void;
285 let cleanup: () => void;
280 const len = seq.length;
286 const len = seq.length;
281 const complete = (i: number) => i < len ?
287 const complete = (i: number) => i < len ?
282 () => {
288 () => {
283 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
289 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
284 cleanup = subscription.unsubscribe.bind(subscription);
290 cleanup = subscription.unsubscribe.bind(subscription);
285 } : final;
291 } : final;
286
292
287 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
293 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
288
294
289 return () => cleanup();
295 return () => cleanup();
290 }),
296 }),
291
297
292 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
298 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
293
299
294 next: collect(
300 next: collect(
295 producer,
301 producer,
296 ({ next, complete, error, isClosed }) => ({
302 ({ next, complete, error, isClosed }) => ({
297 next: v => (next(v), complete()),
303 next: v => (next(v), complete()),
298 complete: () => error(new Error("The sequence is empty")),
304 complete: () => error(new Error("The sequence is empty")),
299 error,
305 error,
300 isClosed
306 isClosed
301 })
307 })
302 ),
308 ),
303
309
304 collect: collect(
310 collect: collect(
305 producer,
311 producer,
306 ({ next, complete, ...rest }) => {
312 ({ next, complete, ...rest }) => {
307 const data: T[] = [];
313 const data: T[] = [];
308 return {
314 return {
309 next: v => data.push(v),
315 next: v => data.push(v),
310 complete: () => (next(data), complete()),
316 complete: () => (next(data), complete()),
311 ...rest
317 ...rest
312 };
318 };
313 }
319 }
314 )
320 )
315 });
321 });
316
322
317 const collect = <T, U>(
323 const collect = <T, U>(
318 producer: Producer<T>,
324 producer: Producer<T>,
319 collector: (result: Sink<U>) => Sink<T>
325 collector: (result: Sink<U>) => Sink<T>
320 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
326 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
321 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
327 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
322 const h = ct.register(error);
328 const h = ct.register(error);
323 const cleanup = !isClosed() ?
329 const cleanup = !isClosed() ?
324 producer(collector({ next, complete, error, isClosed })) ?? noop :
330 producer(collector({ next, complete, error, isClosed })) ?? noop :
325 noop;
331 noop;
326
332
327 return () => {
333 return () => {
328 h.destroy();
334 h.destroy();
329 cleanup();
335 cleanup();
330 };
336 };
331 });
337 });
332
338
333 fused({
339 fused({
334 next: resolve,
340 next: resolve,
335 error: reject,
341 error: reject,
336 complete: noop,
342 complete: noop,
337 isClosed: () => false
343 isClosed: () => false
338 });
344 });
339 });
345 });
340
346
341 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
347 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
342
348
343 /** Converts an array to the observable sequence of its elements. */
349 /** Converts an array to the observable sequence of its elements. */
344 export const ofArray = <T>(items: T[]) => _observe<T>(
350 export const ofArray = <T>(items: T[]) => _observe<T>(
345 ({ next, complete }) => (
351 ({ next, complete }) => (
346 items.forEach(next),
352 items.forEach(next),
347 complete()
353 complete()
348 )
354 )
349 );
355 );
350
356
351 /** Converts a subscribable to the observable */
357 /** Converts a subscribable to the observable */
352 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
358 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
353 observe(sink => {
359 observe(sink => {
354 const subscription = subscribable.subscribe(sink);
360 const subscription = subscribable.subscribe(sink);
355 return () => subscription.unsubscribe();
361 return () => subscription.unsubscribe();
356 });
362 });
357
363
358 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
364 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
359 ({ next, error, complete }) =>
365 ({ next, error, complete }) =>
360 isPromise(item) ?
366 isPromise(item) ?
361 void item.then(
367 void item.then(
362 v => (next(v), complete()),
368 v => (next(v), complete()),
363 error
369 error
364 ) :
370 ) :
365 (next(item), complete())
371 (next(item), complete())
366 );
372 );
367
373
368 /** Converts a list of parameter values to the observable sequence. The
374 /** Converts a list of parameter values to the observable sequence. The
369 * order of elements in the list will be preserved in the resulting sequence.
375 * order of elements in the list will be preserved in the resulting sequence.
370 */
376 */
371 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
377 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
372 of1(items[0]) :
378 of1(items[0]) :
373 observe<T>(
379 observe<T>(
374 ({ next, error, complete, isClosed }) => {
380 ({ next, error, complete, isClosed }) => {
375 const n = items.length;
381 const n = items.length;
376
382
377 const _next = (start: number) => {
383 const _next = (start: number) => {
378 if (start > 0 && isClosed()) // when resumed
384 if (start > 0 && isClosed()) // when resumed
379 return;
385 return;
380
386
381 for (let i = start; i < n; i++) {
387 for (let i = start; i < n; i++) {
382 const r = items[i];
388 const r = items[i];
383 if (isPromise(r)) {
389 if (isPromise(r)) {
384 r.then(v => (next(v), _next(i + 1)), error);
390 r.then(v => (next(v), _next(i + 1)), error);
385 return; // suspend
391 return; // suspend
386 } else {
392 } else {
387 next(r);
393 next(r);
388 }
394 }
389 }
395 }
390 complete();
396 complete();
391 };
397 };
392
398
393 _next(0);
399 _next(0);
394 }
400 }
395 );
401 );
396
402
397 export const empty = _observe<never>(({ complete }) => complete());
403 export const empty = _observe<never>(({ complete }) => complete());
@@ -1,123 +1,125
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
3 import { observe, Observable, empty } from "./observable";
3 import { observe, Observable, empty } from "./observable";
4 import { after } from "dojo/aspect";
4 import { after } from "dojo/aspect";
5 import { subject } from "./operators/subject";
6
5
7 export interface OrderedUpdate<T> {
6 export interface OrderedUpdate<T> {
8 /** The item is being updated */
7 /** The item is being updated */
9 readonly item: T;
8 readonly item: T;
10
9
11 /** The previous index of the item, -1 in case it is inserted */
10 /** The previous index of the item, -1 in case it is inserted */
12 readonly prevIndex: number;
11 readonly prevIndex: number;
13
12
14 /** The new index of the item, -1 in case it is deleted */
13 /** The new index of the item, -1 in case it is deleted */
15 readonly newIndex: number;
14 readonly newIndex: number;
16
15
17 }
16 }
18
17
19 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
18 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
20
19
21 interface DjObservableResults<T> {
20 interface DjObservableResults<T> {
22 /**
21 /**
23 * Allows observation of results
22 * Allows observation of results
24 */
23 */
25 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
24 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
26 remove(): void;
25 remove(): void;
27 };
26 };
28 }
27 }
29
28
30 interface Queryable<T, Q, O> {
29 interface Queryable<T, Q, O> {
31 query(query?: Q, options?: O): PromiseOrValue<T[]>;
30 query(query?: Q, options?: O): PromiseOrValue<T[]>;
32 }
31 }
33
32
34 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
35 v && (typeof (v as { observe?: unknown; }).observe === "function");
34 v && (typeof (v as { observe?: unknown; }).observe === "function");
36
35
37 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
38 (query?: Q, options?: O & { observe?: boolean }) => {
37 (query?: Q, options?: O & { observe?: boolean }) => {
39 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
38 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
40
39
41 const processResults = (items: T[]) =>
40 const processResults = (items: T[]) =>
42 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
41 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
43
42
44 try {
43 try {
45 const results = store.query(query, options);
44 const results = store.query(query, options);
46 if (isPromise(results)) {
45 if (isPromise(results)) {
47 results.then(processResults).then(undefined, error);
46 results.then(processResults).then(undefined, error);
48 } else {
47 } else {
49 processResults(results);
48 processResults(results);
50 }
49 }
51
50
52 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
51 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
53 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
52 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
54 return () => h.remove();
53 return () => h.remove();
55 } else {
54 } else {
56 complete();
55 complete();
57 }
56 }
58 } catch (err) {
57 } catch (err) {
59 error(err);
58 error(err);
60 }
59 }
61 });
60 });
62
61
63 };
62 };
64
63
65 interface IndexedStore<T> {
64 interface IndexedStore<T> {
66 get(id: string | number): PromiseLike<T> | T | null | undefined;
65 get(id: string | number): PromiseLike<T> | T | null | undefined;
67 }
66 }
68
67
69 interface Notifications<T> {
68 interface Notifications<T> {
70 notify(item: T | undefined, id: string | number | undefined): void;
69 notify(item: T | undefined, id: string | number | undefined): void;
71 }
70 }
72
71
73 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
72 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
74 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
73 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
75
74
76 interface GetOpts {
75 interface GetOpts {
77 observe?: boolean;
76 observe?: boolean;
78 }
77 }
79
78
80 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
79 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
81 [item: undefined | null, id: string | number];
80 [item: undefined | null, id: string | number];
82
81
83 const filterItem = (itemId: string | number) =>
82 const filterItem = (itemId: string | number) =>
84 <T>(source: Observable<Change<T>>) =>
83 <T>(source: Observable<Change<T>>) =>
85 observe<T>(({ next, complete, error }) => {
84 observe<T>(({ next, complete, error }) => {
86 const subscription = source
85 const subscription = source
87 .filter(([, id]) => id === itemId)
86 .filter(([, id]) => id === itemId)
88 .subscribe({
87 .subscribe({
89 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
88 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
90 complete,
89 complete,
91 error
90 error
92 });
91 });
93 return () => subscription.unsubscribe();
92 return () => subscription.unsubscribe();
94 });
93 });
95
94
96 export const get = <T>(store: IndexedStore<T>) => {
95 export const get = <T>(store: IndexedStore<T>) => {
97 const changes = hasNotifications<T>(store) ?
96 const changes = hasNotifications<T>(store) ?
98 observe<Change<T>>(({ next }) => {
97 observe<Change<T>>(({ next }) => {
99 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
98 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
100 return () => handle.remove();
99 return () => handle.remove();
101 }).pipe(subject) : empty;
100 }) : empty;
102
103
101
104 return (id: string | number, opts: GetOpts = {}) =>
102 return (id: string | number, opts: GetOpts = {}) =>
105 observe<T>(({ next, complete, error }) => {
103 observe<T>(({ next, complete, error }) => {
106 const result = store.get(id);
104 try {
105 const result = store.get(id);
107
106
108 const handle = (x: T | null | undefined) => {
107 const handle = (x: T | null | undefined) => {
109 if (x !== null && x !== undefined)
108 if (x !== null && x !== undefined)
110 next(x);
109 next(x);
111 complete();
110 complete();
112 };
111 };
113
112
114 if (isPromise(result)) {
113 if (isPromise(result)) {
115 result.then(handle, error);
114 result.then(handle).then(undefined, error);
116
115
117 if (isCancellable(result))
116 if (isCancellable(result))
118 return () => result.cancel();
117 return () => result.cancel();
119 } else {
118 } else {
120 handle(result);
119 handle(result);
120 }
121 } catch (e) {
122 error(e);
121 }
123 }
122 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
124 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
123 }; No newline at end of file
125 };
General Comments 0
You need to be logged in to leave comments. Login now