@@ -70,7 +70,10 export const isSubscribable = <T = unkno | |||
|
70 | 70 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; |
|
71 | 71 | |
|
72 | 72 | export interface Subscribable<T> { |
|
73 | subscribe(consumer: Observer<T>): Unsubscribable; | |
|
73 | /** Subscribes a consumer to events. If a consumer isn't specified | |
|
74 | * this method activates the producer to achieve side affects if any. | |
|
75 | */ | |
|
76 | subscribe(consumer?: Observer<T>): Unsubscribable; | |
|
74 | 77 | } |
|
75 | 78 | |
|
76 | 79 | export type AccumulatorFn<T, A> = (acc: A, value: T) => A; |
@@ -85,6 +88,13 export interface Observable<T> extends S | |||
|
85 | 88 | */ |
|
86 | 89 | map<T2>(mapper: (value: T) => T2): Observable<T2>; |
|
87 | 90 | |
|
91 | /** Injects the specified observer into the each producer to consumer chain. | |
|
92 | * The method is used to add side effect to the events processing. | |
|
93 | * | |
|
94 | * @param observer The consumer for the events | |
|
95 | */ | |
|
96 | tap(observer: Observer<T>): Observable<T>; | |
|
97 | ||
|
88 | 98 | /** Filters elements of the sequence. The resulting sequence will |
|
89 | 99 | * contain only elements which match the specified predicate. |
|
90 | 100 | * |
@@ -203,7 +213,7 const fuse = <T>(producer: Producer<T>) | |||
|
203 | 213 | }; |
|
204 | 214 | |
|
205 | 215 | const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({ |
|
206 | subscribe: (consumer: Observer<T>) => ({ | |
|
216 | subscribe: (consumer: Observer<T> = {}) => ({ | |
|
207 | 217 | unsubscribe: producer(sink(consumer)) ?? noop |
|
208 | 218 | }), |
|
209 | 219 | |
@@ -214,6 +224,14 const _observe = <T>(producer: FusedProd | |||
|
214 | 224 | }) |
|
215 | 225 | ), |
|
216 | 226 | |
|
227 | tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) => | |
|
228 | producer({ | |
|
229 | next: tapNext ? (v => (tapNext(v), next(v))) : next, | |
|
230 | complete: tapComplete ? (() => (tapComplete(), complete())): complete, | |
|
231 | error: tapError ? (e => (tapError(e), error(e))) : error | |
|
232 | }) | |
|
233 | ), | |
|
234 | ||
|
217 | 235 | filter: (predicate) => _observe(({ next, ...rest }) => |
|
218 | 236 | producer({ |
|
219 | 237 | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, |
@@ -33,34 +33,61 interface Queryable<T, Q, O> { | |||
|
33 | 33 | export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> => |
|
34 | 34 | v && (typeof (v as { observe?: unknown; }).observe === "function"); |
|
35 | 35 | |
|
36 | export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => | |
|
37 | (query?: Q, options?: O & { observe?: boolean }) => { | |
|
38 | return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => { | |
|
36 | export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => { | |
|
37 | const q = queryEx(store, includeUpdates); | |
|
38 | return (query?: Q, options?: O & { observe?: boolean }) => { | |
|
39 | const [data, updates] = q(query, options); | |
|
40 | ||
|
41 | return options?.observe === false ? data : data.cat(updates); | |
|
42 | }; | |
|
43 | }; | |
|
39 | 44 | |
|
45 | export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => | |
|
46 | (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => { | |
|
47 | ||
|
48 | const pending: T[] = []; | |
|
49 | ||
|
50 | let results: PromiseOrValue<T[]> = pending; | |
|
51 | ||
|
52 | const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => { | |
|
40 | 53 | const processResults = (items: T[]) => |
|
41 | 54 | items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); |
|
42 | 55 | |
|
43 | 56 | try { |
|
44 |
|
|
|
57 | if (results === pending) | |
|
58 | results = store.query(query, options); | |
|
59 | ||
|
45 | 60 | if (isPromise(results)) { |
|
46 |
results.then(processResults).then( |
|
|
61 | results.then(processResults).then(complete, error); | |
|
62 | ||
|
63 | if (isCancellable(results)) | |
|
64 | return results.cancel.bind(results); | |
|
47 | 65 | } else { |
|
48 | 66 | processResults(results); |
|
67 | complete(); | |
|
49 | 68 | } |
|
69 | } catch (e) { | |
|
70 | error(e); | |
|
71 | } | |
|
72 | }); | |
|
50 | 73 | |
|
51 | if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) { | |
|
74 | const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => { | |
|
75 | try { | |
|
76 | if (!isClosed() && isDjObservableResults<T>(results)) { | |
|
52 | 77 | const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); |
|
53 | 78 | return () => h.remove(); |
|
54 | 79 | } else { |
|
55 | 80 | complete(); |
|
56 | 81 | } |
|
57 |
} catch (e |
|
|
58 |
error(e |
|
|
82 | } catch (e) { | |
|
83 | error(e); | |
|
59 | 84 | } |
|
60 | 85 | }); |
|
61 | 86 | |
|
87 | return [data, updates]; | |
|
62 | 88 | }; |
|
63 | 89 | |
|
90 | ||
|
64 | 91 | interface IndexedStore<T> { |
|
65 | 92 | get(id: string | number): PromiseLike<T> | T | null | undefined; |
|
66 | 93 | } |
@@ -76,11 +103,11 interface GetOpts { | |||
|
76 | 103 | observe?: boolean; |
|
77 | 104 | } |
|
78 | 105 | |
|
79 |
type |
|
|
106 | export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] | | |
|
80 | 107 | [item: undefined | null, id: string | number]; |
|
81 | 108 | |
|
82 | 109 | const filterItem = (itemId: string | number) => |
|
83 |
<T>(source: Observable< |
|
|
110 | <T>(source: Observable<ItemUpdate<T>>) => | |
|
84 | 111 | observe<T>(({ next, complete, error }) => { |
|
85 | 112 | const subscription = source |
|
86 | 113 | .filter(([, id]) => id === itemId) |
@@ -94,8 +121,8 const filterItem = (itemId: string | num | |||
|
94 | 121 | |
|
95 | 122 | export const get = <T>(store: IndexedStore<T>) => { |
|
96 | 123 | const changes = hasNotifications<T>(store) ? |
|
97 |
observe< |
|
|
98 |
const handle = after(store, "notify", (...args: |
|
|
124 | observe<ItemUpdate<T>>(({ next }) => { | |
|
125 | const handle = after(store, "notify", (...args: ItemUpdate<T>) => next(args), true); | |
|
99 | 126 | return () => handle.remove(); |
|
100 | 127 | }) : empty; |
|
101 | 128 |
@@ -1,6 +1,6 | |||
|
1 | 1 | import Memory = require("dojo/store/Memory"); |
|
2 | 2 | import Observerable = require("dojo/store/Observable"); |
|
3 | import { get } from "./store"; | |
|
3 | import { get, queryEx } from "./store"; | |
|
4 | 4 | import tap = require("tap"); |
|
5 | 5 | |
|
6 | 6 | interface Person { |
@@ -61,5 +61,29 tap.test("store::get(...) tests", async | |||
|
61 | 61 | samSubscription.unsubscribe(); |
|
62 | 62 | } |
|
63 | 63 | |
|
64 | store.put({ id: samId, name: "Sam", age: 29}); | |
|
65 | ||
|
66 | const [data, updates] = queryEx(store)({ age: 29}, { sort: [{attribute: "id"}] }); | |
|
67 | ||
|
68 | const dump: string[] = []; | |
|
69 | ||
|
70 | const subscription = data | |
|
71 | .tap({ | |
|
72 | complete: () => dump.push("eof") | |
|
73 | }) | |
|
74 | .cat(updates) | |
|
75 | .tap({ | |
|
76 | next: ({item: {id}}) => dump.push(id), | |
|
77 | complete: () => dump.push("eof") | |
|
78 | }) | |
|
79 | .subscribe({}); | |
|
80 | ||
|
81 | t.same(dump, ["id:peter", "id:sam", "eof"]); | |
|
82 | ||
|
83 | store.put({ id: "id:mary", name: "Mary", age: 29}); | |
|
84 | ||
|
85 | t.same(dump, ["id:peter", "id:sam", "eof", "id:mary"]); | |
|
86 | ||
|
87 | subscription.unsubscribe(); | |
|
64 | 88 | |
|
65 | 89 | }).catch(() => { }); No newline at end of file |
@@ -147,4 +147,27 tap.test("of(...) tests", async t => { | |||
|
147 | 147 | t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence"); |
|
148 | 148 | await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation"); |
|
149 | 149 | |
|
150 | }).catch(() => { }); | |
|
151 | ||
|
152 | tap.test(".tap() tests", async t => { | |
|
153 | const side: number[] = []; | |
|
154 | ||
|
155 | of(1,2) | |
|
156 | .tap({next: v => side.push(v), complete: () => side.push(0)}) | |
|
157 | .tap({next: v => side.push(v*v)}) | |
|
158 | .subscribe({}); | |
|
159 | ||
|
160 | t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration"); | |
|
161 | ||
|
162 | side.length = 0; | |
|
163 | ||
|
164 | await new Promise<void>(resolve => { | |
|
165 | of(1,2,delay(1).then(() => 3)) | |
|
166 | .tap({next: v => side.push(v)}) | |
|
167 | .tap({ next: v => v === 1 && resolve()}) | |
|
168 | .subscribe({}); | |
|
169 | }); | |
|
170 | ||
|
171 | t.same(side, [1,2], ".tap() should be processed synchronously"); | |
|
172 | ||
|
150 | 173 |
}).catch(() => { |
General Comments 0
You need to be logged in to leave comments.
Login now