Angular: The Single Rule To Get Better At RxJS

David Dal Busco

As we do not want to subscribe, we first have to transform the method called by the component so that it returns an Observable.

list(): Observable<Coin[]> {
  return this.httpClient
    .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
    ...
}

Without any other changes, the compiler will complain that the return type does not match: we are still subscribing to the stream and are therefore actually returning a Subscription. That’s why we replace the subscribe with an RxJS operator. In this particular case we use tap because we still want to assign the result to the store.

list(): Observable<Coin[]> {
  return this.httpClient
    .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
    .pipe(
      tap((allCoins: Coin[]) => {
        if (allCoins.length > 10) {
          this.coins = allCoins.filter(
            (coin: Coin) =>
              !coin.is_new && coin.rank > 0 && coin.rank < 100
          );
        }
      }),
      takeUntil(this.destroy$)
    );
}
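To make the difference between the two return types concrete, here is a minimal sketch. It uses of() with made-up numbers as a stand-in for the HTTP call (an assumption for illustration, not the article's service) and shows that piping through tap yields an Observable that does nothing until someone subscribes, whereas subscribe returns a Subscription.

```typescript
import { Observable, Subscription, of } from 'rxjs';
import { tap } from 'rxjs/operators';

// Hypothetical stand-in for the HTTP call: emits one array, then completes.
const source$: Observable<number[]> = of([1, 2, 3]);

// Plays the role of the service's private state.
let stored: number[] = [];

// pipe(tap(...)) returns a new Observable; the side effect has not run yet.
const withSideEffect$: Observable<number[]> = source$.pipe(
  tap((values: number[]) => (stored = values))
);
const storedBeforeSubscribe: number[] = stored; // still the initial empty array

// subscribe() starts the stream and returns a Subscription, not an Observable.
const subscription: Subscription = withSideEffect$.subscribe();
const storedAfterSubscribe: number[] = stored; // now holds the emitted values
```

This is why the service method can only be typed as Observable<Coin[]> once the subscribe call is gone.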

Because we are not subscribing anymore, we can remove the takeUntil and let the caller handle the way it streams the data.

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';

import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

export type Coin = Record<string, string | number | boolean>;

@Injectable({
  providedIn: 'root'
})
export class CoinsService {
  constructor(private httpClient: HttpClient) {}

  private coins: Coin[] = [];

  list(): Observable<Coin[]> {
    return this.httpClient
      .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
      .pipe(
        tap((allCoins: Coin[]) => {
          if (allCoins.length > 10) {
            this.coins = allCoins.filter(
              (coin: Coin) =>
                !coin.is_new && coin.rank > 0 && coin.rank < 100
            );
          }
        })
      );
  }

  getCoins(): Coin[] {
    return this.coins;
  }
}

The code has already become cleaner, with no more subscription and destroy lifecycle, but it is still mixing different approaches. That’s why we take advantage of the RxJS filter and map operators to make it more reactive.

list(): Observable<Coin[]> {
  return this.httpClient
    .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
    .pipe(
      filter((allCoins: Coin[]) => allCoins.length > 10),
      map((allCoins: Coin[]) =>
        allCoins.filter(
          (coin: Coin) =>
            !coin.is_new && coin.rank > 0 && coin.rank < 100
        )
      ),
      tap((topCoins: Coin[]) => (this.coins = topCoins))
    );
}

The imperative if has become a reactive filter and the array.filter has been moved to a map transformer. Thanks to these last modifications, the data flows through a stream that describes what we want as a result.
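The two operators can be tried in isolation with fake data. The sketch below is only an illustration: SampleCoin, topCoins and the twelve generated coins are hypothetical names and values, and of() replaces the HTTP call. It also shows one consequence of the RxJS filter worth keeping in mind: when the list has ten coins or fewer, the stream emits nothing at all, whereas the earlier tap version still passed the unfiltered list on to the caller.

```typescript
import { Observable, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical, narrower shape than the article's Coin type, for illustration only.
interface SampleCoin {
  id: string;
  rank: number;
  is_new: boolean;
}

// Same two operators as in list(), applied to any source of coins.
const topCoins = (source$: Observable<SampleCoin[]>): Observable<SampleCoin[]> =>
  source$.pipe(
    filter((allCoins: SampleCoin[]) => allCoins.length > 10),
    map((allCoins: SampleCoin[]) =>
      allCoins.filter(
        (coin: SampleCoin) => !coin.is_new && coin.rank > 0 && coin.rank < 100
      )
    )
  );

// Twelve made-up coins with ranks 0..11; the last one is flagged as new.
const twelveCoins: SampleCoin[] = Array.from({ length: 12 }, (_, i) => ({
  id: `coin-${i}`,
  rank: i,
  is_new: i === 11
}));

// Large enough list: one emission, without rank 0 and without the new coin.
const emittedForLargeList: SampleCoin[][] = [];
topCoins(of(twelveCoins)).subscribe((coins) => emittedForLargeList.push(coins));

// Ten coins or fewer: the RxJS filter drops the value, so nothing is emitted.
const emittedForSmallList: SampleCoin[][] = [];
topCoins(of(twelveCoins.slice(0, 5))).subscribe((coins) =>
  emittedForSmallList.push(coins)
);
```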

Step 3: Async Pipe

To reach our ultimate goal, we want to remove the subscription in the component in order to leverage the | async pipe. Therefore, we have to improve our service. On the other hand, we still want it to act as a store.

That’s why, as an intermediate step, we replace the imperative state coins of the service with a BehaviorSubject, a special type of Observable that allows values to be multicast to many Observers (source), and expose its stream publicly as a readonly Observable variable.

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';

import { BehaviorSubject, Observable } from 'rxjs';
import { filter, map, tap } from 'rxjs/operators';

export type Coin = Record<string, string | number | boolean>;

@Injectable({
  providedIn: 'root'
})
export class CoinsService {
  constructor(private httpClient: HttpClient) {}

  private coins: BehaviorSubject<Coin[]> = new BehaviorSubject<
    Coin[]
  >([]);

  readonly coins$: Observable<Coin[]> = this.coins.asObservable();

  list(): Observable<Coin[]> {
    return this.httpClient
      .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
      .pipe(
        filter((allCoins: Coin[]) => allCoins.length > 10),
        map((allCoins: Coin[]) =>
          allCoins.filter(
            (coin: Coin) =>
              !coin.is_new && coin.rank > 0 && coin.rank < 100
          )
        ),
        tap((topCoins: Coin[]) => this.coins.next(topCoins))
      );
  }
}
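What makes a BehaviorSubject usable as a small store is that it always holds a current value and hands it to every new subscriber. The following sketch, using made-up string arrays instead of coins and the hypothetical names state and state$, illustrates that behaviour and the read-only view obtained with asObservable().

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

// Private, writable side of the store, seeded with an empty list.
const state = new BehaviorSubject<string[]>([]);

// Public, read-only side: consumers can subscribe but cannot call next().
const state$: Observable<string[]> = state.asObservable();

// An early subscriber immediately receives the initial value.
const early: string[][] = [];
state$.subscribe((value) => early.push(value));

// Only the owner of the subject pushes new state.
state.next(['btc', 'eth']);

// A late subscriber immediately receives the latest value, not the initial one.
const late: string[][] = [];
state$.subscribe((value) => late.push(value));
```

The early subscriber ends up with two values (the initial empty list, then the update), while the late one only sees the update.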

Unlike our previous changes, swapping the service state for a BehaviorSubject is a breaking change. That’s why we have to adapt the component to remove the getter and replace it with an observable we can ultimately use in the template.

import { Component, OnDestroy, OnInit } from '@angular/core';

import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

import { Coin, CoinsService } from '../coins.service';

@Component({
  selector: 'app-coins',
  templateUrl: './coins.component.html',
  styleUrls: ['./coins.component.css']
})
export class CoinsComponent implements OnInit, OnDestroy {
  constructor(private readonly coinsService: CoinsService) {}

  private destroy$: Subject<void> = new Subject<void>();

  coins$: Observable<Coin[]> = this.coinsService.coins$;

  ngOnInit(): void {
    this.coinsService
      .list()
      .pipe(takeUntil(this.destroy$))
      .subscribe(() => {});
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Finally, we introduce the famous async pipe.

<article *ngFor="let coin of coins$ | async">

Step 4: No Subscribe And Reactive

Our current solution is really close to meeting our goals: we are using a stream to get the data and to display the results, but we still have to subscribe to trigger the loading of the currencies.

That’s why we try to remove the subject.

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';

import { Observable } from 'rxjs';
import { filter, map } from 'rxjs/operators';

export type Coin = Record<string, string | number | boolean>;

@Injectable({
  providedIn: 'root'
})
export class CoinsService {
  constructor(private httpClient: HttpClient) {}

  readonly coins$: Observable<Coin[]> = ... // <- TODO

  list(): Observable<Coin[]> {
    return this.httpClient
      .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
      .pipe(
        filter((allCoins: Coin[]) => allCoins.length > 10),
        map((allCoins: Coin[]) =>
          allCoins.filter(
            (coin: Coin) =>
              !coin.is_new && coin.rank > 0 && coin.rank < 100
          )
        )
      );
  }
}

We notice the exposed observable, coins$, is now lacking a source.

On the other hand, we still have the stream that processes the flow of the data as we expect.

Yes, that’s right, we connect both.

readonly coins$: Observable<Coin[]> = this.httpClient
  .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
  .pipe(
    filter((allCoins: Coin[]) => allCoins.length > 10),
    map((allCoins: Coin[]) =>
      allCoins.filter(
        (coin: Coin) =>
          !coin.is_new && coin.rank > 0 && coin.rank < 100
      )
    )
  );

However, by doing so, we lose the state management feature we had in place thanks to the BehaviorSubject. That’s why we introduce shareReplay, which replays the latest value to new subscribers and thereby makes our service act as a store again.

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';

import { Observable } from 'rxjs';
import { filter, map, shareReplay } from 'rxjs/operators';

export type Coin = Record<string, string | number | boolean>;

@Injectable({
  providedIn: 'root'
})
export class CoinsService {
  constructor(private httpClient: HttpClient) {}

  readonly coins$: Observable<Coin[]> = this.httpClient
    .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
    .pipe(
      filter((allCoins: Coin[]) => allCoins.length > 10),
      map((allCoins: Coin[]) =>
        allCoins.filter(
          (coin: Coin) =>
            !coin.is_new && coin.rank > 0 && coin.rank < 100
        )
      ),
      shareReplay({ bufferSize: 1, refCount: true })
    );
}

If you have never used shareReplay before, be careful when using it. Read more in the blog post of Kwinten Pisman.
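The effect of shareReplay can be observed without any HTTP call. In the sketch below, defer() with a counter is a hypothetical stand-in for the request: it counts how many times the source is actually started. Without sharing, each subscriber starts the source again (for HttpClient, that would mean one request per subscriber); with shareReplay, the second subscriber gets the replayed value instead.

```typescript
import { Observable, defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Cold source: the factory runs once per subscriber.
let coldRuns = 0;
const cold$: Observable<string[]> = defer(() => {
  coldRuns++;
  return of(['btc', 'eth']);
});
cold$.subscribe();
cold$.subscribe();

// Same kind of source, but shared with the configuration used in the service.
let sharedRuns = 0;
const shared$: Observable<string[]> = defer(() => {
  sharedRuns++;
  return of(['btc', 'eth']);
}).pipe(shareReplay({ bufferSize: 1, refCount: true }));

const first: string[][] = [];
const second: string[][] = [];
shared$.subscribe((value) => first.push(value));
// The second subscriber receives the replayed value; the factory is not run again.
shared$.subscribe((value) => second.push(value));
```

Note that this sketch only covers a source that emits once and completes, like an HTTP request. With refCount: true and a source that is still running, the source is unsubscribed once the last subscriber leaves and restarted for the next one, which is one of the reasons for the caution above.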

Finally, we can remove our last subscription in the component, along with all the code whose only purpose was to handle the unsubscription.

import { Component } from '@angular/core';

import { Observable } from 'rxjs';

import { Coin, CoinsService } from '../coins.service';

@Component({
  selector: 'app-coins',
  templateUrl: './coins.component.html',
  styleUrls: ['./coins.component.css']
})
export class CoinsComponent {
  constructor(private readonly coinsService: CoinsService) {}

  readonly coins$: Observable<Coin[]> = this.coinsService.coins$;
}

Compared to its original version, hasn’t the component become really slim and easy to understand?

A last check of the GUI.

All cryptos are still listed, the code is reactive and we are not using any “subscribe” anymore 🥳.