
Elixir - external process connection supervisor pattern

This post is hard to title, but the problem is a common one. Let's say that you have an application that depends on an external service like a database. If you have a pool of supervised workers and the database goes down, the workers crash and the Erlang supervisor restarts them, up to max_restarts times within max_seconds. If the database stays down longer than that, the supervisor exceeds its restart limit, kills its children, and exits itself. This is a problem.
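To see the restart limit in action, here is a minimal sketch. FlakyWorker is a hypothetical stand-in for a worker whose database is down (it crashes right after starting), and the limit of 3 restarts in 5 seconds is chosen only to make the demo quick.

```elixir
defmodule FlakyWorker do
  # Hypothetical worker: starts fine, then crashes immediately, the way a
  # worker would if its external service were unreachable.
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, [])

  @impl true
  def init([]) do
    send(self(), :crash)
    {:ok, nil}
  end

  @impl true
  def handle_info(:crash, state), do: {:stop, :db_down, state}
end

# Trap exits so the supervisor's death arrives as a message instead of
# taking this process down with it.
Process.flag(:trap_exit, true)

{:ok, sup} =
  Supervisor.start_link([FlakyWorker],
    strategy: :one_for_one,
    max_restarts: 3,
    max_seconds: 5
  )

# Once the worker has crashed more than 3 times inside the 5 second window,
# the supervisor gives up and exits.
exit_reason =
  receive do
    {:EXIT, ^sup, reason} -> reason
  after
    5_000 -> :supervisor_still_alive
  end

IO.inspect(exit_reason, label: "supervisor exited with")
```

The crash reports logged along the way are expected. In a real application the supervisor's exit would travel further up the supervision tree, which is how one unreachable database can take down much more than the pool.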

A potential solution is to abstract out the connection and retry it. We can use this to wait for the server to come back up, or to connect to a different server if we have a list of them. I recently did this for the RabbitMQ client from https://hex.pm/packages/amqp. It supported neither poolboy nor connection retries, so I had to write those myself. Elixir packages are fairly new, so you might have to do something of the sort to get a more robust interface.

So here's the pattern. The trick is to use :erlang.send_after to set a timer that retries the connection, plus a bit of pattern matching.

For instance, init sends the process itself a :connect message:

  def init([]) do
    :erlang.send(self(), :connect)
    {:ok, nil}
  end

Then the :connect handler calls handle_rabbit_connect with the result of AMQP.Connection.open:

  def handle_info(:connect, state) do
    handle_rabbit_connect(AMQP.Connection.open(), state)
  end

Then we have a bit of pattern matching, and use :erlang.send_after to try again after five seconds. This is also the part of the code where we could try a different host. The other thing to note is that, on success, the connection's process id is linked to the current GenServer process. Without that link, the pool of GenServer processes would keep running with dead connections if RabbitMQ went down or a TCP connection died for some reason, and we wouldn't be able to handle the failure.

  # Connection failed: schedule another :connect in 5 seconds.
  def handle_rabbit_connect({:error, _}, _state) do
    IO.puts("Error connecting to rabbit")
    :erlang.send_after(5000, self(), :connect)
    {:noreply, nil}
  end

  # Connection succeeded: link to it and keep its pid as the state.
  def handle_rabbit_connect({:ok, conn}, _state) do
    %AMQP.Connection{pid: pid} = conn
    Process.link(pid)
    IO.puts("Connected to rabbit")
    {:noreply, pid}
  end
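To illustrate why the link matters, here is a small sketch with no RabbitMQ involved. LinkedManager and the spawned fake_conn process are hypothetical stand-ins for the connection manager and the TCP connection process.

```elixir
defmodule LinkedManager do
  # Hypothetical stand-in for the connection manager: it links itself to
  # the "connection" pid it is given, as handle_rabbit_connect/2 does.
  use GenServer

  # start/1 rather than start_link/1, so the demo process is not linked too.
  def start(conn_pid), do: GenServer.start(__MODULE__, conn_pid)

  @impl true
  def init(conn_pid) do
    Process.link(conn_pid)
    {:ok, conn_pid}
  end
end

# A plain process plays the role of the TCP connection.
fake_conn =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

{:ok, manager} = LinkedManager.start(fake_conn)
ref = Process.monitor(manager)

# Simulate the broker going away.
Process.exit(fake_conn, :connection_closed)

manager_exit =
  receive do
    {:DOWN, ^ref, :process, ^manager, reason} -> reason
  after
    1_000 -> :still_running
  end

IO.inspect(manager_exit, label: "manager exited with")
```

Because the manager does not trap exits, the exit signal from the linked process takes it down as well. Inside a pool, that is what lets a fresh worker be started in its place and go through the :connect retry loop again.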

For completeness, here's the full code for the connection manager and the pool supervisor with poolboy. Note that this only connects to localhost. To connect elsewhere, connection options should be passed as the last argument of poolboy.child_spec and forwarded by start_link, which currently ignores its argument:

defmodule AMQPConnectManager do

  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], [])
  end

  def init([]) do
    :erlang.send(self(), :connect)
    {:ok, nil}
  end

  def handle_info(:connect, state) do
    handle_rabbit_connect(AMQP.Connection.open, state)
  end

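  # Connection failed: schedule another :connect in 5 seconds.
  def handle_rabbit_connect({:error, _}, _state) do
    IO.puts("Error connecting to rabbit")
    :erlang.send_after(5000, self(), :connect)
    {:noreply, nil}
  end

  # Connection succeeded: link to it and keep its pid as the state.
  def handle_rabbit_connect({:ok, conn}, _state) do
    %AMQP.Connection{pid: pid} = conn
    Process.link(pid)
    IO.puts("Connected to rabbit")
    {:noreply, pid}
  end

  # Not connected yet: report it instead of crashing on a nil pid.
  def handle_call(:channel, _from, nil) do
    {:reply, {:error, :not_connected}, nil}
  end

  def handle_call(:channel, _from, state) do
    conn = %AMQP.Connection{pid: state}
    {:ok, channel} = AMQP.Channel.open(conn)
    {:reply, {:ok, channel}, state}
  end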

  def handle_cast(_msg, state) do
    {:noreply, state}
  end
end

defmodule AMQPPool do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [])
  end

  def init([]) do
    pool_options = [
      {:name, {:local, :amqp_pool}},
      {:worker_module, AMQPConnectManager},
      {:size, 10},
      {:max_overflow, 10}
      ]
    children = [
      :poolboy.child_spec( :amqp_pool, pool_options, [] )
      ]
    supervise(children, strategy: :one_for_one)
  end

  def channel() do
    :poolboy.transaction(:amqp_pool, fn(p) -> 
       GenServer.call(p, :channel)
     end)
  end
end
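The code above always talks to localhost. As a rough sketch of the "list of servers" idea, the worker below takes its hosts from the argument given to start_link/1 (poolboy hands the last argument of child_spec to the worker's start_link/1) and moves on to the next host after a failure. RotatingConnector, the :hosts and :connect_fun options, the host names, and the 100 ms retry delay are all made up for illustration. In the real manager, connect_fun would wrap AMQP.Connection.open with the host in its options.

```elixir
defmodule RotatingConnector do
  # Sketch only: :hosts and :connect_fun are hypothetical options, not part
  # of the amqp or poolboy APIs.
  use GenServer

  @retry_ms 100

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def connection(server), do: GenServer.call(server, :connection)

  @impl true
  def init(opts) do
    send(self(), :connect)

    state = %{
      hosts: Keyword.fetch!(opts, :hosts),
      connect_fun: Keyword.fetch!(opts, :connect_fun),
      conn: nil
    }

    {:ok, state}
  end

  @impl true
  def handle_info(:connect, %{hosts: [host | rest]} = state) do
    case state.connect_fun.(host) do
      {:ok, conn} ->
        {:noreply, %{state | conn: conn}}

      {:error, _reason} ->
        # Move the failed host to the back of the list and retry later.
        Process.send_after(self(), :connect, @retry_ms)
        {:noreply, %{state | hosts: rest ++ [host]}}
    end
  end

  @impl true
  def handle_call(:connection, _from, %{conn: nil} = state) do
    {:reply, {:error, :not_connected}, state}
  end

  def handle_call(:connection, _from, state) do
    {:reply, {:ok, state.conn}, state}
  end
end

# Fake connect function: only the second host is "up".
connect_fun = fn
  "rabbit-b" -> {:ok, :fake_connection}
  _other -> {:error, :econnrefused}
end

{:ok, pid} =
  RotatingConnector.start_link(hosts: ["rabbit-a", "rabbit-b"], connect_fun: connect_fun)

# Give the first attempt time to fail and the retry time to succeed.
Process.sleep(500)
result = RotatingConnector.connection(pid)
IO.inspect(result, label: "connection")
```

Passing the connect function in keeps the retry logic testable without a broker. The sketch leaves out the Process.link step from the manager above, which you would still want once the connection is a real process.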

tl;dr - if your code relies on an external application, create a connection manager that retries the connection, so that your pool keeps working
