package pubsub import ( "context" "encoding/json" "fmt" "strings" "sync" "time" "chorus/pkg/shhh" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" ) // PubSub handles publish/subscribe messaging for Bzzz coordination and HMMM meta-discussion type PubSub struct { ps *pubsub.PubSub host host.Host ctx context.Context cancel context.CancelFunc // Topic subscriptions chorusTopic *pubsub.Topic hmmmTopic *pubsub.Topic contextTopic *pubsub.Topic // Message subscriptions chorusSub *pubsub.Subscription hmmmSub *pubsub.Subscription contextSub *pubsub.Subscription // Dynamic topic management dynamicTopics map[string]*pubsub.Topic dynamicTopicsMux sync.RWMutex dynamicSubs map[string]*pubsub.Subscription dynamicSubsMux sync.RWMutex dynamicHandlers map[string]func([]byte, peer.ID) dynamicHandlersMux sync.RWMutex // Configuration chorusTopicName string hmmmTopicName string contextTopicName string // External message handler for HMMM messages HmmmMessageHandler func(msg Message, from peer.ID) // External message handler for Context Feedback messages ContextFeedbackHandler func(msg Message, from peer.ID) // Hypercore-style logging hypercoreLog HypercoreLogger // SHHH sentinel redactor *shhh.Sentinel redactorMux sync.RWMutex } // HypercoreLogger interface for dependency injection type HypercoreLogger interface { AppendString(logType string, data map[string]interface{}) error GetStats() map[string]interface{} } // MessageType represents different types of messages type MessageType string const ( // Bzzz coordination messages TaskAnnouncement MessageType = "task_announcement" TaskClaim MessageType = "task_claim" TaskProgress MessageType = "task_progress" TaskComplete MessageType = "task_complete" CapabilityBcast MessageType = "capability_broadcast" // Only broadcast when capabilities change AvailabilityBcast MessageType = "availability_broadcast" // Regular availability status // HMMM meta-discussion messages MetaDiscussion MessageType = "meta_discussion" // Generic type for all discussion TaskHelpRequest MessageType = "task_help_request" // Request for assistance TaskHelpResponse MessageType = "task_help_response" // Response to a help request CoordinationRequest MessageType = "coordination_request" // Request for coordination CoordinationComplete MessageType = "coordination_complete" // Coordination session completed DependencyAlert MessageType = "dependency_alert" // Dependency detected EscalationTrigger MessageType = "escalation_trigger" // Human escalation needed // Role-based collaboration messages RoleAnnouncement MessageType = "role_announcement" // Agent announces its role and capabilities ExpertiseRequest MessageType = "expertise_request" // Request for specific expertise ExpertiseResponse MessageType = "expertise_response" // Response offering expertise StatusUpdate MessageType = "status_update" // Regular status updates from agents WorkAllocation MessageType = "work_allocation" // Allocation of work to specific roles RoleCollaboration MessageType = "role_collaboration" // Cross-role collaboration message MentorshipRequest MessageType = "mentorship_request" // Junior role requesting mentorship MentorshipResponse MessageType = "mentorship_response" // Senior role providing mentorship ProjectUpdate MessageType = "project_update" // Project-level status updates DeliverableReady MessageType = "deliverable_ready" // Notification that deliverable is complete // RL Context Curator feedback messages FeedbackEvent MessageType = "feedback_event" // Context feedback for RL learning ContextRequest MessageType = "context_request" // Request context from HCFS ContextResponse MessageType = "context_response" // Response with context data ContextUsage MessageType = "context_usage" // Report context usage patterns ContextRelevance MessageType = "context_relevance" // Report context relevance scoring // SLURP event integration messages SlurpEventGenerated MessageType = "slurp_event_generated" // HMMM consensus generated SLURP event SlurpEventAck MessageType = "slurp_event_ack" // Acknowledgment of SLURP event receipt SlurpContextUpdate MessageType = "slurp_context_update" // Context update from SLURP system ) // Message represents a Bzzz/Antennae message type Message struct { Type MessageType `json:"type"` From string `json:"from"` Timestamp time.Time `json:"timestamp"` Data map[string]interface{} `json:"data"` HopCount int `json:"hop_count,omitempty"` // For Antennae hop limiting // Role-based collaboration fields FromRole string `json:"from_role,omitempty"` // Role of sender ToRoles []string `json:"to_roles,omitempty"` // Target roles RequiredExpertise []string `json:"required_expertise,omitempty"` // Required expertise areas ProjectID string `json:"project_id,omitempty"` // Associated project Priority string `json:"priority,omitempty"` // Message priority (low, medium, high, urgent) ThreadID string `json:"thread_id,omitempty"` // Conversation thread ID } // NewPubSub creates a new PubSub instance for Bzzz coordination and HMMM meta-discussion func NewPubSub(ctx context.Context, h host.Host, chorusTopic, hmmmTopic string) (*PubSub, error) { return NewPubSubWithLogger(ctx, h, chorusTopic, hmmmTopic, nil) } // NewPubSubWithLogger creates a new PubSub instance with hypercore logging func NewPubSubWithLogger(ctx context.Context, h host.Host, chorusTopic, hmmmTopic string, logger HypercoreLogger) (*PubSub, error) { if chorusTopic == "" { chorusTopic = "CHORUS/coordination/v1" } if hmmmTopic == "" { hmmmTopic = "hmmm/meta-discussion/v1" } contextTopic := "CHORUS/context-feedback/v1" pubsubCtx, cancel := context.WithCancel(ctx) // Create gossipsub instance with message validation ps, err := pubsub.NewGossipSub(pubsubCtx, h, pubsub.WithMessageSigning(true), pubsub.WithStrictSignatureVerification(true), pubsub.WithValidateQueueSize(256), pubsub.WithValidateThrottle(1024), ) if err != nil { cancel() return nil, fmt.Errorf("failed to create gossipsub: %w", err) } p := &PubSub{ ps: ps, host: h, ctx: pubsubCtx, cancel: cancel, chorusTopicName: chorusTopic, hmmmTopicName: hmmmTopic, contextTopicName: contextTopic, dynamicTopics: make(map[string]*pubsub.Topic), dynamicSubs: make(map[string]*pubsub.Subscription), dynamicHandlers: make(map[string]func([]byte, peer.ID)), hypercoreLog: logger, } // Join static topics if err := p.joinStaticTopics(); err != nil { cancel() return nil, err } // Start message handlers go p.handleBzzzMessages() go p.handleHmmmMessages() go p.handleContextFeedbackMessages() fmt.Printf("📡 PubSub initialized - Bzzz: %s, HMMM: %s, Context: %s\n", chorusTopic, hmmmTopic, contextTopic) return p, nil } // SetRedactor wires the SHHH sentinel so outbound messages are sanitized before publication. func (p *PubSub) SetRedactor(redactor *shhh.Sentinel) { p.redactorMux.Lock() defer p.redactorMux.Unlock() p.redactor = redactor } // SetHmmmMessageHandler sets the handler for incoming HMMM messages. func (p *PubSub) SetHmmmMessageHandler(handler func(msg Message, from peer.ID)) { p.HmmmMessageHandler = handler } // SetContextFeedbackHandler sets the handler for incoming context feedback messages. func (p *PubSub) SetContextFeedbackHandler(handler func(msg Message, from peer.ID)) { p.ContextFeedbackHandler = handler } // joinStaticTopics joins the main Bzzz, HMMM, and Context Feedback topics func (p *PubSub) joinStaticTopics() error { // Join Bzzz coordination topic chorusTopic, err := p.ps.Join(p.chorusTopicName) if err != nil { return fmt.Errorf("failed to join Bzzz topic: %w", err) } p.chorusTopic = chorusTopic chorusSub, err := chorusTopic.Subscribe() if err != nil { return fmt.Errorf("failed to subscribe to Bzzz topic: %w", err) } p.chorusSub = chorusSub // Join HMMM meta-discussion topic hmmmTopic, err := p.ps.Join(p.hmmmTopicName) if err != nil { return fmt.Errorf("failed to join HMMM topic: %w", err) } p.hmmmTopic = hmmmTopic hmmmSub, err := hmmmTopic.Subscribe() if err != nil { return fmt.Errorf("failed to subscribe to HMMM topic: %w", err) } p.hmmmSub = hmmmSub // Join Context Feedback topic contextTopic, err := p.ps.Join(p.contextTopicName) if err != nil { return fmt.Errorf("failed to join Context Feedback topic: %w", err) } p.contextTopic = contextTopic contextSub, err := contextTopic.Subscribe() if err != nil { return fmt.Errorf("failed to subscribe to Context Feedback topic: %w", err) } p.contextSub = contextSub return nil } // subscribeDynamicTopic joins a topic and optionally assigns a raw handler. func (p *PubSub) subscribeDynamicTopic(topicName string, handler func([]byte, peer.ID)) error { if topicName == "" { return fmt.Errorf("topic name cannot be empty") } p.dynamicTopicsMux.RLock() _, exists := p.dynamicTopics[topicName] p.dynamicTopicsMux.RUnlock() if exists { p.dynamicHandlersMux.Lock() p.dynamicHandlers[topicName] = handler p.dynamicHandlersMux.Unlock() return nil } topic, err := p.ps.Join(topicName) if err != nil { return fmt.Errorf("failed to join dynamic topic %s: %w", topicName, err) } sub, err := topic.Subscribe() if err != nil { topic.Close() return fmt.Errorf("failed to subscribe to dynamic topic %s: %w", topicName, err) } p.dynamicTopicsMux.Lock() if _, already := p.dynamicTopics[topicName]; already { p.dynamicTopicsMux.Unlock() sub.Cancel() topic.Close() p.dynamicHandlersMux.Lock() p.dynamicHandlers[topicName] = handler p.dynamicHandlersMux.Unlock() return nil } p.dynamicTopics[topicName] = topic p.dynamicTopicsMux.Unlock() p.dynamicSubsMux.Lock() p.dynamicSubs[topicName] = sub p.dynamicSubsMux.Unlock() p.dynamicHandlersMux.Lock() p.dynamicHandlers[topicName] = handler p.dynamicHandlersMux.Unlock() go p.handleDynamicMessages(topicName, sub) fmt.Printf("✅ Joined dynamic topic: %s\n", topicName) return nil } // JoinDynamicTopic joins a new topic for a specific task func (p *PubSub) JoinDynamicTopic(topicName string) error { return p.subscribeDynamicTopic(topicName, nil) } // SubscribeRawTopic joins a topic and delivers raw payloads to the provided handler. func (p *PubSub) SubscribeRawTopic(topicName string, handler func([]byte, peer.ID)) error { if handler == nil { return fmt.Errorf("handler cannot be nil") } return p.subscribeDynamicTopic(topicName, handler) } // JoinRoleBasedTopics joins topics based on role and expertise func (p *PubSub) JoinRoleBasedTopics(role string, expertise []string, reportsTo []string) error { var topicsToJoin []string // Join role-specific topic if role != "" { roleTopic := fmt.Sprintf("CHORUS/roles/%s/v1", strings.ToLower(strings.ReplaceAll(role, " ", "_"))) topicsToJoin = append(topicsToJoin, roleTopic) } // Join expertise-specific topics for _, exp := range expertise { expertiseTopic := fmt.Sprintf("CHORUS/expertise/%s/v1", strings.ToLower(strings.ReplaceAll(exp, " ", "_"))) topicsToJoin = append(topicsToJoin, expertiseTopic) } // Join reporting hierarchy topics for _, supervisor := range reportsTo { supervisorTopic := fmt.Sprintf("CHORUS/hierarchy/%s/v1", strings.ToLower(strings.ReplaceAll(supervisor, " ", "_"))) topicsToJoin = append(topicsToJoin, supervisorTopic) } // Join all identified topics for _, topicName := range topicsToJoin { if err := p.JoinDynamicTopic(topicName); err != nil { fmt.Printf("⚠️ Failed to join role-based topic %s: %v\n", topicName, err) continue } } fmt.Printf("🎯 Joined %d role-based topics for role: %s\n", len(topicsToJoin), role) return nil } // JoinProjectTopic joins a project-specific topic func (p *PubSub) JoinProjectTopic(projectID string) error { if projectID == "" { return fmt.Errorf("project ID cannot be empty") } topicName := fmt.Sprintf("CHORUS/projects/%s/coordination/v1", projectID) return p.JoinDynamicTopic(topicName) } // LeaveDynamicTopic leaves a specific task topic func (p *PubSub) LeaveDynamicTopic(topicName string) { p.dynamicTopicsMux.Lock() defer p.dynamicTopicsMux.Unlock() p.dynamicSubsMux.Lock() defer p.dynamicSubsMux.Unlock() if sub, exists := p.dynamicSubs[topicName]; exists { sub.Cancel() delete(p.dynamicSubs, topicName) } if topic, exists := p.dynamicTopics[topicName]; exists { topic.Close() delete(p.dynamicTopics, topicName) } p.dynamicHandlersMux.Lock() delete(p.dynamicHandlers, topicName) p.dynamicHandlersMux.Unlock() fmt.Printf("🗑️ Left dynamic topic: %s\n", topicName) } // PublishToDynamicTopic publishes a message to a specific dynamic topic func (p *PubSub) PublishToDynamicTopic(topicName string, msgType MessageType, data map[string]interface{}) error { p.dynamicTopicsMux.RLock() topic, exists := p.dynamicTopics[topicName] p.dynamicTopicsMux.RUnlock() if !exists { return fmt.Errorf("not subscribed to dynamic topic: %s", topicName) } payload := p.sanitizePayload(topicName, msgType, data) msg := Message{ Type: msgType, From: p.host.ID().String(), Timestamp: time.Now(), Data: payload, } msgBytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal message for dynamic topic: %w", err) } return topic.Publish(p.ctx, msgBytes) } // PublishRaw publishes a raw JSON payload to the given topic name without // wrapping it in the CHORUS Message envelope. Intended for HMMM per-issue rooms // or other modules that maintain their own schemas. func (p *PubSub) PublishRaw(topicName string, payload []byte) error { // Dynamic topic p.dynamicTopicsMux.RLock() if topic, exists := p.dynamicTopics[topicName]; exists { p.dynamicTopicsMux.RUnlock() return topic.Publish(p.ctx, payload) } p.dynamicTopicsMux.RUnlock() // Static topics by name switch topicName { case p.chorusTopicName: return p.chorusTopic.Publish(p.ctx, payload) case p.hmmmTopicName: return p.hmmmTopic.Publish(p.ctx, payload) case p.contextTopicName: return p.contextTopic.Publish(p.ctx, payload) default: return fmt.Errorf("not subscribed to topic: %s", topicName) } } // PublishBzzzMessage publishes a message to the Bzzz coordination topic func (p *PubSub) PublishBzzzMessage(msgType MessageType, data map[string]interface{}) error { payload := p.sanitizePayload(p.chorusTopicName, msgType, data) msg := Message{ Type: msgType, From: p.host.ID().String(), Timestamp: time.Now(), Data: payload, } msgBytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal message: %w", err) } return p.chorusTopic.Publish(p.ctx, msgBytes) } // PublishHmmmMessage publishes a message to the HMMM meta-discussion topic func (p *PubSub) PublishHmmmMessage(msgType MessageType, data map[string]interface{}) error { payload := p.sanitizePayload(p.hmmmTopicName, msgType, data) msg := Message{ Type: msgType, From: p.host.ID().String(), Timestamp: time.Now(), Data: payload, } msgBytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal message: %w", err) } return p.hmmmTopic.Publish(p.ctx, msgBytes) } // PublishAntennaeMessage is a compatibility alias for PublishHmmmMessage // Deprecated: Use PublishHmmmMessage instead func (p *PubSub) PublishAntennaeMessage(msgType MessageType, data map[string]interface{}) error { return p.PublishHmmmMessage(msgType, data) } // SetAntennaeMessageHandler is a compatibility alias for SetHmmmMessageHandler // Deprecated: Use SetHmmmMessageHandler instead func (p *PubSub) SetAntennaeMessageHandler(handler func(msg Message, from peer.ID)) { p.SetHmmmMessageHandler(handler) } // PublishContextFeedbackMessage publishes a message to the Context Feedback topic func (p *PubSub) PublishContextFeedbackMessage(msgType MessageType, data map[string]interface{}) error { payload := p.sanitizePayload(p.contextTopicName, msgType, data) msg := Message{ Type: msgType, From: p.host.ID().String(), Timestamp: time.Now(), Data: payload, } msgBytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal context feedback message: %w", err) } return p.contextTopic.Publish(p.ctx, msgBytes) } // PublishRoleBasedMessage publishes a role-based collaboration message func (p *PubSub) PublishRoleBasedMessage(msgType MessageType, data map[string]interface{}, opts MessageOptions) error { topicName := p.chorusTopicName if isRoleMessage(msgType) { topicName = p.hmmmTopicName } payload := p.sanitizePayload(topicName, msgType, data) msg := Message{ Type: msgType, From: p.host.ID().String(), Timestamp: time.Now(), Data: payload, FromRole: opts.FromRole, ToRoles: opts.ToRoles, RequiredExpertise: opts.RequiredExpertise, ProjectID: opts.ProjectID, Priority: opts.Priority, ThreadID: opts.ThreadID, } msgBytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal role-based message: %w", err) } // Determine which topic to use based on message type var topic *pubsub.Topic switch { case isRoleMessage(msgType): topic = p.hmmmTopic // Use HMMM topic for role-based messages default: topic = p.chorusTopic // Default to Bzzz topic } return topic.Publish(p.ctx, msgBytes) } // PublishSlurpEventGenerated publishes a SLURP event generation notification func (p *PubSub) PublishSlurpEventGenerated(data map[string]interface{}) error { return p.PublishHmmmMessage(SlurpEventGenerated, data) } // PublishSlurpEventAck publishes a SLURP event acknowledgment func (p *PubSub) PublishSlurpEventAck(data map[string]interface{}) error { return p.PublishHmmmMessage(SlurpEventAck, data) } // PublishSlurpContextUpdate publishes a SLURP context update notification func (p *PubSub) PublishSlurpContextUpdate(data map[string]interface{}) error { return p.PublishHmmmMessage(SlurpContextUpdate, data) } // PublishSlurpIntegrationEvent publishes a generic SLURP integration event func (p *PubSub) PublishSlurpIntegrationEvent(eventType string, discussionID string, slurpEvent map[string]interface{}) error { data := map[string]interface{}{ "event_type": eventType, "discussion_id": discussionID, "slurp_event": slurpEvent, "timestamp": time.Now(), "source": "hmmm-slurp-integration", "peer_id": p.host.ID().String(), } return p.PublishSlurpEventGenerated(data) } // GetHypercoreLog returns the hypercore logger for external access func (p *PubSub) GetHypercoreLog() HypercoreLogger { return p.hypercoreLog } // MessageOptions holds options for role-based messages type MessageOptions struct { FromRole string ToRoles []string RequiredExpertise []string ProjectID string Priority string ThreadID string } // handleBzzzMessages processes incoming Bzzz coordination messages func (p *PubSub) handleBzzzMessages() { for { msg, err := p.chorusSub.Next(p.ctx) if err != nil { if p.ctx.Err() != nil { return // Context cancelled } fmt.Printf("❌ Error receiving Bzzz message: %v\n", err) continue } if msg.ReceivedFrom == p.host.ID() { continue } var chorusMsg Message if err := json.Unmarshal(msg.Data, &chorusMsg); err != nil { fmt.Printf("❌ Failed to unmarshal Bzzz message: %v\n", err) continue } p.processBzzzMessage(chorusMsg, msg.ReceivedFrom) } } // handleHmmmMessages processes incoming HMMM meta-discussion messages func (p *PubSub) handleHmmmMessages() { for { msg, err := p.hmmmSub.Next(p.ctx) if err != nil { if p.ctx.Err() != nil { return // Context cancelled } fmt.Printf("❌ Error receiving HMMM message: %v\n", err) continue } if msg.ReceivedFrom == p.host.ID() { continue } var hmmmMsg Message if err := json.Unmarshal(msg.Data, &hmmmMsg); err != nil { fmt.Printf("❌ Failed to unmarshal HMMM message: %v\n", err) continue } if p.HmmmMessageHandler != nil { p.HmmmMessageHandler(hmmmMsg, msg.ReceivedFrom) } else { p.processHmmmMessage(hmmmMsg, msg.ReceivedFrom) } } } // handleContextFeedbackMessages processes incoming context feedback messages func (p *PubSub) handleContextFeedbackMessages() { for { msg, err := p.contextSub.Next(p.ctx) if err != nil { if p.ctx.Err() != nil { return // Context cancelled } fmt.Printf("❌ Error receiving Context Feedback message: %v\n", err) continue } if msg.ReceivedFrom == p.host.ID() { continue } var contextMsg Message if err := json.Unmarshal(msg.Data, &contextMsg); err != nil { fmt.Printf("❌ Failed to unmarshal Context Feedback message: %v\n", err) continue } if p.ContextFeedbackHandler != nil { p.ContextFeedbackHandler(contextMsg, msg.ReceivedFrom) } else { p.processContextFeedbackMessage(contextMsg, msg.ReceivedFrom) } } } // getDynamicHandler returns the raw handler for a topic if registered. func (p *PubSub) getDynamicHandler(topicName string) func([]byte, peer.ID) { p.dynamicHandlersMux.RLock() handler := p.dynamicHandlers[topicName] p.dynamicHandlersMux.RUnlock() return handler } // handleDynamicMessages processes messages from a dynamic topic subscription func (p *PubSub) handleDynamicMessages(topicName string, sub *pubsub.Subscription) { for { msg, err := sub.Next(p.ctx) if err != nil { if p.ctx.Err() != nil || err.Error() == "subscription cancelled" { return // Subscription was cancelled, exit handler } fmt.Printf("❌ Error receiving dynamic message on %s: %v\n", topicName, err) continue } if msg.ReceivedFrom == p.host.ID() { continue } if handler := p.getDynamicHandler(topicName); handler != nil { handler(msg.Data, msg.ReceivedFrom) continue } var dynamicMsg Message if err := json.Unmarshal(msg.Data, &dynamicMsg); err != nil { fmt.Printf("❌ Failed to unmarshal dynamic message on %s: %v\n", topicName, err) continue } // Use the main HMMM handler for all dynamic messages without custom handlers if p.HmmmMessageHandler != nil { p.HmmmMessageHandler(dynamicMsg, msg.ReceivedFrom) } } } // processBzzzMessage handles different types of Bzzz coordination messages func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) { fmt.Printf("🐝 Bzzz [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data) // Log to hypercore if logger is available if p.hypercoreLog != nil { logData := map[string]interface{}{ "message_type": string(msg.Type), "from_peer": from.String(), "from_short": from.ShortString(), "timestamp": msg.Timestamp, "data": msg.Data, "topic": "CHORUS", } // Map pubsub message types to hypercore log types var logType string switch msg.Type { case TaskAnnouncement: logType = "task_announced" case TaskClaim: logType = "task_claimed" case TaskProgress: logType = "task_progress" case TaskComplete: logType = "task_completed" case CapabilityBcast: logType = "capability_broadcast" case AvailabilityBcast: logType = "network_event" default: logType = "network_event" } if err := p.hypercoreLog.AppendString(logType, logData); err != nil { fmt.Printf("❌ Failed to log Bzzz message to hypercore: %v\n", err) } } } // processHmmmMessage provides default handling for HMMM messages if no external handler is set func (p *PubSub) processHmmmMessage(msg Message, from peer.ID) { fmt.Printf("🎯 Default HMMM Handler [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data) // Log to hypercore if logger is available if p.hypercoreLog != nil { logData := map[string]interface{}{ "message_type": string(msg.Type), "from_peer": from.String(), "from_short": from.ShortString(), "timestamp": msg.Timestamp, "data": msg.Data, "topic": "hmmm", "from_role": msg.FromRole, "to_roles": msg.ToRoles, "required_expertise": msg.RequiredExpertise, "project_id": msg.ProjectID, "priority": msg.Priority, "thread_id": msg.ThreadID, } // Map pubsub message types to hypercore log types var logType string switch msg.Type { case MetaDiscussion, TaskHelpRequest, TaskHelpResponse: logType = "collaboration" case CoordinationRequest, CoordinationComplete: logType = "collaboration" case DependencyAlert: logType = "collaboration" case EscalationTrigger: logType = "escalation" case RoleAnnouncement, ExpertiseRequest, ExpertiseResponse: logType = "collaboration" case StatusUpdate, WorkAllocation, RoleCollaboration: logType = "collaboration" case MentorshipRequest, MentorshipResponse: logType = "collaboration" case ProjectUpdate, DeliverableReady: logType = "collaboration" default: logType = "collaboration" } if err := p.hypercoreLog.AppendString(logType, logData); err != nil { fmt.Printf("❌ Failed to log HMMM message to hypercore: %v\n", err) } } } // processContextFeedbackMessage provides default handling for context feedback messages if no external handler is set func (p *PubSub) processContextFeedbackMessage(msg Message, from peer.ID) { fmt.Printf("🧠 Context Feedback [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data) // Log to hypercore if logger is available if p.hypercoreLog != nil { logData := map[string]interface{}{ "message_type": string(msg.Type), "from_peer": from.String(), "from_short": from.ShortString(), "timestamp": msg.Timestamp, "data": msg.Data, "topic": "context_feedback", "from_role": msg.FromRole, "to_roles": msg.ToRoles, "project_id": msg.ProjectID, "priority": msg.Priority, "thread_id": msg.ThreadID, } // Map context feedback message types to hypercore log types var logType string switch msg.Type { case FeedbackEvent: logType = "context_feedback" case ContextRequest, ContextResponse: logType = "context_request" case ContextUsage, ContextRelevance: logType = "context_usage" default: logType = "context_feedback" } if err := p.hypercoreLog.AppendString(logType, logData); err != nil { fmt.Printf("❌ Failed to log Context Feedback message to hypercore: %v\n", err) } } } func (p *PubSub) sanitizePayload(topic string, msgType MessageType, data map[string]interface{}) map[string]interface{} { if data == nil { return nil } cloned := clonePayloadMap(data) p.redactorMux.RLock() redactor := p.redactor p.redactorMux.RUnlock() if redactor != nil { labels := map[string]string{ "source": "pubsub", "topic": topic, "message_type": string(msgType), } redactor.RedactMapWithLabels(context.Background(), cloned, labels) } return cloned } func isRoleMessage(msgType MessageType) bool { switch msgType { case RoleAnnouncement, ExpertiseRequest, ExpertiseResponse, StatusUpdate, WorkAllocation, RoleCollaboration, MentorshipRequest, MentorshipResponse, ProjectUpdate, DeliverableReady: return true default: return false } } func clonePayloadMap(in map[string]interface{}) map[string]interface{} { if in == nil { return nil } out := make(map[string]interface{}, len(in)) for k, v := range in { out[k] = clonePayloadValue(v) } return out } func clonePayloadValue(v interface{}) interface{} { switch tv := v.(type) { case map[string]interface{}: return clonePayloadMap(tv) case []interface{}: return clonePayloadSlice(tv) case []string: return append([]string(nil), tv...) default: return tv } } func clonePayloadSlice(in []interface{}) []interface{} { out := make([]interface{}, len(in)) for i, val := range in { out[i] = clonePayloadValue(val) } return out } // Close shuts down the PubSub instance func (p *PubSub) Close() error { p.cancel() if p.chorusSub != nil { p.chorusSub.Cancel() } if p.hmmmSub != nil { p.hmmmSub.Cancel() } if p.contextSub != nil { p.contextSub.Cancel() } if p.chorusTopic != nil { p.chorusTopic.Close() } if p.hmmmTopic != nil { p.hmmmTopic.Close() } if p.contextTopic != nil { p.contextTopic.Close() } p.dynamicSubsMux.Lock() for _, sub := range p.dynamicSubs { sub.Cancel() } p.dynamicSubsMux.Unlock() p.dynamicTopicsMux.Lock() for _, topic := range p.dynamicTopics { topic.Close() } p.dynamicTopicsMux.Unlock() return nil }