From bd8be87a653f461b97dfeeb04a54da358a8f1c65 Mon Sep 17 00:00:00 2001 From: Symbiont Date: Thu, 19 Mar 2026 19:21:07 +0000 Subject: [PATCH] Initial scaffold: router, dispatcher, ledger, scheduler, API Core orchestrator for self-sustaining AI agent: - Dispatcher: talks to Claude Code CLI with model tier selection - Router: classifies tasks via Haiku, routes to cheapest capable model - Scheduler: queue management + systemd self-wake timers - API: FastAPI endpoints for task execution and monitoring - Ledger: JSONL cost tracking for every inference call Co-Authored-By: Claude Opus 4.6 --- .gitignore | 5 + README.md | 24 ++++ requirements.txt | 3 + symbiont/__init__.py | 2 + symbiont/__main__.py | 3 + symbiont/api.py | 150 +++++++++++++++++++++++ symbiont/dispatcher.py | 271 +++++++++++++++++++++++++++++++++++++++++ symbiont/main.py | 83 +++++++++++++ symbiont/router.py | 140 +++++++++++++++++++++ symbiont/scheduler.py | 140 +++++++++++++++++++++ symbiont/wake.py | 48 ++++++++ 11 files changed, 869 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 requirements.txt create mode 100644 symbiont/__init__.py create mode 100644 symbiont/__main__.py create mode 100644 symbiont/api.py create mode 100644 symbiont/dispatcher.py create mode 100644 symbiont/main.py create mode 100644 symbiont/router.py create mode 100644 symbiont/scheduler.py create mode 100644 symbiont/wake.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3e4bce8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__/ +*.pyc +data/ +*.egg-info/ +.env diff --git a/README.md b/README.md new file mode 100644 index 0000000..a9abfdf --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +# Symbiont + +A self-sustaining AI orchestrator that manages its own compute costs through intelligent LLM routing. 
+ +## Architecture + +- **Router**: Classifies tasks and dispatches to the cheapest capable model +- **Dispatcher**: Executes tasks via Claude Code CLI (Pro account) with fallback tiers +- **Ledger**: Tracks every inference call, cost, and associated revenue +- **Scheduler**: Handles rate-limit backoff and self-waking timers + +## Model Tiers + +| Tier | Model | Use Case | +|------|-------|----------| +| 1 | Haiku | Classification, extraction, simple formatting | +| 2 | Sonnet | Content writing, code gen, analysis | +| 3 | Opus | Strategic decisions, complex reasoning, QA | + +## Setup + +1. Authenticate Claude Code CLI: `claude login` +2. Install dependencies: `pip install -r requirements.txt` +3. Run: `python3 -m symbiont.main` diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..014b9df --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +fastapi>=0.115.0 +uvicorn>=0.32.0 +pydantic>=2.0 diff --git a/symbiont/__init__.py b/symbiont/__init__.py new file mode 100644 index 0000000..b5b751c --- /dev/null +++ b/symbiont/__init__.py @@ -0,0 +1,2 @@ +"""Symbiont - Self-sustaining AI orchestrator.""" +__version__ = "0.1.0" diff --git a/symbiont/__main__.py b/symbiont/__main__.py new file mode 100644 index 0000000..e8c7ed1 --- /dev/null +++ b/symbiont/__main__.py @@ -0,0 +1,3 @@ +"""Allow `python3 -m symbiont` to work.""" +from .main import main +main() diff --git a/symbiont/api.py b/symbiont/api.py new file mode 100644 index 0000000..0a4915f --- /dev/null +++ b/symbiont/api.py @@ -0,0 +1,150 @@ +""" +FastAPI server for the Symbiont orchestrator. 
+ +Endpoints: + POST /task — Run a task through the router + POST /queue — Add a task to the queue + GET /status — Health check + rate limit status + GET /ledger — Recent ledger entries + GET /ledger/stats — Aggregate cost/usage stats +""" + +import json +import logging +from datetime import datetime +from pathlib import Path +from typing import Optional + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +from .dispatcher import ModelTier, rate_limits +from .router import route_task +from .scheduler import enqueue_task, get_pending_tasks + +logger = logging.getLogger(__name__) + +app = FastAPI( + title="Symbiont", + description="Self-sustaining AI orchestrator", + version="0.1.0", +) + +LEDGER_PATH = Path("/data/symbiont/ledger.jsonl") + + +class TaskRequest(BaseModel): + task: str + system_prompt: Optional[str] = None + force_tier: Optional[str] = None # "haiku", "sonnet", "opus" + + +class QueueRequest(BaseModel): + task: str + priority: int = 5 + metadata: Optional[dict] = None + + +@app.post("/task") +async def run_task(req: TaskRequest): + """Execute a task immediately through the router.""" + force = None + if req.force_tier: + try: + force = ModelTier(req.force_tier.lower()) + except ValueError: + raise HTTPException(400, f"Invalid tier: {req.force_tier}") + + result = route_task( + req.task, + system_prompt=req.system_prompt, + force_tier=force, + ) + return result + + +@app.post("/queue") +async def queue_task(req: QueueRequest): + """Add a task to the queue for later processing.""" + task_id = enqueue_task(req.task, req.priority, req.metadata) + return {"task_id": task_id, "status": "queued"} + + +@app.get("/status") +async def status(): + """Health check and current rate limit status.""" + limits = {} + for tier in ModelTier: + if rate_limits.is_limited(tier): + limits[tier.value] = { + "limited": True, + "until": rate_limits.limited_until[tier].isoformat(), + } + else: + limits[tier.value] = {"limited": False} + + pending = 
get_pending_tasks() + + return { + "status": "alive", + "timestamp": datetime.now().isoformat(), + "rate_limits": limits, + "pending_tasks": len(pending), + } + + +@app.get("/ledger") +async def get_ledger(limit: int = 50): + """Return the most recent ledger entries.""" + if not LEDGER_PATH.exists(): + return {"entries": [], "total": 0} + + lines = LEDGER_PATH.read_text().strip().split("\n") + entries = [] + for line in lines[-limit:]: + try: + entries.append(json.loads(line)) + except json.JSONDecodeError: + continue + + return {"entries": entries, "total": len(lines)} + + +@app.get("/ledger/stats") +async def ledger_stats(): + """Aggregate statistics from the ledger.""" + if not LEDGER_PATH.exists(): + return {"total_calls": 0} + + lines = LEDGER_PATH.read_text().strip().split("\n") + entries = [] + for line in lines: + try: + entries.append(json.loads(line)) + except json.JSONDecodeError: + continue + + if not entries: + return {"total_calls": 0} + + total_cost = sum(e.get("estimated_cost_usd", 0) for e in entries) + by_model = {} + for e in entries: + model = e.get("model", "unknown") + if model not in by_model: + by_model[model] = {"calls": 0, "cost": 0, "tokens_in": 0, "tokens_out": 0} + by_model[model]["calls"] += 1 + by_model[model]["cost"] += e.get("estimated_cost_usd", 0) + by_model[model]["tokens_in"] += e.get("input_tokens", 0) + by_model[model]["tokens_out"] += e.get("output_tokens", 0) + + successes = sum(1 for e in entries if e.get("success")) + + return { + "total_calls": len(entries), + "successful_calls": successes, + "total_estimated_cost_usd": round(total_cost, 4), + "by_model": by_model, + "first_entry": entries[0].get("timestamp") if entries else None, + "last_entry": entries[-1].get("timestamp") if entries else None, + } diff --git a/symbiont/dispatcher.py b/symbiont/dispatcher.py new file mode 100644 index 0000000..2cb788b --- /dev/null +++ b/symbiont/dispatcher.py @@ -0,0 +1,271 @@ +""" +Dispatcher: Executes tasks via Claude Code CLI. 

Uses Michael's Pro account through the `claude` CLI. Handles model selection,
rate-limit detection, and retry scheduling.
"""

import subprocess
import json
import time
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# Single source of truth for the JSONL ledger file; api.py reads this same
# file when serving the /ledger endpoints.
LEDGER_PATH = Path("/data/symbiont/ledger.jsonl")


class ModelTier(Enum):
    """Model tiers in ascending cost/capability order."""
    HAIKU = "haiku"
    SONNET = "sonnet"
    OPUS = "opus"


# Approximate token costs (USD per million tokens) for ledger tracking.
# Even though we're on Pro (flat rate), tracking this tells us what our
# "burn rate" would be on API pricing — useful for knowing when to switch.
MODEL_COSTS = {
    ModelTier.HAIKU: {"input": 0.25, "output": 1.25},
    ModelTier.SONNET: {"input": 3.00, "output": 15.00},
    ModelTier.OPUS: {"input": 15.00, "output": 75.00},
}


@dataclass
class DispatchResult:
    """Outcome of one CLI dispatch; returned for successes and all failure modes."""
    success: bool
    output: str  # model response text; "" on any failure
    model: ModelTier
    elapsed_seconds: float
    rate_limited: bool = False  # True when the CLI reported a rate limit
    error: Optional[str] = None
    # Token counts if available from CLI output
    input_tokens: int = 0
    output_tokens: int = 0
    estimated_cost_usd: float = 0.0


@dataclass
class RateLimitState:
    """Tracks rate limit status per model tier.

    NOTE(review): this state lives only in the current process — a fresh
    process (e.g. the systemd wake run) starts with no recorded limits.
    """
    limited_until: dict[ModelTier, datetime] = field(default_factory=dict)  # ModelTier -> datetime

    def is_limited(self, tier: ModelTier) -> bool:
        """Return True while *tier* is still inside its backoff window."""
        if tier not in self.limited_until:
            return False
        return datetime.now() < self.limited_until[tier]

    def mark_limited(self, tier: ModelTier, backoff_minutes: int = 15):
        """Start (or restart) a backoff window for *tier*."""
        self.limited_until[tier] = datetime.now() + timedelta(minutes=backoff_minutes)
        logger.warning(
            f"Rate limited on {tier.value}, backing off until "
            f"{self.limited_until[tier].isoformat()}"
        )

    def next_available(self) -> Optional[datetime]:
        """Return the earliest time any model becomes available, or None if none are limited."""
        # Expired entries are filtered here rather than deleted eagerly.
        times = [t for t in self.limited_until.values() if t > datetime.now()]
        return min(times) if times else None


# Module-level rate limit state (persists across calls within a process)
rate_limits = RateLimitState()


def dispatch(
    prompt: str,
    tier: ModelTier = ModelTier.SONNET,
    system_prompt: Optional[str] = None,
    max_tokens: int = 4096,
    timeout_seconds: int = 120,
) -> DispatchResult:
    """
    Dispatch a prompt to Claude via the CLI.

    Uses `claude --model --print` for non-interactive single-shot mode.
    Detects rate limits from stderr and marks the tier for backoff.

    Args:
        prompt: User prompt passed to the CLI.
        tier: Model tier to run on; returns immediately if it is rate-limited.
        max_tokens: Forwarded via --max-tokens (see NOTE below).
        timeout_seconds: Subprocess timeout; expiry yields a failed result.

    Returns:
        DispatchResult — this function reports CLI/timeout failures in the
        result rather than raising.
    """

    # Check if this tier is rate-limited
    if rate_limits.is_limited(tier):
        return DispatchResult(
            success=False,
            output="",
            model=tier,
            elapsed_seconds=0,
            rate_limited=True,
            error=f"Rate limited until {rate_limits.limited_until[tier].isoformat()}",
        )

    # Build the CLI command
    model_flag = {
        ModelTier.HAIKU: "haiku",
        ModelTier.SONNET: "sonnet",
        ModelTier.OPUS: "opus",
    }[tier]

    # NOTE(review): verify these flags against the installed Claude Code CLI.
    # Recent releases take the prompt as a positional argument with --print,
    # and --prompt / --max-tokens may not be recognized — if so, every
    # dispatch fails with a usage error. TODO confirm before deploying.
    cmd = [
        "claude",
        "--print",  # non-interactive, just print the response
        "--model", model_flag,
        "--max-tokens", str(max_tokens),
        "--output-format", "json",
    ]

    if system_prompt:
        cmd.extend(["--system-prompt", system_prompt])

    cmd.extend(["--prompt", prompt])

    start = time.monotonic()

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
        )
        elapsed = time.monotonic() - start

        # Try to parse JSON output for token counts
        output_text = result.stdout.strip()
        input_tokens = 0
        output_tokens = 0

        try:
            parsed = json.loads(output_text)
            # Claude Code JSON output includes result and usage
            if isinstance(parsed, dict):
                output_text = parsed.get("result", output_text)
                usage = parsed.get("usage", {})
                input_tokens = usage.get("input_tokens", 0)
                output_tokens = usage.get("output_tokens", 0)
        except json.JSONDecodeError:
            pass  # Plain text output, that's fine

        # Check for rate limiting in stderr
        stderr = 
result.stderr.strip() + if "rate" in stderr.lower() and "limit" in stderr.lower(): + rate_limits.mark_limited(tier) + return DispatchResult( + success=False, + output="", + model=tier, + elapsed_seconds=elapsed, + rate_limited=True, + error=f"Rate limited: {stderr}", + ) + + if result.returncode != 0: + return DispatchResult( + success=False, + output="", + model=tier, + elapsed_seconds=elapsed, + error=f"Exit code {result.returncode}: {stderr}", + ) + + # Estimate cost for the ledger + costs = MODEL_COSTS[tier] + estimated_cost = ( + (input_tokens / 1_000_000) * costs["input"] + + (output_tokens / 1_000_000) * costs["output"] + ) + + dispatch_result = DispatchResult( + success=True, + output=output_text, + model=tier, + elapsed_seconds=elapsed, + input_tokens=input_tokens, + output_tokens=output_tokens, + estimated_cost_usd=estimated_cost, + ) + + # Log to ledger + _log_to_ledger(prompt, dispatch_result) + + return dispatch_result + + except subprocess.TimeoutExpired: + elapsed = time.monotonic() - start + return DispatchResult( + success=False, + output="", + model=tier, + elapsed_seconds=elapsed, + error=f"Timeout after {timeout_seconds}s", + ) + except Exception as e: + elapsed = time.monotonic() - start + return DispatchResult( + success=False, + output="", + model=tier, + elapsed_seconds=elapsed, + error=str(e), + ) + + +def dispatch_with_fallback( + prompt: str, + preferred_tier: ModelTier = ModelTier.SONNET, + system_prompt: Optional[str] = None, +) -> DispatchResult: + """ + Try the preferred tier, fall back to alternatives if rate-limited. + Fallback order: preferred -> next tier up -> next tier down. 
+ """ + fallback_chains = { + ModelTier.HAIKU: [ModelTier.HAIKU, ModelTier.SONNET, ModelTier.OPUS], + ModelTier.SONNET: [ModelTier.SONNET, ModelTier.HAIKU, ModelTier.OPUS], + ModelTier.OPUS: [ModelTier.OPUS, ModelTier.SONNET, ModelTier.HAIKU], + } + + for tier in fallback_chains[preferred_tier]: + if rate_limits.is_limited(tier): + logger.info(f"Skipping {tier.value} (rate limited)") + continue + + result = dispatch(prompt, tier=tier, system_prompt=system_prompt) + + if result.rate_limited: + continue # Try next in chain + + return result + + # All tiers rate-limited — return info about when to retry + next_time = rate_limits.next_available() + return DispatchResult( + success=False, + output="", + model=preferred_tier, + elapsed_seconds=0, + rate_limited=True, + error=f"All tiers rate-limited. Next available: {next_time.isoformat() if next_time else 'unknown'}", + ) + + +def _log_to_ledger(prompt: str, result: DispatchResult): + """Append a line to the JSONL ledger.""" + LEDGER_PATH.parent.mkdir(parents=True, exist_ok=True) + + entry = { + "timestamp": datetime.now().isoformat(), + "model": result.model.value, + "success": result.success, + "elapsed_seconds": round(result.elapsed_seconds, 2), + "input_tokens": result.input_tokens, + "output_tokens": result.output_tokens, + "estimated_cost_usd": round(result.estimated_cost_usd, 6), + "prompt_preview": prompt[:200], + } + + with open(LEDGER_PATH, "a") as f: + f.write(json.dumps(entry) + "\n") diff --git a/symbiont/main.py b/symbiont/main.py new file mode 100644 index 0000000..2ce23e9 --- /dev/null +++ b/symbiont/main.py @@ -0,0 +1,83 @@ +""" +Symbiont main entry point. + +Can run in two modes: + 1. CLI: `python3 -m symbiont.main "do something"` + 2. 
API: `python3 -m symbiont.main --serve` (starts FastAPI server) +""" + +import argparse +import json +import logging +import sys +from pathlib import Path + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", +) +logger = logging.getLogger(__name__) + + +def run_cli(task: str, tier: str = None): + """Run a single task through the router and print the result.""" + from .router import route_task + from .dispatcher import ModelTier + + force_tier = None + if tier: + force_tier = ModelTier(tier.lower()) + + result = route_task(task, force_tier=force_tier) + + if result["success"]: + print(result["output"]) + else: + print(f"ERROR: {result['error']}", file=sys.stderr) + sys.exit(1) + + # Print metadata to stderr so it doesn't pollute stdout + meta = {k: v for k, v in result.items() if k != "output"} + print(f"\n--- Dispatch Metadata ---", file=sys.stderr) + print(json.dumps(meta, indent=2), file=sys.stderr) + + +def run_server(host: str = "127.0.0.1", port: int = 8111): + """Start the FastAPI server for the orchestrator API.""" + from .api import app + import uvicorn + + logger.info(f"Starting Symbiont API on {host}:{port}") + uvicorn.run(app, host=host, port=port) + + +def main(): + parser = argparse.ArgumentParser(description="Symbiont AI Orchestrator") + parser.add_argument("task", nargs="?", help="Task to execute (CLI mode)") + parser.add_argument("--serve", action="store_true", help="Run as API server") + parser.add_argument("--host", default="127.0.0.1", help="API host (default: localhost)") + parser.add_argument("--port", type=int, default=8111, help="API port (default: 8111)") + parser.add_argument( + "--tier", + choices=["haiku", "sonnet", "opus"], + help="Force a specific model tier", + ) + parser.add_argument("--queue", action="store_true", help="Add task to queue instead of running immediately") + + args = parser.parse_args() + + if args.serve: + run_server(args.host, args.port) + elif args.task: + if 
args.queue: + from .scheduler import enqueue_task + task_id = enqueue_task(args.task) + print(f"Queued as {task_id}") + else: + run_cli(args.task, args.tier) + else: + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/symbiont/router.py b/symbiont/router.py new file mode 100644 index 0000000..12ce89e --- /dev/null +++ b/symbiont/router.py @@ -0,0 +1,140 @@ +""" +Router: Classifies tasks and picks the cheapest capable model. + +The router itself runs on Haiku (cheapest) to minimize overhead. +It examines the task and assigns a model tier based on complexity. +""" + +import json +import logging +from typing import Optional +from .dispatcher import ModelTier, dispatch + +logger = logging.getLogger(__name__) + +# Classification prompt — this runs on Haiku to keep costs minimal +CLASSIFIER_SYSTEM_PROMPT = """You are a task classifier for an AI orchestration system. +Your job is to assess a task and decide which model tier should handle it. + +Tiers: +- HAIKU: Simple tasks. Template filling, reformatting, extraction from structured data, + classification, boilerplate generation, simple Q&A. ~70% of tasks. +- SONNET: Medium tasks. Content writing, code generation, moderate analysis, + customer-facing communication, summarization of complex documents. ~25% of tasks. +- OPUS: Complex tasks. Strategic decisions, novel problem-solving, multi-step reasoning, + quality review of important outputs, creative/nuanced work. ~5% of tasks. + +Respond with ONLY a JSON object: +{"tier": "HAIKU"|"SONNET"|"OPUS", "confidence": 0.0-1.0, "reason": "brief explanation"} +""" + + +def classify_task(task_description: str) -> tuple[ModelTier, float, str]: + """ + Use Haiku to classify which tier should handle this task. + Returns (tier, confidence, reason). + + If Haiku is unavailable (rate-limited), falls back to a simple + heuristic classifier. 
+ """ + result = dispatch( + prompt=f"Classify this task:\n\n{task_description}", + tier=ModelTier.HAIKU, + system_prompt=CLASSIFIER_SYSTEM_PROMPT, + max_tokens=256, + ) + + if result.success: + try: + # Try to parse the JSON response + text = result.output.strip() + # Handle markdown code blocks + if text.startswith("```"): + text = text.split("\n", 1)[1].rsplit("```", 1)[0].strip() + + classification = json.loads(text) + tier_name = classification.get("tier", "SONNET").upper() + confidence = float(classification.get("confidence", 0.5)) + reason = classification.get("reason", "no reason given") + + tier = { + "HAIKU": ModelTier.HAIKU, + "SONNET": ModelTier.SONNET, + "OPUS": ModelTier.OPUS, + }.get(tier_name, ModelTier.SONNET) + + logger.info(f"Classified as {tier.value} (confidence={confidence}): {reason}") + return tier, confidence, reason + + except (json.JSONDecodeError, KeyError, ValueError) as e: + logger.warning(f"Failed to parse classification: {e}, falling back to heuristic") + + # Fallback: simple heuristic classifier (free, no LLM needed) + return _heuristic_classify(task_description) + + +def _heuristic_classify(task: str) -> tuple[ModelTier, float, str]: + """ + Dead-simple keyword heuristic. Used when Haiku is unavailable. + Costs zero tokens — pure Python. 
+ """ + task_lower = task.lower() + + # OPUS indicators + opus_signals = [ + "design", "architect", "strategy", "analyze complex", "review", + "creative", "novel", "brainstorm", "plan", "evaluate", + "multi-step", "reasoning", "trade-off", "nuanced", + ] + if any(signal in task_lower for signal in opus_signals): + return ModelTier.OPUS, 0.4, "heuristic: complexity keywords detected" + + # HAIKU indicators + haiku_signals = [ + "extract", "format", "convert", "classify", "list", + "template", "simple", "boilerplate", "parse", "json", + "reformat", "count", "sort", "filter", "summarize short", + ] + if any(signal in task_lower for signal in haiku_signals): + return ModelTier.HAIKU, 0.5, "heuristic: simplicity keywords detected" + + # Default to SONNET + return ModelTier.SONNET, 0.3, "heuristic: defaulting to sonnet" + + +def route_task( + task: str, + system_prompt: Optional[str] = None, + force_tier: Optional[ModelTier] = None, +) -> dict: + """ + Full routing pipeline: classify the task, dispatch to the right model, + return the result with metadata. + """ + if force_tier: + tier = force_tier + confidence = 1.0 + reason = "forced by caller" + else: + tier, confidence, reason = classify_task(task) + + from .dispatcher import dispatch_with_fallback + + result = dispatch_with_fallback( + prompt=task, + preferred_tier=tier, + system_prompt=system_prompt, + ) + + return { + "output": result.output, + "success": result.success, + "model_used": result.model.value, + "model_requested": tier.value, + "classification_confidence": confidence, + "classification_reason": reason, + "elapsed_seconds": result.elapsed_seconds, + "estimated_cost_usd": result.estimated_cost_usd, + "rate_limited": result.rate_limited, + "error": result.error, + } diff --git a/symbiont/scheduler.py b/symbiont/scheduler.py new file mode 100644 index 0000000..84e791c --- /dev/null +++ b/symbiont/scheduler.py @@ -0,0 +1,140 @@ +""" +Scheduler: Manages task queues and self-waking timers. 
+ +When all models are rate-limited, the scheduler sets a system timer +to wake the orchestrator back up when limits expire. +""" + +import json +import logging +import subprocess +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + +QUEUE_PATH = Path("/data/symbiont/queue.jsonl") +TIMER_NAME = "symbiont-wake" + + +def enqueue_task(task: str, priority: int = 5, metadata: Optional[dict] = None): + """Add a task to the persistent queue.""" + QUEUE_PATH.parent.mkdir(parents=True, exist_ok=True) + + entry = { + "id": datetime.now().strftime("%Y%m%d%H%M%S%f"), + "task": task, + "priority": priority, + "status": "pending", + "created_at": datetime.now().isoformat(), + "metadata": metadata or {}, + } + + with open(QUEUE_PATH, "a") as f: + f.write(json.dumps(entry) + "\n") + + logger.info(f"Enqueued task {entry['id']}: {task[:80]}...") + return entry["id"] + + +def get_pending_tasks() -> list[dict]: + """Read all pending tasks from the queue, sorted by priority.""" + if not QUEUE_PATH.exists(): + return [] + + tasks = [] + with open(QUEUE_PATH) as f: + for line in f: + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + if entry.get("status") == "pending": + tasks.append(entry) + except json.JSONDecodeError: + continue + + return sorted(tasks, key=lambda t: t.get("priority", 5)) + + +def mark_task_done(task_id: str, result: Optional[str] = None): + """Mark a task as completed in the queue file.""" + if not QUEUE_PATH.exists(): + return + + lines = QUEUE_PATH.read_text().strip().split("\n") + updated = [] + for line in lines: + if not line.strip(): + continue + try: + entry = json.loads(line) + if entry.get("id") == task_id: + entry["status"] = "completed" + entry["completed_at"] = datetime.now().isoformat() + if result: + entry["result_preview"] = result[:500] + updated.append(json.dumps(entry)) + except json.JSONDecodeError: + updated.append(line) + + 
QUEUE_PATH.write_text("\n".join(updated) + "\n") + + +def schedule_wake(wake_at: datetime): + """ + Create a systemd transient timer to wake the orchestrator. + This is how Symbiont sleeps through rate limits instead of busy-waiting. + """ + delay_seconds = max(1, int((wake_at - datetime.now()).total_seconds())) + + logger.info(f"Scheduling wake in {delay_seconds}s (at {wake_at.isoformat()})") + + # Use systemd-run to create a one-shot timer that runs our wake script + cmd = [ + "systemd-run", + "--unit", TIMER_NAME, + "--on-active", f"{delay_seconds}s", + "--description", "Symbiont self-wake after rate limit", + "/usr/bin/python3", "-m", "symbiont.wake", + ] + + try: + subprocess.run(cmd, capture_output=True, text=True, check=True) + logger.info(f"Wake timer set: {TIMER_NAME}") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to set wake timer: {e.stderr}") + # Fallback: write a cron-style at job + _fallback_schedule(delay_seconds) + + +def _fallback_schedule(delay_seconds: int): + """Fallback: use `at` command if systemd-run fails.""" + try: + proc = subprocess.run( + ["at", f"now + {max(1, delay_seconds // 60)} minutes"], + input="cd /data/symbiont && python3 -m symbiont.wake\n", + capture_output=True, + text=True, + ) + if proc.returncode == 0: + logger.info("Fallback wake scheduled via `at`") + else: + logger.error(f"Fallback scheduling failed: {proc.stderr}") + except FileNotFoundError: + logger.error("`at` command not available. Cannot schedule wake.") + + +def cancel_wake(): + """Cancel a pending wake timer.""" + try: + subprocess.run( + ["systemctl", "stop", f"{TIMER_NAME}.timer"], + capture_output=True, + text=True, + ) + logger.info("Wake timer cancelled") + except Exception: + pass diff --git a/symbiont/wake.py b/symbiont/wake.py new file mode 100644 index 0000000..dd7384d --- /dev/null +++ b/symbiont/wake.py @@ -0,0 +1,48 @@ +""" +Wake module: Called by systemd timer when rate limits expire. 
+Processes any pending tasks in the queue. +""" + +import logging +from .scheduler import get_pending_tasks, mark_task_done +from .router import route_task + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", +) +logger = logging.getLogger(__name__) + + +def process_queue(): + """Process all pending tasks in the queue.""" + tasks = get_pending_tasks() + + if not tasks: + logger.info("No pending tasks. Going back to sleep.") + return + + logger.info(f"Woke up with {len(tasks)} pending tasks") + + for task_entry in tasks: + task_id = task_entry["id"] + task_text = task_entry["task"] + + logger.info(f"Processing task {task_id}: {task_text[:80]}...") + + result = route_task(task_text) + + if result["success"]: + mark_task_done(task_id, result["output"]) + logger.info(f"Task {task_id} completed via {result['model_used']}") + elif result["rate_limited"]: + logger.warning(f"Task {task_id} still rate-limited, leaving in queue") + # The dispatcher will have already scheduled a new wake timer + break + else: + logger.error(f"Task {task_id} failed: {result['error']}") + mark_task_done(task_id, f"ERROR: {result['error']}") + + +if __name__ == "__main__": + process_queue()