##// END OF EJS Templates
WIP observables
cin -
r157:f9518367061a default
parent child
Show More
@@ -1,105 +1,69
1 1 import { isPromise } from "@implab/core-amd/safe";
2 import { Unsubscribable, Producer, FusedSink, Observable, Subscribable } from "./observable/interfaces";
2 import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces";
3 3 import { ObservableImpl } from "./observable/ObservableImpl";
4 4
5 5 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
6 6 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
7 7
8 8 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
9 9 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
10 10
11 const noop = () => { };
12
13 /** Wraps the producer to handle tear down logic and subscription management
14 *
15 * The resulting producer will invoke cleanup logic on error or complete events
16 * and will prevent calling of any method from the sink.
17 *
18 * @param producer The producer to wrap
19 * @returns The wrapper producer
20 */
21 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
22 let done = false;
23 let cleanup = noop;
24
25 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
26 (...args: A) => done ?
27 void (0) :
28 (done = true, cleanup(), fn(...args));
29
30 const _fin0 = () => done ? void (0) : (done = true, cleanup());
31
32 const safeSink = {
33 next: (value: T) => { !done && next(value); },
34 error: _fin(error),
35 complete: _fin(complete),
36 isClosed: () => done
37 };
38 // call the producer
39 cleanup = producer(safeSink) ?? noop;
40 // if the producer throws exception bypass it to the caller rather then to
41 // the sink. This is a feature.
42
43 // if the producer completed the sequence immediately call the cleanup in place
44 return done ? cleanup() : _fin0;
45 };
46
47 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer));
11 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(producer);
48 12
49 13 /** Converts an array to the observable sequence of its elements. */
50 export const ofArray = <T>(items: T[]) => new ObservableImpl<T>(
14 export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>(
51 15 ({ next, complete }) => (
52 16 items.forEach(next),
53 17 complete()
54 18 )
55 19 );
56 20
57 21 /** Converts a subscribable to the observable */
58 22 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
59 23 observe<T>(sink => {
60 24 const subscription = subscribable.subscribe(sink);
61 25 return () => subscription.unsubscribe();
62 26 });
63 27
64 28 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
65 29 ({ next, error, complete }) =>
66 30 isPromise(item) ?
67 31 void item.then(
68 32 v => (next(v), complete()),
69 33 error
70 34 ) :
71 35 (next(item), complete())
72 36 );
73 37
74 38 /** Converts a list of parameter values to the observable sequence. The
75 39 * order of elements in the list will be preserved in the resulting sequence.
76 40 */
77 41 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
78 42 of1(items[0]) :
79 43 observe<T>(
80 44 ({ next, error, complete, isClosed }) => {
81 45 const n = items.length;
82 46
83 47 const _next = (start: number) => {
84 48 if (start > 0 && isClosed()) // when resumed
85 49 return;
86 50
87 51 for (let i = start; i < n; i++) {
88 52 const r = items[i];
89 53 if (isPromise(r)) {
90 54 r.then(v => (next(v), _next(i + 1)), error);
91 55 return; // suspend
92 56 } else {
93 57 next(r);
94 58 }
95 59 }
96 60 complete();
97 61 };
98 62
99 63 _next(0);
100 64 }
101 65 );
102 66
103 export const empty = new ObservableImpl<never>(({ complete }) => complete());
67 export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete());
104 68
105 69 export type * from "./observable/interfaces"; No newline at end of file
@@ -1,226 +1,264
1 1 import { id as mid } from "module";
2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Subscribable } from "./interfaces";
2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces";
3 3 import { TraceSource } from "@implab/core-amd/log/TraceSource";
4 4 import { Predicate } from "@implab/core-amd/interfaces";
5 5 import { Cancellation } from "@implab/core-amd/Cancellation";
6 6
7 7 const trace = TraceSource.get(mid);
8 8
9 9 const noop = () => { };
10 10
11 11 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
12 12
13 const subscription = () => {
13 const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => {
14 14 let done = false;
15 15 let cleanup = noop;
16 16
17 17 const finalize = () => done ? false : (cleanup(), done = true);
18 18 const isClosed = () => done;
19 19
20 20 const teardown = (cb: void | (() => void)) => {
21 21 if (cb) {
22 22 if (done) {
23 23 cb();
24 24 } else {
25 25 const _prev = cleanup;
26 cleanup = () => (_prev(), cb());
26 cleanup = _prev === noop ? cb : () => (_prev(), cb());
27 27 }
28 28 }
29 29 };
30 30
31 return { finalize, isClosed, teardown };
31 const error = (err: unknown) => finalize() && reject(err);
32
33 if (ct.isSupported()) {
34 const h = ct.register(error);
35 teardown(() => h.destroy());
36 }
37
38 return { finalize, isClosed, teardown, error };
32 39 };
33 40
41 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
42
43 export const fuse = <T>(producer: Producer<T>) => (consumer: FusedSink<T>) => {
44
45 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
46 const { teardown, isClosed, finalize, error } = subscription(reject);
47
48 const next = consumer.next ? consumer.next.bind(consumer) : noop;
49
50 teardown(producer({
51 next: next !== noop ? (v => isClosed() || next(v)) : noop,
52 error,
53 complete: () => finalize() && consumer.complete && consumer.complete(),
54 isClosed
55 }));
56
57 return { unsubscribe: finalize };
58 };
59
60 type FusedProducer<T> = (consumer: FusedSink<T>) => void | (() => void);
61
34 62 export class ObservableImpl<T> implements Observable<T> {
35 63
36 private readonly _producer: Producer<T>;
64 private readonly _producer: FusedProducer<T>;
37 65
38 constructor(producer: Producer<T>) {
66 constructor(producer: FusedProducer<T>) {
39 67 this._producer = producer;
40 68 }
41 69
42 70 subscribe(consumer: Observer<T> = {}) {
43 const { teardown, isClosed, finalize } = subscription();
71 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
72 const next = consumer.next ? consumer.next.bind(consumer) : noop;
73
74 const { teardown, isClosed, finalize, error } = subscription(reject);
44 75
45 teardown(this._producer({
46 next: consumer.next ? consumer.next.bind(consumer) : noop,
47 error: err => finalize() && (consumer.error ? consumer.error(err) : errorFallback(err)),
48 complete: () => finalize() && consumer.complete && consumer.complete(),
49 isClosed
76 teardown((0, this._producer)({
77 next: next !== noop ? (v => isClosed() || next(v)) : noop,
78 error,
79 complete: () => finalize() && consumer.complete && consumer.complete()
50 80 }));
51 81
52 82 return { unsubscribe: finalize };
53 83 }
54 84
55 85 map<T2>(mapper: (value: T) => T2) {
56 86 return new ObservableImpl<T2>(({ next, ...rest }) =>
57 87 this._producer({
58 88 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
59 89 ...rest
60 90 }));
61 91 }
62 92
63 93 tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
64 return new ObservableImpl<T>(({ next, complete, error, isClosed }) =>
94 return new ObservableImpl<T>(({ next, complete, error }) =>
65 95 this._producer({
66 96 next: tapNext ? (v => (tapNext(v), next(v))) : next,
67 97 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
68 error: tapError ? (e => (tapError(e), error(e))) : error,
69 isClosed
98 error: tapError ? (e => (tapError(e), error(e))) : error
70 99 }));
71 100 }
72 101
73 102 filter(predicate: Predicate<T>) {
74 103 return new ObservableImpl<T>(({ next, ...rest }) =>
75 104 this._producer({
76 105 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
77 106 ...rest
78 107 })
79 108 );
80 109 }
81 110
82 111 until(predicate: Predicate<T>) {
83 112 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
84 113 this._producer({
85 114 next: v => predicate(v) ? complete() : next(v),
86 115 complete,
87 116 ...rest
88 117 })
89 118 );
90 119 }
91 120
92 121 while(predicate: Predicate<T>) {
93 122 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
94 123 this._producer({
95 124 next: v => predicate(v) ? next(v) : complete(),
96 125 complete,
97 126 ...rest
98 127 })
99 128 );
100 129 }
101 130
131 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
132 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
102 133 scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
103 134 if (args.length === 1) {
104 135 return new ObservableImpl<T | A>(({ next, ...rest }) => {
105 136 const [accumulator] = args;
106 137 let _acc: T;
107 138 let index = 0;
108 139 return this._producer({
109 140 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
110 141 ...rest
111 142 });
112 143 });
113 144 } else {
114 145 return new ObservableImpl<T | A>(({ next, ...rest }) => {
115 146 const [accumulator, initial] = args;
116 147 let _acc = initial;
117 148 return this._producer({
118 149 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
119 150 ...rest
120 151 });
121 152 });
122 153 }
123 154 }
124 155
156 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
157 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
125 158 reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
126 159
127 160 if (args.length === 1) {
128 return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => {
161 return new ObservableImpl<T | A>(({ next, complete, error }) => {
129 162 const [accumulator] = args;
130 163 let _acc: T;
131 164 let index = 0;
132 165 return this._producer({
133 166 next: next !== noop ? (v: T) => {
134 167 _acc = index++ === 0 ? v : accumulator(_acc, v);
135 168 } : noop,
136 169 complete: () => {
137 170 if (index === 0) {
138 171 error(new Error("The sequence can't be empty"));
139 172 } else {
140 173 next(_acc);
141 174 complete();
142 175 }
143 176 },
144 error,
145 isClosed
177 error
146 178 });
147 179 });
148 180 } else {
149 return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => {
181 return new ObservableImpl<T | A>(({ next, complete, error }) => {
150 182 const [accumulator, initial] = args;
151 183 let _acc = initial;
152 184 return this._producer({
153 185 next: next !== noop ? (v: T) => {
154 186 _acc = accumulator(_acc, v);
155 187 } : noop,
156 188 complete: () => {
157 189 next(_acc);
158 190 complete();
159 191 },
160 error,
161 isClosed
192 error
162 193 });
163 194 });
164 195 }
165 196 }
166 197
167 198 cat(...seq: Subscribable<T>[]) {
168 199 return new ObservableImpl<T>(({ next, complete: final, ...rest }) => {
169 200 let cleanup: () => void;
170 201 const len = seq.length;
171 202 const complete = (i: number) => i < len ?
172 203 () => {
173 204 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
174 205 cleanup = subscription.unsubscribe.bind(subscription);
175 206 } : final;
176 207
177 208 cleanup = this._producer({ next, complete: complete(0), ...rest }) ?? noop;
178 209
179 210 return () => cleanup();
180 211 });
181 212 }
182 213
183 214 pipe<U>(op: OperatorFn<T, U>): Observable<U> {
184 215 return op(this);
185 216 }
186 217
187 218 next(ct = Cancellation.none) {
188 219 return new Promise<T>((resolve, reject) => {
189 220 ct.throwIfRequested();
190 221
191 const { teardown, finalize, isClosed } = subscription();
192
193 const error = (err: unknown) => finalize() && reject(err);
194 const h = ct.register(error);
195 teardown(() => h.destroy());
222 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
196 223 teardown((0, this._producer)({
197 224 next: (item: T) => finalize() && resolve(item),
198 225 complete: () => finalize() && reject(new Error("The sequence is empty")),
199 226 error,
200 227 isClosed
201 228 }));
202 229
203 230 });
204 231 }
205 232
206 233 collect(ct = Cancellation.none) {
207 234 return new Promise<T[]>((resolve, reject) => {
208 235 ct.throwIfRequested();
209 236
210 237 const acc: T[] = [];
211 238
212 const { teardown, finalize, isClosed } = subscription();
213 const error = (err: unknown) => finalize() && reject(err);
214
215 const h = ct.register(error);
216 teardown(() => h.destroy());
239 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
217 240 teardown((0, this._producer)({
218 241 next: (item: T) => isClosed() ? void (0) : acc.push(item),
219 242 complete: () => finalize() && resolve(acc),
220 243 error,
221 244 isClosed
222 245 }));
223 246 });
224 247 }
225 248
249 forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
250 return new Promise<void>((resolve, reject) => {
251 ct.throwIfRequested();
252
253 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
254
255 teardown((0, this._producer)({
256 next: item => !isClosed() && next(item),
257 complete: () => finalize() && resolve(),
258 error,
259 isClosed
260 }));
261 });
262 }
263
226 264 } No newline at end of file
@@ -1,156 +1,164
1 1 import { ICancellation } from "@implab/core-amd/interfaces";
2 2
3 3 /**
4 4 * The interface for the consumer of an observable sequence
5 5 */
6 6 export interface Observer<T> {
7 7 /**
8 8 * Called for the next element in the sequence
9 9 */
10 10 next?(value: T): void;
11 11
12 12 /**
13 13 * Called once when the error occurs in the sequence.
14 14 */
15 15 error?(e: unknown): void;
16 16
17 17 /**
18 18 * Called once at the end of the sequence.
19 19 */
20 20 complete?(): void;
21 21 }
22 22
23 23 /**
24 24 * The group of functions to feed an observable. These methods are provided to
25 25 * the producer to generate a stream of events.
26 26 */
27 27 export type Sink<T> = {
28 28 /**
29 29 * Call to send the next element in the sequence
30 30 */
31 31 next: (value: T) => void;
32 32
33 33 /**
34 34 * Call to notify about the error occurred in the sequence.
35 35 */
36 36 error: (e: unknown) => void;
37 37
38 38 /**
39 39 * Call to signal the end of the sequence.
40 40 */
41 41 complete: () => void;
42 42
43 43 /**
44 44 * Checks whether the sink is accepting new elements. It's safe to
45 45 * send elements to the closed sink.
46 46 *
47 47 * This method is useful to notify a producer while it's emitting the series
48 48 * of synchronous events.
49 49 */
50 50 isClosed: () => boolean;
51 51 };
52 52
53 53 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
54 54
55 export type FusedSink<T> = Omit<Sink<T>, "isClosed">;
56
57 export type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
58
59 55 export interface Unsubscribable {
60 56 unsubscribe(): void;
61 57 }
62 58
63 59 export interface Subscribable<T> {
64 60 /** Subscribes a consumer to events. If a consumer isn't specified
65 61 * this method activates the producer to achieve side affects if any.
66 62 */
67 63 subscribe(consumer?: Observer<T>): Unsubscribable;
68 64 }
69 65
70 66 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
71 67
72 68 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
73 69
74 70 /** The observable source of items. */
75 71 export interface Observable<T> extends Subscribable<T> {
76 72 /** Transforms elements of the sequence with the specified mapper
77 73 *
78 74 * @param mapper The mapper used to transform the values
79 75 */
80 76 map<T2>(mapper: (value: T) => T2): Observable<T2>;
81 77
82 78 /** Injects the specified observer into the each producer to consumer chain.
83 79 * The method is used to add side effect to the events processing.
84 80 *
85 81 * @param observer The consumer for the events
86 82 */
87 83 tap(observer: Observer<T>): Observable<T>;
88 84
89 85 /** Filters elements of the sequence. The resulting sequence will
90 86 * contain only elements which match the specified predicate.
91 87 *
92 88 * @param predicate The filter predicate.
93 89 */
94 90 filter(predicate: (value: T) => boolean): Observable<T>;
95 91
96 92 /** Completes the sequence once the condition is met.
97 93 * @param predicate The condition which should be met to complete the sequence
98 94 */
99 95 until(predicate: (value: T) => boolean): Observable<T>;
100 96
101 97 /** Keeps the sequence running while elements satisfy the condition.
102 98 *
103 99 * @param predicate The condition which should be met to continue.
104 100 */
105 101 while(predicate: (value: T) => boolean): Observable<T>;
106 102
107 103 /** Applies accumulator to each value in the sequence and
108 104 * emits the accumulated value for each source element
109 105 *
110 106 * @param accumulator
111 107 * @param initial
112 108 */
113 109 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
114 110 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
115 111
116 112 /** Applies accumulator to each value in the sequence and
117 113 * emits the accumulated value at the end of the sequence
118 114 *
119 115 * @param accumulator
120 116 * @param initial
121 117 */
122 118 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
123 119 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
124 120
125 121 /** Concatenates the specified sequences with this observable
126 122 *
127 123 * @param seq sequences to concatenate with the current observable
128 124 *
129 125 * The concatenation doesn't accumulate values from the specified sequences,
130 126 * The result of the concatenation is the new observable which will switch
131 127 * to the next observable after the previous one completes. Values emitted
132 128 * before the next observable being active are lost.
133 129 */
134 130 cat(...seq: Subscribable<T>[]): Observable<T>;
135 131
136 132
137 133 /** Pipes the specified operator to produce the new observable
138 134 * @param op The operator consumes this observable and produces a new one
139 135 *
140 136 * The operator is a higher order function which takes a source observable
141 137 * and returns a producer for the new observable.
142 138 *
143 139 * This function can be used to create a complex mapping between source and
144 140 * resulting observables. The operator may have a state (or a side effect)
145 141 * and can be connected to multiple observables.
146 142 */
147 143 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
148 144
149 145 /** Waits for the next event to occur and returns a promise for the next value
150 146 * @param ct Cancellation token
151 147 */
152 148 next(ct?: ICancellation): Promise<T>;
153 149
154 150 /** Collects items of the sequence to the array. */
155 151 collect(ct?: ICancellation): Promise<T[]>;
152
153 /**
154 * Iterates through the elements in this observable until the end of the
155 * sequence is reached or the operation is cancelled.
156 *
157 * @param next The callback for the next item in the sequence
158 * @param ct The optional cancellation token for this operation
159 *
160 * @returns A promise which will be fulfilled when the operation is completed.
161 * In case of the cancellation this promise will be rejected.
162 */
163 forEach(next: (item: T) => void, ct?: ICancellation): Promise<void>;
156 164 } No newline at end of file
@@ -1,191 +1,194
1 1 import { empty, observe, of } from "./observable";
2 2 import * as tap from "tap";
3 3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 4 import { delay } from "@implab/core-amd/safe";
5 5
6 const subj1 = observe<number>(({ next, complete }) => {
6 const subj1 = observe<number>(({ next, error, complete }) => {
7 7 next(1);
8 8 complete();
9 9 next(2);
10 next(3);
11 error(new Error("This error should be ignored"));
12 next(4);
10 13 });
11 14
12 15 const consumer1 = {
13 16 sum: 0,
14 17 next(v: number) {
15 18 this.sum += v;
16 19 }
17 20 };
18 21
19 22 subj1.subscribe(consumer1);
20 23 tap.equal(consumer1.sum, 1, "Should get only one value");
21 24
22 25 subj1.subscribe(consumer1);
23 26 tap.equal(consumer1.sum, 2, "Should get the value again");
24 27
25 28 const consumer2 = {
26 29 value: 0,
27 30 completed: false,
28 31 next(v: number) { this.value = v; },
29 32 complete() { this.completed = true; }
30 33 };
31 34
32 35 let maps = 0;
33 36
34 37 subj1
35 38 .map(v => {
36 39 tap.comment(`map1: ${v * 2}`);
37 40 maps++;
38 41 return v * 2;
39 42 })
40 43 .map(v => {
41 44 tap.comment(`map2: ${v * 2}`);
42 45 maps++;
43 46 return v * 2;
44 47 })
45 48 .map(v => {
46 49 tap.comment(`map3: ${v * 2}`);
47 50 maps++;
48 51 return v * 2;
49 52 })
50 53 .subscribe(consumer2);
51 54
52 55 tap.equal(consumer2.value, 8, "Should map");
53 56 tap.equal(maps, 3, "The map chain should not be executed after completion");
54 57 tap.ok(consumer2.completed, "The completion signal should pass through");
55 58
56 59 const subj2 = observe<number>(({ next, complete }) => {
57 60 [1, 2, 3, 4, 5].forEach(next);
58 61 complete();
59 62 return () => {
60 63 tap.comment("subj2: unsubscribe");
61 64 };
62 65 });
63 66
64 67 const consumer3 = {
65 68 even: 0,
66 69 odd: 0,
67 70 completed: false,
68 71 subscribed: 0,
69 72 unsubscribed: 0,
70 73 next(v: "even" | "odd") {
71 74 this[v]++;
72 75 },
73 76 complete() {
74 77 this.completed = true;
75 78 },
76 79 subscribe() {
77 80 this.subscribed++;
78 81 },
79 82 unsubscribe() {
80 83 this.unsubscribed++;
81 84 }
82 85 };
83 86
84 87
85 88 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
86 89 consumer3.subscribe();
87 90 let count = 0;
88 91 const h = self.subscribe({
89 92 next: val => {
90 93 if (val % 2 === 0)
91 94 next("odd");
92 95 else
93 96 next("even");
94 97 if (++count === 4)
95 98 complete();
96 99 },
97 100 complete,
98 101 error
99 102 });
100 103 return () => {
101 104 consumer3.unsubscribe();
102 105 h.unsubscribe();
103 106 };
104 107 }));
105 108
106 109 subj3.subscribe(consumer3);
107 110
108 111 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
109 112 tap.equal(consumer3.even, 2, "Should get 2 even elements");
110 113 tap.ok(consumer3.completed, "The sequence should completed");
111 114 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
112 115 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
113 116
114 117 subj2.reduce((a, b) => a + b).subscribe({
115 118 next: val => tap.comment("subj2: reduce =", val),
116 119 complete: () => tap.comment("subj2: complete")
117 120 });
118 121
119 122 tap.test("of(...) tests", async t => {
120 123 await subj2.reduce((a, b) => a + b).next()
121 124 .then(value => t.comment("subj2: next reduce=", value));
122 125
123 126 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124 127
125 128 const cancelled = new Cancellation(cancel => cancel());
126 129 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127 130
128 131 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129 132
130 133 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 134 .reduce<number[]>((a, x) => [...a, x], [])
132 135 .next()
133 136 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
134 137
135 138 const rejected = Promise.reject("DIE!");
136 139 rejected.catch(() => { }); // SAFE AND SOUND
137 140
138 141 await t.resolves(
139 142 of(Promise.resolve(1), rejected).next(),
140 143 "of(...) should emit non-rejected items"
141 144 );
142 145 await t.rejects(
143 146 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 147 "of(...) should terminate with error when a parameter is rejected"
145 148 );
146 149
147 150 t.same(await of(1, 2, 3).collect(), [1, 2, 3], ".collect() should return the collected sequence");
148 151 await t.rejects(of(1, 2, 3).collect(cancelled), ".collect() should support cancellation");
149 152
150 153 }).catch(() => { });
151 154
152 155 tap.test(".tap() tests", async t => {
153 156 const side: number[] = [];
154 157
155 158 of(1, 2)
156 159 .tap({ next: v => side.push(v), complete: () => side.push(0) })
157 160 .tap({ next: v => side.push(v * v) })
158 161 .subscribe({});
159 162
160 163 t.same(side, [1, 1, 2, 4, 0], ".tap() should be called in the order of registration");
161 164
162 165 side.length = 0;
163 166
164 167 await new Promise<void>(resolve => {
165 168 of(1, 2, delay(1).then(() => 3))
166 169 .tap({ next: v => side.push(v) })
167 170 .tap({ next: v => v === 1 && resolve() })
168 171 .subscribe({});
169 172 });
170 173
171 174 t.same(side, [1, 2], ".tap() should be processed synchronously");
172 175
173 176 }).catch(() => { });
174 177
175 178 tap.test(".while() tests", async t => {
176 179
177 180 const seq = of(1, 2, 3, 4).while(v => v <= 2);
178 181
179 182 t.same(await seq.collect(), [1, 2], "Should collect only taken elements");
180 183
181 184 const data: number[] = [];
182 185 let complete = 0;
183 186 seq.subscribe({
184 187 next: v => data.push(v),
185 188 complete: () => complete++
186 189 });
187 190
188 191 t.same(data, [1, 2], "Should receive only taken elements");
189 192 t.equal(complete, 1, "Complete should run once");
190 193
191 194 }).catch(() => { }); No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now