// observable-tests.ts
// Source-viewer listing metadata: 172 lines | 4.6 KiB | video/mp2t | TypeScriptLexer
import { empty, observe, of } from "./observable";
import * as tap from "tap";
import { Cancellation } from "@implab/core-amd/Cancellation";
import { delay } from "@implab/core-amd/safe";
|
|

// Producer that emits 1, signals completion and then tries to emit 2.
// The assertions below expect the value sent after completion to be dropped.
const subj1 = observe<number>(({ next, complete }) => {
    next(1);
    complete();
    next(2); // sent after complete(); must not reach the consumer
});

// Accumulates every received value so the test can check what was delivered.
const consumer1 = {
    sum: 0,
    next(v: number) {
        this.sum += v;
    }
};

subj1.subscribe(consumer1);
tap.equal(consumer1.sum, 1, "Should get only one value");

// A second subscription is expected to deliver the value once more (1 + 1).
subj1.subscribe(consumer1);
tap.equal(consumer1.sum, 2, "Should get the value again");
|
|

// Records the last value received and whether completion was signalled.
const consumer2 = {
    value: 0,
    completed: false,
    next(v: number) { this.value = v; },
    complete() { this.completed = true; }
};

// Number of times any mapper in the chain below has been invoked.
let maps = 0;

// Builds a mapper that logs the doubled value under `label`, bumps the
// invocation counter and returns the doubled value.
const tracedDouble = (label: string) => (v: number) => {
    tap.comment(`${label}: ${v * 2}`);
    maps++;
    return v * 2;
};

subj1
    .map(tracedDouble("map1"))
    .map(tracedDouble("map2"))
    .map(tracedDouble("map3"))
    .subscribe(consumer2);

// 1 doubled three times is 8; exactly three mapper calls means the value
// sent by subj1 after completion never entered the chain.
tap.equal(consumer2.value, 8, "Should map");
tap.equal(maps, 3, "The map chain should not be executed after completion");
tap.ok(consumer2.completed, "The completion signal should pass through");
|
|

// Producer that emits 1..5, signals completion and returns a cleanup
// callback that only writes a comment to the tap output.
const subj2 = observe<number>(({ next, complete }) => {
    // Call next with the value only; Array.prototype.forEach would also
    // pass the index and the array as extra arguments.
    for (const value of [1, 2, 3, 4, 5])
        next(value);
    complete();
    return () => {
        tap.comment("subj2: unsubscribe");
    };
});
// Test probe: counts "even"/"odd" notifications, remembers completion and
// counts how many times subscribe()/unsubscribe() were called on it.
const consumer3 = {
    even: 0,
    odd: 0,
    completed: false,
    subscribed: 0,
    unsubscribed: 0,
    // Increments the counter named by the received label.
    next(v: "even" | "odd") {
        this[v]++;
    },
    complete() {
        this.completed = true;
    },
    subscribe() {
        this.subscribed++;
    },
    unsubscribe() {
        this.unsubscribed++;
    }
};
|
|
// Sequence derived from subj2: each number is replaced by its parity label,
// and the derived sequence completes on its own after the fourth value.
// consumer3.subscribe()/unsubscribe() are called from the producer and from
// its cleanup callback so the test can count both events.
const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
    consumer3.subscribe();
    let count = 0;
    const h = self.subscribe({
        next: val => {
            // Even values are labelled "even", odd values "odd".
            next(val % 2 === 0 ? "even" : "odd");
            // Stop after four values although the source emits five.
            if (++count === 4)
                complete();
        },
        complete,
        error
    });
    return () => {
        consumer3.unsubscribe();
        h.unsubscribe();
    };
}));
|
|

// Run the derived sequence against the probe and check the parity counts,
// the completion flag and the subscribe/cleanup counters.
subj3.subscribe(consumer3);

tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
tap.equal(consumer3.even, 2, "Should get 2 even elements");
tap.ok(consumer3.completed, "The sequence should completed");
tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
|
|

// Informational only: writes the summed result of subj2 and its completion
// to the tap output; nothing is asserted here.
subj2.reduce((a, b) => a + b).subscribe({
    next: val => tap.comment("subj2: reduce =", val),
    complete: () => tap.comment("subj2: complete")
});
|
|

// Async checks for next(), cancellation, empty, of(...) and collect().
tap.test("of(...) tests", async t => {
    const reduced = await subj2.reduce((a, b) => a + b).next();
    t.comment("subj2: next reduce=", reduced);

    t.equal(await subj2.next(), 1, "Should peek the first element");

    // The callback requests cancellation immediately, so this token is
    // created already cancelled.
    const cancelled = new Cancellation(cancel => cancel());
    await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
    await t.rejects(empty.next(), "Empty sequence should fail to get next element");

    // The first argument settles last (after a delay); the collected
    // result is still expected in argument order.
    const ordered = await of(delay(1).then(() => 1), Promise.resolve(2), 3)
        .reduce<number[]>((a, x) => [...a, x], [])
        .next();
    t.same(ordered, [1, 2, 3], "of(...) should keep the order");

    const rejected = Promise.reject("DIE!");
    // Attach a no-op handler so the rejection is never left unhandled.
    rejected.catch(() => { });

    await t.resolves(
        of(Promise.resolve(1), rejected).next(),
        "of(...) should emit non-rejected items"
    );
    await t.rejects(
        of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
        "of(...) should terminate with error when a parameter is rejected"
    );

    t.same(await of(1, 2, 3).collect(), [1, 2, 3], ".collect() should return the collected sequence");
    await t.rejects(of(1, 2, 3).collect(cancelled), ".collect() should support cancellation");
    // NOTE(review): a rejection of the promise returned by tap.test is
    // swallowed here, presumably to avoid a floating promise. Confirm that
    // tap still reports the failure itself.
}).catch(() => { });
// Checks the call order of chained .tap() observers and that values already
// available are processed before the awaiting code resumes.
tap.test(".tap() tests", async t => {
    const side: number[] = [];

    // For each value the first tap pushes v and the second pushes v*v;
    // the first tap pushes 0 on completion.
    of(1, 2)
        .tap({ next: v => side.push(v), complete: () => side.push(0) })
        .tap({ next: v => side.push(v * v) })
        .subscribe({});
    t.same(side, [1, 1, 2, 4, 0], ".tap() should be called in the order of registration");

    side.length = 0;
    await new Promise<void>(resolve => {
        // The third item is delayed. The promise resolves as soon as the
        // second tap sees the value 1.
        of(1, 2, delay(1).then(() => 3))
            .tap({ next: v => side.push(v) })
            .tap({ next: v => v === 1 && resolve() })
            .subscribe({});
    });
    // When execution resumes, both immediate values are expected to have
    // been recorded, and the delayed one not yet.
    t.same(side, [1, 2], ".tap() should be processed synchronously");
    // NOTE(review): a rejection of the promise returned by tap.test is
    // swallowed here. Confirm this is intentional.
}).catch(() => { });
