Agent Discovery and P2P Architecture Migration Analysis
Date: 2025-10-10
Author: Claude Code (Senior Software Architect)
Status: CRITICAL ARCHITECTURE DECISION
Context: WHOOSH discovers only 2/34 CHORUS agents, blocking E2E workflow
Executive Summary
The current HTTP/DNS-based discovery architecture is fundamentally incompatible with Docker Swarm's VIP-based load balancing. This analysis evaluates three migration approaches to resolve the agent discovery crisis and provides detailed implementation guidance.
Recommendation Summary
SHORT-TERM (Next 24-48 hours): Deploy Option 2 - Docker API Quick Fix
LONG-TERM (Next 2 weeks): Migrate to Option 3 - NATS-Only Solution (simpler than full libp2p)
Rationale: Docker API provides immediate unblock with minimal risk, while NATS-based pub/sub delivers 80% of libp2p benefits with 20% of the complexity. Full libp2p migration deferred until true P2P discovery (mDNS/DHT) is required.
Table of Contents
- Current Architecture Analysis
- Complexity Assessment
- Technical Feasibility Evaluation
- Alternative Solution Comparison
- Risk Analysis Matrix
- Detailed Implementation Plans
- Timeline Estimates
- Final Recommendation
1. Current Architecture Analysis
1.1 Existing System Flow
┌──────────────────────────────────────────────────────────────┐
│ CURRENT BROKEN FLOW │
└──────────────────────────────────────────────────────────────┘
┌─────────────┐
│ WHOOSH │ Discovery Service
│ (1 node) │ - internal/p2p/discovery.go
└──────┬──────┘ - internal/p2p/broadcaster.go
│
│ 1. DNS Lookup: "chorus" service
▼
┌──────────────────┐
│ Docker Swarm │ Returns: 10.0.13.26 (VIP only)
│ DNS Service │ Expected: 34 container IPs
└────────┬─────────┘
│
│ 2. HTTP GET http://chorus:8081/health
▼
┌──────────────────┐
│ Swarm VIP LB │ Round-robins to random containers
│ 10.0.13.26 │ WHOOSH gets ~2 unique responses
└────────┬─────────┘
│
┌────┴─────┬──────────────┬────────────┐
▼ ▼ ▼ ▼ (... 34 total)
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ CHORUS │ │ CHORUS │ │ CHORUS │ │ CHORUS │
│ Agent1 │ │ Agent2 │ │ Agent3 │ │ Agent34│
└────────┘ └────────┘ └────────┘ └────────┘
✅ ✅ ❌ ❌
(Discovered) (Discovered) (Invisible) (Invisible)
Result: Only 2/34 agents discovered → Council formation fails
1.2 Root Cause Analysis
Problem: Docker Swarm DNS returns Service VIP, not individual container IPs
Evidence from codebase:
// File: /home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/discovery.go:222-235
func (d *Discovery) queryActualCHORUSService() {
client := &http.Client{Timeout: 10 * time.Second}
// ❌ BROKEN: This queries VIP, not all containers
endpoint := "http://chorus:8081/health"
resp, err := client.Get(endpoint)
// Only discovers ONE agent per request
// VIP load-balancer picks random container
// Result: ~2-5 agents found after multiple attempts
}
Why this fails:
- DNS resolves `chorus` → 10.0.13.26 (VIP only)
- No API to enumerate all 34 backend containers
- HTTP requires knowing endpoints in advance
- VIP randomization means discovery is non-deterministic
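The non-determinism is easy to quantify. If the VIP picks one of the 34 backends uniformly at random per request, the expected number of distinct agents seen after n probes is 34·(1 − (33/34)^n) — a quick stdlib sketch (illustrative only, not from the codebase):

```go
package main

import (
	"fmt"
	"math"
)

// expectedUnique returns the expected number of distinct backends seen
// after n uniformly load-balanced requests across `total` backends.
func expectedUnique(total, n int) float64 {
	t := float64(total)
	return t * (1 - math.Pow((t-1)/t, float64(n)))
}

func main() {
	for _, n := range []int{3, 10, 34, 120} {
		fmt.Printf("after %3d probes: ~%.1f of 34 agents seen\n", n, expectedUnique(34, n))
	}
}
```

After 3 probes the expectation is roughly 3 unique agents, which matches the observed 2-5; seeing all 34 with high probability would take well over a hundred probes, so HTTP polling through the VIP can never be a reliable discovery mechanism.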
2. Complexity Assessment
2.1 Option 1: Full libp2p/HMMM Migration
Components to Modify:
| Component | File Path | Changes Required | LOC Changed | Complexity |
|---|---|---|---|---|
| WHOOSH Discovery | internal/p2p/discovery.go | Replace HTTP with libp2p host init | 200+ | HIGH |
| WHOOSH Broadcaster | internal/p2p/broadcaster.go | Convert to pub/sub topics | 150+ | HIGH |
| WHOOSH Main | cmd/whoosh/main.go | Initialize libp2p node | 50+ | MEDIUM |
| CHORUS HTTP API | api/http_server.go | Add topic subscriptions | 100+ | MEDIUM |
| CHORUS Council Mgr | internal/council/manager.go | Subscribe to topics | 80+ | MEDIUM |
| CHORUS Main | cmd/chorus-agent/main.go | Initialize libp2p node | 50+ | MEDIUM |
| HMMM BACKBEAT Config | Docker compose updates | Topic configuration | 20+ | LOW |
| TOTAL | | | 650+ | HIGH |
Complexity Rating: HIGH
Rationale:
- libp2p requires understanding gossipsub protocol
- Peer discovery via mDNS/DHT adds network complexity
- Need to maintain both HTTP (health checks) and libp2p
- Breaking change requiring coordinated deployment
- Testing requires full cluster stack
2.2 Option 2: Docker API Quick Fix
Components to Modify:
| Component | File Path | Changes Required | LOC Changed | Complexity |
|---|---|---|---|---|
| WHOOSH Discovery | internal/p2p/discovery.go | Add Docker client integration | 80 | LOW |
| WHOOSH Config | docker-compose.swarm.yml | Mount docker.sock | 5 | LOW |
| Dependencies | go.mod | Add docker/docker library | 2 | LOW |
| TOTAL | | | 87 | LOW |
Complexity Rating: LOW
Rationale:
- Single file modification
- Well-documented Docker Go SDK
- No architecture changes
- Easy rollback (remove socket mount)
- No distributed systems complexity
2.3 Option 3: NATS-Only Solution
Components to Modify:
| Component | File Path | Changes Required | LOC Changed | Complexity |
|---|---|---|---|---|
| WHOOSH Publisher | internal/p2p/broadcaster.go | Replace HTTP with NATS pub | 100 | MEDIUM |
| WHOOSH Main | cmd/whoosh/main.go | Initialize NATS connection | 30 | LOW |
| CHORUS Subscriber | api/http_server.go | Subscribe to NATS topics | 80 | MEDIUM |
| CHORUS Main | cmd/chorus-agent/main.go | Initialize NATS connection | 30 | LOW |
| Dependencies | go.mod (both repos) | Add nats-io/nats.go | 4 | LOW |
| TOTAL | | | 244 | MEDIUM |
Complexity Rating: MEDIUM
Rationale:
- NATS SDK is simple (already used in BACKBEAT)
- No peer discovery needed (centralized NATS broker)
- Maintains HTTP for health checks
- Production-ready infrastructure (NATS already deployed)
- Moderate testing complexity
2.4 Option 4: Hybrid Approach
Phase 1 (Docker API): LOW complexity (87 LOC, 1-2 days)
Phase 2 (NATS Migration): MEDIUM complexity (244 LOC, 3-4 days)
Total Complexity Rating: MEDIUM (staged deployment reduces risk)
3. Technical Feasibility Evaluation
3.1 Can we use NATS directly instead of full libp2p?
YES - Strongly Recommended
Comparison:
| Feature | libp2p | NATS | Requirement |
|---|---|---|---|
| Pub/Sub Messaging | ✅ Gossipsub | ✅ JetStream | ✅ Required |
| Broadcast to all agents | ✅ Topic-based | ✅ Subject-based | ✅ Required |
| Peer Discovery | ✅ mDNS/DHT | ❌ Centralized | ⚠️ Not needed yet |
| At-least-once delivery | ✅ | ✅ JetStream | ✅ Required |
| Message persistence | ⚠️ Complex | ✅ Built-in | 🎯 Nice-to-have |
| Cluster-wide deployment | ✅ Decentralized | ✅ Centralized | ✅ Either works |
| Existing infrastructure | ❌ New | ✅ Already deployed | 🎯 Huge win |
| Learning curve | 🔴 Steep | 🟢 Gentle | 🎯 Team familiarity |
| Production maturity | 🟡 Good | 🟢 Excellent | ✅ Required |
Evidence from deployed infrastructure:
# File: /home/tony/chorus/project-queues/active/WHOOSH/BACKBEAT-prototype/docker-compose.swarm.yml:155-190
services:
nats:
image: nats:2.9-alpine
command: ["--jetstream"] # ✅ JetStream already enabled
deploy:
replicas: 1
networks:
- backbeat-net # ✅ Shared network with WHOOSH/CHORUS
BACKBEAT SDK already uses NATS:
// File: /home/tony/chorus/project-queues/active/WHOOSH/BACKBEAT-prototype/pkg/sdk/client.go:15
import (
"github.com/nats-io/nats.go" // ✅ Already in vendor
)
// Subject pattern already established
subject := fmt.Sprintf("backbeat.status.%s", c.config.ClusterID)
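NATS subjects are dot-separated token hierarchies, and subscribers may use `*` (exactly one token) and `>` (one or more trailing tokens) wildcards — this is what makes patterns like `backbeat.status.<cluster>` or `chorus.councils.<id>.forming` convenient to fan out. A minimal matcher sketching those semantics (illustrative only, not the NATS server implementation):

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete subject matches a
// subscription pattern under NATS wildcard rules:
//   *  matches exactly one token
//   >  matches one or more trailing tokens
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return len(s) > i // at least one remaining token
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("chorus.councils.*.forming", "chorus.councils.abc.forming")) // true
	fmt.Println(subjectMatches("chorus.>", "chorus.councils.forming"))                      // true
	fmt.Println(subjectMatches("chorus.councils.forming", "chorus.councils"))               // false
}
```

A subscriber on `chorus.councils.>` would therefore see every council-related message without per-council subscriptions, which matters for the per-council subjects introduced later in this document.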
Recommendation: Use NATS for 90% of use cases. Defer libp2p until you need:
- True P2P discovery (no centralized broker)
- Cross-datacenter mesh networking
- DHT-based content routing
3.2 What's the minimal viable libp2p implementation?
If you must use libp2p (not recommended for MVP):
// Minimal libp2p setup for pub/sub only
package main
import (
"context"
"encoding/json"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
)
func InitMinimalLibp2p(ctx context.Context) (*pubsub.PubSub, error) {
// 1. Create libp2p host
host, err := libp2p.New(
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/9000"),
)
if err != nil {
return nil, err
}
// 2. Create gossipsub instance
ps, err := pubsub.NewGossipSub(ctx, host)
if err != nil {
return nil, err
}
// 3. Bootstrap to peers (multiaddrs must include the peer ID)
bootstrapPeers := []string{
"/ip4/192.168.1.27/tcp/9000/p2p/QmYourPeerID",
}
for _, peerAddr := range bootstrapPeers {
addr, _ := multiaddr.NewMultiaddr(peerAddr)
// host.Connect takes a peer.AddrInfo, not a raw multiaddr
info, _ := peer.AddrInfoFromP2pAddr(addr)
host.Connect(ctx, *info)
}
return ps, nil
}
// Usage in WHOOSH
func BroadcastCouncilOpportunity(ps *pubsub.PubSub, opp *CouncilOpportunity) error {
topic, _ := ps.Join("councils.forming")
data, _ := json.Marshal(opp)
return topic.Publish(context.Background(), data)
}
// Usage in CHORUS
func SubscribeToOpportunities(ps *pubsub.PubSub) {
topic, _ := ps.Join("councils.forming")
sub, _ := topic.Subscribe()
for {
msg, _ := sub.Next(context.Background())
var opp CouncilOpportunity
json.Unmarshal(msg.Data, &opp)
handleOpportunity(opp)
}
}
Minimal dependencies:
go get github.com/libp2p/go-libp2p@latest
go get github.com/libp2p/go-libp2p-pubsub@latest
Estimated complexity: 300 LOC (still 3x more than NATS)
3.3 Are there Go libraries/examples we can leverage?
YES - Multiple options
Option A: NATS Go Client (Recommended)
# Already in BACKBEAT SDK vendor directory
/home/tony/chorus/project-queues/active/WHOOSH/vendor/github.com/nats-io/nats.go/
Example from BACKBEAT SDK:
// File: internal/backbeat/integration.go (hypothetical WHOOSH usage)
import "github.com/nats-io/nats.go"
type WHOOSHPublisher struct {
nc *nats.Conn
}
func NewWHOOSHPublisher(natsURL string) (*WHOOSHPublisher, error) {
nc, err := nats.Connect(natsURL)
if err != nil {
return nil, err
}
return &WHOOSHPublisher{nc: nc}, nil
}
func (w *WHOOSHPublisher) BroadcastCouncilOpportunity(opp *CouncilOpportunity) error {
data, _ := json.Marshal(opp)
// Publish to all CHORUS agents subscribed to this subject
return w.nc.Publish("chorus.councils.forming", data)
}
Option B: Docker Go SDK
# For Docker API quick fix
go get github.com/docker/docker@latest
Example:
// File: internal/p2p/discovery_swarm.go (new file)
import (
"context"
"fmt"
"strings"
"github.com/docker/docker/client"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
)
func (d *Discovery) DiscoverSwarmAgents() ([]*Agent, error) {
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return nil, err
}
defer cli.Close()
// List all tasks for CHORUS service
tasks, err := cli.TaskList(context.Background(), types.TaskListOptions{
Filters: filters.NewArgs(
filters.Arg("service", "CHORUS_chorus"),
filters.Arg("desired-state", "running"),
),
})
if err != nil {
return nil, err
}
agents := []*Agent{}
for _, task := range tasks {
// Extract container IP from task
for _, attachment := range task.NetworksAttachments {
if len(attachment.Addresses) > 0 {
ip := strings.Split(attachment.Addresses[0], "/")[0]
endpoint := fmt.Sprintf("http://%s:8080", ip)
agents = append(agents, &Agent{
ID: task.ID[:12],
Endpoint: endpoint,
Status: "online",
// ... populate other fields
})
}
}
}
return agents, nil
}
Option C: libp2p Examples
# Official examples repository
git clone https://github.com/libp2p/go-libp2p-examples.git
# Relevant examples:
# - pubsub-example/
# - echo/
# - chat-with-rendezvous/
3.4 What about backwards compatibility during migration?
Strategy: Dual-mode operation during transition
// File: internal/p2p/broadcaster.go
type Broadcaster struct {
discovery *Discovery
natsConn *nats.Conn // New: NATS connection
ctx context.Context
cancel context.CancelFunc
// Feature flag for gradual rollout
useNATS bool
}
func (b *Broadcaster) BroadcastCouncilOpportunity(ctx context.Context, opp *CouncilOpportunity) error {
if b.useNATS && b.natsConn != nil {
// New path: NATS pub/sub
return b.broadcastViaNATS(ctx, opp)
}
// Legacy path: HTTP POST (existing code)
return b.broadcastViaHTTP(ctx, opp)
}
func (b *Broadcaster) broadcastViaNATS(ctx context.Context, opp *CouncilOpportunity) error {
data, err := json.Marshal(opp)
if err != nil {
return err
}
subject := fmt.Sprintf("chorus.councils.%s.forming", opp.CouncilID)
return b.natsConn.Publish(subject, data)
}
func (b *Broadcaster) broadcastViaHTTP(ctx context.Context, opp *CouncilOpportunity) error {
// Existing HTTP broadcast logic (unchanged)
agents := b.discovery.GetAgents()
// ... existing code
}
Deployment strategy:
- Week 1: Deploy NATS code with `useNATS=false` (no behavior change)
- Week 2: Enable NATS for 10% of broadcasts (canary testing)
- Week 3: Enable NATS for 100% of broadcasts
- Week 4: Remove HTTP broadcast code entirely
Rollback: Set useNATS=false via environment variable
4. Alternative Solution Comparison
4.1 Option 1: Full libp2p/HMMM Migration
Architecture:
┌─────────────┐
│ WHOOSH │
│ (libp2p) │ - Initialize libp2p host
└──────┬──────┘ - Join "councils.forming" topic
│ - Publish council opportunities
│
│ Publishes to libp2p gossipsub topic
▼
┌──────────────────────────────────────────┐
│ libp2p Gossipsub Network │
│ - Peer discovery via mDNS │
│ - DHT routing (optional) │
│ - Message propagation to all subscribers│
└──────┬─────┬─────┬─────┬─────┬──────────┘
│ │ │ │ │
┌───▼┐ ┌──▼─┐ ┌─▼──┐ ┌▼──┐ (... 34 total)
│CH1 │ │CH2 │ │CH3 │ │CH4│ All CHORUS agents
└────┘ └────┘ └────┘ └───┘ subscribe to topic
✅ ALL 34 agents receive message simultaneously
Pros:
- ✅ True P2P architecture (no central broker)
- ✅ Automatic peer discovery via mDNS/DHT
- ✅ Built-in encryption and authentication
- ✅ Message deduplication
- ✅ Resilient to network partitions
- ✅ Unlocks future P2P features (DHT storage, streams)
Cons:
- ❌ High complexity (650+ LOC changes)
- ❌ Steep learning curve
- ❌ Requires libp2p expertise
- ❌ Breaking change (coordinated deployment)
- ❌ Complex testing requirements
- ❌ Overkill for current needs
Implementation time: 7-11 days
When to use: When you need true P2P discovery without central infrastructure
4.2 Option 2: Docker API Quick Fix
Architecture:
┌─────────────┐
│ WHOOSH │
│ (Docker API)│ - Mount /var/run/docker.sock
└──────┬──────┘ - Query Swarm API for tasks
│
│ GET /tasks?service=CHORUS_chorus
▼
┌──────────────────┐
│ Docker Engine │ Returns: List of all 34 tasks
│ (Swarm Manager) │ with container IPs
└────────┬─────────┘
│
┌────┴─────┬──────────────┬────────────┐
▼ ▼ ▼ ▼ (... 34 total)
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐
│10.0.13.1│ │10.0.13.2│ │10.0.13.3│ │10.0.13.34│
│CHORUS 1 │ │CHORUS 2 │ │CHORUS 3 │ │CHORUS 34 │
└─────────┘ └─────────┘ └─────────┘ └──────────┘
WHOOSH makes HTTP POST to all 34 container IPs
✅ 100% discovery rate
Code changes:
// File: internal/p2p/discovery.go (add new method)
import (
"github.com/docker/docker/client"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
)
func (d *Discovery) discoverDockerSwarmAgents() {
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
log.Error().Err(err).Msg("Failed to create Docker client")
return
}
defer cli.Close()
// Query Swarm for all CHORUS tasks
tasks, err := cli.TaskList(context.Background(), types.TaskListOptions{
Filters: filters.NewArgs(
filters.Arg("service", "CHORUS_chorus"),
filters.Arg("desired-state", "running"),
),
})
if err != nil {
log.Error().Err(err).Msg("Failed to list Swarm tasks")
return
}
log.Info().Int("task_count", len(tasks)).Msg("Discovered Swarm tasks")
for _, task := range tasks {
if len(task.NetworksAttachments) == 0 {
continue
}
for _, attachment := range task.NetworksAttachments {
if len(attachment.Addresses) == 0 {
continue
}
// Extract IP from CIDR (e.g., "10.0.13.5/24" -> "10.0.13.5")
ip := strings.Split(attachment.Addresses[0], "/")[0]
endpoint := fmt.Sprintf("http://%s:8080", ip)
agent := &Agent{
ID: task.ID[:12], // Use first 12 chars of task ID
Name: fmt.Sprintf("CHORUS-%d", task.Slot), // Slot is an int, so %d
Endpoint: endpoint,
Status: "online",
Capabilities: []string{
"general_development",
"task_coordination",
},
Model: "llama3.1:8b",
LastSeen: time.Now(),
P2PAddr: fmt.Sprintf("%s:9000", ip),
ClusterID: "docker-unified-stack",
}
d.addOrUpdateAgent(agent)
log.Debug().
Str("task_id", task.ID[:12]).
Str("ip", ip).
Str("endpoint", endpoint).
Msg("Discovered CHORUS agent via Docker API")
}
}
}
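The `strings.Split` CIDR handling above works for well-formed addresses but silently accepts garbage. A slightly more defensive parse using the standard `net` package is sketched below (the `taskIP` helper name is ours, not from the codebase):

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// taskIP extracts the container IP from a Swarm task address, which may
// arrive either in CIDR form ("10.0.13.5/24") or as a bare IP.
func taskIP(addr string) (string, error) {
	if strings.Contains(addr, "/") {
		ip, _, err := net.ParseCIDR(addr)
		if err != nil {
			return "", fmt.Errorf("invalid task address %q: %w", addr, err)
		}
		return ip.String(), nil
	}
	if ip := net.ParseIP(addr); ip != nil {
		return ip.String(), nil
	}
	return "", fmt.Errorf("invalid task address %q", addr)
}

func main() {
	ip, err := taskIP("10.0.13.5/24")
	fmt.Println(ip, err) // 10.0.13.5 <nil>
}
```

Returning an error instead of an empty string keeps a single malformed task attachment from producing a bogus `http://:8080` endpoint in the agent registry.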
Deployment changes:
# File: docker/docker-compose.swarm.yml (WHOOSH service)
services:
whoosh:
image: anthonyrawlins/whoosh:latest
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro # Mount Docker socket
environment:
- WHOOSH_DISCOVERY_MODE=docker-swarm
Pros:
- ✅ Minimal code changes (87 LOC)
- ✅ Immediate solution (1-2 days)
- ✅ 100% discovery rate
- ✅ Easy to test and validate
- ✅ Simple rollback (remove socket mount)
- ✅ No distributed systems complexity
Cons:
- ⚠️ Requires privileged Docker socket access
- ⚠️ Couples WHOOSH to Docker Swarm
- ⚠️ Doesn't enable pub/sub messaging
- ⚠️ Still uses HTTP (no performance benefits)
- ⚠️ Not portable to Kubernetes/other orchestrators
Implementation time: 1-2 days
When to use: Emergency unblock, temporary fix while planning proper migration
4.3 Option 3: NATS-Only Solution (Recommended Long-term)
Architecture:
┌─────────────┐
│ WHOOSH │
│ (NATS client)│ - Connect to NATS broker
└──────┬──────┘ - Publish to subject
│
│ PUBLISH chorus.councils.forming {...}
▼
┌──────────────────────────────────────────┐
│ NATS Server (JetStream) │
│ - Already deployed as backbeat-nats │
│ - Message persistence │
│ - At-least-once delivery │
└──────┬─────┬─────┬─────┬─────┬──────────┘
│ │ │ │ │
┌───▼┐ ┌──▼─┐ ┌─▼──┐ ┌▼──┐ (... 34 total)
│CH1 │ │CH2 │ │CH3 │ │CH4│ All CHORUS agents
│SUB │ │SUB │ │SUB │ │SUB│ subscribe on startup
└────┘ └────┘ └────┘ └───┘
✅ ALL 34 agents receive message via subscription
Code changes - WHOOSH:
// File: internal/nats/publisher.go (new file)
package nats
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
)
type CouncilPublisher struct {
nc *nats.Conn
}
func NewCouncilPublisher(natsURL string) (*CouncilPublisher, error) {
nc, err := nats.Connect(natsURL,
nats.MaxReconnects(-1), // Infinite reconnects
nats.ReconnectWait(1*time.Second),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
}
log.Info().Str("nats_url", natsURL).Msg("Connected to NATS for council publishing")
return &CouncilPublisher{nc: nc}, nil
}
func (p *CouncilPublisher) PublishCouncilOpportunity(ctx context.Context, opp *CouncilOpportunity) error {
data, err := json.Marshal(opp)
if err != nil {
return fmt.Errorf("failed to marshal opportunity: %w", err)
}
subject := "chorus.councils.forming"
// Publish with headers for tracing
msg := &nats.Msg{
Subject: subject,
Data: data,
Header: nats.Header{
"Council-ID": []string{opp.CouncilID.String()},
"Project": []string{opp.ProjectName},
},
}
if err := p.nc.PublishMsg(msg); err != nil {
return fmt.Errorf("failed to publish to NATS: %w", err)
}
log.Info().
Str("council_id", opp.CouncilID.String()).
Str("subject", subject).
Msg("Published council opportunity to NATS")
return nil
}
func (p *CouncilPublisher) Close() error {
if p.nc != nil {
p.nc.Close()
}
return nil
}
Code changes - CHORUS:
// File: internal/nats/subscriber.go (new file)
package nats
import (
"context"
"encoding/json"
"fmt"
"time"
"chorus/internal/council"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
)
type CouncilSubscriber struct {
nc *nats.Conn
sub *nats.Subscription
councilMgr *council.Manager
}
func NewCouncilSubscriber(natsURL string, councilMgr *council.Manager) (*CouncilSubscriber, error) {
nc, err := nats.Connect(natsURL,
nats.MaxReconnects(-1),
nats.ReconnectWait(1*time.Second),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
}
s := &CouncilSubscriber{
nc: nc,
councilMgr: councilMgr,
}
// Subscribe to council opportunities
sub, err := nc.Subscribe("chorus.councils.forming", s.handleCouncilOpportunity)
if err != nil {
nc.Close()
return nil, fmt.Errorf("failed to subscribe to council opportunities: %w", err)
}
s.sub = sub
log.Info().
Str("nats_url", natsURL).
Str("subject", "chorus.councils.forming").
Msg("Subscribed to council opportunities on NATS")
return s, nil
}
func (s *CouncilSubscriber) handleCouncilOpportunity(msg *nats.Msg) {
var opp council.Opportunity
if err := json.Unmarshal(msg.Data, &opp); err != nil {
log.Error().Err(err).Msg("Failed to unmarshal council opportunity")
return
}
log.Info().
Str("council_id", opp.CouncilID.String()).
Str("project", opp.ProjectName).
Msg("Received council opportunity from NATS")
// Delegate to council manager (existing logic)
if err := s.councilMgr.HandleOpportunity(context.Background(), &opp); err != nil {
log.Error().
Err(err).
Str("council_id", opp.CouncilID.String()).
Msg("Failed to handle council opportunity")
}
}
func (s *CouncilSubscriber) Close() error {
if s.sub != nil {
s.sub.Unsubscribe()
}
if s.nc != nil {
s.nc.Close()
}
return nil
}
Integration in CHORUS main:
// File: cmd/chorus-agent/main.go
func main() {
// ... existing setup
// Initialize council manager
councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities)
// NEW: Subscribe to NATS instead of HTTP endpoint
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = "nats://backbeat-nats:4222"
}
natsSubscriber, err := nats.NewCouncilSubscriber(natsURL, councilMgr)
if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize NATS subscriber")
}
defer natsSubscriber.Close()
// HTTP server still runs for health checks (optional)
if enableHTTP {
httpServer := api.NewHTTPServer(cfg, node, hlog, ps)
go httpServer.Start()
}
// ... rest of main
}
Pros:
- ✅ Simpler than libp2p (244 vs 650 LOC)
- ✅ Production-ready (NATS battle-tested)
- ✅ Already deployed (backbeat-nats running)
- ✅ 100% broadcast delivery
- ✅ Message persistence (JetStream)
- ✅ Familiar SDK (used in BACKBEAT)
- ✅ Easy testing (local NATS for dev)
- ✅ Portable (works with any orchestrator)
Cons:
- ⚠️ Centralized (NATS is single point of failure)
- ⚠️ No automatic peer discovery (not needed)
- ⚠️ Requires NATS infrastructure (already have it)
Implementation time: 3-4 days
When to use: Production-grade pub/sub without P2P complexity
4.4 Comparison Matrix
| Criteria | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|---|---|---|---|---|
| Discovery Success | 100% (gossipsub) | 100% (API query) | 100% (pub/sub) | 100% (both phases) |
| Implementation Time | 7-11 days | 1-2 days | 3-4 days | 2 days + 3-4 days |
| Code Complexity | HIGH (650 LOC) | LOW (87 LOC) | MEDIUM (244 LOC) | LOW → MEDIUM |
| Testing Complexity | HIGH | LOW | MEDIUM | LOW → MEDIUM |
| Production Risk | HIGH | LOW | MEDIUM | LOW (staged) |
| Scalability | Excellent | Good | Excellent | Excellent |
| Infrastructure Deps | None (P2P) | Docker Swarm | NATS (have it) | Both |
| Learning Curve | Steep | Gentle | Gentle | Gentle |
| Backwards Compat | Breaking change | Non-breaking | Breaking change | Dual-mode |
| Rollback Ease | Difficult | Easy | Moderate | Easy (per phase) |
| Future-Proofing | Best (P2P ready) | Poor (Docker-only) | Good | Good |
| Message Persistence | Complex | N/A | Built-in | N/A → Built-in |
| Operational Overhead | High | Low | Low | Low |
| Deployment Coupling | Tight | Tight | Moderate | Moderate |
Scoring (1-10, higher is better):
| Option | Immediate Value | Long-term Value | Risk Level | Total Score |
|---|---|---|---|---|
| Option 1: libp2p | 6/10 | 10/10 | 4/10 | 20/30 |
| Option 2: Docker API | 10/10 | 4/10 | 9/10 | 23/30 |
| Option 3: NATS | 8/10 | 9/10 | 7/10 | 24/30 |
| Option 4: Hybrid | 10/10 | 9/10 | 8/10 | 27/30 ⭐ |
5. Risk Analysis Matrix
5.1 Technical Risks
| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|---|---|---|---|---|
| Peer discovery failure | LOW (mDNS/DHT) | N/A | N/A | N/A |
| Message delivery failure | LOW (retry logic) | MEDIUM (HTTP timeout) | LOW (NATS ack) | LOW → LOW |
| Network partition | LOW (eventual consistency) | HIGH (no retries) | MEDIUM (NATS reconnect) | HIGH → MEDIUM |
| Scale bottleneck | LOW (P2P) | MEDIUM (VIP load) | LOW (NATS scales) | MEDIUM → LOW |
| Implementation bugs | HIGH (complex) | LOW (simple) | MEDIUM | LOW → MEDIUM |
| Integration issues | HIGH (new system) | LOW (existing stack) | MEDIUM (new client) | LOW → MEDIUM |
| Performance degradation | LOW (efficient) | MEDIUM (HTTP overhead) | LOW (fast broker) | MEDIUM → LOW |
5.2 Deployment Risks
| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|---|---|---|---|---|
| Coordinated deployment | CRITICAL | LOW | HIGH | LOW → HIGH |
| Rollback complexity | HIGH (breaking) | LOW (remove mount) | MEDIUM (revert code) | LOW → MEDIUM |
| Downtime required | YES (breaking) | NO | YES (breaking) | NO → YES |
| Service interruption | HIGH (full restart) | LOW (WHOOSH only) | MEDIUM (both services) | LOW → MEDIUM |
| Config drift | HIGH (new settings) | LOW (one env var) | MEDIUM (NATS URL) | LOW → MEDIUM |
| Version compatibility | CRITICAL | N/A | MEDIUM (NATS version) | N/A → MEDIUM |
5.3 Operational Risks
| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|---|---|---|---|---|
| Monitoring gaps | HIGH (new metrics) | LOW (existing) | MEDIUM (NATS metrics) | LOW → MEDIUM |
| Debugging difficulty | HIGH (distributed) | LOW (centralized) | MEDIUM (broker logs) | LOW → MEDIUM |
| On-call burden | HIGH (complex) | LOW (simple) | MEDIUM | LOW → MEDIUM |
| Documentation debt | HIGH (new system) | LOW (minimal change) | MEDIUM (NATS docs) | LOW → MEDIUM |
| Team knowledge | LOW (unfamiliar) | HIGH (Docker known) | MEDIUM (NATS used) | HIGH → MEDIUM |
| Vendor lock-in | LOW (open source) | HIGH (Docker Swarm) | MEDIUM (NATS) | HIGH → MEDIUM |
5.4 Mitigation Strategies
Option 1: libp2p
- Risk: High implementation complexity
- Mitigation:
- Hire libp2p consultant for architecture review
- Build prototype in isolated environment first
- Extensive integration testing before production
- Feature flagging for gradual rollout
Option 2: Docker API
- Risk: Docker Swarm vendor lock-in
- Mitigation:
- Document as temporary solution
- Plan NATS migration immediately after
- Abstract discovery interface for future portability
- Maintain HTTP broadcast as fallback
Option 3: NATS
- Risk: NATS broker single point of failure
- Mitigation:
- Deploy NATS in clustered mode (3 replicas)
- Enable JetStream for message persistence
- Monitor NATS health and set up alerts
- Document NATS recovery procedures
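A clustered deployment for the first mitigation might look like the compose fragment below (service names, ports, and the cluster name are assumptions, not the deployed BACKBEAT config):

```yaml
services:
  nats-1:
    image: nats:2.9-alpine
    command: ["--jetstream",
              "--cluster_name", "chorus",
              "--cluster", "nats://0.0.0.0:6222",
              "--routes", "nats://nats-2:6222,nats://nats-3:6222"]
  # nats-2 and nats-3 mirror nats-1, with --routes pointing at their peers.
  # Clients then connect with a multi-server URL so any surviving node works:
  #   nats://nats-1:4222,nats://nats-2:4222,nats://nats-3:4222
```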
Option 4: Hybrid (Recommended)
- Risk: Phase 2 migration complexity
- Mitigation:
- Validate Phase 1 thoroughly in production (1 week)
- Implement dual-mode operation (HTTP + NATS) during Phase 2
- Use feature flags for gradual NATS rollout
- Maintain HTTP as fallback during transition
6. Detailed Implementation Plans
6.1 Option 2: Docker API Quick Fix (RECOMMENDED SHORT-TERM)
Timeline: 1-2 days
Step 1: Code Changes (4 hours)
File: /home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/discovery.go
// Add import
import (
"github.com/docker/docker/client"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
)
// Add new method after discoverRealCHORUSAgents()
func (d *Discovery) discoverDockerSwarmAgents() {
if !d.config.DockerEnabled {
return
}
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
log.Debug().Err(err).Msg("Docker client not available (running outside Swarm?)")
return
}
defer cli.Close()
ctx := context.Background()
// List all tasks for CHORUS service
serviceName := os.Getenv("CHORUS_SERVICE_NAME")
if serviceName == "" {
serviceName = "CHORUS_chorus" // Default stack name
}
tasks, err := cli.TaskList(ctx, types.TaskListOptions{
Filters: filters.NewArgs(
filters.Arg("service", serviceName),
filters.Arg("desired-state", "running"),
),
})
if err != nil {
log.Error().Err(err).Msg("Failed to list Docker Swarm tasks")
return
}
log.Info().
Int("task_count", len(tasks)).
Str("service", serviceName).
Msg("🐳 Discovered Docker Swarm tasks")
discoveredCount := 0
for _, task := range tasks {
// Skip tasks without network attachments
if len(task.NetworksAttachments) == 0 {
log.Debug().Str("task_id", task.ID[:12]).Msg("Task has no network attachments")
continue
}
// Extract IP from first network attachment
for _, attachment := range task.NetworksAttachments {
if len(attachment.Addresses) == 0 {
continue
}
// Parse IP from CIDR notation (e.g., "10.0.13.5/24")
ipCIDR := attachment.Addresses[0]
ip := strings.Split(ipCIDR, "/")[0]
// Construct agent endpoint
port := os.Getenv("CHORUS_AGENT_PORT")
if port == "" {
port = "8080"
}
endpoint := fmt.Sprintf("http://%s:%s", ip, port)
// Create agent entry
agent := &Agent{
ID: task.ID[:12], // Short task ID
Name: fmt.Sprintf("CHORUS-Task-%d", task.Slot),
Endpoint: endpoint,
Status: "online",
Capabilities: []string{
"general_development",
"task_coordination",
"ai_integration",
},
Model: "llama3.1:8b",
LastSeen: time.Now(),
TasksCompleted: 0,
P2PAddr: fmt.Sprintf("%s:9000", ip),
ClusterID: "docker-unified-stack",
}
d.addOrUpdateAgent(agent)
discoveredCount++
log.Debug().
Str("task_id", task.ID[:12]).
Int("slot", int(task.Slot)).
Str("ip", ip).
Str("endpoint", endpoint).
Msg("🤖 Discovered CHORUS agent via Docker API")
break // Only use first network attachment
}
}
log.Info().
Int("discovered", discoveredCount).
Int("total_tasks", len(tasks)).
Msg("✅ Docker Swarm agent discovery completed")
}
Update discoverRealCHORUSAgents() to call new method:
func (d *Discovery) discoverRealCHORUSAgents() {
log.Debug().Msg("🔍 Discovering real CHORUS agents via health endpoints")
// Priority 1: Docker Swarm API (most accurate)
d.discoverDockerSwarmAgents()
// Priority 2: Known service endpoints (fallback)
d.queryActualCHORUSService()
d.discoverKnownEndpoints()
}
Step 2: Dependency Update (5 minutes)
File: /home/tony/chorus/project-queues/active/WHOOSH/go.mod
cd /home/tony/chorus/project-queues/active/WHOOSH
go get github.com/docker/docker@latest
go mod tidy
Step 3: Configuration Changes (10 minutes)
File: /home/tony/chorus/project-queues/active/WHOOSH/docker/docker-compose.swarm.yml
services:
whoosh:
image: anthonyrawlins/whoosh:latest
# NEW: Mount Docker socket for Swarm API access
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
# NEW: Environment variables for Docker discovery
environment:
- WHOOSH_DISCOVERY_MODE=docker-swarm
- CHORUS_SERVICE_NAME=CHORUS_chorus # Match your stack name
- CHORUS_AGENT_PORT=8080
# IMPORTANT: Security note - Docker socket access is read-only
# This allows querying tasks but not modifying them
Step 4: Build and Deploy (30 minutes)
# Build new WHOOSH image
cd /home/tony/chorus/project-queues/active/WHOOSH
docker build -t registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery .
docker push registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery
# Update compose file to use new image
# Edit docker/docker-compose.swarm.yml:
# whoosh:
# image: registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery
# Redeploy to Swarm
docker stack deploy -c docker/docker-compose.swarm.yml WHOOSH
Step 5: Validation (1 hour)
# 1. Check WHOOSH logs for discovery
docker service logs -f WHOOSH_whoosh --tail 100
# Expected output:
# {"level":"info","task_count":34,"service":"CHORUS_chorus","message":"🐳 Discovered Docker Swarm tasks"}
# {"level":"info","discovered":34,"total_tasks":34,"message":"✅ Docker Swarm agent discovery completed"}
# 2. Check agent registry in database
docker exec -it $(docker ps -q -f name=WHOOSH_whoosh) psql -U whoosh -d whoosh -c \
"SELECT COUNT(*) FROM agents WHERE status = 'available';"
# Expected: 34 rows
# 3. Trigger council formation
curl -X POST http://whoosh.chorus.services/api/v1/councils \
-H "Content-Type: application/json" \
-d '{
"project_name": "Test Discovery",
"repository": "https://gitea.chorus.services/tony/test",
"core_roles": ["tpm", "senior-software-architect"]
}'
# 4. Verify all agents received broadcast
docker service logs CHORUS_chorus --tail 1000 | grep "Received council opportunity"
# Expected: 34 log entries (one per agent)
Step 6: Monitoring and Rollback Plan (30 minutes)
Success Criteria:
- ✅ All 34 CHORUS agents discovered
- ✅ Council broadcasts reach all agents
- ✅ Council formation completes successfully
- ✅ No performance degradation
Rollback Procedure (if issues occur):
# 1. Remove Docker socket mount from compose file
# 2. Redeploy with previous image
docker stack deploy -c docker/docker-compose.swarm.yml.backup WHOOSH
# 3. Verify rollback
docker service logs WHOOSH_whoosh
Total Implementation Time: 6-8 hours
6.2 Option 3: NATS-Only Solution (RECOMMENDED LONG-TERM)
Timeline: 3-4 days
Day 1: WHOOSH Publisher Implementation
Task 1.1: Create NATS publisher module (3 hours)
New File: /home/tony/chorus/project-queues/active/WHOOSH/internal/nats/publisher.go
package nats
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/chorus-services/whoosh/internal/p2p"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
)
type CouncilPublisher struct {
nc *nats.Conn
}
func NewCouncilPublisher(natsURL string) (*CouncilPublisher, error) {
opts := []nats.Option{
nats.Name("WHOOSH Council Publisher"),
nats.MaxReconnects(-1), // Infinite reconnects
nats.ReconnectWait(1 * time.Second),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Warn().Err(err).Msg("NATS disconnected")
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
log.Info().Str("url", nc.ConnectedUrl()).Msg("NATS reconnected")
}),
}
nc, err := nats.Connect(natsURL, opts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to NATS at %s: %w", natsURL, err)
}
log.Info().
Str("nats_url", natsURL).
Str("server_url", nc.ConnectedUrl()).
Msg("✅ Connected to NATS for council publishing")
return &CouncilPublisher{nc: nc}, nil
}
func (p *CouncilPublisher) PublishCouncilOpportunity(ctx context.Context, opp *p2p.CouncilOpportunity) error {
// Marshal opportunity to JSON
data, err := json.Marshal(opp)
if err != nil {
return fmt.Errorf("failed to marshal council opportunity: %w", err)
}
// Construct subject (NATS topic)
subject := "chorus.councils.forming"
// Create message with headers for tracing
msg := &nats.Msg{
Subject: subject,
Data: data,
Header: nats.Header{
"Council-ID": []string{opp.CouncilID.String()},
"Project": []string{opp.ProjectName},
"Repository": []string{opp.Repository},
"UCXL-Address": []string{opp.UCXLAddress},
"Published-At": []string{time.Now().Format(time.RFC3339)},
},
}
// Publish to NATS
if err := p.nc.PublishMsg(msg); err != nil {
return fmt.Errorf("failed to publish to NATS subject %s: %w", subject, err)
}
// Flush to ensure delivery
if err := p.nc.FlushTimeout(5 * time.Second); err != nil {
log.Warn().Err(err).Msg("NATS flush timed out; delivery not confirmed")
}
log.Info().
Str("council_id", opp.CouncilID.String()).
Str("project", opp.ProjectName).
Str("subject", subject).
Int("core_roles", len(opp.CoreRoles)).
Int("optional_roles", len(opp.OptionalRoles)).
Msg("📡 Published council opportunity to NATS")
return nil
}
func (p *CouncilPublisher) PublishCouncilStatus(ctx context.Context, update *p2p.CouncilStatusUpdate) error {
data, err := json.Marshal(update)
if err != nil {
return fmt.Errorf("failed to marshal council status: %w", err)
}
subject := fmt.Sprintf("chorus.councils.%s.status", update.CouncilID.String())
msg := &nats.Msg{
Subject: subject,
Data: data,
Header: nats.Header{
"Council-ID": []string{update.CouncilID.String()},
"Status": []string{update.Status},
},
}
if err := p.nc.PublishMsg(msg); err != nil {
return fmt.Errorf("failed to publish council status: %w", err)
}
log.Info().
Str("council_id", update.CouncilID.String()).
Str("status", update.Status).
Str("subject", subject).
Msg("📢 Published council status update to NATS")
return nil
}
func (p *CouncilPublisher) Close() error {
if p.nc != nil && !p.nc.IsClosed() {
p.nc.Close()
log.Info().Msg("Closed NATS connection")
}
return nil
}
func (p *CouncilPublisher) Health() error {
if p.nc == nil {
return fmt.Errorf("NATS connection is nil")
}
if p.nc.IsClosed() {
return fmt.Errorf("NATS connection is closed")
}
if !p.nc.IsConnected() {
return fmt.Errorf("NATS connection is not connected")
}
return nil
}
Task 1.2: Integrate publisher into WHOOSH server (2 hours)
File: /home/tony/chorus/project-queues/active/WHOOSH/internal/server/server.go
import (
	"os"

	natspub "github.com/chorus-services/whoosh/internal/nats"
)
type Server struct {
// ... existing fields
natsPublisher *natspub.CouncilPublisher // NEW
}
func NewServer(cfg *config.Config, db *pgxpool.Pool) *Server {
// ... existing setup
// NEW: Initialize NATS publisher
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = "nats://backbeat-nats:4222" // Default to BACKBEAT NATS
}
natsPublisher, err := natspub.NewCouncilPublisher(natsURL)
if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize NATS publisher")
}
return &Server{
// ... existing fields
natsPublisher: natsPublisher,
}
}
func (s *Server) Stop() error {
// ... existing shutdown
// NEW: Close NATS connection
if s.natsPublisher != nil {
s.natsPublisher.Close()
}
return nil
}
Task 1.3: Update broadcaster to use NATS (2 hours)
File: /home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/broadcaster.go
// Add field to Broadcaster struct
type Broadcaster struct {
discovery *Discovery
natsPublisher *nats.CouncilPublisher // NEW
ctx context.Context
cancel context.CancelFunc
}
// Update BroadcastCouncilOpportunity to use NATS
func (b *Broadcaster) BroadcastCouncilOpportunity(ctx context.Context, opportunity *CouncilOpportunity) error {
log.Info().
Str("council_id", opportunity.CouncilID.String()).
Str("project_name", opportunity.ProjectName).
Msg("📡 Broadcasting council opportunity via NATS")
// NEW: Use NATS pub/sub instead of HTTP
if b.natsPublisher != nil {
err := b.natsPublisher.PublishCouncilOpportunity(ctx, opportunity)
if err != nil {
log.Error().Err(err).Msg("Failed to publish via NATS, falling back to HTTP")
// Fallback to HTTP (during transition period)
return b.broadcastViaHTTP(ctx, opportunity)
}
log.Info().
Str("council_id", opportunity.CouncilID.String()).
Msg("✅ Council opportunity published to NATS")
return nil
}
// Fallback: Use existing HTTP broadcast
return b.broadcastViaHTTP(ctx, opportunity)
}
// Keep existing HTTP method as fallback
func (b *Broadcaster) broadcastViaHTTP(ctx context.Context, opportunity *CouncilOpportunity) error {
// ... existing HTTP broadcast logic
}
Day 2: CHORUS Subscriber Implementation
Task 2.1: Create NATS subscriber module (3 hours)
New File: /home/tony/chorus/project-queues/active/CHORUS/internal/nats/subscriber.go
package nats
import (
"context"
"encoding/json"
"fmt"
"time"
"chorus/internal/council"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
)
type CouncilSubscriber struct {
nc *nats.Conn
subs []*nats.Subscription
councilMgr *council.Manager
ctx context.Context
cancel context.CancelFunc
}
func NewCouncilSubscriber(natsURL string, councilMgr *council.Manager) (*CouncilSubscriber, error) {
ctx, cancel := context.WithCancel(context.Background())
opts := []nats.Option{
nats.Name(fmt.Sprintf("CHORUS Agent %s", councilMgr.AgentID)),
nats.MaxReconnects(-1),
nats.ReconnectWait(1 * time.Second),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Warn().Err(err).Msg("NATS disconnected")
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
log.Info().Msg("NATS reconnected")
}),
}
nc, err := nats.Connect(natsURL, opts...)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
}
s := &CouncilSubscriber{
nc: nc,
subs: make([]*nats.Subscription, 0),
councilMgr: councilMgr,
ctx: ctx,
cancel: cancel,
}
// Subscribe to council opportunities
if err := s.subscribeToOpportunities(); err != nil {
nc.Close()
cancel()
return nil, fmt.Errorf("failed to subscribe to opportunities: %w", err)
}
// Subscribe to council status updates
if err := s.subscribeToStatusUpdates(); err != nil {
log.Warn().Err(err).Msg("Failed to subscribe to status updates (non-critical)")
}
log.Info().
Str("nats_url", natsURL).
Str("agent_id", councilMgr.AgentID).
Msg("✅ Subscribed to NATS council topics")
return s, nil
}
func (s *CouncilSubscriber) subscribeToOpportunities() error {
subject := "chorus.councils.forming"
sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) {
s.handleCouncilOpportunity(msg)
})
if err != nil {
return fmt.Errorf("failed to subscribe to %s: %w", subject, err)
}
s.subs = append(s.subs, sub)
log.Info().
Str("subject", subject).
Msg("📬 Subscribed to council opportunities")
return nil
}
func (s *CouncilSubscriber) subscribeToStatusUpdates() error {
subject := "chorus.councils.*.status" // Wildcard subscription
sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) {
s.handleCouncilStatus(msg)
})
if err != nil {
return fmt.Errorf("failed to subscribe to %s: %w", subject, err)
}
s.subs = append(s.subs, sub)
log.Info().
Str("subject", subject).
Msg("📬 Subscribed to council status updates")
return nil
}
func (s *CouncilSubscriber) handleCouncilOpportunity(msg *nats.Msg) {
// Parse council opportunity from message
var opp council.Opportunity
if err := json.Unmarshal(msg.Data, &opp); err != nil {
log.Error().
Err(err).
Str("subject", msg.Subject).
Msg("Failed to unmarshal council opportunity")
return
}
// Extract headers for logging
councilID := msg.Header.Get("Council-ID")
projectName := msg.Header.Get("Project")
log.Info().
Str("council_id", councilID).
Str("project", projectName).
Str("subject", msg.Subject).
Int("core_roles", len(opp.CoreRoles)).
Msg("📥 Received council opportunity from NATS")
// Delegate to council manager (existing logic)
ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
defer cancel()
if err := s.councilMgr.HandleOpportunity(ctx, &opp); err != nil {
log.Error().
Err(err).
Str("council_id", councilID).
Msg("Failed to handle council opportunity")
return
}
log.Debug().
Str("council_id", councilID).
Msg("Successfully processed council opportunity")
}
func (s *CouncilSubscriber) handleCouncilStatus(msg *nats.Msg) {
var status council.StatusUpdate
if err := json.Unmarshal(msg.Data, &status); err != nil {
log.Error().Err(err).Msg("Failed to unmarshal council status")
return
}
log.Info().
Str("council_id", status.CouncilID.String()).
Str("status", status.Status).
Msg("📥 Received council status update from NATS")
// Delegate to council manager
if err := s.councilMgr.HandleStatusUpdate(s.ctx, &status); err != nil {
log.Error().
Err(err).
Str("council_id", status.CouncilID.String()).
Msg("Failed to handle council status update")
}
}
func (s *CouncilSubscriber) Close() error {
s.cancel()
// Unsubscribe all subscriptions
for _, sub := range s.subs {
if err := sub.Unsubscribe(); err != nil {
log.Warn().Err(err).Msg("Failed to unsubscribe from NATS")
}
}
// Close NATS connection
if s.nc != nil && !s.nc.IsClosed() {
s.nc.Close()
log.Info().Msg("Closed NATS connection")
}
return nil
}
func (s *CouncilSubscriber) Health() error {
if s.nc == nil {
return fmt.Errorf("NATS connection is nil")
}
if s.nc.IsClosed() {
return fmt.Errorf("NATS connection is closed")
}
if !s.nc.IsConnected() {
return fmt.Errorf("NATS not connected")
}
return nil
}
Task 2.2: Integrate subscriber into CHORUS main (2 hours)
File: /home/tony/chorus/project-queues/active/CHORUS/cmd/chorus-agent/main.go
import (
natssub "chorus/internal/nats"
)
func main() {
// ... existing setup
// Initialize council manager
councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities)
// NEW: Subscribe to NATS instead of HTTP endpoint
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = "nats://backbeat-nats:4222"
}
natsSubscriber, err := natssub.NewCouncilSubscriber(natsURL, councilMgr)
if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize NATS subscriber")
}
defer natsSubscriber.Close()
log.Info().
Str("agent_id", agentID).
Str("nats_url", natsURL).
Msg("🎧 CHORUS agent listening for council opportunities via NATS")
// HTTP server still runs for health checks
httpServer := api.NewHTTPServer(cfg, node, hlog, ps)
go func() {
if err := httpServer.Start(); err != nil && err != http.ErrServerClosed {
log.Error().Err(err).Msg("HTTP server error")
}
}()
// ... rest of main
}
Day 3-4: Testing, Deployment, and Validation
Task 3.1: Local testing with Docker Compose (4 hours)
Create test environment: /home/tony/chorus/project-queues/active/WHOOSH/docker-compose.nats-test.yml
version: '3.8'
services:
nats:
image: nats:2.9-alpine
command: ["--jetstream", "--debug"]
ports:
- "4222:4222"
- "8222:8222" # Monitoring port
networks:
- test-net
whoosh:
build: .
environment:
- NATS_URL=nats://nats:4222
- CHORUS_SERVICE_NAME=test_chorus
depends_on:
- nats
networks:
- test-net
chorus-agent-1:
build: ../CHORUS
environment:
- NATS_URL=nats://nats:4222
- CHORUS_AGENT_ID=test-agent-1
depends_on:
- nats
networks:
- test-net
chorus-agent-2:
build: ../CHORUS
environment:
- NATS_URL=nats://nats:4222
- CHORUS_AGENT_ID=test-agent-2
depends_on:
- nats
networks:
- test-net
networks:
test-net:
driver: bridge
Run tests:
cd /home/tony/chorus/project-queues/active/WHOOSH
docker-compose -f docker-compose.nats-test.yml up
# In another terminal, trigger council formation
curl -X POST http://localhost:8080/api/v1/councils \
-H "Content-Type: application/json" \
-d '{
"project_name": "NATS Test",
"repository": "https://gitea.chorus.services/tony/test",
"core_roles": ["tpm", "developer"]
}'
# Verify both agents received message
docker-compose -f docker-compose.nats-test.yml logs chorus-agent-1 | grep "Received council opportunity"
docker-compose -f docker-compose.nats-test.yml logs chorus-agent-2 | grep "Received council opportunity"
Task 3.2: Production deployment (4 hours)
# Build and push WHOOSH
cd /home/tony/chorus/project-queues/active/WHOOSH
docker build -t registry.home.deepblack.cloud/whoosh:v1.3.0-nats .
docker push registry.home.deepblack.cloud/whoosh:v1.3.0-nats
# Build and push CHORUS
cd /home/tony/chorus/project-queues/active/CHORUS
make build-agent
docker build -f Dockerfile.simple -t registry.home.deepblack.cloud/chorus:v0.6.0-nats .
docker push registry.home.deepblack.cloud/chorus:v0.6.0-nats
# Update docker-compose files
# Update WHOOSH_BACKBEAT-prototype/docker-compose.swarm.yml:
# whoosh:
# image: registry.home.deepblack.cloud/whoosh:v1.3.0-nats
# environment:
# - NATS_URL=nats://backbeat-nats:4222
# Update CHORUS/docker/docker-compose.yml:
# chorus:
# image: registry.home.deepblack.cloud/chorus:v0.6.0-nats
# environment:
# - NATS_URL=nats://backbeat-nats:4222
# Deploy to Swarm
docker stack deploy -c WHOOSH_BACKBEAT-prototype/docker-compose.swarm.yml WHOOSH
docker stack deploy -c CHORUS/docker/docker-compose.yml CHORUS
Task 3.3: Validation and monitoring (2 hours)
# Monitor NATS connections
docker service logs WHOOSH_backbeat-nats --tail 100
# Monitor WHOOSH publishing
docker service logs WHOOSH_whoosh | grep "Published council opportunity"
# Monitor CHORUS subscriptions
docker service logs CHORUS_chorus | grep "Received council opportunity"
# Verify council formation works end-to-end
# (trigger test council via UI or API)
Total Implementation Time: 3-4 days
7. Timeline Estimates
7.1 Gantt Chart - Option 4: Hybrid Approach (RECOMMENDED)
PHASE 1: Docker API Quick Fix (Week 1)
=========================================
Day 1 (Friday)
├─ 09:00-13:00 Implementation
│ ├─ 09:00-11:00 Code changes (discovery.go)
│ ├─ 11:00-11:30 Dependency updates (go.mod)
│ ├─ 11:30-12:00 Docker compose changes
│ └─ 12:00-13:00 Build and push images
│
└─ 14:00-17:00 Testing and Deployment
├─ 14:00-15:00 Local testing
├─ 15:00-16:00 Swarm deployment
├─ 16:00-17:00 Validation and monitoring
└─ 17:00 ✅ Phase 1 Complete
PHASE 1 VALIDATION (Week 2)
===========================
Monday-Friday
├─ Daily monitoring
├─ Performance metrics collection
├─ Bug fixes (if needed)
└─ Friday: Phase 1 sign-off meeting
PHASE 2: NATS Migration (Week 3)
=================================
Monday
├─ 09:00-12:00 WHOOSH publisher implementation
└─ 13:00-17:00 WHOOSH integration and testing
Tuesday
├─ 09:00-12:00 CHORUS subscriber implementation
└─ 13:00-17:00 CHORUS integration and testing
Wednesday
├─ 09:00-12:00 Local integration testing
├─ 13:00-15:00 Build and push images
└─ 15:00-17:00 Staging deployment
Thursday
├─ 09:00-11:00 Staging validation
├─ 11:00-13:00 Production deployment
└─ 14:00-17:00 Production monitoring
Friday
├─ 09:00-12:00 Performance validation
├─ 13:00-15:00 Remove HTTP fallback code
├─ 15:00-16:00 Documentation
└─ 16:00 ✅ Phase 2 Complete
Total elapsed time: ~3 weeks (Phase 1: 1 day, validation: 1 week, Phase 2: 1 week)
7.2 Resource Allocation
| Role | Phase 1 (Days) | Phase 2 (Days) | Total |
|---|---|---|---|
| Senior Software Architect | 0.5 | 1.0 | 1.5 |
| Backend Developer | 1.0 | 3.0 | 4.0 |
| DevOps Engineer | 0.5 | 1.0 | 1.5 |
| QA Engineer | 0.5 | 1.0 | 1.5 |
| Total Person-Days | 2.5 | 6.0 | 8.5 |
7.3 Critical Path Analysis
Phase 1 Critical Path:
- Code implementation (4 hours) - BLOCKER
- Docker socket mount (10 min) - BLOCKER
- Build/push (30 min) - BLOCKER
- Deployment (30 min) - BLOCKER
- Validation (1 hour) - BLOCKER
Total Critical Path: 6 hours (can be completed in 1 day)
Phase 2 Critical Path:
- WHOOSH publisher (Day 1) - BLOCKER
- CHORUS subscriber (Day 2) - BLOCKER for Day 3
- Integration testing (Day 3 AM) - BLOCKER for deployment
- Production deployment (Day 3 PM) - BLOCKER
- Validation (Day 4) - BLOCKER for sign-off
Total Critical Path: 3.5 days
8. Final Recommendation
8.1 Executive Summary
Implement Option 4: Hybrid Approach
Phase 1 (Next 48 hours): Deploy Docker API Quick Fix Phase 2 (Week 3): Migrate to NATS-Only Solution Defer: Full libp2p migration until P2P discovery is truly needed
8.2 Rationale
Why NOT Full libp2p Migration Now?
-
Complexity vs. Value: libp2p requires 650+ LOC changes, 7-11 days implementation, and steep learning curve. Current requirements (broadcast to all agents) don't justify this complexity.
-
Infrastructure Already Exists: NATS is already deployed (backbeat-nats service) and proven in production. Using existing infrastructure reduces risk and operational overhead.
-
No P2P Discovery Needed Yet: Current architecture has centralized WHOOSH coordinator. True peer-to-peer discovery (mDNS/DHT) isn't required until agents need to discover each other without WHOOSH.
-
Migration Path Preserved: NATS pub/sub can later be replaced with libp2p gossipsub with minimal changes (both use topic-based messaging).
Why Docker API First?
-
Immediate Unblock: Solves the critical discovery problem in 1 day, unblocking E2E workflow testing.
-
Minimal Risk: 87 LOC change, easy rollback, no architectural changes.
-
Validation Period: Gives 1 week to validate council formation works before starting Phase 2.
-
Dual-Mode Safety: Allows testing NATS migration while keeping Docker API as fallback.
Why NATS Long-Term?
-
Right Complexity Level: Solves 90% of use cases (broadcast, persistence, reliability) with 20% of libp2p complexity.
-
Production-Ready: NATS is battle-tested, well-documented, and the team has experience with it (BACKBEAT SDK).
-
Performance: Sub-millisecond message delivery, scales to millions of messages/sec.
-
Operational Simplicity: Centralized monitoring, simple debugging, clear failure modes.
-
Future-Proof: When/if P2P discovery is needed, NATS can remain for messaging while adding libp2p for discovery only.
8.3 Implementation Sequence
┌─────────────────────────────────────────────────────────┐
│ WEEK 1: Emergency Unblock │
├─────────────────────────────────────────────────────────┤
│ Friday: Docker API implementation (6 hours) │
│ Result: 100% agent discovery, council formation works │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ WEEK 2: Validation │
├─────────────────────────────────────────────────────────┤
│ Mon-Fri: Monitor production, collect metrics │
│ Result: Confidence in Docker API solution │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ WEEK 3: NATS Migration │
├─────────────────────────────────────────────────────────┤
│ Mon: WHOOSH publisher implementation │
│ Tue: CHORUS subscriber implementation │
│ Wed AM: Integration testing │
│ Wed PM: Production deployment (dual-mode) │
│ Thu-Fri: Validation, remove Docker API fallback │
│ Result: Production NATS-based pub/sub messaging │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ FUTURE: libp2p (when needed) │
├─────────────────────────────────────────────────────────┤
│ Trigger: Need for P2P discovery, DHT storage, or │
│ cross-datacenter mesh │
│ Approach: Add libp2p for discovery, keep NATS for │
│ messaging (hybrid architecture) │
└─────────────────────────────────────────────────────────┘
8.4 Success Metrics
Phase 1 Success Criteria:
- ✅ All 34 CHORUS agents discovered by WHOOSH
- ✅ Council broadcasts reach 100% of agents
- ✅ Council formation completes in <60 seconds
- ✅ Task execution begins successfully
- ✅ Zero production incidents
Phase 2 Success Criteria:
- ✅ NATS message delivery rate >99.9%
- ✅ Message latency <100ms (p95)
- ✅ Zero message loss
- ✅ Successful rollback test
- ✅ Zero production incidents during migration
8.5 Risk Mitigation Summary
| Risk | Mitigation |
|---|---|
| Phase 1 deployment failure | Easy rollback (remove docker.sock mount) |
| Docker API performance issues | Monitor task list query latency, cache results |
| Phase 2 NATS connection failures | Keep HTTP fallback during transition, NATS clustering |
| Message delivery failures | JetStream persistence, at-least-once delivery (note: requires upgrading the plain Subscribe calls above to JetStream consumers) |
| Coordinated deployment issues | Dual-mode operation (HTTP + NATS) during transition |
| Team knowledge gaps | NATS already used in BACKBEAT, extensive documentation |
8.6 Final Architecture Vision
┌────────────────────────────────────────────────────┐
│ FINAL ARCHITECTURE │
│ (End of Week 3) │
└────────────────────────────────────────────────────┘
┌─────────────┐
│ WHOOSH │ - NATS publisher
│ │ - Council orchestrator
└──────┬──────┘ - Docker API for health monitoring
│
│ Publish to NATS subject
│ "chorus.councils.forming"
▼
┌──────────────────────────────────────────────────┐
│ NATS Server (JetStream) │
│ - Already deployed as backbeat-nats │
│ - Clustered for HA (future) │
│ - Message persistence enabled │
│ - Monitoring via NATS CLI / Prometheus │
└──────┬──────┬──────┬──────┬──────┬──────────────┘
│ │ │ │ │
┌───▼┐ ┌──▼─┐ ┌──▼─┐ ┌─▼──┐ (... 34 total)
│CH1 │ │CH2 │ │CH3 │ │CH4 │ CHORUS agents
│SUB │ │SUB │ │SUB │ │SUB │ - NATS subscribers
└────┘ └────┘ └────┘ └────┘ - HTTP for health checks
- libp2p for P2P comms (future)
Benefits:
✅ 100% agent discovery
✅ Sub-100ms message delivery
✅ At-least-once delivery guarantees
✅ Message persistence and replay
✅ Scalable to 1000+ agents
✅ Familiar technology stack
✅ Simple operational model
✅ Clear upgrade path to libp2p
8.7 Action Items
Immediate (Next 24 hours):
- Approve Hybrid Approach recommendation
- Schedule Phase 1 implementation (Friday)
- Assign developer resources
- Prepare rollback procedures
- Set up monitoring dashboards
Week 1:
- Implement Docker API discovery
- Deploy to production Swarm
- Validate 100% agent discovery
- Verify council formation E2E
- Document Phase 1 results
Week 2:
- Monitor production stability
- Collect performance metrics
- Plan Phase 2 NATS migration
- Review NATS clustering options
- Schedule Phase 2 deployment window
Week 3:
- Implement NATS publisher/subscriber
- Integration testing
- Production deployment (dual-mode)
- Validate NATS messaging
- Remove Docker API fallback
- Document final architecture
9. Appendix
9.1 ASCII Architecture Diagram - Current vs. Future
CURRENT (Broken):
┌─────────┐ ┌─────────────────────┐
│ WHOOSH │────HTTP GET───────→│ Docker Swarm VIP │
│ │ (chorus:8081) │ 10.0.13.26 │
└─────────┘ └──────────┬──────────┘
│
Round-robin to random
↓
┌───────────┴───────────┐
▼ ▼
┌─────────┐ ┌─────────┐
│CHORUS #1│ │CHORUS #2│
│ FOUND │ │ FOUND │
└─────────┘ └─────────┘
❌ 32 other agents invisible!
PHASE 1 (Docker API):
┌─────────┐ ┌─────────────────────┐
│ WHOOSH │────Docker API─────→│ Swarm Manager │
│ + sock │ ListTasks() │ /var/run/docker.sock│
└─────────┘ └──────────┬──────────┘
│
Returns ALL 34 tasks
with IPs
↓
┌─────────────────────┴──────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│CHORUS #1│ │CHORUS #2│ ... │CHORUS #34│
│10.0.13.1│ │10.0.13.2│ │10.0.13.34│
└─────────┘ └─────────┘ └─────────┘
✅ ALL agents discovered via direct IP!
PHASE 2 (NATS):
┌─────────┐ ┌─────────────────────┐
│ WHOOSH │────PUBLISH────────→│ NATS Server │
│ + NATS │ subject: │ (JetStream) │
└─────────┘ chorus.councils.* └──────────┬──────────┘
│
Broadcast to
all subscribers
│
┌─────────────────────┴──────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│CHORUS #1│ │CHORUS #2│ ... │CHORUS #34│
│SUB: │ │SUB: │ │SUB: │
│forming │ │forming │ │forming │
└─────────┘ └─────────┘ └─────────┘
✅ Pub/sub messaging, all receive simultaneously!
9.2 Code File Changes Summary
Phase 1: Docker API
| File | Action | LOC | Description |
|---|---|---|---|
| WHOOSH/internal/p2p/discovery.go | MODIFY | +80 | Add discoverDockerSwarmAgents() |
| WHOOSH/go.mod | MODIFY | +2 | Add docker/docker dependency |
| WHOOSH/docker-compose.swarm.yml | MODIFY | +5 | Mount docker.sock |
| Total | | +87 | |
Phase 2: NATS
| File | Action | LOC | Description |
|---|---|---|---|
| WHOOSH/internal/nats/publisher.go | CREATE | +150 | NATS publisher implementation |
| WHOOSH/internal/server/server.go | MODIFY | +20 | Integrate NATS publisher |
| WHOOSH/internal/p2p/broadcaster.go | MODIFY | +30 | Use NATS for broadcasting |
| WHOOSH/go.mod | MODIFY | +2 | Add nats-io/nats.go |
| CHORUS/internal/nats/subscriber.go | CREATE | +180 | NATS subscriber implementation |
| CHORUS/cmd/chorus-agent/main.go | MODIFY | +30 | Initialize NATS subscriber |
| CHORUS/go.mod | MODIFY | +2 | Add nats-io/nats.go |
| Total | | +414 | |
9.3 Environment Variables Reference
WHOOSH:
# Phase 1
WHOOSH_DISCOVERY_MODE=docker-swarm
CHORUS_SERVICE_NAME=CHORUS_chorus
CHORUS_AGENT_PORT=8080
# Phase 2
NATS_URL=nats://backbeat-nats:4222
CHORUS:
# Phase 2
NATS_URL=nats://backbeat-nats:4222
CHORUS_AGENT_ID=auto-generated-uuid
CHORUS_AGENT_NAME=CHORUS-Agent-{slot}
9.4 Monitoring and Observability
Phase 1 Metrics:
# WHOOSH discovery metrics
whoosh_agent_discovery_total{method="docker_api"} 34
whoosh_agent_discovery_duration_seconds{method="docker_api"} 0.125
whoosh_broadcast_success_total{council_id="xxx"} 34
whoosh_broadcast_error_total{council_id="xxx"} 0
Phase 2 Metrics:
# NATS server metrics (as exposed via a NATS Prometheus exporter; names illustrative)
nats_connections{name="WHOOSH"} 1
nats_subscriptions{subject="chorus.councils.forming"} 34
nats_messages_in{subject="chorus.councils.forming"} 156
nats_messages_out{subject="chorus.councils.forming"} 156
nats_message_latency_seconds{quantile="0.95"} 0.045
# Application metrics
whoosh_nats_publish_total{subject="chorus.councils.forming"} 156
whoosh_nats_publish_errors_total 0
chorus_nats_receive_total{subject="chorus.councils.forming"} 34
chorus_council_opportunity_handled_total 34
Conclusion
The Hybrid Approach (Docker API → NATS) provides the optimal balance of:
- Immediate value (1-day unblock)
- Low risk (staged rollout)
- Long-term quality (production-grade pub/sub)
- Pragmatic complexity (simpler than libp2p, sufficient for needs)
This architecture will support 100+ agents, sub-second message delivery, and future migration to full libp2p if/when P2P discovery becomes a requirement.
Recommended Decision: Approve Hybrid Approach and begin Phase 1 implementation immediately.
Document Version: 1.0 Last Updated: 2025-10-10 Review Date: 2025-10-17 (after Phase 1 completion)