Fix critical in-memory task storage with database persistence
Major architectural improvement to replace in-memory task storage with database-backed persistence while maintaining backward compatibility. Changes: - Created Task SQLAlchemy model matching database schema - Added Workflow and Execution SQLAlchemy models - Created TaskService for database CRUD operations - Updated UnifiedCoordinator to use database persistence - Modified task APIs to leverage database storage - Added task loading from database on coordinator initialization - Implemented status change persistence during task execution - Enhanced task cleanup with database support - Added comprehensive task statistics from database Benefits: - Tasks persist across application restarts - Better scalability and reliability - Historical task data retention - Comprehensive task filtering and querying - Maintains in-memory cache for performance 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -21,6 +21,7 @@ from prometheus_client import Counter, Histogram, Gauge
|
||||
from ..models.agent import Agent as ORMAgent
|
||||
from ..core.database import SessionLocal
|
||||
from ..cli_agents.cli_agent_manager import get_cli_agent_manager
|
||||
from ..services.task_service import TaskService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -120,10 +121,13 @@ class UnifiedCoordinator:
|
||||
def __init__(self, redis_url: str = "redis://localhost:6379"):
|
||||
# Core state
|
||||
self.agents: Dict[str, Agent] = {}
|
||||
self.tasks: Dict[str, Task] = {}
|
||||
self.tasks: Dict[str, Task] = {} # In-memory cache for active tasks
|
||||
self.task_queue: List[Task] = []
|
||||
self.is_initialized = False
|
||||
|
||||
# Database persistence
|
||||
self.task_service = TaskService()
|
||||
|
||||
# CLI agent support
|
||||
self.cli_agent_manager = None
|
||||
|
||||
@@ -163,6 +167,9 @@ class UnifiedCoordinator:
|
||||
# Load agents from database
|
||||
await self._load_database_agents()
|
||||
|
||||
# Load existing tasks from database
|
||||
await self._load_database_tasks()
|
||||
|
||||
# Initialize cluster agents
|
||||
self._initialize_cluster_agents()
|
||||
|
||||
@@ -249,6 +256,31 @@ class UnifiedCoordinator:
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to load agents from database: {e}")
|
||||
|
||||
async def _load_database_tasks(self):
|
||||
"""Load pending and in-progress tasks from database"""
|
||||
try:
|
||||
# Load pending tasks
|
||||
pending_orm_tasks = self.task_service.get_tasks(status='pending', limit=100)
|
||||
for orm_task in pending_orm_tasks:
|
||||
coordinator_task = self.task_service.coordinator_task_from_orm(orm_task)
|
||||
self.tasks[coordinator_task.id] = coordinator_task
|
||||
self.task_queue.append(coordinator_task)
|
||||
|
||||
# Load in-progress tasks
|
||||
in_progress_orm_tasks = self.task_service.get_tasks(status='in_progress', limit=100)
|
||||
for orm_task in in_progress_orm_tasks:
|
||||
coordinator_task = self.task_service.coordinator_task_from_orm(orm_task)
|
||||
self.tasks[coordinator_task.id] = coordinator_task
|
||||
# In-progress tasks are not added to task_queue as they're already being processed
|
||||
|
||||
# Sort task queue by priority
|
||||
self.task_queue.sort(key=lambda t: t.priority)
|
||||
|
||||
logger.info(f"📊 Loaded {len(pending_orm_tasks)} pending and {len(in_progress_orm_tasks)} in-progress tasks from database")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to load tasks from database: {e}")
|
||||
|
||||
def _initialize_cluster_agents(self):
|
||||
"""Initialize predefined cluster agents"""
|
||||
# This maintains compatibility with the original HiveCoordinator
|
||||
@@ -292,6 +324,14 @@ class UnifiedCoordinator:
|
||||
payload=context # For compatibility
|
||||
)
|
||||
|
||||
# Persist to database
|
||||
try:
|
||||
self.task_service.create_task(task)
|
||||
logger.info(f"💾 Task {task_id} persisted to database")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to persist task {task_id} to database: {e}")
|
||||
|
||||
# Add to in-memory structures
|
||||
self.tasks[task_id] = task
|
||||
self.task_queue.append(task)
|
||||
|
||||
@@ -416,6 +456,13 @@ class UnifiedCoordinator:
|
||||
task.assigned_agent = agent.id
|
||||
agent.current_tasks += 1
|
||||
|
||||
# Persist status change to database
|
||||
try:
|
||||
self.task_service.update_task(task.id, task)
|
||||
logger.debug(f"💾 Updated task {task.id} status to IN_PROGRESS in database")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to update task {task.id} status in database: {e}")
|
||||
|
||||
ACTIVE_TASKS.labels(agent=agent.id).inc()
|
||||
start_time = time.time()
|
||||
|
||||
@@ -435,6 +482,13 @@ class UnifiedCoordinator:
|
||||
task.status = TaskStatus.COMPLETED
|
||||
task.completed_at = time.time()
|
||||
|
||||
# Persist completion to database
|
||||
try:
|
||||
self.task_service.update_task(task.id, task)
|
||||
logger.debug(f"💾 Updated task {task.id} status to COMPLETED in database")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to update completed task {task.id} in database: {e}")
|
||||
|
||||
# Update agent
|
||||
agent.current_tasks -= 1
|
||||
self.load_balancer.update_weight(agent.id, execution_time)
|
||||
@@ -450,6 +504,14 @@ class UnifiedCoordinator:
|
||||
except Exception as e:
|
||||
task.status = TaskStatus.FAILED
|
||||
task.result = {"error": str(e)}
|
||||
|
||||
# Persist failure to database
|
||||
try:
|
||||
self.task_service.update_task(task.id, task)
|
||||
logger.debug(f"💾 Updated task {task.id} status to FAILED in database")
|
||||
except Exception as db_e:
|
||||
logger.error(f"❌ Failed to update failed task {task.id} in database: {db_e}")
|
||||
|
||||
agent.current_tasks -= 1
|
||||
ACTIVE_TASKS.labels(agent=agent.id).dec()
|
||||
logger.error(f"❌ Task {task.id} failed: {e}")
|
||||
@@ -622,18 +684,31 @@ Please complete this task based on the provided context and requirements.
|
||||
|
||||
async def _cleanup_completed_tasks(self):
|
||||
"""Clean up old completed tasks"""
|
||||
cutoff_time = time.time() - 3600 # 1 hour ago
|
||||
|
||||
completed_tasks = [
|
||||
task_id for task_id, task in self.tasks.items()
|
||||
if task.status == TaskStatus.COMPLETED and (task.completed_at or 0) < cutoff_time
|
||||
]
|
||||
|
||||
for task_id in completed_tasks:
|
||||
del self.tasks[task_id]
|
||||
try:
|
||||
# Clean up in-memory tasks (keep only active ones)
|
||||
cutoff_time = time.time() - 3600 # 1 hour ago
|
||||
|
||||
if completed_tasks:
|
||||
logger.info(f"🧹 Cleaned up {len(completed_tasks)} old completed tasks")
|
||||
completed_tasks = [
|
||||
task_id for task_id, task in self.tasks.items()
|
||||
if task.status == TaskStatus.COMPLETED and (task.completed_at or 0) < cutoff_time
|
||||
]
|
||||
|
||||
for task_id in completed_tasks:
|
||||
del self.tasks[task_id]
|
||||
|
||||
# Clean up database tasks (older ones)
|
||||
try:
|
||||
db_cleaned_count = self.task_service.cleanup_completed_tasks(max_age_hours=24)
|
||||
if db_cleaned_count > 0:
|
||||
logger.info(f"🧹 Cleaned up {db_cleaned_count} old tasks from database")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to cleanup database tasks: {e}")
|
||||
|
||||
if completed_tasks:
|
||||
logger.info(f"🧹 Cleaned up {len(completed_tasks)} old completed tasks from memory")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to cleanup completed tasks: {e}")
|
||||
|
||||
# =========================================================================
|
||||
# STATUS & METRICS
|
||||
@@ -641,11 +716,39 @@ Please complete this task based on the provided context and requirements.
|
||||
|
||||
def get_task_status(self, task_id: str) -> Optional[Task]:
|
||||
"""Get status of a specific task"""
|
||||
return self.tasks.get(task_id)
|
||||
# First check in-memory cache
|
||||
task = self.tasks.get(task_id)
|
||||
if task:
|
||||
return task
|
||||
|
||||
# If not in memory, check database
|
||||
try:
|
||||
orm_task = self.task_service.get_task(task_id)
|
||||
if orm_task:
|
||||
return self.task_service.coordinator_task_from_orm(orm_task)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to get task {task_id} from database: {e}")
|
||||
|
||||
return None
|
||||
|
||||
def get_completed_tasks(self) -> List[Task]:
|
||||
def get_completed_tasks(self, limit: int = 50) -> List[Task]:
|
||||
"""Get all completed tasks"""
|
||||
return [task for task in self.tasks.values() if task.status == TaskStatus.COMPLETED]
|
||||
# Get from in-memory cache first
|
||||
memory_completed = [task for task in self.tasks.values() if task.status == TaskStatus.COMPLETED]
|
||||
|
||||
# Get additional from database if needed
|
||||
try:
|
||||
if len(memory_completed) < limit:
|
||||
db_completed = self.task_service.get_tasks(status='completed', limit=limit)
|
||||
db_tasks = [self.task_service.coordinator_task_from_orm(orm_task) for orm_task in db_completed]
|
||||
|
||||
# Combine and deduplicate
|
||||
all_tasks = {task.id: task for task in memory_completed + db_tasks}
|
||||
return list(all_tasks.values())[:limit]
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to get completed tasks from database: {e}")
|
||||
|
||||
return memory_completed[:limit]
|
||||
|
||||
async def get_health_status(self):
|
||||
"""Get coordinator health status"""
|
||||
@@ -660,13 +763,21 @@ Please complete this task based on the provided context and requirements.
|
||||
"last_heartbeat": agent.last_heartbeat
|
||||
}
|
||||
|
||||
# Get comprehensive task statistics from database
|
||||
try:
|
||||
db_stats = self.task_service.get_task_statistics()
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to get task statistics from database: {e}")
|
||||
db_stats = {}
|
||||
|
||||
return {
|
||||
"status": "operational" if self.is_initialized else "initializing",
|
||||
"agents": agent_status,
|
||||
"total_agents": len(self.agents),
|
||||
"active_tasks": len([t for t in self.tasks.values() if t.status == TaskStatus.IN_PROGRESS]),
|
||||
"pending_tasks": len(self.task_queue),
|
||||
"completed_tasks": len([t for t in self.tasks.values() if t.status == TaskStatus.COMPLETED])
|
||||
"completed_tasks": len([t for t in self.tasks.values() if t.status == TaskStatus.COMPLETED]),
|
||||
"database_statistics": db_stats
|
||||
}
|
||||
|
||||
async def get_comprehensive_status(self):
|
||||
|
||||
Reference in New Issue
Block a user