# Elixir Part 2: Concurrency, OTP & State Machines

## BEAM Process Fundamentals

- **Processes are cheap** — ~2KB initial memory, microseconds to spawn, millions concurrent
- **No shared memory** — message passing only
- **Per-process GC** — no stop-the-world pauses
- **Preemptive scheduling** — one scheduler per CPU core; a process yields after ~4000 reductions
- **Process isolation** — one crash doesn't affect others
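
A quick way to feel these numbers — a throwaway sketch, not a rigorous benchmark; timings vary by machine:

```elixir
# Spawn 100k processes that each block waiting for a message, and time it.
{micros, pids} =
  :timer.tc(fn ->
    for _ <- 1..100_000 do
      spawn(fn ->
        receive do
          :stop -> :ok
        end
      end)
    end
  end)

IO.puts("Spawned #{length(pids)} processes in #{div(micros, 1000)}ms")

# Clean up
Enum.each(pids, &send(&1, :stop))
```

On typical hardware this completes in well under a second — the point of the first bullet above.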
### Spawning and Messaging

```elixir
pid = spawn(fn ->
  receive do
    {:greet, name} -> IO.puts("Hello, #{name}!")
  end
end)

send(pid, {:greet, "Michael"})

# Linked — bidirectional crash propagation
pid = spawn_link(fn -> raise "boom" end)

# Monitored — one-directional crash notification
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} -> handle_crash(reason)
end
```

---
## Task — Structured Concurrency

### Basic Patterns

```elixir
Task.start(fn -> send_email(user) end)               # Fire and forget (linked)

task = Task.async(fn -> expensive_computation() end) # Async/await (linked + monitored)
result = Task.await(task, 30_000)                    # MUST await to drain the mailbox
```

**Critical:** `Task.async` creates a bidirectional link — if either the caller or the task crashes, both crash. This preserves sequential semantics (you'd have crashed doing the work synchronously too).

### async_stream — Parallel Enumeration

```elixir
Task.async_stream(urls, &fetch_url/1,
  max_concurrency: 10,    # Default: System.schedulers_online()
  ordered: true,          # Maintain input order (default)
  timeout: 30_000,        # Per-task timeout
  on_timeout: :kill_task  # :exit (raise, default) or :kill_task (return exit tuple)
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, result} -> result end)
```

### Task.Supervisor — Production Patterns

```elixir
# In your supervision tree
{Task.Supervisor, name: MyApp.TaskSupervisor}

# Fire-and-forget (no result tracking)
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> do_work() end)

# Linked async — caller crashes if task crashes
Task.Supervisor.async(MyApp.TaskSupervisor, fn -> compute() end) |> Task.await()

# Unlinked async — caller survives task failure
task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> risky_work() end)

case Task.yield(task, 5_000) || Task.shutdown(task) do
  {:ok, result} -> result
  {:exit, reason} -> handle_failure(reason)
  nil -> handle_timeout()
end
```

**GenServer integration — never `await` in a GenServer** (it blocks all other messages). Start an unlinked task, remember the caller and the task ref, and reply from `handle_info` (the state is assumed to hold `:caller` and `:task_ref` keys):

```elixir
def handle_call(:compute, from, state) do
  task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> heavy_work() end)
  {:noreply, %{state | caller: from, task_ref: task.ref}}
end

def handle_info({ref, result}, %{task_ref: ref} = state) do
  Process.demonitor(ref, [:flush]) # Task succeeded; discard its :DOWN message
  GenServer.reply(state.caller, result)
  {:noreply, %{state | caller: nil, task_ref: nil}}
end

def handle_info({:DOWN, ref, :process, _pid, reason}, %{task_ref: ref} = state) do
  GenServer.reply(state.caller, {:error, reason}) # Task crashed
  {:noreply, %{state | caller: nil, task_ref: nil}}
end
```

---
## GenServer — Stateful Server Processes

```elixir
defmodule Counter do
  use GenServer

  # Client API
  def start_link(initial \\ 0), do: GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  def increment, do: GenServer.cast(__MODULE__, :increment)
  def get, do: GenServer.call(__MODULE__, :get)

  # Server callbacks
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:get, _from, count), do: {:reply, count, count}

  # Periodic work — kick off the first :tick yourself, e.g. with
  # Process.send_after(self(), :tick, 1000) in init/1
  @impl true
  def handle_info(:tick, count) do
    Process.send_after(self(), :tick, 1000)
    {:noreply, count}
  end
end
```

**Key principle:** Callbacks run sequentially — this is both the synchronization mechanism and the potential bottleneck. Keep callbacks fast; delegate heavy work to spawned tasks.

### handle_continue — Post-Init Work

Split expensive initialization across `init` and `handle_continue` so the process doesn't block its supervisor while starting:

```elixir
@impl true
def init(config) do
  {:ok, %{config: config, data: nil}, {:continue, :load_data}}
end

@impl true
def handle_continue(:load_data, state) do
  data = expensive_load(state.config)
  {:noreply, %{state | data: data}}
end
```

`{:continue, term}` can also be returned from `handle_call`, `handle_cast`, and `handle_info`.
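
A minimal sketch of that from a call: reply immediately, then do follow-up work after the reply has been sent (the `CacheServer` module and its `:refresh` call are hypothetical):

```elixir
defmodule CacheServer do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, %{cache: nil})

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:refresh, _from, state) do
    # Caller unblocks right away; the rebuild runs after the reply is sent
    {:reply, :ok, state, {:continue, :rebuild}}
  end

  @impl true
  def handle_continue(:rebuild, state) do
    # Stand-in for a slow cache rebuild
    {:noreply, %{state | cache: :os.system_time(:millisecond)}}
  end
end
```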

### Delayed Reply Pattern

Return `{:noreply, state}` from `handle_call` and reply later with `GenServer.reply/2`:

```elixir
@impl true
def handle_call(:slow_query, from, state) do
  Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
    result = run_query()
    GenServer.reply(from, result)
  end)

  {:noreply, state} # Don't block the GenServer
end
```

### Process Registration & Naming

```elixir
# Local atom name
GenServer.start_link(MyServer, arg, name: MyServer)

# Global name (cluster-wide via :global)
GenServer.start_link(MyServer, arg, name: {:global, :my_server})

# Via Registry (recommended for dynamic naming)
GenServer.start_link(MyServer, arg,
  name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})
```

### code_change — Hot Code Upgrades

```elixir
@impl true
def code_change(_old_vsn, state, _extra) do
  # Transform the state shape between versions
  new_state = Map.put_new(state, :new_field, default_value())
  {:ok, new_state}
end
```

---

## GenStateMachine — State Machines for Agentic Workflows

**Why GenStateMachine over GenServer for agents?** A GenServer has a single state and handles all messages uniformly. GenStateMachine (wrapping Erlang's `:gen_statem`) provides:

- **Explicit states** with per-state event handling
- **Built-in timeouts** — state timeouts (cancelled on state change), generic timeouts, event timeouts
- **Postpone** — defer events until the machine reaches the right state
- **State enter callbacks** — run setup logic when entering a state
- **Event types** — distinguish calls, casts, info messages, timeouts, and internal events

These map naturally onto agentic patterns: an agent in a "thinking" state defers new requests (postpone), has retry timeouts, transitions through well-defined phases, and runs setup on each state entry.

### Installation

```elixir
{:gen_state_machine, "~> 3.0"}
```

### Callback Modes

**`:handle_event_function`** (default) — a single `handle_event/4` handles all states:

```elixir
defmodule AgentFSM do
  use GenStateMachine

  # Client API
  def start_link(opts), do: GenStateMachine.start_link(__MODULE__, opts)
  def submit(pid, task), do: GenStateMachine.call(pid, {:submit, task})
  def status(pid), do: GenStateMachine.call(pid, :status)

  # Server callbacks
  def init(opts) do
    {:ok, :idle, %{tasks: [], results: [], error: nil, config: opts}}
  end

  # State: idle
  def handle_event({:call, from}, {:submit, task}, :idle, data) do
    {:next_state, :processing, %{data | tasks: [task]},
     [{:reply, from, :accepted}, {:state_timeout, 30_000, :timeout}]}
  end

  # Valid in any state
  def handle_event({:call, from}, :status, state, data) do
    {:keep_state_and_data, [{:reply, from, {state, length(data.results)}}]}
  end

  # State: processing — with timeout
  def handle_event(:state_timeout, :timeout, :processing, data) do
    {:next_state, :error, %{data | error: :timeout}}
  end

  # Completion arrives as a regular message
  def handle_event(:info, {:result, result}, :processing, data) do
    {:next_state, :complete, %{data | results: [result | data.results]}}
  end

  # Postpone submissions while processing
  def handle_event({:call, _from}, {:submit, _task}, :processing, _data) do
    {:keep_state_and_data, :postpone}
  end
end
```
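
Driving `AgentFSM` by hand (a sketch — here we simulate the worker by sending the `{:result, value}` message ourselves):

```elixir
{:ok, pid} = AgentFSM.start_link([])

AgentFSM.status(pid)          # => {:idle, 0}
AgentFSM.submit(pid, :task_a) # => :accepted — machine moves to :processing
send(pid, {:result, 42})      # simulate the worker finishing
AgentFSM.status(pid)          # => {:complete, 1}
```

Note that a second `submit` sent while in `:processing` would be postponed and replayed once the machine changes state.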

**`:state_functions`** — each state is a separate function (state names must be atoms):

```elixir
defmodule WorkflowFSM do
  use GenStateMachine, callback_mode: [:state_functions, :state_enter]

  def init(_), do: {:ok, :pending, %{entered_at: nil, params: nil, result: nil, error: nil}}

  # State enter callbacks — run on every state transition
  def pending(:enter, _old_state, data) do
    {:keep_state, %{data | entered_at: DateTime.utc_now()}}
  end

  def pending({:call, from}, {:start, params}, data) do
    {:next_state, :running, %{data | params: params},
     [{:reply, from, :ok}, {:state_timeout, 60_000, :execution_timeout}]}
  end

  def running(:enter, :pending, data) do
    # Setup when entering :running from :pending
    send(self(), :execute)
    {:keep_state, data}
  end

  def running(:info, :execute, data) do
    result = do_work(data.params)
    {:next_state, :complete, %{data | result: result}}
  end

  def running(:state_timeout, :execution_timeout, data) do
    {:next_state, :failed, %{data | error: :timeout}}
  end

  # Postpone any calls while running
  def running({:call, _from}, _request, _data) do
    {:keep_state_and_data, :postpone}
  end

  def complete(:enter, _old, data), do: {:keep_state, data}

  def complete({:call, from}, :get_result, data) do
    {:keep_state_and_data, [{:reply, from, {:ok, data.result}}]}
  end

  def failed(:enter, _old, data), do: {:keep_state, data}

  def failed({:call, from}, :get_error, data) do
    {:keep_state_and_data, [{:reply, from, {:error, data.error}}]}
  end
end
```

### Timeout Types

| Action | Behavior | Use Case |
|--------|----------|----------|
| `{{:timeout, name}, ms, event}` | Named generic — survives state changes | Periodic polling |
| `{:state_timeout, ms, event}` | Cancelled on state change | Per-state deadlines |
| `{:timeout, ms, event}` | Event timeout — cancelled by any incoming event | Inactivity detection |
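
A minimal sketch contrasting a named generic timeout (keeps firing across transitions) with a state timeout (dies when the state changes) — the `TimeoutDemo` module is hypothetical:

```elixir
defmodule TimeoutDemo do
  use GenStateMachine

  def start_link(_), do: GenStateMachine.start_link(__MODULE__, :ok)

  def init(:ok) do
    {:ok, :warming_up, %{beats: 0},
     [{{:timeout, :heartbeat}, 1_000, :beat},     # named generic — survives state changes
      {:state_timeout, 5_000, :warmup_deadline}]} # cancelled if we leave :warming_up first
  end

  # The generic timeout fires in any state; re-arm it each time
  def handle_event({:timeout, :heartbeat}, :beat, _state, data) do
    {:keep_state, %{data | beats: data.beats + 1},
     [{{:timeout, :heartbeat}, 1_000, :beat}]}
  end

  def handle_event(:state_timeout, :warmup_deadline, :warming_up, data) do
    {:next_state, :ready, data}
  end
end
```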

### Key Actions in Return Tuples

```elixir
# Return format: {:next_state, new_state, new_data, actions}
actions = [
  {:reply, from, response},            # Reply to caller
  {:state_timeout, 30_000, :deadline}, # State-scoped timeout
  {{:timeout, :poll}, 5_000, :tick},   # Named generic timeout — survives state changes
  {:timeout, 5_000, :idle},            # Event timeout — cancelled by any incoming event
  :postpone,                           # Defer the current event to the next state
  :hibernate,                          # Reduce memory footprint
  {:next_event, :internal, :setup}     # Queue an internal event
]
```

### When to Use GenStateMachine vs GenServer

| Scenario | Use |
|----------|-----|
| Simple key-value state, CRUD | GenServer |
| Request/response server | GenServer |
| Well-defined state transitions | **GenStateMachine** |
| Need built-in timeouts per state | **GenStateMachine** |
| Events valid only in certain states | **GenStateMachine** |
| Agentic workflow with phases | **GenStateMachine** |
| Need postpone/defer semantics | **GenStateMachine** |

---

## Supervisors — Let It Crash

### Child Specifications

Every supervised process needs a child spec — a map describing how to start, restart, and shut it down:

```elixir
%{
  id: MyWorker,                          # Unique identifier (required)
  start: {MyWorker, :start_link, [arg]}, # {Module, Function, Args} (required)
  restart: :permanent,                   # :permanent | :transient | :temporary
  shutdown: 5_000,                       # Timeout in ms, :brutal_kill, or :infinity
  type: :worker                          # :worker | :supervisor
}
```

`use GenServer` / `use Supervisor` auto-generates `child_spec/1` — you rarely write these by hand.
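
When you do need a non-default spec, options to `use GenServer` flow into the generated `child_spec/1` (the `MyApp.Cleaner` module here is a hypothetical example):

```elixir
defmodule MyApp.Cleaner do
  # These options override the defaults in the generated child_spec/1
  use GenServer, restart: :transient, shutdown: 10_000

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

MyApp.Cleaner.child_spec(:arg)
# returns a map with restart: :transient and shutdown: 10_000
```

Per-instance overrides are also possible in the child list via `Supervisor.child_spec({MyApp.Cleaner, :arg}, id: :cleaner_a)`.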

### Restart Semantics

| Value | Behavior |
|-------|----------|
| `:permanent` | Always restart (default) — use for core services |
| `:transient` | Restart only on abnormal exit — use for workers that should run to completion |
| `:temporary` | Never restart — child spec is deleted on exit |

**What counts as a "normal" exit:** `:normal`, `:shutdown`, `{:shutdown, term}` — these suppress crash logs too.
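
Putting the two tables together: a one-shot `:transient` worker can finish its job and stop with `:normal`, and the supervisor will neither restart it nor log a crash (a sketch; `MyApp.OneShot` is hypothetical):

```elixir
defmodule MyApp.OneShot do
  use GenServer, restart: :transient

  def start_link(job), do: GenServer.start_link(__MODULE__, job)

  @impl true
  def init(job), do: {:ok, job, {:continue, :run}}

  @impl true
  def handle_continue(:run, job) do
    # ... do the actual work here ...
    {:stop, :normal, job} # normal exit — no restart under :transient, no crash log
  end
end
```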

### Crash Budget

```elixir
opts = [
  strategy: :one_for_one,
  max_restarts: 3, # Max restarts within the window (default: 3)
  max_seconds: 5   # Window in seconds (default: 5)
]
```

Exceed the budget and the supervisor itself exits with `:shutdown` — escalating the failure to *its* supervisor.

### Strategies

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      MyApp.Repo,                                    # Start first
      {MyApp.Cache, []},                             # Depends on Repo
      {Task.Supervisor, name: MyApp.TaskSupervisor},
      MyAppWeb.Endpoint                              # Start last
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```

- `:one_for_one` — restart only the crashed child (most common)
- `:one_for_all` — restart all children if any crashes (tightly coupled; usually too aggressive)
- `:rest_for_one` — restart the crashed child plus all children started after it (use when startup order = dependency order)

**Shutdown order is the reverse of start order.** If a child exceeds its `:shutdown` timeout, it is brutally killed.

### auto_shutdown (Erlang/OTP 24.3+)

```elixir
Supervisor.start_link(children,
  strategy: :one_for_one,
  auto_shutdown: :any_significant # or :all_significant
)
```

The supervisor exits automatically when significant children (marked with `significant: true` in their child spec) terminate.
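
A sketch of marking one child significant — note that significant children must use `:transient` or `:temporary` restart, never `:permanent` (the `MyApp.Server`/`MyApp.MainTask` modules are hypothetical):

```elixir
children = [
  MyApp.Server, # permanent infrastructure
  Supervisor.child_spec({MyApp.MainTask, []},
    restart: :transient, # required for significant children
    significant: true    # its normal exit triggers auto_shutdown
  )
]

Supervisor.start_link(children,
  strategy: :one_for_one,
  auto_shutdown: :any_significant
)
```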

### DynamicSupervisor

For children started on demand at runtime:

```elixir
defmodule MyApp.SessionSupervisor do
  use DynamicSupervisor

  def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: DynamicSupervisor.init(strategy: :one_for_one)

  def start_session(session_id) do
    DynamicSupervisor.start_child(__MODULE__, {MyApp.Session, session_id})
  end

  def count, do: DynamicSupervisor.count_children(__MODULE__)
  def list, do: DynamicSupervisor.which_children(__MODULE__)
end
```

**When to use DynamicSupervisor vs Supervisor:**

- **DynamicSupervisor** — children start on demand, large populations, no ordering needed
- **Supervisor** — mostly static children, startup order matters, small fixed set

**Bottleneck?** A DynamicSupervisor is a single process, so heavy `start_child` traffic serializes through it. Use `PartitionSupervisor` to spread the load across several instances:

```elixir
{PartitionSupervisor,
 child_spec: DynamicSupervisor,
 name: MyApp.PartitionedSupervisors}

# Start a child on the caller's partition
DynamicSupervisor.start_child(
  {:via, PartitionSupervisor, {MyApp.PartitionedSupervisors, self()}},
  child_spec
)
```

---

## Agent — Simple State Container

For simple state sharing when a GenServer is overkill:

```elixir
{:ok, agent} = Agent.start_link(fn -> %{} end, name: :cache)

Agent.update(:cache, &Map.put(&1, :key, "value"))
Agent.get(:cache, &Map.get(&1, :key))

# Atomic read-modify-write
Agent.get_and_update(:cache, fn state ->
  {Map.get(state, :key), Map.put(state, :key, "new_value")}
end)
```

**Limitations:** No message handling, no lifecycle callbacks, no state enter/exit logic. If you need any of that, use a GenServer. Anonymous functions fail across distributed nodes — use the `{module, fun, args}` form for distributed Agents.
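
The MFA form looks like this — the agent's state is prepended to the argument list, and no closures are shipped between nodes:

```elixir
# Start with Map.new/0 as the initial-state function
{:ok, _agent} = Agent.start_link(Map, :new, [], name: :mfa_cache)

# Each call becomes apply(Map, fun, [state | args])
Agent.update(:mfa_cache, Map, :put, [:key, "value"])
Agent.get(:mfa_cache, Map, :get, [:key])
# => "value"
```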
---

## Application — Supervision Tree Entry Point

### Application Environment

```elixir
# mix.exs — compile-time defaults
def application do
  [extra_applications: [:logger], env: [pool_size: 10]]
end

# config/config.exs — compile-time overrides
config :my_app, pool_size: 20

# config/runtime.exs — runtime overrides (env vars, secrets)
config :my_app, pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10")

# Reading
Application.get_env(:my_app, :pool_size, 10) # With default
Application.fetch_env!(:my_app, :pool_size)  # Raises if missing
Application.compile_env(:my_app, :pool_size) # Read at compile time, validated at boot
```

**Library warning:** Avoid the application environment in libraries — it's global mutable state that creates tight coupling. Pass config explicitly via function arguments or init options.
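
For example, a library process can take its config as start options and let the host application decide where the value comes from (`MyLib.Pool` and its `:pool_size` option are hypothetical):

```elixir
defmodule MyLib.Pool do
  use GenServer

  def start_link(opts) do
    # Config arrives explicitly — no Application env lookups inside the library
    GenServer.start_link(__MODULE__, Keyword.fetch!(opts, :pool_size))
  end

  @impl true
  def init(pool_size), do: {:ok, %{pool_size: pool_size}}
end

# In the host app's supervision tree — only the app touches its own env:
# {MyLib.Pool, pool_size: Application.fetch_env!(:my_app, :pool_size)}
```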

### Start Types

| Type | Behavior |
|------|----------|
| `:permanent` | Node terminates if this app terminates |
| `:transient` | Only abnormal exits affect other apps |
| `:temporary` | Standalone — failures don't cascade (default) |

---

## Registry & Process Discovery

### Unique Keys — One Process Per Key

```elixir
# In your supervision tree
{Registry, keys: :unique, name: MyApp.Registry}

# Register the calling process
Registry.register(MyApp.Registry, "session:#{id}", %{meta: "data"})

# Lookup
case Registry.lookup(MyApp.Registry, "session:#{id}") do
  [{pid, _value}] -> {:ok, pid}
  [] -> {:error, :not_found}
end

# Use as a GenServer name
GenServer.start_link(MyWorker, arg,
  name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})
```

### Duplicate Keys — Pub/Sub Pattern

```elixir
{Registry, keys: :duplicate, name: MyApp.PubSub}

# Subscribe (called from each interested process)
Registry.register(MyApp.PubSub, "events:orders", [])

# Broadcast
Registry.dispatch(MyApp.PubSub, "events:orders", fn entries ->
  for {pid, _} <- entries, do: send(pid, {:order_created, order})
end)
```

### Performance Tuning

Partition the registry for concurrent throughput under high load:

```elixir
{Registry, keys: :unique, name: MyApp.Registry,
 partitions: System.schedulers_online()}
```

### Registry Metadata

```elixir
Registry.put_meta(MyApp.Registry, :config, %{version: 2})
{:ok, config} = Registry.meta(MyApp.Registry, :config)
```

---

## OTP Production Checklist

- Use `:one_for_one` by default; `:rest_for_one` when startup order = dependency order
- Set `max_restarts`/`max_seconds` appropriately for your domain
- Set `:shutdown` to `:infinity` for nested supervisors, a timeout for workers
- Use `PartitionSupervisor` if a DynamicSupervisor becomes a bottleneck
- Use `:transient` restart for temporary workers; `:permanent` for core services
- Use `handle_continue` for post-init setup to avoid blocking the supervisor
- Use the delayed reply pattern in GenServer for long-running operations
- Never `await` a Task inside a GenServer — use `async_nolink` + `handle_info`
- Prefer `Task.Supervisor` over raw `Task.async` in production for visibility
- For Registry pub/sub, use `:duplicate` keys and `dispatch/3`
- Partition registries with `System.schedulers_online()` under high concurrency
- Don't use GenServer for code organization — use plain modules for pure functions
- Don't use Agent for complex state — graduate to GenServer