DEV Community

Cover image for Managing Distributed State with GenServers in Phoenix and Elixir
Pulkit Goyal for AppSignal

Posted on • Originally published at blog.appsignal.com

Managing Distributed State with GenServers in Phoenix and Elixir

Phoenix and Elixir are designed at their core to build real-time, fault-tolerant applications. With its elegant syntax and the robustness of the Erlang VM, Elixir is an ideal candidate for tackling the challenges of distributed state management.

This two-part series will guide Phoenix/Elixir developers through the intricacies of working with Phoenix in a distributed setup.

The focus of this first post will be on GenServers — the key components behind state management in distributed systems. We will explore how GenServers can be leveraged to maintain state across nodes in a Phoenix application, ensuring data consistency and fault tolerance. We’ll break down the complexities of GenServers and distributed state management into manageable and understandable segments.

Overview of Our Phoenix Application

Let's focus on distributed state management in the context of a CRUD API. Imagine you have a popular API service built with Phoenix that handles a large number of requests from various clients. To prevent abuse and ensure fair usage, you want to implement a rate limiter that restricts the number of requests a client can make within a specific timeframe, such as 100 requests per minute.

You can implement this simply using the "token bucket" strategy. In this strategy, a bucket is maintained for each client that is filled with tokens at a constant rate. Each API call consumes a single token from the bucket and is blocked if there are no tokens available.

In the next section, we will set up a token bucket for a single-node scenario using a GenServer.

A Single-Node Rate Limiter Implementation Using GenServer for Elixir

GenServers are a cornerstone within the Elixir ecosystem for managing state and encapsulating functionality. They are the go-to structure for maintaining state within an application because they can hold state throughout their lifecycle and be interacted with asynchronously.

defmodule MyApp.TokenBucketRateLimiter do
  use GenServer

  # tokens per second
  @rate 1
  # maximum tokens in the bucket
  @bucket_size 10

  def start_link([]), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  @impl true
  def init(nil), do: {:ok, %{buckets: %{}}}

  def allow?(client_id) do
    GenServer.call(__MODULE__, {:allow?, client_id})
  end

  @impl true
  def handle_call({:allow?, client_id}, _from, state) do
    current_time = System.monotonic_time(:second)
    {reply, updated_bucket} =
      state.buckets
      |> get_or_init_bucket(client_id)
      |> update_bucket(current_time)
      |> maybe_consume_token(current_time)

    state = put_in(state, [:buckets, client_id], updated_bucket)
    {:reply, reply, state}
  end

  defp maybe_consume_token(%{tokens: tokens} = bucket, current_time)
       when tokens >= 1 do
    updated_bucket = %{bucket | tokens: tokens - 1, last_checked: current_time}
    {true, updated_bucket}
  end

  defp maybe_consume_token(bucket, _current_time), do: {false, bucket}

  defp get_or_init_bucket(buckets, client_id) do
    Map.get(buckets, client_id, %{
      tokens: @bucket_size,
      last_checked: current_time
    })
  end

  defp update_bucket(bucket, current_time) do
    # Add tokens based on the last time the bucket was checked
    time_passed = current_time - bucket.last_checked
    new_tokens = time_passed * @rate
    updated_tokens = min(bucket.tokens + new_tokens, @bucket_size)
    %{bucket | tokens: updated_tokens, last_checked: current_time}
  end
end
Enter fullscreen mode Exit fullscreen mode

This works well in a single-node setup because the GenServer holds the token state for the entire application across web requests. We can add it to the application's supervision tree to start the API rate limiter automatically:

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Other Children...
      MyApp.TokenBucketRateLimiter
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # ...
end
Enter fullscreen mode Exit fullscreen mode

The first time allow? is called with a new client ID, it initializes the bucket to 10 tokens, and then each allow? with the same client ID consumes one token per request. Eventually, the eleventh request in the same second is rejected because there are no more tokens. If the client tries again after a while, the requests are allowed again.

iex> 1..11 |> Enum.map(fn _i -> MyApp.TokenBucketRateLimiter.allow?("some client id") end)
[true, true, true, true, true, true, true, true, true, true, false]

iex> Process.sleep(1000)
:ok
iex> MyApp.TokenBucketRateLimiter.allow?("some client id")
true
Enter fullscreen mode Exit fullscreen mode

But as the app scales and we deploy it across multiple nodes (e.g., in a Kubernetes cluster or across different regions), we face the challenge of coordinating rate limiting across all nodes.

Each node needs to be aware of the request counts from other nodes to enforce the rate limit consistently. Without a distributed solution, each node would only track requests it directly handles, leading to inconsistent rate limiting.

You might think that maintaining request limits in memory is not feasible at a scale that requires multiple nodes. But since each client's state is just a single integer for the token size and a single integer for the timestamp, this doesn't require a large amount of memory (2 * 64 bytes per client). Even if there are 100k concurrent clients, less than 15MB of in-memory state is required.

Understanding GenServers in a Distributed Elixir Environment

Distributed systems pose unique challenges, particularly when it comes to state management. Network partitions, node failures, and latency are just a few issues that can disrupt state consistency across nodes. Ensuring that all nodes have the correct state or can recover it when things go wrong is critical for a system's reliability.

GenServers can be distributed across nodes in a cluster, allowing them to share state and handle requests from different parts of a system. By leveraging the built-in distribution mechanisms of the BEAM VM, GenServers can communicate across nodes with the same ease as within a single node.

In the next section, we will use some strategies to support rate limiting across multiple nodes in the cluster.

Implementing Distributed GenServers Using a Single Global Process

The simplest strategy to support a rate limiter across a whole cluster is to start a single global process that manages the limits.
This is simple to do using Erlang's :global name registration.

In the single node setup above, we register the GenServer under the name MyApp.TokenBucketRateLimiter (__MODULE__ contains the name of the current module). This registers the process locally on the node. It is also possible to register it globally using a {:global, name} tuple when calling GenServer.start_link/3.
This registers the process under the given name globally across the whole cluster. Note that since we update the process name in start_link, we also have to update it when using any call, cast, and friends.

Let's update the code to do this:

defmodule MyApp.TokenBucketRateLimiter do
  def start_link([]) do
    GenServer.start_link(__MODULE__, nil, name: {:global, __MODULE__})
  end

  def allow?(client_id) do
    GenServer.call({:global, __MODULE__}, {:allow?, client_id})
  end
end
Enter fullscreen mode Exit fullscreen mode

This enforces that only a single process with the global name MyApp.TokenBucketRateLimiter can exist across the cluster. However, this is insufficient, as it will prevent all other application nodes from joining our cluster because the supervision tree cannot start all its processes. Let's try it out by starting a few nodes in the cluster.

  1. The first node starts fine:
    iex --name node1@127.0.0.1 --cookie asdf -S mix
   iex(node1@127.0.0.1)1>
Enter fullscreen mode Exit fullscreen mode
  1. But as soon as we start a second node and connect it to the cluster, the application crashes:
    iex --name node2@127.0.0.1 --cookie asdf -S mix
   iex(node2@127.0.0.1)1> Node.connect(:"node1@127.0.0.1")
   [notice] Application my_app exited: shutdown
Enter fullscreen mode Exit fullscreen mode

We need to make sure that the start_link function of the rate limiter doesn't generate any errors when a process already exists on another node. To do this, we can check and modify the start_link function to return :ignore when the process already exists:

def start_link([]) do
  tuple = {:global, __MODULE__}

  case GenServer.whereis(tuple) do
    nil -> GenServer.start_link(__MODULE__, nil, name: tuple)
    _pid -> :ignore
  end
end
Enter fullscreen mode Exit fullscreen mode

Once that's done, we can start multiple nodes in the cluster, and they will all ping a central rate limiter instance to allow or deny incoming requests. The central rate limiter will be the GenServer process that happens to start first, i.e., the first node in the cluster to come up.

While this works, there are several issues with this approach:

  1. The node is a single point of failure now. If the node goes down, the rate limiter will be unreachable until another node starts it. Even worse, this is not handled automatically, so we might be left with no rate limiter process unless a new node is started.
  2. The state is on a single node. This means that if the rate limiter process is terminated/restarted (either due to a programming error or node failure), the state is lost.
  3. All nodes in the cluster have to make a call to the single node on every API call.

Using Multiple Processes Across the Cluster

In order to provide better fault tolerance when managing state across distributed nodes, we can use a combination of techniques, including:

  • State partitioning: Dividing the state into partitions so that each GenServer instance handles a subset of the data.
  • Replication: Replicating state across nodes to provide redundancy and increase fault tolerance.
  • Conflict resolution: Implementing strategies to handle state conflicts, such as using version vectors or CRDTs (Conflict-free Replicated Data Types).

Let's see how we can use CRDTs to provide super-fast data access alongside eventual consistency and data replication guarantees.

We will use the Elixir library DeltaCrdt to manage shared state across multiple nodes in the cluster.

Note: In addition to DeltaCrdt, other libraries like Horde and Swarm can help you to coordinate processes and state across several nodes in the cluster.

Let's add it to mix.exs to get started:

defmodule MyApp.MixProject do
  defp deps do
    [
      # ...
      {:delta_crdt, "~> 0.6.3"}
    ]
  end
end
Enter fullscreen mode Exit fullscreen mode

DeltaCrdt provides a simple-to-use API for managing a map of data that is guaranteed to be conflict-free (across distributed usage) and replicated across multiple nodes. For conflict resolution, it provides AWLWWMap (that stands for 'Add-Wins Last-Write-Wins Map) so that in the case of a conflict, the last write always wins.

The API is simple — you start multiple DeltaCrdt processes (which can be on the same node or multiple nodes) and create a cluster with them using set_neighbours/2:

{:ok, crdt1} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
{:ok, crdt2} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
DeltaCrdt.set_neighbours(crdt1, [crdt2])
DeltaCrdt.read(crdt1)
%{}
Enter fullscreen mode Exit fullscreen mode

Once neighbours are set, adding any data to one CRDT automatically syncs with the second one.

DeltaCrdt.put(crdt1, "foo", "bar")
DeltaCrdt.read(crdt2)
%{"foo" => "bar"}
Enter fullscreen mode Exit fullscreen mode

Starting A One Rate Limiter Per Node

With this out of the way, we can update our implementation of the rate limiter to use DeltaCrdt to sync states. First, we need to make sure all rate limiter processes are discoverable globally, but with unique names. One way to do that is to use the node name when naming the process:

defmodule MyApp.TokenBucketRateLimiter do
  use GenServer

  def start_link([]), do: GenServer.start_link(__MODULE__, nil, name: global_tuple())

  defp global_tuple(node \\ Node.self()), do: {:global, to_string(node) <> to_string(__MODULE__)}
end
Enter fullscreen mode Exit fullscreen mode

Syncing GenServer State With CRDT

Now that all nodes start their own copy of the rate limiter, we need to sync the state. Let's do that by starting a DeltaCrdt process from the GenServer and using the CRDT to make decisions about the rate limits:

defmodule MyApp.TokenBucketRateLimiter do
  use GenServer

  @impl true
  def init(nil) do
    {:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 100)

    {:ok, %{crdt: crdt}}
  end

  def allow?(client_id), do: GenServer.call(global_tuple(), {:allow?, client_id})

  @impl true
  def handle_call({:allow?, client_id}, _from, state) do
    current_time = System.os_time(:second)

    {reply, updated_bucket} =
      state.crdt
      |> get_or_init_bucket(client_id, current_time)
      |> update_bucket(current_time)
      |> maybe_consume_token(current_time)

    # Update the CRDT with the new bucket state
    DeltaCrdt.put(state.crdt, client_id, updated_bucket)

    {:reply, reply, state}
  end

  defp get_or_init_bucket(crdt, client_id, current_time) do
    case DeltaCrdt.get(crdt, client_id) do
      nil -> default_bucket(current_time)
      bucket -> bucket
    end
  end

  defp default_bucket(time), do: %{tokens: @bucket_size, last_checked: time}
end
Enter fullscreen mode Exit fullscreen mode

The major changes here from the single-node approach are:

  1. Instead of Map.get from the local GenServer state, we use DeltaCrdt.get to get the key from the CRDT instead. This fetches the data from the local copy of the CRDT which is very fast to access and guarantees eventual consistency.
  2. Similar to the above, we use DeltaCrdt.put to update the shared state instead of the local GenServer state when updating the tokens count. This is again a fast operation that updates the local state — but with a guarantee that the changes will be eventually synced with all processes in the cluster.

Clustering the GenServers

While we are almost there, there is one last step left. The CRDTs are currently local and do not sync with other nodes in the GenServer. We need to use the set_neighbours/2 API from DeltaCRDT to set up a sync.

Let's create a new function named update_neighbours inside the GenServer that does this:

defp update_neighbours(crdt) do
  neighbours =
    Node.list()
    |> Enum.map(fn node ->
      node
      |> global_tuple()
      |> GenServer.whereis()
      |> :sys.get_state()
      |> Map.get(:crdt)
    end)

  DeltaCrdt.set_neighbours(crdt, neighbours)
end
Enter fullscreen mode Exit fullscreen mode

Now all we need to do is call this function when the GenServer starts (from init or a handle_continue callback).

The above piece of code takes care of updating CRDT neighbors when a new process starts. But set_neighbours is a one-sided operation (i.e., it sets up puts from the CRDT to its neighbors, but not the other way around). To ensure servers that have already started also update their neighbors, we must call update_neighbours on all cluster processes as soon as any service starts.

Let's add a new refresh_peer_neighbours function to do that (which should then be called after the initialization of the GenServer).

defp refresh_peer_neighbours() do
  Node.list()
  |> Enum.each(fn node ->
    node
    |> global_tuple()
    |> GenServer.whereis()
    |> Process.send(pid, :refresh_neighbours, [])
  end)
end

@impl true
def handle_info(:refresh_neighbours, state) do
  update_neighbours(state.crdt)
  {:noreply, state}
end
Enter fullscreen mode Exit fullscreen mode

Handling Cluster Changes

With all this in place, there is still one final piece of the puzzle. What happens when nodes are added or removed from the cluster? To update the CRDT neighborhood on cluster-level changes, we can use :net_kernel.monitor_nodes(true) from Erlang. This monitors the nodes and refreshes the neighbors on any nodeup or nodedown events:

@impl true
def init(nil) do
  {:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 100)

  # Monitor for addition of new nodes
  :net_kernel.monitor_nodes(true)

  {:ok, %{crdt: crdt}, {:continue, nil}}
end

@impl true
def handle_info({:nodeup, _node}, state) do
  Process.send_after(self(), :refresh_neighbours, 1_000)
  {:noreply, state}
end

def handle_info({:nodedown, _node}, state) do
  Process.send_after(self(), :refresh_neighbours, 1_000)
  {:noreply, state}
end
Enter fullscreen mode Exit fullscreen mode

This finally brings us to the end of our implementation. There are some edge cases to consider:

  1. Avoiding deadlocks when refreshing neighbours (since we need to fetch the PID of the CRDTs from all nodes).
  2. Handling cases where the RateLimiter does not exist on a node (e.g., when it is being restarted).

Here is the full implementation. Let's take it for a spin:

# Node 1
 iex --name node1@127.0.0.1 --cookie asdf -S mix
iex(node1@127.0.0.1)1>

# Node2
 iex --name node2@127.0.0.1 --cookie asdf -S mix
iex(node2@127.0.0.1)1> Node.connect(:"node1@127.0.0.1")
true
iex(node2@127.0.0.1)2> MyApp.TokenBucketRateLimiter.allow?("some client id")
true
iex(node2@127.0.0.1)3> MyApp.TokenBucketRateLimiter.crdt() |> DeltaCrdt.get("some client id")
%{tokens: 9, last_checked: 1725358705}

# Back on Node1
iex(node1@127.0.0.1)1> MyApp.TokenBucketRateLimiter.crdt() |> DeltaCrdt.get("some client id")
%{tokens: 9, last_checked: 1725358705}
Enter fullscreen mode Exit fullscreen mode

And that's it for part one of this series!

Wrapping Up

In this first part of our Distributed Phoenix series, we've taken an in-depth look at how GenServers and Delta CRDTs can be leveraged to manage distributed state in a Phoenix application. We've implemented a token bucket rate limiter across multiple nodes, ensuring consistency and fault tolerance even in a distributed setup.

This foundational knowledge is crucial for building scalable, resilient systems that can handle high traffic and maintain data integrity across multiple servers.

In part two, we'll dive deeper into advanced techniques for optimizing distributed systems, including strategies for deploying and scaling distributed applications.

Stay tuned for more insights and practical examples that will help you master the art of distributed applications with Phoenix and Elixir.

P.S. If you'd like to read Elixir Alchemy posts as soon as they get off the press, subscribe to our Elixir Alchemy newsletter and never miss a single post!

Top comments (0)