##// END OF EJS Templates
Added Cancellation.combine(...tokens) method to combine multiple cancellation...
cin -
r170:1391543e8282 v1.4.5 default
parent child
Show More
@@ -0,0 +1,41
1 import { ICancellation, IDestroyable } from "./interfaces";
2
3 export class CancellationAggregate implements ICancellation {
4 private readonly _tokens: ICancellation[];
5
6 constructor(tokens: ICancellation[]) {
7 this._tokens = tokens || [];
8 }
9
10 throwIfRequested() {
11 this._tokens.forEach(ct => ct.throwIfRequested());
12 }
13
14 isRequested() {
15 return this._tokens.some(ct => ct.isRequested());
16 }
17 isSupported() {
18 return !!this._tokens.length;
19 }
20 register(cb: (e: any) => void): IDestroyable {
21 let fired = false;
22
23 const once = (e: any) => {
24 if (!fired) {
25 fired = true;
26 destroy();
27 cb(e);
28 }
29 }
30
31 const destroy = () => subscriptions
32 .splice(0,subscriptions.length) // empty array
33 .forEach(subscription => subscription.destroy()); // cleanup
34
35 const subscriptions = this._tokens.map(ct => ct.register(once))
36
37 return {
38 destroy
39 };
40 }
41 }
@@ -1,83 +1,106
1 import { CancellationAggregate } from "./CancellationAggregate";
1 import { ICancellation, IDestroyable } from "./interfaces";
2 import { ICancellation, IDestroyable } from "./interfaces";
2 import { argumentNotNull, destroyed } from "./safe";
3 import { argumentNotNull, destroyed } from "./safe";
3
4
4 export class Cancellation implements ICancellation {
5 export class Cancellation implements ICancellation {
5 private _reason: any;
6 private _reason: any;
6 private _cbs: Array<(e: any) => void> | undefined;
7 private _cbs: Array<(e: any) => void> | undefined;
7
8
8 constructor(action: (cancel: (e?: any) => void) => void) {
9 constructor(action: (cancel: (e?: any) => void) => void) {
9 argumentNotNull(action, "action");
10 argumentNotNull(action, "action");
10
11
11 action(this._cancel.bind(this));
12 action(this._cancel.bind(this));
12 }
13 }
13
14
14 isSupported(): boolean {
15 isSupported(): boolean {
15 return true;
16 return true;
16 }
17 }
17 throwIfRequested(): void {
18 throwIfRequested(): void {
18 if (this._reason)
19 if (this._reason)
19 throw this._reason;
20 throw this._reason;
20 }
21 }
21
22
22 isRequested(): boolean {
23 isRequested(): boolean {
23 return !!this._reason;
24 return !!this._reason;
24 }
25 }
25
26
26 register(cb: (e: any) => void): IDestroyable {
27 register(cb: (e: any) => void): IDestroyable {
27 argumentNotNull(cb, "cb");
28 argumentNotNull(cb, "cb");
28
29
29 if (this._reason) {
30 if (this._reason) {
30 cb(this._reason);
31 cb(this._reason);
31 return destroyed;
32 return destroyed;
32 } else {
33 } else {
33 if (!this._cbs)
34 if (!this._cbs)
34 this._cbs = [cb];
35 this._cbs = [cb];
35 else
36 else
36 this._cbs.push(cb);
37 this._cbs.push(cb);
37
38
38 const me = this;
39 const me = this;
39 return {
40 return {
40 destroy() {
41 destroy() {
41 me._unregister(cb);
42 me._unregister(cb);
42 }
43 }
43 };
44 };
44 }
45 }
45 }
46 }
46
47
47 private _unregister(cb: any) {
48 private _unregister(cb: any) {
48 if (this._cbs) {
49 if (this._cbs) {
49 const i = this._cbs.indexOf(cb);
50 const i = this._cbs.indexOf(cb);
50 if (i >= 0)
51 if (i >= 0)
51 this._cbs.splice(i, 1);
52 this._cbs.splice(i, 1);
52 }
53 }
53 }
54 }
54
55
55 private _cancel(reason: any) {
56 private _cancel(reason: any) {
56 if (this._reason)
57 if (this._reason)
57 return;
58 return;
58
59
59 this._reason = (reason = reason || new Error("Operation cancelled"));
60 this._reason = (reason = reason || new Error("Operation cancelled"));
60
61
61 if (this._cbs) {
62 if (this._cbs) {
62 this._cbs.forEach(cb => cb(reason));
63 this._cbs.forEach(cb => cb(reason));
63 this._cbs = undefined;
64 this._cbs = undefined;
64 }
65 }
65 }
66 }
66
67
67 static readonly none: ICancellation = {
68 static readonly none: ICancellation = {
68 isSupported(): boolean {
69 isSupported(): boolean {
69 return false;
70 return false;
70 },
71 },
71
72
72 throwIfRequested(): void {
73 throwIfRequested(): void {
73 },
74 },
74
75
75 isRequested(): boolean {
76 isRequested(): boolean {
76 return false;
77 return false;
77 },
78 },
78
79
79 register(_cb: (e: any) => void): IDestroyable {
80 register(_cb: (e: any) => void): IDestroyable {
80 return destroyed;
81 return destroyed;
81 }
82 }
82 };
83 };
84
85 /**
86 * Combines multiple cancellation tokens to the single aggregated token.
87 *
88 * Aggregated token will be considered as signalled when some tokens are
89 * signalled. The cancellation callback can be registered with the `register`
90 * method, it will be fired once with the first signalled token, all other
91 * tokens will be ignored.
92 *
93 * The tokens which don't support cancellation are filtered out, if there are
94 * no tokens left in the list the method returns `Cancellation.none`.
95 *
96 * @param args The list of cancellation tokens to combine
97 * @returns
98 */
99 static combine(...args: ICancellation[]) {
100 const tokens = args.filter(ct => ct.isSupported());
101 return tokens.length > 1 ?
102 new CancellationAggregate(tokens) :
103 tokens.length == 1 ? tokens[0] :
104 this.none;
105 }
83 }
106 }
@@ -1,41 +1,35
1 import { Cancellation } from "../Cancellation";
1 import { Cancellation } from "../Cancellation";
2 import { IAsyncComponent, ICancellation, ICancellable, IDestroyable } from "../interfaces";
2 import { IAsyncComponent, ICancellation, ICancellable } from "../interfaces";
3 import { destroy } from "../safe";
4
3
5 const noop = () => void (0);
4 const noop = () => void (0);
6
5
7 export class AsyncComponent implements IAsyncComponent, ICancellable {
6 export class AsyncComponent implements IAsyncComponent, ICancellable {
8 _cancel: ((e: any) => void) = noop;
7 _cancel: ((e: any) => void) = noop;
9
8
10 _completion: Promise<void> = Promise.resolve();
9 _completion: Promise<void> = Promise.resolve();
11
10
12 getCompletion() { return this._completion; }
11 getCompletion() { return this._completion; }
13
12
14 runOperation(op: (ct: ICancellation) => any, ct: ICancellation = Cancellation.none) {
13 runOperation(op: (ct: ICancellation) => any, ct: ICancellation = Cancellation.none) {
15 // create inner cancellation bound to the passed cancellation token
14 // create inner cancellation bound to the passed cancellation token
16 let h: IDestroyable;
17 const inner = new Cancellation(cancel => {
15 const inner = new Cancellation(cancel => {
18
19 this._cancel = cancel;
16 this._cancel = cancel;
20 h = ct.register(cancel);
21 });
17 });
22
18
23 // TODO create cancellation source here
24 const guard = async () => {
19 const guard = async () => {
25 try {
20 try {
26 await op(inner);
21 return op(Cancellation.combine(ct, inner));
27 } finally {
22 } finally {
28 // after the operation is complete we need to cleanup the
23 // after the operation is complete we need to cleanup the
29 // resources
24 // resources
30 destroy(h);
31 this._cancel = noop;
25 this._cancel = noop;
32 }
26 }
33 };
27 };
34
28
35 return this._completion = guard();
29 return this._completion = guard();
36 }
30 }
37
31
38 cancel(reason: any) {
32 cancel(reason: any) {
39 this._cancel(reason);
33 this._cancel(reason);
40 }
34 }
41 }
35 }
@@ -1,88 +1,106
1 import { Cancellation } from "../Cancellation";
1 import { Cancellation } from "../Cancellation";
2 import { CancellationAggregate } from "../CancellationAggregate";
2 import { delay, notImplemented } from "../safe";
3 import { delay, notImplemented } from "../safe";
3 import { test } from "./TestTraits";
4 import { test } from "./TestTraits";
4
5
5 test("standalone cancellation", async t => {
6 test("standalone cancellation", async t => {
6
7
7 let doCancel: (e: any) => void = notImplemented;
8 let doCancel: (e: any) => void = notImplemented;
8
9
9 const ct = new Cancellation(cancel => {
10 const ct = new Cancellation(cancel => {
10 doCancel = cancel;
11 doCancel = cancel;
11 });
12 });
12
13
13 let counter = 0;
14 let counter = 0;
14 const reason = "BILL";
15 const reason = "BILL";
15
16
16 t.true(ct.isSupported(), "Cancellation must be supported");
17 t.true(ct.isSupported(), "Cancellation must be supported");
17 t.false(ct.isRequested(), "Cancellation shouldn't be requested");
18 t.false(ct.isRequested(), "Cancellation shouldn't be requested");
18 ct.throwIfRequested();
19 ct.throwIfRequested();
19 t.pass("The exception shouldn't be thrown unless the cancellation is requested");
20 t.pass("The exception shouldn't be thrown unless the cancellation is requested");
20
21
21 ct.register(() => counter++);
22 ct.register(() => counter++);
22 t.equals(counter, 0, "counter should be zero");
23 t.equals(counter, 0, "counter should be zero");
23
24
24 ct.register(() => counter++).destroy();
25 ct.register(() => counter++).destroy();
25
26
26 doCancel(reason);
27 doCancel(reason);
27
28
28 t.true(ct.isRequested(), "Cancellation should be requested");
29 t.true(ct.isRequested(), "Cancellation should be requested");
29 t.equals(counter, 1, "The registered callback should be triggered");
30 t.equals(counter, 1, "The registered callback should be triggered");
30
31
31 ct.register(() => counter++);
32 ct.register(() => counter++);
32 t.equals(counter, 2, "The callback should be triggered immediately");
33 t.equals(counter, 2, "The callback should be triggered immediately");
33
34
34 let msg;
35 let msg;
35 ct.register(e => msg = e);
36 ct.register(e => msg = e);
36 t.equals(msg, reason, "The cancellation reason should be passed to callback");
37 t.equals(msg, reason, "The cancellation reason should be passed to callback");
37
38
38 try {
39 try {
39 msg = null;
40 msg = null;
40 ct.throwIfRequested();
41 ct.throwIfRequested();
41 t.fail("The exception should be thrown");
42 t.fail("The exception should be thrown");
42 } catch (e) {
43 } catch (e) {
43 msg = e;
44 msg = e;
44 }
45 }
45 t.equals(msg, reason, "The cancellation reason should be catched");
46 t.equals(msg, reason, "The cancellation reason should be catched");
46 });
47 });
47
48
48 test("async cancellation", async t => {
49 test("async cancellation", async t => {
49
50
50 const ct = new Cancellation(cancel => {
51 const ct = new Cancellation(cancel => {
51 cancel("STOP!");
52 cancel("STOP!");
52 });
53 });
53
54
54 try {
55 try {
55 await delay(0, ct);
56 await delay(0, ct);
56 t.fail("Should thow the exception");
57 t.fail("Should thow the exception");
57 } catch (e) {
58 } catch (e) {
58 t.equals(e, "STOP!", "Should throw the cancellation reason");
59 t.equals(e, "STOP!", "Should throw the cancellation reason");
59 }
60 }
60 });
61 });
61
62
62 test("cancel with external event", async t => {
63 test("cancel with external event", async t => {
63 const ct = new Cancellation(cancel => {
64 const ct = new Cancellation(cancel => {
64 setTimeout(x => cancel("STOP!"), 0);
65 setTimeout(x => cancel("STOP!"), 0);
65 });
66 });
66
67
67 try {
68 try {
68 await delay(10000, ct);
69 await delay(10000, ct);
69 t.fail("Should thow the exception");
70 t.fail("Should thow the exception");
70 } catch (e) {
71 } catch (e) {
71 t.equals(e, "STOP!", "Should throw the cancellation reason");
72 t.equals(e, "STOP!", "Should throw the cancellation reason");
72 }
73 }
73 });
74 });
74
75
75 test("operation normal flow", async t => {
76 test("operation normal flow", async t => {
76
77
77 let htimeout;
78 let htimeout;
78 const ct = new Cancellation(cancel => {
79 const ct = new Cancellation(cancel => {
79 htimeout = setTimeout(() => cancel("STOP!"), 1000);
80 htimeout = setTimeout(() => cancel("STOP!"), 1000);
80 });
81 });
81
82
82 try {
83 try {
83 await delay(0, ct);
84 await delay(0, ct);
84 t.pass("Should pass");
85 t.pass("Should pass");
85 } finally {
86 } finally {
86 clearTimeout(htimeout);
87 clearTimeout(htimeout);
87 }
88 }
88 });
89 });
90
91 test("combine cancellations", t => {
92 const ct1 = new Cancellation(cancel => {
93 cancel("cancelled");
94 });
95
96 const ct2 = new Cancellation(() => {});
97
98 const cct1 = Cancellation.combine(Cancellation.none, ct1);
99
100 t.equals(cct1, ct1, "Cancellation.combine should filter out Cancellation.none tokens");
101
102 const cct2 = Cancellation.combine(Cancellation.none, ct1, ct2);
103
104 t.assert(cct2 instanceof CancellationAggregate, "Cancellation.combine should return CancellationAggregate");
105 t.true(cct2.isRequested(), "CancellationAggregate should return isRequested true if any of cancellations is requested");
106 });
General Comments 0
You need to be logged in to leave comments. Login now