Handle RxJS Subscriptions Properly Using Rx-Scavenger

One of the trickiest parts of Angular (or of RxJS in general) is handling Observables’ Subscriptions, but don’t panic!

There are several ways to handle this properly, and we’ll walk you through the most common approaches.

TL;DR: Use Rx-Scavenger!
https://github.com/wishtack/wishtack-steroids/tree/master/packages/rx-scavenger

Common Solutions

Angular’s async Pipe

Using Angular’s async Pipe, it is possible to let Angular handle the subscription and unsubscribe when the component is destroyed (or simply when the element is destroyed using a structural directive like *ngIf for example).


import { Component } from '@angular/core';
import { interval } from 'rxjs';

@Component({
  template: `<div>{{ count$ | async }}</div>`
})
export class CounterComponent {
  count$ = interval(1000);
}

This can get tricky: each async Pipe creates its own subscription, so multiple subscriptions might trigger the same processing multiple times. That’s when we need operators like shareReplay, which subscribes to the source Observable once and shares the same data with all subscribers.


import { Component } from '@angular/core';
import { interval } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

@Component({
  template: `
    <div>{{ count$ | async }}</div>
    <div>{{ count$ | async }}</div>
  `
})
export class CounterComponent {
  count$ = interval(1000)
    .pipe(shareReplay(1));
}
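
To see the difference outside of Angular, here is a small sketch using plain RxJS. The executions and teardowns counters are ours, added only for illustration. It also assumes RxJS 6.4 or later, where shareReplay accepts a config object. With refCount: true, the source is released once the last subscriber leaves, whereas shareReplay(1) on its own keeps the source subscription alive until the source completes. That is worth keeping in mind for a never-ending source like interval.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Illustration-only counters (not part of any API).
let executions = 0; // how many times the source body ran
let teardowns = 0;  // how many times the source was released

// A cold Observable: its body runs once per subscription.
const source$ = new Observable<number>(subscriber => {
  executions++;
  subscriber.next(42);
  return () => { teardowns++; };
});

// Without sharing, two subscribers trigger two executions.
source$.subscribe().unsubscribe();
source$.subscribe().unsubscribe();
const unsharedExecutions = executions;

// With shareReplay, both subscribers share a single execution.
executions = 0;
teardowns = 0;
const shared$ = source$.pipe(shareReplay({ bufferSize: 1, refCount: true }));
const received: number[] = [];
const first = shared$.subscribe(value => received.push(value));
const second = shared$.subscribe(value => received.push(value));
const sharedExecutions = executions;

// refCount: true releases the source only when the last subscriber leaves.
first.unsubscribe();
const teardownsAfterFirst = teardowns;
second.unsubscribe();
const teardownsAfterSecond = teardowns;
```

Both subscribers receive the replayed value, but the source body only runs once in the shared case.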

Limitations

The view is not the only final subscriber. Sometimes you will want to handle Observables programmatically from your TypeScript code.

In order to handle errors and completions, you will probably need some extra operators like catchError, finalize or last, and things can sometimes get tricky.
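
As a rough idea of what this looks like in TypeScript code, here is a minimal sketch combining catchError and finalize. The failing$ source and the -1 fallback value are made up for the example; in a real component the source would typically be an HTTP call or similar.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, finalize } from 'rxjs/operators';

// Hypothetical source that fails immediately.
const failing$ = new Observable<number>(subscriber => {
  subscriber.error(new Error('boom'));
});

// Records what the subscriber observes, in order.
const log: string[] = [];

failing$
  .pipe(
    // Replace the error with a fallback value so the stream completes normally.
    catchError(() => of(-1)),
    // Runs when the stream completes, errors or is unsubscribed from.
    finalize(() => log.push('finalized'))
  )
  .subscribe({
    next: value => log.push(`next: ${value}`),
    error: () => log.push('error'),
    complete: () => log.push('complete')
  });
```

Here the subscriber never sees the error: it receives the fallback value, then the completion, and the finalize callback runs last.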

takeUntil Operator

Another trick is to use the takeUntil operator on every Observable.

Once notified by ngOnDestroy (we’ll be using a Subject for this), takeUntil will unsubscribe from all open Subscriptions.


import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subject, interval } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

@Component({
  template: `
    <div>{{ count }}</div>
    <button (click)="startCounting()">Start Counting</button>
  `
})
export class CounterComponent implements OnInit, OnDestroy {
  count: number;

  // Emits once when the component is destroyed.
  private _destroyed$ = new Subject<void>();

  ngOnInit() {
    this.startCounting();
  }

  ngOnDestroy() {
    // Notify takeUntil, then release the Subject itself.
    this._destroyed$.next();
    this._destroyed$.complete();
  }

  startCounting() {
    interval(1000)
      .pipe(takeUntil(this._destroyed$))
      .subscribe(count => this.count = count);
  }
}
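
The mechanism is easy to check with plain RxJS. In the sketch below, ticks$ is a Subject we use as a stand-in for interval(1000) so that everything runs synchronously, and destroyed$ plays the role of the Subject notified by ngOnDestroy. Both names are ours.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Stand-in for interval(1000), driven by hand.
const ticks$ = new Subject<number>();
// Plays the role of the Subject notified in ngOnDestroy.
const destroyed$ = new Subject<void>();

const received: number[] = [];
const subscription = ticks$
  .pipe(takeUntil(destroyed$))
  .subscribe(value => received.push(value));

ticks$.next(0);
ticks$.next(1);

// What ngOnDestroy would do.
destroyed$.next();
destroyed$.complete();

// Emitted after "destruction": nobody is listening anymore.
ticks$.next(2);
```

After the notifier emits, the Subscription is closed and later values are ignored. A common recommendation is to keep takeUntil as the last operator in the pipe so that nothing added after it escapes the cleanup.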

Limitations

This approach is a bit verbose and thus error-prone.

Calling the startCounting() method multiple times will create a race condition: every call adds a new Subscription that updates count, which turns our component into a crazy counter. We should unsubscribe from the previous Subscription first.
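
Here is a sketch of the problem and of the manual fix, again in plain RxJS. The NaiveCounter and ReplacingCounter classes are hypothetical, and ticks$ is a Subject standing in for interval(1000).

```typescript
import { Subject, Subscription } from 'rxjs';

// Stand-in for interval(1000), driven by hand.
const ticks$ = new Subject<number>();

// Subscribes again on every call and never drops the previous Subscription.
class NaiveCounter {
  updates = 0;

  start() {
    ticks$.subscribe(() => this.updates++);
  }
}

// Unsubscribes from the previous Subscription before creating a new one.
class ReplacingCounter {
  updates = 0;
  private _subscription?: Subscription;

  start() {
    if (this._subscription) {
      this._subscription.unsubscribe();
    }
    this._subscription = ticks$.subscribe(() => this.updates++);
  }
}

const naive = new NaiveCounter();
naive.start();
naive.start();

const replacing = new ReplacingCounter();
replacing.start();
replacing.start();

// A single tick: the naive counter is updated twice, the other one once.
ticks$.next(0);
```

The manual fix works, but it needs one extra field and one extra check per operation, on top of the takeUntil boilerplate.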

Rx-Scavenger

Rx-Scavenger is an RxJS Subscription Garbage Collector specially made for Angular (but it works in other contexts too).

It aims to:

  • reduce the boilerplate code needed to handle Subscriptions properly,
  • handle subscription replacement in order to avoid race conditions like the one described above.

As you can see below, we pass the component instance to the Scavenger, and it takes care of all the plumbing (the Subject and the ngOnDestroy logic) that we wrote by hand above with takeUntil.


import { Component, OnDestroy, OnInit } from '@angular/core';
import { Scavenger } from '@wishtack/rx-scavenger';
import { interval } from 'rxjs';

@Component({
  template: `<div>{{ count }}</div>`
})
export class CounterComponent implements OnDestroy, OnInit {
  count: number;

  private _scavenger = new Scavenger(this);

  ngOnInit() {
    this.startCounting();
  }

  // Left empty on purpose: the Scavenger takes care of the cleanup.
  ngOnDestroy() {
  }

  startCounting() {
    interval(1000)
      .pipe(this._scavenger.collect())
      .subscribe(count => this.count = count);
  }
}

But wait! We still haven’t handled the subscription replacement issue when calling startCounting() multiple times.

In order to fix that issue, we just have to replace the collect() method with the collectByKey(key: string) method and give it a unique key that represents the operation.


import { Component, OnDestroy, OnInit } from '@angular/core';
import { Scavenger } from '@wishtack/rx-scavenger';
import { interval } from 'rxjs';

@Component({
  template: `<div>{{ count }}</div>`
})
export class CounterComponent implements OnDestroy, OnInit {
  count: number;

  private _scavenger = new Scavenger(this);

  ngOnInit() {
    this.startCounting();
  }

  // Left empty on purpose: the Scavenger takes care of the cleanup.
  ngOnDestroy() {
  }

  startCounting() {
    interval(1000)
      // Replaces any previous Subscription collected under the 'count' key.
      .pipe(this._scavenger.collectByKey('count'))
      .subscribe(count => this.count = count);
  }
}

Now, every time we call this method, the previous Subscription is automagically unsubscribed from before the new one is created.
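
If you are curious how keyed replacement can work as an operator, here is a conceptual sketch. To be clear, KeyedCollector is a hypothetical helper we wrote for this example and not Rx-Scavenger’s actual implementation; it simply keeps one notifier Subject per key and reuses takeUntil. As before, ticks$ is a Subject standing in for interval(1000).

```typescript
import { MonoTypeOperatorFunction, Subject, defer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical helper, NOT the Rx-Scavenger implementation.
class KeyedCollector {
  // One "stop" notifier per key.
  private _stops = new Map<string, Subject<void>>();

  collectByKey<T>(key: string): MonoTypeOperatorFunction<T> {
    // defer runs this logic at subscription time, once per subscription.
    return source => defer(() => {
      // Stop whatever was previously subscribed under this key.
      this._stop(key);
      const stop$ = new Subject<void>();
      this._stops.set(key, stop$);
      return source.pipe(takeUntil(stop$));
    });
  }

  // Stops everything, e.g. when called from ngOnDestroy.
  unsubscribe() {
    Array.from(this._stops.keys()).forEach(key => this._stop(key));
  }

  private _stop(key: string) {
    const previous = this._stops.get(key);
    if (previous) {
      previous.next();
      previous.complete();
      this._stops.delete(key);
    }
  }
}

// Usage: the second call replaces the first one.
const ticks$ = new Subject<number>();
const collector = new KeyedCollector();
const received: string[] = [];

function start(label: string) {
  ticks$
    .pipe(collector.collectByKey<number>('count'))
    .subscribe(value => received.push(`${label}:${value}`));
}

start('first');
start('second');
ticks$.next(0); // only the second subscriber is still listening

collector.unsubscribe();
ticks$.next(1); // nobody is listening anymore
```

Only the most recent subscriber for a given key ever receives values, and a single unsubscribe() call cleans up everything that is left.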

Give it a try and let us know how it worked for you 😉
