banner
Tomorrow

Tomorrow

不骄不躁
twitter

RxJS —— Subjects 主題

在 RxJS 中,Subject是一種特殊的 Observable,它允許將多個觀察者同時訂閱它,從而可以將數據多播給多個消費者。 Subjects 是一種非常強大的工具,可以在很多場景下使用,例如:

  • 在多個組件之間共享數據
  • 在一個組件中訂閱一個 Observable,但是又想在另一個組件中使用這個 Observable
  • 當需要取消訂閱某個 Observable 時

因此,學習 Subjects 可以讓我們更好地理解和使用 RxJS,提高編碼效率和代碼質量。

每個 Subject 都是一個觀察者對象

給定一個 Subject, 你可以訂閱它,提供一個 observer, 它將開始正常接受值。從觀察者的角度來看,它無法判斷 Observable 的執行是來自普通的單播 Observable,還是來自 Subject。

在 Subject 內部,subscribe 不會調用傳遞值的新執行。它只是將給定的觀察者註冊到觀察者列表中,類似於 addListener 通常在其他庫或語言中的工作方式。

每個 Subject 都是一個觀察者

它是一個具有方法 next(v)error(e)complete() 的對象。要向 Subject 傳遞新的值,只需要調用 next(value) ,它就被會多播到註冊監聽的 Subject 的觀察者。

import { Subject } from "rxjs";

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observer A: ${v}`),
});

subject.subscribe({
  next: (v) => console.log(`observer B: ${v}`),
});

subject.next(1);
subject.next(2);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

由於 Subject 是一個 Observer, 這也意味著你可以提供一個 Subject 作為任何 Observable 的 subscribe 參數,如下所示:

import { Subject, from } from "rxjs";

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

const observable = from([1, 2, 3]);

observable.subscribe(subject);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

還有些 Subject 類型的特化: BehaviorSubject、ReplaySubject 和 AsyncSubject。

Multicasted Observables#

多播 Observable 在底層使用 Subject 讓多個觀察者看到相同的 Observable 執行。

multicast 操作符在 v8 版本中將被移除。

import { from, Subject, multicast } from "rxjs";

const source = from([1, 2, 3]);
const subject = new Subject<number>();
const multicasted = source.pipe(multicast(subject));

// 底層:`source.subscribe({ ... })`
multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

multicasted.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

// 底層:`source.subscribe(subject)`
multicasted.connect();

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

BehaviorSubject#

Subject 的變體之一,它有一個 “當前值” 的概念。它存儲發送給它的消費者的最新值,每當一個新的觀察者訂閱時,它會立即從 BehaviorSubject 接收 “當前值”。

在下面的例子中,BehaviorSubject 被初始化為第一個 Observer 在訂閱時收到的值 0, 第二個觀察者收到值 2, 即使它再發送值 2 之後訂閱。

import { BehaviorSubject } from "rxjs";

const subject = new BehaviorSubject(0); // 0 是初始值

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(3);

// Logs:
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

ReplaySubject#

ReplaySubject 和 BehaviorSubject 類似,它可以將舊值發送給訂閱者, 但它也可以記錄 Observable 執行的一部分。

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject(3); // 給新的訂閱者緩存最新的 3 個舊值

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

除了緩衝區大小的參數外,還可以指定一個以毫秒為單位的窗口時間,以確定記錄值的存在時間。

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`),
  });
}, 1000);

// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

AsyncSubject#

AsyncSubject 僅將 Observable 執行的最後一個值發送給其觀察者,並且僅在執行完成時發送。

import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject();

subject.subscribe({
 next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
 next: (v) => console.log(`observerB: ${v}`),
});

subject.next(5);
subject.complete();

// Logs:
// observerA: 5
// observerB: 5

Void subject#

有時,發出的值並不像發出值這一事實那麼重要

const subject = new Subject<string>();
setTimeout(() => subject.next("dummy"), 1000);

通過申明一個 void subject, 表示值無關重要,事件本身更重要。

const subject = new Subject<void>();

subject.subscribe({
  next: () => console.log(`One second has passed`),
});

setTimeout(() => subject.next(), 1000);
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。