Flattening and Transforming Observable Arrays with RxJs

RxJS is an open-source library used to handle streams of events. It’s a very powerful tool, but it has a steep learning curve and can be tricky at times.

The issue we will be discussing in this article is dealing with a list of objects inside a stream and needing to execute another async operation on one property of each object. This can result in an Observable<Array<Observable<T>>>, which is challenging to subscribe to. We’ll explore different solutions to overcome this challenge and transform it into an Observable<Array<T>>.

This article will also help you resolve challenge #11 of Angular Challenges, which is intended for developers who already have a strong understanding of observables. However, anyone can read this article and learn from it. If you want to try this challenge first, I encourage you to do so, then come back to compare your solution with the guidance provided here. (You can also submit a PR that I’ll review.)

Let’s start with a very basic example to understand the issue more clearly.

readonly persons$ = this.service.get(selectPersons);
// ^? Observable<Person[]> where Person = { id: number, name: string }

We have a readonly persons$ observable that retrieves a list of persons from our store, which could be NgRx, RxAngular, or a simple service using a Subject. In this example, the implementation details are hidden behind this.service.

The goal is to fetch the address of each person from our backend and return a list of persons with their corresponding addresses.

Imperative approach:

The approach I see most often in codebases is to use an imperative style and nest the subscribe callbacks, like this:

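// The array must be initialized before the callbacks push into it
const personWithAddress: PersonWithAddress[] = [];

this.persons$.subscribe((persons) => {
  persons.forEach((person) => {
    // One nested subscription per person
    this.http.getAddress(person.id).subscribe((address) => {
      personWithAddress.push({ ...person, ...address });
    });
  });
});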

However, this approach is discouraged as it results in nested subscribes and can quickly become messy, even in this simple example.
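To make the problem concrete, here is a minimal sketch of the nested-subscribe version running outside Angular. The `persons$` source, the `pending` map, and the `getAddress` stub are hypothetical stand-ins for the store and the HTTP call, and the names and streets are made up. It illustrates two consequences: the list is empty until responses arrive, and its order depends on response timing.

```typescript
import { Observable, Subject, of } from 'rxjs';

interface Person { id: number; name: string; }
interface Address { street: string; }
type PersonWithAddress = Person & Address;

// Hypothetical store: emits one list of persons
const persons$: Observable<Person[]> = of([
  { id: 1, name: 'Ada' },
  { id: 2, name: 'Linus' },
]);

// Hypothetical HTTP stub: each call returns a Subject we can answer later
const pending = new Map<number, Subject<Address>>();
function getAddress(id: number): Observable<Address> {
  const response$ = new Subject<Address>();
  pending.set(id, response$);
  return response$;
}

const personWithAddress: PersonWithAddress[] = [];
persons$.subscribe((persons) => {
  persons.forEach((person) => {
    getAddress(person.id).subscribe((address) => {
      personWithAddress.push({ ...person, ...address });
    });
  });
});

// No response has arrived yet, so the list is still empty
const lengthBeforeResponses = personWithAddress.length;

// Simulate responses arriving out of order
pending.get(2)!.next({ street: '2 Second St' });
pending.get(1)!.next({ street: '1 First St' });

// The list follows arrival order, not the order of the original persons
const namesAfterResponses = personWithAddress.map((p) => p.name);
```

Nothing in this version signals when the list is complete, and the inner subscriptions are never cleaned up, which is what the reactive approaches try to address.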

Naive approach with RxJS:

Let’s try again with a more reactive approach, using RxJS operators.

When teaching people how to solve this type of issue, I often see this code:

personWithAddress$ = this.persons$.pipe(
  // ^? Observable<Observable<PersonWithAddress>>
  mergeMap((persons) =>
    // map here is Array.prototype.map, not the RxJS operator
    persons.map((p) =>
      // returns an Observable that nothing ever subscribes to
      this.http.getAddress(p.id).pipe(map((address) => ({ ...p, ...address })))
    )
  )
);

People then struggle, trying mergeMap, switchMap, and similar operators to flatten the Observable<Observable<T>> structure. This cannot work as written: persons.map returns an array of Observables, not a single Observable, so the flattening operator emits each inner Observable as a value instead of subscribing to it.
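The behavior can be checked with a small sketch. The `persons$` source and the `getAddress` stub below are hypothetical stand-ins built with `of`, and the data is made up. The sketch collects whatever the naive pipeline emits and checks whether each value is itself an Observable.

```typescript
import { Observable, isObservable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

interface Person { id: number; name: string; }
interface Address { street: string; }

// Hypothetical store and HTTP stub
const persons$: Observable<Person[]> = of([
  { id: 1, name: 'Ada' },
  { id: 2, name: 'Linus' },
]);
const getAddress = (id: number): Observable<Address> =>
  of({ street: `${id} Main St` });

const emitted: unknown[] = [];
persons$
  .pipe(
    // mergeMap treats the returned array as its inner source and
    // emits each element of it as-is
    mergeMap((persons) =>
      persons.map((p) =>
        getAddress(p.id).pipe(map((address) => ({ ...p, ...address })))
      )
    )
  )
  .subscribe((value) => emitted.push(value));

// Each emitted value is an unsubscribed Observable, not a person
const allAreObservables = emitted.every((value) => isObservable(value));
```

Two values come out, one per person, and both are Observables. Since nothing subscribes to them, the address lookups in this sketch never run.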

forkJoin:

Another well-known option is forkJoin, a creation function often used when working with an array of Observables that complete (the word “complete” is crucial, as forkJoin will only emit once all Observables have completed).

Here’s an example using forkJoin:

personWithAddress$ = this.persons$.pipe(
  mergeMap((persons) =>
    forkJoin(
      persons.map((p) =>
        this.http.getAddress(p.id).pipe(map((address) => ({ ...p, ...address })))
      )
    )
  )
);

This code compiles and works as intended. However, it can be difficult to read and debug due to the high level of nesting inside the block of code.
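As a rough, self-contained illustration of the completion requirement, the sketch below runs the same pipeline against a hypothetical `getAddress` stub backed by Subjects, so each response can be sent and completed by hand. The `responses` map and the sample data are assumptions made for this example only.

```typescript
import { Observable, Subject, forkJoin, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

interface Person { id: number; name: string; }
interface Address { street: string; }
type PersonWithAddress = Person & Address;

// Hypothetical store and manually controlled HTTP stub
const persons$: Observable<Person[]> = of([
  { id: 1, name: 'Ada' },
  { id: 2, name: 'Linus' },
]);
const responses = new Map<number, Subject<Address>>();
function getAddress(id: number): Observable<Address> {
  const response$ = new Subject<Address>();
  responses.set(id, response$);
  return response$;
}

const results: PersonWithAddress[][] = [];
persons$
  .pipe(
    mergeMap((persons) =>
      forkJoin(
        persons.map((p) =>
          getAddress(p.id).pipe(map((address) => ({ ...p, ...address })))
        )
      )
    )
  )
  .subscribe((list) => results.push(list));

// Person 2 answers and completes first: forkJoin still waits for person 1
responses.get(2)!.next({ street: '2 Second St' });
responses.get(2)!.complete();
const emissionsAfterFirstResponse = results.length;

// Once every inner Observable has completed, a single array is emitted
responses.get(1)!.next({ street: '1 First St' });
responses.get(1)!.complete();
const names = results[0].map((p) => p.name);
```

The resulting array follows the order of the input array rather than the order in which the responses arrived.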

The final trick:

One very important piece of advice to remember when working with streams of object arrays is to first flatten the structure to work with a stream of simple objects. Working with arrays can create a lot of overhead, while working with a simple stream of objects is what we are used to working with every day.

The two magical operators for achieving this are mergeAll and toArray. The first one flattens the array into a stream of simple objects, and the second one collects the stream back into an array when it’s finished. Note that toArray emits only once its source has completed, so the stream feeding it must complete.
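In isolation, and on plain numbers rather than persons, the pair can be sketched as follows. The values are arbitrary and only serve to show the shape of the stream at each step.

```typescript
import { of } from 'rxjs';
import { map, mergeAll, toArray } from 'rxjs/operators';

// mergeAll turns one emission of number[] into one emission per number
const flattened: number[] = [];
of([1, 2, 3])
  .pipe(mergeAll())
  .subscribe((n) => flattened.push(n));

// Between mergeAll and toArray, operators see single items;
// toArray gathers them back into one array when the source completes
let collected: number[] = [];
of([1, 2, 3])
  .pipe(
    mergeAll(),
    map((n) => n * 10),
    toArray()
  )
  .subscribe((list) => {
    collected = list;
  });
```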

Here’s an example of how we can use these operators in action:

personWithAddress$ = this.persons$.pipe(
  mergeAll(), // flatten to Observable<Person>
  mergeMap((p) =>
    this.http.getAddress(p.id).pipe(map((address) => ({ ...p, ...address })))
  ),
  toArray() // back to Observable<PersonWithAddress[]>
);
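One caveat follows from the completion rule: if persons$ comes from a store that never completes, a toArray placed on the outer stream never emits. A possible workaround, sketched here under assumptions, is to flatten and collect inside an inner pipe that does complete. The `store$` Subject and the `getAddress` stub are hypothetical, and choosing `switchMap` is my assumption about the desired behavior when the store emits again.

```typescript
import { Observable, Subject, from, of } from 'rxjs';
import { map, mergeAll, mergeMap, switchMap, toArray } from 'rxjs/operators';

interface Person { id: number; name: string; }
interface Address { street: string; }
type PersonWithAddress = Person & Address;

// Hypothetical long-lived store (never completes) and HTTP stub
const store$ = new Subject<Person[]>();
const getAddress = (id: number): Observable<Address> =>
  of({ street: `${id} Main St` });
const withAddress = (p: Person): Observable<PersonWithAddress> =>
  getAddress(p.id).pipe(map((address) => ({ ...p, ...address })));

// Outer toArray: waits for store$ to complete, which never happens
const outer: PersonWithAddress[][] = [];
store$
  .pipe(mergeAll(), mergeMap(withAddress), toArray())
  .subscribe((list) => outer.push(list));

// Inner toArray: from(persons) completes, so one array per store emission
const inner: PersonWithAddress[][] = [];
store$
  .pipe(
    switchMap((persons) =>
      from(persons).pipe(mergeMap(withAddress), toArray())
    )
  )
  .subscribe((list) => inner.push(list));

store$.next([
  { id: 1, name: 'Ada' },
  { id: 2, name: 'Linus' },
]);
```

After the single `next` call, `inner` holds one array of two persons while `outer` is still empty. If only the first emission of the store matters, adding `take(1)` before `mergeAll()` is another way to make the outer stream complete. Also keep in mind that mergeMap emits results in arrival order, so concatMap may be a better fit when the original order must be preserved.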

Advantages of using this method:

Only one level of nesting.

Working with a simple object is preferred over working with an array of objects.

That’s it for this article! I hope you have learned a new trick to ease your usage of RxJS and become a better reactive programmer. 🚀

You can find me on Medium, Twitter or Github. Don’t hesitate to reach out to me if you have any questions.

Flattening and Transforming Observable Arrays with RxJs was originally published in ngconf on Medium, where people are continuing the conversation by highlighting and responding to this story.
