symbiont_ex/lib/symbiont/engram.ex

619 lines
18 KiB
Elixir

defmodule Symbiont.Engram do
@moduledoc """
Engram: Persistent memory across Claude instances.

An engram is the physical trace a memory leaves in neural tissue. Every Claude
session (Cowork, Claude Code, Desktop) writes its engrams here on startup,
building a shared memory of what's being worked on across the ecosystem.

Two-tier memory model:

  * `world_state`: A singleton markdown document updated at the end of any
    session that changes things. Read at the start of every session.
  * `session_logs`: Detailed logs available via `get_session_logs/2` only when
    needed.

SQLite with WAL mode handles concurrent readers cleanly. Each session writes
only its own rows, so writer contention is minimal.

## Usage

    # Get situation report (call at start of every session)
    Symbiont.Engram.sitrep()

    # Register a session
    {:ok, sid} = Symbiont.Engram.register("cowork", "Building the Elixir port")

    # Log progress
    :ok = Symbiont.Engram.log(sid, "Finished router module")

    # Claim a resource
    :ok = Symbiont.Engram.lock_resource(sid, "/data/symbiont_ex/lib/symbiont/router.ex")

    # Check locks before modifying a file
    locks = Symbiont.Engram.check_locks("/data/symbiont_ex/lib/symbiont/router.ex")

    # Heartbeat (call periodically on long sessions)
    :ok = Symbiont.Engram.heartbeat(sid, "Still working, 60% done")

    # Update world state before completing
    :ok = Symbiont.Engram.set_world_state("Updated content", sid)

    # Done
    :ok = Symbiont.Engram.complete(sid, "Finished Elixir port. Tests passing.")
"""
use GenServer
require Logger
@default_db_path "/data/symbiont/engram.db"
@stale_threshold_minutes 30
# ── Client API ──────────────────────────────────────────────────────────
@doc """
Starts the Engram server linked to the calling process.

## Options

  * `:db_path` - location of the SQLite database file; falls back to the
    module's default path when omitted.
  * `:name` - name the process is registered under; falls back to this module.
"""
def start_link(opts \\ []) do
  path = Keyword.get(opts, :db_path, @default_db_path)
  registered_name = Keyword.get(opts, :name, __MODULE__)

  GenServer.start_link(__MODULE__, path, name: registered_name)
end
@doc """
Fetches the current world-state markdown from `server`.

The server replies with an empty string when no world state has been stored.
"""
def get_world_state(server \\ __MODULE__), do: GenServer.call(server, :get_world_state)
@doc """
Overwrites the world state with `content`.

Intended to be called at the end of any session that changes things.
`updated_by` is stored alongside the document (for example a session id) and
may be `nil`.
"""
def set_world_state(content, updated_by \\ nil, server \\ __MODULE__),
  do: GenServer.call(server, {:set_world_state, content, updated_by})
@doc """
Registers a new session of the given `session_type` with a short `summary`.

`metadata` is optional and stored as given. The server replies with
`{:ok, session_id}`.
"""
def register(session_type, summary, metadata \\ nil, server \\ __MODULE__),
  do: GenServer.call(server, {:register, session_type, summary, metadata})
@doc """
Refreshes the heartbeat timestamp of `session_id`.

When `summary` is given (non-`nil`) the session summary is replaced as well.
"""
def heartbeat(session_id, summary \\ nil, server \\ __MODULE__),
  do: GenServer.call(server, {:heartbeat, session_id, summary})
@doc """
Marks `session_id` as completed, recording `completion_summary` as the
description of what was accomplished.
"""
def complete(session_id, completion_summary, server \\ __MODULE__),
  do: GenServer.call(server, {:complete, session_id, completion_summary})
@doc """
Appends the progress `entry` to the log of `session_id`.
"""
def log(session_id, entry, server \\ __MODULE__),
  do: GenServer.call(server, {:log, session_id, entry})
@doc """
Lists every session whose status is currently active.
"""
def get_active_sessions(server \\ __MODULE__), do: GenServer.call(server, :get_active_sessions)
@doc """
Lists sessions completed within the last `hours` hours (24 by default), for
context on recent work.
"""
def get_recent_sessions(hours \\ 24, server \\ __MODULE__),
  do: GenServer.call(server, {:get_recent_sessions, hours})
@doc """
Fetches log entries written by `session_id`, returning at most `limit`
entries (20 by default).
"""
def get_session_logs(session_id, limit \\ 20, server \\ __MODULE__),
  do: GenServer.call(server, {:get_session_logs, session_id, limit})
@doc """
Claims a lock on `resource` for `session_id`, with an optional free-form `note`.

If another active session already holds a lock on the same resource the server
logs a warning; the lock is still recorded.
"""
def lock_resource(session_id, resource, note \\ nil, server \\ __MODULE__),
  do: GenServer.call(server, {:lock_resource, session_id, resource, note})
@doc """
Releases the lock that `session_id` holds on `resource`.
"""
def release_resource(session_id, resource, server \\ __MODULE__),
  do: GenServer.call(server, {:release_resource, session_id, resource})
@doc """
Reports which sessions currently hold a lock on `resource`.
"""
def check_locks(resource, server \\ __MODULE__),
  do: GenServer.call(server, {:check_locks, resource})
@doc """
Produces a compact situation report sized to fit in any Claude context window.

The report combines the world state, a one-line entry per active session and
the list of locked resources.
"""
def sitrep(server \\ __MODULE__), do: GenServer.call(server, :sitrep)
@doc """
Looks up one session by its id.

The server replies `{:ok, session}` when the session exists and
`{:error, :not_found}` otherwise.
"""
def get_session(session_id, server \\ __MODULE__),
  do: GenServer.call(server, {:get_session, session_id})
@doc """
Returns summary counters: total, active and completed sessions, plus the
number of log entries and resource locks.
"""
def stats(server \\ __MODULE__), do: GenServer.call(server, :stats)
# ── GenServer Callbacks ─────────────────────────────────────────────────
@impl true
def init(db_path) do
  # The database file lives in a directory that may not exist yet; create it
  # (and any missing parents) before opening the connection.
  File.mkdir_p!(Path.dirname(db_path))

  {:ok, conn} = Exqlite.Sqlite3.open(db_path)

  # Switch to WAL journaling and set a 5000 ms busy timeout; each pragma must
  # succeed or startup fails.
  for pragma <- ["PRAGMA journal_mode=WAL", "PRAGMA busy_timeout=5000"] do
    :ok = exec(conn, pragma)
  end

  init_tables(conn)
  Logger.info("Engram started — db: #{db_path}")

  {:ok, %{conn: conn, db_path: db_path}}
end
@impl true
def handle_call(:get_world_state, _from, %{conn: conn} = state) do
  # The world state is the single row with id=1; any other query outcome is
  # reported as an empty document.
  reply =
    with {:ok, [content]} <- query_one(conn, "SELECT content FROM world_state WHERE id=1") do
      content
    else
      _ -> ""
    end

  {:reply, reply, state}
end
def handle_call({:set_world_state, content, updated_by}, _from, %{conn: conn} = state) do
  # Upsert into the singleton row (id=1): insert on first use, otherwise
  # overwrite timestamp, author and content in place.
  upsert_sql =
    "INSERT INTO world_state (id, updated_at, updated_by, content) VALUES (1, ?1, ?2, ?3) " <>
      "ON CONFLICT(id) DO UPDATE SET updated_at=excluded.updated_at, " <>
      "updated_by=excluded.updated_by, content=excluded.content"

  :ok = exec(conn, upsert_sql, [now_iso(), updated_by, content])

  {:reply, :ok, state}
end
def handle_call({:register, session_type, summary, metadata}, _from, %{conn: conn} = state) do
  session_id = generate_session_id()
  # started_at and last_heartbeat begin as the same instant.
  registered_at = now_iso()

  insert_sql =
    "INSERT INTO sessions (id, session_type, summary, status, started_at, last_heartbeat, metadata) " <>
      "VALUES (?1, ?2, ?3, 'active', ?4, ?5, ?6)"

  row = [session_id, session_type, summary, registered_at, registered_at, metadata]
  :ok = exec(conn, insert_sql, row)

  Logger.info("Engram session registered: #{session_id} (#{session_type}) — #{summary}")

  {:reply, {:ok, session_id}, state}
end
def handle_call({:heartbeat, session_id, summary}, _from, %{conn: conn} = state) do
  timestamp = now_iso()

  # Choose the statement up front: the summary column is only touched when the
  # caller supplied one. Both variants also set status back to 'active'.
  # NOTE(review): this reactivates a session even if it was already marked
  # completed — confirm that is intended.
  {sql, params} =
    if summary do
      {"UPDATE sessions SET last_heartbeat=?1, summary=?2, status='active' WHERE id=?3",
       [timestamp, summary, session_id]}
    else
      {"UPDATE sessions SET last_heartbeat=?1, status='active' WHERE id=?2",
       [timestamp, session_id]}
    end

  exec(conn, sql, params)

  {:reply, :ok, state}
end
def handle_call({:complete, session_id, completion_summary}, _from, %{conn: conn} = state) do
  finished_at = now_iso()

  # First close out the session row, then drop every lock it still holds.
  statements = [
    {"UPDATE sessions SET status='completed', completed_at=?1, completion_summary=?2 WHERE id=?3",
     [finished_at, completion_summary, session_id]},
    {"DELETE FROM resource_locks WHERE session_id=?1", [session_id]}
  ]

  Enum.each(statements, fn {sql, params} -> exec(conn, sql, params) end)

  Logger.info("Engram session completed: #{session_id}")

  {:reply, :ok, state}
end
def handle_call({:log, session_id, entry}, _from, %{conn: conn} = state) do
  # Each log line is stamped with the time the server handled the request.
  insert_sql = "INSERT INTO session_logs (session_id, timestamp, entry) VALUES (?1, ?2, ?3)"

  exec(conn, insert_sql, [session_id, now_iso(), entry])

  {:reply, :ok, state}
end
def handle_call(:get_active_sessions, _from, %{conn: conn} = state) do
  # Active sessions, most recent heartbeat first.
  select_sql =
    "SELECT id, session_type, summary, status, started_at, last_heartbeat, completed_at, completion_summary, metadata " <>
      "FROM sessions WHERE status='active' ORDER BY last_heartbeat DESC"

  {:ok, rows} = query_all(conn, select_sql)

  {:reply, Enum.map(rows, fn row -> row_to_session(row) end), state}
end
def handle_call({:get_recent_sessions, hours}, _from, %{conn: conn} = state) do
  # Timestamps are stored as ISO-8601 text, so the cutoff is rendered the same
  # way and compared as a string inside SQLite.
  lookback_seconds = -hours * 3600
  cutoff = DateTime.to_iso8601(DateTime.add(DateTime.utc_now(), lookback_seconds, :second))

  select_sql =
    "SELECT id, session_type, summary, status, started_at, last_heartbeat, completed_at, completion_summary, metadata " <>
      "FROM sessions WHERE status='completed' AND completed_at > ?1 ORDER BY completed_at DESC"

  {:ok, rows} = query_all(conn, select_sql, [cutoff])

  {:reply, Enum.map(rows, fn row -> row_to_session(row) end), state}
end
def handle_call({:get_session_logs, session_id, limit}, _from, %{conn: conn} = state) do
  # Newest entries first, capped at `limit` rows.
  select_sql =
    "SELECT id, session_id, timestamp, entry FROM session_logs " <>
      "WHERE session_id=?1 ORDER BY timestamp DESC LIMIT ?2"

  {:ok, rows} = query_all(conn, select_sql, [session_id, limit])

  to_log = fn [log_id, owner, logged_at, text] ->
    %{id: log_id, session_id: owner, timestamp: logged_at, entry: text}
  end

  {:reply, Enum.map(rows, to_log), state}
end
def handle_call({:lock_resource, session_id, resource, note}, _from, %{conn: conn} = state) do
  # Locks are advisory: look for other *active* sessions holding the same
  # resource and warn about them, but record this session's lock regardless.
  conflict_sql =
    "SELECT rl.session_id FROM resource_locks rl " <>
      "JOIN sessions s ON rl.session_id = s.id " <>
      "WHERE rl.resource=?1 AND rl.session_id != ?2 AND s.status='active'"

  {:ok, other_holders} = query_all(conn, conflict_sql, [resource, session_id])

  case other_holders do
    [] ->
      :ok

    rows ->
      holders = Enum.map_join(rows, ", ", fn [sid] -> sid end)
      Logger.warning("Engram: resource '#{resource}' already locked by: #{holders}")
  end

  # INSERT OR REPLACE refreshes the timestamp/note if this session already
  # holds the lock (primary key is resource + session_id).
  lock_sql =
    "INSERT OR REPLACE INTO resource_locks (resource, session_id, locked_at, note) " <>
      "VALUES (?1, ?2, ?3, ?4)"

  exec(conn, lock_sql, [resource, session_id, now_iso(), note])

  {:reply, :ok, state}
end
def handle_call({:release_resource, session_id, resource}, _from, %{conn: conn} = state) do
  # Only the lock owned by this session is removed; locks held by other
  # sessions on the same resource are left alone.
  delete_sql = "DELETE FROM resource_locks WHERE resource=?1 AND session_id=?2"

  exec(conn, delete_sql, [resource, session_id])

  {:reply, :ok, state}
end
def handle_call({:check_locks, resource}, _from, %{conn: conn} = state) do
  # Join against sessions so each lock carries its owner's summary and type.
  select_sql =
    "SELECT rl.resource, rl.session_id, rl.locked_at, rl.note, s.summary, s.session_type " <>
      "FROM resource_locks rl JOIN sessions s ON rl.session_id = s.id WHERE rl.resource=?1"

  {:ok, rows} = query_all(conn, select_sql, [resource])

  to_lock = fn [locked_resource, holder, locked_at, note, summary, session_type] ->
    %{
      resource: locked_resource,
      session_id: holder,
      locked_at: locked_at,
      note: note,
      summary: summary,
      session_type: session_type
    }
  end

  {:reply, Enum.map(rows, to_lock), state}
end
def handle_call(:sitrep, _from, %{conn: conn} = state) do
  # The report is assembled from three optional sections, each terminated by a
  # blank line: world state, active sessions, locked resources.

  # Section 1 — the world-state document, if one has been stored.
  world_section =
    case query_one(conn, "SELECT content FROM world_state WHERE id=1") do
      {:ok, [content]} when not is_nil(content) -> [content, ""]
      _ -> []
    end

  # Section 2 — one bullet per active session, most recent heartbeat first.
  {:ok, active_rows} =
    query_all(
      conn,
      "SELECT id, session_type, summary, last_heartbeat FROM sessions " <>
        "WHERE status='active' ORDER BY last_heartbeat DESC"
    )

  sessions_section =
    case active_rows do
      [] ->
        []

      rows ->
        bullets =
          Enum.map(rows, fn [id, stype, summary, last_hb] ->
            marker = if is_stale?(last_hb), do: "⚠ stale", else: ""
            # Summaries are cut to 100 characters to keep the report small.
            preview = String.slice(summary || "", 0, 100)
            "- #{marker} `#{id}` (#{stype}): #{preview}"
          end)

        ["**Active sessions (#{length(rows)}):**" | bullets] ++ [""]
    end

  # Section 3 — locks held by sessions that are still active.
  {:ok, lock_rows} =
    query_all(
      conn,
      "SELECT rl.resource, rl.session_id FROM resource_locks rl " <>
        "JOIN sessions s ON rl.session_id = s.id WHERE s.status='active'"
    )

  locks_section =
    case lock_rows do
      [] ->
        []

      rows ->
        bullets = Enum.map(rows, fn [resource, sid] -> "- `#{resource}` by `#{sid}`" end)
        ["**Locked resources:**" | bullets] ++ [""]
    end

  report =
    case world_section ++ sessions_section ++ locks_section do
      [] -> "No world state set yet. Call `Symbiont.Engram.set_world_state/2` to initialize."
      lines -> Enum.join(lines, "\n")
    end

  {:reply, report, state}
end
def handle_call({:get_session, session_id}, _from, %{conn: conn} = state) do
  select_sql =
    "SELECT id, session_type, summary, status, started_at, last_heartbeat, completed_at, completion_summary, metadata " <>
      "FROM sessions WHERE id=?1"

  lookup = query_one(conn, select_sql, [session_id])

  # A missing row is an ordinary outcome and is reported as a tagged error.
  reply =
    case lookup do
      :empty -> {:error, :not_found}
      {:ok, row} -> {:ok, row_to_session(row)}
    end

  {:reply, reply, state}
end
def handle_call(:stats, _from, %{conn: conn} = state) do
  # Every counter is a single-column COUNT(*) row; unwrap it to a bare integer.
  count = fn sql ->
    {:ok, [n]} = query_one(conn, sql)
    n
  end

  total = count.("SELECT COUNT(*) FROM sessions")
  active = count.("SELECT COUNT(*) FROM sessions WHERE status='active'")
  completed = count.("SELECT COUNT(*) FROM sessions WHERE status='completed'")
  logs = count.("SELECT COUNT(*) FROM session_logs")
  locks = count.("SELECT COUNT(*) FROM resource_locks")

  summary = %{
    total_sessions: total,
    active_sessions: active,
    completed_sessions: completed,
    total_logs: logs,
    active_locks: locks
  }

  {:reply, summary, state}
end
@impl true
def terminate(_reason, %{conn: conn}) do
  # Close the SQLite handle on the way out; the close result is ignored and
  # the callback always returns :ok.
  _ = Exqlite.Sqlite3.close(conn)
  :ok
end
# ── Private Helpers ─────────────────────────────────────────────────────
# Creates the schema (tables, then indexes) if it does not exist yet.
# Statements run in the listed order; every one uses IF NOT EXISTS, so the
# function can run on each start.
defp init_tables(conn) do
  schema = [
    # Singleton document: the CHECK constraint pins the only row to id = 1.
    """
    CREATE TABLE IF NOT EXISTS world_state (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    updated_at TEXT NOT NULL,
    updated_by TEXT,
    content TEXT NOT NULL
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS sessions (
    id TEXT PRIMARY KEY,
    session_type TEXT NOT NULL,
    summary TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'active',
    started_at TEXT NOT NULL,
    last_heartbeat TEXT NOT NULL,
    completed_at TEXT,
    completion_summary TEXT,
    metadata TEXT
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS session_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT NOT NULL REFERENCES sessions(id),
    timestamp TEXT NOT NULL,
    entry TEXT NOT NULL
    )
    """,
    # A resource may be locked by several sessions at once: the key is the
    # (resource, session_id) pair.
    """
    CREATE TABLE IF NOT EXISTS resource_locks (
    resource TEXT NOT NULL,
    session_id TEXT NOT NULL REFERENCES sessions(id),
    locked_at TEXT NOT NULL,
    note TEXT,
    PRIMARY KEY (resource, session_id)
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_sessions_status ON sessions(status)",
    "CREATE INDEX IF NOT EXISTS idx_logs_session ON session_logs(session_id)",
    "CREATE INDEX IF NOT EXISTS idx_locks_resource ON resource_locks(resource)"
  ]

  Enum.each(schema, fn statement -> exec(conn, statement) end)
end
# Builds a session id of the form "YYYYMMDDHHMMSS-xxxxxxxx": the current UTC
# time to the second, followed by 4 random bytes rendered as lowercase hex.
defp generate_session_id do
  stamp = Calendar.strftime(DateTime.utc_now(), "%Y%m%d%H%M%S")
  suffix = Base.encode16(:crypto.strong_rand_bytes(4), case: :lower)

  stamp <> "-" <> suffix
end
# Current UTC time rendered as an ISO-8601 string.
defp now_iso, do: DateTime.to_iso8601(DateTime.utc_now())
# True when `last_heartbeat` is more than @stale_threshold_minutes in the past.
# The timestamp is first parsed as an offset-carrying ISO-8601 datetime (new
# rows) and, failing that, as a naive datetime (legacy Python rows). A
# timestamp that parses as neither is treated as not stale.
defp is_stale?(last_heartbeat) do
  parsed =
    case parse_as_datetime(last_heartbeat) do
      {:ok, _} = ok -> ok
      {:error, _} -> parse_as_naive(last_heartbeat)
    end

  case parsed do
    {:ok, dt} -> DateTime.diff(DateTime.utc_now(), dt, :minute) > @stale_threshold_minutes
    {:error, _} -> false
  end
end
# Parses an ISO-8601 string that carries a UTC offset. The offset returned by
# the parser is discarded; every failure collapses to {:error, :invalid}.
defp parse_as_datetime(str) do
  with {:ok, dt, _offset} <- DateTime.from_iso8601(str) do
    {:ok, dt}
  else
    _ -> {:error, :invalid}
  end
end
# Parses an ISO-8601 string as a naive datetime and interprets it as UTC.
# Every failure collapses to {:error, :invalid}.
defp parse_as_naive(str) do
  with {:ok, naive} <- NaiveDateTime.from_iso8601(str) do
    {:ok, DateTime.from_naive!(naive, "Etc/UTC")}
  else
    _ -> {:error, :invalid}
  end
end
# Turns a 9-column sessions row into a map. The column order must match the
# SELECT lists used by the session queries in this module; a row of any other
# length fails to match.
defp row_to_session([_, _, _, _, _, _, _, _, _] = row) do
  columns = [
    :id,
    :session_type,
    :summary,
    :status,
    :started_at,
    :last_heartbeat,
    :completed_at,
    :completion_summary,
    :metadata
  ]

  columns
  |> Enum.zip(row)
  |> Map.new()
end
# ── SQLite Helpers ──────────────────────────────────────────────────────
# Runs one SQL statement for its effect and returns :ok.
#
# `params` are bound positionally when non-empty. A statement that yields a
# row (e.g. a PRAGMA) is treated as success and the row is discarded. Raises
# if the step reports an error or the database stays busy; the prepared
# statement is released in every case.
defp exec(conn, sql, params \\ []) do
  {:ok, stmt} = Exqlite.Sqlite3.prepare(conn, sql)

  try do
    if params != [] do
      :ok = Exqlite.Sqlite3.bind(stmt, params)
    end

    case Exqlite.Sqlite3.step(conn, stmt) do
      :done ->
        :ok

      {:row, _} ->
        :ok

      # step/2 can also report :busy (lock still held once the busy timeout
      # expires); surface it explicitly instead of an opaque CaseClauseError.
      :busy ->
        raise "SQLite exec error: database is busy"

      {:error, reason} ->
        raise "SQLite exec error: #{inspect(reason)}"
    end
  after
    Exqlite.Sqlite3.release(conn, stmt)
  end
end
# Runs a query and returns its first row as `{:ok, row}`, or `:empty` when the
# query produces no rows.
#
# `params` are bound positionally when non-empty. Raises if the step reports
# an error or the database stays busy; the prepared statement is released in
# every case.
defp query_one(conn, sql, params \\ []) do
  {:ok, stmt} = Exqlite.Sqlite3.prepare(conn, sql)

  try do
    if params != [] do
      :ok = Exqlite.Sqlite3.bind(stmt, params)
    end

    case Exqlite.Sqlite3.step(conn, stmt) do
      {:row, row} ->
        {:ok, row}

      :done ->
        :empty

      # step/2 can also report :busy (lock still held once the busy timeout
      # expires); surface it explicitly instead of an opaque CaseClauseError.
      :busy ->
        raise "SQLite query error: database is busy"

      {:error, reason} ->
        raise "SQLite query error: #{inspect(reason)}"
    end
  after
    Exqlite.Sqlite3.release(conn, stmt)
  end
end
# Runs a query and returns `{:ok, rows}` with every result row in the order
# SQLite produced them. `params` are bound positionally when non-empty. The
# prepared statement is released whether or not stepping raises.
defp query_all(conn, sql, params \\ []) do
  {:ok, statement} = Exqlite.Sqlite3.prepare(conn, sql)

  try do
    case params do
      [] -> :ok
      _ -> :ok = Exqlite.Sqlite3.bind(statement, params)
    end

    {:ok, collect_rows(conn, statement, [])}
  after
    Exqlite.Sqlite3.release(conn, statement)
  end
end
# Steps `stmt` until it is exhausted and returns the rows in result order.
# Rows are prepended to `acc` and reversed once at the end. Raises if a step
# reports an error or the database stays busy.
defp collect_rows(conn, stmt, acc) do
  case Exqlite.Sqlite3.step(conn, stmt) do
    {:row, row} ->
      collect_rows(conn, stmt, [row | acc])

    :done ->
      Enum.reverse(acc)

    # step/2 can also report :busy (lock still held once the busy timeout
    # expires); surface it explicitly instead of an opaque CaseClauseError.
    :busy ->
      raise "SQLite step error mid-collection: database is busy"

    {:error, reason} ->
      raise "SQLite step error mid-collection: #{inspect(reason)}"
  end
end
end