I Am Turns

Catch, Recover, and Continue RxJS Streams After An Error

Last updated:

I assumed RxJS streams worked like Promises, and I was very wrong.

Here’s a stream of stuff, with meatballs!

const stuff$ = Rx.Observable.of(
	'Spaghetti',
	42,
	'Meatballs'
);

Let’s make the stuff lowercase:

const lowercaseStuff$ = stuff$
	.map(x => x.toLowerCase());

Quick quiz: what is logged to console?

lowercaseStuff$.subscribe(
	(x => console.log('Success:', x)),
	(x => console.log('Error:', x)),
	(() => console.log('Complete'))
);

The answer may surprise you:

Success: spaghetti
Error: TypeError: x.toLowerCase is not a function

Where’s the meatballs?! It seems the error killed the stream. Let’s stop that from happening.

Attempt #1: Defensive coding

const lowercaseStuff$ = stuff$
	.map(x => {
		if (typeof x.toLowerCase !== 'function') {
			return undefined;
		}
		return x.toLowerCase();
	});

Result:

Success: spaghetti
Success: undefined
Success: meatballs
Complete

Hello meatballs! That was easy!

Hold up…

Maybe the “stream of stuff” is actually a response from a server. You expect to receive nicely structured JSON. Then one day, you don’t.

Maybe you’re passing the “stuff” into a 3rd party library, and it throws an error.

Errors will always happen. You can’t code defensively for every unpredictable possibility.

Let’s try something else.

Attempt #2: Try / catch

const lowercaseStuff$ = stuff$
	.map(x => {
		try {
			return x.toLowerCase();
		} catch(error) {
			return undefined;
		}
	});

Result:

Success: spaghetti
Success: undefined
Success: meatballs
Complete

Meatballs! 😍

But, what a nightmare to place this inside every operator:

const moreComplexStuff$ = stuff$
	.map(x => {
		try {
			return x.toLowerCase();
		} catch(error) {
			return undefined;
		}
	})
	.anotherOperator(x => {
		try {
			return doSomethingElse(x);
		} catch(error) {
			return undefined;
		}
		});

We can do better.

Attempt #3: Catch operator

const lowercaseStuff$ = stuff$
	.map(x => x.toLowerCase())
	.catch(error => Rx.Observable.of(error));

Result:

Success: spaghetti
Success: undefined
Complete

The error was caught, but the stream completes before the meatballs had a chance.

🤔 Hmm. The catch operator is returning a bran new stream. Maybe the original stream is completing because it’s replaced?

Attempt #4: Materialize / dematerialize

const lowercaseStuff$ = stuff$
	.map(x => x.toLowerCase())
	.materialize()
	.map(notification => {
		if (notification.kind === "E") {
			return Rx.Notification.createNext(undefined);
		}
		return notification;
	})
	.dematerialize();

This looks complex, but it’s behaving similar to the catch operator above. We are watching for an error notification, and then converting it into a next / success notification instead.

It’s pretty low level, but this time we have no catch operator replacing the main stream. Should be fine.

Success: spaghetti
Success: undefined
Complete

😖 No meatballs.

What is going on?!


Deep in RxJS Land

RxJS is internally wrapping code in operators (like map, filter, etc) with a try / catch. If an error is caught, it will notify subscribers, and then unsubscribe the stream. This is hard coded — you have no choice!

If an error is thrown within an operator, the stream will always complete.

Solution

Create a disposable stream. If an error occurs only the disposable stream dies, and the main stream lives on.

const lowercaseStuff$ = stuff$
	.switchMap((x) => {
		const disposableStream$ = Rx.Observable.of(x);
		return disposableStream$
			.map(x => x.toLowerCase())
			.catch(error => Rx.Observable.of(undefined));
	})

Result:

Success: spaghetti
Success: undefined
Success: meatballs
Complete

We have meatballs! 😋

This can be condensed further:

const lowercaseStuff$ = stuff$.switchMap(x =>
	Rx.Observable.of(x)
		.map(x => x.toLowerCase())
		.catch(error => Rx.Observable.of(undefined))
);

And here’s a real world scenario:

  • Processing a response from the server
  • Keeping the main stream alive (so future responses are processed after an error)
  • Performing error handling in the main stream
responseFromServer$
	.switchMap(response =>
		Rx.Observable.of(response)
			.map(response => codeThatMayThrowAnError(response))
			.catch(error => Rx.Observable.of(error))
	)
	.map(response => {
		if (response instanceof Error) {
			// Error handling here
		}
		return response;
	});
TypeScript has never been easier thanks to the TypeScript plugin for Babel. Discover 4 reasons why TypeScript + Babel are a perfect pair, and follow a step-by-step guide to upgrade to TypeScript in 10 minutes.
More
I dreaded returning to programming during a recent holiday. It turns out I’m just done with Angular. Is it useful in 2018?
More

It’s impossible to keep up with JavaScript.

You’re catching up every chance you get. Scrolling… reading… refreshing… skimming. You’re lost in 42 browser tabs of articles, tutorials, and GitHub repos. You bookmark a handful to check out later (or, never).

It’s overwhelming. There’s too much to know.

So I developed a system that easily keeps me up-to-date.

Find out more