From afd14d1d0009965eb31bbe0c2aa89d9a712b70dc Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 30 Mar 2026 18:18:23 +0000 Subject: [PATCH] =?UTF-8?q?auto-repair:=20commit=207=20uncommitted=20file(?= =?UTF-8?q?s)=20=E2=80=94=202026-03-30?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- engram.db | Bin 53248 -> 45056 bytes engram.db.bak.20260330_181823 | Bin 0 -> 159744 bytes public_api.py | 211 +++++++++++++++++++ symbiont/api.py.bak | 189 +++++++++++++++++ symbiont/api_additions.py | 347 ++++++++++++++++++++++++++++++ symbiont/planner.py | 167 +++++++++++++++ symbiont/task_manager.py | 385 ++++++++++++++++++++++++++++++++++ 7 files changed, 1299 insertions(+) create mode 100644 engram.db.bak.20260330_181823 create mode 100755 public_api.py create mode 100644 symbiont/api.py.bak create mode 100644 symbiont/api_additions.py create mode 100644 symbiont/planner.py create mode 100644 symbiont/task_manager.py diff --git a/engram.db b/engram.db index 6ce01ab12c5b83960a5a36425dd9f87ee5ac1910..b9fd1543d15870a217b9716d8b0f522670e1655d 100644 GIT binary patch delta 442 zcmZozz})bFX@WE>69WSS_e2GIMJ5KlvQA#UcMMz{YZ&;x@!aRz%e|F5ne!UA8fPV^ z7{{8;f&w}mOc`90{W*0QxhFs56rSAAsmaJWc_*hjBNvcw#mF&PhD!}7;>=~n$Tqo( zOM{Vp^I|SlMn?9@C%LVdIRaQGv+-yGr3`p97+EGq@~AMfPF}z(GI`TKVi?T~R9G-Mo?CRX4`1NqBtD7B*ZKB|a4DbxO?~FcC;9bw zSb+ZG0AhwFcE-tW97>E#lQTJlzz`7+P2Xk&&^Ug^8i1sbx`WL1Jc+YhHR$Vy=RVlR|l7u|iR5QfX#Ri9&v!0z#ug zVp>URkwS8QQBi3@3DE6&3IRo_WtsV<#UOo2iOJciDGH?p3MKgpsbJ&u#8Z;=k`lA^ pK!z9?8ymzMS{PaYZC(_hFlhnHVwMR45FDVem?c1AGfTi9egG*9b+rHh literal 53248 zcmeHQOKcoRdLEh-DGp!G+72z-T9++LoRQ{DQjei!do&tJTn#Ciq^xy(AbVztoHl#9 z$6eigBv{AeH8u&51i@khBzp-Ep?m@Fm}0 z)jb?iWJ4U}5U*=2X&%+n_0?B@{g1C|XII~?njUAHj@vRkHXD03_RKS}cNmMsVng^l zfxqEn1TW~b@G%g+hOXKN?-ahx(xz z&Gy|Fe&A?ZYflo)`Bhwcy41D*IXVfgd>$p~9U3iAq`*NbZQYo!f*jnjUwZeL? zVTm|9=3zG2;__PMZe@i%Tv=Qyt-R0fSKepkdzJEimO$%GmQSXR{i@$?pzp>y@|m^D zx7XP61N?ndtwvv6+m}}#e^K4BJ#HVl-xHlDo1-Js`Q$)n(6k$T&ur|iTTXpjth-z| zzFX(vv-&t5P_npuyYg-JhLi%4WiGDPj)48$M(?hucmtp75flng4?kAHJHcRPIz(D$C`{YDkfQ-JM`_G|x|M za<+qvn@sa~W9zl|?u@5G5bMD3q~$FP4^KZx4#ebSiH|Jg^y`N2Ir4qi{sME%hy1Ab zBM*m$r>|TYcwa8x;LC!45CT!KKm7QPi2?V zAVa>%IEN13t>>x}CiVocP*qU}XCI`GMd5pqHuc;(F#dhQ!J2$g%Yp(DBGZGQ9n^GQ=*x6l3W$ zo2fhS(AzU9dez|c+~8u{bK3OMXqz)n41`>`@LMh8=#Wo9Al<-W7TN8}ozkP~8hWUE z<_>i$!F1ovm<>yUsXMK<#o^;mz~LG0(+2>{5Z-!|<101(bRnbLeHdbnA7gMC9P)ZE zGsj=t;+}!9WhhE$+}~P-bFNuVjf!)|_dM5NG7jx|Pq3fNJ~KRBK7Z2s59V8}=fhWn zNHoq;vOxv@z|>LH*47@yebMz!J z;ta+$F!d#kZd^Tmg$JskYX&p}ngPv#Wo|*yr-~Z!B|Mf#Npc&8%Xa+O` zngPv#W8GHu<-ce>VAB zlOIklP39(_oA_elUnl-%;!h?%o!}GqCaz3Oj{n#AKac-t{DWnxHO@vg7NQ#SQH{B%#%xq$CaN(V)tHKEyb{%TIjS)k)tHECoQZ0@ z6xDb!s_~7e#tTu6uSYe$7S+h28effS{AN_+`KZQMq8eY0YJ4fG@my5ni&2f!`uzXb zGYIQZYX&p}ngPv#WYmXZj=!m_ASEX48e-T7IsWn=j54GS>^&uFmIgIbM?q?(Ud%E*Q6sn#CI|A$)wY z!>>8qpZtBhsclVjc zg~uC3wz_!t{$jPtKL6hLnc;g*+C_umdg#&KG~E_3nl}45p9gk))DWHLa}q+O9|+t) zr%ddl?-rf|Hj4P9RWlviOB-!76MW1PhP7+#i?m}W*_DG!njG+IxKqY6Tij)d`Ic!X zJA+vX>%!?_J#?K9dEH~SL8F~n4Y16x&p-JAYx$mUSf=pII>@sO4=_d3Y^PX_CbSJK zCg!uoZN6!Go#!+Cl2hf4EsTcSo=Z3E(qF_&;R)usMtvLj+PJ80SQgtg(Z9J2`z#@`H@IErESI@)9Y!s`Q;;nlM#y$u)8pzptB!4Rk0o-M z>+{LZnUn-N(+AMSmDM#?U3{++;8b8>&TW{joY?o5ZY@4oUSkiItBcE(88YmAW>zd5 zkj-`-ciXhLI}=_jxVhJwAeP=Y0O^B6oQP1)htKOs1JB9 zbFusvFrO`~Ppmnw^Xz2_;pXY7I0T_(?4_lFz%(pli!~fB>=_Re2csqm%9i0bI4e8! zPjzvDZJK*P;5p0}oC&XiYuaWTtbCl5h|ZlJ(JkM!Fzd!P&$Rbp(Xd~1QQPmjg54%8 z*oAjsmI+4(bZrw;mtqaqFu~PFKKFTNARz&~+Ajd|F4X@WLwKc!i)smFJs5O~Wn~w5 zhWqn;^qd7mOkVeeY!IEH8HwRyzZgWl$s4}KU5SJiX_XNIHALy?ly}6svV}>jMftf+ z$A!nVHq%67=gcJuqmPB6<^o3P1sK#qsapo@a?^wJVmC{m*GL~$6-)lN{)+Q%H z!KsbKiyL&#z9!KiA1YkUfuVGMc5WUi(mZd>8oVYsJ3e0uM#ucNk8iL>OHp2 z_odUe5w_7n<^Ve|TIRMxeoI}MY3{>$2>2SFL4@fHF$txQg}VaI)5qFh^ru)d9-?iLg^S+VMa(BZ5_mOCpvAWg3lsS9d@ehmL!z|9Z6Enb z+kzi9Hi>1KD0jYjPC|SC^w1WOvw9Q)k^MB`C;6fT=$sy4X$z=8Q#kmyV8bf)r&LZ&8Jr(y z{3BmhDhp)qbzcY!snuP#iYS&)i}sCNrZAsfT64D9j!X+ zIc?NM4HYkE9CypGP0<29RP!1Z%!!<~y;WqTMdtdLbLq7(BuseAvH3nGypHl0Q(VJb zQ}!c##7x9pd+Tkg|6dsVr`XholRuq&7w7vgjsO1G7h}!%Kg6$({?X{fS$pJXBgXK* z49}hU;hCYK7B1Bf&46a$|1Sfb>ATY5+&{g;xve}MB|grkEb*j30p$z=0Gzf`Mv$29 zm?+5MOk^1@qp%IHD)Rw2nILz8*D`D*ohSvM#`(O_dH#+>)`#wO&v180ZbSVhb2BM_ zAdbjr8iqV%Bk+La0GEt9^63dHJEV4m zcaz^E4h3Bx8&p3cyTBlL(HInQ0(k&cV#1`nNvclBRs_NfDu&MV?S7~6 zE+U4&eZu*1`4)r}K{CX{rtg+RzeK7$yIoC2yTLId&!52`8>PRd86<`|&v zyjqsXJ~%yJOQeTD1Xc1S+w?7KUx|iAs3)>^N|-i68iLksZZ}BdLYj+U#sxkKbw!qN zkPW~dP)Rs08q`E0;6eLw_miEowuW>KS*a$Mnd!hE)ty;swCw2cdKMI%IG1}%R)c= zVoDN|LaT6mx6aq8k^oUOG|@TxR=;MW;FYVTN4G1P7V=BliXi)JtAuwC#L(72P>4ge z>#aUvG)EMZ&iR`X%Hz{BRk6%H)6G6udx+Z1!P=op#B%JdH0Ww!sO}cZYYVcfkGVn_ z<3k;S6LMUn-EbYNc!%;L#hP$V(CZ-`q}Z)WY3X2=>-(J--jw+6o*v&pkU_Chen=TJ zY3(Reu=Hdsp%RPa3i|j8{tZZ_^t^^L2b1o>_(&iLif4Hr&v7i#d10Y%T%o^#%1j}H zR+cPq4coEhil^#hp}UL1)OCIvT36^xifsY=#AI z1L&W zc5R2=Y;s7)u&R4lvM}`}ZtZ}{AW3L?Ub|-6oo`%~sQM6D0&f~1$tn~@)$4j27L$UX z*yigJV6Rs9k0W9)<2}`T$bbO_#Rw?i(OXk27$9PIxj(G`)~~50)(PuVPmdm z|4wZvFbA6qK7_dzXN7Cmx3QzCHdIpzxlCl>&!i_oZqbR)Ni2OxV%^CI0te-buG8@A zq#XbsMLzhgCd72pZ&AvrOo`08rU*GIo<29*uMMf$La)K5t=#`63o*}a4nO=fBE9|d_KRCVRFZrWEa|4Mz#3F zCUT&Ts0}wdoUoy^)D4;OCVEb5Br~k4&i#=fhue|XE4MK-Y(wdcbg)+k$wR4wCz<8vnf~Dgm&(LG_s^X0yi^0vP@s=$;UZsg61cX^9MbD35}KCwGc$Lu$ieX_`UV#c)ahQ`b|mDbPvmE0$~Btlhg~Op57()SgQ@ygbgxxtP-%JWRGx=Bm|OWZ)|L+vo`U7L|6&QIwFGu zTg?m(A8^-88~9n+J0IK6^Y zLJk85evWM9;4~@ROt5={lVT7^n+61XoOtFq4}+r`!HR2)bLmU868fUrw%s3 z0$Ol*19Ga%ePPh14Z6iaL~KAv(G{W*$6kqJ%TcdDYsxk_4qrip#x~KWPOGhu}hca8Fy$?EL{rUDbbImTkQ2rCiD76eD;t7 QgyJpNMA!>?z>WC-0v>*+mjD0& diff --git a/engram.db.bak.20260330_181823 b/engram.db.bak.20260330_181823 new file mode 100644 index 0000000000000000000000000000000000000000..711f2f0ea40043168b743ab7a8c9c7ef59af7b25 GIT binary patch literal 159744 zcmeI533wz|b?>{?T6^_29*;fax%PNGl1J65tv8Lwwnih5g{7IXr164HM0HiQR91Ia zyQ+F50S0X|Q0OxF zcL@HqzJ}nJ{?-Qk4EuiX_3LG!^x5xEr2aV+9i9#)-kADa{O)*vbUpUD=+z^i9GQom z_=N4ll8r5Um`b!={i z_>Li?QPSnh(8%n`rI`~m3*_{|?EKWiedNxW`^fYiGt+mHQBaQ(VQjqXN9#2WuB|OY zKC?7)?-DtA3jUp$n`0kc+;orbevoQb9lg5keCJ(!Y;|O4w=foNM2xDYZy4Ifa@kbZ z>}5;0&AO%Pt)JfRXcNio$>THklJk!yr%saKCq`SFW6JR0Zth_C4ZaJl*vqzV+lE zx;=Joeqd<#*jV@x&u3cg^1uYQ1_PcG)Ot8i>!6+Vf2YO4qshX|?K2BAC#Po?TZnA% zyBA{bokD*8Bla_rop$k6VC z2g8qC>YdZ`OV=S=ys2~c2hZj^sz=B6hze6@mQKx{gp+x)HK_({bD8bD-{)wO}}# zHO~>-296WLapaI{LWEvBG){lhbox`%?KQ`&(O;FCap=5;&=79dD-~sXlY3a;hJheO za(w3YsWWp+a0S&d&eF@e#4T$`p%|| z_T^hsO!qc64}*Zb>?hOx;EL`jFl{Lw658%pEJAQDmdzqvoD+S+u@vIYLmSR{^v5Rp z26s>I>oxwH{cSG`t=}S$XbjV2V*vj3r=Fv~9!kBC`flnw-Ui))Uq}E6AOR$R1dsp{ zKmter2_OL^fCP}h_auQE4};kU2z=XnC49Rk9)<5>^b?`PZzVpMcp~w-#7bf*^^N3# zG-9we~SHS>{nwy8hc|*jols_i^ZdV zAN_;qCsLnJeK_@a@`2=S$&V$UP2HUQVv3}k+lH`I~5`v;A@IszbifTzy z3o;)R1rSB9sHz1?m$Zq1$Y=SiEO1IzlyefFO9#aa7?kJo#avP0Ga45ZWj@E}xk4dR zD9Xjmcufkm}667pb z(n@(<)zllVi5zKlnIwpk$QQU=fzN8Pl0OjC#C%5N3Prx4syXTUDxC70I+d zBeX<&U8(K!eM;aehWl&{M{mJeW_j<`kwWKFstGroH#qO%mTNAHj5Q_jt`&F`L%F)s z4Cp0GSIUlQ?b#KXXdRc~N^l#xq8R0?c1NVvUWqFkE2~aTscOn@-_asx*_&0hsykP9 zPzVBN>(*J_f*(%3w#$ct=T;R^Y)7}Q=vV?ByP{Wh+psV9wUp-sa8L<+sig9PoDnbE zE{c-Oi6x0w@+Ij~+IQa50-T&t@|rGaTvicuP8KimX_8dR!1$9nNy@35ETnu|Nz?^F z*90zK;^A|-q)$^-1x9T_1s5!6pe1}-M$W-l5><|eQOL=~xKGQKl&q#JJjcTY;d(Kj zmKCyjIH|-HbV=l7E$Y)sd0CNiC5eOaR|MA{@o9NMmU3ENGx}S zIWOk2IWAk|Az&yGp9W19Rjs6PIklLFbM{>mIXcn|m(aTs^l6sM=$fSHMTy@352s$B zfj92`*kKID!;k}&n;2l@v%!tbT)fB2(+_=Ncp8eJu5psV7r!PHm(fOg)e~nYt|{rfx`GnHo<1Yx23|)5$L< zKa>2$w@SWCPr zaVjyDkl-Ttg#?fQ5!!fgSYT^3uyrV~bz@-bU|{QpWFZu5QV$3R0$bMywyuju_K&nNq`uqz z3?%TG{ei7(16%t7TYCdr*95k%4s7iSY+V)D+8x-sGO)EPuysXX>+-H5}L)3TzDqwgv)Q{ei7WV5<+h-o5c33ejKtQol}p zfux@HexV!qg#?fQ5CzH*%z}`2vRRFE$AMB-puY zU*u?C^RNG(_ZEw=jnDtTbovr4K>|ns2_OL^fCP{L56_$j%ufOlY zuRm=m_9|RPHQ_azo5Z1S)YK-);_Qh#XXoa~*WdgEQRQ%-ME5xO_;eD6LbYGO6*Ozb_ISmX-vYNO3{FqlX6PUNH=$rQKh`DY}%Yz z9V3T3hpg&gUJ=fecHkAM78%V{jOth;GU1wa;KI#1ZJ7`1sza&@ZSA4OrY(oa*WdUg zsnngiQZ{VIPyt?9allM_)u@e=BJI!`*kV9`7U5;2CBtc4k-k_sbGo(yt$|m>TJ(f% z{FCsj?brlf`KqpgeKj~pRmx?uZou{F>sX0nuIbc8>Qy+d1aFij({r=r6^o}%60`2q z>Q3WE&Na)pa5%T00q0mab!KU1fh^4|E=`gp#a<%<8P(|J3V8-oonE>ZpYp?e{A;D$t7~?A)uIrnLs|)op|hyLMc} zI6H3G?hu13a-61Ap;OJeUEbvEHKWEs?=@e>3qDge-5#xjc^0u?_#c7(tibq$G3PWc zxyd!*=!K&Qm{3tRIM+eIHOk5g(M;X09&(`Lz^N&M>9SJSbTVzyZ*#LpNXgg$3mlWw zZJpST2FKKl8c=!tm}}9V3%BT4-6%t^D{FeXwh0~${tK5VZni_g`YL#^1<`?&4G45# zUCn^58z&ll)3Hq+s_S*V5gv67*nhDG(C9+1e>=Rdc@RI0~0j_wMXog)=9CdY>2ASm&bXO6g z&*+#(3?d%HpjFy2jY|%?c3gy)dUmv~)U;)rGeV$HYA|NTsRu$-tU++AIpnONc#aAC z6$}0-us_&UZy10n`n4C{KhqFQ>GU+ADULGHnvy2yMcw(qcC5PU)GghkKqqQ=3AfXT z9dHrAd%@kex}W!&V!FS3PB2>qXLEO)4V)R@^azCF_>X= zeVhmx_;q1wew=136SYle)vQibX%L~4Qw;_$oS-p$oofx`Ll?@X**a3Ml}&}lZ|^v)v1Zb^?caZ~W;BURx^a2C!Lw(U5bI$aRbf!k zPv~V^Z^W)~jk<^^v!rZH(9q#*0wvY3!8IV`h7*@{uA*0pVBg%_G#R~T(}YZVW#Z)g z7&RE;4LJm%fabq6Errh?B6Jd}n(K|Lu6C`Nzi>pS)~nz-kU`V2)QqPvWE_}nCrNA2 zo9i&=(>T(MYS4jYeMKol2fjYTPh@yVl+NnaI^-v{G7L+lL@B$8a^u=Pu4(sOxM{PH zvpRGNgzRS(f?Ve!4onRdo2KGbn!j?2%_&@Soia`;hH9DIqM|Gk$SNQh1V$l^go>ro zT)hSn)^3blmtpYFAtg4ic&S~ z3gDq@UZo7~M1yv9Ws*$I5~~hQ4;VLYDp^J;q*EIXF>o8}+yjxwl zm9GDHMgB09dNBDH$-5GtO57O#!Ps|WtI;n<^CRyWNeowqzBQx_{>h*`@ZN#`{t6t5 zUq}E6ymSdPcAszq&Yc$?aE^PCjwU{4$z9^P1q%8c^fqLZIM9+u(t-ArGJ{G25Vdlgk#e6&t1*STQtq zAHUdcJOML?4d-d~kEO3bp+dJ>(}*a#ZwEM!7H19LRw6i_vVsR8l^2db)& za5n$P$JXiW#{Oy7vimQb*F3F=X)tLW*Q*+> zaY333lbLSAu2x-2 zYOj(>^SRpUrVUF-NZ+Ao2N@>uu1TQA3?53WK~=qOQQfd@D7ZwXTvINhs7lK#z@TP= zui8*l@w$f&sfo4&DH^Qnpa*I7qm`G_?2>k{I~#?wOi%^BypTruT5M zChS$Pqzo+7lH1-{JO}Xd7NzIgv z4a1r^wR9TRW+sdsx)Ldy)fEnKRiLTX3M{Y7P*$x&U%@h_4%HzDLZ$_2H-wHw*n;K7 zI9Y|@1oe6-9gLG>GgI@OURF07yKZ*vJ8|LmbtV~DtW2M#88fZ5!!iYio;#P&B^E7L z(A|gOdlM>6&x^3kflhb8`DlTpxp;QZ zboFtheHVpz$JJ|4>uN>TcFckytHa#l)(|vPwHvWX*RG3Ld*0EySp#((?@};xxz5zA zpLljaAqxuSu;imtu6wI%0_=qe5IUK_N)JXt29{Y>SWE+@!o^Z*o^csw)HK`-Yi`;k z?O6e{E!Y4CHvO5Rv?m}kIWYN615>D1Q~v@x9SCEd+>UaMUHOZ(Yg^^9d16@jH2coG z6!>$^Y}X!X0gTQ$?lKg-ilS@PEWO?ZGpQRiF3-7kT||a3>vIHp!w}+A^X8a)yJuZ5 z(pW}s9kzXkZj^K=9fMch0YesCeO@o01(MAqVbyVJMWfodEbCfz5k;0}G;LZk;aM^1 zU3Iwz9y1PcVohI$WJ`z17b3wpWOcd<&a?a=U7fqj@n!fNga!-lvg%fI;%tm$T%#r~ zA~3WDAXJpfaAVGQ|IWLiKuoyFP=~@?MJHk|zXo?Sy&J0Ip5<;L1M$p_B#>J)qOxnt zMU=$aoza{)XufEfT3w~J1F(UWwT#34M(F3x*czU^Xu|DMW7F@M? z)7HKJv#QgH%Z;2+*LJINK!@fGt-CMY%@=nQy3BtUi0BFwQ=k&QM%Vx0=&yv}Km0-h zNB{{S0VIF~kN^@u0xw+xZ#^C!3{6cu=`N_mwIfI`!0 z_kj(1-_2i*gt|#9QEx4~Vb$$F5iB@6=aPAzhjV#F8*+Np)T_n@nS;vf-G;v2%;kv& z51tlH#nRwBj$Vd!y|W4xu$DD4oM9T){B>HVaBB`_WfK;(?$ejBP@yFN8!DwblvC(~ zTi#=b?)O&F5Ki1&3|o4s43A|%VL$CYdof9l!&8czBuk(KbY^K9*3hj3U{Od<;x70r z+B#hFHn@>XpQ!P+(ol$qMyD5Mj?bK2nuS8*+^OlQC3t{v5h}Q?B@-0mpjHNtsFpX! zz>2ngub7`Y?cJQ6Y*rK6Er+5sJXyPve$a-qz;g+3UzTKnW0XLp--hZ4EVI1=gj*Dv zC^g#*Pv)vRJlSDSq~Q@AIC{!^;s>4wQ8ck2^P)ITyj$3Cr+@bNBoPEz%m~>+7V6O! z+#-kKJT$myZn)~{>>60ye4J(yU=($?Y6SJXU}eFrK18E;&fG_i&o0bNFP&PLb-im_ z*{|(Xevg7q)7={t&#oCOiZMx_yK)S;VKCYr^|S&*?<52|jecal`K*gwfsa867oH4=Mvu*%IsqCy z3%jzd<^Ydr8ddj+nfXoP-g|*Qgj+u2P_S5m+X^eqzXJHu4fALj5_iKGW)Ax?PH>93PeKz%h)MF_#wUElBu1da;d^-8*Vqc2=LhPNf zjoAIMTVmJ6`lH{9ej)mE(Z{3q(1(WJHe?RnH6#t~8hn26D}%o__@jf54n8z^|KQBv z#NeL6$iTM;{&e8e10NcA=fE2VjDf|0qXUNqQvJ{Of35#>;pf9&3x6*B(LS~BMBm}Q zRQOflss0c3ziniFx?^`lXkCrxRu%Y^?dVqcM34Qq_t-zvWB+l6#Tr9X z8CsE{DVImaS`_l!is9%=_|;wZEqB@XeO>mws>{9yy6n5Z%f46cif}DkvhboCR)d4xYpZc!}oOGeRucWXS(lRx+)^I&Mk3eV`bH;DOF7gFZL1^dI@*+ z5>ERrEpnEl-qI@=p0XMWG1QeF{yg1d|FItXr+Vza zt>XnE&#mZH-8Ss-tzGxt(sl3Au6u9py7x%ey^~${7P{`ucio%ox;NW(Z>H;Bx$9o3 z>t3iBk5lw?jUNxYITN#Vmx z^(Lk|##Bd{>LI3jBU3%tbB5aJsjT-@&h}L5FdbgvPh5Of30%dnZFqw+2XBLREVCT0 zF;%nYRUYoCysoG6+Mdd5m{wGoYK5tm88K;%p*_gZ450fyKjD|+?@THdQ+;S}5= zOTnm?U@T|3jIK$FUX)I-#M@co3`;!D5~o?>F_t*R5^rOPx3a`rSmIHZcr!~p!V)K0 zVu2;*Sz?YQW?5o}CCV&OVu>P46j&n95+_(isHxR)hf!xFD%iF;V$RV;CLG<-O8F1#z0dN6e= zC8zeL`jgKke?R%L|6F-}HEKy4=CGv^AiBS9-@z2LU9Dh9S#P5yY94GPq*t4-O#y%E% z64v<-#BPfnhz&;nI{L-vN25(P6oN21q8!z14q`P|3{N8UD48(AF5j_eu!x8bi1 ze`fgo!*3a08$LNK3|}$y{Loj2etqaChPH-EL$gEUL#e@k9Q@w&bKrr2TL-Qm80det|Ed0u^uN8|f%?W||GxfE<((3z)-6k~I^iDx@$zkRv2UWwaH1gRWr>s3d_h-~ zV)&hVBS+e&5kyJk3tX2sg(6>2)tnT5M^^)$=(_jqUH3lTb?*;# z-TQ-G_r9&`-p5jJ3FWpy*vY03RF>l2$I3fa6`5+tsmOCl$0q5sShf5InbS>oqe;^$c6XIbL!_(VCQ`mUtOUyp$zg!V*&~G074WEHTa!V=OVs5=U6#FiRX_ ziGwV0fF<^`#0X35V~Js(C`qMEK`qIgB;{027Q!Kh82S&E_`g`!BpSKRNufh)`9DXA0M`d?-@Qayni?{^sS+%hCVv!8vOX+ zyHnpteLnT!)Z-~9b#LnC6iM|bpG`iM{7CZcNhf(vax%Fu8A?2p_-x_>iN_LVVj+=9 zTor#I{&f7)@%P0ajXxNlkMr@%W8aN^CHAS(I zY+fx15?9bAk(0I1&#=UwW{K}-iSJ{HKgAM1#u7iu5CUPX)8fAK3$mdH1PS^A-CzmvarZTi5LsNXloHwd}jV1o7@2q)2 zmU3ENV(lGc=2#J;cyz49#R{Rfbk!Xk~`B#?T&QXa+-DWoRo5t;EoD zU&o&}pnjVr{uiHE%FBwBD@mLx$VE<8LqEztem{ToYsfhGPcOZ*p>_|Giyd6xKHmiRwe;y#v&8qX#CNmAcd^8G zvcxA@;yYO46D;xVEb(!c_(Lr52U+6VSmI+W@dsGqTUp}wv&6Ts#79};n_1$USmGlr zaf>CMV~Gux_(sNr^ah6ZdWQBeLwg-Vdo4qI4MW>xXd4V|ouQrm9$z=2`A7f>AOR$R z1dsp{Kmter2_OL^fCP{L5bFC-F0h1My#pFUI~R_HgWQG&Az;k@t*THT?dee;%40{AD;4zmNbDKmthM z zy<4}8lA&v4cFkB(3}RGjWxb+T9mO%s>Le*zN>yDY`i5aU_Bc^ZO^0ulno-ta^Yj8a zkRxVIucpbdvbjPuy;e3i;dDw>BW2^Po^B+z_PEC7c4i!(nG}SyRFLuo;dtw8MS5W=S zcGI8?CrZ4Wk-4HEXd<7KzqPNh6fYNqXw z+<4nacRM@gx3p&IXAQG%6Mbjqj&1FB%{4pbf{`K*e|t>OvPDtwn=4XtMIl|tiG}Po zey%M5@o?KZim*GR&TbdI+mk-p?KW&fUfa5I037iR9a9B4FH7)e|MCT0QHp+3g-N+E znaQLj@ItBMhf29jvc%tG1g>U-M-rn7u4g&=#N%f++%!mgRgXZT=m zw(yKrb?YqcAA8ew^vYPk>AZuoq^g{S;X;SV!qyc7p(E4N&C5%ID1onVxdNZnWF_Bj zv9tukPJ;d^q~(ktNc<}op%Ly-BV}D#BPC;loNbQAnXSv=q*9@CQZb(qxk8aIsA>)_ z&>YV0NyYqRMx28IQnJhnnJF6^hSeMk4QuPN!4NmKb!qENoLaXmfEhf60B)Xxh>*)< zP@DMcr`&rKPXxRU4X2;EyIlkfN>&zf+i{I{3O# zEm?}~SasC_Y^0*pN?R#v{%d!3EuNRMr_EZu>;^?410mRfE`Y`~ZMQwYa3GM|s%6w1 zaA>8x39VDNlJr9Lom?neC}iZqY}IfKrEI(gPTve)$B&WGbp@hXrRHpsufOq0;wVKL z7b_5Np*P1?wi3WjYK*Wi!}F6o-#$NegI0W0SM`eGj>4MPPAa+pX{>-2w;I|AZN=S7 z9ofO9Za3>NnRwSoYeizVf{WV*8rqwa9=Ms)YLfyzwtqb`C{MEC%?a=55 zfTqhz9Wn-p+|#aAt+8!d1Of~2vukFRLeRH{2Sb9?9%0Z7%YatEc<{F&Q0L3-Nt9i4XC1Vk{8X~b28 zG!y+JQVL7f3GBqr*T>!9^oqq(C%rkZY{9^WIjfaQ!+B`p>ZQ%#>*;gX z-4s8%r?H=QOXG;ITSPuDR{c`_rX;iACG@6AjFvT-RbOMF4x2I@l7O&>DX=P4wjIG)D_ zgIw3uMle&ieW%-&ZM#do^dtF06#of||4maZ{O2u|Jp7BT_(8)v4IplaoNR^-FKWP| zsKu2g^k@3@!qZju$>8m+yPwzTyrSeiBGGH+St_J))bC85(BFQ()sGr6>slM9mE z-Da5R?X|X{+9Vkj#z@e^TOH?Li?)^pbjGF;B-kJ0WK@cWXDGHv1HE8afQq=jsRJN3Uxfc7yLJUl#_K^Io6OPTJ8V z<&F}F1IEqUk}fuTpBV341brL-Vta#`8mi%i8UGGw6&w!;XVCaooKYgof--yc)G zC1Nn3_Kc_gY~sZRYI-Yn106akcj)Z*bC(1;%aycJURO2Mdo{>0cdk_Gv_5zE{EW>u zR;pyw^O%VxSTffXXLWO8K`)iP#r0U%5g|-UfHRxU@UmFw6b@bfy_IGuM=n6YW6m`R zoa%PS%4t-X-z4*Ox?ZOSk2Yfv)WnX%5_^R%yQkf%&bB5@Hiz*zX&XOII@XVqd06Nc z&9Y%vfH9<8G@;%;yxODFbNlpzHZ2U&wK**HsXg05H93BA5n#GShcx8W`@84W?%s2^ zjM2{WvbcJyvmm(^1X0LxdZv&qDWd2NlK)!+Wb^z^gIrCZHC-W_mgPcD?hK6`DEYLQ z$&2!~gu2@tN5j~3E1PeYNrGd*pHu`2cyy|CJ6};%T2Lt(Wy9G_H|{tPJ+ftkamGXzOo%Eb|z)=c`28FpRQ*^t0M&D$z(wn|Z2u|R7v!Ryi2cENz$60eJMdm7w+308%bwz;j+pch@d7}$!{?!-?^OmKS0 z@nHF5yOS!U5>Py_UW#hg4I3Ic40}ckiv%-g6nT-D{0@*68(`EOOaQUKfR&_W!BZXboDaCsWC}{oMeuaBAn*82K-zWl6L`q) z+G!uSI#UrRMYwBT5b|OMvT{En$VKU`-vBAPuxvhs(k6BO!z1(-zHiaK{jLCXPV*Lk zdk#j0hK$~^v~-$?JU^D+k|wBkPP;M1SJ)Q>pe%AlRV_%mv?K5IN?#1j+p8#^7xY>c z0b)q20lt`)^MWLae)un@$nf|}rU19e+snVM_lKy-+bX~p>d~~HI`8)$h@uRbiTit5ca%XJtSw1TZoRSsgoW$qgjPA`buGx6bdq4zkv{G*a z$83+>)>!R~RA`s4{vBL4x2xSi=Z=L3q_q(={2C3jk>)BDQL6zPLb97AYcE?o2*E{fl zo1EwJ#avP0GdmhnlfE9J!@5<9Y&nrPc ? ORDER BY started_at DESC", + (cutoff,) + ) + recent = [dict(r) for r in cur.fetchall()] + + conn.close() + return {"active_sessions": active, "recent_24h": recent} + + +# ──────────── /engram/register — register a session ──────────── +@app.post("/engram/register") +async def engram_register( + session_type: str = Query(default="claude-chat"), + description: str = Query(default="External Claude session"), + auth: str = Depends(verify_key), +): + """Register a new session in engram.""" + import sys + sys.path.insert(0, "/data/symbiont") + from symbiont.engram import Engram + eng = Engram() + sid = eng.register(session_type, description) + return {"session_id": sid, "status": "registered"} + + +# ──────────── /engram/log — log to a session ──────────── +@app.post("/engram/log") +async def engram_log( + session_id: str = Query(...), + message: str = Query(...), + auth: str = Depends(verify_key), +): + """Log a message to an existing session.""" + import sys + sys.path.insert(0, "/data/symbiont") + from symbiont.engram import Engram + eng = Engram() + eng.log(session_id, message) + return {"status": "logged"} + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="127.0.0.1", port=8112) diff --git a/symbiont/api.py.bak b/symbiont/api.py.bak new file mode 100644 index 0000000..0fb83c9 --- /dev/null +++ b/symbiont/api.py.bak @@ -0,0 +1,189 @@ +""" +FastAPI server for the Symbiont orchestrator. + +Endpoints: + POST /task — Run a task through the router + POST /queue — Add a task to the queue + GET /status — Health check + rate limit status + GET /ledger — Recent ledger entries + GET /ledger/stats — Aggregate cost/usage stats + GET /sessions — Active and recent sessions + GET /sitrep — Situation report for new sessions +""" + +import json +import logging +from datetime import datetime +from pathlib import Path +from typing import Optional + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +from .dispatcher import ModelTier, rate_limits +from .router import route_task +from .scheduler import enqueue_task, get_pending_tasks +from .engram import Engram as SessionRegistry + +logger = logging.getLogger(__name__) + +app = FastAPI( + title="Symbiont", + description="Self-sustaining AI orchestrator", + version="0.1.0", +) + +LEDGER_PATH = Path("/data/symbiont/ledger.jsonl") + + +class TaskRequest(BaseModel): + task: str + system_prompt: Optional[str] = None + force_tier: Optional[str] = None # "haiku", "sonnet", "opus" + + +class QueueRequest(BaseModel): + task: str + priority: int = 5 + metadata: Optional[dict] = None + + +@app.post("/task") +async def run_task(req: TaskRequest): + """Execute a task immediately through the router. Logs to session registry.""" + force = None + if req.force_tier: + try: + force = ModelTier(req.force_tier.lower()) + except ValueError: + raise HTTPException(400, f"Invalid tier: {req.force_tier}") + + # Register this dispatch as a micro-session + reg = SessionRegistry() + sid = reg.register("api-task", req.task[:120]) + + result = route_task( + req.task, + system_prompt=req.system_prompt, + force_tier=force, + ) + + # Complete the micro-session + if result["success"]: + reg.complete(sid, f"Completed via {result['model_used']} (${result['estimated_cost_usd']:.4f})") + else: + reg.complete(sid, f"Failed: {result.get('error', 'unknown')}") + + return result + + +@app.post("/queue") +async def queue_task(req: QueueRequest): + """Add a task to the queue for later processing.""" + task_id = enqueue_task(req.task, req.priority, req.metadata) + return {"task_id": task_id, "status": "queued"} + + +@app.get("/status") +async def status(): + """Health check and current rate limit status.""" + limits = {} + for tier in ModelTier: + if rate_limits.is_limited(tier): + limits[tier.value] = { + "limited": True, + "until": rate_limits.limited_until[tier].isoformat(), + } + else: + limits[tier.value] = {"limited": False} + + pending = get_pending_tasks() + + return { + "status": "alive", + "timestamp": datetime.now().isoformat(), + "rate_limits": limits, + "pending_tasks": len(pending), + } + + +@app.get("/ledger") +async def get_ledger(limit: int = 50): + """Return the most recent ledger entries.""" + if not LEDGER_PATH.exists(): + return {"entries": [], "total": 0} + + lines = LEDGER_PATH.read_text().strip().split("\n") + entries = [] + for line in lines[-limit:]: + try: + entries.append(json.loads(line)) + except json.JSONDecodeError: + continue + + return {"entries": entries, "total": len(lines)} + + +@app.get("/ledger/stats") +async def ledger_stats(): + """Aggregate statistics from the ledger.""" + if not LEDGER_PATH.exists(): + return {"total_calls": 0} + + lines = LEDGER_PATH.read_text().strip().split("\n") + entries = [] + for line in lines: + try: + entries.append(json.loads(line)) + except json.JSONDecodeError: + continue + + if not entries: + return {"total_calls": 0} + + total_cost = sum(e.get("estimated_cost_usd", 0) for e in entries) + by_model = {} + for e in entries: + model = e.get("model", "unknown") + if model not in by_model: + by_model[model] = {"calls": 0, "cost": 0, "tokens_in": 0, "tokens_out": 0} + by_model[model]["calls"] += 1 + by_model[model]["cost"] += e.get("estimated_cost_usd", 0) + by_model[model]["tokens_in"] += e.get("input_tokens", 0) + by_model[model]["tokens_out"] += e.get("output_tokens", 0) + + successes = sum(1 for e in entries if e.get("success")) + + return { + "total_calls": len(entries), + "successful_calls": successes, + "total_estimated_cost_usd": round(total_cost, 4), + "by_model": by_model, + "first_entry": entries[0].get("timestamp") if entries else None, + "last_entry": entries[-1].get("timestamp") if entries else None, + } + + +@app.get("/sitrep") +async def get_sitrep(): + """Situation report — what other sessions are doing. Read this first.""" + reg = SessionRegistry() + return { + "report": reg.get_situation_report(), + "active": reg.get_active_sessions(), + "recent_24h": reg.get_recent_sessions(hours=24), + } + + +@app.get("/sessions") +async def get_sessions(status: Optional[str] = None): + """List sessions, optionally filtered by status.""" + reg = SessionRegistry() + if status == "active": + return {"sessions": reg.get_active_sessions()} + elif status == "completed": + return {"sessions": reg.get_recent_sessions(hours=168)} # 1 week + else: + active = reg.get_active_sessions() + recent = reg.get_recent_sessions(hours=24) + return {"active": active, "recent_24h": recent} diff --git a/symbiont/api_additions.py b/symbiont/api_additions.py new file mode 100644 index 0000000..f576ade --- /dev/null +++ b/symbiont/api_additions.py @@ -0,0 +1,347 @@ +""" +FastAPI Endpoint Additions for Compound Tasks +============================================== + +New endpoints to integrate into the existing Symbiont API (/data/symbiont/symbiont/api.py). + +These endpoints expose the compound task system to external callers: +1. POST /task/compound - Submit a new compound task for execution +2. GET /task/{task_id}/progress - Poll for task progress +3. GET /tasks/recent - List recent tasks (dashboard view) + +Authentication: +- Task submission requires a bearer token +- Progress polling requires no auth (task ID is the "secret") +- Recent tasks list is unauthenticated (low-sensitivity data) + +Integration Instructions: +1. Import task_manager functions at the top of api.py: + from .task_manager import submit_compound_task, get_task_progress, list_recent_tasks + +2. Define authentication token (set to your secret): + TASK_AUTH_TOKEN = "cortex-tasks-2026" + +3. Add these classes and functions to api.py + +4. Add the three routes to your FastAPI app instance +""" + +from fastapi import FastAPI, HTTPException, Depends, Header +from pydantic import BaseModel, Field +from typing import Optional, List, Dict, Any +from .task_manager import submit_compound_task, get_task_progress, list_recent_tasks + +# These imports should be added to your api.py +# from .task_manager import submit_compound_task, get_task_progress, list_recent_tasks + + +# ============================================================================ +# Configuration +# ============================================================================ + +# Simple shared secret for task submission auth +# In production, consider using OAuth2, API keys, or other stronger methods +TASK_AUTH_TOKEN = "cortex-tasks-2026" + + +# ============================================================================ +# Authentication +# ============================================================================ + +def verify_task_auth( + authorization: Optional[str] = Header(None), + token: Optional[str] = None +) -> str: + """ + Verify task submission authentication. + + Supports two methods: + 1. Bearer token in Authorization header: "Authorization: Bearer {token}" + 2. Query parameter: ?token={token} + + Args: + authorization: Authorization header value + token: Token from query parameter + + Returns: + Verified token + + Raises: + HTTPException 401: Invalid or missing token + """ + auth_token = token or ( + authorization.replace("Bearer ", "") if authorization else None + ) + + if auth_token != TASK_AUTH_TOKEN: + raise HTTPException(status_code=401, detail="Invalid or missing auth token") + + return auth_token + + +# ============================================================================ +# Request/Response Models +# ============================================================================ + +class CompoundTaskRequest(BaseModel): + """Request body for compound task submission.""" + + prompt: str = Field( + ..., + description="The user's request to decompose and execute", + min_length=1, + max_length=5000 + ) + token: Optional[str] = Field( + None, + description="Auth token (alternative to Bearer header)" + ) + + class Config: + json_schema_extra = { + "example": { + "prompt": "Search for Python concurrency patterns and summarize the top 3" + } + } + + +class CompoundTaskResponse(BaseModel): + """Immediate response from task submission.""" + + id: str = Field( + ..., + description="Unique task ID for polling progress" + ) + status: str = Field( + ..., + description="Current task status (planned, executing, completed, partial)" + ) + subtask_count: int = Field( + ..., + description="Total number of subtasks in the plan" + ) + + class Config: + json_schema_extra = { + "example": { + "id": "compound-a1b2c3d4e5f6", + "status": "planned", + "subtask_count": 3 + } + } + + +class SubtaskSnapshot(BaseModel): + """Current state of a single subtask.""" + + id: str + index: int + description: str + tier_hint: int + tier_assigned: Optional[int] + model: Optional[str] + depends_on: List[int] + status: str + result: Optional[str] + cost: Optional[float] + started_at: Optional[str] + completed_at: Optional[str] + + +class TaskProgressResponse(BaseModel): + """Complete progress snapshot for a compound task.""" + + id: str + prompt: str + status: str + reasoning: str + subtasks: List[SubtaskSnapshot] + created_at: str + planned_at: str + completed_at: Optional[str] + total_cost: float + + class Config: + json_schema_extra = { + "example": { + "id": "compound-a1b2c3d4e5f6", + "prompt": "Search for Python patterns and summarize", + "status": "executing", + "reasoning": "Breaking into search, summarization, and output formatting", + "subtasks": [ + { + "id": "compound-a1b2c3d4e5f6-sub-0", + "index": 0, + "description": "Search for Python concurrency patterns", + "tier_hint": 2, + "tier_assigned": 2, + "model": "sonnet", + "depends_on": [], + "status": "completed", + "result": "Found patterns including...", + "cost": 0.04, + "started_at": "2026-03-21T15:30:00Z", + "completed_at": "2026-03-21T15:30:05Z" + } + ], + "created_at": "2026-03-21T15:30:00Z", + "planned_at": "2026-03-21T15:30:00Z", + "completed_at": None, + "total_cost": 0.04 + } + } + + +class TaskSummary(BaseModel): + """Summary of a task for list view.""" + + id: str + prompt: str + status: str + subtask_count: int + completed_count: int + total_cost: float + created_at: str + completed_at: Optional[str] + + +# ============================================================================ +# Endpoints +# ============================================================================ + +def setup_compound_task_endpoints(app: FastAPI) -> None: + """ + Register all compound task endpoints on the FastAPI app. + + Call this from your main api.py file: + from .api_additions import setup_compound_task_endpoints + setup_compound_task_endpoints(app) + + Or manually add the @app.post/@app.get routes below. + """ + + @app.post( + "/task/compound", + response_model=CompoundTaskResponse, + tags=["Compound Tasks"], + summary="Submit a compound task", + description="Submit a user prompt to be decomposed into subtasks and executed in parallel" + ) + async def create_compound_task( + req: CompoundTaskRequest, + auth: str = Depends(verify_task_auth) + ) -> CompoundTaskResponse: + """ + Submit a compound task for execution. + + The task is decomposed by Haiku into subtasks that can run in parallel. + Returns immediately with a task ID for polling progress. + + Args: + req: Task request with prompt and optional token + auth: Verified authentication token + + Returns: + Task ID and initial status + + Example: + curl -X POST http://localhost:8000/task/compound \\ + -H "Authorization: Bearer cortex-tasks-2026" \\ + -H "Content-Type: application/json" \\ + -d '{"prompt": "Search for patterns and summarize"}' + """ + result = submit_compound_task(req.prompt, auth_token=auth) + return CompoundTaskResponse(**result) + + @app.get( + "/task/{task_id}/progress", + response_model=TaskProgressResponse, + tags=["Compound Tasks"], + summary="Get task progress", + description="Poll for current execution status of a compound task" + ) + async def task_progress(task_id: str) -> TaskProgressResponse: + """ + Poll for compound task progress. + + No authentication required - task ID serves as the secret. Clients can poll + this endpoint to track execution progress and see individual subtask results. + + Args: + task_id: The task ID from create_compound_task + + Returns: + Complete task snapshot including all subtasks and their progress + + Raises: + 404: Task not found (expired from cache or invalid ID) + + Example: + curl http://localhost:8000/task/compound-a1b2c3d4e5f6/progress + """ + progress = get_task_progress(task_id) + if progress is None: + raise HTTPException(status_code=404, detail="Task not found") + return TaskProgressResponse(**progress) + + @app.get( + "/tasks/recent", + response_model=List[TaskSummary], + tags=["Compound Tasks"], + summary="List recent tasks", + description="Get summaries of recently submitted compound tasks" + ) + async def recent_tasks(limit: int = 20) -> List[TaskSummary]: + """ + List recent compound tasks (dashboard view). + + Useful for monitoring system activity and recent executions. + No authentication required. + + Args: + limit: Maximum number of tasks to return (default 20, max 100) + + Returns: + List of task summaries ordered by most recent first + + Example: + curl 'http://localhost:8000/tasks/recent?limit=10' + """ + if limit > 100: + limit = 100 + tasks = list_recent_tasks(limit=limit) + return [TaskSummary(**t) for t in tasks] + + +# ============================================================================ +# Manual Integration (if not using setup_compound_task_endpoints) +# ============================================================================ + +""" +If you prefer to manually add routes instead of using setup_compound_task_endpoints(), +add these to your api.py after creating the FastAPI app: + + # At the top of api.py, import these: + from .task_manager import submit_compound_task, get_task_progress, list_recent_tasks + + # Then add these route definitions: + + @app.post("/task/compound", response_model=CompoundTaskResponse, tags=["Compound Tasks"]) + async def create_compound_task(req: CompoundTaskRequest, auth: str = Depends(verify_task_auth)): + result = submit_compound_task(req.prompt, auth_token=auth) + return CompoundTaskResponse(**result) + + @app.get("/task/{task_id}/progress", response_model=TaskProgressResponse, tags=["Compound Tasks"]) + async def task_progress(task_id: str): + progress = get_task_progress(task_id) + if progress is None: + raise HTTPException(status_code=404, detail="Task not found") + return TaskProgressResponse(**progress) + + @app.get("/tasks/recent", response_model=List[TaskSummary], tags=["Compound Tasks"]) + async def recent_tasks(limit: int = 20): + if limit > 100: + limit = 100 + tasks = list_recent_tasks(limit=limit) + return [TaskSummary(**t) for t in tasks] +""" \ No newline at end of file diff --git a/symbiont/planner.py b/symbiont/planner.py new file mode 100644 index 0000000..2267985 --- /dev/null +++ b/symbiont/planner.py @@ -0,0 +1,167 @@ +""" +Compound Task Planner +===================== + +Takes a user prompt and uses Haiku (the cheapest tier) to decompose it into +independent subtasks that can be executed in parallel where possible. + +The planner handles: +- Breaking down complex requests into manageable subtasks +- Identifying parallelizable work (tasks with no dependencies) +- Suggesting optimal tier assignments (Haiku/Sonnet/Opus) +- Graceful fallback to single-task execution if planning fails +""" + +import json +import subprocess +import uuid +from datetime import datetime, timezone +from typing import Dict, List, Any + + +PLANNING_PROMPT_TEMPLATE = """You are a task planner. Given a user's request, break it into independent subtasks that can be executed in parallel where possible. + +Return ONLY valid JSON in this exact format: +{{ + "reasoning": "Brief explanation of your decomposition strategy", + "subtasks": [ + {{ + "description": "Clear, self-contained task description", + "tier_hint": 1, + "depends_on": [] + }} + ] +}} + +Rules: +- tier_hint: 1=simple/extraction, 2=moderate/writing/code, 3=complex/reasoning +- depends_on: array of subtask indices (0-based) that must complete first +- Tasks with empty depends_on can run in parallel +- Each subtask should be self-contained enough to execute independently +- Keep subtasks to 2-6 items (don't over-decompose simple requests) +- For simple single-step requests, return just one subtask + +User request: {prompt}""" + + +def plan_task(prompt: str) -> Dict[str, Any]: + """ + Use Haiku to decompose a prompt into subtasks. + + Args: + prompt: The user's request to decompose + + Returns: + A compound task plan with the following structure: + { + "id": "compound-{uuid}", + "prompt": original prompt, + "status": "planned", + "reasoning": explanation from Haiku, + "subtasks": [ + { + "id": "compound-{uuid}-sub-{i}", + "index": i, + "description": task description, + "tier_hint": 1|2|3, + "tier_assigned": None (will be set during execution), + "model": None (will be set during execution), + "depends_on": [indices of predecessor tasks], + "status": "pending", + "result": None, + "cost": None, + "started_at": None, + "completed_at": None + }, + ... + ], + "created_at": ISO8601 timestamp, + "planned_at": ISO8601 timestamp, + "completed_at": None, + "total_cost": 0.0 + } + """ + + task_id = f"compound-{uuid.uuid4().hex[:12]}" + now = datetime.now(timezone.utc).isoformat() + + planning_prompt = PLANNING_PROMPT_TEMPLATE.format(prompt=prompt) + + try: + # Invoke Claude CLI with Haiku for planning (cheapest option) + result = subprocess.run( + ['claude', '-p', '--model', 'haiku', '--output-format', 'json'], + input=planning_prompt, + capture_output=True, + text=True, + timeout=30 + ) + + output = json.loads(result.stdout) + response_text = output.get('result', output.get('content', '')) + + # Extract JSON from response (handle markdown code blocks) + if '```json' in response_text: + response_text = response_text.split('```json')[1].split('```')[0] + elif '```' in response_text: + response_text = response_text.split('```')[1].split('```')[0] + + plan = json.loads(response_text.strip()) + + # Build the compound task structure + subtasks = [] + for i, st in enumerate(plan.get('subtasks', [])): + subtasks.append({ + "id": f"{task_id}-sub-{i}", + "index": i, + "description": st['description'], + "tier_hint": st.get('tier_hint', 2), + "tier_assigned": None, + "model": None, + "depends_on": st.get('depends_on', []), + "status": "pending", + "result": None, + "cost": None, + "started_at": None, + "completed_at": None + }) + + return { + "id": task_id, + "prompt": prompt, + "status": "planned", + "reasoning": plan.get('reasoning', ''), + "subtasks": subtasks, + "created_at": now, + "planned_at": now, + "completed_at": None, + "total_cost": 0.0 + } + + except Exception as e: + # If planning fails, wrap the whole thing as a single task + # This ensures the system degrades gracefully + return { + "id": task_id, + "prompt": prompt, + "status": "planned", + "reasoning": f"Planning failed ({str(e)}), treating as single task", + "subtasks": [{ + "id": f"{task_id}-sub-0", + "index": 0, + "description": prompt, + "tier_hint": 2, + "tier_assigned": None, + "model": None, + "depends_on": [], + "status": "pending", + "result": None, + "cost": None, + "started_at": None, + "completed_at": None + }], + "created_at": now, + "planned_at": now, + "completed_at": None, + "total_cost": 0.0 + } diff --git a/symbiont/task_manager.py b/symbiont/task_manager.py new file mode 100644 index 0000000..3b6ee4a --- /dev/null +++ b/symbiont/task_manager.py @@ -0,0 +1,385 @@ +""" +Compound Task Manager +===================== + +Manages the complete lifecycle of compound tasks: +- Tracks task state through planning and execution phases +- Executes subtasks in parallel while respecting dependencies +- Updates progress in real-time (via polling) +- Logs all executions to the immutable ledger + +Architecture: +1. submit_compound_task() - plans task (sync, fast) then spawns background execution +2. Background thread executes subtasks in dependency-respecting waves +3. get_task_progress() - clients poll for current state +4. list_recent_tasks() - dashboard view of recent executions +""" + +import json +import threading +import subprocess +import uuid +from datetime import datetime, timezone +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Optional, Dict, List, Any +from pathlib import Path +import time + +from .planner import plan_task + + +# In-memory store for active/recent tasks +# Limited to _MAX_TASKS to prevent unbounded memory growth +_tasks = {} +_tasks_lock = threading.Lock() +_MAX_TASKS = 50 + +# Model-to-tier mapping +TIER_MODELS = {1: "haiku", 2: "sonnet", 3: "opus"} + +# Approximate costs per execution (USD) +# These are rough estimates - actual costs depend on token usage +TIER_COSTS = {1: 0.008, 2: 0.04, 3: 0.15} + +# Ledger file path (immutable execution log) +LEDGER_PATH = Path("/data/symbiont/ledger.jsonl") + + +def _log_ledger(entry: Dict[str, Any]) -> None: + """ + Append an entry to the immutable execution ledger. + + Args: + entry: Dictionary with execution details (timestamp, model, tokens, cost, etc.) + + The ledger is a jsonl file (one JSON object per line) used for: + - Cost tracking and billing + - Audit trails + - Performance analysis + """ + try: + with open(LEDGER_PATH, "a") as f: + f.write(json.dumps(entry) + "\n") + except Exception: + # Silently fail ledger writes to prevent task execution failures + pass + + +def _execute_subtask(subtask: Dict[str, Any]) -> Dict[str, Any]: + """ + Execute a single subtask via Claude CLI. + + Args: + subtask: Subtask dict with description and tier assignment + + Returns: + Updated subtask dict with result, status, cost, and timing info + + Process: + 1. Determine model from tier assignment + 2. Invoke Claude CLI with subprocess + 3. Parse JSON output and extract result + 4. Calculate cost and record in ledger + 5. Update subtask status and completion time + """ + + model = TIER_MODELS.get(subtask.get("tier_assigned") or subtask.get("tier_hint", 2), "sonnet") + _update_subtask(subtask, model=model, status="executing", + started_at=datetime.now(timezone.utc).isoformat()) + + try: + # Execute via Claude CLI with JSON output mode + result = subprocess.run( + ['claude', '-p', '--model', model, '--output-format', 'json'], + input=subtask["description"], + capture_output=True, + text=True, + timeout=120 + ) + + output = json.loads(result.stdout) + response_text = output.get('result', output.get('content', str(output))) + + # Extract token counts for cost calculation + tokens_in = output.get('input_tokens', 0) + tokens_out = output.get('output_tokens', 0) + cost = TIER_COSTS.get(subtask.get("tier_assigned", 2), 0.04) + + truncated = response_text[:2000] + if len(response_text) > 2000: + truncated += "\n[TRUNCATED...]" + _update_subtask(subtask, status="completed", result=truncated, + cost=cost, completed_at=datetime.now(timezone.utc).isoformat()) + + # Log successful execution to ledger + _log_ledger({ + "timestamp": subtask["completed_at"], + "model": model, + "success": True, + "input_tokens": tokens_in, + "output_tokens": tokens_out, + "estimated_cost_usd": cost, + "prompt_preview": subtask["description"][:100], + "compound_task_id": subtask.get("id", "unknown") + }) + + except subprocess.TimeoutExpired: + now = datetime.now(timezone.utc).isoformat() + _update_subtask(subtask, status="failed", result="Execution timed out (120s)", completed_at=now) + _log_ledger({ + "timestamp": now, "model": model, "success": False, + "error": "timeout", "compound_task_id": subtask.get("id", "unknown") + }) + + except Exception as e: + now = datetime.now(timezone.utc).isoformat() + _update_subtask(subtask, status="failed", result=f"Error: {str(e)}", completed_at=now) + _log_ledger({ + "timestamp": now, "model": model, "success": False, + "error": str(e), "compound_task_id": subtask.get("id", "unknown") + }) + + return subtask + + +def _validate_dependencies(subtasks: List[Dict[str, Any]]) -> None: + """Validate and clamp dependency indices to valid range.""" + valid_indices = set(s["index"] for s in subtasks) + for st in subtasks: + deps = st.get("depends_on", []) + # Remove self-references and out-of-range indices + st["depends_on"] = [d for d in deps if d in valid_indices and d != st["index"]] + + +def _update_subtask(subtask: Dict[str, Any], **updates) -> None: + """Thread-safe subtask field update under the global lock.""" + with _tasks_lock: + subtask.update(updates) + + +def _run_compound_task(task_id: str) -> None: + """ + Background thread function: execute subtasks respecting dependency order. + + Args: + task_id: ID of the compound task to execute + + Execution strategy: + 1. Validate dependency graph + 2. Execute subtasks in dependency-respecting waves + 3. A subtask is ready when all its dependencies have completed + 4. Ready subtasks are executed in parallel (up to 4 concurrent workers) + 5. Repeat until all subtasks are complete or stuck + 6. Calculate total cost and finalize task status + """ + + with _tasks_lock: + task = _tasks.get(task_id) + if not task: + return + task["status"] = "executing" + + subtasks = task["subtasks"] + completed_indices = set() + + # Phase 0: Validate dependency graph + _validate_dependencies(subtasks) + + # Phase 1: Routing - assign tier to each subtask + for st in subtasks: + _update_subtask(st, status="routing", tier_assigned=st.get("tier_hint", 2)) + + # Phase 2: Execution - run subtasks in waves respecting dependencies + max_stall_cycles = 60 # 30 seconds max stall (0.5s * 60) + stall_count = 0 + + with ThreadPoolExecutor(max_workers=4) as executor: + while len(completed_indices) < len(subtasks): + # Find subtasks that are ready to execute + ready = [] + for st in subtasks: + if st["index"] in completed_indices: + continue + if st["status"] in ("executing", "queued", "failed"): + continue + deps = set(st.get("depends_on", [])) + if deps.issubset(completed_indices): + ready.append(st) + + if not ready: + remaining = [s for s in subtasks if s["index"] not in completed_indices] + still_running = any(s["status"] in ("executing", "queued") for s in remaining) + if not still_running: + # Truly stuck — all remaining are blocked by failed deps + for s in remaining: + if s["status"] not in ("completed", "failed"): + _update_subtask(s, status="failed", result="Blocked by failed dependency") + completed_indices.add(s["index"]) + break + stall_count += 1 + if stall_count > max_stall_cycles: + break + time.sleep(0.5) + continue + + stall_count = 0 # Reset on progress + + # Launch ready subtasks in parallel + futures = {} + for st in ready: + _update_subtask(st, status="queued") + futures[executor.submit(_execute_subtask, st)] = st + + for future in as_completed(futures): + st = futures[future] + try: + future.result() + except Exception as e: + _update_subtask(st, status="failed", result=str(e)) + + if st["status"] in ("completed", "failed"): + completed_indices.add(st["index"]) + + # Phase 3: Finalization + total_cost = sum(s.get("cost", 0) or 0 for s in subtasks) + all_ok = all(s["status"] == "completed" for s in subtasks) + + with _tasks_lock: + task["status"] = "completed" if all_ok else "partial" + task["completed_at"] = datetime.now(timezone.utc).isoformat() + task["total_cost"] = total_cost + + +def submit_compound_task(prompt: str, auth_token: Optional[str] = None) -> Dict[str, Any]: + """ + Plan and begin executing a compound task. + + Args: + prompt: The user's request to decompose and execute + auth_token: (Optional) authentication token for the submission + + Returns: + Immediate response with task ID for polling: + { + "id": "compound-{uuid}", + "status": "planned", + "subtask_count": N + } + + Process: + 1. Use Haiku to plan the task (fast, synchronous) + 2. Store task in memory + 3. Spawn background thread for async execution + 4. Return immediately to client for polling + + The client can then poll /task/{task_id}/progress to monitor execution. + """ + + # Phase 1: Plan (synchronous - fast, uses Haiku) + task = plan_task(prompt) + task_id = task["id"] + + with _tasks_lock: + # Evict oldest task if we're at capacity + if len(_tasks) >= _MAX_TASKS: + oldest_key = min(_tasks, key=lambda k: _tasks[k].get("created_at", "")) + del _tasks[oldest_key] + _tasks[task_id] = task + + # Phase 2: Execute (async - in background thread) + # The task will progress from "planned" -> "executing" -> "completed"/"partial" + thread = threading.Thread(target=_run_compound_task, args=(task_id,), daemon=True) + thread.start() + + return { + "id": task_id, + "status": task["status"], + "subtask_count": len(task["subtasks"]) + } + + +def get_task_progress(task_id: str) -> Optional[Dict[str, Any]]: + """ + Get current state of a compound task (for polling/dashboard). + + Args: + task_id: The task ID from submit_compound_task() + + Returns: + Complete task snapshot including all subtask progress, or None if not found. + + Returned structure: + { + "id": task_id, + "prompt": original prompt, + "status": "planned"|"executing"|"completed"|"partial", + "reasoning": explanation from planner, + "subtasks": [ + { + "id": subtask ID, + "index": 0, + "description": task description, + "tier_hint": 1|2|3, + "tier_assigned": 1|2|3, + "model": "haiku"|"sonnet"|"opus"|None, + "depends_on": [indices], + "status": "pending"|"routing"|"queued"|"executing"|"completed"|"failed", + "result": str or None, + "cost": float or None, + "started_at": ISO8601 or None, + "completed_at": ISO8601 or None + }, + ... + ], + "created_at": ISO8601, + "planned_at": ISO8601, + "completed_at": ISO8601 or None, + "total_cost": float + } + """ + with _tasks_lock: + task = _tasks.get(task_id) + if not task: + return None + # Return a deep copy for thread safety + return json.loads(json.dumps(task, default=str)) + + +def list_recent_tasks(limit: int = 20) -> List[Dict[str, Any]]: + """ + List recent compound tasks (for dashboard view). + + Args: + limit: Maximum number of tasks to return + + Returns: + List of task summaries (most recent first): + [ + { + "id": task ID, + "prompt": truncated prompt, + "status": current status, + "subtask_count": total subtasks, + "completed_count": subtasks finished, + "total_cost": cumulative USD cost, + "created_at": ISO8601, + "completed_at": ISO8601 or None + }, + ... + ] + """ + with _tasks_lock: + tasks = sorted(_tasks.values(), key=lambda t: t.get("created_at", ""), reverse=True) + return [ + { + "id": t["id"], + "prompt": t["prompt"][:100], + "status": t["status"], + "subtask_count": len(t["subtasks"]), + "completed_count": sum(1 for s in t["subtasks"] if s["status"] == "completed"), + "total_cost": t.get("total_cost", 0), + "created_at": t.get("created_at"), + "completed_at": t.get("completed_at") + } + for t in tasks[:limit] + ]