banner
Tomorrow

Tomorrow

不骄不躁
twitter

RxJS —— Subjects 主題

RxJS 中、Subjectは特別な Observable の一種であり、複数の観測者が同時に購読できるため、データを複数のコンシューマにマルチキャストすることができます。Subjects は非常に強力なツールであり、さまざまなシナリオで使用できます。例えば:

  • 複数のコンポーネント間でデータを共有する
  • 1 つのコンポーネントで Observable を購読し、別のコンポーネントでその Observable を使用する
  • 特定の Observable の購読をキャンセルする必要がある場合

したがって、Subjects を学ぶことで、RxJS の理解と使用が向上し、コーディングの効率とコードの品質が向上します。

各 Subject は観測者オブジェクトです

Subject を与えられた場合、それに購読し、オブザーバを提供すると、通常の単一の Observable からの実行か、Subject からの実行かをオブザーバの視点から判断することはできません。

Subject 内部では、subscribe は新しい実行を渡すことはありません。それは単に与えられたオブザーバをオブザーバリストに登録するだけであり、他のライブラリや言語で通常行われる addListener のような方法です。

各 Subject はオブザーバです

それは next(v)error(e)complete() のメソッドを持つオブジェクトです。Subject に新しい値を渡すには、単に next(value) を呼び出すだけで、それは登録された Subject のオブザーバにマルチキャストされます。

import { Subject } from "rxjs";

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observer A: ${v}`),
});

subject.subscribe({
  next: (v) => console.log(`observer B: ${v}`),
});

subject.next(1);
subject.next(2);

// ログ:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

Subject は Observer であるため、Subject を任意の Observable の購読パラメータとして提供することもできます。

import { Subject, from } from "rxjs";

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

const observable = from([1, 2, 3]);

observable.subscribe(subject);

// ログ:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

また、BehaviorSubject、ReplaySubject、AsyncSubject など、いくつかの特殊な Subject の種類もあります。

マルチキャストされた Observable#

マルチキャストされた Observable は、複数の観測者が同じ Observable の実行を見るために Subject を内部で使用します。

multicast 演算子は v8 バージョンで削除されます。

import { from, Subject, multicast } from "rxjs";

const source = from([1, 2, 3]);
const subject = new Subject<number>();
const multicasted = source.pipe(multicast(subject));

// 内部: `source.subscribe({ ... })`
multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

multicasted.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

// 内部: `source.subscribe(subject)`
multicasted.connect();

// ログ:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

BehaviorSubject#

Subject のバリエーションの 1 つであり、"現在の値" の概念を持っています。それは、それに送信された最新の値を消費者に保存し、新しいオブザーバが購読するたびに BehaviorSubject から "現在の値" を即座に受け取ります。

以下の例では、BehaviorSubject は最初の Observer が購読する際に受け取る値 0 で初期化され、2 番目の Observer は値 2 を受け取ります。これは、値 2 を送信した後に購読しても同様です。

import { BehaviorSubject } from "rxjs";

const subject = new BehaviorSubject(0); // 0 は初期値です

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(3);

// ログ:
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

ReplaySubject#

ReplaySubject は BehaviorSubject と似ていますが、古い値を購読者に送信するだけでなく、Observable の実行の一部を記録することもできます。

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject(3); // 新しい購読者に最新の 3 つの古い値をキャッシュします

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});

subject.next(5);

// ログ:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

バッファーサイズのパラメータの他に、値の存在時間をミリ秒単位で指定するウィンドウ時間を指定することもできます。

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`),
  });
}, 1000);

// ログ
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

AsyncSubject#

AsyncSubject は Observable の実行の最後の値のみをその観測者に送信し、実行が完了したときにのみ送信します。

import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject();

subject.subscribe({
 next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
 next: (v) => console.log(`observerB: ${v}`);
});

subject.next(5);
subject.complete();

// ログ:
// observerA: 5
// observerB: 5

Void subject#

値の発行は重要ではなく、イベント自体が重要な場合があります

const subject = new Subject<string>();
setTimeout(() => subject.next("dummy"), 1000);

void subject を宣言することで、値の重要性よりもイベント自体の重要性を示します。

const subject = new Subject<void>();

subject.subscribe({
  next: () => console.log(`One second has passed`),
});

setTimeout(() => subject.next(), 1000);
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。