##// END OF EJS Templates
added tap() method to observable...
cin -
r144:63215d91ae4b v1.9.0-rc6 default
parent child
Show More
@@ -1,411 +1,429
1 import { id as mid} from "module";
1 import { id as mid} from "module";
2 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { Cancellation } from "@implab/core-amd/Cancellation";
3 import { ICancellation } from "@implab/core-amd/interfaces";
3 import { ICancellation } from "@implab/core-amd/interfaces";
4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
4 import { TraceSource } from "@implab/core-amd/log/TraceSource";
5 import { isPromise } from "@implab/core-amd/safe";
5 import { isPromise } from "@implab/core-amd/safe";
6
6
7 const trace = TraceSource.get(mid);
7 const trace = TraceSource.get(mid);
8
8
9 /**
9 /**
10 * The interface for the consumer of an observable sequence
10 * The interface for the consumer of an observable sequence
11 */
11 */
12 export interface Observer<T> {
12 export interface Observer<T> {
13 /**
13 /**
14 * Called for the next element in the sequence
14 * Called for the next element in the sequence
15 */
15 */
16 next?(value: T): void;
16 next?(value: T): void;
17
17
18 /**
18 /**
19 * Called once when the error occurs in the sequence.
19 * Called once when the error occurs in the sequence.
20 */
20 */
21 error?(e: unknown): void;
21 error?(e: unknown): void;
22
22
23 /**
23 /**
24 * Called once at the end of the sequence.
24 * Called once at the end of the sequence.
25 */
25 */
26 complete?(): void;
26 complete?(): void;
27 }
27 }
28
28
29 /**
29 /**
30 * The group of functions to feed an observable. These methods are provided to
30 * The group of functions to feed an observable. These methods are provided to
31 * the producer to generate a stream of events.
31 * the producer to generate a stream of events.
32 */
32 */
33 export type Sink<T> = {
33 export type Sink<T> = {
34 /**
34 /**
35 * Call to send the next element in the sequence
35 * Call to send the next element in the sequence
36 */
36 */
37 next: (value: T) => void;
37 next: (value: T) => void;
38
38
39 /**
39 /**
40 * Call to notify about the error occurred in the sequence.
40 * Call to notify about the error occurred in the sequence.
41 */
41 */
42 error: (e: unknown) => void;
42 error: (e: unknown) => void;
43
43
44 /**
44 /**
45 * Call to signal the end of the sequence.
45 * Call to signal the end of the sequence.
46 */
46 */
47 complete: () => void;
47 complete: () => void;
48
48
49 /**
49 /**
50 * Checks whether the sink is accepting new elements. It's safe to
50 * Checks whether the sink is accepting new elements. It's safe to
51 * send elements to the closed sink.
51 * send elements to the closed sink.
52 */
52 */
53 isClosed: () => boolean;
53 isClosed: () => boolean;
54 };
54 };
55
55
56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
56 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
57
57
58 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
58 type FusedSink<T> = Omit<Sink<T>, "isClosed">;
59
59
60 type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
60 type FusedProducer<T> = (sink: FusedSink<T>) => (void | (() => void));
61
61
62 export interface Unsubscribable {
62 export interface Unsubscribable {
63 unsubscribe(): void;
63 unsubscribe(): void;
64 }
64 }
65
65
66 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
66 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
67 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
67 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
68
68
69 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
69 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
70 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
71
71
72 export interface Subscribable<T> {
72 export interface Subscribable<T> {
73 subscribe(consumer: Observer<T>): Unsubscribable;
73 /** Subscribes a consumer to events. If a consumer isn't specified
74 * this method activates the producer to achieve side affects if any.
75 */
76 subscribe(consumer?: Observer<T>): Unsubscribable;
74 }
77 }
75
78
76 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
79 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
77
80
78 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
81 export type OperatorFn<T, U> = (source: Observable<T>) => Observable<U>;
79
82
80 /** The observable source of items. */
83 /** The observable source of items. */
81 export interface Observable<T> extends Subscribable<T> {
84 export interface Observable<T> extends Subscribable<T> {
82 /** Transforms elements of the sequence with the specified mapper
85 /** Transforms elements of the sequence with the specified mapper
83 *
86 *
84 * @param mapper The mapper used to transform the values
87 * @param mapper The mapper used to transform the values
85 */
88 */
86 map<T2>(mapper: (value: T) => T2): Observable<T2>;
89 map<T2>(mapper: (value: T) => T2): Observable<T2>;
87
90
91 /** Injects the specified observer into the each producer to consumer chain.
92 * The method is used to add side effect to the events processing.
93 *
94 * @param observer The consumer for the events
95 */
96 tap(observer: Observer<T>): Observable<T>;
97
88 /** Filters elements of the sequence. The resulting sequence will
98 /** Filters elements of the sequence. The resulting sequence will
89 * contain only elements which match the specified predicate.
99 * contain only elements which match the specified predicate.
90 *
100 *
91 * @param predicate The filter predicate.
101 * @param predicate The filter predicate.
92 */
102 */
93 filter(predicate: (value: T) => boolean): Observable<T>;
103 filter(predicate: (value: T) => boolean): Observable<T>;
94
104
95 /** Completes the sequence once the condition is met.
105 /** Completes the sequence once the condition is met.
96 * @param predicate The condition which should be met to complete the sequence
106 * @param predicate The condition which should be met to complete the sequence
97 */
107 */
98 until(predicate: (value: T) => boolean): Observable<T>;
108 until(predicate: (value: T) => boolean): Observable<T>;
99
109
100 /** Keeps the sequence running while elements satisfy the condition.
110 /** Keeps the sequence running while elements satisfy the condition.
101 *
111 *
102 * @param predicate The condition which should be met to continue.
112 * @param predicate The condition which should be met to continue.
103 */
113 */
104 while(predicate: (value: T) => boolean): Observable<T>;
114 while(predicate: (value: T) => boolean): Observable<T>;
105
115
106 /** Applies accumulator to each value in the sequence and
116 /** Applies accumulator to each value in the sequence and
107 * emits the accumulated value for each source element
117 * emits the accumulated value for each source element
108 *
118 *
109 * @param accumulator
119 * @param accumulator
110 * @param initial
120 * @param initial
111 */
121 */
112 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
122 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
113 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
123 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
114
124
115 /** Applies accumulator to each value in the sequence and
125 /** Applies accumulator to each value in the sequence and
116 * emits the accumulated value at the end of the sequence
126 * emits the accumulated value at the end of the sequence
117 *
127 *
118 * @param accumulator
128 * @param accumulator
119 * @param initial
129 * @param initial
120 */
130 */
121 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
131 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
122 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
132 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
123
133
124 /** Concatenates the specified sequences with this observable
134 /** Concatenates the specified sequences with this observable
125 *
135 *
126 * @param seq sequences to concatenate with the current observable
136 * @param seq sequences to concatenate with the current observable
127 *
137 *
128 * The concatenation doesn't accumulate values from the specified sequences,
138 * The concatenation doesn't accumulate values from the specified sequences,
129 * The result of the concatenation is the new observable which will switch
139 * The result of the concatenation is the new observable which will switch
130 * to the next observable after the previous one completes. Values emitted
140 * to the next observable after the previous one completes. Values emitted
131 * before the next observable being active are lost.
141 * before the next observable being active are lost.
132 */
142 */
133 cat(...seq: Subscribable<T>[]): Observable<T>;
143 cat(...seq: Subscribable<T>[]): Observable<T>;
134
144
135
145
136 /** Pipes the specified operator to produce the new observable
146 /** Pipes the specified operator to produce the new observable
137 * @param op The operator consumes this observable and produces a new one
147 * @param op The operator consumes this observable and produces a new one
138 *
148 *
139 * The operator is a higher order function which takes a source observable
149 * The operator is a higher order function which takes a source observable
140 * and returns a producer for the new observable.
150 * and returns a producer for the new observable.
141 *
151 *
142 * This function can be used to create a complex mapping between source and
152 * This function can be used to create a complex mapping between source and
143 * resulting observables. The operator may have a state (or a side effect)
153 * resulting observables. The operator may have a state (or a side effect)
144 * and can be connected to multiple observables.
154 * and can be connected to multiple observables.
145 */
155 */
146 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
156 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
147
157
148 /** Waits for the next event to occur and returns a promise for the next value
158 /** Waits for the next event to occur and returns a promise for the next value
149 * @param ct Cancellation token
159 * @param ct Cancellation token
150 */
160 */
151 next(ct?: ICancellation): Promise<T>;
161 next(ct?: ICancellation): Promise<T>;
152
162
153 /** Collects items of the sequence to the array. */
163 /** Collects items of the sequence to the array. */
154 collect(ct?: ICancellation): Promise<T[]>;
164 collect(ct?: ICancellation): Promise<T[]>;
155 }
165 }
156
166
157 const noop = () => { };
167 const noop = () => { };
158
168
159 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
169 const errorFallback = (e: unknown) => trace.error("Unhandled observable error: {0}", e);
160
170
161 const sink = <T>(consumer: Observer<T>) => {
171 const sink = <T>(consumer: Observer<T>) => {
162 // eslint-disable-next-line @typescript-eslint/unbound-method
172 // eslint-disable-next-line @typescript-eslint/unbound-method
163 const { next, error, complete } = consumer;
173 const { next, error, complete } = consumer;
164 return {
174 return {
165 next: next ? next.bind(consumer) : noop,
175 next: next ? next.bind(consumer) : noop,
166 error: error ? error.bind(consumer) : errorFallback, // report unhandled errors
176 error: error ? error.bind(consumer) : errorFallback, // report unhandled errors
167 complete: complete ? complete.bind(consumer) : noop
177 complete: complete ? complete.bind(consumer) : noop
168 };
178 };
169 };
179 };
170
180
171 /** Wraps the producer to handle tear down logic and subscription management
181 /** Wraps the producer to handle tear down logic and subscription management
172 *
182 *
173 * The resulting producer will invoke cleanup logic on error or complete events
183 * The resulting producer will invoke cleanup logic on error or complete events
174 * and will prevent calling of any method from the sink.
184 * and will prevent calling of any method from the sink.
175 *
185 *
176 * @param producer The producer to wrap
186 * @param producer The producer to wrap
177 * @returns The wrapper producer
187 * @returns The wrapper producer
178 */
188 */
179 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
189 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: FusedSink<T>) => {
180 let done = false;
190 let done = false;
181 let cleanup = noop;
191 let cleanup = noop;
182
192
183 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
193 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
184 (...args: A) => done ?
194 (...args: A) => done ?
185 void (0) :
195 void (0) :
186 (done = true, cleanup(), fn(...args));
196 (done = true, cleanup(), fn(...args));
187
197
188 const _fin0 = () => done ? void (0) : (done = true, cleanup());
198 const _fin0 = () => done ? void (0) : (done = true, cleanup());
189
199
190 const safeSink = {
200 const safeSink = {
191 next: (value: T) => { !done && next(value); },
201 next: (value: T) => { !done && next(value); },
192 error: _fin(error),
202 error: _fin(error),
193 complete: _fin(complete),
203 complete: _fin(complete),
194 isClosed: () => done
204 isClosed: () => done
195 };
205 };
196 // call the producer
206 // call the producer
197 cleanup = producer(safeSink) ?? noop;
207 cleanup = producer(safeSink) ?? noop;
198 // if the producer throws exception bypass it to the caller rather then to
208 // if the producer throws exception bypass it to the caller rather then to
199 // the sink. This is a feature.
209 // the sink. This is a feature.
200
210
201 // if the producer completed the sequence immediately call the cleanup in place
211 // if the producer completed the sequence immediately call the cleanup in place
202 return done ? cleanup() : _fin0;
212 return done ? cleanup() : _fin0;
203 };
213 };
204
214
205 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
215 const _observe = <T>(producer: FusedProducer<T>): Observable<T> => ({
206 subscribe: (consumer: Observer<T>) => ({
216 subscribe: (consumer: Observer<T> = {}) => ({
207 unsubscribe: producer(sink(consumer)) ?? noop
217 unsubscribe: producer(sink(consumer)) ?? noop
208 }),
218 }),
209
219
210 map: (mapper) => _observe(({ next, ...rest }) =>
220 map: (mapper) => _observe(({ next, ...rest }) =>
211 producer({
221 producer({
212 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
222 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
213 ...rest
223 ...rest
214 })
224 })
215 ),
225 ),
216
226
227 tap: ({next: tapNext, complete: tapComplete, error: tapError}) => _observe(({next,complete, error}) =>
228 producer({
229 next: tapNext ? (v => (tapNext(v), next(v))) : next,
230 complete: tapComplete ? (() => (tapComplete(), complete())): complete,
231 error: tapError ? (e => (tapError(e), error(e))) : error
232 })
233 ),
234
217 filter: (predicate) => _observe(({ next, ...rest }) =>
235 filter: (predicate) => _observe(({ next, ...rest }) =>
218 producer({
236 producer({
219 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
237 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
220 ...rest
238 ...rest
221 })
239 })
222 ),
240 ),
223
241
224 until: predicate => _observe(({ next, complete, ...rest }) =>
242 until: predicate => _observe(({ next, complete, ...rest }) =>
225 producer({
243 producer({
226 next: v => predicate(v) ? complete() : next(v),
244 next: v => predicate(v) ? complete() : next(v),
227 complete,
245 complete,
228 ...rest
246 ...rest
229 })
247 })
230 ),
248 ),
231
249
232 while: predicate => _observe(({ next, complete, ...rest }) =>
250 while: predicate => _observe(({ next, complete, ...rest }) =>
233 producer({
251 producer({
234 next: v => predicate(v) ? next(v) : complete(),
252 next: v => predicate(v) ? next(v) : complete(),
235 complete,
253 complete,
236 ...rest
254 ...rest
237 })
255 })
238 ),
256 ),
239
257
240 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
258 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
241 if (args.length === 1) {
259 if (args.length === 1) {
242 const [accumulator] = args;
260 const [accumulator] = args;
243 let _acc: T;
261 let _acc: T;
244 let index = 0;
262 let index = 0;
245 return producer({
263 return producer({
246 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
264 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
247 ...rest
265 ...rest
248 });
266 });
249 } else {
267 } else {
250 const [accumulator, initial] = args;
268 const [accumulator, initial] = args;
251 let _acc = initial;
269 let _acc = initial;
252 return producer({
270 return producer({
253 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
271 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
254 ...rest
272 ...rest
255 });
273 });
256 }
274 }
257 }),
275 }),
258
276
259 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error }) => {
277 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error }) => {
260 if (args.length === 1) {
278 if (args.length === 1) {
261 const [accumulator] = args;
279 const [accumulator] = args;
262 let _acc: T;
280 let _acc: T;
263 let index = 0;
281 let index = 0;
264 return producer({
282 return producer({
265 next: next !== noop ? (v: T) => {
283 next: next !== noop ? (v: T) => {
266 _acc = index++ === 0 ? v : accumulator(_acc, v);
284 _acc = index++ === 0 ? v : accumulator(_acc, v);
267 } : noop,
285 } : noop,
268 complete: () => {
286 complete: () => {
269 if (index === 0) {
287 if (index === 0) {
270 error(new Error("The sequence can't be empty"));
288 error(new Error("The sequence can't be empty"));
271 } else {
289 } else {
272 next(_acc);
290 next(_acc);
273 complete();
291 complete();
274 }
292 }
275 },
293 },
276 error
294 error
277 });
295 });
278 } else {
296 } else {
279 const [accumulator, initial] = args;
297 const [accumulator, initial] = args;
280 let _acc = initial;
298 let _acc = initial;
281 return producer({
299 return producer({
282 next: next !== noop ? (v: T) => {
300 next: next !== noop ? (v: T) => {
283 _acc = accumulator(_acc, v);
301 _acc = accumulator(_acc, v);
284 } : noop,
302 } : noop,
285 complete: () => {
303 complete: () => {
286 next(_acc);
304 next(_acc);
287 complete();
305 complete();
288 },
306 },
289 error
307 error
290 });
308 });
291 }
309 }
292 }),
310 }),
293
311
294 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
312 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
295 let cleanup: () => void;
313 let cleanup: () => void;
296 const len = seq.length;
314 const len = seq.length;
297 const complete = (i: number) => i < len ?
315 const complete = (i: number) => i < len ?
298 () => {
316 () => {
299 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
317 const subscription = seq[i].subscribe({ next, complete: complete(i + 1), ...rest });
300 cleanup = subscription.unsubscribe.bind(subscription);
318 cleanup = subscription.unsubscribe.bind(subscription);
301 } : final;
319 } : final;
302
320
303 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
321 cleanup = producer({ next, complete: complete(0), ...rest }) ?? noop;
304
322
305 return () => cleanup();
323 return () => cleanup();
306 }),
324 }),
307
325
308 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
326 pipe: <U>(op: OperatorFn<T, U>) => op(_observe(producer)),
309
327
310 next: collect(
328 next: collect(
311 producer,
329 producer,
312 ({ next, complete, error }) => ({
330 ({ next, complete, error }) => ({
313 next: v => (next(v), complete()),
331 next: v => (next(v), complete()),
314 complete: () => error(new Error("The sequence is empty")),
332 complete: () => error(new Error("The sequence is empty")),
315 error
333 error
316 })
334 })
317 ),
335 ),
318
336
319 collect: collect(
337 collect: collect(
320 producer,
338 producer,
321 ({ next, complete, error}) => {
339 ({ next, complete, error}) => {
322 const data: T[] = [];
340 const data: T[] = [];
323 return {
341 return {
324 next: v => data.push(v),
342 next: v => data.push(v),
325 complete: () => (next(data), complete()),
343 complete: () => (next(data), complete()),
326 error
344 error
327 };
345 };
328 }
346 }
329 )
347 )
330 });
348 });
331
349
332 const collect = <T, U>(
350 const collect = <T, U>(
333 producer: FusedProducer<T>,
351 producer: FusedProducer<T>,
334 collector: (result: FusedSink<U>) => FusedSink<T>
352 collector: (result: FusedSink<U>) => FusedSink<T>
335 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
353 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
336 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
354 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
337 const h = ct.register(error);
355 const h = ct.register(error);
338 const cleanup = !isClosed() ?
356 const cleanup = !isClosed() ?
339 producer(collector({ next, complete, error })) ?? noop :
357 producer(collector({ next, complete, error })) ?? noop :
340 noop;
358 noop;
341
359
342 return () => {
360 return () => {
343 h.destroy();
361 h.destroy();
344 cleanup();
362 cleanup();
345 };
363 };
346 });
364 });
347
365
348 fused({
366 fused({
349 next: resolve,
367 next: resolve,
350 error: reject,
368 error: reject,
351 complete: noop
369 complete: noop
352 });
370 });
353 });
371 });
354
372
355 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
373 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
356
374
357 /** Converts an array to the observable sequence of its elements. */
375 /** Converts an array to the observable sequence of its elements. */
358 export const ofArray = <T>(items: T[]) => _observe<T>(
376 export const ofArray = <T>(items: T[]) => _observe<T>(
359 ({ next, complete }) => (
377 ({ next, complete }) => (
360 items.forEach(next),
378 items.forEach(next),
361 complete()
379 complete()
362 )
380 )
363 );
381 );
364
382
365 /** Converts a subscribable to the observable */
383 /** Converts a subscribable to the observable */
366 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
384 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
367 observe<T>(sink => {
385 observe<T>(sink => {
368 const subscription = subscribable.subscribe(sink);
386 const subscription = subscribable.subscribe(sink);
369 return () => subscription.unsubscribe();
387 return () => subscription.unsubscribe();
370 });
388 });
371
389
372 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
390 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
373 ({ next, error, complete }) =>
391 ({ next, error, complete }) =>
374 isPromise(item) ?
392 isPromise(item) ?
375 void item.then(
393 void item.then(
376 v => (next(v), complete()),
394 v => (next(v), complete()),
377 error
395 error
378 ) :
396 ) :
379 (next(item), complete())
397 (next(item), complete())
380 );
398 );
381
399
382 /** Converts a list of parameter values to the observable sequence. The
400 /** Converts a list of parameter values to the observable sequence. The
383 * order of elements in the list will be preserved in the resulting sequence.
401 * order of elements in the list will be preserved in the resulting sequence.
384 */
402 */
385 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
403 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
386 of1(items[0]) :
404 of1(items[0]) :
387 observe<T>(
405 observe<T>(
388 ({ next, error, complete, isClosed }) => {
406 ({ next, error, complete, isClosed }) => {
389 const n = items.length;
407 const n = items.length;
390
408
391 const _next = (start: number) => {
409 const _next = (start: number) => {
392 if (start > 0 && isClosed()) // when resumed
410 if (start > 0 && isClosed()) // when resumed
393 return;
411 return;
394
412
395 for (let i = start; i < n; i++) {
413 for (let i = start; i < n; i++) {
396 const r = items[i];
414 const r = items[i];
397 if (isPromise(r)) {
415 if (isPromise(r)) {
398 r.then(v => (next(v), _next(i + 1)), error);
416 r.then(v => (next(v), _next(i + 1)), error);
399 return; // suspend
417 return; // suspend
400 } else {
418 } else {
401 next(r);
419 next(r);
402 }
420 }
403 }
421 }
404 complete();
422 complete();
405 };
423 };
406
424
407 _next(0);
425 _next(0);
408 }
426 }
409 );
427 );
410
428
411 export const empty = _observe<never>(({ complete }) => complete());
429 export const empty = _observe<never>(({ complete }) => complete());
@@ -1,125 +1,152
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
1 import { PromiseOrValue } from "@implab/core-amd/interfaces";
2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
2 import { isCancellable, isPromise } from "@implab/core-amd/safe";
3 import { observe, Observable, empty } from "./observable";
3 import { observe, Observable, empty } from "./observable";
4 import { after } from "dojo/aspect";
4 import { after } from "dojo/aspect";
5
5
6 export interface OrderedUpdate<T> {
6 export interface OrderedUpdate<T> {
7 /** The item is being updated */
7 /** The item is being updated */
8 readonly item: T;
8 readonly item: T;
9
9
10 /** The previous index of the item, -1 in case it is inserted */
10 /** The previous index of the item, -1 in case it is inserted */
11 readonly prevIndex: number;
11 readonly prevIndex: number;
12
12
13 /** The new index of the item, -1 in case it is deleted */
13 /** The new index of the item, -1 in case it is deleted */
14 readonly newIndex: number;
14 readonly newIndex: number;
15
15
16 }
16 }
17
17
18 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
18 export type QueryResults<T> = Observable<OrderedUpdate<T>>;
19
19
20 interface DjObservableResults<T> {
20 interface DjObservableResults<T> {
21 /**
21 /**
22 * Allows observation of results
22 * Allows observation of results
23 */
23 */
24 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
24 observe(listener: (object: T, previousIndex: number, newIndex: number) => void, includeUpdates?: boolean): {
25 remove(): void;
25 remove(): void;
26 };
26 };
27 }
27 }
28
28
29 interface Queryable<T, Q, O> {
29 interface Queryable<T, Q, O> {
30 query(query?: Q, options?: O): PromiseOrValue<T[]>;
30 query(query?: Q, options?: O): PromiseOrValue<T[]>;
31 }
31 }
32
32
33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
33 export const isDjObservableResults = <T>(v: object): v is DjObservableResults<T> =>
34 v && (typeof (v as { observe?: unknown; }).observe === "function");
34 v && (typeof (v as { observe?: unknown; }).observe === "function");
35
35
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
36 export const query = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) => {
37 (query?: Q, options?: O & { observe?: boolean }) => {
37 const q = queryEx(store, includeUpdates);
38 return observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
38 return (query?: Q, options?: O & { observe?: boolean }) => {
39 const [data, updates] = q(query, options);
40
41 return options?.observe === false ? data : data.cat(updates);
42 };
43 };
39
44
45 export const queryEx = <T, Q, O>(store: Queryable<T, Q, O>, includeUpdates = true) =>
46 (query?: Q, options?: O): [data: QueryResults<T>, updates: QueryResults<T>] => {
47
48 const pending: T[] = [];
49
50 let results: PromiseOrValue<T[]> = pending;
51
52 const data = observe<OrderedUpdate<T>>(({ next, complete, error }) => {
40 const processResults = (items: T[]) =>
53 const processResults = (items: T[]) =>
41 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
54 items.forEach((item, newIndex) => next({ item, newIndex, prevIndex: -1 }));
42
55
43 try {
56 try {
44 const results = store.query(query, options);
57 if (results === pending)
58 results = store.query(query, options);
59
45 if (isPromise(results)) {
60 if (isPromise(results)) {
46 results.then(processResults).then(undefined, error);
61 results.then(processResults).then(complete, error);
62
63 if (isCancellable(results))
64 return results.cancel.bind(results);
47 } else {
65 } else {
48 processResults(results);
66 processResults(results);
67 complete();
49 }
68 }
69 } catch (e) {
70 error(e);
71 }
72 });
50
73
51 if (!isClosed() && (options?.observe !== false) && isDjObservableResults<T>(results)) {
74 const updates = observe<OrderedUpdate<T>>(({ next, complete, error, isClosed }) => {
75 try {
76 if (!isClosed() && isDjObservableResults<T>(results)) {
52 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
77 const h = results.observe((item, prevIndex, newIndex) => next({ item, prevIndex, newIndex }), includeUpdates);
53 return () => h.remove();
78 return () => h.remove();
54 } else {
79 } else {
55 complete();
80 complete();
56 }
81 }
57 } catch (err) {
82 } catch (e) {
58 error(err);
83 error(e);
59 }
84 }
60 });
85 });
61
86
87 return [data, updates];
62 };
88 };
63
89
90
64 interface IndexedStore<T> {
91 interface IndexedStore<T> {
65 get(id: string | number): PromiseLike<T> | T | null | undefined;
92 get(id: string | number): PromiseLike<T> | T | null | undefined;
66 }
93 }
67
94
68 interface Notifications<T> {
95 interface Notifications<T> {
69 notify(item: T | undefined, id: string | number | undefined): void;
96 notify(item: T | undefined, id: string | number | undefined): void;
70 }
97 }
71
98
72 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
99 const hasNotifications = <T>(x: unknown): x is Notifications<T> =>
73 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
100 typeof x === "object" && x !== null && (typeof (x as Notifications<T>).notify === "function");
74
101
75 interface GetOpts {
102 interface GetOpts {
76 observe?: boolean;
103 observe?: boolean;
77 }
104 }
78
105
79 type Change<T> = [item: NonNullable<T>, id: string | number | undefined] |
106 export type ItemUpdate<T> = [item: NonNullable<T>, id: string | number | undefined] |
80 [item: undefined | null, id: string | number];
107 [item: undefined | null, id: string | number];
81
108
82 const filterItem = (itemId: string | number) =>
109 const filterItem = (itemId: string | number) =>
83 <T>(source: Observable<Change<T>>) =>
110 <T>(source: Observable<ItemUpdate<T>>) =>
84 observe<T>(({ next, complete, error }) => {
111 observe<T>(({ next, complete, error }) => {
85 const subscription = source
112 const subscription = source
86 .filter(([, id]) => id === itemId)
113 .filter(([, id]) => id === itemId)
87 .subscribe({
114 .subscribe({
88 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
115 next: ([item]) => item !== null && item !== undefined ? next(item) : complete(),
89 complete,
116 complete,
90 error
117 error
91 });
118 });
92 return () => subscription.unsubscribe();
119 return () => subscription.unsubscribe();
93 });
120 });
94
121
95 export const get = <T>(store: IndexedStore<T>) => {
122 export const get = <T>(store: IndexedStore<T>) => {
96 const changes = hasNotifications<T>(store) ?
123 const changes = hasNotifications<T>(store) ?
97 observe<Change<T>>(({ next }) => {
124 observe<ItemUpdate<T>>(({ next }) => {
98 const handle = after(store, "notify", (...args: Change<T>) => next(args), true);
125 const handle = after(store, "notify", (...args: ItemUpdate<T>) => next(args), true);
99 return () => handle.remove();
126 return () => handle.remove();
100 }) : empty;
127 }) : empty;
101
128
102 return (id: string | number, opts: GetOpts = {}) =>
129 return (id: string | number, opts: GetOpts = {}) =>
103 observe<T>(({ next, complete, error }) => {
130 observe<T>(({ next, complete, error }) => {
104 try {
131 try {
105 const result = store.get(id);
132 const result = store.get(id);
106
133
107 const handle = (x: T | null | undefined) => {
134 const handle = (x: T | null | undefined) => {
108 if (x !== null && x !== undefined)
135 if (x !== null && x !== undefined)
109 next(x);
136 next(x);
110 complete();
137 complete();
111 };
138 };
112
139
113 if (isPromise(result)) {
140 if (isPromise(result)) {
114 result.then(handle).then(undefined, error);
141 result.then(handle).then(undefined, error);
115
142
116 if (isCancellable(result))
143 if (isCancellable(result))
117 return () => result.cancel();
144 return () => result.cancel();
118 } else {
145 } else {
119 handle(result);
146 handle(result);
120 }
147 }
121 } catch (e) {
148 } catch (e) {
122 error(e);
149 error(e);
123 }
150 }
124 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
151 }).cat(opts.observe !== false ? changes.pipe(filterItem(id)) : empty);
125 }; No newline at end of file
152 };
@@ -1,65 +1,89
1 import Memory = require("dojo/store/Memory");
1 import Memory = require("dojo/store/Memory");
2 import Observerable = require("dojo/store/Observable");
2 import Observerable = require("dojo/store/Observable");
3 import { get } from "./store";
3 import { get, queryEx } from "./store";
4 import tap = require("tap");
4 import tap = require("tap");
5
5
6 interface Person {
6 interface Person {
7 id: string;
7 id: string;
8
8
9 name: string;
9 name: string;
10
10
11 age: number;
11 age: number;
12 }
12 }
13
13
14 tap.test("store::get(...) tests", async t => {
14 tap.test("store::get(...) tests", async t => {
15 const store = new Observerable(new Memory<Person>());
15 const store = new Observerable(new Memory<Person>());
16
16
17 const getPerson = get(store);
17 const getPerson = get(store);
18
18
19 const peterId = "id:peter";
19 const peterId = "id:peter";
20
20
21 const samId = "id:sam";
21 const samId = "id:sam";
22
22
23 const peter = getPerson(peterId);
23 const peter = getPerson(peterId);
24 const sam = getPerson(samId);
24 const sam = getPerson(samId);
25
25
26 const seq1 = await getPerson(peterId, { observe: false }).collect();
26 const seq1 = await getPerson(peterId, { observe: false }).collect();
27
27
28 t.ok(seq1.length === 0, "Should be empty sequence");
28 t.ok(seq1.length === 0, "Should be empty sequence");
29
29
30 let peterChangeCount = 0;
30 let peterChangeCount = 0;
31 let samChangeCount = 0;
31 let samChangeCount = 0;
32 let peterDeleted = 0;
32 let peterDeleted = 0;
33
33
34 const peterSubscription = peter.subscribe({
34 const peterSubscription = peter.subscribe({
35 next: () => peterChangeCount++,
35 next: () => peterChangeCount++,
36 complete: () => peterDeleted++
36 complete: () => peterDeleted++
37 });
37 });
38 const samSubscription = sam.subscribe({
38 const samSubscription = sam.subscribe({
39 next: () => samChangeCount++
39 next: () => samChangeCount++
40 });
40 });
41
41
42 try {
42 try {
43 t.equal(peterChangeCount, 0, "Should be no changes recorded");
43 t.equal(peterChangeCount, 0, "Should be no changes recorded");
44
44
45 store.put({id: peterId, name: "Peter", age: 30 });
45 store.put({id: peterId, name: "Peter", age: 30 });
46
46
47 t.equal(peterChangeCount, 1, "Should record 1 object change");
47 t.equal(peterChangeCount, 1, "Should record 1 object change");
48 t.equal(samChangeCount, 0, "Should not record other object changes");
48 t.equal(samChangeCount, 0, "Should not record other object changes");
49
49
50 store.remove(peterId);
50 store.remove(peterId);
51
51
52 t.equal(peterDeleted, 1, "Should complete sequence");
52 t.equal(peterDeleted, 1, "Should complete sequence");
53 t.equal(peterChangeCount, 1, "Should not record remove operations");
53 t.equal(peterChangeCount, 1, "Should not record remove operations");
54
54
55 store.put({id: peterId, name: "Peter", age: 29});
55 store.put({id: peterId, name: "Peter", age: 29});
56
56
57 t.equal(peterChangeCount, 1, "Should not record changes after completion");
57 t.equal(peterChangeCount, 1, "Should not record changes after completion");
58
58
59 } finally {
59 } finally {
60 peterSubscription.unsubscribe();
60 peterSubscription.unsubscribe();
61 samSubscription.unsubscribe();
61 samSubscription.unsubscribe();
62 }
62 }
63
63
64 store.put({ id: samId, name: "Sam", age: 29});
65
66 const [data, updates] = queryEx(store)({ age: 29}, { sort: [{attribute: "id"}] });
67
68 const dump: string[] = [];
69
70 const subscription = data
71 .tap({
72 complete: () => dump.push("eof")
73 })
74 .cat(updates)
75 .tap({
76 next: ({item: {id}}) => dump.push(id),
77 complete: () => dump.push("eof")
78 })
79 .subscribe({});
80
81 t.same(dump, ["id:peter", "id:sam", "eof"]);
82
83 store.put({ id: "id:mary", name: "Mary", age: 29});
84
85 t.same(dump, ["id:peter", "id:sam", "eof", "id:mary"]);
86
87 subscription.unsubscribe();
64
88
65 }).catch(() => { }); No newline at end of file
89 }).catch(() => { });
@@ -1,150 +1,173
1 import { empty, observe, of } from "./observable";
1 import { empty, observe, of } from "./observable";
2 import * as tap from "tap";
2 import * as tap from "tap";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
3 import { Cancellation } from "@implab/core-amd/Cancellation";
4 import { delay } from "@implab/core-amd/safe";
4 import { delay } from "@implab/core-amd/safe";
5
5
6 const subj1 = observe<number>(({ next, complete }) => {
6 const subj1 = observe<number>(({ next, complete }) => {
7 next(1);
7 next(1);
8 complete();
8 complete();
9 next(2);
9 next(2);
10 });
10 });
11
11
12 const consumer1 = {
12 const consumer1 = {
13 sum: 0,
13 sum: 0,
14 next(v: number) {
14 next(v: number) {
15 this.sum += v;
15 this.sum += v;
16 }
16 }
17 };
17 };
18
18
19 subj1.subscribe(consumer1);
19 subj1.subscribe(consumer1);
20 tap.equal(consumer1.sum, 1, "Should get only one value");
20 tap.equal(consumer1.sum, 1, "Should get only one value");
21
21
22 subj1.subscribe(consumer1);
22 subj1.subscribe(consumer1);
23 tap.equal(consumer1.sum, 2, "Should get the value again");
23 tap.equal(consumer1.sum, 2, "Should get the value again");
24
24
25 const consumer2 = {
25 const consumer2 = {
26 value: 0,
26 value: 0,
27 completed: false,
27 completed: false,
28 next(v: number) { this.value = v; },
28 next(v: number) { this.value = v; },
29 complete() { this.completed = true; }
29 complete() { this.completed = true; }
30 };
30 };
31
31
32 let maps = 0;
32 let maps = 0;
33
33
34 subj1
34 subj1
35 .map(v => {
35 .map(v => {
36 tap.comment(`map1: ${v * 2}`);
36 tap.comment(`map1: ${v * 2}`);
37 maps++;
37 maps++;
38 return v * 2;
38 return v * 2;
39 })
39 })
40 .map(v => {
40 .map(v => {
41 tap.comment(`map2: ${v * 2}`);
41 tap.comment(`map2: ${v * 2}`);
42 maps++;
42 maps++;
43 return v * 2;
43 return v * 2;
44 })
44 })
45 .map(v => {
45 .map(v => {
46 tap.comment(`map3: ${v * 2}`);
46 tap.comment(`map3: ${v * 2}`);
47 maps++;
47 maps++;
48 return v * 2;
48 return v * 2;
49 })
49 })
50 .subscribe(consumer2);
50 .subscribe(consumer2);
51
51
52 tap.equal(consumer2.value, 8, "Should map");
52 tap.equal(consumer2.value, 8, "Should map");
53 tap.equal(maps, 3, "The map chain should not be executed after completion");
53 tap.equal(maps, 3, "The map chain should not be executed after completion");
54 tap.ok(consumer2.completed, "The completion signal should pass through");
54 tap.ok(consumer2.completed, "The completion signal should pass through");
55
55
56 const subj2 = observe<number>(({ next, complete }) => {
56 const subj2 = observe<number>(({ next, complete }) => {
57 [1, 2, 3, 4, 5].forEach(next);
57 [1, 2, 3, 4, 5].forEach(next);
58 complete();
58 complete();
59 return () => {
59 return () => {
60 tap.comment("subj2: unsubscribe");
60 tap.comment("subj2: unsubscribe");
61 };
61 };
62 });
62 });
63
63
64 const consumer3 = {
64 const consumer3 = {
65 even: 0,
65 even: 0,
66 odd: 0,
66 odd: 0,
67 completed: false,
67 completed: false,
68 subscribed: 0,
68 subscribed: 0,
69 unsubscribed: 0,
69 unsubscribed: 0,
70 next(v: "even" | "odd") {
70 next(v: "even" | "odd") {
71 this[v]++;
71 this[v]++;
72 },
72 },
73 complete() {
73 complete() {
74 this.completed = true;
74 this.completed = true;
75 },
75 },
76 subscribe() {
76 subscribe() {
77 this.subscribed++;
77 this.subscribed++;
78 },
78 },
79 unsubscribe() {
79 unsubscribe() {
80 this.unsubscribed++;
80 this.unsubscribed++;
81 }
81 }
82 };
82 };
83
83
84
84
85 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
85 const subj3 = subj2.pipe<"even" | "odd">(self => observe(({ next, complete, error }) => {
86 consumer3.subscribe();
86 consumer3.subscribe();
87 let count = 0;
87 let count = 0;
88 const h = self.subscribe({
88 const h = self.subscribe({
89 next: val => {
89 next: val => {
90 if (val % 2 === 0)
90 if (val % 2 === 0)
91 next("odd");
91 next("odd");
92 else
92 else
93 next("even");
93 next("even");
94 if (++count === 4)
94 if (++count === 4)
95 complete();
95 complete();
96 },
96 },
97 complete,
97 complete,
98 error
98 error
99 });
99 });
100 return () => {
100 return () => {
101 consumer3.unsubscribe();
101 consumer3.unsubscribe();
102 h.unsubscribe();
102 h.unsubscribe();
103 };
103 };
104 }));
104 }));
105
105
106 subj3.subscribe(consumer3);
106 subj3.subscribe(consumer3);
107
107
108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
108 tap.equal(consumer3.odd, 2, "Should get 2 odd elements");
109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
109 tap.equal(consumer3.even, 2, "Should get 2 even elements");
110 tap.ok(consumer3.completed, "The sequence should completed");
110 tap.ok(consumer3.completed, "The sequence should completed");
111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
111 tap.equal(consumer3.subscribed, 1, "The subscription should be done once");
112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
112 tap.equal(consumer3.unsubscribed, 1, "The cleanup should be done after completion");
113
113
114 subj2.reduce((a, b) => a + b).subscribe({
114 subj2.reduce((a, b) => a + b).subscribe({
115 next: val => tap.comment("subj2: reduce =", val),
115 next: val => tap.comment("subj2: reduce =", val),
116 complete: () => tap.comment("subj2: complete")
116 complete: () => tap.comment("subj2: complete")
117 });
117 });
118
118
119 tap.test("of(...) tests", async t => {
119 tap.test("of(...) tests", async t => {
120 await subj2.reduce((a, b) => a + b).next()
120 await subj2.reduce((a, b) => a + b).next()
121 .then(value => t.comment("subj2: next reduce=", value));
121 .then(value => t.comment("subj2: next reduce=", value));
122
122
123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
123 await subj2.next().then(val => t.equal(val, 1, "Should peek the first element"));
124
124
125 const cancelled = new Cancellation(cancel => cancel());
125 const cancelled = new Cancellation(cancel => cancel());
126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
126 await t.rejects(subj2.next(cancelled), "Cancelled next() method should fail");
127
127
128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
128 await t.rejects(empty.next(), "Empty sequence should fail to get next element");
129
129
130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
130 await of(delay(1).then(() => 1), Promise.resolve(2), 3)
131 .reduce<number[]>((a, x) => [...a, x], [])
131 .reduce<number[]>((a, x) => [...a, x], [])
132 .next()
132 .next()
133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
133 .then(res => t.same(res, [1, 2, 3], "of(...) should keep the order"));
134
134
135 const rejected = Promise.reject("DIE!");
135 const rejected = Promise.reject("DIE!");
136 rejected.catch(() => { }); // SAFE AND SOUND
136 rejected.catch(() => { }); // SAFE AND SOUND
137
137
138 await t.resolves(
138 await t.resolves(
139 of(Promise.resolve(1), rejected).next(),
139 of(Promise.resolve(1), rejected).next(),
140 "of(...) should emit non-rejected items"
140 "of(...) should emit non-rejected items"
141 );
141 );
142 await t.rejects(
142 await t.rejects(
143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
143 of(1, Promise.reject("DIE!")).reduce((a) => a).next(),
144 "of(...) should terminate with error when a parameter is rejected"
144 "of(...) should terminate with error when a parameter is rejected"
145 );
145 );
146
146
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
147 t.same(await of(1,2,3).collect(), [1,2,3], ".collect() should return the collected sequence");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
148 await t.rejects(of(1,2,3).collect(cancelled), ".collect() should support cancellation");
149
149
150 }).catch(() => { });
151
152 tap.test(".tap() tests", async t => {
153 const side: number[] = [];
154
155 of(1,2)
156 .tap({next: v => side.push(v), complete: () => side.push(0)})
157 .tap({next: v => side.push(v*v)})
158 .subscribe({});
159
160 t.same(side, [1,1,2,4,0], ".tap() should be called in the order of registration");
161
162 side.length = 0;
163
164 await new Promise<void>(resolve => {
165 of(1,2,delay(1).then(() => 3))
166 .tap({next: v => side.push(v)})
167 .tap({ next: v => v === 1 && resolve()})
168 .subscribe({});
169 });
170
171 t.same(side, [1,2], ".tap() should be processed synchronously");
172
150 }).catch(() => { }); No newline at end of file
173 }).catch(() => {});
General Comments 0
You need to be logged in to leave comments. Login now