##// END OF EJS Templates
added observable subject producer
cin -
r125:cede47727a1b v1.7.0 default
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,78
1 import { observe, subject } from "./observable";
2 import * as tap from "tap";
3
4 tap.test("Subject tests", t => {
5
6 let nextEvent: (value: string) => void = () => void (0);
7
8 const subj1 = observe(subject<string>(({ next }) => {
9 t.comment("Start subject");
10
11 nextEvent = next;
12
13 return () => {
14 nextEvent = () => void (0);
15 t.comment("Stop subject");
16 };
17 }));
18
19 const h1 = subj1.subscribe({
20 next: v => t.comment(`h1 next: ${v}`)
21 });
22
23 nextEvent("first");
24
25 const h2 = subj1.subscribe({
26 next: v => t.comment(`h2 next: ${v}`)
27 });
28
29 nextEvent("second");
30
31 h1.unsubscribe();
32
33 nextEvent("third");
34
35 h2.unsubscribe();
36
37 t.pass("Subject finished");
38 t.end();
39 }).catch(e => console.error(e));
40
41
42 tap.test("Subject tests #2", t => {
43
44 let nextEvent: (value: string) => void = () => void (0);
45
46 const subj1 = observe(subject<string>(({ next, complete }) => {
47 t.comment("Start subject");
48
49 complete();
50 nextEvent = next;
51
52 return () => {
53 nextEvent = () => void (0);
54 t.comment("Stop subject");
55 };
56 }));
57
58 const h1 = subj1.subscribe({
59 next: v => t.comment(`h1 next: ${v}`)
60 });
61
62 nextEvent("first");
63
64 const h2 = subj1.subscribe({
65 next: v => t.comment(`h2 next: ${v}`)
66 });
67
68 nextEvent("second");
69
70 h1.unsubscribe();
71
72 nextEvent("third");
73
74 h2.unsubscribe();
75
76 t.pass("Subject finished");
77 t.end();
78 }).catch(e => console.error(e)); No newline at end of file
@@ -1,198 +1,306
1 1 # @implab/djx
2 2
3 3 ## SYNOPSIS
4 4
5 5 ```tsx
6 6 import { djbase, djclass, bind, prototype, AbstractConstructor } from "@implab/djx/declare";
7 7
8 8 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
9 9 import { createElement } from "@implab/djx/tsx";
10 10
11 11 interface MyWidgetAttrs {
12 12 title: string;
13 13
14 14 counter: number;
15 15 }
16 16
17 17 interface MyWidgetEvents {
18 18 "count-inc": Event;
19 19
20 20 "count-dec": Event;
21 21 }
22 22
23 23
24 24 @djclass
25 25 export class MyWidget extends djbase(
26 26 DjxWidgetBase as AbstractConstructor<DjxWidgetBase<MyWidgetAttrs, MyWidgetEvents>>
27 27 ) {
28 28
29 29 @bind({ node: "titleNode", type: "innerHTML" })
30 30 title = "";
31 31
32 32 @prototype()
33 33 counter = 0;
34 34
35 35 render() {
36 36 const Frame = (props: any) => <div>{props.children}</div>;
37 37 return <div
38 38 className="myWidget"
39 39 tabIndex={3}
40 40 style={ alignContent: "center", border: "1px solid" }
41 41 >
42 42 <h1 data-dojo-attach-point="titleNode"></h1>
43 43 <Frame>
44 44 <span class="up-button" onclick={e => this._onIncClick(e)}>[+]</span>
45 45 <span class="down-button" onclick={() => this._onDecClick()}>[-]</span>
46 46 </Frame>
47 47 </div>;
48 48 }
49 49
50 50 _onIncClick(e: MouseEvent) {
51 51 this.emit("count-inc", { bubbles: false });
52 52 }
53 53
54 54 _onDecClick() {
55 55 this.emit("count-dec", { bubbles: false });
56 56 }
57 57 }
58 58
59 59 ```
60 60
61 61 ## DESCRIPTION
62 62
63 63 This package provides you with the tools to glue your good-fellow dojo with modern
64 64 techniques of building the webapp. The core concept is to built around widgets and
65 65 using .tsx to write it. Here are some features:
66 66
67 67 * `djbase()`, `@djaclass` - traits to declare your classes with `dojo/_base/declare`
68 68 * `@implab/djx/tsx` - traits to build the rendering of your widgets with tsx
69 69 * `DjxWidgetBase` - abstract class which supports tsx markup and
70 70 `data-dojo-attach-*` attributes.
71 71 * `@bind(...)` - annotations provide an easy way of using standard dojo widget
72 72 attribute bindings.
73 73
74 74 ### djbase, @djclass
75 75
76 76 These two traits provide convenient way of using `dojo/_base/declare` in Typescript
77 77 for declaring your classes.
78 78
79 79 `djbase(...constructors)` - this method accepts a list of constructors in its
80 80 parameters and returns the **fake** base type which then can be used to derive
81 81 your own class. This allows you to provide the Typescript with the correct
82 82 information about the base type and even use `super`!. The only caveat of
83 83 this approach is that you **MUST** decorate your class with `@djclass` annotation.
84 84
85 85 Consider the following example:
86 86
87 87 ```ts
88 88 import { djbase, djclass } from "@implab/djx/declare";
89 89 import { FooMixin } from "./FooMixin";
90 90 import { BarMixin } from "./BarMixin";
91 91 import { BoxMixin } from "./BoxMixin";
92 92
93 93 @djclass
94 94 export class Baz extends djbase(FooMixin, BarMixin, BoxMixin) {
95 95 writeHello(out: string[]) {
96 96 out.push("-> Baz");
97 97
98 98 super.writeHello(out);
99 99
100 100 out.push("<- Baz");
101 101 }
102 102 }
103 103
104 104 ```
105 105
106 106 All mixins are declared like the one below:
107 107
108 108 ```ts
109 109 import { djclass, djbase } from "@implab/djx/declare";
110 110
111 111 interface Super {
112 112 writeHello(out: string[]): void;
113 113
114 114 }
115 115
116 116 @djclass
117 117 export class BarMixin extends djbase<Super>() {
118 118 writeHello(out: string[]) {
119 119 out.push("-> Bar");
120 120
121 121 super.writeHello(out);
122 122
123 123 out.push("<- Bar");
124 124 }
125 125 }
126 126 ```
127 127
128 128 finally create an instance and call the `writeHello` method
129 129
130 130 ```ts
131 131 const baz = new Baz();
132 132
133 133 const data: string[] = [];
134 134 baz.writeHello(data);
135 135
136 136 console.log(data.join("\n"));
137 137
138 138 ```
139 139
140 140 you will get the following output:
141 141
142 142 ```text
143 143 -> Baz
144 144 -> Box
145 145 -> Bar
146 146 -> Foo
147 147 <- Foo
148 148 <- Bar
149 149 <- Box
150 150 <- Baz
151 151 ```
152 152
153 153 Let's take a closer look at the `Baz` declaration it uses `djbase` to derive
154 154 from three mixins and the class is decorated with `@djclass` to accomplish the
155 155 declaration and make a real constructor.
156 156
157 157 To allow access to the next sibling method (in terms of multiple inheritance)
158 158 Dojo provides `this.inherited(arguments)` method but this approach leads to the
159 159 problem with 'strict' mode of ES5 and eliminates the type information about a
160 160 calling method. This library solves the problem calling inherited/next method by
161 161 utilizing `super` keyword. Under the hood there are proxy methods generated in
162 162 the prototype of the declared class which make calls to `this.inherited(...)`
163 163 method. This technique is compatible with 'strict' mode.
164 164
165 165 Mixins are declared similar, they also may have the base types although
166 166 the most common case is declaring the mixin without any base classes. To allow
167 167 the mixin to access the next method declare the interface with
168 168 desired methods and use the special form of `djbase<Super>()` without arguments.
169 169
170 170 ### DjxWidgetBase<Attrs, Events>
171 171
172 TODO
172 This is the base class for the djx widgets. It declares the abstract method
173 `render()` which is used to render the content of the widget, like `_TemplatedMixin`.
174
175 This class extends `dijit/_WidgetBase` and contains logic from `_AttachMixin` thus
176 it is capable to handle `data-dojo-attach-*` attributes from the rendered markup.
177
178 ```tsx
179 @djclass
180 export class MyFirstWidget extends djbase(DjxWidgetBase) {
181 render() {
182 return <h1>My first widget</h1>;
183 }
184 }
185 ```
173 186
174 187 ### Markup (.tsx)
175 188
176 189 Add to your `tsconfig.json` the following options
177 190
178 191 ```json
179 192 {
180 193 "compilerOptions": {
181 "types": ["@implab/djx"],
194 "types": [
195 "@implab/djx",
196 "@implab/dojo-typings"
197 ],
198 "skipLibCheck": true,
182 199 "experimentalDecorators": true,
183 200 "jsxFactory": "createElement",
184 201 "jsx": "react",
202 "target": "ES5", // minimal supported version
203 "lib": ["ES2015", "DOM"]
185 204 }
186 205 }
187 206
188 207 ```
189 208
190 209 Import `createElement` into your `.tsx` file
191 210
192 211 ```ts
193 212 import { createElement } from "@implab/djx/tsx";
194 213 ```
195 214
196 215 You are ready to go!
197 216
198 TODO
217 ### Adding reactive behavior: refs, watch(...) and watchFor(...)
218
219 This library adds some reactive traits to update the generated DOM of the widget.
220 Dojo 1.x adds some standard options to deal with dynamic changes:
221
222 * `data-dojo-attach-point` allows to get reference to an element (or a nested widget)
223 * widget attribute mappings, allows to bind widget's property to a property of
224 the element, referenced by `data-dojo-attach-point`.
225
226 The typical implementation of this technique would look like
227
228 ```tsx
229 import { createElement } from "@implab/djx/tsx";
230 import {djclass, djbase, bind} from "@implab/djx/declare";
231
232 @djclass
233 export class MyFirstWidget extends djbase(DjxWidgetBase) {
234
235 // @bind will generate special attribute mapping
236 // _setCaptionAttr = { node: "captionNode", type: "innerHTML" }
237 @bind({ node: "captionNode", type: "innerHTML" })
238 caption = "My first widget";
239
240 render() {
241 return <h1 data-dojo-attach-point="captionNode"/>;
242 }
243 }
244 ```
245
246 Despite this is a natural way for the dojo it has some disadvantages:
247
248 1. The compiler doesn't check existence of the attach-point.
249 2. Attribute mappings support only simple mappings, it's difficult to update the
250 complex rendition.
251
252 This library helps you to get both goals with special trait `watch(...)`
253
254 ```tsx
255 import { createElement } from "@implab/djx/tsx";
256 import { djclass, djbase} from "@implab/djx/declare"
257
258 @djclass
259 export class MyFirstWidget extends djbase(DjxWidgetBase) {
260
261 caption = "My first widget";
262
263 render() {
264 return <h1>{watch(this,"caption", value => value)}</h1>;
265 }
266 }
267 ```
268
269 In this example we replaced attach-point with simple call to `watch` function
270 which renders string value to text representation (text node). It will create a
271 rendition which will observe the `caption` property of the widget and update its
272 contents according to the value changes of the property.
273
274 The key feature of this approach that the rendering function within `watch` may
275 return a complex rendition.
276
277 ```tsx
278 // inside some widget
279 render() {
280 return <section>
281 {watch(this,"user", value => value && [
282 <UserInfo user={value}/>,
283 <LogoutButton click={this._logoutClick}/>
284 ])}
285 </section>;
286 }
287
288 private readonly _logoutClick = () => { /* do logout */ }
289
290 ```
291
292 The `watch` function has two forms:
293
294 * `watch(stateful, prop, render)` - observes the specified property of the
295 `dojo/Stateful` object (or widget)
296 * `watch(observable, render)` - observes the specified observable. It supports
297 `rxjs` or `@implab/djx/observable` observables.
298
299 The `render` callback may return almost anything which will be converted to DOM:
300
301 * `boolean`, `null`, `undefined` - ignored,
302 * `string` - converted to text node,
303 * `array` - converted to DocumentFragment of its elements,
304 * DOM Nodes and widgets are left intact,
305 * any other kind of value will cause an error.
306
@@ -1,437 +1,483
1 1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 2 import { ICancellation } from "@implab/core-amd/interfaces";
3 3
4 4 /**
5 5 * The interface for the consumer of an observable sequence
6 6 */
7 7 export interface Observer<T> {
8 8 /**
9 9 * Called for the next element in the sequence
10 10 */
11 11 next: (value: T) => void;
12 12
13 13 /**
14 14 * Called once when the error occurs in the sequence.
15 15 */
16 16 error: (e: unknown) => void;
17 17
18 18 /**
19 19 * Called once at the end of the sequence.
20 20 */
21 21 complete: () => void;
22 22 }
23 23
24 24 /**
25 25 * The group of functions to feed an observable. These methods are provided to
26 26 * the producer to generate a stream of events.
27 27 */
28 28 export type Sink<T> = {
29 29 /**
30 30 * Call to send the next element in the sequence
31 31 */
32 32 next: (value: T) => void;
33 33
34 34 /**
35 35 * Call to notify about the error occurred in the sequence.
36 36 */
37 37 error: (e: unknown) => void;
38 38
39 39 /**
40 40 * Call to signal the end of the sequence.
41 41 */
42 42 complete: () => void;
43 43
44 44 /**
45 45 * Checks whether the sink is accepting new elements. It's safe to
46 46 * send elements to the closed sink.
47 47 */
48 48 isClosed: () => boolean;
49 49 };
50 50
51 51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
52 52
53 53 export interface Unsubscribable {
54 54 unsubscribe(): void;
55 55 }
56 56
57 57 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
58 58 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
59 59
60 60 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
61 61 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
62 62
63 63 export interface Subscribable<T> {
64 64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 65 }
66 66
67 67 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
68 68
69 69 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
70 70
71 71 /** The observable source of items. */
72 72 export interface Observable<T> extends Subscribable<T> {
73 73 /** Transforms elements of the sequence with the specified mapper
74 74 *
75 75 * @param mapper The mapper used to transform the values
76 76 */
77 77 map<T2>(mapper: (value: T) => T2): Observable<T2>;
78 78
79 79 /** Filters elements of the sequence. The resulting sequence will
80 80 * contain only elements which match the specified predicate.
81 81 *
82 82 * @param predicate The filter predicate.
83 83 */
84 84 filter(predicate: (value: T) => boolean): Observable<T>;
85 85
86 86 /** Completes the sequence once the condition is met.
87 87 * @param predicate The condition which should be met to complete the sequence
88 88 */
89 89 until(predicate: (value: T) => boolean): Observable<T>;
90 90
91 91 /** Keeps the sequence running while elements satisfy the condition.
92 92 *
93 93 * @param predicate The condition which should be met to continue.
94 94 */
95 95 while(predicate: (value: T) => boolean): Observable<T>;
96 96
97 97 /** Applies accumulator to each value in the sequence and
98 98 * emits the accumulated value for each source element
99 99 *
100 100 * @param accumulator
101 101 * @param initial
102 102 */
103 103 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
104 104 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
105 105
106 106 /** Applies accumulator to each value in the sequence and
107 107 * emits the accumulated value at the end of the sequence
108 108 *
109 109 * @param accumulator
110 110 * @param initial
111 111 */
112 112 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
113 113 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
114 114
115 115 /** Concatenates the specified sequences with this observable
116 116 *
117 117 * @param seq sequences to concatenate with the current observable
118 118 *
119 119 * The concatenation doesn't accumulate values from the specified sequences,
120 120 * The result of the concatenation is the new observable which will switch
121 121 * to the next observable after the previous one completes. Values emitted
122 122 * before the next observable being active are lost.
123 123 */
124 124 cat(...seq: Subscribable<T>[]): Observable<T>;
125 125
126 126
127 127 /** Pipes the specified operator to produce the new observable
128 128 * @param op The operator consumes this observable and produces a new one
129 129 *
130 130 * The operator is a higher order function which takes a source observable
131 131 * and returns a producer for the new observable.
132 132 *
133 133 * This function can be used to create a complex mapping between source and
134 134 * resulting observables. The operator may have a state (or a side effect)
135 135 * and can be connected to multiple observables.
136 136 */
137 137 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
138 138
139 139 /** Waits for the next event to occur and returns a promise for the next value
140 140 * @param ct Cancellation token to
141 141 */
142 142 next(ct?: ICancellation): Promise<T>;
143 143 }
144 144
145 145 const noop = () => { };
146 146
147 147 const sink = <T>(consumer: Partial<Observer<T>>) => {
148 148 const { next, error, complete } = consumer;
149 149 return {
150 150 next: next ? next.bind(consumer) : noop,
151 151 error: error ? error.bind(consumer) : noop,
152 152 complete: complete ? complete.bind(consumer) : noop,
153 153 isClosed: () => false
154 154 };
155 155 };
156 156
157 157 /** Wraps the producer to handle tear down logic and subscription management
158 158 *
159 159 * @param producer The producer to wrap
160 160 * @returns The wrapper producer
161 161 */
162 162 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
163 163 let done = false;
164 164 let cleanup = noop;
165 165
166 166 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
167 167 (...args: A) => done ?
168 168 void (0) :
169 169 (done = true, cleanup(), fn(...args));
170 170
171 171 const _fin0 = () => done ? void (0) : (done = true, cleanup());
172 172
173 173 const safeSink = {
174 174 next: (value: T) => { !done && next(value); },
175 175 error: _fin(error),
176 176 complete: _fin(complete),
177 177 isClosed: () => done
178 178 };
179 179 cleanup = producer(safeSink) ?? noop;
180 180 return done ? cleanup() : _fin0;
181 181 };
182 182
183 183 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
184 184 subscribe: (consumer: Partial<Observer<T>>) => ({
185 185 unsubscribe: producer(sink(consumer)) ?? noop
186 186 }),
187 187
188 188 map: (mapper) => _observe(({ next, ...rest }) =>
189 189 producer({
190 190 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
191 191 ...rest
192 192 })
193 193 ),
194 194
195 195 filter: (predicate) => _observe(({ next, ...rest }) =>
196 196 producer({
197 197 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
198 198 ...rest
199 199 })
200 200 ),
201 201
202 202 until: predicate => _observe(({ next, complete, ...rest }) =>
203 203 producer({
204 204 next: v => predicate(v) ? complete() : next(v),
205 205 complete,
206 206 ...rest
207 207 })
208 208 ),
209 209
210 210 while: predicate => _observe(({ next, complete, ...rest }) =>
211 211 producer({
212 212 next: v => predicate(v) ? next(v) : complete(),
213 213 complete,
214 214 ...rest
215 215 })
216 216 ),
217 217
218 218 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
219 219 if (args.length === 1) {
220 220 const [accumulator] = args;
221 221 let _acc: T;
222 222 let index = 0;
223 223 return producer({
224 224 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
225 225 ...rest
226 226 });
227 227 } else {
228 228 const [accumulator, initial] = args;
229 229 let _acc = initial;
230 230 return producer({
231 231 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
232 232 ...rest
233 233 });
234 234 }
235 235 }),
236 236
237 237 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
238 238 if (args.length === 1) {
239 239 const [accumulator] = args;
240 240 let _acc: T;
241 241 let index = 0;
242 242 return producer({
243 243 next: next !== noop ? (v: T) => {
244 244 _acc = index++ === 0 ? v : accumulator(_acc, v);
245 245 } : noop,
246 246 complete: () => {
247 247 if (index === 0) {
248 248 error(new Error("The sequence can't be empty"));
249 249 } else {
250 250 next(_acc);
251 251 complete();
252 252 }
253 253 },
254 254 error,
255 255 ...rest
256 256 });
257 257 } else {
258 258 const [accumulator, initial] = args;
259 259 let _acc = initial;
260 260 return producer({
261 261 next: next !== noop ? (v: T) => {
262 262 _acc = accumulator(_acc, v);
263 263 } : noop,
264 264 complete: () => {
265 265 next(_acc);
266 266 complete();
267 267 },
268 268 error,
269 269 ...rest
270 270 });
271 271 }
272 272 }),
273 273
274 274 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
275 275 let cleanup: () => void;
276 276 const complete = () => {
277 277 const continuation = seq.shift();
278 278 if (continuation) {
279 279 // if we have a next sequence, subscribe to it
280 280 const subscription = continuation.subscribe({ next, complete, ...rest });
281 281 cleanup = subscription.unsubscribe.bind(subscription);
282 282 } else {
283 283 // otherwise notify the consumer about completion
284 284 final();
285 285 }
286 286 };
287 287
288 288 cleanup = producer({ next, complete, ...rest }) ?? noop;
289 289
290 290 return () => cleanup();
291 291 }),
292 292
293 293 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
294 294
295 295 next: (ct?: ICancellation) => {
296 296 const _ct = ct ?? Cancellation.none;
297 297 return new Promise<T>((resolve, reject) => {
298 298 // wrap the producer to handle only single event
299 299 const once = fuse<T>(({ next, complete, error, isClosed }) => {
300 300 const h = _ct.register(error);
301 301
302 302 // is the _ct fires it will call error() and isClosed() will return true
303 303 const cleanup = !isClosed() ?
304 304 producer({
305 305 next: v => (next(v), complete()),
306 306 complete: () => error(new Error("The sequence is empty")),
307 307 error,
308 308 isClosed
309 309 }) ?? noop :
310 310 noop;
311 311
312 312 return () => {
313 313 h.destroy();
314 314 cleanup();
315 315 };
316 316 });
317 317
318 318 once({
319 319 next: resolve,
320 320 error: reject,
321 321 complete: noop,
322 322 isClosed: () => false
323 323 });
324 324 });
325 325 }
326 326 });
327 327
328 328 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
329 329
330 330 export const streamArray = <T>(items: T[]) => _observe<T>(
331 331 ({ next, complete }) => (
332 332 items.forEach(next),
333 333 complete()
334 334 )
335 335 );
336 336
337 337 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
338 338 ({ next, error, complete }) => void promise.then(
339 339 v => (next(v), complete()),
340 340 error
341 341 )
342 342 );
343 343
344 344 export const of = <T>(...items: T[]) => _observe<T>(
345 345 ({ next, complete }) => (
346 346 items.forEach(next),
347 347 complete()
348 348 )
349 349 );
350 350
351 351 export const empty = _observe<never>(({ complete }) => complete());
352 352
353 353 /**
354 354 * Creates a mutable state and the observable for the stored value.
355 355 *
356 356 * @param value The initial value for the state
357 357 * @returns an array of three elements `[observable, setter, getter]`
358 358 *
359 359 * The returned observable keeps the actual value and will emit it as the next
360 360 * element each time a consumer subscribes the observable.
361 361 *
362 362 * Calling the setter will update the stored value in the observable and notify
363 363 * all consumers.
364 364 */
365 365 export const stateful = <T>(producer: Producer<T>): Producer<T> => {
366 366 const fusedProducer = fuse(producer);
367 367 type Status = "active" | "complete" | "error";
368 368
369 369 let lastValue: T;
370 370 let hasValue = false;
371 371 let status: Status = "active";
372 372 let lastError: unknown;
373 373 let subscribers: Sink<T>[] = [];
374 374
375 375 const sink: Sink<T> = {
376 376 isClosed: () => status !== "active",
377 377 complete: () => {
378 378 if (status === "active") {
379 379 status = "complete";
380 380 const _subscribers = subscribers;
381 381 subscribers = [];
382 382 _subscribers.forEach(s => s.complete());
383 383 }
384 384 },
385 385 error: e => {
386 386 if (status === "active") {
387 387 status = "error";
388 388 lastError = e;
389 389 const _subscribers = subscribers;
390 390 subscribers = [];
391 391 _subscribers.forEach(s => s.error(e));
392 392 }
393 393 },
394 394 next: v => {
395 395 if (status === "active") {
396 396 hasValue = true;
397 397 lastValue = v;
398 398 const _subscribers = subscribers;
399 399 _subscribers.forEach(s => s.next(v));
400 400 }
401 401 }
402 402 };
403 403
404 404 fusedProducer(sink);
405 405
406 406 return (s: Sink<T>) => {
407 407 const _subscribers = subscribers;
408 408 switch (status) {
409 409 case "active":
410 410 if (hasValue)
411 411 s.next(lastValue); // if hasValue is true,
412 // lastValue has a valid value
412 // lastValue has a valid value
413 413 subscribers.push(s);
414 414 return () => {
415 415 if (_subscribers === subscribers) {
416 416 const pos = subscribers.indexOf(s);
417 417 if (pos >= 0)
418 418 subscribers.splice(pos, 1);
419 419 }
420 420 };
421 421 case "complete":
422 422 s.complete();
423 423 break;
424 424 case "error":
425 425 s.error(lastError);
426 426 break;
427 427 }
428 428 };
429 429 };
430 430
431 const subject = <T>(producer: Producer<T>): Producer<T> => {
431 /** Create the producer which will be called once when the first subscriber is
432 * attached, next subscribers would share the same producer. When all
433 * subscribers are removed the producer will be cleaned up.
434 *
435 * Use this wrapper to prevent spawning multiple producers.
436 *
437 * @param producer The source producer
438 * @returns The wrapped producer
439 */
440 export const subject = <T>(producer: Producer<T>): Producer<T> => {
432 441 const fusedProducer = fuse(producer);
433 442
434 return () => {
443 let subscribers: Sink<T>[] = [];
444
445 let cleanup = noop;
435 446
447 const sink: Sink<T> = {
448 isClosed: () => false,
449 complete: () => {
450 const _subscribers = subscribers;
451 subscribers = [];
452 _subscribers.forEach(s => s.complete());
453 cleanup();
454 },
455 error: e => {
456 const _subscribers = subscribers;
457 subscribers = [];
458 _subscribers.forEach(s => s.error(e));
459 cleanup();
460 },
461 next: v => {
462 const _subscribers = subscribers;
463 _subscribers.forEach(s => s.next(v));
464 }
465 };
466
467 return client => {
468 const _subscribers = subscribers;
469 subscribers.push(client);
470 if (subscribers.length === 1)
471 cleanup = fusedProducer(sink) ?? noop;
472
473 return () => {
474 if (_subscribers === subscribers) {
475 const pos = subscribers.indexOf(client);
476 if (pos >= 0)
477 subscribers.splice(pos,1);
478 if (!subscribers.length)
479 cleanup();
480 }
481 };
436 482 };
437 483 }; No newline at end of file
@@ -1,3 +1,4
1 1 import "./declare-tests";
2 2 import "./observable-tests";
3 import "./state-tests"; No newline at end of file
3 import "./state-tests";
4 import "./subject-tests"; No newline at end of file
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now