import { empty, observe, of } from "./observable"; import * as tap from "tap"; import { Cancellation } from "@implab/core-amd/Cancellation"; import { delay } from "@implab/core-amd/safe"; const subj1 = observe(({ next, complete }) => { next(1); complete(); next(2); }); const consumer1 = { sum: 0, next(v: number) { this.sum += v; } }; subj1.subscribe(consumer1); tap.equal(consumer1.sum, 1, "Should get only one value"); subj1.subscribe(consumer1); tap.equal(consumer1.sum, 2, "Should get the value again"); const consumer2 = { value: 0, completed: false, next(v: number) { this.value = v; }, complete() { this.completed = true; } }; let maps = 0; subj1 .map(v => { tap.comment(`map1: ${v * 2}`); maps++; return v * 2; }) .map(v => { tap.comment(`map2: ${v * 2}`); maps++; return v * 2; }) .map(v => { tap.comment(`map3: ${v * 2}`); maps++; return v * 2; }) .subscribe(consumer2); tap.equal(consumer2.value, 8, "Should map"); tap.equal(maps, 3, "The map chain should not be executed after completion"); tap.ok(consumer2.completed, "The completion signal should pass through"); const subj2 = observe(({ next, complete }) => { [1, 2, 3, 4, 5].forEach(next); complete(); return () => { tap.comment("subj2: unsubscribe"); }; }); const consumer3 = { even: 0, odd: 0, completed: false, subscribed: 0, unsubscribed: 0, next(v: "even" | "odd") { this[v]++; }, complete() { this.completed = true; }, subscribe() { this.subscribed++; }, unsubscribe() { this.unsubscribed++; } }; const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => { consumer3.subscribe(); let count = 0; const h = self.subscribe({ next: val => { if (val % 2 === 0) next("odd"); else next("even"); if (++count === 4) complete(); }, complete, error }); return () => { consumer3.unsubscribe(); h.unsubscribe(); }; })); subj3.subscribe(consumer3); tap.equal(consumer3.odd, 2, "Should get 2 odd elements"); tap.equal(consumer3.even, 2, "Should get 2 even elements"); tap.ok(consumer3.completed, "The sequence should completed"); tap.equal(consumer3.subscribed, 1, "The subscription should be done once"); tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion"); subj2.reduce((a, b) => a + b).subscribe({ next: val => tap.comment("subj2: reduce =", val), complete: () => tap.comment("subj2: complete") }); tap.test("of(...) tests", async t => { await subj2.reduce((a, b) => a + b).next() .then(value => t.comment("subj2: next reduce=", value)); await subj2.next().then(val => t.equal(val, 1, "Should peek the first element")); const cancelled = new Cancellation(cancel => cancel()); await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail"); await t.rejects(empty.next(), "Empty sequence should fail to get next element"); await of(delay(1).then(() => 1), Promise.resolve(2), 3) .reduce((a, x) => [...a, x], []) .next() .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order")); const rejected = Promise.reject("DIE!"); rejected.catch(() => { }); // SAFE AND SOUND await t.resolves( of(Promise.resolve(1), rejected).next(), "of(...) should emit non-rejected items" ); await t.rejects( of(1, Promise.reject("DIE!")).reduce((a) => a).next(), "of(...) should terminate with error when a parameter is rejected" ); t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence"); await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation"); }).catch(() => { }); tap.test(".tap() tests", async t => { const side: number[] = []; of(1,2) .tap({next: v => side.push(v), complete: () => side.push(0)}) .tap({next: v => side.push(v*v)}) .subscribe({}); t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration"); side.length = 0; await new Promise(resolve => { of(1,2,delay(1).then(() => 3)) .tap({next: v => side.push(v)}) .tap({ next: v => v === 1 && resolve()}) .subscribe({}); }); t.same(side, [1,2], ".tap() should be processed synchronously"); }).catch(() => {});