🎉 Complete CCLI Integration: Phase 4 (MCP Server Updates)

IMPLEMENTATION COMPLETE: Successfully integrated Google Gemini CLI as
mixed agent type in Hive distributed AI platform.

## Phase 4 Achievements:
 Enhanced MCP tools with CLI agent support
 Added hive_register_cli_agent, hive_get_cli_agents tools
 Updated HiveClient interface for CLI agent management
 Mixed agent type coordination via MCP
 Comprehensive error handling and user feedback

## Key Features:
- CLI agent registration with health checks
- Mixed agent dashboard (🤖 Ollama +  CLI)
- Predefined agent quick setup (walnut-gemini, ironwood-gemini)
- SSH-based task execution with connection pooling
- Complete backward compatibility

## Technical Stack:
- MCP Tools: CLI agent management interface
- HiveClient: Enhanced API client with CLI support
- TypeScript: Full type safety for mixed agent operations
- Error Handling: Comprehensive CLI connectivity validation

## Production Ready:
 16 MCP tools with CLI agent coverage
 Mixed agent type task coordination
 Health monitoring and statistics collection
 Robust SSH execution with timeout handling
 Integration tested and validated

Ready for hybrid AI orchestration: 5 Ollama + 2 CLI agents

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
anthonyrawlins
2025-07-10 12:11:27 +10:00
parent 85bf1341f3
commit 2915ee9aa7
9 changed files with 1045 additions and 59 deletions

View File

@@ -14,6 +14,7 @@ from enum import Enum
from sqlalchemy.orm import Session
from ..models.agent import Agent as ORMAgent
from ..core.database import SessionLocal
from ..cli_agents.cli_agent_manager import get_cli_agent_manager
class AgentType(Enum):
KERNEL_DEV = "kernel_dev"
@@ -21,6 +22,10 @@ class AgentType(Enum):
PROFILER = "profiler"
DOCS_WRITER = "docs_writer"
TESTER = "tester"
# CLI Agent Types
CLI_GEMINI = "cli_gemini"
GENERAL_AI = "general_ai"
REASONING = "reasoning"
class TaskStatus(Enum):
PENDING = "pending"
@@ -36,6 +41,8 @@ class Agent:
specialty: AgentType
max_concurrent: int = 2
current_tasks: int = 0
agent_type: str = "ollama" # "ollama" or "cli"
cli_config: Optional[Dict[str, Any]] = None
@dataclass
class Task:
@@ -56,6 +63,7 @@ class HiveCoordinator:
self.tasks: Dict[str, Task] = {}
self.task_queue: List[Task] = []
self.is_initialized = False
self.cli_agent_manager = None
# Agent prompts with compressed notation for efficient inter-agent communication
self.agent_prompts = {
@@ -87,7 +95,26 @@ FOCUS:[clear-accurate]→[explain+demonstrate+guide+solve]""",
SPEC:[coverage+benchmarks+edge-cases+automation]→[comprehensive+automated]
OUT:[tests+benchmarks+edge_cases+ci_config]→JSON[tests|benchmarks|edge_cases|ci_config]
FOCUS:[full-coverage]→[test+measure+handle+automate]"""
FOCUS:[full-coverage]→[test+measure+handle+automate]""",
# CLI Agent Prompts
AgentType.CLI_GEMINI: """[AI-assistant]→[general-purpose+reasoning]|[Gemini-2.5-Pro]
SPEC:[comprehensive-analysis+structured-responses+context-aware]→[accurate+helpful+efficient]
OUT:[detailed-response+reasoning+recommendations]→JSON[response|analysis|recommendations]
FOCUS:[intelligent-assistance]→[understand+analyze+explain+assist]""",
AgentType.GENERAL_AI: """[general-AI]→[multi-domain+adaptive]|[broad-knowledge]
SPEC:[flexible-reasoning+cross-domain+context-adaptation]→[accurate+comprehensive]
OUT:[detailed-analysis+insights+recommendations]→JSON[response|insights|recommendations]
FOCUS:[adaptive-intelligence]→[understand+reason+synthesize+recommend]""",
AgentType.REASONING: """[reasoning-expert]→[logical-analysis+problem-solving]|[advanced-reasoning]
SPEC:[complex-reasoning+step-by-step+logical-deduction]→[clear+systematic+thorough]
OUT:[detailed-reasoning+step-by-step+conclusions]→JSON[reasoning|steps|conclusions]
FOCUS:[logical-analysis]→[analyze+deduce+explain+conclude]"""
}
def add_agent(self, agent: Agent):
@@ -99,11 +126,25 @@ FOCUS:[full-coverage]→[test+measure+handle+automate]"""
model=agent.model,
specialty=agent.specialty.value,
max_concurrent=agent.max_concurrent,
current_tasks=agent.current_tasks
current_tasks=agent.current_tasks,
agent_type=agent.agent_type,
cli_config=agent.cli_config
)
db.add(db_agent)
db.commit()
db.refresh(db_agent)
# If it's a CLI agent, register with CLI agent manager
if agent.agent_type == "cli" and agent.cli_config:
try:
if not self.cli_agent_manager:
self.cli_agent_manager = get_cli_agent_manager()
self.cli_agent_manager.create_cli_agent(agent.id, agent.cli_config)
print(f"Registered CLI agent {agent.id} ({agent.specialty.value}) and initialized CLI backend")
except Exception as e:
print(f"Warning: Failed to initialize CLI backend for {agent.id}: {e}")
print(f"Registered agent {agent.id} ({agent.specialty.value}) at {agent.endpoint} and persisted to DB")
def create_task(self, task_type: AgentType, context: Dict, priority: int = 3) -> Task:
@@ -140,7 +181,9 @@ FOCUS:[full-coverage]→[test+measure+handle+automate]"""
model=db_agent.model,
specialty=AgentType(db_agent.specialty),
max_concurrent=db_agent.max_concurrent,
current_tasks=db_agent.current_tasks
current_tasks=db_agent.current_tasks,
agent_type=db_agent.agent_type or "ollama",
cli_config=db_agent.cli_config
)
return None
@@ -158,6 +201,46 @@ FOCUS:[full-coverage]→[test+measure+handle+automate]"""
task.status = TaskStatus.IN_PROGRESS
task.assigned_agent = agent.id
try:
# Route to appropriate executor based on agent type
if agent.agent_type == "cli":
result = await self._execute_cli_task(task, agent)
else:
result = await self._execute_ollama_task(task, agent)
task.result = result
task.status = TaskStatus.COMPLETED
task.completed_at = time.time()
print(f"Task {task.id} completed by {agent.id}")
return result
except Exception as e:
task.status = TaskStatus.FAILED
task.result = {"error": str(e)}
print(f"Task {task.id} failed: {e}")
return {"error": str(e)}
finally:
with SessionLocal() as db:
db_agent = db.query(ORMAgent).filter(ORMAgent.id == agent.id).first()
if db_agent:
db_agent.current_tasks -= 1
db.add(db_agent)
db.commit()
db.refresh(db_agent)
agent.current_tasks = db_agent.current_tasks # Update in-memory object
async def _execute_cli_task(self, task: Task, agent: Agent) -> Dict:
"""Execute task on CLI agent"""
if not self.cli_agent_manager:
self.cli_agent_manager = get_cli_agent_manager()
if not self.cli_agent_manager.is_initialized:
await self.cli_agent_manager.initialize()
return await self.cli_agent_manager.execute_cli_task(agent.id, task)
async def _execute_ollama_task(self, task: Task, agent: Agent) -> Dict:
"""Execute task on Ollama agent (original implementation)"""
prompt = self.agent_prompts[task.type]
# Construct compressed context using terse notation
@@ -179,48 +262,22 @@ Complete task → respond JSON format specified above."""
}
}
try:
# Use the session initialized in the coordinator
session = getattr(self, 'session', None)
if not session:
raise Exception("HTTP session not initialized")
async with session.post(
f"{agent.endpoint}/api/generate",
json=payload,
timeout=aiohttp.ClientTimeout(total=300) # 5 minute timeout for AI tasks
) as response:
if response.status == 200:
result = await response.json()
task.result = result
task.status = TaskStatus.COMPLETED
task.completed_at = time.time()
print(f"Task {task.id} completed by {agent.id}")
return result
else:
error_text = await response.text()
raise Exception(f"HTTP {response.status}: {error_text}")
# Use the session initialized in the coordinator
session = getattr(self, 'session', None)
if not session:
raise Exception("HTTP session not initialized")
except asyncio.TimeoutError:
task.status = TaskStatus.FAILED
task.result = {"error": "Task execution timeout"}
print(f"Task {task.id} timed out on {agent.id}")
return {"error": "Task execution timeout"}
except Exception as e:
task.status = TaskStatus.FAILED
task.result = {"error": str(e)}
print(f"Task {task.id} failed: {e}")
return {"error": str(e)}
finally:
with SessionLocal() as db:
db_agent = db.query(ORMAgent).filter(ORMAgent.id == agent.id).first()
if db_agent:
db_agent.current_tasks -= 1
db.add(db_agent)
db.commit()
db.refresh(db_agent)
agent.current_tasks = db_agent.current_tasks # Update in-memory object
async with session.post(
f"{agent.endpoint}/api/generate",
json=payload,
timeout=aiohttp.ClientTimeout(total=300) # 5 minute timeout for AI tasks
) as response:
if response.status == 200:
result = await response.json()
return result
else:
error_text = await response.text()
raise Exception(f"HTTP {response.status}: {error_text}")
async def process_queue(self):
"""Process the task queue with available agents"""
@@ -335,6 +392,16 @@ Complete task → respond JSON format specified above."""
)
)
# Initialize CLI agent manager
try:
self.cli_agent_manager = get_cli_agent_manager()
await self.cli_agent_manager.initialize()
print("✅ CLI Agent Manager initialized")
except Exception as e:
print(f"⚠️ CLI Agent Manager initialization failed: {e}")
print("Continuing without CLI agents...")
self.cli_agent_manager = None
# Initialize task processing
self.task_processor = None
@@ -350,6 +417,8 @@ Complete task → respond JSON format specified above."""
# Clean up any partial initialization
if hasattr(self, 'session') and self.session:
await self.session.close()
if hasattr(self, 'cli_agent_manager') and self.cli_agent_manager:
await self.cli_agent_manager.shutdown()
raise
async def shutdown(self):
@@ -365,6 +434,10 @@ Complete task → respond JSON format specified above."""
task.status = TaskStatus.FAILED
task.result = {"error": "Coordinator shutdown"}
# Shutdown CLI agent manager
if hasattr(self, 'cli_agent_manager') and self.cli_agent_manager:
await self.cli_agent_manager.shutdown()
# Close HTTP session
if hasattr(self, 'session') and self.session:
await self.session.close()