CHORUS/docs/ARCHITECTURE-ANALYSIS-LibP2P-HMMM-Migration.md
Agent Discovery and P2P Architecture Migration Analysis

Date: 2025-10-10
Author: Claude Code (Senior Software Architect)
Status: CRITICAL ARCHITECTURE DECISION
Context: WHOOSH discovers only 2/34 CHORUS agents, blocking E2E workflow


Executive Summary

The current HTTP/DNS-based discovery architecture is fundamentally incompatible with Docker Swarm's VIP-based load balancing. This analysis evaluates three migration approaches to resolve the agent discovery crisis and provides detailed implementation guidance.

Recommendation Summary

SHORT-TERM (next 24-48 hours): Deploy Option 2 - Docker API Quick Fix.
LONG-TERM (next 2 weeks): Migrate to Option 3 - NATS-Only Solution (simpler than full libp2p).

Rationale: Docker API provides immediate unblock with minimal risk, while NATS-based pub/sub delivers 80% of libp2p benefits with 20% of the complexity. Full libp2p migration deferred until true P2P discovery (mDNS/DHT) is required.


Table of Contents

  1. Current Architecture Analysis
  2. Complexity Assessment
  3. Technical Feasibility Evaluation
  4. Alternative Solution Comparison
  5. Risk Analysis Matrix
  6. Detailed Implementation Plans
  7. Timeline Estimates
  8. Final Recommendation

1. Current Architecture Analysis

1.1 Existing System Flow

┌──────────────────────────────────────────────────────────────┐
│                    CURRENT BROKEN FLOW                        │
└──────────────────────────────────────────────────────────────┘

┌─────────────┐
│   WHOOSH    │  Discovery Service
│  (1 node)   │  - internal/p2p/discovery.go
└──────┬──────┘  - internal/p2p/broadcaster.go
       │
       │ 1. DNS Lookup: "chorus" service
       ▼
┌──────────────────┐
│  Docker Swarm    │  Returns: 10.0.13.26 (VIP only)
│  DNS Service     │  Expected: 34 container IPs
└────────┬─────────┘
         │
         │ 2. HTTP GET http://chorus:8081/health
         ▼
┌──────────────────┐
│  Swarm VIP LB    │  Round-robins to random containers
│  10.0.13.26      │  WHOOSH gets ~2 unique responses
└────────┬─────────┘
         │
    ┌────┴─────┬──────────────┬────────────┐
    ▼          ▼              ▼            ▼  (... 34 total)
┌────────┐ ┌────────┐    ┌────────┐  ┌────────┐
│ CHORUS │ │ CHORUS │    │ CHORUS │  │ CHORUS │
│ Agent1 │ │ Agent2 │    │ Agent3 │  │ Agent34│
└────────┘ └────────┘    └────────┘  └────────┘
    ✅         ✅            ❌           ❌
(Discovered) (Discovered) (Invisible) (Invisible)

Result: Only 2/34 agents discovered → Council formation fails

1.2 Root Cause Analysis

Problem: Docker Swarm DNS returns Service VIP, not individual container IPs

Evidence from codebase:

// File: /home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/discovery.go:222-235

func (d *Discovery) queryActualCHORUSService() {
    client := &http.Client{Timeout: 10 * time.Second}

    // ❌ BROKEN: This queries VIP, not all containers
    endpoint := "http://chorus:8081/health"
    resp, err := client.Get(endpoint)

    // Only discovers ONE agent per request
    // VIP load-balancer picks random container
    // Result: ~2-5 agents found after multiple attempts
}

Why this fails:

  1. DNS resolves "chorus" to 10.0.13.26 (the VIP only)
  2. No API to enumerate all 34 backend containers
  3. HTTP requires knowing endpoints in advance
  4. VIP randomization means discovery is non-deterministic
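
Worth noting as a diagnostic: for VIP-mode services, Swarm's DNS also exposes a `tasks.<service>` name that bypasses the VIP and returns one A record per running task. A minimal sketch of checking this from inside a container on the same overlay network (the service name "chorus" is taken from the flow above):

```go
package main

import (
	"fmt"
	"net"
)

// lookupTaskIPs resolves a DNS name to all of its A/AAAA records.
// Against Swarm's "tasks.<service>" name this returns one IP per
// running task, whereas the bare service name returns only the VIP.
func lookupTaskIPs(name string) ([]string, error) {
	return net.LookupHost(name)
}

func main() {
	// "tasks.chorus" is Swarm's per-task DNS name for the "chorus"
	// service; outside the overlay network this lookup will fail.
	ips, err := lookupTaskIPs("tasks.chorus")
	if err != nil {
		fmt.Println("lookup failed (not inside the overlay network?):", err)
		return
	}
	fmt.Printf("found %d task IPs: %v\n", len(ips), ips)
}
```

If this returns 34 addresses, the DNS layer can enumerate tasks after all; the broader discovery problem described below still stands, since HTTP polling per IP remains point-to-point.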

2. Complexity Assessment

2.1 Option 1: Full libp2p/HMMM Migration

Components to Modify:

| Component | File Path | Changes Required | LOC Changed | Complexity |
|---|---|---|---|---|
| WHOOSH Discovery | internal/p2p/discovery.go | Replace HTTP with libp2p host init | 200+ | HIGH |
| WHOOSH Broadcaster | internal/p2p/broadcaster.go | Convert to pub/sub topics | 150+ | HIGH |
| WHOOSH Main | cmd/whoosh/main.go | Initialize libp2p node | 50+ | MEDIUM |
| CHORUS HTTP API | api/http_server.go | Add topic subscriptions | 100+ | MEDIUM |
| CHORUS Council Mgr | internal/council/manager.go | Subscribe to topics | 80+ | MEDIUM |
| CHORUS Main | cmd/chorus-agent/main.go | Initialize libp2p node | 50+ | MEDIUM |
| HMMM BACKBEAT Config | Docker compose updates | Topic configuration | 20+ | LOW |
| TOTAL | | | 650+ | HIGH |

Complexity Rating: HIGH

Rationale:

  • libp2p requires understanding gossipsub protocol
  • Peer discovery via mDNS/DHT adds network complexity
  • Need to maintain both HTTP (health checks) and libp2p
  • Breaking change requiring coordinated deployment
  • Testing requires full cluster stack

2.2 Option 2: Docker API Quick Fix

Components to Modify:

| Component | File Path | Changes Required | LOC Changed | Complexity |
|---|---|---|---|---|
| WHOOSH Discovery | internal/p2p/discovery.go | Add Docker client integration | 80 | LOW |
| WHOOSH Config | docker-compose.swarm.yml | Mount docker.sock | 5 | LOW |
| Dependencies | go.mod | Add docker/docker library | 2 | LOW |
| TOTAL | | | 87 | LOW |

Complexity Rating: LOW

Rationale:

  • Single file modification
  • Well-documented Docker Go SDK
  • No architecture changes
  • Easy rollback (remove socket mount)
  • No distributed systems complexity

2.3 Option 3: NATS-Only Solution

Components to Modify:

| Component | File Path | Changes Required | LOC Changed | Complexity |
|---|---|---|---|---|
| WHOOSH Publisher | internal/p2p/broadcaster.go | Replace HTTP with NATS pub | 100 | MEDIUM |
| WHOOSH Main | cmd/whoosh/main.go | Initialize NATS connection | 30 | LOW |
| CHORUS Subscriber | api/http_server.go | Subscribe to NATS topics | 80 | MEDIUM |
| CHORUS Main | cmd/chorus-agent/main.go | Initialize NATS connection | 30 | LOW |
| Dependencies | go.mod (both repos) | Add nats-io/nats.go | 4 | LOW |
| TOTAL | | | 244 | MEDIUM |

Complexity Rating: MEDIUM

Rationale:

  • NATS SDK is simple (already used in BACKBEAT)
  • No peer discovery needed (centralized NATS broker)
  • Maintains HTTP for health checks
  • Production-ready infrastructure (NATS already deployed)
  • Moderate testing complexity

2.4 Option 4: Hybrid Approach

Phase 1 (Docker API): LOW complexity (87 LOC, 1-2 days)
Phase 2 (NATS Migration): MEDIUM complexity (244 LOC, 3-4 days)

Total Complexity Rating: MEDIUM (staged deployment reduces risk)


3. Technical Feasibility Evaluation

3.1 Can we use NATS directly instead of full libp2p?

YES - Strongly Recommended

Comparison:

| Feature | libp2p | NATS | Requirement |
|---|---|---|---|
| Pub/Sub Messaging | Gossipsub | JetStream | Required |
| Broadcast to all agents | Topic-based | Subject-based | Required |
| Peer Discovery | mDNS/DHT | Centralized | ⚠️ Not needed yet |
| At-least-once delivery | | JetStream | Required |
| Message persistence | ⚠️ Complex | Built-in | 🎯 Nice-to-have |
| Cluster-wide deployment | Decentralized | Centralized | Either works |
| Existing infrastructure | New | Already deployed | 🎯 Huge win |
| Learning curve | 🔴 Steep | 🟢 Gentle | 🎯 Team familiarity |
| Production maturity | 🟡 Good | 🟢 Excellent | Required |

Evidence from deployed infrastructure:

# File: /home/tony/chorus/project-queues/active/WHOOSH/BACKBEAT-prototype/docker-compose.swarm.yml:155-190

services:
  nats:
    image: nats:2.9-alpine
    command: ["--jetstream"]  # ✅ JetStream already enabled
    deploy:
      replicas: 1
    networks:
      - backbeat-net  # ✅ Shared network with WHOOSH/CHORUS

BACKBEAT SDK already uses NATS:

// File: /home/tony/chorus/project-queues/active/WHOOSH/BACKBEAT-prototype/pkg/sdk/client.go:15

import (
    "github.com/nats-io/nats.go"  // ✅ Already in vendor
)

// Subject pattern already established
subject := fmt.Sprintf("backbeat.status.%s", c.config.ClusterID)

Recommendation: Use NATS for 90% of use cases. Defer libp2p until you need:

  • True P2P discovery (no centralized broker)
  • Cross-datacenter mesh networking
  • DHT-based content routing
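
For context on the subject patterns above: NATS subjects are dot-separated tokens, where "*" matches exactly one token and ">" matches one or more trailing tokens. An illustrative matcher (not part of the nats.go SDK — the server does this matching for you) showing how a subscription like "chorus.councils.*" behaves:

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete subject matches a
// subscription pattern using NATS wildcard rules: "*" matches a
// single token, ">" matches all remaining tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return i < len(s) // ">" must cover at least one token
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("chorus.councils.*", "chorus.councils.forming")) // true
	fmt.Println(subjectMatches("chorus.councils.>", "chorus.councils.abc.forming")) // true
	fmt.Println(subjectMatches("chorus.councils.*", "chorus.agents.forming")) // false
}
```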

3.2 What's the minimal viable libp2p implementation?

If you must use libp2p (not recommended for MVP):

// Minimal libp2p setup for pub/sub only

package main

import (
    "context"
    "encoding/json"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p/core/peer"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
    "github.com/multiformats/go-multiaddr"
)

func InitMinimalLibp2p(ctx context.Context) (*pubsub.PubSub, error) {
    // 1. Create libp2p host
    host, err := libp2p.New(
        libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/9000"),
    )
    if err != nil {
        return nil, err
    }

    // 2. Create gossipsub instance
    ps, err := pubsub.NewGossipSub(ctx, host)
    if err != nil {
        return nil, err
    }

    // 3. Bootstrap to peers (requires known peer addresses)
    bootstrapPeers := []string{
        "/ip4/192.168.1.27/tcp/9000/p2p/QmYourPeerID",
    }

    for _, peerAddr := range bootstrapPeers {
        addr, err := multiaddr.NewMultiaddr(peerAddr)
        if err != nil {
            continue
        }
        // host.Connect expects a peer.AddrInfo, not a raw multiaddr
        info, err := peer.AddrInfoFromP2pAddr(addr)
        if err != nil {
            continue
        }
        if err := host.Connect(ctx, *info); err != nil {
            continue // non-fatal: try remaining bootstrap peers
        }
    }

    return ps, nil
}

// Usage in WHOOSH
func BroadcastCouncilOpportunity(ps *pubsub.PubSub, opp *CouncilOpportunity) error {
    topic, _ := ps.Join("councils.forming")

    data, _ := json.Marshal(opp)
    return topic.Publish(context.Background(), data)
}

// Usage in CHORUS
func SubscribeToOpportunities(ps *pubsub.PubSub) error {
    topic, err := ps.Join("councils.forming")
    if err != nil {
        return err
    }
    sub, err := topic.Subscribe()
    if err != nil {
        return err
    }

    for {
        msg, err := sub.Next(context.Background())
        if err != nil {
            return err
        }
        var opp CouncilOpportunity
        if err := json.Unmarshal(msg.Data, &opp); err != nil {
            continue // skip malformed messages
        }
        handleOpportunity(opp)
    }
}

Minimal dependencies:

go get github.com/libp2p/go-libp2p@latest
go get github.com/libp2p/go-libp2p-pubsub@latest

Estimated complexity: 300 LOC (still 3x more than NATS)

3.3 Are there Go libraries/examples we can leverage?

YES - Multiple options

Option A: NATS Go Client (Recommended)

# Already in BACKBEAT SDK vendor directory
/home/tony/chorus/project-queues/active/WHOOSH/vendor/github.com/nats-io/nats.go/

Example from BACKBEAT SDK:

// File: internal/backbeat/integration.go (hypothetical WHOOSH usage)

import (
    "encoding/json"

    "github.com/nats-io/nats.go"
)

type WHOOSHPublisher struct {
    nc *nats.Conn
}

func NewWHOOSHPublisher(natsURL string) (*WHOOSHPublisher, error) {
    nc, err := nats.Connect(natsURL)
    if err != nil {
        return nil, err
    }

    return &WHOOSHPublisher{nc: nc}, nil
}

func (w *WHOOSHPublisher) BroadcastCouncilOpportunity(opp *CouncilOpportunity) error {
    data, err := json.Marshal(opp)
    if err != nil {
        return err
    }

    // Publish to all CHORUS agents subscribed to this subject
    return w.nc.Publish("chorus.councils.forming", data)
}

Option B: Docker Go SDK

# For Docker API quick fix
go get github.com/docker/docker@latest

Example:

// File: internal/p2p/discovery_swarm.go (new file)

import (
    "context"
    "fmt"
    "strings"

    "github.com/docker/docker/client"
    "github.com/docker/docker/api/types"
    "github.com/docker/docker/api/types/filters"
)

func (d *Discovery) DiscoverSwarmAgents() ([]*Agent, error) {
    cli, err := client.NewClientWithOpts(client.FromEnv)
    if err != nil {
        return nil, err
    }
    defer cli.Close()

    // List all tasks for CHORUS service
    tasks, err := cli.TaskList(context.Background(), types.TaskListOptions{
        Filters: filters.NewArgs(
            filters.Arg("service", "CHORUS_chorus"),
            filters.Arg("desired-state", "running"),
        ),
    })
    if err != nil {
        return nil, err
    }

    agents := []*Agent{}
    for _, task := range tasks {
        // Extract container IP from task
        for _, attachment := range task.NetworksAttachments {
            if len(attachment.Addresses) > 0 {
                ip := strings.Split(attachment.Addresses[0], "/")[0]
                endpoint := fmt.Sprintf("http://%s:8080", ip)

                agents = append(agents, &Agent{
                    ID:       task.ID[:12],
                    Endpoint: endpoint,
                    Status:   "online",
                    // ... populate other fields
                })
            }
        }
    }

    return agents, nil
}

Option C: libp2p Examples

# Official examples repository
git clone https://github.com/libp2p/go-libp2p-examples.git

# Relevant examples:
# - pubsub-example/
# - echo/
# - chat-with-rendezvous/

3.4 What about backwards compatibility during migration?

Strategy: Dual-mode operation during transition

// File: internal/p2p/broadcaster.go

type Broadcaster struct {
    discovery *Discovery
    natsConn  *nats.Conn  // New: NATS connection
    ctx       context.Context
    cancel    context.CancelFunc

    // Feature flag for gradual rollout
    useNATS   bool
}

func (b *Broadcaster) BroadcastCouncilOpportunity(ctx context.Context, opp *CouncilOpportunity) error {
    if b.useNATS && b.natsConn != nil {
        // New path: NATS pub/sub
        return b.broadcastViaNATS(ctx, opp)
    }

    // Legacy path: HTTP POST (existing code)
    return b.broadcastViaHTTP(ctx, opp)
}

func (b *Broadcaster) broadcastViaNATS(ctx context.Context, opp *CouncilOpportunity) error {
    data, err := json.Marshal(opp)
    if err != nil {
        return err
    }

    subject := fmt.Sprintf("chorus.councils.%s.forming", opp.CouncilID)
    return b.natsConn.Publish(subject, data)
}

func (b *Broadcaster) broadcastViaHTTP(ctx context.Context, opp *CouncilOpportunity) error {
    // Existing HTTP broadcast logic (unchanged)
    agents := b.discovery.GetAgents()
    // ... existing code
}

Deployment strategy:

  1. Week 1: Deploy NATS code with useNATS=false (no behavior change)
  2. Week 2: Enable NATS for 10% of broadcasts (canary testing)
  3. Week 3: Enable NATS for 100% of broadcasts
  4. Week 4: Remove HTTP broadcast code entirely

Rollback: Set useNATS=false via environment variable


4. Alternative Solution Comparison

4.1 Option 1: Full libp2p/HMMM Migration

Architecture:

┌─────────────┐
│   WHOOSH    │
│  (libp2p)   │  - Initialize libp2p host
└──────┬──────┘  - Join "councils.forming" topic
       │          - Publish council opportunities
       │
       │ Publishes to libp2p gossipsub topic
       ▼
┌──────────────────────────────────────────┐
│  libp2p Gossipsub Network                │
│  - Peer discovery via mDNS               │
│  - DHT routing (optional)                │
│  - Message propagation to all subscribers│
└──────┬─────┬─────┬─────┬─────┬──────────┘
       │     │     │     │     │
   ┌───▼┐ ┌──▼─┐ ┌─▼──┐ ┌▼──┐ (... 34 total)
   │CH1 │ │CH2 │ │CH3 │ │CH4│  All CHORUS agents
   └────┘ └────┘ └────┘ └───┘  subscribe to topic

   ✅ ALL 34 agents receive message simultaneously

Pros:

  • True P2P architecture (no central broker)
  • Automatic peer discovery via mDNS/DHT
  • Built-in encryption and authentication
  • Message deduplication
  • Resilient to network partitions
  • Unlocks future P2P features (DHT storage, streams)

Cons:

  • High complexity (650+ LOC changes)
  • Steep learning curve
  • Requires libp2p expertise
  • Breaking change (coordinated deployment)
  • Complex testing requirements
  • Overkill for current needs

Implementation time: 7-11 days

When to use: When you need true P2P discovery without central infrastructure

4.2 Option 2: Docker API Quick Fix

Architecture:

┌─────────────┐
│   WHOOSH    │
│ (Docker API)│  - Mount /var/run/docker.sock
└──────┬──────┘  - Query Swarm API for tasks
       │
       │ GET /tasks?service=CHORUS_chorus
       ▼
┌──────────────────┐
│  Docker Engine   │  Returns: List of all 34 tasks
│  (Swarm Manager) │  with container IPs
└────────┬─────────┘
         │
    ┌────┴─────┬──────────────┬────────────┐
    ▼          ▼              ▼            ▼  (... 34 total)
┌─────────┐ ┌─────────┐   ┌─────────┐  ┌──────────┐
│10.0.13.1│ │10.0.13.2│   │10.0.13.3│  │10.0.13.34│
│CHORUS 1 │ │CHORUS 2 │   │CHORUS 3 │  │CHORUS 34 │
└─────────┘ └─────────┘   └─────────┘  └──────────┘

WHOOSH makes HTTP POST to all 34 container IPs
✅ 100% discovery rate

Code changes:

// File: internal/p2p/discovery.go (add new method)

import (
    "github.com/docker/docker/client"
    "github.com/docker/docker/api/types"
    "github.com/docker/docker/api/types/filters"
)

func (d *Discovery) discoverDockerSwarmAgents() {
    cli, err := client.NewClientWithOpts(client.FromEnv)
    if err != nil {
        log.Error().Err(err).Msg("Failed to create Docker client")
        return
    }
    defer cli.Close()

    // Query Swarm for all CHORUS tasks
    tasks, err := cli.TaskList(context.Background(), types.TaskListOptions{
        Filters: filters.NewArgs(
            filters.Arg("service", "CHORUS_chorus"),
            filters.Arg("desired-state", "running"),
        ),
    })
    if err != nil {
        log.Error().Err(err).Msg("Failed to list Swarm tasks")
        return
    }

    log.Info().Int("task_count", len(tasks)).Msg("Discovered Swarm tasks")

    for _, task := range tasks {
        if len(task.NetworksAttachments) == 0 {
            continue
        }

        for _, attachment := range task.NetworksAttachments {
            if len(attachment.Addresses) == 0 {
                continue
            }

            // Extract IP from CIDR (e.g., "10.0.13.5/24" -> "10.0.13.5")
            ip := strings.Split(attachment.Addresses[0], "/")[0]
            endpoint := fmt.Sprintf("http://%s:8080", ip)

            agent := &Agent{
                ID:       task.ID[:12], // Use first 12 chars of task ID
                Name:     fmt.Sprintf("CHORUS-%d", task.Slot),
                Endpoint: endpoint,
                Status:   "online",
                Capabilities: []string{
                    "general_development",
                    "task_coordination",
                },
                Model:      "llama3.1:8b",
                LastSeen:   time.Now(),
                P2PAddr:    fmt.Sprintf("%s:9000", ip),
                ClusterID:  "docker-unified-stack",
            }

            d.addOrUpdateAgent(agent)

            log.Debug().
                Str("task_id", task.ID[:12]).
                Str("ip", ip).
                Str("endpoint", endpoint).
                Msg("Discovered CHORUS agent via Docker API")
        }
    }
}

Deployment changes:

# File: docker/docker-compose.swarm.yml (WHOOSH service)

services:
  whoosh:
    image: anthonyrawlins/whoosh:latest
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro  # Mount Docker socket
    environment:
      - WHOOSH_DISCOVERY_MODE=docker-swarm

Pros:

  • Minimal code changes (87 LOC)
  • Immediate solution (1-2 days)
  • 100% discovery rate
  • Easy to test and validate
  • Simple rollback (remove socket mount)
  • No distributed systems complexity

Cons:

  • ⚠️ Requires privileged Docker socket access
  • ⚠️ Couples WHOOSH to Docker Swarm
  • ⚠️ Doesn't enable pub/sub messaging
  • ⚠️ Still uses HTTP (no performance benefits)
  • ⚠️ Not portable to Kubernetes/other orchestrators

Implementation time: 1-2 days

When to use: Emergency unblock, temporary fix while planning proper migration

4.3 Option 3: NATS-Only Solution

Architecture:

┌──────────────┐
│    WHOOSH    │
│ (NATS client)│  - Connect to NATS broker
└──────┬───────┘  - Publish to subject
       │
       │ PUBLISH chorus.councils.forming {...}
       ▼
┌──────────────────────────────────────────┐
│  NATS Server (JetStream)                 │
│  - Already deployed as backbeat-nats     │
│  - Message persistence                   │
│  - At-least-once delivery                │
└──────┬─────┬─────┬─────┬─────┬──────────┘
       │     │     │     │     │
   ┌───▼┐ ┌──▼─┐ ┌─▼──┐ ┌▼──┐ (... 34 total)
   │CH1 │ │CH2 │ │CH3 │ │CH4│  All CHORUS agents
   │SUB │ │SUB │ │SUB │ │SUB│  subscribe on startup
   └────┘ └────┘ └────┘ └───┘

   ✅ ALL 34 agents receive message via subscription

Code changes - WHOOSH:

// File: internal/nats/publisher.go (new file)

package nats

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/rs/zerolog/log"
)

type CouncilPublisher struct {
    nc *nats.Conn
}

func NewCouncilPublisher(natsURL string) (*CouncilPublisher, error) {
    nc, err := nats.Connect(natsURL,
        nats.MaxReconnects(-1),  // Infinite reconnects
        nats.ReconnectWait(1*time.Second),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to connect to NATS: %w", err)
    }

    log.Info().Str("nats_url", natsURL).Msg("Connected to NATS for council publishing")

    return &CouncilPublisher{nc: nc}, nil
}

func (p *CouncilPublisher) PublishCouncilOpportunity(ctx context.Context, opp *CouncilOpportunity) error {
    data, err := json.Marshal(opp)
    if err != nil {
        return fmt.Errorf("failed to marshal opportunity: %w", err)
    }

    subject := "chorus.councils.forming"

    // Publish with headers for tracing
    msg := &nats.Msg{
        Subject: subject,
        Data:    data,
        Header: nats.Header{
            "Council-ID": []string{opp.CouncilID.String()},
            "Project":    []string{opp.ProjectName},
        },
    }

    if err := p.nc.PublishMsg(msg); err != nil {
        return fmt.Errorf("failed to publish to NATS: %w", err)
    }

    log.Info().
        Str("council_id", opp.CouncilID.String()).
        Str("subject", subject).
        Msg("Published council opportunity to NATS")

    return nil
}

func (p *CouncilPublisher) Close() error {
    if p.nc != nil {
        p.nc.Close()
    }
    return nil
}

Code changes - CHORUS:

// File: internal/nats/subscriber.go (new file)

package nats

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "chorus/internal/council"
    "github.com/nats-io/nats.go"
    "github.com/rs/zerolog/log"
)

type CouncilSubscriber struct {
    nc         *nats.Conn
    sub        *nats.Subscription
    councilMgr *council.Manager
}

func NewCouncilSubscriber(natsURL string, councilMgr *council.Manager) (*CouncilSubscriber, error) {
    nc, err := nats.Connect(natsURL,
        nats.MaxReconnects(-1),
        nats.ReconnectWait(1*time.Second),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to connect to NATS: %w", err)
    }

    s := &CouncilSubscriber{
        nc:         nc,
        councilMgr: councilMgr,
    }

    // Subscribe to council opportunities
    sub, err := nc.Subscribe("chorus.councils.forming", s.handleCouncilOpportunity)
    if err != nil {
        nc.Close()
        return nil, fmt.Errorf("failed to subscribe to council opportunities: %w", err)
    }
    s.sub = sub

    log.Info().
        Str("nats_url", natsURL).
        Str("subject", "chorus.councils.forming").
        Msg("Subscribed to council opportunities on NATS")

    return s, nil
}

func (s *CouncilSubscriber) handleCouncilOpportunity(msg *nats.Msg) {
    var opp council.Opportunity
    if err := json.Unmarshal(msg.Data, &opp); err != nil {
        log.Error().Err(err).Msg("Failed to unmarshal council opportunity")
        return
    }

    log.Info().
        Str("council_id", opp.CouncilID.String()).
        Str("project", opp.ProjectName).
        Msg("Received council opportunity from NATS")

    // Delegate to council manager (existing logic)
    if err := s.councilMgr.HandleOpportunity(context.Background(), &opp); err != nil {
        log.Error().
            Err(err).
            Str("council_id", opp.CouncilID.String()).
            Msg("Failed to handle council opportunity")
    }
}

func (s *CouncilSubscriber) Close() error {
    if s.sub != nil {
        s.sub.Unsubscribe()
    }
    if s.nc != nil {
        s.nc.Close()
    }
    return nil
}

Integration in CHORUS main:

// File: cmd/chorus-agent/main.go

func main() {
    // ... existing setup

    // Initialize council manager
    councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities)

    // NEW: Subscribe to NATS instead of HTTP endpoint
    natsURL := os.Getenv("NATS_URL")
    if natsURL == "" {
        natsURL = "nats://backbeat-nats:4222"
    }

    natsSubscriber, err := nats.NewCouncilSubscriber(natsURL, councilMgr)
    if err != nil {
        log.Fatal().Err(err).Msg("Failed to initialize NATS subscriber")
    }
    defer natsSubscriber.Close()

    // HTTP server still runs for health checks (optional)
    if enableHTTP {
        httpServer := api.NewHTTPServer(cfg, node, hlog, ps)
        go httpServer.Start()
    }

    // ... rest of main
}

Pros:

  • Simpler than libp2p (244 vs 650 LOC)
  • Production-ready (NATS battle-tested)
  • Already deployed (backbeat-nats running)
  • 100% broadcast delivery
  • Message persistence (JetStream)
  • Familiar SDK (used in BACKBEAT)
  • Easy testing (local NATS for dev)
  • Portable (works with any orchestrator)

Cons:

  • ⚠️ Centralized (NATS is single point of failure)
  • ⚠️ No automatic peer discovery (not needed)
  • ⚠️ Requires NATS infrastructure (already have it)

Implementation time: 3-4 days

When to use: Production-grade pub/sub without P2P complexity

4.4 Comparison Matrix

| Criteria | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|---|---|---|---|---|
| Discovery Success | 100% (gossipsub) | 100% (API query) | 100% (pub/sub) | 100% (both phases) |
| Implementation Time | 7-11 days | 1-2 days | 3-4 days | 2 days + 3-4 days |
| Code Complexity | HIGH (650 LOC) | LOW (87 LOC) | MEDIUM (244 LOC) | LOW → MEDIUM |
| Testing Complexity | HIGH | LOW | MEDIUM | LOW → MEDIUM |
| Production Risk | HIGH | LOW | MEDIUM | LOW (staged) |
| Scalability | Excellent | Good | Excellent | Excellent |
| Infrastructure Deps | None (P2P) | Docker Swarm | NATS (have it) | Both |
| Learning Curve | Steep | Gentle | Gentle | Gentle |
| Backwards Compat | Breaking change | Non-breaking | Breaking change | Dual-mode |
| Rollback Ease | Difficult | Easy | Moderate | Easy (per phase) |
| Future-Proofing | Best (P2P ready) | Poor (Docker-only) | Good | Good |
| Message Persistence | Complex | N/A | Built-in | N/A → Built-in |
| Operational Overhead | High | Low | Low | Low |
| Deployment Coupling | Tight | Tight | Moderate | Moderate |

Scoring (1-10, higher is better):

| Option | Immediate Value | Long-term Value | Risk Level | Total Score |
|---|---|---|---|---|
| Option 1: libp2p | 6/10 | 10/10 | 4/10 | 20/30 |
| Option 2: Docker API | 10/10 | 4/10 | 9/10 | 23/30 |
| Option 3: NATS | 8/10 | 9/10 | 7/10 | 24/30 |
| Option 4: Hybrid | 10/10 | 9/10 | 8/10 | 27/30 |

5. Risk Analysis Matrix

5.1 Technical Risks

| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|---|---|---|---|---|
| Peer discovery failure | LOW (mDNS/DHT) | N/A | N/A | N/A |
| Message delivery failure | LOW (retry logic) | MEDIUM (HTTP timeout) | LOW (NATS ack) | LOW → LOW |
| Network partition | LOW (eventual consistency) | HIGH (no retries) | MEDIUM (NATS reconnect) | HIGH → MEDIUM |
| Scale bottleneck | LOW (P2P) | MEDIUM (VIP load) | LOW (NATS scales) | MEDIUM → LOW |
| Implementation bugs | HIGH (complex) | LOW (simple) | MEDIUM | LOW → MEDIUM |
| Integration issues | HIGH (new system) | LOW (existing stack) | MEDIUM (new client) | LOW → MEDIUM |
| Performance degradation | LOW (efficient) | MEDIUM (HTTP overhead) | LOW (fast broker) | MEDIUM → LOW |

5.2 Deployment Risks

| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|---|---|---|---|---|
| Coordinated deployment | CRITICAL | LOW | HIGH | LOW → HIGH |
| Rollback complexity | HIGH (breaking) | LOW (remove mount) | MEDIUM (revert code) | LOW → MEDIUM |
| Downtime required | YES (breaking) | NO | YES (breaking) | NO → YES |
| Service interruption | HIGH (full restart) | LOW (WHOOSH only) | MEDIUM (both services) | LOW → MEDIUM |
| Config drift | HIGH (new settings) | LOW (one env var) | MEDIUM (NATS URL) | LOW → MEDIUM |
| Version compatibility | CRITICAL | N/A | MEDIUM (NATS version) | N/A → MEDIUM |

5.3 Operational Risks

| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|---|---|---|---|---|
| Monitoring gaps | HIGH (new metrics) | LOW (existing) | MEDIUM (NATS metrics) | LOW → MEDIUM |
| Debugging difficulty | HIGH (distributed) | LOW (centralized) | MEDIUM (broker logs) | LOW → MEDIUM |
| On-call burden | HIGH (complex) | LOW (simple) | MEDIUM | LOW → MEDIUM |
| Documentation debt | HIGH (new system) | LOW (minimal change) | MEDIUM (NATS docs) | LOW → MEDIUM |
| Team knowledge | LOW (unfamiliar) | HIGH (Docker known) | MEDIUM (NATS used) | HIGH → MEDIUM |
| Vendor lock-in | LOW (open source) | HIGH (Docker Swarm) | MEDIUM (NATS) | HIGH → MEDIUM |

5.4 Mitigation Strategies

Option 1: libp2p

  • Risk: High implementation complexity
  • Mitigation:
    • Hire libp2p consultant for architecture review
    • Build prototype in isolated environment first
    • Extensive integration testing before production
    • Feature flagging for gradual rollout

Option 2: Docker API

  • Risk: Docker Swarm vendor lock-in
  • Mitigation:
    • Document as temporary solution
    • Plan NATS migration immediately after
    • Abstract discovery interface for future portability
    • Maintain HTTP broadcast as fallback
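
The "abstract discovery interface" mitigation can be as small as one interface that the Docker-API discoverer and any future NATS- or libp2p-backed discoverer both implement. A sketch with illustrative names (not existing WHOOSH types):

```go
package main

import "fmt"

// Agent mirrors only the fields the discovery layer needs here;
// the real WHOOSH Agent struct has more fields.
type Agent struct {
	ID       string
	Endpoint string
}

// AgentDiscoverer is the seam that keeps WHOOSH portable: the
// Docker Swarm implementation can later be swapped for a NATS- or
// libp2p-backed one without touching callers.
type AgentDiscoverer interface {
	DiscoverAgents() ([]*Agent, error)
}

// staticDiscoverer is a trivial implementation used for
// illustration, and handy as a test double.
type staticDiscoverer struct{ agents []*Agent }

func (s *staticDiscoverer) DiscoverAgents() ([]*Agent, error) {
	return s.agents, nil
}

func main() {
	var d AgentDiscoverer = &staticDiscoverer{
		agents: []*Agent{{ID: "abc123", Endpoint: "http://10.0.13.5:8080"}},
	}
	agents, _ := d.DiscoverAgents()
	fmt.Println(len(agents)) // 1
}
```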

Option 3: NATS

  • Risk: NATS broker single point of failure
  • Mitigation:
    • Deploy NATS in clustered mode (3 replicas)
    • Enable JetStream for message persistence
    • Monitor NATS health and set up alerts
    • Document NATS recovery procedures

Option 4: Hybrid (Recommended)

  • Risk: Phase 2 migration complexity
  • Mitigation:
    • Validate Phase 1 thoroughly in production (1 week)
    • Implement dual-mode operation (HTTP + NATS) during Phase 2
    • Use feature flags for gradual NATS rollout
    • Maintain HTTP as fallback during transition

6. Detailed Implementation Plans

6.1 Phase 1: Option 2 - Docker API Quick Fix

Timeline: 1-2 days

Step 1: Code Changes (4 hours)

File: /home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/discovery.go

// Add import
import (
    "github.com/docker/docker/client"
    "github.com/docker/docker/api/types"
    "github.com/docker/docker/api/types/filters"
)

// Add new method after discoverRealCHORUSAgents()

func (d *Discovery) discoverDockerSwarmAgents() {
    if !d.config.DockerEnabled {
        return
    }

    cli, err := client.NewClientWithOpts(client.FromEnv)
    if err != nil {
        log.Debug().Err(err).Msg("Docker client not available (running outside Swarm?)")
        return
    }
    defer cli.Close()

    ctx := context.Background()

    // List all tasks for CHORUS service
    serviceName := os.Getenv("CHORUS_SERVICE_NAME")
    if serviceName == "" {
        serviceName = "CHORUS_chorus"  // Default stack name
    }

    tasks, err := cli.TaskList(ctx, types.TaskListOptions{
        Filters: filters.NewArgs(
            filters.Arg("service", serviceName),
            filters.Arg("desired-state", "running"),
        ),
    })

    if err != nil {
        log.Error().Err(err).Msg("Failed to list Docker Swarm tasks")
        return
    }

    log.Info().
        Int("task_count", len(tasks)).
        Str("service", serviceName).
        Msg("🐳 Discovered Docker Swarm tasks")

    discoveredCount := 0

    for _, task := range tasks {
        // Skip tasks without network attachments
        if len(task.NetworksAttachments) == 0 {
            log.Debug().Str("task_id", task.ID[:12]).Msg("Task has no network attachments")
            continue
        }

        // Extract IP from first network attachment
        for _, attachment := range task.NetworksAttachments {
            if len(attachment.Addresses) == 0 {
                continue
            }

            // Parse IP from CIDR notation (e.g., "10.0.13.5/24")
            ipCIDR := attachment.Addresses[0]
            ip := strings.Split(ipCIDR, "/")[0]

            // Construct agent endpoint
            port := os.Getenv("CHORUS_AGENT_PORT")
            if port == "" {
                port = "8080"
            }
            endpoint := fmt.Sprintf("http://%s:%s", ip, port)

            // Create agent entry
            agent := &Agent{
                ID:       task.ID[:12], // Short task ID
                Name:     fmt.Sprintf("CHORUS-Task-%d", task.Slot),
                Endpoint: endpoint,
                Status:   "online",
                Capabilities: []string{
                    "general_development",
                    "task_coordination",
                    "ai_integration",
                },
                Model:          "llama3.1:8b",
                LastSeen:       time.Now(),
                TasksCompleted: 0,
                P2PAddr:        fmt.Sprintf("%s:9000", ip),
                ClusterID:      "docker-unified-stack",
            }

            d.addOrUpdateAgent(agent)
            discoveredCount++

            log.Debug().
                Str("task_id", task.ID[:12]).
                Int("slot", int(task.Slot)).
                Str("ip", ip).
                Str("endpoint", endpoint).
                Msg("🤖 Discovered CHORUS agent via Docker API")

            break // Only use first network attachment
        }
    }

    log.Info().
        Int("discovered", discoveredCount).
        Int("total_tasks", len(tasks)).
        Msg("✅ Docker Swarm agent discovery completed")
}

Update discoverRealCHORUSAgents() to call new method:

func (d *Discovery) discoverRealCHORUSAgents() {
    log.Debug().Msg("🔍 Discovering real CHORUS agents via health endpoints")

    // Priority 1: Docker Swarm API (most accurate)
    d.discoverDockerSwarmAgents()

    // Priority 2: Known service endpoints (fallback)
    d.queryActualCHORUSService()
    d.discoverKnownEndpoints()
}
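The CIDR handling above splits on "/" directly, which works for well-formed Swarm addresses but silently accepts garbage. A slightly more defensive variant uses the standard library's net.ParseCIDR; this is a sketch, and the helper name parseTaskIP is hypothetical, not part of the existing discovery code:

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseTaskIP extracts the bare IP from a Swarm task address in CIDR
// notation (e.g. "10.0.13.5/24"). Unlike a plain strings.Split, it
// validates the input and also tolerates a bare IP with no mask.
func parseTaskIP(addr string) (string, error) {
	ip, _, err := net.ParseCIDR(addr)
	if err != nil {
		// Some Docker API responses may carry a bare IP with no prefix length.
		if parsed := net.ParseIP(strings.TrimSpace(addr)); parsed != nil {
			return parsed.String(), nil
		}
		return "", fmt.Errorf("invalid task address %q: %w", addr, err)
	}
	return ip.String(), nil
}

func main() {
	for _, addr := range []string{"10.0.13.5/24", "10.0.13.26", "not-an-ip"} {
		ip, err := parseTaskIP(addr)
		fmt.Println(ip, err)
	}
}
```

Dropping this in place of the `strings.Split(ipCIDR, "/")[0]` line would let the loop skip malformed attachments instead of constructing a bogus endpoint.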

Step 2: Dependency Update (5 minutes)

File: /home/tony/chorus/project-queues/active/WHOOSH/go.mod

cd /home/tony/chorus/project-queues/active/WHOOSH
go get github.com/docker/docker@latest
go mod tidy

Step 3: Configuration Changes (10 minutes)

File: /home/tony/chorus/project-queues/active/WHOOSH/docker/docker-compose.swarm.yml

services:
  whoosh:
    image: anthonyrawlins/whoosh:latest

    # NEW: Mount Docker socket for Swarm API access
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro

    # NEW: Environment variables for Docker discovery
    environment:
      - WHOOSH_DISCOVERY_MODE=docker-swarm
      - CHORUS_SERVICE_NAME=CHORUS_chorus  # Match your stack name
      - CHORUS_AGENT_PORT=8080

    # IMPORTANT: Security note - Docker socket access is read-only
    # This allows querying tasks but not modifying them

Step 4: Build and Deploy (30 minutes)

# Build new WHOOSH image
cd /home/tony/chorus/project-queues/active/WHOOSH
docker build -t registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery .
docker push registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery

# Update compose file to use new image
# Edit docker/docker-compose.swarm.yml:
#   whoosh:
#     image: registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery

# Redeploy to Swarm
docker stack deploy -c docker/docker-compose.swarm.yml WHOOSH

Step 5: Validation (1 hour)

# 1. Check WHOOSH logs for discovery
docker service logs -f WHOOSH_whoosh --tail 100

# Expected output:
# {"level":"info","task_count":34,"service":"CHORUS_chorus","message":"🐳 Discovered Docker Swarm tasks"}
# {"level":"info","discovered":34,"total_tasks":34,"message":"✅ Docker Swarm agent discovery completed"}

# 2. Check agent registry in database
docker exec -it $(docker ps -q -f name=WHOOSH_whoosh) psql -U whoosh -d whoosh -c \
  "SELECT COUNT(*) FROM agents WHERE status = 'online';"

# Expected: count = 34 (matching the "online" status set by the discovery code)

# 3. Trigger council formation
curl -X POST http://whoosh.chorus.services/api/v1/councils \
  -H "Content-Type: application/json" \
  -d '{
    "project_name": "Test Discovery",
    "repository": "https://gitea.chorus.services/tony/test",
    "core_roles": ["tpm", "senior-software-architect"]
  }'

# 4. Verify all agents received broadcast
docker service logs CHORUS_chorus --tail 1000 | grep "Received council opportunity"

# Expected: 34 log entries (one per agent)

Step 6: Monitoring and Rollback Plan (30 minutes)

Success Criteria:

  • All 34 CHORUS agents discovered
  • Council broadcasts reach all agents
  • Council formation completes successfully
  • No performance degradation

Rollback Procedure (if issues occur):

# 1. Remove Docker socket mount from compose file
# 2. Redeploy with previous image
docker stack deploy -c docker/docker-compose.swarm.yml.backup WHOOSH

# 3. Verify rollback
docker service logs WHOOSH_whoosh

Total Implementation Time: 6-8 hours


Timeline: 3-4 days

Day 1: WHOOSH Publisher Implementation

Task 1.1: Create NATS publisher module (3 hours)

New File: /home/tony/chorus/project-queues/active/WHOOSH/internal/nats/publisher.go

package nats

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/chorus-services/whoosh/internal/p2p"
    "github.com/nats-io/nats.go"
    "github.com/rs/zerolog/log"
)

type CouncilPublisher struct {
    nc *nats.Conn
}

func NewCouncilPublisher(natsURL string) (*CouncilPublisher, error) {
    opts := []nats.Option{
        nats.Name("WHOOSH Council Publisher"),
        nats.MaxReconnects(-1),  // Infinite reconnects
        nats.ReconnectWait(1 * time.Second),
        nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
            log.Warn().Err(err).Msg("NATS disconnected")
        }),
        nats.ReconnectHandler(func(nc *nats.Conn) {
            log.Info().Str("url", nc.ConnectedUrl()).Msg("NATS reconnected")
        }),
    }

    nc, err := nats.Connect(natsURL, opts...)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to NATS at %s: %w", natsURL, err)
    }

    log.Info().
        Str("nats_url", natsURL).
        Str("server_url", nc.ConnectedUrl()).
        Msg("✅ Connected to NATS for council publishing")

    return &CouncilPublisher{nc: nc}, nil
}

func (p *CouncilPublisher) PublishCouncilOpportunity(ctx context.Context, opp *p2p.CouncilOpportunity) error {
    // Marshal opportunity to JSON
    data, err := json.Marshal(opp)
    if err != nil {
        return fmt.Errorf("failed to marshal council opportunity: %w", err)
    }

    // Construct subject (NATS topic)
    subject := "chorus.councils.forming"

    // Create message with headers for tracing
    msg := &nats.Msg{
        Subject: subject,
        Data:    data,
        Header: nats.Header{
            "Council-ID":   []string{opp.CouncilID.String()},
            "Project":      []string{opp.ProjectName},
            "Repository":   []string{opp.Repository},
            "UCXL-Address": []string{opp.UCXLAddress},
            "Published-At": []string{time.Now().Format(time.RFC3339)},
        },
    }

    // Publish to NATS
    if err := p.nc.PublishMsg(msg); err != nil {
        return fmt.Errorf("failed to publish to NATS subject %s: %w", subject, err)
    }

    // Flush to ensure delivery
    if err := p.nc.FlushTimeout(5 * time.Second); err != nil {
        log.Warn().Err(err).Msg("NATS flush timed out (delivery not confirmed)")
    }

    log.Info().
        Str("council_id", opp.CouncilID.String()).
        Str("project", opp.ProjectName).
        Str("subject", subject).
        Int("core_roles", len(opp.CoreRoles)).
        Int("optional_roles", len(opp.OptionalRoles)).
        Msg("📡 Published council opportunity to NATS")

    return nil
}

func (p *CouncilPublisher) PublishCouncilStatus(ctx context.Context, update *p2p.CouncilStatusUpdate) error {
    data, err := json.Marshal(update)
    if err != nil {
        return fmt.Errorf("failed to marshal council status: %w", err)
    }

    subject := fmt.Sprintf("chorus.councils.%s.status", update.CouncilID.String())

    msg := &nats.Msg{
        Subject: subject,
        Data:    data,
        Header: nats.Header{
            "Council-ID": []string{update.CouncilID.String()},
            "Status":     []string{update.Status},
        },
    }

    if err := p.nc.PublishMsg(msg); err != nil {
        return fmt.Errorf("failed to publish council status: %w", err)
    }

    log.Info().
        Str("council_id", update.CouncilID.String()).
        Str("status", update.Status).
        Str("subject", subject).
        Msg("📢 Published council status update to NATS")

    return nil
}

func (p *CouncilPublisher) Close() error {
    if p.nc != nil && !p.nc.IsClosed() {
        p.nc.Close()
        log.Info().Msg("Closed NATS connection")
    }
    return nil
}

func (p *CouncilPublisher) Health() error {
    if p.nc == nil {
        return fmt.Errorf("NATS connection is nil")
    }
    if p.nc.IsClosed() {
        return fmt.Errorf("NATS connection is closed")
    }
    if !p.nc.IsConnected() {
        return fmt.Errorf("NATS connection is not connected")
    }
    return nil
}
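The per-council status subject is built with fmt.Sprintf above, and the CHORUS side will later match it against the wildcard `chorus.councils.*.status`. Centralizing subject construction and parsing keeps the two sides in sync; this is an illustrative sketch, and both helper names are hypothetical:

```go
package main

import (
	"fmt"
	"strings"
)

const subjectForming = "chorus.councils.forming"

// statusSubject builds the NATS subject for a council's status updates,
// e.g. "chorus.councils.<id>.status".
func statusSubject(councilID string) string {
	return fmt.Sprintf("chorus.councils.%s.status", councilID)
}

// councilIDFromStatusSubject recovers the council ID from a status
// subject, returning false when the subject doesn't have the expected
// four-token layout.
func councilIDFromStatusSubject(subject string) (string, bool) {
	tokens := strings.Split(subject, ".")
	if len(tokens) != 4 || tokens[0] != "chorus" || tokens[1] != "councils" || tokens[3] != "status" {
		return "", false
	}
	return tokens[2], true
}

func main() {
	s := statusSubject("3f2a")
	id, ok := councilIDFromStatusSubject(s)
	fmt.Println(s, id, ok)
}
```

With a shared helper like this, the publisher and subscriber cannot drift apart on the subject layout.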

Task 1.2: Integrate publisher into WHOOSH server (2 hours)

File: /home/tony/chorus/project-queues/active/WHOOSH/internal/server/server.go

import (
    natspub "github.com/chorus-services/whoosh/internal/nats"
)

type Server struct {
    // ... existing fields
    natsPublisher *natspub.CouncilPublisher  // NEW
}

func NewServer(cfg *config.Config, db *pgxpool.Pool) *Server {
    // ... existing setup

    // NEW: Initialize NATS publisher
    natsURL := os.Getenv("NATS_URL")
    if natsURL == "" {
        natsURL = "nats://backbeat-nats:4222"  // Default to BACKBEAT NATS
    }

    natsPublisher, err := natspub.NewCouncilPublisher(natsURL)
    if err != nil {
        log.Fatal().Err(err).Msg("Failed to initialize NATS publisher")
    }

    return &Server{
        // ... existing fields
        natsPublisher: natsPublisher,
    }
}

func (s *Server) Stop() error {
    // ... existing shutdown

    // NEW: Close NATS connection
    if s.natsPublisher != nil {
        s.natsPublisher.Close()
    }

    return nil
}

Task 1.3: Update broadcaster to use NATS (2 hours)

File: /home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/broadcaster.go

// Add field to Broadcaster struct
type Broadcaster struct {
    discovery      *Discovery
    natsPublisher  *nats.CouncilPublisher  // NEW
    ctx            context.Context
    cancel         context.CancelFunc
}

// Update BroadcastCouncilOpportunity to use NATS
func (b *Broadcaster) BroadcastCouncilOpportunity(ctx context.Context, opportunity *CouncilOpportunity) error {
    log.Info().
        Str("council_id", opportunity.CouncilID.String()).
        Str("project_name", opportunity.ProjectName).
        Msg("📡 Broadcasting council opportunity via NATS")

    // NEW: Use NATS pub/sub instead of HTTP
    if b.natsPublisher != nil {
        err := b.natsPublisher.PublishCouncilOpportunity(ctx, opportunity)
        if err != nil {
            log.Error().Err(err).Msg("Failed to publish via NATS, falling back to HTTP")
            // Fallback to HTTP (during transition period)
            return b.broadcastViaHTTP(ctx, opportunity)
        }

        log.Info().
            Str("council_id", opportunity.CouncilID.String()).
            Msg("✅ Council opportunity published to NATS")

        return nil
    }

    // Fallback: Use existing HTTP broadcast
    return b.broadcastViaHTTP(ctx, opportunity)
}

// Keep existing HTTP method as fallback
func (b *Broadcaster) broadcastViaHTTP(ctx context.Context, opportunity *CouncilOpportunity) error {
    // ... existing HTTP broadcast logic
}

Day 2: CHORUS Subscriber Implementation

Task 2.1: Create NATS subscriber module (3 hours)

New File: /home/tony/chorus/project-queues/active/CHORUS/internal/nats/subscriber.go

package nats

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "chorus/internal/council"
    "github.com/nats-io/nats.go"
    "github.com/rs/zerolog/log"
)

type CouncilSubscriber struct {
    nc         *nats.Conn
    subs       []*nats.Subscription
    councilMgr *council.Manager
    ctx        context.Context
    cancel     context.CancelFunc
}

func NewCouncilSubscriber(natsURL string, councilMgr *council.Manager) (*CouncilSubscriber, error) {
    ctx, cancel := context.WithCancel(context.Background())

    opts := []nats.Option{
        nats.Name(fmt.Sprintf("CHORUS Agent %s", councilMgr.AgentID)),
        nats.MaxReconnects(-1),
        nats.ReconnectWait(1 * time.Second),
        nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
            log.Warn().Err(err).Msg("NATS disconnected")
        }),
        nats.ReconnectHandler(func(nc *nats.Conn) {
            log.Info().Msg("NATS reconnected")
        }),
    }

    nc, err := nats.Connect(natsURL, opts...)
    if err != nil {
        cancel()
        return nil, fmt.Errorf("failed to connect to NATS: %w", err)
    }

    s := &CouncilSubscriber{
        nc:         nc,
        subs:       make([]*nats.Subscription, 0),
        councilMgr: councilMgr,
        ctx:        ctx,
        cancel:     cancel,
    }

    // Subscribe to council opportunities
    if err := s.subscribeToOpportunities(); err != nil {
        nc.Close()
        cancel()
        return nil, fmt.Errorf("failed to subscribe to opportunities: %w", err)
    }

    // Subscribe to council status updates
    if err := s.subscribeToStatusUpdates(); err != nil {
        log.Warn().Err(err).Msg("Failed to subscribe to status updates (non-critical)")
    }

    log.Info().
        Str("nats_url", natsURL).
        Str("agent_id", councilMgr.AgentID).
        Msg("✅ Subscribed to NATS council topics")

    return s, nil
}

func (s *CouncilSubscriber) subscribeToOpportunities() error {
    subject := "chorus.councils.forming"

    sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) {
        s.handleCouncilOpportunity(msg)
    })

    if err != nil {
        return fmt.Errorf("failed to subscribe to %s: %w", subject, err)
    }

    s.subs = append(s.subs, sub)

    log.Info().
        Str("subject", subject).
        Msg("📬 Subscribed to council opportunities")

    return nil
}

func (s *CouncilSubscriber) subscribeToStatusUpdates() error {
    subject := "chorus.councils.*.status"  // Wildcard subscription

    sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) {
        s.handleCouncilStatus(msg)
    })

    if err != nil {
        return fmt.Errorf("failed to subscribe to %s: %w", subject, err)
    }

    s.subs = append(s.subs, sub)

    log.Info().
        Str("subject", subject).
        Msg("📬 Subscribed to council status updates")

    return nil
}

func (s *CouncilSubscriber) handleCouncilOpportunity(msg *nats.Msg) {
    // Parse council opportunity from message
    var opp council.Opportunity
    if err := json.Unmarshal(msg.Data, &opp); err != nil {
        log.Error().
            Err(err).
            Str("subject", msg.Subject).
            Msg("Failed to unmarshal council opportunity")
        return
    }

    // Extract headers for logging
    councilID := msg.Header.Get("Council-ID")
    projectName := msg.Header.Get("Project")

    log.Info().
        Str("council_id", councilID).
        Str("project", projectName).
        Str("subject", msg.Subject).
        Int("core_roles", len(opp.CoreRoles)).
        Msg("📥 Received council opportunity from NATS")

    // Delegate to council manager (existing logic)
    ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
    defer cancel()

    if err := s.councilMgr.HandleOpportunity(ctx, &opp); err != nil {
        log.Error().
            Err(err).
            Str("council_id", councilID).
            Msg("Failed to handle council opportunity")
        return
    }

    log.Debug().
        Str("council_id", councilID).
        Msg("Successfully processed council opportunity")
}

func (s *CouncilSubscriber) handleCouncilStatus(msg *nats.Msg) {
    var status council.StatusUpdate
    if err := json.Unmarshal(msg.Data, &status); err != nil {
        log.Error().Err(err).Msg("Failed to unmarshal council status")
        return
    }

    log.Info().
        Str("council_id", status.CouncilID.String()).
        Str("status", status.Status).
        Msg("📥 Received council status update from NATS")

    // Delegate to council manager
    if err := s.councilMgr.HandleStatusUpdate(s.ctx, &status); err != nil {
        log.Error().
            Err(err).
            Str("council_id", status.CouncilID.String()).
            Msg("Failed to handle council status update")
    }
}

func (s *CouncilSubscriber) Close() error {
    s.cancel()

    // Unsubscribe all subscriptions
    for _, sub := range s.subs {
        if err := sub.Unsubscribe(); err != nil {
            log.Warn().Err(err).Msg("Failed to unsubscribe from NATS")
        }
    }

    // Close NATS connection
    if s.nc != nil && !s.nc.IsClosed() {
        s.nc.Close()
        log.Info().Msg("Closed NATS connection")
    }

    return nil
}

func (s *CouncilSubscriber) Health() error {
    if s.nc == nil {
        return fmt.Errorf("NATS connection is nil")
    }
    if s.nc.IsClosed() {
        return fmt.Errorf("NATS connection is closed")
    }
    if !s.nc.IsConnected() {
        return fmt.Errorf("NATS not connected")
    }
    return nil
}
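The wildcard subscription above relies on NATS subject matching, where `*` matches exactly one dot-separated token. To make the semantics concrete, here is a simplified re-implementation of that rule for illustration only (it covers `*` but not the `>` multi-token wildcard):

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether a concrete subject matches a pattern in
// which "*" stands for exactly one token. Real NATS also supports ">",
// which matches one or more trailing tokens; that is omitted here.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	if len(p) != len(s) {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != s[i] {
			return false
		}
	}
	return true
}

func main() {
	// Matches: the "*" consumes the council ID token.
	fmt.Println(matchSubject("chorus.councils.*.status", "chorus.councils.abc123.status")) // true
	// Does not match: token counts differ (3 vs 4).
	fmt.Println(matchSubject("chorus.councils.*.status", "chorus.councils.forming")) // false
}
```

This is why `chorus.councils.*.status` catches every council's status updates while leaving `chorus.councils.forming` to the dedicated opportunity subscription.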

Task 2.2: Integrate subscriber into CHORUS main (2 hours)

File: /home/tony/chorus/project-queues/active/CHORUS/cmd/chorus-agent/main.go

import (
    natssub "chorus/internal/nats"
)

func main() {
    // ... existing setup

    // Initialize council manager
    councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities)

    // NEW: Subscribe to NATS instead of HTTP endpoint
    natsURL := os.Getenv("NATS_URL")
    if natsURL == "" {
        natsURL = "nats://backbeat-nats:4222"
    }

    natsSubscriber, err := natssub.NewCouncilSubscriber(natsURL, councilMgr)
    if err != nil {
        log.Fatal().Err(err).Msg("Failed to initialize NATS subscriber")
    }
    defer natsSubscriber.Close()

    log.Info().
        Str("agent_id", agentID).
        Str("nats_url", natsURL).
        Msg("🎧 CHORUS agent listening for council opportunities via NATS")

    // HTTP server still runs for health checks
    httpServer := api.NewHTTPServer(cfg, node, hlog, ps)
    go func() {
        if err := httpServer.Start(); err != nil && err != http.ErrServerClosed {
            log.Error().Err(err).Msg("HTTP server error")
        }
    }()

    // ... rest of main
}

Day 3-4: Testing, Deployment, and Validation

Task 3.1: Local testing with Docker Compose (4 hours)

Create test environment: /home/tony/chorus/project-queues/active/WHOOSH/docker-compose.nats-test.yml

version: '3.8'

services:
  nats:
    image: nats:2.9-alpine
    command: ["--jetstream", "--debug"]
    ports:
      - "4222:4222"
      - "8222:8222"  # Monitoring port
    networks:
      - test-net

  whoosh:
    build: .
    ports:
      - "8080:8080"   # Expose the API so the curl test below can reach localhost:8080
    environment:
      - NATS_URL=nats://nats:4222
      - CHORUS_SERVICE_NAME=test_chorus
    depends_on:
      - nats
    networks:
      - test-net

  chorus-agent-1:
    build: ../CHORUS
    environment:
      - NATS_URL=nats://nats:4222
      - CHORUS_AGENT_ID=test-agent-1
    depends_on:
      - nats
    networks:
      - test-net

  chorus-agent-2:
    build: ../CHORUS
    environment:
      - NATS_URL=nats://nats:4222
      - CHORUS_AGENT_ID=test-agent-2
    depends_on:
      - nats
    networks:
      - test-net

networks:
  test-net:
    driver: bridge

Run tests:

cd /home/tony/chorus/project-queues/active/WHOOSH
docker-compose -f docker-compose.nats-test.yml up

# In another terminal, trigger council formation
curl -X POST http://localhost:8080/api/v1/councils \
  -H "Content-Type: application/json" \
  -d '{
    "project_name": "NATS Test",
    "repository": "https://gitea.chorus.services/tony/test",
    "core_roles": ["tpm", "developer"]
  }'

# Verify both agents received message
docker-compose -f docker-compose.nats-test.yml logs chorus-agent-1 | grep "Received council opportunity"
docker-compose -f docker-compose.nats-test.yml logs chorus-agent-2 | grep "Received council opportunity"

Task 3.2: Production deployment (4 hours)

# Build and push WHOOSH
cd /home/tony/chorus/project-queues/active/WHOOSH
docker build -t registry.home.deepblack.cloud/whoosh:v1.3.0-nats .
docker push registry.home.deepblack.cloud/whoosh:v1.3.0-nats

# Build and push CHORUS
cd /home/tony/chorus/project-queues/active/CHORUS
make build-agent
docker build -f Dockerfile.simple -t registry.home.deepblack.cloud/chorus:v0.6.0-nats .
docker push registry.home.deepblack.cloud/chorus:v0.6.0-nats

# Update docker-compose files
# Update WHOOSH_BACKBEAT-prototype/docker-compose.swarm.yml:
#   whoosh:
#     image: registry.home.deepblack.cloud/whoosh:v1.3.0-nats
#     environment:
#       - NATS_URL=nats://backbeat-nats:4222

# Update CHORUS/docker/docker-compose.yml:
#   chorus:
#     image: registry.home.deepblack.cloud/chorus:v0.6.0-nats
#     environment:
#       - NATS_URL=nats://backbeat-nats:4222

# Deploy to Swarm
docker stack deploy -c WHOOSH_BACKBEAT-prototype/docker-compose.swarm.yml WHOOSH
docker stack deploy -c CHORUS/docker/docker-compose.yml CHORUS

Task 3.3: Validation and monitoring (2 hours)

# Monitor NATS connections
docker service logs WHOOSH_backbeat-nats --tail 100

# Monitor WHOOSH publishing
docker service logs WHOOSH_whoosh | grep "Published council opportunity"

# Monitor CHORUS subscriptions
docker service logs CHORUS_chorus | grep "Received council opportunity"

# Verify council formation works end-to-end
# (trigger test council via UI or API)

Total Implementation Time: 3-4 days


7. Timeline Estimates

7.1 Phased Timeline

PHASE 1: Docker API Quick Fix (Week 1)
=========================================

Day 1 (Friday)
├─ 09:00-13:00  Implementation
│  ├─ 09:00-11:00  Code changes (discovery.go)
│  ├─ 11:00-11:30  Dependency updates (go.mod)
│  ├─ 11:30-12:00  Docker compose changes
│  └─ 12:00-13:00  Build and push images
│
└─ 14:00-17:00  Testing and Deployment
   ├─ 14:00-15:00  Local testing
   ├─ 15:00-16:00  Swarm deployment
   ├─ 16:00-17:00  Validation and monitoring
   └─ 17:00        ✅ Phase 1 Complete

PHASE 1 VALIDATION (Week 2)
===========================

Monday-Friday
├─ Daily monitoring
├─ Performance metrics collection
├─ Bug fixes (if needed)
└─ Friday: Phase 1 sign-off meeting

PHASE 2: NATS Migration (Week 3)
=================================

Monday
├─ 09:00-12:00  WHOOSH publisher implementation
└─ 13:00-17:00  WHOOSH integration and testing

Tuesday
├─ 09:00-12:00  CHORUS subscriber implementation
└─ 13:00-17:00  CHORUS integration and testing

Wednesday
├─ 09:00-12:00  Local integration testing
├─ 13:00-15:00  Build and push images
└─ 15:00-17:00  Staging deployment

Thursday
├─ 09:00-11:00  Staging validation
├─ 11:00-13:00  Production deployment
└─ 14:00-17:00  Production monitoring

Friday
├─ 09:00-12:00  Performance validation
├─ 13:00-15:00  Remove HTTP fallback code
├─ 15:00-16:00  Documentation
└─ 16:00        ✅ Phase 2 Complete

Total: 2 weeks + 3 days

7.2 Resource Allocation

| Role                      | Phase 1 (Days) | Phase 2 (Days) | Total |
|---------------------------|----------------|----------------|-------|
| Senior Software Architect | 0.5            | 1.0            | 1.5   |
| Backend Developer         | 1.0            | 3.0            | 4.0   |
| DevOps Engineer           | 0.5            | 1.0            | 1.5   |
| QA Engineer               | 0.5            | 1.0            | 1.5   |
| Total Person-Days         | 2.5            | 6.0            | 8.5   |

7.3 Critical Path Analysis

Phase 1 Critical Path:

  1. Code implementation (4 hours) - BLOCKER
  2. Docker socket mount (10 min) - BLOCKER
  3. Build/push (30 min) - BLOCKER
  4. Deployment (30 min) - BLOCKER
  5. Validation (1 hour) - BLOCKER

Total Critical Path: 6 hours (can be completed in 1 day)

Phase 2 Critical Path:

  1. WHOOSH publisher (Day 1) - BLOCKER
  2. CHORUS subscriber (Day 2) - BLOCKER for Day 3
  3. Integration testing (Day 3 AM) - BLOCKER for deployment
  4. Production deployment (Day 3 PM) - BLOCKER
  5. Validation (Day 4) - BLOCKER for sign-off

Total Critical Path: 3.5 days


8. Final Recommendation

8.1 Executive Summary

Implement the Hybrid Approach (Option 2 → Option 4)

  • Phase 1 (next 48 hours): Deploy the Docker API quick fix (Option 2)
  • Phase 2 (Week 3): Migrate to the NATS-only solution (Option 4)
  • Deferred: Full libp2p migration, until true P2P discovery is actually needed

8.2 Rationale

Why NOT Full libp2p Migration Now?

  1. Complexity vs. Value: libp2p requires 650+ LOC changes, 7-11 days implementation, and steep learning curve. Current requirements (broadcast to all agents) don't justify this complexity.

  2. Infrastructure Already Exists: NATS is already deployed (backbeat-nats service) and proven in production. Using existing infrastructure reduces risk and operational overhead.

  3. No P2P Discovery Needed Yet: Current architecture has centralized WHOOSH coordinator. True peer-to-peer discovery (mDNS/DHT) isn't required until agents need to discover each other without WHOOSH.

  4. Migration Path Preserved: NATS pub/sub can later be replaced with libp2p gossipsub with minimal changes (both use topic-based messaging).

Why Docker API First?

  1. Immediate Unblock: Solves the critical discovery problem in 1 day, unblocking E2E workflow testing.

  2. Minimal Risk: 87 LOC change, easy rollback, no architectural changes.

  3. Validation Period: Gives 1 week to validate council formation works before starting Phase 2.

  4. Dual-Mode Safety: Allows testing NATS migration while keeping Docker API as fallback.

Why NATS Long-Term?

  1. Right Complexity Level: Solves 90% of use cases (broadcast, persistence, reliability) with 20% of libp2p complexity.

  2. Production-Ready: NATS is battle-tested, well-documented, and the team has experience with it (BACKBEAT SDK).

  3. Performance: Sub-millisecond message delivery, scales to millions of messages/sec.

  4. Operational Simplicity: Centralized monitoring, simple debugging, clear failure modes.

  5. Future-Proof: When/if P2P discovery is needed, NATS can remain for messaging while adding libp2p for discovery only.

8.3 Implementation Sequence

┌─────────────────────────────────────────────────────────┐
│  WEEK 1: Emergency Unblock                              │
├─────────────────────────────────────────────────────────┤
│  Friday:  Docker API implementation (6 hours)           │
│  Result:  100% agent discovery, council formation works │
└─────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────┐
│  WEEK 2: Validation                                     │
├─────────────────────────────────────────────────────────┤
│  Mon-Fri: Monitor production, collect metrics           │
│  Result:  Confidence in Docker API solution             │
└─────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────┐
│  WEEK 3: NATS Migration                                 │
├─────────────────────────────────────────────────────────┤
│  Mon:     WHOOSH publisher implementation               │
│  Tue:     CHORUS subscriber implementation              │
│  Wed AM:  Integration testing                           │
│  Wed PM:  Production deployment (dual-mode)             │
│  Thu-Fri: Validation, remove Docker API fallback        │
│  Result:  Production NATS-based pub/sub messaging       │
└─────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────┐
│  FUTURE: libp2p (when needed)                           │
├─────────────────────────────────────────────────────────┤
│  Trigger: Need for P2P discovery, DHT storage, or       │
│           cross-datacenter mesh                         │
│  Approach: Add libp2p for discovery, keep NATS for      │
│            messaging (hybrid architecture)              │
└─────────────────────────────────────────────────────────┘

8.4 Success Metrics

Phase 1 Success Criteria:

  • All 34 CHORUS agents discovered by WHOOSH
  • Council broadcasts reach 100% of agents
  • Council formation completes in <60 seconds
  • Task execution begins successfully
  • Zero production incidents

Phase 2 Success Criteria:

  • NATS message delivery rate >99.9%
  • Message latency <100ms (p95)
  • Zero message loss
  • Successful rollback test
  • Zero production incidents during migration

8.5 Risk Mitigation Summary

| Risk                             | Mitigation                                               |
|----------------------------------|----------------------------------------------------------|
| Phase 1 deployment failure       | Easy rollback (remove docker.sock mount)                 |
| Docker API performance issues    | Monitor task list query latency, cache results           |
| Phase 2 NATS connection failures | Keep HTTP fallback during transition, NATS clustering    |
| Message delivery failures        | JetStream persistence, at-least-once delivery guarantees |
| Coordinated deployment issues    | Dual-mode operation (HTTP + NATS) during transition      |
| Team knowledge gaps              | NATS already used in BACKBEAT, extensive documentation   |

8.6 Final Architecture Vision

┌────────────────────────────────────────────────────┐
│               FINAL ARCHITECTURE                    │
│              (End of Week 3)                        │
└────────────────────────────────────────────────────┘

┌─────────────┐
│   WHOOSH    │  - NATS publisher
│             │  - Council orchestrator
└──────┬──────┘  - Docker API for health monitoring
       │
       │ Publish to NATS subject
       │ "chorus.councils.forming"
       ▼
┌──────────────────────────────────────────────────┐
│  NATS Server (JetStream)                         │
│  - Already deployed as backbeat-nats             │
│  - Clustered for HA (future)                     │
│  - Message persistence enabled                   │
│  - Monitoring via NATS CLI / Prometheus          │
└──────┬──────┬──────┬──────┬──────┬──────────────┘
       │      │      │      │      │
   ┌───▼┐  ┌──▼─┐ ┌──▼─┐ ┌─▼──┐ (... 34 total)
   │CH1 │  │CH2 │ │CH3 │ │CH4 │  CHORUS agents
   │SUB │  │SUB │ │SUB │ │SUB │  - NATS subscribers
   └────┘  └────┘ └────┘ └────┘  - HTTP for health checks
                                  - libp2p for P2P comms (future)

Benefits:
✅ 100% agent discovery
✅ Sub-100ms message delivery
✅ At-least-once delivery guarantees
✅ Message persistence and replay
✅ Scalable to 1000+ agents
✅ Familiar technology stack
✅ Simple operational model
✅ Clear upgrade path to libp2p

8.7 Action Items

Immediate (Next 24 hours):

  1. Approve Hybrid Approach recommendation
  2. Schedule Phase 1 implementation (Friday)
  3. Assign developer resources
  4. Prepare rollback procedures
  5. Set up monitoring dashboards

Week 1:

  1. Implement Docker API discovery
  2. Deploy to production Swarm
  3. Validate 100% agent discovery
  4. Verify council formation E2E
  5. Document Phase 1 results

Week 2:

  1. Monitor production stability
  2. Collect performance metrics
  3. Plan Phase 2 NATS migration
  4. Review NATS clustering options
  5. Schedule Phase 2 deployment window

Week 3:

  1. Implement NATS publisher/subscriber
  2. Integration testing
  3. Production deployment (dual-mode)
  4. Validate NATS messaging
  5. Remove Docker API fallback
  6. Document final architecture

9. Appendix

9.1 ASCII Architecture Diagram - Current vs. Future

CURRENT (Broken):

┌─────────┐                    ┌─────────────────────┐
│ WHOOSH  │────HTTP GET───────→│  Docker Swarm VIP   │
│         │    (chorus:8081)   │   10.0.13.26        │
└─────────┘                    └──────────┬──────────┘
                                          │
                                 Round-robin to random
                                          ↓
                              ┌───────────┴───────────┐
                              ▼                       ▼
                         ┌─────────┐            ┌─────────┐
                         │CHORUS #1│            │CHORUS #2│
                         │  FOUND  │            │  FOUND  │
                         └─────────┘            └─────────┘

                         ❌ 32 other agents invisible!

PHASE 1 (Docker API):

┌─────────┐                    ┌─────────────────────┐
│ WHOOSH  │────Docker API─────→│  Swarm Manager      │
│ + sock  │  ListTasks()       │  /var/run/docker.sock│
└─────────┘                    └──────────┬──────────┘
                                          │
                                Returns ALL 34 tasks
                                    with IPs
                                          ↓
                    ┌─────────────────────┴──────────────┐
                    ▼                ▼                   ▼
              ┌─────────┐      ┌─────────┐        ┌─────────┐
              │CHORUS #1│      │CHORUS #2│  ...   │CHORUS #34│
              │10.0.13.1│      │10.0.13.2│        │10.0.13.34│
              └─────────┘      └─────────┘        └─────────┘

              ✅ ALL agents discovered via direct IP!

PHASE 2 (NATS):

┌─────────┐                    ┌─────────────────────┐
│ WHOOSH  │────PUBLISH────────→│  NATS Server        │
│ + NATS  │  subject:          │  (JetStream)        │
└─────────┘  chorus.councils.* └──────────┬──────────┘
                                          │
                                    Broadcast to
                                   all subscribers
                                          │
                    ┌─────────────────────┴──────────────┐
                    ▼                ▼                   ▼
              ┌─────────┐      ┌─────────┐        ┌─────────┐
              │CHORUS #1│      │CHORUS #2│  ...   │CHORUS #34│
              │SUB:     │      │SUB:     │        │SUB:     │
              │forming  │      │forming  │        │forming  │
              └─────────┘      └─────────┘        └─────────┘

              ✅ Pub/sub messaging, all receive simultaneously!

9.2 Code File Changes Summary

Phase 1: Docker API

| File | Action | LOC | Description |
|------|--------|-----|-------------|
| WHOOSH/internal/p2p/discovery.go | MODIFY | +80 | Add discoverDockerSwarmAgents() |
| WHOOSH/go.mod | MODIFY | +2 | Add docker/docker dependency |
| WHOOSH/docker-compose.swarm.yml | MODIFY | +5 | Mount docker.sock |
| **Total** | | **+87** | |
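The docker.sock mount listed above might look like the following compose fragment (the service name `whoosh` is illustrative). The socket is mounted read-only, and the service is pinned to a manager node because Swarm API calls such as task listing must be served by a manager:

```yaml
services:
  whoosh:
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro
    deploy:
      placement:
        constraints:
          - node.role == manager   # task listing requires a manager's socket
```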

Phase 2: NATS

| File | Action | LOC | Description |
|------|--------|-----|-------------|
| WHOOSH/internal/nats/publisher.go | CREATE | +150 | NATS publisher implementation |
| WHOOSH/internal/server/server.go | MODIFY | +20 | Integrate NATS publisher |
| WHOOSH/internal/p2p/broadcaster.go | MODIFY | +30 | Use NATS for broadcasting |
| WHOOSH/go.mod | MODIFY | +2 | Add nats-io/nats.go |
| CHORUS/internal/nats/subscriber.go | CREATE | +180 | NATS subscriber implementation |
| CHORUS/cmd/chorus-agent/main.go | MODIFY | +30 | Initialize NATS subscriber |
| CHORUS/go.mod | MODIFY | +2 | Add nats-io/nats.go |
| **Total** | | **+414** | |

9.3 Environment Variables Reference

WHOOSH:

```bash
# Phase 1
WHOOSH_DISCOVERY_MODE=docker-swarm
CHORUS_SERVICE_NAME=CHORUS_chorus
CHORUS_AGENT_PORT=8080

# Phase 2
NATS_URL=nats://backbeat-nats:4222
```

CHORUS:

```bash
# Phase 2
NATS_URL=nats://backbeat-nats:4222
CHORUS_AGENT_ID=auto-generated-uuid
CHORUS_AGENT_NAME=CHORUS-Agent-{slot}
```
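The `{slot}` placeholder in `CHORUS_AGENT_NAME` has to be expanded at agent startup. A minimal sketch, with the environment lookup injected so the logic stays testable; the `TASK_SLOT` variable and `agentName` helper are illustrative, not part of CHORUS:

```go
package main

import (
	"fmt"
	"strings"
)

// agentName expands the {slot} placeholder in CHORUS_AGENT_NAME,
// falling back to sensible defaults when either variable is unset.
// getenv is injected so the function can be exercised without
// touching the real process environment.
func agentName(getenv func(string) string) string {
	name := getenv("CHORUS_AGENT_NAME")
	if name == "" {
		name = "CHORUS-Agent-{slot}"
	}
	slot := getenv("TASK_SLOT")
	if slot == "" {
		slot = "0"
	}
	return strings.ReplaceAll(name, "{slot}", slot)
}

func main() {
	env := map[string]string{
		"CHORUS_AGENT_NAME": "CHORUS-Agent-{slot}",
		"TASK_SLOT":         "7",
	}
	fmt.Println(agentName(func(k string) string { return env[k] })) // CHORUS-Agent-7
}
```

Alternatively, Docker Swarm can fill the slot number itself: `docker service create` supports Go-template placeholders such as `{{.Task.Slot}}` in `--env` values, which avoids any in-process expansion.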

9.4 Monitoring and Observability

Phase 1 Metrics:

```
# WHOOSH discovery metrics
whoosh_agent_discovery_total{method="docker_api"} 34
whoosh_agent_discovery_duration_seconds{method="docker_api"} 0.125
whoosh_broadcast_success_total{council_id="xxx"} 34
whoosh_broadcast_error_total{council_id="xxx"} 0
```

Phase 2 Metrics:

```
# NATS metrics (built-in)
nats_connections{name="WHOOSH"} 1
nats_subscriptions{subject="chorus.councils.forming"} 34
nats_messages_in{subject="chorus.councils.forming"} 156
nats_messages_out{subject="chorus.councils.forming"} 156
nats_message_latency_seconds{quantile="0.95"} 0.045

# Application metrics
whoosh_nats_publish_total{subject="chorus.councils.forming"} 156
whoosh_nats_publish_errors_total 0
chorus_nats_receive_total{subject="chorus.councils.forming"} 34
chorus_council_opportunity_handled_total 34
```

Conclusion

The Hybrid Approach (Docker API → NATS) provides the optimal balance of:

- Immediate value (1-day unblock)
- Low risk (staged rollout)
- Long-term quality (production-grade pub/sub)
- Pragmatic complexity (simpler than libp2p, sufficient for needs)

This architecture will support 100+ agents with sub-second message delivery, and it preserves a migration path to full libp2p if true P2P discovery ever becomes a requirement.

Recommended Decision: Approve Hybrid Approach and begin Phase 1 implementation immediately.


Document Version: 1.0
Last Updated: 2025-10-10
Review Date: 2025-10-17 (after Phase 1 completion)