Observables 是多個值的惰性推送的集合,他們填補了下表中的缺失點:
Single | Multiple | |
---|---|---|
Pull | Function | Iterator |
Push | Promise | Observable |
下面是一個 Observable,它在訂閱時立即(同步)推送值 1、2、3,並在訂閱調用後一秒後推送值 4,然後完成:
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000)
});
要調用 Observable 並查看這些值,我們需要訂閱它:
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
console.log('just before subscribe');
observable.subscribe(() => {
next(x) {
console.log('got value ' + x);
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
});
console.log('just after subscribe');
// Logs:
// just before subscribe
// got value 1
// got value 2
// got value 3
// just after subscribe
// got value 4
// done
Pull & Push#
Pull 和 Push 是兩種不同的協議,描述了數據生產者如何與數據消費者進行通信。
Pull: 消費者決定何時從數據生產者那裡接收數據。Producer 本身並不知道數據何時會交付給 Consumer。
每個 Javascript 函數都是一個 Pull 系統。該函數就是數據的生產者,調用該函數的代碼通過從其調用中 pull 單個返回值來使用它。
ES2015 引入了生成函數和迭代器 (function*),這是另一種 Pull 系統,調用 iterator.next()
的代碼消費者,從迭代器(生產者)中 pull 出多個值。
Producer | Consumer | |
---|---|---|
Pull | 被動:在請求時產生數據 | 主動:決定何時請求數據 |
Push | 主動:按照自己的節奏生成數據 | 被動:對接收到的數據做出反應。 |
Push: 生產者決定何時向消費者推送數據,消費者不知道何時會收到該數據。
Promises 是當今 Javascript 中最常見的推送系統類型。 Promise (生產者) 向已經註冊的回調(消費者)傳遞已解析的值,但與函數不同的是,Promise 負責準確確定何時將值 “推送” 到回調。
RxJS 引入了 Observable,一種新的 Javascript 推送系統。Observable 是多個值的生產者,將它們 “推送” 給觀察者(消費者)。
- 函數是一種惰性求值計算,它在調用時同步返回單個值。
- 生成器是一種延遲評估的計算,它將迭代時同步返回零到(可能)無限值。
- Promise 是一種可能(或可能不會)最終返回單個值的計算。
- Observable 是一種惰性求值計算,從它被調用開始,它可以同步或異步返回零到(可能)無限值。
Observables 作為函數的歸納#
與流行的說法相反,Observables 既不像 EventEmitter,也不像 Promises 的多值。Observables 在某些情況下可能表現得像 EventEmitters,即當它們使用 RxJS Subject 進行多播時,但通常它們表現得不像 EventEmitters。
Observables 就像沒有參數的函數,但推論出多個值。
function foo() {
console.log('Hello');
return 42;
}
const x = foo.call();
console.log(x);
const y = foo.call();
console.log(y);
// Logs:
// "Hello"
// 42
// "Hello"
// 42
使用 Observables:
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
});
foo.subscribe((x) => {
console.log(x);
})
foo.subscribe((y) => {
console.log(y);
});
// Logs:
// "Hello"
// 42
// "Hello"
// 42
因為函數和 Observables 都是惰性計算,如果你不調用函數,console.log('Hello')
是不會發生的。同樣對於 Observables,如果你不訂閱它 (調用 subscribe),console.log('Hello')
就不會發生。另外,“調用” 和 “訂閱” 是一個孤立的操作:兩個函數調用觸發兩個獨立的副作用,兩個 Observable 訂閱觸發兩個獨立的副作用。與 EventEmitters 不同,EventEmitters 具有共同的副作用並且不管訂閱者是否存在都急於執行,Observables 沒有共享執行且是惰性的。
訂閱 Observable 類似於調用函數
有些人聲稱 Observables 是異步的。那不是真的,如果你用日誌包圍一個函數調用,就像這樣:
console.log('before');
console.log(foo.call());
console.log('after');
// Logs:
// "before"
// "Hello"
// 42
// "after"
這與 Observables 行為相同:
console.log('before');
foo.subscribe((x) => {
console.log(x);
});
console.log('after');
// Logs:
// "before"
// "Hello"
// 42
// "after"
這證明 foo 的訂閱完全是同步的,就像一個函數一樣。
Observables 能夠同步或異步傳遞值。
那 Observable 和函數有什麼不同呢?隨著時間的推移,Observables 可以 “返回” 多個值,這個是函數做不到的。
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
});
console.log('before');
foo.subscribe((x) => {
console.log(x);
});
console.log('after');
// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"
也可以異步返回值:
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
setTimeout(() => {
subscriber.next(300); // 異步輸出
}, 1000)
});
console.log('before');
foo.subscribe((x) => {
console.log(x);
});
console.log('after');
// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"
// 300
結論:
func.call()
表示 “同步給我一個值”observable.subscribe()
表示 “給我任意數量的值,同步或者異步”
Observable 剖析#
observable 是用 new Observable
或其他創建運算符創建的,通過觀察者訂閱,執行以向觀察者傳遞 next/error/complete 通知,並且它們的執行可能會被釋放。這四個方面都編碼在一個 Observable 實例中,但其中某一些方面與其他類型相關,例如 Observer 和 Subscription。
核心 observable 關注點:
- Creating 創建 Observable
- Subscribing 訂閱 Observable
- Executing 執行 Observable
- Disposing 處理 Observable
創建 Observables#
Observable 構造函數接受一個參數:subscribe 函數。
下面的示例創建了一個 Observable 以每秒向訂閱者發送字符串 “hi”。
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
setInterval(() => {
subscriber.next('hi');
}, 1000);
});
可以使用 new Observable 創建 Observable。最常見的還可以使用創建函數來創建,例如 of、from、interval 等。
在上面的例子中,subscribe 函數是描述 Observable 最重要的部分。讓我們看看訂閱意味著什麼。
訂閱 Observables#
示例中的 Observable 可以被這樣訂閱:
observable.subscribe((x) => console.log(x));
new Observable(function subscribe(subscriber) {...})
中的 observable.subscribe
和 subscribe
同名並非巧合。在庫中,它們是不同的,但出於實際目的,可以認為它們在概念上是相同的。
這顯示了如何不在同一 Observable 的多個觀察者之間共享 subscribe 調用。當用觀察者調用 observable.subscribe
時,new Observable(function subscribe(subscriber) {...})
中的函數 subscribe 會為該訂閱者運行。每次調用 observable.subscribe
都會為給定的訂閱者觸發其自己的獨立設置。
訂閱一個 Observable 就像調用一個函數,在數據將被傳送到的地方提供回調。
這與 addEventListener /removeEventListener 等處理事件程序 API 截然不同。使用 observable.subscribe
時,給定的 Observer 未在 Observable 中註冊為監聽器。Observable 甚至不維護附加的觀察者列表。
執行 Observables#
new Observable(function subscribe(subscriber) {...})
中的代碼代表一個”Observable 執行 “,一個只對每個訂閱的 Observer 發生的惰性計算。隨著時間的推移,執行會同步或異步地產生多個值。
Observable 執行可以傳遞三種類型的值:
- “Next” 下個通知:發送一個值,如數字、字符串、對象等。
- “Error” 錯誤通知:發送 Javascript 錯誤或異常。
- “Complete” 完成通知:不發送值。
“Next” 是最常見和最重要的類型:它們代表傳遞給訂閱者的實際數據。“Error” 和 “Complete” 通知在 Observable 執行期間可能只發生一次,並且只能有其中一種。
這些約束在 Observable Grammar 或 Contract 中表達得很好,寫成正則表達式:
next*(error|complete)?
在 Observable 執行中,可以傳遞零到無限的 Next 通知。如果傳遞了錯誤或完成通知,則之後無法傳遞任何其他內容。
以下是 Observable 執行的示例,它傳遞三個 Next 通知,然後完成:
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
Observable 嚴格遵守 Observable Contract,因此以下代碼不會傳遞 Next 通知 4:
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // 不會發出通知
});
最好用 try /catch 塊將 subscribe 中的所有代碼包裝起來,如果它能捕獲到異常,它將發送錯誤通知:
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
} catch (err) {
subscriber.error(err); // 如果捕獲到一個錯誤,則傳遞一個錯誤
}
});
處理 Observable 的執行#
因為 Observable 的執行可能是無限的,並且觀察者想要在有限時間內終止執行是很常見的,所以我們需要一個 API 來取消執行。由於每次執行僅由一個 Observer 獨占,一旦 Observer 完成接收值,它必須有一種方法來停止執行,以避免浪費計算能力或內存資源。
當 observable.subscribe
被調用時,Observer 會附加到新創建的 Observable 執行中。此調用會返回一個對象 Subscription。
const subscription = observable.subscribe((x) => console.log(x));
訂閱代表正在進行的執行,並且有一個最小的 API,允許你取消執行。
import { from } from 'rxjs';
const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later
subscription.unsubscribe();
當你訂閱時,你會得到一個 subscription,它代表正在進行的 Observable 執行。只需調用 unsubscribe () 即可取消執行。
當我們使用 create()
創建 Observable 時,每個 Observable 都必須定義如何處理該執行的資源。您可以通過從 function subscribe()
中返回一個自定義的 unsubscribe
函數來做到這一點。
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const intervalId = setInterval(() => {
subscriber.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(intervalId);
}
});
就像 observable.subscribe
類似於 new Observable(function subscribe() {...})
一樣,我們從 subscribe 返回的 unsubscribe 在概念上等於 subscription.unsubscribe
。事實上,如果我們刪除圍繞這些概念的 ReactiveX 類型,我們就只剩下相當簡單的 Javascript。
function subscribe(subscriber) {
const intervalId = setInterval(() => {
subscriber.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(intervalId);
};
}
const unsubscribe = subscribe({ next: (x) => console.log(x) });
// Later
unsubscribe();
我們使用 Observable、Observer 和 Subscription 等這些 Rx 類型的原因其實就是為了獲得安全性以及與 Operators 的組合性。