package pubsub import ( "context" "encoding/json" "fmt" "strings" "sync" "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" ) // PubSub handles publish/subscribe messaging for Bzzz coordination and Antennae meta-discussion type PubSub struct { ps *pubsub.PubSub host host.Host ctx context.Context cancel context.CancelFunc // Topic subscriptions bzzzTopic *pubsub.Topic antennaeTopic *pubsub.Topic // Message subscriptions bzzzSub *pubsub.Subscription antennaeSub *pubsub.Subscription // Dynamic topic management dynamicTopics map[string]*pubsub.Topic dynamicTopicsMux sync.RWMutex dynamicSubs map[string]*pubsub.Subscription dynamicSubsMux sync.RWMutex // Configuration bzzzTopicName string antennaeTopicName string // External message handler for Antennae messages AntennaeMessageHandler func(msg Message, from peer.ID) } // MessageType represents different types of messages type MessageType string const ( // Bzzz coordination messages TaskAnnouncement MessageType = "task_announcement" TaskClaim MessageType = "task_claim" TaskProgress MessageType = "task_progress" TaskComplete MessageType = "task_complete" CapabilityBcast MessageType = "capability_broadcast" // Only broadcast when capabilities change AvailabilityBcast MessageType = "availability_broadcast" // Regular availability status // Antennae meta-discussion messages MetaDiscussion MessageType = "meta_discussion" // Generic type for all discussion TaskHelpRequest MessageType = "task_help_request" // Request for assistance TaskHelpResponse MessageType = "task_help_response" // Response to a help request CoordinationRequest MessageType = "coordination_request" // Request for coordination CoordinationComplete MessageType = "coordination_complete" // Coordination session completed DependencyAlert MessageType = "dependency_alert" // Dependency detected EscalationTrigger MessageType = "escalation_trigger" // Human escalation needed // Role-based collaboration messages RoleAnnouncement MessageType = "role_announcement" // Agent announces its role and capabilities ExpertiseRequest MessageType = "expertise_request" // Request for specific expertise ExpertiseResponse MessageType = "expertise_response" // Response offering expertise StatusUpdate MessageType = "status_update" // Regular status updates from agents WorkAllocation MessageType = "work_allocation" // Allocation of work to specific roles RoleCollaboration MessageType = "role_collaboration" // Cross-role collaboration message MentorshipRequest MessageType = "mentorship_request" // Junior role requesting mentorship MentorshipResponse MessageType = "mentorship_response" // Senior role providing mentorship ProjectUpdate MessageType = "project_update" // Project-level status updates DeliverableReady MessageType = "deliverable_ready" // Notification that deliverable is complete ) // Message represents a Bzzz/Antennae message type Message struct { Type MessageType `json:"type"` From string `json:"from"` Timestamp time.Time `json:"timestamp"` Data map[string]interface{} `json:"data"` HopCount int `json:"hop_count,omitempty"` // For Antennae hop limiting // Role-based collaboration fields FromRole string `json:"from_role,omitempty"` // Role of sender ToRoles []string `json:"to_roles,omitempty"` // Target roles RequiredExpertise []string `json:"required_expertise,omitempty"` // Required expertise areas ProjectID string `json:"project_id,omitempty"` // Associated project Priority string `json:"priority,omitempty"` // Message priority (low, medium, high, urgent) ThreadID string `json:"thread_id,omitempty"` // Conversation thread ID } // NewPubSub creates a new PubSub instance for Bzzz coordination and Antennae meta-discussion func NewPubSub(ctx context.Context, h host.Host, bzzzTopic, antennaeTopic string) (*PubSub, error) { if bzzzTopic == "" { bzzzTopic = "bzzz/coordination/v1" } if antennaeTopic == "" { antennaeTopic = "antennae/meta-discussion/v1" } pubsubCtx, cancel := context.WithCancel(ctx) // Create gossipsub instance with message validation ps, err := pubsub.NewGossipSub(pubsubCtx, h, pubsub.WithMessageSigning(true), pubsub.WithStrictSignatureVerification(true), pubsub.WithValidateQueueSize(256), pubsub.WithValidateThrottle(1024), ) if err != nil { cancel() return nil, fmt.Errorf("failed to create gossipsub: %w", err) } p := &PubSub{ ps: ps, host: h, ctx: pubsubCtx, cancel: cancel, bzzzTopicName: bzzzTopic, antennaeTopicName: antennaeTopic, dynamicTopics: make(map[string]*pubsub.Topic), dynamicSubs: make(map[string]*pubsub.Subscription), } // Join static topics if err := p.joinStaticTopics(); err != nil { cancel() return nil, err } // Start message handlers go p.handleBzzzMessages() go p.handleAntennaeMessages() fmt.Printf("📡 PubSub initialized - Bzzz: %s, Antennae: %s\n", bzzzTopic, antennaeTopic) return p, nil } // SetAntennaeMessageHandler sets the handler for incoming Antennae messages. func (p *PubSub) SetAntennaeMessageHandler(handler func(msg Message, from peer.ID)) { p.AntennaeMessageHandler = handler } // joinStaticTopics joins the main Bzzz and Antennae topics func (p *PubSub) joinStaticTopics() error { // Join Bzzz coordination topic bzzzTopic, err := p.ps.Join(p.bzzzTopicName) if err != nil { return fmt.Errorf("failed to join Bzzz topic: %w", err) } p.bzzzTopic = bzzzTopic bzzzSub, err := bzzzTopic.Subscribe() if err != nil { return fmt.Errorf("failed to subscribe to Bzzz topic: %w", err) } p.bzzzSub = bzzzSub // Join Antennae meta-discussion topic antennaeTopic, err := p.ps.Join(p.antennaeTopicName) if err != nil { return fmt.Errorf("failed to join Antennae topic: %w", err) } p.antennaeTopic = antennaeTopic antennaeSub, err := antennaeTopic.Subscribe() if err != nil { return fmt.Errorf("failed to subscribe to Antennae topic: %w", err) } p.antennaeSub = antennaeSub return nil } // JoinDynamicTopic joins a new topic for a specific task func (p *PubSub) JoinDynamicTopic(topicName string) error { p.dynamicTopicsMux.Lock() defer p.dynamicTopicsMux.Unlock() p.dynamicSubsMux.Lock() defer p.dynamicSubsMux.Unlock() if _, exists := p.dynamicTopics[topicName]; exists { return nil // Already joined } topic, err := p.ps.Join(topicName) if err != nil { return fmt.Errorf("failed to join dynamic topic %s: %w", topicName, err) } sub, err := topic.Subscribe() if err != nil { topic.Close() return fmt.Errorf("failed to subscribe to dynamic topic %s: %w", topicName, err) } p.dynamicTopics[topicName] = topic p.dynamicSubs[topicName] = sub // Start a handler for this new subscription go p.handleDynamicMessages(sub) fmt.Printf("✅ Joined dynamic topic: %s\n", topicName) return nil } // JoinRoleBasedTopics joins topics based on role and expertise func (p *PubSub) JoinRoleBasedTopics(role string, expertise []string, reportsTo []string) error { var topicsToJoin []string // Join role-specific topic if role != "" { roleTopic := fmt.Sprintf("bzzz/roles/%s/v1", strings.ToLower(strings.ReplaceAll(role, " ", "_"))) topicsToJoin = append(topicsToJoin, roleTopic) } // Join expertise-specific topics for _, exp := range expertise { expertiseTopic := fmt.Sprintf("bzzz/expertise/%s/v1", strings.ToLower(strings.ReplaceAll(exp, " ", "_"))) topicsToJoin = append(topicsToJoin, expertiseTopic) } // Join reporting hierarchy topics for _, supervisor := range reportsTo { supervisorTopic := fmt.Sprintf("bzzz/hierarchy/%s/v1", strings.ToLower(strings.ReplaceAll(supervisor, " ", "_"))) topicsToJoin = append(topicsToJoin, supervisorTopic) } // Join all identified topics for _, topicName := range topicsToJoin { if err := p.JoinDynamicTopic(topicName); err != nil { fmt.Printf("⚠️ Failed to join role-based topic %s: %v\n", topicName, err) continue } } fmt.Printf("🎯 Joined %d role-based topics for role: %s\n", len(topicsToJoin), role) return nil } // JoinProjectTopic joins a project-specific topic func (p *PubSub) JoinProjectTopic(projectID string) error { if projectID == "" { return fmt.Errorf("project ID cannot be empty") } topicName := fmt.Sprintf("bzzz/projects/%s/coordination/v1", projectID) return p.JoinDynamicTopic(topicName) } // LeaveDynamicTopic leaves a specific task topic func (p *PubSub) LeaveDynamicTopic(topicName string) { p.dynamicTopicsMux.Lock() defer p.dynamicTopicsMux.Unlock() p.dynamicSubsMux.Lock() defer p.dynamicSubsMux.Unlock() if sub, exists := p.dynamicSubs[topicName]; exists { sub.Cancel() delete(p.dynamicSubs, topicName) } if topic, exists := p.dynamicTopics[topicName]; exists { topic.Close() delete(p.dynamicTopics, topicName) } fmt.Printf("🗑️ Left dynamic topic: %s\n", topicName) } // PublishToDynamicTopic publishes a message to a specific dynamic topic func (p *PubSub) PublishToDynamicTopic(topicName string, msgType MessageType, data map[string]interface{}) error { p.dynamicTopicsMux.RLock() topic, exists := p.dynamicTopics[topicName] p.dynamicTopicsMux.RUnlock() if !exists { return fmt.Errorf("not subscribed to dynamic topic: %s", topicName) } msg := Message{ Type: msgType, From: p.host.ID().String(), Timestamp: time.Now(), Data: data, } msgBytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal message for dynamic topic: %w", err) } return topic.Publish(p.ctx, msgBytes) } // PublishBzzzMessage publishes a message to the Bzzz coordination topic func (p *PubSub) PublishBzzzMessage(msgType MessageType, data map[string]interface{}) error { msg := Message{ Type: msgType, From: p.host.ID().String(), Timestamp: time.Now(), Data: data, } msgBytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal message: %w", err) } return p.bzzzTopic.Publish(p.ctx, msgBytes) } // PublishAntennaeMessage publishes a message to the Antennae meta-discussion topic func (p *PubSub) PublishAntennaeMessage(msgType MessageType, data map[string]interface{}) error { msg := Message{ Type: msgType, From: p.host.ID().String(), Timestamp: time.Now(), Data: data, } msgBytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal message: %w", err) } return p.antennaeTopic.Publish(p.ctx, msgBytes) } // PublishRoleBasedMessage publishes a role-based collaboration message func (p *PubSub) PublishRoleBasedMessage(msgType MessageType, data map[string]interface{}, opts MessageOptions) error { msg := Message{ Type: msgType, From: p.host.ID().String(), Timestamp: time.Now(), Data: data, FromRole: opts.FromRole, ToRoles: opts.ToRoles, RequiredExpertise: opts.RequiredExpertise, ProjectID: opts.ProjectID, Priority: opts.Priority, ThreadID: opts.ThreadID, } msgBytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal role-based message: %w", err) } // Determine which topic to use based on message type var topic *pubsub.Topic switch msgType { case RoleAnnouncement, ExpertiseRequest, ExpertiseResponse, StatusUpdate, WorkAllocation, RoleCollaboration, MentorshipRequest, MentorshipResponse, ProjectUpdate, DeliverableReady: topic = p.antennaeTopic // Use Antennae topic for role-based messages default: topic = p.bzzzTopic // Default to Bzzz topic } return topic.Publish(p.ctx, msgBytes) } // MessageOptions holds options for role-based messages type MessageOptions struct { FromRole string ToRoles []string RequiredExpertise []string ProjectID string Priority string ThreadID string } // handleBzzzMessages processes incoming Bzzz coordination messages func (p *PubSub) handleBzzzMessages() { for { msg, err := p.bzzzSub.Next(p.ctx) if err != nil { if p.ctx.Err() != nil { return // Context cancelled } fmt.Printf("❌ Error receiving Bzzz message: %v\n", err) continue } if msg.ReceivedFrom == p.host.ID() { continue } var bzzzMsg Message if err := json.Unmarshal(msg.Data, &bzzzMsg); err != nil { fmt.Printf("❌ Failed to unmarshal Bzzz message: %v\n", err) continue } p.processBzzzMessage(bzzzMsg, msg.ReceivedFrom) } } // handleAntennaeMessages processes incoming Antennae meta-discussion messages func (p *PubSub) handleAntennaeMessages() { for { msg, err := p.antennaeSub.Next(p.ctx) if err != nil { if p.ctx.Err() != nil { return // Context cancelled } fmt.Printf("❌ Error receiving Antennae message: %v\n", err) continue } if msg.ReceivedFrom == p.host.ID() { continue } var antennaeMsg Message if err := json.Unmarshal(msg.Data, &antennaeMsg); err != nil { fmt.Printf("❌ Failed to unmarshal Antennae message: %v\n", err) continue } if p.AntennaeMessageHandler != nil { p.AntennaeMessageHandler(antennaeMsg, msg.ReceivedFrom) } else { p.processAntennaeMessage(antennaeMsg, msg.ReceivedFrom) } } } // handleDynamicMessages processes messages from a dynamic topic subscription func (p *PubSub) handleDynamicMessages(sub *pubsub.Subscription) { for { msg, err := sub.Next(p.ctx) if err != nil { if p.ctx.Err() != nil || err.Error() == "subscription cancelled" { return // Subscription was cancelled, exit handler } fmt.Printf("❌ Error receiving dynamic message: %v\n", err) continue } if msg.ReceivedFrom == p.host.ID() { continue } var dynamicMsg Message if err := json.Unmarshal(msg.Data, &dynamicMsg); err != nil { fmt.Printf("❌ Failed to unmarshal dynamic message: %v\n", err) continue } // Use the main Antennae handler for all dynamic messages if p.AntennaeMessageHandler != nil { p.AntennaeMessageHandler(dynamicMsg, msg.ReceivedFrom) } } } // processBzzzMessage handles different types of Bzzz coordination messages func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) { fmt.Printf("🐝 Bzzz [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data) } // processAntennaeMessage provides default handling for Antennae messages if no external handler is set func (p *PubSub) processAntennaeMessage(msg Message, from peer.ID) { fmt.Printf("🎯 Default Antennae Handler [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data) } // Close shuts down the PubSub instance func (p *PubSub) Close() error { p.cancel() if p.bzzzSub != nil { p.bzzzSub.Cancel() } if p.antennaeSub != nil { p.antennaeSub.Cancel() } if p.bzzzTopic != nil { p.bzzzTopic.Close() } if p.antennaeTopic != nil { p.antennaeTopic.Close() } p.dynamicTopicsMux.Lock() for _, topic := range p.dynamicTopics { topic.Close() } p.dynamicTopicsMux.Unlock() return nil }