Complete Hive platform functionality and expand cluster to 7 agents
Major Features Added: - Fix Socket.IO connectivity by updating Dockerfile to use socket_app - Resolve distributed workflows API to return arrays instead of errors - Expand agent coverage from 3 to 7 agents (added OAK and ROSEWOOD) - Create comprehensive systemd service for MCP server with auto-discovery - Add daemon mode with periodic agent discovery every 5 minutes - Implement comprehensive test suite with 100% pass rate Infrastructure Improvements: - Enhanced database connection handling with retry logic - Improved agent registration with persistent storage - Added proper error handling for distributed workflows endpoint - Created management scripts for service lifecycle operations Agent Cluster Expansion: - ACACIA: deepseek-r1:7b (kernel_dev) - WALNUT: starcoder2:15b (pytorch_dev) - IRONWOOD: deepseek-coder-v2 (profiler) - OAK: codellama:latest (docs_writer) - OAK-TESTER: deepseek-r1:latest (tester) - ROSEWOOD: deepseek-coder-v2:latest (kernel_dev) - ROSEWOOD-VISION: llama3.2-vision:11b (tester) System Status: All 7 agents healthy, Socket.IO operational, MCP server fully functional 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -1,23 +1,52 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from fastapi import APIRouter, Depends, HTTPException, Request
|
||||
from typing import List, Dict, Any
|
||||
from ..core.auth import get_current_user
|
||||
from ..core.hive_coordinator import Agent, AgentType
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
from app.core.database import SessionLocal
|
||||
from app.models.agent import Agent as ORMAgent
|
||||
|
||||
@router.get("/agents")
|
||||
async def get_agents(current_user: dict = Depends(get_current_user)):
|
||||
async def get_agents(request: Request, current_user: dict = Depends(get_current_user)):
|
||||
"""Get all registered agents"""
|
||||
with SessionLocal() as db:
|
||||
db_agents = db.query(ORMAgent).all()
|
||||
agents_list = []
|
||||
for db_agent in db_agents:
|
||||
agents_list.append({
|
||||
"id": db_agent.id,
|
||||
"endpoint": db_agent.endpoint,
|
||||
"model": db_agent.model,
|
||||
"specialty": db_agent.specialty,
|
||||
"max_concurrent": db_agent.max_concurrent,
|
||||
"current_tasks": db_agent.current_tasks
|
||||
})
|
||||
|
||||
return {
|
||||
"agents": [],
|
||||
"total": 0,
|
||||
"message": "Agents endpoint ready"
|
||||
"agents": agents_list,
|
||||
"total": len(agents_list),
|
||||
}
|
||||
|
||||
@router.post("/agents")
|
||||
async def register_agent(agent_data: Dict[str, Any], current_user: dict = Depends(get_current_user)):
|
||||
async def register_agent(agent_data: Dict[str, Any], request: Request, current_user: dict = Depends(get_current_user)):
|
||||
"""Register a new agent"""
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Agent registration endpoint ready",
|
||||
"agent_id": "placeholder"
|
||||
}
|
||||
hive_coordinator = request.app.state.hive_coordinator
|
||||
|
||||
try:
|
||||
agent = Agent(
|
||||
id=agent_data["id"],
|
||||
endpoint=agent_data["endpoint"],
|
||||
model=agent_data["model"],
|
||||
specialty=AgentType(agent_data["specialty"]),
|
||||
max_concurrent=agent_data.get("max_concurrent", 2),
|
||||
)
|
||||
hive_coordinator.add_agent(agent)
|
||||
return {
|
||||
"status": "success",
|
||||
"message": f"Agent {agent.id} registered successfully",
|
||||
"agent_id": agent.id
|
||||
}
|
||||
except (KeyError, ValueError) as e:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid agent data: {e}")
|
||||
499
backend/app/api/distributed_workflows.py
Normal file
499
backend/app/api/distributed_workflows.py
Normal file
@@ -0,0 +1,499 @@
|
||||
"""
|
||||
Distributed Workflow API Endpoints
|
||||
RESTful API for managing distributed development workflows across the cluster
|
||||
"""
|
||||
|
||||
from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends, Request
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import Dict, Any, List, Optional
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from ..core.distributed_coordinator import DistributedCoordinator, TaskType, TaskPriority
|
||||
from ..core.hive_coordinator import HiveCoordinator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/distributed", tags=["distributed-workflows"])
|
||||
|
||||
# Global coordinator instance
|
||||
distributed_coordinator: Optional[DistributedCoordinator] = None
|
||||
|
||||
class WorkflowRequest(BaseModel):
|
||||
"""Request model for workflow submission"""
|
||||
name: str = Field(..., description="Workflow name")
|
||||
description: str = Field(..., description="Workflow description")
|
||||
requirements: str = Field(..., description="Development requirements")
|
||||
context: str = Field(default="", description="Additional context")
|
||||
language: str = Field(default="python", description="Target programming language")
|
||||
test_types: List[str] = Field(default=["unit", "integration"], description="Types of tests to generate")
|
||||
optimization_targets: List[str] = Field(default=["performance", "memory"], description="Optimization focus areas")
|
||||
build_config: Dict[str, Any] = Field(default_factory=dict, description="Build configuration")
|
||||
priority: str = Field(default="normal", description="Workflow priority (critical, high, normal, low)")
|
||||
|
||||
class TaskStatus(BaseModel):
|
||||
"""Task status model"""
|
||||
id: str
|
||||
type: str
|
||||
status: str
|
||||
assigned_agent: Optional[str]
|
||||
execution_time: float
|
||||
result: Optional[Dict[str, Any]] = None
|
||||
|
||||
class WorkflowStatus(BaseModel):
|
||||
"""Workflow status response model"""
|
||||
workflow_id: str
|
||||
name: str
|
||||
total_tasks: int
|
||||
completed_tasks: int
|
||||
failed_tasks: int
|
||||
progress: float
|
||||
status: str
|
||||
created_at: datetime
|
||||
tasks: List[TaskStatus]
|
||||
|
||||
class ClusterStatus(BaseModel):
|
||||
"""Cluster status model"""
|
||||
total_agents: int
|
||||
healthy_agents: int
|
||||
total_capacity: int
|
||||
current_load: int
|
||||
utilization: float
|
||||
agents: List[Dict[str, Any]]
|
||||
|
||||
class PerformanceMetrics(BaseModel):
|
||||
"""Performance metrics model"""
|
||||
total_workflows: int
|
||||
completed_workflows: int
|
||||
failed_workflows: int
|
||||
average_completion_time: float
|
||||
throughput_per_hour: float
|
||||
agent_performance: Dict[str, Dict[str, float]]
|
||||
|
||||
async def get_coordinator() -> DistributedCoordinator:
|
||||
"""Dependency to get the distributed coordinator instance"""
|
||||
# Import here to avoid circular imports
|
||||
from ..main import distributed_coordinator as main_coordinator
|
||||
if main_coordinator is None:
|
||||
raise HTTPException(status_code=503, detail="Distributed coordinator not initialized")
|
||||
return main_coordinator
|
||||
|
||||
@router.on_event("startup")
|
||||
async def startup_distributed_coordinator():
|
||||
"""Initialize the distributed coordinator on startup"""
|
||||
global distributed_coordinator
|
||||
try:
|
||||
distributed_coordinator = DistributedCoordinator()
|
||||
await distributed_coordinator.start()
|
||||
logger.info("Distributed coordinator started successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start distributed coordinator: {e}")
|
||||
raise
|
||||
|
||||
@router.on_event("shutdown")
|
||||
async def shutdown_distributed_coordinator():
|
||||
"""Shutdown the distributed coordinator"""
|
||||
global distributed_coordinator
|
||||
if distributed_coordinator:
|
||||
await distributed_coordinator.stop()
|
||||
logger.info("Distributed coordinator stopped")
|
||||
|
||||
@router.post("/workflows", response_model=Dict[str, str])
|
||||
async def submit_workflow(
|
||||
workflow: WorkflowRequest,
|
||||
background_tasks: BackgroundTasks,
|
||||
coordinator: DistributedCoordinator = Depends(get_coordinator)
|
||||
):
|
||||
"""
|
||||
Submit a new development workflow for distributed execution
|
||||
|
||||
This endpoint creates a complete development workflow that includes:
|
||||
- Code generation
|
||||
- Code review
|
||||
- Testing
|
||||
- Compilation
|
||||
- Optimization
|
||||
|
||||
The workflow is distributed across the cluster based on agent capabilities.
|
||||
"""
|
||||
try:
|
||||
# Convert priority string to enum
|
||||
priority_map = {
|
||||
"critical": TaskPriority.CRITICAL,
|
||||
"high": TaskPriority.HIGH,
|
||||
"normal": TaskPriority.NORMAL,
|
||||
"low": TaskPriority.LOW
|
||||
}
|
||||
|
||||
workflow_dict = {
|
||||
"name": workflow.name,
|
||||
"description": workflow.description,
|
||||
"requirements": workflow.requirements,
|
||||
"context": workflow.context,
|
||||
"language": workflow.language,
|
||||
"test_types": workflow.test_types,
|
||||
"optimization_targets": workflow.optimization_targets,
|
||||
"build_config": workflow.build_config,
|
||||
"priority": priority_map.get(workflow.priority, TaskPriority.NORMAL)
|
||||
}
|
||||
|
||||
workflow_id = await coordinator.submit_workflow(workflow_dict)
|
||||
|
||||
return {
|
||||
"workflow_id": workflow_id,
|
||||
"message": "Workflow submitted successfully",
|
||||
"status": "accepted"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to submit workflow: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to submit workflow: {str(e)}")
|
||||
|
||||
@router.get("/workflows/{workflow_id}", response_model=WorkflowStatus)
|
||||
async def get_workflow_status(
|
||||
workflow_id: str,
|
||||
coordinator: DistributedCoordinator = Depends(get_coordinator)
|
||||
):
|
||||
"""
|
||||
Get detailed status of a specific workflow
|
||||
|
||||
Returns comprehensive information about workflow progress,
|
||||
individual task status, and execution metrics.
|
||||
"""
|
||||
try:
|
||||
status = await coordinator.get_workflow_status(workflow_id)
|
||||
|
||||
if "error" in status:
|
||||
raise HTTPException(status_code=404, detail=status["error"])
|
||||
|
||||
return WorkflowStatus(
|
||||
workflow_id=status["workflow_id"],
|
||||
name=f"Workflow {workflow_id}", # Could be enhanced with actual name storage
|
||||
total_tasks=status["total_tasks"],
|
||||
completed_tasks=status["completed_tasks"],
|
||||
failed_tasks=status["failed_tasks"],
|
||||
progress=status["progress"],
|
||||
status=status["status"],
|
||||
created_at=datetime.now(), # Could be enhanced with actual creation time
|
||||
tasks=[
|
||||
TaskStatus(
|
||||
id=task["id"],
|
||||
type=task["type"],
|
||||
status=task["status"],
|
||||
assigned_agent=task["assigned_agent"],
|
||||
execution_time=task["execution_time"],
|
||||
result=None # Could include task results if needed
|
||||
)
|
||||
for task in status["tasks"]
|
||||
]
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get workflow status: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get workflow status: {str(e)}")
|
||||
|
||||
@router.get("/cluster/status", response_model=ClusterStatus)
|
||||
async def get_cluster_status(
|
||||
coordinator: DistributedCoordinator = Depends(get_coordinator)
|
||||
):
|
||||
"""
|
||||
Get current cluster status and agent information
|
||||
|
||||
Returns real-time information about all agents including:
|
||||
- Health status
|
||||
- Current load
|
||||
- Performance metrics
|
||||
- Specializations
|
||||
"""
|
||||
try:
|
||||
agents_info = []
|
||||
total_capacity = 0
|
||||
current_load = 0
|
||||
healthy_agents = 0
|
||||
|
||||
for agent in coordinator.agents.values():
|
||||
if agent.health_status == "healthy":
|
||||
healthy_agents += 1
|
||||
|
||||
total_capacity += agent.max_concurrent
|
||||
current_load += agent.current_load
|
||||
|
||||
agents_info.append({
|
||||
"id": agent.id,
|
||||
"endpoint": agent.endpoint,
|
||||
"model": agent.model,
|
||||
"gpu_type": agent.gpu_type,
|
||||
"specializations": [spec.value for spec in agent.specializations],
|
||||
"max_concurrent": agent.max_concurrent,
|
||||
"current_load": agent.current_load,
|
||||
"utilization": (agent.current_load / agent.max_concurrent) * 100,
|
||||
"performance_score": round(agent.performance_score, 3),
|
||||
"last_response_time": round(agent.last_response_time, 2),
|
||||
"health_status": agent.health_status
|
||||
})
|
||||
|
||||
utilization = (current_load / total_capacity) * 100 if total_capacity > 0 else 0
|
||||
|
||||
return ClusterStatus(
|
||||
total_agents=len(coordinator.agents),
|
||||
healthy_agents=healthy_agents,
|
||||
total_capacity=total_capacity,
|
||||
current_load=current_load,
|
||||
utilization=round(utilization, 2),
|
||||
agents=agents_info
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get cluster status: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get cluster status: {str(e)}")
|
||||
|
||||
@router.get("/performance/metrics", response_model=PerformanceMetrics)
|
||||
async def get_performance_metrics(
|
||||
coordinator: DistributedCoordinator = Depends(get_coordinator)
|
||||
):
|
||||
"""
|
||||
Get comprehensive performance metrics for the distributed system
|
||||
|
||||
Returns metrics including:
|
||||
- Workflow completion rates
|
||||
- Agent performance statistics
|
||||
- Throughput measurements
|
||||
- System efficiency indicators
|
||||
"""
|
||||
try:
|
||||
# Calculate basic metrics
|
||||
total_workflows = len([task for task in coordinator.tasks.values()
|
||||
if task.type.value == "code_generation"])
|
||||
completed_workflows = len([task for task in coordinator.tasks.values()
|
||||
if task.type.value == "code_generation" and task.status == "completed"])
|
||||
failed_workflows = len([task for task in coordinator.tasks.values()
|
||||
if task.type.value == "code_generation" and task.status == "failed"])
|
||||
|
||||
# Calculate average completion time
|
||||
completed_tasks = [task for task in coordinator.tasks.values() if task.status == "completed"]
|
||||
average_completion_time = 0.0
|
||||
if completed_tasks:
|
||||
import time
|
||||
current_time = time.time()
|
||||
completion_times = [current_time - task.created_at for task in completed_tasks]
|
||||
average_completion_time = sum(completion_times) / len(completion_times)
|
||||
|
||||
# Calculate throughput (workflows per hour)
|
||||
import time
|
||||
current_time = time.time()
|
||||
recent_completions = [
|
||||
task for task in completed_tasks
|
||||
if current_time - task.created_at < 3600 # Last hour
|
||||
]
|
||||
throughput_per_hour = len(recent_completions)
|
||||
|
||||
# Agent performance metrics
|
||||
agent_performance = {}
|
||||
for agent_id, agent in coordinator.agents.items():
|
||||
performance_history = coordinator.performance_history.get(agent_id, [])
|
||||
agent_performance[agent_id] = {
|
||||
"avg_response_time": sum(performance_history) / len(performance_history) if performance_history else 0.0,
|
||||
"performance_score": agent.performance_score,
|
||||
"total_tasks": len(performance_history),
|
||||
"current_utilization": (agent.current_load / agent.max_concurrent) * 100
|
||||
}
|
||||
|
||||
return PerformanceMetrics(
|
||||
total_workflows=total_workflows,
|
||||
completed_workflows=completed_workflows,
|
||||
failed_workflows=failed_workflows,
|
||||
average_completion_time=round(average_completion_time, 2),
|
||||
throughput_per_hour=throughput_per_hour,
|
||||
agent_performance=agent_performance
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get performance metrics: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get performance metrics: {str(e)}")
|
||||
|
||||
@router.post("/workflows/{workflow_id}/cancel")
|
||||
async def cancel_workflow(
|
||||
workflow_id: str,
|
||||
coordinator: DistributedCoordinator = Depends(get_coordinator)
|
||||
):
|
||||
"""
|
||||
Cancel a running workflow and all its associated tasks
|
||||
"""
|
||||
try:
|
||||
# Find all tasks for this workflow
|
||||
workflow_tasks = [
|
||||
task for task in coordinator.tasks.values()
|
||||
if task.payload.get("workflow_id") == workflow_id
|
||||
]
|
||||
|
||||
if not workflow_tasks:
|
||||
raise HTTPException(status_code=404, detail="Workflow not found")
|
||||
|
||||
# Cancel pending and executing tasks
|
||||
cancelled_count = 0
|
||||
for task in workflow_tasks:
|
||||
if task.status in ["pending", "executing"]:
|
||||
task.status = "cancelled"
|
||||
cancelled_count += 1
|
||||
|
||||
return {
|
||||
"workflow_id": workflow_id,
|
||||
"message": f"Cancelled {cancelled_count} tasks",
|
||||
"status": "cancelled"
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to cancel workflow: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to cancel workflow: {str(e)}")
|
||||
|
||||
@router.post("/cluster/optimize")
|
||||
async def trigger_cluster_optimization(
|
||||
coordinator: DistributedCoordinator = Depends(get_coordinator)
|
||||
):
|
||||
"""
|
||||
Manually trigger cluster optimization
|
||||
|
||||
Forces immediate optimization of:
|
||||
- Agent parameter tuning
|
||||
- Load balancing adjustments
|
||||
- Performance metric updates
|
||||
"""
|
||||
try:
|
||||
# Trigger optimization methods
|
||||
await coordinator._optimize_agent_parameters()
|
||||
await coordinator._cleanup_completed_tasks()
|
||||
|
||||
return {
|
||||
"message": "Cluster optimization triggered successfully",
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to trigger optimization: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to trigger optimization: {str(e)}")
|
||||
|
||||
@router.get("/workflows", response_model=List[Dict[str, Any]])
|
||||
async def list_workflows(
|
||||
status: Optional[str] = None,
|
||||
limit: int = 50
|
||||
):
|
||||
"""
|
||||
List all workflows with optional filtering
|
||||
|
||||
Args:
|
||||
status: Filter by workflow status (pending, executing, completed, failed)
|
||||
limit: Maximum number of workflows to return
|
||||
"""
|
||||
try:
|
||||
# Get coordinator, return empty array if not available
|
||||
try:
|
||||
coordinator = await get_coordinator()
|
||||
except HTTPException:
|
||||
return []
|
||||
|
||||
# Group tasks by workflow_id
|
||||
workflows = {}
|
||||
for task in coordinator.tasks.values():
|
||||
workflow_id = task.payload.get("workflow_id")
|
||||
if workflow_id:
|
||||
if workflow_id not in workflows:
|
||||
workflows[workflow_id] = []
|
||||
workflows[workflow_id].append(task)
|
||||
|
||||
# Build workflow summaries
|
||||
workflow_list = []
|
||||
for workflow_id, tasks in workflows.items():
|
||||
total_tasks = len(tasks)
|
||||
completed_tasks = sum(1 for task in tasks if task.status == "completed")
|
||||
failed_tasks = sum(1 for task in tasks if task.status == "failed")
|
||||
|
||||
workflow_status = "completed" if completed_tasks == total_tasks else "in_progress"
|
||||
if failed_tasks > 0:
|
||||
workflow_status = "failed"
|
||||
|
||||
# Apply status filter
|
||||
if status and workflow_status != status:
|
||||
continue
|
||||
|
||||
workflow_list.append({
|
||||
"workflow_id": workflow_id,
|
||||
"total_tasks": total_tasks,
|
||||
"completed_tasks": completed_tasks,
|
||||
"failed_tasks": failed_tasks,
|
||||
"progress": (completed_tasks / total_tasks) * 100 if total_tasks > 0 else 0,
|
||||
"status": workflow_status,
|
||||
"created_at": min(task.created_at for task in tasks)
|
||||
})
|
||||
|
||||
# Sort by creation time (newest first) and apply limit
|
||||
workflow_list.sort(key=lambda x: x["created_at"], reverse=True)
|
||||
return workflow_list[:limit]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to list workflows: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to list workflows: {str(e)}")
|
||||
|
||||
@router.get("/agents/{agent_id}/tasks", response_model=List[Dict[str, Any]])
|
||||
async def get_agent_tasks(
|
||||
agent_id: str,
|
||||
coordinator: DistributedCoordinator = Depends(get_coordinator)
|
||||
):
|
||||
"""
|
||||
Get all tasks assigned to a specific agent
|
||||
"""
|
||||
try:
|
||||
if agent_id not in coordinator.agents:
|
||||
raise HTTPException(status_code=404, detail="Agent not found")
|
||||
|
||||
agent_tasks = [
|
||||
{
|
||||
"task_id": task.id,
|
||||
"type": task.type.value,
|
||||
"status": task.status,
|
||||
"priority": task.priority.value,
|
||||
"created_at": task.created_at,
|
||||
"workflow_id": task.payload.get("workflow_id")
|
||||
}
|
||||
for task in coordinator.tasks.values()
|
||||
if task.assigned_agent == agent_id
|
||||
]
|
||||
|
||||
return agent_tasks
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get agent tasks: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get agent tasks: {str(e)}")
|
||||
|
||||
# Health check endpoint for the distributed system
|
||||
@router.get("/health")
|
||||
async def health_check(coordinator: DistributedCoordinator = Depends(get_coordinator)):
|
||||
"""
|
||||
Health check for the distributed workflow system
|
||||
"""
|
||||
try:
|
||||
healthy_agents = sum(1 for agent in coordinator.agents.values()
|
||||
if agent.health_status == "healthy")
|
||||
total_agents = len(coordinator.agents)
|
||||
|
||||
system_health = "healthy" if healthy_agents > 0 else "unhealthy"
|
||||
|
||||
return {
|
||||
"status": system_health,
|
||||
"healthy_agents": healthy_agents,
|
||||
"total_agents": total_agents,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"status": "unhealthy",
|
||||
"error": str(e),
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
@@ -1,9 +1,45 @@
|
||||
from fastapi import APIRouter, Depends
|
||||
from ..core.auth import get_current_user
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from typing import Dict, Any, List
|
||||
from app.core.auth import get_current_user
|
||||
from app.services.project_service import ProjectService
|
||||
|
||||
router = APIRouter()
|
||||
project_service = ProjectService()
|
||||
|
||||
@router.get("/projects")
|
||||
async def get_projects(current_user: dict = Depends(get_current_user)):
|
||||
"""Get all projects"""
|
||||
return {"projects": [], "total": 0, "message": "Projects endpoint ready"}
|
||||
async def get_projects(current_user: dict = Depends(get_current_user)) -> List[Dict[str, Any]]:
|
||||
"""Get all projects from the local filesystem."""
|
||||
try:
|
||||
return project_service.get_all_projects()
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@router.get("/projects/{project_id}")
|
||||
async def get_project(project_id: str, current_user: dict = Depends(get_current_user)) -> Dict[str, Any]:
|
||||
"""Get a specific project by ID."""
|
||||
try:
|
||||
project = project_service.get_project_by_id(project_id)
|
||||
if not project:
|
||||
raise HTTPException(status_code=404, detail="Project not found")
|
||||
return project
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@router.get("/projects/{project_id}/metrics")
|
||||
async def get_project_metrics(project_id: str, current_user: dict = Depends(get_current_user)) -> Dict[str, Any]:
|
||||
"""Get detailed metrics for a project."""
|
||||
try:
|
||||
metrics = project_service.get_project_metrics(project_id)
|
||||
if not metrics:
|
||||
raise HTTPException(status_code=404, detail="Project not found")
|
||||
return metrics
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@router.get("/projects/{project_id}/tasks")
|
||||
async def get_project_tasks(project_id: str, current_user: dict = Depends(get_current_user)) -> List[Dict[str, Any]]:
|
||||
"""Get tasks for a project (from GitHub issues and TODOS.md)."""
|
||||
try:
|
||||
return project_service.get_project_tasks(project_id)
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
@@ -1,19 +1,82 @@
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import create_engine, event, text
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.pool import QueuePool
|
||||
from sqlalchemy.exc import DisconnectionError
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
|
||||
# Use PostgreSQL in production, SQLite for development
|
||||
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./hive.db")
|
||||
# Enhanced database configuration with connection pooling
|
||||
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:hive123@hive_postgres:5432/hive")
|
||||
|
||||
# Create engine with connection pooling and reliability features
|
||||
if "sqlite" in DATABASE_URL:
|
||||
engine = create_engine(
|
||||
DATABASE_URL,
|
||||
connect_args={"check_same_thread": False},
|
||||
pool_pre_ping=True
|
||||
)
|
||||
else:
|
||||
engine = create_engine(
|
||||
DATABASE_URL,
|
||||
poolclass=QueuePool,
|
||||
pool_size=10,
|
||||
max_overflow=20,
|
||||
pool_pre_ping=True,
|
||||
pool_recycle=3600,
|
||||
echo=False
|
||||
)
|
||||
|
||||
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {})
|
||||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
@event.listens_for(engine, "connect")
|
||||
def set_sqlite_pragma(dbapi_connection, connection_record):
|
||||
"""Set SQLite pragma for foreign key support"""
|
||||
if "sqlite" in DATABASE_URL:
|
||||
cursor = dbapi_connection.cursor()
|
||||
cursor.execute("PRAGMA foreign_keys=ON")
|
||||
cursor.close()
|
||||
|
||||
def get_db():
|
||||
"""Database session dependency with proper error handling"""
|
||||
db = SessionLocal()
|
||||
try:
|
||||
yield db
|
||||
except DisconnectionError as e:
|
||||
logging.error(f"Database disconnection error: {e}")
|
||||
db.rollback()
|
||||
raise
|
||||
except Exception as e:
|
||||
logging.error(f"Database error: {e}")
|
||||
db.rollback()
|
||||
raise
|
||||
finally:
|
||||
db.close()
|
||||
db.close()
|
||||
|
||||
def test_database_connection():
|
||||
"""Test database connectivity"""
|
||||
try:
|
||||
with engine.connect() as conn:
|
||||
result = conn.execute(text("SELECT 1"))
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.error(f"Database connection test failed: {e}")
|
||||
return False
|
||||
|
||||
def init_database_with_retry(max_retries=5, retry_delay=2):
|
||||
"""Initialize database with retry logic"""
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
Base.metadata.create_all(bind=engine)
|
||||
logging.info("Database initialized successfully")
|
||||
return True
|
||||
except Exception as e:
|
||||
if attempt == max_retries - 1:
|
||||
logging.error(f"Database initialization failed after {max_retries} attempts: {e}")
|
||||
raise
|
||||
logging.warning(f"Database initialization attempt {attempt + 1} failed: {e}")
|
||||
time.sleep(retry_delay ** attempt)
|
||||
return False
|
||||
@@ -11,6 +11,9 @@ import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Optional, Any
|
||||
from enum import Enum
|
||||
from sqlalchemy.orm import Session
|
||||
from ..models.agent import Agent as ORMAgent
|
||||
from ..core.database import SessionLocal
|
||||
|
||||
class AgentType(Enum):
|
||||
KERNEL_DEV = "kernel_dev"
|
||||
@@ -48,9 +51,8 @@ class Task:
|
||||
created_at: float = None
|
||||
completed_at: Optional[float] = None
|
||||
|
||||
class AIDevCoordinator:
|
||||
class HiveCoordinator:
|
||||
def __init__(self):
|
||||
self.agents: Dict[str, Agent] = {}
|
||||
self.tasks: Dict[str, Task] = {}
|
||||
self.task_queue: List[Task] = []
|
||||
self.is_initialized = False
|
||||
@@ -89,9 +91,20 @@ FOCUS:[full-coverage]→[test+measure+handle+automate]"""
|
||||
}
|
||||
|
||||
def add_agent(self, agent: Agent):
|
||||
"""Register a new agent"""
|
||||
self.agents[agent.id] = agent
|
||||
print(f"Registered agent {agent.id} ({agent.specialty.value}) at {agent.endpoint}")
|
||||
"""Register a new agent and persist to database"""
|
||||
with SessionLocal() as db:
|
||||
db_agent = ORMAgent(
|
||||
id=agent.id,
|
||||
endpoint=agent.endpoint,
|
||||
model=agent.model,
|
||||
specialty=agent.specialty.value,
|
||||
max_concurrent=agent.max_concurrent,
|
||||
current_tasks=agent.current_tasks
|
||||
)
|
||||
db.add(db_agent)
|
||||
db.commit()
|
||||
db.refresh(db_agent)
|
||||
print(f"Registered agent {agent.id} ({agent.specialty.value}) at {agent.endpoint} and persisted to DB")
|
||||
|
||||
def create_task(self, task_type: AgentType, context: Dict, priority: int = 3) -> Task:
|
||||
"""Create a new development task"""
|
||||
@@ -111,16 +124,37 @@ FOCUS:[full-coverage]→[test+measure+handle+automate]"""
|
||||
return task
|
||||
|
||||
def get_available_agent(self, task_type: AgentType) -> Optional[Agent]:
|
||||
"""Find an available agent for the task type"""
|
||||
available_agents = [
|
||||
agent for agent in self.agents.values()
|
||||
if agent.specialty == task_type and agent.current_tasks < agent.max_concurrent
|
||||
]
|
||||
return available_agents[0] if available_agents else None
|
||||
"""Find an available agent for the task type from the database"""
|
||||
with SessionLocal() as db:
|
||||
available_agents_orm = db.query(ORMAgent).filter(
|
||||
ORMAgent.specialty == task_type.value,
|
||||
ORMAgent.current_tasks < ORMAgent.max_concurrent
|
||||
).all()
|
||||
|
||||
if available_agents_orm:
|
||||
# Convert ORM agent to dataclass Agent
|
||||
db_agent = available_agents_orm[0]
|
||||
return Agent(
|
||||
id=db_agent.id,
|
||||
endpoint=db_agent.endpoint,
|
||||
model=db_agent.model,
|
||||
specialty=AgentType(db_agent.specialty),
|
||||
max_concurrent=db_agent.max_concurrent,
|
||||
current_tasks=db_agent.current_tasks
|
||||
)
|
||||
return None
|
||||
|
||||
async def execute_task(self, task: Task, agent: Agent) -> Dict:
|
||||
"""Execute a task on a specific agent"""
|
||||
agent.current_tasks += 1
|
||||
"""Execute a task on a specific agent with improved error handling"""
|
||||
with SessionLocal() as db:
|
||||
db_agent = db.query(ORMAgent).filter(ORMAgent.id == agent.id).first()
|
||||
if db_agent:
|
||||
db_agent.current_tasks += 1
|
||||
db.add(db_agent)
|
||||
db.commit()
|
||||
db.refresh(db_agent)
|
||||
agent.current_tasks = db_agent.current_tasks # Update in-memory object
|
||||
|
||||
task.status = TaskStatus.IN_PROGRESS
|
||||
task.assigned_agent = agent.id
|
||||
|
||||
@@ -146,18 +180,32 @@ Complete task → respond JSON format specified above."""
|
||||
}
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(f"{agent.endpoint}/api/generate", json=payload) as response:
|
||||
if response.status == 200:
|
||||
result = await response.json()
|
||||
task.result = result
|
||||
task.status = TaskStatus.COMPLETED
|
||||
task.completed_at = time.time()
|
||||
print(f"Task {task.id} completed by {agent.id}")
|
||||
return result
|
||||
else:
|
||||
raise Exception(f"HTTP {response.status}: {await response.text()}")
|
||||
# Use the session initialized in the coordinator
|
||||
session = getattr(self, 'session', None)
|
||||
if not session:
|
||||
raise Exception("HTTP session not initialized")
|
||||
|
||||
async with session.post(
|
||||
f"{agent.endpoint}/api/generate",
|
||||
json=payload,
|
||||
timeout=aiohttp.ClientTimeout(total=300) # 5 minute timeout for AI tasks
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
result = await response.json()
|
||||
task.result = result
|
||||
task.status = TaskStatus.COMPLETED
|
||||
task.completed_at = time.time()
|
||||
print(f"Task {task.id} completed by {agent.id}")
|
||||
return result
|
||||
else:
|
||||
error_text = await response.text()
|
||||
raise Exception(f"HTTP {response.status}: {error_text}")
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
task.status = TaskStatus.FAILED
|
||||
task.result = {"error": "Task execution timeout"}
|
||||
print(f"Task {task.id} timed out on {agent.id}")
|
||||
return {"error": "Task execution timeout"}
|
||||
except Exception as e:
|
||||
task.status = TaskStatus.FAILED
|
||||
task.result = {"error": str(e)}
|
||||
@@ -165,7 +213,14 @@ Complete task → respond JSON format specified above."""
|
||||
return {"error": str(e)}
|
||||
|
||||
finally:
|
||||
agent.current_tasks -= 1
|
||||
with SessionLocal() as db:
|
||||
db_agent = db.query(ORMAgent).filter(ORMAgent.id == agent.id).first()
|
||||
if db_agent:
|
||||
db_agent.current_tasks -= 1
|
||||
db.add(db_agent)
|
||||
db.commit()
|
||||
db.refresh(db_agent)
|
||||
agent.current_tasks = db_agent.current_tasks # Update in-memory object
|
||||
|
||||
async def process_queue(self):
|
||||
"""Process the task queue with available agents"""
|
||||
@@ -246,8 +301,10 @@ Complete task → respond JSON format specified above."""
|
||||
completion_rate = completed / total_tasks if total_tasks > 0 else 0
|
||||
|
||||
agent_vectors = {}
|
||||
for agent in self.agents.values():
|
||||
agent_vectors[agent.id] = f"[{agent.specialty.value}@{agent.current_tasks}/{agent.max_concurrent}]"
|
||||
with SessionLocal() as db:
|
||||
db_agents = db.query(ORMAgent).all()
|
||||
for agent in db_agents:
|
||||
agent_vectors[agent.id] = f"[{agent.specialty}@{agent.current_tasks}/{agent.max_concurrent}]"
|
||||
|
||||
return {
|
||||
"status_vector": status_vector,
|
||||
@@ -259,26 +316,94 @@ Complete task → respond JSON format specified above."""
|
||||
"failed": failed,
|
||||
"in_progress": in_progress,
|
||||
"pending": total_tasks - completed - failed - in_progress,
|
||||
"agents": {agent.id: agent.current_tasks for agent in self.agents.values()}
|
||||
"agents": {agent.id: agent.current_tasks for agent in db_agents}
|
||||
}
|
||||
|
||||
async def initialize(self):
|
||||
"""Initialize the coordinator"""
|
||||
print("Initializing Hive Coordinator...")
|
||||
self.is_initialized = True
|
||||
print("✅ Hive Coordinator initialized")
|
||||
"""Initialize the coordinator with proper error handling"""
|
||||
try:
|
||||
print("Initializing Hive Coordinator...")
|
||||
|
||||
# Initialize HTTP client session with timeouts
|
||||
self.session = aiohttp.ClientSession(
|
||||
timeout=aiohttp.ClientTimeout(total=30, connect=10),
|
||||
connector=aiohttp.TCPConnector(
|
||||
limit=100,
|
||||
limit_per_host=30,
|
||||
ttl_dns_cache=300,
|
||||
use_dns_cache=True
|
||||
)
|
||||
)
|
||||
|
||||
# Initialize task processing
|
||||
self.task_processor = None
|
||||
|
||||
# Test connectivity to any configured agents
|
||||
await self._test_initial_connectivity()
|
||||
|
||||
self.is_initialized = True
|
||||
print("✅ Hive Coordinator initialized")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Coordinator initialization failed: {e}")
|
||||
self.is_initialized = False
|
||||
# Clean up any partial initialization
|
||||
if hasattr(self, 'session') and self.session:
|
||||
await self.session.close()
|
||||
raise
|
||||
|
||||
async def shutdown(self):
|
||||
"""Shutdown the coordinator"""
|
||||
"""Enhanced shutdown with proper cleanup"""
|
||||
print("Shutting down Hive Coordinator...")
|
||||
self.is_initialized = False
|
||||
print("✅ Hive Coordinator shutdown")
|
||||
|
||||
try:
|
||||
# Cancel any running tasks
|
||||
running_tasks = [task for task in self.tasks.values() if task.status == TaskStatus.IN_PROGRESS]
|
||||
if running_tasks:
|
||||
print(f"Canceling {len(running_tasks)} running tasks...")
|
||||
for task in running_tasks:
|
||||
task.status = TaskStatus.FAILED
|
||||
task.result = {"error": "Coordinator shutdown"}
|
||||
|
||||
# Close HTTP session
|
||||
if hasattr(self, 'session') and self.session:
|
||||
await self.session.close()
|
||||
|
||||
# Stop task processor
|
||||
if hasattr(self, 'task_processor') and self.task_processor:
|
||||
self.task_processor.cancel()
|
||||
try:
|
||||
await self.task_processor
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
self.is_initialized = False
|
||||
print("✅ Hive Coordinator shutdown")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Shutdown error: {e}")
|
||||
self.is_initialized = False
|
||||
|
||||
async def _test_initial_connectivity(self):
|
||||
"""Test initial connectivity to prevent startup issues"""
|
||||
# This would test any pre-configured agents
|
||||
# For now, just ensure we can make HTTP requests
|
||||
try:
|
||||
async with self.session.get('http://httpbin.org/get', timeout=5) as response:
|
||||
if response.status == 200:
|
||||
print("✅ HTTP connectivity test passed")
|
||||
except Exception as e:
|
||||
print(f"⚠️ HTTP connectivity test failed: {e}")
|
||||
# Don't fail initialization for this, just warn
|
||||
|
||||
async def get_health_status(self):
|
||||
"""Get health status"""
|
||||
with SessionLocal() as db:
|
||||
db_agents = db.query(ORMAgent).all()
|
||||
agents_status = {agent.id: "available" for agent in db_agents}
|
||||
return {
|
||||
"status": "healthy" if self.is_initialized else "unhealthy",
|
||||
"agents": {agent.id: "available" for agent in self.agents.values()},
|
||||
"agents": agents_status,
|
||||
"tasks": {
|
||||
"pending": len([t for t in self.tasks.values() if t.status == TaskStatus.PENDING]),
|
||||
"running": len([t for t in self.tasks.values() if t.status == TaskStatus.IN_PROGRESS]),
|
||||
@@ -289,6 +414,12 @@ Complete task → respond JSON format specified above."""
|
||||
|
||||
async def get_comprehensive_status(self):
|
||||
"""Get comprehensive system status"""
|
||||
with SessionLocal() as db:
|
||||
db_agents = db.query(ORMAgent).all()
|
||||
total_agents = len(db_agents)
|
||||
available_agents = len([a for a in db_agents if a.current_tasks < a.max_concurrent])
|
||||
busy_agents = len([a for a in db_agents if a.current_tasks >= a.max_concurrent])
|
||||
|
||||
return {
|
||||
"system": {
|
||||
"status": "operational" if self.is_initialized else "initializing",
|
||||
@@ -296,9 +427,19 @@ Complete task → respond JSON format specified above."""
|
||||
"version": "1.0.0"
|
||||
},
|
||||
"agents": {
|
||||
"total": len(self.agents),
|
||||
"available": len([a for a in self.agents.values() if a.current_tasks < a.max_concurrent]),
|
||||
"busy": len([a for a in self.agents.values() if a.current_tasks >= a.max_concurrent])
|
||||
"total": total_agents,
|
||||
"available": available_agents,
|
||||
"busy": busy_agents,
|
||||
"details": [
|
||||
{
|
||||
"id": agent.id,
|
||||
"endpoint": agent.endpoint,
|
||||
"model": agent.model,
|
||||
"specialty": agent.specialty,
|
||||
"max_concurrent": agent.max_concurrent,
|
||||
"current_tasks": agent.current_tasks
|
||||
} for agent in db_agents
|
||||
]
|
||||
},
|
||||
"tasks": {
|
||||
"total": len(self.tasks),
|
||||
@@ -313,9 +454,14 @@ Complete task → respond JSON format specified above."""
|
||||
"""Get Prometheus formatted metrics"""
|
||||
metrics = []
|
||||
|
||||
with SessionLocal() as db:
|
||||
db_agents = db.query(ORMAgent).all()
|
||||
total_agents = len(db_agents)
|
||||
available_agents = len([a for a in db_agents if a.current_tasks < a.max_concurrent])
|
||||
|
||||
# Agent metrics
|
||||
metrics.append(f"hive_agents_total {len(self.agents)}")
|
||||
metrics.append(f"hive_agents_available {len([a for a in self.agents.values() if a.current_tasks < a.max_concurrent])}")
|
||||
metrics.append(f"hive_agents_total {total_agents}")
|
||||
metrics.append(f"hive_agents_available {available_agents}")
|
||||
|
||||
# Task metrics
|
||||
metrics.append(f"hive_tasks_total {len(self.tasks)}")
|
||||
@@ -329,7 +475,7 @@ Complete task → respond JSON format specified above."""
|
||||
# Example usage and testing functions
|
||||
async def demo_coordination():
|
||||
"""Demonstrate the coordination system"""
|
||||
coordinator = AIDevCoordinator()
|
||||
coordinator = HiveCoordinator()
|
||||
|
||||
# Add example agents (you'll replace with your actual endpoints)
|
||||
coordinator.add_agent(Agent(
|
||||
|
||||
Reference in New Issue
Block a user