 fc0eec91ef
			
		
	
	fc0eec91ef
	
	
	
		
			
			Major Features Added: - Fix Socket.IO connectivity by updating Dockerfile to use socket_app - Resolve distributed workflows API to return arrays instead of errors - Expand agent coverage from 3 to 7 agents (added OAK and ROSEWOOD) - Create comprehensive systemd service for MCP server with auto-discovery - Add daemon mode with periodic agent discovery every 5 minutes - Implement comprehensive test suite with 100% pass rate Infrastructure Improvements: - Enhanced database connection handling with retry logic - Improved agent registration with persistent storage - Added proper error handling for distributed workflows endpoint - Created management scripts for service lifecycle operations Agent Cluster Expansion: - ACACIA: deepseek-r1:7b (kernel_dev) - WALNUT: starcoder2:15b (pytorch_dev) - IRONWOOD: deepseek-coder-v2 (profiler) - OAK: codellama:latest (docs_writer) - OAK-TESTER: deepseek-r1:latest (tester) - ROSEWOOD: deepseek-coder-v2:latest (kernel_dev) - ROSEWOOD-VISION: llama3.2-vision:11b (tester) System Status: All 7 agents healthy, Socket.IO operational, MCP server fully functional 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			499 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			499 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
| Distributed Workflow API Endpoints
 | |
| RESTful API for managing distributed development workflows across the cluster
 | |
| """
 | |
| 
 | |
| from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends, Request
 | |
| from pydantic import BaseModel, Field
 | |
| from typing import Dict, Any, List, Optional
 | |
| import asyncio
 | |
| import logging
 | |
| from datetime import datetime
 | |
| 
 | |
| from ..core.distributed_coordinator import DistributedCoordinator, TaskType, TaskPriority
 | |
| from ..core.hive_coordinator import HiveCoordinator
 | |
| 
 | |
# Logger named after this module's import path.
logger = logging.getLogger(__name__)

# All routes declared in this module are served under /api/distributed.
router = APIRouter(
    prefix="/api/distributed",
    tags=["distributed-workflows"],
)

# Coordinator owned by this module: assigned by the startup hook below and
# stopped by the shutdown hook.
# NOTE(review): get_coordinator() reads ``..main.distributed_coordinator``
# instead of this global -- confirm the two are meant to be distinct.
distributed_coordinator: Optional[DistributedCoordinator] = None
 | |
| 
 | |
class WorkflowRequest(BaseModel):
    """Body accepted when a client submits a new workflow."""

    # Mandatory fields (declared with ``...``).
    name: str = Field(..., description="Workflow name")
    description: str = Field(..., description="Workflow description")
    requirements: str = Field(..., description="Development requirements")

    # Optional fields; each one carries a default.
    context: str = Field(default="", description="Additional context")
    language: str = Field(
        default="python",
        description="Target programming language",
    )
    test_types: List[str] = Field(
        default=["unit", "integration"],
        description="Types of tests to generate",
    )
    optimization_targets: List[str] = Field(
        default=["performance", "memory"],
        description="Optimization focus areas",
    )
    build_config: Dict[str, Any] = Field(
        default_factory=dict,
        description="Build configuration",
    )
    # Free-form string; submit_workflow maps it onto TaskPriority.
    priority: str = Field(
        default="normal",
        description="Workflow priority (critical, high, normal, low)",
    )
 | |
| 
 | |
class TaskStatus(BaseModel):
    """Status of a single task inside a workflow."""

    id: str
    type: str
    status: str
    # Explicit ``None`` default: without it the field is optional under
    # Pydantic v1 but *required* under Pydantic v2.  A task that has not been
    # assigned yet must be representable under both.
    assigned_agent: Optional[str] = None
    execution_time: float
    # get_workflow_status always passes None here.
    result: Optional[Dict[str, Any]] = None
 | |
| 
 | |
class WorkflowStatus(BaseModel):
    """Response body describing one workflow and its tasks."""

    # Identity
    workflow_id: str
    name: str

    # Task counters
    total_tasks: int
    completed_tasks: int
    failed_tasks: int

    # Overall state as reported by the coordinator
    progress: float
    status: str
    created_at: datetime

    # One entry per task belonging to the workflow
    tasks: List[TaskStatus]
 | |
| 
 | |
class ClusterStatus(BaseModel):
    """Response body summarising the agent cluster."""

    # Agent counts
    total_agents: int
    healthy_agents: int

    # Summed over all agents by get_cluster_status
    total_capacity: int
    current_load: int

    # current_load / total_capacity expressed as a percentage
    utilization: float

    # One free-form dict per agent
    agents: List[Dict[str, Any]]
 | |
| 
 | |
class PerformanceMetrics(BaseModel):
    """Response body for the performance metrics endpoint."""

    # Workflow counters
    total_workflows: int
    completed_workflows: int
    failed_workflows: int

    # Timing figures
    average_completion_time: float
    throughput_per_hour: float

    # Keyed by agent id; each value maps a metric name to a number
    agent_performance: Dict[str, Dict[str, float]]
 | |
| 
 | |
async def get_coordinator() -> DistributedCoordinator:
    """Return the coordinator instance that route handlers operate on.

    Raises:
        HTTPException: 503 when no coordinator has been initialized.
    """
    # Deferred import: pulling this in at module load would be circular.
    from ..main import distributed_coordinator as main_coordinator

    if main_coordinator is not None:
        return main_coordinator

    raise HTTPException(status_code=503, detail="Distributed coordinator not initialized")
 | |
| 
 | |
@router.on_event("startup")
async def startup_distributed_coordinator():
    """Create and start this module's coordinator when the app starts.

    Any failure is logged and re-raised unchanged.
    """
    global distributed_coordinator

    try:
        distributed_coordinator = DistributedCoordinator()
        await distributed_coordinator.start()
    except Exception as e:
        logger.error(f"Failed to start distributed coordinator: {e}")
        raise
    else:
        logger.info("Distributed coordinator started successfully")
 | |
| 
 | |
@router.on_event("shutdown")
async def shutdown_distributed_coordinator():
    """Stop this module's coordinator, if one was created."""
    global distributed_coordinator

    # Nothing to stop when the startup hook never assigned a coordinator.
    if not distributed_coordinator:
        return

    await distributed_coordinator.stop()
    logger.info("Distributed coordinator stopped")
 | |
| 
 | |
@router.post("/workflows", response_model=Dict[str, str])
async def submit_workflow(
    workflow: WorkflowRequest,
    background_tasks: BackgroundTasks,
    coordinator: DistributedCoordinator = Depends(get_coordinator)
):
    """
    Submit a new development workflow for distributed execution

    This endpoint creates a complete development workflow that includes:
    - Code generation
    - Code review
    - Testing
    - Compilation
    - Optimization

    The workflow is distributed across the cluster based on agent capabilities.

    The priority is matched case-insensitively against critical, high,
    normal and low; any other value is treated as normal.
    """
    # NOTE(review): ``background_tasks`` is not used; it is kept because it is
    # part of the handler's signature.
    try:
        priority_by_name = {
            "critical": TaskPriority.CRITICAL,
            "high": TaskPriority.HIGH,
            "normal": TaskPriority.NORMAL,
            "low": TaskPriority.LOW,
        }

        # Normalize before the lookup so "High" or " critical " are honoured
        # instead of silently degrading to NORMAL.
        requested_priority = workflow.priority.strip().lower()
        priority = priority_by_name.get(requested_priority)
        if priority is None:
            logger.warning(
                "Unknown workflow priority %r; falling back to normal",
                workflow.priority,
            )
            priority = TaskPriority.NORMAL

        workflow_dict = {
            "name": workflow.name,
            "description": workflow.description,
            "requirements": workflow.requirements,
            "context": workflow.context,
            "language": workflow.language,
            "test_types": workflow.test_types,
            "optimization_targets": workflow.optimization_targets,
            "build_config": workflow.build_config,
            "priority": priority,
        }

        workflow_id = await coordinator.submit_workflow(workflow_dict)

        return {
            "workflow_id": workflow_id,
            "message": "Workflow submitted successfully",
            "status": "accepted"
        }

    except HTTPException:
        # Same convention as the other handlers in this module: an
        # HTTPException already carries its status code, so do not rewrap it.
        raise
    except Exception as e:
        logger.error(f"Failed to submit workflow: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to submit workflow: {str(e)}") from e
 | |
| 
 | |
@router.get("/workflows/{workflow_id}", response_model=WorkflowStatus)
async def get_workflow_status(
    workflow_id: str,
    coordinator: DistributedCoordinator = Depends(get_coordinator)
):
    """
    Get detailed status of a specific workflow

    Reports the workflow's progress counters together with the status of
    each of its tasks.
    """
    try:
        status = await coordinator.get_workflow_status(workflow_id)

        # The coordinator reports a problem through an "error" key.
        if "error" in status:
            raise HTTPException(status_code=404, detail=status["error"])

        summary = {
            "workflow_id": status["workflow_id"],
            # Placeholder: the real workflow name is not available here.
            "name": f"Workflow {workflow_id}",
            "total_tasks": status["total_tasks"],
            "completed_tasks": status["completed_tasks"],
            "failed_tasks": status["failed_tasks"],
            "progress": status["progress"],
            "status": status["status"],
            # Placeholder: this is the time of the request, not of creation.
            "created_at": datetime.now(),
            "tasks": [
                TaskStatus(
                    id=entry["id"],
                    type=entry["type"],
                    status=entry["status"],
                    assigned_agent=entry["assigned_agent"],
                    execution_time=entry["execution_time"],
                    result=None,  # task results are not exposed
                )
                for entry in status["tasks"]
            ],
        }
        return WorkflowStatus(**summary)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get workflow status: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get workflow status: {str(e)}")
 | |
| 
 | |
@router.get("/cluster/status", response_model=ClusterStatus)
async def get_cluster_status(
    coordinator: DistributedCoordinator = Depends(get_coordinator)
):
    """
    Get current cluster status and agent information

    Returns real-time information about all agents including:
    - Health status
    - Current load
    - Performance metrics
    - Specializations

    Utilization figures are percentages; an agent (or cluster) with no
    capacity reports 0.
    """
    try:
        agents_info = []
        total_capacity = 0
        current_load = 0
        healthy_agents = 0

        for agent in coordinator.agents.values():
            if agent.health_status == "healthy":
                healthy_agents += 1

            total_capacity += agent.max_concurrent
            current_load += agent.current_load

            # Guard the per-agent ratio the same way the cluster-wide ratio
            # is guarded below: one agent configured with zero capacity must
            # not turn the whole endpoint into a 500.
            if agent.max_concurrent > 0:
                agent_utilization = (agent.current_load / agent.max_concurrent) * 100
            else:
                agent_utilization = 0

            agents_info.append({
                "id": agent.id,
                "endpoint": agent.endpoint,
                "model": agent.model,
                "gpu_type": agent.gpu_type,
                "specializations": [spec.value for spec in agent.specializations],
                "max_concurrent": agent.max_concurrent,
                "current_load": agent.current_load,
                "utilization": agent_utilization,
                "performance_score": round(agent.performance_score, 3),
                "last_response_time": round(agent.last_response_time, 2),
                "health_status": agent.health_status
            })

        utilization = (current_load / total_capacity) * 100 if total_capacity > 0 else 0

        return ClusterStatus(
            total_agents=len(coordinator.agents),
            healthy_agents=healthy_agents,
            total_capacity=total_capacity,
            current_load=current_load,
            utilization=round(utilization, 2),
            agents=agents_info
        )

    except Exception as e:
        logger.error(f"Failed to get cluster status: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get cluster status: {str(e)}") from e
 | |
| 
 | |
@router.get("/performance/metrics", response_model=PerformanceMetrics)
async def get_performance_metrics(
    coordinator: DistributedCoordinator = Depends(get_coordinator)
):
    """
    Get comprehensive performance metrics for the distributed system

    Returns metrics including:
    - Workflow completion rates
    - Agent performance statistics
    - Throughput measurements
    - System efficiency indicators
    """
    try:
        import time

        # Snapshot once so every figure below is derived from the same tasks.
        all_tasks = list(coordinator.tasks.values())

        # Workflows are counted through their "code_generation" tasks.
        # NOTE(review): this presumes exactly one such task per workflow --
        # confirm against the coordinator.
        generation_tasks = [
            task for task in all_tasks if task.type.value == "code_generation"
        ]
        total_workflows = len(generation_tasks)
        completed_workflows = sum(
            1 for task in generation_tasks if task.status == "completed"
        )
        failed_workflows = sum(
            1 for task in generation_tasks if task.status == "failed"
        )

        # A single clock reading serves both timing figures.
        # NOTE(review): ``now - created_at`` is the age of a completed task,
        # not how long it took to finish; no completion timestamp is read
        # here.
        now = time.time()
        completed_tasks = [task for task in all_tasks if task.status == "completed"]
        task_ages = [now - task.created_at for task in completed_tasks]
        average_completion_time = sum(task_ages) / len(task_ages) if task_ages else 0.0

        # Completed tasks (of any type) created within the last hour.
        throughput_per_hour = sum(1 for age in task_ages if age < 3600)

        agent_performance = {}
        for agent_id, agent in coordinator.agents.items():
            performance_history = coordinator.performance_history.get(agent_id, [])

            if performance_history:
                avg_response_time = sum(performance_history) / len(performance_history)
            else:
                avg_response_time = 0.0

            # An agent with zero capacity reports 0% instead of raising
            # ZeroDivisionError and failing the whole request.
            if agent.max_concurrent > 0:
                current_utilization = (agent.current_load / agent.max_concurrent) * 100
            else:
                current_utilization = 0.0

            agent_performance[agent_id] = {
                "avg_response_time": avg_response_time,
                "performance_score": agent.performance_score,
                "total_tasks": len(performance_history),
                "current_utilization": current_utilization
            }

        return PerformanceMetrics(
            total_workflows=total_workflows,
            completed_workflows=completed_workflows,
            failed_workflows=failed_workflows,
            average_completion_time=round(average_completion_time, 2),
            throughput_per_hour=throughput_per_hour,
            agent_performance=agent_performance
        )

    except Exception as e:
        logger.error(f"Failed to get performance metrics: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get performance metrics: {str(e)}") from e
 | |
| 
 | |
@router.post("/workflows/{workflow_id}/cancel")
async def cancel_workflow(
    workflow_id: str,
    coordinator: DistributedCoordinator = Depends(get_coordinator)
):
    """
    Cancel a workflow by marking its unfinished tasks as cancelled
    """
    try:
        # A task belongs to the workflow when its payload names it.
        matching_tasks = []
        for candidate in coordinator.tasks.values():
            if candidate.payload.get("workflow_id") == workflow_id:
                matching_tasks.append(candidate)

        if not matching_tasks:
            raise HTTPException(status_code=404, detail="Workflow not found")

        # Only tasks that are still pending or executing are touched.
        # NOTE(review): this only rewrites ``task.status``; nothing here
        # notifies the agent running the task.
        cancelled_count = 0
        for task in matching_tasks:
            if task.status not in ("pending", "executing"):
                continue
            task.status = "cancelled"
            cancelled_count += 1

        return {
            "workflow_id": workflow_id,
            "message": f"Cancelled {cancelled_count} tasks",
            "status": "cancelled",
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to cancel workflow: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to cancel workflow: {str(e)}")
 | |
| 
 | |
@router.post("/cluster/optimize")
async def trigger_cluster_optimization(
    coordinator: DistributedCoordinator = Depends(get_coordinator)
):
    """
    Manually trigger cluster optimization

    Runs the coordinator's agent parameter optimization followed by its
    cleanup of completed tasks, then reports when that happened.
    """
    try:
        # Both calls reach into the coordinator's private API, in this order.
        await coordinator._optimize_agent_parameters()
        await coordinator._cleanup_completed_tasks()

        triggered_at = datetime.now().isoformat()
        return {
            "message": "Cluster optimization triggered successfully",
            "timestamp": triggered_at,
        }

    except Exception as e:
        logger.error(f"Failed to trigger optimization: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to trigger optimization: {str(e)}")
 | |
| 
 | |
@router.get("/workflows", response_model=List[Dict[str, Any]])
async def list_workflows(
    status: Optional[str] = None,
    limit: int = 50
):
    """
    List all workflows with optional filtering

    Workflows are returned newest first. An empty list is returned when the
    distributed coordinator is not available.

    Args:
        status: Filter by workflow status (completed, in_progress, failed)
        limit: Maximum number of workflows to return; zero or a negative
            value returns an empty list
    """
    try:
        # Resolved by hand rather than through Depends so an uninitialized
        # coordinator yields an empty array instead of a 503.
        try:
            coordinator = await get_coordinator()
        except HTTPException:
            return []

        # Group tasks by the workflow named in their payload; tasks without
        # a workflow_id are ignored.
        tasks_by_workflow = {}
        for task in coordinator.tasks.values():
            workflow_id = task.payload.get("workflow_id")
            if workflow_id:
                tasks_by_workflow.setdefault(workflow_id, []).append(task)

        workflow_list = []
        for workflow_id, tasks in tasks_by_workflow.items():
            total_tasks = len(tasks)
            completed_tasks = sum(1 for task in tasks if task.status == "completed")
            failed_tasks = sum(1 for task in tasks if task.status == "failed")

            # Any failed task marks the whole workflow as failed, even when
            # the remaining tasks are still running.
            if failed_tasks > 0:
                workflow_status = "failed"
            elif completed_tasks == total_tasks:
                workflow_status = "completed"
            else:
                workflow_status = "in_progress"

            if status and workflow_status != status:
                continue

            workflow_list.append({
                "workflow_id": workflow_id,
                "total_tasks": total_tasks,
                "completed_tasks": completed_tasks,
                "failed_tasks": failed_tasks,
                "progress": (completed_tasks / total_tasks) * 100 if total_tasks > 0 else 0,
                "status": workflow_status,
                # A workflow is as old as its earliest task.
                "created_at": min(task.created_at for task in tasks)
            })

        workflow_list.sort(key=lambda summary: summary["created_at"], reverse=True)

        # Clamp at zero: a negative slice bound would drop entries from the
        # END of the list instead of returning nothing.
        return workflow_list[:max(limit, 0)]

    except Exception as e:
        logger.error(f"Failed to list workflows: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to list workflows: {str(e)}") from e
 | |
| 
 | |
@router.get("/agents/{agent_id}/tasks", response_model=List[Dict[str, Any]])
async def get_agent_tasks(
    agent_id: str,
    coordinator: DistributedCoordinator = Depends(get_coordinator)
):
    """
    List the tasks currently assigned to one agent
    """
    try:
        # Reject ids the coordinator does not know about.
        if agent_id not in coordinator.agents:
            raise HTTPException(status_code=404, detail="Agent not found")

        agent_tasks = []
        for task in coordinator.tasks.values():
            if task.assigned_agent == agent_id:
                agent_tasks.append({
                    "task_id": task.id,
                    "type": task.type.value,
                    "status": task.status,
                    "priority": task.priority.value,
                    "created_at": task.created_at,
                    "workflow_id": task.payload.get("workflow_id"),
                })

        return agent_tasks

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get agent tasks: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get agent tasks: {str(e)}")
 | |
| 
 | |
| # Health check endpoint for the distributed system
 | |
@router.get("/health")
async def health_check(coordinator: DistributedCoordinator = Depends(get_coordinator)):
    """
    Report whether the distributed workflow system has any healthy agent
    """
    try:
        healthy_agents = 0
        for agent in coordinator.agents.values():
            if agent.health_status == "healthy":
                healthy_agents += 1
        total_agents = len(coordinator.agents)

        # One healthy agent is enough for the system to count as healthy.
        if healthy_agents > 0:
            system_health = "healthy"
        else:
            system_health = "unhealthy"

        return {
            "status": system_health,
            "healthy_agents": healthy_agents,
            "total_agents": total_agents,
            "timestamp": datetime.now().isoformat(),
        }

    except Exception as e:
        # Failures are reported in the response body rather than raised.
        return {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": datetime.now().isoformat(),
        }