ObservableImpl.ts
225 lines
| 7.4 KiB
| video/mp2t
|
TypeScriptLexer
|
|
r153 | import { id as mid } from "module"; | |
|
|
r155 | import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Subscribable } from "./interfaces"; | |
|
|
r153 | import { TraceSource } from "@implab/core-amd/log/TraceSource"; | |
|
|
r155 | import { Predicate } from "@implab/core-amd/interfaces"; | |
| import { Cancellation } from "@implab/core-amd/Cancellation"; | |||
|
|
r153 | ||
| const trace = TraceSource.get(mid); | |||
| const noop = () => { }; | |||
| const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); | |||
|
|
r155 | const subscription = () => { | |
| let done = false; | |||
| let cleanup = noop; | |||
| const finalize = () => done ? false : (cleanup(), done = true); | |||
| const isClosed = () => done; | |||
| const teardown = (cb: void | (() => void)) => { | |||
| if (cb) { | |||
| if (done) { | |||
| cb(); | |||
| } else { | |||
| const _prev = cleanup; | |||
| cleanup = () => (_prev(), cb()); | |||
| } | |||
| } | |||
|
|
r153 | }; | |
|
|
r155 | ||
| return { finalize, isClosed, teardown }; | |||
|
|
r153 | }; | |
| export class ObservableImpl<T> implements Observable<T> { | |||
|
|
r155 | private readonly _producer: Producer<T>; | |
|
|
r153 | ||
|
|
r155 | constructor(producer: Producer<T>) { | |
|
|
r153 | this._producer = producer; | |
| } | |||
| subscribe(consumer: Observer<T> = {}) { | |||
|
|
r155 | const { teardown, isClosed, finalize } = subscription(); | |
| teardown(this._producer({ | |||
| next: consumer.next ? consumer.next.bind(consumer) : noop, | |||
| error: err => finalize() && (consumer.error ? consumer.error(err) : errorFallback(err)), | |||
| complete: () => finalize() && consumer.complete && consumer.complete(), | |||
| isClosed | |||
| })); | |||
| return { unsubscribe: finalize }; | |||
|
|
r153 | } | |
| map<T2>(mapper: (value: T) => T2) { | |||
| return new ObservableImpl<T2>(({ next, ...rest }) => | |||
| this._producer({ | |||
| next: next !== noop ? (v: T) => next(mapper(v)) : noop, | |||
| ...rest | |||
| })); | |||
| } | |||
| tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) { | |||
|
|
r155 | return new ObservableImpl<T>(({ next, complete, error, isClosed }) => | |
|
|
r153 | this._producer({ | |
| next: tapNext ? (v => (tapNext(v), next(v))) : next, | |||
| complete: tapComplete ? (() => (tapComplete(), complete())) : complete, | |||
|
|
r155 | error: tapError ? (e => (tapError(e), error(e))) : error, | |
| isClosed | |||
|
|
r153 | })); | |
| } | |||
|
|
r155 | filter(predicate: Predicate<T>) { | |
| return new ObservableImpl<T>(({ next, ...rest }) => | |||
| this._producer({ | |||
| next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, | |||
| ...rest | |||
| }) | |||
| ); | |||
| } | |||
|
|
r153 | ||
|
|
r155 | until(predicate: Predicate<T>) { | |
| return new ObservableImpl<T>(({ next, complete, ...rest }) => | |||
| this._producer({ | |||
| next: v => predicate(v) ? complete() : next(v), | |||
| complete, | |||
| ...rest | |||
| }) | |||
| ); | |||
| } | |||
|
|
r153 | ||
|
|
r155 | while(predicate: Predicate<T>) { | |
| return new ObservableImpl<T>(({ next, complete, ...rest }) => | |||
| this._producer({ | |||
| next: v => predicate(v) ? next(v) : complete(), | |||
| complete, | |||
| ...rest | |||
| }) | |||
| ); | |||
| } | |||
| scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { | |||
|
|
r153 | if (args.length === 1) { | |
|
|
r155 | return new ObservableImpl<T | A>(({ next, ...rest }) => { | |
|
|
r153 | const [accumulator] = args; | |
| let _acc: T; | |||
| let index = 0; | |||
|
|
r155 | return this._producer({ | |
| next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop, | |||
| ...rest | |||
| }); | |||
| }); | |||
| } else { | |||
| return new ObservableImpl<T | A>(({ next, ...rest }) => { | |||
| const [accumulator, initial] = args; | |||
| let _acc = initial; | |||
| return this._producer({ | |||
| next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, | |||
| ...rest | |||
| }); | |||
| }); | |||
| } | |||
| } | |||
| reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { | |||
| if (args.length === 1) { | |||
| return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => { | |||
| const [accumulator] = args; | |||
| let _acc: T; | |||
| let index = 0; | |||
| return this._producer({ | |||
|
|
r153 | next: next !== noop ? (v: T) => { | |
| _acc = index++ === 0 ? v : accumulator(_acc, v); | |||
| } : noop, | |||
| complete: () => { | |||
| if (index === 0) { | |||
| error(new Error("The sequence can't be empty")); | |||
| } else { | |||
| next(_acc); | |||
| complete(); | |||
| } | |||
| }, | |||
|
|
r155 | error, | |
| isClosed | |||
|
|
r153 | }); | |
|
|
r155 | }); | |
| } else { | |||
| return new ObservableImpl<T | A>(({ next, complete, error, isClosed }) => { | |||
|
|
r153 | const [accumulator, initial] = args; | |
| let _acc = initial; | |||
|
|
r155 | return this._producer({ | |
|
|
r153 | next: next !== noop ? (v: T) => { | |
| _acc = accumulator(_acc, v); | |||
| } : noop, | |||
| complete: () => { | |||
| next(_acc); | |||
| complete(); | |||
| }, | |||
|
|
r155 | error, | |
| isClosed | |||
|
|
r153 | }); | |
|
|
r155 | }); | |
| } | |||
| } | |||
|
|
r153 | ||
|
|
r155 | cat(...seq: Subscribable<T>[]) { | |
| return new ObservableImpl<T>(({ next, complete: final, ...rest }) => { | |||
| let cleanup: () => void; | |||
| const len = seq.length; | |||
| const complete = (i: number) => i < len ? | |||
| () => { | |||
| const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest }); | |||
| cleanup = subscription.unsubscribe.bind(subscription); | |||
| } : final; | |||
| cleanup = this._producer({ next, complete: complete(0), ...rest }) ?? noop; | |||
|
|
r153 | ||
|
|
r155 | return () => cleanup(); | |
| }); | |||
| } | |||
|
|
r153 | ||
|
|
r155 | pipe<U>(op: OperatorFn<T, U>): Observable<U> { | |
| return op(this); | |||
| } | |||
|
|
r153 | ||
|
|
r155 | next(ct = Cancellation.none) { | |
| return new Promise<T>((resolve, reject) => { | |||
| ct.throwIfRequested(); | |||
| const { teardown, finalize, isClosed } = subscription(); | |||
|
|
r153 | ||
|
|
r155 | const error = (err: unknown) => finalize() && reject(err); | |
| const h = ct.register(error); | |||
| teardown(() => h.destroy()); | |||
| teardown((0, this._producer)({ | |||
| next: (item: T) => finalize() && resolve(item), | |||
| complete: () => finalize() && reject(new Error("The sequence is empty")), | |||
| error, | |||
| isClosed | |||
| })); | |||
| }); | |||
| } | |||
| collect(ct = Cancellation.none) { | |||
| return new Promise<T[]>((resolve, reject) => { | |||
| ct.throwIfRequested(); | |||
|
|
r153 | ||
|
|
r155 | const acc: T[] = []; | |
| const { teardown, finalize, isClosed } = subscription(); | |||
| const error = (err: unknown) => finalize() && reject(err); | |||
| const h = ct.register(error); | |||
| teardown(() => h.destroy()); | |||
| teardown((0, this._producer)({ | |||
| next: (item: T) => isClosed() ? void (0) : acc.push(item), | |||
| complete: () => finalize() && resolve(acc), | |||
| error, | |||
| isClosed | |||
| })); | |||
| }); | |||
| } | |||
|
|
r153 | } |
