Observable.ts
192 lines
| 4.9 KiB
| video/mp2t
|
TypeScriptLexer
|
|
r14 | import { IObservable, IDestroyable, ICancellation } from '../interfaces'; | |
|
|
r13 | import { Cancellation } from '../Cancellation' | |
| import { argumentNotNull } from '../safe'; | |||
|
|
/**
 * A callback that receives a single value of type `T`.
 * Whatever the callback returns is ignored.
 */
interface Handler<T> {
    (x: T): void;
}
|
|
r13 | ||
|
|
/**
 * A function that wires an event producer to an observable.
 *
 * It receives the callbacks used to publish into the observable:
 * `notify` delivers an event, `error` reports a failure and `complete`
 * signals that no more events will follow. `error` and `complete` are
 * declared optional, so an implementation must be prepared for them to be
 * absent.
 */
interface Initializer<T> {
    (notify: Handler<T>, error?: (e: any) => void, complete?: () => void): void;
}
|
|
// TODO: consider moving this to interfaces.ts and making it public.
/**
 * The set of callbacks that make up a single subscription.
 */
interface IObserver<T> {
    /** Receives an event. */
    next(event: T): void;
    /** Receives a failure. */
    error(e: any): void;
    /** Called when the sequence has ended without a failure. */
    complete(): void;
}
|
|
r13 | ||
|
|
/** Does nothing; used as the default for omitted optional callbacks. */
const noop = (): void => {};
|
|
/**
 * A multicast event source.
 *
 * Events are published through the callbacks handed to the initializer
 * passed to the constructor (or, from subclasses, through the protected
 * `_notify*` methods). Every observer registered with {@link Observable.on}
 * receives each event; observers created by {@link Observable.next} receive
 * at most one notification.
 *
 * Once an error or a completion has been published the observable is
 * terminated: further notifications are ignored and anyone who subscribes
 * afterwards is immediately told about the termination.
 */
export class Observable<T> implements IObservable<T> {
    /** One-shot observers created by `next()`; emptied on every notification. */
    private _once = new Array<IObserver<T>>();

    /** Long-lived observers created by `on()`. */
    private _observers = new Array<IObserver<T>>();

    /** `true` once an error or a completion has been published. */
    private _complete = false;

    /**
     * `true` when the observable terminated with an error. Kept separately
     * from `_error` so that falsy error values (`0`, `""`, `undefined`) are
     * still reported as errors.
     */
    private _failed = false;

    /** The error the observable terminated with; meaningful only when `_failed`. */
    private _error: any;

    /**
     * @param func an optional initializer which is called synchronously with
     * the callbacks used to publish events, an error and a completion.
     */
    constructor(func?: Initializer<T>) {
        if (func)
            func(
                this._notifyNext.bind(this),
                this._notifyError.bind(this),
                this._notifyCompleted.bind(this)
            );
    }

    /**
     * Registers handlers for the current observable object.
     *
     * If the observable is already terminated, the handlers are not
     * registered; instead `error` or `complete` is invoked immediately.
     *
     * @param next the handler for events
     * @param error the handler for an error
     * @param complete the handler for a completion
     * @returns the handle of the subscription; call `destroy()` on it to
     * unsubscribe from events.
     */
    on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
        argumentNotNull(next, "next");

        const observer: IObserver<T> & IDestroyable = {
            next: next,
            error: error ? error.bind(null) : noop,
            complete: complete ? complete.bind(null) : noop,
            // Close over the observer instead of relying on `this`, so the
            // subscription is removed even if `destroy` is called detached.
            destroy: () => this._removeObserver(observer)
        };

        this._addObserver(observer);
        return observer;
    }

    /**
     * Adds a long-lived observer, or replays the termination to it when the
     * observable has already terminated.
     */
    private _addObserver(observer: IObserver<T>) {
        if (this._complete)
            this._replayTermination(observer);
        else
            this._observers.push(observer);
    }

    /**
     * Waits for the next event. This method can't be used to read messages
     * as a sequence since it can skip some messages between calls.
     *
     * The returned promise is rejected with the published error if the
     * observable fails, and with the string "No more events are available"
     * if it completes.
     *
     * @param ct a cancellation token; when it reports itself as supported, a
     * callback is registered on it which removes the pending observer and
     * rejects the promise with the value the token supplies.
     */
    next(ct: ICancellation = Cancellation.none): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            let observer: IObserver<T> = {
                next: resolve,
                error: reject,
                complete: () => reject("No more events are available")
            };
            // Register for cancellation only if the observer was queued; when
            // the observable is already terminated the promise is settled.
            if (this._addOnce(observer) && ct.isSupported()) {
                ct.register((e) => {
                    this._removeOnce(observer);
                    reject(e);
                });
            }
        });
    }

    /**
     * Queues a one-shot observer.
     *
     * @returns `true` if the observer was queued, `false` if the observable
     * is already terminated and the termination was replayed to the observer.
     */
    private _addOnce(observer: IObserver<T>) {
        if (this._complete) {
            this._replayTermination(observer);
            return false;
        }
        this._once.push(observer);
        return true;
    }

    /**
     * Delivers the stored termination (error or completion) to an observer
     * which subscribed after the observable terminated.
     */
    private _replayTermination(observer: IObserver<T>) {
        try {
            if (this._failed)
                observer.error(this._error);
            else
                observer.complete();
        } catch (err) {
            this.onObserverException(err);
        }
    }

    /**
     * Called with every exception thrown by an observer callback. The default
     * implementation ignores the exception; subclasses may override this
     * method, e.g. to log it.
     */
    protected onObserverException(e: any) {
    }

    /** Removes a pending one-shot observer, if it is still queued. */
    private _removeOnce(d: IObserver<T>) {
        let i = this._once.indexOf(d);
        if (i >= 0)
            this._once.splice(i, 1);
    }

    /** Removes a long-lived observer, if it is still registered. */
    private _removeObserver(d: IObserver<T>) {
        let i = this._observers.indexOf(d);
        if (i >= 0)
            this._observers.splice(i, 1);
    }

    /**
     * Applies `guard` to every one-shot observer (which are then forgotten)
     * and to every long-lived observer.
     *
     * Both lists are detached or copied before iterating because observer
     * callbacks may subscribe or unsubscribe while a notification is being
     * delivered; iterating the live array would skip the observer following
     * one that removes itself. An observer removed during a notification
     * still receives that notification.
     */
    private _notify(guard: (observer: IObserver<T>) => void) {
        if (this._once.length) {
            const pending = this._once;
            this._once = [];
            for (const observer of pending)
                guard(observer);
        }
        for (const observer of this._observers.slice())
            guard(observer);
    }

    /** Publishes an event to all observers; ignored after termination. */
    protected _notifyNext(evt: T) {
        if (this._complete)
            return;
        this._notify(observer => {
            try {
                observer.next(evt);
            } catch (err) {
                this.onObserverException(err);
            }
        });
    }

    /**
     * Terminates the observable with an error and reports it to all
     * observers; ignored if the observable is already terminated.
     */
    protected _notifyError(e: any) {
        if (this._complete)
            return;
        // Record the state first so that anyone subscribing from within a
        // callback already sees the observable as terminated.
        this._complete = true;
        this._failed = true;
        this._error = e;
        this._notify(observer => {
            try {
                observer.error(e);
            } catch (err) {
                this.onObserverException(err);
            }
        });
        this._observers = [];
    }

    /**
     * Terminates the observable normally and notifies all observers; ignored
     * if the observable is already terminated.
     */
    protected _notifyCompleted() {
        if (this._complete)
            return;
        this._complete = true;
        this._notify(observer => {
            try {
                observer.complete();
            } catch (err) {
                this.onObserverException(err);
            }
        });
        this._observers = [];
    }
}
