
Elixir Part 2: Concurrency, OTP & State Machines

BEAM Process Fundamentals

  • Processes are cheap — ~2KB initial memory, microseconds to spawn, millions concurrent
  • No shared memory — message passing only
  • Per-process GC — no stop-the-world pauses
  • Preemptive scheduling — one scheduler per CPU core, ~4000 reductions then yield
  • Process isolation — one crash doesn't affect others
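A quick sketch of how cheap this is in practice (exact counts and timings vary by machine and OTP release): spawn 100,000 processes, each waiting for a single message, then round-trip one message through every one of them.

```elixir
parent = self()

# Spawn 100_000 processes; each blocks in receive until pinged.
pids =
  for i <- 1..100_000 do
    spawn(fn ->
      receive do
        :ping -> send(parent, {:pong, i})
      end
    end)
  end

# Ping every process, then collect all replies.
Enum.each(pids, &send(&1, :ping))

total =
  Enum.reduce(1..100_000, 0, fn _, acc ->
    receive do
      {:pong, _i} -> acc + 1
    end
  end)

IO.puts("Collected #{total} replies")
```

On typical hardware this completes in well under a second; the same experiment with OS threads would exhaust system resources long before 100,000.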

Spawning and Messaging

pid = spawn(fn ->
  receive do
    {:greet, name} -> IO.puts("Hello, #{name}!")
  end
end)
send(pid, {:greet, "Michael"})

# Linked — bidirectional crash propagation
pid = spawn_link(fn -> raise "boom" end)

# Monitored — one-directional crash notification
ref = Process.monitor(pid)
receive do
  {:DOWN, ^ref, :process, ^pid, reason} -> handle_crash(reason)
end

Task — Structured Concurrency

Task.start(fn -> send_email(user) end)                    # Fire and forget
task = Task.async(fn -> expensive_computation() end)       # Async/await
result = Task.await(task, 30_000)

urls                                                        # Parallel map
|> Task.async_stream(&fetch_url/1, max_concurrency: 10)
|> Enum.to_list()                                           # Yields {:ok, result} or {:exit, reason} per item

GenServer — Stateful Server Processes

defmodule Counter do
  use GenServer

  # Client API
  def start_link(initial \\ 0), do: GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  def increment, do: GenServer.cast(__MODULE__, :increment)
  def get, do: GenServer.call(__MODULE__, :get)

  # Server callbacks
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:get, _from, count), do: {:reply, count, count}

  @impl true
  def handle_info(:tick, count) do
    Process.send_after(self(), :tick, 1000)
    {:noreply, count}
  end
end

Key principle: Callbacks run sequentially — this is both the synchronization mechanism and potential bottleneck. Keep callbacks fast; delegate heavy work to spawned tasks.
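The delegation pattern can be sketched as a minimal GenServer (`HeavyWorker` and `do_heavy_work/1` are illustrative names, not from the text above): the cast callback spawns a Task and returns immediately, and the result arrives later as an info message.

```elixir
defmodule HeavyWorker do
  use GenServer

  # Client API
  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  def process(item), do: GenServer.cast(__MODULE__, {:process, item})
  def results, do: GenServer.call(__MODULE__, :results)

  @impl true
  def init(:ok), do: {:ok, %{results: []}}

  # The callback only spawns the work and returns, so the server stays responsive.
  @impl true
  def handle_cast({:process, item}, state) do
    server = self()
    Task.start(fn -> send(server, {:done, do_heavy_work(item)}) end)
    {:noreply, state}
  end

  # The spawned task reports back via a plain message.
  @impl true
  def handle_info({:done, result}, state) do
    {:noreply, %{state | results: [result | state.results]}}
  end

  @impl true
  def handle_call(:results, _from, state), do: {:reply, state.results, state}

  # Stand-in for an expensive computation
  defp do_heavy_work(item), do: item * 2
end
```

In production, prefer `Task.Supervisor.start_child/2` over bare `Task.start/1` so the spawned work runs under a supervisor rather than unsupervised.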


GenStateMachine — State Machines for Agentic Workflows

Why GenStateMachine over GenServer for agents? GenServer has a single state and handles all messages uniformly. GenStateMachine (wrapping Erlang's :gen_statem) provides:

  • Explicit states with per-state event handling
  • Built-in timeouts — state timeouts (reset on state change), generic timeouts, event timeouts
  • Postpone — defer events until the right state
  • State enter callbacks — run setup logic when entering a state
  • Event types — distinguish calls, casts, info, timeouts, internal events

These map naturally onto agentic patterns: an agent in "thinking" state ignores new requests (postpone), has retry timeouts, transitions through well-defined phases, and runs setup on each state entry.

Installation

{:gen_state_machine, "~> 3.0"}

Callback Modes

:handle_event_function (default) — single handle_event/4 for all states:

defmodule AgentFSM do
  use GenStateMachine

  # Client API
  def start_link(opts), do: GenStateMachine.start_link(__MODULE__, opts)
  def submit(pid, task), do: GenStateMachine.call(pid, {:submit, task})
  def status(pid), do: GenStateMachine.call(pid, :status)

  # Server callbacks
  def init(opts) do
    # :error must exist here — the map-update syntax %{data | error: ...}
    # in the timeout clause raises KeyError for a missing key
    {:ok, :idle, %{tasks: [], results: [], error: nil, config: opts}}
  end

  # State: idle
  def handle_event({:call, from}, {:submit, task}, :idle, data) do
    {:next_state, :processing, %{data | tasks: [task]},
     [{:reply, from, :accepted}, {:state_timeout, 30_000, :timeout}]}
  end

  def handle_event({:call, from}, :status, state, data) do
    {:keep_state_and_data, [{:reply, from, {state, length(data.results)}}]}
  end

  # State: processing — with timeout
  def handle_event(:state_timeout, :timeout, :processing, data) do
    {:next_state, :error, %{data | error: :timeout}}
  end

  # Internal event for completion
  def handle_event(:info, {:result, result}, :processing, data) do
    {:next_state, :complete, %{data | results: [result | data.results]}}
  end

  # Postpone submissions while processing
  def handle_event({:call, _from}, {:submit, _task}, :processing, _data) do
    {:keep_state_and_data, :postpone}
  end
end
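A usage sketch for the AgentFSM listing above. The completion message is simulated by hand here with `send/2`; in a real agent a spawned worker would deliver it.

```elixir
{:ok, pid} = AgentFSM.start_link([])

# Accepted in :idle; transitions to :processing with a 30s state timeout
:accepted = AgentFSM.submit(pid, :summarize)
{:processing, 0} = AgentFSM.status(pid)

# Simulate the worker finishing; the :info clause moves us to :complete
send(pid, {:result, "summary text"})
{:complete, 1} = AgentFSM.status(pid)
```

Because messages from one sender are delivered in order, the `status` call after `send/2` is guaranteed to observe the `:complete` state.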

:state_functions — each state is a separate function (states must be atoms):

defmodule WorkflowFSM do
  use GenStateMachine, callback_mode: [:state_functions, :state_enter]

  # Every key updated later with %{data | ...} must exist up front,
  # or the update raises KeyError
  def init(_), do: {:ok, :pending, %{params: nil, result: nil, error: nil, entered_at: nil}}

  # State enter callbacks — run on every state transition
  def pending(:enter, _old_state, data) do
    {:keep_state, %{data | entered_at: DateTime.utc_now()}}
  end

  def pending({:call, from}, {:start, params}, data) do
    {:next_state, :running, %{data | params: params},
     [{:reply, from, :ok}, {:state_timeout, 60_000, :execution_timeout}]}
  end

  def running(:enter, :pending, data) do
    # Setup when entering running from pending
    send(self(), :execute)
    {:keep_state, data}
  end

  def running(:info, :execute, data) do
    result = do_work(data.params)
    {:next_state, :complete, %{data | result: result}}
  end

  def running(:state_timeout, :execution_timeout, data) do
    {:next_state, :failed, %{data | error: :timeout}}
  end

  # Postpone any calls while running
  def running({:call, _from}, _request, _data) do
    {:keep_state_and_data, :postpone}
  end

  def complete(:enter, _old, data), do: {:keep_state, data}
  def complete({:call, from}, :get_result, data) do
    {:keep_state_and_data, [{:reply, from, {:ok, data.result}}]}
  end

  def failed(:enter, _old, data), do: {:keep_state, data}
  def failed({:call, from}, :get_error, data) do
    {:keep_state_and_data, [{:reply, from, {:error, data.error}}]}
  end
end

Timeout Types

Type                             Behavior                                    Use Case
{:timeout, ms, event}            Event timeout: cancelled by any new event   Inactivity detection
{{:timeout, name}, ms, event}    Generic (named): survives state changes     Periodic polling
{:state_timeout, ms, event}      Cancelled when the state changes            Per-state deadlines
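Periodic polling can be sketched with :gen_statem's named generic timeout, written {{:timeout, name}, ms, event}, which keeps running across state changes and must be re-armed after firing. `Poller` is a hypothetical module, not from the text above:

```elixir
defmodule Poller do
  use GenStateMachine  # default callback mode: :handle_event_function

  def start_link(interval), do: GenStateMachine.start_link(__MODULE__, interval)
  def polls(pid), do: GenStateMachine.call(pid, :polls)

  # Arm a named generic timeout at startup
  def init(interval) do
    {:ok, :watching, %{interval: interval, polls: 0},
     [{{:timeout, :poll}, interval, :tick}]}
  end

  # Named timeouts arrive with event type {:timeout, name}
  def handle_event({:timeout, :poll}, :tick, _state, data) do
    # ...do the poll work here, then re-arm for the next interval
    {:keep_state, %{data | polls: data.polls + 1},
     [{{:timeout, :poll}, data.interval, :tick}]}
  end

  def handle_event({:call, from}, :polls, _state, data) do
    {:keep_state_and_data, [{:reply, from, data.polls}]}
  end
end
```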

Key Actions in Return Tuples

# Return format: {:next_state, new_state, new_data, actions}
actions = [
  {:reply, from, response},                     # Reply to caller
  {:state_timeout, 30_000, :deadline},          # State-scoped timeout
  {:timeout, 5_000, :idle},                     # Event timeout (cancelled by any event)
  {{:timeout, :poll}, 5_000, :tick},            # Generic named timeout (survives state changes)
  :postpone,                                    # Defer event to next state
  :hibernate,                                   # Reduce memory footprint
  {:next_event, :internal, :setup}              # Queue internal event
]

When to Use GenStateMachine vs GenServer

Scenario                                 Use
Simple key-value state, CRUD             GenServer
Request/response server                  GenServer
Well-defined state transitions           GenStateMachine
Need built-in timeouts per state         GenStateMachine
Events valid only in certain states      GenStateMachine
Agentic workflow with phases             GenStateMachine
Need postpone/defer semantics            GenStateMachine

Supervisors — Let It Crash

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      MyApp.Repo,
      {MyApp.Cache, []},
      {Task.Supervisor, name: MyApp.TaskSupervisor},
      MyAppWeb.Endpoint
    ]
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
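The Task.Supervisor entry in the child list pairs with the Task.Supervisor functions for supervised tasks. A standalone sketch (in a real app the supervisor comes from the tree above rather than being started by hand):

```elixir
# Start a Task.Supervisor for the demo; in an application it comes from
# the supervision tree via {Task.Supervisor, name: MyApp.TaskSupervisor}
{:ok, _sup} = Task.Supervisor.start_link(name: MyApp.TaskSupervisor)

# Supervised fire-and-forget: a crash is isolated from the caller
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> :ok end)

# Supervised async without linking: the caller survives a task crash
task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> 1 + 1 end)
result = Task.await(task)
```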

Strategies:

  • :one_for_one — restart only crashed child (most common)
  • :one_for_all — restart all if any crashes (tightly coupled)
  • :rest_for_one — restart crashed + all started after it

DynamicSupervisor

defmodule MyApp.SessionSupervisor do
  use DynamicSupervisor

  def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  def init(:ok), do: DynamicSupervisor.init(strategy: :one_for_one)

  def start_session(session_id) do
    DynamicSupervisor.start_child(__MODULE__, {MyApp.Session, session_id})
  end
end

Registry & Process Discovery

# In supervisor
{Registry, keys: :unique, name: MyApp.Registry}

# Register a process
Registry.register(MyApp.Registry, "session:#{id}", %{})

# Lookup
case Registry.lookup(MyApp.Registry, "session:#{id}") do
  [{pid, _value}] -> {:ok, pid}
  [] -> {:error, :not_found}
end

# Use as GenServer name
GenServer.start_link(MyWorker, arg, name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})