Files
hive/backend/app/core/workflow_engine.py
anthonyrawlins d7ad321176 Initial commit: Complete Hive distributed AI orchestration platform
This comprehensive implementation includes:
- FastAPI backend with MCP server integration
- React/TypeScript frontend with Vite
- PostgreSQL database with Redis caching
- Grafana/Prometheus monitoring stack
- Docker Compose orchestration
- Full MCP protocol support for Claude Code integration

Features:
- Agent discovery and management across network
- Visual workflow editor and execution engine
- Real-time task coordination and monitoring
- Multi-model support with specialized agents
- Distributed development task allocation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-07 21:44:31 +10:00

446 lines
17 KiB
Python

import sys
import os
from pathlib import Path
from typing import Dict, Any, List, Optional
import asyncio
import aiohttp
import json
from datetime import datetime
import uuid
# Add the McPlan project root to the Python path
mcplan_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(mcplan_root))
# Import the existing McPlan bridge components
try:
from mcplan_bridge_poc import N8nWorkflowParser, McPlanNodeExecutor, McPlanWorkflowEngine
except ImportError:
# Fallback implementation if import fails
class N8nWorkflowParser:
def __init__(self, workflow_json):
self.workflow_json = workflow_json
self.nodes = {}
self.connections = []
self.execution_order = []
def parse(self):
pass
class McPlanNodeExecutor:
def __init__(self):
self.execution_context = {}
class McPlanWorkflowEngine:
def __init__(self):
self.parser = None
self.executor = McPlanNodeExecutor()
async def load_workflow(self, workflow_json):
pass
async def execute_workflow(self, input_data):
return {"success": True, "message": "Fallback execution"}
class MultiAgentOrchestrator:
"""
Multi-agent orchestration system for distributing workflow tasks
"""
def __init__(self):
# Available Ollama agents from cluster
self.agents = {
'acacia': {
'name': 'ACACIA Infrastructure Specialist',
'endpoint': 'http://192.168.1.72:11434',
'model': 'deepseek-r1:7b',
'specialization': 'Infrastructure & Architecture',
'timeout': 30,
'status': 'unknown'
},
'walnut': {
'name': 'WALNUT Full-Stack Developer',
'endpoint': 'http://192.168.1.27:11434',
'model': 'starcoder2:15b',
'specialization': 'Full-Stack Development',
'timeout': 25,
'status': 'unknown'
},
'ironwood': {
'name': 'IRONWOOD Backend Specialist',
'endpoint': 'http://192.168.1.113:11434',
'model': 'deepseek-coder-v2',
'specialization': 'Backend & Optimization',
'timeout': 30,
'status': 'unknown'
}
}
async def check_agent_health(self, agent_id: str) -> bool:
"""Check if an agent is available and responsive"""
agent = self.agents.get(agent_id)
if not agent:
return False
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5)) as session:
async with session.get(f"{agent['endpoint']}/api/tags") as response:
if response.status == 200:
self.agents[agent_id]['status'] = 'healthy'
return True
except Exception as e:
print(f"Agent {agent_id} health check failed: {e}")
self.agents[agent_id]['status'] = 'unhealthy'
return False
async def get_available_agents(self) -> List[str]:
"""Get list of available and healthy agents"""
available = []
health_checks = [self.check_agent_health(agent_id) for agent_id in self.agents.keys()]
results = await asyncio.gather(*health_checks, return_exceptions=True)
for i, agent_id in enumerate(self.agents.keys()):
if isinstance(results[i], bool) and results[i]:
available.append(agent_id)
return available
async def execute_on_agent(self, agent_id: str, task: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a task on a specific agent"""
agent = self.agents.get(agent_id)
if not agent:
return {"success": False, "error": f"Agent {agent_id} not found"}
prompt = f"""Task: {task.get('description', 'Unknown task')}
Type: {task.get('type', 'general')}
Parameters: {json.dumps(task.get('parameters', {}), indent=2)}
Please execute this task and provide a structured response."""
payload = {
"model": agent['model'],
"prompt": prompt,
"stream": False,
"options": {
"num_predict": 400,
"temperature": 0.1,
"top_p": 0.9
}
}
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=agent['timeout'])) as session:
async with session.post(f"{agent['endpoint']}/api/generate", json=payload) as response:
if response.status == 200:
result = await response.json()
return {
"success": True,
"agent": agent_id,
"response": result.get('response', ''),
"model": agent['model'],
"task_id": task.get('id', str(uuid.uuid4()))
}
else:
return {
"success": False,
"error": f"HTTP {response.status}",
"agent": agent_id
}
except Exception as e:
return {
"success": False,
"error": str(e),
"agent": agent_id
}
async def orchestrate_workflow(self, workflow_nodes: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Orchestrate workflow execution across multiple agents"""
available_agents = await self.get_available_agents()
if not available_agents:
return {
"success": False,
"error": "No agents available for orchestration"
}
# Distribute nodes among available agents
tasks = []
for i, node in enumerate(workflow_nodes):
agent_id = available_agents[i % len(available_agents)]
task = {
"id": node.get('id', f"node-{i}"),
"type": node.get('type', 'unknown'),
"description": f"Execute {node.get('type', 'node')} with parameters",
"parameters": node.get('parameters', {}),
"agent_id": agent_id
}
tasks.append(self.execute_on_agent(agent_id, task))
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
successful_tasks = []
failed_tasks = []
for i, result in enumerate(results):
if isinstance(result, dict) and result.get('success'):
successful_tasks.append(result)
else:
failed_tasks.append({
"node_index": i,
"error": str(result) if isinstance(result, Exception) else result
})
return {
"success": len(failed_tasks) == 0,
"total_tasks": len(tasks),
"successful_tasks": len(successful_tasks),
"failed_tasks": len(failed_tasks),
"results": successful_tasks,
"errors": failed_tasks,
"agents_used": list(set([task.get('agent') for task in successful_tasks if task.get('agent')])),
"execution_time": datetime.now().isoformat()
}
class McPlanEngine:
"""
Web-enhanced McPlan engine with multi-agent orchestration capabilities
"""
def __init__(self):
self.engine = McPlanWorkflowEngine()
self.orchestrator = MultiAgentOrchestrator()
self.status_callbacks = []
def add_status_callback(self, callback):
"""Add callback for status updates during execution"""
self.status_callbacks.append(callback)
async def notify_status(self, node_id: str, status: str, data: Any = None):
"""Notify all status callbacks"""
for callback in self.status_callbacks:
await callback(node_id, status, data)
async def validate_workflow(self, workflow_json: Dict[str, Any]) -> Dict[str, Any]:
"""Validate workflow structure and return analysis"""
try:
parser = N8nWorkflowParser(workflow_json)
parser.parse()
return {
"valid": True,
"errors": [],
"warnings": [],
"execution_order": parser.execution_order,
"node_count": len(parser.nodes),
"connection_count": len(parser.connections)
}
except Exception as e:
return {
"valid": False,
"errors": [str(e)],
"warnings": [],
"execution_order": [],
"node_count": 0,
"connection_count": 0
}
async def load_workflow(self, workflow_json: Dict[str, Any]):
"""Load workflow into engine"""
await self.engine.load_workflow(workflow_json)
async def execute_workflow(self, input_data: Dict[str, Any], use_orchestration: bool = False) -> Dict[str, Any]:
"""Execute workflow with optional multi-agent orchestration"""
try:
if use_orchestration:
# Use multi-agent orchestration
await self.notify_status("orchestration", "starting", {"message": "Starting multi-agent orchestration"})
# Get workflow nodes for orchestration
if hasattr(self.engine, 'parser') and self.engine.parser:
workflow_nodes = list(self.engine.parser.nodes.values())
orchestration_result = await self.orchestrator.orchestrate_workflow(workflow_nodes)
await self.notify_status("orchestration", "completed", orchestration_result)
# Combine orchestration results with standard execution
standard_result = await self.engine.execute_workflow(input_data)
return {
"success": orchestration_result.get("success", False) and
(standard_result.get("success", True) if isinstance(standard_result, dict) else True),
"standard_execution": standard_result,
"orchestration": orchestration_result,
"execution_mode": "multi-agent",
"message": "Workflow executed with multi-agent orchestration"
}
else:
# Fallback to standard execution if no parsed workflow
await self.notify_status("orchestration", "fallback", {"message": "No parsed workflow, using standard execution"})
use_orchestration = False
if not use_orchestration:
# Standard single-agent execution
await self.notify_status("execution", "starting", {"message": "Starting standard execution"})
result = await self.engine.execute_workflow(input_data)
# Ensure result is properly formatted
if not isinstance(result, dict):
result = {"result": result}
if "success" not in result:
result["success"] = True
result["execution_mode"] = "single-agent"
await self.notify_status("execution", "completed", result)
return result
except Exception as e:
error_result = {
"success": False,
"error": str(e),
"message": f"Workflow execution failed: {str(e)}",
"execution_mode": "multi-agent" if use_orchestration else "single-agent"
}
await self.notify_status("execution", "error", error_result)
return error_result
async def get_orchestration_status(self) -> Dict[str, Any]:
"""Get current status of all agents in the orchestration cluster"""
agent_status = {}
for agent_id, agent in self.orchestrator.agents.items():
is_healthy = await self.orchestrator.check_agent_health(agent_id)
agent_status[agent_id] = {
"name": agent["name"],
"endpoint": agent["endpoint"],
"model": agent["model"],
"specialization": agent["specialization"],
"status": "healthy" if is_healthy else "unhealthy",
"timeout": agent["timeout"]
}
available_agents = await self.orchestrator.get_available_agents()
return {
"total_agents": len(self.orchestrator.agents),
"healthy_agents": len(available_agents),
"available_agents": available_agents,
"agent_details": agent_status,
"orchestration_ready": len(available_agents) > 0
}
async def get_node_definitions(self) -> List[Dict[str, Any]]:
"""Get available node type definitions"""
return [
{
"type": "n8n-nodes-base.webhook",
"name": "Webhook",
"description": "HTTP endpoint trigger",
"category": "trigger",
"color": "#ff6b6b",
"icon": "webhook"
},
{
"type": "n8n-nodes-base.set",
"name": "Set",
"description": "Data transformation and assignment",
"category": "transform",
"color": "#4ecdc4",
"icon": "settings"
},
{
"type": "n8n-nodes-base.switch",
"name": "Switch",
"description": "Conditional routing",
"category": "logic",
"color": "#45b7d1",
"icon": "git-branch"
},
{
"type": "n8n-nodes-base.httpRequest",
"name": "HTTP Request",
"description": "Make HTTP requests to APIs",
"category": "action",
"color": "#96ceb4",
"icon": "cpu"
},
{
"type": "n8n-nodes-base.respondToWebhook",
"name": "Respond to Webhook",
"description": "Send HTTP response",
"category": "response",
"color": "#feca57",
"icon": "send"
}
]
async def get_execution_modes(self) -> List[Dict[str, Any]]:
"""Get available execution modes"""
orchestration_status = await self.get_orchestration_status()
modes = [
{
"id": "single-agent",
"name": "Single Agent Execution",
"description": "Execute workflow on local McPlan engine",
"available": True,
"performance": "Fast, sequential execution",
"use_case": "Simple workflows, development, testing"
}
]
if orchestration_status["orchestration_ready"]:
modes.append({
"id": "multi-agent",
"name": "Multi-Agent Orchestration",
"description": f"Distribute workflow across {orchestration_status['healthy_agents']} agents",
"available": True,
"performance": "Parallel execution, higher throughput",
"use_case": "Complex workflows, production, scaling",
"agents": orchestration_status["available_agents"]
})
else:
modes.append({
"id": "multi-agent",
"name": "Multi-Agent Orchestration",
"description": "No agents available for orchestration",
"available": False,
"performance": "Unavailable",
"use_case": "Requires healthy Ollama agents in cluster"
})
return modes
async def test_orchestration(self) -> Dict[str, Any]:
"""Test multi-agent orchestration with a simple task"""
test_nodes = [
{
"id": "test-node-1",
"type": "test",
"parameters": {"message": "Hello from orchestration test"}
}
]
result = await self.orchestrator.orchestrate_workflow(test_nodes)
return {
"test_completed": True,
"timestamp": datetime.now().isoformat(),
**result
}