##// END OF EJS Templates
Merge
cin -
r148:63f3ad8e6cff merge v1.10.0 default
parent child
Show More
@@ -1,54 +1,55
1 fc9f82c082ef432137da086a1fe9c37a12ba16a2 v1.0.0-rc2
1 fc9f82c082ef432137da086a1fe9c37a12ba16a2 v1.0.0-rc2
2 6d80d7901b4c8ffe8728e4a7bf5f4b7e7a669bb5 v1.0.0-rc3
2 6d80d7901b4c8ffe8728e4a7bf5f4b7e7a669bb5 v1.0.0-rc3
3 5a2c44d8e1f34dd30c2b50f92b7dc2e8f3247c43 v1.0.0-rc5
3 5a2c44d8e1f34dd30c2b50f92b7dc2e8f3247c43 v1.0.0-rc5
4 6c01fabe9ea9fb5e753fbeae8b0d2664e7072a66 v1.0.0-rc6
4 6c01fabe9ea9fb5e753fbeae8b0d2664e7072a66 v1.0.0-rc6
5 9e546fe36fdddc8324f1098ee950fa1a7ba19b93 v1.0.0-rc7
5 9e546fe36fdddc8324f1098ee950fa1a7ba19b93 v1.0.0-rc7
6 8f4d5e2c719a20ae6d65f1f4b5e2141ed765e975 v1.0.0-rc8
6 8f4d5e2c719a20ae6d65f1f4b5e2141ed765e975 v1.0.0-rc8
7 a1ab2b5975ad4b19599fb61538e7aaf329fb528c v1.0.0-rc10
7 a1ab2b5975ad4b19599fb61538e7aaf329fb528c v1.0.0-rc10
8 9b77ac3bf8f200876450ad50c308a9441a7f39c7 v1.0.0-rc11
8 9b77ac3bf8f200876450ad50c308a9441a7f39c7 v1.0.0-rc11
9 32b72f33756d3d10553b743f0b9f4504148cf97d v1.0.0-rc12
9 32b72f33756d3d10553b743f0b9f4504148cf97d v1.0.0-rc12
10 b88fac0e76c0e61e397e2995f468f7cf342afbc9 v1.0.0-rc13
10 b88fac0e76c0e61e397e2995f468f7cf342afbc9 v1.0.0-rc13
11 a46488b209e8aac583c1634043147d87740c63b4 v1.0.0-rc14
11 a46488b209e8aac583c1634043147d87740c63b4 v1.0.0-rc14
12 1174538197f6796384e643f62100292f1377b137 v1.0.0-rc15
12 1174538197f6796384e643f62100292f1377b137 v1.0.0-rc15
13 e8012fdf09ae442094f3831abe70649f8520659e 1.0.0-rc16
13 e8012fdf09ae442094f3831abe70649f8520659e 1.0.0-rc16
14 a1a1ef050ecc9d2a780e308cbffc2ad915a5c13d 1.0.0-rc17
14 a1a1ef050ecc9d2a780e308cbffc2ad915a5c13d 1.0.0-rc17
15 5c6c7e16919cff4019a55661725789d287439b75 v1.0.0-rc18
15 5c6c7e16919cff4019a55661725789d287439b75 v1.0.0-rc18
16 3b6c4159c66cecf6c5957d33ad474919608489c6 v1.0.0
16 3b6c4159c66cecf6c5957d33ad474919608489c6 v1.0.0
17 18383b2dcc1ae97fe3673242c84fe8d4b50a55f0 v1.0.1
17 18383b2dcc1ae97fe3673242c84fe8d4b50a55f0 v1.0.1
18 ed3c20c09b000386b5204b483955eb61ee662eff v1.0.2
18 ed3c20c09b000386b5204b483955eb61ee662eff v1.0.2
19 030ea350f98bb7a06e636251ea5cad403217e868 v1.0.3
19 030ea350f98bb7a06e636251ea5cad403217e868 v1.0.3
20 346ba910a5425ed95f7a67ef4db1dbbf599e2cab v1.0.5
20 346ba910a5425ed95f7a67ef4db1dbbf599e2cab v1.0.5
21 deb0ed6fb68015d89ba1287f6b766d1f80b6e4ff v1.0.7
21 deb0ed6fb68015d89ba1287f6b766d1f80b6e4ff v1.0.7
22 dd0d589acfbbe938a5630547e83ad934ab597b64 v1.0.8
22 dd0d589acfbbe938a5630547e83ad934ab597b64 v1.0.8
23 f2499237b5bf0ac2d73a8e437b8f15fa8a6306a5 v1.0.9
23 f2499237b5bf0ac2d73a8e437b8f15fa8a6306a5 v1.0.9
24 d4f0cdae9577caa605d9317d0ceb93f7b67895f3 v1.0.10
24 d4f0cdae9577caa605d9317d0ceb93f7b67895f3 v1.0.10
25 1a0018655d1c3dafc16ae50971cb535b8555e246 v1.1.0
25 1a0018655d1c3dafc16ae50971cb535b8555e246 v1.1.0
26 ff3695b0a48f00155ad6c3d7e000fe185ee6f95a v1.1.1
26 ff3695b0a48f00155ad6c3d7e000fe185ee6f95a v1.1.1
27 c276b9b7fa833242d1bf75eeeaae9924feee51e8 v1.1.2
27 c276b9b7fa833242d1bf75eeeaae9924feee51e8 v1.1.2
28 fdde09e66c009f4950ec69a393a8e789725da758 v1.2.0
28 fdde09e66c009f4950ec69a393a8e789725da758 v1.2.0
29 16678c6055f20ce1a49bdca5e6dabce88ee8deed v1.2.1
29 16678c6055f20ce1a49bdca5e6dabce88ee8deed v1.2.1
30 bc7556143fe536e3df374bc0070146311884284e v1.2.2
30 bc7556143fe536e3df374bc0070146311884284e v1.2.2
31 e5bb5e80ce96fc4ca0fbbf0b5390443a616e99fa v1.2.3
31 e5bb5e80ce96fc4ca0fbbf0b5390443a616e99fa v1.2.3
32 cc5be30e84f8a0d9a8e25447651576d9e46ab154 v1.2.4
32 cc5be30e84f8a0d9a8e25447651576d9e46ab154 v1.2.4
33 2807ab11174c7446830d91d5f1b652a18c6ecae5 v1.2.5
33 2807ab11174c7446830d91d5f1b652a18c6ecae5 v1.2.5
34 35a7b6319ebe24973fe10b8d82c19d3d86857b4e v1.2.6
34 35a7b6319ebe24973fe10b8d82c19d3d86857b4e v1.2.6
35 70058deb750dc18fcd9be83c28cb8371530fffd8 v1.2.7
35 70058deb750dc18fcd9be83c28cb8371530fffd8 v1.2.7
36 367f8caa5bf8fd2d4a56a6cff40be31ab82a2727 v1.2.8
36 367f8caa5bf8fd2d4a56a6cff40be31ab82a2727 v1.2.8
37 15c829aa08a2cf65cc074640199e79c5e49d2c93 v1.3.0
37 15c829aa08a2cf65cc074640199e79c5e49d2c93 v1.3.0
38 1a190b3a757dc650c2c612073cbffaaa720b6090 v1.4.0
38 1a190b3a757dc650c2c612073cbffaaa720b6090 v1.4.0
39 2ccfaae984e9a458580664923c87f258cff9890f v1.4.4
39 2ccfaae984e9a458580664923c87f258cff9890f v1.4.4
40 e9a9ed6d7647848e9c7f28e6e9f92d009259607d v1.5.0
40 e9a9ed6d7647848e9c7f28e6e9f92d009259607d v1.5.0
41 aac297dda27dac1e86bc93617ba172a2c92415f5 v1.6.0
41 aac297dda27dac1e86bc93617ba172a2c92415f5 v1.6.0
42 e07418577cbcaa6e31ccbf87c4495324ecbd88bd v1.6.1
42 e07418577cbcaa6e31ccbf87c4495324ecbd88bd v1.6.1
43 bc1b4dd8ca1a03af095ea1a61169c6025ef37fce v1.6.2
43 bc1b4dd8ca1a03af095ea1a61169c6025ef37fce v1.6.2
44 fb2ea4d6aabac2b374f3b2bfddaca6a83dedf34f v1.6.3
44 fb2ea4d6aabac2b374f3b2bfddaca6a83dedf34f v1.6.3
45 cede47727a1b0867d19be1d168aceb00b5e74e3f v1.7.0
45 cede47727a1b0867d19be1d168aceb00b5e74e3f v1.7.0
46 8095aad89415099d8e1942a8e67050c22e111d8b v1.7.1
46 8095aad89415099d8e1942a8e67050c22e111d8b v1.7.1
47 66546e70973248de01601aa364eadf11dd2f3a9f v1.8.0
47 66546e70973248de01601aa364eadf11dd2f3a9f v1.8.0
48 c7d9ad82b374c2a20710eba8fa40a9b1472ddc77 v1.8.1
48 c7d9ad82b374c2a20710eba8fa40a9b1472ddc77 v1.8.1
49 f139e2153e0da5649558b1ed9369da860d442c0d v1.9.0-rc1
49 f139e2153e0da5649558b1ed9369da860d442c0d v1.9.0-rc1
50 435ce00ba2452d565f1ed3bb884643b4b88b9cde v1.9.0-rc2
50 435ce00ba2452d565f1ed3bb884643b4b88b9cde v1.9.0-rc2
51 98b2c550c676ff6acf16f5bd56ee3804342f80b7 v1.9.0-rc3
51 98b2c550c676ff6acf16f5bd56ee3804342f80b7 v1.9.0-rc3
52 515d1b83ebdfaa93599451175055a94f711c079f v1.9.0-rc4
52 515d1b83ebdfaa93599451175055a94f711c079f v1.9.0-rc4
53 894b8239b953c0384cf32cab46cf41dac97ea03b v1.9.0-rc5
53 894b8239b953c0384cf32cab46cf41dac97ea03b v1.9.0-rc5
54 63215d91ae4b711eeb2145b9685240e1afada904 v1.9.0-rc6
54 af4f8424e83d56e89a64f39e19514ca10dbd43c6 v1.9.0
55 af4f8424e83d56e89a64f39e19514ca10dbd43c6 v1.9.0
@@ -1,411 +1,429
1 import { id as mid} from "module";
1 import { id as mid} from "module";
2 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { Cancellation } from "@implab/core-amd/Cancellation";
3 import { ICancellation } from "@implab/core-amd/interfaces";
3 import { ICancellation } from "@implab/core-amd/interfaces";
4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
5 import { isPromise } from "@implab/core-amd/safe";
5 import { isPromise } from "@implab/core-amd/safe";
6
6
7 const trace = TraceSource.get(mid);
7 const trace = TraceSource.get(mid);
8
8
9 /**
9 /**
10 * The interface for the consumer of an observable sequence
10 * The interface for the consumer of an observable sequence
11 */
11 */
12 export interface Observer<T> {
12 export interface Observer<T> {
13 /**
13 /**
14 * Called for the next element in the sequence
14 * Called for the next element in the sequence
15 */
15 */
16 next?(value: T): void;
16 next?(value: T): void;
17
17
18 /**
18 /**
19 * Called once when the error occurs in the sequence.
19 * Called once when the error occurs in the sequence.
20 */
20 */
21 error?(e: unknown): void;
21 error?(e: unknown): void;
22
22
23 /**
23 /**
24 * Called once at the end of the sequence.
24 * Called once at the end of the sequence.
25 */
25 */
26 complete?(): void;
26 complete?(): void;
27 }
27 }
28
28
29 /**
29 /**
30 * The group of functions to feed an observable. These methods are provided to
30 * The group of functions to feed an observable. These methods are provided to
31 * the producer to generate a stream of events.
31 * the producer to generate a stream of events.
32 */
32 */
33 export type Sink<T> = {
33 export type Sink<T> = {
34 /**
34 /**
35 * Call to send the next element in the sequence
35 * Call to send the next element in the sequence
36 */
36 */
37 next: (value: T) => void;
37 next: (value: T) => void;
38
38
39 /**
39 /**
40 * Call to notify about the error occurred in the sequence.
40 * Call to notify about the error occurred in the sequence.
41 */
41 */
42 error: (e: unknown) => void;
42 error: (e: unknown) => void;
43
43
44 /**
44 /**
45 * Call to signal the end of the sequence.
45 * Call to signal the end of the sequence.
46 */
46 */
47 complete: () => void;
47 complete: () => void;
48
48
49 /**
49 /**
50 * Checks whether the sink is accepting new elements. It's safe to
50 * Checks whether the sink is accepting new elements. It's safe to
51 * send elements to the closed sink.
51 * send elements to the closed sink.
52 */
52 */
53 isClosed: () => boolean;
53 isClosed: () => boolean;
54 };
54 };
55
55
56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
57
57
58 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
58 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
59
59
60 type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
60 type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
61
61
62 export interface Unsubscribable {
62 export interface Unsubscribable {
63 unsubscribe(): void;
63 unsubscribe(): void;
64 }
64 }
65
65
66 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
66 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
67 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
67 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
68
68
69 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
69 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
71
71
72 export interface Subscribable<T> {
72 export interface Subscribable<T> {
73 subscribe(consumer: Observer<T>): Unsubscribable;
73 /** Subscribes a consumer to events. If a consumer isn't specified
74 * this method activates the producer to achieve side affects if any.
75 */
76 subscribe(consumer?: Observer<T>): Unsubscribable;
74 }
77 }
75
78
76 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
79 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
77
80
78 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
81 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
79
82
80 /** The observable source of items. */
83 /** The observable source of items. */
81 export interface Observable<T> extends Subscribable<T> {
84 export interface Observable<T> extends Subscribable<T> {
82 /** Transforms elements of the sequence with the specified mapper
85 /** Transforms elements of the sequence with the specified mapper
83 *
86 *
84 * @param mapper The mapper used to transform the values
87 * @param mapper The mapper used to transform the values
85 */
88 */
86 map<T2>(mapper: (value: T) => T2): Observable<T2>;
89 map<T2>(mapper: (value: T) => T2): Observable<T2>;
87
90
91 /** Injects the specified observer into the each producer to consumer chain.
92 * The method is used to add side effect to the events processing.
93 *
94 * @param observer The consumer for the events
95 */
96 tap(observer: Observer<T>): Observable<T>;
97
88 /** Filters elements of the sequence. The resulting sequence will
98 /** Filters elements of the sequence. The resulting sequence will
89 * contain only elements which match the specified predicate.
99 * contain only elements which match the specified predicate.
90 *
100 *
91 * @param predicate The filter predicate.
101 * @param predicate The filter predicate.
92 */
102 */
93 filter(predicate: (value: T) => boolean): Observable<T>;
103 filter(predicate: (value: T) => boolean): Observable<T>;
94
104
95 /** Completes the sequence once the condition is met.
105 /** Completes the sequence once the condition is met.
96 * @param predicate The condition which should be met to complete the sequence
106 * @param predicate The condition which should be met to complete the sequence
97 */
107 */
98 until(predicate: (value: T) => boolean): Observable<T>;
108 until(predicate: (value: T) => boolean): Observable<T>;
99
109
100 /** Keeps the sequence running while elements satisfy the condition.
110 /** Keeps the sequence running while elements satisfy the condition.
101 *
111 *
102 * @param predicate The condition which should be met to continue.
112 * @param predicate The condition which should be met to continue.
103 */
113 */
104 while(predicate: (value: T) => boolean): Observable<T>;
114 while(predicate: (value: T) => boolean): Observable<T>;
105
115
106 /** Applies accumulator to each value in the sequence and
116 /** Applies accumulator to each value in the sequence and
107 * emits the accumulated value for each source element
117 * emits the accumulated value for each source element
108 *
118 *
109 * @param accumulator
119 * @param accumulator
110 * @param initial
120 * @param initial
111 */
121 */
112 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
122 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
113 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
123 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
114
124
115 /** Applies accumulator to each value in the sequence and
125 /** Applies accumulator to each value in the sequence and
116 * emits the accumulated value at the end of the sequence
126 * emits the accumulated value at the end of the sequence
117 *
127 *
118 * @param accumulator
128 * @param accumulator
119 * @param initial
129 * @param initial
120 */
130 */
121 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
131 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
122 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
132 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
123
133
124 /** Concatenates the specified sequences with this observable
134 /** Concatenates the specified sequences with this observable
125 *
135 *
126 * @param seq sequences to concatenate with the current observable
136 * @param seq sequences to concatenate with the current observable
127 *
137 *
128 * The concatenation doesn't accumulate values from the specified sequences,
138 * The concatenation doesn't accumulate values from the specified sequences,
129 * The result of the concatenation is the new observable which will switch
139 * The result of the concatenation is the new observable which will switch
130 * to the next observable after the previous one completes. Values emitted
140 * to the next observable after the previous one completes. Values emitted
131 * before the next observable being active are lost.
141 * before the next observable being active are lost.
132 */
142 */
133 cat(...seq: Subscribable<T>[]): Observable<T>;
143 cat(...seq: Subscribable<T>[]): Observable<T>;
134
144
135
145
136 /** Pipes the specified operator to produce the new observable
146 /** Pipes the specified operator to produce the new observable
137 * @param op The operator consumes this observable and produces a new one
147 * @param op The operator consumes this observable and produces a new one
138 *
148 *
139 * The operator is a higher order function which takes a source observable
149 * The operator is a higher order function which takes a source observable
140 * and returns a producer for the new observable.
150 * and returns a producer for the new observable.
141 *
151 *
142 * This function can be used to create a complex mapping between source and
152 * This function can be used to create a complex mapping between source and
143 * resulting observables. The operator may have a state (or a side effect)
153 * resulting observables. The operator may have a state (or a side effect)
144 * and can be connected to multiple observables.
154 * and can be connected to multiple observables.
145 */
155 */
146 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
156 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
147
157
148 /** Waits for the next event to occur and returns a promise for the next value
158 /** Waits for the next event to occur and returns a promise for the next value
149 * @param ct Cancellation token
159 * @param ct Cancellation token
150 */
160 */
151 next(ct?: ICancellation): Promise<T>;
161 next(ct?: ICancellation): Promise<T>;
152
162
153 /** Collects items of the sequence to the array. */
163 /** Collects items of the sequence to the array. */
154 collect(ct?: ICancellation): Promise<T[]>;
164 collect(ct?: ICancellation): Promise<T[]>;
155 }
165 }
156
166
157 const noop = () => { };
167 const noop = () => { };
158
168
159 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
169 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
160
170
161 const sink = <T>(consumer: Observer<T>) => {
171 const sink = <T>(consumer: Observer<T>) => {
162 // eslint-disable-next-line @typescript-eslint/unbound-method
172 // eslint-disable-next-line @typescript-eslint/unbound-method
163 const { next, error, complete } = consumer;
173 const { next, error, complete } = consumer;
164 return {
174 return {
165 next: next ? next.bind(consumer) : noop,
175 next: next ? next.bind(consumer) : noop,
166 error: error ? error.bind(consumer) : errorFallback, // report unhandled errors
176 error: error ? error.bind(consumer) : errorFallback, // report unhandled errors
167 complete: complete ? complete.bind(consumer) : noop
177 complete: complete ? complete.bind(consumer) : noop
168 };
178 };
169 };
179 };
170
180
171 /** Wraps the producer to handle tear down logic and subscription management
181 /** Wraps the producer to handle tear down logic and subscription management
172 *
182 *
173 * The resulting producer will invoke cleanup logic on error or complete events
183 * The resulting producer will invoke cleanup logic on error or complete events
174 * and will prevent calling of any method from the sink.
184 * and will prevent calling of any method from the sink.
175 *
185 *
176 * @param producer The producer to wrap
186 * @param producer The producer to wrap
177 * @returns The wrapper producer
187 * @returns The wrapper producer
178 */
188 */
179 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
189 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
180 let done = false;
190 let done = false;
181 let cleanup = noop;
191 let cleanup = noop;
182
192
183 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
193 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
184 (...args: A) => done ?
194 (...args: A) => done ?
185 void (0) :
195 void (0) :
186 (done = true, cleanup(), fn(...args));
196 (done = true, cleanup(), fn(...args));
187
197
188 const _fin0 = () => done ? void (0) : (done = true, cleanup());
198 const _fin0 = () => done ? void (0) : (done = true, cleanup());
189
199
190 const safeSink = {
200 const safeSink = {
191 next: (value: T) => { !done && next(value); },
201 next: (value: T) => { !done && next(value); },
192 error: _fin(error),
202 error: _fin(error),
193 complete: _fin(complete),
203 complete: _fin(complete),
194 isClosed: () => done
204 isClosed: () => done
195 };
205 };
196 // call the producer
206 // call the producer
197 cleanup = producer(safeSink) ?? noop;
207 cleanup = producer(safeSink) ?? noop;
198 // if the producer throws exception bypass it to the caller rather then to
208 // if the producer throws exception bypass it to the caller rather then to
199 // the sink. This is a feature.
209 // the sink. This is a feature.
200
210
201 // if the producer completed the sequence immediately call the cleanup in place
211 // if the producer completed the sequence immediately call the cleanup in place
202 return done ? cleanup() : _fin0;
212 return done ? cleanup() : _fin0;
203 };
213 };
204
214
205 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
215 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
206 subscribe: (consumer: Observer<T>) => ({
216 subscribe: (consumer: Observer<T> = {}) => ({
207 unsubscribe: producer(sink(consumer)) ?? noop
217 unsubscribe: producer(sink(consumer)) ?? noop
208 }),
218 }),
209
219
210 map: (mapper) => _observe(({ next, ...rest }) =>
220 map: (mapper) => _observe(({ next, ...rest }) =>
211 producer({
221 producer({
212 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
222 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
213 ...rest
223 ...rest
214 })
224 })
215 ),
225 ),
216
226
227 tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) =>
228 producer({
229 next: tapNext ? (v => (tapNext(v), next(v))) : next,
230 complete: tapComplete ? (() => (tapComplete(), complete())): complete,
231 error: tapError ? (e => (tapError(e), error(e))) : error
232 })
233 ),
234
217 filter: (predicate) => _observe(({ next, ...rest }) =>
235 filter: (predicate) => _observe(({ next, ...rest }) =>
218 producer({
236 producer({
219 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
237 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
220 ...rest
238 ...rest
221 })
239 })
222 ),
240 ),
223
241
224 until: predicate => _observe(({ next, complete, ...rest }) =>
242 until: predicate => _observe(({ next, complete, ...rest }) =>
225 producer({
243 producer({
226 next: v => predicate(v) ? complete() : next(v),
244 next: v => predicate(v) ? complete() : next(v),
227 complete,
245 complete,
228 ...rest
246 ...rest
229 })
247 })
230 ),
248 ),
231
249
232 while: predicate => _observe(({ next, complete, ...rest }) =>
250 while: predicate => _observe(({ next, complete, ...rest }) =>
233 producer({
251 producer({
234 next: v => predicate(v) ? next(v) : complete(),
252 next: v => predicate(v) ? next(v) : complete(),
235 complete,
253 complete,
236 ...rest
254 ...rest
237 })
255 })
238 ),
256 ),
239
257
240 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
258 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
241 if (args.length === 1) {
259 if (args.length === 1) {
242 const [accumulator] = args;
260 const [accumulator] = args;
243 let _acc: T;
261 let _acc: T;
244 let index = 0;
262 let index = 0;
245 return producer({
263 return producer({
246 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
264 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
247 ...rest
265 ...rest
248 });
266 });
249 } else {
267 } else {
250 const [accumulator, initial] = args;
268 const [accumulator, initial] = args;
251 let _acc = initial;
269 let _acc = initial;
252 return producer({
270 return producer({
253 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
271 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
254 ...rest
272 ...rest
255 });
273 });
256 }
274 }
257 }),
275 }),
258
276
259 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error }) => {
277 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error }) => {
260 if (args.length === 1) {
278 if (args.length === 1) {
261 const [accumulator] = args;
279 const [accumulator] = args;
262 let _acc: T;
280 let _acc: T;
263 let index = 0;
281 let index = 0;
264 return producer({
282 return producer({
265 next: next !== noop ? (v: T) => {
283 next: next !== noop ? (v: T) => {
266 _acc = index++ === 0 ? v : accumulator(_acc, v);
284 _acc = index++ === 0 ? v : accumulator(_acc, v);
267 } : noop,
285 } : noop,
268 complete: () => {
286 complete: () => {
269 if (index === 0) {
287 if (index === 0) {
270 error(new Error("The sequence can't be empty"));
288 error(new Error("The sequence can't be empty"));
271 } else {
289 } else {
272 next(_acc);
290 next(_acc);
273 complete();
291 complete();
274 }
292 }
275 },
293 },
276 error
294 error
277 });
295 });
278 } else {
296 } else {
279 const [accumulator, initial] = args;
297 const [accumulator, initial] = args;
280 let _acc = initial;
298 let _acc = initial;
281 return producer({
299 return producer({
282 next: next !== noop ? (v: T) => {
300 next: next !== noop ? (v: T) => {
283 _acc = accumulator(_acc, v);
301 _acc = accumulator(_acc, v);
284 } : noop,
302 } : noop,
285 complete: () => {
303 complete: () => {
286 next(_acc);
304 next(_acc);
287 complete();
305 complete();
288 },
306 },
289 error
307 error
290 });
308 });
291 }
309 }
292 }),
310 }),
293
311
294 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
312 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
295 let cleanup: () => void;
313 let cleanup: () => void;
296 const len = seq.length;
314 const len = seq.length;
297 const complete = (i: number) => i < len ?
315 const complete = (i: number) => i < len ?
298 () => {
316 () => {
299 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
317 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
300 cleanup = subscription.unsubscribe.bind(subscription);
318 cleanup = subscription.unsubscribe.bind(subscription);
301 } : final;
319 } : final;
302
320
303 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
321 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
304
322
305 return () => cleanup();
323 return () => cleanup();
306 }),
324 }),
307
325
308 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
326 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
309
327
310 next: collect(
328 next: collect(
311 producer,
329 producer,
312 ({ next, complete, error }) => ({
330 ({ next, complete, error }) => ({
313 next: v => (next(v), complete()),
331 next: v => (next(v), complete()),
314 complete: () => error(new Error("The sequence is empty")),
332 complete: () => error(new Error("The sequence is empty")),
315 error
333 error
316 })
334 })
317 ),
335 ),
318
336
319 collect: collect(
337 collect: collect(
320 producer,
338 producer,
321 ({ next, complete, error}) => {
339 ({ next, complete, error}) => {
322 const data: T[] = [];
340 const data: T[] = [];
323 return {
341 return {
324 next: v => data.push(v),
342 next: v => data.push(v),
325 complete: () => (next(data), complete()),
343 complete: () => (next(data), complete()),
326 error
344 error
327 };
345 };
328 }
346 }
329 )
347 )
330 });
348 });
331
349
332 const collect = <T, U>(
350 const collect = <T, U>(
333 producer: FusedProducer<T>,
351 producer: FusedProducer<T>,
334 collector: (result: FusedSink<U>) => FusedSink<T>
352 collector: (result: FusedSink<U>) => FusedSink<T>
335 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
353 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
336 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
354 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
337 const h = ct.register(error);
355 const h = ct.register(error);
338 const cleanup = !isClosed() ?
356 const cleanup = !isClosed() ?
339 producer(collector({ next, complete, error })) ?? noop :
357 producer(collector({ next, complete, error })) ?? noop :
340 noop;
358 noop;
341
359
342 return () => {
360 return () => {
343 h.destroy();
361 h.destroy();
344 cleanup();
362 cleanup();
345 };
363 };
346 });
364 });
347
365
348 fused({
366 fused({
349 next: resolve,
367 next: resolve,
350 error: reject,
368 error: reject,
351 complete: noop
369 complete: noop
352 });
370 });
353 });
371 });
354
372
355 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
373 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
356
374
357 /** Converts an array to the observable sequence of its elements. */
375 /** Converts an array to the observable sequence of its elements. */
358 export const ofArray = <T>(items: T[]) => _observe<T>(
376 export const ofArray = <T>(items: T[]) => _observe<T>(
359 ({ next, complete }) => (
377 ({ next, complete }) => (
360 items.forEach(next),
378 items.forEach(next),
361 complete()
379 complete()
362 )
380 )
363 );
381 );
364
382
365 /** Converts a subscribable to the observable */
383 /** Converts a subscribable to the observable */
366 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
384 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
367 observe<T>(sink => {
385 observe<T>(sink => {
368 const subscription = subscribable.subscribe(sink);
386 const subscription = subscribable.subscribe(sink);
369 return () => subscription.unsubscribe();
387 return () => subscription.unsubscribe();
370 });
388 });
371
389
372 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
390 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
373 ({ next, error, complete }) =>
391 ({ next, error, complete }) =>
374 isPromise(item) ?
392 isPromise(item) ?
375 void item.then(
393 void item.then(
376 v => (next(v), complete()),
394 v => (next(v), complete()),
377 error
395 error
378 ) :
396 ) :
379 (next(item), complete())
397 (next(item), complete())
380 );
398 );
381
399
382 /** Converts a list of parameter values to the observable sequence. The
400 /** Converts a list of parameter values to the observable sequence. The
383 * order of elements in the list will be preserved in the resulting sequence.
401 * order of elements in the list will be preserved in the resulting sequence.
384 */
402 */
385 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
403 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
386 of1(items[0]) :
404 of1(items[0]) :
387 observe<T>(
405 observe<T>(
388 ({ next, error, complete, isClosed }) => {
406 ({ next, error, complete, isClosed }) => {
389 const n = items.length;
407 const n = items.length;
390
408
391 const _next = (start: number) => {
409 const _next = (start: number) => {
392 if (start > 0 && isClosed()) // when resumed
410 if (start > 0 && isClosed()) // when resumed
393 return;
411 return;
394
412
395 for (let i = start; i < n; i++) {
413 for (let i = start; i < n; i++) {
396 const r = items[i];
414 const r = items[i];
397 if (isPromise(r)) {
415 if (isPromise(r)) {
398 r.then(v => (next(v), _next(i + 1)), error);
416 r.then(v => (next(v), _next(i + 1)), error);
399 return; // suspend
417 return; // suspend
400 } else {
418 } else {
401 next(r);
419 next(r);
402 }
420 }
403 }
421 }
404 complete();
422 complete();
405 };
423 };
406
424
407 _next(0);
425 _next(0);
408 }
426 }
409 );
427 );
410
428
411 export const empty = _observe<never>(({ complete }) => complete());
429 export const empty = _observe<never>(({ complete }) => complete());
@@ -1,125 +1,152
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
3 import { observe, Observable, empty } from "./observable";
3 import { observe, Observable, empty } from "./observable";
4 import { after } from "dojo/aspect";
4 import { after } from "dojo/aspect";
5
5
6 export interface OrderedUpdate<T> {
6 export interface OrderedUpdate<T> {
7 /** The item is being updated */
7 /** The item is being updated */
8 readonly item: T;
8 readonly item: T;
9
9
10 /** The previous index of the item, -1 in case it is inserted */
10 /** The previous index of the item, -1 in case it is inserted */
11 readonly prevIndex: number;
11 readonly prevIndex: number;
12
12
13 /** The new index of the item, -1 in case it is deleted */
13 /** The new index of the item, -1 in case it is deleted */
14 readonly newIndex: number;
14 readonly newIndex: number;
15
15
16 }
16 }
17
17
18 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
18 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
19
19
20 interface DjObservableResults<T> {
20 interface DjObservableResults<T> {
21 /**
21 /**
22 * Allows observation of results
22 * Allows observation of results
23 */
23 */
24 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
24 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
25 remove(): void;
25 remove(): void;
26 };
26 };
27 }
27 }
28
28
29 interface Queryable<T, Q, O> {
29 interface Queryable<T, Q, O> {
30 query(query?: Q, options?: O): PromiseOrValue<T[]>;
30 query(query?: Q, options?: O): PromiseOrValue<T[]>;
31 }
31 }
32
32
33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
34 v && (typeof (v as { observe?: unknown; }).observe === "function");
34 v && (typeof (v as { observe?: unknown; }).observe === "function");
35
35
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => {
37 (query?: Q, options?: O & { observe?: boolean }) => {
37 const q = queryEx(store, includeUpdates);
38 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
38 return (query?: Q, options?: O & { observe?: boolean }) => {
39 const [data, updates] = q(query, options);
40
41 return options?.observe === false ? data : data.cat(updates);
42 };
43 };
39
44
45 export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
46 (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => {
47
48 const pending: T[] = [];
49
50 let results: PromiseOrValue<T[]> = pending;
51
52 const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => {
40 const processResults = (items: T[]) =>
53 const processResults = (items: T[]) =>
41 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
54 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
42
55
43 try {
56 try {
44 const results = store.query(query, options);
57 if (results === pending)
58 results = store.query(query, options);
59
45 if (isPromise(results)) {
60 if (isPromise(results)) {
46 results.then(processResults).then(undefined, error);
61 results.then(processResults).then(complete, error);
62
63 if (isCancellable(results))
64 return results.cancel.bind(results);
47 } else {
65 } else {
48 processResults(results);
66 processResults(results);
67 complete();
49 }
68 }
69 } catch (e) {
70 error(e);
71 }
72 });
50
73
51 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
74 const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
75 try {
76 if (!isClosed() && isDjObservableResults<T>(results)) {
52 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
77 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
53 return () => h.remove();
78 return () => h.remove();
54 } else {
79 } else {
55 complete();
80 complete();
56 }
81 }
57 } catch (err) {
82 } catch (e) {
58 error(err);
83 error(e);
59 }
84 }
60 });
85 });
61
86
87 return [data, updates];
62 };
88 };
63
89
90
64 interface IndexedStore<T> {
91 interface IndexedStore<T> {
65 get(id: string | number): PromiseLike<T> | T | null | undefined;
92 get(id: string | number): PromiseLike<T> | T | null | undefined;
66 }
93 }
67
94
68 interface Notifications<T> {
95 interface Notifications<T> {
69 notify(item: T | undefined, id: string | number | undefined): void;
96 notify(item: T | undefined, id: string | number | undefined): void;
70 }
97 }
71
98
72 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
99 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
73 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
100 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
74
101
75 interface GetOpts {
102 interface GetOpts {
76 observe?: boolean;
103 observe?: boolean;
77 }
104 }
78
105
79 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
106 export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] |
80 [item: undefined | null, id: string | number];
107 [item: undefined | null, id: string | number];
81
108
82 const filterItem = (itemId: string | number) =>
109 const filterItem = (itemId: string | number) =>
83 <T>(source: Observable<Change<T>>) =>
110 <T>(source: Observable<ItemUpdate<T>>) =>
84 observe<T>(({ next, complete, error }) => {
111 observe<T>(({ next, complete, error }) => {
85 const subscription = source
112 const subscription = source
86 .filter(([, id]) => id === itemId)
113 .filter(([, id]) => id === itemId)
87 .subscribe({
114 .subscribe({
88 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
115 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
89 complete,
116 complete,
90 error
117 error
91 });
118 });
92 return () => subscription.unsubscribe();
119 return () => subscription.unsubscribe();
93 });
120 });
94
121
95 export const get = <T>(store: IndexedStore<T>) => {
122 export const get = <T>(store: IndexedStore<T>) => {
96 const changes = hasNotifications<T>(store) ?
123 const changes = hasNotifications<T>(store) ?
97 observe<Change<T>>(({ next }) => {
124 observe<ItemUpdate<T>>(({ next }) => {
98 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
125 const handle = after(store, "notify", (...args: ItemUpdate<T>) => next(args), true);
99 return () => handle.remove();
126 return () => handle.remove();
100 }) : empty;
127 }) : empty;
101
128
102 return (id: string | number, opts: GetOpts = {}) =>
129 return (id: string | number, opts: GetOpts = {}) =>
103 observe<T>(({ next, complete, error }) => {
130 observe<T>(({ next, complete, error }) => {
104 try {
131 try {
105 const result = store.get(id);
132 const result = store.get(id);
106
133
107 const handle = (x: T | null | undefined) => {
134 const handle = (x: T | null | undefined) => {
108 if (x !== null && x !== undefined)
135 if (x !== null && x !== undefined)
109 next(x);
136 next(x);
110 complete();
137 complete();
111 };
138 };
112
139
113 if (isPromise(result)) {
140 if (isPromise(result)) {
114 result.then(handle).then(undefined, error);
141 result.then(handle).then(undefined, error);
115
142
116 if (isCancellable(result))
143 if (isCancellable(result))
117 return () => result.cancel();
144 return () => result.cancel();
118 } else {
145 } else {
119 handle(result);
146 handle(result);
120 }
147 }
121 } catch (e) {
148 } catch (e) {
122 error(e);
149 error(e);
123 }
150 }
124 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
151 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
125 }; No newline at end of file
152 };
@@ -1,65 +1,89
1 import Memory = require("dojo/store/Memory");
1 import Memory = require("dojo/store/Memory");
2 import Observerable = require("dojo/store/Observable");
2 import Observerable = require("dojo/store/Observable");
3 import { get } from "./store";
3 import { get, queryEx } from "./store";
4 import tap = require("tap");
4 import tap = require("tap");
5
5
6 interface Person {
6 interface Person {
7 id: string;
7 id: string;
8
8
9 name: string;
9 name: string;
10
10
11 age: number;
11 age: number;
12 }
12 }
13
13
14 tap.test("store::get(...) tests", async t => {
14 tap.test("store::get(...) tests", async t => {
15 const store = new Observerable(new Memory<Person>());
15 const store = new Observerable(new Memory<Person>());
16
16
17 const getPerson = get(store);
17 const getPerson = get(store);
18
18
19 const peterId = "id:peter";
19 const peterId = "id:peter";
20
20
21 const samId = "id:sam";
21 const samId = "id:sam";
22
22
23 const peter = getPerson(peterId);
23 const peter = getPerson(peterId);
24 const sam = getPerson(samId);
24 const sam = getPerson(samId);
25
25
26 const seq1 = await getPerson(peterId, { observe: false }).collect();
26 const seq1 = await getPerson(peterId, { observe: false }).collect();
27
27
28 t.ok(seq1.length === 0, "Should be empty sequence");
28 t.ok(seq1.length === 0, "Should be empty sequence");
29
29
30 let peterChangeCount = 0;
30 let peterChangeCount = 0;
31 let samChangeCount = 0;
31 let samChangeCount = 0;
32 let peterDeleted = 0;
32 let peterDeleted = 0;
33
33
34 const peterSubscription = peter.subscribe({
34 const peterSubscription = peter.subscribe({
35 next: () => peterChangeCount++,
35 next: () => peterChangeCount++,
36 complete: () => peterDeleted++
36 complete: () => peterDeleted++
37 });
37 });
38 const samSubscription = sam.subscribe({
38 const samSubscription = sam.subscribe({
39 next: () => samChangeCount++
39 next: () => samChangeCount++
40 });
40 });
41
41
42 try {
42 try {
43 t.equal(peterChangeCount, 0, "Should be no changes recorded");
43 t.equal(peterChangeCount, 0, "Should be no changes recorded");
44
44
45 store.put({id: peterId, name: "Peter", age: 30 });
45 store.put({id: peterId, name: "Peter", age: 30 });
46
46
47 t.equal(peterChangeCount, 1, "Should record 1 object change");
47 t.equal(peterChangeCount, 1, "Should record 1 object change");
48 t.equal(samChangeCount, 0, "Should not record other object changes");
48 t.equal(samChangeCount, 0, "Should not record other object changes");
49
49
50 store.remove(peterId);
50 store.remove(peterId);
51
51
52 t.equal(peterDeleted, 1, "Should complete sequence");
52 t.equal(peterDeleted, 1, "Should complete sequence");
53 t.equal(peterChangeCount, 1, "Should not record remove operations");
53 t.equal(peterChangeCount, 1, "Should not record remove operations");
54
54
55 store.put({id: peterId, name: "Peter", age: 29});
55 store.put({id: peterId, name: "Peter", age: 29});
56
56
57 t.equal(peterChangeCount, 1, "Should not record changes after completion");
57 t.equal(peterChangeCount, 1, "Should not record changes after completion");
58
58
59 } finally {
59 } finally {
60 peterSubscription.unsubscribe();
60 peterSubscription.unsubscribe();
61 samSubscription.unsubscribe();
61 samSubscription.unsubscribe();
62 }
62 }
63
63
64 store.put({ id: samId, name: "Sam", age: 29});
65
66 const [data, updates] = queryEx(store)({ age: 29}, { sort: [{attribute: "id"}] });
67
68 const dump: string[] = [];
69
70 const subscription = data
71 .tap({
72 complete: () => dump.push("eof")
73 })
74 .cat(updates)
75 .tap({
76 next: ({item: {id}}) => dump.push(id),
77 complete: () => dump.push("eof")
78 })
79 .subscribe({});
80
81 t.same(dump, ["id:peter", "id:sam", "eof"]);
82
83 store.put({ id: "id:mary", name: "Mary", age: 29});
84
85 t.same(dump, ["id:peter", "id:sam", "eof", "id:mary"]);
86
87 subscription.unsubscribe();
64
88
65 }).catch(() => { }); No newline at end of file
89 }).catch(() => { });
@@ -1,150 +1,173
1 import { empty, observe, of } from "./observable";
1 import { empty, observe, of } from "./observable";
2 import * as tap from "tap";
2 import * as tap from "tap";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 import { delay } from "@implab/core-amd/safe";
4 import { delay } from "@implab/core-amd/safe";
5
5
6 const subj1 = observe<number>(({ next, complete }) => {
6 const subj1 = observe<number>(({ next, complete }) => {
7 next(1);
7 next(1);
8 complete();
8 complete();
9 next(2);
9 next(2);
10 });
10 });
11
11
12 const consumer1 = {
12 const consumer1 = {
13 sum: 0,
13 sum: 0,
14 next(v: number) {
14 next(v: number) {
15 this.sum += v;
15 this.sum += v;
16 }
16 }
17 };
17 };
18
18
19 subj1.subscribe(consumer1);
19 subj1.subscribe(consumer1);
20 tap.equal(consumer1.sum, 1, "Should get only one value");
20 tap.equal(consumer1.sum, 1, "Should get only one value");
21
21
22 subj1.subscribe(consumer1);
22 subj1.subscribe(consumer1);
23 tap.equal(consumer1.sum, 2, "Should get the value again");
23 tap.equal(consumer1.sum, 2, "Should get the value again");
24
24
25 const consumer2 = {
25 const consumer2 = {
26 value: 0,
26 value: 0,
27 completed: false,
27 completed: false,
28 next(v: number) { this.value = v; },
28 next(v: number) { this.value = v; },
29 complete() { this.completed = true; }
29 complete() { this.completed = true; }
30 };
30 };
31
31
32 let maps = 0;
32 let maps = 0;
33
33
34 subj1
34 subj1
35 .map(v => {
35 .map(v => {
36 tap.comment(`map1: ${v * 2}`);
36 tap.comment(`map1: ${v * 2}`);
37 maps++;
37 maps++;
38 return v * 2;
38 return v * 2;
39 })
39 })
40 .map(v => {
40 .map(v => {
41 tap.comment(`map2: ${v * 2}`);
41 tap.comment(`map2: ${v * 2}`);
42 maps++;
42 maps++;
43 return v * 2;
43 return v * 2;
44 })
44 })
45 .map(v => {
45 .map(v => {
46 tap.comment(`map3: ${v * 2}`);
46 tap.comment(`map3: ${v * 2}`);
47 maps++;
47 maps++;
48 return v * 2;
48 return v * 2;
49 })
49 })
50 .subscribe(consumer2);
50 .subscribe(consumer2);
51
51
52 tap.equal(consumer2.value, 8, "Should map");
52 tap.equal(consumer2.value, 8, "Should map");
53 tap.equal(maps, 3, "The map chain should not be executed after completion");
53 tap.equal(maps, 3, "The map chain should not be executed after completion");
54 tap.ok(consumer2.completed, "The completion signal should pass through");
54 tap.ok(consumer2.completed, "The completion signal should pass through");
55
55
56 const subj2 = observe<number>(({ next, complete }) => {
56 const subj2 = observe<number>(({ next, complete }) => {
57 [1, 2, 3, 4, 5].forEach(next);
57 [1, 2, 3, 4, 5].forEach(next);
58 complete();
58 complete();
59 return () => {
59 return () => {
60 tap.comment("subj2: unsubscribe");
60 tap.comment("subj2: unsubscribe");
61 };
61 };
62 });
62 });
63
63
64 const consumer3 = {
64 const consumer3 = {
65 even: 0,
65 even: 0,
66 odd: 0,
66 odd: 0,
67 completed: false,
67 completed: false,
68 subscribed: 0,
68 subscribed: 0,
69 unsubscribed: 0,
69 unsubscribed: 0,
70 next(v: "even" | "odd") {
70 next(v: "even" | "odd") {
71 this[v]++;
71 this[v]++;
72 },
72 },
73 complete() {
73 complete() {
74 this.completed = true;
74 this.completed = true;
75 },
75 },
76 subscribe() {
76 subscribe() {
77 this.subscribed++;
77 this.subscribed++;
78 },
78 },
79 unsubscribe() {
79 unsubscribe() {
80 this.unsubscribed++;
80 this.unsubscribed++;
81 }
81 }
82 };
82 };
83
83
84
84
85 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
85 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
86 consumer3.subscribe();
86 consumer3.subscribe();
87 let count = 0;
87 let count = 0;
88 const h = self.subscribe({
88 const h = self.subscribe({
89 next: val => {
89 next: val => {
90 if (val % 2 === 0)
90 if (val % 2 === 0)
91 next("odd");
91 next("odd");
92 else
92 else
93 next("even");
93 next("even");
94 if (++count === 4)
94 if (++count === 4)
95 complete();
95 complete();
96 },
96 },
97 complete,
97 complete,
98 error
98 error
99 });
99 });
100 return () => {
100 return () => {
101 consumer3.unsubscribe();
101 consumer3.unsubscribe();
102 h.unsubscribe();
102 h.unsubscribe();
103 };
103 };
104 }));
104 }));
105
105
106 subj3.subscribe(consumer3);
106 subj3.subscribe(consumer3);
107
107
108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
110 tap.ok(consumer3.completed, "The sequence should completed");
110 tap.ok(consumer3.completed, "The sequence should completed");
111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
113
113
114 subj2.reduce((a, b) => a + b).subscribe({
114 subj2.reduce((a, b) => a + b).subscribe({
115 next: val => tap.comment("subj2: reduce =", val),
115 next: val => tap.comment("subj2: reduce =", val),
116 complete: () => tap.comment("subj2: complete")
116 complete: () => tap.comment("subj2: complete")
117 });
117 });
118
118
119 tap.test("of(...) tests", async t => {
119 tap.test("of(...) tests", async t => {
120 await subj2.reduce((a, b) => a + b).next()
120 await subj2.reduce((a, b) => a + b).next()
121 .then(value => t.comment("subj2: next reduce=", value));
121 .then(value => t.comment("subj2: next reduce=", value));
122
122
123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124
124
125 const cancelled = new Cancellation(cancel => cancel());
125 const cancelled = new Cancellation(cancel => cancel());
126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127
127
128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129
129
130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 .reduce<number[]>((a, x) => [...a, x], [])
131 .reduce<number[]>((a, x) => [...a, x], [])
132 .next()
132 .next()
133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
134
134
135 const rejected = Promise.reject("DIE!");
135 const rejected = Promise.reject("DIE!");
136 rejected.catch(() => { }); // SAFE AND SOUND
136 rejected.catch(() => { }); // SAFE AND SOUND
137
137
138 await t.resolves(
138 await t.resolves(
139 of(Promise.resolve(1), rejected).next(),
139 of(Promise.resolve(1), rejected).next(),
140 "of(...) should emit non-rejected items"
140 "of(...) should emit non-rejected items"
141 );
141 );
142 await t.rejects(
142 await t.rejects(
143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 "of(...) should terminate with error when a parameter is rejected"
144 "of(...) should terminate with error when a parameter is rejected"
145 );
145 );
146
146
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149
149
150 }).catch(() => { });
151
152 tap.test(".tap() tests", async t => {
153 const side: number[] = [];
154
155 of(1,2)
156 .tap({next: v => side.push(v), complete: () => side.push(0)})
157 .tap({next: v => side.push(v*v)})
158 .subscribe({});
159
160 t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration");
161
162 side.length = 0;
163
164 await new Promise<void>(resolve => {
165 of(1,2,delay(1).then(() => 3))
166 .tap({next: v => side.push(v)})
167 .tap({ next: v => v === 1 && resolve()})
168 .subscribe({});
169 });
170
171 t.same(side, [1,2], ".tap() should be processed synchronously");
172
150 }).catch(() => { }); No newline at end of file
173 }).catch(() => {});
General Comments 0
You need to be logged in to leave comments. Login now