import { IObservable, IDestroyable, ICancellation } from '../interfaces';
import { Cancellation } from '../Cancellation';
import { argumentNotNull } from '../safe';

interface Handler<T> {
    (x: T): void;
}

interface Initializer<T> {
    (notify: Handler<T>, error?: (e: any) => void, complete?: () => void): (() => void) | void;
}

// TODO: consider moving this to interfaces.ts and making it public
interface IObserver<T> {
    next(event: T): void;
    error(e: any): void;
    complete(): void;
}

class Observable<T> implements IObservable<T>, IDestroyable {
    // One-shot observers registered through next(); cleared after each notification.
    private _once = new Array<IObserver<T>>();

    // Persistent observers registered through on().
    private _observers = new Array<IObserver<T>>();

    private _cleanup: (() => void) | void;

    // Set once the sequence has terminated, either by an error or by completion.
    private _complete = false;

    // The error the sequence terminated with, if any.
    private _error: any;

    constructor(func?: Initializer<T>) {
        this._cleanup = func && func(
            this._notifyNext.bind(this),
            this._notifyError.bind(this),
            this._notifyCompleted.bind(this)
        );
    }

    /**
     * Registers handlers for the current observable object.
     *
     * @param next the handler for events
     * @param error the handler for an error
     * @param complete the handler for completion
     * @returns {IDestroyable} the handle for the current subscription; it
     *  can be used to unsubscribe from events.
     */
    on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
        argumentNotNull(next, "next");

        const me = this;

        const observer: IObserver<T> & IDestroyable = {
            next: next,
            error(e: any) {
                if (error)
                    error(e);
            },
            complete() {
                if (complete)
                    complete();
            },
            destroy() {
                me._removeObserver(this);
            }
        };

        this._addObserver(observer);

        return observer;
    }

    private _addObserver(observer: IObserver<T>) {
        if (this._complete) {
            // The sequence has already terminated: replay the final state
            // to the late subscriber instead of registering it.
            try {
                if (this._error)
                    observer.error(this._error);
                else
                    observer.complete();
            } catch (e) {
                this.onObserverException(e);
            }
        } else {
            this._observers.push(observer);
        }
    }

    /**
     * Waits for the next event. This method can't be used to read messages
     * as a sequence, since messages emitted between calls are skipped.
     *
     * @param ct a cancellation token
     */
    next(ct: ICancellation = Cancellation.none): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            const observer: IObserver<T> = {
                next: resolve,
                error: reject,
                complete: () => reject("No more events are available")
            };

            // Register for cancellation only if the observer was actually queued.
            if (this._addOnce(observer) && ct.isSupported()) {
                ct.register((e) => {
                    this._removeOnce(observer);
                    reject(e);
                });
            }
        });
    }

    // Returns true if the observer was queued, false if the sequence has
    // already terminated and the observer was notified immediately.
    private _addOnce(observer: IObserver<T>) {
        if (this._complete) {
            try {
                if (this._error)
                    observer.error(this._error);
                else
                    observer.complete();
            } catch (e) {
                this.onObserverException(e);
            }
            return false;
        }

        this._once.push(observer);
        return true;
    }

    destroy() {
        // Complete the sequence for the remaining observers unless it has
        // already terminated.
        if (!this._complete)
            this._notifyCompleted();

        const cleanup = this._cleanup;
        if (cleanup) {
            // Clear the reference first so the cleanup runs at most once.
            this._cleanup = undefined;
            cleanup();
        }
    }

    // Hook for subclasses; exceptions thrown by observers are ignored by default.
    protected onObserverException(e: any) { }

    private _removeOnce(d: IObserver<T>) {
        const i = this._once.indexOf(d);
        // Remove exactly one element; splice(i) would drop everything from i onwards.
        if (i >= 0)
            this._once.splice(i, 1);
    }

    private _removeObserver(d: IObserver<T>) {
        const i = this._observers.indexOf(d);
        if (i >= 0)
            this._observers.splice(i, 1);
    }

    private _notify(guard: (observer: IObserver<T>) => void) {
        if (this._once.length) {
            for (let i = 0; i < this._once.length; i++)
                guard(this._once[i]);
            this._once = [];
        }

        for (let i = 0; i < this._observers.length; i++)
            guard(this._observers[i]);
    }

    protected _notifyNext(evt: T) {
        const guard = (observer: IObserver<T>) => {
            try {
                observer.next(evt);
            } catch (e) {
                this.onObserverException(e);
            }
        };

        this._notify(guard);
    }

    protected _notifyError(e: any) {
        // A terminated sequence ignores further terminal notifications.
        if (this._complete)
            return;

        // Record the terminal state so late subscribers receive the error.
        this._complete = true;
        this._error = e;

        const guard = (observer: IObserver<T>) => {
            try {
                observer.error(e);
            } catch (err) {
                this.onObserverException(err);
            }
        };

        this._notify(guard);
        this._observers = [];
    }

    protected _notifyCompleted() {
        // A terminated sequence ignores further terminal notifications.
        if (this._complete)
            return;

        // Record the terminal state so late subscribers are completed immediately.
        this._complete = true;

        const guard = (observer: IObserver<T>) => {
            try {
                observer.complete();
            } catch (e) {
                this.onObserverException(e);
            }
        };

        this._notify(guard);
        this._observers = [];
    }
}

namespace Observable {
}

export = Observable;