RxJS(以下简称Rx)是使用 Observables 进行反应式编程的库,让组织异步和基于回调的代码更为方便。我们先从最核心的 Observable 来了解Rx,并且尝试以我们已有的知识—函数为基础来类比,帮助理解 Observable。
Observable 一种特殊的函数
我们首先选择了函数作为类比对象,尝试从不同角度分析Observable的特性。
封装
我们先看一个最普通的函数:
1 | function foo() { |
我们声明了函数 foo()
,在里面封装了一段很简单的逻辑,然后调用函数,执行函数内部封装的逻辑。
然后我们来看使用 Observable 是怎样实现的:
1 | const foo = Rx.Observable.create(() => { |
上面的例子中,通过 Rx.Observable.create()
来创建 Observable 对象,将同样将一段代码逻辑封装到 Observable 对象 foo
中,然后通过 foo.subscribe()
来执行封装的代码逻辑。
对于普通函数和 Observable 对象,封装的代码逻辑在每次调用时都会重新执行一次。从这一点看,Observable 能够和普通函数一样实现封装代码进行复用。
返回值
普通函数调用后可以有返回值:
1 | function foo() { |
Observable 执行后也会产生值,不过和函数直接返回的方式不同,要通过回调函数方式获取:
1 | const foo = Rx.Observable.create((observer) => { |
Observable 对象是通过在内部调用 observer.next(42)
这种方式返回值,而调用方则通过回调函数来接收返回的数据。形式上比普通函数直接返回值麻烦一些。
从调用方的角度来看,两个过程分别是:
- 普通函数:调用 > 执行逻辑 > 返回数据
- Observable:订阅(subscribe) > 执行逻辑 > 返回数据
从获取返回值方式来看,调用函数是一种直接获取数据的模式,从函数那里“拿”(pull)数据;而 Observable 订阅后,是要由 Observable 通过间接调用回调函数的方式,将数据“推”(push)给调用方。
这里 pull 和 push 的重要区别在于,push 模式下,Observable 可以决定什么时候返回值,以及返回几个值(即调用回调函数的次数)。
1 | const foo = Rx.Observable.create((observer) => { |
上面例子中,Observable 返回了两个值,第1个值同步返回,第2个值则是过了1秒后异步返回。
也就是说,从返回值来说,Observable 相比普通函数区别在于:
- 可以返回多个值
- 可以异步返回值
tip:如果你对ES6有所了解,应该知道 Promise 对象也可以实现异步返回值,那么 Promise 和 Observable 的区别有哪些?可以思考一下。
异常处理
函数执行可能出现异常情况,例如:
1 | function foo() { |
我们可以在函数调用的地方捕获到异常状态进行处理:
1 | try { |
对于 Observable,也有错误处理的机制:
1 | const foo = Rx.Observable.create((observer) => { |
Observable 的 subscribe()
方法支持传入额外的回调函数,用于处理异常情况。和函数执行类似,出现错误之后,Observable 就不再继续返回数据了。
subscribe()
方法还支持另一种形式传入回调函数:
1 | foo.subscribe({ |
这种形式下,传入的对象和 Observable 内部执行函数中的 observer 参数在形式上就比较一致了。
中止执行
Observable 内部的逻辑可以异步多个返回值,甚至返回无数个值:
1 | const foo = Rx.Observable.create((observer) => { |
上面例子中,Observable 对象每隔 1 秒会返回一个值给调用方,这时如果不退出程序,就会一直返回下去。
Rx 提供了中止 Observable 执行的机制:
1 | const foo = Rx.Observable.create((observer) => { |
subscribe()
方法返回一个订阅对象 subscription,该对象上的 unsubscribe()
方法用于取消订阅,也就是中止 Observable 内部逻辑的执行,停止返回新的数据。
对于具体的 Observable 对象是如何中止执行,则要由 Observable 在执行后返回一个用于中止执行的函数,像上面例子中的这种方式。
Observable 执行结束后,会触发观察者的 complete 回调:
1 | foo.subscribe({ |
Observable 的观察者共有上面三种回调:
- next:获得数据
- error:处理异常
- complete:执行结束
其中 next 可以被多次调用,error 和 complete 最多只有一个被调用一次(任意一个被调用后不再触发其他回调)。
数据转换
对于函数返回值,有时候我们要转换后再使用,例如:
1 | function foo() { |
对于 Observable 返回的值,也会有类似的情况,通常采用以下方式:
1 | const foo = Rx.Observable.create((observer) => { |
其实foo.map()
返回了新的 Observable 对象,上面代码等价于:
1 | const foo2 = foo.map(i => i * 2) |
Observable 对象 foo2 被订阅时执行的内部逻辑可以简单视为:
1 | function subscribe(observer) { |
将这种对数据的处理和数组进行比较看看:
1 | const array = [0, 1, 2, 3, 4, 5] |
除了 map() 方法,Observable 还提供了多种转换方法,如 filter() 用于过滤数据,find() 值返回第一个满足条件的数据,reduce() 对数据进行累积处理,在执行结束后返回最终的数据。这些方法和数组方法功能是类似的,只不过是对 Observable 对象返回值的处理。还有一些转换方法更加强大,例如可以 debounceTime() 可以在时间维度上对数据进行拦截等等。
Observable 的转换方法,本质是创建了一个新的 Observable,新的 Observable 基于一定的逻辑对原 Observable 的返回值进行转换处理,然后再推送给观察者。
数据通信机制
接下来我们深入讨论一下返回值的问题,我们把 Observable 类比函数主要都有处理数据并返回值的特性,而处理数据的返回值方式可以分为拉取和推送两种,与其对应的数据处理对象即为消费者和生产者。
拉取很简单,就是一种消费者主动索取所需数据的方式,生产者只有当消费者发出需求信号时才会提供数据。函数调用就是一种简单的拉取实例,消费者主动调用。
推送则是由生产者决定何时向消费者传送数据,消费者无法自己单独决定获取数据的时机,被动接受。Js中的事件监听和 Promise(还包括回调函数)就是很典型的数据推送系统(或者叫观察者模式/Observer Pattern,回调可以看做只有一个观察者的观察者模式),而 Observable 就是一种更为强大的数据推送系统( Promise ++)。
以下为几类常见的拉取和推送模型:
- Function: 单次拉取模型,调用时会同步的返回单一值
- Generator/Iterator: 多次拉取模型,调用时会同步的返回0到无限多个值
- Promise: 单次推送模型,由生产者决定时机返回单一值
- Observable: 多次推送模型,由生产者决定时机返回0到无限多个值
如果把函数理解为处理数据并返回值,那么以上这些可以都理解为函数的变种,现在我们可以回答一下返回值最后关于 Promise 和 Observable 区别的问题:
Promise 可以算作一个 Observable,那么 Observable 其实就是 Promise++ 。在 Rx 中,你可以很简单地把一个 Promise 转换成一个 Observable,只需要:var stream = Rx.Observable.fromPromise(promise) ,接下来我们会使用他。Observable 和 Promise++ 的唯一区别是前者不兼容 Promise/A+ ,但是理论上来讲是没有冲突的。Promise 其实就是只有单独一个值 的 Observable ,但后者更胜一筹的是允许多个返回值(多次 emit)。
这其实是一件很棒的事情,Promise 能做的事情,Observable 也能做。Promise 不能做的事情,Observable 还是能做。
总结
我们从函数的角度总结了 Observable 的一些特性,我们把 Observable 看做一个特殊的函数,它有和函数类似的东西,例如封装了一段逻辑,每次调用时都会重新执行逻辑,执行有返回数据等;也有更特殊的,例如数据是推送(push)的方式返回给调用方法,返回值可以是异步,并且可以返回多个值等。
下次我们会尝试从流的角度分析 Observable。