##// END OF EJS Templates
Merge
cin -
r148:63f3ad8e6cff merge v1.10.0 default
parent child
Show More
@@ -1,54 +1,55
1 1 fc9f82c082ef432137da086a1fe9c37a12ba16a2 v1.0.0-rc2
2 2 6d80d7901b4c8ffe8728e4a7bf5f4b7e7a669bb5 v1.0.0-rc3
3 3 5a2c44d8e1f34dd30c2b50f92b7dc2e8f3247c43 v1.0.0-rc5
4 4 6c01fabe9ea9fb5e753fbeae8b0d2664e7072a66 v1.0.0-rc6
5 5 9e546fe36fdddc8324f1098ee950fa1a7ba19b93 v1.0.0-rc7
6 6 8f4d5e2c719a20ae6d65f1f4b5e2141ed765e975 v1.0.0-rc8
7 7 a1ab2b5975ad4b19599fb61538e7aaf329fb528c v1.0.0-rc10
8 8 9b77ac3bf8f200876450ad50c308a9441a7f39c7 v1.0.0-rc11
9 9 32b72f33756d3d10553b743f0b9f4504148cf97d v1.0.0-rc12
10 10 b88fac0e76c0e61e397e2995f468f7cf342afbc9 v1.0.0-rc13
11 11 a46488b209e8aac583c1634043147d87740c63b4 v1.0.0-rc14
12 12 1174538197f6796384e643f62100292f1377b137 v1.0.0-rc15
13 13 e8012fdf09ae442094f3831abe70649f8520659e 1.0.0-rc16
14 14 a1a1ef050ecc9d2a780e308cbffc2ad915a5c13d 1.0.0-rc17
15 15 5c6c7e16919cff4019a55661725789d287439b75 v1.0.0-rc18
16 16 3b6c4159c66cecf6c5957d33ad474919608489c6 v1.0.0
17 17 18383b2dcc1ae97fe3673242c84fe8d4b50a55f0 v1.0.1
18 18 ed3c20c09b000386b5204b483955eb61ee662eff v1.0.2
19 19 030ea350f98bb7a06e636251ea5cad403217e868 v1.0.3
20 20 346ba910a5425ed95f7a67ef4db1dbbf599e2cab v1.0.5
21 21 deb0ed6fb68015d89ba1287f6b766d1f80b6e4ff v1.0.7
22 22 dd0d589acfbbe938a5630547e83ad934ab597b64 v1.0.8
23 23 f2499237b5bf0ac2d73a8e437b8f15fa8a6306a5 v1.0.9
24 24 d4f0cdae9577caa605d9317d0ceb93f7b67895f3 v1.0.10
25 25 1a0018655d1c3dafc16ae50971cb535b8555e246 v1.1.0
26 26 ff3695b0a48f00155ad6c3d7e000fe185ee6f95a v1.1.1
27 27 c276b9b7fa833242d1bf75eeeaae9924feee51e8 v1.1.2
28 28 fdde09e66c009f4950ec69a393a8e789725da758 v1.2.0
29 29 16678c6055f20ce1a49bdca5e6dabce88ee8deed v1.2.1
30 30 bc7556143fe536e3df374bc0070146311884284e v1.2.2
31 31 e5bb5e80ce96fc4ca0fbbf0b5390443a616e99fa v1.2.3
32 32 cc5be30e84f8a0d9a8e25447651576d9e46ab154 v1.2.4
33 33 2807ab11174c7446830d91d5f1b652a18c6ecae5 v1.2.5
34 34 35a7b6319ebe24973fe10b8d82c19d3d86857b4e v1.2.6
35 35 70058deb750dc18fcd9be83c28cb8371530fffd8 v1.2.7
36 36 367f8caa5bf8fd2d4a56a6cff40be31ab82a2727 v1.2.8
37 37 15c829aa08a2cf65cc074640199e79c5e49d2c93 v1.3.0
38 38 1a190b3a757dc650c2c612073cbffaaa720b6090 v1.4.0
39 39 2ccfaae984e9a458580664923c87f258cff9890f v1.4.4
40 40 e9a9ed6d7647848e9c7f28e6e9f92d009259607d v1.5.0
41 41 aac297dda27dac1e86bc93617ba172a2c92415f5 v1.6.0
42 42 e07418577cbcaa6e31ccbf87c4495324ecbd88bd v1.6.1
43 43 bc1b4dd8ca1a03af095ea1a61169c6025ef37fce v1.6.2
44 44 fb2ea4d6aabac2b374f3b2bfddaca6a83dedf34f v1.6.3
45 45 cede47727a1b0867d19be1d168aceb00b5e74e3f v1.7.0
46 46 8095aad89415099d8e1942a8e67050c22e111d8b v1.7.1
47 47 66546e70973248de01601aa364eadf11dd2f3a9f v1.8.0
48 48 c7d9ad82b374c2a20710eba8fa40a9b1472ddc77 v1.8.1
49 49 f139e2153e0da5649558b1ed9369da860d442c0d v1.9.0-rc1
50 50 435ce00ba2452d565f1ed3bb884643b4b88b9cde v1.9.0-rc2
51 51 98b2c550c676ff6acf16f5bd56ee3804342f80b7 v1.9.0-rc3
52 52 515d1b83ebdfaa93599451175055a94f711c079f v1.9.0-rc4
53 53 894b8239b953c0384cf32cab46cf41dac97ea03b v1.9.0-rc5
54 63215d91ae4b711eeb2145b9685240e1afada904 v1.9.0-rc6
54 55 af4f8424e83d56e89a64f39e19514ca10dbd43c6 v1.9.0
@@ -1,411 +1,429
1 1 import { id as mid} from "module";
2 2 import { Cancellation } from "@implab/core-amd/Cancellation";
3 3 import { ICancellation } from "@implab/core-amd/interfaces";
4 4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
5 5 import { isPromise } from "@implab/core-amd/safe";
6 6
7 7 const trace = TraceSource.get(mid);
8 8
9 9 /**
10 10 * The interface for the consumer of an observable sequence
11 11 */
12 12 export interface Observer<T> {
13 13 /**
14 14 * Called for the next element in the sequence
15 15 */
16 16 next?(value: T): void;
17 17
18 18 /**
19 19 * Called once when the error occurs in the sequence.
20 20 */
21 21 error?(e: unknown): void;
22 22
23 23 /**
24 24 * Called once at the end of the sequence.
25 25 */
26 26 complete?(): void;
27 27 }
28 28
29 29 /**
30 30 * The group of functions to feed an observable. These methods are provided to
31 31 * the producer to generate a stream of events.
32 32 */
33 33 export type Sink<T> = {
34 34 /**
35 35 * Call to send the next element in the sequence
36 36 */
37 37 next: (value: T) => void;
38 38
39 39 /**
40 40 * Call to notify about the error occurred in the sequence.
41 41 */
42 42 error: (e: unknown) => void;
43 43
44 44 /**
45 45 * Call to signal the end of the sequence.
46 46 */
47 47 complete: () => void;
48 48
49 49 /**
50 50 * Checks whether the sink is accepting new elements. It's safe to
51 51 * send elements to the closed sink.
52 52 */
53 53 isClosed: () => boolean;
54 54 };
55 55
56 56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
57 57
58 58 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
59 59
60 60 type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
61 61
62 62 export interface Unsubscribable {
63 63 unsubscribe(): void;
64 64 }
65 65
66 66 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
67 67 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
68 68
69 69 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
70 70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
71 71
72 72 export interface Subscribable<T> {
73 subscribe(consumer: Observer<T>): Unsubscribable;
73 /** Subscribes a consumer to events. If a consumer isn't specified
74 * this method activates the producer to achieve side affects if any.
75 */
76 subscribe(consumer?: Observer<T>): Unsubscribable;
74 77 }
75 78
76 79 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
77 80
78 81 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
79 82
80 83 /** The observable source of items. */
81 84 export interface Observable<T> extends Subscribable<T> {
82 85 /** Transforms elements of the sequence with the specified mapper
83 86 *
84 87 * @param mapper The mapper used to transform the values
85 88 */
86 89 map<T2>(mapper: (value: T) => T2): Observable<T2>;
87 90
91 /** Injects the specified observer into the each producer to consumer chain.
92 * The method is used to add side effect to the events processing.
93 *
94 * @param observer The consumer for the events
95 */
96 tap(observer: Observer<T>): Observable<T>;
97
88 98 /** Filters elements of the sequence. The resulting sequence will
89 99 * contain only elements which match the specified predicate.
90 100 *
91 101 * @param predicate The filter predicate.
92 102 */
93 103 filter(predicate: (value: T) => boolean): Observable<T>;
94 104
95 105 /** Completes the sequence once the condition is met.
96 106 * @param predicate The condition which should be met to complete the sequence
97 107 */
98 108 until(predicate: (value: T) => boolean): Observable<T>;
99 109
100 110 /** Keeps the sequence running while elements satisfy the condition.
101 111 *
102 112 * @param predicate The condition which should be met to continue.
103 113 */
104 114 while(predicate: (value: T) => boolean): Observable<T>;
105 115
106 116 /** Applies accumulator to each value in the sequence and
107 117 * emits the accumulated value for each source element
108 118 *
109 119 * @param accumulator
110 120 * @param initial
111 121 */
112 122 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
113 123 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
114 124
115 125 /** Applies accumulator to each value in the sequence and
116 126 * emits the accumulated value at the end of the sequence
117 127 *
118 128 * @param accumulator
119 129 * @param initial
120 130 */
121 131 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
122 132 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
123 133
124 134 /** Concatenates the specified sequences with this observable
125 135 *
126 136 * @param seq sequences to concatenate with the current observable
127 137 *
128 138 * The concatenation doesn't accumulate values from the specified sequences,
129 139 * The result of the concatenation is the new observable which will switch
130 140 * to the next observable after the previous one completes. Values emitted
131 141 * before the next observable being active are lost.
132 142 */
133 143 cat(...seq: Subscribable<T>[]): Observable<T>;
134 144
135 145
136 146 /** Pipes the specified operator to produce the new observable
137 147 * @param op The operator consumes this observable and produces a new one
138 148 *
139 149 * The operator is a higher order function which takes a source observable
140 150 * and returns a producer for the new observable.
141 151 *
142 152 * This function can be used to create a complex mapping between source and
143 153 * resulting observables. The operator may have a state (or a side effect)
144 154 * and can be connected to multiple observables.
145 155 */
146 156 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
147 157
148 158 /** Waits for the next event to occur and returns a promise for the next value
149 159 * @param ct Cancellation token
150 160 */
151 161 next(ct?: ICancellation): Promise<T>;
152 162
153 163 /** Collects items of the sequence to the array. */
154 164 collect(ct?: ICancellation): Promise<T[]>;
155 165 }
156 166
157 167 const noop = () => { };
158 168
159 169 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
160 170
161 171 const sink = <T>(consumer: Observer<T>) => {
162 172 // eslint-disable-next-line @typescript-eslint/unbound-method
163 173 const { next, error, complete } = consumer;
164 174 return {
165 175 next: next ? next.bind(consumer) : noop,
166 176 error: error ? error.bind(consumer) : errorFallback, // report unhandled errors
167 177 complete: complete ? complete.bind(consumer) : noop
168 178 };
169 179 };
170 180
171 181 /** Wraps the producer to handle tear down logic and subscription management
172 182 *
173 183 * The resulting producer will invoke cleanup logic on error or complete events
174 184 * and will prevent calling of any method from the sink.
175 185 *
176 186 * @param producer The producer to wrap
177 187 * @returns The wrapper producer
178 188 */
179 189 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
180 190 let done = false;
181 191 let cleanup = noop;
182 192
183 193 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
184 194 (...args: A) => done ?
185 195 void (0) :
186 196 (done = true, cleanup(), fn(...args));
187 197
188 198 const _fin0 = () => done ? void (0) : (done = true, cleanup());
189 199
190 200 const safeSink = {
191 201 next: (value: T) => { !done && next(value); },
192 202 error: _fin(error),
193 203 complete: _fin(complete),
194 204 isClosed: () => done
195 205 };
196 206 // call the producer
197 207 cleanup = producer(safeSink) ?? noop;
198 208 // if the producer throws exception bypass it to the caller rather then to
199 209 // the sink. This is a feature.
200 210
201 211 // if the producer completed the sequence immediately call the cleanup in place
202 212 return done ? cleanup() : _fin0;
203 213 };
204 214
205 215 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
206 subscribe: (consumer: Observer<T>) => ({
216 subscribe: (consumer: Observer<T> = {}) => ({
207 217 unsubscribe: producer(sink(consumer)) ?? noop
208 218 }),
209 219
210 220 map: (mapper) => _observe(({ next, ...rest }) =>
211 221 producer({
212 222 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
213 223 ...rest
214 224 })
215 225 ),
216 226
227 tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) =>
228 producer({
229 next: tapNext ? (v => (tapNext(v), next(v))) : next,
230 complete: tapComplete ? (() => (tapComplete(), complete())): complete,
231 error: tapError ? (e => (tapError(e), error(e))) : error
232 })
233 ),
234
217 235 filter: (predicate) => _observe(({ next, ...rest }) =>
218 236 producer({
219 237 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
220 238 ...rest
221 239 })
222 240 ),
223 241
224 242 until: predicate => _observe(({ next, complete, ...rest }) =>
225 243 producer({
226 244 next: v => predicate(v) ? complete() : next(v),
227 245 complete,
228 246 ...rest
229 247 })
230 248 ),
231 249
232 250 while: predicate => _observe(({ next, complete, ...rest }) =>
233 251 producer({
234 252 next: v => predicate(v) ? next(v) : complete(),
235 253 complete,
236 254 ...rest
237 255 })
238 256 ),
239 257
240 258 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
241 259 if (args.length === 1) {
242 260 const [accumulator] = args;
243 261 let _acc: T;
244 262 let index = 0;
245 263 return producer({
246 264 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
247 265 ...rest
248 266 });
249 267 } else {
250 268 const [accumulator, initial] = args;
251 269 let _acc = initial;
252 270 return producer({
253 271 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
254 272 ...rest
255 273 });
256 274 }
257 275 }),
258 276
259 277 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error }) => {
260 278 if (args.length === 1) {
261 279 const [accumulator] = args;
262 280 let _acc: T;
263 281 let index = 0;
264 282 return producer({
265 283 next: next !== noop ? (v: T) => {
266 284 _acc = index++ === 0 ? v : accumulator(_acc, v);
267 285 } : noop,
268 286 complete: () => {
269 287 if (index === 0) {
270 288 error(new Error("The sequence can't be empty"));
271 289 } else {
272 290 next(_acc);
273 291 complete();
274 292 }
275 293 },
276 294 error
277 295 });
278 296 } else {
279 297 const [accumulator, initial] = args;
280 298 let _acc = initial;
281 299 return producer({
282 300 next: next !== noop ? (v: T) => {
283 301 _acc = accumulator(_acc, v);
284 302 } : noop,
285 303 complete: () => {
286 304 next(_acc);
287 305 complete();
288 306 },
289 307 error
290 308 });
291 309 }
292 310 }),
293 311
294 312 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
295 313 let cleanup: () => void;
296 314 const len = seq.length;
297 315 const complete = (i: number) => i < len ?
298 316 () => {
299 317 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
300 318 cleanup = subscription.unsubscribe.bind(subscription);
301 319 } : final;
302 320
303 321 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
304 322
305 323 return () => cleanup();
306 324 }),
307 325
308 326 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
309 327
310 328 next: collect(
311 329 producer,
312 330 ({ next, complete, error }) => ({
313 331 next: v => (next(v), complete()),
314 332 complete: () => error(new Error("The sequence is empty")),
315 333 error
316 334 })
317 335 ),
318 336
319 337 collect: collect(
320 338 producer,
321 339 ({ next, complete, error}) => {
322 340 const data: T[] = [];
323 341 return {
324 342 next: v => data.push(v),
325 343 complete: () => (next(data), complete()),
326 344 error
327 345 };
328 346 }
329 347 )
330 348 });
331 349
332 350 const collect = <T, U>(
333 351 producer: FusedProducer<T>,
334 352 collector: (result: FusedSink<U>) => FusedSink<T>
335 353 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
336 354 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
337 355 const h = ct.register(error);
338 356 const cleanup = !isClosed() ?
339 357 producer(collector({ next, complete, error })) ?? noop :
340 358 noop;
341 359
342 360 return () => {
343 361 h.destroy();
344 362 cleanup();
345 363 };
346 364 });
347 365
348 366 fused({
349 367 next: resolve,
350 368 error: reject,
351 369 complete: noop
352 370 });
353 371 });
354 372
355 373 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
356 374
357 375 /** Converts an array to the observable sequence of its elements. */
358 376 export const ofArray = <T>(items: T[]) => _observe<T>(
359 377 ({ next, complete }) => (
360 378 items.forEach(next),
361 379 complete()
362 380 )
363 381 );
364 382
365 383 /** Converts a subscribable to the observable */
366 384 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
367 385 observe<T>(sink => {
368 386 const subscription = subscribable.subscribe(sink);
369 387 return () => subscription.unsubscribe();
370 388 });
371 389
372 390 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
373 391 ({ next, error, complete }) =>
374 392 isPromise(item) ?
375 393 void item.then(
376 394 v => (next(v), complete()),
377 395 error
378 396 ) :
379 397 (next(item), complete())
380 398 );
381 399
382 400 /** Converts a list of parameter values to the observable sequence. The
383 401 * order of elements in the list will be preserved in the resulting sequence.
384 402 */
385 403 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
386 404 of1(items[0]) :
387 405 observe<T>(
388 406 ({ next, error, complete, isClosed }) => {
389 407 const n = items.length;
390 408
391 409 const _next = (start: number) => {
392 410 if (start > 0 && isClosed()) // when resumed
393 411 return;
394 412
395 413 for (let i = start; i < n; i++) {
396 414 const r = items[i];
397 415 if (isPromise(r)) {
398 416 r.then(v => (next(v), _next(i + 1)), error);
399 417 return; // suspend
400 418 } else {
401 419 next(r);
402 420 }
403 421 }
404 422 complete();
405 423 };
406 424
407 425 _next(0);
408 426 }
409 427 );
410 428
411 429 export const empty = _observe<never>(({ complete }) => complete());
@@ -1,125 +1,152
1 1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
3 3 import { observe, Observable, empty } from "./observable";
4 4 import { after } from "dojo/aspect";
5 5
6 6 export interface OrderedUpdate<T> {
7 7 /** The item is being updated */
8 8 readonly item: T;
9 9
10 10 /** The previous index of the item, -1 in case it is inserted */
11 11 readonly prevIndex: number;
12 12
13 13 /** The new index of the item, -1 in case it is deleted */
14 14 readonly newIndex: number;
15 15
16 16 }
17 17
18 18 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
19 19
20 20 interface DjObservableResults<T> {
21 21 /**
22 22 * Allows observation of results
23 23 */
24 24 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
25 25 remove(): void;
26 26 };
27 27 }
28 28
29 29 interface Queryable<T, Q, O> {
30 30 query(query?: Q, options?: O): PromiseOrValue<T[]>;
31 31 }
32 32
33 33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
34 34 v && (typeof (v as { observe?: unknown; }).observe === "function");
35 35
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
37 (query?: Q, options?: O & { observe?: boolean }) => {
38 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => {
37 const q = queryEx(store, includeUpdates);
38 return (query?: Q, options?: O & { observe?: boolean }) => {
39 const [data, updates] = q(query, options);
40
41 return options?.observe === false ? data : data.cat(updates);
42 };
43 };
39 44
45 export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
46 (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => {
47
48 const pending: T[] = [];
49
50 let results: PromiseOrValue<T[]> = pending;
51
52 const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => {
40 53 const processResults = (items: T[]) =>
41 54 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
42 55
43 56 try {
44 const results = store.query(query, options);
57 if (results === pending)
58 results = store.query(query, options);
59
45 60 if (isPromise(results)) {
46 results.then(processResults).then(undefined, error);
61 results.then(processResults).then(complete, error);
62
63 if (isCancellable(results))
64 return results.cancel.bind(results);
47 65 } else {
48 66 processResults(results);
67 complete();
49 68 }
69 } catch (e) {
70 error(e);
71 }
72 });
50 73
51 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
74 const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
75 try {
76 if (!isClosed() && isDjObservableResults<T>(results)) {
52 77 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
53 78 return () => h.remove();
54 79 } else {
55 80 complete();
56 81 }
57 } catch (err) {
58 error(err);
82 } catch (e) {
83 error(e);
59 84 }
60 85 });
61 86
87 return [data, updates];
62 88 };
63 89
90
64 91 interface IndexedStore<T> {
65 92 get(id: string | number): PromiseLike<T> | T | null | undefined;
66 93 }
67 94
68 95 interface Notifications<T> {
69 96 notify(item: T | undefined, id: string | number | undefined): void;
70 97 }
71 98
72 99 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
73 100 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
74 101
75 102 interface GetOpts {
76 103 observe?: boolean;
77 104 }
78 105
79 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
106 export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] |
80 107 [item: undefined | null, id: string | number];
81 108
82 109 const filterItem = (itemId: string | number) =>
83 <T>(source: Observable<Change<T>>) =>
110 <T>(source: Observable<ItemUpdate<T>>) =>
84 111 observe<T>(({ next, complete, error }) => {
85 112 const subscription = source
86 113 .filter(([, id]) => id === itemId)
87 114 .subscribe({
88 115 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
89 116 complete,
90 117 error
91 118 });
92 119 return () => subscription.unsubscribe();
93 120 });
94 121
95 122 export const get = <T>(store: IndexedStore<T>) => {
96 123 const changes = hasNotifications<T>(store) ?
97 observe<Change<T>>(({ next }) => {
98 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
124 observe<ItemUpdate<T>>(({ next }) => {
125 const handle = after(store, "notify", (...args: ItemUpdate<T>) => next(args), true);
99 126 return () => handle.remove();
100 127 }) : empty;
101 128
102 129 return (id: string | number, opts: GetOpts = {}) =>
103 130 observe<T>(({ next, complete, error }) => {
104 131 try {
105 132 const result = store.get(id);
106 133
107 134 const handle = (x: T | null | undefined) => {
108 135 if (x !== null && x !== undefined)
109 136 next(x);
110 137 complete();
111 138 };
112 139
113 140 if (isPromise(result)) {
114 141 result.then(handle).then(undefined, error);
115 142
116 143 if (isCancellable(result))
117 144 return () => result.cancel();
118 145 } else {
119 146 handle(result);
120 147 }
121 148 } catch (e) {
122 149 error(e);
123 150 }
124 151 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
125 152 }; No newline at end of file
@@ -1,65 +1,89
1 1 import Memory = require("dojo/store/Memory");
2 2 import Observerable = require("dojo/store/Observable");
3 import { get } from "./store";
3 import { get, queryEx } from "./store";
4 4 import tap = require("tap");
5 5
6 6 interface Person {
7 7 id: string;
8 8
9 9 name: string;
10 10
11 11 age: number;
12 12 }
13 13
14 14 tap.test("store::get(...) tests", async t => {
15 15 const store = new Observerable(new Memory<Person>());
16 16
17 17 const getPerson = get(store);
18 18
19 19 const peterId = "id:peter";
20 20
21 21 const samId = "id:sam";
22 22
23 23 const peter = getPerson(peterId);
24 24 const sam = getPerson(samId);
25 25
26 26 const seq1 = await getPerson(peterId, { observe: false }).collect();
27 27
28 28 t.ok(seq1.length === 0, "Should be empty sequence");
29 29
30 30 let peterChangeCount = 0;
31 31 let samChangeCount = 0;
32 32 let peterDeleted = 0;
33 33
34 34 const peterSubscription = peter.subscribe({
35 35 next: () => peterChangeCount++,
36 36 complete: () => peterDeleted++
37 37 });
38 38 const samSubscription = sam.subscribe({
39 39 next: () => samChangeCount++
40 40 });
41 41
42 42 try {
43 43 t.equal(peterChangeCount, 0, "Should be no changes recorded");
44 44
45 45 store.put({id: peterId, name: "Peter", age: 30 });
46 46
47 47 t.equal(peterChangeCount, 1, "Should record 1 object change");
48 48 t.equal(samChangeCount, 0, "Should not record other object changes");
49 49
50 50 store.remove(peterId);
51 51
52 52 t.equal(peterDeleted, 1, "Should complete sequence");
53 53 t.equal(peterChangeCount, 1, "Should not record remove operations");
54 54
55 55 store.put({id: peterId, name: "Peter", age: 29});
56 56
57 57 t.equal(peterChangeCount, 1, "Should not record changes after completion");
58 58
59 59 } finally {
60 60 peterSubscription.unsubscribe();
61 61 samSubscription.unsubscribe();
62 62 }
63 63
64 store.put({ id: samId, name: "Sam", age: 29});
65
66 const [data, updates] = queryEx(store)({ age: 29}, { sort: [{attribute: "id"}] });
67
68 const dump: string[] = [];
69
70 const subscription = data
71 .tap({
72 complete: () => dump.push("eof")
73 })
74 .cat(updates)
75 .tap({
76 next: ({item: {id}}) => dump.push(id),
77 complete: () => dump.push("eof")
78 })
79 .subscribe({});
80
81 t.same(dump, ["id:peter", "id:sam", "eof"]);
82
83 store.put({ id: "id:mary", name: "Mary", age: 29});
84
85 t.same(dump, ["id:peter", "id:sam", "eof", "id:mary"]);
86
87 subscription.unsubscribe();
64 88
65 89 }).catch(() => { }); No newline at end of file
@@ -1,150 +1,173
1 1 import { empty, observe, of } from "./observable";
2 2 import * as tap from "tap";
3 3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 4 import { delay } from "@implab/core-amd/safe";
5 5
6 6 const subj1 = observe<number>(({ next, complete }) => {
7 7 next(1);
8 8 complete();
9 9 next(2);
10 10 });
11 11
12 12 const consumer1 = {
13 13 sum: 0,
14 14 next(v: number) {
15 15 this.sum += v;
16 16 }
17 17 };
18 18
19 19 subj1.subscribe(consumer1);
20 20 tap.equal(consumer1.sum, 1, "Should get only one value");
21 21
22 22 subj1.subscribe(consumer1);
23 23 tap.equal(consumer1.sum, 2, "Should get the value again");
24 24
25 25 const consumer2 = {
26 26 value: 0,
27 27 completed: false,
28 28 next(v: number) { this.value = v; },
29 29 complete() { this.completed = true; }
30 30 };
31 31
32 32 let maps = 0;
33 33
34 34 subj1
35 35 .map(v => {
36 36 tap.comment(`map1: ${v * 2}`);
37 37 maps++;
38 38 return v * 2;
39 39 })
40 40 .map(v => {
41 41 tap.comment(`map2: ${v * 2}`);
42 42 maps++;
43 43 return v * 2;
44 44 })
45 45 .map(v => {
46 46 tap.comment(`map3: ${v * 2}`);
47 47 maps++;
48 48 return v * 2;
49 49 })
50 50 .subscribe(consumer2);
51 51
52 52 tap.equal(consumer2.value, 8, "Should map");
53 53 tap.equal(maps, 3, "The map chain should not be executed after completion");
54 54 tap.ok(consumer2.completed, "The completion signal should pass through");
55 55
56 56 const subj2 = observe<number>(({ next, complete }) => {
57 57 [1, 2, 3, 4, 5].forEach(next);
58 58 complete();
59 59 return () => {
60 60 tap.comment("subj2: unsubscribe");
61 61 };
62 62 });
63 63
64 64 const consumer3 = {
65 65 even: 0,
66 66 odd: 0,
67 67 completed: false,
68 68 subscribed: 0,
69 69 unsubscribed: 0,
70 70 next(v: "even" | "odd") {
71 71 this[v]++;
72 72 },
73 73 complete() {
74 74 this.completed = true;
75 75 },
76 76 subscribe() {
77 77 this.subscribed++;
78 78 },
79 79 unsubscribe() {
80 80 this.unsubscribed++;
81 81 }
82 82 };
83 83
84 84
85 85 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
86 86 consumer3.subscribe();
87 87 let count = 0;
88 88 const h = self.subscribe({
89 89 next: val => {
90 90 if (val % 2 === 0)
91 91 next("odd");
92 92 else
93 93 next("even");
94 94 if (++count === 4)
95 95 complete();
96 96 },
97 97 complete,
98 98 error
99 99 });
100 100 return () => {
101 101 consumer3.unsubscribe();
102 102 h.unsubscribe();
103 103 };
104 104 }));
105 105
106 106 subj3.subscribe(consumer3);
107 107
108 108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
109 109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
110 110 tap.ok(consumer3.completed, "The sequence should completed");
111 111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
112 112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
113 113
114 114 subj2.reduce((a, b) => a + b).subscribe({
115 115 next: val => tap.comment("subj2: reduce =", val),
116 116 complete: () => tap.comment("subj2: complete")
117 117 });
118 118
119 119 tap.test("of(...) tests", async t => {
120 120 await subj2.reduce((a, b) => a + b).next()
121 121 .then(value => t.comment("subj2: next reduce=", value));
122 122
123 123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124 124
125 125 const cancelled = new Cancellation(cancel => cancel());
126 126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127 127
128 128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129 129
130 130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 131 .reduce<number[]>((a, x) => [...a, x], [])
132 132 .next()
133 133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
134 134
135 135 const rejected = Promise.reject("DIE!");
136 136 rejected.catch(() => { }); // SAFE AND SOUND
137 137
138 138 await t.resolves(
139 139 of(Promise.resolve(1), rejected).next(),
140 140 "of(...) should emit non-rejected items"
141 141 );
142 142 await t.rejects(
143 143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 144 "of(...) should terminate with error when a parameter is rejected"
145 145 );
146 146
147 147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149 149
150 }).catch(() => { });
151
152 tap.test(".tap() tests", async t => {
153 const side: number[] = [];
154
155 of(1,2)
156 .tap({next: v => side.push(v), complete: () => side.push(0)})
157 .tap({next: v => side.push(v*v)})
158 .subscribe({});
159
160 t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration");
161
162 side.length = 0;
163
164 await new Promise<void>(resolve => {
165 of(1,2,delay(1).then(() => 3))
166 .tap({next: v => side.push(v)})
167 .tap({ next: v => v === 1 && resolve()})
168 .subscribe({});
169 });
170
171 t.same(side, [1,2], ".tap() should be processed synchronously");
172
150 173 }).catch(() => { }); No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now