From 8b32d54e791d1d07b05f7f9f45178d21ad08573d Mon Sep 17 00:00:00 2001 From: anthonyrawlins Date: Thu, 10 Jul 2025 12:46:52 +1000 Subject: [PATCH] Copy CCLI source to backend for Docker builds MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- backend/ccli_src/__init__.py | 1 + backend/ccli_src/agents/__init__.py | 1 + .../__pycache__/__init__.cpython-310.pyc | Bin 0 -> 148 bytes .../cli_agent_factory.cpython-310.pyc | Bin 0 -> 9129 bytes .../gemini_cli_agent.cpython-310.pyc | Bin 0 -> 9427 bytes backend/ccli_src/agents/cli_agent_factory.py | 344 ++++++++++++++++ backend/ccli_src/agents/gemini_cli_agent.py | 369 +++++++++++++++++ backend/ccli_src/executors/__init__.py | 1 + .../__pycache__/__init__.cpython-310.pyc | Bin 0 -> 151 bytes .../__pycache__/ssh_executor.cpython-310.pyc | Bin 0 -> 7153 bytes .../ccli_src/executors/simple_ssh_executor.py | 148 +++++++ backend/ccli_src/executors/ssh_executor.py | 221 ++++++++++ backend/ccli_src/src/__init__.py | 1 + backend/ccli_src/src/agents/__init__.py | 1 + .../__pycache__/__init__.cpython-310.pyc | Bin 0 -> 148 bytes .../cli_agent_factory.cpython-310.pyc | Bin 0 -> 9129 bytes .../gemini_cli_agent.cpython-310.pyc | Bin 0 -> 9427 bytes .../ccli_src/src/agents/cli_agent_factory.py | 344 ++++++++++++++++ .../ccli_src/src/agents/gemini_cli_agent.py | 369 +++++++++++++++++ backend/ccli_src/src/executors/__init__.py | 1 + .../__pycache__/__init__.cpython-310.pyc | Bin 0 -> 151 bytes .../__pycache__/ssh_executor.cpython-310.pyc | Bin 0 -> 7153 bytes .../src/executors/simple_ssh_executor.py | 148 +++++++ .../ccli_src/src/executors/ssh_executor.py | 221 ++++++++++ .../src/tests/test_gemini_cli_agent.py | 380 ++++++++++++++++++ .../ccli_src/tests/test_gemini_cli_agent.py | 380 ++++++++++++++++++ 26 files changed, 2930 insertions(+) create mode 100644 backend/ccli_src/__init__.py create mode 100644 backend/ccli_src/agents/__init__.py create mode 100644 backend/ccli_src/agents/__pycache__/__init__.cpython-310.pyc create mode 100644 backend/ccli_src/agents/__pycache__/cli_agent_factory.cpython-310.pyc create mode 100644 backend/ccli_src/agents/__pycache__/gemini_cli_agent.cpython-310.pyc create mode 100644 backend/ccli_src/agents/cli_agent_factory.py create mode 100644 backend/ccli_src/agents/gemini_cli_agent.py create mode 100644 backend/ccli_src/executors/__init__.py create mode 100644 backend/ccli_src/executors/__pycache__/__init__.cpython-310.pyc create mode 100644 backend/ccli_src/executors/__pycache__/ssh_executor.cpython-310.pyc create mode 100644 backend/ccli_src/executors/simple_ssh_executor.py create mode 100644 backend/ccli_src/executors/ssh_executor.py create mode 100644 backend/ccli_src/src/__init__.py create mode 100644 backend/ccli_src/src/agents/__init__.py create mode 100644 backend/ccli_src/src/agents/__pycache__/__init__.cpython-310.pyc create mode 100644 backend/ccli_src/src/agents/__pycache__/cli_agent_factory.cpython-310.pyc create mode 100644 backend/ccli_src/src/agents/__pycache__/gemini_cli_agent.cpython-310.pyc create mode 100644 backend/ccli_src/src/agents/cli_agent_factory.py create mode 100644 backend/ccli_src/src/agents/gemini_cli_agent.py create mode 100644 backend/ccli_src/src/executors/__init__.py create mode 100644 backend/ccli_src/src/executors/__pycache__/__init__.cpython-310.pyc create mode 100644 backend/ccli_src/src/executors/__pycache__/ssh_executor.cpython-310.pyc create mode 100644 backend/ccli_src/src/executors/simple_ssh_executor.py create mode 100644 backend/ccli_src/src/executors/ssh_executor.py create mode 100644 backend/ccli_src/src/tests/test_gemini_cli_agent.py create mode 100644 backend/ccli_src/tests/test_gemini_cli_agent.py diff --git a/backend/ccli_src/__init__.py b/backend/ccli_src/__init__.py new file mode 100644 index 00000000..a78f9637 --- /dev/null +++ b/backend/ccli_src/__init__.py @@ -0,0 +1 @@ +# CCLI Source Package \ No newline at end of file diff --git a/backend/ccli_src/agents/__init__.py b/backend/ccli_src/agents/__init__.py new file mode 100644 index 00000000..b0d360dd --- /dev/null +++ b/backend/ccli_src/agents/__init__.py @@ -0,0 +1 @@ +# CLI Agents Package \ No newline at end of file diff --git a/backend/ccli_src/agents/__pycache__/__init__.cpython-310.pyc b/backend/ccli_src/agents/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d1a1a4d3765732a7ef587d7604e2373cf06485eb GIT binary patch literal 148 zcmd1j<>g`kfggDpB0%afn1e(x0@)5gT+9L_QW%06G#UL?G8BP?5yUTZ{fzwFRQ;0t zyh?pXPyK?T{H)aEl4AXg%(7Jds4}u^_QIx2arP0e0v=S+4wO+4G?~)-YEfX2qh?Hd`ULzxPlLQ6K zP(1@_iGaInQ-|c>L(Z0}$SPmDP3B&)?Sre{?jb;ZyjNxB2@Un)U~}>HlPKGmk6yR}iLEG^R77p;vVLnhm3AR?MbV zv6`t$s%ckjo!YHNx|ykDWI5HyHglDnEZdEdX1?ViTcEb~%h8J^iQD+SzV zSq}F(c|XQ;s2O2-)a3V!J-t%I{U|HozOZLh#+l(xurWTqZm=R7e`!^Yu_-peC)qKc z;wGQu_PQ~xO>5|rU$@yLJN`0NInIn#t#qROv)tnS6=z|Cd!cjJt%bhW$t?=*hCFaw zk2y`(b2m^%uj^0TfHMcTUKltp>fxrdEjZ)rbr01w-&?P5w1gYheJ_~Fy#bfBa>O&wHoEB)uzu{4JzlW)n_fY5x1nPRp!@FZQon|$x3qhLd;3Ehx$MQLbIh;HRlR^+iH zF-nBCBE~?X+`Z-U^23Gu)rFNP`*3+-^+9>1d@mYZe6X}!T_`Wy|M}|5YBaj^VDZsU zmdk4kYby`RQF?88bq(EmELNonn)Vn7T>(9WSe1OiGO9nsG82{axPmfB!m>xO+lt98 zo?^zF#%=tX_)ViGLw1CF3-?+4rtnMs>^WF)CC5il&Y+wHb92!|qHs%OA$8LG_Jv;A zS@(tG_Gw?;3qsec@hGE|pw6P4Dv@4Bwlc7&Fesg9gz=yz>e8;(qBQs1Ck@WZrTmdx zH-k__V^v7a^Oc~2YE>$fFu(+nB0|g(q(+hT#P=IVGW!ggPvZ)TAUVx4GSJK-E~0{3 z53!pRU>;Xs;ZX^@6||Pa)&JY%I4{<49k)0`;E;lHY)oR*w|2?EZknw zXS{Eo33jYIPmbJPJkQj?JSVz+M%YQ!hfTk-DtUH_z4ub9jPe3I&Bu6gT7!fA9y{~W zf(srWs6ERZSv!H+bG(TD$GC<57CX-_$d*a=scgA~mgBt0r(&0UnxBA6{yw`5mwZz8 zxYBE9B{|oL^$fep-iKqK?#_Nf&VG$uNA0P&_B}jrhRvepG*&=QIYCdss&BBHsQq4d zbl(av<`(+^t!Lsf-L`n_ZT2Br&%UGe2kaxXI_!t+WAN%6FJ$kqdG;f$dk&Xce{aSN z`!W0EWeWT|&o0797NW7lNaEPj{->OSPkbkBq(;#Qj-(fk=%(wn135sF&xQZI4s&>S zSdtSy_52rJ?D7+%lfh0-5~#osIBW+VzFvzvs#W-&ihH$I5cP zLipj0sL*u3tilu2T0+3pMq@RhUf$1DdMc)W1G{Af*b zBKGUf!ip2R!P8)^WQ(&{gK&tPBXXX|1tR1S-j3CU@4fJSmTWJRRKy$nXy!)y8o}@$PJ#o~9-NGUP@M%YXisD!1+#r+ct`IV^zNMzLL}h+l6D8t9B4~wf0(_0 ze{mc=a*qSEldy^9tGI$wARCzZi|j9qU47T+7+ab!I{H5v&j7dOo=dv4B=E_Yxgo7B z%1I0@5yCKAa&yE?82R=mU(*8!N!gkWXk+}@(CzQqA0Ne%c=0_Z&dfFZn%f9IK_Qv1 zWQe<%ElSlx-VDTNC=WmVIPv$;WF1-v!BN}H&=M~Fua+RUS|dRwA1WuN@<-a3l1N8C zQ2KU>m6;sfOT1& z)PcF1_R<)g>)5~5ULym=*fi~|qtDSU^!+h5B4^HbQrO?<>y+FdB^d?xOE5x4R*H@) zwRDccz1=?D!h+Zc?sV^HxDrR5-GSiA3mwu93IQrMuPhCW9FW=_C&@#T-b$Kr_UeSb z<0J42fC&f6;`n0DpgEL)X7OOMt9KQ3~oK(LlgYeZM68Z5Wc`h z;cF3r!fKs4^}uO)F?>O8?)mZZV`T(8BmiRFC$;Sb8tb&L!C@olvK@zi2?r2db5sZ* zC?#vCkSw%)iJrvjflMWwC#}$V!l{D2qXQI^Sj?+G-^rkOaN2TFhd8?O7MKO=T;!}_){lXQL zn@Z0n{j5YWkyZ1NOB0rfQqPIGk=@$HkpmZhO?9~*!&NRsNli3aRr{@O$O8f8JR+;^ zt^1{n_zQGVj)pwHxKATSlp0kTPCOKM9I8&qy$n0od_22s{0H<4Wk}0xOCVdd($V;g{5qBA>K=C+rl7vEwAZ+^2 zxFp^)EL*opgz`)Iu>Ggp$l>U6lybc@Jc`dTJs{W_3W=`QGg+xTUBgBEn@2~v36y&( zI=Em7HM*W-@*!`+!_rm;chEkjS_UkA?Jj`#Jf0Mp+y1t+bn!R1h0J^XVTj0P_gf6QAU+X%@BD14QqxTmXp`ULV>N&NP)=$k@aJ zks*PMOIG(|{r^NSDVUtmK6j*G`g;!uh=fDBESLIg6uJUBM~jy50+kSH9Q6`CBNlv>?lQ_vtDyKb%+j6O(q(e8u&odj`6C2b(jw3?q z9#(tL>kBgu^?6L>0tobVNHZxu3`P84OAzpQb2EG#aeHI8RdA?_r2K(JOZoSbK*^I-)o_D@Ix7$?qCl1qSO z0xCL1oHuM8rT@x&V|-_&+s?2Ih9?;i0x2Yc5h;YfN8wj`|G@(k@U}{&+Y@j|W+DWc zJz4uWqo=$>#g|wl_RbtcVnQSnP+@C7%^(b9C2&#bX;EhKSd(HgzcQs*@GHeanG9SW zy`h7EQteZV4UP!a+CD-cZ5;2T5kQriNq2!rPfE=0)To8ddIzP$>L6myuSMlR@2Q!KI4sTE-1TF zC^bpGB>M&-V0TQna5x#HQ_?%mnk(Rx9lDVxfUB#Y(NrGZv*E<9JDhT^{sYxJP8mmq zGh2ZZpRhVt_{_%4HRpVCPJ6yIFt~ftD5KY%_?78k?ZAMmSDzvqK?dtAFeVRo+{WN0 z9bcN~|0T6t?0?IO6isf@6+7zilTOOx!@?`Q)eEvO1)?Q>me83{^8h+NPn{ehGTIK} zWE@)kj0V{1e5Hq}X;fsZjUI)Kqf>E066fDi)N#6cjWm^xNR4p=4y5-bzzlG9^^mTr z{T(rtn4{I{QiDquj(YPYUrF_nJN1rh8lwccGw~jg(?n#5T0{xi9KDZj@^I5#dccvh_!!_&>CP$^V~m_P=Eo4Qo=DSNf!W(K`RbsqqPG zDtpE{bN0+wrf7@+mVAd5e3vtdf0ip2OIR)Kc;G`^BF+Z1vk+dkrHOQ??`baX;4X~z z)q5~y$_nFCm%cX_>j;SWb3!-QDdUd<#cJFyd1I~~egq$*V}#s>dY4H7$+Kll0$SoL z6jeN2MQH$2llV#8eaqj|2pIu0!51D-zN@zvV4lck>m09fo%NO{t+UVHb<>?zK;fZ( zqw)5TDacij2E`#?&|;!=+@ViNq<&DxN3;)Goeg-Xluz2jlzh`JeoW(SoTSjfh}2Zt zjnZDy*~sGf6NVs0;B9j9KMWoSI?IesC}1`dXVtajPYO!YWzJ0LkTR$$zkW-)Vpe-QLT-_J_E-;czoNBN5H-x~$J{E6fHB5I@Bxp8&<%C+_PGUw(;>+6&$ zUjFcd8`rPix_TwE-@SD0I_}P`Yad+w@S|IwYw{%4#>tmIUfYiP;aU<6cGfOkTN}!# z8@7{pZM$YTnRk>W9mTM?8m|HbbAs}Z93U!xz9R-W@HFY1!fbhgvodG5z zjPc~u^ohk!P`iOX*pWi|`lV44oyTE^*n-fb_r+dk(;GyIhlP=Bi}m!(3}#jb_xo%4 zKE!J@<+Zf9cBeDod|KwLnts!iXE8L*s*hKYlr+n5byu(Ce|5dCR}60Rf*?$7 z#3nkvTo9md?9j;pZ2`b$%bk{$P!z*bE)*uf?-1O>>ZWmAVR@MWU8D+d;hD>)grAzW82dhl4f{ zn-#aihoZBE9cdc!cc`@kiA}IAGbhf6O(^&QuGmFVGL{Tax4ue`&+O^{2a56>ddu^a zc$6^8c&fio#Z^lFfD+n0NxI?N;kSmTbj5Wf3VsYa{TKm{o_ss-p5p1tffw2UmD0eZ zb(KaXz*($$RoO<_f%YwwlrHtf%V4?iWYSS*PX%(zp zR_(ww?|Np-Fd50gAx%qICeSmMk0Fx)myh*bosir$#vpbVv@vN>BRvN*0+pDMED>8F zZX$Qsocv_t!*wqXWhdx$Qr1E8M;HvMkp|~{R=6MZM&SgBm<;ihIUqt3BcH~;j&3pG z$2BT2y=g;1k-DtrO`46DslyozlB~!dQ^G*JNI4SMtgzXOf<$4I7CsNk_o(h=I`M~i za1qxbov2_*=`sC;A*oFp*P$Ox;36*O%@b%c`|vI)(D{JX)^(^T4k}A*I1$Y?VLyU(pLtDZ0Lkg~lu_i- zA8dn>dfVVAJtFF$&K}+}05=4h<8xj(xZjb{pil3NAudByk)(MtY)1h9&N&Z#sZUOP z4k+;|zaT3qn$h!vPG+MW@$yElSJ6Mp{X#do|&xk)9d*H2Q*Y`L9R- zds;wEU1(%R*HZ^HiH-@Pdj>NiO2^wXA#jRGz&F&MKSV-q%rJ_2ehh~wTNn~^1jU)~ zKY=Fcgyw-?uSznc@)GqdWaR{erRS5qi?ez>YPZ8U-W*M8D$0GxpD(G#Q{?bFvXuab z5^RP3;riyH{3Ci1sS#d>r0tbNw-gI{1G%h3#8e%TEK)^u^t*;I#nfpm9_-6Z=vGpPgh!rTyUk)nlvie~Yf zNNnBKYr0`7zr+2i=GZ3HEt!U8=<2WXoAjuHJ18SdAI!8G2M(DcL-Gm+Ym=Uz{K)#! zPW&2I{7WQ1*GcPl^~a?3jjnmOAWt&Z3W?oyL|zLV8pN!-z}pj8v81?H8f$yyvA&PL zU4szYCB(v>ruDIL4VLnKZPy%|w|Vrv3Oy6nzM%ta)ONP!0rB*IXAvUxah1K>(978y zyq08pR^zNNwyD^LGB;(;1i=g9SZ&kX-Y3hJiw^aAN6My3X6mHoiFuf(^5o_OsTv`388qs zSj2oEjap}H z?i;%n+PQm0q@^(c2xyvsWVh_SeLBXnC{k<8%9t6W%wyDfoYGTEl&9lV zX2-d$4Sr~8$;!^Z=-Mu#X6bhi&6w7HR8;e|*T(K-?FVm9j*VT2bv}Es@WjsNJNKTC zu_L?gr|tmsnC=hs2PC3PqCPtxu34m-v8`6BYRH+Pw#VTya;^c=sCAF;oftcG7RgB} zv3fam*OLFm&rU-X@Hz1bPF8Vb>~zm`Uzx0AOp&Oiw0VspZ3u;Vgwv4Bvx;SaGZRD) z1TU>^B!Qep;k>k>B$GoI-a6YKM9^5%i(KHzE#kNZF>L{RKk=|w@R&iq131nT#tqax zCwrEzP{>fhnFA~w_IlexIIom*Zn$IeE}HTi5DJq$6b`-SA}W}k&cEgs9;5}*G+tWt zlug9g_PlPGo8nCx?;}e7m8o>E^F_6- zPQ$|_4-rcT6*ex~>6n(th8$*>bP>$5zGnzSuyz#I*n;*15$r;zAZL$3JD=IR1=6P= zkI=0MW!PgI(&(n=-XTD+MV^3)gXTFpW`p5~B6@K&k_eH1y2e2@1lCsLZST!DJ>%`jba`|zxYf@uyG2oYkP3Bi9&4=T{5m{KRpVIWzqTU>+x%x zGxHo;la)fPxo5(bL4)2a3mqXQw4kwYLWT6?WfD1WG~`3VWcYm4gfhL9*4Cp3*n46@ z6x&QoA6&lSZQQzY_2$itm^oOh>15850o?(CU=b))wrn`5EEtp+*w=jTH zwF_ZWVZh)-89z6v&b%Rg`LM-^$$u;*LR&yVKLR6Y8_3E&C1hoqRpn=$L0*OZRd@JHaQ^&Bp*k@IDptij zmMJU^3Dc|`L7!svJZx6aolqE8pX^qwwE1;3PnX}HUDVV(77k`2{}S!wUr}-qN%NSp zTnh;TqvZbO-oLV76j_uVO4y`K&6tWDEy2N?>5@-|q*ff=Q{H7CcE+@4^JR`Z2?W`Lbb<-2bjM8fc}S+j`?SFiT&*K;36i!7SmB zmLm;EnvP|XR{3*CAF!<5$7dS4kM-lk<&8akAD=UDN3a|YC2Ue+A-JI-xS>BmTt6Xh z4gLy^SSClZTO8|P=;y|!q3P&x?^XVGYm!PvSQ2Q?Sq+C$Iq0(4X){BF2hNdbK2s{;@L-=noDB@V$=<_^^aj@>WC6`q@UiCUHmozh$3&l>$ zCB~DUm{aaET@ERBbjNqs+*DG$iI)TVf(RH$3NF1%Ze4-CsZ4;v_aaoH{*)XCgTAW3 z&t`8Mu_;o6v=W91A~~IQ4C+8=P(dP+8zrSoMQH=FS{~)`;c{;!+dvJsXC^|YsbWkX zzmPV(fCTX3yM*2?ippeaO;1mo^j@vd|@~r0+TWp~~p!e@zw= z-ZFXArdFbds z)0;@S%Ri%pZ42udHZYr1caM^o64ILLqZsLN_08h%kjtz)2(Ze3q)Pg-#dDpRKd*#> zFY+3a4O}t#UGzT-OHRXUyt1<1SX`;CywzA List[str]: + """Get list of all predefined agent IDs""" + return list(cls.PREDEFINED_AGENTS.keys()) + + @classmethod + def get_enabled_agent_ids(cls) -> List[str]: + """Get list of enabled predefined agent IDs""" + return [ + agent_id for agent_id, definition in cls.PREDEFINED_AGENTS.items() + if definition.enabled + ] + + @classmethod + def get_agent_definition(cls, agent_id: str) -> Optional[CliAgentDefinition]: + """Get predefined agent definition by ID""" + return cls.PREDEFINED_AGENTS.get(agent_id) + + def create_agent(self, agent_id: str, custom_config: Optional[Dict[str, Any]] = None) -> GeminiCliAgent: + """ + Create a CLI agent instance + + Args: + agent_id: ID of predefined agent or custom ID + custom_config: Optional custom configuration to override defaults + + Returns: + GeminiCliAgent instance + + Raises: + ValueError: If agent_id is unknown and no custom_config provided + """ + + # Check if agent already exists + if agent_id in self.active_agents: + self.logger.warning(f"Agent {agent_id} already exists, returning existing instance") + return self.active_agents[agent_id] + + # Get configuration + if agent_id in self.PREDEFINED_AGENTS: + definition = self.PREDEFINED_AGENTS[agent_id] + + if not definition.enabled: + self.logger.warning(f"Agent {agent_id} is disabled but being created anyway") + + config_dict = definition.config.copy() + specialization = definition.specialization.value + + # Apply custom overrides + if custom_config: + config_dict.update(custom_config) + + elif custom_config: + # Custom agent configuration + config_dict = custom_config + specialization = custom_config.get("specialization", "general_ai") + + else: + raise ValueError(f"Unknown agent ID '{agent_id}' and no custom configuration provided") + + # Determine agent type and create appropriate agent + agent_type = config_dict.get("agent_type", "gemini") + + if agent_type == "gemini" or agent_type == CliAgentType.GEMINI: + agent = self._create_gemini_agent(agent_id, config_dict, specialization) + else: + raise ValueError(f"Unsupported agent type: {agent_type}") + + # Store in active agents + self.active_agents[agent_id] = agent + + self.logger.info(f"Created CLI agent: {agent_id} ({specialization})") + return agent + + def _create_gemini_agent(self, agent_id: str, config_dict: Dict[str, Any], specialization: str) -> GeminiCliAgent: + """Create a Gemini CLI agent with the given configuration""" + + # Create GeminiCliConfig from dictionary + config = GeminiCliConfig( + host=config_dict["host"], + node_version=config_dict["node_version"], + model=config_dict.get("model", "gemini-2.5-pro"), + max_concurrent=config_dict.get("max_concurrent", 2), + command_timeout=config_dict.get("command_timeout", 60), + ssh_timeout=config_dict.get("ssh_timeout", 5), + node_path=config_dict.get("node_path"), + gemini_path=config_dict.get("gemini_path") + ) + + return GeminiCliAgent(config, specialization) + + def get_agent(self, agent_id: str) -> Optional[GeminiCliAgent]: + """Get an existing agent instance""" + return self.active_agents.get(agent_id) + + def remove_agent(self, agent_id: str) -> bool: + """Remove an agent instance""" + if agent_id in self.active_agents: + agent = self.active_agents.pop(agent_id) + # Note: Cleanup should be called by the caller if needed + self.logger.info(f"Removed CLI agent: {agent_id}") + return True + return False + + def get_active_agents(self) -> Dict[str, GeminiCliAgent]: + """Get all active agent instances""" + return self.active_agents.copy() + + def get_agent_info(self, agent_id: str) -> Optional[Dict[str, Any]]: + """Get information about an agent""" + + # Check active agents + if agent_id in self.active_agents: + agent = self.active_agents[agent_id] + return { + "agent_id": agent_id, + "status": "active", + "host": agent.config.host, + "model": agent.config.model, + "specialization": agent.specialization, + "active_tasks": len(agent.active_tasks), + "max_concurrent": agent.config.max_concurrent, + "statistics": agent.get_statistics() + } + + # Check predefined but not active + if agent_id in self.PREDEFINED_AGENTS: + definition = self.PREDEFINED_AGENTS[agent_id] + return { + "agent_id": agent_id, + "status": "available" if definition.enabled else "disabled", + "agent_type": definition.agent_type.value, + "specialization": definition.specialization.value, + "description": definition.description, + "config": definition.config + } + + return None + + def list_all_agents(self) -> Dict[str, Dict[str, Any]]: + """List all agents (predefined and active)""" + all_agents = {} + + # Add predefined agents + for agent_id in self.PREDEFINED_AGENTS: + all_agents[agent_id] = self.get_agent_info(agent_id) + + # Add any custom active agents not in predefined list + for agent_id in self.active_agents: + if agent_id not in all_agents: + all_agents[agent_id] = self.get_agent_info(agent_id) + + return all_agents + + async def health_check_all(self) -> Dict[str, Dict[str, Any]]: + """Perform health checks on all active agents""" + health_results = {} + + for agent_id, agent in self.active_agents.items(): + try: + health_results[agent_id] = await agent.health_check() + except Exception as e: + health_results[agent_id] = { + "agent_id": agent_id, + "error": str(e), + "healthy": False + } + + return health_results + + async def cleanup_all(self): + """Clean up all active agents""" + for agent_id, agent in list(self.active_agents.items()): + try: + await agent.cleanup() + self.logger.info(f"Cleaned up agent: {agent_id}") + except Exception as e: + self.logger.error(f"Error cleaning up agent {agent_id}: {e}") + + self.active_agents.clear() + + @classmethod + def create_custom_agent_config(cls, host: str, node_version: str, + specialization: str = "general_ai", + **kwargs) -> Dict[str, Any]: + """ + Helper to create custom agent configuration + + Args: + host: Target host for SSH connection + node_version: Node.js version (e.g., "v22.14.0") + specialization: Agent specialization + **kwargs: Additional configuration options + + Returns: + Configuration dictionary for create_agent() + """ + config = { + "host": host, + "node_version": node_version, + "specialization": specialization, + "agent_type": "gemini", + "model": "gemini-2.5-pro", + "max_concurrent": 2, + "command_timeout": 60, + "ssh_timeout": 5 + } + + config.update(kwargs) + return config + + +# Module-level convenience functions +_default_factory = None + +def get_default_factory() -> CliAgentFactory: + """Get the default CLI agent factory instance""" + global _default_factory + if _default_factory is None: + _default_factory = CliAgentFactory() + return _default_factory + +def create_agent(agent_id: str, custom_config: Optional[Dict[str, Any]] = None) -> GeminiCliAgent: + """Convenience function to create an agent using the default factory""" + factory = get_default_factory() + return factory.create_agent(agent_id, custom_config) \ No newline at end of file diff --git a/backend/ccli_src/agents/gemini_cli_agent.py b/backend/ccli_src/agents/gemini_cli_agent.py new file mode 100644 index 00000000..804e258a --- /dev/null +++ b/backend/ccli_src/agents/gemini_cli_agent.py @@ -0,0 +1,369 @@ +""" +Gemini CLI Agent Adapter +Provides a standardized interface for executing tasks on Gemini CLI via SSH. +""" + +import asyncio +import json +import time +import logging +import hashlib +from dataclasses import dataclass, asdict +from typing import Dict, Any, Optional, List +from enum import Enum + +from executors.ssh_executor import SSHExecutor, SSHConfig, SSHResult + + +class TaskStatus(Enum): + """Task execution status""" + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + TIMEOUT = "timeout" + + +@dataclass +class GeminiCliConfig: + """Configuration for Gemini CLI agent""" + host: str + node_version: str + model: str = "gemini-2.5-pro" + max_concurrent: int = 2 + command_timeout: int = 60 + ssh_timeout: int = 5 + node_path: Optional[str] = None + gemini_path: Optional[str] = None + + def __post_init__(self): + """Auto-generate paths if not provided""" + if self.node_path is None: + self.node_path = f"/home/tony/.nvm/versions/node/{self.node_version}/bin/node" + if self.gemini_path is None: + self.gemini_path = f"/home/tony/.nvm/versions/node/{self.node_version}/bin/gemini" + + +@dataclass +class TaskRequest: + """Represents a task to be executed""" + prompt: str + model: Optional[str] = None + task_id: Optional[str] = None + priority: int = 3 + metadata: Optional[Dict[str, Any]] = None + + def __post_init__(self): + """Generate task ID if not provided""" + if self.task_id is None: + # Generate a unique task ID based on prompt and timestamp + content = f"{self.prompt}_{time.time()}" + self.task_id = hashlib.md5(content.encode()).hexdigest()[:12] + + +@dataclass +class TaskResult: + """Result of a task execution""" + task_id: str + status: TaskStatus + response: Optional[str] = None + error: Optional[str] = None + execution_time: float = 0.0 + model: Optional[str] = None + agent_id: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization""" + result = asdict(self) + result['status'] = self.status.value + return result + + +class GeminiCliAgent: + """ + Adapter for Google Gemini CLI execution via SSH + + Provides a consistent interface for executing AI tasks on remote Gemini CLI installations + while handling SSH connections, environment setup, error recovery, and concurrent execution. + """ + + def __init__(self, config: GeminiCliConfig, specialization: str = "general_ai"): + self.config = config + self.specialization = specialization + self.agent_id = f"{config.host}-gemini" + + # SSH configuration + self.ssh_config = SSHConfig( + host=config.host, + connect_timeout=config.ssh_timeout, + command_timeout=config.command_timeout + ) + + # SSH executor with connection pooling + self.ssh_executor = SSHExecutor(pool_size=3, persist_timeout=120) + + # Task management + self.active_tasks: Dict[str, asyncio.Task] = {} + self.task_history: List[TaskResult] = [] + self.max_history = 100 + + # Logging + self.logger = logging.getLogger(f"gemini_cli.{config.host}") + + # Performance tracking + self.stats = { + "total_tasks": 0, + "successful_tasks": 0, + "failed_tasks": 0, + "total_execution_time": 0.0, + "average_execution_time": 0.0 + } + + async def execute_task(self, request: TaskRequest) -> TaskResult: + """ + Execute a task on the Gemini CLI + + Args: + request: TaskRequest containing prompt and configuration + + Returns: + TaskResult with execution status and response + """ + + # Check concurrent task limit + if len(self.active_tasks) >= self.config.max_concurrent: + return TaskResult( + task_id=request.task_id, + status=TaskStatus.FAILED, + error=f"Agent at maximum concurrent tasks ({self.config.max_concurrent})", + agent_id=self.agent_id + ) + + # Start task execution + task = asyncio.create_task(self._execute_task_impl(request)) + self.active_tasks[request.task_id] = task + + try: + result = await task + return result + finally: + # Clean up task from active list + self.active_tasks.pop(request.task_id, None) + + async def _execute_task_impl(self, request: TaskRequest) -> TaskResult: + """Internal implementation of task execution""" + start_time = time.time() + model = request.model or self.config.model + + try: + self.logger.info(f"Starting task {request.task_id} with model {model}") + + # Build the CLI command + command = self._build_cli_command(request.prompt, model) + + # Execute via SSH + ssh_result = await self.ssh_executor.execute(self.ssh_config, command) + + execution_time = time.time() - start_time + + # Process result + if ssh_result.returncode == 0: + result = TaskResult( + task_id=request.task_id, + status=TaskStatus.COMPLETED, + response=self._clean_response(ssh_result.stdout), + execution_time=execution_time, + model=model, + agent_id=self.agent_id, + metadata={ + "ssh_duration": ssh_result.duration, + "command": command, + "stderr": ssh_result.stderr + } + ) + self.stats["successful_tasks"] += 1 + else: + result = TaskResult( + task_id=request.task_id, + status=TaskStatus.FAILED, + error=f"CLI execution failed: {ssh_result.stderr}", + execution_time=execution_time, + model=model, + agent_id=self.agent_id, + metadata={ + "returncode": ssh_result.returncode, + "command": command, + "stdout": ssh_result.stdout, + "stderr": ssh_result.stderr + } + ) + self.stats["failed_tasks"] += 1 + + except Exception as e: + execution_time = time.time() - start_time + self.logger.error(f"Task {request.task_id} failed: {e}") + + result = TaskResult( + task_id=request.task_id, + status=TaskStatus.FAILED, + error=str(e), + execution_time=execution_time, + model=model, + agent_id=self.agent_id + ) + self.stats["failed_tasks"] += 1 + + # Update statistics + self.stats["total_tasks"] += 1 + self.stats["total_execution_time"] += execution_time + self.stats["average_execution_time"] = ( + self.stats["total_execution_time"] / self.stats["total_tasks"] + ) + + # Add to history (with size limit) + self.task_history.append(result) + if len(self.task_history) > self.max_history: + self.task_history.pop(0) + + self.logger.info(f"Task {request.task_id} completed with status {result.status.value}") + return result + + def _build_cli_command(self, prompt: str, model: str) -> str: + """Build the complete CLI command for execution""" + + # Environment setup + env_setup = f"source ~/.nvm/nvm.sh && nvm use {self.config.node_version}" + + # Escape the prompt for shell safety + escaped_prompt = prompt.replace("'", "'\\''") + + # Build gemini command + gemini_cmd = f"echo '{escaped_prompt}' | {self.config.gemini_path} --model {model}" + + # Complete command + full_command = f"{env_setup} && {gemini_cmd}" + + return full_command + + def _clean_response(self, raw_output: str) -> str: + """Clean up the raw CLI output""" + lines = raw_output.strip().split('\n') + + # Remove NVM output lines + cleaned_lines = [] + for line in lines: + if not (line.startswith('Now using node') or + line.startswith('MCP STDERR') or + line.strip() == ''): + cleaned_lines.append(line) + + return '\n'.join(cleaned_lines).strip() + + async def health_check(self) -> Dict[str, Any]: + """Perform a health check on the agent""" + try: + # Test SSH connection + ssh_healthy = await self.ssh_executor.test_connection(self.ssh_config) + + # Test Gemini CLI with a simple prompt + if ssh_healthy: + test_request = TaskRequest( + prompt="Say 'health check ok'", + task_id="health_check" + ) + result = await self.execute_task(test_request) + cli_healthy = result.status == TaskStatus.COMPLETED + response_time = result.execution_time + else: + cli_healthy = False + response_time = None + + # Get connection stats + connection_stats = await self.ssh_executor.get_connection_stats() + + return { + "agent_id": self.agent_id, + "host": self.config.host, + "ssh_healthy": ssh_healthy, + "cli_healthy": cli_healthy, + "response_time": response_time, + "active_tasks": len(self.active_tasks), + "max_concurrent": self.config.max_concurrent, + "total_tasks": self.stats["total_tasks"], + "success_rate": ( + self.stats["successful_tasks"] / max(self.stats["total_tasks"], 1) + ), + "average_execution_time": self.stats["average_execution_time"], + "connection_stats": connection_stats, + "model": self.config.model, + "specialization": self.specialization + } + + except Exception as e: + self.logger.error(f"Health check failed: {e}") + return { + "agent_id": self.agent_id, + "host": self.config.host, + "ssh_healthy": False, + "cli_healthy": False, + "error": str(e) + } + + async def get_task_status(self, task_id: str) -> Optional[TaskResult]: + """Get the status of a specific task""" + # Check active tasks + if task_id in self.active_tasks: + task = self.active_tasks[task_id] + if task.done(): + return task.result() + else: + return TaskResult( + task_id=task_id, + status=TaskStatus.RUNNING, + agent_id=self.agent_id + ) + + # Check history + for result in reversed(self.task_history): + if result.task_id == task_id: + return result + + return None + + async def cancel_task(self, task_id: str) -> bool: + """Cancel a running task""" + if task_id in self.active_tasks: + task = self.active_tasks[task_id] + if not task.done(): + task.cancel() + return True + return False + + def get_statistics(self) -> Dict[str, Any]: + """Get agent performance statistics""" + return { + "agent_id": self.agent_id, + "host": self.config.host, + "specialization": self.specialization, + "model": self.config.model, + "stats": self.stats.copy(), + "active_tasks": len(self.active_tasks), + "history_length": len(self.task_history) + } + + async def cleanup(self): + """Clean up resources""" + # Cancel any active tasks + for task_id, task in list(self.active_tasks.items()): + if not task.done(): + task.cancel() + + # Wait for tasks to complete + if self.active_tasks: + await asyncio.gather(*self.active_tasks.values(), return_exceptions=True) + + # Close SSH connections + await self.ssh_executor.cleanup() + + self.logger.info(f"Agent {self.agent_id} cleaned up successfully") \ No newline at end of file diff --git a/backend/ccli_src/executors/__init__.py b/backend/ccli_src/executors/__init__.py new file mode 100644 index 00000000..be7eca53 --- /dev/null +++ b/backend/ccli_src/executors/__init__.py @@ -0,0 +1 @@ +# Executors Package \ No newline at end of file diff --git a/backend/ccli_src/executors/__pycache__/__init__.cpython-310.pyc b/backend/ccli_src/executors/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f10a8402bccd7ba2c71bdedd7152790f95383338 GIT binary patch literal 151 zcmd1j<>g`k0+t_n8NxvNF^Gc<7=auIATDMB5-AM944RC7D;bJF!U*D*m3~HkZmNDs zeqN=%qo;mBQGQlxa!Ij%MrK*6esXe7rhaizvVLksYI12weo?W0e0*kJW=VX!UP0w8 V4x8Nkl+v73JCK>hOhAH#0RTj&BAWmJ literal 0 HcmV?d00001 diff --git a/backend/ccli_src/executors/__pycache__/ssh_executor.cpython-310.pyc b/backend/ccli_src/executors/__pycache__/ssh_executor.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8d065c1767dbdb2cc7ae0f50aa789b08b5417b11 GIT binary patch literal 7153 zcmZ`;OOGT+wa$oqRMxYf?s+iSiEV5O%ry93!-YFoW7@Cb+HK^@PyVG)T zw+HTl*YX67bq1vazvUm4TjhgFtHN#Wpn6bi)r6(JS1jeJ()*Sw-H~@htIjRHDx;;$ zEe(vQs4AYTcVue`Pc>D?QyovsN_JLMLofHFT2jmJJFQi9t*zItRbK6cYM|qQdb?2=>Tc4H!uT0H92}r8&^!fg z&jj>18xtHx(V!ph=4-unE$Hm(Fo`c$?m=4Cc2i_!)k!+tK_`xzcILlvNGo>+ne$@5 zn`HJ2;W5Unb>ic&i&YbSE5Qq~i0RS-^FJF8S8>JPMxiYzz-r02AmWy-9qlT4#p1h- zyN727_Y&@I(e4%Z(iKbj5Jx#H<9pxK@ofr+mIh^|R%!?Bh4HLd=W3r0T z$IH5 zqAr^4I5F8$yWI)HDB%@jYVC(f=Jp0rC;7-S8xQ~&A3wjj7ai!$Bnpo=U%0k;Xrg`S zF5cYh-_)Dk?x4RJo9BUkPtF|7;8f^z2{QnQi0TV(m&A9-MvSPR@UvbRZ?s9pmVDYm72be zx3lVvFggm`q|CTkHw}z1Bq>9ZGAmT9Qn5zGMJg^)u}%fO!~|4)jtTiAJId8g&tXP!Tdwl8r^>pt@1u=4#A%gP zO;^Acb=AOAm081T)zk_kzLu>(%#e6t|KEjar(b@Z&G9rhEZDv7_4?hu4!g(q=v(hy zL6Mb-8QO6_)!EXaHgP{rCi3pmsL7A&bZy~^A4f3~BdcwVBude@P|Cy}JBfRj+C63M zSRaY)X32aWW6e`2vTC7a;;gJRh<10uNLhJTC$FInQzh+X9(8E*k)1iQ9`t7O=X67A zA$#qI{iNOg0-j;q5YXkx;LOWGnRx{*5U3_UQX4svKjDi1isC&n68qM# zz+SkEUs=DlPb7WvzPxFf7wJBd7pxO|WGlKmBWG;iwc=+JXYAay&}v_>&OSms(wFRo<^{Znwt%2qUh+7+VQ^@5%ALTQ7!5{)v*`DNP-~^t$Kq;M znQ0HSNkg9B9Knp-{8M?nQM9aKyU3ELLp?wfwL4iS#`oMeS3^OGz{{xsusmUR>DxHx}*A z1KL3Y#Z3(8-=RW)1{f^B_pS6=dWq^I0q6oA3iq)9d>=3B@-9IyZwIUaElFU!ZiSCJ zR^lALAOL+~rK?!Q9ec_i!9)OW8z6NE#@DSMV9tsCt{v9$`;j<8YyQ@6p>OBZ)1>E! z``0?`(_u~o? z6Jq+Rs<9_8ynE4_d-MwX{EDu^5H#-`e%j3u!f_4vNvxylZ1fN2DX+5qLJN4*H#->gSdb4mvnRf$}zKbxl_%5?W^O)BcQJ9tTL`)AbAf}W7JGbTKB+L)e7Lv2_>WWBPi|<1^+yaS3@NcOT(l4<{PR?u#6u zz=Wrq4P>xA%9VibyfA2_s6{E<&9@0W|P6}6Fn0a$b`l%=mDw7+5{ zm63-)(%Y}$s*h+D1f0tMz`H)NfyVJbz#;@po_3tS4G1mMZtzA5N$HJ@Pgu>;@y}r^ zW!TF4{0!`6dAy=1XvJt%8Ugy{Ef3 zUdGWxsmL4|i{?KeUh_XFKs=Nfm@*Y4bW^2*;X;hTnZk7OVvV2xts5gg4bpRqhL9Tz zIvD%SRr57kkzvP9=nXt(K1UOV^hxBXPe8a`WE7k7-tE?*nF3kB;XdY zD!;Q?IR#>~L$9h}j>&U2j(;ezp|g_wOBvCO?L}Cd*D?2}xZ-c3khgs)$z$8=2s~@| zYwoID6X%6Q@Irr*FaP64nOYmNCN824fd4eTFc0?yQ+{|LN~ZK4*c?F$JO}8GKn%L0 zAQVJr-;mKdWl5A}O`v!DToG(y*1j{wc~Jp5@2qfBm3FWq1)dBfKmFYIbevE^L&}^* zJV8GWjv{jdZZF-?-MuJyde+%aFz4xQLwq?dfM`(MLc7AE3pT!8&hYYK}XG z;Z&vOhtxqZoP?B=QJ5c7afY_CC3?eL3h`SEj49law@Ggsq5+LGNF(=|h^GW}YTiO1 zNp5wT?fwK0lR$DGN1S~a;)jv~Nzev-`dtfwU#fURadkeef^RD+~x|DBu&X2*gzYo80oi`89AdjCa z47!4`I5|?14VjNc*b7XI*G`{+snL;hP@Fk)=&@+X@pk&?VhhQHCoy><>~QBudeIOU zY>+T@rVXyngiM&EwOL1n8QQDJ@*0Q*m>BQ#cBWc?aO$>sg*35t=8@&8w1?i1`wQk1 zqVCLjn?h;IxD2VvxKwB=dQ^~In_VhqO6D3z*4NNMV`F+zo&{|<8*8giZIt~L>EKk8 zM&#i%DhoftCQ%gAPXcugvA0Cru{sC}7?+gIH>0Sq(h&fkJ{+rPcVPo*(KqO9I ziXqd}?)=4P>^V2R)5HHl8=-IYZ4W-m?4vsK?h7y_0g3ob8nOK%CgLe>yWm z8i2H!(*QY31MpRL`rK9&-qazTpX#7D4EcXAjKsKqa5&I~XF2uX2$&kN{5DRj=omTQ zAZZbBq>emqAxB=EDNW=wr#C)BTFW~?aroar5e@wv(~5k%B0YEjGoY0p$ SSHResult: + """Execute a command via SSH with retries and error handling""" + + for attempt in range(config.max_retries + 1): + try: + return await self._execute_once(config, command, **kwargs) + + except Exception as e: + self.logger.warning(f"SSH execution attempt {attempt + 1} failed for {config.host}: {e}") + + if attempt < config.max_retries: + await asyncio.sleep(1) # Brief delay before retry + else: + # Final attempt failed + raise Exception(f"SSH execution failed after {config.max_retries + 1} attempts: {e}") + + async def _execute_once(self, config: SSHConfig, command: str, **kwargs) -> SSHResult: + """Execute command once via SSH""" + start_time = time.time() + + # Build SSH command + ssh_cmd = self._build_ssh_command(config, command) + + try: + # Execute command with timeout + process = await asyncio.create_subprocess_exec( + *ssh_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + **kwargs + ) + + stdout, stderr = await asyncio.wait_for( + process.communicate(), + timeout=config.command_timeout + ) + + duration = time.time() - start_time + + return SSHResult( + stdout=stdout.decode('utf-8'), + stderr=stderr.decode('utf-8'), + returncode=process.returncode, + duration=duration, + host=config.host, + command=command + ) + + except asyncio.TimeoutError: + duration = time.time() - start_time + raise Exception(f"SSH command timeout after {config.command_timeout}s on {config.host}") + + except Exception as e: + duration = time.time() - start_time + self.logger.error(f"SSH execution error on {config.host}: {e}") + raise + + def _build_ssh_command(self, config: SSHConfig, command: str) -> list: + """Build SSH command array""" + ssh_cmd = ["ssh"] + + # Add SSH options + for option, value in config.ssh_options.items(): + ssh_cmd.extend(["-o", f"{option}={value}"]) + + # Add destination + if config.username: + destination = f"{config.username}@{config.host}" + else: + destination = config.host + + ssh_cmd.append(destination) + ssh_cmd.append(command) + + return ssh_cmd + + async def test_connection(self, config: SSHConfig) -> bool: + """Test if SSH connection is working""" + try: + result = await self.execute(config, "echo 'connection_test'") + return result.returncode == 0 and "connection_test" in result.stdout + except Exception as e: + self.logger.error(f"Connection test failed for {config.host}: {e}") + return False + + async def get_connection_stats(self) -> Dict[str, Any]: + """Get statistics about current connections (simplified for subprocess)""" + return { + "total_connections": 0, # subprocess doesn't maintain persistent connections + "connection_type": "subprocess" + } + + async def cleanup(self): + """Cleanup resources (no-op for subprocess)""" + pass + + +# Alias for compatibility +SSHExecutor = SimpleSSHExecutor \ No newline at end of file diff --git a/backend/ccli_src/executors/ssh_executor.py b/backend/ccli_src/executors/ssh_executor.py new file mode 100644 index 00000000..00a03df3 --- /dev/null +++ b/backend/ccli_src/executors/ssh_executor.py @@ -0,0 +1,221 @@ +""" +SSH Executor for CCLI +Handles SSH connections, command execution, and connection pooling for CLI agents. +""" + +import asyncio +import asyncssh +import time +import logging +from dataclasses import dataclass +from typing import Optional, Dict, Any +from contextlib import asynccontextmanager + + +@dataclass +class SSHResult: + """Result of an SSH command execution""" + stdout: str + stderr: str + returncode: int + duration: float + host: str + command: str + + +@dataclass +class SSHConfig: + """SSH connection configuration""" + host: str + username: str = "tony" + connect_timeout: int = 5 + command_timeout: int = 30 + max_retries: int = 2 + known_hosts: Optional[str] = None + + +class SSHConnectionPool: + """Manages SSH connection pooling for efficiency""" + + def __init__(self, pool_size: int = 3, persist_timeout: int = 60): + self.pool_size = pool_size + self.persist_timeout = persist_timeout + self.connections: Dict[str, Dict[str, Any]] = {} + self.logger = logging.getLogger(__name__) + + async def get_connection(self, config: SSHConfig) -> asyncssh.SSHClientConnection: + """Get a pooled SSH connection, creating if needed""" + host_key = f"{config.username}@{config.host}" + + # Check if we have a valid connection + if host_key in self.connections: + conn_info = self.connections[host_key] + connection = conn_info['connection'] + + # Check if connection is still alive and not expired + if (not connection.is_closed() and + time.time() - conn_info['created'] < self.persist_timeout): + self.logger.debug(f"Reusing connection to {host_key}") + return connection + else: + # Connection expired or closed, remove it + self.logger.debug(f"Connection to {host_key} expired, creating new one") + await self._close_connection(host_key) + + # Create new connection + self.logger.debug(f"Creating new SSH connection to {host_key}") + connection = await asyncssh.connect( + config.host, + username=config.username, + connect_timeout=config.connect_timeout, + known_hosts=config.known_hosts + ) + + self.connections[host_key] = { + 'connection': connection, + 'created': time.time(), + 'uses': 0 + } + + return connection + + async def _close_connection(self, host_key: str): + """Close and remove a connection from the pool""" + if host_key in self.connections: + try: + conn_info = self.connections[host_key] + if not conn_info['connection'].is_closed(): + conn_info['connection'].close() + await conn_info['connection'].wait_closed() + except Exception as e: + self.logger.warning(f"Error closing connection to {host_key}: {e}") + finally: + del self.connections[host_key] + + async def close_all(self): + """Close all pooled connections""" + for host_key in list(self.connections.keys()): + await self._close_connection(host_key) + + +class SSHExecutor: + """Main SSH command executor with connection pooling and error handling""" + + def __init__(self, pool_size: int = 3, persist_timeout: int = 60): + self.pool = SSHConnectionPool(pool_size, persist_timeout) + self.logger = logging.getLogger(__name__) + + async def execute(self, config: SSHConfig, command: str, **kwargs) -> SSHResult: + """Execute a command via SSH with retries and error handling""" + + for attempt in range(config.max_retries + 1): + try: + return await self._execute_once(config, command, **kwargs) + + except (asyncssh.Error, asyncio.TimeoutError, OSError) as e: + self.logger.warning(f"SSH execution attempt {attempt + 1} failed for {config.host}: {e}") + + if attempt < config.max_retries: + # Close any bad connections and retry + host_key = f"{config.username}@{config.host}" + await self.pool._close_connection(host_key) + await asyncio.sleep(1) # Brief delay before retry + else: + # Final attempt failed + raise Exception(f"SSH execution failed after {config.max_retries + 1} attempts: {e}") + + async def _execute_once(self, config: SSHConfig, command: str, **kwargs) -> SSHResult: + """Execute command once via SSH""" + start_time = time.time() + + try: + connection = await self.pool.get_connection(config) + + # Execute command with timeout + result = await asyncio.wait_for( + connection.run(command, check=False, **kwargs), + timeout=config.command_timeout + ) + + duration = time.time() - start_time + + # Update connection usage stats + host_key = f"{config.username}@{config.host}" + if host_key in self.pool.connections: + self.pool.connections[host_key]['uses'] += 1 + + return SSHResult( + stdout=result.stdout, + stderr=result.stderr, + returncode=result.exit_status, + duration=duration, + host=config.host, + command=command + ) + + except asyncio.TimeoutError: + duration = time.time() - start_time + raise Exception(f"SSH command timeout after {config.command_timeout}s on {config.host}") + + except Exception as e: + duration = time.time() - start_time + self.logger.error(f"SSH execution error on {config.host}: {e}") + raise + + async def test_connection(self, config: SSHConfig) -> bool: + """Test if SSH connection is working""" + try: + result = await self.execute(config, "echo 'connection_test'") + return result.returncode == 0 and "connection_test" in result.stdout + except Exception as e: + self.logger.error(f"Connection test failed for {config.host}: {e}") + return False + + async def get_connection_stats(self) -> Dict[str, Any]: + """Get statistics about current connections""" + stats = { + "total_connections": len(self.pool.connections), + "connections": {} + } + + for host_key, conn_info in self.pool.connections.items(): + stats["connections"][host_key] = { + "created": conn_info["created"], + "age_seconds": time.time() - conn_info["created"], + "uses": conn_info["uses"], + "is_closed": conn_info["connection"].is_closed() + } + + return stats + + async def cleanup(self): + """Close all connections and cleanup resources""" + await self.pool.close_all() + + @asynccontextmanager + async def connection_context(self, config: SSHConfig): + """Context manager for SSH connections""" + try: + connection = await self.pool.get_connection(config) + yield connection + except Exception as e: + self.logger.error(f"SSH connection context error: {e}") + raise + # Connection stays in pool for reuse + + +# Module-level convenience functions +_default_executor = None + +def get_default_executor() -> SSHExecutor: + """Get the default SSH executor instance""" + global _default_executor + if _default_executor is None: + _default_executor = SSHExecutor() + return _default_executor + +async def execute_ssh_command(host: str, command: str, **kwargs) -> SSHResult: + """Convenience function for simple SSH command execution""" + config = SSHConfig(host=host) + executor = get_default_executor() + return await executor.execute(config, command, **kwargs) \ No newline at end of file diff --git a/backend/ccli_src/src/__init__.py b/backend/ccli_src/src/__init__.py new file mode 100644 index 00000000..a78f9637 --- /dev/null +++ b/backend/ccli_src/src/__init__.py @@ -0,0 +1 @@ +# CCLI Source Package \ No newline at end of file diff --git a/backend/ccli_src/src/agents/__init__.py b/backend/ccli_src/src/agents/__init__.py new file mode 100644 index 00000000..b0d360dd --- /dev/null +++ b/backend/ccli_src/src/agents/__init__.py @@ -0,0 +1 @@ +# CLI Agents Package \ No newline at end of file diff --git a/backend/ccli_src/src/agents/__pycache__/__init__.cpython-310.pyc b/backend/ccli_src/src/agents/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d1a1a4d3765732a7ef587d7604e2373cf06485eb GIT binary patch literal 148 zcmd1j<>g`kfggDpB0%afn1e(x0@)5gT+9L_QW%06G#UL?G8BP?5yUTZ{fzwFRQ;0t zyh?pXPyK?T{H)aEl4AXg%(7Jds4}u^_QIx2arP0e0v=S+4wO+4G?~)-YEfX2qh?Hd`ULzxPlLQ6K zP(1@_iGaInQ-|c>L(Z0}$SPmDP3B&)?Sre{?jb;ZyjNxB2@Un)U~}>HlPKGmk6yR}iLEG^R77p;vVLnhm3AR?MbV zv6`t$s%ckjo!YHNx|ykDWI5HyHglDnEZdEdX1?ViTcEb~%h8J^iQD+SzV zSq}F(c|XQ;s2O2-)a3V!J-t%I{U|HozOZLh#+l(xurWTqZm=R7e`!^Yu_-peC)qKc z;wGQu_PQ~xO>5|rU$@yLJN`0NInIn#t#qROv)tnS6=z|Cd!cjJt%bhW$t?=*hCFaw zk2y`(b2m^%uj^0TfHMcTUKltp>fxrdEjZ)rbr01w-&?P5w1gYheJ_~Fy#bfBa>O&wHoEB)uzu{4JzlW)n_fY5x1nPRp!@FZQon|$x3qhLd;3Ehx$MQLbIh;HRlR^+iH zF-nBCBE~?X+`Z-U^23Gu)rFNP`*3+-^+9>1d@mYZe6X}!T_`Wy|M}|5YBaj^VDZsU zmdk4kYby`RQF?88bq(EmELNonn)Vn7T>(9WSe1OiGO9nsG82{axPmfB!m>xO+lt98 zo?^zF#%=tX_)ViGLw1CF3-?+4rtnMs>^WF)CC5il&Y+wHb92!|qHs%OA$8LG_Jv;A zS@(tG_Gw?;3qsec@hGE|pw6P4Dv@4Bwlc7&Fesg9gz=yz>e8;(qBQs1Ck@WZrTmdx zH-k__V^v7a^Oc~2YE>$fFu(+nB0|g(q(+hT#P=IVGW!ggPvZ)TAUVx4GSJK-E~0{3 z53!pRU>;Xs;ZX^@6||Pa)&JY%I4{<49k)0`;E;lHY)oR*w|2?EZknw zXS{Eo33jYIPmbJPJkQj?JSVz+M%YQ!hfTk-DtUH_z4ub9jPe3I&Bu6gT7!fA9y{~W zf(srWs6ERZSv!H+bG(TD$GC<57CX-_$d*a=scgA~mgBt0r(&0UnxBA6{yw`5mwZz8 zxYBE9B{|oL^$fep-iKqK?#_Nf&VG$uNA0P&_B}jrhRvepG*&=QIYCdss&BBHsQq4d zbl(av<`(+^t!Lsf-L`n_ZT2Br&%UGe2kaxXI_!t+WAN%6FJ$kqdG;f$dk&Xce{aSN z`!W0EWeWT|&o0797NW7lNaEPj{->OSPkbkBq(;#Qj-(fk=%(wn135sF&xQZI4s&>S zSdtSy_52rJ?D7+%lfh0-5~#osIBW+VzFvzvs#W-&ihH$I5cP zLipj0sL*u3tilu2T0+3pMq@RhUf$1DdMc)W1G{Af*b zBKGUf!ip2R!P8)^WQ(&{gK&tPBXXX|1tR1S-j3CU@4fJSmTWJRRKy$nXy!)y8o}@$PJ#o~9-NGUP@M%YXisD!1+#r+ct`IV^zNMzLL}h+l6D8t9B4~wf0(_0 ze{mc=a*qSEldy^9tGI$wARCzZi|j9qU47T+7+ab!I{H5v&j7dOo=dv4B=E_Yxgo7B z%1I0@5yCKAa&yE?82R=mU(*8!N!gkWXk+}@(CzQqA0Ne%c=0_Z&dfFZn%f9IK_Qv1 zWQe<%ElSlx-VDTNC=WmVIPv$;WF1-v!BN}H&=M~Fua+RUS|dRwA1WuN@<-a3l1N8C zQ2KU>m6;sfOT1& z)PcF1_R<)g>)5~5ULym=*fi~|qtDSU^!+h5B4^HbQrO?<>y+FdB^d?xOE5x4R*H@) zwRDccz1=?D!h+Zc?sV^HxDrR5-GSiA3mwu93IQrMuPhCW9FW=_C&@#T-b$Kr_UeSb z<0J42fC&f6;`n0DpgEL)X7OOMt9KQ3~oK(LlgYeZM68Z5Wc`h z;cF3r!fKs4^}uO)F?>O8?)mZZV`T(8BmiRFC$;Sb8tb&L!C@olvK@zi2?r2db5sZ* zC?#vCkSw%)iJrvjflMWwC#}$V!l{D2qXQI^Sj?+G-^rkOaN2TFhd8?O7MKO=T;!}_){lXQL zn@Z0n{j5YWkyZ1NOB0rfQqPIGk=@$HkpmZhO?9~*!&NRsNli3aRr{@O$O8f8JR+;^ zt^1{n_zQGVj)pwHxKATSlp0kTPCOKM9I8&qy$n0od_22s{0H<4Wk}0xOCVdd($V;g{5qBA>K=C+rl7vEwAZ+^2 zxFp^)EL*opgz`)Iu>Ggp$l>U6lybc@Jc`dTJs{W_3W=`QGg+xTUBgBEn@2~v36y&( zI=Em7HM*W-@*!`+!_rm;chEkjS_UkA?Jj`#Jf0Mp+y1t+bn!R1h0J^XVTj0P_gf6QAU+X%@BD14QqxTmXp`ULV>N&NP)=$k@aJ zks*PMOIG(|{r^NSDVUtmK6j*G`g;!uh=fDBESLIg6uJUBM~jy50+kSH9Q6`CBNlv>?lQ_vtDyKb%+j6O(q(e8u&odj`6C2b(jw3?q z9#(tL>kBgu^?6L>0tobVNHZxu3`P84OAzpQb2EG#aeHI8RdA?_r2K(JOZoSbK*^I-)o_D@Ix7$?qCl1qSO z0xCL1oHuM8rT@x&V|-_&+s?2Ih9?;i0x2Yc5h;YfN8wj`|G@(k@U}{&+Y@j|W+DWc zJz4uWqo=$>#g|wl_RbtcVnQSnP+@C7%^(b9C2&#bX;EhKSd(HgzcQs*@GHeanG9SW zy`h7EQteZV4UP!a+CD-cZ5;2T5kQriNq2!rPfE=0)To8ddIzP$>L6myuSMlR@2Q!KI4sTE-1TF zC^bpGB>M&-V0TQna5x#HQ_?%mnk(Rx9lDVxfUB#Y(NrGZv*E<9JDhT^{sYxJP8mmq zGh2ZZpRhVt_{_%4HRpVCPJ6yIFt~ftD5KY%_?78k?ZAMmSDzvqK?dtAFeVRo+{WN0 z9bcN~|0T6t?0?IO6isf@6+7zilTOOx!@?`Q)eEvO1)?Q>me83{^8h+NPn{ehGTIK} zWE@)kj0V{1e5Hq}X;fsZjUI)Kqf>E066fDi)N#6cjWm^xNR4p=4y5-bzzlG9^^mTr z{T(rtn4{I{QiDquj(YPYUrF_nJN1rh8lwccGw~jg(?n#5T0{xi9KDZj@^I5#dccvh_!!_&>CP$^V~m_P=Eo4Qo=DSNf!W(K`RbsqqPG zDtpE{bN0+wrf7@+mVAd5e3vtdf0ip2OIR)Kc;G`^BF+Z1vk+dkrHOQ??`baX;4X~z z)q5~y$_nFCm%cX_>j;SWb3!-QDdUd<#cJFyd1I~~egq$*V}#s>dY4H7$+Kll0$SoL z6jeN2MQH$2llV#8eaqj|2pIu0!51D-zN@zvV4lck>m09fo%NO{t+UVHb<>?zK;fZ( zqw)5TDacij2E`#?&|;!=+@ViNq<&DxN3;)Goeg-Xluz2jlzh`JeoW(SoTSjfh}2Zt zjnZDy*~sGf6NVs0;B9j9KMWoSI?IesC}1`dXVtajPYO!YWzJ0LkTR$$zkW-)Vpe-QLT-_J_E-;czoNBN5H-x~$J{E6fHB5I@Bxp8&<%C+_PGUw(;>+6&$ zUjFcd8`rPix_TwE-@SD0I_}P`Yad+w@S|IwYw{%4#>tmIUfYiP;aU<6cGfOkTN}!# z8@7{pZM$YTnRk>W9mTM?8m|HbbAs}Z93U!xz9R-W@HFY1!fbhgvodG5z zjPc~u^ohk!P`iOX*pWi|`lV44oyTE^*n-fb_r+dk(;GyIhlP=Bi}m!(3}#jb_xo%4 zKE!J@<+Zf9cBeDod|KwLnts!iXE8L*s*hKYlr+n5byu(Ce|5dCR}60Rf*?$7 z#3nkvTo9md?9j;pZ2`b$%bk{$P!z*bE)*uf?-1O>>ZWmAVR@MWU8D+d;hD>)grAzW82dhl4f{ zn-#aihoZBE9cdc!cc`@kiA}IAGbhf6O(^&QuGmFVGL{Tax4ue`&+O^{2a56>ddu^a zc$6^8c&fio#Z^lFfD+n0NxI?N;kSmTbj5Wf3VsYa{TKm{o_ss-p5p1tffw2UmD0eZ zb(KaXz*($$RoO<_f%YwwlrHtf%V4?iWYSS*PX%(zp zR_(ww?|Np-Fd50gAx%qICeSmMk0Fx)myh*bosir$#vpbVv@vN>BRvN*0+pDMED>8F zZX$Qsocv_t!*wqXWhdx$Qr1E8M;HvMkp|~{R=6MZM&SgBm<;ihIUqt3BcH~;j&3pG z$2BT2y=g;1k-DtrO`46DslyozlB~!dQ^G*JNI4SMtgzXOf<$4I7CsNk_o(h=I`M~i za1qxbov2_*=`sC;A*oFp*P$Ox;36*O%@b%c`|vI)(D{JX)^(^T4k}A*I1$Y?VLyU(pLtDZ0Lkg~lu_i- zA8dn>dfVVAJtFF$&K}+}05=4h<8xj(xZjb{pil3NAudByk)(MtY)1h9&N&Z#sZUOP z4k+;|zaT3qn$h!vPG+MW@$yElSJ6Mp{X#do|&xk)9d*H2Q*Y`L9R- zds;wEU1(%R*HZ^HiH-@Pdj>NiO2^wXA#jRGz&F&MKSV-q%rJ_2ehh~wTNn~^1jU)~ zKY=Fcgyw-?uSznc@)GqdWaR{erRS5qi?ez>YPZ8U-W*M8D$0GxpD(G#Q{?bFvXuab z5^RP3;riyH{3Ci1sS#d>r0tbNw-gI{1G%h3#8e%TEK)^u^t*;I#nfpm9_-6Z=vGpPgh!rTyUk)nlvie~Yf zNNnBKYr0`7zr+2i=GZ3HEt!U8=<2WXoAjuHJ18SdAI!8G2M(DcL-Gm+Ym=Uz{K)#! zPW&2I{7WQ1*GcPl^~a?3jjnmOAWt&Z3W?oyL|zLV8pN!-z}pj8v81?H8f$yyvA&PL zU4szYCB(v>ruDIL4VLnKZPy%|w|Vrv3Oy6nzM%ta)ONP!0rB*IXAvUxah1K>(978y zyq08pR^zNNwyD^LGB;(;1i=g9SZ&kX-Y3hJiw^aAN6My3X6mHoiFuf(^5o_OsTv`388qs zSj2oEjap}H z?i;%n+PQm0q@^(c2xyvsWVh_SeLBXnC{k<8%9t6W%wyDfoYGTEl&9lV zX2-d$4Sr~8$;!^Z=-Mu#X6bhi&6w7HR8;e|*T(K-?FVm9j*VT2bv}Es@WjsNJNKTC zu_L?gr|tmsnC=hs2PC3PqCPtxu34m-v8`6BYRH+Pw#VTya;^c=sCAF;oftcG7RgB} zv3fam*OLFm&rU-X@Hz1bPF8Vb>~zm`Uzx0AOp&Oiw0VspZ3u;Vgwv4Bvx;SaGZRD) z1TU>^B!Qep;k>k>B$GoI-a6YKM9^5%i(KHzE#kNZF>L{RKk=|w@R&iq131nT#tqax zCwrEzP{>fhnFA~w_IlexIIom*Zn$IeE}HTi5DJq$6b`-SA}W}k&cEgs9;5}*G+tWt zlug9g_PlPGo8nCx?;}e7m8o>E^F_6- zPQ$|_4-rcT6*ex~>6n(th8$*>bP>$5zGnzSuyz#I*n;*15$r;zAZL$3JD=IR1=6P= zkI=0MW!PgI(&(n=-XTD+MV^3)gXTFpW`p5~B6@K&k_eH1y2e2@1lCsLZST!DJ>%`jba`|zxYf@uyG2oYkP3Bi9&4=T{5m{KRpVIWzqTU>+x%x zGxHo;la)fPxo5(bL4)2a3mqXQw4kwYLWT6?WfD1WG~`3VWcYm4gfhL9*4Cp3*n46@ z6x&QoA6&lSZQQzY_2$itm^oOh>15850o?(CU=b))wrn`5EEtp+*w=jTH zwF_ZWVZh)-89z6v&b%Rg`LM-^$$u;*LR&yVKLR6Y8_3E&C1hoqRpn=$L0*OZRd@JHaQ^&Bp*k@IDptij zmMJU^3Dc|`L7!svJZx6aolqE8pX^qwwE1;3PnX}HUDVV(77k`2{}S!wUr}-qN%NSp zTnh;TqvZbO-oLV76j_uVO4y`K&6tWDEy2N?>5@-|q*ff=Q{H7CcE+@4^JR`Z2?W`Lbb<-2bjM8fc}S+j`?SFiT&*K;36i!7SmB zmLm;EnvP|XR{3*CAF!<5$7dS4kM-lk<&8akAD=UDN3a|YC2Ue+A-JI-xS>BmTt6Xh z4gLy^SSClZTO8|P=;y|!q3P&x?^XVGYm!PvSQ2Q?Sq+C$Iq0(4X){BF2hNdbK2s{;@L-=noDB@V$=<_^^aj@>WC6`q@UiCUHmozh$3&l>$ zCB~DUm{aaET@ERBbjNqs+*DG$iI)TVf(RH$3NF1%Ze4-CsZ4;v_aaoH{*)XCgTAW3 z&t`8Mu_;o6v=W91A~~IQ4C+8=P(dP+8zrSoMQH=FS{~)`;c{;!+dvJsXC^|YsbWkX zzmPV(fCTX3yM*2?ippeaO;1mo^j@vd|@~r0+TWp~~p!e@zw= z-ZFXArdFbds z)0;@S%Ri%pZ42udHZYr1caM^o64ILLqZsLN_08h%kjtz)2(Ze3q)Pg-#dDpRKd*#> zFY+3a4O}t#UGzT-OHRXUyt1<1SX`;CywzA List[str]: + """Get list of all predefined agent IDs""" + return list(cls.PREDEFINED_AGENTS.keys()) + + @classmethod + def get_enabled_agent_ids(cls) -> List[str]: + """Get list of enabled predefined agent IDs""" + return [ + agent_id for agent_id, definition in cls.PREDEFINED_AGENTS.items() + if definition.enabled + ] + + @classmethod + def get_agent_definition(cls, agent_id: str) -> Optional[CliAgentDefinition]: + """Get predefined agent definition by ID""" + return cls.PREDEFINED_AGENTS.get(agent_id) + + def create_agent(self, agent_id: str, custom_config: Optional[Dict[str, Any]] = None) -> GeminiCliAgent: + """ + Create a CLI agent instance + + Args: + agent_id: ID of predefined agent or custom ID + custom_config: Optional custom configuration to override defaults + + Returns: + GeminiCliAgent instance + + Raises: + ValueError: If agent_id is unknown and no custom_config provided + """ + + # Check if agent already exists + if agent_id in self.active_agents: + self.logger.warning(f"Agent {agent_id} already exists, returning existing instance") + return self.active_agents[agent_id] + + # Get configuration + if agent_id in self.PREDEFINED_AGENTS: + definition = self.PREDEFINED_AGENTS[agent_id] + + if not definition.enabled: + self.logger.warning(f"Agent {agent_id} is disabled but being created anyway") + + config_dict = definition.config.copy() + specialization = definition.specialization.value + + # Apply custom overrides + if custom_config: + config_dict.update(custom_config) + + elif custom_config: + # Custom agent configuration + config_dict = custom_config + specialization = custom_config.get("specialization", "general_ai") + + else: + raise ValueError(f"Unknown agent ID '{agent_id}' and no custom configuration provided") + + # Determine agent type and create appropriate agent + agent_type = config_dict.get("agent_type", "gemini") + + if agent_type == "gemini" or agent_type == CliAgentType.GEMINI: + agent = self._create_gemini_agent(agent_id, config_dict, specialization) + else: + raise ValueError(f"Unsupported agent type: {agent_type}") + + # Store in active agents + self.active_agents[agent_id] = agent + + self.logger.info(f"Created CLI agent: {agent_id} ({specialization})") + return agent + + def _create_gemini_agent(self, agent_id: str, config_dict: Dict[str, Any], specialization: str) -> GeminiCliAgent: + """Create a Gemini CLI agent with the given configuration""" + + # Create GeminiCliConfig from dictionary + config = GeminiCliConfig( + host=config_dict["host"], + node_version=config_dict["node_version"], + model=config_dict.get("model", "gemini-2.5-pro"), + max_concurrent=config_dict.get("max_concurrent", 2), + command_timeout=config_dict.get("command_timeout", 60), + ssh_timeout=config_dict.get("ssh_timeout", 5), + node_path=config_dict.get("node_path"), + gemini_path=config_dict.get("gemini_path") + ) + + return GeminiCliAgent(config, specialization) + + def get_agent(self, agent_id: str) -> Optional[GeminiCliAgent]: + """Get an existing agent instance""" + return self.active_agents.get(agent_id) + + def remove_agent(self, agent_id: str) -> bool: + """Remove an agent instance""" + if agent_id in self.active_agents: + agent = self.active_agents.pop(agent_id) + # Note: Cleanup should be called by the caller if needed + self.logger.info(f"Removed CLI agent: {agent_id}") + return True + return False + + def get_active_agents(self) -> Dict[str, GeminiCliAgent]: + """Get all active agent instances""" + return self.active_agents.copy() + + def get_agent_info(self, agent_id: str) -> Optional[Dict[str, Any]]: + """Get information about an agent""" + + # Check active agents + if agent_id in self.active_agents: + agent = self.active_agents[agent_id] + return { + "agent_id": agent_id, + "status": "active", + "host": agent.config.host, + "model": agent.config.model, + "specialization": agent.specialization, + "active_tasks": len(agent.active_tasks), + "max_concurrent": agent.config.max_concurrent, + "statistics": agent.get_statistics() + } + + # Check predefined but not active + if agent_id in self.PREDEFINED_AGENTS: + definition = self.PREDEFINED_AGENTS[agent_id] + return { + "agent_id": agent_id, + "status": "available" if definition.enabled else "disabled", + "agent_type": definition.agent_type.value, + "specialization": definition.specialization.value, + "description": definition.description, + "config": definition.config + } + + return None + + def list_all_agents(self) -> Dict[str, Dict[str, Any]]: + """List all agents (predefined and active)""" + all_agents = {} + + # Add predefined agents + for agent_id in self.PREDEFINED_AGENTS: + all_agents[agent_id] = self.get_agent_info(agent_id) + + # Add any custom active agents not in predefined list + for agent_id in self.active_agents: + if agent_id not in all_agents: + all_agents[agent_id] = self.get_agent_info(agent_id) + + return all_agents + + async def health_check_all(self) -> Dict[str, Dict[str, Any]]: + """Perform health checks on all active agents""" + health_results = {} + + for agent_id, agent in self.active_agents.items(): + try: + health_results[agent_id] = await agent.health_check() + except Exception as e: + health_results[agent_id] = { + "agent_id": agent_id, + "error": str(e), + "healthy": False + } + + return health_results + + async def cleanup_all(self): + """Clean up all active agents""" + for agent_id, agent in list(self.active_agents.items()): + try: + await agent.cleanup() + self.logger.info(f"Cleaned up agent: {agent_id}") + except Exception as e: + self.logger.error(f"Error cleaning up agent {agent_id}: {e}") + + self.active_agents.clear() + + @classmethod + def create_custom_agent_config(cls, host: str, node_version: str, + specialization: str = "general_ai", + **kwargs) -> Dict[str, Any]: + """ + Helper to create custom agent configuration + + Args: + host: Target host for SSH connection + node_version: Node.js version (e.g., "v22.14.0") + specialization: Agent specialization + **kwargs: Additional configuration options + + Returns: + Configuration dictionary for create_agent() + """ + config = { + "host": host, + "node_version": node_version, + "specialization": specialization, + "agent_type": "gemini", + "model": "gemini-2.5-pro", + "max_concurrent": 2, + "command_timeout": 60, + "ssh_timeout": 5 + } + + config.update(kwargs) + return config + + +# Module-level convenience functions +_default_factory = None + +def get_default_factory() -> CliAgentFactory: + """Get the default CLI agent factory instance""" + global _default_factory + if _default_factory is None: + _default_factory = CliAgentFactory() + return _default_factory + +def create_agent(agent_id: str, custom_config: Optional[Dict[str, Any]] = None) -> GeminiCliAgent: + """Convenience function to create an agent using the default factory""" + factory = get_default_factory() + return factory.create_agent(agent_id, custom_config) \ No newline at end of file diff --git a/backend/ccli_src/src/agents/gemini_cli_agent.py b/backend/ccli_src/src/agents/gemini_cli_agent.py new file mode 100644 index 00000000..804e258a --- /dev/null +++ b/backend/ccli_src/src/agents/gemini_cli_agent.py @@ -0,0 +1,369 @@ +""" +Gemini CLI Agent Adapter +Provides a standardized interface for executing tasks on Gemini CLI via SSH. +""" + +import asyncio +import json +import time +import logging +import hashlib +from dataclasses import dataclass, asdict +from typing import Dict, Any, Optional, List +from enum import Enum + +from executors.ssh_executor import SSHExecutor, SSHConfig, SSHResult + + +class TaskStatus(Enum): + """Task execution status""" + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + TIMEOUT = "timeout" + + +@dataclass +class GeminiCliConfig: + """Configuration for Gemini CLI agent""" + host: str + node_version: str + model: str = "gemini-2.5-pro" + max_concurrent: int = 2 + command_timeout: int = 60 + ssh_timeout: int = 5 + node_path: Optional[str] = None + gemini_path: Optional[str] = None + + def __post_init__(self): + """Auto-generate paths if not provided""" + if self.node_path is None: + self.node_path = f"/home/tony/.nvm/versions/node/{self.node_version}/bin/node" + if self.gemini_path is None: + self.gemini_path = f"/home/tony/.nvm/versions/node/{self.node_version}/bin/gemini" + + +@dataclass +class TaskRequest: + """Represents a task to be executed""" + prompt: str + model: Optional[str] = None + task_id: Optional[str] = None + priority: int = 3 + metadata: Optional[Dict[str, Any]] = None + + def __post_init__(self): + """Generate task ID if not provided""" + if self.task_id is None: + # Generate a unique task ID based on prompt and timestamp + content = f"{self.prompt}_{time.time()}" + self.task_id = hashlib.md5(content.encode()).hexdigest()[:12] + + +@dataclass +class TaskResult: + """Result of a task execution""" + task_id: str + status: TaskStatus + response: Optional[str] = None + error: Optional[str] = None + execution_time: float = 0.0 + model: Optional[str] = None + agent_id: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization""" + result = asdict(self) + result['status'] = self.status.value + return result + + +class GeminiCliAgent: + """ + Adapter for Google Gemini CLI execution via SSH + + Provides a consistent interface for executing AI tasks on remote Gemini CLI installations + while handling SSH connections, environment setup, error recovery, and concurrent execution. + """ + + def __init__(self, config: GeminiCliConfig, specialization: str = "general_ai"): + self.config = config + self.specialization = specialization + self.agent_id = f"{config.host}-gemini" + + # SSH configuration + self.ssh_config = SSHConfig( + host=config.host, + connect_timeout=config.ssh_timeout, + command_timeout=config.command_timeout + ) + + # SSH executor with connection pooling + self.ssh_executor = SSHExecutor(pool_size=3, persist_timeout=120) + + # Task management + self.active_tasks: Dict[str, asyncio.Task] = {} + self.task_history: List[TaskResult] = [] + self.max_history = 100 + + # Logging + self.logger = logging.getLogger(f"gemini_cli.{config.host}") + + # Performance tracking + self.stats = { + "total_tasks": 0, + "successful_tasks": 0, + "failed_tasks": 0, + "total_execution_time": 0.0, + "average_execution_time": 0.0 + } + + async def execute_task(self, request: TaskRequest) -> TaskResult: + """ + Execute a task on the Gemini CLI + + Args: + request: TaskRequest containing prompt and configuration + + Returns: + TaskResult with execution status and response + """ + + # Check concurrent task limit + if len(self.active_tasks) >= self.config.max_concurrent: + return TaskResult( + task_id=request.task_id, + status=TaskStatus.FAILED, + error=f"Agent at maximum concurrent tasks ({self.config.max_concurrent})", + agent_id=self.agent_id + ) + + # Start task execution + task = asyncio.create_task(self._execute_task_impl(request)) + self.active_tasks[request.task_id] = task + + try: + result = await task + return result + finally: + # Clean up task from active list + self.active_tasks.pop(request.task_id, None) + + async def _execute_task_impl(self, request: TaskRequest) -> TaskResult: + """Internal implementation of task execution""" + start_time = time.time() + model = request.model or self.config.model + + try: + self.logger.info(f"Starting task {request.task_id} with model {model}") + + # Build the CLI command + command = self._build_cli_command(request.prompt, model) + + # Execute via SSH + ssh_result = await self.ssh_executor.execute(self.ssh_config, command) + + execution_time = time.time() - start_time + + # Process result + if ssh_result.returncode == 0: + result = TaskResult( + task_id=request.task_id, + status=TaskStatus.COMPLETED, + response=self._clean_response(ssh_result.stdout), + execution_time=execution_time, + model=model, + agent_id=self.agent_id, + metadata={ + "ssh_duration": ssh_result.duration, + "command": command, + "stderr": ssh_result.stderr + } + ) + self.stats["successful_tasks"] += 1 + else: + result = TaskResult( + task_id=request.task_id, + status=TaskStatus.FAILED, + error=f"CLI execution failed: {ssh_result.stderr}", + execution_time=execution_time, + model=model, + agent_id=self.agent_id, + metadata={ + "returncode": ssh_result.returncode, + "command": command, + "stdout": ssh_result.stdout, + "stderr": ssh_result.stderr + } + ) + self.stats["failed_tasks"] += 1 + + except Exception as e: + execution_time = time.time() - start_time + self.logger.error(f"Task {request.task_id} failed: {e}") + + result = TaskResult( + task_id=request.task_id, + status=TaskStatus.FAILED, + error=str(e), + execution_time=execution_time, + model=model, + agent_id=self.agent_id + ) + self.stats["failed_tasks"] += 1 + + # Update statistics + self.stats["total_tasks"] += 1 + self.stats["total_execution_time"] += execution_time + self.stats["average_execution_time"] = ( + self.stats["total_execution_time"] / self.stats["total_tasks"] + ) + + # Add to history (with size limit) + self.task_history.append(result) + if len(self.task_history) > self.max_history: + self.task_history.pop(0) + + self.logger.info(f"Task {request.task_id} completed with status {result.status.value}") + return result + + def _build_cli_command(self, prompt: str, model: str) -> str: + """Build the complete CLI command for execution""" + + # Environment setup + env_setup = f"source ~/.nvm/nvm.sh && nvm use {self.config.node_version}" + + # Escape the prompt for shell safety + escaped_prompt = prompt.replace("'", "'\\''") + + # Build gemini command + gemini_cmd = f"echo '{escaped_prompt}' | {self.config.gemini_path} --model {model}" + + # Complete command + full_command = f"{env_setup} && {gemini_cmd}" + + return full_command + + def _clean_response(self, raw_output: str) -> str: + """Clean up the raw CLI output""" + lines = raw_output.strip().split('\n') + + # Remove NVM output lines + cleaned_lines = [] + for line in lines: + if not (line.startswith('Now using node') or + line.startswith('MCP STDERR') or + line.strip() == ''): + cleaned_lines.append(line) + + return '\n'.join(cleaned_lines).strip() + + async def health_check(self) -> Dict[str, Any]: + """Perform a health check on the agent""" + try: + # Test SSH connection + ssh_healthy = await self.ssh_executor.test_connection(self.ssh_config) + + # Test Gemini CLI with a simple prompt + if ssh_healthy: + test_request = TaskRequest( + prompt="Say 'health check ok'", + task_id="health_check" + ) + result = await self.execute_task(test_request) + cli_healthy = result.status == TaskStatus.COMPLETED + response_time = result.execution_time + else: + cli_healthy = False + response_time = None + + # Get connection stats + connection_stats = await self.ssh_executor.get_connection_stats() + + return { + "agent_id": self.agent_id, + "host": self.config.host, + "ssh_healthy": ssh_healthy, + "cli_healthy": cli_healthy, + "response_time": response_time, + "active_tasks": len(self.active_tasks), + "max_concurrent": self.config.max_concurrent, + "total_tasks": self.stats["total_tasks"], + "success_rate": ( + self.stats["successful_tasks"] / max(self.stats["total_tasks"], 1) + ), + "average_execution_time": self.stats["average_execution_time"], + "connection_stats": connection_stats, + "model": self.config.model, + "specialization": self.specialization + } + + except Exception as e: + self.logger.error(f"Health check failed: {e}") + return { + "agent_id": self.agent_id, + "host": self.config.host, + "ssh_healthy": False, + "cli_healthy": False, + "error": str(e) + } + + async def get_task_status(self, task_id: str) -> Optional[TaskResult]: + """Get the status of a specific task""" + # Check active tasks + if task_id in self.active_tasks: + task = self.active_tasks[task_id] + if task.done(): + return task.result() + else: + return TaskResult( + task_id=task_id, + status=TaskStatus.RUNNING, + agent_id=self.agent_id + ) + + # Check history + for result in reversed(self.task_history): + if result.task_id == task_id: + return result + + return None + + async def cancel_task(self, task_id: str) -> bool: + """Cancel a running task""" + if task_id in self.active_tasks: + task = self.active_tasks[task_id] + if not task.done(): + task.cancel() + return True + return False + + def get_statistics(self) -> Dict[str, Any]: + """Get agent performance statistics""" + return { + "agent_id": self.agent_id, + "host": self.config.host, + "specialization": self.specialization, + "model": self.config.model, + "stats": self.stats.copy(), + "active_tasks": len(self.active_tasks), + "history_length": len(self.task_history) + } + + async def cleanup(self): + """Clean up resources""" + # Cancel any active tasks + for task_id, task in list(self.active_tasks.items()): + if not task.done(): + task.cancel() + + # Wait for tasks to complete + if self.active_tasks: + await asyncio.gather(*self.active_tasks.values(), return_exceptions=True) + + # Close SSH connections + await self.ssh_executor.cleanup() + + self.logger.info(f"Agent {self.agent_id} cleaned up successfully") \ No newline at end of file diff --git a/backend/ccli_src/src/executors/__init__.py b/backend/ccli_src/src/executors/__init__.py new file mode 100644 index 00000000..be7eca53 --- /dev/null +++ b/backend/ccli_src/src/executors/__init__.py @@ -0,0 +1 @@ +# Executors Package \ No newline at end of file diff --git a/backend/ccli_src/src/executors/__pycache__/__init__.cpython-310.pyc b/backend/ccli_src/src/executors/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f10a8402bccd7ba2c71bdedd7152790f95383338 GIT binary patch literal 151 zcmd1j<>g`k0+t_n8NxvNF^Gc<7=auIATDMB5-AM944RC7D;bJF!U*D*m3~HkZmNDs zeqN=%qo;mBQGQlxa!Ij%MrK*6esXe7rhaizvVLksYI12weo?W0e0*kJW=VX!UP0w8 V4x8Nkl+v73JCK>hOhAH#0RTj&BAWmJ literal 0 HcmV?d00001 diff --git a/backend/ccli_src/src/executors/__pycache__/ssh_executor.cpython-310.pyc b/backend/ccli_src/src/executors/__pycache__/ssh_executor.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8d065c1767dbdb2cc7ae0f50aa789b08b5417b11 GIT binary patch literal 7153 zcmZ`;OOGT+wa$oqRMxYf?s+iSiEV5O%ry93!-YFoW7@Cb+HK^@PyVG)T zw+HTl*YX67bq1vazvUm4TjhgFtHN#Wpn6bi)r6(JS1jeJ()*Sw-H~@htIjRHDx;;$ zEe(vQs4AYTcVue`Pc>D?QyovsN_JLMLofHFT2jmJJFQi9t*zItRbK6cYM|qQdb?2=>Tc4H!uT0H92}r8&^!fg z&jj>18xtHx(V!ph=4-unE$Hm(Fo`c$?m=4Cc2i_!)k!+tK_`xzcILlvNGo>+ne$@5 zn`HJ2;W5Unb>ic&i&YbSE5Qq~i0RS-^FJF8S8>JPMxiYzz-r02AmWy-9qlT4#p1h- zyN727_Y&@I(e4%Z(iKbj5Jx#H<9pxK@ofr+mIh^|R%!?Bh4HLd=W3r0T z$IH5 zqAr^4I5F8$yWI)HDB%@jYVC(f=Jp0rC;7-S8xQ~&A3wjj7ai!$Bnpo=U%0k;Xrg`S zF5cYh-_)Dk?x4RJo9BUkPtF|7;8f^z2{QnQi0TV(m&A9-MvSPR@UvbRZ?s9pmVDYm72be zx3lVvFggm`q|CTkHw}z1Bq>9ZGAmT9Qn5zGMJg^)u}%fO!~|4)jtTiAJId8g&tXP!Tdwl8r^>pt@1u=4#A%gP zO;^Acb=AOAm081T)zk_kzLu>(%#e6t|KEjar(b@Z&G9rhEZDv7_4?hu4!g(q=v(hy zL6Mb-8QO6_)!EXaHgP{rCi3pmsL7A&bZy~^A4f3~BdcwVBude@P|Cy}JBfRj+C63M zSRaY)X32aWW6e`2vTC7a;;gJRh<10uNLhJTC$FInQzh+X9(8E*k)1iQ9`t7O=X67A zA$#qI{iNOg0-j;q5YXkx;LOWGnRx{*5U3_UQX4svKjDi1isC&n68qM# zz+SkEUs=DlPb7WvzPxFf7wJBd7pxO|WGlKmBWG;iwc=+JXYAay&}v_>&OSms(wFRo<^{Znwt%2qUh+7+VQ^@5%ALTQ7!5{)v*`DNP-~^t$Kq;M znQ0HSNkg9B9Knp-{8M?nQM9aKyU3ELLp?wfwL4iS#`oMeS3^OGz{{xsusmUR>DxHx}*A z1KL3Y#Z3(8-=RW)1{f^B_pS6=dWq^I0q6oA3iq)9d>=3B@-9IyZwIUaElFU!ZiSCJ zR^lALAOL+~rK?!Q9ec_i!9)OW8z6NE#@DSMV9tsCt{v9$`;j<8YyQ@6p>OBZ)1>E! z``0?`(_u~o? z6Jq+Rs<9_8ynE4_d-MwX{EDu^5H#-`e%j3u!f_4vNvxylZ1fN2DX+5qLJN4*H#->gSdb4mvnRf$}zKbxl_%5?W^O)BcQJ9tTL`)AbAf}W7JGbTKB+L)e7Lv2_>WWBPi|<1^+yaS3@NcOT(l4<{PR?u#6u zz=Wrq4P>xA%9VibyfA2_s6{E<&9@0W|P6}6Fn0a$b`l%=mDw7+5{ zm63-)(%Y}$s*h+D1f0tMz`H)NfyVJbz#;@po_3tS4G1mMZtzA5N$HJ@Pgu>;@y}r^ zW!TF4{0!`6dAy=1XvJt%8Ugy{Ef3 zUdGWxsmL4|i{?KeUh_XFKs=Nfm@*Y4bW^2*;X;hTnZk7OVvV2xts5gg4bpRqhL9Tz zIvD%SRr57kkzvP9=nXt(K1UOV^hxBXPe8a`WE7k7-tE?*nF3kB;XdY zD!;Q?IR#>~L$9h}j>&U2j(;ezp|g_wOBvCO?L}Cd*D?2}xZ-c3khgs)$z$8=2s~@| zYwoID6X%6Q@Irr*FaP64nOYmNCN824fd4eTFc0?yQ+{|LN~ZK4*c?F$JO}8GKn%L0 zAQVJr-;mKdWl5A}O`v!DToG(y*1j{wc~Jp5@2qfBm3FWq1)dBfKmFYIbevE^L&}^* zJV8GWjv{jdZZF-?-MuJyde+%aFz4xQLwq?dfM`(MLc7AE3pT!8&hYYK}XG z;Z&vOhtxqZoP?B=QJ5c7afY_CC3?eL3h`SEj49law@Ggsq5+LGNF(=|h^GW}YTiO1 zNp5wT?fwK0lR$DGN1S~a;)jv~Nzev-`dtfwU#fURadkeef^RD+~x|DBu&X2*gzYo80oi`89AdjCa z47!4`I5|?14VjNc*b7XI*G`{+snL;hP@Fk)=&@+X@pk&?VhhQHCoy><>~QBudeIOU zY>+T@rVXyngiM&EwOL1n8QQDJ@*0Q*m>BQ#cBWc?aO$>sg*35t=8@&8w1?i1`wQk1 zqVCLjn?h;IxD2VvxKwB=dQ^~In_VhqO6D3z*4NNMV`F+zo&{|<8*8giZIt~L>EKk8 zM&#i%DhoftCQ%gAPXcugvA0Cru{sC}7?+gIH>0Sq(h&fkJ{+rPcVPo*(KqO9I ziXqd}?)=4P>^V2R)5HHl8=-IYZ4W-m?4vsK?h7y_0g3ob8nOK%CgLe>yWm z8i2H!(*QY31MpRL`rK9&-qazTpX#7D4EcXAjKsKqa5&I~XF2uX2$&kN{5DRj=omTQ zAZZbBq>emqAxB=EDNW=wr#C)BTFW~?aroar5e@wv(~5k%B0YEjGoY0p$ SSHResult: + """Execute a command via SSH with retries and error handling""" + + for attempt in range(config.max_retries + 1): + try: + return await self._execute_once(config, command, **kwargs) + + except Exception as e: + self.logger.warning(f"SSH execution attempt {attempt + 1} failed for {config.host}: {e}") + + if attempt < config.max_retries: + await asyncio.sleep(1) # Brief delay before retry + else: + # Final attempt failed + raise Exception(f"SSH execution failed after {config.max_retries + 1} attempts: {e}") + + async def _execute_once(self, config: SSHConfig, command: str, **kwargs) -> SSHResult: + """Execute command once via SSH""" + start_time = time.time() + + # Build SSH command + ssh_cmd = self._build_ssh_command(config, command) + + try: + # Execute command with timeout + process = await asyncio.create_subprocess_exec( + *ssh_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + **kwargs + ) + + stdout, stderr = await asyncio.wait_for( + process.communicate(), + timeout=config.command_timeout + ) + + duration = time.time() - start_time + + return SSHResult( + stdout=stdout.decode('utf-8'), + stderr=stderr.decode('utf-8'), + returncode=process.returncode, + duration=duration, + host=config.host, + command=command + ) + + except asyncio.TimeoutError: + duration = time.time() - start_time + raise Exception(f"SSH command timeout after {config.command_timeout}s on {config.host}") + + except Exception as e: + duration = time.time() - start_time + self.logger.error(f"SSH execution error on {config.host}: {e}") + raise + + def _build_ssh_command(self, config: SSHConfig, command: str) -> list: + """Build SSH command array""" + ssh_cmd = ["ssh"] + + # Add SSH options + for option, value in config.ssh_options.items(): + ssh_cmd.extend(["-o", f"{option}={value}"]) + + # Add destination + if config.username: + destination = f"{config.username}@{config.host}" + else: + destination = config.host + + ssh_cmd.append(destination) + ssh_cmd.append(command) + + return ssh_cmd + + async def test_connection(self, config: SSHConfig) -> bool: + """Test if SSH connection is working""" + try: + result = await self.execute(config, "echo 'connection_test'") + return result.returncode == 0 and "connection_test" in result.stdout + except Exception as e: + self.logger.error(f"Connection test failed for {config.host}: {e}") + return False + + async def get_connection_stats(self) -> Dict[str, Any]: + """Get statistics about current connections (simplified for subprocess)""" + return { + "total_connections": 0, # subprocess doesn't maintain persistent connections + "connection_type": "subprocess" + } + + async def cleanup(self): + """Cleanup resources (no-op for subprocess)""" + pass + + +# Alias for compatibility +SSHExecutor = SimpleSSHExecutor \ No newline at end of file diff --git a/backend/ccli_src/src/executors/ssh_executor.py b/backend/ccli_src/src/executors/ssh_executor.py new file mode 100644 index 00000000..00a03df3 --- /dev/null +++ b/backend/ccli_src/src/executors/ssh_executor.py @@ -0,0 +1,221 @@ +""" +SSH Executor for CCLI +Handles SSH connections, command execution, and connection pooling for CLI agents. +""" + +import asyncio +import asyncssh +import time +import logging +from dataclasses import dataclass +from typing import Optional, Dict, Any +from contextlib import asynccontextmanager + + +@dataclass +class SSHResult: + """Result of an SSH command execution""" + stdout: str + stderr: str + returncode: int + duration: float + host: str + command: str + + +@dataclass +class SSHConfig: + """SSH connection configuration""" + host: str + username: str = "tony" + connect_timeout: int = 5 + command_timeout: int = 30 + max_retries: int = 2 + known_hosts: Optional[str] = None + + +class SSHConnectionPool: + """Manages SSH connection pooling for efficiency""" + + def __init__(self, pool_size: int = 3, persist_timeout: int = 60): + self.pool_size = pool_size + self.persist_timeout = persist_timeout + self.connections: Dict[str, Dict[str, Any]] = {} + self.logger = logging.getLogger(__name__) + + async def get_connection(self, config: SSHConfig) -> asyncssh.SSHClientConnection: + """Get a pooled SSH connection, creating if needed""" + host_key = f"{config.username}@{config.host}" + + # Check if we have a valid connection + if host_key in self.connections: + conn_info = self.connections[host_key] + connection = conn_info['connection'] + + # Check if connection is still alive and not expired + if (not connection.is_closed() and + time.time() - conn_info['created'] < self.persist_timeout): + self.logger.debug(f"Reusing connection to {host_key}") + return connection + else: + # Connection expired or closed, remove it + self.logger.debug(f"Connection to {host_key} expired, creating new one") + await self._close_connection(host_key) + + # Create new connection + self.logger.debug(f"Creating new SSH connection to {host_key}") + connection = await asyncssh.connect( + config.host, + username=config.username, + connect_timeout=config.connect_timeout, + known_hosts=config.known_hosts + ) + + self.connections[host_key] = { + 'connection': connection, + 'created': time.time(), + 'uses': 0 + } + + return connection + + async def _close_connection(self, host_key: str): + """Close and remove a connection from the pool""" + if host_key in self.connections: + try: + conn_info = self.connections[host_key] + if not conn_info['connection'].is_closed(): + conn_info['connection'].close() + await conn_info['connection'].wait_closed() + except Exception as e: + self.logger.warning(f"Error closing connection to {host_key}: {e}") + finally: + del self.connections[host_key] + + async def close_all(self): + """Close all pooled connections""" + for host_key in list(self.connections.keys()): + await self._close_connection(host_key) + + +class SSHExecutor: + """Main SSH command executor with connection pooling and error handling""" + + def __init__(self, pool_size: int = 3, persist_timeout: int = 60): + self.pool = SSHConnectionPool(pool_size, persist_timeout) + self.logger = logging.getLogger(__name__) + + async def execute(self, config: SSHConfig, command: str, **kwargs) -> SSHResult: + """Execute a command via SSH with retries and error handling""" + + for attempt in range(config.max_retries + 1): + try: + return await self._execute_once(config, command, **kwargs) + + except (asyncssh.Error, asyncio.TimeoutError, OSError) as e: + self.logger.warning(f"SSH execution attempt {attempt + 1} failed for {config.host}: {e}") + + if attempt < config.max_retries: + # Close any bad connections and retry + host_key = f"{config.username}@{config.host}" + await self.pool._close_connection(host_key) + await asyncio.sleep(1) # Brief delay before retry + else: + # Final attempt failed + raise Exception(f"SSH execution failed after {config.max_retries + 1} attempts: {e}") + + async def _execute_once(self, config: SSHConfig, command: str, **kwargs) -> SSHResult: + """Execute command once via SSH""" + start_time = time.time() + + try: + connection = await self.pool.get_connection(config) + + # Execute command with timeout + result = await asyncio.wait_for( + connection.run(command, check=False, **kwargs), + timeout=config.command_timeout + ) + + duration = time.time() - start_time + + # Update connection usage stats + host_key = f"{config.username}@{config.host}" + if host_key in self.pool.connections: + self.pool.connections[host_key]['uses'] += 1 + + return SSHResult( + stdout=result.stdout, + stderr=result.stderr, + returncode=result.exit_status, + duration=duration, + host=config.host, + command=command + ) + + except asyncio.TimeoutError: + duration = time.time() - start_time + raise Exception(f"SSH command timeout after {config.command_timeout}s on {config.host}") + + except Exception as e: + duration = time.time() - start_time + self.logger.error(f"SSH execution error on {config.host}: {e}") + raise + + async def test_connection(self, config: SSHConfig) -> bool: + """Test if SSH connection is working""" + try: + result = await self.execute(config, "echo 'connection_test'") + return result.returncode == 0 and "connection_test" in result.stdout + except Exception as e: + self.logger.error(f"Connection test failed for {config.host}: {e}") + return False + + async def get_connection_stats(self) -> Dict[str, Any]: + """Get statistics about current connections""" + stats = { + "total_connections": len(self.pool.connections), + "connections": {} + } + + for host_key, conn_info in self.pool.connections.items(): + stats["connections"][host_key] = { + "created": conn_info["created"], + "age_seconds": time.time() - conn_info["created"], + "uses": conn_info["uses"], + "is_closed": conn_info["connection"].is_closed() + } + + return stats + + async def cleanup(self): + """Close all connections and cleanup resources""" + await self.pool.close_all() + + @asynccontextmanager + async def connection_context(self, config: SSHConfig): + """Context manager for SSH connections""" + try: + connection = await self.pool.get_connection(config) + yield connection + except Exception as e: + self.logger.error(f"SSH connection context error: {e}") + raise + # Connection stays in pool for reuse + + +# Module-level convenience functions +_default_executor = None + +def get_default_executor() -> SSHExecutor: + """Get the default SSH executor instance""" + global _default_executor + if _default_executor is None: + _default_executor = SSHExecutor() + return _default_executor + +async def execute_ssh_command(host: str, command: str, **kwargs) -> SSHResult: + """Convenience function for simple SSH command execution""" + config = SSHConfig(host=host) + executor = get_default_executor() + return await executor.execute(config, command, **kwargs) \ No newline at end of file diff --git a/backend/ccli_src/src/tests/test_gemini_cli_agent.py b/backend/ccli_src/src/tests/test_gemini_cli_agent.py new file mode 100644 index 00000000..125b7c27 --- /dev/null +++ b/backend/ccli_src/src/tests/test_gemini_cli_agent.py @@ -0,0 +1,380 @@ +""" +Unit tests for GeminiCliAgent +""" + +import pytest +import asyncio +from unittest.mock import Mock, AsyncMock, patch +from dataclasses import dataclass + +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(__file__))) + +from agents.gemini_cli_agent import ( + GeminiCliAgent, GeminiCliConfig, TaskRequest, TaskResult, TaskStatus +) +from executors.ssh_executor import SSHResult + + +class TestGeminiCliAgent: + + @pytest.fixture + def agent_config(self): + return GeminiCliConfig( + host="test-host", + node_version="v22.14.0", + model="gemini-2.5-pro", + max_concurrent=2, + command_timeout=30 + ) + + @pytest.fixture + def agent(self, agent_config): + return GeminiCliAgent(agent_config, "test_specialty") + + @pytest.fixture + def task_request(self): + return TaskRequest( + prompt="What is 2+2?", + task_id="test-task-123" + ) + + def test_agent_initialization(self, agent_config): + """Test agent initialization with proper configuration""" + agent = GeminiCliAgent(agent_config, "general_ai") + + assert agent.config.host == "test-host" + assert agent.config.node_version == "v22.14.0" + assert agent.specialization == "general_ai" + assert agent.agent_id == "test-host-gemini" + assert len(agent.active_tasks) == 0 + assert agent.stats["total_tasks"] == 0 + + def test_config_auto_paths(self): + """Test automatic path generation in config""" + config = GeminiCliConfig( + host="walnut", + node_version="v22.14.0" + ) + + expected_node_path = "/home/tony/.nvm/versions/node/v22.14.0/bin/node" + expected_gemini_path = "/home/tony/.nvm/versions/node/v22.14.0/bin/gemini" + + assert config.node_path == expected_node_path + assert config.gemini_path == expected_gemini_path + + def test_build_cli_command(self, agent): + """Test CLI command building""" + prompt = "What is Python?" + model = "gemini-2.5-pro" + + command = agent._build_cli_command(prompt, model) + + assert "source ~/.nvm/nvm.sh" in command + assert "nvm use v22.14.0" in command + assert "gemini --model gemini-2.5-pro" in command + assert "What is Python?" in command + + def test_build_cli_command_escaping(self, agent): + """Test CLI command with special characters""" + prompt = "What's the meaning of 'life'?" + model = "gemini-2.5-pro" + + command = agent._build_cli_command(prompt, model) + + # Should properly escape single quotes + assert "What\\'s the meaning of \\'life\\'?" in command + + def test_clean_response(self, agent): + """Test response cleaning""" + raw_output = """Now using node v22.14.0 (npm v11.3.0) +MCP STDERR (hive): Warning message + +This is the actual response +from Gemini CLI + +""" + + cleaned = agent._clean_response(raw_output) + expected = "This is the actual response\nfrom Gemini CLI" + + assert cleaned == expected + + @pytest.mark.asyncio + async def test_execute_task_success(self, agent, task_request, mocker): + """Test successful task execution""" + # Mock SSH executor + mock_ssh_result = SSHResult( + stdout="Now using node v22.14.0\n4\n", + stderr="", + returncode=0, + duration=1.5, + host="test-host", + command="test-command" + ) + + mock_execute = AsyncMock(return_value=mock_ssh_result) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + result = await agent.execute_task(task_request) + + assert result.status == TaskStatus.COMPLETED + assert result.task_id == "test-task-123" + assert result.response == "4" + assert result.execution_time > 0 + assert result.model == "gemini-2.5-pro" + assert result.agent_id == "test-host-gemini" + + # Check statistics update + assert agent.stats["successful_tasks"] == 1 + assert agent.stats["total_tasks"] == 1 + + @pytest.mark.asyncio + async def test_execute_task_failure(self, agent, task_request, mocker): + """Test task execution failure handling""" + mock_ssh_result = SSHResult( + stdout="", + stderr="Command failed: invalid model", + returncode=1, + duration=0.5, + host="test-host", + command="test-command" + ) + + mock_execute = AsyncMock(return_value=mock_ssh_result) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + result = await agent.execute_task(task_request) + + assert result.status == TaskStatus.FAILED + assert "CLI execution failed" in result.error + assert result.execution_time > 0 + + # Check statistics update + assert agent.stats["failed_tasks"] == 1 + assert agent.stats["total_tasks"] == 1 + + @pytest.mark.asyncio + async def test_execute_task_exception(self, agent, task_request, mocker): + """Test task execution with exception""" + mock_execute = AsyncMock(side_effect=Exception("SSH connection failed")) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + result = await agent.execute_task(task_request) + + assert result.status == TaskStatus.FAILED + assert "SSH connection failed" in result.error + assert result.execution_time > 0 + + # Check statistics update + assert agent.stats["failed_tasks"] == 1 + assert agent.stats["total_tasks"] == 1 + + @pytest.mark.asyncio + async def test_concurrent_task_limit(self, agent, mocker): + """Test concurrent task execution limits""" + # Mock a slow SSH execution + slow_ssh_result = SSHResult( + stdout="result", + stderr="", + returncode=0, + duration=2.0, + host="test-host", + command="test-command" + ) + + async def slow_execute(*args, **kwargs): + await asyncio.sleep(0.1) # Simulate slow execution + return slow_ssh_result + + mock_execute = AsyncMock(side_effect=slow_execute) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + # Start maximum concurrent tasks + task1 = TaskRequest(prompt="Task 1", task_id="task-1") + task2 = TaskRequest(prompt="Task 2", task_id="task-2") + task3 = TaskRequest(prompt="Task 3", task_id="task-3") + + # Start first two tasks (should succeed) + result1_coro = agent.execute_task(task1) + result2_coro = agent.execute_task(task2) + + # Give tasks time to start + await asyncio.sleep(0.01) + + # Third task should fail due to limit + result3 = await agent.execute_task(task3) + assert result3.status == TaskStatus.FAILED + assert "maximum concurrent tasks" in result3.error + + # Wait for first two to complete + result1 = await result1_coro + result2 = await result2_coro + + assert result1.status == TaskStatus.COMPLETED + assert result2.status == TaskStatus.COMPLETED + + @pytest.mark.asyncio + async def test_health_check_success(self, agent, mocker): + """Test successful health check""" + # Mock SSH connection test + mock_test_connection = AsyncMock(return_value=True) + mocker.patch.object(agent.ssh_executor, 'test_connection', mock_test_connection) + + # Mock successful CLI execution + mock_ssh_result = SSHResult( + stdout="health check ok\n", + stderr="", + returncode=0, + duration=1.0, + host="test-host", + command="test-command" + ) + mock_execute = AsyncMock(return_value=mock_ssh_result) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + # Mock connection stats + mock_get_stats = AsyncMock(return_value={"total_connections": 1}) + mocker.patch.object(agent.ssh_executor, 'get_connection_stats', mock_get_stats) + + health = await agent.health_check() + + assert health["agent_id"] == "test-host-gemini" + assert health["ssh_healthy"] is True + assert health["cli_healthy"] is True + assert health["response_time"] > 0 + assert health["active_tasks"] == 0 + assert health["max_concurrent"] == 2 + + @pytest.mark.asyncio + async def test_health_check_failure(self, agent, mocker): + """Test health check with failures""" + # Mock SSH connection failure + mock_test_connection = AsyncMock(return_value=False) + mocker.patch.object(agent.ssh_executor, 'test_connection', mock_test_connection) + + health = await agent.health_check() + + assert health["ssh_healthy"] is False + assert health["cli_healthy"] is False + + @pytest.mark.asyncio + async def test_task_status_tracking(self, agent, mocker): + """Test task status tracking""" + # Mock SSH execution + mock_ssh_result = SSHResult( + stdout="result\n", + stderr="", + returncode=0, + duration=1.0, + host="test-host", + command="test-command" + ) + mock_execute = AsyncMock(return_value=mock_ssh_result) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + task_request = TaskRequest(prompt="Test", task_id="status-test") + + # Execute task + result = await agent.execute_task(task_request) + + # Check task in history + status = await agent.get_task_status("status-test") + assert status is not None + assert status.status == TaskStatus.COMPLETED + assert status.task_id == "status-test" + + # Check non-existent task + status = await agent.get_task_status("non-existent") + assert status is None + + def test_statistics(self, agent): + """Test statistics tracking""" + stats = agent.get_statistics() + + assert stats["agent_id"] == "test-host-gemini" + assert stats["host"] == "test-host" + assert stats["specialization"] == "test_specialty" + assert stats["model"] == "gemini-2.5-pro" + assert stats["stats"]["total_tasks"] == 0 + assert stats["active_tasks"] == 0 + + @pytest.mark.asyncio + async def test_task_cancellation(self, agent, mocker): + """Test task cancellation""" + # Mock a long-running SSH execution + async def long_execute(*args, **kwargs): + await asyncio.sleep(10) # Long execution + return SSHResult("", "", 0, 10.0, "test-host", "cmd") + + mock_execute = AsyncMock(side_effect=long_execute) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + task_request = TaskRequest(prompt="Long task", task_id="cancel-test") + + # Start task + task_coro = agent.execute_task(task_request) + + # Let it start + await asyncio.sleep(0.01) + + # Cancel it + cancelled = await agent.cancel_task("cancel-test") + assert cancelled is True + + # The task should be cancelled + try: + await task_coro + except asyncio.CancelledError: + pass # Expected + + @pytest.mark.asyncio + async def test_cleanup(self, agent, mocker): + """Test agent cleanup""" + # Mock SSH executor cleanup + mock_cleanup = AsyncMock() + mocker.patch.object(agent.ssh_executor, 'cleanup', mock_cleanup) + + await agent.cleanup() + + mock_cleanup.assert_called_once() + + +class TestTaskRequest: + + def test_task_request_auto_id(self): + """Test automatic task ID generation""" + request = TaskRequest(prompt="Test prompt") + + assert request.task_id is not None + assert len(request.task_id) == 12 # MD5 hash truncated to 12 chars + + def test_task_request_custom_id(self): + """Test custom task ID""" + request = TaskRequest(prompt="Test", task_id="custom-123") + + assert request.task_id == "custom-123" + + +class TestTaskResult: + + def test_task_result_to_dict(self): + """Test TaskResult serialization""" + result = TaskResult( + task_id="test-123", + status=TaskStatus.COMPLETED, + response="Test response", + execution_time=1.5, + model="gemini-2.5-pro", + agent_id="test-agent" + ) + + result_dict = result.to_dict() + + assert result_dict["task_id"] == "test-123" + assert result_dict["status"] == "completed" + assert result_dict["response"] == "Test response" + assert result_dict["execution_time"] == 1.5 + assert result_dict["model"] == "gemini-2.5-pro" + assert result_dict["agent_id"] == "test-agent" \ No newline at end of file diff --git a/backend/ccli_src/tests/test_gemini_cli_agent.py b/backend/ccli_src/tests/test_gemini_cli_agent.py new file mode 100644 index 00000000..125b7c27 --- /dev/null +++ b/backend/ccli_src/tests/test_gemini_cli_agent.py @@ -0,0 +1,380 @@ +""" +Unit tests for GeminiCliAgent +""" + +import pytest +import asyncio +from unittest.mock import Mock, AsyncMock, patch +from dataclasses import dataclass + +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(__file__))) + +from agents.gemini_cli_agent import ( + GeminiCliAgent, GeminiCliConfig, TaskRequest, TaskResult, TaskStatus +) +from executors.ssh_executor import SSHResult + + +class TestGeminiCliAgent: + + @pytest.fixture + def agent_config(self): + return GeminiCliConfig( + host="test-host", + node_version="v22.14.0", + model="gemini-2.5-pro", + max_concurrent=2, + command_timeout=30 + ) + + @pytest.fixture + def agent(self, agent_config): + return GeminiCliAgent(agent_config, "test_specialty") + + @pytest.fixture + def task_request(self): + return TaskRequest( + prompt="What is 2+2?", + task_id="test-task-123" + ) + + def test_agent_initialization(self, agent_config): + """Test agent initialization with proper configuration""" + agent = GeminiCliAgent(agent_config, "general_ai") + + assert agent.config.host == "test-host" + assert agent.config.node_version == "v22.14.0" + assert agent.specialization == "general_ai" + assert agent.agent_id == "test-host-gemini" + assert len(agent.active_tasks) == 0 + assert agent.stats["total_tasks"] == 0 + + def test_config_auto_paths(self): + """Test automatic path generation in config""" + config = GeminiCliConfig( + host="walnut", + node_version="v22.14.0" + ) + + expected_node_path = "/home/tony/.nvm/versions/node/v22.14.0/bin/node" + expected_gemini_path = "/home/tony/.nvm/versions/node/v22.14.0/bin/gemini" + + assert config.node_path == expected_node_path + assert config.gemini_path == expected_gemini_path + + def test_build_cli_command(self, agent): + """Test CLI command building""" + prompt = "What is Python?" + model = "gemini-2.5-pro" + + command = agent._build_cli_command(prompt, model) + + assert "source ~/.nvm/nvm.sh" in command + assert "nvm use v22.14.0" in command + assert "gemini --model gemini-2.5-pro" in command + assert "What is Python?" in command + + def test_build_cli_command_escaping(self, agent): + """Test CLI command with special characters""" + prompt = "What's the meaning of 'life'?" + model = "gemini-2.5-pro" + + command = agent._build_cli_command(prompt, model) + + # Should properly escape single quotes + assert "What\\'s the meaning of \\'life\\'?" in command + + def test_clean_response(self, agent): + """Test response cleaning""" + raw_output = """Now using node v22.14.0 (npm v11.3.0) +MCP STDERR (hive): Warning message + +This is the actual response +from Gemini CLI + +""" + + cleaned = agent._clean_response(raw_output) + expected = "This is the actual response\nfrom Gemini CLI" + + assert cleaned == expected + + @pytest.mark.asyncio + async def test_execute_task_success(self, agent, task_request, mocker): + """Test successful task execution""" + # Mock SSH executor + mock_ssh_result = SSHResult( + stdout="Now using node v22.14.0\n4\n", + stderr="", + returncode=0, + duration=1.5, + host="test-host", + command="test-command" + ) + + mock_execute = AsyncMock(return_value=mock_ssh_result) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + result = await agent.execute_task(task_request) + + assert result.status == TaskStatus.COMPLETED + assert result.task_id == "test-task-123" + assert result.response == "4" + assert result.execution_time > 0 + assert result.model == "gemini-2.5-pro" + assert result.agent_id == "test-host-gemini" + + # Check statistics update + assert agent.stats["successful_tasks"] == 1 + assert agent.stats["total_tasks"] == 1 + + @pytest.mark.asyncio + async def test_execute_task_failure(self, agent, task_request, mocker): + """Test task execution failure handling""" + mock_ssh_result = SSHResult( + stdout="", + stderr="Command failed: invalid model", + returncode=1, + duration=0.5, + host="test-host", + command="test-command" + ) + + mock_execute = AsyncMock(return_value=mock_ssh_result) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + result = await agent.execute_task(task_request) + + assert result.status == TaskStatus.FAILED + assert "CLI execution failed" in result.error + assert result.execution_time > 0 + + # Check statistics update + assert agent.stats["failed_tasks"] == 1 + assert agent.stats["total_tasks"] == 1 + + @pytest.mark.asyncio + async def test_execute_task_exception(self, agent, task_request, mocker): + """Test task execution with exception""" + mock_execute = AsyncMock(side_effect=Exception("SSH connection failed")) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + result = await agent.execute_task(task_request) + + assert result.status == TaskStatus.FAILED + assert "SSH connection failed" in result.error + assert result.execution_time > 0 + + # Check statistics update + assert agent.stats["failed_tasks"] == 1 + assert agent.stats["total_tasks"] == 1 + + @pytest.mark.asyncio + async def test_concurrent_task_limit(self, agent, mocker): + """Test concurrent task execution limits""" + # Mock a slow SSH execution + slow_ssh_result = SSHResult( + stdout="result", + stderr="", + returncode=0, + duration=2.0, + host="test-host", + command="test-command" + ) + + async def slow_execute(*args, **kwargs): + await asyncio.sleep(0.1) # Simulate slow execution + return slow_ssh_result + + mock_execute = AsyncMock(side_effect=slow_execute) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + # Start maximum concurrent tasks + task1 = TaskRequest(prompt="Task 1", task_id="task-1") + task2 = TaskRequest(prompt="Task 2", task_id="task-2") + task3 = TaskRequest(prompt="Task 3", task_id="task-3") + + # Start first two tasks (should succeed) + result1_coro = agent.execute_task(task1) + result2_coro = agent.execute_task(task2) + + # Give tasks time to start + await asyncio.sleep(0.01) + + # Third task should fail due to limit + result3 = await agent.execute_task(task3) + assert result3.status == TaskStatus.FAILED + assert "maximum concurrent tasks" in result3.error + + # Wait for first two to complete + result1 = await result1_coro + result2 = await result2_coro + + assert result1.status == TaskStatus.COMPLETED + assert result2.status == TaskStatus.COMPLETED + + @pytest.mark.asyncio + async def test_health_check_success(self, agent, mocker): + """Test successful health check""" + # Mock SSH connection test + mock_test_connection = AsyncMock(return_value=True) + mocker.patch.object(agent.ssh_executor, 'test_connection', mock_test_connection) + + # Mock successful CLI execution + mock_ssh_result = SSHResult( + stdout="health check ok\n", + stderr="", + returncode=0, + duration=1.0, + host="test-host", + command="test-command" + ) + mock_execute = AsyncMock(return_value=mock_ssh_result) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + # Mock connection stats + mock_get_stats = AsyncMock(return_value={"total_connections": 1}) + mocker.patch.object(agent.ssh_executor, 'get_connection_stats', mock_get_stats) + + health = await agent.health_check() + + assert health["agent_id"] == "test-host-gemini" + assert health["ssh_healthy"] is True + assert health["cli_healthy"] is True + assert health["response_time"] > 0 + assert health["active_tasks"] == 0 + assert health["max_concurrent"] == 2 + + @pytest.mark.asyncio + async def test_health_check_failure(self, agent, mocker): + """Test health check with failures""" + # Mock SSH connection failure + mock_test_connection = AsyncMock(return_value=False) + mocker.patch.object(agent.ssh_executor, 'test_connection', mock_test_connection) + + health = await agent.health_check() + + assert health["ssh_healthy"] is False + assert health["cli_healthy"] is False + + @pytest.mark.asyncio + async def test_task_status_tracking(self, agent, mocker): + """Test task status tracking""" + # Mock SSH execution + mock_ssh_result = SSHResult( + stdout="result\n", + stderr="", + returncode=0, + duration=1.0, + host="test-host", + command="test-command" + ) + mock_execute = AsyncMock(return_value=mock_ssh_result) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + task_request = TaskRequest(prompt="Test", task_id="status-test") + + # Execute task + result = await agent.execute_task(task_request) + + # Check task in history + status = await agent.get_task_status("status-test") + assert status is not None + assert status.status == TaskStatus.COMPLETED + assert status.task_id == "status-test" + + # Check non-existent task + status = await agent.get_task_status("non-existent") + assert status is None + + def test_statistics(self, agent): + """Test statistics tracking""" + stats = agent.get_statistics() + + assert stats["agent_id"] == "test-host-gemini" + assert stats["host"] == "test-host" + assert stats["specialization"] == "test_specialty" + assert stats["model"] == "gemini-2.5-pro" + assert stats["stats"]["total_tasks"] == 0 + assert stats["active_tasks"] == 0 + + @pytest.mark.asyncio + async def test_task_cancellation(self, agent, mocker): + """Test task cancellation""" + # Mock a long-running SSH execution + async def long_execute(*args, **kwargs): + await asyncio.sleep(10) # Long execution + return SSHResult("", "", 0, 10.0, "test-host", "cmd") + + mock_execute = AsyncMock(side_effect=long_execute) + mocker.patch.object(agent.ssh_executor, 'execute', mock_execute) + + task_request = TaskRequest(prompt="Long task", task_id="cancel-test") + + # Start task + task_coro = agent.execute_task(task_request) + + # Let it start + await asyncio.sleep(0.01) + + # Cancel it + cancelled = await agent.cancel_task("cancel-test") + assert cancelled is True + + # The task should be cancelled + try: + await task_coro + except asyncio.CancelledError: + pass # Expected + + @pytest.mark.asyncio + async def test_cleanup(self, agent, mocker): + """Test agent cleanup""" + # Mock SSH executor cleanup + mock_cleanup = AsyncMock() + mocker.patch.object(agent.ssh_executor, 'cleanup', mock_cleanup) + + await agent.cleanup() + + mock_cleanup.assert_called_once() + + +class TestTaskRequest: + + def test_task_request_auto_id(self): + """Test automatic task ID generation""" + request = TaskRequest(prompt="Test prompt") + + assert request.task_id is not None + assert len(request.task_id) == 12 # MD5 hash truncated to 12 chars + + def test_task_request_custom_id(self): + """Test custom task ID""" + request = TaskRequest(prompt="Test", task_id="custom-123") + + assert request.task_id == "custom-123" + + +class TestTaskResult: + + def test_task_result_to_dict(self): + """Test TaskResult serialization""" + result = TaskResult( + task_id="test-123", + status=TaskStatus.COMPLETED, + response="Test response", + execution_time=1.5, + model="gemini-2.5-pro", + agent_id="test-agent" + ) + + result_dict = result.to_dict() + + assert result_dict["task_id"] == "test-123" + assert result_dict["status"] == "completed" + assert result_dict["response"] == "Test response" + assert result_dict["execution_time"] == 1.5 + assert result_dict["model"] == "gemini-2.5-pro" + assert result_dict["agent_id"] == "test-agent" \ No newline at end of file