##// END OF EJS Templates
Implemented subscription SubscriptionImpl, fixed subscription resource management
Implemented subscription SubscriptionImpl, fixed subscription resource management

File last commit:

r158:078eca3dc271 v1.10.3 default
r158:078eca3dc271 v1.10.3 default
Show More
ObservableImpl.ts
241 lines | 8.3 KiB | video/mp2t | TypeScriptLexer
/ djx / src / main / ts / observable / ObservableImpl.ts
cin
Moving the observable implementation to the class
r153 import { id as mid } from "module";
cin
WIP observables
r157 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces";
cin
Moving the observable implementation to the class
r153 import { TraceSource } from "@implab/core-amd/log/TraceSource";
cin
bump dependencies, bump ts to 5.2...
r155 import { Predicate } from "@implab/core-amd/interfaces";
import { Cancellation } from "@implab/core-amd/Cancellation";
cin
Implemented subscription SubscriptionImpl, fixed subscription resource management
r158 import { Subscription, SubscriptionImpl } from "./SubscriptionImpl";
cin
Moving the observable implementation to the class
r153
/** Trace source of this module; used to report errors nobody handled. */
const trace = TraceSource.get(mid);

/**
 * Shared do-nothing callback. Operators compare a sink's `next` against this
 * exact reference, so it must stay a single module-level instance.
 */
const noop = (): void => { };

/**
 * Last-resort error handler: writes the error to the module trace source.
 *
 * @param reason The error that no observer handled.
 */
const errorFallback = (reason: unknown) =>
    trace.error("Unhandled observable error: {0}", reason);
cin
WIP observables
/** A sink without `isClosed`; the closed state is tracked by the subscription instead. */
type FusedSink<T> = Omit<Sink<T>, "isClosed">;

/**
 * Adapts a plain producer (one that receives a full `Sink` and returns its
 * cleanup) to the fused form, where the sink and the subscription controlling
 * it are passed separately.
 *
 * @param producer The producer to adapt.
 * @returns A function that starts `producer` against the given sink and
 * subscription and returns that same subscription.
 */
export const fuse = <T>(producer: Producer<T>) =>
    (sink: FusedSink<T>, subscription: Subscription): Unsubscribable => {
        const cleanup = producer({
            // NOTE(review): `around` presumably suppresses calls once the
            // subscription is closed — confirm in SubscriptionImpl.
            next: subscription.around(sink.next),
            // terminal signals are delivered only when `finalize()` reports success
            complete: () => subscription.finalize() && sink.complete(),
            error: reason => subscription.finalize() && sink.error(reason),
            isClosed: () => subscription.isClosed()
        });

        // whatever the producer returned is handed to the subscription as teardown
        subscription.teardown(cleanup);

        return subscription;
    };

/** A producer that receives the sink together with the subscription that controls it. */
type FusedProducer<T> = (sink: FusedSink<T>, subscription: Subscription) => void;
cin
WIP observables
r157
cin
Movin the observable implementation to the class
/**
 * Observable implementation backed by a fused producer.
 *
 * Every operator returns a new `ObservableImpl` whose producer wraps the sink
 * it receives and passes the wrapped sink, together with the same
 * subscription object, to the producer of this observable.
 */
export class ObservableImpl<T> implements Observable<T> {
    private readonly _producer: FusedProducer<T>;

    /**
     * @param producer Function invoked on every subscription with the sink
     * and the subscription object.
     */
    constructor(producer: FusedProducer<T>) {
        this._producer = producer;
    }

    /**
     * Starts the producer for the specified consumer.
     *
     * @param consumer Optional callbacks. A missing `next` is replaced with
     * the shared `noop`, a missing `complete` is ignored and a missing `error`
     * is replaced with the module-level fallback which logs the error.
     * @returns The subscription which was passed to the producer.
     */
    subscribe(consumer: Observer<T> = {}): Unsubscribable {
        const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
        const next = consumer.next ? consumer.next.bind(consumer) : noop;
        const complete = () => consumer.complete ? consumer.complete() : void (0);

        const subscription = new SubscriptionImpl();

        this._producer({ next, error, complete }, subscription);
        return subscription;
    }

    /**
     * Transforms every value with `mapper`.
     *
     * The mapper is not called at all when the downstream `next` is `noop`.
     */
    map<T2>(mapper: (value: T) => T2) {
        return new ObservableImpl<T2>(({ next, ...rest }, subscription) =>
            (0, this._producer)(
                {
                    next: next !== noop ? (v: T) => next(mapper(v)) : noop,
                    ...rest
                }, subscription
            ));
    }

    /**
     * Calls the supplied handlers before the matching downstream handlers.
     *
     * The handlers are destructured from the observer and are therefore
     * invoked without a `this` binding.
     */
    tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
        return new ObservableImpl<T>(({ next, complete, error }, subscription) =>
            (0, this._producer)(
                {
                    next: tapNext ? (v => (tapNext(v), next(v))) : next,
                    complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
                    error: tapError ? (e => (tapError(e), error(e))) : error
                },
                subscription
            ));
    }

    /**
     * Forwards only the values for which `predicate` returns a truthy result.
     *
     * The predicate is not called when the downstream `next` is `noop`.
     */
    filter(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, ...rest }, subscription) =>
            (0, this._producer)(
                {
                    next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
                    ...rest
                },
                subscription
            ));
    }

    /**
     * Forwards values until `predicate` matches. The matching value is not
     * forwarded; instead the subscription is finalized and the sequence is
     * completed.
     */
    until(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
            (0, this._producer)({
                next: v => predicate(v) ? subscription.finalize() && complete() : next(v),
                complete,
                ...rest
            }, subscription)
        );
    }

    /**
     * Forwards values while `predicate` matches. The first value which does
     * not match is dropped, the subscription is finalized and the sequence is
     * completed.
     */
    while(predicate: Predicate<T>) {
        return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
            (0, this._producer)({
                next: v => predicate(v) ? next(v) : subscription.finalize() && complete(),
                complete,
                ...rest
            }, subscription)
        );
    }

    /**
     * Emits the running result of applying `accumulator` to the values.
     *
     * With `initial` the accumulation starts from that value. Without it the
     * first source value is emitted unchanged and becomes the starting value.
     */
    scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
    scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
        if (args.length === 1) {
            const [accumulator] = args;
            return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
                let _acc: T;
                let index = 0;
                (0, this._producer)({
                    next: next !== noop ? (v: T) => next(
                        index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)
                    ) : noop,
                    ...rest
                }, subscription);
            });
        } else {
            const [accumulator, initial] = args;
            return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
                let _acc = initial;
                (0, this._producer)({
                    next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
                    ...rest
                }, subscription);
            });
        }
    }

    /**
     * Emits a single value, the result of folding the whole sequence with
     * `accumulator`, when the source completes.
     *
     * Without `initial` an empty source is reported through `error` with
     * "The sequence can't be empty".
     */
    reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
    reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
        if (args.length === 1) {
            const [accumulator] = args;
            return new ObservableImpl<T>(({ next, complete, error }, subscription) => {
                let _acc: T;
                let index = 0;
                (0, this._producer)({
                    // Items are counted even when nobody consumes the result,
                    // otherwise a non-empty sequence observed without a
                    // `next` handler would be reported as empty.
                    next: next !== noop ?
                        (v: T) => {
                            _acc = index++ === 0 ? v : accumulator(_acc, v);
                        } :
                        () => {
                            index++;
                        },
                    complete: () => {
                        if (index === 0) {
                            error(new Error("The sequence can't be empty"));
                        } else {
                            next(_acc);
                            complete();
                        }
                    },
                    error
                }, subscription);
            });
        } else {
            const [accumulator, initial] = args;
            return new ObservableImpl<A>(({ next, complete, error }, subscription) => {
                let _acc = initial;
                (0, this._producer)({
                    next: next !== noop ? (v: T) => {
                        _acc = accumulator(_acc, v);
                    } : noop,
                    complete: () => {
                        next(_acc);
                        complete();
                    },
                    error
                }, subscription);
            });
        }
    }

    /**
     * Concatenates this observable with the specified sequences: each next
     * source is subscribed when the previous one completes, and the result
     * completes after the last source completes.
     *
     * @param seq Sequences to append, in order.
     */
    cat(...seq: Subscribable<T>[]) {
        return new ObservableImpl<T>(({ next, complete: final, ...rest }, subscription) => {
            const sources: Subscribable<T>[] = [this, ...seq];
            let position = 0;
            // handle of the source which is currently active
            let current: Unsubscribable | undefined;

            const subscribeNext = () => {
                if (position >= sources.length) {
                    if (subscription.finalize())
                        final();
                    return;
                }

                const step = position++;
                const handle = sources[step].subscribe({ next, complete: subscribeNext, ...rest });

                if (subscription.isClosed()) {
                    // closed while the source was emitting synchronously:
                    // the teardown below has already run and missed this handle
                    handle.unsubscribe();
                } else if (position === step + 1) {
                    // The source is still active. When it completes
                    // synchronously a later source has already stored its
                    // own handle, which must not be overwritten.
                    current = handle;
                }
            };

            // registered before subscribing so `current` is never read
            // before it is declared
            subscription.teardown(() => current?.unsubscribe());
            subscribeNext();
        });
    }

    /** Applies the operator to this observable and returns its result. */
    pipe<U>(op: OperatorFn<T, U>): Observable<U> {
        return op(this);
    }

    /**
     * Waits for the first value of the sequence.
     *
     * @param ct Cancellation token; it is checked before subscribing and is
     * linked to the subscription together with the promise's `reject`.
     * @returns A promise resolved with the first value, or rejected with the
     * source error or with "The sequence is empty" when the source completes
     * without a value.
     */
    next(ct = Cancellation.none) {
        return new Promise<T>((resolve, reject) => {
            ct.throwIfRequested();

            const subscription = new SubscriptionImpl();
            subscription.cancellable(ct, reject);
            (0, this._producer)({
                next: value => {
                    resolve(value);
                    // only one value is needed: release the source instead
                    // of leaving the subscription active
                    subscription.finalize();
                },
                complete: () => reject(new Error("The sequence is empty")),
                error: reject,
            }, subscription);
        });
    }

    /**
     * Collects all values of the sequence into an array.
     *
     * @param ct Cancellation token; it is checked before subscribing and is
     * linked to the subscription together with the promise's `reject`.
     * @returns A promise resolved with the collected values on completion and
     * rejected on error.
     */
    collect(ct = Cancellation.none) {
        return new Promise<T[]>((resolve, reject) => {
            ct.throwIfRequested();

            const subscription = new SubscriptionImpl();
            subscription.cancellable(ct, reject);
            const acc: T[] = [];
            (0, this._producer)({
                next: (item: T) => acc.push(item),
                complete: () => resolve(acc),
                error: reject
            }, subscription);
        });
    }

    /**
     * Calls `next` for every value of the sequence.
     *
     * @param next Callback invoked with each value.
     * @param ct Cancellation token; it is checked before subscribing and is
     * linked to the subscription together with the promise's `reject`.
     * @returns A promise resolved on completion and rejected on error.
     */
    forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            ct.throwIfRequested();

            const subscription = new SubscriptionImpl();
            subscription.cancellable(ct, reject);

            (0, this._producer)({
                next,
                complete: resolve,
                error: reject
            }, subscription);
        });
    }
}