##// END OF EJS Templates
Implemented subscription SubscriptionImpl, fixed subscription resource management
Implemented subscription SubscriptionImpl, fixed subscription resource management

File last commit:

r136:435ce00ba245 v1.9.0-rc2 default
r158:078eca3dc271 v1.10.3 default
Show More
buffer.ts
104 lines | 3.2 KiB | video/mp2t | TypeScriptLexer
import { Cancellation } from "@implab/core-amd/Cancellation";
import { CancelledError } from "@implab/core-amd/CancelledError";
import { Observable, Sink, Subscribable, observe } from "../observable";
import { Scope } from "../tsx/Scope";
/**
 * Creates an operator which keeps the most recent values of the source
 * observable and replays them to every new subscriber.
 *
 * The operator subscribes to the source as soon as it is applied to it (not
 * when the first consumer subscribes) and starts accumulating values in a
 * ring buffer. Each new subscriber first receives the buffered values, oldest
 * first, and then either the live values (while the buffer is active) or the
 * terminal `complete`/`error` notification which has been already recorded.
 *
 * @param length The number of most recent elements to store. Fractional
 * values are rounded down; zero, negative and `NaN` values disable the
 * buffering, subscribers will receive live values only.
 * @param ct Cancellation token to unsubscribe from the original observable.
 * If the cancellation is already requested the source is never subscribed and
 * the buffer fails with a `CancelledError`. Otherwise the value passed by the
 * token to the registered callback is reported to subscribers as the error.
 * @returns A function which accepts the source and returns the buffered
 * observable.
 */
export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Observable<T> => {
    type Status = "active" | "complete" | "error";

    // Normalized capacity of the ring buffer. A raw `length` of 0 would turn
    // `% length` into NaN, negative or fractional values would produce
    // negative or fractional indices, so the value is sanitized once here.
    const capacity = length > 0 ? Math.floor(length) : 0;

    // ring-buffer, wpos will rotate in range (0...capacity-1)
    let wpos = 0;
    const buffer: T[] = [];

    // writes the next value to the buffer, overwriting the oldest one
    // when the buffer is full
    const write = (value: T) => {
        if (capacity <= 0)
            return; // buffering is disabled, nothing to store
        buffer[wpos] = value;
        wpos = (wpos + 1) % capacity;
    };

    // reads contents of the buffer
    // cb will be called for each value in the buffer, the oldest value first
    const read = (cb: (item: T) => void) => {
        const count = buffer.length;
        // until the buffer is filled the oldest value is at index 0,
        // after that the oldest value is the next one to be overwritten
        const first = count < capacity ? 0 : wpos;
        for (let i = 0; i < count; i++)
            cb(buffer[(first + i) % count]);
    };

    let status: Status = "active";
    let lastError: unknown;
    let subscribers: Sink<T>[] = [];

    // owns the subscription to the source and the cancellation registration
    const scope = new Scope();

    // cleanup method to release resources held by this subscription,
    // cb is used to deliver the terminal notification to every subscriber
    const cleanup = (cb: (item: Sink<T>) => void) => {
        scope.destroy();
        // detach the list first, the callbacks may re-enter this buffer
        const pending = subscribers;
        subscribers = [];
        pending.forEach(cb);
    };

    const sink: Sink<T> = {
        isClosed: () => status !== "active",
        complete: () => {
            if (status === "active") {
                status = "complete";
                cleanup(s => s.complete());
            }
        },
        error: e => {
            if (status === "active") {
                status = "error";
                lastError = e;
                cleanup(s => s.error(e));
            }
        },
        next: v => {
            if (status !== "active")
                return;

            write(v);

            // Dispatch over a snapshot: a subscriber may unsubscribe (itself
            // or another one) from inside its `next`, which splices the live
            // array and would make `forEach` skip the following subscriber.
            const targets = subscribers.slice();
            targets.forEach(s => {
                // skip sinks which were removed (or already terminated by
                // a re-entrant complete/error) during this dispatch
                if (subscribers.indexOf(s) >= 0)
                    s.next(v);
            });
        }
    };

    if (ct.isRequested()) {
        sink.error(new CancelledError("The operation was cancelled", ct));
    } else {
        scope.own(source.subscribe(sink));
        scope.own(ct.register(e => sink.error(e)));
    }

    return observe(s => {
        // replay the buffered values, `next` is called through the sink
        // to keep its `this` binding intact
        read(v => s.next(v));

        switch (status) {
            case "active":
                subscribers.push(s);
                // Unsubscribe callback. After the buffer terminates the list
                // is replaced with an empty one and the sink won't be found,
                // which makes this callback a no-op.
                return () => {
                    const pos = subscribers.indexOf(s);
                    if (pos >= 0)
                        subscribers.splice(pos, 1);
                };
            case "complete":
                s.complete();
                break;
            case "error":
                s.error(lastError);
                break;
        }
    });
};