在 RxJS 中,Subject是一種特殊的 Observable,它允許將多個觀察者同時訂閱它,從而可以將數據多播給多個消費者。 Subjects 是一種非常強大的工具,可以在很多場景下使用,例如:
- 在多個組件之間共享數據
- 在一個組件中訂閱一個 Observable,但是又想在另一個組件中使用這個 Observable
- 當需要取消訂閱某個 Observable 時
因此,學習 Subjects 可以讓我們更好地理解和使用 RxJS,提高編碼效率和代碼質量。
每個 Subject 都是一個觀察者對象
給定一個 Subject, 你可以訂閱它,提供一個 observer, 它將開始正常接受值。從觀察者的角度來看,它無法判斷 Observable 的執行是來自普通的單播 Observable,還是來自 Subject。
在 Subject 內部,subscribe 不會調用傳遞值的新執行。它只是將給定的觀察者註冊到觀察者列表中,類似於 addListener 通常在其他庫或語言中的工作方式。
每個 Subject 都是一個觀察者
它是一個具有方法 next(v)
、error(e)
和 complete()
的對象。要向 Subject 傳遞新的值,只需要調用 next(value)
,它就被會多播到註冊監聽的 Subject 的觀察者。
import { Subject } from "rxjs";
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observer A: ${v}`),
});
subject.subscribe({
next: (v) => console.log(`observer B: ${v}`),
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
由於 Subject 是一個 Observer, 這也意味著你可以提供一個 Subject 作為任何 Observable 的 subscribe 參數,如下所示:
import { Subject, from } from "rxjs";
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});
const observable = from([1, 2, 3]);
observable.subscribe(subject);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
還有些 Subject 類型的特化: BehaviorSubject、ReplaySubject 和 AsyncSubject。
Multicasted Observables#
多播 Observable 在底層使用 Subject 讓多個觀察者看到相同的 Observable 執行。
multicast 操作符在 v8 版本中將被移除。
import { from, Subject, multicast } from "rxjs";
const source = from([1, 2, 3]);
const subject = new Subject<number>();
const multicasted = source.pipe(multicast(subject));
// 底層:`source.subscribe({ ... })`
multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});
multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});
// 底層:`source.subscribe(subject)`
multicasted.connect();
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
BehaviorSubject#
Subject 的變體之一,它有一個 “當前值” 的概念。它存儲發送給它的消費者的最新值,每當一個新的觀察者訂閱時,它會立即從 BehaviorSubject 接收 “當前值”。
在下面的例子中,BehaviorSubject 被初始化為第一個 Observer 在訂閱時收到的值 0, 第二個觀察者收到值 2, 即使它再發送值 2 之後訂閱。
import { BehaviorSubject } from "rxjs";
const subject = new BehaviorSubject(0); // 0 是初始值
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});
subject.next(3);
// Logs:
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
ReplaySubject#
ReplaySubject 和 BehaviorSubject 類似,它可以將舊值發送給訂閱者, 但它也可以記錄 Observable 執行的一部分。
import { ReplaySubject } from "rxjs";
const subject = new ReplaySubject(3); // 給新的訂閱者緩存最新的 3 個舊值
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
除了緩衝區大小的參數外,還可以指定一個以毫秒為單位的窗口時間,以確定記錄值的存在時間。
import { ReplaySubject } from "rxjs";
const subject = new ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});
}, 1000);
// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...
AsyncSubject#
AsyncSubject 僅將 Observable 執行的最後一個值發送給其觀察者,並且僅在執行完成時發送。
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});
subject.next(5);
subject.complete();
// Logs:
// observerA: 5
// observerB: 5
Void subject#
有時,發出的值並不像發出值這一事實那麼重要
const subject = new Subject<string>();
setTimeout(() => subject.next("dummy"), 1000);
通過申明一個 void subject, 表示值無關重要,事件本身更重要。
const subject = new Subject<void>();
subject.subscribe({
next: () => console.log(`One second has passed`),
});
setTimeout(() => subject.next(), 1000);