##// END OF EJS Templates
Added tag v1.10.2 for changeset 6acbe6efbe20
Added tag v1.10.2 for changeset 6acbe6efbe20

File last commit:

r155:6acbe6efbe20 v1.10.2 default
r156:46e0da3cebdf default
Show More
ObservableImpl.ts
225 lines | 7.4 KiB | video/mp2t | TypeScriptLexer
import { id as mid } from "module";
import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Subscribable } from "./interfaces";
import { TraceSource } from "@implab/core-amd/log/TraceSource";
import { Predicate } from "@implab/core-amd/interfaces";
import { Cancellation } from "@implab/core-amd/Cancellation";
/** Trace source registered under this module's id; used to report unhandled errors. */
const trace = TraceSource.get(mid);

/**
 * Shared do-nothing callback.
 *
 * It is also used as a sentinel: operators compare a sink's `next` against
 * this exact function to detect that nobody listens for values.
 */
function noop() { }

/**
 * Last-resort error handler used when a consumer supplies no `error` callback.
 *
 * @param e The error that nobody handled.
 * @returns Whatever the trace source returns from `error()`.
 */
function errorFallback(e: unknown) {
    return trace.error("Unhandled observable error: {0}", e);
}
/**
 * Creates the bookkeeping state for a single subscription.
 *
 * The returned object exposes three functions:
 *
 * - `finalize()` closes the subscription and runs the registered teardown
 *   callbacks in registration order. Returns `true` on the call which
 *   actually closed the subscription and `false` on every later call.
 * - `isClosed()` reports whether `finalize()` has been called.
 * - `teardown(cb)` registers a cleanup callback. A falsy `cb` is ignored.
 *   When the subscription is already closed the callback runs immediately.
 */
const subscription = () => {
    let done = false;
    let pending: (() => void)[] = [];

    const finalize = () => {
        if (done)
            return false;

        // Mark the subscription closed BEFORE running the callbacks: a
        // teardown callback may re-enter finalize() (e.g. by unsubscribing or
        // signalling an error) and must observe the closed state instead of
        // triggering the cleanup again.
        done = true;

        // Detach the list so the callbacks can be collected afterwards.
        const callbacks = pending;
        pending = [];
        for (const cb of callbacks)
            cb();

        return true;
    };

    const isClosed = () => done;

    const teardown = (cb: void | (() => void)) => {
        if (cb) {
            if (done)
                cb();
            else
                pending.push(cb);
        }
    };

    return { finalize, isClosed, teardown };
};
/**
 * {@link Observable} implementation built around a producer function.
 *
 * The producer receives a sink (`next`, `error`, `complete`, `isClosed`) and
 * may return a teardown callback. Every operator creates a new
 * `ObservableImpl` whose producer wraps the sink before handing it to the
 * producer of the source observable.
 */
export class ObservableImpl<T> implements Observable<T> {

    private readonly _producer: Producer<T>;

    /**
     * @param producer Function which feeds a sink with values and optionally
     * returns a teardown callback.
     */
    constructor(producer: Producer<T>) {
        this._producer = producer;
    }

    /**
     * Starts the producer and routes its notifications to the consumer.
     *
     * `error` and `complete` are delivered at most once; `next` is no longer
     * delivered once the subscription is closed (completed, failed or
     * unsubscribed). When the consumer has no `error` handler the error is
     * passed to the module-level fallback.
     *
     * @param consumer Callbacks to notify, all of them are optional.
     * @returns An object whose `unsubscribe` closes the subscription and runs
     * the producer's teardown.
     */
    subscribe(consumer: Observer<T> = {}) {
        const { teardown, isClosed, finalize } = subscription();
        const onNext = consumer.next ? consumer.next.bind(consumer) : undefined;

        teardown(this._producer({
            // Keep the `noop` identity when there is no handler: operators
            // rely on it to skip unnecessary work. Otherwise guard the
            // handler so values emitted after the subscription has been
            // closed are dropped, like `collect()` does.
            next: onNext ? (v: T) => { if (!isClosed()) onNext(v); } : noop,
            error: err => finalize() && (consumer.error ? consumer.error(err) : errorFallback(err)),
            complete: () => finalize() && consumer.complete && consumer.complete(),
            isClosed
        }));

        return { unsubscribe: finalize };
    }

    /**
     * Transforms every value with `mapper`.
     *
     * The mapper is not invoked when the subscriber doesn't listen for values.
     *
     * @param mapper Function applied to each source value.
     */
    map<T2>(mapper: (value: T) => T2) {
        return new ObservableImpl<T2>(({ next, ...rest }) =>
            this._producer({
                next: next !== noop ? (v: T) => next(mapper(v)) : noop,
                ...rest
            }));
    }

    /**
     * Invokes the supplied handlers before the corresponding notification is
     * forwarded to the subscriber.
     *
     * @param observer Handlers to run; missing handlers are skipped. The
     * handlers are invoked with the observer as `this`.
     */
    tap(observer: Observer<T>) {
        // Bind the handlers so observers implemented as objects with methods
        // keep their `this`, consistent with `subscribe()`.
        const tapNext = observer.next ? observer.next.bind(observer) : undefined;
        const tapComplete = observer.complete ? observer.complete.bind(observer) : undefined;
        const tapError = observer.error ? observer.error.bind(observer) : undefined;

        return new ObservableImpl<T>(({ next, complete, error, isClosed }) =>
            this._producer({
                next: tapNext ? (v => (tapNext(v), next(v))) : next,
                complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
                error: tapError ? (e => (tapError(e), error(e))) : error,
                isClosed
            }));
    }

    /**
     * Forwards only the values for which `predicate` returns a truthy result.
     *
     * The predicate is not invoked when the subscriber doesn't listen for
     * values.
     *
     * @param predicate Test applied to each source value.
     */
    filter(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, ...rest }) =>
            this._producer({
                next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
                ...rest
            })
        );
    }

    /**
     * Forwards values until `predicate` matches, then completes. The matching
     * value itself is not forwarded.
     *
     * @param predicate Test which ends the sequence when it succeeds.
     */
    until(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, complete, error, isClosed }) =>
            this._producer({
                // Once the sequence is closed the source may still push
                // values; ignore them instead of evaluating the predicate
                // and forwarding values after `complete()`.
                next: v => isClosed() ? void (0) : predicate(v) ? complete() : next(v),
                complete,
                error,
                isClosed
            })
        );
    }

    /**
     * Forwards values while `predicate` matches and completes on the first
     * value which doesn't match. That value is not forwarded.
     *
     * @param predicate Test which keeps the sequence alive while it succeeds.
     */
    while(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, complete, error, isClosed }) =>
            this._producer({
                // See `until()`: values arriving after the sequence has been
                // closed are ignored.
                next: v => isClosed() ? void (0) : predicate(v) ? next(v) : complete(),
                complete,
                error,
                isClosed
            })
        );
    }

    /**
     * Emits the running accumulation of the source values.
     *
     * Called with an accumulator only, the first value seeds the accumulation
     * and is emitted as is. Called with an accumulator and an initial value,
     * the accumulator is applied starting from the first value.
     *
     * @param args `[accumulator]` or `[accumulator, initial]`.
     */
    scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
        if (args.length === 1) {
            return new ObservableImpl<T | A>(({ next, ...rest }) => {
                const [accumulator] = args;
                let _acc: T;
                let index = 0;
                return this._producer({
                    next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
                    ...rest
                });
            });
        } else {
            return new ObservableImpl<T | A>(({ next, ...rest }) => {
                const [accumulator, initial] = args;
                let _acc = initial;
                return this._producer({
                    next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
                    ...rest
                });
            });
        }
    }

    /**
     * Accumulates all source values and emits the single result when the
     * source completes.
     *
     * Called with an accumulator only, the first value seeds the accumulation
     * and an empty source fails with `Error("The sequence can't be empty")`.
     * Called with an accumulator and an initial value, an empty source emits
     * the initial value.
     *
     * @param args `[accumulator]` or `[accumulator, initial]`.
     */
    reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
        if (args.length === 1) {
            return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => {
                const [accumulator] = args;
                let _acc: T;
                let index = 0;
                return this._producer({
                    // The items are always counted, even when nobody listens
                    // for the result; otherwise a non-empty sequence would be
                    // reported as empty on completion. The accumulator itself
                    // is still skipped when there is no listener.
                    next: next !== noop ?
                        (v: T) => {
                            _acc = index++ === 0 ? v : accumulator(_acc, v);
                        } :
                        () => {
                            index++;
                        },
                    complete: () => {
                        if (index === 0) {
                            error(new Error("The sequence can't be empty"));
                        } else {
                            next(_acc);
                            complete();
                        }
                    },
                    error,
                    isClosed
                });
            });
        } else {
            return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => {
                const [accumulator, initial] = args;
                let _acc = initial;
                return this._producer({
                    next: next !== noop ? (v: T) => {
                        _acc = accumulator(_acc, v);
                    } : noop,
                    complete: () => {
                        next(_acc);
                        complete();
                    },
                    error,
                    isClosed
                });
            });
        }
    }

    /**
     * Concatenates this sequence with the specified ones: each next sequence
     * is subscribed when the previous one completes, the resulting sequence
     * completes together with the last one.
     *
     * @param seq Sequences to append, in order.
     */
    cat(...seq: Subscribable<T>[]) {
        return new ObservableImpl<T>(({ next, complete: final, ...rest }) => {
            // Teardown of the currently active source.
            let cleanup = noop;
            // Number of sources started so far; lets a caller detect that the
            // source it has just started completed synchronously and the
            // chain has already moved on.
            let stage = 0;

            const release = () => {
                const prev = cleanup;
                cleanup = noop;
                prev();
            };

            const complete = (i: number) => i < seq.length ?
                () => {
                    // The finished source doesn't need its resources anymore.
                    release();
                    // Don't start further sources for a closed subscription.
                    if (rest.isClosed())
                        return;
                    const current = ++stage;
                    const sub = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
                    // Remember the teardown only when this source is still the
                    // active one, otherwise it would replace the teardown of
                    // a source started later.
                    if (stage === current)
                        cleanup = () => sub.unsubscribe();
                } : final;

            const current = ++stage;
            const stop = this._producer({ next, complete: complete(0), ...rest });
            if (stage === current)
                cleanup = stop ?? noop;
            else if (stop)
                stop(); // completed synchronously, release it right away

            return () => cleanup();
        });
    }

    /**
     * Applies the operator to this observable.
     *
     * @param op Function which receives this observable and returns a new one.
     * @returns The observable returned by `op`.
     */
    pipe<U>(op: OperatorFn<T, U>): Observable<U> {
        return op(this);
    }

    /**
     * Waits for the next value of the sequence.
     *
     * The promise is resolved with the first value, rejected with
     * `Error("The sequence is empty")` when the sequence completes without
     * values, and rejected with the error when the sequence fails. The same
     * rejection handler is registered with the cancellation token. In every
     * case the producer's teardown is executed.
     *
     * @param ct Cancellation token, `Cancellation.none` by default.
     */
    next(ct = Cancellation.none) {
        return new Promise<T>((resolve, reject) => {
            // NOTE(review): presumably throws when the cancellation is
            // already requested, which rejects the promise - confirm.
            ct.throwIfRequested();

            const { teardown, finalize, isClosed } = subscription();

            const error = (err: unknown) => finalize() && reject(err);
            const h = ct.register(error);
            teardown(() => h.destroy());

            // The producer is deliberately invoked detached from `this`.
            teardown((0, this._producer)({
                next: (item: T) => finalize() && resolve(item),
                complete: () => finalize() && reject(new Error("The sequence is empty")),
                error,
                isClosed
            }));
        });
    }

    /**
     * Collects all values of the sequence into an array.
     *
     * The promise is resolved with the collected values when the sequence
     * completes and rejected when the sequence fails. The same rejection
     * handler is registered with the cancellation token. Values emitted after
     * the subscription has been closed are ignored.
     *
     * @param ct Cancellation token, `Cancellation.none` by default.
     */
    collect(ct = Cancellation.none) {
        return new Promise<T[]>((resolve, reject) => {
            // NOTE(review): presumably throws when the cancellation is
            // already requested, which rejects the promise - confirm.
            ct.throwIfRequested();

            const acc: T[] = [];
            const { teardown, finalize, isClosed } = subscription();

            const error = (err: unknown) => finalize() && reject(err);
            const h = ct.register(error);
            teardown(() => h.destroy());

            // The producer is deliberately invoked detached from `this`.
            teardown((0, this._producer)({
                next: (item: T) => isClosed() ? void (0) : acc.push(item),
                complete: () => finalize() && resolve(acc),
                error,
                isClosed
            }));
        });
    }
}