##// END OF EJS Templates
WIP observables
cin -
r157:f9518367061a default
parent child
Show More
@@ -1,105 +1,69
1 import { isPromise } from "@implab/core-amd/safe";
1 import { isPromise } from "@implab/core-amd/safe";
2 import { Unsubscribable, Producer, FusedSink, Observable, Subscribable } from "./observable/interfaces";
2 import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces";
3 import { ObservableImpl } from "./observable/ObservableImpl";
3 import { ObservableImpl } from "./observable/ObservableImpl";
4
4
5 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
5 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
6 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
6 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
7
7
8 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
8 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
9 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
9 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
10
10
11 const noop = () => { };
11 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(producer);
12
13 /** Wraps the producer to handle tear down logic and subscription management
14 *
15 * The resulting producer will invoke cleanup logic on error or complete events
16 * and will prevent calling of any method from the sink.
17 *
18 * @param producer The producer to wrap
19 * @returns The wrapper producer
20 */
21 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
22 let done = false;
23 let cleanup = noop;
24
25 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
26 (...args: A) => done ?
27 void (0) :
28 (done = true, cleanup(), fn(...args));
29
30 const _fin0 = () => done ? void (0) : (done = true, cleanup());
31
32 const safeSink = {
33 next: (value: T) => { !done && next(value); },
34 error: _fin(error),
35 complete: _fin(complete),
36 isClosed: () => done
37 };
38 // call the producer
39 cleanup = producer(safeSink) ?? noop;
40 // if the producer throws exception bypass it to the caller rather then to
41 // the sink. This is a feature.
42
43 // if the producer completed the sequence immediately call the cleanup in place
44 return done ? cleanup() : _fin0;
45 };
46
47 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer));
48
12
49 /** Converts an array to the observable sequence of its elements. */
13 /** Converts an array to the observable sequence of its elements. */
50 export const ofArray = <T>(items: T[]) => new ObservableImpl<T>(
14 export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>(
51 ({ next, complete }) => (
15 ({ next, complete }) => (
52 items.forEach(next),
16 items.forEach(next),
53 complete()
17 complete()
54 )
18 )
55 );
19 );
56
20
57 /** Converts a subscribable to the observable */
21 /** Converts a subscribable to the observable */
58 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
22 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
59 observe<T>(sink => {
23 observe<T>(sink => {
60 const subscription = subscribable.subscribe(sink);
24 const subscription = subscribable.subscribe(sink);
61 return () => subscription.unsubscribe();
25 return () => subscription.unsubscribe();
62 });
26 });
63
27
64 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
28 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
65 ({ next, error, complete }) =>
29 ({ next, error, complete }) =>
66 isPromise(item) ?
30 isPromise(item) ?
67 void item.then(
31 void item.then(
68 v => (next(v), complete()),
32 v => (next(v), complete()),
69 error
33 error
70 ) :
34 ) :
71 (next(item), complete())
35 (next(item), complete())
72 );
36 );
73
37
74 /** Converts a list of parameter values to the observable sequence. The
38 /** Converts a list of parameter values to the observable sequence. The
75 * order of elements in the list will be preserved in the resulting sequence.
39 * order of elements in the list will be preserved in the resulting sequence.
76 */
40 */
77 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
41 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
78 of1(items[0]) :
42 of1(items[0]) :
79 observe<T>(
43 observe<T>(
80 ({ next, error, complete, isClosed }) => {
44 ({ next, error, complete, isClosed }) => {
81 const n = items.length;
45 const n = items.length;
82
46
83 const _next = (start: number) => {
47 const _next = (start: number) => {
84 if (start > 0 && isClosed()) // when resumed
48 if (start > 0 && isClosed()) // when resumed
85 return;
49 return;
86
50
87 for (let i = start; i < n; i++) {
51 for (let i = start; i < n; i++) {
88 const r = items[i];
52 const r = items[i];
89 if (isPromise(r)) {
53 if (isPromise(r)) {
90 r.then(v => (next(v), _next(i + 1)), error);
54 r.then(v => (next(v), _next(i + 1)), error);
91 return; // suspend
55 return; // suspend
92 } else {
56 } else {
93 next(r);
57 next(r);
94 }
58 }
95 }
59 }
96 complete();
60 complete();
97 };
61 };
98
62
99 _next(0);
63 _next(0);
100 }
64 }
101 );
65 );
102
66
103 export const empty = new ObservableImpl<never>(({ complete }) => complete());
67 export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete());
104
68
105 export type * from "./observable/interfaces"; No newline at end of file
69 export type * from "./observable/interfaces";
@@ -1,226 +1,264
1 import { id as mid } from "module";
1 import { id as mid } from "module";
2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Subscribable } from "./interfaces";
2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces";
3 import { TraceSource } from "@implab/core-amd/log/TraceSource";
3 import { TraceSource } from "@implab/core-amd/log/TraceSource";
4 import { Predicate } from "@implab/core-amd/interfaces";
4 import { Predicate } from "@implab/core-amd/interfaces";
5 import { Cancellation } from "@implab/core-amd/Cancellation";
5 import { Cancellation } from "@implab/core-amd/Cancellation";
6
6
7 const trace = TraceSource.get(mid);
7 const trace = TraceSource.get(mid);
8
8
9 const noop = () => { };
9 const noop = () => { };
10
10
11 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
11 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
12
12
13 const subscription = () => {
13 const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => {
14 let done = false;
14 let done = false;
15 let cleanup = noop;
15 let cleanup = noop;
16
16
17 const finalize = () => done ? false : (cleanup(), done = true);
17 const finalize = () => done ? false : (cleanup(), done = true);
18 const isClosed = () => done;
18 const isClosed = () => done;
19
19
20 const teardown = (cb: void | (() => void)) => {
20 const teardown = (cb: void | (() => void)) => {
21 if (cb) {
21 if (cb) {
22 if (done) {
22 if (done) {
23 cb();
23 cb();
24 } else {
24 } else {
25 const _prev = cleanup;
25 const _prev = cleanup;
26 cleanup = () => (_prev(), cb());
26 cleanup = _prev === noop ? cb : () => (_prev(), cb());
27 }
27 }
28 }
28 }
29 };
29 };
30
30
31 return { finalize, isClosed, teardown };
31 const error = (err: unknown) => finalize() && reject(err);
32
33 if (ct.isSupported()) {
34 const h = ct.register(error);
35 teardown(() => h.destroy());
36 }
37
38 return { finalize, isClosed, teardown, error };
32 };
39 };
33
40
41 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
42
43 export const fuse = <T>(producer: Producer<T>) => (consumer: FusedSink<T>) => {
44
45 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
46 const { teardown, isClosed, finalize, error } = subscription(reject);
47
48 const next = consumer.next ? consumer.next.bind(consumer) : noop;
49
50 teardown(producer({
51 next: next !== noop ? (v => isClosed() || next(v)) : noop,
52 error,
53 complete: () => finalize() && consumer.complete && consumer.complete(),
54 isClosed
55 }));
56
57 return { unsubscribe: finalize };
58 };
59
60 type FusedProducer<T> = (consumer: FusedSink<T>) => void | (() => void);
61
34 export class ObservableImpl<T> implements Observable<T> {
62 export class ObservableImpl<T> implements Observable<T> {
35
63
36 private readonly _producer: Producer<T>;
64 private readonly _producer: FusedProducer<T>;
37
65
38 constructor(producer: Producer<T>) {
66 constructor(producer: FusedProducer<T>) {
39 this._producer = producer;
67 this._producer = producer;
40 }
68 }
41
69
42 subscribe(consumer: Observer<T> = {}) {
70 subscribe(consumer: Observer<T> = {}) {
43 const { teardown, isClosed, finalize } = subscription();
71 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
72 const next = consumer.next ? consumer.next.bind(consumer) : noop;
73
74 const { teardown, isClosed, finalize, error } = subscription(reject);
44
75
45 teardown(this._producer({
76 teardown((0, this._producer)({
46 next: consumer.next ? consumer.next.bind(consumer) : noop,
77 next: next !== noop ? (v => isClosed() || next(v)) : noop,
47 error: err => finalize() && (consumer.error ? consumer.error(err) : errorFallback(err)),
78 error,
48 complete: () => finalize() && consumer.complete && consumer.complete(),
79 complete: () => finalize() && consumer.complete && consumer.complete()
49 isClosed
50 }));
80 }));
51
81
52 return { unsubscribe: finalize };
82 return { unsubscribe: finalize };
53 }
83 }
54
84
55 map<T2>(mapper: (value: T) => T2) {
85 map<T2>(mapper: (value: T) => T2) {
56 return new ObservableImpl<T2>(({ next, ...rest }) =>
86 return new ObservableImpl<T2>(({ next, ...rest }) =>
57 this._producer({
87 this._producer({
58 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
88 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
59 ...rest
89 ...rest
60 }));
90 }));
61 }
91 }
62
92
63 tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
93 tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
64 return new ObservableImpl<T>(({ next, complete, error, isClosed }) =>
94 return new ObservableImpl<T>(({ next, complete, error }) =>
65 this._producer({
95 this._producer({
66 next: tapNext ? (v => (tapNext(v), next(v))) : next,
96 next: tapNext ? (v => (tapNext(v), next(v))) : next,
67 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
97 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
68 error: tapError ? (e => (tapError(e), error(e))) : error,
98 error: tapError ? (e => (tapError(e), error(e))) : error
69 isClosed
70 }));
99 }));
71 }
100 }
72
101
73 filter(predicate: Predicate<T>) {
102 filter(predicate: Predicate<T>) {
74 return new ObservableImpl<T>(({ next, ...rest }) =>
103 return new ObservableImpl<T>(({ next, ...rest }) =>
75 this._producer({
104 this._producer({
76 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
105 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
77 ...rest
106 ...rest
78 })
107 })
79 );
108 );
80 }
109 }
81
110
82 until(predicate: Predicate<T>) {
111 until(predicate: Predicate<T>) {
83 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
112 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
84 this._producer({
113 this._producer({
85 next: v => predicate(v) ? complete() : next(v),
114 next: v => predicate(v) ? complete() : next(v),
86 complete,
115 complete,
87 ...rest
116 ...rest
88 })
117 })
89 );
118 );
90 }
119 }
91
120
92 while(predicate: Predicate<T>) {
121 while(predicate: Predicate<T>) {
93 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
122 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
94 this._producer({
123 this._producer({
95 next: v => predicate(v) ? next(v) : complete(),
124 next: v => predicate(v) ? next(v) : complete(),
96 complete,
125 complete,
97 ...rest
126 ...rest
98 })
127 })
99 );
128 );
100 }
129 }
101
130
131 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
132 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
102 scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
133 scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
103 if (args.length === 1) {
134 if (args.length === 1) {
104 return new ObservableImpl<T | A>(({ next, ...rest }) => {
135 return new ObservableImpl<T | A>(({ next, ...rest }) => {
105 const [accumulator] = args;
136 const [accumulator] = args;
106 let _acc: T;
137 let _acc: T;
107 let index = 0;
138 let index = 0;
108 return this._producer({
139 return this._producer({
109 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
140 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
110 ...rest
141 ...rest
111 });
142 });
112 });
143 });
113 } else {
144 } else {
114 return new ObservableImpl<T | A>(({ next, ...rest }) => {
145 return new ObservableImpl<T | A>(({ next, ...rest }) => {
115 const [accumulator, initial] = args;
146 const [accumulator, initial] = args;
116 let _acc = initial;
147 let _acc = initial;
117 return this._producer({
148 return this._producer({
118 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
149 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
119 ...rest
150 ...rest
120 });
151 });
121 });
152 });
122 }
153 }
123 }
154 }
124
155
156 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
157 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
125 reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
158 reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
126
159
127 if (args.length === 1) {
160 if (args.length === 1) {
128 return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => {
161 return new ObservableImpl<T | A>(({ next, complete, error }) => {
129 const [accumulator] = args;
162 const [accumulator] = args;
130 let _acc: T;
163 let _acc: T;
131 let index = 0;
164 let index = 0;
132 return this._producer({
165 return this._producer({
133 next: next !== noop ? (v: T) => {
166 next: next !== noop ? (v: T) => {
134 _acc = index++ === 0 ? v : accumulator(_acc, v);
167 _acc = index++ === 0 ? v : accumulator(_acc, v);
135 } : noop,
168 } : noop,
136 complete: () => {
169 complete: () => {
137 if (index === 0) {
170 if (index === 0) {
138 error(new Error("The sequence can't be empty"));
171 error(new Error("The sequence can't be empty"));
139 } else {
172 } else {
140 next(_acc);
173 next(_acc);
141 complete();
174 complete();
142 }
175 }
143 },
176 },
144 error,
177 error
145 isClosed
146 });
178 });
147 });
179 });
148 } else {
180 } else {
149 return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => {
181 return new ObservableImpl<T | A>(({ next, complete, error }) => {
150 const [accumulator, initial] = args;
182 const [accumulator, initial] = args;
151 let _acc = initial;
183 let _acc = initial;
152 return this._producer({
184 return this._producer({
153 next: next !== noop ? (v: T) => {
185 next: next !== noop ? (v: T) => {
154 _acc = accumulator(_acc, v);
186 _acc = accumulator(_acc, v);
155 } : noop,
187 } : noop,
156 complete: () => {
188 complete: () => {
157 next(_acc);
189 next(_acc);
158 complete();
190 complete();
159 },
191 },
160 error,
192 error
161 isClosed
162 });
193 });
163 });
194 });
164 }
195 }
165 }
196 }
166
197
167 cat(...seq: Subscribable<T>[]) {
198 cat(...seq: Subscribable<T>[]) {
168 return new ObservableImpl<T>(({ next, complete: final, ...rest }) => {
199 return new ObservableImpl<T>(({ next, complete: final, ...rest }) => {
169 let cleanup: () => void;
200 let cleanup: () => void;
170 const len = seq.length;
201 const len = seq.length;
171 const complete = (i: number) => i < len ?
202 const complete = (i: number) => i < len ?
172 () => {
203 () => {
173 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
204 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
174 cleanup = subscription.unsubscribe.bind(subscription);
205 cleanup = subscription.unsubscribe.bind(subscription);
175 } : final;
206 } : final;
176
207
177 cleanup = this._producer({ next, complete: complete(0), ...rest }) ?? noop;
208 cleanup = this._producer({ next, complete: complete(0), ...rest }) ?? noop;
178
209
179 return () => cleanup();
210 return () => cleanup();
180 });
211 });
181 }
212 }
182
213
183 pipe<U>(op: OperatorFn<T, U>): Observable<U> {
214 pipe<U>(op: OperatorFn<T, U>): Observable<U> {
184 return op(this);
215 return op(this);
185 }
216 }
186
217
187 next(ct = Cancellation.none) {
218 next(ct = Cancellation.none) {
188 return new Promise<T>((resolve, reject) => {
219 return new Promise<T>((resolve, reject) => {
189 ct.throwIfRequested();
220 ct.throwIfRequested();
190
221
191 const { teardown, finalize, isClosed } = subscription();
222 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
192
193 const error = (err: unknown) => finalize() && reject(err);
194 const h = ct.register(error);
195 teardown(() => h.destroy());
196 teardown((0, this._producer)({
223 teardown((0, this._producer)({
197 next: (item: T) => finalize() && resolve(item),
224 next: (item: T) => finalize() && resolve(item),
198 complete: () => finalize() && reject(new Error("The sequence is empty")),
225 complete: () => finalize() && reject(new Error("The sequence is empty")),
199 error,
226 error,
200 isClosed
227 isClosed
201 }));
228 }));
202
229
203 });
230 });
204 }
231 }
205
232
206 collect(ct = Cancellation.none) {
233 collect(ct = Cancellation.none) {
207 return new Promise<T[]>((resolve, reject) => {
234 return new Promise<T[]>((resolve, reject) => {
208 ct.throwIfRequested();
235 ct.throwIfRequested();
209
236
210 const acc: T[] = [];
237 const acc: T[] = [];
211
238
212 const { teardown, finalize, isClosed } = subscription();
239 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
213 const error = (err: unknown) => finalize() && reject(err);
214
215 const h = ct.register(error);
216 teardown(() => h.destroy());
217 teardown((0, this._producer)({
240 teardown((0, this._producer)({
218 next: (item: T) => isClosed() ? void (0) : acc.push(item),
241 next: (item: T) => isClosed() ? void (0) : acc.push(item),
219 complete: () => finalize() && resolve(acc),
242 complete: () => finalize() && resolve(acc),
220 error,
243 error,
221 isClosed
244 isClosed
222 }));
245 }));
223 });
246 });
224 }
247 }
225
248
249 forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
250 return new Promise<void>((resolve, reject) => {
251 ct.throwIfRequested();
252
253 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
254
255 teardown((0, this._producer)({
256 next: item => !isClosed() && next(item),
257 complete: () => finalize() && resolve(),
258 error,
259 isClosed
260 }));
261 });
262 }
263
226 } No newline at end of file
264 }
@@ -1,156 +1,164
1 import { ICancellation } from "@implab/core-amd/interfaces";
1 import { ICancellation } from "@implab/core-amd/interfaces";
2
2
3 /**
3 /**
4 * The interface for the consumer of an observable sequence
4 * The interface for the consumer of an observable sequence
5 */
5 */
6 export interface Observer<T> {
6 export interface Observer<T> {
7 /**
7 /**
8 * Called for the next element in the sequence
8 * Called for the next element in the sequence
9 */
9 */
10 next?(value: T): void;
10 next?(value: T): void;
11
11
12 /**
12 /**
13 * Called once when the error occurs in the sequence.
13 * Called once when the error occurs in the sequence.
14 */
14 */
15 error?(e: unknown): void;
15 error?(e: unknown): void;
16
16
17 /**
17 /**
18 * Called once at the end of the sequence.
18 * Called once at the end of the sequence.
19 */
19 */
20 complete?(): void;
20 complete?(): void;
21 }
21 }
22
22
23 /**
23 /**
24 * The group of functions to feed an observable. These methods are provided to
24 * The group of functions to feed an observable. These methods are provided to
25 * the producer to generate a stream of events.
25 * the producer to generate a stream of events.
26 */
26 */
27 export type Sink<T> = {
27 export type Sink<T> = {
28 /**
28 /**
29 * Call to send the next element in the sequence
29 * Call to send the next element in the sequence
30 */
30 */
31 next: (value: T) => void;
31 next: (value: T) => void;
32
32
33 /**
33 /**
34 * Call to notify about the error occurred in the sequence.
34 * Call to notify about the error occurred in the sequence.
35 */
35 */
36 error: (e: unknown) => void;
36 error: (e: unknown) => void;
37
37
38 /**
38 /**
39 * Call to signal the end of the sequence.
39 * Call to signal the end of the sequence.
40 */
40 */
41 complete: () => void;
41 complete: () => void;
42
42
43 /**
43 /**
44 * Checks whether the sink is accepting new elements. It's safe to
44 * Checks whether the sink is accepting new elements. It's safe to
45 * send elements to the closed sink.
45 * send elements to the closed sink.
46 *
46 *
47 * This method is useful to notify a producer while it's emitting the series
47 * This method is useful to notify a producer while it's emitting the series
48 * of synchronous events.
48 * of synchronous events.
49 */
49 */
50 isClosed: () => boolean;
50 isClosed: () => boolean;
51 };
51 };
52
52
53 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
53 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
54
54
55 export type FusedSink<T> = Omit<Sink<T>, "isClosed">;
56
57 export type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
58
59 export interface Unsubscribable {
55 export interface Unsubscribable {
60 unsubscribe(): void;
56 unsubscribe(): void;
61 }
57 }
62
58
63 export interface Subscribable<T> {
59 export interface Subscribable<T> {
64 /** Subscribes a consumer to events. If a consumer isn't specified
60 /** Subscribes a consumer to events. If a consumer isn't specified
65 * this method activates the producer to achieve side affects if any.
61 * this method activates the producer to achieve side affects if any.
66 */
62 */
67 subscribe(consumer?: Observer<T>): Unsubscribable;
63 subscribe(consumer?: Observer<T>): Unsubscribable;
68 }
64 }
69
65
70 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
66 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
71
67
72 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
68 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
73
69
74 /** The observable source of items. */
70 /** The observable source of items. */
75 export interface Observable<T> extends Subscribable<T> {
71 export interface Observable<T> extends Subscribable<T> {
76 /** Transforms elements of the sequence with the specified mapper
72 /** Transforms elements of the sequence with the specified mapper
77 *
73 *
78 * @param mapper The mapper used to transform the values
74 * @param mapper The mapper used to transform the values
79 */
75 */
80 map<T2>(mapper: (value: T) => T2): Observable<T2>;
76 map<T2>(mapper: (value: T) => T2): Observable<T2>;
81
77
82 /** Injects the specified observer into the each producer to consumer chain.
78 /** Injects the specified observer into the each producer to consumer chain.
83 * The method is used to add side effect to the events processing.
79 * The method is used to add side effect to the events processing.
84 *
80 *
85 * @param observer The consumer for the events
81 * @param observer The consumer for the events
86 */
82 */
87 tap(observer: Observer<T>): Observable<T>;
83 tap(observer: Observer<T>): Observable<T>;
88
84
89 /** Filters elements of the sequence. The resulting sequence will
85 /** Filters elements of the sequence. The resulting sequence will
90 * contain only elements which match the specified predicate.
86 * contain only elements which match the specified predicate.
91 *
87 *
92 * @param predicate The filter predicate.
88 * @param predicate The filter predicate.
93 */
89 */
94 filter(predicate: (value: T) => boolean): Observable<T>;
90 filter(predicate: (value: T) => boolean): Observable<T>;
95
91
96 /** Completes the sequence once the condition is met.
92 /** Completes the sequence once the condition is met.
97 * @param predicate The condition which should be met to complete the sequence
93 * @param predicate The condition which should be met to complete the sequence
98 */
94 */
99 until(predicate: (value: T) => boolean): Observable<T>;
95 until(predicate: (value: T) => boolean): Observable<T>;
100
96
101 /** Keeps the sequence running while elements satisfy the condition.
97 /** Keeps the sequence running while elements satisfy the condition.
102 *
98 *
103 * @param predicate The condition which should be met to continue.
99 * @param predicate The condition which should be met to continue.
104 */
100 */
105 while(predicate: (value: T) => boolean): Observable<T>;
101 while(predicate: (value: T) => boolean): Observable<T>;
106
102
107 /** Applies accumulator to each value in the sequence and
103 /** Applies accumulator to each value in the sequence and
108 * emits the accumulated value for each source element
104 * emits the accumulated value for each source element
109 *
105 *
110 * @param accumulator
106 * @param accumulator
111 * @param initial
107 * @param initial
112 */
108 */
113 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
109 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
114 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
110 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
115
111
116 /** Applies accumulator to each value in the sequence and
112 /** Applies accumulator to each value in the sequence and
117 * emits the accumulated value at the end of the sequence
113 * emits the accumulated value at the end of the sequence
118 *
114 *
119 * @param accumulator
115 * @param accumulator
120 * @param initial
116 * @param initial
121 */
117 */
122 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
118 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
123 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
119 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
124
120
125 /** Concatenates the specified sequences with this observable
121 /** Concatenates the specified sequences with this observable
126 *
122 *
127 * @param seq sequences to concatenate with the current observable
123 * @param seq sequences to concatenate with the current observable
128 *
124 *
129 * The concatenation doesn't accumulate values from the specified sequences,
125 * The concatenation doesn't accumulate values from the specified sequences,
130 * The result of the concatenation is the new observable which will switch
126 * The result of the concatenation is the new observable which will switch
131 * to the next observable after the previous one completes. Values emitted
127 * to the next observable after the previous one completes. Values emitted
132 * before the next observable being active are lost.
128 * before the next observable being active are lost.
133 */
129 */
134 cat(...seq: Subscribable<T>[]): Observable<T>;
130 cat(...seq: Subscribable<T>[]): Observable<T>;
135
131
136
132
137 /** Pipes the specified operator to produce the new observable
133 /** Pipes the specified operator to produce the new observable
138 * @param op The operator consumes this observable and produces a new one
134 * @param op The operator consumes this observable and produces a new one
139 *
135 *
140 * The operator is a higher order function which takes a source observable
136 * The operator is a higher order function which takes a source observable
141 * and returns a producer for the new observable.
137 * and returns a producer for the new observable.
142 *
138 *
143 * This function can be used to create a complex mapping between source and
139 * This function can be used to create a complex mapping between source and
144 * resulting observables. The operator may have a state (or a side effect)
140 * resulting observables. The operator may have a state (or a side effect)
145 * and can be connected to multiple observables.
141 * and can be connected to multiple observables.
146 */
142 */
147 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
143 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
148
144
149 /** Waits for the next event to occur and returns a promise for the next value
145 /** Waits for the next event to occur and returns a promise for the next value
150 * @param ct Cancellation token
146 * @param ct Cancellation token
151 */
147 */
152 next(ct?: ICancellation): Promise<T>;
148 next(ct?: ICancellation): Promise<T>;
153
149
154 /** Collects items of the sequence to the array. */
150 /** Collects items of the sequence to the array. */
155 collect(ct?: ICancellation): Promise<T[]>;
151 collect(ct?: ICancellation): Promise<T[]>;
152
153 /**
154 * Iterates through the elements in this observable until the end of the
155 * sequence is reached or the operation is cancelled.
156 *
157 * @param next The callback for the next item in the sequence
158 * @param ct The optional cancellation token for this operation
159 *
160 * @returns A promise which will be fulfilled when the operation is completed.
161 * In case of the cancellation this promise will be rejected.
162 */
163 forEach(next: (item: T) => void, ct?: ICancellation): Promise<void>;
156 } No newline at end of file
164 }
@@ -1,191 +1,194
1 import { empty, observe, of } from "./observable";
1 import { empty, observe, of } from "./observable";
2 import * as tap from "tap";
2 import * as tap from "tap";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 import { delay } from "@implab/core-amd/safe";
4 import { delay } from "@implab/core-amd/safe";
5
5
6 const subj1 = observe<number>(({ next, complete }) => {
6 const subj1 = observe<number>(({ next, error, complete }) => {
7 next(1);
7 next(1);
8 complete();
8 complete();
9 next(2);
9 next(2);
10 next(3);
11 error(new Error("This error should be ignored"));
12 next(4);
10 });
13 });
11
14
12 const consumer1 = {
15 const consumer1 = {
13 sum: 0,
16 sum: 0,
14 next(v: number) {
17 next(v: number) {
15 this.sum += v;
18 this.sum += v;
16 }
19 }
17 };
20 };
18
21
19 subj1.subscribe(consumer1);
22 subj1.subscribe(consumer1);
20 tap.equal(consumer1.sum, 1, "Should get only one value");
23 tap.equal(consumer1.sum, 1, "Should get only one value");
21
24
22 subj1.subscribe(consumer1);
25 subj1.subscribe(consumer1);
23 tap.equal(consumer1.sum, 2, "Should get the value again");
26 tap.equal(consumer1.sum, 2, "Should get the value again");
24
27
25 const consumer2 = {
28 const consumer2 = {
26 value: 0,
29 value: 0,
27 completed: false,
30 completed: false,
28 next(v: number) { this.value = v; },
31 next(v: number) { this.value = v; },
29 complete() { this.completed = true; }
32 complete() { this.completed = true; }
30 };
33 };
31
34
32 let maps = 0;
35 let maps = 0;
33
36
34 subj1
37 subj1
35 .map(v => {
38 .map(v => {
36 tap.comment(`map1: ${v * 2}`);
39 tap.comment(`map1: ${v * 2}`);
37 maps++;
40 maps++;
38 return v * 2;
41 return v * 2;
39 })
42 })
40 .map(v => {
43 .map(v => {
41 tap.comment(`map2: ${v * 2}`);
44 tap.comment(`map2: ${v * 2}`);
42 maps++;
45 maps++;
43 return v * 2;
46 return v * 2;
44 })
47 })
45 .map(v => {
48 .map(v => {
46 tap.comment(`map3: ${v * 2}`);
49 tap.comment(`map3: ${v * 2}`);
47 maps++;
50 maps++;
48 return v * 2;
51 return v * 2;
49 })
52 })
50 .subscribe(consumer2);
53 .subscribe(consumer2);
51
54
52 tap.equal(consumer2.value, 8, "Should map");
55 tap.equal(consumer2.value, 8, "Should map");
53 tap.equal(maps, 3, "The map chain should not be executed after completion");
56 tap.equal(maps, 3, "The map chain should not be executed after completion");
54 tap.ok(consumer2.completed, "The completion signal should pass through");
57 tap.ok(consumer2.completed, "The completion signal should pass through");
55
58
56 const subj2 = observe<number>(({ next, complete }) => {
59 const subj2 = observe<number>(({ next, complete }) => {
57 [1, 2, 3, 4, 5].forEach(next);
60 [1, 2, 3, 4, 5].forEach(next);
58 complete();
61 complete();
59 return () => {
62 return () => {
60 tap.comment("subj2: unsubscribe");
63 tap.comment("subj2: unsubscribe");
61 };
64 };
62 });
65 });
63
66
64 const consumer3 = {
67 const consumer3 = {
65 even: 0,
68 even: 0,
66 odd: 0,
69 odd: 0,
67 completed: false,
70 completed: false,
68 subscribed: 0,
71 subscribed: 0,
69 unsubscribed: 0,
72 unsubscribed: 0,
70 next(v: "even" | "odd") {
73 next(v: "even" | "odd") {
71 this[v]++;
74 this[v]++;
72 },
75 },
73 complete() {
76 complete() {
74 this.completed = true;
77 this.completed = true;
75 },
78 },
76 subscribe() {
79 subscribe() {
77 this.subscribed++;
80 this.subscribed++;
78 },
81 },
79 unsubscribe() {
82 unsubscribe() {
80 this.unsubscribed++;
83 this.unsubscribed++;
81 }
84 }
82 };
85 };
83
86
84
87
85 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
88 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
86 consumer3.subscribe();
89 consumer3.subscribe();
87 let count = 0;
90 let count = 0;
88 const h = self.subscribe({
91 const h = self.subscribe({
89 next: val => {
92 next: val => {
90 if (val % 2 === 0)
93 if (val % 2 === 0)
91 next("odd");
94 next("odd");
92 else
95 else
93 next("even");
96 next("even");
94 if (++count === 4)
97 if (++count === 4)
95 complete();
98 complete();
96 },
99 },
97 complete,
100 complete,
98 error
101 error
99 });
102 });
100 return () => {
103 return () => {
101 consumer3.unsubscribe();
104 consumer3.unsubscribe();
102 h.unsubscribe();
105 h.unsubscribe();
103 };
106 };
104 }));
107 }));
105
108
106 subj3.subscribe(consumer3);
109 subj3.subscribe(consumer3);
107
110
108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
111 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
112 tap.equal(consumer3.even, 2, "Should get 2 even elements");
110 tap.ok(consumer3.completed, "The sequence should completed");
113 tap.ok(consumer3.completed, "The sequence should completed");
111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
114 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
115 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
113
116
114 subj2.reduce((a, b) => a + b).subscribe({
117 subj2.reduce((a, b) => a + b).subscribe({
115 next: val => tap.comment("subj2: reduce =", val),
118 next: val => tap.comment("subj2: reduce =", val),
116 complete: () => tap.comment("subj2: complete")
119 complete: () => tap.comment("subj2: complete")
117 });
120 });
118
121
119 tap.test("of(...) tests", async t => {
122 tap.test("of(...) tests", async t => {
120 await subj2.reduce((a, b) => a + b).next()
123 await subj2.reduce((a, b) => a + b).next()
121 .then(value => t.comment("subj2: next reduce=", value));
124 .then(value => t.comment("subj2: next reduce=", value));
122
125
123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
126 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124
127
125 const cancelled = new Cancellation(cancel => cancel());
128 const cancelled = new Cancellation(cancel => cancel());
126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
129 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127
130
128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
131 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129
132
130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
133 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 .reduce<number[]>((a, x) => [...a, x], [])
134 .reduce<number[]>((a, x) => [...a, x], [])
132 .next()
135 .next()
133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
136 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
134
137
135 const rejected = Promise.reject("DIE!");
138 const rejected = Promise.reject("DIE!");
136 rejected.catch(() => { }); // SAFE AND SOUND
139 rejected.catch(() => { }); // SAFE AND SOUND
137
140
138 await t.resolves(
141 await t.resolves(
139 of(Promise.resolve(1), rejected).next(),
142 of(Promise.resolve(1), rejected).next(),
140 "of(...) should emit non-rejected items"
143 "of(...) should emit non-rejected items"
141 );
144 );
142 await t.rejects(
145 await t.rejects(
143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
146 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 "of(...) should terminate with error when a parameter is rejected"
147 "of(...) should terminate with error when a parameter is rejected"
145 );
148 );
146
149
147 t.same(await of(1, 2, 3).collect(), [1, 2, 3], ".collect() should return the collected sequence");
150 t.same(await of(1, 2, 3).collect(), [1, 2, 3], ".collect() should return the collected sequence");
148 await t.rejects(of(1, 2, 3).collect(cancelled), ".collect() should support cancellation");
151 await t.rejects(of(1, 2, 3).collect(cancelled), ".collect() should support cancellation");
149
152
150 }).catch(() => { });
153 }).catch(() => { });
151
154
152 tap.test(".tap() tests", async t => {
155 tap.test(".tap() tests", async t => {
153 const side: number[] = [];
156 const side: number[] = [];
154
157
155 of(1, 2)
158 of(1, 2)
156 .tap({ next: v => side.push(v), complete: () => side.push(0) })
159 .tap({ next: v => side.push(v), complete: () => side.push(0) })
157 .tap({ next: v => side.push(v * v) })
160 .tap({ next: v => side.push(v * v) })
158 .subscribe({});
161 .subscribe({});
159
162
160 t.same(side, [1, 1, 2, 4, 0], ".tap() should be called in the order of registration");
163 t.same(side, [1, 1, 2, 4, 0], ".tap() should be called in the order of registration");
161
164
162 side.length = 0;
165 side.length = 0;
163
166
164 await new Promise<void>(resolve => {
167 await new Promise<void>(resolve => {
165 of(1, 2, delay(1).then(() => 3))
168 of(1, 2, delay(1).then(() => 3))
166 .tap({ next: v => side.push(v) })
169 .tap({ next: v => side.push(v) })
167 .tap({ next: v => v === 1 && resolve() })
170 .tap({ next: v => v === 1 && resolve() })
168 .subscribe({});
171 .subscribe({});
169 });
172 });
170
173
171 t.same(side, [1, 2], ".tap() should be processed synchronously");
174 t.same(side, [1, 2], ".tap() should be processed synchronously");
172
175
173 }).catch(() => { });
176 }).catch(() => { });
174
177
175 tap.test(".while() tests", async t => {
178 tap.test(".while() tests", async t => {
176
179
177 const seq = of(1, 2, 3, 4).while(v => v <= 2);
180 const seq = of(1, 2, 3, 4).while(v => v <= 2);
178
181
179 t.same(await seq.collect(), [1, 2], "Should collect only taken elements");
182 t.same(await seq.collect(), [1, 2], "Should collect only taken elements");
180
183
181 const data: number[] = [];
184 const data: number[] = [];
182 let complete = 0;
185 let complete = 0;
183 seq.subscribe({
186 seq.subscribe({
184 next: v => data.push(v),
187 next: v => data.push(v),
185 complete: () => complete++
188 complete: () => complete++
186 });
189 });
187
190
188 t.same(data, [1, 2], "Should receive only taken elements");
191 t.same(data, [1, 2], "Should receive only taken elements");
189 t.equal(complete, 1, "Complete should run once");
192 t.equal(complete, 1, "Complete should run once");
190
193
191 }).catch(() => { }); No newline at end of file
194 }).catch(() => { });
General Comments 0
You need to be logged in to leave comments. Login now