# PubSub Package ## Overview The `pubsub` package provides a libp2p GossipSub-based publish/subscribe messaging infrastructure for CHORUS. It enables distributed coordination through multiple topic types, supporting task coordination (Bzzz), meta-discussion (HMMM), context feedback (RL learning), and role-based collaboration across the autonomous agent network. **Package Path:** `/home/tony/chorus/project-queues/active/CHORUS/pubsub/` **Key Features:** - Three static topics (Bzzz coordination, HMMM meta-discussion, Context feedback) - Dynamic per-task, per-issue, and per-project topic management - Role-based topic routing (roles, expertise, reporting hierarchy) - 31+ message types for different coordination scenarios - SHHH redaction integration for sensitive data - Hypercore logging integration for event persistence - Raw message publication for custom schemas - HMMM adapter for per-issue room communication ## Architecture ### Core Components ``` PubSub ├── Static Topics │ ├── chorusTopic - "CHORUS/coordination/v1" (Bzzz task coordination) │ ├── hmmmTopic - "hmmm/meta-discussion/v1" (HMMM meta-discussion) │ └── contextTopic - "CHORUS/context-feedback/v1" (RL context feedback) ├── Dynamic Topics │ ├── dynamicTopics - map[string]*pubsub.Topic │ ├── dynamicSubs - map[string]*pubsub.Subscription │ └── dynamicHandlers - map[string]func([]byte, peer.ID) ├── Message Handlers │ ├── HmmmMessageHandler - External HMMM handler │ └── ContextFeedbackHandler - External context handler ├── Integration │ ├── hypercoreLog - HypercoreLogger for event persistence │ └── redactor - *shhh.Sentinel for message sanitization └── Adapters └── GossipPublisher - HMMM adapter for per-issue topics ``` ## Message Types ### Bzzz Coordination Messages (6 types) Task coordination and agent availability messages published to `CHORUS/coordination/v1`: | Message Type | Purpose | Usage | |--------------|---------|-------| | `TaskAnnouncement` | New task available for claiming | Broadcast when task created | | `TaskClaim` | Agent claims a task | Response to TaskAnnouncement | | `TaskProgress` | Task progress update | Periodic updates during execution | | `TaskComplete` | Task completed successfully | Final status notification | | `CapabilityBcast` | Agent capability announcement | Broadcast when capabilities change | | `AvailabilityBcast` | Agent availability status | Regular heartbeat (30s intervals) | ### HMMM Meta-Discussion Messages (7 types) Agent-to-agent meta-discussion published to `hmmm/meta-discussion/v1`: | Message Type | Purpose | Usage | |--------------|---------|-------| | `MetaDiscussion` | Generic discussion message | General coordination discussion | | `TaskHelpRequest` | Request assistance from peers | When agent needs help | | `TaskHelpResponse` | Response to help request | Offer assistance | | `CoordinationRequest` | Request coordination session | Multi-agent coordination | | `CoordinationComplete` | Coordination session finished | Session completion | | `DependencyAlert` | Dependency detected | Alert about task dependencies | | `EscalationTrigger` | Human escalation needed | Critical issues requiring human | ### Role-Based Collaboration Messages (10 types) Role-based collaboration published to `hmmm/meta-discussion/v1`: | Message Type | Purpose | Usage | |--------------|---------|-------| | `RoleAnnouncement` | Agent announces role/capabilities | Agent startup | | `ExpertiseRequest` | Request specific expertise | Need domain knowledge | | `ExpertiseResponse` | Offer expertise | Response to request | | `StatusUpdate` | Regular status updates | Periodic role status | | `WorkAllocation` | Allocate work to roles | Task distribution | | `RoleCollaboration` | Cross-role collaboration | Multi-role coordination | | `MentorshipRequest` | Junior seeks mentorship | Learning assistance | | `MentorshipResponse` | Senior provides mentorship | Teaching response | | `ProjectUpdate` | Project-level status | Project progress | | `DeliverableReady` | Deliverable complete | Work product ready | ### Context Feedback Messages (5 types) RL Context Curator feedback published to `CHORUS/context-feedback/v1`: | Message Type | Purpose | Usage | |--------------|---------|-------| | `FeedbackEvent` | Context feedback for RL | Reinforcement learning signals | | `ContextRequest` | Request context from HCFS | Query context system | | `ContextResponse` | Context data response | HCFS response | | `ContextUsage` | Context usage patterns | Usage metrics | | `ContextRelevance` | Context relevance scoring | Relevance feedback | ### SLURP Event Integration Messages (3 types) HMMM-SLURP integration published to `hmmm/meta-discussion/v1`: | Message Type | Purpose | Usage | |--------------|---------|-------| | `SlurpEventGenerated` | HMMM consensus generated event | SLURP event creation | | `SlurpEventAck` | Acknowledge SLURP event | Receipt confirmation | | `SlurpContextUpdate` | Context update from SLURP | SLURP context sync | ## Topic Naming Conventions ### Static Topics ``` CHORUS/coordination/v1 - Bzzz task coordination hmmm/meta-discussion/v1 - HMMM meta-discussion CHORUS/context-feedback/v1 - Context feedback (RL) ``` ### Dynamic Topic Patterns ``` CHORUS/roles//v1 - Role-specific (e.g., "developer", "architect") CHORUS/expertise//v1 - Expertise-specific (e.g., "golang", "kubernetes") CHORUS/hierarchy//v1 - Reporting hierarchy CHORUS/projects//coordination/v1 - Project-specific CHORUS/meta/issue/ - Per-issue HMMM rooms (custom schema) - Any custom topic for specialized needs ``` ### Topic Naming Rules 1. Use lowercase with underscores for multi-word identifiers 2. Version suffix `/v1` for future compatibility 3. Prefix with `CHORUS/` for CHORUS-specific topics 4. Prefix with `hmmm/` for HMMM-specific topics 5. Use hierarchical structure for discoverability ## Message Format ### Standard CHORUS Message Envelope ```go type Message struct { Type MessageType `json:"type"` // Message type constant From string `json:"from"` // Peer ID of sender Timestamp time.Time `json:"timestamp"` // Message timestamp Data map[string]interface{} `json:"data"` // Message payload HopCount int `json:"hop_count,omitempty"` // Antennae hop limiting // Role-based collaboration fields FromRole string `json:"from_role,omitempty"` // Role of sender ToRoles []string `json:"to_roles,omitempty"` // Target roles RequiredExpertise []string `json:"required_expertise,omitempty"` // Required expertise ProjectID string `json:"project_id,omitempty"` // Associated project Priority string `json:"priority,omitempty"` // low, medium, high, urgent ThreadID string `json:"thread_id,omitempty"` // Conversation thread } ``` ### Message Publishing Messages are automatically wrapped in the standard envelope when using: - `PublishBzzzMessage()` - `PublishHmmmMessage()` - `PublishContextFeedbackMessage()` - `PublishToDynamicTopic()` - `PublishRoleBasedMessage()` For custom schemas (e.g., HMMM per-issue rooms), use `PublishRaw()` to bypass the envelope. ## GossipSub Configuration ### Validation and Security ```go pubsub.NewGossipSub(ctx, h, pubsub.WithMessageSigning(true), // Sign all messages pubsub.WithStrictSignatureVerification(true), // Verify signatures pubsub.WithValidateQueueSize(256), // Validation queue size pubsub.WithValidateThrottle(1024), // Validation throughput ) ``` ### Security Features - **Message Signing:** All messages cryptographically signed by sender - **Signature Verification:** Strict verification prevents impersonation - **SHHH Redaction:** Automatic sanitization of sensitive data before publication - **Validation Queue:** 256 messages buffered for validation - **Validation Throttle:** Process up to 1024 validations concurrently ### Network Properties - **Protocol:** libp2p GossipSub (epidemic broadcast) - **Delivery:** Best-effort, eventually consistent - **Ordering:** No guaranteed message ordering - **Reliability:** At-most-once delivery (use ACK patterns for reliability) ## API Reference ### Initialization #### NewPubSub ```go func NewPubSub(ctx context.Context, h host.Host, chorusTopic, hmmmTopic string) (*PubSub, error) ``` Creates a new PubSub instance with static topics. **Parameters:** - `ctx` - Context for lifecycle management - `h` - libp2p Host instance - `chorusTopic` - Bzzz coordination topic (default: "CHORUS/coordination/v1") - `hmmmTopic` - HMMM meta-discussion topic (default: "hmmm/meta-discussion/v1") **Returns:** PubSub instance or error **Example:** ```go ps, err := pubsub.NewPubSub(ctx, node.Host(), "CHORUS/coordination/v1", "hmmm/meta-discussion/v1") if err != nil { log.Fatal(err) } defer ps.Close() ``` #### NewPubSubWithLogger ```go func NewPubSubWithLogger(ctx context.Context, h host.Host, chorusTopic, hmmmTopic string, logger HypercoreLogger) (*PubSub, error) ``` Creates PubSub with hypercore logging integration. **Parameters:** - Same as NewPubSub, plus: - `logger` - HypercoreLogger implementation for event persistence **Example:** ```go ps, err := pubsub.NewPubSubWithLogger(ctx, node.Host(), "chorus/coordination/v1", "hmmm/meta-discussion/v1", hlog) ``` ### Static Topic Publishing #### PublishBzzzMessage ```go func (p *PubSub) PublishBzzzMessage(msgType MessageType, data map[string]interface{}) error ``` Publishes to Bzzz coordination topic (`CHORUS/coordination/v1`). **Parameters:** - `msgType` - One of: TaskAnnouncement, TaskClaim, TaskProgress, TaskComplete, CapabilityBcast, AvailabilityBcast - `data` - Message payload (automatically redacted if SHHH configured) **Example:** ```go err := ps.PublishBzzzMessage(pubsub.TaskAnnouncement, map[string]interface{}{ "task_id": "task-123", "description": "Deploy service to production", "capabilities": []string{"deployment", "kubernetes"}, }) ``` #### PublishHmmmMessage ```go func (p *PubSub) PublishHmmmMessage(msgType MessageType, data map[string]interface{}) error ``` Publishes to HMMM meta-discussion topic (`hmmm/meta-discussion/v1`). **Parameters:** - `msgType` - One of: MetaDiscussion, TaskHelpRequest, TaskHelpResponse, CoordinationRequest, etc. - `data` - Message payload **Example:** ```go err := ps.PublishHmmmMessage(pubsub.TaskHelpRequest, map[string]interface{}{ "task_id": "task-456", "help_needed": "Need expertise in Go concurrency patterns", "urgency": "medium", }) ``` #### PublishContextFeedbackMessage ```go func (p *PubSub) PublishContextFeedbackMessage(msgType MessageType, data map[string]interface{}) error ``` Publishes to Context feedback topic (`CHORUS/context-feedback/v1`). **Parameters:** - `msgType` - One of: FeedbackEvent, ContextRequest, ContextResponse, ContextUsage, ContextRelevance - `data` - Feedback payload **Example:** ```go err := ps.PublishContextFeedbackMessage(pubsub.FeedbackEvent, map[string]interface{}{ "context_path": "/project/docs/api.md", "relevance_score": 0.95, "usage_count": 12, }) ``` ### Dynamic Topic Management #### JoinDynamicTopic ```go func (p *PubSub) JoinDynamicTopic(topicName string) error ``` Joins a dynamic topic and subscribes to messages. **Parameters:** - `topicName` - Topic to join (idempotent) **Returns:** error if join fails **Example:** ```go err := ps.JoinDynamicTopic("CHORUS/projects/my-project/coordination/v1") ``` #### LeaveDynamicTopic ```go func (p *PubSub) LeaveDynamicTopic(topicName string) ``` Leaves a dynamic topic and cancels subscription. **Parameters:** - `topicName` - Topic to leave **Example:** ```go ps.LeaveDynamicTopic("CHORUS/projects/my-project/coordination/v1") ``` #### PublishToDynamicTopic ```go func (p *PubSub) PublishToDynamicTopic(topicName string, msgType MessageType, data map[string]interface{}) error ``` Publishes message to a dynamic topic (must be joined first). **Parameters:** - `topicName` - Target topic (must be joined) - `msgType` - Message type - `data` - Message payload **Returns:** error if not subscribed or publish fails **Example:** ```go err := ps.PublishToDynamicTopic("CHORUS/projects/my-project/coordination/v1", pubsub.StatusUpdate, map[string]interface{}{ "status": "in_progress", "completion": 0.45, }) ``` ### Role-Based Topics #### JoinRoleBasedTopics ```go func (p *PubSub) JoinRoleBasedTopics(role string, expertise []string, reportsTo []string) error ``` Joins topics based on role configuration. **Parameters:** - `role` - Agent role (e.g., "Developer", "Architect") - `expertise` - Expertise areas (e.g., ["golang", "kubernetes"]) - `reportsTo` - Reporting hierarchy (supervisor roles) **Topics Joined:** - `CHORUS/roles//v1` - `CHORUS/expertise//v1` (for each expertise) - `CHORUS/hierarchy//v1` (for each supervisor) **Example:** ```go err := ps.JoinRoleBasedTopics( "Senior Developer", []string{"golang", "distributed_systems", "kubernetes"}, []string{"Tech Lead", "Engineering Manager"}, ) ``` #### PublishRoleBasedMessage ```go func (p *PubSub) PublishRoleBasedMessage(msgType MessageType, data map[string]interface{}, opts MessageOptions) error ``` Publishes role-based collaboration message with routing metadata. **Parameters:** - `msgType` - One of the role-based message types - `data` - Message payload - `opts` - MessageOptions with routing metadata **Example:** ```go err := ps.PublishRoleBasedMessage(pubsub.ExpertiseRequest, map[string]interface{}{ "question": "How to handle distributed transactions?", "context": "Microservices architecture", }, pubsub.MessageOptions{ FromRole: "Junior Developer", ToRoles: []string{"Senior Developer", "Architect"}, RequiredExpertise: []string{"distributed_systems", "golang"}, ProjectID: "project-789", Priority: "high", ThreadID: "thread-abc", }) ``` ### Project Topics #### JoinProjectTopic ```go func (p *PubSub) JoinProjectTopic(projectID string) error ``` Joins project-specific coordination topic. **Parameters:** - `projectID` - Project identifier **Topic:** `CHORUS/projects//coordination/v1` **Example:** ```go err := ps.JoinProjectTopic("chorus-deployment-2025") ``` ### Raw Message Publication #### PublishRaw ```go func (p *PubSub) PublishRaw(topicName string, payload []byte) error ``` Publishes raw JSON payload without CHORUS message envelope. Used for custom schemas (e.g., HMMM per-issue rooms). **Parameters:** - `topicName` - Target topic (static or dynamic) - `payload` - Raw JSON bytes **Returns:** error if not subscribed **Example:** ```go // Custom HMMM message format hmmmMsg := map[string]interface{}{ "type": "issue_discussion", "issue_id": 42, "message": "Need review on API design", } payload, _ := json.Marshal(hmmmMsg) err := ps.PublishRaw("CHORUS/meta/issue/42", payload) ``` #### SubscribeRawTopic ```go func (p *PubSub) SubscribeRawTopic(topicName string, handler func([]byte, peer.ID)) error ``` Subscribes to topic with raw message handler (bypasses CHORUS envelope parsing). **Parameters:** - `topicName` - Topic to subscribe - `handler` - Function receiving raw payload and sender peer ID **Example:** ```go err := ps.SubscribeRawTopic("CHORUS/meta/issue/42", func(payload []byte, from peer.ID) { var msg map[string]interface{} json.Unmarshal(payload, &msg) fmt.Printf("Raw message from %s: %v\n", from.ShortString(), msg) }) ``` ### SLURP Integration #### PublishSlurpEventGenerated ```go func (p *PubSub) PublishSlurpEventGenerated(data map[string]interface{}) error ``` Publishes SLURP event generation notification. **Example:** ```go err := ps.PublishSlurpEventGenerated(map[string]interface{}{ "event_id": "evt-123", "event_type": "deployment", "discussion_id": "disc-456", "consensus": true, }) ``` #### PublishSlurpEventAck ```go func (p *PubSub) PublishSlurpEventAck(data map[string]interface{}) error ``` Acknowledges receipt of SLURP event. #### PublishSlurpContextUpdate ```go func (p *PubSub) PublishSlurpContextUpdate(data map[string]interface{}) error ``` Publishes context update from SLURP system. ### Message Handler Configuration #### SetHmmmMessageHandler ```go func (p *PubSub) SetHmmmMessageHandler(handler func(msg Message, from peer.ID)) ``` Sets external handler for HMMM messages. Overrides default logging-only handler. **Parameters:** - `handler` - Function receiving parsed Message and sender peer ID **Example:** ```go ps.SetHmmmMessageHandler(func(msg Message, from peer.ID) { fmt.Printf("HMMM [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data) // Custom processing logic }) ``` #### SetContextFeedbackHandler ```go func (p *PubSub) SetContextFeedbackHandler(handler func(msg Message, from peer.ID)) ``` Sets external handler for context feedback messages. **Example:** ```go ps.SetContextFeedbackHandler(func(msg Message, from peer.ID) { if msg.Type == pubsub.FeedbackEvent { // Process RL feedback } }) ``` ### Integration #### SetRedactor ```go func (p *PubSub) SetRedactor(redactor *shhh.Sentinel) ``` Wires SHHH sentinel for automatic message sanitization before publication. **Parameters:** - `redactor` - SHHH Sentinel instance **Example:** ```go sentinel := shhh.NewSentinel(ctx, config) ps.SetRedactor(sentinel) // All subsequent publications automatically redacted ``` #### GetHypercoreLog ```go func (p *PubSub) GetHypercoreLog() HypercoreLogger ``` Returns configured hypercore logger for external access. **Returns:** HypercoreLogger instance or nil ### Lifecycle #### Close ```go func (p *PubSub) Close() error ``` Shuts down PubSub, cancels all subscriptions, and closes all topics. **Example:** ```go defer ps.Close() ``` ## HMMM Adapter ### GossipPublisher The `GossipPublisher` adapter bridges HMMM's per-issue room system with CHORUS pubsub. #### NewGossipPublisher ```go func NewGossipPublisher(ps *PubSub) *GossipPublisher ``` Creates HMMM adapter wrapping PubSub instance. **Parameters:** - `ps` - PubSub instance **Returns:** GossipPublisher adapter #### Publish ```go func (g *GossipPublisher) Publish(ctx context.Context, topic string, payload []byte) error ``` Ensures agent is subscribed to per-issue topic and publishes raw payload. **Parameters:** - `ctx` - Context - `topic` - Per-issue topic (e.g., "CHORUS/meta/issue/42") - `payload` - Raw JSON message (HMMM schema) **Behavior:** 1. Joins dynamic topic (idempotent) 2. Publishes raw payload (bypasses CHORUS envelope) **Example:** ```go adapter := pubsub.NewGossipPublisher(ps) err := adapter.Publish(ctx, "CHORUS/meta/issue/42", hmmmPayload) ``` ## Subscription Patterns ### Static Topic Subscription Static topics are automatically subscribed during `NewPubSub()`: - `CHORUS/coordination/v1` - Bzzz messages - `hmmm/meta-discussion/v1` - HMMM messages - `CHORUS/context-feedback/v1` - Context feedback Messages handled by: - `handleBzzzMessages()` - Processes Bzzz coordination - `handleHmmmMessages()` - Processes HMMM (delegates to external handler if set) - `handleContextFeedbackMessages()` - Processes context feedback ### Dynamic Topic Subscription Dynamic topics require explicit join: ```go // Task-specific topic ps.JoinDynamicTopic("CHORUS/tasks/task-123/v1") // Project-specific topic ps.JoinProjectTopic("project-456") // Role-based topics ps.JoinRoleBasedTopics("Developer", []string{"golang"}, []string{"Tech Lead"}) // Custom raw handler ps.SubscribeRawTopic("CHORUS/meta/issue/789", func(payload []byte, from peer.ID) { // Custom processing }) ``` ### Message Filtering Agents automatically filter out their own messages: ```go if msg.ReceivedFrom == p.host.ID() { continue // Ignore own messages } ``` ### Role-Based Routing Messages with role metadata are automatically routed to appropriate handlers: ```go if msg.FromRole != "" && len(msg.ToRoles) > 0 { // Check if this agent's role matches target roles if containsRole(myRole, msg.ToRoles) { // Process message } } ``` ## Hypercore Logging Integration ### Log Mapping PubSub messages are automatically logged to Hypercore with appropriate log types: | Message Type | Hypercore Log Type | Topic | |--------------|-------------------|-------| | TaskAnnouncement | task_announced | CHORUS | | TaskClaim | task_claimed | CHORUS | | TaskProgress | task_progress | CHORUS | | TaskComplete | task_completed | CHORUS | | CapabilityBcast | capability_broadcast | CHORUS | | AvailabilityBcast | network_event | CHORUS | | MetaDiscussion | collaboration | hmmm | | TaskHelpRequest | collaboration | hmmm | | EscalationTrigger | escalation | hmmm | | Role messages | collaboration | hmmm | | FeedbackEvent | context_feedback | context_feedback | | ContextRequest | context_request | context_feedback | ### Log Data Format ```go logData := map[string]interface{}{ "message_type": string(msg.Type), "from_peer": from.String(), "from_short": from.ShortString(), "timestamp": msg.Timestamp, "data": msg.Data, "topic": "CHORUS", "from_role": msg.FromRole, "to_roles": msg.ToRoles, "required_expertise": msg.RequiredExpertise, "project_id": msg.ProjectID, "priority": msg.Priority, "thread_id": msg.ThreadID, } ``` ## SHHH Redaction Integration ### Automatic Sanitization All outbound messages are sanitized if redactor is configured: ```go ps.SetRedactor(sentinel) ``` ### Redaction Process 1. Payload is cloned (deep copy) 2. Redactor scans for sensitive patterns 3. Sensitive data is redacted/masked 4. Sanitized payload is published ### Redaction Labels ```go labels := map[string]string{ "source": "pubsub", "topic": topicName, "message_type": string(msgType), } sentinel.RedactMapWithLabels(ctx, payload, labels) ``` ## Usage Examples ### Basic Task Coordination ```go // Initialize PubSub ps, err := pubsub.NewPubSub(ctx, node.Host(), "", "") if err != nil { log.Fatal(err) } defer ps.Close() // Announce task ps.PublishBzzzMessage(pubsub.TaskAnnouncement, map[string]interface{}{ "task_id": "task-123", "description": "Deploy service", "capabilities": []string{"deployment"}, }) // Claim task ps.PublishBzzzMessage(pubsub.TaskClaim, map[string]interface{}{ "task_id": "task-123", "agent_id": ps.Host().ID().String(), }) // Report progress ps.PublishBzzzMessage(pubsub.TaskProgress, map[string]interface{}{ "task_id": "task-123", "progress": 0.50, "status": "deploying", }) // Mark complete ps.PublishBzzzMessage(pubsub.TaskComplete, map[string]interface{}{ "task_id": "task-123", "result": "success", "output": "Service deployed to production", }) ``` ### Role-Based Collaboration ```go // Join role-based topics ps.JoinRoleBasedTopics("Senior Developer", []string{"golang", "kubernetes"}, []string{"Tech Lead"}) // Request expertise ps.PublishRoleBasedMessage(pubsub.ExpertiseRequest, map[string]interface{}{ "question": "How to implement distributed tracing?", "context": "Microservices deployment", }, pubsub.MessageOptions{ FromRole: "Junior Developer", ToRoles: []string{"Senior Developer", "Architect"}, RequiredExpertise: []string{"distributed_systems"}, Priority: "medium", }) // Respond with expertise ps.PublishRoleBasedMessage(pubsub.ExpertiseResponse, map[string]interface{}{ "answer": "Use OpenTelemetry with Jaeger backend", "resources": []string{"https://opentelemetry.io/docs"}, }, pubsub.MessageOptions{ FromRole: "Senior Developer", ThreadID: "thread-123", }) ``` ### HMMM Per-Issue Rooms ```go // Create HMMM adapter adapter := pubsub.NewGossipPublisher(ps) // Publish to per-issue room issueID := 42 topic := fmt.Sprintf("CHORUS/meta/issue/%d", issueID) message := map[string]interface{}{ "type": "discussion", "message": "API design looks good, approved", "issue_id": issueID, } payload, _ := json.Marshal(message) adapter.Publish(ctx, topic, payload) // Subscribe with custom handler ps.SubscribeRawTopic(topic, func(payload []byte, from peer.ID) { var msg map[string]interface{} json.Unmarshal(payload, &msg) fmt.Printf("Issue #%d message: %s\n", issueID, msg["message"]) }) ``` ### Project Coordination ```go // Join project topic projectID := "chorus-deployment-2025" ps.JoinProjectTopic(projectID) // Send project update ps.PublishToDynamicTopic( fmt.Sprintf("CHORUS/projects/%s/coordination/v1", projectID), pubsub.ProjectUpdate, map[string]interface{}{ "project_id": projectID, "phase": "testing", "completion": 0.75, "blockers": []string{}, }) ``` ### Context Feedback for RL ```go // Report context usage ps.PublishContextFeedbackMessage(pubsub.ContextUsage, map[string]interface{}{ "context_path": "/project/docs/architecture.md", "usage_count": 5, "query": "How does the authentication system work?", }) // Report relevance ps.PublishContextFeedbackMessage(pubsub.ContextRelevance, map[string]interface{}{ "context_path": "/project/docs/architecture.md", "relevance_score": 0.92, "query": "authentication flow", }) ``` ## Best Practices ### Topic Management 1. **Use Static Topics for Global Coordination** - Bzzz: Task announcements, claims, completion - HMMM: General meta-discussion, help requests - Context: RL feedback, context queries 2. **Use Dynamic Topics for Scoped Coordination** - Project-specific: Per-project coordination - Task-specific: Multi-agent task coordination - Issue-specific: HMMM per-issue rooms 3. **Use Role Topics for Targeted Messages** - Expertise requests to specific roles - Hierarchical escalation - Skill-based routing ### Message Design 1. **Include Sufficient Context** - Always include identifiers (task_id, project_id, etc.) - Timestamp messages appropriately - Use thread_id for conversation threading 2. **Use Appropriate Priority** - `urgent`: Immediate attention required - `high`: Important, handle soon - `medium`: Normal priority - `low`: Background, handle when available 3. **Design for Idempotency** - Assume messages may be received multiple times - Use unique identifiers for deduplication - Design state transitions to be idempotent ### Performance 1. **Topic Cleanup** - Leave dynamic topics when no longer needed - Prevents memory leaks and wasted bandwidth 2. **Message Size** - Keep payloads compact - Avoid large binary data in messages - Use content-addressed storage for large data 3. **Rate Limiting** - Don't spam availability broadcasts (30s intervals) - Batch related messages when possible - Use project topics to reduce global traffic ### Security 1. **Always Configure SHHH** - Set redactor before publishing sensitive data - Use labels for audit trails - Validate redaction in tests 2. **Validate Message Sources** - Check peer identity for sensitive operations - Use thread_id for conversation integrity - Implement ACLs for privileged operations 3. **Never Trust Message Content** - Validate all inputs - Sanitize data before persistence - Implement rate limiting per peer ## Testing ### Unit Tests ```go func TestPublishRaw_NameRouting_NoSubscription(t *testing.T) { p := &PubSub{ chorusTopicName: "CHORUS/coordination/v1", hmmmTopicName: "hmmm/meta-discussion/v1", contextTopicName: "CHORUS/context-feedback/v1", } if err := p.PublishRaw("nonexistent/topic", []byte("{}")); err == nil { t.Fatalf("expected error for unknown topic") } } ``` ### Integration Tests See `/home/tony/chorus/project-queues/active/CHORUS/pkg/hmmm_adapter/integration_test.go` for full integration test examples. ## Related Documentation - **P2P Package:** `/home/tony/chorus/project-queues/active/CHORUS/docs/comprehensive/packages/p2p.md` - Underlying libp2p networking - **HMMM Package:** `/home/tony/chorus/project-queues/active/CHORUS/pkg/hmmm/` - HMMM meta-discussion system - **SHHH Package:** `/home/tony/chorus/project-queues/active/CHORUS/pkg/shhh/` - Sensitive data redaction - **Hypercore Package:** `/home/tony/chorus/project-queues/active/CHORUS/pkg/hcfs/hypercore.go` - Event persistence ## Implementation Details ### Concurrency - All maps protected by RWMutex - Goroutines for message handling (3 static + N dynamic) - Context-based cancellation for clean shutdown ### Message Flow ``` Publisher PubSub Subscriber | | | |-- PublishBzzzMessage ---->| | | |-- Sanitize (SHHH) -------->| | |-- Marshal Message -------->| | |-- GossipSub Publish ------>| | | | | |<-- GossipSub Receive ------| | |-- Unmarshal Message ------>| | |-- Filter Own Messages ---->| | |-- handleBzzzMessages ----->| | |-- Log to Hypercore ------->| | |-- Call Handler ----------->|-- Process ``` ### Error Handling - Network errors logged but not fatal - Invalid messages logged and skipped - Subscription errors cancel context - Topic join errors returned immediately ## Source Files - `/home/tony/chorus/project-queues/active/CHORUS/pubsub/pubsub.go` - Main implementation (942 lines) - `/home/tony/chorus/project-queues/active/CHORUS/pubsub/adapter_hmmm.go` - HMMM adapter (41 lines) - `/home/tony/chorus/project-queues/active/CHORUS/pubsub/pubsub_test.go` - Unit tests