symbiont/symbiont/api.py
Muse f75452dd4c Rename sessions → Engram: persistent memory across Claude instances
Engram is the physical trace a memory leaves in neural tissue.
Every Claude session now writes its engrams to /data/symbiont/engram.db.

Changes:
- sessions.py → engram.py with class Engram (SessionRegistry alias kept)
- sessions.db → engram.db
- CLAUDE.md updated to use Engram
- Genesis session registered with full build history

Muse ecosystem: Cortex + Dendrite + Symbiont + Engram

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 13:45:34 +00:00

190 lines
5.5 KiB
Python

"""
FastAPI server for the Symbiont orchestrator.

Endpoints:
    POST /task          — Run a task through the router
    POST /queue         — Add a task to the queue
    GET  /status        — Health check + rate limit status
    GET  /ledger        — Recent ledger entries
    GET  /ledger/stats  — Aggregate cost/usage stats
    GET  /sessions      — Active and recent sessions
    GET  /sitrep        — Situation report for new sessions
"""
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .dispatcher import ModelTier, rate_limits
from .router import route_task
from .scheduler import enqueue_task, get_pending_tasks
from .engram import Engram as SessionRegistry
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(__name__)

# JSON-lines ledger file read by the /ledger and /ledger/stats endpoints.
LEDGER_PATH = Path("/data/symbiont/ledger.jsonl")

# The ASGI application object; every endpoint in this module registers on it.
app = FastAPI(
    title="Symbiont",
    description="Self-sustaining AI orchestrator",
    version="0.1.0",
)
class TaskRequest(BaseModel):
    """Request body for ``POST /task``: a task to run immediately."""

    # Task text handed to the router.
    task: str
    # Optional system prompt forwarded alongside the task.
    system_prompt: Optional[str] = None
    # Optional model-tier override: "haiku", "sonnet", "opus"
    force_tier: Optional[str] = None
class QueueRequest(BaseModel):
    """Request body for ``POST /queue``: a task to enqueue for later."""

    # Task text to place on the queue.
    task: str
    # Priority value passed straight to the scheduler; defaults to 5.
    priority: int = 5
    # Optional free-form metadata passed to the scheduler with the task.
    metadata: Optional[dict] = None
@app.post("/task")
async def run_task(req: TaskRequest):
    """Execute a task immediately through the router, logged as a micro-session.

    Args:
        req: The task text plus an optional system prompt and forced tier.

    Returns:
        The result mapping produced by ``route_task``, unmodified.

    Raises:
        HTTPException: 400 when ``req.force_tier`` is not a valid
            ``ModelTier`` value.
        Exception: anything raised by ``route_task`` is re-raised after the
            micro-session has been given a failure completion record.
    """
    force = None
    if req.force_tier:
        try:
            force = ModelTier(req.force_tier.lower())
        except ValueError as exc:
            raise HTTPException(400, f"Invalid tier: {req.force_tier}") from exc

    # Register this dispatch as a micro-session (summary capped at 120 chars).
    reg = SessionRegistry()
    sid = reg.register("api-task", req.task[:120])

    try:
        result = route_task(
            req.task,
            system_prompt=req.system_prompt,
            force_tier=force,
        )
    except Exception as exc:
        # Without this, a router crash would leave the session registered
        # but never completed. Record the failure, then let it propagate.
        logger.exception("route_task raised for session %s", sid)
        reg.complete(sid, f"Failed: {exc}")
        raise

    # Complete the micro-session. ``.get`` keeps a result that lacks the
    # "success" key on the failure path instead of raising before completion.
    if result.get("success"):
        reg.complete(
            sid,
            f"Completed via {result['model_used']} (${result['estimated_cost_usd']:.4f})",
        )
    else:
        reg.complete(sid, f"Failed: {result.get('error', 'unknown')}")
    return result
@app.post("/queue")
async def queue_task(req: QueueRequest):
    """Enqueue a task for deferred processing and report the id it was given."""
    response = {
        "task_id": enqueue_task(req.task, req.priority, req.metadata),
        "status": "queued",
    }
    return response
@app.get("/status")
async def status():
    """Report liveness, per-tier rate-limit state and the pending-task count.

    Each ``ModelTier`` value maps to ``{"limited": False}``, or to
    ``{"limited": True, "until": <ISO timestamp>}`` while it is rate limited.
    """
    tier_report = {}
    for tier in ModelTier:
        if not rate_limits.is_limited(tier):
            tier_report[tier.value] = {"limited": False}
            continue
        tier_report[tier.value] = {
            "limited": True,
            "until": rate_limits.limited_until[tier].isoformat(),
        }

    queued = get_pending_tasks()
    return {
        "status": "alive",
        "timestamp": datetime.now().isoformat(),
        "rate_limits": tier_report,
        "pending_tasks": len(queued),
    }
@app.get("/ledger")
async def get_ledger(limit: int = 50):
    """Return the most recent ledger entries.

    Args:
        limit: Maximum number of trailing ledger lines to parse. Zero or a
            negative value yields no entries.

    Returns:
        ``{"entries": [...], "total": n}``. ``entries`` holds the parsed JSON
        of the last ``limit`` non-blank lines, with unparseable lines skipped.
        ``total`` is the number of non-blank lines in the ledger file, or 0
        when the file is missing or empty.
    """
    # Read directly and handle absence, rather than exists()-then-read, so a
    # file removed between the two steps cannot raise.
    try:
        raw = LEDGER_PATH.read_text()
    except FileNotFoundError:
        return {"entries": [], "total": 0}

    # Dropping blank lines stops an empty file being counted as one entry.
    lines = [line for line in raw.split("\n") if line.strip()]

    # ``lines[-limit:]`` would return everything for limit == 0 and cut off
    # the head for negative limits, so compute the start index explicitly.
    start = max(len(lines) - max(limit, 0), 0)

    entries = []
    for line in lines[start:]:
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            continue
    return {"entries": entries, "total": len(lines)}
@app.get("/ledger/stats")
async def ledger_stats():
    """Aggregate statistics from the ledger.

    Returns:
        ``{"total_calls": 0}`` when the ledger file is missing or holds no
        usable entries. Otherwise a mapping with the call count, the count of
        entries whose ``success`` field is truthy, the total estimated cost
        rounded to 4 decimals, a per-model breakdown (calls, cost, tokens in
        and out), and the ``timestamp`` of the first and last entries.
    """
    # Read directly and handle absence, rather than exists()-then-read, so a
    # file removed between the two steps cannot raise.
    try:
        raw = LEDGER_PATH.read_text()
    except FileNotFoundError:
        return {"total_calls": 0}

    entries = []
    for line in raw.split("\n"):
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        # A line can be valid JSON without being an object (e.g. a bare
        # number); such a record has no fields to aggregate, so skip it.
        if isinstance(record, dict):
            entries.append(record)

    if not entries:
        return {"total_calls": 0}

    # ``or 0`` treats an explicit JSON null like a missing field.
    total_cost = sum(e.get("estimated_cost_usd") or 0 for e in entries)

    by_model = {}
    for e in entries:
        bucket = by_model.setdefault(
            e.get("model", "unknown"),
            {"calls": 0, "cost": 0, "tokens_in": 0, "tokens_out": 0},
        )
        bucket["calls"] += 1
        bucket["cost"] += e.get("estimated_cost_usd") or 0
        bucket["tokens_in"] += e.get("input_tokens") or 0
        bucket["tokens_out"] += e.get("output_tokens") or 0

    successes = sum(1 for e in entries if e.get("success"))
    return {
        "total_calls": len(entries),
        "successful_calls": successes,
        "total_estimated_cost_usd": round(total_cost, 4),
        "by_model": by_model,
        # ``entries`` is known to be non-empty here.
        "first_entry": entries[0].get("timestamp"),
        "last_entry": entries[-1].get("timestamp"),
    }
@app.get("/sitrep")
async def get_sitrep():
    """Situation report — what other sessions are doing. Read this first.

    Bundles the registry's situation report with its active sessions and the
    sessions from the last 24 hours.
    """
    registry = SessionRegistry()
    narrative = registry.get_situation_report()
    active_now = registry.get_active_sessions()
    last_day = registry.get_recent_sessions(hours=24)
    return {"report": narrative, "active": active_now, "recent_24h": last_day}
@app.get("/sessions")
async def get_sessions(status: Optional[str] = None):
    """List sessions, optionally filtered by status.

    Args:
        status: ``"active"`` returns the registry's active sessions under
            ``"sessions"``. ``"completed"`` returns its recent sessions for
            the past 168 hours (one week) under ``"sessions"``. Any other
            value, including ``None``, returns both the active sessions and
            the last 24 hours under ``"active"`` and ``"recent_24h"``.
    """
    registry = SessionRegistry()

    if status == "active":
        return {"sessions": registry.get_active_sessions()}

    if status == "completed":
        one_week_hours = 168
        return {"sessions": registry.get_recent_sessions(hours=one_week_hours)}

    # No recognised filter: return the combined overview.
    return {
        "active": registry.get_active_sessions(),
        "recent_24h": registry.get_recent_sessions(hours=24),
    }