Reusable multicasting with Subject factories

André Staltz
InstructorAndré Staltz
Share this video with your friends

Social Share Links

Send Tweet
Published 8 years ago
Updated 5 years ago

The way we use publish() (or multicast with an RxJS Subject) makes the shared Observable not reusable if the shared execution happens to complete or emit an error. In this lesson we will see how to use a simple Subject factory function in order to create a new Subject, one for each shared execution, whenever connect() is called.

[00:00] Now that we know about Publish, let's go back for a while to using Multicast with a new Rx subject, because there's something very important that we need to know about this. Doing this is the same thing as doing publish, but let's remind ourselves that it's the same thing as creating the subject separately here before. Once we've constructed that subject, we pass it to the Multicast.

[00:26] This means that whatever happens on this source observable will be replicated on the subject. For instance, if we had "take 6" here, then this observable would emit six values and then complete, which means that that complete event would also happen on this subject.

[00:44] If we plot on a marble diagram what happens on this subject, we would see zero, one, two, three, four, five, and then Complete. Let's remember that after a complete happens, we cannot have more events being admitted. That's the observable contract. It's enforced inside RxJS, and we can't change that.

[01:06] If we run this code, we're going to see that when A subscribes, A will trigger the shared execution with connect. Then B arrives, and then after that, A leaves. B will see the last event, five. B will see the complete, and then B unsubscribes. When the number of unsubscribers goes from one to zero, with reference counting, we are stopping the shared execution.

[01:33] What happens if we would, after, let's say, eight seconds, A would subscribe again? In this case, after eight seconds, the number of subscribers would go from zero to one. What does that mean? It means a connect, because we have reference counting. This means that the source observable would be subscribed using this same subject.

[02:00] Pay attention that this subject already saw a complete, so this subject will not be able to emit zero and one again, because you cannot emit values after complete. What happens is, A subscribes in the beginning. It starts the shared execution, and then after a while, A leaves. B sees the last event, and B unsubscribes, but then when A subscribes again, it changed the number of subscribers from zero to one.

[02:29] The source observable is subscribed, but this subject has already emitted complete, and that means that it cannot emit zero, one, two, and three. This makes the shared observable not really reusable. We want to be able to use this multiple times. The way that this is written right now, just makes the shared observable not be reusable, because once the last observer leaves, we can't really use it any more.

[03:01] How could we make it reusable? Gladly, Multicast can take a subject like this, or it can take a subjectFactory. A subjectFactory is a function that once you call it, it will return to you a new subject, let's say like this. That's good, because now if we would create a second subject, Subject 2, then we would be able to start emitting those values for observer A, like that.

[03:37] Because this one is done, it cannot do anything anymore, but this one can, because it's completely new. Multicast can take either a subject, which will be the same one forever, or it can take a subjectFactory, which is basically a way of creating new subjects. The subjectFactory will be called when we connect.

[03:58] Here, when we subscribe for the first time, when the number of subscribers goes from zero to one, we're going to call connect. We're going to create a new subject, and we're going to use that subject to Multicast here on this source observable.

[04:12] As we know that shared execution's going to stop, then after a while, we're going to do that again. With connect, it's going to call the subjectFactory to create a new subject, and then that's going to be used for the next observers. Let's try to run that and see what happens.

[04:30] On the first zero to one, this shared execution starts, B arrives, and then after a while, A leaves, B leaves, and then we subscribe again. The number of subscribers goes from zero to one, we connect, we create a new subject with the subjectFactory, and that's how this shared observable suddenly became reusable.

[04:54] It means that it can stop a shared execution, but it can also start again by creating a new shared execution with a new subject. Bear in mind that with a Multicast, you can either pass a static subject here that's created just once, or you can pass a subjectFactory, which is a way of creating an entirely new subject for each connect, or for each new shared execution.

Bogdan Nenu
Bogdan Nenu
~ 8 years ago

Hi, This topic of Subject factories looks very interesting but i can't wrap my head around one problem...What if i want to have a Subject (actually a BehaviourSubject) which i want to use to push values to the stream continuously and also use for multicast, and the factory would seem to solve the problem of resubscription if the stream errors somehow Basically i want to use this construct for state shaped by actions Like var actionSubject = new Rx.BehaviourSubject() var reducer = (state, action) => ( {...state, [action.type]: action.payload} ) var state = Rx.Observable.from({}) .multicast(actionSubject) .flatMap(action => isObservable(action) ? action : Rx.Observable.from([action])) .scan(reducer) .refCount()

var actionStream = subject => action => { subject.next(action) } var action = actionStream(actionSubject)

The idea is that if I swap the actionSubject with a factory in multicast I would have to find a way to push the actions to that newly created subject from that point on...correct? Or is there a better way to do this altogether?

André Staltz
André Staltzinstructor
~ 8 years ago

Bogdan, if you already have a subject then you do not need to apply multicast. You can simply write:

var stateObservable = actionSubject
  .flatMap(action => isObservable(action) ? action : Rx.Observable.from([action]))
  .scan(reducer)
  .refCount()

Multicast is useful for plain observables, not subjects. That's because multicast basically converts an observable to the subject, but you already have a subject.

Kevin Pinny
Kevin Pinny
~ 7 years ago

The reason why directly putting a Rx.Subject() in a multicast prevents from 're-connecting' is because of the fact that multicast holds a reference to that Subject, while a factory always generates a new instance of that Subject? Do I understand this correctly?

André Staltz
André Staltzinstructor
~ 7 years ago

Yes, that's correct.

Byron McMullen
Byron McMullen
~ 6 years ago

This took me awhile to figure out so I'll leave it here:

If your source observable does not emit done before the refcount returns to 0, then when you re-subscribe to shared observable A will receive each of the emitted values of source.

let shared = Observable.interval(1000).take(6).multicast(new Subject()).refCount();
let subA = shared.subscribe(observerA);

// note, we are unsubscribing before the shared observable could emit 
// all 6 values, which would take 6000ms
setTimeout(function() {
  subA.unsubscribe();
}, 5000); 

setTimeout(function() {
  subA = shared.subscribe(observerA);
}, 8000);
// this will emit
// A next 0
// A next 1
// ...
// A done

This confused me because at first glance it appears to behave similarly to if you had passed a factory to multiCast.

Markdown supported.
Become a member to join the discussionEnroll Today