##// END OF EJS Templates
added tap() method to observable...
cin -
r144:63215d91ae4b v1.9.0-rc6 default
parent child
Show More
@@ -1,411 +1,429
1 1 import { id as mid} from "module";
2 2 import { Cancellation } from "@implab/core-amd/Cancellation";
3 3 import { ICancellation } from "@implab/core-amd/interfaces";
4 4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
5 5 import { isPromise } from "@implab/core-amd/safe";
6 6
7 7 const trace = TraceSource.get(mid);
8 8
9 9 /**
10 10 * The interface for the consumer of an observable sequence
11 11 */
12 12 export interface Observer<T> {
13 13 /**
14 14 * Called for the next element in the sequence
15 15 */
16 16 next?(value: T): void;
17 17
18 18 /**
19 19 * Called once when the error occurs in the sequence.
20 20 */
21 21 error?(e: unknown): void;
22 22
23 23 /**
24 24 * Called once at the end of the sequence.
25 25 */
26 26 complete?(): void;
27 27 }
28 28
29 29 /**
30 30 * The group of functions to feed an observable. These methods are provided to
31 31 * the producer to generate a stream of events.
32 32 */
33 33 export type Sink<T> = {
34 34 /**
35 35 * Call to send the next element in the sequence
36 36 */
37 37 next: (value: T) => void;
38 38
39 39 /**
40 40 * Call to notify about the error occurred in the sequence.
41 41 */
42 42 error: (e: unknown) => void;
43 43
44 44 /**
45 45 * Call to signal the end of the sequence.
46 46 */
47 47 complete: () => void;
48 48
49 49 /**
50 50 * Checks whether the sink is accepting new elements. It's safe to
51 51 * send elements to the closed sink.
52 52 */
53 53 isClosed: () => boolean;
54 54 };
55 55
56 56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
57 57
58 58 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
59 59
60 60 type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
61 61
62 62 export interface Unsubscribable {
63 63 unsubscribe(): void;
64 64 }
65 65
66 66 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
67 67 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
68 68
69 69 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
70 70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
71 71
72 72 export interface Subscribable<T> {
73 subscribe(consumer: Observer<T>): Unsubscribable;
73 /** Subscribes a consumer to events. If a consumer isn't specified
74 * this method activates the producer to achieve side affects if any.
75 */
76 subscribe(consumer?: Observer<T>): Unsubscribable;
74 77 }
75 78
76 79 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
77 80
78 81 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
79 82
80 83 /** The observable source of items. */
81 84 export interface Observable<T> extends Subscribable<T> {
82 85 /** Transforms elements of the sequence with the specified mapper
83 86 *
84 87 * @param mapper The mapper used to transform the values
85 88 */
86 89 map<T2>(mapper: (value: T) => T2): Observable<T2>;
87 90
91 /** Injects the specified observer into the each producer to consumer chain.
92 * The method is used to add side effect to the events processing.
93 *
94 * @param observer The consumer for the events
95 */
96 tap(observer: Observer<T>): Observable<T>;
97
88 98 /** Filters elements of the sequence. The resulting sequence will
89 99 * contain only elements which match the specified predicate.
90 100 *
91 101 * @param predicate The filter predicate.
92 102 */
93 103 filter(predicate: (value: T) => boolean): Observable<T>;
94 104
95 105 /** Completes the sequence once the condition is met.
96 106 * @param predicate The condition which should be met to complete the sequence
97 107 */
98 108 until(predicate: (value: T) => boolean): Observable<T>;
99 109
100 110 /** Keeps the sequence running while elements satisfy the condition.
101 111 *
102 112 * @param predicate The condition which should be met to continue.
103 113 */
104 114 while(predicate: (value: T) => boolean): Observable<T>;
105 115
106 116 /** Applies accumulator to each value in the sequence and
107 117 * emits the accumulated value for each source element
108 118 *
109 119 * @param accumulator
110 120 * @param initial
111 121 */
112 122 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
113 123 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
114 124
115 125 /** Applies accumulator to each value in the sequence and
116 126 * emits the accumulated value at the end of the sequence
117 127 *
118 128 * @param accumulator
119 129 * @param initial
120 130 */
121 131 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
122 132 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
123 133
124 134 /** Concatenates the specified sequences with this observable
125 135 *
126 136 * @param seq sequences to concatenate with the current observable
127 137 *
128 138 * The concatenation doesn't accumulate values from the specified sequences,
129 139 * The result of the concatenation is the new observable which will switch
130 140 * to the next observable after the previous one completes. Values emitted
131 141 * before the next observable being active are lost.
132 142 */
133 143 cat(...seq: Subscribable<T>[]): Observable<T>;
134 144
135 145
136 146 /** Pipes the specified operator to produce the new observable
137 147 * @param op The operator consumes this observable and produces a new one
138 148 *
139 149 * The operator is a higher order function which takes a source observable
140 150 * and returns a producer for the new observable.
141 151 *
142 152 * This function can be used to create a complex mapping between source and
143 153 * resulting observables. The operator may have a state (or a side effect)
144 154 * and can be connected to multiple observables.
145 155 */
146 156 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
147 157
148 158 /** Waits for the next event to occur and returns a promise for the next value
149 159 * @param ct Cancellation token
150 160 */
151 161 next(ct?: ICancellation): Promise<T>;
152 162
153 163 /** Collects items of the sequence to the array. */
154 164 collect(ct?: ICancellation): Promise<T[]>;
155 165 }
156 166
157 167 const noop = () => { };
158 168
159 169 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
160 170
161 171 const sink = <T>(consumer: Observer<T>) => {
162 172 // eslint-disable-next-line @typescript-eslint/unbound-method
163 173 const { next, error, complete } = consumer;
164 174 return {
165 175 next: next ? next.bind(consumer) : noop,
166 176 error: error ? error.bind(consumer) : errorFallback, // report unhandled errors
167 177 complete: complete ? complete.bind(consumer) : noop
168 178 };
169 179 };
170 180
171 181 /** Wraps the producer to handle tear down logic and subscription management
172 182 *
173 183 * The resulting producer will invoke cleanup logic on error or complete events
174 184 * and will prevent calling of any method from the sink.
175 185 *
176 186 * @param producer The producer to wrap
177 187 * @returns The wrapper producer
178 188 */
179 189 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
180 190 let done = false;
181 191 let cleanup = noop;
182 192
183 193 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
184 194 (...args: A) => done ?
185 195 void (0) :
186 196 (done = true, cleanup(), fn(...args));
187 197
188 198 const _fin0 = () => done ? void (0) : (done = true, cleanup());
189 199
190 200 const safeSink = {
191 201 next: (value: T) => { !done && next(value); },
192 202 error: _fin(error),
193 203 complete: _fin(complete),
194 204 isClosed: () => done
195 205 };
196 206 // call the producer
197 207 cleanup = producer(safeSink) ?? noop;
198 208 // if the producer throws exception bypass it to the caller rather then to
199 209 // the sink. This is a feature.
200 210
201 211 // if the producer completed the sequence immediately call the cleanup in place
202 212 return done ? cleanup() : _fin0;
203 213 };
204 214
205 215 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
206 subscribe: (consumer: Observer<T>) => ({
216 subscribe: (consumer: Observer<T> = {}) => ({
207 217 unsubscribe: producer(sink(consumer)) ?? noop
208 218 }),
209 219
210 220 map: (mapper) => _observe(({ next, ...rest }) =>
211 221 producer({
212 222 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
213 223 ...rest
214 224 })
215 225 ),
216 226
227 tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) =>
228 producer({
229 next: tapNext ? (v => (tapNext(v), next(v))) : next,
230 complete: tapComplete ? (() => (tapComplete(), complete())): complete,
231 error: tapError ? (e => (tapError(e), error(e))) : error
232 })
233 ),
234
217 235 filter: (predicate) => _observe(({ next, ...rest }) =>
218 236 producer({
219 237 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
220 238 ...rest
221 239 })
222 240 ),
223 241
224 242 until: predicate => _observe(({ next, complete, ...rest }) =>
225 243 producer({
226 244 next: v => predicate(v) ? complete() : next(v),
227 245 complete,
228 246 ...rest
229 247 })
230 248 ),
231 249
232 250 while: predicate => _observe(({ next, complete, ...rest }) =>
233 251 producer({
234 252 next: v => predicate(v) ? next(v) : complete(),
235 253 complete,
236 254 ...rest
237 255 })
238 256 ),
239 257
240 258 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
241 259 if (args.length === 1) {
242 260 const [accumulator] = args;
243 261 let _acc: T;
244 262 let index = 0;
245 263 return producer({
246 264 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
247 265 ...rest
248 266 });
249 267 } else {
250 268 const [accumulator, initial] = args;
251 269 let _acc = initial;
252 270 return producer({
253 271 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
254 272 ...rest
255 273 });
256 274 }
257 275 }),
258 276
259 277 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error }) => {
260 278 if (args.length === 1) {
261 279 const [accumulator] = args;
262 280 let _acc: T;
263 281 let index = 0;
264 282 return producer({
265 283 next: next !== noop ? (v: T) => {
266 284 _acc = index++ === 0 ? v : accumulator(_acc, v);
267 285 } : noop,
268 286 complete: () => {
269 287 if (index === 0) {
270 288 error(new Error("The sequence can't be empty"));
271 289 } else {
272 290 next(_acc);
273 291 complete();
274 292 }
275 293 },
276 294 error
277 295 });
278 296 } else {
279 297 const [accumulator, initial] = args;
280 298 let _acc = initial;
281 299 return producer({
282 300 next: next !== noop ? (v: T) => {
283 301 _acc = accumulator(_acc, v);
284 302 } : noop,
285 303 complete: () => {
286 304 next(_acc);
287 305 complete();
288 306 },
289 307 error
290 308 });
291 309 }
292 310 }),
293 311
294 312 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
295 313 let cleanup: () => void;
296 314 const len = seq.length;
297 315 const complete = (i: number) => i < len ?
298 316 () => {
299 317 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
300 318 cleanup = subscription.unsubscribe.bind(subscription);
301 319 } : final;
302 320
303 321 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
304 322
305 323 return () => cleanup();
306 324 }),
307 325
308 326 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
309 327
310 328 next: collect(
311 329 producer,
312 330 ({ next, complete, error }) => ({
313 331 next: v => (next(v), complete()),
314 332 complete: () => error(new Error("The sequence is empty")),
315 333 error
316 334 })
317 335 ),
318 336
319 337 collect: collect(
320 338 producer,
321 339 ({ next, complete, error}) => {
322 340 const data: T[] = [];
323 341 return {
324 342 next: v => data.push(v),
325 343 complete: () => (next(data), complete()),
326 344 error
327 345 };
328 346 }
329 347 )
330 348 });
331 349
332 350 const collect = <T, U>(
333 351 producer: FusedProducer<T>,
334 352 collector: (result: FusedSink<U>) => FusedSink<T>
335 353 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
336 354 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
337 355 const h = ct.register(error);
338 356 const cleanup = !isClosed() ?
339 357 producer(collector({ next, complete, error })) ?? noop :
340 358 noop;
341 359
342 360 return () => {
343 361 h.destroy();
344 362 cleanup();
345 363 };
346 364 });
347 365
348 366 fused({
349 367 next: resolve,
350 368 error: reject,
351 369 complete: noop
352 370 });
353 371 });
354 372
355 373 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
356 374
357 375 /** Converts an array to the observable sequence of its elements. */
358 376 export const ofArray = <T>(items: T[]) => _observe<T>(
359 377 ({ next, complete }) => (
360 378 items.forEach(next),
361 379 complete()
362 380 )
363 381 );
364 382
365 383 /** Converts a subscribable to the observable */
366 384 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
367 385 observe<T>(sink => {
368 386 const subscription = subscribable.subscribe(sink);
369 387 return () => subscription.unsubscribe();
370 388 });
371 389
372 390 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
373 391 ({ next, error, complete }) =>
374 392 isPromise(item) ?
375 393 void item.then(
376 394 v => (next(v), complete()),
377 395 error
378 396 ) :
379 397 (next(item), complete())
380 398 );
381 399
382 400 /** Converts a list of parameter values to the observable sequence. The
383 401 * order of elements in the list will be preserved in the resulting sequence.
384 402 */
385 403 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
386 404 of1(items[0]) :
387 405 observe<T>(
388 406 ({ next, error, complete, isClosed }) => {
389 407 const n = items.length;
390 408
391 409 const _next = (start: number) => {
392 410 if (start > 0 && isClosed()) // when resumed
393 411 return;
394 412
395 413 for (let i = start; i < n; i++) {
396 414 const r = items[i];
397 415 if (isPromise(r)) {
398 416 r.then(v => (next(v), _next(i + 1)), error);
399 417 return; // suspend
400 418 } else {
401 419 next(r);
402 420 }
403 421 }
404 422 complete();
405 423 };
406 424
407 425 _next(0);
408 426 }
409 427 );
410 428
411 429 export const empty = _observe<never>(({ complete }) => complete());
@@ -1,125 +1,152
1 1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
3 3 import { observe, Observable, empty } from "./observable";
4 4 import { after } from "dojo/aspect";
5 5
6 6 export interface OrderedUpdate<T> {
7 7 /** The item is being updated */
8 8 readonly item: T;
9 9
10 10 /** The previous index of the item, -1 in case it is inserted */
11 11 readonly prevIndex: number;
12 12
13 13 /** The new index of the item, -1 in case it is deleted */
14 14 readonly newIndex: number;
15 15
16 16 }
17 17
18 18 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
19 19
20 20 interface DjObservableResults<T> {
21 21 /**
22 22 * Allows observation of results
23 23 */
24 24 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
25 25 remove(): void;
26 26 };
27 27 }
28 28
29 29 interface Queryable<T, Q, O> {
30 30 query(query?: Q, options?: O): PromiseOrValue<T[]>;
31 31 }
32 32
33 33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
34 34 v && (typeof (v as { observe?: unknown; }).observe === "function");
35 35
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
37 (query?: Q, options?: O & { observe?: boolean }) => {
38 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => {
37 const q = queryEx(store, includeUpdates);
38 return (query?: Q, options?: O & { observe?: boolean }) => {
39 const [data, updates] = q(query, options);
40
41 return options?.observe === false ? data : data.cat(updates);
42 };
43 };
39 44
45 export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
46 (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => {
47
48 const pending: T[] = [];
49
50 let results: PromiseOrValue<T[]> = pending;
51
52 const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => {
40 53 const processResults = (items: T[]) =>
41 54 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
42 55
43 56 try {
44 const results = store.query(query, options);
57 if (results === pending)
58 results = store.query(query, options);
59
45 60 if (isPromise(results)) {
46 results.then(processResults).then(undefined, error);
61 results.then(processResults).then(complete, error);
62
63 if (isCancellable(results))
64 return results.cancel.bind(results);
47 65 } else {
48 66 processResults(results);
67 complete();
49 68 }
69 } catch (e) {
70 error(e);
71 }
72 });
50 73
51 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
74 const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
75 try {
76 if (!isClosed() && isDjObservableResults<T>(results)) {
52 77 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
53 78 return () => h.remove();
54 79 } else {
55 80 complete();
56 81 }
57 } catch (err) {
58 error(err);
82 } catch (e) {
83 error(e);
59 84 }
60 85 });
61 86
87 return [data, updates];
62 88 };
63 89
90
64 91 interface IndexedStore<T> {
65 92 get(id: string | number): PromiseLike<T> | T | null | undefined;
66 93 }
67 94
68 95 interface Notifications<T> {
69 96 notify(item: T | undefined, id: string | number | undefined): void;
70 97 }
71 98
72 99 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
73 100 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
74 101
75 102 interface GetOpts {
76 103 observe?: boolean;
77 104 }
78 105
79 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
106 export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] |
80 107 [item: undefined | null, id: string | number];
81 108
82 109 const filterItem = (itemId: string | number) =>
83 <T>(source: Observable<Change<T>>) =>
110 <T>(source: Observable<ItemUpdate<T>>) =>
84 111 observe<T>(({ next, complete, error }) => {
85 112 const subscription = source
86 113 .filter(([, id]) => id === itemId)
87 114 .subscribe({
88 115 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
89 116 complete,
90 117 error
91 118 });
92 119 return () => subscription.unsubscribe();
93 120 });
94 121
95 122 export const get = <T>(store: IndexedStore<T>) => {
96 123 const changes = hasNotifications<T>(store) ?
97 observe<Change<T>>(({ next }) => {
98 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
124 observe<ItemUpdate<T>>(({ next }) => {
125 const handle = after(store, "notify", (...args: ItemUpdate<T>) => next(args), true);
99 126 return () => handle.remove();
100 127 }) : empty;
101 128
102 129 return (id: string | number, opts: GetOpts = {}) =>
103 130 observe<T>(({ next, complete, error }) => {
104 131 try {
105 132 const result = store.get(id);
106 133
107 134 const handle = (x: T | null | undefined) => {
108 135 if (x !== null && x !== undefined)
109 136 next(x);
110 137 complete();
111 138 };
112 139
113 140 if (isPromise(result)) {
114 141 result.then(handle).then(undefined, error);
115 142
116 143 if (isCancellable(result))
117 144 return () => result.cancel();
118 145 } else {
119 146 handle(result);
120 147 }
121 148 } catch (e) {
122 149 error(e);
123 150 }
124 151 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
125 152 }; No newline at end of file
@@ -1,65 +1,89
1 1 import Memory = require("dojo/store/Memory");
2 2 import Observerable = require("dojo/store/Observable");
3 import { get } from "./store";
3 import { get, queryEx } from "./store";
4 4 import tap = require("tap");
5 5
6 6 interface Person {
7 7 id: string;
8 8
9 9 name: string;
10 10
11 11 age: number;
12 12 }
13 13
14 14 tap.test("store::get(...) tests", async t => {
15 15 const store = new Observerable(new Memory<Person>());
16 16
17 17 const getPerson = get(store);
18 18
19 19 const peterId = "id:peter";
20 20
21 21 const samId = "id:sam";
22 22
23 23 const peter = getPerson(peterId);
24 24 const sam = getPerson(samId);
25 25
26 26 const seq1 = await getPerson(peterId, { observe: false }).collect();
27 27
28 28 t.ok(seq1.length === 0, "Should be empty sequence");
29 29
30 30 let peterChangeCount = 0;
31 31 let samChangeCount = 0;
32 32 let peterDeleted = 0;
33 33
34 34 const peterSubscription = peter.subscribe({
35 35 next: () => peterChangeCount++,
36 36 complete: () => peterDeleted++
37 37 });
38 38 const samSubscription = sam.subscribe({
39 39 next: () => samChangeCount++
40 40 });
41 41
42 42 try {
43 43 t.equal(peterChangeCount, 0, "Should be no changes recorded");
44 44
45 45 store.put({id: peterId, name: "Peter", age: 30 });
46 46
47 47 t.equal(peterChangeCount, 1, "Should record 1 object change");
48 48 t.equal(samChangeCount, 0, "Should not record other object changes");
49 49
50 50 store.remove(peterId);
51 51
52 52 t.equal(peterDeleted, 1, "Should complete sequence");
53 53 t.equal(peterChangeCount, 1, "Should not record remove operations");
54 54
55 55 store.put({id: peterId, name: "Peter", age: 29});
56 56
57 57 t.equal(peterChangeCount, 1, "Should not record changes after completion");
58 58
59 59 } finally {
60 60 peterSubscription.unsubscribe();
61 61 samSubscription.unsubscribe();
62 62 }
63 63
64 store.put({ id: samId, name: "Sam", age: 29});
65
66 const [data, updates] = queryEx(store)({ age: 29}, { sort: [{attribute: "id"}] });
67
68 const dump: string[] = [];
69
70 const subscription = data
71 .tap({
72 complete: () => dump.push("eof")
73 })
74 .cat(updates)
75 .tap({
76 next: ({item: {id}}) => dump.push(id),
77 complete: () => dump.push("eof")
78 })
79 .subscribe({});
80
81 t.same(dump, ["id:peter", "id:sam", "eof"]);
82
83 store.put({ id: "id:mary", name: "Mary", age: 29});
84
85 t.same(dump, ["id:peter", "id:sam", "eof", "id:mary"]);
86
87 subscription.unsubscribe();
64 88
65 89 }).catch(() => { }); No newline at end of file
@@ -1,150 +1,173
1 1 import { empty, observe, of } from "./observable";
2 2 import * as tap from "tap";
3 3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 4 import { delay } from "@implab/core-amd/safe";
5 5
6 6 const subj1 = observe<number>(({ next, complete }) => {
7 7 next(1);
8 8 complete();
9 9 next(2);
10 10 });
11 11
12 12 const consumer1 = {
13 13 sum: 0,
14 14 next(v: number) {
15 15 this.sum += v;
16 16 }
17 17 };
18 18
19 19 subj1.subscribe(consumer1);
20 20 tap.equal(consumer1.sum, 1, "Should get only one value");
21 21
22 22 subj1.subscribe(consumer1);
23 23 tap.equal(consumer1.sum, 2, "Should get the value again");
24 24
25 25 const consumer2 = {
26 26 value: 0,
27 27 completed: false,
28 28 next(v: number) { this.value = v; },
29 29 complete() { this.completed = true; }
30 30 };
31 31
32 32 let maps = 0;
33 33
34 34 subj1
35 35 .map(v => {
36 36 tap.comment(`map1: ${v * 2}`);
37 37 maps++;
38 38 return v * 2;
39 39 })
40 40 .map(v => {
41 41 tap.comment(`map2: ${v * 2}`);
42 42 maps++;
43 43 return v * 2;
44 44 })
45 45 .map(v => {
46 46 tap.comment(`map3: ${v * 2}`);
47 47 maps++;
48 48 return v * 2;
49 49 })
50 50 .subscribe(consumer2);
51 51
52 52 tap.equal(consumer2.value, 8, "Should map");
53 53 tap.equal(maps, 3, "The map chain should not be executed after completion");
54 54 tap.ok(consumer2.completed, "The completion signal should pass through");
55 55
56 56 const subj2 = observe<number>(({ next, complete }) => {
57 57 [1, 2, 3, 4, 5].forEach(next);
58 58 complete();
59 59 return () => {
60 60 tap.comment("subj2: unsubscribe");
61 61 };
62 62 });
63 63
64 64 const consumer3 = {
65 65 even: 0,
66 66 odd: 0,
67 67 completed: false,
68 68 subscribed: 0,
69 69 unsubscribed: 0,
70 70 next(v: "even" | "odd") {
71 71 this[v]++;
72 72 },
73 73 complete() {
74 74 this.completed = true;
75 75 },
76 76 subscribe() {
77 77 this.subscribed++;
78 78 },
79 79 unsubscribe() {
80 80 this.unsubscribed++;
81 81 }
82 82 };
83 83
84 84
85 85 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
86 86 consumer3.subscribe();
87 87 let count = 0;
88 88 const h = self.subscribe({
89 89 next: val => {
90 90 if (val % 2 === 0)
91 91 next("odd");
92 92 else
93 93 next("even");
94 94 if (++count === 4)
95 95 complete();
96 96 },
97 97 complete,
98 98 error
99 99 });
100 100 return () => {
101 101 consumer3.unsubscribe();
102 102 h.unsubscribe();
103 103 };
104 104 }));
105 105
106 106 subj3.subscribe(consumer3);
107 107
108 108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
109 109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
110 110 tap.ok(consumer3.completed, "The sequence should completed");
111 111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
112 112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
113 113
114 114 subj2.reduce((a, b) => a + b).subscribe({
115 115 next: val => tap.comment("subj2: reduce =", val),
116 116 complete: () => tap.comment("subj2: complete")
117 117 });
118 118
119 119 tap.test("of(...) tests", async t => {
120 120 await subj2.reduce((a, b) => a + b).next()
121 121 .then(value => t.comment("subj2: next reduce=", value));
122 122
123 123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124 124
125 125 const cancelled = new Cancellation(cancel => cancel());
126 126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127 127
128 128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129 129
130 130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 131 .reduce<number[]>((a, x) => [...a, x], [])
132 132 .next()
133 133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
134 134
135 135 const rejected = Promise.reject("DIE!");
136 136 rejected.catch(() => { }); // SAFE AND SOUND
137 137
138 138 await t.resolves(
139 139 of(Promise.resolve(1), rejected).next(),
140 140 "of(...) should emit non-rejected items"
141 141 );
142 142 await t.rejects(
143 143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 144 "of(...) should terminate with error when a parameter is rejected"
145 145 );
146 146
147 147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149 149
150 }).catch(() => { }); No newline at end of file
150 }).catch(() => { });
151
152 tap.test(".tap() tests", async t => {
153 const side: number[] = [];
154
155 of(1,2)
156 .tap({next: v => side.push(v), complete: () => side.push(0)})
157 .tap({next: v => side.push(v*v)})
158 .subscribe({});
159
160 t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration");
161
162 side.length = 0;
163
164 await new Promise<void>(resolve => {
165 of(1,2,delay(1).then(() => 3))
166 .tap({next: v => side.push(v)})
167 .tap({ next: v => v === 1 && resolve()})
168 .subscribe({});
169 });
170
171 t.same(side, [1,2], ".tap() should be processed synchronously");
172
173 }).catch(() => {}); No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now