Elixir - external process connection supervisor pattern
Unfortunately, it’s pretty difficult to title this post, but it’s a common problem. Let’s say that you have an application that depends on an external service like a database. If you have a pool of workers under supervision and the database goes down, the workers crash and the Erlang supervisor restarts them, but only up to max_restarts times within max_seconds. If the database stays down longer than that, the supervisor exceeds its restart limit, kills its remaining children, and exits itself. This is a problem.
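To see the restart limit in action, here is a minimal sketch. FlakyWorker and RestartLimitDemo are hypothetical names made up for this illustration; the worker simply stands in for a process whose database is unreachable. The limits used (3 restarts within 5 seconds) are the defaults of Elixir's Supervisor.

```elixir
defmodule FlakyWorker do
  # Hypothetical worker: it starts fine, then crashes as soon as it
  # tries to do any work, as if its database were down.
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  def init(arg) do
    send(self(), :work)
    {:ok, arg}
  end

  def handle_info(:work, state), do: {:stop, :db_unavailable, state}
end

defmodule RestartLimitDemo do
  # Starts a supervisor over FlakyWorker and returns the reason the
  # supervisor exits with once the restart limit is exceeded.
  # Note: this turns on trap_exit in the calling process.
  def run do
    Process.flag(:trap_exit, true)

    {:ok, sup} =
      Supervisor.start_link([FlakyWorker],
        strategy: :one_for_one,
        max_restarts: 3,
        max_seconds: 5
      )

    receive do
      {:EXIT, ^sup, reason} -> reason
    after
      5_000 -> :still_running
    end
  end
end
```

Calling RestartLimitDemo.run() prints a few crash reports and then returns the supervisor's exit reason, which shows that the whole supervisor goes down rather than just the failing worker.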
A potential solution to this is to abstract out the connection and do retries. We can use this to wait for the server to come up, or perhaps to connect to a different server if we have a list of them. I recently did this for the RabbitMQ client from https://hex.pm/packages/amqp. It didn’t support poolboy or connection retries, so I had to write those myself. Elixir packages are fairly new, so you might have to do something similar to get a more robust interface.
So here’s the pattern. The trick is to use :erlang.send_after to set a timer that triggers a connection attempt, plus a bit of pattern matching on the result.
For instance, init sends the process a :connect message:
def init([]) do
  # Connect asynchronously so init returns right away, even if the broker is down.
  :erlang.send(self(), :connect)
  {:ok, nil}
end
The handle_info clause for :connect then passes the result of AMQP.Connection.open to handle_rabbit_connect:
def handle_info(:connect, state) do
  handle_rabbit_connect(AMQP.Connection.open(), state)
end
Then we pattern match on the result and use :erlang.send_after to try again on failure. We could also try connecting to different hosts in this part of the code. The other thing to note is that, on success, the connection process is linked to the current GenServer process. Without that link, a worker would not notice when RabbitMQ goes down or the TCP connection dies for some reason, and the pool of GenServer processes would keep running with dead connections.
def handle_rabbit_connect({:error, _}, _state) do
  IO.puts("Error connecting to rabbit")
  # Retry in 5 seconds.
  :erlang.send_after(5000, self(), :connect)
  {:noreply, nil}
end

def handle_rabbit_connect({:ok, conn}, _state) do
  %AMQP.Connection{pid: pid} = conn
  # Link to the connection process so this worker dies with it.
  Process.link(pid)
  IO.puts("Connected to rabbit")
  {:noreply, pid}
end
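The remark above about trying different hosts could be sketched roughly as follows. This is an illustration and not part of the connection manager in this post: RotatingConnector, its status/1 function, and the :hosts, :connect and :retry_ms options are all made-up names. The connect function is injected so the sketch runs without a broker; in real code it might wrap AMQP.Connection.open with a host option.

```elixir
defmodule RotatingConnector do
  # Hypothetical GenServer that cycles through a list of hosts until
  # one of them accepts the connection.
  use GenServer

  # opts: :hosts (list), :connect (1-arity fun returning {:ok, conn} or
  # {:error, reason}), :retry_ms (delay between attempts, default 5000).
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def status(server), do: GenServer.call(server, :status)

  def init(opts) do
    send(self(), :connect)

    state = %{
      hosts: Keyword.fetch!(opts, :hosts),
      connect: Keyword.fetch!(opts, :connect),
      retry_ms: Keyword.get(opts, :retry_ms, 5_000),
      conn: nil
    }

    {:ok, state}
  end

  def handle_info(:connect, %{hosts: [host | rest]} = state) do
    case state.connect.(host) do
      {:ok, conn} ->
        {:noreply, %{state | conn: conn}}

      {:error, _reason} ->
        # Move the failed host to the back of the list and retry later
        # with the next one.
        :erlang.send_after(state.retry_ms, self(), :connect)
        {:noreply, %{state | hosts: rest ++ [host]}}
    end
  end

  def handle_call(:status, _from, %{conn: nil} = state) do
    {:reply, :connecting, state}
  end

  def handle_call(:status, _from, %{conn: conn} = state) do
    {:reply, {:connected, conn}, state}
  end
end
```

With a fake connect function that refuses the first host and accepts the second, the process ends up connected to the second host after one retry interval.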
For completeness, here’s the full code for the connection manager and the pool supervisor with poolboy. Note that this version only connects to localhost; connection options would be passed as the last argument of poolboy.child_spec, which poolboy hands to the worker’s start_link:
defmodule AMQPConnectManager do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], [])
  end

  def init([]) do
    :erlang.send(self(), :connect)
    {:ok, nil}
  end

  def handle_info(:connect, state) do
    handle_rabbit_connect(AMQP.Connection.open(), state)
  end

  def handle_rabbit_connect({:error, _}, _state) do
    IO.puts("Error connecting to rabbit")
    :erlang.send_after(5000, self(), :connect)
    {:noreply, nil}
  end

  def handle_rabbit_connect({:ok, conn}, _state) do
    %AMQP.Connection{pid: pid} = conn
    Process.link(pid)
    IO.puts("Connected to rabbit")
    {:noreply, pid}
  end

  # Added guard: while not connected yet, reply with an error
  # instead of crashing the worker.
  def handle_call(:channel, _from, nil) do
    {:reply, {:error, :not_connected}, nil}
  end

  def handle_call(:channel, _from, state) do
    conn = %AMQP.Connection{pid: state}
    {:ok, channel} = AMQP.Channel.open(conn)
    {:reply, {:ok, channel}, state}
  end

  def handle_cast(_msg, state) do
    {:noreply, state}
  end
end
defmodule AMQPPool do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [])
  end

  def init([]) do
    pool_options = [
      {:name, {:local, :amqp_pool}},
      {:worker_module, AMQPConnectManager},
      {:size, 10},
      {:max_overflow, 10}
    ]

    children = [
      # The last argument is handed to AMQPConnectManager.start_link/1.
      :poolboy.child_spec(:amqp_pool, pool_options, [])
    ]

    # Supervisor.init/2 (Elixir 1.5+) replaces the deprecated supervise/2.
    Supervisor.init(children, strategy: :one_for_one)
  end

  # Checks a worker out of the pool and asks it for a channel.
  def channel() do
    :poolboy.transaction(:amqp_pool, fn p ->
      GenServer.call(p, :channel)
    end)
  end
end
tl;dr - if your code relies on an external application, create a connection manager that retries the connection, so your worker pool keeps running through an outage.