RxJS - 使用指南

Posted by huangqing on July 11, 2017

RxJS

reactive官网 / RxJS官网 / RxJS github / jsfiddle

弹珠图工具

BOOK

Rx

RxJS是结合了观察者模式,迭代器模式和函数式编程优点的产物。

RP 是针对异步数据流的编程。

基于流的概念,RP赋予了你一系列神奇的函数工具集,使用他们可以合并、创建、过滤这些流。 一个流或者一系列流可以作为另一个流的输入。你可以 合并 两个流,从一堆流中 过滤 你真正感兴趣的那一些,将值从一个流 映射 到另一个流.

流是包含了有时序,正在进行事件的序列,可以发射(emmit)值(某种类型)、错误、完成信号。流在包含按钮的浏览器窗口被关闭时发出完成信号。

我们异步地捕获发射的事件,定义一系列函数在值被发射后,在错误被发射后,在完成信号被发射后执行。有时,我们忽略对错误,完成信号地处理,仅仅关注对值的处理。对流进行监听,通常称为订阅,处理流的函数是观测者,流是被观测的主体。这就是观测者设计模式。

Promises

ES6中引入了Promises来解决回调地狱问题。promise代表了异步程序,并在未来某个时刻完成。使用细节请谷歌百度。然而promise也有自身的缺点:

  • 数据源产生多个值,比如鼠标移动事情或者文件系统的字节流;
  • 没有失败重试的机制;
  • 没有取消机制;

RxJS可以解决的问题

  • 我们知道传统的for,while对循环体中的异步程序是无法感知的,或者说,它们不会等待异步程序执行完毕再进入下一轮循环。

  • 错误处理是任何程序都要解决的问题,本身就已很复杂的回调函数中再嵌入try/catch块吗?如果还想加入重试机制呢?
  • 商业逻辑内嵌在回调函数中,可读性差,复杂度高。现如今流行的组件化编程,即可重用,又可解耦,还方便测试;
  • 闭包是强大的,过度地使用闭包将导致我们不得不谨慎地审视变量的作用域以及其值。再加上共享变量带来的副作用,+ 混杂在if/else条件语句和for循环中,每天都会有修不完的bug;
  • 根据事件或耗时操作无响应的时间进行取消操作;
  • 自己实现throttling和debouncing是很困难的(二者区别见http://www.jianshu.com/p/e91775195608)
  • 众所周知的事件监听带来的内存泄露问题;

RxJS可以优雅地替代callback,或者基于promises的任何第三方库,使我们可以使用一个编程模型来对待任何数据源(除了远程http请求,Node.js中的Event Emitter也使用的是回调机制)。也就是说,我们可以用RxJS来处理读取文件,http请求,鼠标点击,鼠标移动这些事件。

RxJS中的组件

  • 生产者:在RxJS中的生产者叫做Observables。Observables负责推送事件,但不处理事件;

  • 消费者:在RxJS中的消费者叫做Observer

  • 管道:在RxJS中,管道中的一个一个函数叫做observable operators,简称operators

  • 时间:我们知道异步程序不容易处理的背后实质就是时间问题,RxJS是面向异步编程的解决方案,因此时间遍布于RxJS中的每一个角落。目前为止,我们只需要了解时间在RxJS中不是恒定的,产生事件的快慢与否取决于我们的需求。当然,RxJS给了我们解决方案。

响应式编程范式与其他编程范式

面向对象编程以状态为中心,函数式编程以行为为中心,而响应式编程则需要我们把数据看做是改变并流动着的。RxJS可以很好的和其他范式配合起来使用。我们可以用面向对象范式来构建我们的模型,用函数式编程和响应式编程来构建行为和处理事件。

在面向对象编程中,状态是保存在变量或者集合对象里的。而响应式编程中的状态是短暂的、瞬间的,也就是说在Rx中是永远不保留状态的。

面向对象编程是典型的命令式编程,而响应式编程则鼓励我们写声明式的程序,也就是表达做什么,而不是表达怎么做。RxJS从函数式编程中借鉴了这一点。

RxJS 教程

RxJS 简书

Observable

RxJS-Observable

Subscriber

create:A static factory for a Subscriber, given a (potentially partial) definition of an Observer.

public static create(next: function(x: ?T): void, error: function(e: ?any): void, complete: function(): void): Subscriber<T>

Subject

reactivex:subject

在RxJS中,Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。普通的Observable并不具备多路推送的能力(每一个Observer都有自己独立的执行环境),而Subject可以共享一个执行环境。

通过Subject将普通的Observable单路推送转换为多路推送。这说明了Subject的作用:作为单路Observable转变为多路Observable的桥梁

每一个Subject都是一个Observable(可观察对象) 对于一个Subject,你可以订阅(subscribe)它,Observer会和往常一样接收到数据。从Observer的视角看,它并不能区分自己的执行环境是普通Observable的单路推送还是基于Subject的多路推送。

Subject的内部实现中,并不会在被订阅(subscribe)后创建新的执行环境。它仅仅会把新的Observer注册在由它本身维护的Observer列表中,这和其他语言、库中的addListener机制类似。

每一个Subject也可以作为Observer(观察者) Subject同样也是一个由next(v),error(e),和 complete()这些方法组成的对象。调用next(theValue)方法后,Subject会向所有已经在其上注册的Observer多路推送theValue。

下面的例子中,我们在Subject上注册了两个Observer,并且多路推送了一些数值:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

控制台输出结果如下:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

既然Subject是一个Observer,你可以把它作为subscribe(订阅)普通Observable时的参数,如下面例子所示:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你可以传递Subject来订阅observable

执行后结果如下:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

还有几种特殊的Subject 类型,分别是BehaviorSubjectReplaySubject,和 AsyncSubject

BehaviorSubject

BehaviorSubject是Subject的一个衍生类,具有“最新的值”的概念。它总是保存最近向数据消费者发送的值,当一个Observer订阅后,它会即刻从BehaviorSubject收到“最新的值”。

ReplaySubject

ReplaySubject 如同于BehaviorSubject是 Subject 的子类。通过 ReplaySubject可以向新的订阅者推送旧数值,就像一个录像机ReplaySubject可以记录Observable的一部分状态(过去时间内推送的值)。

一个ReplaySubject可以记录Observable执行过程中推送的多个值,并向新的订阅者回放它们。

AsyncSubject

AsyncSubject是Subject的另外一个衍生类,Observable仅会在执行完成后,推送执行环境中的最后一个值。

AsyncSubject 与 last() 操作符相似,等待完成通知后推送执行过程的最后一个值。

RxJava 第二篇 - Subject使用及示例

RxJS 核心概念之Subject