619 lines
18 KiB
Elixir
619 lines
18 KiB
Elixir
defmodule Symbiont.Engram do
|
|
@moduledoc """
|
|
Engram: Persistent memory across Claude instances.
|
|
|
|
An engram is the physical trace a memory leaves in neural tissue. Every Claude
|
|
session (Cowork, Claude Code, Desktop) writes its engrams here on startup,
|
|
building a shared memory of what's being worked on across the ecosystem.
|
|
|
|
Two-tier memory model:
|
|
- world_state: A singleton markdown document updated at the end of any session
|
|
that changes things. Read at the start of every session.
|
|
- session_logs: Detailed logs available via get_session_logs/2 only when needed.
|
|
|
|
SQLite with WAL mode handles concurrent readers cleanly. Each session writes
|
|
only its own rows, so writer contention is minimal.
|
|
|
|
## Usage
|
|
|
|
# Get situation report (call at start of every session)
|
|
Symbiont.Engram.sitrep()
|
|
|
|
# Register a session
|
|
{:ok, sid} = Symbiont.Engram.register("cowork", "Building the Elixir port")
|
|
|
|
# Log progress
|
|
:ok = Symbiont.Engram.log(sid, "Finished router module")
|
|
|
|
# Claim a resource
|
|
:ok = Symbiont.Engram.lock_resource(sid, "/data/symbiont_ex/lib/symbiont/router.ex")
|
|
|
|
# Check locks before modifying a file
|
|
locks = Symbiont.Engram.check_locks("/data/symbiont_ex/lib/symbiont/router.ex")
|
|
|
|
# Heartbeat (call periodically on long sessions)
|
|
:ok = Symbiont.Engram.heartbeat(sid, "Still working, 60% done")
|
|
|
|
# Update world state before completing
|
|
:ok = Symbiont.Engram.set_world_state("Updated content", sid)
|
|
|
|
# Done
|
|
:ok = Symbiont.Engram.complete(sid, "Finished Elixir port. Tests passing.")
|
|
"""
|
|
|
|
use GenServer
|
|
require Logger
|
|
|
|
@default_db_path "/data/symbiont/engram.db"
|
|
@stale_threshold_minutes 30
|
|
|
|
# ── Client API ──────────────────────────────────────────────────────────
|
|
|
|
@doc """
Starts the Engram server process and links it to the caller.

## Options

  * `:db_path` - SQLite database file to open (defaults to `@default_db_path`).
  * `:name` - name the process is registered under (defaults to this module).
"""
def start_link(opts \\ []) do
  database = Keyword.get(opts, :db_path, @default_db_path)
  registered_name = Keyword.get(opts, :name, __MODULE__)

  GenServer.start_link(__MODULE__, database, name: registered_name)
end
|
|
|
|
@doc """
Returns the current world-state markdown.

The server replies with an empty string when no world state has been stored.
"""
def get_world_state(server \\ __MODULE__),
  do: GenServer.call(server, :get_world_state)
|
|
|
|
@doc """
Replaces the world-state document with `content`.

Intended to be called at the end of any session that changed things.
`updated_by` is stored alongside the document and may be `nil`.
"""
def set_world_state(content, updated_by \\ nil, server \\ __MODULE__),
  do: GenServer.call(server, {:set_world_state, content, updated_by})
|
|
|
|
@doc """
Registers a new session of the given `session_type` with an initial `summary`.

Returns `{:ok, session_id}`, where `session_id` is generated by the server.
`metadata` is optional and stored as given.
"""
def register(session_type, summary, metadata \\ nil, server \\ __MODULE__),
  do: GenServer.call(server, {:register, session_type, summary, metadata})
|
|
|
|
@doc """
Refreshes the heartbeat timestamp of `session_id`.

When `summary` is given it also replaces the session's stored summary.
"""
def heartbeat(session_id, summary \\ nil, server \\ __MODULE__),
  do: GenServer.call(server, {:heartbeat, session_id, summary})
|
|
|
|
@doc """
Marks `session_id` as completed.

`completion_summary` records what the session accomplished.
"""
def complete(session_id, completion_summary, server \\ __MODULE__),
  do: GenServer.call(server, {:complete, session_id, completion_summary})
|
|
|
|
@doc """
Appends the progress `entry` to the log of `session_id`.
"""
def log(session_id, entry, server \\ __MODULE__),
  do: GenServer.call(server, {:log, session_id, entry})
|
|
|
|
@doc """
Returns every session whose status is currently active.
"""
def get_active_sessions(server \\ __MODULE__),
  do: GenServer.call(server, :get_active_sessions)
|
|
|
|
@doc """
Returns sessions completed within the last `hours` hours (24 by default).

Useful for picking up context about recently finished work.
"""
def get_recent_sessions(hours \\ 24, server \\ __MODULE__),
  do: GenServer.call(server, {:get_recent_sessions, hours})
|
|
|
|
@doc """
Returns up to `limit` log entries (20 by default) recorded for `session_id`.
"""
def get_session_logs(session_id, limit \\ 20, server \\ __MODULE__),
  do: GenServer.call(server, {:get_session_logs, session_id, limit})
|
|
|
|
@doc """
Claims a lock on `resource` for `session_id`, with an optional `note`.

A warning is logged when another session already holds a lock on the same
resource.
"""
def lock_resource(session_id, resource, note \\ nil, server \\ __MODULE__),
  do: GenServer.call(server, {:lock_resource, session_id, resource, note})
|
|
|
|
@doc """
Releases the lock that `session_id` holds on `resource`.
"""
def release_resource(session_id, resource, server \\ __MODULE__),
  do: GenServer.call(server, {:release_resource, session_id, resource})
|
|
|
|
@doc """
Lists the locks currently recorded for `resource`, including who holds them.
"""
def check_locks(resource, server \\ __MODULE__),
  do: GenServer.call(server, {:check_locks, resource})
|
|
|
|
@doc """
Builds a compact situation report meant to fit in any Claude context window.

The report combines the world state, a one-line summary per active session,
and the list of locked resources.
"""
def sitrep(server \\ __MODULE__),
  do: GenServer.call(server, :sitrep)
|
|
|
|
@doc """
Looks up a single session by its `session_id`.
"""
def get_session(session_id, server \\ __MODULE__),
  do: GenServer.call(server, {:get_session, session_id})
|
|
|
|
@doc """
Returns summary statistics such as total sessions, active count and log count.
"""
def stats(server \\ __MODULE__),
  do: GenServer.call(server, :stats)
|
|
|
|
# ── GenServer Callbacks ─────────────────────────────────────────────────
|
|
|
|
@impl true
def init(db_path) do
  # Make sure the directory that will contain the database file exists.
  File.mkdir_p!(Path.dirname(db_path))

  {:ok, conn} = Exqlite.Sqlite3.open(db_path)

  # Connection settings: WAL journaling first, then a 5000 ms busy timeout.
  Enum.each(["PRAGMA journal_mode=WAL", "PRAGMA busy_timeout=5000"], fn pragma ->
    :ok = exec(conn, pragma)
  end)

  init_tables(conn)
  Logger.info("Engram started — db: #{db_path}")

  {:ok, %{conn: conn, db_path: db_path}}
end
|
|
|
|
@impl true
def handle_call(:get_world_state, _from, %{conn: conn} = state) do
  # Reply with the stored markdown; anything other than a single-column row
  # (for example no row at all) yields an empty string.
  markdown =
    with {:ok, [content]} <- query_one(conn, "SELECT content FROM world_state WHERE id=1") do
      content
    else
      _ -> ""
    end

  {:reply, markdown, state}
end
|
|
|
|
def handle_call({:set_world_state, content, updated_by}, _from, %{conn: conn} = state) do
  # Upsert the singleton row (id is always 1) with a fresh timestamp.
  upsert_sql =
    "INSERT INTO world_state (id, updated_at, updated_by, content) VALUES (1, ?1, ?2, ?3) " <>
      "ON CONFLICT(id) DO UPDATE SET updated_at=excluded.updated_at, " <>
      "updated_by=excluded.updated_by, content=excluded.content"

  :ok = exec(conn, upsert_sql, [now_iso(), updated_by, content])

  {:reply, :ok, state}
end
|
|
|
|
def handle_call({:register, session_type, summary, metadata}, _from, %{conn: conn} = state) do
  session_id = generate_session_id()

  # started_at and last_heartbeat both start out as the registration time.
  registered_at = now_iso()

  insert_sql =
    "INSERT INTO sessions (id, session_type, summary, status, started_at, last_heartbeat, metadata) " <>
      "VALUES (?1, ?2, ?3, 'active', ?4, ?5, ?6)"

  row = [session_id, session_type, summary, registered_at, registered_at, metadata]
  :ok = exec(conn, insert_sql, row)

  Logger.info("Engram session registered: #{session_id} (#{session_type}) — #{summary}")
  {:reply, {:ok, session_id}, state}
end
|
|
|
|
def handle_call({:heartbeat, session_id, summary}, _from, %{conn: conn} = state) do
  beat_at = now_iso()

  # Both variants refresh last_heartbeat and set status back to 'active';
  # the summary column is only overwritten when a summary was supplied.
  {sql, params} =
    if summary do
      {"UPDATE sessions SET last_heartbeat=?1, summary=?2, status='active' WHERE id=?3",
       [beat_at, summary, session_id]}
    else
      {"UPDATE sessions SET last_heartbeat=?1, status='active' WHERE id=?2",
       [beat_at, session_id]}
    end

  exec(conn, sql, params)

  {:reply, :ok, state}
end
|
|
|
|
def handle_call({:complete, session_id, completion_summary}, _from, %{conn: conn} = state) do
  finished_at = now_iso()

  mark_completed_sql =
    "UPDATE sessions SET status='completed', completed_at=?1, completion_summary=?2 WHERE id=?3"

  exec(conn, mark_completed_sql, [finished_at, completion_summary, session_id])

  # A completed session gives up every resource lock it was holding.
  exec(conn, "DELETE FROM resource_locks WHERE session_id=?1", [session_id])

  Logger.info("Engram session completed: #{session_id}")
  {:reply, :ok, state}
end
|
|
|
|
def handle_call({:log, session_id, entry}, _from, %{conn: conn} = state) do
  # Each log entry is stamped with the time it reaches the server.
  logged_at = now_iso()
  insert_sql = "INSERT INTO session_logs (session_id, timestamp, entry) VALUES (?1, ?2, ?3)"

  exec(conn, insert_sql, [session_id, logged_at, entry])

  {:reply, :ok, state}
end
|
|
|
|
def handle_call(:get_active_sessions, _from, %{conn: conn} = state) do
  # Active sessions, most recent heartbeat first.
  select_sql =
    "SELECT id, session_type, summary, status, started_at, last_heartbeat, completed_at, completion_summary, metadata " <>
      "FROM sessions WHERE status='active' ORDER BY last_heartbeat DESC"

  {:ok, rows} = query_all(conn, select_sql)
  active = for row <- rows, do: row_to_session(row)

  {:reply, active, state}
end
|
|
|
|
def handle_call({:get_recent_sessions, hours}, _from, %{conn: conn} = state) do
  # Oldest completion time still counted as "recent", as an ISO-8601 string.
  window_seconds = hours * 3600
  earliest = DateTime.add(DateTime.utc_now(), -window_seconds, :second)
  cutoff = DateTime.to_iso8601(earliest)

  select_sql =
    "SELECT id, session_type, summary, status, started_at, last_heartbeat, completed_at, completion_summary, metadata " <>
      "FROM sessions WHERE status='completed' AND completed_at > ?1 ORDER BY completed_at DESC"

  {:ok, rows} = query_all(conn, select_sql, [cutoff])
  recent = for row <- rows, do: row_to_session(row)

  {:reply, recent, state}
end
|
|
|
|
def handle_call({:get_session_logs, session_id, limit}, _from, %{conn: conn} = state) do
  # Newest entries first, capped at `limit` rows.
  select_sql =
    "SELECT id, session_id, timestamp, entry FROM session_logs " <>
      "WHERE session_id=?1 ORDER BY timestamp DESC LIMIT ?2"

  {:ok, rows} = query_all(conn, select_sql, [session_id, limit])

  to_log = fn [log_id, owner_id, logged_at, text] ->
    %{id: log_id, session_id: owner_id, timestamp: logged_at, entry: text}
  end

  {:reply, Enum.map(rows, to_log), state}
end
|
|
|
|
def handle_call({:lock_resource, session_id, resource, note}, _from, %{conn: conn} = state) do
  # Find other *active* sessions that already hold a lock on this resource.
  conflict_sql =
    "SELECT rl.session_id FROM resource_locks rl " <>
      "JOIN sessions s ON rl.session_id = s.id " <>
      "WHERE rl.resource=?1 AND rl.session_id != ?2 AND s.status='active'"

  {:ok, conflicts} = query_all(conn, conflict_sql, [resource, session_id])

  # A conflict is only reported via the log; the lock is recorded regardless.
  case conflicts do
    [] ->
      :ok

    _ ->
      holder_ids = Enum.map_join(conflicts, ", ", fn [holder] -> holder end)
      Logger.warning("Engram: resource '#{resource}' already locked by: #{holder_ids}")
  end

  insert_sql =
    "INSERT OR REPLACE INTO resource_locks (resource, session_id, locked_at, note) " <>
      "VALUES (?1, ?2, ?3, ?4)"

  exec(conn, insert_sql, [resource, session_id, now_iso(), note])

  {:reply, :ok, state}
end
|
|
|
|
def handle_call({:release_resource, session_id, resource}, _from, %{conn: conn} = state) do
  # Only the lock row owned by this session for this resource is removed.
  delete_sql = "DELETE FROM resource_locks WHERE resource=?1 AND session_id=?2"
  exec(conn, delete_sql, [resource, session_id])

  {:reply, :ok, state}
end
|
|
|
|
def handle_call({:check_locks, resource}, _from, %{conn: conn} = state) do
  # Join each lock with its owning session so callers see who holds it and why.
  select_sql =
    "SELECT rl.resource, rl.session_id, rl.locked_at, rl.note, s.summary, s.session_type " <>
      "FROM resource_locks rl JOIN sessions s ON rl.session_id = s.id WHERE rl.resource=?1"

  {:ok, rows} = query_all(conn, select_sql, [resource])

  to_lock = fn [locked_resource, holder_id, locked_at, note, summary, session_type] ->
    %{
      resource: locked_resource,
      session_id: holder_id,
      locked_at: locked_at,
      note: note,
      summary: summary,
      session_type: session_type
    }
  end

  {:reply, Enum.map(rows, to_lock), state}
end
|
|
|
|
def handle_call(:sitrep, _from, %{conn: conn} = state) do
  # Section 1: the world-state document followed by a blank line.
  world_section =
    case query_one(conn, "SELECT content FROM world_state WHERE id=1") do
      {:ok, [content]} when content not in [nil, false] -> [content, ""]
      _ -> []
    end

  # Section 2: one bullet per active session, most recent heartbeat first.
  {:ok, active_rows} =
    query_all(
      conn,
      "SELECT id, session_type, summary, last_heartbeat FROM sessions " <>
        "WHERE status='active' ORDER BY last_heartbeat DESC"
    )

  session_section =
    case active_rows do
      [] ->
        []

      _ ->
        bullets =
          Enum.map(active_rows, fn [id, kind, summary, last_seen] ->
            marker = if is_stale?(last_seen), do: "⚠ stale", else: "●"
            # Summaries are cut to 100 characters to keep the report compact.
            short_summary = String.slice(summary || "", 0, 100)
            "- #{marker} `#{id}` (#{kind}): #{short_summary}"
          end)

        ["**Active sessions (#{length(active_rows)}):**" | bullets] ++ [""]
    end

  # Section 3: resources locked by sessions that are still active.
  {:ok, lock_rows} =
    query_all(
      conn,
      "SELECT rl.resource, rl.session_id FROM resource_locks rl " <>
        "JOIN sessions s ON rl.session_id = s.id WHERE s.status='active'"
    )

  lock_section =
    case lock_rows do
      [] ->
        []

      _ ->
        bullets = Enum.map(lock_rows, fn [path, holder] -> "- `#{path}` by `#{holder}`" end)
        ["**Locked resources:**" | bullets] ++ [""]
    end

  # With nothing to report at all, point the caller at set_world_state instead.
  report =
    case world_section ++ session_section ++ lock_section do
      [] -> "No world state set yet. Call `Symbiont.Engram.set_world_state/2` to initialize."
      lines -> Enum.join(lines, "\n")
    end

  {:reply, report, state}
end
|
|
|
|
def handle_call({:get_session, session_id}, _from, %{conn: conn} = state) do
  select_sql =
    "SELECT id, session_type, summary, status, started_at, last_heartbeat, completed_at, completion_summary, metadata " <>
      "FROM sessions WHERE id=?1"

  lookup = query_one(conn, select_sql, [session_id])

  # An unknown id is reported as {:error, :not_found} rather than crashing.
  reply =
    case lookup do
      :empty -> {:error, :not_found}
      {:ok, row} -> {:ok, row_to_session(row)}
    end

  {:reply, reply, state}
end
|
|
|
|
def handle_call(:stats, _from, %{conn: conn} = state) do
  # Runs a single-value COUNT(*) query and unwraps the number.
  count = fn sql ->
    {:ok, [n]} = query_one(conn, sql)
    n
  end

  total = count.("SELECT COUNT(*) FROM sessions")
  active = count.("SELECT COUNT(*) FROM sessions WHERE status='active'")
  completed = count.("SELECT COUNT(*) FROM sessions WHERE status='completed'")
  logs = count.("SELECT COUNT(*) FROM session_logs")
  locks = count.("SELECT COUNT(*) FROM resource_locks")

  summary = %{
    total_sessions: total,
    active_sessions: active,
    completed_sessions: completed,
    total_logs: logs,
    active_locks: locks
  }

  {:reply, summary, state}
end
|
|
|
|
@impl true
def terminate(_reason, %{conn: conn}) do
  # The result of closing the connection is ignored; :ok is always returned.
  _ = Exqlite.Sqlite3.close(conn)
  :ok
end
|
|
|
|
# ── Private Helpers ─────────────────────────────────────────────────────
|
|
|
|
# Creates the four tables and their indexes. Every statement uses
# IF NOT EXISTS, and the statements run in the order listed (tables first).
defp init_tables(conn) do
  statements = [
    """
    CREATE TABLE IF NOT EXISTS world_state (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    updated_at TEXT NOT NULL,
    updated_by TEXT,
    content TEXT NOT NULL
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS sessions (
    id TEXT PRIMARY KEY,
    session_type TEXT NOT NULL,
    summary TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'active',
    started_at TEXT NOT NULL,
    last_heartbeat TEXT NOT NULL,
    completed_at TEXT,
    completion_summary TEXT,
    metadata TEXT
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS session_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT NOT NULL REFERENCES sessions(id),
    timestamp TEXT NOT NULL,
    entry TEXT NOT NULL
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS resource_locks (
    resource TEXT NOT NULL,
    session_id TEXT NOT NULL REFERENCES sessions(id),
    locked_at TEXT NOT NULL,
    note TEXT,
    PRIMARY KEY (resource, session_id)
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_sessions_status ON sessions(status)",
    "CREATE INDEX IF NOT EXISTS idx_logs_session ON session_logs(session_id)",
    "CREATE INDEX IF NOT EXISTS idx_locks_resource ON resource_locks(resource)"
  ]

  Enum.each(statements, &exec(conn, &1))
end
|
|
|
|
# Builds an id of the form "YYYYMMDDHHMMSS-xxxxxxxx": the current UTC time
# down to the second, a dash, and 4 random bytes as lowercase hex.
defp generate_session_id do
  stamp = Calendar.strftime(DateTime.utc_now(), "%Y%m%d%H%M%S")
  suffix = Base.encode16(:crypto.strong_rand_bytes(4), case: :lower)

  stamp <> "-" <> suffix
end
|
|
|
|
# Current UTC time as an ISO-8601 string.
defp now_iso, do: DateTime.to_iso8601(DateTime.utc_now())
|
|
|
|
# True when the heartbeat is more than @stale_threshold_minutes old.
# The timestamp is parsed as an offset-aware datetime first and as a naive
# one (legacy Python rows) second; an unparseable value counts as not stale.
defp is_stale?(last_heartbeat) do
  parsed =
    case parse_as_datetime(last_heartbeat) do
      {:ok, _} = hit -> hit
      {:error, _} -> parse_as_naive(last_heartbeat)
    end

  case parsed do
    {:ok, seen_at} ->
      DateTime.diff(DateTime.utc_now(), seen_at, :minute) > @stale_threshold_minutes

    {:error, _} ->
      false
  end
end
|
|
|
|
# Parses an ISO-8601 string that carries a UTC offset.
# Returns {:ok, datetime}, or {:error, :invalid} for anything else.
defp parse_as_datetime(str) do
  with {:ok, parsed, _offset} <- DateTime.from_iso8601(str) do
    {:ok, parsed}
  else
    _ -> {:error, :invalid}
  end
end
|
|
|
|
# Parses an ISO-8601 string without relying on an offset and treats the
# result as UTC. Returns {:ok, datetime} or {:error, :invalid}.
defp parse_as_naive(str) do
  with {:ok, naive} <- NaiveDateTime.from_iso8601(str) do
    {:ok, DateTime.from_naive!(naive, "Etc/UTC")}
  else
    _ -> {:error, :invalid}
  end
end
|
|
|
|
# Turns a 9-column sessions row into a map keyed by column name. The columns
# must arrive in the order the SELECT statements in this module list them.
defp row_to_session([_, _, _, _, _, _, _, _, _] = row) do
  columns = [
    :id,
    :session_type,
    :summary,
    :status,
    :started_at,
    :last_heartbeat,
    :completed_at,
    :completion_summary,
    :metadata
  ]

  columns
  |> Enum.zip(row)
  |> Map.new()
end
|
|
|
|
# ── SQLite Helpers ──────────────────────────────────────────────────────
|
|
|
|
# Runs a single statement for its side effect and returns :ok.
#
# `params` are bound positionally when non-empty. A statement that yields a
# row (e.g. `PRAGMA journal_mode=WAL`) is also treated as success; the row is
# discarded. Raises a RuntimeError when SQLite reports an error or is still
# busy. The prepared statement is always released.
defp exec(conn, sql, params \\ []) do
  {:ok, stmt} = Exqlite.Sqlite3.prepare(conn, sql)

  try do
    if params != [] do
      :ok = Exqlite.Sqlite3.bind(stmt, params)
    end

    case Exqlite.Sqlite3.step(conn, stmt) do
      :done -> :ok
      {:row, _} -> :ok
      # step/2 can also return :busy; without this clause that would surface
      # as an opaque CaseClauseError.
      :busy -> raise "SQLite exec error: database is busy"
      {:error, reason} -> raise "SQLite exec error: #{inspect(reason)}"
    end
  after
    Exqlite.Sqlite3.release(conn, stmt)
  end
end
|
|
|
|
# Runs a query and returns only its first row.
#
# Returns {:ok, row} (row is a list of column values) or :empty when the
# query produced no rows. Raises a RuntimeError when SQLite reports an error
# or is still busy. The prepared statement is always released.
defp query_one(conn, sql, params \\ []) do
  {:ok, stmt} = Exqlite.Sqlite3.prepare(conn, sql)

  try do
    if params != [] do
      :ok = Exqlite.Sqlite3.bind(stmt, params)
    end

    case Exqlite.Sqlite3.step(conn, stmt) do
      {:row, row} -> {:ok, row}
      :done -> :empty
      # step/2 can also return :busy; without this clause that would surface
      # as an opaque CaseClauseError.
      :busy -> raise "SQLite query error: database is busy"
      {:error, reason} -> raise "SQLite query error: #{inspect(reason)}"
    end
  after
    Exqlite.Sqlite3.release(conn, stmt)
  end
end
|
|
|
|
# Runs a query and returns {:ok, rows} with every result row, in the order
# SQLite produced them. The prepared statement is always released.
defp query_all(conn, sql, params \\ []) do
  {:ok, statement} = Exqlite.Sqlite3.prepare(conn, sql)

  try do
    # Binding is skipped entirely for parameterless statements.
    case params do
      [] -> nil
      _ -> :ok = Exqlite.Sqlite3.bind(statement, params)
    end

    {:ok, collect_rows(conn, statement, [])}
  after
    Exqlite.Sqlite3.release(conn, statement)
  end
end
|
|
|
|
# Steps through a prepared statement until it is done and returns the rows
# in result order. Rows are prepended to `acc` and reversed once at the end.
# Raises a RuntimeError when SQLite reports an error or is still busy.
defp collect_rows(conn, stmt, acc) do
  case Exqlite.Sqlite3.step(conn, stmt) do
    {:row, row} ->
      collect_rows(conn, stmt, [row | acc])

    :done ->
      Enum.reverse(acc)

    # step/2 can also return :busy; without this clause that would surface
    # as an opaque CaseClauseError.
    :busy ->
      raise "SQLite step error mid-collection: database is busy"

    {:error, reason} ->
      raise "SQLite step error mid-collection: #{inspect(reason)}"
  end
end
|
|
end
|