##// END OF EJS Templates
added store::get method to wrap up dojo/store/get
cin -
r136:435ce00ba245 v1.9.0-rc2 default
parent child
Show More
@@ -0,0 +1,65
1 import Memory = require("dojo/store/Memory");
2 import Observerable = require("dojo/store/Observable");
3 import { get } from "./store";
4 import tap = require("tap");
5
6 interface Person {
7 id: string;
8
9 name: string;
10
11 age: number;
12 }
13
14 tap.test("store::get(...) tests", async t => {
15 const store = new Observerable(new Memory<Person>());
16
17 const getPerson = get(store);
18
19 const peterId = "id:peter";
20
21 const samId = "id:sam";
22
23 const peter = getPerson(peterId);
24 const sam = getPerson(samId);
25
26 const seq1 = await getPerson(peterId, { observe: false }).collect();
27
28 t.ok(seq1.length === 0, "Should be empty sequence");
29
30 let peterChangeCount = 0;
31 let samChangeCount = 0;
32 let peterDeleted = 0;
33
34 const peterSubscription = peter.subscribe({
35 next: () => peterChangeCount++,
36 complete: () => peterDeleted++
37 });
38 const samSubscription = sam.subscribe({
39 next: () => samChangeCount++
40 });
41
42 try {
43 t.equal(peterChangeCount, 0, "Should be no changes recorded");
44
45 store.put({id: peterId, name: "Peter", age: 30 });
46
47 t.equal(peterChangeCount, 1, "Should record 1 object change");
48 t.equal(samChangeCount, 0, "Should not record other object changes");
49
50 store.remove(peterId);
51
52 t.equal(peterDeleted, 1, "Should complete sequence");
53 t.equal(peterChangeCount, 1, "Should not record remove operations");
54
55 store.put({id: peterId, name: "Peter", age: 29});
56
57 t.equal(peterChangeCount, 1, "Should not record changes after completion");
58
59 } finally {
60 peterSubscription.unsubscribe();
61 samSubscription.unsubscribe();
62 }
63
64
65 }).catch(() => { }); No newline at end of file
@@ -1,402 +1,397
1 1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 2 import { ICancellation } from "@implab/core-amd/interfaces";
3 3 import { isPromise } from "@implab/core-amd/safe";
4 4
5 5 /**
6 6 * The interface for the consumer of an observable sequence
7 7 */
8 8 export interface Observer<T> {
9 9 /**
10 10 * Called for the next element in the sequence
11 11 */
12 12 next?: (value: T) => void;
13 13
14 14 /**
15 15 * Called once when the error occurs in the sequence.
16 16 */
17 17 error?: (e: unknown) => void;
18 18
19 19 /**
20 20 * Called once at the end of the sequence.
21 21 */
22 22 complete?: () => void;
23 23 }
24 24
25 25 /**
26 26 * The group of functions to feed an observable. These methods are provided to
27 27 * the producer to generate a stream of events.
28 28 */
29 29 export type Sink<T> = {
30 30 /**
31 31 * Call to send the next element in the sequence
32 32 */
33 33 next: (value: T) => void;
34 34
35 35 /**
36 36 * Call to notify about the error occurred in the sequence.
37 37 */
38 38 error: (e: unknown) => void;
39 39
40 40 /**
41 41 * Call to signal the end of the sequence.
42 42 */
43 43 complete: () => void;
44 44
45 45 /**
46 46 * Checks whether the sink is accepting new elements. It's safe to
47 47 * send elements to the closed sink.
48 48 */
49 49 isClosed: () => boolean;
50 50 };
51 51
52 52 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
53 53
54 54 export interface Unsubscribable {
55 55 unsubscribe(): void;
56 56 }
57 57
58 58 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
59 59 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
60 60
61 61 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
62 62 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
63 63
64 64 export interface Subscribable<T> {
65 65 subscribe(consumer: Observer<T>): Unsubscribable;
66 66 }
67 67
68 68 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
69 69
70 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
70 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
71 71
72 72 /** The observable source of items. */
73 73 export interface Observable<T> extends Subscribable<T> {
74 74 /** Transforms elements of the sequence with the specified mapper
75 75 *
76 76 * @param mapper The mapper used to transform the values
77 77 */
78 78 map<T2>(mapper: (value: T) => T2): Observable<T2>;
79 79
80 80 /** Filters elements of the sequence. The resulting sequence will
81 81 * contain only elements which match the specified predicate.
82 82 *
83 83 * @param predicate The filter predicate.
84 84 */
85 85 filter(predicate: (value: T) => boolean): Observable<T>;
86 86
87 87 /** Completes the sequence once the condition is met.
88 88 * @param predicate The condition which should be met to complete the sequence
89 89 */
90 90 until(predicate: (value: T) => boolean): Observable<T>;
91 91
92 92 /** Keeps the sequence running while elements satisfy the condition.
93 93 *
94 94 * @param predicate The condition which should be met to continue.
95 95 */
96 96 while(predicate: (value: T) => boolean): Observable<T>;
97 97
98 98 /** Applies accumulator to each value in the sequence and
99 99 * emits the accumulated value for each source element
100 100 *
101 101 * @param accumulator
102 102 * @param initial
103 103 */
104 104 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
105 105 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
106 106
107 107 /** Applies accumulator to each value in the sequence and
108 108 * emits the accumulated value at the end of the sequence
109 109 *
110 110 * @param accumulator
111 111 * @param initial
112 112 */
113 113 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
114 114 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
115 115
116 116 /** Concatenates the specified sequences with this observable
117 117 *
118 118 * @param seq sequences to concatenate with the current observable
119 119 *
120 120 * The concatenation doesn't accumulate values from the specified sequences,
121 121 * The result of the concatenation is the new observable which will switch
122 122 * to the next observable after the previous one completes. Values emitted
123 123 * before the next observable being active are lost.
124 124 */
125 125 cat(...seq: Subscribable<T>[]): Observable<T>;
126 126
127 127
128 128 /** Pipes the specified operator to produce the new observable
129 129 * @param op The operator consumes this observable and produces a new one
130 130 *
131 131 * The operator is a higher order function which takes a source observable
132 132 * and returns a producer for the new observable.
133 133 *
134 134 * This function can be used to create a complex mapping between source and
135 135 * resulting observables. The operator may have a state (or a side effect)
136 136 * and can be connected to multiple observables.
137 137 */
138 138 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
139 139
140 140 /** Waits for the next event to occur and returns a promise for the next value
141 141 * @param ct Cancellation token
142 142 */
143 143 next(ct?: ICancellation): Promise<T>;
144 144
145 145 /** Collects items of the sequence to the array. */
146 146 collect(ct?: ICancellation): Promise<T[]>;
147 147 }
148 148
149 149 const noop = () => { };
150 150
151 151 const sink = <T>(consumer: Observer<T>) => {
152 152 const { next, error, complete } = consumer;
153 153 return {
154 154 next: next ? next.bind(consumer) : noop,
155 155 error: error ? error.bind(consumer) : noop,
156 156 complete: complete ? complete.bind(consumer) : noop,
157 157 isClosed: () => false
158 158 };
159 159 };
160 160
161 161 /** Wraps the producer to handle tear down logic and subscription management
162 162 *
163 163 * @param producer The producer to wrap
164 164 * @returns The wrapper producer
165 165 */
166 166 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
167 167 let done = false;
168 168 let cleanup = noop;
169 169
170 170 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
171 171 (...args: A) => done ?
172 172 void (0) :
173 173 (done = true, cleanup(), fn(...args));
174 174
175 175 const _fin0 = () => done ? void (0) : (done = true, cleanup());
176 176
177 177 const safeSink = {
178 178 next: (value: T) => { !done && next(value); },
179 179 error: _fin(error),
180 180 complete: _fin(complete),
181 181 isClosed: () => done
182 182 };
183 183 cleanup = producer(safeSink) ?? noop;
184 184 return done ? cleanup() : _fin0;
185 185 };
186 186
187 187 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
188 188 subscribe: (consumer: Observer<T>) => ({
189 189 unsubscribe: producer(sink(consumer)) ?? noop
190 190 }),
191 191
192 192 map: (mapper) => _observe(({ next, ...rest }) =>
193 193 producer({
194 194 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
195 195 ...rest
196 196 })
197 197 ),
198 198
199 199 filter: (predicate) => _observe(({ next, ...rest }) =>
200 200 producer({
201 201 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
202 202 ...rest
203 203 })
204 204 ),
205 205
206 206 until: predicate => _observe(({ next, complete, ...rest }) =>
207 207 producer({
208 208 next: v => predicate(v) ? complete() : next(v),
209 209 complete,
210 210 ...rest
211 211 })
212 212 ),
213 213
214 214 while: predicate => _observe(({ next, complete, ...rest }) =>
215 215 producer({
216 216 next: v => predicate(v) ? next(v) : complete(),
217 217 complete,
218 218 ...rest
219 219 })
220 220 ),
221 221
222 222 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
223 223 if (args.length === 1) {
224 224 const [accumulator] = args;
225 225 let _acc: T;
226 226 let index = 0;
227 227 return producer({
228 228 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
229 229 ...rest
230 230 });
231 231 } else {
232 232 const [accumulator, initial] = args;
233 233 let _acc = initial;
234 234 return producer({
235 235 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
236 236 ...rest
237 237 });
238 238 }
239 239 }),
240 240
241 241 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
242 242 if (args.length === 1) {
243 243 const [accumulator] = args;
244 244 let _acc: T;
245 245 let index = 0;
246 246 return producer({
247 247 next: next !== noop ? (v: T) => {
248 248 _acc = index++ === 0 ? v : accumulator(_acc, v);
249 249 } : noop,
250 250 complete: () => {
251 251 if (index === 0) {
252 252 error(new Error("The sequence can't be empty"));
253 253 } else {
254 254 next(_acc);
255 255 complete();
256 256 }
257 257 },
258 258 error,
259 259 ...rest
260 260 });
261 261 } else {
262 262 const [accumulator, initial] = args;
263 263 let _acc = initial;
264 264 return producer({
265 265 next: next !== noop ? (v: T) => {
266 266 _acc = accumulator(_acc, v);
267 267 } : noop,
268 268 complete: () => {
269 269 next(_acc);
270 270 complete();
271 271 },
272 272 error,
273 273 ...rest
274 274 });
275 275 }
276 276 }),
277 277
278 278 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
279 279 let cleanup: () => void;
280 const complete = () => {
281 const continuation = seq.shift();
282 if (continuation) {
283 // if we have a next sequence, subscribe to it
284 const subscription = continuation.subscribe({ next, complete, ...rest });
280 const len = seq.length;
281 const complete = (i: number) => i < len ?
282 () => {
283 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
285 284 cleanup = subscription.unsubscribe.bind(subscription);
286 } else {
287 // otherwise notify the consumer about completion
288 final();
289 }
290 };
285 } : final;
291 286
292 cleanup = producer({ next, complete, ...rest }) ?? noop;
287 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
293 288
294 289 return () => cleanup();
295 290 }),
296 291
297 pipe: <U>(op: OperatorFn<T,U>) => observe(op(_observe(producer))),
292 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
298 293
299 294 next: collect(
300 295 producer,
301 296 ({ next, complete, error, isClosed }) => ({
302 297 next: v => (next(v), complete()),
303 298 complete: () => error(new Error("The sequence is empty")),
304 299 error,
305 300 isClosed
306 301 })
307 302 ),
308 303
309 304 collect: collect(
310 305 producer,
311 306 ({ next, complete, ...rest }) => {
312 307 const data: T[] = [];
313 308 return {
314 309 next: v => data.push(v),
315 310 complete: () => (next(data), complete()),
316 311 ...rest
317 312 };
318 313 }
319 314 )
320 315 });
321 316
322 317 const collect = <T, U>(
323 318 producer: Producer<T>,
324 319 collector: (result: Sink<U>) => Sink<T>
325 320 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
326 321 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
327 322 const h = ct.register(error);
328 323 const cleanup = !isClosed() ?
329 324 producer(collector({ next, complete, error, isClosed })) ?? noop :
330 325 noop;
331 326
332 327 return () => {
333 328 h.destroy();
334 329 cleanup();
335 330 };
336 331 });
337 332
338 333 fused({
339 334 next: resolve,
340 335 error: reject,
341 336 complete: noop,
342 337 isClosed: () => false
343 338 });
344 339 });
345 340
346 341 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
347 342
348 343 /** Converts an array to the observable sequence of its elements. */
349 344 export const ofArray = <T>(items: T[]) => _observe<T>(
350 345 ({ next, complete }) => (
351 346 items.forEach(next),
352 347 complete()
353 348 )
354 349 );
355 350
356 351 /** Converts a subscribable to the observable */
357 352 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
358 353 observe(sink => {
359 354 const subscription = subscribable.subscribe(sink);
360 355 return () => subscription.unsubscribe();
361 356 });
362 357
363 358 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
364 359 ({ next, error, complete }) =>
365 360 isPromise(item) ?
366 361 void item.then(
367 362 v => (next(v), complete()),
368 363 error
369 364 ) :
370 365 (next(item), complete())
371 366 );
372 367
373 368 /** Converts a list of parameter values to the observable sequence. The
374 369 * order of elements in the list will be preserved in the resulting sequence.
375 370 */
376 371 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
377 372 of1(items[0]) :
378 373 observe<T>(
379 374 ({ next, error, complete, isClosed }) => {
380 375 const n = items.length;
381 376
382 377 const _next = (start: number) => {
383 378 if (start > 0 && isClosed()) // when resumed
384 379 return;
385 380
386 381 for (let i = start; i < n; i++) {
387 382 const r = items[i];
388 383 if (isPromise(r)) {
389 384 r.then(v => (next(v), _next(i + 1)), error);
390 385 return; // suspend
391 386 } else {
392 387 next(r);
393 388 }
394 389 }
395 390 complete();
396 391 };
397 392
398 393 _next(0);
399 394 }
400 395 );
401 396
402 397 export const empty = _observe<never>(({ complete }) => complete());
@@ -1,105 +1,105
1 1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 2 import { CancelledError } from "@implab/core-amd/CancelledError";
3 import { Producer, Sink, Subscribable } from "../observable";
3 import { Observable, Sink, Subscribable, observe } from "../observable";
4 4 import { Scope } from "../tsx/Scope";
5 5
6 6 /**
7 7 * Creates a buffer with the specified length. The buffer will immediately
8 8 * subscribe to the source observable and start accumulating values.
9 9 *
10 10 * The resulting observable will repeat the buffered values for each new subscriber.
11 11 *
12 12 * @param length The number of elements to store.
13 13 * @param ct Cancellation token to unsubscribe from the original observable.
14 14 *
15 15 */
16 export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Producer<T> => {
16 export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Observable<T> => {
17 17 type Status = "active" | "complete" | "error";
18 18
19 19 // ring-buffer, wpos will rotate in range (0...length-1)
20 20 let wpos = 0;
21 21 const buffer: T[] = [];
22 22
23 23 // writes the next value to the buffer
24 24 const write = (value: T) => {
25 25 buffer[wpos] = value;
26 26 wpos = (wpos + 1) % length;
27 27 };
28 28
29 29 // reads contents of the buffer
30 30 // cb will be called for each value in the buffer
31 31 const read = (cb: (item: T) => void) => {
32 32 const start = wpos + length - buffer.length;
33 33 const end = wpos + length;
34 34
35 35 for(let pos = start; pos < end; pos++ )
36 36 cb(buffer[pos % length]);
37 37 };
38 38
39 39 let status: Status = "active";
40 40 let lastError: unknown;
41 41 let subscribers: Sink<T>[] = [];
42 42
43 43 const scope = new Scope();
44 44
45 45 // cleanup method to release resources held by this subscription
46 46 const cleanup = (cb: (item: Sink<T>) => void) => {
47 47 scope.destroy();
48 48 const _subscribers = subscribers;
49 49 subscribers = [];
50 50 _subscribers.forEach(cb);
51 51 };
52 52
53 53 const sink: Sink<T> = {
54 54 isClosed: () => status !== "active",
55 55 complete: () => {
56 56 if (status === "active") {
57 57 status = "complete";
58 58 cleanup(s => s.complete());
59 59 }
60 60 },
61 61 error: e => {
62 62 if (status === "active") {
63 63 status = "error";
64 64 lastError = e;
65 65 cleanup(s => s.error(e));
66 66 }
67 67 },
68 68 next: v => {
69 69 if (status === "active") {
70 70 write(v);
71 71 const _subscribers = subscribers;
72 72 _subscribers.forEach(s => s.next(v));
73 73 }
74 74 }
75 75 };
76 76
77 77 if (ct.isRequested()) {
78 78 sink.error(new CancelledError("The operation was cancelled", ct));
79 79 } else {
80 80 scope.own(source.subscribe(sink));
81 81 scope.own(ct.register(e => sink.error(e)));
82 82 }
83 83
84 return (s: Sink<T>) => {
84 return observe( s => {
85 85 const _subscribers = subscribers;
86 86 read(s.next);
87 87 switch (status) {
88 88 case "active":
89 89 subscribers.push(s);
90 90 return () => {
91 91 if (_subscribers === subscribers) {
92 92 const pos = subscribers.indexOf(s);
93 93 if (pos >= 0)
94 94 subscribers.splice(pos, 1);
95 95 }
96 96 };
97 97 case "complete":
98 98 s.complete();
99 99 break;
100 100 case "error":
101 101 s.error(lastError);
102 102 break;
103 103 }
104 };
104 });
105 105 }; No newline at end of file
@@ -1,50 +1,50
1 import { Producer, Sink, Subscribable } from "../observable";
1 import { Observable, Sink, Subscribable, observe } from "../observable";
2 2
3 3 const noop = () => { };
4 4
5 5 /** Joins multiple subscribers to the single one. The resulting subscriber
6 6 * will be created when the first client subscribes and will be released
7 7 * with the the last client unsubscribed.
8 8 *
9 9 * Use this wrapper to prevent spawning multiple producers.
10 10 *
11 11 * @param source The source observable
12 12 * @returns The wrapped producer
13 13 */
14 export const subject = <T>(source: Subscribable<T>): Producer<T> => {
14 export const subject = <T>(source: Subscribable<T>): Observable<T> => {
15 15 let subscribers: Sink<T>[] = [];
16 16
17 17 let subscription = { unsubscribe: noop };
18 18
19 19 // cleanup method to release resources held by this subscription
20 20 const cleanup = (cb: (item: Sink<T>) => void) => {
21 21 const _subscribers = subscribers;
22 22 subscribers = [];
23 23 _subscribers.forEach(cb);
24 24 subscription.unsubscribe();
25 25 };
26 26
27 27 const sink: Sink<T> = {
28 28 isClosed: () => false,
29 29 complete: () => cleanup(s => s.complete()),
30 30 error: e => cleanup(s => s.error(e)),
31 31 next: v => subscribers.forEach(s => s.next(v))
32 32 };
33 33
34 return client => {
34 return observe(client => {
35 35 const _subscribers = subscribers;
36 36 subscribers.push(client);
37 37 if (subscribers.length === 1)
38 38 subscription = source.subscribe(sink);
39 39
40 40 return () => {
41 41 if (_subscribers === subscribers) {
42 42 const pos = subscribers.indexOf(client);
43 43 if (pos >= 0)
44 44 subscribers.splice(pos, 1);
45 45 if (!subscribers.length)
46 46 subscription.unsubscribe();
47 47 }
48 48 };
49 };
49 });
50 50 };
@@ -1,61 +1,123
1 1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 import { isPromise } from "@implab/core-amd/safe";
3 import { observe, Observable } from "./observable";
2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
3 import { observe, Observable, empty } from "./observable";
4 import { after } from "dojo/aspect";
5 import { subject } from "./operators/subject";
4 6
5 7 export interface OrderedUpdate<T> {
6 8 /** The item is being updated */
7 9 readonly item: T;
8 10
9 11 /** The previous index of the item, -1 in case it is inserted */
10 12 readonly prevIndex: number;
11 13
12 14 /** The new index of the item, -1 in case it is deleted */
13 15 readonly newIndex: number;
14 16
15 17 }
16 18
17 19 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
18 20
19 21 interface DjObservableResults<T> {
20 22 /**
21 23 * Allows observation of results
22 24 */
23 25 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
24 26 remove(): void;
25 27 };
26 28 }
27 29
28 30 interface Queryable<T, Q, O> {
29 31 query(query?: Q, options?: O): PromiseOrValue<T[]>;
30 32 }
31 33
32 34 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
33 35 v && (typeof (v as { observe?: unknown; }).observe === "function");
34 36
35 37 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
36 38 (query?: Q, options?: O & { observe?: boolean }) => {
37 39 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
38 40
39 41 const processResults = (items: T[]) =>
40 42 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
41 43
42 44 try {
43 45 const results = store.query(query, options);
44 46 if (isPromise(results)) {
45 47 results.then(processResults).then(undefined, error);
46 48 } else {
47 49 processResults(results);
48 50 }
49 51
50 52 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
51 53 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
52 54 return () => h.remove();
53 55 } else {
54 56 complete();
55 57 }
56 58 } catch (err) {
57 59 error(err);
58 60 }
59 61 });
60 62
61 63 };
64
65 interface IndexedStore<T> {
66 get(id: string | number): PromiseLike<T> | T | null | undefined;
67 }
68
69 interface Notifications<T> {
70 notify(item: T | undefined, id: string | number | undefined): void;
71 }
72
73 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
74 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
75
76 interface GetOpts {
77 observe?: boolean;
78 }
79
80 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
81 [item: undefined | null, id: string | number];
82
83 const filterItem = (itemId: string | number) =>
84 <T>(source: Observable<Change<T>>) =>
85 observe<T>(({ next, complete, error }) => {
86 const subscription = source
87 .filter(([, id]) => id === itemId)
88 .subscribe({
89 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
90 complete,
91 error
92 });
93 return () => subscription.unsubscribe();
94 });
95
96 export const get = <T>(store: IndexedStore<T>) => {
97 const changes = hasNotifications<T>(store) ?
98 observe<Change<T>>(({ next }) => {
99 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
100 return () => handle.remove();
101 }).pipe(subject) : empty;
102
103
104 return (id: string | number, opts: GetOpts = {}) =>
105 observe<T>(({ next, complete, error }) => {
106 const result = store.get(id);
107
108 const handle = (x: T | null | undefined) => {
109 if (x !== null && x !== undefined)
110 next(x);
111 complete();
112 };
113
114 if (isPromise(result)) {
115 result.then(handle, error);
116
117 if (isCancellable(result))
118 return () => result.cancel();
119 } else {
120 handle(result);
121 }
122 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
123 }; No newline at end of file
@@ -1,150 +1,150
1 1 import { empty, observe, of } from "./observable";
2 2 import * as tap from "tap";
3 3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 4 import { delay } from "@implab/core-amd/safe";
5 5
6 6 const subj1 = observe<number>(({ next, complete }) => {
7 7 next(1);
8 8 complete();
9 9 next(2);
10 10 });
11 11
12 12 const consumer1 = {
13 13 sum: 0,
14 14 next(v: number) {
15 15 this.sum += v;
16 16 }
17 17 };
18 18
19 19 subj1.subscribe(consumer1);
20 20 tap.equal(consumer1.sum, 1, "Should get only one value");
21 21
22 22 subj1.subscribe(consumer1);
23 23 tap.equal(consumer1.sum, 2, "Should get the value again");
24 24
25 25 const consumer2 = {
26 26 value: 0,
27 27 completed: false,
28 28 next(v: number) { this.value = v; },
29 29 complete() { this.completed = true; }
30 30 };
31 31
32 32 let maps = 0;
33 33
34 34 subj1
35 35 .map(v => {
36 36 tap.comment(`map1: ${v * 2}`);
37 37 maps++;
38 38 return v * 2;
39 39 })
40 40 .map(v => {
41 41 tap.comment(`map2: ${v * 2}`);
42 42 maps++;
43 43 return v * 2;
44 44 })
45 45 .map(v => {
46 46 tap.comment(`map3: ${v * 2}`);
47 47 maps++;
48 48 return v * 2;
49 49 })
50 50 .subscribe(consumer2);
51 51
52 52 tap.equal(consumer2.value, 8, "Should map");
53 53 tap.equal(maps, 3, "The map chain should not be executed after completion");
54 54 tap.ok(consumer2.completed, "The completion signal should pass through");
55 55
56 56 const subj2 = observe<number>(({ next, complete }) => {
57 57 [1, 2, 3, 4, 5].forEach(next);
58 58 complete();
59 59 return () => {
60 60 tap.comment("subj2: unsubscribe");
61 61 };
62 62 });
63 63
64 64 const consumer3 = {
65 65 even: 0,
66 66 odd: 0,
67 67 completed: false,
68 68 subscribed: 0,
69 69 unsubscribed: 0,
70 70 next(v: "even" | "odd") {
71 71 this[v]++;
72 72 },
73 73 complete() {
74 74 this.completed = true;
75 75 },
76 76 subscribe() {
77 77 this.subscribed++;
78 78 },
79 79 unsubscribe() {
80 80 this.unsubscribed++;
81 81 }
82 82 };
83 83
84 84
85 const subj3 = subj2.pipe<"even" | "odd">(self => ({ next, complete, error }) => {
85 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
86 86 consumer3.subscribe();
87 87 let count = 0;
88 88 const h = self.subscribe({
89 89 next: val => {
90 90 if (val % 2 === 0)
91 91 next("odd");
92 92 else
93 93 next("even");
94 94 if (++count === 4)
95 95 complete();
96 96 },
97 97 complete,
98 98 error
99 99 });
100 100 return () => {
101 101 consumer3.unsubscribe();
102 102 h.unsubscribe();
103 103 };
104 });
104 }));
105 105
106 106 subj3.subscribe(consumer3);
107 107
108 108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
109 109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
110 110 tap.ok(consumer3.completed, "The sequence should completed");
111 111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
112 112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
113 113
114 114 subj2.reduce((a, b) => a + b).subscribe({
115 115 next: val => tap.comment("subj2: reduce =", val),
116 116 complete: () => tap.comment("subj2: complete")
117 117 });
118 118
119 119 tap.test("of(...) tests", async t => {
120 120 await subj2.reduce((a, b) => a + b).next()
121 121 .then(value => t.comment("subj2: next reduce=", value));
122 122
123 123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124 124
125 125 const cancelled = new Cancellation(cancel => cancel());
126 126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127 127
128 128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129 129
130 130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 131 .reduce<number[]>((a, x) => [...a, x], [])
132 132 .next()
133 133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
134 134
135 135 const rejected = Promise.reject("DIE!");
136 136 rejected.catch(() => { }); // SAFE AND SOUND
137 137
138 138 await t.resolves(
139 139 of(Promise.resolve(1), rejected).next(),
140 140 "of(...) should emit non-rejected items"
141 141 );
142 142 await t.rejects(
143 143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 144 "of(...) should terminate with error when a parameter is rejected"
145 145 );
146 146
147 147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149 149
150 150 }).catch(() => { }); No newline at end of file
@@ -1,4 +1,5
1 1 import "./declare-tests";
2 2 import "./observable-tests";
3 3 import "./state-tests";
4 4 import "./subject-tests";
5 import "./observable-store-tests";
@@ -1,60 +1,59
1 1 import { id as mid } from "module";
2 2 import { djbase, djclass } from "@implab/djx/declare";
3 import { attach, bind, createElement, prop } from "@implab/djx/tsx";
3 import { bind, createElement, prop } from "@implab/djx/tsx";
4 4 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
5 5 import Form from "@implab/djx/form/Form";
6 import { LocalDateTime } from "@js-joda/core";
7 6 import { TraceSource } from "@implab/core-amd/log/TraceSource";
8 7 import DateTextBox = require("dijit/form/DateTextBox");
9 8 import Button = require("dijit/form/Button");
10 9 import ValidationTextBox = require("dijit/form/ValidationTextBox");
11 10
12 11 const trace = TraceSource.get(mid);
13 12
14 13 interface NewAppointmentProps {
15 14 title: string;
16 15
17 16 startAt?: Date;
18 17 }
19 18
20 19 @djclass
21 20 export default class NewAppointment extends djbase(DjxWidgetBase) {
22 21 value: NewAppointmentProps = {title: "Appointment 1"};
23 22
24 23 render() {
25 24 return <section>
26 25 <Form<NewAppointmentProps> ref={bind("value", prop(this, "value"))}
27 26 method="DIALOG"
28 27 onSubmit={this._onSubmit}
29 28 onReset={this._onReset}
30 29 onValidStateChange={this._onValidStateChange}
31 30 onValueChange={this._onValueChange}
32 31 >
33 32 <p><label>Title: <ValidationTextBox required name="title"/></label></p>
34 33 <p><label>Appointment date: <DateTextBox required name="startAt"/></label></p>
35 34 <footer>
36 35 <Button type="submit">Send</Button>
37 36 <Button type="reset">Reset</Button>
38 37 </footer>
39 38 </Form>
40 39 </section>;
41 40 }
42 41
43 42 private readonly _onSubmit = (evt: Event) => {
44 43 trace.debug("onSubmit");
45 44 };
46 45
47 46 private readonly _onValidStateChange = (isValid?: boolean) => {
48 47 trace.debug("isValid={0}", isValid);
49 48 };
50 49
51 50 private readonly _onValueChange = (value: NewAppointmentProps) => {
52 51 trace.debug("valueChange={0}", value);
53 52 };
54 53
55 54 private readonly _onReset = (evt: Event) => {
56 55 trace.debug("onReset");
57 56 //evt.preventDefault();
58 57 };
59 58
60 59 } No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now