##// END OF EJS Templates
Added safe.delay...
cin -
r76:a193ba786ffc default
parent child
Show More
@@ -0,0 +1,86
1 import tape = require("tape");
2 import { delay } from "./TestTraits";
3 import { Cancellation } from "@implab/core/Cancellation";
4 import { first, isPromise } from "@implab/core/safe";
5
6 tape("await delay test", async t => {
7 // schedule delay
8 let resolved = false;
9 let res = delay(0).then(() => resolved = true);
10
11 t.false(resolved, "the delay should be async");
12
13 await res;
14 t.pass("await delay");
15
16 // create cancellation token
17 let cancel: (e?: any) => void;
18 const ct = new Cancellation(c => cancel = c);
19
20 // schedule delay
21 resolved = false;
22 res = delay(0, ct).then(() => resolved = true);
23
24 t.false(resolved, "created delay with ct");
25
26 // cancel
27 cancel();
28
29 try {
30 await res;
31 t.fail("the delay should fail when it is cancelled");
32 } catch {
33 t.pass("the delay is cancelled");
34 }
35
36 let died = false;
37
38 // try schedule delay after the cancellation is requested
39 res = delay(0, ct).then(x => true, () => died = true);
40
41 t.false(died, "The delay should be scheduled even if the cancellation is requested");
42
43 await res;
44 t.true(died, "the delay should fail when cancelled");
45
46 t.end();
47 });
48
49 tape("sequemce test", async t => {
50 const sequence = ["a", "b", "c"];
51 const empty = [];
52
53 // synchronous tests
54 t.equals(first(sequence), "a", "Should return the first element");
55
56 let v: string;
57 let e: Error;
58 first(sequence, x => v = x);
59 t.equal(v, "a", "The callback should be called for the first element");
60
61 t.throws(() => {
62 first(empty);
63 }, "Should throw when the sequence is empty");
64
65 t.throws(() => {
66 first(empty, x => v = x);
67 }, "Should throw when the sequence is empty");
68
69 first(empty, null, x => e = x);
70 t.true(e, "The errorback should be called for the empty sequence");
71
72 // async tests
73 const asyncSequence = Promise.resolve(sequence);
74 const asyncEmptySequence = Promise.resolve(empty);
75
76 const promise = first(asyncSequence);
77 t.true(isPromise(promise), "Should return promise");
78
79 v = await promise;
80 t.equal(v, "a", "Should return the first element");
81
82 v = await new Promise(resolve => first(asyncSequence, resolve));
83 t.equal(v, "a", "The callback should be called for the first element");
84
85 t.end();
86 });
@@ -65,4 +65,4
65
65
66 ```shell
66 ```shell
67 ./gradlew test pack -Pjsmodule=commonjs -Ptarget=es2017
67 ./gradlew test pack -Pjsmodule=commonjs -Ptarget=es2017
68 ``` No newline at end of file
68 ```
@@ -1,16 +1,11
1 import { ICancellation, IDestroyable } from "./interfaces";
1 import { ICancellation, IDestroyable } from "./interfaces";
2 import { argumentNotNull } from "./safe";
2 import { argumentNotNull, destroyed } from "./safe";
3
4 const destroyed = {
5 destroy() {
6 }
7 };
8
3
9 export class Cancellation implements ICancellation {
4 export class Cancellation implements ICancellation {
10 private _reason: any;
5 private _reason: any;
11 private _cbs: Array<(e) => void>;
6 private _cbs: Array<(e: any) => void>;
12
7
13 constructor(action: (cancel: (e) => void) => void) {
8 constructor(action: (cancel: (e?: any) => void) => void) {
14 argumentNotNull(action, "action");
9 argumentNotNull(action, "action");
15
10
16 action(this._cancel.bind(this));
11 action(this._cancel.bind(this));
@@ -1,22 +1,17
1 import { IObservable, IDestroyable, ICancellation } from "./interfaces";
1 import { IObservable, IDestroyable, ICancellation, IObserver } from "./interfaces";
2 import { Cancellation } from "./Cancellation";
2 import { Cancellation } from "./Cancellation";
3 import { argumentNotNull } from "./safe";
3 import { argumentNotNull, destroyed } from "./safe";
4
4
5 type Handler<T> = (x: T) => void;
5 type Handler<T> = (x: T) => void;
6
6
7 type Initializer<T> = (notify: Handler<T>, error?: (e: any) => void, complete?: () => void) => void;
7 type Initializer<T> = (notify: Handler<T>, error?: (e: any) => void, complete?: () => void) => void;
8
8
9 // TODO: think about to move this interfaces.ts and make it public
9 const noop = () => { };
10 interface IObserver<T> {
11 next(event: T): void;
12
10
13 error(e: any): void;
11 function isObserver(val: any): val is IObserver<any> {
14
12 return val && (typeof val.next === "function");
15 complete(): void;
16 }
13 }
17
14
18 const noop = () => { };
19
20 export class Observable<T> implements IObservable<T> {
15 export class Observable<T> implements IObservable<T> {
21 private _once = new Array<IObserver<T>>();
16 private _once = new Array<IObserver<T>>();
22
17
@@ -65,6 +60,35 export class Observable<T> implements IO
65 return observer;
60 return observer;
66 }
61 }
67
62
63 subscribe(next: IObserver<T> | Handler<T>, error?: Handler<any>, complete?: () => void): IDestroyable {
64 if (isObserver(next)) {
65 const me = this;
66 const subscription = {
67 destroy() {
68 me._removeObserver(next);
69 }
70 };
71 this._addObserver(next);
72 return subscription;
73 } else if (next) {
74 const observer = {
75 next,
76 error,
77 complete
78 };
79 const me = this;
80 const subscription = {
81 destroy() {
82 me._removeObserver(observer);
83 }
84 };
85 this._addObserver(observer);
86 return subscription;
87 } else {
88 return destroyed;
89 }
90 }
91
68 private _addObserver(observer: IObserver<T>) {
92 private _addObserver(observer: IObserver<T>) {
69 if (this._complete) {
93 if (this._complete) {
70 try {
94 try {
@@ -86,7 +110,7 export class Observable<T> implements IO
86 *
110 *
87 * @param ct a cancellation token
111 * @param ct a cancellation token
88 */
112 */
89 next(ct: ICancellation = Cancellation.none): Promise<T> {
113 next(ct: ICancellation = Cancellation.none) {
90 return new Promise<T>((resolve, reject) => {
114 return new Promise<T>((resolve, reject) => {
91 const observer: IObserver<T> = {
115 const observer: IObserver<T> = {
92 next: resolve,
116 next: resolve,
@@ -62,7 +62,7 export class Configuration {
62 _require: ModuleResolver;
62 _require: ModuleResolver;
63
63
64 constructor(container: Container) {
64 constructor(container: Container) {
65 argumentNotNull(container, container);
65 argumentNotNull(container, "container");
66 this._container = container;
66 this._container = container;
67 this._path = [];
67 this._path = [];
68 }
68 }
@@ -91,3 +91,11 export interface IObservable<T> {
91 on(next: (x: T) => void, error?: (e: any) => void, complete?: () => void): IDestroyable;
91 on(next: (x: T) => void, error?: (e: any) => void, complete?: () => void): IDestroyable;
92 next(ct?: ICancellation): Promise<T>;
92 next(ct?: ICancellation): Promise<T>;
93 }
93 }
94
95 export interface IObserver<T> {
96 next(event: T): void;
97
98 error(e: any): void;
99
100 complete(): void;
101 }
@@ -1,4 +1,5
1 import { ICancellable, Constructor } from "./interfaces";
1 import { ICancellable, Constructor } from "./interfaces";
2 import { Cancellation } from "./Cancellation";
2
3
3 let _nextOid = 0;
4 let _nextOid = 0;
4 const _oid = typeof Symbol === "function" ?
5 const _oid = typeof Symbol === "function" ?
@@ -246,6 +247,24 export function delegate<T, K extends ke
246 };
247 };
247 }
248 }
248
249
250 export function delay(timeMs: number, ct = Cancellation.none) {
251 return new Promise((resolve, reject) => {
252 if (ct.isRequested()) {
253 ct.register(reject);
254 } else {
255 const h = ct.register(e => {
256 clearTimeout(id);
257 reject(e);
258 // we don't nedd to unregister h, since ct is already disposed
259 });
260 const id = setTimeout(() => {
261 h.destroy();
262 resolve();
263 }, timeMs);
264 }
265 });
266 }
267
249 /**
268 /**
250 * Для каждого элемента массива вызывает указанную функцию и сохраняет
269 * Для каждого элемента массива вызывает указанную функцию и сохраняет
251 * возвращенное значение в массиве результатов.
270 * возвращенное значение в массиве результатов.
@@ -381,8 +400,8 export function firstWhere<T>(
381 export function firstWhere<T>(
400 export function firstWhere<T>(
382 sequence: ArrayLike<T> | PromiseLike<ArrayLike<T>>,
401 sequence: ArrayLike<T> | PromiseLike<ArrayLike<T>>,
383 predicate?: (x: T) => boolean,
402 predicate?: (x: T) => boolean,
384 cb?: (x: T) => void,
403 cb?: (x: T) => any,
385 err?: (x: Error) => void
404 err?: (x: Error) => any
386 ) {
405 ) {
387 if (isPromise(sequence)) {
406 if (isPromise(sequence)) {
388 return sequence.then(res => firstWhere(res, predicate, cb, err));
407 return sequence.then(res => firstWhere(res, predicate, cb, err));
@@ -394,7 +413,7 export function firstWhere<T>(
394 throw new Error("The sequence is empty");
413 throw new Error("The sequence is empty");
395 } else {
414 } else {
396 if (!predicate) {
415 if (!predicate) {
397 return cb ? cb(sequence[0]) : sequence[0];
416 return cb ? cb(sequence[0]) && void (0) : sequence[0];
398 } else {
417 } else {
399 for (let i = 0; i < sequence.length; i++) {
418 for (let i = 0; i < sequence.length; i++) {
400 const v = sequence[i];
419 const v = sequence[i];
@@ -426,3 +445,12 export function destroy(d: any) {
426 */
445 */
427 export function nowait(p: Promise<any>) {
446 export function nowait(p: Promise<any>) {
428 }
447 }
448
449 /** represents already destroyed object.
450 */
451 export const destroyed = {
452 /** Calling to this method doesn't affect anything, noop.
453 */
454 destroy() {
455 }
456 };
@@ -4,5 +4,6 define([
4 "./TraceSourceTests",
4 "./TraceSourceTests",
5 "./CancellationTests",
5 "./CancellationTests",
6 "./ObservableTests",
6 "./ObservableTests",
7 "./ContainerTests"
7 "./ContainerTests",
8 "./SafeTests"
8 ]); No newline at end of file
9 ]);
General Comments 0
You need to be logged in to leave comments. Login now