Files
hive/backend/app/services/cluster_service.py
anthonyrawlins b6bff318d9 WIP: Save current work before CHORUS rebrand
- Agent roles integration progress
- Various backend and frontend updates
- Storybook cache cleanup

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-01 02:20:56 +10:00

457 lines
18 KiB
Python

"""
Cluster Service for monitoring cluster nodes and their capabilities.
"""
import requests
import json
from typing import Dict, List, Optional, Any
from datetime import datetime
import subprocess
import psutil
import platform
class ClusterService:
    """Monitors cluster nodes, their Ollama models, and n8n workflows."""

    def __init__(self):
        """Build the static node inventory and load the n8n API key.

        Each node entry records its address, swarm role, static hardware
        specs (used as a fallback when live SSH probing fails), and the
        ports for its Ollama and Cockpit services.
        """
        # Every node exposes Ollama and Cockpit on the same ports.
        common_ports = {"ollama_port": 11434, "cockpit_port": 9090}
        node_specs = [
            # (name, ip, role, gpu, memory, cpu)
            ("walnut", "192.168.1.27", "manager", "AMD RX 9060 XT", "64GB", "AMD Ryzen 7 5800X3D"),
            ("ironwood", "192.168.1.113", "worker", "NVIDIA RTX 2080S", "128GB", "AMD Threadripper 2920X"),
            ("acacia", "192.168.1.72", "worker", "NVIDIA GTX 1070", "128GB", "Intel Xeon E5-2680 v4"),
            ("forsteinet", "192.168.1.106", "worker", "AMD RX Vega 56/64", "32GB", "Intel Core i7-4770"),
        ]
        self.cluster_nodes = {
            name: {
                "ip": ip,
                "hostname": name,
                "role": role,
                "gpu": gpu,
                "memory": memory,
                "cpu": cpu,
                **common_ports,
            }
            for name, ip, role, gpu, memory, cpu in node_specs
        }
        self.n8n_api_base = "https://n8n.home.deepblack.cloud/api/v1"
        self.n8n_api_key = self._get_n8n_api_key()
def _get_live_hardware_info(self, hostname: str, ip: str) -> Dict[str, str]:
    """Probe a remote node over SSH for live CPU, memory, and GPU info.

    Args:
        hostname: Node name, used only for log messages.
        ip: Address to SSH into (as user "tony").

    Returns:
        Dict with keys "cpu", "memory", "gpu". Any probe that fails
        leaves its value as "Unknown" so callers can fall back to the
        static specs configured in __init__.

    Fixes over the previous version: the lspci-parsing branch for NVIDIA
    cards was unreachable (any line containing "NVIDIA" was consumed by
    the nvidia-smi branch first), and the NVIDIA vendor prefix was glued
    to the model name without a space.
    """
    hardware = {"cpu": "Unknown", "memory": "Unknown", "gpu": "Unknown"}
    ssh_base = [
        "ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=5",
        f"tony@{ip}",
    ]
    try:
        # GPU: prefer nvidia-smi; fall back to lspci for non-NVIDIA cards.
        print(f"🔍 SSH GPU command for {hostname}: ssh tony@{ip} 'nvidia-smi || lspci | grep -i vga'")
        gpu_result = subprocess.run(
            ssh_base + ["nvidia-smi --query-gpu=name --format=csv,noheader,nounits || lspci | grep -i 'vga\\|3d\\|display'"],
            capture_output=True, text=True, timeout=10)
        print(f"📊 GPU command result for {hostname}: returncode={gpu_result.returncode}, stdout='{gpu_result.stdout.strip()}', stderr='{gpu_result.stderr.strip()}'")
        if gpu_result.returncode == 0 and gpu_result.stdout.strip():
            # Only the first line matters (first GPU / first lspci match).
            gpu_name = self._parse_gpu_line(gpu_result.stdout.strip().split('\n')[0])
            if gpu_name:
                hardware["gpu"] = gpu_name
        # Memory and CPU use the same SSH transport, one command each.
        probes = {
            "memory": "free -h | grep '^Mem:' | awk '{print $2}'",
            "cpu": "lscpu | grep 'Model name:' | cut -d':' -f2- | xargs",
        }
        for key, command in probes.items():
            result = subprocess.run(
                ssh_base + [command],
                capture_output=True, text=True, timeout=10)
            if result.returncode == 0 and result.stdout.strip():
                hardware[key] = result.stdout.strip()
    except Exception as e:
        # Best-effort probe: report and return whatever was collected.
        print(f"Error getting live hardware info for {hostname}: {e}")
    return hardware

@staticmethod
def _parse_gpu_line(line: str) -> Optional[str]:
    """Extract a GPU model name from nvidia-smi or lspci output.

    Returns None when no name can be extracted from *line*.
    """
    if "VGA" in line or "Display" in line or "3D" in line:
        # lspci output, e.g.
        # "01:00.0 VGA compatible controller: NVIDIA Corporation ... [GeForce GTX 1070]"
        if "NVIDIA" in line:
            tail = line.split("NVIDIA", 1)[1]
            return ("NVIDIA" + tail.split('[')[0].rstrip()).strip()
        if "AMD" in line or "Radeon" in line:
            parts = line.split(":")
            if len(parts) > 2:
                return parts[2].strip()
        return None
    # nvidia-smi --query-gpu=name prints the bare model name.
    stripped = line.strip()
    return stripped or None
def _get_n8n_api_key(self) -> Optional[str]:
"""Get n8n API key from secrets."""
try:
from pathlib import Path
api_key_path = Path("/home/tony/AI/secrets/passwords_and_tokens/n8n-api-key")
if api_key_path.exists():
return api_key_path.read_text().strip()
except Exception:
pass
return "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI4NTE3ODg3Yy0zYTI4LTRmMmEtOTA3Ni05NzBkNmFkMWE4MjEiLCJpc3MiOiJuOG4iLCJhdWQiOiJwdWJsaWMtYXBpIiwiaWF0IjoxNzUwMzMzMDI2fQ.NST0HBBjk0_DbQTO9QT17VYU-kZ5XBHoIM5HTt2sbkM"
def get_cluster_overview(self) -> Dict[str, Any]:
"""Get overview of entire cluster status."""
nodes = []
total_models = 0
active_nodes = 0
for node_id, node_info in self.cluster_nodes.items():
node_status = self._get_node_status(node_id)
nodes.append(node_status)
if node_status["status"] == "online":
active_nodes += 1
total_models += node_status["model_count"]
return {
"cluster_name": "deepblackcloud",
"total_nodes": len(self.cluster_nodes),
"active_nodes": active_nodes,
"total_models": total_models,
"nodes": nodes,
"last_updated": datetime.now().isoformat()
}
def _get_node_status(self, node_id: str) -> Dict[str, Any]:
    """Get detailed status for a specific node.

    Probes the node's Ollama HTTP API to decide online/offline and count
    models, gathers psutil system metrics (only when the node is the
    machine this service runs on), and — for online nodes — tries to
    replace the static hardware specs with live values read over SSH.

    Args:
        node_id: Key into self.cluster_nodes.

    Returns:
        Status dict (id, hostname, ip, status, role, hardware,
        model_count, models, metrics, services, last_check), or
        {"error": "Node not found"} for an unknown node_id.
    """
    node_info = self.cluster_nodes.get(node_id)
    if not node_info:
        return {"error": "Node not found"}
    # Check if node is reachable
    status = "offline"
    models = []
    model_count = 0
    try:
        # Check Ollama API — a 200 from /api/tags marks the node online
        # and its payload lists the installed models.
        response = requests.get(
            f"http://{node_info['ip']}:{node_info['ollama_port']}/api/tags",
            timeout=5
        )
        if response.status_code == 200:
            status = "online"
            models_data = response.json()
            models = models_data.get("models", [])
            model_count = len(models)
    except Exception:
        # Unreachable or erroring node is simply reported as offline.
        pass
    # Get system metrics if this is the local node
    # (psutil can only inspect the machine this process runs on).
    cpu_percent = None
    memory_percent = None
    disk_usage = None
    if node_info["hostname"] == platform.node():
        try:
            # interval=1 blocks for one second to get a real CPU sample.
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            memory_percent = memory.percent
            disk = psutil.disk_usage('/')
            disk_usage = {
                "total": disk.total,
                "used": disk.used,
                "free": disk.free,
                "percent": (disk.used / disk.total) * 100
            }
        except Exception:
            # Metrics are best-effort; leave them as None on failure.
            pass
    # Try to get live hardware info if node is online.
    # Start from the static specs configured in __init__ as the fallback.
    hardware_info = {
        "cpu": node_info["cpu"],
        "memory": node_info["memory"],
        "gpu": node_info["gpu"]
    }
    if status == "online":
        try:
            print(f"🔍 Getting live hardware info for {node_id} ({node_info['ip']})")
            live_hardware = self._get_live_hardware_info(node_info["hostname"], node_info["ip"])
            print(f"📊 Live hardware detected for {node_id}: {live_hardware}")
            # Use live data if available, fallback to hardcoded values
            # ("Unknown" is the sentinel _get_live_hardware_info returns
            # for any probe that failed).
            for key in ["cpu", "memory", "gpu"]:
                if live_hardware[key] != "Unknown":
                    print(f"✅ Using live {key} for {node_id}: {live_hardware[key]}")
                    hardware_info[key] = live_hardware[key]
                else:
                    print(f"⚠️ Using fallback {key} for {node_id}: {hardware_info[key]}")
        except Exception as e:
            print(f"❌ Failed to get live hardware info for {node_id}: {e}")
    return {
        "id": node_id,
        "hostname": node_info["hostname"],
        "ip": node_info["ip"],
        "status": status,
        "role": node_info["role"],
        "hardware": hardware_info,
        "model_count": model_count,
        "models": [{"name": m["name"], "size": m.get("size", 0)} for m in models],
        "metrics": {
            "cpu_percent": cpu_percent,
            "memory_percent": memory_percent,
            "disk_usage": disk_usage
        },
        "services": {
            # Ollama is plain HTTP; Cockpit is served over HTTPS.
            "ollama": f"http://{node_info['ip']}:{node_info['ollama_port']}",
            "cockpit": f"https://{node_info['ip']}:{node_info['cockpit_port']}"
        },
        "last_check": datetime.now().isoformat()
    }
def get_node_details(self, node_id: str) -> Optional[Dict[str, Any]]:
"""Get detailed information about a specific node."""
return self._get_node_status(node_id)
def get_available_models(self) -> Dict[str, List[Dict[str, Any]]]:
"""Get all available models across all nodes."""
models_by_node = {}
for node_id, node_info in self.cluster_nodes.items():
try:
response = requests.get(
f"http://{node_info['ip']}:{node_info['ollama_port']}/api/tags",
timeout=5
)
if response.status_code == 200:
models_data = response.json()
models = models_data.get("models", [])
models_by_node[node_id] = [
{
"name": m["name"],
"size": m.get("size", 0),
"modified": m.get("modified_at", ""),
"node": node_id,
"node_ip": node_info["ip"]
}
for m in models
]
else:
models_by_node[node_id] = []
except Exception:
models_by_node[node_id] = []
return models_by_node
def get_n8n_workflows(self) -> List[Dict[str, Any]]:
"""Get n8n workflows from the cluster."""
workflows = []
if not self.n8n_api_key:
return workflows
try:
headers = {
"X-N8N-API-KEY": self.n8n_api_key,
"Content-Type": "application/json"
}
response = requests.get(
f"{self.n8n_api_base}/workflows",
headers=headers,
timeout=10
)
if response.status_code == 200:
workflows_data = response.json()
# Process workflows to extract relevant information
for workflow in workflows_data:
workflows.append({
"id": workflow.get("id"),
"name": workflow.get("name"),
"active": workflow.get("active", False),
"created_at": workflow.get("createdAt"),
"updated_at": workflow.get("updatedAt"),
"tags": workflow.get("tags", []),
"node_count": len(workflow.get("nodes", [])),
"webhook_url": self._extract_webhook_url(workflow),
"description": self._extract_workflow_description(workflow)
})
except Exception as e:
print(f"Error fetching n8n workflows: {e}")
return workflows
def _extract_webhook_url(self, workflow: Dict) -> Optional[str]:
"""Extract webhook URL from workflow if it exists."""
nodes = workflow.get("nodes", [])
for node in nodes:
if node.get("type") == "n8n-nodes-base.webhook":
webhook_path = node.get("parameters", {}).get("path", "")
if webhook_path:
return f"https://n8n.home.deepblack.cloud/webhook/{webhook_path}"
return None
def _extract_workflow_description(self, workflow: Dict) -> str:
"""Extract workflow description from notes or nodes."""
# Try to get description from workflow notes
notes = workflow.get("notes", "")
if notes:
return notes[:200] + "..." if len(notes) > 200 else notes
# Try to infer from node types
nodes = workflow.get("nodes", [])
node_types = [node.get("type", "").split(".")[-1] for node in nodes]
if "webhook" in node_types:
return "Webhook-triggered workflow"
elif "ollama" in [nt.lower() for nt in node_types]:
return "AI model workflow"
else:
return f"Workflow with {len(nodes)} nodes"
def get_cluster_metrics(self) -> Dict[str, Any]:
"""Get aggregated cluster metrics."""
total_models = 0
active_nodes = 0
total_memory = 0
total_cpu_cores = 0
# Hardware specifications from CLUSTER_INFO.md
hardware_specs = {
"walnut": {"memory_gb": 64, "cpu_cores": 8},
"ironwood": {"memory_gb": 128, "cpu_cores": 12},
"acacia": {"memory_gb": 128, "cpu_cores": 56},
"forsteinet": {"memory_gb": 32, "cpu_cores": 4}
}
for node_id, node_info in self.cluster_nodes.items():
node_status = self._get_node_status(node_id)
if node_status["status"] == "online":
active_nodes += 1
total_models += node_status["model_count"]
# Add hardware specs
specs = hardware_specs.get(node_id, {})
total_memory += specs.get("memory_gb", 0)
total_cpu_cores += specs.get("cpu_cores", 0)
return {
"cluster_health": {
"total_nodes": len(self.cluster_nodes),
"active_nodes": active_nodes,
"offline_nodes": len(self.cluster_nodes) - active_nodes,
"health_percentage": (active_nodes / len(self.cluster_nodes)) * 100
},
"compute_resources": {
"total_memory_gb": total_memory,
"total_cpu_cores": total_cpu_cores,
"total_models": total_models
},
"services": {
"ollama_endpoints": active_nodes,
"n8n_workflows": len(self.get_n8n_workflows()),
"docker_swarm_active": self._check_docker_swarm()
},
"last_updated": datetime.now().isoformat()
}
def _check_docker_swarm(self) -> bool:
"""Check if Docker Swarm is active."""
try:
result = subprocess.run(
["docker", "info", "--format", "{{.Swarm.LocalNodeState}}"],
capture_output=True,
text=True,
timeout=5
)
return result.stdout.strip() == "active"
except Exception:
return False
def get_workflow_executions(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get recent workflow executions from n8n."""
executions = []
if not self.n8n_api_key:
return executions
try:
headers = {
"X-N8N-API-KEY": self.n8n_api_key,
"Content-Type": "application/json"
}
response = requests.get(
f"{self.n8n_api_base}/executions",
headers=headers,
params={"limit": limit},
timeout=10
)
if response.status_code == 200:
executions_data = response.json()
for execution in executions_data:
executions.append({
"id": execution.get("id"),
"workflow_id": execution.get("workflowId"),
"mode": execution.get("mode"),
"status": execution.get("finished") and "success" or "running",
"started_at": execution.get("startedAt"),
"finished_at": execution.get("stoppedAt"),
"duration": self._calculate_duration(
execution.get("startedAt"),
execution.get("stoppedAt")
)
})
except Exception as e:
print(f"Error fetching workflow executions: {e}")
return executions
def _calculate_duration(self, start_time: str, end_time: str) -> Optional[int]:
"""Calculate duration between start and end times in seconds."""
if not start_time or not end_time:
return None
try:
start = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
end = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
return int((end - start).total_seconds())
except Exception:
return None