Files
hive/backend/app/services/bzzz_integration_service.py
anthonyrawlins 268214d971 Major WHOOSH system refactoring and feature enhancements
- Migrated from HIVE branding to WHOOSH across all components
- Enhanced backend API with new services: AI models, BZZZ integration, templates, members
- Added comprehensive testing suite with security, performance, and integration tests
- Improved frontend with new components for project setup, AI models, and team management
- Updated MCP server implementation with WHOOSH-specific tools and resources
- Enhanced deployment configurations with production-ready Docker setups
- Added comprehensive documentation and setup guides
- Implemented age encryption service and UCXL integration

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-27 08:34:48 +10:00

471 lines
19 KiB
Python

#!/usr/bin/env python3
"""
BZZZ Integration Service for WHOOSH
Connects WHOOSH to the existing BZZZ distributed system for P2P team collaboration
"""
import asyncio
import json
import logging
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

import aiohttp
# Module-level logger named after this module (``__name__``).
logger = logging.getLogger(__name__)
class AgentRole(Enum):
    """Closed set of roles an agent can hold in the BZZZ system.

    Each member's value is the lowercase string form of the role, so a
    role string can be converted back with ``AgentRole(value)``.
    """

    # Engineering roles.
    SENIOR_ARCHITECT = "senior_architect"
    FRONTEND_DEVELOPER = "frontend_developer"
    BACKEND_DEVELOPER = "backend_developer"
    DEVOPS_ENGINEER = "devops_engineer"

    # Coordination roles.
    PROJECT_MANAGER = "project_manager"
    AI_COORDINATOR = "ai_coordinator"
@dataclass
class BzzzDecision:
    """A single decision record exchanged with the BZZZ network.

    Field order is part of the constructor signature and must not change.
    """

    # Identifier of the decision.
    id: str
    # Short human-readable headline.
    title: str
    # Longer free-text explanation of the decision.
    description: str
    # Role of the authoring agent, stored as a plain string.
    author_role: str
    # Arbitrary structured payload attached to the decision.
    context: Dict[str, Any]
    # Point in time associated with the decision.
    timestamp: datetime
    # Optional UCXL address linked to the decision; omitted by default.
    ucxl_address: Optional[str] = None
@dataclass
class TeamMember:
    """One peer agent known to this service within the BZZZ network.

    Field order is part of the constructor signature and must not change.
    """

    # Identifier of the agent.
    agent_id: str
    # Role the agent holds.
    role: AgentRole
    # Base URL at which the agent can be reached.
    endpoint: str
    # Capability tags the agent advertises.
    capabilities: List[str]
    # Last known status string; a member is assumed online until told otherwise.
    status: str = "online"
class BzzzIntegrationService:
    """
    Service for integrating WHOOSH with the existing BZZZ distributed system.

    Provides P2P team collaboration, decision publishing, and consensus mechanisms.

    Lifecycle: construct the service, ``await initialize()`` (opens the HTTP
    session, registers with every configured node, discovers peers and starts
    the background loops) and ``await cleanup()`` on shutdown.

    Recognised configuration keys:
        bzzz_endpoints: list of BZZZ node base URLs (default: empty list).
        agent_id: identifier this service registers under.
        role: value of an ``AgentRole`` member.
        whoosh_endpoint: URL advertised to BZZZ nodes during registration
            (default ``http://localhost:8000``).
        decision_sync_interval: seconds between decision syncs (default 15).
        health_check_interval: seconds between health checks (default 60).
    ``discovery_interval`` is present in the default configuration but is not
    read by this class; discovery runs once during ``initialize()``.

    Timestamps stored in ``active_decisions`` are naive ``datetime`` objects
    expressed in UTC. Remote timestamps are normalised to that form when
    synced so they can always be compared with locally created ones.
    """

    # Fraction of the known team members whose approval counts as consensus.
    CONSENSUS_THRESHOLD = 0.6

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Store configuration and create empty state; performs no I/O.

        Args:
            config: Optional configuration mapping. When omitted (or empty)
                the result of ``_default_config()`` is used.

        Raises:
            ValueError: If the configured ``role`` is not an ``AgentRole`` value.
        """
        self.config = config or self._default_config()
        self.bzzz_endpoints = self.config.get("bzzz_endpoints", [])
        self.agent_id = self.config.get("agent_id", "whoosh-coordinator")
        self.role = AgentRole(self.config.get("role", "ai_coordinator"))
        self.session: Optional[aiohttp.ClientSession] = None
        self.team_members: Dict[str, TeamMember] = {}
        self.active_decisions: Dict[str, BzzzDecision] = {}
        # Strong references to the background loops: the event loop only keeps
        # weak references to tasks, so unreferenced tasks may be collected.
        self._background_tasks: List[asyncio.Task] = []

    def _default_config(self) -> Dict[str, Any]:
        """Default BZZZ integration configuration"""
        return {
            "bzzz_endpoints": [
                # Direct BZZZ connections disabled - WHOOSH should use BZZZ API instead
                # "http://192.168.1.27:8080",  # walnut
                # "http://192.168.1.72:8080",  # acacia
                # "http://192.168.1.113:8080",  # ironwood
            ],
            "agent_id": "whoosh-coordinator",
            "role": "ai_coordinator",
            "discovery_interval": 30,
            "health_check_interval": 60,
            "decision_sync_interval": 15,
        }

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a naive datetime expressed in UTC."""
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_timestamp(raw: str) -> datetime:
        """Parse an ISO-8601 timestamp into a naive datetime expressed in UTC.

        A trailing ``Z`` is accepted (``datetime.fromisoformat`` rejects it
        before Python 3.11). Offset-aware values are converted to UTC before
        the offset is dropped; naive values are returned unchanged.

        Raises:
            ValueError: If ``raw`` is not a parseable ISO-8601 string.
        """
        text = raw.strip()
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"
        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    async def _release_resources(self) -> None:
        """Cancel the background loops and close the HTTP session, if any."""
        tasks, self._background_tasks = self._background_tasks, []
        for task in tasks:
            task.cancel()
        if tasks:
            # Wait for the cancellations to land so no loop touches the
            # session after it has been closed.
            await asyncio.gather(*tasks, return_exceptions=True)
        if self.session is not None and not self.session.closed:
            await self.session.close()

    async def initialize(self) -> bool:
        """Initialize BZZZ integration service.

        Opens the HTTP session, registers with the configured nodes,
        discovers team members and starts the background loops. Calling it
        again first releases the previous session and loops.

        Returns:
            True on success, False if initialization raised; in that case
            anything already acquired is released again.
        """
        try:
            logger.info("🔌 Initializing BZZZ Integration Service")
            # Avoid leaking a session or duplicating loops on re-initialization.
            await self._release_resources()
            # Create HTTP session
            self.session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=30)
            )
            # Register with BZZZ network
            await self._register_with_network()
            # Discover team members
            await self._discover_team_members()
            # Start background tasks (references kept so they can be cancelled)
            self._background_tasks = [
                asyncio.create_task(self._decision_sync_loop()),
                asyncio.create_task(self._health_check_loop()),
            ]
            logger.info(f"✅ BZZZ Integration initialized with {len(self.team_members)} team members")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to initialize BZZZ integration: {e}")
            try:
                await self._release_resources()
            except Exception as cleanup_error:
                logger.error(f"❌ Error during cleanup: {cleanup_error}")
            return False

    async def _register_with_network(self) -> None:
        """Register WHOOSH coordinator with BZZZ network.

        Posts the registration payload to every configured node. Failures are
        logged per node and never raised.
        """
        registration_data = {
            "agent_id": self.agent_id,
            "role": self.role.value,
            "capabilities": [
                "ai_coordination",
                "workflow_orchestration",
                "task_distribution",
                "performance_monitoring",
            ],
            # WHOOSH backend URL advertised to the node; configurable because
            # "localhost" is only meaningful to nodes on the same host.
            "endpoint": self.config.get("whoosh_endpoint", "http://localhost:8000"),
            "metadata": {
                "system": "WHOOSH",
                "version": "6.2",
                "specialization": "AI Orchestration Platform",
            },
        }
        for endpoint in self.bzzz_endpoints:
            try:
                async with self.session.post(
                    f"{endpoint}/api/agent/register",
                    json=registration_data,
                ) as response:
                    if response.status == 200:
                        result = await response.json()
                        logger.info(f"✅ Registered with BZZZ node: {endpoint}")
                        logger.debug(f"Registration result: {result}")
                    else:
                        logger.warning(f"⚠️ Failed to register with {endpoint}: {response.status}")
            except Exception as e:
                logger.warning(f"⚠️ Could not connect to BZZZ endpoint {endpoint}: {e}")

    async def _discover_team_members(self) -> None:
        """Discover active team members in BZZZ network.

        Queries ``/api/agents`` on every configured node and replaces
        ``self.team_members`` with the agents found (excluding this service).
        A malformed agent entry is skipped without discarding the remaining
        entries reported by the same node.
        """
        discovered_members = {}
        for endpoint in self.bzzz_endpoints:
            try:
                async with self.session.get(f"{endpoint}/api/agents") as response:
                    if response.status != 200:
                        continue
                    agents_data = await response.json()
                for agent_data in agents_data.get("agents", []):
                    try:
                        agent_id = agent_data["agent_id"]
                        if agent_id == self.agent_id:  # Don't include ourselves
                            continue
                        member = TeamMember(
                            agent_id=agent_id,
                            role=AgentRole(agent_data.get("role", "backend_developer")),
                            endpoint=agent_data.get("endpoint", endpoint),
                            capabilities=agent_data.get("capabilities", []),
                            status=agent_data.get("status", "online"),
                        )
                    except (KeyError, TypeError, ValueError, AttributeError) as e:
                        # Missing agent_id, unknown role, or a non-mapping entry.
                        logger.warning(f"⚠️ Skipping malformed agent entry from {endpoint}: {e}")
                        continue
                    discovered_members[member.agent_id] = member
            except Exception as e:
                logger.warning(f"⚠️ Failed to discover members from {endpoint}: {e}")
        self.team_members = discovered_members
        logger.info(f"🔍 Discovered {len(self.team_members)} team members")
        for member in self.team_members.values():
            logger.debug(f"  - {member.agent_id} ({member.role.value}) @ {member.endpoint}")

    async def publish_decision(
        self,
        title: str,
        description: str,
        context: Dict[str, Any],
        ucxl_address: Optional[str] = None
    ) -> Optional[str]:
        """
        Publish a decision to the BZZZ network for team consensus.

        Nodes are tried in configured order; the first one answering with
        HTTP 201 wins and the decision is also recorded in
        ``self.active_decisions``.

        Returns:
            The decision ID reported by the node, or None if no node accepted
            the decision (including when no nodes are configured).
        """
        try:
            # One timestamp for both the wire payload and the local record.
            published_at = self._utc_now()
            decision_data = {
                "title": title,
                "description": description,
                "author_role": self.role.value,
                "context": context,
                "ucxl_address": ucxl_address,
                "timestamp": published_at.isoformat(),
            }
            # Try to publish to available BZZZ nodes
            for endpoint in self.bzzz_endpoints:
                try:
                    async with self.session.post(
                        f"{endpoint}/api/decisions",
                        json=decision_data,
                    ) as response:
                        if response.status != 201:
                            logger.warning(f"⚠️ BZZZ node {endpoint} did not accept decision: {response.status}")
                            continue
                        result = await response.json()
                    decision_id = result.get("decision_id")
                    # Store locally
                    self.active_decisions[decision_id] = BzzzDecision(
                        id=decision_id,
                        title=title,
                        description=description,
                        author_role=self.role.value,
                        context=context,
                        timestamp=published_at,
                        ucxl_address=ucxl_address,
                    )
                    logger.info(f"📝 Published decision: {title} (ID: {decision_id})")
                    return decision_id
                except Exception as e:
                    logger.warning(f"⚠️ Failed to publish to {endpoint}: {e}")
                    continue
            logger.error("❌ Failed to publish decision to any BZZZ node")
            return None
        except Exception as e:
            logger.error(f"❌ Error publishing decision: {e}")
            return None

    def _summarize_consensus(
        self,
        decision_id: str,
        consensus_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Aggregate per-node consensus payloads into a single summary.

        Args:
            decision_id: Decision the payloads belong to.
            consensus_data: Mapping of node endpoint to that node's consensus
                payload; votes are read from each payload's ``"votes"`` list.

        Returns:
            Summary with vote totals, approval rate, the consensus flag and
            the raw per-node payloads under ``"details"``.
        """
        total_votes = 0
        approvals = 0
        for node_consensus in consensus_data.values():
            votes = node_consensus.get("votes", [])
            total_votes += len(votes)
            approvals += sum(1 for vote in votes if vote.get("approval", False))
        required = len(self.team_members) * self.CONSENSUS_THRESHOLD
        return {
            "decision_id": decision_id,
            "total_votes": total_votes,
            "approvals": approvals,
            "approval_rate": approvals / total_votes if total_votes > 0 else 0,
            # At least one approval is required: with no known team members
            # the threshold is 0 and would otherwise be met by zero votes.
            "consensus_reached": approvals > 0 and approvals >= required,
            "details": consensus_data,
        }

    async def get_team_consensus(self, decision_id: str) -> Optional[Dict[str, Any]]:
        """Get consensus status for a decision from team members.

        Returns:
            The aggregated summary, or None if no node returned consensus
            data or an unexpected error occurred.
        """
        try:
            consensus_data = {}
            for endpoint in self.bzzz_endpoints:
                try:
                    async with self.session.get(
                        f"{endpoint}/api/decisions/{decision_id}/consensus"
                    ) as response:
                        if response.status == 200:
                            consensus_data[endpoint] = await response.json()
                except Exception as e:
                    logger.warning(f"⚠️ Failed to get consensus from {endpoint}: {e}")
            if not consensus_data:
                return None
            # Aggregate consensus across nodes
            return self._summarize_consensus(decision_id, consensus_data)
        except Exception as e:
            logger.error(f"❌ Error getting team consensus: {e}")
            return None

    async def coordinate_task_assignment(
        self,
        task_description: str,
        required_capabilities: List[str],
        priority: str = "medium"
    ) -> Optional[Dict[str, Any]]:
        """
        Coordinate task assignment across team members based on capabilities and availability.

        Only members whose status is ``"online"`` and who share at least one
        required capability are considered. The best-scoring member is
        assigned and the assignment is published as a decision.

        Returns:
            Dict with ``decision_id`` (None if publishing failed),
            ``assigned_to``, ``assignment_score`` and ``alternatives``; or
            None when no suitable member exists or an error occurred.
        """
        try:
            # De-duplicate so repeated capabilities do not deflate the score.
            required = set(required_capabilities)
            # Find suitable team members
            suitable_members = []
            for member in self.team_members.values():
                if member.status != "online":
                    continue
                capability_match = len(required & set(member.capabilities))
                if capability_match == 0:
                    continue
                suitable_members.append({
                    "member": member,
                    "capability_score": capability_match / len(required),
                    # Only online members reach this point.
                    "availability_score": 1.0,
                })
            # Sort by combined score
            suitable_members.sort(
                key=lambda x: x["capability_score"] + x["availability_score"],
                reverse=True,
            )
            if not suitable_members:
                logger.warning("⚠️ No suitable team members found for task")
                return None
            # Create coordination decision
            best = suitable_members[0]
            best_member = best["member"]
            decision_context = {
                "task_description": task_description,
                "required_capabilities": required_capabilities,
                "assigned_to": best_member.agent_id,
                "assignment_reason": f"Best capability match ({best['capability_score']:.2f})",
                "priority": priority,
                "alternatives": [
                    {
                        "agent_id": sm["member"].agent_id,
                        "score": sm["capability_score"] + sm["availability_score"],
                    }
                    for sm in suitable_members[1:3]  # Up to two runner-ups
                ],
            }
            decision_id = await self.publish_decision(
                title=f"Task Assignment: {task_description[:50]}{'...' if len(task_description) > 50 else ''}",
                description=f"Assigning task to {best_member.agent_id} based on capabilities and availability",
                context=decision_context,
            )
            return {
                "decision_id": decision_id,
                "assigned_to": best_member.agent_id,
                "assignment_score": best["capability_score"] + best["availability_score"],
                "alternatives": decision_context["alternatives"],
            }
        except Exception as e:
            logger.error(f"❌ Error coordinating task assignment: {e}")
            return None

    async def _decision_sync_loop(self) -> None:
        """Background task to sync decisions from BZZZ network.

        Runs until cancelled; errors are logged and followed by a longer pause.
        """
        while True:
            try:
                await self._sync_recent_decisions()
                await asyncio.sleep(self.config.get("decision_sync_interval", 15))
            except asyncio.CancelledError:
                # Never swallow cancellation, or cleanup() would wait forever.
                raise
            except Exception as e:
                logger.error(f"❌ Error in decision sync loop: {e}")
                await asyncio.sleep(30)  # Wait longer on error

    async def _sync_recent_decisions(self) -> None:
        """Sync recent decisions from BZZZ network.

        Fetches decisions from the last hour from every configured node and
        records those not yet present in ``self.active_decisions``. A
        malformed decision is skipped without discarding the others.
        """
        # Epoch seconds one hour ago. Taken from an aware UTC datetime:
        # calling .timestamp() on a naive UTC value would be interpreted as
        # local time and shift the window by the host's UTC offset.
        since = datetime.now(timezone.utc).timestamp() - 3600
        for endpoint in self.bzzz_endpoints:
            try:
                async with self.session.get(
                    f"{endpoint}/api/decisions",
                    params={"since": since},
                ) as response:
                    if response.status != 200:
                        continue
                    decisions_data = await response.json()
                for decision_data in decisions_data.get("decisions", []):
                    try:
                        decision_id = decision_data["id"]
                        if decision_id in self.active_decisions:
                            continue
                        decision = BzzzDecision(
                            id=decision_id,
                            title=decision_data["title"],
                            description=decision_data["description"],
                            author_role=decision_data["author_role"],
                            context=decision_data.get("context", {}),
                            timestamp=self._parse_timestamp(decision_data["timestamp"]),
                            ucxl_address=decision_data.get("ucxl_address"),
                        )
                    except (KeyError, TypeError, ValueError, AttributeError) as e:
                        logger.warning(f"⚠️ Skipping malformed decision from {endpoint}: {e}")
                        continue
                    self.active_decisions[decision_id] = decision
                    logger.debug(f"📥 Synced decision: {decision.title}")
            except Exception as e:
                logger.warning(f"⚠️ Failed to sync from {endpoint}: {e}")

    async def _health_check_loop(self) -> None:
        """Background task to check health of team members.

        Runs until cancelled; errors are logged and followed by a longer pause.
        """
        while True:
            try:
                await self._check_team_health()
                await asyncio.sleep(self.config.get("health_check_interval", 60))
            except asyncio.CancelledError:
                # Never swallow cancellation, or cleanup() would wait forever.
                raise
            except Exception as e:
                logger.error(f"❌ Error in health check loop: {e}")
                await asyncio.sleep(60)  # Wait longer on error

    async def _check_team_health(self) -> None:
        """Check health status of all team members.

        Each member's ``status`` is updated from its ``/api/agent/status``
        endpoint; any non-200 answer or request error marks it ``"offline"``.
        """
        # Snapshot the members: the mapping may be replaced while we await.
        for member in list(self.team_members.values()):
            try:
                async with self.session.get(
                    f"{member.endpoint}/api/agent/status",
                    timeout=aiohttp.ClientTimeout(total=10),
                ) as response:
                    if response.status == 200:
                        status_data = await response.json()
                        member.status = status_data.get("status", "online")
                    else:
                        member.status = "offline"
            except Exception as e:
                logger.debug(f"Health check failed for {member.agent_id}: {e}")
                member.status = "offline"

    async def get_team_status(self) -> Dict[str, Any]:
        """Get current team status and statistics.

        Returns:
            Member counts, per-role distribution, the number of tracked
            decisions, the ten most recent decisions and ``network_health``
            (share of members that are online, 0 when no members are known).
            On an unexpected error an all-zero/empty summary is returned.
        """
        try:
            total_members = len(self.team_members)
            online_members = sum(
                1 for m in self.team_members.values() if m.status == "online"
            )
            role_distribution = {}
            for member in self.team_members.values():
                role = member.role.value
                role_distribution[role] = role_distribution.get(role, 0) + 1
            newest_first = sorted(
                self.active_decisions.values(),
                key=lambda d: d.timestamp,
                reverse=True,
            )
            recent_decisions = [
                {
                    "id": decision.id,
                    "title": decision.title,
                    "author_role": decision.author_role,
                    "timestamp": decision.timestamp.isoformat(),
                }
                for decision in newest_first[:10]  # Last 10 decisions
            ]
            return {
                "total_members": total_members,
                "online_members": online_members,
                "offline_members": total_members - online_members,
                "role_distribution": role_distribution,
                "active_decisions": len(self.active_decisions),
                "recent_decisions": recent_decisions,
                "network_health": online_members / total_members if total_members else 0,
            }
        except Exception as e:
            logger.error(f"❌ Error getting team status: {e}")
            return {
                "total_members": 0,
                "online_members": 0,
                "offline_members": 0,
                "role_distribution": {},
                "active_decisions": 0,
                "recent_decisions": [],
                "network_health": 0,
            }

    async def cleanup(self) -> None:
        """Cleanup BZZZ integration resources.

        Cancels the background loops and closes the HTTP session. Errors are
        logged, never raised.
        """
        try:
            await self._release_resources()
            logger.info("🧹 BZZZ Integration Service cleanup completed")
        except Exception as e:
            logger.error(f"❌ Error during cleanup: {e}")
# Global service instance
# Built with the default configuration at import time. Construction only
# sets attributes; no HTTP session is opened and no network request is made
# until ``await bzzz_service.initialize()`` is called.
bzzz_service = BzzzIntegrationService()