banner
Tomorrow

Tomorrow

不骄不躁
twitter

RxJS —— Observable

Observables are a collection of lazy push of multiple values, filling in the missing points in the table below:

SingleMultiple
PullFunctionIterator
PushPromiseObservable

Here is an Observable that immediately (synchronously) pushes the values 1, 2, and 3 upon subscription, and then pushes the value 4 one second after the subscription call, and then completes:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
	subscriber.next(1);
	subscriber.next(2);
	subscriber.next(3);
	setTimeout(() => {
		subscriber.next(4);
		subscriber.complete();
	}, 1000)
});

To call the Observable and see these values, we need to subscribe to it:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
	subscriber.next(1);
	subscriber.next(2);
	subscriber.next(3);
	setTimeout(() => {
		subscriber.next(4);
		subscriber.complete();
	}, 1000);
});

console.log('just before subscribe');

observable.subscribe(() => {
	next(x) {
		console.log('got value ' + x);
	},
	error(err) {
		console.error('something wrong occurred: ' + err);
	},
	complete() {
		console.log('done');
	}
});

console.log('just after subscribe');

// Logs:
// just before subscribe
// got value 1
// got value 2
// got value 3
// just after subscribe
// got value 4
// done

Pull & Push#

Pull and Push are two different protocols that describe how data producers communicate with data consumers.

Pull: The consumer decides when to receive data from the data producer. The producer itself does not know when the data will be delivered to the consumer.

Every JavaScript function is a Pull system. The function is the data producer, and the code that calls the function uses it by pulling a single return value from its call.

ES2015 introduced generator functions and iterators (function*), which is another Pull system where the code that calls iterator.next() pulls multiple values from the iterator (producer).

ProducerConsumer
PullPassive: produces data on requestActive: decides when to request data
PushActive: generates data at its own pacePassive: reacts to received data.

Push: The producer decides when to push data to the consumer, and the consumer does not know when it will receive that data.

Promises are the most common type of push system in today's JavaScript. A Promise (producer) passes resolved values to registered callbacks (consumers), but unlike functions, the Promise is responsible for accurately determining when to "push" the value to the callback.

RxJS introduces Observables, a new JavaScript push system. Observables are producers of multiple values that "push" them to observers (consumers).

  • Functions are a form of lazy evaluation that synchronously returns a single value when called.
  • Generators are a form of deferred evaluation that synchronously returns zero to (possibly) infinite values during iteration.
  • Promises are computations that may (or may not) eventually return a single value.
  • Observables are a form of lazy evaluation that can synchronously or asynchronously return zero to (possibly) infinite values from the moment they are called.

Observables as Function Induction#

Contrary to popular belief, Observables are neither like EventEmitters nor like multi-valued Promises. Observables may behave like EventEmitters in certain cases, specifically when they are multicast using RxJS Subject, but generally, they do not behave like EventEmitters.

Observables are like functions without parameters, but they infer multiple values.

function foo() {
	console.log('Hello');
	return 42;
}

const x = foo.call();
console.log(x);
const y = foo.call();
console.log(y);

// Logs:
// "Hello"
// 42
// "Hello"
// 42

Using Observables:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
});

foo.subscribe((x) => {
	console.log(x);
})
foo.subscribe((y) => {
	console.log(y);
});

// Logs:
// "Hello"
// 42
// "Hello"
// 42

Because both functions and Observables are lazy computations, if you do not call the function, console.log('Hello') will not happen. Similarly for Observables, if you do not subscribe to it (call subscribe), console.log('Hello') will not happen. Furthermore, "calling" and "subscribing" are isolated operations: two function calls trigger two independent side effects, and two Observable subscriptions trigger two independent side effects. Unlike EventEmitters, which have shared side effects and eagerly execute regardless of whether subscribers exist, Observables do not share execution and are lazy.

Subscribing to an Observable is like calling a function

Some people claim that Observables are asynchronous. This is not true; if you log around a function call, like this:

console.log('before');
console.log(foo.call());
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// "after"

This behaves the same as Observables:

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// "after"

This proves that the subscription to foo is completely synchronous, just like a function.

Observables can pass values synchronously or asynchronously.

So what is the difference between an Observable and a function? Over time, Observables can "return" multiple values, which a function cannot do.

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
	subscriber.next(100);
	subscriber.next(200);
});

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"

Values can also be returned asynchronously:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
	console.log('Hello');
	subscriber.next(42);
	subscriber.next(100);
	subscriber.next(200);
	setTimeout(() => {
		subscriber.next(300); // Asynchronous output
	}, 1000)
});

console.log('before');
foo.subscribe((x) => {
	console.log(x);
});
console.log('after');

// Logs:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"
// 300

Conclusion:

  • func.call() means "synchronously give me a value"
  • observable.subscribe() means "give me any number of values, synchronously or asynchronously"

Observable Dissection#

An observable is created with new Observable or other creation operators, executed by subscribing to an observer to pass next/error/complete notifications, and their execution may be disposed of. These four aspects are all encoded in an Observable instance, but some of these aspects are related to other types, such as Observer and Subscription.

Core observable concerns:

  • Creating Creating Observable
  • Subscribing Subscribing to Observable
  • Executing Executing Observable
  • Disposing Disposing Observable

Creating Observables#

The Observable constructor takes one parameter: the subscribe function.

The following example creates an Observable that sends the string "hi" to subscribers every second.

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
	setInterval(() => {
		subscriber.next('hi');
	}, 1000);
});

You can create an Observable using new Observable. The most common way is to use creation functions like of, from, interval, etc.

In the above example, the subscribe function is the most important part of describing the Observable. Let's see what subscribing means.

Subscribing to Observables#

The Observable in the example can be subscribed to like this:

observable.subscribe((x) => console.log(x));

The observable.subscribe in new Observable(function subscribe(subscriber) {...}) and subscribe having the same name is not a coincidence. In the library, they are different, but for practical purposes, they can be considered conceptually the same.

This shows how the subscribe call is not shared among multiple observers of the same Observable. When an observer calls observable.subscribe, the function subscribe in new Observable(function subscribe(subscriber) {...}) runs for that subscriber. Each call to observable.subscribe triggers its own independent setup for the given subscriber.

Subscribing to an Observable is like calling a function, providing a callback where data will be sent.

This is in stark contrast to event handler APIs like addEventListener / removeEventListener. When using observable.subscribe, the given Observer is not registered as a listener in the Observable. The Observable does not even maintain an additional list of observers.

Executing Observables#

The code in new Observable(function subscribe(subscriber) {...}) represents an "Observable execution," a lazy computation that occurs only for each subscribed Observer. Over time, the execution can synchronously or asynchronously produce multiple values.

Observable execution can pass three types of values:

  • "Next" notification: sends a value, such as a number, string, object, etc.
  • "Error" notification: sends a JavaScript error or exception.
  • "Complete" notification: does not send a value.

"Next" is the most common and important type: they represent the actual data passed to subscribers. "Error" and "Complete" notifications may only occur once during Observable execution, and only one of them can occur.

These constraints are well expressed in the Observable Grammar or Contract, written as a regular expression:

next*(error|complete)?

In Observable execution, zero to infinite Next notifications can be passed. If an error or complete notification is passed, no further notifications can be sent afterward.

Here is an example of Observable execution that passes three Next notifications and then completes:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

Observables strictly adhere to the Observable Contract, so the following code will not pass Next notification 4:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // Will not emit notification
});

It is best to wrap all code in subscribe with a try/catch block, so if it catches an exception, it will send an error notification:

import { Observable } from 'rxjs';
 
const observable = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // Pass an error if caught
  }
});

Handling Observable Execution#

Since the execution of an Observable can be infinite, and it is common for observers to want to terminate execution within a finite time, we need an API to cancel execution. Since each execution is exclusively owned by one Observer, once the Observer has finished receiving values, it must have a way to stop execution to avoid wasting computational power or memory resources.

When observable.subscribe is called, the Observer attaches to the newly created Observable execution. This call returns an object Subscription.

const subscription = observable.subscribe((x) => console.log(x));

The subscription represents the ongoing execution and has a minimal API that allows you to cancel execution.

import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later
subscription.unsubscribe();

When you subscribe, you get a subscription that represents the ongoing Observable execution. Just call unsubscribe() to cancel execution.

When we create an Observable using create(), each Observable must define how to handle the resources of that execution. You can do this by returning a custom unsubscribe function from function subscribe().

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
	const intervalId = setInterval(() => {
		subscriber.next('hi');
	}, 1000);

	return function unsubscribe() {
		clearInterval(intervalId);
	}
});

Just as observable.subscribe is similar to new Observable(function subscribe() {...}), the unsubscribe we return from subscribe is conceptually equivalent to subscription.unsubscribe. In fact, if we remove the ReactiveX types surrounding these concepts, we are left with quite simple JavaScript.

function subscribe(subscriber) {
	const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
 
  return function unsubscribe() {
    clearInterval(intervalId);
  };
}

const unsubscribe = subscribe({ next: (x) => console.log(x) });

// Later
unsubscribe();

The reason we use these Rx types like Observable, Observer, and Subscription is actually for safety and composability with Operators.

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.