# Elixir Part 2: Concurrency, OTP & State Machines

## BEAM Process Fundamentals

- **Processes are cheap** — ~2KB initial memory, microseconds to spawn, millions concurrent
- **No shared memory** — message passing only
- **Per-process GC** — no stop-the-world pauses
- **Preemptive scheduling** — one scheduler per CPU core, ~4000 reductions then yield
- **Process isolation** — one crash doesn't affect others

### Spawning and Messaging

```elixir
pid = spawn(fn ->
  receive do
    {:greet, name} -> IO.puts("Hello, #{name}!")
  end
end)

send(pid, {:greet, "Michael"})

# Linked — bidirectional crash propagation
pid = spawn_link(fn -> raise "boom" end)

# Monitored — one-directional crash notification
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} -> handle_crash(reason)
end
```

---

## Task — Structured Concurrency

### Basic Patterns

```elixir
Task.start(fn -> send_email(user) end)                # Fire and forget (linked)

task = Task.async(fn -> expensive_computation() end)  # Async/await (linked + monitored)
result = Task.await(task, 30_000)                     # MUST await to drain mailbox
```

**Critical:** `Task.async` creates a bidirectional link — if either the caller or the task crashes, both crash. This preserves sequential semantics (you'd have crashed doing it synchronously too).
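A quick way to observe the bidirectional link without crashing the caller: trapping exits converts the incoming exit signal into an ordinary message. A minimal sketch; for a `raise`, the reason arrives as `{exception, stacktrace}`.

```elixir
# Trap exits so the linked crash arrives as a message instead of
# killing this process too.
Process.flag(:trap_exit, true)

pid = spawn_link(fn -> raise "boom" end)

reason =
  receive do
    {:EXIT, ^pid, reason} -> reason
  after
    1_000 -> :no_exit
  end

# For a raise, reason is {%RuntimeError{message: "boom"}, stacktrace}
```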
### async_stream — Parallel Enumeration

```elixir
Task.async_stream(urls, &fetch_url/1,
  max_concurrency: 10,     # Default: System.schedulers_online()
  ordered: true,           # Maintain input order (default)
  timeout: 30_000,         # Per-task timeout
  on_timeout: :kill_task   # :exit (raise) or :kill_task (return exit tuple)
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, result} -> result end)
```

### Task.Supervisor — Production Patterns

```elixir
# In your supervision tree
{Task.Supervisor, name: MyApp.TaskSupervisor}

# Fire-and-forget (no result tracking)
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> do_work() end)

# Linked async — caller crashes if task crashes
Task.Supervisor.async(MyApp.TaskSupervisor, fn -> compute() end)
|> Task.await()

# Unlinked async — caller survives task failure
task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> risky_work() end)

case Task.yield(task, 5_000) || Task.shutdown(task) do
  {:ok, result} -> result
  {:exit, reason} -> handle_failure(reason)
  nil -> handle_timeout()
end
```

**GenServer integration — never `await` in a GenServer** (it blocks all other messages):

```elixir
def handle_call(:compute, _from, state) do
  # Start unlinked task, handle result in handle_info
  Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> heavy_work() end)
  {:noreply, state}
end

def handle_info({ref, result}, state) do
  Process.demonitor(ref, [:flush])  # Clean up the monitor
  {:noreply, %{state | result: result}}
end

def handle_info({:DOWN, _ref, :process, _pid, reason}, state) do
  {:noreply, %{state | error: reason}}  # Task crashed
end
```

---

## GenServer — Stateful Server Processes

```elixir
defmodule Counter do
  use GenServer

  # Client API
  def start_link(initial \\ 0), do: GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  def increment, do: GenServer.cast(__MODULE__, :increment)
  def get, do: GenServer.call(__MODULE__, :get)

  # Server callbacks
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:get, _from, count), do: {:reply, count, count}

  @impl true
  def handle_info(:tick, count) do
    Process.send_after(self(), :tick, 1000)
    {:noreply, count}
  end
end
```

**Key principle:** Callbacks run sequentially — this is both the synchronization mechanism and the potential bottleneck. Keep callbacks fast; delegate heavy work to spawned tasks.

### handle_continue — Post-Init Work

Split expensive initialization so the process doesn't block its supervisor:

```elixir
@impl true
def init(config) do
  {:ok, %{config: config, data: nil}, {:continue, :load_data}}
end

@impl true
def handle_continue(:load_data, state) do
  data = expensive_load(state.config)
  {:noreply, %{state | data: data}}
end
```

`{:continue, term}` can also be returned from `handle_call`, `handle_cast`, and `handle_info`.

### Delayed Reply Pattern

Return `{:noreply, state}` from `handle_call` and reply later with `GenServer.reply/2`:

```elixir
@impl true
def handle_call(:slow_query, from, state) do
  Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
    result = run_query()
    GenServer.reply(from, result)
  end)

  {:noreply, state}  # Don't block the GenServer
end
```

### Process Registration & Naming

```elixir
# Local atom name
GenServer.start_link(MyServer, arg, name: MyServer)

# Global name (cluster-wide via :global)
GenServer.start_link(MyServer, arg, name: {:global, :my_server})

# Via Registry (recommended for dynamic naming)
GenServer.start_link(MyServer, arg, name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})
```

### code_change — Hot Code Upgrades

```elixir
@impl true
def code_change(_old_vsn, state, _extra) do
  # Transform state shape between versions
  new_state = Map.put_new(state, :new_field, default_value())
  {:ok, new_state}
end
```

---

## GenStateMachine — State Machines for Agentic Workflows

**Why GenStateMachine over GenServer for agents?** GenServer has a single state and handles all messages uniformly.
GenStateMachine (wrapping Erlang's `:gen_statem`) provides:

- **Explicit states** with per-state event handling
- **Built-in timeouts** — state timeouts (canceled on state change), generic named timeouts, event timeouts
- **Postpone** — defer events until the right state
- **State enter callbacks** — run setup logic when entering a state
- **Event types** — distinguish calls, casts, info, timeouts, internal events

These map naturally onto agentic patterns: an agent in a "thinking" state defers new requests (postpone), has retry timeouts, transitions through well-defined phases, and runs setup on each state entry.

### Installation

```elixir
{:gen_state_machine, "~> 3.0"}
```

### Callback Modes

**`:handle_event_function`** (default) — a single `handle_event/4` for all states:

```elixir
defmodule AgentFSM do
  use GenStateMachine

  # Client API
  def start_link(opts), do: GenStateMachine.start_link(__MODULE__, opts)
  def submit(pid, task), do: GenStateMachine.call(pid, {:submit, task})
  def status(pid), do: GenStateMachine.call(pid, :status)

  # Server callbacks
  def init(opts) do
    {:ok, :idle, %{tasks: [], results: [], error: nil, config: opts}}
  end

  # State: idle
  def handle_event({:call, from}, {:submit, task}, :idle, data) do
    {:next_state, :processing, %{data | tasks: [task]},
     [{:reply, from, :accepted}, {:state_timeout, 30_000, :timeout}]}
  end

  def handle_event({:call, from}, :status, state, data) do
    {:keep_state_and_data, [{:reply, from, {state, length(data.results)}}]}
  end

  # State: processing — with timeout
  def handle_event(:state_timeout, :timeout, :processing, data) do
    {:next_state, :error, %{data | error: :timeout}}
  end

  # Info event for completion
  def handle_event(:info, {:result, result}, :processing, data) do
    {:next_state, :complete, %{data | results: [result | data.results]}}
  end

  # Postpone submissions while processing
  def handle_event({:call, _from}, {:submit, _task}, :processing, _data) do
    {:keep_state_and_data, :postpone}
  end
end
```

**`:state_functions`** — each state is a separate function (states must be atoms):

```elixir
defmodule WorkflowFSM do
  use GenStateMachine, callback_mode: [:state_functions, :state_enter]

  def init(_), do: {:ok, :pending, %{entered_at: nil, params: nil, result: nil, error: nil}}

  # State enter callbacks — run on every state transition
  def pending(:enter, _old_state, data) do
    {:keep_state, %{data | entered_at: DateTime.utc_now()}}
  end

  def pending({:call, from}, {:start, params}, data) do
    {:next_state, :running, %{data | params: params},
     [{:reply, from, :ok}, {:state_timeout, 60_000, :execution_timeout}]}
  end

  def running(:enter, :pending, data) do
    # Setup when entering running from pending
    send(self(), :execute)
    {:keep_state, data}
  end

  def running(:info, :execute, data) do
    result = do_work(data.params)
    {:next_state, :complete, %{data | result: result}}
  end

  def running(:state_timeout, :execution_timeout, data) do
    {:next_state, :failed, %{data | error: :timeout}}
  end

  # Postpone any calls while running
  def running({:call, _from}, _request, _data) do
    {:keep_state_and_data, :postpone}
  end

  def complete(:enter, _old, data), do: {:keep_state, data}

  def complete({:call, from}, :get_result, data) do
    {:keep_state_and_data, [{:reply, from, {:ok, data.result}}]}
  end

  def failed(:enter, _old, data), do: {:keep_state, data}

  def failed({:call, from}, :get_error, data) do
    {:keep_state_and_data, [{:reply, from, {:error, data.error}}]}
  end
end
```

### Timeout Types

| Type | Behavior | Use Case |
|------|----------|----------|
| `{:timeout, ms, event}` | Event timeout — canceled by any incoming event | Inactivity detection |
| `{{:timeout, name}, ms, event}` | Generic named timeout — survives state changes | Periodic polling |
| `{:state_timeout, ms, event}` | Canceled on state change | Per-state deadlines |

### Key Actions in Return Tuples

```elixir
# Return format: {:next_state, new_state, new_data, actions}
actions = [
  {:reply, from, response},             # Reply to caller
  {:state_timeout, 30_000, :deadline},  # State-scoped timeout
  {{:timeout, :poll}, 5_000, :poll},    # Generic named timeout
  {:timeout, 5_000, :idle},             # Event timeout (canceled by any event)
  :postpone,                            # Defer event to next state
  :hibernate,                           # Reduce memory footprint
  {:next_event, :internal, :setup}      # Queue internal event
]
```

### When to Use GenStateMachine vs GenServer

| Scenario | Use |
|----------|-----|
| Simple key-value state, CRUD | GenServer |
| Request/response server | GenServer |
| Well-defined state transitions | **GenStateMachine** |
| Need built-in timeouts per state | **GenStateMachine** |
| Events valid only in certain states | **GenStateMachine** |
| Agentic workflow with phases | **GenStateMachine** |
| Need postpone/defer semantics | **GenStateMachine** |

---

## Supervisors — Let It Crash

### Child Specifications

Every supervised process needs a child spec — a map describing how to start, restart, and shut it down:

```elixir
%{
  id: MyWorker,                           # Unique identifier (required)
  start: {MyWorker, :start_link, [arg]},  # {Module, Function, Args} (required)
  restart: :permanent,                    # :permanent | :transient | :temporary
  shutdown: 5_000,                        # Timeout in ms, or :brutal_kill, or :infinity
  type: :worker                           # :worker | :supervisor
}
```

`use GenServer` / `use Supervisor` auto-generates `child_spec/1` — you rarely write these by hand.

### Restart Semantics

| Restart value | Behavior |
|---------------|----------|
| `:permanent` | Always restart (default) — use for core services |
| `:transient` | Restart only on abnormal exit — use for temporary workers that should complete |
| `:temporary` | Never restart — spec auto-deleted on exit |

**What counts as a "normal" exit:** `:normal`, `:shutdown`, `{:shutdown, term}` — these suppress error logs too.

### Crash Budget

```elixir
opts = [
  strategy: :one_for_one,
  max_restarts: 3,  # Max restarts within window (default: 3)
  max_seconds: 5    # Window in seconds (default: 5)
]
```

Exceed the budget and the supervisor itself exits with `:shutdown` — escalating to *its* supervisor.
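The escalation can be observed directly. A throwaway sketch, using a plain `Agent` as the worker and killing it until the budget is spent:

```elixir
# Trap exits so the supervisor's :shutdown arrives as a message
# instead of taking this process down with it.
Process.flag(:trap_exit, true)

{:ok, sup} =
  Supervisor.start_link(
    [Supervisor.child_spec({Agent, fn -> 0 end}, id: :w)],
    strategy: :one_for_one,
    max_restarts: 2,
    max_seconds: 5
  )

# Kill the child three times: two restarts fit the budget, the third exceeds it
for _ <- 1..3 do
  [{:w, pid, _type, _mods}] = Supervisor.which_children(sup)
  Process.exit(pid, :kill)
  Process.sleep(50)
end

budget_exceeded =
  receive do
    {:EXIT, ^sup, :shutdown} -> :ok
  after
    1_000 -> :still_alive
  end
```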
### Strategies

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      MyApp.Repo,        # Start first
      {MyApp.Cache, []}, # Depends on Repo
      {Task.Supervisor, name: MyApp.TaskSupervisor},
      MyAppWeb.Endpoint  # Start last
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```

- `:one_for_one` — restart only the crashed child (most common)
- `:one_for_all` — restart all children if any crashes (tightly coupled; usually too aggressive)
- `:rest_for_one` — restart the crashed child plus all started after it (use when startup order = dependency order)

**Shutdown order is reverse start order.** If a child exceeds its `:shutdown` timeout, it gets killed.

### auto_shutdown (OTP 24+)

```elixir
Supervisor.start_link(children,
  strategy: :one_for_one,
  auto_shutdown: :any_significant  # or :all_significant
)
```

The supervisor exits automatically when significant children (marked with `significant: true` in the child spec) terminate.

### DynamicSupervisor

For children started on demand at runtime:

```elixir
defmodule MyApp.SessionSupervisor do
  use DynamicSupervisor

  def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  def init(:ok), do: DynamicSupervisor.init(strategy: :one_for_one)

  def start_session(session_id) do
    DynamicSupervisor.start_child(__MODULE__, {MyApp.Session, session_id})
  end

  def count, do: DynamicSupervisor.count_children(__MODULE__)
  def list, do: DynamicSupervisor.which_children(__MODULE__)
end
```

**When to use DynamicSupervisor vs Supervisor:**

- **DynamicSupervisor** — children start on demand, large populations, no ordering needed
- **Supervisor** — mostly static children, startup order matters, small fixed set

**Bottleneck?** DynamicSupervisor is a single process.
Use `PartitionSupervisor` to distribute the load:

```elixir
{PartitionSupervisor, child_spec: DynamicSupervisor, name: MyApp.PartitionedSupervisors}

# Start a child on a partition
DynamicSupervisor.start_child(
  {:via, PartitionSupervisor, {MyApp.PartitionedSupervisors, self()}},
  child_spec
)
```

---

## Agent — Simple State Container

For simple state sharing when a GenServer is overkill:

```elixir
{:ok, agent} = Agent.start_link(fn -> %{} end, name: :cache)

Agent.update(:cache, &Map.put(&1, :key, "value"))
Agent.get(:cache, &Map.get(&1, :key))

# Atomic read-modify-write
Agent.get_and_update(:cache, fn state ->
  {Map.get(state, :key), Map.put(state, :key, "new_value")}
end)
```

**Limitations:** No message handling, no lifecycle callbacks, no state enter/exit logic. If you need any of that, use GenServer.

Anonymous functions are unreliable across distributed nodes — use the `{module, fun, args}` form for distributed Agents.

---

## Application — Supervision Tree Entry Point

### Application Environment

```elixir
# mix.exs — compile-time defaults
def application do
  [extra_applications: [:logger], env: [pool_size: 10]]
end

# config/config.exs — compile-time overrides
config :my_app, pool_size: 20

# config/runtime.exs — runtime overrides (env vars, secrets)
config :my_app, pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10")

# Reading
Application.get_env(:my_app, :pool_size, 10)  # With default
Application.fetch_env!(:my_app, :pool_size)   # Raises if missing
Application.compile_env(:my_app, :pool_size)  # Compile-time validated
```

**Library warning:** Avoid the application environment in libraries — it's global mutable state that creates tight coupling. Pass config explicitly via function arguments or init options.
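A sketch of the explicit-config alternative, with a hypothetical `MyLib.Worker` that takes its settings as init options instead of reading the application environment:

```elixir
defmodule MyLib.Worker do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl true
  def init(opts) do
    # Config arrives as arguments — no global mutable state involved
    {:ok, %{pool_size: Keyword.get(opts, :pool_size, 10)}}
  end
end

# The host application decides the config and passes it in
{:ok, pid} = MyLib.Worker.start_link(pool_size: 20)
```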
### Start Types

| Type | Behavior |
|------|----------|
| `:permanent` | Node terminates if this app terminates |
| `:transient` | Only abnormal exits affect other apps |
| `:temporary` | Standalone — failures don't cascade (default) |

---

## Registry & Process Discovery

### Unique Keys — One Process Per Key

```elixir
# In supervisor
{Registry, keys: :unique, name: MyApp.Registry}

# Register a process
Registry.register(MyApp.Registry, "session:#{id}", %{meta: "data"})

# Lookup
case Registry.lookup(MyApp.Registry, "session:#{id}") do
  [{pid, value}] -> {:ok, pid}
  [] -> {:error, :not_found}
end

# Use as GenServer name
GenServer.start_link(MyWorker, arg, name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})
```

### Duplicate Keys — Pub/Sub Pattern

```elixir
{Registry, keys: :duplicate, name: MyApp.PubSub}

# Subscribe (from each interested process)
Registry.register(MyApp.PubSub, "events:orders", [])

# Broadcast
Registry.dispatch(MyApp.PubSub, "events:orders", fn entries ->
  for {pid, _} <- entries, do: send(pid, {:order_created, order})
end)
```

### Performance Tuning

```elixir
{Registry, keys: :unique, name: MyApp.Registry, partitions: System.schedulers_online()}
```

Partition the registry for concurrent throughput under high load.
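The duplicate-key pub/sub pattern can be exercised end to end. A self-contained sketch; `DemoPubSub` is a throwaway registry name for this example:

```elixir
{:ok, _} = Registry.start_link(keys: :duplicate, name: DemoPubSub)

parent = self()

# Subscriber registers itself under the topic, then waits for a broadcast
spawn(fn ->
  Registry.register(DemoPubSub, "events:orders", [])
  send(parent, :subscribed)

  receive do
    {:order_created, id} -> send(parent, {:got, id})
  end
end)

# Wait for registration before broadcasting
receive do
  :subscribed -> :ok
end

Registry.dispatch(DemoPubSub, "events:orders", fn entries ->
  for {pid, _} <- entries, do: send(pid, {:order_created, 42})
end)

got =
  receive do
    {:got, id} -> id
  after
    1_000 -> :timeout
  end
# got == 42
```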
### Registry Metadata

```elixir
Registry.put_meta(MyApp.Registry, :config, %{version: 2})
{:ok, config} = Registry.meta(MyApp.Registry, :config)
```

---

## OTP Production Checklist

- Use `:one_for_one` by default; `:rest_for_one` when order = dependency
- Set `max_restarts`/`max_seconds` appropriate to your domain
- Set `:shutdown` to `:infinity` for nested supervisors, a timeout for workers
- Use `PartitionSupervisor` if DynamicSupervisor becomes a bottleneck
- Use `:transient` restart for temporary workers; `:permanent` for core services
- Use `handle_continue` for post-init setup to avoid blocking the supervisor
- Use the delayed reply pattern in GenServer for long-running operations
- Never `await` a Task inside a GenServer — use `async_nolink` + `handle_info`
- Always use `Task.Supervisor` over raw `Task.async` in production for visibility
- For Registry pub/sub, use `:duplicate` keys and `dispatch/3`
- Partition registries with `System.schedulers_online()` under high concurrency
- Don't use GenServer for code organization — use plain modules for pure functions
- Don't use Agent for complex state — graduate to GenServer
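Several of the checklist items combined into one throwaway tree (the `Demo.*` names are hypothetical): a supervised `Task.Supervisor` for visibility, `:infinity` shutdown for the nested `DynamicSupervisor`, and an explicit restart budget.

```elixir
children = [
  {Task.Supervisor, name: Demo.TaskSupervisor},
  # Nested supervisors get :shutdown :infinity so they can drain
  # their own children on the way down
  Supervisor.child_spec(
    {DynamicSupervisor, name: Demo.DynSup, strategy: :one_for_one},
    shutdown: :infinity
  )
]

{:ok, sup} =
  Supervisor.start_link(children,
    strategy: :one_for_one,
    max_restarts: 3,
    max_seconds: 5
  )
```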