Ein Leitfaden für Anfänger zu RxJS & Redux Observable

Redux-Observable ist eine RxJS-basierte Middleware für Redux, mit der Entwickler mit asynchronen Aktionen arbeiten können. Es ist eine Alternative zu Redux-Thunk und Redux-Saga.

Dieser Artikel behandelt die Grundlagen von RxJS, das Einrichten von Redux-Observables und einige seiner praktischen Anwendungsfälle. Aber vorher müssen wir das Beobachtermuster verstehen .

Beobachtermuster

Im Observer-Muster verwaltet ein Objekt mit der Bezeichnung "Observable" oder "Subject" eine Sammlung von Abonnenten mit der Bezeichnung "Observers". Wenn sich der Zustand der Probanden ändert, werden alle Beobachter benachrichtigt.

In JavaScript sind Ereignisemitter und Ereignishandler das einfachste Beispiel.

Wenn Sie dies tun .addEventListener, schieben Sie einen Beobachter in die Beobachter-Sammlung des Subjekts. Wann immer das Ereignis eintritt, benachrichtigt das Subjekt alle Beobachter.

RxJS

Laut der offiziellen Website,

RxJS ist eine JavaScript-Implementierung von ReactiveX, einer Bibliothek zum Erstellen asynchroner und ereignisbasierter Programme unter Verwendung beobachtbarer Sequenzen.

In einfachen Worten ist RxJS eine Implementierung des Observer-Musters. Es erweitert auch das Observer-Muster, indem es Operatoren bereitstellt, mit denen wir Observables und Subjects deklarativ zusammenstellen können.

Beobachter, Observable, Operatoren und Subjekte sind die Bausteine ​​von RxJS. Schauen wir uns nun jeden einzelnen genauer an.

Beobachter

Beobachter sind Objekte, die Observablen und Subjekte abonnieren können. Nach dem Abonnieren können sie Benachrichtigungen von drei Typen erhalten - next, error und complete.

Jedes Objekt mit der folgenden Struktur kann als Beobachter verwendet werden.

interface Observer { closed?: boolean; next: (value: T) => void; error: (err: any) => void; complete: () => void; }

Wenn die beobachtbare Schübe nächsten, Fehler- und vollständige Meldungen, die Observer .next, .errorund .completeMethoden aufgerufen werden.

Observables

Observables sind Objekte, die über einen bestimmten Zeitraum Daten ausgeben können. Es kann mit dem "Marmordiagramm" dargestellt werden.

Wenn die horizontale Linie die Zeit darstellt, stellen die kreisförmigen Knoten die vom Observable ausgegebenen Daten dar, und die vertikale Linie zeigt an, dass das Observable erfolgreich abgeschlossen wurde.

Bei Observables kann ein Fehler auftreten. Das Kreuz repräsentiert den vom Observable ausgegebenen Fehler.

Die Zustände "abgeschlossen" und "Fehler" sind endgültig. Das bedeutet, dass Observables nach erfolgreichem Abschluss oder Auftreten eines Fehlers keine Daten mehr ausgeben können.

Observable erstellen

Observables werden mit dem new ObservableKonstruktor erstellt, der ein Argument akzeptiert - die Subscribe-Funktion. Observables können auch mit einigen Operatoren erstellt werden, aber darüber werden wir später sprechen, wenn wir über Operatoren sprechen.

import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { // Subscribe function });

Abonnieren eines Observable

Observables können mit ihrer .subscribeMethode abonniert werden und einen Observer übergeben.

observable.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log('completed'); });

Ausführung eines Observable

Die Subscribe-Funktion, die wir an den new ObservableKonstruktor übergeben haben, wird jedes Mal ausgeführt, wenn das Observable abonniert wird.

Die Abonnementfunktion akzeptiert ein Argument - den Abonnenten. Der Abonnent ähnelt die Struktur eines Beobachters, und es hat die gleichen 3 - Methoden: .next, .error, und .complete.

Observables können mithilfe der .nextMethode Daten an den Observer senden. Wenn das Observable erfolgreich abgeschlossen wurde, kann es den Observer mithilfe der .completeMethode benachrichtigen . Wenn das Observable auf einen Fehler gestoßen ist, kann es den Fehler mithilfe der .errorMethode an den Observer weiterleiten.

// Create an Observable const observable = new Observable(subscriber => { subscriber.next('first data'); subscriber.next('second data'); setTimeout(() => { subscriber.next('after 1 second - last data'); subscriber.complete(); subscriber.next('data after completion'); //  console.log(x), error: (x) => console.log(x), complete: () => console.log('completed') }); // Outputs: // // first data // second data // third data // after 1 second - last data // completed

Observables sind Unicast

Observables sind Unicast , was bedeutet, dass Observables höchstens einen Teilnehmer haben können. Wenn ein Observer ein Observable abonniert, erhält er eine Kopie des Observable mit einem eigenen Ausführungspfad, wodurch das Observables-Unicast erstellt wird.

Es ist wie ein YouTube-Video. Alle Zuschauer sehen denselben Videoinhalt, können jedoch verschiedene Segmente des Videos ansehen.

Beispiel : Lassen Sie uns ein Observable erstellen, das innerhalb von 10 Sekunden 1 bis 10 ausgibt. Abonnieren Sie dann das Observable einmal sofort und nach 5 Sekunden erneut.

// Create an Observable that emits data every second for 10 seconds const observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000); }); // Subscribe to the Observable observable.subscribe({ next: value => { console.log(`Observer 1: ${value}`); } }); // After 5 seconds subscribe again setTimeout(() => { observable.subscribe({ next: value => { console.log(`Observer 2: ${value}`); } }); }, 5000); /* Output Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 1: 4 Observer 1: 5 Observer 2: 1 Observer 1: 6 Observer 2: 2 Observer 1: 7 Observer 2: 3 Observer 1: 8 Observer 2: 4 Observer 1: 9 Observer 2: 5 Observer 1: 10 Observer 2: 6 Observer 2: 7 Observer 2: 8 Observer 2: 9 Observer 2: 10 */

In der Ausgabe können Sie feststellen, dass der zweite Observer von 1 aus mit dem Drucken begonnen hat, obwohl er nach 5 Sekunden abonniert hat. Dies liegt daran, dass der zweite Beobachter eine Kopie des Observable erhalten hat, dessen Abonnementfunktion erneut aufgerufen wurde. Dies zeigt das Unicast-Verhalten von Observables.

Themen

Ein Subjekt ist eine spezielle Art von Observable.

Erstellen eines Betreffs

Mit dem new SubjectKonstruktor wird ein Betreff erstellt .

import { Subject } from 'rxjs'; // Create a subject const subject = new Subject();

Abonnieren eines Betreffs

Das Abonnieren eines Betreffs ähnelt dem Abonnieren eines Observable: Sie verwenden die .subscribeMethode und übergeben einen Observer.

subject.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log("done") });

Ausführung eines Subjekts

Im Gegensatz zu Observable ruft ein Thema seine eigenen .next, .errorund .completeMethoden , um Daten zu Beobachtern zu schieben.

// Push data to all observers subject.next('first data'); // Push error to all observers subject.error('oops something went wrong'); // Complete subject.complete('done');

Themen sind Multicast

Subjekte sind Multicast: Mehrere Beobachter teilen sich das gleiche Subjekt und seinen Ausführungspfad. Dies bedeutet, dass alle Benachrichtigungen an alle Beobachter gesendet werden. Es ist wie ein Live-Programm. Alle Zuschauer sehen gleichzeitig dasselbe Segment desselben Inhalts.

Beispiel: Lassen Sie uns einen Betreff erstellen, der innerhalb von 10 Sekunden 1 bis 10 ausgibt. Abonnieren Sie dann das Observable einmal sofort und nach 5 Sekunden erneut.

// Create a subject const subject = new Subject(); let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000); // Subscribe to the subjects subject.subscribe(data => { console.log(`Observer 1: ${data}`); }); // After 5 seconds subscribe again setTimeout(() => { subject.subscribe(data => { console.log(`Observer 2: ${data}`); }); }, 5000); /* OUTPUT Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 1: 4 Observer 1: 5 Observer 2: 5 Observer 1: 6 Observer 2: 6 Observer 1: 7 Observer 2: 7 Observer 1: 8 Observer 2: 8 Observer 1: 9 Observer 2: 9 Observer 1: 10 Observer 2: 10 */ 

In the output, you can notice that the second Observer started printing from 5 instead of starting from 1. This happens because the second Observer is sharing the same Subject. Since it subscribed after 5 seconds, the Subject has already finished emitting 1 to 4. This illustrates the multicast behavior of a Subject.

Subjects are both Observable and Observer

Subjects have the .next, .error and .complete methods. That means that they follow the structure of Observers. Hence, a Subject can also be used as an Observer and passed to the .subscribe function of Observables or other Subjects.

Example: let us create an Observable and a Subject. Then subscribe to the Observable using the Subject as an Observer. Finally, subscribe to the Subject. All the values emitted by the Observable will be pushed to the Subject, and the Subject will broadcast the received values to all its Observers.

// Create an Observable that emits data every second const observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 5) { clearInterval(interval); } }, 1000); }); // Create a subject const subject = new Subject(); // Use the Subject as Observer and subscribe to the Observable observable.subscribe(subject); // Subscribe to the subject subject.subscribe({ next: value => console.log(value) }); /* Output 1 2 3 4 5 */

Operators

Operators are what make RxJS useful. Operators are pure functions that return a new Observable. They can be categorized into 2 main categories:

  1. Creation Operators
  2. Pipeable Operators

Creation Operators

Creation Operators are functions that can create a new Observable.

Example: we can create an Observable that emits each element of an array using the from operator.

const observable = from([2, 30, 5, 22, 60, 1]); observable.subscribe({ next: (value) => console.log("Received", value), error: (err) => console.log(err), complete: () => console.log("done") }); /* OUTPUTS Received 2 Received 30 Received 5 Received 22 Received 60 Received 1 done */

The same can be an Observable using the marble diagram.

Pipeable Operators

Pipeable Operators sind Funktionen, die ein Observable als Eingabe verwenden und ein neues Observable mit geändertem Verhalten zurückgeben.

Beispiel: Nehmen wir das Observable, das wir mit dem fromOperator erstellt haben. Mit diesem Observable können wir nun ein neues Observable erstellen, das mit dem filterOperator nur Zahlen größer als 10 ausgibt.

const greaterThanTen = observable.pipe(filter(x => x > 10)); greaterThanTen.subscribe(console.log, console.log, () => console.log("completed")); // OUTPUT // 11 // 12 // 13 // 14 // 15

Das gleiche kann mit dem Marmordiagramm dargestellt werden.

Es gibt viele weitere nützliche Operatoren. Die vollständige Liste der Operatoren sowie Beispiele in der offiziellen RxJS-Dokumentation finden Sie hier.

Es ist wichtig, alle häufig verwendeten Operatoren zu verstehen. Hier sind einige Operatoren, die ich oft benutze:

  1. mergeMap
  2. switchMap
  3. exhaustMap
  4. map
  5. catchError
  6. startWith
  7. delay
  8. debounce
  9. throttle
  10. interval
  11. from
  12. of

Redux Observables

Laut der offiziellen Website,

RxJS-basierte Middleware für Redux. Erstellen und brechen Sie asynchrone Aktionen ab, um Nebenwirkungen und mehr zu erzielen.

In Redux werden bei jedem Auslösen einer Aktion alle Reduzierungsfunktionen ausgeführt und ein neuer Status zurückgegeben.

Redux-Observable übernimmt alle diese ausgelösten Aktionen und neuen Zustände und erstellt daraus zwei Observable - Actions Observable action$und States Observable state$.

Die beobachtbaren Aktionen geben alle Aktionen aus, die mit dem ausgelöst werden store.dispatch(). Beobachtbare Zustände geben alle neuen Zustandsobjekte aus, die vom Root-Reduzierer zurückgegeben werden.

Epen

Laut der offiziellen Website,

Es ist eine Funktion, die einen Strom von Aktionen ausführt und einen Strom von Aktionen zurückgibt. Aktionen rein, Aktionen raus.

Epics are functions that can be used to subscribe to Actions and States Observables. Once subscribed, epics will receive the stream of actions and states as input, and it must return a stream of actions as an output. Actions In - Actions Out.

const someEpic = (action$, state$) => { return action$.pipe( // subscribe to actions observable map(action => { // Receive every action, Actions In return someOtherAction(); // return an action, Actions Out }) ) }

It is important to understand that all the actions received in the Epic have already finished running through the reducers.

Inside an Epic, we can use any RxJS observable patterns, and this is what makes redux-observables useful.

Example: we can use the .filter operator to create a new intermediate observable. Similarly, we can create any number of intermediate observables, but the final output of the final observable must be an action, otherwise an exception will be raised by redux-observable.

const sampleEpic = (action$, state$) => { return action$.pipe( filter(action => action.payload.age >= 18), // can create intermediate observables and streams map(value => above18(value)) // where above18 is an action creator ); }

Every action emitted by the Epics are immediately dispatched using the store.dispatch().

Setup

First, let's install the dependencies.

npm install --save rxjs redux-observable

Create a separate folder named epics to keep all the epics. Create a new file index.js inside the epics folder and combine all the epics using the combineEpics function to create the root epic. Then export the root epic.

import { combineEpics } from 'redux-observable'; import { epic1 } from './epic1'; import { epic2 } from './epic2'; const epic1 = (action$, state$) => { ... } const epic2 = (action$, state$) => { ... } export default combineEpics(epic1, epic2);

Create an epic middleware using the createEpicMiddleware function and pass it to the createStore Redux function.

import { createEpicMiddleware } from 'redux-observable'; import { createStore, applyMiddleware } from 'redux'; import rootEpic from './rootEpics'; const epicMiddleware = createEpicMiddlware(); const store = createStore( rootReducer, applyMiddleware(epicMiddlware) );

Finally, pass the root epic to epic middleware's .run method.

epicMiddleware.run(rootEpic);

Some Practical Usecases

RxJS has a big learning curve, and the redux-observable setup worsens the already painful Redux setup process. All that makes Redux observable look like an overkill. But here are some practical use cases that can change your mind.

Throughout this section, I will be comparing redux-observables with redux-thunk to show how redux-observables can be helpful in complex use-cases. I don't hate redux-thunk, I love it, and I use it every day!

1. Make API Calls

Usecase: Make an API call to fetch comments of a post. Show loaders when the API call is in progress and also handle API errors.

A redux-thunk implementation will look like this,

function getComments(postId){ return (dispatch) => { dispatch(getCommentsInProgress()); axios.get(`/v1/api/posts/${postId}/comments`).then(response => { dispatch(getCommentsSuccess(response.data.comments)); }).catch(() => { dispatch(getCommentsFailed()); }); } }

and this is absolutely correct. But the action creator is bloated.

We can write an Epic to implement the same using redux-observables.

const getCommentsEpic = (action$, state$) => action$.pipe( ofType('GET_COMMENTS'), mergeMap((action) => from(axios.get(`/v1/api/posts/${action.payload.postId}/comments`).pipe( map(response => getCommentsSuccess(response.data.comments)), catchError(() => getCommentsFailed()), startWith(getCommentsInProgress()) ) );

Now it allows us to have a clean and simple action creator like this,

function getComments(postId) { return { type: 'GET_COMMENTS', payload: { postId } } }

2. Request Debouncing

Usecase: Provide autocompletion for a text field by calling an API whenever the value of the text field changes. API call should be made 1 second after the user has stopped typing.

A redux-thunk implementation will look like this,

let timeout; function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { axios.get(`/suggestions?q=${value}`) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); } }

It requires a global variable timeout. When we start using global variables, our action creators are not longer pure functions. It also becomes difficult to unit test the action creators that use a global variable.

We can implement the same with redux-observable using the .debounce operator.

const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), debounce(1000), mergeMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()) );

Now, our action creators can be cleaned up, and more importantly, they can be pure functions again.

function valueChanged(value) { return { type: 'VALUE_CHANGED', payload: { value } } }

3. Request Cancellation

Usecase: Continuing the previous use-case, assume that the user didn't type anything for 1 second, and we made our 1st API call to fetch the suggestions.

Let's say the API itself takes an average of 2-3 seconds to return the result. Now, if the user types something while the 1st API call is in progress, after 1 second, we will make our 2nd API. We can end up having two API calls at the same time, and it can create a race condition.

To avoid this, we need to cancel the 1st API call before making the 2nd API call.

A redux-thunk implementation will look like this,

let timeout; var cancelToken = axios.cancelToken; let apiCall; function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { // Cancel the existing API apiCall && apiCall.cancel('Operation cancelled'); // Generate a new token apiCall = cancelToken.source(); axios.get(`/suggestions?q=${value}`, { cancelToken: apiCall.token }) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); } }

Now, it requires another global variable to store the Axios's cancel token. More global variables = more impure functions!

To implement the same using redux-observable, all we need to do is replace .mergeMap with .switchMap.

const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), throttle(1000), switchMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()) );

Since it doesn't require any changes to our action creators, they can continue to be pure functions.

Similarly, there are many use-cases where Redux-Observables actually shines! For example, polling an API, showing snack bars, managing WebSocket connections, etc.

To Conclude

If you are developing a Redux application that involves such complex use-cases, it is highly recommended to use Redux-Observables. After all, the benefits of using it are directly proportional to the complexity of your application, and it is evident from the above mentioned practical use-cases.

I strongly believe using the right set of libraries will help us to develop much cleaner and maintainable applications, and in the long term, the benefits of using them will outweigh the drawbacks.