observable.ts
34 lines
| 910 B
| video/mp2t
|
TypeScriptLexer
cin
|
r96 | import { IDestroyable } from "@implab/core-amd/interfaces"; | ||
export interface Sink<T> { | ||||
next: (value: T) => void; | ||||
error: (e: unknown) => void; | ||||
complete: () => void; | ||||
} | ||||
export type Consumer<T> = Partial<Sink<T>>; | ||||
export type Producer<T> = (sink: Sink<T>) => (void | (() => void)); | ||||
export interface Observable<T> { | ||||
cin
|
r97 | on(consumer: Partial<Sink<T>>): IDestroyable; | ||
cin
|
r96 | } | ||
const noop = () => {}; | ||||
const sink = <T>(consumer: Consumer<T>) => { | ||||
const { next = noop, error = noop, complete = noop } = consumer; | ||||
let done = false; | ||||
return { | ||||
cin
|
r101 | next: (value: T) => !done && next(value), | ||
error: (e: unknown) => !done && (done = true, error(e)), | ||||
complete: () => !done && (done = true, complete()) | ||||
cin
|
r96 | }; | ||
} | ||||
cin
|
r97 | export const observe = <T>(producer: Producer<T>) : Observable<T> => ({ | ||
cin
|
r96 | on: (consumer: Consumer<T>) => ({ | ||
destroy: producer(sink(consumer)) ?? noop | ||||
}) | ||||
}); | ||||