##// END OF EJS Templates
fixed format-compile bug while formatting strings with new line symbols....
fixed format-compile bug while formatting strings with new line symbols. added error and completion handlers to Observable ConsoleTraceWriter is now destroyable

File last commit:

r15:d5a3d3ab9fd7 propose cancellat...
r15:d5a3d3ab9fd7 propose cancellat...
Show More
Observable.ts
214 lines | 5.2 KiB | video/mp2t | TypeScriptLexer
cin
working on Observable
// Shared contracts implemented or consumed by Observable.
import { IObservable, IDestroyable, ICancellation } from '../interfaces';
// Supplies the default (never cancelled) token used by Observable.next().
import { Cancellation } from '../Cancellation';
// Argument guard used by Observable.on().
import { argumentNotNull } from '../safe';
cin
working on Observable
/**
 * A callback which receives a single value of type `T`; its result is ignored.
 */
interface Handler<T> {
    (x: T): void;
}
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13
cin
working on Observable
/**
 * A function which connects an observable to its event source.
 *
 * It receives the callbacks used to push events, to report an error and to
 * signal the completion, and may return a cleanup function.
 */
interface Initializer<T> {
    (notify: Handler<T>, error?: (e: any) => void, complete?: () => void): (() => void) | void;
}
cin
fixed format-compile bug while formatting strings with new line symbols....
// TODO: consider moving this interface to interfaces.ts and making it public.
/**
 * The receiving side of a subscription: gets every event and exactly one of
 * the terminal notifications (an error or a completion).
 */
interface IObserver<T> {
    /** Called for every event. */
    next(event: T): void;

    /** Called when the sequence is terminated with an error. */
    error(e: any): void;

    /** Called when the sequence is completed normally. */
    complete(): void;
}
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13
cin
working on Observable
/**
 * A push-based source of events.
 *
 * Events are delivered to the observers registered with `on()` and to the
 * promises returned by `next()`. Once the sequence is terminated (completed,
 * failed or destroyed) the terminal state is remembered, later notifications
 * are ignored and late subscribers are informed immediately.
 */
class Observable<T> implements IObservable<T>, IDestroyable {
    /** Observers waiting for a single notification; emptied on every dispatch. */
    private _once = new Array<IObserver<T>>();

    /** Observers registered through `on()`. */
    private _observers = new Array<IObserver<T>>();

    /** The cleanup callback returned by the initializer, if any. */
    private _cleanup: (() => void) | void = undefined;

    /** Set as soon as the sequence is terminated (normally or with an error). */
    private _complete = false;

    /** Set when the sequence is terminated with an error, even with a falsy one. */
    private _failed = false;

    /** The error which terminated the sequence; meaningful only when `_failed` is set. */
    private _error: any;

    /**
     * @param func an optional initializer, it is called immediately with the
     * callbacks used to push events, to report an error and to signal the
     * completion. The function it returns (if any) is called on `destroy()`.
     */
    constructor(func?: Initializer<T>) {
        if (func)
            this._cleanup = func(
                this._notifyNext.bind(this),
                this._notifyError.bind(this),
                this._notifyCompleted.bind(this)
            );
    }

    /**
     * Registers handlers for the current observable object.
     *
     * If the sequence is already terminated the matching terminal handler is
     * called immediately and nothing is registered.
     *
     * @param next the handler for events
     * @param error the handler for an error
     * @param complete the handler for a completion
     * @returns the handle of the subscription, destroy it to unsubscribe
     * from events.
     */
    on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
        argumentNotNull(next, "next");

        // Cleared on unsubscribe, so a dispatch which is already in progress
        // doesn't reach a subscription destroyed by an earlier handler.
        let active = true;

        const observer: IObserver<T> & IDestroyable = {
            next: (event: T) => {
                if (active)
                    next(event);
            },
            error: (e: any) => {
                if (active && error)
                    error(e);
            },
            complete: () => {
                if (active && complete)
                    complete();
            },
            destroy: () => {
                active = false;
                this._removeObserver(observer);
            }
        };

        this._addObserver(observer);
        return observer;
    }

    /**
     * Waits for the next event. This method can't be used to read messages
     * as a sequence since it can skip some messages between calls.
     *
     * The promise is rejected when the sequence fails, when it completes
     * without producing an event or when the cancellation is requested.
     *
     * @param ct a cancellation token
     */
    next(ct: ICancellation = Cancellation.none): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            const observer: IObserver<T> = {
                next: resolve,
                error: reject,
                complete: () => reject("No more events are available")
            };

            // NOTE(review): the registration isn't removed once the promise
            // is settled; whether ICancellation allows that isn't visible
            // from this file. A late cancellation is harmless either way.
            if (this._addOnce(observer) && ct.isSupported()) {
                ct.register((e) => {
                    this._removeOnce(observer);
                    reject(e);
                });
            }
        });
    }

    /**
     * Terminates the sequence and releases the event source.
     *
     * Observers which are still subscribed receive the completion, then the
     * cleanup callback returned by the initializer is called (at most once).
     */
    destroy() {
        if (!this._complete)
            this._notifyCompleted();

        const cleanup = this._cleanup;
        if (cleanup) {
            // forget the callback first, so it can't be called twice
            this._cleanup = undefined;
            cleanup();
        }
    }

    /**
     * Called with every exception thrown by an observer. Does nothing by
     * default, override it to report such exceptions.
     */
    protected onObserverException(e: any) {
    }

    /** Pushes the event to the observers; ignored after the termination. */
    protected _notifyNext(evt: T) {
        if (this._complete)
            return;
        this._notify(observer => observer.next(evt));
    }

    /** Terminates the sequence with the specified error. */
    protected _notifyError(e: any) {
        if (this._complete)
            return;
        this._failed = true;
        this._error = e;
        this._terminate(observer => observer.error(e));
    }

    /** Terminates the sequence normally. */
    protected _notifyCompleted() {
        if (this._complete)
            return;
        this._terminate(observer => observer.complete());
    }

    /** Registers the observer or, when terminated, informs it immediately. */
    private _addObserver(observer: IObserver<T>) {
        if (this._complete)
            this._notifyTerminal(observer);
        else
            this._observers.push(observer);
    }

    /**
     * Registers the observer for a single notification.
     *
     * @returns `false` if the sequence is already terminated, in this case
     * the observer is informed immediately and isn't registered.
     */
    private _addOnce(observer: IObserver<T>) {
        if (this._complete) {
            this._notifyTerminal(observer);
            return false;
        }
        this._once.push(observer);
        return true;
    }

    /** Replays the remembered terminal state to a late observer. */
    private _notifyTerminal(observer: IObserver<T>) {
        try {
            if (this._failed)
                observer.error(this._error);
            else
                observer.complete();
        } catch (e) {
            this.onObserverException(e);
        }
    }

    /** Removes exactly one pending one-shot observer. */
    private _removeOnce(d: IObserver<T>) {
        const i = this._once.indexOf(d);
        if (i >= 0)
            this._once.splice(i, 1);
    }

    /** Removes exactly one registered observer. */
    private _removeObserver(d: IObserver<T>) {
        const i = this._observers.indexOf(d);
        if (i >= 0)
            this._observers.splice(i, 1);
    }

    /**
     * Marks the sequence as terminated, delivers the terminal notification
     * and drops all the observers.
     */
    private _terminate(action: (observer: IObserver<T>) => void) {
        // The state is changed first, so an observer subscribed from inside
        // a handler is informed immediately instead of being lost.
        this._complete = true;
        this._notify(action);
        this._observers = [];
    }

    /**
     * Applies the action to the one-shot observers and then to the regular
     * ones, exceptions are passed to `onObserverException`.
     */
    private _notify(action: (observer: IObserver<T>) => void) {
        // Detach the one-shot list and copy the regular one before calling
        // out, so handlers which subscribe or unsubscribe during the dispatch
        // can't disturb the iteration.
        const once = this._once;
        const observers = this._observers.slice();
        if (once.length)
            this._once = [];

        for (const observer of once.concat(observers)) {
            try {
                action(observer);
            } catch (e) {
                this.onObserverException(e);
            }
        }
    }
}
// An empty namespace with the same name as the class declared above, so the
// two declarations merge. NOTE(review): presumably a placeholder for types
// to be exposed alongside the class through `export =` — confirm.
namespace Observable {
}
// CommonJS-style export: Observable is the whole export of this module.
export = Observable;