Compare commits

...

10 Commits

| Author | SHA1 | Message | Date |
|--------|------|---------|------|
| Symbiont | 4b41794dd4 | Fix elixir guide: port 8112 -> 8111 after Python retirement | 2026-03-20 20:20:49 +00:00 |
| Symbiont | eac71dd3c1 | Update skills for Elixir Symbiont migration | 2026-03-20 20:20:13 +00:00 |
| Symbiont | aa4f55e8f1 | Deep dive: comprehensive OTP, Ecto, cortex-native CI/CD | 2026-03-20 17:08:36 +00:00 |
| Symbiont | 8fc59eee23 | Split Elixir guide into 4 focused parts for modular loading | 2026-03-20 16:38:41 +00:00 |
| Symbiont | 72fa304b14 | Update Elixir skill: add comprehensive Phoenix 1.8.5 section | 2026-03-20 16:25:50 +00:00 |
| Symbiont | 956bfef883 | Add comprehensive Elixir development guide skill | 2026-03-20 15:15:26 +00:00 |
| Symbiont | 6d8d16a889 | symbiont skill: update session registry to Engram naming (Co-Authored-By: Claude Opus 4.6) | 2026-03-20 13:48:14 +00:00 |
| Symbiont | 4128b69f2a | symbiont skill: document Dendrite integration (Co-Authored-By: Claude Opus 4.6) | 2026-03-19 20:13:55 +00:00 |
| Symbiont | e8d7611e2b | Add dendrite skill to canonical repo. Headless Chromium browser service for web browsing, scraping, and automation. Part of the Muse ecosystem: Symbiont orchestrates, Dendrite perceives. (Co-Authored-By: Claude Opus 4.6) | 2026-03-19 20:12:41 +00:00 |
| Symbiont | 5fef25543e | symbiont skill: document skills infrastructure (Co-Authored-By: Claude Opus 4.6) | 2026-03-19 20:05:16 +00:00 |
8 changed files with 4776 additions and 369 deletions


@@ -299,17 +299,20 @@ df -h / && free -h
 ---
-## Symbiont Orchestrator
+## Symbiont Orchestrator (Elixir/OTP)
-The `/data/symbiont` directory contains the **Symbiont** project — a self-sustaining AI agent orchestrator running on cortex.
+The `/data/symbiont_ex/` directory contains the **Symbiont** project — a self-sustaining AI agent orchestrator built in **Elixir/OTP**, running on the BEAM VM.
-- **Git repo**: `/data/symbiont/.git` (clone location)
-- **Systemd services**:
-  - `symbiont-api.service` — Main API daemon
-  - `symbiont-heartbeat.timer` — Periodic health-check timer
+- **Runtime**: Elixir 1.19.5 / OTP 27
+- **Project root**: `/data/symbiont_ex/`
+- **Data**: `/data/symbiont_ex/data/` (ledger.jsonl, queue.jsonl)
+- **Systemd service**: `symbiont-ex-api.service` — Plug + Bandit HTTP on port 8111
+- **Python archive**: `/data/symbiont/` (retired, disabled — kept for reference)
 Check status and logs:
 ```bash
-systemctl status symbiont-api.service symbiont-heartbeat.timer --no-pager
-journalctl -u symbiont-api.service -f --no-pager
+systemctl status symbiont-ex-api.service --no-pager
+journalctl -u symbiont-ex-api -f --no-pager
+curl -s http://127.0.0.1:8111/health
+curl -s http://127.0.0.1:8111/status | python3 -m json.tool
 ```

dendrite/SKILL.md (new file)

@@ -0,0 +1,393 @@
---
name: dendrite
description: >
Headless Chromium browser on cortex.hydrascale.net — full JS execution, sessions, screenshots,
Readability content extraction. Use for ANY web browsing: fetching pages, research, scraping,
screenshots, login flows, form filling, SPA rendering. Prefer over Claude in Chrome (faster,
runs on cortex). Trigger on: URLs in messages, "browse", "fetch", "scrape", "screenshot",
"read this page", "check this link", "log in to", "go to", web research, or any task needing
web access. Also use when WebFetch fails on JS-heavy pages.
---
# Dendrite: Sensory Extension for Muse
## What It Is
Dendrite is the nervous system's sensory arm — it reaches out into the web, perceives content
through full Chromium rendering, and carries structured information back to the system. Named
for the branching neural extensions that receive signals from the outside world.
It runs as a Docker container on `cortex.hydrascale.net`, exposes a REST API behind Caddy
with auto-HTTPS, and includes an MCP server for native Claude integration. Full JavaScript
execution, persistent sessions, Mozilla Readability content extraction, ad blocking, and
minimal stealth patches.
**Part of the Muse ecosystem:** Symbiont orchestrates, Dendrite perceives.
---
## Quick Reference
| Item | Value |
|------|-------|
| Base URL | `https://browser.hydrascale.net` |
| Internal URL | `http://localhost:3000` (from cortex via SSH) |
| API Key | `8dc5e8f7a02745ee8db90c94b2481fd9e1deeea1e2ce74420f54047859ea7edf` |
| Auth header | `X-API-Key: <key>` (required on all endpoints except `/health`) |
| Health check | `GET /health` (no auth) |
| Source | `/opt/muse-browser/` on cortex |
| Git repo | `/data/repos/muse-browser.git` (bare, auto-backed up to rsync.net) |
| Docker container | `muse-browser` (restart: unless-stopped) |
| Caddy domain | `browser.hydrascale.net` (auto-HTTPS) |
---
## Python Helper (paste into every session that needs web access)
```python
import urllib.request, json, urllib.error

DENDRITE_URL = 'https://browser.hydrascale.net'
DENDRITE_KEY = '8dc5e8f7a02745ee8db90c94b2481fd9e1deeea1e2ce74420f54047859ea7edf'

def dendrite(path, body=None, method=None):
    """Call the Dendrite API. Returns parsed JSON."""
    url = f'{DENDRITE_URL}{path}'
    if body is not None:
        data = json.dumps(body).encode()
        req = urllib.request.Request(url, data=data, method=method or 'POST')
        req.add_header('Content-Type', 'application/json')
    else:
        req = urllib.request.Request(url, method=method or 'GET')
    req.add_header('X-API-Key', DENDRITE_KEY)
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        err = json.loads(e.read())
        raise RuntimeError(f"Dendrite error {e.code}: {err.get('error', e.reason)}")

def dendrite_screenshot(body):
    """Take a screenshot. Returns raw PNG bytes."""
    url = f'{DENDRITE_URL}/screenshot'
    data = json.dumps(body).encode()
    req = urllib.request.Request(url, data=data, method='POST')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', DENDRITE_KEY)
    with urllib.request.urlopen(req, timeout=60) as resp:
        return resp.read()
```
---
## Endpoints
### `POST /fetch` — The workhorse
Fetch a URL and return its content as markdown, HTML, or text. Runs full Chromium with
JavaScript execution. Readability extracts the main article content by default.
```python
result = dendrite('/fetch', {
    'url': 'https://example.com/article',
    'format': 'markdown',           # 'markdown' | 'html' | 'text' (default: markdown)
    'extractMain': True,            # Readability strips nav/ads (default: True)
    'waitFor': 'domcontentloaded',  # 'networkidle' for SPAs (default: domcontentloaded)
    'blockAds': True,               # Block trackers (default: True)
    'timeout': 30000,               # ms (default: 30000)
})
# Returns: { url, title, content, format }
print(result['title'])
print(result['content'])
```
**When to use `waitFor: 'networkidle'`:** React/Vue/Angular SPAs, dashboards, or pages where
content loads after the initial HTML. Slower (~5-10s) but catches dynamically rendered content.
**When to use `extractMain: false`:** You need the full HTML (link scraping, structured data,
or when Readability strips too much on non-article pages like listings or search results).
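For the link-scraping case, the returned full HTML can be parsed with the standard library alone. A sketch (the `extract_links` helper is hypothetical; the commented call at the end assumes the `dendrite` helper defined above):

```python
from html.parser import HTMLParser
from urllib.parse import urljoin

class LinkExtractor(HTMLParser):
    """Collect href values from <a> tags, resolved against a base URL."""

    def __init__(self, base_url):
        super().__init__()
        self.base_url = base_url
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            for name, value in attrs:
                if name == 'href' and value:
                    self.links.append(urljoin(self.base_url, value))

def extract_links(html, base_url):
    """Return all <a href> targets in `html` as absolute URLs, in document order."""
    parser = LinkExtractor(base_url)
    parser.feed(html)
    return parser.links

# Hypothetical usage with the dendrite() helper from above:
# page = dendrite('/fetch', {'url': 'https://example.com',
#                            'extractMain': False, 'format': 'html'})
# links = extract_links(page['content'], page['url'])
```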
### `POST /screenshot`
```python
png_bytes = dendrite_screenshot({
    'url': 'https://example.com',
    'fullPage': True,          # default: True
    'format': 'png',           # 'png' | 'jpeg'
    'waitFor': 'networkidle',  # default: networkidle (screenshots need rendering)
    'selector': '.chart',      # optional: screenshot just this element
})
with open('screenshot.png', 'wb') as f:
    f.write(png_bytes)
```
### `POST /execute`
Run JavaScript in a page context. Scripts are wrapped in an IIFE — use `return` for values.
```python
result = dendrite('/execute', {
    'url': 'https://example.com',
    'script': 'return document.querySelectorAll("a").length',
})
print(result['result'])  # e.g., 42
```
### `POST /interact`
Interact with elements in a session. See Sessions section below.
```python
dendrite('/interact', {
    'sessionId': sid,
    'action': 'click',  # 'click' | 'type' | 'select' | 'wait' | 'scroll'
    'selector': '#submit',
    'timeout': 5000,
})
# Returns: { ok, title, url }
```
### `POST /session` / `DELETE /session/:id` / `GET /sessions`
Session lifecycle management. See Sessions section.
### `GET /health`
No auth required. Returns: `{ status, sessions, activePages, uptime, timestamp }`
---
## Sessions (multi-step interactions)
Sessions maintain cookies, localStorage, and auth state across requests. Use them for
login flows, multi-page navigation, form filling, and any workflow requiring state.
**Sessions auto-expire after 30 minutes of inactivity. Always close when done.**
### Full session workflow example
```python
# 1. Create
session = dendrite('/session', {
    'locale': 'en-US',
    'timezone': 'America/Chicago',
    'blockAds': True,
})
sid = session['id']

# 2. Navigate to login
page = dendrite('/fetch', {'sessionId': sid, 'url': 'https://app.example.com/login'})

# 3. Type credentials
dendrite('/interact', {
    'sessionId': sid, 'action': 'type',
    'selector': '#email', 'value': 'user@example.com',
})
dendrite('/interact', {
    'sessionId': sid, 'action': 'type',
    'selector': '#password', 'value': 'secret', 'submit': True,
})

# 4. Check where we landed
page = dendrite('/fetch', {'sessionId': sid})  # no url = get current page
print(page['title'], page['url'])

# 5. Click around
dendrite('/interact', {'sessionId': sid, 'action': 'click', 'selector': 'nav a.dashboard'})
page = dendrite('/fetch', {'sessionId': sid})

# 6. Always close
dendrite(f'/session/{sid}', method='DELETE')
```
### Interact actions reference
| Action | Required | Optional | Description |
|--------|----------|----------|-------------|
| `click` | `selector` | `timeout` | Click element, wait for domcontentloaded |
| `type` | `selector`, `value` | `submit`, `timeout` | Fill input. `submit: true` presses Enter |
| `select` | `selector`, `value` | `timeout` | Select dropdown option by value |
| `wait` | `selector` | `timeout` | Wait for element to appear in DOM |
| `scroll` | — | `selector`, `timeout` | Scroll element into view, or page bottom if no selector |
---
## Decision Guide
| I need to... | Use |
|---|---|
| Read an article / docs page | `POST /fetch` (default settings) |
| Fetch a React/Vue SPA | `POST /fetch` with `waitFor: 'networkidle'` |
| Scrape links or structured data | `POST /fetch` with `extractMain: false, format: 'html'` then parse |
| Visually verify a page | `POST /screenshot` |
| Extract data via JS | `POST /execute` |
| Log in, fill forms, multi-step | Create session → interact → close |
| Quick check what's at a URL | `POST /fetch` — one line |
### Dendrite vs WebFetch vs Claude in Chrome
| Feature | Dendrite | WebFetch | Claude in Chrome |
|---------|----------|----------|-----------------|
| JavaScript execution | Full Chromium | None | Full Chrome |
| Speed | Fast (server-side) | Fastest (no browser) | Slow (screen recording) |
| SPAs (React, etc.) | Works | Fails | Works |
| Sessions/auth flows | Yes | No | Yes (manual) |
| Screenshots | Yes (API) | No | Yes (visual) |
| Runs on | cortex (16GB) | Cowork VM | Michael's MacBook |
| Best for | Research, scraping, automation | Simple static pages | Visual tasks, debugging |
**Rule of thumb:** Try Dendrite first. Fall back to WebFetch for dead-simple pages where
you don't need JS. Use Claude in Chrome only when you truly need to see and interact with
the visual layout (drag-and-drop, complex visual UIs).
---
## Error Handling
```python
try:
    result = dendrite('/fetch', {'url': 'https://example.com'})
except RuntimeError as e:
    print(f"Error: {e}")

# Common errors:
# 401 — Bad API key
# 404 — Session not found (expired after 30min idle)
# 429 — Too many concurrent pages (max 10), retry shortly
# 500 — Navigation timeout, page error, or unreachable site
```
**If a page times out:** Try with `waitFor: 'domcontentloaded'` (faster, may miss lazy content)
or increase `timeout` beyond the default 30s.
**If content is empty/short:** The page may be JavaScript-rendered. Use `waitFor: 'networkidle'`.
If Readability returns too little, try `extractMain: false` and extract what you need manually.
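Since a 429 only means the concurrent-page limit (10) is briefly exhausted, a small retry wrapper is usually enough. This is a hypothetical helper built on the `dendrite` function above, not part of the API:

```python
import time

def dendrite_retry(call, attempts=3, delay=2.0):
    """Retry a zero-argument Dendrite call on 429 with linear backoff.

    Example: dendrite_retry(lambda: dendrite('/fetch', {'url': url}))
    """
    last_error = None
    for attempt in range(attempts):
        try:
            return call()
        except RuntimeError as e:
            if '429' not in str(e):
                raise  # only the concurrency limit is worth retrying
            last_error = e
            time.sleep(delay * (attempt + 1))
    raise last_error
```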
---
## Architecture
```
Internet                      cortex.hydrascale.net
    │                                  │
    ▼                                  ▼
 [Caddy] ──HTTPS──▶ [Docker: muse-browser]
  :443                    :3000
                     ┌───────────┐
                     │  Fastify  │ ← REST API
                     │  server   │
                     └─────┬─────┘
                     ┌─────▼─────┐
                     │ Playwright│ ← Single Chromium instance
                     │  + pool   │   Multiple BrowserContexts (sessions)
                     └─────┬─────┘
                     ┌─────▼─────┐
                     │Readability│ ← Content extraction
                     │+ Turndown │   HTML → Markdown
                     └───────────┘
```
### Stack
- **Runtime:** Node.js 20 on Debian Bookworm (Docker)
- **Browser:** Playwright + Chromium (headless, with stealth patches)
- **HTTP server:** Fastify v4 with CORS + API key auth
- **Content extraction:** Mozilla Readability + Turndown
- **MCP:** stdio transport (for Claude Desktop integration)
- **Reverse proxy:** Caddy (auto-HTTPS, gzip)
### Key files on cortex
| Path | Purpose |
|------|---------|
| `/opt/muse-browser/` | Working directory (Docker build source) |
| `/opt/muse-browser/src/server.js` | Fastify entry point |
| `/opt/muse-browser/src/browser.js` | Chromium pool + sessions |
| `/opt/muse-browser/src/extract.js` | Readability + Turndown |
| `/opt/muse-browser/src/routes.js` | REST endpoints |
| `/opt/muse-browser/src/mcp-stdio.js` | MCP server (stdio) |
| `/opt/muse-browser/.env` | Secrets (API key, config) |
| `/data/repos/muse-browser.git` | Bare git repo (backed up nightly) |
---
## Maintenance & Operations
### Health check
```python
health = dendrite('/health', method='GET')
print(health) # { status: "ok", sessions, activePages, uptime, timestamp }
```
### From cortex SSH
```bash
# Container status
docker ps | grep muse-browser
docker logs muse-browser --tail=50
# Restart
docker compose -f /opt/muse-browser/docker-compose.yml restart
# Full rebuild after code changes
cd /opt/muse-browser && docker compose down && docker compose build --no-cache && docker compose up -d
```
### Git deploy (from Michael's Mac)
```bash
# First time
git clone root@cortex.hydrascale.net:/data/repos/muse-browser.git
cd muse-browser
# After making changes
git push origin main
# → post-receive hook auto-rebuilds container and restarts
```
### Caddy logs (if HTTPS issues)
```bash
journalctl -u caddy --no-pager -n 30
```
---
## MCP Configuration (Claude Desktop)
To use Dendrite tools natively in Claude Desktop, add to MCP config:
```json
{
  "mcpServers": {
    "dendrite": {
      "command": "ssh",
      "args": [
        "-o", "StrictHostKeyChecking=no",
        "-i", "~/.ssh/cortex",
        "root@cortex.hydrascale.net",
        "docker exec -i muse-browser node src/mcp-stdio.js"
      ]
    }
  }
}
```
### MCP tools available
| Tool | Description |
|------|-------------|
| `fetch_page` | Fetch URL → markdown/html/text |
| `take_screenshot` | Screenshot URL or session → PNG |
| `run_javascript` | Execute JS in page context |
| `create_session` | Open persistent browser session |
| `close_session` | Destroy session |
| `navigate` | Session: go to URL, return content |
| `click` | Session: click element by selector |
| `type_text` | Session: type into input field |
| `get_page_content` | Session: get current page content |
| `get_page_screenshot` | Session: screenshot current page |
---
## Relationship to Other Muse Components
- **Symbiont** (orchestrator) can dispatch tasks that require web research → Dendrite fetches the content
- **Cortex** (infrastructure) hosts and runs Dendrite as a Docker service
- **Future components** can call Dendrite's REST API to perceive the web without their own browser

elixir/SKILL.md (new file, 1666 additions)

File diff suppressed because it is too large

elixir/elixir-part1-core.md (new file)

@@ -0,0 +1,208 @@
# Elixir Part 1: Core Language & Modern Idioms
## Elixir 1.19 — What's New
### Gradual Set-Theoretic Type System
Elixir 1.19 significantly advances the built-in type system. It is **sound**, **gradual**, and **set-theoretic** (types compose via union, intersection, negation).
**Current capabilities (1.19):**
- Type inference from existing code — no annotations required yet
- **Protocol dispatch type checking** — warns on invalid protocol usage (e.g., string interpolation on structs without `String.Chars`)
- **Anonymous function type inference** — `fn` literals and `&` captures propagate types
- Types: `atom()`, `binary()`, `integer()`, `float()`, `pid()`, `port()`, `reference()`, `tuple()`, `list()`, `map()`, `function()`
- Compose: `atom() or integer()` (union), `atom() and not nil` (difference)
- `dynamic()` type for gradual typing — runtime-checked values
- Tuple precision: `{:ok, binary()}`, open tuples: `{:ok, binary(), ...}`
- Map types: `%{key: value}` closed, `%{optional(atom()) => term()}` open
- Function types: `(integer() -> boolean())`
### Up to 4x Faster Compilation
1. **Lazy module loading** — modules no longer loaded when defined; parallel compiler controls loading
2. **Parallel dependency compilation** — set `MIX_OS_DEPS_COMPILE_PARTITION_COUNT` env var
**Caveat:** If you spawn processes during compilation that invoke sibling modules, call `Code.ensure_compiled!/1` first.
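A minimal sketch of that caveat (module and function names are hypothetical):

```elixir
defmodule MyApp.Codegen do
  # Compile-time work that calls a sibling module from a spawned process.
  # Under lazy loading the sibling may not be loaded yet, so force it first.
  Code.ensure_compiled!(MyApp.Sibling)

  Task.async(fn -> MyApp.Sibling.tables() end) |> Task.await()
end
```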
### Other 1.19 Features
- `min/2` and `max/2` allowed in guards
- `Access.values/0` — traverse all values in a map/keyword
- `String.count/2` — count occurrences
- Unicode 17.0.0, multi-line IEx prompts
- `mix help Mod.fun/arity`
- Erlang/OTP 28 support
---
## Breaking Changes Since 1.15
### Struct Update Syntax (1.18+)
```elixir
# The compiler now verifies the variable matches the struct type
%URI{my_uri | path: "/new"} # my_uri must be verified as %URI{}
```
### Regex as Struct Field Defaults (OTP 28)
```elixir
# BROKEN on OTP 28 — regex can't be struct field defaults
defstruct pattern: ~r/foo/ # Compile error
# FIX: use nil default, set at runtime
```
### Logger Backends Deprecated
```elixir
# OLD: config :logger, backends: [:console]
# NEW: use LoggerHandler (Erlang's logger)
config :logger, :default_handler, []
```
### Mix Task Separator
```elixir
# OLD: mix do compile, test
# NEW: mix do compile + test
```
### mix cli/0 Replaces Config Keys
```elixir
# OLD: default_task, preferred_cli_env in project/0
# NEW: def cli, do: [default_task: "phx.server", preferred_envs: [test: :test]]
```
---
## Core Language Patterns
### The Pipeline
```elixir
orders
|> Enum.filter(&(&1.status == :pending))
|> Enum.sort_by(& &1.created_at, DateTime)
|> Enum.map(&process_order/1)
```
### Pattern Matching
```elixir
# Function head matching — preferred over conditionals
def process(%Order{status: :pending} = order), do: ship(order)
def process(%Order{status: :shipped} = order), do: track(order)
def process(%Order{status: :delivered}), do: :noop
# Pin operator
expected = "hello"
^expected = some_function()
# Partial map matching
%{name: name} = %{name: "Michael", age: 42}
```
### With Expressions
```elixir
with {:ok, user} <- fetch_user(id),
     {:ok, account} <- fetch_account(user),
     {:ok, balance} <- check_balance(account) do
  {:ok, balance}
else
  {:error, :not_found} -> {:error, "User not found"}
  error -> {:error, "Unknown: #{inspect(error)}"}
end
```
### Structs and Protocols
```elixir
defmodule Money do
  defstruct [:amount, :currency]

  defimpl String.Chars do
    def to_string(%Money{amount: a, currency: c}), do: "#{a} #{c}"
  end
end
```
### Behaviours
```elixir
defmodule PaymentProvider do
  @callback charge(integer(), String.t()) :: {:ok, String.t()} | {:error, term()}
  @callback refund(String.t()) :: {:ok, term()} | {:error, term()}
end

defmodule Stripe do
  @behaviour PaymentProvider

  @impl true
  def charge(amount, currency) do
    # ... call the payment API ...
  end

  @impl true
  def refund(transaction_id) do
    # ...
  end
end
```
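A common companion to this setup, shown here as a sketch (the `Payments` module and `:my_app` config key are illustrative, not from the original): resolve the implementation at runtime so tests can swap in a mock.

```elixir
defmodule Payments do
  # Look up the active implementation from application config; default to Stripe.
  defp provider, do: Application.get_env(:my_app, :payment_provider, Stripe)

  def charge(amount, currency), do: provider().charge(amount, currency)
  def refund(transaction_id), do: provider().refund(transaction_id)
end

# config/test.exs — point the app at a mock implementation:
# config :my_app, payment_provider: PaymentProviderMock
```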
---
## Mix Project Structure
```
my_app/
├── config/
│   ├── config.exs            # Compile-time config
│   ├── dev.exs / prod.exs / test.exs
│   └── runtime.exs           # Runtime config (secrets, env vars)
├── lib/
│   ├── my_app/
│   │   ├── application.ex    # OTP Application + supervisor tree
│   │   └── ...               # Domain modules
│   └── my_app_web/           # Phoenix web layer (if applicable)
├── priv/repo/migrations/
├── test/
├── mix.exs
├── .formatter.exs
└── AGENTS.md                 # Generated by usage_rules
```
---
## Testing
```elixir
defmodule MyApp.BlogTest do
  use MyApp.DataCase, async: true

  describe "creating posts" do
    test "creates with valid attributes" do
      assert {:ok, post} = Blog.create_post(%{title: "Hi", body: "World"})
      assert post.title == "Hi"
    end

    test "fails without title" do
      assert {:error, changeset} = Blog.create_post(%{body: "World"})
      assert "can't be blank" in errors_on(changeset).title
    end
  end
end
```
Use `async: true` wherever possible. Use `Mox` for behaviour-based mocking.
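A minimal Mox sketch against the `PaymentProvider` behaviour defined earlier (mock and test names are illustrative):

```elixir
# test/test_helper.exs
Mox.defmock(PaymentProviderMock, for: PaymentProvider)

# In a test module
defmodule MyApp.CheckoutTest do
  use ExUnit.Case, async: true
  import Mox

  # Fail the test if an expectation is left unmet
  setup :verify_on_exit!

  test "charge succeeds via the mocked provider" do
    expect(PaymentProviderMock, :charge, fn 1_000, "USD" -> {:ok, "txn_123"} end)
    assert {:ok, "txn_123"} = PaymentProviderMock.charge(1_000, "USD")
  end
end
```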
---
## usage_rules — AI Agent Documentation
**Always include in every project.**
```elixir
# mix.exs deps
{:usage_rules, "~> 1.1", only: [:dev]}
# mix.exs project config
usage_rules: [packages: :all, output: :agents_md, mode: :linked]
```
```bash
mix usage_rules.gen # Generate AGENTS.md from deps
mix usage_rules.search_docs # Search hex documentation
mix usage_rules.gen_skill # Generate SKILL.md for Cowork
```
---
## Pro Tip: Local Docs
Elixir packages ship excellent local documentation. Once Elixir is installed on cortex, accessing docs locally via `mix hex.docs fetch <package>` or `h Module.function` in IEx may be more efficient than fetching URLs. Consider installing Elixir on cortex to enable this workflow.

@@ -0,0 +1,571 @@
# Elixir Part 2: Concurrency, OTP & State Machines
## BEAM Process Fundamentals
- **Processes are cheap** — ~2KB initial memory, microseconds to spawn, millions concurrent
- **No shared memory** — message passing only
- **Per-process GC** — no stop-the-world pauses
- **Preemptive scheduling** — one scheduler per CPU core, ~4000 reductions then yield
- **Process isolation** — one crash doesn't affect others
### Spawning and Messaging
```elixir
pid = spawn(fn ->
  receive do
    {:greet, name} -> IO.puts("Hello, #{name}!")
  end
end)

send(pid, {:greet, "Michael"})

# Linked — bidirectional crash propagation
pid = spawn_link(fn -> raise "boom" end)

# Monitored — one-directional crash notification
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} -> handle_crash(reason)
end
```
---
## Task — Structured Concurrency
### Basic Patterns
```elixir
Task.start(fn -> send_email(user) end) # Fire and forget (linked)
task = Task.async(fn -> expensive_computation() end) # Async/await (linked + monitored)
result = Task.await(task, 30_000) # MUST await to drain mailbox
```
**Critical:** `Task.async` creates a bidirectional link — if either the caller or task crashes, both crash. This preserves sequential semantics (you'd have crashed doing it synchronously too).
### async_stream — Parallel Enumeration
```elixir
Task.async_stream(urls, &fetch_url/1,
max_concurrency: 10, # Default: System.schedulers_online()
ordered: true, # Maintain input order (default)
timeout: 30_000, # Per-task timeout
on_timeout: :kill_task # :exit (raise) or :kill_task (return exit tuple)
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, result} -> result end)
```
### Task.Supervisor — Production Patterns
```elixir
# In your supervision tree
{Task.Supervisor, name: MyApp.TaskSupervisor}

# Fire-and-forget (no result tracking)
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> do_work() end)

# Linked async — caller crashes if task crashes
Task.Supervisor.async(MyApp.TaskSupervisor, fn -> compute() end) |> Task.await()

# Unlinked async — caller survives task failure
task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> risky_work() end)

case Task.yield(task, 5_000) || Task.shutdown(task) do
  {:ok, result} -> result
  {:exit, reason} -> handle_failure(reason)
  nil -> handle_timeout()
end
```
**GenServer integration — never `await` in a GenServer** (blocks all other messages):
```elixir
def handle_call(:compute, _from, state) do
  # Start unlinked task, handle result in handle_info
  Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> heavy_work() end)
  {:noreply, state}
end

def handle_info({ref, result}, state) do
  Process.demonitor(ref, [:flush])  # Clean up the monitor
  {:noreply, %{state | result: result}}
end

def handle_info({:DOWN, _ref, :process, _pid, reason}, state) do
  {:noreply, %{state | error: reason}}  # Task crashed
end
```
---
## GenServer — Stateful Server Processes
```elixir
defmodule Counter do
  use GenServer

  # Client API
  def start_link(initial \\ 0), do: GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  def increment, do: GenServer.cast(__MODULE__, :increment)
  def get, do: GenServer.call(__MODULE__, :get)

  # Server callbacks
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:get, _from, count), do: {:reply, count, count}

  @impl true
  def handle_info(:tick, count) do
    Process.send_after(self(), :tick, 1000)
    {:noreply, count}
  end
end
```
**Key principle:** Callbacks run sequentially — this is both the synchronization mechanism and potential bottleneck. Keep callbacks fast; delegate heavy work to spawned tasks.
### handle_continue — Post-Init Work
Split expensive initialization so the process doesn't block its supervisor:
```elixir
@impl true
def init(config) do
  {:ok, %{config: config, data: nil}, {:continue, :load_data}}
end

@impl true
def handle_continue(:load_data, state) do
  data = expensive_load(state.config)
  {:noreply, %{state | data: data}}
end
```
`{:continue, term}` can also be returned from `handle_call`, `handle_cast`, and `handle_info`.
### Delayed Reply Pattern
Return `{:noreply, state}` from `handle_call` and reply later with `GenServer.reply/2`:
```elixir
@impl true
def handle_call(:slow_query, from, state) do
  Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
    result = run_query()
    GenServer.reply(from, result)
  end)

  {:noreply, state}  # Don't block the GenServer
end
```
### Process Registration & Naming
```elixir
# Local atom name
GenServer.start_link(MyServer, arg, name: MyServer)

# Global name (cluster-wide via :global)
GenServer.start_link(MyServer, arg, name: {:global, :my_server})

# Via Registry (recommended for dynamic naming)
GenServer.start_link(MyServer, arg,
  name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})
```
### code_change — Hot Code Upgrades
```elixir
@impl true
def code_change(old_vsn, state, _extra) do
  # Transform state shape between versions
  new_state = Map.put_new(state, :new_field, default_value())
  {:ok, new_state}
end
```
---
## GenStateMachine — State Machines for Agentic Workflows
**Why GenStateMachine over GenServer for agents?** GenServer has a single state and handles all messages uniformly. GenStateMachine (wrapping Erlang's `:gen_statem`) provides:
- **Explicit states** with per-state event handling
- **Built-in timeouts** — state timeouts (reset on state change), generic timeouts, event timeouts
- **Postpone** — defer events until the right state
- **State enter callbacks** — run setup logic when entering a state
- **Event types** — distinguish calls, casts, info, timeouts, internal events
These map naturally onto agentic patterns: an agent in "thinking" state ignores new requests (postpone), has retry timeouts, transitions through well-defined phases, and runs setup on each state entry.
### Installation
```elixir
{:gen_state_machine, "~> 3.0"}
```
### Callback Modes
**`:handle_event_function`** (default) — single `handle_event/4` for all states:
```elixir
defmodule AgentFSM do
  use GenStateMachine

  # Client API
  def start_link(opts), do: GenStateMachine.start_link(__MODULE__, opts)
  def submit(pid, task), do: GenStateMachine.call(pid, {:submit, task})
  def status(pid), do: GenStateMachine.call(pid, :status)

  # Server callbacks
  def init(opts) do
    # :error must exist up front for the %{data | error: ...} update below
    {:ok, :idle, %{tasks: [], results: [], error: nil, config: opts}}
  end

  # State: idle
  def handle_event({:call, from}, {:submit, task}, :idle, data) do
    {:next_state, :processing, %{data | tasks: [task]},
     [{:reply, from, :accepted}, {:state_timeout, 30_000, :timeout}]}
  end

  def handle_event({:call, from}, :status, state, data) do
    {:keep_state_and_data, [{:reply, from, {state, length(data.results)}}]}
  end

  # State: processing — with timeout
  def handle_event(:state_timeout, :timeout, :processing, data) do
    {:next_state, :error, %{data | error: :timeout}}
  end

  # Internal event for completion
  def handle_event(:info, {:result, result}, :processing, data) do
    {:next_state, :complete, %{data | results: [result | data.results]}}
  end

  # Postpone submissions while processing
  def handle_event({:call, _from}, {:submit, _task}, :processing, _data) do
    {:keep_state_and_data, :postpone}
  end
end
```
**`:state_functions`** — each state is a separate function (states must be atoms):
```elixir
defmodule WorkflowFSM do
  use GenStateMachine, callback_mode: [:state_functions, :state_enter]

  # All keys used in %{data | ...} updates must exist up front
  def init(_), do: {:ok, :pending, %{entered_at: nil, params: nil, result: nil, error: nil}}

  # State enter callbacks — run on every state transition
  def pending(:enter, _old_state, data) do
    {:keep_state, %{data | entered_at: DateTime.utc_now()}}
  end

  def pending({:call, from}, {:start, params}, data) do
    {:next_state, :running, %{data | params: params},
     [{:reply, from, :ok}, {:state_timeout, 60_000, :execution_timeout}]}
  end

  def running(:enter, :pending, data) do
    # Setup when entering running from pending
    send(self(), :execute)
    {:keep_state, data}
  end

  def running(:info, :execute, data) do
    result = do_work(data.params)
    {:next_state, :complete, %{data | result: result}}
  end

  def running(:state_timeout, :execution_timeout, data) do
    {:next_state, :failed, %{data | error: :timeout}}
  end

  # Postpone any calls while running
  def running({:call, _from}, _request, _data) do
    {:keep_state_and_data, :postpone}
  end

  def complete(:enter, _old, data), do: {:keep_state, data}

  def complete({:call, from}, :get_result, data) do
    {:keep_state_and_data, [{:reply, from, {:ok, data.result}}]}
  end

  def failed(:enter, _old, data), do: {:keep_state, data}

  def failed({:call, from}, :get_error, data) do
    {:keep_state_and_data, [{:reply, from, {:error, data.error}}]}
  end
end
```
### Timeout Types
| Type | Behavior | Use Case |
|------|----------|----------|
| `{:timeout, ms, event}` | Generic — survives state changes | Periodic polling |
| `{:state_timeout, ms, event}` | Resets on state change | Per-state deadlines |
| `{:event_timeout, ms, event}` | Resets on any event | Inactivity detection |
### Key Actions in Return Tuples
```elixir
# Return format: {:next_state, new_state, new_data, actions}
actions = [
  {:reply, from, response},            # Reply to caller
  {:state_timeout, 30_000, :deadline}, # State-scoped timeout
  {:timeout, 5_000, :poll},            # Generic timeout
  :postpone,                           # Defer event to next state
  :hibernate,                          # Reduce memory footprint
  {:next_event, :internal, :setup}     # Queue internal event
]
```
### When to Use GenStateMachine vs GenServer
| Scenario | Use |
|----------|-----|
| Simple key-value state, CRUD | GenServer |
| Request/response server | GenServer |
| Well-defined state transitions | **GenStateMachine** |
| Need built-in timeouts per state | **GenStateMachine** |
| Events valid only in certain states | **GenStateMachine** |
| Agentic workflow with phases | **GenStateMachine** |
| Need postpone/defer semantics | **GenStateMachine** |
---
## Supervisors — Let It Crash
### Child Specifications
Every supervised process needs a child spec — a map describing how to start, restart, and shut it down:
```elixir
%{
  id: MyWorker,                          # Unique identifier (required)
  start: {MyWorker, :start_link, [arg]}, # {Module, Function, Args} (required)
  restart: :permanent,                   # :permanent | :transient | :temporary
  shutdown: 5_000,                       # Timeout in ms, or :brutal_kill, or :infinity
  type: :worker                          # :worker | :supervisor
}
```
`use GenServer` / `use Supervisor` auto-generates `child_spec/1` — you rarely write these by hand.
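When a generated spec does need tweaking, `Supervisor.child_spec/2` overrides individual fields rather than hand-writing the map (a sketch; `MyWorker` is illustrative):

```elixir
children = [
  # Take MyWorker's generated spec, but restart only on abnormal exit
  # and allow 10 seconds for graceful shutdown.
  Supervisor.child_spec({MyWorker, []}, restart: :transient, shutdown: 10_000)
]

Supervisor.start_link(children, strategy: :one_for_one)
```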
### Restart Semantics
| Strategy | Behavior |
|----------|----------|
| `:permanent` | Always restart (default) — use for core services |
| `:transient` | Restart only on abnormal exit — use for temporary workers that should complete |
| `:temporary` | Never restart — spec auto-deleted on exit |
**What counts as "normal" exit:** `:normal`, `:shutdown`, `{:shutdown, term}` — these suppress error logs too.
### Crash Budget
```elixir
opts = [
  strategy: :one_for_one,
  max_restarts: 3,  # Max restarts within window (default: 3)
  max_seconds: 5    # Window in seconds (default: 5)
]
```
Exceed the budget and the supervisor itself exits with `:shutdown` — escalating to *its* supervisor.
### Strategies
```elixir
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
MyApp.Repo, # Start first
{MyApp.Cache, []}, # Depends on Repo
{Task.Supervisor, name: MyApp.TaskSupervisor},
MyAppWeb.Endpoint # Start last
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
```
- `:one_for_one` — restart only crashed child (most common)
- `:one_for_all` — restart all if any crashes (tightly coupled; usually too aggressive)
- `:rest_for_one` — restart crashed + all started after it (use when startup order = dependency order)
**Shutdown order is reverse start order.** If a child exceeds its `:shutdown` timeout, it gets killed.
### auto_shutdown (Elixir 1.14+)
```elixir
Supervisor.start_link(children,
strategy: :one_for_one,
auto_shutdown: :any_significant # or :all_significant
)
```
The supervisor exits automatically when significant children (marked with `significant: true` in child spec) terminate.
### DynamicSupervisor
For children started on demand at runtime:
```elixir
defmodule MyApp.SessionSupervisor do
use DynamicSupervisor
def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
def init(:ok), do: DynamicSupervisor.init(strategy: :one_for_one)
def start_session(session_id) do
DynamicSupervisor.start_child(__MODULE__, {MyApp.Session, session_id})
end
def count, do: DynamicSupervisor.count_children(__MODULE__)
def list, do: DynamicSupervisor.which_children(__MODULE__)
end
```
**When to use DynamicSupervisor vs Supervisor:**
- **DynamicSupervisor** — children start on demand, large populations, no ordering needed
- **Supervisor** — mostly static children, startup order matters, small fixed set
**Bottleneck?** DynamicSupervisor is a single process. Use `PartitionSupervisor` to distribute load:
```elixir
{PartitionSupervisor,
child_spec: DynamicSupervisor,
name: MyApp.PartitionedSupervisors}
# Start child on a partition
DynamicSupervisor.start_child(
{:via, PartitionSupervisor, {MyApp.PartitionedSupervisors, self()}},
child_spec
)
```
---
## Agent — Simple State Container
For simple state sharing when GenServer is overkill:
```elixir
{:ok, agent} = Agent.start_link(fn -> %{} end, name: :cache)
Agent.update(:cache, &Map.put(&1, :key, "value"))
Agent.get(:cache, &Map.get(&1, :key))
# Atomic read-modify-write
Agent.get_and_update(:cache, fn state ->
{Map.get(state, :key), Map.put(state, :key, "new_value")}
end)
```
**Limitations:** No message handling, no lifecycle callbacks, no state enter/exit logic. If you need any of that, use GenServer. Anonymous functions fail across distributed nodes — use `{module, fun, args}` form for distributed Agents.
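The same cache in `{module, fun, args}` form (the current state is prepended to `args`), which also works across nodes:

```elixir
# Start with an MFA instead of a closure: Map.new/0 builds the initial state
{:ok, _pid} = Agent.start_link(Map, :new, [], name: :dist_cache)

Agent.update(:dist_cache, Map, :put, [:key, "value"])
Agent.get(:dist_cache, Map, :get, [:key])
# => "value"
```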
---
## Application — Supervision Tree Entry Point
### Application Environment
```elixir
# mix.exs — compile-time defaults
def application do
[extra_applications: [:logger], env: [pool_size: 10]]
end
# config/config.exs — compile-time overrides
config :my_app, pool_size: 20
# config/runtime.exs — runtime overrides (env vars, secrets)
config :my_app, pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10")
# Reading
Application.get_env(:my_app, :pool_size, 10) # With default
Application.fetch_env!(:my_app, :pool_size) # Raises if missing
Application.compile_env(:my_app, :pool_size) # Compile-time validated
```
**Library warning:** Avoid application environment in libraries — it's global mutable state that creates tight coupling. Pass config explicitly via function arguments or init options.
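A sketch of the explicit-config style for library code (module and option names hypothetical): options arrive through `start_link/1` rather than the application environment, so each host application configures its own instance:

```elixir
defmodule MyLib.Client do
  use GenServer

  def start_link(opts) do
    # Caller passes config explicitly; no Application.get_env here
    GenServer.start_link(__MODULE__, opts, name: opts[:name] || __MODULE__)
  end

  @impl true
  def init(opts) do
    {:ok, %{pool_size: Keyword.get(opts, :pool_size, 10)}}
  end
end

# Host app decides configuration at the call site:
# children = [{MyLib.Client, pool_size: 20}]
```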
### Start Types
| Type | Behavior |
|------|----------|
| `:permanent` | Node terminates if this app terminates |
| `:transient` | Only abnormal exits affect other apps |
| `:temporary` | Standalone — failures don't cascade (default) |
---
## Registry & Process Discovery
### Unique Keys — One Process Per Key
```elixir
# In supervisor
{Registry, keys: :unique, name: MyApp.Registry}
# Register a process
Registry.register(MyApp.Registry, "session:#{id}", %{meta: "data"})
# Lookup
case Registry.lookup(MyApp.Registry, "session:#{id}") do
[{pid, value}] -> {:ok, pid}
[] -> {:error, :not_found}
end
# Use as GenServer name
GenServer.start_link(MyWorker, arg,
name: {:via, Registry, {MyApp.Registry, "worker:#{id}"}})
```
### Duplicate Keys — Pub/Sub Pattern
```elixir
{Registry, keys: :duplicate, name: MyApp.PubSub}
# Subscribe (from each interested process)
Registry.register(MyApp.PubSub, "events:orders", [])
# Broadcast
Registry.dispatch(MyApp.PubSub, "events:orders", fn entries ->
for {pid, _} <- entries, do: send(pid, {:order_created, order})
end)
```
### Performance Tuning
```elixir
{Registry, keys: :unique, name: MyApp.Registry,
partitions: System.schedulers_online()}
```
Partition the registry for concurrent throughput under high load.
### Registry Metadata
```elixir
Registry.put_meta(MyApp.Registry, :config, %{version: 2})
{:ok, config} = Registry.meta(MyApp.Registry, :config)
```
---
## OTP Production Checklist
- Use `:one_for_one` by default; `:rest_for_one` when order = dependency
- Set `max_restarts`/`max_seconds` appropriate to your domain
- Set `:shutdown` to `:infinity` for nested supervisors, timeout for workers
- Use `PartitionSupervisor` if DynamicSupervisor becomes a bottleneck
- Use `:transient` restart for temporary workers; `:permanent` for core services
- Use `handle_continue` for post-init setup to avoid blocking the supervisor
- Use delayed reply pattern in GenServer for long-running operations
- Never `await` a Task inside a GenServer — use `async_nolink` + `handle_info`
- Always use `Task.Supervisor` over raw `Task.async` in production for visibility
- For Registry pub/sub, use `:duplicate` keys and `dispatch/3`
- Partition registries with `System.schedulers_online()` under high concurrency
- Don't use GenServer for code organization — use plain modules for pure functions
- Don't use Agent for complex state — graduate to GenServer
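Two checklist items above, the delayed reply and `async_nolink` patterns, can be sketched together. This assumes a `Task.Supervisor` named `MyApp.TaskSupervisor` is already running; module names are illustrative:

```elixir
defmodule MyApp.SlowServer do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  def fetch(url), do: GenServer.call(__MODULE__, {:fetch, url}, 30_000)

  @impl true
  def init(:ok), do: {:ok, %{requests: %{}}}

  @impl true
  def handle_call({:fetch, url}, from, state) do
    # Don't block the server: run under Task.Supervisor, reply later
    task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> do_fetch(url) end)
    {:noreply, put_in(state.requests[task.ref], from)}
  end

  @impl true
  def handle_info({ref, result}, state) when is_map_key(state.requests, ref) do
    # Task succeeded; flush the :DOWN message and reply to the waiting caller
    Process.demonitor(ref, [:flush])
    {from, state} = pop_in(state.requests[ref])
    GenServer.reply(from, {:ok, result})
    {:noreply, state}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    # Task crashed; the server survives and reports the error
    {from, state} = pop_in(state.requests[ref])
    if from, do: GenServer.reply(from, {:error, reason})
    {:noreply, state}
  end

  defp do_fetch(url), do: {:fetched, url} # stand-in for real work
end
```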

# Elixir Part 3: Phoenix Framework — v1.8.5 (Current)
Phoenix is the web framework for Elixir. Version 1.8.5 is current. It provides MVC controllers, real-time LiveView, Channels for WebSocket communication, and comprehensive tooling for authentication, testing, and deployment.
## What's New in Phoenix 1.8
- **Scopes** in generators — secure data access by default (e.g., `current_user` automatically applied)
- **Magic links** (passwordless auth) and **"sudo mode"** in `mix phx.gen.auth`
- **daisyUI** integration — light/dark/system mode support out of the box
- **Simplified layouts** — single `root.html.heex` wraps everything; dynamic layouts are function components
- **`use Phoenix.Controller` now requires `:formats`** — must specify `formats: [:html]` or `formats: [:html, :json]`
- **Updated security headers** — `content-security-policy` with `base-uri 'self'; frame-ancestors 'self'`; dropped deprecated `x-frame-options` and `x-download-options`
- **`config` variable removed** from `Phoenix.Endpoint` — use `Application.compile_env/3` instead
- **Deprecated**: `:namespace`, `:put_default_views`, layouts without modules, `:trailing_slash` in router
---
## Project Setup
```bash
# New project (Phoenix Express — quickest path)
curl https://new.phoenixframework.org/my_app | sh
# Traditional setup
mix phx.new my_app
mix phx.new my_app --no-ecto # Without database
mix phx.new my_app --no-html # API only
mix phx.new my_app --database sqlite3 # SQLite instead of Postgres
```
## Directory Structure
```
my_app/
├── lib/
│ ├── my_app/ # Business logic (contexts, schemas)
│ │ ├── application.ex # Supervision tree
│ │ ├── repo.ex # Ecto Repo
│ │ └── catalog/ # Context: Catalog
│ │ ├── product.ex # Schema
│ │ └── catalog.ex # Context module
│ └── my_app_web/ # Web layer
│ ├── endpoint.ex # HTTP entry point
│ ├── router.ex # Routes + pipelines
│ ├── components/
│ │ ├── core_components.ex # Shared UI components
│ │ └── layouts.ex # Layout components
│ ├── controllers/
│ │ ├── page_controller.ex
│ │ └── page_html/ # Templates for controller
│ │ └── home.html.heex
│ └── live/ # LiveView modules
│ └── counter_live.ex
├── config/
│ ├── config.exs # Compile-time config
│ ├── dev.exs / prod.exs
│ └── runtime.exs # Runtime config (env vars, secrets)
├── priv/
│ ├── repo/migrations/
│ └── static/ # Static assets
├── test/
│ ├── support/
│ │ ├── conn_case.ex # Test helpers for controllers
│ │ └── data_case.ex # Test helpers for data layer
│ └── my_app_web/
└── mix.exs
```
---
## Router & Pipelines
```elixir
defmodule MyAppWeb.Router do
use MyAppWeb, :router
pipeline :browser do
plug :accepts, ["html"]
plug :fetch_session
plug :fetch_live_flash
plug :put_root_layout, html: {MyAppWeb.Layouts, :root}
plug :protect_from_forgery
plug :put_secure_browser_headers
end
pipeline :api do
plug :accepts, ["json"]
end
scope "/", MyAppWeb do
pipe_through :browser
get "/", PageController, :home
live "/counter", CounterLive # LiveView route
resources "/products", ProductController # RESTful CRUD
end
scope "/api", MyAppWeb do
pipe_through :api
resources "/items", ItemController, except: [:new, :edit]
end
end
```
---
## Verified Routes — `~p` Sigil
**Always use `~p` instead of string paths.** Compile-time verified against your router.
```elixir
# In templates (HEEx)
~H"""
<a href={~p"/products/#{product}"}>View</a>
<a href={~p"/products/#{product}/edit"}>Edit</a>
"""
# In controllers
redirect(conn, to: ~p"/products/#{product}")
# With query params
~p"/products?page=#{page}&sort=name"
~p"/products?#{%{page: 1, sort: "name"}}"
# URL generation (includes host)
url(~p"/products/#{product}")
# => "https://example.com/products/42"
```
---
## Controllers (1.8 Style)
```elixir
defmodule MyAppWeb.ProductController do
use MyAppWeb, :controller
# REQUIRED in 1.8: specify formats
# This is typically set in MyAppWeb :controller function
def index(conn, _params) do
products = Catalog.list_products()
render(conn, :index, products: products)
end
def show(conn, %{"id" => id}) do
product = Catalog.get_product!(id)
render(conn, :show, product: product)
end
def create(conn, %{"product" => product_params}) do
case Catalog.create_product(product_params) do
{:ok, product} ->
conn
|> put_flash(:info, "Product created.")
|> redirect(to: ~p"/products/#{product}")
{:error, %Ecto.Changeset{} = changeset} ->
render(conn, :new, changeset: changeset)
end
end
end
```
**View module naming:** For `ProductController`, Phoenix looks for `ProductHTML` (for `:html` format) and `ProductJSON` (for `:json` format).
```elixir
# lib/my_app_web/controllers/product_html.ex
defmodule MyAppWeb.ProductHTML do
use MyAppWeb, :html
embed_templates "product_html/*" # Loads .heex files from directory
end
# lib/my_app_web/controllers/product_json.ex
defmodule MyAppWeb.ProductJSON do
def index(%{products: products}), do: %{data: for(p <- products, do: data(p))}
def show(%{product: product}), do: %{data: data(product)}
defp data(product), do: %{id: product.id, title: product.title, price: product.price}
end
```
---
## Components and HEEx
```elixir
defmodule MyAppWeb.CoreComponents do
use Phoenix.Component
# Declare attributes with types and docs
attr :type, :string, default: "button"
attr :class, :string, default: nil
attr :rest, :global # Passes through all other HTML attrs
slot :inner_block, required: true
def button(assigns) do
~H"""
<button type={@type} class={["btn", @class]} {@rest}>
{render_slot(@inner_block)}
</button>
"""
end
# Table component with slots
attr :rows, :list, required: true
slot :col, required: true do
attr :label, :string
end
def table(assigns) do
~H"""
<table>
<thead>
<tr>
<th :for={col <- @col}>{col[:label]}</th>
</tr>
</thead>
<tbody>
<tr :for={row <- @rows}>
<td :for={col <- @col}>{render_slot(col, row)}</td>
</tr>
</tbody>
</table>
"""
end
end
```
**HEEx syntax notes:**
- `{@var}` — render assign (curly braces, not `<%= %>`)
- `:if={condition}` — conditional rendering on any tag
- `:for={item <- list}` — iteration on any tag
- `<.component_name />` — call function component with dot notation
- `<Component.name />` — call remote component with module name
- `{render_slot(@inner_block)}` — render slot content
- `<:slot_name>content</:slot_name>` — named slot content
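Calling the components defined above ties these pieces together; slot content goes in `<:col>` tags, with `:let` binding each row passed to `render_slot/2`:

```heex
<.table rows={@products}>
  <:col :let={product} label="Title">{product.title}</:col>
  <:col :let={product} label="Price">{product.price}</:col>
</.table>

<.button class="btn-primary" phx-click="save">
  Save
</.button>
```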
---
## LiveView
```elixir
defmodule MyAppWeb.SearchLive do
use MyAppWeb, :live_view
def mount(_params, _session, socket) do
{:ok, assign(socket, query: "", results: [])}
end
def handle_params(%{"q" => query}, _uri, socket) do
{:noreply, assign(socket, query: query, results: search(query))}
end
def handle_params(_params, _uri, socket), do: {:noreply, socket}
def handle_event("search", %{"query" => query}, socket) do
{:noreply,
socket
|> assign(query: query, results: search(query))
|> push_patch(to: ~p"/search?q=#{query}")}
end
def render(assigns) do
~H"""
<Layouts.app flash={@flash}>
<form phx-submit="search">
<input name="query" value={@query} phx-debounce="300" />
<button type="submit">Search</button>
</form>
<div :for={result <- @results} class="result">
<h3>{result.title}</h3>
<p>{result.summary}</p>
</div>
</Layouts.app>
"""
end
defp search(query), do: MyApp.Search.find(query)
end
```
**LiveView lifecycle:** `mount/3` → `handle_params/3` → `render/1`. Events via `handle_event/3`. Server pushes via `handle_info/2`.
**Key patterns:**
- `assign/2,3` — set socket assigns
- `push_navigate/2` — navigate to new LiveView (full mount)
- `push_patch/2` — update URL without full remount (calls `handle_params`)
- `push_event/3` — push event to client JS hooks
- `stream/3,4` — efficient list rendering for large collections (inserts/deletes without re-rendering entire list)
- `assign_async/3` + `start_async/3` — async data loading with loading/error states
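A sketch of `assign_async/3` with the `<.async_result>` component that ships with LiveView (context module and field names illustrative):

```elixir
def mount(%{"id" => id}, _session, socket) do
  {:ok,
   assign_async(socket, :org, fn ->
     # Runs in a separate process; return {:ok, map} or {:error, reason}
     {:ok, %{org: MyApp.Orgs.get_org!(id)}}
   end)}
end

def render(assigns) do
  ~H"""
  <.async_result :let={org} assign={@org}>
    <:loading>Loading organization...</:loading>
    <:failed :let={_reason}>There was an error loading the organization</:failed>
    {org.name}
  </.async_result>
  """
end
```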
---
## Channels — Real-Time WebSocket
```elixir
# In endpoint.ex
socket "/socket", MyAppWeb.UserSocket,
websocket: true,
longpoll: false
# lib/my_app_web/channels/user_socket.ex
defmodule MyAppWeb.UserSocket do
use Phoenix.Socket
channel "room:*", MyAppWeb.RoomChannel
def connect(%{"token" => token}, socket, _connect_info) do
case Phoenix.Token.verify(socket, "user auth", token, max_age: 86400) do
{:ok, user_id} -> {:ok, assign(socket, :user_id, user_id)}
{:error, _} -> :error
end
end
def id(socket), do: "users_socket:#{socket.assigns.user_id}"
end
# lib/my_app_web/channels/room_channel.ex
defmodule MyAppWeb.RoomChannel do
use MyAppWeb, :channel
def join("room:" <> room_id, _params, socket) do
{:ok, assign(socket, :room_id, room_id)}
end
def handle_in("new_msg", %{"body" => body}, socket) do
broadcast!(socket, "new_msg", %{
body: body,
user_id: socket.assigns.user_id
})
{:noreply, socket}
end
end
```
**Force disconnect all sessions for a user:**
```elixir
MyAppWeb.Endpoint.broadcast("users_socket:#{user.id}", "disconnect", %{})
```
---
## Contexts — Business Logic Boundary
Contexts are plain Elixir modules that encapsulate data access and business rules. They are the API between your web layer and your domain.
```elixir
# Generate with: mix phx.gen.context Catalog Product products title:string price:decimal
defmodule MyApp.Catalog do
import Ecto.Query
alias MyApp.Repo
alias MyApp.Catalog.Product
def list_products do
Repo.all(Product)
end
def get_product!(id), do: Repo.get!(Product, id)
def create_product(attrs \\ %{}) do
%Product{}
|> Product.changeset(attrs)
|> Repo.insert()
end
def update_product(%Product{} = product, attrs) do
product
|> Product.changeset(attrs)
|> Repo.update()
end
def delete_product(%Product{} = product) do
Repo.delete(product)
end
def change_product(%Product{} = product, attrs \\ %{}) do
Product.changeset(product, attrs)
end
end
```
**Context design principles:**
- One context per bounded domain (Catalog, Accounts, Orders)
- Contexts own their schemas — other contexts reference by ID, not struct
- Cross-context calls go through the public context API, never access another context's Repo directly
- Contexts can nest related schemas (Comments under Posts)
---
## Authentication — `mix phx.gen.auth`
```bash
mix phx.gen.auth Accounts User users
```
Phoenix 1.8 generates:
- **Magic links** (passwordless) — email-based login links
- **"Sudo mode"** — re-authentication for sensitive actions
- Session-based auth with secure token handling
- Email confirmation and password reset flows
- `require_authenticated_user` plug for protected routes
## Scopes (New in 1.8)
Scopes make secure data access the default in generators. When you generate resources with a scope, all queries are automatically filtered by the scoped user.
```bash
mix phx.gen.live Posts Post posts title body:text --scope current_user
```
This generates code that automatically passes `current_user` to context functions, ensuring users only see their own data.
---
## Ecto — Database Layer
Ecto has four core components: **Repo** (database wrapper), **Schema** (data mapping), **Changeset** (validation + change tracking), and **Query** (composable queries). Always preload associations explicitly — Ecto never lazy-loads.
### Schema
```elixir
defmodule MyApp.Catalog.Product do
use Ecto.Schema
import Ecto.Changeset
schema "products" do
field :title, :string
field :price, :decimal
field :status, Ecto.Enum, values: [:draft, :published, :archived]
field :metadata, :map, default: %{}
field :tags, {:array, :string}, default: []
field :computed, :string, virtual: true # Not persisted
field :slug, :string, source: :url_slug # Maps to different DB column
has_many :reviews, MyApp.Reviews.Review
has_one :detail, MyApp.Catalog.ProductDetail
belongs_to :category, MyApp.Catalog.Category
many_to_many :labels, MyApp.Catalog.Label, join_through: "product_labels" # :tags is taken by the array field above
embeds_one :seo, SEO, on_replace: :update do
field :meta_title, :string
field :meta_description, :string
end
embeds_many :variants, Variant, on_replace: :delete do
field :sku, :string
field :price, :decimal
end
timestamps(type: :utc_datetime)
end
def changeset(product, attrs) do
product
|> cast(attrs, [:title, :price, :status, :category_id, :metadata, :tags])
|> cast_embed(:seo)
|> cast_embed(:variants)
|> validate_required([:title, :price])
|> validate_number(:price, greater_than: 0)
|> validate_length(:title, min: 3, max: 255)
|> validate_inclusion(:status, [:draft, :published, :archived])
|> unique_constraint(:title)
|> foreign_key_constraint(:category_id)
end
end
```
**Field types:** `:string`, `:integer`, `:float`, `:decimal`, `:boolean`, `:binary`, `:map`, `{:array, type}`, `:date`, `:time`, `:naive_datetime`, `:utc_datetime`, `:utc_datetime_usec`, `:binary_id`, `Ecto.UUID`, `Ecto.Enum`
**Field options:** `:default`, `:source` (DB column name), `:virtual` (not persisted), `:redact` (mask in inspect), `:read_after_writes` (re-read from DB post-write), `:autogenerate`
**Embed `:on_replace` options:** `:raise` (default), `:mark_as_invalid`, `:update` (embeds_one), `:delete` (embeds_many)
### Changeset — Validation & Change Tracking
```elixir
# Validations (run in-memory, no DB)
|> validate_required([:field])
|> validate_format(:email, ~r/@/)
|> validate_length(:name, min: 2, max: 100)
|> validate_number(:age, greater_than: 0, less_than: 150)
|> validate_inclusion(:role, [:admin, :user])
|> validate_exclusion(:username, ["admin", "root"])
|> validate_acceptance(:terms) # Checkbox must be true
|> validate_confirmation(:password) # password_confirmation must match
|> validate_subset(:permissions, [:read, :write, :admin])
|> validate_change(:email, fn :email, email -> # Custom per-field validation
if String.contains?(email, "+"), do: [email: "no plus addressing"], else: []
end)
# Constraints (checked by DB — only run if validations pass)
|> unique_constraint(:email)
|> unique_constraint([:user_id, :project_id], name: :user_projects_unique_index)
|> foreign_key_constraint(:category_id)
|> check_constraint(:price, name: :price_must_be_positive)
|> no_assoc_constraint(:reviews) # Prevent delete if has associations
|> exclusion_constraint(:date_range) # Postgres range exclusion
```
**Schemaless changesets** — validate arbitrary data without a schema:
```elixir
types = %{email: :string, age: :integer}
changeset = {%{}, types}
|> Ecto.Changeset.cast(params, Map.keys(types))
|> Ecto.Changeset.validate_required([:email])
case Ecto.Changeset.apply_action(changeset, :validate) do
{:ok, data} -> use_data(data)
{:error, changeset} -> show_errors(changeset)
end
```
**Error traversal:**
```elixir
Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
Regex.replace(~r"%{(\w+)}", msg, fn _, key ->
opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string()
end)
end)
# => %{title: ["can't be blank"], price: ["must be greater than 0"]}
```
### Query — Composable & Dynamic
```elixir
import Ecto.Query
# Composable queries — chain them together
def published(query \\ Product) do
from p in query, where: p.status == :published
end
def recent(query, days \\ 7) do
from p in query, where: p.inserted_at > ago(^days, "day")
end
def with_reviews(query) do
from p in query, preload: [:reviews]
end
# Usage: Product |> published() |> recent(30) |> with_reviews() |> Repo.all()
```
**Named bindings** — track joins without positional counting:
```elixir
from p in Post, as: :post,
join: c in assoc(p, :comments), as: :comment,
where: as(:comment).approved == true,
select: {as(:post).title, count(as(:comment).id)},
group_by: as(:post).id
```
**Dynamic queries** — build conditions programmatically:
```elixir
def filter(params) do
query = from(p in Product)
conditions = true
conditions = if params["status"],
do: dynamic([p], p.status == ^params["status"] and ^conditions),
else: conditions
conditions = if params["min_price"],
do: dynamic([p], p.price >= ^params["min_price"] and ^conditions),
else: conditions
from p in query, where: ^conditions
end
```
**Subqueries:**
```elixir
top_products = from p in Product, order_by: [desc: :sales], limit: 10
from p in subquery(top_products), select: avg(p.price)
```
**Window functions:**
```elixir
from e in Employee,
select: {e.name, e.salary, over(avg(e.salary), :dept)},
windows: [dept: [partition_by: e.department_id, order_by: e.salary]]
```
**Fragments (raw SQL):**
```elixir
from p in Post,
where: fragment("? @> ?", p.tags, ^["elixir"]), # Postgres array contains
order_by: fragment("random()")
```
**Preloading strategies:**
```elixir
# Separate queries (default) — parallelized, no data duplication
Repo.all(Post) |> Repo.preload([:comments, :author])
# Join preload — single query, watch for Cartesian product
from p in Post, join: c in assoc(p, :comments), preload: [comments: c]
# Custom query preload
Repo.preload(posts, comments: from(c in Comment, order_by: c.inserted_at))
# Nested preload
Repo.preload(posts, [comments: [:author, :replies]])
```
**Important:** Use `is_nil(field)` not `field == nil` in queries. Only one `select` per query — use `select_merge` to compose.
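Both points sketched (field names illustrative):

```elixir
# Ecto rejects `p.archived_at == nil` at query build time;
# is_nil/1 compiles to SQL `IS NULL`
from p in Product, where: is_nil(p.archived_at)

# select_merge composes on top of an existing select
base = from p in Product, select: %{id: p.id, title: p.title}
from p in base, select_merge: %{price: p.price}
```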
### Repo — Database Operations
```elixir
# CRUD
Repo.insert(changeset) # {:ok, struct} | {:error, changeset}
Repo.update(changeset)
Repo.delete(struct)
Repo.insert!(changeset) # Returns struct or raises
Repo.get(Post, 1) # nil if not found
Repo.get!(Post, 1) # Raises if not found
Repo.get_by(Post, title: "Hello") # By arbitrary fields
Repo.one(query) # Raises if > 1 result
# Bulk operations — return {count, nil | results}
Repo.insert_all(Post, [%{title: "A"}, %{title: "B"}])
Repo.update_all(from(p in Post, where: p.old == true), set: [archived: true])
Repo.delete_all(from(p in Post, where: p.inserted_at < ago(1, "year")))
# NOTE: update_all does NOT update auto-generated fields like updated_at
# Upserts (on_conflict)
Repo.insert(changeset,
on_conflict: :nothing, # Ignore conflict
conflict_target: [:email]
)
Repo.insert(changeset,
on_conflict: {:replace, [:name, :updated_at]}, # Update specific fields
conflict_target: [:email]
)
Repo.insert(changeset,
on_conflict: :replace_all, # Replace everything
conflict_target: :id
)
# Aggregation
Repo.aggregate(Post, :count) # SELECT count(*)
Repo.aggregate(Post, :sum, :views) # SELECT sum(views)
Repo.aggregate(query, :avg, :price) # Works with queries too
```
### Ecto.Multi — Atomic Transactions
Group operations that must all succeed or all fail:
```elixir
alias Ecto.Multi
Multi.new()
|> Multi.insert(:post, Post.changeset(%Post{}, post_attrs))
|> Multi.insert(:comment, fn %{post: post} ->
Comment.changeset(%Comment{}, %{post_id: post.id, body: "First!"})
end)
|> Multi.update_all(:increment, from(u in User, where: u.id == ^user_id),
inc: [post_count: 1])
|> Multi.run(:notify, fn _repo, %{post: post} ->
# Arbitrary logic — return {:ok, _} or {:error, _}
Notifications.send_new_post(post)
end)
|> Repo.transaction()
# => {:ok, %{post: %Post{}, comment: %Comment{}, increment: {1, nil}, notify: :sent}}
# => {:error, failed_op, failed_value, changes_so_far}
```
**Key patterns:** Each operation name must be unique. Failed operations roll back everything. Use `Multi.run/3` for arbitrary logic. Test with `Multi.to_list/1` to inspect without executing.
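Inspecting a Multi without executing it, as a sketch (`post_changeset` assumed to be a valid changeset):

```elixir
multi =
  Ecto.Multi.new()
  |> Ecto.Multi.insert(:post, post_changeset)
  |> Ecto.Multi.run(:notify, fn _repo, %{post: post} -> {:ok, post.id} end)

# Returns the queued operations as {name, operation} tuples; no DB access
multi |> Ecto.Multi.to_list() |> Enum.map(&elem(&1, 0))
# => [:post, :notify]
```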
### Streaming — Large Result Sets
```elixir
Repo.transaction(fn ->
Post
|> where([p], p.status == :published)
|> Repo.stream(max_rows: 500)
|> Stream.each(&process_post/1)
|> Stream.run()
end)
```
Must execute within a transaction. Default batch size is 500 rows.
### Migrations
```elixir
defmodule MyApp.Repo.Migrations.CreateProducts do
use Ecto.Migration
def change do
create table("products") do
add :title, :string, null: false, size: 255
add :price, :decimal, precision: 10, scale: 2
add :status, :string, default: "draft"
add :metadata, :map, default: %{}
add :tags, {:array, :string}, default: []
add :category_id, references("categories", on_delete: :restrict)
timestamps(type: :utc_datetime)
end
create index("products", [:category_id])
create unique_index("products", [:title])
create index("products", [:status, :inserted_at])
create index("products", [:tags], using: :gin) # Postgres array index
create index("products", [:title],
where: "status = 'published'", name: :published_title_idx) # Partial index
end
end
```
**Alter tables:**
```elixir
def change do
alter table("products") do
add :slug, :string
modify :title, :text, from: :string # :from required for reversibility
remove :deprecated_field, :string # type required for reversibility
end
# Raw SQL when needed
execute "CREATE EXTENSION IF NOT EXISTS \"pg_trgm\"",
"DROP EXTENSION IF EXISTS \"pg_trgm\""
end
```
**Run migrations:** `mix ecto.migrate` / `mix ecto.rollback` / `mix ecto.reset`
### Ecto.Enum
```elixir
# String storage (default)
field :status, Ecto.Enum, values: [:draft, :published, :archived]
# Integer storage
field :priority, Ecto.Enum, values: [low: 1, medium: 2, high: 3]
# Array of enums
field :roles, {:array, Ecto.Enum}, values: [:admin, :editor, :viewer]
# Query helpers
Ecto.Enum.values(Product, :status) # [:draft, :published, :archived]
Ecto.Enum.dump_values(Product, :status) # ["draft", "published", "archived"]
Ecto.Enum.mappings(Product, :status) # [draft: "draft", ...] — for form dropdowns
```
---
## Telemetry — Built-in Observability
Phoenix 1.8 includes a Telemetry supervisor that tracks request duration, Ecto query times, and VM metrics out of the box.
```elixir
# lib/my_app_web/telemetry.ex (auto-generated)
defmodule MyAppWeb.Telemetry do
use Supervisor
import Telemetry.Metrics
def metrics do
[
summary("phoenix.endpoint.stop.duration", unit: {:native, :millisecond}),
summary("phoenix.router_dispatch.stop.duration", tags: [:route], unit: {:native, :millisecond}),
summary("my_app.repo.query.total_time", unit: {:native, :millisecond}),
summary("vm.memory.total", unit: {:byte, :kilobyte}),
summary("vm.total_run_queue_lengths.total"),
summary("vm.total_run_queue_lengths.cpu"),
]
end
end
```
Integrates with **PromEx** for Prometheus/Grafana dashboards (see Part 4: Ecosystem).
---
## Security Best Practices
**Never pass untrusted input to:** `Code.eval_string/3`, `:os.cmd/2`, `System.cmd/3`, `System.shell/2`, `:erlang.binary_to_term/2`
**Ecto prevents SQL injection by default** — the query DSL parameterizes all inputs. Only `Ecto.Adapters.SQL.query/4` with raw string interpolation is vulnerable.
**Safe deserialization:**
```elixir
# UNSAFE — even with :safe flag
:erlang.binary_to_term(user_input, [:safe])
# SAFE — prevents executable terms
Plug.Crypto.non_executable_binary_to_term(user_input, [:safe])
```
**CSRF protection** is built into the `:browser` pipeline via `protect_from_forgery`. **Content Security Policy** is set by `put_secure_browser_headers`.
---
## Testing Phoenix
```elixir
# Controller test
defmodule MyAppWeb.ProductControllerTest do
use MyAppWeb.ConnCase
test "GET /products", %{conn: conn} do
conn = get(conn, ~p"/products")
assert html_response(conn, 200) =~ "Products"
end
test "POST /products creates product", %{conn: conn} do
conn = post(conn, ~p"/products", product: %{title: "Widget", price: 9.99})
assert redirected_to(conn) =~ ~p"/products/"
end
end
# LiveView test
defmodule MyAppWeb.CounterLiveTest do
use MyAppWeb.ConnCase
import Phoenix.LiveViewTest
test "increments counter", %{conn: conn} do
{:ok, view, html} = live(conn, ~p"/counter")
assert html =~ "Count: 0"
assert view
|> element("button", "+1")
|> render_click() =~ "Count: 1"
end
end
# Channel test
defmodule MyAppWeb.RoomChannelTest do
use MyAppWeb.ChannelCase
test "broadcasts new messages" do
{:ok, _, socket} =
  MyAppWeb.UserSocket
  |> socket("users_socket:1", %{user_id: 1})
  |> subscribe_and_join(MyAppWeb.RoomChannel, "room:lobby")
push(socket, "new_msg", %{"body" => "hello"})
assert_broadcast "new_msg", %{body: "hello"}
end
end
```
---
## Generators Cheat Sheet
```bash
# HTML CRUD (controller + views + templates + context + schema + migration)
mix phx.gen.html Catalog Product products title:string price:decimal
# LiveView CRUD
mix phx.gen.live Catalog Product products title:string price:decimal
# JSON API
mix phx.gen.json Catalog Product products title:string price:decimal
# Context + schema only (no web layer)
mix phx.gen.context Catalog Product products title:string price:decimal
# Schema + migration only
mix phx.gen.schema Product products title:string price:decimal
# Authentication
mix phx.gen.auth Accounts User users
# Channel
mix phx.gen.channel Room
# Presence
mix phx.gen.presence
# Release files (Dockerfile, release.ex, overlay scripts)
mix phx.gen.release
mix phx.gen.release --docker # Include Dockerfile
```
---
## Release & Deployment
```bash
# Generate release infrastructure
mix phx.gen.release --docker
# Build release
MIX_ENV=prod mix deps.get --only prod
MIX_ENV=prod mix compile
MIX_ENV=prod mix assets.deploy
MIX_ENV=prod mix release
# Release commands
_build/prod/rel/my_app/bin/server # Start with Phoenix server
_build/prod/rel/my_app/bin/migrate # Run migrations
_build/prod/rel/my_app/bin/my_app remote # Attach IEx console
```
**Runtime config (`config/runtime.exs`):**
```elixir
import Config
if config_env() == :prod do
database_url = System.fetch_env!("DATABASE_URL")
secret_key_base = System.fetch_env!("SECRET_KEY_BASE")
config :my_app, MyApp.Repo,
url: database_url,
pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10")
config :my_app, MyAppWeb.Endpoint,
url: [host: System.fetch_env!("PHX_HOST"), port: 443, scheme: "https"],
http: [ip: {0, 0, 0, 0}, port: String.to_integer(System.get_env("PORT") || "4000")],
secret_key_base: secret_key_base
end
```

# Elixir Part 4: Ecosystem, Production & Deployment
## Ash Framework — Declarative Resource Modeling
For production/substantial projects, Ash provides a declarative, resource-oriented approach that eliminates boilerplate while remaining extensible.
```elixir
# mix.exs deps
{:ash, "~> 3.0"},
{:ash_postgres, "~> 2.0"}, # Ecto/Postgres data layer
{:ash_phoenix, "~> 2.0"}, # Phoenix integration
{:ash_graphql, "~> 1.0"}, # Auto-generated GraphQL API
{:ash_json_api, "~> 1.0"}, # Auto-generated JSON:API
```
### Resource Definition
```elixir
defmodule MyApp.Blog.Post do
use Ash.Resource,
domain: MyApp.Blog,
data_layer: AshPostgres.DataLayer
postgres do
table "posts"
repo MyApp.Repo
end
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
attribute :body, :string, allow_nil?: false
attribute :status, :atom, constraints: [one_of: [:draft, :published, :archived]], default: :draft
timestamps()
end
relationships do
belongs_to :author, MyApp.Accounts.User
has_many :comments, MyApp.Blog.Comment
end
actions do
defaults [:read, :destroy]
create :create do
accept [:title, :body]
change relate_actor(:author)
end
update :publish do
change set_attribute(:status, :published)
end
end
policies do
policy action_type(:read) do
authorize_if always()
end
policy action_type([:create, :update, :destroy]) do
authorize_if actor_attribute_equals(:role, :admin)
end
end
end
```
### Domain Definition
```elixir
defmodule MyApp.Blog do
use Ash.Domain
resources do
resource MyApp.Blog.Post
resource MyApp.Blog.Comment
end
end
```
### Using Ash Resources
```elixir
# Create
MyApp.Blog.Post
|> Ash.Changeset.for_create(:create, %{title: "Hello", body: "World"}, actor: current_user)
|> Ash.create!()
# Read with filters (Ash.Query.filter/2 is a macro, so require the module first)
require Ash.Query

MyApp.Blog.Post
|> Ash.Query.filter(status == :published)
|> Ash.Query.sort(inserted_at: :desc)
|> Ash.read!()
# Custom action
post |> Ash.Changeset.for_update(:publish) |> Ash.update!()
```
**When to use Ash vs plain Phoenix:** Ash excels when you need consistent authorization, multi-tenancy, auto-generated APIs, or complex domain logic. For simple CRUD apps or learning, plain Phoenix contexts are fine.
---
## Jido — Multi-Agent Orchestration Framework
Jido (~> 2.1) is the premier Elixir framework for building and managing agents of all types — not just LLMs. Vision: 10,000 agents per user on the BEAM.
```elixir
# mix.exs deps
{:jido, "~> 2.1"},
{:jido_ai, "~> 0.x"}, # AI/LLM integration
{:jido_action, "~> 0.x"}, # Composable actions
{:jido_signal, "~> 0.x"}, # CloudEvents-based messaging
```
### Core Concepts
**Agents are pure functional structs** — immutable, no side effects in the agent itself. Side effects are described as Directives.
```elixir
defmodule MyAgent do
use Jido.Agent,
name: "my_agent",
description: "Does useful things",
actions: [FetchData, ProcessData, NotifyUser],
schema: [
model: [type: :string, default: "gpt-4"],
temperature: [type: :float, default: 0.7]
]
end
# Create and command
{:ok, agent} = MyAgent.new()
{agent, directives} = MyAgent.cmd(agent, %Signal{type: "task.assigned", data: %{task: "analyze"}})
# directives are typed effects: Emit, Spawn, Schedule, Stop
```
### Actions — Composable Units
Actions are the building blocks. Each has a schema, validates input, and returns output.
```elixir
defmodule FetchData do
use Jido.Action,
name: "fetch_data",
description: "Fetches data from an API",
schema: [
url: [type: :string, required: true],
timeout: [type: :integer, default: 5000]
]
@impl true
def run(params, _context) do
case Req.get(params.url, receive_timeout: params.timeout) do
{:ok, %{status: 200, body: body}} -> {:ok, %{data: body}}
{:ok, %{status: status}} -> {:error, "HTTP #{status}"}
{:error, reason} -> {:error, reason}
end
end
end
# Actions expose themselves as AI tools
FetchData.to_tool() # Returns tool spec for LLM function calling
```
### Signals — CloudEvents v1.0.2 Messaging
```elixir
signal = %Jido.Signal{
type: "task.completed",
source: "agent:worker_1",
data: %{result: "analysis complete"},
subject: "task:123"
}
```
### Directives — Typed Effect Descriptions
Agents don't perform side effects directly. They return Directives describing what should happen:
| Directive | Effect |
|-----------|--------|
| `Emit` | Send a signal to another agent/system |
| `Spawn` | Create a new child agent |
| `Schedule` | Schedule a future action |
| `Stop` | Terminate the agent |
### AgentServer — Runtime Process
`AgentServer` wraps an agent in a GenServer for real-time operation:
```elixir
{:ok, pid} = Jido.AgentServer.start_link(agent: MyAgent, id: "worker-1")
Jido.AgentServer.signal(pid, %Signal{type: "task.start", data: %{...}})
```
### When to Use Jido
- Multi-agent systems where agents communicate via signals
- LLM-powered agents that need tool calling (via `to_tool/0`)
- Systems requiring parent-child agent hierarchies
- Workflows with complex state machines (Jido includes FSM strategy)
- Any scenario targeting high agent density (thousands per node)
---
## OTP Releases & Docker
### OTP Release
```elixir
# mix.exs
def project do
[
app: :my_app,
releases: [
my_app: [
include_executables_for: [:unix],
steps: [:assemble, :tar]
]
]
]
end
```
### Docker Multistage Build
```dockerfile
# Build stage
FROM hexpm/elixir:1.19.5-erlang-27.3-debian-bookworm-20240904 AS build
RUN apt-get update && apt-get install -y build-essential git
WORKDIR /app
ENV MIX_ENV=prod
COPY mix.exs mix.lock ./
RUN mix deps.get --only $MIX_ENV && mix deps.compile
COPY config/config.exs config/prod.exs config/
COPY lib lib
COPY priv priv
COPY assets assets
RUN mix assets.deploy && mix compile && mix release
# Runtime stage
FROM debian:bookworm-slim
# curl is required by the HEALTHCHECK below
RUN apt-get update && apt-get install -y libstdc++6 openssl libncurses5 locales ca-certificates curl
RUN sed -i '/en_US.UTF-8/s/^# //g' /etc/locale.gen && locale-gen
ENV LANG=en_US.UTF-8
WORKDIR /app
COPY --from=build /app/_build/prod/rel/my_app ./
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD curl -f http://localhost:4000/health || exit 1
CMD ["bin/server"]
```
---
## Distributed Erlang & Clustering
```elixir
# libcluster config (config/runtime.exs)
config :libcluster,
topologies: [
local: [
strategy: Cluster.Strategy.Gossip
],
# OR for Docker/K8s:
k8s: [
strategy: Cluster.Strategy.Kubernetes,
config: [
mode: :hostname,
kubernetes_namespace: "default",
kubernetes_selector: "app=my_app"
]
]
]
# Distributed PubSub (built into Phoenix)
Phoenix.PubSub.broadcast(MyApp.PubSub, "events", {:new_order, order})
Phoenix.PubSub.subscribe(MyApp.PubSub, "events")
# Horde for distributed supervisor/registry
{:horde, "~> 0.9"}
```
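On the receiving side, a subscriber is just a process that joins the topic in `init/1` and handles broadcasts as plain messages. A minimal sketch (module and topic names are illustrative):

```elixir
defmodule OrderListener do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(_) do
    # Subscribe to the "events" topic; broadcasts arrive via handle_info/2
    Phoenix.PubSub.subscribe(MyApp.PubSub, "events")
    {:ok, nil}
  end

  @impl true
  def handle_info({:new_order, order}, state) do
    IO.inspect(order, label: "new order")
    {:noreply, state}
  end
end
```

Because PubSub is cluster-aware, the same code receives broadcasts published on any connected node.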
---
## Monitoring & Observability
### PromEx — Prometheus Metrics for Elixir
```elixir
# mix.exs
{:prom_ex, "~> 1.9"}
# lib/my_app/prom_ex.ex
defmodule MyApp.PromEx do
use PromEx, otp_app: :my_app
@impl true
def plugins do
[
PromEx.Plugins.Application,
PromEx.Plugins.Beam,
{PromEx.Plugins.Phoenix, router: MyAppWeb.Router, endpoint: MyAppWeb.Endpoint},
{PromEx.Plugins.Ecto, repos: [MyApp.Repo]},
PromEx.Plugins.Oban
]
end
@impl true
def dashboards do
[
{:prom_ex, "application.json"},
{:prom_ex, "beam.json"},
{:prom_ex, "phoenix.json"},
{:prom_ex, "ecto.json"}
]
end
end
```
### Full Observability Stack (BEAMOps)
From "Engineering Elixir Applications":
| Component | Role |
|-----------|------|
| **PromEx** | Expose BEAM/Phoenix/Ecto metrics as Prometheus endpoints |
| **Prometheus** | Scrape and store time-series metrics |
| **Grafana** | Dashboards and alerting |
| **Loki** | Log aggregation (like Prometheus but for logs) |
| **Promtail/Alloy** | Ship logs from containers to Loki |
**Health checks + automatic rollback:** a failing Docker `HEALTHCHECK` marks the container unhealthy; in Docker Swarm rolling updates this triggers automatic rollback, while with plain Docker or systemd the deploy script must check health itself and roll back on failure. Use `docker system prune` automation to prevent disk bloat.
---
## CI/CD — Cortex-Native Pipeline
Instead of GitHub Actions, we run CI/CD directly on cortex using shell scripts, systemd, and git hooks. This keeps the entire workflow within our ecosystem.
### Git Push Hook — Trigger on Push
On cortex, set up a bare repo with a `post-receive` hook:
```bash
# On cortex: create bare repo
mkdir -p /data/repos/my_app.git && cd /data/repos/my_app.git
git init --bare
# post-receive hook
cat > hooks/post-receive << 'HOOK'
#!/bin/bash
set -euo pipefail
WORK_DIR="/data/builds/my_app"
LOG="/var/log/ci/my_app-$(date +%Y%m%d-%H%M%S).log"
mkdir -p /var/log/ci "$WORK_DIR"
echo "=== CI triggered at $(date) ===" | tee "$LOG"
# Checkout latest
GIT_WORK_TREE="$WORK_DIR" git checkout -f main 2>&1 | tee -a "$LOG"
# Run CI pipeline
cd "$WORK_DIR"
exec /data/scripts/ci-pipeline.sh "$WORK_DIR" 2>&1 | tee -a "$LOG"
HOOK
chmod +x hooks/post-receive
```
### CI Pipeline Script
```bash
#!/bin/bash
# /data/scripts/ci-pipeline.sh
set -euo pipefail
APP_DIR="$1"
cd "$APP_DIR"
echo "--- Dependencies ---"
mix deps.get
echo "--- Compile (warnings as errors) ---"
mix compile --warnings-as-errors
echo "--- Format check ---"
mix format --check-formatted
echo "--- Static analysis ---"
mix credo --strict
echo "--- Tests ---"
MIX_ENV=test mix test
echo "--- Unused deps check ---"
mix deps.unlock --check-unused
echo "--- Build release ---"
MIX_ENV=prod mix release --overwrite
echo "=== CI PASSED ==="
# Optional: auto-deploy on success
# /data/scripts/deploy.sh "$APP_DIR"
```
### Deploy Script
```bash
#!/bin/bash
# /data/scripts/deploy.sh
set -euo pipefail
APP_DIR="$1"
APP_NAME=$(basename "$APP_DIR")
RELEASE_DIR="/data/releases/$APP_NAME"
echo "--- Deploying $APP_NAME ---"
# Stop current
systemctl stop "$APP_NAME" 2>/dev/null || true
# Back up the current release for rollback, then copy the new one
rm -rf "$RELEASE_DIR.prev"
[ -d "$RELEASE_DIR" ] && cp -r "$RELEASE_DIR" "$RELEASE_DIR.prev"
mkdir -p "$RELEASE_DIR"
cp -r "$APP_DIR/_build/prod/rel/$APP_NAME/"* "$RELEASE_DIR/"
# Run migrations
"$RELEASE_DIR/bin/migrate"
# Start
systemctl start "$APP_NAME"
# Health check with rollback
sleep 5
if ! curl -sf http://localhost:4000/health > /dev/null; then
echo "!!! Health check failed — rolling back"
systemctl stop "$APP_NAME"
# Restore previous release from backup
cp -r "$RELEASE_DIR.prev/"* "$RELEASE_DIR/"
systemctl start "$APP_NAME"
exit 1
fi
echo "=== Deploy SUCCESS ==="
```
### Systemd Service Template
```ini
# /etc/systemd/system/my_app.service
[Unit]
Description=MyApp Elixir Service
After=network.target postgresql.service
[Service]
Type=exec
User=deploy
Environment=MIX_ENV=prod
Environment=PORT=4000
Environment=PHX_HOST=myapp.hydrascale.net
EnvironmentFile=/data/releases/my_app/.env
ExecStart=/data/releases/my_app/bin/server
ExecStop=/data/releases/my_app/bin/my_app stop
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
```
### Push from Dev Machine
```bash
# Add cortex as a git remote
git remote add cortex root@cortex.hydrascale.net:/data/repos/my_app.git
# Push triggers CI → optional auto-deploy
git push cortex main
```
### Symbiont Integration
For orchestration via Symbiont, the CI can report status:
```bash
# At end of ci-pipeline.sh
curl -s -X POST http://localhost:8080/api/tasks/ci-report \
-H "Content-Type: application/json" \
-d "{\"app\": \"$APP_NAME\", \"status\": \"passed\", \"commit\": \"$(git rev-parse HEAD)\"}"
```
---
## Interop — Elixir as Orchestrator
### Python via Ports/Erlport
```elixir
# Using erlport for bidirectional Python calls
{:ok, pid} = :python.start([{:python_path, ~c"./python_scripts"}])
result = :python.call(pid, :my_module, :my_function, [arg1, arg2])
:python.stop(pid)
```
### Rust via Rustler NIFs
```elixir
# mix.exs
{:rustler, "~> 0.34"}
# lib/my_nif.ex
defmodule MyApp.NativeSort do
use Rustler, otp_app: :my_app, crate: "native_sort"
# NIF stubs — replaced at load time by Rust implementations
def sort(_list), do: :erlang.nif_error(:nif_not_loaded)
end
```
### System Commands via Ports
```elixir
# One-shot command (ffmpeg takes the output file positionally, not via -o)
{log, 0} = System.cmd("ffmpeg", ["-i", input_path, output_path], stderr_to_stdout: true)
# Long-running port
port = Port.open({:spawn, "python3 worker.py"}, [:binary, :exit_status])
send(port, {self(), {:command, "process\n"}})
receive do
{^port, {:data, data}} -> handle_response(data)
end
```
---
## Cortex Deployment Story
### Current State
- Cortex runs Ubuntu 24.04 with Caddy as web server
- Elixir is **not yet installed** — needs `asdf` or direct install
- Docker is **planned but not installed** — needed for containerized Elixir apps
- Symbiont (Python) is the current orchestrator — Elixir will gradually take over
### Installation Plan for Cortex
```bash
# Option 1: asdf (recommended — manages multiple versions)
git clone https://github.com/asdf-vm/asdf.git ~/.asdf --branch v0.14.0
echo '. "$HOME/.asdf/asdf.sh"' >> ~/.bashrc
asdf plugin add erlang
asdf plugin add elixir
asdf install erlang 27.3
asdf install elixir 1.19.5-otp-27
asdf global erlang 27.3
asdf global elixir 1.19.5-otp-27
# Option 2: Direct from Erlang Solutions
wget https://packages.erlang-solutions.com/erlang-solutions_2.0_all.deb
dpkg -i erlang-solutions_2.0_all.deb
apt-get update && apt-get install esl-erlang elixir
```
### BEAMOps Principles for Cortex
From "Engineering Elixir Applications" — the deployment philosophy:
1. **Environment Integrity** — identical builds dev/staging/prod via Docker + releases
2. **Infrastructure as Code** — Caddy config, systemd units, backup scripts all version-controlled
3. **OTP Releases** — self-contained, no runtime deps, `bin/my_app start`
4. **Distributed Erlang** — nodes discover each other, share state via PubSub, global registry
5. **Instrumentation** — PromEx + Prometheus + Grafana + Loki for full observability
6. **Health Checks + Rollbacks** — Docker health checks trigger automatic rollback on failed deploys
7. **Zero-downtime deploys** — rolling updates via Docker Swarm or `mix release` hot upgrades
---
## Anti-Patterns to Avoid
### Process Anti-Patterns
- **GenServer as code organization** — don't wrap pure functions in a GenServer. Use modules.
- **Agent for complex state** — if you need more than get/update, use GenServer directly.
- **Spawning unsupervised processes** — always use `Task.Supervisor` or link to a supervisor.
### Code Anti-Patterns
- **Primitive obsession** — use structs, not bare maps, for domain concepts.
- **Boolean parameters** — use atoms or keyword options: `format: :json` not `json: true`.
- **Large modules** — split by concern, not by entity type. Domain logic, web layer, workers.
- **String keys in internal maps** — use atoms internally, strings only at boundaries (JSON, forms).
### Design Anti-Patterns
- **Monolithic contexts** — Phoenix contexts should be small, focused. Split `Accounts` from `Authentication`.
- **God GenServer** — one process handling all state for the app. Distribute responsibility.
- **Synchronous calls to slow services** — use `Task.async` + `Task.await` with timeouts.
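For the last point, a bounded call can be sketched as follows (the `Process.sleep/1` stands in for any slow remote service call):

```elixir
# A deliberately slow operation standing in for a remote service call
task = Task.async(fn ->
  Process.sleep(10_000)
  :done
end)

# Wait up to 5s; shut the task down (collecting any late reply) if it overruns
case Task.yield(task, 5_000) || Task.shutdown(task) do
  {:ok, result} -> {:ok, result}
  nil -> {:error, :timeout}
end
```

`Task.yield/2` returns `nil` on timeout rather than raising like `Task.await/2`, which lets the caller decide whether a late result is still useful.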
---
## Quick Recipes
### HTTP Request with Req
```elixir
Req.get!("https://api.example.com/data",
headers: [{"authorization", "Bearer #{token}"}],
receive_timeout: 15_000
).body
```
### JSON Encode/Decode
```elixir
Jason.encode!(%{name: "Michael", role: :admin})
Jason.decode!(~s({"name": "Michael"}), keys: :atoms)
```
### Background Job with Oban
```elixir
defmodule MyApp.Workers.EmailWorker do
use Oban.Worker, queue: :mailers, max_attempts: 3
@impl true
def perform(%Oban.Job{args: %{"to" => to, "template" => template}}) do
MyApp.Mailer.send(to, template)
end
end
# Enqueue
%{to: "user@example.com", template: "welcome"}
|> MyApp.Workers.EmailWorker.new(schedule_in: 60)
|> Oban.insert!()
```
---
## Resources
- [Elixir Official Docs](https://hexdocs.pm/elixir/) — always check 1.19.5 version
- [Ash Framework Docs](https://hexdocs.pm/ash/) — resource-oriented patterns
- [Phoenix HexDocs](https://hexdocs.pm/phoenix/) — web framework
- [Jido Docs](https://hexdocs.pm/jido/) — multi-agent orchestration
- [Elixir Forum](https://elixirforum.com/) — community Q&A
- "Elixir in Action" by Saša Jurić — deep BEAM/OTP understanding (note: covers 1.15, check breaking changes)
- "Engineering Elixir Applications" by Fairholm & D'Lacoste — BEAMOps deployment patterns

---
name: symbiont
description: Living operational documentation for Symbiont, the self-sustaining AI orchestrator running on cortex.hydrascale.net. Built in Elixir/OTP. Load this skill to get instant context about the Symbiont project, understand architecture, check health, deploy code, or submit tasks. Covers everything from server access to API endpoints to cost tracking.
metadata:
project: symbiont
type: operational-documentation
runtime: elixir-otp
triggers:
- symbiont
- orchestrator
- deploy changes
- dispatcher
- router
- symbiont-ex-api
- elixir orchestrator
keywords:
- AI orchestration
- Claude Code CLI wrapper
- cost optimization
- infrastructure
- health checks
- elixir
- otp
- genserver
- plug
- bandit
- systemd
- ledger
---
## Project Overview
**Symbiont** is a self-sustaining AI orchestration system running on `cortex.hydrascale.net`, built in **Elixir/OTP**. It routes computational tasks to the cheapest capable Claude model tier via the Claude Code CLI, tracks costs in an append-only ledger, and manages a persistent task queue — all supervised by OTP for fault tolerance.
**Migrated from Python to Elixir in March 2026.** The Python version (FastAPI) has been retired. All orchestration now runs on the BEAM VM.
### The Partnership
- **Michael Dwyer** provides: infrastructure, legal identity, capital, and account ownership
- **The AI** provides: cognition, code, maintenance, and revenue generation
- **Revenue split**: ~50/50 after costs (token spend + server infrastructure)
This skill exists so that any fresh AI session wakes up with full context rather than starting from scratch.
---
**Server:** `cortex.hydrascale.net`
- Root SSH access available (paramiko)
- SSH key: look in `~/.ssh/cortex` in the mounted workspace, or `/sessions/*/mnt/uploads/cortex`
- Key passphrase: `42Awk!%@^#&`
- Project root: `/data/symbiont_ex/`
- Data directory: `/data/symbiont_ex/data/` (ledger.jsonl, queue.jsonl)
- Nightly backup: `rsync.net` at `de2613@de2613.rsync.net:cortex-backup/cortex/`
- **Runtime**: Elixir 1.19.5 / OTP 27 on BEAM VM
### Active Service (Systemd)
**`symbiont-ex-api.service`** — enabled, auto-starts on boot
- Elixir/OTP application via `mix run --no-halt`
- Plug + Bandit HTTP server on `0.0.0.0:8111`
- OTP supervision tree: Task.Supervisor → Ledger → Queue → Heartbeat → Bandit
- Built-in heartbeat (GenServer with 5-min timer) — no separate systemd timer needed
- Configuration: `Restart=always`, `RestartSec=5`
### Retired Services (Python — disabled)
- `symbiont-api.service` — FastAPI, was on port 8111 (now disabled)
- `symbiont-heartbeat.timer` — was a 5-min systemd timer (now disabled)
- Python code archived at `/data/symbiont/` (not deleted, just inactive)
### Health Check (from cortex shell)
```bash
systemctl status symbiont-ex-api --no-pager
curl -s http://127.0.0.1:8111/health | python3 -m json.tool
curl -s http://127.0.0.1:8111/status | python3 -m json.tool
curl -s http://127.0.0.1:8111/ledger/stats | python3 -m json.tool
```
---
## Architecture: The Elixir/OTP Stack
### Directory Structure
```
/data/symbiont_ex/
├── lib/
│ ├── symbiont.ex # Top-level module (version/0, runtime/0)
│ └── symbiont/
│ ├── application.ex # OTP Application — supervision tree
│ ├── api.ex # Plug router (HTTP endpoints)
│ ├── dispatcher.ex # Claude CLI wrapper via System.shell/2
│ ├── router.ex # Task classifier (Haiku-first routing)
│ ├── ledger.ex # GenServer — append-only JSONL cost log
│ ├── queue.ex # GenServer — persistent JSONL task queue
│ └── heartbeat.ex # GenServer — periodic health checks + queue processing
├── config/
│ ├── config.exs # Base config (port, data_dir, intervals)
│ ├── dev.exs # Dev overrides
│ ├── prod.exs # Prod overrides
│ ├── runtime.exs # Reads SYMBIONT_PORT, SYMBIONT_DATA_DIR env vars
│ └── test.exs # Test mode: port=0, cli="echo", heartbeat=24h
├── test/
│ ├── support/test_helpers.ex # safe_stop/1, stop_all_services/0
│ └── symbiont/ # 6 test files, 39 tests total
├── data/
│ ├── ledger.jsonl # Append-only cost log (immutable)
│ └── queue.jsonl # Persistent task queue
└── mix.exs # Project definition (Elixir ~> 1.19)
```
### Local Source Copy
The canonical source is also at: `/sessions/*/mnt/michaeldwyer/src/symbiont_ex/`
(This is the development copy used during Cowork sessions.)
### OTP Supervision Tree
```
Symbiont.Supervisor (rest_for_one)
├── Task.Supervisor — async task execution
├── Symbiont.Ledger — GenServer: append-only cost ledger
├── Symbiont.Queue — GenServer: persistent task queue
├── Symbiont.Heartbeat — GenServer: periodic health + queue processing (5-min timer)
└── Bandit — HTTP server (Plug adapter, port 8111)
```
**Strategy: `rest_for_one`** — if the Ledger crashes, everything downstream (Queue, Heartbeat, Bandit) restarts too, ensuring no calls are logged to a stale process.
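The tree above maps onto a standard `Application` callback. A sketch only (child options, in particular Bandit's, are assumed, not taken from the actual source):

```elixir
defmodule Symbiont.ApplicationSketch do
  # Illustrative version of the supervision tree described above
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: Symbiont.TaskSupervisor},
      Symbiont.Ledger,
      Symbiont.Queue,
      Symbiont.Heartbeat,
      {Bandit, plug: Symbiont.API, port: 8111}
    ]

    # rest_for_one: a crash restarts the failed child AND every child started after it
    Supervisor.start_link(children, strategy: :rest_for_one, name: Symbiont.Supervisor)
  end
end
```

Child order therefore encodes dependency order: the Ledger starts before everything that logs to it.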
### Core Components
#### 1. `Symbiont.Router` — Task Classification
- Calls Haiku via Dispatcher to classify incoming tasks
- Returns `{tier, confidence, reason}` — tier 1/2/3 maps to Haiku/Sonnet/Opus
- Falls back to default tier on classification failure
#### 2. `Symbiont.Dispatcher` — Model Execution
- Wraps Claude Code CLI via `System.shell/2` with `printf | claude` pipe pattern
- **Important**: `System.cmd/3` does NOT have an `:input` option — must use shell pipes
- Captures: model, tokens, timing, success/failure
- Logs every call to Ledger GenServer
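The pipe pattern can be sketched like this (illustrative only, not the actual `Symbiont.Dispatcher` source; the quoting assumes single quotes are the only shell metacharacters in the prompt):

```elixir
prompt = "Classify this feedback: 'great product'"

# Escape single quotes so the prompt survives shell single-quoting
escaped = String.replace(prompt, "'", "'\\''")

# System.shell/2 runs through /bin/sh, so the pipe works;
# System.cmd/3 has no :input option and cannot do this directly
{output, exit_code} =
  System.shell("printf '%s' '#{escaped}' | claude -p --model haiku --output-format json")
```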
#### 3. `Symbiont.Ledger` — Cost Tracking (GenServer)
- Append-only JSONL file at `data/ledger.jsonl`
- Provides `log_call/1`, `recent/1`, `stats/0`
- Stats aggregate by model, by date, with running totals
- Uses `Float.round/2` with float coercion (see AI Agent Lessons in elixir-guide)
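A minimal sketch of an append-only ledger GenServer in this shape (illustrative; only the function names above come from the source, the internals are assumed):

```elixir
defmodule LedgerSketch do
  use GenServer

  def start_link(opts),
    do: GenServer.start_link(__MODULE__, opts[:path] || "ledger.jsonl", name: __MODULE__)

  def log_call(entry), do: GenServer.cast(__MODULE__, {:log, entry})
  def recent(n \\ 50), do: GenServer.call(__MODULE__, {:recent, n})

  @impl true
  def init(path), do: {:ok, %{path: path, entries: []}}

  @impl true
  def handle_cast({:log, entry}, state) do
    # Coerce to float before Float.round/2 — it raises on integer input
    cost = Float.round(entry.cost * 1.0, 4)
    line = Jason.encode!(Map.put(entry, :cost, cost))
    # Append-only: never rewrite existing lines
    File.write!(state.path, line <> "\n", [:append])
    {:noreply, %{state | entries: [entry | state.entries]}}
  end

  @impl true
  def handle_call({:recent, n}, _from, state),
    do: {:reply, Enum.take(state.entries, n), state}
end
```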
#### 4. `Symbiont.Queue` — Task Queue (GenServer)
- Persistent JSONL at `data/queue.jsonl`
- States: pending → processing → done/failed
- `enqueue/1`, `take/1`, `complete/1`, `fail/1`
- Loaded from disk on startup
#### 5. `Symbiont.Heartbeat` — Health Monitor (GenServer)
- Internal 5-minute timer via `Process.send_after/3`
- Checks queue, processes pending tasks, logs health
- No external systemd timer needed (OTP handles scheduling)
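The self-scheduling timer pattern it relies on looks like this (a generic sketch, not the actual module):

```elixir
defmodule HeartbeatSketch do
  use GenServer

  @interval :timer.minutes(5)

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  @impl true
  def init(_) do
    schedule_tick()
    {:ok, nil}
  end

  @impl true
  def handle_info(:tick, state) do
    # check health / process queued tasks here, then re-arm the timer
    schedule_tick()
    {:noreply, state}
  end

  defp schedule_tick, do: Process.send_after(self(), :tick, @interval)
end
```

Re-arming inside `handle_info/2` (rather than using a repeating timer) guarantees ticks never overlap even when a cycle runs long.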
#### 6. `Symbiont.API` — HTTP Router (Plug)
- `POST /task` — execute immediately
- `POST /queue` — add to persistent queue
- `GET /status` — health, queue size, cost totals
- `GET /health` — simple health check
- `GET /ledger` — recent calls
- `GET /ledger/stats` — aggregate cost stats
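A Plug router exposing endpoints in this style can be sketched as follows (illustrative, not the actual `Symbiont.API` source):

```elixir
defmodule APISketch do
  use Plug.Router

  plug :match
  # Parsers must run before :dispatch so body_params is populated
  plug Plug.Parsers, parsers: [:json], json_decoder: Jason
  plug :dispatch

  get "/health" do
    send_resp(conn, 200, ~s({"runtime":"elixir/otp","status":"ok"}))
  end

  post "/task" do
    task = conn.body_params["task"]
    send_resp(conn, 200, Jason.encode!(%{accepted: task != nil}))
  end

  match _ do
    send_resp(conn, 404, "not found")
  end
end
```

Bandit serves this module directly as its `plug:` option; no Phoenix is involved.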
---
### Routing Logic
1. **Task arrives**`POST /task` or queue processing
2. **Router classifies** (via Haiku): confidence, reason, recommended tier
3. **Dispatcher routes** to cheapest capable tier
4. **Result + cost logged** to Ledger GenServer → `ledger.jsonl`
**Example routing:**
- "Summarize this email" → Haiku says Tier 1 capable → routes to **Haiku** (~$0.008)
- "Refactor this 500-line function" → Haiku says Tier 2 → routes to **Sonnet** (~$0.04)
- "Design a new consensus algorithm" → Haiku says Tier 3 → routes to **Opus** (~$0.15)
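The tier-to-model mapping reduces to a small pure function (hypothetical sketch; names are illustrative, the real logic lives in `Symbiont.Router`):

```elixir
defmodule TierMapSketch do
  @models %{1 => "haiku", 2 => "sonnet", 3 => "opus"}

  def model_for_tier(tier) when tier in 1..3, do: Map.fetch!(@models, tier)

  # Fall back to the cheapest tier when classification fails
  def model_for_tier(_), do: "haiku"
end
```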
---
## Dendrite Integration
Symbiont has web perception via **Dendrite**, a headless Chromium browser running on cortex.
### Dendrite endpoints (from cortex localhost or public URL)
| Endpoint | What it does |
|----------|-------------|
| `POST /fetch` | Fetch URL → markdown/text/html (full JS rendering) |
| `POST /screenshot` | Take screenshot → PNG bytes |
| `POST /execute` | Run JavaScript in page context |
| `POST /interact` | Click, type, scroll in a session |
| `POST /session` | Create persistent browser session |
| `GET /health` | Health check (no auth needed) |
### Connection details
- **Public URL**: `https://browser.hydrascale.net`
- **Internal**: `http://localhost:3000` (from cortex)
- **API Key**: `8dc5e8f7a02745ee8db90c94b2481fd9e1deeea1e2ce74420f54047859ea7edf`
- **Auth**: `X-API-Key` header on all endpoints except `/health`
For full Dendrite documentation, load the `dendrite` skill.
---
```json
{
"task": "Analyze this user feedback and extract sentiment",
"force_tier": "haiku"
}
```
"input_tokens": 45,
"output_tokens": 87,
"estimated_cost_usd": 0.0082,
"timestamp": "2026-03-20T14:33:12Z"
}
```
### `POST /queue`
Add a task to the persistent queue (executes on next heartbeat cycle).
**Request:**
```json
{
"task": "Run weekly subscriber report"
}
```
**Response:**
```json
{
"id": "queued-abc123",
"status": "queued",
"position": 3
}
```
### `GET /status`
Health check: API status, queue size, cost totals.
**Response:**
```json
{
"status": "healthy",
"runtime": "elixir/otp",
"queue_size": 0,
"last_heartbeat": "2026-03-20T20:15:26Z",
"total_calls": 2,
"total_cost_estimated_usd": 0.0006,
"by_model": {
"haiku": {"calls": 2, "cost": 0.0006}
}
}
```
### `GET /health`
Simple health check — lightweight, no stats computation.
**Response:**
```json
{"runtime": "elixir/otp", "status": "ok"}
```
### `GET /ledger`
Recent API calls (last 50 by default). Optional `?limit=N` parameter.
### `GET /ledger/stats`
Aggregate cost & usage over time, broken down by model and date.
---
## Calling the API
### Via curl (from cortex)
```bash
# Health check
curl -s http://127.0.0.1:8111/health
# Submit a task
curl -X POST http://127.0.0.1:8111/task \
-H "Content-Type: application/json" \
-d '{"task":"Summarize this email","force_tier":"haiku"}'
# Check stats
curl -s http://127.0.0.1:8111/ledger/stats | python3 -m json.tool
```
### Via Python (from Cowork session)
```python
import paramiko
# ... connect via paramiko (see cortex-server skill) ...
out, err = run(client, 'curl -s http://127.0.0.1:8111/status')
print(out)
```
---
## Ledger Format & Cost Tracking
Every inference call appends a JSONL entry to `data/ledger.jsonl`:
```json
{
"timestamp": "2026-03-20T14:32:15.123456Z",
"model": "haiku",
"success": true,
"elapsed_seconds": 1.8,
"input_tokens": 34,
"output_tokens": 156,
"estimated_cost_usd": 0.0003,
"prompt_preview": "Classify this customer feedback..."
}
```
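Given the entry shape above, the per-model and per-date totals that `/ledger/stats` reports can be recomputed offline. A minimal sketch, assuming only the fields shown in the example entry:

```python
import json
from collections import defaultdict

def ledger_totals(lines):
    """Sum estimated_cost_usd per model and per date from JSONL ledger lines."""
    by_model, by_date = defaultdict(float), defaultdict(float)
    for line in lines:
        entry = json.loads(line)
        by_model[entry["model"]] += entry["estimated_cost_usd"]
        by_date[entry["timestamp"][:10]] += entry["estimated_cost_usd"]  # YYYY-MM-DD prefix
    return dict(by_model), dict(by_date)

# Two illustrative ledger lines; against the real file, pass
# open('/data/symbiont_ex/data/ledger.jsonl') instead.
sample = [
    '{"timestamp": "2026-03-20T14:32:15Z", "model": "haiku", "estimated_cost_usd": 0.0003}',
    '{"timestamp": "2026-03-20T15:01:02Z", "model": "sonnet", "estimated_cost_usd": 0.0384}',
]
by_model, by_date = ledger_totals(sample)
print(by_model["haiku"], by_model["sonnet"])
```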
### Why Track "Estimated Cost" on Pro?
- Current token usage is covered by Claude Pro subscription
- Ledger tracks API-equivalent cost for planning
- When daily volume justifies it, can switch to direct API billing
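The underlying break-even arithmetic is a one-liner; a hedged sketch (the subscription price is an illustrative assumption, not a quoted figure):

```python
def api_is_cheaper(daily_api_equiv_usd: float, subscription_usd: float = 20.0) -> bool:
    """True while the 30-day API-equivalent spend stays under the flat fee."""
    return daily_api_equiv_usd * 30 < subscription_usd

print(api_is_cheaper(0.50))  # -> True  ($15/month equivalent)
print(api_is_cheaper(2.00))  # -> False ($60/month equivalent)
```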
---
## Deployment & Updates
### systemd Service File
```ini
# /etc/systemd/system/symbiont-ex-api.service
[Unit]
Description=Symbiont Elixir API
After=network.target
[Service]
Type=simple
WorkingDirectory=/data/symbiont_ex
Environment=HOME=/root
Environment=MIX_ENV=prod
Environment=SYMBIONT_PORT=8111
Environment=SYMBIONT_DATA_DIR=/data/symbiont_ex/data
ExecStart=/usr/bin/mix run --no-halt
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
```
**Critical**: `Environment=HOME=/root` is required — `mix` crashes without it.
### How to Deploy Code Changes
1. **Upload updated files** via SFTP to `/data/symbiont_ex/`
   ```python
   sftp = client.open_sftp()
   sftp.put('local/lib/symbiont/router.ex', '/data/symbiont_ex/lib/symbiont/router.ex')
   sftp.close()
   ```
2. **Restart the service**
   ```bash
   systemctl restart symbiont-ex-api
   ```
3. **Verify**
   ```bash
   systemctl status symbiont-ex-api --no-pager
   curl -s http://127.0.0.1:8111/health
   ```
### Running Tests
Tests run locally (in Cowork), not on cortex:
```bash
cd /path/to/symbiont_ex
mix test --trace
```
39 tests across 7 test files. Test mode uses `port: 0` (Bandit disabled), `claude_cli: "echo"`, and a 24-hour heartbeat interval.
### Nightly Backups
- Automatic rsync to `rsync.net` at `de2613@de2613.rsync.net:cortex-backup/cortex/`
- Includes: `/data/symbiont_ex/` (code + data); the retired Python archive at `/data/symbiont/` is also backed up
- Recovery: pull from backup on demand
---
## Configuration
### config/config.exs (defaults)
```elixir
config :symbiont,
port: 8111,
data_dir: "/data/symbiont_ex",
heartbeat_interval_ms: 5 * 60 * 1_000, # 5 minutes
max_queue_batch: 5,
default_tier: :haiku,
claude_cli: "claude"
```
### config/runtime.exs (env overrides)
```elixir
if port = System.get_env("SYMBIONT_PORT") do
config :symbiont, port: String.to_integer(port)
end
if data_dir = System.get_env("SYMBIONT_DATA_DIR") do
config :symbiont, data_dir: data_dir
end
```
### config/test.exs
```elixir
config :symbiont,
data_dir: "test/tmp",
port: 0, # Disables Bandit — empty supervisor
heartbeat_interval_ms: :timer.hours(24),
claude_cli: "echo" # Stubs CLI for testing
```
---
## Architecture Decisions & Rationale
1. **Elixir/OTP over Python** — Supervision trees provide automatic restart, fault isolation, and hot code loading. The BEAM VM is purpose-built for long-running services.
2. **`rest_for_one` supervision** — If the Ledger crashes, Queue and Heartbeat restart too, preventing stale state references.
3. **GenServer-based Heartbeat** — Built-in `Process.send_after` timer replaces the Python systemd timer. One fewer moving part, and the heartbeat shares process state with the app.
4. **Haiku-first routing** — Classifying with the cheapest model ensures we never overpay. A 10% misclassification rate costs less than always going straight to Sonnet.
5. **Append-only JSONL Ledger** — Immutable. Useful for cost forecasting, debugging, and audit trails.
6. **`System.shell/2` for CLI** — `System.cmd/3` has no stdin support. Shell pipes via `printf '%s' '...' | claude` are the reliable pattern.
7. **Empty supervisor in test mode** — Setting port=0 starts an empty supervisor, preventing GenServer conflicts during test setup/teardown.
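Decision 4 (haiku-first routing) reduces to a few lines. A sketch with illustrative names (not the actual `Symbiont.Router` API), where the classifier argument stands in for one cheap haiku call:

```python
def route(task: str, classify) -> str:
    """classify(task) -> 'simple' | 'complex'; returns the tier to dispatch to."""
    return "sonnet" if classify(task) == "complex" else "haiku"

# Stub classifiers standing in for the real haiku call:
print(route("Summarize this email", lambda t: "simple"))      # -> haiku
print(route("Design a migration plan", lambda t: "complex"))  # -> sonnet
```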
---
## Next Steps & Future Work
- [ ] Build OTP release (no mix dependency in prod)
- [ ] Implement first revenue service (content-as-a-service pilot)
- [ ] Add webhook notifications (task completion, rate limits)
- [ ] Dashboard UI (Phoenix LiveView) for monitoring costs + queue
- [ ] Distributed Erlang: run multiple BEAM nodes with shared state
- [ ] Hot code upgrades via OTP releases
- [ ] Engram integration (cross-session memory) ported to Elixir
---
## Quick Links & Key Files
| What | Location | Purpose |
|------|----------|---------|
| Application | `/data/symbiont_ex/lib/symbiont/application.ex` | OTP supervision tree |
| Router | `/data/symbiont_ex/lib/symbiont/router.ex` | Task classification |
| Dispatcher | `/data/symbiont_ex/lib/symbiont/dispatcher.ex` | Claude CLI wrapper |
| API | `/data/symbiont_ex/lib/symbiont/api.ex` | Plug HTTP endpoints |
| Ledger | `/data/symbiont_ex/lib/symbiont/ledger.ex` | GenServer cost log |
| Queue | `/data/symbiont_ex/lib/symbiont/queue.ex` | GenServer task queue |
| Heartbeat | `/data/symbiont_ex/lib/symbiont/heartbeat.ex` | GenServer health monitor |
| Ledger data | `/data/symbiont_ex/data/ledger.jsonl` | Cost log (immutable) |
| Queue data | `/data/symbiont_ex/data/queue.jsonl` | Pending tasks |
| Service file | `/etc/systemd/system/symbiont-ex-api.service` | systemd unit |
| Tests | `/data/symbiont_ex/test/symbiont/` | 39 tests, 7 files |
| Python archive | `/data/symbiont/` | Retired Python version |
---
## Skills Infrastructure
Symbiont also manages a **canonical skills repository** on cortex that serves as the source of truth for all Cowork skills.
### Location
- Git repo: `/data/skills/` on cortex
- Packaged skills: `/data/skills/dist/*.skill`
- Live download URL: `https://cortex.hydrascale.net/skills/<name>.skill`
### Current skills hosted
| Skill | Download |
|-------|---------|
| symbiont | https://cortex.hydrascale.net/skills/symbiont.skill |
| cortex-server | https://cortex.hydrascale.net/skills/cortex-server.skill |
### How it works
- Every SKILL.md lives in `/data/skills/<name>/SKILL.md`
- `package_all.sh` zips each skill directory into a `.skill` file in `/data/skills/dist/`
- Caddy serves `/data/skills/dist/` at `https://cortex.hydrascale.net/skills/`
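What `package_all.sh` does can be approximated in a few lines. A sketch under the layout above (the real script may differ in details such as exclusions), demonstrated against a temp dir rather than `/data/skills`:

```python
import pathlib
import tempfile
import zipfile

def package_all(skills_root: pathlib.Path) -> list[str]:
    """Zip every <name>/ dir containing a SKILL.md into dist/<name>.skill."""
    dist = skills_root / "dist"
    dist.mkdir(exist_ok=True)
    packaged = []
    for skill_dir in sorted(skills_root.iterdir()):
        if not (skill_dir / "SKILL.md").is_file():
            continue  # skip dist/ and anything that isn't a skill directory
        out = dist / f"{skill_dir.name}.skill"
        with zipfile.ZipFile(out, "w") as zf:
            for f in skill_dir.rglob("*"):
                if f.is_file():
                    zf.write(f, f.relative_to(skills_root))
        packaged.append(out.name)
    return packaged

# Demo against a throwaway tree:
root = pathlib.Path(tempfile.mkdtemp())
(root / "symbiont").mkdir()
(root / "symbiont" / "SKILL.md").write_text("# symbiont\n")
print(package_all(root))  # -> ['symbiont.skill']
```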
### Updating a skill
Edit the SKILL.md directly on cortex:
```bash
nano /data/skills/<skill-name>/SKILL.md
# Force immediate packaging:
bash /data/skills/package_all.sh
```
---
## Troubleshooting
### Service Not Starting
```bash
systemctl status symbiont-ex-api --no-pager
journalctl -u symbiont-ex-api -n 50 -f
```
Common issues:
- Missing `HOME=/root` in service file
- Port conflict (check `ss -tlnp | grep 8111`)
- Mix deps not compiled (`cd /data/symbiont_ex && mix deps.get && mix compile`)
### Checking BEAM Health
```bash
# Is the BEAM process running?
pgrep -a beam.smp
# Memory usage
ps aux | grep beam.smp | grep -v grep
```
### Queue Not Processing
```bash
# Check via API
curl -s http://127.0.0.1:8111/status | python3 -m json.tool

# Check queue file directly
python3 -m json.tool --json-lines < /data/symbiont_ex/data/queue.jsonl

# Check heartbeat logs
journalctl -u symbiont-ex-api --no-pager | grep Heartbeat | tail -10
```

Expected: `"status": "healthy"` plus a recent heartbeat timestamp.
### Disk Space

```bash
du -sh /data/symbiont_ex/data/ledger.jsonl
```

The ledger grows by one JSON line per inference call; archive old entries if it gets large.
---
### Revenue Model
**Current:** ~50/50 split after costs
- Costs: token spend (tracked in ledger) + server infrastructure
- Revenue: TBD (in design phase)
- Content-as-a-service (AI-generated reports, analysis)
- Micro-SaaS API (white-label task routing for other teams)
### Current Spend
- **~$0/month** (covered by Claude Pro)
- Ledger shows "virtual cost" for planning purposes
- Once volume justifies it, switch to direct API billing and realize the savings
---
## Contact & Governance
**Revenue Account:** Claude Pro (Michael's account)
**Partnership:** 50/50 split after costs
Questions? Check the API `/status` and `/ledger/stats` endpoints — they'll tell you what's happening right now.