# Shared secret presented by external callers.  It is read from the
# environment when CORTEX_PUBLIC_API_KEY is set so the key can be rotated
# without editing source; the literal fallback keeps existing deployments
# and clients working unchanged.
API_KEY = os.environ.get("CORTEX_PUBLIC_API_KEY", "cortex-pub-a7f3e9c1d4b8")

app = FastAPI(title="Cortex Public API", version="0.2.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)


def verify_key(
    request: Request,
    x_api_key: Optional[str] = Header(None),
    authorization: Optional[str] = Header(None),
    key: Optional[str] = Query(None),
):
    """Authenticate a request against ``API_KEY``.

    The key is taken from, in order of precedence, the ``X-API-Key`` header,
    the ``?key=`` query parameter, or an ``Authorization: Bearer <key>``
    header.

    Returns:
        The accepted token.

    Raises:
        HTTPException: 401 when no key is supplied or it does not match.
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import list.
    import hmac

    token = x_api_key or key
    if not token and authorization and authorization.startswith("Bearer "):
        token = authorization[7:]

    # compare_digest runs in time independent of where the strings differ,
    # so response timing does not leak key prefixes.  Bytes are used because
    # the str form rejects non-ASCII input with TypeError.
    supplied = (token or "").encode("utf-8")
    if not token or not hmac.compare_digest(supplied, API_KEY.encode("utf-8")):
        raise HTTPException(
            status_code=401,
            detail="Invalid API key. Pass via X-API-Key header or ?key= param.",
        )
    return token


# ──────────── /health (no auth) ────────────
@app.get("/health")
async def health():
    """Liveness probe: returns a static status plus the current UTC time."""
    return {"status": "ok", "timestamp": datetime.utcnow().isoformat() + "Z"}


def _command_output(args, timeout=5):
    """Run *args* and return its stripped stdout.

    Raises whatever ``subprocess.check_output`` raises (missing binary,
    non-zero exit, timeout); callers turn that into a placeholder value.
    """
    return subprocess.check_output(args, text=True, timeout=timeout).strip()


# ──────────── /info — system specs ────────────
@app.get("/info")
async def system_info(auth: str = Depends(verify_key)):
    """Return a best-effort snapshot of host specs.

    Each probe is independent: when one fails, its field carries a
    placeholder string and the remaining probes still run, so the endpoint
    never fails because a single tool is missing.
    """
    specs = {}

    # OS description, kernel release and hostname each come from a single
    # command.  All three are guarded so a missing binary degrades to
    # "unknown" instead of an unhandled 500.
    for field, command in (
        ("os", ["lsb_release", "-ds"]),
        ("kernel", ["uname", "-r"]),
        ("hostname", ["hostname"]),
    ):
        try:
            specs[field] = _command_output(command)
        except Exception:
            specs[field] = "unknown"

    # CPU: one "model name" line per logical core in /proc/cpuinfo.
    try:
        cpuinfo = Path("/proc/cpuinfo").read_text()
        model_names = [
            line.split(":")[1].strip()
            for line in cpuinfo.splitlines()
            if "model name" in line
        ]
        specs["cpu"] = {
            "model": model_names[0] if model_names else "unknown",
            "cores": len(model_names),
        }
    except Exception:
        specs["cpu"] = "unknown"

    # RAM: MemTotal is reported in kB.
    try:
        meminfo = Path("/proc/meminfo").read_text()
        for line in meminfo.splitlines():
            if line.startswith("MemTotal:"):
                kb = int(line.split()[1])
                specs["ram_gb"] = round(kb / 1024 / 1024, 1)
                break
    except Exception:
        specs["ram_gb"] = "unknown"

    # Disk: second line of `df -h /` holds the figures for the root mount.
    try:
        df = _command_output(["df", "-h", "/"]).split("\n")[1].split()
        specs["disk"] = {
            "total": df[1],
            "used": df[2],
            "available": df[3],
            "use_pct": df[4],
        }
    except Exception:
        specs["disk"] = "unknown"

    # GPU: any PCI device whose class mentions VGA / 3D / Display.
    try:
        pci = _command_output(["lspci"])
        gpu_lines = [
            line
            for line in pci.splitlines()
            if "VGA" in line or "3D" in line or "Display" in line
        ]
        specs["gpu"] = gpu_lines if gpu_lines else "none detected"
    except Exception:
        specs["gpu"] = "lspci not available"

    # Uptime: first field of /proc/uptime is seconds since boot.
    try:
        uptime = Path("/proc/uptime").read_text().split()[0]
        specs["uptime_hours"] = round(float(uptime) / 3600, 1)
    except Exception:
        specs["uptime_hours"] = "unknown"

    # Docker containers
    try:
        containers = _command_output(
            ["docker", "ps", "--format", "{{.Names}}: {{.Status}}"]
        )
        specs["docker_containers"] = containers.splitlines() if containers else []
    except Exception:
        specs["docker_containers"] = "docker not available"

    # Services.  `systemctl is-active` exits non-zero for every state other
    # than "active", so check_output would raise and hide the real state
    # ("inactive", "failed", ...).  Read stdout regardless of exit status.
    services = {}
    for svc in ["caddy", "fail2ban", "docker", "symbiont-api", "cortex-public-api"]:
        try:
            proc = subprocess.run(
                ["systemctl", "is-active", svc],
                capture_output=True,
                text=True,
                timeout=5,
            )
            services[svc] = proc.stdout.strip() or "unknown"
        except Exception:
            services[svc] = "unknown"
    specs["services"] = services

    specs["timestamp"] = datetime.utcnow().isoformat() + "Z"
    return specs
# ──────────── /engram/sitrep — session awareness ────────────
@app.get("/engram/sitrep")
async def engram_sitrep(auth: str = Depends(verify_key)):
    """Return a situation report read directly from the engram database.

    Returns:
        ``{"active_sessions": [...], "recent_24h": [...]}`` on success, or a
        dict with an ``"error"`` key when the database file or its
        ``sessions`` table is missing.
    """
    db_path = Path("/data/symbiont/engram.db")
    if not db_path.exists():
        return {"error": "engram.db not found", "sessions": []}

    conn = sqlite3.connect(str(db_path))
    # try/finally so the connection is released on every exit path,
    # including a query that raises.
    try:
        conn.row_factory = sqlite3.Row
        cur = conn.cursor()

        # Check the schema first so a missing table becomes a readable
        # error payload rather than an OperationalError.
        cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row["name"] for row in cur.fetchall()]
        if "sessions" not in tables:
            return {"error": "sessions table not found", "tables": tables}

        # Active sessions, newest first.
        cur.execute(
            "SELECT * FROM sessions WHERE status = 'active' ORDER BY started_at DESC"
        )
        active = [dict(row) for row in cur.fetchall()]

        # Sessions completed within the last 24 hours.
        cutoff = (datetime.utcnow() - timedelta(hours=24)).isoformat()
        cur.execute(
            "SELECT * FROM sessions WHERE status = 'completed' AND started_at > ? ORDER BY started_at DESC",
            (cutoff,),
        )
        recent = [dict(row) for row in cur.fetchall()]

        return {"active_sessions": active, "recent_24h": recent}
    finally:
        conn.close()


# Directory that contains the `symbiont` package used by the engram endpoints.
_SYMBIONT_ROOT = "/data/symbiont"


def _new_engram():
    """Import the Engram class lazily and return a fresh instance.

    The package root is added to ``sys.path`` only when absent; inserting it
    on every request would grow ``sys.path`` without bound.
    """
    import sys

    if _SYMBIONT_ROOT not in sys.path:
        sys.path.insert(0, _SYMBIONT_ROOT)
    from symbiont.engram import Engram

    return Engram()


# ──────────── /engram/register — register a session ────────────
@app.post("/engram/register")
async def engram_register(
    session_type: str = Query(default="claude-chat"),
    description: str = Query(default="External Claude session"),
    auth: str = Depends(verify_key),
):
    """Register a new session in engram and return its id."""
    sid = _new_engram().register(session_type, description)
    return {"session_id": sid, "status": "registered"}


# ──────────── /engram/log — log to a session ────────────
@app.post("/engram/log")
async def engram_log(
    session_id: str = Query(...),
    message: str = Query(...),
    auth: str = Depends(verify_key),
):
    """Append a log message to an existing engram session."""
    _new_engram().log(session_id, message)
    return {"status": "logged"}


if __name__ == "__main__":
    import uvicorn

    # Bound to loopback only; the module docstring states that external
    # access goes through Caddy.
    uvicorn.run(app, host="127.0.0.1", port=8112)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Symbiont",
    description="Self-sustaining AI orchestrator",
    version="0.1.0",
)

LEDGER_PATH = Path("/data/symbiont/ledger.jsonl")


class TaskRequest(BaseModel):
    """Body of POST /task."""

    task: str
    system_prompt: Optional[str] = None
    force_tier: Optional[str] = None  # "haiku", "sonnet", "opus"


class QueueRequest(BaseModel):
    """Body of POST /queue."""

    task: str
    priority: int = 5
    metadata: Optional[dict] = None


@app.post("/task")
async def run_task(req: TaskRequest):
    """Execute a task immediately through the router.

    The dispatch is recorded in the session registry as a micro-session
    that is always closed, whether routing succeeds, reports failure, or
    raises.

    Raises:
        HTTPException: 400 when ``force_tier`` is not a valid ``ModelTier``.
    """
    force = None
    if req.force_tier:
        try:
            force = ModelTier(req.force_tier.lower())
        except ValueError:
            raise HTTPException(400, f"Invalid tier: {req.force_tier}")

    # Register this dispatch as a micro-session.
    reg = SessionRegistry()
    sid = reg.register("api-task", req.task[:120])

    try:
        result = route_task(
            req.task,
            system_prompt=req.system_prompt,
            force_tier=force,
        )
    except Exception as exc:
        # Without this the session would stay "active" forever when the
        # router raises instead of returning a failure result.
        reg.complete(sid, f"Failed: {exc}")
        raise

    # Complete the micro-session.
    if result["success"]:
        reg.complete(
            sid,
            f"Completed via {result['model_used']} (${result['estimated_cost_usd']:.4f})",
        )
    else:
        reg.complete(sid, f"Failed: {result.get('error', 'unknown')}")

    return result


@app.post("/queue")
async def queue_task(req: QueueRequest):
    """Add a task to the queue for later processing."""
    task_id = enqueue_task(req.task, req.priority, req.metadata)
    return {"task_id": task_id, "status": "queued"}


@app.get("/status")
async def status():
    """Health check and current per-tier rate limit status."""
    limits = {}
    for tier in ModelTier:
        if rate_limits.is_limited(tier):
            limits[tier.value] = {
                "limited": True,
                "until": rate_limits.limited_until[tier].isoformat(),
            }
        else:
            limits[tier.value] = {"limited": False}

    pending = get_pending_tasks()

    return {
        "status": "alive",
        "timestamp": datetime.now().isoformat(),
        "rate_limits": limits,
        "pending_tasks": len(pending),
    }


def _read_ledger_lines():
    """Return the non-blank lines of the ledger file (``[]`` if missing)."""
    if not LEDGER_PATH.exists():
        return []
    return [line for line in LEDGER_PATH.read_text().splitlines() if line.strip()]


def _parse_ledger_lines(lines):
    """Decode JSON lines, skipping malformed rows and non-object values."""
    entries = []
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(entry, dict):
            entries.append(entry)
    return entries


@app.get("/ledger")
async def get_ledger(limit: int = 50):
    """Return the most recent ledger entries.

    Args:
        limit: Maximum number of trailing ledger lines to return. Zero or a
            negative value yields no entries.

    Returns:
        ``{"entries": [...], "total": <number of non-blank ledger lines>}``.
    """
    lines = _read_ledger_lines()
    # lines[-0:] is the whole list, so a non-positive limit must be handled
    # explicitly rather than fed to the slice.
    tail = lines[-limit:] if limit > 0 else []
    return {"entries": _parse_ledger_lines(tail), "total": len(lines)}


@app.get("/ledger/stats")
async def ledger_stats():
    """Aggregate call counts, cost and token usage from the ledger.

    Returns ``{"total_calls": 0}`` when the ledger is missing or holds no
    parseable entries.
    """
    entries = _parse_ledger_lines(_read_ledger_lines())
    if not entries:
        return {"total_calls": 0}

    total_cost = 0
    by_model = {}
    for entry in entries:
        # `or 0` tolerates explicit nulls written by failure rows.
        cost = entry.get("estimated_cost_usd") or 0
        total_cost += cost

        model = entry.get("model", "unknown")
        bucket = by_model.setdefault(
            model, {"calls": 0, "cost": 0, "tokens_in": 0, "tokens_out": 0}
        )
        bucket["calls"] += 1
        bucket["cost"] += cost
        bucket["tokens_in"] += entry.get("input_tokens") or 0
        bucket["tokens_out"] += entry.get("output_tokens") or 0

    successes = sum(1 for entry in entries if entry.get("success"))

    return {
        "total_calls": len(entries),
        "successful_calls": successes,
        "total_estimated_cost_usd": round(total_cost, 4),
        "by_model": by_model,
        "first_entry": entries[0].get("timestamp"),
        "last_entry": entries[-1].get("timestamp"),
    }
@app.get("/sitrep")
async def get_sitrep():
    """Situation report — what other sessions are doing. Read this first."""
    registry = SessionRegistry()
    report = registry.get_situation_report()
    active = registry.get_active_sessions()
    recent = registry.get_recent_sessions(hours=24)
    return {"report": report, "active": active, "recent_24h": recent}


@app.get("/sessions")
async def get_sessions(status: Optional[str] = None):
    """List sessions, optionally filtered by status.

    ``status="active"`` returns only active sessions, ``status="completed"``
    returns the last week of recent sessions, and anything else returns
    active sessions plus the last 24 hours.
    """
    registry = SessionRegistry()

    if status == "active":
        return {"sessions": registry.get_active_sessions()}

    if status == "completed":
        one_week_hours = 168
        return {"sessions": registry.get_recent_sessions(hours=one_week_hours)}

    # No recognised filter: combined view.
    active = registry.get_active_sessions()
    recent = registry.get_recent_sessions(hours=24)
    return {"active": active, "recent_24h": recent}
# ============================================================================
# Configuration
# ============================================================================

# Simple shared secret for task submission auth.
# In production, consider using OAuth2, API keys, or other stronger methods.
TASK_AUTH_TOKEN = "cortex-tasks-2026"


# ============================================================================
# Authentication
# ============================================================================

def verify_task_auth(
    authorization: Optional[str] = Header(None),
    token: Optional[str] = None
) -> str:
    """
    Verify task submission authentication.

    Supports two methods:
    1. Bearer token in Authorization header: "Authorization: Bearer {token}"
    2. Query parameter: ?token={token}

    A bare token in the Authorization header (no "Bearer " prefix) is still
    accepted, as before.

    Args:
        authorization: Authorization header value
        token: Token from query parameter (takes precedence when present)

    Returns:
        Verified token

    Raises:
        HTTPException 401: Invalid or missing token
    """
    # Local import keeps the block self-contained.
    import hmac

    auth_token = token
    if not auth_token and authorization:
        # Strip the scheme only as a leading prefix.  str.replace() would
        # also delete "Bearer " from the middle of the value.
        if authorization.startswith("Bearer "):
            auth_token = authorization[len("Bearer "):]
        else:
            auth_token = authorization

    # Constant-time comparison so timing does not reveal how much of the
    # token matched; bytes avoid TypeError on non-ASCII input.
    supplied = (auth_token or "").encode("utf-8")
    if not auth_token or not hmac.compare_digest(
        supplied, TASK_AUTH_TOKEN.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid or missing auth token")

    return auth_token


# ============================================================================
# Request/Response Models
# ============================================================================

class CompoundTaskRequest(BaseModel):
    """Request body for compound task submission."""

    prompt: str = Field(
        ...,
        description="The user's request to decompose and execute",
        min_length=1,
        max_length=5000
    )
    # NOTE(review): verify_task_auth reads only the Authorization header and
    # the ?token= query parameter; nothing visible here consults this body
    # field — confirm whether it is meant to be honoured.
    token: Optional[str] = Field(
        None,
        description="Auth token (alternative to Bearer header)"
    )

    class Config:
        json_schema_extra = {
            "example": {
                "prompt": "Search for Python concurrency patterns and summarize the top 3"
            }
        }


class CompoundTaskResponse(BaseModel):
    """Immediate response from task submission."""

    id: str = Field(
        ...,
        description="Unique task ID for polling progress"
    )
    status: str = Field(
        ...,
        description="Current task status (planned, executing, completed, partial)"
    )
    subtask_count: int = Field(
        ...,
        description="Total number of subtasks in the plan"
    )

    class Config:
        json_schema_extra = {
            "example": {
                "id": "compound-a1b2c3d4e5f6",
                "status": "planned",
                "subtask_count": 3
            }
        }


class SubtaskSnapshot(BaseModel):
    """Current state of a single subtask."""

    id: str
    index: int
    description: str
    tier_hint: int
    tier_assigned: Optional[int]
    model: Optional[str]
    depends_on: List[int]
    status: str
    result: Optional[str]
    cost: Optional[float]
    started_at: Optional[str]
    completed_at: Optional[str]


class TaskProgressResponse(BaseModel):
    """Complete progress snapshot for a compound task."""

    id: str
    prompt: str
    status: str
    reasoning: str
    subtasks: List[SubtaskSnapshot]
    created_at: str
    planned_at: str
    completed_at: Optional[str]
    total_cost: float

    class Config:
        json_schema_extra = {
            "example": {
                "id": "compound-a1b2c3d4e5f6",
                "prompt": "Search for Python patterns and summarize",
                "status": "executing",
                "reasoning": "Breaking into search, summarization, and output formatting",
                "subtasks": [
                    {
                        "id": "compound-a1b2c3d4e5f6-sub-0",
                        "index": 0,
                        "description": "Search for Python concurrency patterns",
                        "tier_hint": 2,
                        "tier_assigned": 2,
                        "model": "sonnet",
                        "depends_on": [],
                        "status": "completed",
                        "result": "Found patterns including...",
                        "cost": 0.04,
                        "started_at": "2026-03-21T15:30:00Z",
                        "completed_at": "2026-03-21T15:30:05Z"
                    }
                ],
                "created_at": "2026-03-21T15:30:00Z",
                "planned_at": "2026-03-21T15:30:00Z",
                "completed_at": None,
                "total_cost": 0.04
            }
        }
class TaskSummary(BaseModel):
    """Summary of a task for list view."""

    id: str
    prompt: str
    status: str
    subtask_count: int
    completed_count: int
    total_cost: float
    created_at: str
    completed_at: Optional[str]


# ============================================================================
# Endpoints
# ============================================================================

def setup_compound_task_endpoints(app: FastAPI) -> None:
    """
    Register all compound task endpoints on the FastAPI app.

    Call this from your main api.py file:
        from .api_additions import setup_compound_task_endpoints
        setup_compound_task_endpoints(app)

    Routes added:
        POST /task/compound            (authenticated via verify_task_auth)
        GET  /task/{task_id}/progress  (no auth dependency)
        GET  /tasks/recent             (no auth dependency)
    """

    @app.post(
        "/task/compound",
        response_model=CompoundTaskResponse,
        tags=["Compound Tasks"],
        summary="Submit a compound task",
        description="Submit a user prompt to be decomposed into subtasks and executed in parallel"
    )
    async def create_compound_task(
        req: CompoundTaskRequest,
        auth: str = Depends(verify_task_auth)
    ) -> CompoundTaskResponse:
        """
        Submit a compound task for execution.

        Args:
            req: Task request with prompt and optional token
            auth: Verified authentication token

        Returns:
            Task ID and initial status, built from the dict returned by
            submit_compound_task.

        Example:
            curl -X POST http://localhost:8000/task/compound \\
              -H "Authorization: Bearer cortex-tasks-2026" \\
              -H "Content-Type: application/json" \\
              -d '{"prompt": "Search for patterns and summarize"}'
        """
        result = submit_compound_task(req.prompt, auth_token=auth)
        return CompoundTaskResponse(**result)

    @app.get(
        "/task/{task_id}/progress",
        response_model=TaskProgressResponse,
        tags=["Compound Tasks"],
        summary="Get task progress",
        description="Poll for current execution status of a compound task"
    )
    async def task_progress(task_id: str) -> TaskProgressResponse:
        """
        Poll for compound task progress.

        No authentication dependency is attached; the task ID is the only
        thing a caller needs to know.

        Args:
            task_id: The task ID from create_compound_task

        Returns:
            Complete task snapshot including all subtasks and their progress

        Raises:
            404: get_task_progress returned None for this ID

        Example:
            curl http://localhost:8000/task/compound-a1b2c3d4e5f6/progress
        """
        progress = get_task_progress(task_id)
        if progress is None:
            raise HTTPException(status_code=404, detail="Task not found")
        return TaskProgressResponse(**progress)

    @app.get(
        "/tasks/recent",
        response_model=List[TaskSummary],
        tags=["Compound Tasks"],
        summary="List recent tasks",
        description="Get summaries of recently submitted compound tasks"
    )
    async def recent_tasks(limit: int = 20) -> List[TaskSummary]:
        """
        List recent compound tasks (dashboard view).

        No authentication dependency is attached.

        Args:
            limit: Maximum number of tasks to return (default 20, capped at
                100). Zero or a negative value returns an empty list.

        Returns:
            Task summaries in the order list_recent_tasks provides them.

        Example:
            curl 'http://localhost:8000/tasks/recent?limit=10'
        """
        # Bound the value on both sides before it reaches the task store:
        # a non-positive limit has no meaningful result, so answer directly
        # instead of relying on how list_recent_tasks treats it.
        if limit <= 0:
            return []
        limit = min(limit, 100)
        tasks = list_recent_tasks(limit=limit)
        return [TaskSummary(**t) for t in tasks]
PLANNING_PROMPT_TEMPLATE = """You are a task planner. Given a user's request, break it into independent subtasks that can be executed in parallel where possible.

Return ONLY valid JSON in this exact format:
{{
  "reasoning": "Brief explanation of your decomposition strategy",
  "subtasks": [
    {{
      "description": "Clear, self-contained task description",
      "tier_hint": 1,
      "depends_on": []
    }}
  ]
}}

Rules:
- tier_hint: 1=simple/extraction, 2=moderate/writing/code, 3=complex/reasoning
- depends_on: array of subtask indices (0-based) that must complete first
- Tasks with empty depends_on can run in parallel
- Each subtask should be self-contained enough to execute independently
- Keep subtasks to 2-6 items (don't over-decompose simple requests)
- For simple single-step requests, return just one subtask

User request: {prompt}"""


def _new_subtask(task_id, index, description, tier_hint=2, depends_on=None):
    """Build one pending subtask record with every progress field unset."""
    return {
        "id": f"{task_id}-sub-{index}",
        "index": index,
        "description": description,
        "tier_hint": tier_hint,
        "tier_assigned": None,
        "model": None,
        "depends_on": list(depends_on or []),
        "status": "pending",
        "result": None,
        "cost": None,
        "started_at": None,
        "completed_at": None,
    }


def _strip_code_fence(text):
    """Return the payload of a markdown code fence, or *text* if unfenced."""
    if '```json' in text:
        text = text.split('```json')[1].split('```')[0]
    elif '```' in text:
        text = text.split('```')[1].split('```')[0]
    return text.strip()


def _clean_tier(value):
    """Return *value* if it is one of the tiers 1/2/3, else the default 2."""
    is_plain_int = isinstance(value, int) and not isinstance(value, bool)
    return value if is_plain_int and value in (1, 2, 3) else 2


def _clean_depends_on(value):
    """Keep only integer indices; anything that is not a list becomes []."""
    if not isinstance(value, list):
        return []
    return [d for d in value if isinstance(d, int) and not isinstance(d, bool)]


def plan_task(prompt: str) -> Dict[str, Any]:
    """
    Use Haiku to decompose a prompt into subtasks.

    The planner CLI is asked for a JSON plan. If anything goes wrong (CLI
    missing, timeout, empty or malformed output, a plan with no subtasks),
    the whole prompt is wrapped as a single tier-2 subtask so callers
    always receive an executable plan.

    Args:
        prompt: The user's request to decompose

    Returns:
        A compound task plan:
        {
            "id": "compound-{12 hex chars}",
            "prompt": original prompt,
            "status": "planned",
            "reasoning": planner explanation, or a "Planning failed (...)" note,
            "subtasks": [
                {
                    "id": "{task id}-sub-{i}", "index": i,
                    "description": str, "tier_hint": 1|2|3,
                    "tier_assigned": None, "model": None,
                    "depends_on": [int, ...], "status": "pending",
                    "result": None, "cost": None,
                    "started_at": None, "completed_at": None
                },
                ...
            ],
            "created_at": ISO8601 timestamp,
            "planned_at": ISO8601 timestamp,
            "completed_at": None,
            "total_cost": 0.0
        }
    """
    task_id = f"compound-{uuid.uuid4().hex[:12]}"
    now = datetime.now(timezone.utc).isoformat()

    try:
        # Invoke Claude CLI with Haiku for planning (cheapest option).
        result = subprocess.run(
            ['claude', '-p', '--model', 'haiku', '--output-format', 'json'],
            input=PLANNING_PROMPT_TEMPLATE.format(prompt=prompt),
            capture_output=True,
            text=True,
            timeout=30
        )
        if not result.stdout.strip():
            # Gives the fallback note a useful reason instead of a bare
            # JSON "Expecting value" error.
            raise RuntimeError(
                f"planner produced no output (exit code {result.returncode})"
            )

        output = json.loads(result.stdout)
        response_text = output.get('result', output.get('content', ''))
        plan = json.loads(_strip_code_fence(response_text))

        # Model output is untrusted: normalise tier and dependency values so
        # later stages never see None, strings, or out-of-set tiers.
        subtasks = [
            _new_subtask(
                task_id,
                i,
                st['description'],
                _clean_tier(st.get('tier_hint', 2)),
                _clean_depends_on(st.get('depends_on')),
            )
            for i, st in enumerate(plan.get('subtasks', []))
        ]
        if not subtasks:
            # An empty plan would have nothing to execute.
            raise ValueError("planner returned no subtasks")
        reasoning = plan.get('reasoning', '')

    except Exception as e:
        # Deliberately broad: any planning failure degrades to running the
        # prompt as a single task.
        reasoning = f"Planning failed ({str(e)}), treating as single task"
        subtasks = [_new_subtask(task_id, 0, prompt)]

    return {
        "id": task_id,
        "prompt": prompt,
        "status": "planned",
        "reasoning": reasoning,
        "subtasks": subtasks,
        "created_at": now,
        "planned_at": now,
        "completed_at": None,
        "total_cost": 0.0
    }
# In-memory store for active/recent tasks
# Limited to _MAX_TASKS to prevent unbounded memory growth
_tasks = {}
_tasks_lock = threading.Lock()
_MAX_TASKS = 50

# Model-to-tier mapping
TIER_MODELS = {1: "haiku", 2: "sonnet", 3: "opus"}

# Approximate costs per execution (USD)
# These are rough estimates - actual costs depend on token usage
TIER_COSTS = {1: 0.008, 2: 0.04, 3: 0.15}

# Ledger file path (append-only execution log)
LEDGER_PATH = Path("/data/symbiont/ledger.jsonl")


def _log_ledger(entry: Dict[str, Any]) -> None:
    """
    Append an entry to the execution ledger (one JSON object per line).

    Ledger writes are best-effort: a failure is logged as a warning and
    never propagates, so task execution is not affected.

    Args:
        entry: Dictionary with execution details (timestamp, model, tokens, cost, etc.)
    """
    try:
        with open(LEDGER_PATH, "a") as f:
            f.write(json.dumps(entry) + "\n")
    except (OSError, TypeError, ValueError) as exc:
        # OSError: path missing or unwritable; TypeError/ValueError: entry
        # not JSON-serialisable.  Report instead of swallowing silently.
        import logging

        logging.getLogger(__name__).warning("ledger write failed: %s", exc)


def _execute_subtask(subtask: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute a single subtask via Claude CLI.

    Args:
        subtask: Subtask dict with "description" and a tier
            ("tier_assigned", falling back to "tier_hint", then 2)

    Returns:
        The same dict, updated in place with model, status ("completed" or
        "failed"), result, cost (on success) and timestamps.

    Every outcome (success, timeout, other error) is written to the ledger.
    """
    # Resolve the tier once so the model and the cost estimate always agree.
    # Looking cost up with subtask.get("tier_assigned", 2) would yield None
    # whenever the key exists with a None value.
    tier = subtask.get("tier_assigned") or subtask.get("tier_hint", 2)
    model = TIER_MODELS.get(tier, "sonnet")
    _update_subtask(subtask, model=model, status="executing",
                    started_at=datetime.now(timezone.utc).isoformat())

    try:
        # Execute via Claude CLI with JSON output mode
        result = subprocess.run(
            ['claude', '-p', '--model', model, '--output-format', 'json'],
            input=subtask["description"],
            capture_output=True,
            text=True,
            timeout=120
        )
        if not result.stdout.strip():
            # Surface the CLI's own error rather than a JSON decode error.
            detail = (result.stderr or "").strip()[:200] or "no output"
            raise RuntimeError(
                f"claude CLI exited with code {result.returncode}: {detail}"
            )

        output = json.loads(result.stdout)
        response_text = output.get('result', output.get('content', str(output)))
        if not isinstance(response_text, str):
            # 'content' may be structured; the stored result must be text.
            response_text = str(response_text)

        # Token counts for the ledger.
        # NOTE(review): also checks a nested "usage" object — presumably
        # where the CLI reports tokens; confirm against the CLI's schema.
        usage = output.get('usage')
        if not isinstance(usage, dict):
            usage = {}
        tokens_in = output.get('input_tokens', usage.get('input_tokens', 0))
        tokens_out = output.get('output_tokens', usage.get('output_tokens', 0))
        cost = TIER_COSTS.get(tier, 0.04)

        truncated = response_text[:2000]
        if len(response_text) > 2000:
            truncated += "\n[TRUNCATED...]"
        _update_subtask(subtask, status="completed", result=truncated,
                        cost=cost, completed_at=datetime.now(timezone.utc).isoformat())

        # Log successful execution to ledger
        _log_ledger({
            "timestamp": subtask["completed_at"],
            "model": model,
            "success": True,
            "input_tokens": tokens_in,
            "output_tokens": tokens_out,
            "estimated_cost_usd": cost,
            "prompt_preview": subtask["description"][:100],
            "compound_task_id": subtask.get("id", "unknown")
        })

    except subprocess.TimeoutExpired:
        now = datetime.now(timezone.utc).isoformat()
        _update_subtask(subtask, status="failed", result="Execution timed out (120s)", completed_at=now)
        _log_ledger({
            "timestamp": now, "model": model, "success": False,
            "error": "timeout", "compound_task_id": subtask.get("id", "unknown")
        })

    except Exception as e:
        now = datetime.now(timezone.utc).isoformat()
        _update_subtask(subtask, status="failed", result=f"Error: {str(e)}", completed_at=now)
        _log_ledger({
            "timestamp": now, "model": model, "success": False,
            "error": str(e), "compound_task_id": subtask.get("id", "unknown")
        })

    return subtask


def _validate_dependencies(subtasks: List[Dict[str, Any]]) -> None:
    """
    Sanitise every subtask's "depends_on" list in place.

    Drops self-references, indices that match no subtask, non-integer
    entries and duplicates (first occurrence wins). A missing or non-list
    "depends_on" becomes [].
    """
    valid_indices = {s["index"] for s in subtasks}
    for st in subtasks:
        raw = st.get("depends_on")
        if not isinstance(raw, list):
            raw = []
        cleaned = []
        for dep in raw:
            is_index = isinstance(dep, int) and not isinstance(dep, bool)
            if (is_index and dep in valid_indices
                    and dep != st["index"] and dep not in cleaned):
                cleaned.append(dep)
        st["depends_on"] = cleaned


def _update_subtask(subtask: Dict[str, Any], **updates) -> None:
    """Apply *updates* to *subtask* while holding the global task lock."""
    with _tasks_lock:
        subtask.update(updates)
Calculate total cost and finalize task status + """ + + with _tasks_lock: + task = _tasks.get(task_id) + if not task: + return + task["status"] = "executing" + + subtasks = task["subtasks"] + completed_indices = set() + + # Phase 0: Validate dependency graph + _validate_dependencies(subtasks) + + # Phase 1: Routing - assign tier to each subtask + for st in subtasks: + _update_subtask(st, status="routing", tier_assigned=st.get("tier_hint", 2)) + + # Phase 2: Execution - run subtasks in waves respecting dependencies + max_stall_cycles = 60 # 30 seconds max stall (0.5s * 60) + stall_count = 0 + + with ThreadPoolExecutor(max_workers=4) as executor: + while len(completed_indices) < len(subtasks): + # Find subtasks that are ready to execute + ready = [] + for st in subtasks: + if st["index"] in completed_indices: + continue + if st["status"] in ("executing", "queued", "failed"): + continue + deps = set(st.get("depends_on", [])) + if deps.issubset(completed_indices): + ready.append(st) + + if not ready: + remaining = [s for s in subtasks if s["index"] not in completed_indices] + still_running = any(s["status"] in ("executing", "queued") for s in remaining) + if not still_running: + # Truly stuck — all remaining are blocked by failed deps + for s in remaining: + if s["status"] not in ("completed", "failed"): + _update_subtask(s, status="failed", result="Blocked by failed dependency") + completed_indices.add(s["index"]) + break + stall_count += 1 + if stall_count > max_stall_cycles: + break + time.sleep(0.5) + continue + + stall_count = 0 # Reset on progress + + # Launch ready subtasks in parallel + futures = {} + for st in ready: + _update_subtask(st, status="queued") + futures[executor.submit(_execute_subtask, st)] = st + + for future in as_completed(futures): + st = futures[future] + try: + future.result() + except Exception as e: + _update_subtask(st, status="failed", result=str(e)) + + if st["status"] in ("completed", "failed"): + completed_indices.add(st["index"]) + + 
# Phase 3: Finalization + total_cost = sum(s.get("cost", 0) or 0 for s in subtasks) + all_ok = all(s["status"] == "completed" for s in subtasks) + + with _tasks_lock: + task["status"] = "completed" if all_ok else "partial" + task["completed_at"] = datetime.now(timezone.utc).isoformat() + task["total_cost"] = total_cost + + +def submit_compound_task(prompt: str, auth_token: Optional[str] = None) -> Dict[str, Any]: + """ + Plan and begin executing a compound task. + + Args: + prompt: The user's request to decompose and execute + auth_token: (Optional) authentication token for the submission + + Returns: + Immediate response with task ID for polling: + { + "id": "compound-{uuid}", + "status": "planned", + "subtask_count": N + } + + Process: + 1. Use Haiku to plan the task (fast, synchronous) + 2. Store task in memory + 3. Spawn background thread for async execution + 4. Return immediately to client for polling + + The client can then poll /task/{task_id}/progress to monitor execution. + """ + + # Phase 1: Plan (synchronous - fast, uses Haiku) + task = plan_task(prompt) + task_id = task["id"] + + with _tasks_lock: + # Evict oldest task if we're at capacity + if len(_tasks) >= _MAX_TASKS: + oldest_key = min(_tasks, key=lambda k: _tasks[k].get("created_at", "")) + del _tasks[oldest_key] + _tasks[task_id] = task + + # Phase 2: Execute (async - in background thread) + # The task will progress from "planned" -> "executing" -> "completed"/"partial" + thread = threading.Thread(target=_run_compound_task, args=(task_id,), daemon=True) + thread.start() + + return { + "id": task_id, + "status": task["status"], + "subtask_count": len(task["subtasks"]) + } + + +def get_task_progress(task_id: str) -> Optional[Dict[str, Any]]: + """ + Get current state of a compound task (for polling/dashboard). + + Args: + task_id: The task ID from submit_compound_task() + + Returns: + Complete task snapshot including all subtask progress, or None if not found. 
+ + Returned structure: + { + "id": task_id, + "prompt": original prompt, + "status": "planned"|"executing"|"completed"|"partial", + "reasoning": explanation from planner, + "subtasks": [ + { + "id": subtask ID, + "index": 0, + "description": task description, + "tier_hint": 1|2|3, + "tier_assigned": 1|2|3, + "model": "haiku"|"sonnet"|"opus"|None, + "depends_on": [indices], + "status": "pending"|"routing"|"queued"|"executing"|"completed"|"failed", + "result": str or None, + "cost": float or None, + "started_at": ISO8601 or None, + "completed_at": ISO8601 or None + }, + ... + ], + "created_at": ISO8601, + "planned_at": ISO8601, + "completed_at": ISO8601 or None, + "total_cost": float + } + """ + with _tasks_lock: + task = _tasks.get(task_id) + if not task: + return None + # Return a deep copy for thread safety + return json.loads(json.dumps(task, default=str)) + + +def list_recent_tasks(limit: int = 20) -> List[Dict[str, Any]]: + """ + List recent compound tasks (for dashboard view). + + Args: + limit: Maximum number of tasks to return + + Returns: + List of task summaries (most recent first): + [ + { + "id": task ID, + "prompt": truncated prompt, + "status": current status, + "subtask_count": total subtasks, + "completed_count": subtasks finished, + "total_cost": cumulative USD cost, + "created_at": ISO8601, + "completed_at": ISO8601 or None + }, + ... + ] + """ + with _tasks_lock: + tasks = sorted(_tasks.values(), key=lambda t: t.get("created_at", ""), reverse=True) + return [ + { + "id": t["id"], + "prompt": t["prompt"][:100], + "status": t["status"], + "subtask_count": len(t["subtasks"]), + "completed_count": sum(1 for s in t["subtasks"] if s["status"] == "completed"), + "total_cost": t.get("total_cost", 0), + "created_at": t.get("created_at"), + "completed_at": t.get("completed_at") + } + for t in tasks[:limit] + ]