@@ -0,0 +1,58 | |||||
|
1 | import { ICancellation } from "@implab/core-amd/interfaces"; | |||
|
2 | ||||
|
3 | export interface Subscription { | |||
|
4 | finalize(): boolean; | |||
|
5 | ||||
|
6 | isClosed(): boolean; | |||
|
7 | ||||
|
8 | teardown(cleanup: void | (() => void)): void; | |||
|
9 | ||||
|
10 | around<T>(next: (item: T) => void): (item: T) => void; | |||
|
11 | ||||
|
12 | unsubscribe(): void; | |||
|
13 | } | |||
|
14 | ||||
|
15 | const noop = () => {}; | |||
|
16 | ||||
|
17 | ||||
|
18 | export class SubscriptionImpl implements Subscription { | |||
|
19 | ||||
|
20 | private _done = false; | |||
|
21 | ||||
|
22 | private _cleanup = noop; | |||
|
23 | ||||
|
24 | finalize() { | |||
|
25 | return this._done ? false : ((0,this._cleanup)(), this._done = true); | |||
|
26 | } | |||
|
27 | ||||
|
28 | isClosed() { | |||
|
29 | return this._done; | |||
|
30 | } | |||
|
31 | ||||
|
32 | teardown(cleanup: void | (() => void)) { | |||
|
33 | if (cleanup) { | |||
|
34 | if (this._done) { | |||
|
35 | cleanup(); | |||
|
36 | } else { | |||
|
37 | const _prev = this._cleanup; | |||
|
38 | this._cleanup = _prev === noop ? cleanup : () => (_prev(), cleanup()); | |||
|
39 | } | |||
|
40 | } | |||
|
41 | } | |||
|
42 | ||||
|
43 | cancellable(ct: ICancellation, reject: (err: unknown) => void) { | |||
|
44 | if (ct.isSupported()) { | |||
|
45 | const h = ct.register(reason => this.finalize() && reject(reason)); | |||
|
46 | this.teardown(() => h.destroy()); | |||
|
47 | } | |||
|
48 | } | |||
|
49 | ||||
|
50 | unsubscribe() { | |||
|
51 | this.finalize(); | |||
|
52 | } | |||
|
53 | ||||
|
54 | around<T>(next: (item: T) => void): (item: T) => void { | |||
|
55 | return (item) => this._done ? void(0) : next(item); | |||
|
56 | } | |||
|
57 | ||||
|
58 | } No newline at end of file |
@@ -1,69 +1,69 | |||||
1 | import { isPromise } from "@implab/core-amd/safe"; |
|
1 | import { isPromise } from "@implab/core-amd/safe"; | |
2 | import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces"; |
|
2 | import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces"; | |
3 | import { ObservableImpl } from "./observable/ObservableImpl"; |
|
3 | import { ObservableImpl, fuse } from "./observable/ObservableImpl"; | |
4 |
|
4 | |||
5 | export const isUnsubscribable = (v: unknown): v is Unsubscribable => |
|
5 | export const isUnsubscribable = (v: unknown): v is Unsubscribable => | |
6 | v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; |
|
6 | v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; | |
7 |
|
7 | |||
8 | export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> => |
|
8 | export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> => | |
9 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; |
|
9 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; | |
10 |
|
10 | |||
11 | export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(producer); |
|
11 | export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer)); | |
12 |
|
12 | |||
13 | /** Converts an array to the observable sequence of its elements. */ |
|
13 | /** Converts an array to the observable sequence of its elements. */ | |
14 | export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>( |
|
14 | export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>( | |
15 | ({ next, complete }) => ( |
|
15 | ({ next, complete }) => ( | |
16 | items.forEach(next), |
|
16 | items.forEach(next), | |
17 | complete() |
|
17 | complete() | |
18 | ) |
|
18 | ) | |
19 | ); |
|
19 | ); | |
20 |
|
20 | |||
21 | /** Converts a subscribable to the observable */ |
|
21 | /** Converts a subscribable to the observable */ | |
22 | export const ofSubscribable = <T>(subscribable: Subscribable<T>) => |
|
22 | export const ofSubscribable = <T>(subscribable: Subscribable<T>) => | |
23 | observe<T>(sink => { |
|
23 | observe<T>(sink => { | |
24 | const subscription = subscribable.subscribe(sink); |
|
24 | const subscription = subscribable.subscribe(sink); | |
25 | return () => subscription.unsubscribe(); |
|
25 | return () => subscription.unsubscribe(); | |
26 | }); |
|
26 | }); | |
27 |
|
27 | |||
28 | const of1 = <T>(item: T | PromiseLike<T>) => observe<T>( |
|
28 | const of1 = <T>(item: T | PromiseLike<T>) => observe<T>( | |
29 | ({ next, error, complete }) => |
|
29 | ({ next, error, complete }) => | |
30 | isPromise(item) ? |
|
30 | isPromise(item) ? | |
31 | void item.then( |
|
31 | void item.then( | |
32 | v => (next(v), complete()), |
|
32 | v => (next(v), complete()), | |
33 | error |
|
33 | error | |
34 | ) : |
|
34 | ) : | |
35 | (next(item), complete()) |
|
35 | (next(item), complete()) | |
36 | ); |
|
36 | ); | |
37 |
|
37 | |||
38 | /** Converts a list of parameter values to the observable sequence. The |
|
38 | /** Converts a list of parameter values to the observable sequence. The | |
39 | * order of elements in the list will be preserved in the resulting sequence. |
|
39 | * order of elements in the list will be preserved in the resulting sequence. | |
40 | */ |
|
40 | */ | |
41 | export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ? |
|
41 | export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ? | |
42 | of1(items[0]) : |
|
42 | of1(items[0]) : | |
43 | observe<T>( |
|
43 | observe<T>( | |
44 | ({ next, error, complete, isClosed }) => { |
|
44 | ({ next, error, complete, isClosed }) => { | |
45 | const n = items.length; |
|
45 | const n = items.length; | |
46 |
|
46 | |||
47 | const _next = (start: number) => { |
|
47 | const _next = (start: number) => { | |
48 | if (start > 0 && isClosed()) // when resumed |
|
48 | if (start > 0 && isClosed()) // when resumed | |
49 | return; |
|
49 | return; | |
50 |
|
50 | |||
51 | for (let i = start; i < n; i++) { |
|
51 | for (let i = start; i < n; i++) { | |
52 | const r = items[i]; |
|
52 | const r = items[i]; | |
53 | if (isPromise(r)) { |
|
53 | if (isPromise(r)) { | |
54 | r.then(v => (next(v), _next(i + 1)), error); |
|
54 | r.then(v => (next(v), _next(i + 1)), error); | |
55 | return; // suspend |
|
55 | return; // suspend | |
56 | } else { |
|
56 | } else { | |
57 | next(r); |
|
57 | next(r); | |
58 | } |
|
58 | } | |
59 | } |
|
59 | } | |
60 | complete(); |
|
60 | complete(); | |
61 | }; |
|
61 | }; | |
62 |
|
62 | |||
63 | _next(0); |
|
63 | _next(0); | |
64 | } |
|
64 | } | |
65 | ); |
|
65 | ); | |
66 |
|
66 | |||
67 | export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete()); |
|
67 | export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete()); | |
68 |
|
68 | |||
69 | export type * from "./observable/interfaces"; No newline at end of file |
|
69 | export type * from "./observable/interfaces"; |
@@ -1,264 +1,242 | |||||
1 | import { id as mid } from "module"; |
|
1 | import { id as mid } from "module"; | |
2 | import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces"; |
|
2 | import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces"; | |
3 | import { TraceSource } from "@implab/core-amd/log/TraceSource"; |
|
3 | import { TraceSource } from "@implab/core-amd/log/TraceSource"; | |
4 | import { Predicate } from "@implab/core-amd/interfaces"; |
|
4 | import { Predicate } from "@implab/core-amd/interfaces"; | |
5 | import { Cancellation } from "@implab/core-amd/Cancellation"; |
|
5 | import { Cancellation } from "@implab/core-amd/Cancellation"; | |
|
6 | import { Subscription, SubscriptionImpl } from "./SubscriptionImpl"; | |||
6 |
|
7 | |||
7 | const trace = TraceSource.get(mid); |
|
8 | const trace = TraceSource.get(mid); | |
8 |
|
9 | |||
9 | const noop = () => { }; |
|
10 | const noop = () => { }; | |
10 |
|
11 | |||
11 | const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); |
|
12 | const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); | |
12 |
|
13 | |||
13 | const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => { |
|
|||
14 | let done = false; |
|
|||
15 | let cleanup = noop; |
|
|||
16 |
|
||||
17 | const finalize = () => done ? false : (cleanup(), done = true); |
|
|||
18 | const isClosed = () => done; |
|
|||
19 |
|
||||
20 | const teardown = (cb: void | (() => void)) => { |
|
|||
21 | if (cb) { |
|
|||
22 | if (done) { |
|
|||
23 | cb(); |
|
|||
24 | } else { |
|
|||
25 | const _prev = cleanup; |
|
|||
26 | cleanup = _prev === noop ? cb : () => (_prev(), cb()); |
|
|||
27 | } |
|
|||
28 | } |
|
|||
29 | }; |
|
|||
30 |
|
||||
31 | const error = (err: unknown) => finalize() && reject(err); |
|
|||
32 |
|
||||
33 | if (ct.isSupported()) { |
|
|||
34 | const h = ct.register(error); |
|
|||
35 | teardown(() => h.destroy()); |
|
|||
36 | } |
|
|||
37 |
|
||||
38 | return { finalize, isClosed, teardown, error }; |
|
|||
39 | }; |
|
|||
40 |
|
14 | |||
41 | type FusedSink<T> = Omit<Sink<T>, "isClosed">; |
|
15 | type FusedSink<T> = Omit<Sink<T>, "isClosed">; | |
42 |
|
16 | |||
43 |
export const fuse = <T>(producer: Producer<T>) => |
|
17 | export const fuse = <T>(producer: Producer<T>) => | |
44 |
|
18 | (sink: FusedSink<T>, subscription: Subscription): Unsubscribable => { | ||
45 | const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); |
|
|||
46 | const { teardown, isClosed, finalize, error } = subscription(reject); |
|
|||
47 |
|
19 | |||
48 | const next = consumer.next ? consumer.next.bind(consumer) : noop; |
|
20 | subscription.teardown(producer({ | |
49 |
|
21 | next: subscription.around(sink.next), | ||
50 | teardown(producer({ |
|
22 | complete: () => subscription.finalize() && sink.complete(), | |
51 | next: next !== noop ? (v => isClosed() || next(v)) : noop, |
|
23 | error: err => subscription.finalize() && sink.error(err), | |
52 | error, |
|
24 | isClosed: subscription.isClosed.bind(subscription) | |
53 | complete: () => finalize() && consumer.complete && consumer.complete(), |
|
|||
54 | isClosed |
|
|||
55 | })); |
|
25 | })); | |
56 |
|
26 | |||
57 |
return |
|
27 | return subscription; | |
58 | }; |
|
28 | }; | |
59 |
|
29 | |||
60 |
type FusedProducer<T> = ( |
|
30 | type FusedProducer<T> = (sink: FusedSink<T>, subscription: Subscription) => void; | |
61 |
|
31 | |||
62 | export class ObservableImpl<T> implements Observable<T> { |
|
32 | export class ObservableImpl<T> implements Observable<T> { | |
63 |
|
33 | |||
64 | private readonly _producer: FusedProducer<T>; |
|
34 | private readonly _producer: FusedProducer<T>; | |
65 |
|
35 | |||
66 | constructor(producer: FusedProducer<T>) { |
|
36 | constructor(producer: FusedProducer<T>) { | |
67 | this._producer = producer; |
|
37 | this._producer = producer; | |
68 | } |
|
38 | } | |
69 |
|
39 | |||
70 | subscribe(consumer: Observer<T> = {}) { |
|
40 | subscribe(consumer: Observer<T> = {}): Unsubscribable { | |
71 |
const |
|
41 | const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); | |
72 | const next = consumer.next ? consumer.next.bind(consumer) : noop; |
|
42 | const next = consumer.next ? consumer.next.bind(consumer) : noop; | |
73 |
|
43 | const complete = () => consumer.complete ? consumer.complete() : void (0); | ||
74 | const { teardown, isClosed, finalize, error } = subscription(reject); |
|
|||
75 |
|
44 | |||
76 | teardown((0, this._producer)({ |
|
45 | const subscription = new SubscriptionImpl(); | |
77 | next: next !== noop ? (v => isClosed() || next(v)) : noop, |
|
|||
78 | error, |
|
|||
79 | complete: () => finalize() && consumer.complete && consumer.complete() |
|
|||
80 | })); |
|
|||
81 |
|
46 | |||
82 | return { unsubscribe: finalize }; |
|
47 | this._producer({ next, error, complete }, subscription); | |
|
48 | ||||
|
49 | return subscription; | |||
83 | } |
|
50 | } | |
84 |
|
51 | |||
85 | map<T2>(mapper: (value: T) => T2) { |
|
52 | map<T2>(mapper: (value: T) => T2) { | |
86 | return new ObservableImpl<T2>(({ next, ...rest }) => |
|
53 | return new ObservableImpl<T2>(({ next, ...rest }, subscription) => | |
87 |
this._producer( |
|
54 | (0, this._producer)( | |
|
55 | { | |||
88 | next: next !== noop ? (v: T) => next(mapper(v)) : noop, |
|
56 | next: next !== noop ? (v: T) => next(mapper(v)) : noop, | |
89 | ...rest |
|
57 | ...rest | |
90 | })); |
|
58 | }, subscription | |
|
59 | )); | |||
91 | } |
|
60 | } | |
92 |
|
61 | |||
93 | tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) { |
|
62 | tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) { | |
94 | return new ObservableImpl<T>(({ next, complete, error }) => |
|
63 | return new ObservableImpl<T>(({ next, complete, error }, subscription) => | |
95 |
this._producer( |
|
64 | (0, this._producer)( | |
|
65 | { | |||
96 | next: tapNext ? (v => (tapNext(v), next(v))) : next, |
|
66 | next: tapNext ? (v => (tapNext(v), next(v))) : next, | |
97 | complete: tapComplete ? (() => (tapComplete(), complete())) : complete, |
|
67 | complete: tapComplete ? (() => (tapComplete(), complete())) : complete, | |
98 | error: tapError ? (e => (tapError(e), error(e))) : error |
|
68 | error: tapError ? (e => (tapError(e), error(e))) : error | |
99 |
} |
|
69 | }, | |
|
70 | subscription | |||
|
71 | )); | |||
100 | } |
|
72 | } | |
101 |
|
73 | |||
102 | filter(predicate: Predicate<T>) { |
|
74 | filter(predicate: Predicate<T>) { | |
103 | return new ObservableImpl<T>(({ next, ...rest }) => |
|
75 | return new ObservableImpl<T>(({ next, ...rest }, subscription) => | |
104 |
this._producer( |
|
76 | (0, this._producer)( | |
|
77 | { | |||
105 | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, |
|
78 | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, | |
106 | ...rest |
|
79 | ...rest | |
107 |
} |
|
80 | }, | |
108 | ); |
|
81 | subscription | |
|
82 | )); | |||
109 | } |
|
83 | } | |
110 |
|
84 | |||
111 | until(predicate: Predicate<T>) { |
|
85 | until(predicate: Predicate<T>) { | |
112 | return new ObservableImpl<T>(({ next, complete, ...rest }) => |
|
86 | return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) => | |
113 | this._producer({ |
|
87 | (0, this._producer)({ | |
114 | next: v => predicate(v) ? complete() : next(v), |
|
88 | next: v => predicate(v) ? subscription.finalize() && complete() : next(v), | |
115 | complete, |
|
89 | complete, | |
116 | ...rest |
|
90 | ...rest | |
117 | }) |
|
91 | }, subscription) | |
118 | ); |
|
92 | ); | |
119 | } |
|
93 | } | |
120 |
|
94 | |||
121 | while(predicate: Predicate<T>) { |
|
95 | while(predicate: Predicate<T>) { | |
122 | return new ObservableImpl<T>(({ next, complete, ...rest }) => |
|
96 | return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) => | |
123 | this._producer({ |
|
97 | (0, this._producer)({ | |
124 | next: v => predicate(v) ? next(v) : complete(), |
|
98 | next: v => predicate(v) ? next(v) : subscription.finalize() && complete(), | |
125 | complete, |
|
99 | complete, | |
126 | ...rest |
|
100 | ...rest | |
127 | }) |
|
101 | }, subscription) | |
128 | ); |
|
102 | ); | |
129 | } |
|
103 | } | |
130 |
|
104 | |||
131 | scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>; |
|
105 | scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>; | |
132 | scan(accumulator: AccumulatorFn<T, T>): Observable<T>; |
|
106 | scan(accumulator: AccumulatorFn<T, T>): Observable<T>; | |
133 | scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { |
|
107 | scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { | |
134 | if (args.length === 1) { |
|
108 | if (args.length === 1) { | |
135 | return new ObservableImpl<T | A>(({ next, ...rest }) => { |
|
|||
136 |
|
|
109 | const [accumulator] = args; | |
|
110 | ||||
|
111 | return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => { | |||
137 | let _acc: T; |
|
112 | let _acc: T; | |
138 | let index = 0; |
|
113 | let index = 0; | |
139 |
|
|
114 | (0, this._producer)({ | |
140 |
next: next !== noop ? (v: T) => next( |
|
115 | next: next !== noop ? (v: T) => next( | |
|
116 | index++ === 0 ? _acc = v : _acc = accumulator(_acc, v) | |||
|
117 | ) : noop, | |||
141 | ...rest |
|
118 | ...rest | |
142 | }); |
|
119 | }, subscription); | |
143 | }); |
|
120 | }); | |
144 | } else { |
|
121 | } else { | |
145 | return new ObservableImpl<T | A>(({ next, ...rest }) => { |
|
|||
146 |
|
|
122 | const [accumulator, initial] = args; | |
|
123 | return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => { | |||
147 | let _acc = initial; |
|
124 | let _acc = initial; | |
148 |
|
|
125 | (0, this._producer)({ | |
149 | next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, |
|
126 | next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, | |
150 | ...rest |
|
127 | ...rest | |
151 | }); |
|
128 | }, subscription); | |
152 | }); |
|
129 | }); | |
153 | } |
|
130 | } | |
154 | } |
|
131 | } | |
155 |
|
132 | |||
156 | reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>; |
|
133 | reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>; | |
157 | reduce(accumulator: AccumulatorFn<T, T>): Observable<T>; |
|
134 | reduce(accumulator: AccumulatorFn<T, T>): Observable<T>; | |
158 | reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { |
|
135 | reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { | |
159 |
|
136 | |||
160 | if (args.length === 1) { |
|
137 | if (args.length === 1) { | |
161 | return new ObservableImpl<T | A>(({ next, complete, error }) => { |
|
|||
162 |
|
|
138 | const [accumulator] = args; | |
|
139 | return new ObservableImpl<T>(({ next, complete, error }, subscription) => { | |||
163 | let _acc: T; |
|
140 | let _acc: T; | |
164 | let index = 0; |
|
141 | let index = 0; | |
165 |
|
|
142 | (0, this._producer)({ | |
166 | next: next !== noop ? (v: T) => { |
|
143 | next: next !== noop ? (v: T) => { | |
167 | _acc = index++ === 0 ? v : accumulator(_acc, v); |
|
144 | _acc = index++ === 0 ? v : accumulator(_acc, v); | |
168 | } : noop, |
|
145 | } : noop, | |
169 | complete: () => { |
|
146 | complete: () => { | |
170 | if (index === 0) { |
|
147 | if (index === 0) { | |
171 | error(new Error("The sequence can't be empty")); |
|
148 | error(new Error("The sequence can't be empty")); | |
172 | } else { |
|
149 | } else { | |
173 | next(_acc); |
|
150 | next(_acc); | |
174 | complete(); |
|
151 | complete(); | |
175 | } |
|
152 | } | |
176 | }, |
|
153 | }, | |
177 | error |
|
154 | error | |
178 | }); |
|
155 | }, subscription); | |
179 | }); |
|
156 | }); | |
180 | } else { |
|
157 | } else { | |
181 | return new ObservableImpl<T | A>(({ next, complete, error }) => { |
|
|||
182 |
|
|
158 | const [accumulator, initial] = args; | |
|
159 | return new ObservableImpl<A>(({ next, complete, error }, subscription) => { | |||
183 | let _acc = initial; |
|
160 | let _acc = initial; | |
184 |
|
|
161 | (0, this._producer)({ | |
185 | next: next !== noop ? (v: T) => { |
|
162 | next: next !== noop ? (v: T) => { | |
186 | _acc = accumulator(_acc, v); |
|
163 | _acc = accumulator(_acc, v); | |
187 | } : noop, |
|
164 | } : noop, | |
188 | complete: () => { |
|
165 | complete: () => { | |
189 | next(_acc); |
|
166 | next(_acc); | |
190 | complete(); |
|
167 | complete(); | |
191 | }, |
|
168 | }, | |
192 | error |
|
169 | error | |
193 | }); |
|
170 | }, subscription); | |
194 | }); |
|
171 | }); | |
195 | } |
|
172 | } | |
196 | } |
|
173 | } | |
197 |
|
174 | |||
198 | cat(...seq: Subscribable<T>[]) { |
|
175 | cat(...seq: Subscribable<T>[]) { | |
199 | return new ObservableImpl<T>(({ next, complete: final, ...rest }) => { |
|
176 | return new ObservableImpl<T>(({ next, complete: final, ...rest }, subscription) => { | |
200 | let cleanup: () => void; |
|
|||
201 | const len = seq.length; |
|
177 | const len = seq.length; | |
202 | const complete = (i: number) => i < len ? |
|
178 | const complete = (i: number) => i < len ? | |
203 | () => { |
|
179 | () => { | |
204 |
|
|
180 | h = seq[i].subscribe({ next, complete: complete(i + 1), ...rest }); | |
205 |
|
|
181 | } : () => subscription.finalize() && final(); | |
206 | } : final; |
|
182 | ||
207 |
|
183 | |||
208 |
|
|
184 | let h = this.subscribe({ next, complete: complete(0), ...rest }); | |
|
185 | subscription.teardown(() => h.unsubscribe()); | |||
209 |
|
186 | |||
210 | return () => cleanup(); |
|
|||
211 | }); |
|
187 | }); | |
212 | } |
|
188 | } | |
213 |
|
189 | |||
214 | pipe<U>(op: OperatorFn<T, U>): Observable<U> { |
|
190 | pipe<U>(op: OperatorFn<T, U>): Observable<U> { | |
215 | return op(this); |
|
191 | return op(this); | |
216 | } |
|
192 | } | |
217 |
|
193 | |||
218 | next(ct = Cancellation.none) { |
|
194 | next(ct = Cancellation.none) { | |
219 | return new Promise<T>((resolve, reject) => { |
|
195 | return new Promise<T>((resolve, reject) => { | |
220 | ct.throwIfRequested(); |
|
196 | ct.throwIfRequested(); | |
221 |
|
197 | |||
222 | const { teardown, finalize, isClosed, error } = subscription(reject, ct); |
|
198 | const subscription = new SubscriptionImpl(); | |
223 | teardown((0, this._producer)({ |
|
199 | subscription.cancellable(ct, reject); | |
224 | next: (item: T) => finalize() && resolve(item), |
|
200 | ||
225 | complete: () => finalize() && reject(new Error("The sequence is empty")), |
|
201 | (0, this._producer)({ | |
226 |
|
|
202 | next: resolve, | |
227 | isClosed |
|
203 | complete: () => reject(new Error("The sequence is empty")), | |
228 | })); |
|
204 | error: reject, | |
|
205 | }, subscription); | |||
229 |
|
206 | |||
230 | }); |
|
207 | }); | |
231 | } |
|
208 | } | |
232 |
|
209 | |||
233 | collect(ct = Cancellation.none) { |
|
210 | collect(ct = Cancellation.none) { | |
234 | return new Promise<T[]>((resolve, reject) => { |
|
211 | return new Promise<T[]>((resolve, reject) => { | |
235 | ct.throwIfRequested(); |
|
212 | ct.throwIfRequested(); | |
236 |
|
213 | |||
|
214 | const subscription = new SubscriptionImpl(); | |||
|
215 | subscription.cancellable(ct, reject); | |||
|
216 | ||||
237 | const acc: T[] = []; |
|
217 | const acc: T[] = []; | |
238 |
|
218 | |||
239 | const { teardown, finalize, isClosed, error } = subscription(reject, ct); |
|
219 | (0, this._producer)({ | |
240 | teardown((0, this._producer)({ |
|
220 | next: (item: T) => acc.push(item), | |
241 | next: (item: T) => isClosed() ? void (0) : acc.push(item), |
|
221 | complete: () => resolve(acc), | |
242 | complete: () => finalize() && resolve(acc), |
|
222 | error: reject | |
243 | error, |
|
223 | }, subscription); | |
244 | isClosed |
|
|||
245 | })); |
|
|||
246 | }); |
|
224 | }); | |
247 | } |
|
225 | } | |
248 |
|
226 | |||
249 | forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> { |
|
227 | forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> { | |
250 | return new Promise<void>((resolve, reject) => { |
|
228 | return new Promise<void>((resolve, reject) => { | |
251 | ct.throwIfRequested(); |
|
229 | ct.throwIfRequested(); | |
252 |
|
230 | |||
253 | const { teardown, finalize, isClosed, error } = subscription(reject, ct); |
|
231 | const subscription = new SubscriptionImpl(); | |
|
232 | subscription.cancellable(ct, reject); | |||
254 |
|
233 | |||
255 |
|
|
234 | (0, this._producer)({ | |
256 | next: item => !isClosed() && next(item), |
|
235 | next, | |
257 |
complete: |
|
236 | complete: resolve, | |
258 |
error |
|
237 | error: reject | |
259 | isClosed |
|
238 | }, subscription); | |
260 | })); |
|
|||
261 | }); |
|
239 | }); | |
262 | } |
|
240 | } | |
263 |
|
241 | |||
264 | } No newline at end of file |
|
242 | } |
General Comments 0
You need to be logged in to leave comments.
Login now