##// END OF EJS Templates
added 'buffer' and 'subject' observable operators
cin -
r133:a3fba6b6c42e default
parent child
Show More
@@ -0,0 +1,105
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { CancelledError } from "@implab/core-amd/CancelledError";
3 import { Producer, Sink, Subscribable } from "../observable";
4 import { Scope } from "../tsx/Scope";
5
6 /**
7 * Creates a buffer with the specified length. The buffer will immediately
8 * subscribe to the source observable and start accumulating values.
9 *
10 * The resulting observable will repeat the buffered values for each new subscriber.
11 *
12 * @param length The number of elements to store.
13 * @param ct Cancellation token to unsubscribe from the original observable.
14 *
15 */
16 export const buffer = (length: number, ct = Cancellation.none) => <T>(source: Subscribable<T>): Producer<T> => {
17 type Status = "active" | "complete" | "error";
18
19 // ring-buffer, wpos will rotate in range (0...length-1)
20 let wpos = 0;
21 const buffer: T[] = [];
22
23 // writes the next value to the buffer
24 const write = (value: T) => {
25 buffer[wpos] = value;
26 wpos = (wpos + 1) % length;
27 };
28
29 // reads contents of the buffer
30 // cb will be called for each value in the buffer
31 const read = (cb: (item: T) => void) => {
32 const start = wpos + length - buffer.length;
33 const end = wpos + length;
34
35 for(let pos = start; pos < end; pos++ )
36 cb(buffer[pos % length]);
37 };
38
39 let status: Status = "active";
40 let lastError: unknown;
41 let subscribers: Sink<T>[] = [];
42
43 const scope = new Scope();
44
45 // cleanup method to release resources held by this subscription
46 const cleanup = (cb: (item: Sink<T>) => void) => {
47 scope.destroy();
48 const _subscribers = subscribers;
49 subscribers = [];
50 _subscribers.forEach(cb);
51 };
52
53 const sink: Sink<T> = {
54 isClosed: () => status !== "active",
55 complete: () => {
56 if (status === "active") {
57 status = "complete";
58 cleanup(s => s.complete());
59 }
60 },
61 error: e => {
62 if (status === "active") {
63 status = "error";
64 lastError = e;
65 cleanup(s => s.error(e));
66 }
67 },
68 next: v => {
69 if (status === "active") {
70 write(v);
71 const _subscribers = subscribers;
72 _subscribers.forEach(s => s.next(v));
73 }
74 }
75 };
76
77 if (ct.isRequested()) {
78 sink.error(new CancelledError("The operation was cancelled", ct));
79 } else {
80 scope.own(source.subscribe(sink));
81 scope.own(ct.register(e => sink.error(e)));
82 }
83
84 return (s: Sink<T>) => {
85 const _subscribers = subscribers;
86 read(s.next);
87 switch (status) {
88 case "active":
89 subscribers.push(s);
90 return () => {
91 if (_subscribers === subscribers) {
92 const pos = subscribers.indexOf(s);
93 if (pos >= 0)
94 subscribers.splice(pos, 1);
95 }
96 };
97 case "complete":
98 s.complete();
99 break;
100 case "error":
101 s.error(lastError);
102 break;
103 }
104 };
105 }; No newline at end of file
@@ -0,0 +1,50
1 import { Producer, Sink, Subscribable } from "../observable";
2
3 const noop = () => { };
4
5 /** Joins multiple subscribers to the single one. The resulting subscriber
6 * will be created when the first client subscribes and will be released
7 * with the the last client unsubscribed.
8 *
9 * Use this wrapper to prevent spawning multiple producers.
10 *
11 * @param source The source observable
12 * @returns The wrapped producer
13 */
14 export const subject = <T>(source: Subscribable<T>): Producer<T> => {
15 let subscribers: Sink<T>[] = [];
16
17 let subscription = { unsubscribe: noop };
18
19 // cleanup method to release resources held by this subscription
20 const cleanup = (cb: (item: Sink<T>) => void) => {
21 const _subscribers = subscribers;
22 subscribers = [];
23 _subscribers.forEach(cb);
24 subscription.unsubscribe();
25 };
26
27 const sink: Sink<T> = {
28 isClosed: () => false,
29 complete: () => cleanup(s => s.complete()),
30 error: e => cleanup(s => s.error(e)),
31 next: v => subscribers.forEach(s => s.next(v))
32 };
33
34 return client => {
35 const _subscribers = subscribers;
36 subscribers.push(client);
37 if (subscribers.length === 1)
38 subscription = source.subscribe(sink);
39
40 return () => {
41 if (_subscribers === subscribers) {
42 const pos = subscribers.indexOf(client);
43 if (pos >= 0)
44 subscribers.splice(pos, 1);
45 if (!subscribers.length)
46 subscription.unsubscribe();
47 }
48 };
49 };
50 };
@@ -1,15 +1,16
1 {
1 {
2 "java.configuration.updateBuildConfiguration": "automatic",
2 "java.configuration.updateBuildConfiguration": "automatic",
3 "files.exclude": {
3 "files.exclude": {
4 "**/.classpath": true,
4 "**/.classpath": true,
5 "**/.project": true,
5 "**/.project": true,
6 "**/.settings": true,
6 "**/.settings": true,
7 "**/.factorypath": true
7 "**/.factorypath": true
8 },
8 },
9 "cSpell.words": [
9 "cSpell.words": [
10 "dijit",
10 "dijit",
11 "djbase",
11 "djbase",
12 "djclass",
12 "djclass",
13 "Unsubscribable"
13 "Unsubscribable",
14 "wpos"
14 ]
15 ]
15 } No newline at end of file
16 }
@@ -1,523 +1,402
1 import { Cancellation } from "@implab/core-amd/Cancellation";
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { ICancellation } from "@implab/core-amd/interfaces";
2 import { ICancellation } from "@implab/core-amd/interfaces";
3 import { isPromise } from "@implab/core-amd/safe";
3 import { isPromise } from "@implab/core-amd/safe";
4
4
5 /**
5 /**
6 * The interface for the consumer of an observable sequence
6 * The interface for the consumer of an observable sequence
7 */
7 */
8 export interface Observer<T> {
8 export interface Observer<T> {
9 /**
9 /**
10 * Called for the next element in the sequence
10 * Called for the next element in the sequence
11 */
11 */
12 next: (value: T) => void;
12 next?: (value: T) => void;
13
13
14 /**
14 /**
15 * Called once when the error occurs in the sequence.
15 * Called once when the error occurs in the sequence.
16 */
16 */
17 error: (e: unknown) => void;
17 error?: (e: unknown) => void;
18
18
19 /**
19 /**
20 * Called once at the end of the sequence.
20 * Called once at the end of the sequence.
21 */
21 */
22 complete: () => void;
22 complete?: () => void;
23 }
23 }
24
24
25 /**
25 /**
26 * The group of functions to feed an observable. These methods are provided to
26 * The group of functions to feed an observable. These methods are provided to
27 * the producer to generate a stream of events.
27 * the producer to generate a stream of events.
28 */
28 */
29 export type Sink<T> = {
29 export type Sink<T> = {
30 /**
30 /**
31 * Call to send the next element in the sequence
31 * Call to send the next element in the sequence
32 */
32 */
33 next: (value: T) => void;
33 next: (value: T) => void;
34
34
35 /**
35 /**
36 * Call to notify about the error occurred in the sequence.
36 * Call to notify about the error occurred in the sequence.
37 */
37 */
38 error: (e: unknown) => void;
38 error: (e: unknown) => void;
39
39
40 /**
40 /**
41 * Call to signal the end of the sequence.
41 * Call to signal the end of the sequence.
42 */
42 */
43 complete: () => void;
43 complete: () => void;
44
44
45 /**
45 /**
46 * Checks whether the sink is accepting new elements. It's safe to
46 * Checks whether the sink is accepting new elements. It's safe to
47 * send elements to the closed sink.
47 * send elements to the closed sink.
48 */
48 */
49 isClosed: () => boolean;
49 isClosed: () => boolean;
50 };
50 };
51
51
52 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
52 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
53
53
54 export interface Unsubscribable {
54 export interface Unsubscribable {
55 unsubscribe(): void;
55 unsubscribe(): void;
56 }
56 }
57
57
58 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
58 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
59 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
59 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
60
60
61 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
61 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
62 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
62 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
63
63
64 export interface Subscribable<T> {
64 export interface Subscribable<T> {
65 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 subscribe(consumer: Observer<T>): Unsubscribable;
66 }
66 }
67
67
68 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
68 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
69
69
70 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
70 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
71
71
72 /** The observable source of items. */
72 /** The observable source of items. */
73 export interface Observable<T> extends Subscribable<T> {
73 export interface Observable<T> extends Subscribable<T> {
74 /** Transforms elements of the sequence with the specified mapper
74 /** Transforms elements of the sequence with the specified mapper
75 *
75 *
76 * @param mapper The mapper used to transform the values
76 * @param mapper The mapper used to transform the values
77 */
77 */
78 map<T2>(mapper: (value: T) => T2): Observable<T2>;
78 map<T2>(mapper: (value: T) => T2): Observable<T2>;
79
79
80 /** Filters elements of the sequence. The resulting sequence will
80 /** Filters elements of the sequence. The resulting sequence will
81 * contain only elements which match the specified predicate.
81 * contain only elements which match the specified predicate.
82 *
82 *
83 * @param predicate The filter predicate.
83 * @param predicate The filter predicate.
84 */
84 */
85 filter(predicate: (value: T) => boolean): Observable<T>;
85 filter(predicate: (value: T) => boolean): Observable<T>;
86
86
87 /** Completes the sequence once the condition is met.
87 /** Completes the sequence once the condition is met.
88 * @param predicate The condition which should be met to complete the sequence
88 * @param predicate The condition which should be met to complete the sequence
89 */
89 */
90 until(predicate: (value: T) => boolean): Observable<T>;
90 until(predicate: (value: T) => boolean): Observable<T>;
91
91
92 /** Keeps the sequence running while elements satisfy the condition.
92 /** Keeps the sequence running while elements satisfy the condition.
93 *
93 *
94 * @param predicate The condition which should be met to continue.
94 * @param predicate The condition which should be met to continue.
95 */
95 */
96 while(predicate: (value: T) => boolean): Observable<T>;
96 while(predicate: (value: T) => boolean): Observable<T>;
97
97
98 /** Applies accumulator to each value in the sequence and
98 /** Applies accumulator to each value in the sequence and
99 * emits the accumulated value for each source element
99 * emits the accumulated value for each source element
100 *
100 *
101 * @param accumulator
101 * @param accumulator
102 * @param initial
102 * @param initial
103 */
103 */
104 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
104 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
105 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
105 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
106
106
107 /** Applies accumulator to each value in the sequence and
107 /** Applies accumulator to each value in the sequence and
108 * emits the accumulated value at the end of the sequence
108 * emits the accumulated value at the end of the sequence
109 *
109 *
110 * @param accumulator
110 * @param accumulator
111 * @param initial
111 * @param initial
112 */
112 */
113 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
113 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
114 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
114 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
115
115
116 /** Concatenates the specified sequences with this observable
116 /** Concatenates the specified sequences with this observable
117 *
117 *
118 * @param seq sequences to concatenate with the current observable
118 * @param seq sequences to concatenate with the current observable
119 *
119 *
120 * The concatenation doesn't accumulate values from the specified sequences,
120 * The concatenation doesn't accumulate values from the specified sequences,
121 * The result of the concatenation is the new observable which will switch
121 * The result of the concatenation is the new observable which will switch
122 * to the next observable after the previous one completes. Values emitted
122 * to the next observable after the previous one completes. Values emitted
123 * before the next observable being active are lost.
123 * before the next observable being active are lost.
124 */
124 */
125 cat(...seq: Subscribable<T>[]): Observable<T>;
125 cat(...seq: Subscribable<T>[]): Observable<T>;
126
126
127
127
128 /** Pipes the specified operator to produce the new observable
128 /** Pipes the specified operator to produce the new observable
129 * @param op The operator consumes this observable and produces a new one
129 * @param op The operator consumes this observable and produces a new one
130 *
130 *
131 * The operator is a higher order function which takes a source observable
131 * The operator is a higher order function which takes a source observable
132 * and returns a producer for the new observable.
132 * and returns a producer for the new observable.
133 *
133 *
134 * This function can be used to create a complex mapping between source and
134 * This function can be used to create a complex mapping between source and
135 * resulting observables. The operator may have a state (or a side effect)
135 * resulting observables. The operator may have a state (or a side effect)
136 * and can be connected to multiple observables.
136 * and can be connected to multiple observables.
137 */
137 */
138 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
138 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
139
139
140 /** Waits for the next event to occur and returns a promise for the next value
140 /** Waits for the next event to occur and returns a promise for the next value
141 * @param ct Cancellation token
141 * @param ct Cancellation token
142 */
142 */
143 next(ct?: ICancellation): Promise<T>;
143 next(ct?: ICancellation): Promise<T>;
144
144
145 /** Collects items of the sequence to the array. */
145 /** Collects items of the sequence to the array. */
146 collect(ct?: ICancellation): Promise<T[]>;
146 collect(ct?: ICancellation): Promise<T[]>;
147 }
147 }
148
148
149 const noop = () => { };
149 const noop = () => { };
150
150
151 const sink = <T>(consumer: Partial<Observer<T>>) => {
151 const sink = <T>(consumer: Observer<T>) => {
152 const { next, error, complete } = consumer;
152 const { next, error, complete } = consumer;
153 return {
153 return {
154 next: next ? next.bind(consumer) : noop,
154 next: next ? next.bind(consumer) : noop,
155 error: error ? error.bind(consumer) : noop,
155 error: error ? error.bind(consumer) : noop,
156 complete: complete ? complete.bind(consumer) : noop,
156 complete: complete ? complete.bind(consumer) : noop,
157 isClosed: () => false
157 isClosed: () => false
158 };
158 };
159 };
159 };
160
160
161 /** Wraps the producer to handle tear down logic and subscription management
161 /** Wraps the producer to handle tear down logic and subscription management
162 *
162 *
163 * @param producer The producer to wrap
163 * @param producer The producer to wrap
164 * @returns The wrapper producer
164 * @returns The wrapper producer
165 */
165 */
166 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
166 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
167 let done = false;
167 let done = false;
168 let cleanup = noop;
168 let cleanup = noop;
169
169
170 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
170 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
171 (...args: A) => done ?
171 (...args: A) => done ?
172 void (0) :
172 void (0) :
173 (done = true, cleanup(), fn(...args));
173 (done = true, cleanup(), fn(...args));
174
174
175 const _fin0 = () => done ? void (0) : (done = true, cleanup());
175 const _fin0 = () => done ? void (0) : (done = true, cleanup());
176
176
177 const safeSink = {
177 const safeSink = {
178 next: (value: T) => { !done && next(value); },
178 next: (value: T) => { !done && next(value); },
179 error: _fin(error),
179 error: _fin(error),
180 complete: _fin(complete),
180 complete: _fin(complete),
181 isClosed: () => done
181 isClosed: () => done
182 };
182 };
183 cleanup = producer(safeSink) ?? noop;
183 cleanup = producer(safeSink) ?? noop;
184 return done ? cleanup() : _fin0;
184 return done ? cleanup() : _fin0;
185 };
185 };
186
186
187 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
187 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
188 subscribe: (consumer: Partial<Observer<T>>) => ({
188 subscribe: (consumer: Observer<T>) => ({
189 unsubscribe: producer(sink(consumer)) ?? noop
189 unsubscribe: producer(sink(consumer)) ?? noop
190 }),
190 }),
191
191
192 map: (mapper) => _observe(({ next, ...rest }) =>
192 map: (mapper) => _observe(({ next, ...rest }) =>
193 producer({
193 producer({
194 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
194 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
195 ...rest
195 ...rest
196 })
196 })
197 ),
197 ),
198
198
199 filter: (predicate) => _observe(({ next, ...rest }) =>
199 filter: (predicate) => _observe(({ next, ...rest }) =>
200 producer({
200 producer({
201 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
201 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
202 ...rest
202 ...rest
203 })
203 })
204 ),
204 ),
205
205
206 until: predicate => _observe(({ next, complete, ...rest }) =>
206 until: predicate => _observe(({ next, complete, ...rest }) =>
207 producer({
207 producer({
208 next: v => predicate(v) ? complete() : next(v),
208 next: v => predicate(v) ? complete() : next(v),
209 complete,
209 complete,
210 ...rest
210 ...rest
211 })
211 })
212 ),
212 ),
213
213
214 while: predicate => _observe(({ next, complete, ...rest }) =>
214 while: predicate => _observe(({ next, complete, ...rest }) =>
215 producer({
215 producer({
216 next: v => predicate(v) ? next(v) : complete(),
216 next: v => predicate(v) ? next(v) : complete(),
217 complete,
217 complete,
218 ...rest
218 ...rest
219 })
219 })
220 ),
220 ),
221
221
222 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
222 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
223 if (args.length === 1) {
223 if (args.length === 1) {
224 const [accumulator] = args;
224 const [accumulator] = args;
225 let _acc: T;
225 let _acc: T;
226 let index = 0;
226 let index = 0;
227 return producer({
227 return producer({
228 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
228 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
229 ...rest
229 ...rest
230 });
230 });
231 } else {
231 } else {
232 const [accumulator, initial] = args;
232 const [accumulator, initial] = args;
233 let _acc = initial;
233 let _acc = initial;
234 return producer({
234 return producer({
235 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
235 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
236 ...rest
236 ...rest
237 });
237 });
238 }
238 }
239 }),
239 }),
240
240
241 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
241 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
242 if (args.length === 1) {
242 if (args.length === 1) {
243 const [accumulator] = args;
243 const [accumulator] = args;
244 let _acc: T;
244 let _acc: T;
245 let index = 0;
245 let index = 0;
246 return producer({
246 return producer({
247 next: next !== noop ? (v: T) => {
247 next: next !== noop ? (v: T) => {
248 _acc = index++ === 0 ? v : accumulator(_acc, v);
248 _acc = index++ === 0 ? v : accumulator(_acc, v);
249 } : noop,
249 } : noop,
250 complete: () => {
250 complete: () => {
251 if (index === 0) {
251 if (index === 0) {
252 error(new Error("The sequence can't be empty"));
252 error(new Error("The sequence can't be empty"));
253 } else {
253 } else {
254 next(_acc);
254 next(_acc);
255 complete();
255 complete();
256 }
256 }
257 },
257 },
258 error,
258 error,
259 ...rest
259 ...rest
260 });
260 });
261 } else {
261 } else {
262 const [accumulator, initial] = args;
262 const [accumulator, initial] = args;
263 let _acc = initial;
263 let _acc = initial;
264 return producer({
264 return producer({
265 next: next !== noop ? (v: T) => {
265 next: next !== noop ? (v: T) => {
266 _acc = accumulator(_acc, v);
266 _acc = accumulator(_acc, v);
267 } : noop,
267 } : noop,
268 complete: () => {
268 complete: () => {
269 next(_acc);
269 next(_acc);
270 complete();
270 complete();
271 },
271 },
272 error,
272 error,
273 ...rest
273 ...rest
274 });
274 });
275 }
275 }
276 }),
276 }),
277
277
278 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
278 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
279 let cleanup: () => void;
279 let cleanup: () => void;
280 const complete = () => {
280 const complete = () => {
281 const continuation = seq.shift();
281 const continuation = seq.shift();
282 if (continuation) {
282 if (continuation) {
283 // if we have a next sequence, subscribe to it
283 // if we have a next sequence, subscribe to it
284 const subscription = continuation.subscribe({ next, complete, ...rest });
284 const subscription = continuation.subscribe({ next, complete, ...rest });
285 cleanup = subscription.unsubscribe.bind(subscription);
285 cleanup = subscription.unsubscribe.bind(subscription);
286 } else {
286 } else {
287 // otherwise notify the consumer about completion
287 // otherwise notify the consumer about completion
288 final();
288 final();
289 }
289 }
290 };
290 };
291
291
292 cleanup = producer({ next, complete, ...rest }) ?? noop;
292 cleanup = producer({ next, complete, ...rest }) ?? noop;
293
293
294 return () => cleanup();
294 return () => cleanup();
295 }),
295 }),
296
296
297 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
297 pipe: <U>(op: OperatorFn<T,U>) => observe(op(_observe(producer))),
298
298
299 next: collect(
299 next: collect(
300 producer,
300 producer,
301 ({ next, complete, error, isClosed }) => ({
301 ({ next, complete, error, isClosed }) => ({
302 next: v => (next(v), complete()),
302 next: v => (next(v), complete()),
303 complete: () => error(new Error("The sequence is empty")),
303 complete: () => error(new Error("The sequence is empty")),
304 error,
304 error,
305 isClosed
305 isClosed
306 })
306 })
307 ),
307 ),
308
308
309 collect: collect(
309 collect: collect(
310 producer,
310 producer,
311 ({ next, complete, ...rest }) => {
311 ({ next, complete, ...rest }) => {
312 const data: T[] = [];
312 const data: T[] = [];
313 return {
313 return {
314 next: v => data.push(v),
314 next: v => data.push(v),
315 complete: () => (next(data), complete()),
315 complete: () => (next(data), complete()),
316 ...rest
316 ...rest
317 };
317 };
318 }
318 }
319 )
319 )
320 });
320 });
321
321
322 const collect = <T, U>(
322 const collect = <T, U>(
323 producer: Producer<T>,
323 producer: Producer<T>,
324 collector: (result: Sink<U>) => Sink<T>
324 collector: (result: Sink<U>) => Sink<T>
325 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
325 ) => (ct = Cancellation.none) => new Promise<U>((resolve, reject) => {
326 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
326 const fused = fuse<U>(({ next, complete, error, isClosed }) => {
327 const h = ct.register(error);
327 const h = ct.register(error);
328 const cleanup = !isClosed() ?
328 const cleanup = !isClosed() ?
329 producer(collector({ next, complete, error, isClosed })) ?? noop :
329 producer(collector({ next, complete, error, isClosed })) ?? noop :
330 noop;
330 noop;
331
331
332 return () => {
332 return () => {
333 h.destroy();
333 h.destroy();
334 cleanup();
334 cleanup();
335 };
335 };
336 });
336 });
337
337
338 fused({
338 fused({
339 next: resolve,
339 next: resolve,
340 error: reject,
340 error: reject,
341 complete: noop,
341 complete: noop,
342 isClosed: () => false
342 isClosed: () => false
343 });
343 });
344 });
344 });
345
345
346 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
346 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
347
347
348 /** Converts an array to the observable sequence of its elements. */
348 export const ofArray = <T>(items: T[]) => _observe<T>(
349 export const ofArray = <T>(items: T[]) => _observe<T>(
349 ({ next, complete }) => (
350 ({ next, complete }) => (
350 items.forEach(next),
351 items.forEach(next),
351 complete()
352 complete()
352 )
353 )
353 );
354 );
354
355
356 /** Converts a subscribable to the observable */
357 export const ofSubscribable = <T>(subscribable: Subscribable<T>) =>
358 observe(sink => {
359 const subscription = subscribable.subscribe(sink);
360 return () => subscription.unsubscribe();
361 });
362
355 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
363 const of1 = <T>(item: T | PromiseLike<T>) => observe<T>(
356 ({ next, error, complete }) =>
364 ({ next, error, complete }) =>
357 isPromise(item) ?
365 isPromise(item) ?
358 void item.then(
366 void item.then(
359 v => (next(v), complete()),
367 v => (next(v), complete()),
360 error
368 error
361 ) :
369 ) :
362 (next(item), complete())
370 (next(item), complete())
363 );
371 );
364
372
373 /** Converts a list of parameter values to the observable sequence. The
374 * order of elements in the list will be preserved in the resulting sequence.
375 */
365 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
376 export const of = <T>(...items: (T | PromiseLike<T>)[]) => items.length === 1 ?
366 of1(items[0]) :
377 of1(items[0]) :
367 observe<T>(
378 observe<T>(
368 ({ next, error, complete, isClosed }) => {
379 ({ next, error, complete, isClosed }) => {
369 const n = items.length;
380 const n = items.length;
370
381
371 const _next = (start: number) => {
382 const _next = (start: number) => {
372 if (start > 0 && isClosed()) // when resumed
383 if (start > 0 && isClosed()) // when resumed
373 return;
384 return;
374
385
375 for (let i = start; i < n; i++) {
386 for (let i = start; i < n; i++) {
376 const r = items[i];
387 const r = items[i];
377 if (isPromise(r)) {
388 if (isPromise(r)) {
378 r.then(v => (next(v), _next(i + 1)), error);
389 r.then(v => (next(v), _next(i + 1)), error);
379 return; // suspend
390 return; // suspend
380 } else {
391 } else {
381 next(r);
392 next(r);
382 }
393 }
383 }
394 }
384 complete();
395 complete();
385 };
396 };
386
397
387 _next(0);
398 _next(0);
388 }
399 }
389 );
400 );
390
401
391 export const empty = _observe<never>(({ complete }) => complete());
402 export const empty = _observe<never>(({ complete }) => complete());
392
393 /**
394 * Creates a mutable state and the observable for the stored value.
395 *
396 * @param value The initial value for the state
397 * @returns an array of three elements `[observable, setter, getter]`
398 *
399 * The returned observable keeps the actual value and will emit it as the next
400 * element each time a consumer subscribes the observable.
401 *
402 * Calling the setter will update the stored value in the observable and notify
403 * all consumers.
404 */
405 export const stateful = <T>(producer: Producer<T>): Producer<T> => {
406 const fusedProducer = fuse(producer);
407 type Status = "active" | "complete" | "error";
408
409 let lastValue: T;
410 let hasValue = false;
411 let status: Status = "active";
412 let lastError: unknown;
413 let subscribers: Sink<T>[] = [];
414
415 const sink: Sink<T> = {
416 isClosed: () => status !== "active",
417 complete: () => {
418 if (status === "active") {
419 status = "complete";
420 const _subscribers = subscribers;
421 subscribers = [];
422 _subscribers.forEach(s => s.complete());
423 }
424 },
425 error: e => {
426 if (status === "active") {
427 status = "error";
428 lastError = e;
429 const _subscribers = subscribers;
430 subscribers = [];
431 _subscribers.forEach(s => s.error(e));
432 }
433 },
434 next: v => {
435 if (status === "active") {
436 hasValue = true;
437 lastValue = v;
438 const _subscribers = subscribers;
439 _subscribers.forEach(s => s.next(v));
440 }
441 }
442 };
443
444 fusedProducer(sink);
445
446 return (s: Sink<T>) => {
447 const _subscribers = subscribers;
448 switch (status) {
449 case "active":
450 if (hasValue)
451 s.next(lastValue); // if hasValue is true,
452 // lastValue has a valid value
453 subscribers.push(s);
454 return () => {
455 if (_subscribers === subscribers) {
456 const pos = subscribers.indexOf(s);
457 if (pos >= 0)
458 subscribers.splice(pos, 1);
459 }
460 };
461 case "complete":
462 s.complete();
463 break;
464 case "error":
465 s.error(lastError);
466 break;
467 }
468 };
469 };
470
471 /** Create the producer which will be called once when the first subscriber is
472 * attached, next subscribers would share the same producer. When all
473 * subscribers are removed the producer will be cleaned up.
474 *
475 * Use this wrapper to prevent spawning multiple producers.
476 *
477 * @param producer The source producer
478 * @returns The wrapped producer
479 */
480 export const subject = <T>(producer: Producer<T>): Producer<T> => {
481 const fusedProducer = fuse(producer);
482
483 let subscribers: Sink<T>[] = [];
484
485 let cleanup = noop;
486
487 const sink: Sink<T> = {
488 isClosed: () => false,
489 complete: () => {
490 const _subscribers = subscribers;
491 subscribers = [];
492 _subscribers.forEach(s => s.complete());
493 cleanup();
494 },
495 error: e => {
496 const _subscribers = subscribers;
497 subscribers = [];
498 _subscribers.forEach(s => s.error(e));
499 cleanup();
500 },
501 next: v => {
502 const _subscribers = subscribers;
503 _subscribers.forEach(s => s.next(v));
504 }
505 };
506
507 return client => {
508 const _subscribers = subscribers;
509 subscribers.push(client);
510 if (subscribers.length === 1)
511 cleanup = fusedProducer(sink) ?? noop;
512
513 return () => {
514 if (_subscribers === subscribers) {
515 const pos = subscribers.indexOf(client);
516 if (pos >= 0)
517 subscribers.splice(pos, 1);
518 if (!subscribers.length)
519 cleanup();
520 }
521 };
522 };
523 }; No newline at end of file
@@ -1,198 +1,200
1 import { TraceSource } from "@implab/core-amd/log/TraceSource";
1 import { TraceSource } from "@implab/core-amd/log/TraceSource";
2 import { isPromise } from "@implab/core-amd/safe";
2 import { isPromise } from "@implab/core-amd/safe";
3 import { id as mid } from "module";
3 import { id as mid } from "module";
4 import { IScope, Scope } from "./Scope";
4 import { IScope, Scope } from "./Scope";
5 import { isNode, isRendition, isWidget } from "./traits";
5 import { isNode, isRendition, isWidget } from "./traits";
6
6
7 const trace = TraceSource.get(mid);
7 const trace = TraceSource.get(mid);
8
8
9 interface Context {
9 interface Context {
10 readonly scope: IScope;
10 readonly scope: IScope;
11
11
12 readonly hooks?: (() => void)[];
12 readonly hooks?: (() => void)[];
13 }
13 }
14
14
15 let _context: Context = {
15 let _context: Context = {
16 scope: Scope.dummy
16 scope: Scope.dummy
17 };
17 };
18
18
19 let _renderCount = 0;
19 let _renderCount = 0;
20 let _renderId = 1;
20 let _renderId = 1;
21 let _renderedHooks: (() => void)[] = [];
21 let _renderedHooks: (() => void)[] = [];
22
22
23
23
24 const guard = (cb: () => unknown) => {
24 const guard = (cb: () => unknown) => {
25 try {
25 try {
26 const result = cb();
26 const result = cb();
27 if (isPromise(result)) {
27 if (isPromise(result)) {
28 const warn = (ret: unknown) => trace.error("The callback {0} competed asynchronously. result = {1}", cb, ret);
28 const warn = (ret: unknown) => trace.error("The callback {0} competed asynchronously. result = {1}", cb, ret);
29 result.then(warn, warn);
29 result.then(warn, warn);
30 }
30 }
31 } catch (e) {
31 } catch (e) {
32 trace.error(e);
32 trace.error(e);
33 }
33 }
34 };
34 };
35
35
36 /**
36 /**
37 *
37 *
38 * @param scope
38 * @param scope
39 * @returns
39 * @returns
40 */
40 */
41 export const beginRender = (scope = getScope()) => {
41 export const beginRender = (scope = getScope()) => {
42 const prev = _context;
42 const prev = _context;
43 _renderCount++;
43 _renderCount++;
44 const renderId = _renderId++;
44 const renderId = _renderId++;
45 trace.debug("beginRender [{0}], pending = {1}", renderId, _renderCount);
45 trace.debug("beginRender [{0}], pending = {1}", renderId, _renderCount);
46 if (_renderCount === 1)
46 if (_renderCount === 1)
47 onRendering();
47 onRendering();
48
48
49 _context = {
49 _context = {
50 scope,
50 scope,
51 hooks: []
51 hooks: []
52 };
52 };
53 return endRender(prev, _context, renderId);
53 return endRender(prev, _context, renderId);
54 };
54 };
55
55
56 /**
56 /**
57 * Method for a deferred rendering. Returns a promise with `beginRender()` function.
57 * Method for a deferred rendering. Returns a promise with `beginRender()` function.
58 * Call to `scheduleRender` will save the current context, and will increment pending
58 * Call to `scheduleRender` will save the current context, and will increment pending
59 * operations counter.
59 * operations counter.
60 *
60 *
61 * @example
61 * @example
62 *
62 *
63 * const begin = await scheduleRender();
63 * const begin = await scheduleRender();
64 * const end = begin();
64 * const end = begin();
65 * try {
65 * try {
66 * // do some DOM manipulations
66 * // do some DOM manipulations
67 * } finally {
67 * } finally {
68 * end();
68 * end();
69 * }
69 * }
70 *
70 *
71 * @param scope
71 * @param scope
72 * @returns
72 * @returns
73 */
73 */
74 export const scheduleRender = async (scope = getScope()) => {
74 export const scheduleRender = async (scope = getScope()) => {
75 _renderCount++;
75 _renderCount++;
76 const renderId = _renderId ++;
76 const renderId = _renderId ++;
77 trace.debug("scheduleRender [{0}], pending = {1}", renderId, _renderCount);
77 trace.debug("scheduleRender [{0}], pending = {1}", renderId, _renderCount);
78 if (_renderCount === 1)
78 if (_renderCount === 1)
79 onRendering();
79 onRendering();
80
80
81 await Promise.resolve();
81 await Promise.resolve();
82
82
83 return () => {
83 return () => {
84 trace.debug("beginRender [{0}], pending = {1}", renderId, _renderCount);
84 trace.debug("beginRender [{0}], pending = {1}", renderId, _renderCount);
85 const prev = _context;
85 const prev = _context;
86
86
87 _context = {
87 _context = {
88 scope,
88 scope,
89 hooks: []
89 hooks: []
90 };
90 };
91 return endRender(prev, _context, renderId);
91 return endRender(prev, _context, renderId);
92 };
92 };
93 };
93 };
94
94
95 /**
95 /**
96 * Completes render operation
96 * Completes render operation
97 */
97 */
98 const endRender = (prev: Context, current: Context, renderId: number) => () => {
98 const endRender = (prev: Context, current: Context, renderId: number) => () => {
99 if (_context !== current)
99 if (_context !== current)
100 trace.error("endRender mismatched beginRender call");
100 trace.error("endRender mismatched beginRender call");
101
101
102 const { hooks } = _context;
102 const { hooks } = _context;
103 if (hooks)
103 if (hooks)
104 hooks.forEach(guard);
104 hooks.forEach(guard);
105
105
106 _renderCount--;
106 _renderCount--;
107 _context = prev;
107 _context = prev;
108
108
109 trace.debug("endRender [{0}], pending = {1}", renderId, _renderCount);
109 trace.debug("endRender [{0}], pending = {1}", renderId, _renderCount);
110 if (_renderCount === 0)
110 if (_renderCount === 0)
111 onRendered();
111 onRendered();
112 };
112 };
113
113
114 // called when the first beginRender is called for this iteration
114 // called when the first beginRender is called for this iteration
115 const onRendering = () => {
115 const onRendering = () => {
116 trace.log("Rendering started");
116 setTimeout(() => {
117 setTimeout(() => {
117 if (_renderCount !== 0)
118 if (_renderCount !== 0)
118 trace.error("Rendering tasks aren't finished, currently running = {0}", _renderCount);
119 trace.error("Rendering tasks aren't finished, currently running = {0}", _renderCount);
119 });
120 });
120 };
121 };
121
122
122 // called when all render operations are complete
123 // called when all render operations are complete
123 const onRendered = () => {
124 const onRendered = () => {
125 trace.log("Rendering compete");
124 _renderedHooks.forEach(guard);
126 _renderedHooks.forEach(guard);
125 _renderedHooks = [];
127 _renderedHooks = [];
126 };
128 };
127
129
128 export const whenRendered = () => new Promise<void>((resolve) => {
130 export const whenRendered = () => new Promise<void>((resolve) => {
129 if (_renderCount)
131 if (_renderCount)
130 _renderedHooks.push(resolve);
132 _renderedHooks.push(resolve);
131 else
133 else
132 resolve();
134 resolve();
133 });
135 });
134
136
135 export const renderHook = (hook: () => void) => {
137 export const renderHook = (hook: () => void) => {
136 const { hooks } = _context;
138 const { hooks } = _context;
137 if (hooks)
139 if (hooks)
138 hooks.push(hook);
140 hooks.push(hook);
139 else
141 else
140 guard(hook);
142 guard(hook);
141 };
143 };
142
144
143 export const refHook = <T>(value: T, ref: JSX.Ref<T>) => {
145 export const refHook = <T>(value: T, ref: JSX.Ref<T>) => {
144 const { hooks, scope } = _context;
146 const { hooks, scope } = _context;
145 if (hooks)
147 if (hooks)
146 hooks.push(() => ref(value));
148 hooks.push(() => ref(value));
147 else
149 else
148 guard(() => ref(value));
150 guard(() => ref(value));
149
151
150 scope.own(() => ref(undefined));
152 scope.own(() => ref(undefined));
151 };
153 };
152
154
153 /** Returns the current scope */
155 /** Returns the current scope */
154 export const getScope = () => _context.scope;
156 export const getScope = () => _context.scope;
155
157
156 /** Schedules the rendition to be rendered to the DOM Node
158 /** Schedules the rendition to be rendered to the DOM Node
157 * @param rendition The rendition to be rendered
159 * @param rendition The rendition to be rendered
158 * @param scope The scope
160 * @param scope The scope
159 */
161 */
160 export const render = (rendition: unknown, scope = Scope.dummy) => {
162 export const render = (rendition: unknown, scope = Scope.dummy) => {
161 const complete = beginRender(scope);
163 const complete = beginRender(scope);
162 try {
164 try {
163 return getItemDom(rendition);
165 return getItemDom(rendition);
164 } finally {
166 } finally {
165 complete();
167 complete();
166 }
168 }
167 };
169 };
168
170
169 const emptyFragment = document.createDocumentFragment();
171 const emptyFragment = document.createDocumentFragment();
170
172
171 /** Renders DOM element for different types of the argument. */
173 /** Renders DOM element for different types of the argument. */
172 export const getItemDom = (v: unknown) => {
174 export const getItemDom = (v: unknown) => {
173 if (typeof v === "string" || typeof v === "number" || v instanceof RegExp || v instanceof Date) {
175 if (typeof v === "string" || typeof v === "number" || v instanceof RegExp || v instanceof Date) {
174 // primitive types converted to the text nodes
176 // primitive types converted to the text nodes
175 return document.createTextNode(v.toString());
177 return document.createTextNode(v.toString());
176 } else if (isNode(v)) {
178 } else if (isNode(v)) {
177 // nodes are kept as is
179 // nodes are kept as is
178 return v;
180 return v;
179 } else if (isRendition(v)) {
181 } else if (isRendition(v)) {
180 // renditions are instantiated
182 // renditions are instantiated
181 return v.getDomNode();
183 return v.getDomNode();
182 } else if (isWidget(v)) {
184 } else if (isWidget(v)) {
183 // widgets are converted to it's markup
185 // widgets are converted to it's markup
184 return v.domNode;
186 return v.domNode;
185 } else if (typeof v === "boolean" || v === null || v === undefined) {
187 } else if (typeof v === "boolean" || v === null || v === undefined) {
186 // null | undefined | boolean are removed
188 // null | undefined | boolean are removed
187 return emptyFragment;
189 return emptyFragment;
188 } else if (v instanceof Array) {
190 } else if (v instanceof Array) {
189 // arrays will be translated to document fragments
191 // arrays will be translated to document fragments
190 const fragment = document.createDocumentFragment();
192 const fragment = document.createDocumentFragment();
191 v.map(item => getItemDom(item))
193 v.map(item => getItemDom(item))
192 .forEach(node => fragment.appendChild(node));
194 .forEach(node => fragment.appendChild(node));
193 return fragment;
195 return fragment;
194 } else {
196 } else {
195 // bug: explicit error otherwise
197 // bug: explicit error otherwise
196 throw new Error(`Invalid parameter: ${String(v)}`);
198 throw new Error(`Invalid parameter: ${String(v)}`);
197 }
199 }
198 };
200 };
@@ -1,27 +1,28
1 import { observe, stateful } from "./observable";
1 import { observe } from "./observable";
2 import { buffer } from "./operators/buffer";
2 import * as t from "tap";
3 import * as t from "tap";
3
4
4 interface CounterState {
5 interface CounterState {
5 count: number;
6 count: number;
6
7
7 label: "low" | "mid" | "high"
8 label: "low" | "mid" | "high"
8 }
9 }
9
10
10 let set: (v: CounterState) => void = () => void (0);
11 let set: (v: CounterState) => void = () => void (0);
11 const initial: CounterState = { count: 0, label: "low" };
12 const initial: CounterState = { count: 0, label: "low" };
12 let value = initial;
13 let value = initial;
13
14
14 const obs = observe(stateful<CounterState>(({ next }) => {
15 const obs = observe<CounterState>(({ next }) => {
15 next(initial);
16 next(initial);
16 set = next;
17 set = next;
17 }));
18 }).pipe(buffer(2));
18
19
19 set({ count: 10, label: "mid" });
20 set({ count: 10, label: "mid" });
20
21
21 obs.subscribe({
22 obs.subscribe({
22 next: v => value = v
23 next: v => value = v
23 });
24 });
24
25
25 t.equal(value.count, 10, "State should update");
26 t.equal(value.count, 10, "State should update");
26
27
27 set({ count: 20, label: "high" });
28 set({ count: 20, label: "high" });
@@ -1,78 +1,79
1 import { observe, subject } from "./observable";
1 import { observe } from "./observable";
2 import { subject } from "./operators/subject";
2 import * as tap from "tap";
3 import * as tap from "tap";
3
4
4 tap.test("Subject tests", t => {
5 tap.test("Subject tests", t => {
5
6
6 let nextEvent: (value: string) => void = () => void (0);
7 let nextEvent: (value: string) => void = () => void (0);
7
8
8 const subj1 = observe(subject<string>(({ next }) => {
9 const subj1 = observe<string>(({ next }) => {
9 t.comment("Start subject");
10 t.comment("Start subject");
10
11
11 nextEvent = next;
12 nextEvent = next;
12
13
13 return () => {
14 return () => {
14 nextEvent = () => void (0);
15 nextEvent = () => void (0);
15 t.comment("Stop subject");
16 t.comment("Stop subject");
16 };
17 };
17 }));
18 }).pipe(subject);
18
19
19 const h1 = subj1.subscribe({
20 const h1 = subj1.subscribe({
20 next: v => t.comment(`h1 next: ${v}`)
21 next: v => t.comment(`h1 next: ${v}`)
21 });
22 });
22
23
23 nextEvent("first");
24 nextEvent("first");
24
25
25 const h2 = subj1.subscribe({
26 const h2 = subj1.subscribe({
26 next: v => t.comment(`h2 next: ${v}`)
27 next: v => t.comment(`h2 next: ${v}`)
27 });
28 });
28
29
29 nextEvent("second");
30 nextEvent("second");
30
31
31 h1.unsubscribe();
32 h1.unsubscribe();
32
33
33 nextEvent("third");
34 nextEvent("third");
34
35
35 h2.unsubscribe();
36 h2.unsubscribe();
36
37
37 t.pass("Subject finished");
38 t.pass("Subject finished");
38 t.end();
39 t.end();
39 }).catch(e => console.error(e));
40 }).catch(e => console.error(e));
40
41
41
42
42 tap.test("Subject tests #2", t => {
43 tap.test("Subject tests #2", t => {
43
44
44 let nextEvent: (value: string) => void = () => void (0);
45 let nextEvent: (value: string) => void = () => void (0);
45
46
46 const subj1 = observe(subject<string>(({ next, complete }) => {
47 const subj1 = observe<string>(({ next, complete }) => {
47 t.comment("Start subject");
48 t.comment("Start subject");
48
49
49 complete();
50 complete();
50 nextEvent = next;
51 nextEvent = next;
51
52
52 return () => {
53 return () => {
53 nextEvent = () => void (0);
54 nextEvent = () => void (0);
54 t.comment("Stop subject");
55 t.comment("Stop subject");
55 };
56 };
56 }));
57 }).pipe(subject);
57
58
58 const h1 = subj1.subscribe({
59 const h1 = subj1.subscribe({
59 next: v => t.comment(`h1 next: ${v}`)
60 next: v => t.comment(`h1 next: ${v}`)
60 });
61 });
61
62
62 nextEvent("first");
63 nextEvent("first");
63
64
64 const h2 = subj1.subscribe({
65 const h2 = subj1.subscribe({
65 next: v => t.comment(`h2 next: ${v}`)
66 next: v => t.comment(`h2 next: ${v}`)
66 });
67 });
67
68
68 nextEvent("second");
69 nextEvent("second");
69
70
70 h1.unsubscribe();
71 h1.unsubscribe();
71
72
72 nextEvent("third");
73 nextEvent("third");
73
74
74 h2.unsubscribe();
75 h2.unsubscribe();
75
76
76 t.pass("Subject finished");
77 t.pass("Subject finished");
77 t.end();
78 t.end();
78 }).catch(e => console.error(e)); No newline at end of file
79 }).catch(e => console.error(e));
General Comments 0
You need to be logged in to leave comments. Login now