
Elixir Part 2: Concurrency, OTP & State Machines

BEAM Process Fundamentals

  • Processes are cheap — ~2KB initial memory, microseconds to spawn, millions concurrent
  • No shared memory — message passing only
  • Per-process GC — no stop-the-world pauses
  • Preemptive scheduling — one scheduler per CPU core, ~4000 reductions then yield
  • Process isolation — one crash doesn't affect others
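A quick sketch of how cheap this is in practice — spawning ten thousand processes and collecting one reply from each (the count and workload are illustrative):

```elixir
parent = self()

# Spawn 10k processes; each does trivial work and reports back.
pids = for i <- 1..10_000, do: spawn(fn -> send(parent, {:done, i}) end)

# Collect exactly one reply per spawned process.
results =
  for _ <- pids do
    receive do
      {:done, n} -> n
    end
  end
```

On a typical machine this completes in well under a second; the same pattern scales to hundreds of thousands of processes.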

Spawning and Messaging

pid = spawn(fn ->
  receive do
    {:greet, name} -> IO.puts("Hello, #{name}!")
  end
end)
send(pid, {:greet, "Michael"})

# Linked — bidirectional crash propagation
pid = spawn_link(fn -> raise "boom" end)

# Monitored — one-directional crash notification
ref = Process.monitor(pid)
receive do
  {:DOWN, ^ref, :process, ^pid, reason} -> handle_crash(reason)
end

Task — Structured Concurrency

Basic Patterns

Task.start(fn -> send_email(user) end)                    # Fire and forget (linked)
task = Task.async(fn -> expensive_computation() end)       # Async/await (linked + monitored)
result = Task.await(task, 30_000)                          # MUST await to drain mailbox

Critical: Task.async creates a bidirectional link — if either the caller or task crashes, both crash. This preserves sequential semantics (you'd have crashed doing it synchronously too).
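One way to observe that link (a sketch — trapping exits here only so the calling process survives the propagated crash long enough to inspect it):

```elixir
# Convert incoming exit signals into messages instead of dying.
Process.flag(:trap_exit, true)

# The task raises; the link turns that into an exit signal to the caller.
task = Task.async(fn -> raise "boom" end)

observed =
  receive do
    {:EXIT, pid, {%RuntimeError{message: "boom"}, _stacktrace}} when pid == task.pid ->
      :caller_got_exit_signal
  after
    1_000 -> :no_signal
  end
```

Without the `trap_exit` flag, the caller would simply crash alongside the task — which is exactly the sequential-semantics guarantee described above.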

async_stream — Parallel Enumeration

Task.async_stream(urls, &fetch_url/1,
  max_concurrency: 10,    # Default: System.schedulers_online()
  ordered: true,           # Maintain input order (default)
  timeout: 30_000,         # Per-task timeout
  on_timeout: :kill_task   # :exit (raise) or :kill_task (return exit tuple)
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, result} -> result end)

Task.Supervisor — Production Patterns

# In your supervision tree
{Task.Supervisor, name: MyApp.TaskSupervisor}

# Fire-and-forget (no result tracking)
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> do_work() end)

# Linked async — caller crashes if task crashes
Task.Supervisor.async(MyApp.TaskSupervisor, fn -> compute() end) |> Task.await()

# Unlinked async — caller survives task failure
task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> risky_work() end)
case Task.yield(task, 5_000) || Task.shutdown(task) do
  {:ok, result} -> result
  {:exit, reason} -> handle_failure(reason)
  nil -> handle_timeout()
end

GenServer integration — never await in a GenServer (blocks all other messages):

def handle_call(:compute, from, state) do
  # Start an unlinked task; the result arrives later as a message
  task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> heavy_work() end)
  {:noreply, %{state | task: task, caller: from}}
end

# Success: the task sends {ref, result}
def handle_info({ref, result}, %{task: %Task{ref: ref}} = state) do
  Process.demonitor(ref, [:flush])  # Drop the task's :DOWN message
  GenServer.reply(state.caller, result)
  {:noreply, %{state | task: nil, caller: nil}}
end

# Failure: the task crashed before sending a result
def handle_info({:DOWN, ref, :process, _pid, reason}, %{task: %Task{ref: ref}} = state) do
  GenServer.reply(state.caller, {:error, reason})
  {:noreply, %{state | task: nil, caller: nil}}
end

GenServer — Stateful Server Processes

defmodule Counter do
  use GenServer

  # Client API
  def start_link(initial \\ 0), do: GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  def increment, do: GenServer.cast(__MODULE__, :increment)
  def get, do: GenServer.call(__MODULE__, :get)

  # Server callbacks
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:get, _from, count), do: {:reply, count, count}

  @impl true
  def handle_info(:tick, count) do
    Process.send_after(self(), :tick, 1000)
    {:noreply, count}
  end
end

Key principle: Callbacks run sequentially — this is both the synchronization mechanism and potential bottleneck. Keep callbacks fast; delegate heavy work to spawned tasks.
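Because messages between two processes arrive in send order, two casts followed by a call from the same client are handled FIFO — the call is guaranteed to observe both increments. A runnable sketch (repeating a minimal Counter, without the :tick handler, so it stands alone):

```elixir
defmodule Counter do
  use GenServer

  def start_link(initial \\ 0), do: GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  def increment, do: GenServer.cast(__MODULE__, :increment)
  def get, do: GenServer.call(__MODULE__, :get)

  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:get, _from, count), do: {:reply, count, count}
end

{:ok, _pid} = Counter.start_link(0)
Counter.increment()    # cast returns immediately, without waiting
Counter.increment()
value = Counter.get()  # processed after both casts (same-sender FIFO)
```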

handle_continue — Post-Init Work

Split expensive initialization so the process doesn't block its supervisor:

@impl true
def init(config) do
  {:ok, %{config: config, data: nil}, {:continue, :load_data}}
end

@impl true
def handle_continue(:load_data, state) do
  data = expensive_load(state.config)
  {:noreply, %{state | data: data}}
end

{:continue, term} can also be returned from handle_call, handle_cast, and handle_info.

Delayed Reply Pattern

Return {:noreply, state} from handle_call and reply later with GenServer.reply/2:

@impl true
def handle_call(:slow_query, from, state) do
  Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
    result = run_query()
    GenServer.reply(from, result)
  end)
  {:noreply, state}  # Don't block the GenServer
end

Process Registration & Naming

# Local atom name
GenServer.start_link(MyServer, arg, name: MyServer)

# Global name (cluster-wide via :global)
GenServer.start_link(MyServer, arg, name: {:global, :my_server})

# Via Registry (recommended for dynamic naming)
GenServer.start_link(MyServer, arg,
  name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})

code_change — Hot Code Upgrades

@impl true
def code_change(_old_vsn, state, _extra) do
  # Transform state shape between versions
  new_state = Map.put_new(state, :new_field, default_value())
  {:ok, new_state}
end

GenStateMachine — State Machines for Agentic Workflows

Why GenStateMachine over GenServer for agents? GenServer has a single state and handles all messages uniformly. GenStateMachine (wrapping Erlang's :gen_statem) provides:

  • Explicit states with per-state event handling
  • Built-in timeouts — state timeouts (reset on state change), generic timeouts, event timeouts
  • Postpone — defer events until the right state
  • State enter callbacks — run setup logic when entering a state
  • Event types — distinguish calls, casts, info, timeouts, internal events

These map naturally onto agentic patterns: an agent in "thinking" state ignores new requests (postpone), has retry timeouts, transitions through well-defined phases, and runs setup on each state entry.

Installation

{:gen_state_machine, "~> 3.0"}

Callback Modes

:handle_event_function (default) — single handle_event/4 for all states:

defmodule AgentFSM do
  use GenStateMachine

  # Client API
  def start_link(opts), do: GenStateMachine.start_link(__MODULE__, opts)
  def submit(pid, task), do: GenStateMachine.call(pid, {:submit, task})
  def status(pid), do: GenStateMachine.call(pid, :status)

  # Server callbacks
  def init(opts) do
    {:ok, :idle, %{tasks: [], results: [], error: nil, config: opts}}
  end

  # State: idle
  def handle_event({:call, from}, {:submit, task}, :idle, data) do
    {:next_state, :processing, %{data | tasks: [task]},
     [{:reply, from, :accepted}, {:state_timeout, 30_000, :timeout}]}
  end

  def handle_event({:call, from}, :status, state, data) do
    {:keep_state_and_data, [{:reply, from, {state, length(data.results)}}]}
  end

  # State: processing — with timeout
  def handle_event(:state_timeout, :timeout, :processing, data) do
    {:next_state, :error, %{data | error: :timeout}}
  end

  # Internal event for completion
  def handle_event(:info, {:result, result}, :processing, data) do
    {:next_state, :complete, %{data | results: [result | data.results]}}
  end

  # Postpone submissions while processing
  def handle_event({:call, _from}, {:submit, _task}, :processing, _data) do
    {:keep_state_and_data, :postpone}
  end
end

:state_functions — each state is a separate function (states must be atoms):

defmodule WorkflowFSM do
  use GenStateMachine, callback_mode: [:state_functions, :state_enter]

  def init(_), do: {:ok, :pending, %{entered_at: nil, params: nil, result: nil, error: nil}}

  # State enter callbacks — run on every state transition
  def pending(:enter, _old_state, data) do
    {:keep_state, %{data | entered_at: DateTime.utc_now()}}
  end

  def pending({:call, from}, {:start, params}, data) do
    {:next_state, :running, %{data | params: params},
     [{:reply, from, :ok}, {:state_timeout, 60_000, :execution_timeout}]}
  end

  def running(:enter, :pending, data) do
    # Setup when entering running from pending
    send(self(), :execute)
    {:keep_state, data}
  end

  def running(:info, :execute, data) do
    result = do_work(data.params)
    {:next_state, :complete, %{data | result: result}}
  end

  def running(:state_timeout, :execution_timeout, data) do
    {:next_state, :failed, %{data | error: :timeout}}
  end

  # Postpone any calls while running
  def running({:call, _from}, _request, _data) do
    {:keep_state_and_data, :postpone}
  end

  def complete(:enter, _old, data), do: {:keep_state, data}
  def complete({:call, from}, :get_result, data) do
    {:keep_state_and_data, [{:reply, from, {:ok, data.result}}]}
  end

  def failed(:enter, _old, data), do: {:keep_state, data}
  def failed({:call, from}, :get_error, data) do
    {:keep_state_and_data, [{:reply, from, {:error, data.error}}]}
  end
end

Timeout Types

| Action | Behavior | Use case |
|---|---|---|
| `{{:timeout, name}, ms, event}` | Generic, named — survives state changes | Periodic polling |
| `{:state_timeout, ms, event}` | Cancelled on state change | Per-state deadlines |
| `{:timeout, ms, event}` | Event timeout — cancelled by any incoming event | Inactivity detection |
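To make the event timeout concrete, a minimal inactivity detector — written against raw :gen_statem so it runs without the library (module, state, and message names are illustrative):

```elixir
defmodule InactivityFSM do
  @behaviour :gen_statem

  def start_link, do: :gen_statem.start_link(__MODULE__, [], [])

  @impl true
  def callback_mode, do: :handle_event_function

  @impl true
  def init(_) do
    # Arm an event timeout: it fires only if no event arrives within 100ms.
    {:ok, :active, nil, [{:timeout, 100, :inactive}]}
  end

  @impl true
  def handle_event({:call, from}, :ping, :active, data) do
    # Any incoming event cancels the pending event timeout; re-arm it.
    {:keep_state, data, [{:reply, from, :pong}, {:timeout, 100, :inactive}]}
  end

  def handle_event(:timeout, :inactive, :active, data) do
    {:next_state, :idle, data}
  end

  def handle_event({:call, from}, :state?, state, data) do
    {:keep_state, data, [{:reply, from, state}]}
  end
end

{:ok, pid} = InactivityFSM.start_link()
:pong = :gen_statem.call(pid, :ping)   # activity resets the timeout
Process.sleep(150)                     # go quiet past the deadline
current = :gen_statem.call(pid, :state?)
```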

Key Actions in Return Tuples

# Return format: {:next_state, new_state, new_data, actions}
actions = [
  {:reply, from, response},                     # Reply to caller
  {:state_timeout, 30_000, :deadline},          # State-scoped timeout
  {:timeout, 5_000, :poll},                     # Event timeout (cancelled by any event)
  {{:timeout, :poll}, 5_000, :poll},            # Generic named timeout
  :postpone,                                    # Defer event to next state
  :hibernate,                                   # Reduce memory footprint
  {:next_event, :internal, :setup}              # Queue internal event
]

When to Use GenStateMachine vs GenServer

| Scenario | Use |
|---|---|
| Simple key-value state, CRUD | GenServer |
| Request/response server | GenServer |
| Well-defined state transitions | GenStateMachine |
| Need built-in timeouts per state | GenStateMachine |
| Events valid only in certain states | GenStateMachine |
| Agentic workflow with phases | GenStateMachine |
| Need postpone/defer semantics | GenStateMachine |

Supervisors — Let It Crash

Child Specifications

Every supervised process needs a child spec — a map describing how to start, restart, and shut it down:

%{
  id: MyWorker,                    # Unique identifier (required)
  start: {MyWorker, :start_link, [arg]},  # {Module, Function, Args} (required)
  restart: :permanent,             # :permanent | :transient | :temporary
  shutdown: 5_000,                 # Timeout in ms, or :brutal_kill, or :infinity
  type: :worker                    # :worker | :supervisor
}

use GenServer / use Supervisor auto-generates child_spec/1 — you rarely write these by hand.

Restart Semantics

| Restart value | Behavior |
|---|---|
| `:permanent` | Always restart (default) — use for core services |
| `:transient` | Restart only on abnormal exit — use for workers that should run to completion |
| `:temporary` | Never restart — spec auto-deleted on exit |

What counts as "normal" exit: :normal, :shutdown, {:shutdown, term} — these suppress error logs too.
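For example, a :transient child that stops with :normal is not restarted (OneShot is a made-up worker for this sketch):

```elixir
defmodule OneShot do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  def done, do: GenServer.cast(__MODULE__, :done)

  @impl true
  def init(_), do: {:ok, nil}

  @impl true
  def handle_cast(:done, state), do: {:stop, :normal, state}  # normal exit
end

children = [%{id: OneShot, start: {OneShot, :start_link, [nil]}, restart: :transient}]
{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

OneShot.done()
Process.sleep(100)

# :normal exit + restart: :transient → the supervisor does not restart it
restarted? = Process.whereis(OneShot) != nil
```

Had the child raised instead, the supervisor would have restarted it and the registered name would be live again.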

Crash Budget

opts = [
  strategy: :one_for_one,
  max_restarts: 3,         # Max restarts within window (default: 3)
  max_seconds: 5           # Window in seconds (default: 5)
]

Exceed the budget and the supervisor itself exits with :shutdown — escalating to its supervisor.

Strategies

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      MyApp.Repo,                                        # Start first
      {MyApp.Cache, []},                                 # Depends on Repo
      {Task.Supervisor, name: MyApp.TaskSupervisor},
      MyAppWeb.Endpoint                                  # Start last
    ]
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
  • :one_for_one — restart only crashed child (most common)
  • :one_for_all — restart all if any crashes (tightly coupled; usually too aggressive)
  • :rest_for_one — restart crashed + all started after it (use when startup order = dependency order)

Shutdown order is reverse start order. If a child exceeds its :shutdown timeout, it gets killed.

auto_shutdown (Elixir 1.14+)

Supervisor.start_link(children,
  strategy: :one_for_one,
  auto_shutdown: :any_significant  # or :all_significant
)

The supervisor exits automatically when significant children (marked with significant: true in child spec) terminate.
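A runnable sketch (Main is an illustrative worker; note that significant children must use restart: :transient or :temporary, and exits are trapped here only so the script can observe the supervisor going down):

```elixir
defmodule Main do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  @impl true
  def init(_), do: {:ok, nil}
end

# Significant children require restart: :transient or :temporary
children = [
  %{id: Main, start: {Main, :start_link, [nil]}, restart: :transient, significant: true}
]

Process.flag(:trap_exit, true)

{:ok, sup} =
  Supervisor.start_link(children, strategy: :one_for_one, auto_shutdown: :any_significant)

# Significant child terminates normally → supervisor shuts itself down
GenServer.stop(Main, :normal)

outcome =
  receive do
    {:EXIT, ^sup, :shutdown} -> :supervisor_auto_shut_down
  after
    1_000 -> :still_running
  end
```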

DynamicSupervisor

For children started on demand at runtime:

defmodule MyApp.SessionSupervisor do
  use DynamicSupervisor

  def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  def init(:ok), do: DynamicSupervisor.init(strategy: :one_for_one)

  def start_session(session_id) do
    DynamicSupervisor.start_child(__MODULE__, {MyApp.Session, session_id})
  end

  def count, do: DynamicSupervisor.count_children(__MODULE__)
  def list, do: DynamicSupervisor.which_children(__MODULE__)
end

When to use DynamicSupervisor vs Supervisor:

  • DynamicSupervisor — children start on demand, large populations, no ordering needed
  • Supervisor — mostly static children, startup order matters, small fixed set

Bottleneck? DynamicSupervisor is a single process. Use PartitionSupervisor to distribute load:

{PartitionSupervisor,
  child_spec: DynamicSupervisor,
  name: MyApp.PartitionedSupervisors}

# Start child on a partition
DynamicSupervisor.start_child(
  {:via, PartitionSupervisor, {MyApp.PartitionedSupervisors, self()}},
  child_spec
)

Agent — Simple State Container

For simple state sharing when GenServer is overkill:

{:ok, agent} = Agent.start_link(fn -> %{} end, name: :cache)

Agent.update(:cache, &Map.put(&1, :key, "value"))
Agent.get(:cache, &Map.get(&1, :key))

# Atomic read-modify-write
Agent.get_and_update(:cache, fn state ->
  {Map.get(state, :key), Map.put(state, :key, "new_value")}
end)

Limitations: No message handling, no lifecycle callbacks, no state enter/exit logic. If you need any of that, use GenServer. Anonymous functions fail across distributed nodes — use {module, fun, args} form for distributed Agents.
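The MFA form looks like this — no closures are shipped between nodes, only module/function/argument references (`:dist_cache` is an illustrative name):

```elixir
# Initial state built from Map.new/0 rather than an anonymous function
{:ok, _agent} = Agent.start_link(Map, :new, [], name: :dist_cache)

# The current state is prepended to the args list: Map.put(state, :key, "value")
Agent.update(:dist_cache, Map, :put, [:key, "value"])
value = Agent.get(:dist_cache, Map, :get, [:key])
```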


Application — Supervision Tree Entry Point

Application Environment

# mix.exs — compile-time defaults
def application do
  [extra_applications: [:logger], env: [pool_size: 10]]
end

# config/config.exs — compile-time overrides
config :my_app, pool_size: 20

# config/runtime.exs — runtime overrides (env vars, secrets)
config :my_app, pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10")

# Reading
Application.get_env(:my_app, :pool_size, 10)     # With default
Application.fetch_env!(:my_app, :pool_size)       # Raises if missing
Application.compile_env(:my_app, :pool_size)      # Compile-time validated

Library warning: Avoid application environment in libraries — it's global mutable state that creates tight coupling. Pass config explicitly via function arguments or init options.
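A sketch of the explicit-config alternative (Pool and :pool_size are illustrative names):

```elixir
defmodule Pool do
  use GenServer

  # Config arrives as init options, not via Application.get_env/3 —
  # callers can run many pools with different settings, and tests need no global setup.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def size(pid), do: GenServer.call(pid, :size)

  @impl true
  def init(opts), do: {:ok, %{size: Keyword.get(opts, :pool_size, 10)}}

  @impl true
  def handle_call(:size, _from, state), do: {:reply, state.size, state}
end

{:ok, pid} = Pool.start_link(pool_size: 20)
size = Pool.size(pid)
```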

Start Types

| Type | Behavior |
|---|---|
| `:permanent` | Node terminates if this app terminates for any reason |
| `:transient` | Normal exit is ignored; abnormal exit terminates the node |
| `:temporary` | Standalone — failures don't cascade (default) |

Registry & Process Discovery

Unique Keys — One Process Per Key

# In supervisor
{Registry, keys: :unique, name: MyApp.Registry}

# Register a process
Registry.register(MyApp.Registry, "session:#{id}", %{meta: "data"})

# Lookup
case Registry.lookup(MyApp.Registry, "session:#{id}") do
  [{pid, _value}] -> {:ok, pid}
  [] -> {:error, :not_found}
end

# Use as GenServer name
GenServer.start_link(MyWorker, arg,
  name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})

Duplicate Keys — Pub/Sub Pattern

{Registry, keys: :duplicate, name: MyApp.PubSub}

# Subscribe (from each interested process)
Registry.register(MyApp.PubSub, "events:orders", [])

# Broadcast
Registry.dispatch(MyApp.PubSub, "events:orders", fn entries ->
  for {pid, _} <- entries, do: send(pid, {:order_created, order})
end)

Performance Tuning

{Registry, keys: :unique, name: MyApp.Registry,
  partitions: System.schedulers_online()}

Partition the registry for concurrent throughput under high load.

Registry Metadata

Registry.put_meta(MyApp.Registry, :config, %{version: 2})
{:ok, config} = Registry.meta(MyApp.Registry, :config)

OTP Production Checklist

  • Use :one_for_one by default; :rest_for_one when order = dependency
  • Set max_restarts/max_seconds appropriate to your domain
  • Set :shutdown to :infinity for nested supervisors, timeout for workers
  • Use PartitionSupervisor if DynamicSupervisor becomes a bottleneck
  • Use :transient restart for temporary workers; :permanent for core services
  • Use handle_continue for post-init setup to avoid blocking the supervisor
  • Use delayed reply pattern in GenServer for long-running operations
  • Never await a Task inside a GenServer — use async_nolink + handle_info
  • Always use Task.Supervisor over raw Task.async in production for visibility
  • For Registry pub/sub, use :duplicate keys and dispatch/3
  • Partition registries with System.schedulers_online() under high concurrency
  • Don't use GenServer for code organization — use plain modules for pure functions
  • Don't use Agent for complex state — graduate to GenServer