##// END OF EJS Templates
added pipe method to observable
cin -
r114:e9a9ed6d7647 v1.5.0 default
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,229 +1,233
1 1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 2 import { isPromise } from "@implab/core-amd/safe";
3 3
4 4 /**
5 5 * The interface for the consumer of an observable sequence
6 6 */
7 7 export interface Observer<T> {
8 8 /**
9 9 * Called for the next element in the sequence
10 10 */
11 11 next: (value: T) => void;
12 12
13 13 /**
14 14 * Called once when the error occurs in the sequence.
15 15 */
16 16 error: (e: unknown) => void;
17 17
18 18 /**
19 19 * Called once at the end of the sequence.
20 20 */
21 21 complete: () => void;
22 22 }
23 23
24 24 /**
25 25 * The group of functions to feed an observable. These methods are provided to
26 26 * the producer to generate a stream of events.
27 27 */
28 28 export type Sink<T> = {
29 29 /**
30 30 * Call to send the next element in the sequence
31 31 */
32 32 next: (value: T) => void;
33 33
34 34 /**
35 35 * Call to notify about the error occurred in the sequence.
36 36 */
37 37 error: (e: unknown) => void;
38 38
39 39 /**
40 40 * Call to signal the end of the sequence.
41 41 */
42 42 complete: () => void;
43 43
44 44 /**
45 45 * Checks whether the sink is accepting new elements. It's safe to
46 46 * send elements to the closed sink.
47 47 */
48 48 isClosed: () => boolean;
49 49 };
50 50
51 51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
52 52
53 53 export interface Unsubscribable {
54 54 unsubscribe(): void;
55 55 }
56 56
57 57 export const isUnsubsribable = (v: unknown): v is Unsubscribable =>
58 58 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
59 59
60 60 export const isSubsribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
61 61 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
62 62
63 63 export interface Subscribable<T> {
64 64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 65 }
66 66
67 67 /** The observable source of items. */
68 68 export interface Observable<T> extends Subscribable<T> {
69 69 /** Transforms elements of the sequence with the specified mapper
70 70 *
71 71 * @param mapper The mapper used to transform the values
72 72 */
73 73 map<T2>(mapper: (value: T) => T2): Observable<T2>;
74 74
75 75 /** Filters elements of the sequence. The resulting sequence will
76 76 * contain only elements which match the specified predicate.
77 77 *
78 78 * @param predicate The filter predicate.
79 79 */
80 80 filter(predicate: (value: T) => boolean): Observable<T>;
81 81
82 82 /** Applies accumulator to each value in the sequence and
83 83 * emits the accumulated value for each source element
84 84 *
85 85 * @param accumulator
86 86 * @param initial
87 87 */
88 88 scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>;
89 89
90 90 cat(...seq: Subscribable<T>[]): Observable<T>;
91
92 pipe<U>(f: (source: Observable<T>) => Producer<U>): Observable<U>;
91 93 }
92 94
93 95 const noop = () => { };
94 96
95 97 const sink = <T>(consumer: Partial<Observer<T>>) => {
96 98 const { next, error, complete } = consumer;
97 99 return {
98 100 next: next ? next.bind(consumer) : noop,
99 101 error: error ? error.bind(consumer) : noop,
100 102 complete: complete ? complete.bind(consumer) : noop,
101 103 isClosed: () => false
102 104 };
103 105 };
104 106
105 107 /** Wraps the producer to handle tear down logic and subscription management
106 108 *
107 109 * @param producer The producer to wrap
108 110 * @returns The wrapper producer
109 111 */
110 112 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
111 113 let done = false;
112 114 let cleanup = noop;
113 115
114 116 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
115 117 (...args: A) => done ?
116 118 void (0) :
117 119 (done = true, cleanup(), fn(...args));
118 120
119 121 const safeSink = {
120 122 next: (value: T) => { !done && next(value); },
121 123 error: _fin(error),
122 124 complete: _fin(complete),
123 125 isClosed: () => done
124 126 };
125 127 cleanup = producer(safeSink) ?? noop;
126 128 return done ?
127 129 (cleanup(), noop) :
128 130 _fin(noop);
129 131 };
130 132
131 133 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
132 134 subscribe: (consumer: Partial<Observer<T>>) => ({
133 135 unsubscribe: producer(sink(consumer)) ?? noop
134 136 }),
135 137 map: (mapper) => _observe(({ next, ...rest }) =>
136 138 producer({
137 139 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
138 140 ...rest
139 141 })
140 142 ),
141 143 filter: (predicate) => _observe(({ next, ...rest }) =>
142 144 producer({
143 145 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
144 146 ...rest
145 147 })
146 148 ),
147 149 scan: (accumulator, initial) => _observe(({ next, ...rest }) => {
148 150 let _acc = initial;
149 151 return producer({
150 152 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
151 153 ...rest
152 154 });
153 155 }),
154 156
155 157 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
156 158 let cleanup: () => void;
157 159 const complete = () => {
158 160 const continuation = seq.shift();
159 161 if (continuation) {
160 162 // if we have a next sequence, subscribe to it
161 163 const subscription = continuation.subscribe({ next, complete, ...rest });
162 164 cleanup = subscription.unsubscribe.bind(subscription);
163 165 } else {
164 166 // otherwise notify the consumer about completion
165 167 final();
166 168 }
167 169 };
168 170
169 171 cleanup = producer({ next, complete, ...rest }) ?? noop;
170 172
171 173 return () => cleanup();
172 })
174 }),
175
176 pipe: <U>(f: (source: Observable<T>) => Producer<U>) => observe(f(_observe(producer)))
173 177 });
174 178
175 179 export interface OrderUpdate<T> {
176 180 /** The item is being updated */
177 181 item: T;
178 182
179 183 /** The previous index of the item, -1 in case it is inserted */
180 184 prevIndex: number;
181 185
182 186 /** The new index of the item, -1 in case it is deleted */
183 187 newIndex: number;
184 188 }
185 189
186 190 interface ObservableResults<T> {
187 191 /**
188 192 * Allows observation of results
189 193 */
190 194 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
191 195 remove(): void;
192 196 };
193 197 }
194 198
195 199 interface Queryable<T, A extends unknown[]> {
196 200 query(...args: A): PromiseOrValue<T[]>;
197 201 }
198 202
199 203 export const isObservableResults = <T>(v: object): v is ObservableResults<T> =>
200 204 v && (typeof (v as { observe?: unknown; }).observe === "function");
201 205
202 206 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
203 207
204 208 export const empty = observe<never>(({ complete }) => complete());
205 209
206 210 export const query = <T, A extends unknown[]>(store: Queryable<T, A>) =>
207 211 (...args: A) => {
208 212 return observe<OrderUpdate<T>>(({ next, complete, error }) => {
209 213 try {
210 214 const results = store.query(...args);
211 215 if (isPromise(results)) {
212 216 results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })))
213 217 .then(undefined, error);
214 218 } else {
215 219 results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
216 220 }
217 221
218 222 if (isObservableResults<T>(results)) {
219 223 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }));
220 224 return () => h.remove();
221 225 } else {
222 226 complete();
223 227 }
224 228 } catch (err) {
225 229 error(err);
226 230 }
227 231 });
228 232
229 233 };
@@ -1,52 +1,82
1 1 import { observe } from "./observable";
2 2 import * as t from "tap";
3 3
4 4 const subj1 = observe<number>(({ next, complete }) => {
5 5 next(1);
6 6 complete();
7 7 next(2);
8 8 });
9 9
10 10 const consumer1 = {
11 11 sum: 0,
12 12 next(v: number) {
13 13 this.sum += v;
14 14 }
15 15 };
16 16
17 17 subj1.subscribe(consumer1);
18 18 t.equal(consumer1.sum, 1, "Should get only one value");
19 19
20 20 subj1.subscribe(consumer1);
21 21 t.equal(consumer1.sum, 2, "Should get the value again");
22 22
23 23 const consumer2 = {
24 24 value: 0,
25 25 completed: false,
26 26 next(v: number) { this.value = v; },
27 27 complete() { this.completed = true; }
28 28 };
29 29
30 30 let maps = 0;
31 31
32 32 subj1
33 33 .map(v => {
34 34 t.comment(`map1: ${v * 2}`);
35 35 maps++;
36 36 return v * 2;
37 37 })
38 .map (v => {
38 .map(v => {
39 39 t.comment(`map2: ${v * 2}`);
40 40 maps++;
41 41 return v * 2;
42 42 })
43 43 .map(v => {
44 44 t.comment(`map3: ${v * 2}`);
45 45 maps++;
46 46 return v * 2;
47 47 })
48 48 .subscribe(consumer2);
49 49
50 50 t.equal(consumer2.value, 8, "Should map");
51 51 t.equal(maps, 3, "The map chain should not be executed after completion");
52 52 t.ok(consumer2.completed, "The completion signal should pass through");
53
54 const subj2 = observe<number>(({ next, complete }) => {
55 [1, 2, 3, 4, 5].forEach(next);
56 complete();
57 }).pipe<string>(self => ({ next, complete, error }) => {
58 t.comment("subj2: subscribe");
59 const h = self.subscribe({
60 next: val => {
61 if (val % 2 === 0)
62 next("odd");
63 else
64 next("even");
65 },
66 complete,
67 error
68 });
69 return () =>{
70 t.comment("subj2: unsubscribe");
71 h.unsubscribe();
72 };
73 });
74
75 subj2.subscribe({
76 next: val => t.comment("subj2: ", val),
77 complete: () => t.comment("subj2: complete")
78 });
79 subj2.subscribe({
80 next: val => t.comment("subj2: ", val),
81 complete: () => t.comment("subj2: complete")
82 }); No newline at end of file
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now