import { IObservable, IDestroyable, ICancellation } from './interfaces';
import { Cancellation } from './Cancellation';
import { argumentNotNull } from './safe';

/** A callback that receives a single value. */
interface Handler<T> {
    (x: T): void
}

/**
 * A producer function passed to the `Observable` constructor. It is called
 * with the callbacks used to emit an event, report an error and signal
 * completion.
 */
interface Initializer<T> {
    (notify: Handler<T>, error?: (e: any) => void, complete?: () => void): void;
}

// TODO: consider moving this to interfaces.ts and making it public
interface IObserver<T> {
    next(event: T): void
    error(e: any): void
    complete(): void
}

const noop = () => {};

export class Observable<T> implements IObservable<T> {
    /** One-shot observers registered by `next()`; dropped after one notification. */
    private _once = new Array<IObserver<T>>();

    /** Persistent observers registered by `on()`. */
    private _observers = new Array<IObserver<T>>();

    /** Set once the sequence has terminated, either with an error or normally. */
    private _complete = false;

    /**
     * Set when the sequence terminated with an error. Kept apart from `_error`
     * so that a falsy error value is still replayed as an error.
     */
    private _failed = false;

    /** The error the sequence terminated with; meaningful only when `_failed`. */
    private _error: any;

    /**
     * @param func an optional producer; when supplied it is invoked
     * immediately with the callbacks that emit events, an error and
     * a completion for this observable.
     */
    constructor(func?: Initializer<T>) {
        if (func)
            func(
                this._notifyNext.bind(this),
                this._notifyError.bind(this),
                this._notifyCompleted.bind(this)
            );
    }

    /**
     * Registers handlers for the current observable object.
     *
     * If the sequence has already terminated, the matching `error` or
     * `complete` handler is invoked immediately and nothing is stored.
     *
     * @param next the handler for events
     * @param error the handler for an error
     * @param complete the handler for a completion
     * @returns {IDestroyable} the handle for the current subscription, this
     * handle can be used to unsubscribe from events.
     */
    on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
        argumentNotNull(next, "next");

        const observer: IObserver<T> & IDestroyable = {
            next: next,
            error: error ? error.bind(null) : noop,
            complete: complete ? complete.bind(null) : noop,
            // The observer is captured by the closure rather than read from
            // `this`, so a detached reference to destroy() still works.
            destroy: () => {
                this._removeObserver(observer);
            }
        };

        this._addObserver(observer);

        return observer;
    }

    /**
     * Waits for the next event. This method can't be used to read messages
     * as a sequence since it can skip some messages between calls.
     *
     * The returned promise is rejected when the sequence terminates (with the
     * error, or with a "No more events are available" message on completion)
     * and when the cancellation token fires first.
     *
     * @param ct a cancellation token
     */
    next(ct: ICancellation = Cancellation.none): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            const observer: IObserver<T> = {
                next: resolve,
                error: reject,
                complete: () => reject("No more events are available")
            };

            // Cancellation is wired only when the observer was really queued.
            if (this._addOnce(observer) && ct.isSupported()) {
                ct.register((e) => {
                    this._removeOnce(observer);
                    reject(e);
                });
            }
        });
    }

    /**
     * Hook invoked with every exception thrown by an observer callback.
     * The default implementation ignores the exception.
     */
    protected onObserverException(e: any) { }

    /** Emits an event to all observers; ignored once the sequence has terminated. */
    protected _notifyNext(evt: T) {
        if (this._complete)
            return;

        this._notify(observer => {
            try {
                observer.next(evt);
            } catch (err) {
                this.onObserverException(err);
            }
        });
    }

    /**
     * Terminates the sequence with an error. Only the first termination
     * takes effect; later calls are ignored.
     */
    protected _notifyError(e: any) {
        if (this._complete)
            return;

        // Record the terminal state before dispatching so that observers
        // subscribing from inside a handler get the error right away.
        this._complete = true;
        this._failed = true;
        this._error = e;

        this._notify(observer => {
            try {
                observer.error(e);
            } catch (err) {
                this.onObserverException(err);
            }
        });
        this._observers = [];
    }

    /**
     * Terminates the sequence normally. Only the first termination takes
     * effect; later calls are ignored.
     */
    protected _notifyCompleted() {
        if (this._complete)
            return;

        // See _notifyError: the state is recorded before dispatching.
        this._complete = true;

        this._notify(observer => {
            try {
                observer.complete();
            } catch (err) {
                this.onObserverException(err);
            }
        });
        this._observers = [];
    }

    /** Stores a persistent observer, or replays the termination to it. */
    private _addObserver(observer: IObserver<T>) {
        if (this._complete)
            this._replayTermination(observer);
        else
            this._observers.push(observer);
    }

    /**
     * Queues a one-shot observer, or replays the termination to it.
     *
     * @returns `true` if the observer was queued, `false` if the sequence
     * has already terminated.
     */
    private _addOnce(observer: IObserver<T>) {
        if (this._complete) {
            this._replayTermination(observer);
            return false;
        }

        this._once.push(observer);
        return true;
    }

    /** Delivers the recorded error or completion to a late observer. */
    private _replayTermination(observer: IObserver<T>) {
        try {
            if (this._failed)
                observer.error(this._error);
            else
                observer.complete();
        } catch (e) {
            this.onObserverException(e);
        }
    }

    private _removeOnce(d: IObserver<T>) {
        const i = this._once.indexOf(d);
        if (i >= 0)
            this._once.splice(i, 1);
    }

    private _removeObserver(d: IObserver<T>) {
        const i = this._observers.indexOf(d);
        if (i >= 0)
            this._observers.splice(i, 1);
    }

    /**
     * Applies `guard` to every one-shot observer (which are then dropped)
     * and to every persistent observer.
     *
     * Both lists are snapshotted first: a handler that subscribes or
     * unsubscribes during the dispatch must not shift the indices of the
     * list being walked, which would make the loop skip an observer.
     */
    private _notify(guard: (observer: IObserver<T>) => void) {
        if (this._once.length) {
            const once = this._once;
            this._once = [];
            for (const observer of once)
                guard(observer);
        }

        for (const observer of this._observers.slice())
            guard(observer);
    }
}