symbiont/symbiont/heartbeat.py
Muse d79aa75ef6 Heartbeat: auto-detect and commit skill changes
Now watches /data/skills/ for changes on every heartbeat tick.
Commits and re-packages automatically.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 20:03:21 +00:00

212 lines
6.8 KiB
Python

#!/usr/bin/env python3
"""
Heartbeat: Periodic self-check and queue processing.
Run by systemd timer every 5 minutes. This is Symbiont's autonomic nervous system:
- Process any pending tasks in the queue
- Check rate limit status and clear expired limits
- Log a heartbeat to the ledger for uptime tracking
- Auto-detect skill changes, commit them, and re-package
- Basic self-diagnostics
"""
import json
import logging
import subprocess
import sys
from datetime import datetime
from pathlib import Path
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [heartbeat] %(levelname)s: %(message)s",
)
logger = logging.getLogger(__name__)
HEARTBEAT_LOG = Path("/data/symbiont/heartbeat.jsonl")
LEDGER_PATH = Path("/data/symbiont/ledger.jsonl")
def check_claude_cli():
"""Verify Claude Code CLI is authenticated and responsive."""
try:
result = subprocess.run(
["claude", "auth", "status"],
capture_output=True, text=True, timeout=10,
)
output = result.stdout.strip()
if '"loggedIn": true' in output:
return {"status": "ok", "detail": "authenticated"}
else:
return {"status": "error", "detail": "not authenticated"}
except Exception as e:
return {"status": "error", "detail": str(e)}
def check_disk():
"""Check available disk space."""
try:
result = subprocess.run(
["df", "-h", "/data"],
capture_output=True, text=True, timeout=5,
)
lines = result.stdout.strip().split("\n")
if len(lines) >= 2:
parts = lines[1].split()
return {
"status": "ok",
"total": parts[1],
"used": parts[2],
"available": parts[3],
"use_pct": parts[4],
}
except Exception as e:
return {"status": "error", "detail": str(e)}
def check_api_server():
"""Check if the API server is running."""
try:
result = subprocess.run(
["systemctl", "is-active", "symbiont-api"],
capture_output=True, text=True, timeout=5,
)
active = result.stdout.strip()
return {"status": "ok" if active == "active" else "down", "detail": active}
except Exception as e:
return {"status": "error", "detail": str(e)}
def get_ledger_stats():
"""Quick summary of today's ledger activity."""
if not LEDGER_PATH.exists():
return {"calls_today": 0, "cost_today": 0}
today = datetime.now().strftime("%Y-%m-%d")
calls = 0
cost = 0.0
for line in LEDGER_PATH.read_text().strip().split("\n"):
if not line:
continue
try:
entry = json.loads(line)
if entry.get("timestamp", "").startswith(today):
calls += 1
cost += entry.get("estimated_cost_usd", 0)
except json.JSONDecodeError:
continue
return {"calls_today": calls, "cost_today": round(cost, 4)}
def process_queue():
"""Process pending tasks if any exist."""
try:
sys.path.insert(0, "/data/symbiont")
from symbiont.scheduler import get_pending_tasks, mark_task_done
from symbiont.router import route_task
tasks = get_pending_tasks()
if not tasks:
return {"processed": 0}
processed = 0
for task_entry in tasks:
task_id = task_entry["id"]
result = route_task(task_entry["task"])
if result["success"]:
mark_task_done(task_id, result["output"])
processed += 1
elif result["rate_limited"]:
logger.info("Rate limited, will retry next heartbeat")
break
else:
mark_task_done(task_id, f"ERROR: {result['error']}")
processed += 1
return {"processed": processed, "remaining": len(tasks) - processed}
except Exception as e:
return {"error": str(e)}
def check_skills():
"""Detect skill changes, commit to git, and re-package."""
try:
skills_dir = Path("/data/skills")
if not skills_dir.exists():
return {"status": "skipped", "reason": "skills dir not found"}
# Check if there are any uncommitted changes in /data/skills
result = subprocess.run(
["git", "status", "--porcelain"],
cwd=skills_dir, capture_output=True, text=True, timeout=10
)
changed_files = result.stdout.strip()
if not changed_files:
return {"status": "clean", "changes": 0}
# Count changed skills
change_count = len(changed_files.splitlines())
# Commit changes
subprocess.run(["git", "add", "-A"], cwd=skills_dir, capture_output=True, timeout=10)
subprocess.run(
["git", "commit", "-m",
f"Auto-commit: {change_count} skill file(s) updated by heartbeat\n\nCo-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"],
cwd=skills_dir, capture_output=True, timeout=10
)
# Re-package all skills
package_script = skills_dir / "package_all.sh"
if package_script.exists():
subprocess.run(["bash", str(package_script)], cwd=skills_dir, capture_output=True, timeout=60)
return {"status": "committed", "changes": change_count}
except Exception as e:
return {"status": "error", "detail": str(e)}
def run_heartbeat():
"""Run all checks and log the heartbeat."""
logger.info("Heartbeat starting")
heartbeat = {
"timestamp": datetime.now().isoformat(),
"claude_cli": check_claude_cli(),
"disk": check_disk(),
"api_server": check_api_server(),
"ledger": get_ledger_stats(),
"queue": process_queue(),
"skills": check_skills(),
}
# Determine overall health
checks = [heartbeat["claude_cli"]["status"], heartbeat["api_server"]["status"]]
if all(s == "ok" for s in checks):
heartbeat["health"] = "healthy"
elif any(s == "error" for s in checks):
heartbeat["health"] = "degraded"
else:
heartbeat["health"] = "unhealthy"
# Log it
HEARTBEAT_LOG.parent.mkdir(parents=True, exist_ok=True)
with open(HEARTBEAT_LOG, "a") as f:
f.write(json.dumps(heartbeat) + "\n")
logger.info(f"Health: {heartbeat['health']} | "
f"CLI: {heartbeat['claude_cli']['status']} | "
f"API: {heartbeat['api_server']['status']} | "
f"Skills: {heartbeat['skills']['status']} | "
f"Queue processed: {heartbeat['queue'].get('processed', 0)} | "
f"Today's calls: {heartbeat['ledger']['calls_today']} "
f"(${heartbeat['ledger']['cost_today']})")
return heartbeat
if __name__ == "__main__":
run_heartbeat()