##// END OF EJS Templates
added pipe method to observable
cin -
r114:e9a9ed6d7647 v1.5.0 default
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,229 +1,233
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 import { isPromise } from "@implab/core-amd/safe";
2 import { isPromise } from "@implab/core-amd/safe";
3
3
4 /**
4 /**
5 * The interface for the consumer of an observable sequence
5 * The interface for the consumer of an observable sequence
6 */
6 */
7 export interface Observer<T> {
7 export interface Observer<T> {
8 /**
8 /**
9 * Called for the next element in the sequence
9 * Called for the next element in the sequence
10 */
10 */
11 next: (value: T) => void;
11 next: (value: T) => void;
12
12
13 /**
13 /**
14 * Called once when the error occurs in the sequence.
14 * Called once when the error occurs in the sequence.
15 */
15 */
16 error: (e: unknown) => void;
16 error: (e: unknown) => void;
17
17
18 /**
18 /**
19 * Called once at the end of the sequence.
19 * Called once at the end of the sequence.
20 */
20 */
21 complete: () => void;
21 complete: () => void;
22 }
22 }
23
23
24 /**
24 /**
25 * The group of functions to feed an observable. These methods are provided to
25 * The group of functions to feed an observable. These methods are provided to
26 * the producer to generate a stream of events.
26 * the producer to generate a stream of events.
27 */
27 */
28 export type Sink<T> = {
28 export type Sink<T> = {
29 /**
29 /**
30 * Call to send the next element in the sequence
30 * Call to send the next element in the sequence
31 */
31 */
32 next: (value: T) => void;
32 next: (value: T) => void;
33
33
34 /**
34 /**
35 * Call to notify about the error occurred in the sequence.
35 * Call to notify about the error occurred in the sequence.
36 */
36 */
37 error: (e: unknown) => void;
37 error: (e: unknown) => void;
38
38
39 /**
39 /**
40 * Call to signal the end of the sequence.
40 * Call to signal the end of the sequence.
41 */
41 */
42 complete: () => void;
42 complete: () => void;
43
43
44 /**
44 /**
45 * Checks whether the sink is accepting new elements. It's safe to
45 * Checks whether the sink is accepting new elements. It's safe to
46 * send elements to the closed sink.
46 * send elements to the closed sink.
47 */
47 */
48 isClosed: () => boolean;
48 isClosed: () => boolean;
49 };
49 };
50
50
51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
52
52
53 export interface Unsubscribable {
53 export interface Unsubscribable {
54 unsubscribe(): void;
54 unsubscribe(): void;
55 }
55 }
56
56
57 export const isUnsubsribable = (v: unknown): v is Unsubscribable =>
57 export const isUnsubsribable = (v: unknown): v is Unsubscribable =>
58 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
58 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
59
59
60 export const isSubsribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
60 export const isSubsribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
61 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
61 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
62
62
63 export interface Subscribable<T> {
63 export interface Subscribable<T> {
64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 }
65 }
66
66
67 /** The observable source of items. */
67 /** The observable source of items. */
68 export interface Observable<T> extends Subscribable<T> {
68 export interface Observable<T> extends Subscribable<T> {
69 /** Transforms elements of the sequence with the specified mapper
69 /** Transforms elements of the sequence with the specified mapper
70 *
70 *
71 * @param mapper The mapper used to transform the values
71 * @param mapper The mapper used to transform the values
72 */
72 */
73 map<T2>(mapper: (value: T) => T2): Observable<T2>;
73 map<T2>(mapper: (value: T) => T2): Observable<T2>;
74
74
75 /** Filters elements of the sequence. The resulting sequence will
75 /** Filters elements of the sequence. The resulting sequence will
76 * contain only elements which match the specified predicate.
76 * contain only elements which match the specified predicate.
77 *
77 *
78 * @param predicate The filter predicate.
78 * @param predicate The filter predicate.
79 */
79 */
80 filter(predicate: (value: T) => boolean): Observable<T>;
80 filter(predicate: (value: T) => boolean): Observable<T>;
81
81
82 /** Applies accumulator to each value in the sequence and
82 /** Applies accumulator to each value in the sequence and
83 * emits the accumulated value for each source element
83 * emits the accumulated value for each source element
84 *
84 *
85 * @param accumulator
85 * @param accumulator
86 * @param initial
86 * @param initial
87 */
87 */
88 scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>;
88 scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>;
89
89
90 cat(...seq: Subscribable<T>[]): Observable<T>;
90 cat(...seq: Subscribable<T>[]): Observable<T>;
91
92 pipe<U>(f: (source: Observable<T>) => Producer<U>): Observable<U>;
91 }
93 }
92
94
93 const noop = () => { };
95 const noop = () => { };
94
96
95 const sink = <T>(consumer: Partial<Observer<T>>) => {
97 const sink = <T>(consumer: Partial<Observer<T>>) => {
96 const { next, error, complete } = consumer;
98 const { next, error, complete } = consumer;
97 return {
99 return {
98 next: next ? next.bind(consumer) : noop,
100 next: next ? next.bind(consumer) : noop,
99 error: error ? error.bind(consumer) : noop,
101 error: error ? error.bind(consumer) : noop,
100 complete: complete ? complete.bind(consumer) : noop,
102 complete: complete ? complete.bind(consumer) : noop,
101 isClosed: () => false
103 isClosed: () => false
102 };
104 };
103 };
105 };
104
106
105 /** Wraps the producer to handle tear down logic and subscription management
107 /** Wraps the producer to handle tear down logic and subscription management
106 *
108 *
107 * @param producer The producer to wrap
109 * @param producer The producer to wrap
108 * @returns The wrapper producer
110 * @returns The wrapper producer
109 */
111 */
110 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
112 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
111 let done = false;
113 let done = false;
112 let cleanup = noop;
114 let cleanup = noop;
113
115
114 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
116 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
115 (...args: A) => done ?
117 (...args: A) => done ?
116 void (0) :
118 void (0) :
117 (done = true, cleanup(), fn(...args));
119 (done = true, cleanup(), fn(...args));
118
120
119 const safeSink = {
121 const safeSink = {
120 next: (value: T) => { !done && next(value); },
122 next: (value: T) => { !done && next(value); },
121 error: _fin(error),
123 error: _fin(error),
122 complete: _fin(complete),
124 complete: _fin(complete),
123 isClosed: () => done
125 isClosed: () => done
124 };
126 };
125 cleanup = producer(safeSink) ?? noop;
127 cleanup = producer(safeSink) ?? noop;
126 return done ?
128 return done ?
127 (cleanup(), noop) :
129 (cleanup(), noop) :
128 _fin(noop);
130 _fin(noop);
129 };
131 };
130
132
131 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
133 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
132 subscribe: (consumer: Partial<Observer<T>>) => ({
134 subscribe: (consumer: Partial<Observer<T>>) => ({
133 unsubscribe: producer(sink(consumer)) ?? noop
135 unsubscribe: producer(sink(consumer)) ?? noop
134 }),
136 }),
135 map: (mapper) => _observe(({ next, ...rest }) =>
137 map: (mapper) => _observe(({ next, ...rest }) =>
136 producer({
138 producer({
137 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
139 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
138 ...rest
140 ...rest
139 })
141 })
140 ),
142 ),
141 filter: (predicate) => _observe(({ next, ...rest }) =>
143 filter: (predicate) => _observe(({ next, ...rest }) =>
142 producer({
144 producer({
143 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
145 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
144 ...rest
146 ...rest
145 })
147 })
146 ),
148 ),
147 scan: (accumulator, initial) => _observe(({ next, ...rest }) => {
149 scan: (accumulator, initial) => _observe(({ next, ...rest }) => {
148 let _acc = initial;
150 let _acc = initial;
149 return producer({
151 return producer({
150 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
152 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
151 ...rest
153 ...rest
152 });
154 });
153 }),
155 }),
154
156
155 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
157 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
156 let cleanup: () => void;
158 let cleanup: () => void;
157 const complete = () => {
159 const complete = () => {
158 const continuation = seq.shift();
160 const continuation = seq.shift();
159 if (continuation) {
161 if (continuation) {
160 // if we have a next sequence, subscribe to it
162 // if we have a next sequence, subscribe to it
161 const subscription = continuation.subscribe({ next, complete, ...rest });
163 const subscription = continuation.subscribe({ next, complete, ...rest });
162 cleanup = subscription.unsubscribe.bind(subscription);
164 cleanup = subscription.unsubscribe.bind(subscription);
163 } else {
165 } else {
164 // otherwise notify the consumer about completion
166 // otherwise notify the consumer about completion
165 final();
167 final();
166 }
168 }
167 };
169 };
168
170
169 cleanup = producer({ next, complete, ...rest }) ?? noop;
171 cleanup = producer({ next, complete, ...rest }) ?? noop;
170
172
171 return () => cleanup();
173 return () => cleanup();
172 })
174 }),
175
176 pipe: <U>(f: (source: Observable<T>) => Producer<U>) => observe(f(_observe(producer)))
173 });
177 });
174
178
175 export interface OrderUpdate<T> {
179 export interface OrderUpdate<T> {
176 /** The item is being updated */
180 /** The item is being updated */
177 item: T;
181 item: T;
178
182
179 /** The previous index of the item, -1 in case it is inserted */
183 /** The previous index of the item, -1 in case it is inserted */
180 prevIndex: number;
184 prevIndex: number;
181
185
182 /** The new index of the item, -1 in case it is deleted */
186 /** The new index of the item, -1 in case it is deleted */
183 newIndex: number;
187 newIndex: number;
184 }
188 }
185
189
186 interface ObservableResults<T> {
190 interface ObservableResults<T> {
187 /**
191 /**
188 * Allows observation of results
192 * Allows observation of results
189 */
193 */
190 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
194 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
191 remove(): void;
195 remove(): void;
192 };
196 };
193 }
197 }
194
198
195 interface Queryable<T, A extends unknown[]> {
199 interface Queryable<T, A extends unknown[]> {
196 query(...args: A): PromiseOrValue<T[]>;
200 query(...args: A): PromiseOrValue<T[]>;
197 }
201 }
198
202
199 export const isObservableResults = <T>(v: object): v is ObservableResults<T> =>
203 export const isObservableResults = <T>(v: object): v is ObservableResults<T> =>
200 v && (typeof (v as { observe?: unknown; }).observe === "function");
204 v && (typeof (v as { observe?: unknown; }).observe === "function");
201
205
202 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
206 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
203
207
204 export const empty = observe<never>(({ complete }) => complete());
208 export const empty = observe<never>(({ complete }) => complete());
205
209
206 export const query = <T, A extends unknown[]>(store: Queryable<T, A>) =>
210 export const query = <T, A extends unknown[]>(store: Queryable<T, A>) =>
207 (...args: A) => {
211 (...args: A) => {
208 return observe<OrderUpdate<T>>(({ next, complete, error }) => {
212 return observe<OrderUpdate<T>>(({ next, complete, error }) => {
209 try {
213 try {
210 const results = store.query(...args);
214 const results = store.query(...args);
211 if (isPromise(results)) {
215 if (isPromise(results)) {
212 results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })))
216 results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })))
213 .then(undefined, error);
217 .then(undefined, error);
214 } else {
218 } else {
215 results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
219 results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
216 }
220 }
217
221
218 if (isObservableResults<T>(results)) {
222 if (isObservableResults<T>(results)) {
219 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }));
223 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }));
220 return () => h.remove();
224 return () => h.remove();
221 } else {
225 } else {
222 complete();
226 complete();
223 }
227 }
224 } catch (err) {
228 } catch (err) {
225 error(err);
229 error(err);
226 }
230 }
227 });
231 });
228
232
229 };
233 };
@@ -1,52 +1,82
1 import { observe } from "./observable";
1 import { observe } from "./observable";
2 import * as t from "tap";
2 import * as t from "tap";
3
3
4 const subj1 = observe<number>(({ next, complete }) => {
4 const subj1 = observe<number>(({ next, complete }) => {
5 next(1);
5 next(1);
6 complete();
6 complete();
7 next(2);
7 next(2);
8 });
8 });
9
9
10 const consumer1 = {
10 const consumer1 = {
11 sum: 0,
11 sum: 0,
12 next(v: number) {
12 next(v: number) {
13 this.sum += v;
13 this.sum += v;
14 }
14 }
15 };
15 };
16
16
17 subj1.subscribe(consumer1);
17 subj1.subscribe(consumer1);
18 t.equal(consumer1.sum, 1, "Should get only one value");
18 t.equal(consumer1.sum, 1, "Should get only one value");
19
19
20 subj1.subscribe(consumer1);
20 subj1.subscribe(consumer1);
21 t.equal(consumer1.sum, 2, "Should get the value again");
21 t.equal(consumer1.sum, 2, "Should get the value again");
22
22
23 const consumer2 = {
23 const consumer2 = {
24 value: 0,
24 value: 0,
25 completed: false,
25 completed: false,
26 next(v: number) { this.value = v; },
26 next(v: number) { this.value = v; },
27 complete() { this.completed = true; }
27 complete() { this.completed = true; }
28 };
28 };
29
29
30 let maps = 0;
30 let maps = 0;
31
31
32 subj1
32 subj1
33 .map(v => {
33 .map(v => {
34 t.comment(`map1: ${v * 2}`);
34 t.comment(`map1: ${v * 2}`);
35 maps++;
35 maps++;
36 return v * 2;
36 return v * 2;
37 })
37 })
38 .map (v => {
38 .map(v => {
39 t.comment(`map2: ${v * 2}`);
39 t.comment(`map2: ${v * 2}`);
40 maps++;
40 maps++;
41 return v * 2;
41 return v * 2;
42 })
42 })
43 .map(v => {
43 .map(v => {
44 t.comment(`map3: ${v * 2}`);
44 t.comment(`map3: ${v * 2}`);
45 maps++;
45 maps++;
46 return v * 2;
46 return v * 2;
47 })
47 })
48 .subscribe(consumer2);
48 .subscribe(consumer2);
49
49
50 t.equal(consumer2.value, 8, "Should map");
50 t.equal(consumer2.value, 8, "Should map");
51 t.equal(maps, 3, "The map chain should not be executed after completion");
51 t.equal(maps, 3, "The map chain should not be executed after completion");
52 t.ok(consumer2.completed, "The completion signal should pass through");
52 t.ok(consumer2.completed, "The completion signal should pass through");
53
54 const subj2 = observe<number>(({ next, complete }) => {
55 [1, 2, 3, 4, 5].forEach(next);
56 complete();
57 }).pipe<string>(self => ({ next, complete, error }) => {
58 t.comment("subj2: subscribe");
59 const h = self.subscribe({
60 next: val => {
61 if (val % 2 === 0)
62 next("odd");
63 else
64 next("even");
65 },
66 complete,
67 error
68 });
69 return () =>{
70 t.comment("subj2: unsubscribe");
71 h.unsubscribe();
72 };
73 });
74
75 subj2.subscribe({
76 next: val => t.comment("subj2: ", val),
77 complete: () => t.comment("subj2: complete")
78 });
79 subj2.subscribe({
80 next: val => t.comment("subj2: ", val),
81 complete: () => t.comment("subj2: complete")
82 }); No newline at end of file
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now