Deep dive: comprehensive OTP, Ecto, cortex-native CI/CD
This commit is contained in:
parent
8fc59eee23
commit
aa4f55e8f1
@ -27,14 +27,67 @@ receive do
|
|||||||
end
|
end
|
||||||
```
|
```
|
||||||
|
|
||||||
### Task — Structured Concurrency
|
---
|
||||||
```elixir
|
|
||||||
Task.start(fn -> send_email(user) end) # Fire and forget
|
|
||||||
task = Task.async(fn -> expensive_computation() end) # Async/await
|
|
||||||
result = Task.await(task, 30_000)
|
|
||||||
|
|
||||||
Task.async_stream(urls, &fetch_url/1, max_concurrency: 10) # Parallel map
|
## Task — Structured Concurrency
|
||||||
|> Enum.to_list()
|
|
||||||
|
### Basic Patterns
|
||||||
|
```elixir
|
||||||
|
Task.start(fn -> send_email(user) end) # Fire and forget (linked)
|
||||||
|
task = Task.async(fn -> expensive_computation() end) # Async/await (linked + monitored)
|
||||||
|
result = Task.await(task, 30_000) # MUST await to drain mailbox
|
||||||
|
```
|
||||||
|
|
||||||
|
**Critical:** `Task.async` creates a bidirectional link — if either the caller or task crashes, both crash. This preserves sequential semantics (you'd have crashed doing it synchronously too).
|
||||||
|
|
||||||
|
### async_stream — Parallel Enumeration
|
||||||
|
```elixir
|
||||||
|
Task.async_stream(urls, &fetch_url/1,
|
||||||
|
max_concurrency: 10, # Default: System.schedulers_online()
|
||||||
|
ordered: true, # Maintain input order (default)
|
||||||
|
timeout: 30_000, # Per-task timeout
|
||||||
|
on_timeout: :kill_task # :exit (raise) or :kill_task (return exit tuple)
|
||||||
|
)
|
||||||
|
|> Stream.filter(&match?({:ok, _}, &1))
|
||||||
|
|> Enum.map(fn {:ok, result} -> result end)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Task.Supervisor — Production Patterns
|
||||||
|
```elixir
|
||||||
|
# In your supervision tree
|
||||||
|
{Task.Supervisor, name: MyApp.TaskSupervisor}
|
||||||
|
|
||||||
|
# Fire-and-forget (no result tracking)
|
||||||
|
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> do_work() end)
|
||||||
|
|
||||||
|
# Linked async — caller crashes if task crashes
|
||||||
|
Task.Supervisor.async(MyApp.TaskSupervisor, fn -> compute() end) |> Task.await()
|
||||||
|
|
||||||
|
# Unlinked async — caller survives task failure
|
||||||
|
task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> risky_work() end)
|
||||||
|
case Task.yield(task, 5_000) || Task.shutdown(task) do
|
||||||
|
{:ok, result} -> result
|
||||||
|
{:exit, reason} -> handle_failure(reason)
|
||||||
|
nil -> handle_timeout()
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
|
**GenServer integration — never `await` in a GenServer** (blocks all other messages):
|
||||||
|
```elixir
|
||||||
|
def handle_call(:compute, _from, state) do
|
||||||
|
# Start unlinked task, handle result in handle_info
|
||||||
|
Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> heavy_work() end)
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info({ref, result}, state) do
|
||||||
|
Process.demonitor(ref, [:flush]) # Clean up the monitor
|
||||||
|
{:noreply, %{state | result: result}}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info({:DOWN, _ref, :process, _pid, reason}, state) do
|
||||||
|
{:noreply, %{state | error: reason}} # Task crashed
|
||||||
|
end
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
---
|
||||||
@ -70,6 +123,65 @@ end
|
|||||||
|
|
||||||
**Key principle:** Callbacks run sequentially — this is both the synchronization mechanism and potential bottleneck. Keep callbacks fast; delegate heavy work to spawned tasks.
|
**Key principle:** Callbacks run sequentially — this is both the synchronization mechanism and potential bottleneck. Keep callbacks fast; delegate heavy work to spawned tasks.
|
||||||
|
|
||||||
|
### handle_continue — Post-Init Work
|
||||||
|
|
||||||
|
Split expensive initialization so the process doesn't block its supervisor:
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
@impl true
|
||||||
|
def init(config) do
|
||||||
|
{:ok, %{config: config, data: nil}, {:continue, :load_data}}
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_continue(:load_data, state) do
|
||||||
|
data = expensive_load(state.config)
|
||||||
|
{:noreply, %{state | data: data}}
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
|
`{:continue, term}` can also be returned from `handle_call`, `handle_cast`, and `handle_info`.
|
||||||
|
|
||||||
|
### Delayed Reply Pattern
|
||||||
|
|
||||||
|
Return `{:noreply, state}` from `handle_call` and reply later with `GenServer.reply/2`:
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
@impl true
|
||||||
|
def handle_call(:slow_query, from, state) do
|
||||||
|
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
|
||||||
|
result = run_query()
|
||||||
|
GenServer.reply(from, result)
|
||||||
|
end)
|
||||||
|
{:noreply, state} # Don't block the GenServer
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
|
### Process Registration & Naming
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
# Local atom name
|
||||||
|
GenServer.start_link(MyServer, arg, name: MyServer)
|
||||||
|
|
||||||
|
# Global name (cluster-wide via :global)
|
||||||
|
GenServer.start_link(MyServer, arg, name: {:global, :my_server})
|
||||||
|
|
||||||
|
# Via Registry (recommended for dynamic naming)
|
||||||
|
GenServer.start_link(MyServer, arg,
|
||||||
|
name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})
|
||||||
|
```
|
||||||
|
|
||||||
|
### code_change — Hot Code Upgrades
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
@impl true
|
||||||
|
def code_change(old_vsn, state, _extra) do
|
||||||
|
# Transform state shape between versions
|
||||||
|
new_state = Map.put_new(state, :new_field, default_value())
|
||||||
|
{:ok, new_state}
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## GenStateMachine — State Machines for Agentic Workflows
|
## GenStateMachine — State Machines for Agentic Workflows
|
||||||
@ -219,6 +331,46 @@ actions = [
|
|||||||
|
|
||||||
## Supervisors — Let It Crash
|
## Supervisors — Let It Crash
|
||||||
|
|
||||||
|
### Child Specifications
|
||||||
|
|
||||||
|
Every supervised process needs a child spec — a map describing how to start, restart, and shut it down:
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
%{
|
||||||
|
id: MyWorker, # Unique identifier (required)
|
||||||
|
start: {MyWorker, :start_link, [arg]}, # {Module, Function, Args} (required)
|
||||||
|
restart: :permanent, # :permanent | :transient | :temporary
|
||||||
|
shutdown: 5_000, # Timeout in ms, or :brutal_kill, or :infinity
|
||||||
|
type: :worker # :worker | :supervisor
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
`use GenServer` / `use Supervisor` auto-generates `child_spec/1` — you rarely write these by hand.
|
||||||
|
|
||||||
|
### Restart Semantics
|
||||||
|
|
||||||
|
| Strategy | Behavior |
|
||||||
|
|----------|----------|
|
||||||
|
| `:permanent` | Always restart (default) — use for core services |
|
||||||
|
| `:transient` | Restart only on abnormal exit — use for temporary workers that should complete |
|
||||||
|
| `:temporary` | Never restart — spec auto-deleted on exit |
|
||||||
|
|
||||||
|
**What counts as "normal" exit:** `:normal`, `:shutdown`, `{:shutdown, term}` — these suppress error logs too.
|
||||||
|
|
||||||
|
### Crash Budget
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
opts = [
|
||||||
|
strategy: :one_for_one,
|
||||||
|
max_restarts: 3, # Max restarts within window (default: 3)
|
||||||
|
max_seconds: 5 # Window in seconds (default: 5)
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
Exceed the budget and the supervisor itself exits with `:shutdown` — escalating to *its* supervisor.
|
||||||
|
|
||||||
|
### Strategies
|
||||||
|
|
||||||
```elixir
|
```elixir
|
||||||
defmodule MyApp.Application do
|
defmodule MyApp.Application do
|
||||||
use Application
|
use Application
|
||||||
@ -226,10 +378,10 @@ defmodule MyApp.Application do
|
|||||||
@impl true
|
@impl true
|
||||||
def start(_type, _args) do
|
def start(_type, _args) do
|
||||||
children = [
|
children = [
|
||||||
MyApp.Repo,
|
MyApp.Repo, # Start first
|
||||||
{MyApp.Cache, []},
|
{MyApp.Cache, []}, # Depends on Repo
|
||||||
{Task.Supervisor, name: MyApp.TaskSupervisor},
|
{Task.Supervisor, name: MyApp.TaskSupervisor},
|
||||||
MyAppWeb.Endpoint
|
MyAppWeb.Endpoint # Start last
|
||||||
]
|
]
|
||||||
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
|
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
|
||||||
Supervisor.start_link(children, opts)
|
Supervisor.start_link(children, opts)
|
||||||
@ -237,12 +389,27 @@ defmodule MyApp.Application do
|
|||||||
end
|
end
|
||||||
```
|
```
|
||||||
|
|
||||||
**Strategies:**
|
|
||||||
- `:one_for_one` — restart only crashed child (most common)
|
- `:one_for_one` — restart only crashed child (most common)
|
||||||
- `:one_for_all` — restart all if any crashes (tightly coupled)
|
- `:one_for_all` — restart all if any crashes (tightly coupled; usually too aggressive)
|
||||||
- `:rest_for_one` — restart crashed + all started after it
|
- `:rest_for_one` — restart crashed + all started after it (use when startup order = dependency order)
|
||||||
|
|
||||||
|
**Shutdown order is reverse start order.** If a child exceeds its `:shutdown` timeout, it gets killed.
|
||||||
|
|
||||||
|
### auto_shutdown (Elixir 1.19+)
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
Supervisor.start_link(children,
|
||||||
|
strategy: :one_for_one,
|
||||||
|
auto_shutdown: :any_significant # or :all_significant
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
The supervisor exits automatically when significant children (marked with `significant: true` in child spec) terminate.
|
||||||
|
|
||||||
### DynamicSupervisor
|
### DynamicSupervisor
|
||||||
|
|
||||||
|
For children started on demand at runtime:
|
||||||
|
|
||||||
```elixir
|
```elixir
|
||||||
defmodule MyApp.SessionSupervisor do
|
defmodule MyApp.SessionSupervisor do
|
||||||
use DynamicSupervisor
|
use DynamicSupervisor
|
||||||
@ -253,26 +420,152 @@ defmodule MyApp.SessionSupervisor do
|
|||||||
def start_session(session_id) do
|
def start_session(session_id) do
|
||||||
DynamicSupervisor.start_child(__MODULE__, {MyApp.Session, session_id})
|
DynamicSupervisor.start_child(__MODULE__, {MyApp.Session, session_id})
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def count, do: DynamicSupervisor.count_children(__MODULE__)
|
||||||
|
def list, do: DynamicSupervisor.which_children(__MODULE__)
|
||||||
end
|
end
|
||||||
```
|
```
|
||||||
|
|
||||||
|
**When to use DynamicSupervisor vs Supervisor:**
|
||||||
|
- **DynamicSupervisor** — children start on demand, large populations, no ordering needed
|
||||||
|
- **Supervisor** — mostly static children, startup order matters, small fixed set
|
||||||
|
|
||||||
|
**Bottleneck?** DynamicSupervisor is a single process. Use `PartitionSupervisor` to distribute load:
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
{PartitionSupervisor,
|
||||||
|
child_spec: DynamicSupervisor,
|
||||||
|
name: MyApp.PartitionedSupervisors}
|
||||||
|
|
||||||
|
# Start child on a partition
|
||||||
|
DynamicSupervisor.start_child(
|
||||||
|
{:via, PartitionSupervisor, {MyApp.PartitionedSupervisors, self()}},
|
||||||
|
child_spec
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Agent — Simple State Container
|
||||||
|
|
||||||
|
For simple state sharing when GenServer is overkill:
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
{:ok, agent} = Agent.start_link(fn -> %{} end, name: :cache)
|
||||||
|
|
||||||
|
Agent.update(:cache, &Map.put(&1, :key, "value"))
|
||||||
|
Agent.get(:cache, &Map.get(&1, :key))
|
||||||
|
|
||||||
|
# Atomic read-modify-write
|
||||||
|
Agent.get_and_update(:cache, fn state ->
|
||||||
|
{Map.get(state, :key), Map.put(state, :key, "new_value")}
|
||||||
|
end)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Limitations:** No message handling, no lifecycle callbacks, no state enter/exit logic. If you need any of that, use GenServer. Anonymous functions fail across distributed nodes — use `{module, fun, args}` form for distributed Agents.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Application — Supervision Tree Entry Point
|
||||||
|
|
||||||
|
### Application Environment
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
# mix.exs — compile-time defaults
|
||||||
|
def application do
|
||||||
|
[extra_applications: [:logger], env: [pool_size: 10]]
|
||||||
|
end
|
||||||
|
|
||||||
|
# config/config.exs — compile-time overrides
|
||||||
|
config :my_app, pool_size: 20
|
||||||
|
|
||||||
|
# config/runtime.exs — runtime overrides (env vars, secrets)
|
||||||
|
config :my_app, pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10")
|
||||||
|
|
||||||
|
# Reading
|
||||||
|
Application.get_env(:my_app, :pool_size, 10) # With default
|
||||||
|
Application.fetch_env!(:my_app, :pool_size) # Raises if missing
|
||||||
|
Application.compile_env(:my_app, :pool_size) # Compile-time validated
|
||||||
|
```
|
||||||
|
|
||||||
|
**Library warning:** Avoid application environment in libraries — it's global mutable state that creates tight coupling. Pass config explicitly via function arguments or init options.
|
||||||
|
|
||||||
|
### Start Types
|
||||||
|
|
||||||
|
| Type | Behavior |
|
||||||
|
|------|----------|
|
||||||
|
| `:permanent` | Node terminates if this app terminates |
|
||||||
|
| `:transient` | Only abnormal exits affect other apps |
|
||||||
|
| `:temporary` | Standalone — failures don't cascade (default) |
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Registry & Process Discovery
|
## Registry & Process Discovery
|
||||||
|
|
||||||
|
### Unique Keys — One Process Per Key
|
||||||
|
|
||||||
```elixir
|
```elixir
|
||||||
# In supervisor
|
# In supervisor
|
||||||
{Registry, keys: :unique, name: MyApp.Registry}
|
{Registry, keys: :unique, name: MyApp.Registry}
|
||||||
|
|
||||||
# Register a process
|
# Register a process
|
||||||
Registry.register(MyApp.Registry, "session:#{id}", %{})
|
Registry.register(MyApp.Registry, "session:#{id}", %{meta: "data"})
|
||||||
|
|
||||||
# Lookup
|
# Lookup
|
||||||
case Registry.lookup(MyApp.Registry, "session:#{id}") do
|
case Registry.lookup(MyApp.Registry, "session:#{id}") do
|
||||||
[{pid, _value}] -> {:ok, pid}
|
[{pid, value}] -> {:ok, pid}
|
||||||
[] -> {:error, :not_found}
|
[] -> {:error, :not_found}
|
||||||
end
|
end
|
||||||
|
|
||||||
# Use as GenServer name
|
# Use as GenServer name
|
||||||
GenServer.start_link(MyWorker, arg, name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})
|
GenServer.start_link(MyWorker, arg,
|
||||||
|
name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Duplicate Keys — Pub/Sub Pattern
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
{Registry, keys: :duplicate, name: MyApp.PubSub}
|
||||||
|
|
||||||
|
# Subscribe (from each interested process)
|
||||||
|
Registry.register(MyApp.PubSub, "events:orders", [])
|
||||||
|
|
||||||
|
# Broadcast
|
||||||
|
Registry.dispatch(MyApp.PubSub, "events:orders", fn entries ->
|
||||||
|
for {pid, _} <- entries, do: send(pid, {:order_created, order})
|
||||||
|
end)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Performance Tuning
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
{Registry, keys: :unique, name: MyApp.Registry,
|
||||||
|
partitions: System.schedulers_online()}
|
||||||
|
```
|
||||||
|
|
||||||
|
Partition the registry for concurrent throughput under high load.
|
||||||
|
|
||||||
|
### Registry Metadata
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
Registry.put_meta(MyApp.Registry, :config, %{version: 2})
|
||||||
|
{:ok, config} = Registry.meta(MyApp.Registry, :config)
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## OTP Production Checklist
|
||||||
|
|
||||||
|
- Use `:one_for_one` by default; `:rest_for_one` when order = dependency
|
||||||
|
- Set `max_restarts`/`max_seconds` appropriate to your domain
|
||||||
|
- Set `:shutdown` to `:infinity` for nested supervisors, timeout for workers
|
||||||
|
- Use `PartitionSupervisor` if DynamicSupervisor becomes a bottleneck
|
||||||
|
- Use `:transient` restart for temporary workers; `:permanent` for core services
|
||||||
|
- Use `handle_continue` for post-init setup to avoid blocking the supervisor
|
||||||
|
- Use delayed reply pattern in GenServer for long-running operations
|
||||||
|
- Never `await` a Task inside a GenServer — use `async_nolink` + `handle_info`
|
||||||
|
- Always use `Task.Supervisor` over raw `Task.async` in production for visibility
|
||||||
|
- For Registry pub/sub, use `:duplicate` keys and `dispatch/3`
|
||||||
|
- Partition registries with `System.schedulers_online()` under high concurrency
|
||||||
|
- Don't use GenServer for code organization — use plain modules for pure functions
|
||||||
|
- Don't use Agent for complex state — graduate to GenServer
|
||||||
|
|||||||
@ -416,8 +416,11 @@ This generates code that automatically passes `current_user` to context function
|
|||||||
|
|
||||||
## Ecto — Database Layer
|
## Ecto — Database Layer
|
||||||
|
|
||||||
|
Ecto has four core components: **Repo** (database wrapper), **Schema** (data mapping), **Changeset** (validation + change tracking), and **Query** (composable queries). Always preload associations explicitly — Ecto never lazy-loads.
|
||||||
|
|
||||||
|
### Schema
|
||||||
|
|
||||||
```elixir
|
```elixir
|
||||||
# Schema
|
|
||||||
defmodule MyApp.Catalog.Product do
|
defmodule MyApp.Catalog.Product do
|
||||||
use Ecto.Schema
|
use Ecto.Schema
|
||||||
import Ecto.Changeset
|
import Ecto.Changeset
|
||||||
@ -426,25 +429,105 @@ defmodule MyApp.Catalog.Product do
|
|||||||
field :title, :string
|
field :title, :string
|
||||||
field :price, :decimal
|
field :price, :decimal
|
||||||
field :status, Ecto.Enum, values: [:draft, :published, :archived]
|
field :status, Ecto.Enum, values: [:draft, :published, :archived]
|
||||||
|
field :metadata, :map, default: %{}
|
||||||
|
field :tags, {:array, :string}, default: []
|
||||||
|
field :computed, :string, virtual: true # Not persisted
|
||||||
|
field :slug, :string, source: :url_slug # Maps to different DB column
|
||||||
|
|
||||||
has_many :reviews, MyApp.Reviews.Review
|
has_many :reviews, MyApp.Reviews.Review
|
||||||
|
has_one :detail, MyApp.Catalog.ProductDetail
|
||||||
belongs_to :category, MyApp.Catalog.Category
|
belongs_to :category, MyApp.Catalog.Category
|
||||||
|
many_to_many :tags, MyApp.Tag, join_through: "product_tags"
|
||||||
|
|
||||||
|
embeds_one :seo, SEO, on_replace: :update do
|
||||||
|
field :meta_title, :string
|
||||||
|
field :meta_description, :string
|
||||||
|
end
|
||||||
|
|
||||||
|
embeds_many :variants, Variant, on_replace: :delete do
|
||||||
|
field :sku, :string
|
||||||
|
field :price, :decimal
|
||||||
|
end
|
||||||
|
|
||||||
timestamps(type: :utc_datetime)
|
timestamps(type: :utc_datetime)
|
||||||
end
|
end
|
||||||
|
|
||||||
def changeset(product, attrs) do
|
def changeset(product, attrs) do
|
||||||
product
|
product
|
||||||
|> cast(attrs, [:title, :price, :status, :category_id])
|
|> cast(attrs, [:title, :price, :status, :category_id, :metadata, :tags])
|
||||||
|
|> cast_embed(:seo)
|
||||||
|
|> cast_embed(:variants)
|
||||||
|> validate_required([:title, :price])
|
|> validate_required([:title, :price])
|
||||||
|> validate_number(:price, greater_than: 0)
|
|> validate_number(:price, greater_than: 0)
|
||||||
|
|> validate_length(:title, min: 3, max: 255)
|
||||||
|
|> validate_inclusion(:status, [:draft, :published, :archived])
|
||||||
|> unique_constraint(:title)
|
|> unique_constraint(:title)
|
||||||
|> foreign_key_constraint(:category_id)
|
|> foreign_key_constraint(:category_id)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
```
|
||||||
|
|
||||||
# Queries
|
**Field types:** `:string`, `:integer`, `:float`, `:decimal`, `:boolean`, `:binary`, `:map`, `{:array, type}`, `:date`, `:time`, `:naive_datetime`, `:utc_datetime`, `:utc_datetime_usec`, `:binary_id`, `Ecto.UUID`, `Ecto.Enum`
|
||||||
|
|
||||||
|
**Field options:** `:default`, `:source` (DB column name), `:virtual` (not persisted), `:redact` (mask in inspect), `:read_after_writes` (re-read from DB post-write), `:autogenerate`
|
||||||
|
|
||||||
|
**Embed `:on_replace` options:** `:raise` (default), `:mark_as_invalid`, `:update` (embeds_one), `:delete` (embeds_many)
|
||||||
|
|
||||||
|
### Changeset — Validation & Change Tracking
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
# Validations (run in-memory, no DB)
|
||||||
|
|> validate_required([:field])
|
||||||
|
|> validate_format(:email, ~r/@/)
|
||||||
|
|> validate_length(:name, min: 2, max: 100)
|
||||||
|
|> validate_number(:age, greater_than: 0, less_than: 150)
|
||||||
|
|> validate_inclusion(:role, [:admin, :user])
|
||||||
|
|> validate_exclusion(:username, ["admin", "root"])
|
||||||
|
|> validate_acceptance(:terms) # Checkbox must be true
|
||||||
|
|> validate_confirmation(:password) # password_confirmation must match
|
||||||
|
|> validate_subset(:permissions, [:read, :write, :admin])
|
||||||
|
|> validate_change(:email, fn :email, email -> # Custom per-field validation
|
||||||
|
if String.contains?(email, "+"), do: [email: "no plus addressing"], else: []
|
||||||
|
end)
|
||||||
|
|
||||||
|
# Constraints (checked by DB — only run if validations pass)
|
||||||
|
|> unique_constraint(:email)
|
||||||
|
|> unique_constraint([:user_id, :project_id], name: :user_projects_unique_index)
|
||||||
|
|> foreign_key_constraint(:category_id)
|
||||||
|
|> check_constraint(:price, name: :price_must_be_positive)
|
||||||
|
|> no_assoc_constraint(:reviews) # Prevent delete if has associations
|
||||||
|
|> exclusion_constraint(:date_range) # Postgres range exclusion
|
||||||
|
```
|
||||||
|
|
||||||
|
**Schemaless changesets** — validate arbitrary data without a schema:
|
||||||
|
```elixir
|
||||||
|
types = %{email: :string, age: :integer}
|
||||||
|
changeset = {%{}, types}
|
||||||
|
|> Ecto.Changeset.cast(params, Map.keys(types))
|
||||||
|
|> Ecto.Changeset.validate_required([:email])
|
||||||
|
|
||||||
|
case Ecto.Changeset.apply_action(changeset, :validate) do
|
||||||
|
{:ok, data} -> use_data(data)
|
||||||
|
{:error, changeset} -> show_errors(changeset)
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
|
**Error traversal:**
|
||||||
|
```elixir
|
||||||
|
Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
|
||||||
|
Regex.replace(~r"%{(\w+)}", msg, fn _, key ->
|
||||||
|
opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string()
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
# => %{title: ["can't be blank"], price: ["must be greater than 0"]}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Query — Composable & Dynamic
|
||||||
|
|
||||||
|
```elixir
|
||||||
import Ecto.Query
|
import Ecto.Query
|
||||||
|
|
||||||
# Composable queries
|
# Composable queries — chain them together
|
||||||
def published(query \\ Product) do
|
def published(query \\ Product) do
|
||||||
from p in query, where: p.status == :published
|
from p in query, where: p.status == :published
|
||||||
end
|
end
|
||||||
@ -460,6 +543,211 @@ end
|
|||||||
# Usage: Product |> published() |> recent(30) |> with_reviews() |> Repo.all()
|
# Usage: Product |> published() |> recent(30) |> with_reviews() |> Repo.all()
|
||||||
```
|
```
|
||||||
|
|
||||||
|
**Named bindings** — track joins without positional counting:
|
||||||
|
```elixir
|
||||||
|
from p in Post, as: :post,
|
||||||
|
join: c in assoc(p, :comments), as: :comment,
|
||||||
|
where: as(:comment).approved == true,
|
||||||
|
select: {as(:post).title, count(as(:comment).id)},
|
||||||
|
group_by: as(:post).id
|
||||||
|
```
|
||||||
|
|
||||||
|
**Dynamic queries** — build conditions programmatically:
|
||||||
|
```elixir
|
||||||
|
def filter(params) do
|
||||||
|
query = from(p in Product)
|
||||||
|
|
||||||
|
conditions = true
|
||||||
|
|
||||||
|
conditions = if params["status"],
|
||||||
|
do: dynamic([p], p.status == ^params["status"] and ^conditions),
|
||||||
|
else: conditions
|
||||||
|
|
||||||
|
conditions = if params["min_price"],
|
||||||
|
do: dynamic([p], p.price >= ^params["min_price"] and ^conditions),
|
||||||
|
else: conditions
|
||||||
|
|
||||||
|
from p in query, where: ^conditions
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
|
**Subqueries:**
|
||||||
|
```elixir
|
||||||
|
top_products = from p in Product, order_by: [desc: :sales], limit: 10
|
||||||
|
from p in subquery(top_products), select: avg(p.price)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Window functions:**
|
||||||
|
```elixir
|
||||||
|
from e in Employee,
|
||||||
|
select: {e.name, e.salary, over(avg(e.salary), :dept)},
|
||||||
|
windows: [dept: [partition_by: e.department_id, order_by: e.salary]]
|
||||||
|
```
|
||||||
|
|
||||||
|
**Fragments (raw SQL):**
|
||||||
|
```elixir
|
||||||
|
from p in Post,
|
||||||
|
where: fragment("? @> ?", p.tags, ^["elixir"]), # Postgres array contains
|
||||||
|
order_by: fragment("random()")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Preloading strategies:**
|
||||||
|
```elixir
|
||||||
|
# Separate queries (default) — parallelized, no data duplication
|
||||||
|
Repo.all(Post) |> Repo.preload([:comments, :author])
|
||||||
|
|
||||||
|
# Join preload — single query, watch for Cartesian product
|
||||||
|
from p in Post, join: c in assoc(p, :comments), preload: [comments: c]
|
||||||
|
|
||||||
|
# Custom query preload
|
||||||
|
Repo.preload(posts, comments: from(c in Comment, order_by: c.inserted_at))
|
||||||
|
|
||||||
|
# Nested preload
|
||||||
|
Repo.preload(posts, [comments: [:author, :replies]])
|
||||||
|
```
|
||||||
|
|
||||||
|
**Important:** Use `is_nil(field)` not `field == nil` in queries. Only one `select` per query — use `select_merge` to compose.
|
||||||
|
|
||||||
|
### Repo — Database Operations
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
# CRUD
|
||||||
|
Repo.insert(changeset) # {:ok, struct} | {:error, changeset}
|
||||||
|
Repo.update(changeset)
|
||||||
|
Repo.delete(struct)
|
||||||
|
Repo.insert!(changeset) # Returns struct or raises
|
||||||
|
Repo.get(Post, 1) # nil if not found
|
||||||
|
Repo.get!(Post, 1) # Raises if not found
|
||||||
|
Repo.get_by(Post, title: "Hello") # By arbitrary fields
|
||||||
|
Repo.one(query) # Raises if > 1 result
|
||||||
|
|
||||||
|
# Bulk operations — return {count, nil | results}
|
||||||
|
Repo.insert_all(Post, [%{title: "A"}, %{title: "B"}])
|
||||||
|
Repo.update_all(from(p in Post, where: p.old == true), set: [archived: true])
|
||||||
|
Repo.delete_all(from(p in Post, where: p.inserted_at < ago(1, "year")))
|
||||||
|
# NOTE: update_all does NOT update auto-generated fields like updated_at
|
||||||
|
|
||||||
|
# Upserts (on_conflict)
|
||||||
|
Repo.insert(changeset,
|
||||||
|
on_conflict: :nothing, # Ignore conflict
|
||||||
|
conflict_target: [:email]
|
||||||
|
)
|
||||||
|
Repo.insert(changeset,
|
||||||
|
on_conflict: {:replace, [:name, :updated_at]}, # Update specific fields
|
||||||
|
conflict_target: [:email]
|
||||||
|
)
|
||||||
|
Repo.insert(changeset,
|
||||||
|
on_conflict: :replace_all, # Replace everything
|
||||||
|
conflict_target: :id
|
||||||
|
)
|
||||||
|
|
||||||
|
# Aggregation
|
||||||
|
Repo.aggregate(Post, :count) # SELECT count(*)
|
||||||
|
Repo.aggregate(Post, :sum, :views) # SELECT sum(views)
|
||||||
|
Repo.aggregate(query, :avg, :price) # Works with queries too
|
||||||
|
```
|
||||||
|
|
||||||
|
### Ecto.Multi — Atomic Transactions
|
||||||
|
|
||||||
|
Group operations that must all succeed or all fail:
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
alias Ecto.Multi
|
||||||
|
|
||||||
|
Multi.new()
|
||||||
|
|> Multi.insert(:post, Post.changeset(%Post{}, post_attrs))
|
||||||
|
|> Multi.insert(:comment, fn %{post: post} ->
|
||||||
|
Comment.changeset(%Comment{}, %{post_id: post.id, body: "First!"})
|
||||||
|
end)
|
||||||
|
|> Multi.update_all(:increment, from(u in User, where: u.id == ^user_id),
|
||||||
|
inc: [post_count: 1])
|
||||||
|
|> Multi.run(:notify, fn _repo, %{post: post} ->
|
||||||
|
# Arbitrary logic — return {:ok, _} or {:error, _}
|
||||||
|
Notifications.send_new_post(post)
|
||||||
|
end)
|
||||||
|
|> Repo.transaction()
|
||||||
|
# => {:ok, %{post: %Post{}, comment: %Comment{}, increment: {1, nil}, notify: :sent}}
|
||||||
|
# => {:error, failed_op, failed_value, changes_so_far}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Key patterns:** Each operation name must be unique. Failed operations roll back everything. Use `Multi.run/3` for arbitrary logic. Test with `Multi.to_list/1` to inspect without executing.
|
||||||
|
|
||||||
|
### Streaming — Large Result Sets
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
Repo.transaction(fn ->
|
||||||
|
Post
|
||||||
|
|> where([p], p.status == :published)
|
||||||
|
|> Repo.stream(max_rows: 500)
|
||||||
|
|> Stream.each(&process_post/1)
|
||||||
|
|> Stream.run()
|
||||||
|
end)
|
||||||
|
```
|
||||||
|
|
||||||
|
Must execute within a transaction. Default batch size is 500 rows.
|
||||||
|
|
||||||
|
### Migrations
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
defmodule MyApp.Repo.Migrations.CreateProducts do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def change do
|
||||||
|
create table("products") do
|
||||||
|
add :title, :string, null: false, size: 255
|
||||||
|
add :price, :decimal, precision: 10, scale: 2
|
||||||
|
add :status, :string, default: "draft"
|
||||||
|
add :metadata, :map, default: %{}
|
||||||
|
add :tags, {:array, :string}, default: []
|
||||||
|
add :category_id, references("categories", on_delete: :restrict)
|
||||||
|
timestamps(type: :utc_datetime)
|
||||||
|
end
|
||||||
|
|
||||||
|
create index("products", [:category_id])
|
||||||
|
create unique_index("products", [:title])
|
||||||
|
create index("products", [:status, :inserted_at])
|
||||||
|
create index("products", [:tags], using: :gin) # Postgres array index
|
||||||
|
create index("products", [:title],
|
||||||
|
where: "status = 'published'", name: :published_title_idx) # Partial index
|
||||||
|
end
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
|
**Alter tables:**
|
||||||
|
```elixir
|
||||||
|
def change do
|
||||||
|
alter table("products") do
|
||||||
|
add :slug, :string
|
||||||
|
modify :title, :text, from: :string # :from required for reversibility
|
||||||
|
remove :deprecated_field, :string # type required for reversibility
|
||||||
|
end
|
||||||
|
|
||||||
|
# Raw SQL when needed
|
||||||
|
execute "CREATE EXTENSION IF NOT EXISTS \"pg_trgm\"",
|
||||||
|
"DROP EXTENSION IF EXISTS \"pg_trgm\""
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
|
**Run migrations:** `mix ecto.migrate` / `mix ecto.rollback` / `mix ecto.reset`
|
||||||
|
|
||||||
|
### Ecto.Enum
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
# String storage (default)
|
||||||
|
field :status, Ecto.Enum, values: [:draft, :published, :archived]
|
||||||
|
|
||||||
|
# Integer storage
|
||||||
|
field :priority, Ecto.Enum, values: [low: 1, medium: 2, high: 3]
|
||||||
|
|
||||||
|
# Array of enums
|
||||||
|
field :roles, {:array, Ecto.Enum}, values: [:admin, :editor, :viewer]
|
||||||
|
|
||||||
|
# Query helpers
|
||||||
|
Ecto.Enum.values(Product, :status) # [:draft, :published, :archived]
|
||||||
|
Ecto.Enum.dump_values(Product, :status) # ["draft", "published", "archived"]
|
||||||
|
Ecto.Enum.mappings(Product, :status) # [draft: "draft", ...] — for form dropdowns
|
||||||
|
```
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Telemetry — Built-in Observability
|
## Telemetry — Built-in Observability
|
||||||
|
|||||||
@ -338,33 +338,158 @@ From "Engineering Elixir Applications":
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## CI/CD Pipeline (GitHub Actions)
|
## CI/CD — Cortex-Native Pipeline
|
||||||
|
|
||||||
```yaml
|
Instead of GitHub Actions, we run CI/CD directly on cortex using shell scripts, systemd, and git hooks. This keeps the entire workflow within our ecosystem.
|
||||||
name: Elixir CI
|
|
||||||
on: [push, pull_request]
|
|
||||||
|
|
||||||
jobs:
|
### Git Push Hook — Trigger on Push
|
||||||
test:
|
|
||||||
runs-on: ubuntu-latest
|
On cortex, set up a bare repo with a `post-receive` hook:
|
||||||
services:
|
|
||||||
db:
|
```bash
|
||||||
image: postgres:16
|
# On cortex: create bare repo
|
||||||
env:
|
mkdir -p /data/repos/my_app.git && cd /data/repos/my_app.git
|
||||||
POSTGRES_PASSWORD: postgres
|
git init --bare
|
||||||
ports: ['5432:5432']
|
|
||||||
steps:
|
# post-receive hook
|
||||||
- uses: actions/checkout@v4
|
cat > hooks/post-receive << 'HOOK'
|
||||||
- uses: erlef/setup-beam@v1
|
#!/bin/bash
|
||||||
with:
|
set -euo pipefail
|
||||||
elixir-version: '1.19.5'
|
|
||||||
otp-version: '27.3'
|
WORK_DIR="/data/builds/my_app"
|
||||||
- run: mix deps.get
|
LOG="/var/log/ci/my_app-$(date +%Y%m%d-%H%M%S).log"
|
||||||
- run: mix compile --warnings-as-errors
|
mkdir -p /var/log/ci "$WORK_DIR"
|
||||||
- run: mix format --check-formatted
|
|
||||||
- run: mix credo --strict
|
echo "=== CI triggered at $(date) ===" | tee "$LOG"
|
||||||
- run: mix test
|
|
||||||
- run: mix deps.unlock --check-unused
|
# Checkout latest
|
||||||
|
GIT_WORK_TREE="$WORK_DIR" git checkout -f main 2>&1 | tee -a "$LOG"
|
||||||
|
|
||||||
|
# Run CI pipeline
|
||||||
|
cd "$WORK_DIR"
|
||||||
|
exec /data/scripts/ci-pipeline.sh "$WORK_DIR" 2>&1 | tee -a "$LOG"
|
||||||
|
HOOK
|
||||||
|
chmod +x hooks/post-receive
|
||||||
|
```
|
||||||
|
|
||||||
|
### CI Pipeline Script
|
||||||
|
|
||||||
|
```bash
|
||||||
|
#!/bin/bash
|
||||||
|
# /data/scripts/ci-pipeline.sh
|
||||||
|
set -euo pipefail
|
||||||
|
APP_DIR="$1"
|
||||||
|
cd "$APP_DIR"
|
||||||
|
|
||||||
|
echo "--- Dependencies ---"
|
||||||
|
mix deps.get
|
||||||
|
|
||||||
|
echo "--- Compile (warnings as errors) ---"
|
||||||
|
mix compile --warnings-as-errors
|
||||||
|
|
||||||
|
echo "--- Format check ---"
|
||||||
|
mix format --check-formatted
|
||||||
|
|
||||||
|
echo "--- Static analysis ---"
|
||||||
|
mix credo --strict
|
||||||
|
|
||||||
|
echo "--- Tests ---"
|
||||||
|
MIX_ENV=test mix test
|
||||||
|
|
||||||
|
echo "--- Unused deps check ---"
|
||||||
|
mix deps.unlock --check-unused
|
||||||
|
|
||||||
|
echo "--- Build release ---"
|
||||||
|
MIX_ENV=prod mix release --overwrite
|
||||||
|
|
||||||
|
echo "=== CI PASSED ==="
|
||||||
|
|
||||||
|
# Optional: auto-deploy on success
|
||||||
|
# /data/scripts/deploy.sh "$APP_DIR"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Deploy Script
|
||||||
|
|
||||||
|
```bash
|
||||||
|
#!/bin/bash
|
||||||
|
# /data/scripts/deploy.sh
|
||||||
|
set -euo pipefail
|
||||||
|
APP_DIR="$1"
|
||||||
|
APP_NAME=$(basename "$APP_DIR")
|
||||||
|
RELEASE_DIR="/data/releases/$APP_NAME"
|
||||||
|
|
||||||
|
echo "--- Deploying $APP_NAME ---"
|
||||||
|
|
||||||
|
# Stop current
|
||||||
|
systemctl stop "$APP_NAME" 2>/dev/null || true
|
||||||
|
|
||||||
|
# Copy release
|
||||||
|
mkdir -p "$RELEASE_DIR"
|
||||||
|
cp -r "$APP_DIR/_build/prod/rel/$APP_NAME/"* "$RELEASE_DIR/"
|
||||||
|
|
||||||
|
# Run migrations
|
||||||
|
"$RELEASE_DIR/bin/migrate"
|
||||||
|
|
||||||
|
# Start
|
||||||
|
systemctl start "$APP_NAME"
|
||||||
|
|
||||||
|
# Health check with rollback
|
||||||
|
sleep 5
|
||||||
|
if ! curl -sf http://localhost:4000/health > /dev/null; then
|
||||||
|
echo "!!! Health check failed — rolling back"
|
||||||
|
systemctl stop "$APP_NAME"
|
||||||
|
# Restore previous release from backup
|
||||||
|
cp -r "$RELEASE_DIR.prev/"* "$RELEASE_DIR/"
|
||||||
|
systemctl start "$APP_NAME"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "=== Deploy SUCCESS ==="
|
||||||
|
```
|
||||||
|
|
||||||
|
### Systemd Service Template
|
||||||
|
|
||||||
|
```ini
|
||||||
|
# /etc/systemd/system/my_app.service
|
||||||
|
[Unit]
|
||||||
|
Description=MyApp Elixir Service
|
||||||
|
After=network.target postgresql.service
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=exec
|
||||||
|
User=deploy
|
||||||
|
Environment=MIX_ENV=prod
|
||||||
|
Environment=PORT=4000
|
||||||
|
Environment=PHX_HOST=myapp.hydrascale.net
|
||||||
|
EnvironmentFile=/data/releases/my_app/.env
|
||||||
|
ExecStart=/data/releases/my_app/bin/server
|
||||||
|
ExecStop=/data/releases/my_app/bin/my_app stop
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=5
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
|
```
|
||||||
|
|
||||||
|
### Push from Dev Machine
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Add cortex as a git remote
|
||||||
|
git remote add cortex root@cortex.hydrascale.net:/data/repos/my_app.git
|
||||||
|
|
||||||
|
# Push triggers CI → optional auto-deploy
|
||||||
|
git push cortex main
|
||||||
|
```
|
||||||
|
|
||||||
|
### Symbiont Integration
|
||||||
|
|
||||||
|
For orchestration via Symbiont, the CI can report status:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# At end of ci-pipeline.sh
|
||||||
|
curl -s -X POST http://localhost:8080/api/tasks/ci-report \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d "{\"app\": \"$APP_NAME\", \"status\": \"passed\", \"commit\": \"$(git rev-parse HEAD)\"}"
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user