Introducing the Observable

Jafar Husain
InstructorJafar Husain
Share this video with your friends

Social Share Links

Send Tweet
Published 9 years ago
Updated 5 years ago

In this lesson we will get introduced to the Observable type. An Observable is a 'collection that arrives over time'. Observables can be used to model events, asynchronous requests, and animations. Observables can also be transformed, combined, and consumed using the Array methods we learned in the previous lessons. We can write powerful and expressive asynchronous programs using the few simple methods we've learned so far.

[00:00] I've got good news for you. It's time to start introducing you to a little sneak peek of asynchronous programming. Up until now, we've been learning how these methods can be used to transform collections. In fact, I'm going to teach you guys that events, and asynchronous requests, and animations can all be modeled as asynchronous collections that arrive over time.

[00:21] Once we can model all these things as collections we can use all of the array methods that we're all now familiar with, map, filter, concatAll, and forEach to combine together these asynchronous data sources and build complex asynchronous programs very easily.

[00:38] How can we possibly model an asynchronous operation as an array? We can't, but there is a type that we can use to model an asynchronous operation, as a collection, a collection that arrives over time. It's called an Observable.

[00:51] The way we get the Observable library is that we add the reactive extensions library. It's down here at the bottom in JS bin, rx.all. You can also Google for rx.js and download it if you're playing offline, if you're not using JS bin. I'm going to add this library.

[01:06] An Observable is just like an array, except the difference is with an array, all the data is stored in memory, whereas with an Observable, no data is stored in memory, and the items arrive over time, asynchronously. An array has all the data ready, right there, just to pop out synchronously, but an Observable is a collection of items that arrive over time.

[01:26] Another example, something very similar to an Observable that we all probably have a little bit of experience with is a DOM event. How do we hook up an EventListener to listen for DOM events? First, let's just add ourselves a button. As you can see, I've already got a button over here. I'm going to give it an ID of button. I'm going to hide the html view and show the output, so we can see button. Then I'm going to create a variable to store a handler to that button. Now I'm going to create a handler which I want to invoke whenever the button is clicked.

[02:07] I addEventListener to search for the name of the event and then a function to execute when that event arrives asynchronously. Now, if I run this and I click this, great, I see "alert clicked." If I want to unsubscribe from the event after the very first click, I would go button.removeEventListener, and pass in the event name once again and then the handler I'm trying to remove. Now if I run this and I click, I get clicked, but now since I've unsubscribed from the event I never get another click again.

[02:50] This is one way of thinking about events, which is interacting with events, through APIs that's sort of hanging off an object. But, Observable gives us another and much more powerful way of thinking of events, because it gives us an object by which to represent that event stream, and using that object, we can call methods like map, filter, and concatAll. All we have to do is convert the event into a collection, an observable collection by going Observable.fromEvent, passing in the DOM element itself, and then the name of the event.

[03:25] I'm going to comment this out right here. If I want to print something out every single time this button is clicked, it's really the same as when I consume items in an array and do something with it. If I want to consume the items in an array, 1,2,3, and do something like print to the console, I use forEach.

[03:46] I'll show the console here, and run this, and we see I get 1,2,3. I can use the exact same forEach method to consume all of the items in the click collection. I'll do exactly that. I get out, it's just a collection of all the event objects that would have been sent to your event handler, right here, and now I'm going to go ahead and alert that I've been clicked.

[04:23] If I run this, sure enough, I get an alert that says I've been clicked. Notice how similar these two pieces of code really are. If we stop thinking about events as just these appendages that hang off of objects and start thinking of them as first class collections, we can use all the methods that we already know, forEach, concatAll, et cetera.

[04:52] Notice that in this particular example we really haven't bought ourselves anything. In fact, it's kind of trivial. We're also missing something which is the ability to unsubscribe from the event. How would I unsubscribe from clicks?

[05:04] Notice that there's a difference between the way forEach executes for Observables and forEach executes for an array. If I want to print something out after everything in an array, all I have to is I have to move that code right directly underneath the forEach expression. I could just do this. Furthermore, if I want to catch an error that might occur, say for example in my forEach handler, all I have to do is surround the entire thing with a try-catch statement.

[05:46] Why can't I use the exact same approach here? The difference is that arrays, the forEach method always executes synchronously, because all of the data in the collection is already there, so all forEach has to do is run through it. Because this operation executes synchronously, if any kind of throw expression appears inside of here, it will be caught and pumped down to catch.

[06:11] Furthermore, by the time we get to the end of the forEach expression, we know we've already gone through all of the items in the array. If we print out anything we know that it's only going to print out when all of the items in the collection have been exhausted.

[06:24] But with an Observable, forEach executes over time. It executes asynchronously. This code isn't going to get executed until I click that button. Which means that if I try and do something similar, and I surround all this stuff inside of a try-catch, and I add a log that says done, which prints as soon as this entire event stream is finished, and I try and catch any errors that occur and print them out, this is not going to work the same way I'm afraid. Because this code is not going to get executed now, right here in place. It's going to get executed later on when somebody clicks the button.

[07:12] At that point it's going to be on the event loop, which means it's not going to be surrounded in the try-catch anymore. Nor, by the way, we're not going to be able to catch errors that are thrown in here using this technique. This will not hit this callback right here. That's a serious problem. Furthermore, this is not going to execute when the asynchronous collection ends, in other words when there are no more items that come through our Observable, it's going to execute immediately, because forEach immediately returns and later on the callback runs asynchronously.

[07:52] We have a problem. How do we do something either A, when the collection has been completely exhausted, or how do we do something when an error is detected when we're working with an asynchronous collection?

[08:03] In order to accommodate this, forEach for an Observable works differently, slightly differently, than an array. It accepts two extra callbacks, so there are actually three callbacks that are passed to an Observable. The first one is called the onNext callback, because it receives the next data that arrives. The next one is called the onError callback, because it receives any error that arrives. I'm going to take this code that we reserved for handling errors and I'm going to move it into this callback.

[08:36] What about completion? There's a third callback called onCompleted, and it accepts no arguments. It just gets invoked when the Observable's complete. Now, I can get rid of my try-catch, because any error that occurs asynchronously is going to be sent to this onError method, and onCompleted, I'm going to take this done expression here and I'm going to move it inside this callback, because these two callbacks are invoked asynchronously, we can get the same effect. If an error occurs asynchronously, this callback will be invoked, and if the stream of clicks ends, this callback will be invoked.

[09:23] Let's have another look at the synchronous and asynchronous examples stacked up. When we're working with a synchronous collection like an array, we do things like this. When we're working with an asynchronous collection, an Observable, we do things like this.

[09:38] There's one more trick that forEach has up its sleeve. We haven't explained still because forEach is asynchronous, how we can unsubscribe, how we can stop listening for button clicks, specifically, this line right here. How do we do that? it turns out that when we call forEach on an Observable we get a subscription object out.

[09:58] What's a subscription object? A subscription object is just a little object that we can use to unsubscribe from an Observable, which is another way of saying, "We don't care about any more of the data that comes out of an Observable."

[10:11] Again, there's no equivalent for unsubscribing in an array because forEach completes synchronously. There's really no opportunity for you to stop listening or call unsubscribe, whereas with an event, because things happen asynchronously, really at any time you can decide that you want to stop listening.

[10:27] The equivalent here is a subscription object and then as soon as the first click comes through we're going to call subscription.dispose. All that does is it's a way of saying, "I'm not interested in getting anymore onNext methods. I don't want my callback invoked anymore times, or any callback for that matter, no matter how many more items go through click." Under the hood, that's going to trigger the Observable to callRemoveEventListener and it's going to clean up the handler for us.

[10:59] I'm going to comment out this Array code, and we're going to give this a run, right now. I click once, but now notice we're affecting the subscription.dispose to get executed, and so if I click again, nothing happens. We stopped listening for the event. These two pieces of code are roughly equivalent.

[11:27] Notice that an error will never come out of a DOM event. DOM events don't fire errors, but later on we'll learn that Observables can be used to model asynchronous requests as well, and asynchronous requests do sent errors.

[11:39] The good news is that no matter how complex your expression, how many maps and filters and concatAlls you use, Observable will detect any asynchronous error that occurs and it will pipe that to this onError function. We don't have to worry about forwarding on errors. It's actually very similar to the way try-catch works.

Jay
Jay
~ 9 years ago

The onCompleted callback never gets called. I was expecting that to be called after subscription.dispose(). Wouldn't that need to happen for the array and observable code to be equivalent?

Joel Hooks
Joel Hooks
~ 9 years ago

You are disposing of the subscription to the Observable prior to the Observable's completion. In this case (click watching) the Observable is "infinite" and will never complete.

labeeb
labeeb
~ 8 years ago

I commented out subscription.dispose(); still why onCompeleted is not called?

Zach  Sosana
Zach Sosana
~ 8 years ago

Is there something I am missing? I am using node, I am able to get it viewing on my localhost but not able to get a simple alert working.

I require rx in my js file , i put that js script tag in my html

Alesei Narkevitch
Alesei Narkevitch
~ 8 years ago

which version of Rx.js you are requiring?

kisanbhat
kisanbhat
~ 8 years ago

rx.all.js:7631 Uncaught TypeError: Cannot read property 'addListener' of null

I am getting the above mentioned error and in HTML code I have added rx.all.js (v4.1.0) . Any idea why I am getting this error?

Tony Brown
Tony Brown
~ 8 years ago

This course has totally changed the way I think about handling events, thank you so much for such a clear explanation of the technique.

mobility-team
mobility-team
~ 7 years ago

I guess it's important to notice that Array.prototype.forEach is not asynchronous, but Observable.prototype.forEach is. Or am I wrong?

Jafar Husain
Jafar Husaininstructor
~ 7 years ago

Technically it's possible for an observable to fire either synchronously or asynchronously. Overall you're right.

Daniel
Daniel
~ 7 years ago

With the newer version of rxjs => https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0/Rx.min.js, it calls the onError(error) right away, even before I click on the button - it triggers on page load. Will you update your tutorial or provide an answer on why is this happening? Thank you.

Jafar Husain
Jafar Husaininstructor
~ 7 years ago

I think you will have to include a code example. There are several cosmetic differences between the version of RX used in this course and the new version of RX. The code may not work exactly as is. I will need to see your port.

anton
anton
~ 7 years ago

This works in 5.0.1: const subscription = clicks.subscribe(function onNext(e) { alert('clicked'); subscription.unsubscribe(); }, function onError(error) { console.log('Error!'); }, function onCompleted() { console.log('done'); }); It looks like subscribe method should be used instead of forEach. In addition to this dispose method should be replaced with unsubscribe.

Bing
Bing
~ 7 years ago

good point

Samuel Elgozi
Samuel Elgozi
~ 7 years ago

There is one thing I'm not sure I got. The async forEach function you are talking about is not the same one we have in Array.prototype.forEach right? It is one similar to it in functionality but in the prototype of the observable object in the rxjs library.. right?

coisnepe
coisnepe
~ 6 years ago

This does not work, which is why it'd be nice to: 1 - Clearly state what versions of packages are used 2 - Update the course regularly...

Ferenc
Ferenc
~ 6 years ago

It seems in version 5.0.3 (that's what I'm testing with), the forEach method has the following form:

forEach(next: Function, PromiseCtor: PromiseConstructor): Promise

So basically it returns a promise which gets finished on completion (like the second callback in v4), and gets rejected in case of errors. So in my understanding, the example in this video could be migrated to v5 like:

Rx.Observable
  .fromEvent(btn, 'click')
  .forEach((e) => console.log('clicked'))
  .then(() => console.log('completed'))
  .catch(() => console.log('an error happened'));

Please tell me, if I got this wrong :)

Markdown supported.
Become a member to join the discussionEnroll Today