Observable.ts
205 lines
| 5.5 KiB
| video/mp2t
|
TypeScriptLexer
|
|
r76 | import { IObservable, IDestroyable, ICancellation, IObserver } from "./interfaces"; | ||
|
|
r49 | import { Cancellation } from "./Cancellation"; | ||
|
|
r76 | import { argumentNotNull, destroyed } from "./safe"; | ||
|
|
r49 | |||
/** Callback that receives a single value of type `T`. */
type Handler<T> = (x: T) => void;

/**
 * Producer function handed to the `Observable` constructor. It receives the
 * three functions used to push a value, fail the sequence with an error, or
 * complete the sequence.
 */
type Initializer<T> = (notify: Handler<T>, error: (e: any) => void, complete: () => void) => void;
|
|
r49 | |||
|
|
/** Shared do-nothing callback used where an optional handler was not supplied. */
const noop = () => { };
|
|
r49 | |||
|
|
/**
 * Frozen observer whose `next`, `error` and `complete` callbacks all do
 * nothing.
 */
const nulObserver: IObserver<any> = Object.freeze({
    next: noop,
    error: noop,
    complete: noop
});
|
|
/**
 * Type guard telling an observer object apart from a plain handler function.
 *
 * A value is treated as an observer when it is not null/undefined and has a
 * callable `next` member; `error` and `complete` are not inspected.
 *
 * @param val the value to test
 * @returns `true` when `val` has a `next` function, otherwise `false`
 * (always a strict boolean, never the falsy input itself)
 */
function isObserver(val: any): val is IObserver<any> {
    return val != null && typeof val.next === "function";
}
/**
 * A multicast event source: values, an error, or a completion signal pushed
 * through the notify functions are delivered to every registered observer.
 *
 * Once the sequence has terminated (by error or by completion) observers are
 * no longer stored; anything subscribing afterwards is immediately told how
 * the sequence ended.
 */
export class Observable<T> implements IObservable<T> {
    /** Observers created by `next()`; they are dropped after one notification. */
    private _once = new Array<IObserver<T>>();

    /** Observers registered through `on()` / `subscribe()`. */
    private _observers = new Array<IObserver<T>>();

    /** True once the sequence has ended, either normally or with an error. */
    private _complete = false;

    /**
     * True when the sequence ended with an error. Tracked separately from
     * `_error` so that falsy error values (`0`, `""`, `undefined`) are still
     * replayed as errors rather than as a normal completion.
     */
    private _failed = false;

    /** The value passed to `_notifyError`; replayed to late subscribers. */
    private _error: any;

    /**
     * @param func optional initializer; when supplied it is invoked
     * synchronously with the three functions used to emit a value, fail the
     * sequence, or complete it.
     */
    constructor(func?: Initializer<T>) {
        if (func)
            func(
                this._notifyNext.bind(this),
                this._notifyError.bind(this),
                this._notifyCompleted.bind(this)
            );
    }

    /**
     * Registers handlers for the current observable object.
     *
     * @param next the handler for events
     * @param error the handler for an error
     * @param complete the handler for a completion
     * @returns the handle for the current subscription; calling its
     * `destroy()` unsubscribes the handlers from further events.
     */
    on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
        argumentNotNull(next, "next");
        const observer: IObserver<T> & IDestroyable = {
            next: next.bind(null),
            error: error ? error.bind(null) : noop,
            complete: complete ? complete.bind(null) : noop,
            // An arrow function keeps `destroy` working even when it is
            // detached from the returned object before being called.
            destroy: () => this._removeObserver(observer)
        };
        this._addObserver(observer);
        return observer;
    }

    /**
     * Subscribes either a ready-made observer object or individual handlers.
     *
     * @param next an observer object, or the handler for events
     * @param error the handler for an error (ignored when `next` is an observer)
     * @param complete the handler for a completion (ignored when `next` is an observer)
     * @returns a handle whose `destroy()` removes the subscription
     */
    subscribe(next: IObserver<T> | Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
        // Same argument validation as on().
        argumentNotNull(next, "next");
        const observer: IObserver<T> = isObserver(next) ? next : {
            next: next.bind(null),
            error: error ? error.bind(null) : noop,
            complete: complete ? complete.bind(null) : noop
        };
        this._addObserver(observer);
        return {
            destroy: () => this._removeObserver(observer)
        };
    }

    /**
     * Stores the observer, or, when the sequence has already ended,
     * immediately replays the terminal notification to it.
     */
    private _addObserver(observer: IObserver<T>) {
        if (this._complete)
            this._replayTermination(observer);
        else
            this._observers.push(observer);
    }

    /**
     * Waits for the next event. This method can't be used to read messages
     * as a sequence since it can skip some messages between calls.
     *
     * The promise rejects with the sequence error, with the string
     * "No more events are available" when the sequence completes, or with
     * the value supplied by the cancellation token.
     *
     * @param ct a cancellation token
     */
    next(ct: ICancellation = Cancellation.none): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            const observer: IObserver<T> = {
                next: resolve,
                error: reject,
                complete: () => reject("No more events are available")
            };
            // Only hook the token when the observer was actually queued.
            if (this._addOnce(observer) && ct.isSupported()) {
                ct.register(e => {
                    this._removeOnce(observer);
                    reject(e);
                });
            }
        });
    }

    /**
     * Queues a one-shot observer.
     *
     * @returns `true` when the observer was queued, `false` when the sequence
     * had already ended and the terminal notification was replayed instead.
     */
    private _addOnce(observer: IObserver<T>) {
        if (this._complete) {
            this._replayTermination(observer);
            return false;
        }
        this._once.push(observer);
        return true;
    }

    /** Tells a late observer how the already-finished sequence ended. */
    private _replayTermination(observer: IObserver<T>) {
        try {
            if (this._failed)
                observer.error(this._error);
            else
                observer.complete();
        } catch (e) {
            this.onObserverException(e);
        }
    }

    /**
     * Hook invoked with whatever an observer callback threw. The default
     * implementation ignores it so that one failing observer does not
     * prevent the others from being notified.
     */
    protected onObserverException(e: any) {
    }

    private _removeOnce(d: IObserver<T>) {
        const i = this._once.indexOf(d);
        if (i >= 0)
            this._once.splice(i, 1);
    }

    private _removeObserver(d: IObserver<T>) {
        const i = this._observers.indexOf(d);
        if (i >= 0)
            this._observers.splice(i, 1);
    }

    /**
     * Runs `guard` for every one-shot observer (which are then forgotten)
     * and for every persistent observer.
     */
    private _notify(guard: (observer: IObserver<T>) => void) {
        // Detach the one-shot list first so that observers queued while
        // notifying are kept for the following event instead of being lost.
        const once = this._once;
        this._once = [];
        once.forEach(guard);

        // Iterate a snapshot: handlers may unsubscribe during notification,
        // and splicing the live array would make forEach skip the observer
        // that follows. The membership test keeps the rule that an observer
        // removed earlier in this pass is not called.
        this._observers.slice().forEach(observer => {
            if (this._observers.indexOf(observer) >= 0)
                guard(observer);
        });
    }

    /** Delivers a value to all current observers. */
    protected _notifyNext(evt: T) {
        this._notify(observer => {
            try {
                observer.next(evt);
            } catch (err) {
                this.onObserverException(err);
            }
        });
    }

    /**
     * Ends the sequence with an error. The error is remembered and replayed
     * to observers that subscribe later. Calls made after the sequence has
     * already ended are ignored.
     */
    protected _notifyError(e: any) {
        if (this._complete)
            return;
        // Record the terminal state before notifying so that anything
        // subscribing from inside a handler gets the replay immediately.
        this._failed = true;
        this._error = e;
        this._complete = true;
        this._notify(observer => {
            try {
                observer.error(e);
            } catch (err) {
                this.onObserverException(err);
            }
        });
        this._observers = [];
    }

    /**
     * Ends the sequence normally. Calls made after the sequence has already
     * ended are ignored.
     */
    protected _notifyCompleted() {
        if (this._complete)
            return;
        // Record the terminal state before notifying so that anything
        // subscribing from inside a handler gets the replay immediately.
        this._complete = true;
        this._notify(observer => {
            try {
                observer.complete();
            } catch (err) {
                this.onObserverException(err);
            }
        });
        this._observers = [];
    }
}
