- Migrated from HIVE branding to WHOOSH across all components
- Enhanced backend API with new services: AI models, BZZZ integration, templates, members
- Added comprehensive testing suite with security, performance, and integration tests
- Improved frontend with new components for project setup, AI models, and team management
- Updated MCP server implementation with WHOOSH-specific tools and resources
- Enhanced deployment configurations with production-ready Docker setups
- Added comprehensive documentation and setup guides
- Implemented age encryption service and UCXL integration

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
471 lines
19 KiB
Python
471 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
BZZZ Integration Service for WHOOSH
|
|
Connects WHOOSH to the existing BZZZ distributed system for P2P team collaboration
|
|
"""
|
|
|
|
import asyncio
import json
import logging
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

import aiohttp
|
|
|
|
# Module-level logger named after this module; every log call in this file goes through it.
logger = logging.getLogger(__name__)
|
|
|
|
class AgentRole(Enum):
    """Roles an agent can hold in the BZZZ system.

    Each member's value is the lowercase string form of its name, so a role
    can be looked up from its string value with ``AgentRole(value)``.
    """

    # Engineering roles
    SENIOR_ARCHITECT = "senior_architect"
    FRONTEND_DEVELOPER = "frontend_developer"
    BACKEND_DEVELOPER = "backend_developer"
    DEVOPS_ENGINEER = "devops_engineer"

    # Coordination roles
    PROJECT_MANAGER = "project_manager"
    AI_COORDINATOR = "ai_coordinator"
|
|
|
|
@dataclass
class BzzzDecision:
    """A decision record exchanged with the BZZZ network."""

    # Identifier of the decision.
    id: str
    # Short headline and longer explanation of the decision.
    title: str
    description: str
    # String value of the role that authored the decision.
    author_role: str
    # Free-form supporting data attached to the decision.
    context: Dict[str, Any]
    # When the decision was made.
    timestamp: datetime
    # Optional UCXL address associated with the decision.
    ucxl_address: Optional[str] = None
|
|
|
|
@dataclass
class TeamMember:
    """An agent known to this service as part of the BZZZ network."""

    # Identifier of the agent.
    agent_id: str
    # Role the agent holds.
    role: AgentRole
    # Base URL at which the agent is contacted.
    endpoint: str
    # Capability names the agent advertises.
    capabilities: List[str]
    # Last known status string; defaults to "online".
    status: str = "online"
|
|
|
|
class BzzzIntegrationService:
    """
    Service for integrating WHOOSH with the existing BZZZ distributed system.
    Provides P2P team collaboration, decision publishing, and consensus mechanisms.

    Typical lifecycle: construct the service, ``await initialize()`` to open the
    HTTP session and start the background loops, use the public coroutines, then
    ``await cleanup()`` to stop the loops and close the session.

    Timestamps held by this service are naive ``datetime`` values expressed in
    UTC, matching the ISO strings it sends to BZZZ nodes.
    """

    # Share of known team members that must approve before a decision counts
    # as having reached consensus (60% threshold).
    CONSENSUS_THRESHOLD = 0.6

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create the service.

        Args:
            config: Optional settings. Any key that is missing falls back to
                the value from ``_default_config()``, so a partial config
                still carries the interval settings the background loops read.
        """
        self.config = {**self._default_config(), **(config or {})}
        self.bzzz_endpoints = self.config.get("bzzz_endpoints", [])
        self.agent_id = self.config.get("agent_id", "whoosh-coordinator")
        self.role = AgentRole(self.config.get("role", "ai_coordinator"))
        self.session: Optional[aiohttp.ClientSession] = None
        self.team_members: Dict[str, TeamMember] = {}
        self.active_decisions: Dict[str, BzzzDecision] = {}
        # Strong references to the background loops: the event loop only keeps
        # weak references to tasks, and cleanup() needs them to cancel.
        self._background_tasks: List[asyncio.Task] = []

    def _default_config(self) -> Dict[str, Any]:
        """Default BZZZ integration configuration"""
        return {
            "bzzz_endpoints": [
                # Direct BZZZ connections disabled - WHOOSH should use BZZZ API instead
                # "http://192.168.1.27:8080",  # walnut
                # "http://192.168.1.72:8080",  # acacia
                # "http://192.168.1.113:8080",  # ironwood
            ],
            "agent_id": "whoosh-coordinator",
            "role": "ai_coordinator",
            "discovery_interval": 30,
            "health_check_interval": 60,
            "decision_sync_interval": 15,
        }

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a naive datetime expressed in UTC."""
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO-8601 string into a naive UTC datetime.

        Values carrying an offset are converted to UTC and stripped of their
        tzinfo so they can be compared with locally created timestamps;
        values without an offset are returned as parsed.

        Raises:
            ValueError: If ``value`` is not a valid ISO-8601 string.
            TypeError: If ``value`` is not a string.
        """
        # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11.
        if isinstance(value, str) and value.endswith("Z"):
            value = value[:-1] + "+00:00"
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    async def initialize(self) -> bool:
        """Initialize BZZZ integration service

        Opens the HTTP session, registers with every configured node,
        discovers team members and starts the decision-sync and health-check
        background loops.

        Returns:
            True on success; False if any step raised (the error is logged and
            whatever was already started is released again).
        """
        try:
            logger.info("🔌 Initializing BZZZ Integration Service")

            # Release anything left over from an earlier initialize() so a
            # second call does not leak a session or duplicate the loops.
            await self._shutdown()

            # Create HTTP session
            self.session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=30)
            )

            # Register with BZZZ network
            await self._register_with_network()

            # Discover team members
            await self._discover_team_members()

            # Start background tasks, keeping references so they are not
            # garbage-collected mid-flight and can be cancelled later.
            self._background_tasks = [
                asyncio.create_task(self._decision_sync_loop()),
                asyncio.create_task(self._health_check_loop()),
            ]

            logger.info(f"✅ BZZZ Integration initialized with {len(self.team_members)} team members")
            return True

        except Exception as e:
            logger.error(f"❌ Failed to initialize BZZZ integration: {e}")
            # cleanup() logs and swallows its own errors, so this cannot raise.
            await self.cleanup()
            return False

    async def _register_with_network(self) -> None:
        """Register WHOOSH coordinator with BZZZ network

        Posts the registration payload to every configured node. Failures are
        logged per node and never raised.
        """
        registration_data = {
            "agent_id": self.agent_id,
            "role": self.role.value,
            "capabilities": [
                "ai_coordination",
                "workflow_orchestration",
                "task_distribution",
                "performance_monitoring",
            ],
            "endpoint": "http://localhost:8000",  # WHOOSH backend
            "metadata": {
                "system": "WHOOSH",
                "version": "6.2",
                "specialization": "AI Orchestration Platform",
            },
        }

        for endpoint in self.bzzz_endpoints:
            try:
                async with self.session.post(
                    f"{endpoint}/api/agent/register",
                    json=registration_data,
                ) as response:
                    if response.status == 200:
                        result = await response.json()
                        logger.info(f"✅ Registered with BZZZ node: {endpoint}")
                        logger.debug(f"Registration result: {result}")
                    else:
                        logger.warning(f"⚠️ Failed to register with {endpoint}: {response.status}")

            except Exception as e:
                logger.warning(f"⚠️ Could not connect to BZZZ endpoint {endpoint}: {e}")

    async def _discover_team_members(self) -> None:
        """Discover active team members in BZZZ network

        Queries every configured node for its agent list and replaces
        ``self.team_members`` with the agents found (this service itself is
        excluded). A malformed entry is skipped on its own so it cannot hide
        the remaining agents reported by the same node.
        """
        discovered_members = {}

        for endpoint in self.bzzz_endpoints:
            try:
                async with self.session.get(f"{endpoint}/api/agents") as response:
                    if response.status != 200:
                        continue
                    agents_data = await response.json()

                for agent_data in agents_data.get("agents", []):
                    try:
                        agent_id = agent_data["agent_id"]
                        if agent_id == self.agent_id:  # Don't include ourselves
                            continue
                        discovered_members[agent_id] = TeamMember(
                            agent_id=agent_id,
                            role=AgentRole(agent_data.get("role", "backend_developer")),
                            endpoint=agent_data.get("endpoint", endpoint),
                            capabilities=agent_data.get("capabilities", []),
                            status=agent_data.get("status", "online"),
                        )
                    except (KeyError, TypeError, ValueError) as e:
                        # Missing agent_id, non-mapping entry or unknown role.
                        logger.warning(f"⚠️ Skipping malformed agent entry from {endpoint}: {e}")

            except Exception as e:
                logger.warning(f"⚠️ Failed to discover members from {endpoint}: {e}")

        self.team_members = discovered_members
        logger.info(f"🔍 Discovered {len(self.team_members)} team members")

        for member in self.team_members.values():
            logger.debug(f"  - {member.agent_id} ({member.role.value}) @ {member.endpoint}")

    async def publish_decision(
        self,
        title: str,
        description: str,
        context: Dict[str, Any],
        ucxl_address: Optional[str] = None
    ) -> Optional[str]:
        """
        Publish a decision to the BZZZ network for team consensus
        Returns decision ID if successful

        Nodes are tried in configured order and the first one answering with
        HTTP 201 wins. The decision is then also stored in
        ``self.active_decisions``.

        Args:
            title: Short headline of the decision.
            description: Longer explanation.
            context: Supporting data sent along with the decision.
            ucxl_address: Optional UCXL address to attach.

        Returns:
            The decision ID reported by the node, or None if no node accepted
            the decision or the accepting node did not report an ID.
        """
        try:
            # One timestamp for both the wire payload and the local record.
            published_at = self._utc_now()
            decision_data = {
                "title": title,
                "description": description,
                "author_role": self.role.value,
                "context": context,
                "ucxl_address": ucxl_address,
                "timestamp": published_at.isoformat(),
            }

            # Try to publish to available BZZZ nodes
            for endpoint in self.bzzz_endpoints:
                try:
                    async with self.session.post(
                        f"{endpoint}/api/decisions",
                        json=decision_data,
                    ) as response:
                        if response.status != 201:
                            logger.warning(f"⚠️ Failed to publish to {endpoint}: {response.status}")
                            continue
                        result = await response.json()

                    decision_id = result.get("decision_id")
                    if decision_id is None:
                        # The node accepted the decision, so retrying on
                        # another node could duplicate it; without an ID there
                        # is nothing to track locally either.
                        logger.warning(f"⚠️ {endpoint} accepted decision but returned no decision_id")
                        return None

                    # Store locally
                    self.active_decisions[decision_id] = BzzzDecision(
                        id=decision_id,
                        title=title,
                        description=description,
                        author_role=self.role.value,
                        context=context,
                        timestamp=published_at,
                        ucxl_address=ucxl_address,
                    )

                    logger.info(f"📝 Published decision: {title} (ID: {decision_id})")
                    return decision_id

                except Exception as e:
                    logger.warning(f"⚠️ Failed to publish to {endpoint}: {e}")
                    continue

            logger.error("❌ Failed to publish decision to any BZZZ node")
            return None

        except Exception as e:
            logger.error(f"❌ Error publishing decision: {e}")
            return None

    async def get_team_consensus(self, decision_id: str) -> Optional[Dict[str, Any]]:
        """Get consensus status for a decision from team members

        Asks every configured node for its view of the decision and
        aggregates the votes.

        Returns:
            A summary dict (``decision_id``, ``total_votes``, ``approvals``,
            ``approval_rate``, ``consensus_reached``, ``details``), or None if
            no node returned consensus data or an error occurred.
        """
        try:
            consensus_data = {}

            for endpoint in self.bzzz_endpoints:
                try:
                    async with self.session.get(
                        f"{endpoint}/api/decisions/{decision_id}/consensus"
                    ) as response:
                        if response.status == 200:
                            consensus_data[endpoint] = await response.json()

                except Exception as e:
                    logger.warning(f"⚠️ Failed to get consensus from {endpoint}: {e}")

            if not consensus_data:
                return None

            return self._summarize_consensus(
                decision_id, consensus_data, len(self.team_members)
            )

        except Exception as e:
            logger.error(f"❌ Error getting team consensus: {e}")
            return None

    @classmethod
    def _summarize_consensus(
        cls,
        decision_id: str,
        consensus_data: Dict[str, Any],
        team_size: int,
    ) -> Dict[str, Any]:
        """Aggregate per-node consensus payloads into a single summary.

        Args:
            decision_id: ID of the decision being summarized.
            consensus_data: Mapping of node endpoint to that node's payload;
                each payload's ``"votes"`` list is counted.
            team_size: Number of known team members, used as the denominator
                for the consensus threshold.

        Returns:
            The summary dict returned by ``get_team_consensus``.
        """
        # Aggregate consensus across nodes.
        # NOTE(review): votes are summed per node; if nodes replicate each
        # other's votes the same vote is counted more than once - confirm
        # against the BZZZ consensus API.
        total_votes = 0
        approvals = 0
        for node_consensus in consensus_data.values():
            votes = node_consensus.get("votes", [])
            total_votes += len(votes)
            approvals += sum(1 for vote in votes if vote.get("approval", False))

        return {
            "decision_id": decision_id,
            "total_votes": total_votes,
            "approvals": approvals,
            "approval_rate": approvals / total_votes if total_votes > 0 else 0,
            # With no known team there is no quorum to compare against, so
            # consensus is never reported (0 >= 0 would otherwise be True).
            "consensus_reached": (
                team_size > 0 and approvals >= team_size * cls.CONSENSUS_THRESHOLD
            ),
            "details": consensus_data,
        }

    async def coordinate_task_assignment(
        self,
        task_description: str,
        required_capabilities: List[str],
        priority: str = "medium"
    ) -> Optional[Dict[str, Any]]:
        """
        Coordinate task assignment across team members based on capabilities and availability

        Picks the online member covering the largest share of the required
        capabilities and publishes the assignment as a decision.

        Args:
            task_description: Human-readable description of the task.
            required_capabilities: Capability names the task needs.
            priority: Priority label recorded in the decision context.

        Returns:
            A dict with ``decision_id`` (None if publishing failed),
            ``assigned_to``, ``assignment_score`` and ``alternatives``; or
            None if no online member has any required capability or an error
            occurred.
        """
        try:
            # Distinct requirements, built once: duplicates in the request
            # must not dilute a member's score.
            required = set(required_capabilities)

            # Find suitable team members
            suitable_members = []
            for member in self.team_members.values():
                if member.status != "online":
                    continue
                capability_match = len(required & set(member.capabilities))
                if capability_match > 0:
                    suitable_members.append({
                        "member": member,
                        "capability_score": capability_match / len(required),
                        # Only online members reach this point.
                        "availability_score": 1.0,
                    })

            # Sort by combined score
            suitable_members.sort(
                key=lambda x: x["capability_score"] + x["availability_score"],
                reverse=True,
            )

            if not suitable_members:
                logger.warning("⚠️ No suitable team members found for task")
                return None

            # Create coordination decision
            best = suitable_members[0]
            best_member = best["member"]
            decision_context = {
                "task_description": task_description,
                "required_capabilities": required_capabilities,
                "assigned_to": best_member.agent_id,
                "assignment_reason": f"Best capability match ({best['capability_score']:.2f})",
                "priority": priority,
                "alternatives": [
                    {
                        "agent_id": sm["member"].agent_id,
                        "score": sm["capability_score"] + sm["availability_score"],
                    }
                    for sm in suitable_members[1:3]  # Next two best candidates
                ],
            }

            decision_id = await self.publish_decision(
                title=f"Task Assignment: {task_description[:50]}{'...' if len(task_description) > 50 else ''}",
                description=f"Assigning task to {best_member.agent_id} based on capabilities and availability",
                context=decision_context,
            )

            return {
                "decision_id": decision_id,
                "assigned_to": best_member.agent_id,
                "assignment_score": best["capability_score"] + best["availability_score"],
                "alternatives": decision_context["alternatives"],
            }

        except Exception as e:
            logger.error(f"❌ Error coordinating task assignment: {e}")
            return None

    async def _decision_sync_loop(self) -> None:
        """Background task to sync decisions from BZZZ network

        Runs until cancelled. asyncio.CancelledError derives from
        BaseException (Python 3.8+), so the handler below does not swallow
        the cancellation issued by cleanup().
        """
        while True:
            try:
                await self._sync_recent_decisions()
                await asyncio.sleep(self.config["decision_sync_interval"])
            except Exception as e:
                logger.error(f"❌ Error in decision sync loop: {e}")
                await asyncio.sleep(30)  # Wait longer on error

    async def _sync_recent_decisions(self) -> None:
        """Sync recent decisions from BZZZ network

        Fetches the decisions of the last hour from every configured node and
        stores those not yet present in ``self.active_decisions``.
        """
        try:
            for endpoint in self.bzzz_endpoints:
                try:
                    # Get recent decisions (last hour). The epoch value comes
                    # from an aware datetime: calling timestamp() on a naive
                    # UTC value would interpret it as local time and shift
                    # the window by the host's UTC offset.
                    params = {
                        "since": datetime.now(timezone.utc).timestamp() - 3600
                    }

                    async with self.session.get(
                        f"{endpoint}/api/decisions",
                        params=params,
                    ) as response:
                        if response.status != 200:
                            continue
                        decisions_data = await response.json()

                    for decision_data in decisions_data.get("decisions", []):
                        self._store_synced_decision(decision_data, endpoint)

                except Exception as e:
                    logger.warning(f"⚠️ Failed to sync from {endpoint}: {e}")

        except Exception as e:
            logger.error(f"❌ Error syncing decisions: {e}")

    def _store_synced_decision(self, decision_data: Dict[str, Any], endpoint: str) -> None:
        """Store one decision payload received from ``endpoint`` if it is new.

        A malformed payload is logged and skipped so it cannot block the
        remaining decisions reported by the same node.
        """
        try:
            decision_id = decision_data["id"]
            if decision_id in self.active_decisions:
                return
            decision = BzzzDecision(
                id=decision_id,
                title=decision_data["title"],
                description=decision_data["description"],
                author_role=decision_data["author_role"],
                context=decision_data.get("context", {}),
                timestamp=self._parse_timestamp(decision_data["timestamp"]),
                ucxl_address=decision_data.get("ucxl_address"),
            )
        except (KeyError, TypeError, ValueError) as e:
            # Missing field, non-mapping entry or unparsable timestamp.
            logger.warning(f"⚠️ Skipping malformed decision from {endpoint}: {e}")
            return

        self.active_decisions[decision_id] = decision
        logger.debug(f"📥 Synced decision: {decision.title}")

    async def _health_check_loop(self) -> None:
        """Background task to check health of team members

        Runs until cancelled. asyncio.CancelledError derives from
        BaseException (Python 3.8+), so the handler below does not swallow
        the cancellation issued by cleanup().
        """
        while True:
            try:
                await self._check_team_health()
                await asyncio.sleep(self.config["health_check_interval"])
            except Exception as e:
                logger.error(f"❌ Error in health check loop: {e}")
                await asyncio.sleep(60)  # Wait longer on error

    async def _check_team_health(self) -> None:
        """Check health status of all team members

        Updates each member's ``status`` in place: the status reported by the
        member on HTTP 200, otherwise ``"offline"``.
        """
        try:
            # Iterate over a snapshot: the awaits below yield control, and the
            # member mapping may change while a request is in flight.
            for member in list(self.team_members.values()):
                try:
                    async with self.session.get(
                        f"{member.endpoint}/api/agent/status",
                        timeout=aiohttp.ClientTimeout(total=10),
                    ) as response:
                        if response.status == 200:
                            status_data = await response.json()
                            member.status = status_data.get("status", "online")
                        else:
                            member.status = "offline"

                except Exception as e:
                    # Best effort: an unreachable member is simply offline.
                    logger.debug(f"Health check failed for {member.agent_id}: {e}")
                    member.status = "offline"

        except Exception as e:
            logger.error(f"❌ Error checking team health: {e}")

    async def get_team_status(self) -> Dict[str, Any]:
        """Get current team status and statistics

        Returns:
            A dict with member counts, role distribution, the number of
            tracked decisions, the ten most recent decisions and
            ``network_health`` (share of members that are online, 0 when no
            members are known). On error the same keys are returned with
            zero/empty values.
        """
        try:
            members = list(self.team_members.values())
            online_members = sum(1 for m in members if m.status == "online")

            role_distribution = {}
            for member in members:
                role = member.role.value
                role_distribution[role] = role_distribution.get(role, 0) + 1

            newest_first = sorted(
                self.active_decisions.values(),
                key=lambda d: d.timestamp,
                reverse=True,
            )
            recent_decisions = [
                {
                    "id": decision.id,
                    "title": decision.title,
                    "author_role": decision.author_role,
                    "timestamp": decision.timestamp.isoformat(),
                }
                for decision in newest_first[:10]  # Last 10 decisions
            ]

            return {
                "total_members": len(members),
                "online_members": online_members,
                "offline_members": len(members) - online_members,
                "role_distribution": role_distribution,
                "active_decisions": len(self.active_decisions),
                "recent_decisions": recent_decisions,
                "network_health": online_members / len(members) if members else 0,
            }

        except Exception as e:
            logger.error(f"❌ Error getting team status: {e}")
            return {
                "total_members": 0,
                "online_members": 0,
                "offline_members": 0,
                "role_distribution": {},
                "active_decisions": 0,
                "recent_decisions": [],
                "network_health": 0,
            }

    async def _shutdown(self) -> None:
        """Cancel the background loops and close the HTTP session, if any."""
        tasks, self._background_tasks = self._background_tasks, []
        for task in tasks:
            task.cancel()
        if tasks:
            # Wait for the cancellations to land; return_exceptions keeps the
            # resulting CancelledError instances from propagating.
            await asyncio.gather(*tasks, return_exceptions=True)

        if self.session:
            await self.session.close()

    async def cleanup(self) -> None:
        """Cleanup BZZZ integration resources

        Stops the background loops and closes the HTTP session. Errors are
        logged, never raised.
        """
        try:
            await self._shutdown()
            logger.info("🧹 BZZZ Integration Service cleanup completed")
        except Exception as e:
            logger.error(f"❌ Error during cleanup: {e}")
|
|
|
|
# Global service instance, constructed with the default configuration at import time
bzzz_service = BzzzIntegrationService()