 d7ad321176
			
		
	
	d7ad321176
	
	
	
		
			
			This comprehensive implementation includes: - FastAPI backend with MCP server integration - React/TypeScript frontend with Vite - PostgreSQL database with Redis caching - Grafana/Prometheus monitoring stack - Docker Compose orchestration - Full MCP protocol support for Claude Code integration Features: - Agent discovery and management across network - Visual workflow editor and execution engine - Real-time task coordination and monitoring - Multi-model support with specialized agents - Distributed development task allocation 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			322 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			322 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| Hive Cluster Coordination for n8n Workflow Development
 | |
| Coordinates distributed development of intelligent task allocation workflows
 | |
| """
 | |
| 
 | |
| import asyncio
 | |
| import aiohttp
 | |
| import json
 | |
| import time
 | |
| from typing import Dict, List
 | |
| from datetime import datetime
 | |
| 
 | |
| # Cluster configuration
 | |
| AGENTS = {
 | |
|     "walnut": {
 | |
|         "endpoint": "http://192.168.1.27:11434",
 | |
|         "model": "starcoder2:15b",
 | |
|         "specialty": "Senior Full-Stack Development & Architecture",
 | |
|         "capabilities": ["workflow_design", "frontend_architecture", "complex_coordination"]
 | |
|     },
 | |
|     "ironwood": {
 | |
|         "endpoint": "http://192.168.1.113:11434", 
 | |
|         "model": "deepseek-coder-v2",
 | |
|         "specialty": "Backend Development & Code Analysis",
 | |
|         "capabilities": ["api_design", "database_schema", "backend_logic"]
 | |
|     },
 | |
|     "acacia": {
 | |
|         "endpoint": "http://192.168.1.72:11434",
 | |
|         "model": "deepseek-r1:7b", 
 | |
|         "specialty": "Infrastructure, DevOps & System Architecture",
 | |
|         "capabilities": ["deployment", "n8n_integration", "system_architecture"]
 | |
|     },
 | |
|     "rosewood": {
 | |
|         "endpoint": "http://192.168.1.132:11434",
 | |
|         "model": "deepseek-r1:8b",
 | |
|         "specialty": "Quality Assurance, Testing & Code Review", 
 | |
|         "capabilities": ["testing_workflows", "quality_validation", "performance_testing"]
 | |
|     },
 | |
|     "oak": {
 | |
|         "endpoint": "http://192.168.1.135:11434",
 | |
|         "model": "mistral:7b-instruct",
 | |
|         "specialty": "iOS/macOS Development & Apple Ecosystem",
 | |
|         "capabilities": ["mobile_integration", "apple_ecosystem", "native_apps"]
 | |
|     }
 | |
| }
 | |
| 
 | |
| class HiveN8NCoordinator:
 | |
|     def __init__(self):
 | |
|         self.session = None
 | |
|         self.results = {}
 | |
|         
 | |
|     async def __aenter__(self):
 | |
|         self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=300))
 | |
|         return self
 | |
|         
 | |
|     async def __aexit__(self, exc_type, exc_val, exc_tb):
 | |
|         if self.session:
 | |
|             await self.session.close()
 | |
|     
 | |
|     async def execute_agent_task(self, agent_id: str, task: Dict) -> Dict:
 | |
|         """Execute a task on a specific agent"""
 | |
|         agent = AGENTS[agent_id]
 | |
|         
 | |
|         print(f"🎯 Assigning to {agent_id.upper()}: {task['title']}")
 | |
|         
 | |
|         prompt = f"""You are a {agent['specialty']} specialist working as part of a distributed AI development cluster. 
 | |
| 
 | |
| TASK: {task['title']}
 | |
| 
 | |
| CONTEXT: We are building intelligent n8n workflows for automatic task allocation based on a 25-person software company model. The existing framework includes role-based AI agent workflows with:
 | |
| 
 | |
| - Executive Leadership Roles (CEO, CTO, Product Manager)
 | |
| - Engineering Roles (Frontend, Backend, DevOps, Security, QA)  
 | |
| - Support & Business Roles (Technical Writer, Developer Advocate, Marketing, Customer Success)
 | |
| - Coordination & Management Roles (Agent Coordinator, Knowledge Manager, Scrum Master)
 | |
| 
 | |
| Your specific assignment: {task['description']}
 | |
| 
 | |
| REQUIREMENTS:
 | |
| {chr(10).join(f"- {req}" for req in task['requirements'])}
 | |
| 
 | |
| DELIVERABLES:
 | |
| {chr(10).join(f"- {deliverable}" for deliverable in task['deliverables'])}
 | |
| 
 | |
| Please provide a comprehensive solution that integrates with the existing framework and enhances the automatic task allocation capabilities. Focus on your area of expertise while considering the broader system architecture.
 | |
| 
 | |
| Respond with detailed technical solutions, code examples, and implementation guidance."""
 | |
| 
 | |
|         try:
 | |
|             async with self.session.post(
 | |
|                 f"{agent['endpoint']}/api/generate",
 | |
|                 json={
 | |
|                     "model": agent['model'],
 | |
|                     "prompt": prompt,
 | |
|                     "stream": False,
 | |
|                     "options": {
 | |
|                         "num_predict": 4000,
 | |
|                         "temperature": 0.7
 | |
|                     }
 | |
|                 }
 | |
|             ) as response:
 | |
|                 if response.status == 200:
 | |
|                     result = await response.json()
 | |
|                     
 | |
|                     return {
 | |
|                         "agent": agent_id,
 | |
|                         "task": task['title'],
 | |
|                         "status": "completed",
 | |
|                         "response": result.get('response', ''),
 | |
|                         "model": agent['model'],
 | |
|                         "tokens_generated": result.get('eval_count', 0),
 | |
|                         "generation_time": result.get('eval_duration', 0) / 1000000000,  # Convert to seconds
 | |
|                         "timestamp": datetime.now().isoformat()
 | |
|                     }
 | |
|                 else:
 | |
|                     return {
 | |
|                         "agent": agent_id,
 | |
|                         "task": task['title'], 
 | |
|                         "status": "failed",
 | |
|                         "error": f"HTTP {response.status}",
 | |
|                         "timestamp": datetime.now().isoformat()
 | |
|                     }
 | |
|                     
 | |
|         except Exception as e:
 | |
|             return {
 | |
|                 "agent": agent_id,
 | |
|                 "task": task['title'],
 | |
|                 "status": "failed", 
 | |
|                 "error": str(e),
 | |
|                 "timestamp": datetime.now().isoformat()
 | |
|             }
 | |
|     
 | |
|     async def coordinate_development(self):
 | |
|         """Coordinate the distributed development of n8n workflows"""
 | |
|         
 | |
|         print("🐝 HIVE CLUSTER N8N WORKFLOW DEVELOPMENT COORDINATION")
 | |
|         print("=" * 70)
 | |
|         print(f"🚀 Coordinating {len(AGENTS)} specialized AI agents")
 | |
|         print(f"🎯 Target: Intelligent task allocation workflows for 25-person software company")
 | |
|         print()
 | |
|         
 | |
|         # Define tasks for each agent
 | |
|         tasks = {
 | |
|             "walnut": {
 | |
|                 "title": "Intelligent Task Allocation Algorithm Design",
 | |
|                 "description": "Design the core intelligent task allocation algorithms and workflow architecture for n8n",
 | |
|                 "requirements": [
 | |
|                     "Analyze agent capabilities and performance characteristics",
 | |
|                     "Design dynamic task routing based on complexity and specialty",
 | |
|                     "Create load balancing algorithms for optimal resource utilization", 
 | |
|                     "Design failure handling and fallback mechanisms",
 | |
|                     "Plan integration with existing role-based workflow system"
 | |
|                 ],
 | |
|                 "deliverables": [
 | |
|                     "Task allocation algorithm specifications",
 | |
|                     "Workflow architecture diagrams and documentation",
 | |
|                     "Agent capability mapping and scoring system",
 | |
|                     "Dynamic routing logic and decision trees",
 | |
|                     "Integration plan with existing n8n workflows"
 | |
|                 ]
 | |
|             },
 | |
|             
 | |
|             "ironwood": {
 | |
|                 "title": "Backend APIs and Database Schema for Task Routing",
 | |
|                 "description": "Implement robust backend APIs and database schema for intelligent task routing and monitoring",
 | |
|                 "requirements": [
 | |
|                     "Design REST APIs for task submission and agent management",
 | |
|                     "Create database schema for task tracking and agent performance",
 | |
|                     "Implement real-time task queue management system",
 | |
|                     "Build agent health monitoring and performance metrics",
 | |
|                     "Design webhook endpoints for n8n integration"
 | |
|                 ],
 | |
|                 "deliverables": [
 | |
|                     "Complete REST API specification and implementation",
 | |
|                     "Database schema with indexes and performance optimization",
 | |
|                     "Task queue management system with priority handling",
 | |
|                     "Real-time monitoring APIs with metrics collection",
 | |
|                     "Webhook endpoints for seamless n8n integration"
 | |
|                 ]
 | |
|             },
 | |
|             
 | |
|             "acacia": {
 | |
|                 "title": "n8n Workflow Deployment and Cluster Integration",
 | |
|                 "description": "Set up production-ready n8n workflow deployment with full cluster integration",
 | |
|                 "requirements": [
 | |
|                     "Deploy enhanced n8n workflows to production environment",
 | |
|                     "Configure cluster integration with all 6 agents",
 | |
|                     "Set up monitoring and alerting for workflow performance",
 | |
|                     "Implement backup and recovery procedures",
 | |
|                     "Configure security and access controls"
 | |
|                 ],
 | |
|                 "deliverables": [
 | |
|                     "Production deployment scripts and configurations",
 | |
|                     "Complete cluster integration with agent discovery",
 | |
|                     "Monitoring dashboard and alerting system",
 | |
|                     "Backup and recovery documentation and scripts", 
 | |
|                     "Security configuration and access control setup"
 | |
|                 ]
 | |
|             },
 | |
|             
 | |
|             "rosewood": {
 | |
|                 "title": "Comprehensive Testing and Quality Assurance Workflows",
 | |
|                 "description": "Develop comprehensive testing strategies and quality assurance workflows for the task allocation system",
 | |
|                 "requirements": [
 | |
|                     "Create automated testing suites for all workflow components",
 | |
|                     "Design performance testing and load testing strategies",
 | |
|                     "Implement quality metrics and success criteria",
 | |
|                     "Create integration testing for agent coordination",
 | |
|                     "Design monitoring and alerting for system health"
 | |
|                 ],
 | |
|                 "deliverables": [
 | |
|                     "Automated test suites for n8n workflows and APIs",
 | |
|                     "Performance testing framework and benchmarks",
 | |
|                     "Quality metrics dashboard and reporting",
 | |
|                     "Integration testing scenarios and validation",
 | |
|                     "System health monitoring and alerting configuration"
 | |
|                 ]
 | |
|             },
 | |
|             
 | |
|             "oak": {
 | |
|                 "title": "iOS/macOS Integration and Mobile Task Management",
 | |
|                 "description": "Create iOS/macOS integration components for mobile task management and monitoring",
 | |
|                 "requirements": [
 | |
|                     "Design native iOS/macOS app for task monitoring",
 | |
|                     "Create API integration for real-time cluster status",
 | |
|                     "Implement push notifications for task completion",
 | |
|                     "Design mobile-friendly task submission interface",
 | |
|                     "Plan Apple ecosystem integration features"
 | |
|                 ],
 | |
|                 "deliverables": [
 | |
|                     "iOS/macOS app design and architecture",
 | |
|                     "API integration specifications and implementation",
 | |
|                     "Push notification system design",
 | |
|                     "Mobile task submission interface mockups",
 | |
|                     "Apple ecosystem integration roadmap"
 | |
|                 ]
 | |
|             }
 | |
|         }
 | |
|         
 | |
|         # Execute all tasks in parallel
 | |
|         print("🔄 Executing tasks across the cluster...")
 | |
|         print()
 | |
|         
 | |
|         task_coroutines = []
 | |
|         for agent_id, task in tasks.items():
 | |
|             task_coroutines.append(self.execute_agent_task(agent_id, task))
 | |
|         
 | |
|         # Wait for all tasks to complete
 | |
|         results = await asyncio.gather(*task_coroutines, return_exceptions=True)
 | |
|         
 | |
|         # Process results
 | |
|         successful_tasks = 0
 | |
|         failed_tasks = 0
 | |
|         
 | |
|         print("\n" + "=" * 70)
 | |
|         print("📊 DEVELOPMENT COORDINATION RESULTS")
 | |
|         print("=" * 70)
 | |
|         
 | |
|         for result in results:
 | |
|             if isinstance(result, Exception):
 | |
|                 print(f"❌ Task failed with exception: {result}")
 | |
|                 failed_tasks += 1
 | |
|                 continue
 | |
|                 
 | |
|             if result['status'] == 'completed':
 | |
|                 print(f"✅ {result['agent'].upper()}: {result['task']}")
 | |
|                 print(f"   📝 Response: {len(result['response'])} characters")
 | |
|                 print(f"   🎯 Tokens: {result['tokens_generated']}")
 | |
|                 print(f"   ⏱️  Time: {result['generation_time']:.1f}s")
 | |
|                 successful_tasks += 1
 | |
|             else:
 | |
|                 print(f"❌ {result['agent'].upper()}: {result['task']} - {result.get('error', 'Unknown error')}")
 | |
|                 failed_tasks += 1
 | |
|             print()
 | |
|         
 | |
|         # Save detailed results
 | |
|         timestamp = int(time.time())
 | |
|         results_file = f"/home/tony/AI/projects/hive/results/n8n_coordination_{timestamp}.json"
 | |
|         
 | |
|         with open(results_file, 'w') as f:
 | |
|             json.dump({
 | |
|                 "coordination_summary": {
 | |
|                     "total_agents": len(AGENTS),
 | |
|                     "successful_tasks": successful_tasks, 
 | |
|                     "failed_tasks": failed_tasks,
 | |
|                     "coordination_time": datetime.now().isoformat(),
 | |
|                     "target": "n8n intelligent task allocation workflows"
 | |
|                 },
 | |
|                 "task_results": [r for r in results if not isinstance(r, Exception)],
 | |
|                 "agent_configuration": AGENTS
 | |
|             }, f, indent=2)
 | |
|         
 | |
|         print("🎉 COORDINATION SUMMARY")
 | |
|         print(f"   📊 Total Agents: {len(AGENTS)}")
 | |
|         print(f"   ✅ Successful: {successful_tasks}")
 | |
|         print(f"   ❌ Failed: {failed_tasks}")
 | |
|         print(f"   📁 Results: {results_file}")
 | |
|         print()
 | |
|         
 | |
|         if successful_tasks > 0:
 | |
|             print("🚀 Next Steps:")
 | |
|             print("   1. Review detailed agent responses for implementation details")
 | |
|             print("   2. Integrate solutions from each agent into cohesive system")
 | |
|             print("   3. Deploy enhanced workflows to n8n production environment")
 | |
|             print("   4. Test intelligent task allocation with real workloads")
 | |
|             print("   5. Monitor performance and optimize based on metrics")
 | |
|         
 | |
|         return results
 | |
| 
 | |
| async def main():
 | |
|     """Main coordination function"""
 | |
|     
 | |
|     # Ensure results directory exists
 | |
|     import os
 | |
|     os.makedirs("/home/tony/AI/projects/hive/results", exist_ok=True)
 | |
|     
 | |
|     async with HiveN8NCoordinator() as coordinator:
 | |
|         await coordinator.coordinate_development()
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     asyncio.run(main()) |