defmodule Symbiont.Engram do @moduledoc """ Engram: Persistent memory across Claude instances. An engram is the physical trace a memory leaves in neural tissue. Every Claude session (Cowork, Claude Code, Desktop) writes its engrams here on startup, building a shared memory of what's being worked on across the ecosystem. Two-tier memory model: - world_state: A singleton markdown document updated at the end of any session that changes things. Read at the start of every session. - session_logs: Detailed logs available via get_session_logs/2 only when needed. SQLite with WAL mode handles concurrent readers cleanly. Each session writes only its own rows, so writer contention is minimal. ## Usage # Get situation report (call at start of every session) Symbiont.Engram.sitrep() # Register a session {:ok, sid} = Symbiont.Engram.register("cowork", "Building the Elixir port") # Log progress :ok = Symbiont.Engram.log(sid, "Finished router module") # Claim a resource :ok = Symbiont.Engram.lock_resource(sid, "/data/symbiont_ex/lib/symbiont/router.ex") # Check locks before modifying a file locks = Symbiont.Engram.check_locks("/data/symbiont_ex/lib/symbiont/router.ex") # Heartbeat (call periodically on long sessions) :ok = Symbiont.Engram.heartbeat(sid, "Still working, 60% done") # Update world state before completing :ok = Symbiont.Engram.set_world_state("Updated content", sid) # Done :ok = Symbiont.Engram.complete(sid, "Finished Elixir port. Tests passing.") """ use GenServer require Logger @default_db_path "/data/symbiont/engram.db" @stale_threshold_minutes 30 # ── Client API ────────────────────────────────────────────────────────── def start_link(opts \\ []) do db_path = Keyword.get(opts, :db_path, @default_db_path) GenServer.start_link(__MODULE__, db_path, name: Keyword.get(opts, :name, __MODULE__)) end @doc "Get the current world state markdown." def get_world_state(server \\ __MODULE__) do GenServer.call(server, :get_world_state) end @doc "Replace the world state. Called at the end of any session that changes things." def set_world_state(content, updated_by \\ nil, server \\ __MODULE__) do GenServer.call(server, {:set_world_state, content, updated_by}) end @doc "Register a new session. Returns `{:ok, session_id}`." def register(session_type, summary, metadata \\ nil, server \\ __MODULE__) do GenServer.call(server, {:register, session_type, summary, metadata}) end @doc "Update heartbeat timestamp and optionally update summary." def heartbeat(session_id, summary \\ nil, server \\ __MODULE__) do GenServer.call(server, {:heartbeat, session_id, summary}) end @doc "Mark session as completed with a summary of what was accomplished." def complete(session_id, completion_summary, server \\ __MODULE__) do GenServer.call(server, {:complete, session_id, completion_summary}) end @doc "Log a progress entry for a session." def log(session_id, entry, server \\ __MODULE__) do GenServer.call(server, {:log, session_id, entry}) end @doc "Get all currently active sessions." def get_active_sessions(server \\ __MODULE__) do GenServer.call(server, :get_active_sessions) end @doc "Get recently completed sessions for context." def get_recent_sessions(hours \\ 24, server \\ __MODULE__) do GenServer.call(server, {:get_recent_sessions, hours}) end @doc "Get log entries for a specific session." def get_session_logs(session_id, limit \\ 20, server \\ __MODULE__) do GenServer.call(server, {:get_session_logs, session_id, limit}) end @doc "Claim a resource lock. Warns if already locked by another session." def lock_resource(session_id, resource, note \\ nil, server \\ __MODULE__) do GenServer.call(server, {:lock_resource, session_id, resource, note}) end @doc "Release a resource lock." def release_resource(session_id, resource, server \\ __MODULE__) do GenServer.call(server, {:release_resource, session_id, resource}) end @doc "Check who has locks on a resource." def check_locks(resource, server \\ __MODULE__) do GenServer.call(server, {:check_locks, resource}) end @doc """ Context-safe situation report. Designed to fit in any Claude context window. Returns world state + active session one-liners + locked resources. """ def sitrep(server \\ __MODULE__) do GenServer.call(server, :sitrep) end @doc "Get a single session by ID." def get_session(session_id, server \\ __MODULE__) do GenServer.call(server, {:get_session, session_id}) end @doc "Get summary stats: total sessions, active count, log count." def stats(server \\ __MODULE__) do GenServer.call(server, :stats) end # ── GenServer Callbacks ───────────────────────────────────────────────── @impl true def init(db_path) do db_path |> Path.dirname() |> File.mkdir_p!() {:ok, conn} = Exqlite.Sqlite3.open(db_path) :ok = exec(conn, "PRAGMA journal_mode=WAL") :ok = exec(conn, "PRAGMA busy_timeout=5000") init_tables(conn) Logger.info("Engram started — db: #{db_path}") {:ok, %{conn: conn, db_path: db_path}} end @impl true def handle_call(:get_world_state, _from, %{conn: conn} = state) do result = case query_one(conn, "SELECT content FROM world_state WHERE id=1") do {:ok, [content]} -> content _ -> "" end {:reply, result, state} end def handle_call({:set_world_state, content, updated_by}, _from, %{conn: conn} = state) do now = now_iso() :ok = exec( conn, "INSERT INTO world_state (id, updated_at, updated_by, content) VALUES (1, ?1, ?2, ?3) " <> "ON CONFLICT(id) DO UPDATE SET updated_at=excluded.updated_at, " <> "updated_by=excluded.updated_by, content=excluded.content", [now, updated_by, content] ) {:reply, :ok, state} end def handle_call({:register, session_type, summary, metadata}, _from, %{conn: conn} = state) do sid = generate_session_id() now = now_iso() :ok = exec( conn, "INSERT INTO sessions (id, session_type, summary, status, started_at, last_heartbeat, metadata) " <> "VALUES (?1, ?2, ?3, 'active', ?4, ?5, ?6)", [sid, session_type, summary, now, now, metadata] ) Logger.info("Engram session registered: #{sid} (#{session_type}) — #{summary}") {:reply, {:ok, sid}, state} end def handle_call({:heartbeat, session_id, summary}, _from, %{conn: conn} = state) do now = now_iso() if summary do exec( conn, "UPDATE sessions SET last_heartbeat=?1, summary=?2, status='active' WHERE id=?3", [now, summary, session_id] ) else exec( conn, "UPDATE sessions SET last_heartbeat=?1, status='active' WHERE id=?2", [now, session_id] ) end {:reply, :ok, state} end def handle_call({:complete, session_id, completion_summary}, _from, %{conn: conn} = state) do now = now_iso() exec( conn, "UPDATE sessions SET status='completed', completed_at=?1, completion_summary=?2 WHERE id=?3", [now, completion_summary, session_id] ) exec(conn, "DELETE FROM resource_locks WHERE session_id=?1", [session_id]) Logger.info("Engram session completed: #{session_id}") {:reply, :ok, state} end def handle_call({:log, session_id, entry}, _from, %{conn: conn} = state) do now = now_iso() exec( conn, "INSERT INTO session_logs (session_id, timestamp, entry) VALUES (?1, ?2, ?3)", [session_id, now, entry] ) {:reply, :ok, state} end def handle_call(:get_active_sessions, _from, %{conn: conn} = state) do {:ok, rows} = query_all( conn, "SELECT id, session_type, summary, status, started_at, last_heartbeat, completed_at, completion_summary, metadata " <> "FROM sessions WHERE status='active' ORDER BY last_heartbeat DESC" ) sessions = Enum.map(rows, &row_to_session/1) {:reply, sessions, state} end def handle_call({:get_recent_sessions, hours}, _from, %{conn: conn} = state) do cutoff = DateTime.utc_now() |> DateTime.add(-hours * 3600, :second) |> DateTime.to_iso8601() {:ok, rows} = query_all( conn, "SELECT id, session_type, summary, status, started_at, last_heartbeat, completed_at, completion_summary, metadata " <> "FROM sessions WHERE status='completed' AND completed_at > ?1 ORDER BY completed_at DESC", [cutoff] ) sessions = Enum.map(rows, &row_to_session/1) {:reply, sessions, state} end def handle_call({:get_session_logs, session_id, limit}, _from, %{conn: conn} = state) do {:ok, rows} = query_all( conn, "SELECT id, session_id, timestamp, entry FROM session_logs " <> "WHERE session_id=?1 ORDER BY timestamp DESC LIMIT ?2", [session_id, limit] ) logs = Enum.map(rows, fn [id, sid, ts, entry] -> %{id: id, session_id: sid, timestamp: ts, entry: entry} end) {:reply, logs, state} end def handle_call({:lock_resource, session_id, resource, note}, _from, %{conn: conn} = state) do # Check for existing locks by other sessions {:ok, existing} = query_all( conn, "SELECT rl.session_id FROM resource_locks rl " <> "JOIN sessions s ON rl.session_id = s.id " <> "WHERE rl.resource=?1 AND rl.session_id != ?2 AND s.status='active'", [resource, session_id] ) if existing != [] do holders = Enum.map(existing, fn [sid] -> sid end) Logger.warning("Engram: resource '#{resource}' already locked by: #{Enum.join(holders, ", ")}") end now = now_iso() exec( conn, "INSERT OR REPLACE INTO resource_locks (resource, session_id, locked_at, note) " <> "VALUES (?1, ?2, ?3, ?4)", [resource, session_id, now, note] ) {:reply, :ok, state} end def handle_call({:release_resource, session_id, resource}, _from, %{conn: conn} = state) do exec( conn, "DELETE FROM resource_locks WHERE resource=?1 AND session_id=?2", [resource, session_id] ) {:reply, :ok, state} end def handle_call({:check_locks, resource}, _from, %{conn: conn} = state) do {:ok, rows} = query_all( conn, "SELECT rl.resource, rl.session_id, rl.locked_at, rl.note, s.summary, s.session_type " <> "FROM resource_locks rl JOIN sessions s ON rl.session_id = s.id WHERE rl.resource=?1", [resource] ) locks = Enum.map(rows, fn [resource, sid, locked_at, note, summary, stype] -> %{ resource: resource, session_id: sid, locked_at: locked_at, note: note, summary: summary, session_type: stype } end) {:reply, locks, state} end def handle_call(:sitrep, _from, %{conn: conn} = state) do lines = [] # World state world = case query_one(conn, "SELECT content FROM world_state WHERE id=1") do {:ok, [content]} -> content _ -> nil end lines = if world, do: lines ++ [world, ""], else: lines # Active sessions {:ok, active_rows} = query_all( conn, "SELECT id, session_type, summary, last_heartbeat FROM sessions " <> "WHERE status='active' ORDER BY last_heartbeat DESC" ) lines = if active_rows != [] do header = "**Active sessions (#{length(active_rows)}):**" session_lines = Enum.map(active_rows, fn [id, stype, summary, last_hb] -> stale = is_stale?(last_hb) marker = if stale, do: "⚠ stale", else: "●" truncated = String.slice(summary || "", 0, 100) "- #{marker} `#{id}` (#{stype}): #{truncated}" end) lines ++ [header | session_lines] ++ [""] else lines end # Resource locks {:ok, lock_rows} = query_all( conn, "SELECT rl.resource, rl.session_id FROM resource_locks rl " <> "JOIN sessions s ON rl.session_id = s.id WHERE s.status='active'" ) lines = if lock_rows != [] do lock_lines = Enum.map(lock_rows, fn [resource, sid] -> "- `#{resource}` by `#{sid}`" end) lines ++ ["**Locked resources:**" | lock_lines] ++ [""] else lines end report = if lines == [] do "No world state set yet. Call `Symbiont.Engram.set_world_state/2` to initialize." else Enum.join(lines, "\n") end {:reply, report, state} end def handle_call({:get_session, session_id}, _from, %{conn: conn} = state) do result = case query_one( conn, "SELECT id, session_type, summary, status, started_at, last_heartbeat, completed_at, completion_summary, metadata " <> "FROM sessions WHERE id=?1", [session_id] ) do {:ok, row} -> {:ok, row_to_session(row)} :empty -> {:error, :not_found} end {:reply, result, state} end def handle_call(:stats, _from, %{conn: conn} = state) do {:ok, [total]} = query_one(conn, "SELECT COUNT(*) FROM sessions") {:ok, [active]} = query_one(conn, "SELECT COUNT(*) FROM sessions WHERE status='active'") {:ok, [completed]} = query_one(conn, "SELECT COUNT(*) FROM sessions WHERE status='completed'") {:ok, [logs]} = query_one(conn, "SELECT COUNT(*) FROM session_logs") {:ok, [locks]} = query_one(conn, "SELECT COUNT(*) FROM resource_locks") {:reply, %{ total_sessions: total, active_sessions: active, completed_sessions: completed, total_logs: logs, active_locks: locks }, state} end @impl true def terminate(_reason, %{conn: conn}) do Exqlite.Sqlite3.close(conn) :ok end # ── Private Helpers ───────────────────────────────────────────────────── defp init_tables(conn) do exec(conn, """ CREATE TABLE IF NOT EXISTS world_state ( id INTEGER PRIMARY KEY CHECK (id = 1), updated_at TEXT NOT NULL, updated_by TEXT, content TEXT NOT NULL ) """) exec(conn, """ CREATE TABLE IF NOT EXISTS sessions ( id TEXT PRIMARY KEY, session_type TEXT NOT NULL, summary TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'active', started_at TEXT NOT NULL, last_heartbeat TEXT NOT NULL, completed_at TEXT, completion_summary TEXT, metadata TEXT ) """) exec(conn, """ CREATE TABLE IF NOT EXISTS session_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL REFERENCES sessions(id), timestamp TEXT NOT NULL, entry TEXT NOT NULL ) """) exec(conn, """ CREATE TABLE IF NOT EXISTS resource_locks ( resource TEXT NOT NULL, session_id TEXT NOT NULL REFERENCES sessions(id), locked_at TEXT NOT NULL, note TEXT, PRIMARY KEY (resource, session_id) ) """) exec(conn, "CREATE INDEX IF NOT EXISTS idx_sessions_status ON sessions(status)") exec(conn, "CREATE INDEX IF NOT EXISTS idx_logs_session ON session_logs(session_id)") exec(conn, "CREATE INDEX IF NOT EXISTS idx_locks_resource ON resource_locks(resource)") end defp generate_session_id do now = NaiveDateTime.local_now() hex = :crypto.strong_rand_bytes(4) |> Base.encode16(case: :lower) now |> NaiveDateTime.to_iso8601() |> String.replace(~r/[T:\-]/, "") |> String.slice(0, 14) |> Kernel.<>("-#{hex}") end defp now_iso do NaiveDateTime.local_now() |> NaiveDateTime.to_iso8601() end defp is_stale?(last_heartbeat) do case NaiveDateTime.from_iso8601(last_heartbeat) do {:ok, dt} -> NaiveDateTime.diff(NaiveDateTime.local_now(), dt, :minute) > @stale_threshold_minutes _ -> false end end defp row_to_session([id, stype, summary, status, started, last_hb, completed, comp_summary, metadata]) do %{ id: id, session_type: stype, summary: summary, status: status, started_at: started, last_heartbeat: last_hb, completed_at: completed, completion_summary: comp_summary, metadata: metadata } end # ── SQLite Helpers ────────────────────────────────────────────────────── defp exec(conn, sql, params \\ []) do {:ok, stmt} = Exqlite.Sqlite3.prepare(conn, sql) if params != [] do :ok = Exqlite.Sqlite3.bind(stmt, params) end case Exqlite.Sqlite3.step(conn, stmt) do :done -> :ok {:row, _} -> :ok {:error, reason} -> {:error, reason} end after :ok end defp query_one(conn, sql, params \\ []) do {:ok, stmt} = Exqlite.Sqlite3.prepare(conn, sql) if params != [] do :ok = Exqlite.Sqlite3.bind(stmt, params) end case Exqlite.Sqlite3.step(conn, stmt) do {:row, row} -> {:ok, row} :done -> :empty {:error, reason} -> {:error, reason} end end defp query_all(conn, sql, params \\ []) do {:ok, stmt} = Exqlite.Sqlite3.prepare(conn, sql) if params != [] do :ok = Exqlite.Sqlite3.bind(stmt, params) end rows = collect_rows(conn, stmt, []) {:ok, rows} end defp collect_rows(conn, stmt, acc) do case Exqlite.Sqlite3.step(conn, stmt) do {:row, row} -> collect_rows(conn, stmt, [row | acc]) :done -> Enum.reverse(acc) {:error, _reason} -> Enum.reverse(acc) end end end