Most of the common RxJS operators are about transformation, combination or filtering, but this lesson is about a new category, error handling operators, and its most important operator: catch().
I thought if we pass in the second arg to catch, it will return the current Observable and when the error no longer happens, it will continue with the flow. That doesn't appear to be the case. Is there a way around this, so for instance, if we had:
const str1$ = Rx.Observable.of('a','b', 33, 'c', 'd');
const upper1$ = str1$.map(x => x.toUpperCase());
upper1$.catch((e, obs) => obs).subscribe(
x => console.log("E: ", x),
e => console.log(e.toString()),
c => console.log('Complete j!')
I was hoping that it would hit the "catch" when it got to 33, which it did.. but then return the Observable, continue to "c", the "d" and complete.... no.
Hi Dean. The catch operator will subscribe to the returned value in its function, and by default observables in RxJS are cold, which is also the case of this example.
str1$ is cold,
upper1$ is cold, so
obs will refer to
upper1$. We have lessons in Egghead about cold and hot observables, but to summarise, you can understand "cold observable" as a video which can be watched ("subscribed"). So this makes the catch semantics easier to comprehend: upon an error while watching
upper1$, cancel the current view of that video and start watching
upper1$ video. This should, as with videos, produce a looping behavior of
To fix this issue, you want to make the source observable hot. The analogy here is a live video which is playing regardless of viewers. You can do that by using
.publish().refCount(). This will change the semantics of
subscribe so that it won't mean "start the live video from the beginning", it will mean just "observe the live video currently being played". We need to do one more fix: since
Observable.of emits synchronously, then there may a race condition where
catch re-subscribes too late. We can affect the sequencing of synchronous computations by choosing a different scheduler, in this case we use the
queue scheduler. The code below gives you the behavior you were looking for.
const str1$ = Rx.Observable.of('a','b', 33, 'c', 'd', Rx.Scheduler.queue) .publish().refCount(); const upper1$ = str1$.map(x => x.toUpperCase()); upper1$.catch((e, obs) => obs).subscribe( x => console.log(x), e => console.log(e.toString()), c => console.log('Complete j!') )
We only had to modify the definition of
str1$, not the others.