##// END OF EJS Templates
Added Cancellation.combine(...tokens) method to combine multiple cancellation...
cin -
r170:1391543e8282 v1.4.5 default
parent child
Show More
@@ -0,0 +1,41
1 import { ICancellation, IDestroyable } from "./interfaces";
2
3 export class CancellationAggregate implements ICancellation {
4 private readonly _tokens: ICancellation[];
5
6 constructor(tokens: ICancellation[]) {
7 this._tokens = tokens || [];
8 }
9
10 throwIfRequested() {
11 this._tokens.forEach(ct => ct.throwIfRequested());
12 }
13
14 isRequested() {
15 return this._tokens.some(ct => ct.isRequested());
16 }
17 isSupported() {
18 return !!this._tokens.length;
19 }
20 register(cb: (e: any) => void): IDestroyable {
21 let fired = false;
22
23 const once = (e: any) => {
24 if (!fired) {
25 fired = true;
26 destroy();
27 cb(e);
28 }
29 }
30
31 const destroy = () => subscriptions
32 .splice(0,subscriptions.length) // empty array
33 .forEach(subscription => subscription.destroy()); // cleanup
34
35 const subscriptions = this._tokens.map(ct => ct.register(once))
36
37 return {
38 destroy
39 };
40 }
41 }
@@ -1,83 +1,106
1 import { CancellationAggregate } from "./CancellationAggregate";
1 2 import { ICancellation, IDestroyable } from "./interfaces";
2 3 import { argumentNotNull, destroyed } from "./safe";
3 4
4 5 export class Cancellation implements ICancellation {
5 6 private _reason: any;
6 7 private _cbs: Array<(e: any) => void> | undefined;
7 8
8 9 constructor(action: (cancel: (e?: any) => void) => void) {
9 10 argumentNotNull(action, "action");
10 11
11 12 action(this._cancel.bind(this));
12 13 }
13 14
14 15 isSupported(): boolean {
15 16 return true;
16 17 }
17 18 throwIfRequested(): void {
18 19 if (this._reason)
19 20 throw this._reason;
20 21 }
21 22
22 23 isRequested(): boolean {
23 24 return !!this._reason;
24 25 }
25 26
26 27 register(cb: (e: any) => void): IDestroyable {
27 28 argumentNotNull(cb, "cb");
28 29
29 30 if (this._reason) {
30 31 cb(this._reason);
31 32 return destroyed;
32 33 } else {
33 34 if (!this._cbs)
34 35 this._cbs = [cb];
35 36 else
36 37 this._cbs.push(cb);
37 38
38 39 const me = this;
39 40 return {
40 41 destroy() {
41 42 me._unregister(cb);
42 43 }
43 44 };
44 45 }
45 46 }
46 47
47 48 private _unregister(cb: any) {
48 49 if (this._cbs) {
49 50 const i = this._cbs.indexOf(cb);
50 51 if (i >= 0)
51 52 this._cbs.splice(i, 1);
52 53 }
53 54 }
54 55
55 56 private _cancel(reason: any) {
56 57 if (this._reason)
57 58 return;
58 59
59 60 this._reason = (reason = reason || new Error("Operation cancelled"));
60 61
61 62 if (this._cbs) {
62 63 this._cbs.forEach(cb => cb(reason));
63 64 this._cbs = undefined;
64 65 }
65 66 }
66 67
67 68 static readonly none: ICancellation = {
68 69 isSupported(): boolean {
69 70 return false;
70 71 },
71 72
72 73 throwIfRequested(): void {
73 74 },
74 75
75 76 isRequested(): boolean {
76 77 return false;
77 78 },
78 79
79 80 register(_cb: (e: any) => void): IDestroyable {
80 81 return destroyed;
81 82 }
82 83 };
84
85 /**
86 * Combines multiple cancellation tokens to the single aggregated token.
87 *
88 * Aggregated token will be considered as signalled when some tokens are
89 * signalled. The cancellation callback can be registered with the `register`
90 * method, it will be fired once with the first signalled token, all other
91 * tokens will be ignored.
92 *
93 * The tokens which don't support cancellation are filtered out, if there are
94 * no tokens left in the list the method returns `Cancellation.none`.
95 *
96 * @param args The list of cancellation tokens to combine
97 * @returns
98 */
99 static combine(...args: ICancellation[]) {
100 const tokens = args.filter(ct => ct.isSupported());
101 return tokens.length > 1 ?
102 new CancellationAggregate(tokens) :
103 tokens.length == 1 ? tokens[0] :
104 this.none;
105 }
83 106 }
@@ -1,41 +1,35
1 1 import { Cancellation } from "../Cancellation";
2 import { IAsyncComponent, ICancellation, ICancellable, IDestroyable } from "../interfaces";
3 import { destroy } from "../safe";
2 import { IAsyncComponent, ICancellation, ICancellable } from "../interfaces";
4 3
5 4 const noop = () => void (0);
6 5
7 6 export class AsyncComponent implements IAsyncComponent, ICancellable {
8 7 _cancel: ((e: any) => void) = noop;
9 8
10 9 _completion: Promise<void> = Promise.resolve();
11 10
12 11 getCompletion() { return this._completion; }
13 12
14 13 runOperation(op: (ct: ICancellation) => any, ct: ICancellation = Cancellation.none) {
15 14 // create inner cancellation bound to the passed cancellation token
16 let h: IDestroyable;
17 15 const inner = new Cancellation(cancel => {
18
19 16 this._cancel = cancel;
20 h = ct.register(cancel);
21 17 });
22 18
23 // TODO create cancellation source here
24 19 const guard = async () => {
25 20 try {
26 await op(inner);
21 return op(Cancellation.combine(ct, inner));
27 22 } finally {
28 23 // after the operation is complete we need to cleanup the
29 24 // resources
30 destroy(h);
31 25 this._cancel = noop;
32 26 }
33 27 };
34 28
35 29 return this._completion = guard();
36 30 }
37 31
38 32 cancel(reason: any) {
39 33 this._cancel(reason);
40 34 }
41 35 }
@@ -1,88 +1,106
1 1 import { Cancellation } from "../Cancellation";
2 import { CancellationAggregate } from "../CancellationAggregate";
2 3 import { delay, notImplemented } from "../safe";
3 4 import { test } from "./TestTraits";
4 5
5 6 test("standalone cancellation", async t => {
6 7
7 8 let doCancel: (e: any) => void = notImplemented;
8 9
9 10 const ct = new Cancellation(cancel => {
10 11 doCancel = cancel;
11 12 });
12 13
13 14 let counter = 0;
14 15 const reason = "BILL";
15 16
16 17 t.true(ct.isSupported(), "Cancellation must be supported");
17 18 t.false(ct.isRequested(), "Cancellation shouldn't be requested");
18 19 ct.throwIfRequested();
19 20 t.pass("The exception shouldn't be thrown unless the cancellation is requested");
20 21
21 22 ct.register(() => counter++);
22 23 t.equals(counter, 0, "counter should be zero");
23 24
24 25 ct.register(() => counter++).destroy();
25 26
26 27 doCancel(reason);
27 28
28 29 t.true(ct.isRequested(), "Cancellation should be requested");
29 30 t.equals(counter, 1, "The registered callback should be triggered");
30 31
31 32 ct.register(() => counter++);
32 33 t.equals(counter, 2, "The callback should be triggered immediately");
33 34
34 35 let msg;
35 36 ct.register(e => msg = e);
36 37 t.equals(msg, reason, "The cancellation reason should be passed to callback");
37 38
38 39 try {
39 40 msg = null;
40 41 ct.throwIfRequested();
41 42 t.fail("The exception should be thrown");
42 43 } catch (e) {
43 44 msg = e;
44 45 }
45 46 t.equals(msg, reason, "The cancellation reason should be catched");
46 47 });
47 48
48 49 test("async cancellation", async t => {
49 50
50 51 const ct = new Cancellation(cancel => {
51 52 cancel("STOP!");
52 53 });
53 54
54 55 try {
55 56 await delay(0, ct);
56 57 t.fail("Should thow the exception");
57 58 } catch (e) {
58 59 t.equals(e, "STOP!", "Should throw the cancellation reason");
59 60 }
60 61 });
61 62
62 63 test("cancel with external event", async t => {
63 64 const ct = new Cancellation(cancel => {
64 65 setTimeout(x => cancel("STOP!"), 0);
65 66 });
66 67
67 68 try {
68 69 await delay(10000, ct);
69 70 t.fail("Should thow the exception");
70 71 } catch (e) {
71 72 t.equals(e, "STOP!", "Should throw the cancellation reason");
72 73 }
73 74 });
74 75
75 76 test("operation normal flow", async t => {
76 77
77 78 let htimeout;
78 79 const ct = new Cancellation(cancel => {
79 80 htimeout = setTimeout(() => cancel("STOP!"), 1000);
80 81 });
81 82
82 83 try {
83 84 await delay(0, ct);
84 85 t.pass("Should pass");
85 86 } finally {
86 87 clearTimeout(htimeout);
87 88 }
88 89 });
90
91 test("combine cancellations", t => {
92 const ct1 = new Cancellation(cancel => {
93 cancel("cancelled");
94 });
95
96 const ct2 = new Cancellation(() => {});
97
98 const cct1 = Cancellation.combine(Cancellation.none, ct1);
99
100 t.equals(cct1, ct1, "Cancellation.combine should filter out Cancellation.none tokens");
101
102 const cct2 = Cancellation.combine(Cancellation.none, ct1, ct2);
103
104 t.assert(cct2 instanceof CancellationAggregate, "Cancellation.combine should return CancellationAggregate");
105 t.true(cct2.isRequested(), "CancellationAggregate should return isRequested true if any of cancellations is requested");
106 });
General Comments 0
You need to be logged in to leave comments. Login now