##// END OF EJS Templates
Implemented subscription SubscriptionImpl, fixed subscription resource management
cin -
r158:078eca3dc271 v1.10.3 default
parent child
Show More
@@ -0,0 +1,58
1 import { ICancellation } from "@implab/core-amd/interfaces";
2
3 export interface Subscription {
4 finalize(): boolean;
5
6 isClosed(): boolean;
7
8 teardown(cleanup: void | (() => void)): void;
9
10 around<T>(next: (item: T) => void): (item: T) => void;
11
12 unsubscribe(): void;
13 }
14
15 const noop = () => {};
16
17
18 export class SubscriptionImpl implements Subscription {
19
20 private _done = false;
21
22 private _cleanup = noop;
23
24 finalize() {
25 return this._done ? false : ((0,this._cleanup)(), this._done = true);
26 }
27
28 isClosed() {
29 return this._done;
30 }
31
32 teardown(cleanup: void | (() => void)) {
33 if (cleanup) {
34 if (this._done) {
35 cleanup();
36 } else {
37 const _prev = this._cleanup;
38 this._cleanup = _prev === noop ? cleanup : () => (_prev(), cleanup());
39 }
40 }
41 }
42
43 cancellable(ct: ICancellation, reject: (err: unknown) => void) {
44 if (ct.isSupported()) {
45 const h = ct.register(reason => this.finalize() && reject(reason));
46 this.teardown(() => h.destroy());
47 }
48 }
49
50 unsubscribe() {
51 this.finalize();
52 }
53
54 around<T>(next: (item: T) => void): (item: T) => void {
55 return (item) => this._done ? void(0) : next(item);
56 }
57
58 } No newline at end of file
@@ -1,69 +1,69
1 import { isPromise } from "@implab/core-amd/safe";
1 import { isPromise } from "@implab/core-amd/safe";
2 import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces";
2 import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces";
3 import { ObservableImpl } from "./observable/ObservableImpl";
3 import { ObservableImpl, fuse } from "./observable/ObservableImpl";
4
4
5 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
5 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
6 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
6 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
7
7
8 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
8 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
9 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
9 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
10
10
11 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(producer);
11 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer));
12
12
13 /** Converts an array to the observable sequence of its elements. */
13 /** Converts an array to the observable sequence of its elements. */
14 export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>(
14 export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>(
15 ({ next, complete }) => (
15 ({ next, complete }) => (
16 items.forEach(next),
16 items.forEach(next),
17 complete()
17 complete()
18 )
18 )
19 );
19 );
20
20
21 /** Converts a subscribable to the observable */
21 /** Converts a subscribable to the observable */
22 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
22 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
23 observe<T>(sink => {
23 observe<T>(sink => {
24 const subscription = subscribable.subscribe(sink);
24 const subscription = subscribable.subscribe(sink);
25 return () => subscription.unsubscribe();
25 return () => subscription.unsubscribe();
26 });
26 });
27
27
28 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
28 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
29 ({ next, error, complete }) =>
29 ({ next, error, complete }) =>
30 isPromise(item) ?
30 isPromise(item) ?
31 void item.then(
31 void item.then(
32 v => (next(v), complete()),
32 v => (next(v), complete()),
33 error
33 error
34 ) :
34 ) :
35 (next(item), complete())
35 (next(item), complete())
36 );
36 );
37
37
38 /** Converts a list of parameter values to the observable sequence. The
38 /** Converts a list of parameter values to the observable sequence. The
39 * order of elements in the list will be preserved in the resulting sequence.
39 * order of elements in the list will be preserved in the resulting sequence.
40 */
40 */
41 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
41 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
42 of1(items[0]) :
42 of1(items[0]) :
43 observe<T>(
43 observe<T>(
44 ({ next, error, complete, isClosed }) => {
44 ({ next, error, complete, isClosed }) => {
45 const n = items.length;
45 const n = items.length;
46
46
47 const _next = (start: number) => {
47 const _next = (start: number) => {
48 if (start > 0 && isClosed()) // when resumed
48 if (start > 0 && isClosed()) // when resumed
49 return;
49 return;
50
50
51 for (let i = start; i < n; i++) {
51 for (let i = start; i < n; i++) {
52 const r = items[i];
52 const r = items[i];
53 if (isPromise(r)) {
53 if (isPromise(r)) {
54 r.then(v => (next(v), _next(i + 1)), error);
54 r.then(v => (next(v), _next(i + 1)), error);
55 return; // suspend
55 return; // suspend
56 } else {
56 } else {
57 next(r);
57 next(r);
58 }
58 }
59 }
59 }
60 complete();
60 complete();
61 };
61 };
62
62
63 _next(0);
63 _next(0);
64 }
64 }
65 );
65 );
66
66
67 export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete());
67 export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete());
68
68
69 export type * from "./observable/interfaces"; No newline at end of file
69 export type * from "./observable/interfaces";
@@ -1,264 +1,242
1 import { id as mid } from "module";
1 import { id as mid } from "module";
2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces";
2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces";
3 import { TraceSource } from "@implab/core-amd/log/TraceSource";
3 import { TraceSource } from "@implab/core-amd/log/TraceSource";
4 import { Predicate } from "@implab/core-amd/interfaces";
4 import { Predicate } from "@implab/core-amd/interfaces";
5 import { Cancellation } from "@implab/core-amd/Cancellation";
5 import { Cancellation } from "@implab/core-amd/Cancellation";
6 import { Subscription, SubscriptionImpl } from "./SubscriptionImpl";
6
7
7 const trace = TraceSource.get(mid);
8 const trace = TraceSource.get(mid);
8
9
9 const noop = () => { };
10 const noop = () => { };
10
11
11 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
12 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
12
13
13 const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => {
14 let done = false;
15 let cleanup = noop;
16
17 const finalize = () => done ? false : (cleanup(), done = true);
18 const isClosed = () => done;
19
20 const teardown = (cb: void | (() => void)) => {
21 if (cb) {
22 if (done) {
23 cb();
24 } else {
25 const _prev = cleanup;
26 cleanup = _prev === noop ? cb : () => (_prev(), cb());
27 }
28 }
29 };
30
31 const error = (err: unknown) => finalize() && reject(err);
32
33 if (ct.isSupported()) {
34 const h = ct.register(error);
35 teardown(() => h.destroy());
36 }
37
38 return { finalize, isClosed, teardown, error };
39 };
40
14
41 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
15 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
42
16
43 export const fuse = <T>(producer: Producer<T>) => (consumer: FusedSink<T>) => {
17 export const fuse = <T>(producer: Producer<T>) =>
44
18 (sink: FusedSink<T>, subscription: Subscription): Unsubscribable => {
45 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
46 const { teardown, isClosed, finalize, error } = subscription(reject);
47
48 const next = consumer.next ? consumer.next.bind(consumer) : noop;
49
19
50 teardown(producer({
20 subscription.teardown(producer({
51 next: next !== noop ? (v => isClosed() || next(v)) : noop,
21 next: subscription.around(sink.next),
52 error,
22 complete: () => subscription.finalize() && sink.complete(),
53 complete: () => finalize() && consumer.complete && consumer.complete(),
23 error: err => subscription.finalize() && sink.error(err),
54 isClosed
24 isClosed: subscription.isClosed.bind(subscription)
55 }));
25 }));
56
26
57 return { unsubscribe: finalize };
27 return subscription;
58 };
28 };
59
29
60 type FusedProducer<T> = (consumer: FusedSink<T>) => void | (() => void);
30 type FusedProducer<T> = (sink: FusedSink<T>, subscription: Subscription) => void;
61
31
62 export class ObservableImpl<T> implements Observable<T> {
32 export class ObservableImpl<T> implements Observable<T> {
63
33
64 private readonly _producer: FusedProducer<T>;
34 private readonly _producer: FusedProducer<T>;
65
35
66 constructor(producer: FusedProducer<T>) {
36 constructor(producer: FusedProducer<T>) {
67 this._producer = producer;
37 this._producer = producer;
68 }
38 }
69
39
70 subscribe(consumer: Observer<T> = {}) {
40 subscribe(consumer: Observer<T> = {}): Unsubscribable {
71 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
41 const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
72 const next = consumer.next ? consumer.next.bind(consumer) : noop;
42 const next = consumer.next ? consumer.next.bind(consumer) : noop;
73
43 const complete = () => consumer.complete ? consumer.complete() : void (0);
74 const { teardown, isClosed, finalize, error } = subscription(reject);
75
44
76 teardown((0, this._producer)({
45 const subscription = new SubscriptionImpl();
77 next: next !== noop ? (v => isClosed() || next(v)) : noop,
78 error,
79 complete: () => finalize() && consumer.complete && consumer.complete()
80 }));
81
46
82 return { unsubscribe: finalize };
47 this._producer({ next, error, complete }, subscription);
48
49 return subscription;
83 }
50 }
84
51
85 map<T2>(mapper: (value: T) => T2) {
52 map<T2>(mapper: (value: T) => T2) {
86 return new ObservableImpl<T2>(({ next, ...rest }) =>
53 return new ObservableImpl<T2>(({ next, ...rest }, subscription) =>
87 this._producer({
54 (0, this._producer)(
88 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
55 {
89 ...rest
56 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
90 }));
57 ...rest
58 }, subscription
59 ));
91 }
60 }
92
61
93 tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
62 tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
94 return new ObservableImpl<T>(({ next, complete, error }) =>
63 return new ObservableImpl<T>(({ next, complete, error }, subscription) =>
95 this._producer({
64 (0, this._producer)(
96 next: tapNext ? (v => (tapNext(v), next(v))) : next,
65 {
97 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
66 next: tapNext ? (v => (tapNext(v), next(v))) : next,
98 error: tapError ? (e => (tapError(e), error(e))) : error
67 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
99 }));
68 error: tapError ? (e => (tapError(e), error(e))) : error
69 },
70 subscription
71 ));
100 }
72 }
101
73
102 filter(predicate: Predicate<T>) {
74 filter(predicate: Predicate<T>) {
103 return new ObservableImpl<T>(({ next, ...rest }) =>
75 return new ObservableImpl<T>(({ next, ...rest }, subscription) =>
104 this._producer({
76 (0, this._producer)(
105 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
77 {
106 ...rest
78 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
107 })
79 ...rest
108 );
80 },
81 subscription
82 ));
109 }
83 }
110
84
111 until(predicate: Predicate<T>) {
85 until(predicate: Predicate<T>) {
112 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
86 return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
113 this._producer({
87 (0, this._producer)({
114 next: v => predicate(v) ? complete() : next(v),
88 next: v => predicate(v) ? subscription.finalize() && complete() : next(v),
115 complete,
89 complete,
116 ...rest
90 ...rest
117 })
91 }, subscription)
118 );
92 );
119 }
93 }
120
94
121 while(predicate: Predicate<T>) {
95 while(predicate: Predicate<T>) {
122 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
96 return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
123 this._producer({
97 (0, this._producer)({
124 next: v => predicate(v) ? next(v) : complete(),
98 next: v => predicate(v) ? next(v) : subscription.finalize() && complete(),
125 complete,
99 complete,
126 ...rest
100 ...rest
127 })
101 }, subscription)
128 );
102 );
129 }
103 }
130
104
131 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
105 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
132 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
106 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
133 scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
107 scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
134 if (args.length === 1) {
108 if (args.length === 1) {
135 return new ObservableImpl<T | A>(({ next, ...rest }) => {
109 const [accumulator] = args;
136 const [accumulator] = args;
110
111 return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
137 let _acc: T;
112 let _acc: T;
138 let index = 0;
113 let index = 0;
139 return this._producer({
114 (0, this._producer)({
140 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
115 next: next !== noop ? (v: T) => next(
116 index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)
117 ) : noop,
141 ...rest
118 ...rest
142 });
119 }, subscription);
143 });
120 });
144 } else {
121 } else {
145 return new ObservableImpl<T | A>(({ next, ...rest }) => {
122 const [accumulator, initial] = args;
146 const [accumulator, initial] = args;
123 return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
147 let _acc = initial;
124 let _acc = initial;
148 return this._producer({
125 (0, this._producer)({
149 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
126 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
150 ...rest
127 ...rest
151 });
128 }, subscription);
152 });
129 });
153 }
130 }
154 }
131 }
155
132
156 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
133 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
157 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
134 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
158 reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
135 reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
159
136
160 if (args.length === 1) {
137 if (args.length === 1) {
161 return new ObservableImpl<T | A>(({ next, complete, error }) => {
138 const [accumulator] = args;
162 const [accumulator] = args;
139 return new ObservableImpl<T>(({ next, complete, error }, subscription) => {
163 let _acc: T;
140 let _acc: T;
164 let index = 0;
141 let index = 0;
165 return this._producer({
142 (0, this._producer)({
166 next: next !== noop ? (v: T) => {
143 next: next !== noop ? (v: T) => {
167 _acc = index++ === 0 ? v : accumulator(_acc, v);
144 _acc = index++ === 0 ? v : accumulator(_acc, v);
168 } : noop,
145 } : noop,
169 complete: () => {
146 complete: () => {
170 if (index === 0) {
147 if (index === 0) {
171 error(new Error("The sequence can't be empty"));
148 error(new Error("The sequence can't be empty"));
172 } else {
149 } else {
173 next(_acc);
150 next(_acc);
174 complete();
151 complete();
175 }
152 }
176 },
153 },
177 error
154 error
178 });
155 }, subscription);
179 });
156 });
180 } else {
157 } else {
181 return new ObservableImpl<T | A>(({ next, complete, error }) => {
158 const [accumulator, initial] = args;
182 const [accumulator, initial] = args;
159 return new ObservableImpl<A>(({ next, complete, error }, subscription) => {
183 let _acc = initial;
160 let _acc = initial;
184 return this._producer({
161 (0, this._producer)({
185 next: next !== noop ? (v: T) => {
162 next: next !== noop ? (v: T) => {
186 _acc = accumulator(_acc, v);
163 _acc = accumulator(_acc, v);
187 } : noop,
164 } : noop,
188 complete: () => {
165 complete: () => {
189 next(_acc);
166 next(_acc);
190 complete();
167 complete();
191 },
168 },
192 error
169 error
193 });
170 }, subscription);
194 });
171 });
195 }
172 }
196 }
173 }
197
174
198 cat(...seq: Subscribable<T>[]) {
175 cat(...seq: Subscribable<T>[]) {
199 return new ObservableImpl<T>(({ next, complete: final, ...rest }) => {
176 return new ObservableImpl<T>(({ next, complete: final, ...rest }, subscription) => {
200 let cleanup: () => void;
201 const len = seq.length;
177 const len = seq.length;
202 const complete = (i: number) => i < len ?
178 const complete = (i: number) => i < len ?
203 () => {
179 () => {
204 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
180 h = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
205 cleanup = subscription.unsubscribe.bind(subscription);
181 } : () => subscription.finalize() && final();
206 } : final;
182
207
183
208 cleanup = this._producer({ next, complete: complete(0), ...rest }) ?? noop;
184 let h = this.subscribe({ next, complete: complete(0), ...rest });
185 subscription.teardown(() => h.unsubscribe());
209
186
210 return () => cleanup();
211 });
187 });
212 }
188 }
213
189
214 pipe<U>(op: OperatorFn<T, U>): Observable<U> {
190 pipe<U>(op: OperatorFn<T, U>): Observable<U> {
215 return op(this);
191 return op(this);
216 }
192 }
217
193
218 next(ct = Cancellation.none) {
194 next(ct = Cancellation.none) {
219 return new Promise<T>((resolve, reject) => {
195 return new Promise<T>((resolve, reject) => {
220 ct.throwIfRequested();
196 ct.throwIfRequested();
221
197
222 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
198 const subscription = new SubscriptionImpl();
223 teardown((0, this._producer)({
199 subscription.cancellable(ct, reject);
224 next: (item: T) => finalize() && resolve(item),
200
225 complete: () => finalize() && reject(new Error("The sequence is empty")),
201 (0, this._producer)({
226 error,
202 next: resolve,
227 isClosed
203 complete: () => reject(new Error("The sequence is empty")),
228 }));
204 error: reject,
205 }, subscription);
229
206
230 });
207 });
231 }
208 }
232
209
233 collect(ct = Cancellation.none) {
210 collect(ct = Cancellation.none) {
234 return new Promise<T[]>((resolve, reject) => {
211 return new Promise<T[]>((resolve, reject) => {
235 ct.throwIfRequested();
212 ct.throwIfRequested();
236
213
214 const subscription = new SubscriptionImpl();
215 subscription.cancellable(ct, reject);
216
237 const acc: T[] = [];
217 const acc: T[] = [];
238
218
239 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
219 (0, this._producer)({
240 teardown((0, this._producer)({
220 next: (item: T) => acc.push(item),
241 next: (item: T) => isClosed() ? void (0) : acc.push(item),
221 complete: () => resolve(acc),
242 complete: () => finalize() && resolve(acc),
222 error: reject
243 error,
223 }, subscription);
244 isClosed
245 }));
246 });
224 });
247 }
225 }
248
226
249 forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
227 forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
250 return new Promise<void>((resolve, reject) => {
228 return new Promise<void>((resolve, reject) => {
251 ct.throwIfRequested();
229 ct.throwIfRequested();
252
230
253 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
231 const subscription = new SubscriptionImpl();
232 subscription.cancellable(ct, reject);
254
233
255 teardown((0, this._producer)({
234 (0, this._producer)({
256 next: item => !isClosed() && next(item),
235 next,
257 complete: () => finalize() && resolve(),
236 complete: resolve,
258 error,
237 error: reject
259 isClosed
238 }, subscription);
260 }));
261 });
239 });
262 }
240 }
263
241
264 } No newline at end of file
242 }
General Comments 0
You need to be logged in to leave comments. Login now