Observable.ts
184 lines
| 4.8 KiB
| video/mp2t
|
TypeScriptLexer
|
|
r39 | import { IObservable, IDestroyable, ICancellation } from "./interfaces"; | ||
| import { Cancellation } from "./Cancellation"; | ||||
| import { argumentNotNull } from "./safe"; | ||||
|
|
r23 | |||
|
|
r39 | type Handler<T> = (x: T) => void; | ||
|
|
r23 | |||
|
|
r39 | type Initializer<T> = (notify: Handler<T>, error?: (e: any) => void, complete?: () => void) => void; | ||
|
|
r23 | |||
| // TODO: think about to move this interfaces.ts and make it public | ||||
| interface IObserver<T> { | ||||
|
|
r39 | next(event: T): void; | ||
|
|
r23 | |||
|
|
r39 | error(e: any): void; | ||
|
|
r23 | |||
|
|
r39 | complete(): void; | ||
|
|
r23 | } | ||
|
|
r39 | const noop = () => { }; | ||
|
|
r23 | |||
| export class Observable<T> implements IObservable<T> { | ||||
| private _once = new Array<IObserver<T>>(); | ||||
| private _observers = new Array<IObserver<T>>(); | ||||
|
|
r39 | private _complete: boolean; | ||
|
|
r23 | |||
|
|
r39 | private _error: any; | ||
|
|
r23 | |||
| constructor(func?: Initializer<T>) { | ||||
| if (func) | ||||
| func( | ||||
| this._notifyNext.bind(this), | ||||
| this._notifyError.bind(this), | ||||
| this._notifyCompleted.bind(this) | ||||
| ); | ||||
| } | ||||
| /** | ||||
| * Registers handlers for the current observable object. | ||||
|
|
r39 | * | ||
|
|
r23 | * @param next the handler for events | ||
| * @param error the handler for a error | ||||
| * @param complete the handler for a completion | ||||
| * @returns {IDestroyable} the handler for the current subscription, this | ||||
| * handler can be used to unsubscribe from events. | ||||
|
|
r39 | * | ||
|
|
r23 | */ | ||
| on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable { | ||||
| argumentNotNull(next, "next"); | ||||
|
|
r39 | const me = this; | ||
|
|
r23 | |||
|
|
r39 | const observer: IObserver<T> & IDestroyable = { | ||
| next, | ||||
|
|
r23 | error: error ? error.bind(null) : noop, | ||
| complete: complete ? complete.bind(null) : noop, | ||||
| destroy() { | ||||
| me._removeObserver(this); | ||||
| } | ||||
|
|
r25 | }; | ||
|
|
r23 | |||
| this._addObserver(observer); | ||||
| return observer; | ||||
| } | ||||
| private _addObserver(observer: IObserver<T>) { | ||||
| if (this._complete) { | ||||
| try { | ||||
| if (this._error) | ||||
| observer.error(this._error); | ||||
| else | ||||
| observer.complete(); | ||||
| } catch (e) { | ||||
| this.onObserverException(e); | ||||
| } | ||||
| } else { | ||||
| this._observers.push(observer); | ||||
| } | ||||
| } | ||||
| /** | ||||
| * Waits for the next event. This method can't be used to read messages | ||||
| * as a sequence since it can skip some messages between calls. | ||||
|
|
r39 | * | ||
|
|
r23 | * @param ct a cancellation token | ||
| */ | ||||
| next(ct: ICancellation = Cancellation.none): Promise<T> { | ||||
| return new Promise<T>((resolve, reject) => { | ||||
|
|
r39 | const observer: IObserver<T> = { | ||
|
|
r23 | next: resolve, | ||
| error: reject, | ||||
| complete: () => reject("No more events are available") | ||||
| }; | ||||
| if (this._addOnce(observer) && ct.isSupported()) { | ||||
|
|
r39 | ct.register(e => { | ||
|
|
r23 | this._removeOnce(observer); | ||
| reject(e); | ||||
| }); | ||||
| } | ||||
| }); | ||||
| } | ||||
| private _addOnce(observer: IObserver<T>) { | ||||
| if (this._complete) { | ||||
| try { | ||||
| if (this._error) | ||||
| observer.error(this._error); | ||||
| else | ||||
| observer.complete(); | ||||
| } catch (e) { | ||||
| this.onObserverException(e); | ||||
| } | ||||
| return false; | ||||
| } | ||||
| this._once.push(observer); | ||||
| return true; | ||||
| } | ||||
| protected onObserverException(e: any) { | ||||
| } | ||||
| private _removeOnce(d: IObserver<T>) { | ||||
|
|
r39 | const i = this._once.indexOf(d); | ||
|
|
r23 | if (i >= 0) | ||
| this._once.splice(i, 1); | ||||
| } | ||||
| private _removeObserver(d: IObserver<T>) { | ||||
|
|
r39 | const i = this._observers.indexOf(d); | ||
|
|
r23 | if (i >= 0) | ||
| this._observers.splice(i, 1); | ||||
| } | ||||
| private _notify(guard: (observer: IObserver<T>) => void) { | ||||
|
|
r39 | this._once.forEach(guard); | ||
| this._once = []; | ||||
|
|
r23 | |||
|
|
r39 | this._observers.forEach(guard); | ||
|
|
r23 | } | ||
| protected _notifyNext(evt: T) { | ||||
|
|
r39 | const guard = (observer: IObserver<T>) => { | ||
|
|
r23 | try { | ||
| observer.next(evt); | ||||
| } catch (e) { | ||||
| this.onObserverException(e); | ||||
| } | ||||
|
|
r39 | }; | ||
|
|
r23 | |||
| this._notify(guard); | ||||
| } | ||||
| protected _notifyError(e: any) { | ||||
|
|
r39 | const guard = (observer: IObserver<T>) => { | ||
|
|
r23 | try { | ||
| observer.error(e); | ||||
| } catch (e) { | ||||
| this.onObserverException(e); | ||||
| } | ||||
|
|
r39 | }; | ||
|
|
r23 | |||
| this._notify(guard); | ||||
| this._observers = []; | ||||
|
|
r26 | this._complete = true; | ||
|
|
r23 | } | ||
| protected _notifyCompleted() { | ||||
|
|
r39 | const guard = (observer: IObserver<T>) => { | ||
|
|
r23 | try { | ||
| observer.complete(); | ||||
| } catch (e) { | ||||
| this.onObserverException(e); | ||||
| } | ||||
|
|
r39 | }; | ||
|
|
r23 | |||
| this._notify(guard); | ||||
| this._observers = []; | ||||
|
|
r26 | this._complete = true; | ||
|
|
r23 | } | ||
|
|
r39 | } | ||
