auto-repair: commit 7 uncommitted file(s) — 2026-03-30

This commit is contained in:
Claude 2026-03-30 18:18:23 +00:00
parent 18252e05e6
commit afd14d1d00
7 changed files with 1299 additions and 0 deletions

BIN
engram.db

Binary file not shown.

Binary file not shown.

211
public_api.py Executable file
View File

@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""
Cortex Public API — lightweight endpoints for external Claude sessions.
Runs on port 8112, exposed via Caddy at cortex.hydrascale.net/api/
Auth: pass API key via X-API-Key header, Authorization: Bearer, or ?key= query param.
"""
import json
import os
import subprocess
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException, Header, Depends, Query, Request
from fastapi.middleware.cors import CORSMiddleware
# Shared secret that callers of the authenticated endpoints must present.
# It can be supplied through the CORTEX_API_KEY environment variable; the
# literal is kept only as a fallback so existing deployments keep working.
# NOTE(review): a secret committed to source control should be rotated and
# the fallback removed once the environment variable is set everywhere.
API_KEY = os.environ.get("CORTEX_API_KEY") or "cortex-pub-a7f3e9c1d4b8"

app = FastAPI(title="Cortex Public API", version="0.2.0")

# CORS: any origin and any request header are accepted, GET and POST only.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
def verify_key(
    request: Request,
    x_api_key: Optional[str] = Header(None),
    authorization: Optional[str] = Header(None),
    key: Optional[str] = Query(None),
):
    """Accept key via X-API-Key header, Authorization: Bearer, or ?key= query param.

    Precedence is X-API-Key, then the ``key`` query parameter, then an
    ``Authorization: Bearer <token>`` header.

    Returns:
        The supplied token when it matches ``API_KEY``.

    Raises:
        HTTPException: 401 when no token is supplied or it does not match.
    """
    # Local import: the module header does not import hmac.
    import hmac

    token = x_api_key or key
    if not token and authorization and authorization.startswith("Bearer "):
        token = authorization[7:]

    # compare_digest takes time independent of where the inputs differ, so the
    # response time does not leak how much of a guessed key was correct.
    # Bytes are compared because compare_digest rejects non-ASCII str inputs.
    supplied = (token or "").encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if not token or not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid API key. Pass via X-API-Key header or ?key= param.")
    return token
# ──────────── /health (no auth) ────────────
@app.get("/health")
async def health():
    # Unauthenticated liveness probe: a fixed status plus the current UTC time
    # rendered as ISO-8601 with a trailing "Z".
    stamp = f"{datetime.utcnow().isoformat()}Z"
    return {"status": "ok", "timestamp": stamp}
# ──────────── /info — system specs ────────────
# Errors a best-effort probe is expected to hit: missing binary or file
# (OSError), non-zero exit or timeout (SubprocessError), undecodable or
# unparsable output (ValueError), and short output (IndexError).
_PROBE_ERRORS = (OSError, subprocess.SubprocessError, ValueError, IndexError)


def _command_output(args):
    """Return the stripped stdout of *args*, or None if the command fails."""
    try:
        return subprocess.check_output(args, text=True).strip()
    except _PROBE_ERRORS:
        return None


def _service_state(name):
    """Return the state printed by ``systemctl is-active`` for *name*.

    ``systemctl is-active`` exits non-zero for units that are not active while
    still printing the state, so the exit status is deliberately ignored.
    Returns "unknown" when systemctl cannot be run or prints nothing.
    """
    try:
        proc = subprocess.run(
            ["systemctl", "is-active", name],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except _PROBE_ERRORS:
        return "unknown"
    return proc.stdout.strip() or "unknown"


@app.get("/info")
async def system_info(auth: str = Depends(verify_key)):
    """Return full system specs — CPU, RAM, disk, GPU, OS, etc."""
    # Every probe is best-effort: a failing probe yields a placeholder string
    # for its key instead of failing the whole request.
    # NOTE(review): the probes are blocking calls inside an async handler.
    specs = {}

    # OS, kernel and hostname all come from a single command each.
    for field, command in (
        ("os", ["lsb_release", "-ds"]),
        ("kernel", ["uname", "-r"]),
        ("hostname", ["hostname"]),
    ):
        value = _command_output(command)
        specs[field] = "unknown" if value is None else value

    # CPU: one "model name" line per logical core in /proc/cpuinfo.
    try:
        cpuinfo = Path("/proc/cpuinfo").read_text()
        model_names = [
            line.split(":")[1].strip()
            for line in cpuinfo.splitlines()
            if "model name" in line
        ]
        specs["cpu"] = {
            "model": model_names[0] if model_names else "unknown",
            "cores": len(model_names),
        }
    except _PROBE_ERRORS:
        specs["cpu"] = "unknown"

    # RAM: MemTotal is reported in kB; the default covers a missing line.
    specs["ram_gb"] = "unknown"
    try:
        for line in Path("/proc/meminfo").read_text().splitlines():
            if line.startswith("MemTotal:"):
                specs["ram_gb"] = round(int(line.split()[1]) / 1024 / 1024, 1)
                break
    except _PROBE_ERRORS:
        specs["ram_gb"] = "unknown"

    # Disk: second line of `df -h /` holds the figures for the root filesystem.
    specs["disk"] = "unknown"
    df_out = _command_output(["df", "-h", "/"])
    if df_out is not None:
        rows = df_out.split("\n")
        fields = rows[1].split() if len(rows) > 1 else []
        if len(fields) >= 5:
            specs["disk"] = {
                "total": fields[1],
                "used": fields[2],
                "available": fields[3],
                "use_pct": fields[4],
            }

    # GPU: any lspci line mentioning a display-class controller.
    lspci_out = _command_output(["lspci"])
    if lspci_out is None:
        specs["gpu"] = "lspci not available"
    else:
        gpu_lines = [
            line
            for line in lspci_out.splitlines()
            if "VGA" in line or "3D" in line or "Display" in line
        ]
        specs["gpu"] = gpu_lines if gpu_lines else "none detected"

    # Uptime: first field of /proc/uptime is seconds since boot.
    try:
        uptime = Path("/proc/uptime").read_text().split()[0]
        specs["uptime_hours"] = round(float(uptime) / 3600, 1)
    except _PROBE_ERRORS:
        specs["uptime_hours"] = "unknown"

    # Docker containers, one "name: status" string per running container.
    containers = _command_output(
        ["docker", "ps", "--format", "{{.Names}}: {{.Status}}"]
    )
    if containers is None:
        specs["docker_containers"] = "docker not available"
    else:
        specs["docker_containers"] = containers.splitlines() if containers else []

    # Services
    specs["services"] = {
        svc: _service_state(svc)
        for svc in ["caddy", "fail2ban", "docker", "symbiont-api", "cortex-public-api"]
    }

    specs["timestamp"] = datetime.utcnow().isoformat() + "Z"
    return specs
# ──────────── /engram/sitrep — session awareness ────────────
@app.get("/engram/sitrep")
async def engram_sitrep(auth: str = Depends(verify_key)):
    """Return situation report from engram DB."""
    db_path = Path("/data/symbiont/engram.db")
    if not db_path.exists():
        return {"error": "engram.db not found", "sessions": []}

    conn = sqlite3.connect(str(db_path))
    # try/finally guarantees the connection is released even when a query
    # raises (for example when an expected column is missing).
    try:
        conn.row_factory = sqlite3.Row
        cur = conn.cursor()

        # Check the schema first so a missing table is reported, not raised.
        cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row["name"] for row in cur.fetchall()]
        if "sessions" not in tables:
            return {"error": "sessions table not found", "tables": tables}

        # Active sessions, newest first.
        cur.execute("SELECT * FROM sessions WHERE status = 'active' ORDER BY started_at DESC")
        active = [dict(row) for row in cur.fetchall()]

        # Sessions completed within the last 24 hours; the cutoff is bound as
        # a parameter and compared against started_at.
        cutoff = (datetime.utcnow() - timedelta(hours=24)).isoformat()
        cur.execute(
            "SELECT * FROM sessions WHERE status = 'completed' AND started_at > ? ORDER BY started_at DESC",
            (cutoff,)
        )
        recent = [dict(row) for row in cur.fetchall()]
        return {"active_sessions": active, "recent_24h": recent}
    finally:
        conn.close()
# ──────────── /engram/register — register a session ────────────
@app.post("/engram/register")
async def engram_register(
    session_type: str = Query(default="claude-chat"),
    description: str = Query(default="External Claude session"),
    auth: str = Depends(verify_key),
):
    """Register a new session in engram."""
    import sys

    # Make the symbiont package importable. The path is added only once;
    # inserting it on every request would grow sys.path without bound.
    symbiont_root = "/data/symbiont"
    if symbiont_root not in sys.path:
        sys.path.insert(0, symbiont_root)
    from symbiont.engram import Engram

    eng = Engram()
    sid = eng.register(session_type, description)
    return {"session_id": sid, "status": "registered"}
# ──────────── /engram/log — log to a session ────────────
@app.post("/engram/log")
async def engram_log(
    session_id: str = Query(...),
    message: str = Query(...),
    auth: str = Depends(verify_key),
):
    """Log a message to an existing session."""
    import sys

    # Make the symbiont package importable. The path is added only once;
    # inserting it on every request would grow sys.path without bound.
    symbiont_root = "/data/symbiont"
    if symbiont_root not in sys.path:
        sys.path.insert(0, symbiont_root)
    from symbiont.engram import Engram

    eng = Engram()
    eng.log(session_id, message)
    return {"status": "logged"}
# Script entry point: serve the app with uvicorn on the loopback interface,
# port 8112. uvicorn is imported here so it is only needed when the module is
# run directly.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8112)

189
symbiont/api.py.bak Normal file
View File

@ -0,0 +1,189 @@
"""
FastAPI server for the Symbiont orchestrator.
Endpoints:
POST /task Run a task through the router
POST /queue Add a task to the queue
GET /status Health check + rate limit status
GET /ledger Recent ledger entries
GET /ledger/stats Aggregate cost/usage stats
GET /sessions Active and recent sessions
GET /sitrep Situation report for new sessions
"""
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .dispatcher import ModelTier, rate_limits
from .router import route_task
from .scheduler import enqueue_task, get_pending_tasks
from .engram import Engram as SessionRegistry
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# FastAPI application object; the route handlers below are registered on it.
app = FastAPI(
    title="Symbiont",
    description="Self-sustaining AI orchestrator",
    version="0.1.0",
)
# JSON-lines ledger file read by the /ledger and /ledger/stats endpoints.
LEDGER_PATH = Path("/data/symbiont/ledger.jsonl")
class TaskRequest(BaseModel):
    # Request body for POST /task.
    # (Documented with comments on purpose: a class docstring would become the
    # model's description in the generated schema.)

    # Prompt text passed to route_task.
    task: str
    # Optional system prompt forwarded to route_task unchanged.
    system_prompt: Optional[str] = None
    # Optional tier override: "haiku", "sonnet", "opus"
    force_tier: Optional[str] = None
class QueueRequest(BaseModel):
    # Request body for POST /queue.
    # (Documented with comments on purpose: a class docstring would become the
    # model's description in the generated schema.)

    # Task text handed to enqueue_task.
    task: str
    # Priority passed to enqueue_task; 5 when the caller omits it.
    priority: int = 5
    # Optional free-form mapping passed to enqueue_task as-is.
    metadata: Optional[dict] = None
@app.post("/task")
async def run_task(req: TaskRequest):
    """Execute a task immediately through the router. Logs to session registry."""
    # Translate the optional tier name into a ModelTier; unknown names are a
    # client error (400), chained to the ValueError for debugging.
    force = None
    if req.force_tier:
        try:
            force = ModelTier(req.force_tier.lower())
        except ValueError as exc:
            raise HTTPException(400, f"Invalid tier: {req.force_tier}") from exc

    # Register this dispatch as a micro-session
    reg = SessionRegistry()
    sid = reg.register("api-task", req.task[:120])

    try:
        result = route_task(
            req.task,
            system_prompt=req.system_prompt,
            force_tier=force,
        )
    except Exception as exc:
        # Close the micro-session before propagating, so a crashing router
        # does not leave the session registered but never completed.
        reg.complete(sid, f"Failed: {exc}")
        raise

    # Complete the micro-session
    if result["success"]:
        reg.complete(sid, f"Completed via {result['model_used']} (${result['estimated_cost_usd']:.4f})")
    else:
        reg.complete(sid, f"Failed: {result.get('error', 'unknown')}")
    return result
@app.post("/queue")
async def queue_task(req: QueueRequest):
    """Add a task to the queue for later processing."""
    # enqueue_task's return value is reported back as the task id.
    return {
        "task_id": enqueue_task(req.task, req.priority, req.metadata),
        "status": "queued",
    }
@app.get("/status")
async def status():
    """Health check and current rate limit status."""

    def _describe(tier):
        # One entry per tier: limited tiers also report when the limit lifts.
        if not rate_limits.is_limited(tier):
            return {"limited": False}
        return {
            "limited": True,
            "until": rate_limits.limited_until[tier].isoformat(),
        }

    limits = {tier.value: _describe(tier) for tier in ModelTier}
    backlog = get_pending_tasks()
    return {
        "status": "alive",
        "timestamp": datetime.now().isoformat(),
        "rate_limits": limits,
        "pending_tasks": len(backlog),
    }
@app.get("/ledger")
async def get_ledger(limit: int = 50):
    """Return the most recent ledger entries."""
    # Read directly and treat a missing file as an empty ledger; this avoids
    # the window between an exists() check and the read.
    try:
        text = LEDGER_PATH.read_text()
    except FileNotFoundError:
        return {"entries": [], "total": 0}

    # Blank lines are not entries, so an empty file reports total 0.
    lines = [line for line in text.split("\n") if line.strip()]

    # A non-positive limit selects nothing. Without this guard, lines[-0:]
    # would return the whole ledger and a negative limit would slice from the
    # front.
    recent = lines[-limit:] if limit > 0 else []

    entries = []
    for line in recent:
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            # Skip corrupt lines rather than failing the request.
            continue
    return {"entries": entries, "total": len(lines)}
@app.get("/ledger/stats")
async def ledger_stats():
    """Aggregate statistics from the ledger."""
    if not LEDGER_PATH.exists():
        return {"total_calls": 0}

    # Parse every line of the ledger, silently skipping undecodable ones.
    records = []
    for raw in LEDGER_PATH.read_text().strip().split("\n"):
        try:
            records.append(json.loads(raw))
        except json.JSONDecodeError:
            continue
    if not records:
        return {"total_calls": 0}

    total_cost = sum(rec.get("estimated_cost_usd", 0) for rec in records)

    # Per-model tallies, keyed by the entry's "model" field.
    by_model = {}
    for rec in records:
        name = rec.get("model", "unknown")
        if name not in by_model:
            by_model[name] = {"calls": 0, "cost": 0, "tokens_in": 0, "tokens_out": 0}
        bucket = by_model[name]
        bucket["calls"] += 1
        bucket["cost"] += rec.get("estimated_cost_usd", 0)
        bucket["tokens_in"] += rec.get("input_tokens", 0)
        bucket["tokens_out"] += rec.get("output_tokens", 0)

    successes = len([rec for rec in records if rec.get("success")])

    # records is known to be non-empty here, so first/last always exist.
    return {
        "total_calls": len(records),
        "successful_calls": successes,
        "total_estimated_cost_usd": round(total_cost, 4),
        "by_model": by_model,
        "first_entry": records[0].get("timestamp"),
        "last_entry": records[-1].get("timestamp"),
    }
@app.get("/sitrep")
async def get_sitrep():
    """Situation report — what other sessions are doing. Read this first."""
    # One registry instance serves all three lookups, queried in the same
    # order as the keys of the response.
    registry = SessionRegistry()
    report = registry.get_situation_report()
    active = registry.get_active_sessions()
    recent = registry.get_recent_sessions(hours=24)
    return {"report": report, "active": active, "recent_24h": recent}
@app.get("/sessions")
async def get_sessions(status: Optional[str] = None):
    """List sessions, optionally filtered by status."""
    registry = SessionRegistry()

    # Filtered views return a single "sessions" list.
    if status == "active":
        return {"sessions": registry.get_active_sessions()}
    if status == "completed":
        one_week_hours = 168
        return {"sessions": registry.get_recent_sessions(hours=one_week_hours)}

    # No (or an unrecognised) filter: active sessions plus the last 24 hours.
    active = registry.get_active_sessions()
    recent = registry.get_recent_sessions(hours=24)
    return {"active": active, "recent_24h": recent}

347
symbiont/api_additions.py Normal file
View File

@ -0,0 +1,347 @@
"""
FastAPI Endpoint Additions for Compound Tasks
==============================================
New endpoints to integrate into the existing Symbiont API (/data/symbiont/symbiont/api.py).
These endpoints expose the compound task system to external callers:
1. POST /task/compound - Submit a new compound task for execution
2. GET /task/{task_id}/progress - Poll for task progress
3. GET /tasks/recent - List recent tasks (dashboard view)
Authentication:
- Task submission requires a bearer token
- Progress polling requires no auth (task ID is the "secret")
- Recent tasks list is unauthenticated (low-sensitivity data)
Integration Instructions:
1. Import task_manager functions at the top of api.py:
from .task_manager import submit_compound_task, get_task_progress, list_recent_tasks
2. Define authentication token (set to your secret):
TASK_AUTH_TOKEN = "cortex-tasks-2026"
3. Add these classes and functions to api.py
4. Add the three routes to your FastAPI app instance
"""
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from .task_manager import submit_compound_task, get_task_progress, list_recent_tasks
# These imports should be added to your api.py
# from .task_manager import submit_compound_task, get_task_progress, list_recent_tasks
# ============================================================================
# Configuration
# ============================================================================
# Simple shared secret for task submission auth
# In production, consider using OAuth2, API keys, or other stronger methods
# NOTE(review): this value is committed in source and repeated in the curl
# example further down; treat it as a placeholder and rotate it before
# exposing the endpoint.
TASK_AUTH_TOKEN = "cortex-tasks-2026"
# ============================================================================
# Authentication
# ============================================================================
def verify_task_auth(
    authorization: Optional[str] = Header(None),
    token: Optional[str] = None
) -> str:
    """
    Verify task submission authentication.

    Supports two methods:
    1. Bearer token in Authorization header: "Authorization: Bearer {token}"
    2. Query parameter: ?token={token}

    The query parameter wins when both are supplied. An Authorization value
    without the "Bearer " prefix is compared as-is, as before.

    Args:
        authorization: Authorization header value
        token: Token from query parameter

    Returns:
        Verified token

    Raises:
        HTTPException 401: Invalid or missing token
    """
    # Local import: the module header does not import hmac.
    import hmac

    candidate = token
    if not candidate and authorization:
        scheme = "Bearer "
        # Strip only a leading scheme. Removing "Bearer " wherever it occurs
        # would let a malformed header collapse into the valid token.
        if authorization.startswith(scheme):
            candidate = authorization[len(scheme):]
        else:
            candidate = authorization

    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str inputs.
    if not candidate or not hmac.compare_digest(
        candidate.encode("utf-8"), TASK_AUTH_TOKEN.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid or missing auth token")
    return candidate
# ============================================================================
# Request/Response Models
# ============================================================================
class CompoundTaskRequest(BaseModel):
    """Request body for compound task submission."""

    # Required free-text request, constrained to 1..5000 characters.
    prompt: str = Field(
        ...,
        min_length=1,
        max_length=5000,
        description="The user's request to decompose and execute",
    )
    # Optional, defaults to None.
    # NOTE(review): nothing visible in this module reads this body field;
    # verify_task_auth receives `token` as its own parameter — confirm intent.
    token: Optional[str] = Field(
        default=None,
        description="Auth token (alternative to Bearer header)",
    )

    class Config:
        # Example payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "prompt": "Search for Python concurrency patterns and summarize the top 3"
            }
        }
class CompoundTaskResponse(BaseModel):
    """Immediate response from task submission."""

    # All three fields are required (Ellipsis default).
    id: str = Field(..., description="Unique task ID for polling progress")
    status: str = Field(
        ...,
        description="Current task status (planned, executing, completed, partial)",
    )
    subtask_count: int = Field(
        ...,
        description="Total number of subtasks in the plan",
    )

    class Config:
        # Example payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "id": "compound-a1b2c3d4e5f6",
                "status": "planned",
                "subtask_count": 3
            }
        }
class SubtaskSnapshot(BaseModel):
    """Current state of a single subtask."""

    # Identity and position within the parent task.
    id: str
    index: int
    description: str

    # Tier suggested at planning time, and the tier/model chosen for the run;
    # the latter two may be None.
    tier_hint: int
    tier_assigned: Optional[int]
    model: Optional[str]

    # Indices of other subtasks this one depends on.
    depends_on: List[int]

    # Execution state and outcome; outcome fields may be None.
    status: str
    result: Optional[str]
    cost: Optional[float]
    started_at: Optional[str]
    completed_at: Optional[str]
class TaskProgressResponse(BaseModel):
    """Complete progress snapshot for a compound task."""

    # Task identity and the request it was created from.
    id: str
    prompt: str

    # Overall state plus the planner's explanation of the decomposition.
    status: str
    reasoning: str

    # Per-subtask snapshots.
    subtasks: List[SubtaskSnapshot]

    # Timestamps are carried as strings; completed_at may be None.
    created_at: str
    planned_at: str
    completed_at: Optional[str]

    # Cost figure for the task as a whole.
    total_cost: float

    class Config:
        # Example payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "id": "compound-a1b2c3d4e5f6",
                "prompt": "Search for Python patterns and summarize",
                "status": "executing",
                "reasoning": "Breaking into search, summarization, and output formatting",
                "subtasks": [
                    {
                        "id": "compound-a1b2c3d4e5f6-sub-0",
                        "index": 0,
                        "description": "Search for Python concurrency patterns",
                        "tier_hint": 2,
                        "tier_assigned": 2,
                        "model": "sonnet",
                        "depends_on": [],
                        "status": "completed",
                        "result": "Found patterns including...",
                        "cost": 0.04,
                        "started_at": "2026-03-21T15:30:00Z",
                        "completed_at": "2026-03-21T15:30:05Z"
                    }
                ],
                "created_at": "2026-03-21T15:30:00Z",
                "planned_at": "2026-03-21T15:30:00Z",
                "completed_at": None,
                "total_cost": 0.04
            }
        }
class TaskSummary(BaseModel):
    """Summary of a task for list view."""

    # Task identity, prompt text and overall state.
    id: str
    prompt: str
    status: str

    # Progress counters: total subtasks and how many are completed.
    subtask_count: int
    completed_count: int

    # Cost figure for the task as a whole.
    total_cost: float

    # Timestamps are carried as strings; completed_at may be None.
    created_at: str
    completed_at: Optional[str]
# ============================================================================
# Endpoints
# ============================================================================
def setup_compound_task_endpoints(app: FastAPI) -> None:
    """
    Register all compound task endpoints on the FastAPI app.

    Call this from your main api.py file:
        from .api_additions import setup_compound_task_endpoints
        setup_compound_task_endpoints(app)

    Or manually add the @app.post/@app.get routes below.

    Args:
        app: The FastAPI application the three routes are attached to.
    """
    # Local import: the module header does not import asyncio.
    import asyncio

    @app.post(
        "/task/compound",
        response_model=CompoundTaskResponse,
        tags=["Compound Tasks"],
        summary="Submit a compound task",
        description="Submit a user prompt to be decomposed into subtasks and executed in parallel"
    )
    async def create_compound_task(
        req: CompoundTaskRequest,
        auth: str = Depends(verify_task_auth)
    ) -> CompoundTaskResponse:
        """
        Submit a compound task for execution.

        The task is decomposed by Haiku into subtasks that can run in parallel.
        Returns with a task ID for polling progress.

        Args:
            req: Task request with prompt and optional token
            auth: Verified authentication token

        Returns:
            Task ID and initial status

        Example:
            curl -X POST http://localhost:8000/task/compound \\
            -H "Authorization: Bearer cortex-tasks-2026" \\
            -H "Content-Type: application/json" \\
            -d '{"prompt": "Search for patterns and summarize"}'
        """
        # submit_compound_task is a synchronous call; run it on the default
        # executor so it does not block the event loop (and with it every
        # other request, including progress polling) while it works.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None, lambda: submit_compound_task(req.prompt, auth_token=auth)
        )
        return CompoundTaskResponse(**result)

    @app.get(
        "/task/{task_id}/progress",
        response_model=TaskProgressResponse,
        tags=["Compound Tasks"],
        summary="Get task progress",
        description="Poll for current execution status of a compound task"
    )
    async def task_progress(task_id: str) -> TaskProgressResponse:
        """
        Poll for compound task progress.

        No authentication required - task ID serves as the secret. Clients can poll
        this endpoint to track execution progress and see individual subtask results.

        Args:
            task_id: The task ID from create_compound_task

        Returns:
            Complete task snapshot including all subtasks and their progress

        Raises:
            404: Task not found (expired from cache or invalid ID)

        Example:
            curl http://localhost:8000/task/compound-a1b2c3d4e5f6/progress
        """
        progress = get_task_progress(task_id)
        if progress is None:
            raise HTTPException(status_code=404, detail="Task not found")
        return TaskProgressResponse(**progress)

    @app.get(
        "/tasks/recent",
        response_model=List[TaskSummary],
        tags=["Compound Tasks"],
        summary="List recent tasks",
        description="Get summaries of recently submitted compound tasks"
    )
    async def recent_tasks(limit: int = 20) -> List[TaskSummary]:
        """
        List recent compound tasks (dashboard view).

        Useful for monitoring system activity and recent executions.
        No authentication required.

        Args:
            limit: Maximum number of tasks to return (default 20, clamped to 0..100)

        Returns:
            List of task summaries ordered by most recent first

        Example:
            curl 'http://localhost:8000/tasks/recent?limit=10'
        """
        # Clamp on both sides: a negative value must not reach a slice, where
        # it would drop rows from the end instead of limiting the count.
        limit = max(0, min(limit, 100))
        tasks = list_recent_tasks(limit=limit)
        return [TaskSummary(**t) for t in tasks]
# ============================================================================
# Manual Integration (if not using setup_compound_task_endpoints)
# ============================================================================
"""
If you prefer to manually add routes instead of using setup_compound_task_endpoints(),
add these to your api.py after creating the FastAPI app:
# At the top of api.py, import these:
from .task_manager import submit_compound_task, get_task_progress, list_recent_tasks
# Then add these route definitions:
@app.post("/task/compound", response_model=CompoundTaskResponse, tags=["Compound Tasks"])
async def create_compound_task(req: CompoundTaskRequest, auth: str = Depends(verify_task_auth)):
result = submit_compound_task(req.prompt, auth_token=auth)
return CompoundTaskResponse(**result)
@app.get("/task/{task_id}/progress", response_model=TaskProgressResponse, tags=["Compound Tasks"])
async def task_progress(task_id: str):
progress = get_task_progress(task_id)
if progress is None:
raise HTTPException(status_code=404, detail="Task not found")
return TaskProgressResponse(**progress)
@app.get("/tasks/recent", response_model=List[TaskSummary], tags=["Compound Tasks"])
async def recent_tasks(limit: int = 20):
if limit > 100:
limit = 100
tasks = list_recent_tasks(limit=limit)
return [TaskSummary(**t) for t in tasks]
"""

167
symbiont/planner.py Normal file
View File

@ -0,0 +1,167 @@
"""
Compound Task Planner
=====================
Takes a user prompt and uses Haiku (the cheapest tier) to decompose it into
independent subtasks that can be executed in parallel where possible.
The planner handles:
- Breaking down complex requests into manageable subtasks
- Identifying parallelizable work (tasks with no dependencies)
- Suggesting optimal tier assignments (Haiku/Sonnet/Opus)
- Graceful fallback to single-task execution if planning fails
"""
import json
import subprocess
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Any
# Prompt sent to the planning model. The text is passed through str.format(),
# so the literal braces of the JSON example are doubled and {prompt} is the
# only substitution field.
PLANNING_PROMPT_TEMPLATE = """You are a task planner. Given a user's request, break it into independent subtasks that can be executed in parallel where possible.
Return ONLY valid JSON in this exact format:
{{
"reasoning": "Brief explanation of your decomposition strategy",
"subtasks": [
{{
"description": "Clear, self-contained task description",
"tier_hint": 1,
"depends_on": []
}}
]
}}
Rules:
- tier_hint: 1=simple/extraction, 2=moderate/writing/code, 3=complex/reasoning
- depends_on: array of subtask indices (0-based) that must complete first
- Tasks with empty depends_on can run in parallel
- Each subtask should be self-contained enough to execute independently
- Keep subtasks to 2-6 items (don't over-decompose simple requests)
- For simple single-step requests, return just one subtask
User request: {prompt}"""
def _new_subtask(task_id, index, description, tier_hint=2, depends_on=None):
    """Build one pending subtask record for the plan identified by *task_id*."""
    return {
        "id": f"{task_id}-sub-{index}",
        "index": index,
        "description": description,
        "tier_hint": tier_hint,
        "tier_assigned": None,
        "model": None,
        "depends_on": list(depends_on or []),
        "status": "pending",
        "result": None,
        "cost": None,
        "started_at": None,
        "completed_at": None
    }


def _strip_code_fence(text):
    """Return the payload of the first Markdown code fence in *text*, else *text*."""
    for fence in ('```json', '```'):
        if fence in text:
            return text.split(fence)[1].split('```')[0]
    return text


def _coerce_tier(value):
    """Coerce a planner-supplied tier hint to an int in 1..3 (default 2)."""
    try:
        tier = int(value)
    except (TypeError, ValueError, OverflowError):
        return 2
    return min(max(tier, 1), 3)


def _clean_dependencies(raw, index, count):
    """Keep the distinct in-range int indices from *raw*, excluding *index*."""
    if not isinstance(raw, (list, tuple)):
        return []
    cleaned = []
    for dep in raw:
        # bool is an int subclass; JSON true/false are not valid indices.
        if isinstance(dep, bool) or not isinstance(dep, int):
            continue
        if 0 <= dep < count and dep != index and dep not in cleaned:
            cleaned.append(dep)
    return cleaned


def plan_task(prompt: str) -> Dict[str, Any]:
    """
    Use Haiku to decompose a prompt into subtasks.

    The planner output is treated as untrusted: tier hints are clamped to
    1..3, dependency lists are reduced to valid indices, and a plan without
    usable subtasks is rejected. Any failure falls back to a single subtask
    carrying the original prompt, so the function does not raise for planner
    errors.

    Args:
        prompt: The user's request to decompose

    Returns:
        A compound task plan with the following structure:
        {
            "id": "compound-{uuid}",
            "prompt": original prompt,
            "status": "planned",
            "reasoning": explanation from Haiku (or the failure reason),
            "subtasks": [
                {
                    "id": "compound-{uuid}-sub-{i}",
                    "index": i,
                    "description": task description,
                    "tier_hint": 1|2|3,
                    "tier_assigned": None (will be set during execution),
                    "model": None (will be set during execution),
                    "depends_on": [indices of predecessor tasks],
                    "status": "pending",
                    "result": None,
                    "cost": None,
                    "started_at": None,
                    "completed_at": None
                },
                ...
            ],
            "created_at": ISO8601 timestamp,
            "planned_at": ISO8601 timestamp,
            "completed_at": None,
            "total_cost": 0.0
        }
    """
    task_id = f"compound-{uuid.uuid4().hex[:12]}"
    now = datetime.now(timezone.utc).isoformat()
    planning_prompt = PLANNING_PROMPT_TEMPLATE.format(prompt=prompt)

    try:
        # Invoke Claude CLI with Haiku for planning (cheapest option)
        result = subprocess.run(
            ['claude', '-p', '--model', 'haiku', '--output-format', 'json'],
            input=planning_prompt,
            capture_output=True,
            text=True,
            timeout=30
        )
        # Report a failed CLI run by its exit status and stderr rather than
        # as an opaque "Expecting value" JSON error.
        if result.returncode != 0 and not result.stdout.strip():
            raise RuntimeError(
                f"claude CLI exited with status {result.returncode}: "
                f"{result.stderr.strip()[:200]}"
            )

        output = json.loads(result.stdout)
        response_text = output.get('result', output.get('content', ''))
        # Extract JSON from response (handle markdown code blocks)
        plan = json.loads(_strip_code_fence(response_text).strip())

        specs = plan.get('subtasks', [])
        if not isinstance(specs, list) or not specs:
            # A plan without subtasks would produce a task that does nothing.
            raise ValueError("planner returned no subtasks")

        subtasks = []
        for i, spec in enumerate(specs):
            description = spec['description']
            if not isinstance(description, str) or not description.strip():
                raise ValueError(f"subtask {i} has no usable description")
            subtasks.append(_new_subtask(
                task_id,
                i,
                description,
                tier_hint=_coerce_tier(spec.get('tier_hint', 2)),
                depends_on=_clean_dependencies(spec.get('depends_on', []), i, len(specs)),
            ))
        reasoning = str(plan.get('reasoning', ''))
    except Exception as e:
        # If planning fails, wrap the whole thing as a single task.
        # The broad catch is deliberate: this ensures the system degrades
        # gracefully whatever the planner or the CLI did.
        reasoning = f"Planning failed ({str(e)}), treating as single task"
        subtasks = [_new_subtask(task_id, 0, prompt)]

    return {
        "id": task_id,
        "prompt": prompt,
        "status": "planned",
        "reasoning": reasoning,
        "subtasks": subtasks,
        "created_at": now,
        "planned_at": now,
        "completed_at": None,
        "total_cost": 0.0
    }

385
symbiont/task_manager.py Normal file
View File

@ -0,0 +1,385 @@
"""
Compound Task Manager
=====================
Manages the complete lifecycle of compound tasks:
- Tracks task state through planning and execution phases
- Executes subtasks in parallel while respecting dependencies
- Updates progress in real-time (via polling)
- Logs all executions to the immutable ledger
Architecture:
1. submit_compound_task() - plans task (sync, fast) then spawns background execution
2. Background thread executes subtasks in dependency-respecting waves
3. get_task_progress() - clients poll for current state
4. list_recent_tasks() - dashboard view of recent executions
"""
import json
import threading
import subprocess
import uuid
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional, Dict, List, Any
from pathlib import Path
import time
from .planner import plan_task
# In-memory store for active/recent tasks
# Limited to _MAX_TASKS to prevent unbounded memory growth
# Keys are task ids; values are the task dicts produced by plan_task.
_tasks: Dict[str, Dict[str, Any]] = {}
# Guards _tasks and the subtask dicts stored inside it.
_tasks_lock = threading.Lock()
_MAX_TASKS: int = 50
# Model-to-tier mapping
TIER_MODELS: Dict[int, str] = {1: "haiku", 2: "sonnet", 3: "opus"}
# Approximate costs per execution (USD)
# These are rough estimates - actual costs depend on token usage
TIER_COSTS: Dict[int, float] = {1: 0.008, 2: 0.04, 3: 0.15}
# Ledger file path (immutable execution log)
LEDGER_PATH = Path("/data/symbiont/ledger.jsonl")
def _log_ledger(entry: Dict[str, Any]) -> None:
    """
    Append an entry to the immutable execution ledger.

    Args:
        entry: Dictionary with execution details (timestamp, model, tokens, cost, etc.)

    The ledger is a jsonl file (one JSON object per line) used for:
    - Cost tracking and billing
    - Audit trails
    - Performance analysis

    Writing is best-effort: a failure is logged as a warning and never
    propagated, so a ledger problem cannot fail the task being recorded.
    """
    # Local import: the module header does not import logging.
    import logging

    try:
        # Serialise first so a bad entry cannot leave a partial line behind.
        line = json.dumps(entry) + "\n"
        with open(LEDGER_PATH, "a", encoding="utf-8") as f:
            f.write(line)
    except Exception:
        # Deliberately broad: ledger writes must not break task execution.
        # Unlike a bare `pass`, the failure is at least visible in the logs.
        logging.getLogger(__name__).warning(
            "Failed to append entry to ledger %s", LEDGER_PATH, exc_info=True
        )
def _execute_subtask(subtask: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute a single subtask via Claude CLI.

    Args:
        subtask: Subtask dict with description and tier assignment

    Returns:
        The same subtask dict, updated in place with result, status, cost,
        and timing info

    Process:
    1. Determine tier (tier_assigned, else tier_hint, else 2) and its model
    2. Invoke Claude CLI with subprocess
    3. Parse JSON output and extract result
    4. Look up the flat per-tier cost and record the run in the ledger
    5. Update subtask status and completion time

    Timeouts and all other errors are recorded on the subtask as status
    "failed" and logged to the ledger; they are not raised.
    """
    # Resolve the tier once so the model and the cost always agree.
    tier = subtask.get("tier_assigned") or subtask.get("tier_hint", 2)
    model = TIER_MODELS.get(tier, "sonnet")
    _update_subtask(subtask, model=model, status="executing",
                    started_at=datetime.now(timezone.utc).isoformat())
    try:
        # Execute via Claude CLI with JSON output mode
        result = subprocess.run(
            ['claude', '-p', '--model', model, '--output-format', 'json'],
            input=subtask["description"],
            capture_output=True,
            text=True,
            timeout=120
        )
        # Report a failed CLI run by its exit status and stderr rather than
        # as an opaque "Expecting value" JSON error.
        if result.returncode != 0 and not result.stdout.strip():
            raise RuntimeError(
                f"claude CLI exited with status {result.returncode}: "
                f"{result.stderr.strip()[:200]}"
            )

        output = json.loads(result.stdout)
        response_text = output.get('result', output.get('content', str(output)))
        if not isinstance(response_text, str):
            # A structured result would break the truncation below.
            response_text = str(response_text)

        # Token counts: top-level keys first, then a nested "usage" object.
        # NOTE(review): the nested location is an assumption about the CLI's
        # JSON layout — confirm against the CLI version in use.
        usage = output.get('usage')
        if not isinstance(usage, dict):
            usage = {}
        tokens_in = output.get('input_tokens', usage.get('input_tokens', 0))
        tokens_out = output.get('output_tokens', usage.get('output_tokens', 0))
        cost = TIER_COSTS.get(tier, 0.04)

        truncated = response_text[:2000]
        if len(response_text) > 2000:
            truncated += "\n[TRUNCATED...]"
        _update_subtask(subtask, status="completed", result=truncated,
                        cost=cost, completed_at=datetime.now(timezone.utc).isoformat())

        # Log successful execution to ledger
        _log_ledger({
            "timestamp": subtask["completed_at"],
            "model": model,
            "success": True,
            "input_tokens": tokens_in,
            "output_tokens": tokens_out,
            "estimated_cost_usd": cost,
            "prompt_preview": subtask["description"][:100],
            "compound_task_id": subtask.get("id", "unknown")
        })
    except subprocess.TimeoutExpired:
        now = datetime.now(timezone.utc).isoformat()
        _update_subtask(subtask, status="failed", result="Execution timed out (120s)", completed_at=now)
        _log_ledger({
            "timestamp": now, "model": model, "success": False,
            "error": "timeout", "compound_task_id": subtask.get("id", "unknown")
        })
    except Exception as e:
        # Broad on purpose: any failure is recorded on the subtask so the
        # scheduler can carry on with the rest of the task.
        now = datetime.now(timezone.utc).isoformat()
        _update_subtask(subtask, status="failed", result=f"Error: {str(e)}", completed_at=now)
        _log_ledger({
            "timestamp": now, "model": model, "success": False,
            "error": str(e), "compound_task_id": subtask.get("id", "unknown")
        })
    return subtask
def _validate_dependencies(subtasks: List[Dict[str, Any]]) -> None:
    """
    Sanitise every subtask's ``depends_on`` list in place.

    Each list is reduced to distinct integer indices that refer to another
    subtask in *subtasks*: self-references, unknown indices, duplicates,
    booleans and non-integer values are dropped. A missing or non-list
    ``depends_on`` becomes an empty list. Dependency cycles are not detected
    here.

    Args:
        subtasks: Subtask dicts, each carrying an "index" key
    """
    valid_indices = {s["index"] for s in subtasks}
    for st in subtasks:
        raw = st.get("depends_on")
        if not isinstance(raw, (list, tuple)):
            raw = []
        cleaned = []
        for dep in raw:
            # The type check comes first so unhashable values (lists, dicts)
            # never reach the set membership test. bool is an int subclass
            # but is not a meaningful index.
            if isinstance(dep, bool) or not isinstance(dep, int):
                continue
            if dep == st["index"] or dep not in valid_indices or dep in cleaned:
                continue
            cleaned.append(dep)
        st["depends_on"] = cleaned
def _update_subtask(subtask: Dict[str, Any], **updates) -> None:
    """Write the given fields into *subtask* while holding the module-wide task lock."""
    with _tasks_lock:
        for field, value in updates.items():
            subtask[field] = value
def _run_compound_task(task_id: str) -> None:
    """
    Background thread function: execute subtasks respecting dependency order.

    Args:
        task_id: ID of the compound task to execute; the function returns
            immediately when the id is not present in the task store.

    Execution strategy:
    1. Validate dependency graph
    2. Execute subtasks in dependency-respecting waves
    3. A subtask is ready when all its dependencies have finished; failed
       subtasks are recorded as finished as well, so a failed dependency
       does not hold back its dependents
    4. Ready subtasks are executed in parallel (up to 4 concurrent workers)
    5. Repeat until all subtasks are complete or stuck
    6. Calculate total cost and finalize task status ("completed" only when
       every subtask completed, otherwise "partial")
    """
    with _tasks_lock:
        task = _tasks.get(task_id)
        if not task:
            return
        task["status"] = "executing"
        subtasks = task["subtasks"]
    # Indices of subtasks that have finished, successfully or not.
    completed_indices = set()
    # Phase 0: Validate dependency graph
    _validate_dependencies(subtasks)
    # Phase 1: Routing - assign tier to each subtask
    # The assigned tier is simply the planner's hint (2 when absent).
    for st in subtasks:
        _update_subtask(st, status="routing", tier_assigned=st.get("tier_hint", 2))
    # Phase 2: Execution - run subtasks in waves respecting dependencies
    max_stall_cycles = 60 # 30 seconds max stall (0.5s * 60)
    stall_count = 0
    with ThreadPoolExecutor(max_workers=4) as executor:
        while len(completed_indices) < len(subtasks):
            # Find subtasks that are ready to execute
            ready = []
            for st in subtasks:
                if st["index"] in completed_indices:
                    continue
                if st["status"] in ("executing", "queued", "failed"):
                    continue
                deps = set(st.get("depends_on", []))
                if deps.issubset(completed_indices):
                    ready.append(st)
            if not ready:
                # Nothing can start: either work is still in flight, or the
                # remaining subtasks can never become ready.
                remaining = [s for s in subtasks if s["index"] not in completed_indices]
                still_running = any(s["status"] in ("executing", "queued") for s in remaining)
                if not still_running:
                    # Truly stuck — all remaining are blocked by failed deps
                    for s in remaining:
                        if s["status"] not in ("completed", "failed"):
                            _update_subtask(s, status="failed", result="Blocked by failed dependency")
                            completed_indices.add(s["index"])
                    break
                # Work is in flight: poll again, giving up after the stall limit.
                stall_count += 1
                if stall_count > max_stall_cycles:
                    break
                time.sleep(0.5)
                continue
            stall_count = 0 # Reset on progress
            # Launch ready subtasks in parallel
            futures = {}
            for st in ready:
                _update_subtask(st, status="queued")
                futures[executor.submit(_execute_subtask, st)] = st
            # Wait for the whole wave before looking for newly ready subtasks.
            for future in as_completed(futures):
                st = futures[future]
                try:
                    future.result()
                except Exception as e:
                    # An exception escaping the worker is recorded as a failure.
                    _update_subtask(st, status="failed", result=str(e))
                if st["status"] in ("completed", "failed"):
                    completed_indices.add(st["index"])
    # Phase 3: Finalization
    # `or 0` covers subtasks whose cost is still None.
    total_cost = sum(s.get("cost", 0) or 0 for s in subtasks)
    all_ok = all(s["status"] == "completed" for s in subtasks)
    with _tasks_lock:
        task["status"] = "completed" if all_ok else "partial"
        task["completed_at"] = datetime.now(timezone.utc).isoformat()
        task["total_cost"] = total_cost
def submit_compound_task(prompt: str, auth_token: Optional[str] = None) -> Dict[str, Any]:
    """
    Plan and begin executing a compound task.

    Args:
        prompt: The user's request to decompose and execute
        auth_token: (Optional) authentication token for the submission;
            accepted for the caller's convenience and not used here

    Returns:
        Immediate response with task ID for polling:
        {
            "id": "compound-{uuid}",
            "status": "planned",
            "subtask_count": N
        }

    Process:
    1. Use Haiku to plan the task (synchronous)
    2. Store task in memory, evicting one old task when at capacity
    3. Spawn background thread for async execution
    4. Return to client for polling

    The client can then poll /task/{task_id}/progress to monitor execution.
    """
    # Phase 1: Plan (synchronous, uses Haiku)
    task = plan_task(prompt)
    task_id = task["id"]
    # Captured before the worker starts so the reported status is not subject
    # to a race with the background thread.
    initial_status = task["status"]

    with _tasks_lock:
        # Evict one task if we're at capacity. Finished tasks go first:
        # dropping a task that is still executing would make its progress
        # endpoint return 404 while the work keeps running.
        if len(_tasks) >= _MAX_TASKS:
            finished = [
                key for key, stored in _tasks.items()
                if stored.get("status") in ("completed", "partial")
            ]
            candidates = finished or list(_tasks)
            oldest_key = min(candidates, key=lambda k: _tasks[k].get("created_at", ""))
            del _tasks[oldest_key]
        _tasks[task_id] = task

    # Phase 2: Execute (async - in background thread)
    # The task will progress from "planned" -> "executing" -> "completed"/"partial"
    thread = threading.Thread(target=_run_compound_task, args=(task_id,), daemon=True)
    thread.start()

    return {
        "id": task_id,
        "status": initial_status,
        "subtask_count": len(task["subtasks"])
    }
def get_task_progress(task_id: str) -> Optional[Dict[str, Any]]:
    """
    Return an independent snapshot of a compound task for polling/dashboard use.

    Args:
        task_id: The task ID from submit_compound_task()

    Returns:
        None when the id is unknown. Otherwise a copy of the stored task dict,
        produced by a JSON round trip (values JSON cannot represent are
        stringified), so the caller can read or modify it without touching
        the live task. The copy carries the task-level keys id, prompt,
        status, reasoning, subtasks, created_at, planned_at, completed_at and
        total_cost; each entry of "subtasks" carries id, index, description,
        tier_hint, tier_assigned, model, depends_on, status, result, cost,
        started_at and completed_at.
    """
    with _tasks_lock:
        stored = _tasks.get(task_id)
        # Serialise while holding the lock so the snapshot is consistent.
        encoded = json.dumps(stored, default=str) if stored else None
    if encoded is None:
        return None
    return json.loads(encoded)
def list_recent_tasks(limit: int = 20) -> List[Dict[str, Any]]:
    """
    List recent compound tasks (for dashboard view).

    Args:
        limit: Maximum number of tasks to return; zero or a negative value
            yields an empty list

    Returns:
        List of task summaries (most recent first):
        [
            {
                "id": task ID,
                "prompt": prompt truncated to 100 characters,
                "status": current status,
                "subtask_count": total subtasks,
                "completed_count": subtasks with status "completed",
                "total_cost": cumulative USD cost,
                "created_at": ISO8601,
                "completed_at": ISO8601 or None
            },
            ...
        ]
    """
    # A negative limit must not reach the slice, where it would drop tasks
    # from the end instead of limiting the count.
    count = max(limit, 0)
    with _tasks_lock:
        # Summaries are built while holding the lock so each one reflects a
        # consistent view of its task.
        newest_first = sorted(
            _tasks.values(), key=lambda t: t.get("created_at", ""), reverse=True
        )
        return [
            {
                "id": t["id"],
                "prompt": t["prompt"][:100],
                "status": t["status"],
                "subtask_count": len(t["subtasks"]),
                "completed_count": sum(1 for s in t["subtasks"] if s["status"] == "completed"),
                "total_cost": t.get("total_cost", 0),
                "created_at": t.get("created_at"),
                "completed_at": t.get("completed_at")
            }
            for t in newest_first[:count]
        ]