##// END OF EJS Templates
Added childContainer service to container services, ServiceContainer is destroyable, fixed browser Uuid version
Added childContainer service to container services, ServiceContainer is destroyable, fixed browser Uuid version

File last commit:

r133:09ea4b9e3735 ioc ts support
r146:f3f5c56d3b3e v1.4.0-rc5 default
Show More
Observable.ts
199 lines | 5.3 KiB | video/mp2t | TypeScriptLexer
cin
Added safe.delay...
r76 import { IObservable, IDestroyable, ICancellation, IObserver } from "./interfaces";
cin
changed the project structure
r49 import { Cancellation } from "./Cancellation";
cin
working on fluent configuration
r133 import { argumentNotNull } from "./safe";
cin
changed the project structure
r49
/** A callback that receives a single value and returns nothing. */
type Handler<T> = (x: T) => void;

/**
 * A function handed to the `Observable` constructor. It receives the three
 * callbacks used to drive the sequence.
 *
 * @param notify publishes the next event
 * @param error reports a failure
 * @param complete reports that the sequence has ended
 */
type Initializer<T> = (notify: Handler<T>, error: (e: any) => void, complete: () => void) => void;
cin
changed the project structure
r49
cin
Added safe.delay...
/** Does nothing; used as the fallback for omitted `error`/`complete` handlers. */
const noop = (): void => { };
cin
changed the project structure
r49
cin
Added safe.delay...
/**
 * Type guard telling an observer object apart from a plain handler function.
 *
 * Only the presence of a callable `next` member is checked; `error` and
 * `complete` are not inspected.
 *
 * @param val the value to test
 * @returns `true` when `val` is non-nullish and has a `next` function,
 * otherwise `false` (always a real boolean, never the falsy input itself)
 */
function isObserver(val: any): val is IObserver<any> {
    return val !== null && val !== undefined && typeof val.next === "function";
}
/**
 * A push-based event source.
 *
 * Events are produced through the callbacks handed to the initializer passed
 * to the constructor (or through the protected `_notify*` methods) and are
 * delivered to observers registered with `on`, `subscribe` or `next`.
 *
 * Once the sequence has terminated (by error or by completion) no further
 * notifications are delivered, and an observer registered afterwards is
 * immediately told how the sequence ended.
 */
export class Observable<T> implements IObservable<T> {
    /** One-shot observers registered by `next()`; each receives at most one notification. */
    private _once = new Array<IObserver<T>>();

    /** Observers registered by `on()` / `subscribe()`. */
    private _observers = new Array<IObserver<T>>();

    /** Set as soon as the sequence has terminated, either by error or by completion. */
    private _complete = false;

    /**
     * Set when the sequence terminated with an error. Kept separate from
     * `_error` so that a falsy error value (e.g. `0` or `undefined`) is still
     * replayed as an error.
     */
    private _failed = false;

    /** The error the sequence terminated with; meaningful only when `_failed` is set. */
    private _error: any;

    /**
     * @param func optional initializer; when supplied it is called immediately
     * with the callbacks that publish an event, report an error and report
     * completion.
     */
    constructor(func?: Initializer<T>) {
        if (func)
            func(
                this._notifyNext.bind(this),
                this._notifyError.bind(this),
                this._notifyCompleted.bind(this)
            );
    }

    /**
     * Registers handlers for the current observable object.
     *
     * @param next the handler for events
     * @param error the handler for a error
     * @param complete the handler for a completion
     * @returns {IDestroyable} the handler for the current subscription, this
     * handler can be used to unsubscribe from events.
     *
     */
    on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
        argumentNotNull(next, "next");

        // The handlers are bound to null so they never see the observer as `this`.
        const observer: IObserver<T> & IDestroyable = {
            next: next.bind(null),
            error: error ? error.bind(null) : noop,
            complete: complete ? complete.bind(null) : noop,
            destroy: () => this._removeObserver(observer)
        };

        this._addObserver(observer);
        return observer;
    }

    /**
     * Subscribes either a ready-made observer object or a set of handlers.
     *
     * @param next an observer object, or the handler for events
     * @param error the handler for an error (ignored when `next` is an observer)
     * @param complete the handler for a completion (ignored when `next` is an observer)
     * @returns the subscription; call `destroy()` to unsubscribe
     */
    subscribe(next: IObserver<T> | Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
        const observer: IObserver<T> = isObserver(next) ?
            next :
            {
                next: next.bind(null),
                error: error ? error.bind(null) : noop,
                complete: complete ? complete.bind(null) : noop
            };

        this._addObserver(observer);
        return {
            destroy: () => this._removeObserver(observer)
        };
    }

    /**
     * Registers a long-lived observer, or, when the sequence has already
     * terminated, immediately replays the outcome to it.
     */
    private _addObserver(observer: IObserver<T>) {
        if (this._complete)
            this._replayTerminal(observer);
        else
            this._observers.push(observer);
    }

    /**
     * Waits for the next event. This method can't be used to read messages
     * as a sequence since it can skip some messages between calls.
     *
     * The returned promise is rejected when the sequence fails, when it
     * completes without producing another event, or when the cancellation
     * token fires first.
     *
     * @param ct a cancellation token
     */
    next(ct: ICancellation = Cancellation.none) {
        return new Promise<T>((resolve, reject) => {
            const observer: IObserver<T> = {
                next: resolve,
                error: reject,
                complete: () => reject("No more events are available")
            };

            // NOTE(review): whatever `register` returns is not used here, so
            // the callback stays registered after the promise settles —
            // confirm whether ICancellation offers a way to release it.
            if (this._addOnce(observer) && ct.isSupported()) {
                ct.register(e => {
                    this._removeOnce(observer);
                    reject(e);
                });
            }
        });
    }

    /**
     * Registers a one-shot observer.
     *
     * @returns `true` when the observer was queued, `false` when the sequence
     * had already terminated and the outcome was replayed to it instead.
     */
    private _addOnce(observer: IObserver<T>) {
        if (this._complete) {
            this._replayTerminal(observer);
            return false;
        }
        this._once.push(observer);
        return true;
    }

    /** Tells a late observer how the already-terminated sequence ended. */
    private _replayTerminal(observer: IObserver<T>) {
        try {
            if (this._failed)
                observer.error(this._error);
            else
                observer.complete();
        } catch (e) {
            this.onObserverException(e);
        }
    }

    /**
     * Called with every exception thrown by an observer's handler. The default
     * implementation ignores it; subclasses may override this to report it.
     */
    protected onObserverException(e: any) {
    }

    private _removeOnce(d: IObserver<T>) {
        const i = this._once.indexOf(d);
        if (i >= 0)
            this._once.splice(i, 1);
    }

    private _removeObserver(d: IObserver<T>) {
        const i = this._observers.indexOf(d);
        if (i >= 0)
            this._observers.splice(i, 1);
    }

    /**
     * Delivers one notification to the one-shot observers first and then to
     * the long-lived ones.
     */
    private _notify(guard: (observer: IObserver<T>) => void) {
        // Detach the one-shot list before iterating so that observers queued
        // by a handler wait for the following event instead of being dropped.
        const once = this._once;
        this._once = [];
        once.forEach(guard);

        // Iterate over a snapshot: a handler may unsubscribe itself (or
        // others), and splicing the live array while iterating it would skip
        // the neighbouring observer. Observers removed meanwhile are skipped.
        for (const observer of this._observers.slice()) {
            if (this._observers.indexOf(observer) >= 0)
                guard(observer);
        }
    }

    /** Publishes an event to all observers; ignored once the sequence has terminated. */
    protected _notifyNext(evt: T) {
        if (this._complete)
            return;

        this._notify(observer => {
            try {
                observer.next(evt);
            } catch (err) {
                this.onObserverException(err);
            }
        });
    }

    /**
     * Terminates the sequence with an error and informs all observers.
     * Ignored when the sequence has already terminated.
     */
    protected _notifyError(e: any) {
        if (this._complete)
            return;

        // Record the outcome first so that anyone subscribing from inside a
        // handler gets it replayed instead of being queued and then dropped.
        this._complete = true;
        this._failed = true;
        this._error = e;

        this._notify(observer => {
            try {
                observer.error(e);
            } catch (err) {
                this.onObserverException(err);
            }
        });
        this._observers = [];
    }

    /**
     * Terminates the sequence normally and informs all observers.
     * Ignored when the sequence has already terminated.
     */
    protected _notifyCompleted() {
        if (this._complete)
            return;

        // See _notifyError: the state is recorded before notifying.
        this._complete = true;

        this._notify(observer => {
            try {
                observer.complete();
            } catch (err) {
                this.onObserverException(err);
            }
        });
        this._observers = [];
    }
}