banner
Tomorrow

Tomorrow

不骄不躁
twitter

RxJS —— Observable 可观察对象

Observables 是多个值的惰性推送的集合,他们填补了下表中的缺失点:

SingleMultiple
PullFunctionIterator
PushPromiseObservable

下面是一个 Observable,它在订阅时立即(同步)推送值 1、2、3,并在订阅调用后一秒后推送值 4,然后完成:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
	subscriber.next(1);
	subscriber.next(2);
	subscriber.next(3);
	setTimeout(() => {
		subscriber.next(4);
		subscriber.complete();
	}, 1000)
});

要调用 Observable 并查看这些值,我们需要订阅它:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
	subscriber.next(1);
	subscriber.next(2);
	subscriber.next(3);
	setTimeout(() => {
		subscriber.next(4);
		subscriber.complete();
	}, 1000);
});

console.log('just before subscribe');

observable.subscribe(() => {
	next(x) {
		console.log('got value ' + x);
	},
	error(err) {
		console.error('something wrong occurred: ' + err);
	},
	complete() {
		console.log('done');
	}
});

console.log('just after subscribe');

// Logs:
// just before subscribe
// got value 1
// got value 2
// got value 3
// just after subscribe
// got value 4
// done

Pull & Push#

Pull 和 Push 是两种不同的协议,描述了数据生产者如何与数据消费者进行通信。

Pull: 消费者决定何时从数据生产者那里接收数据。Producer 本身并不知道数据何时会交付给 Consumer。

每个 Javascript 函数都是一个 Pull 系统。该函数就是数据的生产者,调用该函数的代码通过从其调用中 pull 单个返回值来使用它。

ES2015 引入了生成函数和迭代器 (function*),这是另一种 Pull 系统,调用 iterator.next() 的代码消费者,从迭代器(生产者)中 pull 出多个值。

ProducerConsumer
Pull被动:在请求时产生数据主动:决定何时请求数据
Push主动:按照自己的节奏生成数据被动:对接收到的数据做出反应。

Push: 生产者决定何时向消费者推送数据,消费者不知道何时会收到该数据。

Promises 是当今 Javascript 中最常见的推送系统类型。 Promise (生产者) 向已经注册的回调(消费者)传递已解析的值,但与函数不同的是,Promise 负责准确确定何时将值 “推送” 到回调。

RxJS 引入了 Observable, 一种新的 Javascript 推送系统。Observable 是多个值的生产者,将它们 “推送” 给观察者(消费者)。

  • 函数是一种惰性求值计算,它在调用时同步返回单个值。
  • 生成器是一种延迟评估的计算,它将迭代时同步返回零到(可能)无限值。
  • Promise 是一种可能(或可能不会)最终返回单个值的计算。
  • Observable 是一种惰性求值计算,从它被调用开始,它可以同步或异步返回零到(可能)无限值。

Observables 作为函数的归纳#

与流行的说法相反,Observables 既不像 EventEmitter, 也不像 Promises 的多值。Observables 在某些情况下可能表现的像 EventEmitters, 即当它们使用 RxJS Subject 进行多播时,但通常它们表现得不像 EventEmitters。

Observables 就像没有参数的函数,但推论出多个值。

function foo() {
	console.log('Hello');
	return 42;
}

const x = foo.call();
console.log(x);
const y = foo.call();
console.log(y);

// Logs:
// "Hello"
// 42
// "Hello"
// 42

使用 Observables:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
});

foo.subscribe((x) => {
	console.log(x);
})
foo.subscribe((y) => {
	console.log(y);
});

// Logs:
// "Hello"
// 42
// "Hello"
// 42

因为函数和 Observables 都是惰性计算,如果你不调用函数,console.log('Hello')是不会发生的。同样对于 Observables, 如果你不订阅它 (调用 subscribe), console.log('Hello')就不会发生。另外,“调用” 和 “订阅” 是一个孤立的操作:两个函数调用触发两个独立的副作用,两个 Observable 订阅触发两个独立的副作用。与 EventEmitters 不同,EventEmitters 具有共同的副作用并且不管订阅者是否存在都急于执行, Observables 没有共享执行且是惰性的。

订阅 Observable 类似于调用函数

有些人声称 Observables 是异步的。那不是真的,如果你用日志包围一个函数调用,就像这样:

console.log('before');
console.log(foo.call());
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// "after"

这与 Observables 行为相同:

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// "after"

这证明 foo 的订阅完全是同步的,就像一个函数一样。

Observables 能够同步或异步传递值。

那 Observable 和函数有什么不同呢?随着时间的推移,Observables 可以 “返回” 多个值,这个是函数做不到的。

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
	subscriber.next(100);
	subscriber.next(200);
});

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"

也可以异步返回值:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
	subscriber.next(100);
	subscriber.next(200);
	setTimeout(() => {
		subscriber.next(300); // 异步输出
	}, 1000)
});

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"
// 300

结论:

  • func.call() 表示 “同步给我一个值”
  • observable.subscribe() 表示 “给我任意数量的值,同步或者异步”

Observable 剖析#

observable 是用 new Observable 或其他创建运算符创建的,通过观察者订阅,执行以向观察者传递 next/error/complete 通知,并且它们的执行可能会被释放。这四个方面都编码在一个 Observable 实例中,但其中某一些方面与其他类型相关,例如 Observer 和 Subscription。

核心 observable 关注点:

  • Creating 创建 Observable
  • Subscribing 订阅 Observable
  • Executing 执行 Observable
  • Disposing 处理 Observable

创建 Observables#

Observable 构造函数接受一个参数:subscribe 函数。

下面的示例创建了一个 Observable 以每秒向订阅者发送字符串 “hi”。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
	setInterval(() => {
		subscriber.next('hi');
	}, 1000);
});

可以使用 new Observable 创建 Observable。最常见的还可以使用创建函数来创建,例如 of、from、interval 等。

在上面的例子中,subscribe 函数是描述 Observable 最重要的部分。让我们看看订阅意味着什么。

订阅 Observables#

示例中的 Observable 可以被这样订阅:

observable.subscribe((x) => console.log(x));

new Observable(function subscribe(subscriber) {...}) 中的 observable.subscribesubscribe 同名并非巧合。在库中,它们是不同的,但出于实际目的,可以认为它们在概念上是相同的。

这显示了如何不在同一 Observable 的多个观察者之间共享 subscribe 调用。当用观察者调用 observable.subscribe 时,new Observable(function subscribe(subscriber) {...}) 中的函数 subscribe 会为该订阅者运行。每次调用 observable.subscribe 都会为给定的订阅者触发其自己的独立设置。

订阅一个 Observable 就像调用一个函数,在数据将被传送到的地方提供回调。

这与 addEventListener /removeEventListener 等处理事件程序 API 截然不同。使用 observable.subscribe 时,给定的 Observer 未在 Observable 中注册为监听器。Observable 甚至不维护附加的观察者列表。

执行 Observables#

new Observable(function subscribe(subscriber) {...}) 中的代码代表一个”Observable 执行 “,一个只对每个订阅的 Observer 发生的惰性计算。随着时间的推移,执行会同步或异步地产生多个值。

Observable 执行可以传递三种类型的值:

  • “Next” 下一个通知:发送一个值,如数字、字符串、对象等。
  • “Error” 错误通知:发送 Javascript 错误或异常。
  • “Complete” 完成通知:不发送值。

“Next” 是最常见和最重要的类型:它们代表传递给订阅者的实际数据。“Error” 和 “Complete” 通知在 Observable 执行期间可能只发生一次,并且只能有其中一种。

这些约束在 Observable Grammar 或 Contract 中表达的很好,写成正则表达式:

next*(error|complete)?

在 Observable 执行中,可以传递零到无限的 Next 通知。如果传递了错误或完成通知,则之后无法传递任何其他内容。

以下是 Observable 执行的示例,它传递三个 Next 通知,然后完成:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

Observable 严格遵守 Observable Contract, 因此以下代码不会传递 Next 通知 4:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // 不会发出通知
});

最好用 try /catch 块将 subscribe 中的所有代码包装起来,如果它能捕获到异常,它将发送错误通知:

import { Observable } from 'rxjs';
 
const observable = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // 如果捕获到一个错误,则传递一个错误
  }
});

处理 Observable 的执行#

因为 Observable 的执行可能是无限的,并且观察者想要在有限时间内终止执行是很常见的,所以我们需要一个 API 来取消执行。由于每次执行仅由一个 Observer 独占,一旦 Observer 完成接收值,它必须有一种方法来停止执行,以避免浪费计算能力或内存资源。

observable.subscribe 被调用时,Observer 会附加到新创建的 Observable 执行中。此调用会返回一个对象 Subscription。

const subscription = observable.subscribe((x) => console.log(x));

订阅代表正在进行的执行,并且有一个最小的 API,允许你取消执行。

import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later
subscription.unsubscribe();

当你订阅时,你会得到一个 subscription,它代表正在进行的 Observable 执行。只需调用 unsubscribe () 即可取消执行。

当我们使用 create() 创建 Observable 时,每个 Observable 都必须定义如何处理该执行的资源。您可以通过从 function subscribe() 中返回一个自定义的 unsubscribe 函数来做到这一点。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
	const intervalId = setInterval(() => {
		subscriber.next('hi');
	}, 1000);

	return function unsubscribe() {
		clearInterval(intervalId);
	}
});

就像 observable.subscribe 类似于 new Observable(function subscribe() {...}) 一样,我们从 subscribe 返回的 unsubscribe 在概念上等于 subscription.unsubscribe 。事实上,如果我们删除围绕这些概念的 ReactiveX 类型,我们就只剩下相当简单的 Javascript。

function subscribe(subscriber) {
	const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
 
  return function unsubscribe() {
    clearInterval(intervalId);
  };
}

const unsubscribe = subscribe({ next: (x) => console.log(x) });

// Later
unsubscribe();

我们使用 Observable、Observer 和 Subscription 等这些 Rx 类型的原因其实就是为了获得安全性以及与 Operators 的组合性。

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。