| @@ -51,4 +51,5 435ce00ba2452d565f1ed3bb884643b4b88b9cde | |||||
| 51 | 98b2c550c676ff6acf16f5bd56ee3804342f80b7 v1.9.0-rc3 |
|
51 | 98b2c550c676ff6acf16f5bd56ee3804342f80b7 v1.9.0-rc3 | |
| 52 | 515d1b83ebdfaa93599451175055a94f711c079f v1.9.0-rc4 |
|
52 | 515d1b83ebdfaa93599451175055a94f711c079f v1.9.0-rc4 | |
| 53 | 894b8239b953c0384cf32cab46cf41dac97ea03b v1.9.0-rc5 |
|
53 | 894b8239b953c0384cf32cab46cf41dac97ea03b v1.9.0-rc5 | |
|
|
54 | 63215d91ae4b711eeb2145b9685240e1afada904 v1.9.0-rc6 | |||
| 54 | af4f8424e83d56e89a64f39e19514ca10dbd43c6 v1.9.0 |
|
55 | af4f8424e83d56e89a64f39e19514ca10dbd43c6 v1.9.0 | |
| @@ -70,7 +70,10 export const isSubscribable = <T = unkno | |||||
| 70 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; |
|
70 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; | |
| 71 |
|
71 | |||
| 72 | export interface Subscribable<T> { |
|
72 | export interface Subscribable<T> { | |
| 73 | subscribe(consumer: Observer<T>): Unsubscribable; |
|
73 | /** Subscribes a consumer to events. If a consumer isn't specified | |
|
|
74 | * this method activates the producer to achieve side affects if any. | |||
|
|
75 | */ | |||
|
|
76 | subscribe(consumer?: Observer<T>): Unsubscribable; | |||
| 74 | } |
|
77 | } | |
| 75 |
|
78 | |||
| 76 | export type AccumulatorFn<T, A> = (acc: A, value: T) => A; |
|
79 | export type AccumulatorFn<T, A> = (acc: A, value: T) => A; | |
| @@ -85,6 +88,13 export interface Observable<T> extends S | |||||
| 85 | */ |
|
88 | */ | |
| 86 | map<T2>(mapper: (value: T) => T2): Observable<T2>; |
|
89 | map<T2>(mapper: (value: T) => T2): Observable<T2>; | |
| 87 |
|
90 | |||
|
|
91 | /** Injects the specified observer into the each producer to consumer chain. | |||
|
|
92 | * The method is used to add side effect to the events processing. | |||
|
|
93 | * | |||
|
|
94 | * @param observer The consumer for the events | |||
|
|
95 | */ | |||
|
|
96 | tap(observer: Observer<T>): Observable<T>; | |||
|
|
97 | ||||
| 88 | /** Filters elements of the sequence. The resulting sequence will |
|
98 | /** Filters elements of the sequence. The resulting sequence will | |
| 89 | * contain only elements which match the specified predicate. |
|
99 | * contain only elements which match the specified predicate. | |
| 90 | * |
|
100 | * | |
| @@ -203,7 +213,7 const fuse = <T>(producer: Producer<T>) | |||||
| 203 | }; |
|
213 | }; | |
| 204 |
|
214 | |||
| 205 | const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({ |
|
215 | const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({ | |
| 206 | subscribe: (consumer: Observer<T>) => ({ |
|
216 | subscribe: (consumer: Observer<T> = {}) => ({ | |
| 207 | unsubscribe: producer(sink(consumer)) ?? noop |
|
217 | unsubscribe: producer(sink(consumer)) ?? noop | |
| 208 | }), |
|
218 | }), | |
| 209 |
|
219 | |||
| @@ -214,6 +224,14 const _observe = <T>(producer: FusedProd | |||||
| 214 | }) |
|
224 | }) | |
| 215 | ), |
|
225 | ), | |
| 216 |
|
226 | |||
|
|
227 | tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) => | |||
|
|
228 | producer({ | |||
|
|
229 | next: tapNext ? (v => (tapNext(v), next(v))) : next, | |||
|
|
230 | complete: tapComplete ? (() => (tapComplete(), complete())): complete, | |||
|
|
231 | error: tapError ? (e => (tapError(e), error(e))) : error | |||
|
|
232 | }) | |||
|
|
233 | ), | |||
|
|
234 | ||||
| 217 | filter: (predicate) => _observe(({ next, ...rest }) => |
|
235 | filter: (predicate) => _observe(({ next, ...rest }) => | |
| 218 | producer({ |
|
236 | producer({ | |
| 219 | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, |
|
237 | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, | |
| @@ -33,34 +33,61 interface Queryable<T, Q, O> { | |||||
| 33 | export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> => |
|
33 | export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> => | |
| 34 | v && (typeof (v as { observe?: unknown; }).observe === "function"); |
|
34 | v && (typeof (v as { observe?: unknown; }).observe === "function"); | |
| 35 |
|
35 | |||
| 36 | export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => |
|
36 | export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => { | |
| 37 | (query?: Q, options?: O & { observe?: boolean }) => { |
|
37 | const q = queryEx(store, includeUpdates); | |
| 38 | return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => { |
|
38 | return (query?: Q, options?: O & { observe?: boolean }) => { | |
|
|
39 | const [data, updates] = q(query, options); | |||
|
|
40 | ||||
|
|
41 | return options?.observe === false ? data : data.cat(updates); | |||
|
|
42 | }; | |||
|
|
43 | }; | |||
| 39 |
|
44 | |||
|
|
45 | export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => | |||
|
|
46 | (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => { | |||
|
|
47 | ||||
|
|
48 | const pending: T[] = []; | |||
|
|
49 | ||||
|
|
50 | let results: PromiseOrValue<T[]> = pending; | |||
|
|
51 | ||||
|
|
52 | const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => { | |||
| 40 | const processResults = (items: T[]) => |
|
53 | const processResults = (items: T[]) => | |
| 41 | items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); |
|
54 | items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); | |
| 42 |
|
55 | |||
| 43 | try { |
|
56 | try { | |
| 44 |
|
|
57 | if (results === pending) | |
|
|
58 | results = store.query(query, options); | |||
|
|
59 | ||||
| 45 | if (isPromise(results)) { |
|
60 | if (isPromise(results)) { | |
| 46 |
results.then(processResults).then( |
|
61 | results.then(processResults).then(complete, error); | |
|
|
62 | ||||
|
|
63 | if (isCancellable(results)) | |||
|
|
64 | return results.cancel.bind(results); | |||
| 47 | } else { |
|
65 | } else { | |
| 48 | processResults(results); |
|
66 | processResults(results); | |
|
|
67 | complete(); | |||
| 49 | } |
|
68 | } | |
|
|
69 | } catch (e) { | |||
|
|
70 | error(e); | |||
|
|
71 | } | |||
|
|
72 | }); | |||
| 50 |
|
73 | |||
| 51 | if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) { |
|
74 | const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => { | |
|
|
75 | try { | |||
|
|
76 | if (!isClosed() && isDjObservableResults<T>(results)) { | |||
| 52 | const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); |
|
77 | const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); | |
| 53 | return () => h.remove(); |
|
78 | return () => h.remove(); | |
| 54 | } else { |
|
79 | } else { | |
| 55 | complete(); |
|
80 | complete(); | |
| 56 | } |
|
81 | } | |
| 57 |
} catch (e |
|
82 | } catch (e) { | |
| 58 |
error(e |
|
83 | error(e); | |
| 59 | } |
|
84 | } | |
| 60 | }); |
|
85 | }); | |
| 61 |
|
86 | |||
|
|
87 | return [data, updates]; | |||
| 62 | }; |
|
88 | }; | |
| 63 |
|
89 | |||
|
|
90 | ||||
| 64 | interface IndexedStore<T> { |
|
91 | interface IndexedStore<T> { | |
| 65 | get(id: string | number): PromiseLike<T> | T | null | undefined; |
|
92 | get(id: string | number): PromiseLike<T> | T | null | undefined; | |
| 66 | } |
|
93 | } | |
| @@ -76,11 +103,11 interface GetOpts { | |||||
| 76 | observe?: boolean; |
|
103 | observe?: boolean; | |
| 77 | } |
|
104 | } | |
| 78 |
|
105 | |||
| 79 |
type |
|
106 | export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] | | |
| 80 | [item: undefined | null, id: string | number]; |
|
107 | [item: undefined | null, id: string | number]; | |
| 81 |
|
108 | |||
| 82 | const filterItem = (itemId: string | number) => |
|
109 | const filterItem = (itemId: string | number) => | |
| 83 |
<T>(source: Observable< |
|
110 | <T>(source: Observable<ItemUpdate<T>>) => | |
| 84 | observe<T>(({ next, complete, error }) => { |
|
111 | observe<T>(({ next, complete, error }) => { | |
| 85 | const subscription = source |
|
112 | const subscription = source | |
| 86 | .filter(([, id]) => id === itemId) |
|
113 | .filter(([, id]) => id === itemId) | |
| @@ -94,8 +121,8 const filterItem = (itemId: string | num | |||||
| 94 |
|
121 | |||
| 95 | export const get = <T>(store: IndexedStore<T>) => { |
|
122 | export const get = <T>(store: IndexedStore<T>) => { | |
| 96 | const changes = hasNotifications<T>(store) ? |
|
123 | const changes = hasNotifications<T>(store) ? | |
| 97 |
observe< |
|
124 | observe<ItemUpdate<T>>(({ next }) => { | |
| 98 |
const handle = after(store, "notify", (...args: |
|
125 | const handle = after(store, "notify", (...args: ItemUpdate<T>) => next(args), true); | |
| 99 | return () => handle.remove(); |
|
126 | return () => handle.remove(); | |
| 100 | }) : empty; |
|
127 | }) : empty; | |
| 101 |
|
128 | |||
| @@ -1,6 +1,6 | |||||
| 1 | import Memory = require("dojo/store/Memory"); |
|
1 | import Memory = require("dojo/store/Memory"); | |
| 2 | import Observerable = require("dojo/store/Observable"); |
|
2 | import Observerable = require("dojo/store/Observable"); | |
| 3 | import { get } from "./store"; |
|
3 | import { get, queryEx } from "./store"; | |
| 4 | import tap = require("tap"); |
|
4 | import tap = require("tap"); | |
| 5 |
|
5 | |||
| 6 | interface Person { |
|
6 | interface Person { | |
| @@ -61,5 +61,29 tap.test("store::get(...) tests", async | |||||
| 61 | samSubscription.unsubscribe(); |
|
61 | samSubscription.unsubscribe(); | |
| 62 | } |
|
62 | } | |
| 63 |
|
63 | |||
|
|
64 | store.put({ id: samId, name: "Sam", age: 29}); | |||
|
|
65 | ||||
|
|
66 | const [data, updates] = queryEx(store)({ age: 29}, { sort: [{attribute: "id"}] }); | |||
|
|
67 | ||||
|
|
68 | const dump: string[] = []; | |||
|
|
69 | ||||
|
|
70 | const subscription = data | |||
|
|
71 | .tap({ | |||
|
|
72 | complete: () => dump.push("eof") | |||
|
|
73 | }) | |||
|
|
74 | .cat(updates) | |||
|
|
75 | .tap({ | |||
|
|
76 | next: ({item: {id}}) => dump.push(id), | |||
|
|
77 | complete: () => dump.push("eof") | |||
|
|
78 | }) | |||
|
|
79 | .subscribe({}); | |||
|
|
80 | ||||
|
|
81 | t.same(dump, ["id:peter", "id:sam", "eof"]); | |||
|
|
82 | ||||
|
|
83 | store.put({ id: "id:mary", name: "Mary", age: 29}); | |||
|
|
84 | ||||
|
|
85 | t.same(dump, ["id:peter", "id:sam", "eof", "id:mary"]); | |||
|
|
86 | ||||
|
|
87 | subscription.unsubscribe(); | |||
| 64 |
|
88 | |||
| 65 | }).catch(() => { }); No newline at end of file |
|
89 | }).catch(() => { }); | |
| @@ -147,4 +147,27 tap.test("of(...) tests", async t => { | |||||
| 147 | t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence"); |
|
147 | t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence"); | |
| 148 | await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation"); |
|
148 | await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation"); | |
| 149 |
|
149 | |||
| 150 | }).catch(() => { }); No newline at end of file |
|
150 | }).catch(() => { }); | |
|
|
151 | ||||
|
|
152 | tap.test(".tap() tests", async t => { | |||
|
|
153 | const side: number[] = []; | |||
|
|
154 | ||||
|
|
155 | of(1,2) | |||
|
|
156 | .tap({next: v => side.push(v), complete: () => side.push(0)}) | |||
|
|
157 | .tap({next: v => side.push(v*v)}) | |||
|
|
158 | .subscribe({}); | |||
|
|
159 | ||||
|
|
160 | t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration"); | |||
|
|
161 | ||||
|
|
162 | side.length = 0; | |||
|
|
163 | ||||
|
|
164 | await new Promise<void>(resolve => { | |||
|
|
165 | of(1,2,delay(1).then(() => 3)) | |||
|
|
166 | .tap({next: v => side.push(v)}) | |||
|
|
167 | .tap({ next: v => v === 1 && resolve()}) | |||
|
|
168 | .subscribe({}); | |||
|
|
169 | }); | |||
|
|
170 | ||||
|
|
171 | t.same(side, [1,2], ".tap() should be processed synchronously"); | |||
|
|
172 | ||||
|
|
173 | }).catch(() => {}); No newline at end of file | |||
General Comments 0
You need to be logged in to leave comments.
Login now
