symbiont/symbiont/engram.py
Muse 18252e05e6 Engram: add world_state table, redesign sitrep() for context safety
sitrep() now returns world_state + active session one-liners only.
Detailed logs available via get_session_logs() when needed.
Initial world state written documenting full Muse ecosystem.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 14:15:41 +00:00

306 lines
12 KiB
Python

"""
Engram: Persistent memory across Claude instances.
An engram is the physical trace a memory leaves in neural tissue. Every Claude
session (Cowork, Claude Code, Desktop) writes its engrams here on startup,
building a shared memory of what's being worked on across the ecosystem.
This lets each instance see what others are working on, avoid conflicts on
shared resources, and pick up context from recently completed work.
Two-tier memory model:
- world_state: A singleton markdown document updated at the end of any session that
changes things. Read at the start of every session (fits safely in context).
- session_logs: Detailed logs available via get_session_logs() only when needed.
SQLite with WAL mode handles 2-4 concurrent readers cleanly. Each session
writes only its own rows (plus the single shared world_state row), so writer
contention is minimal.
Usage:
from symbiont.engram import Engram, sitrep
# Start every session with:
print(sitrep())
eng = Engram()
sid = eng.register("cowork", "Building the Elixir port of Symbiont")
# Check what siblings are doing
active = eng.get_active_sessions()
recent = eng.get_recent_sessions(hours=24)
# Log progress
eng.log(sid, "Finished router module, starting dispatcher")
# Claim a resource (advisory: logs a warning if another session already holds it)
eng.lock_resource(sid, "/data/symbiont/symbiont/router.py")
# Before modifying a file, check if someone else has it
locks = eng.check_locks("/data/symbiont/symbiont/router.py")
# Heartbeat (call periodically on long sessions)
eng.heartbeat(sid, "Still working on dispatcher, 60% done")
# Update world state before completing (optional, only if your work changes the world)
eng.set_world_state("Updated world state content", updated_by=sid)
# Done
eng.complete(sid, "Finished Elixir port of router + dispatcher. Tests passing.")
"""
import contextlib
import logging
import sqlite3
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Default database file, used when Engram() is constructed without a db_path.
DB_PATH = Path("/data/symbiont/engram.db")
class Engram:
    """SQLite-backed registry of sessions, their logs, resource locks and world state.

    Every public method opens a short-lived connection, runs its statements in
    one transaction, and closes the connection again, so an instance holds no
    open database handle between calls.
    """

    # A session whose last heartbeat is older than this is flagged as stale
    # in the situation report.
    STALE_AFTER = timedelta(minutes=30)

    def __init__(self, db_path: Optional[Path] = None):
        """Open (creating if necessary) the database at *db_path*.

        Args:
            db_path: Location of the SQLite file; a ``str`` is accepted as well
                as a ``Path``. Falls back to the module-level ``DB_PATH`` when
                omitted. Missing parent directories are created.
        """
        self.db_path = Path(db_path or DB_PATH)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _connect(self):
        """Return a new connection with Row results, WAL journaling and a busy timeout.

        The caller owns the connection and must close it; prefer ``_session()``.
        """
        conn = sqlite3.connect(str(self.db_path), timeout=10)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        # This PRAGMA replaces the 10 s timeout given to connect() above, so
        # statements after this point wait at most 5 s on a locked database.
        conn.execute("PRAGMA busy_timeout=5000")
        return conn

    @contextlib.contextmanager
    def _session(self):
        """Yield a connection, then commit (or roll back on error) and close it.

        ``with connection:`` on its own only ends the transaction; it leaves
        the connection open. This wrapper adds the missing close so handles
        are not leaked on every call.
        """
        conn = self._connect()
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Create the tables and indexes if they do not exist yet."""
        with self._session() as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS world_state (
                    id INTEGER PRIMARY KEY CHECK (id = 1),
                    updated_at TEXT NOT NULL,
                    updated_by TEXT,
                    content TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS sessions (
                    id TEXT PRIMARY KEY,
                    session_type TEXT NOT NULL,
                    summary TEXT NOT NULL,
                    status TEXT NOT NULL DEFAULT 'active',
                    started_at TEXT NOT NULL,
                    last_heartbeat TEXT NOT NULL,
                    completed_at TEXT,
                    completion_summary TEXT,
                    metadata TEXT
                );
                CREATE TABLE IF NOT EXISTS session_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL REFERENCES sessions(id),
                    timestamp TEXT NOT NULL,
                    entry TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS resource_locks (
                    resource TEXT NOT NULL,
                    session_id TEXT NOT NULL REFERENCES sessions(id),
                    locked_at TEXT NOT NULL,
                    note TEXT,
                    PRIMARY KEY (resource, session_id)
                );
                CREATE INDEX IF NOT EXISTS idx_sessions_status ON sessions(status);
                CREATE INDEX IF NOT EXISTS idx_logs_session ON session_logs(session_id);
                CREATE INDEX IF NOT EXISTS idx_locks_resource ON resource_locks(resource);
            """)

    def get_world_state(self) -> str:
        """Return the current world state, or an empty string if none is stored."""
        with self._session() as conn:
            row = conn.execute("SELECT content FROM world_state WHERE id=1").fetchone()
        return row["content"] if row else ""

    def set_world_state(self, content: str, updated_by: Optional[str] = None):
        """Replace the world state. Called at the end of any session that changes things.

        Args:
            content: New world-state document; overwrites the previous one.
            updated_by: Optional identifier (e.g. a session ID) stored with the row.
        """
        now = datetime.now().isoformat()
        with self._session() as conn:
            # The table's CHECK (id = 1) makes this a singleton row, so upsert it.
            conn.execute(
                "INSERT INTO world_state (id, updated_at, updated_by, content) VALUES (1, ?, ?, ?) "
                "ON CONFLICT(id) DO UPDATE SET updated_at=excluded.updated_at, "
                "updated_by=excluded.updated_by, content=excluded.content",
                (now, updated_by, content),
            )

    def register(self, session_type: str, summary: str, metadata: Optional[str] = None) -> str:
        """Register a new active session and return its ID.

        The ID is a ``YYYYmmdd-HHMMSS-`` timestamp followed by 8 random hex
        characters. The same timestamp is stored as ``started_at`` and
        ``last_heartbeat``.
        """
        started = datetime.now()
        sid = started.strftime("%Y%m%d-%H%M%S-") + uuid.uuid4().hex[:8]
        now = started.isoformat()
        with self._session() as conn:
            conn.execute(
                "INSERT INTO sessions (id, session_type, summary, status, started_at, last_heartbeat, metadata) "
                "VALUES (?, ?, ?, 'active', ?, ?, ?)",
                (sid, session_type, summary, now, now, metadata),
            )
        logger.info("Session registered: %s (%s) — %s", sid, session_type, summary)
        return sid

    def heartbeat(self, session_id: str, summary: Optional[str] = None):
        """Refresh a session's heartbeat timestamp and optionally its summary.

        The session's status is also set back to ``'active'``. A falsy
        *summary* leaves the stored summary untouched. An unknown
        *session_id* is logged as a warning and otherwise ignored.
        """
        now = datetime.now().isoformat()
        with self._session() as conn:
            if summary:
                cursor = conn.execute(
                    "UPDATE sessions SET last_heartbeat=?, summary=?, status='active' WHERE id=?",
                    (now, summary, session_id),
                )
            else:
                cursor = conn.execute(
                    "UPDATE sessions SET last_heartbeat=?, status='active' WHERE id=?",
                    (now, session_id),
                )
            touched = cursor.rowcount
        if not touched:
            logger.warning("Heartbeat for unknown session: %s", session_id)

    def complete(self, session_id: str, completion_summary: str):
        """Mark a session as completed and drop every resource lock it holds.

        An unknown *session_id* is logged as a warning; any locks recorded
        under that ID are still removed.
        """
        now = datetime.now().isoformat()
        with self._session() as conn:
            cursor = conn.execute(
                "UPDATE sessions SET status='completed', completed_at=?, completion_summary=? WHERE id=?",
                (now, completion_summary, session_id),
            )
            touched = cursor.rowcount
            conn.execute("DELETE FROM resource_locks WHERE session_id=?", (session_id,))
        if touched:
            logger.info("Session completed: %s", session_id)
        else:
            logger.warning("Tried to complete unknown session: %s", session_id)

    def log(self, session_id: str, entry: str):
        """Append a timestamped progress entry to a session's log."""
        now = datetime.now().isoformat()
        with self._session() as conn:
            conn.execute(
                "INSERT INTO session_logs (session_id, timestamp, entry) VALUES (?, ?, ?)",
                (session_id, now, entry),
            )

    def get_active_sessions(self) -> list[dict]:
        """Return all sessions with status ``'active'``, most recent heartbeat first."""
        with self._session() as conn:
            rows = conn.execute(
                "SELECT * FROM sessions WHERE status='active' ORDER BY last_heartbeat DESC"
            ).fetchall()
        return [dict(row) for row in rows]

    def get_recent_sessions(self, hours: int = 24) -> list[dict]:
        """Return sessions completed within the last *hours* hours, newest first."""
        cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
        with self._session() as conn:
            rows = conn.execute(
                "SELECT * FROM sessions WHERE status='completed' AND completed_at > ? "
                "ORDER BY completed_at DESC",
                (cutoff,),
            ).fetchall()
        return [dict(row) for row in rows]

    def get_session_logs(self, session_id: str, limit: int = 20) -> list[dict]:
        """Return up to *limit* log entries for one session, newest first."""
        with self._session() as conn:
            # id is the tiebreaker so entries sharing a timestamp still come
            # back in reverse insertion order.
            rows = conn.execute(
                "SELECT * FROM session_logs WHERE session_id=? "
                "ORDER BY timestamp DESC, id DESC LIMIT ?",
                (session_id, limit),
            ).fetchall()
        return [dict(row) for row in rows]

    def lock_resource(self, session_id: str, resource: str, note: Optional[str] = None):
        """Record that *session_id* is working on *resource*.

        The lock is advisory: if other sessions already hold the resource a
        warning is logged, but the lock is recorded regardless. Re-locking a
        resource the session already holds refreshes ``locked_at`` and ``note``.
        """
        holders = self.check_locks(resource)
        conflicting = [lock for lock in holders if lock["session_id"] != session_id]
        if conflicting:
            logger.warning(
                "Resource '%s' already locked by: %s",
                resource,
                ", ".join(lock["session_id"] for lock in conflicting),
            )
        now = datetime.now().isoformat()
        with self._session() as conn:
            conn.execute(
                "INSERT OR REPLACE INTO resource_locks (resource, session_id, locked_at, note) "
                "VALUES (?, ?, ?, ?)",
                (resource, session_id, now, note),
            )

    def release_resource(self, session_id: str, resource: str):
        """Remove the lock *session_id* holds on *resource*, if any."""
        with self._session() as conn:
            conn.execute(
                "DELETE FROM resource_locks WHERE resource=? AND session_id=?",
                (resource, session_id),
            )

    def check_locks(self, resource: str) -> list[dict]:
        """Return the locks on *resource*, each with its session's summary and type.

        Locks whose session ID has no row in ``sessions`` are excluded by the
        inner join.
        """
        with self._session() as conn:
            rows = conn.execute(
                "SELECT rl.*, s.summary, s.session_type FROM resource_locks rl "
                "JOIN sessions s ON rl.session_id = s.id "
                "WHERE rl.resource=?",
                (resource,),
            ).fetchall()
        return [dict(row) for row in rows]

    def get_situation_report(self) -> str:
        """Build a compact markdown situation report.

        Contains the world state, one line per active session (summary cut to
        100 characters, flagged when its heartbeat is older than
        ``STALE_AFTER``) and the resources locked by active sessions. Log
        entries are left out; fetch them with ``get_session_logs(session_id)``.
        """
        lines = []

        # World state (the maintained truth)
        world = self.get_world_state()
        if world:
            lines.append(world)
            lines.append("")

        # Active sessions: one line each, nothing more
        active = self.get_active_sessions()
        if active:
            now = datetime.now()
            lines.append(f"**Active sessions ({len(active)}):**")
            for session in active:
                last_seen = datetime.fromisoformat(session["last_heartbeat"])
                # The marker carries its own trailing space so fresh sessions
                # render as "- `id`" without a stray double space.
                marker = "⚠ stale " if now - last_seen > self.STALE_AFTER else ""
                lines.append(
                    f"- {marker}`{session['id']}` ({session['session_type']}): "
                    f"{session['summary'][:100]}"
                )
            lines.append("")

        # Resource locks: only those held by active sessions
        with self._session() as conn:
            locks = conn.execute(
                "SELECT rl.resource, rl.session_id FROM resource_locks rl "
                "JOIN sessions s ON rl.session_id = s.id WHERE s.status='active'"
            ).fetchall()
        if locks:
            lines.append("**Locked resources:**")
            for resource, holder in locks:
                lines.append(f"- `{resource}` by `{holder}`")
            lines.append("")

        if not lines:
            lines.append("No world state set yet. Call `eng.set_world_state(...)` to initialize.")
        return "\n".join(lines)
def sitrep() -> str:
    """Return the situation report from a default-constructed Engram.

    Meant to be called at the start of every session.
    """
    engram = Engram()
    return engram.get_situation_report()
def update_world_state(content: str, session_id: Optional[str] = None):
    """Module-level convenience: replace the world state.

    Args:
        content: New world-state document; overwrites the stored one.
        session_id: Optional ID of the session making the change, recorded
            as the row's ``updated_by``.
    """
    Engram().set_world_state(content, updated_by=session_id)
# Backward-compatibility alias: SessionRegistry is bound to the same class
# object as Engram, so code that imports the SessionRegistry name keeps working.
# NOTE(review): presumably this was the class's earlier name — confirm against history.
SessionRegistry = Engram