Handling Multiple Streams with Merge

Instructor: John Lindquist

Published 6 years ago
Updated 3 years ago

You often need to handle multiple user interactions that arrive on different streams. This lesson shows how Observable.merge behaves like a "logical OR", so that your stream handles one interaction OR another.

[00:00] Now to wire up this reset behavior, I'll create a quick reset button. I'll duplicate the necessary things so reset, reset, a reset stream based on the reset button. Now I have a reset stream which will trigger every time I click "reset."

[00:19] The way to think about this is when my interval fires, I want it to pass this increment function, but when my reset fires, I want it to pass the reset function. Whenever you want to do one thing or another thing, you should think Observable.merge.

[00:38] We'll start with interval.stops. I'll hit save and we'll still be at the exact same behavior: start, stop, start, stop. Now I can say I want it to use the interval or I want it to use the reset. I'll hit save, I'll hit start, stop, and now when I hit reset, it's just going to manually increment that count.

[01:09] What's happening is we're mapping everything to that increment. What I really want is to map the interval to the increment, and I want to map the reset to the reset function. Now when I hit save, when intervals come through, we're incrementing and when I click reset, we're going back to zero.
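The idea of mapping each source to its own function and then merging them can be sketched without the library. This is only an illustration: MiniStream and its mapTo method, and the merge helper below, are hypothetical stand-ins written for this example (they are not RxJS), and the names inc, reset and data are assumed to match the lesson's code.

```typescript
// Hypothetical mini push stream, standing in for an RxJS Observable/Subject.
type Listener<T> = (value: T) => void;

class MiniStream<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  // Push a value to every subscriber (what a click or an interval tick would do).
  emit(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }

  // Like mapTo: ignore the incoming value and always emit `constant` instead.
  mapTo<R>(constant: R): MiniStream<R> {
    const out = new MiniStream<R>();
    this.subscribe(() => out.emit(constant));
    return out;
  }
}

// Like merge: the output emits whenever ANY source emits (the "logical OR").
function merge<T>(...sources: MiniStream<T>[]): MiniStream<T> {
  const out = new MiniStream<T>();
  sources.forEach((source) => source.subscribe((value) => out.emit(value)));
  return out;
}

type State = { count: number };
type Reducer = (acc: State) => State;

const data: State = { count: 0 };
const inc: Reducer = (acc) => ({ count: acc.count + 1 });
const reset: Reducer = () => data;

const interval$ = new MiniStream<number>(); // stands in for the interval ticks
const reset$ = new MiniStream<string>();    // stands in for reset button clicks

// Interval ticks become `inc`, reset clicks become `reset`, then both are merged.
const incOrReset$ = merge(interval$.mapTo(inc), reset$.mapTo(reset));

const received: string[] = [];
incOrReset$.subscribe((fn) => received.push(fn === inc ? "inc" : "reset"));

interval$.emit(0);
interval$.emit(1);
reset$.emit("click");
interval$.emit(2);

console.log(received); // ["inc", "inc", "reset", "inc"]
```

The merged stream carries functions rather than values, so whatever consumes it does not need to know which source fired.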

[01:31] The interval increments, reset goes back to zero, reset, stop, and we have that behavior working as expected. Now I can move this out. I'll call this incOrReset, paste it, and we'll just say switchMapTo incOrReset. Hit save, start, stop, reset.

[01:59] Then we can clean this up to make this one line, and then look at the way everything reads now. We'll move this stream down here with the other stream. Our timer starts with some data, the data being an object with a count of zero, which automatically gets pushed down into subscribe and sets the scan's accumulator.

[02:22] Then when I click start, it's going to switch over to incOrReset. That incOrReset stream is going to push something out either every single second from my interval stream or whenever I click the reset button. What they're going to push through into the scan operator are functions.

[02:40] Either an increment function which takes an accumulator, this accumulator, and then increases the count by one or a reset function which simply resets it back to the original data. Then in my scan operator, it's going to take that function and call it on the current state or the current accumulator of our stream.
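To make the last step concrete, here is a small sketch of how the accumulator evolves when the incoming values are functions. It uses a plain array and a hypothetical helper named scanStates instead of the real scan operator, and it assumes that inc, reset and data have the shapes described above.

```typescript
type State = { count: number };
type Reducer = (acc: State) => State;

const data: State = { count: 0 };
const inc: Reducer = (acc) => ({ count: acc.count + 1 }); // increase the count by one
const reset: Reducer = () => data;                        // go back to the original data

// Array-based stand-in for the scan step: call each incoming function on the
// current accumulator and record every intermediate state.
function scanStates(seed: State, fns: Reducer[]): State[] {
  const states: State[] = [seed];
  let acc = seed;
  for (const fn of fns) {
    acc = fn(acc);
    states.push(acc);
  }
  return states;
}

// Three interval ticks, one reset click, then one more tick.
const counts = scanStates(data, [inc, inc, inc, reset, inc]).map((s) => s.count);
console.log(counts); // [0, 1, 2, 3, 0, 1]
```

The first entry is the starting data itself, which mirrors how the initial object reaches subscribe before any function arrives.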

Vasyl
~ 4 years ago

Hi, I'm trying to run the same code in a *.ts file, but I receive an error: Error:(43, 18) TS2345:Argument of type '{ count: number; }' is not assignable to parameter of type '((acc: any) => { count: any; }) | ((acc: any) => { count: number; }) | IScheduler'. Type '{ count: number; }' is not assignable to type 'IScheduler'. Property 'now' is missing in type '{ count: number; }'. It occurs on the line with the data parameter:

startButtonClick$
    .switchMapTo(incOrReset$)
    .startWith(data)
    .scan((acc, curr) => curr(acc))
    .subscribe((x) => console.log(x));

However, this code works in a *.js file. Can somebody help me solve this issue?

ganqqwerty
~ 4 years ago

mind-blowing, it really requires a new way of thinking, damn. I'm not sure I understand the diff between mapTo and switchMapTo. Is it correct that mapTo maps the stream to a function whereas switchMapTo maps the stream to another stream?

Bogdan
~ 3 years ago

Hello! I'm not sure whether this is the best approach, but I found a way to resolve the TS2345 error (I have RxJS version 6.3.3):

startStream.pipe(
    switchMapTo(merge(
        _interval.pipe(mapTo(inc)),
        resetStream.pipe(mapTo(() => data))
    )),
    startWith(data),
    scan((acc: number, curr: (acc: number) => number) => curr(acc))
).subscribe((arg) => {
    console.log(arg)
});

Bogdan
~ 3 years ago

UPD: The tip above is correct if data is a number. If data is an object, as in the lesson above, the correct version would be:

startStream.pipe(
    switchMapTo(merge(
        _interval.pipe(mapTo(inc)),
        resetStream.pipe(mapTo(() => data))
    )),
    startWith(data),
    scan((acc: {count: number}, curr:(acc:{count: number})=>{count: number}) => curr(acc))
).subscribe((arg) => {
    console.log(arg)
});