RxJS 是 Javascript 的响应式编程库。它使用可观察的序列来解决异步操作和事件处理。它提供一种核心类型,即 Observable、卫星类型(Observer、Schedulers、Subjects)和受 Array 方法 (map、filter、reduce、every 等)启发的运算符,以允许处理异步事件作为集合。
ReactiveX 将观察者模式与迭代器模式以及函数式编程与集合相结合,以满足对管理事件序列的理想方式的需求。
RxJS 中解决异步事件管理的基本概念是:
- Observable: 表示未来值或事件的可调用集合的想法。
- Observer: 是回调的集合,知道如何监听 Observer 传递的值。
- Subscription: 表示 Observable 的执行,主要用于取消执行。
- Operators: 运算符,是纯函数,支持使用 map、filter、concat、reduce 等操作处理集合的函数式编程风格。
- Subject: 相当于一个 EventEmitter,是将一个值或事件多播给多个 Observers 的唯一途径。
- Schedulers: 调度程序,是控制并发的集中式调度程序,允许我们在计算发生时进行协调,例如 setTimeout 或 requestAnimationFrame 等。
举个例子
通常你注册事件监听器
document.addEventListener('click', () => console.log('Clicked!'));
使用 RxJS,你可以创建一个可观察者对象
import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(() => console.log('Clicked!'));
纯函数#
使 RxJS 强大的是它的纯函数产生值的能力。这意味着你的代码不太容易出错。
通常你会创建一个不纯的函数,你的代码的其他部分可能会弄乱你的状态。
let count = 0;
document.addEventListener('click', () => console.log(`Clicked ${++counts} times`));
使用 RxJS 可以隔离状态。
import { fromEvent, scan } from 'rxjs';
fromEvent(document, 'click')
.pipe(scan((count) => count + 1, 0))
.subscribe((count) => console.log(`Clicked ${++count} times`));
scan 运算符的工作方式与数组 reduce 类似。它接受一个暴露给回调的值。回调的返回值将成为下一次回调运行时的入参。
流#
RxJS 拥有一整套操作符,可帮助你控制事件如何流经你的可观察对象。
这是你允许每秒最多点击一次的方式,使用纯 javascript:
let count = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
if (Date.now() - lastClick >= rate) {
console.log(`Clicked ${++count} times`);
lastClick = Date.now();
}
});
用 RxJS 实现:
import { fromEvent, throttleTime, scan } from 'rxjs';
fromEvent(document, 'click')
.pipe(
throttleTime(1000),
scan((count) => count + 1, 0)
)
.subscribe((count) => console.log(`Clicked ${count} times`));
其他流控制运算符还有 filter、delay、debounceTime、take、takeUntil、distinct、distinctUntilChanged 等。
值#
你可以转换通过可观察者对象的值。
以下是如何在纯 javascript 中为每次点击添加当前鼠标 x 的位置:
let count = 0;
const rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', (event) => {
if (Date.now() - lastClick >= rate) {
count += event.clientX;
console.log(count);
lastClick = Date.now();
}
})
用 RxJS 实现:
import { fromEvent, throttleTime, map, scan } from 'rxjs';
fromEvent(document, 'click')
.pipe(
throttleTime(1000),
map((event) => event.clientX),
scan((count, clientX) => count + clientX, 0)
)
.subscribe((count) => console.log(count));
其他转换值的操作符还有 pluck、pairwise、sample 等。