| @@ -88,6 +88,8 export interface Observable<T> extends S | |||||
| 88 | scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>; |  | 88 | scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>; | |
| 89 |  | 89 | |||
| 90 | cat(...seq: Subscribable<T>[]): Observable<T>; |  | 90 | cat(...seq: Subscribable<T>[]): Observable<T>; | |
|  | 91 | ||||
|  | 92 | pipe<U>(f: (source: Observable<T>) => Producer<U>): Observable<U>; | |||
| 91 | } |  | 93 | } | |
| 92 |  | 94 | |||
| 93 | const noop = () => { }; |  | 95 | const noop = () => { }; | |
| @@ -169,7 +171,9 const _observe = <T>(producer: Producer< | |||||
| 169 | cleanup = producer({ next, complete, ...rest }) ?? noop; |  | 171 | cleanup = producer({ next, complete, ...rest }) ?? noop; | |
| 170 |  | 172 | |||
| 171 | return () => cleanup(); |  | 173 | return () => cleanup(); | |
| 172 | }) |  | 174 | }), | |
|  | 175 | ||||
|  | 176 | pipe: <U>(f: (source: Observable<T>) => Producer<U>) => observe(f(_observe(producer))) | |||
| 173 | }); |  | 177 | }); | |
| 174 |  | 178 | |||
| 175 | export interface OrderUpdate<T> { |  | 179 | export interface OrderUpdate<T> { | |
| @@ -35,7 +35,7 subj1 | |||||
| 35 | maps++; |  | 35 | maps++; | |
| 36 | return v * 2; |  | 36 | return v * 2; | |
| 37 | }) |  | 37 | }) | |
| 38 | .map |  | 38 | .map(v => { | |
| 39 | t.comment(`map2: ${v * 2}`); |  | 39 | t.comment(`map2: ${v * 2}`); | |
| 40 | maps++; |  | 40 | maps++; | |
| 41 | return v * 2; |  | 41 | return v * 2; | |
| @@ -50,3 +50,33 subj1 | |||||
| 50 | t.equal(consumer2.value, 8, "Should map"); |  | 50 | t.equal(consumer2.value, 8, "Should map"); | |
| 51 | t.equal(maps, 3, "The map chain should not be executed after completion"); |  | 51 | t.equal(maps, 3, "The map chain should not be executed after completion"); | |
| 52 | t.ok(consumer2.completed, "The completion signal should pass through"); |  | 52 | t.ok(consumer2.completed, "The completion signal should pass through"); | |
|  | 53 | ||||
|  | 54 | const subj2 = observe<number>(({ next, complete }) => { | |||
|  | 55 | [1, 2, 3, 4, 5].forEach(next); | |||
|  | 56 | complete(); | |||
|  | 57 | }).pipe<string>(self => ({ next, complete, error }) => { | |||
|  | 58 | t.comment("subj2: subscribe"); | |||
|  | 59 | const h = self.subscribe({ | |||
|  | 60 | next: val => { | |||
|  | 61 | if (val % 2 === 0) | |||
|  | 62 | next("odd"); | |||
|  | 63 | else | |||
|  | 64 | next("even"); | |||
|  | 65 | }, | |||
|  | 66 | complete, | |||
|  | 67 | error | |||
|  | 68 | }); | |||
|  | 69 | return () =>{ | |||
|  | 70 | t.comment("subj2: unsubscribe"); | |||
|  | 71 | h.unsubscribe(); | |||
|  | 72 | }; | |||
|  | 73 | }); | |||
|  | 74 | ||||
|  | 75 | subj2.subscribe({ | |||
|  | 76 | next: val => t.comment("subj2: ", val), | |||
|  | 77 | complete: () => t.comment("subj2: complete") | |||
|  | 78 | }); | |||
|  | 79 | subj2.subscribe({ | |||
|  | 80 | next: val => t.comment("subj2: ", val), | |||
|  | 81 | complete: () => t.comment("subj2: complete") | |||
|  | 82 | }); No newline at end of file | |||
| @@ -30,7 +30,6 | |||||
| 30 | }, |  | 30 | }, | |
| 31 | "../djx/build/npm/package": { |  | 31 | "../djx/build/npm/package": { | |
| 32 | "name": "@implab/djx", |  | 32 | "name": "@implab/djx", | |
| 33 | "version": "1.4.4", |  | |||
| 34 | "dev": true, |  | 33 | "dev": true, | |
| 35 | "license": "BSD-2-Clause", |  | 34 | "license": "BSD-2-Clause", | |
| 36 | "peerDependencies": { |  | 35 | "peerDependencies": { | |
        
        General Comments 0
    
    
  
  
                      You need to be logged in to leave comments.
                      Login now
                    
                