- Force backend and frontend services to run on walnut where images are built - Fixes 'No such image' errors on other nodes - Both services now running successfully (1/1 replicas) - Frontend accessible at https://hive.home.deepblack.cloud - Backend API responding on /api/ endpoints 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
568 lines
19 KiB
Python
568 lines
19 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.encoders import jsonable_encoder
|
|
from contextlib import asynccontextmanager
|
|
import json
|
|
import asyncio
|
|
import uvicorn
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import socketio
|
|
|
|
from .core.unified_coordinator_refactored import UnifiedCoordinatorRefactored as UnifiedCoordinator
|
|
from .core.database import engine, get_db, init_database_with_retry, test_database_connection
|
|
from .models.user import Base
|
|
from .models import agent, project # Import the new agent and project models
|
|
|
|
# Global unified coordinator instance (will be initialized in lifespan)
# None until lifespan() startup completes — access via get_coordinator()
# to receive a clean 503 instead of an AttributeError on None.
unified_coordinator: "UnifiedCoordinator | None" = None
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager with proper error handling.

    Startup: initializes the database (with retry), the authentication
    tables, and the unified coordinator, then exposes the coordinator on
    ``app.state`` under all of its aliases.

    Shutdown: stops the coordinator exactly once, in ``finally``, guarded
    against the case where startup failed before the coordinator existed.
    (The original implementation called ``shutdown()`` both in the
    ``except`` branch and again in ``finally`` — a double shutdown — and
    dereferenced ``None`` when startup failed before construction.)
    """
    global unified_coordinator

    try:
        # Startup
        print("🚀 Starting Hive Orchestrator...")

        # Initialize database with retry logic
        print("📊 Initializing database...")
        init_database_with_retry()

        # Initialize auth database tables and initial data
        print("🔐 Initializing authentication system...")
        from .core.init_db import initialize_database
        initialize_database()

        # Initialize coordinator instance
        print("🔧 Initializing unified coordinator...")
        unified_coordinator = UnifiedCoordinator()

        # Test database connection before bringing the coordinator up
        if not test_database_connection():
            raise Exception("Database connection test failed")

        # Initialize unified coordinator with error handling
        print("🤖 Initializing Unified Coordinator...")
        await unified_coordinator.start()

        # Store coordinator in app state for endpoint access.
        # Refresh every alias here — the module-level assignments near the
        # bottom of this file run before the coordinator exists.
        app.state.hive_coordinator = unified_coordinator
        app.state.unified_coordinator = unified_coordinator
        app.state.distributed_coordinator = unified_coordinator

        print("✅ Hive Orchestrator with Unified Coordinator started successfully!")

        yield

    except Exception as e:
        print(f"❌ Startup failed: {e}")
        # Cleanup is handled uniformly by the finally block below.
        raise

    finally:
        # Shutdown — runs on normal exit and on startup failure alike.
        print("🛑 Shutting down Hive Orchestrator...")
        if unified_coordinator is not None:
            try:
                await unified_coordinator.shutdown()
                print("✅ Hive Orchestrator stopped")
            except Exception as e:
                print(f"❌ Shutdown error: {e}")
|
# Create FastAPI application with comprehensive OpenAPI configuration.
# Everything passed here (title, description, tags, servers) feeds the
# generated /docs and /redoc pages; the description is rendered as markdown.
app = FastAPI(
    title="Hive API",
    description="""
**Hive Unified Distributed AI Orchestration Platform**

A comprehensive platform for managing and orchestrating distributed AI agents across multiple nodes.
Supports both Ollama-based local agents and CLI-based cloud agents (like Google Gemini).

## Features

* **Multi-Agent Management**: Register and manage both Ollama and CLI-based AI agents
* **Task Orchestration**: Distribute and coordinate tasks across specialized agents
* **Workflow Engine**: Create and execute complex multi-agent workflows
* **Real-time Monitoring**: Monitor agent health, task progress, and system performance
* **Performance Analytics**: Track utilization, success rates, and performance metrics
* **Authentication**: Secure API access with JWT-based authentication

## Agent Types

* **kernel_dev**: Linux kernel development and debugging
* **pytorch_dev**: PyTorch model development and optimization
* **profiler**: Performance profiling and optimization
* **docs_writer**: Documentation generation and technical writing
* **tester**: Automated testing and quality assurance
* **cli_gemini**: Google Gemini CLI integration for advanced reasoning
* **general_ai**: General-purpose AI assistance
* **reasoning**: Complex reasoning and problem-solving tasks

## Quick Start

1. Register agents via `/api/agents` endpoint
2. Create tasks via `/api/tasks` endpoint
3. Monitor progress via `/api/status` endpoint
4. Execute workflows via `/api/workflows` endpoint

For detailed documentation, visit the [Hive Documentation](https://hive.home.deepblack.cloud/docs).
""",
    version="1.1.0",
    terms_of_service="https://hive.home.deepblack.cloud/terms",
    contact={
        "name": "Hive Development Team",
        "url": "https://hive.home.deepblack.cloud/contact",
        "email": "hive-support@deepblack.cloud",
    },
    license_info={
        "name": "MIT License",
        "url": "https://opensource.org/licenses/MIT",
    },
    # Server list drives the "Servers" dropdown in the interactive docs.
    servers=[
        {
            "url": "https://hive.home.deepblack.cloud/api",
            "description": "Production server"
        },
        {
            "url": "http://localhost:8087/api",
            "description": "Development server"
        }
    ],
    # Tag descriptions group the endpoints in the generated documentation;
    # names must match the tags= used in the include_router calls below.
    openapi_tags=[
        {
            "name": "authentication",
            "description": "User authentication and authorization operations"
        },
        {
            "name": "agents",
            "description": "Ollama agent management and registration"
        },
        {
            "name": "cli-agents",
            "description": "CLI-based agent management (Google Gemini, etc.)"
        },
        {
            "name": "tasks",
            "description": "Task creation, management, and execution"
        },
        {
            "name": "workflows",
            "description": "Multi-agent workflow orchestration"
        },
        {
            "name": "executions",
            "description": "Workflow execution tracking and results"
        },
        {
            "name": "monitoring",
            "description": "System health monitoring and metrics"
        },
        {
            "name": "projects",
            "description": "Project management and organization"
        },
        {
            "name": "cluster",
            "description": "Cluster-wide operations and coordination"
        },
        {
            "name": "distributed-workflows",
            "description": "Advanced distributed workflow management"
        },
        {
            "name": "bzzz-integration",
            "description": "Bzzz P2P task coordination system integration"
        }
    ],
    # Startup/shutdown hooks defined above.
    lifespan=lifespan
)
|
|
# CORS allow-list, configurable via the CORS_ORIGINS env var as a
# comma-separated list; the default covers local dev plus the production
# hosts for hive.home.deepblack.cloud.
_default_cors = "http://localhost:3000,http://localhost:3001,https://hive.home.deepblack.cloud,http://hive.home.deepblack.cloud"
cors_origins = os.getenv("CORS_ORIGINS", _default_cors)
allowed_origins = [origin.strip() for origin in cors_origins.split(",")]

app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
# Dependency injection for unified coordinator
def get_coordinator() -> UnifiedCoordinator:
    """Return the process-wide unified coordinator instance.

    Raises:
        HTTPException: 503 when the coordinator has not been created yet,
            i.e. before the application lifespan finished startup.
    """
    coordinator = unified_coordinator
    if coordinator is None:
        raise HTTPException(status_code=503, detail="Coordinator not initialized")
    return coordinator
|
|
# Import API routers
|
|
from .api import agents, workflows, executions, monitoring, projects, tasks, cluster, distributed_workflows, cli_agents, auth
|
|
|
|
# Import error handlers and response models
|
|
from .core.error_handlers import (
|
|
hive_exception_handler,
|
|
validation_exception_handler,
|
|
generic_exception_handler,
|
|
HiveAPIException,
|
|
create_health_response,
|
|
check_component_health
|
|
)
|
|
from .models.responses import HealthResponse, SystemStatusResponse, ErrorResponse, ComponentStatus
|
|
from fastapi.exceptions import RequestValidationError
|
|
import logging
|
|
from .docs_config import custom_openapi_schema
|
|
|
|
# Module-level logger for this application module.
logger = logging.getLogger(__name__)

# Register global exception handlers so every route returns the shared
# structured error payloads defined in .core.error_handlers. The bare
# Exception handler is the last-resort catch-all.
app.add_exception_handler(HiveAPIException, hive_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(Exception, generic_exception_handler)
|
|
|
# Include API routes. Tag names must match the openapi_tags declared on the
# FastAPI constructor above so endpoints group correctly in /docs.
app.include_router(auth.router, prefix="/api/auth", tags=["authentication"])
app.include_router(agents.router, prefix="/api", tags=["agents"])
app.include_router(workflows.router, prefix="/api", tags=["workflows"])
app.include_router(executions.router, prefix="/api", tags=["executions"])
app.include_router(monitoring.router, prefix="/api", tags=["monitoring"])
app.include_router(projects.router, prefix="/api", tags=["projects"])
app.include_router(projects.bzzz_router, prefix="/api", tags=["bzzz-integration"])
app.include_router(tasks.router, prefix="/api", tags=["tasks"])
app.include_router(cluster.router, prefix="/api", tags=["cluster"])
# These two routers are mounted without a prefix here — presumably they
# declare their own prefixes internally; verify in their modules.
app.include_router(distributed_workflows.router, tags=["distributed-workflows"])
app.include_router(cli_agents.router, tags=["cli-agents"])

# Override dependency functions in API modules with our coordinator instance.
# NOTE(review): this monkey-patches a module attribute; it only takes effect
# if those modules' routes resolve get_coordinator at call time (e.g. via
# Depends(get_coordinator) referencing the module attribute) — confirm.
agents.get_coordinator = get_coordinator
tasks.get_coordinator = get_coordinator
distributed_workflows.get_coordinator = get_coordinator
cli_agents.get_coordinator = get_coordinator
|
|
|
|
# Health Check and System Status Endpoints
@app.get(
    "/health",
    response_model=HealthResponse,
    status_code=status.HTTP_200_OK,
    summary="Simple health check",
    description="""
Basic health check endpoint for monitoring system availability.

This lightweight endpoint provides a quick health status check
without detailed component analysis. Use this for:

- Load balancer health checks
- Simple uptime monitoring
- Basic availability verification
- Quick status confirmation

For detailed system status including component health,
use the `/api/health` endpoint instead.
""",
    tags=["health"],
    responses={
        200: {"description": "System is healthy and operational"},
        503: {"model": ErrorResponse, "description": "System is unhealthy or partially unavailable"}
    }
)
async def health_check() -> HealthResponse:
    """Liveness probe reporting a static healthy status.

    Returns:
        HealthResponse: basic health status with the current API version.
    """
    probe = HealthResponse(status="healthy", version="1.1.0")
    return probe
|
|
|
|
@app.get(
    "/api/health",
    response_model=SystemStatusResponse,
    status_code=status.HTTP_200_OK,
    summary="Comprehensive system health check",
    description="""
Comprehensive health check with detailed component status information.

This endpoint performs thorough health checks on all system components:

**Checked Components:**
- Database connectivity and performance
- Coordinator service status
- Active agent health and availability
- Task queue status and capacity
- Memory and resource utilization
- External service dependencies

**Use Cases:**
- Detailed system monitoring and alerting
- Troubleshooting system issues
- Performance analysis and optimization
- Operational status dashboards
- Pre-deployment health verification

**Response Details:**
- Overall system status and version
- Component-specific health status
- Active agent status and utilization
- Task queue metrics and performance
- System uptime and performance metrics
""",
    tags=["health"],
    responses={
        200: {"description": "Detailed system health status retrieved successfully"},
        500: {"model": ErrorResponse, "description": "Health check failed due to system errors"}
    }
)
async def detailed_health_check() -> SystemStatusResponse:
    """
    Comprehensive system health check with component details.

    Returns:
        SystemStatusResponse: Detailed system and component health status

    Raises:
        HTTPException: If health check encounters critical errors
    """
    try:
        # Check database health via a live connection test
        database_health = check_component_health(
            "database",
            lambda: test_database_connection()
        )

        # Check coordinator health — only verifies the object exists and
        # exposes get_health_status, not that it is actually running
        coordinator_health = check_component_health(
            "coordinator",
            lambda: unified_coordinator is not None and hasattr(unified_coordinator, 'get_health_status')
        )

        # Get coordinator status if available; failures are folded into the
        # payload rather than failing the whole health check
        coordinator_status = {}
        if unified_coordinator:
            try:
                coordinator_status = await unified_coordinator.get_health_status()
            except Exception as e:
                coordinator_status = {"error": str(e)}

        # Build component status list.
        # NOTE(review): datetime.utcnow() yields a naive timestamp and is
        # deprecated since Python 3.12 — consider datetime.now(timezone.utc).
        components = [
            ComponentStatus(
                name="database",
                status="success" if database_health["status"] == "healthy" else "error",
                details=database_health.get("details", {}),
                last_check=datetime.utcnow()
            ),
            ComponentStatus(
                name="coordinator",
                status="success" if coordinator_health["status"] == "healthy" else "error",
                details=coordinator_health.get("details", {}),
                last_check=datetime.utcnow()
            )
        ]

        # Extract agent information — presumably "agents" maps agent ids to
        # per-agent status dicts; confirm against get_health_status()
        agents_info = coordinator_status.get("agents", {})
        total_agents = len(agents_info)
        active_tasks = coordinator_status.get("active_tasks", 0)
        pending_tasks = coordinator_status.get("pending_tasks", 0)
        completed_tasks = coordinator_status.get("completed_tasks", 0)

        # Calculate uptime (placeholder - could be enhanced with actual uptime tracking)
        uptime = coordinator_status.get("uptime", 0.0)

        return SystemStatusResponse(
            components=components,
            agents=agents_info,
            total_agents=total_agents,
            active_tasks=active_tasks,
            pending_tasks=pending_tasks,
            completed_tasks=completed_tasks,
            uptime=uptime,
            version="1.1.0",
            message="System health check completed successfully"
        )

    except Exception as e:
        # Any unexpected failure becomes a 500 with the message surfaced
        logger.error(f"Health check failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Health check failed: {str(e)}"
        )
|
|
|
|
# Install the project's customized OpenAPI schema generator in place of
# FastAPI's default one.
def get_custom_openapi():
    """Build and return the customized OpenAPI schema for this app."""
    schema = custom_openapi_schema(app)
    return schema


app.openapi = get_custom_openapi
|
|
# Socket.IO server setup (mounted over the FastAPI app further below).
# NOTE(review): cors_allowed_origins="*" is wide open, unlike the HTTP CORS
# allow-list configured above — confirm this is intentional for websockets.
sio = socketio.AsyncServer(
    async_mode='asgi',
    cors_allowed_origins="*",
    logger=True,             # socket.io protocol logging on
    engineio_logger=False    # low-level engine.io logging off (noisy)
)
|
# Socket.IO event handlers
|
|
@sio.event
async def connect(sid, environ):
    """Acknowledge a newly connected Socket.IO client."""
    print(f"🔌 Socket.IO client {sid} connected")
    greeting = {
        'status': 'connected',
        'timestamp': datetime.now().isoformat(),
        'message': 'Connected to Hive Socket.IO server',
    }
    await sio.emit('connection_confirmed', greeting, room=sid)
|
|
@sio.event
async def disconnect(sid):
    """Log that a Socket.IO client dropped its connection."""
    print(f"🔌 Socket.IO client {sid} disconnected")
|
|
@sio.event
async def join_room(sid, data):
    """Add the client to the requested room (default 'general') and confirm."""
    room = data.get('room', 'general')
    await sio.enter_room(sid, room)
    print(f"🔌 Client {sid} joined room: {room}")

    confirmation = {
        'room': room,
        'timestamp': datetime.now().isoformat(),
        'message': f'Successfully joined {room} room',
    }
    await sio.emit('room_joined', confirmation, room=sid)
|
|
@sio.event
async def leave_room(sid, data):
    """Remove the client from the requested room (default 'general') and confirm."""
    room = data.get('room', 'general')
    await sio.leave_room(sid, room)
    print(f"🔌 Client {sid} left room: {room}")

    confirmation = {
        'room': room,
        'timestamp': datetime.now().isoformat(),
        'message': f'Successfully left {room} room',
    }
    await sio.emit('room_left', confirmation, room=sid)
|
|
@sio.event
async def subscribe(sid, data):
    """Subscribe the client to a list of events within a room."""
    events = data.get('events', [])
    room = data.get('room', 'general')

    # Entering a room is idempotent, so join unconditionally.
    await sio.enter_room(sid, room)

    print(f"🔌 Client {sid} subscribed to events: {events} in room: {room}")

    ack = {
        'events': events,
        'room': room,
        'timestamp': datetime.now().isoformat(),
        'message': f'Subscribed to {len(events)} events in {room} room',
    }
    await sio.emit('subscription_confirmed', ack, room=sid)
|
|
@sio.event
async def ping(sid):
    """Answer a client heartbeat with a timestamped pong."""
    await sio.emit('pong', {'timestamp': datetime.now().isoformat()}, room=sid)
|
|
# Socket.IO connection manager
class SocketIOManager:
    """Thin async wrapper around a Socket.IO server for outbound events.

    Emit failures are printed and swallowed, so a misbehaving client
    connection never propagates an exception into the caller.
    """

    def __init__(self, socketio_server):
        # The underlying socketio.AsyncServer instance.
        self.sio = socketio_server

    async def _emit(self, event: str, data: dict, room, failure_msg: str):
        # Centralized best-effort emit shared by the public methods.
        try:
            await self.sio.emit(event, data, room=room)
        except Exception as exc:
            print(f"{failure_msg}: {exc}")

    async def send_to_room(self, room: str, event: str, data: dict):
        """Send event to all clients in a room."""
        await self._emit(event, data, room, f"Error sending to room {room}")

    async def broadcast(self, event: str, data: dict):
        """Broadcast event to all connected clients."""
        await self._emit(event, data, None, f"Error broadcasting event {event}")

    async def send_to_client(self, sid: str, event: str, data: dict):
        """Send event to a specific client."""
        await self._emit(event, data, sid, f"Error sending to client {sid}")
|
|
# Singleton manager other modules use (via app.state) to push events out.
manager = SocketIOManager(sio)

# Socket.IO integration with FastAPI:
# the socket.io ASGI wrapper is created below, after the routes.
|
|
@app.get("/")
|
|
async def root():
|
|
"""Root endpoint"""
|
|
return {
|
|
"message": "🐝 Welcome to Hive - Distributed AI Orchestration Platform",
|
|
"status": "operational",
|
|
"version": "1.0.0",
|
|
"api_docs": "/docs",
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
# Removed duplicate /health endpoint - using the enhanced one above

# NOTE(review): this path duplicates the detailed /api/health route
# registered earlier; FastAPI matches the first registration, so this
# handler is effectively dead — consider removing it. Renamed from
# `health_check` so it no longer shadows the /health handler's
# module-level name, and version aligned with the app's 1.1.0.
@app.get("/api/health", response_model=None)
async def simple_api_health_check():
    """Simple health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "version": "1.1.0",
        "message": "Hive API is operational"
    }
|
|
@app.get("/api/status")
|
|
async def get_system_status():
|
|
"""Get comprehensive system status"""
|
|
return await unified_coordinator.get_comprehensive_status()
|
|
|
|
@app.get("/api/metrics")
|
|
async def get_metrics():
|
|
"""Prometheus metrics endpoint"""
|
|
return await unified_coordinator.get_prometheus_metrics()
|
|
|
|
# Make manager and coordinator available to other modules via app.state.
# NOTE(review): at import time unified_coordinator is still None; the real
# instance is attached during lifespan startup, which refreshes the
# hive_coordinator/unified_coordinator entries but (in the original
# lifespan) NOT distributed_coordinator — confirm nothing reads that alias
# expecting a live coordinator.
app.state.socketio_manager = manager
app.state.unified_coordinator = unified_coordinator
# Backward compatibility aliases
app.state.hive_coordinator = unified_coordinator
app.state.distributed_coordinator = unified_coordinator
|
|
# Create Socket.IO ASGI app: wraps the FastAPI app so traffic on
# /socket.io is handled by the Socket.IO server and every other path
# falls through to FastAPI. This is the object uvicorn should serve.
socket_app = socketio.ASGIApp(sio, other_asgi_app=app, socketio_path='/socket.io')
|
|
if __name__ == "__main__":
|
|
uvicorn.run(
|
|
"app.main:socket_app",
|
|
host="0.0.0.0",
|
|
port=8000,
|
|
reload=True,
|
|
log_level="info"
|
|
) |