ObservableImpl.ts
241 lines
| 8.3 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r153 | import { id as mid } from "module"; | |
cin
|
r157 | import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces"; | |
cin
|
r153 | import { TraceSource } from "@implab/core-amd/log/TraceSource"; | |
cin
|
r155 | import { Predicate } from "@implab/core-amd/interfaces"; | |
import { Cancellation } from "@implab/core-amd/Cancellation"; | |||
cin
|
r158 | import { Subscription, SubscriptionImpl } from "./SubscriptionImpl"; | |
cin
|
r153 | ||
/** Trace channel obtained for this module's id. */
const trace = TraceSource.get(mid);

/**
 * Shared do-nothing handler.
 *
 * `ObservableImpl.subscribe` substitutes it for a missing `next` handler and
 * the operators compare against this exact reference (`next !== noop`) to
 * skip work nobody observes, so there must be only one instance of it.
 */
function noop(): void {
    // intentionally empty
}

/**
 * Last-resort error handler used when a consumer supplies no `error`
 * callback: the error is written to the module trace channel.
 *
 * @param e The error reported by the sequence.
 */
function errorFallback(e: unknown) {
    return trace.error("Unhandled observable error: {0}", e);
}

/**
 * A sink without the `isClosed` probe; the closed state is tracked by the
 * subscription that travels alongside the sink.
 */
type FusedSink<T> = Omit<Sink<T>, "isClosed">;
cin
|
/**
 * Adapts a plain producer to the fused form used by `ObservableImpl`.
 *
 * The returned function runs `producer` with a sink that is guarded by
 * `subscription`:
 *  - `next` goes through `subscription.around(...)` (presumably this stops
 *    delivery once the subscription is closed — confirm in SubscriptionImpl);
 *  - `complete` and `error` are forwarded only when `subscription.finalize()`
 *    reports success, so at most one terminal event reaches the sink;
 *  - `isClosed` is answered by the subscription.
 *
 * Whatever the producer returns is registered with `subscription.teardown`.
 *
 * @param producer The producer to adapt.
 * @returns A function taking the target sink and the controlling
 * subscription; it returns that same subscription.
 */
export const fuse = <T>(producer: Producer<T>) =>
    (sink: FusedSink<T>, subscription: Subscription): Unsubscribable => {
        const cleanup = producer({
            // Call `next` through the sink, exactly like `complete` and
            // `error` below, so sinks implemented as objects with methods
            // keep their `this`.
            next: subscription.around((value: T) => sink.next(value)),
            complete: () => {
                if (subscription.finalize())
                    sink.complete();
            },
            error: err => {
                if (subscription.finalize())
                    sink.error(err);
            },
            isClosed: subscription.isClosed.bind(subscription)
        });

        subscription.teardown(cleanup);

        return subscription;
    };
cin
|
r157 | ||
cin
|
/**
 * Producer signature consumed by `ObservableImpl`: it is handed the target
 * sink together with the subscription that controls the run, and its return
 * value is ignored.
 */
type FusedProducer<T> = (
    sink: FusedSink<T>,
    subscription: Subscription
) => void;
cin
|
r157 | ||
cin
|
/**
 * `Observable` implementation built around a fused producer: a function that
 * receives the downstream sink together with the subscription controlling
 * the run.
 *
 * Operators are lazy. Each one returns a new `ObservableImpl` whose producer
 * invokes the upstream producer with a decorated sink; except for `cat`, the
 * same subscription object is passed through the whole chain.
 */
export class ObservableImpl<T> implements Observable<T> {

    private readonly _producer: FusedProducer<T>;

    /**
     * @param producer Function that starts the sequence for a given sink and
     * subscription.
     */
    constructor(producer: FusedProducer<T>) {
        this._producer = producer;
    }

    /**
     * Starts the sequence and delivers its events to `consumer`.
     *
     * Missing handlers are substituted: `next` with the shared `noop`
     * (operators detect it by reference and skip work nobody observes) and
     * `error` with the module-level fallback that traces the error.
     *
     * @param consumer Optional set of handlers.
     * @returns The subscription created for this run.
     */
    subscribe(consumer: Observer<T> = {}): Unsubscribable {
        const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
        const next = consumer.next ? consumer.next.bind(consumer) : noop;
        const complete = () => consumer.complete ? consumer.complete() : void (0);

        const subscription = new SubscriptionImpl();

        this._producer({ next, error, complete }, subscription);
        return subscription;
    }

    /**
     * Transforms every item with `mapper`.
     *
     * The mapper is not called at all when the consumer has no `next`
     * handler.
     */
    map<T2>(mapper: (value: T) => T2) {
        return new ObservableImpl<T2>(({ next, ...rest }, subscription) =>
            (0, this._producer)(
                {
                    next: next !== noop ? (v: T) => next(mapper(v)) : noop,
                    ...rest
                },
                subscription
            ));
    }

    /**
     * Invokes the supplied handlers before the corresponding downstream
     * handlers; events themselves pass through unchanged.
     */
    tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
        return new ObservableImpl<T>(({ next, complete, error }, subscription) =>
            (0, this._producer)(
                {
                    next: tapNext ? (v => { tapNext(v); next(v); }) : next,
                    complete: tapComplete ? (() => { tapComplete(); complete(); }) : complete,
                    error: tapError ? (e => { tapError(e); error(e); }) : error
                },
                subscription
            ));
    }

    /**
     * Delivers only the items for which `predicate` returns true.
     *
     * The predicate is not called when the consumer has no `next` handler.
     */
    filter(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, ...rest }, subscription) =>
            (0, this._producer)(
                {
                    next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
                    ...rest
                },
                subscription
            ));
    }

    /**
     * Delivers items until `predicate` returns true for one of them; at that
     * point the subscription is finalized and the sequence completes. The
     * matching item is not delivered.
     */
    until(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
            (0, this._producer)({
                next: v => predicate(v) ? subscription.finalize() && complete() : next(v),
                complete,
                ...rest
            }, subscription)
        );
    }

    /**
     * Delivers items while `predicate` returns true; the first item that
     * fails the predicate finalizes the subscription and completes the
     * sequence. The failing item is not delivered.
     */
    while(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
            (0, this._producer)({
                next: v => predicate(v) ? next(v) : subscription.finalize() && complete(),
                complete,
                ...rest
            }, subscription)
        );
    }

    /**
     * Emits the running accumulator after every source item.
     *
     * Without `initial` the first item becomes the accumulator and is
     * emitted as is; the accumulator function is first called for the
     * second item.
     */
    scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
    scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
        if (args.length === 1) {
            const [accumulator] = args;
            return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
                let _acc: T;
                let index = 0;
                (0, this._producer)({
                    next: next !== noop ? (v: T) => next(
                        index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)
                    ) : noop,
                    ...rest
                }, subscription);
            });
        } else {
            const [accumulator, initial] = args;
            return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
                let _acc = initial;
                (0, this._producer)({
                    next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
                    ...rest
                }, subscription);
            });
        }
    }

    /**
     * Folds the whole sequence and emits the single result when the source
     * completes.
     *
     * Without `initial` the first item seeds the accumulator, and an empty
     * source is reported through `error` with the message
     * "The sequence can't be empty".
     */
    reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
    reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
        if (args.length === 1) {
            const [accumulator] = args;
            return new ObservableImpl<T>(({ next, complete, error }, subscription) => {
                let _acc: T;
                let index = 0;
                (0, this._producer)({
                    // Items are always counted: the emptiness check in
                    // `complete` depends on the count even when the consumer
                    // has no `next` handler. Only the folding itself is
                    // skipped when nobody observes the result.
                    next: next !== noop ?
                        (v: T) => {
                            _acc = index++ === 0 ? v : accumulator(_acc, v);
                        } :
                        () => {
                            index++;
                        },
                    complete: () => {
                        if (index === 0) {
                            error(new Error("The sequence can't be empty"));
                        } else {
                            next(_acc);
                            complete();
                        }
                    },
                    error
                }, subscription);
            });
        } else {
            const [accumulator, initial] = args;
            return new ObservableImpl<A>(({ next, complete, error }, subscription) => {
                let _acc = initial;
                (0, this._producer)({
                    next: next !== noop ? (v: T) => {
                        _acc = accumulator(_acc, v);
                    } : noop,
                    complete: () => {
                        next(_acc);
                        complete();
                    },
                    error
                }, subscription);
            });
        }
    }

    /**
     * Concatenates this sequence with `seq`: every following source is
     * subscribed when the previous one completes, and the result completes
     * after the last one does.
     *
     * Unsubscribing from the result unsubscribes from the source that is
     * active at that moment.
     */
    cat(...seq: Subscribable<T>[]) {
        return new ObservableImpl<T>(({ next, complete: final, ...rest }, subscription) => {
            const len = seq.length;

            // Handle of the source that is currently running. It is declared
            // before the first subscribe call because a source may complete
            // synchronously, i.e. the hand-over to the next source can happen
            // while the previous subscribe call is still on the stack.
            let active: Unsubscribable | undefined;
            let started = 0;

            const start = (source: Subscribable<T>, following: number): void => {
                const ordinal = ++started;
                const handle = source.subscribe({ next, complete: complete(following), ...rest });
                // When the source completed synchronously a later source has
                // already been started; its handle must not be replaced with
                // this stale one.
                if (started === ordinal)
                    active = handle;
            };

            const complete = (i: number): (() => void) => i < len ?
                () => start(seq[i], i + 1) :
                () => {
                    if (subscription.finalize())
                        final();
                };

            start(this, 0);

            subscription.teardown(() => {
                if (active)
                    active.unsubscribe();
            });
        });
    }

    /** Applies `op` to this observable and returns its result. */
    pipe<U>(op: OperatorFn<T, U>): Observable<U> {
        return op(this);
    }

    /**
     * Waits for the first item of the sequence.
     *
     * @param ct Cancellation token; it is checked before the sequence is
     * started and attached to the subscription with `cancellable`.
     * @returns A promise resolved with the first item. It is rejected with
     * "The sequence is empty" when the source completes without items, or
     * with the error reported by the source.
     */
    next(ct = Cancellation.none) {
        return new Promise<T>((resolve, reject) => {
            ct.throwIfRequested();

            const subscription = new SubscriptionImpl();
            subscription.cancellable(ct, reject);
            (0, this._producer)({
                next: value => {
                    // Only the first item is needed: release the source
                    // instead of leaving it subscribed after the promise has
                    // settled. Relies on finalize() running the registered
                    // teardowns, the same way until()/while() rely on it.
                    subscription.finalize();
                    resolve(value);
                },
                complete: () => reject(new Error("The sequence is empty")),
                error: reject,
            }, subscription);
        });
    }

    /**
     * Gathers all items of the sequence into an array.
     *
     * @param ct Cancellation token; it is checked before the sequence is
     * started and attached to the subscription with `cancellable`.
     * @returns A promise resolved with the items in arrival order once the
     * source completes, or rejected with the error reported by the source.
     */
    collect(ct = Cancellation.none) {
        return new Promise<T[]>((resolve, reject) => {
            ct.throwIfRequested();

            const subscription = new SubscriptionImpl();
            subscription.cancellable(ct, reject);
            const acc: T[] = [];
            (0, this._producer)({
                next: (item: T) => acc.push(item),
                complete: () => resolve(acc),
                error: reject
            }, subscription);
        });
    }

    /**
     * Runs the sequence and calls `next` for every item.
     *
     * @param next Callback invoked for each item.
     * @param ct Cancellation token; it is checked before the sequence is
     * started and attached to the subscription with `cancellable`.
     * @returns A promise resolved when the source completes, or rejected
     * with the error reported by the source.
     */
    forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            ct.throwIfRequested();

            const subscription = new SubscriptionImpl();
            subscription.cancellable(ct, reject);

            (0, this._producer)({
                next,
                complete: resolve,
                error: reject
            }, subscription);
        });
    }
}