| @@ -0,0 +1,58 | |||||
|
|
1 | import { ICancellation } from "@implab/core-amd/interfaces"; | |||
|
|
2 | ||||
|
|
3 | export interface Subscription { | |||
|
|
4 | finalize(): boolean; | |||
|
|
5 | ||||
|
|
6 | isClosed(): boolean; | |||
|
|
7 | ||||
|
|
8 | teardown(cleanup: void | (() => void)): void; | |||
|
|
9 | ||||
|
|
10 | around<T>(next: (item: T) => void): (item: T) => void; | |||
|
|
11 | ||||
|
|
12 | unsubscribe(): void; | |||
|
|
13 | } | |||
|
|
14 | ||||
|
|
15 | const noop = () => {}; | |||
|
|
16 | ||||
|
|
17 | ||||
|
|
18 | export class SubscriptionImpl implements Subscription { | |||
|
|
19 | ||||
|
|
20 | private _done = false; | |||
|
|
21 | ||||
|
|
22 | private _cleanup = noop; | |||
|
|
23 | ||||
|
|
24 | finalize() { | |||
|
|
25 | return this._done ? false : ((0,this._cleanup)(), this._done = true); | |||
|
|
26 | } | |||
|
|
27 | ||||
|
|
28 | isClosed() { | |||
|
|
29 | return this._done; | |||
|
|
30 | } | |||
|
|
31 | ||||
|
|
32 | teardown(cleanup: void | (() => void)) { | |||
|
|
33 | if (cleanup) { | |||
|
|
34 | if (this._done) { | |||
|
|
35 | cleanup(); | |||
|
|
36 | } else { | |||
|
|
37 | const _prev = this._cleanup; | |||
|
|
38 | this._cleanup = _prev === noop ? cleanup : () => (_prev(), cleanup()); | |||
|
|
39 | } | |||
|
|
40 | } | |||
|
|
41 | } | |||
|
|
42 | ||||
|
|
43 | cancellable(ct: ICancellation, reject: (err: unknown) => void) { | |||
|
|
44 | if (ct.isSupported()) { | |||
|
|
45 | const h = ct.register(reason => this.finalize() && reject(reason)); | |||
|
|
46 | this.teardown(() => h.destroy()); | |||
|
|
47 | } | |||
|
|
48 | } | |||
|
|
49 | ||||
|
|
50 | unsubscribe() { | |||
|
|
51 | this.finalize(); | |||
|
|
52 | } | |||
|
|
53 | ||||
|
|
54 | around<T>(next: (item: T) => void): (item: T) => void { | |||
|
|
55 | return (item) => this._done ? void(0) : next(item); | |||
|
|
56 | } | |||
|
|
57 | ||||
|
|
58 | } No newline at end of file | |||
| @@ -1,69 +1,69 | |||||
| 1 | import { isPromise } from "@implab/core-amd/safe"; |
|
1 | import { isPromise } from "@implab/core-amd/safe"; | |
| 2 | import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces"; |
|
2 | import { Unsubscribable, Producer, Observable, Subscribable } from "./observable/interfaces"; | |
| 3 | import { ObservableImpl } from "./observable/ObservableImpl"; |
|
3 | import { ObservableImpl, fuse } from "./observable/ObservableImpl"; | |
| 4 |
|
4 | |||
| 5 | export const isUnsubscribable = (v: unknown): v is Unsubscribable => |
|
5 | export const isUnsubscribable = (v: unknown): v is Unsubscribable => | |
| 6 | v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; |
|
6 | v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function"; | |
| 7 |
|
7 | |||
| 8 | export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> => |
|
8 | export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> => | |
| 9 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; |
|
9 | v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function"; | |
| 10 |
|
10 | |||
| 11 | export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(producer); |
|
11 | export const observe = <T>(producer: Producer<T>): Observable<T> => new ObservableImpl<T>(fuse(producer)); | |
| 12 |
|
12 | |||
| 13 | /** Converts an array to the observable sequence of its elements. */ |
|
13 | /** Converts an array to the observable sequence of its elements. */ | |
| 14 | export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>( |
|
14 | export const ofArray = <T>(items: T[]): Observable<T> => new ObservableImpl<T>( | |
| 15 | ({ next, complete }) => ( |
|
15 | ({ next, complete }) => ( | |
| 16 | items.forEach(next), |
|
16 | items.forEach(next), | |
| 17 | complete() |
|
17 | complete() | |
| 18 | ) |
|
18 | ) | |
| 19 | ); |
|
19 | ); | |
| 20 |
|
20 | |||
| 21 | /** Converts a subscribable to the observable */ |
|
21 | /** Converts a subscribable to the observable */ | |
| 22 | export const ofSubscribable = <T>(subscribable: Subscribable<T>) => |
|
22 | export const ofSubscribable = <T>(subscribable: Subscribable<T>) => | |
| 23 | observe<T>(sink => { |
|
23 | observe<T>(sink => { | |
| 24 | const subscription = subscribable.subscribe(sink); |
|
24 | const subscription = subscribable.subscribe(sink); | |
| 25 | return () => subscription.unsubscribe(); |
|
25 | return () => subscription.unsubscribe(); | |
| 26 | }); |
|
26 | }); | |
| 27 |
|
27 | |||
| 28 | const of1 = <T>(item: T | PromiseLike<T>) => observe<T>( |
|
28 | const of1 = <T>(item: T | PromiseLike<T>) => observe<T>( | |
| 29 | ({ next, error, complete }) => |
|
29 | ({ next, error, complete }) => | |
| 30 | isPromise(item) ? |
|
30 | isPromise(item) ? | |
| 31 | void item.then( |
|
31 | void item.then( | |
| 32 | v => (next(v), complete()), |
|
32 | v => (next(v), complete()), | |
| 33 | error |
|
33 | error | |
| 34 | ) : |
|
34 | ) : | |
| 35 | (next(item), complete()) |
|
35 | (next(item), complete()) | |
| 36 | ); |
|
36 | ); | |
| 37 |
|
37 | |||
| 38 | /** Converts a list of parameter values to the observable sequence. The |
|
38 | /** Converts a list of parameter values to the observable sequence. The | |
| 39 | * order of elements in the list will be preserved in the resulting sequence. |
|
39 | * order of elements in the list will be preserved in the resulting sequence. | |
| 40 | */ |
|
40 | */ | |
| 41 | export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ? |
|
41 | export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ? | |
| 42 | of1(items[0]) : |
|
42 | of1(items[0]) : | |
| 43 | observe<T>( |
|
43 | observe<T>( | |
| 44 | ({ next, error, complete, isClosed }) => { |
|
44 | ({ next, error, complete, isClosed }) => { | |
| 45 | const n = items.length; |
|
45 | const n = items.length; | |
| 46 |
|
46 | |||
| 47 | const _next = (start: number) => { |
|
47 | const _next = (start: number) => { | |
| 48 | if (start > 0 && isClosed()) // when resumed |
|
48 | if (start > 0 && isClosed()) // when resumed | |
| 49 | return; |
|
49 | return; | |
| 50 |
|
50 | |||
| 51 | for (let i = start; i < n; i++) { |
|
51 | for (let i = start; i < n; i++) { | |
| 52 | const r = items[i]; |
|
52 | const r = items[i]; | |
| 53 | if (isPromise(r)) { |
|
53 | if (isPromise(r)) { | |
| 54 | r.then(v => (next(v), _next(i + 1)), error); |
|
54 | r.then(v => (next(v), _next(i + 1)), error); | |
| 55 | return; // suspend |
|
55 | return; // suspend | |
| 56 | } else { |
|
56 | } else { | |
| 57 | next(r); |
|
57 | next(r); | |
| 58 | } |
|
58 | } | |
| 59 | } |
|
59 | } | |
| 60 | complete(); |
|
60 | complete(); | |
| 61 | }; |
|
61 | }; | |
| 62 |
|
62 | |||
| 63 | _next(0); |
|
63 | _next(0); | |
| 64 | } |
|
64 | } | |
| 65 | ); |
|
65 | ); | |
| 66 |
|
66 | |||
| 67 | export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete()); |
|
67 | export const empty: Observable<never> = new ObservableImpl<never>(({ complete }) => complete()); | |
| 68 |
|
68 | |||
| 69 | export type * from "./observable/interfaces"; No newline at end of file |
|
69 | export type * from "./observable/interfaces"; | |
| @@ -1,264 +1,242 | |||||
| 1 | import { id as mid } from "module"; |
|
1 | import { id as mid } from "module"; | |
| 2 | import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces"; |
|
2 | import { AccumulatorFn, Observable, Observer, OperatorFn, Producer, Sink, Subscribable, Unsubscribable } from "./interfaces"; | |
| 3 | import { TraceSource } from "@implab/core-amd/log/TraceSource"; |
|
3 | import { TraceSource } from "@implab/core-amd/log/TraceSource"; | |
| 4 | import { Predicate } from "@implab/core-amd/interfaces"; |
|
4 | import { Predicate } from "@implab/core-amd/interfaces"; | |
| 5 | import { Cancellation } from "@implab/core-amd/Cancellation"; |
|
5 | import { Cancellation } from "@implab/core-amd/Cancellation"; | |
|
|
6 | import { Subscription, SubscriptionImpl } from "./SubscriptionImpl"; | |||
| 6 |
|
7 | |||
| 7 | const trace = TraceSource.get(mid); |
|
8 | const trace = TraceSource.get(mid); | |
| 8 |
|
9 | |||
| 9 | const noop = () => { }; |
|
10 | const noop = () => { }; | |
| 10 |
|
11 | |||
| 11 | const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); |
|
12 | const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e); | |
| 12 |
|
13 | |||
| 13 | const subscription = (reject: (err: unknown) => void, ct = Cancellation.none) => { |
|
|||
| 14 | let done = false; |
|
|||
| 15 | let cleanup = noop; |
|
|||
| 16 |
|
||||
| 17 | const finalize = () => done ? false : (cleanup(), done = true); |
|
|||
| 18 | const isClosed = () => done; |
|
|||
| 19 |
|
||||
| 20 | const teardown = (cb: void | (() => void)) => { |
|
|||
| 21 | if (cb) { |
|
|||
| 22 | if (done) { |
|
|||
| 23 | cb(); |
|
|||
| 24 | } else { |
|
|||
| 25 | const _prev = cleanup; |
|
|||
| 26 | cleanup = _prev === noop ? cb : () => (_prev(), cb()); |
|
|||
| 27 | } |
|
|||
| 28 | } |
|
|||
| 29 | }; |
|
|||
| 30 |
|
||||
| 31 | const error = (err: unknown) => finalize() && reject(err); |
|
|||
| 32 |
|
||||
| 33 | if (ct.isSupported()) { |
|
|||
| 34 | const h = ct.register(error); |
|
|||
| 35 | teardown(() => h.destroy()); |
|
|||
| 36 | } |
|
|||
| 37 |
|
||||
| 38 | return { finalize, isClosed, teardown, error }; |
|
|||
| 39 | }; |
|
|||
| 40 |
|
14 | |||
| 41 | type FusedSink<T> = Omit<Sink<T>, "isClosed">; |
|
15 | type FusedSink<T> = Omit<Sink<T>, "isClosed">; | |
| 42 |
|
16 | |||
| 43 |
export const fuse = <T>(producer: Producer<T>) => |
|
17 | export const fuse = <T>(producer: Producer<T>) => | |
| 44 |
|
18 | (sink: FusedSink<T>, subscription: Subscription): Unsubscribable => { | ||
| 45 | const reject = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); |
|
|||
| 46 | const { teardown, isClosed, finalize, error } = subscription(reject); |
|
|||
| 47 |
|
||||
| 48 | const next = consumer.next ? consumer.next.bind(consumer) : noop; |
|
|||
| 49 |
|
19 | |||
| 50 | teardown(producer({ |
|
20 | subscription.teardown(producer({ | |
| 51 | next: next !== noop ? (v => isClosed() || next(v)) : noop, |
|
21 | next: subscription.around(sink.next), | |
| 52 | error, |
|
22 | complete: () => subscription.finalize() && sink.complete(), | |
| 53 | complete: () => finalize() && consumer.complete && consumer.complete(), |
|
23 | error: err => subscription.finalize() && sink.error(err), | |
| 54 | isClosed |
|
24 | isClosed: subscription.isClosed.bind(subscription) | |
| 55 | })); |
|
25 | })); | |
| 56 |
|
26 | |||
| 57 |
return |
|
27 | return subscription; | |
| 58 | }; |
|
28 | }; | |
| 59 |
|
29 | |||
| 60 |
type FusedProducer<T> = ( |
|
30 | type FusedProducer<T> = (sink: FusedSink<T>, subscription: Subscription) => void; | |
| 61 |
|
31 | |||
| 62 | export class ObservableImpl<T> implements Observable<T> { |
|
32 | export class ObservableImpl<T> implements Observable<T> { | |
| 63 |
|
33 | |||
| 64 | private readonly _producer: FusedProducer<T>; |
|
34 | private readonly _producer: FusedProducer<T>; | |
| 65 |
|
35 | |||
| 66 | constructor(producer: FusedProducer<T>) { |
|
36 | constructor(producer: FusedProducer<T>) { | |
| 67 | this._producer = producer; |
|
37 | this._producer = producer; | |
| 68 | } |
|
38 | } | |
| 69 |
|
39 | |||
| 70 | subscribe(consumer: Observer<T> = {}) { |
|
40 | subscribe(consumer: Observer<T> = {}): Unsubscribable { | |
| 71 |
const |
|
41 | const error = (err: unknown) => consumer.error ? consumer.error(err) : errorFallback(err); | |
| 72 | const next = consumer.next ? consumer.next.bind(consumer) : noop; |
|
42 | const next = consumer.next ? consumer.next.bind(consumer) : noop; | |
| 73 |
|
43 | const complete = () => consumer.complete ? consumer.complete() : void (0); | ||
| 74 | const { teardown, isClosed, finalize, error } = subscription(reject); |
|
|||
| 75 |
|
44 | |||
| 76 | teardown((0, this._producer)({ |
|
45 | const subscription = new SubscriptionImpl(); | |
| 77 | next: next !== noop ? (v => isClosed() || next(v)) : noop, |
|
|||
| 78 | error, |
|
|||
| 79 | complete: () => finalize() && consumer.complete && consumer.complete() |
|
|||
| 80 | })); |
|
|||
| 81 |
|
46 | |||
| 82 | return { unsubscribe: finalize }; |
|
47 | this._producer({ next, error, complete }, subscription); | |
|
|
48 | ||||
|
|
49 | return subscription; | |||
| 83 | } |
|
50 | } | |
| 84 |
|
51 | |||
| 85 | map<T2>(mapper: (value: T) => T2) { |
|
52 | map<T2>(mapper: (value: T) => T2) { | |
| 86 | return new ObservableImpl<T2>(({ next, ...rest }) => |
|
53 | return new ObservableImpl<T2>(({ next, ...rest }, subscription) => | |
| 87 |
this._producer( |
|
54 | (0, this._producer)( | |
| 88 | next: next !== noop ? (v: T) => next(mapper(v)) : noop, |
|
55 | { | |
| 89 | ...rest |
|
56 | next: next !== noop ? (v: T) => next(mapper(v)) : noop, | |
| 90 | })); |
|
57 | ...rest | |
|
|
58 | }, subscription | |||
|
|
59 | )); | |||
| 91 | } |
|
60 | } | |
| 92 |
|
61 | |||
| 93 | tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) { |
|
62 | tap({ next: tapNext, complete: tapComplete, error: tapError }: Observer<T>) { | |
| 94 | return new ObservableImpl<T>(({ next, complete, error }) => |
|
63 | return new ObservableImpl<T>(({ next, complete, error }, subscription) => | |
| 95 |
this._producer( |
|
64 | (0, this._producer)( | |
| 96 | next: tapNext ? (v => (tapNext(v), next(v))) : next, |
|
65 | { | |
| 97 |
|
|
66 | next: tapNext ? (v => (tapNext(v), next(v))) : next, | |
| 98 |
|
|
67 | complete: tapComplete ? (() => (tapComplete(), complete())) : complete, | |
| 99 | })); |
|
68 | error: tapError ? (e => (tapError(e), error(e))) : error | |
|
|
69 | }, | |||
|
|
70 | subscription | |||
|
|
71 | )); | |||
| 100 | } |
|
72 | } | |
| 101 |
|
73 | |||
| 102 | filter(predicate: Predicate<T>) { |
|
74 | filter(predicate: Predicate<T>) { | |
| 103 | return new ObservableImpl<T>(({ next, ...rest }) => |
|
75 | return new ObservableImpl<T>(({ next, ...rest }, subscription) => | |
| 104 |
this._producer( |
|
76 | (0, this._producer)( | |
| 105 | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, |
|
77 | { | |
| 106 | ...rest |
|
78 | next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop, | |
| 107 | }) |
|
79 | ...rest | |
| 108 | ); |
|
80 | }, | |
|
|
81 | subscription | |||
|
|
82 | )); | |||
| 109 | } |
|
83 | } | |
| 110 |
|
84 | |||
| 111 | until(predicate: Predicate<T>) { |
|
85 | until(predicate: Predicate<T>) { | |
| 112 | return new ObservableImpl<T>(({ next, complete, ...rest }) => |
|
86 | return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) => | |
| 113 | this._producer({ |
|
87 | (0, this._producer)({ | |
| 114 | next: v => predicate(v) ? complete() : next(v), |
|
88 | next: v => predicate(v) ? subscription.finalize() && complete() : next(v), | |
| 115 | complete, |
|
89 | complete, | |
| 116 | ...rest |
|
90 | ...rest | |
| 117 | }) |
|
91 | }, subscription) | |
| 118 | ); |
|
92 | ); | |
| 119 | } |
|
93 | } | |
| 120 |
|
94 | |||
| 121 | while(predicate: Predicate<T>) { |
|
95 | while(predicate: Predicate<T>) { | |
| 122 | return new ObservableImpl<T>(({ next, complete, ...rest }) => |
|
96 | return new ObservableImpl<T>(({ next, complete, ...rest }, subscription) => | |
| 123 | this._producer({ |
|
97 | (0, this._producer)({ | |
| 124 | next: v => predicate(v) ? next(v) : complete(), |
|
98 | next: v => predicate(v) ? next(v) : subscription.finalize() && complete(), | |
| 125 | complete, |
|
99 | complete, | |
| 126 | ...rest |
|
100 | ...rest | |
| 127 | }) |
|
101 | }, subscription) | |
| 128 | ); |
|
102 | ); | |
| 129 | } |
|
103 | } | |
| 130 |
|
104 | |||
| 131 | scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>; |
|
105 | scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>; | |
| 132 | scan(accumulator: AccumulatorFn<T, T>): Observable<T>; |
|
106 | scan(accumulator: AccumulatorFn<T, T>): Observable<T>; | |
| 133 | scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { |
|
107 | scan<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { | |
| 134 | if (args.length === 1) { |
|
108 | if (args.length === 1) { | |
| 135 | return new ObservableImpl<T | A>(({ next, ...rest }) => { |
|
109 | const [accumulator] = args; | |
| 136 | const [accumulator] = args; |
|
110 | ||
|
|
111 | return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => { | |||
| 137 | let _acc: T; |
|
112 | let _acc: T; | |
| 138 | let index = 0; |
|
113 | let index = 0; | |
| 139 |
|
|
114 | (0, this._producer)({ | |
| 140 |
next: next !== noop ? (v: T) => next( |
|
115 | next: next !== noop ? (v: T) => next( | |
|
|
116 | index++ === 0 ? _acc = v : _acc = accumulator(_acc, v) | |||
|
|
117 | ) : noop, | |||
| 141 | ...rest |
|
118 | ...rest | |
| 142 | }); |
|
119 | }, subscription); | |
| 143 | }); |
|
120 | }); | |
| 144 | } else { |
|
121 | } else { | |
| 145 | return new ObservableImpl<T | A>(({ next, ...rest }) => { |
|
122 | const [accumulator, initial] = args; | |
| 146 | const [accumulator, initial] = args; |
|
123 | return new ObservableImpl<T | A>(({ next, ...rest }, subscription) => { | |
| 147 | let _acc = initial; |
|
124 | let _acc = initial; | |
| 148 |
|
|
125 | (0, this._producer)({ | |
| 149 | next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, |
|
126 | next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop, | |
| 150 | ...rest |
|
127 | ...rest | |
| 151 | }); |
|
128 | }, subscription); | |
| 152 | }); |
|
129 | }); | |
| 153 | } |
|
130 | } | |
| 154 | } |
|
131 | } | |
| 155 |
|
132 | |||
| 156 | reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>; |
|
133 | reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>; | |
| 157 | reduce(accumulator: AccumulatorFn<T, T>): Observable<T>; |
|
134 | reduce(accumulator: AccumulatorFn<T, T>): Observable<T>; | |
| 158 | reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { |
|
135 | reduce<A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) { | |
| 159 |
|
136 | |||
| 160 | if (args.length === 1) { |
|
137 | if (args.length === 1) { | |
| 161 | return new ObservableImpl<T | A>(({ next, complete, error }) => { |
|
138 | const [accumulator] = args; | |
| 162 | const [accumulator] = args; |
|
139 | return new ObservableImpl<T>(({ next, complete, error }, subscription) => { | |
| 163 | let _acc: T; |
|
140 | let _acc: T; | |
| 164 | let index = 0; |
|
141 | let index = 0; | |
| 165 |
|
|
142 | (0, this._producer)({ | |
| 166 | next: next !== noop ? (v: T) => { |
|
143 | next: next !== noop ? (v: T) => { | |
| 167 | _acc = index++ === 0 ? v : accumulator(_acc, v); |
|
144 | _acc = index++ === 0 ? v : accumulator(_acc, v); | |
| 168 | } : noop, |
|
145 | } : noop, | |
| 169 | complete: () => { |
|
146 | complete: () => { | |
| 170 | if (index === 0) { |
|
147 | if (index === 0) { | |
| 171 | error(new Error("The sequence can't be empty")); |
|
148 | error(new Error("The sequence can't be empty")); | |
| 172 | } else { |
|
149 | } else { | |
| 173 | next(_acc); |
|
150 | next(_acc); | |
| 174 | complete(); |
|
151 | complete(); | |
| 175 | } |
|
152 | } | |
| 176 | }, |
|
153 | }, | |
| 177 | error |
|
154 | error | |
| 178 | }); |
|
155 | }, subscription); | |
| 179 | }); |
|
156 | }); | |
| 180 | } else { |
|
157 | } else { | |
| 181 | return new ObservableImpl<T | A>(({ next, complete, error }) => { |
|
158 | const [accumulator, initial] = args; | |
| 182 | const [accumulator, initial] = args; |
|
159 | return new ObservableImpl<A>(({ next, complete, error }, subscription) => { | |
| 183 | let _acc = initial; |
|
160 | let _acc = initial; | |
| 184 |
|
|
161 | (0, this._producer)({ | |
| 185 | next: next !== noop ? (v: T) => { |
|
162 | next: next !== noop ? (v: T) => { | |
| 186 | _acc = accumulator(_acc, v); |
|
163 | _acc = accumulator(_acc, v); | |
| 187 | } : noop, |
|
164 | } : noop, | |
| 188 | complete: () => { |
|
165 | complete: () => { | |
| 189 | next(_acc); |
|
166 | next(_acc); | |
| 190 | complete(); |
|
167 | complete(); | |
| 191 | }, |
|
168 | }, | |
| 192 | error |
|
169 | error | |
| 193 | }); |
|
170 | }, subscription); | |
| 194 | }); |
|
171 | }); | |
| 195 | } |
|
172 | } | |
| 196 | } |
|
173 | } | |
| 197 |
|
174 | |||
| 198 | cat(...seq: Subscribable<T>[]) { |
|
175 | cat(...seq: Subscribable<T>[]) { | |
| 199 | return new ObservableImpl<T>(({ next, complete: final, ...rest }) => { |
|
176 | return new ObservableImpl<T>(({ next, complete: final, ...rest }, subscription) => { | |
| 200 | let cleanup: () => void; |
|
|||
| 201 | const len = seq.length; |
|
177 | const len = seq.length; | |
| 202 | const complete = (i: number) => i < len ? |
|
178 | const complete = (i: number) => i < len ? | |
| 203 | () => { |
|
179 | () => { | |
| 204 |
|
|
180 | h = seq[i].subscribe({ next, complete: complete(i + 1), ...rest }); | |
| 205 |
|
|
181 | } : () => subscription.finalize() && final(); | |
| 206 | } : final; |
|
182 | ||
| 207 |
|
183 | |||
| 208 |
|
|
184 | let h = this.subscribe({ next, complete: complete(0), ...rest }); | |
|
|
185 | subscription.teardown(() => h.unsubscribe()); | |||
| 209 |
|
186 | |||
| 210 | return () => cleanup(); |
|
|||
| 211 | }); |
|
187 | }); | |
| 212 | } |
|
188 | } | |
| 213 |
|
189 | |||
| 214 | pipe<U>(op: OperatorFn<T, U>): Observable<U> { |
|
190 | pipe<U>(op: OperatorFn<T, U>): Observable<U> { | |
| 215 | return op(this); |
|
191 | return op(this); | |
| 216 | } |
|
192 | } | |
| 217 |
|
193 | |||
| 218 | next(ct = Cancellation.none) { |
|
194 | next(ct = Cancellation.none) { | |
| 219 | return new Promise<T>((resolve, reject) => { |
|
195 | return new Promise<T>((resolve, reject) => { | |
| 220 | ct.throwIfRequested(); |
|
196 | ct.throwIfRequested(); | |
| 221 |
|
197 | |||
| 222 | const { teardown, finalize, isClosed, error } = subscription(reject, ct); |
|
198 | const subscription = new SubscriptionImpl(); | |
| 223 | teardown((0, this._producer)({ |
|
199 | subscription.cancellable(ct, reject); | |
| 224 | next: (item: T) => finalize() && resolve(item), |
|
200 | ||
| 225 | complete: () => finalize() && reject(new Error("The sequence is empty")), |
|
201 | (0, this._producer)({ | |
| 226 |
|
|
202 | next: resolve, | |
| 227 | isClosed |
|
203 | complete: () => reject(new Error("The sequence is empty")), | |
| 228 | })); |
|
204 | error: reject, | |
|
|
205 | }, subscription); | |||
| 229 |
|
206 | |||
| 230 | }); |
|
207 | }); | |
| 231 | } |
|
208 | } | |
| 232 |
|
209 | |||
| 233 | collect(ct = Cancellation.none) { |
|
210 | collect(ct = Cancellation.none) { | |
| 234 | return new Promise<T[]>((resolve, reject) => { |
|
211 | return new Promise<T[]>((resolve, reject) => { | |
| 235 | ct.throwIfRequested(); |
|
212 | ct.throwIfRequested(); | |
| 236 |
|
213 | |||
|
|
214 | const subscription = new SubscriptionImpl(); | |||
|
|
215 | subscription.cancellable(ct, reject); | |||
|
|
216 | ||||
| 237 | const acc: T[] = []; |
|
217 | const acc: T[] = []; | |
| 238 |
|
218 | |||
| 239 | const { teardown, finalize, isClosed, error } = subscription(reject, ct); |
|
219 | (0, this._producer)({ | |
| 240 | teardown((0, this._producer)({ |
|
220 | next: (item: T) => acc.push(item), | |
| 241 | next: (item: T) => isClosed() ? void (0) : acc.push(item), |
|
221 | complete: () => resolve(acc), | |
| 242 | complete: () => finalize() && resolve(acc), |
|
222 | error: reject | |
| 243 | error, |
|
223 | }, subscription); | |
| 244 | isClosed |
|
|||
| 245 | })); |
|
|||
| 246 | }); |
|
224 | }); | |
| 247 | } |
|
225 | } | |
| 248 |
|
226 | |||
| 249 | forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> { |
|
227 | forEach(next: (item: T) => void, ct = Cancellation.none): Promise<void> { | |
| 250 | return new Promise<void>((resolve, reject) => { |
|
228 | return new Promise<void>((resolve, reject) => { | |
| 251 | ct.throwIfRequested(); |
|
229 | ct.throwIfRequested(); | |
| 252 |
|
230 | |||
| 253 | const { teardown, finalize, isClosed, error } = subscription(reject, ct); |
|
231 | const subscription = new SubscriptionImpl(); | |
|
|
232 | subscription.cancellable(ct, reject); | |||
| 254 |
|
233 | |||
| 255 |
|
|
234 | (0, this._producer)({ | |
| 256 | next: item => !isClosed() && next(item), |
|
235 | next, | |
| 257 |
complete: |
|
236 | complete: resolve, | |
| 258 |
error |
|
237 | error: reject | |
| 259 | isClosed |
|
238 | }, subscription); | |
| 260 | })); |
|
|||
| 261 | }); |
|
239 | }); | |
| 262 | } |
|
240 | } | |
| 263 |
|
241 | |||
| 264 | } No newline at end of file |
|
242 | } | |
General Comments 0
You need to be logged in to leave comments.
Login now
