##// END OF EJS Templates
added CancellationTests...
cin -
r18:a8dda6a00a16 propose cancellat...
parent child
Show More
@@ -0,0 +1,1
1 *.tgz No newline at end of file
@@ -0,0 +1,257
1 # Cancellations. Отмена асинхронных операций
2
3 Использование Promise позволяет организовать обработку результатов работы
4 асинхронных фукнций. Ключевые слова async/await позволяют работать с
5 асинхронными вызовами в стиле процедурного программирования, хотя по сути это
6 событиный подход. При всей своей красоте даннго подхода в нем умышленно
7 отсутсвует механизм отмены асинхронной операции, т.е. ее можно начать, но нельзя
8 отказаться от результатов ее выполнения, даже если это уже не требуется.
9
10 Примером того, когда может потребоваться отмена является загрузка большого
11 файла, при которой пользователю отображается окно хода операции с возможностью
12 ее отмены.
13
14 ```ts
15 // имеется некоторый HTTP клиент
16 let client = new HttpClient();
17
18 // загружается большой файл, с использованием медленного канала
19 let data = await client.getJsonAsync('http://host/large-file.json');
20
21 ```
22
23 Как поступить в данной ситуации, прежде всего нужно, чтобы сама операция
24 поддерживала возможность отмены, предположим, что для этого есть метод
25 `client.abort()`.
26
27 ```ts
28
29 // имеется некоторый HTTP клиент
30 let client = new HttpClient();
31
32 // отображаем окно с информацией о хоже операции
33 let progressView = showProgress("Downloading, please wait...");
34
35 // код оборачивается в try/finally поскольку созданную форму нужно закрыть
36 try {
37 // загружается большой файл, с использованием медленного канала
38 // здесь, в отличии от предыдущего примера, мы не дожидаемся результата,
39 // а запоминаем обещание в переменную downloadTask
40 let downloadTask = client.getJsonAsync('http://host/large-file.json');
41
42 // связываем нажатие кнопки с отменой загрузки
43 progressView.once('cancel', () => client.abort());
44
45 // ожидаем окончания загрузки данных
46
47 let data = await downloadTask;
48 } finally {
49 // независимот от результата закрываем форму
50 // при этом также происходит ануллирование подписок на события
51 progressView.close();
52 }
53
54 ```
55
56 Технические приведенное решение выглядит не плохо, но проблемы появляются, когда
57 требуется организовать отмену нескольких операций, особенно если они вложенные.
58
59 ```ts
60 // обновление информации о человеке на форме
61 async function updatePersonInfo(info) {
62 let client = new RestApiClient();
63
64 // выплнение нескольких асинхронных операций
65 let org = await client.getOrgAsync(info.orgId);
66 let city = await client.getCityAsync(info.cityId);
67
68 // обновление содержимого представления
69 renderContent({
70 person: info,
71 org: org,
72 city: city
73 });
74 }
75
76 ```
77
78 Чтобы реализовать возможность отмены такой операции требуется, чтобы в логике
79 самой операции была реализована поддержка отмены. Для реализации этого
80 потребуется чтобы у операции была информация о запросе отмены, причем данная
81 информация относится именно к текущей операции.
82
83 Информацию о состоянии запроса на отмену назовет **маркер отмены (cancellation
84 token)**. Поскольку маркер отмены тесно связан с операцией, его удобно
85 передавать в виде параметра, тогда код операции будет выглядеть так:
86
87 ```ts
88 // обновление информации о человеке на форме
89 // ct - маркер отмены
90 async function updatePersonInfo(info, ct) {
91 let client = new RestApiClient();
92
93 // выплнение нескольких асинхронных операций
94 // маркер отмены просто передается далее по цепочке вызовов, без
95 // дополнительных действий
96 let org = await client.getOrg(info.orgId, ct);
97 let city = await client.getCity(info.cityId, ct);
98
99 // обновление содержимого представления
100 renderContent({
101 person: info,
102 org: org,
103 city: city
104 });
105 }
106
107 ///////////////////////////////////////////////////////////////////////////////
108 // ... где-то в другом месте
109 ///////////////////////////////////////////////////////////////////////////////
110
111 // отображаем окно с информацией о хоже операции
112 let progressView = showProgress("Loading, please wait...");
113
114 // создаем маркер отмены для операции на основе события 'cancel'.
115 let ct = new Cancellation(cancel => progressView.on('cancel', cancel));
116
117 // код оборачивается в try/finally поскольку созданную форму нужно закрыть
118 try {
119 // асинхронно получаем информацию о человеке
120 let data = await getPersonInfo(personId, ct);
121
122 // асинхронно обновляем представление
123 await updatePersonInfo(data, ct);
124 } finally {
125 // независимот от результата закрываем форму
126 // при этом также происходит ануллирование подписок на события
127 progressView.close();
128 }
129
130 ```
131
132 Таким образом тот, кто начинает асинхронную операцию заранее определяет как эта
133 опреция будет отменена.
134
135 Важно понимать, что для реализации отмены операции
136 могут выделаться ресурсы требующие явного освобождения (DOM, таймеры, события),
137 об их освобождении по окончанию операции (успешном или нет) должен позаботиться
138 инициатор этой операции. `Cancellation` выступает только в роли посредника для
139 доставки события отмены операции до конечного получателя, он не отслеживает и
140 не освобождает ресурсы, кроме того, асинхронная операция может его попросту
141 проигнорировать.
142
143 ## `ICancellation` Маркер отмены операции
144
145 Интерфейс маркера отмены операции. Используется асинхронными операциями, чтобы
146 получить оповещение о требуемой отмене.
147
148 ### `isSupported(): boolean`
149
150 Определяет, может ли быть запрошена отмена операции через данный маркер.
151
152 ### `isRequested(): boolean`
153
154 Возвращает текущее состояние запроса на отмену.
155
156 ### `throwIfRequested(): void`
157
158 Если отмена была запрошена, бросает в качестве исключения причину отмены.
159
160 ### `register(cb: (e:any) => void): IDestroyable`
161
162 Метод, зарегистрировать обработчик на запрос отмены. Если отмена была запрошена
163 зарегистрированный обработчик будет вызван ровно один раз, независимо от того,
164 был ли он зарегистрирован до или после запроса отмены.
165
166 Если отмена уже была запрошена, обработчик будет вызван сразу при регистрации,
167 при этом исключения, которые могу возникнуть в обработчике не будут обработаны,
168 а передадуться наверх.
169
170 Вызов данного метода приводит к выделению ресурсов, поэтому операция,
171 зарегистрировавшая обработчик должна освободить подписку, которую вернет метод.
172
173 ```ts
174 async function getAsync(url: string, ct: ICancellation = Cancellation.none) {
175 // переменная в которой будет запомнена подписка на запрос отмены
176 let reg;
177 try {
178 // оборачиваем операцию загрузки в Promise
179 return await new Promise<string>((resolve, reject) => {
180 // объект Xhr
181 const xhr = new XMLHttpRequest();
182 xhr.open("GET", url);
183
184 // регистрируем обработчики Promise
185 xhr.onload = () => resolve(xhr.responseText);
186 xhr.onerror = () => reject(xhr.statusText);
187
188 // отправляем запрос
189 xhr.send();
190
191 // подписываемся на запрос отмены
192 reg = ct.register((e) => {
193 reject(e);
194 xhr.abort();
195 });
196 });
197 } finally {
198 if (reg)
199 reg.destroy();
200 }
201 }
202
203 ```
204
205 Использование метода `register()` предполагается для организации отмены операций
206 не поддерживающих маркеры отмены.
207
208 ## `Cancellation` Источник отмены
209
210 Класс используется для создания маркеров отмены. Позволяет создать маркер при
211 начале асинхронной операции и связать его, например, с событием DOM.
212
213 Также маркер можно создавать, когда требуется сложное условие отмены текущей и
214 всех нижележещих операций.
215
216 Как правило в большинстве операций достаточно маркера переданного в параметрах,
217 этот же маркер может передаваться ниже.
218
219 ### `constructor(exec: (cancel: (reason:any) => void ) => void )`
220
221 Создает новый маркер, при помощи параметра и инициализирует его при помощи
222 фукнции, переданной в параметре `exec`.
223
224 ```ts
225
226 let htimer;
227 let ct = new Cancellation(cancel => {
228 htimer = setTimeout(() => cancel("The request is timed out."), 1000);
229 });
230
231 try {
232 let text = await getAsync(url, ct);
233 } finally {
234 // инициатор должен освобождать ресурсы
235 // передача недействительного htimer не приводит ни к каким последствиям
236 clearTimeout(htimer);
237 }
238
239 ```
240
241 ## `Cancellation.none: ICancellation`
242
243 Статическое свойство только для чтения, в котором находится специальный токен
244 запроса отмены. Этот токен означает, что отмена никогда не может произойти.
245
246 Данный токен рекомендуется использовать как значение по-умолчанию для
247 параметров, принимающих токен отмены.
248
249 ```ts
250
251 async function load(url: string, ct: ICancellation = Cancellation.none) {
252 ct.throwIfRequested();
253
254 // ... the rest of method
255 }
256
257 ``` No newline at end of file
@@ -0,0 +1,97
1 import * as tape from 'tape';
2 import { Cancellation } from '@implab/core/Cancellation';
3 import { ICancellation } from '@implab/core/interfaces';
4 import { delay } from './TestTraits';
5
6 tape('standalone cancellation', async t => {
7
8 let doCancel: (e) => void;
9
10 let ct = new Cancellation(cancel => {
11 doCancel = cancel;
12 });
13
14 let counter = 0;
15 let reason = "BILL";
16
17 t.true(ct.isSupported(), "Cancellation must be supported");
18 t.false(ct.isRequested(), "Cancellation shouldn't be requested");
19 ct.throwIfRequested();
20 t.pass("The exception shouldn't be thrown unless the cancellation is requested");
21
22 ct.register(() => counter++);
23 t.equals(counter, 0, "counter should be zero");
24
25 ct.register(() => counter++).destroy();
26
27 doCancel(reason);
28
29 t.true(ct.isRequested(), "Cancellation should be requested");
30 t.equals(counter, 1, "The registered callback should be triggered");
31
32 ct.register(() => counter++);
33 t.equals(counter, 2, "The callback should be triggered immediately");
34
35 let msg;
36 ct.register((e) => msg = e);
37 t.equals(msg, reason, "The cancellation reason should be passed to callback");
38
39 try {
40 msg = null;
41 ct.throwIfRequested();
42 t.fail("The exception should be thrown");
43 } catch (e) {
44 msg = e;
45 }
46 t.equals(msg, reason, "The cancellation reason should be catched");
47
48 t.end();
49 });
50
51 tape('async cancellation', async t => {
52
53 let ct = new Cancellation(cancel => {
54 cancel("STOP!");
55 });
56
57 try {
58 await delay(0, ct);
59 t.fail("Should thow the exception");
60 } catch (e) {
61 t.equals(e, "STOP!", "Should throw the cancellation reason");
62 }
63
64 t.end();
65 });
66
67 tape('cancel with external event', async t => {
68 let ct = new Cancellation((cancel) => {
69 setTimeout(x => cancel('STOP!'), 0);
70 })
71
72 try {
73 await delay(10000, ct);
74 t.fail("Should thow the exception");
75 } catch (e) {
76 t.equals(e, "STOP!", "Should throw the cancellation reason");
77 }
78
79 t.end();
80 });
81
82 tape('operation normal flow', async t => {
83
84 let htimeout;
85 let ct = new Cancellation((cancel) => {
86 htimeout = setTimeout(() => cancel("STOP!"), 1000);
87 });
88
89 try {
90 await delay(0, ct);
91 t.pass("Should pass");
92 } finally {
93 clearTimeout(htimeout);
94 }
95
96 t.end();
97 }); No newline at end of file
@@ -1,91 +1,91
1
1
2 println "version: $version"
2 println "version: $version"
3
3
4 def distDir = "$buildDir/dist"
4 def distDir = "$buildDir/dist"
5 def testDir = "$buildDir/test"
5 def testDir = "$buildDir/test"
6
6
7 task clean {
7 task clean {
8 doLast {
8 doLast {
9 delete buildDir
9 delete buildDir
10 delete 'node_modules/@implab'
10 delete 'node_modules/@implab'
11 }
11 }
12 }
12 }
13
13
14 task cleanNpm {
14 task cleanNpm {
15 doLast {
15 doLast {
16 delete 'node_modules'
16 delete 'node_modules'
17 }
17 }
18 }
18 }
19
19
20 task _npmInstall() {
20 task _npmInstall() {
21 inputs.file("package.json")
21 inputs.file("package.json")
22 outputs.dir("node_modules")
22 outputs.dir("node_modules")
23 doLast {
23 doLast {
24 exec {
24 exec {
25 commandLine 'npm', 'install'
25 commandLine 'npm', 'install'
26 }
26 }
27 }
27 }
28 }
28 }
29
29
30 task _legacyJs(type:Copy) {
30 task _legacyJs(type:Copy) {
31 from 'src/js/'
31 from 'src/js/'
32 into distDir
32 into distDir
33 }
33 }
34
34
35 task _buildTs(dependsOn: _npmInstall, type:Exec) {
35 task _buildTs(dependsOn: _npmInstall, type:Exec) {
36 inputs.dir('src/ts')
36 inputs.dir('src/ts')
37 inputs.file('tsc.json')
37 inputs.file('tsc.json')
38 outputs.dir(distDir)
38 outputs.dir(distDir)
39
39
40 commandLine 'node_modules/.bin/tsc', '-p', 'tsc.json'
40 commandLine 'node_modules/.bin/tsc', '-p', 'tsc.json'
41 }
41 }
42
42
43 task _packageMeta(type: Copy) {
43 task _packageMeta(type: Copy) {
44 inputs.property("version", version)
44 inputs.property("version", version)
45 from('.') {
45 from('.') {
46 include 'package.json', 'readme.md', 'license', 'history.md'
46 include 'package.json', '.npmignore', 'readme.md', 'license', 'history.md'
47 }
47 }
48 into distDir
48 into distDir
49 doLast {
49 doLast {
50 exec {
50 exec {
51 workingDir distDir
51 workingDir distDir
52 commandLine 'npm', 'version', version
52 commandLine 'npm', 'version', version
53 }
53 }
54 }
54 }
55 }
55 }
56
56
57 task build(dependsOn: [_npmInstall, _buildTs, _legacyJs, _packageMeta]) {
57 task build(dependsOn: [_npmInstall, _buildTs, _legacyJs, _packageMeta]) {
58
58
59 }
59 }
60
60
61 task _localInstall(dependsOn: build, type: Exec) {
61 task _localInstall(dependsOn: build, type: Exec) {
62 inputs.file("$distDir/package.json")
62 inputs.file("$distDir/package.json")
63 outputs.upToDateWhen {
63 outputs.upToDateWhen {
64 new File("$projectDir/node_modules/@implab/core").exists()
64 new File("$projectDir/node_modules/@implab/core").exists()
65 }
65 }
66
66
67 commandLine 'npm', 'install', '--no-save', '--force', distDir
67 commandLine 'npm', 'install', '--no-save', '--force', distDir
68 }
68 }
69
69
70 task copyJsTests(type: Copy) {
70 task copyJsTests(type: Copy) {
71 from 'test/js'
71 from 'test/js'
72 into testDir
72 into testDir
73 }
73 }
74
74
75 task buildTests(dependsOn: _localInstall, type: Exec) {
75 task buildTests(dependsOn: _localInstall, type: Exec) {
76 inputs.dir('test/ts')
76 inputs.dir('test/ts')
77 inputs.file('tsc.test.json')
77 inputs.file('tsc.test.json')
78 outputs.dir(testDir)
78 outputs.dir(testDir)
79
79
80 commandLine 'node_modules/.bin/tsc', '-p', 'tsc.test.json'
80 commandLine 'node_modules/.bin/tsc', '-p', 'tsc.test.json'
81 }
81 }
82
82
83 task test(dependsOn: [copyJsTests, buildTests], type: Exec) {
83 task test(dependsOn: [copyJsTests, buildTests], type: Exec) {
84 commandLine 'node', 'run-amd-tests.js'
84 commandLine 'node', 'run-amd-tests.js'
85 }
85 }
86
86
87 task pack(dependsOn: build, type: Exec) {
87 task pack(dependsOn: build, type: Exec) {
88 workingDir = distDir
88 workingDir = distDir
89
89
90 commandLine 'npm', 'pack'
90 commandLine 'npm', 'pack'
91 } No newline at end of file
91 }
@@ -1,67 +1,89
1 import { ICancellation } from "./interfaces";
1 import { ICancellation, IDestroyable } from "./interfaces";
2 import { argumentNotNull } from "./safe";
2 import { argumentNotNull } from "./safe";
3
3
4 const destroyed = {
5 destroy() {
6 }
7 };
8
4 export class Cancellation implements ICancellation {
9 export class Cancellation implements ICancellation {
5 private _reason: any;
10 private _reason: any;
6 private _cbs: Array<(e) => void>;
11 private _cbs: Array<(e) => void>;
7
12
8 constructor(action: (cancel: (e) => void) => void) {
13 constructor(action: (cancel: (e) => void) => void) {
9 argumentNotNull(action, "action");
14 argumentNotNull(action, "action");
10
15
11 action(this._cancel.bind(this));
16 action(this._cancel.bind(this));
12 }
17 }
13
18
14 isSupported(): boolean {
19 isSupported(): boolean {
15 return true;
20 return true;
16 }
21 }
17 throwIfRequested(): void {
22 throwIfRequested(): void {
18 if (this._reason)
23 if (this._reason)
19 throw this._reason;
24 throw this._reason;
20 }
25 }
21
26
22 isRequested(): boolean {
27 isRequested(): boolean {
23 return !!this._reason;
28 return !!this._reason;
24 }
29 }
25
30
26 register(cb: (e: any) => void): void {
31 register(cb: (e: any) => void): IDestroyable {
27 argumentNotNull(cb, "cb");
32 argumentNotNull(cb, "cb");
28
33
29 if (this._reason) {
34 if (this._reason) {
30 cb(this._reason);
35 cb(this._reason);
36 return destroyed;
31 } else {
37 } else {
32 if (!this._cbs)
38 if (!this._cbs)
33 this._cbs = [cb];
39 this._cbs = [cb];
34 else
40 else
35 this._cbs.push(cb);
41 this._cbs.push(cb);
42
43 let me = this;
44 return {
45 destroy() {
46 me._unregister(cb);
47 }
48 };
49 }
50 }
51
52 private _unregister(cb) {
53 if(this._cbs) {
54 let i = this._cbs.indexOf(cb);
55 if ( i>=0 )
56 this._cbs.splice(i,1);
36 }
57 }
37 }
58 }
38
59
39 private _cancel(reason) {
60 private _cancel(reason) {
40 if (this._reason)
61 if (this._reason)
41 return;
62 return;
42
63
43 this._reason = (reason = reason || new Error("Operation cancelled"));
64 this._reason = (reason = reason || new Error("Operation cancelled"));
44
65
45
66
46 if (this._cbs) {
67 if (this._cbs) {
47 this._cbs.forEach(cb => cb(reason));
68 this._cbs.forEach(cb => cb(reason));
48 this._cbs = null;
69 this._cbs = null;
49 }
70 }
50 }
71 }
51
72
52 static readonly none: ICancellation = {
73 static readonly none: ICancellation = {
53 isSupported(): boolean {
74 isSupported(): boolean {
54 return false;
75 return false;
55 },
76 },
56
77
57 throwIfRequested(): void {
78 throwIfRequested(): void {
58 },
79 },
59
80
60 isRequested(): boolean {
81 isRequested(): boolean {
61 return false;
82 return false;
62 },
83 },
63
84
64 register(_cb: (e: any) => void): void {
85 register(_cb: (e: any) => void): IDestroyable {
86 return destroyed;
65 }
87 }
66 };
88 };
67 } No newline at end of file
89 }
@@ -1,215 +1,198
1 import { IObservable, IDestroyable, ICancellation } from '../interfaces';
1 import { IObservable, IDestroyable, ICancellation } from '../interfaces';
2 import { Cancellation } from '../Cancellation'
2 import { Cancellation } from '../Cancellation'
3 import { argumentNotNull } from '../safe';
3 import { argumentNotNull } from '../safe';
4
4
5
5
6 interface Handler<T> {
6 interface Handler<T> {
7 (x: T): void
7 (x: T): void
8 }
8 }
9
9
10 interface Initializer<T> {
10 interface Initializer<T> {
11 (notify: Handler<T>, error?: (e: any) => void, complete?: () => void): (() => void) | void;
11 (notify: Handler<T>, error?: (e: any) => void, complete?: () => void): void;
12 }
12 }
13
13
14 // TODO: think about to move this interfaces.ts and make it public
14 // TODO: think about to move this interfaces.ts and make it public
15 interface IObserver<T> {
15 interface IObserver<T> {
16 next(event: T): void
16 next(event: T): void
17
17
18 error(e: any): void
18 error(e: any): void
19
19
20 complete(): void
20 complete(): void
21 }
21 }
22
22
23 class Observable<T> implements IObservable<T>, IDestroyable {
23 const noop = () => {};
24
25 class Observable<T> implements IObservable<T> {
24 private _once = new Array<IObserver<T>>();
26 private _once = new Array<IObserver<T>>();
25
27
26 private _observers = new Array<IObserver<T>>();
28 private _observers = new Array<IObserver<T>>();
27
29
28 private _cleanup: (() => void) | void;
29
30
30 private _complete: boolean
31 private _complete: boolean
31
32
32 private _error: any
33 private _error: any
33
34
34 constructor(func?: Initializer<T>) {
35 constructor(func?: Initializer<T>) {
35 this._cleanup = func && func(
36 if (func)
37 func(
36 this._notifyNext.bind(this),
38 this._notifyNext.bind(this),
37 this._notifyError.bind(this),
39 this._notifyError.bind(this),
38 this._notifyCompleted.bind(this)
40 this._notifyCompleted.bind(this)
39 );
41 );
40 }
42 }
41
43
42 /**
44 /**
43 * Registers handlers for the current observable object.
45 * Registers handlers for the current observable object.
44 *
46 *
45 * @param next the handler for events
47 * @param next the handler for events
46 * @param error the handler for a error
48 * @param error the handler for a error
47 * @param complete the handler for a completion
49 * @param complete the handler for a completion
48 * @returns {IDestroyable} the handler for the current subscription, this
50 * @returns {IDestroyable} the handler for the current subscription, this
49 * handler can be used to unsubscribe from events.
51 * handler can be used to unsubscribe from events.
50 *
52 *
51 */
53 */
52 on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
54 on(next: Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
53 argumentNotNull(next, "next");
55 argumentNotNull(next, "next");
54
56
55 let me = this;
57 let me = this;
56
58
57 let observer: IObserver<T> & IDestroyable = {
59 let observer: IObserver<T> & IDestroyable = {
58 next: next,
60 next: next,
59
61 error: error ? error.bind(null) : noop,
60 error(e: any) {
62 complete: complete ? complete.bind(null) : noop,
61 if (error)
62 error(e);
63 },
64
65 complete() {
66 if (complete)
67 complete();
68 },
69
63
70 destroy() {
64 destroy() {
71 me._removeObserver(this);
65 me._removeObserver(this);
72 }
66 }
73 }
67 }
74
68
75 this._addObserver(observer);
69 this._addObserver(observer);
76
70
77
71
78 return observer;
72 return observer;
79 }
73 }
80
74
81 private _addObserver(observer: IObserver<T>) {
75 private _addObserver(observer: IObserver<T>) {
82 if (this._complete) {
76 if (this._complete) {
83 try {
77 try {
84 if (this._error)
78 if (this._error)
85 observer.error(this._error);
79 observer.error(this._error);
86 else
80 else
87 observer.complete();
81 observer.complete();
88 } catch (e) {
82 } catch (e) {
89 this.onObserverException(e);
83 this.onObserverException(e);
90 }
84 }
91 } else {
85 } else {
92 this._observers.push(observer);
86 this._observers.push(observer);
93 }
87 }
94 }
88 }
95
89
96 /**
90 /**
97 * Waits for the next event. This method can't be used to read messages
91 * Waits for the next event. This method can't be used to read messages
98 * as a sequence since it can skip some messages between calls.
92 * as a sequence since it can skip some messages between calls.
99 *
93 *
100 * @param ct a cancellation token
94 * @param ct a cancellation token
101 */
95 */
102 next(ct: ICancellation = Cancellation.none): Promise<T> {
96 next(ct: ICancellation = Cancellation.none): Promise<T> {
103 return new Promise<T>((resolve, reject) => {
97 return new Promise<T>((resolve, reject) => {
104 let observer: IObserver<T> = {
98 let observer: IObserver<T> = {
105 next: resolve,
99 next: resolve,
106 error: reject,
100 error: reject,
107 complete: () => reject("No more events are available")
101 complete: () => reject("No more events are available")
108 };
102 };
109
103
110 if (this._addOnce(observer) && ct.isSupported()) {
104 if (this._addOnce(observer) && ct.isSupported()) {
111 ct.register((e) => {
105 ct.register((e) => {
112 this._removeOnce(observer);
106 this._removeOnce(observer);
113 reject(e);
107 reject(e);
114 });
108 });
115 }
109 }
116 });
110 });
117 }
111 }
118
112
119 private _addOnce(observer: IObserver<T>) {
113 private _addOnce(observer: IObserver<T>) {
120 if (this._complete) {
114 if (this._complete) {
121 try {
115 try {
122 if (this._error)
116 if (this._error)
123 observer.error(this._error);
117 observer.error(this._error);
124 else
118 else
125 observer.complete();
119 observer.complete();
126 } catch (e) {
120 } catch (e) {
127 this.onObserverException(e);
121 this.onObserverException(e);
128 }
122 }
129 return false;
123 return false;
130 }
124 }
131
125
132 this._once.push(observer);
126 this._once.push(observer);
133 return true;
127 return true;
134 }
128 }
135
129
136 destroy() {
137 if (this._complete)
138 this._notifyCompleted();
139
140 let cleanup = this._cleanup;
141 if (cleanup) {
142 this._cleanup = null;
143 cleanup();
144 }
145 }
146
147 protected onObserverException(e: any) {
130 protected onObserverException(e: any) {
148 }
131 }
149
132
150 private _removeOnce(d: IObserver<T>) {
133 private _removeOnce(d: IObserver<T>) {
151 let i = this._once.indexOf(d);
134 let i = this._once.indexOf(d);
152 if (i >= 0)
135 if (i >= 0)
153 this._once.splice(i);
136 this._once.splice(i, 1);
154 }
137 }
155
138
156 private _removeObserver(d: IObserver<T>) {
139 private _removeObserver(d: IObserver<T>) {
157 let i = this._observers.indexOf(d);
140 let i = this._observers.indexOf(d);
158 if (i >= 0)
141 if (i >= 0)
159 this._observers.splice(i);
142 this._observers.splice(i, 1);
160 }
143 }
161
144
162 private _notify(guard: (observer: IObserver<T>) => void) {
145 private _notify(guard: (observer: IObserver<T>) => void) {
163 if (this._once.length) {
146 if (this._once.length) {
164 for (let i = 0; i < this._once.length; i++)
147 for (let i = 0; i < this._once.length; i++)
165 guard(this._once[i]);
148 guard(this._once[i]);
166 this._once = [];
149 this._once = [];
167 }
150 }
168
151
169 for (let i = 0; i < this._observers.length; i++)
152 for (let i = 0; i < this._observers.length; i++)
170 guard(this._observers[i]);
153 guard(this._observers[i]);
171 }
154 }
172
155
173 protected _notifyNext(evt: T) {
156 protected _notifyNext(evt: T) {
174 let guard = (observer: IObserver<T>) => {
157 let guard = (observer: IObserver<T>) => {
175 try {
158 try {
176 observer.next(evt);
159 observer.next(evt);
177 } catch (e) {
160 } catch (e) {
178 this.onObserverException(e);
161 this.onObserverException(e);
179 }
162 }
180 }
163 }
181
164
182 this._notify(guard);
165 this._notify(guard);
183 }
166 }
184
167
185 protected _notifyError(e: any) {
168 protected _notifyError(e: any) {
186 let guard = (observer: IObserver<T>) => {
169 let guard = (observer: IObserver<T>) => {
187 try {
170 try {
188 observer.error(e);
171 observer.error(e);
189 } catch (e) {
172 } catch (e) {
190 this.onObserverException(e);
173 this.onObserverException(e);
191 }
174 }
192 }
175 }
193
176
194 this._notify(guard);
177 this._notify(guard);
195 this._observers = [];
178 this._observers = [];
196 }
179 }
197
180
198 protected _notifyCompleted() {
181 protected _notifyCompleted() {
199 let guard = (observer: IObserver<T>) => {
182 let guard = (observer: IObserver<T>) => {
200 try {
183 try {
201 observer.complete();
184 observer.complete();
202 } catch (e) {
185 } catch (e) {
203 this.onObserverException(e);
186 this.onObserverException(e);
204 }
187 }
205 }
188 }
206
189
207 this._notify(guard);
190 this._notify(guard);
208 this._observers = [];
191 this._observers = [];
209 }
192 }
210 }
193 }
211
194
212 namespace Observable {
195 namespace Observable {
213 }
196 }
214
197
215 export = Observable; No newline at end of file
198 export = Observable;
@@ -1,76 +1,76
1 export interface IDestroyable {
1 export interface IDestroyable {
2 destroy();
2 destroy();
3 }
3 }
4
4
5 export interface ICancellation {
5 export interface ICancellation {
6 throwIfRequested(): void;
6 throwIfRequested(): void;
7 isRequested(): boolean;
7 isRequested(): boolean;
8 isSupported(): boolean;
8 isSupported(): boolean;
9 register(cb: (e: any) => void): void;
9 register(cb: (e: any) => void): IDestroyable;
10 }
10 }
11
11
12 /**
12 /**
13 * Интерфейс поддерживающий асинхронную активацию
13 * Интерфейс поддерживающий асинхронную активацию
14 */
14 */
15 export interface IActivatable {
15 export interface IActivatable {
16 /**
16 /**
17 * @returns Boolean indicates the current state
17 * @returns Boolean indicates the current state
18 */
18 */
19 isActive(): boolean;
19 isActive(): boolean;
20
20
21 /**
21 /**
22 * Starts the component activation
22 * Starts the component activation
23 * @param ct cancellation token for this operation
23 * @param ct cancellation token for this operation
24 */
24 */
25 activate(ct?: ICancellation) : Promise<void>;
25 activate(ct?: ICancellation) : Promise<void>;
26
26
27 /**
27 /**
28 * Starts the component deactivation
28 * Starts the component deactivation
29 * @param ct cancellation token for this operation
29 * @param ct cancellation token for this operation
30 */
30 */
31 deactivate(ct?: ICancellation) : Promise<void>;
31 deactivate(ct?: ICancellation) : Promise<void>;
32
32
33 /**
33 /**
34 * Sets the activation controller for this component
34 * Sets the activation controller for this component
35 * @param controller The activation controller
35 * @param controller The activation controller
36 *
36 *
37 * Activation controller checks whether this component
37 * Activation controller checks whether this component
38 * can be activated and manages the active state of the
38 * can be activated and manages the active state of the
39 * component
39 * component
40 */
40 */
41 setActivationController(controller: IActivationController);
41 setActivationController(controller: IActivationController);
42
42
43 /**
43 /**
44 * Gets the current activation controller for this component
44 * Gets the current activation controller for this component
45 */
45 */
46 getActivationController(): IActivationController;
46 getActivationController(): IActivationController;
47 }
47 }
48
48
49 export interface IActivationController {
49 export interface IActivationController {
50 activating(component: IActivatable, ct?: ICancellation): Promise<void>;
50 activating(component: IActivatable, ct?: ICancellation): Promise<void>;
51
51
52 activated(component: IActivatable, ct?: ICancellation): Promise<void>;
52 activated(component: IActivatable, ct?: ICancellation): Promise<void>;
53
53
54 deactivating(component: IActivatable, ct?: ICancellation): Promise<void>;
54 deactivating(component: IActivatable, ct?: ICancellation): Promise<void>;
55
55
56 deactivated(component: IActivatable, ct?: ICancellation): Promise<void>;
56 deactivated(component: IActivatable, ct?: ICancellation): Promise<void>;
57
57
58 deactivate(ct?: ICancellation): Promise<void>;
58 deactivate(ct?: ICancellation): Promise<void>;
59
59
60 activate(component: IActivatable, ct?: ICancellation): Promise<void>;
60 activate(component: IActivatable, ct?: ICancellation): Promise<void>;
61
61
62 getActive(): IActivatable;
62 getActive(): IActivatable;
63 }
63 }
64
64
65 export interface IAsyncComponent {
65 export interface IAsyncComponent {
66 getCompletion(): Promise<void>;
66 getCompletion(): Promise<void>;
67 }
67 }
68
68
69 export interface ICancellable {
69 export interface ICancellable {
70 cancel(reason?: any): void;
70 cancel(reason?: any): void;
71 }
71 }
72
72
73 export interface IObservable<T> {
73 export interface IObservable<T> {
74 on(next: (x:T) => void, error?: (e:any) => void, complete?:() => void): IDestroyable;
74 on(next: (x:T) => void, error?: (e:any) => void, complete?:() => void): IDestroyable;
75 next(ct?: ICancellation) : Promise<T>;
75 next(ct?: ICancellation) : Promise<T>;
76 } No newline at end of file
76 }
@@ -1,1 +1,2
1 define(["./ActivatableTests", "./trace-test", "./TraceSourceTests"]); No newline at end of file
1 define(["./ActivatableTests", "./trace-test", "./TraceSourceTests", "./CancellationTests"]);
2 //define(["./CancellationTests"]); No newline at end of file
@@ -1,39 +1,63
1 import { IObservable, ICancellation, IDestroyable } from "../../build/dist/interfaces";
1 import { IObservable, ICancellation, IDestroyable } from "../../build/dist/interfaces";
2 import * as TraceEvent from '../../build/dist/log/TraceEvent';
2 import * as TraceEvent from '../../build/dist/log/TraceEvent';
3 import { Cancellation } from "../../build/dist/Cancellation";
3 import { Cancellation } from "../../build/dist/Cancellation";
4 import * as TraceSource from "../../build/dist/log/TraceSource";
4 import * as TraceSource from "../../build/dist/log/TraceSource";
5 import * as tape from 'tape';
5 import * as tape from 'tape';
6 import { argumentNotNull } from "../../build/dist/safe";
6 import { argumentNotNull } from "../../build/dist/safe";
7
7
8 export class TapeWriter implements IDestroyable {
8 export class TapeWriter implements IDestroyable {
9 readonly _tape: tape.Test
9 readonly _tape: tape.Test
10
10
11 _subscriptions = new Array<IDestroyable>();
11 _subscriptions = new Array<IDestroyable>();
12
12
13 constructor(tape: tape.Test) {
13 constructor(tape: tape.Test) {
14 argumentNotNull(tape, "tape");
14 argumentNotNull(tape, "tape");
15 this._tape = tape;
15 this._tape = tape;
16 }
16 }
17
17
18 writeEvents(source: IObservable<TraceEvent>, ct: ICancellation = Cancellation.none) {
18 writeEvents(source: IObservable<TraceEvent>, ct: ICancellation = Cancellation.none) {
19 let subscription = source.on(this.writeEvent.bind(this));
19 let subscription = source.on(this.writeEvent.bind(this));
20 if (ct.isSupported()) {
20 if (ct.isSupported()) {
21 ct.register(subscription.destroy.bind(subscription));
21 ct.register(subscription.destroy.bind(subscription));
22 }
22 }
23 this._subscriptions.push(subscription);
23 this._subscriptions.push(subscription);
24 }
24 }
25
25
26 writeEvent(next: TraceEvent) {
26 writeEvent(next: TraceEvent) {
27 if (next.level >= TraceSource.LogLevel) {
27 if (next.level >= TraceSource.LogLevel) {
28 this._tape.comment("LOG " + next.arg);
28 this._tape.comment("LOG " + next.arg);
29 } else if(next.level >= TraceSource.WarnLevel) {
29 } else if (next.level >= TraceSource.WarnLevel) {
30 this._tape.comment("WARN " + next.arg);
30 this._tape.comment("WARN " + next.arg);
31 } else {
31 } else {
32 this._tape.comment("ERROR " + next.arg);
32 this._tape.comment("ERROR " + next.arg);
33 }
33 }
34 }
34 }
35
35
36 destroy() {
36 destroy() {
37 this._subscriptions.forEach(x => x.destroy());
37 this._subscriptions.forEach(x => x.destroy());
38 }
38 }
39 }
40
41 export async function delay(timeout: number, ct: ICancellation = Cancellation.none) {
42 let un: IDestroyable;
43
44 try {
45 await new Promise((resolve, reject) => {
46 if (ct.isRequested()) {
47 un = ct.register(reject);
48 } else {
49 let ht = setTimeout(() => {
50 resolve();
51 }, timeout);
52
53 un = ct.register(e => {
54 clearTimeout(ht);
55 reject(e);
56 });
57 }
58 });
59 } finally {
60 if(un)
61 un.destroy();
62 };
39 } No newline at end of file
63 }
@@ -1,70 +1,69
1 import * as TraceSource from '../../build/dist/log/TraceSource'
1 import * as TraceSource from '@implab/core/log/TraceSource'
2 import * as tape from 'tape';
2 import * as tape from 'tape';
3 import * as ConsoleWriter from '../../build/dist/log/writers/ConsoleWriter';
4 import { TapeWriter } from './TestTraits';
3 import { TapeWriter } from './TestTraits';
5
4
6 const sourceId = 'test/TraceSourceTests';
5 const sourceId = 'test/TraceSourceTests';
7
6
8 tape('trace message', t => {
7 tape('trace message', t => {
9 let trace = TraceSource.get(sourceId);
8 let trace = TraceSource.get(sourceId);
10
9
11 trace.level = TraceSource.DebugLevel;
10 trace.level = TraceSource.DebugLevel;
12
11
13 let h = trace.on((ev) => {
12 let h = trace.on((ev) => {
14 t.equal(ev.source, trace, "sender should be the current trace source");
13 t.equal(ev.source, trace, "sender should be the current trace source");
15 t.equal(ev.level, TraceSource.DebugLevel, "level should be debug level");
14 t.equal(ev.level, TraceSource.DebugLevel, "level should be debug level");
16 t.equal(ev.arg, "Hello, World!", "The message should be a formatted message");
15 t.equal(ev.arg, "Hello, World!", "The message should be a formatted message");
17
16
18 t.end();
17 t.end();
19 });
18 });
20
19
21 trace.debug("Hello, {0}!", "World");
20 trace.debug("Hello, {0}!", "World");
22
21
23 h.destroy();
22 h.destroy();
24 });
23 });
25
24
26 tape('trace event', t => {
25 tape('trace event', t => {
27 let trace = TraceSource.get(sourceId);
26 let trace = TraceSource.get(sourceId);
28
27
29 trace.level = TraceSource.DebugLevel;
28 trace.level = TraceSource.DebugLevel;
30
29
31 let event = {
30 let event = {
32 name: "custom event"
31 name: "custom event"
33 };
32 };
34
33
35 let h = trace.on((ev) => {
34 let h = trace.on((ev) => {
36 t.equal(ev.source, trace, "sender should be the current trace source");
35 t.equal(ev.source, trace, "sender should be the current trace source");
37 t.equal(ev.level, TraceSource.DebugLevel, "level should be debug level");
36 t.equal(ev.level, TraceSource.DebugLevel, "level should be debug level");
38 t.equal(ev.arg, event, "The message should be the specified object");
37 t.equal(ev.arg, event, "The message should be the specified object");
39
38
40 t.end();
39 t.end();
41 });
40 });
42
41
43 trace.traceEvent(TraceSource.DebugLevel, event);
42 trace.traceEvent(TraceSource.DebugLevel, event);
44
43
45 h.destroy();
44 h.destroy();
46 });
45 });
47
46
48 tape('tape comment writer', async t => {
47 tape('tape comment writer', async t => {
49 let writer = new TapeWriter(t);
48 let writer = new TapeWriter(t);
50
49
51 TraceSource.on(ts => {
50 TraceSource.on(ts => {
52 writer.writeEvents(ts);
51 writer.writeEvents(ts);
53 });
52 });
54
53
55 let trace = TraceSource.get(sourceId);
54 let trace = TraceSource.get(sourceId);
56 trace.level = TraceSource.DebugLevel;
55 trace.level = TraceSource.DebugLevel;
57
56
58 trace.log("Hello, {0}!", 'World');
57 trace.log("Hello, {0}!", 'World');
59 trace.log("Multi\n line");
58 trace.log("Multi\n line");
60 trace.warn("Look at me!");
59 trace.warn("Look at me!");
61 trace.error("DIE!");
60 trace.error("DIE!");
62
61
63 writer.destroy();
62 writer.destroy();
64
63
65 trace.log("You shouldn't see it!");
64 trace.log("You shouldn't see it!");
66
65
67 t.comment("DONE");
66 t.comment("DONE");
68
67
69 t.end();
68 t.end();
70 }); No newline at end of file
69 });
General Comments 0
You need to be logged in to leave comments. Login now