# HG changeset patch # User m407 # Date 2018-09-21 17:04:09 # Node ID e0b5fc764f84bf95bb0834f490a17e21acabba58 # Parent dd8f8dfcd934daa44544761f8755e9dd98a32510 # Parent ab0da47705f01924a81ab73ef82196524136da42 Merged in propose observables (pull request #2) Propose observables diff --git a/docs/cancellations.ru.md b/docs/en/cancellations.md rename from docs/cancellations.ru.md rename to docs/en/cancellations.md diff --git a/docs/en/observable.md b/docs/en/observable.md new file mode 100644 --- /dev/null +++ b/docs/en/observable.md @@ -0,0 +1,135 @@ +# Observable + +Универсальный способ организации потока сообщений. Данный механизм может +использоваться для оповещения об изменениях состояний объектов или для доставки +самостоятельных событий, например, связанных с действиями пользователя. + +Является реализацией классического шаблона наблюдателя с возможность сообщить +о коце потока событий. Данная реализация не содержит никаких дополнительных +функций, таких как фильтрация, канал с состоянием, преобразования сообщений и +т.п. Это сделано специально, чтобы реализация оставалась максимально простой. + +Пример того, как можно создать последовательность из 10 событий: + +```ts +var events = new Observable(async (notify, error, complete) => { + // цикл в котором возникает событие + for(let i = 0; i < 10; i++) { + await delay(1000); + // в качестве данных передается номер события + notify(i); + } + // по окончании последовательности информируем, что событий больше не будет + compelte(); +}); + +// создаем окно с отображением хода событий +var progress = showProgress({ min: 0, max: 9, current: 0}); + +// подписываемся на события +events.on( + // обработчик очередного события + msg => { + progress.setValue(msg); + }. + // обработчик ошибки + e => { + progress.showError(e); + }, + // обработчик конца потока + () => { + progress.close(); + } +); + +// ожидание следующего события +let firstEvent = await events.next(); +``` + +`Observable` можно создавать из событий другого объекта, например, виджета: + +```ts +// клсс +class Canvas { + readonly mouseMove: IObservable<[number,number]> + + postCreate() { + // превращаем события виджета в Observable + this.mouseMove = new Observable<[number,number]>((notify) => { + this.mousePad.on('mousemove',(e) => notify([e.clientX, e.clientY]) ); + }); + } +} + +``` + +Если объект инкапсулирует в себе `Observable`, он также может сохранить методы +для оповещения подписчиков для дальнейшего их использования внутри класса. + +```ts +// класс, который будет генерировать события местоположения +class PositionTracker implements IDestroyable { + // _nextPosition и _complete будут связаны с position при создании + // экземпляра PositionTracker. + _nextPosition: (pos: Position) => void + _complete: () => void + + readonly position: IObservable + + // конструктор + constructor(...args: any[]) { + super(args); + + // создаем Observable + this.position = new Observable((notify, error, complete) => { + // сохраняем методы для оповещения о новом местоположении + this._nextPosition = notify; + // метод об оповещении конца потока событий + this._complete = complete + }); + } + + // метод для очистки ресурсов + destroy() { + this._complete(); + + super(); + } +} + +``` + +## Observable и последовательности + +Можно сичтать, что `Observable` это некоторая аналогия итератора только в +парадигме событийного (или реактивного) программировния. Следует также понимать, +что при переходе от синхронного процедурного программирования к событийному так +же меняется и направление управления (Inverse Of Control), что означает +следующее: + +* при работе с итераторами клиенты сами определяют момент чтения следующего + элемента последовательности. +* при работе с `Observable` клиенты вынуждены обрабатывать эти события по мере + их поступления и не могут на это повлиять. + +Последний пункт можно изменить применив, например, буффер или канал с +состоянием, т.е. очередь, но данные механизмы выходят за рамки простого шаблона +наблюдателя. + +```ts +while(1) { + // ожидаем следующее событие, по сути это подписка только на одно событие + let next = await events.next(); + + // такой цикл может пропускать сообщения, поскольку асинхронная операция + // позволит возобновить создание новых событий, на которые мы не подписаны + await processEvent(next); + + // не только асинхронные операции могут привести к пропуску события + // например вызов метода, который приводит к созданию события так же + // приведет к тому, что созданное событие не будет обработано в текущем + // цикле + doSmthAndRiseEvent(); +} + +``` \ No newline at end of file diff --git a/docs/cancellations.ru.md b/docs/ru/cancellations.md copy from docs/cancellations.ru.md copy to docs/ru/cancellations.md diff --git a/docs/ru/observable.md b/docs/ru/observable.md new file mode 100644 --- /dev/null +++ b/docs/ru/observable.md @@ -0,0 +1,186 @@ +# Observable + +Универсальный способ организации потока сообщений. Данный механизм может +использоваться для оповещения об изменениях состояний объектов или для доставки +самостоятельных событий, например, связанных с действиями пользователя. + +Является реализацией классического шаблона наблюдателя с возможность сообщить +о коце потока событий. Данная реализация не содержит никаких дополнительных +функций, таких как фильтрация, канал с состоянием, преобразования сообщений и +т.п. Это сделано специально, чтобы реализация оставалась максимально простой. + +Пример того, как можно создать последовательность из 10 событий: + +```ts +var events = new Observable(async (notify, error, complete) => { + // цикл в котором возникает событие + for(let i = 0; i < 10; i++) { + await delay(1000); + // в качестве данных передается номер события + notify(i); + } + // по окончании последовательности информируем, что событий больше не будет + compelte(); +}); + +// создаем окно с отображением хода событий +var progress = showProgress({ min: 0, max: 9, current: 0}); + +// подписываемся на события +events.on( + // обработчик очередного события + msg => { + progress.setValue(msg); + }. + // обработчик ошибки + e => { + progress.showError(e); + }, + // обработчик конца потока + () => { + progress.close(); + } +); + +// ожидание следующего события +let firstEvent = await events.next(); +``` + +`Observable` можно создавать из событий другого объекта, например, виджета: + +```ts +// клсс +class Canvas { + readonly mouseMove: IObservable<[number,number]> + + postCreate() { + // превращаем события виджета в Observable + this.mouseMove = new Observable<[number,number]>((notify) => { + this.mousePad.on('mousemove',(e) => notify([e.clientX, e.clientY]) ); + }); + } +} + +``` + +Если объект инкапсулирует в себе `Observable`, он также может сохранить методы +для оповещения подписчиков для дальнейшего их использования внутри класса. + +```ts +// класс, который будет генерировать события местоположения +class PositionTracker implements IDestroyable { + // _nextPosition и _complete будут связаны с position при создании + // экземпляра PositionTracker. + _nextPosition: (pos: Position) => void + _complete: () => void + + readonly position: IObservable + + // конструктор + constructor(...args: any[]) { + super(args); + + // создаем Observable + this.position = new Observable((notify, error, complete) => { + // сохраняем методы для оповещения о новом местоположении + this._nextPosition = notify; + // метод об оповещении конца потока событий + this._complete = complete + }); + } + + // метод для очистки ресурсов + destroy() { + this._complete(); + + super(); + } +} +``` + +Существует также несколько варинатов получения сообщений + +```ts +// регистрация метода для получений событий +let subscription = pushEvents.on((msg) => { + displayPopup(msg); +}); + +// подписку можно отменить, после чего обработчики больше не будут вызываться +subcription.destroy(); + +// если требуется получить только одно сообщение можно использовать +// асинхронный метод next(ct?: ICancellation) + +let msg = await pushEvents.next(); + +// пример метода для получения координат с карты, который использует +// событие нажатия мышью для определения координат. + +class Map { + /** + + Получает координаты по щелчку мыши. + + @async + + @returns [lon,lat] + + */ + async peekCoordinates(ct: ICancellation = Cancellation.none) { + // получаем событие клика + let evt = this.viewport.click.next(ct); + + // преобразуем позицию на экране в координаты карты + return this.clientToCoodinates([evt.clientx,evt.clientY]); + } +} + + +let map : Map; // где-то объявлено + +// пример получения координат с карты +let coords = await map.peekCoordinates(); + +``` + +## Observable и последовательности + +Можно сичтать, что `Observable` это некоторая аналогия итератора только в +парадигме событийного (или реактивного) программировния. Следует также понимать, +что при переходе от синхронного процедурного программирования к событийному так +же меняется и направление управления (Inverse Of Control), что означает +следующее: + +* при работе с итераторами клиенты сами определяют момент чтения следующего + элемента последовательности. +* при работе с `Observable` клиенты вынуждены обрабатывать эти события по мере + их поступления и не могут на это повлиять. + +Последний пункт можно изменить применив, например, буффер или канал с +состоянием, т.е. очередь, но данные механизмы выходят за рамки простого шаблона +наблюдателя. + +```ts +// обработка в цикле не гарантирует получения всех сообщений +while(1) { + // ожидаем следующее событие, по сути это подписка только на одно событие + let next = await events.next(); + + // такой цикл может пропускать сообщения, поскольку асинхронная операция + // позволит возобновить создание новых событий, на которые мы не подписаны + await processEvent(next); + + // не только асинхронные операции могут привести к пропуску события + // например вызов метода, который приводит к созданию события так же + // приведет к тому, что созданное событие не будет обработано в текущем + // цикле + doSmthAndRiseEvent(); +} + +// для получения всех сообщений нужно регистрировать подписчика +events.on((data) => { + // будет вызван для всех сообщений + processEvent(data); +}); +``` \ No newline at end of file diff --git a/src/js/log/trace.js b/src/js/log/trace.js --- a/src/js/log/trace.js +++ b/src/js/log/trace.js @@ -1,6 +1,8 @@ -define(["./TraceSource"], function (TraceSource) { +define(["./TraceSource"], function (TraceSource_1) { 'use strict'; + var TraceSource = TraceSource_1.TraceSource; + return { on: function (filter, cb) { diff --git a/src/ts/Observable.ts b/src/ts/Observable.ts new file mode 100644 --- /dev/null +++ b/src/ts/Observable.ts @@ -0,0 +1,195 @@ +import { IObservable, IDestroyable, ICancellation } from './interfaces'; +import { Cancellation } from './Cancellation' +import { argumentNotNull } from './safe'; + + +interface Handler { + (x: T): void +} + +interface Initializer { + (notify: Handler, error?: (e: any) => void, complete?: () => void): void; +} + +// TODO: think about to move this interfaces.ts and make it public +interface IObserver { + next(event: T): void + + error(e: any): void + + complete(): void +} + +const noop = () => {}; + +export class Observable implements IObservable { + private _once = new Array>(); + + private _observers = new Array>(); + + + private _complete: boolean + + private _error: any + + constructor(func?: Initializer) { + if (func) + func( + this._notifyNext.bind(this), + this._notifyError.bind(this), + this._notifyCompleted.bind(this) + ); + } + + /** + * Registers handlers for the current observable object. + * + * @param next the handler for events + * @param error the handler for a error + * @param complete the handler for a completion + * @returns {IDestroyable} the handler for the current subscription, this + * handler can be used to unsubscribe from events. + * + */ + on(next: Handler, error?: Handler, complete?: () => void): IDestroyable { + argumentNotNull(next, "next"); + + let me = this; + + let observer: IObserver & IDestroyable = { + next: next, + error: error ? error.bind(null) : noop, + complete: complete ? complete.bind(null) : noop, + + destroy() { + me._removeObserver(this); + } + }; + + this._addObserver(observer); + + + return observer; + } + + private _addObserver(observer: IObserver) { + if (this._complete) { + try { + if (this._error) + observer.error(this._error); + else + observer.complete(); + } catch (e) { + this.onObserverException(e); + } + } else { + this._observers.push(observer); + } + } + + /** + * Waits for the next event. This method can't be used to read messages + * as a sequence since it can skip some messages between calls. + * + * @param ct a cancellation token + */ + next(ct: ICancellation = Cancellation.none): Promise { + return new Promise((resolve, reject) => { + let observer: IObserver = { + next: resolve, + error: reject, + complete: () => reject("No more events are available") + }; + + if (this._addOnce(observer) && ct.isSupported()) { + ct.register((e) => { + this._removeOnce(observer); + reject(e); + }); + } + }); + } + + private _addOnce(observer: IObserver) { + if (this._complete) { + try { + if (this._error) + observer.error(this._error); + else + observer.complete(); + } catch (e) { + this.onObserverException(e); + } + return false; + } + + this._once.push(observer); + return true; + } + + protected onObserverException(e: any) { + } + + private _removeOnce(d: IObserver) { + let i = this._once.indexOf(d); + if (i >= 0) + this._once.splice(i, 1); + } + + private _removeObserver(d: IObserver) { + let i = this._observers.indexOf(d); + if (i >= 0) + this._observers.splice(i, 1); + } + + private _notify(guard: (observer: IObserver) => void) { + if (this._once.length) { + for (let i = 0; i < this._once.length; i++) + guard(this._once[i]); + this._once = []; + } + + for (let i = 0; i < this._observers.length; i++) + guard(this._observers[i]); + } + + protected _notifyNext(evt: T) { + let guard = (observer: IObserver) => { + try { + observer.next(evt); + } catch (e) { + this.onObserverException(e); + } + } + + this._notify(guard); + } + + protected _notifyError(e: any) { + let guard = (observer: IObserver) => { + try { + observer.error(e); + } catch (e) { + this.onObserverException(e); + } + } + + this._notify(guard); + this._observers = []; + this._complete = true; + } + + protected _notifyCompleted() { + let guard = (observer: IObserver) => { + try { + observer.complete(); + } catch (e) { + this.onObserverException(e); + } + } + + this._notify(guard); + this._observers = []; + this._complete = true; + } +} \ No newline at end of file diff --git a/src/ts/components/ActivatableMixin.ts b/src/ts/components/ActivatableMixin.ts --- a/src/ts/components/ActivatableMixin.ts +++ b/src/ts/components/ActivatableMixin.ts @@ -1,13 +1,13 @@ import { IActivationController, IActivatable, ICancellation } from '../interfaces'; import { AsyncComponent } from './AsyncComponent'; import { Cancellation } from '../Cancellation'; -import * as TraceSource from '../log/TraceSource'; +import { TraceSource } from '../log/TraceSource'; type Constructor = new (...args: any[]) => T; const log = TraceSource.get('@implab/core/components/ActivatableMixin'); -function ActivatableMixin>(Base: TBase) { +export function ActivatableMixin>(Base: TBase) { return class extends Base implements IActivatable { _controller: IActivationController; @@ -80,8 +80,4 @@ function ActivatableMixin void; + _completion: Promise = Promise.resolve(); getCompletion() { return this._completion }; runOperation(op: (ct: ICancellation) => any, ct: ICancellation = Cancellation.none) { + // create inner cancellation bound to the passed cancellation token + let h: IDestroyable; + let inner = new Cancellation(cancel => { + + this._cancel = cancel; + h = ct.register(cancel); + }); + // TODO create cancellation source here - async function guard() { - await op(ct); + let guard = async () => { + try { + await op(inner); + } finally { + // after the operation is complete we need to cleanup the + // resources + destroy(h); + this._cancel = null; + } } return this._completion = guard(); } + + cancel(reason) { + if (this._cancel) + this._cancel(reason); + } } \ No newline at end of file diff --git a/src/ts/components/Observable.ts b/src/ts/components/Observable.ts deleted file mode 100644 --- a/src/ts/components/Observable.ts +++ /dev/null @@ -1,198 +0,0 @@ -import { IObservable, IDestroyable, ICancellation } from '../interfaces'; -import { Cancellation } from '../Cancellation' -import { argumentNotNull } from '../safe'; - - -interface Handler { - (x: T): void -} - -interface Initializer { - (notify: Handler, error?: (e: any) => void, complete?: () => void): void; -} - -// TODO: think about to move this interfaces.ts and make it public -interface IObserver { - next(event: T): void - - error(e: any): void - - complete(): void -} - -const noop = () => {}; - -class Observable implements IObservable { - private _once = new Array>(); - - private _observers = new Array>(); - - - private _complete: boolean - - private _error: any - - constructor(func?: Initializer) { - if (func) - func( - this._notifyNext.bind(this), - this._notifyError.bind(this), - this._notifyCompleted.bind(this) - ); - } - - /** - * Registers handlers for the current observable object. - * - * @param next the handler for events - * @param error the handler for a error - * @param complete the handler for a completion - * @returns {IDestroyable} the handler for the current subscription, this - * handler can be used to unsubscribe from events. - * - */ - on(next: Handler, error?: Handler, complete?: () => void): IDestroyable { - argumentNotNull(next, "next"); - - let me = this; - - let observer: IObserver & IDestroyable = { - next: next, - error: error ? error.bind(null) : noop, - complete: complete ? complete.bind(null) : noop, - - destroy() { - me._removeObserver(this); - } - } - - this._addObserver(observer); - - - return observer; - } - - private _addObserver(observer: IObserver) { - if (this._complete) { - try { - if (this._error) - observer.error(this._error); - else - observer.complete(); - } catch (e) { - this.onObserverException(e); - } - } else { - this._observers.push(observer); - } - } - - /** - * Waits for the next event. This method can't be used to read messages - * as a sequence since it can skip some messages between calls. - * - * @param ct a cancellation token - */ - next(ct: ICancellation = Cancellation.none): Promise { - return new Promise((resolve, reject) => { - let observer: IObserver = { - next: resolve, - error: reject, - complete: () => reject("No more events are available") - }; - - if (this._addOnce(observer) && ct.isSupported()) { - ct.register((e) => { - this._removeOnce(observer); - reject(e); - }); - } - }); - } - - private _addOnce(observer: IObserver) { - if (this._complete) { - try { - if (this._error) - observer.error(this._error); - else - observer.complete(); - } catch (e) { - this.onObserverException(e); - } - return false; - } - - this._once.push(observer); - return true; - } - - protected onObserverException(e: any) { - } - - private _removeOnce(d: IObserver) { - let i = this._once.indexOf(d); - if (i >= 0) - this._once.splice(i, 1); - } - - private _removeObserver(d: IObserver) { - let i = this._observers.indexOf(d); - if (i >= 0) - this._observers.splice(i, 1); - } - - private _notify(guard: (observer: IObserver) => void) { - if (this._once.length) { - for (let i = 0; i < this._once.length; i++) - guard(this._once[i]); - this._once = []; - } - - for (let i = 0; i < this._observers.length; i++) - guard(this._observers[i]); - } - - protected _notifyNext(evt: T) { - let guard = (observer: IObserver) => { - try { - observer.next(evt); - } catch (e) { - this.onObserverException(e); - } - } - - this._notify(guard); - } - - protected _notifyError(e: any) { - let guard = (observer: IObserver) => { - try { - observer.error(e); - } catch (e) { - this.onObserverException(e); - } - } - - this._notify(guard); - this._observers = []; - } - - protected _notifyCompleted() { - let guard = (observer: IObserver) => { - try { - observer.complete(); - } catch (e) { - this.onObserverException(e); - } - } - - this._notify(guard); - this._observers = []; - } -} - -namespace Observable { -} - -export = Observable; \ No newline at end of file diff --git a/src/ts/log/TraceEvent.ts b/src/ts/log/TraceEvent.ts deleted file mode 100644 --- a/src/ts/log/TraceEvent.ts +++ /dev/null @@ -1,21 +0,0 @@ -import * as TraceSource from './TraceSource' - -class TraceEvent { - readonly source: TraceSource; - - readonly level: Number; - - readonly arg: any; - - constructor(source: TraceSource, level: Number, arg: any) { - this.source = source; - this.level = level; - this.arg = arg; - } -} - -namespace TraceEvent { - -} - -export = TraceEvent \ No newline at end of file diff --git a/src/ts/log/TraceSource.ts b/src/ts/log/TraceSource.ts --- a/src/ts/log/TraceSource.ts +++ b/src/ts/log/TraceSource.ts @@ -1,8 +1,31 @@ import * as format from '../text/format' import { argumentNotNull } from '../safe'; -import * as Observable from '../components/Observable' +import { Observable } from '../Observable' import { IDestroyable } from '../interfaces'; -import * as TraceEvent from './TraceEvent' + +export const DebugLevel = 400; + +export const LogLevel = 300; + +export const WarnLevel = 200; + +export const ErrorLevel = 100; + +export const SilentLevel = 0; + +export class TraceEvent { + readonly source: TraceSource; + + readonly level: Number; + + readonly arg: any; + + constructor(source: TraceSource, level: Number, arg: any) { + this.source = source; + this.level = level; + this.arg = arg; + } +} class Registry { static readonly instance = new Registry(); @@ -56,14 +79,21 @@ class Registry { } } -class TraceSource extends Observable { +export class TraceSource { readonly id: any level: number + readonly events: Observable + + _notifyNext: (arg: TraceEvent) => void + constructor(id: any) { - super(); + this.id = id || new Object(); + this.events = new Observable((next) => { + this._notifyNext = next; + }) } protected emit(level: number, arg: any) { @@ -71,37 +101,37 @@ class TraceSource extends Observable= TraceSource.DebugLevel; + return this.level >= DebugLevel; } debug(msg: string, ...args: any[]) { - if (this.isEnabled(TraceSource.DebugLevel)) - this.emit(TraceSource.DebugLevel, format(msg, args)); + if (this.isEnabled(DebugLevel)) + this.emit(DebugLevel, format(msg, args)); } isLogEnabled() { - return this.level >= TraceSource.LogLevel; + return this.level >= LogLevel; } log(msg: string, ...args: any[]) { - if (this.isEnabled(TraceSource.LogLevel)) - this.emit(TraceSource.LogLevel, format(msg, args)); + if (this.isEnabled(LogLevel)) + this.emit(LogLevel, format(msg, args)); } isWarnEnabled() { - return this.level >= TraceSource.WarnLevel; + return this.level >= WarnLevel; } warn(msg: string, ...args: any[]) { - if (this.isEnabled(TraceSource.WarnLevel)) - this.emit(TraceSource.WarnLevel, format(msg, args)); + if (this.isEnabled(WarnLevel)) + this.emit(WarnLevel, format(msg, args)); } /** * returns true if errors will be recorded. */ isErrorEnabled() { - return this.level >= TraceSource.ErrorLevel; + return this.level >= ErrorLevel; } /** @@ -111,8 +141,8 @@ class TraceSource extends Observable(); writeEvents(source: IObservable, ct: ICancellation = Cancellation.none) { @@ -15,9 +14,9 @@ class ConsoleWriter implements IDestroya } writeEvent(next: TraceEvent) { - if (next.level >= TraceSource.LogLevel) { + if (next.level >= LogLevel) { console.log(next.source.id.toString(), next.arg); - } else if(next.level >= TraceSource.WarnLevel) { + } else if(next.level >= WarnLevel) { console.warn(next.source.id.toString(), next.arg); } else { console.error(next.source.id.toString(), next.arg); @@ -27,9 +26,4 @@ class ConsoleWriter implements IDestroya destroy() { this._subscriptions.forEach(x => x.destroy()); } -} - -namespace ConsoleWriter { -} - -export = ConsoleWriter; \ No newline at end of file +} \ No newline at end of file diff --git a/src/ts/safe.ts b/src/ts/safe.ts --- a/src/ts/safe.ts +++ b/src/ts/safe.ts @@ -228,4 +228,9 @@ export function first(sequence: any, cb: return err(new Error("The sequence is required")); else throw new Error("The sequence is required"); +} + +export function destroy(d: any) { + if (d && 'destroy' in d) + d.destroy(); } \ No newline at end of file diff --git a/test/js/plan.js b/test/js/plan.js --- a/test/js/plan.js +++ b/test/js/plan.js @@ -1,2 +1,3 @@ -define(["./ActivatableTests", "./trace-test", "./TraceSourceTests", "./CancellationTests"]); -//define(["./CancellationTests"]); \ No newline at end of file +//define(["./ActivatableTests", "./trace-test", "./TraceSourceTests", "./CancellationTests"]); +//define(["./CancellationTests"]); +define(["./ObservableTests"]); \ No newline at end of file diff --git a/test/js/trace-test.js b/test/js/trace-test.js --- a/test/js/trace-test.js +++ b/test/js/trace-test.js @@ -3,7 +3,8 @@ define(["tape"], function(tape) { var sourceId = '73a633f3-eab8-49b0-8601-07cae710f234'; var sourceId2 = '3ba9c7cd-ed77-437b-9a2f-1cbeb1226b5b'; tape('Load TraceSource for the module', function(t) { - require(["core/log/trace!" + sourceId, "core/log/TraceSource"], function(trace, TraceSource) { + require(["core/log/trace!" + sourceId, "core/log/TraceSource"], function(trace, TraceSource_1) { + var TraceSource = TraceSource_1.TraceSource; t.equal(trace && trace.id, sourceId, "trace should be taken from the loader plugin parameter"); var count = 0; diff --git a/test/ts/ActivatableTests.ts b/test/ts/ActivatableTests.ts --- a/test/ts/ActivatableTests.ts +++ b/test/ts/ActivatableTests.ts @@ -1,5 +1,5 @@ import * as tape from 'tape'; -import * as ActivatableMixin from '@implab/core/components/ActivatableMixin'; +import { ActivatableMixin} from '@implab/core/components/ActivatableMixin'; import { AsyncComponent } from '@implab/core/components/AsyncComponent'; import { IActivationController, IActivatable, ICancellation } from '@implab/core/interfaces'; import { Cancellation } from '@implab/core/Cancellation'; diff --git a/test/ts/ObservableTests.ts b/test/ts/ObservableTests.ts new file mode 100644 --- /dev/null +++ b/test/ts/ObservableTests.ts @@ -0,0 +1,74 @@ +import { TraceSource, DebugLevel } from '@implab/core/log/TraceSource' +import * as tape from 'tape'; +import { TapeWriter, delay } from './TestTraits'; +import { Observable } from '@implab/core/Observable'; +import { IObservable } from '@implab/core/interfaces'; + +let trace = TraceSource.get("ObservableTests"); + +tape('events sequence example', async t => { + + + let events: IObservable + + let done = new Promise((resolve) => { + events = new Observable(async (notify, fail, complete) => { + for (let i = 0; i < 10; i++) { + await delay(0); + notify(i); + } + complete(); + resolve(); + }); + }); + + let count = 0; + let complete = false; + events.on(x => count = count + x, null, () => complete = true); + + let first = await events.next(); + + t.equals(first, 0, "the first event"); + t.false(complete, "the sequence is not complete"); + + await done; + + t.equals(count, 45, "the summ of the evetns"); + t.true(complete, "the sequence is complete"); + + t.end(); +}); + +tape('event sequence termination', async t => { + let events: IObservable + + let done = new Promise((resolve) => { + events = new Observable(async (notify, fail, complete) => { + await delay(0); + notify(1); + complete(); + notify(2); + complete(); + fail("Sequence terminated"); + resolve(); + }); + }); + + let count = 0; + events.on(() => {}, (e) => count++, () => count++); + + let first = await events.next(); + t.equals(first, 1, "the first message"); + try { + await events.next(); + t.fail("shoud throw an exception"); + } catch(e) { + t.pass("the sequence is terminated"); + } + + await done; + + t.equals(count, 1, "the sequence must be terminated once"); + + t.end(); +}); \ No newline at end of file diff --git a/test/ts/TestTraits.ts b/test/ts/TestTraits.ts --- a/test/ts/TestTraits.ts +++ b/test/ts/TestTraits.ts @@ -1,9 +1,8 @@ -import { IObservable, ICancellation, IDestroyable } from "../../build/dist/interfaces"; -import * as TraceEvent from '../../build/dist/log/TraceEvent'; -import { Cancellation } from "../../build/dist/Cancellation"; -import * as TraceSource from "../../build/dist/log/TraceSource"; +import { IObservable, ICancellation, IDestroyable } from "@implab/core/interfaces"; +import { Cancellation } from "@implab/core/Cancellation"; +import { TraceEvent, LogLevel, WarnLevel } from "@implab/core/log/TraceSource"; import * as tape from 'tape'; -import { argumentNotNull } from "../../build/dist/safe"; +import { argumentNotNull } from "@implab/core/safe"; export class TapeWriter implements IDestroyable { readonly _tape: tape.Test @@ -24,9 +23,9 @@ export class TapeWriter implements IDest } writeEvent(next: TraceEvent) { - if (next.level >= TraceSource.LogLevel) { + if (next.level >= LogLevel) { this._tape.comment("LOG " + next.arg); - } else if (next.level >= TraceSource.WarnLevel) { + } else if (next.level >= WarnLevel) { this._tape.comment("WARN " + next.arg); } else { this._tape.comment("ERROR " + next.arg); diff --git a/test/ts/TraceSourceTests.ts b/test/ts/TraceSourceTests.ts --- a/test/ts/TraceSourceTests.ts +++ b/test/ts/TraceSourceTests.ts @@ -1,4 +1,4 @@ -import * as TraceSource from '@implab/core/log/TraceSource' +import { TraceSource, DebugLevel } from '@implab/core/log/TraceSource' import * as tape from 'tape'; import { TapeWriter } from './TestTraits'; @@ -7,11 +7,11 @@ const sourceId = 'test/TraceSourceTests' tape('trace message', t => { let trace = TraceSource.get(sourceId); - trace.level = TraceSource.DebugLevel; + trace.level = DebugLevel; - let h = trace.on((ev) => { + let h = trace.events.on((ev) => { t.equal(ev.source, trace, "sender should be the current trace source"); - t.equal(ev.level, TraceSource.DebugLevel, "level should be debug level"); + t.equal(ev.level, DebugLevel, "level should be debug level"); t.equal(ev.arg, "Hello, World!", "The message should be a formatted message"); t.end(); @@ -25,21 +25,21 @@ tape('trace message', t => { tape('trace event', t => { let trace = TraceSource.get(sourceId); - trace.level = TraceSource.DebugLevel; + trace.level = DebugLevel; let event = { name: "custom event" }; - let h = trace.on((ev) => { + let h = trace.events.on((ev) => { t.equal(ev.source, trace, "sender should be the current trace source"); - t.equal(ev.level, TraceSource.DebugLevel, "level should be debug level"); + t.equal(ev.level, DebugLevel, "level should be debug level"); t.equal(ev.arg, event, "The message should be the specified object"); t.end(); }); - trace.traceEvent(TraceSource.DebugLevel, event); + trace.traceEvent(DebugLevel, event); h.destroy(); }); @@ -48,11 +48,11 @@ tape('tape comment writer', async t => { let writer = new TapeWriter(t); TraceSource.on(ts => { - writer.writeEvents(ts); + writer.writeEvents(ts.events); }); let trace = TraceSource.get(sourceId); - trace.level = TraceSource.DebugLevel; + trace.level = DebugLevel; trace.log("Hello, {0}!", 'World'); trace.log("Multi\n line");