Core orchestrator for self-sustaining AI agent: - Dispatcher: talks to Claude Code CLI with model tier selection - Router: classifies tasks via Haiku, routes to cheapest capable model - Scheduler: queue management + systemd self-wake timers - API: FastAPI endpoints for task execution and monitoring - Ledger: JSONL cost tracking for every inference call Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
49 lines
1.4 KiB
Python
49 lines
1.4 KiB
Python
"""
|
|
Wake module: Called by systemd timer when rate limits expire.
|
|
Processes any pending tasks in the queue.
|
|
"""
|
|
|
|
import logging
|
|
from .scheduler import get_pending_tasks, mark_task_done
|
|
from .router import route_task
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def process_queue():
|
|
"""Process all pending tasks in the queue."""
|
|
tasks = get_pending_tasks()
|
|
|
|
if not tasks:
|
|
logger.info("No pending tasks. Going back to sleep.")
|
|
return
|
|
|
|
logger.info(f"Woke up with {len(tasks)} pending tasks")
|
|
|
|
for task_entry in tasks:
|
|
task_id = task_entry["id"]
|
|
task_text = task_entry["task"]
|
|
|
|
logger.info(f"Processing task {task_id}: {task_text[:80]}...")
|
|
|
|
result = route_task(task_text)
|
|
|
|
if result["success"]:
|
|
mark_task_done(task_id, result["output"])
|
|
logger.info(f"Task {task_id} completed via {result['model_used']}")
|
|
elif result["rate_limited"]:
|
|
logger.warning(f"Task {task_id} still rate-limited, leaving in queue")
|
|
# The dispatcher will have already scheduled a new wake timer
|
|
break
|
|
else:
|
|
logger.error(f"Task {task_id} failed: {result['error']}")
|
|
mark_task_done(task_id, f"ERROR: {result['error']}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
process_queue()
|