Understand RxJS Operators

André Staltz

We have covered the basics of Observable.create and other creation functions. Now let's finally dive into operators, which are the focus of this course. We will see how operators are simple pure functions attached to the Observable type.

🚨 Since we are importing interval from RxJS, we don't need to preface our Observables with Rx.Observable. Observable.create is deprecated; use new Observable instead. You can no longer chain .{operator} directly on an Observable; you need to use .pipe({operator}) instead.

[00:00] The focus of this RxJS course is operators. Operators are functions attached to the observable type. There is, for instance, foo.map, foo.filter, foo.merge, foo.combineLatest. Each of these is a function which takes a source observable as input and outputs a result observable. They all look a bit like this. They take a source observable as input, and they're supposed to return a result observable. That result is calculated somehow depending on the source.

[00:37] Each of these operators is a pure or immutable operation, which means that the source stays untouched. Even though the operator is attached to the source observable, for instance, when you call foo.combineLatest, foo stays completely untouched. We're not changing it. What we're getting is instead a result from this function call. That bar is now an observable which we can use or, for instance, subscribe to.
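As an analogy from plain arrays (not RxJS code), `Array.prototype.map` is pure in the same sense: it returns a new array and leaves the one it was called on unchanged. The names `foo` and `bar` simply mirror the transcript.

```javascript
// Analogy only: an array stands in for the source here, not an observable.
const foo = [1, 2, 3];

// map is attached to foo, but it does not modify foo; it returns a new array.
const bar = foo.map((x) => x * 10);

console.log(foo); // [ 1, 2, 3 ]
console.log(bar); // [ 10, 20, 30 ]
```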

[01:08] For demonstration purposes, let's make a multiply by 10 operator. It's going to depend on this foo observable. It's basically going to multiply each of these numbers by 10. Every operator is a function that takes a source observable as input, and it should return this result observable.

[01:32] How do we create the result observable? We've seen that we can use Rx.Observable.create and give it a function, which is the subscribe function, with the observer as its argument. What are we supposed to do inside here? We need to depend on the source. The only way that we can use an observable is by subscribing to it. Let's do that.

[01:55] Here, we're going to give the three callbacks. For the next value, we want to get that value from the source, multiply it by 10, and deliver that to the observer for this result observable. We're also going to handle errors. We're simply going to forward them to the result. Also, we're going to handle complete.

[02:33] Instead of subscribing to foo, if I call this multiply by 10 giving foo as the source, and I'll replace foo with bar, instead of one, two, three, four, five, we see 10, 20, 30, 40, and 50. Multiply by 10 is a bit of a free-standing operator. It's not actually attached to the observable type like combineLatest is. For demonstration purposes, we can also attach this multiply by 10 directly on the observable type.
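The free-standing operator described so far might look roughly like the sketch below. To keep it runnable without installing RxJS, it uses a tiny hypothetical `MiniObservable` class as a stand-in for the RxJS Observable (only `subscribe` is modeled), and `foo` emits its values synchronously rather than on a timer. The `received` array is also just an illustration device.

```javascript
// Hypothetical stand-in for the RxJS Observable: it only stores a subscribe function.
class MiniObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    this.subscribeFn(observer);
  }
}

// Source: emits 1 to 5 synchronously, then completes.
const foo = new MiniObservable((observer) => {
  for (let i = 1; i <= 5; i++) observer.next(i);
  observer.complete();
});

// Free-standing operator: source observable in, result observable out.
function multiplyBy10(source) {
  const result = new MiniObservable((observer) => {
    // The only way to use the source is to subscribe to it.
    source.subscribe({
      next: (x) => observer.next(x * 10),      // transform each value
      error: (err) => observer.error(err),     // forward errors unchanged
      complete: () => observer.complete(),     // forward completion
    });
  });
  return result;
}

const bar = multiplyBy10(foo);

const received = [];
bar.subscribe({
  next: (x) => received.push(x),
  error: (err) => console.error(err),
  complete: () => received.push('done'),
});

console.log(received); // [ 10, 20, 30, 40, 50, 'done' ]
```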

[03:07] This is not actually recommended in production or any kind of serious app. Here, we're just going to monkey patch the observable type to have this multiply by 10 as an operator there.

[03:23] Now, we need to make a small modification. Since this is now attached to the prototype, we can get the source not as an argument, but simply as the this keyword. That's our source now.

[03:39] Then, we can call multiply by 10 in a slightly different way here since it's now attached to every observable. Foo is an observable, so we have multiply by 10 available there for us. It takes no arguments now, as we see. That's what bar is going to be. This still works.
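A minimal sketch of the prototype-attached variant, for demonstration only. It again assumes a hypothetical `MiniObservable` class in place of the RxJS Observable, so that it runs on its own. Note the regular `function` expression: an arrow function would not receive the observable as `this`.

```javascript
// Hypothetical stand-in for the RxJS Observable: it only stores a subscribe function.
class MiniObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    this.subscribeFn(observer);
  }
}

// Monkey patch (demonstration only): attach the operator to the prototype.
MiniObservable.prototype.multiplyBy10 = function () {
  const source = this; // the observable the method was called on
  return new MiniObservable((observer) => {
    source.subscribe({
      next: (x) => observer.next(x * 10),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
  });
};

// Source: emits 1 to 5 synchronously, then completes.
const foo = new MiniObservable((observer) => {
  for (let i = 1; i <= 5; i++) observer.next(i);
  observer.complete();
});

// The operator is now called on foo and takes no arguments.
const bar = foo.multiplyBy10();

const received = [];
bar.subscribe({
  next: (x) => received.push(x),
  error: (err) => console.error(err),
  complete: () => {},
});

console.log(received); // [ 10, 20, 30, 40, 50 ]
```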

[03:58] We can take this even further a bit. We can allow some arguments here. For instance, we could allow not just multiply by 10, but multiply by any number. Then, we can provide that multiplier as an argument like this. Then, we can use that here. Then, we can give that number directly as an argument. As we can see, we can also change this from 10 to 100, and it still works.
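The parameterized version could be sketched as follows, under the same assumption of a hypothetical `MiniObservable` stand-in for the RxJS Observable. The operator name `multiplyBy` and the parameter name `multiplier` are illustrative choices.

```javascript
// Hypothetical stand-in for the RxJS Observable: it only stores a subscribe function.
class MiniObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    this.subscribeFn(observer);
  }
}

// Demonstration-only monkey patch: the multiplier is now an argument.
MiniObservable.prototype.multiplyBy = function (multiplier) {
  const source = this;
  return new MiniObservable((observer) => {
    source.subscribe({
      next: (x) => observer.next(x * multiplier),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
  });
};

// Source: emits 1 to 5 synchronously, then completes.
const foo = new MiniObservable((observer) => {
  for (let i = 1; i <= 5; i++) observer.next(i);
  observer.complete();
});

const bar = foo.multiplyBy(100);

const received = [];
bar.subscribe({
  next: (x) => received.push(x),
  error: (err) => console.error(err),
  complete: () => {},
});

console.log(received); // [ 100, 200, 300, 400, 500 ]
```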

[04:31] What is going on here is that we have a chain of subscriptions. Notice that when I subscribe to bar, internally, the subscription of bar, which is result, will do a subscription to source, which is foo in our case. It means that when we subscribe to bar, it will then trigger a subscription to foo. That's what it means to have a subscription chain, because we are triggering these subscriptions upstream towards the source.

[05:07] If you have many operators in a chain like this, with some arguments in between, then it means that once you subscribe to the observable that this returns, that will subscribe to this one, which will subscribe to this one, which will subscribe to that.
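One way to observe the subscription chain is to record the order in which each subscribe function runs. This is a sketch under the same assumptions as before: `MiniObservable` is a hypothetical stand-in for the RxJS Observable, and the `log` array exists only for illustration.

```javascript
// Hypothetical stand-in for the RxJS Observable: it only stores a subscribe function.
class MiniObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    this.subscribeFn(observer);
  }
}

const log = []; // records which subscribe function ran, in order

MiniObservable.prototype.multiplyBy = function (multiplier) {
  const source = this;
  return new MiniObservable((observer) => {
    log.push(`multiplyBy(${multiplier})`);
    // Subscribing to the result triggers a subscription to its source.
    source.subscribe({
      next: (x) => observer.next(x * multiplier),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
  });
};

const foo = new MiniObservable((observer) => {
  log.push('foo');
  observer.next(1);
  observer.next(2);
  observer.complete();
});

// Building the chain subscribes to nothing yet.
const bar = foo.multiplyBy(10).multiplyBy(2);
console.log(log.length); // 0

const values = [];
bar.subscribe({
  next: (x) => values.push(x),
  error: (err) => console.error(err),
  complete: () => {},
});

// Subscriptions travel upstream, from the last operator back to the source.
console.log(log.join(' -> ')); // multiplyBy(2) -> multiplyBy(10) -> foo
console.log(values);           // [ 20, 40 ]
```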

[05:25] We saw that an operator is a function attached to each observable, which will return a new observable (that's important to remember), leaving the source observable untouched.