import { empty, observe } from "./observable"; import * as t from "tap"; import { Cancellation } from "@implab/core-amd/Cancellation"; const subj1 = observe(({ next, complete }) => { next(1); complete(); next(2); }); const consumer1 = { sum: 0, next(v: number) { this.sum += v; } }; subj1.subscribe(consumer1); t.equal(consumer1.sum, 1, "Should get only one value"); subj1.subscribe(consumer1); t.equal(consumer1.sum, 2, "Should get the value again"); const consumer2 = { value: 0, completed: false, next(v: number) { this.value = v; }, complete() { this.completed = true; } }; let maps = 0; subj1 .map(v => { t.comment(`map1: ${v * 2}`); maps++; return v * 2; }) .map(v => { t.comment(`map2: ${v * 2}`); maps++; return v * 2; }) .map(v => { t.comment(`map3: ${v * 2}`); maps++; return v * 2; }) .subscribe(consumer2); t.equal(consumer2.value, 8, "Should map"); t.equal(maps, 3, "The map chain should not be executed after completion"); t.ok(consumer2.completed, "The completion signal should pass through"); const subj2 = observe(({ next, complete }) => { [1, 2, 3, 4, 5].forEach(next); complete(); return () => { t.comment("subj2: unsubscribe"); }; }); const consumer3 = { even: 0, odd: 0, completed: false, subscribed: 0, unsubscribed: 0, next(v: "even" | "odd") { this[v] ++; }, complete() { this.completed = true; }, subscribe() { this.subscribed++; }, unsubscribe() { this.unsubscribed++; } }; const subj3 = subj2.pipe<"even" | "odd">(self => ({ next, complete, error }) => { consumer3.subscribe(); let count = 0; const h = self.subscribe({ next: val => { if (val % 2 === 0) next("odd"); else next("even"); if (++count === 4) complete(); }, complete, error }); return () =>{ consumer3.unsubscribe(); h.unsubscribe(); }; }); subj3.subscribe(consumer3); t.equal(consumer3.odd, 2, "Should get 2 odd elements"); t.equal(consumer3.even, 2, "Should get 2 even elements"); t.ok(consumer3.completed, "The sequence should completed"); t.equal(consumer3.subscribed, 1, "The subscription should be done once"); t.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion"); subj2.reduce((a,b) => a + b).subscribe({ next: val => t.comment("subj2: reduce =", val), complete: () => t.comment("subj2: complete") }); subj2.reduce((a,b) => a + b).next() .then(value => t.comment("subj2: next reduce=", value)) .catch(() => {}); subj2.next().then(val => t.equal(val, 1, "Should peek the first element")).catch(() => {}); const cancelled = new Cancellation(cancel => cancel()); t.rejects(subj2.next(cancelled), "Cancelled next() method should fail").catch(() => {}); t.rejects(empty.next(), "Empty sequence should fail to get next element").catch(() => {});