##// END OF EJS Templates
added observable subject producer
cin -
r125:cede47727a1b v1.7.0 default
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,78
1 import { observe, subject } from "./observable";
2 import * as tap from "tap";
3
4 tap.test("Subject tests", t => {
5
6 let nextEvent: (value: string) => void = () => void (0);
7
8 const subj1 = observe(subject<string>(({ next }) => {
9 t.comment("Start subject");
10
11 nextEvent = next;
12
13 return () => {
14 nextEvent = () => void (0);
15 t.comment("Stop subject");
16 };
17 }));
18
19 const h1 = subj1.subscribe({
20 next: v => t.comment(`h1 next: ${v}`)
21 });
22
23 nextEvent("first");
24
25 const h2 = subj1.subscribe({
26 next: v => t.comment(`h2 next: ${v}`)
27 });
28
29 nextEvent("second");
30
31 h1.unsubscribe();
32
33 nextEvent("third");
34
35 h2.unsubscribe();
36
37 t.pass("Subject finished");
38 t.end();
39 }).catch(e => console.error(e));
40
41
42 tap.test("Subject tests #2", t => {
43
44 let nextEvent: (value: string) => void = () => void (0);
45
46 const subj1 = observe(subject<string>(({ next, complete }) => {
47 t.comment("Start subject");
48
49 complete();
50 nextEvent = next;
51
52 return () => {
53 nextEvent = () => void (0);
54 t.comment("Stop subject");
55 };
56 }));
57
58 const h1 = subj1.subscribe({
59 next: v => t.comment(`h1 next: ${v}`)
60 });
61
62 nextEvent("first");
63
64 const h2 = subj1.subscribe({
65 next: v => t.comment(`h2 next: ${v}`)
66 });
67
68 nextEvent("second");
69
70 h1.unsubscribe();
71
72 nextEvent("third");
73
74 h2.unsubscribe();
75
76 t.pass("Subject finished");
77 t.end();
78 }).catch(e => console.error(e)); No newline at end of file
@@ -1,198 +1,306
1 # @implab/djx
1 # @implab/djx
2
2
3 ## SYNOPSIS
3 ## SYNOPSIS
4
4
5 ```tsx
5 ```tsx
6 import { djbase, djclass, bind, prototype, AbstractConstructor } from "@implab/djx/declare";
6 import { djbase, djclass, bind, prototype, AbstractConstructor } from "@implab/djx/declare";
7
7
8 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
8 import { DjxWidgetBase } from "@implab/djx/tsx/DjxWidgetBase";
9 import { createElement } from "@implab/djx/tsx";
9 import { createElement } from "@implab/djx/tsx";
10
10
11 interface MyWidgetAttrs {
11 interface MyWidgetAttrs {
12 title: string;
12 title: string;
13
13
14 counter: number;
14 counter: number;
15 }
15 }
16
16
17 interface MyWidgetEvents {
17 interface MyWidgetEvents {
18 "count-inc": Event;
18 "count-inc": Event;
19
19
20 "count-dec": Event;
20 "count-dec": Event;
21 }
21 }
22
22
23
23
24 @djclass
24 @djclass
25 export class MyWidget extends djbase(
25 export class MyWidget extends djbase(
26 DjxWidgetBase as AbstractConstructor<DjxWidgetBase<MyWidgetAttrs, MyWidgetEvents>>
26 DjxWidgetBase as AbstractConstructor<DjxWidgetBase<MyWidgetAttrs, MyWidgetEvents>>
27 ) {
27 ) {
28
28
29 @bind({ node: "titleNode", type: "innerHTML" })
29 @bind({ node: "titleNode", type: "innerHTML" })
30 title = "";
30 title = "";
31
31
32 @prototype()
32 @prototype()
33 counter = 0;
33 counter = 0;
34
34
35 render() {
35 render() {
36 const Frame = (props: any) => <div>{props.children}</div>;
36 const Frame = (props: any) => <div>{props.children}</div>;
37 return <div
37 return <div
38 className="myWidget"
38 className="myWidget"
39 tabIndex={3}
39 tabIndex={3}
40 style={ alignContent: "center", border: "1px solid" }
40 style={ alignContent: "center", border: "1px solid" }
41 >
41 >
42 <h1 data-dojo-attach-point="titleNode"></h1>
42 <h1 data-dojo-attach-point="titleNode"></h1>
43 <Frame>
43 <Frame>
44 <span class="up-button" onclick={e => this._onIncClick(e)}>[+]</span>
44 <span class="up-button" onclick={e => this._onIncClick(e)}>[+]</span>
45 <span class="down-button" onclick={() => this._onDecClick()}>[-]</span>
45 <span class="down-button" onclick={() => this._onDecClick()}>[-]</span>
46 </Frame>
46 </Frame>
47 </div>;
47 </div>;
48 }
48 }
49
49
50 _onIncClick(e: MouseEvent) {
50 _onIncClick(e: MouseEvent) {
51 this.emit("count-inc", { bubbles: false });
51 this.emit("count-inc", { bubbles: false });
52 }
52 }
53
53
54 _onDecClick() {
54 _onDecClick() {
55 this.emit("count-dec", { bubbles: false });
55 this.emit("count-dec", { bubbles: false });
56 }
56 }
57 }
57 }
58
58
59 ```
59 ```
60
60
61 ## DESCRIPTION
61 ## DESCRIPTION
62
62
63 This package provides you with the tools to glue your good-fellow dojo with modern
63 This package provides you with the tools to glue your good-fellow dojo with modern
64 techniques of building the webapp. The core concept is to built around widgets and
64 techniques of building the webapp. The core concept is to built around widgets and
65 using .tsx to write it. Here are some features:
65 using .tsx to write it. Here are some features:
66
66
67 * `djbase()`, `@djaclass` - traits to declare your classes with `dojo/_base/declare`
67 * `djbase()`, `@djaclass` - traits to declare your classes with `dojo/_base/declare`
68 * `@implab/djx/tsx` - traits to build the rendering of your widgets with tsx
68 * `@implab/djx/tsx` - traits to build the rendering of your widgets with tsx
69 * `DjxWidgetBase` - abstract class which supports tsx markup and
69 * `DjxWidgetBase` - abstract class which supports tsx markup and
70 `data-dojo-attach-*` attributes.
70 `data-dojo-attach-*` attributes.
71 * `@bind(...)` - annotations provide an easy way of using standard dojo widget
71 * `@bind(...)` - annotations provide an easy way of using standard dojo widget
72 attribute bindings.
72 attribute bindings.
73
73
74 ### djbase, @djclass
74 ### djbase, @djclass
75
75
76 These two traits provide convenient way of using `dojo/_base/declare` in Typescript
76 These two traits provide convenient way of using `dojo/_base/declare` in Typescript
77 for declaring your classes.
77 for declaring your classes.
78
78
79 `djbase(...constructors)` - this method accepts a list of constructors in its
79 `djbase(...constructors)` - this method accepts a list of constructors in its
80 parameters and returns the **fake** base type which then can be used to derive
80 parameters and returns the **fake** base type which then can be used to derive
81 your own class. This allows you to provide the Typescript with the correct
81 your own class. This allows you to provide the Typescript with the correct
82 information about the base type and even use `super`!. The only caveat of
82 information about the base type and even use `super`!. The only caveat of
83 this approach is that you **MUST** decorate your class with `@djclass` annotation.
83 this approach is that you **MUST** decorate your class with `@djclass` annotation.
84
84
85 Consider the following example:
85 Consider the following example:
86
86
87 ```ts
87 ```ts
88 import { djbase, djclass } from "@implab/djx/declare";
88 import { djbase, djclass } from "@implab/djx/declare";
89 import { FooMixin } from "./FooMixin";
89 import { FooMixin } from "./FooMixin";
90 import { BarMixin } from "./BarMixin";
90 import { BarMixin } from "./BarMixin";
91 import { BoxMixin } from "./BoxMixin";
91 import { BoxMixin } from "./BoxMixin";
92
92
93 @djclass
93 @djclass
94 export class Baz extends djbase(FooMixin, BarMixin, BoxMixin) {
94 export class Baz extends djbase(FooMixin, BarMixin, BoxMixin) {
95 writeHello(out: string[]) {
95 writeHello(out: string[]) {
96 out.push("-> Baz");
96 out.push("-> Baz");
97
97
98 super.writeHello(out);
98 super.writeHello(out);
99
99
100 out.push("<- Baz");
100 out.push("<- Baz");
101 }
101 }
102 }
102 }
103
103
104 ```
104 ```
105
105
106 All mixins are declared like the one below:
106 All mixins are declared like the one below:
107
107
108 ```ts
108 ```ts
109 import { djclass, djbase } from "@implab/djx/declare";
109 import { djclass, djbase } from "@implab/djx/declare";
110
110
111 interface Super {
111 interface Super {
112 writeHello(out: string[]): void;
112 writeHello(out: string[]): void;
113
113
114 }
114 }
115
115
116 @djclass
116 @djclass
117 export class BarMixin extends djbase<Super>() {
117 export class BarMixin extends djbase<Super>() {
118 writeHello(out: string[]) {
118 writeHello(out: string[]) {
119 out.push("-> Bar");
119 out.push("-> Bar");
120
120
121 super.writeHello(out);
121 super.writeHello(out);
122
122
123 out.push("<- Bar");
123 out.push("<- Bar");
124 }
124 }
125 }
125 }
126 ```
126 ```
127
127
128 finally create an instance and call the `writeHello` method
128 finally create an instance and call the `writeHello` method
129
129
130 ```ts
130 ```ts
131 const baz = new Baz();
131 const baz = new Baz();
132
132
133 const data: string[] = [];
133 const data: string[] = [];
134 baz.writeHello(data);
134 baz.writeHello(data);
135
135
136 console.log(data.join("\n"));
136 console.log(data.join("\n"));
137
137
138 ```
138 ```
139
139
140 you will get the following output:
140 you will get the following output:
141
141
142 ```text
142 ```text
143 -> Baz
143 -> Baz
144 -> Box
144 -> Box
145 -> Bar
145 -> Bar
146 -> Foo
146 -> Foo
147 <- Foo
147 <- Foo
148 <- Bar
148 <- Bar
149 <- Box
149 <- Box
150 <- Baz
150 <- Baz
151 ```
151 ```
152
152
153 Let's take a closer look at the `Baz` declaration it uses `djbase` to derive
153 Let's take a closer look at the `Baz` declaration it uses `djbase` to derive
154 from three mixins and the class is decorated with `@djclass` to accomplish the
154 from three mixins and the class is decorated with `@djclass` to accomplish the
155 declaration and make a real constructor.
155 declaration and make a real constructor.
156
156
157 To allow access to the next sibling method (in terms of multiple inheritance)
157 To allow access to the next sibling method (in terms of multiple inheritance)
158 Dojo provides `this.inherited(arguments)` method but this approach leads to the
158 Dojo provides `this.inherited(arguments)` method but this approach leads to the
159 problem with 'strict' mode of ES5 and eliminates the type information about a
159 problem with 'strict' mode of ES5 and eliminates the type information about a
160 calling method. This library solves the problem calling inherited/next method by
160 calling method. This library solves the problem calling inherited/next method by
161 utilizing `super` keyword. Under the hood there are proxy methods generated in
161 utilizing `super` keyword. Under the hood there are proxy methods generated in
162 the prototype of the declared class which make calls to `this.inherited(...)`
162 the prototype of the declared class which make calls to `this.inherited(...)`
163 method. This technique is compatible with 'strict' mode.
163 method. This technique is compatible with 'strict' mode.
164
164
165 Mixins are declared similar, they also may have the base types although
165 Mixins are declared similar, they also may have the base types although
166 the most common case is declaring the mixin without any base classes. To allow
166 the most common case is declaring the mixin without any base classes. To allow
167 the mixin to access the next method declare the interface with
167 the mixin to access the next method declare the interface with
168 desired methods and use the special form of `djbase<Super>()` without arguments.
168 desired methods and use the special form of `djbase<Super>()` without arguments.
169
169
170 ### DjxWidgetBase<Attrs, Events>
170 ### DjxWidgetBase<Attrs, Events>
171
171
172 TODO
172 This is the base class for the djx widgets. It declares the abstract method
173 `render()` which is used to render the content of the widget, like `_TemplatedMixin`.
174
175 This class extends `dijit/_WidgetBase` and contains logic from `_AttachMixin` thus
176 it is capable to handle `data-dojo-attach-*` attributes from the rendered markup.
177
178 ```tsx
179 @djclass
180 export class MyFirstWidget extends djbase(DjxWidgetBase) {
181 render() {
182 return <h1>My first widget</h1>;
183 }
184 }
185 ```
173
186
174 ### Markup (.tsx)
187 ### Markup (.tsx)
175
188
176 Add to your `tsconfig.json` the following options
189 Add to your `tsconfig.json` the following options
177
190
178 ```json
191 ```json
179 {
192 {
180 "compilerOptions": {
193 "compilerOptions": {
181 "types": ["@implab/djx"],
194 "types": [
195 "@implab/djx",
196 "@implab/dojo-typings"
197 ],
198 "skipLibCheck": true,
182 "experimentalDecorators": true,
199 "experimentalDecorators": true,
183 "jsxFactory": "createElement",
200 "jsxFactory": "createElement",
184 "jsx": "react",
201 "jsx": "react",
202 "target": "ES5", // minimal supported version
203 "lib": ["ES2015", "DOM"]
185 }
204 }
186 }
205 }
187
206
188 ```
207 ```
189
208
190 Import `createElement` into your `.tsx` file
209 Import `createElement` into your `.tsx` file
191
210
192 ```ts
211 ```ts
193 import { createElement } from "@implab/djx/tsx";
212 import { createElement } from "@implab/djx/tsx";
194 ```
213 ```
195
214
196 You are ready to go!
215 You are ready to go!
197
216
198 TODO
217 ### Adding reactive behavior: refs, watch(...) and watchFor(...)
218
219 This library adds some reactive traits to update the generated DOM of the widget.
220 Dojo 1.x adds some standard options to deal with dynamic changes:
221
222 * `data-dojo-attach-point` allows to get reference to an element (or a nested widget)
223 * widget attribute mappings, allows to bind widget's property to a property of
224 the element, referenced by `data-dojo-attach-point`.
225
226 The typical implementation of this technique would look like
227
228 ```tsx
229 import { createElement } from "@implab/djx/tsx";
230 import {djclass, djbase, bind} from "@implab/djx/declare";
231
232 @djclass
233 export class MyFirstWidget extends djbase(DjxWidgetBase) {
234
235 // @bind will generate special attribute mapping
236 // _setCaptionAttr = { node: "captionNode", type: "innerHTML" }
237 @bind({ node: "captionNode", type: "innerHTML" })
238 caption = "My first widget";
239
240 render() {
241 return <h1 data-dojo-attach-point="captionNode"/>;
242 }
243 }
244 ```
245
246 Despite this is a natural way for the dojo it has some disadvantages:
247
248 1. The compiler doesn't check existence of the attach-point.
249 2. Attribute mappings support only simple mappings, it's difficult to update the
250 complex rendition.
251
252 This library helps you to get both goals with special trait `watch(...)`
253
254 ```tsx
255 import { createElement } from "@implab/djx/tsx";
256 import { djclass, djbase} from "@implab/djx/declare"
257
258 @djclass
259 export class MyFirstWidget extends djbase(DjxWidgetBase) {
260
261 caption = "My first widget";
262
263 render() {
264 return <h1>{watch(this,"caption", value => value)}</h1>;
265 }
266 }
267 ```
268
269 In this example we replaced attach-point with simple call to `watch` function
270 which renders string value to text representation (text node). It will create a
271 rendition which will observe the `caption` property of the widget and update its
272 contents according to the value changes of the property.
273
274 The key feature of this approach that the rendering function within `watch` may
275 return a complex rendition.
276
277 ```tsx
278 // inside some widget
279 render() {
280 return <section>
281 {watch(this,"user", value => value && [
282 <UserInfo user={value}/>,
283 <LogoutButton click={this._logoutClick}/>
284 ])}
285 </section>;
286 }
287
288 private readonly _logoutClick = () => { /* do logout */ }
289
290 ```
291
292 The `watch` function has two forms:
293
294 * `watch(stateful, prop, render)` - observes the specified property of the
295 `dojo/Stateful` object (or widget)
296 * `watch(observable, render)` - observes the specified observable. It supports
297 `rxjs` or `@implab/djx/observable` observables.
298
299 The `render` callback may return almost anything which will be converted to DOM:
300
301 * `boolean`, `null`, `undefined` - ignored,
302 * `string` - converted to text node,
303 * `array` - converted to DocumentFragment of its elements,
304 * DOM Nodes and widgets are left intact,
305 * any other kind of value will cause an error.
306
@@ -1,437 +1,483
1 import { Cancellation } from "@implab/core-amd/Cancellation";
1 import { Cancellation } from "@implab/core-amd/Cancellation";
2 import { ICancellation } from "@implab/core-amd/interfaces";
2 import { ICancellation } from "@implab/core-amd/interfaces";
3
3
4 /**
4 /**
5 * The interface for the consumer of an observable sequence
5 * The interface for the consumer of an observable sequence
6 */
6 */
7 export interface Observer<T> {
7 export interface Observer<T> {
8 /**
8 /**
9 * Called for the next element in the sequence
9 * Called for the next element in the sequence
10 */
10 */
11 next: (value: T) => void;
11 next: (value: T) => void;
12
12
13 /**
13 /**
14 * Called once when the error occurs in the sequence.
14 * Called once when the error occurs in the sequence.
15 */
15 */
16 error: (e: unknown) => void;
16 error: (e: unknown) => void;
17
17
18 /**
18 /**
19 * Called once at the end of the sequence.
19 * Called once at the end of the sequence.
20 */
20 */
21 complete: () => void;
21 complete: () => void;
22 }
22 }
23
23
24 /**
24 /**
25 * The group of functions to feed an observable. These methods are provided to
25 * The group of functions to feed an observable. These methods are provided to
26 * the producer to generate a stream of events.
26 * the producer to generate a stream of events.
27 */
27 */
28 export type Sink<T> = {
28 export type Sink<T> = {
29 /**
29 /**
30 * Call to send the next element in the sequence
30 * Call to send the next element in the sequence
31 */
31 */
32 next: (value: T) => void;
32 next: (value: T) => void;
33
33
34 /**
34 /**
35 * Call to notify about the error occurred in the sequence.
35 * Call to notify about the error occurred in the sequence.
36 */
36 */
37 error: (e: unknown) => void;
37 error: (e: unknown) => void;
38
38
39 /**
39 /**
40 * Call to signal the end of the sequence.
40 * Call to signal the end of the sequence.
41 */
41 */
42 complete: () => void;
42 complete: () => void;
43
43
44 /**
44 /**
45 * Checks whether the sink is accepting new elements. It's safe to
45 * Checks whether the sink is accepting new elements. It's safe to
46 * send elements to the closed sink.
46 * send elements to the closed sink.
47 */
47 */
48 isClosed: () => boolean;
48 isClosed: () => boolean;
49 };
49 };
50
50
51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
51 export type Producer<T> = (sink: Sink<T>) => (void | (() => void));
52
52
53 export interface Unsubscribable {
53 export interface Unsubscribable {
54 unsubscribe(): void;
54 unsubscribe(): void;
55 }
55 }
56
56
57 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
57 export const isUnsubscribable = (v: unknown): v is Unsubscribable =>
58 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
58 v !== null && v !== undefined && typeof (v as Unsubscribable).unsubscribe === "function";
59
59
60 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
60 export const isSubscribable = <T = unknown>(v: unknown): v is Subscribable<T> =>
61 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
61 v !== null && v !== undefined && typeof (v as Subscribable<unknown>).subscribe === "function";
62
62
63 export interface Subscribable<T> {
63 export interface Subscribable<T> {
64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
64 subscribe(consumer: Partial<Observer<T>>): Unsubscribable;
65 }
65 }
66
66
67 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
67 export type AccumulatorFn<T, A> = (acc: A, value: T) => A;
68
68
69 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
69 export type OperatorFn<T, U> = (source: Observable<T>) => Producer<U>;
70
70
71 /** The observable source of items. */
71 /** The observable source of items. */
72 export interface Observable<T> extends Subscribable<T> {
72 export interface Observable<T> extends Subscribable<T> {
73 /** Transforms elements of the sequence with the specified mapper
73 /** Transforms elements of the sequence with the specified mapper
74 *
74 *
75 * @param mapper The mapper used to transform the values
75 * @param mapper The mapper used to transform the values
76 */
76 */
77 map<T2>(mapper: (value: T) => T2): Observable<T2>;
77 map<T2>(mapper: (value: T) => T2): Observable<T2>;
78
78
79 /** Filters elements of the sequence. The resulting sequence will
79 /** Filters elements of the sequence. The resulting sequence will
80 * contain only elements which match the specified predicate.
80 * contain only elements which match the specified predicate.
81 *
81 *
82 * @param predicate The filter predicate.
82 * @param predicate The filter predicate.
83 */
83 */
84 filter(predicate: (value: T) => boolean): Observable<T>;
84 filter(predicate: (value: T) => boolean): Observable<T>;
85
85
86 /** Completes the sequence once the condition is met.
86 /** Completes the sequence once the condition is met.
87 * @param predicate The condition which should be met to complete the sequence
87 * @param predicate The condition which should be met to complete the sequence
88 */
88 */
89 until(predicate: (value: T) => boolean): Observable<T>;
89 until(predicate: (value: T) => boolean): Observable<T>;
90
90
91 /** Keeps the sequence running while elements satisfy the condition.
91 /** Keeps the sequence running while elements satisfy the condition.
92 *
92 *
93 * @param predicate The condition which should be met to continue.
93 * @param predicate The condition which should be met to continue.
94 */
94 */
95 while(predicate: (value: T) => boolean): Observable<T>;
95 while(predicate: (value: T) => boolean): Observable<T>;
96
96
97 /** Applies accumulator to each value in the sequence and
97 /** Applies accumulator to each value in the sequence and
98 * emits the accumulated value for each source element
98 * emits the accumulated value for each source element
99 *
99 *
100 * @param accumulator
100 * @param accumulator
101 * @param initial
101 * @param initial
102 */
102 */
103 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
103 scan<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
104 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
104 scan(accumulator: AccumulatorFn<T, T>): Observable<T>;
105
105
106 /** Applies accumulator to each value in the sequence and
106 /** Applies accumulator to each value in the sequence and
107 * emits the accumulated value at the end of the sequence
107 * emits the accumulated value at the end of the sequence
108 *
108 *
109 * @param accumulator
109 * @param accumulator
110 * @param initial
110 * @param initial
111 */
111 */
112 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
112 reduce<A>(accumulator: AccumulatorFn<T, A>, initial: A): Observable<A>;
113 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
113 reduce(accumulator: AccumulatorFn<T, T>): Observable<T>;
114
114
115 /** Concatenates the specified sequences with this observable
115 /** Concatenates the specified sequences with this observable
116 *
116 *
117 * @param seq sequences to concatenate with the current observable
117 * @param seq sequences to concatenate with the current observable
118 *
118 *
119 * The concatenation doesn't accumulate values from the specified sequences,
119 * The concatenation doesn't accumulate values from the specified sequences,
120 * The result of the concatenation is the new observable which will switch
120 * The result of the concatenation is the new observable which will switch
121 * to the next observable after the previous one completes. Values emitted
121 * to the next observable after the previous one completes. Values emitted
122 * before the next observable being active are lost.
122 * before the next observable being active are lost.
123 */
123 */
124 cat(...seq: Subscribable<T>[]): Observable<T>;
124 cat(...seq: Subscribable<T>[]): Observable<T>;
125
125
126
126
127 /** Pipes the specified operator to produce the new observable
127 /** Pipes the specified operator to produce the new observable
128 * @param op The operator consumes this observable and produces a new one
128 * @param op The operator consumes this observable and produces a new one
129 *
129 *
130 * The operator is a higher order function which takes a source observable
130 * The operator is a higher order function which takes a source observable
131 * and returns a producer for the new observable.
131 * and returns a producer for the new observable.
132 *
132 *
133 * This function can be used to create a complex mapping between source and
133 * This function can be used to create a complex mapping between source and
134 * resulting observables. The operator may have a state (or a side effect)
134 * resulting observables. The operator may have a state (or a side effect)
135 * and can be connected to multiple observables.
135 * and can be connected to multiple observables.
136 */
136 */
137 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
137 pipe<U>(op: OperatorFn<T, U>): Observable<U>;
138
138
139 /** Waits for the next event to occur and returns a promise for the next value
139 /** Waits for the next event to occur and returns a promise for the next value
140 * @param ct Cancellation token to
140 * @param ct Cancellation token to
141 */
141 */
142 next(ct?: ICancellation): Promise<T>;
142 next(ct?: ICancellation): Promise<T>;
143 }
143 }
144
144
145 const noop = () => { };
145 const noop = () => { };
146
146
147 const sink = <T>(consumer: Partial<Observer<T>>) => {
147 const sink = <T>(consumer: Partial<Observer<T>>) => {
148 const { next, error, complete } = consumer;
148 const { next, error, complete } = consumer;
149 return {
149 return {
150 next: next ? next.bind(consumer) : noop,
150 next: next ? next.bind(consumer) : noop,
151 error: error ? error.bind(consumer) : noop,
151 error: error ? error.bind(consumer) : noop,
152 complete: complete ? complete.bind(consumer) : noop,
152 complete: complete ? complete.bind(consumer) : noop,
153 isClosed: () => false
153 isClosed: () => false
154 };
154 };
155 };
155 };
156
156
157 /** Wraps the producer to handle tear down logic and subscription management
157 /** Wraps the producer to handle tear down logic and subscription management
158 *
158 *
159 * @param producer The producer to wrap
159 * @param producer The producer to wrap
160 * @returns The wrapper producer
160 * @returns The wrapper producer
161 */
161 */
162 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
162 const fuse = <T>(producer: Producer<T>) => ({ next, error, complete }: Sink<T>) => {
163 let done = false;
163 let done = false;
164 let cleanup = noop;
164 let cleanup = noop;
165
165
166 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
166 const _fin = <A extends unknown[]>(fn: (...args: A) => void) =>
167 (...args: A) => done ?
167 (...args: A) => done ?
168 void (0) :
168 void (0) :
169 (done = true, cleanup(), fn(...args));
169 (done = true, cleanup(), fn(...args));
170
170
171 const _fin0 = () => done ? void (0) : (done = true, cleanup());
171 const _fin0 = () => done ? void (0) : (done = true, cleanup());
172
172
173 const safeSink = {
173 const safeSink = {
174 next: (value: T) => { !done && next(value); },
174 next: (value: T) => { !done && next(value); },
175 error: _fin(error),
175 error: _fin(error),
176 complete: _fin(complete),
176 complete: _fin(complete),
177 isClosed: () => done
177 isClosed: () => done
178 };
178 };
179 cleanup = producer(safeSink) ?? noop;
179 cleanup = producer(safeSink) ?? noop;
180 return done ? cleanup() : _fin0;
180 return done ? cleanup() : _fin0;
181 };
181 };
182
182
183 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
183 const _observe = <T>(producer: Producer<T>): Observable<T> => ({
184 subscribe: (consumer: Partial<Observer<T>>) => ({
184 subscribe: (consumer: Partial<Observer<T>>) => ({
185 unsubscribe: producer(sink(consumer)) ?? noop
185 unsubscribe: producer(sink(consumer)) ?? noop
186 }),
186 }),
187
187
188 map: (mapper) => _observe(({ next, ...rest }) =>
188 map: (mapper) => _observe(({ next, ...rest }) =>
189 producer({
189 producer({
190 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
190 next: next !== noop ? (v: T) => next(mapper(v)) : noop,
191 ...rest
191 ...rest
192 })
192 })
193 ),
193 ),
194
194
195 filter: (predicate) => _observe(({ next, ...rest }) =>
195 filter: (predicate) => _observe(({ next, ...rest }) =>
196 producer({
196 producer({
197 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
197 next: next !== noop ? (v: T) => predicate(v) ? next(v) : void (0) : noop,
198 ...rest
198 ...rest
199 })
199 })
200 ),
200 ),
201
201
202 until: predicate => _observe(({ next, complete, ...rest }) =>
202 until: predicate => _observe(({ next, complete, ...rest }) =>
203 producer({
203 producer({
204 next: v => predicate(v) ? complete() : next(v),
204 next: v => predicate(v) ? complete() : next(v),
205 complete,
205 complete,
206 ...rest
206 ...rest
207 })
207 })
208 ),
208 ),
209
209
210 while: predicate => _observe(({ next, complete, ...rest }) =>
210 while: predicate => _observe(({ next, complete, ...rest }) =>
211 producer({
211 producer({
212 next: v => predicate(v) ? next(v) : complete(),
212 next: v => predicate(v) ? next(v) : complete(),
213 complete,
213 complete,
214 ...rest
214 ...rest
215 })
215 })
216 ),
216 ),
217
217
218 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
218 scan: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, ...rest }) => {
219 if (args.length === 1) {
219 if (args.length === 1) {
220 const [accumulator] = args;
220 const [accumulator] = args;
221 let _acc: T;
221 let _acc: T;
222 let index = 0;
222 let index = 0;
223 return producer({
223 return producer({
224 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
224 next: next !== noop ? (v: T) => next(index++ === 0 ? _acc = v : _acc = accumulator(_acc, v)) : noop,
225 ...rest
225 ...rest
226 });
226 });
227 } else {
227 } else {
228 const [accumulator, initial] = args;
228 const [accumulator, initial] = args;
229 let _acc = initial;
229 let _acc = initial;
230 return producer({
230 return producer({
231 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
231 next: next !== noop ? (v: T) => next(_acc = accumulator(_acc, v)) : noop,
232 ...rest
232 ...rest
233 });
233 });
234 }
234 }
235 }),
235 }),
236
236
237 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
237 reduce: <A>(...args: [AccumulatorFn<T, A>, A] | [AccumulatorFn<T, T>]) => _observe<T | A>(({ next, complete, error, ...rest }) => {
238 if (args.length === 1) {
238 if (args.length === 1) {
239 const [accumulator] = args;
239 const [accumulator] = args;
240 let _acc: T;
240 let _acc: T;
241 let index = 0;
241 let index = 0;
242 return producer({
242 return producer({
243 next: next !== noop ? (v: T) => {
243 next: next !== noop ? (v: T) => {
244 _acc = index++ === 0 ? v : accumulator(_acc, v);
244 _acc = index++ === 0 ? v : accumulator(_acc, v);
245 } : noop,
245 } : noop,
246 complete: () => {
246 complete: () => {
247 if (index === 0) {
247 if (index === 0) {
248 error(new Error("The sequence can't be empty"));
248 error(new Error("The sequence can't be empty"));
249 } else {
249 } else {
250 next(_acc);
250 next(_acc);
251 complete();
251 complete();
252 }
252 }
253 },
253 },
254 error,
254 error,
255 ...rest
255 ...rest
256 });
256 });
257 } else {
257 } else {
258 const [accumulator, initial] = args;
258 const [accumulator, initial] = args;
259 let _acc = initial;
259 let _acc = initial;
260 return producer({
260 return producer({
261 next: next !== noop ? (v: T) => {
261 next: next !== noop ? (v: T) => {
262 _acc = accumulator(_acc, v);
262 _acc = accumulator(_acc, v);
263 } : noop,
263 } : noop,
264 complete: () => {
264 complete: () => {
265 next(_acc);
265 next(_acc);
266 complete();
266 complete();
267 },
267 },
268 error,
268 error,
269 ...rest
269 ...rest
270 });
270 });
271 }
271 }
272 }),
272 }),
273
273
274 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
274 cat: (...seq) => _observe(({ next, complete: final, ...rest }) => {
275 let cleanup: () => void;
275 let cleanup: () => void;
276 const complete = () => {
276 const complete = () => {
277 const continuation = seq.shift();
277 const continuation = seq.shift();
278 if (continuation) {
278 if (continuation) {
279 // if we have a next sequence, subscribe to it
279 // if we have a next sequence, subscribe to it
280 const subscription = continuation.subscribe({ next, complete, ...rest });
280 const subscription = continuation.subscribe({ next, complete, ...rest });
281 cleanup = subscription.unsubscribe.bind(subscription);
281 cleanup = subscription.unsubscribe.bind(subscription);
282 } else {
282 } else {
283 // otherwise notify the consumer about completion
283 // otherwise notify the consumer about completion
284 final();
284 final();
285 }
285 }
286 };
286 };
287
287
288 cleanup = producer({ next, complete, ...rest }) ?? noop;
288 cleanup = producer({ next, complete, ...rest }) ?? noop;
289
289
290 return () => cleanup();
290 return () => cleanup();
291 }),
291 }),
292
292
293 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
293 pipe: <U>(op: (source: Observable<T>) => Producer<U>) => observe(op(_observe(producer))),
294
294
295 next: (ct?: ICancellation) => {
295 next: (ct?: ICancellation) => {
296 const _ct = ct ?? Cancellation.none;
296 const _ct = ct ?? Cancellation.none;
297 return new Promise<T>((resolve, reject) => {
297 return new Promise<T>((resolve, reject) => {
298 // wrap the producer to handle only single event
298 // wrap the producer to handle only single event
299 const once = fuse<T>(({ next, complete, error, isClosed }) => {
299 const once = fuse<T>(({ next, complete, error, isClosed }) => {
300 const h = _ct.register(error);
300 const h = _ct.register(error);
301
301
302 // is the _ct fires it will call error() and isClosed() will return true
302 // is the _ct fires it will call error() and isClosed() will return true
303 const cleanup = !isClosed() ?
303 const cleanup = !isClosed() ?
304 producer({
304 producer({
305 next: v => (next(v), complete()),
305 next: v => (next(v), complete()),
306 complete: () => error(new Error("The sequence is empty")),
306 complete: () => error(new Error("The sequence is empty")),
307 error,
307 error,
308 isClosed
308 isClosed
309 }) ?? noop :
309 }) ?? noop :
310 noop;
310 noop;
311
311
312 return () => {
312 return () => {
313 h.destroy();
313 h.destroy();
314 cleanup();
314 cleanup();
315 };
315 };
316 });
316 });
317
317
318 once({
318 once({
319 next: resolve,
319 next: resolve,
320 error: reject,
320 error: reject,
321 complete: noop,
321 complete: noop,
322 isClosed: () => false
322 isClosed: () => false
323 });
323 });
324 });
324 });
325 }
325 }
326 });
326 });
327
327
328 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
328 export const observe = <T>(producer: Producer<T>) => _observe(fuse(producer));
329
329
330 export const streamArray = <T>(items: T[]) => _observe<T>(
330 export const streamArray = <T>(items: T[]) => _observe<T>(
331 ({ next, complete }) => (
331 ({ next, complete }) => (
332 items.forEach(next),
332 items.forEach(next),
333 complete()
333 complete()
334 )
334 )
335 );
335 );
336
336
337 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
337 export const streamPromise = <T>(promise: PromiseLike<T>) => observe<T>(
338 ({ next, error, complete }) => void promise.then(
338 ({ next, error, complete }) => void promise.then(
339 v => (next(v), complete()),
339 v => (next(v), complete()),
340 error
340 error
341 )
341 )
342 );
342 );
343
343
344 export const of = <T>(...items: T[]) => _observe<T>(
344 export const of = <T>(...items: T[]) => _observe<T>(
345 ({ next, complete }) => (
345 ({ next, complete }) => (
346 items.forEach(next),
346 items.forEach(next),
347 complete()
347 complete()
348 )
348 )
349 );
349 );
350
350
351 export const empty = _observe<never>(({ complete }) => complete());
351 export const empty = _observe<never>(({ complete }) => complete());
352
352
353 /**
353 /**
354 * Creates a mutable state and the observable for the stored value.
354 * Creates a mutable state and the observable for the stored value.
355 *
355 *
356 * @param value The initial value for the state
356 * @param value The initial value for the state
357 * @returns an array of three elements `[observable, setter, getter]`
357 * @returns an array of three elements `[observable, setter, getter]`
358 *
358 *
359 * The returned observable keeps the actual value and will emit it as the next
359 * The returned observable keeps the actual value and will emit it as the next
360 * element each time a consumer subscribes the observable.
360 * element each time a consumer subscribes the observable.
361 *
361 *
362 * Calling the setter will update the stored value in the observable and notify
362 * Calling the setter will update the stored value in the observable and notify
363 * all consumers.
363 * all consumers.
364 */
364 */
365 export const stateful = <T>(producer: Producer<T>): Producer<T> => {
365 export const stateful = <T>(producer: Producer<T>): Producer<T> => {
366 const fusedProducer = fuse(producer);
366 const fusedProducer = fuse(producer);
367 type Status = "active" | "complete" | "error";
367 type Status = "active" | "complete" | "error";
368
368
369 let lastValue: T;
369 let lastValue: T;
370 let hasValue = false;
370 let hasValue = false;
371 let status: Status = "active";
371 let status: Status = "active";
372 let lastError: unknown;
372 let lastError: unknown;
373 let subscribers: Sink<T>[] = [];
373 let subscribers: Sink<T>[] = [];
374
374
375 const sink: Sink<T> = {
375 const sink: Sink<T> = {
376 isClosed: () => status !== "active",
376 isClosed: () => status !== "active",
377 complete: () => {
377 complete: () => {
378 if (status === "active") {
378 if (status === "active") {
379 status = "complete";
379 status = "complete";
380 const _subscribers = subscribers;
380 const _subscribers = subscribers;
381 subscribers = [];
381 subscribers = [];
382 _subscribers.forEach(s => s.complete());
382 _subscribers.forEach(s => s.complete());
383 }
383 }
384 },
384 },
385 error: e => {
385 error: e => {
386 if (status === "active") {
386 if (status === "active") {
387 status = "error";
387 status = "error";
388 lastError = e;
388 lastError = e;
389 const _subscribers = subscribers;
389 const _subscribers = subscribers;
390 subscribers = [];
390 subscribers = [];
391 _subscribers.forEach(s => s.error(e));
391 _subscribers.forEach(s => s.error(e));
392 }
392 }
393 },
393 },
394 next: v => {
394 next: v => {
395 if (status === "active") {
395 if (status === "active") {
396 hasValue = true;
396 hasValue = true;
397 lastValue = v;
397 lastValue = v;
398 const _subscribers = subscribers;
398 const _subscribers = subscribers;
399 _subscribers.forEach(s => s.next(v));
399 _subscribers.forEach(s => s.next(v));
400 }
400 }
401 }
401 }
402 };
402 };
403
403
404 fusedProducer(sink);
404 fusedProducer(sink);
405
405
406 return (s: Sink<T>) => {
406 return (s: Sink<T>) => {
407 const _subscribers = subscribers;
407 const _subscribers = subscribers;
408 switch (status) {
408 switch (status) {
409 case "active":
409 case "active":
410 if (hasValue)
410 if (hasValue)
411 s.next(lastValue); // if hasValue is true,
411 s.next(lastValue); // if hasValue is true,
412 // lastValue has a valid value
412 // lastValue has a valid value
413 subscribers.push(s);
413 subscribers.push(s);
414 return () => {
414 return () => {
415 if (_subscribers === subscribers) {
415 if (_subscribers === subscribers) {
416 const pos = subscribers.indexOf(s);
416 const pos = subscribers.indexOf(s);
417 if (pos >= 0)
417 if (pos >= 0)
418 subscribers.splice(pos, 1);
418 subscribers.splice(pos, 1);
419 }
419 }
420 };
420 };
421 case "complete":
421 case "complete":
422 s.complete();
422 s.complete();
423 break;
423 break;
424 case "error":
424 case "error":
425 s.error(lastError);
425 s.error(lastError);
426 break;
426 break;
427 }
427 }
428 };
428 };
429 };
429 };
430
430
431 const subject = <T>(producer: Producer<T>): Producer<T> => {
431 /** Create the producer which will be called once when the first subscriber is
432 * attached, next subscribers would share the same producer. When all
433 * subscribers are removed the producer will be cleaned up.
434 *
435 * Use this wrapper to prevent spawning multiple producers.
436 *
437 * @param producer The source producer
438 * @returns The wrapped producer
439 */
440 export const subject = <T>(producer: Producer<T>): Producer<T> => {
432 const fusedProducer = fuse(producer);
441 const fusedProducer = fuse(producer);
433
442
443 let subscribers: Sink<T>[] = [];
444
445 let cleanup = noop;
446
447 const sink: Sink<T> = {
448 isClosed: () => false,
449 complete: () => {
450 const _subscribers = subscribers;
451 subscribers = [];
452 _subscribers.forEach(s => s.complete());
453 cleanup();
454 },
455 error: e => {
456 const _subscribers = subscribers;
457 subscribers = [];
458 _subscribers.forEach(s => s.error(e));
459 cleanup();
460 },
461 next: v => {
462 const _subscribers = subscribers;
463 _subscribers.forEach(s => s.next(v));
464 }
465 };
466
467 return client => {
468 const _subscribers = subscribers;
469 subscribers.push(client);
470 if (subscribers.length === 1)
471 cleanup = fusedProducer(sink) ?? noop;
472
434 return () => {
473 return () => {
435
474 if (_subscribers === subscribers) {
475 const pos = subscribers.indexOf(client);
476 if (pos >= 0)
477 subscribers.splice(pos,1);
478 if (!subscribers.length)
479 cleanup();
480 }
481 };
436 };
482 };
437 }; No newline at end of file
483 };
@@ -1,3 +1,4
1 import "./declare-tests";
1 import "./declare-tests";
2 import "./observable-tests";
2 import "./observable-tests";
3 import "./state-tests"; No newline at end of file
3 import "./state-tests";
4 import "./subject-tests"; No newline at end of file
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now