Guarantee Ordering of Events with RxJS

Published 6 years ago
Updated 4 years ago

Have you ever had to make sure that certain user events are processed in order? Maybe some of those events were even async. Guaranteeing ordering can be quite tricky, unless you know some RxJS magic. In this lesson I'll briefly show you how to use RxJS's concatMap and switchMap operators to make sure events are processed in the right order, and thus prevent our UI from getting out of sync.

Instructor: [00:01] Let's take a look at this very simple use case, which reflects a potential real-world use case. Up here, in the first panel, the user can make some choices by clicking some checkboxes.

[00:13] Now, if I click a checkbox, some time passes while an async event happens. Once it resolves, the option is added to the list below. Same here with option two. If I uncheck it again, the corresponding option is removed from the list below.

[00:30] Now, let's quickly look at how that happens. Here, we basically have a list handler, which fires whenever I click one of these checkboxes. I get an event in: basically, the checkbox event, as well as the label and the value of that option.

[00:49] Then, whenever that option is checked, I simulate some async event with a timeout. Usually, this could be, for instance, a call to the server with the given ID, because you have to fetch some additional data.

[01:02] For the sake of simplicity, we basically just use a timeout here. After one and a half seconds, the result comes in, and we add that same option to our list of records. When the checkbox event is not checked, I remove the option from the set of records by using the spread operator and filtering out the ID that I unchecked.

[01:26] Now, maybe you have already spotted the potential problem here. The section in between is async. In this case it takes one and a half seconds, but in the real-world case we don't even know how long it will take.

[01:38] The removal, on the other hand, is synchronous, so it happens immediately. Now, have a look at what happens if I do something like a double click. As you can see, the checkbox is unchecked, but the option still gets added to the result.

[01:54] Why is that? Well, the first click of the double click enters here, sets the timeout, and then continues. The second click immediately goes in here, tries to filter out the record, and doesn't find it, so nothing actually happens.

[02:07] Then a bit later, the timeout comes back and adds the record that we just clicked. Of course, this doesn't work as we want. What we somehow have to guarantee is that all of these events happen in order.
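
The race described above can be reproduced without any framework. The following is a minimal sketch, not the lesson's actual component: the `OptionItem` and `SelectionEvent` shapes, the `onCheckListChange` name, and the `FETCH_DELAY_MS` constant are assumptions made for illustration.

```typescript
// Hypothetical shapes: the lesson only says the handler receives the checkbox
// state plus the label and value of the option.
interface OptionItem {
  id: number;
  label: string;
}
interface SelectionEvent {
  checked: boolean;
  option: OptionItem;
}

const FETCH_DELAY_MS = 1500; // stands in for a server call of unknown duration
let records: OptionItem[] = [];

function onCheckListChange(event: SelectionEvent): void {
  if (event.checked) {
    // Async branch: the record is added only after the delay has passed.
    setTimeout(() => {
      records.push(event.option);
    }, FETCH_DELAY_MS);
  } else {
    // Sync branch: runs immediately, possibly before a pending add lands.
    records = [...records.filter((record) => record.id !== event.option.id)];
  }
}

// A double click: check, then immediately uncheck the same option.
const option: OptionItem = { id: 1, label: "Option 1" };
onCheckListChange({ checked: true, option });
onCheckListChange({ checked: false, option });

// The checkbox is unchecked by now, yet the stale add still lands afterwards.
setTimeout(
  () => console.log(records.map((record) => record.label)),
  FETCH_DELAY_MS + 100
);
```

The removal runs while the list is still empty, so it is a no-op, and the delayed add then puts back an option the user has already unchecked.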

[02:21] Basically, if we double click, we have two events: the first one that goes in here, and then the second one. We have to make sure that the second one always happens after the first one. Now, implementing that can actually be quite tricky.

[02:34] We could handle it via some flags, which we set whenever we trigger an async action. Then we would somehow have to remember to do the follow-up action afterwards. What happens if the user clicks, like, four times? We would have to handle that as well.

[02:50] It can get quite tricky. Let's see how we can solve this, actually, by using RxJS. In RxJS, the idea here is that we have some kind of subject, which is basically like a data pipe, to which we pipe in the events. Then we will apply some kind of operators that make sure that these events are processed in sequence.

[03:11] First of all, let me add a selection subject here, which is of type Subject, imported from RxJS. Let's make sure we type it so we don't have to use any. What we want to do is extract this type into a dedicated interface. Let's call it SelectionEvent. Then, obviously, we will use the same one down here, for the event that gets passed into our event handler.
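
As a rough sketch of this refactoring step, a typed subject could look like the following. The field names on `SelectionEvent` and the `OptionItem` type are assumptions; the transcript only names the interface and the subject.

```typescript
import { Subject } from "rxjs";

// Hypothetical shapes; only the names SelectionEvent and selectionSubject
// come from the lesson.
interface OptionItem {
  id: number;
  label: string;
}
interface SelectionEvent {
  checked: boolean;
  option: OptionItem;
}

// Typing the subject means everything downstream sees SelectionEvent, not any.
const selectionSubject = new Subject<SelectionEvent>();

// A subject is both a sink (next) and a source (subscribe).
const received: SelectionEvent[] = [];
selectionSubject.subscribe((event) => received.push(event));

selectionSubject.next({ checked: true, option: { id: 1, label: "Option 1" } });
console.log(received.length);
```

A plain `Subject` delivers values synchronously to its current subscribers, so the event is already in `received` when `next` returns.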

[03:42] So far, we haven't done anything great, just a bit of refactoring. Now, let's define how our selection subject works. We can do that in the constructor, or here in the onInit. Basically, we want to listen to that selection event, so we use selectionSubject.pipe.

[04:01] The pipe allows us to specify different operators that will be applied to our RxJS observable. In the end, we subscribe to perform the actual action. For now, let's leave the implementation empty. Great. In order to guarantee the ordering of the events, we can use a very specific operator from RxJS, which is called concatMap.

[04:23] concatMap gets passed, for instance, the data, which will be our event here. Let's also import it from RxJS. Whenever something comes in, we have to return an observable, and concatMap will then subscribe to these observables in sequence.

[04:44] When multiple calls come in, it internally handles the ordering of these events, and then passes the results along to our subscribe below. In our case, the data is actually our selection event. What we can do here is basically the same as we did in our event handler below.
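
To see the sequencing in isolation, here is a small sketch that is independent of the checkbox example. The `Job` type and the delays are made up for illustration; `concatMap`, `timer`, and `map` are the regular RxJS APIs.

```typescript
import { Subject, timer } from "rxjs";
import { concatMap, map } from "rxjs/operators";

// Hypothetical payload: a name plus how long its async work takes.
interface Job {
  name: string;
  delayMs: number;
}

const jobs = new Subject<Job>();
const finished: string[] = [];

jobs
  .pipe(
    // Each job is mapped to an inner observable. concatMap subscribes to the
    // next inner observable only after the previous one has completed.
    concatMap((job) => timer(job.delayMs).pipe(map(() => job.name)))
  )
  .subscribe((name) => finished.push(name));

// The slow job is emitted first, the fast one right behind it.
jobs.next({ name: "slow", delayMs: 300 });
jobs.next({ name: "fast", delayMs: 0 });

// Results arrive in emission order, not in order of how fast they finish.
setTimeout(() => console.log(finished), 500);
```

Even though the second job needs no time at all, it is queued until the first inner observable completes, so the log shows slow before fast.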

[05:02] Whenever the event is checked, we have to do the async operation; otherwise, we just have to remove that record again from the list. Now, I don't actually want to implement the side effect in here. What I want to do is just categorize what has to be done. I'm returning some kind of what I could call an action, which tells the subscribe what has to be done.

[05:27] This is just an implementation choice I am making. Let's set the type, which in this case will be remove, and the data that we have to remove will be the ID. Let's also filter out the ID here, which can be quite handy. This is actually the same code that we already have below.

[05:45] Here, I'm returning an observable of that ID. Let's also import this one. Then we have to implement the async part. We can just copy over the setTimeout. Now, as you can see, we have to return an observable, so the setTimeout left like this wouldn't work. There are different possibilities for how we could wrap this setTimeout into an observable.

[06:10] There is a specific observable called timer, which we could use with the 1,500 milliseconds again, and then map via the pipe to return what we want to return. We could also explicitly create our observable.
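
The timer variant mentioned here is not the one the lesson goes on to implement, but it could be sketched as follows. The `fetchRecord` name, the `AddAction` shape, the `"ADD"` string, and the `delayMs` parameter are assumptions for illustration.

```typescript
import { Observable, timer } from "rxjs";
import { map } from "rxjs/operators";

interface OptionItem {
  id: number;
  label: string;
}
// Hypothetical action shape: a type tag plus the option to add.
interface AddAction {
  type: "ADD";
  data: OptionItem;
}

// delayMs defaults to the 1,500 ms from the lesson; the parameter itself is
// a hypothetical addition so the usage below can run quickly.
function fetchRecord(option: OptionItem, delayMs = 1500): Observable<AddAction> {
  // timer emits once after the delay and then completes; map swaps the
  // emitted tick for the action we actually care about.
  return timer(delayMs).pipe(
    map((): AddAction => ({ type: "ADD", data: option }))
  );
}

const emitted: AddAction[] = [];
fetchRecord({ id: 1, label: "Option 1" }, 20).subscribe({
  next: (action) => emitted.push(action),
  complete: () => console.log("completed with", emitted.length, "action"),
});
```

Because timer completes on its own and cancels its scheduled work on unsubscribe, this version needs no manual completion or cleanup code.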

[06:25] Let's do that. Let's do an Observable.create. We get an observer here, and inside we can use setTimeout. Inside setTimeout, rather than pushing the record again, because that's something we will handle in the subscribe, we just emit whatever has to be done.

[06:43] Like in this case, we need to return such an action, which has a type of, let's say, add, and the data will be the option, which we get passed in. Something like data.option. We also need to make sure to complete that observer.

[07:00] Obviously, we could type this too, to make sure we are also type safe. In this case, we complete that observable once setTimeout has done its job, because then the observable can be closed, and it's done.

[07:12] Another thing we shouldn't forget is the cleanup. As you know, observables can be canceled. Whenever we cancel it before the setTimeout callback even runs, we don't want it to run at all. Let's save the timeout here.

[07:29] Then below, we can say that whenever unsubscribe is called, which is also a function, we call clearTimeout on that timeout. Let's also add a console.log here, fetching record, just so that when we cancel it later, we can see whether the cancellation took effect.
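
Put together, the hand-written observable with its cleanup might look roughly like this. It is a sketch under assumptions: it uses the `new Observable(...)` constructor, which is the non-deprecated equivalent of `Observable.create` in current RxJS versions, and the `fetchRecord` name, the `AddAction` shape, and the `delayMs` parameter are hypothetical.

```typescript
import { Observable } from "rxjs";

interface OptionItem {
  id: number;
  label: string;
}
interface AddAction {
  type: "ADD";
  data: OptionItem;
}

// delayMs is a hypothetical parameter; it defaults to the lesson's 1,500 ms.
function fetchRecord(option: OptionItem, delayMs = 1500): Observable<AddAction> {
  return new Observable<AddAction>((observer) => {
    const timeout = setTimeout(() => {
      console.log("fetching record");
      // Emit what has to be done instead of mutating the list here.
      observer.next({ type: "ADD", data: option });
      // Nothing more will come, so the observable can be closed.
      observer.complete();
    }, delayMs);

    // Teardown: runs when the subscription ends, including on unsubscribe.
    return () => clearTimeout(timeout);
  });
}

const option: OptionItem = { id: 1, label: "Option 1" };

// Canceled before the timeout fires: no log line and no action.
const cancelled: AddAction[] = [];
const subscription = fetchRecord(option, 20).subscribe((a) => cancelled.push(a));
subscription.unsubscribe();

// Left alone: logs "fetching record" and delivers one action.
const delivered: AddAction[] = [];
fetchRecord(option, 20).subscribe((a) => delivered.push(a));
```

Returning the teardown function is what makes the pending work cancelable; without it, unsubscribing would stop delivery but the timeout callback would still run.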

[07:55] With that, we have defined our concatMap, and we have told RxJS how to deal with our events. Whenever they come in, they will be classified based on whether they go into that section, doing the setTimeout, or just return the operation that should be done.

[08:13] Obviously, we need to implement the actual logic. That's quite simple. We get an action here, and then we can say: if action.type equals add, we do records.push with action.data.

[08:35] Otherwise, we do the same as we did below, so something like this. We filter out the record that got passed, and we just have to switch this to the data, which will contain the ID. Now, let's just add any to that action so we don't have to type it. With that, we should be safe.
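
The lesson types the action as any to keep things short. As an alternative sketch, the same add and remove logic can be written as a pure function over a typed union. The `Action` union, the `"ADD"` and `"REMOVE"` strings, and the `applyAction` name are assumptions, and returning a new array instead of calling push is a deliberate deviation from the lesson's code.

```typescript
interface OptionItem {
  id: number;
  label: string;
}

// Hypothetical typed replacement for the any used in the lesson:
// ADD carries the whole option, REMOVE carries only the ID.
type Action =
  | { type: "ADD"; data: OptionItem }
  | { type: "REMOVE"; data: number };

function applyAction(records: OptionItem[], action: Action): OptionItem[] {
  if (action.type === "ADD") {
    return [...records, action.data];
  }
  // The compiler has narrowed action to REMOVE here, so data is the ID.
  return records.filter((record) => record.id !== action.data);
}

let records: OptionItem[] = [];
records = applyAction(records, { type: "ADD", data: { id: 1, label: "Option 1" } });
records = applyAction(records, { type: "ADD", data: { id: 2, label: "Option 2" } });
records = applyAction(records, { type: "REMOVE", data: 1 });

console.log(records.map((record) => record.label)); // only "Option 2" is left
```

With the union in place, the compiler rejects a REMOVE action that carries an option object or an ADD action that carries a bare ID.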

[08:56] Now that we have implemented the whole sequence, the checklist change handler below doesn't have to execute it anymore, of course. The only thing we have to do there is take the selection subject and emit that event directly into it.

[09:09] We pipe the data through our event handler into our subject. It will then travel through that concatMap, do the operation, and fire the action that has to be done, which will then be handled. The concatMap makes sure that, for instance, our removal, even if it has been clicked immediately afterwards, or even if it is faster, will be treated only after the previous one has completed.
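
The whole flow, from handler to subject to concatMap to subscribe, could be wired up roughly like this outside of any component. All type shapes, the `"ADD"` and `"REMOVE"` strings, the `fetchRecord` and `onCheckListChange` names, and the `applied` log are assumptions added for illustration.

```typescript
import { Observable, Subject, of } from "rxjs";
import { concatMap } from "rxjs/operators";

interface OptionItem { id: number; label: string; }
interface SelectionEvent { checked: boolean; option: OptionItem; }
type Action =
  | { type: "ADD"; data: OptionItem }
  | { type: "REMOVE"; data: number };

const FETCH_DELAY_MS = 1500;
let records: OptionItem[] = [];
const applied: string[] = []; // hypothetical log of the actions that were handled
const selectionSubject = new Subject<SelectionEvent>();

// Simulated server call, cancelable through the returned teardown.
function fetchRecord(option: OptionItem): Observable<Action> {
  return new Observable<Action>((observer) => {
    const timeout = setTimeout(() => {
      observer.next({ type: "ADD", data: option });
      observer.complete();
    }, FETCH_DELAY_MS);
    return () => clearTimeout(timeout);
  });
}

selectionSubject
  .pipe(
    // Classify each event into an observable of the action to perform.
    concatMap((event) => {
      if (event.checked) {
        return fetchRecord(event.option);
      }
      const remove: Action = { type: "REMOVE", data: event.option.id };
      return of(remove);
    })
  )
  .subscribe((action) => {
    applied.push(action.type);
    if (action.type === "ADD") {
      records.push(action.data);
    } else {
      records = records.filter((record) => record.id !== action.data);
    }
  });

// The handler only forwards the event into the subject.
function onCheckListChange(event: SelectionEvent): void {
  selectionSubject.next(event);
}

// Double click: the removal waits until the pending add has completed.
const option: OptionItem = { id: 1, label: "Option 1" };
onCheckListChange({ checked: true, option });
onCheckListChange({ checked: false, option });

setTimeout(() => console.log(applied, records), FETCH_DELAY_MS + 100);
```

After the delay, the log shows the add followed by the remove, and the list is empty again, which matches the unchecked state of the checkbox.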

[09:34] The ordering is guaranteed. Now, let's save this, compile again, and try whether it works. Let's do a double click. Nothing happens. Let's do a single click and see whether it works. Yes, it works. We can also remove it again.

[09:48] The implementation is still that. Now, there is still potential to improve this whole process. If you notice, for instance, when we double click, the events are happening in order, so we don't get a result added wrongly.

[10:02] Actually, behind the scenes, all actions will be executed. Even if we double click, the whole setTimeout part will be called. In fact, you can see that fetching record in the log below. It will fire that add, and then immediately afterwards fire the remove, so the record gets added and immediately removed.

[10:20] We just don't notice it here, but behind the scenes, it actually happens. Now, this is completely useless, and we could optimize it, because if we do a double click, so if we check and uncheck immediately, it's like undoing the action.

[10:33] There is no need to even execute that whole process in the setTimeout. That's the nice thing about cancelable observables, which, by contrast, you wouldn't be able to do with promises. Let's see how we could do that.

[10:44] Now, instead of using concatMap, we can simply switch out the operator. I'm using switchMap here. The property of switchMap is that whenever a second observable comes in, the previous one is canceled. It will be unsubscribed, and it will be terminated.

[11:01] Let's see how that works. I'm just saving the whole project again and letting it compile. Now, let's clear the log here and see whether it still works. We click. The option gets added. We also see that fetching record. If I click again, it gets removed.

[11:15] Now, take a look at what happens if I double click. We wait a couple of seconds. Nothing actually happens. Not even that fetching record gets logged, so the timeout is completely canceled. That happens because the second observable comes in immediately after the first one.

[11:32] It cancels that first observable, which means the unsubscribe will be executed, which clears our timeout. Therefore, the logic in between is never actually executed. This is a very nice mechanism for optimizing our code. As you can see, we just had to switch from concatMap to switchMap.
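
The cancellation can be observed in a small standalone sketch. As before, the type shapes, the `"ADD"` and `"REMOVE"` strings, the `fetchRecord` name, and the `fetchCount` and `applied` counters are assumptions added so the effect is visible; only the operator swap itself comes from the lesson.

```typescript
import { Observable, Subject, of } from "rxjs";
import { switchMap } from "rxjs/operators";

interface OptionItem { id: number; label: string; }
interface SelectionEvent { checked: boolean; option: OptionItem; }
type Action =
  | { type: "ADD"; data: OptionItem }
  | { type: "REMOVE"; data: number };

const FETCH_DELAY_MS = 1500;
let fetchCount = 0; // how many simulated fetches actually ran
const applied: string[] = []; // action types that reached the subscriber

function fetchRecord(option: OptionItem): Observable<Action> {
  return new Observable<Action>((observer) => {
    const timeout = setTimeout(() => {
      fetchCount += 1;
      console.log("fetching record");
      observer.next({ type: "ADD", data: option });
      observer.complete();
    }, FETCH_DELAY_MS);
    // switchMap unsubscribes from this observable when a newer event
    // arrives, which runs this teardown and clears the pending timeout.
    return () => clearTimeout(timeout);
  });
}

const selectionSubject = new Subject<SelectionEvent>();

selectionSubject
  .pipe(
    switchMap((event) => {
      if (event.checked) {
        return fetchRecord(event.option);
      }
      const remove: Action = { type: "REMOVE", data: event.option.id };
      return of(remove);
    })
  )
  .subscribe((action) => applied.push(action.type));

// Double click: the uncheck arrives while the fetch is still pending.
const option: OptionItem = { id: 1, label: "Option 1" };
selectionSubject.next({ checked: true, option });
selectionSubject.next({ checked: false, option });

// Only the remove was applied, and the fetch never ran.
setTimeout(() => console.log(applied, fetchCount), FETCH_DELAY_MS + 100);
```

One design consideration with this sketch: switchMap cancels the pending inner observable on any new event, not only on an event for the same option, so checking a second option while the first is still loading would cancel the first fetch as well.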