|
@@
-3,6
+3,7
import { AccumulatorFn, Observable, Obse
|
|
3
|
import { TraceSource } from "@implab/core-amd/log/TraceSource";
|
|
3
|
import { TraceSource } from "@implab/core-amd/log/TraceSource";
|
|
4
|
import { Predicate } from "@implab/core-amd/interfaces";
|
|
4
|
import { Predicate } from "@implab/core-amd/interfaces";
|
|
5
|
import { Cancellation } from "@implab/core-amd/Cancellation";
|
|
5
|
import { Cancellation } from "@implab/core-amd/Cancellation";
|
|
|
|
|
6
|
import { Subscription, SubscriptionImpl } from "./SubscriptionImpl";
|
|
6
|
|
|
7
|
|
|
7
|
const trace = TraceSource.get(mid);
|
|
8
|
const trace = TraceSource.get(mid);
|
|
8
|
|
|
9
|
|
|
@@
-10,54
+11,23
const noop = () => { };
|
|
10
|
|
|
11
|
|
|
11
|
const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
|
|
12
|
const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
|
|
12
|
|
|
13
|
|
|
13
|
const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => {
|
|
|
|
|
14
|
let done = false;
|
|
|
|
|
15
|
let cleanup = noop;
|
|
|
|
|
16
|
|
|
|
|
|
17
|
const finalize = () => done ? false : (cleanup(), done = true);
|
|
|
|
|
18
|
const isClosed = () => done;
|
|
|
|
|
19
|
|
|
|
|
|
20
|
const teardown = (cb: void | (() => void)) => {
|
|
|
|
|
21
|
if (cb) {
|
|
|
|
|
22
|
if (done) {
|
|
|
|
|
23
|
cb();
|
|
|
|
|
24
|
} else {
|
|
|
|
|
25
|
const _prev = cleanup;
|
|
|
|
|
26
|
cleanup = _prev === noop ? cb : () => (_prev(), cb());
|
|
|
|
|
27
|
}
|
|
|
|
|
28
|
}
|
|
|
|
|
29
|
};
|
|
|
|
|
30
|
|
|
|
|
|
31
|
const error = (err: unknown) => finalize() && reject(err);
|
|
|
|
|
32
|
|
|
|
|
|
33
|
if (ct.isSupported()) {
|
|
|
|
|
34
|
const h = ct.register(error);
|
|
|
|
|
35
|
teardown(() => h.destroy());
|
|
|
|
|
36
|
}
|
|
|
|
|
37
|
|
|
|
|
|
38
|
return { finalize, isClosed, teardown, error };
|
|
|
|
|
39
|
};
|
|
|
|
|
40
|
|
|
14
|
|
|
41
|
type FusedSink<T> = Omit<Sink<T>, "isClosed">;
|
|
15
|
type FusedSink<T> = Omit<Sink<T>, "isClosed">;
|
|
42
|
|
|
16
|
|
|
43
|
export const fuse = <T>(producer: Producer<T>) => (consumer: FusedSink<T>) => {
|
|
17
|
export const fuse = <T>(producer: Producer<T>) =>
|
|
44
|
|
|
18
|
(sink: FusedSink<T>, subscription: Subscription): Unsubscribable => {
|
|
45
|
const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
|
|
|
|
|
46
|
const { teardown, isClosed, finalize, error } = subscription(reject);
|
|
|
|
|
47
|
|
|
|
|
|
48
|
const next = consumer.next ? consumer.next.bind(consumer) : noop;
|
|
|
|
|
49
|
|
|
19
|
|
|
50
|
teardown(producer({
|
|
20
|
subscription.teardown(producer({
|
|
51
|
next: next !== noop ? (v => isClosed() || next(v)) : noop,
|
|
21
|
next: subscription.around(sink.next),
|
|
52
|
error,
|
|
22
|
complete: () => subscription.finalize() && sink.complete(),
|
|
53
|
complete: () => finalize() && consumer.complete && consumer.complete(),
|
|
23
|
error: err => subscription.finalize() && sink.error(err),
|
|
54
|
isClosed
|
|
24
|
isClosed: subscription.isClosed.bind(subscription)
|
|
55
|
}));
|
|
25
|
}));
|
|
56
|
|
|
26
|
|
|
57
|
return { unsubscribe: finalize };
|
|
27
|
return subscription;
|
|
58
|
};
|
|
28
|
};
|
|
59
|
|
|
29
|
|
|
60
|
type FusedProducer<T> = (consumer: FusedSink<T>) => void | (() => void);
|
|
30
|
type FusedProducer<T> = (sink: FusedSink<T>, subscription: Subscription) => void;
|
|
61
|
|
|
31
|
|
|
62
|
export class ObservableImpl<T> implements Observable<T> {
|
|
32
|
export class ObservableImpl<T> implements Observable<T> {
|
|
63
|
|
|
33
|
|
|
@@
-67,64
+37,68
export class ObservableImpl<T> implement
|
|
67
|
this._producer = producer;
|
|
37
|
this._producer = producer;
|
|
68
|
}
|
|
38
|
}
|
|
69
|
|
|
39
|
|
|
70
|
subscribe(consumer: Observer<T> = {}) {
|
|
40
|
subscribe(consumer: Observer<T> = {}): Unsubscribable {
|
|
71
|
const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
|
|
41
|
const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err);
|
|
72
|
const next = consumer.next ? consumer.next.bind(consumer) : noop;
|
|
42
|
const next = consumer.next ? consumer.next.bind(consumer) : noop;
|
|
73
|
|
|
43
|
const complete = () => consumer.complete ? consumer.complete() : void (0);
|
|
74
|
const { teardown, isClosed, finalize, error } = subscription(reject);
|
|
|
|
|
75
|
|
|
44
|
|
|
76
|
teardown((0, this._producer)({
|
|
45
|
const subscription = new SubscriptionImpl();
|
|
77
|
next: next !== noop ? (v => isClosed() || next(v)) : noop,
|
|
|
|
|
78
|
error,
|
|
|
|
|
79
|
complete: () => finalize() && consumer.complete && consumer.complete()
|
|
|
|
|
80
|
}));
|
|
|
|
|
81
|
|
|
46
|
|
|
82
|
return { unsubscribe: finalize };
|
|
47
|
this._producer({ next, error, complete }, subscription);
|
|
|
|
|
48
|
|
|
|
|
|
49
|
return subscription;
|
|
83
|
}
|
|
50
|
}
|
|
84
|
|
|
51
|
|
|
85
|
map<T2>(mapper: (value: T) => T2) {
|
|
52
|
map<T2>(mapper: (value: T) => T2) {
|
|
86
|
return new ObservableImpl<T2>(({ next, ...rest }) =>
|
|
53
|
return new ObservableImpl<T2>(({ next, ...rest }, subscription) =>
|
|
87
|
this._producer({
|
|
54
|
(0, this._producer)(
|
|
88
|
next: next !== noop ? (v: T) => next(mapper(v)) : noop,
|
|
55
|
{
|
|
89
|
...rest
|
|
56
|
next: next !== noop ? (v: T) => next(mapper(v)) : noop,
|
|
90
|
}));
|
|
57
|
...rest
|
|
|
|
|
58
|
}, subscription
|
|
|
|
|
59
|
));
|
|
91
|
}
|
|
60
|
}
|
|
92
|
|
|
61
|
|
|
93
|
tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
|
|
62
|
tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) {
|
|
94
|
return new ObservableImpl<T>(({ next, complete, error }) =>
|
|
63
|
return new ObservableImpl<T>(({ next, complete, error }, subscription) =>
|
|
95
|
this._producer({
|
|
64
|
(0, this._producer)(
|
|
96
|
next: tapNext ? (v => (tapNext(v), next(v))) : next,
|
|
65
|
{
|
|
97
|
complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
|
|
66
|
next: tapNext ? (v => (tapNext(v), next(v))) : next,
|
|
98
|
error: tapError ? (e => (tapError(e), error(e))) : error
|
|
67
|
complete: tapComplete ? (() => (tapComplete(), complete())) : complete,
|
|
99
|
}));
|
|
68
|
error: tapError ? (e => (tapError(e), error(e))) : error
|
|
|
|
|
69
|
},
|
|
|
|
|
70
|
subscription
|
|
|
|
|
71
|
));
|
|
100
|
}
|
|
72
|
}
|
|
101
|
|
|
73
|
|
|
102
|
filter(predicate: Predicate<T>) {
|
|
74
|
filter(predicate: Predicate<T>) {
|
|
103
|
return new ObservableImpl<T>(({ next, ...rest }) =>
|
|
75
|
return new ObservableImpl<T>(({ next, ...rest }, subscription) =>
|
|
104
|
this._producer({
|
|
76
|
(0, this._producer)(
|
|
105
|
next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
|
|
77
|
{
|
|
106
|
...rest
|
|
78
|
next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
|
|
107
|
})
|
|
79
|
...rest
|
|
108
|
);
|
|
80
|
},
|
|
|
|
|
81
|
subscription
|
|
|
|
|
82
|
));
|
|
109
|
}
|
|
83
|
}
|
|
110
|
|
|
84
|
|
|
111
|
until(predicate: Predicate<T>) {
|
|
85
|
until(predicate: Predicate<T>) {
|
|
112
|
return new ObservableImpl<T>(({ next, complete, ...rest }) =>
|
|
86
|
return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
|
|
113
|
this._producer({
|
|
87
|
(0, this._producer)({
|
|
114
|
next: v => predicate(v) ? complete() : next(v),
|
|
88
|
next: v => predicate(v) ? subscription.finalize() && complete() : next(v),
|
|
115
|
complete,
|
|
89
|
complete,
|
|
116
|
...rest
|
|
90
|
...rest
|
|
117
|
})
|
|
91
|
}, subscription)
|
|
118
|
);
|
|
92
|
);
|
|
119
|
}
|
|
93
|
}
|
|
120
|
|
|
94
|
|
|
121
|
while(predicate: Predicate<T>) {
|
|
95
|
while(predicate: Predicate<T>) {
|
|
122
|
return new ObservableImpl<T>(({ next, complete, ...rest }) =>
|
|
96
|
return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) =>
|
|
123
|
this._producer({
|
|
97
|
(0, this._producer)({
|
|
124
|
next: v => predicate(v) ? next(v) : complete(),
|
|
98
|
next: v => predicate(v) ? next(v) : subscription.finalize() && complete(),
|
|
125
|
complete,
|
|
99
|
complete,
|
|
126
|
...rest
|
|
100
|
...rest
|
|
127
|
})
|
|
101
|
}, subscription)
|
|
128
|
);
|
|
102
|
);
|
|
129
|
}
|
|
103
|
}
|
|
130
|
|
|
104
|
|
|
@@
-132,23
+106,26
export class ObservableImpl<T> implement
|
|
132
|
scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
|
|
106
|
scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
|
|
133
|
scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
|
|
107
|
scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
|
|
134
|
if (args.length === 1) {
|
|
108
|
if (args.length === 1) {
|
|
135
|
return new ObservableImpl<T | A>(({ next, ...rest }) => {
|
|
109
|
const [accumulator] = args;
|
|
136
|
const [accumulator] = args;
|
|
110
|
|
|
|
|
|
111
|
return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
|
|
137
|
let _acc: T;
|
|
112
|
let _acc: T;
|
|
138
|
let index = 0;
|
|
113
|
let index = 0;
|
|
139
|
return this._producer({
|
|
114
|
(0, this._producer)({
|
|
140
|
next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
|
|
115
|
next: next !== noop ? (v: T) => next(
|
|
|
|
|
116
|
index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)
|
|
|
|
|
117
|
) : noop,
|
|
141
|
...rest
|
|
118
|
...rest
|
|
142
|
});
|
|
119
|
}, subscription);
|
|
143
|
});
|
|
120
|
});
|
|
144
|
} else {
|
|
121
|
} else {
|
|
145
|
return new ObservableImpl<T | A>(({ next, ...rest }) => {
|
|
122
|
const [accumulator, initial] = args;
|
|
146
|
const [accumulator, initial] = args;
|
|
123
|
return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => {
|
|
147
|
let _acc = initial;
|
|
124
|
let _acc = initial;
|
|
148
|
return this._producer({
|
|
125
|
(0, this._producer)({
|
|
149
|
next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
|
|
126
|
next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
|
|
150
|
...rest
|
|
127
|
...rest
|
|
151
|
});
|
|
128
|
}, subscription);
|
|
152
|
});
|
|
129
|
});
|
|
153
|
}
|
|
130
|
}
|
|
154
|
}
|
|
131
|
}
|
|
@@
-158,11
+135,11
export class ObservableImpl<T> implement
|
|
158
|
reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
|
|
135
|
reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) {
|
|
159
|
|
|
136
|
|
|
160
|
if (args.length === 1) {
|
|
137
|
if (args.length === 1) {
|
|
161
|
return new ObservableImpl<T | A>(({ next, complete, error }) => {
|
|
138
|
const [accumulator] = args;
|
|
162
|
const [accumulator] = args;
|
|
139
|
return new ObservableImpl<T>(({ next, complete, error }, subscription) => {
|
|
163
|
let _acc: T;
|
|
140
|
let _acc: T;
|
|
164
|
let index = 0;
|
|
141
|
let index = 0;
|
|
165
|
return this._producer({
|
|
142
|
(0, this._producer)({
|
|
166
|
next: next !== noop ? (v: T) => {
|
|
143
|
next: next !== noop ? (v: T) => {
|
|
167
|
_acc = index++ === 0 ? v : accumulator(_acc, v);
|
|
144
|
_acc = index++ === 0 ? v : accumulator(_acc, v);
|
|
168
|
} : noop,
|
|
145
|
} : noop,
|
|
@@
-175,13
+152,13
export class ObservableImpl<T> implement
|
|
175
|
}
|
|
152
|
}
|
|
176
|
},
|
|
153
|
},
|
|
177
|
error
|
|
154
|
error
|
|
178
|
});
|
|
155
|
}, subscription);
|
|
179
|
});
|
|
156
|
});
|
|
180
|
} else {
|
|
157
|
} else {
|
|
181
|
return new ObservableImpl<T | A>(({ next, complete, error }) => {
|
|
158
|
const [accumulator, initial] = args;
|
|
182
|
const [accumulator, initial] = args;
|
|
159
|
return new ObservableImpl<A>(({ next, complete, error }, subscription) => {
|
|
183
|
let _acc = initial;
|
|
160
|
let _acc = initial;
|
|
184
|
return this._producer({
|
|
161
|
(0, this._producer)({
|
|
185
|
next: next !== noop ? (v: T) => {
|
|
162
|
next: next !== noop ? (v: T) => {
|
|
186
|
_acc = accumulator(_acc, v);
|
|
163
|
_acc = accumulator(_acc, v);
|
|
187
|
} : noop,
|
|
164
|
} : noop,
|
|
@@
-190,24
+167,23
export class ObservableImpl<T> implement
|
|
190
|
complete();
|
|
167
|
complete();
|
|
191
|
},
|
|
168
|
},
|
|
192
|
error
|
|
169
|
error
|
|
193
|
});
|
|
170
|
}, subscription);
|
|
194
|
});
|
|
171
|
});
|
|
195
|
}
|
|
172
|
}
|
|
196
|
}
|
|
173
|
}
|
|
197
|
|
|
174
|
|
|
198
|
cat(...seq: Subscribable<T>[]) {
|
|
175
|
cat(...seq: Subscribable<T>[]) {
|
|
199
|
return new ObservableImpl<T>(({ next, complete: final, ...rest }) => {
|
|
176
|
return new ObservableImpl<T>(({ next, complete: final, ...rest }, subscription) => {
|
|
200
|
let cleanup: () => void;
|
|
|
|
|
201
|
const len = seq.length;
|
|
177
|
const len = seq.length;
|
|
202
|
const complete = (i: number) => i < len ?
|
|
178
|
const complete = (i: number) => i < len ?
|
|
203
|
() => {
|
|
179
|
() => {
|
|
204
|
const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
|
|
180
|
h = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
|
|
205
|
cleanup = subscription.unsubscribe.bind(subscription);
|
|
181
|
} : () => subscription.finalize() && final();
|
|
206
|
} : final;
|
|
182
|
|
|
207
|
|
|
183
|
|
|
208
|
cleanup = this._producer({ next, complete: complete(0), ...rest }) ?? noop;
|
|
184
|
let h = this.subscribe({ next, complete: complete(0), ...rest });
|
|
|
|
|
185
|
subscription.teardown(() => h.unsubscribe());
|
|
209
|
|
|
186
|
|
|
210
|
return () => cleanup();
|
|
|
|
|
211
|
});
|
|
187
|
});
|
|
212
|
}
|
|
188
|
}
|
|
213
|
|
|
189
|
|
|
@@
-219,13
+195,14
export class ObservableImpl<T> implement
|
|
219
|
return new Promise<T>((resolve, reject) => {
|
|
195
|
return new Promise<T>((resolve, reject) => {
|
|
220
|
ct.throwIfRequested();
|
|
196
|
ct.throwIfRequested();
|
|
221
|
|
|
197
|
|
|
222
|
const { teardown, finalize, isClosed, error } = subscription(reject, ct);
|
|
198
|
const subscription = new SubscriptionImpl();
|
|
223
|
teardown((0, this._producer)({
|
|
199
|
subscription.cancellable(ct, reject);
|
|
224
|
next: (item: T) => finalize() && resolve(item),
|
|
200
|
|
|
225
|
complete: () => finalize() && reject(new Error("The sequence is empty")),
|
|
201
|
(0, this._producer)({
|
|
226
|
error,
|
|
202
|
next: resolve,
|
|
227
|
isClosed
|
|
203
|
complete: () => reject(new Error("The sequence is empty")),
|
|
228
|
}));
|
|
204
|
error: reject,
|
|
|
|
|
205
|
}, subscription);
|
|
229
|
|
|
206
|
|
|
230
|
});
|
|
207
|
});
|
|
231
|
}
|
|
208
|
}
|
|
@@
-234,15
+211,16
export class ObservableImpl<T> implement
|
|
234
|
return new Promise<T[]>((resolve, reject) => {
|
|
211
|
return new Promise<T[]>((resolve, reject) => {
|
|
235
|
ct.throwIfRequested();
|
|
212
|
ct.throwIfRequested();
|
|
236
|
|
|
213
|
|
|
|
|
|
214
|
const subscription = new SubscriptionImpl();
|
|
|
|
|
215
|
subscription.cancellable(ct, reject);
|
|
|
|
|
216
|
|
|
237
|
const acc: T[] = [];
|
|
217
|
const acc: T[] = [];
|
|
238
|
|
|
218
|
|
|
239
|
const { teardown, finalize, isClosed, error } = subscription(reject, ct);
|
|
219
|
(0, this._producer)({
|
|
240
|
teardown((0, this._producer)({
|
|
220
|
next: (item: T) => acc.push(item),
|
|
241
|
next: (item: T) => isClosed() ? void (0) : acc.push(item),
|
|
221
|
complete: () => resolve(acc),
|
|
242
|
complete: () => finalize() && resolve(acc),
|
|
222
|
error: reject
|
|
243
|
error,
|
|
223
|
}, subscription);
|
|
244
|
isClosed
|
|
|
|
|
245
|
}));
|
|
|
|
|
246
|
});
|
|
224
|
});
|
|
247
|
}
|
|
225
|
}
|
|
248
|
|
|
226
|
|
|
@@
-250,14
+228,14
export class ObservableImpl<T> implement
|
|
250
|
return new Promise<void>((resolve, reject) => {
|
|
228
|
return new Promise<void>((resolve, reject) => {
|
|
251
|
ct.throwIfRequested();
|
|
229
|
ct.throwIfRequested();
|
|
252
|
|
|
230
|
|
|
253
|
const { teardown, finalize, isClosed, error } = subscription(reject, ct);
|
|
231
|
const subscription = new SubscriptionImpl();
|
|
|
|
|
232
|
subscription.cancellable(ct, reject);
|
|
254
|
|
|
233
|
|
|
255
|
teardown((0, this._producer)({
|
|
234
|
(0, this._producer)({
|
|
256
|
next: item => !isClosed() && next(item),
|
|
235
|
next,
|
|
257
|
complete: () => finalize() && resolve(),
|
|
236
|
complete: resolve,
|
|
258
|
error,
|
|
237
|
error: reject
|
|
259
|
isClosed
|
|
238
|
}, subscription);
|
|
260
|
}));
|
|
|
|
|
261
|
});
|
|
239
|
});
|
|
262
|
}
|
|
240
|
}
|
|
263
|
|
|
241
|
|