observable.ts
34 lines
| 887 B
| video/mp2t
|
TypeScriptLexer
cin
|
r96 | import { IDestroyable } from "@implab/core-amd/interfaces"; | ||
export interface Sink<T> { | ||||
next: (value: T) => void; | ||||
error: (e: unknown) => void; | ||||
complete: () => void; | ||||
} | ||||
export type Consumer<T> = Partial<Sink<T>>; | ||||
export type Producer<T> = (sink: Sink<T>) => (void | (() => void)); | ||||
export interface Observable<T> { | ||||
on(sink: Partial<Sink<T>>): IDestroyable; | ||||
} | ||||
const noop = () => {}; | ||||
const sink = <T>(consumer: Consumer<T>) => { | ||||
const { next = noop, error = noop, complete = noop } = consumer; | ||||
let done = false; | ||||
return { | ||||
next: (value: T) => done && next(value), | ||||
error: (e: unknown) => done && (done = true, error(e)), | ||||
complete: () => done && (done = true, complete()) | ||||
}; | ||||
} | ||||
export const observe = <T>(producer: Producer<T>) => ({ | ||||
on: (consumer: Consumer<T>) => ({ | ||||
destroy: producer(sink(consumer)) ?? noop | ||||
}) | ||||
}); | ||||