banner
Tomorrow

Tomorrow

不骄不躁
twitter

RxJS —— Subjects 主题

在 RxJS 中,Subject是一种特殊的 Observable,它允许将多个观察者同时订阅它,从而可以将数据多播给多个消费者。 Subjects 是一种非常强大的工具,可以在很多场景下使用,例如:

  • 在多个组件之间共享数据
  • 在一个组件中订阅一个 Observable,但是又想在另一个组件中使用这个 Observable
  • 当需要取消订阅某个 Observable 时

因此,学习 Subjects 可以让我们更好地理解和使用 RxJS,提高编码效率和代码质量。

每个 Subject 都是一个观察者对象

给定一个 Subject, 你可以订阅它,提供一个 observer, 它将开始正常接受值。从观察者的角度来看,它无法判断 Observable 的执行是来自普通的单播 Observable,还是来自 Subject。

在 Subject 内部,subscribe 不会调用传递值的新执行。它只是将给定的观察者注册到观察者列表中,类似于 addListener 通常在其他库或语言中的工作方式。

每个 Subject 都是一个观察者

它是一个具有方法 next(v)error(e)complete() 的对象。要向 Subject 传递新的值,只需要调用 next(value) ,它就被会多播到注册监听的 Subject 的观察者。

import { Subject } from "rxjs";

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observer A: ${v}`),
});

subject.subscribe({
  next: (v) => console.log(`observer B: ${v}`),
});

subject.next(1);
subject.next(2);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

由于 Subject 是一个 Observer, 这也意味着你可以提供一个 Subject 作为任何 Observable 的 subscribe 参数,如下所示:

import { Subject, from } from "rxjs";

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

const observable = from([1, 2, 3]);

observable.subscribe(subject);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

还有些 Subject 类型的特化: BehaviorSubject、ReplaySubject 和 AsyncSubject。

Multicasted Observables#

多播 Observable 在底层使用 Subject 让多个观察者看到相同的 Observable 执行。

multicast 操作符在 v8 版本中将被移除。

import { from, Subject, multicast } from "rxjs";

const source = from([1, 2, 3]);
const subject = new Subject<number>();
const multicasted = source.pipe(multicast(subject));

// 底层:`source.subscribe({ ... })`
multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

multicasted.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

// 底层:`source.subscribe(subject)`
multicasted.connect();

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

BehaviorSubject#

Subject 的变体之一,它有一个 “当前值” 的概念。它存储发送给它的消费者的最新值,每当一个新的观察者订阅时,它会立即从 BehaviorSubject 接收 “当前值”。

在下面的例子中,BehaviorSubject 被初始化为第一个 Observer 在订阅时收到的值 0, 第二个观察者收到值 2, 即使它再发送值 2 之后订阅。

import { BehaviorSubject } from "rxjs";

const subject = new BehaviorSubject(0); // 0 是初始值

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(3);

// Logs:
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

ReplaySubject#

ReplaySubject 和 BehaviorSubject 类似,它可以将旧值发送给订阅者, 但它也可以记录 Observable 执行的一部分。

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject(3); // 给新的订阅者缓存最新的 3 个旧值

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

除了缓冲区大小的参数外,还可以指定一个以毫秒为单位的窗口时间,以确定记录值的存在时间。

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`),
  });
}, 1000);

// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

AsyncSubject#

AsyncSubject 仅将 Observable 执行的最后一个值发送给其观察者,并且仅在执行完成时发送。

import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject();

subject.subscribe({
 next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
 next: (v) => console.log(`observerB: ${v}`);
});

subject.next(5);
subject.complete();

// Logs:
// observerA: 5
// observerB: 5

Void subject#

有时,发出的值并不像发出值这一事实那么重要

const subject = new Subject<string>();
setTimeout(() => subject.next("dummy"), 1000);

通过申明一个 void subject, 表示值无关重要,事件本身更重要。

const subject = new Subject<void>();

subject.subscribe({
  next: () => console.log(`One second has passed`),
});

setTimeout(() => subject.next(), 1000);
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。