##// END OF EJS Templates
corrected tear down logic handling in observables. Added support for observable query results
corrected tear down logic handling in observables. Added support for observable query results

File last commit:

r110:1a190b3a757d v1.4.0 default
r110:1a190b3a757d v1.4.0 default
Show More
observable.ts
229 lines | 6.9 KiB | video/mp2t | TypeScriptLexer
/ djx / src / main / ts / observable.ts
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 import { PromiseOrValue } from "@implab/core-amd/interfaces";
import { isPromise } from "@implab/core-amd/safe";
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 /**
* The interface for the consumer of an observable sequence
*/
export interface Observer<T> {
/**
* Called for the next element in the sequence
*/
next: (value: T) => void;
cin
refactoring, adding scope to rendering methods
r96
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 /**
* Called once when the error occurs in the sequence.
*/
cin
refactoring, adding scope to rendering methods
r96 error: (e: unknown) => void;
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102
/**
* Called once at the end of the sequence.
*/
cin
refactoring, adding scope to rendering methods
r96 complete: () => void;
}
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 /**
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 * The group of functions to feed an observable. These methods are provided to
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 * the producer to generate a stream of events.
*/
export type Sink<T> = {
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 /**
* Call to send the next element in the sequence
*/
next: (value: T) => void;
/**
* Call to notify about the error occurred in the sequence.
*/
error: (e: unknown) => void;
/**
* Call to signal the end of the sequence.
*/
complete: () => void;
/**
* Checks whether the sink is accepting new elements. It's safe to
* send elements to the closed sink.
*/
isClosed: () => boolean;
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 };
cin
refactoring, adding scope to rendering methods
r96
export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 export interface Unsubscribable {
unsubscribe(): void;
}
export const isUnsubsribable = (v: unknown): v is Unsubscribable =>
v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
cin
Working on WatchForRendition
r107 export const isSubsribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
cin
refactoring, adding scope to rendering methods
r96
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 export interface Subscribable<T> {
subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
}
/** The observable source of items. */
export interface Observable<T> extends Subscribable<T> {
/** Transforms elements of the sequence with the specified mapper
*
* @param mapper The mapper used to transform the values
*/
map<T2>(mapper: (value: T) => T2): Observable<T2>;
cin
refactoring, adding scope to rendering methods
r96
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 /** Filters elements of the sequence. The resulting sequence will
* contain only elements which match the specified predicate.
*
* @param predicate The filter predicate.
*/
filter(predicate: (value: T) => boolean): Observable<T>;
cin
refactoring, adding scope to rendering methods
r96
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 /** Applies accumulator to each value in the sequence and
* emits the accumulated value for each source element
*
* @param accumulator
* @param initial
*/
scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>;
cin
corrected tear down logic handling in observables. Added support for observable query results
r110
cat(...seq: Subscribable<T>[]): Observable<T>;
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 }
const noop = () => { };
const sink = <T>(consumer: Partial<Observer<T>>) => {
const { next, error, complete } = consumer;
cin
refactoring, adding scope to rendering methods
r96 return {
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 next: next ? next.bind(consumer) : noop,
error: error ? error.bind(consumer) : noop,
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 complete: complete ? complete.bind(consumer) : noop,
isClosed: () => false
cin
linting
r109 };
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 };
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 /** Wraps the producer to handle tear down logic and subscription management
*
* @param producer The producer to wrap
* @returns The wrapper producer
*/
const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 let done = false;
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 let cleanup = noop;
const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
(...args: A) => done ?
void (0) :
(done = true, cleanup(), fn(...args));
const safeSink = {
cin
linting
r109 next: (value: T) => { !done && next(value); },
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 error: _fin(error),
complete: _fin(complete),
isClosed: () => done
cin
linting
r109 };
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 cleanup = producer(safeSink) ?? noop;
return done ?
(cleanup(), noop) :
_fin(noop);
cin
linting
r109 };
cin
refactoring, adding scope to rendering methods
r96
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
subscribe: (consumer: Partial<Observer<T>>) => ({
unsubscribe: producer(sink(consumer)) ?? noop
}),
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 map: (mapper) => _observe(({ next, ...rest }) =>
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 producer({
next: next !== noop ? (v: T) => next(mapper(v)) : noop,
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 ...rest
})
),
filter: (predicate) => _observe(({ next, ...rest }) =>
producer({
next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
...rest
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 })
),
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 scan: (accumulator, initial) => _observe(({ next, ...rest }) => {
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 let _acc = initial;
return producer({
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
...rest
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102 });
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 }),
cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
let cleanup: () => void;
const complete = () => {
const continuation = seq.shift();
if (continuation) {
// if we have a next sequence, subscribe to it
const subscription = continuation.subscribe({ next, complete, ...rest });
cleanup = subscription.unsubscribe.bind(subscription);
} else {
// otherwise notify the consumer about completion
final();
}
};
cleanup = producer({ next, complete, ...rest }) ?? noop;
return () => cleanup();
cin
refactoring, adding scope to rendering methods
r96 })
});
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
r102
cin
corrected tear down logic handling in observables. Added support for observable query results
r110 export interface OrderUpdate<T> {
/** The item is being updated */
item: T;
/** The previous index of the item, -1 in case it is inserted */
prevIndex: number;
/** The new index of the item, -1 in case it is deleted */
newIndex: number;
}
interface ObservableResults<T> {
/**
* Allows observation of results
*/
observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
remove(): void;
};
}
interface Queryable<T, A extends unknown[]> {
query(...args: A): PromiseOrValue<T[]>;
}
export const isObservableResults = <T>(v: object): v is ObservableResults<T> =>
v && (typeof (v as { observe?: unknown; }).observe === "function");
export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
export const empty = observe<never>(({ complete }) => complete());
export const query = <T, A extends unknown[]>(store: Queryable<T, A>) =>
(...args: A) => {
return observe<OrderUpdate<T>>(({ next, complete, error }) => {
try {
const results = store.query(...args);
if (isPromise(results)) {
results.then(items => items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 })))
.then(undefined, error);
} else {
results.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
}
if (isObservableResults<T>(results)) {
const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }));
return () => h.remove();
} else {
complete();
}
} catch (err) {
error(err);
}
});
};