##// END OF EJS Templates
Implemented subscription SubscriptionImpl, fixed subscription resource management
cin -
r158:078eca3dc271 v1.10.3 default
parent child
Show More
@@ -0,0 +1,58
1 import { ICancellation } from "@implab/core-amd/interfaces";
2
3 export interface Subscription {
4 finalize(): boolean;
5
6 isClosed(): boolean;
7
8 teardown(cleanup: void | (() => void)): void;
9
10 around<T>(next: (item: T) => void): (item: T) => void;
11
12 unsubscribe(): void;
13 }
14
15 const noop = () => {};
16
17
18 export class SubscriptionImpl implements Subscription {
19
20 private _done = false;
21
22 private _cleanup = noop;
23
24 finalize() {
25 return this._done ? false : ((0,this._cleanup)(), this._done = true);
26 }
27
28 isClosed() {
29 return this._done;
30 }
31
32 teardown(cleanup: void | (() => void)) {
33 if (cleanup) {
34 if (this._done) {
35 cleanup();
36 } else {
37 const _prev = this._cleanup;
38 this._cleanup = _prev === noop ? cleanup : () => (_prev(), cleanup());
39 }
40 }
41 }
42
43 cancellable(ct: ICancellation, reject: (err: unknown) => void) {
44 if (ct.isSupported()) {
45 const h = ct.register(reason => this.finalize() && reject(reason));
46 this.teardown(() => h.destroy());
47 }
48 }
49
50 unsubscribe() {
51 this.finalize();
52 }
53
54 around<T>(next: (item: T) => void): (item: T) => void {
55 return (item) => this._done ? void(0) : next(item);
56 }
57
58 } No newline at end of file
@@ -1,69 +1,69
1 1 import { isPromise } from "@implab/core-amd/safe";
2 2 import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces";
3 import { ObservableImpl } from "./observable/ObservableImpl";
3 import { ObservableImpl, fuse } from "./observable/ObservableImpl";
4 4
5 5 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
6 6 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
7 7
8 8 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
9 9 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
10 10
11 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(producer);
11 export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer));
12 12
13 13 /** Converts an array to the observable sequence of its elements. */
14 14 export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>(
15 15 ({ next, complete }) => (
16 16 items.forEach(next),
17 17 complete()
18 18 )
19 19 );
20 20
21 21 /** Converts a subscribable to the observable */
22 22 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
23 23 observe<T>(sink => {
24 24 const subscription = subscribable.subscribe(sink);
25 25 return () => subscription.unsubscribe();
26 26 });
27 27
28 28 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
29 29 ({ next, error, complete }) =>
30 30 isPromise(item) ?
31 31 void item.then(
32 32 v => (next(v), complete()),
33 33 error
34 34 ) :
35 35 (next(item), complete())
36 36 );
37 37
38 38 /** Converts a list of parameter values to the observable sequence. The
39 39 * order of elements in the list will be preserved in the resulting sequence.
40 40 */
41 41 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
42 42 of1(items[0]) :
43 43 observe<T>(
44 44 ({ next, error, complete, isClosed }) => {
45 45 const n = items.length;
46 46
47 47 const _next = (start: number) => {
48 48 if (start > 0 && isClosed()) // when resumed
49 49 return;
50 50
51 51 for (let i = start; i < n; i++) {
52 52 const r = items[i];
53 53 if (isPromise(r)) {
54 54 r.then(v => (next(v), _next(i + 1)), error);
55 55 return; // suspend
56 56 } else {
57 57 next(r);
58 58 }
59 59 }
60 60 complete();
61 61 };
62 62
63 63 _next(0);
64 64 }
65 65 );
66 66
67 67 export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete());
68 68
69 69 export type * from "./observable/interfaces"; No newline at end of file
@@ -1,264 +1,242
1 1 import { id as mid } from "module";
2 2 import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces";
3 3 import { TraceSource } from "@implab/core-amd/log/TraceSource";
4 4 import { Predicate } from "@implab/core-amd/interfaces";
5 5 import { Cancellation } from "@implab/core-amd/Cancellation";
6 import { Subscription, SubscriptionImpl } from "./SubscriptionImpl";
6 7
7 8 const trace = TraceSource.get(mid);
8 9
9 10 const noop = () => { };
10 11
11 12 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
12 13
13 const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => {
14 let done = false;
15 let cleanup = noop;
16
17 const finalize = () => done ? false : (cleanup(), done = true);
18 const isClosed = () => done;
19
20 const teardown = (cb: void | (() => void)) => {
21 if (cb) {
22 if (done) {
23 cb();
24 } else {
25 const _prev = cleanup;
26 cleanup = _prev === noop ? cb : () => (_prev(), cb());
27 }
28 }
29 };
30
31 const error = (err: unknown) => finalize() && reject(err);
32
33 if (ct.isSupported()) {
34 const h = ct.register(error);
35 teardown(() => h.destroy());
36 }
37
38 return { finalize, isClosed, teardown, error };
39 };
40 14
41 15 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
42 16
43 export const fuse = <T>(producer: Producer<T>) => (consumer: FusedSink<T>) => {
44
45 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
46 const { teardown, isClosed, finalize, error } = subscription(reject);
47
48 const next = consumer.next ? consumer.next.bind(consumer) : noop;
17 export const fuse = <T>(producer: Producer<T>) =>
18 (sink: FusedSink<T>, subscription: Subscription): Unsubscribable => {
49 19
50 teardown(producer({
51 next: next !== noop ? (v => isClosed() || next(v)) : noop,
52 error,
53 complete: () => finalize() && consumer.complete && consumer.complete(),
54 isClosed
55 }));
20 subscription.teardown(producer({
21 next: subscription.around(sink.next),
22 complete: () => subscription.finalize() && sink.complete(),
23 error: err => subscription.finalize() && sink.error(err),
24 isClosed: subscription.isClosed.bind(subscription)
25 }));
56 26
57 return { unsubscribe: finalize };
58 };
27 return subscription;
28 };
59 29
60 type FusedProducer<T> = (consumer: FusedSink<T>) => void | (() => void);
30 type FusedProducer<T> = (sink: FusedSink<T>, subscription: Subscription) => void;
61 31
62 32 export class ObservableImpl<T> implements Observable<T> {
63 33
64 34 private readonly _producer: FusedProducer<T>;
65 35
66 36 constructor(producer: FusedProducer<T>) {
67 37 this._producer = producer;
68 38 }
69 39
70 subscribe(consumer: Observer<T> = {}) {
71 const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
40 subscribe(consumer: Observer<T> = {}): Unsubscribable {
41 const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
72 42 const next = consumer.next ? consumer.next.bind(consumer) : noop;
73
74 const { teardown, isClosed, finalize, error } = subscription(reject);
43 const complete = () => consumer.complete ? consumer.complete() : void (0);
75 44
76 teardown((0, this._producer)({
77 next: next !== noop ? (v => isClosed() || next(v)) : noop,
78 error,
79 complete: () => finalize() && consumer.complete && consumer.complete()
80 }));
45 const subscription = new SubscriptionImpl();
81 46
82 return { unsubscribe: finalize };
47 this._producer({ next, error, complete }, subscription);
48
49 return subscription;
83 50 }
84 51
85 52 map<T2>(mapper: (value: T) => T2) {
86 return new ObservableImpl<T2>(({ next, ...rest }) =>
87 this._producer({
88 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
89 ...rest
90 }));
53 return new ObservableImpl<T2>(({ next, ...rest }, subscription) =>
54 (0, this._producer)(
55 {
56 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
57 ...rest
58 }, subscription
59 ));
91 60 }
92 61
93 62 tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
94 return new ObservableImpl<T>(({ next, complete, error }) =>
95 this._producer({
96 next: tapNext ? (v => (tapNext(v), next(v))) : next,
97 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
98 error: tapError ? (e => (tapError(e), error(e))) : error
99 }));
63 return new ObservableImpl<T>(({ next, complete, error }, subscription) =>
64 (0, this._producer)(
65 {
66 next: tapNext ? (v => (tapNext(v), next(v))) : next,
67 complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
68 error: tapError ? (e => (tapError(e), error(e))) : error
69 },
70 subscription
71 ));
100 72 }
101 73
102 74 filter(predicate: Predicate<T>) {
103 return new ObservableImpl<T>(({ next, ...rest }) =>
104 this._producer({
105 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
106 ...rest
107 })
108 );
75 return new ObservableImpl<T>(({ next, ...rest }, subscription) =>
76 (0, this._producer)(
77 {
78 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
79 ...rest
80 },
81 subscription
82 ));
109 83 }
110 84
111 85 until(predicate: Predicate<T>) {
112 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
113 this._producer({
114 next: v => predicate(v) ? complete() : next(v),
86 return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
87 (0, this._producer)({
88 next: v => predicate(v) ? subscription.finalize() && complete() : next(v),
115 89 complete,
116 90 ...rest
117 })
91 }, subscription)
118 92 );
119 93 }
120 94
121 95 while(predicate: Predicate<T>) {
122 return new ObservableImpl<T>(({ next, complete, ...rest }) =>
123 this._producer({
124 next: v => predicate(v) ? next(v) : complete(),
96 return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
97 (0, this._producer)({
98 next: v => predicate(v) ? next(v) : subscription.finalize() && complete(),
125 99 complete,
126 100 ...rest
127 })
101 }, subscription)
128 102 );
129 103 }
130 104
131 105 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
132 106 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
133 107 scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
134 108 if (args.length === 1) {
135 return new ObservableImpl<T | A>(({ next, ...rest }) => {
136 const [accumulator] = args;
109 const [accumulator] = args;
110
111 return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
137 112 let _acc: T;
138 113 let index = 0;
139 return this._producer({
140 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
114 (0, this._producer)({
115 next: next !== noop ? (v: T) => next(
116 index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)
117 ) : noop,
141 118 ...rest
142 });
119 }, subscription);
143 120 });
144 121 } else {
145 return new ObservableImpl<T | A>(({ next, ...rest }) => {
146 const [accumulator, initial] = args;
122 const [accumulator, initial] = args;
123 return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
147 124 let _acc = initial;
148 return this._producer({
125 (0, this._producer)({
149 126 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
150 127 ...rest
151 });
128 }, subscription);
152 129 });
153 130 }
154 131 }
155 132
156 133 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
157 134 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
158 135 reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
159 136
160 137 if (args.length === 1) {
161 return new ObservableImpl<T | A>(({ next, complete, error }) => {
162 const [accumulator] = args;
138 const [accumulator] = args;
139 return new ObservableImpl<T>(({ next, complete, error }, subscription) => {
163 140 let _acc: T;
164 141 let index = 0;
165 return this._producer({
142 (0, this._producer)({
166 143 next: next !== noop ? (v: T) => {
167 144 _acc = index++ === 0 ? v : accumulator(_acc, v);
168 145 } : noop,
169 146 complete: () => {
170 147 if (index === 0) {
171 148 error(new Error("The sequence can't be empty"));
172 149 } else {
173 150 next(_acc);
174 151 complete();
175 152 }
176 153 },
177 154 error
178 });
155 }, subscription);
179 156 });
180 157 } else {
181 return new ObservableImpl<T | A>(({ next, complete, error }) => {
182 const [accumulator, initial] = args;
158 const [accumulator, initial] = args;
159 return new ObservableImpl<A>(({ next, complete, error }, subscription) => {
183 160 let _acc = initial;
184 return this._producer({
161 (0, this._producer)({
185 162 next: next !== noop ? (v: T) => {
186 163 _acc = accumulator(_acc, v);
187 164 } : noop,
188 165 complete: () => {
189 166 next(_acc);
190 167 complete();
191 168 },
192 169 error
193 });
170 }, subscription);
194 171 });
195 172 }
196 173 }
197 174
198 175 cat(...seq: Subscribable<T>[]) {
199 return new ObservableImpl<T>(({ next, complete: final, ...rest }) => {
200 let cleanup: () => void;
176 return new ObservableImpl<T>(({ next, complete: final, ...rest }, subscription) => {
201 177 const len = seq.length;
202 178 const complete = (i: number) => i < len ?
203 179 () => {
204 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
205 cleanup = subscription.unsubscribe.bind(subscription);
206 } : final;
180 h = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
181 } : () => subscription.finalize() && final();
182
207 183
208 cleanup = this._producer({ next, complete: complete(0), ...rest }) ?? noop;
184 let h = this.subscribe({ next, complete: complete(0), ...rest });
185 subscription.teardown(() => h.unsubscribe());
209 186
210 return () => cleanup();
211 187 });
212 188 }
213 189
214 190 pipe<U>(op: OperatorFn<T, U>): Observable<U> {
215 191 return op(this);
216 192 }
217 193
218 194 next(ct = Cancellation.none) {
219 195 return new Promise<T>((resolve, reject) => {
220 196 ct.throwIfRequested();
221 197
222 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
223 teardown((0, this._producer)({
224 next: (item: T) => finalize() && resolve(item),
225 complete: () => finalize() && reject(new Error("The sequence is empty")),
226 error,
227 isClosed
228 }));
198 const subscription = new SubscriptionImpl();
199 subscription.cancellable(ct, reject);
200
201 (0, this._producer)({
202 next: resolve,
203 complete: () => reject(new Error("The sequence is empty")),
204 error: reject,
205 }, subscription);
229 206
230 207 });
231 208 }
232 209
233 210 collect(ct = Cancellation.none) {
234 211 return new Promise<T[]>((resolve, reject) => {
235 212 ct.throwIfRequested();
236 213
214 const subscription = new SubscriptionImpl();
215 subscription.cancellable(ct, reject);
216
237 217 const acc: T[] = [];
238 218
239 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
240 teardown((0, this._producer)({
241 next: (item: T) => isClosed() ? void (0) : acc.push(item),
242 complete: () => finalize() && resolve(acc),
243 error,
244 isClosed
245 }));
219 (0, this._producer)({
220 next: (item: T) => acc.push(item),
221 complete: () => resolve(acc),
222 error: reject
223 }, subscription);
246 224 });
247 225 }
248 226
249 227 forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> {
250 228 return new Promise<void>((resolve, reject) => {
251 229 ct.throwIfRequested();
252 230
253 const { teardown, finalize, isClosed, error } = subscription(reject, ct);
231 const subscription = new SubscriptionImpl();
232 subscription.cancellable(ct, reject);
254 233
255 teardown((0, this._producer)({
256 next: item => !isClosed() && next(item),
257 complete: () => finalize() && resolve(),
258 error,
259 isClosed
260 }));
234 (0, this._producer)({
235 next,
236 complete: resolve,
237 error: reject
238 }, subscription);
261 239 });
262 240 }
263 241
264 242 } No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now