#!/usr/bin/env python3
"""
BZZZ Integration Service for WHOOSH
Connects WHOOSH to the existing BZZZ distributed system for P2P team collaboration
"""

import asyncio
import json
import logging
from collections import Counter
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional, Any

import aiohttp

logger = logging.getLogger(__name__)

# Fraction of known team members that must approve for consensus (overridable
# through the "consensus_threshold" config key).
DEFAULT_CONSENSUS_THRESHOLD = 0.6


def _utc_now() -> datetime:
    """Return the current UTC time as a naive datetime.

    Naive UTC is the representation this module has always stored and sent on
    the wire (``isoformat()`` without an offset); it is derived from an aware
    clock so it does not rely on the deprecated ``datetime.utcnow()``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _parse_utc_timestamp(value: str) -> datetime:
    """Parse an ISO-8601 timestamp and normalise it to naive UTC.

    A trailing ``Z`` is accepted (older ``datetime.fromisoformat`` rejects it).
    Aware values are converted to UTC and stripped of their tzinfo so they can
    be compared with locally created timestamps; naive values are returned
    unchanged.

    Raises:
        ValueError: if ``value`` is not a valid ISO-8601 timestamp.
    """
    text = f"{value[:-1]}+00:00" if value.endswith("Z") else value
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


class AgentRole(Enum):
    """Agent roles from BZZZ system"""

    SENIOR_ARCHITECT = "senior_architect"
    FRONTEND_DEVELOPER = "frontend_developer"
    BACKEND_DEVELOPER = "backend_developer"
    DEVOPS_ENGINEER = "devops_engineer"
    PROJECT_MANAGER = "project_manager"
    AI_COORDINATOR = "ai_coordinator"


@dataclass
class BzzzDecision:
    """BZZZ decision structure"""

    id: str
    title: str
    description: str
    author_role: str
    context: Dict[str, Any]
    timestamp: datetime  # stored as naive UTC by this module
    ucxl_address: Optional[str] = None


@dataclass
class TeamMember:
    """Team member in BZZZ network"""

    agent_id: str
    role: AgentRole
    endpoint: str
    capabilities: List[str]
    status: str = "online"


class BzzzIntegrationService:
    """
    Service for integrating WHOOSH with the existing BZZZ distributed system.
    Provides P2P team collaboration, decision publishing, and consensus mechanisms.

    Typical lifecycle: construct, ``await initialize()``, use the public
    coroutines, then ``await cleanup()`` to stop the background loops and
    close the HTTP session.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create the service.

        Args:
            config: Optional overrides. Missing keys fall back to
                ``_default_config()``, so a partial dict is safe.

        Raises:
            ValueError: if the configured ``role`` is not an ``AgentRole`` value.
        """
        # Merge over the defaults so interval keys read later always exist.
        self.config: Dict[str, Any] = {**self._default_config(), **(config or {})}
        self.bzzz_endpoints = self.config.get("bzzz_endpoints", [])
        self.agent_id = self.config.get("agent_id", "whoosh-coordinator")
        self.role = AgentRole(self.config.get("role", "ai_coordinator"))
        self.session: Optional[aiohttp.ClientSession] = None
        self.team_members: Dict[str, TeamMember] = {}
        self.active_decisions: Dict[str, BzzzDecision] = {}
        # Strong references to the background loops: keeps them from being
        # garbage-collected and lets cleanup() cancel them.
        self._background_tasks: List[asyncio.Task] = []

    def _default_config(self) -> Dict[str, Any]:
        """Default BZZZ integration configuration"""
        return {
            "bzzz_endpoints": [
                # Direct BZZZ connections disabled - WHOOSH should use BZZZ API instead
                # "http://192.168.1.27:8080",   # walnut
                # "http://192.168.1.72:8080",   # acacia
                # "http://192.168.1.113:8080",  # ironwood
            ],
            "agent_id": "whoosh-coordinator",
            "role": "ai_coordinator",
            "whoosh_endpoint": "http://localhost:8000",  # WHOOSH backend
            "discovery_interval": 30,
            "health_check_interval": 60,
            "decision_sync_interval": 15,
            "consensus_threshold": DEFAULT_CONSENSUS_THRESHOLD,
        }

    async def initialize(self) -> bool:
        """Initialize BZZZ integration service

        Opens the HTTP session, registers with every configured node,
        discovers team members and starts the background loops.

        Returns:
            True on success; False if any step raised (resources acquired so
            far are released before returning).
        """
        try:
            logger.info("🔌 Initializing BZZZ Integration Service")

            # Create HTTP session (reuse a live one if initialize() is called twice)
            if self.session is None or self.session.closed:
                self.session = aiohttp.ClientSession(
                    timeout=aiohttp.ClientTimeout(total=30)
                )

            # Register with BZZZ network
            await self._register_with_network()

            # Discover team members
            await self._discover_team_members()

            # Start background tasks
            self._start_background_tasks()

            logger.info(
                "✅ BZZZ Integration initialized with %d team members",
                len(self.team_members),
            )
            return True

        except Exception as e:
            logger.error("❌ Failed to initialize BZZZ integration: %s", e)
            # Do not leak the session or half-started loops on failure.
            await self._shutdown()
            return False

    def _start_background_tasks(self) -> None:
        """Start the sync and health loops unless they are already running."""
        if any(not task.done() for task in self._background_tasks):
            return
        self._background_tasks = [
            asyncio.create_task(self._decision_sync_loop()),
            asyncio.create_task(self._health_check_loop()),
        ]

    async def _shutdown(self) -> None:
        """Cancel the background loops and close the HTTP session."""
        tasks, self._background_tasks = self._background_tasks, []
        for task in tasks:
            task.cancel()
        if tasks:
            # return_exceptions=True absorbs the CancelledError of each task.
            await asyncio.gather(*tasks, return_exceptions=True)

        session, self.session = self.session, None
        if session is not None and not session.closed:
            await session.close()

    async def _register_with_network(self) -> None:
        """Register WHOOSH coordinator with BZZZ network

        Posts the registration payload to every configured endpoint; failures
        are logged per endpoint and never raised.
        """
        registration_data = {
            "agent_id": self.agent_id,
            "role": self.role.value,
            "capabilities": [
                "ai_coordination",
                "workflow_orchestration",
                "task_distribution",
                "performance_monitoring",
            ],
            "endpoint": self.config.get("whoosh_endpoint", "http://localhost:8000"),
            "metadata": {
                "system": "WHOOSH",
                "version": "6.2",
                "specialization": "AI Orchestration Platform",
            },
        }

        for endpoint in self.bzzz_endpoints:
            try:
                async with self.session.post(
                    f"{endpoint}/api/agent/register",
                    json=registration_data,
                ) as response:
                    if response.status == 200:
                        result = await response.json()
                        logger.info("✅ Registered with BZZZ node: %s", endpoint)
                        logger.debug("Registration result: %s", result)
                    else:
                        logger.warning(
                            "⚠️ Failed to register with %s: %s",
                            endpoint,
                            response.status,
                        )
            except Exception as e:
                logger.warning(
                    "⚠️ Could not connect to BZZZ endpoint %s: %s", endpoint, e
                )

    def _parse_team_member(
        self, agent_data: Dict[str, Any], fallback_endpoint: str
    ) -> Optional[TeamMember]:
        """Build a TeamMember from one agent record, or None to skip it.

        Records describing this service itself, and malformed records (missing
        ``agent_id``, unknown role), are skipped so that one bad entry does not
        discard the remaining agents reported by the same node.
        """
        try:
            agent_id = agent_data["agent_id"]
            if agent_id == self.agent_id:  # Don't include ourselves
                return None
            return TeamMember(
                agent_id=agent_id,
                role=AgentRole(agent_data.get("role", "backend_developer")),
                endpoint=agent_data.get("endpoint", fallback_endpoint),
                capabilities=agent_data.get("capabilities", []),
                status=agent_data.get("status", "online"),
            )
        except (KeyError, TypeError, ValueError, AttributeError) as e:
            logger.warning(
                "⚠️ Skipping malformed agent entry from %s: %s", fallback_endpoint, e
            )
            return None

    async def _discover_team_members(self) -> None:
        """Discover active team members in BZZZ network

        Replaces ``self.team_members`` with the agents reported by the
        configured endpoints (later endpoints win on duplicate agent ids).
        """
        discovered_members: Dict[str, TeamMember] = {}

        for endpoint in self.bzzz_endpoints:
            try:
                async with self.session.get(f"{endpoint}/api/agents") as response:
                    if response.status != 200:
                        continue
                    agents_data = await response.json()
            except Exception as e:
                logger.warning(
                    "⚠️ Failed to discover members from %s: %s", endpoint, e
                )
                continue

            try:
                agent_records = agents_data.get("agents", [])
            except AttributeError as e:
                logger.warning(
                    "⚠️ Failed to discover members from %s: %s", endpoint, e
                )
                continue

            for agent_data in agent_records:
                member = self._parse_team_member(agent_data, endpoint)
                if member is not None:
                    discovered_members[member.agent_id] = member

        self.team_members = discovered_members
        logger.info("🔍 Discovered %d team members", len(self.team_members))

        for member in self.team_members.values():
            logger.debug(
                " - %s (%s) @ %s", member.agent_id, member.role.value, member.endpoint
            )

    async def publish_decision(
        self,
        title: str,
        description: str,
        context: Dict[str, Any],
        ucxl_address: Optional[str] = None,
    ) -> Optional[str]:
        """
        Publish a decision to the BZZZ network for team consensus
        Returns decision ID if successful

        Endpoints are tried in order; the first one answering 201 wins and the
        decision is cached in ``self.active_decisions``. Returns None when no
        endpoint accepts the decision or the accepting node reports no id.
        """
        try:
            # One timestamp for both the wire payload and the local copy.
            published_at = _utc_now()
            decision_data = {
                "title": title,
                "description": description,
                "author_role": self.role.value,
                "context": context,
                "ucxl_address": ucxl_address,
                "timestamp": published_at.isoformat(),
            }

            # Try to publish to available BZZZ nodes
            for endpoint in self.bzzz_endpoints:
                try:
                    async with self.session.post(
                        f"{endpoint}/api/decisions",
                        json=decision_data,
                    ) as response:
                        if response.status != 201:
                            logger.warning(
                                "⚠️ Failed to publish to %s: %s",
                                endpoint,
                                response.status,
                            )
                            continue
                        result = await response.json()
                except Exception as e:
                    logger.warning("⚠️ Failed to publish to %s: %s", endpoint, e)
                    continue

                decision_id = result.get("decision_id")
                if not decision_id:
                    # The node accepted the decision; retrying elsewhere would
                    # duplicate it, and there is no id to track it under.
                    logger.warning(
                        "⚠️ Failed to publish to %s: response carried no decision_id",
                        endpoint,
                    )
                    return None

                # Store locally
                self.active_decisions[decision_id] = BzzzDecision(
                    id=decision_id,
                    title=title,
                    description=description,
                    author_role=self.role.value,
                    context=context,
                    timestamp=published_at,
                    ucxl_address=ucxl_address,
                )

                logger.info("📝 Published decision: %s (ID: %s)", title, decision_id)
                return decision_id

            logger.error("❌ Failed to publish decision to any BZZZ node")
            return None

        except Exception as e:
            logger.error("❌ Error publishing decision: %s", e)
            return None

    async def get_team_consensus(self, decision_id: str) -> Optional[Dict[str, Any]]:
        """Get consensus status for a decision from team members

        Returns:
            A dict with ``decision_id``, ``total_votes``, ``approvals``,
            ``approval_rate``, ``consensus_reached`` and per-node ``details``,
            or None when no node returned consensus data or an error occurred.
        """
        try:
            consensus_data: Dict[str, Any] = {}

            for endpoint in self.bzzz_endpoints:
                try:
                    async with self.session.get(
                        f"{endpoint}/api/decisions/{decision_id}/consensus"
                    ) as response:
                        if response.status == 200:
                            consensus_data[endpoint] = await response.json()
                        else:
                            logger.debug(
                                "Consensus request to %s returned %s",
                                endpoint,
                                response.status,
                            )
                except Exception as e:
                    logger.warning(
                        "⚠️ Failed to get consensus from %s: %s", endpoint, e
                    )

            if not consensus_data:
                return None

            # Aggregate consensus across nodes
            # NOTE(review): votes are summed per node; if nodes replicate the
            # same votes this double-counts - confirm against the BZZZ API.
            total_votes = 0
            approvals = 0
            for node_consensus in consensus_data.values():
                votes = node_consensus.get("votes", [])
                total_votes += len(votes)
                approvals += sum(1 for vote in votes if vote.get("approval", False))

            threshold = self.config.get(
                "consensus_threshold", DEFAULT_CONSENSUS_THRESHOLD
            )
            required_approvals = len(self.team_members) * threshold

            return {
                "decision_id": decision_id,
                "total_votes": total_votes,
                "approvals": approvals,
                "approval_rate": approvals / total_votes if total_votes > 0 else 0,
                # Zero approvals never counts as consensus, even when no team
                # members are known (required_approvals would be 0).
                "consensus_reached": approvals > 0
                and approvals >= required_approvals,
                "details": consensus_data,
            }

        except Exception as e:
            logger.error("❌ Error getting team consensus: %s", e)
            return None

    async def coordinate_task_assignment(
        self,
        task_description: str,
        required_capabilities: List[str],
        priority: str = "medium",
    ) -> Optional[Dict[str, Any]]:
        """
        Coordinate task assignment across team members based on capabilities and availability

        Only members whose status is "online" and who share at least one
        required capability are considered. The best match is published as a
        decision.

        Returns:
            A dict with ``decision_id`` (None if publishing failed),
            ``assigned_to``, ``assignment_score`` and ``alternatives``; or None
            when no member qualifies or an error occurred.
        """
        try:
            # De-duplicate so repeated entries do not dilute the score.
            required = set(required_capabilities)

            # Find suitable team members
            suitable_members = []
            for member in self.team_members.values():
                if member.status != "online":
                    continue
                matched = len(required & set(member.capabilities))
                if matched == 0:
                    continue
                suitable_members.append(
                    {
                        "member": member,
                        "capability_score": matched / len(required),
                        # Only online members reach this point.
                        "availability_score": 1.0,
                    }
                )

            # Sort by combined score
            suitable_members.sort(
                key=lambda x: x["capability_score"] + x["availability_score"],
                reverse=True,
            )

            if not suitable_members:
                logger.warning("⚠️ No suitable team members found for task")
                return None

            # Create coordination decision
            best = suitable_members[0]
            best_member = best["member"]
            best_score = best["capability_score"] + best["availability_score"]

            decision_context = {
                "task_description": task_description,
                "required_capabilities": required_capabilities,
                "assigned_to": best_member.agent_id,
                "assignment_reason": f"Best capability match ({best['capability_score']:.2f})",
                "priority": priority,
                "alternatives": [
                    {
                        "agent_id": sm["member"].agent_id,
                        "score": sm["capability_score"] + sm["availability_score"],
                    }
                    for sm in suitable_members[1:3]  # Up to two runners-up
                ],
            }

            decision_id = await self.publish_decision(
                title=f"Task Assignment: {task_description[:50]}{'...' if len(task_description) > 50 else ''}",
                description=f"Assigning task to {best_member.agent_id} based on capabilities and availability",
                context=decision_context,
            )

            return {
                "decision_id": decision_id,
                "assigned_to": best_member.agent_id,
                "assignment_score": best_score,
                "alternatives": decision_context["alternatives"],
            }

        except Exception as e:
            logger.error("❌ Error coordinating task assignment: %s", e)
            return None

    async def _decision_sync_loop(self) -> None:
        """Background task to sync decisions from BZZZ network

        Runs until cancelled (cancellation is not an ``Exception`` and so
        passes through the handler below).
        """
        while True:
            try:
                await self._sync_recent_decisions()
                await asyncio.sleep(self.config["decision_sync_interval"])
            except Exception as e:
                logger.error("❌ Error in decision sync loop: %s", e)
                await asyncio.sleep(30)  # Wait longer on error

    @staticmethod
    def _parse_decision(decision_data: Dict[str, Any]) -> BzzzDecision:
        """Build a BzzzDecision from one decision record.

        Raises:
            KeyError, TypeError, ValueError, AttributeError: if the record is
                missing required fields or carries an unparsable timestamp.
        """
        return BzzzDecision(
            id=decision_data["id"],
            title=decision_data["title"],
            description=decision_data["description"],
            author_role=decision_data["author_role"],
            context=decision_data.get("context", {}),
            timestamp=_parse_utc_timestamp(decision_data["timestamp"]),
            ucxl_address=decision_data.get("ucxl_address"),
        )

    async def _sync_recent_decisions(self) -> None:
        """Sync recent decisions from BZZZ network

        Fetches decisions from the last hour from every endpoint and caches
        those not already present in ``self.active_decisions``.
        """
        try:
            for endpoint in self.bzzz_endpoints:
                try:
                    # Get recent decisions (last hour). The cutoff comes from an
                    # aware clock: calling .timestamp() on a naive UTC datetime
                    # would be interpreted as local time and skew the window.
                    params = {
                        "since": datetime.now(timezone.utc).timestamp() - 3600
                    }

                    async with self.session.get(
                        f"{endpoint}/api/decisions",
                        params=params,
                    ) as response:
                        if response.status != 200:
                            continue
                        decisions_data = await response.json()

                    for decision_data in decisions_data.get("decisions", []):
                        try:
                            decision_id = decision_data["id"]
                            if decision_id in self.active_decisions:
                                continue
                            decision = self._parse_decision(decision_data)
                        except (KeyError, TypeError, ValueError, AttributeError) as e:
                            # Skip only the bad record, keep the rest.
                            logger.warning(
                                "⚠️ Failed to sync from %s: %s", endpoint, e
                            )
                            continue

                        self.active_decisions[decision_id] = decision
                        logger.debug("📥 Synced decision: %s", decision.title)

                except Exception as e:
                    logger.warning("⚠️ Failed to sync from %s: %s", endpoint, e)

        except Exception as e:
            logger.error("❌ Error syncing decisions: %s", e)

    async def _health_check_loop(self) -> None:
        """Background task to check health of team members

        Runs until cancelled.
        """
        while True:
            try:
                await self._check_team_health()
                await asyncio.sleep(self.config["health_check_interval"])
            except Exception as e:
                logger.error("❌ Error in health check loop: %s", e)
                await asyncio.sleep(60)  # Wait longer on error

    async def _check_team_health(self) -> None:
        """Check health status of all team members

        Updates each member's ``status`` in place: the reported status on a
        200 response, "offline" on any other status or on a request error.
        """
        try:
            # Snapshot the values: discovery may rebind the dict while we await.
            for member in list(self.team_members.values()):
                try:
                    async with self.session.get(
                        f"{member.endpoint}/api/agent/status",
                        timeout=aiohttp.ClientTimeout(total=10),
                    ) as response:
                        if response.status == 200:
                            status_data = await response.json()
                            member.status = status_data.get("status", "online")
                        else:
                            member.status = "offline"
                except Exception as e:
                    logger.debug(
                        "Health check failed for %s: %s", member.agent_id, e
                    )
                    member.status = "offline"

        except Exception as e:
            logger.error("❌ Error checking team health: %s", e)

    async def get_team_status(self) -> Dict[str, Any]:
        """Get current team status and statistics

        Returns:
            Member counts, role distribution, the number of cached decisions,
            the ten most recent decisions and ``network_health`` (share of
            members online, 0 when no members are known). On error an
            all-zero/empty structure with the same keys is returned.
        """
        try:
            members = list(self.team_members.values())
            online_members = sum(1 for m in members if m.status == "online")
            role_distribution = dict(Counter(m.role.value for m in members))

            newest_first = sorted(
                self.active_decisions.values(),
                key=lambda d: d.timestamp,
                reverse=True,
            )
            recent_decisions = [
                {
                    "id": decision.id,
                    "title": decision.title,
                    "author_role": decision.author_role,
                    "timestamp": decision.timestamp.isoformat(),
                }
                for decision in newest_first[:10]  # Last 10 decisions
            ]

            return {
                "total_members": len(members),
                "online_members": online_members,
                "offline_members": len(members) - online_members,
                "role_distribution": role_distribution,
                "active_decisions": len(self.active_decisions),
                "recent_decisions": recent_decisions,
                "network_health": online_members / len(members) if members else 0,
            }

        except Exception as e:
            logger.error("❌ Error getting team status: %s", e)
            return {
                "total_members": 0,
                "online_members": 0,
                "offline_members": 0,
                "role_distribution": {},
                "active_decisions": 0,
                "recent_decisions": [],
                "network_health": 0,
            }

    async def cleanup(self) -> None:
        """Cleanup BZZZ integration resources

        Cancels the background loops and closes the HTTP session. Errors are
        logged, never raised.
        """
        try:
            await self._shutdown()
            logger.info("🧹 BZZZ Integration Service cleanup completed")
        except Exception as e:
            logger.error("❌ Error during cleanup: %s", e)


# Global service instance
bzzz_service = BzzzIntegrationService()