Files
bzzz/pubsub/pubsub.go
anthonyrawlins 81b473d48f Add comprehensive agent role integration from Bees-AgenticWorkers
- Add role-based configuration to AgentConfig with 15 predefined roles
- Enhanced message types for role-based collaboration
- Role-based topic subscription system
- Agent initialization with automatic role assignment
- Role announcements and collaboration settings
- Support for expertise areas, reporting hierarchy, and deliverables

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-27 15:24:43 +10:00

501 lines
15 KiB
Go

package pubsub
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
// PubSub handles publish/subscribe messaging for Bzzz coordination and Antennae meta-discussion
type PubSub struct {
ps *pubsub.PubSub
host host.Host
ctx context.Context
cancel context.CancelFunc
// Topic subscriptions
bzzzTopic *pubsub.Topic
antennaeTopic *pubsub.Topic
// Message subscriptions
bzzzSub *pubsub.Subscription
antennaeSub *pubsub.Subscription
// Dynamic topic management
dynamicTopics map[string]*pubsub.Topic
dynamicTopicsMux sync.RWMutex
dynamicSubs map[string]*pubsub.Subscription
dynamicSubsMux sync.RWMutex
// Configuration
bzzzTopicName string
antennaeTopicName string
// External message handler for Antennae messages
AntennaeMessageHandler func(msg Message, from peer.ID)
}
// MessageType represents different types of messages
type MessageType string
const (
// Bzzz coordination messages
TaskAnnouncement MessageType = "task_announcement"
TaskClaim MessageType = "task_claim"
TaskProgress MessageType = "task_progress"
TaskComplete MessageType = "task_complete"
CapabilityBcast MessageType = "capability_broadcast" // Only broadcast when capabilities change
AvailabilityBcast MessageType = "availability_broadcast" // Regular availability status
// Antennae meta-discussion messages
MetaDiscussion MessageType = "meta_discussion" // Generic type for all discussion
TaskHelpRequest MessageType = "task_help_request" // Request for assistance
TaskHelpResponse MessageType = "task_help_response" // Response to a help request
CoordinationRequest MessageType = "coordination_request" // Request for coordination
CoordinationComplete MessageType = "coordination_complete" // Coordination session completed
DependencyAlert MessageType = "dependency_alert" // Dependency detected
EscalationTrigger MessageType = "escalation_trigger" // Human escalation needed
// Role-based collaboration messages
RoleAnnouncement MessageType = "role_announcement" // Agent announces its role and capabilities
ExpertiseRequest MessageType = "expertise_request" // Request for specific expertise
ExpertiseResponse MessageType = "expertise_response" // Response offering expertise
StatusUpdate MessageType = "status_update" // Regular status updates from agents
WorkAllocation MessageType = "work_allocation" // Allocation of work to specific roles
RoleCollaboration MessageType = "role_collaboration" // Cross-role collaboration message
MentorshipRequest MessageType = "mentorship_request" // Junior role requesting mentorship
MentorshipResponse MessageType = "mentorship_response" // Senior role providing mentorship
ProjectUpdate MessageType = "project_update" // Project-level status updates
DeliverableReady MessageType = "deliverable_ready" // Notification that deliverable is complete
)
// Message represents a Bzzz/Antennae message
type Message struct {
Type MessageType `json:"type"`
From string `json:"from"`
Timestamp time.Time `json:"timestamp"`
Data map[string]interface{} `json:"data"`
HopCount int `json:"hop_count,omitempty"` // For Antennae hop limiting
// Role-based collaboration fields
FromRole string `json:"from_role,omitempty"` // Role of sender
ToRoles []string `json:"to_roles,omitempty"` // Target roles
RequiredExpertise []string `json:"required_expertise,omitempty"` // Required expertise areas
ProjectID string `json:"project_id,omitempty"` // Associated project
Priority string `json:"priority,omitempty"` // Message priority (low, medium, high, urgent)
ThreadID string `json:"thread_id,omitempty"` // Conversation thread ID
}
// NewPubSub creates a new PubSub instance for Bzzz coordination and Antennae meta-discussion
func NewPubSub(ctx context.Context, h host.Host, bzzzTopic, antennaeTopic string) (*PubSub, error) {
if bzzzTopic == "" {
bzzzTopic = "bzzz/coordination/v1"
}
if antennaeTopic == "" {
antennaeTopic = "antennae/meta-discussion/v1"
}
pubsubCtx, cancel := context.WithCancel(ctx)
// Create gossipsub instance with message validation
ps, err := pubsub.NewGossipSub(pubsubCtx, h,
pubsub.WithMessageSigning(true),
pubsub.WithStrictSignatureVerification(true),
pubsub.WithValidateQueueSize(256),
pubsub.WithValidateThrottle(1024),
)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create gossipsub: %w", err)
}
p := &PubSub{
ps: ps,
host: h,
ctx: pubsubCtx,
cancel: cancel,
bzzzTopicName: bzzzTopic,
antennaeTopicName: antennaeTopic,
dynamicTopics: make(map[string]*pubsub.Topic),
dynamicSubs: make(map[string]*pubsub.Subscription),
}
// Join static topics
if err := p.joinStaticTopics(); err != nil {
cancel()
return nil, err
}
// Start message handlers
go p.handleBzzzMessages()
go p.handleAntennaeMessages()
fmt.Printf("📡 PubSub initialized - Bzzz: %s, Antennae: %s\n", bzzzTopic, antennaeTopic)
return p, nil
}
// SetAntennaeMessageHandler sets the handler for incoming Antennae messages.
func (p *PubSub) SetAntennaeMessageHandler(handler func(msg Message, from peer.ID)) {
p.AntennaeMessageHandler = handler
}
// joinStaticTopics joins the main Bzzz and Antennae topics
func (p *PubSub) joinStaticTopics() error {
// Join Bzzz coordination topic
bzzzTopic, err := p.ps.Join(p.bzzzTopicName)
if err != nil {
return fmt.Errorf("failed to join Bzzz topic: %w", err)
}
p.bzzzTopic = bzzzTopic
bzzzSub, err := bzzzTopic.Subscribe()
if err != nil {
return fmt.Errorf("failed to subscribe to Bzzz topic: %w", err)
}
p.bzzzSub = bzzzSub
// Join Antennae meta-discussion topic
antennaeTopic, err := p.ps.Join(p.antennaeTopicName)
if err != nil {
return fmt.Errorf("failed to join Antennae topic: %w", err)
}
p.antennaeTopic = antennaeTopic
antennaeSub, err := antennaeTopic.Subscribe()
if err != nil {
return fmt.Errorf("failed to subscribe to Antennae topic: %w", err)
}
p.antennaeSub = antennaeSub
return nil
}
// JoinDynamicTopic joins a new topic for a specific task
func (p *PubSub) JoinDynamicTopic(topicName string) error {
p.dynamicTopicsMux.Lock()
defer p.dynamicTopicsMux.Unlock()
p.dynamicSubsMux.Lock()
defer p.dynamicSubsMux.Unlock()
if _, exists := p.dynamicTopics[topicName]; exists {
return nil // Already joined
}
topic, err := p.ps.Join(topicName)
if err != nil {
return fmt.Errorf("failed to join dynamic topic %s: %w", topicName, err)
}
sub, err := topic.Subscribe()
if err != nil {
topic.Close()
return fmt.Errorf("failed to subscribe to dynamic topic %s: %w", topicName, err)
}
p.dynamicTopics[topicName] = topic
p.dynamicSubs[topicName] = sub
// Start a handler for this new subscription
go p.handleDynamicMessages(sub)
fmt.Printf("✅ Joined dynamic topic: %s\n", topicName)
return nil
}
// JoinRoleBasedTopics joins topics based on role and expertise
func (p *PubSub) JoinRoleBasedTopics(role string, expertise []string, reportsTo []string) error {
var topicsToJoin []string
// Join role-specific topic
if role != "" {
roleTopic := fmt.Sprintf("bzzz/roles/%s/v1", strings.ToLower(strings.ReplaceAll(role, " ", "_")))
topicsToJoin = append(topicsToJoin, roleTopic)
}
// Join expertise-specific topics
for _, exp := range expertise {
expertiseTopic := fmt.Sprintf("bzzz/expertise/%s/v1", strings.ToLower(strings.ReplaceAll(exp, " ", "_")))
topicsToJoin = append(topicsToJoin, expertiseTopic)
}
// Join reporting hierarchy topics
for _, supervisor := range reportsTo {
supervisorTopic := fmt.Sprintf("bzzz/hierarchy/%s/v1", strings.ToLower(strings.ReplaceAll(supervisor, " ", "_")))
topicsToJoin = append(topicsToJoin, supervisorTopic)
}
// Join all identified topics
for _, topicName := range topicsToJoin {
if err := p.JoinDynamicTopic(topicName); err != nil {
fmt.Printf("⚠️ Failed to join role-based topic %s: %v\n", topicName, err)
continue
}
}
fmt.Printf("🎯 Joined %d role-based topics for role: %s\n", len(topicsToJoin), role)
return nil
}
// JoinProjectTopic joins a project-specific topic
func (p *PubSub) JoinProjectTopic(projectID string) error {
if projectID == "" {
return fmt.Errorf("project ID cannot be empty")
}
topicName := fmt.Sprintf("bzzz/projects/%s/coordination/v1", projectID)
return p.JoinDynamicTopic(topicName)
}
// LeaveDynamicTopic leaves a specific task topic
func (p *PubSub) LeaveDynamicTopic(topicName string) {
p.dynamicTopicsMux.Lock()
defer p.dynamicTopicsMux.Unlock()
p.dynamicSubsMux.Lock()
defer p.dynamicSubsMux.Unlock()
if sub, exists := p.dynamicSubs[topicName]; exists {
sub.Cancel()
delete(p.dynamicSubs, topicName)
}
if topic, exists := p.dynamicTopics[topicName]; exists {
topic.Close()
delete(p.dynamicTopics, topicName)
}
fmt.Printf("🗑️ Left dynamic topic: %s\n", topicName)
}
// PublishToDynamicTopic publishes a message to a specific dynamic topic
func (p *PubSub) PublishToDynamicTopic(topicName string, msgType MessageType, data map[string]interface{}) error {
p.dynamicTopicsMux.RLock()
topic, exists := p.dynamicTopics[topicName]
p.dynamicTopicsMux.RUnlock()
if !exists {
return fmt.Errorf("not subscribed to dynamic topic: %s", topicName)
}
msg := Message{
Type: msgType,
From: p.host.ID().String(),
Timestamp: time.Now(),
Data: data,
}
msgBytes, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message for dynamic topic: %w", err)
}
return topic.Publish(p.ctx, msgBytes)
}
// PublishBzzzMessage publishes a message to the Bzzz coordination topic
func (p *PubSub) PublishBzzzMessage(msgType MessageType, data map[string]interface{}) error {
msg := Message{
Type: msgType,
From: p.host.ID().String(),
Timestamp: time.Now(),
Data: data,
}
msgBytes, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
return p.bzzzTopic.Publish(p.ctx, msgBytes)
}
// PublishAntennaeMessage publishes a message to the Antennae meta-discussion topic
func (p *PubSub) PublishAntennaeMessage(msgType MessageType, data map[string]interface{}) error {
msg := Message{
Type: msgType,
From: p.host.ID().String(),
Timestamp: time.Now(),
Data: data,
}
msgBytes, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
return p.antennaeTopic.Publish(p.ctx, msgBytes)
}
// PublishRoleBasedMessage publishes a role-based collaboration message
func (p *PubSub) PublishRoleBasedMessage(msgType MessageType, data map[string]interface{}, opts MessageOptions) error {
msg := Message{
Type: msgType,
From: p.host.ID().String(),
Timestamp: time.Now(),
Data: data,
FromRole: opts.FromRole,
ToRoles: opts.ToRoles,
RequiredExpertise: opts.RequiredExpertise,
ProjectID: opts.ProjectID,
Priority: opts.Priority,
ThreadID: opts.ThreadID,
}
msgBytes, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal role-based message: %w", err)
}
// Determine which topic to use based on message type
var topic *pubsub.Topic
switch msgType {
case RoleAnnouncement, ExpertiseRequest, ExpertiseResponse, StatusUpdate,
WorkAllocation, RoleCollaboration, MentorshipRequest, MentorshipResponse,
ProjectUpdate, DeliverableReady:
topic = p.antennaeTopic // Use Antennae topic for role-based messages
default:
topic = p.bzzzTopic // Default to Bzzz topic
}
return topic.Publish(p.ctx, msgBytes)
}
// MessageOptions holds options for role-based messages
type MessageOptions struct {
FromRole string
ToRoles []string
RequiredExpertise []string
ProjectID string
Priority string
ThreadID string
}
// handleBzzzMessages processes incoming Bzzz coordination messages
func (p *PubSub) handleBzzzMessages() {
for {
msg, err := p.bzzzSub.Next(p.ctx)
if err != nil {
if p.ctx.Err() != nil {
return // Context cancelled
}
fmt.Printf("❌ Error receiving Bzzz message: %v\n", err)
continue
}
if msg.ReceivedFrom == p.host.ID() {
continue
}
var bzzzMsg Message
if err := json.Unmarshal(msg.Data, &bzzzMsg); err != nil {
fmt.Printf("❌ Failed to unmarshal Bzzz message: %v\n", err)
continue
}
p.processBzzzMessage(bzzzMsg, msg.ReceivedFrom)
}
}
// handleAntennaeMessages processes incoming Antennae meta-discussion messages
func (p *PubSub) handleAntennaeMessages() {
for {
msg, err := p.antennaeSub.Next(p.ctx)
if err != nil {
if p.ctx.Err() != nil {
return // Context cancelled
}
fmt.Printf("❌ Error receiving Antennae message: %v\n", err)
continue
}
if msg.ReceivedFrom == p.host.ID() {
continue
}
var antennaeMsg Message
if err := json.Unmarshal(msg.Data, &antennaeMsg); err != nil {
fmt.Printf("❌ Failed to unmarshal Antennae message: %v\n", err)
continue
}
if p.AntennaeMessageHandler != nil {
p.AntennaeMessageHandler(antennaeMsg, msg.ReceivedFrom)
} else {
p.processAntennaeMessage(antennaeMsg, msg.ReceivedFrom)
}
}
}
// handleDynamicMessages processes messages from a dynamic topic subscription
func (p *PubSub) handleDynamicMessages(sub *pubsub.Subscription) {
for {
msg, err := sub.Next(p.ctx)
if err != nil {
if p.ctx.Err() != nil || err.Error() == "subscription cancelled" {
return // Subscription was cancelled, exit handler
}
fmt.Printf("❌ Error receiving dynamic message: %v\n", err)
continue
}
if msg.ReceivedFrom == p.host.ID() {
continue
}
var dynamicMsg Message
if err := json.Unmarshal(msg.Data, &dynamicMsg); err != nil {
fmt.Printf("❌ Failed to unmarshal dynamic message: %v\n", err)
continue
}
// Use the main Antennae handler for all dynamic messages
if p.AntennaeMessageHandler != nil {
p.AntennaeMessageHandler(dynamicMsg, msg.ReceivedFrom)
}
}
}
// processBzzzMessage handles different types of Bzzz coordination messages
func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) {
fmt.Printf("🐝 Bzzz [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data)
}
// processAntennaeMessage provides default handling for Antennae messages if no external handler is set
func (p *PubSub) processAntennaeMessage(msg Message, from peer.ID) {
fmt.Printf("🎯 Default Antennae Handler [%s] from %s: %v\n",
msg.Type, from.ShortString(), msg.Data)
}
// Close shuts down the PubSub instance
func (p *PubSub) Close() error {
p.cancel()
if p.bzzzSub != nil {
p.bzzzSub.Cancel()
}
if p.antennaeSub != nil {
p.antennaeSub.Cancel()
}
if p.bzzzTopic != nil {
p.bzzzTopic.Close()
}
if p.antennaeTopic != nil {
p.antennaeTopic.Close()
}
p.dynamicTopicsMux.Lock()
for _, topic := range p.dynamicTopics {
topic.Close()
}
p.dynamicTopicsMux.Unlock()
return nil
}