banner
Tomorrow

Tomorrow

不骄不躁
twitter

RxJS —— Observable 可观察对象

Observables 是多个值的惰性推送的集合,他们填补了下表中的缺失点:

SingleMultiple
PullFunctionIterator
PushPromiseObservable

下面是一个 Observable,它在订阅时立即(同步)推送值 1、2、3,并在订阅调用后一秒后推送值 4,然后完成:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
	subscriber.next(1);
	subscriber.next(2);
	subscriber.next(3);
	setTimeout(() => {
		subscriber.next(4);
		subscriber.complete();
	}, 1000)
});

要调用 Observable 并查看这些值,我们需要订阅它:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
	subscriber.next(1);
	subscriber.next(2);
	subscriber.next(3);
	setTimeout(() => {
		subscriber.next(4);
		subscriber.complete();
	}, 1000);
});

console.log('just before subscribe');

observable.subscribe(() => {
	next(x) {
		console.log('got value ' + x);
	},
	error(err) {
		console.error('something wrong occurred: ' + err);
	},
	complete() {
		console.log('done');
	}
});

console.log('just after subscribe');

// Logs:
// just before subscribe
// got value 1
// got value 2
// got value 3
// just after subscribe
// got value 4
// done

Pull & Push#

Pull 和 Push 是两种不同的协议,描述了数据生产者如何与数据消费者进行通信。

Pull: 消费者决定何时从数据生产者那里接收数据。Producer 本身并不知道数据何时会交付给 Consumer。

每个 Javascript 函数都是一个 Pull 系统。该函数就是数据的生产者,调用该函数的代码通过从其调用中 pull 单个返回值来使用它。

ES2015 引入了生成函数和迭代器 (function*),这是另一种 Pull 系统,调用 iterator.next() 的代码消费者,从迭代器(生产者)中 pull 出多个值。

ProducerConsumer
Pull被动:在请求时产生数据主动:决定何时请求数据
Push主动:按照自己的节奏生成数据被动:对接收到的数据做出反应。

Push: 生产者决定何时向消费者推送数据,消费者不知道何时会收到该数据。

Promises 是当今 Javascript 中最常见的推送系统类型。 Promise (生产者) 向已经注册的回调(消费者)传递已解析的值,但与函数不同的是,Promise 负责准确确定何时将值 “推送” 到回调。

RxJS 引入了 Observable, 一种新的 Javascript 推送系统。Observable 是多个值的生产者,将它们 “推送” 给观察者(消费者)。

  • 函数是一种惰性求值计算,它在调用时同步返回单个值。
  • 生成器是一种延迟评估的计算,它将迭代时同步返回零到(可能)无限值。
  • Promise 是一种可能(或可能不会)最终返回单个值的计算。
  • Observable 是一种惰性求值计算,从它被调用开始,它可以同步或异步返回零到(可能)无限值。

Observables 作为函数的归纳#

与流行的说法相反,Observables 既不像 EventEmitter, 也不像 Promises 的多值。Observables 在某些情况下可能表现的像 EventEmitters, 即当它们使用 RxJS Subject 进行多播时,但通常它们表现得不像 EventEmitters。

Observables 就像没有参数的函数,但推论出多个值。

function foo() {
	console.log('Hello');
	return 42;
}

const x = foo.call();
console.log(x);
const y = foo.call();
console.log(y);

// Logs:
// "Hello"
// 42
// "Hello"
// 42

使用 Observables:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
});

foo.subscribe((x) => {
	console.log(x);
})
foo.subscribe((y) => {
	console.log(y);
});

// Logs:
// "Hello"
// 42
// "Hello"
// 42

因为函数和 Observables 都是惰性计算,如果你不调用函数,console.log('Hello')是不会发生的。同样对于 Observables, 如果你不订阅它 (调用 subscribe), console.log('Hello')就不会发生。另外,“调用” 和 “订阅” 是一个孤立的操作:两个函数调用触发两个独立的副作用,两个 Observable 订阅触发两个独立的副作用。与 EventEmitters 不同,EventEmitters 具有共同的副作用并且不管订阅者是否存在都急于执行, Observables 没有共享执行且是惰性的。

订阅 Observable 类似于调用函数

有些人声称 Observables 是异步的。那不是真的,如果你用日志包围一个函数调用,就像这样:

console.log('before');
console.log(foo.call());
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// "after"

这与 Observables 行为相同:

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// "after"

这证明 foo 的订阅完全是同步的,就像一个函数一样。

Observables 能够同步或异步传递值。

那 Observable 和函数有什么不同呢?随着时间的推移,Observables 可以 “返回” 多个值,这个是函数做不到的。

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
	subscriber.next(100);
	subscriber.next(200);
});

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"

也可以异步返回值:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
	subscriber.next(100);
	subscriber.next(200);
	setTimeout(() => {
		subscriber.next(300); // 异步输出
	}, 1000)
});

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"
// 300

结论:

  • func.call() 表示 “同步给我一个值”
  • observable.subscribe() 表示 “给我任意数量的值,同步或者异步”

Observable 剖析#

observable 是用 new Observable 或其他创建运算符创建的,通过观察者订阅,执行以向观察者传递 next/error/complete 通知,并且它们的执行可能会被释放。这四个方面都编码在一个 Observable 实例中,但其中某一些方面与其他类型相关,例如 Observer 和 Subscription。

核心 observable 关注点:

  • Creating 创建 Observable
  • Subscribing 订阅 Observable
  • Executing 执行 Observable
  • Disposing 处理 Observable

创建 Observables#

Observable 构造函数接受一个参数:subscribe 函数。

下面的示例创建了一个 Observable 以每秒向订阅者发送字符串 “hi”。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
	setInterval(() => {
		subscriber.next('hi');
	}, 1000);
});

可以使用 new Observable 创建 Observable。最常见的还可以使用创建函数来创建,例如 of、from、interval 等。

在上面的例子中,subscribe 函数是描述 Observable 最重要的部分。让我们看看订阅意味着什么。

订阅 Observables#

示例中的 Observable 可以被这样订阅:

observable.subscribe((x) => console.log(x));

new Observable(function subscribe(subscriber) {...}) 中的 observable.subscribesubscribe 同名并非巧合。在库中,它们是不同的,但出于实际目的,可以认为它们在概念上是相同的。

这显示了如何不在同一 Observable 的多个观察者之间共享 subscribe 调用。当用观察者调用 observable.subscribe 时,new Observable(function subscribe(subscriber) {...}) 中的函数 subscribe 会为该订阅者运行。每次调用 observable.subscribe 都会为给定的订阅者触发其自己的独立设置。

订阅一个 Observable 就像调用一个函数,在数据将被传送到的地方提供回调。

这与 addEventListener /removeEventListener 等处理事件程序 API 截然不同。使用 observable.subscribe 时,给定的 Observer 未在 Observable 中注册为监听器。Observable 甚至不维护附加的观察者列表。

执行 Observables#

new Observable(function subscribe(subscriber) {...}) 中的代码代表一个”Observable 执行 “,一个只对每个订阅的 Observer 发生的惰性计算。随着时间的推移,执行会同步或异步地产生多个值。

Observable 执行可以传递三种类型的值:

  • “Next” 下一个通知:发送一个值,如数字、字符串、对象等。
  • “Error” 错误通知:发送 Javascript 错误或异常。
  • “Complete” 完成通知:不发送值。

“Next” 是最常见和最重要的类型:它们代表传递给订阅者的实际数据。“Error” 和 “Complete” 通知在 Observable 执行期间可能只发生一次,并且只能有其中一种。

这些约束在 Observable Grammar 或 Contract 中表达的很好,写成正则表达式:

next*(error|complete)?

在 Observable 执行中,可以传递零到无限的 Next 通知。如果传递了错误或完成通知,则之后无法传递任何其他内容。

以下是 Observable 执行的示例,它传递三个 Next 通知,然后完成:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

Observable 严格遵守 Observable Contract, 因此以下代码不会传递 Next 通知 4:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // 不会发出通知
});

最好用 try /catch 块将 subscribe 中的所有代码包装起来,如果它能捕获到异常,它将发送错误通知:

import { Observable } from 'rxjs';
 
const observable = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // 如果捕获到一个错误,则传递一个错误
  }
});

处理 Observable 的执行#

因为 Observable 的执行可能是无限的,并且观察者想要在有限时间内终止执行是很常见的,所以我们需要一个 API 来取消执行。由于每次执行仅由一个 Observer 独占,一旦 Observer 完成接收值,它必须有一种方法来停止执行,以避免浪费计算能力或内存资源。

observable.subscribe 被调用时,Observer 会附加到新创建的 Observable 执行中。此调用会返回一个对象 Subscription。

const subscription = observable.subscribe((x) => console.log(x));

订阅代表正在进行的执行,并且有一个最小的 API,允许你取消执行。

import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later
subscription.unsubscribe();

当你订阅时,你会得到一个 subscription,它代表正在进行的 Observable 执行。只需调用 unsubscribe () 即可取消执行。

当我们使用 create() 创建 Observable 时,每个 Observable 都必须定义如何处理该执行的资源。您可以通过从 function subscribe() 中返回一个自定义的 unsubscribe 函数来做到这一点。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
	const intervalId = setInterval(() => {
		subscriber.next('hi');
	}, 1000);

	return function unsubscribe() {
		clearInterval(intervalId);
	}
});

就像 observable.subscribe 类似于 new Observable(function subscribe() {...}) 一样,我们从 subscribe 返回的 unsubscribe 在概念上等于 subscription.unsubscribe 。事实上,如果我们删除围绕这些概念的 ReactiveX 类型,我们就只剩下相当简单的 Javascript。

function subscribe(subscriber) {
	const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
 
  return function unsubscribe() {
    clearInterval(intervalId);
  };
}

const unsubscribe = subscribe({ next: (x) => console.log(x) });

// Later
unsubscribe();

我们使用 Observable、Observer 和 Subscription 等这些 Rx 类型的原因其实就是为了获得安全性以及与 Operators 的组合性。

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.