diff --git a/.hgtags b/.hgtags --- a/.hgtags +++ b/.hgtags @@ -54,3 +54,4 @@ 894b8239b953c0384cf32cab46cf41dac97ea03b 63215d91ae4b711eeb2145b9685240e1afada904 v1.9.0-rc6 af4f8424e83d56e89a64f39e19514ca10dbd43c6 v1.9.0 63f3ad8e6cffcf32fc5a555e55e8544227293d4c v1.10.0 +d9c99ae7dec87ef47aded05def6a094a892df0e6 v1.10.1 diff --git a/djx/src/main/ts/store.ts b/djx/src/main/ts/store.ts --- a/djx/src/main/ts/store.ts +++ b/djx/src/main/ts/store.ts @@ -42,19 +42,32 @@ export const query = (store: Qu }; }; +/** + * Wraps the query method of the store, the resulting method takes a query + * expression and returns two observable sequences. The first sequence represents + * the results of the query, the second sequence provides the updates to the + * query results. + * + * @param store The store used to query data + * @param includeUpdates The flag to include item updates not only additions and + * deletions. By default this flag is set to true. + * @returns Two observable sequences + */ export const queryEx = (store: Queryable, includeUpdates = true) => (query?: Q, options?: O): [data: QueryResults, updates: QueryResults] => { - const pending: T[] = []; - - let results: PromiseOrValue = pending; + /** count active observers */ + let listeners = 0; + let results: PromiseOrValue = []; const data = observe>(({ next, complete, error }) => { const processResults = (items: T[]) => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })); try { - if (results === pending) + // is there are no active observers here, we need to query actual + // data from the store. + if (listeners === 0) results = store.query(query, options); if (isPromise(results)) { @@ -74,8 +87,14 @@ export const queryEx = (store: const updates = observe>(({ next, complete, error, isClosed }) => { try { if (!isClosed() && isDjObservableResults(results)) { + // subscribe fot the changes + listeners++; const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates); - return () => h.remove(); + return () => { + // unsubscribe from changes + listeners--; + h.remove(); + }; } else { complete(); } diff --git a/djx/src/test/ts/observable-tests.ts b/djx/src/test/ts/observable-tests.ts --- a/djx/src/test/ts/observable-tests.ts +++ b/djx/src/test/ts/observable-tests.ts @@ -144,30 +144,48 @@ tap.test("of(...) tests", async t => { "of(...) should terminate with error when a parameter is rejected" ); - t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence"); - await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation"); + t.same(await of(1, 2, 3).collect(), [1, 2, 3], ".collect() should return the collected sequence"); + await t.rejects(of(1, 2, 3).collect(cancelled), ".collect() should support cancellation"); }).catch(() => { }); tap.test(".tap() tests", async t => { const side: number[] = []; - of(1,2) - .tap({next: v => side.push(v), complete: () => side.push(0)}) - .tap({next: v => side.push(v*v)}) + of(1, 2) + .tap({ next: v => side.push(v), complete: () => side.push(0) }) + .tap({ next: v => side.push(v * v) }) .subscribe({}); - t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration"); + t.same(side, [1, 1, 2, 4, 0], ".tap() should be called in the order of registration"); side.length = 0; await new Promise(resolve => { - of(1,2,delay(1).then(() => 3)) - .tap({next: v => side.push(v)}) - .tap({ next: v => v === 1 && resolve()}) + of(1, 2, delay(1).then(() => 3)) + .tap({ next: v => side.push(v) }) + .tap({ next: v => v === 1 && resolve() }) .subscribe({}); }); - t.same(side, [1,2], ".tap() should be processed synchronously"); + t.same(side, [1, 2], ".tap() should be processed synchronously"); + +}).catch(() => { }); + +tap.test(".while() tests", async t => { + + const seq = of(1, 2, 3, 4).while(v => v <= 2); + + t.same(await seq.collect(), [1, 2], "Should collect only taken elements"); -}).catch(() => {}); \ No newline at end of file + const data: number[] = []; + let complete = 0; + seq.subscribe({ + next: v => data.push(v), + complete: () => complete++ + }); + + t.same(data, [1, 2], "Should receive only taken elements"); + t.equal(complete, 1, "Complete should run once"); + +}).catch(() => { }); \ No newline at end of file diff --git a/playground/build.gradle b/playground/build.gradle --- a/playground/build.gradle +++ b/playground/build.gradle @@ -56,8 +56,8 @@ tasks.matching{ it.name =~ /^configureTs } } -npmInstall { - //npmInstall.dependsOn it +task npmInstallLocalDeps { + npmInstall.dependsOn it dependsOn configurations.npmLocal doFirst {