Lately, I wanted to get a solid grasp of the inner workings of GenStage so that I could use it efficiently at work.

TL;DR You can find my proof-of-concept on https://github.com/bdusauso/genstage-sandbox

The problem

My professional use case is quite simple to explain: it’s a push-based worker. The client pushes data into a buffer, and the data are delivered to a consumer one event at a time.

This, obviously, should not be complicated.

It’s not, at least once you understand how the toolkit you’re using works. Which I didn’t. Even after reading the documentation, I kept struggling to understand the demand mechanism, so I asked for help on the excellent Elixir forum. Thanks to the participants’ explanations, I started to understand how it really works internally. I also read this article from Johanna Larsson for an even better understanding.

Armed with these new insights, I began modifying my proof-of-concept. Unfortunately, there is one tiny difference in my use case that changes everything: acknowledgement. This mechanism adds resilience to the whole system: each time the consumer has processed an event, it sends an acknowledgement back to the producer carrying the event’s unique identifier. That way the producer knows the event has been processed correctly and can proceed to send the next one. If something goes wrong on the consumer side, the producer sends the same event again until it receives an acknowledgement with the matching identifier.

So, how is this different from the “classic” push-based style?

In the “classic” way of doing things, the producer will enqueue the data in the buffer only if there’s no demand left, otherwise it will directly propagate the event in order to satisfy the demand. So basically the only driver is the demand.
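
To make the demand-driven rule concrete, here is a minimal sketch of it as pure functions, without GenStage itself. The module name `ClassicModel` and its functions are hypothetical and only model the bookkeeping: each function returns the events to emit together with the new state.

```elixir
defmodule ClassicModel do
  # Hypothetical pure-function model (not GenStage itself): the state is
  # the queue of buffered events plus the demand not yet satisfied.
  defstruct queue: :queue.new(), demand: 0

  def new, do: %__MODULE__{}

  # No outstanding demand: buffer the event and emit nothing.
  def publish(%__MODULE__{demand: 0} = state, event) do
    {[], %{state | queue: :queue.in(event, state.queue)}}
  end

  # Outstanding demand: emit the event right away and use up one unit.
  def publish(%__MODULE__{demand: demand} = state, event) do
    {[event], %{state | demand: demand - 1}}
  end

  # New demand: drain as many buffered events as the total demand allows.
  def handle_demand(%__MODULE__{} = state, incoming) when incoming > 0 do
    take(state.queue, state.demand + incoming, [])
  end

  defp take(queue, 0, acc) do
    {Enum.reverse(acc), %__MODULE__{queue: queue, demand: 0}}
  end

  defp take(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> take(rest, demand - 1, [event | acc])
      {:empty, rest} -> {Enum.reverse(acc), %__MODULE__{queue: rest, demand: demand}}
    end
  end
end

state = ClassicModel.new()
# Buffered: nobody asked for anything yet.
{[], state} = ClassicModel.publish(state, :a)
# Demand of 2 arrives: one event is sent, one unit of demand remains.
{[:a], state} = ClassicModel.handle_demand(state, 2)
# The remaining demand lets the next event go out immediately.
{[:b], _state} = ClassicModel.publish(state, :b)
```

In a real producer these return values would map onto the event lists returned from the GenStage callbacks.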

In the approach I took, the driver is no longer the demand but the acknowledgement mechanism. As stated above, we process one event at a time to keep the whole process simple, so we can safely ignore the demand.

Moreover, we need a way to know whether we have to send the last message again in case the consumer has crashed. I have to say that, even if this seems trivial, it gave me some headaches… It’s easy to go looking for a complicated solution when the answer is right in front of you.
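
The acknowledgement-driven flow can also be sketched as pure functions. This is only a model under simplifying assumptions: the module name `AckModel` is hypothetical, the buffer is an Erlang `:queue`, and the consumer is assumed to be present at all times. Messages are `{payload, id}` tuples.

```elixir
defmodule AckModel do
  # Hypothetical pure-function model of the acknowledgement-driven flow.
  # `buffer` holds {payload, id} tuples; `to_ack` is the message in flight.
  defstruct buffer: :queue.new(), to_ack: nil

  def new, do: %__MODULE__{}

  # Nothing in flight: enqueue, then emit the head of the buffer.
  def publish(%__MODULE__{to_ack: nil} = state, msg) do
    buffer = :queue.in(msg, state.buffer)
    {:value, head} = :queue.peek(buffer)
    {[head], %{state | buffer: buffer, to_ack: head}}
  end

  # A message is already in flight: enqueue only.
  def publish(%__MODULE__{} = state, msg) do
    {[], %{state | buffer: :queue.in(msg, state.buffer)}}
  end

  # Matching ack: drop the head and emit the next message, if any.
  def ack(%__MODULE__{to_ack: {_payload, id}} = state, id) do
    {_acked, rest} = :queue.out(state.buffer)

    case :queue.peek(rest) do
      {:value, next} -> {[next], %{state | buffer: rest, to_ack: next}}
      :empty -> {[], %{state | buffer: rest, to_ack: nil}}
    end
  end

  # Unknown or stale id: ignore it in this model.
  def ack(%__MODULE__{} = state, _other_id), do: {[], state}

  # The consumer came back after a crash: resend the message in flight.
  def redeliver(%__MODULE__{to_ack: nil} = state), do: {[], state}
  def redeliver(%__MODULE__{to_ack: msg} = state), do: {[msg], state}
end

state = AckModel.new()
{[{"a", 1}], state} = AckModel.publish(state, {"a", 1})  # sent at once
{[], state} = AckModel.publish(state, {"b", 2})          # held back
{[{"a", 1}], state} = AckModel.redeliver(state)          # consumer restarted
{[{"b", 2}], state} = AckModel.ack(state, 1)             # ack releases the next one
{[], _state} = AckModel.ack(state, 2)                    # buffer is now empty
```

The message leaves the buffer only once it has been acked, which is what makes redelivery after a crash possible.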

The code

Let’s explore the different pieces of the puzzle.

(Note: I’ve slightly changed the formatting of the code so it minimizes the horizontal scrolling)

We’ll begin with the client API:

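defmodule Sandbox do
  # Tags the message with a unique id, then hands it to the producer.
  # producer_name/0 is not shown in this excerpt.
  def publish(message) do
    GenServer.call(producer_name(), {:publish, {message, UUID.uuid4()}})
  end
end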

You can clearly see that we assign a unique identifier to each message. Note that I could have done that in the producer instead.

Next is the state of the producer. It simply consists of a buffer, the current message awaiting an acknowledgement, and the pid of the consumer. The latter is needed so we know whether the consumer has crashed; we can then act on that.

defmodule Sandbox.Producer do
  use GenStage

  defmodule State do
    defstruct buffer: nil,
              to_ack: nil,
              consumer: nil
  end

end


Publishing a message

After this overview of the state, let’s take a look at what happens when we publish a message. We have to distinguish two cases: the consumer is absent, either because it hasn’t subscribed to the producer yet or because it has crashed, or the consumer is present.

When the consumer is not present, we just enqueue the message in the buffer and, if no message is already awaiting an ack, mark the head of the buffer as the message to be acked.

def handle_call({:publish, {_, _id} = msg}, _, %State{consumer: nil} = state) do
  enqueue(state.buffer, msg)
  to_ack = if state.to_ack, do: state.to_ack, else: peek(state.buffer)

  {:reply, {:ok, msg}, [], %State{state | to_ack: to_ack}}
end


Things are a little different when the consumer is present: after enqueuing the message, if no message is awaiting an ack, we take the head of the buffer and propagate it; otherwise we emit nothing.

def handle_call({:publish, {_, _id} = msg}, _, %State{} = state) do
  enqueue(state.buffer, msg)
  {events, to_ack} =
    case state.to_ack do
      nil ->
        elem = peek(state.buffer)
        {[elem], elem}

      _ ->
        {[], state.to_ack}
    end

  {:reply, {:ok, msg}, events, %State{state | to_ack: to_ack}}
end


Handling the demand

Next up is demand handling. As said earlier, it’s not the main driver, but we have to handle it anyway. We just don’t care how many events the consumer is requesting.

To be honest, I don’t even know whether we can actually end up in this first case…

def handle_demand(_, %State{consumer: nil} = state) do
  {:noreply, [], state}
end

When the consumer is present but no message is awaiting an ack, it just means the buffer was empty when the last ack was received, so the consumer had nothing to do. But some messages may have been queued since then, so we need to check.

def handle_demand(_, %State{to_ack: nil} = state) do
  # Logger.debug/1 is a macro: the module needs `require Logger`.
  Logger.debug("Received demand - #{inspect(state)}")
  {events, to_ack} =
    if empty?(state.buffer),
      do: {[], nil},
      else: {[peek(state.buffer)], peek(state.buffer)}

  {:noreply, events, %State{state | to_ack: to_ack}}
end

Finally, the most interesting case: if a message is still awaiting an ack when demand arrives, something almost certainly went wrong on the consumer side. When the consumer crashes, the supervisor restarts it right away and it resubscribes to the producer, which triggers new demand. We just propagate the message awaiting an ack again.

def handle_demand(_, %State{} = state) do
  {:noreply, [state.to_ack], state}
end


Monitoring the consumer

We need to know when the consumer crashes so we can update our state and set the consumer’s pid to nil.

def handle_info({:DOWN, _, :process, pid, _}, %State{consumer: pid} = state) do
  {:noreply, [], %State{state | consumer: nil}}
end
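
For readers unfamiliar with monitors, the shape of the `:DOWN` message matched above can be observed with plain processes. This is a standalone sketch that uses only the standard library; the spawned process is a stand-in for the consumer, not the actual one.

```elixir
# Spawn a throwaway process that stands in for the consumer.
consumer =
  spawn(fn ->
    receive do
      :crash -> exit(:boom)
    end
  end)

# Monitoring returns a reference that comes back in the :DOWN message.
ref = Process.monitor(consumer)
send(consumer, :crash)

# The monitoring process receives {:DOWN, ref, :process, pid, reason}.
reason =
  receive do
    {:DOWN, ^ref, :process, ^consumer, reason} -> reason
  after
    1_000 -> :timeout
  end

IO.inspect(reason, label: "consumer exited with")
```

In the producer, the same message arrives in `handle_info/2`, where matching on the stored pid makes sure it is the consumer that went down.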

But we also need to set it when the consumer subscribes to the producer:

def handle_subscribe(:consumer, _options, {pid, _}, %State{} = state) do
  Process.monitor(pid)
  {:automatic, %State{state | consumer: pid}}
end


Acking the message

Here’s the big difference compared to the classic push-based approach: this is the only place where we dequeue a message from the buffer, namely when we receive the ack from the consumer stating that the message has been processed successfully. We can then remove it from the buffer and send the next message, if available.

Note that we make sure the ack is for the message currently awaiting it by pattern matching on the id.

def handle_info({:ack, id}, %State{to_ack: {_, id}} = state) do
  dequeue(state.buffer)
  {events, to_ack} =
    if empty?(state.buffer),
      do: {[], nil},
      else: {[peek(state.buffer)], peek(state.buffer)}

  {:noreply, events, %State{state | to_ack: to_ack}}
end
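
The id check relies on a property of Elixir pattern matching: a variable that appears twice in a function head must bind to the same value in both places. A tiny standalone illustration, with a hypothetical module name and a plain map instead of the `State` struct:

```elixir
defmodule AckMatch do
  # `id` appears twice in the head, so this clause only matches when the
  # acked id equals the id of the message awaiting acknowledgement.
  def classify({:ack, id}, %{to_ack: {_payload, id}}), do: :expected

  # Any other ack: different id, or nothing awaiting an ack at all.
  def classify({:ack, _id}, _state), do: :unexpected
end

:expected = AckMatch.classify({:ack, "x"}, %{to_ack: {"hello", "x"}})
:unexpected = AckMatch.classify({:ack, "y"}, %{to_ack: {"hello", "x"}})
:unexpected = AckMatch.classify({:ack, "x"}, %{to_ack: nil})
```

The catch-all clause is an addition for this sketch. With only the clause shown in the article, an ack carrying a different id would not match it.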

And that’s it!

There’s nothing really interesting on the consumer side: it just processes the message and sends an ack back to the producer.

Conclusion

Even though the code is not hard to understand, it took me a while to figure out how all the moving parts fit together. I had some misconceptions about how things work behind the scenes, especially regarding the demand mechanism.

In the end I learned a lot doing this proof-of-concept.

I’d like to thank everyone involved in the discussion on the Elixir Forum; it really helped me!

One final note

I’m fully aware that this looks very similar to Broadway in the end, and in fact I wondered whether I could use it instead of rolling my own solution.

Indeed, Broadway fits my needs almost completely:

  • built-in acknowledgement mechanism
  • fault-tolerance with minimal data loss
  • ordering of events

I’m a real fan of Broadway and use it a lot these days. I even submitted a pull request to its RabbitMQ adapter.

But there are multiple reasons why I continued with my own solution:

  • you get a better understanding of things if you build them by yourself
  • Broadway is built on GenStage, so in the end I would have been facing the same problems
  • as far as I know, there’s no way to send a call to a Broadway producer, since it is started opaquely by Broadway itself: Broadway gives it a specific name and you can’t access its pid