##// END OF EJS Templates
Merge
cin -
r154:2a5720a0816e merge default
parent child
Show More
@@ -54,3 +54,4 894b8239b953c0384cf32cab46cf41dac97ea03b
54 54 63215d91ae4b711eeb2145b9685240e1afada904 v1.9.0-rc6
55 55 af4f8424e83d56e89a64f39e19514ca10dbd43c6 v1.9.0
56 56 63f3ad8e6cffcf32fc5a555e55e8544227293d4c v1.10.0
57 d9c99ae7dec87ef47aded05def6a094a892df0e6 v1.10.1
@@ -42,19 +42,32 export const query = <T, Q, O>(store: Qu
42 42 };
43 43 };
44 44
45 /**
46 * Wraps the query method of the store, the resulting method takes a query
47 * expression and returns two observable sequences. The first sequence represents
48 * the results of the query, the second sequence provides the updates to the
49 * query results.
50 *
51 * @param store The store used to query data
52 * @param includeUpdates The flag to include item updates not only additions and
53 * deletions. By default this flag is set to true.
54 * @returns Two observable sequences
55 */
45 56 export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
46 57 (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => {
47 58
48 const pending: T[] = [];
49
50 let results: PromiseOrValue<T[]> = pending;
59 /** count active observers */
60 let listeners = 0;
61 let results: PromiseOrValue<T[]> = [];
51 62
52 63 const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => {
53 64 const processResults = (items: T[]) =>
54 65 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
55 66
56 67 try {
57 if (results === pending)
68 // is there are no active observers here, we need to query actual
69 // data from the store.
70 if (listeners === 0)
58 71 results = store.query(query, options);
59 72
60 73 if (isPromise(results)) {
@@ -74,8 +87,14 export const queryEx = <T, Q, O>(store:
74 87 const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
75 88 try {
76 89 if (!isClosed() && isDjObservableResults<T>(results)) {
90 // subscribe fot the changes
91 listeners++;
77 92 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
78 return () => h.remove();
93 return () => {
94 // unsubscribe from changes
95 listeners--;
96 h.remove();
97 };
79 98 } else {
80 99 complete();
81 100 }
@@ -170,4 +170,22 tap.test(".tap() tests", async t => {
170 170
171 171 t.same(side, [1,2], ".tap() should be processed synchronously");
172 172
173 }).catch(() => { });
174
175 tap.test(".while() tests", async t => {
176
177 const seq = of(1, 2, 3, 4).while(v => v <= 2);
178
179 t.same(await seq.collect(), [1, 2], "Should collect only taken elements");
180
181 const data: number[] = [];
182 let complete = 0;
183 seq.subscribe({
184 next: v => data.push(v),
185 complete: () => complete++
186 });
187
188 t.same(data, [1, 2], "Should receive only taken elements");
189 t.equal(complete, 1, "Complete should run once");
190
173 191 }).catch(() => {}); No newline at end of file
@@ -56,8 +56,8 tasks.matching{ it.name =~ /^configureTs
56 56 }
57 57 }
58 58
59 npmInstall {
60 //npmInstall.dependsOn it
59 task npmInstallLocalDeps {
60 npmInstall.dependsOn it
61 61 dependsOn configurations.npmLocal
62 62
63 63 doFirst {
General Comments 0
You need to be logged in to leave comments. Login now