# Agent Discovery and P2P Architecture Migration Analysis

**Date**: 2025-10-10
**Author**: Claude Code (Senior Software Architect)
**Status**: CRITICAL ARCHITECTURE DECISION
**Context**: WHOOSH discovers only 2/34 CHORUS agents, blocking E2E workflow

---

## Executive Summary

The current HTTP/DNS-based discovery architecture is **fundamentally incompatible** with Docker Swarm's VIP-based load balancing. This analysis evaluates four migration approaches to resolve the agent discovery crisis and provides detailed implementation guidance.

### Recommendation Summary

**SHORT-TERM (Next 24-48 hours)**: Deploy **Option 2 - Docker API Quick Fix**

**LONG-TERM (Next 2 weeks)**: Migrate to **Option 3 - NATS-Only Solution** (simpler than full libp2p)

**Rationale**: The Docker API provides an immediate unblock with minimal risk, while NATS-based pub/sub delivers 80% of libp2p's benefits with 20% of the complexity. Full libp2p migration is deferred until true P2P discovery (mDNS/DHT) is required.

---

## Table of Contents

1. [Current Architecture Analysis](#1-current-architecture-analysis)
2. [Complexity Assessment](#2-complexity-assessment)
3. [Technical Feasibility Evaluation](#3-technical-feasibility-evaluation)
4. [Alternative Solution Comparison](#4-alternative-solution-comparison)
5. [Risk Analysis Matrix](#5-risk-analysis-matrix)
6. [Detailed Implementation Plans](#6-detailed-implementation-plans)
7. [Timeline Estimates](#7-timeline-estimates)
8. [Final Recommendation](#8-final-recommendation)

---

## 1. Current Architecture Analysis

### 1.1 Existing System Flow

```
┌──────────────────────────────────────────────────────────────┐
│                     CURRENT BROKEN FLOW                      │
└──────────────────────────────────────────────────────────────┘

┌─────────────┐
│   WHOOSH    │  Discovery Service
│  (1 node)   │  - internal/p2p/discovery.go
└──────┬──────┘  - internal/p2p/broadcaster.go
       │
       │ 1. DNS Lookup: "chorus" service
       ▼
┌──────────────────┐
│  Docker Swarm    │  Returns: 10.0.13.26 (VIP only)
│  DNS Service     │  Expected: 34 container IPs
└────────┬─────────┘
         │
         │ 2. HTTP GET http://chorus:8081/health
         ▼
┌──────────────────┐
│  Swarm VIP LB    │  Round-robins to random containers
│   10.0.13.26     │  WHOOSH gets ~2 unique responses
└────────┬─────────┘
         │
    ┌────┴─────┬──────────────┬────────────┐
    ▼          ▼              ▼            ▼   (... 34 total)
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ CHORUS │ │ CHORUS │ │ CHORUS │ │ CHORUS │
│ Agent1 │ │ Agent2 │ │ Agent3 │ │ Agent34│
└────────┘ └────────┘ └────────┘ └────────┘
    ✅          ✅          ❌          ❌
(Discovered) (Discovered) (Invisible) (Invisible)

Result: Only 2/34 agents discovered → Council formation fails
```

### 1.2 Root Cause Analysis

**Problem**: Docker Swarm DNS returns the service VIP, not individual container IPs.

**Evidence from codebase**:

```go
// File: /home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/discovery.go:222-235
func (d *Discovery) queryActualCHORUSService() {
	client := &http.Client{Timeout: 10 * time.Second}

	// ❌ BROKEN: This queries the VIP, not all containers
	endpoint := "http://chorus:8081/health"
	resp, err := client.Get(endpoint)

	// Only discovers ONE agent per request:
	// the VIP load-balancer picks a random container.
	// Result: ~2-5 agents found after multiple attempts.
}
```

**Why this fails**:

1. DNS `chorus` → `10.0.13.26` (VIP only)
2. No API to enumerate all 34 backend containers
3. HTTP requires knowing endpoints in advance
4. VIP randomization means discovery is non-deterministic

---
## 2. Complexity Assessment

### 2.1 Option 1: Full libp2p/HMMM Migration

**Components to Modify**:

| Component | File Path | Changes Required | LOC Changed | Complexity |
|-----------|-----------|------------------|-------------|------------|
| WHOOSH Discovery | `internal/p2p/discovery.go` | Replace HTTP with libp2p host init | 200+ | HIGH |
| WHOOSH Broadcaster | `internal/p2p/broadcaster.go` | Convert to pub/sub topics | 150+ | HIGH |
| WHOOSH Main | `cmd/whoosh/main.go` | Initialize libp2p node | 50+ | MEDIUM |
| CHORUS HTTP API | `api/http_server.go` | Add topic subscriptions | 100+ | MEDIUM |
| CHORUS Council Mgr | `internal/council/manager.go` | Subscribe to topics | 80+ | MEDIUM |
| CHORUS Main | `cmd/chorus-agent/main.go` | Initialize libp2p node | 50+ | MEDIUM |
| HMMM BACKBEAT Config | Docker compose updates | Topic configuration | 20+ | LOW |
| **TOTAL** | | | **650+ LOC** | **HIGH** |

**Complexity Rating**: **HIGH**

**Rationale**:
- libp2p requires understanding the gossipsub protocol
- Peer discovery via mDNS/DHT adds network complexity
- Need to maintain both HTTP (health checks) and libp2p
- Breaking change requiring coordinated deployment
- Testing requires the full cluster stack

### 2.2 Option 2: Docker API Quick Fix

**Components to Modify**:

| Component | File Path | Changes Required | LOC Changed | Complexity |
|-----------|-----------|------------------|-------------|------------|
| WHOOSH Discovery | `internal/p2p/discovery.go` | Add Docker client integration | 80 | LOW |
| WHOOSH Config | `docker-compose.swarm.yml` | Mount docker.sock | 5 | LOW |
| Dependencies | `go.mod` | Add docker/docker library | 2 | LOW |
| **TOTAL** | | | **87 LOC** | **LOW** |

**Complexity Rating**: **LOW**

**Rationale**:
- Single-file modification
- Well-documented Docker Go SDK
- No architecture changes
- Easy rollback (remove the socket mount)
- No distributed-systems complexity

### 2.3 Option 3: NATS-Only Solution

**Components to Modify**:

| Component | File Path | Changes Required | LOC Changed | Complexity |
|-----------|-----------|------------------|-------------|------------|
| WHOOSH Publisher | `internal/p2p/broadcaster.go` | Replace HTTP with NATS pub | 100 | MEDIUM |
| WHOOSH Main | `cmd/whoosh/main.go` | Initialize NATS connection | 30 | LOW |
| CHORUS Subscriber | `api/http_server.go` | Subscribe to NATS topics | 80 | MEDIUM |
| CHORUS Main | `cmd/chorus-agent/main.go` | Initialize NATS connection | 30 | LOW |
| Dependencies | `go.mod` (both repos) | Add nats-io/nats.go | 4 | LOW |
| **TOTAL** | | | **244 LOC** | **MEDIUM** |

**Complexity Rating**: **MEDIUM**

**Rationale**:
- The NATS SDK is simple (already used in BACKBEAT)
- No peer discovery needed (centralized NATS broker)
- Maintains HTTP for health checks
- Production-ready infrastructure (NATS already deployed)
- Moderate testing complexity

### 2.4 Option 4: Hybrid Approach

**Phase 1 (Docker API)**: LOW complexity (87 LOC, 1-2 days)
**Phase 2 (NATS Migration)**: MEDIUM complexity (244 LOC, 3-4 days)

**Total Complexity Rating**: **MEDIUM** (staged deployment reduces risk)

---

## 3. Technical Feasibility Evaluation

### 3.1 Can we use NATS directly instead of full libp2p?
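To ground the comparison that follows, it helps to see how small the WHOOSH-side change is under the NATS option. This is a minimal sketch, not production code: the subject name `chorus.councils.forming` and the payload fields are carried over from later sections of this document, and the commented-out broker call assumes the `backbeat-nats` URL used elsewhere here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CouncilOpportunity mirrors the payload WHOOSH broadcasts today over HTTP.
// Field names are illustrative, not the production schema.
type CouncilOpportunity struct {
	CouncilID   string `json:"council_id"`
	ProjectName string `json:"project_name"`
}

// buildCouncilMsg returns the NATS subject and JSON payload for an
// opportunity. Keeping this pure makes it trivial to unit-test without
// a running broker.
func buildCouncilMsg(opp CouncilOpportunity) (subject string, payload []byte, err error) {
	payload, err = json.Marshal(opp)
	return "chorus.councils.forming", payload, err
}

func main() {
	subject, payload, err := buildCouncilMsg(CouncilOpportunity{
		CouncilID: "c-123", ProjectName: "demo",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(subject, string(payload))

	// With a reachable broker, the entire broadcast path reduces to:
	//
	//   nc, _ := nats.Connect("nats://backbeat-nats:4222")
	//   nc.Publish(subject, payload)
}
```

The point of the sketch is the surface area: everything beyond subject/payload construction is a single `Publish` call, which is what makes the LOC estimates above plausible.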
**YES - Strongly Recommended**

**Comparison**:

| Feature | libp2p | NATS | Requirement |
|---------|--------|------|-------------|
| Pub/Sub Messaging | ✅ Gossipsub | ✅ JetStream | ✅ Required |
| Broadcast to all agents | ✅ Topic-based | ✅ Subject-based | ✅ Required |
| Peer Discovery | ✅ mDNS/DHT | ❌ Centralized | ⚠️ Not needed yet |
| At-least-once delivery | ✅ | ✅ JetStream | ✅ Required |
| Message persistence | ⚠️ Complex | ✅ Built-in | 🎯 Nice-to-have |
| Cluster-wide deployment | ✅ Decentralized | ✅ Centralized | ✅ Either works |
| Existing infrastructure | ❌ New | ✅ **Already deployed** | 🎯 Huge win |
| Learning curve | 🔴 Steep | 🟢 Gentle | 🎯 Team familiarity |
| Production maturity | 🟡 Good | 🟢 Excellent | ✅ Required |

**Evidence from deployed infrastructure**:

```yaml
# File: /home/tony/chorus/project-queues/active/WHOOSH/BACKBEAT-prototype/docker-compose.swarm.yml:155-190
services:
  nats:
    image: nats:2.9-alpine
    command: ["--jetstream"]  # ✅ JetStream already enabled
    deploy:
      replicas: 1
    networks:
      - backbeat-net  # ✅ Shared network with WHOOSH/CHORUS
```

**BACKBEAT SDK already uses NATS**:

```go
// File: /home/tony/chorus/project-queues/active/WHOOSH/BACKBEAT-prototype/pkg/sdk/client.go:15
import (
	"github.com/nats-io/nats.go" // ✅ Already in vendor
)

// Subject pattern already established
subject := fmt.Sprintf("backbeat.status.%s", c.config.ClusterID)
```

**Recommendation**: Use NATS for 90% of use cases. Defer libp2p until you need:
- True P2P discovery (no centralized broker)
- Cross-datacenter mesh networking
- DHT-based content routing

### 3.2 What's the minimal viable libp2p implementation?

**If you must use libp2p** (not recommended for MVP):

```go
// Minimal libp2p setup for pub/sub only
package main

import (
	"context"
	"encoding/json"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

func InitMinimalLibp2p(ctx context.Context) (*pubsub.PubSub, error) {
	// 1. Create libp2p host
	host, err := libp2p.New(
		libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/9000"),
	)
	if err != nil {
		return nil, err
	}

	// 2. Create gossipsub instance
	ps, err := pubsub.NewGossipSub(ctx, host)
	if err != nil {
		return nil, err
	}

	// 3. Bootstrap to peers (requires known peer addresses)
	bootstrapPeers := []string{
		"/ip4/192.168.1.27/tcp/9000/p2p/QmYourPeerID",
	}
	for _, peerAddr := range bootstrapPeers {
		// host.Connect takes a peer.AddrInfo, not a raw multiaddr
		pi, err := peer.AddrInfoFromString(peerAddr)
		if err != nil {
			continue
		}
		host.Connect(ctx, *pi)
	}

	return ps, nil
}

// Usage in WHOOSH
func BroadcastCouncilOpportunity(ps *pubsub.PubSub, opp *CouncilOpportunity) error {
	topic, _ := ps.Join("councils.forming")
	data, _ := json.Marshal(opp)
	return topic.Publish(context.Background(), data)
}

// Usage in CHORUS
func SubscribeToOpportunities(ps *pubsub.PubSub) {
	topic, _ := ps.Join("councils.forming")
	sub, _ := topic.Subscribe()
	for {
		msg, _ := sub.Next(context.Background())
		var opp CouncilOpportunity
		json.Unmarshal(msg.Data, &opp)
		handleOpportunity(opp)
	}
}
```

**Minimal dependencies**:

```bash
go get github.com/libp2p/go-libp2p@latest
go get github.com/libp2p/go-libp2p-pubsub@latest
```

**Estimated complexity**: ~300 LOC (still 3x more than NATS)

### 3.3 Are there Go libraries/examples we can leverage?
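One small piece of glue that the Docker-based examples below both depend on is converting a Swarm task's CIDR-form network address into an agent endpoint. A minimal, dependency-free sketch of that step follows; the helper name and the fixed URL shape are illustrative, not part of WHOOSH's actual API.

```go
package main

import (
	"fmt"
	"net/netip"
	"strings"
)

// agentEndpoint converts a Swarm task address in CIDR form
// (e.g. "10.0.13.5/24") into the agent's HTTP base URL. The port
// is passed in because the documents below read it from the
// CHORUS_AGENT_PORT environment variable.
func agentEndpoint(cidr, port string) (string, error) {
	ipStr := strings.SplitN(cidr, "/", 2)[0]
	// Validate rather than trusting the Docker API response blindly.
	ip, err := netip.ParseAddr(ipStr)
	if err != nil {
		return "", fmt.Errorf("bad task address %q: %w", cidr, err)
	}
	return fmt.Sprintf("http://%s:%s", ip, port), nil
}

func main() {
	ep, err := agentEndpoint("10.0.13.5/24", "8080")
	if err != nil {
		panic(err)
	}
	fmt.Println(ep) // http://10.0.13.5:8080
}
```

Validating the address up front turns a malformed Docker API response into a loggable error instead of a broken discovery entry.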
**YES - Multiple options**

**Option A: NATS Go Client** (Recommended)

```bash
# Already in BACKBEAT SDK vendor directory
/home/tony/chorus/project-queues/active/WHOOSH/vendor/github.com/nats-io/nats.go/
```

**Example from BACKBEAT SDK**:

```go
// File: internal/backbeat/integration.go (hypothetical WHOOSH usage)
import "github.com/nats-io/nats.go"

type WHOOSHPublisher struct {
	nc *nats.Conn
}

func NewWHOOSHPublisher(natsURL string) (*WHOOSHPublisher, error) {
	nc, err := nats.Connect(natsURL)
	if err != nil {
		return nil, err
	}
	return &WHOOSHPublisher{nc: nc}, nil
}

func (w *WHOOSHPublisher) BroadcastCouncilOpportunity(opp *CouncilOpportunity) error {
	data, _ := json.Marshal(opp)
	// Publish to all CHORUS agents subscribed to this subject
	return w.nc.Publish("chorus.councils.forming", data)
}
```

**Option B: Docker Go SDK**

```bash
# For Docker API quick fix
go get github.com/docker/docker@latest
```

**Example**:

```go
// File: internal/p2p/discovery_swarm.go (new file)
import (
	"context"

	"github.com/docker/docker/client"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
)

func (d *Discovery) DiscoverSwarmAgents() ([]*Agent, error) {
	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		return nil, err
	}
	defer cli.Close()

	// List all tasks for CHORUS service
	tasks, err := cli.TaskList(context.Background(), types.TaskListOptions{
		Filters: filters.NewArgs(
			filters.Arg("service", "CHORUS_chorus"),
			filters.Arg("desired-state", "running"),
		),
	})

	agents := []*Agent{}
	for _, task := range tasks {
		// Extract container IP from task
		for _, attachment := range task.NetworksAttachments {
			if len(attachment.Addresses) > 0 {
				ip := strings.Split(attachment.Addresses[0], "/")[0]
				endpoint := fmt.Sprintf("http://%s:8080", ip)

				agents = append(agents, &Agent{
					ID:       task.ID[:12],
					Endpoint: endpoint,
					Status:   "online",
					// ... populate other fields
				})
			}
		}
	}
	return agents, nil
}
```

**Option C: libp2p Examples**

```bash
# Official examples repository
git clone https://github.com/libp2p/go-libp2p-examples.git

# Relevant examples:
# - pubsub-example/
# - echo/
# - chat-with-rendezvous/
```

### 3.4 What about backwards compatibility during migration?

**Strategy: Dual-mode operation during transition**

```go
// File: internal/p2p/broadcaster.go

type Broadcaster struct {
	discovery *Discovery
	natsConn  *nats.Conn // New: NATS connection
	ctx       context.Context
	cancel    context.CancelFunc

	// Feature flag for gradual rollout
	useNATS bool
}

func (b *Broadcaster) BroadcastCouncilOpportunity(ctx context.Context, opp *CouncilOpportunity) error {
	if b.useNATS && b.natsConn != nil {
		// New path: NATS pub/sub
		return b.broadcastViaNATS(ctx, opp)
	}
	// Legacy path: HTTP POST (existing code)
	return b.broadcastViaHTTP(ctx, opp)
}

func (b *Broadcaster) broadcastViaNATS(ctx context.Context, opp *CouncilOpportunity) error {
	data, err := json.Marshal(opp)
	if err != nil {
		return err
	}
	subject := fmt.Sprintf("chorus.councils.%s.forming", opp.CouncilID)
	return b.natsConn.Publish(subject, data)
}

func (b *Broadcaster) broadcastViaHTTP(ctx context.Context, opp *CouncilOpportunity) error {
	// Existing HTTP broadcast logic (unchanged)
	agents := b.discovery.GetAgents()
	// ... existing code
}
```

**Deployment strategy**:

1. **Week 1**: Deploy NATS code with `useNATS=false` (no behavior change)
2. **Week 2**: Enable NATS for 10% of broadcasts (canary testing)
3. **Week 3**: Enable NATS for 100% of broadcasts
4. **Week 4**: Remove HTTP broadcast code entirely

**Rollback**: Set `useNATS=false` via environment variable

---
## 4. Alternative Solution Comparison

### 4.1 Option 1: Full libp2p/HMMM Migration

**Architecture**:

```
┌─────────────┐
│   WHOOSH    │
│  (libp2p)   │  - Initialize libp2p host
└──────┬──────┘  - Join "councils.forming" topic
       │         - Publish council opportunities
       │
       │ Publishes to libp2p gossipsub topic
       ▼
┌──────────────────────────────────────────┐
│        libp2p Gossipsub Network          │
│  - Peer discovery via mDNS               │
│  - DHT routing (optional)                │
│  - Message propagation to all subscribers│
└──────┬─────┬─────┬─────┬─────┬──────────┘
       │     │     │     │     │
   ┌───▼┐ ┌──▼─┐ ┌─▼──┐ ┌▼──┐  (... 34 total)
   │CH1 │ │CH2 │ │CH3 │ │CH4│  All CHORUS agents
   └────┘ └────┘ └────┘ └───┘  subscribe to topic

✅ ALL 34 agents receive message simultaneously
```

**Pros**:
- ✅ True P2P architecture (no central broker)
- ✅ Automatic peer discovery via mDNS/DHT
- ✅ Built-in encryption and authentication
- ✅ Message deduplication
- ✅ Resilient to network partitions
- ✅ Unlocks future P2P features (DHT storage, streams)

**Cons**:
- ❌ High complexity (650+ LOC changes)
- ❌ Steep learning curve
- ❌ Requires libp2p expertise
- ❌ Breaking change (coordinated deployment)
- ❌ Complex testing requirements
- ❌ Overkill for current needs

**Implementation time**: 7-11 days

**When to use**: When you need true P2P discovery without central infrastructure

### 4.2 Option 2: Docker API Quick Fix

**Architecture**:

```
┌─────────────┐
│   WHOOSH    │
│ (Docker API)│  - Mount /var/run/docker.sock
└──────┬──────┘  - Query Swarm API for tasks
       │
       │ GET /tasks?service=CHORUS_chorus
       ▼
┌──────────────────┐
│  Docker Engine   │  Returns: List of all 34 tasks
│ (Swarm Manager)  │  with container IPs
└────────┬─────────┘
         │
    ┌────┴─────┬──────────────┬────────────┐
    ▼          ▼              ▼            ▼   (... 34 total)
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐
│10.0.13.1│ │10.0.13.2│ │10.0.13.3│ │10.0.13.34│
│CHORUS 1 │ │CHORUS 2 │ │CHORUS 3 │ │CHORUS 34 │
└─────────┘ └─────────┘ └─────────┘ └──────────┘

WHOOSH makes HTTP POST to all 34 container IPs
✅ 100% discovery rate
```

**Code changes**:

```go
// File: internal/p2p/discovery.go (add new method)

import (
	"github.com/docker/docker/client"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
)

func (d *Discovery) discoverDockerSwarmAgents() {
	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		log.Error().Err(err).Msg("Failed to create Docker client")
		return
	}
	defer cli.Close()

	// Query Swarm for all CHORUS tasks
	tasks, err := cli.TaskList(context.Background(), types.TaskListOptions{
		Filters: filters.NewArgs(
			filters.Arg("service", "CHORUS_chorus"),
			filters.Arg("desired-state", "running"),
		),
	})
	if err != nil {
		log.Error().Err(err).Msg("Failed to list Swarm tasks")
		return
	}

	log.Info().Int("task_count", len(tasks)).Msg("Discovered Swarm tasks")

	for _, task := range tasks {
		if len(task.NetworksAttachments) == 0 {
			continue
		}
		for _, attachment := range task.NetworksAttachments {
			if len(attachment.Addresses) == 0 {
				continue
			}

			// Extract IP from CIDR (e.g., "10.0.13.5/24" -> "10.0.13.5")
			ip := strings.Split(attachment.Addresses[0], "/")[0]
			endpoint := fmt.Sprintf("http://%s:8080", ip)

			agent := &Agent{
				ID:       task.ID[:12], // Use first 12 chars of task ID
				Name:     fmt.Sprintf("CHORUS-%d", task.Slot),
				Endpoint: endpoint,
				Status:   "online",
				Capabilities: []string{
					"general_development",
					"task_coordination",
				},
				Model:     "llama3.1:8b",
				LastSeen:  time.Now(),
				P2PAddr:   fmt.Sprintf("%s:9000", ip),
				ClusterID: "docker-unified-stack",
			}

			d.addOrUpdateAgent(agent)

			log.Debug().
				Str("task_id", task.ID[:12]).
				Str("ip", ip).
				Str("endpoint", endpoint).
				Msg("Discovered CHORUS agent via Docker API")
		}
	}
}
```

**Deployment changes**:

```yaml
# File: docker/docker-compose.swarm.yml (WHOOSH service)
services:
  whoosh:
    image: anthonyrawlins/whoosh:latest
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro  # Mount Docker socket
    environment:
      - WHOOSH_DISCOVERY_MODE=docker-swarm
```

**Pros**:
- ✅ **Minimal code changes** (87 LOC)
- ✅ **Immediate solution** (1-2 days)
- ✅ 100% discovery rate
- ✅ Easy to test and validate
- ✅ Simple rollback (remove socket mount)
- ✅ No distributed-systems complexity

**Cons**:
- ⚠️ Requires privileged Docker socket access
- ⚠️ Couples WHOOSH to Docker Swarm
- ⚠️ Doesn't enable pub/sub messaging
- ⚠️ Still uses HTTP (no performance benefits)
- ⚠️ Not portable to Kubernetes/other orchestrators

**Implementation time**: 1-2 days

**When to use**: Emergency unblock, temporary fix while planning the proper migration

### 4.3 Option 3: NATS-Only Solution (Recommended Long-term)

**Architecture**:

```
┌─────────────┐
│   WHOOSH    │
│(NATS client)│  - Connect to NATS broker
└──────┬──────┘  - Publish to subject
       │
       │ PUBLISH chorus.councils.forming {...}
       ▼
┌──────────────────────────────────────────┐
│        NATS Server (JetStream)           │
│  - Already deployed as backbeat-nats     │
│  - Message persistence                   │
│  - At-least-once delivery                │
└──────┬─────┬─────┬─────┬─────┬──────────┘
       │     │     │     │     │
   ┌───▼┐ ┌──▼─┐ ┌─▼──┐ ┌▼──┐  (... 34 total)
   │CH1 │ │CH2 │ │CH3 │ │CH4│  All CHORUS agents
   │SUB │ │SUB │ │SUB │ │SUB│  subscribe on startup
   └────┘ └────┘ └────┘ └───┘

✅ ALL 34 agents receive message via subscription
```

**Code changes - WHOOSH**:

```go
// File: internal/nats/publisher.go (new file)
package nats

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog/log"
)

type CouncilPublisher struct {
	nc *nats.Conn
}

func NewCouncilPublisher(natsURL string) (*CouncilPublisher, error) {
	nc, err := nats.Connect(natsURL,
		nats.MaxReconnects(-1), // Infinite reconnects
		nats.ReconnectWait(1*time.Second),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to NATS: %w", err)
	}

	log.Info().Str("nats_url", natsURL).Msg("Connected to NATS for council publishing")
	return &CouncilPublisher{nc: nc}, nil
}

func (p *CouncilPublisher) PublishCouncilOpportunity(ctx context.Context, opp *CouncilOpportunity) error {
	data, err := json.Marshal(opp)
	if err != nil {
		return fmt.Errorf("failed to marshal opportunity: %w", err)
	}

	subject := "chorus.councils.forming"

	// Publish with headers for tracing
	msg := &nats.Msg{
		Subject: subject,
		Data:    data,
		Header: nats.Header{
			"Council-ID": []string{opp.CouncilID.String()},
			"Project":    []string{opp.ProjectName},
		},
	}

	if err := p.nc.PublishMsg(msg); err != nil {
		return fmt.Errorf("failed to publish to NATS: %w", err)
	}

	log.Info().
		Str("council_id", opp.CouncilID.String()).
		Str("subject", subject).
		Msg("Published council opportunity to NATS")

	return nil
}

func (p *CouncilPublisher) Close() error {
	if p.nc != nil {
		p.nc.Close()
	}
	return nil
}
```

**Code changes - CHORUS**:

```go
// File: internal/nats/subscriber.go (new file)
package nats

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"chorus/internal/council"
	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog/log"
)

type CouncilSubscriber struct {
	nc         *nats.Conn
	sub        *nats.Subscription
	councilMgr *council.Manager
}

func NewCouncilSubscriber(natsURL string, councilMgr *council.Manager) (*CouncilSubscriber, error) {
	nc, err := nats.Connect(natsURL,
		nats.MaxReconnects(-1),
		nats.ReconnectWait(1*time.Second),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to NATS: %w", err)
	}

	s := &CouncilSubscriber{
		nc:         nc,
		councilMgr: councilMgr,
	}

	// Subscribe to council opportunities
	sub, err := nc.Subscribe("chorus.councils.forming", s.handleCouncilOpportunity)
	if err != nil {
		nc.Close()
		return nil, fmt.Errorf("failed to subscribe to council opportunities: %w", err)
	}
	s.sub = sub

	log.Info().
		Str("nats_url", natsURL).
		Str("subject", "chorus.councils.forming").
		Msg("Subscribed to council opportunities on NATS")

	return s, nil
}

func (s *CouncilSubscriber) handleCouncilOpportunity(msg *nats.Msg) {
	var opp council.Opportunity
	if err := json.Unmarshal(msg.Data, &opp); err != nil {
		log.Error().Err(err).Msg("Failed to unmarshal council opportunity")
		return
	}

	log.Info().
		Str("council_id", opp.CouncilID.String()).
		Str("project", opp.ProjectName).
		Msg("Received council opportunity from NATS")

	// Delegate to council manager (existing logic)
	if err := s.councilMgr.HandleOpportunity(context.Background(), &opp); err != nil {
		log.Error().
			Err(err).
			Str("council_id", opp.CouncilID.String()).
			Msg("Failed to handle council opportunity")
	}
}

func (s *CouncilSubscriber) Close() error {
	if s.sub != nil {
		s.sub.Unsubscribe()
	}
	if s.nc != nil {
		s.nc.Close()
	}
	return nil
}
```

**Integration in CHORUS main**:

```go
// File: cmd/chorus-agent/main.go
func main() {
	// ... existing setup

	// Initialize council manager
	councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities)

	// NEW: Subscribe to NATS instead of an HTTP endpoint
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = "nats://backbeat-nats:4222"
	}

	natsSubscriber, err := nats.NewCouncilSubscriber(natsURL, councilMgr)
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to initialize NATS subscriber")
	}
	defer natsSubscriber.Close()

	// HTTP server still runs for health checks (optional)
	if enableHTTP {
		httpServer := api.NewHTTPServer(cfg, node, hlog, ps)
		go httpServer.Start()
	}

	// ... rest of main
}
```

**Pros**:
- ✅ **Simpler than libp2p** (244 vs 650 LOC)
- ✅ **Production-ready** (NATS battle-tested)
- ✅ **Already deployed** (backbeat-nats running)
- ✅ 100% broadcast delivery
- ✅ Message persistence (JetStream)
- ✅ Familiar SDK (used in BACKBEAT)
- ✅ Easy testing (local NATS for dev)
- ✅ Portable (works with any orchestrator)

**Cons**:
- ⚠️ Centralized (NATS is a single point of failure)
- ⚠️ No automatic peer discovery (not needed)
- ⚠️ Requires NATS infrastructure (already have it)

**Implementation time**: 3-4 days

**When to use**: Production-grade pub/sub without P2P complexity

### 4.4 Comparison Matrix

| Criteria | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|----------|------------------|----------------------|----------------|------------------|
| **Discovery Success** | 100% (gossipsub) | 100% (API query) | 100% (pub/sub) | 100% (both phases) |
| **Implementation Time** | 7-11 days | 1-2 days | 3-4 days | 2 days + 3-4 days |
| **Code Complexity** | HIGH (650 LOC) | LOW (87 LOC) | MEDIUM (244 LOC) | LOW → MEDIUM |
| **Testing Complexity** | HIGH | LOW | MEDIUM | LOW → MEDIUM |
| **Production Risk** | HIGH | LOW | MEDIUM | LOW (staged) |
| **Scalability** | Excellent | Good | Excellent | Excellent |
| **Infrastructure Deps** | None (P2P) | Docker Swarm | NATS (have it) | Both |
| **Learning Curve** | Steep | Gentle | Gentle | Gentle |
| **Backwards Compat** | Breaking change | Non-breaking | Breaking change | Dual-mode |
| **Rollback Ease** | Difficult | Easy | Moderate | Easy (per phase) |
| **Future-Proofing** | Best (P2P ready) | Poor (Docker-only) | Good | Good |
| **Message Persistence** | Complex | N/A | Built-in | N/A → Built-in |
| **Operational Overhead** | High | Low | Low | Low |
| **Deployment Coupling** | Tight | Tight | Moderate | Moderate |

**Scoring** (1-10, higher is better):

| Option | Immediate Value | Long-term Value | Risk Level | Total Score |
|--------|-----------------|-----------------|------------|-------------|
| **Option 1: libp2p** | 6/10 | 10/10 | 4/10 | **20/30** |
| **Option 2: Docker API** | 10/10 | 4/10 | 9/10 | **23/30** |
| **Option 3: NATS** | 8/10 | 9/10 | 7/10 | **24/30** |
| **Option 4: Hybrid** | 10/10 | 9/10 | 8/10 | **27/30** ⭐ |

---
## 5. Risk Analysis Matrix

### 5.1 Technical Risks

| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|------|------------------|----------------------|----------------|------------------|
| **Peer discovery failure** | LOW (mDNS/DHT) | N/A | N/A | N/A |
| **Message delivery failure** | LOW (retry logic) | MEDIUM (HTTP timeout) | LOW (NATS ack) | LOW → LOW |
| **Network partition** | LOW (eventual consistency) | HIGH (no retries) | MEDIUM (NATS reconnect) | HIGH → MEDIUM |
| **Scale bottleneck** | LOW (P2P) | MEDIUM (VIP load) | LOW (NATS scales) | MEDIUM → LOW |
| **Implementation bugs** | HIGH (complex) | LOW (simple) | MEDIUM | LOW → MEDIUM |
| **Integration issues** | HIGH (new system) | LOW (existing stack) | MEDIUM (new client) | LOW → MEDIUM |
| **Performance degradation** | LOW (efficient) | MEDIUM (HTTP overhead) | LOW (fast broker) | MEDIUM → LOW |

### 5.2 Deployment Risks

| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|------|------------------|----------------------|----------------|------------------|
| **Coordinated deployment** | **CRITICAL** | LOW | **HIGH** | LOW → HIGH |
| **Rollback complexity** | HIGH (breaking) | LOW (remove mount) | MEDIUM (revert code) | LOW → MEDIUM |
| **Downtime required** | YES (breaking) | NO | YES (breaking) | NO → YES |
| **Service interruption** | HIGH (full restart) | LOW (WHOOSH only) | MEDIUM (both services) | LOW → MEDIUM |
| **Config drift** | HIGH (new settings) | LOW (one env var) | MEDIUM (NATS URL) | LOW → MEDIUM |
| **Version compatibility** | CRITICAL | N/A | MEDIUM (NATS version) | N/A → MEDIUM |

### 5.3 Operational Risks

| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|------|------------------|----------------------|----------------|------------------|
| **Monitoring gaps** | HIGH (new metrics) | LOW (existing) | MEDIUM (NATS metrics) | LOW → MEDIUM |
| **Debugging difficulty** | HIGH (distributed) | LOW (centralized) | MEDIUM (broker logs) | LOW → MEDIUM |
| **On-call burden** | HIGH (complex) | LOW (simple) | MEDIUM | LOW → MEDIUM |
| **Documentation debt** | HIGH (new system) | LOW (minimal change) | MEDIUM (NATS docs) | LOW → MEDIUM |
| **Team knowledge** | LOW (unfamiliar) | HIGH (Docker known) | MEDIUM (NATS used) | HIGH → MEDIUM |
| **Vendor lock-in** | LOW (open source) | HIGH (Docker Swarm) | MEDIUM (NATS) | HIGH → MEDIUM |

### 5.4 Mitigation Strategies

**Option 1: libp2p**
- **Risk**: High implementation complexity
- **Mitigation**:
  - Hire a libp2p consultant for architecture review
  - Build a prototype in an isolated environment first
  - Extensive integration testing before production
  - Feature flagging for gradual rollout

**Option 2: Docker API**
- **Risk**: Docker Swarm vendor lock-in
- **Mitigation**:
  - Document as a temporary solution
  - Plan the NATS migration immediately after
  - Abstract the discovery interface for future portability
  - Maintain HTTP broadcast as a fallback

**Option 3: NATS**
- **Risk**: NATS broker single point of failure
- **Mitigation**:
  - Deploy NATS in clustered mode (3 replicas)
  - Enable JetStream for message persistence
  - Monitor NATS health and set up alerts
  - Document NATS recovery procedures

**Option 4: Hybrid** (Recommended)
- **Risk**: Phase 2 migration complexity
- **Mitigation**:
  - Validate Phase 1 thoroughly in production (1 week)
  - Implement dual-mode operation (HTTP + NATS) during Phase 2
  - Use feature flags for gradual NATS rollout
  - Maintain HTTP as a fallback during the transition

---
Detailed Implementation Plans ### 6.1 Option 2: Docker API Quick Fix (RECOMMENDED SHORT-TERM) **Timeline**: 1-2 days #### Step 1: Code Changes (4 hours) **File**: `/home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/discovery.go` ```go // Add import import ( "github.com/docker/docker/client" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" ) // Add new method after discoverRealCHORUSAgents() func (d *Discovery) discoverDockerSwarmAgents() { if !d.config.DockerEnabled { return } cli, err := client.NewClientWithOpts(client.FromEnv) if err != nil { log.Debug().Err(err).Msg("Docker client not available (running outside Swarm?)") return } defer cli.Close() ctx := context.Background() // List all tasks for CHORUS service serviceName := os.Getenv("CHORUS_SERVICE_NAME") if serviceName == "" { serviceName = "CHORUS_chorus" // Default stack name } tasks, err := cli.TaskList(ctx, types.TaskListOptions{ Filters: filters.NewArgs( filters.Arg("service", serviceName), filters.Arg("desired-state", "running"), ), }) if err != nil { log.Error().Err(err).Msg("Failed to list Docker Swarm tasks") return } log.Info(). Int("task_count", len(tasks)). Str("service", serviceName). 
Msg("🐳 Discovered Docker Swarm tasks") discoveredCount := 0 for _, task := range tasks { // Skip tasks without network attachments if len(task.NetworksAttachments) == 0 { log.Debug().Str("task_id", task.ID[:12]).Msg("Task has no network attachments") continue } // Extract IP from first network attachment for _, attachment := range task.NetworksAttachments { if len(attachment.Addresses) == 0 { continue } // Parse IP from CIDR notation (e.g., "10.0.13.5/24") ipCIDR := attachment.Addresses[0] ip := strings.Split(ipCIDR, "/")[0] // Construct agent endpoint port := os.Getenv("CHORUS_AGENT_PORT") if port == "" { port = "8080" } endpoint := fmt.Sprintf("http://%s:%s", ip, port) // Create agent entry agent := &Agent{ ID: task.ID[:12], // Short task ID Name: fmt.Sprintf("CHORUS-Task-%d", task.Slot), Endpoint: endpoint, Status: "online", Capabilities: []string{ "general_development", "task_coordination", "ai_integration", }, Model: "llama3.1:8b", LastSeen: time.Now(), TasksCompleted: 0, P2PAddr: fmt.Sprintf("%s:9000", ip), ClusterID: "docker-unified-stack", } d.addOrUpdateAgent(agent) discoveredCount++ log.Debug(). Str("task_id", task.ID[:12]). Int("slot", int(task.Slot)). Str("ip", ip). Str("endpoint", endpoint). Msg("🤖 Discovered CHORUS agent via Docker API") break // Only use first network attachment } } log.Info(). Int("discovered", discoveredCount). Int("total_tasks", len(tasks)). 
Msg("✅ Docker Swarm agent discovery completed") } ``` **Update `discoverRealCHORUSAgents()` to call new method**: ```go func (d *Discovery) discoverRealCHORUSAgents() { log.Debug().Msg("🔍 Discovering real CHORUS agents via health endpoints") // Priority 1: Docker Swarm API (most accurate) d.discoverDockerSwarmAgents() // Priority 2: Known service endpoints (fallback) d.queryActualCHORUSService() d.discoverKnownEndpoints() } ``` #### Step 2: Dependency Update (5 minutes) **File**: `/home/tony/chorus/project-queues/active/WHOOSH/go.mod` ```bash cd /home/tony/chorus/project-queues/active/WHOOSH go get github.com/docker/docker@latest go mod tidy ``` #### Step 3: Configuration Changes (10 minutes) **File**: `/home/tony/chorus/project-queues/active/WHOOSH/docker/docker-compose.swarm.yml` ```yaml services: whoosh: image: anthonyrawlins/whoosh:latest # NEW: Mount Docker socket for Swarm API access volumes: - /var/run/docker.sock:/var/run/docker.sock:ro # NEW: Environment variables for Docker discovery environment: - WHOOSH_DISCOVERY_MODE=docker-swarm - CHORUS_SERVICE_NAME=CHORUS_chorus # Match your stack name - CHORUS_AGENT_PORT=8080 # IMPORTANT: Security note - Docker socket access is read-only # This allows querying tasks but not modifying them ``` #### Step 4: Build and Deploy (30 minutes) ```bash # Build new WHOOSH image cd /home/tony/chorus/project-queues/active/WHOOSH docker build -t registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery . docker push registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery # Update compose file to use new image # Edit docker/docker-compose.swarm.yml: # whoosh: # image: registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery # Redeploy to Swarm docker stack deploy -c docker/docker-compose.swarm.yml WHOOSH ``` #### Step 5: Validation (1 hour) ```bash # 1. 
Check WHOOSH logs for discovery docker service logs -f WHOOSH_whoosh --tail 100 # Expected output: # {"level":"info","task_count":34,"service":"CHORUS_chorus","message":"🐳 Discovered Docker Swarm tasks"} # {"level":"info","discovered":34,"total_tasks":34,"message":"✅ Docker Swarm agent discovery completed"} # 2. Check agent registry in database docker exec -it $(docker ps -q -f name=WHOOSH_whoosh) psql -U whoosh -d whoosh -c \ "SELECT COUNT(*) FROM agents WHERE status = 'available';" # Expected: 34 rows # 3. Trigger council formation curl -X POST http://whoosh.chorus.services/api/v1/councils \ -H "Content-Type: application/json" \ -d '{ "project_name": "Test Discovery", "repository": "https://gitea.chorus.services/tony/test", "core_roles": ["tpm", "senior-software-architect"] }' # 4. Verify all agents received broadcast docker service logs CHORUS_chorus --tail 1000 | grep "Received council opportunity" # Expected: 34 log entries (one per agent) ``` #### Step 6: Monitoring and Rollback Plan (30 minutes) **Success Criteria**: - ✅ All 34 CHORUS agents discovered - ✅ Council broadcasts reach all agents - ✅ Council formation completes successfully - ✅ No performance degradation **Rollback Procedure** (if issues occur): ```bash # 1. Remove Docker socket mount from compose file # 2. Redeploy with previous image docker stack deploy -c docker/docker-compose.swarm.yml.backup WHOOSH # 3. 
Verify rollback docker service logs WHOOSH_whoosh ``` **Total Implementation Time**: 6-8 hours --- ### 6.2 Option 3: NATS-Only Solution (RECOMMENDED LONG-TERM) **Timeline**: 3-4 days #### Day 1: WHOOSH Publisher Implementation **Task 1.1**: Create NATS publisher module (3 hours) **New File**: `/home/tony/chorus/project-queues/active/WHOOSH/internal/nats/publisher.go` ```go package nats import ( "context" "encoding/json" "fmt" "time" "github.com/chorus-services/whoosh/internal/p2p" "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" ) type CouncilPublisher struct { nc *nats.Conn } func NewCouncilPublisher(natsURL string) (*CouncilPublisher, error) { opts := []nats.Option{ nats.Name("WHOOSH Council Publisher"), nats.MaxReconnects(-1), // Infinite reconnects nats.ReconnectWait(1 * time.Second), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { log.Warn().Err(err).Msg("NATS disconnected") }), nats.ReconnectHandler(func(nc *nats.Conn) { log.Info().Str("url", nc.ConnectedUrl()).Msg("NATS reconnected") }), } nc, err := nats.Connect(natsURL, opts...) if err != nil { return nil, fmt.Errorf("failed to connect to NATS at %s: %w", natsURL, err) } log.Info(). Str("nats_url", natsURL). Str("server_url", nc.ConnectedUrl()). 
Msg("✅ Connected to NATS for council publishing") return &CouncilPublisher{nc: nc}, nil } func (p *CouncilPublisher) PublishCouncilOpportunity(ctx context.Context, opp *p2p.CouncilOpportunity) error { // Marshal opportunity to JSON data, err := json.Marshal(opp) if err != nil { return fmt.Errorf("failed to marshal council opportunity: %w", err) } // Construct subject (NATS topic) subject := "chorus.councils.forming" // Create message with headers for tracing msg := &nats.Msg{ Subject: subject, Data: data, Header: nats.Header{ "Council-ID": []string{opp.CouncilID.String()}, "Project": []string{opp.ProjectName}, "Repository": []string{opp.Repository}, "UCXL-Address": []string{opp.UCXLAddress}, "Published-At": []string{time.Now().Format(time.RFC3339)}, }, } // Publish to NATS if err := p.nc.PublishMsg(msg); err != nil { return fmt.Errorf("failed to publish to NATS subject %s: %w", subject, err) } // Flush to ensure delivery if err := p.nc.FlushTimeout(5 * time.Second); err != nil { log.Warn().Err(err).Msg("NATS flush timeout (message likely delivered)") } log.Info(). Str("council_id", opp.CouncilID.String()). Str("project", opp.ProjectName). Str("subject", subject). Int("core_roles", len(opp.CoreRoles)). Int("optional_roles", len(opp.OptionalRoles)). Msg("📡 Published council opportunity to NATS") return nil } func (p *CouncilPublisher) PublishCouncilStatus(ctx context.Context, update *p2p.CouncilStatusUpdate) error { data, err := json.Marshal(update) if err != nil { return fmt.Errorf("failed to marshal council status: %w", err) } subject := fmt.Sprintf("chorus.councils.%s.status", update.CouncilID.String()) msg := &nats.Msg{ Subject: subject, Data: data, Header: nats.Header{ "Council-ID": []string{update.CouncilID.String()}, "Status": []string{update.Status}, }, } if err := p.nc.PublishMsg(msg); err != nil { return fmt.Errorf("failed to publish council status: %w", err) } log.Info(). Str("council_id", update.CouncilID.String()). Str("status", update.Status). 
Str("subject", subject). Msg("📢 Published council status update to NATS") return nil } func (p *CouncilPublisher) Close() error { if p.nc != nil && !p.nc.IsClosed() { p.nc.Close() log.Info().Msg("Closed NATS connection") } return nil } func (p *CouncilPublisher) Health() error { if p.nc == nil { return fmt.Errorf("NATS connection is nil") } if p.nc.IsClosed() { return fmt.Errorf("NATS connection is closed") } if !p.nc.IsConnected() { return fmt.Errorf("NATS connection is not connected") } return nil } ``` **Task 1.2**: Integrate publisher into WHOOSH server (2 hours) **File**: `/home/tony/chorus/project-queues/active/WHOOSH/internal/server/server.go` ```go import ( natspub "github.com/chorus-services/whoosh/internal/nats" ) type Server struct { // ... existing fields natsPublisher *natspub.CouncilPublisher // NEW } func NewServer(cfg *config.Config, db *pgxpool.Pool) *Server { // ... existing setup // NEW: Initialize NATS publisher natsURL := os.Getenv("NATS_URL") if natsURL == "" { natsURL = "nats://backbeat-nats:4222" // Default to BACKBEAT NATS } natsPublisher, err := natspub.NewCouncilPublisher(natsURL) if err != nil { log.Fatal().Err(err).Msg("Failed to initialize NATS publisher") } return &Server{ // ... existing fields natsPublisher: natsPublisher, } } func (s *Server) Stop() error { // ... existing shutdown // NEW: Close NATS connection if s.natsPublisher != nil { s.natsPublisher.Close() } return nil } ``` **Task 1.3**: Update broadcaster to use NATS (2 hours) **File**: `/home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/broadcaster.go` ```go // Add field to Broadcaster struct type Broadcaster struct { discovery *Discovery natsPublisher *nats.CouncilPublisher // NEW ctx context.Context cancel context.CancelFunc } // Update BroadcastCouncilOpportunity to use NATS func (b *Broadcaster) BroadcastCouncilOpportunity(ctx context.Context, opportunity *CouncilOpportunity) error { log.Info(). Str("council_id", opportunity.CouncilID.String()). 
Str("project_name", opportunity.ProjectName). Msg("📡 Broadcasting council opportunity via NATS") // NEW: Use NATS pub/sub instead of HTTP if b.natsPublisher != nil { err := b.natsPublisher.PublishCouncilOpportunity(ctx, opportunity) if err != nil { log.Error().Err(err).Msg("Failed to publish via NATS, falling back to HTTP") // Fallback to HTTP (during transition period) return b.broadcastViaHTTP(ctx, opportunity) } log.Info(). Str("council_id", opportunity.CouncilID.String()). Msg("✅ Council opportunity published to NATS") return nil } // Fallback: Use existing HTTP broadcast return b.broadcastViaHTTP(ctx, opportunity) } // Keep existing HTTP method as fallback func (b *Broadcaster) broadcastViaHTTP(ctx context.Context, opportunity *CouncilOpportunity) error { // ... existing HTTP broadcast logic } ``` #### Day 2: CHORUS Subscriber Implementation **Task 2.1**: Create NATS subscriber module (3 hours) **New File**: `/home/tony/chorus/project-queues/active/CHORUS/internal/nats/subscriber.go` ```go package nats import ( "context" "encoding/json" "fmt" "time" "chorus/internal/council" "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" ) type CouncilSubscriber struct { nc *nats.Conn subs []*nats.Subscription councilMgr *council.Manager ctx context.Context cancel context.CancelFunc } func NewCouncilSubscriber(natsURL string, councilMgr *council.Manager) (*CouncilSubscriber, error) { ctx, cancel := context.WithCancel(context.Background()) opts := []nats.Option{ nats.Name(fmt.Sprintf("CHORUS Agent %s", councilMgr.AgentID)), nats.MaxReconnects(-1), nats.ReconnectWait(1 * time.Second), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { log.Warn().Err(err).Msg("NATS disconnected") }), nats.ReconnectHandler(func(nc *nats.Conn) { log.Info().Msg("NATS reconnected") }), } nc, err := nats.Connect(natsURL, opts...) 
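// Delivery-semantics note: the plain nc.Subscribe calls set up below give
// broadcast fan-out, meaning every connected agent receives every council
// opportunity, which is exactly what council formation needs. By contrast,
// nc.QueueSubscribe with a shared queue-group name would load-balance each
// message to a single member of the group (one-of-N delivery); avoid it for
// these subjects unless work-queue semantics are explicitly wanted.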
if err != nil { cancel() return nil, fmt.Errorf("failed to connect to NATS: %w", err) } s := &CouncilSubscriber{ nc: nc, subs: make([]*nats.Subscription, 0), councilMgr: councilMgr, ctx: ctx, cancel: cancel, } // Subscribe to council opportunities if err := s.subscribeToOpportunities(); err != nil { nc.Close() cancel() return nil, fmt.Errorf("failed to subscribe to opportunities: %w", err) } // Subscribe to council status updates if err := s.subscribeToStatusUpdates(); err != nil { log.Warn().Err(err).Msg("Failed to subscribe to status updates (non-critical)") } log.Info(). Str("nats_url", natsURL). Str("agent_id", councilMgr.AgentID). Msg("✅ Subscribed to NATS council topics") return s, nil } func (s *CouncilSubscriber) subscribeToOpportunities() error { subject := "chorus.councils.forming" sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) { s.handleCouncilOpportunity(msg) }) if err != nil { return fmt.Errorf("failed to subscribe to %s: %w", subject, err) } s.subs = append(s.subs, sub) log.Info(). Str("subject", subject). Msg("📬 Subscribed to council opportunities") return nil } func (s *CouncilSubscriber) subscribeToStatusUpdates() error { subject := "chorus.councils.*.status" // Wildcard subscription sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) { s.handleCouncilStatus(msg) }) if err != nil { return fmt.Errorf("failed to subscribe to %s: %w", subject, err) } s.subs = append(s.subs, sub) log.Info(). Str("subject", subject). Msg("📬 Subscribed to council status updates") return nil } func (s *CouncilSubscriber) handleCouncilOpportunity(msg *nats.Msg) { // Parse council opportunity from message var opp council.Opportunity if err := json.Unmarshal(msg.Data, &opp); err != nil { log.Error(). Err(err). Str("subject", msg.Subject). Msg("Failed to unmarshal council opportunity") return } // Extract headers for logging councilID := msg.Header.Get("Council-ID") projectName := msg.Header.Get("Project") log.Info(). Str("council_id", councilID). 
Str("project", projectName). Str("subject", msg.Subject). Int("core_roles", len(opp.CoreRoles)). Msg("📥 Received council opportunity from NATS") // Delegate to council manager (existing logic) ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second) defer cancel() if err := s.councilMgr.HandleOpportunity(ctx, &opp); err != nil { log.Error(). Err(err). Str("council_id", councilID). Msg("Failed to handle council opportunity") return } log.Debug(). Str("council_id", councilID). Msg("Successfully processed council opportunity") } func (s *CouncilSubscriber) handleCouncilStatus(msg *nats.Msg) { var status council.StatusUpdate if err := json.Unmarshal(msg.Data, &status); err != nil { log.Error().Err(err).Msg("Failed to unmarshal council status") return } log.Info(). Str("council_id", status.CouncilID.String()). Str("status", status.Status). Msg("📥 Received council status update from NATS") // Delegate to council manager if err := s.councilMgr.HandleStatusUpdate(s.ctx, &status); err != nil { log.Error(). Err(err). Str("council_id", status.CouncilID.String()). Msg("Failed to handle council status update") } } func (s *CouncilSubscriber) Close() error { s.cancel() // Unsubscribe all subscriptions for _, sub := range s.subs { if err := sub.Unsubscribe(); err != nil { log.Warn().Err(err).Msg("Failed to unsubscribe from NATS") } } // Close NATS connection if s.nc != nil && !s.nc.IsClosed() { s.nc.Close() log.Info().Msg("Closed NATS connection") } return nil } func (s *CouncilSubscriber) Health() error { if s.nc == nil { return fmt.Errorf("NATS connection is nil") } if s.nc.IsClosed() { return fmt.Errorf("NATS connection is closed") } if !s.nc.IsConnected() { return fmt.Errorf("NATS not connected") } return nil } ``` **Task 2.2**: Integrate subscriber into CHORUS main (2 hours) **File**: `/home/tony/chorus/project-queues/active/CHORUS/cmd/chorus-agent/main.go` ```go import ( natssub "chorus/internal/nats" ) func main() { // ... 
existing setup // Initialize council manager councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities) // NEW: Subscribe to NATS instead of HTTP endpoint natsURL := os.Getenv("NATS_URL") if natsURL == "" { natsURL = "nats://backbeat-nats:4222" } natsSubscriber, err := natssub.NewCouncilSubscriber(natsURL, councilMgr) if err != nil { log.Fatal().Err(err).Msg("Failed to initialize NATS subscriber") } defer natsSubscriber.Close() log.Info(). Str("agent_id", agentID). Str("nats_url", natsURL). Msg("🎧 CHORUS agent listening for council opportunities via NATS") // HTTP server still runs for health checks httpServer := api.NewHTTPServer(cfg, node, hlog, ps) go func() { if err := httpServer.Start(); err != nil && err != http.ErrServerClosed { log.Error().Err(err).Msg("HTTP server error") } }() // ... rest of main } ``` #### Day 3-4: Testing, Deployment, and Validation **Task 3.1**: Local testing with Docker Compose (4 hours) **Create test environment**: `/home/tony/chorus/project-queues/active/WHOOSH/docker-compose.nats-test.yml` ```yaml version: '3.8' services: nats: image: nats:2.9-alpine command: ["--jetstream", "--debug"] ports: - "4222:4222" - "8222:8222" # Monitoring port networks: - test-net whoosh: build: . 
environment: - NATS_URL=nats://nats:4222 - CHORUS_SERVICE_NAME=test_chorus depends_on: - nats networks: - test-net chorus-agent-1: build: ../CHORUS environment: - NATS_URL=nats://nats:4222 - CHORUS_AGENT_ID=test-agent-1 depends_on: - nats networks: - test-net chorus-agent-2: build: ../CHORUS environment: - NATS_URL=nats://nats:4222 - CHORUS_AGENT_ID=test-agent-2 depends_on: - nats networks: - test-net networks: test-net: driver: bridge ``` **Run tests**: ```bash cd /home/tony/chorus/project-queues/active/WHOOSH docker-compose -f docker-compose.nats-test.yml up # In another terminal, trigger council formation curl -X POST http://localhost:8080/api/v1/councils \ -H "Content-Type: application/json" \ -d '{ "project_name": "NATS Test", "repository": "https://gitea.chorus.services/tony/test", "core_roles": ["tpm", "developer"] }' # Verify both agents received message docker-compose -f docker-compose.nats-test.yml logs chorus-agent-1 | grep "Received council opportunity" docker-compose -f docker-compose.nats-test.yml logs chorus-agent-2 | grep "Received council opportunity" ``` **Task 3.2**: Production deployment (4 hours) ```bash # Build and push WHOOSH cd /home/tony/chorus/project-queues/active/WHOOSH docker build -t registry.home.deepblack.cloud/whoosh:v1.3.0-nats . docker push registry.home.deepblack.cloud/whoosh:v1.3.0-nats # Build and push CHORUS cd /home/tony/chorus/project-queues/active/CHORUS make build-agent docker build -f Dockerfile.simple -t registry.home.deepblack.cloud/chorus:v0.6.0-nats . 
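# Optional sanity check before pushing: confirm the new subscriber package
# compiles cleanly (assumes the Go toolchain is available on the build host)
go build ./internal/nats/...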
docker push registry.home.deepblack.cloud/chorus:v0.6.0-nats # Update docker-compose files # Update WHOOSH_BACKBEAT-prototype/docker-compose.swarm.yml: # whoosh: # image: registry.home.deepblack.cloud/whoosh:v1.3.0-nats # environment: # - NATS_URL=nats://backbeat-nats:4222 # Update CHORUS/docker/docker-compose.yml: # chorus: # image: registry.home.deepblack.cloud/chorus:v0.6.0-nats # environment: # - NATS_URL=nats://backbeat-nats:4222 # Deploy to Swarm docker stack deploy -c WHOOSH_BACKBEAT-prototype/docker-compose.swarm.yml WHOOSH docker stack deploy -c CHORUS/docker/docker-compose.yml CHORUS ``` **Task 3.3**: Validation and monitoring (2 hours) ```bash # Monitor NATS connections docker service logs WHOOSH_backbeat-nats --tail 100 # Monitor WHOOSH publishing docker service logs WHOOSH_whoosh | grep "Published council opportunity" # Monitor CHORUS subscriptions docker service logs CHORUS_chorus | grep "Received council opportunity" # Verify council formation works end-to-end # (trigger test council via UI or API) ``` **Total Implementation Time**: 3-4 days --- ## 7. 
Timeline Estimates ### 7.1 Gantt Chart - Option 4: Hybrid Approach (RECOMMENDED) ``` PHASE 1: Docker API Quick Fix (Week 1) ========================================= Day 1 (Friday) ├─ 09:00-13:00 Implementation │ ├─ 09:00-11:00 Code changes (discovery.go) │ ├─ 11:00-11:30 Dependency updates (go.mod) │ ├─ 11:30-12:00 Docker compose changes │ └─ 12:00-13:00 Build and push images │ └─ 14:00-17:00 Testing and Deployment ├─ 14:00-15:00 Local testing ├─ 15:00-16:00 Swarm deployment ├─ 16:00-17:00 Validation and monitoring └─ 17:00 ✅ Phase 1 Complete PHASE 1 VALIDATION (Week 2) =========================== Monday-Friday ├─ Daily monitoring ├─ Performance metrics collection ├─ Bug fixes (if needed) └─ Friday: Phase 1 sign-off meeting PHASE 2: NATS Migration (Week 3) ================================= Monday ├─ 09:00-12:00 WHOOSH publisher implementation └─ 13:00-17:00 WHOOSH integration and testing Tuesday ├─ 09:00-12:00 CHORUS subscriber implementation └─ 13:00-17:00 CHORUS integration and testing Wednesday ├─ 09:00-12:00 Local integration testing ├─ 13:00-15:00 Build and push images └─ 15:00-17:00 Staging deployment Thursday ├─ 09:00-11:00 Staging validation ├─ 11:00-13:00 Production deployment └─ 14:00-17:00 Production monitoring Friday ├─ 09:00-12:00 Performance validation ├─ 13:00-15:00 Remove HTTP fallback code ├─ 15:00-16:00 Documentation └─ 16:00 ✅ Phase 2 Complete Total: 2 weeks + 3 days ``` ### 7.2 Resource Allocation | Role | Phase 1 (Days) | Phase 2 (Days) | Total | |------|---------------|----------------|-------| | Senior Software Architect | 0.5 | 1.0 | 1.5 | | Backend Developer | 1.0 | 3.0 | 4.0 | | DevOps Engineer | 0.5 | 1.0 | 1.5 | | QA Engineer | 0.5 | 1.0 | 1.5 | | **Total Person-Days** | **2.5** | **6.0** | **8.5** | ### 7.3 Critical Path Analysis **Phase 1 Critical Path**: 1. Code implementation (4 hours) - BLOCKER 2. Docker socket mount (10 min) - BLOCKER 3. Build/push (30 min) - BLOCKER 4. Deployment (30 min) - BLOCKER 5. 
Validation (1 hour) - BLOCKER **Total Critical Path**: 6 hours (can be completed in 1 day) **Phase 2 Critical Path**: 1. WHOOSH publisher (Day 1) - BLOCKER 2. CHORUS subscriber (Day 2) - BLOCKER for Day 3 3. Integration testing (Day 3 AM) - BLOCKER for deployment 4. Production deployment (Day 3 PM) - BLOCKER 5. Validation (Day 4) - BLOCKER for sign-off **Total Critical Path**: 3.5 days --- ## 8. Final Recommendation ### 8.1 Executive Summary **Implement Option 4: Hybrid Approach** **Phase 1** (Next 48 hours): Deploy Docker API Quick Fix **Phase 2** (Week 3): Migrate to NATS-Only Solution **Defer**: Full libp2p migration until P2P discovery is truly needed ### 8.2 Rationale #### Why NOT Full libp2p Migration Now? 1. **Complexity vs. Value**: libp2p requires 650+ LOC of changes, 7-11 days of implementation, and a steep learning curve. Current requirements (broadcast to all agents) don't justify this complexity. 2. **Infrastructure Already Exists**: NATS is already deployed (backbeat-nats service) and proven in production. Using existing infrastructure reduces risk and operational overhead. 3. **No P2P Discovery Needed Yet**: The current architecture has a centralized WHOOSH coordinator. True peer-to-peer discovery (mDNS/DHT) isn't required until agents need to discover each other without WHOOSH. 4. **Migration Path Preserved**: NATS pub/sub can later be replaced with libp2p gossipsub with minimal changes (both use topic-based messaging). #### Why Docker API First? 1. **Immediate Unblock**: Solves the critical discovery problem in 1 day, unblocking E2E workflow testing. 2. **Minimal Risk**: An 87-LOC change, easy rollback, no architectural changes. 3. **Validation Period**: Gives 1 week to validate that council formation works before starting Phase 2. 4. **Dual-Mode Safety**: Allows testing the NATS migration while keeping the Docker API as a fallback. #### Why NATS Long-Term? 1. **Right Complexity Level**: Solves 90% of use cases (broadcast, persistence, reliability) with 20% of libp2p's complexity. 
2. **Production-Ready**: NATS is battle-tested, well-documented, and the team has experience with it (BACKBEAT SDK). 3. **Performance**: Sub-millisecond message delivery, scales to millions of messages/sec. 4. **Operational Simplicity**: Centralized monitoring, simple debugging, clear failure modes. 5. **Future-Proof**: When/if P2P discovery is needed, NATS can remain for messaging while adding libp2p for discovery only. ### 8.3 Implementation Sequence ``` ┌─────────────────────────────────────────────────────────┐ │ WEEK 1: Emergency Unblock │ ├─────────────────────────────────────────────────────────┤ │ Friday: Docker API implementation (6 hours) │ │ Result: 100% agent discovery, council formation works │ └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ │ WEEK 2: Validation │ ├─────────────────────────────────────────────────────────┤ │ Mon-Fri: Monitor production, collect metrics │ │ Result: Confidence in Docker API solution │ └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ │ WEEK 3: NATS Migration │ ├─────────────────────────────────────────────────────────┤ │ Mon: WHOOSH publisher implementation │ │ Tue: CHORUS subscriber implementation │ │ Wed AM: Integration testing │ │ Wed PM: Production deployment (dual-mode) │ │ Thu-Fri: Validation, remove Docker API fallback │ │ Result: Production NATS-based pub/sub messaging │ └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ │ FUTURE: libp2p (when needed) │ ├─────────────────────────────────────────────────────────┤ │ Trigger: Need for P2P discovery, DHT storage, or │ │ cross-datacenter mesh │ │ Approach: Add libp2p for discovery, keep NATS for │ │ messaging (hybrid architecture) │ └─────────────────────────────────────────────────────────┘ ``` ### 8.4 Success Metrics **Phase 1 Success Criteria**: - ✅ 
All 34 CHORUS agents discovered by WHOOSH - ✅ Council broadcasts reach 100% of agents - ✅ Council formation completes in <60 seconds - ✅ Task execution begins successfully - ✅ Zero production incidents **Phase 2 Success Criteria**: - ✅ NATS message delivery rate >99.9% - ✅ Message latency <100ms (p95) - ✅ Zero message loss - ✅ Successful rollback test - ✅ Zero production incidents during migration ### 8.5 Risk Mitigation Summary | Risk | Mitigation | |------|------------| | Phase 1 deployment failure | Easy rollback (remove docker.sock mount) | | Docker API performance issues | Monitor task list query latency, cache results | | Phase 2 NATS connection failures | Keep HTTP fallback during transition, NATS clustering | | Message delivery failures | JetStream persistence, at-least-once delivery guarantees | | Coordinated deployment issues | Dual-mode operation (HTTP + NATS) during transition | | Team knowledge gaps | NATS already used in BACKBEAT, extensive documentation | ### 8.6 Final Architecture Vision ``` ┌────────────────────────────────────────────────────┐ │ FINAL ARCHITECTURE │ │ (End of Week 3) │ └────────────────────────────────────────────────────┘ ┌─────────────┐ │ WHOOSH │ - NATS publisher │ │ - Council orchestrator └──────┬──────┘ - Docker API for health monitoring │ │ Publish to NATS subject │ "chorus.councils.forming" ▼ ┌──────────────────────────────────────────────────┐ │ NATS Server (JetStream) │ │ - Already deployed as backbeat-nats │ │ - Clustered for HA (future) │ │ - Message persistence enabled │ │ - Monitoring via NATS CLI / Prometheus │ └──────┬──────┬──────┬──────┬──────┬──────────────┘ │ │ │ │ │ ┌───▼┐ ┌──▼─┐ ┌──▼─┐ ┌─▼──┐ (... 
34 total) │CH1 │ │CH2 │ │CH3 │ │CH4 │ CHORUS agents │SUB │ │SUB │ │SUB │ │SUB │ - NATS subscribers └────┘ └────┘ └────┘ └────┘ - HTTP for health checks - libp2p for P2P comms (future) Benefits: ✅ 100% agent discovery ✅ Sub-100ms message delivery ✅ At-least-once delivery guarantees ✅ Message persistence and replay ✅ Scalable to 1000+ agents ✅ Familiar technology stack ✅ Simple operational model ✅ Clear upgrade path to libp2p ``` ### 8.7 Action Items **Immediate (Next 24 hours)**: 1. [ ] Approve Hybrid Approach recommendation 2. [ ] Schedule Phase 1 implementation (Friday) 3. [ ] Assign developer resources 4. [ ] Prepare rollback procedures 5. [ ] Set up monitoring dashboards **Week 1**: 1. [ ] Implement Docker API discovery 2. [ ] Deploy to production Swarm 3. [ ] Validate 100% agent discovery 4. [ ] Verify council formation E2E 5. [ ] Document Phase 1 results **Week 2**: 1. [ ] Monitor production stability 2. [ ] Collect performance metrics 3. [ ] Plan Phase 2 NATS migration 4. [ ] Review NATS clustering options 5. [ ] Schedule Phase 2 deployment window **Week 3**: 1. [ ] Implement NATS publisher/subscriber 2. [ ] Integration testing 3. [ ] Production deployment (dual-mode) 4. [ ] Validate NATS messaging 5. [ ] Remove Docker API fallback 6. [ ] Document final architecture --- ## 9. Appendix ### 9.1 ASCII Architecture Diagram - Current vs. Future **CURRENT (Broken)**: ``` ┌─────────┐ ┌─────────────────────┐ │ WHOOSH │────HTTP GET───────→│ Docker Swarm VIP │ │ │ (chorus:8081) │ 10.0.13.26 │ └─────────┘ └──────────┬──────────┘ │ Round-robin to random ↓ ┌───────────┴───────────┐ ▼ ▼ ┌─────────┐ ┌─────────┐ │CHORUS #1│ │CHORUS #2│ │ FOUND │ │ FOUND │ └─────────┘ └─────────┘ ❌ 32 other agents invisible! 
``` **PHASE 1 (Docker API)**: ``` ┌─────────┐ ┌─────────────────────┐ │ WHOOSH │────Docker API─────→│ Swarm Manager │ │ + sock │ ListTasks() │ /var/run/docker.sock│ └─────────┘ └──────────┬──────────┘ │ Returns ALL 34 tasks with IPs ↓ ┌─────────────────────┴──────────────┐ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │CHORUS #1│ │CHORUS #2│ ... │CHORUS #34│ │10.0.13.1│ │10.0.13.2│ │10.0.13.34│ └─────────┘ └─────────┘ └─────────┘ ✅ ALL agents discovered via direct IP! ``` **PHASE 2 (NATS)**: ``` ┌─────────┐ ┌─────────────────────┐ │ WHOOSH │────PUBLISH────────→│ NATS Server │ │ + NATS │ subject: │ (JetStream) │ └─────────┘ chorus.councils.* └──────────┬──────────┘ │ Broadcast to all subscribers │ ┌─────────────────────┴──────────────┐ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │CHORUS #1│ │CHORUS #2│ ... │CHORUS #34│ │SUB: │ │SUB: │ │SUB: │ │forming │ │forming │ │forming │ └─────────┘ └─────────┘ └─────────┘ ✅ Pub/sub messaging, all receive simultaneously! ``` ### 9.2 Code File Changes Summary **Phase 1: Docker API** | File | Action | LOC | Description | |------|--------|-----|-------------| | `WHOOSH/internal/p2p/discovery.go` | MODIFY | +80 | Add discoverDockerSwarmAgents() | | `WHOOSH/go.mod` | MODIFY | +2 | Add docker/docker dependency | | `WHOOSH/docker-compose.swarm.yml` | MODIFY | +5 | Mount docker.sock | | **Total** | | **87** | | **Phase 2: NATS** | File | Action | LOC | Description | |------|--------|-----|-------------| | `WHOOSH/internal/nats/publisher.go` | CREATE | +150 | NATS publisher implementation | | `WHOOSH/internal/server/server.go` | MODIFY | +20 | Integrate NATS publisher | | `WHOOSH/internal/p2p/broadcaster.go` | MODIFY | +30 | Use NATS for broadcasting | | `WHOOSH/go.mod` | MODIFY | +2 | Add nats-io/nats.go | | `CHORUS/internal/nats/subscriber.go` | CREATE | +180 | NATS subscriber implementation | | `CHORUS/cmd/chorus-agent/main.go` | MODIFY | +30 | Initialize NATS subscriber | | `CHORUS/go.mod` | MODIFY | +2 | Add nats-io/nats.go | | **Total** 
| | **414** | | ### 9.3 Environment Variables Reference **WHOOSH**: ```bash # Phase 1 WHOOSH_DISCOVERY_MODE=docker-swarm CHORUS_SERVICE_NAME=CHORUS_chorus CHORUS_AGENT_PORT=8080 # Phase 2 NATS_URL=nats://backbeat-nats:4222 ``` **CHORUS**: ```bash # Phase 2 NATS_URL=nats://backbeat-nats:4222 CHORUS_AGENT_ID=auto-generated-uuid CHORUS_AGENT_NAME=CHORUS-Agent-{slot} ``` ### 9.4 Monitoring and Observability **Phase 1 Metrics**: ```prometheus # WHOOSH discovery metrics whoosh_agent_discovery_total{method="docker_api"} 34 whoosh_agent_discovery_duration_seconds{method="docker_api"} 0.125 whoosh_broadcast_success_total{council_id="xxx"} 34 whoosh_broadcast_error_total{council_id="xxx"} 0 ``` **Phase 2 Metrics**: ```prometheus # NATS metrics (built-in) nats_connections{name="WHOOSH"} 1 nats_subscriptions{subject="chorus.councils.forming"} 34 nats_messages_in{subject="chorus.councils.forming"} 156 nats_messages_out{subject="chorus.councils.forming"} 156 nats_message_latency_seconds{quantile="0.95"} 0.045 # Application metrics whoosh_nats_publish_total{subject="chorus.councils.forming"} 156 whoosh_nats_publish_errors_total 0 chorus_nats_receive_total{subject="chorus.councils.forming"} 34 chorus_council_opportunity_handled_total 34 ``` --- ## Conclusion The Hybrid Approach (Docker API → NATS) provides the optimal balance of: - **Immediate value** (1-day unblock) - **Low risk** (staged rollout) - **Long-term quality** (production-grade pub/sub) - **Pragmatic complexity** (simpler than libp2p, sufficient for needs) This architecture will support 100+ agents, sub-second message delivery, and future migration to full libp2p if/when P2P discovery becomes a requirement. **Recommended Decision**: Approve Hybrid Approach and begin Phase 1 implementation immediately. --- **Document Version**: 1.0 **Last Updated**: 2025-10-10 **Review Date**: 2025-10-17 (after Phase 1 completion)