Reusable multicasting with Subject factories

5:19
The way we use publish() (or multicast with an RxJS Subject) makes the shared Observable not reusable if the shared execution happens to complete or emit an error. In this lesson we will see how to use a simple Subject factory function in order to create a new Subject, one for each shared execution, whenever connect() is called.
egghead.io
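
To make the idea in the lesson description concrete, here is a dependency-free JavaScript sketch rather than RxJS itself. MiniSubject, multicast, source, and run are hypothetical stand-ins written only for this illustration; they mimic the behavior described above, where a subject that has completed is replaced by a fresh one from the factory on the next shared execution.

```javascript
// Hypothetical, simplified stand-in for an RxJS Subject (not the real class).
class MiniSubject {
  constructor() {
    this.observers = [];
    this.stopped = false;
  }
  subscribe(observer) {
    if (this.stopped) {
      // A completed subject has nothing left to deliver.
      if (observer.complete) observer.complete();
      return;
    }
    this.observers.push(observer);
  }
  next(value) {
    if (!this.stopped) this.observers.forEach(o => o.next(value));
  }
  complete() {
    if (this.stopped) return;
    this.stopped = true;
    this.observers.forEach(o => { if (o.complete) o.complete(); });
  }
}

// Sketch of multicast: accepts either one subject instance or a factory.
function multicast(source, subjectOrFactory) {
  let subject = null;
  const getSubject = () => {
    if (typeof subjectOrFactory === 'function') {
      // Factory: make a new subject whenever the previous one has stopped.
      if (!subject || subject.stopped) subject = subjectOrFactory();
    } else {
      subject = subjectOrFactory; // Instance: always the same subject.
    }
    return subject;
  };
  return {
    subscribe: observer => getSubject().subscribe(observer),
    connect: () => source.subscribe(getSubject()),
  };
}

// A source that emits 1 and 2 synchronously, then completes.
const source = {
  subscribe(observer) {
    [1, 2].forEach(v => observer.next(v));
    observer.complete();
  },
};

function run(subjectOrFactory) {
  const log = [];
  const shared = multicast(source, subjectOrFactory);
  shared.subscribe({ next: v => log.push('first:' + v) });
  shared.connect(); // first shared execution; the subject completes
  shared.subscribe({ next: v => log.push('second:' + v) });
  shared.connect(); // second shared execution
  return log;
}

const withInstance = run(new MiniSubject());
const withFactory = run(() => new MiniSubject());
console.log(withInstance); // [ 'first:1', 'first:2' ]
console.log(withFactory);  // [ 'first:1', 'first:2', 'second:1', 'second:2' ]
```

With a single subject instance, the second connect() delivers nothing because the subject has already completed; with the factory, the second subscriber gets its own shared execution.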

Bogdan

Hi,
This topic of Subject factories looks very interesting, but I can't wrap my head around one problem. What if I want a Subject (actually a BehaviorSubject) that I use both to push values into the stream continuously and as the subject for multicast? The factory would seem to solve the problem of resubscription if the stream errors somehow.
Basically, I want to use this construct for state shaped by actions.
Like:
var actionSubject = new Rx.BehaviorSubject()
var reducer = (state, action) => ({ ...state, [action.type]: action.payload })
var state = Rx.Observable.from({})
  .multicast(actionSubject)
  .flatMap(action =>
    isObservable(action) ? action : Rx.Observable.from([action]))
  .scan(reducer)
  .refCount()

var actionStream = subject => action => {
  subject.next(action)
}
var action = actionStream(actionSubject)

The idea is that if I swap the actionSubject for a factory in multicast, I would have to find a way to push the actions to the newly created subject from that point on, correct? Or is there a better way to do this altogether?

Andre

Bogdan, if you already have a subject then you do not need to apply multicast. You can simply write:

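var stateObservable = actionSubject
  // Flatten actions that are themselves observables; wrap plain actions.
  .flatMap(action => isObservable(action) ? action : Rx.Observable.from([action]))
  // Fold each action into the accumulated state.
  .scan(reducer)
  // No refCount() here: it exists only on the connectable observable
  // returned by multicast() or publish().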

Multicast is useful for plain observables, not subjects. That's because multicast basically converts an observable into a subject, but you already have a subject.
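
As a rough illustration of that point, the following dependency-free JavaScript sketch shows a subject acting as both the entry point for actions and the source of the state stream, with no multicast involved. createActionSubject and scanState are hypothetical helpers standing in for an RxJS Subject and the scan operator; the reducer is the one from the question, and the empty-object seed is an assumption.

```javascript
// Hypothetical stand-in for an RxJS Subject: next() pushes to every subscriber.
function createActionSubject() {
  const observers = [];
  return {
    subscribe: fn => { observers.push(fn); },
    next: action => observers.forEach(fn => fn(action)),
  };
}

// Hypothetical stand-in for scan(): folds each action into the previous state.
function scanState(subject, reducer, seed) {
  return {
    subscribe: fn => {
      let state = seed; // each subscription accumulates its own state
      subject.subscribe(action => {
        state = reducer(state, action);
        fn(state);
      });
    },
  };
}

// Reducer from the question: stores each payload under its action type.
const reducer = (state, action) => ({ ...state, [action.type]: action.payload });

const actionSubject = createActionSubject();
const stateObservable = scanState(actionSubject, reducer, {}); // seed is an assumption

const states = [];
stateObservable.subscribe(s => states.push(s));

// Actions are pushed straight into the subject.
actionSubject.next({ type: 'name', payload: 'Bogdan' });
actionSubject.next({ type: 'count', payload: 1 });

console.log(states);
// [ { name: 'Bogdan' }, { name: 'Bogdan', count: 1 } ]
```

Because the subject is already hot and multicasts to every subscriber, pushing actions into it is enough to drive the state stream.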

In reply to Bogdan