##// END OF EJS Templates
added tap() method to observable...
cin -
r144:63215d91ae4b v1.9.0-rc6 default
parent child
Show More
@@ -70,7 +70,10 export const isSubscribable = <T = unkno
70 70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
71 71
72 72 export interface Subscribable<T> {
73 subscribe(consumer: Observer<T>): Unsubscribable;
73 /** Subscribes a consumer to events. If a consumer isn't specified
74 * this method activates the producer to achieve side affects if any.
75 */
76 subscribe(consumer?: Observer<T>): Unsubscribable;
74 77 }
75 78
76 79 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
@@ -85,6 +88,13 export interface Observable<T> extends S
85 88 */
86 89 map<T2>(mapper: (value: T) => T2): Observable<T2>;
87 90
91 /** Injects the specified observer into the each producer to consumer chain.
92 * The method is used to add side effect to the events processing.
93 *
94 * @param observer The consumer for the events
95 */
96 tap(observer: Observer<T>): Observable<T>;
97
88 98 /** Filters elements of the sequence. The resulting sequence will
89 99 * contain only elements which match the specified predicate.
90 100 *
@@ -203,7 +213,7 const fuse = <T>(producer: Producer<T>)
203 213 };
204 214
205 215 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
206 subscribe: (consumer: Observer<T>) => ({
216 subscribe: (consumer: Observer<T> = {}) => ({
207 217 unsubscribe: producer(sink(consumer)) ?? noop
208 218 }),
209 219
@@ -214,6 +224,14 const _observe = <T>(producer: FusedProd
214 224 })
215 225 ),
216 226
227 tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) =>
228 producer({
229 next: tapNext ? (v => (tapNext(v), next(v))) : next,
230 complete: tapComplete ? (() => (tapComplete(), complete())): complete,
231 error: tapError ? (e => (tapError(e), error(e))) : error
232 })
233 ),
234
217 235 filter: (predicate) => _observe(({ next, ...rest }) =>
218 236 producer({
219 237 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
@@ -33,34 +33,61 interface Queryable<T, Q, O> {
33 33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
34 34 v && (typeof (v as { observe?: unknown; }).observe === "function");
35 35
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
37 (query?: Q, options?: O & { observe?: boolean }) => {
38 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => {
37 const q = queryEx(store, includeUpdates);
38 return (query?: Q, options?: O & { observe?: boolean }) => {
39 const [data, updates] = q(query, options);
40
41 return options?.observe === false ? data : data.cat(updates);
42 };
43 };
39 44
45 export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
46 (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => {
47
48 const pending: T[] = [];
49
50 let results: PromiseOrValue<T[]> = pending;
51
52 const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => {
40 53 const processResults = (items: T[]) =>
41 54 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
42 55
43 56 try {
44 const results = store.query(query, options);
57 if (results === pending)
58 results = store.query(query, options);
59
45 60 if (isPromise(results)) {
46 results.then(processResults).then(undefined, error);
61 results.then(processResults).then(complete, error);
62
63 if (isCancellable(results))
64 return results.cancel.bind(results);
47 65 } else {
48 66 processResults(results);
67 complete();
49 68 }
69 } catch (e) {
70 error(e);
71 }
72 });
50 73
51 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
74 const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
75 try {
76 if (!isClosed() && isDjObservableResults<T>(results)) {
52 77 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
53 78 return () => h.remove();
54 79 } else {
55 80 complete();
56 81 }
57 } catch (err) {
58 error(err);
82 } catch (e) {
83 error(e);
59 84 }
60 85 });
61 86
87 return [data, updates];
62 88 };
63 89
90
64 91 interface IndexedStore<T> {
65 92 get(id: string | number): PromiseLike<T> | T | null | undefined;
66 93 }
@@ -76,11 +103,11 interface GetOpts {
76 103 observe?: boolean;
77 104 }
78 105
79 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
106 export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] |
80 107 [item: undefined | null, id: string | number];
81 108
82 109 const filterItem = (itemId: string | number) =>
83 <T>(source: Observable<Change<T>>) =>
110 <T>(source: Observable<ItemUpdate<T>>) =>
84 111 observe<T>(({ next, complete, error }) => {
85 112 const subscription = source
86 113 .filter(([, id]) => id === itemId)
@@ -94,8 +121,8 const filterItem = (itemId: string | num
94 121
95 122 export const get = <T>(store: IndexedStore<T>) => {
96 123 const changes = hasNotifications<T>(store) ?
97 observe<Change<T>>(({ next }) => {
98 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
124 observe<ItemUpdate<T>>(({ next }) => {
125 const handle = after(store, "notify", (...args: ItemUpdate<T>) => next(args), true);
99 126 return () => handle.remove();
100 127 }) : empty;
101 128
@@ -1,6 +1,6
1 1 import Memory = require("dojo/store/Memory");
2 2 import Observerable = require("dojo/store/Observable");
3 import { get } from "./store";
3 import { get, queryEx } from "./store";
4 4 import tap = require("tap");
5 5
6 6 interface Person {
@@ -61,5 +61,29 tap.test("store::get(...) tests", async
61 61 samSubscription.unsubscribe();
62 62 }
63 63
64 store.put({ id: samId, name: "Sam", age: 29});
65
66 const [data, updates] = queryEx(store)({ age: 29}, { sort: [{attribute: "id"}] });
67
68 const dump: string[] = [];
69
70 const subscription = data
71 .tap({
72 complete: () => dump.push("eof")
73 })
74 .cat(updates)
75 .tap({
76 next: ({item: {id}}) => dump.push(id),
77 complete: () => dump.push("eof")
78 })
79 .subscribe({});
80
81 t.same(dump, ["id:peter", "id:sam", "eof"]);
82
83 store.put({ id: "id:mary", name: "Mary", age: 29});
84
85 t.same(dump, ["id:peter", "id:sam", "eof", "id:mary"]);
86
87 subscription.unsubscribe();
64 88
65 89 }).catch(() => { }); No newline at end of file
@@ -147,4 +147,27 tap.test("of(...) tests", async t => {
147 147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149 149
150 }).catch(() => { }); No newline at end of file
150 }).catch(() => { });
151
152 tap.test(".tap() tests", async t => {
153 const side: number[] = [];
154
155 of(1,2)
156 .tap({next: v => side.push(v), complete: () => side.push(0)})
157 .tap({next: v => side.push(v*v)})
158 .subscribe({});
159
160 t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration");
161
162 side.length = 0;
163
164 await new Promise<void>(resolve => {
165 of(1,2,delay(1).then(() => 3))
166 .tap({next: v => side.push(v)})
167 .tap({ next: v => v === 1 && resolve()})
168 .subscribe({});
169 });
170
171 t.same(side, [1,2], ".tap() should be processed synchronously");
172
173 }).catch(() => {}); No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now