banner
Tomorrow

Tomorrow

不骄不躁
twitter

RxJS —— Observable 可觀察對象

Observables 是多個值的惰性推送的集合,他們填補了下表中的缺失點:

SingleMultiple
PullFunctionIterator
PushPromiseObservable

下面是一個 Observable,它在訂閱時立即(同步)推送值 1、2、3,並在訂閱調用後一秒後推送值 4,然後完成:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
	subscriber.next(1);
	subscriber.next(2);
	subscriber.next(3);
	setTimeout(() => {
		subscriber.next(4);
		subscriber.complete();
	}, 1000)
});

要調用 Observable 並查看這些值,我們需要訂閱它:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
	subscriber.next(1);
	subscriber.next(2);
	subscriber.next(3);
	setTimeout(() => {
		subscriber.next(4);
		subscriber.complete();
	}, 1000);
});

console.log('just before subscribe');

observable.subscribe(() => {
	next(x) {
		console.log('got value ' + x);
	},
	error(err) {
		console.error('something wrong occurred: ' + err);
	},
	complete() {
		console.log('done');
	}
});

console.log('just after subscribe');

// Logs:
// just before subscribe
// got value 1
// got value 2
// got value 3
// just after subscribe
// got value 4
// done

Pull & Push#

Pull 和 Push 是兩種不同的協議,描述了數據生產者如何與數據消費者進行通信。

Pull: 消費者決定何時從數據生產者那裡接收數據。Producer 本身並不知道數據何時會交付給 Consumer。

每個 Javascript 函數都是一個 Pull 系統。該函數就是數據的生產者,調用該函數的代碼通過從其調用中 pull 單個返回值來使用它。

ES2015 引入了生成函數和迭代器 (function*),這是另一種 Pull 系統,調用 iterator.next() 的代碼消費者,從迭代器(生產者)中 pull 出多個值。

ProducerConsumer
Pull被動:在請求時產生數據主動:決定何時請求數據
Push主動:按照自己的節奏生成數據被動:對接收到的數據做出反應。

Push: 生產者決定何時向消費者推送數據,消費者不知道何時會收到該數據。

Promises 是當今 Javascript 中最常見的推送系統類型。 Promise (生產者) 向已經註冊的回調(消費者)傳遞已解析的值,但與函數不同的是,Promise 負責準確確定何時將值 “推送” 到回調。

RxJS 引入了 Observable,一種新的 Javascript 推送系統。Observable 是多個值的生產者,將它們 “推送” 給觀察者(消費者)。

  • 函數是一種惰性求值計算,它在調用時同步返回單個值。
  • 生成器是一種延遲評估的計算,它將迭代時同步返回零到(可能)無限值。
  • Promise 是一種可能(或可能不會)最終返回單個值的計算。
  • Observable 是一種惰性求值計算,從它被調用開始,它可以同步或異步返回零到(可能)無限值。

Observables 作為函數的歸納#

與流行的說法相反,Observables 既不像 EventEmitter,也不像 Promises 的多值。Observables 在某些情況下可能表現得像 EventEmitters,即當它們使用 RxJS Subject 進行多播時,但通常它們表現得不像 EventEmitters。

Observables 就像沒有參數的函數,但推論出多個值。

function foo() {
	console.log('Hello');
	return 42;
}

const x = foo.call();
console.log(x);
const y = foo.call();
console.log(y);

// Logs:
// "Hello"
// 42
// "Hello"
// 42

使用 Observables:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
});

foo.subscribe((x) => {
	console.log(x);
})
foo.subscribe((y) => {
	console.log(y);
});

// Logs:
// "Hello"
// 42
// "Hello"
// 42

因為函數和 Observables 都是惰性計算,如果你不調用函數,console.log('Hello')是不會發生的。同樣對於 Observables,如果你不訂閱它 (調用 subscribe),console.log('Hello')就不會發生。另外,“調用” 和 “訂閱” 是一個孤立的操作:兩個函數調用觸發兩個獨立的副作用,兩個 Observable 訂閱觸發兩個獨立的副作用。與 EventEmitters 不同,EventEmitters 具有共同的副作用並且不管訂閱者是否存在都急於執行,Observables 沒有共享執行且是惰性的。

訂閱 Observable 類似於調用函數

有些人聲稱 Observables 是異步的。那不是真的,如果你用日誌包圍一個函數調用,就像這樣:

console.log('before');
console.log(foo.call());
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// "after"

這與 Observables 行為相同:

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// "after"

這證明 foo 的訂閱完全是同步的,就像一個函數一樣。

Observables 能夠同步或異步傳遞值。

那 Observable 和函數有什麼不同呢?隨著時間的推移,Observables 可以 “返回” 多個值,這個是函數做不到的。

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
	subscriber.next(100);
	subscriber.next(200);
});

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"

也可以異步返回值:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
	subscriber.next(100);
	subscriber.next(200);
	setTimeout(() => {
		subscriber.next(300); // 異步輸出
	}, 1000)
});

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"
// 300

結論:

  • func.call() 表示 “同步給我一個值”
  • observable.subscribe() 表示 “給我任意數量的值,同步或者異步”

Observable 剖析#

observable 是用 new Observable 或其他創建運算符創建的,通過觀察者訂閱,執行以向觀察者傳遞 next/error/complete 通知,並且它們的執行可能會被釋放。這四個方面都編碼在一個 Observable 實例中,但其中某一些方面與其他類型相關,例如 Observer 和 Subscription。

核心 observable 關注點:

  • Creating 創建 Observable
  • Subscribing 訂閱 Observable
  • Executing 執行 Observable
  • Disposing 處理 Observable

創建 Observables#

Observable 構造函數接受一個參數:subscribe 函數。

下面的示例創建了一個 Observable 以每秒向訂閱者發送字符串 “hi”。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
	setInterval(() => {
		subscriber.next('hi');
	}, 1000);
});

可以使用 new Observable 創建 Observable。最常見的還可以使用創建函數來創建,例如 of、from、interval 等。

在上面的例子中,subscribe 函數是描述 Observable 最重要的部分。讓我們看看訂閱意味著什麼。

訂閱 Observables#

示例中的 Observable 可以被這樣訂閱:

observable.subscribe((x) => console.log(x));

new Observable(function subscribe(subscriber) {...}) 中的 observable.subscribesubscribe 同名並非巧合。在庫中,它們是不同的,但出於實際目的,可以認為它們在概念上是相同的。

這顯示了如何不在同一 Observable 的多個觀察者之間共享 subscribe 調用。當用觀察者調用 observable.subscribe 時,new Observable(function subscribe(subscriber) {...}) 中的函數 subscribe 會為該訂閱者運行。每次調用 observable.subscribe 都會為給定的訂閱者觸發其自己的獨立設置。

訂閱一個 Observable 就像調用一個函數,在數據將被傳送到的地方提供回調。

這與 addEventListener /removeEventListener 等處理事件程序 API 截然不同。使用 observable.subscribe 時,給定的 Observer 未在 Observable 中註冊為監聽器。Observable 甚至不維護附加的觀察者列表。

執行 Observables#

new Observable(function subscribe(subscriber) {...}) 中的代碼代表一個”Observable 執行 “,一個只對每個訂閱的 Observer 發生的惰性計算。隨著時間的推移,執行會同步或異步地產生多個值。

Observable 執行可以傳遞三種類型的值:

  • “Next” 下個通知:發送一個值,如數字、字符串、對象等。
  • “Error” 錯誤通知:發送 Javascript 錯誤或異常。
  • “Complete” 完成通知:不發送值。

“Next” 是最常見和最重要的類型:它們代表傳遞給訂閱者的實際數據。“Error” 和 “Complete” 通知在 Observable 執行期間可能只發生一次,並且只能有其中一種。

這些約束在 Observable Grammar 或 Contract 中表達得很好,寫成正則表達式:

next*(error|complete)?

在 Observable 執行中,可以傳遞零到無限的 Next 通知。如果傳遞了錯誤或完成通知,則之後無法傳遞任何其他內容。

以下是 Observable 執行的示例,它傳遞三個 Next 通知,然後完成:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

Observable 嚴格遵守 Observable Contract,因此以下代碼不會傳遞 Next 通知 4:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // 不會發出通知
});

最好用 try /catch 塊將 subscribe 中的所有代碼包裝起來,如果它能捕獲到異常,它將發送錯誤通知:

import { Observable } from 'rxjs';
 
const observable = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // 如果捕獲到一個錯誤,則傳遞一個錯誤
  }
});

處理 Observable 的執行#

因為 Observable 的執行可能是無限的,並且觀察者想要在有限時間內終止執行是很常見的,所以我們需要一個 API 來取消執行。由於每次執行僅由一個 Observer 獨占,一旦 Observer 完成接收值,它必須有一種方法來停止執行,以避免浪費計算能力或內存資源。

observable.subscribe 被調用時,Observer 會附加到新創建的 Observable 執行中。此調用會返回一個對象 Subscription。

const subscription = observable.subscribe((x) => console.log(x));

訂閱代表正在進行的執行,並且有一個最小的 API,允許你取消執行。

import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later
subscription.unsubscribe();

當你訂閱時,你會得到一個 subscription,它代表正在進行的 Observable 執行。只需調用 unsubscribe () 即可取消執行。

當我們使用 create() 創建 Observable 時,每個 Observable 都必須定義如何處理該執行的資源。您可以通過從 function subscribe() 中返回一個自定義的 unsubscribe 函數來做到這一點。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
	const intervalId = setInterval(() => {
		subscriber.next('hi');
	}, 1000);

	return function unsubscribe() {
		clearInterval(intervalId);
	}
});

就像 observable.subscribe 類似於 new Observable(function subscribe() {...}) 一樣,我們從 subscribe 返回的 unsubscribe 在概念上等於 subscription.unsubscribe。事實上,如果我們刪除圍繞這些概念的 ReactiveX 類型,我們就只剩下相當簡單的 Javascript。

function subscribe(subscriber) {
	const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
 
  return function unsubscribe() {
    clearInterval(intervalId);
  };
}

const unsubscribe = subscribe({ next: (x) => console.log(x) });

// Later
unsubscribe();

我們使用 Observable、Observer 和 Subscription 等這些 Rx 類型的原因其實就是為了獲得安全性以及與 Operators 的組合性。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。