##// END OF EJS Templates
Corrected Scope.own() to cleanup the supplied object immediately when the scope is disposed already
Corrected Scope.own() to cleanup the supplied object immediately when the scope is disposed already

File last commit:

r129:66546e709732 v1.8.0 default
r131:c7d9ad82b374 v1.8.1 default
Show More
observable.ts
522 lines | 15.9 KiB | video/mp2t | TypeScriptLexer
import { Cancellation } from "@implab/core-amd/Cancellation";
import { ICancellation } from "@implab/core-amd/interfaces";
import { isPromise } from "@implab/core-amd/safe";
/**
* The interface for the consumer of an observable sequence
*/
export interface Observer<T> {
/**
* Called for the next element in the sequence
*/
next: (value: T) => void;
/**
* Called once when the error occurs in the sequence.
*/
error: (e: unknown) => void;
/**
* Called once at the end of the sequence.
*/
complete: () => void;
}
/**
* The group of functions to feed an observable. These methods are provided to
* the producer to generate a stream of events.
*/
export type Sink<T> = {
/**
* Call to send the next element in the sequence
*/
next: (value: T) => void;
/**
* Call to notify about the error occurred in the sequence.
*/
error: (e: unknown) => void;
/**
* Call to signal the end of the sequence.
*/
complete: () => void;
/**
* Checks whether the sink is accepting new elements. It's safe to
* send elements to the closed sink.
*/
isClosed: () => boolean;
};
export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
export interface Unsubscribable {
unsubscribe(): void;
}
export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
export interface Subscribable<T> {
subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
}
export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
/** The observable source of items. */
export interface Observable<T> extends Subscribable<T> {
/** Transforms elements of the sequence with the specified mapper
*
* @param mapper The mapper used to transform the values
*/
map<T2>(mapper: (value: T) => T2): Observable<T2>;
/** Filters elements of the sequence. The resulting sequence will
* contain only elements which match the specified predicate.
*
* @param predicate The filter predicate.
*/
filter(predicate: (value: T) => boolean): Observable<T>;
/** Completes the sequence once the condition is met.
* @param predicate The condition which should be met to complete the sequence
*/
until(predicate: (value: T) => boolean): Observable<T>;
/** Keeps the sequence running while elements satisfy the condition.
*
* @param predicate The condition which should be met to continue.
*/
while(predicate: (value: T) => boolean): Observable<T>;
/** Applies accumulator to each value in the sequence and
* emits the accumulated value for each source element
*
* @param accumulator
* @param initial
*/
scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
/** Applies accumulator to each value in the sequence and
* emits the accumulated value at the end of the sequence
*
* @param accumulator
* @param initial
*/
reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
/** Concatenates the specified sequences with this observable
*
* @param seq sequences to concatenate with the current observable
*
* The concatenation doesn't accumulate values from the specified sequences,
* The result of the concatenation is the new observable which will switch
* to the next observable after the previous one completes. Values emitted
* before the next observable being active are lost.
*/
cat(...seq: Subscribable<T>[]): Observable<T>;
/** Pipes the specified operator to produce the new observable
* @param op The operator consumes this observable and produces a new one
*
* The operator is a higher order function which takes a source observable
* and returns a producer for the new observable.
*
* This function can be used to create a complex mapping between source and
* resulting observables. The operator may have a state (or a side effect)
* and can be connected to multiple observables.
*/
pipe<U>(op: OperatorFn<T, U>): Observable<U>;
/** Waits for the next event to occur and returns a promise for the next value
* @param ct Cancellation token
*/
next(ct?: ICancellation): Promise<T>;
/** Collects items of the sequence to the array. */
collect(ct?: ICancellation): Promise<T[]>;
}
const noop = () => { };
const sink = <T>(consumer: Partial<Observer<T>>) => {
const { next, error, complete } = consumer;
return {
next: next ? next.bind(consumer) : noop,
error: error ? error.bind(consumer) : noop,
complete: complete ? complete.bind(consumer) : noop,
isClosed: () => false
};
};
/** Wraps the producer to handle tear down logic and subscription management
*
* @param producer The producer to wrap
* @returns The wrapper producer
*/
const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
let done = false;
let cleanup = noop;
const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
(...args: A) => done ?
void (0) :
(done = true, cleanup(), fn(...args));
const _fin0 = () => done ? void (0) : (done = true, cleanup());
const safeSink = {
next: (value: T) => { !done && next(value); },
error: _fin(error),
complete: _fin(complete),
isClosed: () => done
};
cleanup = producer(safeSink) ?? noop;
return done ? cleanup() : _fin0;
};
const _observe = <T>(producer: Producer<T>): Observable<T> => ({
subscribe: (consumer: Partial<Observer<T>>) => ({
unsubscribe: producer(sink(consumer)) ?? noop
}),
map: (mapper) => _observe(({ next, ...rest }) =>
producer({
next: next !== noop ? (v: T) => next(mapper(v)) : noop,
...rest
})
),
filter: (predicate) => _observe(({ next, ...rest }) =>
producer({
next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
...rest
})
),
until: predicate => _observe(({ next, complete, ...rest }) =>
producer({
next: v => predicate(v) ? complete() : next(v),
complete,
...rest
})
),
while: predicate => _observe(({ next, complete, ...rest }) =>
producer({
next: v => predicate(v) ? next(v) : complete(),
complete,
...rest
})
),
scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
if (args.length === 1) {
const [accumulator] = args;
let _acc: T;
let index = 0;
return producer({
next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
...rest
});
} else {
const [accumulator, initial] = args;
let _acc = initial;
return producer({
next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
...rest
});
}
}),
reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
if (args.length === 1) {
const [accumulator] = args;
let _acc: T;
let index = 0;
return producer({
next: next !== noop ? (v: T) => {
_acc = index++ === 0 ? v : accumulator(_acc, v);
} : noop,
complete: () => {
if (index === 0) {
error(new Error("The sequence can't be empty"));
} else {
next(_acc);
complete();
}
},
error,
...rest
});
} else {
const [accumulator, initial] = args;
let _acc = initial;
return producer({
next: next !== noop ? (v: T) => {
_acc = accumulator(_acc, v);
} : noop,
complete: () => {
next(_acc);
complete();
},
error,
...rest
});
}
}),
cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
let cleanup: () => void;
const complete = () => {
const continuation = seq.shift();
if (continuation) {
// if we have a next sequence, subscribe to it
const subscription = continuation.subscribe({ next, complete, ...rest });
cleanup = subscription.unsubscribe.bind(subscription);
} else {
// otherwise notify the consumer about completion
final();
}
};
cleanup = producer({ next, complete, ...rest }) ?? noop;
return () => cleanup();
}),
pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
next: collect(
producer,
({ next, complete, error, isClosed }) => ({
next: v => (next(v), complete()),
complete: () => error(new Error("The sequence is empty")),
error,
isClosed
})
),
collect: collect(
producer,
({ next, complete, ...rest }) => {
const data: T[] = [];
return {
next: v => data.push(v),
complete: () => (next(data), complete()),
...rest
};
}
)
});
const collect = <T, U>(
producer: Producer<T>,
collector: (result: Sink<U>) => Sink<T>
) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
const fused = fuse<U>(({ next, complete, error, isClosed }) => {
const h = ct.register(error);
const cleanup = !isClosed() ?
producer(collector({ next, complete, error, isClosed })) ?? noop :
noop;
return () => {
h.destroy();
cleanup();
};
});
fused({
next: resolve,
error: reject,
complete: noop,
isClosed: () => false
});
});
export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
export const ofArray = <T>(items: T[]) => _observe<T>(
({ next, complete }) => (
items.forEach(next),
complete()
)
);
const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
({ next, error, complete }) =>
isPromise(item) ?
void item.then(
v => (next(v), complete()),
error
) :
(next(item), complete())
);
export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
of1(items[0]) :
observe<T>(
({ next, error, complete, isClosed }) => {
const n = items.length;
const _next = (start: number) => {
if (start > 0 && isClosed()) // when resumed
return;
for (let i = start; i < n; i++) {
const r = items[i];
if (isPromise(r)) {
r.then(v => (next(v), _next(i + 1)), error);
return; // suspend
} else {
next(r);
}
}
complete();
};
_next(0);
}
);
export const empty = _observe<never>(({ complete }) => complete());
/**
* Creates a mutable state and the observable for the stored value.
*
* @param value The initial value for the state
* @returns an array of three elements `[observable, setter, getter]`
*
* The returned observable keeps the actual value and will emit it as the next
* element each time a consumer subscribes the observable.
*
* Calling the setter will update the stored value in the observable and notify
* all consumers.
*/
export const stateful = <T>(producer: Producer<T>): Producer<T> => {
const fusedProducer = fuse(producer);
type Status = "active" | "complete" | "error";
let lastValue: T;
let hasValue = false;
let status: Status = "active";
let lastError: unknown;
let subscribers: Sink<T>[] = [];
const sink: Sink<T> = {
isClosed: () => status !== "active",
complete: () => {
if (status === "active") {
status = "complete";
const _subscribers = subscribers;
subscribers = [];
_subscribers.forEach(s => s.complete());
}
},
error: e => {
if (status === "active") {
status = "error";
lastError = e;
const _subscribers = subscribers;
subscribers = [];
_subscribers.forEach(s => s.error(e));
}
},
next: v => {
if (status === "active") {
hasValue = true;
lastValue = v;
const _subscribers = subscribers;
_subscribers.forEach(s => s.next(v));
}
}
};
fusedProducer(sink);
return (s: Sink<T>) => {
const _subscribers = subscribers;
switch (status) {
case "active":
if (hasValue)
s.next(lastValue); // if hasValue is true,
// lastValue has a valid value
subscribers.push(s);
return () => {
if (_subscribers === subscribers) {
const pos = subscribers.indexOf(s);
if (pos >= 0)
subscribers.splice(pos, 1);
}
};
case "complete":
s.complete();
break;
case "error":
s.error(lastError);
break;
}
};
};
/** Create the producer which will be called once when the first subscriber is
* attached, next subscribers would share the same producer. When all
* subscribers are removed the producer will be cleaned up.
*
* Use this wrapper to prevent spawning multiple producers.
*
* @param producer The source producer
* @returns The wrapped producer
*/
export const subject = <T>(producer: Producer<T>): Producer<T> => {
const fusedProducer = fuse(producer);
let subscribers: Sink<T>[] = [];
let cleanup = noop;
const sink: Sink<T> = {
isClosed: () => false,
complete: () => {
const _subscribers = subscribers;
subscribers = [];
_subscribers.forEach(s => s.complete());
cleanup();
},
error: e => {
const _subscribers = subscribers;
subscribers = [];
_subscribers.forEach(s => s.error(e));
cleanup();
},
next: v => {
const _subscribers = subscribers;
_subscribers.forEach(s => s.next(v));
}
};
return client => {
const _subscribers = subscribers;
subscribers.push(client);
if (subscribers.length === 1)
cleanup = fusedProducer(sink) ?? noop;
return () => {
if (_subscribers === subscribers) {
const pos = subscribers.indexOf(client);
if (pos >= 0)
subscribers.splice(pos, 1);
if (!subscribers.length)
cleanup();
}
};
};
};