title: RxJS
date: 2020-05-21 16:42:08
- Callbacks
- Events
- Promise
- Generator
- Dispatching
- Flow control
- User actions
- Network responses
- Timers
- Worker
- Single value: of, empty, never
- Multiple values: from
- Timed: interval, timer
- From events: fromEvent
- From a Promise: fromPromise (in RxJS 6+, from accepts a Promise directly)
- Custom: create
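- Reshaping data: map, mapTo, pluck
- Filtering values: filter, skip, first, last, take
- Operating on the time axis: delay, timeout, throttle, debounce, audit, bufferTime
- Accumulating: reduce, scan
- Error handling: throw, catch, retry, finally (named throwError, catchError, retry, finalize in RxJS 6+)
- Conditional execution: takeUntil, delayWhen, retryWhen, subscribeOn, observeOn
- Switching: switch (switchAll in RxJS 6+)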
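- concat: joins two streams while preserving their original order
- merge: merges the sequences
- race: mirrors whichever stream emits first
- forkJoin: waits until every stream has completed, then emits the last value from each
- zip: pairs up the values at the same position in each source stream and emits each group as an array
- combineLatest: combines the latest value from each source stream into an array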
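Looking back at this point, RxJS has gone a very long way down the road of event handling, from events to streams. It has been called "lodash for events", but "lodash for streams" would be more apt, and the operators it provides really can rival lodash.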
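- Observable: an observable sequence; values only come out of it
- Observer: a consumer; values only go into it
- Subject: an observable sequence that values can both enter and leave; it can also act as an observer
- ReplaySubject: a Subject with replay
- Subscription: the subscription relationship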
`let count = 0;`
`import { Observable } from "rxjs";`
An Observable represents the idea of an invokable collection of future values or events.
| | Single | Multiple |
| --- | --- | --- |
| Pull | Function | Iterator |
| Push | Promise | Observable |
Pull and Push are two different protocols that describe how a data Producer can communicate with a data Consumer.
What is Pull? In Pull systems, the Consumer determines when it receives data from the data Producer. The Producer itself is unaware of when the data will be delivered to the Consumer.
Every JavaScript Function is a Pull system. The function is a Producer of data, and the code that calls the function is consuming it by “pulling” out a single return value from its call.
ES2015 introduced generator functions and iterators (function*), another type of Pull system. Code that calls iterator.next() is the Consumer, “pulling” out multiple values from the iterator (the Producer).
What is Push? In Push systems, the Producer determines when to send data to the Consumer. The Consumer is unaware of when it will receive that data.
Promises are the most common type of Push system in JavaScript today. A Promise (the Producer) delivers a resolved value to registered callbacks (the Consumers), but unlike functions, it is the Promise which is in charge of determining precisely when that value is “pushed” to the callbacks.
RxJS introduces Observables, a new Push system for JavaScript. An Observable is a Producer of multiple values, “pushing” them to Observers (Consumers).
- A Function is a lazily evaluated computation that synchronously returns a single value on invocation.
- A generator is a lazily evaluated computation that synchronously returns zero to (potentially) infinite values on iteration.
- A Promise is a computation that may (or may not) eventually return a single value.
- An Observable is a lazily evaluated computation that can synchronously or asynchronously return zero to (potentially) infinite values from the time it’s invoked onwards.
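The Pull and Push protocols above can be compared side by side in a small sketch. It uses no RxJS at all: `produceOne`, `makeCounter`, and `pushThree` are hypothetical names made up for illustration, and `pushThree` is only a stand-in for an Observable, not the RxJS API.

```typescript
// Pull, single value: the caller decides when to invoke the function.
function produceOne(): number {
  return 42;
}

// Pull, multiple values: a hand-written iterator (a generator function would
// build the same kind of object). The caller decides when to call next().
function makeCounter(limit: number) {
  let current = 0;
  return {
    next(): { value: number | undefined; done: boolean } {
      current += 1;
      return current <= limit
        ? { value: current, done: false }
        : { value: undefined, done: true };
    },
  };
}

// Push, multiple values: the producer decides when to call the consumer.
// Hypothetical stand-in for an Observable, not the RxJS API.
function pushThree(consumer: (value: number) => void): void {
  consumer(1);
  consumer(2);
  consumer(3);
}

const pulledSingle: number = produceOne();

const pulledMany: number[] = [];
const counter = makeCounter(3);
let step = counter.next();
while (!step.done) {
  pulledMany.push(step.value as number);
  step = counter.next();
}

const pushedMany: number[] = [];
pushThree((value) => pushedMany.push(value));

console.log(pulledSingle, pulledMany, pushedMany);
```

In the pull cases the consuming code drives the loop; in the push case the consumer only hands over a callback and waits to be called.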
With observable.subscribe, the given Observer is not registered as a listener in the Observable. The Observable does not even maintain a list of attached Observers.
A subscribe call is simply a way to start an “Observable execution” and deliver values or events to an Observer of that execution.
In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
`const observable = new Observable(function subscribe(subscriber) {`
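To make the two points above concrete (subscribe just starts an execution, and nothing is delivered after Error or Complete), here is a hand-rolled sketch. `TinyObservable` and `TinyObserver` are hypothetical names; this is not how RxJS is implemented, only a minimal model of the contract.

```typescript
// Hypothetical minimal model of the Observable contract (not the RxJS classes).
interface TinyObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

class TinyObservable<T> {
  private producer: (observer: TinyObserver<T>) => void;

  constructor(producer: (observer: TinyObserver<T>) => void) {
    this.producer = producer;
  }

  // subscribe() keeps no list of observers; it simply runs the producer
  // once for the observer it was given.
  subscribe(observer: TinyObserver<T>): void {
    let stopped = false;
    this.producer({
      next: (value) => { if (!stopped) observer.next(value); },
      error: (err) => { if (!stopped) { stopped = true; observer.error(err); } },
      complete: () => { if (!stopped) { stopped = true; observer.complete(); } },
    });
  }
}

let executions = 0;
const source = new TinyObservable<number>((observer) => {
  executions += 1;
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // ignored: nothing is delivered after complete
});

const log: string[] = [];
const makeObserver = (label: string): TinyObserver<number> => ({
  next: (value) => log.push(label + " next " + value),
  error: (err) => log.push(label + " error " + String(err)),
  complete: () => log.push(label + " complete"),
});

source.subscribe(makeObserver("A"));
source.subscribe(makeObserver("B"));
console.log(executions, log);
```

Each subscribe call runs the producer again, so `executions` ends at 2 and observers A and B each see their own 1, 2, complete sequence.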
An Observer is a collection of callbacks that knows how to listen to values delivered by the Observable.
A Subscription represents the execution of an Observable and is primarily useful for cancelling that execution.
`import { interval } from 'rxjs';`
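A rough sketch of what a Subscription buys you: a handle that cancels one running execution. To keep the example synchronous, it uses a fake timer that is advanced by hand; `fakeSetInterval`, `advanceFakeTimer`, and `subscribeToTicks` are hypothetical helpers, loosely playing the role of `interval(...).subscribe(...)`.

```typescript
type Teardown = () => void;

// A fake timer we can advance by hand, so the sketch runs synchronously.
const timerCallbacks: Array<() => void> = [];

function fakeSetInterval(callback: () => void): Teardown {
  timerCallbacks.push(callback);
  return () => {
    const index = timerCallbacks.indexOf(callback);
    if (index >= 0) timerCallbacks.splice(index, 1);
  };
}

function advanceFakeTimer(): void {
  timerCallbacks.slice().forEach((callback) => callback());
}

// Hypothetical counterpart of subscribing to a timer: each call starts its own
// execution and returns a handle that cancels it.
function subscribeToTicks(onTick: (n: number) => void): { unsubscribe: Teardown } {
  let n = 0;
  const cancel = fakeSetInterval(() => onTick(n++));
  return { unsubscribe: cancel };
}

const seen: number[] = [];
const subscription = subscribeToTicks((n) => seen.push(n));

advanceFakeTimer(); // delivers 0
advanceFakeTimer(); // delivers 1
subscription.unsubscribe();
advanceFakeTimer(); // nothing delivered: the execution was cancelled

console.log(seen);
```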
An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.
Subjects are like EventEmitters: they maintain a registry of many listeners.
- Every Subject is an Observable
- Every Subject is an Observer
`import { Subject } from 'rxjs';`
Passing a Subject to subscribe() as the Observer converts a unicast Observable execution to multicast through the Subject.
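A minimal sketch of that idea, with no RxJS involved: `TinySubject` and `coldSource` are hypothetical names. The subject keeps a registry of observers and is itself an observer, so handing it to a unicast source shares one execution among all listeners.

```typescript
interface TinyObserver<T> {
  next: (value: T) => void;
}

// Hypothetical minimal Subject: it keeps a registry of observers (like an
// EventEmitter) and forwards every value it receives to all of them.
class TinySubject<T> implements TinyObserver<T> {
  private observers: TinyObserver<T>[] = [];

  subscribe(observer: TinyObserver<T>): void {
    this.observers.push(observer);
  }

  next(value: T): void {
    this.observers.forEach((observer) => observer.next(value));
  }
}

// A unicast source: every call runs the producer again.
let runs = 0;
function coldSource(observer: TinyObserver<number>): void {
  runs += 1;
  [1, 2, 3].forEach((value) => observer.next(value));
}

const subject = new TinySubject<number>();
const received: string[] = [];
subject.subscribe({ next: (v) => received.push("A" + v) });
subject.subscribe({ next: (v) => received.push("B" + v) });

// Passing the subject as the observer shares one execution with both listeners.
coldSource(subject);

console.log(runs, received);
```

The producer runs once, yet both A and B receive every value, interleaved as A1, B1, A2, B2, A3, B3.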
`import { Subject, from } from 'rxjs';`
`import { from, Subject } from 'rxjs';`
`import { interval, Subject } from 'rxjs';`
`import { Observable, Subject } from "rxjs";`
- publish() is a shortcut for multicast(() => new Subject()).
- publishBehavior() is a shortcut for multicast(new BehaviorSubject()).
- publishReplay(x) is a shortcut for multicast(() => new ReplaySubject(x)).
- publishLast() is a shortcut for multicast(new AsyncSubject()).
- share() is a shortcut for multicast(() => new Subject()) + refCount().
- shareReplay(bufferSize) is a multicasting operator that uses a ReplaySubject(). It doesn't use the multicast operator internally, and as a result it always returns an Observable rather than a ConnectableObservable. It can be used either with or without a refCount.
A BehaviorSubject is a variant of Subject that requires an initial value and emits its current value whenever it is subscribed to.
`import { BehaviorSubject } from 'rxjs';`
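The "current value" behaviour can be sketched in a few lines. `TinyBehaviorSubject` is a hypothetical stand-in written for this note, not the RxJS class, and it leaves out error and completion handling.

```typescript
// Hypothetical minimal BehaviorSubject (no error/complete handling).
class TinyBehaviorSubject<T> {
  private observers: Array<(value: T) => void> = [];
  private current: T;

  constructor(initial: T) {
    this.current = initial;
  }

  subscribe(observer: (value: T) => void): void {
    this.observers.push(observer);
    observer(this.current); // a new subscriber immediately gets the current value
  }

  next(value: T): void {
    this.current = value;
    this.observers.forEach((observer) => observer(value));
  }
}

const temperature = new TinyBehaviorSubject<number>(20);
const early: number[] = [];
const late: number[] = [];

temperature.subscribe((v) => early.push(v)); // receives the initial value 20
temperature.next(21);
temperature.next(22);
temperature.subscribe((v) => late.push(v)); // receives the current value 22
temperature.next(23);

console.log(early, late);
```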
A ReplaySubject is a variant of Subject that "replays" or emits old values to new subscribers. It buffers a set number of values and will emit those values immediately to any new subscribers in addition to emitting new values to existing subscribers.
`import { ReplaySubject } from 'rxjs';`
`import { ReplaySubject } from 'rxjs';`
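A sketch of the buffering described above, assuming only a count-based buffer (the real ReplaySubject can also limit by time). `TinyReplaySubject` is a hypothetical name, not the RxJS class.

```typescript
// Hypothetical minimal ReplaySubject with a count-based buffer only.
class TinyReplaySubject<T> {
  private observers: Array<(value: T) => void> = [];
  private buffer: T[] = [];
  private bufferSize: number;

  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }

  subscribe(observer: (value: T) => void): void {
    this.buffer.forEach((value) => observer(value)); // replay old values first
    this.observers.push(observer);
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // keep only the most recent values
    }
    this.observers.forEach((observer) => observer(value));
  }
}

const messages = new TinyReplaySubject<string>(2);
messages.next("a");
messages.next("b");
messages.next("c");

const lateJoiner: string[] = [];
messages.subscribe((m) => lateJoiner.push(m)); // replays "b" and "c"
messages.next("d");

console.log(lateJoiner);
```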
An AsyncSubject is a variant of Subject that only emits a value when it completes. It will emit its latest value to all its observers on completion.
`import { AsyncSubject } from 'rxjs';`
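The "only on completion" rule, as a rough sketch. `TinyAsyncSubject` is a hypothetical model for illustration; it ignores errors and does not forward the completion notification itself.

```typescript
// Hypothetical minimal AsyncSubject: values are remembered, not emitted,
// until complete() is called.
class TinyAsyncSubject<T> {
  private observers: Array<(value: T) => void> = [];
  private latest: T | undefined = undefined;
  private hasValue = false;
  private completed = false;

  subscribe(observer: (value: T) => void): void {
    if (this.completed) {
      if (this.hasValue) observer(this.latest as T);
      return;
    }
    this.observers.push(observer);
  }

  next(value: T): void {
    if (this.completed) return;
    this.latest = value; // remembered, not emitted yet
    this.hasValue = true;
  }

  complete(): void {
    if (this.completed) return;
    this.completed = true;
    if (this.hasValue) {
      this.observers.forEach((observer) => observer(this.latest as T));
    }
  }
}

const result = new TinyAsyncSubject<number>();
const got: number[] = [];
result.subscribe((v) => got.push(v));

result.next(1);
result.next(2);
result.next(3);
const deliveredBeforeComplete = got.length; // still nothing delivered

result.complete(); // only now is the latest value pushed

console.log(deliveredBeforeComplete, got);
```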
Operators are pure functions that enable a functional programming style of dealing with collections, with operations like map, filter, concat, reduce, etc.
- Pipeable Operators. When called, they do not change the existing Observable instance. Instead, they return a new Observable, whose subscription logic is based on the first Observable.
- Creation Operators. They can be called as standalone functions to create a new Observable. Unlike pipeable operators, creation operators are functions that create an Observable with some common predefined behavior or by joining other Observables.
`import { of } from 'rxjs';`
`import { of } from 'rxjs';`
`obs.pipe(`
`import { interval } from 'rxjs';`
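The claim that a pipeable operator returns a new Observable and leaves the original untouched can be checked with a toy model. Here an "observable" is just a function that pushes values into a callback; `Source`, `Operator`, `fromArray`, `mapOp`, and `filterOp` are hypothetical names, not RxJS exports.

```typescript
// Toy model: a source is a function that pushes values into a callback.
type Source<T> = (next: (value: T) => void) => void;
type Operator<A, B> = (source: Source<A>) => Source<B>;

function fromArray<T>(values: T[]): Source<T> {
  return (next) => values.forEach((value) => next(value));
}

// A pipeable operator takes a source and returns a NEW source;
// the input source is not modified.
function mapOp<A, B>(project: (value: A) => B): Operator<A, B> {
  return (source) => (next) => source((value) => next(project(value)));
}

function filterOp<A>(predicate: (value: A) => boolean): Operator<A, A> {
  return (source) => (next) =>
    source((value) => {
      if (predicate(value)) next(value);
    });
}

const numbers = fromArray([1, 2, 3, 4]);
const evens = filterOp<number>((n) => n % 2 === 0)(numbers);
const squaredEvens = mapOp<number, number>((n) => n * n)(evens);

const fromOriginal: number[] = [];
const fromDerived: number[] = [];
numbers((n) => fromOriginal.push(n)); // the original still emits 1, 2, 3, 4
squaredEvens((n) => fromDerived.push(n)); // the derived source emits 4, 16

console.log(fromOriginal, fromDerived);
```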
Higher-order Observables
Observables most commonly emit ordinary values like strings and numbers, but surprisingly often, it is necessary to handle Observables of Observables, so-called higher-order Observables.
But how do you work with a higher-order Observable? Typically, by flattening: by (somehow) converting a higher-order Observable into an ordinary Observable. For example:
`const fileObservable = urlObservable.pipe(`
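Flattening can be sketched with the same kind of toy model, where a source is a function that pushes values into a callback. `flattenAll` and `fakeFetch` are hypothetical; because every inner source here is synchronous, the ordering differences between concatAll, mergeAll, and switchAll do not show up, so this only illustrates the general idea of turning an Observable of Observables into an ordinary one.

```typescript
// Toy model: a source is a function that pushes values into a callback.
type Source<T> = (next: (value: T) => void) => void;

function fromArray<T>(values: T[]): Source<T> {
  return (next) => values.forEach((value) => next(value));
}

// Flattening: subscribe to the outer source and, for every inner source it
// emits, forward the inner values to the single downstream consumer.
function flattenAll<T>(outer: Source<Source<T>>): Source<T> {
  return (next) => outer((inner) => inner(next));
}

// Hypothetical stand-in for an HTTP call: each "url" yields two chunks.
function fakeFetch(url: string): Source<string> {
  return fromArray([url + "#part1", url + "#part2"]);
}

const urls = fromArray(["/a", "/b"]);

// Mapping each url to a source produces a higher-order source.
const higherOrder: Source<Source<string>> = (next) =>
  urls((url) => next(fakeFetch(url)));

const flatFiles = flattenAll(higherOrder);

const chunks: string[] = [];
flatFiles((chunk) => chunks.push(chunk));

console.log(chunks);
```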
Creation Operators
Join Creation Operators
Join Operators
Multicasting Operators
Utility Operators
Conditional and Boolean Operators
Mathematical and Aggregate Operators
| Grouping Operators | |
| --- | --- |
| combineAll | Takes a higher-order observable as its source and, once that source completes, emits the latest value from each inner observable whenever any one of them emits. |
| concatAll | Emits all values from each inner observable, finishing one observable before beginning the next. |

| Grouping Values | |
| --- | --- |
| groupBy | Organizes the values from the source observable into groups according to a key selector function that we provide as a parameter. |
| pairwise | Groups consecutive values in pairs. This is useful, for example, to compare each value with the one emitted before it. |
| toArray | Collects all of the values into an array and passes it along to any subscribers. Nothing is emitted from toArray until the source completes, and then the array arrives as a single value. |
`of(...players)`
| Value Transformation | |
| --- | --- |
| concatMap | Takes each input value and passes it to the function provided to the operator as a parameter. The observable returned by that function is subscribed to and its values are passed to the subscribers; only after it completes is the next value from the source processed. |
| defaultIfEmpty | Allows us to specify a value to be emitted from a source if it completes without emitting any values. |
| map | Maybe the single most popular operator in RxJS. It allows us to take every value coming out of a source and run it through a function that we provide. The value returned from that function is what gets sent to subscribers. |
| Filtering Operators | |
| --- | --- |
| skip | Takes a number as a parameter; that many values are filtered out from the beginning of the source observable. |
| skipWhile | Allows us to evaluate the values coming from our source and use them to determine whether we should skip. This is a one-time determination: once the supplied predicate function evaluates to false, every value after that is accepted and emitted to the subscribers. |
| take | Is the opposite of skip. Here we provide a number as a parameter, and that's the number of items that will be accepted from the source and passed along to our subscribers. |
| distinct | Allows us to eliminate duplicates from our stream. There are a couple of approaches to do this. If we don't provide any parameter, it will exclude all duplicates. If we provide a function, it will use that function to determine what duplicate means. The function receives each value emitted from the source observable in sequence and returns a potentially modified value, which is what is used for the comparison to determine uniqueness. |
| distinctUntilChanged | This is a bit different from the plain distinct in that it will only compare a new value emitted from the source observable to the immediately previous value. If they're different, the new value will be passed along. If they're identical, the new value will be dropped. |
| filter | Allows us to get as fancy as we would like with our elimination of values from the source observable. We need to provide a predicate function to do the comparison. |
| first | Emits just the first value from a source and then unsubscribes. |
| elementAt | Lets us emit only the value at the specified position in the source observable. |
| find | Allows just the first value that causes the predicate function to return true to be passed along to the subscribers. Once that first value has been passed along, the source observable is unsubscribed, so no additional values are emitted. |
| single | It emits one of three things: the single value that matches, an error if more than one value matches, or (in RxJS 6) undefined if no value matches. |

| Error Handling Operators | |
| --- | --- |
| catchError | It allows our code to be notified when an error occurs in the source stream and to react by either passing the error along or returning another observable. In either case, it prevents an unexpected or unhandled stoppage in the stream. |
| throwIfEmpty | If our source completes without emitting anything, this operator will force that to be considered an error. I know, nothing says that our source must emit something before it completes, but this operator gives us that capability. |
| retry | It is pretty straightforward and useful for things like HTTP requests that may fail due to network issues. If our source throws an error, this operator re-subscribes to try again, up to a certain number of retries. Keep in mind that each retry starts at the beginning of that observable. |
use the pipe() function to make new operators
`import { pipe } from 'rxjs';`
creating new operators from scratch
`import { Observable } from 'rxjs';`
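Both routes to a custom operator can be sketched on a toy model where a source is a function that pushes values into a callback. `pipe2` is a hypothetical two-argument cousin of `pipe()`, and `takeFirst` is a made-up operator written from scratch; the simplified model has no unsubscribe, so `takeFirst` merely ignores surplus values instead of cancelling the source.

```typescript
// Toy model: a source is a function that pushes values into a callback.
type Source<T> = (next: (value: T) => void) => void;
type Operator<A, B> = (source: Source<A>) => Source<B>;

function fromArray<T>(values: T[]): Source<T> {
  return (next) => values.forEach((value) => next(value));
}

function mapOp<A, B>(project: (value: A) => B): Operator<A, B> {
  return (source) => (next) => source((value) => next(project(value)));
}

function filterOp<A>(predicate: (value: A) => boolean): Operator<A, A> {
  return (source) => (next) =>
    source((value) => {
      if (predicate(value)) next(value);
    });
}

// Route 1: compose existing operators into one new operator.
function pipe2<A, B, C>(first: Operator<A, B>, second: Operator<B, C>): Operator<A, C> {
  return (source) => second(first(source));
}

function discardOddDoubleEven(): Operator<number, number> {
  return pipe2(
    filterOp<number>((n) => n % 2 === 0),
    mapOp<number, number>((n) => n * 2)
  );
}

// Route 2: write the subscription logic by hand.
function takeFirst<T>(count: number): Operator<T, T> {
  return (source) => (next) => {
    let delivered = 0;
    source((value) => {
      if (delivered < count) {
        delivered += 1;
        next(value);
      }
    });
  };
}

const oneToSix = fromArray([1, 2, 3, 4, 5, 6]);

const doubledEvens: number[] = [];
discardOddDoubleEven()(oneToSix)((n) => doubledEvens.push(n));

const firstTwo: number[] = [];
takeFirst<number>(2)(discardOddDoubleEven()(oneToSix))((n) => firstTwo.push(n));

console.log(doubledEvens, firstTwo);
```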
A scheduler controls when a subscription starts and when notifications are delivered. It consists of three components.
- A Scheduler is a data structure. It knows how to store and queue tasks based on priority or other criteria.
- A Scheduler is an execution context. It denotes where and when the task is executed (e.g. immediately, or in another callback mechanism such as setTimeout or process.nextTick, or the animation frame).
- A Scheduler has a (virtual) clock. It provides a notion of “time” by a getter method now() on the scheduler. Tasks being scheduled on a particular scheduler will adhere only to the time denoted by that clock.
`import { Observable, asyncScheduler } from 'rxjs';`
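The three components (data structure, execution context, virtual clock) are easier to see in a toy scheduler driven entirely by virtual time. `VirtualScheduler` below is a hypothetical sketch written for this note, not an RxJS class, and its `schedule` signature is an assumption rather than the `SchedulerLike` interface.

```typescript
interface ScheduledTask {
  dueAt: number;
  order: number;
  work: () => void;
}

// Hypothetical scheduler that runs on virtual time only.
class VirtualScheduler {
  private queue: ScheduledTask[] = []; // data structure: stores pending tasks
  private clock = 0; // virtual clock
  private sequence = 0;

  now(): number {
    return this.clock;
  }

  schedule(work: () => void, delay: number = 0): void {
    this.queue.push({ dueAt: this.clock + delay, order: this.sequence++, work });
  }

  // Execution context: tasks run only when flush() is called, in due-time
  // order (insertion order breaks ties).
  flush(): void {
    while (this.queue.length > 0) {
      this.queue.sort((a, b) => a.dueAt - b.dueAt || a.order - b.order);
      const task = this.queue.shift() as ScheduledTask;
      this.clock = task.dueAt; // time jumps straight to the task's due time
      task.work();
    }
  }
}

const virtualScheduler = new VirtualScheduler();
const trace: string[] = [];

virtualScheduler.schedule(() => trace.push("late@" + virtualScheduler.now()), 100);
virtualScheduler.schedule(() => trace.push("early@" + virtualScheduler.now()), 10);
trace.push("scheduled@" + virtualScheduler.now());

virtualScheduler.flush();

console.log(trace);
```

Tasks see only the scheduler's own `now()`, which is why the trace reports times 10 and 100 even though no real time has passed.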
scheduler types
- null: By not passing any scheduler, notifications are delivered synchronously and recursively. Use this for constant-time operations or tail recursive operations.
- queueScheduler: Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations.
- asapScheduler: Schedules on the micro task queue, which is the same queue used for promises. Basically after the current job, but before the next job. Use this for asynchronous conversions.
- asyncScheduler: Schedules work with setInterval. Use this for time-based operations.
- animationFrameScheduler: Schedules tasks that will happen just before the next browser content repaint. Can be used to create smooth browser animations.
`interface SchedulerLike {`
Use subscribeOn to schedule in what context the subscribe() call will happen.
Use observeOn to schedule in what context notifications will be delivered.
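- Querying the same kind of data may be synchronous (read from a cache) or asynchronous (fetched via AJAX), and business code has to handle both cases.
- Fetching data (pull) and being notified of data updates (push) are written differently, which adds complexity to business code.
- Every piece of rendered data is composed from several query processes (the sync/async combinations just mentioned). How can this composition be defined clearly?
- How can we simplify code that applies the same rules to both existing data and future data?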
`# promise`
`# observable`
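- data1 and data2 are combined in some way to produce a result
- That result is then combined with data3 to produce the final result
To abstract this process: think of each Observable as a section of pipe carrying a data stream. Our job is to assemble these pipes according to the relationships between them, so that data fed into any inlet produces the final result at the far end.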
`const A$ = Observable.interval(1000)`
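The pipe assembly described above (data1 with data2, then that result with data3) can be sketched without RxJS. `TinySubject` and `combineTwo` are hypothetical; `combineTwo` imitates the idea behind combineLatest for exactly two sources, and the sum and multiply steps are arbitrary stand-ins for "some combination".

```typescript
// Hypothetical minimal subject used as a hand-fed data source.
class TinySubject<T> {
  private observers: Array<(value: T) => void> = [];

  subscribe(observer: (value: T) => void): void {
    this.observers.push(observer);
  }

  next(value: T): void {
    this.observers.forEach((observer) => observer(value));
  }
}

// Sketch of combineLatest for two sources: once both have emitted, every new
// value from either side is combined with the latest value from the other.
function combineTwo<A, B, R>(
  a: TinySubject<A>,
  b: TinySubject<B>,
  combine: (a: A, b: B) => R
): TinySubject<R> {
  const out = new TinySubject<R>();
  let latestA: A | undefined;
  let latestB: B | undefined;
  let hasA = false;
  let hasB = false;
  const emit = () => {
    if (hasA && hasB) out.next(combine(latestA as A, latestB as B));
  };
  a.subscribe((value) => { latestA = value; hasA = true; emit(); });
  b.subscribe((value) => { latestB = value; hasB = true; emit(); });
  return out;
}

const data1$ = new TinySubject<number>();
const data2$ = new TinySubject<number>();
const data3$ = new TinySubject<number>();

// Assemble the pipes: (data1 + data2) * data3.
const partial$ = combineTwo(data1$, data2$, (x, y) => x + y);
const final$ = combineTwo(partial$, data3$, (sum, factor) => sum * factor);

const results: number[] = [];
final$.subscribe((value) => results.push(value));

data1$.next(1); // nothing yet: data2 and data3 have not emitted
data2$.next(2); // partial = 3, still waiting for data3
data3$.next(10); // final = 30
data1$.next(5); // partial = 7, final = 70

console.log(results);
```

Once the pipes are wired, the consuming code does not care which inlet a new value arrived through; it only sees the recomputed final result.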
`# The source equals the initial data merged with the newly added data`