Observable
Универсальный способ организации потока сообщений. Данный механизм может использоваться для оповещения об изменениях состояний объектов или для доставки самостоятельных событий, например, связанных с действиями пользователя.
Является реализацией классического шаблона наблюдателя с возможность сообщить о коце потока событий. Данная реализация не содержит никаких дополнительных функций, таких как фильтрация, канал с состоянием, преобразования сообщений и т.п. Это сделано специально, чтобы реализация оставалась максимально простой.
Пример того, как можно создать последовательность из 10 событий:
var events = new Observable(async (notify, error, complete) => { // цикл в котором возникает событие for(let i = 0; i < 10; i++) { await delay(1000); // в качестве данных передается номер события notify(i); } // по окончании последовательности информируем, что событий больше не будет compelte(); }); // создаем окно с отображением хода событий var progress = showProgress({ min: 0, max: 9, current: 0}); // подписываемся на события events.on( // обработчик очередного события msg => { progress.setValue(msg); }. // обработчик ошибки e => { progress.showError(e); }, // обработчик конца потока () => { progress.close(); } ); // ожидание следующего события let firstEvent = await events.next();
Observable можно создавать из событий другого объекта, например, виджета:
// клсс class Canvas { readonly mouseMove: IObservable<[number,number]> postCreate() { // превращаем события виджета в Observable this.mouseMove = new Observable<[number,number]>((notify) => { this.mousePad.on('mousemove',(e) => notify([e.clientX, e.clientY]) ); }); } }
Если объект инкапсулирует в себе Observable, он также может сохранить методы
для оповещения подписчиков для дальнейшего их использования внутри класса.
// класс, который будет генерировать события местоположения class PositionTracker implements IDestroyable { // _nextPosition и _complete будут связаны с position при создании // экземпляра PositionTracker. _nextPosition: (pos: Position) => void _complete: () => void readonly position: IObservable<Position> // конструктор constructor(...args: any[]) { super(args); // создаем Observable this.position = new Observable<Position>((notify, error, complete) => { // сохраняем методы для оповещения о новом местоположении this._nextPosition = notify; // метод об оповещении конца потока событий this._complete = complete }); } // метод для очистки ресурсов destroy() { this._complete(); super(); } }
Observable и последовательности
Можно сичтать, что Observable это некоторая аналогия итератора только в
парадигме событийного (или реактивного) программировния. Следует также понимать,
что при переходе от синхронного процедурного программирования к событийному так
же меняется и направление управления (Inverse Of Control), что означает
следующее:
- при работе с итераторами клиенты сами определяют момент чтения следующего элемента последовательности.
- при работе с
Observableклиенты вынуждены обрабатывать эти события по мере их поступления и не могут на это повлиять.
Последний пункт можно изменить применив, например, буффер или канал с состоянием, т.е. очередь, но данные механизмы выходят за рамки простого шаблона наблюдателя.
while(1) { // ожидаем следующее событие, по сути это подписка только на одно событие let next = await events.next(); // такой цикл может пропускать сообщения, поскольку асинхронная операция // позволит возобновить создание новых событий, на которые мы не подписаны await processEvent(next); // не только асинхронные операции могут привести к пропуску события // например вызов метода, который приводит к созданию события так же // приведет к тому, что созданное событие не будет обработано в текущем // цикле doSmthAndRiseEvent(); }
