Add comprehensive frontend UI and distributed infrastructure

Frontend Enhancements:
- Complete React TypeScript frontend with modern UI components
- Distributed workflow management interface with real-time updates
- Socket.IO integration for live agent status monitoring
- Agent management dashboard with cluster visualization
- Project management interface with metrics and task tracking
- Responsive design with proper error handling and loading states

Backend Infrastructure:
- Distributed coordinator for multi-agent workflow orchestration
- Cluster management API covering node, model, workflow, metrics, and agent operations
- New database models for agents and projects
- Project service for filesystem-based project discovery
- Performance monitoring and metrics collection
- API documentation and consistent error handling

Documentation:
- Complete distributed development guide (README_DISTRIBUTED.md)
- Comprehensive development report with architecture insights
- System configuration templates and deployment guides

The platform now provides a complete web interface for managing the distributed AI cluster
with real-time monitoring, workflow orchestration, and agent coordination capabilities.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Author: anthonyrawlins
Date: 2025-07-10 08:41:59 +10:00
parent fc0eec91ef
commit 85bf1341f3
28348 changed files with 2646896 additions and 69 deletions


@@ -0,0 +1,69 @@
"""
Cluster API endpoints for monitoring cluster nodes and workflows.
"""
from fastapi import APIRouter, HTTPException
from typing import Dict, Any, List, Optional
from ..services.cluster_service import ClusterService
router = APIRouter()
cluster_service = ClusterService()
@router.get("/cluster/overview")
async def get_cluster_overview() -> Dict[str, Any]:
"""Get overview of entire cluster status."""
try:
return cluster_service.get_cluster_overview()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/cluster/nodes")
async def get_cluster_nodes() -> Dict[str, Any]:
"""Get status of all cluster nodes."""
try:
overview = cluster_service.get_cluster_overview()
return {"nodes": overview["nodes"]}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/cluster/nodes/{node_id}")
async def get_node_details(node_id: str) -> Dict[str, Any]:
"""Get detailed information about a specific node."""
try:
node_details = cluster_service.get_node_details(node_id)
if not node_details:
raise HTTPException(status_code=404, detail="Node not found")
return node_details
except HTTPException:
raise  # propagate the 404 above instead of masking it as a 500
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/cluster/models")
async def get_available_models() -> Dict[str, List[Dict[str, Any]]]:
"""Get all available models across all nodes."""
try:
return cluster_service.get_available_models()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/cluster/workflows")
async def get_n8n_workflows() -> List[Dict[str, Any]]:
"""Get n8n workflows from the cluster."""
try:
return cluster_service.get_n8n_workflows()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/cluster/metrics")
async def get_cluster_metrics() -> Dict[str, Any]:
"""Get aggregated cluster metrics."""
try:
return cluster_service.get_cluster_metrics()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/cluster/executions")
async def get_workflow_executions(limit: int = 10) -> List[Dict[str, Any]]:
"""Get recent workflow executions from n8n."""
try:
return cluster_service.get_workflow_executions(limit)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
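
Not part of the committed file: a minimal sketch of how this router might be wired into the backend application. The module path app.api.cluster, the app title, and the /api prefix are assumptions; only the route paths come from the file above.

# main.py (hypothetical) -- mounting the cluster router on the FastAPI app
from fastapi import FastAPI
from app.api.cluster import router as cluster_router  # assumed import path

app = FastAPI(title="Hive Distributed Platform")
app.include_router(cluster_router, prefix="/api")

# GET /api/cluster/overview would then return the payload built by
# ClusterService.get_cluster_overview().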


@@ -0,0 +1,661 @@
"""
Distributed Development Workflow Coordinator
Enhanced orchestration system for cluster-wide development workflows
"""
import asyncio
import time
from typing import Dict, List, Optional, Any, Set
from dataclasses import dataclass, field
from enum import Enum
import aiohttp
import redis.asyncio as redis
from prometheus_client import Counter, Histogram, Gauge
import logging
from concurrent.futures import ThreadPoolExecutor
import json
import hashlib
logger = logging.getLogger(__name__)
# Performance Metrics
TASK_COUNTER = Counter('hive_tasks_total', 'Total tasks processed', ['task_type', 'agent'])
TASK_DURATION = Histogram('hive_task_duration_seconds', 'Task execution time', ['task_type', 'agent'])
ACTIVE_TASKS = Gauge('hive_active_tasks', 'Currently active tasks', ['agent'])
AGENT_UTILIZATION = Gauge('hive_agent_utilization', 'Agent utilization percentage', ['agent'])
class TaskType(Enum):
"""Task types for specialized agent assignment"""
CODE_GENERATION = "code_generation"
CODE_REVIEW = "code_review"
TESTING = "testing"
COMPILATION = "compilation"
OPTIMIZATION = "optimization"
DOCUMENTATION = "documentation"
DEPLOYMENT = "deployment"
class TaskPriority(Enum):
"""Task priority levels"""
CRITICAL = 1
HIGH = 2
NORMAL = 3
LOW = 4
@dataclass
class Agent:
"""Enhanced agent representation with performance tracking"""
id: str
endpoint: str
model: str
gpu_type: str
specializations: List[TaskType]
max_concurrent: int = 3
current_load: int = 0
performance_score: float = 1.0
last_response_time: float = 0.0
connection_pool: Optional[aiohttp.TCPConnector] = None
health_status: str = "healthy"
def __post_init__(self):
"""Initialize connection pool for this agent"""
self.connection_pool = aiohttp.TCPConnector(
limit=10,
limit_per_host=5,
keepalive_timeout=30,
enable_cleanup_closed=True
)
@dataclass
class Task:
"""Enhanced task with distributed execution support"""
id: str
type: TaskType
priority: TaskPriority
payload: Dict[str, Any]
dependencies: List[str] = field(default_factory=list)
estimated_duration: float = 0.0
created_at: float = field(default_factory=time.time)
assigned_agent: Optional[str] = None
result: Optional[Dict[str, Any]] = None
status: str = "pending"
subtasks: List[str] = field(default_factory=list)
@property
def cache_key(self) -> str:
"""Generate cache key for task result"""
payload_hash = hashlib.md5(json.dumps(self.payload, sort_keys=True).encode()).hexdigest()
return f"task_result:{self.type.value}:{payload_hash}"
class DistributedCoordinator:
"""Enhanced coordinator for distributed development workflows"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.agents: Dict[str, Agent] = {}
self.tasks: Dict[str, Task] = {}
self.active_sessions: Dict[str, aiohttp.ClientSession] = {}
self.redis = redis.from_url(redis_url)
self.task_queue = asyncio.Queue()
self.result_cache = {}
self.executor = ThreadPoolExecutor(max_workers=20)
# Performance tracking
self.performance_history: Dict[str, List[float]] = {}
self.load_balancer = AdaptiveLoadBalancer()
# Cluster configuration based on CLUSTER_INFO.md
self._initialize_cluster_agents()
def _initialize_cluster_agents(self):
"""Initialize agents based on cluster configuration"""
cluster_config = {
"ACACIA": {
"endpoint": "http://192.168.1.72:11434",
"model": "deepseek-r1:7b",
"gpu_type": "NVIDIA GTX 1070",
"specializations": [TaskType.DEPLOYMENT, TaskType.DOCUMENTATION],
"max_concurrent": 2
},
"WALNUT": {
"endpoint": "http://192.168.1.27:11434",
"model": "starcoder2:15b",
"gpu_type": "AMD RX 9060 XT",
"specializations": [TaskType.CODE_GENERATION, TaskType.OPTIMIZATION],
"max_concurrent": 4
},
"IRONWOOD": {
"endpoint": "http://192.168.1.113:11434",
"model": "deepseek-coder-v2",
"gpu_type": "Quad-GPU (2x GTX 1070 + 2x Tesla P4)",
"specializations": [TaskType.CODE_GENERATION, TaskType.COMPILATION],
"max_concurrent": 8 # Multi-GPU capability
},
"ROSEWOOD": {
"endpoint": "http://192.168.1.132:11435",
"model": "deepseek-r1:8b",
"gpu_type": "Dual-GPU (RTX 2080 Super + RTX 3070)",
"specializations": [TaskType.TESTING, TaskType.CODE_REVIEW],
"max_concurrent": 6 # Multi-GPU capability
},
"FORSTEINET": {
"endpoint": "http://192.168.1.106:11434",
"model": "devstral",
"gpu_type": "AMD Radeon RX Vega 56/64",
"specializations": [TaskType.TESTING, TaskType.OPTIMIZATION],
"max_concurrent": 2
}
}
for agent_id, config in cluster_config.items():
self.agents[agent_id] = Agent(
id=agent_id,
endpoint=config["endpoint"],
model=config["model"],
gpu_type=config["gpu_type"],
specializations=config["specializations"],
max_concurrent=config["max_concurrent"]
)
async def start(self):
"""Start the distributed coordinator"""
logger.info("Starting Distributed Development Coordinator")
# Initialize agent sessions
for agent in self.agents.values():
self.active_sessions[agent.id] = aiohttp.ClientSession(
connector=agent.connection_pool,
timeout=aiohttp.ClientTimeout(total=120)
)
# Start background tasks
asyncio.create_task(self._task_processor())
asyncio.create_task(self._health_monitor())
asyncio.create_task(self._performance_optimizer())
logger.info(f"Coordinator started with {len(self.agents)} agents")
async def submit_workflow(self, workflow: Dict[str, Any]) -> str:
"""Submit a complete development workflow for distributed execution"""
workflow_id = f"workflow_{int(time.time())}"
# Parse workflow into tasks
tasks = self._parse_workflow_to_tasks(workflow, workflow_id)
# Add tasks to queue with dependency resolution
await self._schedule_workflow_tasks(tasks)
logger.info(f"Submitted workflow {workflow_id} with {len(tasks)} tasks")
return workflow_id
def _parse_workflow_to_tasks(self, workflow: Dict[str, Any], workflow_id: str) -> List[Task]:
"""Parse a workflow definition into executable tasks"""
tasks = []
# Standard development workflow tasks
base_tasks = [
{
"type": TaskType.CODE_GENERATION,
"priority": TaskPriority.HIGH,
"payload": {
"workflow_id": workflow_id,
"requirements": workflow.get("requirements", ""),
"context": workflow.get("context", ""),
"target_language": workflow.get("language", "python")
}
},
{
"type": TaskType.CODE_REVIEW,
"priority": TaskPriority.HIGH,
"payload": {
"workflow_id": workflow_id,
"review_criteria": workflow.get("review_criteria", [])
},
"dependencies": [f"{workflow_id}_code_generation"]
},
{
"type": TaskType.TESTING,
"priority": TaskPriority.NORMAL,
"payload": {
"workflow_id": workflow_id,
"test_types": workflow.get("test_types", ["unit", "integration"])
},
"dependencies": [f"{workflow_id}_code_review"]
},
{
"type": TaskType.COMPILATION,
"priority": TaskPriority.HIGH,
"payload": {
"workflow_id": workflow_id,
"build_config": workflow.get("build_config", {})
},
"dependencies": [f"{workflow_id}_testing"]
},
{
"type": TaskType.OPTIMIZATION,
"priority": TaskPriority.NORMAL,
"payload": {
"workflow_id": workflow_id,
"optimization_targets": workflow.get("optimization_targets", ["performance", "memory"])
},
"dependencies": [f"{workflow_id}_compilation"]
}
]
for i, task_def in enumerate(base_tasks):
task = Task(
id=f"{workflow_id}_{task_def['type'].value}",
type=task_def["type"],
priority=task_def["priority"],
payload=task_def["payload"],
dependencies=task_def.get("dependencies", [])
)
tasks.append(task)
return tasks
async def _schedule_workflow_tasks(self, tasks: List[Task]):
"""Schedule tasks with dependency resolution"""
for task in tasks:
self.tasks[task.id] = task
# Check if dependencies are met
if await self._dependencies_satisfied(task):
await self.task_queue.put(task)
async def _dependencies_satisfied(self, task: Task) -> bool:
"""Check if all task dependencies are satisfied"""
for dep_id in task.dependencies:
if dep_id not in self.tasks or self.tasks[dep_id].status != "completed":
return False
return True
async def _task_processor(self):
"""Main task processing loop with distributed execution"""
while True:
try:
# Get tasks with concurrent processing
tasks_batch = []
for _ in range(min(10, self.task_queue.qsize())):
if not self.task_queue.empty():
task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
tasks_batch.append(task)
if tasks_batch:
await self._execute_tasks_batch(tasks_batch)
await asyncio.sleep(0.1)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"Error in task processor: {e}")
await asyncio.sleep(1)
async def _execute_tasks_batch(self, tasks: List[Task]):
"""Execute a batch of tasks concurrently across available agents"""
execution_futures = []
for task in tasks:
# Check cache first
cached_result = await self._get_cached_result(task)
if cached_result:
task.result = cached_result
task.status = "completed"
await self._handle_task_completion(task)
continue
# Select optimal agent
agent = await self._select_optimal_agent(task)
if agent and agent.current_load < agent.max_concurrent:
future = asyncio.create_task(self._execute_task(task, agent))
execution_futures.append(future)
else:
# Re-queue if no agent available
await self.task_queue.put(task)
# Wait for batch completion
if execution_futures:
await asyncio.gather(*execution_futures, return_exceptions=True)
async def _select_optimal_agent(self, task: Task) -> Optional[Agent]:
"""Select the optimal agent for a task using performance-based load balancing"""
suitable_agents = [
agent for agent in self.agents.values()
if task.type in agent.specializations and agent.health_status == "healthy"
]
if not suitable_agents:
# Fallback to any available agent
suitable_agents = [
agent for agent in self.agents.values()
if agent.health_status == "healthy"
]
if not suitable_agents:
return None
# Select based on performance score and current load
best_agent = min(
suitable_agents,
key=lambda a: (a.current_load / a.max_concurrent) - (a.performance_score * 0.1)
)
return best_agent
async def _execute_task(self, task: Task, agent: Agent):
"""Execute a single task on the selected agent"""
task.assigned_agent = agent.id
task.status = "executing"
agent.current_load += 1
start_time = time.time()
ACTIVE_TASKS.labels(agent=agent.id).inc()
try:
session = self.active_sessions[agent.id]
# Prepare payload for agent
agent_payload = {
"model": agent.model,
"prompt": self._build_task_prompt(task),
"stream": False,
"options": {
"temperature": 0.1,
"top_p": 0.9,
"num_predict": 4000
}
}
# Execute task
async with session.post(
f"{agent.endpoint}/api/generate",
json=agent_payload,
timeout=aiohttp.ClientTimeout(total=300)
) as response:
if response.status == 200:
result = await response.json()
task.result = result
task.status = "completed"
# Cache result
await self._cache_result(task, result)
# Update performance metrics
execution_time = time.time() - start_time
agent.last_response_time = execution_time
self._update_agent_performance(agent.id, execution_time)
TASK_COUNTER.labels(task_type=task.type.value, agent=agent.id).inc()
TASK_DURATION.labels(task_type=task.type.value, agent=agent.id).observe(execution_time)
else:
task.status = "failed"
task.result = {"error": f"HTTP {response.status}"}
except Exception as e:
task.status = "failed"
task.result = {"error": str(e)}
logger.error(f"Task execution failed: {e}")
finally:
agent.current_load -= 1
ACTIVE_TASKS.labels(agent=agent.id).dec()
await self._handle_task_completion(task)
def _build_task_prompt(self, task: Task) -> str:
"""Build optimized prompt for task execution"""
base_prompts = {
TaskType.CODE_GENERATION: """
You are an expert software developer. Generate high-quality, production-ready code based on the requirements.
Requirements: {requirements}
Context: {context}
Target Language: {target_language}
Please provide:
1. Clean, well-documented code
2. Error handling
3. Performance considerations
4. Test examples
Code:
""",
TaskType.CODE_REVIEW: """
You are a senior code reviewer. Analyze the provided code for quality, security, and performance issues.
Please review for:
1. Code quality and maintainability
2. Security vulnerabilities
3. Performance bottlenecks
4. Best practices compliance
5. Documentation completeness
Provide specific feedback and improvement suggestions.
Code Review:
""",
TaskType.TESTING: """
You are a testing specialist. Create comprehensive tests for the provided code.
Test Types Required: {test_types}
Please provide:
1. Unit tests with edge cases
2. Integration tests
3. Performance tests
4. Test documentation
Tests:
""",
TaskType.COMPILATION: """
You are a build and deployment specialist. Analyze the code and provide compilation/build instructions.
Build Configuration: {build_config}
Please provide:
1. Build scripts
2. Dependency management
3. Optimization flags
4. Deployment configuration
Build Instructions:
""",
TaskType.OPTIMIZATION: """
You are a performance optimization expert. Analyze and optimize the provided code.
Optimization Targets: {optimization_targets}
Please provide:
1. Performance analysis
2. Bottleneck identification
3. Optimization recommendations
4. Benchmarking strategies
Optimization Report:
"""
}
prompt_template = base_prompts.get(task.type)
if prompt_template is None:
# generic fallback; calling format(**payload) here would raise KeyError on the {payload} placeholder
return f"Complete the following task: {json.dumps(task.payload)}"
return prompt_template.format(**task.payload)
async def _get_cached_result(self, task: Task) -> Optional[Dict[str, Any]]:
"""Get cached result for task if available"""
try:
cached = await self.redis.get(task.cache_key)
if cached:
return json.loads(cached)
except Exception as e:
logger.warning(f"Cache retrieval failed: {e}")
return None
async def _cache_result(self, task: Task, result: Dict[str, Any]):
"""Cache task result for future use"""
try:
await self.redis.setex(
task.cache_key,
3600, # 1 hour TTL
json.dumps(result)
)
except Exception as e:
logger.warning(f"Cache storage failed: {e}")
async def _handle_task_completion(self, task: Task):
"""Handle task completion and trigger dependent tasks"""
if task.status == "completed":
# Check for dependent tasks
dependent_tasks = [
t for t in self.tasks.values()
if task.id in t.dependencies and t.status == "pending"
]
for dep_task in dependent_tasks:
if await self._dependencies_satisfied(dep_task):
await self.task_queue.put(dep_task)
def _update_agent_performance(self, agent_id: str, execution_time: float):
"""Update agent performance metrics"""
if agent_id not in self.performance_history:
self.performance_history[agent_id] = []
self.performance_history[agent_id].append(execution_time)
# Keep only last 100 measurements
if len(self.performance_history[agent_id]) > 100:
self.performance_history[agent_id] = self.performance_history[agent_id][-100:]
# Update performance score (lower execution time = higher score)
avg_time = sum(self.performance_history[agent_id]) / len(self.performance_history[agent_id])
self.agents[agent_id].performance_score = max(0.1, 1.0 / (avg_time + 1.0))
async def _health_monitor(self):
"""Monitor agent health and availability"""
while True:
try:
health_checks = []
for agent in self.agents.values():
health_checks.append(self._check_agent_health(agent))
await asyncio.gather(*health_checks, return_exceptions=True)
await asyncio.sleep(30) # Check every 30 seconds
except Exception as e:
logger.error(f"Health monitor error: {e}")
await asyncio.sleep(30)
async def _check_agent_health(self, agent: Agent):
"""Check individual agent health"""
try:
session = self.active_sessions[agent.id]
async with session.get(f"{agent.endpoint}/api/tags", timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
agent.health_status = "healthy"
else:
agent.health_status = "unhealthy"
except Exception:
agent.health_status = "unreachable"
# Update utilization metric
utilization = (agent.current_load / agent.max_concurrent) * 100
AGENT_UTILIZATION.labels(agent=agent.id).set(utilization)
async def _performance_optimizer(self):
"""Background performance optimization"""
while True:
try:
await self._optimize_agent_parameters()
await self._cleanup_completed_tasks()
await asyncio.sleep(300) # Optimize every 5 minutes
except Exception as e:
logger.error(f"Performance optimizer error: {e}")
await asyncio.sleep(300)
async def _optimize_agent_parameters(self):
"""Dynamically optimize agent parameters based on performance"""
for agent in self.agents.values():
if agent.id in self.performance_history:
avg_response_time = sum(self.performance_history[agent.id]) / len(self.performance_history[agent.id])
# Adjust max_concurrent based on performance
if avg_response_time < 10: # Fast responses
agent.max_concurrent = min(agent.max_concurrent + 1, 10)
elif avg_response_time > 30: # Slow responses
agent.max_concurrent = max(agent.max_concurrent - 1, 1)
async def _cleanup_completed_tasks(self):
"""Clean up old completed tasks"""
cutoff_time = time.time() - 3600 # Keep tasks for 1 hour
tasks_to_remove = [
task_id for task_id, task in self.tasks.items()
if task.status in ["completed", "failed"] and task.created_at < cutoff_time
]
for task_id in tasks_to_remove:
del self.tasks[task_id]
async def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
"""Get comprehensive workflow status"""
workflow_tasks = [
task for task in self.tasks.values()
if task.payload.get("workflow_id") == workflow_id
]
if not workflow_tasks:
return {"error": "Workflow not found"}
total_tasks = len(workflow_tasks)
completed_tasks = sum(1 for task in workflow_tasks if task.status == "completed")
failed_tasks = sum(1 for task in workflow_tasks if task.status == "failed")
return {
"workflow_id": workflow_id,
"total_tasks": total_tasks,
"completed_tasks": completed_tasks,
"failed_tasks": failed_tasks,
"progress": (completed_tasks / total_tasks) * 100 if total_tasks > 0 else 0,
"status": "completed" if completed_tasks == total_tasks else "in_progress",
"tasks": [
{
"id": task.id,
"type": task.type.value,
"status": task.status,
"assigned_agent": task.assigned_agent,
"execution_time": time.time() - task.created_at
}
for task in workflow_tasks
]
}
async def stop(self):
"""Clean shutdown of coordinator"""
logger.info("Shutting down Distributed Coordinator")
# Close all sessions
for session in self.active_sessions.values():
await session.close()
# Close Redis connection
await self.redis.close()
logger.info("Coordinator shutdown complete")
class AdaptiveLoadBalancer:
"""Adaptive load balancer for optimal task distribution"""
def __init__(self):
self.agent_weights = {}
self.learning_rate = 0.1
def update_weight(self, agent_id: str, performance_metric: float):
"""Update agent weight based on performance"""
if agent_id not in self.agent_weights:
self.agent_weights[agent_id] = 1.0
# Exponential moving average
self.agent_weights[agent_id] = (
(1 - self.learning_rate) * self.agent_weights[agent_id] +
self.learning_rate * performance_metric
)
def get_weight(self, agent_id: str) -> float:
"""Get current weight for agent"""
return self.agent_weights.get(agent_id, 1.0)
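
Not part of the committed file: a usage sketch for the coordinator above, showing the intended start / submit / poll / stop cycle. The workflow keys mirror those read in _parse_workflow_to_tasks; the Redis URL, workflow payload values, and polling interval are assumptions.

import asyncio

async def run_example_workflow():
    coordinator = DistributedCoordinator(redis_url="redis://localhost:6379")
    await coordinator.start()

    workflow_id = await coordinator.submit_workflow({
        "requirements": "Add a /health endpoint to the backend",
        "context": "FastAPI service behind the Hive cluster",
        "language": "python",
        "test_types": ["unit", "integration"],
    })

    # Poll until all five generated tasks finish (or any of them fails).
    while True:
        status = await coordinator.get_workflow_status(workflow_id)
        if status.get("status") == "completed" or status.get("failed_tasks"):
            break
        await asyncio.sleep(5)

    await coordinator.stop()

asyncio.run(run_example_workflow())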


@@ -0,0 +1,2 @@
from . import agent
from . import project


@@ -0,0 +1,15 @@
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from ..core.database import Base
class Agent(Base):
__tablename__ = "agents"
id = Column(String, primary_key=True, index=True)
endpoint = Column(String, nullable=False)
model = Column(String, nullable=False)
specialty = Column(String, nullable=False)
max_concurrent = Column(Integer, default=2)
current_tasks = Column(Integer, default=0)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
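
Not part of the committed file: a sketch of seeding an agent row with this model. The import paths and the SessionLocal session factory are assumptions; the field values are taken from the WALNUT entry in the coordinator's cluster config.

from app.core.database import SessionLocal  # assumed session factory
from app.models.agent import Agent

db = SessionLocal()
db.add(Agent(
    id="WALNUT",
    endpoint="http://192.168.1.27:11434",
    model="starcoder2:15b",
    specialty="code_generation",
    max_concurrent=4,
))
db.commit()
db.close()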


@@ -0,0 +1,25 @@
from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.sql import func
from ..core.database import Base
class Project(Base):
__tablename__ = "projects"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, unique=True, index=True, nullable=False)
description = Column(Text, nullable=True)
status = Column(String, default="active") # e.g., active, completed, archived
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# You might also need Pydantic models for request/response validation
# from pydantic import BaseModel
# class ProjectCreate(BaseModel):
# name: str
# description: str | None = None
# class ProjectMetrics(BaseModel):
# total_tasks: int
# completed_tasks: int
# # Add other metrics as needed
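
Not part of the committed file: one concrete shape the hinted Pydantic schemas could take. This is a sketch; the read model and its field set are assumptions layered on the ORM columns above.

from datetime import datetime
from typing import Optional
from pydantic import BaseModel

class ProjectCreate(BaseModel):
    name: str
    description: Optional[str] = None

class ProjectRead(BaseModel):
    id: int
    name: str
    description: Optional[str] = None
    status: str
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    model_config = {"from_attributes": True}  # pydantic v2; use orm_mode = True on v1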


@@ -0,0 +1,379 @@
"""
Cluster Service for monitoring cluster nodes and their capabilities.
"""
import requests
import json
from typing import Dict, List, Optional, Any
from datetime import datetime
import subprocess
import psutil
import platform
class ClusterService:
def __init__(self):
self.cluster_nodes = {
"walnut": {
"ip": "192.168.1.27",
"hostname": "walnut",
"role": "manager",
"gpu": "AMD RX 9060 XT",
"memory": "64GB",
"cpu": "AMD Ryzen 7 5800X3D",
"ollama_port": 11434,
"cockpit_port": 9090
},
"ironwood": {
"ip": "192.168.1.113",
"hostname": "ironwood",
"role": "worker",
"gpu": "NVIDIA RTX 3070",
"memory": "128GB",
"cpu": "AMD Threadripper 2920X",
"ollama_port": 11434,
"cockpit_port": 9090
},
"acacia": {
"ip": "192.168.1.72",
"hostname": "acacia",
"role": "worker",
"gpu": "NVIDIA GTX 1070",
"memory": "128GB",
"cpu": "Intel Xeon E5-2680 v4",
"ollama_port": 11434,
"cockpit_port": 9090
},
"forsteinet": {
"ip": "192.168.1.106",
"hostname": "forsteinet",
"role": "worker",
"gpu": "AMD RX Vega 56/64",
"memory": "32GB",
"cpu": "Intel Core i7-4770",
"ollama_port": 11434,
"cockpit_port": 9090
}
}
self.n8n_api_base = "https://n8n.home.deepblack.cloud/api/v1"
self.n8n_api_key = self._get_n8n_api_key()
def _get_n8n_api_key(self) -> Optional[str]:
"""Get n8n API key from secrets."""
try:
from pathlib import Path
api_key_path = Path("/home/tony/AI/secrets/passwords_and_tokens/n8n-api-key")
if api_key_path.exists():
return api_key_path.read_text().strip()
except Exception:
pass
return "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI4NTE3ODg3Yy0zYTI4LTRmMmEtOTA3Ni05NzBkNmFkMWE4MjEiLCJpc3MiOiJuOG4iLCJhdWQiOiJwdWJsaWMtYXBpIiwiaWF0IjoxNzUwMzMzMDI2fQ.NST0HBBjk0_DbQTO9QT17VYU-kZ5XBHoIM5HTt2sbkM"
def get_cluster_overview(self) -> Dict[str, Any]:
"""Get overview of entire cluster status."""
nodes = []
total_models = 0
active_nodes = 0
for node_id, node_info in self.cluster_nodes.items():
node_status = self._get_node_status(node_id)
nodes.append(node_status)
if node_status["status"] == "online":
active_nodes += 1
total_models += node_status["model_count"]
return {
"cluster_name": "deepblackcloud",
"total_nodes": len(self.cluster_nodes),
"active_nodes": active_nodes,
"total_models": total_models,
"nodes": nodes,
"last_updated": datetime.now().isoformat()
}
def _get_node_status(self, node_id: str) -> Dict[str, Any]:
"""Get detailed status for a specific node."""
node_info = self.cluster_nodes.get(node_id)
if not node_info:
return {"error": "Node not found"}
# Check if node is reachable
status = "offline"
models = []
model_count = 0
try:
# Check Ollama API
response = requests.get(
f"http://{node_info['ip']}:{node_info['ollama_port']}/api/tags",
timeout=5
)
if response.status_code == 200:
status = "online"
models_data = response.json()
models = models_data.get("models", [])
model_count = len(models)
except Exception:
pass
# Get system metrics if this is the local node
cpu_percent = None
memory_percent = None
disk_usage = None
if node_info["hostname"] == platform.node():
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
memory_percent = memory.percent
disk = psutil.disk_usage('/')
disk_usage = {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": (disk.used / disk.total) * 100
}
except Exception:
pass
return {
"id": node_id,
"hostname": node_info["hostname"],
"ip": node_info["ip"],
"status": status,
"role": node_info["role"],
"hardware": {
"cpu": node_info["cpu"],
"memory": node_info["memory"],
"gpu": node_info["gpu"]
},
"model_count": model_count,
"models": [{"name": m["name"], "size": m.get("size", 0)} for m in models],
"metrics": {
"cpu_percent": cpu_percent,
"memory_percent": memory_percent,
"disk_usage": disk_usage
},
"services": {
"ollama": f"http://{node_info['ip']}:{node_info['ollama_port']}",
"cockpit": f"https://{node_info['ip']}:{node_info['cockpit_port']}"
},
"last_check": datetime.now().isoformat()
}
def get_node_details(self, node_id: str) -> Optional[Dict[str, Any]]:
"""Get detailed information about a specific node."""
return self._get_node_status(node_id)
def get_available_models(self) -> Dict[str, List[Dict[str, Any]]]:
"""Get all available models across all nodes."""
models_by_node = {}
for node_id, node_info in self.cluster_nodes.items():
try:
response = requests.get(
f"http://{node_info['ip']}:{node_info['ollama_port']}/api/tags",
timeout=5
)
if response.status_code == 200:
models_data = response.json()
models = models_data.get("models", [])
models_by_node[node_id] = [
{
"name": m["name"],
"size": m.get("size", 0),
"modified": m.get("modified_at", ""),
"node": node_id,
"node_ip": node_info["ip"]
}
for m in models
]
else:
models_by_node[node_id] = []
except Exception:
models_by_node[node_id] = []
return models_by_node
def get_n8n_workflows(self) -> List[Dict[str, Any]]:
"""Get n8n workflows from the cluster."""
workflows = []
if not self.n8n_api_key:
return workflows
try:
headers = {
"X-N8N-API-KEY": self.n8n_api_key,
"Content-Type": "application/json"
}
response = requests.get(
f"{self.n8n_api_base}/workflows",
headers=headers,
timeout=10
)
if response.status_code == 200:
workflows_data = response.json()
# Process workflows to extract relevant information
# handle both a bare list and a {"data": [...]} envelope
workflow_list = workflows_data.get("data", workflows_data) if isinstance(workflows_data, dict) else workflows_data
for workflow in workflow_list:
workflows.append({
"id": workflow.get("id"),
"name": workflow.get("name"),
"active": workflow.get("active", False),
"created_at": workflow.get("createdAt"),
"updated_at": workflow.get("updatedAt"),
"tags": workflow.get("tags", []),
"node_count": len(workflow.get("nodes", [])),
"webhook_url": self._extract_webhook_url(workflow),
"description": self._extract_workflow_description(workflow)
})
except Exception as e:
print(f"Error fetching n8n workflows: {e}")
return workflows
def _extract_webhook_url(self, workflow: Dict) -> Optional[str]:
"""Extract webhook URL from workflow if it exists."""
nodes = workflow.get("nodes", [])
for node in nodes:
if node.get("type") == "n8n-nodes-base.webhook":
webhook_path = node.get("parameters", {}).get("path", "")
if webhook_path:
return f"https://n8n.home.deepblack.cloud/webhook/{webhook_path}"
return None
def _extract_workflow_description(self, workflow: Dict) -> str:
"""Extract workflow description from notes or nodes."""
# Try to get description from workflow notes
notes = workflow.get("notes", "")
if notes:
return notes[:200] + "..." if len(notes) > 200 else notes
# Try to infer from node types
nodes = workflow.get("nodes", [])
node_types = [node.get("type", "").split(".")[-1] for node in nodes]
if "webhook" in node_types:
return "Webhook-triggered workflow"
elif "ollama" in [nt.lower() for nt in node_types]:
return "AI model workflow"
else:
return f"Workflow with {len(nodes)} nodes"
def get_cluster_metrics(self) -> Dict[str, Any]:
"""Get aggregated cluster metrics."""
total_models = 0
active_nodes = 0
total_memory = 0
total_cpu_cores = 0
# Hardware specifications from CLUSTER_INFO.md
hardware_specs = {
"walnut": {"memory_gb": 64, "cpu_cores": 8},
"ironwood": {"memory_gb": 128, "cpu_cores": 12},
"acacia": {"memory_gb": 128, "cpu_cores": 56},
"forsteinet": {"memory_gb": 32, "cpu_cores": 4}
}
for node_id, node_info in self.cluster_nodes.items():
node_status = self._get_node_status(node_id)
if node_status["status"] == "online":
active_nodes += 1
total_models += node_status["model_count"]
# Add hardware specs
specs = hardware_specs.get(node_id, {})
total_memory += specs.get("memory_gb", 0)
total_cpu_cores += specs.get("cpu_cores", 0)
return {
"cluster_health": {
"total_nodes": len(self.cluster_nodes),
"active_nodes": active_nodes,
"offline_nodes": len(self.cluster_nodes) - active_nodes,
"health_percentage": (active_nodes / len(self.cluster_nodes)) * 100
},
"compute_resources": {
"total_memory_gb": total_memory,
"total_cpu_cores": total_cpu_cores,
"total_models": total_models
},
"services": {
"ollama_endpoints": active_nodes,
"n8n_workflows": len(self.get_n8n_workflows()),
"docker_swarm_active": self._check_docker_swarm()
},
"last_updated": datetime.now().isoformat()
}
def _check_docker_swarm(self) -> bool:
"""Check if Docker Swarm is active."""
try:
result = subprocess.run(
["docker", "info", "--format", "{{.Swarm.LocalNodeState}}"],
capture_output=True,
text=True,
timeout=5
)
return result.stdout.strip() == "active"
except Exception:
return False
def get_workflow_executions(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get recent workflow executions from n8n."""
executions = []
if not self.n8n_api_key:
return executions
try:
headers = {
"X-N8N-API-KEY": self.n8n_api_key,
"Content-Type": "application/json"
}
response = requests.get(
f"{self.n8n_api_base}/executions",
headers=headers,
params={"limit": limit},
timeout=10
)
if response.status_code == 200:
executions_data = response.json()
# handle both a bare list and a {"data": [...]} envelope
execution_list = executions_data.get("data", executions_data) if isinstance(executions_data, dict) else executions_data
for execution in execution_list:
executions.append({
"id": execution.get("id"),
"workflow_id": execution.get("workflowId"),
"mode": execution.get("mode"),
"status": execution.get("finished") and "success" or "running",
"started_at": execution.get("startedAt"),
"finished_at": execution.get("stoppedAt"),
"duration": self._calculate_duration(
execution.get("startedAt"),
execution.get("stoppedAt")
)
})
except Exception as e:
print(f"Error fetching workflow executions: {e}")
return executions
def _calculate_duration(self, start_time: str, end_time: str) -> Optional[int]:
"""Calculate duration between start and end times in seconds."""
if not start_time or not end_time:
return None
try:
start = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
end = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
return int((end - start).total_seconds())
except Exception:
return None
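
Not part of the committed file: a quick usage sketch for ClusterService, assuming the Ollama endpoints listed above are reachable from the backend host.

service = ClusterService()
overview = service.get_cluster_overview()
print(f"{overview['active_nodes']}/{overview['total_nodes']} nodes online, "
      f"{overview['total_models']} models loaded")
for node in overview["nodes"]:
    print(f"  {node['hostname']:12} {node['status']:8} models={node['model_count']}")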


@@ -0,0 +1,437 @@
"""
Project Service for integrating with local project directories and GitHub.
"""
import os
import json
import re
from pathlib import Path
from typing import List, Dict, Optional, Any
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
import markdown
from app.models.project import Project
class ProjectService:
def __init__(self):
self.projects_base_path = Path("/home/tony/AI/projects")
self.github_token = self._get_github_token()
self.github_api_base = "https://api.github.com"
def _get_github_token(self) -> Optional[str]:
"""Get GitHub token from secrets file."""
try:
# Try GitHub token first
github_token_path = Path("/home/tony/AI/secrets/passwords_and_tokens/github-token")
if github_token_path.exists():
return github_token_path.read_text().strip()
# Fallback to GitLab token if GitHub token doesn't exist
gitlab_token_path = Path("/home/tony/AI/secrets/passwords_and_tokens/claude-gitlab-token")
if gitlab_token_path.exists():
return gitlab_token_path.read_text().strip()
except Exception:
pass
return None
def get_all_projects(self) -> List[Dict[str, Any]]:
"""Get all projects from the local filesystem."""
projects = []
if not self.projects_base_path.exists():
return projects
for project_dir in self.projects_base_path.iterdir():
if project_dir.is_dir() and not project_dir.name.startswith('.'):
project_data = self._analyze_project_directory(project_dir)
if project_data:
projects.append(project_data)
# Sort by last modified date
projects.sort(key=lambda x: x.get('updated_at', ''), reverse=True)
return projects
def get_project_by_id(self, project_id: str) -> Optional[Dict[str, Any]]:
"""Get a specific project by ID (directory name)."""
project_path = self.projects_base_path / project_id
if not project_path.exists() or not project_path.is_dir():
return None
return self._analyze_project_directory(project_path)
def _analyze_project_directory(self, project_path: Path) -> Optional[Dict[str, Any]]:
"""Analyze a project directory and extract metadata."""
try:
project_id = project_path.name
# Skip if this is the hive project itself
if project_id == 'hive':
return None
# Get basic file info
stat = project_path.stat()
created_at = datetime.fromtimestamp(stat.st_ctime).isoformat()
updated_at = datetime.fromtimestamp(stat.st_mtime).isoformat()
# Read PROJECT_PLAN.md if it exists
project_plan_path = project_path / "PROJECT_PLAN.md"
project_plan_content = ""
description = ""
if project_plan_path.exists():
project_plan_content = project_plan_path.read_text(encoding='utf-8')
description = self._extract_description_from_plan(project_plan_content)
# Read TODOS.md if it exists
todos_path = project_path / "TODOS.md"
todos_content = ""
if todos_path.exists():
todos_content = todos_path.read_text(encoding='utf-8')
# Check for GitHub repository
git_config_path = project_path / ".git" / "config"
github_repo = None
if git_config_path.exists():
github_repo = self._extract_github_repo(git_config_path)
# Determine project status
status = self._determine_project_status(project_path, todos_content)
# Extract tags from content
tags = self._extract_tags(project_plan_content, project_path)
# Get workflow count (look for workflow-related files)
workflow_count = self._count_workflows(project_path)
# Build project data
project_data = {
"id": project_id,
"name": self._format_project_name(project_id),
"description": description or f"Project in {project_id}",
"status": status,
"created_at": created_at,
"updated_at": updated_at,
"tags": tags,
"github_repo": github_repo,
"workflow_count": workflow_count,
"has_project_plan": project_plan_path.exists(),
"has_todos": todos_path.exists(),
"file_count": len(list(project_path.rglob("*"))),
"metadata": {
"project_plan_path": str(project_plan_path) if project_plan_path.exists() else None,
"todos_path": str(todos_path) if todos_path.exists() else None,
"directory_size": self._get_directory_size(project_path)
}
}
return project_data
except Exception as e:
print(f"Error analyzing project directory {project_path}: {e}")
return None
def _extract_description_from_plan(self, content: str) -> str:
"""Extract description from PROJECT_PLAN.md content."""
lines = content.split('\n')
description_lines = []
in_description = False
for line in lines:
line = line.strip()
if not line:
continue
# Look for overview, description, or objective sections
if re.match(r'^#+\s*(overview|description|objective|project\s+description)', line, re.IGNORECASE):
in_description = True
continue
elif line.startswith('#') and in_description:
break
elif in_description and not line.startswith('#'):
description_lines.append(line)
if len(description_lines) >= 2: # Limit to first 2 lines
break
description = ' '.join(description_lines).strip()
# If no description found, try to get from the beginning
if not description:
for line in lines:
line = line.strip()
if line and not line.startswith('#') and not line.startswith('```'):
description = line
break
return description[:200] + "..." if len(description) > 200 else description
def _extract_github_repo(self, git_config_path: Path) -> Optional[str]:
"""Extract GitHub repository URL from git config."""
try:
config_content = git_config_path.read_text()
# Look for GitHub remote URL
for line in config_content.split('\n'):
if 'github.com' in line and ('url =' in line or 'url=' in line):
url = line.split('=', 1)[1].strip()
# Extract repo name from URL
if 'github.com/' in url:
repo_part = url.split('github.com/')[-1]
if repo_part.endswith('.git'):
repo_part = repo_part[:-4]
return repo_part
except Exception:
pass
return None
def _determine_project_status(self, project_path: Path, todos_content: str) -> str:
"""Determine project status based on various indicators."""
# Check for recent activity (files modified in last 30 days)
recent_activity = False
thirty_days_ago = datetime.now().timestamp() - (30 * 24 * 60 * 60)
try:
for file_path in project_path.rglob("*"):
if file_path.is_file() and file_path.stat().st_mtime > thirty_days_ago:
recent_activity = True
break
except Exception:
pass
# Check TODOS for status indicators
if todos_content:
content_lower = todos_content.lower()
if any(keyword in content_lower for keyword in ['completed', 'done', 'finished']):
if not recent_activity:
return "archived"
if any(keyword in content_lower for keyword in ['in progress', 'active', 'working']):
return "active"
# Check for deployment files
deployment_files = ['Dockerfile', 'docker-compose.yml', 'deploy.sh', 'package.json']
has_deployment = any((project_path / f).exists() for f in deployment_files)
if recent_activity:
return "active"
elif has_deployment:
return "inactive"
else:
return "draft"
def _extract_tags(self, content: str, project_path: Path) -> List[str]:
"""Extract tags based on content and file analysis."""
tags = []
if content:
content_lower = content.lower()
# Technology tags
tech_tags = {
'python': ['python', '.py'],
'javascript': ['javascript', 'js', 'node'],
'typescript': ['typescript', 'ts'],
'react': ['react', 'jsx'],
'docker': ['docker', 'dockerfile'],
'ai': ['ai', 'ml', 'machine learning', 'neural', 'model'],
'web': ['web', 'frontend', 'backend', 'api'],
'automation': ['automation', 'workflow', 'n8n'],
'infrastructure': ['infrastructure', 'deployment', 'devops'],
'mobile': ['mobile', 'ios', 'android', 'swift'],
'data': ['data', 'database', 'sql', 'analytics'],
'security': ['security', 'auth', 'authentication']
}
for tag, keywords in tech_tags.items():
if any(keyword in content_lower for keyword in keywords):
tags.append(tag)
# File-based tags
files = list(project_path.rglob("*"))
file_extensions = [f.suffix.lower() for f in files if f.is_file()]
if '.py' in file_extensions:
tags.append('python')
if '.js' in file_extensions or '.ts' in file_extensions:
tags.append('javascript')
if any(f.name == 'Dockerfile' for f in files):
tags.append('docker')
if any(f.name == 'package.json' for f in files):
tags.append('node')
return list(set(tags)) # Remove duplicates
def _count_workflows(self, project_path: Path) -> int:
"""Count workflow-related files in the project."""
workflow_patterns = [
'*.yml', '*.yaml', # GitHub Actions, Docker Compose
'*.json', # n8n workflows, package.json
'workflow*', 'Workflow*',
'*workflow*'
]
count = 0
for pattern in workflow_patterns:
count += len(list(project_path.rglob(pattern)))
return min(count, 20) # Cap at reasonable number
def _format_project_name(self, project_id: str) -> str:
"""Format project directory name into a readable project name."""
# Convert kebab-case and snake_case to Title Case
name = project_id.replace('-', ' ').replace('_', ' ')
return ' '.join(word.capitalize() for word in name.split())
def _get_directory_size(self, path: Path) -> int:
"""Get total size of directory in bytes."""
total_size = 0
try:
for file_path in path.rglob("*"):
if file_path.is_file():
total_size += file_path.stat().st_size
except Exception:
pass
return total_size
def get_project_metrics(self, project_id: str) -> Optional[Dict[str, Any]]:
"""Get detailed metrics for a project."""
project_path = self.projects_base_path / project_id
if not project_path.exists():
return None
# Get GitHub issues count if repo exists
github_repo = None
git_config_path = project_path / ".git" / "config"
if git_config_path.exists():
github_repo = self._extract_github_repo(git_config_path)
github_issues = 0
github_open_issues = 0
if github_repo and self.github_token:
try:
issues_data = self._get_github_issues(github_repo)
github_issues = len(issues_data)
github_open_issues = len([i for i in issues_data if i['state'] == 'open'])
except Exception:
pass
# Count workflows
workflow_count = self._count_workflows(project_path)
# Analyze TODO file
todos_path = project_path / "TODOS.md"
completed_tasks = 0
total_tasks = 0
if todos_path.exists():
todos_content = todos_path.read_text()
# Count checkboxes
total_tasks = len(re.findall(r'- \[[ x]\]', todos_content))
completed_tasks = len(re.findall(r'- \[x\]', todos_content))
# Get last activity
last_activity = None
try:
latest_file = None
latest_time = 0
for file_path in project_path.rglob("*"):
if file_path.is_file():
mtime = file_path.stat().st_mtime
if mtime > latest_time:
latest_time = mtime
latest_file = file_path
if latest_file:
last_activity = datetime.fromtimestamp(latest_time).isoformat()
except Exception:
pass
return {
"total_workflows": workflow_count,
"active_workflows": max(0, workflow_count - 1) if workflow_count > 0 else 0,
"total_tasks": total_tasks,
"completed_tasks": completed_tasks,
"github_issues": github_issues,
"github_open_issues": github_open_issues,
"task_completion_rate": completed_tasks / total_tasks if total_tasks > 0 else 0,
"last_activity": last_activity
}
def _get_github_issues(self, repo: str) -> List[Dict]:
"""Fetch GitHub issues for a repository."""
if not self.github_token:
return []
try:
url = f"{self.github_api_base}/repos/{repo}/issues"
headers = {
"Authorization": f"token {self.github_token}",
"Accept": "application/vnd.github.v3+json"
}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return response.json()
except Exception as e:
print(f"Error fetching GitHub issues for {repo}: {e}")
return []
def get_project_tasks(self, project_id: str) -> List[Dict[str, Any]]:
"""Get tasks for a project (from GitHub issues and TODOS.md)."""
tasks = []
# Get GitHub issues
project_path = self.projects_base_path / project_id
git_config_path = project_path / ".git" / "config"
if git_config_path.exists():
github_repo = self._extract_github_repo(git_config_path)
if github_repo:
github_issues = self._get_github_issues(github_repo)
for issue in github_issues:
tasks.append({
"id": f"gh-{issue['number']}",
"title": issue['title'],
"description": issue.get('body', ''),
"status": "open" if issue['state'] == 'open' else "closed",
"type": "github_issue",
"created_at": issue['created_at'],
"updated_at": issue['updated_at'],
"url": issue['html_url'],
"labels": [label['name'] for label in issue.get('labels', [])]
})
# Get TODOS from TODOS.md
todos_path = project_path / "TODOS.md"
if todos_path.exists():
todos_content = todos_path.read_text()
todo_items = self._parse_todos_markdown(todos_content)
tasks.extend(todo_items)
return tasks
def _parse_todos_markdown(self, content: str) -> List[Dict[str, Any]]:
"""Parse TODOS.md content into structured tasks."""
tasks = []
lines = content.split('\n')
for i, line in enumerate(lines):
line = line.strip()
# Look for checkbox items
checkbox_match = re.match(r'- \[([x ])\]\s*(.+)', line)
if checkbox_match:
is_completed = checkbox_match.group(1) == 'x'
task_text = checkbox_match.group(2)
tasks.append({
"id": f"todo-{i}",
"title": task_text,
"description": "",
"status": "completed" if is_completed else "open",
"type": "todo",
"created_at": None,
"updated_at": None,
"url": None,
"labels": []
})
return tasks
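
Not part of the committed file: a usage sketch for ProjectService, assuming project directories live under /home/tony/AI/projects as coded above.

service = ProjectService()
for project in service.get_all_projects():
    metrics = service.get_project_metrics(project["id"]) or {}
    done = metrics.get("completed_tasks", 0)
    total = metrics.get("total_tasks", 0)
    print(f"{project['name']:30} {project['status']:8} tasks {done}/{total}")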