import { IObservable, IDestroyable, ICancellation } from '../interfaces'; import { Cancellation } from '../Cancellation' import { argumentNotNull } from '../safe'; interface Handler { (x:T) : void } interface Initializer { (notify: Handler) : (() => void) | void; } class Observable implements IObservable, IDestroyable { private _once = new Array>(); private readonly _observers = new Array>(); private readonly _cleanup : (() => void) | void; constructor(func?: Initializer) { this._cleanup = func && func(this._notify.bind(this)); } on(observer: Handler, error?: Handler, complete?: () => void): IDestroyable { argumentNotNull(observer, "observer"); this._observers.push(observer); let me = this; return { destroy() { me._removeObserver(observer); } } } next(ct: ICancellation = Cancellation.none): Promise { return new Promise((resolve, reject) => { this._once.push(resolve); if (ct.isSupported()) { ct.register((e) => { this._removeOnce(resolve); reject(e); }); } }); } destroy() { if(this._cleanup) this._cleanup.call(null); } protected onObserverException(e: any) { } private _removeOnce(d: Handler) { let i = this._once.indexOf(d); if (i >= 0) this._once.splice(i); } private _removeObserver(d: Handler) { let i = this._observers.indexOf(d); if (i >= 0) this._observers.splice(i); } protected _notify(evt: T) { let guard = (observer: Handler) => { try { observer(evt); } catch (e) { this.onObserverException(e); } } if (this._once.length) { for (let i = 0; i < this._once.length; i++) guard(this._once[i]); this._once = []; } for (let i = 0; i < this._observers.length; i++) guard(this._observers[i]); } } namespace Observable { } export = Observable;