title: RxJS
date: 2020-05-21 16:42:08

tags:

RxJS指引

在前端,我们通常有这么一些方式来处理异步的东西:

  • 回调
  • 事件
  • Promise
  • Generator

其中,存在两种处理问题的方式,因为需求也是两种:

  • 分发
  • 流程

在处理分发的需求的时候,回调、事件或者类似订阅发布这种模式是比较合适的;
而在处理流程性质的需求时,Promise和Generator比较合适。

RxJS的优势在于结合了两种模式,它的每个Observable上都能够订阅,而Observable之间的关系,则能够体现流程(注意,RxJS里面的流程的控制和处理,其直观性略强于Promise,但弱于Generator)。

我们可以把一切输入都当做数据流来处理,比如说:

  • 用户操作
  • 网络响应
  • 定时器
  • Worker

RxJS提供了各种API来创建数据流:

  • 单值:of, empty, never
  • 多值:from
  • 定时:interval, timer
  • 从事件创建:fromEvent
  • 从Promise创建:fromPromise
  • 自定义创建:create

创建出来的数据流是一种可观察的序列,可以被订阅,也可以被用来做一些转换操作,比如:

  • 改变数据形态:map, mapTo, pluck
  • 过滤一些值:filter, skip, first, last, take
  • 时间轴上的操作:delay, timeout, throttle, debounce, audit, bufferTime
  • 累加:reduce, scan
  • 异常处理:throw, catch, retry, finally
  • 条件执行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn
  • 转接:switch

也可以对若干个数据流进行组合:

  • concat,保持原来的序列顺序连接两个数据流
  • merge,合并序列
  • race,预设条件为其中一个数据流完成
  • forkJoin,预设条件为所有数据流都完成
  • zip,取各来源数据流最后一个值合并为对象
  • combineLatest,取各来源数据流最后一个值合并为数组

这时候回头看,其实RxJS在事件处理的路上已经走得太远了,从事件到流,它被称为lodash for events,倒不如说是lodash for stream更贴切,它提供的这些操作符也确实可以跟lodash媲美。

那么,数据的管道是什么形状的?

在RxJS中,存在这么几种东西:

  • Observable 可观察序列,只出不进
  • Observer 观察者,只进不出
  • Subject 可出可进的可观察序列,可作为观察者
  • ReplaySubject 带回放
  • Subscription 订阅关系

基于RxJS编程,就好像是在组装管道,依赖关系其实是定义在管道上,而不是在数据上。所以,不存在命令式的那些问题,只要管道能够接起来,再放进去数据就可以了。所以,我们可以先定义管道之间的依赖关系,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 赚钱是为了买房,买房是为了赚钱

工资周期 ———> 工资

房租周期 ———> 租金 ———> 收入 ———> 现金
↑ ↓
房子数量 <——— 新购房


import {interval, Subject, merge} from "rxjs";
import {mapTo, scan, startWith, withLatestFrom, map} from "rxjs/operators";

// 工资:每1个月工资收入为2
const salary$ = interval(1000).pipe(
mapTo(2),
map(s => {
console.log("salary : "+ (s))
return s;
})
);
// 房子
const house$ = new Subject();
// 房子数量
const houseCount$ = house$.pipe(
scan((acc, num)=> {
console.log("house count : "+ (acc+num))
return acc+num
},0),
startWith(0)
);
// 房租:每3个月房子数量*5
const rent$ = interval(3000).pipe(
withLatestFrom(houseCount$),
map(arr => {
console.log("rent : "+ (arr[1] * 5))
return arr[1] * 5
})
);
// 收入:工资+房租
const income$ = merge(salary$, rent$);
// 现金流:一旦现金流够买房,就去买
const cash$ = income$.pipe(
scan((acc,num) =>{
const newSum = acc+num;
const newHouse = Math.floor(newSum / 100);
console.log("cash : "+ newSum )
if(newHouse > 0) {
console.log("** BUY HOUSE ** ", newHouse)
house$.next(newHouse);
console.log("left : "+ newSum % 100 )
}
console.log("------ ")
return newSum % 100;
}, 0)
);

cash$.subscribe()

大部分业务系统前端不太适合用RxJS,大部分是中后台CRUD系统,因为两个原因:整体性、实时性的要求不高。

什么是整体性?这是一种系统设计的理念,系统中的很多业务模块不是孤立的,比如说,从展示上,GUI与命令行的差异在于什么?在于数据的冗余展示。我们可以把同一份业务数据以不同形态展示在不同视图上,甚至在PC端,由于屏幕大,可以允许同一份数据以不同形态同时展现,这时候,为了整体协调,对此数据的更新就会要产生很多分发和联动关系。

什么是实时性?这个其实有多个含义,一个比较重要的因素是服务端是否会主动向推送一些业务更新信息,如果用得比较多,也会产生不少的分发关系。

在分发和联动关系多的时候,RxJS才能更加体现出它比Generator、Promise的优势。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
let count = 0;
const rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', event => {
if (Date.now() - lastClick >= rate) {
count += event.clientX;
console.log(count);
lastClick = Date.now();
}
});


import { fromEvent } from 'rxjs';
import { throttleTime, map, scan } from 'rxjs/operators';
fromEvent(document, 'click')
.pipe(
throttleTime(1000),
map(event => event.clientX),
scan((count, clientX) => count + clientX, 0)
)
.subscribe(console.log);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import { Observable } from "rxjs";

const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4); // happens asynchronously
subscriber.complete();
}, 1000);
});

console.log("just before subscribe");
observable.subscribe({
next(x) {
console.log("got value " + x);
},
error(err) {
console.error("something wrong occurred: " + err);
},
complete() {
console.log("done");
},
});
console.log("just after subscribe");

/**
'just before subscribe'
'got value 1'
'got value 2'
'got value 3'
'just after subscribe'
'got value 4'
'done'
**/

Observable

represents the idea of an invokable collection of future values or events

SINGLEMULTIPLE
PullFunctionIterator
PushPromiseObservable

Pull and Push are two different protocols that describe how a data Producer can communicate with a data Consumer.

What is Pull? In Pull systems, the Consumer determines when it receives data from the data Producer. The Producer itself is unaware of when the data will be delivered to the Consumer.

Every JavaScript Function is a Pull system. The function is a Producer of data, and the code that calls the function is consuming it by “pulling” out a single return value from its call.

ES2015 introduced generator functions and iterators (function*), another type of Pull system. Code that calls iterator.next() is the Consumer, “pulling” out multiple values from the iterator (the Producer).

What is Push? In Push systems, the Producer determines when to send data to the Consumer. The Consumer is unaware of when it will receive that data.

Promises are the most common type of Push system in JavaScript today. A Promise (the Producer) delivers a resolved value to registered callbacks (the Consumers), but unlike functions, it is the Promise which is in charge of determining precisely when that value is “pushed” to the callbacks.

RxJS introduces Observables, a new Push system for JavaScript. An Observable is a Producer of multiple values, “pushing” them to Observers (Consumers).

  • A Function is a lazily evaluated computation that synchronously returns a single value on invocation.
  • A generator is a lazily evaluated computation that synchronously returns zero to (potentially) infinite values on iteration.
  • A Promise is a computation that may (or may not) eventually return a single value.
  • An Observable is a lazily evaluated computation that can synchronously or asynchronously return zero to (potentially) infinite values from the time it’s invoked onwards.

With observable.subscribe, the given Observer is not registered as a listener in the Observable. The Observable does not even maintain a list of attached Observers.

A subscribe call is simply a way to start an “Observable execution” and deliver values or events to an Observer of that execution.

In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.

1
2
3
4
5
6
7
8
9
10
11
12
13
const observable = new Observable(function subscribe(subscriber) {
...
return function unsubscribe() {};
});
const subscription = observable.subscribe(Observer<T>)
subscription.unsubscribe();

export interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}

Observer

is a collection of callbacks that knows how to listen to values delivered by the Observable

Subscription

represents the execution of an Observable, is primarily useful for cancelling the execution

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { interval } from 'rxjs';

const observable1 = interval(400);
const observable2 = interval(300);

const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);
// subscription.remove(childSubscription);

setTimeout(() => {
// Unsubscribes BOTH subscription and childSubscription
subscription.unsubscribe();
}, 1000);

/*
'second: 0'
'first: 0'
'second: 1'
'first: 1'
'second: 2'
*/

Subject

An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.
Subjects are like EventEmitters: they maintain a registry of many listeners.

  • Every Subject is an Observable
  • Every Subject is an Observer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(1);
subject.next(2);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
converted a unicast Observable execution to multicast through the Subject
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { Subject, from } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

const observable = from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);

setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);

// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import { Observable, Subject } from "rxjs";
import { multicast } from "rxjs/operators";

const subject = new Subject();
subject.subscribe((data) => {
console.log(`Subject subscriber 1 recieves ${data}`)
})
subject.subscribe((data) => {
console.log(`Subject subscriber 2 recieves ${data}`)
})
// subject的observers接收到一样的值
subject.next(Math.random());

console.log("**********************************");
const observable = Observable.create((observer) => {
observer.next(Math.random());
});
observable.subscribe((data) => {
console.log(`Observable subscriber 1 recieves ${data}`);
});
observable.subscribe((data) => {
console.log(`Observable subscriber 2 recieves ${data}`);
});
// observable的observers接收到不一样的值

console.log("**********************************");
observable.subscribe(subject);
// observable的subject observer接收到一样的值

console.log("**********************************");
const multicasted = observable.pipe(multicast(subject))
multicasted.subscribe((data) => {
console.log(`multicasted subscriber 1 recieves ${data}`);
});
multicasted.subscribe((data) => {
console.log(`multicasted subscriber 2 recieves ${data}`);
});
multicasted.subscribe((data) => {
console.log(`multicasted subscriber 3 recieves ${data}`);
});
multicasted.connect();
// multicasted的observers接收到一样的值
// subject的observers接收到一样的值

// 'Subject subscriber 1 recieves 0.3360750588442847'
// 'Subject subscriber 2 recieves 0.3360750588442847'
// '**********************************'
// 'Observable subscriber 1 recieves 0.7352480735187386'
// 'Observable subscriber 2 recieves 0.9850076539900632'
// '**********************************'
// 'Subject subscriber 1 recieves 0.5108481360776986'
// 'Subject subscriber 2 recieves 0.5108481360776986'
// '**********************************'
// 'Subject subscriber 1 recieves 0.4067305433286976'
// 'Subject subscriber 2 recieves 0.4067305433286976'
// 'multicasted subscriber 1 recieves 0.4067305433286976'
// 'multicasted subscriber 2 recieves 0.4067305433286976'
// 'multicasted subscriber 3 recieves 0.4067305433286976'
  • publish() — is a shortcut for multicast(() => new Subject()):
  • publishBehavior() — is a shortcut for multicast(new BehaviorSubject()):
  • publishReplay(x) — is a shortcut for multicast(() => new ReplaySubject(x)):
  • publishLast() — is a shortcut for multicast(new AsyncSubject()):
  • share() — is a shortcut for multicast(() => new Subject()) + refCount():
  • shareReplay(bufferSize) — is a multicasting operator that uses a ReplaySubject(). It doesn’t internally use the multicast operator itself, and as a result it always returns an observable, rather than a ConnectableObservable. It can be used either with a refCount, or without it
BehaviorSubject

A variant of Subject that requires an initial value and emits its current value whenever it is subscribed to

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(3);

// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

ReplaySubject

A variant of Subject that “replays” or emits old values to new subscribers. It buffers a set number of values and will emit those values immediately to any new subscribers in addition to emitting new values to existing subscribers.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 1000);

// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...
AsyncSubject

A variant of Subject that only emits a value when it completes. It will emit its latest value to all its observers on completion.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);
subject.complete();

// Logs:
// observerA: 5
// observerB: 5

operators

are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, reduce, etc

  • Pipeable Operators. When called, they do not change the existing Observable instance. Instead, they return a new Observable, whose subscription logic is based on the first Observable.
  • Creation Operators. can be called as standalone functions to create a new Observable. Distinct from pipeable operators, creation operators are functions that can be used to create an Observable with some common predefined behavior or by joining other Observables.
1
2
3
4
5
6
7
8
9
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));

// Logs:
// value: 1
// value: 4
// value: 9
1
2
3
4
5
6
7
import { of } from 'rxjs';
import { first } from 'rxjs/operators';

first()(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));

// Logs:
// value: 1
1
2
3
4
5
6
7
obs.pipe(
op1(),
op2(),
op3(),
op4(),
)
// op4()(op3()(op2()(op1()(obs))))
1
2
3
import { interval } from 'rxjs';

const observable = interval(1000 /* number of milliseconds */);

Higher-order Observables
Observables most commonly emit ordinary values like strings and numbers, but surprisingly often, it is necessary to handle Observables of Observables, so-called higher-order Observables.

But how do you work with a higher-order Observable? Typically, by flattening: by (somehow) converting a higher-order Observable into an ordinary Observable. For example:

1
2
3
4
const fileObservable = urlObservable.pipe(
map(url => http.get(url)),
concatAll(),
);
Creation Operators
Join Creation Operators
Join Operators
Multicasting Operators
Utility Operators
Conditional and Boolean Operators
Mathematical and Aggregate Operators
Grouping Operators
combineAllTakes an observable as its source and emits the latest value from each observable when any one of the inner observables emits.
concatAllEmit all values from each source observable, finishing one observable before the beginning the next.
Grouping Values
groupByIt organizes the values from the source observable according to a condition that we provide as a parameter.
pairwiseGroups values in pairs. This is useful, for example, to compare the values that were emitted.
toArrayCollects all of the values, stuffs them into an array and passes them along to any subscribers. Nothing is emitted from toArray until the source closes and then the array arrives as a single value.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
of(...players)
.pipe(
groupBy(p => p.id),
mergeMap(player => {
player.pipe(
reduce((acc, cur) => {
acc.id = acc.id || cur.id;
acc.goals += cur.goals;
acc.week += cur.week;
return acc;
}, {id:null, goals:0, week:0})
)
})
)
.subscribe(console.log)
Value Transformation
concatMaptakes each input value and passes it to the function provided to the operator as a parameter. The result from the function is then passed to the subscribers and the next value from the source is processed.
defaultIfEmptyAllows us to specify a value to be emitted from a source if it completes without emitting any values.
mapMaybe the single most popular operator in RxJS. It allows us to take every value coming out of a source and run it through a function that we provide. Then the value returned from that function is what gets send to subscribers.
Filtering Operators
skipwill be filtered out from the beginning of the source observable.
skipWhileAllows us to evaluate the values coming from our source and use them to determine whether we should skip. This is an one-time determination. Means that once the supplied predicate function evaluates too false, every value after that is accepted and emitted to the subscribers.
takeIs the opposite of skip. Here we provide a number value as a parameter and that’s the number of items that will be accepted from the source and passed along to our subscribers.
distinctAllows us to eliminate duplicates from our stream. There are a couple of approaches to do this. If we don’t provide any parameter, then it will exclude all duplicates. If we provide a function, it will use that function to determine what duplicate means. The function will receive each value emitted from the source observable in sequence and then returns a potentially modified value, which is what is used for the comparison to determine uniqueness.
distinctUntilChangedThis is a bit different from the plain distinct in that will only compare a new value emitted from the source observable to the immediately previous value. If they’re different, the new value will be passed along. If they’re identical, the new value will be dropped.
filterAllows us to get as fancy as we would like our elimination of values from the source observable. We need to provide a predicate function to do the comparison.
firstEmits just the first value from a source and the unsubscribes.
elementAtLet us emit only the value from the source observable from the specified position.
findAllow just the first instance of a value that causes the predicate function to be true to passed along to the subscribers. Once that first value as passed along, the source observable is unsubscribed. So, no additional values are emitted.
singleIt emits one of the tree things: True, Error and Undefined

#####

Error Handling Operators
catchErrorIt allows our code to be notified when an error occurs in the source stream and reacts by either passing the error along or emitting another observable. In either event, it prevents an unexpected or unhandled stoppage in the stream.
throwIfEmptyIf our source completes without emitting anything, this operator will force that to be considered an error. I know, nothing says that our source must emit something before it completes, but this operator gives us that capability.
retryIt is pretty straightforward and useful for things like HTTP requests that may fail due to network issues. This operator allows us to go ahead and to re-subscribe to try again, up at certain numbers of retries, if our source throws an error. You have to have in mind that it starts at the beginning of that observable.
use the pipe() function to make new operators
1
2
3
4
5
6
7
8
9
import { pipe } from 'rxjs';
import { filter, map } from 'rxjs';

function discardOddDoubleEven() {
return pipe(
filter(v => ! (v % 2)),
map(v => v + v),
);
}
creating new operators from scratch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import { Observable } from 'rxjs';

function delay(delayInMillis) {
return (observable) => new Observable(observer => {
// this function will called each time this
// Observable is subscribed to.
const allTimerIDs = new Set();
const subscription = observable.subscribe({
next(value) {
const timerID = setTimeout(() => {
observer.next(value);
allTimerIDs.delete(timerID);
}, delayInMillis);
allTimerIDs.add(timerID);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
}
});
// the return value is the teardown function,
// which will be invoked when the new
// Observable is unsubscribed from.
return () => {
subscription.unsubscribe();
allTimerIDs.forEach(timerID => {
clearTimeout(timerID);
});
}
});
}

Schedulers

A scheduler controls when a subscription starts and when notifications are delivered. It consists of three components.

  • A Scheduler is a data structure. It knows how to store and queue tasks based on priority or other criteria.
  • A Scheduler is an execution context. It denotes where and when the task is executed (e.g. immediately, or in another callback mechanism such as setTimeout or process.nextTick, or the animation frame).
  • A Scheduler has a (virtual) clock. It provides a notion of “time” by a getter method now() on the scheduler. Tasks being scheduled on a particular scheduler will adhere only to the time denoted by that clock.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
observeOn(asyncScheduler)
);

console.log('just before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x)
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
});
console.log('just after subscribe');


<!--
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
-->
scheduler types
  • null By not passing any scheduler, notifications are delivered synchronously and recursively. Use this for constant-time operations or tail recursive operations.
  • queueScheduler Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations.
  • asapScheduler Schedules on the micro task queue, which is the same queue used for promises. Basically after the current job, but before the next job. Use this for asynchronous conversions.
  • asyncScheduler Schedules work with setInterval. Use this for time-based operations.
  • animationFrameScheduler Schedules task that will happen just before next browser content repaint. Can be used to create smooth browser animations.
1
2
3
4
interface SchedulerLike {
now(): number
schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay?: number, state?: T): Subscription
}

Use subscribeOn to schedule in what context will the subscribe() call happen
Use observeOn to schedule in what context will notifications be delivered.

QUESTION

  1. 查询同一种数据,可能是同步的(缓存中获取 sync),可能是异步的(AJAX获取 async),业务代码编写需要考虑两种情况。
  2. 获取数据(pull)和数据的更新通知(push),写法是不同的,会加大业务代码编写的复杂度。
  3. 每个渲染数据,都是通过若干个查询过程(刚才提到的组合同步异步)组合而成,如何清晰地定义这种组合关系?
  4. 对于已有数据和未来数据,如何简化它们应用同样规则的代码复杂度。
同步和异步

无论Promise还是Observable,都可以实现同步和异步的封装

获取和订阅

从视图的角度来说,数据哪里来的并不重要,只管显示

1
2
3
4
5
6
# promise
service.on('task', data => { // 订阅
// render
})

service.getData() // 需要主动加一句来触发初始化请求

1
2
3
4
# observable
getDataO().subscribe(data => { // 获取和订阅这两件事情合并到了一起
// render
})
可组合的数据管道

一个视图所需要的数据可能是这样的:(data1,data2,data3都是包含了同步和异步封装的一个过程)

  • data1跟data2通过某种组合,得到一个结果
  • 这个结果再去跟data3组合,得到最终结果

抽象这个过程: 可以把每个Observable视为一节数据流的管道,我们所要做的,是根据它们之间的关系,把这些管道组装起来,这样,从管道的某个入口传入数据,在末端就可以得到最终的结果。

1
2
3
4
5
6
7
8
const A$ = Observable.interval(1000)
const B$ = Observable.of(3)
const C$ = Observable.from([5, 6, 7])

const D$ = C$.toArray()
.map(arr => arr.reduce((a, b) => a + b), 0)
const E$ = Observable.combineLatest(A$, B$, D$)
.map(arr => arr.reduce((a, b) => a + b), 0)
现在和未来

我们编写业务程序的时候,往往会把现在和未来分开考虑,而忽略了他们之间存在的深层次的一致性。
合适的业务抽象应该是如下规则

1
2
3
4
5
6
7
# 来源等于初始数据与新增数据的合并
const source$ = start$.merge(patch$) // 例如start是查询,patch是推送

# 进入本列表的数据都应当经过某种过滤规则和某种排序规则
const final$ = source$.map(filterA).map(sorterA)

# 最后在final上添加订阅,整个过程就完美的映射到了界面上

很多时候,我们编写代码都会考虑进行合适的抽象,但这两个字代表的含义在很多场景下并不相同。

很多人会懂得把代码划分为若干方法,若干类型,若干组件,以为这样就能够把整套业务的运转过程抽象出来,其实不然。

业务逻辑的抽象是与业务单元不同的方式,前者是血脉和神经,后者是肢体和器官,两者需要结合在一起,才能够成为鲜活的整体。

一般场景下,业务单元的抽象难度相对较低,很容易理解,也容易获得关注,所以通常都能做得还不错,比如最近两年,对于组件化之类的话题,都能够谈得起来了,但对于业务逻辑的抽象,大部分项目是做得很不够的,值得深思。

Reference
  1. https://github.com/xufei/blog/issues/38