##// END OF EJS Templates
Implemented subscription SubscriptionImpl, fixed subscription resource management
cin -
r158:078eca3dc271 v1.10.3 default
parent child
Show More
@@ -0,0 +1,58
1 import { ICancellation } from "@implab/core-amd/interfaces";
2
3 export interface Subscription {
4 finalize(): boolean;
5
6 isClosed(): boolean;
7
8 teardown(cleanup: void | (() => void)): void;
9
10 around<T>(next: (item: T) => void): (item: T) => void;
11
12 unsubscribe(): void;
13 }
14
15 const noop = () => {};
16
17
18 export class SubscriptionImpl implements Subscription {
19
20 private _done = false;
21
22 private _cleanup = noop;
23
24 finalize() {
25 return this._done ? false : ((0,this._cleanup)(), this._done = true);
26 }
27
28 isClosed() {
29 return this._done;
30 }
31
32 teardown(cleanup: void | (() => void)) {
33 if (cleanup) {
34 if (this._done) {
35 cleanup();
36 } else {
37 const _prev = this._cleanup;
38 this._cleanup = _prev === noop ? cleanup : () => (_prev(), cleanup());
39 }
40 }
41 }
42
43 cancellable(ct: ICancellation, reject: (err: unknown) => void) {
44 if (ct.isSupported()) {
45 const h = ct.register(reason => this.finalize() && reject(reason));
46 this.teardown(() => h.destroy());
47 }
48 }
49
50 unsubscribe() {
51 this.finalize();
52 }
53
54 around<T>(next: (item: T) => void): (item: T) => void {
55 return (item) => this._done ? void(0) : next(item);
56 }
57
58 } No newline at end of file
@@ -1,6 +1,6
1 1 import { isPromise } from "@implab/core-amd/safe";
2 2 import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces";
3 import { ObservableImpl } from "./observable/ObservableImpl";
3 import { ObservableImpl, fuse } from "./observable/ObservableImpl";
4 4
5 5 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
6 6 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
@@ -8,7 +8,7 export const isUnsubscribable = (v: unkn
8 8 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
9 9 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
10 10
11 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(producer);
11 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer));
12 12
13 13 /** Converts an array to the observable sequence of its elements. */
14 14 export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>(
@@ -3,6 +3,7 import { AccumulatorFn, Observable, Obse
3 3 import { TraceSource } from "@implab/core-amd/log/TraceSource";
4 4 import { Predicate } from "@implab/core-amd/interfaces";
5 5 import { Cancellation } from "@implab/core-amd/Cancellation";
6 import { Subscription, SubscriptionImpl } from "./SubscriptionImpl";
6 7
7 8 const trace = TraceSource.get(mid);
8 9
@@ -10,54 +11,23 const noop = () => { };
10 11
11 12 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
12 13
13 const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => {
14 let done = false;
15 let cleanup = noop;
16
17 const finalize = () => done ? false : (cleanup(), done = true);
18 const isClosed = () => done;
19
20 const teardown = (cb: void | (() => void)) => {
21 if (cb) {
22 if (done) {
23 cb();
24 } else {
25 const _prev = cleanup;
26 cleanup = _prev === noop ? cb : () => (_prev(), cb());
27 }
28 }
29 };
30
31 const error = (err: unknown) => finalize() && reject(err);
32
33 if (ct.isSupported()) {
34 const h = ct.register(error);
35 teardown(() => h.destroy());
36 }
37
38 return { finalize, isClosed, teardown, error };
39 };
40 14
41 15 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
42 16
43 export const fuse = <T>(producer: Producer<T>) => (consumer: FusedSink<T>) => {
44
45 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
46 const { teardown, isClosed, finalize, error } = subscription(reject);
17 export const fuse = <T>(producer: Producer<T>) =>
18 (sink: FusedSink<T>, subscription: Subscription): Unsubscribable => {
47 19
48 const next = consumer.next ? consumer.next.bind(consumer) : noop;
49
50 teardown(producer({
51 next: next !== noop ? (v => isClosed() || next(v)) : noop,
52 error,
53 complete: () => finalize() && consumer.complete && consumer.complete(),
54 isClosed
20 subscription.teardown(producer({
21 next: subscription.around(sink.next),
22 complete: () => subscription.finalize() && sink.complete(),
23 error: err => subscription.finalize() && sink.error(err),
24 isClosed: subscription.isClosed.bind(subscription)
55 25 }));
56 26
57 return { unsubscribe: finalize };
27 return subscription;
58 28 };
59 29
60 type FusedProducer<T> = (consumer: FusedSink<T>) => void | (() => void);
30 type FusedProducer<T> = (sink: FusedSink<T>, subscription: Subscription) => void;
61 31
62 32 export class ObservableImpl<T> implements Observable<T> {
63 33
@@ -67,64 +37,68 export class ObservableImpl<T> implement
67 37 this._producer = producer;
68 38 }
69 39
70 subscribe(consumer: Observer<T> = {}) {
71 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
40 subscribe(consumer: Observer<T> = {}): Unsubscribable {
41 const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
72 42 const next = consumer.next ? consumer.next.bind(consumer) : noop;
73
74 const { teardown, isClosed, finalize, error } = subscription(reject);
43 const complete = () => consumer.complete ? consumer.complete() : void (0);
75 44
76 teardown((0, this._producer)({
77 next: next !== noop ? (v => isClosed() || next(v)) : noop,
78 error,
79 complete: () => finalize() && consumer.complete && consumer.complete()
80 }));
45 const subscription = new SubscriptionImpl();
81 46
82 return { unsubscribe: finalize };
47 this._producer({ next, error, complete }, subscription);
48
49 return subscription;
83 50 }
84 51
85 52 map<T2>(mapper: (value: T) => T2) {
86 return new ObservableImpl<T2>(({ next, ...rest }) =>
87 this._producer({
53 return new ObservableImpl<T2>(({ next, ...rest }, subscription) =>
54 (0, this._producer)(
55 {
88 56 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
89 57 ...rest
90 }));
58 }, subscription
59 ));
91 60 }
92 61
93 62 tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
94 return new ObservableImpl<T>(({ next, complete, error }) =>
95 this._producer({
63 return new ObservableImpl<T>(({ next, complete, error }, subscription) =>
64 (0, this._producer)(
65 {
96 66 next: tapNext ? (v => (tapNext(v), next(v))) : next,
97 67 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
98 68 error: tapError ? (e => (tapError(e), error(e))) : error
99 }));
69 },
70 subscription
71 ));
100 72 }
101 73
102 74 filter(predicate: Predicate<T>) {
103 return new ObservableImpl<T>(({ next, ...rest }) =>
104 this._producer({
75 return new ObservableImpl<T>(({ next, ...rest }, subscription) =>
76 (0, this._producer)(
77 {
105 78 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
106 79 ...rest
107 })
108 );
80 },
81 subscription
82 ));
109 83 }
110 84
111 85 until(predicate: Predicate<T>) {
112 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
113 this._producer({
114 next: v => predicate(v) ? complete() : next(v),
86 return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
87 (0, this._producer)({
88 next: v => predicate(v) ? subscription.finalize() && complete() : next(v),
115 89 complete,
116 90 ...rest
117 })
91 }, subscription)
118 92 );
119 93 }
120 94
121 95 while(predicate: Predicate<T>) {
122 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
123 this._producer({
124 next: v => predicate(v) ? next(v) : complete(),
96 return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
97 (0, this._producer)({
98 next: v => predicate(v) ? next(v) : subscription.finalize() && complete(),
125 99 complete,
126 100 ...rest
127 })
101 }, subscription)
128 102 );
129 103 }
130 104
@@ -132,23 +106,26 export class ObservableImpl<T> implement
132 106 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
133 107 scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
134 108 if (args.length === 1) {
135 return new ObservableImpl<T | A>(({ next, ...rest }) => {
136 109 const [accumulator] = args;
110
111 return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
137 112 let _acc: T;
138 113 let index = 0;
139 return this._producer({
140 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
114 (0, this._producer)({
115 next: next !== noop ? (v: T) => next(
116 index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)
117 ) : noop,
141 118 ...rest
142 });
119 }, subscription);
143 120 });
144 121 } else {
145 return new ObservableImpl<T | A>(({ next, ...rest }) => {
146 122 const [accumulator, initial] = args;
123 return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
147 124 let _acc = initial;
148 return this._producer({
125 (0, this._producer)({
149 126 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
150 127 ...rest
151 });
128 }, subscription);
152 129 });
153 130 }
154 131 }
@@ -158,11 +135,11 export class ObservableImpl<T> implement
158 135 reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
159 136
160 137 if (args.length === 1) {
161 return new ObservableImpl<T | A>(({ next, complete, error }) => {
162 138 const [accumulator] = args;
139 return new ObservableImpl<T>(({ next, complete, error }, subscription) => {
163 140 let _acc: T;
164 141 let index = 0;
165 return this._producer({
142 (0, this._producer)({
166 143 next: next !== noop ? (v: T) => {
167 144 _acc = index++ === 0 ? v : accumulator(_acc, v);
168 145 } : noop,
@@ -175,13 +152,13 export class ObservableImpl<T> implement
175 152 }
176 153 },
177 154 error
178 });
155 }, subscription);
179 156 });
180 157 } else {
181 return new ObservableImpl<T | A>(({ next, complete, error }) => {
182 158 const [accumulator, initial] = args;
159 return new ObservableImpl<A>(({ next, complete, error }, subscription) => {
183 160 let _acc = initial;
184 return this._producer({
161 (0, this._producer)({
185 162 next: next !== noop ? (v: T) => {
186 163 _acc = accumulator(_acc, v);
187 164 } : noop,
@@ -190,24 +167,23 export class ObservableImpl<T> implement
190 167 complete();
191 168 },
192 169 error
193 });
170 }, subscription);
194 171 });
195 172 }
196 173 }
197 174
198 175 cat(...seq: Subscribable<T>[]) {
199 return new ObservableImpl<T>(({ next, complete: final, ...rest }) => {
200 let cleanup: () => void;
176 return new ObservableImpl<T>(({ next, complete: final, ...rest }, subscription) => {
201 177 const len = seq.length;
202 178 const complete = (i: number) => i < len ?
203 179 () => {
204 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
205 cleanup = subscription.unsubscribe.bind(subscription);
206 } : final;
180 h = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
181 } : () => subscription.finalize() && final();
182
207 183
208 cleanup = this._producer({ next, complete: complete(0), ...rest }) ?? noop;
184 let h = this.subscribe({ next, complete: complete(0), ...rest });
185 subscription.teardown(() => h.unsubscribe());
209 186
210 return () => cleanup();
211 187 });
212 188 }
213 189
@@ -219,13 +195,14 export class ObservableImpl<T> implement
219 195 return new Promise<T>((resolve, reject) => {
220 196 ct.throwIfRequested();
221 197
222 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
223 teardown((0, this._producer)({
224 next: (item: T) => finalize() && resolve(item),
225 complete: () => finalize() && reject(new Error("The sequence is empty")),
226 error,
227 isClosed
228 }));
198 const subscription = new SubscriptionImpl();
199 subscription.cancellable(ct, reject);
200
201 (0, this._producer)({
202 next: resolve,
203 complete: () => reject(new Error("The sequence is empty")),
204 error: reject,
205 }, subscription);
229 206
230 207 });
231 208 }
@@ -234,15 +211,16 export class ObservableImpl<T> implement
234 211 return new Promise<T[]>((resolve, reject) => {
235 212 ct.throwIfRequested();
236 213
214 const subscription = new SubscriptionImpl();
215 subscription.cancellable(ct, reject);
216
237 217 const acc: T[] = [];
238 218
239 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
240 teardown((0, this._producer)({
241 next: (item: T) => isClosed() ? void (0) : acc.push(item),
242 complete: () => finalize() && resolve(acc),
243 error,
244 isClosed
245 }));
219 (0, this._producer)({
220 next: (item: T) => acc.push(item),
221 complete: () => resolve(acc),
222 error: reject
223 }, subscription);
246 224 });
247 225 }
248 226
@@ -250,14 +228,14 export class ObservableImpl<T> implement
250 228 return new Promise<void>((resolve, reject) => {
251 229 ct.throwIfRequested();
252 230
253 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
231 const subscription = new SubscriptionImpl();
232 subscription.cancellable(ct, reject);
254 233
255 teardown((0, this._producer)({
256 next: item => !isClosed() && next(item),
257 complete: () => finalize() && resolve(),
258 error,
259 isClosed
260 }));
234 (0, this._producer)({
235 next,
236 complete: resolve,
237 error: reject
238 }, subscription);
261 239 });
262 240 }
263 241
General Comments 0
You need to be logged in to leave comments. Login now