##// END OF EJS Templates
WIP observables
cin -
r157:f9518367061a default
parent child
Show More
@@ -1,5 +1,5
1 import { isPromise } from "@implab/core-amd/safe";
1 import { isPromise } from "@implab/core-amd/safe";
2 import { Unsubscribable, Producer, FusedSink, Observable, Subscribable } from "./observable/interfaces";
2 import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces";
3 import { ObservableImpl } from "./observable/ObservableImpl";
3 import { ObservableImpl } from "./observable/ObservableImpl";
4
4
5 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
5 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
@@ -8,46 +8,10 export const isUnsubscribable = (v: unkn
8 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
8 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
9 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
9 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
10
10
11 const noop = () => { };
11 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(producer);
12
13 /** Wraps the producer to handle tear down logic and subscription management
14 *
15 * The resulting producer will invoke cleanup logic on error or complete events
16 * and will prevent calling of any method from the sink.
17 *
18 * @param producer The producer to wrap
19 * @returns The wrapper producer
20 */
21 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
22 let done = false;
23 let cleanup = noop;
24
25 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
26 (...args: A) => done ?
27 void (0) :
28 (done = true, cleanup(), fn(...args));
29
30 const _fin0 = () => done ? void (0) : (done = true, cleanup());
31
32 const safeSink = {
33 next: (value: T) => { !done && next(value); },
34 error: _fin(error),
35 complete: _fin(complete),
36 isClosed: () => done
37 };
38 // call the producer
39 cleanup = producer(safeSink) ?? noop;
40 // if the producer throws exception bypass it to the caller rather then to
41 // the sink. This is a feature.
42
43 // if the producer completed the sequence immediately call the cleanup in place
44 return done ? cleanup() : _fin0;
45 };
46
47 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer));
48
12
49 /** Converts an array to the observable sequence of its elements. */
13 /** Converts an array to the observable sequence of its elements. */
50 export const ofArray = <T>(items: T[]) => new ObservableImpl<T>(
14 export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>(
51 ({ next, complete }) => (
15 ({ next, complete }) => (
52 items.forEach(next),
16 items.forEach(next),
53 complete()
17 complete()
@@ -100,6 +64,6 export const of = <T>(...items: (T | Pro
100 }
64 }
101 );
65 );
102
66
103 export const empty = new ObservableImpl<never>(({ complete }) => complete());
67 export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete());
104
68
105 export type * from "./observable/interfaces"; No newline at end of file
69 export type * from "./observable/interfaces";
@@ -1,5 +1,5
1 import { id as mid } from "module";
1 import { id as mid } from "module";
2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Subscribable } from "./interfaces";
2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces";
3 import { TraceSource } from "@implab/core-amd/log/TraceSource";
3 import { TraceSource } from "@implab/core-amd/log/TraceSource";
4 import { Predicate } from "@implab/core-amd/interfaces";
4 import { Predicate } from "@implab/core-amd/interfaces";
5 import { Cancellation } from "@implab/core-amd/Cancellation";
5 import { Cancellation } from "@implab/core-amd/Cancellation";
@@ -10,7 +10,7 const noop = () => { };
10
10
11 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
11 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
12
12
13 const subscription = () => {
13 const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => {
14 let done = false;
14 let done = false;
15 let cleanup = noop;
15 let cleanup = noop;
16
16
@@ -23,30 +23,60 const subscription = () => {
23 cb();
23 cb();
24 } else {
24 } else {
25 const _prev = cleanup;
25 const _prev = cleanup;
26 cleanup = () => (_prev(), cb());
26 cleanup = _prev === noop ? cb : () => (_prev(), cb());
27 }
27 }
28 }
28 }
29 };
29 };
30
30
31 return { finalize, isClosed, teardown };
31 const error = (err: unknown) => finalize() && reject(err);
32
33 if (ct.isSupported()) {
34 const h = ct.register(error);
35 teardown(() => h.destroy());
36 }
37
38 return { finalize, isClosed, teardown, error };
32 };
39 };
33
40
41 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
42
43 export const fuse = <T>(producer: Producer<T>) => (consumer: FusedSink<T>) => {
44
45 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
46 const { teardown, isClosed, finalize, error } = subscription(reject);
47
48 const next = consumer.next ? consumer.next.bind(consumer) : noop;
49
50 teardown(producer({
51 next: next !== noop ? (v => isClosed() || next(v)) : noop,
52 error,
53 complete: () => finalize() && consumer.complete && consumer.complete(),
54 isClosed
55 }));
56
57 return { unsubscribe: finalize };
58 };
59
60 type FusedProducer<T> = (consumer: FusedSink<T>) => void | (() => void);
61
34 export class ObservableImpl<T> implements Observable<T> {
62 export class ObservableImpl<T> implements Observable<T> {
35
63
36 private readonly _producer: Producer<T>;
64 private readonly _producer: FusedProducer<T>;
37
65
38 constructor(producer: Producer<T>) {
66 constructor(producer: FusedProducer<T>) {
39 this._producer = producer;
67 this._producer = producer;
40 }
68 }
41
69
42 subscribe(consumer: Observer<T> = {}) {
70 subscribe(consumer: Observer<T> = {}) {
43 const { teardown, isClosed, finalize } = subscription();
71 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
72 const next = consumer.next ? consumer.next.bind(consumer) : noop;
73
74 const { teardown, isClosed, finalize, error } = subscription(reject);
44
75
45 teardown(this._producer({
76 teardown((0, this._producer)({
46 next: consumer.next ? consumer.next.bind(consumer) : noop,
77 next: next !== noop ? (v => isClosed() || next(v)) : noop,
47 error: err => finalize() && (consumer.error ? consumer.error(err) : errorFallback(err)),
78 error,
48 complete: () => finalize() && consumer.complete && consumer.complete(),
79 complete: () => finalize() && consumer.complete && consumer.complete()
49 isClosed
50 }));
80 }));
51
81
52 return { unsubscribe: finalize };
82 return { unsubscribe: finalize };
@@ -61,12 +91,11 export class ObservableImpl<T> implement
61 }
91 }
62
92
63 tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
93 tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
64 return new ObservableImpl<T>(({ next, complete, error, isClosed }) =>
94 return new ObservableImpl<T>(({ next, complete, error }) =>
65 this._producer({
95 this._producer({
66 next: tapNext ? (v => (tapNext(v), next(v))) : next,
96 next: tapNext ? (v => (tapNext(v), next(v))) : next,
67 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
97 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
68 error: tapError ? (e => (tapError(e), error(e))) : error,
98 error: tapError ? (e => (tapError(e), error(e))) : error
69 isClosed
70 }));
99 }));
71 }
100 }
72
101
@@ -99,6 +128,8 export class ObservableImpl<T> implement
99 );
128 );
100 }
129 }
101
130
131 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
132 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
102 scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
133 scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
103 if (args.length === 1) {
134 if (args.length === 1) {
104 return new ObservableImpl<T | A>(({ next, ...rest }) => {
135 return new ObservableImpl<T | A>(({ next, ...rest }) => {
@@ -122,10 +153,12 export class ObservableImpl<T> implement
122 }
153 }
123 }
154 }
124
155
156 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
157 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
125 reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
158 reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
126
159
127 if (args.length === 1) {
160 if (args.length === 1) {
128 return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => {
161 return new ObservableImpl<T | A>(({ next, complete, error }) => {
129 const [accumulator] = args;
162 const [accumulator] = args;
130 let _acc: T;
163 let _acc: T;
131 let index = 0;
164 let index = 0;
@@ -141,12 +174,11 export class ObservableImpl<T> implement
141 complete();
174 complete();
142 }
175 }
143 },
176 },
144 error,
177 error
145 isClosed
146 });
178 });
147 });
179 });
148 } else {
180 } else {
149 return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => {
181 return new ObservableImpl<T | A>(({ next, complete, error }) => {
150 const [accumulator, initial] = args;
182 const [accumulator, initial] = args;
151 let _acc = initial;
183 let _acc = initial;
152 return this._producer({
184 return this._producer({
@@ -157,8 +189,7 export class ObservableImpl<T> implement
157 next(_acc);
189 next(_acc);
158 complete();
190 complete();
159 },
191 },
160 error,
192 error
161 isClosed
162 });
193 });
163 });
194 });
164 }
195 }
@@ -188,11 +219,7 export class ObservableImpl<T> implement
188 return new Promise<T>((resolve, reject) => {
219 return new Promise<T>((resolve, reject) => {
189 ct.throwIfRequested();
220 ct.throwIfRequested();
190
221
191 const { teardown, finalize, isClosed } = subscription();
222 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
192
193 const error = (err: unknown) => finalize() && reject(err);
194 const h = ct.register(error);
195 teardown(() => h.destroy());
196 teardown((0, this._producer)({
223 teardown((0, this._producer)({
197 next: (item: T) => finalize() && resolve(item),
224 next: (item: T) => finalize() && resolve(item),
198 complete: () => finalize() && reject(new Error("The sequence is empty")),
225 complete: () => finalize() && reject(new Error("The sequence is empty")),
@@ -209,11 +236,7 export class ObservableImpl<T> implement
209
236
210 const acc: T[] = [];
237 const acc: T[] = [];
211
238
212 const { teardown, finalize, isClosed } = subscription();
239 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
213 const error = (err: unknown) => finalize() && reject(err);
214
215 const h = ct.register(error);
216 teardown(() => h.destroy());
217 teardown((0, this._producer)({
240 teardown((0, this._producer)({
218 next: (item: T) => isClosed() ? void (0) : acc.push(item),
241 next: (item: T) => isClosed() ? void (0) : acc.push(item),
219 complete: () => finalize() && resolve(acc),
242 complete: () => finalize() && resolve(acc),
@@ -223,4 +246,19 export class ObservableImpl<T> implement
223 });
246 });
224 }
247 }
225
248
249 forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
250 return new Promise<void>((resolve, reject) => {
251 ct.throwIfRequested();
252
253 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
254
255 teardown((0, this._producer)({
256 next: item => !isClosed() && next(item),
257 complete: () => finalize() && resolve(),
258 error,
259 isClosed
260 }));
261 });
262 }
263
226 } No newline at end of file
264 }
@@ -52,10 +52,6 export type Sink<T> = {
52
52
53 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
53 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
54
54
55 export type FusedSink<T> = Omit<Sink<T>, "isClosed">;
56
57 export type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
58
59 export interface Unsubscribable {
55 export interface Unsubscribable {
60 unsubscribe(): void;
56 unsubscribe(): void;
61 }
57 }
@@ -153,4 +149,16 export interface Observable<T> extends S
153
149
154 /** Collects items of the sequence to the array. */
150 /** Collects items of the sequence to the array. */
155 collect(ct?: ICancellation): Promise<T[]>;
151 collect(ct?: ICancellation): Promise<T[]>;
152
153 /**
154 * Iterates through the elements in this observable until the end of the
155 * sequence is reached or the operation is cancelled.
156 *
157 * @param next The callback for the next item in the sequence
158 * @param ct The optional cancellation token for this operation
159 *
160 * @returns A promise which will be fulfilled when the operation is completed.
161 * In case of the cancellation this promise will be rejected.
162 */
163 forEach(next: (item: T) => void, ct?: ICancellation): Promise<void>;
156 } No newline at end of file
164 }
@@ -3,10 +3,13 import * as tap from "tap";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 import { delay } from "@implab/core-amd/safe";
4 import { delay } from "@implab/core-amd/safe";
5
5
6 const subj1 = observe<number>(({ next, complete }) => {
6 const subj1 = observe<number>(({ next, error, complete }) => {
7 next(1);
7 next(1);
8 complete();
8 complete();
9 next(2);
9 next(2);
10 next(3);
11 error(new Error("This error should be ignored"));
12 next(4);
10 });
13 });
11
14
12 const consumer1 = {
15 const consumer1 = {
General Comments 0
You need to be logged in to leave comments. Login now