# HG changeset patch # User cin # Date 2018-09-19 11:19:51 # Node ID d420abe33095b56dff4e234cfa48084fade45c8f # Parent 93dca6f27f52a31d38cd8b75cbc0b953219d48cd documenting Observable.ts moved to the root of @implab/core diff --git a/docs/ActivatableMixin/observable.ru.md b/docs/ActivatableMixin/observable.ru.md deleted file mode 100644 --- a/docs/ActivatableMixin/observable.ru.md +++ /dev/null @@ -1,55 +0,0 @@ -# Observable - -Универсальный способ организации потока сообщений. Данный механизм может -использоваться для оповещения об изменениях состояний объектов или для доставки -самостоятельных событий, например, связанных с действиями пользователя. - -Является реализацией классического наблюдателя с возможность сообщить о коце -потока событий. Данная реализация не содержит никаких дополнительных функций, -таких как фильтрация, канал с состоянием, преобразования сообщений и т.п. Это -сделано специально, чтобы реализация оставалась максимально простой. - -Пример того, как можно создать последовательность из 10 событий: - -```ts - -var events = new Observable(async (notify, error, complete) => { - // цикл в котором возникает событие - for(let i = 0; i < 10; i++) { - await delay(1000); - // в качестве данных передается номер события - notify(i); - } - // по окончании последовательности информируем, что событий больше не будет - compelte(); -}); - -// создаем окно с отображением хода событий -var progress = showProgress({ min: 0, max: 9, current: 0}); - -// подписываемся на события -events.on( - // обработчик очередного события - msg => { - progress.setValue(msg); - }. - // обработчик ошибки - e => { - progress.showError(e); - }, - // обработчик конца потока - () => { - progress.close(); - } -); - -``` - -```ts - -// превращаем события dom в Observable -var events = new Observable((notify) => { - on(domNode,'mousemove', notify); -}); - -``` \ No newline at end of file diff --git a/docs/observable.ru.md b/docs/observable.ru.md new file mode 100644 --- /dev/null +++ b/docs/observable.ru.md @@ -0,0 +1,135 @@ +# Observable + +Универсальный способ организации потока сообщений. Данный механизм может +использоваться для оповещения об изменениях состояний объектов или для доставки +самостоятельных событий, например, связанных с действиями пользователя. + +Является реализацией классического шаблона наблюдателя с возможность сообщить +о коце потока событий. Данная реализация не содержит никаких дополнительных +функций, таких как фильтрация, канал с состоянием, преобразования сообщений и +т.п. Это сделано специально, чтобы реализация оставалась максимально простой. + +Пример того, как можно создать последовательность из 10 событий: + +```ts +var events = new Observable(async (notify, error, complete) => { + // цикл в котором возникает событие + for(let i = 0; i < 10; i++) { + await delay(1000); + // в качестве данных передается номер события + notify(i); + } + // по окончании последовательности информируем, что событий больше не будет + compelte(); +}); + +// создаем окно с отображением хода событий +var progress = showProgress({ min: 0, max: 9, current: 0}); + +// подписываемся на события +events.on( + // обработчик очередного события + msg => { + progress.setValue(msg); + }. + // обработчик ошибки + e => { + progress.showError(e); + }, + // обработчик конца потока + () => { + progress.close(); + } +); + +// ожидание следующего события +let firstEvent = await events.next(); +``` + +`Observable` можно создавать из событий другого объекта, например, виджета: + +```ts +// клсс +class Canvas { + readonly mouseMove: IObservable<[number,number]> + + postCreate() { + // превращаем события виджета в Observable + this.mouseMove = new Observable<[number,number]>((notify) => { + this.mousePad.on('mousemove',(e) => notify([e.clientX, e.clientY]) ); + }); + } +} + +``` + +Если объект инкапсулирует в себе `Observable`, он также может сохранить методы +для оповещения подписчиков для дальнейшего их использования внутри класса. + +```ts +// класс, который будет генерировать события местоположения +class PositionTracker implements IDestroyable { + // _nextPosition и _complete будут связаны с position при создании + // экземпляра PositionTracker. + _nextPosition: (pos: Position) => void + _complete: () => void + + readonly position: IObservable + + // конструктор + constructor(...args: any[]) { + super(args); + + // создаем Observable + this.position = new Observable((notify, error, complete) => { + // сохраняем методы для оповещения о новом местоположении + this._nextPosition = notify; + // метод об оповещении конца потока событий + this._complete = complete + }); + } + + // метод для очистки ресурсов + destroy() { + this._complete(); + + super(); + } +} + +``` + +## Observable и последовательности + +Можно сичтать, что `Observable` это некоторая аналогия итератора только в +парадигме событийного (или реактивного) программировния. Следует также понимать, +что при переходе от синхронного процедурного программирования к событийному так +же меняется и направление управления (Inverse Of Control), что означает +следующее: + +* при работе с итераторами клиенты сами определяют момент чтения следующего + элемента последовательности. +* при работе с `Observable` клиенты вынуждены обрабатывать эти события по мере + их поступления и не могут на это повлиять. + +Последний пункт можно изменить применив, например, буффер или канал с +состоянием, т.е. очередь, но данные механизмы выходят за рамки простого шаблона +наблюдателя. + +```ts +while(1) { + // ожидаем следующее событие, по сути это подписка только на одно событие + let next = await events.next(); + + // такой цикл может пропускать сообщения, поскольку асинхронная операция + // позволит возобновить создание новых событий, на которые мы не подписаны + await processEvent(next); + + // не только асинхронные операции могут привести к пропуску события + // например вызов метода, который приводит к созданию события так же + // приведет к тому, что созданное событие не будет обработано в текущем + // цикле + doSmthAndRiseEvent(); +} + +``` \ No newline at end of file diff --git a/src/ts/Observable.ts b/src/ts/Observable.ts new file mode 100644 --- /dev/null +++ b/src/ts/Observable.ts @@ -0,0 +1,193 @@ +import { IObservable, IDestroyable, ICancellation } from './interfaces'; +import { Cancellation } from './Cancellation' +import { argumentNotNull } from './safe'; + + +interface Handler { + (x: T): void +} + +interface Initializer { + (notify: Handler, error?: (e: any) => void, complete?: () => void): void; +} + +// TODO: think about to move this interfaces.ts and make it public +interface IObserver { + next(event: T): void + + error(e: any): void + + complete(): void +} + +const noop = () => {}; + +export class Observable implements IObservable { + private _once = new Array>(); + + private _observers = new Array>(); + + + private _complete: boolean + + private _error: any + + constructor(func?: Initializer) { + if (func) + func( + this._notifyNext.bind(this), + this._notifyError.bind(this), + this._notifyCompleted.bind(this) + ); + } + + /** + * Registers handlers for the current observable object. + * + * @param next the handler for events + * @param error the handler for a error + * @param complete the handler for a completion + * @returns {IDestroyable} the handler for the current subscription, this + * handler can be used to unsubscribe from events. + * + */ + on(next: Handler, error?: Handler, complete?: () => void): IDestroyable { + argumentNotNull(next, "next"); + + let me = this; + + let observer: IObserver & IDestroyable = { + next: next, + error: error ? error.bind(null) : noop, + complete: complete ? complete.bind(null) : noop, + + destroy() { + me._removeObserver(this); + } + } + + this._addObserver(observer); + + + return observer; + } + + private _addObserver(observer: IObserver) { + if (this._complete) { + try { + if (this._error) + observer.error(this._error); + else + observer.complete(); + } catch (e) { + this.onObserverException(e); + } + } else { + this._observers.push(observer); + } + } + + /** + * Waits for the next event. This method can't be used to read messages + * as a sequence since it can skip some messages between calls. + * + * @param ct a cancellation token + */ + next(ct: ICancellation = Cancellation.none): Promise { + return new Promise((resolve, reject) => { + let observer: IObserver = { + next: resolve, + error: reject, + complete: () => reject("No more events are available") + }; + + if (this._addOnce(observer) && ct.isSupported()) { + ct.register((e) => { + this._removeOnce(observer); + reject(e); + }); + } + }); + } + + private _addOnce(observer: IObserver) { + if (this._complete) { + try { + if (this._error) + observer.error(this._error); + else + observer.complete(); + } catch (e) { + this.onObserverException(e); + } + return false; + } + + this._once.push(observer); + return true; + } + + protected onObserverException(e: any) { + } + + private _removeOnce(d: IObserver) { + let i = this._once.indexOf(d); + if (i >= 0) + this._once.splice(i, 1); + } + + private _removeObserver(d: IObserver) { + let i = this._observers.indexOf(d); + if (i >= 0) + this._observers.splice(i, 1); + } + + private _notify(guard: (observer: IObserver) => void) { + if (this._once.length) { + for (let i = 0; i < this._once.length; i++) + guard(this._once[i]); + this._once = []; + } + + for (let i = 0; i < this._observers.length; i++) + guard(this._observers[i]); + } + + protected _notifyNext(evt: T) { + let guard = (observer: IObserver) => { + try { + observer.next(evt); + } catch (e) { + this.onObserverException(e); + } + } + + this._notify(guard); + } + + protected _notifyError(e: any) { + let guard = (observer: IObserver) => { + try { + observer.error(e); + } catch (e) { + this.onObserverException(e); + } + } + + this._notify(guard); + this._observers = []; + } + + protected _notifyCompleted() { + let guard = (observer: IObserver) => { + try { + observer.complete(); + } catch (e) { + this.onObserverException(e); + } + } + + this._notify(guard); + this._observers = []; + } +} \ No newline at end of file diff --git a/src/ts/components/Observable.ts b/src/ts/components/Observable.ts deleted file mode 100644 --- a/src/ts/components/Observable.ts +++ /dev/null @@ -1,193 +0,0 @@ -import { IObservable, IDestroyable, ICancellation } from '../interfaces'; -import { Cancellation } from '../Cancellation' -import { argumentNotNull } from '../safe'; - - -interface Handler { - (x: T): void -} - -interface Initializer { - (notify: Handler, error?: (e: any) => void, complete?: () => void): void; -} - -// TODO: think about to move this interfaces.ts and make it public -interface IObserver { - next(event: T): void - - error(e: any): void - - complete(): void -} - -const noop = () => {}; - -export class Observable implements IObservable { - private _once = new Array>(); - - private _observers = new Array>(); - - - private _complete: boolean - - private _error: any - - constructor(func?: Initializer) { - if (func) - func( - this._notifyNext.bind(this), - this._notifyError.bind(this), - this._notifyCompleted.bind(this) - ); - } - - /** - * Registers handlers for the current observable object. - * - * @param next the handler for events - * @param error the handler for a error - * @param complete the handler for a completion - * @returns {IDestroyable} the handler for the current subscription, this - * handler can be used to unsubscribe from events. - * - */ - on(next: Handler, error?: Handler, complete?: () => void): IDestroyable { - argumentNotNull(next, "next"); - - let me = this; - - let observer: IObserver & IDestroyable = { - next: next, - error: error ? error.bind(null) : noop, - complete: complete ? complete.bind(null) : noop, - - destroy() { - me._removeObserver(this); - } - } - - this._addObserver(observer); - - - return observer; - } - - private _addObserver(observer: IObserver) { - if (this._complete) { - try { - if (this._error) - observer.error(this._error); - else - observer.complete(); - } catch (e) { - this.onObserverException(e); - } - } else { - this._observers.push(observer); - } - } - - /** - * Waits for the next event. This method can't be used to read messages - * as a sequence since it can skip some messages between calls. - * - * @param ct a cancellation token - */ - next(ct: ICancellation = Cancellation.none): Promise { - return new Promise((resolve, reject) => { - let observer: IObserver = { - next: resolve, - error: reject, - complete: () => reject("No more events are available") - }; - - if (this._addOnce(observer) && ct.isSupported()) { - ct.register((e) => { - this._removeOnce(observer); - reject(e); - }); - } - }); - } - - private _addOnce(observer: IObserver) { - if (this._complete) { - try { - if (this._error) - observer.error(this._error); - else - observer.complete(); - } catch (e) { - this.onObserverException(e); - } - return false; - } - - this._once.push(observer); - return true; - } - - protected onObserverException(e: any) { - } - - private _removeOnce(d: IObserver) { - let i = this._once.indexOf(d); - if (i >= 0) - this._once.splice(i, 1); - } - - private _removeObserver(d: IObserver) { - let i = this._observers.indexOf(d); - if (i >= 0) - this._observers.splice(i, 1); - } - - private _notify(guard: (observer: IObserver) => void) { - if (this._once.length) { - for (let i = 0; i < this._once.length; i++) - guard(this._once[i]); - this._once = []; - } - - for (let i = 0; i < this._observers.length; i++) - guard(this._observers[i]); - } - - protected _notifyNext(evt: T) { - let guard = (observer: IObserver) => { - try { - observer.next(evt); - } catch (e) { - this.onObserverException(e); - } - } - - this._notify(guard); - } - - protected _notifyError(e: any) { - let guard = (observer: IObserver) => { - try { - observer.error(e); - } catch (e) { - this.onObserverException(e); - } - } - - this._notify(guard); - this._observers = []; - } - - protected _notifyCompleted() { - let guard = (observer: IObserver) => { - try { - observer.complete(); - } catch (e) { - this.onObserverException(e); - } - } - - this._notify(guard); - this._observers = []; - } -} \ No newline at end of file diff --git a/src/ts/log/TraceSource.ts b/src/ts/log/TraceSource.ts --- a/src/ts/log/TraceSource.ts +++ b/src/ts/log/TraceSource.ts @@ -1,6 +1,6 @@ import * as format from '../text/format' import { argumentNotNull } from '../safe'; -import { Observable } from '../components/Observable' +import { Observable } from '../Observable' import { IDestroyable } from '../interfaces'; export const DebugLevel = 400;