Files
hive/backend/ccli_src/agents/cli_agent_factory.py
anthonyrawlins 8b32d54e79 Copy CCLI source to backend for Docker builds
🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-10 12:46:52 +10:00

344 lines
12 KiB
Python

"""
CLI Agent Factory
Creates and manages CLI-based agents with predefined configurations.
"""
import logging
from typing import Dict, List, Optional, Any
from enum import Enum
from dataclasses import dataclass
from agents.gemini_cli_agent import GeminiCliAgent, GeminiCliConfig
class CliAgentType(Enum):
"""Supported CLI agent types"""
GEMINI = "gemini"
class Specialization(Enum):
"""Agent specializations"""
GENERAL_AI = "general_ai"
REASONING = "reasoning"
CODE_ANALYSIS = "code_analysis"
DOCUMENTATION = "documentation"
TESTING = "testing"
@dataclass
class CliAgentDefinition:
"""Definition for a CLI agent instance"""
agent_id: str
agent_type: CliAgentType
config: Dict[str, Any]
specialization: Specialization
description: str
enabled: bool = True
class CliAgentFactory:
"""
Factory for creating and managing CLI agents
Provides predefined configurations for known agent instances and
supports dynamic agent creation with custom configurations.
"""
# Predefined agent configurations based on verified environment testing
PREDEFINED_AGENTS = {
"walnut-gemini": CliAgentDefinition(
agent_id="walnut-gemini",
agent_type=CliAgentType.GEMINI,
config={
"host": "walnut",
"node_version": "v22.14.0",
"model": "gemini-2.5-pro",
"max_concurrent": 2,
"command_timeout": 60,
"ssh_timeout": 5
},
specialization=Specialization.GENERAL_AI,
description="Gemini CLI agent on WALNUT for general AI tasks",
enabled=True
),
"ironwood-gemini": CliAgentDefinition(
agent_id="ironwood-gemini",
agent_type=CliAgentType.GEMINI,
config={
"host": "ironwood",
"node_version": "v22.17.0",
"model": "gemini-2.5-pro",
"max_concurrent": 2,
"command_timeout": 60,
"ssh_timeout": 5
},
specialization=Specialization.REASONING,
description="Gemini CLI agent on IRONWOOD for reasoning tasks (faster)",
enabled=True
),
# Additional specialized configurations
"walnut-gemini-code": CliAgentDefinition(
agent_id="walnut-gemini-code",
agent_type=CliAgentType.GEMINI,
config={
"host": "walnut",
"node_version": "v22.14.0",
"model": "gemini-2.5-pro",
"max_concurrent": 1, # More conservative for code analysis
"command_timeout": 90, # Longer timeout for complex code analysis
"ssh_timeout": 5
},
specialization=Specialization.CODE_ANALYSIS,
description="Gemini CLI agent specialized for code analysis tasks",
enabled=False # Start disabled, enable when needed
),
"ironwood-gemini-docs": CliAgentDefinition(
agent_id="ironwood-gemini-docs",
agent_type=CliAgentType.GEMINI,
config={
"host": "ironwood",
"node_version": "v22.17.0",
"model": "gemini-2.5-pro",
"max_concurrent": 2,
"command_timeout": 45,
"ssh_timeout": 5
},
specialization=Specialization.DOCUMENTATION,
description="Gemini CLI agent for documentation generation",
enabled=False
)
}
def __init__(self):
self.logger = logging.getLogger(__name__)
self.active_agents: Dict[str, GeminiCliAgent] = {}
@classmethod
def get_predefined_agent_ids(cls) -> List[str]:
"""Get list of all predefined agent IDs"""
return list(cls.PREDEFINED_AGENTS.keys())
@classmethod
def get_enabled_agent_ids(cls) -> List[str]:
"""Get list of enabled predefined agent IDs"""
return [
agent_id for agent_id, definition in cls.PREDEFINED_AGENTS.items()
if definition.enabled
]
@classmethod
def get_agent_definition(cls, agent_id: str) -> Optional[CliAgentDefinition]:
"""Get predefined agent definition by ID"""
return cls.PREDEFINED_AGENTS.get(agent_id)
def create_agent(self, agent_id: str, custom_config: Optional[Dict[str, Any]] = None) -> GeminiCliAgent:
"""
Create a CLI agent instance
Args:
agent_id: ID of predefined agent or custom ID
custom_config: Optional custom configuration to override defaults
Returns:
GeminiCliAgent instance
Raises:
ValueError: If agent_id is unknown and no custom_config provided
"""
# Check if agent already exists
if agent_id in self.active_agents:
self.logger.warning(f"Agent {agent_id} already exists, returning existing instance")
return self.active_agents[agent_id]
# Get configuration
if agent_id in self.PREDEFINED_AGENTS:
definition = self.PREDEFINED_AGENTS[agent_id]
if not definition.enabled:
self.logger.warning(f"Agent {agent_id} is disabled but being created anyway")
config_dict = definition.config.copy()
specialization = definition.specialization.value
# Apply custom overrides
if custom_config:
config_dict.update(custom_config)
elif custom_config:
# Custom agent configuration
config_dict = custom_config
specialization = custom_config.get("specialization", "general_ai")
else:
raise ValueError(f"Unknown agent ID '{agent_id}' and no custom configuration provided")
# Determine agent type and create appropriate agent
agent_type = config_dict.get("agent_type", "gemini")
if agent_type == "gemini" or agent_type == CliAgentType.GEMINI:
agent = self._create_gemini_agent(agent_id, config_dict, specialization)
else:
raise ValueError(f"Unsupported agent type: {agent_type}")
# Store in active agents
self.active_agents[agent_id] = agent
self.logger.info(f"Created CLI agent: {agent_id} ({specialization})")
return agent
def _create_gemini_agent(self, agent_id: str, config_dict: Dict[str, Any], specialization: str) -> GeminiCliAgent:
"""Create a Gemini CLI agent with the given configuration"""
# Create GeminiCliConfig from dictionary
config = GeminiCliConfig(
host=config_dict["host"],
node_version=config_dict["node_version"],
model=config_dict.get("model", "gemini-2.5-pro"),
max_concurrent=config_dict.get("max_concurrent", 2),
command_timeout=config_dict.get("command_timeout", 60),
ssh_timeout=config_dict.get("ssh_timeout", 5),
node_path=config_dict.get("node_path"),
gemini_path=config_dict.get("gemini_path")
)
return GeminiCliAgent(config, specialization)
def get_agent(self, agent_id: str) -> Optional[GeminiCliAgent]:
"""Get an existing agent instance"""
return self.active_agents.get(agent_id)
def remove_agent(self, agent_id: str) -> bool:
"""Remove an agent instance"""
if agent_id in self.active_agents:
agent = self.active_agents.pop(agent_id)
# Note: Cleanup should be called by the caller if needed
self.logger.info(f"Removed CLI agent: {agent_id}")
return True
return False
def get_active_agents(self) -> Dict[str, GeminiCliAgent]:
"""Get all active agent instances"""
return self.active_agents.copy()
def get_agent_info(self, agent_id: str) -> Optional[Dict[str, Any]]:
"""Get information about an agent"""
# Check active agents
if agent_id in self.active_agents:
agent = self.active_agents[agent_id]
return {
"agent_id": agent_id,
"status": "active",
"host": agent.config.host,
"model": agent.config.model,
"specialization": agent.specialization,
"active_tasks": len(agent.active_tasks),
"max_concurrent": agent.config.max_concurrent,
"statistics": agent.get_statistics()
}
# Check predefined but not active
if agent_id in self.PREDEFINED_AGENTS:
definition = self.PREDEFINED_AGENTS[agent_id]
return {
"agent_id": agent_id,
"status": "available" if definition.enabled else "disabled",
"agent_type": definition.agent_type.value,
"specialization": definition.specialization.value,
"description": definition.description,
"config": definition.config
}
return None
def list_all_agents(self) -> Dict[str, Dict[str, Any]]:
"""List all agents (predefined and active)"""
all_agents = {}
# Add predefined agents
for agent_id in self.PREDEFINED_AGENTS:
all_agents[agent_id] = self.get_agent_info(agent_id)
# Add any custom active agents not in predefined list
for agent_id in self.active_agents:
if agent_id not in all_agents:
all_agents[agent_id] = self.get_agent_info(agent_id)
return all_agents
async def health_check_all(self) -> Dict[str, Dict[str, Any]]:
"""Perform health checks on all active agents"""
health_results = {}
for agent_id, agent in self.active_agents.items():
try:
health_results[agent_id] = await agent.health_check()
except Exception as e:
health_results[agent_id] = {
"agent_id": agent_id,
"error": str(e),
"healthy": False
}
return health_results
async def cleanup_all(self):
"""Clean up all active agents"""
for agent_id, agent in list(self.active_agents.items()):
try:
await agent.cleanup()
self.logger.info(f"Cleaned up agent: {agent_id}")
except Exception as e:
self.logger.error(f"Error cleaning up agent {agent_id}: {e}")
self.active_agents.clear()
@classmethod
def create_custom_agent_config(cls, host: str, node_version: str,
specialization: str = "general_ai",
**kwargs) -> Dict[str, Any]:
"""
Helper to create custom agent configuration
Args:
host: Target host for SSH connection
node_version: Node.js version (e.g., "v22.14.0")
specialization: Agent specialization
**kwargs: Additional configuration options
Returns:
Configuration dictionary for create_agent()
"""
config = {
"host": host,
"node_version": node_version,
"specialization": specialization,
"agent_type": "gemini",
"model": "gemini-2.5-pro",
"max_concurrent": 2,
"command_timeout": 60,
"ssh_timeout": 5
}
config.update(kwargs)
return config
# Module-level convenience functions
_default_factory = None
def get_default_factory() -> CliAgentFactory:
"""Get the default CLI agent factory instance"""
global _default_factory
if _default_factory is None:
_default_factory = CliAgentFactory()
return _default_factory
def create_agent(agent_id: str, custom_config: Optional[Dict[str, Any]] = None) -> GeminiCliAgent:
"""Convenience function to create an agent using the default factory"""
factory = get_default_factory()
return factory.create_agent(agent_id, custom_config)