|
|
import { Cancellation } from "@implab/core-amd/Cancellation";
|
|
|
import { CancelledError } from "@implab/core-amd/CancelledError";
|
|
|
import { Observable, Sink, Subscribable, observe } from "../observable";
|
|
|
import { Scope } from "../tsx/Scope";
|
|
|
|
|
|
/**
|
|
|
* Creates a buffer with the specified length. The buffer will immediately
|
|
|
* subscribe to the source observable and start accumulating values.
|
|
|
*
|
|
|
* The resulting observable will repeat the buffered values for each new subscriber.
|
|
|
*
|
|
|
* @param length The number of elements to store.
|
|
|
* @param ct Cancellation token to unsubscribe from the original observable.
|
|
|
*
|
|
|
*/
|
|
|
export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Observable<T> => {
|
|
|
type Status = "active" | "complete" | "error";
|
|
|
|
|
|
// ring-buffer, wpos will rotate in range (0...length-1)
|
|
|
let wpos = 0;
|
|
|
const buffer: T[] = [];
|
|
|
|
|
|
// writes the next value to the buffer
|
|
|
const write = (value: T) => {
|
|
|
buffer[wpos] = value;
|
|
|
wpos = (wpos + 1) % length;
|
|
|
};
|
|
|
|
|
|
// reads contents of the buffer
|
|
|
// cb will be called for each value in the buffer
|
|
|
const read = (cb: (item: T) => void) => {
|
|
|
const start = wpos + length - buffer.length;
|
|
|
const end = wpos + length;
|
|
|
|
|
|
for(let pos = start; pos < end; pos++ )
|
|
|
cb(buffer[pos % length]);
|
|
|
};
|
|
|
|
|
|
let status: Status = "active";
|
|
|
let lastError: unknown;
|
|
|
let subscribers: Sink<T>[] = [];
|
|
|
|
|
|
const scope = new Scope();
|
|
|
|
|
|
// cleanup method to release resources held by this subscription
|
|
|
const cleanup = (cb: (item: Sink<T>) => void) => {
|
|
|
scope.destroy();
|
|
|
const _subscribers = subscribers;
|
|
|
subscribers = [];
|
|
|
_subscribers.forEach(cb);
|
|
|
};
|
|
|
|
|
|
const sink: Sink<T> = {
|
|
|
isClosed: () => status !== "active",
|
|
|
complete: () => {
|
|
|
if (status === "active") {
|
|
|
status = "complete";
|
|
|
cleanup(s => s.complete());
|
|
|
}
|
|
|
},
|
|
|
error: e => {
|
|
|
if (status === "active") {
|
|
|
status = "error";
|
|
|
lastError = e;
|
|
|
cleanup(s => s.error(e));
|
|
|
}
|
|
|
},
|
|
|
next: v => {
|
|
|
if (status === "active") {
|
|
|
write(v);
|
|
|
const _subscribers = subscribers;
|
|
|
_subscribers.forEach(s => s.next(v));
|
|
|
}
|
|
|
}
|
|
|
};
|
|
|
|
|
|
if (ct.isRequested()) {
|
|
|
sink.error(new CancelledError("The operation was cancelled", ct));
|
|
|
} else {
|
|
|
scope.own(source.subscribe(sink));
|
|
|
scope.own(ct.register(e => sink.error(e)));
|
|
|
}
|
|
|
|
|
|
return observe( s => {
|
|
|
const _subscribers = subscribers;
|
|
|
read(s.next);
|
|
|
switch (status) {
|
|
|
case "active":
|
|
|
subscribers.push(s);
|
|
|
return () => {
|
|
|
if (_subscribers === subscribers) {
|
|
|
const pos = subscribers.indexOf(s);
|
|
|
if (pos >= 0)
|
|
|
subscribers.splice(pos, 1);
|
|
|
}
|
|
|
};
|
|
|
case "complete":
|
|
|
s.complete();
|
|
|
break;
|
|
|
case "error":
|
|
|
s.error(lastError);
|
|
|
break;
|
|
|
}
|
|
|
});
|
|
|
};
|