##// END OF EJS Templates
Close v1.3 branch
Close v1.3 branch

File last commit:

r102:c65ea2350b1a v1.3
r103:4a1b0098cd9e v1.3
Show More
observable.ts
147 lines | 4.4 KiB | video/mp2t | TypeScriptLexer
/**
* The interface for the consumer of an observable sequence
*/
export interface Observer<T> {
/**
* Called for the next element in the sequence
*/
next: (value: T) => void;
/**
* Called once when the error occurs in the sequence.
*/
error: (e: unknown) => void;
/**
* Called once at the end of the sequence.
*/
complete: () => void;
}
/**
* The group of functions to feed an observable. This methods are provided to
* the producer to generate a stream of events.
*/
export type Sink<T> = {
[k in keyof Observer<T>]: (this: void, ...args: Parameters<Observer<T>[k]>) => void;
};
export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
export interface Unsubscribable {
unsubscribe(): void;
}
export const isUnsubsribable = (v: unknown): v is Unsubscribable =>
v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
export const isSubsribable = (v: unknown): v is Subscribable<unknown> =>
v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
export interface Subscribable<T> {
subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
}
/** The observable source of items. */
export interface Observable<T> extends Subscribable<T> {
/** Transforms elements of the sequence with the specified mapper
*
* @param mapper The mapper used to transform the values
*/
map<T2>(mapper: (value: T) => T2): Observable<T2>;
/** Filters elements of the sequence. The resulting sequence will
* contain only elements which match the specified predicate.
*
* @param predicate The filter predicate.
*/
filter(predicate: (value: T) => boolean): Observable<T>;
/** Applies accumulator to each value in the sequence and
* emits the accumulated value for each source element
*
* @param accumulator
* @param initial
*/
scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>;
}
const noop = () => { };
const sink = <T>(consumer: Partial<Observer<T>>) => {
const { next, error, complete } = consumer;
return {
next: next ? next.bind(consumer) : noop,
error: error ? error.bind(consumer) : noop,
complete: complete ? complete.bind(consumer) : noop
}
};
const fuse = <T>({ next, error, complete }: Sink<T>) => {
let done = false;
return {
next: (value: T) => { !done && next(value) },
error: (e: unknown) => { !done && (done = true, error(e)) },
complete: () => { !done && (done = true, complete()) }
}
}
const _observe = <T>(producer: Producer<T>): Observable<T> => ({
subscribe: (consumer: Partial<Observer<T>>) => ({
unsubscribe: producer(sink(consumer)) ?? noop
}),
map: (mapper) => _observe(({ next, error, complete }) =>
producer({
next: next !== noop ? (v: T) => next(mapper(v)) : noop,
error,
complete
})
),
filter: (predicate) => _observe(({ next, error, complete }) =>
producer({
next: next !== noop ?
(v: T) => predicate(v) ? next(v) : void(0) : noop,
error,
complete
})
),
scan: (accumulator, initial) => _observe(({ next, error, complete }) => {
let _acc = initial;
return producer({
next: next !== noop ?
(v: T) => next(_acc = accumulator(_acc, v)) : noop,
error,
complete
});
})
});
export const observe = <T>(producer: Producer<T>): Observable<T> => ({
subscribe: (consumer: Partial<Observer<T>>) => ({
unsubscribe: producer(fuse(sink(consumer))) ?? noop
}),
map: (mapper) => _observe(({ next, error, complete }) =>
producer(fuse({
next: next !== noop ?
(v: T) => next(mapper(v)) : noop,
error,
complete
}))
),
filter: (predicate) => _observe(({ next, error, complete }) =>
producer(fuse({
next: next !== noop ?
(v: T) => predicate(v) ? next(v) : void (0) : noop,
error,
complete
}))
),
scan: (accumulator, initial?) => observe(({ next, error, complete }) => {
let _acc = initial;
return producer(fuse({
next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
error,
complete
}));
})
});