@@ -70,7 +70,10 export const isSubscribable = <T = unkno | |||||
70 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; |
|
70 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; | |
71 |
|
71 | |||
72 | export interface Subscribable<T> { |
|
72 | export interface Subscribable<T> { | |
73 | subscribe(consumer: Observer<T>): Unsubscribable; |
|
73 | /** Subscribes a consumer to events. If a consumer isn't specified | |
|
74 | * this method activates the producer to achieve side affects if any. | |||
|
75 | */ | |||
|
76 | subscribe(consumer?: Observer<T>): Unsubscribable; | |||
74 | } |
|
77 | } | |
75 |
|
78 | |||
76 | export type AccumulatorFn<T, A> = (acc: A, value: T) => A; |
|
79 | export type AccumulatorFn<T, A> = (acc: A, value: T) => A; | |
@@ -85,6 +88,13 export interface Observable<T> extends S | |||||
85 | */ |
|
88 | */ | |
86 | map<T2>(mapper: (value: T) => T2): Observable<T2>; |
|
89 | map<T2>(mapper: (value: T) => T2): Observable<T2>; | |
87 |
|
90 | |||
|
91 | /** Injects the specified observer into the each producer to consumer chain. | |||
|
92 | * The method is used to add side effect to the events processing. | |||
|
93 | * | |||
|
94 | * @param observer The consumer for the events | |||
|
95 | */ | |||
|
96 | tap(observer: Observer<T>): Observable<T>; | |||
|
97 | ||||
88 | /** Filters elements of the sequence. The resulting sequence will |
|
98 | /** Filters elements of the sequence. The resulting sequence will | |
89 | * contain only elements which match the specified predicate. |
|
99 | * contain only elements which match the specified predicate. | |
90 | * |
|
100 | * | |
@@ -203,7 +213,7 const fuse = <T>(producer: Producer<T>) | |||||
203 | }; |
|
213 | }; | |
204 |
|
214 | |||
205 | const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({ |
|
215 | const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({ | |
206 | subscribe: (consumer: Observer<T>) => ({ |
|
216 | subscribe: (consumer: Observer<T> = {}) => ({ | |
207 | unsubscribe: producer(sink(consumer)) ?? noop |
|
217 | unsubscribe: producer(sink(consumer)) ?? noop | |
208 | }), |
|
218 | }), | |
209 |
|
219 | |||
@@ -214,6 +224,14 const _observe = <T>(producer: FusedProd | |||||
214 | }) |
|
224 | }) | |
215 | ), |
|
225 | ), | |
216 |
|
226 | |||
|
227 | tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) => | |||
|
228 | producer({ | |||
|
229 | next: tapNext ? (v => (tapNext(v), next(v))) : next, | |||
|
230 | complete: tapComplete ? (() => (tapComplete(), complete())): complete, | |||
|
231 | error: tapError ? (e => (tapError(e), error(e))) : error | |||
|
232 | }) | |||
|
233 | ), | |||
|
234 | ||||
217 | filter: (predicate) => _observe(({ next, ...rest }) => |
|
235 | filter: (predicate) => _observe(({ next, ...rest }) => | |
218 | producer({ |
|
236 | producer({ | |
219 | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, |
|
237 | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, |
@@ -33,34 +33,61 interface Queryable<T, Q, O> { | |||||
33 | export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> => |
|
33 | export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> => | |
34 | v && (typeof (v as { observe?: unknown; }).observe === "function"); |
|
34 | v && (typeof (v as { observe?: unknown; }).observe === "function"); | |
35 |
|
35 | |||
36 | export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => |
|
36 | export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => { | |
37 | (query?: Q, options?: O & { observe?: boolean }) => { |
|
37 | const q = queryEx(store, includeUpdates); | |
38 | return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => { |
|
38 | return (query?: Q, options?: O & { observe?: boolean }) => { | |
|
39 | const [data, updates] = q(query, options); | |||
|
40 | ||||
|
41 | return options?.observe === false ? data : data.cat(updates); | |||
|
42 | }; | |||
|
43 | }; | |||
39 |
|
44 | |||
|
45 | export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => | |||
|
46 | (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => { | |||
|
47 | ||||
|
48 | const pending: T[] = []; | |||
|
49 | ||||
|
50 | let results: PromiseOrValue<T[]> = pending; | |||
|
51 | ||||
|
52 | const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => { | |||
40 | const processResults = (items: T[]) => |
|
53 | const processResults = (items: T[]) => | |
41 | items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); |
|
54 | items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); | |
42 |
|
55 | |||
43 | try { |
|
56 | try { | |
44 |
|
|
57 | if (results === pending) | |
|
58 | results = store.query(query, options); | |||
|
59 | ||||
45 | if (isPromise(results)) { |
|
60 | if (isPromise(results)) { | |
46 |
results.then(processResults).then( |
|
61 | results.then(processResults).then(complete, error); | |
|
62 | ||||
|
63 | if (isCancellable(results)) | |||
|
64 | return results.cancel.bind(results); | |||
47 | } else { |
|
65 | } else { | |
48 | processResults(results); |
|
66 | processResults(results); | |
|
67 | complete(); | |||
49 | } |
|
68 | } | |
|
69 | } catch (e) { | |||
|
70 | error(e); | |||
|
71 | } | |||
|
72 | }); | |||
50 |
|
73 | |||
51 | if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) { |
|
74 | const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => { | |
|
75 | try { | |||
|
76 | if (!isClosed() && isDjObservableResults<T>(results)) { | |||
52 | const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); |
|
77 | const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); | |
53 | return () => h.remove(); |
|
78 | return () => h.remove(); | |
54 | } else { |
|
79 | } else { | |
55 | complete(); |
|
80 | complete(); | |
56 | } |
|
81 | } | |
57 |
} catch (e |
|
82 | } catch (e) { | |
58 |
error(e |
|
83 | error(e); | |
59 | } |
|
84 | } | |
60 | }); |
|
85 | }); | |
61 |
|
86 | |||
|
87 | return [data, updates]; | |||
62 | }; |
|
88 | }; | |
63 |
|
89 | |||
|
90 | ||||
64 | interface IndexedStore<T> { |
|
91 | interface IndexedStore<T> { | |
65 | get(id: string | number): PromiseLike<T> | T | null | undefined; |
|
92 | get(id: string | number): PromiseLike<T> | T | null | undefined; | |
66 | } |
|
93 | } | |
@@ -76,11 +103,11 interface GetOpts { | |||||
76 | observe?: boolean; |
|
103 | observe?: boolean; | |
77 | } |
|
104 | } | |
78 |
|
105 | |||
79 |
type |
|
106 | export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] | | |
80 | [item: undefined | null, id: string | number]; |
|
107 | [item: undefined | null, id: string | number]; | |
81 |
|
108 | |||
82 | const filterItem = (itemId: string | number) => |
|
109 | const filterItem = (itemId: string | number) => | |
83 |
<T>(source: Observable< |
|
110 | <T>(source: Observable<ItemUpdate<T>>) => | |
84 | observe<T>(({ next, complete, error }) => { |
|
111 | observe<T>(({ next, complete, error }) => { | |
85 | const subscription = source |
|
112 | const subscription = source | |
86 | .filter(([, id]) => id === itemId) |
|
113 | .filter(([, id]) => id === itemId) | |
@@ -94,8 +121,8 const filterItem = (itemId: string | num | |||||
94 |
|
121 | |||
95 | export const get = <T>(store: IndexedStore<T>) => { |
|
122 | export const get = <T>(store: IndexedStore<T>) => { | |
96 | const changes = hasNotifications<T>(store) ? |
|
123 | const changes = hasNotifications<T>(store) ? | |
97 |
observe< |
|
124 | observe<ItemUpdate<T>>(({ next }) => { | |
98 |
const handle = after(store, "notify", (...args: |
|
125 | const handle = after(store, "notify", (...args: ItemUpdate<T>) => next(args), true); | |
99 | return () => handle.remove(); |
|
126 | return () => handle.remove(); | |
100 | }) : empty; |
|
127 | }) : empty; | |
101 |
|
128 |
@@ -1,6 +1,6 | |||||
1 | import Memory = require("dojo/store/Memory"); |
|
1 | import Memory = require("dojo/store/Memory"); | |
2 | import Observerable = require("dojo/store/Observable"); |
|
2 | import Observerable = require("dojo/store/Observable"); | |
3 | import { get } from "./store"; |
|
3 | import { get, queryEx } from "./store"; | |
4 | import tap = require("tap"); |
|
4 | import tap = require("tap"); | |
5 |
|
5 | |||
6 | interface Person { |
|
6 | interface Person { | |
@@ -61,5 +61,29 tap.test("store::get(...) tests", async | |||||
61 | samSubscription.unsubscribe(); |
|
61 | samSubscription.unsubscribe(); | |
62 | } |
|
62 | } | |
63 |
|
63 | |||
|
64 | store.put({ id: samId, name: "Sam", age: 29}); | |||
|
65 | ||||
|
66 | const [data, updates] = queryEx(store)({ age: 29}, { sort: [{attribute: "id"}] }); | |||
|
67 | ||||
|
68 | const dump: string[] = []; | |||
|
69 | ||||
|
70 | const subscription = data | |||
|
71 | .tap({ | |||
|
72 | complete: () => dump.push("eof") | |||
|
73 | }) | |||
|
74 | .cat(updates) | |||
|
75 | .tap({ | |||
|
76 | next: ({item: {id}}) => dump.push(id), | |||
|
77 | complete: () => dump.push("eof") | |||
|
78 | }) | |||
|
79 | .subscribe({}); | |||
|
80 | ||||
|
81 | t.same(dump, ["id:peter", "id:sam", "eof"]); | |||
|
82 | ||||
|
83 | store.put({ id: "id:mary", name: "Mary", age: 29}); | |||
|
84 | ||||
|
85 | t.same(dump, ["id:peter", "id:sam", "eof", "id:mary"]); | |||
|
86 | ||||
|
87 | subscription.unsubscribe(); | |||
64 |
|
88 | |||
65 | }).catch(() => { }); No newline at end of file |
|
89 | }).catch(() => { }); |
@@ -147,4 +147,27 tap.test("of(...) tests", async t => { | |||||
147 | t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence"); |
|
147 | t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence"); | |
148 | await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation"); |
|
148 | await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation"); | |
149 |
|
149 | |||
|
150 | }).catch(() => { }); | |||
|
151 | ||||
|
152 | tap.test(".tap() tests", async t => { | |||
|
153 | const side: number[] = []; | |||
|
154 | ||||
|
155 | of(1,2) | |||
|
156 | .tap({next: v => side.push(v), complete: () => side.push(0)}) | |||
|
157 | .tap({next: v => side.push(v*v)}) | |||
|
158 | .subscribe({}); | |||
|
159 | ||||
|
160 | t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration"); | |||
|
161 | ||||
|
162 | side.length = 0; | |||
|
163 | ||||
|
164 | await new Promise<void>(resolve => { | |||
|
165 | of(1,2,delay(1).then(() => 3)) | |||
|
166 | .tap({next: v => side.push(v)}) | |||
|
167 | .tap({ next: v => v === 1 && resolve()}) | |||
|
168 | .subscribe({}); | |||
|
169 | }); | |||
|
170 | ||||
|
171 | t.same(side, [1,2], ".tap() should be processed synchronously"); | |||
|
172 | ||||
150 |
}).catch(() => { |
|
173 | }).catch(() => {}); |
General Comments 0
You need to be logged in to leave comments.
Login now