##// END OF EJS Templates
added reduce() and next() methods to observable...
cin -
r116:aac297dda27d v1.6.0 default
parent child
Show More
@@ -0,0 +1,19
1 {
2 // Используйте IntelliSense, чтобы узнать о возможных атрибутах.
3 // Наведите указатель мыши, чтобы просмотреть описания существующих атрибутов.
4 // Для получения дополнительной информации посетите: https://go.microsoft.com/fwlink/?linkid=830387
5 "version": "0.2.0",
6 "configurations": [
7 {
8 "type": "node",
9 "request": "launch",
10 "name": "Launch tests",
11 "skipFiles": [
12 "<node_internals>/**"
13 ],
14 "program": "${workspaceFolder}/djx/build/test/index.js",
15 "cwd": "${workspaceFolder}/djx/build/test",
16 "console": "integratedTerminal"
17 }
18 ]
19 } No newline at end of file
@@ -0,0 +1,58
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 import { isPromise } from "@implab/core-amd/safe";
3 import { observe, Observable } from "./observable";
4
5 export interface OrderedUpdate<T> {
6 /** The item is being updated */
7 readonly item: T;
8
9 /** The previous index of the item, -1 in case it is inserted */
10 readonly prevIndex: number;
11
12 /** The new index of the item, -1 in case it is deleted */
13 readonly newIndex: number;
14
15 }
16
17 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
18
19 interface DjObservableResults<T> {
20 /**
21 * Allows observation of results
22 */
23 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
24 remove(): void;
25 };
26 }
27
28 interface Queryable<T, A extends unknown[]> {
29 query(...args: A): PromiseOrValue<T[]>;
30 }
31
32 export const isObservableResults = <T>(v: object): v is DjObservableResults<T> =>
33 v && (typeof (v as { observe?: unknown; }).observe === "function");
34
35 export const query = <T, A extends unknown[]>(store: Queryable<T, A>, includeUpdates = true) =>
36 (...args: A) => {
37 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
38 try {
39 const results = store.query(...args);
40 if (isPromise(results)) {
41 results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })))
42 .then(undefined, error);
43 } else {
44 results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
45 }
46
47 if (!isClosed() && isObservableResults<T>(results)) {
48 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
49 return () => h.remove();
50 } else {
51 complete();
52 }
53 } catch (err) {
54 error(err);
55 }
56 });
57
58 };
@@ -9,7 +9,7
9 "version": "0.0.1-dev",
9 "version": "0.0.1-dev",
10 "license": "BSD-2-Clause",
10 "license": "BSD-2-Clause",
11 "devDependencies": {
11 "devDependencies": {
12 "@implab/core-amd": "^1.4.0",
12 "@implab/core-amd": "^1.4.6",
13 "@implab/dojo-typings": "1.0.3",
13 "@implab/dojo-typings": "1.0.3",
14 "@types/chai": "4.1.3",
14 "@types/chai": "4.1.3",
15 "@types/requirejs": "2.1.31",
15 "@types/requirejs": "2.1.31",
@@ -31,7 +31,7
31 "yaml": "~1.7.2"
31 "yaml": "~1.7.2"
32 },
32 },
33 "peerDependencies": {
33 "peerDependencies": {
34 "@implab/core-amd": "^1.4.0",
34 "@implab/core-amd": "^1.4.6",
35 "dojo": "^1.10.0"
35 "dojo": "^1.10.0"
36 }
36 }
37 },
37 },
@@ -453,10 +453,13
453 "dev": true
453 "dev": true
454 },
454 },
455 "node_modules/@implab/core-amd": {
455 "node_modules/@implab/core-amd": {
456 "version": "1.4.0",
456 "version": "1.4.6",
457 "resolved": "https://registry.npmjs.org/@implab/core-amd/-/core-amd-1.4.0.tgz",
457 "resolved": "https://registry.npmjs.org/@implab/core-amd/-/core-amd-1.4.6.tgz",
458 "integrity": "sha512-gaJX1mhri7YpmXDTAYELZnmTznzXYpk2AI7Decsttdi6xY+bqGgH24q0AFcKrx8RY2jfsFXxDdf0fITz2HpBbw==",
458 "integrity": "sha512-I1RwUAxeiodePpiBzveoHaehMSAyk7NFPPPEvDqfphHBC8yXoXWAaUrp7EcOKEzjXAs7lJQVhNpmjCjIqoj6BQ==",
459 "dev": true
459 "dev": true,
460 "peerDependencies": {
461 "dojo": "^1.10.0"
462 }
460 },
463 },
461 "node_modules/@implab/dojo-typings": {
464 "node_modules/@implab/dojo-typings": {
462 "version": "1.0.3",
465 "version": "1.0.3",
@@ -7040,10 +7043,11
7040 "dev": true
7043 "dev": true
7041 },
7044 },
7042 "@implab/core-amd": {
7045 "@implab/core-amd": {
7043 "version": "1.4.0",
7046 "version": "1.4.6",
7044 "resolved": "https://registry.npmjs.org/@implab/core-amd/-/core-amd-1.4.0.tgz",
7047 "resolved": "https://registry.npmjs.org/@implab/core-amd/-/core-amd-1.4.6.tgz",
7045 "integrity": "sha512-gaJX1mhri7YpmXDTAYELZnmTznzXYpk2AI7Decsttdi6xY+bqGgH24q0AFcKrx8RY2jfsFXxDdf0fITz2HpBbw==",
7048 "integrity": "sha512-I1RwUAxeiodePpiBzveoHaehMSAyk7NFPPPEvDqfphHBC8yXoXWAaUrp7EcOKEzjXAs7lJQVhNpmjCjIqoj6BQ==",
7046 "dev": true
7049 "dev": true,
7050 "requires": {}
7047 },
7051 },
7048 "@implab/dojo-typings": {
7052 "@implab/dojo-typings": {
7049 "version": "1.0.3",
7053 "version": "1.0.3",
@@ -15,11 +15,11
15 "access": "public"
15 "access": "public"
16 },
16 },
17 "peerDependencies": {
17 "peerDependencies": {
18 "@implab/core-amd": "^1.4.0",
18 "@implab/core-amd": "^1.4.6",
19 "dojo": "^1.10.0"
19 "dojo": "^1.10.0"
20 },
20 },
21 "devDependencies": {
21 "devDependencies": {
22 "@implab/core-amd": "^1.4.0",
22 "@implab/core-amd": "^1.4.6",
23 "@types/chai": "4.1.3",
23 "@types/chai": "4.1.3",
24 "@types/requirejs": "2.1.31",
24 "@types/requirejs": "2.1.31",
25 "@types/yaml": "1.2.0",
25 "@types/yaml": "1.2.0",
@@ -1,5 +1,5
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { isPromise } from "@implab/core-amd/safe";
2 import { ICancellation } from "@implab/core-amd/interfaces";
3
3
4 /**
4 /**
5 * The interface for the consumer of an observable sequence
5 * The interface for the consumer of an observable sequence
@@ -29,23 +29,23 export type Sink<T> = {
29 /**
29 /**
30 * Call to send the next element in the sequence
30 * Call to send the next element in the sequence
31 */
31 */
32 next: (value: T) => void;
32 next: (value: T) => void;
33
34 /**
35 * Call to notify about the error occurred in the sequence.
36 */
37 error: (e: unknown) => void;
33
38
34 /**
39 /**
35 * Call to notify about the error occurred in the sequence.
40 * Call to signal the end of the sequence.
36 */
41 */
37 error: (e: unknown) => void;
42 complete: () => void;
38
39 /**
40 * Call to signal the end of the sequence.
41 */
42 complete: () => void;
43
43
44 /**
44 /**
45 * Checks whether the sink is accepting new elements. It's safe to
45 * Checks whether the sink is accepting new elements. It's safe to
46 * send elements to the closed sink.
46 * send elements to the closed sink.
47 */
47 */
48 isClosed: () => boolean;
48 isClosed: () => boolean;
49 };
49 };
50
50
51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
@@ -64,6 +64,8 export interface Subscribable<T> {
64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 }
65 }
66
66
67 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
68
67 /** The observable source of items. */
69 /** The observable source of items. */
68 export interface Observable<T> extends Subscribable<T> {
70 export interface Observable<T> extends Subscribable<T> {
69 /** Transforms elements of the sequence with the specified mapper
71 /** Transforms elements of the sequence with the specified mapper
@@ -85,11 +87,33 export interface Observable<T> extends S
85 * @param accumulator
87 * @param accumulator
86 * @param initial
88 * @param initial
87 */
89 */
88 scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>;
90 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
91 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
89
92
93 /** Applies accumulator to each value in the sequence and
94 * emits the accumulated value at the end of the sequence
95 *
96 * @param accumulator
97 * @param initial
98 */
99 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
100 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
101
102 /** Concatenates the specified sequences with this observable
103 *
104 * @param seq sequences to concatenate with the current observable
105 */
90 cat(...seq: Subscribable<T>[]): Observable<T>;
106 cat(...seq: Subscribable<T>[]): Observable<T>;
91
107
92 pipe<U>(f: (source: Observable<T>) => Producer<U>): Observable<U>;
108 /** Pipes the specified operator to produce the new observable
109 * @param op The operator which consumes this observable and produces a new one
110 */
111 pipe<U>(op: (source: Observable<T>) => Producer<U>): Observable<U>;
112
113 /** Waits for the next event to occur and returns a promise for the next value
114 * @param ct Cancellation token to
115 */
116 next(ct?: ICancellation): Promise<T>;
93 }
117 }
94
118
95 const noop = () => { };
119 const noop = () => { };
@@ -134,24 +158,75 const _observe = <T>(producer: Producer<
134 subscribe: (consumer: Partial<Observer<T>>) => ({
158 subscribe: (consumer: Partial<Observer<T>>) => ({
135 unsubscribe: producer(sink(consumer)) ?? noop
159 unsubscribe: producer(sink(consumer)) ?? noop
136 }),
160 }),
161
137 map: (mapper) => _observe(({ next, ...rest }) =>
162 map: (mapper) => _observe(({ next, ...rest }) =>
138 producer({
163 producer({
139 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
164 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
140 ...rest
165 ...rest
141 })
166 })
142 ),
167 ),
168
143 filter: (predicate) => _observe(({ next, ...rest }) =>
169 filter: (predicate) => _observe(({ next, ...rest }) =>
144 producer({
170 producer({
145 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
171 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
146 ...rest
172 ...rest
147 })
173 })
148 ),
174 ),
149 scan: (accumulator, initial) => _observe(({ next, ...rest }) => {
175
150 let _acc = initial;
176 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
151 return producer({
177 if (args.length === 1) {
152 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
178 const [accumulator] = args;
153 ...rest
179 let _acc: T;
154 });
180 let index = 0;
181 return producer({
182 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
183 ...rest
184 });
185 } else {
186 const [accumulator, initial] = args;
187 let _acc = initial;
188 return producer({
189 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
190 ...rest
191 });
192 }
193 }),
194
195 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
196 if (args.length === 1) {
197 const [accumulator] = args;
198 let _acc: T;
199 let index = 0;
200 return producer({
201 next: next !== noop ? (v: T) => {
202 _acc = index++ === 0 ? v : accumulator(_acc, v);
203 } : noop,
204 complete: () => {
205 if (index === 0) {
206 error(new Error("The sequence can't be empty"));
207 } else {
208 next(_acc);
209 complete();
210 }
211 },
212 error,
213 ...rest
214 });
215 } else {
216 const [accumulator, initial] = args;
217 let _acc = initial;
218 return producer({
219 next: next !== noop ? (v: T) => {
220 _acc = accumulator(_acc, v);
221 } : noop,
222 complete: () => {
223 next(_acc);
224 complete();
225 },
226 error,
227 ...rest
228 });
229 }
155 }),
230 }),
156
231
157 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
232 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
@@ -173,61 +248,59 const _observe = <T>(producer: Producer<
173 return () => cleanup();
248 return () => cleanup();
174 }),
249 }),
175
250
176 pipe: <U>(f: (source: Observable<T>) => Producer<U>) => observe(f(_observe(producer)))
251 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
177 });
178
179 export interface OrderUpdate<T> {
180 /** The item is being updated */
181 item: T;
182
252
183 /** The previous index of the item, -1 in case it is inserted */
253 next: (ct?: ICancellation) => {
184 prevIndex: number;
254 const _ct = ct ?? Cancellation.none;
185
255 return new Promise<T>((resolve, reject) => {
186 /** The new index of the item, -1 in case it is deleted */
256 // wrap the producer to handle only single event
187 newIndex: number;
257 const once = fuse<T>(({ next, complete, error, isClosed }) => {
188 }
258 const h = _ct.register(error);
189
259
190 interface ObservableResults<T> {
260 // is the _ct fires it will call error() and isClosed() will return true
191 /**
261 const cleanup = !isClosed() ?
192 * Allows observation of results
262 producer({
193 */
263 next: v => (next(v), complete()),
194 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
264 complete: () => error(new Error("The sequence is empty")),
195 remove(): void;
265 error,
196 };
266 isClosed
197 }
267 }) ?? noop :
268 noop;
198
269
199 interface Queryable<T, A extends unknown[]> {
270 return () => {
200 query(...args: A): PromiseOrValue<T[]>;
271 h.destroy();
201 }
272 cleanup();
273 };
274 });
202
275
203 export const isObservableResults = <T>(v: object): v is ObservableResults<T> =>
276 once({
204 v && (typeof (v as { observe?: unknown; }).observe === "function");
277 next: resolve,
278 error: reject,
279 complete: noop,
280 isClosed: () => false
281 });
282 });
283 }
284 });
205
285
206 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
286 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
207
287
208 export const empty = observe<never>(({ complete }) => complete());
288 export const streamArray = <T>(items: T[]) => _observe<T>(
289 ({ next, complete }) => (
290 items.forEach(next),
291 complete()
292 )
293 );
209
294
210 export const query = <T, A extends unknown[]>(store: Queryable<T, A>) =>
295 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
211 (...args: A) => {
296 ({next, error, complete}) => void promise.then(v => (next(v), complete()), error)
212 return observe<OrderUpdate<T>>(({ next, complete, error }) => {
297 );
213 try {
214 const results = store.query(...args);
215 if (isPromise(results)) {
216 results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })))
217 .then(undefined, error);
218 } else {
219 results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
220 }
221
298
222 if (isObservableResults<T>(results)) {
299 export const of = <T>(...items: T[]) => _observe<T>(
223 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }));
300 ({ next, complete }) => (
224 return () => h.remove();
301 items.forEach(next),
225 } else {
302 complete()
226 complete();
303 )
227 }
304 );
228 } catch (err) {
229 error(err);
230 }
231 });
232
305
233 };
306 export const empty = _observe<never>(({ complete }) => complete()); No newline at end of file
@@ -7,10 +7,11 import Stateful = require("dojo/Stateful
7 import _WidgetBase = require("dijit/_WidgetBase");
7 import _WidgetBase = require("dijit/_WidgetBase");
8 import { DjxWidgetBase } from "./tsx/DjxWidgetBase";
8 import { DjxWidgetBase } from "./tsx/DjxWidgetBase";
9 import { WatchRendition } from "./tsx/WatchRendition";
9 import { WatchRendition } from "./tsx/WatchRendition";
10 import { Observable, observe, OrderUpdate, Subscribable } from "./observable";
10 import { Observable, observe, Subscribable } from "./observable";
11 import djAttr = require("dojo/dom-attr");
11 import djAttr = require("dojo/dom-attr");
12 import djClass = require("dojo/dom-class");
12 import djClass = require("dojo/dom-class");
13 import { AnimationAttrs, WatchForRendition } from "./tsx/WatchForRendition";
13 import { AnimationAttrs, WatchForRendition } from "./tsx/WatchForRendition";
14 import { OrderedUpdate } from "./store";
14
15
15 export function createElement<T extends Constructor | string | ((props: object) => Element)>(elementType: T, ...args: unknown[]): Rendition {
16 export function createElement<T extends Constructor | string | ((props: object) => Element)>(elementType: T, ...args: unknown[]): Rendition {
16 if (typeof elementType === "string") {
17 if (typeof elementType === "string") {
@@ -101,7 +102,7 export function watch(
101 }
102 }
102 }
103 }
103
104
104 export const watchFor = <T>(source: T[] | Subscribable<OrderUpdate<T>>, render: (item: T, index: number) => unknown, opts: AnimationAttrs = {}) => {
105 export const watchFor = <T>(source: T[] | Subscribable<OrderedUpdate<T>>, render: (item: T, index: number) => unknown, opts: AnimationAttrs = {}) => {
105 return new WatchForRendition({
106 return new WatchForRendition({
106 ...opts,
107 ...opts,
107 subject: source,
108 subject: source,
@@ -9,7 +9,8 import { collectNodes, destroy as safeDe
9 import { IDestroyable } from "@implab/core-amd/interfaces";
9 import { IDestroyable } from "@implab/core-amd/interfaces";
10 import { play } from "../play";
10 import { play } from "../play";
11 import * as fx from "dojo/fx";
11 import * as fx from "dojo/fx";
12 import { isObservableResults, isSubsribable, OrderUpdate, Subscribable } from "../observable";
12 import { isSubsribable, Subscribable } from "../observable";
13 import { isObservableResults, OrderedUpdate } from "../store";
13
14
14 const trace = TraceSource.get(mid);
15 const trace = TraceSource.get(mid);
15
16
@@ -21,7 +22,7 interface ItemRendition {
21 destroy(): void;
22 destroy(): void;
22 }
23 }
23
24
24 interface RenderTask<T> extends OrderUpdate<T> {
25 interface RenderTask<T> extends OrderedUpdate<T> {
25 animate: boolean;
26 animate: boolean;
26 }
27 }
27
28
@@ -34,7 +35,7 export interface AnimationAttrs {
34 }
35 }
35
36
36 export interface WatchForRenditionAttrs<T> extends AnimationAttrs {
37 export interface WatchForRenditionAttrs<T> extends AnimationAttrs {
37 subject: T[] | Subscribable<OrderUpdate<T>>;
38 subject: T[] | Subscribable<OrderedUpdate<T>>;
38
39
39 component: (arg: T, index: number) => unknown;
40 component: (arg: T, index: number) => unknown;
40 }
41 }
@@ -60,7 +61,7 export class WatchForRendition<T> extend
60
61
61 private readonly _itemRenditions: ItemRendition[] = [];
62 private readonly _itemRenditions: ItemRendition[] = [];
62
63
63 private readonly _subject: T[] | Subscribable<OrderUpdate<T>>;
64 private readonly _subject: T[] | Subscribable<OrderedUpdate<T>>;
64
65
65 private readonly _renderTasks: RenderTask<T>[] = [];
66 private readonly _renderTasks: RenderTask<T>[] = [];
66
67
@@ -97,7 +98,7 export class WatchForRendition<T> extend
97 const result = this._subject;
98 const result = this._subject;
98
99
99 if (result) {
100 if (result) {
100 if (isSubsribable<OrderUpdate<T>>(result)) {
101 if (isSubsribable<OrderedUpdate<T>>(result)) {
101 let animate = false;
102 let animate = false;
102 const subscription = result.subscribe({
103 const subscription = result.subscribe({
103 next: ({ item, prevIndex, newIndex }) => this._onItemUpdated({ item, prevIndex, newIndex, animate })
104 next: ({ item, prevIndex, newIndex }) => this._onItemUpdated({ item, prevIndex, newIndex, animate })
@@ -1,5 +1,6
1 import { observe } from "./observable";
1 import { empty, observe } from "./observable";
2 import * as t from "tap";
2 import * as t from "tap";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
3
4
4 const subj1 = observe<number>(({ next, complete }) => {
5 const subj1 = observe<number>(({ next, complete }) => {
5 next(1);
6 next(1);
@@ -54,29 +55,73 t.ok(consumer2.completed, "The completio
54 const subj2 = observe<number>(({ next, complete }) => {
55 const subj2 = observe<number>(({ next, complete }) => {
55 [1, 2, 3, 4, 5].forEach(next);
56 [1, 2, 3, 4, 5].forEach(next);
56 complete();
57 complete();
57 }).pipe<string>(self => ({ next, complete, error }) => {
58 return () => {
58 t.comment("subj2: subscribe");
59 t.comment("subj2: unsubscribe");
60 };
61 });
62
63 const consumer3 = {
64 even: 0,
65 odd: 0,
66 completed: false,
67 subscribed: 0,
68 unsubscribed: 0,
69 next(v: "even" | "odd") {
70 this[v] ++;
71 },
72 complete() {
73 this.completed = true;
74 },
75 subscribe() {
76 this.subscribed++;
77 },
78 unsubscribe() {
79 this.unsubscribed++;
80 }
81 };
82
83
84 const subj3 = subj2.pipe<"even" | "odd">(self => ({ next, complete, error }) => {
85 consumer3.subscribe();
86 let count = 0;
59 const h = self.subscribe({
87 const h = self.subscribe({
60 next: val => {
88 next: val => {
61 if (val % 2 === 0)
89 if (val % 2 === 0)
62 next("odd");
90 next("odd");
63 else
91 else
64 next("even");
92 next("even");
93 if (++count === 4)
94 complete();
65 },
95 },
66 complete,
96 complete,
67 error
97 error
68 });
98 });
69 return () =>{
99 return () =>{
70 t.comment("subj2: unsubscribe");
100 consumer3.unsubscribe();
71 h.unsubscribe();
101 h.unsubscribe();
72 };
102 };
73 });
103 });
74
104
75 subj2.subscribe({
105 subj3.subscribe(consumer3);
76 next: val => t.comment("subj2: ", val),
106
107 t.equal(consumer3.odd, 2, "Should get 2 odd elements");
108 t.equal(consumer3.even, 2, "Should get 2 even elements");
109 t.ok(consumer3.completed, "The sequence should completed");
110 t.equal(consumer3.subscribed, 1, "The subscription should be done once");
111 t.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
112
113 subj2.reduce((a,b) => a + b).subscribe({
114 next: val => t.comment("subj2: reduce =", val),
77 complete: () => t.comment("subj2: complete")
115 complete: () => t.comment("subj2: complete")
78 });
116 });
79 subj2.subscribe({
117
80 next: val => t.comment("subj2: ", val),
118 subj2.reduce((a,b) => a + b).next()
81 complete: () => t.comment("subj2: complete")
119 .then(value => t.comment("subj2: next reduce=", value))
82 }); No newline at end of file
120 .catch(() => {});
121
122 subj2.next().then(val => t.equal(val, 1, "Should peek the first element")).catch(() => {});
123
124 const cancelled = new Cancellation(cancel => cancel());
125 t.rejects(subj2.next(cancelled), "Cancelled next() method should fail").catch(() => {});
126
127 t.rejects(empty.next(), "Empty sequence should fail to get next element").catch(() => {}); No newline at end of file
@@ -9,6 +9,7
9 "target": "ES5",
9 "target": "ES5",
10 "jsx": "react",
10 "jsx": "react",
11 "lib": ["es5", "es2015.promise", "es2015.symbol", "es2015.iterable", "dom", "scripthost"],
11 "lib": ["es5", "es2015.promise", "es2015.symbol", "es2015.iterable", "dom", "scripthost"],
12 "noUnusedLocals": true
12 "noUnusedLocals": true,
13 "downlevelIteration": true
13 }
14 }
14 } No newline at end of file
15 }
@@ -33,7 +33,7
33 "dev": true,
33 "dev": true,
34 "license": "BSD-2-Clause",
34 "license": "BSD-2-Clause",
35 "peerDependencies": {
35 "peerDependencies": {
36 "@implab/core-amd": "^1.4.0",
36 "@implab/core-amd": "^1.4.6",
37 "dojo": "^1.10.0"
37 "dojo": "^1.10.0"
38 }
38 }
39 },
39 },
@@ -1,4 +1,5
1 import { Contact } from "./Contact";
1 import { Contact } from "./Contact";
2 import { QueryResults } from "@implab/djx/store";
2
3
3 export type AppointmentRole = "organizer" | "speaker" | "participant";
4 export type AppointmentRole = "organizer" | "speaker" | "participant";
4
5
@@ -16,5 +17,5 export interface Appointment {
16 /** Duration in minutes */
17 /** Duration in minutes */
17 duration: number;
18 duration: number;
18
19
19 getMembers(role?: AppointmentRole): Member[];
20 getMembers(role?: AppointmentRole): QueryResults<Member>;
20 } No newline at end of file
21 }
@@ -3,9 +3,9 import Observable = require("dojo/store/
3 import { Appointment, AppointmentRole, Member } from "./Appointment";
3 import { Appointment, AppointmentRole, Member } from "./Appointment";
4 import { Contact } from "./Contact";
4 import { Contact } from "./Contact";
5 import { Uuid } from "@implab/core-amd/Uuid";
5 import { Uuid } from "@implab/core-amd/Uuid";
6 import { query } from "@implab/djx/observable";
7 import { IDestroyable } from "@implab/core-amd/interfaces";
6 import { IDestroyable } from "@implab/core-amd/interfaces";
8 import { delay } from "@implab/core-amd/safe";
7 import { delay } from "@implab/core-amd/safe";
8 import { query } from "@implab/djx/store";
9
9
10 type AppointmentRecord = Omit<Appointment, "getMembers"> & { id: string };
10 type AppointmentRecord = Omit<Appointment, "getMembers"> & { id: string };
11
11
@@ -41,8 +41,12 export class MainContext implements IDes
41 );
41 );
42 }
42 }
43
43
44 private readonly _queryAppointmentsRx = query(this._appointments);
45
46 private readonly _queryMembersRx = query(this._members);
47
44 queryAppointments({ dateFrom, dateTo }: { dateFrom?: Date; dateTo?: Date; } = {}) {
48 queryAppointments({ dateFrom, dateTo }: { dateFrom?: Date; dateTo?: Date; } = {}) {
45 return query(this._appointments)(({ startAt }) =>
49 return this._queryAppointmentsRx(({ startAt }) =>
46 (!dateFrom || dateFrom <= startAt) &&
50 (!dateFrom || dateFrom <= startAt) &&
47 (!dateTo || startAt <= dateTo)
51 (!dateTo || startAt <= dateTo)
48 ).map(item(this._mapAppointment));
52 ).map(item(this._mapAppointment));
@@ -61,7 +65,7 export class MainContext implements IDes
61 title,
65 title,
62 startAt,
66 startAt,
63 duration,
67 duration,
64 getMembers: (role?: AppointmentRole) => this._members.query(role ? { appointmentId: id, role } : { appointmentId: id })
68 getMembers: (role?: AppointmentRole) => this._queryMembersRx(role ? { appointmentId: id, role } : { appointmentId: id })
65 });
69 });
66
70
67 destroy() {
71 destroy() {
@@ -1,7 +1,8
1 import { id as mid } from "module";
1 import { id as mid } from "module";
2 import { BehaviorSubject, Observer, Unsubscribable } from "rxjs";
2 import { BehaviorSubject, Observer, Unsubscribable } from "rxjs";
3 import { IDestroyable } from "@implab/core-amd/interfaces";
3 import { IDestroyable } from "@implab/core-amd/interfaces";
4 import { OrderUpdate, Observable } from "@implab/djx/observable";
4 import { Observable } from "@implab/djx/observable";
5 import { OrderedUpdate } from "@implab/djx/store";
5 import { Appointment, Member } from "./Appointment";
6 import { Appointment, Member } from "./Appointment";
6 import { MainContext } from "./MainContext";
7 import { MainContext } from "./MainContext";
7 import { LocalDate } from "@js-joda/core";
8 import { LocalDate } from "@js-joda/core";
@@ -11,7 +12,7 import { TraceSource } from "@implab/cor
11 const trace = TraceSource.get(mid);
12 const trace = TraceSource.get(mid);
12
13
13 export interface State {
14 export interface State {
14 appointments: Observable<OrderUpdate<Appointment>>;
15 appointments: Observable<OrderedUpdate<Appointment>>;
15
16
16 dateTo: LocalDate;
17 dateTo: LocalDate;
17
18
@@ -2,7 +2,8 import { djbase, djclass } from "@implab
2 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
2 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
3 import { bind, createElement, prop, watch, watchFor } from "@implab/djx/tsx";
3 import { bind, createElement, prop, watch, watchFor } from "@implab/djx/tsx";
4 import MainModel from "../model/MainModel";
4 import MainModel from "../model/MainModel";
5 import { OrderUpdate, Observable } from "@implab/djx/observable";
5 import { Observable } from "@implab/djx/observable";
6 import { OrderedUpdate } from "@implab/djx/store";
6 import { Appointment } from "../model/Appointment";
7 import { Appointment } from "../model/Appointment";
7 import { LocalDate } from "@js-joda/core";
8 import { LocalDate } from "@js-joda/core";
8 import Button = require("dijit/form/Button");
9 import Button = require("dijit/form/Button");
@@ -10,7 +11,7 import Button = require("dijit/form/Butt
10 @djclass
11 @djclass
11 export default class MainWidget extends djbase(DjxWidgetBase) {
12 export default class MainWidget extends djbase(DjxWidgetBase) {
12
13
13 appointments?: Observable<OrderUpdate<Appointment>>;
14 appointments?: Observable<OrderedUpdate<Appointment>>;
14
15
15 model: MainModel;
16 model: MainModel;
16
17
General Comments 0
You need to be logged in to leave comments. Login now