Go ProSign in

Article

Comparing Callbags to RxJS for Reactive Programming

Callbags is a different approach that I discovered for reactive programming with streams in JavaScript. It’s a spec that introduces a different abstraction. If you are familiar with RxJS, then Callbags can be understood as a generalization of RxJS, and because of that, they are able to neatly solve problems that were not quite within RxJS's power. As implementations of the spec, there are many community libraries.

The name “Callbags” is not a typo, because the spec is about a few rules to callback-based programming in JavaScript. The Callbag spec roughly says: "if your callbacks follow this specific new shape *(type, payload) => {...}* and a few new rules, then you are playing the game named Callbag". Then, once playing that game, you can start applying higher-level strategies. If you think of Callbag-compliant functions as soldiers, then Callbag utility libraries are generals with war tactics. Instead of playing the game directly, you orchestrate it through a sequence of strategies.

In this article I'll compare Callbags with another existing solution for async and callback-based programming in JavaScript: RxJS. It could also be compared to IxJS or pull-stream or xstream. But I'll pick RxJS to compare with, simply because it's more popular and easier to relate with. Note also that I'm using comparisons just to help make this article more relatable to concrete use cases, not because of competition between libraries.

Similarities

First, there are many use cases where RxJS and Callbags will look similar. For instance, consider this RxJS snippet:

const { Observable } = require("rxjs");

Observable.interval(1000)
	.map(x => x + 1)
	.filter(x => x % 2)
	.take(5)
	.subscribe({
		next: x => console.log(x),
		error: e => {},
		complete: () => {}
	});

With Callbags, it would be:

const { pipe, interval, map, filter, take, forEach } = require("callbag-basics")

pipe(
  interval(1000),
  map(x => x + 1),
  filter(x => x % 2),
  take(5),
  forEach(x => console.log(x)),
)

At first, it may seem that the difference is the API driven by pipe. That is just a superficial difference. Conceptually, those two snippets have no differences. In fact, I made an API that wraps Callbags in an RxJS-looking interface:

const Observable = require("callbag-pseudo-rxjs");

Observable.interval(1000)
  .map(x => x + 1)
  .filter(x => x % 2)
  .take(5)
  .subscribe({
    next: x => console.log(x),
    error: e => {},
    complete: () => {},
  });

Now this snippet above looks exactly like the RxJS one, and also behaves exactly the same, but it's powered internally by Callbags.

So what difference do Callbags have?

Introducing Pullables

The most important difference is that with Callbags, you can send data upwards at any point. In RxJS, next data can only go downwards, from producer to consumer. This is why data flows from interval, to map, to filter, to take, and then to subscribe, but never the opposite direction.

Data can flow upwards or downwards in a chain of Callbag operators. At this point, it's helpful to introduce some terminology:

Sources are Callbags that produce data, such as interval, fromEvent, etc. You can think of them as, roughly, RxJS Observables.

Many Callbag sources are like RxJS Observables, they just send data downwards, we will call these "listenables". However, because Callbags allow both directions of data flow, some sources send data downwards only upon receiving a message from below. We will call these "pullables".

Sinks are Callbags that consume data, such as forEach. You can think of them as, roughly, Observers.

Operators are functions that take a source as input and return another source.

Let's see an example of a factory of sources, fromIter, which converts an Array or Iterable to a pullable source.

// Source sends items 10,20,30,40,50, but only one item *per request*
const source = fromIter([10,20,30,40,50])

// "Subscribe" to the source
source(0, (type, data) => {
  if (type === 1) console.log(data);
});

The way we are "subscribing" to the source is low-level and exposes some Callbag details, like 0 as the first argument, (type, data) => {} as second argument. Usually we use high-level utilities like pipe and forEach.

That code just listens for messages coming down from source, it doesn't send messages upwards. The source, being a pullable, will actually not send back any data, because it is waiting to receive a request from below.

Gladly, the source provides us a way of sending messages back to it, with the talkback function:

const source = fromIter([10,20,30,40,50])

source(0, (type, data) => {
  if (type === 0) {
    const talkback = data;
    // Every 2 seconds, send a message "null" back to the source
    setInterval(() => talkback(1, null), 2000);
  }
  if (type === 1) console.log(data);
});
// 10 (after 2 seconds)
// 20 (after 2 seconds)
// 30 (after 2 seconds)
// 40 (after 2 seconds)
// 50 (after 2 seconds)

Note that this is vastly different to RxJS's Observable.from([10,20,30,40,50]) because that would have sent all those numbers immediately as soon as subscribed.

The goal with Callbag libraries is to hide the details of that soup of type=0, type=1, talkback, etc, so you can focus on the big picture. There is a handy operator that helps us in this case, [sample](https://github.com/staltz/callbag-sample):

pipe(
  A,
  sample(B),
  forEach(b => console.log(b)),
)

The snippet above says "when sample receives a message from A, then send a message upwards to B and make B to send messages downwards to forEach". We can use sample to send a request message every 2 seconds to the fromIter like this:

pipe(
  interval(2000),                         // every 2 seconds...
  sample(fromIter([10, 20, 30, 40, 50])), // send a message to fromIter, and
  forEach(x => console.log(x))            // listen to what fromIter sends
)

You can see this code in action in this CodeSandbox. To recap:

  • Listenable sources are Callbags similar to RxJS Observables, they can be subscribed and will thereafter send data downwards to their sinks
  • Pullable sources are also listenable sources, but they require one message from the sink before sending one message to the sink. We also say that pullable sources have "1:1 ratio" of messages upwards versus messages downwards.

There are also hybrid types of sources, that need to be pulled, but which may not strictly follow the 1:1 ratio. One of the uses cases for that is to implement pausable streams.

Pausable streams

RxJS Observables basically cannot be paused and resumed. There are ways of indirectly implementing a pause/resume feature using Observables, but a single Observable cannot be directly paused. That is because a "pause" is a message upwards, from consumer to producer, but with Observables the messages only go downwards.

If we would try to add that capability to RxJS, then we would need the producer (the Observable) to observe the consumer (the Observer). Fortunately, Callbags fill this gap because they are, conceptually, Observables and Observers at the same time.

For demonstrating pause/resume, I quickly put together this demo of a pausableInterval Callbag. It behaves like Observabe.interval but it pauses or resumes whenever it gets a message. First I'll show a simple use case of pausableInterval, later I'll show its implementation:

// Source is a producer of incremental numbers every 600ms
const source = pausableInterval(600);

// "Subscribe" to the source
source(0, (type, data) => {
  if (type === 0) {
    const talkback = data;
    // Every 2 seconds, send a message "null" back to the source
    setInterval(() => talkback(1, null), 2000);
  }
  if (type === 1) console.log(data);
});
// 0
// 1
// 2
// ...pauses and waits...
// 3
// 4
// 5
// ...pauses and waits...
// 6
// 7
// 8
// ...

When we subscribe to the source, we start getting incremental numbers every 600ms. We can call the talkback every 2 seconds with argument type 1 and some data, and that is what the pausableInterval will receive. The way pausableInterval is built, it toggles its pause/resume state whenever it receives a message, no matter what's the content of the message. Instead of doing it low-level with zeros and ones and talkbacks, we can use the handy sample operator that we saw previously, like this:

pipe(
  interval(2000),                // every 2 seconds...
  sample(pausableInterval(600)), // send message that will toggle pausableInterval
  forEach(x => console.log(x)),  // listen to what pausableInterval sends
)

You can check out the demo working in this CodeSandbox. RxJS cannot do this! While it's possible to somehow build a pause/resume feature using Observables in clever ways, it's difficult because we have to go around the fact that Observables only send data downwards. The implementation of the pausableInterval Callbag is below, just in case you are curious about it:

const pausableInterval = period => (start, sink) => {
  if (start !== 0) return;
  let i = 0, id = null;
  const pause = () => {
    clearInterval(id);
    id = null;
  };
  const resume = () => {
    id = setInterval(() => { sink(1, i++); }, period);
  };
  sink(0, t => {
    if (t === 1) {
      if (id) pause();
      else resume();
    }
    if (t === 2) pause();
  });
  resume();
};

Decentralized operator ecosystem

As I was writing this article, I also decided to take that snippet of code for pausableInterval and make it available on GitHub and npm. One of the goals of the Callbag project was to lower the barrier for creating and sharing an operator. As a consequence, there are no official operators, instead we highlight and categorize all of the community operators in the official Wiki.

The reasoning behind this radical approach, compared to RxJS, was to avoid operator creep, somewhat similar to feature creeps in software engineering.

When RxJS v4 was written as RxJS v5, we intentionally decided to reduce the amount of operators and avoid some operator creep. I am not convinced we succeeded in that, because RxJS v5 has more than a hundred operators and new operators are proposed every month.

I also created xstream with the intention of reducing the amount of core operators to roughly 20, but there are still frequent proposals for extra operators. I believe utility libraries like Lodash and Ramda may go through similar pressures for expansion.

I think all these libraries have ways of utilizing third-party operators, but the communities are used to consuming only official operators that come with the libraries. This is bad for the community because it creates a bottleneck at the centralized repository, because there is often discussion whether to accept or reject operator proposals. Also, not all official operators are equally documented and tested.

With Callbags, I decided to take a new approach: there is no official repository for operators, everything is community-driven and no operators are considered "core" while others are third-party. If you want a centralized bundle of curated operators, it's easy to create one like callbag-basics, which simply imports community operators. Also, there is no core library serving as the engine for Callbags, it's just functions!

I was surprised to see how well the community understood this idea and started writing their own operators in the first few weeks. One of my goals was to encourage people to become library authors. If you've ever created a simple utils file in your project at work, then you basically already know how to make a library. I'm glad that Callbags was the way this person got started as a library author, and one my favorite operators that they published was callbag-latest.

Sampling the latest value

callbag-latest will convert a listenable source into a pullable source, by storing the latest value in memory and sending it downwards only upon request.

Here is an example of latest in action: we use it together with sample to send the latest mouse coordinates every time a click happens on a button.

const {fromEvent, pipe, map, forEach} = require('callbag-basics');
const sample = require('callbag-sample');
const latest = require('callbag-latest');

const mouseXY = pipe(
  fromEvent(document, "mousemove"),
  map(e => `(${e.clientX},${e.clientY})`)
);;

const submitActionStream = pipe(
  fromEvent(window.submitBtn, "click"), // when a click happens on the submitBtn
  sample(latest(mouseXY)), // pull out the latest value that mouseXY sent
  map(v => ({type: "SUBMIT", value: v})), // and map it to an action object
  forEach(ac => console.log(ac))
);

Check the CodeSandbox for that.

This may look like RxJS's withLatestFrom. I was involved in the early phases of [withLatestFrom](https://github.com/ReactiveX/RxJava/issues/405) in RxJava and helped implement it in RxJS, and I can tell that latest is not exactly the same as that. The equivalent of RxJS A.withLatestFrom(B) is Callbag's sampleCombine(latest(A))(B), so latest is more granular than withLatestFrom. latest just does converts a listenable to a pullable.

Another way in RxJS to remember the latest value is with a BehaviorSubject or a ReplaySubject, but they are meant for multicasting (transforming cold Observables to hot), and the latest value is sent only when new Observers are added. "Pullables" are almost exist in RxJS, because these two subject types are essentially "pulled" by every new Observer added. But other than that, there isn't first-class support for generic pullables.

So neither withLatestFrom nor the two types of subjects give you only the specific capability of remembering the latest value, because they come with more capabilities than that. Callbag latest gives you only that and nothing else, and allows you to compose it with other operators to customize it further, for instance sample(latest(B)) or sampleCombine(latest(B)).

TodoMVC with Callbags

To see a real example of Callbags used in a webapp, check out TodoMVC with Callbags, brought to you by David Waller, who also authored the latest operator that we saw above.

The most interesting use of Callbags in that repository is in the actions.js file, where Redux-like actions are declared based on DOM events, for instance to cancel an edit upon "focusout" or ESC key:

const cancelEditActions = pipe(
  fromDelegatedEvent(root, 'input.edit', 'focusout'),
  mergeWith(pipe(
    fromDelegatedEvent(root, 'input.edit', 'keyup'),
    filter(e => e.key === 'Escape')
  )),
  mapTo({type: 'CANCELEDIT'})
);

Apart from the practical benefits of Callbag operators, they are also very lightweight in KB count. The TodoMVC app is just 6.9 KB gzipped, and contains Snabbdom, Callbags, and app logic:

In comparison, the React Redux TodoMVC is 57.7 KB gzipped.

To compare with RxJS in size, note that Callbags don't have a core library, there are only utilities. While core RxJS v5.5.6 (with Observable, Subscriber, Subscription, and no operators!) is 4.93 KB gzipped, all of the Callbag operators used in the TodoMVC demo take in total just 1.5 KB when gzipped.

So if you're looking for extremely lightweight reactive streams, Callbags might be a good option for your project.

Between Observables and functions

Comparing with RxJS helps to gain an understanding of the benefits of Callbags, but they are not a direct alternative to Observables. When you put these primitives on a scale from least general, to most general, Callbags fill the gap between Observables and functions:

Generality: **Promise < Observable < Callbag** < Function

We know that Observables can do everything that Promises can do. For instance, while Promises can only resolve one value at most, Observables have less restrictions on the amount of emissions. There are two ways of understanding this, and both are correct and sound: (1) Observables are more powerful and more general than Promises, (2) Observables are less "safe", less predictable than Promises.

Similarly, functions can do everything that Observables can do, and that's how raw functions are more powerful and more general, but also less safe and less predictable. The gap between Observables and functions is quite big, though. Just imagine how much work it would take to migrate an Observable-based project to vanilla JavaScript functions.

Callbags fill that gap. They are more powerful and more general than Observables, but also less safe and less predictable. For instance, when you get a new source Callbag, you are not sure how it handles incoming messages and outgoing messages. Is it a listenable, a pullable, or a hybrid? The scale for predictability is the inverse of generality:

Predictability: **Function < Callbag** < Observable < Promise

Because there is no competition between all these different primitives, mixing Promises and functions and Observables is normal when working in a JavaScript codebase. Callbags add some exciting possibilities to bridge between primitives, where before we had to do it manually with functions.

In the example below, some Callbag operators are acting as the tendons that connect RxJS bones with IxJS muscles:

const observable = Rx.Observable
  .fromEvent(window.button, "click")
  .mapTo(null)
  .delay(500)

const iterable = Ix.Iterable
  .from([1, 2, 3, 4, 5, 6])
  .filter(x => x % 2 === 0)
  .map(x => x * 2)

pipe(
  fromObs(observable),
  sample(fromIter(iterable)),
  forEach(ac => console.log(ac))
)

See this example running as a CodeSandbox.

I hope to write more content and guides about Callbags, so stay tuned. It's a project that I believe is about interoperability and community, more than anything else.