##// END OF EJS Templates
Corrected Scope.own() to cleanup the supplied object immediately when the scope is disposed already
Corrected Scope.own() to cleanup the supplied object immediately when the scope is disposed already

File last commit:

r129:66546e709732 v1.8.0 default
r131:c7d9ad82b374 v1.8.1 default
Show More
observable.ts
522 lines | 15.9 KiB | video/mp2t | TypeScriptLexer
/ djx / src / main / ts / observable.ts
cin
added reduce() and next() methods to observable...
r116 import { Cancellation } from "@implab/core-amd/Cancellation";
import { ICancellation } from "@implab/core-amd/interfaces";
cin
added observable.collect() method to collect a sequnce to the array...
r129 import { isPromise } from "@implab/core-amd/safe";
cin
corrected tear down logic handling in observables. Added support for observable query results
r110
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
/**
 * A consumer of an observable sequence. The sequence reports its elements,
 * its failure and its end through the callbacks declared here.
 */
export interface Observer<T> {
    /** Invoked for every element of the sequence. */
    next: (value: T) => void;

    /** Invoked once when an error occurs in the sequence. */
    error: (e: unknown) => void;

    /** Invoked once when the sequence ends. */
    complete: () => void;
}
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
/**
 * The callbacks handed to a producer. The producer calls them to generate
 * the stream of events of an observable.
 */
export type Sink<T> = {
    /** Sends the next element of the sequence. */
    next: (value: T) => void;

    /** Reports an error which occurred in the sequence. */
    error: (e: unknown) => void;

    /** Signals the end of the sequence. */
    complete: () => void;

    /**
     * Returns `true` when the sink doesn't accept new elements anymore.
     * It's safe to send elements to the closed sink.
     */
    isClosed: () => boolean;
};
cin
refactoring, adding scope to rendering methods
r96
/**
 * The function which feeds a sink with events. It may return a function
 * which is used as the tear down logic of the subscription.
 */
export type Producer<T> = (sink: Sink<T>) => void | (() => void);
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
/** The handle of a subscription, used to cancel the subscription. */
export interface Unsubscribable {
    /** Cancels the subscription. */
    unsubscribe(): void;
}
cin
added whenRendered() method to wait for pending oprations to complete
/** Checks that the value is an object-like with the `unsubscribe` method.
 *
 * @param v The value to check
 * @returns `true` when `v` is neither `null` nor `undefined` and its
 *  `unsubscribe` member is a function.
 */
export const isUnsubscribable = (v: unknown): v is Unsubscribable => {
    if (v === null || v === undefined)
        return false;
    return typeof (v as Unsubscribable).unsubscribe === "function";
};
cin
added whenRendered() method to wait for pending oprations to complete
/** Checks that the value is an object-like with the `subscribe` method.
 *
 * @param v The value to check
 * @returns `true` when `v` is neither `null` nor `undefined` and its
 *  `subscribe` member is a function.
 */
export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> => {
    if (v === null || v === undefined)
        return false;
    return typeof (v as Subscribable<unknown>).subscribe === "function";
};
cin
refactoring, adding scope to rendering methods
r96
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
/** The minimal contract of an observable source: it accepts a consumer. */
export interface Subscribable<T> {
    /** Attaches the consumer to the source.
     *
     * @param consumer The callbacks to notify, every callback is optional.
     * @returns The handle used to cancel the subscription.
     */
    subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
}
cin
added reduce() and next() methods to observable...
/** Combines the accumulated value with the next element of a sequence. */
export type AccumulatorFn<T, A> = (acc: A, value: T) => A;

/** The operator for `Observable.pipe()`: takes the source observable and
 * returns the producer of the resulting sequence.
 */
export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
/** The observable source of items. */
export interface Observable<T> extends Subscribable<T> {
    /** Transforms elements of the sequence with the specified mapper
     *
     * @param mapper The mapper used to transform the values
     */
    map<T2>(mapper: (value: T) => T2): Observable<T2>;

    /** Filters elements of the sequence. The resulting sequence will
     * contain only elements which match the specified predicate.
     *
     * @param predicate The filter predicate.
     */
    filter(predicate: (value: T) => boolean): Observable<T>;

    /** Completes the sequence once the condition is met.
     *
     * @param predicate The condition which should be met to complete the sequence
     */
    until(predicate: (value: T) => boolean): Observable<T>;

    /** Keeps the sequence running while elements satisfy the condition.
     *
     * @param predicate The condition which should be met to continue.
     */
    while(predicate: (value: T) => boolean): Observable<T>;

    /** Applies accumulator to each value in the sequence and
     * emits the accumulated value for each source element
     *
     * @param accumulator The function combining the accumulated value with
     *  the next element.
     * @param initial The starting value of the accumulator. When omitted the
     *  first element of the sequence is used as the starting value and is
     *  emitted as is.
     */
    scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    scan(accumulator: AccumulatorFn<T, T>): Observable<T>;

    /** Applies accumulator to each value in the sequence and
     * emits the accumulated value at the end of the sequence
     *
     * @param accumulator The function combining the accumulated value with
     *  the next element.
     * @param initial The starting value of the accumulator. When omitted the
     *  first element of the sequence is used as the starting value and the
     *  empty sequence results in an error.
     */
    reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
    reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;

    /** Concatenates the specified sequences with this observable
     *
     * @param seq sequences to concatenate with the current observable
     *
     * The concatenation doesn't accumulate values from the specified sequences,
     * The result of the concatenation is the new observable which will switch
     * to the next observable after the previous one completes. Values emitted
     * before the next observable being active are lost.
     */
    cat(...seq: Subscribable<T>[]): Observable<T>;

    /** Pipes the specified operator to produce the new observable
     *
     * @param op The operator consumes this observable and produces a new one
     *
     * The operator is a higher order function which takes a source observable
     * and returns a producer for the new observable.
     *
     * This function can be used to create a complex mapping between source and
     * resulting observables. The operator may have a state (or a side effect)
     * and can be connected to multiple observables.
     */
    pipe<U>(op: OperatorFn<T, U>): Observable<U>;

    /** Waits for the next event to occur and returns a promise for the next
     * value. The promise is rejected when the sequence ends without a value.
     *
     * @param ct Cancellation token
     */
    next(ct?: ICancellation): Promise<T>;

    /** Collects items of the sequence to the array. The promise is resolved
     * when the sequence ends.
     *
     * @param ct Cancellation token
     */
    collect(ct?: ICancellation): Promise<T[]>;
}
/** The shared do-nothing callback. `sink()` substitutes it for the missing
 * handlers, so operators compare a handler with `noop` to detect that the
 * consumer isn't interested in the corresponding event.
 */
const noop = () => { };

/** Makes a complete sink from a partial observer.
 *
 * @param consumer The observer, any of its handlers may be omitted
 * @returns The sink whose handlers are bound to the consumer; missing
 *  handlers are replaced with `noop`, `isClosed()` always returns `false`.
 */
const sink = <T>(consumer: Partial<Observer<T>>) => {
    const { next: onNext, error: onError, complete: onComplete } = consumer;
    const neverClosed = () => false;

    return {
        next: onNext ? onNext.bind(consumer) : noop,
        error: onError ? onError.bind(consumer) : noop,
        complete: onComplete ? onComplete.bind(consumer) : noop,
        isClosed: neverClosed
    };
};
cin
corrected tear down logic handling in observables. Added support for observable query results
/** Wraps the producer to handle tear down logic and subscription management
 *
 * The wrapper guarantees that:
 *  - elements are dropped after the sequence is terminated or unsubscribed;
 *  - only the first `error()` or `complete()` reaches the consumer;
 *  - the tear down logic returned by the producer runs exactly once, either
 *    when the sequence terminates or when the subscription is cancelled.
 *
 * @param producer The producer to wrap
 * @returns The wrapper producer
 */
const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
    let done = false;
    let cleanup = noop;

    // Makes the one-shot terminal handler. The result of the consumer's
    // handler is intentionally discarded: producers written as
    // `(next(v), complete())` return the result of the last call and it
    // must not be mistaken for the tear down logic.
    const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
        (...args: A): void => {
            if (done)
                return;
            done = true;
            cleanup();
            fn(...args);
        };

    // the unsubscribe handler: terminates silently
    const _fin0 = (): void => {
        if (done)
            return;
        done = true;
        cleanup();
    };

    const safeSink = {
        next: (value: T) => {
            if (!done)
                next(value);
        },
        error: _fin(error),
        complete: _fin(complete),
        isClosed: () => done
    };

    // anything but a function can't be the tear down logic
    const teardown = producer(safeSink);
    cleanup = typeof teardown === "function" ? teardown : noop;

    if (done) {
        // The sequence was terminated while the producer was starting,
        // the tear down logic wasn't available at that moment, run it now.
        cleanup();
        return;
    }

    return _fin0;
};
cin
refactoring, adding scope to rendering methods
r96
cin
`Subscribable` is made compatible with rxjs, added map, filter and scan...
/** Creates the observable for the specified producer. The producer is used
 * as is, i.e. it is called for every subscription and no tear down handling
 * is added to it.
 *
 * Operators compare the `next` handler with `noop` to skip the work when the
 * consumer doesn't listen for elements.
 *
 * @param producer The producer which feeds subscribers of the observable
 * @returns The observable
 */
const _observe = <T>(producer: Producer<T>): Observable<T> => ({
    subscribe: (consumer: Partial<Observer<T>>) => {
        // Producers written as an expression may return the result of the
        // last handler call, only a function can be the tear down logic.
        const teardown = producer(sink(consumer));
        return {
            unsubscribe: typeof teardown === "function" ? teardown : noop
        };
    },

    map: (mapper) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ? (v: T) => next(mapper(v)) : noop,
            ...rest
        })
    ),

    filter: (predicate) => _observe(({ next, ...rest }) =>
        producer({
            next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
            ...rest
        })
    ),

    // NOTE(review): `until` and `while` call the consumer's `complete()` but
    // don't tear down the upstream; unless the consumer is fused it may get
    // elements and further `complete()` calls after that - confirm whether
    // these operators should terminate the subscription themselves.
    until: predicate => _observe(({ next, complete, ...rest }) =>
        producer({
            next: v => predicate(v) ? complete() : next(v),
            complete,
            ...rest
        })
    ),

    while: predicate => _observe(({ next, complete, ...rest }) =>
        producer({
            next: v => predicate(v) ? next(v) : complete(),
            complete,
            ...rest
        })
    ),

    scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
        if (args.length === 1) {
            // no initial value: the first element starts the accumulator
            // and is emitted as is
            const [accumulator] = args;
            let _acc: T;
            let index = 0;
            return producer({
                next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
                ...rest
            });
        } else {
            const [accumulator, initial] = args;
            let _acc = initial;
            return producer({
                next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
                ...rest
            });
        }
    }),

    reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
        if (args.length === 1) {
            const [accumulator] = args;
            let _acc: T;
            let index = 0;
            return producer({
                // Elements are always counted, even when the consumer has
                // no `next` handler: `index` is the only evidence that the
                // sequence wasn't empty.
                next: (v: T) => {
                    _acc = index++ === 0 ? v : accumulator(_acc, v);
                },
                complete: () => {
                    if (index === 0) {
                        error(new Error("The sequence can't be empty"));
                    } else {
                        next(_acc);
                        complete();
                    }
                },
                error,
                ...rest
            });
        } else {
            const [accumulator, initial] = args;
            let _acc = initial;
            return producer({
                // nobody receives the result, the accumulation can be skipped
                next: next !== noop ? (v: T) => {
                    _acc = accumulator(_acc, v);
                } : noop,
                complete: () => {
                    next(_acc);
                    complete();
                },
                error,
                ...rest
            });
        }
    }),

    cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
        // every subscription walks through its own copy of the list,
        // the arguments are shared between all subscriptions
        const pending = [...seq];
        let cleanup: () => void = noop;
        // The number of switches made so far. A source may complete
        // synchronously while it is being subscribed, in this case the
        // switch to the next source is already made when the subscription
        // call returns and the returned tear down logic is stale.
        let stage = 0;

        const complete = () => {
            const current = ++stage;
            const continuation = pending.shift();
            if (continuation) {
                // if we have a next sequence, subscribe to it
                const subscription = continuation.subscribe({ next, complete, ...rest });
                if (current === stage)
                    cleanup = subscription.unsubscribe.bind(subscription);
            } else {
                // otherwise notify the consumer about completion
                cleanup = noop;
                final();
            }
        };

        const head = producer({ next, complete, ...rest }) ?? noop;
        if (stage === 0)
            cleanup = head;

        return () => cleanup();
    }),

    pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),

    // resolves with the first element, fails when the sequence ends first
    next: collect(
        producer,
        ({ next, complete, error, isClosed }) => ({
            next: v => (next(v), complete()),
            complete: () => error(new Error("The sequence is empty")),
            error,
            isClosed
        })
    ),

    // gathers all elements and resolves with them when the sequence ends
    collect: collect(
        producer,
        ({ next, complete, ...rest }) => {
            const data: T[] = [];
            return {
                next: v => data.push(v),
                complete: () => (next(data), complete()),
                ...rest
            };
        }
    )
});
cin
corrected tear down logic handling in observables. Added support for observable query results
r110
cin
added observable.collect() method to collect a sequnce to the array...
/** Makes the function which runs the producer and returns the promise of
 * the result gathered by the collector.
 *
 * @param producer The source of elements
 * @param collector Receives the sink of the result and returns the sink to
 *  feed with the elements of the source. The value passed to `next()` of
 *  the result sink resolves the promise, `error()` rejects it.
 * @returns The function accepting an optional cancellation token; the
 *  `error` handler is registered with the token and the registration is
 *  destroyed when the subscription is torn down.
 */
const collect = <T, U>(
    producer: Producer<T>,
    collector: (result: Sink<U>) => Sink<T>
) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
    const run = fuse<U>(({ next, complete, error, isClosed }) => {
        const registration = ct.register(error);

        // the registration may report the error right away,
        // the producer isn't started in this case
        let stop = noop;
        if (!isClosed())
            stop = producer(collector({ next, complete, error, isClosed })) ?? noop;

        return () => {
            registration.destroy();
            stop();
        };
    });

    run({
        next: resolve,
        error: reject,
        complete: noop,
        isClosed: () => false
    });
});
cin
corrected tear down logic handling in observables. Added support for observable query results
r110
/** Creates the observable for the specified producer. The producer is
 * wrapped with `fuse()` to manage the termination and the tear down logic.
 *
 * @param producer The producer which is called for every subscription
 * @returns The observable
 */
export const observe = <T>(producer: Producer<T>) => {
    const fused = fuse(producer);
    return _observe(fused);
};
cin
added observable.collect() method to collect a sequnce to the array...
/** Creates the observable which emits the elements of the array and then
 * completes. All events are sent synchronously during the subscription.
 *
 * @param items The array of elements to emit
 * @returns The observable
 */
export const ofArray = <T>(items: T[]) => _observe<T>(
    ({ next, complete }) => {
        // the handler must get the element only, `forEach` also supplies
        // the index and the array
        items.forEach(item => next(item));
        complete();
        // nothing is returned: the result of the producer is interpreted
        // as the tear down logic
    }
);
cin
corrected tear down logic handling in observables. Added support for observable query results
r110
cin
added observable.collect() method to collect a sequnce to the array...
/** Creates the observable of the single element.
 *
 * @param item The element or the promise of the element. The rejection of
 *  the promise is reported as the error of the sequence.
 * @returns The observable which emits the element and completes
 */
const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
    ({ next, error, complete }) => {
        // The producer has a block body on purpose: its result is
        // interpreted as the tear down logic and it must not return the
        // result of the `complete()` call.
        if (isPromise(item)) {
            void item.then(
                v => {
                    next(v);
                    complete();
                },
                error
            );
        } else {
            next(item);
            complete();
        }
    }
);
cin
corrected tear down logic handling in observables. Added support for observable query results
r110
cin
added observable.collect() method to collect a sequnce to the array...
/** Creates the observable of the specified elements.
 *
 * @param items The elements or the promises of elements. Elements are
 *  emitted in the order of the arguments, the sequence is suspended at
 *  every promise until it is resolved. The rejection of a promise is
 *  reported as the error of the sequence.
 * @returns The observable
 */
export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
    of1(items[0]) :
    observe<T>(
        ({ next, error, complete, isClosed }) => {
            const total = items.length;

            const resume = (from: number): void => {
                // the subscription might be cancelled while waiting
                if (from > 0 && isClosed())
                    return;

                let pos = from;
                while (pos < total) {
                    const item = items[pos];
                    pos++;
                    if (isPromise(item)) {
                        const rest = pos;
                        item.then(
                            v => {
                                next(v);
                                resume(rest);
                            },
                            error
                        );
                        return; // continue when the promise is resolved
                    }
                    next(item);
                }
                complete();
            };

            resume(0);
        }
    );
cin
corrected tear down logic handling in observables. Added support for observable query results
r110
cin
added while, until methods to the observable interface....
/** The observable without elements, it completes right on subscription. */
export const empty = _observe<never>(({ complete }) => {
    // nothing is returned: the result of the producer is interpreted as
    // the tear down logic
    complete();
});
/**
 * Wraps the producer to share it between subscribers and to keep its state.
 *
 * @param producer The source producer
 * @returns The wrapped producer
 *
 * The source producer is started once, at the moment of the call to this
 * function, its tear down logic isn't used. The wrapper remembers the last
 * element and the termination of the source:
 *
 *  - while the source is active a new subscriber immediately receives the
 *    last element (if any) and then all further events;
 *  - after the source is terminated a new subscriber immediately receives
 *    the completion or the stored error.
 */
export const stateful = <T>(producer: Producer<T>): Producer<T> => {
    const fusedProducer = fuse(producer);
    type Status = "active" | "complete" | "error";

    let lastValue: T;
    let hasValue = false;
    let status: Status = "active";
    let lastError: unknown;
    let subscribers: Sink<T>[] = [];

    const sink: Sink<T> = {
        isClosed: () => status !== "active",
        complete: () => {
            if (status === "active") {
                status = "complete";
                // the list is replaced before the notification, stale
                // unsubscribe handlers detect this and do nothing
                const _subscribers = subscribers;
                subscribers = [];
                _subscribers.forEach(s => s.complete());
            }
        },
        error: e => {
            if (status === "active") {
                status = "error";
                lastError = e;
                const _subscribers = subscribers;
                subscribers = [];
                _subscribers.forEach(s => s.error(e));
            }
        },
        next: v => {
            if (status === "active") {
                hasValue = true;
                lastValue = v;
                // A subscriber may unsubscribe while handling the element,
                // this removes it from the live list. Iterate through the
                // snapshot to not skip the subscriber which follows it and
                // check the live list to not notify the removed ones.
                const snapshot = subscribers.slice();
                snapshot.forEach(s => {
                    if (subscribers.indexOf(s) >= 0)
                        s.next(v);
                });
            }
        }
    };

    fusedProducer(sink);

    return (s: Sink<T>) => {
        const _subscribers = subscribers;
        switch (status) {
            case "active":
                if (hasValue)
                    s.next(lastValue); // if hasValue is true,
                // lastValue has a valid value
                subscribers.push(s);
                return () => {
                    // the list is replaced when the source is terminated,
                    // there is nothing to remove in this case
                    if (_subscribers === subscribers) {
                        const pos = subscribers.indexOf(s);
                        if (pos >= 0)
                            subscribers.splice(pos, 1);
                    }
                };
            case "complete":
                s.complete();
                break;
            case "error":
                s.error(lastError);
                break;
        }
    };
};
cin
added observable subject producer
/** Create the producer which will be called once when the first subscriber is
 * attached, next subscribers would share the same producer. When all
 * subscribers are removed the producer will be cleaned up.
 *
 * Use this wrapper to prevent spawning multiple producers.
 *
 * The source producer is started again when a subscriber is attached after
 * the source has been terminated or cleaned up.
 *
 * @param producer The source producer
 * @returns The wrapped producer
 */
export const subject = <T>(producer: Producer<T>): Producer<T> => {
    const fusedProducer = fuse(producer);

    let subscribers: Sink<T>[] = [];
    let cleanup = noop;

    const sink: Sink<T> = {
        isClosed: () => false,
        complete: () => {
            // the list is replaced before the notification, stale
            // unsubscribe handlers detect this and do nothing
            const _subscribers = subscribers;
            subscribers = [];
            _subscribers.forEach(s => s.complete());
            cleanup();
        },
        error: e => {
            const _subscribers = subscribers;
            subscribers = [];
            _subscribers.forEach(s => s.error(e));
            cleanup();
        },
        next: v => {
            // A subscriber may unsubscribe while handling the element,
            // this removes it from the live list. Iterate through the
            // snapshot to not skip the subscriber which follows it and
            // check the live list to not notify the removed ones.
            const snapshot = subscribers.slice();
            snapshot.forEach(s => {
                if (subscribers.indexOf(s) >= 0)
                    s.next(v);
            });
        }
    };

    return client => {
        const _subscribers = subscribers;
        subscribers.push(client);
        // the first subscriber starts the source
        if (subscribers.length === 1)
            cleanup = fusedProducer(sink) ?? noop;

        return () => {
            if (_subscribers === subscribers) {
                const pos = subscribers.indexOf(client);
                if (pos >= 0)
                    subscribers.splice(pos, 1);
                // the last subscriber is gone, stop the source
                if (!subscribers.length)
                    cleanup();
            }
        };
    };
};