##// END OF EJS Templates
corrected tear down logic handling in observables. Added support for observable query results
corrected tear down logic handling in observables. Added support for observable query results

File last commit:

r110:1a190b3a757d v1.4.0 default
r110:1a190b3a757d v1.4.0 default
Show More
observable.ts
229 lines | 6.9 KiB | video/mp2t | TypeScriptLexer
import { PromiseOrValue } from "@implab/core-amd/interfaces";
import { isPromise } from "@implab/core-amd/safe";
/**
* The interface for the consumer of an observable sequence.
*
* All three callbacks are required by this interface; `subscribe` accepts
* `Partial<Observer<T>>`, so a consumer may supply any subset of them.
*/
export interface Observer<T> {
/**
* Called for the next element in the sequence
*
* @param value The element produced by the sequence.
*/
next: (value: T) => void;
/**
* Called once when the error occurs in the sequence.
*
* @param e The error reported by the producer; it is untyped and must be
* narrowed before use.
*/
error: (e: unknown) => void;
/**
* Called once at the end of the sequence.
*/
complete: () => void;
}
/**
* The group of functions to feed an observable. These methods are provided to
* the producer to generate a stream of events.
*
* It mirrors `Observer<T>` and adds `isClosed`, which lets the producer find
* out whether the elements it sends are still accepted.
*/
export type Sink<T> = {
/**
* Call to send the next element in the sequence
*
* @param value The element to send.
*/
next: (value: T) => void;
/**
* Call to notify about the error occurred in the sequence.
*
* @param e The error to report.
*/
error: (e: unknown) => void;
/**
* Call to signal the end of the sequence.
*/
complete: () => void;
/**
* Checks whether the sink is accepting new elements. It's safe to
* send elements to the closed sink.
*
* @returns `true` when the sink no longer accepts elements.
*/
isClosed: () => boolean;
};
/**
* The function which generates the sequence: it receives the sink and feeds
* it with events. The producer may return a function; `fuse` treats that
* function as the tear down logic and calls it when the sequence terminates
* or the subscription is cancelled.
*/
export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
/** The handle of a subscription, returned by `Subscribable.subscribe`. */
export interface Unsubscribable {
/** Cancels the subscription. */
unsubscribe(): void;
}
/** Type guard which checks that the value has an `unsubscribe` method.
 *
 * NOTE: the identifier is misspelled ("Unsubsribable"); it is exported and
 * therefore kept as is.
 *
 * @param v The value to check, `null` and `undefined` are accepted.
 * @returns `true` when `v.unsubscribe` is a function.
 */
export const isUnsubsribable = (v: unknown): v is Unsubscribable => {
    if (v === null || v === undefined)
        return false;

    const { unsubscribe } = v as Partial<Unsubscribable>;
    return typeof unsubscribe === "function";
};
/** Type guard which checks that the value has a `subscribe` method.
 *
 * Only the presence of the method is verified, the element type `T` is not
 * (and cannot be) checked at run time.
 *
 * NOTE: the identifier is misspelled ("Subsribable"); it is exported and
 * therefore kept as is.
 *
 * @param v The value to check, `null` and `undefined` are accepted.
 * @returns `true` when `v.subscribe` is a function.
 */
export const isSubsribable = <T = unknown>(v: unknown): v is Subscribable<T> => {
    if (v === null || v === undefined)
        return false;

    const { subscribe } = v as Partial<Subscribable<unknown>>;
    return typeof subscribe === "function";
};
/** The source of a sequence which can be consumed through a subscription. */
export interface Subscribable<T> {
/** Subscribes the consumer to the sequence.
*
* @param consumer The callbacks to receive the events, any of them may be
* omitted.
* @returns The handle used to cancel the subscription.
*/
subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
}
/** The observable source of items. */
export interface Observable<T> extends Subscribable<T> {
/** Transforms elements of the sequence with the specified mapper
*
* @param mapper The mapper used to transform the values
* @returns The sequence of the mapped values.
*/
map<T2>(mapper: (value: T) => T2): Observable<T2>;
/** Filters elements of the sequence. The resulting sequence will
* contain only elements which match the specified predicate.
*
* @param predicate The filter predicate.
* @returns The sequence of the elements accepted by the predicate.
*/
filter(predicate: (value: T) => boolean): Observable<T>;
/** Applies accumulator to each value in the sequence and
* emits the accumulated value for each source element
*
* @param accumulator The function which combines the accumulated value with
* the next element and returns the new accumulated value.
* @param initial The accumulated value passed to the first call of the
* accumulator; the initial value itself is not emitted.
* @returns The sequence of the accumulated values.
*/
scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>;
/** Concatenates the sequences: emits the elements of this sequence and,
* once it completes, subscribes to the specified sequences one after
* another. The resulting sequence completes when the last one completes.
*
* @param seq The sequences to continue with, in the order of subscription.
* @returns The concatenated sequence.
*/
cat(...seq: Subscribable<T>[]): Observable<T>;
}
/** The shared do-nothing callback.
 *
 * It stands for omitted handlers and tear down functions, the operators in
 * this module compare handlers with it by identity (`next !== noop`).
 */
const noop = (): void => {
    // intentionally empty
};
/** Converts the consumer to the set of callbacks passed to a producer.
 *
 * The handlers supplied by the consumer are bound to it, the missing ones are
 * replaced with `noop`. The created sink always reports itself as open.
 *
 * @param consumer The consumer, any of its handlers may be omitted.
 * @returns The object with `next`, `error`, `complete` and `isClosed`.
 */
const sink = <T>(consumer: Partial<Observer<T>>) => {
    // every handler is read from the consumer exactly once
    const onNext = consumer.next;
    const onError = consumer.error;
    const onComplete = consumer.complete;

    return {
        next: onNext ? onNext.bind(consumer) : noop,
        error: onError ? onError.bind(consumer) : noop,
        complete: onComplete ? onComplete.bind(consumer) : noop,
        isClosed: () => false
    };
};
/** Wraps the producer to handle tear down logic and subscription management
 *
 * The wrapper guarantees that after the first `error` or `complete`, or after
 * the subscription is cancelled, no more events are passed to the sink and
 * that the tear down function returned by the producer is called.
 *
 * @param producer The producer to wrap
 * @returns The wrapper producer
 */
const fuse = <T>(producer: Producer<T>) => ({ next: emit, error: fail, complete: finish }: Sink<T>) => {
    let closed = false;
    // replaced with the tear down function once the producer returns
    let teardown = noop;

    // Creates a handler which closes the sequence: the first call runs the
    // tear down logic and then the handler, the subsequent calls are ignored.
    const terminal = <A extends unknown[]>(handler: (...args: A) => void) =>
        (...args: A) => {
            if (closed)
                return undefined;
            closed = true;
            teardown();
            return handler(...args);
        };

    const guarded = {
        next: (value: T) => {
            if (!closed)
                emit(value);
        },
        error: terminal(fail),
        complete: terminal(finish),
        isClosed: () => closed
    };

    teardown = producer(guarded) ?? noop;

    if (closed) {
        // The producer has terminated the sequence synchronously, at that
        // moment its tear down function wasn't known yet, run it now.
        teardown();
        return noop;
    }

    // cancelling the subscription closes the sequence silently
    return terminal(noop);
};
/** Creates the observable on top of the specified producer.
 *
 * The producer is called for every subscription, the value it returns is used
 * as the `unsubscribe` method of the subscription. The operators create new
 * observables whose producers call this producer with an adjusted sink.
 *
 * @param producer The producer of the sequence, it is used as is (it isn't
 * wrapped with `fuse` here).
 * @returns The observable.
 */
const _observe = <T>(producer: Producer<T>): Observable<T> => ({
    subscribe: (consumer: Partial<Observer<T>>) => ({
        unsubscribe: producer(sink(consumer)) ?? noop
    }),

    // When the consumer doesn't handle `next` (the handler is `noop`) the
    // operators below keep `noop`, thus the user supplied functions aren't
    // called at all.
    map: (mapper) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ? (v: T) => next(mapper(v)) : noop,
            ...rest
        })
    ),

    filter: (predicate) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
            ...rest
        })
    ),

    scan: (accumulator, initial) => _observe(({ next, ...rest }) => {
        // the accumulated value is private to the subscription
        let _acc = initial;
        return producer({
            next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
            ...rest
        });
    }),

    cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
        // the tear down logic of the source which is currently active
        let cleanup: () => void = noop;
        // The number of the sources started after the head: 0 while the head
        // sequence is active, i + 1 once seq[i] has been subscribed.
        let stage = 0;

        // Creates the completion handler which starts seq[index]. The list is
        // addressed by index and is never modified, therefore every
        // subscription walks through all continuations.
        const completeAt = (index: number) => (): void => {
            const continuation = seq[index];
            if (continuation) {
                // if we have a next sequence, subscribe to it
                stage = index + 1;
                const subscription = continuation.subscribe({
                    next,
                    complete: completeAt(index + 1),
                    ...rest
                });
                // The continuation may complete synchronously and start its
                // successor before `subscribe` returns, in this case the
                // handle of the successor must not be overwritten.
                if (stage === index + 1)
                    cleanup = subscription.unsubscribe.bind(subscription);
            } else {
                // otherwise notify the consumer about completion
                final();
            }
        };

        const head = producer({ next, complete: completeAt(0), ...rest }) ?? noop;
        // the same guard for the head sequence completed synchronously
        if (stage === 0)
            cleanup = head;

        return () => cleanup();
    })
});
/** Describes the change of the position of an item in the ordered results.
*
* `query` emits the initial items with `prevIndex` set to -1, the subsequent
* changes are emitted with the indices reported by the observable results.
*/
export interface OrderUpdate<T> {
/** The item is being updated */
item: T;
/** The previous index of the item, -1 in case it is inserted */
prevIndex: number;
/** The new index of the item, -1 in case it is deleted */
newIndex: number;
}
/** The results of a query which can notify about their changes. */
interface ObservableResults<T> {
/**
* Allows observation of results
*
* @param listener The callback which receives the changed object, its
* previous index and its new index.
* @param includeUpdates NOTE(review): the meaning of this flag isn't visible
* in this file, presumably it follows the dojo/store `Observable` contract —
* confirm.
* @returns The handle, its `remove` method stops the observation.
*/
observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
remove(): void;
};
}
/** The object which can be queried for the list of items, e.g. a store. */
interface Queryable<T, A extends unknown[]> {
/** Executes the query.
*
* @param args The arguments of the query.
* @returns The array of the items or the promise of the array.
*/
query(...args: A): PromiseOrValue<T[]>;
}
/** Type guard which checks that the value has an `observe` method.
 *
 * The result is always a boolean: `null` and `undefined`, which may be passed
 * by untyped callers, yield `false` as in the other guards of this module.
 *
 * @param v The value to check.
 * @returns `true` when `v.observe` is a function.
 */
export const isObservableResults = <T>(v: object): v is ObservableResults<T> =>
    v !== null && v !== undefined &&
    typeof (v as { observe?: unknown; }).observe === "function";
/** Creates the observable from the specified producer.
 *
 * The producer is wrapped with `fuse`, which takes care of the tear down
 * logic and of ignoring the events sent after the end of the sequence.
 *
 * @param producer The producer of the sequence, it is called for every
 * subscription.
 * @returns The observable.
 */
export const observe = <T>(producer: Producer<T>) => {
    const fused = fuse(producer);
    return _observe(fused);
};
/** The sequence without elements, it completes right on subscription. */
export const empty = observe<never>(target => target.complete());
/** Creates the function which turns the queries to the store into
 * observable sequences of `OrderUpdate<T>`.
 *
 * Every subscription executes the query. The items of the results are emitted
 * first, with `prevIndex` set to -1. When the results have an `observe`
 * method the sequence stays open and emits the reported changes until the
 * subscription is cancelled, otherwise the sequence completes after the last
 * item, i.e. for the asynchronous results after the promise is resolved.
 *
 * The errors thrown by the query and the rejection of the results are passed
 * to the `error` handler.
 *
 * @param store The store to query.
 * @returns The function which accepts the arguments of `store.query` and
 * returns the observable.
 */
export const query = <T, A extends unknown[]>(store: Queryable<T, A>) =>
    (...args: A) => {
        return observe<OrderUpdate<T>>(({ next, complete, error }) => {
            try {
                const results = store.query(...args);
                // the results narrowed to the observable ones, if they are
                const live = isObservableResults<T>(results) ? results : undefined;

                // Emits the initial items. The results which can't be observed
                // have nothing to add, so the sequence is completed here, after
                // the items are delivered. Completing before an asynchronous
                // query is resolved would close the sink and lose the items.
                const emitInitial = (items: T[]) => {
                    items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
                    if (!live)
                        complete();
                };

                if (isPromise(results)) {
                    results.then(emitInitial)
                        .then(undefined, error);
                } else {
                    emitInitial(results);
                }

                if (live) {
                    const h = live.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }));
                    // stop the observation when the sequence is torn down
                    return () => h.remove();
                }
            } catch (err) {
                error(err);
            }
        });
    };