Harden CHORUS security and messaging stack
This commit is contained in:
443
pubsub/pubsub.go
443
pubsub/pubsub.go
@@ -8,9 +8,10 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"chorus/pkg/shhh"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
)
|
||||
|
||||
// PubSub handles publish/subscribe messaging for Bzzz coordination and HMMM meta-discussion
|
||||
@@ -19,36 +20,42 @@ type PubSub struct {
|
||||
host host.Host
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
|
||||
// Topic subscriptions
|
||||
chorusTopic *pubsub.Topic
|
||||
hmmmTopic *pubsub.Topic
|
||||
contextTopic *pubsub.Topic
|
||||
|
||||
chorusTopic *pubsub.Topic
|
||||
hmmmTopic *pubsub.Topic
|
||||
contextTopic *pubsub.Topic
|
||||
|
||||
// Message subscriptions
|
||||
chorusSub *pubsub.Subscription
|
||||
hmmmSub *pubsub.Subscription
|
||||
contextSub *pubsub.Subscription
|
||||
|
||||
chorusSub *pubsub.Subscription
|
||||
hmmmSub *pubsub.Subscription
|
||||
contextSub *pubsub.Subscription
|
||||
|
||||
// Dynamic topic management
|
||||
dynamicTopics map[string]*pubsub.Topic
|
||||
dynamicTopicsMux sync.RWMutex
|
||||
dynamicSubs map[string]*pubsub.Subscription
|
||||
dynamicSubsMux sync.RWMutex
|
||||
dynamicTopics map[string]*pubsub.Topic
|
||||
dynamicTopicsMux sync.RWMutex
|
||||
dynamicSubs map[string]*pubsub.Subscription
|
||||
dynamicSubsMux sync.RWMutex
|
||||
dynamicHandlers map[string]func([]byte, peer.ID)
|
||||
dynamicHandlersMux sync.RWMutex
|
||||
|
||||
// Configuration
|
||||
chorusTopicName string
|
||||
hmmmTopicName string
|
||||
contextTopicName string
|
||||
chorusTopicName string
|
||||
hmmmTopicName string
|
||||
contextTopicName string
|
||||
|
||||
// External message handler for HMMM messages
|
||||
HmmmMessageHandler func(msg Message, from peer.ID)
|
||||
|
||||
HmmmMessageHandler func(msg Message, from peer.ID)
|
||||
|
||||
// External message handler for Context Feedback messages
|
||||
ContextFeedbackHandler func(msg Message, from peer.ID)
|
||||
|
||||
|
||||
// Hypercore-style logging
|
||||
hypercoreLog HypercoreLogger
|
||||
|
||||
// SHHH sentinel
|
||||
redactor *shhh.Sentinel
|
||||
redactorMux sync.RWMutex
|
||||
}
|
||||
|
||||
// HypercoreLogger interface for dependency injection
|
||||
@@ -62,45 +69,45 @@ type MessageType string
|
||||
|
||||
const (
|
||||
// Bzzz coordination messages
|
||||
TaskAnnouncement MessageType = "task_announcement"
|
||||
TaskClaim MessageType = "task_claim"
|
||||
TaskProgress MessageType = "task_progress"
|
||||
TaskComplete MessageType = "task_complete"
|
||||
CapabilityBcast MessageType = "capability_broadcast" // Only broadcast when capabilities change
|
||||
TaskAnnouncement MessageType = "task_announcement"
|
||||
TaskClaim MessageType = "task_claim"
|
||||
TaskProgress MessageType = "task_progress"
|
||||
TaskComplete MessageType = "task_complete"
|
||||
CapabilityBcast MessageType = "capability_broadcast" // Only broadcast when capabilities change
|
||||
AvailabilityBcast MessageType = "availability_broadcast" // Regular availability status
|
||||
|
||||
|
||||
// HMMM meta-discussion messages
|
||||
MetaDiscussion MessageType = "meta_discussion" // Generic type for all discussion
|
||||
TaskHelpRequest MessageType = "task_help_request" // Request for assistance
|
||||
TaskHelpResponse MessageType = "task_help_response" // Response to a help request
|
||||
CoordinationRequest MessageType = "coordination_request" // Request for coordination
|
||||
CoordinationComplete MessageType = "coordination_complete" // Coordination session completed
|
||||
DependencyAlert MessageType = "dependency_alert" // Dependency detected
|
||||
EscalationTrigger MessageType = "escalation_trigger" // Human escalation needed
|
||||
|
||||
MetaDiscussion MessageType = "meta_discussion" // Generic type for all discussion
|
||||
TaskHelpRequest MessageType = "task_help_request" // Request for assistance
|
||||
TaskHelpResponse MessageType = "task_help_response" // Response to a help request
|
||||
CoordinationRequest MessageType = "coordination_request" // Request for coordination
|
||||
CoordinationComplete MessageType = "coordination_complete" // Coordination session completed
|
||||
DependencyAlert MessageType = "dependency_alert" // Dependency detected
|
||||
EscalationTrigger MessageType = "escalation_trigger" // Human escalation needed
|
||||
|
||||
// Role-based collaboration messages
|
||||
RoleAnnouncement MessageType = "role_announcement" // Agent announces its role and capabilities
|
||||
ExpertiseRequest MessageType = "expertise_request" // Request for specific expertise
|
||||
ExpertiseResponse MessageType = "expertise_response" // Response offering expertise
|
||||
StatusUpdate MessageType = "status_update" // Regular status updates from agents
|
||||
WorkAllocation MessageType = "work_allocation" // Allocation of work to specific roles
|
||||
RoleCollaboration MessageType = "role_collaboration" // Cross-role collaboration message
|
||||
MentorshipRequest MessageType = "mentorship_request" // Junior role requesting mentorship
|
||||
MentorshipResponse MessageType = "mentorship_response" // Senior role providing mentorship
|
||||
ProjectUpdate MessageType = "project_update" // Project-level status updates
|
||||
DeliverableReady MessageType = "deliverable_ready" // Notification that deliverable is complete
|
||||
|
||||
RoleAnnouncement MessageType = "role_announcement" // Agent announces its role and capabilities
|
||||
ExpertiseRequest MessageType = "expertise_request" // Request for specific expertise
|
||||
ExpertiseResponse MessageType = "expertise_response" // Response offering expertise
|
||||
StatusUpdate MessageType = "status_update" // Regular status updates from agents
|
||||
WorkAllocation MessageType = "work_allocation" // Allocation of work to specific roles
|
||||
RoleCollaboration MessageType = "role_collaboration" // Cross-role collaboration message
|
||||
MentorshipRequest MessageType = "mentorship_request" // Junior role requesting mentorship
|
||||
MentorshipResponse MessageType = "mentorship_response" // Senior role providing mentorship
|
||||
ProjectUpdate MessageType = "project_update" // Project-level status updates
|
||||
DeliverableReady MessageType = "deliverable_ready" // Notification that deliverable is complete
|
||||
|
||||
// RL Context Curator feedback messages
|
||||
FeedbackEvent MessageType = "feedback_event" // Context feedback for RL learning
|
||||
ContextRequest MessageType = "context_request" // Request context from HCFS
|
||||
ContextResponse MessageType = "context_response" // Response with context data
|
||||
ContextUsage MessageType = "context_usage" // Report context usage patterns
|
||||
ContextRelevance MessageType = "context_relevance" // Report context relevance scoring
|
||||
|
||||
FeedbackEvent MessageType = "feedback_event" // Context feedback for RL learning
|
||||
ContextRequest MessageType = "context_request" // Request context from HCFS
|
||||
ContextResponse MessageType = "context_response" // Response with context data
|
||||
ContextUsage MessageType = "context_usage" // Report context usage patterns
|
||||
ContextRelevance MessageType = "context_relevance" // Report context relevance scoring
|
||||
|
||||
// SLURP event integration messages
|
||||
SlurpEventGenerated MessageType = "slurp_event_generated" // HMMM consensus generated SLURP event
|
||||
SlurpEventAck MessageType = "slurp_event_ack" // Acknowledgment of SLURP event receipt
|
||||
SlurpContextUpdate MessageType = "slurp_context_update" // Context update from SLURP system
|
||||
SlurpEventGenerated MessageType = "slurp_event_generated" // HMMM consensus generated SLURP event
|
||||
SlurpEventAck MessageType = "slurp_event_ack" // Acknowledgment of SLURP event receipt
|
||||
SlurpContextUpdate MessageType = "slurp_context_update" // Context update from SLURP system
|
||||
)
|
||||
|
||||
// Message represents a Bzzz/Antennae message
|
||||
@@ -110,14 +117,14 @@ type Message struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Data map[string]interface{} `json:"data"`
|
||||
HopCount int `json:"hop_count,omitempty"` // For Antennae hop limiting
|
||||
|
||||
|
||||
// Role-based collaboration fields
|
||||
FromRole string `json:"from_role,omitempty"` // Role of sender
|
||||
ToRoles []string `json:"to_roles,omitempty"` // Target roles
|
||||
FromRole string `json:"from_role,omitempty"` // Role of sender
|
||||
ToRoles []string `json:"to_roles,omitempty"` // Target roles
|
||||
RequiredExpertise []string `json:"required_expertise,omitempty"` // Required expertise areas
|
||||
ProjectID string `json:"project_id,omitempty"` // Associated project
|
||||
Priority string `json:"priority,omitempty"` // Message priority (low, medium, high, urgent)
|
||||
ThreadID string `json:"thread_id,omitempty"` // Conversation thread ID
|
||||
ProjectID string `json:"project_id,omitempty"` // Associated project
|
||||
Priority string `json:"priority,omitempty"` // Message priority (low, medium, high, urgent)
|
||||
ThreadID string `json:"thread_id,omitempty"` // Conversation thread ID
|
||||
}
|
||||
|
||||
// NewPubSub creates a new PubSub instance for Bzzz coordination and HMMM meta-discussion
|
||||
@@ -150,16 +157,17 @@ func NewPubSubWithLogger(ctx context.Context, h host.Host, chorusTopic, hmmmTopi
|
||||
}
|
||||
|
||||
p := &PubSub{
|
||||
ps: ps,
|
||||
host: h,
|
||||
ctx: pubsubCtx,
|
||||
cancel: cancel,
|
||||
chorusTopicName: chorusTopic,
|
||||
ps: ps,
|
||||
host: h,
|
||||
ctx: pubsubCtx,
|
||||
cancel: cancel,
|
||||
chorusTopicName: chorusTopic,
|
||||
hmmmTopicName: hmmmTopic,
|
||||
contextTopicName: contextTopic,
|
||||
dynamicTopics: make(map[string]*pubsub.Topic),
|
||||
dynamicSubs: make(map[string]*pubsub.Subscription),
|
||||
hypercoreLog: logger,
|
||||
dynamicTopics: make(map[string]*pubsub.Topic),
|
||||
dynamicSubs: make(map[string]*pubsub.Subscription),
|
||||
dynamicHandlers: make(map[string]func([]byte, peer.ID)),
|
||||
hypercoreLog: logger,
|
||||
}
|
||||
|
||||
// Join static topics
|
||||
@@ -177,6 +185,13 @@ func NewPubSubWithLogger(ctx context.Context, h host.Host, chorusTopic, hmmmTopi
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// SetRedactor wires the SHHH sentinel so outbound messages are sanitized before publication.
|
||||
func (p *PubSub) SetRedactor(redactor *shhh.Sentinel) {
|
||||
p.redactorMux.Lock()
|
||||
defer p.redactorMux.Unlock()
|
||||
p.redactor = redactor
|
||||
}
|
||||
|
||||
// SetHmmmMessageHandler sets the handler for incoming HMMM messages.
|
||||
func (p *PubSub) SetHmmmMessageHandler(handler func(msg Message, from peer.ID)) {
|
||||
p.HmmmMessageHandler = handler
|
||||
@@ -231,15 +246,21 @@ func (p *PubSub) joinStaticTopics() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// JoinDynamicTopic joins a new topic for a specific task
|
||||
func (p *PubSub) JoinDynamicTopic(topicName string) error {
|
||||
p.dynamicTopicsMux.Lock()
|
||||
defer p.dynamicTopicsMux.Unlock()
|
||||
p.dynamicSubsMux.Lock()
|
||||
defer p.dynamicSubsMux.Unlock()
|
||||
// subscribeDynamicTopic joins a topic and optionally assigns a raw handler.
|
||||
func (p *PubSub) subscribeDynamicTopic(topicName string, handler func([]byte, peer.ID)) error {
|
||||
if topicName == "" {
|
||||
return fmt.Errorf("topic name cannot be empty")
|
||||
}
|
||||
|
||||
if _, exists := p.dynamicTopics[topicName]; exists {
|
||||
return nil // Already joined
|
||||
p.dynamicTopicsMux.RLock()
|
||||
_, exists := p.dynamicTopics[topicName]
|
||||
p.dynamicTopicsMux.RUnlock()
|
||||
|
||||
if exists {
|
||||
p.dynamicHandlersMux.Lock()
|
||||
p.dynamicHandlers[topicName] = handler
|
||||
p.dynamicHandlersMux.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
topic, err := p.ps.Join(topicName)
|
||||
@@ -253,38 +274,68 @@ func (p *PubSub) JoinDynamicTopic(topicName string) error {
|
||||
return fmt.Errorf("failed to subscribe to dynamic topic %s: %w", topicName, err)
|
||||
}
|
||||
|
||||
p.dynamicTopicsMux.Lock()
|
||||
if _, already := p.dynamicTopics[topicName]; already {
|
||||
p.dynamicTopicsMux.Unlock()
|
||||
sub.Cancel()
|
||||
topic.Close()
|
||||
p.dynamicHandlersMux.Lock()
|
||||
p.dynamicHandlers[topicName] = handler
|
||||
p.dynamicHandlersMux.Unlock()
|
||||
return nil
|
||||
}
|
||||
p.dynamicTopics[topicName] = topic
|
||||
p.dynamicSubs[topicName] = sub
|
||||
p.dynamicTopicsMux.Unlock()
|
||||
|
||||
// Start a handler for this new subscription
|
||||
go p.handleDynamicMessages(sub)
|
||||
p.dynamicSubsMux.Lock()
|
||||
p.dynamicSubs[topicName] = sub
|
||||
p.dynamicSubsMux.Unlock()
|
||||
|
||||
p.dynamicHandlersMux.Lock()
|
||||
p.dynamicHandlers[topicName] = handler
|
||||
p.dynamicHandlersMux.Unlock()
|
||||
|
||||
go p.handleDynamicMessages(topicName, sub)
|
||||
|
||||
fmt.Printf("✅ Joined dynamic topic: %s\n", topicName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// JoinDynamicTopic joins a new topic for a specific task
|
||||
func (p *PubSub) JoinDynamicTopic(topicName string) error {
|
||||
return p.subscribeDynamicTopic(topicName, nil)
|
||||
}
|
||||
|
||||
// SubscribeRawTopic joins a topic and delivers raw payloads to the provided handler.
|
||||
func (p *PubSub) SubscribeRawTopic(topicName string, handler func([]byte, peer.ID)) error {
|
||||
if handler == nil {
|
||||
return fmt.Errorf("handler cannot be nil")
|
||||
}
|
||||
return p.subscribeDynamicTopic(topicName, handler)
|
||||
}
|
||||
|
||||
// JoinRoleBasedTopics joins topics based on role and expertise
|
||||
func (p *PubSub) JoinRoleBasedTopics(role string, expertise []string, reportsTo []string) error {
|
||||
var topicsToJoin []string
|
||||
|
||||
|
||||
// Join role-specific topic
|
||||
if role != "" {
|
||||
roleTopic := fmt.Sprintf("CHORUS/roles/%s/v1", strings.ToLower(strings.ReplaceAll(role, " ", "_")))
|
||||
topicsToJoin = append(topicsToJoin, roleTopic)
|
||||
}
|
||||
|
||||
|
||||
// Join expertise-specific topics
|
||||
for _, exp := range expertise {
|
||||
expertiseTopic := fmt.Sprintf("CHORUS/expertise/%s/v1", strings.ToLower(strings.ReplaceAll(exp, " ", "_")))
|
||||
topicsToJoin = append(topicsToJoin, expertiseTopic)
|
||||
}
|
||||
|
||||
|
||||
// Join reporting hierarchy topics
|
||||
for _, supervisor := range reportsTo {
|
||||
supervisorTopic := fmt.Sprintf("CHORUS/hierarchy/%s/v1", strings.ToLower(strings.ReplaceAll(supervisor, " ", "_")))
|
||||
topicsToJoin = append(topicsToJoin, supervisorTopic)
|
||||
}
|
||||
|
||||
|
||||
// Join all identified topics
|
||||
for _, topicName := range topicsToJoin {
|
||||
if err := p.JoinDynamicTopic(topicName); err != nil {
|
||||
@@ -292,7 +343,7 @@ func (p *PubSub) JoinRoleBasedTopics(role string, expertise []string, reportsTo
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fmt.Printf("🎯 Joined %d role-based topics for role: %s\n", len(topicsToJoin), role)
|
||||
return nil
|
||||
}
|
||||
@@ -302,7 +353,7 @@ func (p *PubSub) JoinProjectTopic(projectID string) error {
|
||||
if projectID == "" {
|
||||
return fmt.Errorf("project ID cannot be empty")
|
||||
}
|
||||
|
||||
|
||||
topicName := fmt.Sprintf("CHORUS/projects/%s/coordination/v1", projectID)
|
||||
return p.JoinDynamicTopic(topicName)
|
||||
}
|
||||
@@ -324,6 +375,10 @@ func (p *PubSub) LeaveDynamicTopic(topicName string) {
|
||||
delete(p.dynamicTopics, topicName)
|
||||
}
|
||||
|
||||
p.dynamicHandlersMux.Lock()
|
||||
delete(p.dynamicHandlers, topicName)
|
||||
p.dynamicHandlersMux.Unlock()
|
||||
|
||||
fmt.Printf("🗑️ Left dynamic topic: %s\n", topicName)
|
||||
}
|
||||
|
||||
@@ -337,11 +392,12 @@ func (p *PubSub) PublishToDynamicTopic(topicName string, msgType MessageType, da
|
||||
return fmt.Errorf("not subscribed to dynamic topic: %s", topicName)
|
||||
}
|
||||
|
||||
payload := p.sanitizePayload(topicName, msgType, data)
|
||||
msg := Message{
|
||||
Type: msgType,
|
||||
From: p.host.ID().String(),
|
||||
Timestamp: time.Now(),
|
||||
Data: data,
|
||||
Data: payload,
|
||||
}
|
||||
|
||||
msgBytes, err := json.Marshal(msg)
|
||||
@@ -356,34 +412,35 @@ func (p *PubSub) PublishToDynamicTopic(topicName string, msgType MessageType, da
|
||||
// wrapping it in the CHORUS Message envelope. Intended for HMMM per-issue rooms
|
||||
// or other modules that maintain their own schemas.
|
||||
func (p *PubSub) PublishRaw(topicName string, payload []byte) error {
|
||||
// Dynamic topic
|
||||
p.dynamicTopicsMux.RLock()
|
||||
if topic, exists := p.dynamicTopics[topicName]; exists {
|
||||
p.dynamicTopicsMux.RUnlock()
|
||||
return topic.Publish(p.ctx, payload)
|
||||
}
|
||||
p.dynamicTopicsMux.RUnlock()
|
||||
// Dynamic topic
|
||||
p.dynamicTopicsMux.RLock()
|
||||
if topic, exists := p.dynamicTopics[topicName]; exists {
|
||||
p.dynamicTopicsMux.RUnlock()
|
||||
return topic.Publish(p.ctx, payload)
|
||||
}
|
||||
p.dynamicTopicsMux.RUnlock()
|
||||
|
||||
// Static topics by name
|
||||
switch topicName {
|
||||
case p.chorusTopicName:
|
||||
return p.chorusTopic.Publish(p.ctx, payload)
|
||||
case p.hmmmTopicName:
|
||||
return p.hmmmTopic.Publish(p.ctx, payload)
|
||||
case p.contextTopicName:
|
||||
return p.contextTopic.Publish(p.ctx, payload)
|
||||
default:
|
||||
return fmt.Errorf("not subscribed to topic: %s", topicName)
|
||||
}
|
||||
// Static topics by name
|
||||
switch topicName {
|
||||
case p.chorusTopicName:
|
||||
return p.chorusTopic.Publish(p.ctx, payload)
|
||||
case p.hmmmTopicName:
|
||||
return p.hmmmTopic.Publish(p.ctx, payload)
|
||||
case p.contextTopicName:
|
||||
return p.contextTopic.Publish(p.ctx, payload)
|
||||
default:
|
||||
return fmt.Errorf("not subscribed to topic: %s", topicName)
|
||||
}
|
||||
}
|
||||
|
||||
// PublishBzzzMessage publishes a message to the Bzzz coordination topic
|
||||
func (p *PubSub) PublishBzzzMessage(msgType MessageType, data map[string]interface{}) error {
|
||||
payload := p.sanitizePayload(p.chorusTopicName, msgType, data)
|
||||
msg := Message{
|
||||
Type: msgType,
|
||||
From: p.host.ID().String(),
|
||||
Timestamp: time.Now(),
|
||||
Data: data,
|
||||
Data: payload,
|
||||
}
|
||||
|
||||
msgBytes, err := json.Marshal(msg)
|
||||
@@ -396,11 +453,12 @@ func (p *PubSub) PublishBzzzMessage(msgType MessageType, data map[string]interfa
|
||||
|
||||
// PublishHmmmMessage publishes a message to the HMMM meta-discussion topic
|
||||
func (p *PubSub) PublishHmmmMessage(msgType MessageType, data map[string]interface{}) error {
|
||||
payload := p.sanitizePayload(p.hmmmTopicName, msgType, data)
|
||||
msg := Message{
|
||||
Type: msgType,
|
||||
From: p.host.ID().String(),
|
||||
Timestamp: time.Now(),
|
||||
Data: data,
|
||||
Data: payload,
|
||||
}
|
||||
|
||||
msgBytes, err := json.Marshal(msg)
|
||||
@@ -425,11 +483,12 @@ func (p *PubSub) SetAntennaeMessageHandler(handler func(msg Message, from peer.I
|
||||
|
||||
// PublishContextFeedbackMessage publishes a message to the Context Feedback topic
|
||||
func (p *PubSub) PublishContextFeedbackMessage(msgType MessageType, data map[string]interface{}) error {
|
||||
payload := p.sanitizePayload(p.contextTopicName, msgType, data)
|
||||
msg := Message{
|
||||
Type: msgType,
|
||||
From: p.host.ID().String(),
|
||||
Timestamp: time.Now(),
|
||||
Data: data,
|
||||
Data: payload,
|
||||
}
|
||||
|
||||
msgBytes, err := json.Marshal(msg)
|
||||
@@ -442,11 +501,16 @@ func (p *PubSub) PublishContextFeedbackMessage(msgType MessageType, data map[str
|
||||
|
||||
// PublishRoleBasedMessage publishes a role-based collaboration message
|
||||
func (p *PubSub) PublishRoleBasedMessage(msgType MessageType, data map[string]interface{}, opts MessageOptions) error {
|
||||
topicName := p.chorusTopicName
|
||||
if isRoleMessage(msgType) {
|
||||
topicName = p.hmmmTopicName
|
||||
}
|
||||
payload := p.sanitizePayload(topicName, msgType, data)
|
||||
msg := Message{
|
||||
Type: msgType,
|
||||
From: p.host.ID().String(),
|
||||
Timestamp: time.Now(),
|
||||
Data: data,
|
||||
Data: payload,
|
||||
FromRole: opts.FromRole,
|
||||
ToRoles: opts.ToRoles,
|
||||
RequiredExpertise: opts.RequiredExpertise,
|
||||
@@ -462,10 +526,8 @@ func (p *PubSub) PublishRoleBasedMessage(msgType MessageType, data map[string]in
|
||||
|
||||
// Determine which topic to use based on message type
|
||||
var topic *pubsub.Topic
|
||||
switch msgType {
|
||||
case RoleAnnouncement, ExpertiseRequest, ExpertiseResponse, StatusUpdate,
|
||||
WorkAllocation, RoleCollaboration, MentorshipRequest, MentorshipResponse,
|
||||
ProjectUpdate, DeliverableReady:
|
||||
switch {
|
||||
case isRoleMessage(msgType):
|
||||
topic = p.hmmmTopic // Use HMMM topic for role-based messages
|
||||
default:
|
||||
topic = p.chorusTopic // Default to Bzzz topic
|
||||
@@ -492,14 +554,14 @@ func (p *PubSub) PublishSlurpContextUpdate(data map[string]interface{}) error {
|
||||
// PublishSlurpIntegrationEvent publishes a generic SLURP integration event
|
||||
func (p *PubSub) PublishSlurpIntegrationEvent(eventType string, discussionID string, slurpEvent map[string]interface{}) error {
|
||||
data := map[string]interface{}{
|
||||
"event_type": eventType,
|
||||
"discussion_id": discussionID,
|
||||
"slurp_event": slurpEvent,
|
||||
"timestamp": time.Now(),
|
||||
"source": "hmmm-slurp-integration",
|
||||
"peer_id": p.host.ID().String(),
|
||||
"event_type": eventType,
|
||||
"discussion_id": discussionID,
|
||||
"slurp_event": slurpEvent,
|
||||
"timestamp": time.Now(),
|
||||
"source": "hmmm-slurp-integration",
|
||||
"peer_id": p.host.ID().String(),
|
||||
}
|
||||
|
||||
|
||||
return p.PublishSlurpEventGenerated(data)
|
||||
}
|
||||
|
||||
@@ -604,15 +666,23 @@ func (p *PubSub) handleContextFeedbackMessages() {
|
||||
}
|
||||
}
|
||||
|
||||
// getDynamicHandler returns the raw handler for a topic if registered.
|
||||
func (p *PubSub) getDynamicHandler(topicName string) func([]byte, peer.ID) {
|
||||
p.dynamicHandlersMux.RLock()
|
||||
handler := p.dynamicHandlers[topicName]
|
||||
p.dynamicHandlersMux.RUnlock()
|
||||
return handler
|
||||
}
|
||||
|
||||
// handleDynamicMessages processes messages from a dynamic topic subscription
|
||||
func (p *PubSub) handleDynamicMessages(sub *pubsub.Subscription) {
|
||||
func (p *PubSub) handleDynamicMessages(topicName string, sub *pubsub.Subscription) {
|
||||
for {
|
||||
msg, err := sub.Next(p.ctx)
|
||||
if err != nil {
|
||||
if p.ctx.Err() != nil || err.Error() == "subscription cancelled" {
|
||||
return // Subscription was cancelled, exit handler
|
||||
}
|
||||
fmt.Printf("❌ Error receiving dynamic message: %v\n", err)
|
||||
fmt.Printf("❌ Error receiving dynamic message on %s: %v\n", topicName, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -620,13 +690,18 @@ func (p *PubSub) handleDynamicMessages(sub *pubsub.Subscription) {
|
||||
continue
|
||||
}
|
||||
|
||||
var dynamicMsg Message
|
||||
if err := json.Unmarshal(msg.Data, &dynamicMsg); err != nil {
|
||||
fmt.Printf("❌ Failed to unmarshal dynamic message: %v\n", err)
|
||||
if handler := p.getDynamicHandler(topicName); handler != nil {
|
||||
handler(msg.Data, msg.ReceivedFrom)
|
||||
continue
|
||||
}
|
||||
|
||||
// Use the main HMMM handler for all dynamic messages
|
||||
var dynamicMsg Message
|
||||
if err := json.Unmarshal(msg.Data, &dynamicMsg); err != nil {
|
||||
fmt.Printf("❌ Failed to unmarshal dynamic message on %s: %v\n", topicName, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Use the main HMMM handler for all dynamic messages without custom handlers
|
||||
if p.HmmmMessageHandler != nil {
|
||||
p.HmmmMessageHandler(dynamicMsg, msg.ReceivedFrom)
|
||||
}
|
||||
@@ -636,7 +711,7 @@ func (p *PubSub) handleDynamicMessages(sub *pubsub.Subscription) {
|
||||
// processBzzzMessage handles different types of Bzzz coordination messages
|
||||
func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) {
|
||||
fmt.Printf("🐝 Bzzz [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data)
|
||||
|
||||
|
||||
// Log to hypercore if logger is available
|
||||
if p.hypercoreLog != nil {
|
||||
logData := map[string]interface{}{
|
||||
@@ -647,7 +722,7 @@ func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) {
|
||||
"data": msg.Data,
|
||||
"topic": "CHORUS",
|
||||
}
|
||||
|
||||
|
||||
// Map pubsub message types to hypercore log types
|
||||
var logType string
|
||||
switch msg.Type {
|
||||
@@ -666,7 +741,7 @@ func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) {
|
||||
default:
|
||||
logType = "network_event"
|
||||
}
|
||||
|
||||
|
||||
if err := p.hypercoreLog.AppendString(logType, logData); err != nil {
|
||||
fmt.Printf("❌ Failed to log Bzzz message to hypercore: %v\n", err)
|
||||
}
|
||||
@@ -675,9 +750,9 @@ func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) {
|
||||
|
||||
// processHmmmMessage provides default handling for HMMM messages if no external handler is set
|
||||
func (p *PubSub) processHmmmMessage(msg Message, from peer.ID) {
|
||||
fmt.Printf("🎯 Default HMMM Handler [%s] from %s: %v\n",
|
||||
fmt.Printf("🎯 Default HMMM Handler [%s] from %s: %v\n",
|
||||
msg.Type, from.ShortString(), msg.Data)
|
||||
|
||||
|
||||
// Log to hypercore if logger is available
|
||||
if p.hypercoreLog != nil {
|
||||
logData := map[string]interface{}{
|
||||
@@ -694,7 +769,7 @@ func (p *PubSub) processHmmmMessage(msg Message, from peer.ID) {
|
||||
"priority": msg.Priority,
|
||||
"thread_id": msg.ThreadID,
|
||||
}
|
||||
|
||||
|
||||
// Map pubsub message types to hypercore log types
|
||||
var logType string
|
||||
switch msg.Type {
|
||||
@@ -717,7 +792,7 @@ func (p *PubSub) processHmmmMessage(msg Message, from peer.ID) {
|
||||
default:
|
||||
logType = "collaboration"
|
||||
}
|
||||
|
||||
|
||||
if err := p.hypercoreLog.AppendString(logType, logData); err != nil {
|
||||
fmt.Printf("❌ Failed to log HMMM message to hypercore: %v\n", err)
|
||||
}
|
||||
@@ -726,25 +801,25 @@ func (p *PubSub) processHmmmMessage(msg Message, from peer.ID) {
|
||||
|
||||
// processContextFeedbackMessage provides default handling for context feedback messages if no external handler is set
|
||||
func (p *PubSub) processContextFeedbackMessage(msg Message, from peer.ID) {
|
||||
fmt.Printf("🧠 Context Feedback [%s] from %s: %v\n",
|
||||
fmt.Printf("🧠 Context Feedback [%s] from %s: %v\n",
|
||||
msg.Type, from.ShortString(), msg.Data)
|
||||
|
||||
|
||||
// Log to hypercore if logger is available
|
||||
if p.hypercoreLog != nil {
|
||||
logData := map[string]interface{}{
|
||||
"message_type": string(msg.Type),
|
||||
"from_peer": from.String(),
|
||||
"from_short": from.ShortString(),
|
||||
"timestamp": msg.Timestamp,
|
||||
"data": msg.Data,
|
||||
"topic": "context_feedback",
|
||||
"from_role": msg.FromRole,
|
||||
"to_roles": msg.ToRoles,
|
||||
"project_id": msg.ProjectID,
|
||||
"priority": msg.Priority,
|
||||
"thread_id": msg.ThreadID,
|
||||
"message_type": string(msg.Type),
|
||||
"from_peer": from.String(),
|
||||
"from_short": from.ShortString(),
|
||||
"timestamp": msg.Timestamp,
|
||||
"data": msg.Data,
|
||||
"topic": "context_feedback",
|
||||
"from_role": msg.FromRole,
|
||||
"to_roles": msg.ToRoles,
|
||||
"project_id": msg.ProjectID,
|
||||
"priority": msg.Priority,
|
||||
"thread_id": msg.ThreadID,
|
||||
}
|
||||
|
||||
|
||||
// Map context feedback message types to hypercore log types
|
||||
var logType string
|
||||
switch msg.Type {
|
||||
@@ -757,17 +832,79 @@ func (p *PubSub) processContextFeedbackMessage(msg Message, from peer.ID) {
|
||||
default:
|
||||
logType = "context_feedback"
|
||||
}
|
||||
|
||||
|
||||
if err := p.hypercoreLog.AppendString(logType, logData); err != nil {
|
||||
fmt.Printf("❌ Failed to log Context Feedback message to hypercore: %v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) sanitizePayload(topic string, msgType MessageType, data map[string]interface{}) map[string]interface{} {
|
||||
if data == nil {
|
||||
return nil
|
||||
}
|
||||
cloned := clonePayloadMap(data)
|
||||
p.redactorMux.RLock()
|
||||
redactor := p.redactor
|
||||
p.redactorMux.RUnlock()
|
||||
if redactor != nil {
|
||||
labels := map[string]string{
|
||||
"source": "pubsub",
|
||||
"topic": topic,
|
||||
"message_type": string(msgType),
|
||||
}
|
||||
redactor.RedactMapWithLabels(context.Background(), cloned, labels)
|
||||
}
|
||||
return cloned
|
||||
}
|
||||
|
||||
func isRoleMessage(msgType MessageType) bool {
|
||||
switch msgType {
|
||||
case RoleAnnouncement, ExpertiseRequest, ExpertiseResponse, StatusUpdate,
|
||||
WorkAllocation, RoleCollaboration, MentorshipRequest, MentorshipResponse,
|
||||
ProjectUpdate, DeliverableReady:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func clonePayloadMap(in map[string]interface{}) map[string]interface{} {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := make(map[string]interface{}, len(in))
|
||||
for k, v := range in {
|
||||
out[k] = clonePayloadValue(v)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func clonePayloadValue(v interface{}) interface{} {
|
||||
switch tv := v.(type) {
|
||||
case map[string]interface{}:
|
||||
return clonePayloadMap(tv)
|
||||
case []interface{}:
|
||||
return clonePayloadSlice(tv)
|
||||
case []string:
|
||||
return append([]string(nil), tv...)
|
||||
default:
|
||||
return tv
|
||||
}
|
||||
}
|
||||
|
||||
func clonePayloadSlice(in []interface{}) []interface{} {
|
||||
out := make([]interface{}, len(in))
|
||||
for i, val := range in {
|
||||
out[i] = clonePayloadValue(val)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Close shuts down the PubSub instance
|
||||
func (p *PubSub) Close() error {
|
||||
p.cancel()
|
||||
|
||||
|
||||
if p.chorusSub != nil {
|
||||
p.chorusSub.Cancel()
|
||||
}
|
||||
@@ -777,7 +914,7 @@ func (p *PubSub) Close() error {
|
||||
if p.contextSub != nil {
|
||||
p.contextSub.Cancel()
|
||||
}
|
||||
|
||||
|
||||
if p.chorusTopic != nil {
|
||||
p.chorusTopic.Close()
|
||||
}
|
||||
@@ -787,7 +924,13 @@ func (p *PubSub) Close() error {
|
||||
if p.contextTopic != nil {
|
||||
p.contextTopic.Close()
|
||||
}
|
||||
|
||||
|
||||
p.dynamicSubsMux.Lock()
|
||||
for _, sub := range p.dynamicSubs {
|
||||
sub.Cancel()
|
||||
}
|
||||
p.dynamicSubsMux.Unlock()
|
||||
|
||||
p.dynamicTopicsMux.Lock()
|
||||
for _, topic := range p.dynamicTopics {
|
||||
topic.Close()
|
||||
|
||||
Reference in New Issue
Block a user