##// END OF EJS Templates
added pipe method to observable
cin -
r114:e9a9ed6d7647 v1.5.0 default
parent child
Show More
@@ -88,6 +88,8 export interface Observable<T> extends S
88 88 scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>;
89 89
90 90 cat(...seq: Subscribable<T>[]): Observable<T>;
91
92 pipe<U>(f: (source: Observable<T>) => Producer<U>): Observable<U>;
91 93 }
92 94
93 95 const noop = () => { };
@@ -169,7 +171,9 const _observe = <T>(producer: Producer<
169 171 cleanup = producer({ next, complete, ...rest }) ?? noop;
170 172
171 173 return () => cleanup();
172 })
174 }),
175
176 pipe: <U>(f: (source: Observable<T>) => Producer<U>) => observe(f(_observe(producer)))
173 177 });
174 178
175 179 export interface OrderUpdate<T> {
@@ -50,3 +50,33 subj1
50 50 t.equal(consumer2.value, 8, "Should map");
51 51 t.equal(maps, 3, "The map chain should not be executed after completion");
52 52 t.ok(consumer2.completed, "The completion signal should pass through");
53
54 const subj2 = observe<number>(({ next, complete }) => {
55 [1, 2, 3, 4, 5].forEach(next);
56 complete();
57 }).pipe<string>(self => ({ next, complete, error }) => {
58 t.comment("subj2: subscribe");
59 const h = self.subscribe({
60 next: val => {
61 if (val % 2 === 0)
62 next("odd");
63 else
64 next("even");
65 },
66 complete,
67 error
68 });
69 return () =>{
70 t.comment("subj2: unsubscribe");
71 h.unsubscribe();
72 };
73 });
74
75 subj2.subscribe({
76 next: val => t.comment("subj2: ", val),
77 complete: () => t.comment("subj2: complete")
78 });
79 subj2.subscribe({
80 next: val => t.comment("subj2: ", val),
81 complete: () => t.comment("subj2: complete")
82 }); No newline at end of file
@@ -30,7 +30,6
30 30 },
31 31 "../djx/build/npm/package": {
32 32 "name": "@implab/djx",
33 "version": "1.4.4",
34 33 "dev": true,
35 34 "license": "BSD-2-Clause",
36 35 "peerDependencies": {
General Comments 0
You need to be logged in to leave comments. Login now