Higher Order Observables
The term higher order (something) probably sounds a lot fancier than it really is. It is also really common to hear the term higher order functions in programming. The basic idea of a higher order function is that it does either of the following (or both):
- Accepts a function as a parameter
- Returns a function
A higher order function is a function that returns a function, or accepts a function as a parameter. A higher order observable is an observable that emits observables.
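To make the two definitions concrete, here is a minimal sketch. The names twice, addTwo and higherOrder$ are made up for illustration, and the block assumes RxJS is installed.

```typescript
import { Observable, isObservable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Higher order function: accepts a function AND returns a function.
const twice = (fn: (n: number) => number) => (n: number): number => fn(fn(n));
const addTwo = twice((n) => n + 1);

// Higher order observable: every emission is itself an observable.
const higherOrder$: Observable<Observable<number>> = of(1, 2, 3).pipe(
  map((n) => of(n * 10))
);

// Collect what the outer observable emits and check what kind of thing it is.
const emissions: unknown[] = [];
higherOrder$.subscribe((inner$) => emissions.push(inner$));
const allObservables = emissions.every((emission) => isObservable(emission));

console.log(addTwo(1), emissions.length, allObservables);
```

Subscribing to higherOrder$ hands us three observables rather than the numbers 10, 20 and 30. That is exactly the situation the rest of this section deals with.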
Most of what we have looked at so far deals with observables that emit simple values. For example, this:

    const myObservable$ = from([1, 2, 3, 4, 5]);

is an observable that emits numbers:

    1
    2
    3
    4
    5

Let’s imagine a different scenario. We are going to borrow the general concept of the example in the documentation. Let’s say we have an observable of API URLs that we want to make requests to in order to fetch some data:

    const myObservable$ = from([
      'https://api-one.com/data',
      'https://api-two.com/data',
      'https://api-three.com/data',
      'https://api-four.com/data',
    ]);

Everything is looking pretty normal so far: we have an observable that emits four strings:

    https://api-one.com/data
    https://api-two.com/data
    https://api-three.com/data
    https://api-four.com/data

That’s not of much use to us, because we want the actual data from those URLs. This is a good scenario for our pipe: we want to transform this stream in some way. We want to transform it from an observable of URLs into an observable of data fetched from those URLs.
The first thing that might spring to mind is to use the map operator:

    myObservable$.pipe(
      map((url) => this.http.get(url))
    );

Now we are taking the URL that is passed into the map and making an HTTP request with Angular’s HttpClient to fetch the data for us. This looks promising, but there is a problem…
A get request made with the HttpClient will return an observable. What we
want is the data from the HTTP request being emitted on this stream, but if we
subscribe to the stream above what we actually get is:

    Observable
    Observable
    Observable
    Observable

We still need to subscribe to those observables to execute the HTTP request and to get the data out of them. So, how should we handle that? The most natural step might then be to… just subscribe to them. Maybe we do something like this:

    myObservable$.pipe(
      map((url) => this.http.get(url))
    ).subscribe((result) => {
      result.subscribe((data) => console.log(data));
    });

This would work… but it’s getting a bit ugly with nested subscribes. Not only that, we probably want to do something more than just console.log this data, but how do we use it? Maybe we take each emission and push it into an array:

    const results = [];

    myObservable$.pipe(
      map((url) => this.http.get(url))
    ).subscribe((result) => {
      result.subscribe((data) => results.push(data));
    });

In case my tone hasn’t been obvious: you absolutely should not do this. It works, and results will eventually contain the results of all the HTTP requests, but it is messy and is not coding in a reactive way (since we explicitly need to subscribe to pull the data out of streams in order to work with it). We are making things way harder than they need to be.
Flattening Operators
The whole point of the example above was to show why the flattening
operators we are about to talk about are useful. These operators solve this
situation for us, in different ways depending on what exactly we want to do.
When we have a higher order observable (an observable that emits observables), we can use these operators to subscribe to the inner observables for us. An inner observable is an observable emitted by the outer observable. The operators “flatten” the inner observables by subscribing to them for us and giving us just the simple values from within those streams.
To demonstrate these operators, we are going to continue using the same scenario we just discussed:

    const myObservable$ = from([
      'https://api-one.com/data',
      'https://api-two.com/data',
      'https://api-three.com/data',
      'https://api-four.com/data',
    ]);

We want to transform this stream by taking these values and making HTTP requests to the URLs.
switchMap
Let’s take a look at switchMap first, as it is probably the most common.

    import { switchMap } from 'rxjs/operators';

    myObservable$.pipe(
      switchMap((url) => this.http.get(url))
    ).subscribe((val) => console.log(val));

This example might seem oddly familiar. It is exactly the same as our “bad” example, except that we swapped the normal map operator for switchMap. This will give us the following result:

    (data from api-four)

We kind of get the result we want: the stream now emits the actual data from the URL rather than an observable for the HTTP request that fetches that data. But… it only gives us the last one, the result of making a GET request to https://api-four.com/data.
This is often the behaviour we want, but not for this scenario. What switchMap
will do is the following:
- Receives a new url value from the outer observable (myObservable$)
- Passes the url into the get call, and subscribes to it (the inner observable)
- Emits the value from the inner observable
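The three steps above can be traced with a small sketch. Here fakeGet is a hypothetical stand-in for this.http.get(url) that responds synchronously, and the short strings stand in for the full URLs. Because each inner observable finishes before the next value arrives from the outer observable, nothing gets cancelled in this particular setup.

```typescript
import { from, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical stand-in for this.http.get(url) that responds immediately.
const fakeGet = (url: string) => of(`data from ${url}`);

// Outer observable: emits the four "URLs" one after another.
const urls$ = from(['api-one', 'api-two', 'api-three', 'api-four']);

const received: string[] = [];
urls$
  .pipe(switchMap((url) => fakeGet(url))) // subscribe to the inner observable for us
  .subscribe((data) => received.push(data)); // we only ever see plain values

console.log(received);
```

A real HTTP request does not respond synchronously, which is why the behaviour with http.get() is different.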
If the http.get() call returned instantly/synchronously, then switchMap would indeed emit all the data for us:

    (data from api-one)
    (data from api-two)
    (data from api-three)
    (data from api-four)

But an HTTP request might take some time to complete, maybe even a few seconds. What happens with switchMap is that it will receive its first value and subscribe to the inner http.get() observable. However, if it receives its second value from the outer observable (myObservable$) before it gets a result from the inner observable (http.get), it will cancel the first request and launch a new request for the second value instead. The process will go more like this:
- Receives an api-one value from the outer observable (myObservable$)
- Passes the api-one URL into the get call, and subscribes to it (the inner observable)
- Receives a new api-two value from the outer observable
- Unsubscribes from the http.get() for api-one and subscribes to http.get() for api-two instead
- Receives a new api-three value from the outer observable
- Unsubscribes from the http.get() for api-two and subscribes to http.get() for api-three instead
- Receives a new api-four value from the outer observable
- Unsubscribes from the http.get() for api-three and subscribes to http.get() for api-four instead
- Emits the value from the inner observable for api-four
All of the values get emitted from myObservable$ before any of the http.get() requests has time to finish, so only the request for the last value gets to complete and emit its result (because no more values are coming in that would cause it to be cancelled before it can finish).
This is the special behaviour of switchMap: whenever it receives a new value
it will cancel the previous subscription.
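The cancelling behaviour can be reproduced without real HTTP requests. In this sketch, fakeGet and respond are hypothetical helpers: fakeGet returns a Subject that stays pending until respond is called for that URL, which lets us control when each "request" finishes.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const urls$ = new Subject<string>();
const pending = new Map<string, Subject<string>>();

// Hypothetical stand-in for this.http.get(url): stays pending until respond(url).
const fakeGet = (url: string) => {
  const response$ = new Subject<string>();
  pending.set(url, response$);
  return response$;
};

// Simulates the server answering the request for the given url.
const respond = (url: string) => {
  const response$ = pending.get(url);
  response$?.next(`data from ${url}`);
  response$?.complete();
};

const results: string[] = [];
urls$.pipe(switchMap((url) => fakeGet(url))).subscribe((data) => results.push(data));

// All four values arrive before any request has finished.
['api-one', 'api-two', 'api-three', 'api-four'].forEach((url) => urls$.next(url));

// The first three responses go nowhere: switchMap already unsubscribed from them.
respond('api-one');
respond('api-two');
respond('api-three');
respond('api-four');

console.log(results);
```

Only the api-four response reaches the subscriber. With Angular's HttpClient, unsubscribing also aborts the underlying request, whereas in this sketch the earlier responses are simply never listened to.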
Although this is not suited to this particular situation, it often is the appropriate flattening operator to use. For example, imagine the scenario of a user entering a search term into a search box. When they enter a value, we make an API request to get the results. If the user enters:

    socks

We will take that value and use it to launch an HTTP request to get the results for socks. But, if the user enters a new search term before that one returns:

    pants

It doesn’t make any sense to finish the request for socks, because we don’t need it any more. We would use a switchMap to cancel the in-progress request and just return the results for the latest one instead.
concatMap
With switchMap explained, we should be able to go a little lighter on the
explanations for all the rest. They all do basically the same thing, they are
only differentiated by the way they handle the situation of new values emitted
from the outer observable before the inner observable has completed. As
we saw with switchMap, it will cancel older values and switch to the new value
instead.
Let’s see what happens with concatMap:

    import { concatMap } from 'rxjs/operators';

    myObservable$.pipe(
      concatMap((url) => this.http.get(url))
    ).subscribe((val) => console.log(val));

Again, it looks exactly the same apart from the fact we are using the concatMap operator. The result this will give us is:

    (data from api-one)
    (data from api-two)
    (data from api-three)
    (data from api-four)

That seems like what we want! It would work, but it’s actually not the most efficient way to do this. Imagine that each http.get() takes 1000ms to complete. The result of this stream would actually look like this:

    (data from api-one)   // arrives after 1 second
    (data from api-two)   // arrives after 2 seconds
    (data from api-three) // arrives after 3 seconds
    (data from api-four)  // arrives after 4 seconds

In total, it takes us 4000ms to get all of the results. The special behaviour of concatMap is that it will wait for each inner observable to complete before moving on to the next value. It sort of creates a nice orderly queue as additional values arrive, like a bank or a post office (that can only help one customer at a time). The process in more detail would look like this:
- Receives an api-one value from the outer observable (myObservable$)
- Passes the api-one URL into the get call, and subscribes to it (the inner observable)
- Receives a new api-two value from the outer observable
- Receives a new api-three value from the outer observable
- Receives a new api-four value from the outer observable
- Emits the result of the api-one URL
- Passes the api-two URL into the get call, and subscribes to it (the inner observable)
- Emits the result of the api-two URL
- Passes the api-three URL into the get call, and subscribes to it (the inner observable)
- Emits the result of the api-three URL
- Passes the api-four URL into the get call, and subscribes to it (the inner observable)
- Emits the result of the api-four URL
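The queueing in the steps above can be observed directly. This is a sketch using hypothetical fakeGet and respond helpers in place of real HTTP requests. The started array records the moment each "request" is actually launched.

```typescript
import { Subject } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const urls$ = new Subject<string>();
const pending = new Map<string, Subject<string>>();
const started: string[] = [];

// Hypothetical stand-in for this.http.get(url); records when it gets called.
const fakeGet = (url: string) => {
  started.push(url);
  const response$ = new Subject<string>();
  pending.set(url, response$);
  return response$;
};

// Simulates the server answering the request for the given url.
const respond = (url: string) => {
  const response$ = pending.get(url);
  response$?.next(`data from ${url}`);
  response$?.complete();
};

const results: string[] = [];
urls$.pipe(concatMap((url) => fakeGet(url))).subscribe((data) => results.push(data));

// All four values arrive up front, but only the first request is launched.
['api-one', 'api-two', 'api-three', 'api-four'].forEach((url) => urls$.next(url));
const startedWhileWaiting = [...started];

// Each completed response lets the next queued request start.
respond('api-one');
respond('api-two');
respond('api-three');
respond('api-four');

console.log(startedWhileWaiting, started, results);
```

While the first request is pending, startedWhileWaiting contains only api-one. The other three URLs are held in the queue until their turn comes.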
Our concatMap receives all of the values very quickly, but it just deals with
them one at a time. Once one is done, the next one is started. This behaviour is
desirable where you want the observables to complete in order, but in this
case we probably don’t care about the order, we just want them to be all fetched
as fast as possible and at the same time.
That’s where our next operator comes into play.
mergeMap
The mergeMap operator is very similar to concatMap — it will also
subscribe to and emit the values for all inner observables. The difference
is that a mergeMap is not like an orderly queue at the post office or bank.
Maybe it’s more like a really good bank where everyone gets served instantly
by different tellers.
A mergeMap will instantly subscribe to the inner observable for any new
values it receives, and will emit the results as soon as it has them. It doesn’t
care about order or timing:

    import { mergeMap } from 'rxjs/operators';

    myObservable$.pipe(
      mergeMap((url) => this.http.get(url))
    ).subscribe((val) => console.log(val));

In our last example, we assumed all requests took 1000ms to perform. This isn’t really realistic. The time it takes for a request to complete is going to vary. Let’s say each request might take anywhere between 800ms and 1200ms. With mergeMap, this might give us a result like this:

    (data from api-one)   // arrives after 0.8 seconds
    (data from api-two)   // arrives after 0.9 seconds
    (data from api-three) // arrives after 0.9 seconds
    (data from api-four)  // arrives after 1.1 seconds

or it might result in this:

    (data from api-one)   // arrives after 0.8 seconds
    (data from api-four)  // arrives after 0.8 seconds
    (data from api-three) // arrives after 1 second
    (data from api-two)   // arrives after 1 second

Notice that in this case the api-four result is received second. The mergeMap operator doesn’t care about the order: every inner observable gets subscribed to as soon as its value arrives, and each result is emitted as soon as it is available.
You can use this operator when the order doesn’t matter. This is probably the most suitable operator for this example, since we get all of the data from all of the URLs as quickly as possible, but it really depends on the specific situation.
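To see the "everyone gets served instantly" behaviour, here is a sketch that again uses hypothetical fakeGet and respond helpers instead of real HTTP requests. The responses are deliberately delivered out of order, matching the second result shown above.

```typescript
import { Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const urls$ = new Subject<string>();
const pending = new Map<string, Subject<string>>();
const started: string[] = [];

// Hypothetical stand-in for this.http.get(url); records when it gets called.
const fakeGet = (url: string) => {
  started.push(url);
  const response$ = new Subject<string>();
  pending.set(url, response$);
  return response$;
};

// Simulates the server answering the request for the given url.
const respond = (url: string) => {
  const response$ = pending.get(url);
  response$?.next(`data from ${url}`);
  response$?.complete();
};

const results: string[] = [];
urls$.pipe(mergeMap((url) => fakeGet(url))).subscribe((data) => results.push(data));

// Every request is launched the moment its value arrives.
['api-one', 'api-two', 'api-three', 'api-four'].forEach((url) => urls$.next(url));
const startedImmediately = [...started];

// Responses come back in whatever order the "servers" finish.
respond('api-one');
respond('api-four');
respond('api-three');
respond('api-two');

console.log(startedImmediately, results);
```

All four requests are in flight at once, and results ends up in response order (one, four, three, two) rather than request order.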
exhaustMap
The last one we are going to look at is probably the least used (at least in my
experience) but can still be useful. This one is more similar in spirit to
switchMap. To recap, switchMap will cancel any inner observable in progress
if a new value arrives from the outer observable. The exhaustMap operator will
ignore values from the outer observable if it already has an active
subscription for the inner observable. Once the inner observable completes, it
will start accepting values from the outer observable again, but any values that
were emitted before the completion of the inner observable will be missed.
To use our bank/post office analogy again, imagine you arrive at the bank but there is a customer already being served. Too bad, you’re just going to go home and you never get served. Maybe the next customer to come along is a bit luckier than you. When they arrive the first customer has finished being served, so now the customer that just arrived will get served.
Unlike the concatMap which will store/queue values that arrive during an
active subscription, and then address them later, the exhaustMap will just
drop them completely if they arrive during an active subscription:

    import { exhaustMap } from 'rxjs/operators';

    myObservable$.pipe(
      exhaustMap((url) => this.http.get(url))
    ).subscribe((val) => console.log(val));

This might lead to a situation like this:

    (data from api-one)  // arrives after 1 second
    (data from api-four) // arrives after 2 seconds

Whilst the first request is being processed, the values for api-two and api-three might be received. However, since the inner observable/HTTP request for api-one hasn’t completed yet, these values are ignored. If the api-four value arrives after the api-one request has been completed, then it will be processed. Depending on the timing of things, we might just end up with only the first api-one request being successfully completed.
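The scenario just described can be sketched with the same kind of hypothetical fakeGet and respond helpers standing in for real HTTP requests. Here api-two and api-three arrive while api-one is still pending, and api-four arrives after it has completed.

```typescript
import { Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

const urls$ = new Subject<string>();
const pending = new Map<string, Subject<string>>();
const started: string[] = [];

// Hypothetical stand-in for this.http.get(url); records when it gets called.
const fakeGet = (url: string) => {
  started.push(url);
  const response$ = new Subject<string>();
  pending.set(url, response$);
  return response$;
};

// Simulates the server answering the request for the given url.
const respond = (url: string) => {
  const response$ = pending.get(url);
  response$?.next(`data from ${url}`);
  response$?.complete();
};

const results: string[] = [];
urls$.pipe(exhaustMap((url) => fakeGet(url))).subscribe((data) => results.push(data));

urls$.next('api-one');   // no active inner subscription: request is launched
urls$.next('api-two');   // api-one still pending: dropped
urls$.next('api-three'); // api-one still pending: dropped
respond('api-one');      // api-one completes, exhaustMap accepts values again
urls$.next('api-four');  // request is launched
respond('api-four');

console.log(started, results);
```

No request is ever made for api-two or api-three. Unlike concatMap, nothing is queued for later.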
Recap
All of the following questions assume we are dealing with higher-order observables: