Stopping a shared observable execution

André Staltz
InstructorAndré Staltz

Share this video with your friends

Send Tweet
Published 6 years ago
Updated 4 years ago

ConnectableObservable has the connect() method to conveniently dictate the start of the shared execution of the source Observable. However, we need a mechanism to dictate the stop of the shared execution, otherwise a leak happens. This lesson will teach you how to do that, and it's all about Subscriptions.

[00:00] The connect observable has the connect method to conveniently dictate the start of the shared execution of this source observable. But doing this is actually rather dangerous, because we may be created a leak, because connect here essentially says when should we start the shard execution, but we don't yet have anything saying when to stop the shared execution. Let me give an example.

[00:22] This source observable is finite, it has only five values being emitted, so I'm going to remove this and now it's an infinite observable, it ticks every one second. If we add do here just to console log when do those events happen on the source, then we're just tapping into the shared execution of this source observable, we're not adding observers. Then if we remember subscribe will always return a subscription, right? So we can keep the reference to that, to both A and B, and the we can do something like after five seconds, we're going to unsubscribe from A and unsubscribe from B.

[01:12] When we run this, we're going to see the source is producing those events and the observers are getting those, and then after five seconds both of those observers unsubscribe, but the shard execution keeps on going, because it doesn't have anything to say when should it stop. Connect just started it, and it keeps on going on forever. Connect actually returns a subscription, and we can keep that like this.

[01:43] Just like a subscribe will return a subscription, a connect also returns a subscription because internally, remember, connect with subscribe to this source observable using this object. So it makes sense, it should return a subscription. Then we can get this, and we can unsubscribe from that in order to tell when to stop the shared execution. Now if we run this after five seconds we're going to see that the shared execution will stop, and then nothing happens after that.

[02:23] Just remember that with connect we are manually controlling the start of the shared execution, and then we keep a subscription in order to manually control the stop of the shared execution. All of this is in order to avoid leaks.

Tomasz Kula
Tomasz Kula
~ 6 years ago

Does calling unsubscribe on connectableObservable subscription clear subscriptions for subA and subB?

Asumming:

  • sub = connectableObservable subscription to internal Subject
  • subA = obsA subscription to connectableObservable
  • subB = obsB subscription to connectableObservable

What should correct usubscribe code look like? ###A

setTimeout(() => {
  subA.unsubscribe();
  subB.unsubscribe();
  sub.unsubscribe();
}, 5000)

###B

setTimeout(() => {
  sub.unsubscribe();
}, 5000)
André Staltz
André Staltzinstructor
~ 6 years ago

Hi Tomasz. Unsubscribing from sub will not clear subscriptions A and B, because those are attached to the underlying subject. The subscription sub is attached to the source observable. So you would have to write:

setTimeout(() => {
  subA.unsubscribe();
  subB.unsubscribe();
  sub.unsubscribe();
}, 5000)

That said, usually we don't explicitly call connect(), so this use case is actually rather rare. As we see later on in the course, there are better ways of using multicast so that subscriptions are handled more automatically.