Initial scaffold: router, dispatcher, ledger, scheduler, API

Core orchestrator for self-sustaining AI agent:
- Dispatcher: talks to Claude Code CLI with model tier selection
- Router: classifies tasks via Haiku, routes to cheapest capable model
- Scheduler: queue management + systemd self-wake timers
- API: FastAPI endpoints for task execution and monitoring
- Ledger: JSONL cost tracking for every inference call

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Symbiont 2026-03-19 19:21:07 +00:00
commit bd8be87a65
11 changed files with 869 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
__pycache__/
*.pyc
data/
*.egg-info/
.env

24
README.md Normal file
View File

@ -0,0 +1,24 @@
# Symbiont
A self-sustaining AI orchestrator that manages its own compute costs through intelligent LLM routing.
## Architecture
- **Router**: Classifies tasks and dispatches to the cheapest capable model
- **Dispatcher**: Executes tasks via Claude Code CLI (Pro account) with fallback tiers
- **Ledger**: Tracks every inference call, cost, and associated revenue
- **Scheduler**: Handles rate-limit backoff and self-waking timers
## Model Tiers
| Tier | Model | Use Case |
|------|-------|----------|
| 1 | Haiku | Classification, extraction, simple formatting |
| 2 | Sonnet | Content writing, code gen, analysis |
| 3 | Opus | Strategic decisions, complex reasoning, QA |
## Setup
1. Authenticate Claude Code CLI: `claude login`
2. Install dependencies: `pip install -r requirements.txt`
3. Run: `python3 -m symbiont.main`

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
fastapi>=0.115.0
uvicorn>=0.32.0
pydantic>=2.0

2
symbiont/__init__.py Normal file
View File

@ -0,0 +1,2 @@
"""Symbiont - Self-sustaining AI orchestrator."""
__version__ = "0.1.0"

3
symbiont/__main__.py Normal file
View File

@ -0,0 +1,3 @@
"""Allow `python3 -m symbiont` to work."""
from .main import main
main()

150
symbiont/api.py Normal file
View File

@ -0,0 +1,150 @@
"""
FastAPI server for the Symbiont orchestrator.
Endpoints:
POST /task Run a task through the router
POST /queue Add a task to the queue
GET /status Health check + rate limit status
GET /ledger Recent ledger entries
GET /ledger/stats Aggregate cost/usage stats
"""
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .dispatcher import ModelTier, rate_limits
from .router import route_task
from .scheduler import enqueue_task, get_pending_tasks
logger = logging.getLogger(__name__)
# FastAPI application object; served by uvicorn from main.run_server.
app = FastAPI(
    title="Symbiont",
    description="Self-sustaining AI orchestrator",
    version="0.1.0",
)
# Same JSONL ledger that dispatcher._log_to_ledger appends to; read-only here.
LEDGER_PATH = Path("/data/symbiont/ledger.jsonl")
class TaskRequest(BaseModel):
    # Request body for POST /task — execute one task immediately.
    task: str
    system_prompt: Optional[str] = None
    # Bypasses the router's classifier when set.
    force_tier: Optional[str] = None  # "haiku", "sonnet", "opus"
class QueueRequest(BaseModel):
    # Request body for POST /queue — defer a task for later processing.
    task: str
    # NOTE(review): scheduler sorts ascending, so lower number = processed first — confirm intent.
    priority: int = 5
    metadata: Optional[dict] = None
@app.post("/task")
async def run_task(req: TaskRequest):
"""Execute a task immediately through the router."""
force = None
if req.force_tier:
try:
force = ModelTier(req.force_tier.lower())
except ValueError:
raise HTTPException(400, f"Invalid tier: {req.force_tier}")
result = route_task(
req.task,
system_prompt=req.system_prompt,
force_tier=force,
)
return result
@app.post("/queue")
async def queue_task(req: QueueRequest):
"""Add a task to the queue for later processing."""
task_id = enqueue_task(req.task, req.priority, req.metadata)
return {"task_id": task_id, "status": "queued"}
@app.get("/status")
async def status():
"""Health check and current rate limit status."""
limits = {}
for tier in ModelTier:
if rate_limits.is_limited(tier):
limits[tier.value] = {
"limited": True,
"until": rate_limits.limited_until[tier].isoformat(),
}
else:
limits[tier.value] = {"limited": False}
pending = get_pending_tasks()
return {
"status": "alive",
"timestamp": datetime.now().isoformat(),
"rate_limits": limits,
"pending_tasks": len(pending),
}
@app.get("/ledger")
async def get_ledger(limit: int = 50):
"""Return the most recent ledger entries."""
if not LEDGER_PATH.exists():
return {"entries": [], "total": 0}
lines = LEDGER_PATH.read_text().strip().split("\n")
entries = []
for line in lines[-limit:]:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
return {"entries": entries, "total": len(lines)}
@app.get("/ledger/stats")
async def ledger_stats():
"""Aggregate statistics from the ledger."""
if not LEDGER_PATH.exists():
return {"total_calls": 0}
lines = LEDGER_PATH.read_text().strip().split("\n")
entries = []
for line in lines:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
if not entries:
return {"total_calls": 0}
total_cost = sum(e.get("estimated_cost_usd", 0) for e in entries)
by_model = {}
for e in entries:
model = e.get("model", "unknown")
if model not in by_model:
by_model[model] = {"calls": 0, "cost": 0, "tokens_in": 0, "tokens_out": 0}
by_model[model]["calls"] += 1
by_model[model]["cost"] += e.get("estimated_cost_usd", 0)
by_model[model]["tokens_in"] += e.get("input_tokens", 0)
by_model[model]["tokens_out"] += e.get("output_tokens", 0)
successes = sum(1 for e in entries if e.get("success"))
return {
"total_calls": len(entries),
"successful_calls": successes,
"total_estimated_cost_usd": round(total_cost, 4),
"by_model": by_model,
"first_entry": entries[0].get("timestamp") if entries else None,
"last_entry": entries[-1].get("timestamp") if entries else None,
}

271
symbiont/dispatcher.py Normal file
View File

@ -0,0 +1,271 @@
"""
Dispatcher: Executes tasks via Claude Code CLI.
Uses Michael's Pro account through the `claude` CLI. Handles model selection,
rate-limit detection, and retry scheduling.
"""
import subprocess
import json
import time
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
LEDGER_PATH = Path("/data/symbiont/ledger.jsonl")
class ModelTier(Enum):
    # Cheapest-to-priciest tiers; values double as the `claude --model` flag
    # (see the model_flag mapping in dispatch()).
    HAIKU = "haiku"
    SONNET = "sonnet"
    OPUS = "opus"
# Approximate token costs (USD per million tokens) for ledger tracking.
# Even though we're on Pro (flat rate), tracking this tells us what our
# "burn rate" would be on API pricing — useful for knowing when to switch.
# NOTE(review): hard-coded price snapshot — verify against current API pricing.
MODEL_COSTS: dict[ModelTier, dict[str, float]] = {
    ModelTier.HAIKU: {"input": 0.25, "output": 1.25},
    ModelTier.SONNET: {"input": 3.00, "output": 15.00},
    ModelTier.OPUS: {"input": 15.00, "output": 75.00},
}
@dataclass
class DispatchResult:
    """Outcome of a single CLI dispatch; failures are reported in-band, not raised."""
    success: bool
    output: str  # model response text; "" on any failure
    model: ModelTier
    elapsed_seconds: float
    rate_limited: bool = False
    error: Optional[str] = None  # human-readable failure description
    # Token counts if available from CLI output
    input_tokens: int = 0
    output_tokens: int = 0
    estimated_cost_usd: float = 0.0  # derived from MODEL_COSTS; 0.0 when tokens unknown
@dataclass
class RateLimitState:
    """Tracks rate limit status per model tier."""

    # Maps each limited tier to the instant its backoff window expires.
    # (Annotation quoted so it documents intent without import-order issues.)
    limited_until: "dict[ModelTier, datetime]" = field(default_factory=dict)

    def is_limited(self, tier: ModelTier) -> bool:
        """True while `tier` is still inside its backoff window."""
        # Single .get() instead of a `in` test followed by a second lookup.
        until = self.limited_until.get(tier)
        return until is not None and datetime.now() < until

    def mark_limited(self, tier: ModelTier, backoff_minutes: int = 15):
        """Start (or restart) a backoff window for `tier`."""
        self.limited_until[tier] = datetime.now() + timedelta(minutes=backoff_minutes)
        logger.warning(
            f"Rate limited on {tier.value}, backing off until "
            f"{self.limited_until[tier].isoformat()}"
        )

    def next_available(self) -> Optional[datetime]:
        """Return the earliest time any model becomes available, or None if none are limited."""
        # Hoist now() so every expiry is compared against one consistent instant
        # (the original re-evaluated now() once per stored entry).
        now = datetime.now()
        times = [t for t in self.limited_until.values() if t > now]
        return min(times) if times else None
# Module-level rate limit state (persists across calls within a process)
# NOTE(review): per-process only — a second worker process will not see these
# backoffs; confirm single-process deployment is intended.
rate_limits = RateLimitState()
def dispatch(
    prompt: str,
    tier: ModelTier = ModelTier.SONNET,
    system_prompt: Optional[str] = None,
    max_tokens: int = 4096,
    timeout_seconds: int = 120,
) -> DispatchResult:
    """
    Dispatch a prompt to Claude via the CLI.
    Uses `claude --model <model> --print` for non-interactive single-shot mode.
    Detects rate limits from stderr and marks the tier for backoff.

    Never raises: rate limits, timeouts, non-zero exits, and unexpected
    exceptions are all reported in-band via the returned DispatchResult.
    A ledger line is written only for SUCCESSFUL dispatches (failures are
    not logged to the ledger).

    Args:
        prompt: User prompt to send.
        tier: Model tier to run; rejected immediately (rate_limited=True)
            if the tier is currently inside a backoff window.
        system_prompt: Optional system prompt forwarded to the CLI.
        max_tokens: Response cap forwarded via --max-tokens.
        timeout_seconds: Hard wall-clock limit for the subprocess.
    """
    # Check if this tier is rate-limited
    if rate_limits.is_limited(tier):
        return DispatchResult(
            success=False,
            output="",
            model=tier,
            elapsed_seconds=0,
            rate_limited=True,
            error=f"Rate limited until {rate_limits.limited_until[tier].isoformat()}",
        )
    # Build the CLI command
    model_flag = {
        ModelTier.HAIKU: "haiku",
        ModelTier.SONNET: "sonnet",
        ModelTier.OPUS: "opus",
    }[tier]
    # NOTE(review): assumes the installed `claude` CLI supports --prompt,
    # --max-tokens and --system-prompt flags — confirm against the CLI version.
    cmd = [
        "claude",
        "--print",  # non-interactive, just print the response
        "--model", model_flag,
        "--max-tokens", str(max_tokens),
        "--output-format", "json",
    ]
    if system_prompt:
        cmd.extend(["--system-prompt", system_prompt])
    cmd.extend(["--prompt", prompt])
    start = time.monotonic()
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
        )
        elapsed = time.monotonic() - start
        # Try to parse JSON output for token counts
        output_text = result.stdout.strip()
        input_tokens = 0
        output_tokens = 0
        try:
            parsed = json.loads(output_text)
            # Claude Code JSON output includes result and usage
            if isinstance(parsed, dict):
                output_text = parsed.get("result", output_text)
                usage = parsed.get("usage", {})
                input_tokens = usage.get("input_tokens", 0)
                output_tokens = usage.get("output_tokens", 0)
        except json.JSONDecodeError:
            pass  # Plain text output, that's fine
        # Check for rate limiting in stderr
        # (checked before the exit-code branch so a rate-limit failure is
        # classified as rate_limited, not as a generic non-zero exit)
        stderr = result.stderr.strip()
        if "rate" in stderr.lower() and "limit" in stderr.lower():
            rate_limits.mark_limited(tier)
            return DispatchResult(
                success=False,
                output="",
                model=tier,
                elapsed_seconds=elapsed,
                rate_limited=True,
                error=f"Rate limited: {stderr}",
            )
        if result.returncode != 0:
            return DispatchResult(
                success=False,
                output="",
                model=tier,
                elapsed_seconds=elapsed,
                error=f"Exit code {result.returncode}: {stderr}",
            )
        # Estimate cost for the ledger
        costs = MODEL_COSTS[tier]
        estimated_cost = (
            (input_tokens / 1_000_000) * costs["input"]
            + (output_tokens / 1_000_000) * costs["output"]
        )
        dispatch_result = DispatchResult(
            success=True,
            output=output_text,
            model=tier,
            elapsed_seconds=elapsed,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            estimated_cost_usd=estimated_cost,
        )
        # Log to ledger (success path only — see docstring)
        _log_to_ledger(prompt, dispatch_result)
        return dispatch_result
    except subprocess.TimeoutExpired:
        elapsed = time.monotonic() - start
        return DispatchResult(
            success=False,
            output="",
            model=tier,
            elapsed_seconds=elapsed,
            error=f"Timeout after {timeout_seconds}s",
        )
    except Exception as e:
        # Catch-all boundary: a dispatch failure must never crash the caller.
        elapsed = time.monotonic() - start
        return DispatchResult(
            success=False,
            output="",
            model=tier,
            elapsed_seconds=elapsed,
            error=str(e),
        )
def dispatch_with_fallback(
    prompt: str,
    preferred_tier: ModelTier = ModelTier.SONNET,
    system_prompt: Optional[str] = None,
) -> DispatchResult:
    """
    Try the preferred tier, fall back to alternatives if rate-limited.

    Actual chains (see `fallback_chains` below — the order is NOT a simple
    "one tier up, then one tier down"):
        HAIKU  -> SONNET -> OPUS   (escalate upward)
        SONNET -> HAIKU  -> OPUS   (try cheaper first, then escalate)
        OPUS   -> SONNET -> HAIKU  (degrade downward)
    so a fallback answer may come from a WEAKER model than requested.

    If every tier is rate-limited, returns a failed, rate_limited result
    whose error names the earliest retry time.
    """
    fallback_chains = {
        ModelTier.HAIKU: [ModelTier.HAIKU, ModelTier.SONNET, ModelTier.OPUS],
        ModelTier.SONNET: [ModelTier.SONNET, ModelTier.HAIKU, ModelTier.OPUS],
        ModelTier.OPUS: [ModelTier.OPUS, ModelTier.SONNET, ModelTier.HAIKU],
    }
    for tier in fallback_chains[preferred_tier]:
        if rate_limits.is_limited(tier):
            logger.info(f"Skipping {tier.value} (rate limited)")
            continue
        result = dispatch(prompt, tier=tier, system_prompt=system_prompt)
        if result.rate_limited:
            continue  # Try next in chain
        return result
    # All tiers rate-limited — return info about when to retry
    # NOTE(review): nothing here schedules a wake timer; the caller must
    # invoke scheduler.schedule_wake if a self-wake is desired — confirm.
    next_time = rate_limits.next_available()
    return DispatchResult(
        success=False,
        output="",
        model=preferred_tier,
        elapsed_seconds=0,
        rate_limited=True,
        error=f"All tiers rate-limited. Next available: {next_time.isoformat() if next_time else 'unknown'}",
    )
def _log_to_ledger(prompt: str, result: DispatchResult):
    """Record one dispatch outcome as a single JSON line in the cost ledger."""
    LEDGER_PATH.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "timestamp": datetime.now().isoformat(),
        "model": result.model.value,
        "success": result.success,
        "elapsed_seconds": round(result.elapsed_seconds, 2),
        "input_tokens": result.input_tokens,
        "output_tokens": result.output_tokens,
        "estimated_cost_usd": round(result.estimated_cost_usd, 6),
        # Only the first 200 chars of the prompt, to keep ledger lines small.
        "prompt_preview": prompt[:200],
    }
    with LEDGER_PATH.open("a") as ledger:
        ledger.write(json.dumps(record) + "\n")

83
symbiont/main.py Normal file
View File

@ -0,0 +1,83 @@
"""
Symbiont main entry point.
Can run in two modes:
1. CLI: `python3 -m symbiont.main "do something"`
2. API: `python3 -m symbiont.main --serve` (starts FastAPI server)
"""
import argparse
import json
import logging
import sys
from pathlib import Path
# Configure root logging once at import time so CLI and server modes share
# the same format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger(__name__)
def run_cli(task: str, tier: "str | None" = None):
    """Run a single task through the router and print the result.

    The model output goes to stdout (pipeable); dispatch metadata goes to
    stderr. On failure, prints the error to stderr and exits with status 1
    — no metadata is printed in that case.

    Args:
        task: Natural-language task to route and execute.
        tier: Optional tier name to force ("haiku"/"sonnet"/"opus").
            Callers are expected to pre-validate (main() uses argparse
            `choices`); an invalid value raises ValueError here.
    """
    from .router import route_task
    from .dispatcher import ModelTier
    force_tier = None
    if tier:
        force_tier = ModelTier(tier.lower())
    result = route_task(task, force_tier=force_tier)
    if result["success"]:
        print(result["output"])
    else:
        print(f"ERROR: {result['error']}", file=sys.stderr)
        sys.exit(1)
    # Print metadata to stderr so it doesn't pollute stdout
    meta = {k: v for k, v in result.items() if k != "output"}
    # (plain string — the original used a pointless f-string with no fields)
    print("\n--- Dispatch Metadata ---", file=sys.stderr)
    print(json.dumps(meta, indent=2), file=sys.stderr)
def run_server(host: str = "127.0.0.1", port: int = 8111):
    """Launch the FastAPI app under uvicorn; blocks until shutdown."""
    import uvicorn

    from .api import app

    logger.info(f"Starting Symbiont API on {host}:{port}")
    uvicorn.run(app, host=host, port=port)
def main():
    """Parse CLI arguments and dispatch to server, queue, or one-shot mode."""
    parser = argparse.ArgumentParser(description="Symbiont AI Orchestrator")
    parser.add_argument("task", nargs="?", help="Task to execute (CLI mode)")
    parser.add_argument("--serve", action="store_true", help="Run as API server")
    parser.add_argument("--host", default="127.0.0.1", help="API host (default: localhost)")
    parser.add_argument("--port", type=int, default=8111, help="API port (default: 8111)")
    parser.add_argument(
        "--tier",
        choices=["haiku", "sonnet", "opus"],
        help="Force a specific model tier",
    )
    parser.add_argument("--queue", action="store_true", help="Add task to queue instead of running immediately")
    args = parser.parse_args()

    # Guard-clause style: each mode returns instead of nesting if/elif.
    if args.serve:
        run_server(args.host, args.port)
        return
    if not args.task:
        parser.print_help()
        return
    if args.queue:
        from .scheduler import enqueue_task
        print(f"Queued as {enqueue_task(args.task)}")
    else:
        run_cli(args.task, args.tier)
if __name__ == "__main__":
    main()

140
symbiont/router.py Normal file
View File

@ -0,0 +1,140 @@
"""
Router: Classifies tasks and picks the cheapest capable model.
The router itself runs on Haiku (cheapest) to minimize overhead.
It examines the task and assigns a model tier based on complexity.
"""
import json
import logging
from typing import Optional
from .dispatcher import ModelTier, dispatch
logger = logging.getLogger(__name__)
# Classification prompt — this runs on Haiku to keep costs minimal
CLASSIFIER_SYSTEM_PROMPT = """You are a task classifier for an AI orchestration system.
Your job is to assess a task and decide which model tier should handle it.
Tiers:
- HAIKU: Simple tasks. Template filling, reformatting, extraction from structured data,
classification, boilerplate generation, simple Q&A. ~70% of tasks.
- SONNET: Medium tasks. Content writing, code generation, moderate analysis,
customer-facing communication, summarization of complex documents. ~25% of tasks.
- OPUS: Complex tasks. Strategic decisions, novel problem-solving, multi-step reasoning,
quality review of important outputs, creative/nuanced work. ~5% of tasks.
Respond with ONLY a JSON object:
{"tier": "HAIKU"|"SONNET"|"OPUS", "confidence": 0.0-1.0, "reason": "brief explanation"}
"""
def classify_task(task_description: str) -> tuple[ModelTier, float, str]:
    """
    Use Haiku to classify which tier should handle this task.
    Returns (tier, confidence, reason).

    If Haiku is unavailable (rate-limited) OR its reply cannot be parsed,
    falls back to a simple heuristic classifier that costs zero tokens.
    """
    result = dispatch(
        prompt=f"Classify this task:\n\n{task_description}",
        tier=ModelTier.HAIKU,
        system_prompt=CLASSIFIER_SYSTEM_PROMPT,
        max_tokens=256,
    )
    if result.success:
        try:
            # Try to parse the JSON response
            text = result.output.strip()
            # Handle markdown code blocks. The split can raise IndexError on
            # a degenerate reply such as a bare "```" (no newline), which the
            # original did not catch and therefore crashed instead of
            # falling back — IndexError is now in the except tuple.
            if text.startswith("```"):
                text = text.split("\n", 1)[1].rsplit("```", 1)[0].strip()
            classification = json.loads(text)
            tier_name = classification.get("tier", "SONNET").upper()
            confidence = float(classification.get("confidence", 0.5))
            reason = classification.get("reason", "no reason given")
            # Unknown tier names default to the middle tier.
            tier = {
                "HAIKU": ModelTier.HAIKU,
                "SONNET": ModelTier.SONNET,
                "OPUS": ModelTier.OPUS,
            }.get(tier_name, ModelTier.SONNET)
            logger.info(f"Classified as {tier.value} (confidence={confidence}): {reason}")
            return tier, confidence, reason
        except (json.JSONDecodeError, KeyError, ValueError, IndexError) as e:
            logger.warning(f"Failed to parse classification: {e}, falling back to heuristic")
    # Fallback: simple heuristic classifier (free, no LLM needed)
    return _heuristic_classify(task_description)
def _heuristic_classify(task: str) -> tuple[ModelTier, float, str]:
    """
    Dead-simple keyword heuristic, used when Haiku is unavailable.
    Costs zero tokens — pure Python string matching, no LLM call.
    """
    lowered = task.lower()
    # Expensive tier is checked first: any complexity keyword wins outright.
    opus_signals = (
        "design", "architect", "strategy", "analyze complex", "review",
        "creative", "novel", "brainstorm", "plan", "evaluate",
        "multi-step", "reasoning", "trade-off", "nuanced",
    )
    haiku_signals = (
        "extract", "format", "convert", "classify", "list",
        "template", "simple", "boilerplate", "parse", "json",
        "reformat", "count", "sort", "filter", "summarize short",
    )
    for signal in opus_signals:
        if signal in lowered:
            return ModelTier.OPUS, 0.4, "heuristic: complexity keywords detected"
    for signal in haiku_signals:
        if signal in lowered:
            return ModelTier.HAIKU, 0.5, "heuristic: simplicity keywords detected"
    # Neither list matched — SONNET is the safe middle ground.
    return ModelTier.SONNET, 0.3, "heuristic: defaulting to sonnet"
def route_task(
    task: str,
    system_prompt: Optional[str] = None,
    force_tier: Optional[ModelTier] = None,
) -> dict:
    """
    End-to-end routing: pick a tier (forced or classified), dispatch with
    rate-limit fallback, and package the outcome plus routing metadata.
    """
    # Imported here rather than at module top — presumably to avoid an
    # import cycle with dispatcher; confirm before moving.
    from .dispatcher import dispatch_with_fallback

    if force_tier is not None:
        tier, confidence, reason = force_tier, 1.0, "forced by caller"
    else:
        tier, confidence, reason = classify_task(task)

    outcome = dispatch_with_fallback(
        prompt=task,
        preferred_tier=tier,
        system_prompt=system_prompt,
    )
    return {
        "output": outcome.output,
        "success": outcome.success,
        "model_used": outcome.model.value,
        "model_requested": tier.value,
        "classification_confidence": confidence,
        "classification_reason": reason,
        "elapsed_seconds": outcome.elapsed_seconds,
        "estimated_cost_usd": outcome.estimated_cost_usd,
        "rate_limited": outcome.rate_limited,
        "error": outcome.error,
    }

140
symbiont/scheduler.py Normal file
View File

@ -0,0 +1,140 @@
"""
Scheduler: Manages task queues and self-waking timers.
When all models are rate-limited, the scheduler sets a system timer
to wake the orchestrator back up when limits expire.
"""
import json
import logging
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
QUEUE_PATH = Path("/data/symbiont/queue.jsonl")
TIMER_NAME = "symbiont-wake"
def enqueue_task(task: str, priority: int = 5, metadata: Optional[dict] = None):
    """Append one task record to the persistent JSONL queue; return its id."""
    QUEUE_PATH.parent.mkdir(parents=True, exist_ok=True)
    record = {
        # Microsecond timestamp doubles as a (process-local) unique id.
        "id": datetime.now().strftime("%Y%m%d%H%M%S%f"),
        "task": task,
        "priority": priority,
        "status": "pending",
        "created_at": datetime.now().isoformat(),
        "metadata": metadata or {},
    }
    with QUEUE_PATH.open("a") as queue_file:
        queue_file.write(json.dumps(record) + "\n")
    logger.info(f"Enqueued task {record['id']}: {task[:80]}...")
    return record["id"]
def get_pending_tasks() -> list[dict]:
    """Return queue entries still marked "pending", lowest priority value first."""
    if not QUEUE_PATH.exists():
        return []
    pending = []
    with QUEUE_PATH.open() as queue_file:
        for raw in queue_file:
            raw = raw.strip()
            if not raw:
                continue
            try:
                entry = json.loads(raw)
            except json.JSONDecodeError:
                continue  # skip torn/corrupt lines
            if entry.get("status") == "pending":
                pending.append(entry)
    # Ascending sort: a lower priority number means processed sooner.
    pending.sort(key=lambda entry: entry.get("priority", 5))
    return pending
def mark_task_done(task_id: str, result: Optional[str] = None):
    """Mark a queued task as completed, optionally storing a result preview.

    Rewrites the queue file in place. Lines other than the target entry —
    including unparseable ones — are passed through verbatim, so untouched
    entries keep their exact original bytes (the original reserialized every
    valid line). An empty queue now yields an empty file instead of a lone
    newline.

    Args:
        task_id: The `id` field of the entry to update.
        result: Optional task output; only the first 500 chars are stored.
    """
    if not QUEUE_PATH.exists():
        return
    updated = []
    # splitlines() is robust to a missing/extra trailing newline.
    for line in QUEUE_PATH.read_text().splitlines():
        if not line.strip():
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            updated.append(line)  # preserve corrupt lines untouched
            continue
        if entry.get("id") == task_id:
            entry["status"] = "completed"
            entry["completed_at"] = datetime.now().isoformat()
            if result:
                entry["result_preview"] = result[:500]
            updated.append(json.dumps(entry))
        else:
            updated.append(line)
    QUEUE_PATH.write_text("\n".join(updated) + "\n" if updated else "")
def schedule_wake(wake_at: datetime):
    """
    Create a systemd transient timer to wake the orchestrator.
    This is how Symbiont sleeps through rate limits instead of busy-waiting.

    Args:
        wake_at: Target wall-clock time; the delay is clamped to >= 1 second.

    On systemd-run failure (non-zero exit), falls back to scheduling via the
    `at` command (see _fallback_schedule).
    """
    delay_seconds = max(1, int((wake_at - datetime.now()).total_seconds()))
    logger.info(f"Scheduling wake in {delay_seconds}s (at {wake_at.isoformat()})")
    # Use systemd-run to create a one-shot timer that runs our wake script
    # NOTE(review): systemd-run may refuse to reuse --unit while a previous
    # symbiont-wake unit is still active or failed — confirm unit cleanup.
    cmd = [
        "systemd-run",
        "--unit", TIMER_NAME,
        "--on-active", f"{delay_seconds}s",
        "--description", "Symbiont self-wake after rate limit",
        "/usr/bin/python3", "-m", "symbiont.wake",
    ]
    try:
        subprocess.run(cmd, capture_output=True, text=True, check=True)
        logger.info(f"Wake timer set: {TIMER_NAME}")
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to set wake timer: {e.stderr}")
        # Fallback: write a cron-style at job
        _fallback_schedule(delay_seconds)
def _fallback_schedule(delay_seconds: int):
    """Fallback: use `at` command if systemd-run fails."""
    # `at` has minute granularity; round down but never below one minute.
    minutes = max(1, delay_seconds // 60)
    try:
        proc = subprocess.run(
            ["at", f"now + {minutes} minutes"],
            input="cd /data/symbiont && python3 -m symbiont.wake\n",
            capture_output=True,
            text=True,
        )
    except FileNotFoundError:
        logger.error("`at` command not available. Cannot schedule wake.")
        return
    if proc.returncode == 0:
        logger.info("Fallback wake scheduled via `at`")
    else:
        logger.error(f"Fallback scheduling failed: {proc.stderr}")
def cancel_wake():
    """Cancel a pending wake timer (best-effort).

    A non-existent unit simply makes `systemctl stop` exit non-zero, which
    is ignored since `check` is not set. Only an OS-level failure (e.g. the
    `systemctl` binary missing) can raise, and that is now logged instead
    of being swallowed by a broad `except Exception: pass`.
    """
    try:
        subprocess.run(
            ["systemctl", "stop", f"{TIMER_NAME}.timer"],
            capture_output=True,
            text=True,
        )
        logger.info("Wake timer cancelled")
    except OSError as e:
        logger.warning(f"Could not cancel wake timer: {e}")

48
symbiont/wake.py Normal file
View File

@ -0,0 +1,48 @@
"""
Wake module: Called by systemd timer when rate limits expire.
Processes any pending tasks in the queue.
"""
import logging
from .scheduler import get_pending_tasks, mark_task_done
from .router import route_task
# Configure logging for standalone execution under the systemd timer, where
# no other entry point has installed handlers yet.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger(__name__)
def process_queue():
    """Process all pending tasks in the queue.

    Runs each pending task through the router in priority order. Successful
    and hard-failed tasks are marked done; on a rate-limited result,
    processing stops and the remaining tasks stay queued for the next wake.
    """
    tasks = get_pending_tasks()
    if not tasks:
        logger.info("No pending tasks. Going back to sleep.")
        return
    logger.info(f"Woke up with {len(tasks)} pending tasks")
    for task_entry in tasks:
        task_id = task_entry["id"]
        task_text = task_entry["task"]
        logger.info(f"Processing task {task_id}: {task_text[:80]}...")
        result = route_task(task_text)
        if result["success"]:
            mark_task_done(task_id, result["output"])
            logger.info(f"Task {task_id} completed via {result['model_used']}")
        elif result["rate_limited"]:
            logger.warning(f"Task {task_id} still rate-limited, leaving in queue")
            # NOTE(review): a previous comment claimed the dispatcher has
            # already scheduled a new wake timer, but nothing in dispatcher.py
            # calls scheduler.schedule_wake — confirm who reschedules the wake.
            break
        else:
            # Hard failure: record the error so the task is not retried forever.
            logger.error(f"Task {task_id} failed: {result['error']}")
            mark_task_done(task_id, f"ERROR: {result['error']}")
if __name__ == "__main__":
    process_queue()