buffer.ts
104 lines
| 3.2 KiB
| video/mp2t
|
TypeScriptLexer
cin
|
r133 | import { Cancellation } from "@implab/core-amd/Cancellation"; | |
import { CancelledError } from "@implab/core-amd/CancelledError"; | |||
import { Producer, Sink, Subscribable } from "../observable"; | |||
import { Scope } from "../tsx/Scope"; | |||
/**
 * Creates an operator that keeps the most recent values of the source in a
 * ring buffer. The buffer subscribes to the source immediately (when the
 * operator is applied to the source, not when the first consumer subscribes)
 * and starts accumulating values.
 *
 * Each new subscriber first receives the buffered values, oldest first, and
 * then either the live values (while the source is active) or the terminal
 * signal (`complete` / `error`) the source has already produced.
 *
 * @param length The number of elements to store. Fractional values are
 * rounded down; zero, negative and `NaN` values disable replaying (values are
 * still forwarded to the current subscribers); `Infinity` keeps every value.
 * @param ct Cancellation token to unsubscribe from the original observable.
 * When cancellation is requested the buffer fails with the cancellation
 * reason and all current and future subscribers receive it as an error.
 * @returns A function which takes the source and returns a producer; the
 * producer returns an unsubscribe callback while the buffer is active.
 */
export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Producer<T> => {
    type Status = "active" | "complete" | "error";

    // A registered consumer. `active` is cleared on unsubscribe and on
    // termination so that the sink is never called afterwards, even when
    // this happens in the middle of an emission.
    type Listener = { readonly sink: Sink<T>; active: boolean };

    // Normalized capacity of the ring buffer: a non-negative integer or
    // Infinity. Guards the modulo below against 0 (NaN) and fractions.
    const capacity = length > 0 ? Math.floor(length) : 0;

    // ring-buffer, wpos will rotate in range (0...capacity-1)
    let wpos = 0;
    const items: T[] = [];

    // writes the next value to the buffer, overwriting the oldest one
    // when the buffer is full
    const write = (value: T) => {
        if (capacity === 0)
            return;
        items[wpos] = value;
        wpos = (wpos + 1) % capacity;
    };

    // reads contents of the buffer from the oldest to the newest value,
    // cb will be called for each value in the buffer
    const read = (cb: (item: T) => void) => {
        const count = items.length;
        // until the buffer is full the oldest value is at index 0,
        // after that it is the one which will be overwritten next
        const first = count < capacity ? 0 : wpos;
        for (let i = 0; i < count; i++)
            cb(items[(first + i) % count]);
    };

    let status: Status = "active";
    let lastError: unknown;

    // The list is never mutated in place: subscribe/unsubscribe replace it
    // with a new array, so an emission in progress iterates a stable snapshot.
    let listeners: readonly Listener[] = [];

    const scope = new Scope();

    const isActive = () => status === "active";

    // cleanup method to release resources held by this subscription and to
    // deliver the terminal signal to the listeners registered at this moment
    const cleanup = (cb: (item: Sink<T>) => void) => {
        scope.destroy();
        const pending = listeners;
        listeners = [];
        for (const listener of pending) {
            if (listener.active) {
                listener.active = false;
                cb(listener.sink);
            }
        }
    };

    const sink: Sink<T> = {
        isClosed: () => !isActive(),

        complete: () => {
            if (isActive()) {
                status = "complete";
                cleanup(s => s.complete());
            }
        },

        error: e => {
            if (isActive()) {
                status = "error";
                lastError = e;
                cleanup(s => s.error(e));
            }
        },

        next: v => {
            if (!isActive())
                return;

            // store the value first, a consumer subscribing from inside
            // a `next` handler will get it through the replay
            write(v);

            const snapshot = listeners;
            for (const listener of snapshot) {
                // a handler may terminate the buffer reentrantly; the rest
                // of the listeners already got the terminal signal
                if (!isActive())
                    break;
                // skip the listeners unsubscribed by a previous handler
                if (listener.active)
                    listener.sink.next(v);
            }
        }
    };

    if (ct.isRequested()) {
        sink.error(new CancelledError("The operation was cancelled", ct));
    } else {
        // NOTE(review): if the source terminates synchronously inside
        // subscribe(), `scope` is already destroyed when own() is called;
        // this relies on Scope releasing such resources itself - confirm.
        scope.own(source.subscribe(sink));
        scope.own(ct.register(e => sink.error(e)));
    }

    return (s: Sink<T>) => {
        // replay the buffered values; the method is called on the sink
        // rather than passed detached to keep its `this` binding
        read(v => s.next(v));

        // the status is checked after the replay since a handler could
        // have changed it
        switch (status) {
            case "active": {
                const listener: Listener = { sink: s, active: true };
                listeners = [...listeners, listener];
                return () => {
                    if (listener.active) {
                        listener.active = false;
                        listeners = listeners.filter(x => x !== listener);
                    }
                };
            }
            case "complete":
                s.complete();
                break;
            case "error":
                s.error(lastError);
                break;
        }
    };
};