Implement single domain architecture for Hive platform

- Replace separate hive-api.home.deepblack.cloud subdomain with unified hive.home.deepblack.cloud
- Update Traefik routing: /api/* → backend, /* → frontend, with the API router given higher priority so API paths are not caught by the frontend catch-all
- Add /api/health endpoint while maintaining /health for Docker health checks (probe sketch below)
- Update Socket.IO configuration to use the single domain (client sketch below)
- Fix CORS settings for consolidated domain
- Update MCP server endpoint to use /api path prefix
- Update all documentation to reflect single domain architecture

System now fully operational with simplified routing and proper SSL certificates.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
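
The health-check split leaves /health for Docker's HEALTHCHECK and exposes /api/health through Traefik on the unified domain. A minimal probe sketch, assuming hive.home.deepblack.cloud resolves from your machine and the `requests` package is installed (neither is part of this commit):

# Sketch: probe both health endpoints on the unified domain.
# Assumes https://hive.home.deepblack.cloud is reachable and `requests` is installed.
import requests

BASE = "https://hive.home.deepblack.cloud"

# Internal-style liveness check (also used by the Docker HEALTHCHECK).
liveness = requests.get(f"{BASE}/health", timeout=5)
print(liveness.status_code, liveness.json())  # e.g. 200 {"status": "healthy", ...}

# Public API health check; returns 503 with component details when degraded.
api_health = requests.get(f"{BASE}/api/health", timeout=5)
print(api_health.status_code, api_health.json())

The /api/health route raises a 503 with a component breakdown when the database or coordinator check fails, so the status code alone is enough for external monitors.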
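
Because the Socket.IO server is now mounted on the same domain via socketio.ASGIApp(..., socketio_path='/socket.io'), clients connect to hive.home.deepblack.cloud directly instead of the old API subdomain. A minimal client sketch using the `python-socketio` async client; the "monitoring" room name is illustrative, and it assumes the Traefik rules also forward /socket.io traffic to the backend alongside /api:

# Sketch: connect a python-socketio async client through the unified domain.
# Assumes the `python-socketio` package is installed and that Traefik forwards
# /socket.io traffic to the backend; the "monitoring" room is illustrative.
import asyncio
import socketio


async def main():
    sio = socketio.AsyncClient()

    @sio.on("connection_confirmed")
    async def on_confirmed(data):
        print("connected:", data)
        # Join a room via the join_room handler added in this commit.
        await sio.emit("join_room", {"room": "monitoring"})

    @sio.on("room_joined")
    async def on_room_joined(data):
        print("joined:", data)

    # Single domain: no more hive-api.home.deepblack.cloud for realtime traffic.
    await sio.connect("https://hive.home.deepblack.cloud",
                      socketio_path="/socket.io")
    await sio.wait()


asyncio.run(main())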
Author: anthonyrawlins
Date: 2025-07-09 21:52:03 +10:00
Parent: dba1eac6b1
Commit: 8c3adf6d8f
6 changed files with 298 additions and 190 deletions


@@ -1,4 +1,4 @@
-from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException
+from fastapi import FastAPI, Depends, HTTPException
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.staticfiles import StaticFiles
 from contextlib import asynccontextmanager
@@ -7,36 +7,75 @@ import asyncio
 import uvicorn
 from datetime import datetime
 from pathlib import Path
+import socketio
-from .core.hive_coordinator import AIDevCoordinator as HiveCoordinator
-from .core.database import engine, get_db
+from .core.hive_coordinator import HiveCoordinator
+from .core.distributed_coordinator import DistributedCoordinator
+from .core.database import engine, get_db, init_database_with_retry, test_database_connection
 from .core.auth import get_current_user
-from .api import agents, workflows, executions, monitoring, projects, tasks
+from .api import agents, workflows, executions, monitoring, projects, tasks, cluster, distributed_workflows
+# from .mcp.distributed_mcp_server import get_mcp_server
 from .models.user import Base
 from .models import agent, project  # Import the new agent and project models

-# Global coordinator instance
+# Global coordinator instances
 hive_coordinator = HiveCoordinator()
+distributed_coordinator = DistributedCoordinator()

 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    """Application lifespan manager"""
-    # Startup
-    print("🚀 Starting Hive Orchestrator...")
+    """Enhanced application lifespan manager with proper error handling"""
+    startup_success = False
-    # Create database tables
-    Base.metadata.create_all(bind=engine)
+    try:
+        # Startup
+        print("🚀 Starting Hive Orchestrator...")
+
+        # Initialize database with retry logic
+        print("📊 Initializing database...")
+        init_database_with_retry()
+
+        # Test database connection
+        if not test_database_connection():
+            raise Exception("Database connection test failed")
+
+        # Initialize coordinators with error handling
+        print("🤖 Initializing AI coordinator...")
+        await hive_coordinator.initialize()
+
+        print("🌐 Initializing distributed coordinator...")
+        await distributed_coordinator.start()
+
+        # Initialize MCP server
+        # print("🔌 Initializing MCP server...")
+        # mcp_server = get_mcp_server()
+        # await mcp_server.initialize(distributed_coordinator)
+
+        startup_success = True
+        print("✅ Hive Orchestrator with distributed workflows started successfully!")
+
+        yield
+
+    except Exception as e:
+        print(f"❌ Startup failed: {e}")
+        if startup_success:
+            # If we got past startup, try to shutdown cleanly
+            try:
+                await hive_coordinator.shutdown()
+                await distributed_coordinator.stop()
+            except Exception as shutdown_error:
+                print(f"Shutdown error during startup failure: {shutdown_error}")
+        raise
-    # Initialize coordinator
-    await hive_coordinator.initialize()
-    print("✅ Hive Orchestrator started successfully!")
-    yield
-    # Shutdown
-    print("🛑 Shutting down Hive Orchestrator...")
-    await hive_coordinator.shutdown()
-    print("✅ Hive Orchestrator stopped")
+    finally:
+        # Shutdown
+        print("🛑 Shutting down Hive Orchestrator...")
+        try:
+            await hive_coordinator.shutdown()
+            await distributed_coordinator.stop()
+            print("✅ Hive Orchestrator stopped")
+        except Exception as e:
+            print(f" Shutdown error: {e}")
# Create FastAPI application
app = FastAPI(
@@ -46,10 +85,15 @@ app = FastAPI(
     lifespan=lifespan
 )

-# Enable CORS
+# Enhanced CORS configuration for production
 app.add_middleware(
     CORSMiddleware,
-    allow_origins=["http://localhost:3000", "http://localhost:3001"],
+    allow_origins=[
+        "http://localhost:3000",
+        "http://localhost:3001",
+        "https://hive.home.deepblack.cloud",
+        "http://hive.home.deepblack.cloud"
+    ],
     allow_credentials=True,
     allow_methods=["*"],
     allow_headers=["*"],
@@ -62,107 +106,117 @@ app.include_router(executions.router, prefix="/api", tags=["executions"])
 app.include_router(monitoring.router, prefix="/api", tags=["monitoring"])
 app.include_router(projects.router, prefix="/api", tags=["projects"])
 app.include_router(tasks.router, prefix="/api", tags=["tasks"])
+app.include_router(cluster.router, prefix="/api", tags=["cluster"])
+app.include_router(distributed_workflows.router, tags=["distributed-workflows"])

 # Set coordinator reference in tasks module
 tasks.set_coordinator(hive_coordinator)

-# WebSocket connection manager
-class ConnectionManager:
-    def __init__(self):
-        self.active_connections: dict[str, list[WebSocket]] = {}
-        self.execution_connections: dict[str, list[WebSocket]] = {}
+# Socket.IO server setup
+sio = socketio.AsyncServer(
+    async_mode='asgi',
+    cors_allowed_origins="*",
+    logger=True,
+    engineio_logger=False
+)
-    async def connect(self, websocket: WebSocket, topic: str = "general"):
-        await websocket.accept()
-        if topic not in self.active_connections:
-            self.active_connections[topic] = []
-        self.active_connections[topic].append(websocket)

+# Socket.IO event handlers
+@sio.event
+async def connect(sid, environ):
+    """Handle client connection"""
+    print(f"🔌 Socket.IO client {sid} connected")
+    await sio.emit('connection_confirmed', {
+        'status': 'connected',
+        'timestamp': datetime.now().isoformat(),
+        'message': 'Connected to Hive Socket.IO server'
+    }, room=sid)

-    def disconnect(self, websocket: WebSocket, topic: str = "general"):
-        if topic in self.active_connections:
-            if websocket in self.active_connections[topic]:
-                self.active_connections[topic].remove(websocket)
-                if not self.active_connections[topic]:
-                    del self.active_connections[topic]

+@sio.event
+async def disconnect(sid):
+    """Handle client disconnection"""
+    print(f"🔌 Socket.IO client {sid} disconnected")

-    async def send_to_topic(self, topic: str, message: dict):
-        """Send message to all clients subscribed to a topic"""
-        if topic in self.active_connections:
-            disconnected = []
-            for connection in self.active_connections[topic]:
-                try:
-                    await connection.send_text(json.dumps(message))
-                except:
-                    disconnected.append(connection)
-            # Clean up disconnected connections
-            for conn in disconnected:
-                self.active_connections[topic].remove(conn)

-    async def broadcast(self, message: dict):
-        """Broadcast message to all connected clients"""
-        for connections in self.active_connections.values():
-            disconnected = []
-            for connection in connections:
-                try:
-                    await connection.send_text(json.dumps(message))
-                except:
-                    disconnected.append(connection)
-            # Clean up disconnected connections
-            for conn in disconnected:
-                connections.remove(conn)

-manager = ConnectionManager()

-@app.websocket("/ws/{topic}")
-async def websocket_endpoint(websocket: WebSocket, topic: str):
-    """WebSocket endpoint for real-time updates"""
-    await manager.connect(websocket, topic)
+@sio.event
+async def join_room(sid, data):
+    """Handle client joining a room/topic"""
+    room = data.get('room', 'general')
+    await sio.enter_room(sid, room)
+    print(f"🔌 Client {sid} joined room: {room}")

-    try:
-        # Send initial connection confirmation
-        await websocket.send_text(json.dumps({
-            "type": "connection",
-            "topic": topic,
-            "status": "connected",
-            "timestamp": datetime.now().isoformat(),
-            "message": f"Connected to {topic} updates"
-        }))
-        # Keep connection alive and handle client messages
-        while True:
-            try:
-                # Wait for messages from client
-                data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
-                # Handle client messages (ping, subscription updates, etc.)
-                try:
-                    client_message = json.loads(data)
-                    if client_message.get("type") == "ping":
-                        await websocket.send_text(json.dumps({
-                            "type": "pong",
-                            "timestamp": datetime.now().isoformat()
-                        }))
-                except json.JSONDecodeError:
-                    pass
-            except asyncio.TimeoutError:
-                # Send periodic heartbeat
-                await websocket.send_text(json.dumps({
-                    "type": "heartbeat",
-                    "topic": topic,
-                    "timestamp": datetime.now().isoformat()
-                }))
-            except:
-                break
-    except WebSocketDisconnect:
-        manager.disconnect(websocket, topic)
-    except Exception as e:
-        print(f"WebSocket error for topic {topic}: {e}")
-        manager.disconnect(websocket, topic)

+    await sio.emit('room_joined', {
+        'room': room,
+        'timestamp': datetime.now().isoformat(),
+        'message': f'Successfully joined {room} room'
+    }, room=sid)
+@sio.event
+async def leave_room(sid, data):
+    """Handle client leaving a room/topic"""
+    room = data.get('room', 'general')
+    await sio.leave_room(sid, room)
+    print(f"🔌 Client {sid} left room: {room}")
+    await sio.emit('room_left', {
+        'room': room,
+        'timestamp': datetime.now().isoformat(),
+        'message': f'Successfully left {room} room'
+    }, room=sid)

+@sio.event
+async def subscribe(sid, data):
+    """Handle event subscription"""
+    events = data.get('events', [])
+    room = data.get('room', 'general')
+    # Join the room if not already joined
+    await sio.enter_room(sid, room)
+    print(f"🔌 Client {sid} subscribed to events: {events} in room: {room}")
+    await sio.emit('subscription_confirmed', {
+        'events': events,
+        'room': room,
+        'timestamp': datetime.now().isoformat(),
+        'message': f'Subscribed to {len(events)} events in {room} room'
+    }, room=sid)

+@sio.event
+async def ping(sid):
+    """Handle ping from client"""
+    await sio.emit('pong', {
+        'timestamp': datetime.now().isoformat()
+    }, room=sid)

+# Socket.IO connection manager
+class SocketIOManager:
+    def __init__(self, socketio_server):
+        self.sio = socketio_server
+
+    async def send_to_room(self, room: str, event: str, data: dict):
+        """Send event to all clients in a room"""
+        try:
+            await self.sio.emit(event, data, room=room)
+        except Exception as e:
+            print(f"Error sending to room {room}: {e}")
+
+    async def broadcast(self, event: str, data: dict):
+        """Broadcast event to all connected clients"""
+        try:
+            await self.sio.emit(event, data)
+        except Exception as e:
+            print(f"Error broadcasting event {event}: {e}")
+
+    async def send_to_client(self, sid: str, event: str, data: dict):
+        """Send event to a specific client"""
+        try:
+            await self.sio.emit(event, data, room=sid)
+        except Exception as e:
+            print(f"Error sending to client {sid}: {e}")

+manager = SocketIOManager(sio)

+# Socket.IO integration with FastAPI
+# The socket.io server is integrated below in the app creation
@app.get("/")
async def root():
@@ -176,25 +230,50 @@ async def root():
     }

 @app.get("/health")
+async def health_check_internal():
+    """Internal health check endpoint for Docker and monitoring"""
+    return {"status": "healthy", "timestamp": datetime.now().isoformat()}
+
+@app.get("/api/health")
 async def health_check():
-    """Health check endpoint"""
-    try:
-        # Check coordinator health
-        coordinator_status = await hive_coordinator.get_health_status()
-        return {
-            "status": "healthy",
-            "timestamp": datetime.now().isoformat(),
-            "version": "1.0.0",
-            "components": {
-                "api": "operational",
-                "coordinator": coordinator_status.get("status", "unknown"),
-                "database": "operational",
-                "agents": coordinator_status.get("agents", {})
-            }
+    """Enhanced health check endpoint with comprehensive status"""
+    health_status = {
+        "status": "healthy",
+        "timestamp": datetime.now().isoformat(),
+        "version": "1.0.0",
+        "components": {
+            "api": "operational",
+            "database": "unknown",
+            "coordinator": "unknown",
+            "agents": {}
         }
+    }
+
+    # Test database connection
+    try:
+        if test_database_connection():
+            health_status["components"]["database"] = "operational"
+        else:
+            health_status["components"]["database"] = "unhealthy"
+            health_status["status"] = "degraded"
     except Exception as e:
-        raise HTTPException(status_code=503, detail=f"Service unhealthy: {str(e)}")
+        health_status["components"]["database"] = f"error: {str(e)}"
+        health_status["status"] = "degraded"
+
+    # Test coordinator health
+    try:
+        coordinator_status = await hive_coordinator.get_health_status()
+        health_status["components"]["coordinator"] = coordinator_status.get("status", "unknown")
+        health_status["components"]["agents"] = coordinator_status.get("agents", {})
+    except Exception as e:
+        health_status["components"]["coordinator"] = f"error: {str(e)}"
+        health_status["status"] = "degraded"
+
+    # Return appropriate status code
+    if health_status["status"] == "degraded":
+        raise HTTPException(status_code=503, detail=health_status)
+
+    return health_status

 @app.get("/api/status")
 async def get_system_status():
@@ -206,13 +285,17 @@ async def get_metrics():
"""Prometheus metrics endpoint"""
return await hive_coordinator.get_prometheus_metrics()
# Make manager available to other modules
app.state.websocket_manager = manager
# Make manager and coordinators available to other modules
app.state.socketio_manager = manager
app.state.hive_coordinator = hive_coordinator
app.state.distributed_coordinator = distributed_coordinator
# Create Socket.IO ASGI app
socket_app = socketio.ASGIApp(sio, other_asgi_app=app, socketio_path='/socket.io')
if __name__ == "__main__":
uvicorn.run(
"app.main:app",
"app.main:socket_app",
host="0.0.0.0",
port=8000,
reload=True,