import { Cancellation } from "@implab/core-amd/Cancellation";
import { CancelledError } from "@implab/core-amd/CancelledError";
import { Observable, Sink, Subscribable, observe } from "../observable";
import { Scope } from "../tsx/Scope";

/**
 * Creates a buffer with the specified length. The buffer will immediately
 * subscribe to the source observable and start accumulating values.
 *
 * The resulting observable will repeat the buffered values for each new
 * subscriber and then, depending on the state of the buffer, either keep
 * forwarding live values or immediately signal the stored completion/error.
 *
 * @param length The number of most recent elements to store. Fractional
 *  values are rounded down; non-positive or NaN values disable buffering
 *  (nothing is replayed to new subscribers).
 * @param ct Cancellation token to unsubscribe from the original observable.
 *  When cancellation is requested the buffer terminates with an error and
 *  that error is passed to all current and future subscribers.
 * @returns An operator which takes the source and returns the buffered
 *  observable.
 */
export const buffer = <T>(length: number, ct = Cancellation.none) => (source: Subscribable<T>): Observable<T> => {
    type Status = "active" | "complete" | "error";

    // Normalised ring size. Using the raw `length` as a modulus would produce
    // NaN (length === 0) or fractional (non-integer length) write positions.
    const capacity = length > 0 ? Math.floor(length) : 0;

    // ring-buffer, wpos will rotate in range (0...capacity-1)
    let wpos = 0;
    const buffer: T[] = [];

    // writes the next value to the buffer, overwriting the oldest one when
    // the buffer is full
    const write = (value: T) => {
        if (capacity === 0)
            return;
        buffer[wpos] = value;
        wpos = (wpos + 1) % capacity;
    };

    // reads contents of the buffer from the oldest value to the newest one,
    // cb will be called for each value in the buffer
    const read = (cb: (item: T) => void) => {
        const count = buffer.length;
        // until the ring is full the values are stored starting at index 0,
        // afterwards the oldest value is the one at the write position
        const first = count < capacity ? 0 : wpos;
        for (let i = 0; i < count; i++)
            cb(buffer[(first + i) % count]);
    };

    let status: Status = "active";
    let lastError: unknown;
    let subscribers: Sink<T>[] = [];

    // owns the subscription to the source and the cancellation registration
    const scope = new Scope();

    // cleanup method to release resources held by this subscription; the
    // list of subscribers is detached before notifying them, so the
    // unsubscribe callbacks handed out earlier become no-ops
    const cleanup = (cb: (item: Sink<T>) => void) => {
        scope.destroy();
        const _subscribers = subscribers;
        subscribers = [];
        _subscribers.forEach(cb);
    };

    // the sink attached to the source, it records values and the terminal
    // state and fans them out to the current subscribers
    const sink: Sink<T> = {
        isClosed: () => status !== "active",
        complete: () => {
            if (status === "active") {
                status = "complete";
                cleanup(s => s.complete());
            }
        },
        error: e => {
            if (status === "active") {
                status = "error";
                lastError = e;
                cleanup(s => s.error(e));
            }
        },
        next: v => {
            if (status === "active") {
                write(v);
                // Iterate over a snapshot: a subscriber may unsubscribe (or
                // terminate the buffer) from inside its `next` handler, which
                // mutates/replaces `subscribers` and would otherwise make
                // forEach skip the following subscriber.
                const snapshot = subscribers.slice();
                snapshot.forEach(s => {
                    // deliver only to the sinks which are still registered
                    if (subscribers.indexOf(s) >= 0)
                        s.next(v);
                });
            }
        }
    };

    if (ct.isRequested()) {
        // already cancelled, don't touch the source at all
        sink.error(new CancelledError("The operation was cancelled", ct));
    } else {
        scope.own(source.subscribe(sink));
        scope.own(ct.register(e => sink.error(e)));
    }

    return observe(s => {
        // remember the current list to detect that it was replaced by cleanup
        const _subscribers = subscribers;

        // replay the buffered values; call `next` on the sink itself rather
        // than passing the detached method, so sinks relying on `this` work
        read(v => s.next(v));

        switch (status) {
            case "active":
                subscribers.push(s);
                return () => {
                    // after cleanup the list has been replaced and there is
                    // nothing to remove
                    if (_subscribers === subscribers) {
                        const pos = subscribers.indexOf(s);
                        if (pos >= 0)
                            subscribers.splice(pos, 1);
                    }
                };
            case "complete":
                s.complete();
                break;
            case "error":
                s.error(lastError);
                break;
        }
    });
};