title: RxJS
date: 2020-05-21 16:42:08
tags:
RxJS指引
在前端,我们通常有这么一些方式来处理异步的东西:
- 回调
- 事件
- Promise
- Generator
其中,存在两种处理问题的方式,因为需求也是两种:
- 分发
- 流程
在处理分发的需求的时候,回调、事件或者类似订阅发布这种模式是比较合适的;
而在处理流程性质的需求时,Promise和Generator比较合适。
RxJS的优势在于结合了两种模式,它的每个Observable上都能够订阅,而Observable之间的关系,则能够体现流程(注意,RxJS里面的流程的控制和处理,其直观性略强于Promise,但弱于Generator)。
我们可以把一切输入都当做数据流来处理,比如说:
- 用户操作
- 网络响应
- 定时器
- Worker
RxJS提供了各种API来创建数据流:
- 单值:of, empty, never
- 多值:from
- 定时:interval, timer
- 从事件创建:fromEvent
- 从Promise创建:fromPromise
- 自定义创建:create
创建出来的数据流是一种可观察的序列,可以被订阅,也可以被用来做一些转换操作,比如:
- 改变数据形态:map, mapTo, pluck
- 过滤一些值:filter, skip, first, last, take
- 时间轴上的操作:delay, timeout, throttle, debounce, audit, bufferTime
- 累加:reduce, scan
- 异常处理:throw, catch, retry, finally
- 条件执行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn
- 转接:switch
也可以对若干个数据流进行组合:
- concat,保持原来的序列顺序连接两个数据流
- merge,合并序列
- race,预设条件为其中一个数据流完成
- forkJoin,预设条件为所有数据流都完成
- zip,取各来源数据流最后一个值合并为对象
- combineLatest,取各来源数据流最后一个值合并为数组
这时候回头看,其实RxJS在事件处理的路上已经走得太远了,从事件到流,它被称为lodash for events,倒不如说是lodash for stream更贴切,它提供的这些操作符也确实可以跟lodash媲美。
那么,数据的管道是什么形状的?
在RxJS中,存在这么几种东西:
- Observable 可观察序列,只出不进
- Observer 观察者,只进不出
- Subject 可出可进的可观察序列,可作为观察者
- ReplaySubject 带回放
- Subscription 订阅关系
基于RxJS编程,就好像是在组装管道,依赖关系其实是定义在管道上,而不是在数据上。所以,不存在命令式的那些问题,只要管道能够接起来,再放进去数据就可以了。所以,我们可以先定义管道之间的依赖关系,
1 | # 赚钱是为了买房,买房是为了赚钱 |
大部分业务系统前端不太适合用RxJS,大部分是中后台CRUD系统,因为两个原因:整体性、实时性的要求不高。
什么是整体性?这是一种系统设计的理念,系统中的很多业务模块不是孤立的,比如说,从展示上,GUI与命令行的差异在于什么?在于数据的冗余展示。我们可以把同一份业务数据以不同形态展示在不同视图上,甚至在PC端,由于屏幕大,可以允许同一份数据以不同形态同时展现,这时候,为了整体协调,对此数据的更新就会要产生很多分发和联动关系。
什么是实时性?这个其实有多个含义,一个比较重要的因素是服务端是否会主动向推送一些业务更新信息,如果用得比较多,也会产生不少的分发关系。
在分发和联动关系多的时候,RxJS才能更加体现出它比Generator、Promise的优势。
1 | let count = 0; |
1 | import { Observable } from "rxjs"; |
Observable
represents the idea of an invokable collection of future values or events
SINGLE | MULTIPLE | |
---|---|---|
Pull | Function | Iterator |
Push | Promise | Observable |
Pull and Push are two different protocols that describe how a data Producer can communicate with a data Consumer.
What is Pull? In Pull systems, the Consumer determines when it receives data from the data Producer. The Producer itself is unaware of when the data will be delivered to the Consumer.
Every JavaScript Function is a Pull system. The function is a Producer of data, and the code that calls the function is consuming it by “pulling” out a single return value from its call.
ES2015 introduced generator functions and iterators (function*), another type of Pull system. Code that calls iterator.next() is the Consumer, “pulling” out multiple values from the iterator (the Producer).
What is Push? In Push systems, the Producer determines when to send data to the Consumer. The Consumer is unaware of when it will receive that data.
Promises are the most common type of Push system in JavaScript today. A Promise (the Producer) delivers a resolved value to registered callbacks (the Consumers), but unlike functions, it is the Promise which is in charge of determining precisely when that value is “pushed” to the callbacks.
RxJS introduces Observables, a new Push system for JavaScript. An Observable is a Producer of multiple values, “pushing” them to Observers (Consumers).
- A Function is a lazily evaluated computation that synchronously returns a single value on invocation.
- A generator is a lazily evaluated computation that synchronously returns zero to (potentially) infinite values on iteration.
- A Promise is a computation that may (or may not) eventually return a single value.
- An Observable is a lazily evaluated computation that can synchronously or asynchronously return zero to (potentially) infinite values from the time it’s invoked onwards.
With observable.subscribe, the given Observer is not registered as a listener in the Observable. The Observable does not even maintain a list of attached Observers.
A subscribe call is simply a way to start an “Observable execution” and deliver values or events to an Observer of that execution.
In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
1 | const observable = new Observable(function subscribe(subscriber) { |
Observer
is a collection of callbacks that knows how to listen to values delivered by the Observable
Subscription
represents the execution of an Observable, is primarily useful for cancelling the execution
1 | import { interval } from 'rxjs'; |
Subject
An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.
Subjects are like EventEmitters: they maintain a registry of many listeners.
- Every Subject is an Observable
- Every Subject is an Observer
1 | import { Subject } from 'rxjs'; |
converted a unicast Observable execution to multicast through the Subject
1 | import { Subject, from } from 'rxjs'; |
1 | import { from, Subject } from 'rxjs'; |
1 | import { interval, Subject } from 'rxjs'; |
1 | import { Observable, Subject } from "rxjs"; |
- publish() — is a shortcut for multicast(() => new Subject()):
- publishBehavior() — is a shortcut for multicast(new BehaviorSubject()):
- publishReplay(x) — is a shortcut for multicast(() => new ReplaySubject(x)):
- publishLast() — is a shortcut for multicast(new AsyncSubject()):
- share() — is a shortcut for multicast(() => new Subject()) + refCount():
- shareReplay(bufferSize) — is a multicasting operator that uses a ReplaySubject(). It doesn’t internally use the multicast operator itself, and as a result it always returns an observable, rather than a ConnectableObservable. It can be used either with a refCount, or without it
BehaviorSubject
A variant of Subject that requires an initial value and emits its current value whenever it is subscribed to
1 | import { BehaviorSubject } from 'rxjs'; |
ReplaySubject
A variant of Subject that “replays” or emits old values to new subscribers. It buffers a set number of values and will emit those values immediately to any new subscribers in addition to emitting new values to existing subscribers.
1 | import { ReplaySubject } from 'rxjs'; |
1 | import { ReplaySubject } from 'rxjs'; |
AsyncSubject
A variant of Subject that only emits a value when it completes. It will emit its latest value to all its observers on completion.
1 | import { AsyncSubject } from 'rxjs'; |
operators
are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, reduce, etc
- Pipeable Operators. When called, they do not change the existing Observable instance. Instead, they return a new Observable, whose subscription logic is based on the first Observable.
- Creation Operators. can be called as standalone functions to create a new Observable. Distinct from pipeable operators, creation operators are functions that can be used to create an Observable with some common predefined behavior or by joining other Observables.
1 | import { of } from 'rxjs'; |
1 | import { of } from 'rxjs'; |
1 | obs.pipe( |
1 | import { interval } from 'rxjs'; |
Higher-order Observables
Observables most commonly emit ordinary values like strings and numbers, but surprisingly often, it is necessary to handle Observables of Observables, so-called higher-order Observables.
But how do you work with a higher-order Observable? Typically, by flattening: by (somehow) converting a higher-order Observable into an ordinary Observable. For example:
1 | const fileObservable = urlObservable.pipe( |
Creation Operators
Join Creation Operators
Join Operators
Multicasting Operators
Utility Operators
Conditional and Boolean Operators
Mathematical and Aggregate Operators
Grouping Operators | |
---|---|
combineAll | Takes an observable as its source and emits the latest value from each observable when any one of the inner observables emits. |
concatAll | Emit all values from each source observable, finishing one observable before the beginning the next. |
Grouping Values | |
---|---|
groupBy | It organizes the values from the source observable according to a condition that we provide as a parameter. |
pairwise | Groups values in pairs. This is useful, for example, to compare the values that were emitted. |
toArray | Collects all of the values, stuffs them into an array and passes them along to any subscribers. Nothing is emitted from toArray until the source closes and then the array arrives as a single value. |
1 | of(...players) |
Value Transformation | |
---|---|
concatMap | takes each input value and passes it to the function provided to the operator as a parameter. The result from the function is then passed to the subscribers and the next value from the source is processed. |
defaultIfEmpty | Allows us to specify a value to be emitted from a source if it completes without emitting any values. |
map | Maybe the single most popular operator in RxJS. It allows us to take every value coming out of a source and run it through a function that we provide. Then the value returned from that function is what gets send to subscribers. |
Filtering Operators | |
---|---|
skip | will be filtered out from the beginning of the source observable. |
skipWhile | Allows us to evaluate the values coming from our source and use them to determine whether we should skip. This is an one-time determination. Means that once the supplied predicate function evaluates too false, every value after that is accepted and emitted to the subscribers. |
take | Is the opposite of skip. Here we provide a number value as a parameter and that’s the number of items that will be accepted from the source and passed along to our subscribers. |
distinct | Allows us to eliminate duplicates from our stream. There are a couple of approaches to do this. If we don’t provide any parameter, then it will exclude all duplicates. If we provide a function, it will use that function to determine what duplicate means. The function will receive each value emitted from the source observable in sequence and then returns a potentially modified value, which is what is used for the comparison to determine uniqueness. |
distinctUntilChanged | This is a bit different from the plain distinct in that will only compare a new value emitted from the source observable to the immediately previous value. If they’re different, the new value will be passed along. If they’re identical, the new value will be dropped. |
filter | Allows us to get as fancy as we would like our elimination of values from the source observable. We need to provide a predicate function to do the comparison. |
first | Emits just the first value from a source and the unsubscribes. |
elementAt | Let us emit only the value from the source observable from the specified position. |
find | Allow just the first instance of a value that causes the predicate function to be true to passed along to the subscribers. Once that first value as passed along, the source observable is unsubscribed. So, no additional values are emitted. |
single | It emits one of the tree things: True, Error and Undefined |
#####
Error Handling Operators | |
---|---|
catchError | It allows our code to be notified when an error occurs in the source stream and reacts by either passing the error along or emitting another observable. In either event, it prevents an unexpected or unhandled stoppage in the stream. |
throwIfEmpty | If our source completes without emitting anything, this operator will force that to be considered an error. I know, nothing says that our source must emit something before it completes, but this operator gives us that capability. |
retry | It is pretty straightforward and useful for things like HTTP requests that may fail due to network issues. This operator allows us to go ahead and to re-subscribe to try again, up at certain numbers of retries, if our source throws an error. You have to have in mind that it starts at the beginning of that observable. |
use the pipe() function to make new operators
1 | import { pipe } from 'rxjs'; |
creating new operators from scratch
1 | import { Observable } from 'rxjs'; |
Schedulers
A scheduler controls when a subscription starts and when notifications are delivered. It consists of three components.
- A Scheduler is a data structure. It knows how to store and queue tasks based on priority or other criteria.
- A Scheduler is an execution context. It denotes where and when the task is executed (e.g. immediately, or in another callback mechanism such as setTimeout or process.nextTick, or the animation frame).
- A Scheduler has a (virtual) clock. It provides a notion of “time” by a getter method now() on the scheduler. Tasks being scheduled on a particular scheduler will adhere only to the time denoted by that clock.
1 | import { Observable, asyncScheduler } from 'rxjs'; |
scheduler types
- null By not passing any scheduler, notifications are delivered synchronously and recursively. Use this for constant-time operations or tail recursive operations.
- queueScheduler Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations.
- asapScheduler Schedules on the micro task queue, which is the same queue used for promises. Basically after the current job, but before the next job. Use this for asynchronous conversions.
- asyncScheduler Schedules work with setInterval. Use this for time-based operations.
- animationFrameScheduler Schedules task that will happen just before next browser content repaint. Can be used to create smooth browser animations.
1 | interface SchedulerLike { |
Use subscribeOn
to schedule in what context will the subscribe() call happen
Use observeOn
to schedule in what context will notifications be delivered.
QUESTION
- 查询同一种数据,可能是同步的(缓存中获取 sync),可能是异步的(AJAX获取 async),业务代码编写需要考虑两种情况。
- 获取数据(pull)和数据的更新通知(push),写法是不同的,会加大业务代码编写的复杂度。
- 每个渲染数据,都是通过若干个查询过程(刚才提到的组合同步异步)组合而成,如何清晰地定义这种组合关系?
- 对于已有数据和未来数据,如何简化它们应用同样规则的代码复杂度。
同步和异步
无论Promise还是Observable,都可以实现同步和异步的封装
获取和订阅
从视图的角度来说,数据哪里来的并不重要,只管显示
1 | # promise |
1 | # observable |
可组合的数据管道
一个视图所需要的数据可能是这样的:(data1,data2,data3都是包含了同步和异步封装的一个过程)
- data1跟data2通过某种组合,得到一个结果
- 这个结果再去跟data3组合,得到最终结果
抽象这个过程: 可以把每个Observable视为一节数据流的管道,我们所要做的,是根据它们之间的关系,把这些管道组装起来,这样,从管道的某个入口传入数据,在末端就可以得到最终的结果。
1 | const A$ = Observable.interval(1000) |
现在和未来
我们编写业务程序的时候,往往会把现在和未来分开考虑,而忽略了他们之间存在的深层次的一致性。
合适的业务抽象应该是如下规则
1 | # 来源等于初始数据与新增数据的合并 |
很多时候,我们编写代码都会考虑进行合适的抽象,但这两个字代表的含义在很多场景下并不相同。
很多人会懂得把代码划分为若干方法,若干类型,若干组件,以为这样就能够把整套业务的运转过程抽象出来,其实不然。
业务逻辑的抽象是与业务单元不同的方式,前者是血脉和神经,后者是肢体和器官,两者需要结合在一起,才能够成为鲜活的整体。
一般场景下,业务单元的抽象难度相对较低,很容易理解,也容易获得关注,所以通常都能做得还不错,比如最近两年,对于组件化之类的话题,都能够谈得起来了,但对于业务逻辑的抽象,大部分项目是做得很不够的,值得深思。