|
|
import { id as mid } from "module";
|
|
|
import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces";
|
|
|
import { TraceSource } from "@implab/core-amd/log/TraceSource";
|
|
|
import { Predicate } from "@implab/core-amd/interfaces";
|
|
|
import { Cancellation } from "@implab/core-amd/Cancellation";
|
|
|
|
|
|
/** Trace source for this module, obtained from `TraceSource` by the module id. */
const trace = TraceSource.get(mid);

/**
 * Shared do-nothing callback. Other code in this file compares against it by
 * identity (`=== noop`) to detect "no handler installed".
 */
const noop = (): void => { /* intentionally empty */ };

/** Reports an error to the module trace; used when a consumer has no `error` handler. */
const errorFallback = (e: unknown) =>
    trace.error("Unhandled observable error: {0}", e);
|
|
|
|
|
|
/**
 * Creates the state shared by a single subscription: a one-shot `done` flag
 * and a chain of teardown callbacks.
 *
 * @param reject invoked with the error the first time `error` closes the
 *  subscription.
 * @param ct cancellation token; when `ct.isSupported()` is true, `error` is
 *  registered with it and the registration handle is destroyed on teardown.
 * @returns
 *  - `finalize()` closes the subscription and runs the teardown chain; returns
 *    `true` only for the call that actually closed it, `false` afterwards.
 *  - `isClosed()` reports whether the subscription has been closed.
 *  - `teardown(cb)` appends `cb` to the teardown chain, or runs it immediately
 *    when the subscription is already closed; falsy `cb` is ignored.
 *  - `error(err)` closes the subscription and, if this call closed it,
 *    forwards `err` to `reject`.
 */
const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => {
    let done = false;
    let cleanup = noop;

    const finalize = () => {
        if (done)
            return false;

        // Mark the subscription closed BEFORE running the teardown chain so that
        // - isClosed() is already true while teardown callbacks run,
        // - a re-entrant finalize() is a no-op instead of running cleanup again,
        // - teardown() called from inside a callback runs its argument at once
        //   instead of appending to a chain that will never be invoked,
        // - a throwing callback cannot leave the subscription open.
        done = true;

        const cb = cleanup;
        cleanup = noop; // drop references held by the chain
        cb();
        return true;
    };

    const isClosed = () => done;

    const teardown = (cb: void | (() => void)) => {
        if (cb) {
            if (done) {
                cb();
            } else {
                const _prev = cleanup;
                // callbacks run in registration order
                cleanup = _prev === noop ? cb : () => (_prev(), cb());
            }
        }
    };

    const error = (err: unknown) => finalize() && reject(err);

    if (ct.isSupported()) {
        const h = ct.register(error);
        teardown(() => h.destroy());
    }

    return { finalize, isClosed, teardown, error };
};
|
|
|
|
|
|
/** A `Sink<T>` without its `isClosed` member. */
type FusedSink<T> = Omit<Sink<T>, "isClosed">;
|
|
|
|
|
|
/**
 * Wraps a raw producer so that it can be started with a sink lacking
 * `isClosed`. The returned function creates a fresh subscription state on
 * every call, hands the producer a guarded sink and returns an object whose
 * `unsubscribe` closes that subscription.
 *
 * Guarantees provided by the guarded sink, as implemented below:
 * - `next` is not forwarded once the subscription is closed;
 * - `complete` reaches the consumer only if that call closed the subscription;
 * - `error` goes to `consumer.error` when present, otherwise to `errorFallback`.
 *
 * @param producer function that is started with the guarded sink; its return
 *  value is registered as a teardown.
 */
export const fuse = <T>(producer: Producer<T>) => (consumer: FusedSink<T>) => {
    // Errors are routed to the consumer when it can handle them, logged otherwise.
    const onFailure = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);

    const state = subscription(onFailure);

    const deliver = consumer.next ? consumer.next.bind(consumer) : noop;

    const guarded = {
        // keep `noop` itself when nobody listens so downstream identity checks still work
        next: deliver === noop ? noop : ((v: T) => state.isClosed() || deliver(v)),
        error: state.error,
        complete: () => state.finalize() && consumer.complete && consumer.complete(),
        isClosed: state.isClosed
    };

    state.teardown(producer(guarded));

    return { unsubscribe: state.finalize };
};
|
|
|
|
|
|
/**
 * A producer that is started with a `FusedSink` and may return a function;
 * code in this file registers that return value as a teardown callback.
 */
type FusedProducer<T> = (consumer: FusedSink<T>) => void | (() => void);
|
|
|
|
|
|
/**
 * `Observable` implementation driven by a producer function.
 *
 * The producer is started for every subscription with a sink and may return a
 * teardown callback. Operators (`map`, `filter`, `scan`, ...) do not subscribe
 * by themselves: each returns a new `ObservableImpl` whose producer wraps the
 * downstream sink and delegates to this observable's producer.
 *
 * Several operators pass `noop` through unchanged when the downstream `next`
 * is `noop`, so that no per-value work is done when nobody listens for values.
 */
export class ObservableImpl<T> implements Observable<T> {

    private readonly _producer: FusedProducer<T>;

    /**
     * @param producer function started on every subscription.
     */
    constructor(producer: FusedProducer<T>) {
        this._producer = producer;
    }

    /**
     * Starts the producer and routes its notifications to `consumer`.
     *
     * Values are not delivered after the subscription is closed, `complete`
     * is delivered at most once, and errors go to `consumer.error` when
     * present or to `errorFallback` otherwise.
     *
     * @param consumer observer; every handler is optional.
     * @returns an object whose `unsubscribe` closes the subscription.
     */
    subscribe(consumer: Observer<T> = {}) {
        const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
        const next = consumer.next ? consumer.next.bind(consumer) : noop;

        const { teardown, isClosed, finalize, error } = subscription(reject);

        // `(0, fn)(...)` calls the producer without binding `this` to the observable
        teardown((0, this._producer)({
            next: next !== noop ? (v => isClosed() || next(v)) : noop,
            error,
            complete: () => finalize() && consumer.complete && consumer.complete()
        }));

        return { unsubscribe: finalize };
    }

    /**
     * Returns an observable emitting `mapper(value)` for every source value.
     */
    map<T2>(mapper: (value: T) => T2) {
        return new ObservableImpl<T2>(({ next, ...rest }) =>
            this._producer({
                next: next !== noop ? (v: T) => next(mapper(v)) : noop,
                ...rest
            }));
    }

    /**
     * Returns an observable that invokes the supplied handlers before
     * forwarding each notification downstream. Missing handlers are skipped.
     */
    tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
        return new ObservableImpl<T>(({ next, complete, error }) =>
            this._producer({
                next: tapNext ? (v => (tapNext(v), next(v))) : next,
                complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
                error: tapError ? (e => (tapError(e), error(e))) : error
            }));
    }

    /**
     * Returns an observable emitting only the values for which `predicate`
     * returns a truthy result.
     */
    filter(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, ...rest }) =>
            this._producer({
                next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
                ...rest
            })
        );
    }

    /**
     * Returns an observable that forwards values until `predicate` matches;
     * the matching value is not emitted and the sequence completes instead.
     */
    until(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, complete, ...rest }) =>
            this._producer({
                next: v => predicate(v) ? complete() : next(v),
                complete,
                ...rest
            })
        );
    }

    /**
     * Returns an observable that forwards values while `predicate` matches;
     * the first non-matching value is not emitted and the sequence completes
     * instead.
     */
    while(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, complete, ...rest }) =>
            this._producer({
                next: v => predicate(v) ? next(v) : complete(),
                complete,
                ...rest
            })
        );
    }

    /**
     * Emits the running accumulation for every source value.
     *
     * With `initial`, the first emitted value is `accumulator(initial, v0)`.
     * Without it, the first source value is emitted as is and becomes the
     * accumulator for the following values.
     */
    scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
    scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
        if (args.length === 1) {
            return new ObservableImpl<T | A>(({ next, ...rest }) => {
                const [accumulator] = args;
                let _acc: T;
                let index = 0;
                return this._producer({
                    next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
                    ...rest
                });
            });
        } else {
            return new ObservableImpl<T | A>(({ next, ...rest }) => {
                const [accumulator, initial] = args;
                let _acc = initial;
                return this._producer({
                    next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
                    ...rest
                });
            });
        }
    }

    /**
     * Folds the whole sequence and emits the single result when the source
     * completes, followed by `complete`.
     *
     * With `initial`, an empty source yields `initial`. Without it, the first
     * source value seeds the accumulator and an empty source is reported
     * through `error` with `Error("The sequence can't be empty")`.
     */
    reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
    reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {

        if (args.length === 1) {
            return new ObservableImpl<T | A>(({ next, complete, error }) => {
                const [accumulator] = args;
                let _acc: T;
                let index = 0;
                return this._producer({
                    // When nobody listens for the result the fold itself is
                    // skipped, but values must still be counted: `index` is
                    // what tells an empty source from a non-empty one below.
                    next: next !== noop ? (v: T) => {
                        _acc = index++ === 0 ? v : accumulator(_acc, v);
                    } : () => {
                        index++;
                    },
                    complete: () => {
                        if (index === 0) {
                            error(new Error("The sequence can't be empty"));
                        } else {
                            next(_acc);
                            complete();
                        }
                    },
                    error
                });
            });
        } else {
            return new ObservableImpl<T | A>(({ next, complete, error }) => {
                const [accumulator, initial] = args;
                let _acc = initial;
                return this._producer({
                    next: next !== noop ? (v: T) => {
                        _acc = accumulator(_acc, v);
                    } : noop,
                    complete: () => {
                        next(_acc);
                        complete();
                    },
                    error
                });
            });
        }
    }

    /**
     * Concatenates this observable with `seq`: each source is subscribed only
     * after the previous one completes, and the resulting sequence completes
     * after the last source does.
     *
     * Unsubscribing releases the source that is active at that moment. A
     * source that has completed is released before the next one is started.
     */
    cat(...seq: Subscribable<T>[]) {
        return new ObservableImpl<T>(({ next, complete: final, ...rest }) => {
            const len = seq.length;

            // teardown of the source currently feeding the sink
            let cleanup: () => void = noop;
            // position of that source: -1 is this observable, 0..len-1 index `seq`
            let current = -1;
            // set once the downstream subscription has been torn down
            let stopped = false;

            // Runs and forgets the teardown of the active source.
            const release = () => {
                const cb = cleanup;
                cleanup = noop;
                cb();
            };

            // Registers the teardown of source `i`. A source may complete
            // synchronously, i.e. before its teardown is known; by then the
            // chain has already moved on (or stopped), so the late teardown is
            // run right away instead of overwriting the active source's one.
            const track = (i: number, cb: () => void) => {
                if (stopped || current !== i)
                    cb();
                else
                    cleanup = cb;
            };

            const complete = (i: number) => i < len ?
                () => {
                    // ignore completions arriving after unsubscribe
                    if (stopped)
                        return;
                    current = i;
                    release();
                    const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
                    track(i, subscription.unsubscribe.bind(subscription));
                } : final;

            track(-1, this._producer({ next, complete: complete(0), ...rest }) ?? noop);

            return () => {
                stopped = true;
                release();
            };
        });
    }

    /**
     * Applies `op` to this observable and returns its result.
     */
    pipe<U>(op: OperatorFn<T, U>): Observable<U> {
        return op(this);
    }

    /**
     * Subscribes and returns a promise for the first value of the sequence.
     *
     * The subscription is closed as soon as the promise settles. The promise
     * rejects with `Error("The sequence is empty")` when the source completes
     * without a value, with the source error when it fails, and with whatever
     * `ct.throwIfRequested()` throws.
     *
     * @param ct cancellation token passed to the subscription state.
     */
    next(ct = Cancellation.none) {
        return new Promise<T>((resolve, reject) => {
            ct.throwIfRequested();

            const { teardown, finalize, isClosed, error } = subscription(reject, ct);
            teardown((0, this._producer)({
                next: (item: T) => finalize() && resolve(item),
                complete: () => finalize() && reject(new Error("The sequence is empty")),
                error,
                isClosed
            }));

        });
    }

    /**
     * Subscribes and returns a promise for an array of all values, resolved
     * when the source completes. Rejects with the source error or with
     * whatever `ct.throwIfRequested()` throws.
     *
     * @param ct cancellation token passed to the subscription state.
     */
    collect(ct = Cancellation.none) {
        return new Promise<T[]>((resolve, reject) => {
            ct.throwIfRequested();

            const acc: T[] = [];

            const { teardown, finalize, isClosed, error } = subscription(reject, ct);
            teardown((0, this._producer)({
                next: (item: T) => isClosed() ? void (0) : acc.push(item),
                complete: () => finalize() && resolve(acc),
                error,
                isClosed
            }));
        });
    }

    /**
     * Subscribes and calls `next` for every value; the returned promise
     * resolves when the source completes and rejects with the source error or
     * with whatever `ct.throwIfRequested()` throws.
     *
     * @param next callback invoked for each value while the subscription is open.
     * @param ct cancellation token passed to the subscription state.
     */
    forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            ct.throwIfRequested();

            const { teardown, finalize, isClosed, error } = subscription(reject, ct);

            teardown((0, this._producer)({
                next: item => !isClosed() && next(item),
                complete: () => finalize() && resolve(),
                error,
                isClosed
            }));
        });
    }

}
|