Preserving GenServer State on Node Shutdown
- Author: Will Ricketts (@willricketts)
Long-running GenServers are an elegant weapon. In this post, we explore how to persist their state on node shutdown and then rehydrate it on system startup.
To begin, let's create an example GenServer that maintains a queue of items. Once the queue grows past a set threshold, it operates on each item in the queue and then clears it.
defmodule MyApp.ManagedQueue do
  use GenServer

  @tick_interval 1000
  @queue_threshold 10

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    schedule_tick()
    {:ok, []}
  end

  @impl true
  def handle_info(:tick, state) do
    # check the length of the state and if it's above the threshold, do something
    if length(state) > @queue_threshold do
      some_logic_with_the_state(state)
      schedule_tick()
      # return the newly emptied state list
      {:noreply, []}
    else
      schedule_tick()
      {:noreply, state}
    end
  end

  @impl true
  def handle_cast({:enqueue, item}, state) do
    {:noreply, [item | state]}
  end

  defp schedule_tick do
    Process.send_after(self(), :tick, @tick_interval)
  end

  # Placeholder for whatever work your application does with the queued items
  defp some_logic_with_the_state(_items), do: :ok
end
What's happening here is that we start this GenServer with an empty list as its state and schedule a :tick message, which handle_info/2 receives after the time defined in @tick_interval. Whenever the GenServer receives an :enqueue message, it adds the item to the front of the list.
Once the length of the queue exceeds the number defined in @queue_threshold, we call some_logic_with_the_state/1 and then clear the queue.
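To make the enqueue and tick behavior concrete, here is a minimal sketch of the same decision written as pure functions, with no process involved. The QueueSketch module, its function names, and the threshold of 3 are hypothetical and chosen only for illustration. It also shows that, because items are prepended, the list is newest-first, so the sketch reverses it before handing it off in case arrival order matters to your processing logic.

```elixir
defmodule QueueSketch do
  # Hypothetical threshold, kept small so the example is easy to follow
  @queue_threshold 3

  # Mirrors handle_cast({:enqueue, item}, state): prepend to the list
  def enqueue(queue, item), do: [item | queue]

  # Mirrors handle_info(:tick, state): returns {items_to_process, new_queue}.
  # Items are returned in arrival order (oldest first).
  def tick(queue) when length(queue) > @queue_threshold do
    {Enum.reverse(queue), []}
  end

  def tick(queue), do: {[], queue}
end

queue = Enum.reduce(1..4, [], fn item, acc -> QueueSketch.enqueue(acc, item) end)
# queue is [4, 3, 2, 1]: the newest item sits at the head

{to_process, remaining} = QueueSketch.tick(queue)
# to_process is [1, 2, 3, 4] and remaining is []
```

A queue that has not yet passed the threshold is returned untouched, for example QueueSketch.tick([3, 2, 1]) gives {[], [3, 2, 1]}.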
This seems like a great idea, and it is, but there's a problem. The state lives only in the process's memory, so if the node that the GenServer is running on is restarted, the GenServer restarts with an empty queue and any items still waiting in it are lost. Fear not though, we have the tools to fix this.
More than likely, you've got some sort of persistence layer in place in your application. For the sake of this demonstration, let's assume that you're using Ecto and PostgreSQL.
Persisting the state
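defmodule MyApp.QueueManagerSnapshots.QueueManagerSnapshot do
  use Ecto.Schema
  import Ecto.Changeset

  @type t :: %__MODULE__{}

  schema "queue_manager_snapshots" do
    field :genserver_name, :string
    field :state_checksum, :string
    field :state, :binary
    timestamps()
  end

  # Used by MyApp.QueueManagerSnapshots.create_snapshot/2
  def changeset(snapshot, attrs) do
    snapshot
    |> cast(attrs, [:genserver_name, :state_checksum, :state])
    |> validate_required([:genserver_name, :state_checksum, :state])
  end
end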
defmodule MyApp.QueueManagerSnapshots do
  import Ecto.Query
  alias MyApp.Repo
  alias MyApp.QueueManagerSnapshots.QueueManagerSnapshot, as: Snapshot

  # Returns the decoded state from the most recent snapshot, or nil if there is none
  @spec get_latest_snapshot(genserver_name :: atom() | String.t()) :: term() | nil
  def get_latest_snapshot(genserver_name) do
    case latest_record(genserver_name) do
      nil -> nil
      %Snapshot{state: binary} -> :erlang.binary_to_term(binary)
    end
  end

  @spec create_snapshot(genserver_name :: atom() | String.t(), state :: term()) :: Snapshot.t()
  def create_snapshot(genserver_name, state) do
    binary = :erlang.term_to_binary(state)

    %Snapshot{}
    |> Snapshot.changeset(%{
      # to_string/1 lets callers pass a module name such as __MODULE__
      genserver_name: to_string(genserver_name),
      state_checksum: checksum(binary),
      state: binary
    })
    |> maybe_insert_snapshot()
  end

  # Fetches the newest snapshot row itself, without decoding the state
  defp latest_record(genserver_name) do
    name = to_string(genserver_name)

    Snapshot
    |> where(genserver_name: ^name)
    |> order_by(desc: :inserted_at, desc: :id)
    |> limit(1)
    |> Repo.one()
  end

  # :erlang.md5/1 expects a binary, so we hash the serialized state
  defp checksum(binary) do
    binary
    |> :erlang.md5()
    |> Base.encode16(case: :lower)
  end

  defp maybe_insert_snapshot(changeset) do
    # If the latest snapshot already has the same checksum, we don't need to insert it again
    genserver_name = Ecto.Changeset.get_field(changeset, :genserver_name)
    new_checksum = Ecto.Changeset.get_field(changeset, :state_checksum)

    case latest_record(genserver_name) do
      %Snapshot{state_checksum: ^new_checksum} = existing -> existing
      _ -> Repo.insert!(changeset)
    end
  end
end
The formula laid out above is fairly simple. We define an Ecto schema for a queue_manager_snapshots table, with fields that identify a snapshot by the GenServer process name and by a checksum of the state, which we use to avoid inserting duplicate records. With only one GenServer being snapshotted, we don't strictly need a field for the GenServer name, but it would be necessary for a more generic genserver_snapshots table.
We then define a context module, MyApp.QueueManagerSnapshots, for working with snapshots. Inside, we define a function to get the latest snapshot and a function to create snapshots.
Creating a snapshot is fairly straightforward. We give the function the name of the GenServer process and the current state of the GenServer. It then builds a binary representation of the state with :erlang.term_to_binary/1 and calculates a checksum. Finally, maybe_insert_snapshot/1 checks whether the latest stored snapshot has an identical checksum, and inserts a new row only if it does not.
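The serialization and duplicate check can be tried in isolation, without a database. The following is a minimal sketch under that assumption: the SnapshotCodec module and its functions are hypothetical, and a plain newest-first list stands in for the queue_manager_snapshots table. Note that the checksum is computed over the serialized binary, since :erlang.md5/1 works on binaries and iodata rather than arbitrary terms.

```elixir
defmodule SnapshotCodec do
  # Serialize the state and derive a lowercase hex MD5 checksum from the bytes
  def encode(state) do
    binary = :erlang.term_to_binary(state)
    checksum = binary |> :erlang.md5() |> Base.encode16(case: :lower)
    %{state: binary, state_checksum: checksum}
  end

  # Turn a stored snapshot back into the original term
  def decode(%{state: binary}), do: :erlang.binary_to_term(binary)

  # `snapshots` is a newest-first list standing in for the table.
  # A new entry is added only if its checksum differs from the latest one.
  def maybe_insert(snapshots, state) do
    %{state_checksum: checksum} = new = encode(state)

    case snapshots do
      [%{state_checksum: ^checksum} | _] -> snapshots
      _ -> [new | snapshots]
    end
  end
end

snapshots =
  []
  |> SnapshotCodec.maybe_insert([:job_b, :job_a])
  # Same state again: the checksum matches, so nothing is added
  |> SnapshotCodec.maybe_insert([:job_b, :job_a])
  |> SnapshotCodec.maybe_insert([:job_c, :job_b, :job_a])

latest_state = SnapshotCodec.decode(hd(snapshots))
```

After these three calls the list holds two snapshots, and decoding the newest one gives back [:job_c, :job_b, :job_a].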
Integrating with the GenServer
Now that we have a way to persist the state of our GenServer, we need to make sure that it's restored when the node starts and saved when the node shuts down. This relies on the init/1 and terminate/2 callbacks.
defmodule MyApp.ManagedQueue do
  use GenServer
  alias MyApp.QueueManagerSnapshots

  @tick_interval 1000
  @queue_threshold 10
  # The genserver_name column is a string, so we store "MyApp.ManagedQueue"
  @snapshot_name inspect(__MODULE__)

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # Trap exits so terminate/2 runs when the supervisor shuts us down
    Process.flag(:trap_exit, true)
    schedule_tick()
    {:ok, QueueManagerSnapshots.get_latest_snapshot(@snapshot_name) || []}
  end

  @impl true
  def handle_info(:tick, state) do
    # check the length of the state and if it's above the threshold, do something
    if length(state) > @queue_threshold do
      some_logic_with_the_state(state)
      schedule_tick()
      {:noreply, []}
    else
      schedule_tick()
      {:noreply, state}
    end
  end

  @impl true
  def handle_cast({:enqueue, item}, state) do
    {:noreply, [item | state]}
  end

  # Handle graceful shutdown
  @impl true
  def terminate(_reason, state) do
    QueueManagerSnapshots.create_snapshot(@snapshot_name, state)
  end

  defp schedule_tick do
    Process.send_after(self(), :tick, @tick_interval)
  end

  # Placeholder for whatever work your application does with the queued items
  defp some_logic_with_the_state(_items), do: :ok
end
In the example above we have two key pieces of the implementation to focus on.
Firstly, when the GenServer starts, it calls into the QueueManagerSnapshots module to get the latest snapshot of the state. If there is no snapshot, the GenServer starts with an empty list. If there is one, the GenServer starts with the snapshot from the database, which is the list of items that the queue manager had not yet processed.
Secondly, we use Process.flag(:trap_exit, true) in the init/1 callback. Without it, the exit signal a supervisor sends during shutdown would kill the process immediately. With it, OTP calls our terminate/2 callback before the process exits. This gives us an opportunity to create a final snapshot of the GenServer's state, which the QueueManagerSnapshots module stores in the database. When the system comes back up, this snapshot is used to restore the GenServer's state.
Keep in mind that terminate/2 only runs on a graceful shutdown. It is skipped if the process is killed brutally or the node crashes, it must finish within the child's shutdown timeout (5 seconds by default for workers), and the Repo has to still be running at that point, which means starting it before the queue in the supervision tree.
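The effect of trapping exits can be observed with a small, self-contained experiment. This is a sketch rather than part of the application: the TerminateDemo module and its :notify and :trap_exit options are hypothetical, and sending a message to the calling process stands in for writing a snapshot to the database.

```elixir
defmodule TerminateDemo do
  use GenServer

  # Hypothetical options: `notify` is the pid told about terminate/2,
  # `trap_exit` toggles the flag so both behaviors can be compared
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    if Keyword.get(opts, :trap_exit, false), do: Process.flag(:trap_exit, true)
    {:ok, %{notify: Keyword.fetch!(opts, :notify), items: [:a, :b]}}
  end

  @impl true
  def terminate(reason, state) do
    # Stand-in for QueueManagerSnapshots.create_snapshot/2
    send(state.notify, {:snapshot, reason, state.items})
    :ok
  end
end

run = fn trap? ->
  children = [{TerminateDemo, notify: self(), trap_exit: trap?}]
  {:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

  # Stopping the supervisor sends each child an exit signal with reason :shutdown
  :ok = Supervisor.stop(sup)

  receive do
    {:snapshot, reason, items} -> {:saved, reason, items}
  after
    100 -> :lost
  end
end

with_trap = run.(true)
# with_trap is {:saved, :shutdown, [:a, :b]}

without_trap = run.(false)
# without_trap is :lost, because terminate/2 never ran
```

The only difference between the two runs is the :trap_exit flag, which is why the flag belongs in init/1 of any GenServer that needs to save state on the way down.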
This pattern works exceedingly well for global singleton processes in a distributed Elixir environment, because the snapshots live in the database rather than on any one node and are restored wherever the process next starts. This is a very simple example, but the same save-on-shutdown, restore-on-startup idea carries over to more complex stateful processes. I hope you enjoyed this and find it as useful in your work as I have!