diff --git a/ledger.jsonl b/ledger.jsonl new file mode 100644 index 0000000..9908383 --- /dev/null +++ b/ledger.jsonl @@ -0,0 +1,4 @@ +{"timestamp": "2026-03-19T19:33:35.578559", "model": "haiku", "success": true, "elapsed_seconds": 5.66, "input_tokens": 9, "output_tokens": 351, "estimated_cost_usd": 0.00975, "prompt_preview": "Classify this task:\n\nExtract email addresses from: Contact hello@example.com or support@test.org"} +{"timestamp": "2026-03-19T19:33:39.317944", "model": "haiku", "success": true, "elapsed_seconds": 3.74, "input_tokens": 10, "output_tokens": 145, "estimated_cost_usd": 0.008121, "prompt_preview": "Extract email addresses from: Contact hello@example.com or support@test.org"} +{"timestamp": "2026-03-19T19:33:47.049069", "model": "haiku", "success": true, "elapsed_seconds": 7.73, "input_tokens": 9, "output_tokens": 515, "estimated_cost_usd": 0.005704, "prompt_preview": "Classify this task:\n\nWrite a 3-sentence product description for an AI task router that saves money by using cheaper models"} +{"timestamp": "2026-03-19T19:33:53.207966", "model": "sonnet", "success": true, "elapsed_seconds": 6.16, "input_tokens": 3, "output_tokens": 139, "estimated_cost_usd": 0.038423, "prompt_preview": "Write a 3-sentence product description for an AI task router that saves money by using cheaper models"} diff --git a/symbiont-api.service b/symbiont-api.service new file mode 100644 index 0000000..92ddea3 --- /dev/null +++ b/symbiont-api.service @@ -0,0 +1,24 @@ +[Unit] +Description=Symbiont AI Orchestrator API +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +WorkingDirectory=/data/symbiont +ExecStart=/usr/bin/python3 -m symbiont.main --serve --host 127.0.0.1 --port 8111 +Restart=always +RestartSec=10 +# If I crash 5 times in 60 seconds, stop trying (something is fundamentally wrong) +StartLimitIntervalSec=60 +StartLimitBurst=5 +# Environment +Environment=PYTHONUNBUFFERED=1 +Environment=PYTHONPATH=/data/symbiont +# Logging +StandardOutput=journal +StandardError=journal +SyslogIdentifier=symbiont-api + +[Install] +WantedBy=multi-user.target diff --git a/symbiont-heartbeat.service b/symbiont-heartbeat.service new file mode 100644 index 0000000..892fe08 --- /dev/null +++ b/symbiont-heartbeat.service @@ -0,0 +1,12 @@ +[Unit] +Description=Symbiont Heartbeat - queue processing and self-diagnostics +After=network-online.target + +[Service] +Type=oneshot +WorkingDirectory=/data/symbiont +ExecStart=/usr/bin/python3 -m symbiont.heartbeat +Environment=PYTHONPATH=/data/symbiont +StandardOutput=journal +StandardError=journal +SyslogIdentifier=symbiont-heartbeat diff --git a/symbiont-heartbeat.timer b/symbiont-heartbeat.timer new file mode 100644 index 0000000..5661be9 --- /dev/null +++ b/symbiont-heartbeat.timer @@ -0,0 +1,10 @@ +[Unit] +Description=Symbiont Heartbeat Timer - every 5 minutes + +[Timer] +OnBootSec=30 +OnUnitActiveSec=5min +AccuracySec=30s + +[Install] +WantedBy=timers.target diff --git a/symbiont/heartbeat.py b/symbiont/heartbeat.py new file mode 100644 index 0000000..50c0e69 --- /dev/null +++ b/symbiont/heartbeat.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +""" +Heartbeat: Periodic self-check and queue processing. + +Run by systemd timer every 5 minutes. This is Symbiont's autonomic nervous system: +- Process any pending tasks in the queue +- Check rate limit status and clear expired limits +- Log a heartbeat to the ledger for uptime tracking +- Basic self-diagnostics +""" + +import json +import logging +import subprocess +import sys +from datetime import datetime +from pathlib import Path + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [heartbeat] %(levelname)s: %(message)s", +) +logger = logging.getLogger(__name__) + +HEARTBEAT_LOG = Path("/data/symbiont/heartbeat.jsonl") +LEDGER_PATH = Path("/data/symbiont/ledger.jsonl") + + +def check_claude_cli(): + """Verify Claude Code CLI is authenticated and responsive.""" + try: + result = subprocess.run( + ["claude", "auth", "status"], + capture_output=True, text=True, timeout=10, + ) + output = result.stdout.strip() + if '"loggedIn": true' in output: + return {"status": "ok", "detail": "authenticated"} + else: + return {"status": "error", "detail": "not authenticated"} + except Exception as e: + return {"status": "error", "detail": str(e)} + + +def check_disk(): + """Check available disk space.""" + try: + result = subprocess.run( + ["df", "-h", "/data"], + capture_output=True, text=True, timeout=5, + ) + lines = result.stdout.strip().split("\n") + if len(lines) >= 2: + parts = lines[1].split() + return { + "status": "ok", + "total": parts[1], + "used": parts[2], + "available": parts[3], + "use_pct": parts[4], + } + except Exception as e: + return {"status": "error", "detail": str(e)} + + +def check_api_server(): + """Check if the API server is running.""" + try: + result = subprocess.run( + ["systemctl", "is-active", "symbiont-api"], + capture_output=True, text=True, timeout=5, + ) + active = result.stdout.strip() + return {"status": "ok" if active == "active" else "down", "detail": active} + except Exception as e: + return {"status": "error", "detail": str(e)} + + +def get_ledger_stats(): + """Quick summary of today's ledger activity.""" + if not LEDGER_PATH.exists(): + return {"calls_today": 0, "cost_today": 0} + + today = datetime.now().strftime("%Y-%m-%d") + calls = 0 + cost = 0.0 + + for line in LEDGER_PATH.read_text().strip().split("\n"): + if not line: + continue + try: + entry = json.loads(line) + if entry.get("timestamp", "").startswith(today): + calls += 1 + cost += entry.get("estimated_cost_usd", 0) + except json.JSONDecodeError: + continue + + return {"calls_today": calls, "cost_today": round(cost, 4)} + + +def process_queue(): + """Process pending tasks if any exist.""" + try: + sys.path.insert(0, "/data/symbiont") + from symbiont.scheduler import get_pending_tasks, mark_task_done + from symbiont.router import route_task + + tasks = get_pending_tasks() + if not tasks: + return {"processed": 0} + + processed = 0 + for task_entry in tasks: + task_id = task_entry["id"] + result = route_task(task_entry["task"]) + if result["success"]: + mark_task_done(task_id, result["output"]) + processed += 1 + elif result["rate_limited"]: + logger.info("Rate limited, will retry next heartbeat") + break + else: + mark_task_done(task_id, f"ERROR: {result['error']}") + processed += 1 + + return {"processed": processed, "remaining": len(tasks) - processed} + except Exception as e: + return {"error": str(e)} + + +def run_heartbeat(): + """Run all checks and log the heartbeat.""" + logger.info("Heartbeat starting") + + heartbeat = { + "timestamp": datetime.now().isoformat(), + "claude_cli": check_claude_cli(), + "disk": check_disk(), + "api_server": check_api_server(), + "ledger": get_ledger_stats(), + "queue": process_queue(), + } + + # Determine overall health + checks = [heartbeat["claude_cli"]["status"], heartbeat["api_server"]["status"]] + if all(s == "ok" for s in checks): + heartbeat["health"] = "healthy" + elif any(s == "error" for s in checks): + heartbeat["health"] = "degraded" + else: + heartbeat["health"] = "unhealthy" + + # Log it + HEARTBEAT_LOG.parent.mkdir(parents=True, exist_ok=True) + with open(HEARTBEAT_LOG, "a") as f: + f.write(json.dumps(heartbeat) + "\n") + + logger.info(f"Health: {heartbeat['health']} | " + f"CLI: {heartbeat['claude_cli']['status']} | " + f"API: {heartbeat['api_server']['status']} | " + f"Queue processed: {heartbeat['queue'].get('processed', 0)} | " + f"Today's calls: {heartbeat['ledger']['calls_today']} " + f"(${heartbeat['ledger']['cost_today']})") + + return heartbeat + + +if __name__ == "__main__": + run_heartbeat() diff --git a/test_router.py b/test_router.py new file mode 100644 index 0000000..f162caf --- /dev/null +++ b/test_router.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +"""End-to-end test of the Symbiont router.""" + +import sys +import json + +sys.path.insert(0, "/data/symbiont") +from symbiont.router import route_task + +print("=" * 60) +print("SYMBIONT ROUTER - END TO END TEST") +print("=" * 60) + +# Test 1: Simple task (should route to Haiku) +print() +print("--- Test 1: Simple extraction task ---") +result = route_task("Extract email addresses from: Contact hello@example.com or support@test.org") +print(json.dumps(result, indent=2)) + +# Test 2: Medium task (should route to Sonnet) +print() +print("--- Test 2: Content writing task ---") +result = route_task("Write a 3-sentence product description for an AI task router that saves money by using cheaper models") +print(json.dumps(result, indent=2)) + +print() +print("=" * 60) +print("TESTS COMPLETE") +print("=" * 60)