##// END OF EJS Templates
added store::get method to wrap up dojo/store/get
cin -
r136:435ce00ba245 v1.9.0-rc2 default
parent child
Show More
@@ -0,0 +1,65
1 import Memory = require("dojo/store/Memory");
2 import Observerable = require("dojo/store/Observable");
3 import { get } from "./store";
4 import tap = require("tap");
5
6 interface Person {
7 id: string;
8
9 name: string;
10
11 age: number;
12 }
13
14 tap.test("store::get(...) tests", async t => {
15 const store = new Observerable(new Memory<Person>());
16
17 const getPerson = get(store);
18
19 const peterId = "id:peter";
20
21 const samId = "id:sam";
22
23 const peter = getPerson(peterId);
24 const sam = getPerson(samId);
25
26 const seq1 = await getPerson(peterId, { observe: false }).collect();
27
28 t.ok(seq1.length === 0, "Should be empty sequence");
29
30 let peterChangeCount = 0;
31 let samChangeCount = 0;
32 let peterDeleted = 0;
33
34 const peterSubscription = peter.subscribe({
35 next: () => peterChangeCount++,
36 complete: () => peterDeleted++
37 });
38 const samSubscription = sam.subscribe({
39 next: () => samChangeCount++
40 });
41
42 try {
43 t.equal(peterChangeCount, 0, "Should be no changes recorded");
44
45 store.put({id: peterId, name: "Peter", age: 30 });
46
47 t.equal(peterChangeCount, 1, "Should record 1 object change");
48 t.equal(samChangeCount, 0, "Should not record other object changes");
49
50 store.remove(peterId);
51
52 t.equal(peterDeleted, 1, "Should complete sequence");
53 t.equal(peterChangeCount, 1, "Should not record remove operations");
54
55 store.put({id: peterId, name: "Peter", age: 29});
56
57 t.equal(peterChangeCount, 1, "Should not record changes after completion");
58
59 } finally {
60 peterSubscription.unsubscribe();
61 samSubscription.unsubscribe();
62 }
63
64
65 }).catch(() => { }); No newline at end of file
@@ -67,7 +67,7 export interface Subscribable<T> {
67 67
68 68 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
69 69
70 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
70 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
71 71
72 72 /** The observable source of items. */
73 73 export interface Observable<T> extends Subscribable<T> {
@@ -277,24 +277,19 const _observe = <T>(producer: Producer<
277 277
278 278 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
279 279 let cleanup: () => void;
280 const complete = () => {
281 const continuation = seq.shift();
282 if (continuation) {
283 // if we have a next sequence, subscribe to it
284 const subscription = continuation.subscribe({ next, complete, ...rest });
280 const len = seq.length;
281 const complete = (i: number) => i < len ?
282 () => {
283 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
285 284 cleanup = subscription.unsubscribe.bind(subscription);
286 } else {
287 // otherwise notify the consumer about completion
288 final();
289 }
290 };
285 } : final;
291 286
292 cleanup = producer({ next, complete, ...rest }) ?? noop;
287 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
293 288
294 289 return () => cleanup();
295 290 }),
296 291
297 pipe: <U>(op: OperatorFn<T,U>) => observe(op(_observe(producer))),
292 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
298 293
299 294 next: collect(
300 295 producer,
@@ -1,6 +1,6
1 1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 2 import { CancelledError } from "@implab/core-amd/CancelledError";
3 import { Producer, Sink, Subscribable } from "../observable";
3 import { Observable, Sink, Subscribable, observe } from "../observable";
4 4 import { Scope } from "../tsx/Scope";
5 5
6 6 /**
@@ -13,7 +13,7 import { Scope } from "../tsx/Scope";
13 13 * @param ct Cancellation token to unsubscribe from the original observable.
14 14 *
15 15 */
16 export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Producer<T> => {
16 export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Observable<T> => {
17 17 type Status = "active" | "complete" | "error";
18 18
19 19 // ring-buffer, wpos will rotate in range (0...length-1)
@@ -81,7 +81,7 export const buffer = (length: number, c
81 81 scope.own(ct.register(e => sink.error(e)));
82 82 }
83 83
84 return (s: Sink<T>) => {
84 return observe( s => {
85 85 const _subscribers = subscribers;
86 86 read(s.next);
87 87 switch (status) {
@@ -101,5 +101,5 export const buffer = (length: number, c
101 101 s.error(lastError);
102 102 break;
103 103 }
104 };
104 });
105 105 }; No newline at end of file
@@ -1,4 +1,4
1 import { Producer, Sink, Subscribable } from "../observable";
1 import { Observable, Sink, Subscribable, observe } from "../observable";
2 2
3 3 const noop = () => { };
4 4
@@ -11,7 +11,7 const noop = () => { };
11 11 * @param source The source observable
12 12 * @returns The wrapped producer
13 13 */
14 export const subject = <T>(source: Subscribable<T>): Producer<T> => {
14 export const subject = <T>(source: Subscribable<T>): Observable<T> => {
15 15 let subscribers: Sink<T>[] = [];
16 16
17 17 let subscription = { unsubscribe: noop };
@@ -31,7 +31,7 export const subject = <T>(source: Subsc
31 31 next: v => subscribers.forEach(s => s.next(v))
32 32 };
33 33
34 return client => {
34 return observe(client => {
35 35 const _subscribers = subscribers;
36 36 subscribers.push(client);
37 37 if (subscribers.length === 1)
@@ -46,5 +46,5 export const subject = <T>(source: Subsc
46 46 subscription.unsubscribe();
47 47 }
48 48 };
49 };
49 });
50 50 };
@@ -1,6 +1,8
1 1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 import { isPromise } from "@implab/core-amd/safe";
3 import { observe, Observable } from "./observable";
2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
3 import { observe, Observable, empty } from "./observable";
4 import { after } from "dojo/aspect";
5 import { subject } from "./operators/subject";
4 6
5 7 export interface OrderedUpdate<T> {
6 8 /** The item is being updated */
@@ -59,3 +61,63 export const query = <T, Q, O>(store: Qu
59 61 });
60 62
61 63 };
64
65 interface IndexedStore<T> {
66 get(id: string | number): PromiseLike<T> | T | null | undefined;
67 }
68
69 interface Notifications<T> {
70 notify(item: T | undefined, id: string | number | undefined): void;
71 }
72
73 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
74 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
75
76 interface GetOpts {
77 observe?: boolean;
78 }
79
80 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
81 [item: undefined | null, id: string | number];
82
83 const filterItem = (itemId: string | number) =>
84 <T>(source: Observable<Change<T>>) =>
85 observe<T>(({ next, complete, error }) => {
86 const subscription = source
87 .filter(([, id]) => id === itemId)
88 .subscribe({
89 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
90 complete,
91 error
92 });
93 return () => subscription.unsubscribe();
94 });
95
96 export const get = <T>(store: IndexedStore<T>) => {
97 const changes = hasNotifications<T>(store) ?
98 observe<Change<T>>(({ next }) => {
99 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
100 return () => handle.remove();
101 }).pipe(subject) : empty;
102
103
104 return (id: string | number, opts: GetOpts = {}) =>
105 observe<T>(({ next, complete, error }) => {
106 const result = store.get(id);
107
108 const handle = (x: T | null | undefined) => {
109 if (x !== null && x !== undefined)
110 next(x);
111 complete();
112 };
113
114 if (isPromise(result)) {
115 result.then(handle, error);
116
117 if (isCancellable(result))
118 return () => result.cancel();
119 } else {
120 handle(result);
121 }
122 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
123 }; No newline at end of file
@@ -82,7 +82,7 const consumer3 = {
82 82 };
83 83
84 84
85 const subj3 = subj2.pipe<"even" | "odd">(self => ({ next, complete, error }) => {
85 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
86 86 consumer3.subscribe();
87 87 let count = 0;
88 88 const h = self.subscribe({
@@ -101,7 +101,7 const subj3 = subj2.pipe<"even" | "odd">
101 101 consumer3.unsubscribe();
102 102 h.unsubscribe();
103 103 };
104 });
104 }));
105 105
106 106 subj3.subscribe(consumer3);
107 107
@@ -2,3 +2,4 import "./declare-tests";
2 2 import "./observable-tests";
3 3 import "./state-tests";
4 4 import "./subject-tests";
5 import "./observable-store-tests";
@@ -1,9 +1,8
1 1 import { id as mid } from "module";
2 2 import { djbase, djclass } from "@implab/djx/declare";
3 import { attach, bind, createElement, prop } from "@implab/djx/tsx";
3 import { bind, createElement, prop } from "@implab/djx/tsx";
4 4 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
5 5 import Form from "@implab/djx/form/Form";
6 import { LocalDateTime } from "@js-joda/core";
7 6 import { TraceSource } from "@implab/core-amd/log/TraceSource";
8 7 import DateTextBox = require("dijit/form/DateTextBox");
9 8 import Button = require("dijit/form/Button");
General Comments 0
You need to be logged in to leave comments. Login now