##// END OF EJS Templates
added tap() method to observable...
cin -
r144:63215d91ae4b v1.9.0-rc6 default
parent child
Show More
@@ -70,7 +70,10 export const isSubscribable = <T = unkno
70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
71
71
72 export interface Subscribable<T> {
72 export interface Subscribable<T> {
73 subscribe(consumer: Observer<T>): Unsubscribable;
73 /** Subscribes a consumer to events. If a consumer isn't specified
74 * this method activates the producer to achieve side affects if any.
75 */
76 subscribe(consumer?: Observer<T>): Unsubscribable;
74 }
77 }
75
78
76 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
79 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
@@ -85,6 +88,13 export interface Observable<T> extends S
85 */
88 */
86 map<T2>(mapper: (value: T) => T2): Observable<T2>;
89 map<T2>(mapper: (value: T) => T2): Observable<T2>;
87
90
91 /** Injects the specified observer into the each producer to consumer chain.
92 * The method is used to add side effect to the events processing.
93 *
94 * @param observer The consumer for the events
95 */
96 tap(observer: Observer<T>): Observable<T>;
97
88 /** Filters elements of the sequence. The resulting sequence will
98 /** Filters elements of the sequence. The resulting sequence will
89 * contain only elements which match the specified predicate.
99 * contain only elements which match the specified predicate.
90 *
100 *
@@ -203,7 +213,7 const fuse = <T>(producer: Producer<T>)
203 };
213 };
204
214
205 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
215 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
206 subscribe: (consumer: Observer<T>) => ({
216 subscribe: (consumer: Observer<T> = {}) => ({
207 unsubscribe: producer(sink(consumer)) ?? noop
217 unsubscribe: producer(sink(consumer)) ?? noop
208 }),
218 }),
209
219
@@ -214,6 +224,14 const _observe = <T>(producer: FusedProd
214 })
224 })
215 ),
225 ),
216
226
227 tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) =>
228 producer({
229 next: tapNext ? (v => (tapNext(v), next(v))) : next,
230 complete: tapComplete ? (() => (tapComplete(), complete())): complete,
231 error: tapError ? (e => (tapError(e), error(e))) : error
232 })
233 ),
234
217 filter: (predicate) => _observe(({ next, ...rest }) =>
235 filter: (predicate) => _observe(({ next, ...rest }) =>
218 producer({
236 producer({
219 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
237 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
@@ -33,34 +33,61 interface Queryable<T, Q, O> {
33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
34 v && (typeof (v as { observe?: unknown; }).observe === "function");
34 v && (typeof (v as { observe?: unknown; }).observe === "function");
35
35
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => {
37 (query?: Q, options?: O & { observe?: boolean }) => {
37 const q = queryEx(store, includeUpdates);
38 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
38 return (query?: Q, options?: O & { observe?: boolean }) => {
39 const [data, updates] = q(query, options);
40
41 return options?.observe === false ? data : data.cat(updates);
42 };
43 };
39
44
45 export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
46 (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => {
47
48 const pending: T[] = [];
49
50 let results: PromiseOrValue<T[]> = pending;
51
52 const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => {
40 const processResults = (items: T[]) =>
53 const processResults = (items: T[]) =>
41 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
54 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
42
55
43 try {
56 try {
44 const results = store.query(query, options);
57 if (results === pending)
58 results = store.query(query, options);
59
45 if (isPromise(results)) {
60 if (isPromise(results)) {
46 results.then(processResults).then(undefined, error);
61 results.then(processResults).then(complete, error);
62
63 if (isCancellable(results))
64 return results.cancel.bind(results);
47 } else {
65 } else {
48 processResults(results);
66 processResults(results);
67 complete();
49 }
68 }
69 } catch (e) {
70 error(e);
71 }
72 });
50
73
51 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
74 const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
75 try {
76 if (!isClosed() && isDjObservableResults<T>(results)) {
52 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
77 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
53 return () => h.remove();
78 return () => h.remove();
54 } else {
79 } else {
55 complete();
80 complete();
56 }
81 }
57 } catch (err) {
82 } catch (e) {
58 error(err);
83 error(e);
59 }
84 }
60 });
85 });
61
86
87 return [data, updates];
62 };
88 };
63
89
90
64 interface IndexedStore<T> {
91 interface IndexedStore<T> {
65 get(id: string | number): PromiseLike<T> | T | null | undefined;
92 get(id: string | number): PromiseLike<T> | T | null | undefined;
66 }
93 }
@@ -76,11 +103,11 interface GetOpts {
76 observe?: boolean;
103 observe?: boolean;
77 }
104 }
78
105
79 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
106 export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] |
80 [item: undefined | null, id: string | number];
107 [item: undefined | null, id: string | number];
81
108
82 const filterItem = (itemId: string | number) =>
109 const filterItem = (itemId: string | number) =>
83 <T>(source: Observable<Change<T>>) =>
110 <T>(source: Observable<ItemUpdate<T>>) =>
84 observe<T>(({ next, complete, error }) => {
111 observe<T>(({ next, complete, error }) => {
85 const subscription = source
112 const subscription = source
86 .filter(([, id]) => id === itemId)
113 .filter(([, id]) => id === itemId)
@@ -94,8 +121,8 const filterItem = (itemId: string | num
94
121
95 export const get = <T>(store: IndexedStore<T>) => {
122 export const get = <T>(store: IndexedStore<T>) => {
96 const changes = hasNotifications<T>(store) ?
123 const changes = hasNotifications<T>(store) ?
97 observe<Change<T>>(({ next }) => {
124 observe<ItemUpdate<T>>(({ next }) => {
98 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
125 const handle = after(store, "notify", (...args: ItemUpdate<T>) => next(args), true);
99 return () => handle.remove();
126 return () => handle.remove();
100 }) : empty;
127 }) : empty;
101
128
@@ -1,6 +1,6
1 import Memory = require("dojo/store/Memory");
1 import Memory = require("dojo/store/Memory");
2 import Observerable = require("dojo/store/Observable");
2 import Observerable = require("dojo/store/Observable");
3 import { get } from "./store";
3 import { get, queryEx } from "./store";
4 import tap = require("tap");
4 import tap = require("tap");
5
5
6 interface Person {
6 interface Person {
@@ -61,5 +61,29 tap.test("store::get(...) tests", async
61 samSubscription.unsubscribe();
61 samSubscription.unsubscribe();
62 }
62 }
63
63
64 store.put({ id: samId, name: "Sam", age: 29});
65
66 const [data, updates] = queryEx(store)({ age: 29}, { sort: [{attribute: "id"}] });
67
68 const dump: string[] = [];
69
70 const subscription = data
71 .tap({
72 complete: () => dump.push("eof")
73 })
74 .cat(updates)
75 .tap({
76 next: ({item: {id}}) => dump.push(id),
77 complete: () => dump.push("eof")
78 })
79 .subscribe({});
80
81 t.same(dump, ["id:peter", "id:sam", "eof"]);
82
83 store.put({ id: "id:mary", name: "Mary", age: 29});
84
85 t.same(dump, ["id:peter", "id:sam", "eof", "id:mary"]);
86
87 subscription.unsubscribe();
64
88
65 }).catch(() => { }); No newline at end of file
89 }).catch(() => { });
@@ -147,4 +147,27 tap.test("of(...) tests", async t => {
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149
149
150 }).catch(() => { });
151
152 tap.test(".tap() tests", async t => {
153 const side: number[] = [];
154
155 of(1,2)
156 .tap({next: v => side.push(v), complete: () => side.push(0)})
157 .tap({next: v => side.push(v*v)})
158 .subscribe({});
159
160 t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration");
161
162 side.length = 0;
163
164 await new Promise<void>(resolve => {
165 of(1,2,delay(1).then(() => 3))
166 .tap({next: v => side.push(v)})
167 .tap({ next: v => v === 1 && resolve()})
168 .subscribe({});
169 });
170
171 t.same(side, [1,2], ".tap() should be processed synchronously");
172
150 }).catch(() => { }); No newline at end of file
173 }).catch(() => {});
General Comments 0
You need to be logged in to leave comments. Login now