##// END OF EJS Templates
added store::get method to wrap up dojo/store/get
cin -
r136:435ce00ba245 v1.9.0-rc2 default
parent child
Show More
@@ -0,0 +1,65
1 import Memory = require("dojo/store/Memory");
2 import Observerable = require("dojo/store/Observable");
3 import { get } from "./store";
4 import tap = require("tap");
5
6 interface Person {
7 id: string;
8
9 name: string;
10
11 age: number;
12 }
13
14 tap.test("store::get(...) tests", async t => {
15 const store = new Observerable(new Memory<Person>());
16
17 const getPerson = get(store);
18
19 const peterId = "id:peter";
20
21 const samId = "id:sam";
22
23 const peter = getPerson(peterId);
24 const sam = getPerson(samId);
25
26 const seq1 = await getPerson(peterId, { observe: false }).collect();
27
28 t.ok(seq1.length === 0, "Should be empty sequence");
29
30 let peterChangeCount = 0;
31 let samChangeCount = 0;
32 let peterDeleted = 0;
33
34 const peterSubscription = peter.subscribe({
35 next: () => peterChangeCount++,
36 complete: () => peterDeleted++
37 });
38 const samSubscription = sam.subscribe({
39 next: () => samChangeCount++
40 });
41
42 try {
43 t.equal(peterChangeCount, 0, "Should be no changes recorded");
44
45 store.put({id: peterId, name: "Peter", age: 30 });
46
47 t.equal(peterChangeCount, 1, "Should record 1 object change");
48 t.equal(samChangeCount, 0, "Should not record other object changes");
49
50 store.remove(peterId);
51
52 t.equal(peterDeleted, 1, "Should complete sequence");
53 t.equal(peterChangeCount, 1, "Should not record remove operations");
54
55 store.put({id: peterId, name: "Peter", age: 29});
56
57 t.equal(peterChangeCount, 1, "Should not record changes after completion");
58
59 } finally {
60 peterSubscription.unsubscribe();
61 samSubscription.unsubscribe();
62 }
63
64
65 }).catch(() => { }); No newline at end of file
@@ -1,402 +1,397
1 import { Cancellation } from "@implab/core-amd/Cancellation";
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { ICancellation } from "@implab/core-amd/interfaces";
2 import { ICancellation } from "@implab/core-amd/interfaces";
3 import { isPromise } from "@implab/core-amd/safe";
3 import { isPromise } from "@implab/core-amd/safe";
4
4
5 /**
5 /**
6 * The interface for the consumer of an observable sequence
6 * The interface for the consumer of an observable sequence
7 */
7 */
8 export interface Observer<T> {
8 export interface Observer<T> {
9 /**
9 /**
10 * Called for the next element in the sequence
10 * Called for the next element in the sequence
11 */
11 */
12 next?: (value: T) => void;
12 next?: (value: T) => void;
13
13
14 /**
14 /**
15 * Called once when the error occurs in the sequence.
15 * Called once when the error occurs in the sequence.
16 */
16 */
17 error?: (e: unknown) => void;
17 error?: (e: unknown) => void;
18
18
19 /**
19 /**
20 * Called once at the end of the sequence.
20 * Called once at the end of the sequence.
21 */
21 */
22 complete?: () => void;
22 complete?: () => void;
23 }
23 }
24
24
25 /**
25 /**
26 * The group of functions to feed an observable. These methods are provided to
26 * The group of functions to feed an observable. These methods are provided to
27 * the producer to generate a stream of events.
27 * the producer to generate a stream of events.
28 */
28 */
29 export type Sink<T> = {
29 export type Sink<T> = {
30 /**
30 /**
31 * Call to send the next element in the sequence
31 * Call to send the next element in the sequence
32 */
32 */
33 next: (value: T) => void;
33 next: (value: T) => void;
34
34
35 /**
35 /**
36 * Call to notify about the error occurred in the sequence.
36 * Call to notify about the error occurred in the sequence.
37 */
37 */
38 error: (e: unknown) => void;
38 error: (e: unknown) => void;
39
39
40 /**
40 /**
41 * Call to signal the end of the sequence.
41 * Call to signal the end of the sequence.
42 */
42 */
43 complete: () => void;
43 complete: () => void;
44
44
45 /**
45 /**
46 * Checks whether the sink is accepting new elements. It's safe to
46 * Checks whether the sink is accepting new elements. It's safe to
47 * send elements to the closed sink.
47 * send elements to the closed sink.
48 */
48 */
49 isClosed: () => boolean;
49 isClosed: () => boolean;
50 };
50 };
51
51
52 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
52 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
53
53
54 export interface Unsubscribable {
54 export interface Unsubscribable {
55 unsubscribe(): void;
55 unsubscribe(): void;
56 }
56 }
57
57
58 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
58 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
59 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
59 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
60
60
61 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
61 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
62 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
62 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
63
63
64 export interface Subscribable<T> {
64 export interface Subscribable<T> {
65 subscribe(consumer: Observer<T>): Unsubscribable;
65 subscribe(consumer: Observer<T>): Unsubscribable;
66 }
66 }
67
67
68 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
68 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
69
69
70 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
70 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
71
71
72 /** The observable source of items. */
72 /** The observable source of items. */
73 export interface Observable<T> extends Subscribable<T> {
73 export interface Observable<T> extends Subscribable<T> {
74 /** Transforms elements of the sequence with the specified mapper
74 /** Transforms elements of the sequence with the specified mapper
75 *
75 *
76 * @param mapper The mapper used to transform the values
76 * @param mapper The mapper used to transform the values
77 */
77 */
78 map<T2>(mapper: (value: T) => T2): Observable<T2>;
78 map<T2>(mapper: (value: T) => T2): Observable<T2>;
79
79
80 /** Filters elements of the sequence. The resulting sequence will
80 /** Filters elements of the sequence. The resulting sequence will
81 * contain only elements which match the specified predicate.
81 * contain only elements which match the specified predicate.
82 *
82 *
83 * @param predicate The filter predicate.
83 * @param predicate The filter predicate.
84 */
84 */
85 filter(predicate: (value: T) => boolean): Observable<T>;
85 filter(predicate: (value: T) => boolean): Observable<T>;
86
86
87 /** Completes the sequence once the condition is met.
87 /** Completes the sequence once the condition is met.
88 * @param predicate The condition which should be met to complete the sequence
88 * @param predicate The condition which should be met to complete the sequence
89 */
89 */
90 until(predicate: (value: T) => boolean): Observable<T>;
90 until(predicate: (value: T) => boolean): Observable<T>;
91
91
92 /** Keeps the sequence running while elements satisfy the condition.
92 /** Keeps the sequence running while elements satisfy the condition.
93 *
93 *
94 * @param predicate The condition which should be met to continue.
94 * @param predicate The condition which should be met to continue.
95 */
95 */
96 while(predicate: (value: T) => boolean): Observable<T>;
96 while(predicate: (value: T) => boolean): Observable<T>;
97
97
98 /** Applies accumulator to each value in the sequence and
98 /** Applies accumulator to each value in the sequence and
99 * emits the accumulated value for each source element
99 * emits the accumulated value for each source element
100 *
100 *
101 * @param accumulator
101 * @param accumulator
102 * @param initial
102 * @param initial
103 */
103 */
104 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
104 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
105 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
105 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
106
106
107 /** Applies accumulator to each value in the sequence and
107 /** Applies accumulator to each value in the sequence and
108 * emits the accumulated value at the end of the sequence
108 * emits the accumulated value at the end of the sequence
109 *
109 *
110 * @param accumulator
110 * @param accumulator
111 * @param initial
111 * @param initial
112 */
112 */
113 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
113 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
114 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
114 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
115
115
116 /** Concatenates the specified sequences with this observable
116 /** Concatenates the specified sequences with this observable
117 *
117 *
118 * @param seq sequences to concatenate with the current observable
118 * @param seq sequences to concatenate with the current observable
119 *
119 *
120 * The concatenation doesn't accumulate values from the specified sequences,
120 * The concatenation doesn't accumulate values from the specified sequences,
121 * The result of the concatenation is the new observable which will switch
121 * The result of the concatenation is the new observable which will switch
122 * to the next observable after the previous one completes. Values emitted
122 * to the next observable after the previous one completes. Values emitted
123 * before the next observable being active are lost.
123 * before the next observable being active are lost.
124 */
124 */
125 cat(...seq: Subscribable<T>[]): Observable<T>;
125 cat(...seq: Subscribable<T>[]): Observable<T>;
126
126
127
127
128 /** Pipes the specified operator to produce the new observable
128 /** Pipes the specified operator to produce the new observable
129 * @param op The operator consumes this observable and produces a new one
129 * @param op The operator consumes this observable and produces a new one
130 *
130 *
131 * The operator is a higher order function which takes a source observable
131 * The operator is a higher order function which takes a source observable
132 * and returns a producer for the new observable.
132 * and returns a producer for the new observable.
133 *
133 *
134 * This function can be used to create a complex mapping between source and
134 * This function can be used to create a complex mapping between source and
135 * resulting observables. The operator may have a state (or a side effect)
135 * resulting observables. The operator may have a state (or a side effect)
136 * and can be connected to multiple observables.
136 * and can be connected to multiple observables.
137 */
137 */
138 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
138 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
139
139
140 /** Waits for the next event to occur and returns a promise for the next value
140 /** Waits for the next event to occur and returns a promise for the next value
141 * @param ct Cancellation token
141 * @param ct Cancellation token
142 */
142 */
143 next(ct?: ICancellation): Promise<T>;
143 next(ct?: ICancellation): Promise<T>;
144
144
145 /** Collects items of the sequence to the array. */
145 /** Collects items of the sequence to the array. */
146 collect(ct?: ICancellation): Promise<T[]>;
146 collect(ct?: ICancellation): Promise<T[]>;
147 }
147 }
148
148
149 const noop = () => { };
149 const noop = () => { };
150
150
151 const sink = <T>(consumer: Observer<T>) => {
151 const sink = <T>(consumer: Observer<T>) => {
152 const { next, error, complete } = consumer;
152 const { next, error, complete } = consumer;
153 return {
153 return {
154 next: next ? next.bind(consumer) : noop,
154 next: next ? next.bind(consumer) : noop,
155 error: error ? error.bind(consumer) : noop,
155 error: error ? error.bind(consumer) : noop,
156 complete: complete ? complete.bind(consumer) : noop,
156 complete: complete ? complete.bind(consumer) : noop,
157 isClosed: () => false
157 isClosed: () => false
158 };
158 };
159 };
159 };
160
160
161 /** Wraps the producer to handle tear down logic and subscription management
161 /** Wraps the producer to handle tear down logic and subscription management
162 *
162 *
163 * @param producer The producer to wrap
163 * @param producer The producer to wrap
164 * @returns The wrapper producer
164 * @returns The wrapper producer
165 */
165 */
166 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
166 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
167 let done = false;
167 let done = false;
168 let cleanup = noop;
168 let cleanup = noop;
169
169
170 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
170 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
171 (...args: A) => done ?
171 (...args: A) => done ?
172 void (0) :
172 void (0) :
173 (done = true, cleanup(), fn(...args));
173 (done = true, cleanup(), fn(...args));
174
174
175 const _fin0 = () => done ? void (0) : (done = true, cleanup());
175 const _fin0 = () => done ? void (0) : (done = true, cleanup());
176
176
177 const safeSink = {
177 const safeSink = {
178 next: (value: T) => { !done && next(value); },
178 next: (value: T) => { !done && next(value); },
179 error: _fin(error),
179 error: _fin(error),
180 complete: _fin(complete),
180 complete: _fin(complete),
181 isClosed: () => done
181 isClosed: () => done
182 };
182 };
183 cleanup = producer(safeSink) ?? noop;
183 cleanup = producer(safeSink) ?? noop;
184 return done ? cleanup() : _fin0;
184 return done ? cleanup() : _fin0;
185 };
185 };
186
186
187 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
187 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
188 subscribe: (consumer: Observer<T>) => ({
188 subscribe: (consumer: Observer<T>) => ({
189 unsubscribe: producer(sink(consumer)) ?? noop
189 unsubscribe: producer(sink(consumer)) ?? noop
190 }),
190 }),
191
191
192 map: (mapper) => _observe(({ next, ...rest }) =>
192 map: (mapper) => _observe(({ next, ...rest }) =>
193 producer({
193 producer({
194 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
194 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
195 ...rest
195 ...rest
196 })
196 })
197 ),
197 ),
198
198
199 filter: (predicate) => _observe(({ next, ...rest }) =>
199 filter: (predicate) => _observe(({ next, ...rest }) =>
200 producer({
200 producer({
201 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
201 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
202 ...rest
202 ...rest
203 })
203 })
204 ),
204 ),
205
205
206 until: predicate => _observe(({ next, complete, ...rest }) =>
206 until: predicate => _observe(({ next, complete, ...rest }) =>
207 producer({
207 producer({
208 next: v => predicate(v) ? complete() : next(v),
208 next: v => predicate(v) ? complete() : next(v),
209 complete,
209 complete,
210 ...rest
210 ...rest
211 })
211 })
212 ),
212 ),
213
213
214 while: predicate => _observe(({ next, complete, ...rest }) =>
214 while: predicate => _observe(({ next, complete, ...rest }) =>
215 producer({
215 producer({
216 next: v => predicate(v) ? next(v) : complete(),
216 next: v => predicate(v) ? next(v) : complete(),
217 complete,
217 complete,
218 ...rest
218 ...rest
219 })
219 })
220 ),
220 ),
221
221
222 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
222 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
223 if (args.length === 1) {
223 if (args.length === 1) {
224 const [accumulator] = args;
224 const [accumulator] = args;
225 let _acc: T;
225 let _acc: T;
226 let index = 0;
226 let index = 0;
227 return producer({
227 return producer({
228 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
228 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
229 ...rest
229 ...rest
230 });
230 });
231 } else {
231 } else {
232 const [accumulator, initial] = args;
232 const [accumulator, initial] = args;
233 let _acc = initial;
233 let _acc = initial;
234 return producer({
234 return producer({
235 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
235 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
236 ...rest
236 ...rest
237 });
237 });
238 }
238 }
239 }),
239 }),
240
240
241 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
241 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
242 if (args.length === 1) {
242 if (args.length === 1) {
243 const [accumulator] = args;
243 const [accumulator] = args;
244 let _acc: T;
244 let _acc: T;
245 let index = 0;
245 let index = 0;
246 return producer({
246 return producer({
247 next: next !== noop ? (v: T) => {
247 next: next !== noop ? (v: T) => {
248 _acc = index++ === 0 ? v : accumulator(_acc, v);
248 _acc = index++ === 0 ? v : accumulator(_acc, v);
249 } : noop,
249 } : noop,
250 complete: () => {
250 complete: () => {
251 if (index === 0) {
251 if (index === 0) {
252 error(new Error("The sequence can't be empty"));
252 error(new Error("The sequence can't be empty"));
253 } else {
253 } else {
254 next(_acc);
254 next(_acc);
255 complete();
255 complete();
256 }
256 }
257 },
257 },
258 error,
258 error,
259 ...rest
259 ...rest
260 });
260 });
261 } else {
261 } else {
262 const [accumulator, initial] = args;
262 const [accumulator, initial] = args;
263 let _acc = initial;
263 let _acc = initial;
264 return producer({
264 return producer({
265 next: next !== noop ? (v: T) => {
265 next: next !== noop ? (v: T) => {
266 _acc = accumulator(_acc, v);
266 _acc = accumulator(_acc, v);
267 } : noop,
267 } : noop,
268 complete: () => {
268 complete: () => {
269 next(_acc);
269 next(_acc);
270 complete();
270 complete();
271 },
271 },
272 error,
272 error,
273 ...rest
273 ...rest
274 });
274 });
275 }
275 }
276 }),
276 }),
277
277
278 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
278 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
279 let cleanup: () => void;
279 let cleanup: () => void;
280 const complete = () => {
280 const len = seq.length;
281 const continuation = seq.shift();
281 const complete = (i: number) => i < len ?
282 if (continuation) {
282 () => {
283 // if we have a next sequence, subscribe to it
283 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
284 const subscription = continuation.subscribe({ next, complete, ...rest });
285 cleanup = subscription.unsubscribe.bind(subscription);
284 cleanup = subscription.unsubscribe.bind(subscription);
286 } else {
285 } : final;
287 // otherwise notify the consumer about completion
288 final();
289 }
290 };
291
286
292 cleanup = producer({ next, complete, ...rest }) ?? noop;
287 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
293
288
294 return () => cleanup();
289 return () => cleanup();
295 }),
290 }),
296
291
297 pipe: <U>(op: OperatorFn<T,U>) => observe(op(_observe(producer))),
292 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
298
293
299 next: collect(
294 next: collect(
300 producer,
295 producer,
301 ({ next, complete, error, isClosed }) => ({
296 ({ next, complete, error, isClosed }) => ({
302 next: v => (next(v), complete()),
297 next: v => (next(v), complete()),
303 complete: () => error(new Error("The sequence is empty")),
298 complete: () => error(new Error("The sequence is empty")),
304 error,
299 error,
305 isClosed
300 isClosed
306 })
301 })
307 ),
302 ),
308
303
309 collect: collect(
304 collect: collect(
310 producer,
305 producer,
311 ({ next, complete, ...rest }) => {
306 ({ next, complete, ...rest }) => {
312 const data: T[] = [];
307 const data: T[] = [];
313 return {
308 return {
314 next: v => data.push(v),
309 next: v => data.push(v),
315 complete: () => (next(data), complete()),
310 complete: () => (next(data), complete()),
316 ...rest
311 ...rest
317 };
312 };
318 }
313 }
319 )
314 )
320 });
315 });
321
316
322 const collect = <T, U>(
317 const collect = <T, U>(
323 producer: Producer<T>,
318 producer: Producer<T>,
324 collector: (result: Sink<U>) => Sink<T>
319 collector: (result: Sink<U>) => Sink<T>
325 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
320 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
326 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
321 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
327 const h = ct.register(error);
322 const h = ct.register(error);
328 const cleanup = !isClosed() ?
323 const cleanup = !isClosed() ?
329 producer(collector({ next, complete, error, isClosed })) ?? noop :
324 producer(collector({ next, complete, error, isClosed })) ?? noop :
330 noop;
325 noop;
331
326
332 return () => {
327 return () => {
333 h.destroy();
328 h.destroy();
334 cleanup();
329 cleanup();
335 };
330 };
336 });
331 });
337
332
338 fused({
333 fused({
339 next: resolve,
334 next: resolve,
340 error: reject,
335 error: reject,
341 complete: noop,
336 complete: noop,
342 isClosed: () => false
337 isClosed: () => false
343 });
338 });
344 });
339 });
345
340
346 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
341 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
347
342
348 /** Converts an array to the observable sequence of its elements. */
343 /** Converts an array to the observable sequence of its elements. */
349 export const ofArray = <T>(items: T[]) => _observe<T>(
344 export const ofArray = <T>(items: T[]) => _observe<T>(
350 ({ next, complete }) => (
345 ({ next, complete }) => (
351 items.forEach(next),
346 items.forEach(next),
352 complete()
347 complete()
353 )
348 )
354 );
349 );
355
350
356 /** Converts a subscribable to the observable */
351 /** Converts a subscribable to the observable */
357 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
352 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
358 observe(sink => {
353 observe(sink => {
359 const subscription = subscribable.subscribe(sink);
354 const subscription = subscribable.subscribe(sink);
360 return () => subscription.unsubscribe();
355 return () => subscription.unsubscribe();
361 });
356 });
362
357
363 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
358 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
364 ({ next, error, complete }) =>
359 ({ next, error, complete }) =>
365 isPromise(item) ?
360 isPromise(item) ?
366 void item.then(
361 void item.then(
367 v => (next(v), complete()),
362 v => (next(v), complete()),
368 error
363 error
369 ) :
364 ) :
370 (next(item), complete())
365 (next(item), complete())
371 );
366 );
372
367
373 /** Converts a list of parameter values to the observable sequence. The
368 /** Converts a list of parameter values to the observable sequence. The
374 * order of elements in the list will be preserved in the resulting sequence.
369 * order of elements in the list will be preserved in the resulting sequence.
375 */
370 */
376 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
371 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
377 of1(items[0]) :
372 of1(items[0]) :
378 observe<T>(
373 observe<T>(
379 ({ next, error, complete, isClosed }) => {
374 ({ next, error, complete, isClosed }) => {
380 const n = items.length;
375 const n = items.length;
381
376
382 const _next = (start: number) => {
377 const _next = (start: number) => {
383 if (start > 0 && isClosed()) // when resumed
378 if (start > 0 && isClosed()) // when resumed
384 return;
379 return;
385
380
386 for (let i = start; i < n; i++) {
381 for (let i = start; i < n; i++) {
387 const r = items[i];
382 const r = items[i];
388 if (isPromise(r)) {
383 if (isPromise(r)) {
389 r.then(v => (next(v), _next(i + 1)), error);
384 r.then(v => (next(v), _next(i + 1)), error);
390 return; // suspend
385 return; // suspend
391 } else {
386 } else {
392 next(r);
387 next(r);
393 }
388 }
394 }
389 }
395 complete();
390 complete();
396 };
391 };
397
392
398 _next(0);
393 _next(0);
399 }
394 }
400 );
395 );
401
396
402 export const empty = _observe<never>(({ complete }) => complete());
397 export const empty = _observe<never>(({ complete }) => complete());
@@ -1,105 +1,105
1 import { Cancellation } from "@implab/core-amd/Cancellation";
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { CancelledError } from "@implab/core-amd/CancelledError";
2 import { CancelledError } from "@implab/core-amd/CancelledError";
3 import { Producer, Sink, Subscribable } from "../observable";
3 import { Observable, Sink, Subscribable, observe } from "../observable";
4 import { Scope } from "../tsx/Scope";
4 import { Scope } from "../tsx/Scope";
5
5
6 /**
6 /**
7 * Creates a buffer with the specified length. The buffer will immediately
7 * Creates a buffer with the specified length. The buffer will immediately
8 * subscribe to the source observable and start accumulating values.
8 * subscribe to the source observable and start accumulating values.
9 *
9 *
10 * The resulting observable will repeat the buffered values for each new subscriber.
10 * The resulting observable will repeat the buffered values for each new subscriber.
11 *
11 *
12 * @param length The number of elements to store.
12 * @param length The number of elements to store.
13 * @param ct Cancellation token to unsubscribe from the original observable.
13 * @param ct Cancellation token to unsubscribe from the original observable.
14 *
14 *
15 */
15 */
16 export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Producer<T> => {
16 export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Observable<T> => {
17 type Status = "active" | "complete" | "error";
17 type Status = "active" | "complete" | "error";
18
18
19 // ring-buffer, wpos will rotate in range (0...length-1)
19 // ring-buffer, wpos will rotate in range (0...length-1)
20 let wpos = 0;
20 let wpos = 0;
21 const buffer: T[] = [];
21 const buffer: T[] = [];
22
22
23 // writes the next value to the buffer
23 // writes the next value to the buffer
24 const write = (value: T) => {
24 const write = (value: T) => {
25 buffer[wpos] = value;
25 buffer[wpos] = value;
26 wpos = (wpos + 1) % length;
26 wpos = (wpos + 1) % length;
27 };
27 };
28
28
29 // reads contents of the buffer
29 // reads contents of the buffer
30 // cb will be called for each value in the buffer
30 // cb will be called for each value in the buffer
31 const read = (cb: (item: T) => void) => {
31 const read = (cb: (item: T) => void) => {
32 const start = wpos + length - buffer.length;
32 const start = wpos + length - buffer.length;
33 const end = wpos + length;
33 const end = wpos + length;
34
34
35 for(let pos = start; pos < end; pos++ )
35 for(let pos = start; pos < end; pos++ )
36 cb(buffer[pos % length]);
36 cb(buffer[pos % length]);
37 };
37 };
38
38
39 let status: Status = "active";
39 let status: Status = "active";
40 let lastError: unknown;
40 let lastError: unknown;
41 let subscribers: Sink<T>[] = [];
41 let subscribers: Sink<T>[] = [];
42
42
43 const scope = new Scope();
43 const scope = new Scope();
44
44
45 // cleanup method to release resources held by this subscription
45 // cleanup method to release resources held by this subscription
46 const cleanup = (cb: (item: Sink<T>) => void) => {
46 const cleanup = (cb: (item: Sink<T>) => void) => {
47 scope.destroy();
47 scope.destroy();
48 const _subscribers = subscribers;
48 const _subscribers = subscribers;
49 subscribers = [];
49 subscribers = [];
50 _subscribers.forEach(cb);
50 _subscribers.forEach(cb);
51 };
51 };
52
52
53 const sink: Sink<T> = {
53 const sink: Sink<T> = {
54 isClosed: () => status !== "active",
54 isClosed: () => status !== "active",
55 complete: () => {
55 complete: () => {
56 if (status === "active") {
56 if (status === "active") {
57 status = "complete";
57 status = "complete";
58 cleanup(s => s.complete());
58 cleanup(s => s.complete());
59 }
59 }
60 },
60 },
61 error: e => {
61 error: e => {
62 if (status === "active") {
62 if (status === "active") {
63 status = "error";
63 status = "error";
64 lastError = e;
64 lastError = e;
65 cleanup(s => s.error(e));
65 cleanup(s => s.error(e));
66 }
66 }
67 },
67 },
68 next: v => {
68 next: v => {
69 if (status === "active") {
69 if (status === "active") {
70 write(v);
70 write(v);
71 const _subscribers = subscribers;
71 const _subscribers = subscribers;
72 _subscribers.forEach(s => s.next(v));
72 _subscribers.forEach(s => s.next(v));
73 }
73 }
74 }
74 }
75 };
75 };
76
76
77 if (ct.isRequested()) {
77 if (ct.isRequested()) {
78 sink.error(new CancelledError("The operation was cancelled", ct));
78 sink.error(new CancelledError("The operation was cancelled", ct));
79 } else {
79 } else {
80 scope.own(source.subscribe(sink));
80 scope.own(source.subscribe(sink));
81 scope.own(ct.register(e => sink.error(e)));
81 scope.own(ct.register(e => sink.error(e)));
82 }
82 }
83
83
84 return (s: Sink<T>) => {
84 return observe( s => {
85 const _subscribers = subscribers;
85 const _subscribers = subscribers;
86 read(s.next);
86 read(s.next);
87 switch (status) {
87 switch (status) {
88 case "active":
88 case "active":
89 subscribers.push(s);
89 subscribers.push(s);
90 return () => {
90 return () => {
91 if (_subscribers === subscribers) {
91 if (_subscribers === subscribers) {
92 const pos = subscribers.indexOf(s);
92 const pos = subscribers.indexOf(s);
93 if (pos >= 0)
93 if (pos >= 0)
94 subscribers.splice(pos, 1);
94 subscribers.splice(pos, 1);
95 }
95 }
96 };
96 };
97 case "complete":
97 case "complete":
98 s.complete();
98 s.complete();
99 break;
99 break;
100 case "error":
100 case "error":
101 s.error(lastError);
101 s.error(lastError);
102 break;
102 break;
103 }
103 }
104 };
104 });
105 }; No newline at end of file
105 };
@@ -1,50 +1,50
1 import { Producer, Sink, Subscribable } from "../observable";
1 import { Observable, Sink, Subscribable, observe } from "../observable";
2
2
3 const noop = () => { };
3 const noop = () => { };
4
4
5 /** Joins multiple subscribers to the single one. The resulting subscriber
5 /** Joins multiple subscribers to the single one. The resulting subscriber
6 * will be created when the first client subscribes and will be released
6 * will be created when the first client subscribes and will be released
7 * with the the last client unsubscribed.
7 * with the the last client unsubscribed.
8 *
8 *
9 * Use this wrapper to prevent spawning multiple producers.
9 * Use this wrapper to prevent spawning multiple producers.
10 *
10 *
11 * @param source The source observable
11 * @param source The source observable
12 * @returns The wrapped producer
12 * @returns The wrapped producer
13 */
13 */
14 export const subject = <T>(source: Subscribable<T>): Producer<T> => {
14 export const subject = <T>(source: Subscribable<T>): Observable<T> => {
15 let subscribers: Sink<T>[] = [];
15 let subscribers: Sink<T>[] = [];
16
16
17 let subscription = { unsubscribe: noop };
17 let subscription = { unsubscribe: noop };
18
18
19 // cleanup method to release resources held by this subscription
19 // cleanup method to release resources held by this subscription
20 const cleanup = (cb: (item: Sink<T>) => void) => {
20 const cleanup = (cb: (item: Sink<T>) => void) => {
21 const _subscribers = subscribers;
21 const _subscribers = subscribers;
22 subscribers = [];
22 subscribers = [];
23 _subscribers.forEach(cb);
23 _subscribers.forEach(cb);
24 subscription.unsubscribe();
24 subscription.unsubscribe();
25 };
25 };
26
26
27 const sink: Sink<T> = {
27 const sink: Sink<T> = {
28 isClosed: () => false,
28 isClosed: () => false,
29 complete: () => cleanup(s => s.complete()),
29 complete: () => cleanup(s => s.complete()),
30 error: e => cleanup(s => s.error(e)),
30 error: e => cleanup(s => s.error(e)),
31 next: v => subscribers.forEach(s => s.next(v))
31 next: v => subscribers.forEach(s => s.next(v))
32 };
32 };
33
33
34 return client => {
34 return observe(client => {
35 const _subscribers = subscribers;
35 const _subscribers = subscribers;
36 subscribers.push(client);
36 subscribers.push(client);
37 if (subscribers.length === 1)
37 if (subscribers.length === 1)
38 subscription = source.subscribe(sink);
38 subscription = source.subscribe(sink);
39
39
40 return () => {
40 return () => {
41 if (_subscribers === subscribers) {
41 if (_subscribers === subscribers) {
42 const pos = subscribers.indexOf(client);
42 const pos = subscribers.indexOf(client);
43 if (pos >= 0)
43 if (pos >= 0)
44 subscribers.splice(pos, 1);
44 subscribers.splice(pos, 1);
45 if (!subscribers.length)
45 if (!subscribers.length)
46 subscription.unsubscribe();
46 subscription.unsubscribe();
47 }
47 }
48 };
48 };
49 };
49 });
50 };
50 };
@@ -1,61 +1,123
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 import { isPromise } from "@implab/core-amd/safe";
2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
3 import { observe, Observable } from "./observable";
3 import { observe, Observable, empty } from "./observable";
4 import { after } from "dojo/aspect";
5 import { subject } from "./operators/subject";
4
6
5 export interface OrderedUpdate<T> {
7 export interface OrderedUpdate<T> {
6 /** The item is being updated */
8 /** The item is being updated */
7 readonly item: T;
9 readonly item: T;
8
10
9 /** The previous index of the item, -1 in case it is inserted */
11 /** The previous index of the item, -1 in case it is inserted */
10 readonly prevIndex: number;
12 readonly prevIndex: number;
11
13
12 /** The new index of the item, -1 in case it is deleted */
14 /** The new index of the item, -1 in case it is deleted */
13 readonly newIndex: number;
15 readonly newIndex: number;
14
16
15 }
17 }
16
18
17 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
19 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
18
20
19 interface DjObservableResults<T> {
21 interface DjObservableResults<T> {
20 /**
22 /**
21 * Allows observation of results
23 * Allows observation of results
22 */
24 */
23 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
25 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
24 remove(): void;
26 remove(): void;
25 };
27 };
26 }
28 }
27
29
28 interface Queryable<T, Q, O> {
30 interface Queryable<T, Q, O> {
29 query(query?: Q, options?: O): PromiseOrValue<T[]>;
31 query(query?: Q, options?: O): PromiseOrValue<T[]>;
30 }
32 }
31
33
32 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
34 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
33 v && (typeof (v as { observe?: unknown; }).observe === "function");
35 v && (typeof (v as { observe?: unknown; }).observe === "function");
34
36
35 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
37 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
36 (query?: Q, options?: O & { observe?: boolean }) => {
38 (query?: Q, options?: O & { observe?: boolean }) => {
37 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
39 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
38
40
39 const processResults = (items: T[]) =>
41 const processResults = (items: T[]) =>
40 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
42 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
41
43
42 try {
44 try {
43 const results = store.query(query, options);
45 const results = store.query(query, options);
44 if (isPromise(results)) {
46 if (isPromise(results)) {
45 results.then(processResults).then(undefined, error);
47 results.then(processResults).then(undefined, error);
46 } else {
48 } else {
47 processResults(results);
49 processResults(results);
48 }
50 }
49
51
50 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
52 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
51 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
53 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
52 return () => h.remove();
54 return () => h.remove();
53 } else {
55 } else {
54 complete();
56 complete();
55 }
57 }
56 } catch (err) {
58 } catch (err) {
57 error(err);
59 error(err);
58 }
60 }
59 });
61 });
60
62
61 };
63 };
64
65 interface IndexedStore<T> {
66 get(id: string | number): PromiseLike<T> | T | null | undefined;
67 }
68
69 interface Notifications<T> {
70 notify(item: T | undefined, id: string | number | undefined): void;
71 }
72
73 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
74 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
75
76 interface GetOpts {
77 observe?: boolean;
78 }
79
80 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
81 [item: undefined | null, id: string | number];
82
83 const filterItem = (itemId: string | number) =>
84 <T>(source: Observable<Change<T>>) =>
85 observe<T>(({ next, complete, error }) => {
86 const subscription = source
87 .filter(([, id]) => id === itemId)
88 .subscribe({
89 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
90 complete,
91 error
92 });
93 return () => subscription.unsubscribe();
94 });
95
96 export const get = <T>(store: IndexedStore<T>) => {
97 const changes = hasNotifications<T>(store) ?
98 observe<Change<T>>(({ next }) => {
99 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
100 return () => handle.remove();
101 }).pipe(subject) : empty;
102
103
104 return (id: string | number, opts: GetOpts = {}) =>
105 observe<T>(({ next, complete, error }) => {
106 const result = store.get(id);
107
108 const handle = (x: T | null | undefined) => {
109 if (x !== null && x !== undefined)
110 next(x);
111 complete();
112 };
113
114 if (isPromise(result)) {
115 result.then(handle, error);
116
117 if (isCancellable(result))
118 return () => result.cancel();
119 } else {
120 handle(result);
121 }
122 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
123 }; No newline at end of file
@@ -1,150 +1,150
1 import { empty, observe, of } from "./observable";
1 import { empty, observe, of } from "./observable";
2 import * as tap from "tap";
2 import * as tap from "tap";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 import { delay } from "@implab/core-amd/safe";
4 import { delay } from "@implab/core-amd/safe";
5
5
6 const subj1 = observe<number>(({ next, complete }) => {
6 const subj1 = observe<number>(({ next, complete }) => {
7 next(1);
7 next(1);
8 complete();
8 complete();
9 next(2);
9 next(2);
10 });
10 });
11
11
12 const consumer1 = {
12 const consumer1 = {
13 sum: 0,
13 sum: 0,
14 next(v: number) {
14 next(v: number) {
15 this.sum += v;
15 this.sum += v;
16 }
16 }
17 };
17 };
18
18
19 subj1.subscribe(consumer1);
19 subj1.subscribe(consumer1);
20 tap.equal(consumer1.sum, 1, "Should get only one value");
20 tap.equal(consumer1.sum, 1, "Should get only one value");
21
21
22 subj1.subscribe(consumer1);
22 subj1.subscribe(consumer1);
23 tap.equal(consumer1.sum, 2, "Should get the value again");
23 tap.equal(consumer1.sum, 2, "Should get the value again");
24
24
25 const consumer2 = {
25 const consumer2 = {
26 value: 0,
26 value: 0,
27 completed: false,
27 completed: false,
28 next(v: number) { this.value = v; },
28 next(v: number) { this.value = v; },
29 complete() { this.completed = true; }
29 complete() { this.completed = true; }
30 };
30 };
31
31
32 let maps = 0;
32 let maps = 0;
33
33
34 subj1
34 subj1
35 .map(v => {
35 .map(v => {
36 tap.comment(`map1: ${v * 2}`);
36 tap.comment(`map1: ${v * 2}`);
37 maps++;
37 maps++;
38 return v * 2;
38 return v * 2;
39 })
39 })
40 .map(v => {
40 .map(v => {
41 tap.comment(`map2: ${v * 2}`);
41 tap.comment(`map2: ${v * 2}`);
42 maps++;
42 maps++;
43 return v * 2;
43 return v * 2;
44 })
44 })
45 .map(v => {
45 .map(v => {
46 tap.comment(`map3: ${v * 2}`);
46 tap.comment(`map3: ${v * 2}`);
47 maps++;
47 maps++;
48 return v * 2;
48 return v * 2;
49 })
49 })
50 .subscribe(consumer2);
50 .subscribe(consumer2);
51
51
52 tap.equal(consumer2.value, 8, "Should map");
52 tap.equal(consumer2.value, 8, "Should map");
53 tap.equal(maps, 3, "The map chain should not be executed after completion");
53 tap.equal(maps, 3, "The map chain should not be executed after completion");
54 tap.ok(consumer2.completed, "The completion signal should pass through");
54 tap.ok(consumer2.completed, "The completion signal should pass through");
55
55
56 const subj2 = observe<number>(({ next, complete }) => {
56 const subj2 = observe<number>(({ next, complete }) => {
57 [1, 2, 3, 4, 5].forEach(next);
57 [1, 2, 3, 4, 5].forEach(next);
58 complete();
58 complete();
59 return () => {
59 return () => {
60 tap.comment("subj2: unsubscribe");
60 tap.comment("subj2: unsubscribe");
61 };
61 };
62 });
62 });
63
63
64 const consumer3 = {
64 const consumer3 = {
65 even: 0,
65 even: 0,
66 odd: 0,
66 odd: 0,
67 completed: false,
67 completed: false,
68 subscribed: 0,
68 subscribed: 0,
69 unsubscribed: 0,
69 unsubscribed: 0,
70 next(v: "even" | "odd") {
70 next(v: "even" | "odd") {
71 this[v]++;
71 this[v]++;
72 },
72 },
73 complete() {
73 complete() {
74 this.completed = true;
74 this.completed = true;
75 },
75 },
76 subscribe() {
76 subscribe() {
77 this.subscribed++;
77 this.subscribed++;
78 },
78 },
79 unsubscribe() {
79 unsubscribe() {
80 this.unsubscribed++;
80 this.unsubscribed++;
81 }
81 }
82 };
82 };
83
83
84
84
85 const subj3 = subj2.pipe<"even" | "odd">(self => ({ next, complete, error }) => {
85 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
86 consumer3.subscribe();
86 consumer3.subscribe();
87 let count = 0;
87 let count = 0;
88 const h = self.subscribe({
88 const h = self.subscribe({
89 next: val => {
89 next: val => {
90 if (val % 2 === 0)
90 if (val % 2 === 0)
91 next("odd");
91 next("odd");
92 else
92 else
93 next("even");
93 next("even");
94 if (++count === 4)
94 if (++count === 4)
95 complete();
95 complete();
96 },
96 },
97 complete,
97 complete,
98 error
98 error
99 });
99 });
100 return () => {
100 return () => {
101 consumer3.unsubscribe();
101 consumer3.unsubscribe();
102 h.unsubscribe();
102 h.unsubscribe();
103 };
103 };
104 });
104 }));
105
105
106 subj3.subscribe(consumer3);
106 subj3.subscribe(consumer3);
107
107
108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
110 tap.ok(consumer3.completed, "The sequence should completed");
110 tap.ok(consumer3.completed, "The sequence should completed");
111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
113
113
114 subj2.reduce((a, b) => a + b).subscribe({
114 subj2.reduce((a, b) => a + b).subscribe({
115 next: val => tap.comment("subj2: reduce =", val),
115 next: val => tap.comment("subj2: reduce =", val),
116 complete: () => tap.comment("subj2: complete")
116 complete: () => tap.comment("subj2: complete")
117 });
117 });
118
118
119 tap.test("of(...) tests", async t => {
119 tap.test("of(...) tests", async t => {
120 await subj2.reduce((a, b) => a + b).next()
120 await subj2.reduce((a, b) => a + b).next()
121 .then(value => t.comment("subj2: next reduce=", value));
121 .then(value => t.comment("subj2: next reduce=", value));
122
122
123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124
124
125 const cancelled = new Cancellation(cancel => cancel());
125 const cancelled = new Cancellation(cancel => cancel());
126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127
127
128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129
129
130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 .reduce<number[]>((a, x) => [...a, x], [])
131 .reduce<number[]>((a, x) => [...a, x], [])
132 .next()
132 .next()
133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
134
134
135 const rejected = Promise.reject("DIE!");
135 const rejected = Promise.reject("DIE!");
136 rejected.catch(() => { }); // SAFE AND SOUND
136 rejected.catch(() => { }); // SAFE AND SOUND
137
137
138 await t.resolves(
138 await t.resolves(
139 of(Promise.resolve(1), rejected).next(),
139 of(Promise.resolve(1), rejected).next(),
140 "of(...) should emit non-rejected items"
140 "of(...) should emit non-rejected items"
141 );
141 );
142 await t.rejects(
142 await t.rejects(
143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 "of(...) should terminate with error when a parameter is rejected"
144 "of(...) should terminate with error when a parameter is rejected"
145 );
145 );
146
146
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149
149
150 }).catch(() => { }); No newline at end of file
150 }).catch(() => { });
@@ -1,4 +1,5
1 import "./declare-tests";
1 import "./declare-tests";
2 import "./observable-tests";
2 import "./observable-tests";
3 import "./state-tests";
3 import "./state-tests";
4 import "./subject-tests";
4 import "./subject-tests";
5 import "./observable-store-tests";
@@ -1,60 +1,59
1 import { id as mid } from "module";
1 import { id as mid } from "module";
2 import { djbase, djclass } from "@implab/djx/declare";
2 import { djbase, djclass } from "@implab/djx/declare";
3 import { attach, bind, createElement, prop } from "@implab/djx/tsx";
3 import { bind, createElement, prop } from "@implab/djx/tsx";
4 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
4 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
5 import Form from "@implab/djx/form/Form";
5 import Form from "@implab/djx/form/Form";
6 import { LocalDateTime } from "@js-joda/core";
7 import { TraceSource } from "@implab/core-amd/log/TraceSource";
6 import { TraceSource } from "@implab/core-amd/log/TraceSource";
8 import DateTextBox = require("dijit/form/DateTextBox");
7 import DateTextBox = require("dijit/form/DateTextBox");
9 import Button = require("dijit/form/Button");
8 import Button = require("dijit/form/Button");
10 import ValidationTextBox = require("dijit/form/ValidationTextBox");
9 import ValidationTextBox = require("dijit/form/ValidationTextBox");
11
10
12 const trace = TraceSource.get(mid);
11 const trace = TraceSource.get(mid);
13
12
14 interface NewAppointmentProps {
13 interface NewAppointmentProps {
15 title: string;
14 title: string;
16
15
17 startAt?: Date;
16 startAt?: Date;
18 }
17 }
19
18
20 @djclass
19 @djclass
21 export default class NewAppointment extends djbase(DjxWidgetBase) {
20 export default class NewAppointment extends djbase(DjxWidgetBase) {
22 value: NewAppointmentProps = {title: "Appointment 1"};
21 value: NewAppointmentProps = {title: "Appointment 1"};
23
22
24 render() {
23 render() {
25 return <section>
24 return <section>
26 <Form<NewAppointmentProps> ref={bind("value", prop(this, "value"))}
25 <Form<NewAppointmentProps> ref={bind("value", prop(this, "value"))}
27 method="DIALOG"
26 method="DIALOG"
28 onSubmit={this._onSubmit}
27 onSubmit={this._onSubmit}
29 onReset={this._onReset}
28 onReset={this._onReset}
30 onValidStateChange={this._onValidStateChange}
29 onValidStateChange={this._onValidStateChange}
31 onValueChange={this._onValueChange}
30 onValueChange={this._onValueChange}
32 >
31 >
33 <p><label>Title: <ValidationTextBox required name="title"/></label></p>
32 <p><label>Title: <ValidationTextBox required name="title"/></label></p>
34 <p><label>Appointment date: <DateTextBox required name="startAt"/></label></p>
33 <p><label>Appointment date: <DateTextBox required name="startAt"/></label></p>
35 <footer>
34 <footer>
36 <Button type="submit">Send</Button>
35 <Button type="submit">Send</Button>
37 <Button type="reset">Reset</Button>
36 <Button type="reset">Reset</Button>
38 </footer>
37 </footer>
39 </Form>
38 </Form>
40 </section>;
39 </section>;
41 }
40 }
42
41
43 private readonly _onSubmit = (evt: Event) => {
42 private readonly _onSubmit = (evt: Event) => {
44 trace.debug("onSubmit");
43 trace.debug("onSubmit");
45 };
44 };
46
45
47 private readonly _onValidStateChange = (isValid?: boolean) => {
46 private readonly _onValidStateChange = (isValid?: boolean) => {
48 trace.debug("isValid={0}", isValid);
47 trace.debug("isValid={0}", isValid);
49 };
48 };
50
49
51 private readonly _onValueChange = (value: NewAppointmentProps) => {
50 private readonly _onValueChange = (value: NewAppointmentProps) => {
52 trace.debug("valueChange={0}", value);
51 trace.debug("valueChange={0}", value);
53 };
52 };
54
53
55 private readonly _onReset = (evt: Event) => {
54 private readonly _onReset = (evt: Event) => {
56 trace.debug("onReset");
55 trace.debug("onReset");
57 //evt.preventDefault();
56 //evt.preventDefault();
58 };
57 };
59
58
60 } No newline at end of file
59 }
General Comments 0
You need to be logged in to leave comments. Login now