##// END OF EJS Templates
added provided and configure methods to the fluent container configuration, added applyConfig method to the container
added provided and configure methods to the fluent container configuration, added applyConfig method to the container

File last commit:

r133:09ea4b9e3735 ioc ts support
r142:be7edf08a115 v1.4.0-rc3 default
Show More
Observable.ts
199 lines | 5.3 KiB | video/mp2t | TypeScriptLexer
cin
Added safe.delay...
r76 import { IObservable, IDestroyable, ICancellation, IObserver } from "./interfaces";
cin
changed the project structure
r49 import { Cancellation } from "./Cancellation";
cin
working on fluent configuration
r133 import { argumentNotNull } from "./safe";
cin
changed the project structure
r49
type Handler<T> = (x: T) => void;
cin
corrected code to support ts strict mode...
r115 type Initializer<T> = (notify: Handler<T>, error: (e: any) => void, complete: () => void) => void;
cin
changed the project structure
r49
cin
Added safe.delay...
r76 const noop = () => { };
cin
changed the project structure
r49
cin
Added safe.delay...
r76 function isObserver(val: any): val is IObserver<any> {
return val && (typeof val.next === "function");
cin
changed the project structure
r49 }
export class Observable<T> implements IObservable<T> {
private _once = new Array<IObserver<T>>();
private _observers = new Array<IObserver<T>>();
cin
corrected code to support ts strict mode...
r115 private _complete = false;
cin
changed the project structure
r49
private _error: any;
constructor(func?: Initializer<T>) {
if (func)
func(
this._notifyNext.bind(this),
this._notifyError.bind(this),
this._notifyCompleted.bind(this)
);
}
/**
* Registers handlers for the current observable object.
*
* @param next the handler for events
* @param error the handler for a error
* @param complete the handler for a completion
* @returns {IDestroyable} the handler for the current subscription, this
* handler can be used to unsubscribe from events.
*
*/
on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
argumentNotNull(next, "next");
const me = this;
const observer: IObserver<T> & IDestroyable = {
cin
corrected code to support ts strict mode...
r115 next: next.bind(null),
cin
changed the project structure
r49 error: error ? error.bind(null) : noop,
complete: complete ? complete.bind(null) : noop,
destroy() {
me._removeObserver(this);
}
};
this._addObserver(observer);
return observer;
}
cin
Added safe.delay...
r76 subscribe(next: IObserver<T> | Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
if (isObserver(next)) {
cin
corrected code to support ts strict mode...
r115 this._addObserver(next);
return {
destroy: () => this._removeObserver(next)
cin
Added safe.delay...
r76 };
cin
corrected code to support ts strict mode...
r115 } else {
cin
Added safe.delay...
r76 const observer = {
cin
corrected code to support ts strict mode...
r115 next: next.bind(null),
error: error ? error.bind(null) : noop,
complete: complete ? complete.bind(null) : noop
cin
Added safe.delay...
r76 };
cin
corrected code to support ts strict mode...
r115
this._addObserver(observer);
return {
destroy: () => this._removeObserver(observer)
cin
Added safe.delay...
r76 };
}
}
cin
changed the project structure
r49 private _addObserver(observer: IObserver<T>) {
if (this._complete) {
try {
if (this._error)
observer.error(this._error);
else
observer.complete();
} catch (e) {
this.onObserverException(e);
}
} else {
this._observers.push(observer);
}
}
/**
* Waits for the next event. This method can't be used to read messages
* as a sequence since it can skip some messages between calls.
*
* @param ct a cancellation token
*/
cin
Added safe.delay...
r76 next(ct: ICancellation = Cancellation.none) {
cin
changed the project structure
r49 return new Promise<T>((resolve, reject) => {
const observer: IObserver<T> = {
next: resolve,
error: reject,
complete: () => reject("No more events are available")
};
if (this._addOnce(observer) && ct.isSupported()) {
ct.register(e => {
this._removeOnce(observer);
reject(e);
});
}
});
}
private _addOnce(observer: IObserver<T>) {
if (this._complete) {
try {
if (this._error)
observer.error(this._error);
else
observer.complete();
} catch (e) {
this.onObserverException(e);
}
return false;
}
this._once.push(observer);
return true;
}
protected onObserverException(e: any) {
}
private _removeOnce(d: IObserver<T>) {
const i = this._once.indexOf(d);
if (i >= 0)
this._once.splice(i, 1);
}
private _removeObserver(d: IObserver<T>) {
const i = this._observers.indexOf(d);
if (i >= 0)
this._observers.splice(i, 1);
}
private _notify(guard: (observer: IObserver<T>) => void) {
this._once.forEach(guard);
this._once = [];
this._observers.forEach(guard);
}
protected _notifyNext(evt: T) {
const guard = (observer: IObserver<T>) => {
try {
observer.next(evt);
} catch (e) {
this.onObserverException(e);
}
};
this._notify(guard);
}
protected _notifyError(e: any) {
const guard = (observer: IObserver<T>) => {
try {
observer.error(e);
} catch (e) {
this.onObserverException(e);
}
};
this._notify(guard);
this._observers = [];
this._complete = true;
}
protected _notifyCompleted() {
const guard = (observer: IObserver<T>) => {
try {
observer.complete();
} catch (e) {
this.onObserverException(e);
}
};
this._notify(guard);
this._observers = [];
this._complete = true;
}
}