|
|
import { empty, observe, of } from "./observable";
|
|
|
import * as tap from "tap";
|
|
|
import { Cancellation } from "@implab/core-amd/Cancellation";
|
|
|
import { delay } from "@implab/core-amd/safe";
|
|
|
|
|
|
// A sequence whose producer keeps signalling after it has completed.
// The assertions further down expect only the value sent before
// complete() to reach subscribers.
const subj1 = observe<number>(({ next: emit, error: fail, complete: finish }) => {
    emit(1);
    finish();

    // Everything below is sent after the completion signal.
    emit(2);
    emit(3);
    fail(new Error("This error should be ignored"));
    emit(4);
});
|
|
|
|
|
|
// Observer that accumulates the total of every value passed to next().
const consumer1 = {
    // Running total of the received values.
    sum: 0,

    next(value: number) {
        this.sum = this.sum + value;
    }
};
|
|
|
|
|
|
// Subscribe the same consumer twice; after each subscription the running
// sum is compared with the expected total.
const subj1Replays: [number, string][] = [
    [1, "Should get only one value"],
    [2, "Should get the value again"]
];

subj1Replays.forEach(([expectedSum, message]) => {
    subj1.subscribe(consumer1);
    tap.equal(consumer1.sum, expectedSum, message);
});
|
|
|
|
|
|
// Observer that remembers the most recent value and whether complete() was called.
const consumer2 = {
    // Last value passed to next().
    value: 0,

    // Set to true by complete().
    completed: false,

    next(latest: number) {
        this.value = latest;
    },

    complete() {
        this.completed = true;
    }
};
|
|
|
|
|
|
// Number of times any mapper in the chain below has been invoked.
let maps = 0;

// Builds a mapper that doubles its input, logs the result under the given
// stage number and bumps the shared invocation counter.
const countedDouble = (stage: number) => (v: number) => {
    const doubled = v * 2;
    tap.comment(`map${stage}: ${doubled}`);
    maps++;
    return doubled;
};

subj1
    .map(countedDouble(1))
    .map(countedDouble(2))
    .map(countedDouble(3))
    .subscribe(consumer2);

tap.equal(consumer2.value, 8, "Should map");
tap.equal(maps, 3, "The map chain should not be executed after completion");
tap.ok(consumer2.completed, "The completion signal should pass through");
|
|
|
|
|
|
// A sequence that emits 1..5, completes, and logs a comment from the
// cleanup callback it returns.
const subj2 = observe<number>(({ next: emit, complete: finish }) => {
    const values = [1, 2, 3, 4, 5];
    values.forEach(emit);
    finish();

    // Cleanup callback returned to observe().
    return () => {
        tap.comment("subj2: unsubscribe");
    };
});
|
|
|
|
|
|
// Observer of "even" / "odd" labels that also counts how many times its
// subscribe() and unsubscribe() hooks were called.
const consumer3 = {
    // Per-label counters, indexed by the label passed to next().
    even: 0,
    odd: 0,

    // Set to true by complete().
    completed: false,

    // Call counters for the subscribe() / unsubscribe() hooks.
    subscribed: 0,
    unsubscribed: 0,

    next(label: "even" | "odd") {
        this[label] += 1;
    },

    complete() {
        this.completed = true;
    },

    subscribe() {
        this.subscribed += 1;
    },

    unsubscribe() {
        this.unsubscribed += 1;
    }
};
|
|
|
|
|
|
|
|
|
// Derives a sequence of parity labels from subj2 and completes by itself
// after four source values. consumer3's subscribe()/unsubscribe() hooks are
// called from the producer and from its cleanup so the test can count them.
//
// Fix: the labels used to be swapped (values divisible by two were reported
// as "odd"). Even values now produce "even" and the rest produce "odd".
const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
    consumer3.subscribe();

    // Number of source values seen so far.
    let count = 0;

    const h = self.subscribe({
        next: val => {
            next(val % 2 === 0 ? "even" : "odd");

            // Finish early, once the fourth source value has been handled.
            if (++count === 4)
                complete();
        },
        complete,
        error
    });

    // Cleanup: record the call and release the subscription to the source.
    return () => {
        consumer3.unsubscribe();
        h.unsubscribe();
    };
}));
|
|
|
|
|
|
// Run the derived sequence against consumer3, then verify the counters it
// collected.
subj3.subscribe(consumer3);

const { odd, even, completed, subscribed, unsubscribed } = consumer3;

tap.equal(odd, 2, "Should get 2 odd elements");
tap.equal(even, 2, "Should get 2 even elements");
tap.ok(completed, "The sequence should completed");
tap.equal(subscribed, 1, "The subscription should be done once");
tap.equal(unsubscribed, 1, "The cleanup should be done after completion");
|
|
|
|
|
|
// Sum the values of subj2 and log both the reduced value and the completion.
const subj2Total = subj2.reduce((acc, item) => acc + item);

subj2Total.subscribe({
    next: total => tap.comment("subj2: reduce =", total),
    complete: () => tap.comment("subj2: complete")
});
|
|
|
|
|
|
// Exercises the promise-returning next()/collect() methods, cancellation
// and the of(...) factory.
tap.test("of(...) tests", async t => {
    // Reduce subj2 and log the resulting value.
    const reduced = await subj2.reduce((acc, item) => acc + item).next();
    t.comment("subj2: next reduce=", reduced);

    const first = await subj2.next();
    t.equal(first, 1, "Should peek the first element");

    // The callback passed to Cancellation calls cancel() as soon as it runs.
    const cancelled = new Cancellation(cancel => cancel());
    await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");

    await t.rejects(empty.next(), "Empty sequence should fail to get next element");

    // Mix a delayed promise, a resolved promise and a plain value.
    const ordered = await of(delay(1).then(() => 1), Promise.resolve(2), 3)
        .reduce<number[]>((items, item) => [...items, item], [])
        .next();
    t.same(ordered, [1, 2, 3], "of(...) should keep the order");

    const rejected = Promise.reject("DIE!");
    rejected.catch(() => { }); // SAFE AND SOUND

    await t.resolves(
        of(Promise.resolve(1), rejected).next(),
        "of(...) should emit non-rejected items"
    );
    await t.rejects(
        of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
        "of(...) should terminate with error when a parameter is rejected"
    );

    const collected = await of(1, 2, 3).collect();
    t.same(collected, [1, 2, 3], ".collect() should return the collected sequence");
    await t.rejects(of(1, 2, 3).collect(cancelled), ".collect() should support cancellation");
}).catch(() => { });
|
|
|
|
|
|
// Checks the order in which .tap() observers are called and that plain
// values are delivered before a delayed one.
tap.test(".tap() tests", async t => {
    // Records every side effect produced by the tapped observers.
    const side: number[] = [];

    const tapped = of(1, 2)
        .tap({ next: v => side.push(v), complete: () => side.push(0) })
        .tap({ next: v => side.push(v * v) });
    tapped.subscribe({});

    t.same(side, [1, 1, 2, 4, 0], ".tap() should be called in the order of registration");

    // Empty the log in place before the second scenario.
    side.length = 0;

    // The promise is resolved from the second tap when it sees the value 1;
    // the third source item only becomes available after a delay.
    await new Promise<void>(resolve => {
        const delayedTail = delay(1).then(() => 3);

        of(1, 2, delayedTail)
            .tap({ next: v => side.push(v) })
            .tap({ next: v => v === 1 && resolve() })
            .subscribe({});
    });

    t.same(side, [1, 2], ".tap() should be processed synchronously");
}).catch(() => { });
|
|
|
|
|
|
// Checks that .while() only passes the elements accepted by the predicate
// and that completion is signalled once.
tap.test(".while() tests", async t => {
    const seq = of(1, 2, 3, 4).while(v => v <= 2);

    const collected = await seq.collect();
    t.same(collected, [1, 2], "Should collect only taken elements");

    // Subscribe directly and record what the observer receives.
    const data: number[] = [];
    let complete = 0;

    seq.subscribe({
        next: v => data.push(v),
        complete: () => complete++
    });

    t.same(data, [1, 2], "Should receive only taken elements");
    t.equal(complete, 1, "Complete should run once");
}).catch(() => { });
|