Preserving GenServer State on Node Shutdown

Long-running GenServers are an elegant weapon. In this post, we explore how to persist their state on node shutdown and then rehydrate it on system startup.

To begin, let's create an example GenServer that maintains a queue of items. Once the queue grows past a set threshold, it operates on each item in the queue and then clears it.

defmodule MyApp.ManagedQueue do
  use GenServer

  # Milliseconds between checks of the queue length
  @tick_interval 1000
  @queue_threshold 10

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    schedule_tick()
    {:ok, []}
  end

  @impl true
  def handle_info(:tick, state) do
    # check the length of the state and if it's above the threshold, do something
    if length(state) > @queue_threshold do
      some_logic_with_the_state(state)
      schedule_tick()

      # return the newly emptied state list
      {:noreply, []}
    else
      schedule_tick()
      {:noreply, state}
    end
  end

  @impl true
  def handle_cast({:enqueue, item}, state) do
    # Prepending is cheap, but it means the newest item sits at the head
    {:noreply, [item | state]}
  end

  defp schedule_tick do
    Process.send_after(self(), :tick, @tick_interval)
  end

  # Placeholder: replace with whatever work should happen for each queued item
  defp some_logic_with_the_state(items) do
    Enum.each(items, &IO.inspect/1)
  end
end

What's happening here is that the GenServer starts with an empty list as its state and schedules a :tick message, which handle_info/2 handles after the interval defined in @tick_interval (in milliseconds). Whenever it receives an {:enqueue, item} cast, it prepends the item to the list.

Once the length of the queue exceeds the number defined in @queue_threshold, we call some_logic_with_the_state/1 and then clear the queue.
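
To make the threshold and ordering behavior concrete, here is a minimal, process-free sketch of the same logic. The QueueSketch module and its function names are hypothetical and exist only for illustration. It assumes the same threshold of 10 and shows two details that are easy to miss: the comparison is a strict "greater than", so the eleventh item is the one that triggers a flush, and because items are prepended, the list has to be reversed to get them back in arrival order.

```elixir
defmodule QueueSketch do
  @moduledoc "Hypothetical, process-free model of the queue logic, for illustration only."

  @queue_threshold 10

  # Mirrors handle_cast({:enqueue, item}, state): prepending is O(1),
  # so the newest item is always at the head of the list.
  def enqueue(queue, item), do: [item | queue]

  # Mirrors handle_info(:tick, state): returns {batch, new_queue}.
  # The batch is reversed so items come out in arrival order.
  def tick(queue) do
    if length(queue) > @queue_threshold do
      {Enum.reverse(queue), []}
    else
      {[], queue}
    end
  end
end

queue = Enum.reduce(1..10, [], fn item, acc -> QueueSketch.enqueue(acc, item) end)

# Ten items do not exceed the threshold of ten, so nothing is flushed yet
{[], ^queue} = QueueSketch.tick(queue)

# The eleventh item pushes the queue over the threshold
{batch, remaining} = queue |> QueueSketch.enqueue(11) |> QueueSketch.tick()
IO.inspect(batch, label: "flushed in arrival order")
```

Whether arrival order matters depends on what some_logic_with_the_state/1 does; if it does, reversing once at flush time is usually cheaper than appending on every enqueue.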

This seems like a great idea, and it is, but there's a problem. If the node that the GenServer is running on is restarted, the GenServer is restarted with it. Because the queue lives only in the process's memory, the state is lost. Fear not though, we have the tools to fix this.

More than likely, you've got some sort of persistence layer in place in your application. For the sake of this demonstration, let's assume that you're using Ecto and PostgreSQL.

Persisting the state

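defmodule MyApp.QueueManagerSnapshots.QueueManagerSnapshot do
  use Ecto.Schema
  import Ecto.Changeset

  @type t :: %__MODULE__{}

  schema "queue_manager_snapshots" do
    field :genserver_name, :string
    field :state_checksum, :string
    field :state, :binary

    timestamps()
  end

  @fields [:genserver_name, :state_checksum, :state]

  # Cast and require every field; a snapshot missing any of them is unusable
  def changeset(snapshot, attrs) do
    snapshot
    |> cast(attrs, @fields)
    |> validate_required(@fields)
  end
end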

defmodule MyApp.QueueManagerSnapshots do
  import Ecto.Query

  alias MyApp.Repo
  alias MyApp.QueueManagerSnapshots.QueueManagerSnapshot, as: Snapshot

  # Returns the decoded state from the most recent snapshot, or nil if there is none
  @spec get_latest_snapshot(genserver_name :: String.t()) :: term() | nil
  def get_latest_snapshot(genserver_name) do
    case fetch_latest(genserver_name) do
      nil ->
        nil

      # Only decode binaries that this application wrote itself
      %Snapshot{state: binary} ->
        :erlang.binary_to_term(binary)
    end
  end

  @spec create_snapshot(genserver_name :: String.t(), state :: term()) :: Snapshot.t()
  def create_snapshot(genserver_name, state) do
    binary = :erlang.term_to_binary(state)

    %Snapshot{}
    |> Snapshot.changeset(%{
      genserver_name: genserver_name,
      state_checksum: checksum(binary),
      state: binary
    })
    |> maybe_insert_snapshot()
  end

  # Fetches the most recent snapshot record itself, without decoding its state
  defp fetch_latest(genserver_name) do
    Snapshot
    |> where(genserver_name: ^genserver_name)
    # timestamps() has second precision by default, so break ties on id
    |> order_by(desc: :inserted_at, desc: :id)
    |> limit(1)
    |> Repo.one()
  end

  # :erlang.md5/1 expects iodata, so hash the serialized binary rather than the raw term
  defp checksum(binary) do
    binary
    |> :erlang.md5()
    |> Base.encode16(case: :lower)
  end

  defp maybe_insert_snapshot(changeset) do
    %{genserver_name: name, state_checksum: checksum} = changeset.changes

    # If the latest snapshot already has the same checksum, we don't need to insert it again
    case fetch_latest(name) do
      %Snapshot{state_checksum: ^checksum} = latest -> latest
      _ -> Repo.insert!(changeset)
    end
  end
end

The formula laid out above is fairly simple. We define an Ecto schema for a queue_manager_snapshots table with a field that identifies snapshots by GenServer process name, a checksum of the state that we use to avoid inserting duplicate records, and the serialized state itself. Strictly speaking, a table dedicated to a single GenServer doesn't need the name field, but a more generic genserver_snapshots table would.

We then define a context module of MyApp.QueueManagerSnapshots for working with and manipulating snapshots. Inside, we define a function to get the latest snapshot and a function to create snapshots.

Creating a snapshot is fairly straightforward. We give the function the name of the GenServer process and the current state of the GenServer. It serializes the state to a binary with :erlang.term_to_binary/1 and calculates a checksum. Finally, maybe_insert_snapshot/1 compares that checksum against the latest stored snapshot and inserts a new record only if the two differ, so we never store a duplicate.
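
The serialization and checksum steps can be tried on their own, without a database. The sketch below is only an illustration: SnapshotCodec and its function names are hypothetical, and the sample state is made up. It assumes the checksum is taken over the serialized binary, since :erlang.md5/1 accepts iodata and not arbitrary terms.

```elixir
defmodule SnapshotCodec do
  @moduledoc "Hypothetical helper showing the encode, checksum, and decode steps in isolation."

  # Serialize any term to a binary suitable for a :binary column
  def encode(state), do: :erlang.term_to_binary(state)

  # Restore the term. Only call this on binaries the application itself wrote.
  def decode(binary), do: :erlang.binary_to_term(binary)

  # :erlang.md5/1 accepts iodata, so hash the encoded binary, not the raw term
  def checksum(binary) do
    binary
    |> :erlang.md5()
    |> Base.encode16(case: :lower)
  end
end

state = [%{id: 2, payload: "b"}, %{id: 1, payload: "a"}]
binary = SnapshotCodec.encode(state)

# Decoding gives back exactly the term we started with
^state = SnapshotCodec.decode(binary)

# Encoding the same state again yields the same checksum,
# while a changed state yields a different one
same = SnapshotCodec.checksum(SnapshotCodec.encode(state))
changed = SnapshotCodec.checksum(SnapshotCodec.encode([%{id: 3, payload: "c"} | state]))
IO.inspect({SnapshotCodec.checksum(binary) == same, same == changed})
```

MD5 is used here only to detect unchanged state, not for security, which is why a fast, short digest is good enough.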

Integrating with the GenServer

Now that we have a way to persist the state of our GenServer, we need to restore it when the node starts and save it when the node shuts down. Both happen in GenServer callbacks: init/1 and terminate/2.

defmodule MyApp.ManagedQueue do
  use GenServer

  alias MyApp.QueueManagerSnapshots

  @tick_interval 1000
  @queue_threshold 10

  # The genserver_name column is a string, so store the module name as one
  @snapshot_name inspect(__MODULE__)

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # Trap exits so terminate/2 runs when the supervisor shuts this process down
    Process.flag(:trap_exit, true)
    schedule_tick()
    {:ok, QueueManagerSnapshots.get_latest_snapshot(@snapshot_name) || []}
  end

  @impl true
  def handle_info(:tick, state) do
    # check the length of the state and if it's above the threshold, do something
    if length(state) > @queue_threshold do
      some_logic_with_the_state(state)
      schedule_tick()
      {:noreply, []}
    else
      schedule_tick()
      {:noreply, state}
    end
  end

  @impl true
  def handle_cast({:enqueue, item}, state) do
    {:noreply, [item | state]}
  end

  # Handle graceful shutdown
  @impl true
  def terminate(_reason, state) do
    QueueManagerSnapshots.create_snapshot(@snapshot_name, state)
  end

  defp schedule_tick do
    Process.send_after(self(), :tick, @tick_interval)
  end

  # Placeholder: replace with whatever work should happen for each queued item
  defp some_logic_with_the_state(items) do
    Enum.each(items, &IO.inspect/1)
  end
end

In the example above, there are two key pieces of the implementation to focus on. First, when the GenServer starts, it calls into the QueueManagerSnapshots module to get the latest snapshot of the state. If there is no snapshot, the GenServer starts with an empty list. If there is one, the GenServer starts with the snapshot from the database: the list of items that the queue manager had not yet processed.

Second, we call Process.flag(:trap_exit, true) in the init/1 callback so that our GenServer can handle shutdown signals gracefully. When the system initiates a shutdown, the supervisor asks the process to exit, and because the process traps exits, OTP calls our terminate/2 callback before the process exits. This gives us an opportunity to create a final snapshot of the GenServer's state, which the QueueManagerSnapshots module stores in the database. When the system comes back up, this snapshot is used to restore the GenServer's state.
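
The shutdown path is easy to verify without a database. The following is a minimal sketch under a few assumptions: TerminateSketch is a hypothetical module, the report_to option is invented for this example, and sending a message to the calling process stands in for QueueManagerSnapshots.create_snapshot/2. Stopping the supervisor plays the role of a node shutdown.

```elixir
defmodule TerminateSketch do
  use GenServer

  # `report_to` is a hypothetical option: the pid that stands in for the database
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts) do
    # Without this flag, a :shutdown exit from the supervisor would stop
    # the process without terminate/2 being called
    Process.flag(:trap_exit, true)
    {:ok, %{report_to: Keyword.fetch!(opts, :report_to), items: []}}
  end

  @impl true
  def handle_cast({:enqueue, item}, state) do
    {:noreply, %{state | items: [item | state.items]}}
  end

  @impl true
  def terminate(reason, state) do
    # Stand-in for QueueManagerSnapshots.create_snapshot/2
    send(state.report_to, {:snapshot, reason, state.items})
  end
end

{:ok, sup} =
  Supervisor.start_link([{TerminateSketch, report_to: self()}], strategy: :one_for_one)

GenServer.cast(TerminateSketch, {:enqueue, :a})
GenServer.cast(TerminateSketch, {:enqueue, :b})

# A synchronous call makes sure both casts were handled before we shut down
_ = :sys.get_state(TerminateSketch)

# Stopping the supervisor shuts its children down, much like a node shutdown would
:ok = Supervisor.stop(sup)

snapshot =
  receive do
    {:snapshot, reason, items} -> {reason, items}
  after
    1_000 -> :no_snapshot
  end

IO.inspect(snapshot, label: "state handed to terminate/2")
```

Two caveats are worth keeping in mind. terminate/2 is not called when the process is killed outright or the VM stops abruptly, so this approach only covers graceful shutdowns. The supervisor also waits only for the child's shutdown timeout (5 seconds by default for workers), so the snapshot write has to finish within that window.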

This pattern works exceedingly well for global singleton processes in a distributed Elixir environment. Because the snapshots are stored in the database rather than on any one node, whichever node starts the process next can restore the state and pick up where the previous one left off. This is a very simple example, but the same snapshot-and-restore idea sits underneath a lot of the more complex state handling in distributed systems. I hope you enjoyed this and find it as useful in your work as I have!