##// END OF EJS Templates
Merge
Merge

File last commit:

r146:af4f8424e83d v1.9.0 default
r154:2a5720a0816e merge default
Show More
subject.ts
60 lines | 2.2 KiB | video/mp2t | TypeScriptLexer
cin
added store::get method to wrap up dojo/store/get
r136 import { Observable, Sink, Subscribable, observe } from "../observable";
cin
added 'buffer' and 'subject' observable operators
r133
/** Shared do-nothing callback for places that require a function but need no action. */
const noop = (): void => undefined;
cin
code review, minor refactoring
/**
 * Connects multiple subscribers to a single producer. The producer is
 * activated when the first client subscribes and is released when the last
 * client unsubscribes.
 *
 * Use this wrapper to prevent spawning multiple producers.
 *
 * The emitted values are not cached, therefore new subscribers will not
 * receive the values emitted before they subscribed.
 *
 * @param source The source observable
 * @returns The new observable
 */
export const subject = <T>(source: Subscribable<T>): Observable<T> => {
    let subscribers: Sink<T>[] = []; // the list of active subscribers

    let subscription = { unsubscribe: noop }; // current subscription to the producer

    // Terminates the current generation of subscribers: detaches the list and
    // then notifies every sink which was in it.
    const cleanup = (cb: (item: Sink<T>) => void) => {
        const _subscribers = subscribers;
        // Replacing the list (instead of clearing it) makes the identity check
        // in the per-client cleanup fail, which turns those cleanups into no-ops.
        subscribers = [];
        _subscribers.forEach(cb);
        // we don't need subscription.unsubscribe(), because cleanup is called
        // from the complete or error methods.
    };

    // The single sink handed to the producer; it fans events out to the clients.
    const sink: Sink<T> = {
        isClosed: () => false,
        complete: () => cleanup(s => s.complete()),
        error: e => cleanup(s => s.error(e)),
        next: v => {
            // Iterate over a snapshot: a client may unsubscribe from within its
            // own next() handler, which splices the live list and would make
            // forEach skip the subscriber following it.
            const targets = subscribers.slice();
            targets.forEach(s => {
                // skip the sinks which were removed while this value was being delivered
                if (subscribers.indexOf(s) >= 0)
                    s.next(v);
            });
        }
    };

    return observe(client => {
        // remember the list this client was added to, see cleanup()
        const _subscribers = subscribers;
        subscribers.push(client);
        if (subscribers.length === 1) // this is the first client
            subscription = source.subscribe(sink); // activate the producer

        return () => {
            // this is the cleanup logic for an individual client

            // the list has been replaced: the producer has already completed
            // or failed and there is nothing left to release
            if (_subscribers !== subscribers)
                return;

            // find this client in the list of subscribers
            const pos = subscribers.indexOf(client);
            if (pos < 0)
                return; // already removed, don't release the producer twice

            subscribers.splice(pos, 1);

            // if this is the last subscriber we need to release the producer
            if (!subscribers.length)
                subscription.unsubscribe();
        };
    });
};