diff --git a/djx/src/main/ts/observable.ts b/djx/src/main/ts/observable.ts --- a/djx/src/main/ts/observable.ts +++ b/djx/src/main/ts/observable.ts @@ -88,6 +88,8 @@ export interface Observable extends S scan(accumulator: (acc: A, value: T) => A, initial: A): Observable; cat(...seq: Subscribable[]): Observable; + + pipe(f: (source: Observable) => Producer): Observable; } const noop = () => { }; @@ -169,7 +171,9 @@ const _observe = (producer: Producer< cleanup = producer({ next, complete, ...rest }) ?? noop; return () => cleanup(); - }) + }), + + pipe: (f: (source: Observable) => Producer) => observe(f(_observe(producer))) }); export interface OrderUpdate { diff --git a/djx/src/test/ts/observable-tests.ts b/djx/src/test/ts/observable-tests.ts --- a/djx/src/test/ts/observable-tests.ts +++ b/djx/src/test/ts/observable-tests.ts @@ -35,7 +35,7 @@ subj1 maps++; return v * 2; }) - .map (v => { + .map(v => { t.comment(`map2: ${v * 2}`); maps++; return v * 2; @@ -50,3 +50,33 @@ subj1 t.equal(consumer2.value, 8, "Should map"); t.equal(maps, 3, "The map chain should not be executed after completion"); t.ok(consumer2.completed, "The completion signal should pass through"); + +const subj2 = observe(({ next, complete }) => { + [1, 2, 3, 4, 5].forEach(next); + complete(); +}).pipe(self => ({ next, complete, error }) => { + t.comment("subj2: subscribe"); + const h = self.subscribe({ + next: val => { + if (val % 2 === 0) + next("odd"); + else + next("even"); + }, + complete, + error + }); + return () =>{ + t.comment("subj2: unsubscribe"); + h.unsubscribe(); + }; +}); + +subj2.subscribe({ + next: val => t.comment("subj2: ", val), + complete: () => t.comment("subj2: complete") +}); +subj2.subscribe({ + next: val => t.comment("subj2: ", val), + complete: () => t.comment("subj2: complete") +}); \ No newline at end of file diff --git a/playground/package-lock.json b/playground/package-lock.json --- a/playground/package-lock.json +++ b/playground/package-lock.json @@ -30,7 +30,6 @@ }, "../djx/build/npm/package": { "name": "@implab/djx", - "version": "1.4.4", "dev": true, "license": "BSD-2-Clause", "peerDependencies": {