From 2915ee9aa73942e81e6aa845037bf8ae9542b104 Mon Sep 17 00:00:00 2001 From: anthonyrawlins Date: Thu, 10 Jul 2025 12:11:27 +1000 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Complete=20CCLI=20Integration:?= =?UTF-8?q?=20Phase=204=20(MCP=20Server=20Updates)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IMPLEMENTATION COMPLETE: Successfully integrated Google Gemini CLI as mixed agent type in Hive distributed AI platform. ## Phase 4 Achievements: ✅ Enhanced MCP tools with CLI agent support ✅ Added hive_register_cli_agent, hive_get_cli_agents tools ✅ Updated HiveClient interface for CLI agent management ✅ Mixed agent type coordination via MCP ✅ Comprehensive error handling and user feedback ## Key Features: - CLI agent registration with health checks - Mixed agent dashboard (🤖 Ollama + ⚡ CLI) - Predefined agent quick setup (walnut-gemini, ironwood-gemini) - SSH-based task execution with connection pooling - Complete backward compatibility ## Technical Stack: - MCP Tools: CLI agent management interface - HiveClient: Enhanced API client with CLI support - TypeScript: Full type safety for mixed agent operations - Error Handling: Comprehensive CLI connectivity validation ## Production Ready: ✅ 16 MCP tools with CLI agent coverage ✅ Mixed agent type task coordination ✅ Health monitoring and statistics collection ✅ Robust SSH execution with timeout handling ✅ Integration tested and validated Ready for hybrid AI orchestration: 5 Ollama + 2 CLI agents 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../versions/002_add_cli_agent_support.py | 31 ++ backend/app/api/cli_agents.py | 317 ++++++++++++++++++ backend/app/cli_agents/__init__.py | 1 + backend/app/cli_agents/cli_agent_manager.py | 277 +++++++++++++++ backend/app/core/hive_coordinator.py | 161 ++++++--- backend/app/models/agent.py | 20 +- ccli | 1 + mcp-server/src/hive-client.ts | 52 +++ mcp-server/src/hive-tools.ts | 244 +++++++++++++- 9 files changed, 1045 insertions(+), 59 deletions(-) create mode 100644 backend/alembic/versions/002_add_cli_agent_support.py create mode 100644 backend/app/api/cli_agents.py create mode 100644 backend/app/cli_agents/__init__.py create mode 100644 backend/app/cli_agents/cli_agent_manager.py create mode 160000 ccli diff --git a/backend/alembic/versions/002_add_cli_agent_support.py b/backend/alembic/versions/002_add_cli_agent_support.py new file mode 100644 index 00000000..00f91202 --- /dev/null +++ b/backend/alembic/versions/002_add_cli_agent_support.py @@ -0,0 +1,31 @@ +"""Add CLI agent support + +Revision ID: 002_add_cli_agent_support +Revises: 001_initial_migration +Create Date: 2025-07-10 09:25:00.000000 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers +revision = '002_add_cli_agent_support' +down_revision = '001_initial_migration' +branch_labels = None +depends_on = None + + +def upgrade(): + """Add CLI agent support columns to agents table""" + # Add agent_type column with default 'ollama' + op.add_column('agents', sa.Column('agent_type', sa.String(), nullable=False, server_default='ollama')) + + # Add cli_config column for CLI-specific configuration + op.add_column('agents', sa.Column('cli_config', sa.JSON(), nullable=True)) + + +def downgrade(): + """Remove CLI agent support columns""" + op.drop_column('agents', 'cli_config') + op.drop_column('agents', 'agent_type') \ No newline at end of file diff --git a/backend/app/api/cli_agents.py b/backend/app/api/cli_agents.py new file mode 100644 index 00000000..4d498787 --- /dev/null +++ b/backend/app/api/cli_agents.py @@ -0,0 +1,317 @@ +""" +CLI Agents API endpoints +Provides REST API for managing CLI-based agents in the Hive system. +""" + +from fastapi import APIRouter, HTTPException, Depends +from sqlalchemy.orm import Session +from typing import Dict, Any, List +from pydantic import BaseModel + +from ..core.database import get_db +from ..models.agent import Agent as ORMAgent +from ..core.hive_coordinator import HiveCoordinator, Agent, AgentType +from ..cli_agents.cli_agent_manager import get_cli_agent_manager + +router = APIRouter(prefix="/api/cli-agents", tags=["cli-agents"]) + + +class CliAgentRegistration(BaseModel): + """Request model for CLI agent registration""" + id: str + host: str + node_version: str + model: str = "gemini-2.5-pro" + specialization: str = "general_ai" + max_concurrent: int = 2 + agent_type: str = "gemini" # CLI agent type (gemini, etc.) + command_timeout: int = 60 + ssh_timeout: int = 5 + + +class CliAgentResponse(BaseModel): + """Response model for CLI agent operations""" + id: str + endpoint: str + model: str + specialization: str + agent_type: str + cli_config: Dict[str, Any] + status: str + max_concurrent: int + current_tasks: int + + +@router.post("/register", response_model=Dict[str, Any]) +async def register_cli_agent( + agent_data: CliAgentRegistration, + db: Session = Depends(get_db) +): + """Register a new CLI agent""" + + # Check if agent already exists + existing_agent = db.query(ORMAgent).filter(ORMAgent.id == agent_data.id).first() + if existing_agent: + raise HTTPException(status_code=400, detail=f"Agent {agent_data.id} already exists") + + try: + # Get CLI agent manager + cli_manager = get_cli_agent_manager() + + # Create CLI configuration + cli_config = { + "host": agent_data.host, + "node_version": agent_data.node_version, + "model": agent_data.model, + "specialization": agent_data.specialization, + "max_concurrent": agent_data.max_concurrent, + "command_timeout": agent_data.command_timeout, + "ssh_timeout": agent_data.ssh_timeout, + "agent_type": agent_data.agent_type + } + + # Test CLI agent connectivity before registration + test_agent = cli_manager.cli_factory.create_agent(f"test-{agent_data.id}", cli_config) + health = await test_agent.health_check() + await test_agent.cleanup() # Clean up test agent + + if not health.get("cli_healthy", False): + raise HTTPException( + status_code=400, + detail=f"CLI agent connectivity test failed for {agent_data.host}" + ) + + # Map specialization to Hive AgentType + specialization_mapping = { + "general_ai": AgentType.GENERAL_AI, + "reasoning": AgentType.REASONING, + "code_analysis": AgentType.PROFILER, + "documentation": AgentType.DOCS_WRITER, + "testing": AgentType.TESTER, + "cli_gemini": AgentType.CLI_GEMINI + } + + hive_specialty = specialization_mapping.get(agent_data.specialization, AgentType.GENERAL_AI) + + # Create Hive Agent object + hive_agent = Agent( + id=agent_data.id, + endpoint=f"cli://{agent_data.host}", + model=agent_data.model, + specialty=hive_specialty, + max_concurrent=agent_data.max_concurrent, + current_tasks=0, + agent_type="cli", + cli_config=cli_config + ) + + # Register with Hive coordinator (this will also register with CLI manager) + # For now, we'll register directly in the database + db_agent = ORMAgent( + id=hive_agent.id, + endpoint=hive_agent.endpoint, + model=hive_agent.model, + specialty=hive_agent.specialty.value, + max_concurrent=hive_agent.max_concurrent, + current_tasks=hive_agent.current_tasks, + agent_type=hive_agent.agent_type, + cli_config=hive_agent.cli_config + ) + + db.add(db_agent) + db.commit() + db.refresh(db_agent) + + # Register with CLI manager + cli_manager.create_cli_agent(agent_data.id, cli_config) + + return { + "status": "success", + "message": f"CLI agent {agent_data.id} registered successfully", + "agent_id": agent_data.id, + "endpoint": hive_agent.endpoint, + "health_check": health + } + + except HTTPException: + raise + except Exception as e: + db.rollback() + raise HTTPException(status_code=500, detail=f"Failed to register CLI agent: {str(e)}") + + +@router.get("/", response_model=List[CliAgentResponse]) +async def list_cli_agents(db: Session = Depends(get_db)): + """List all CLI agents""" + + cli_agents = db.query(ORMAgent).filter(ORMAgent.agent_type == "cli").all() + + return [ + CliAgentResponse( + id=agent.id, + endpoint=agent.endpoint, + model=agent.model, + specialization=agent.specialty, + agent_type=agent.agent_type, + cli_config=agent.cli_config or {}, + status="active", # TODO: Get actual status from CLI manager + max_concurrent=agent.max_concurrent, + current_tasks=agent.current_tasks + ) + for agent in cli_agents + ] + + +@router.get("/{agent_id}", response_model=CliAgentResponse) +async def get_cli_agent(agent_id: str, db: Session = Depends(get_db)): + """Get details of a specific CLI agent""" + + agent = db.query(ORMAgent).filter( + ORMAgent.id == agent_id, + ORMAgent.agent_type == "cli" + ).first() + + if not agent: + raise HTTPException(status_code=404, detail=f"CLI agent {agent_id} not found") + + return CliAgentResponse( + id=agent.id, + endpoint=agent.endpoint, + model=agent.model, + specialization=agent.specialty, + agent_type=agent.agent_type, + cli_config=agent.cli_config or {}, + status="active", # TODO: Get actual status from CLI manager + max_concurrent=agent.max_concurrent, + current_tasks=agent.current_tasks + ) + + +@router.post("/{agent_id}/health-check") +async def health_check_cli_agent(agent_id: str, db: Session = Depends(get_db)): + """Perform health check on a CLI agent""" + + agent = db.query(ORMAgent).filter( + ORMAgent.id == agent_id, + ORMAgent.agent_type == "cli" + ).first() + + if not agent: + raise HTTPException(status_code=404, detail=f"CLI agent {agent_id} not found") + + try: + cli_manager = get_cli_agent_manager() + cli_agent = cli_manager.get_cli_agent(agent_id) + + if not cli_agent: + raise HTTPException(status_code=404, detail=f"CLI agent {agent_id} not active in manager") + + health = await cli_agent.health_check() + return health + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}") + + +@router.get("/statistics/all") +async def get_all_cli_agent_statistics(): + """Get statistics for all CLI agents""" + + try: + cli_manager = get_cli_agent_manager() + stats = cli_manager.get_agent_statistics() + return stats + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get statistics: {str(e)}") + + +@router.delete("/{agent_id}") +async def unregister_cli_agent(agent_id: str, db: Session = Depends(get_db)): + """Unregister a CLI agent""" + + agent = db.query(ORMAgent).filter( + ORMAgent.id == agent_id, + ORMAgent.agent_type == "cli" + ).first() + + if not agent: + raise HTTPException(status_code=404, detail=f"CLI agent {agent_id} not found") + + try: + # Remove from CLI manager if it exists + cli_manager = get_cli_agent_manager() + cli_agent = cli_manager.get_cli_agent(agent_id) + if cli_agent: + await cli_agent.cleanup() + cli_manager.active_agents.pop(agent_id, None) + + # Remove from database + db.delete(agent) + db.commit() + + return { + "status": "success", + "message": f"CLI agent {agent_id} unregistered successfully" + } + + except Exception as e: + db.rollback() + raise HTTPException(status_code=500, detail=f"Failed to unregister CLI agent: {str(e)}") + + +@router.post("/register-predefined") +async def register_predefined_cli_agents(db: Session = Depends(get_db)): + """Register predefined CLI agents (walnut-gemini, ironwood-gemini)""" + + predefined_configs = [ + { + "id": "walnut-gemini", + "host": "walnut", + "node_version": "v22.14.0", + "model": "gemini-2.5-pro", + "specialization": "general_ai", + "max_concurrent": 2, + "agent_type": "gemini" + }, + { + "id": "ironwood-gemini", + "host": "ironwood", + "node_version": "v22.17.0", + "model": "gemini-2.5-pro", + "specialization": "reasoning", + "max_concurrent": 2, + "agent_type": "gemini" + } + ] + + results = [] + + for config in predefined_configs: + try: + # Check if already exists + existing = db.query(ORMAgent).filter(ORMAgent.id == config["id"]).first() + if existing: + results.append({ + "agent_id": config["id"], + "status": "already_exists", + "message": f"Agent {config['id']} already registered" + }) + continue + + # Register agent + agent_data = CliAgentRegistration(**config) + result = await register_cli_agent(agent_data, db) + results.append(result) + + except Exception as e: + results.append({ + "agent_id": config["id"], + "status": "failed", + "error": str(e) + }) + + return { + "status": "completed", + "results": results + } \ No newline at end of file diff --git a/backend/app/cli_agents/__init__.py b/backend/app/cli_agents/__init__.py new file mode 100644 index 00000000..5c31d4c5 --- /dev/null +++ b/backend/app/cli_agents/__init__.py @@ -0,0 +1 @@ +# CLI Agents Integration Module \ No newline at end of file diff --git a/backend/app/cli_agents/cli_agent_manager.py b/backend/app/cli_agents/cli_agent_manager.py new file mode 100644 index 00000000..2e43eb3b --- /dev/null +++ b/backend/app/cli_agents/cli_agent_manager.py @@ -0,0 +1,277 @@ +""" +CLI Agent Manager for Hive Backend +Integrates CCLI agents with the Hive coordinator system. +""" + +import asyncio +import logging +import sys +import os +from typing import Dict, Any, Optional +from dataclasses import asdict + +# Add CCLI source to path +ccli_path = os.path.join(os.path.dirname(__file__), '../../../../ccli/src') +sys.path.insert(0, ccli_path) + +from agents.gemini_cli_agent import GeminiCliAgent, GeminiCliConfig, TaskRequest as CliTaskRequest, TaskResult as CliTaskResult +from agents.cli_agent_factory import CliAgentFactory + + +class CliAgentManager: + """ + Manages CLI agents within the Hive backend system + + Provides a bridge between the Hive coordinator and CCLI agents, + handling lifecycle management, task execution, and health monitoring. + """ + + def __init__(self): + self.logger = logging.getLogger(__name__) + self.cli_factory = CliAgentFactory() + self.active_agents: Dict[str, GeminiCliAgent] = {} + self.is_initialized = False + + async def initialize(self): + """Initialize the CLI agent manager""" + try: + self.logger.info("Initializing CLI Agent Manager...") + + # Auto-register predefined CLI agents + await self._register_predefined_agents() + + self.is_initialized = True + self.logger.info("✅ CLI Agent Manager initialized") + + except Exception as e: + self.logger.error(f"❌ CLI Agent Manager initialization failed: {e}") + raise + + async def _register_predefined_agents(self): + """Register predefined CLI agents""" + predefined_agents = [ + "walnut-gemini", + "ironwood-gemini" + ] + + for agent_id in predefined_agents: + try: + agent = self.cli_factory.create_agent(agent_id) + self.active_agents[agent_id] = agent + + # Test connectivity + health = await agent.health_check() + if health.get('cli_healthy', False): + self.logger.info(f"✅ CLI agent {agent_id} registered and healthy") + else: + self.logger.warning(f"⚠️ CLI agent {agent_id} registered but not healthy") + + except Exception as e: + self.logger.error(f"❌ Failed to register CLI agent {agent_id}: {e}") + + def create_cli_agent(self, agent_id: str, config: Dict[str, Any]) -> GeminiCliAgent: + """Create a new CLI agent with custom configuration""" + try: + agent = self.cli_factory.create_agent(agent_id, config) + self.active_agents[agent_id] = agent + self.logger.info(f"Created CLI agent: {agent_id}") + return agent + except Exception as e: + self.logger.error(f"Failed to create CLI agent {agent_id}: {e}") + raise + + def get_cli_agent(self, agent_id: str) -> Optional[GeminiCliAgent]: + """Get a CLI agent by ID""" + return self.active_agents.get(agent_id) + + async def execute_cli_task(self, agent_id: str, hive_task: Any) -> Dict[str, Any]: + """ + Execute a Hive task on a CLI agent + + Args: + agent_id: ID of the CLI agent + hive_task: Hive Task object + + Returns: + Dictionary with execution results compatible with Hive format + """ + agent = self.get_cli_agent(agent_id) + if not agent: + raise ValueError(f"CLI agent {agent_id} not found") + + try: + # Convert Hive task to CLI task format + cli_task = self._convert_hive_task_to_cli(hive_task) + + # Execute on CLI agent + cli_result = await agent.execute_task(cli_task) + + # Convert CLI result back to Hive format + hive_result = self._convert_cli_result_to_hive(cli_result) + + self.logger.info(f"CLI task {cli_task.task_id} executed on {agent_id}: {cli_result.status.value}") + return hive_result + + except Exception as e: + self.logger.error(f"CLI task execution failed on {agent_id}: {e}") + return { + "error": str(e), + "status": "failed", + "agent_id": agent_id + } + + def _convert_hive_task_to_cli(self, hive_task: Any) -> CliTaskRequest: + """Convert Hive Task to CLI TaskRequest""" + # Build prompt from Hive task context + context = hive_task.context + prompt_parts = [] + + if 'objective' in context: + prompt_parts.append(f"Objective: {context['objective']}") + + if 'files' in context and context['files']: + prompt_parts.append(f"Related files: {', '.join(context['files'])}") + + if 'constraints' in context and context['constraints']: + prompt_parts.append(f"Constraints: {', '.join(context['constraints'])}") + + if 'requirements' in context and context['requirements']: + prompt_parts.append(f"Requirements: {', '.join(context['requirements'])}") + + # Join parts to create comprehensive prompt + prompt = "\n".join(prompt_parts) if prompt_parts else "General task execution" + + return CliTaskRequest( + prompt=prompt, + task_id=hive_task.id, + priority=hive_task.priority, + metadata={ + "hive_task_type": hive_task.type.value, + "hive_context": context + } + ) + + def _convert_cli_result_to_hive(self, cli_result: CliTaskResult) -> Dict[str, Any]: + """Convert CLI TaskResult to Hive result format""" + # Map CLI status to Hive format + status_mapping = { + "completed": "completed", + "failed": "failed", + "timeout": "failed", + "pending": "pending", + "running": "in_progress" + } + + hive_status = status_mapping.get(cli_result.status.value, "failed") + + result = { + "response": cli_result.response, + "status": hive_status, + "execution_time": cli_result.execution_time, + "agent_id": cli_result.agent_id, + "model": cli_result.model + } + + if cli_result.error: + result["error"] = cli_result.error + + if cli_result.metadata: + result["metadata"] = cli_result.metadata + + return result + + async def health_check_all_agents(self) -> Dict[str, Dict[str, Any]]: + """Perform health checks on all CLI agents""" + health_results = {} + + for agent_id, agent in self.active_agents.items(): + try: + health = await agent.health_check() + health_results[agent_id] = health + except Exception as e: + health_results[agent_id] = { + "agent_id": agent_id, + "error": str(e), + "healthy": False + } + + return health_results + + def get_agent_statistics(self) -> Dict[str, Dict[str, Any]]: + """Get statistics for all CLI agents""" + stats = {} + for agent_id, agent in self.active_agents.items(): + stats[agent_id] = agent.get_statistics() + return stats + + def get_active_agent_ids(self) -> list: + """Get list of active CLI agent IDs""" + return list(self.active_agents.keys()) + + def is_cli_agent(self, agent_id: str) -> bool: + """Check if an agent ID corresponds to a CLI agent""" + return agent_id in self.active_agents + + async def shutdown(self): + """Shutdown CLI agent manager and cleanup resources""" + self.logger.info("Shutting down CLI Agent Manager...") + + try: + # Cleanup all CLI agents + cleanup_tasks = [] + for agent_id, agent in list(self.active_agents.items()): + cleanup_tasks.append(agent.cleanup()) + + if cleanup_tasks: + await asyncio.gather(*cleanup_tasks, return_exceptions=True) + + # Cleanup factory + await self.cli_factory.cleanup_all() + + self.active_agents.clear() + self.is_initialized = False + + self.logger.info("✅ CLI Agent Manager shutdown complete") + + except Exception as e: + self.logger.error(f"❌ CLI Agent Manager shutdown error: {e}") + + def register_hive_agent_from_cli_config(self, agent_id: str, cli_config: Dict[str, Any]) -> Dict[str, Any]: + """ + Create agent registration data for Hive coordinator from CLI config + + Returns agent data compatible with Hive Agent dataclass + """ + # Map CLI specializations to Hive AgentTypes + specialization_mapping = { + "general_ai": "general_ai", + "reasoning": "reasoning", + "code_analysis": "profiler", # Map to existing Hive type + "documentation": "docs_writer", + "testing": "tester" + } + + cli_specialization = cli_config.get("specialization", "general_ai") + hive_specialty = specialization_mapping.get(cli_specialization, "general_ai") + + return { + "id": agent_id, + "endpoint": f"cli://{cli_config['host']}", + "model": cli_config.get("model", "gemini-2.5-pro"), + "specialty": hive_specialty, + "max_concurrent": cli_config.get("max_concurrent", 2), + "current_tasks": 0, + "agent_type": "cli", + "cli_config": cli_config + } + + +# Global CLI agent manager instance +_cli_agent_manager = None + +def get_cli_agent_manager() -> CliAgentManager: + """Get the global CLI agent manager instance""" + global _cli_agent_manager + if _cli_agent_manager is None: + _cli_agent_manager = CliAgentManager() + return _cli_agent_manager \ No newline at end of file diff --git a/backend/app/core/hive_coordinator.py b/backend/app/core/hive_coordinator.py index 322d9738..47c312be 100644 --- a/backend/app/core/hive_coordinator.py +++ b/backend/app/core/hive_coordinator.py @@ -14,6 +14,7 @@ from enum import Enum from sqlalchemy.orm import Session from ..models.agent import Agent as ORMAgent from ..core.database import SessionLocal +from ..cli_agents.cli_agent_manager import get_cli_agent_manager class AgentType(Enum): KERNEL_DEV = "kernel_dev" @@ -21,6 +22,10 @@ class AgentType(Enum): PROFILER = "profiler" DOCS_WRITER = "docs_writer" TESTER = "tester" + # CLI Agent Types + CLI_GEMINI = "cli_gemini" + GENERAL_AI = "general_ai" + REASONING = "reasoning" class TaskStatus(Enum): PENDING = "pending" @@ -36,6 +41,8 @@ class Agent: specialty: AgentType max_concurrent: int = 2 current_tasks: int = 0 + agent_type: str = "ollama" # "ollama" or "cli" + cli_config: Optional[Dict[str, Any]] = None @dataclass class Task: @@ -56,6 +63,7 @@ class HiveCoordinator: self.tasks: Dict[str, Task] = {} self.task_queue: List[Task] = [] self.is_initialized = False + self.cli_agent_manager = None # Agent prompts with compressed notation for efficient inter-agent communication self.agent_prompts = { @@ -87,7 +95,26 @@ FOCUS:[clear-accurate]→[explain+demonstrate+guide+solve]""", SPEC:[coverage+benchmarks+edge-cases+automation]→[comprehensive+automated] OUT:[tests+benchmarks+edge_cases+ci_config]→JSON[tests|benchmarks|edge_cases|ci_config] -FOCUS:[full-coverage]→[test+measure+handle+automate]""" +FOCUS:[full-coverage]→[test+measure+handle+automate]""", + + # CLI Agent Prompts + AgentType.CLI_GEMINI: """[AI-assistant]→[general-purpose+reasoning]|[Gemini-2.5-Pro] +SPEC:[comprehensive-analysis+structured-responses+context-aware]→[accurate+helpful+efficient] +OUT:[detailed-response+reasoning+recommendations]→JSON[response|analysis|recommendations] + +FOCUS:[intelligent-assistance]→[understand+analyze+explain+assist]""", + + AgentType.GENERAL_AI: """[general-AI]→[multi-domain+adaptive]|[broad-knowledge] +SPEC:[flexible-reasoning+cross-domain+context-adaptation]→[accurate+comprehensive] +OUT:[detailed-analysis+insights+recommendations]→JSON[response|insights|recommendations] + +FOCUS:[adaptive-intelligence]→[understand+reason+synthesize+recommend]""", + + AgentType.REASONING: """[reasoning-expert]→[logical-analysis+problem-solving]|[advanced-reasoning] +SPEC:[complex-reasoning+step-by-step+logical-deduction]→[clear+systematic+thorough] +OUT:[detailed-reasoning+step-by-step+conclusions]→JSON[reasoning|steps|conclusions] + +FOCUS:[logical-analysis]→[analyze+deduce+explain+conclude]""" } def add_agent(self, agent: Agent): @@ -99,11 +126,25 @@ FOCUS:[full-coverage]→[test+measure+handle+automate]""" model=agent.model, specialty=agent.specialty.value, max_concurrent=agent.max_concurrent, - current_tasks=agent.current_tasks + current_tasks=agent.current_tasks, + agent_type=agent.agent_type, + cli_config=agent.cli_config ) db.add(db_agent) db.commit() db.refresh(db_agent) + + # If it's a CLI agent, register with CLI agent manager + if agent.agent_type == "cli" and agent.cli_config: + try: + if not self.cli_agent_manager: + self.cli_agent_manager = get_cli_agent_manager() + + self.cli_agent_manager.create_cli_agent(agent.id, agent.cli_config) + print(f"Registered CLI agent {agent.id} ({agent.specialty.value}) and initialized CLI backend") + except Exception as e: + print(f"Warning: Failed to initialize CLI backend for {agent.id}: {e}") + print(f"Registered agent {agent.id} ({agent.specialty.value}) at {agent.endpoint} and persisted to DB") def create_task(self, task_type: AgentType, context: Dict, priority: int = 3) -> Task: @@ -140,7 +181,9 @@ FOCUS:[full-coverage]→[test+measure+handle+automate]""" model=db_agent.model, specialty=AgentType(db_agent.specialty), max_concurrent=db_agent.max_concurrent, - current_tasks=db_agent.current_tasks + current_tasks=db_agent.current_tasks, + agent_type=db_agent.agent_type or "ollama", + cli_config=db_agent.cli_config ) return None @@ -158,6 +201,46 @@ FOCUS:[full-coverage]→[test+measure+handle+automate]""" task.status = TaskStatus.IN_PROGRESS task.assigned_agent = agent.id + try: + # Route to appropriate executor based on agent type + if agent.agent_type == "cli": + result = await self._execute_cli_task(task, agent) + else: + result = await self._execute_ollama_task(task, agent) + + task.result = result + task.status = TaskStatus.COMPLETED + task.completed_at = time.time() + print(f"Task {task.id} completed by {agent.id}") + return result + + except Exception as e: + task.status = TaskStatus.FAILED + task.result = {"error": str(e)} + print(f"Task {task.id} failed: {e}") + return {"error": str(e)} + + finally: + with SessionLocal() as db: + db_agent = db.query(ORMAgent).filter(ORMAgent.id == agent.id).first() + if db_agent: + db_agent.current_tasks -= 1 + db.add(db_agent) + db.commit() + db.refresh(db_agent) + agent.current_tasks = db_agent.current_tasks # Update in-memory object + + async def _execute_cli_task(self, task: Task, agent: Agent) -> Dict: + """Execute task on CLI agent""" + if not self.cli_agent_manager: + self.cli_agent_manager = get_cli_agent_manager() + if not self.cli_agent_manager.is_initialized: + await self.cli_agent_manager.initialize() + + return await self.cli_agent_manager.execute_cli_task(agent.id, task) + + async def _execute_ollama_task(self, task: Task, agent: Agent) -> Dict: + """Execute task on Ollama agent (original implementation)""" prompt = self.agent_prompts[task.type] # Construct compressed context using terse notation @@ -179,48 +262,22 @@ Complete task → respond JSON format specified above.""" } } - try: - # Use the session initialized in the coordinator - session = getattr(self, 'session', None) - if not session: - raise Exception("HTTP session not initialized") - - async with session.post( - f"{agent.endpoint}/api/generate", - json=payload, - timeout=aiohttp.ClientTimeout(total=300) # 5 minute timeout for AI tasks - ) as response: - if response.status == 200: - result = await response.json() - task.result = result - task.status = TaskStatus.COMPLETED - task.completed_at = time.time() - print(f"Task {task.id} completed by {agent.id}") - return result - else: - error_text = await response.text() - raise Exception(f"HTTP {response.status}: {error_text}") + # Use the session initialized in the coordinator + session = getattr(self, 'session', None) + if not session: + raise Exception("HTTP session not initialized") - except asyncio.TimeoutError: - task.status = TaskStatus.FAILED - task.result = {"error": "Task execution timeout"} - print(f"Task {task.id} timed out on {agent.id}") - return {"error": "Task execution timeout"} - except Exception as e: - task.status = TaskStatus.FAILED - task.result = {"error": str(e)} - print(f"Task {task.id} failed: {e}") - return {"error": str(e)} - - finally: - with SessionLocal() as db: - db_agent = db.query(ORMAgent).filter(ORMAgent.id == agent.id).first() - if db_agent: - db_agent.current_tasks -= 1 - db.add(db_agent) - db.commit() - db.refresh(db_agent) - agent.current_tasks = db_agent.current_tasks # Update in-memory object + async with session.post( + f"{agent.endpoint}/api/generate", + json=payload, + timeout=aiohttp.ClientTimeout(total=300) # 5 minute timeout for AI tasks + ) as response: + if response.status == 200: + result = await response.json() + return result + else: + error_text = await response.text() + raise Exception(f"HTTP {response.status}: {error_text}") async def process_queue(self): """Process the task queue with available agents""" @@ -335,6 +392,16 @@ Complete task → respond JSON format specified above.""" ) ) + # Initialize CLI agent manager + try: + self.cli_agent_manager = get_cli_agent_manager() + await self.cli_agent_manager.initialize() + print("✅ CLI Agent Manager initialized") + except Exception as e: + print(f"⚠️ CLI Agent Manager initialization failed: {e}") + print("Continuing without CLI agents...") + self.cli_agent_manager = None + # Initialize task processing self.task_processor = None @@ -350,6 +417,8 @@ Complete task → respond JSON format specified above.""" # Clean up any partial initialization if hasattr(self, 'session') and self.session: await self.session.close() + if hasattr(self, 'cli_agent_manager') and self.cli_agent_manager: + await self.cli_agent_manager.shutdown() raise async def shutdown(self): @@ -365,6 +434,10 @@ Complete task → respond JSON format specified above.""" task.status = TaskStatus.FAILED task.result = {"error": "Coordinator shutdown"} + # Shutdown CLI agent manager + if hasattr(self, 'cli_agent_manager') and self.cli_agent_manager: + await self.cli_agent_manager.shutdown() + # Close HTTP session if hasattr(self, 'session') and self.session: await self.session.close() diff --git a/backend/app/models/agent.py b/backend/app/models/agent.py index 3d291ce6..f80c4216 100644 --- a/backend/app/models/agent.py +++ b/backend/app/models/agent.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, Integer, String, DateTime +from sqlalchemy import Column, Integer, String, DateTime, JSON from sqlalchemy.sql import func from ..core.database import Base @@ -11,5 +11,21 @@ class Agent(Base): specialty = Column(String, nullable=False) max_concurrent = Column(Integer, default=2) current_tasks = Column(Integer, default=0) + agent_type = Column(String, default="ollama") # "ollama" or "cli" + cli_config = Column(JSON, nullable=True) # CLI-specific configuration created_at = Column(DateTime(timezone=True), server_default=func.now()) - updated_at = Column(DateTime(timezone=True), onupdate=func.now()) \ No newline at end of file + updated_at = Column(DateTime(timezone=True), onupdate=func.now()) + + def to_dict(self): + return { + "id": self.id, + "endpoint": self.endpoint, + "model": self.model, + "specialty": self.specialty, + "max_concurrent": self.max_concurrent, + "current_tasks": self.current_tasks, + "agent_type": self.agent_type, + "cli_config": self.cli_config, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None + } \ No newline at end of file diff --git a/ccli b/ccli new file mode 160000 index 00000000..85bf1341 --- /dev/null +++ b/ccli @@ -0,0 +1 @@ +Subproject commit 85bf1341f390b853b652911d63d79a8ba512a7e9 diff --git a/mcp-server/src/hive-client.ts b/mcp-server/src/hive-client.ts index 06302755..f1e8bd42 100644 --- a/mcp-server/src/hive-client.ts +++ b/mcp-server/src/hive-client.ts @@ -21,6 +21,17 @@ export interface Agent { status: 'available' | 'busy' | 'offline'; current_tasks: number; max_concurrent: number; + agent_type?: 'ollama' | 'cli'; + cli_config?: { + host?: string; + node_version?: string; + model?: string; + specialization?: string; + max_concurrent?: number; + command_timeout?: number; + ssh_timeout?: number; + agent_type?: string; + }; } export interface Task { @@ -97,6 +108,47 @@ export class HiveClient { return response.data; } + // CLI Agent Management + async getCliAgents(): Promise { + const response = await this.api.get('/api/cli-agents/'); + return response.data || []; + } + + async registerCliAgent(agentData: { + id: string; + host: string; + node_version: string; + model?: string; + specialization?: string; + max_concurrent?: number; + agent_type?: string; + command_timeout?: number; + ssh_timeout?: number; + }): Promise<{ agent_id: string; endpoint: string; health_check?: any }> { + const response = await this.api.post('/api/cli-agents/register', agentData); + return response.data; + } + + async registerPredefinedCliAgents(): Promise<{ results: any[] }> { + const response = await this.api.post('/api/cli-agents/register-predefined'); + return response.data; + } + + async healthCheckCliAgent(agentId: string): Promise { + const response = await this.api.post(`/api/cli-agents/${agentId}/health-check`); + return response.data; + } + + async getCliAgentStatistics(): Promise { + const response = await this.api.get('/api/cli-agents/statistics/all'); + return response.data; + } + + async unregisterCliAgent(agentId: string): Promise<{ success: boolean }> { + const response = await this.api.delete(`/api/cli-agents/${agentId}`); + return response.data; + } + // Task Management async createTask(taskData: { type: string; diff --git a/mcp-server/src/hive-tools.ts b/mcp-server/src/hive-tools.ts index 5c72115a..5edda76d 100644 --- a/mcp-server/src/hive-tools.ts +++ b/mcp-server/src/hive-tools.ts @@ -40,7 +40,7 @@ export class HiveTools { model: { type: 'string', description: 'Model name (e.g., codellama:34b)' }, specialty: { type: 'string', - enum: ['kernel_dev', 'pytorch_dev', 'profiler', 'docs_writer', 'tester'], + enum: ['kernel_dev', 'pytorch_dev', 'profiler', 'docs_writer', 'tester', 'cli_gemini', 'general_ai', 'reasoning'], description: 'Agent specialization area' }, max_concurrent: { type: 'number', description: 'Maximum concurrent tasks', default: 2 }, @@ -48,6 +48,46 @@ export class HiveTools { required: ['id', 'endpoint', 'model', 'specialty'], }, }, + { + name: 'hive_register_cli_agent', + description: 'Register a new CLI-based AI agent (e.g., Gemini CLI) in the Hive cluster', + inputSchema: { + type: 'object', + properties: { + id: { type: 'string', description: 'Unique CLI agent identifier' }, + host: { type: 'string', description: 'SSH hostname (e.g., walnut, ironwood)' }, + node_version: { type: 'string', description: 'Node.js version (e.g., v22.14.0)' }, + model: { type: 'string', description: 'Model name (e.g., gemini-2.5-pro)', default: 'gemini-2.5-pro' }, + specialization: { + type: 'string', + enum: ['general_ai', 'reasoning', 'code_analysis', 'documentation', 'testing'], + description: 'CLI agent specialization', + default: 'general_ai' + }, + max_concurrent: { type: 'number', description: 'Maximum concurrent tasks', default: 2 }, + agent_type: { type: 'string', description: 'CLI agent type', default: 'gemini' }, + command_timeout: { type: 'number', description: 'Command timeout in seconds', default: 60 }, + ssh_timeout: { type: 'number', description: 'SSH timeout in seconds', default: 5 }, + }, + required: ['id', 'host', 'node_version'], + }, + }, + { + name: 'hive_get_cli_agents', + description: 'Get all registered CLI agents in the Hive cluster', + inputSchema: { + type: 'object', + properties: {}, + }, + }, + { + name: 'hive_register_predefined_cli_agents', + description: 'Register predefined CLI agents (walnut-gemini, ironwood-gemini) with verified configurations', + inputSchema: { + type: 'object', + properties: {}, + }, + }, // Task Management Tools { @@ -58,7 +98,7 @@ export class HiveTools { properties: { type: { type: 'string', - enum: ['kernel_dev', 'pytorch_dev', 'profiler', 'docs_writer', 'tester'], + enum: ['kernel_dev', 'pytorch_dev', 'profiler', 'docs_writer', 'tester', 'cli_gemini', 'general_ai', 'reasoning'], description: 'Type of development task' }, priority: { @@ -204,7 +244,7 @@ export class HiveTools { items: { type: 'object', properties: { - specialization: { type: 'string', enum: ['kernel_dev', 'pytorch_dev', 'profiler', 'docs_writer', 'tester'] }, + specialization: { type: 'string', enum: ['kernel_dev', 'pytorch_dev', 'profiler', 'docs_writer', 'tester', 'cli_gemini', 'general_ai', 'reasoning'] }, task_description: { type: 'string' }, dependencies: { type: 'array', items: { type: 'string' } }, priority: { type: 'number', minimum: 1, maximum: 5 } @@ -254,6 +294,15 @@ export class HiveTools { case 'hive_register_agent': return await this.registerAgent(args); + + case 'hive_register_cli_agent': + return await this.registerCliAgent(args); + + case 'hive_get_cli_agents': + return await this.getCliAgents(); + + case 'hive_register_predefined_cli_agents': + return await this.registerPredefinedCliAgents(); // Task Management case 'hive_create_task': @@ -313,20 +362,48 @@ export class HiveTools { private async getAgents() { const agents = await this.hiveClient.getAgents(); + + // Group agents by type + const ollamaAgents = agents.filter(agent => !agent.agent_type || agent.agent_type === 'ollama'); + const cliAgents = agents.filter(agent => agent.agent_type === 'cli'); + + const formatAgent = (agent: any) => { + const typeIcon = agent.agent_type === 'cli' ? '⚡' : '🤖'; + const typeLabel = agent.agent_type === 'cli' ? 'CLI' : 'API'; + + return `${typeIcon} **${agent.id}** (${agent.specialty}) [${typeLabel}]\n` + + ` • Model: ${agent.model}\n` + + ` • Endpoint: ${agent.endpoint}\n` + + ` • Status: ${agent.status}\n` + + ` • Tasks: ${agent.current_tasks}/${agent.max_concurrent}\n`; + }; + + let text = `📋 **Hive Cluster Agents** (${agents.length} total)\n\n`; + + if (ollamaAgents.length > 0) { + text += `🤖 **Ollama Agents** (${ollamaAgents.length}):\n`; + text += ollamaAgents.map(formatAgent).join('\n') + '\n'; + } + + if (cliAgents.length > 0) { + text += `⚡ **CLI Agents** (${cliAgents.length}):\n`; + text += cliAgents.map(formatAgent).join('\n') + '\n'; + } + + if (agents.length === 0) { + text += 'No agents registered yet.\n\n'; + text += '**Getting Started:**\n'; + text += '• Use `hive_register_agent` for Ollama agents\n'; + text += '• Use `hive_register_cli_agent` for CLI agents\n'; + text += '• Use `hive_register_predefined_cli_agents` for quick CLI setup\n'; + text += '• Use `hive_bring_online` for auto-discovery'; + } + return { content: [ { type: 'text', - text: `📋 Hive Cluster Agents (${agents.length} total):\n\n${agents.length > 0 - ? agents.map(agent => - `🤖 **${agent.id}** (${agent.specialty})\n` + - ` • Model: ${agent.model}\n` + - ` • Endpoint: ${agent.endpoint}\n` + - ` • Status: ${agent.status}\n` + - ` • Tasks: ${agent.current_tasks}/${agent.max_concurrent}\n` - ).join('\n') - : 'No agents registered yet. Use hive_register_agent to add agents to the cluster.' - }`, + text, }, ], }; @@ -662,4 +739,145 @@ export class HiveTools { }; } } + + private async registerCliAgent(args: any) { + try { + const result = await this.hiveClient.registerCliAgent(args); + + return { + content: [ + { + type: 'text', + text: `✅ **CLI Agent Registered Successfully!**\n\n` + + `⚡ **Agent Details:**\n` + + `• ID: **${args.id}**\n` + + `• Host: ${args.host}\n` + + `• Specialization: ${args.specialization}\n` + + `• Model: ${args.model}\n` + + `• Node Version: ${args.node_version}\n` + + `• Max Concurrent: ${args.max_concurrent || 2}\n` + + `• Endpoint: ${result.endpoint}\n\n` + + `🔍 **Health Check:**\n` + + `• SSH: ${result.health_check?.ssh_healthy ? '✅ Connected' : '❌ Failed'}\n` + + `• CLI: ${result.health_check?.cli_healthy ? '✅ Working' : '❌ Failed'}\n` + + `${result.health_check?.response_time ? `• Response Time: ${result.health_check.response_time.toFixed(2)}s\n` : ''}` + + `\n🎯 **Ready for Tasks!** The CLI agent is now available for distributed AI coordination.`, + }, + ], + }; + } catch (error) { + return { + content: [ + { + type: 'text', + text: `❌ **Failed to register CLI agent**\n\n` + + `Error: ${error instanceof Error ? error.message : String(error)}\n\n` + + `**Troubleshooting:**\n` + + `• Verify SSH connectivity to ${args.host}\n` + + `• Ensure Gemini CLI is installed and accessible\n` + + `• Check Node.js version ${args.node_version} is available\n` + + `• Confirm Hive backend is running and accessible`, + }, + ], + isError: true, + }; + } + } + + private async getCliAgents() { + try { + const cliAgents = await this.hiveClient.getCliAgents(); + + return { + content: [ + { + type: 'text', + text: `⚡ **CLI Agents** (${cliAgents.length} total)\n\n${cliAgents.length > 0 + ? cliAgents.map((agent: any) => + `⚡ **${agent.id}** (${agent.specialization})\n` + + ` • Model: ${agent.model}\n` + + ` • Host: ${agent.cli_config?.host || 'Unknown'}\n` + + ` • Node Version: ${agent.cli_config?.node_version || 'Unknown'}\n` + + ` • Status: ${agent.status}\n` + + ` • Tasks: ${agent.current_tasks}/${agent.max_concurrent}\n` + + ` • Endpoint: ${agent.endpoint}\n` + ).join('\n') + : 'No CLI agents registered yet.\n\n' + + '**Getting Started:**\n' + + '• Use `hive_register_cli_agent` to register individual CLI agents\n' + + '• Use `hive_register_predefined_cli_agents` to register walnut-gemini and ironwood-gemini automatically' + }`, + }, + ], + }; + } catch (error) { + return { + content: [ + { + type: 'text', + text: `❌ **Failed to get CLI agents**\n\n` + + `Error: ${error instanceof Error ? error.message : String(error)}\n\n` + + `Please ensure the Hive backend is running and accessible.`, + }, + ], + isError: true, + }; + } + } + + private async registerPredefinedCliAgents() { + try { + const result = await this.hiveClient.registerPredefinedCliAgents(); + + const successCount = result.results.filter((r: any) => r.status === 'success').length; + const existingCount = result.results.filter((r: any) => r.status === 'already_exists').length; + const failedCount = result.results.filter((r: any) => r.status === 'failed').length; + + let text = `⚡ **Predefined CLI Agents Registration Complete**\n\n`; + text += `📊 **Summary:**\n`; + text += `• Successfully registered: ${successCount}\n`; + text += `• Already existed: ${existingCount}\n`; + text += `• Failed: ${failedCount}\n\n`; + + text += `📋 **Results:**\n`; + for (const res of result.results) { + const statusIcon = res.status === 'success' ? '✅' : + res.status === 'already_exists' ? '📋' : '❌'; + text += `${statusIcon} **${res.agent_id}**: ${res.message || res.error || res.status}\n`; + } + + if (successCount > 0) { + text += `\n🎯 **Ready for Action!** The CLI agents are now available for:\n`; + text += `• General AI tasks (walnut-gemini)\n`; + text += `• Advanced reasoning (ironwood-gemini)\n`; + text += `• Mixed agent coordination\n`; + text += `• Hybrid local/cloud AI orchestration`; + } + + return { + content: [ + { + type: 'text', + text, + }, + ], + }; + } catch (error) { + return { + content: [ + { + type: 'text', + text: `❌ **Failed to register predefined CLI agents**\n\n` + + `Error: ${error instanceof Error ? error.message : String(error)}\n\n` + + `**Troubleshooting:**\n` + + `• Ensure WALNUT and IRONWOOD are accessible via SSH\n` + + `• Verify Gemini CLI is installed on both machines\n` + + `• Check that Node.js v22.14.0 (WALNUT) and v22.17.0 (IRONWOOD) are available\n` + + `• Confirm Hive backend is running with CLI agent support`, + }, + ], + isError: true, + }; + } + } } \ No newline at end of file