Observables 是多个值的惰性推送的集合,他们填补了下表中的缺失点:
Single | Multiple | |
---|---|---|
Pull | Function | Iterator |
Push | Promise | Observable |
下面是一个 Observable,它在订阅时立即(同步)推送值 1、2、3,并在订阅调用后一秒后推送值 4,然后完成:
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000)
});
要调用 Observable 并查看这些值,我们需要订阅它:
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
console.log('just before subscribe');
observable.subscribe(() => {
next(x) {
console.log('got value ' + x);
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
});
console.log('just after subscribe');
// Logs:
// just before subscribe
// got value 1
// got value 2
// got value 3
// just after subscribe
// got value 4
// done
Pull & Push#
Pull 和 Push 是两种不同的协议,描述了数据生产者如何与数据消费者进行通信。
Pull: 消费者决定何时从数据生产者那里接收数据。Producer 本身并不知道数据何时会交付给 Consumer。
每个 Javascript 函数都是一个 Pull 系统。该函数就是数据的生产者,调用该函数的代码通过从其调用中 pull 单个返回值来使用它。
ES2015 引入了生成函数和迭代器 (function*),这是另一种 Pull 系统,调用 iterator.next()
的代码消费者,从迭代器(生产者)中 pull 出多个值。
Producer | Consumer | |
---|---|---|
Pull | 被动:在请求时产生数据 | 主动:决定何时请求数据 |
Push | 主动:按照自己的节奏生成数据 | 被动:对接收到的数据做出反应。 |
Push: 生产者决定何时向消费者推送数据,消费者不知道何时会收到该数据。
Promises 是当今 Javascript 中最常见的推送系统类型。 Promise (生产者) 向已经注册的回调(消费者)传递已解析的值,但与函数不同的是,Promise 负责准确确定何时将值 “推送” 到回调。
RxJS 引入了 Observable, 一种新的 Javascript 推送系统。Observable 是多个值的生产者,将它们 “推送” 给观察者(消费者)。
- 函数是一种惰性求值计算,它在调用时同步返回单个值。
- 生成器是一种延迟评估的计算,它将迭代时同步返回零到(可能)无限值。
- Promise 是一种可能(或可能不会)最终返回单个值的计算。
- Observable 是一种惰性求值计算,从它被调用开始,它可以同步或异步返回零到(可能)无限值。
Observables 作为函数的归纳#
与流行的说法相反,Observables 既不像 EventEmitter, 也不像 Promises 的多值。Observables 在某些情况下可能表现的像 EventEmitters, 即当它们使用 RxJS Subject 进行多播时,但通常它们表现得不像 EventEmitters。
Observables 就像没有参数的函数,但推论出多个值。
function foo() {
console.log('Hello');
return 42;
}
const x = foo.call();
console.log(x);
const y = foo.call();
console.log(y);
// Logs:
// "Hello"
// 42
// "Hello"
// 42
使用 Observables:
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
});
foo.subscribe((x) => {
console.log(x);
})
foo.subscribe((y) => {
console.log(y);
});
// Logs:
// "Hello"
// 42
// "Hello"
// 42
因为函数和 Observables 都是惰性计算,如果你不调用函数,console.log('Hello')
是不会发生的。同样对于 Observables, 如果你不订阅它 (调用 subscribe), console.log('Hello')
就不会发生。另外,“调用” 和 “订阅” 是一个孤立的操作:两个函数调用触发两个独立的副作用,两个 Observable 订阅触发两个独立的副作用。与 EventEmitters 不同,EventEmitters 具有共同的副作用并且不管订阅者是否存在都急于执行, Observables 没有共享执行且是惰性的。
订阅 Observable 类似于调用函数
有些人声称 Observables 是异步的。那不是真的,如果你用日志包围一个函数调用,就像这样:
console.log('before');
console.log(foo.call());
console.log('after');
// Logs:
// "before"
// "Hello"
// 42
// "after"
这与 Observables 行为相同:
console.log('before');
foo.subscribe((x) => {
console.log(x);
});
console.log('after');
// Logs:
// "before"
// "Hello"
// 42
// "after"
这证明 foo 的订阅完全是同步的,就像一个函数一样。
Observables 能够同步或异步传递值。
那 Observable 和函数有什么不同呢?随着时间的推移,Observables 可以 “返回” 多个值,这个是函数做不到的。
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
});
console.log('before');
foo.subscribe((x) => {
console.log(x);
});
console.log('after');
// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"
也可以异步返回值:
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
setTimeout(() => {
subscriber.next(300); // 异步输出
}, 1000)
});
console.log('before');
foo.subscribe((x) => {
console.log(x);
});
console.log('after');
// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"
// 300
结论:
func.call()
表示 “同步给我一个值”observable.subscribe()
表示 “给我任意数量的值,同步或者异步”
Observable 剖析#
observable 是用 new Observable
或其他创建运算符创建的,通过观察者订阅,执行以向观察者传递 next/error/complete 通知,并且它们的执行可能会被释放。这四个方面都编码在一个 Observable 实例中,但其中某一些方面与其他类型相关,例如 Observer 和 Subscription。
核心 observable 关注点:
- Creating 创建 Observable
- Subscribing 订阅 Observable
- Executing 执行 Observable
- Disposing 处理 Observable
创建 Observables#
Observable 构造函数接受一个参数:subscribe 函数。
下面的示例创建了一个 Observable 以每秒向订阅者发送字符串 “hi”。
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
setInterval(() => {
subscriber.next('hi');
}, 1000);
});
可以使用 new Observable 创建 Observable。最常见的还可以使用创建函数来创建,例如 of、from、interval 等。
在上面的例子中,subscribe 函数是描述 Observable 最重要的部分。让我们看看订阅意味着什么。
订阅 Observables#
示例中的 Observable 可以被这样订阅:
observable.subscribe((x) => console.log(x));
new Observable(function subscribe(subscriber) {...})
中的 observable.subscribe
和 subscribe
同名并非巧合。在库中,它们是不同的,但出于实际目的,可以认为它们在概念上是相同的。
这显示了如何不在同一 Observable 的多个观察者之间共享 subscribe 调用。当用观察者调用 observable.subscribe
时,new Observable(function subscribe(subscriber) {...})
中的函数 subscribe 会为该订阅者运行。每次调用 observable.subscribe
都会为给定的订阅者触发其自己的独立设置。
订阅一个 Observable 就像调用一个函数,在数据将被传送到的地方提供回调。
这与 addEventListener /removeEventListener 等处理事件程序 API 截然不同。使用 observable.subscribe
时,给定的 Observer 未在 Observable 中注册为监听器。Observable 甚至不维护附加的观察者列表。
执行 Observables#
new Observable(function subscribe(subscriber) {...})
中的代码代表一个”Observable 执行 “,一个只对每个订阅的 Observer 发生的惰性计算。随着时间的推移,执行会同步或异步地产生多个值。
Observable 执行可以传递三种类型的值:
- “Next” 下一个通知:发送一个值,如数字、字符串、对象等。
- “Error” 错误通知:发送 Javascript 错误或异常。
- “Complete” 完成通知:不发送值。
“Next” 是最常见和最重要的类型:它们代表传递给订阅者的实际数据。“Error” 和 “Complete” 通知在 Observable 执行期间可能只发生一次,并且只能有其中一种。
这些约束在 Observable Grammar 或 Contract 中表达的很好,写成正则表达式:
next*(error|complete)?
在 Observable 执行中,可以传递零到无限的 Next 通知。如果传递了错误或完成通知,则之后无法传递任何其他内容。
以下是 Observable 执行的示例,它传递三个 Next 通知,然后完成:
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
Observable 严格遵守 Observable Contract, 因此以下代码不会传递 Next 通知 4:
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // 不会发出通知
});
最好用 try /catch 块将 subscribe 中的所有代码包装起来,如果它能捕获到异常,它将发送错误通知:
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
} catch (err) {
subscriber.error(err); // 如果捕获到一个错误,则传递一个错误
}
});
处理 Observable 的执行#
因为 Observable 的执行可能是无限的,并且观察者想要在有限时间内终止执行是很常见的,所以我们需要一个 API 来取消执行。由于每次执行仅由一个 Observer 独占,一旦 Observer 完成接收值,它必须有一种方法来停止执行,以避免浪费计算能力或内存资源。
当 observable.subscribe
被调用时,Observer 会附加到新创建的 Observable 执行中。此调用会返回一个对象 Subscription。
const subscription = observable.subscribe((x) => console.log(x));
订阅代表正在进行的执行,并且有一个最小的 API,允许你取消执行。
import { from } from 'rxjs';
const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later
subscription.unsubscribe();
当你订阅时,你会得到一个 subscription,它代表正在进行的 Observable 执行。只需调用 unsubscribe () 即可取消执行。
当我们使用 create()
创建 Observable 时,每个 Observable 都必须定义如何处理该执行的资源。您可以通过从 function subscribe()
中返回一个自定义的 unsubscribe
函数来做到这一点。
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const intervalId = setInterval(() => {
subscriber.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(intervalId);
}
});
就像 observable.subscribe
类似于 new Observable(function subscribe() {...})
一样,我们从 subscribe 返回的 unsubscribe 在概念上等于 subscription.unsubscribe
。事实上,如果我们删除围绕这些概念的 ReactiveX 类型,我们就只剩下相当简单的 Javascript。
function subscribe(subscriber) {
const intervalId = setInterval(() => {
subscriber.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(intervalId);
};
}
const unsubscribe = subscribe({ next: (x) => console.log(x) });
// Later
unsubscribe();
我们使用 Observable、Observer 和 Subscription 等这些 Rx 类型的原因其实就是为了获得安全性以及与 Operators 的组合性。