##// END OF EJS Templates
WIP observables
cin -
r157:f9518367061a default
parent child
Show More
@@ -1,5 +1,5
1 1 import { isPromise } from "@implab/core-amd/safe";
2 import { Unsubscribable, Producer, FusedSink, Observable, Subscribable } from "./observable/interfaces";
2 import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces";
3 3 import { ObservableImpl } from "./observable/ObservableImpl";
4 4
5 5 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
@@ -8,46 +8,10 export const isUnsubscribable = (v: unkn
8 8 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
9 9 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
10 10
11 const noop = () => { };
12
13 /** Wraps the producer to handle tear down logic and subscription management
14 *
15 * The resulting producer will invoke cleanup logic on error or complete events
16 * and will prevent calling of any method from the sink.
17 *
18 * @param producer The producer to wrap
19 * @returns The wrapper producer
20 */
21 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
22 let done = false;
23 let cleanup = noop;
24
25 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
26 (...args: A) => done ?
27 void (0) :
28 (done = true, cleanup(), fn(...args));
29
30 const _fin0 = () => done ? void (0) : (done = true, cleanup());
31
32 const safeSink = {
33 next: (value: T) => { !done && next(value); },
34 error: _fin(error),
35 complete: _fin(complete),
36 isClosed: () => done
37 };
38 // call the producer
39 cleanup = producer(safeSink) ?? noop;
40 // if the producer throws exception bypass it to the caller rather then to
41 // the sink. This is a feature.
42
43 // if the producer completed the sequence immediately call the cleanup in place
44 return done ? cleanup() : _fin0;
45 };
46
47 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer));
11 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(producer);
48 12
49 13 /** Converts an array to the observable sequence of its elements. */
50 export const ofArray = <T>(items: T[]) => new ObservableImpl<T>(
14 export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>(
51 15 ({ next, complete }) => (
52 16 items.forEach(next),
53 17 complete()
@@ -100,6 +64,6 export const of = <T>(...items: (T | Pro
100 64 }
101 65 );
102 66
103 export const empty = new ObservableImpl<never>(({ complete }) => complete());
67 export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete());
104 68
105 69 export type * from "./observable/interfaces"; No newline at end of file
@@ -1,5 +1,5
1 1 import { id as mid } from "module";
2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Subscribable } from "./interfaces";
2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces";
3 3 import { TraceSource } from "@implab/core-amd/log/TraceSource";
4 4 import { Predicate } from "@implab/core-amd/interfaces";
5 5 import { Cancellation } from "@implab/core-amd/Cancellation";
@@ -10,7 +10,7 const noop = () => { };
10 10
11 11 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
12 12
13 const subscription = () => {
13 const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => {
14 14 let done = false;
15 15 let cleanup = noop;
16 16
@@ -23,30 +23,60 const subscription = () => {
23 23 cb();
24 24 } else {
25 25 const _prev = cleanup;
26 cleanup = () => (_prev(), cb());
26 cleanup = _prev === noop ? cb : () => (_prev(), cb());
27 27 }
28 28 }
29 29 };
30 30
31 return { finalize, isClosed, teardown };
31 const error = (err: unknown) => finalize() && reject(err);
32
33 if (ct.isSupported()) {
34 const h = ct.register(error);
35 teardown(() => h.destroy());
36 }
37
38 return { finalize, isClosed, teardown, error };
32 39 };
33 40
41 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
42
43 export const fuse = <T>(producer: Producer<T>) => (consumer: FusedSink<T>) => {
44
45 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
46 const { teardown, isClosed, finalize, error } = subscription(reject);
47
48 const next = consumer.next ? consumer.next.bind(consumer) : noop;
49
50 teardown(producer({
51 next: next !== noop ? (v => isClosed() || next(v)) : noop,
52 error,
53 complete: () => finalize() && consumer.complete && consumer.complete(),
54 isClosed
55 }));
56
57 return { unsubscribe: finalize };
58 };
59
60 type FusedProducer<T> = (consumer: FusedSink<T>) => void | (() => void);
61
34 62 export class ObservableImpl<T> implements Observable<T> {
35 63
36 private readonly _producer: Producer<T>;
64 private readonly _producer: FusedProducer<T>;
37 65
38 constructor(producer: Producer<T>) {
66 constructor(producer: FusedProducer<T>) {
39 67 this._producer = producer;
40 68 }
41 69
42 70 subscribe(consumer: Observer<T> = {}) {
43 const { teardown, isClosed, finalize } = subscription();
71 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
72 const next = consumer.next ? consumer.next.bind(consumer) : noop;
73
74 const { teardown, isClosed, finalize, error } = subscription(reject);
44 75
45 teardown(this._producer({
46 next: consumer.next ? consumer.next.bind(consumer) : noop,
47 error: err => finalize() && (consumer.error ? consumer.error(err) : errorFallback(err)),
48 complete: () => finalize() && consumer.complete && consumer.complete(),
49 isClosed
76 teardown((0, this._producer)({
77 next: next !== noop ? (v => isClosed() || next(v)) : noop,
78 error,
79 complete: () => finalize() && consumer.complete && consumer.complete()
50 80 }));
51 81
52 82 return { unsubscribe: finalize };
@@ -61,12 +91,11 export class ObservableImpl<T> implement
61 91 }
62 92
63 93 tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
64 return new ObservableImpl<T>(({ next, complete, error, isClosed }) =>
94 return new ObservableImpl<T>(({ next, complete, error }) =>
65 95 this._producer({
66 96 next: tapNext ? (v => (tapNext(v), next(v))) : next,
67 97 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
68 error: tapError ? (e => (tapError(e), error(e))) : error,
69 isClosed
98 error: tapError ? (e => (tapError(e), error(e))) : error
70 99 }));
71 100 }
72 101
@@ -99,6 +128,8 export class ObservableImpl<T> implement
99 128 );
100 129 }
101 130
131 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
132 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
102 133 scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
103 134 if (args.length === 1) {
104 135 return new ObservableImpl<T | A>(({ next, ...rest }) => {
@@ -122,10 +153,12 export class ObservableImpl<T> implement
122 153 }
123 154 }
124 155
156 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
157 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
125 158 reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
126 159
127 160 if (args.length === 1) {
128 return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => {
161 return new ObservableImpl<T | A>(({ next, complete, error }) => {
129 162 const [accumulator] = args;
130 163 let _acc: T;
131 164 let index = 0;
@@ -141,12 +174,11 export class ObservableImpl<T> implement
141 174 complete();
142 175 }
143 176 },
144 error,
145 isClosed
177 error
146 178 });
147 179 });
148 180 } else {
149 return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => {
181 return new ObservableImpl<T | A>(({ next, complete, error }) => {
150 182 const [accumulator, initial] = args;
151 183 let _acc = initial;
152 184 return this._producer({
@@ -157,8 +189,7 export class ObservableImpl<T> implement
157 189 next(_acc);
158 190 complete();
159 191 },
160 error,
161 isClosed
192 error
162 193 });
163 194 });
164 195 }
@@ -188,11 +219,7 export class ObservableImpl<T> implement
188 219 return new Promise<T>((resolve, reject) => {
189 220 ct.throwIfRequested();
190 221
191 const { teardown, finalize, isClosed } = subscription();
192
193 const error = (err: unknown) => finalize() && reject(err);
194 const h = ct.register(error);
195 teardown(() => h.destroy());
222 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
196 223 teardown((0, this._producer)({
197 224 next: (item: T) => finalize() && resolve(item),
198 225 complete: () => finalize() && reject(new Error("The sequence is empty")),
@@ -209,11 +236,7 export class ObservableImpl<T> implement
209 236
210 237 const acc: T[] = [];
211 238
212 const { teardown, finalize, isClosed } = subscription();
213 const error = (err: unknown) => finalize() && reject(err);
214
215 const h = ct.register(error);
216 teardown(() => h.destroy());
239 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
217 240 teardown((0, this._producer)({
218 241 next: (item: T) => isClosed() ? void (0) : acc.push(item),
219 242 complete: () => finalize() && resolve(acc),
@@ -223,4 +246,19 export class ObservableImpl<T> implement
223 246 });
224 247 }
225 248
249 forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
250 return new Promise<void>((resolve, reject) => {
251 ct.throwIfRequested();
252
253 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
254
255 teardown((0, this._producer)({
256 next: item => !isClosed() && next(item),
257 complete: () => finalize() && resolve(),
258 error,
259 isClosed
260 }));
261 });
262 }
263
226 264 } No newline at end of file
@@ -52,10 +52,6 export type Sink<T> = {
52 52
53 53 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
54 54
55 export type FusedSink<T> = Omit<Sink<T>, "isClosed">;
56
57 export type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
58
59 55 export interface Unsubscribable {
60 56 unsubscribe(): void;
61 57 }
@@ -153,4 +149,16 export interface Observable<T> extends S
153 149
154 150 /** Collects items of the sequence to the array. */
155 151 collect(ct?: ICancellation): Promise<T[]>;
152
153 /**
154 * Iterates through the elements in this observable until the end of the
155 * sequence is reached or the operation is cancelled.
156 *
157 * @param next The callback for the next item in the sequence
158 * @param ct The optional cancellation token for this operation
159 *
160 * @returns A promise which will be fulfilled when the operation is completed.
161 * In case of the cancellation this promise will be rejected.
162 */
163 forEach(next: (item: T) => void, ct?: ICancellation): Promise<void>;
156 164 } No newline at end of file
@@ -3,10 +3,13 import * as tap from "tap";
3 3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 4 import { delay } from "@implab/core-amd/safe";
5 5
6 const subj1 = observe<number>(({ next, complete }) => {
6 const subj1 = observe<number>(({ next, error, complete }) => {
7 7 next(1);
8 8 complete();
9 9 next(2);
10 next(3);
11 error(new Error("This error should be ignored"));
12 next(4);
10 13 });
11 14
12 15 const consumer1 = {
General Comments 0
You need to be logged in to leave comments. Login now