Catch, Recover, and Continue RxJS Streams After An Error

I assumed RxJS streams worked like Promises, and I was very wrong.

Here’s a stream of stuff, with meatballs!

const stuff$ = Rx.Observable.of(

Let’s make the stuff lowercase:

const lowercaseStuff$ = stuff$
	.map(x => x.toLowerCase());

Quick quiz: what is logged to console?

	(x => console.log('Success:', x)),
	(x => console.log('Error:', x)),
	(() => console.log('Complete'))

The answer may surprise you:

Success: spaghetti
Error: TypeError: x.toLowerCase is not a function

Where’s the meatballs?! It seems the error killed the stream. Let’s stop that from happening.

Attempt #1: Defensive coding

const lowercaseStuff$ = stuff$
	.map(x => {
		if (typeof x.toLowerCase !== 'function') {
			return undefined;
		return x.toLowerCase();


Success: spaghetti
Success: undefined
Success: meatballs

Hello meatballs! That was easy!

Hold up…

Maybe the “stream of stuff” is actually a response from a server. You expect to receive nicely structured JSON. Then one day, you don’t.

Maybe you’re passing the “stuff” into a 3rd party library, and it throws an error.

Errors will always happen. You can’t code defensively for every unpredictable possibility.

Let’s try something else.

Attempt #2: Try / catch

const lowercaseStuff$ = stuff$
	.map(x => {
		try {
			return x.toLowerCase();
		} catch(error) {
			return undefined;


Success: spaghetti
Success: undefined
Success: meatballs

Meatballs! 😍

But, what a nightmare to place this inside every operator:

const moreComplexStuff$ = stuff$
	.map(x => {
		try {
			return x.toLowerCase();
		} catch(error) {
			return undefined;
	.anotherOperator(x => {
		try {
			return doSomethingElse(x);
		} catch(error) {
			return undefined;

We can do better.

Attempt #3: Catch operator

const lowercaseStuff$ = stuff$
	.map(x => x.toLowerCase())
	.catch(error => Rx.Observable.of(error));


Success: spaghetti
Success: undefined

The error was caught, but the stream completes before the meatballs had a chance.

🤔 Hmm. The catch operator is returning a bran new stream. Maybe the original stream is completing because it’s replaced?

Attempt #4: Materialize / dematerialize

const lowercaseStuff$ = stuff$
	.map(x => x.toLowerCase())
	.map(notification => {
		if (notification.kind === "E") {
			return Rx.Notification.createNext(undefined);
		return notification;

This looks complex, but it’s behaving similar to the catch operator above. We are watching for an error notification, and then converting it into a next / success notification instead.

It’s pretty low level, but this time we have no catch operator replacing the main stream. Should be fine.

Success: spaghetti
Success: undefined

😖 No meatballs.

What is going on?!

Deep in RxJS Land

RxJS is internally wrapping code in operators (like map, filter, etc) with a try / catch. If an error is caught, it will notify subscribers, and then unsubscribe the stream. This is hard coded — you have no choice!

If an error is thrown within an operator, the stream will always complete.


Create a disposable stream. If an error occurs only the disposable stream dies, and the main stream lives on.

const lowercaseStuff$ = stuff$
	.switchMap((x) => {
		const disposableStream$ = Rx.Observable.of(x);
		return disposableStream$
			.map(x => x.toLowerCase())
			.catch(error => Rx.Observable.of(undefined));


Success: spaghetti
Success: undefined
Success: meatballs

We have meatballs! 😋

This can be condensed further:

const lowercaseStuff$ = stuff$.switchMap(x =>
		.map(x => x.toLowerCase())
		.catch(error => Rx.Observable.of(undefined))

And here’s a real world scenario:

  • Processing a response from the server
  • Keeping the main stream alive (so future responses are processed after an error)
  • Performing error handling in the main stream
	.switchMap(response =>
			.map(response => codeThatMayThrowAnError(response))
			.catch(error => Rx.Observable.of(error))
	.map(response => {
		if (response instanceof Error) {
			// Error handling here
		return response;
