banner
Tomorrow

Tomorrow

不骄不躁
twitter

RxJS —— Observable 観察可能なオブジェクト

Observables は複数の値の遅延プッシュの集合であり、下の表の欠落点を埋めます:

単一複数
プル関数イテレーター
プッシュプロミスObservable

以下は、Observable の例で、購読時に即座に(同期的に)値 1、2、3 をプッシュし、購読呼び出しの 1 秒後に値 4 をプッシュし、その後完了します:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
	subscriber.next(1);
	subscriber.next(2);
	subscriber.next(3);
	setTimeout(() => {
		subscriber.next(4);
		subscriber.complete();
	}, 1000)
});

Observable を呼び出してこれらの値を確認するには、購読する必要があります:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
	subscriber.next(1);
	subscriber.next(2);
	subscriber.next(3);
	setTimeout(() => {
		subscriber.next(4);
		subscriber.complete();
	}, 1000);
});

console.log('購読の直前');

observable.subscribe(() => {
	next(x) {
		console.log('値を取得しました ' + x);
	},
	error(err) {
		console.error('何か問題が発生しました: ' + err);
	},
	complete() {
		console.log('完了');
	}
});

console.log('購読の直後');

// ログ:
// 購読の直前
// 値を取得しました 1
// 値を取得しました 2
// 値を取得しました 3
// 購読の直後
// 値を取得しました 4
// 完了

プルとプッシュ#

プルとプッシュは、データプロデューサーがデータコンシューマーと通信する方法を説明する 2 つの異なるプロトコルです。

プル: コンシューマーがデータプロデューサーからデータを受け取るタイミングを決定します。プロデューサー自身は、データがコンシューマーにいつ配信されるかを知りません。

すべての Javascript 関数はプルシステムです。その関数がデータのプロデューサーであり、その関数を呼び出すコードがその呼び出しから単一の戻り値をプルして使用します。

ES2015 では、生成関数とイテレーター(function*)が導入され、これは別のプルシステムであり、iterator.next() を呼び出すコードがイテレーター(プロデューサー)から複数の値をプルします。

プロデューサーコンシューマー
プル受動的:要求時にデータを生成能動的:データを要求するタイミングを決定
プッシュ能動的:自分のペースでデータを生成受動的:受け取ったデータに反応する。

プッシュ: プロデューサーがコンシューマーにデータをプッシュするタイミングを決定し、コンシューマーはそのデータをいつ受け取るかを知りません。

プロミスは、今日の Javascript で最も一般的なプッシュシステムのタイプです。プロミス(プロデューサー)は、すでに登録されたコールバック(コンシューマー)に解決された値を渡しますが、関数とは異なり、プロミスは値をコールバックに「プッシュ」するタイミングを正確に決定する責任があります。

RxJS は Observable を導入しました。これは新しい Javascript プッシュシステムです。Observable は複数の値のプロデューサーであり、それらを観察者(コンシューマー)に「プッシュ」します。

  • 関数は遅延評価計算の一種であり、呼び出し時に単一の値を同期的に返します。
  • ジェネレーターは遅延評価計算の一種であり、イテレーション時に同期的にゼロから(場合によっては)無限の値を返します。
  • プロミスは、最終的に単一の値を返す可能性がある(またはない)計算です。
  • Observable は遅延評価計算の一種であり、呼び出されると、同期的または非同期的にゼロから(場合によっては)無限の値を返すことができます。

関数としての Observables の帰納#

一般的な言説とは反対に、Observables は EventEmitter のようでもなく、プロミスのような複数の値でもありません。Observables は、特定の状況下で EventEmitters のように振る舞うことがありますが、通常は EventEmitters のようには振る舞いません。

Observables は引数のない関数のようですが、複数の値を推論します。

function foo() {
	console.log('こんにちは');
	return 42;
}

const x = foo.call();
console.log(x);
const y = foo.call();
console.log(y);

// ログ:
// "こんにちは"
// 42
// "こんにちは"
// 42

Observables を使用すると:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('こんにちは');
	subscriber.next(42);
});

foo.subscribe((x) => {
	console.log(x);
})
foo.subscribe((y) => {
	console.log(y);
});

// ログ:
// "こんにちは"
// 42
// "こんにちは"
// 42

関数と Observables はどちらも遅延計算であるため、関数を呼び出さなければ、console.log('こんにちは') は発生しません。同様に、Observables に対しても、購読しなければ(subscribe を呼び出さなければ)、console.log('こんにちは') は発生しません。また、「呼び出し」と「購読」は独立した操作です:2 つの関数呼び出しは 2 つの独立した副作用を引き起こし、2 つの Observable 購読は 2 つの独立した副作用を引き起こします。EventEmitters とは異なり、EventEmitters は共通の副作用を持ち、購読者が存在するかどうかにかかわらず急いで実行されますが、Observables は共有実行を持たず、遅延的です。

Observable を購読することは、関数を呼び出すことに似ています。

一部の人々は Observables が非同期であると主張しています。それは本当ではありません。関数呼び出しをログで囲むと、次のようになります:

console.log('前');
console.log(foo.call());
console.log('後');

// ログ:
// "前"
// "こんにちは"
// 42
// "後"

これは Observables の動作と同じです:

console.log('前');
foo.subscribe((x) => {
	console.log(x);
});
console.log('後');

// ログ:
// "前"
// "こんにちは"
// 42
// "後"

これは foo の購読が完全に同期的であることを証明します。関数のように。

Observables は同期または非同期で値を渡すことができます。

では、Observable と関数の違いは何でしょうか?時間の経過とともに、Observables は「複数の値」を「返す」ことができますが、これは関数にはできません。

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('こんにちは');
	subscriber.next(42);
	subscriber.next(100);
	subscriber.next(200);
});

console.log('前');
foo.subscribe((x) => {
	console.log(x);
});
console.log('後');

// ログ:
// "前"
// "こんにちは"
// 42
// 100
// 200
// "後"

値を非同期で返すこともできます:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('こんにちは');
	subscriber.next(42);
	subscriber.next(100);
	subscriber.next(200);
	setTimeout(() => {
		subscriber.next(300); // 非同期出力
	}, 1000)
});

console.log('前');
foo.subscribe((x) => {
	console.log(x);
});
console.log('後');

// ログ:
// "前"
// "こんにちは"
// 42
// 100
// 200
// "後"
// 300

結論:

  • func.call() は「同期的に値をください」を意味します。
  • observable.subscribe() は「任意の数の値をください、同期的または非同期的に」を意味します。

Observable の剖析#

observable は new Observable または他の作成演算子を使用して作成され、オブザーバーが購読することによって、next/error/complete 通知をオブザーバーに渡すために実行され、実行は解放される可能性があります。これらの 4 つの側面はすべて Observable インスタンスにエンコードされていますが、その中のいくつかの側面は他のタイプに関連しています。たとえば、Observer と Subscription です。

コアの observable の関心事:

  • 作成 Observable の作成
  • 購読 Observable の購読
  • 実行 Observable の実行
  • 処理 Observable の処理

Observables の作成#

Observable コンストラクタは、1 つの引数を受け取ります:subscribe 関数。

以下の例は、1 秒ごとに購読者に文字列「hi」を送信する Observable を作成します。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
	setInterval(() => {
		subscriber.next('hi');
	}, 1000);
});

new Observable を使用して Observable を作成できます。最も一般的な方法は、of、from、interval などの作成関数を使用することです。

上記の例では、subscribe 関数は Observable を説明する最も重要な部分です。購読が何を意味するのか見てみましょう。

Observables の購読#

例の Observable は次のように購読できます:

observable.subscribe((x) => console.log(x));

new Observable(function subscribe(subscriber) {...})observable.subscribesubscribe は同名ですが、偶然ではありません。ライブラリ内では、それらは異なりますが、実際の目的のために、概念的には同じであると考えることができます。

これは、同じ Observable の複数のオブザーバー間で subscribe 呼び出しを共有しない方法を示しています。オブザーバーで observable.subscribe を呼び出すと、new Observable(function subscribe(subscriber) {...}) の関数 subscribe がその購読者のために実行されます。observable.subscribe を呼び出すたびに、特定の購読者のために独自の独立した設定がトリガーされます。

Observable を購読することは、データが送信される場所にコールバックを提供する関数を呼び出すことに似ています。

これは、addEventListener /removeEventListener などのイベントハンドラ API とは全く異なります。observable.subscribe を使用する場合、特定の Observer は Observable にリスナーとして登録されません。Observable は追加のオブザーバーリストを維持すらしません。

Observables の実行#

new Observable(function subscribe(subscriber) {...}) のコードは「Observable の実行」を表し、各購読のオブザーバーに対してのみ発生する遅延計算です。時間の経過とともに、実行は同期的または非同期的に複数の値を生成します。

Observable の実行は、3 種類の値を渡すことができます:

  • 「次」次の通知:数字、文字列、オブジェクトなどの値を送信します。
  • 「エラー」エラー通知:Javascript エラーまたは例外を送信します。
  • 「完了」完了通知:値を送信しません。

「次」は最も一般的で重要なタイプです:それらは購読者に渡される実際のデータを表します。「エラー」と「完了」通知は、Observable の実行中に一度だけ発生する可能性があり、そのうちの一つだけが存在できます。

これらの制約は、Observable 文法または契約でよく表現され、正規表現として書かれます:

next*(error|complete)?

Observable の実行中に、ゼロから無限の次の通知を渡すことができます。エラーまたは完了通知が渡された場合、その後は他の何も渡すことはできません。

以下は、3 つの次の通知を渡し、その後完了する Observable の実行の例です:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

Observable は Observable 契約を厳密に遵守しているため、以下のコードは次の通知 4 を渡しません:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // 通知は発信されません
});

subscribe 内のすべてのコードを try /catch ブロックでラップすることをお勧めします。例外をキャッチできた場合、エラー通知を送信します:

import { Observable } from 'rxjs';
 
const observable = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // エラーをキャッチした場合、エラーを渡します
  }
});

Observable の実行の処理#

Observable の実行は無限である可能性があり、オブザーバーが有限の時間内に実行を終了したいと考えることは非常に一般的です。そのため、実行をキャンセルするための API が必要です。各実行は独占的に 1 つのオブザーバーによってのみ行われるため、オブザーバーが値の受信を完了した場合、計算能力やメモリリソースの無駄を避けるために、実行を停止する方法が必要です。

observable.subscribe が呼び出されると、オブザーバーは新しく作成された Observable 実行に追加されます。この呼び出しは、オブジェクト Subscription を返します。

const subscription = observable.subscribe((x) => console.log(x));

購読は進行中の実行を表し、実行をキャンセルするための最小限の API を持っています。

import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// 後で
subscription.unsubscribe();

購読すると、進行中の Observable 実行を表す購読を取得します。unsubscribe () を呼び出すだけで実行をキャンセルできます。

create() を使用して Observable を作成する場合、各 Observable はその実行のリソースを処理する方法を定義する必要があります。function subscribe() からカスタムの unsubscribe 関数を返すことで、これを実現できます。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
	const intervalId = setInterval(() => {
		subscriber.next('hi');
	}, 1000);

	return function unsubscribe() {
		clearInterval(intervalId);
	}
});

observable.subscribenew Observable(function subscribe() {...}) に似ているように、subscribe から返される unsubscribe は概念的には subscription.unsubscribe と等しいです。実際、これらの概念を囲む ReactiveX タイプを削除すると、かなりシンプルな Javascript だけが残ります。

function subscribe(subscriber) {
	const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
 
  return function unsubscribe() {
    clearInterval(intervalId);
  };
}

const unsubscribe = subscribe({ next: (x) => console.log(x) });

// 後で
unsubscribe();

Observable、Observer、Subscription などの Rx タイプを使用する理由は、実際には安全性と Operators との組み合わせのためです。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。