Pipe events to numbers and maintain a running count using the scan operator

Instructor: Rares Matei

Published 5 years ago
Updated 4 years ago

In this lesson, we will use the simple data sources we created earlier to build a more specialized stream that gives us the current count of tasks that are in progress.

Instructor: [0:00] Let's look at the first problem we have to solve. I'll paste it here so we can follow more easily. I'll use the raw initial sources that I have up here to create a more specialized load up observable that emits a one anytime a task starts. I'll do the same thing for a load down observable that emits a minus one anytime a task completes.
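As a rough sketch of this step: the transcript does not show the code, so the identifiers below (`taskStarts`, `taskCompletions`, `loadUp`, `loadDown`) are assumed spellings, and the two `Subject`s are hypothetical stand-ins for the raw sources created earlier.

```typescript
import { Subject } from "rxjs";
import { mapTo } from "rxjs/operators";

// Hypothetical stand-ins for the raw sources from the earlier lesson.
const taskStarts = new Subject<void>();
const taskCompletions = new Subject<void>();

// Every task start becomes a 1, every task completion becomes a -1.
// Note: newer RxJS releases deprecate mapTo in favor of map(() => 1).
const loadUp = taskStarts.pipe(mapTo(1));
const loadDown = taskCompletions.pipe(mapTo(-1));

// Collect what the two specialized streams emit.
const seen: number[] = [];
loadUp.subscribe((n) => seen.push(n));
loadDown.subscribe((n) => seen.push(n));

taskStarts.next();      // a task starts
taskStarts.next();      // another task starts
taskCompletions.next(); // one task completes

console.log(seen); // [1, 1, -1]
```

The payload of the original event is discarded on purpose: from here on, only the direction of the change matters.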

[0:23] Now we can combine these two into an even more useful load variations observable that gives us plus ones and minus ones, depending on how tasks are starting and ending. Notice how I've imported mapTo from the rxjs/operators package, because it's meant to be piped.

[0:43] Merge, because we're actually using it to create a brand new observable, is imported from the root rxjs package. Let's celebrate progress. We're already in a much better position than when we started.
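The import split described here might look like the sketch below. As before, `taskStarts` and `taskCompletions` are hypothetical stand-ins for the raw sources, and the camel-cased names are assumed spellings of what is said in the video.

```typescript
// Creation functions such as merge come from the root package...
import { Subject, merge } from "rxjs";
// ...while pipeable operators such as mapTo come from rxjs/operators.
import { mapTo } from "rxjs/operators";

// Hypothetical stand-ins for the raw sources.
const taskStarts = new Subject<void>();
const taskCompletions = new Subject<void>();

const loadUp = taskStarts.pipe(mapTo(1));
const loadDown = taskCompletions.pipe(mapTo(-1));

// merge takes observables as arguments and returns a new observable
// that forwards values from both, in the order they arrive.
const loadVariations = merge(loadUp, loadDown);

const variations: number[] = [];
loadVariations.subscribe((v) => variations.push(v));

taskStarts.next();      // +1
taskCompletions.next(); // -1
taskStarts.next();      // +1

console.log(variations); // [1, -1, 1]
```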

[0:53] This observable is actually all we need from now on to solve our problem, so we can forget anything that we have up here. I'll make this more obvious and, from now on, use a special comment to mark that we can stop worrying about anything we have above it.

[1:09] This helps us work in a very restricted space. My cognitive demand is much lower when I can be sure that all the context I need to keep in my head is what I have highlighted, versus this whole page. Because we're pretending that we don't have access to what is above this, variable names are very important when we work like this.

[1:29] We shouldn't really have to go back up to see how all of this works. It should make sense from its name. We'll consider our current problem solved once we have this: an observable that gives us the current load count of tasks in our app. We'll start with our load variations observable.

[1:47] Because we need to maintain a running count between emissions, I'm going to pipe that to a scan. I'll quickly go up and import it. Now scan has the same API as the reduce array method. It accepts a function, which will receive two arguments, the total current number of loads and the change in loads, our variation that we get from here.

[2:14] What it's going to return is the previous load count, to which we will add the new change in the number of loads. It also accepts a starting value after the function, and we want it to start from zero.
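To make the comparison with the array method concrete, here is a minimal sketch that feeds the same accumulator function and the same starting value to both `reduce` and `scan`. The `changes` array is made-up sample data, and the parameter names are assumed spellings of the ones mentioned in the video.

```typescript
import { from } from "rxjs";
import { scan } from "rxjs/operators";

// Made-up sequence of load variations: +1 per start, -1 per completion.
const changes = [1, 1, -1, 1, -1, -1];

// Array reduce: accumulator function first, starting value second,
// but it only produces the final total.
const finalLoad = changes.reduce(
  (totalCurrentLoads, changeInLoads) => totalCurrentLoads + changeInLoads,
  0
);

// scan: same arguments, but it emits the running total after every change.
const runningLoads: number[] = [];
from(changes)
  .pipe(
    scan((totalCurrentLoads, changeInLoads) => totalCurrentLoads + changeInLoads, 0)
  )
  .subscribe((count) => runningLoads.push(count));

console.log(finalLoad);    // 0
console.log(runningLoads); // [1, 2, 1, 2, 1, 0]
```

The intermediate totals are what make `scan` a fit here: the count has to be available after every task start or completion, not only once the stream ends.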

[2:27] Just to quickly go back and recap, the moment a task starts, it will get mapped to a number one, so load variations will emit a one, which will, in turn, increase the count by one. If a task ends, it will get mapped to a minus one, so load variations will emit a minus one, which will decrease our count by one.
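The recap can be traced end to end with a small sketch. The two `Subject`s are hypothetical stand-ins for the raw sources, and `currentLoadCount` is an assumed name for the final stream.

```typescript
import { Subject, merge } from "rxjs";
import { mapTo, scan } from "rxjs/operators";

// Hypothetical stand-ins for the raw sources.
const taskStarts = new Subject<void>();
const taskCompletions = new Subject<void>();

// Starts become +1, completions become -1, merged into one stream.
const loadVariations = merge(
  taskStarts.pipe(mapTo(1)),
  taskCompletions.pipe(mapTo(-1))
);

// Running count of in-progress tasks, starting from zero.
const currentLoadCount = loadVariations.pipe(
  scan((totalCurrentLoads, changeInLoads) => totalCurrentLoads + changeInLoads, 0)
);

const counts: number[] = [];
currentLoadCount.subscribe((count) => counts.push(count));

taskStarts.next();      // count goes to 1
taskStarts.next();      // count goes to 2
taskCompletions.next(); // count goes to 1
taskCompletions.next(); // count goes to 0

console.log(counts); // [1, 2, 1, 0]
```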

[2:44] We started from some very raw streams, and we used those to create two more specialized streams. Then we combined those to create an even more useful stream, all the way up to this: a stream that, whenever somebody subscribes to it, gives them the current number of loads in our application.