// subject.ts (50 lines, 1.5 KiB; code-viewer metadata: video/mp2t, TypeScriptLexer)
import { Producer, Sink, Subscribable } from "../observable";
/** Does nothing; used as the `unsubscribe` of the placeholder subscription. */
const noop = () => { };
/** Joins multiple subscribers into a single upstream subscription. The
 * upstream subscription is created when the first client subscribes and is
 * released when the last client unsubscribes, or when the source completes
 * or fails. A client that subscribes after that starts a fresh upstream
 * subscription.
 *
 * Use this wrapper to prevent spawning multiple producers.
 *
 * @param source The source observable
 * @returns The wrapped producer
 */
export const subject = <T>(source: Subscribable<T>): Producer<T> => {
    // placeholder used while there is no live upstream subscription
    const idle = { unsubscribe: noop };

    let subscribers: Sink<T>[] = [];
    let subscription = idle;

    // detaches the live upstream subscription; the handle is reset first so
    // a repeated call can never unsubscribe the same upstream twice
    const release = () => {
        const current = subscription;
        subscription = idle;
        current.unsubscribe();
    };

    // delivers a terminal notification to every current client and releases
    // the upstream subscription that produced it
    const cleanup = (cb: (item: Sink<T>) => void) => {
        const _subscribers = subscribers;
        const _subscription = subscription;
        // Start a new generation BEFORE notifying: a client may subscribe
        // again from its complete/error handler, and that fresh upstream
        // subscription must not be torn down below. Replacing the array also
        // turns the unsubscribe callbacks of the old clients into no-ops.
        subscribers = [];
        subscription = idle;
        try {
            _subscribers.forEach(cb);
        } finally {
            // release the upstream even when a client handler throws
            _subscription.unsubscribe();
        }
    };

    const sink: Sink<T> = {
        isClosed: () => false,
        complete: () => cleanup(s => s.complete()),
        error: e => cleanup(s => s.error(e)),
        // iterate over a snapshot: a client may unsubscribe while the value
        // is being delivered, which splices the live array and would make
        // forEach skip the client that follows it
        next: v => subscribers.slice().forEach(s => s.next(v))
    };

    return client => {
        // the array identity serves as the generation token of this client
        const _subscribers = subscribers;
        subscribers.push(client);
        if (subscribers.length === 1) {
            const started = source.subscribe(sink);
            if (_subscribers === subscribers)
                subscription = started;
            else
                // the source terminated synchronously while subscribing, the
                // generation is already closed: drop the handle instead of
                // storing it over the current subscription
                started.unsubscribe();
        }
        return () => {
            // the generation of this client has already been terminated
            if (_subscribers !== subscribers)
                return;
            const pos = subscribers.indexOf(client);
            // already removed: a repeated call must not touch the upstream
            if (pos < 0)
                return;
            subscribers.splice(pos, 1);
            if (!subscribers.length)
                release();
        };
    };
};
