##// END OF EJS Templates
Added a fallback error handler to observables, it will report unhandled errors
cin -
r138:98b2c550c676 v1.9.0-rc3 default
parent child
Show More
@@ -1,397 +1,403
1 import { id as mid} from "module";
1 2 import { Cancellation } from "@implab/core-amd/Cancellation";
2 3 import { ICancellation } from "@implab/core-amd/interfaces";
4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
3 5 import { isPromise } from "@implab/core-amd/safe";
4 6
7 const trace = TraceSource.get(mid);
8
5 9 /**
6 10 * The interface for the consumer of an observable sequence
7 11 */
8 12 export interface Observer<T> {
9 13 /**
10 14 * Called for the next element in the sequence
11 15 */
12 16 next?: (value: T) => void;
13 17
14 18 /**
15 19 * Called once when the error occurs in the sequence.
16 20 */
17 21 error?: (e: unknown) => void;
18 22
19 23 /**
20 24 * Called once at the end of the sequence.
21 25 */
22 26 complete?: () => void;
23 27 }
24 28
25 29 /**
26 30 * The group of functions to feed an observable. These methods are provided to
27 31 * the producer to generate a stream of events.
28 32 */
29 33 export type Sink<T> = {
30 34 /**
31 35 * Call to send the next element in the sequence
32 36 */
33 37 next: (value: T) => void;
34 38
35 39 /**
36 40 * Call to notify about the error occurred in the sequence.
37 41 */
38 42 error: (e: unknown) => void;
39 43
40 44 /**
41 45 * Call to signal the end of the sequence.
42 46 */
43 47 complete: () => void;
44 48
45 49 /**
46 50 * Checks whether the sink is accepting new elements. It's safe to
47 51 * send elements to the closed sink.
48 52 */
49 53 isClosed: () => boolean;
50 54 };
51 55
52 56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
53 57
54 58 export interface Unsubscribable {
55 59 unsubscribe(): void;
56 60 }
57 61
58 62 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
59 63 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
60 64
61 65 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
62 66 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
63 67
64 68 export interface Subscribable<T> {
65 69 subscribe(consumer: Observer<T>): Unsubscribable;
66 70 }
67 71
68 72 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
69 73
70 74 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
71 75
72 76 /** The observable source of items. */
73 77 export interface Observable<T> extends Subscribable<T> {
74 78 /** Transforms elements of the sequence with the specified mapper
75 79 *
76 80 * @param mapper The mapper used to transform the values
77 81 */
78 82 map<T2>(mapper: (value: T) => T2): Observable<T2>;
79 83
80 84 /** Filters elements of the sequence. The resulting sequence will
81 85 * contain only elements which match the specified predicate.
82 86 *
83 87 * @param predicate The filter predicate.
84 88 */
85 89 filter(predicate: (value: T) => boolean): Observable<T>;
86 90
87 91 /** Completes the sequence once the condition is met.
88 92 * @param predicate The condition which should be met to complete the sequence
89 93 */
90 94 until(predicate: (value: T) => boolean): Observable<T>;
91 95
92 96 /** Keeps the sequence running while elements satisfy the condition.
93 97 *
94 98 * @param predicate The condition which should be met to continue.
95 99 */
96 100 while(predicate: (value: T) => boolean): Observable<T>;
97 101
98 102 /** Applies accumulator to each value in the sequence and
99 103 * emits the accumulated value for each source element
100 104 *
101 105 * @param accumulator
102 106 * @param initial
103 107 */
104 108 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
105 109 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
106 110
107 111 /** Applies accumulator to each value in the sequence and
108 112 * emits the accumulated value at the end of the sequence
109 113 *
110 114 * @param accumulator
111 115 * @param initial
112 116 */
113 117 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
114 118 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
115 119
116 120 /** Concatenates the specified sequences with this observable
117 121 *
118 122 * @param seq sequences to concatenate with the current observable
119 123 *
120 124 * The concatenation doesn't accumulate values from the specified sequences,
121 125 * The result of the concatenation is the new observable which will switch
122 126 * to the next observable after the previous one completes. Values emitted
123 127 * before the next observable being active are lost.
124 128 */
125 129 cat(...seq: Subscribable<T>[]): Observable<T>;
126 130
127 131
128 132 /** Pipes the specified operator to produce the new observable
129 133 * @param op The operator consumes this observable and produces a new one
130 134 *
131 135 * The operator is a higher order function which takes a source observable
132 136 * and returns a producer for the new observable.
133 137 *
134 138 * This function can be used to create a complex mapping between source and
135 139 * resulting observables. The operator may have a state (or a side effect)
136 140 * and can be connected to multiple observables.
137 141 */
138 142 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
139 143
140 144 /** Waits for the next event to occur and returns a promise for the next value
141 145 * @param ct Cancellation token
142 146 */
143 147 next(ct?: ICancellation): Promise<T>;
144 148
145 149 /** Collects items of the sequence to the array. */
146 150 collect(ct?: ICancellation): Promise<T[]>;
147 151 }
148 152
149 153 const noop = () => { };
150 154
155 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
156
151 157 const sink = <T>(consumer: Observer<T>) => {
152 158 const { next, error, complete } = consumer;
153 159 return {
154 160 next: next ? next.bind(consumer) : noop,
155 error: error ? error.bind(consumer) : noop,
161 error: error ? error.bind(consumer) : errorFallback,
156 162 complete: complete ? complete.bind(consumer) : noop,
157 163 isClosed: () => false
158 164 };
159 165 };
160 166
161 167 /** Wraps the producer to handle tear down logic and subscription management
162 168 *
163 169 * @param producer The producer to wrap
164 170 * @returns The wrapper producer
165 171 */
166 172 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
167 173 let done = false;
168 174 let cleanup = noop;
169 175
170 176 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
171 177 (...args: A) => done ?
172 178 void (0) :
173 179 (done = true, cleanup(), fn(...args));
174 180
175 181 const _fin0 = () => done ? void (0) : (done = true, cleanup());
176 182
177 183 const safeSink = {
178 184 next: (value: T) => { !done && next(value); },
179 185 error: _fin(error),
180 186 complete: _fin(complete),
181 187 isClosed: () => done
182 188 };
183 189 cleanup = producer(safeSink) ?? noop;
184 190 return done ? cleanup() : _fin0;
185 191 };
186 192
187 193 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
188 194 subscribe: (consumer: Observer<T>) => ({
189 195 unsubscribe: producer(sink(consumer)) ?? noop
190 196 }),
191 197
192 198 map: (mapper) => _observe(({ next, ...rest }) =>
193 199 producer({
194 200 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
195 201 ...rest
196 202 })
197 203 ),
198 204
199 205 filter: (predicate) => _observe(({ next, ...rest }) =>
200 206 producer({
201 207 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
202 208 ...rest
203 209 })
204 210 ),
205 211
206 212 until: predicate => _observe(({ next, complete, ...rest }) =>
207 213 producer({
208 214 next: v => predicate(v) ? complete() : next(v),
209 215 complete,
210 216 ...rest
211 217 })
212 218 ),
213 219
214 220 while: predicate => _observe(({ next, complete, ...rest }) =>
215 221 producer({
216 222 next: v => predicate(v) ? next(v) : complete(),
217 223 complete,
218 224 ...rest
219 225 })
220 226 ),
221 227
222 228 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
223 229 if (args.length === 1) {
224 230 const [accumulator] = args;
225 231 let _acc: T;
226 232 let index = 0;
227 233 return producer({
228 234 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
229 235 ...rest
230 236 });
231 237 } else {
232 238 const [accumulator, initial] = args;
233 239 let _acc = initial;
234 240 return producer({
235 241 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
236 242 ...rest
237 243 });
238 244 }
239 245 }),
240 246
241 247 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
242 248 if (args.length === 1) {
243 249 const [accumulator] = args;
244 250 let _acc: T;
245 251 let index = 0;
246 252 return producer({
247 253 next: next !== noop ? (v: T) => {
248 254 _acc = index++ === 0 ? v : accumulator(_acc, v);
249 255 } : noop,
250 256 complete: () => {
251 257 if (index === 0) {
252 258 error(new Error("The sequence can't be empty"));
253 259 } else {
254 260 next(_acc);
255 261 complete();
256 262 }
257 263 },
258 264 error,
259 265 ...rest
260 266 });
261 267 } else {
262 268 const [accumulator, initial] = args;
263 269 let _acc = initial;
264 270 return producer({
265 271 next: next !== noop ? (v: T) => {
266 272 _acc = accumulator(_acc, v);
267 273 } : noop,
268 274 complete: () => {
269 275 next(_acc);
270 276 complete();
271 277 },
272 278 error,
273 279 ...rest
274 280 });
275 281 }
276 282 }),
277 283
278 284 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
279 285 let cleanup: () => void;
280 286 const len = seq.length;
281 287 const complete = (i: number) => i < len ?
282 288 () => {
283 289 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
284 290 cleanup = subscription.unsubscribe.bind(subscription);
285 291 } : final;
286 292
287 293 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
288 294
289 295 return () => cleanup();
290 296 }),
291 297
292 298 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
293 299
294 300 next: collect(
295 301 producer,
296 302 ({ next, complete, error, isClosed }) => ({
297 303 next: v => (next(v), complete()),
298 304 complete: () => error(new Error("The sequence is empty")),
299 305 error,
300 306 isClosed
301 307 })
302 308 ),
303 309
304 310 collect: collect(
305 311 producer,
306 312 ({ next, complete, ...rest }) => {
307 313 const data: T[] = [];
308 314 return {
309 315 next: v => data.push(v),
310 316 complete: () => (next(data), complete()),
311 317 ...rest
312 318 };
313 319 }
314 320 )
315 321 });
316 322
317 323 const collect = <T, U>(
318 324 producer: Producer<T>,
319 325 collector: (result: Sink<U>) => Sink<T>
320 326 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
321 327 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
322 328 const h = ct.register(error);
323 329 const cleanup = !isClosed() ?
324 330 producer(collector({ next, complete, error, isClosed })) ?? noop :
325 331 noop;
326 332
327 333 return () => {
328 334 h.destroy();
329 335 cleanup();
330 336 };
331 337 });
332 338
333 339 fused({
334 340 next: resolve,
335 341 error: reject,
336 342 complete: noop,
337 343 isClosed: () => false
338 344 });
339 345 });
340 346
341 347 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
342 348
343 349 /** Converts an array to the observable sequence of its elements. */
344 350 export const ofArray = <T>(items: T[]) => _observe<T>(
345 351 ({ next, complete }) => (
346 352 items.forEach(next),
347 353 complete()
348 354 )
349 355 );
350 356
351 357 /** Converts a subscribable to the observable */
352 358 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
353 359 observe(sink => {
354 360 const subscription = subscribable.subscribe(sink);
355 361 return () => subscription.unsubscribe();
356 362 });
357 363
358 364 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
359 365 ({ next, error, complete }) =>
360 366 isPromise(item) ?
361 367 void item.then(
362 368 v => (next(v), complete()),
363 369 error
364 370 ) :
365 371 (next(item), complete())
366 372 );
367 373
368 374 /** Converts a list of parameter values to the observable sequence. The
369 375 * order of elements in the list will be preserved in the resulting sequence.
370 376 */
371 377 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
372 378 of1(items[0]) :
373 379 observe<T>(
374 380 ({ next, error, complete, isClosed }) => {
375 381 const n = items.length;
376 382
377 383 const _next = (start: number) => {
378 384 if (start > 0 && isClosed()) // when resumed
379 385 return;
380 386
381 387 for (let i = start; i < n; i++) {
382 388 const r = items[i];
383 389 if (isPromise(r)) {
384 390 r.then(v => (next(v), _next(i + 1)), error);
385 391 return; // suspend
386 392 } else {
387 393 next(r);
388 394 }
389 395 }
390 396 complete();
391 397 };
392 398
393 399 _next(0);
394 400 }
395 401 );
396 402
397 403 export const empty = _observe<never>(({ complete }) => complete());
@@ -1,123 +1,125
1 1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
3 3 import { observe, Observable, empty } from "./observable";
4 4 import { after } from "dojo/aspect";
5 import { subject } from "./operators/subject";
6 5
7 6 export interface OrderedUpdate<T> {
8 7 /** The item is being updated */
9 8 readonly item: T;
10 9
11 10 /** The previous index of the item, -1 in case it is inserted */
12 11 readonly prevIndex: number;
13 12
14 13 /** The new index of the item, -1 in case it is deleted */
15 14 readonly newIndex: number;
16 15
17 16 }
18 17
19 18 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
20 19
21 20 interface DjObservableResults<T> {
22 21 /**
23 22 * Allows observation of results
24 23 */
25 24 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
26 25 remove(): void;
27 26 };
28 27 }
29 28
30 29 interface Queryable<T, Q, O> {
31 30 query(query?: Q, options?: O): PromiseOrValue<T[]>;
32 31 }
33 32
34 33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
35 34 v && (typeof (v as { observe?: unknown; }).observe === "function");
36 35
37 36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
38 37 (query?: Q, options?: O & { observe?: boolean }) => {
39 38 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
40 39
41 40 const processResults = (items: T[]) =>
42 41 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
43 42
44 43 try {
45 44 const results = store.query(query, options);
46 45 if (isPromise(results)) {
47 46 results.then(processResults).then(undefined, error);
48 47 } else {
49 48 processResults(results);
50 49 }
51 50
52 51 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
53 52 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
54 53 return () => h.remove();
55 54 } else {
56 55 complete();
57 56 }
58 57 } catch (err) {
59 58 error(err);
60 59 }
61 60 });
62 61
63 62 };
64 63
65 64 interface IndexedStore<T> {
66 65 get(id: string | number): PromiseLike<T> | T | null | undefined;
67 66 }
68 67
69 68 interface Notifications<T> {
70 69 notify(item: T | undefined, id: string | number | undefined): void;
71 70 }
72 71
73 72 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
74 73 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
75 74
76 75 interface GetOpts {
77 76 observe?: boolean;
78 77 }
79 78
80 79 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
81 80 [item: undefined | null, id: string | number];
82 81
83 82 const filterItem = (itemId: string | number) =>
84 83 <T>(source: Observable<Change<T>>) =>
85 84 observe<T>(({ next, complete, error }) => {
86 85 const subscription = source
87 86 .filter(([, id]) => id === itemId)
88 87 .subscribe({
89 88 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
90 89 complete,
91 90 error
92 91 });
93 92 return () => subscription.unsubscribe();
94 93 });
95 94
96 95 export const get = <T>(store: IndexedStore<T>) => {
97 96 const changes = hasNotifications<T>(store) ?
98 97 observe<Change<T>>(({ next }) => {
99 98 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
100 99 return () => handle.remove();
101 }).pipe(subject) : empty;
102
100 }) : empty;
103 101
104 102 return (id: string | number, opts: GetOpts = {}) =>
105 103 observe<T>(({ next, complete, error }) => {
104 try {
106 105 const result = store.get(id);
107 106
108 107 const handle = (x: T | null | undefined) => {
109 108 if (x !== null && x !== undefined)
110 109 next(x);
111 110 complete();
112 111 };
113 112
114 113 if (isPromise(result)) {
115 result.then(handle, error);
114 result.then(handle).then(undefined, error);
116 115
117 116 if (isCancellable(result))
118 117 return () => result.cancel();
119 118 } else {
120 119 handle(result);
121 120 }
121 } catch (e) {
122 error(e);
123 }
122 124 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
123 125 }; No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now