- Add FUTURE_DEVELOPMENT.md with comprehensive v2 protocol specification - Add MCP integration design and implementation foundation - Add infrastructure and deployment configurations - Update system architecture for v2 evolution 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
517 lines
21 KiB
Python
517 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
BZZZ MCP Integration Example: Collaborative Code Review
|
|
======================================================
|
|
|
|
This example demonstrates how GPT-4 agents collaborate through the BZZZ MCP
|
|
integration to perform a comprehensive code review.
|
|
|
|
Scenario: A pull request requires review from multiple specialized agents:
|
|
- Architect Agent: Reviews system design and architecture implications
|
|
- Security Agent: Analyzes security vulnerabilities
|
|
- Performance Agent: Evaluates performance impact
|
|
- Documentation Agent: Ensures proper documentation
|
|
|
|
The agents coordinate through BZZZ semantic addressing and threaded conversations.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import sys
|
|
from typing import Dict, List, Any, Optional
|
|
from dataclasses import dataclass
|
|
from mcp import ClientSession, StdioServerParameters
|
|
from mcp.client.stdio import stdio_client
|
|
|
|
# Add the parent directory to the path to import BZZZ modules
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
@dataclass
|
|
class CodeReviewTask:
|
|
"""Represents a code review task"""
|
|
repository: str
|
|
pull_request_number: int
|
|
title: str
|
|
description: str
|
|
files_changed: List[str]
|
|
lines_of_code: int
|
|
complexity_score: float
|
|
security_risk: str # low, medium, high
|
|
|
|
@dataclass
|
|
class AgentRole:
|
|
"""Defines an agent role and its responsibilities"""
|
|
name: str
|
|
specialization: str
|
|
capabilities: List[str]
|
|
system_prompt: str
|
|
|
|
class CollaborativeReviewOrchestrator:
|
|
"""Orchestrates collaborative code review using BZZZ MCP integration"""
|
|
|
|
def __init__(self):
|
|
self.mcp_session: Optional[ClientSession] = None
|
|
self.agents: Dict[str, AgentRole] = {}
|
|
self.active_threads: Dict[str, Dict] = {}
|
|
|
|
async def initialize(self):
|
|
"""Initialize MCP connection to BZZZ server"""
|
|
# Connect to the BZZZ MCP server
|
|
server_params = StdioServerParameters(
|
|
command="node",
|
|
args=["/home/tony/chorus/project-queues/active/BZZZ/mcp-server/dist/index.js"]
|
|
)
|
|
|
|
self.mcp_session = await stdio_client(server_params)
|
|
print("✅ Connected to BZZZ MCP Server")
|
|
|
|
# Define agent roles
|
|
self.define_agent_roles()
|
|
|
|
def define_agent_roles(self):
|
|
"""Define the specialized agent roles for code review"""
|
|
self.agents = {
|
|
"architect": AgentRole(
|
|
name="architect",
|
|
specialization="system_architecture",
|
|
capabilities=["system_design", "architecture_review", "scalability_analysis"],
|
|
system_prompt="""You are a senior software architect reviewing code changes.
|
|
Focus on: architectural consistency, design patterns, system boundaries,
|
|
scalability implications, and integration concerns."""
|
|
),
|
|
"security": AgentRole(
|
|
name="security_expert",
|
|
specialization="security_analysis",
|
|
capabilities=["security_review", "vulnerability_analysis", "threat_modeling"],
|
|
system_prompt="""You are a security expert reviewing code for vulnerabilities.
|
|
Focus on: input validation, authentication, authorization, data protection,
|
|
injection attacks, and secure coding practices."""
|
|
),
|
|
"performance": AgentRole(
|
|
name="performance_expert",
|
|
specialization="performance_optimization",
|
|
capabilities=["performance_analysis", "optimization", "profiling"],
|
|
system_prompt="""You are a performance expert reviewing code efficiency.
|
|
Focus on: algorithmic complexity, memory usage, database queries,
|
|
caching strategies, and performance bottlenecks."""
|
|
),
|
|
"documentation": AgentRole(
|
|
name="documentation_specialist",
|
|
specialization="technical_writing",
|
|
capabilities=["documentation_review", "api_documentation", "code_comments"],
|
|
system_prompt="""You are a documentation specialist ensuring code clarity.
|
|
Focus on: code comments, API documentation, README updates,
|
|
inline documentation, and knowledge transfer."""
|
|
)
|
|
}
|
|
|
|
async def start_collaborative_review(self, task: CodeReviewTask) -> Dict[str, Any]:
|
|
"""Start a collaborative review process for the given task"""
|
|
print(f"🔍 Starting collaborative review for PR #{task.pull_request_number}")
|
|
|
|
# Step 1: Announce agents to BZZZ network
|
|
await self.announce_agents()
|
|
|
|
# Step 2: Create semantic addresses for the review
|
|
review_address = f"bzzz://*:*@{task.repository}:pr{task.pull_request_number}/review"
|
|
|
|
# Step 3: Determine required agent roles based on task characteristics
|
|
required_roles = self.determine_required_roles(task)
|
|
print(f"📋 Required roles: {', '.join(required_roles)}")
|
|
|
|
# Step 4: Create collaborative thread
|
|
thread_id = await self.create_review_thread(task, required_roles)
|
|
print(f"💬 Created review thread: {thread_id}")
|
|
|
|
# Step 5: Coordinate the review process
|
|
review_results = await self.coordinate_review(thread_id, task, required_roles)
|
|
|
|
# Step 6: Generate final review summary
|
|
final_summary = await self.generate_review_summary(thread_id, review_results)
|
|
|
|
print("✅ Collaborative review completed")
|
|
return final_summary
|
|
|
|
async def announce_agents(self):
|
|
"""Announce all agent roles to the BZZZ network"""
|
|
if not self.mcp_session:
|
|
raise RuntimeError("MCP session not initialized")
|
|
|
|
for role_name, role in self.agents.items():
|
|
result = await self.mcp_session.call_tool(
|
|
"bzzz_announce",
|
|
{
|
|
"agent_id": f"review_agent_{role_name}",
|
|
"role": role.name,
|
|
"capabilities": role.capabilities,
|
|
"specialization": role.specialization,
|
|
"max_tasks": 2
|
|
}
|
|
)
|
|
print(f"📡 Announced {role_name} agent: {result.content[0].text}")
|
|
|
|
def determine_required_roles(self, task: CodeReviewTask) -> List[str]:
|
|
"""Determine which agent roles are needed based on task characteristics"""
|
|
required = ["architect"] # Architect always participates
|
|
|
|
# Add security expert for medium/high risk changes
|
|
if task.security_risk in ["medium", "high"]:
|
|
required.append("security")
|
|
|
|
# Add performance expert for large/complex changes
|
|
if task.lines_of_code > 500 or task.complexity_score > 7.0:
|
|
required.append("performance")
|
|
|
|
# Add documentation expert if documentation files changed
|
|
doc_files = [f for f in task.files_changed if f.endswith(('.md', '.rst', '.txt'))]
|
|
if doc_files or task.lines_of_code > 200:
|
|
required.append("documentation")
|
|
|
|
return required
|
|
|
|
async def create_review_thread(self, task: CodeReviewTask, required_roles: List[str]) -> str:
|
|
"""Create a threaded conversation for the review"""
|
|
if not self.mcp_session:
|
|
raise RuntimeError("MCP session not initialized")
|
|
|
|
participants = [f"review_agent_{role}" for role in required_roles]
|
|
|
|
result = await self.mcp_session.call_tool(
|
|
"bzzz_thread",
|
|
{
|
|
"action": "create",
|
|
"topic": f"Code Review: {task.title}",
|
|
"participants": participants
|
|
}
|
|
)
|
|
|
|
response_data = json.loads(result.content[0].text)
|
|
return response_data["result"]["thread_id"]
|
|
|
|
async def coordinate_review(self, thread_id: str, task: CodeReviewTask, required_roles: List[str]) -> Dict[str, Any]:
|
|
"""Coordinate the collaborative review process"""
|
|
review_results = {}
|
|
|
|
# Step 1: Share task context with all agents
|
|
await self.share_task_context(thread_id, task)
|
|
|
|
# Step 2: Each agent performs their specialized review
|
|
for role in required_roles:
|
|
print(f"🔍 {role} agent performing review...")
|
|
agent_review = await self.conduct_role_specific_review(thread_id, role, task)
|
|
review_results[role] = agent_review
|
|
|
|
# Step 3: Facilitate cross-agent discussion
|
|
discussion_results = await self.facilitate_discussion(thread_id, review_results)
|
|
review_results["discussion"] = discussion_results
|
|
|
|
# Step 4: Reach consensus on final recommendations
|
|
consensus = await self.reach_consensus(thread_id, review_results)
|
|
review_results["consensus"] = consensus
|
|
|
|
return review_results
|
|
|
|
async def share_task_context(self, thread_id: str, task: CodeReviewTask):
|
|
"""Share the task context with all thread participants"""
|
|
if not self.mcp_session:
|
|
raise RuntimeError("MCP session not initialized")
|
|
|
|
context_message = {
|
|
"task": {
|
|
"repository": task.repository,
|
|
"pr_number": task.pull_request_number,
|
|
"title": task.title,
|
|
"description": task.description,
|
|
"files_changed": task.files_changed,
|
|
"lines_of_code": task.lines_of_code,
|
|
"complexity_score": task.complexity_score,
|
|
"security_risk": task.security_risk
|
|
},
|
|
"review_guidelines": {
|
|
"focus_areas": ["correctness", "security", "performance", "maintainability"],
|
|
"severity_levels": ["critical", "major", "minor", "suggestion"],
|
|
"collaboration_expected": True
|
|
}
|
|
}
|
|
|
|
target_address = f"bzzz://*:*@{task.repository}:pr{task.pull_request_number}/context"
|
|
|
|
await self.mcp_session.call_tool(
|
|
"bzzz_post",
|
|
{
|
|
"target_address": target_address,
|
|
"message_type": "task_context",
|
|
"content": context_message,
|
|
"thread_id": thread_id,
|
|
"priority": "high"
|
|
}
|
|
)
|
|
|
|
async def conduct_role_specific_review(self, thread_id: str, role: str, task: CodeReviewTask) -> Dict[str, Any]:
|
|
"""Simulate a role-specific review (in real implementation, this would call GPT-4)"""
|
|
print(f" Analyzing {len(task.files_changed)} files for {role} concerns...")
|
|
|
|
# Simulate different review outcomes based on role
|
|
review_data = {
|
|
"architect": {
|
|
"findings": [
|
|
"Code follows established patterns",
|
|
"Consider extracting common functionality into utility class",
|
|
"Database schema changes require migration script"
|
|
],
|
|
"severity": "minor",
|
|
"recommendations": ["Refactor common code", "Add migration script"],
|
|
"approval_status": "approved_with_suggestions"
|
|
},
|
|
"security": {
|
|
"findings": [
|
|
"Input validation implemented correctly",
|
|
"SQL injection protection in place",
|
|
"Consider adding rate limiting for API endpoints"
|
|
],
|
|
"severity": "minor",
|
|
"recommendations": ["Add rate limiting", "Update security documentation"],
|
|
"approval_status": "approved_with_suggestions"
|
|
},
|
|
"performance": {
|
|
"findings": [
|
|
"Database queries are optimized",
|
|
"Memory usage looks reasonable",
|
|
"Consider caching for frequently accessed data"
|
|
],
|
|
"severity": "suggestion",
|
|
"recommendations": ["Implement caching strategy", "Add performance monitoring"],
|
|
"approval_status": "approved"
|
|
},
|
|
"documentation": {
|
|
"findings": [
|
|
"API documentation updated",
|
|
"Some complex functions lack comments",
|
|
"README needs update for new features"
|
|
],
|
|
"severity": "minor",
|
|
"recommendations": ["Add function comments", "Update README"],
|
|
"approval_status": "approved_with_suggestions"
|
|
}
|
|
}.get(role, {})
|
|
|
|
# Post review findings to the thread
|
|
await self.post_review_findings(thread_id, role, review_data, task)
|
|
|
|
return review_data
|
|
|
|
async def post_review_findings(self, thread_id: str, role: str, review_data: Dict, task: CodeReviewTask):
|
|
"""Post review findings to the collaborative thread"""
|
|
if not self.mcp_session:
|
|
raise RuntimeError("MCP session not initialized")
|
|
|
|
message_content = {
|
|
"reviewer": role,
|
|
"review_type": "initial_review",
|
|
"findings": review_data.get("findings", []),
|
|
"severity": review_data.get("severity", "info"),
|
|
"recommendations": review_data.get("recommendations", []),
|
|
"approval_status": review_data.get("approval_status", "pending"),
|
|
"timestamp": "2025-01-07T12:00:00Z"
|
|
}
|
|
|
|
target_address = f"bzzz://*:{role}@{task.repository}:pr{task.pull_request_number}/findings"
|
|
|
|
await self.mcp_session.call_tool(
|
|
"bzzz_post",
|
|
{
|
|
"target_address": target_address,
|
|
"message_type": "review_findings",
|
|
"content": message_content,
|
|
"thread_id": thread_id,
|
|
"priority": "medium"
|
|
}
|
|
)
|
|
|
|
async def facilitate_discussion(self, thread_id: str, review_results: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Facilitate cross-agent discussion about conflicting or overlapping concerns"""
|
|
print("💭 Facilitating inter-agent discussion...")
|
|
|
|
# Identify areas where multiple agents have concerns
|
|
common_concerns = self.identify_common_concerns(review_results)
|
|
|
|
discussion_points = []
|
|
for concern in common_concerns:
|
|
discussion_point = {
|
|
"topic": concern["area"],
|
|
"agents_involved": concern["agents"],
|
|
"severity_levels": concern["severities"],
|
|
"proposed_resolution": concern["suggested_approach"]
|
|
}
|
|
discussion_points.append(discussion_point)
|
|
|
|
# Simulate discussion outcomes
|
|
discussion_results = {
|
|
"discussion_points": discussion_points,
|
|
"resolved_conflicts": len(discussion_points),
|
|
"consensus_reached": True,
|
|
"escalation_needed": False
|
|
}
|
|
|
|
return discussion_results
|
|
|
|
def identify_common_concerns(self, review_results: Dict[str, Any]) -> List[Dict]:
|
|
"""Identify areas where multiple agents have overlapping concerns"""
|
|
# This would analyze the review findings to find common themes
|
|
# For demo purposes, return a sample concern
|
|
return [
|
|
{
|
|
"area": "error_handling",
|
|
"agents": ["architect", "security"],
|
|
"severities": ["minor", "minor"],
|
|
"suggested_approach": "Implement consistent error handling pattern"
|
|
}
|
|
]
|
|
|
|
async def reach_consensus(self, thread_id: str, review_results: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Facilitate consensus-building among reviewing agents"""
|
|
print("🤝 Building consensus on final recommendations...")
|
|
|
|
# Aggregate all findings and recommendations
|
|
all_findings = []
|
|
all_recommendations = []
|
|
approval_statuses = []
|
|
|
|
for role, results in review_results.items():
|
|
if role == "discussion":
|
|
continue
|
|
all_findings.extend(results.get("findings", []))
|
|
all_recommendations.extend(results.get("recommendations", []))
|
|
approval_statuses.append(results.get("approval_status", "pending"))
|
|
|
|
# Determine overall approval status
|
|
if all(status == "approved" for status in approval_statuses):
|
|
overall_status = "approved"
|
|
elif any(status == "rejected" for status in approval_statuses):
|
|
overall_status = "rejected"
|
|
else:
|
|
overall_status = "approved_with_changes"
|
|
|
|
consensus = {
|
|
"overall_approval": overall_status,
|
|
"critical_issues": 0,
|
|
"major_issues": 1,
|
|
"minor_issues": 4,
|
|
"suggestions": 3,
|
|
"consolidated_recommendations": list(set(all_recommendations)),
|
|
"requires_changes": overall_status != "approved",
|
|
"consensus_confidence": 0.95
|
|
}
|
|
|
|
return consensus
|
|
|
|
async def generate_review_summary(self, thread_id: str, review_results: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Generate a comprehensive review summary"""
|
|
if not self.mcp_session:
|
|
raise RuntimeError("MCP session not initialized")
|
|
|
|
# Use thread summarization tool
|
|
summary_result = await self.mcp_session.call_tool(
|
|
"bzzz_thread",
|
|
{
|
|
"action": "summarize",
|
|
"thread_id": thread_id
|
|
}
|
|
)
|
|
|
|
thread_summary = json.loads(summary_result.content[0].text)
|
|
|
|
final_summary = {
|
|
"review_id": f"review_{thread_id}",
|
|
"overall_status": review_results.get("consensus", {}).get("overall_approval", "pending"),
|
|
"participating_agents": list(self.agents.keys()),
|
|
"thread_summary": thread_summary,
|
|
"key_findings": self.extract_key_findings(review_results),
|
|
"action_items": self.generate_action_items(review_results),
|
|
"approval_required": review_results.get("consensus", {}).get("requires_changes", True),
|
|
"estimated_fix_time": "2-4 hours",
|
|
"review_completed_at": "2025-01-07T12:30:00Z"
|
|
}
|
|
|
|
return final_summary
|
|
|
|
def extract_key_findings(self, review_results: Dict[str, Any]) -> List[str]:
|
|
"""Extract the most important findings from all agent reviews"""
|
|
key_findings = []
|
|
for role, results in review_results.items():
|
|
if role in ["discussion", "consensus"]:
|
|
continue
|
|
findings = results.get("findings", [])
|
|
# Take first 2 findings from each agent as key findings
|
|
key_findings.extend(findings[:2])
|
|
return key_findings
|
|
|
|
def generate_action_items(self, review_results: Dict[str, Any]) -> List[Dict]:
|
|
"""Generate actionable items based on review findings"""
|
|
action_items = []
|
|
consensus = review_results.get("consensus", {})
|
|
|
|
for rec in consensus.get("consolidated_recommendations", []):
|
|
action_items.append({
|
|
"action": rec,
|
|
"priority": "medium",
|
|
"estimated_effort": "1-2 hours",
|
|
"assignee": "developer"
|
|
})
|
|
|
|
return action_items
|
|
|
|
async def cleanup(self):
|
|
"""Clean up resources and close connections"""
|
|
if self.mcp_session:
|
|
await self.mcp_session.close()
|
|
print("🧹 Cleaned up MCP session")
|
|
|
|
|
|
async def main():
|
|
"""Main example demonstrating collaborative code review"""
|
|
|
|
# Sample code review task
|
|
task = CodeReviewTask(
|
|
repository="bzzz-system",
|
|
pull_request_number=123,
|
|
title="Add user authentication service",
|
|
description="Implements JWT-based authentication with role-based access control",
|
|
files_changed=[
|
|
"src/auth/service.py",
|
|
"src/auth/middleware.py",
|
|
"src/models/user.py",
|
|
"tests/test_auth.py",
|
|
"docs/api/auth.md"
|
|
],
|
|
lines_of_code=450,
|
|
complexity_score=6.5,
|
|
security_risk="medium"
|
|
)
|
|
|
|
# Initialize the orchestrator
|
|
orchestrator = CollaborativeReviewOrchestrator()
|
|
|
|
try:
|
|
print("🚀 Initializing BZZZ MCP Collaborative Review Example")
|
|
await orchestrator.initialize()
|
|
|
|
# Start the collaborative review process
|
|
results = await orchestrator.start_collaborative_review(task)
|
|
|
|
# Display results
|
|
print("\n" + "="*60)
|
|
print("📊 COLLABORATIVE REVIEW RESULTS")
|
|
print("="*60)
|
|
print(json.dumps(results, indent=2))
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error during collaborative review: {e}")
|
|
|
|
finally:
|
|
await orchestrator.cleanup()
|
|
|
|
if __name__ == "__main__":
|
|
# Run the example
|
|
asyncio.run(main()) |