Tomorrow

RxJS: Subjects

In RxJS, a Subject is a special type of Observable that allows values to be multicast to many observers. A plain Observable is unicast (each subscribed observer owns an independent execution), whereas a Subject shares a single execution among all of its observers. Subjects are useful in many scenarios, such as:

  • Sharing data between multiple components
  • Subscribing to a source Observable once and sharing its values with another component
  • Signaling when to unsubscribe from an Observable, for example as the notifier passed to takeUntil

Therefore, learning about Subjects can help us better understand and use RxJS, improving coding efficiency and code quality.

Every Subject is an Observable

Given a Subject, you can subscribe to it by providing an observer, which will start receiving values normally. From the observer's perspective, there is no way to tell whether the Observable execution comes from a plain unicast Observable or from a Subject.

Internally, subscribing to a Subject does not start a new execution that delivers values. It simply adds the given observer to the Subject's list of observers, similar to how addListener typically works in other libraries and languages.
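To make the "list of observers" idea concrete, here is a minimal sketch of the registration pattern. MiniSubject and Listener are hypothetical names invented for this illustration; this is a simplified stand-in, not the actual RxJS implementation.

```typescript
// Hypothetical, simplified stand-in for a Subject; not the RxJS implementation.
type Listener<T> = (value: T) => void;

class MiniSubject<T> {
  private listeners: Listener<T>[] = [];

  // Subscribing only registers the listener; nothing is executed or emitted here.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    // Return a function that removes the listener again.
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  // Emitting forwards the value to every listener registered at this moment.
  next(value: T): void {
    for (const listener of [...this.listeners]) {
      listener(value);
    }
  }
}

const received: string[] = [];
const mini = new MiniSubject<number>();

mini.next(0); // no listeners yet, so this value is dropped

const unsubscribeA = mini.subscribe((v) => received.push(`A: ${v}`));
mini.subscribe((v) => received.push(`B: ${v}`));

mini.next(1); // delivered to A and B
unsubscribeA();
mini.next(2); // delivered to B only

console.log(received); // ["A: 1", "B: 1", "B: 2"]
```

Note that the value emitted before anyone subscribed is lost, which is the same behavior a plain Subject shows for late subscribers.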

Every Subject is an Observer

It is an object with methods next(v), error(e), and complete(). To pass new values to the Subject, simply call next(value), and it will be multicast to the registered observers of the Subject.

import { Subject } from "rxjs";

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(1);
subject.next(2);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

Because a Subject is also an Observer, you can pass it as the argument to the subscribe method of any Observable, as shown below:

import { Subject, from } from "rxjs";

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

const observable = from([1, 2, 3]);

observable.subscribe(subject);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
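One consequence worth keeping in mind is that a Subject used as an observer forwards the completion of its source as well as the values. The sketch below (the names relay and relayLog are made up for this illustration) suggests that once the source completes, the Subject is stopped and later next calls are silently ignored.

```typescript
import { Subject, from } from "rxjs";

const relayLog: string[] = [];
const relay = new Subject<number>();

relay.subscribe({
  next: (v) => relayLog.push(`next: ${v}`),
  complete: () => relayLog.push("complete"),
});

// from([1, 2, 3]) emits synchronously and then completes;
// the Subject forwards both the values and the completion.
from([1, 2, 3]).subscribe(relay);

// The Subject is now stopped, so this call has no effect.
relay.next(4);

console.log(relayLog); // ["next: 1", "next: 2", "next: 3", "complete"]
```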

There are also some specialized types of Subjects: BehaviorSubject, ReplaySubject, and AsyncSubject.

Multicasted Observables

A multicasted Observable uses a Subject underneath to make multiple observers see the same Observable execution.

The multicast operator is deprecated as of RxJS 7 and will be removed in version 8.

import { from, Subject, multicast } from "rxjs";

const source = from([1, 2, 3]);
const subject = new Subject<number>();
const multicasted = source.pipe(multicast(subject));

// Underlying: `subject.subscribe({ ... })`
multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

multicasted.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

// Underlying: `source.subscribe(subject)`
multicasted.connect();

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
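Since multicast is on its way out, the same behavior can be sketched with the connectable function that RxJS 7 exports. This is one possible replacement, assuming RxJS 7 or later; the variable names are illustrative.

```typescript
import { connectable, from, Subject } from "rxjs";

const connectLog: string[] = [];
const source$ = from([1, 2, 3]);

// connectable() wraps the source; the connector creates the Subject that multicasts.
const shared$ = connectable(source$, {
  connector: () => new Subject<number>(),
  resetOnDisconnect: false, // keep the same Subject after the connection ends
});

// These observers are registered on the internal Subject; nothing runs yet.
shared$.subscribe((v) => connectLog.push(`observerA: ${v}`));
shared$.subscribe((v) => connectLog.push(`observerB: ${v}`));

// connect() subscribes the Subject to the source and starts the shared execution.
shared$.connect();

console.log(connectLog);
// ["observerA: 1", "observerB: 1", "observerA: 2", "observerB: 2", "observerA: 3", "observerB: 3"]
```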

BehaviorSubject

BehaviorSubject is a variant of Subject that has the notion of a "current value". It stores the latest value emitted to its consumers, and whenever a new observer subscribes, that observer immediately receives the current value from the BehaviorSubject.

In the example below, BehaviorSubject is initialized with the value 0, which is received by the first observer when it subscribes. The second observer receives the value 2, even though it subscribes after the value 2 is sent.

import { BehaviorSubject } from "rxjs";

const subject = new BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(3);

// Logs:
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
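The current value can also be read synchronously, without subscribing, through getValue() or the value getter. A small sketch (the variable names are illustrative):

```typescript
import { BehaviorSubject } from "rxjs";

const counter = new BehaviorSubject<number>(0);

const beforeUpdate = counter.getValue(); // 0, the initial value
counter.next(1);
counter.next(2);
const afterUpdate = counter.value; // 2, same result as getValue()

// A late subscriber is handed the current value immediately on subscribe.
const seenByLate: number[] = [];
counter.subscribe((v) => seenByLate.push(v));

console.log(beforeUpdate, afterUpdate, seenByLate); // 0 2 [ 2 ]
```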

ReplaySubject

Similar to BehaviorSubject, a ReplaySubject can send old values to new subscribers, but it can also record a portion of the Observable execution and replay it.

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject(3); // Buffer the last 3 values for new subscribers

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

In addition to the buffer size, you can specify a window time in milliseconds that determines how long recorded values are kept before they expire.

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`),
  });
}, 1000);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

AsyncSubject

AsyncSubject only sends the last value of the Observable execution to its observers, and only when the execution is completed.

import { AsyncSubject } from "rxjs";

const subject = new AsyncSubject();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(5);
subject.complete();

// Logs:
// observerA: 5
// observerB: 5
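In this respect an AsyncSubject behaves a little like a resolved promise: nothing is delivered before completion, and an observer that subscribes after completion still receives the final value. A short sketch (the names result$ and asyncLog are illustrative):

```typescript
import { AsyncSubject } from "rxjs";

const asyncLog: string[] = [];
const result$ = new AsyncSubject<number>();

result$.subscribe((v) => asyncLog.push(`early: ${v}`));

result$.next(1);
result$.next(2);

// Nothing has been delivered yet because the subject has not completed.
const emittedBeforeComplete = asyncLog.length;

result$.complete(); // the early observer now receives the last value, 2

// Subscribing after completion still yields the last value.
result$.subscribe((v) => asyncLog.push(`late: ${v}`));

console.log(emittedBeforeComplete, asyncLog); // 0 [ "early: 2", "late: 2" ]
```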

Void subject

Sometimes, the emitted value is not as important as the fact that an event occurred.

import { Subject } from "rxjs";

const subject = new Subject<string>();
setTimeout(() => subject.next("dummy"), 1000);

Passing a dummy value like this is clumsy. Declaring a void subject instead signals that the value is irrelevant and only the event itself matters.

import { Subject } from "rxjs";

const subject = new Subject<void>();

subject.subscribe({
  next: () => console.log(`One second has passed`),
});

setTimeout(() => subject.next(), 1000);
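
A common use for a void subject is as a "stop" signal for takeUntil, which ends a subscription when the notifier emits. The following is a minimal sketch, assuming RxJS 7.2 or later (where operators can be imported from "rxjs"); ticks$ and stop$ are hypothetical names, and ticks$ stands in for any long-lived source.

```typescript
import { Subject, takeUntil } from "rxjs";

const ticks$ = new Subject<number>(); // stands in for any long-lived source
const stop$ = new Subject<void>(); // carries no value, only the "stop" event

const tickLog: number[] = [];

// takeUntil mirrors ticks$ until stop$ emits, then completes the subscription.
ticks$.pipe(takeUntil(stop$)).subscribe((v) => tickLog.push(v));

ticks$.next(1);
ticks$.next(2);
stop$.next(); // no argument needed for a void subject
ticks$.next(3); // ignored: the subscription has already ended

console.log(tickLog); // [ 1, 2 ]
```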