##// END OF EJS Templates
spelling fixes
spelling fixes

File last commit:

r18:a8dda6a00a16 propose cancellat...
r19:9d394c2adc2b propose cancellat...
Show More
Observable.ts
197 lines | 4.9 KiB | video/mp2t | TypeScriptLexer
cin
working on Observable
r14 import { IObservable, IDestroyable, ICancellation } from '../interfaces';
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 import { Cancellation } from '../Cancellation'
import { argumentNotNull } from '../safe';
cin
working on Observable
r14 interface Handler<T> {
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 (x: T): void
cin
working on Observable
r14 }
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13
cin
working on Observable
r14 interface Initializer<T> {
cin
added CancellationTests...
r18 (notify: Handler<T>, error?: (e: any) => void, complete?: () => void): void;
cin
working on Observable
r14 }
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 // TODO: think about to move this interfaces.ts and make it public
interface IObserver<T> {
next(event: T): void
error(e: any): void
complete(): void
}
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13
cin
added CancellationTests...
r18 const noop = () => {};
class Observable<T> implements IObservable<T> {
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 private _once = new Array<IObserver<T>>();
private _observers = new Array<IObserver<T>>();
cin
working on Observable
r14
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 private _complete: boolean
private _error: any
cin
working on Observable
r14
constructor(func?: Initializer<T>) {
cin
added CancellationTests...
r18 if (func)
func(
this._notifyNext.bind(this),
this._notifyError.bind(this),
this._notifyCompleted.bind(this)
);
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 }
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 /**
* Registers handlers for the current observable object.
*
* @param next the handler for events
* @param error the handler for a error
* @param complete the handler for a completion
* @returns {IDestroyable} the handler for the current subscription, this
* handler can be used to unsubscribe from events.
*
*/
on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
argumentNotNull(next, "next");
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13
let me = this;
cin
fixed format-compile bug while formatting strings with new line symbols....
r15
let observer: IObserver<T> & IDestroyable = {
next: next,
cin
added CancellationTests...
r18 error: error ? error.bind(null) : noop,
complete: complete ? complete.bind(null) : noop,
cin
fixed format-compile bug while formatting strings with new line symbols....
r15
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 destroy() {
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 me._removeObserver(this);
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 }
}
cin
fixed format-compile bug while formatting strings with new line symbols....
r15
this._addObserver(observer);
return observer;
}
private _addObserver(observer: IObserver<T>) {
if (this._complete) {
try {
if (this._error)
observer.error(this._error);
else
observer.complete();
} catch (e) {
this.onObserverException(e);
}
} else {
this._observers.push(observer);
}
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 }
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 /**
* Waits for the next event. This method can't be used to read messages
* as a sequence since it can skip some messages between calls.
*
* @param ct a cancellation token
*/
cin
working on Observable
r14 next(ct: ICancellation = Cancellation.none): Promise<T> {
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 return new Promise<T>((resolve, reject) => {
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 let observer: IObserver<T> = {
next: resolve,
error: reject,
complete: () => reject("No more events are available")
};
if (this._addOnce(observer) && ct.isSupported()) {
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 ct.register((e) => {
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 this._removeOnce(observer);
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 reject(e);
});
}
});
}
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 private _addOnce(observer: IObserver<T>) {
if (this._complete) {
try {
if (this._error)
observer.error(this._error);
else
observer.complete();
} catch (e) {
this.onObserverException(e);
}
return false;
}
this._once.push(observer);
return true;
}
cin
working on Observable
r14 protected onObserverException(e: any) {
}
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 private _removeOnce(d: IObserver<T>) {
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 let i = this._once.indexOf(d);
if (i >= 0)
cin
added CancellationTests...
r18 this._once.splice(i, 1);
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 }
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 private _removeObserver(d: IObserver<T>) {
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 let i = this._observers.indexOf(d);
if (i >= 0)
cin
added CancellationTests...
r18 this._observers.splice(i, 1);
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 }
cin
fixed format-compile bug while formatting strings with new line symbols....
r15 private _notify(guard: (observer: IObserver<T>) => void) {
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 if (this._once.length) {
for (let i = 0; i < this._once.length; i++)
guard(this._once[i]);
this._once = [];
}
for (let i = 0; i < this._observers.length; i++)
guard(this._observers[i]);
}
cin
fixed format-compile bug while formatting strings with new line symbols....
r15
protected _notifyNext(evt: T) {
let guard = (observer: IObserver<T>) => {
try {
observer.next(evt);
} catch (e) {
this.onObserverException(e);
}
}
this._notify(guard);
}
protected _notifyError(e: any) {
let guard = (observer: IObserver<T>) => {
try {
observer.error(e);
} catch (e) {
this.onObserverException(e);
}
}
this._notify(guard);
this._observers = [];
}
protected _notifyCompleted() {
let guard = (observer: IObserver<T>) => {
try {
observer.complete();
} catch (e) {
this.onObserverException(e);
}
}
this._notify(guard);
this._observers = [];
}
cin
refactoring, all common interfaces placed to core/interfaces.ts...
r13 }
namespace Observable {
}
export = Observable;