import { ICancellation } from "@implab/core-amd/interfaces"; /** * The interface for the consumer of an observable sequence */ export interface Observer { /** * Called for the next element in the sequence */ next?(value: T): void; /** * Called once when the error occurs in the sequence. */ error?(e: unknown): void; /** * Called once at the end of the sequence. */ complete?(): void; } /** * The group of functions to feed an observable. These methods are provided to * the producer to generate a stream of events. */ export type Sink = { /** * Call to send the next element in the sequence */ next: (value: T) => void; /** * Call to notify about the error occurred in the sequence. */ error: (e: unknown) => void; /** * Call to signal the end of the sequence. */ complete: () => void; /** * Checks whether the sink is accepting new elements. It's safe to * send elements to the closed sink. * * This method is useful to notify a producer while it's emitting the series * of synchronous events. */ isClosed: () => boolean; }; export type Producer = (sink: Sink) => (void | (() => void)); export type FusedSink = Omit, "isClosed">; export type FusedProducer = (sink: FusedSink) => (void | (() => void)); export interface Unsubscribable { unsubscribe(): void; } export interface Subscribable { /** Subscribes a consumer to events. If a consumer isn't specified * this method activates the producer to achieve side affects if any. */ subscribe(consumer?: Observer): Unsubscribable; } export type AccumulatorFn = (acc: A, value: T) => A; export type OperatorFn = (source: Observable) => Observable; /** The observable source of items. */ export interface Observable extends Subscribable { /** Transforms elements of the sequence with the specified mapper * * @param mapper The mapper used to transform the values */ map(mapper: (value: T) => T2): Observable; /** Injects the specified observer into the each producer to consumer chain. * The method is used to add side effect to the events processing. * * @param observer The consumer for the events */ tap(observer: Observer): Observable; /** Filters elements of the sequence. The resulting sequence will * contain only elements which match the specified predicate. * * @param predicate The filter predicate. */ filter(predicate: (value: T) => boolean): Observable; /** Completes the sequence once the condition is met. * @param predicate The condition which should be met to complete the sequence */ until(predicate: (value: T) => boolean): Observable; /** Keeps the sequence running while elements satisfy the condition. * * @param predicate The condition which should be met to continue. */ while(predicate: (value: T) => boolean): Observable; /** Applies accumulator to each value in the sequence and * emits the accumulated value for each source element * * @param accumulator * @param initial */ scan(accumulator: AccumulatorFn, initial: A): Observable; scan(accumulator: AccumulatorFn): Observable; /** Applies accumulator to each value in the sequence and * emits the accumulated value at the end of the sequence * * @param accumulator * @param initial */ reduce(accumulator: AccumulatorFn, initial: A): Observable; reduce(accumulator: AccumulatorFn): Observable; /** Concatenates the specified sequences with this observable * * @param seq sequences to concatenate with the current observable * * The concatenation doesn't accumulate values from the specified sequences, * The result of the concatenation is the new observable which will switch * to the next observable after the previous one completes. Values emitted * before the next observable being active are lost. */ cat(...seq: Subscribable[]): Observable; /** Pipes the specified operator to produce the new observable * @param op The operator consumes this observable and produces a new one * * The operator is a higher order function which takes a source observable * and returns a producer for the new observable. * * This function can be used to create a complex mapping between source and * resulting observables. The operator may have a state (or a side effect) * and can be connected to multiple observables. */ pipe(op: OperatorFn): Observable; /** Waits for the next event to occur and returns a promise for the next value * @param ct Cancellation token */ next(ct?: ICancellation): Promise; /** Collects items of the sequence to the array. */ collect(ct?: ICancellation): Promise; }