observable.ts
147 lines
| 4.4 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r102 | /** | ||
* The interface for the consumer of an observable sequence | ||||
*/ | ||||
export interface Observer<T> { | ||||
/** | ||||
* Called for the next element in the sequence | ||||
*/ | ||||
next: (value: T) => void; | ||||
cin
|
r96 | |||
cin
|
r102 | /** | ||
* Called once when the error occurs in the sequence. | ||||
*/ | ||||
cin
|
r96 | error: (e: unknown) => void; | ||
cin
|
r102 | |||
/** | ||||
* Called once at the end of the sequence. | ||||
*/ | ||||
cin
|
r96 | complete: () => void; | ||
} | ||||
cin
|
r102 | /** | ||
* The group of functions to feed an observable. This methods are provided to | ||||
* the producer to generate a stream of events. | ||||
*/ | ||||
export type Sink<T> = { | ||||
[k in keyof Observer<T>]: (this: void, ...args: Parameters<Observer<T>[k]>) => void; | ||||
}; | ||||
cin
|
r96 | |||
export type Producer<T> = (sink: Sink<T>) => (void | (() => void)); | ||||
cin
|
r102 | export interface Unsubscribable { | ||
unsubscribe(): void; | ||||
} | ||||
export const isUnsubsribable = (v: unknown): v is Unsubscribable => | ||||
v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; | ||||
export const isSubsribable = (v: unknown): v is Subscribable<unknown> => | ||||
v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; | ||||
cin
|
r96 | |||
cin
|
r102 | export interface Subscribable<T> { | ||
subscribe(consumer: Partial<Observer<T>>): Unsubscribable; | ||||
} | ||||
/** The observable source of items. */ | ||||
export interface Observable<T> extends Subscribable<T> { | ||||
/** Transforms elements of the sequence with the specified mapper | ||||
* | ||||
* @param mapper The mapper used to transform the values | ||||
*/ | ||||
map<T2>(mapper: (value: T) => T2): Observable<T2>; | ||||
cin
|
r96 | |||
cin
|
r102 | /** Filters elements of the sequence. The resulting sequence will | ||
* contain only elements which match the specified predicate. | ||||
* | ||||
* @param predicate The filter predicate. | ||||
*/ | ||||
filter(predicate: (value: T) => boolean): Observable<T>; | ||||
cin
|
r96 | |||
cin
|
r102 | /** Applies accumulator to each value in the sequence and | ||
* emits the accumulated value for each source element | ||||
* | ||||
* @param accumulator | ||||
* @param initial | ||||
*/ | ||||
scan<A>(accumulator: (acc: A, value: T) => A, initial: A): Observable<A>; | ||||
} | ||||
const noop = () => { }; | ||||
const sink = <T>(consumer: Partial<Observer<T>>) => { | ||||
const { next, error, complete } = consumer; | ||||
cin
|
r96 | return { | ||
cin
|
r102 | next: next ? next.bind(consumer) : noop, | ||
error: error ? error.bind(consumer) : noop, | ||||
complete: complete ? complete.bind(consumer) : noop | ||||
} | ||||
}; | ||||
const fuse = <T>({ next, error, complete }: Sink<T>) => { | ||||
let done = false; | ||||
return { | ||||
next: (value: T) => { !done && next(value) }, | ||||
error: (e: unknown) => { !done && (done = true, error(e)) }, | ||||
complete: () => { !done && (done = true, complete()) } | ||||
} | ||||
cin
|
r96 | } | ||
cin
|
r102 | const _observe = <T>(producer: Producer<T>): Observable<T> => ({ | ||
subscribe: (consumer: Partial<Observer<T>>) => ({ | ||||
unsubscribe: producer(sink(consumer)) ?? noop | ||||
}), | ||||
map: (mapper) => _observe(({ next, error, complete }) => | ||||
producer({ | ||||
next: next !== noop ? (v: T) => next(mapper(v)) : noop, | ||||
error, | ||||
complete | ||||
}) | ||||
), | ||||
filter: (predicate) => _observe(({ next, error, complete }) => | ||||
producer({ | ||||
next: next !== noop ? | ||||
(v: T) => predicate(v) ? next(v) : void(0) : noop, | ||||
error, | ||||
complete | ||||
}) | ||||
), | ||||
scan: (accumulator, initial) => _observe(({ next, error, complete }) => { | ||||
let _acc = initial; | ||||
return producer({ | ||||
next: next !== noop ? | ||||
(v: T) => next(_acc = accumulator(_acc, v)) : noop, | ||||
error, | ||||
complete | ||||
}); | ||||
cin
|
r96 | }) | ||
}); | ||||
cin
|
r102 | |||
export const observe = <T>(producer: Producer<T>): Observable<T> => ({ | ||||
subscribe: (consumer: Partial<Observer<T>>) => ({ | ||||
unsubscribe: producer(fuse(sink(consumer))) ?? noop | ||||
}), | ||||
map: (mapper) => _observe(({ next, error, complete }) => | ||||
producer(fuse({ | ||||
next: next !== noop ? | ||||
(v: T) => next(mapper(v)) : noop, | ||||
error, | ||||
complete | ||||
})) | ||||
), | ||||
filter: (predicate) => _observe(({ next, error, complete }) => | ||||
producer(fuse({ | ||||
next: next !== noop ? | ||||
(v: T) => predicate(v) ? next(v) : void (0) : noop, | ||||
error, | ||||
complete | ||||
})) | ||||
), | ||||
scan: (accumulator, initial?) => observe(({ next, error, complete }) => { | ||||
let _acc = initial; | ||||
return producer(fuse({ | ||||
next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, | ||||
error, | ||||
complete | ||||
})); | ||||
}) | ||||
}); | ||||