Refactor UnifiedCoordinator to follow Single Responsibility Principle

- Create dedicated service classes for separated concerns:
  * AgentService: Agent management and health monitoring
  * WorkflowService: Workflow parsing and execution tracking
  * PerformanceService: Metrics and load balancing
  * BackgroundService: Background processes and cleanup
  * TaskService: Database persistence (already existed)

- Refactor UnifiedCoordinator into UnifiedCoordinatorRefactored
  * Clean separation of responsibilities
  * Improved maintainability and testability
  * Dependency injection pattern for services
  * Clear service boundaries and interfaces

- Maintain backward compatibility by re-exporting the refactored classes from unified_coordinator.py
- Update main.py to use the refactored coordinator (see the usage sketch below)
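
A minimal usage sketch of the refactored coordinator (the top-level package name "app" and the asyncio entry point are assumptions; class and method names follow the modules in this commit):

import asyncio

# Existing call sites keep working: unified_coordinator re-exports the refactored class.
from app.core.unified_coordinator import UnifiedCoordinator  # "app" package name assumed

async def main():
    coordinator = UnifiedCoordinator(redis_url="redis://localhost:6379")
    await coordinator.start()                      # initializes and starts all services
    health = await coordinator.get_health_status()
    print(health["status"], health["total_agents"])
    await coordinator.shutdown()

asyncio.run(main())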

🚀 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
anthonyrawlins committed 2025-07-11 09:09:11 +10:00
parent 36c5e10a51 · commit c6d69695a8
3042 changed files with 45137 additions and 46134 deletions

File: core/unified_coordinator.py

@@ -2,833 +2,16 @@
Unified Hive Coordinator
Combines the functionality of HiveCoordinator and DistributedCoordinator into a single,
cohesive orchestration system for the Hive platform.
DEPRECATED: This module is being refactored. Use unified_coordinator_refactored.py for new implementations.
"""
import asyncio
import aiohttp
import json
import time
import hashlib
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Set
from enum import Enum
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy.orm import Session
import redis.asyncio as redis
from prometheus_client import Counter, Histogram, Gauge
from ..models.agent import Agent as ORMAgent
from ..core.database import SessionLocal
from ..cli_agents.cli_agent_manager import get_cli_agent_manager
from ..services.task_service import TaskService
logger = logging.getLogger(__name__)
# Performance Metrics
TASK_COUNTER = Counter('hive_tasks_total', 'Total tasks processed', ['task_type', 'agent'])
TASK_DURATION = Histogram('hive_task_duration_seconds', 'Task execution time', ['task_type', 'agent'])
ACTIVE_TASKS = Gauge('hive_active_tasks', 'Currently active tasks', ['agent'])
AGENT_UTILIZATION = Gauge('hive_agent_utilization', 'Agent utilization percentage', ['agent'])
class AgentType(Enum):
"""Unified agent types supporting both original and distributed workflows"""
# Original agent types
KERNEL_DEV = "kernel_dev"
PYTORCH_DEV = "pytorch_dev"
PROFILER = "profiler"
DOCS_WRITER = "docs_writer"
TESTER = "tester"
CLI_GEMINI = "cli_gemini"
GENERAL_AI = "general_ai"
REASONING = "reasoning"
# Distributed workflow types
CODE_GENERATION = "code_generation"
CODE_REVIEW = "code_review"
TESTING = "testing"
COMPILATION = "compilation"
OPTIMIZATION = "optimization"
DOCUMENTATION = "documentation"
DEPLOYMENT = "deployment"
class TaskStatus(Enum):
"""Task status tracking"""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
class TaskPriority(Enum):
"""Task priority levels"""
CRITICAL = 1
HIGH = 2
NORMAL = 3
LOW = 4
@dataclass
class Agent:
"""Unified agent representation supporting both Ollama and CLI agents"""
id: str
endpoint: str
model: str
specialty: AgentType
max_concurrent: int = 2
current_tasks: int = 0
agent_type: str = "ollama" # "ollama" or "cli"
cli_config: Optional[Dict[str, Any]] = None
# Enhanced fields for distributed workflows
gpu_type: str = "unknown"
capabilities: Set[str] = field(default_factory=set)
performance_history: List[float] = field(default_factory=list)
specializations: List[AgentType] = field(default_factory=list)
last_heartbeat: float = field(default_factory=time.time)
def __post_init__(self):
if self.specializations:
self.capabilities.update([spec.value for spec in self.specializations])
@dataclass
class Task:
"""Unified task representation"""
id: str
type: AgentType
priority: int = 3
status: TaskStatus = TaskStatus.PENDING
context: Dict[str, Any] = field(default_factory=dict)
payload: Dict[str, Any] = field(default_factory=dict)
assigned_agent: Optional[str] = None
result: Optional[Dict] = None
created_at: float = field(default_factory=time.time)
completed_at: Optional[float] = None
# Workflow support
workflow_id: Optional[str] = None
dependencies: List[str] = field(default_factory=list)
def cache_key(self) -> str:
"""Generate cache key for task result"""
payload_hash = hashlib.md5(json.dumps(self.payload, sort_keys=True).encode()).hexdigest()
return f"task_result:{self.type.value}:{payload_hash}"
class UnifiedCoordinator:
"""
Unified coordinator that combines HiveCoordinator and DistributedCoordinator functionality.
Provides both simple task orchestration and advanced distributed workflow management.
"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
# Core state
self.agents: Dict[str, Agent] = {}
self.tasks: Dict[str, Task] = {} # In-memory cache for active tasks
self.task_queue: List[Task] = []
self.is_initialized = False
# Database persistence
self.task_service = TaskService()
# CLI agent support
self.cli_agent_manager = None
# Distributed workflow support
self.redis_url = redis_url
self.redis_client: Optional[redis.Redis] = None
self.executor = ThreadPoolExecutor(max_workers=4)
self.running = False
self.workflow_tasks: Dict[str, List[Task]] = {}
# Performance tracking
self.load_balancer = AdaptiveLoadBalancer()
# Async tasks
self._background_tasks: Set[asyncio.Task] = set()
async def initialize(self):
"""Initialize the unified coordinator with all subsystems"""
if self.is_initialized:
return
logger.info("🚀 Initializing Unified Hive Coordinator...")
try:
# Initialize CLI agent manager
self.cli_agent_manager = get_cli_agent_manager()
# Initialize Redis connection for distributed features
try:
self.redis_client = redis.from_url(self.redis_url)
await self.redis_client.ping()
logger.info("✅ Redis connection established")
except Exception as e:
logger.warning(f"⚠️ Redis unavailable, distributed features disabled: {e}")
self.redis_client = None
# Load agents from database
await self._load_database_agents()
# Load existing tasks from database
await self._load_database_tasks()
# Initialize cluster agents
self._initialize_cluster_agents()
# Test initial connectivity
await self._test_initial_connectivity()
self.is_initialized = True
logger.info("✅ Unified Hive Coordinator initialized successfully")
except Exception as e:
logger.error(f"❌ Failed to initialize coordinator: {e}")
raise
async def start(self):
"""Start the coordinator background processes"""
if not self.is_initialized:
await self.initialize()
self.running = True
# Start background tasks
self._background_tasks.add(asyncio.create_task(self._task_processor()))
if self.redis_client:
self._background_tasks.add(asyncio.create_task(self._health_monitor()))
self._background_tasks.add(asyncio.create_task(self._performance_optimizer()))
logger.info("🚀 Unified Coordinator background processes started")
async def shutdown(self):
"""Shutdown the coordinator gracefully"""
logger.info("🛑 Shutting down Unified Hive Coordinator...")
self.running = False
# Cancel background tasks
for task in self._background_tasks:
task.cancel()
# Wait for tasks to complete
if self._background_tasks:
await asyncio.gather(*self._background_tasks, return_exceptions=True)
# Close Redis connection
if self.redis_client:
await self.redis_client.close()
# Shutdown executor
self.executor.shutdown(wait=True)
logger.info("✅ Unified Coordinator shutdown complete")
# =========================================================================
# AGENT MANAGEMENT
# =========================================================================
def add_agent(self, agent: Agent):
"""Add an agent to the coordinator"""
self.agents[agent.id] = agent
logger.info(f"✅ Added agent: {agent.id} ({agent.specialty.value})")
async def _load_database_agents(self):
"""Load agents from database"""
try:
db = SessionLocal()
orm_agents = db.query(ORMAgent).all()
for orm_agent in orm_agents:
specialty = AgentType(orm_agent.specialty) if orm_agent.specialty else AgentType.GENERAL_AI
agent = Agent(
id=orm_agent.id,
endpoint=orm_agent.endpoint,
model=orm_agent.model or "unknown",
specialty=specialty,
max_concurrent=orm_agent.max_concurrent,
current_tasks=orm_agent.current_tasks,
agent_type=orm_agent.agent_type,
cli_config=orm_agent.cli_config
)
self.add_agent(agent)
db.close()
logger.info(f"📊 Loaded {len(orm_agents)} agents from database")
except Exception as e:
logger.error(f"❌ Failed to load agents from database: {e}")
async def _load_database_tasks(self):
"""Load pending and in-progress tasks from database"""
try:
# Load pending tasks
pending_orm_tasks = self.task_service.get_tasks(status='pending', limit=100)
for orm_task in pending_orm_tasks:
coordinator_task = self.task_service.coordinator_task_from_orm(orm_task)
self.tasks[coordinator_task.id] = coordinator_task
self.task_queue.append(coordinator_task)
# Load in-progress tasks
in_progress_orm_tasks = self.task_service.get_tasks(status='in_progress', limit=100)
for orm_task in in_progress_orm_tasks:
coordinator_task = self.task_service.coordinator_task_from_orm(orm_task)
self.tasks[coordinator_task.id] = coordinator_task
# In-progress tasks are not added to task_queue as they're already being processed
# Sort task queue by priority
self.task_queue.sort(key=lambda t: t.priority)
logger.info(f"📊 Loaded {len(pending_orm_tasks)} pending and {len(in_progress_orm_tasks)} in-progress tasks from database")
except Exception as e:
logger.error(f"❌ Failed to load tasks from database: {e}")
def _initialize_cluster_agents(self):
"""Initialize predefined cluster agents"""
# This maintains compatibility with the original HiveCoordinator
cluster_agents = [
Agent(
id="walnut-codellama",
endpoint="http://walnut.local:11434",
model="codellama:34b",
specialty=AgentType.KERNEL_DEV
),
Agent(
id="oak-gemma",
endpoint="http://oak.local:11434",
model="gemma2:27b",
specialty=AgentType.PYTORCH_DEV
),
Agent(
id="ironwood-llama",
endpoint="http://ironwood.local:11434",
model="llama3.1:70b",
specialty=AgentType.GENERAL_AI
)
]
for agent in cluster_agents:
if agent.id not in self.agents:
self.add_agent(agent)
# =========================================================================
# TASK MANAGEMENT
# =========================================================================
def create_task(self, task_type: AgentType, context: Dict, priority: int = 3) -> Task:
"""Create a new task"""
task_id = f"task_{int(time.time())}_{len(self.tasks)}"
task = Task(
id=task_id,
type=task_type,
context=context,
priority=priority,
payload=context # For compatibility
)
# Persist to database
try:
self.task_service.create_task(task)
logger.info(f"💾 Task {task_id} persisted to database")
except Exception as e:
logger.error(f"❌ Failed to persist task {task_id} to database: {e}")
# Add to in-memory structures
self.tasks[task_id] = task
self.task_queue.append(task)
# Sort queue by priority
self.task_queue.sort(key=lambda t: t.priority)
logger.info(f"📝 Created task: {task_id} ({task_type.value}, priority: {priority})")
return task
async def submit_workflow(self, workflow: Dict[str, Any]) -> str:
"""Submit a workflow for execution (distributed coordinator compatibility)"""
workflow_id = f"workflow_{int(time.time())}"
tasks = self._parse_workflow_to_tasks(workflow, workflow_id)
self.workflow_tasks[workflow_id] = tasks
for task in tasks:
self.tasks[task.id] = task
await self._schedule_workflow_tasks(tasks)
logger.info(f"🔄 Submitted workflow: {workflow_id} with {len(tasks)} tasks")
return workflow_id
def _parse_workflow_to_tasks(self, workflow: Dict[str, Any], workflow_id: str) -> List[Task]:
"""Parse workflow definition into tasks"""
tasks = []
base_tasks = workflow.get('tasks', [])
for i, task_def in enumerate(base_tasks):
task_id = f"{workflow_id}_task_{i}"
task_type = AgentType(task_def.get('type', 'general_ai'))
task = Task(
id=task_id,
type=task_type,
workflow_id=workflow_id,
context=task_def.get('context', {}),
payload=task_def.get('payload', {}),
dependencies=task_def.get('dependencies', []),
priority=task_def.get('priority', 3)
)
tasks.append(task)
return tasks
async def _schedule_workflow_tasks(self, tasks: List[Task]):
"""Schedule workflow tasks respecting dependencies"""
for task in tasks:
if not task.dependencies:
self.task_queue.append(task)
# Tasks with dependencies will be scheduled when dependencies complete
def get_available_agent(self, task_type: AgentType) -> Optional[Agent]:
"""Find an available agent for the task type"""
available_agents = [
agent for agent in self.agents.values()
if (agent.specialty == task_type or task_type in agent.specializations)
and agent.current_tasks < agent.max_concurrent
]
if not available_agents:
# Fallback to general AI agents
available_agents = [
agent for agent in self.agents.values()
if agent.specialty == AgentType.GENERAL_AI
and agent.current_tasks < agent.max_concurrent
]
if available_agents:
# Use load balancer for optimal selection
return min(available_agents, key=lambda a: self.load_balancer.get_weight(a.id))
return None
# =========================================================================
# TASK EXECUTION
# =========================================================================
async def _task_processor(self):
"""Background task processor"""
while self.running:
try:
if self.task_queue:
# Process pending tasks
await self.process_queue()
# Check for workflow tasks whose dependencies are satisfied
await self._check_workflow_dependencies()
await asyncio.sleep(1)
except Exception as e:
logger.error(f"❌ Error in task processor: {e}")
await asyncio.sleep(5)
async def process_queue(self):
"""Process the task queue"""
if not self.task_queue:
return
# Process up to 5 tasks concurrently
batch_size = min(5, len(self.task_queue))
current_batch = self.task_queue[:batch_size]
tasks_to_execute = []
for task in current_batch:
agent = self.get_available_agent(task.type)
if agent:
tasks_to_execute.append((task, agent))
self.task_queue.remove(task)
if tasks_to_execute:
await asyncio.gather(*[
self._execute_task_with_agent(task, agent)
for task, agent in tasks_to_execute
], return_exceptions=True)
async def _execute_task_with_agent(self, task: Task, agent: Agent):
"""Execute a task with a specific agent"""
try:
task.status = TaskStatus.IN_PROGRESS
task.assigned_agent = agent.id
agent.current_tasks += 1
# Persist status change to database
try:
self.task_service.update_task(task.id, task)
logger.debug(f"💾 Updated task {task.id} status to IN_PROGRESS in database")
except Exception as e:
logger.error(f"❌ Failed to update task {task.id} status in database: {e}")
ACTIVE_TASKS.labels(agent=agent.id).inc()
start_time = time.time()
# Execute based on agent type
if agent.agent_type == "cli":
result = await self._execute_cli_task(task, agent)
else:
result = await self._execute_ollama_task(task, agent)
# Record metrics
execution_time = time.time() - start_time
TASK_COUNTER.labels(task_type=task.type.value, agent=agent.id).inc()
TASK_DURATION.labels(task_type=task.type.value, agent=agent.id).observe(execution_time)
# Update task
task.result = result
task.status = TaskStatus.COMPLETED
task.completed_at = time.time()
# Persist completion to database
try:
self.task_service.update_task(task.id, task)
logger.debug(f"💾 Updated task {task.id} status to COMPLETED in database")
except Exception as e:
logger.error(f"❌ Failed to update completed task {task.id} in database: {e}")
# Update agent
agent.current_tasks -= 1
self.load_balancer.update_weight(agent.id, execution_time)
ACTIVE_TASKS.labels(agent=agent.id).dec()
# Handle workflow completion
if task.workflow_id:
await self._handle_workflow_task_completion(task)
logger.info(f"✅ Task {task.id} completed by {agent.id}")
except Exception as e:
task.status = TaskStatus.FAILED
task.result = {"error": str(e)}
# Persist failure to database
try:
self.task_service.update_task(task.id, task)
logger.debug(f"💾 Updated task {task.id} status to FAILED in database")
except Exception as db_e:
logger.error(f"❌ Failed to update failed task {task.id} in database: {db_e}")
agent.current_tasks -= 1
ACTIVE_TASKS.labels(agent=agent.id).dec()
logger.error(f"❌ Task {task.id} failed: {e}")
async def _execute_cli_task(self, task: Task, agent: Agent) -> Dict:
"""Execute task on CLI agent"""
if not self.cli_agent_manager:
raise Exception("CLI agent manager not initialized")
prompt = self._build_task_prompt(task)
return await self.cli_agent_manager.execute_task(agent.id, prompt, task.context)
async def _execute_ollama_task(self, task: Task, agent: Agent) -> Dict:
"""Execute task on Ollama agent"""
prompt = self._build_task_prompt(task)
async with aiohttp.ClientSession() as session:
payload = {
"model": agent.model,
"prompt": prompt,
"stream": False
}
async with session.post(f"{agent.endpoint}/api/generate", json=payload) as response:
if response.status == 200:
result = await response.json()
return {"output": result.get("response", ""), "model": agent.model}
else:
raise Exception(f"HTTP {response.status}: {await response.text()}")
def _build_task_prompt(self, task: Task) -> str:
"""Build prompt for task execution"""
context_str = json.dumps(task.context, indent=2) if task.context else "No context provided"
return f"""
Task Type: {task.type.value}
Priority: {task.priority}
Context: {context_str}
Please complete this task based on the provided context and requirements.
"""
# =========================================================================
# WORKFLOW MANAGEMENT
# =========================================================================
async def _check_workflow_dependencies(self):
"""Check and schedule workflow tasks whose dependencies are satisfied"""
for workflow_id, workflow_tasks in self.workflow_tasks.items():
for task in workflow_tasks:
if (task.status == TaskStatus.PENDING and
task not in self.task_queue and
await self._dependencies_satisfied(task)):
self.task_queue.append(task)
async def _dependencies_satisfied(self, task: Task) -> bool:
"""Check if task dependencies are satisfied"""
for dep_id in task.dependencies:
dep_task = self.tasks.get(dep_id)
if not dep_task or dep_task.status != TaskStatus.COMPLETED:
return False
return True
async def _handle_workflow_task_completion(self, task: Task):
"""Handle completion of a workflow task"""
if not task.workflow_id:
return
# Check if workflow is complete
workflow_tasks = self.workflow_tasks.get(task.workflow_id, [])
completed_tasks = [t for t in workflow_tasks if t.status == TaskStatus.COMPLETED]
if len(completed_tasks) == len(workflow_tasks):
logger.info(f"🎉 Workflow {task.workflow_id} completed")
# Could emit event or update database here
async def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
"""Get workflow execution status"""
workflow_tasks = self.workflow_tasks.get(workflow_id, [])
if not workflow_tasks:
return {"error": "Workflow not found"}
status_counts = {}
for status in TaskStatus:
status_counts[status.value] = len([t for t in workflow_tasks if t.status == status])
return {
"workflow_id": workflow_id,
"total_tasks": len(workflow_tasks),
"status_breakdown": status_counts,
"completed": status_counts.get("completed", 0) == len(workflow_tasks)
}
# =========================================================================
# MONITORING & HEALTH
# =========================================================================
async def _test_initial_connectivity(self):
"""Test connectivity to all agents"""
logger.info("🔍 Testing agent connectivity...")
for agent in self.agents.values():
try:
if agent.agent_type == "cli":
# Test CLI agent
if self.cli_agent_manager:
await self.cli_agent_manager.test_agent(agent.id)
else:
# Test Ollama agent
async with aiohttp.ClientSession() as session:
async with session.get(f"{agent.endpoint}/api/tags", timeout=aiohttp.ClientTimeout(total=5)) as response:
if response.status == 200:
logger.info(f"✅ Agent {agent.id} is responsive")
else:
logger.warning(f"⚠️ Agent {agent.id} returned HTTP {response.status}")
except Exception as e:
logger.warning(f"⚠️ Agent {agent.id} is not responsive: {e}")
async def _health_monitor(self):
"""Background health monitoring"""
while self.running:
try:
for agent in self.agents.values():
await self._check_agent_health(agent)
await asyncio.sleep(30) # Check every 30 seconds
except Exception as e:
logger.error(f"❌ Health monitor error: {e}")
await asyncio.sleep(60)
async def _check_agent_health(self, agent: Agent):
"""Check individual agent health"""
try:
if agent.agent_type == "cli":
# CLI agent health check
if self.cli_agent_manager:
is_healthy = await self.cli_agent_manager.test_agent(agent.id)
else:
# Ollama agent health check
async with aiohttp.ClientSession() as session:
async with session.get(f"{agent.endpoint}/api/tags", timeout=aiohttp.ClientTimeout(total=10)) as response:
is_healthy = response.status == 200
if is_healthy:
agent.last_heartbeat = time.time()
else:
logger.warning(f"⚠️ Agent {agent.id} health check failed")
except Exception as e:
logger.warning(f"⚠️ Agent {agent.id} health check error: {e}")
async def _performance_optimizer(self):
"""Background performance optimization"""
while self.running:
try:
await self._optimize_agent_parameters()
await self._cleanup_completed_tasks()
await asyncio.sleep(300) # Optimize every 5 minutes
except Exception as e:
logger.error(f"❌ Performance optimizer error: {e}")
await asyncio.sleep(600)
async def _optimize_agent_parameters(self):
"""Optimize agent parameters based on performance"""
for agent in self.agents.values():
if agent.performance_history:
avg_time = sum(agent.performance_history) / len(agent.performance_history)
utilization = agent.current_tasks / agent.max_concurrent if agent.max_concurrent > 0 else 0
AGENT_UTILIZATION.labels(agent=agent.id).set(utilization)
async def _cleanup_completed_tasks(self):
"""Clean up old completed tasks"""
try:
# Clean up in-memory tasks (keep only active ones)
cutoff_time = time.time() - 3600 # 1 hour ago
completed_tasks = [
task_id for task_id, task in self.tasks.items()
if task.status == TaskStatus.COMPLETED and (task.completed_at or 0) < cutoff_time
]
for task_id in completed_tasks:
del self.tasks[task_id]
# Clean up database tasks (older ones)
try:
db_cleaned_count = self.task_service.cleanup_completed_tasks(max_age_hours=24)
if db_cleaned_count > 0:
logger.info(f"🧹 Cleaned up {db_cleaned_count} old tasks from database")
except Exception as e:
logger.error(f"❌ Failed to cleanup database tasks: {e}")
if completed_tasks:
logger.info(f"🧹 Cleaned up {len(completed_tasks)} old completed tasks from memory")
except Exception as e:
logger.error(f"❌ Failed to cleanup completed tasks: {e}")
# =========================================================================
# STATUS & METRICS
# =========================================================================
def get_task_status(self, task_id: str) -> Optional[Task]:
"""Get status of a specific task"""
# First check in-memory cache
task = self.tasks.get(task_id)
if task:
return task
# If not in memory, check database
try:
orm_task = self.task_service.get_task(task_id)
if orm_task:
return self.task_service.coordinator_task_from_orm(orm_task)
except Exception as e:
logger.error(f"❌ Failed to get task {task_id} from database: {e}")
return None
def get_completed_tasks(self, limit: int = 50) -> List[Task]:
"""Get all completed tasks"""
# Get from in-memory cache first
memory_completed = [task for task in self.tasks.values() if task.status == TaskStatus.COMPLETED]
# Get additional from database if needed
try:
if len(memory_completed) < limit:
db_completed = self.task_service.get_tasks(status='completed', limit=limit)
db_tasks = [self.task_service.coordinator_task_from_orm(orm_task) for orm_task in db_completed]
# Combine and deduplicate
all_tasks = {task.id: task for task in memory_completed + db_tasks}
return list(all_tasks.values())[:limit]
except Exception as e:
logger.error(f"❌ Failed to get completed tasks from database: {e}")
return memory_completed[:limit]
async def get_health_status(self):
"""Get coordinator health status"""
agent_status = {}
for agent_id, agent in self.agents.items():
agent_status[agent_id] = {
"type": agent.agent_type,
"model": agent.model,
"specialty": agent.specialty.value,
"current_tasks": agent.current_tasks,
"max_concurrent": agent.max_concurrent,
"last_heartbeat": agent.last_heartbeat
}
# Get comprehensive task statistics from database
try:
db_stats = self.task_service.get_task_statistics()
except Exception as e:
logger.error(f"❌ Failed to get task statistics from database: {e}")
db_stats = {}
return {
"status": "operational" if self.is_initialized else "initializing",
"agents": agent_status,
"total_agents": len(self.agents),
"active_tasks": len([t for t in self.tasks.values() if t.status == TaskStatus.IN_PROGRESS]),
"pending_tasks": len(self.task_queue),
"completed_tasks": len([t for t in self.tasks.values() if t.status == TaskStatus.COMPLETED]),
"database_statistics": db_stats
}
async def get_comprehensive_status(self):
"""Get comprehensive system status"""
health = await self.get_health_status()
return {
**health,
"coordinator_type": "unified",
"features": {
"simple_tasks": True,
"workflows": True,
"cli_agents": self.cli_agent_manager is not None,
"distributed_caching": self.redis_client is not None,
"performance_monitoring": True
},
"uptime": time.time() - (self.is_initialized and time.time() or 0)
}
async def get_prometheus_metrics(self):
"""Get Prometheus metrics"""
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
return generate_latest()
def generate_progress_report(self) -> Dict:
"""Generate progress report"""
total_tasks = len(self.tasks)
completed_tasks = len([t for t in self.tasks.values() if t.status == TaskStatus.COMPLETED])
failed_tasks = len([t for t in self.tasks.values() if t.status == TaskStatus.FAILED])
return {
"total_tasks": total_tasks,
"completed_tasks": completed_tasks,
"failed_tasks": failed_tasks,
"success_rate": completed_tasks / total_tasks if total_tasks > 0 else 0,
"active_agents": len([a for a in self.agents.values() if a.current_tasks > 0]),
"queue_length": len(self.task_queue)
}
class AdaptiveLoadBalancer:
"""Simple adaptive load balancer for agent selection"""
def __init__(self):
self.weights: Dict[str, float] = {}
def update_weight(self, agent_id: str, performance_metric: float):
"""Update agent weight based on performance (lower is better)"""
# Inverse relationship: better performance = lower weight
self.weights[agent_id] = performance_metric
def get_weight(self, agent_id: str) -> float:
"""Get agent weight (lower = more preferred)"""
return self.weights.get(agent_id, 1.0)
# Re-export from refactored implementation
from .unified_coordinator_refactored import (
UnifiedCoordinatorRefactored as UnifiedCoordinator,
Agent,
Task,
AgentType,
TaskStatus,
TaskPriority
)
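
Since the re-export keeps the old import path alive, simple task creation continues to work unchanged; a short sketch (the coordinator instance and context values are illustrative):

# Assumes an initialized coordinator; AgentType resolves through the same re-export.
task = coordinator.create_task(
    AgentType.DOCUMENTATION,
    context={"description": "write release notes for the refactor"},
    priority=2,
)
print(task.id, task.status)        # e.g. task_<epoch>_0 TaskStatus.PENDING
later = coordinator.get_task_status(task.id)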

File: core/unified_coordinator_refactored.py

@@ -0,0 +1,468 @@
"""
Refactored Unified Hive Coordinator
Clean architecture with separated concerns using dedicated service classes.
Each service handles a specific responsibility for maintainability and testability.
"""
import asyncio
import aiohttp
import json
import time
import hashlib
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Set
from enum import Enum
import redis.asyncio as redis
from ..services.agent_service import AgentService, Agent, AgentType
from ..services.task_service import TaskService
from ..services.workflow_service import WorkflowService, Task, TaskStatus
from ..services.performance_service import PerformanceService
from ..services.background_service import BackgroundService
logger = logging.getLogger(__name__)
class TaskPriority(Enum):
"""Task priority levels"""
CRITICAL = 1
HIGH = 2
NORMAL = 3
LOW = 4
class UnifiedCoordinatorRefactored:
"""
Refactored unified coordinator with separated concerns.
This coordinator orchestrates between specialized services:
- AgentService: Agent management and health monitoring
- TaskService: Database persistence and CRUD operations
- WorkflowService: Workflow parsing and execution tracking
- PerformanceService: Metrics and load balancing
- BackgroundService: Background processes and cleanup
"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
# Core state - only minimal coordination state
self.tasks: Dict[str, Task] = {} # In-memory cache for active tasks
self.task_queue: List[Task] = []
self.is_initialized = False
self.running = False
# Redis for distributed features
self.redis_url = redis_url
self.redis_client: Optional[redis.Redis] = None
# Specialized services
self.agent_service = AgentService()
self.task_service = TaskService()
self.workflow_service = WorkflowService()
self.performance_service = PerformanceService()
self.background_service = BackgroundService()
async def initialize(self):
"""Initialize the unified coordinator with all subsystems"""
if self.is_initialized:
return
logger.info("🚀 Initializing Refactored Unified Hive Coordinator...")
try:
# Initialize Redis connection for distributed features
try:
self.redis_client = redis.from_url(self.redis_url)
await self.redis_client.ping()
logger.info("✅ Redis connection established")
except Exception as e:
logger.warning(f"⚠️ Redis unavailable, distributed features disabled: {e}")
self.redis_client = None
# Initialize all services
await self.agent_service.initialize()
self.task_service.initialize()
self.workflow_service.initialize()
self.performance_service.initialize()
# Initialize background service with dependencies
self.background_service.initialize(
self.agent_service,
self.task_service,
self.workflow_service,
self.performance_service
)
# Load existing tasks from database
await self._load_database_tasks()
self._started_at = time.time()  # record start time so uptime can be reported
self.is_initialized = True
logger.info("✅ Refactored Unified Hive Coordinator initialized successfully")
except Exception as e:
logger.error(f"❌ Failed to initialize coordinator: {e}")
raise
async def start(self):
"""Start the coordinator background processes"""
if not self.is_initialized:
await self.initialize()
self.running = True
# Start background service
await self.background_service.start()
# Start main task processor (keep a reference so the task is not garbage-collected)
self._task_processor_task = asyncio.create_task(self._task_processor())
logger.info("🚀 Refactored Unified Coordinator background processes started")
async def shutdown(self):
"""Shutdown the coordinator gracefully"""
logger.info("🛑 Shutting down Refactored Unified Hive Coordinator...")
self.running = False
# Shutdown background service
await self.background_service.shutdown()
# Close Redis connection
if self.redis_client:
await self.redis_client.close()
logger.info("✅ Refactored Unified Coordinator shutdown complete")
# =========================================================================
# TASK COORDINATION (Main Responsibility)
# =========================================================================
def create_task(self, task_type: AgentType, context: Dict, priority: int = 3) -> Task:
"""Create a new task"""
task_id = f"task_{int(time.time())}_{len(self.tasks)}"
task = Task(
id=task_id,
type=task_type,
context=context,
priority=priority,
payload=context # For compatibility
)
# Persist to database
try:
self.task_service.create_task(task)
logger.info(f"💾 Task {task_id} persisted to database")
except Exception as e:
logger.error(f"❌ Failed to persist task {task_id} to database: {e}")
# Add to in-memory structures
self.tasks[task_id] = task
self.task_queue.append(task)
# Sort queue by priority
self.task_queue.sort(key=lambda t: t.priority)
logger.info(f"📝 Created task: {task_id} ({task_type.value}, priority: {priority})")
return task
async def _task_processor(self):
"""Background task processor"""
while self.running:
try:
if self.task_queue:
# Process pending tasks
await self.process_queue()
# Check for workflow tasks whose dependencies are satisfied
await self._check_workflow_dependencies()
await asyncio.sleep(1)
except Exception as e:
logger.error(f"❌ Error in task processor: {e}")
await asyncio.sleep(5)
async def process_queue(self):
"""Process the task queue"""
if not self.task_queue:
return
# Process up to 5 tasks concurrently
batch_size = min(5, len(self.task_queue))
current_batch = self.task_queue[:batch_size]
tasks_to_execute = []
for task in current_batch:
agent = self.agent_service.get_optimal_agent(
task.type,
self.performance_service.get_load_balancer()
)
if agent:
tasks_to_execute.append((task, agent))
self.task_queue.remove(task)
if tasks_to_execute:
await asyncio.gather(*[
self._execute_task_with_agent(task, agent)
for task, agent in tasks_to_execute
], return_exceptions=True)
async def _execute_task_with_agent(self, task: Task, agent):
"""Execute a task with a specific agent"""
try:
task.status = TaskStatus.IN_PROGRESS
task.assigned_agent = agent.id
# Update agent and metrics
self.agent_service.increment_agent_tasks(agent.id)
self.performance_service.record_task_start(agent.id)
# Persist status change to database
try:
self.task_service.update_task(task.id, task)
logger.debug(f"💾 Updated task {task.id} status to IN_PROGRESS in database")
except Exception as e:
logger.error(f"❌ Failed to update task {task.id} status in database: {e}")
start_time = time.time()
# Execute based on agent type
if agent.agent_type == "cli":
result = await self._execute_cli_task(task, agent)
else:
result = await self._execute_ollama_task(task, agent)
# Record metrics
execution_time = time.time() - start_time
self.performance_service.record_task_completion(agent.id, task.type.value, execution_time)
# Update task
task.result = result
task.status = TaskStatus.COMPLETED
task.completed_at = time.time()
# Persist completion to database
try:
self.task_service.update_task(task.id, task)
logger.debug(f"💾 Updated task {task.id} status to COMPLETED in database")
except Exception as e:
logger.error(f"❌ Failed to update completed task {task.id} in database: {e}")
# Update agent
self.agent_service.decrement_agent_tasks(agent.id)
# Handle workflow completion
if task.workflow_id:
self.workflow_service.handle_task_completion(task)
logger.info(f"✅ Task {task.id} completed by {agent.id}")
except Exception as e:
task.status = TaskStatus.FAILED
task.result = {"error": str(e)}
# Persist failure to database
try:
self.task_service.update_task(task.id, task)
logger.debug(f"💾 Updated task {task.id} status to FAILED in database")
except Exception as db_e:
logger.error(f"❌ Failed to update failed task {task.id} in database: {db_e}")
self.agent_service.decrement_agent_tasks(agent.id)
self.performance_service.record_task_failure(agent.id)
logger.error(f"❌ Task {task.id} failed: {e}")
async def _execute_cli_task(self, task: Task, agent) -> Dict:
"""Execute task on CLI agent"""
if not self.agent_service.cli_agent_manager:
raise Exception("CLI agent manager not initialized")
prompt = self._build_task_prompt(task)
return await self.agent_service.cli_agent_manager.execute_task(agent.id, prompt, task.context)
async def _execute_ollama_task(self, task: Task, agent) -> Dict:
"""Execute task on Ollama agent"""
prompt = self._build_task_prompt(task)
async with aiohttp.ClientSession() as session:
payload = {
"model": agent.model,
"prompt": prompt,
"stream": False
}
async with session.post(f"{agent.endpoint}/api/generate", json=payload) as response:
if response.status == 200:
result = await response.json()
return {"output": result.get("response", ""), "model": agent.model}
else:
raise Exception(f"HTTP {response.status}: {await response.text()}")
def _build_task_prompt(self, task: Task) -> str:
"""Build prompt for task execution"""
context_str = json.dumps(task.context, indent=2) if task.context else "No context provided"
return f"""
Task Type: {task.type.value}
Priority: {task.priority}
Context: {context_str}
Please complete this task based on the provided context and requirements.
"""
# =========================================================================
# WORKFLOW DELEGATION
# =========================================================================
async def submit_workflow(self, workflow: Dict[str, Any]) -> str:
"""Submit a workflow for execution"""
return await self.workflow_service.submit_workflow(workflow)
async def _check_workflow_dependencies(self):
"""Check and schedule workflow tasks whose dependencies are satisfied"""
ready_tasks = self.workflow_service.get_ready_workflow_tasks(self.tasks)
for task in ready_tasks:
if task not in self.task_queue:
self.tasks[task.id] = task
self.task_queue.append(task)
def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
"""Get workflow execution status"""
return self.workflow_service.get_workflow_status(workflow_id)
# =========================================================================
# SERVICE DELEGATION
# =========================================================================
async def _load_database_tasks(self):
"""Load pending and in-progress tasks from database"""
try:
# Load pending tasks
pending_orm_tasks = self.task_service.get_tasks(status='pending', limit=100)
for orm_task in pending_orm_tasks:
coordinator_task = self.task_service.coordinator_task_from_orm(orm_task)
self.tasks[coordinator_task.id] = coordinator_task
self.task_queue.append(coordinator_task)
# Load in-progress tasks
in_progress_orm_tasks = self.task_service.get_tasks(status='in_progress', limit=100)
for orm_task in in_progress_orm_tasks:
coordinator_task = self.task_service.coordinator_task_from_orm(orm_task)
self.tasks[coordinator_task.id] = coordinator_task
# In-progress tasks are not added to task_queue as they're already being processed
# Sort task queue by priority
self.task_queue.sort(key=lambda t: t.priority)
logger.info(f"📊 Loaded {len(pending_orm_tasks)} pending and {len(in_progress_orm_tasks)} in-progress tasks from database")
except Exception as e:
logger.error(f"❌ Failed to load tasks from database: {e}")
# =========================================================================
# STATUS & HEALTH (Delegation to Services)
# =========================================================================
def get_task_status(self, task_id: str) -> Optional[Task]:
"""Get status of a specific task"""
# First check in-memory cache
task = self.tasks.get(task_id)
if task:
return task
# If not in memory, check database
try:
orm_task = self.task_service.get_task(task_id)
if orm_task:
return self.task_service.coordinator_task_from_orm(orm_task)
except Exception as e:
logger.error(f"❌ Failed to get task {task_id} from database: {e}")
return None
def get_completed_tasks(self, limit: int = 50) -> List[Task]:
"""Get all completed tasks"""
# Get from in-memory cache first
memory_completed = [task for task in self.tasks.values() if task.status == TaskStatus.COMPLETED]
# Get additional from database if needed
try:
if len(memory_completed) < limit:
db_completed = self.task_service.get_tasks(status='completed', limit=limit)
db_tasks = [self.task_service.coordinator_task_from_orm(orm_task) for orm_task in db_completed]
# Combine and deduplicate
all_tasks = {task.id: task for task in memory_completed + db_tasks}
return list(all_tasks.values())[:limit]
except Exception as e:
logger.error(f"❌ Failed to get completed tasks from database: {e}")
return memory_completed[:limit]
async def get_health_status(self):
"""Get coordinator health status"""
agent_status = self.agent_service.get_agent_status()
# Get comprehensive task statistics from database
try:
db_stats = self.task_service.get_task_statistics()
except Exception as e:
logger.error(f"❌ Failed to get task statistics from database: {e}")
db_stats = {}
return {
"status": "operational" if self.is_initialized else "initializing",
"agents": agent_status,
"total_agents": len(self.agent_service.get_all_agents()),
"active_tasks": len([t for t in self.tasks.values() if t.status == TaskStatus.IN_PROGRESS]),
"pending_tasks": len(self.task_queue),
"completed_tasks": len([t for t in self.tasks.values() if t.status == TaskStatus.COMPLETED]),
"database_statistics": db_stats,
"background_service": self.background_service.get_status()
}
async def get_comprehensive_status(self):
"""Get comprehensive system status"""
health = await self.get_health_status()
return {
**health,
"coordinator_type": "unified_refactored",
"features": {
"simple_tasks": True,
"workflows": True,
"cli_agents": self.agent_service.cli_agent_manager is not None,
"distributed_caching": self.redis_client is not None,
"performance_monitoring": True,
"separated_concerns": True
},
"uptime": time.time() - (self.is_initialized and time.time() or 0),
"performance_metrics": self.performance_service.get_performance_metrics()
}
async def get_prometheus_metrics(self):
"""Get Prometheus metrics"""
return await self.performance_service.get_prometheus_metrics()
def generate_progress_report(self) -> Dict:
"""Generate progress report"""
return self.performance_service.generate_performance_report(
self.agent_service.get_all_agents(),
self.tasks
)
# =========================================================================
# AGENT MANAGEMENT (Delegation)
# =========================================================================
def add_agent(self, agent: Agent):
"""Add an agent to the coordinator"""
self.agent_service.add_agent(agent)
def get_available_agent(self, task_type: AgentType):
"""Find an available agent for the task type"""
return self.agent_service.get_optimal_agent(
task_type,
self.performance_service.get_load_balancer()
)
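
For reference, a workflow payload compatible with submit_workflow might look like the sketch below; it assumes WorkflowService parses the same shape the legacy _parse_workflow_to_tasks used, and the task types, contexts, and priorities are illustrative:

# Sketch only: workflow shape assumed from the legacy parser; values are made up.
workflow = {
    "tasks": [
        {"type": "code_generation", "priority": 2,
         "context": {"feature": "export endpoint"}, "payload": {}},
        {"type": "testing", "priority": 3,
         "context": {"target": "export endpoint"},
         # dependency IDs follow the "<workflow_id>_task_<index>" pattern assigned at submit time
         "dependencies": []},
    ],
}
# inside an async context:
workflow_id = await coordinator.submit_workflow(workflow)
print(coordinator.get_workflow_status(workflow_id))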

File: main.py

@@ -9,7 +9,7 @@ from datetime import datetime
from pathlib import Path
import socketio
from .core.unified_coordinator import UnifiedCoordinator
from .core.unified_coordinator_refactored import UnifiedCoordinatorRefactored as UnifiedCoordinator
from .core.database import engine, get_db, init_database_with_retry, test_database_connection
from .api import agents, workflows, executions, monitoring, projects, tasks, cluster, distributed_workflows, cli_agents, auth
from .models.user import Base

File: services/agent_service.py

@@ -0,0 +1,300 @@
"""
Agent Management Service
Handles agent registration, health monitoring, and connectivity management.
"""
import asyncio
import aiohttp
import time
import logging
from typing import Dict, List, Optional, Set, Any
from dataclasses import dataclass, field
from sqlalchemy.orm import Session
from enum import Enum
from ..models.agent import Agent as ORMAgent
from ..core.database import SessionLocal
from ..cli_agents.cli_agent_manager import get_cli_agent_manager
logger = logging.getLogger(__name__)
class AgentType(Enum):
"""Unified agent types supporting both original and distributed workflows"""
# Original agent types
KERNEL_DEV = "kernel_dev"
PYTORCH_DEV = "pytorch_dev"
PROFILER = "profiler"
DOCS_WRITER = "docs_writer"
TESTER = "tester"
CLI_GEMINI = "cli_gemini"
GENERAL_AI = "general_ai"
REASONING = "reasoning"
# Distributed workflow types
CODE_GENERATION = "code_generation"
CODE_REVIEW = "code_review"
TESTING = "testing"
COMPILATION = "compilation"
OPTIMIZATION = "optimization"
DOCUMENTATION = "documentation"
DEPLOYMENT = "deployment"
@dataclass
class Agent:
"""Unified agent representation supporting both Ollama and CLI agents"""
id: str
endpoint: str
model: str
specialty: AgentType
max_concurrent: int = 2
current_tasks: int = 0
agent_type: str = "ollama" # "ollama" or "cli"
cli_config: Optional[Dict[str, Any]] = None
# Enhanced fields for distributed workflows
gpu_type: str = "unknown"
capabilities: Set[str] = field(default_factory=set)
performance_history: List[float] = field(default_factory=list)
specializations: List[AgentType] = field(default_factory=list)
last_heartbeat: float = field(default_factory=time.time)
def __post_init__(self):
if self.specializations:
self.capabilities.update([spec.value for spec in self.specializations])
class AgentService:
"""Service for managing agents in the Hive cluster"""
def __init__(self):
self.agents: Dict[str, Agent] = {}
self.cli_agent_manager = None
self._initialized = False
async def initialize(self):
"""Initialize the agent service"""
if self._initialized:
return
try:
# Initialize CLI agent manager
self.cli_agent_manager = get_cli_agent_manager()
# Load agents from database
await self._load_database_agents()
# Initialize predefined cluster agents
self._initialize_cluster_agents()
# Test initial connectivity
await self._test_initial_connectivity()
self._initialized = True
logger.info("✅ Agent Service initialized successfully")
except Exception as e:
logger.error(f"❌ Failed to initialize agent service: {e}")
raise
def add_agent(self, agent: Agent):
"""Add an agent to the service"""
self.agents[agent.id] = agent
logger.info(f"✅ Added agent: {agent.id} ({agent.specialty.value})")
def get_agent(self, agent_id: str) -> Optional[Agent]:
"""Get agent by ID"""
return self.agents.get(agent_id)
def get_all_agents(self) -> Dict[str, Agent]:
"""Get all agents"""
return self.agents.copy()
def get_agents_by_specialty(self, specialty: AgentType) -> List[Agent]:
"""Get agents by specialty"""
return [
agent for agent in self.agents.values()
if agent.specialty == specialty or specialty in agent.specializations
]
def get_available_agents(self, specialty: Optional[AgentType] = None) -> List[Agent]:
"""Get available agents, optionally filtered by specialty"""
available = [
agent for agent in self.agents.values()
if agent.current_tasks < agent.max_concurrent
]
if specialty:
available = [
agent for agent in available
if agent.specialty == specialty or specialty in agent.specializations
]
return available
def get_optimal_agent(self, specialty: AgentType, load_balancer=None) -> Optional[Agent]:
"""Get the optimal agent for a task type"""
available_agents = [
agent for agent in self.agents.values()
if (agent.specialty == specialty or specialty in agent.specializations)
and agent.current_tasks < agent.max_concurrent
]
if not available_agents:
# Fallback to general AI agents
available_agents = [
agent for agent in self.agents.values()
if agent.specialty == AgentType.GENERAL_AI
and agent.current_tasks < agent.max_concurrent
]
if available_agents:
if load_balancer:
return min(available_agents, key=lambda a: load_balancer.get_weight(a.id))
else:
# Fall back to the least-loaded agent by current task count
return min(available_agents, key=lambda a: a.current_tasks)
return None
def increment_agent_tasks(self, agent_id: str):
"""Increment current task count for an agent"""
if agent_id in self.agents:
self.agents[agent_id].current_tasks += 1
def decrement_agent_tasks(self, agent_id: str):
"""Decrement current task count for an agent"""
if agent_id in self.agents:
self.agents[agent_id].current_tasks = max(0, self.agents[agent_id].current_tasks - 1)
def update_agent_heartbeat(self, agent_id: str):
"""Update agent heartbeat timestamp"""
if agent_id in self.agents:
self.agents[agent_id].last_heartbeat = time.time()
async def _load_database_agents(self):
"""Load agents from database"""
try:
db = SessionLocal()
orm_agents = db.query(ORMAgent).all()
for orm_agent in orm_agents:
specialty = AgentType(orm_agent.specialty) if orm_agent.specialty else AgentType.GENERAL_AI
agent = Agent(
id=orm_agent.id,
endpoint=orm_agent.endpoint,
model=orm_agent.model or "unknown",
specialty=specialty,
max_concurrent=orm_agent.max_concurrent,
current_tasks=orm_agent.current_tasks,
agent_type=orm_agent.agent_type,
cli_config=orm_agent.cli_config
)
self.add_agent(agent)
db.close()
logger.info(f"📊 Loaded {len(orm_agents)} agents from database")
except Exception as e:
logger.error(f"❌ Failed to load agents from database: {e}")
def _initialize_cluster_agents(self):
"""Initialize predefined cluster agents"""
cluster_agents = [
Agent(
id="walnut-codellama",
endpoint="http://walnut.local:11434",
model="codellama:34b",
specialty=AgentType.KERNEL_DEV
),
Agent(
id="oak-gemma",
endpoint="http://oak.local:11434",
model="gemma2:27b",
specialty=AgentType.PYTORCH_DEV
),
Agent(
id="ironwood-llama",
endpoint="http://ironwood.local:11434",
model="llama3.1:70b",
specialty=AgentType.GENERAL_AI
)
]
for agent in cluster_agents:
if agent.id not in self.agents:
self.add_agent(agent)
async def _test_initial_connectivity(self):
"""Test connectivity to all agents"""
logger.info("🔍 Testing agent connectivity...")
for agent in self.agents.values():
try:
if agent.agent_type == "cli":
# Test CLI agent
if self.cli_agent_manager:
await self.cli_agent_manager.test_agent(agent.id)
else:
# Test Ollama agent
async with aiohttp.ClientSession() as session:
async with session.get(
f"{agent.endpoint}/api/tags",
timeout=aiohttp.ClientTimeout(total=5)
) as response:
if response.status == 200:
logger.info(f"✅ Agent {agent.id} is responsive")
else:
logger.warning(f"⚠️ Agent {agent.id} returned HTTP {response.status}")
except Exception as e:
logger.warning(f"⚠️ Agent {agent.id} is not responsive: {e}")
async def check_agent_health(self, agent: Agent) -> bool:
"""Check individual agent health"""
try:
if agent.agent_type == "cli":
# CLI agent health check
if self.cli_agent_manager:
return await self.cli_agent_manager.test_agent(agent.id)
return False
else:
# Ollama agent health check
async with aiohttp.ClientSession() as session:
async with session.get(
f"{agent.endpoint}/api/tags",
timeout=aiohttp.ClientTimeout(total=10)
) as response:
return response.status == 200
except Exception as e:
logger.warning(f"⚠️ Agent {agent.id} health check error: {e}")
return False
async def health_monitor_cycle(self):
"""Single cycle of health monitoring for all agents"""
try:
for agent in self.agents.values():
is_healthy = await self.check_agent_health(agent)
if is_healthy:
agent.last_heartbeat = time.time()
else:
logger.warning(f"⚠️ Agent {agent.id} health check failed")
except Exception as e:
logger.error(f"❌ Health monitor cycle error: {e}")
def get_agent_status(self) -> Dict[str, Dict]:
"""Get status of all agents"""
agent_status = {}
for agent_id, agent in self.agents.items():
agent_status[agent_id] = {
"type": agent.agent_type,
"model": agent.model,
"specialty": agent.specialty.value,
"current_tasks": agent.current_tasks,
"max_concurrent": agent.max_concurrent,
"last_heartbeat": agent.last_heartbeat,
"utilization": agent.current_tasks / agent.max_concurrent if agent.max_concurrent > 0 else 0
}
return agent_status
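
Registering an additional agent programmatically is a thin delegation to AgentService; a sketch (the host, model, and "app" package name are placeholders):

from app.services.agent_service import Agent, AgentType  # "app" package name assumed

extra = Agent(
    id="acacia-qwen",
    endpoint="http://acacia.local:11434",
    model="qwen2.5-coder:32b",
    specialty=AgentType.CODE_GENERATION,
    max_concurrent=2,
)
coordinator.add_agent(extra)  # delegates to AgentService.add_agent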

File: services/background_service.py

@@ -0,0 +1,163 @@
"""
Background Processing Service
Handles background tasks, cleanup, monitoring, and maintenance operations.
"""
import asyncio
import logging
from typing import Set, Optional, Callable
from concurrent.futures import ThreadPoolExecutor
logger = logging.getLogger(__name__)
class BackgroundService:
"""Service for managing background tasks and processes"""
def __init__(self):
self.running = False
self.executor = ThreadPoolExecutor(max_workers=4)
self._background_tasks: Set[asyncio.Task] = set()
self._initialized = False
# Service references (injected)
self.agent_service = None
self.task_service = None
self.workflow_service = None
self.performance_service = None
def initialize(self, agent_service, task_service, workflow_service, performance_service):
"""Initialize the background service with dependencies"""
if self._initialized:
return
self.agent_service = agent_service
self.task_service = task_service
self.workflow_service = workflow_service
self.performance_service = performance_service
self._initialized = True
logger.info("✅ Background Service initialized successfully")
async def start(self):
"""Start background processes"""
if not self._initialized:
raise Exception("Background service not initialized")
self.running = True
# Start background tasks
self._background_tasks.add(asyncio.create_task(self._health_monitor()))
self._background_tasks.add(asyncio.create_task(self._performance_optimizer()))
self._background_tasks.add(asyncio.create_task(self._cleanup_manager()))
logger.info("🚀 Background Service processes started")
async def shutdown(self):
"""Shutdown background processes"""
logger.info("🛑 Shutting down Background Service...")
self.running = False
# Cancel background tasks
for task in self._background_tasks:
task.cancel()
# Wait for tasks to complete
if self._background_tasks:
await asyncio.gather(*self._background_tasks, return_exceptions=True)
# Shutdown executor
self.executor.shutdown(wait=True)
logger.info("✅ Background Service shutdown complete")
async def _health_monitor(self):
"""Background health monitoring"""
while self.running:
try:
if self.agent_service:
await self.agent_service.health_monitor_cycle()
await asyncio.sleep(30) # Check every 30 seconds
except Exception as e:
logger.error(f"❌ Health monitor error: {e}")
await asyncio.sleep(60)
async def _performance_optimizer(self):
"""Background performance optimization"""
while self.running:
try:
if self.performance_service and self.agent_service:
await self.performance_service.optimization_cycle(
self.agent_service.get_all_agents()
)
await asyncio.sleep(300) # Optimize every 5 minutes
except Exception as e:
logger.error(f"❌ Performance optimizer error: {e}")
await asyncio.sleep(600)
async def _cleanup_manager(self):
"""Background cleanup management"""
while self.running:
try:
# Cleanup completed tasks
if self.task_service:
cleaned_count = await self._cleanup_completed_tasks()
if cleaned_count > 0:
logger.info(f"🧹 Cleaned up {cleaned_count} old tasks")
# Cleanup workflows
if self.workflow_service:
workflow_cleaned = self.workflow_service.cleanup_completed_workflows(max_age_hours=24)
if workflow_cleaned > 0:
logger.info(f"🧹 Cleaned up {workflow_cleaned} old workflows")
await asyncio.sleep(3600) # Cleanup every hour
except Exception as e:
logger.error(f"❌ Cleanup manager error: {e}")
await asyncio.sleep(1800) # Retry in 30 minutes
async def _cleanup_completed_tasks(self) -> int:
"""Clean up old completed tasks"""
try:
# Clean up database tasks (older ones)
db_cleaned_count = self.task_service.cleanup_completed_tasks(max_age_hours=24)
return db_cleaned_count
except Exception as e:
logger.error(f"❌ Failed to cleanup completed tasks: {e}")
return 0
def add_background_task(self, coro):
"""Add a custom background task"""
if self.running:
task = asyncio.create_task(coro)
self._background_tasks.add(task)
# Remove the task from the tracking set once it finishes
task.add_done_callback(self._background_tasks.discard)
return task
return None
def schedule_periodic_task(self, coro_func: Callable, interval_seconds: int):
"""Schedule a periodic task"""
async def periodic_wrapper():
while self.running:
try:
await coro_func()
await asyncio.sleep(interval_seconds)
except Exception as e:
logger.error(f"❌ Periodic task error: {e}")
await asyncio.sleep(interval_seconds)
return self.add_background_task(periodic_wrapper())
def get_status(self) -> dict:
"""Get background service status"""
return {
"running": self.running,
"active_tasks": len([t for t in self._background_tasks if not t.done()]),
"total_tasks": len(self._background_tasks),
"executor_threads": self.executor._threads if hasattr(self.executor, '_threads') else 0
}
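
The background service can also host ad-hoc periodic jobs via schedule_periodic_task; a sketch (the queue-depth job and the 60-second interval are illustrative, and the coordinator must already be started so the service is running):

# Illustrative periodic job; add_background_task only accepts work while running is True.
async def log_queue_depth():
    print(f"task queue depth: {len(coordinator.task_queue)}")

coordinator.background_service.schedule_periodic_task(log_queue_depth, interval_seconds=60)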

File: services/performance_service.py

@@ -0,0 +1,173 @@
"""
Performance Monitoring and Optimization Service
Handles performance metrics, load balancing, and system optimization.
"""
import time
import logging
from typing import Dict, List, Optional
from prometheus_client import Counter, Histogram, Gauge
logger = logging.getLogger(__name__)
# Performance Metrics
TASK_COUNTER = Counter('hive_tasks_total', 'Total tasks processed', ['task_type', 'agent'])
TASK_DURATION = Histogram('hive_task_duration_seconds', 'Task execution time', ['task_type', 'agent'])
ACTIVE_TASKS = Gauge('hive_active_tasks', 'Currently active tasks', ['agent'])
AGENT_UTILIZATION = Gauge('hive_agent_utilization', 'Agent utilization percentage', ['agent'])
class AdaptiveLoadBalancer:
"""Adaptive load balancer for optimal agent selection"""
def __init__(self):
self.weights: Dict[str, float] = {}
self.performance_history: Dict[str, List[float]] = {}
self.max_history = 100 # Keep last 100 performance measurements
def update_weight(self, agent_id: str, performance_metric: float):
"""Update agent weight based on performance (lower is better)"""
# Inverse relationship: better performance = lower weight
self.weights[agent_id] = performance_metric
# Update performance history
if agent_id not in self.performance_history:
self.performance_history[agent_id] = []
self.performance_history[agent_id].append(performance_metric)
# Keep only recent history
if len(self.performance_history[agent_id]) > self.max_history:
self.performance_history[agent_id] = self.performance_history[agent_id][-self.max_history:]
def get_weight(self, agent_id: str) -> float:
"""Get agent weight (lower = more preferred)"""
return self.weights.get(agent_id, 1.0)
def get_average_performance(self, agent_id: str) -> float:
"""Get average performance for an agent"""
history = self.performance_history.get(agent_id, [])
if not history:
return 1.0
return sum(history) / len(history)
def get_performance_stats(self) -> Dict[str, Dict[str, float]]:
"""Get performance statistics for all agents"""
stats = {}
for agent_id in self.weights:
history = self.performance_history.get(agent_id, [])
if history:
stats[agent_id] = {
"current_weight": self.weights[agent_id],
"average_time": sum(history) / len(history),
"min_time": min(history),
"max_time": max(history),
"sample_count": len(history)
}
return stats
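To illustrate how these weights can drive agent selection, here is a small sketch; the agent IDs and the `pick_agent` helper are illustrative and not part of this commit:

```python
balancer = AdaptiveLoadBalancer()

# Record observed execution times in seconds (lower is better).
balancer.update_weight("agent-a", 2.1)
balancer.update_weight("agent-b", 0.8)
balancer.update_weight("agent-b", 1.0)

def pick_agent(candidates):
    # Hypothetical helper: prefer the candidate with the lowest current weight.
    return min(candidates, key=balancer.get_weight)

print(pick_agent(["agent-a", "agent-b"]))         # -> "agent-b"
print(balancer.get_performance_stats()["agent-b"])
# {'current_weight': 1.0, 'average_time': 0.9, 'min_time': 0.8, 'max_time': 1.0, 'sample_count': 2}
```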
class PerformanceService:
"""Service for performance monitoring and optimization"""
def __init__(self):
self.load_balancer = AdaptiveLoadBalancer()
self._initialized = False
def initialize(self):
"""Initialize the performance service"""
if self._initialized:
return
self._initialized = True
logger.info("✅ Performance Service initialized successfully")
def record_task_start(self, agent_id: str):
"""Record task start for metrics"""
ACTIVE_TASKS.labels(agent=agent_id).inc()
def record_task_completion(self, agent_id: str, task_type: str, execution_time: float):
"""Record task completion metrics"""
TASK_COUNTER.labels(task_type=task_type, agent=agent_id).inc()
TASK_DURATION.labels(task_type=task_type, agent=agent_id).observe(execution_time)
ACTIVE_TASKS.labels(agent=agent_id).dec()
# Update load balancer
self.load_balancer.update_weight(agent_id, execution_time)
def record_task_failure(self, agent_id: str):
"""Record task failure for metrics"""
ACTIVE_TASKS.labels(agent=agent_id).dec()
def update_agent_utilization(self, agent_id: str, current_tasks: int, max_concurrent: int):
"""Update agent utilization metrics"""
utilization = current_tasks / max_concurrent if max_concurrent > 0 else 0
AGENT_UTILIZATION.labels(agent=agent_id).set(utilization)
def get_load_balancer(self) -> AdaptiveLoadBalancer:
"""Get the load balancer instance"""
return self.load_balancer
async def optimization_cycle(self, agents: Dict):
"""Single cycle of performance optimization"""
try:
# Update utilization metrics for all agents
for agent in agents.values():
utilization = agent.current_tasks / agent.max_concurrent if agent.max_concurrent > 0 else 0
AGENT_UTILIZATION.labels(agent=agent.id).set(utilization)
# Additional optimization logic could go here
# - Dynamic scaling recommendations
# - Agent rebalancing suggestions
# - Performance alerts
except Exception as e:
logger.error(f"❌ Performance optimization cycle error: {e}")
def get_performance_metrics(self) -> Dict:
"""Get current performance metrics"""
return {
"load_balancer_stats": self.load_balancer.get_performance_stats(),
"prometheus_available": True
}
async def get_prometheus_metrics(self):
"""Get Prometheus metrics"""
from prometheus_client import generate_latest
return generate_latest()
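If the surrounding application serves these metrics over HTTP, `get_prometheus_metrics()` can back a `/metrics` route. The FastAPI wiring below is an assumption for illustration only; this commit does not show the route or how the service instance is shared:

```python
from fastapi import FastAPI
from fastapi.responses import Response
from prometheus_client import CONTENT_TYPE_LATEST

app = FastAPI()
performance_service = PerformanceService()  # assumed module-level wiring for this sketch

@app.get("/metrics")
async def metrics() -> Response:
    payload = await performance_service.get_prometheus_metrics()
    return Response(content=payload, media_type=CONTENT_TYPE_LATEST)
```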
def generate_performance_report(self, agents: Dict, tasks: Dict) -> Dict:
"""Generate comprehensive performance report"""
from .workflow_service import TaskStatus
# Agent performance
agent_stats = {}
for agent_id, agent in agents.items():
agent_stats[agent_id] = {
"current_tasks": agent.current_tasks,
"max_concurrent": agent.max_concurrent,
"utilization": agent.current_tasks / agent.max_concurrent if agent.max_concurrent > 0 else 0,
"average_performance": self.load_balancer.get_average_performance(agent_id),
"weight": self.load_balancer.get_weight(agent_id)
}
# Task statistics
total_tasks = len(tasks)
completed_tasks = len([t for t in tasks.values() if t.status == TaskStatus.COMPLETED])
failed_tasks = len([t for t in tasks.values() if t.status == TaskStatus.FAILED])
active_tasks = len([t for t in tasks.values() if t.status == TaskStatus.IN_PROGRESS])
return {
"timestamp": time.time(),
"task_statistics": {
"total": total_tasks,
"completed": completed_tasks,
"failed": failed_tasks,
"active": active_tasks,
"success_rate": completed_tasks / total_tasks if total_tasks > 0 else 0
},
"agent_performance": agent_stats,
"active_agents": len([a for a in agents.values() if a.current_tasks > 0]),
"load_balancer": self.load_balancer.get_performance_stats()
}
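The shape of the report, reconstructed from the return statement above with illustrative values:

```python
{
    "timestamp": 1720656000.0,
    "task_statistics": {
        "total": 10, "completed": 8, "failed": 1, "active": 1, "success_rate": 0.8
    },
    "agent_performance": {
        "agent-a": {
            "current_tasks": 1, "max_concurrent": 2, "utilization": 0.5,
            "average_performance": 2.4, "weight": 2.1
        }
    },
    "active_agents": 1,
    "load_balancer": {
        "agent-a": {
            "current_weight": 2.1, "average_time": 2.4,
            "min_time": 1.8, "max_time": 3.1, "sample_count": 5
        }
    }
}
```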

View File

@@ -0,0 +1,263 @@
"""
Workflow Management Service
Handles workflow parsing, scheduling, dependency tracking, and execution management.
"""
import time
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
# Import shared types
from .agent_service import AgentType
logger = logging.getLogger(__name__)
class TaskStatus(Enum):
"""Task status tracking"""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Task:
"""Unified task representation"""
id: str
type: AgentType
priority: int = 3
status: TaskStatus = TaskStatus.PENDING
context: Dict[str, Any] = field(default_factory=dict)
payload: Dict[str, Any] = field(default_factory=dict)
assigned_agent: Optional[str] = None
result: Optional[Dict] = None
created_at: float = field(default_factory=time.time)
completed_at: Optional[float] = None
# Workflow support
workflow_id: Optional[str] = None
dependencies: List[str] = field(default_factory=list)
def cache_key(self) -> str:
"""Generate cache key for task result"""
import hashlib
import json
payload_hash = hashlib.md5(json.dumps(self.payload, sort_keys=True).encode()).hexdigest()
return f"task_result:{self.type.value}:{payload_hash}"
@dataclass
class WorkflowExecution:
"""Represents a workflow execution instance"""
workflow_id: str
execution_id: str
tasks: List[Task]
created_at: float
completed_at: Optional[float] = None
status: str = "running"
metadata: Optional[Dict[str, Any]] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class WorkflowService:
"""Service for managing workflows and their execution"""
def __init__(self):
self.workflow_tasks: Dict[str, List[Task]] = {}
self.workflow_executions: Dict[str, WorkflowExecution] = {}
self._initialized = False
def initialize(self):
"""Initialize the workflow service"""
if self._initialized:
return
self._initialized = True
logger.info("✅ Workflow Service initialized successfully")
async def submit_workflow(self, workflow: Dict[str, Any]) -> str:
"""Submit a workflow for execution"""
workflow_id = f"workflow_{int(time.time())}"
execution_id = f"exec_{workflow_id}"
tasks = self._parse_workflow_to_tasks(workflow, workflow_id)
# Create workflow execution record
execution = WorkflowExecution(
workflow_id=workflow_id,
execution_id=execution_id,
tasks=tasks,
created_at=time.time(),
metadata=workflow.get('metadata', {})
)
self.workflow_tasks[workflow_id] = tasks
self.workflow_executions[execution_id] = execution
logger.info(f"🔄 Submitted workflow: {workflow_id} with {len(tasks)} tasks")
return workflow_id
def _parse_workflow_to_tasks(self, workflow: Dict[str, Any], workflow_id: str) -> List[Task]:
"""Parse workflow definition into tasks"""
tasks = []
base_tasks = workflow.get('tasks', [])
for i, task_def in enumerate(base_tasks):
task_id = f"{workflow_id}_task_{i}"
task_type = AgentType(task_def.get('type', 'general_ai'))
task = Task(
id=task_id,
type=task_type,
workflow_id=workflow_id,
context=task_def.get('context', {}),
payload=task_def.get('payload', {}),
dependencies=task_def.get('dependencies', []),
priority=task_def.get('priority', 3)
)
tasks.append(task)
return tasks
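For context, a minimal workflow definition that `_parse_workflow_to_tasks` accepts; the field names come from the parsing code above, while the values themselves are illustrative:

```python
workflow = {
    "metadata": {"owner": "example-team"},   # optional; stored on the execution record by submit_workflow
    "tasks": [
        {
            "type": "general_ai",            # must be a valid AgentType value; omitted fields fall back to the defaults above
            "priority": 2,
            "context": {"repository": "example/repo"},
            "payload": {"prompt": "Implement the CLI entry point"},
            "dependencies": []
        },
        {
            "payload": {"prompt": "Write unit tests for the entry point"},
            # Dependencies reference generated task IDs of the form f"{workflow_id}_task_{i}".
            "dependencies": ["workflow_1720656000_task_0"]
        }
    ]
}
```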
def get_ready_workflow_tasks(self, all_tasks: Dict[str, Task]) -> List[Task]:
"""Get workflow tasks that are ready to execute (dependencies satisfied)"""
ready_tasks = []
for workflow_tasks in self.workflow_tasks.values():
for task in workflow_tasks:
if (task.status == TaskStatus.PENDING and
self._dependencies_satisfied(task, all_tasks)):
ready_tasks.append(task)
return ready_tasks
def _dependencies_satisfied(self, task: Task, all_tasks: Dict[str, Task]) -> bool:
"""Check if task dependencies are satisfied"""
for dep_id in task.dependencies:
dep_task = all_tasks.get(dep_id)
if not dep_task or dep_task.status != TaskStatus.COMPLETED:
return False
return True
def handle_task_completion(self, task: Task):
"""Handle completion of a workflow task"""
if not task.workflow_id:
return
# Check if workflow is complete
workflow_tasks = self.workflow_tasks.get(task.workflow_id, [])
completed_tasks = [t for t in workflow_tasks if t.status == TaskStatus.COMPLETED]
failed_tasks = [t for t in workflow_tasks if t.status == TaskStatus.FAILED]
# Update workflow execution status
for execution in self.workflow_executions.values():
if execution.workflow_id == task.workflow_id:
if len(failed_tasks) > 0:
execution.status = "failed"
execution.completed_at = time.time()
logger.info(f"❌ Workflow {task.workflow_id} failed")
elif len(completed_tasks) == len(workflow_tasks):
execution.status = "completed"
execution.completed_at = time.time()
logger.info(f"🎉 Workflow {task.workflow_id} completed")
break
def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
"""Get workflow execution status"""
workflow_tasks = self.workflow_tasks.get(workflow_id, [])
if not workflow_tasks:
return {"error": "Workflow not found"}
status_counts = {}
for status in TaskStatus:
status_counts[status.value] = len([t for t in workflow_tasks if t.status == status])
# Find execution record
execution = None
for exec_record in self.workflow_executions.values():
if exec_record.workflow_id == workflow_id:
execution = exec_record
break
return {
"workflow_id": workflow_id,
"execution_id": execution.execution_id if execution else None,
"total_tasks": len(workflow_tasks),
"status_breakdown": status_counts,
"completed": status_counts.get("completed", 0) == len(workflow_tasks),
"status": execution.status if execution else "unknown",
"created_at": execution.created_at if execution else None,
"completed_at": execution.completed_at if execution else None
}
def get_workflow_tasks(self, workflow_id: str) -> List[Task]:
"""Get all tasks for a workflow"""
return self.workflow_tasks.get(workflow_id, [])
def get_all_workflows(self) -> Dict[str, List[Task]]:
"""Get all workflows"""
return self.workflow_tasks.copy()
def get_workflow_executions(self, workflow_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get workflow execution history"""
executions = []
for execution in self.workflow_executions.values():
if workflow_id is None or execution.workflow_id == workflow_id:
executions.append({
"workflow_id": execution.workflow_id,
"execution_id": execution.execution_id,
"status": execution.status,
"task_count": len(execution.tasks),
"created_at": execution.created_at,
"completed_at": execution.completed_at,
"metadata": execution.metadata
})
# Sort by creation time, newest first
executions.sort(key=lambda x: x["created_at"], reverse=True)
return executions
def cleanup_completed_workflows(self, max_age_hours: int = 24):
"""Clean up old completed workflow executions"""
cutoff_time = time.time() - (max_age_hours * 3600)
# Find completed executions older than cutoff
to_remove = []
for execution_id, execution in self.workflow_executions.items():
if (execution.status in ["completed", "failed"] and
execution.completed_at and
execution.completed_at < cutoff_time):
to_remove.append(execution_id)
# Remove old executions and their associated workflow tasks
removed_count = 0
for execution_id in to_remove:
execution = self.workflow_executions[execution_id]
workflow_id = execution.workflow_id
# Remove workflow tasks if this is the only execution for this workflow
other_executions = [
e for e in self.workflow_executions.values()
if e.workflow_id == workflow_id and e.execution_id != execution_id
]
if not other_executions:
self.workflow_tasks.pop(workflow_id, None)
# Remove execution record
del self.workflow_executions[execution_id]
removed_count += 1
if removed_count > 0:
logger.info(f"🧹 Cleaned up {removed_count} old workflow executions")
return removed_count
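Finally, a hedged end-to-end sketch of the workflow API defined above; in the refactored design a coordinator drives these calls, so invoking the service directly like this is for illustration only:

```python
import asyncio

async def demo():
    service = WorkflowService()
    service.initialize()

    workflow_id = await service.submit_workflow({
        "tasks": [{"payload": {"prompt": "Document the API"}}]   # type defaults to "general_ai"
    })

    # The coordinator normally merges workflow tasks into its global task table;
    # an empty dict suffices here because the single task has no dependencies.
    ready = service.get_ready_workflow_tasks(all_tasks={})
    print([t.id for t in ready])

    print(service.get_workflow_status(workflow_id))

asyncio.run(demo())
```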