# Agent Discovery and P2P Architecture Migration Analysis

**Date**: 2025-10-10
**Author**: Claude Code (Senior Software Architect)
**Status**: CRITICAL ARCHITECTURE DECISION
**Context**: WHOOSH discovers only 2/34 CHORUS agents, blocking E2E workflow

---

## Executive Summary

The current HTTP/DNS-based discovery architecture is **fundamentally incompatible** with Docker Swarm's VIP-based load balancing. This analysis evaluates four migration approaches to resolve the agent discovery crisis and provides detailed implementation guidance.

### Recommendation Summary

**SHORT-TERM (Next 24-48 hours)**: Deploy **Option 2 - Docker API Quick Fix**
**LONG-TERM (Next 2 weeks)**: Migrate to **Option 3 - NATS-Only Solution** (simpler than full libp2p)

**Rationale**: The Docker API provides an immediate unblock with minimal risk, while NATS-based pub/sub delivers 80% of libp2p's benefits with 20% of the complexity. Full libp2p migration is deferred until true P2P discovery (mDNS/DHT) is required.

---
## Table of Contents

1. [Current Architecture Analysis](#1-current-architecture-analysis)
2. [Complexity Assessment](#2-complexity-assessment)
3. [Technical Feasibility Evaluation](#3-technical-feasibility-evaluation)
4. [Alternative Solution Comparison](#4-alternative-solution-comparison)
5. [Risk Analysis Matrix](#5-risk-analysis-matrix)
6. [Detailed Implementation Plans](#6-detailed-implementation-plans)
7. [Timeline Estimates](#7-timeline-estimates)
8. [Final Recommendation](#8-final-recommendation)

---
## 1. Current Architecture Analysis

### 1.1 Existing System Flow

```
┌──────────────────────────────────────────────────────────────┐
│                     CURRENT BROKEN FLOW                      │
└──────────────────────────────────────────────────────────────┘

┌─────────────┐
│   WHOOSH    │  Discovery Service
│  (1 node)   │  - internal/p2p/discovery.go
└──────┬──────┘  - internal/p2p/broadcaster.go
       │
       │ 1. DNS Lookup: "chorus" service
       ▼
┌──────────────────┐
│  Docker Swarm    │  Returns: 10.0.13.26 (VIP only)
│  DNS Service     │  Expected: 34 container IPs
└────────┬─────────┘
         │
         │ 2. HTTP GET http://chorus:8081/health
         ▼
┌──────────────────┐
│  Swarm VIP LB    │  Round-robins to random containers
│  10.0.13.26      │  WHOOSH gets ~2 unique responses
└────────┬─────────┘
         │
    ┌────┴─────┬──────────────┬────────────┐
    ▼          ▼              ▼            ▼   (... 34 total)
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ CHORUS │ │ CHORUS │ │ CHORUS │ │ CHORUS │
│ Agent1 │ │ Agent2 │ │ Agent3 │ │ Agent34│
└────────┘ └────────┘ └────────┘ └────────┘
    ✅          ✅          ❌          ❌
(Discovered)(Discovered)(Invisible)(Invisible)

Result: Only 2/34 agents discovered → Council formation fails
```
### 1.2 Root Cause Analysis

**Problem**: Docker Swarm DNS returns the service VIP, not individual container IPs.

**Evidence from codebase**:

```go
// File: /home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/discovery.go:222-235

func (d *Discovery) queryActualCHORUSService() {
    client := &http.Client{Timeout: 10 * time.Second}

    // ❌ BROKEN: This queries the VIP, not all containers
    endpoint := "http://chorus:8081/health"
    resp, err := client.Get(endpoint)

    // Only discovers ONE agent per request
    // The VIP load-balancer picks a random container
    // Result: ~2-5 agents found after multiple attempts
}
```

**Why this fails**:
1. DNS `chorus` → `10.0.13.26` (VIP only)
2. No way to enumerate all 34 backend containers via the service name
3. HTTP requires knowing endpoints in advance
4. VIP randomization means discovery is non-deterministic

---

## 2. Complexity Assessment

### 2.1 Option 1: Full libp2p/HMMM Migration

**Components to Modify**:

| Component | File Path | Changes Required | LOC Changed | Complexity |
|-----------|-----------|------------------|-------------|------------|
| WHOOSH Discovery | `internal/p2p/discovery.go` | Replace HTTP with libp2p host init | 200+ | HIGH |
| WHOOSH Broadcaster | `internal/p2p/broadcaster.go` | Convert to pub/sub topics | 150+ | HIGH |
| WHOOSH Main | `cmd/whoosh/main.go` | Initialize libp2p node | 50+ | MEDIUM |
| CHORUS HTTP API | `api/http_server.go` | Add topic subscriptions | 100+ | MEDIUM |
| CHORUS Council Mgr | `internal/council/manager.go` | Subscribe to topics | 80+ | MEDIUM |
| CHORUS Main | `cmd/chorus-agent/main.go` | Initialize libp2p node | 50+ | MEDIUM |
| HMMM BACKBEAT Config | Docker compose updates | Topic configuration | 20+ | LOW |
| **TOTAL** | | | **650+ LOC** | **HIGH** |

**Complexity Rating**: **HIGH**

**Rationale**:
- libp2p requires understanding the gossipsub protocol
- Peer discovery via mDNS/DHT adds network complexity
- Need to maintain both HTTP (health checks) and libp2p
- Breaking change requiring coordinated deployment
- Testing requires a full cluster stack
### 2.2 Option 2: Docker API Quick Fix

**Components to Modify**:

| Component | File Path | Changes Required | LOC Changed | Complexity |
|-----------|-----------|------------------|-------------|------------|
| WHOOSH Discovery | `internal/p2p/discovery.go` | Add Docker client integration | 80 | LOW |
| WHOOSH Config | `docker-compose.swarm.yml` | Mount docker.sock | 5 | LOW |
| Dependencies | `go.mod` | Add docker/docker library | 2 | LOW |
| **TOTAL** | | | **87 LOC** | **LOW** |

**Complexity Rating**: **LOW**

**Rationale**:
- Single file modification
- Well-documented Docker Go SDK
- No architecture changes
- Easy rollback (remove socket mount)
- No distributed systems complexity
### 2.3 Option 3: NATS-Only Solution

**Components to Modify**:

| Component | File Path | Changes Required | LOC Changed | Complexity |
|-----------|-----------|------------------|-------------|------------|
| WHOOSH Publisher | `internal/p2p/broadcaster.go` | Replace HTTP with NATS pub | 100 | MEDIUM |
| WHOOSH Main | `cmd/whoosh/main.go` | Initialize NATS connection | 30 | LOW |
| CHORUS Subscriber | `api/http_server.go` | Subscribe to NATS topics | 80 | MEDIUM |
| CHORUS Main | `cmd/chorus-agent/main.go` | Initialize NATS connection | 30 | LOW |
| Dependencies | `go.mod` (both repos) | Add nats-io/nats.go | 4 | LOW |
| **TOTAL** | | | **244 LOC** | **MEDIUM** |

**Complexity Rating**: **MEDIUM**

**Rationale**:
- NATS SDK is simple (already used in BACKBEAT)
- No peer discovery needed (centralized NATS broker)
- Maintains HTTP for health checks
- Production-ready infrastructure (NATS already deployed)
- Moderate testing complexity
### 2.4 Option 4: Hybrid Approach

**Phase 1 (Docker API)**: LOW complexity (87 LOC, 1-2 days)
**Phase 2 (NATS Migration)**: MEDIUM complexity (244 LOC, 3-4 days)

**Total Complexity Rating**: **MEDIUM** (staged deployment reduces risk)

---
## 3. Technical Feasibility Evaluation

### 3.1 Can we use NATS directly instead of full libp2p?

**YES - Strongly Recommended**

**Comparison**:

| Feature | libp2p | NATS | Requirement |
|---------|--------|------|-------------|
| Pub/Sub Messaging | ✅ Gossipsub | ✅ JetStream | ✅ Required |
| Broadcast to all agents | ✅ Topic-based | ✅ Subject-based | ✅ Required |
| Peer Discovery | ✅ mDNS/DHT | ❌ Centralized | ⚠️ Not needed yet |
| At-least-once delivery | ✅ | ✅ JetStream | ✅ Required |
| Message persistence | ⚠️ Complex | ✅ Built-in | 🎯 Nice-to-have |
| Cluster-wide deployment | ✅ Decentralized | ✅ Centralized | ✅ Either works |
| Existing infrastructure | ❌ New | ✅ **Already deployed** | 🎯 Huge win |
| Learning curve | 🔴 Steep | 🟢 Gentle | 🎯 Team familiarity |
| Production maturity | 🟡 Good | 🟢 Excellent | ✅ Required |
**Evidence from deployed infrastructure**:

```yaml
# File: /home/tony/chorus/project-queues/active/WHOOSH/BACKBEAT-prototype/docker-compose.swarm.yml:155-190

services:
  nats:
    image: nats:2.9-alpine
    command: ["--jetstream"]  # ✅ JetStream already enabled
    deploy:
      replicas: 1
    networks:
      - backbeat-net  # ✅ Shared network with WHOOSH/CHORUS
```

**BACKBEAT SDK already uses NATS**:

```go
// File: /home/tony/chorus/project-queues/active/WHOOSH/BACKBEAT-prototype/pkg/sdk/client.go:15

import (
    "github.com/nats-io/nats.go" // ✅ Already in vendor
)

// Subject pattern already established
subject := fmt.Sprintf("backbeat.status.%s", c.config.ClusterID)
```

**Recommendation**: Use NATS for 90% of use cases. Defer libp2p until you need:
- True P2P discovery (no centralized broker)
- Cross-datacenter mesh networking
- DHT-based content routing
### 3.2 What's the minimal viable libp2p implementation?

**If you must use libp2p** (not recommended for MVP):

```go
// Minimal libp2p setup for pub/sub only

package main

import (
    "context"
    "encoding/json"

    "github.com/libp2p/go-libp2p"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/multiformats/go-multiaddr"
)

func InitMinimalLibp2p(ctx context.Context) (*pubsub.PubSub, error) {
    // 1. Create libp2p host
    host, err := libp2p.New(
        libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/9000"),
    )
    if err != nil {
        return nil, err
    }

    // 2. Create gossipsub instance
    ps, err := pubsub.NewGossipSub(ctx, host)
    if err != nil {
        return nil, err
    }

    // 3. Bootstrap to peers (requires known peer addresses)
    bootstrapPeers := []string{
        "/ip4/192.168.1.27/tcp/9000/p2p/QmYourPeerID",
    }

    for _, peerAddr := range bootstrapPeers {
        addr, err := multiaddr.NewMultiaddr(peerAddr)
        if err != nil {
            continue
        }
        // Connect takes a peer.AddrInfo, not a raw multiaddr
        info, err := peer.AddrInfoFromP2pAddr(addr)
        if err != nil {
            continue
        }
        host.Connect(ctx, *info)
    }

    return ps, nil
}

// Usage in WHOOSH
func BroadcastCouncilOpportunity(ps *pubsub.PubSub, opp *CouncilOpportunity) error {
    topic, err := ps.Join("councils.forming")
    if err != nil {
        return err
    }

    data, err := json.Marshal(opp)
    if err != nil {
        return err
    }
    return topic.Publish(context.Background(), data)
}

// Usage in CHORUS
func SubscribeToOpportunities(ps *pubsub.PubSub) {
    topic, _ := ps.Join("councils.forming")
    sub, _ := topic.Subscribe()

    for {
        msg, err := sub.Next(context.Background())
        if err != nil {
            return
        }
        var opp CouncilOpportunity
        if err := json.Unmarshal(msg.Data, &opp); err != nil {
            continue
        }
        handleOpportunity(opp)
    }
}
```

**Minimal dependencies**:

```bash
go get github.com/libp2p/go-libp2p@latest
go get github.com/libp2p/go-libp2p-pubsub@latest
```

**Estimated complexity**: 300 LOC (still 3x more than NATS)
### 3.3 Are there Go libraries/examples we can leverage?

**YES - Multiple options**

**Option A: NATS Go Client** (Recommended)

```bash
# Already in BACKBEAT SDK vendor directory
/home/tony/chorus/project-queues/active/WHOOSH/vendor/github.com/nats-io/nats.go/
```

**Example from BACKBEAT SDK**:

```go
// File: internal/backbeat/integration.go (hypothetical WHOOSH usage)

import (
    "encoding/json"

    "github.com/nats-io/nats.go"
)

type WHOOSHPublisher struct {
    nc *nats.Conn
}

func NewWHOOSHPublisher(natsURL string) (*WHOOSHPublisher, error) {
    nc, err := nats.Connect(natsURL)
    if err != nil {
        return nil, err
    }

    return &WHOOSHPublisher{nc: nc}, nil
}

func (w *WHOOSHPublisher) BroadcastCouncilOpportunity(opp *CouncilOpportunity) error {
    data, err := json.Marshal(opp)
    if err != nil {
        return err
    }

    // Publish to all CHORUS agents subscribed to this subject
    return w.nc.Publish("chorus.councils.forming", data)
}
```

**Option B: Docker Go SDK**

```bash
# For Docker API quick fix
go get github.com/docker/docker@latest
```

**Example**:

```go
// File: internal/p2p/discovery_swarm.go (new file)

import (
    "context"
    "fmt"
    "strings"

    "github.com/docker/docker/api/types"
    "github.com/docker/docker/api/types/filters"
    "github.com/docker/docker/client"
)

func (d *Discovery) DiscoverSwarmAgents() ([]*Agent, error) {
    cli, err := client.NewClientWithOpts(client.FromEnv)
    if err != nil {
        return nil, err
    }
    defer cli.Close()

    // List all tasks for the CHORUS service
    tasks, err := cli.TaskList(context.Background(), types.TaskListOptions{
        Filters: filters.NewArgs(
            filters.Arg("service", "CHORUS_chorus"),
            filters.Arg("desired-state", "running"),
        ),
    })
    if err != nil {
        return nil, err
    }

    agents := []*Agent{}
    for _, task := range tasks {
        // Extract container IP from task
        for _, attachment := range task.NetworksAttachments {
            if len(attachment.Addresses) > 0 {
                ip := strings.Split(attachment.Addresses[0], "/")[0]
                endpoint := fmt.Sprintf("http://%s:8080", ip)

                agents = append(agents, &Agent{
                    ID:       task.ID[:12],
                    Endpoint: endpoint,
                    Status:   "online",
                    // ... populate other fields
                })
            }
        }
    }

    return agents, nil
}
```

**Option C: libp2p Examples**

```bash
# Official examples repository
git clone https://github.com/libp2p/go-libp2p-examples.git

# Relevant examples:
# - pubsub-example/
# - echo/
# - chat-with-rendezvous/
```
### 3.4 What about backwards compatibility during migration?

**Strategy: Dual-mode operation during transition**

```go
// File: internal/p2p/broadcaster.go

type Broadcaster struct {
    discovery *Discovery
    natsConn  *nats.Conn // New: NATS connection
    ctx       context.Context
    cancel    context.CancelFunc

    // Feature flag for gradual rollout
    useNATS bool
}

func (b *Broadcaster) BroadcastCouncilOpportunity(ctx context.Context, opp *CouncilOpportunity) error {
    if b.useNATS && b.natsConn != nil {
        // New path: NATS pub/sub
        return b.broadcastViaNATS(ctx, opp)
    }

    // Legacy path: HTTP POST (existing code)
    return b.broadcastViaHTTP(ctx, opp)
}

func (b *Broadcaster) broadcastViaNATS(ctx context.Context, opp *CouncilOpportunity) error {
    data, err := json.Marshal(opp)
    if err != nil {
        return err
    }

    // Must match the subject the CHORUS subscribers listen on
    subject := "chorus.councils.forming"
    return b.natsConn.Publish(subject, data)
}

func (b *Broadcaster) broadcastViaHTTP(ctx context.Context, opp *CouncilOpportunity) error {
    // Existing HTTP broadcast logic (unchanged)
    agents := b.discovery.GetAgents()
    // ... existing code
}
```

**Deployment strategy**:
1. **Week 1**: Deploy NATS code with `useNATS=false` (no behavior change)
2. **Week 2**: Enable NATS for 10% of broadcasts (canary testing)
3. **Week 3**: Enable NATS for 100% of broadcasts
4. **Week 4**: Remove HTTP broadcast code entirely

**Rollback**: Set `useNATS=false` via environment variable

---
## 4. Alternative Solution Comparison

### 4.1 Option 1: Full libp2p/HMMM Migration

**Architecture**:

```
┌─────────────┐
│   WHOOSH    │
│  (libp2p)   │  - Initialize libp2p host
└──────┬──────┘  - Join "councils.forming" topic
       │         - Publish council opportunities
       │
       │ Publishes to libp2p gossipsub topic
       ▼
┌──────────────────────────────────────────┐
│        libp2p Gossipsub Network          │
│  - Peer discovery via mDNS               │
│  - DHT routing (optional)                │
│  - Message propagation to all subscribers│
└──────┬─────┬─────┬─────┬─────┬──────────┘
       │     │     │     │     │
   ┌───▼┐ ┌──▼─┐ ┌─▼──┐ ┌▼──┐   (... 34 total)
   │CH1 │ │CH2 │ │CH3 │ │CH4│   All CHORUS agents
   └────┘ └────┘ └────┘ └───┘   subscribe to topic

✅ ALL 34 agents receive message simultaneously
```

**Pros**:
- ✅ True P2P architecture (no central broker)
- ✅ Automatic peer discovery via mDNS/DHT
- ✅ Built-in encryption and authentication
- ✅ Message deduplication
- ✅ Resilient to network partitions
- ✅ Unlocks future P2P features (DHT storage, streams)

**Cons**:
- ❌ High complexity (650+ LOC changes)
- ❌ Steep learning curve
- ❌ Requires libp2p expertise
- ❌ Breaking change (coordinated deployment)
- ❌ Complex testing requirements
- ❌ Overkill for current needs

**Implementation time**: 7-11 days

**When to use**: When you need true P2P discovery without central infrastructure
### 4.2 Option 2: Docker API Quick Fix

**Architecture**:

```
┌─────────────┐
│   WHOOSH    │
│ (Docker API)│  - Mount /var/run/docker.sock
└──────┬──────┘  - Query Swarm API for tasks
       │
       │ GET /tasks?service=CHORUS_chorus
       ▼
┌──────────────────┐
│  Docker Engine   │  Returns: List of all 34 tasks
│ (Swarm Manager)  │  with container IPs
└────────┬─────────┘
         │
    ┌────┴─────┬──────────────┬────────────┐
    ▼          ▼              ▼            ▼   (... 34 total)
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐
│10.0.13.1│ │10.0.13.2│ │10.0.13.3│ │10.0.13.34│
│CHORUS 1 │ │CHORUS 2 │ │CHORUS 3 │ │CHORUS 34 │
└─────────┘ └─────────┘ └─────────┘ └──────────┘

WHOOSH makes HTTP POST to all 34 container IPs
✅ 100% discovery rate
```

**Code changes**:

```go
// File: internal/p2p/discovery.go (add new method)

import (
    "context"
    "fmt"
    "strings"
    "time"

    "github.com/docker/docker/api/types"
    "github.com/docker/docker/api/types/filters"
    "github.com/docker/docker/client"
)

func (d *Discovery) discoverDockerSwarmAgents() {
    cli, err := client.NewClientWithOpts(client.FromEnv)
    if err != nil {
        log.Error().Err(err).Msg("Failed to create Docker client")
        return
    }
    defer cli.Close()

    // Query Swarm for all CHORUS tasks
    tasks, err := cli.TaskList(context.Background(), types.TaskListOptions{
        Filters: filters.NewArgs(
            filters.Arg("service", "CHORUS_chorus"),
            filters.Arg("desired-state", "running"),
        ),
    })
    if err != nil {
        log.Error().Err(err).Msg("Failed to list Swarm tasks")
        return
    }

    log.Info().Int("task_count", len(tasks)).Msg("Discovered Swarm tasks")

    for _, task := range tasks {
        if len(task.NetworksAttachments) == 0 {
            continue
        }

        for _, attachment := range task.NetworksAttachments {
            if len(attachment.Addresses) == 0 {
                continue
            }

            // Extract IP from CIDR (e.g., "10.0.13.5/24" -> "10.0.13.5")
            ip := strings.Split(attachment.Addresses[0], "/")[0]
            endpoint := fmt.Sprintf("http://%s:8080", ip)

            agent := &Agent{
                ID:       task.ID[:12], // Use first 12 chars of task ID
                Name:     fmt.Sprintf("CHORUS-%d", task.Slot), // Slot is numeric
                Endpoint: endpoint,
                Status:   "online",
                Capabilities: []string{
                    "general_development",
                    "task_coordination",
                },
                Model:     "llama3.1:8b",
                LastSeen:  time.Now(),
                P2PAddr:   fmt.Sprintf("%s:9000", ip),
                ClusterID: "docker-unified-stack",
            }

            d.addOrUpdateAgent(agent)

            log.Debug().
                Str("task_id", task.ID[:12]).
                Str("ip", ip).
                Str("endpoint", endpoint).
                Msg("Discovered CHORUS agent via Docker API")
        }
    }
}
```

**Deployment changes**:

```yaml
# File: docker/docker-compose.swarm.yml (WHOOSH service)

services:
  whoosh:
    image: anthonyrawlins/whoosh:latest
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro # Mount Docker socket
    environment:
      - WHOOSH_DISCOVERY_MODE=docker-swarm
```
**Pros**:
- ✅ **Minimal code changes** (87 LOC)
- ✅ **Immediate solution** (1-2 days)
- ✅ 100% discovery rate
- ✅ Easy to test and validate
- ✅ Simple rollback (remove socket mount)
- ✅ No distributed systems complexity

**Cons**:
- ⚠️ Requires privileged Docker socket access
- ⚠️ Couples WHOOSH to Docker Swarm
- ⚠️ Doesn't enable pub/sub messaging
- ⚠️ Still uses HTTP (no performance benefits)
- ⚠️ Not portable to Kubernetes/other orchestrators

**Implementation time**: 1-2 days

**When to use**: Emergency unblock, temporary fix while planning proper migration
### 4.3 Option 3: NATS-Only Solution (Recommended Long-term)

**Architecture**:

```
┌──────────────┐
│    WHOOSH    │
│ (NATS client)│  - Connect to NATS broker
└──────┬───────┘  - Publish to subject
       │
       │ PUBLISH chorus.councils.forming {...}
       ▼
┌──────────────────────────────────────────┐
│        NATS Server (JetStream)           │
│  - Already deployed as backbeat-nats     │
│  - Message persistence                   │
│  - At-least-once delivery                │
└──────┬─────┬─────┬─────┬─────┬──────────┘
       │     │     │     │     │
   ┌───▼┐ ┌──▼─┐ ┌─▼──┐ ┌▼──┐   (... 34 total)
   │CH1 │ │CH2 │ │CH3 │ │CH4│   All CHORUS agents
   │SUB │ │SUB │ │SUB │ │SUB│   subscribe on startup
   └────┘ └────┘ └────┘ └───┘

✅ ALL 34 agents receive message via subscription
```
**Code changes - WHOOSH**:

```go
// File: internal/nats/publisher.go (new file)

package nats

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/rs/zerolog/log"
)

type CouncilPublisher struct {
    nc *nats.Conn
}

func NewCouncilPublisher(natsURL string) (*CouncilPublisher, error) {
    nc, err := nats.Connect(natsURL,
        nats.MaxReconnects(-1), // Infinite reconnects
        nats.ReconnectWait(1*time.Second),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to connect to NATS: %w", err)
    }

    log.Info().Str("nats_url", natsURL).Msg("Connected to NATS for council publishing")

    return &CouncilPublisher{nc: nc}, nil
}

func (p *CouncilPublisher) PublishCouncilOpportunity(ctx context.Context, opp *CouncilOpportunity) error {
    data, err := json.Marshal(opp)
    if err != nil {
        return fmt.Errorf("failed to marshal opportunity: %w", err)
    }

    subject := "chorus.councils.forming"

    // Publish with headers for tracing
    msg := &nats.Msg{
        Subject: subject,
        Data:    data,
        Header: nats.Header{
            "Council-ID": []string{opp.CouncilID.String()},
            "Project":    []string{opp.ProjectName},
        },
    }

    if err := p.nc.PublishMsg(msg); err != nil {
        return fmt.Errorf("failed to publish to NATS: %w", err)
    }

    log.Info().
        Str("council_id", opp.CouncilID.String()).
        Str("subject", subject).
        Msg("Published council opportunity to NATS")

    return nil
}

func (p *CouncilPublisher) Close() error {
    if p.nc != nil {
        p.nc.Close()
    }
    return nil
}
```
**Code changes - CHORUS**:

```go
// File: internal/nats/subscriber.go (new file)

package nats

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "chorus/internal/council"

    "github.com/nats-io/nats.go"
    "github.com/rs/zerolog/log"
)

type CouncilSubscriber struct {
    nc         *nats.Conn
    sub        *nats.Subscription
    councilMgr *council.Manager
}

func NewCouncilSubscriber(natsURL string, councilMgr *council.Manager) (*CouncilSubscriber, error) {
    nc, err := nats.Connect(natsURL,
        nats.MaxReconnects(-1),
        nats.ReconnectWait(1*time.Second),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to connect to NATS: %w", err)
    }

    s := &CouncilSubscriber{
        nc:         nc,
        councilMgr: councilMgr,
    }

    // Subscribe to council opportunities
    sub, err := nc.Subscribe("chorus.councils.forming", s.handleCouncilOpportunity)
    if err != nil {
        nc.Close()
        return nil, fmt.Errorf("failed to subscribe to council opportunities: %w", err)
    }
    s.sub = sub

    log.Info().
        Str("nats_url", natsURL).
        Str("subject", "chorus.councils.forming").
        Msg("Subscribed to council opportunities on NATS")

    return s, nil
}

func (s *CouncilSubscriber) handleCouncilOpportunity(msg *nats.Msg) {
    var opp council.Opportunity
    if err := json.Unmarshal(msg.Data, &opp); err != nil {
        log.Error().Err(err).Msg("Failed to unmarshal council opportunity")
        return
    }

    log.Info().
        Str("council_id", opp.CouncilID.String()).
        Str("project", opp.ProjectName).
        Msg("Received council opportunity from NATS")

    // Delegate to council manager (existing logic)
    if err := s.councilMgr.HandleOpportunity(context.Background(), &opp); err != nil {
        log.Error().
            Err(err).
            Str("council_id", opp.CouncilID.String()).
            Msg("Failed to handle council opportunity")
    }
}

func (s *CouncilSubscriber) Close() error {
    if s.sub != nil {
        s.sub.Unsubscribe()
    }
    if s.nc != nil {
        s.nc.Close()
    }
    return nil
}
```
**Integration in CHORUS main**:

```go
// File: cmd/chorus-agent/main.go

func main() {
    // ... existing setup

    // Initialize council manager
    councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities)

    // NEW: Subscribe to NATS instead of HTTP endpoint
    natsURL := os.Getenv("NATS_URL")
    if natsURL == "" {
        natsURL = "nats://backbeat-nats:4222"
    }

    natsSubscriber, err := nats.NewCouncilSubscriber(natsURL, councilMgr)
    if err != nil {
        log.Fatal().Err(err).Msg("Failed to initialize NATS subscriber")
    }
    defer natsSubscriber.Close()

    // HTTP server still runs for health checks (optional)
    if enableHTTP {
        httpServer := api.NewHTTPServer(cfg, node, hlog, ps)
        go httpServer.Start()
    }

    // ... rest of main
}
```

**Pros**:
- ✅ **Simpler than libp2p** (244 vs 650 LOC)
- ✅ **Production-ready** (NATS battle-tested)
- ✅ **Already deployed** (backbeat-nats running)
- ✅ 100% broadcast delivery
- ✅ Message persistence (JetStream)
- ✅ Familiar SDK (used in BACKBEAT)
- ✅ Easy testing (local NATS for dev)
- ✅ Portable (works with any orchestrator)

**Cons**:
- ⚠️ Centralized (NATS is a single point of failure)
- ⚠️ No automatic peer discovery (not needed)
- ⚠️ Requires NATS infrastructure (already have it)

**Implementation time**: 3-4 days

**When to use**: Production-grade pub/sub without P2P complexity
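NATS subjects are dot-separated tokens, with `*` matching exactly one token and `>` matching one or more trailing tokens; this is what lets all 34 agents share one `chorus.councils.forming` subscription while leaving room for finer-grained subjects later. A small illustrative matcher (our sketch of the semantics, not the NATS server's implementation) makes the rules concrete:

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a published subject matches a
// subscription pattern under NATS wildcard rules:
// '*' matches exactly one token, '>' matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, tok := range pt {
		if tok == ">" {
			return i < len(st) // '>' must match at least one token
		}
		if i >= len(st) {
			return false
		}
		if tok != "*" && tok != st[i] {
			return false
		}
	}
	return len(pt) == len(st)
}

func main() {
	fmt.Println(subjectMatches("chorus.councils.>", "chorus.councils.forming"))      // true
	fmt.Println(subjectMatches("chorus.*.forming", "chorus.councils.forming"))       // true
	fmt.Println(subjectMatches("chorus.councils.forming", "chorus.councils.closed")) // false
}
```

Subscribing to `chorus.councils.>` instead of the literal subject would let agents pick up future per-council subjects without a code change.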

### 4.4 Comparison Matrix

| Criteria | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|----------|------------------|----------------------|----------------|------------------|
| **Discovery Success** | 100% (gossipsub) | 100% (API query) | 100% (pub/sub) | 100% (both phases) |
| **Implementation Time** | 7-11 days | 1-2 days | 3-4 days | 2 days + 3-4 days |
| **Code Complexity** | HIGH (650 LOC) | LOW (87 LOC) | MEDIUM (244 LOC) | LOW → MEDIUM |
| **Testing Complexity** | HIGH | LOW | MEDIUM | LOW → MEDIUM |
| **Production Risk** | HIGH | LOW | MEDIUM | LOW (staged) |
| **Scalability** | Excellent | Good | Excellent | Excellent |
| **Infrastructure Deps** | None (P2P) | Docker Swarm | NATS (have it) | Both |
| **Learning Curve** | Steep | Gentle | Gentle | Gentle |
| **Backwards Compat** | Breaking change | Non-breaking | Breaking change | Dual-mode |
| **Rollback Ease** | Difficult | Easy | Moderate | Easy (per phase) |
| **Future-Proofing** | Best (P2P ready) | Poor (Docker-only) | Good | Good |
| **Message Persistence** | Complex | N/A | Built-in | N/A → Built-in |
| **Operational Overhead** | High | Low | Low | Low |
| **Deployment Coupling** | Tight | Tight | Moderate | Moderate |

**Scoring** (1-10, higher is better):

| Option | Immediate Value | Long-term Value | Risk Level | Total Score |
|--------|-----------------|-----------------|------------|-------------|
| **Option 1: libp2p** | 6/10 | 10/10 | 4/10 | **20/30** |
| **Option 2: Docker API** | 10/10 | 4/10 | 9/10 | **23/30** |
| **Option 3: NATS** | 8/10 | 9/10 | 7/10 | **24/30** |
| **Option 4: Hybrid** | 10/10 | 9/10 | 8/10 | **27/30** ⭐ |

---
## 5. Risk Analysis Matrix

### 5.1 Technical Risks

| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|------|------------------|----------------------|----------------|------------------|
| **Peer discovery failure** | LOW (mDNS/DHT) | N/A | N/A | N/A |
| **Message delivery failure** | LOW (retry logic) | MEDIUM (HTTP timeout) | LOW (NATS ack) | LOW → LOW |
| **Network partition** | LOW (eventual consistency) | HIGH (no retries) | MEDIUM (NATS reconnect) | HIGH → MEDIUM |
| **Scale bottleneck** | LOW (P2P) | MEDIUM (VIP load) | LOW (NATS scales) | MEDIUM → LOW |
| **Implementation bugs** | HIGH (complex) | LOW (simple) | MEDIUM | LOW → MEDIUM |
| **Integration issues** | HIGH (new system) | LOW (existing stack) | MEDIUM (new client) | LOW → MEDIUM |
| **Performance degradation** | LOW (efficient) | MEDIUM (HTTP overhead) | LOW (fast broker) | MEDIUM → LOW |

### 5.2 Deployment Risks

| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|------|------------------|----------------------|----------------|------------------|
| **Coordinated deployment** | **CRITICAL** | LOW | **HIGH** | LOW → HIGH |
| **Rollback complexity** | HIGH (breaking) | LOW (remove mount) | MEDIUM (revert code) | LOW → MEDIUM |
| **Downtime required** | YES (breaking) | NO | YES (breaking) | NO → YES |
| **Service interruption** | HIGH (full restart) | LOW (WHOOSH only) | MEDIUM (both services) | LOW → MEDIUM |
| **Config drift** | HIGH (new settings) | LOW (one env var) | MEDIUM (NATS URL) | LOW → MEDIUM |
| **Version compatibility** | CRITICAL | N/A | MEDIUM (NATS version) | N/A → MEDIUM |

### 5.3 Operational Risks

| Risk | Option 1: libp2p | Option 2: Docker API | Option 3: NATS | Option 4: Hybrid |
|------|------------------|----------------------|----------------|------------------|
| **Monitoring gaps** | HIGH (new metrics) | LOW (existing) | MEDIUM (NATS metrics) | LOW → MEDIUM |
| **Debugging difficulty** | HIGH (distributed) | LOW (centralized) | MEDIUM (broker logs) | LOW → MEDIUM |
| **On-call burden** | HIGH (complex) | LOW (simple) | MEDIUM | LOW → MEDIUM |
| **Documentation debt** | HIGH (new system) | LOW (minimal change) | MEDIUM (NATS docs) | LOW → MEDIUM |
| **Team knowledge** | LOW (unfamiliar) | HIGH (Docker known) | MEDIUM (NATS used) | HIGH → MEDIUM |
| **Vendor lock-in** | LOW (open source) | HIGH (Docker Swarm) | MEDIUM (NATS) | HIGH → MEDIUM |
### 5.4 Mitigation Strategies

**Option 1: libp2p**
- **Risk**: High implementation complexity
- **Mitigation**:
  - Hire libp2p consultant for architecture review
  - Build prototype in isolated environment first
  - Extensive integration testing before production
  - Feature flagging for gradual rollout

**Option 2: Docker API**
- **Risk**: Docker Swarm vendor lock-in
- **Mitigation**:
  - Document as temporary solution
  - Plan NATS migration immediately after
  - Abstract discovery interface for future portability
  - Maintain HTTP broadcast as fallback

**Option 3: NATS**
- **Risk**: NATS broker single point of failure
- **Mitigation**:
  - Deploy NATS in clustered mode (3 replicas)
  - Enable JetStream for message persistence
  - Monitor NATS health and set up alerts
  - Document NATS recovery procedures
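The clustered-mode mitigation could look roughly like the following compose fragment; service names, the cluster name, and the network are illustrative assumptions, not the current BACKBEAT config:

```yaml
# Illustrative 3-node NATS cluster with JetStream (names are assumptions)
services:
  nats-1:
    image: nats:2.9-alpine
    command:
      - "--jetstream"
      - "--cluster_name=chorus"
      - "--cluster=nats://0.0.0.0:6222"
      - "--routes=nats://nats-2:6222,nats://nats-3:6222"
    networks:
      - backbeat-net
  # nats-2 and nats-3 are identical apart from their --routes lists
```

Clients would then connect with a multi-server URL (e.g. `nats://nats-1:4222,nats://nats-2:4222,nats://nats-3:4222`) so a single broker failure does not interrupt council broadcasts.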
|
|
|
|
**Option 4: Hybrid** (Recommended)
|
|
- **Risk**: Phase 2 migration complexity
|
|
- **Mitigation**:
|
|
- Validate Phase 1 thoroughly in production (1 week)
|
|
- Implement dual-mode operation (HTTP + NATS) during Phase 2
|
|
- Use feature flags for gradual NATS rollout
|
|
- Maintain HTTP as fallback during transition
|
|
|
|
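The dual-mode operation described above can be sketched as a small transport-selection helper. This is an illustrative sketch, not existing WHOOSH code; the `WHOOSH_NATS_BROADCAST` flag name and the `selectTransport` function are assumptions.

```go
package main

import (
	"fmt"
	"os"
)

// selectTransport picks the broadcast path for dual-mode operation:
// NATS when the feature flag is on and the broker is healthy,
// otherwise the existing HTTP fan-out remains the fallback.
func selectTransport(natsEnabled, natsHealthy bool) string {
	if natsEnabled && natsHealthy {
		return "nats"
	}
	return "http"
}

func main() {
	// Hypothetical feature flag; not an existing WHOOSH variable.
	enabled := os.Getenv("WHOOSH_NATS_BROADCAST") == "true"
	fmt.Println(selectTransport(enabled, true))
}
```

Keeping the decision in one pure function makes the gradual rollout auditable: flipping the flag back instantly restores the HTTP path.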
---

## 6. Detailed Implementation Plans

### 6.1 Option 2: Docker API Quick Fix (RECOMMENDED SHORT-TERM)

**Timeline**: 1-2 days

#### Step 1: Code Changes (4 hours)

**File**: `/home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/discovery.go`

```go
// Add imports
import (
	"github.com/docker/docker/client"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
)

// Add new method after discoverRealCHORUSAgents()

func (d *Discovery) discoverDockerSwarmAgents() {
	if !d.config.DockerEnabled {
		return
	}

	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		log.Debug().Err(err).Msg("Docker client not available (running outside Swarm?)")
		return
	}
	defer cli.Close()

	ctx := context.Background()

	// List all running tasks for the CHORUS service
	serviceName := os.Getenv("CHORUS_SERVICE_NAME")
	if serviceName == "" {
		serviceName = "CHORUS_chorus" // Default stack name
	}

	tasks, err := cli.TaskList(ctx, types.TaskListOptions{
		Filters: filters.NewArgs(
			filters.Arg("service", serviceName),
			filters.Arg("desired-state", "running"),
		),
	})
	if err != nil {
		log.Error().Err(err).Msg("Failed to list Docker Swarm tasks")
		return
	}

	log.Info().
		Int("task_count", len(tasks)).
		Str("service", serviceName).
		Msg("🐳 Discovered Docker Swarm tasks")

	discoveredCount := 0

	for _, task := range tasks {
		// Skip tasks without network attachments
		if len(task.NetworksAttachments) == 0 {
			log.Debug().Str("task_id", task.ID[:12]).Msg("Task has no network attachments")
			continue
		}

		// Extract IP from first network attachment
		for _, attachment := range task.NetworksAttachments {
			if len(attachment.Addresses) == 0 {
				continue
			}

			// Parse IP from CIDR notation (e.g., "10.0.13.5/24")
			ipCIDR := attachment.Addresses[0]
			ip := strings.Split(ipCIDR, "/")[0]

			// Construct agent endpoint
			port := os.Getenv("CHORUS_AGENT_PORT")
			if port == "" {
				port = "8080"
			}
			endpoint := fmt.Sprintf("http://%s:%s", ip, port)

			// Create agent entry
			agent := &Agent{
				ID:       task.ID[:12], // Short task ID
				Name:     fmt.Sprintf("CHORUS-Task-%d", task.Slot),
				Endpoint: endpoint,
				Status:   "online",
				Capabilities: []string{
					"general_development",
					"task_coordination",
					"ai_integration",
				},
				Model:          "llama3.1:8b",
				LastSeen:       time.Now(),
				TasksCompleted: 0,
				P2PAddr:        fmt.Sprintf("%s:9000", ip),
				ClusterID:      "docker-unified-stack",
			}

			d.addOrUpdateAgent(agent)
			discoveredCount++

			log.Debug().
				Str("task_id", task.ID[:12]).
				Int("slot", int(task.Slot)).
				Str("ip", ip).
				Str("endpoint", endpoint).
				Msg("🤖 Discovered CHORUS agent via Docker API")

			break // Only use first network attachment
		}
	}

	log.Info().
		Int("discovered", discoveredCount).
		Int("total_tasks", len(tasks)).
		Msg("✅ Docker Swarm agent discovery completed")
}
```
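The snippet above recovers the task IP with a blind `strings.Split` on `/`. A slightly more defensive variant, sketched below with the standard library's `net.ParseCIDR`, also rejects malformed attachment addresses; the `taskIP` helper is an illustrative assumption, not existing WHOOSH code.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// taskIP extracts the bare IP from a Swarm network attachment address
// such as "10.0.13.5/24", validating it instead of splitting blindly.
func taskIP(addr string) (string, error) {
	ip, _, err := net.ParseCIDR(addr)
	if err != nil {
		// Some attachments may carry a bare IP without a prefix length.
		if parsed := net.ParseIP(strings.TrimSpace(addr)); parsed != nil {
			return parsed.String(), nil
		}
		return "", fmt.Errorf("invalid attachment address %q: %w", addr, err)
	}
	return ip.String(), nil
}

func main() {
	ip, err := taskIP("10.0.13.5/24")
	fmt.Println(ip, err)
}
```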

**Update `discoverRealCHORUSAgents()` to call the new method**:

```go
func (d *Discovery) discoverRealCHORUSAgents() {
	log.Debug().Msg("🔍 Discovering real CHORUS agents via health endpoints")

	// Priority 1: Docker Swarm API (most accurate)
	d.discoverDockerSwarmAgents()

	// Priority 2: Known service endpoints (fallback)
	d.queryActualCHORUSService()
	d.discoverKnownEndpoints()
}
```

#### Step 2: Dependency Update (5 minutes)

**File**: `/home/tony/chorus/project-queues/active/WHOOSH/go.mod`

```bash
cd /home/tony/chorus/project-queues/active/WHOOSH
go get github.com/docker/docker@latest
go mod tidy
```

#### Step 3: Configuration Changes (10 minutes)

**File**: `/home/tony/chorus/project-queues/active/WHOOSH/docker/docker-compose.swarm.yml`

```yaml
services:
  whoosh:
    image: anthonyrawlins/whoosh:latest

    # NEW: Mount Docker socket for Swarm API access
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro

    # NEW: Environment variables for Docker discovery
    environment:
      - WHOOSH_DISCOVERY_MODE=docker-swarm
      - CHORUS_SERVICE_NAME=CHORUS_chorus # Match your stack name
      - CHORUS_AGENT_PORT=8080

    # IMPORTANT: Security note - the Docker socket is mounted read-only,
    # which allows querying tasks but not modifying them
```

#### Step 4: Build and Deploy (30 minutes)

```bash
# Build new WHOOSH image
cd /home/tony/chorus/project-queues/active/WHOOSH
docker build -t registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery .
docker push registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery

# Update compose file to use new image
# Edit docker/docker-compose.swarm.yml:
#   whoosh:
#     image: registry.home.deepblack.cloud/whoosh:v1.2.0-docker-discovery

# Redeploy to Swarm
docker stack deploy -c docker/docker-compose.swarm.yml WHOOSH
```

#### Step 5: Validation (1 hour)

```bash
# 1. Check WHOOSH logs for discovery
docker service logs -f WHOOSH_whoosh --tail 100

# Expected output:
# {"level":"info","task_count":34,"service":"CHORUS_chorus","message":"🐳 Discovered Docker Swarm tasks"}
# {"level":"info","discovered":34,"total_tasks":34,"message":"✅ Docker Swarm agent discovery completed"}

# 2. Check agent registry in database
docker exec -it $(docker ps -q -f name=WHOOSH_whoosh) psql -U whoosh -d whoosh -c \
  "SELECT COUNT(*) FROM agents WHERE status = 'available';"

# Expected: 34 rows

# 3. Trigger council formation
curl -X POST http://whoosh.chorus.services/api/v1/councils \
  -H "Content-Type: application/json" \
  -d '{
    "project_name": "Test Discovery",
    "repository": "https://gitea.chorus.services/tony/test",
    "core_roles": ["tpm", "senior-software-architect"]
  }'

# 4. Verify all agents received broadcast
docker service logs CHORUS_chorus --tail 1000 | grep "Received council opportunity"

# Expected: 34 log entries (one per agent)
```

#### Step 6: Monitoring and Rollback Plan (30 minutes)

**Success Criteria**:
- ✅ All 34 CHORUS agents discovered
- ✅ Council broadcasts reach all agents
- ✅ Council formation completes successfully
- ✅ No performance degradation

**Rollback Procedure** (if issues occur):
```bash
# 1. Remove Docker socket mount from compose file
# 2. Redeploy with previous image
docker stack deploy -c docker/docker-compose.swarm.yml.backup WHOOSH

# 3. Verify rollback
docker service logs WHOOSH_whoosh
```

**Total Implementation Time**: 6-8 hours

---

### 6.2 Option 3: NATS-Only Solution (RECOMMENDED LONG-TERM)

**Timeline**: 3-4 days

#### Day 1: WHOOSH Publisher Implementation

**Task 1.1**: Create NATS publisher module (3 hours)

**New File**: `/home/tony/chorus/project-queues/active/WHOOSH/internal/nats/publisher.go`

```go
package nats

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/chorus-services/whoosh/internal/p2p"
	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog/log"
)

type CouncilPublisher struct {
	nc *nats.Conn
}

func NewCouncilPublisher(natsURL string) (*CouncilPublisher, error) {
	opts := []nats.Option{
		nats.Name("WHOOSH Council Publisher"),
		nats.MaxReconnects(-1), // Infinite reconnects
		nats.ReconnectWait(1 * time.Second),
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			log.Warn().Err(err).Msg("NATS disconnected")
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			log.Info().Str("url", nc.ConnectedUrl()).Msg("NATS reconnected")
		}),
	}

	nc, err := nats.Connect(natsURL, opts...)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to NATS at %s: %w", natsURL, err)
	}

	log.Info().
		Str("nats_url", natsURL).
		Str("server_url", nc.ConnectedUrl()).
		Msg("✅ Connected to NATS for council publishing")

	return &CouncilPublisher{nc: nc}, nil
}

func (p *CouncilPublisher) PublishCouncilOpportunity(ctx context.Context, opp *p2p.CouncilOpportunity) error {
	// Marshal opportunity to JSON
	data, err := json.Marshal(opp)
	if err != nil {
		return fmt.Errorf("failed to marshal council opportunity: %w", err)
	}

	// Construct subject (NATS topic)
	subject := "chorus.councils.forming"

	// Create message with headers for tracing
	msg := &nats.Msg{
		Subject: subject,
		Data:    data,
		Header: nats.Header{
			"Council-ID":   []string{opp.CouncilID.String()},
			"Project":      []string{opp.ProjectName},
			"Repository":   []string{opp.Repository},
			"UCXL-Address": []string{opp.UCXLAddress},
			"Published-At": []string{time.Now().Format(time.RFC3339)},
		},
	}

	// Publish to NATS
	if err := p.nc.PublishMsg(msg); err != nil {
		return fmt.Errorf("failed to publish to NATS subject %s: %w", subject, err)
	}

	// Flush to ensure delivery
	if err := p.nc.FlushTimeout(5 * time.Second); err != nil {
		log.Warn().Err(err).Msg("NATS flush timeout (message likely delivered)")
	}

	log.Info().
		Str("council_id", opp.CouncilID.String()).
		Str("project", opp.ProjectName).
		Str("subject", subject).
		Int("core_roles", len(opp.CoreRoles)).
		Int("optional_roles", len(opp.OptionalRoles)).
		Msg("📡 Published council opportunity to NATS")

	return nil
}

func (p *CouncilPublisher) PublishCouncilStatus(ctx context.Context, update *p2p.CouncilStatusUpdate) error {
	data, err := json.Marshal(update)
	if err != nil {
		return fmt.Errorf("failed to marshal council status: %w", err)
	}

	subject := fmt.Sprintf("chorus.councils.%s.status", update.CouncilID.String())

	msg := &nats.Msg{
		Subject: subject,
		Data:    data,
		Header: nats.Header{
			"Council-ID": []string{update.CouncilID.String()},
			"Status":     []string{update.Status},
		},
	}

	if err := p.nc.PublishMsg(msg); err != nil {
		return fmt.Errorf("failed to publish council status: %w", err)
	}

	log.Info().
		Str("council_id", update.CouncilID.String()).
		Str("status", update.Status).
		Str("subject", subject).
		Msg("📢 Published council status update to NATS")

	return nil
}

func (p *CouncilPublisher) Close() error {
	if p.nc != nil && !p.nc.IsClosed() {
		p.nc.Close()
		log.Info().Msg("Closed NATS connection")
	}
	return nil
}

func (p *CouncilPublisher) Health() error {
	if p.nc == nil {
		return fmt.Errorf("NATS connection is nil")
	}
	if p.nc.IsClosed() {
		return fmt.Errorf("NATS connection is closed")
	}
	if !p.nc.IsConnected() {
		return fmt.Errorf("NATS connection is not connected")
	}
	return nil
}
```

**Task 1.2**: Integrate publisher into WHOOSH server (2 hours)

**File**: `/home/tony/chorus/project-queues/active/WHOOSH/internal/server/server.go`

```go
import (
	natspub "github.com/chorus-services/whoosh/internal/nats"
)

type Server struct {
	// ... existing fields
	natsPublisher *natspub.CouncilPublisher // NEW
}

func NewServer(cfg *config.Config, db *pgxpool.Pool) *Server {
	// ... existing setup

	// NEW: Initialize NATS publisher
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = "nats://backbeat-nats:4222" // Default to BACKBEAT NATS
	}

	natsPublisher, err := natspub.NewCouncilPublisher(natsURL)
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to initialize NATS publisher")
	}

	return &Server{
		// ... existing fields
		natsPublisher: natsPublisher,
	}
}

func (s *Server) Stop() error {
	// ... existing shutdown

	// NEW: Close NATS connection
	if s.natsPublisher != nil {
		s.natsPublisher.Close()
	}

	return nil
}
```

**Task 1.3**: Update broadcaster to use NATS (2 hours)

**File**: `/home/tony/chorus/project-queues/active/WHOOSH/internal/p2p/broadcaster.go`

```go
// Add field to Broadcaster struct
type Broadcaster struct {
	discovery     *Discovery
	natsPublisher *nats.CouncilPublisher // NEW
	ctx           context.Context
	cancel        context.CancelFunc
}

// Update BroadcastCouncilOpportunity to use NATS
func (b *Broadcaster) BroadcastCouncilOpportunity(ctx context.Context, opportunity *CouncilOpportunity) error {
	log.Info().
		Str("council_id", opportunity.CouncilID.String()).
		Str("project_name", opportunity.ProjectName).
		Msg("📡 Broadcasting council opportunity via NATS")

	// NEW: Use NATS pub/sub instead of HTTP
	if b.natsPublisher != nil {
		err := b.natsPublisher.PublishCouncilOpportunity(ctx, opportunity)
		if err != nil {
			log.Error().Err(err).Msg("Failed to publish via NATS, falling back to HTTP")
			// Fallback to HTTP (during transition period)
			return b.broadcastViaHTTP(ctx, opportunity)
		}

		log.Info().
			Str("council_id", opportunity.CouncilID.String()).
			Msg("✅ Council opportunity published to NATS")

		return nil
	}

	// Fallback: Use existing HTTP broadcast
	return b.broadcastViaHTTP(ctx, opportunity)
}

// Keep existing HTTP method as fallback
func (b *Broadcaster) broadcastViaHTTP(ctx context.Context, opportunity *CouncilOpportunity) error {
	// ... existing HTTP broadcast logic
}
```

#### Day 2: CHORUS Subscriber Implementation

**Task 2.1**: Create NATS subscriber module (3 hours)

**New File**: `/home/tony/chorus/project-queues/active/CHORUS/internal/nats/subscriber.go`

```go
package nats

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"chorus/internal/council"
	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog/log"
)

type CouncilSubscriber struct {
	nc         *nats.Conn
	subs       []*nats.Subscription
	councilMgr *council.Manager
	ctx        context.Context
	cancel     context.CancelFunc
}

func NewCouncilSubscriber(natsURL string, councilMgr *council.Manager) (*CouncilSubscriber, error) {
	ctx, cancel := context.WithCancel(context.Background())

	opts := []nats.Option{
		nats.Name(fmt.Sprintf("CHORUS Agent %s", councilMgr.AgentID)),
		nats.MaxReconnects(-1),
		nats.ReconnectWait(1 * time.Second),
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			log.Warn().Err(err).Msg("NATS disconnected")
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			log.Info().Msg("NATS reconnected")
		}),
	}

	nc, err := nats.Connect(natsURL, opts...)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("failed to connect to NATS: %w", err)
	}

	s := &CouncilSubscriber{
		nc:         nc,
		subs:       make([]*nats.Subscription, 0),
		councilMgr: councilMgr,
		ctx:        ctx,
		cancel:     cancel,
	}

	// Subscribe to council opportunities
	if err := s.subscribeToOpportunities(); err != nil {
		nc.Close()
		cancel()
		return nil, fmt.Errorf("failed to subscribe to opportunities: %w", err)
	}

	// Subscribe to council status updates
	if err := s.subscribeToStatusUpdates(); err != nil {
		log.Warn().Err(err).Msg("Failed to subscribe to status updates (non-critical)")
	}

	log.Info().
		Str("nats_url", natsURL).
		Str("agent_id", councilMgr.AgentID).
		Msg("✅ Subscribed to NATS council topics")

	return s, nil
}

func (s *CouncilSubscriber) subscribeToOpportunities() error {
	subject := "chorus.councils.forming"

	sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) {
		s.handleCouncilOpportunity(msg)
	})
	if err != nil {
		return fmt.Errorf("failed to subscribe to %s: %w", subject, err)
	}

	s.subs = append(s.subs, sub)

	log.Info().
		Str("subject", subject).
		Msg("📬 Subscribed to council opportunities")

	return nil
}

func (s *CouncilSubscriber) subscribeToStatusUpdates() error {
	subject := "chorus.councils.*.status" // Wildcard subscription

	sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) {
		s.handleCouncilStatus(msg)
	})
	if err != nil {
		return fmt.Errorf("failed to subscribe to %s: %w", subject, err)
	}

	s.subs = append(s.subs, sub)

	log.Info().
		Str("subject", subject).
		Msg("📬 Subscribed to council status updates")

	return nil
}

func (s *CouncilSubscriber) handleCouncilOpportunity(msg *nats.Msg) {
	// Parse council opportunity from message
	var opp council.Opportunity
	if err := json.Unmarshal(msg.Data, &opp); err != nil {
		log.Error().
			Err(err).
			Str("subject", msg.Subject).
			Msg("Failed to unmarshal council opportunity")
		return
	}

	// Extract headers for logging
	councilID := msg.Header.Get("Council-ID")
	projectName := msg.Header.Get("Project")

	log.Info().
		Str("council_id", councilID).
		Str("project", projectName).
		Str("subject", msg.Subject).
		Int("core_roles", len(opp.CoreRoles)).
		Msg("📥 Received council opportunity from NATS")

	// Delegate to council manager (existing logic)
	ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
	defer cancel()

	if err := s.councilMgr.HandleOpportunity(ctx, &opp); err != nil {
		log.Error().
			Err(err).
			Str("council_id", councilID).
			Msg("Failed to handle council opportunity")
		return
	}

	log.Debug().
		Str("council_id", councilID).
		Msg("Successfully processed council opportunity")
}

func (s *CouncilSubscriber) handleCouncilStatus(msg *nats.Msg) {
	var status council.StatusUpdate
	if err := json.Unmarshal(msg.Data, &status); err != nil {
		log.Error().Err(err).Msg("Failed to unmarshal council status")
		return
	}

	log.Info().
		Str("council_id", status.CouncilID.String()).
		Str("status", status.Status).
		Msg("📥 Received council status update from NATS")

	// Delegate to council manager
	if err := s.councilMgr.HandleStatusUpdate(s.ctx, &status); err != nil {
		log.Error().
			Err(err).
			Str("council_id", status.CouncilID.String()).
			Msg("Failed to handle council status update")
	}
}

func (s *CouncilSubscriber) Close() error {
	s.cancel()

	// Unsubscribe all subscriptions
	for _, sub := range s.subs {
		if err := sub.Unsubscribe(); err != nil {
			log.Warn().Err(err).Msg("Failed to unsubscribe from NATS")
		}
	}

	// Close NATS connection
	if s.nc != nil && !s.nc.IsClosed() {
		s.nc.Close()
		log.Info().Msg("Closed NATS connection")
	}

	return nil
}

func (s *CouncilSubscriber) Health() error {
	if s.nc == nil {
		return fmt.Errorf("NATS connection is nil")
	}
	if s.nc.IsClosed() {
		return fmt.Errorf("NATS connection is closed")
	}
	if !s.nc.IsConnected() {
		return fmt.Errorf("NATS not connected")
	}
	return nil
}
```
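The wildcard subscription `chorus.councils.*.status` relies on NATS subject semantics: `*` matches exactly one dot-separated token. A tiny helper, sketched below as an illustration rather than existing CHORUS code, makes the per-council subject construction explicit on both the publisher and subscriber sides.

```go
package main

import "fmt"

// statusSubject builds the per-council status subject that the
// "chorus.councils.*.status" wildcard subscription will match.
// Because '*' matches a single token, the council ID must not
// itself contain '.' characters.
func statusSubject(councilID string) string {
	return fmt.Sprintf("chorus.councils.%s.status", councilID)
}

func main() {
	fmt.Println(statusSubject("3f2a9c1e"))
}
```

Centralizing the subject format in one function prevents the publisher and subscriber from drifting apart as the subject hierarchy grows.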

**Task 2.2**: Integrate subscriber into CHORUS main (2 hours)

**File**: `/home/tony/chorus/project-queues/active/CHORUS/cmd/chorus-agent/main.go`

```go
import (
	natssub "chorus/internal/nats"
)

func main() {
	// ... existing setup

	// Initialize council manager
	councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities)

	// NEW: Subscribe to NATS instead of HTTP endpoint
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = "nats://backbeat-nats:4222"
	}

	natsSubscriber, err := natssub.NewCouncilSubscriber(natsURL, councilMgr)
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to initialize NATS subscriber")
	}
	defer natsSubscriber.Close()

	log.Info().
		Str("agent_id", agentID).
		Str("nats_url", natsURL).
		Msg("🎧 CHORUS agent listening for council opportunities via NATS")

	// HTTP server still runs for health checks
	httpServer := api.NewHTTPServer(cfg, node, hlog, ps)
	go func() {
		if err := httpServer.Start(); err != nil && err != http.ErrServerClosed {
			log.Error().Err(err).Msg("HTTP server error")
		}
	}()

	// ... rest of main
}
```

#### Day 3-4: Testing, Deployment, and Validation

**Task 3.1**: Local testing with Docker Compose (4 hours)

**Create test environment**: `/home/tony/chorus/project-queues/active/WHOOSH/docker-compose.nats-test.yml`

```yaml
version: '3.8'

services:
  nats:
    image: nats:2.9-alpine
    command: ["--jetstream", "--debug"]
    ports:
      - "4222:4222"
      - "8222:8222" # Monitoring port
    networks:
      - test-net

  whoosh:
    build: .
    environment:
      - NATS_URL=nats://nats:4222
      - CHORUS_SERVICE_NAME=test_chorus
    depends_on:
      - nats
    networks:
      - test-net

  chorus-agent-1:
    build: ../CHORUS
    environment:
      - NATS_URL=nats://nats:4222
      - CHORUS_AGENT_ID=test-agent-1
    depends_on:
      - nats
    networks:
      - test-net

  chorus-agent-2:
    build: ../CHORUS
    environment:
      - NATS_URL=nats://nats:4222
      - CHORUS_AGENT_ID=test-agent-2
    depends_on:
      - nats
    networks:
      - test-net

networks:
  test-net:
    driver: bridge
```

**Run tests**:
```bash
cd /home/tony/chorus/project-queues/active/WHOOSH
docker-compose -f docker-compose.nats-test.yml up

# In another terminal, trigger council formation
curl -X POST http://localhost:8080/api/v1/councils \
  -H "Content-Type: application/json" \
  -d '{
    "project_name": "NATS Test",
    "repository": "https://gitea.chorus.services/tony/test",
    "core_roles": ["tpm", "developer"]
  }'

# Verify both agents received the message
docker-compose -f docker-compose.nats-test.yml logs chorus-agent-1 | grep "Received council opportunity"
docker-compose -f docker-compose.nats-test.yml logs chorus-agent-2 | grep "Received council opportunity"
```

**Task 3.2**: Production deployment (4 hours)

```bash
# Build and push WHOOSH
cd /home/tony/chorus/project-queues/active/WHOOSH
docker build -t registry.home.deepblack.cloud/whoosh:v1.3.0-nats .
docker push registry.home.deepblack.cloud/whoosh:v1.3.0-nats

# Build and push CHORUS
cd /home/tony/chorus/project-queues/active/CHORUS
make build-agent
docker build -f Dockerfile.simple -t registry.home.deepblack.cloud/chorus:v0.6.0-nats .
docker push registry.home.deepblack.cloud/chorus:v0.6.0-nats

# Update docker-compose files
# Update WHOOSH_BACKBEAT-prototype/docker-compose.swarm.yml:
#   whoosh:
#     image: registry.home.deepblack.cloud/whoosh:v1.3.0-nats
#     environment:
#       - NATS_URL=nats://backbeat-nats:4222

# Update CHORUS/docker/docker-compose.yml:
#   chorus:
#     image: registry.home.deepblack.cloud/chorus:v0.6.0-nats
#     environment:
#       - NATS_URL=nats://backbeat-nats:4222

# Deploy to Swarm
docker stack deploy -c WHOOSH_BACKBEAT-prototype/docker-compose.swarm.yml WHOOSH
docker stack deploy -c CHORUS/docker/docker-compose.yml CHORUS
```

**Task 3.3**: Validation and monitoring (2 hours)

```bash
# Monitor NATS connections
docker service logs WHOOSH_backbeat-nats --tail 100

# Monitor WHOOSH publishing
docker service logs WHOOSH_whoosh | grep "Published council opportunity"

# Monitor CHORUS subscriptions
docker service logs CHORUS_chorus | grep "Received council opportunity"

# Verify council formation works end-to-end
# (trigger test council via UI or API)
```

**Total Implementation Time**: 3-4 days

---

## 7. Timeline Estimates

### 7.1 Gantt Chart - Option 4: Hybrid Approach (RECOMMENDED)

```
PHASE 1: Docker API Quick Fix (Week 1)
=========================================

Day 1 (Friday)
├─ 09:00-13:00 Implementation
│  ├─ 09:00-11:00 Code changes (discovery.go)
│  ├─ 11:00-11:30 Dependency updates (go.mod)
│  ├─ 11:30-12:00 Docker compose changes
│  └─ 12:00-13:00 Build and push images
│
└─ 14:00-17:00 Testing and Deployment
   ├─ 14:00-15:00 Local testing
   ├─ 15:00-16:00 Swarm deployment
   ├─ 16:00-17:00 Validation and monitoring
   └─ 17:00 ✅ Phase 1 Complete

PHASE 1 VALIDATION (Week 2)
===========================

Monday-Friday
├─ Daily monitoring
├─ Performance metrics collection
├─ Bug fixes (if needed)
└─ Friday: Phase 1 sign-off meeting

PHASE 2: NATS Migration (Week 3)
=================================

Monday
├─ 09:00-12:00 WHOOSH publisher implementation
└─ 13:00-17:00 WHOOSH integration and testing

Tuesday
├─ 09:00-12:00 CHORUS subscriber implementation
└─ 13:00-17:00 CHORUS integration and testing

Wednesday
├─ 09:00-12:00 Local integration testing
├─ 13:00-15:00 Build and push images
└─ 15:00-17:00 Staging deployment

Thursday
├─ 09:00-11:00 Staging validation
├─ 11:00-13:00 Production deployment
└─ 14:00-17:00 Production monitoring

Friday
├─ 09:00-12:00 Performance validation
├─ 13:00-15:00 Remove HTTP fallback code
├─ 15:00-16:00 Documentation
└─ 16:00 ✅ Phase 2 Complete

Total: 2 weeks + 3 days
```

### 7.2 Resource Allocation

| Role | Phase 1 (Days) | Phase 2 (Days) | Total |
|------|---------------|----------------|-------|
| Senior Software Architect | 0.5 | 1.0 | 1.5 |
| Backend Developer | 1.0 | 3.0 | 4.0 |
| DevOps Engineer | 0.5 | 1.0 | 1.5 |
| QA Engineer | 0.5 | 1.0 | 1.5 |
| **Total Person-Days** | **2.5** | **6.0** | **8.5** |

### 7.3 Critical Path Analysis

**Phase 1 Critical Path**:
1. Code implementation (4 hours) - BLOCKER
2. Docker socket mount (10 min) - BLOCKER
3. Build/push (30 min) - BLOCKER
4. Deployment (30 min) - BLOCKER
5. Validation (1 hour) - BLOCKER

**Total Critical Path**: 6 hours (can be completed in 1 day)

**Phase 2 Critical Path**:
1. WHOOSH publisher (Day 1) - BLOCKER
2. CHORUS subscriber (Day 2) - BLOCKER for Day 3
3. Integration testing (Day 3 AM) - BLOCKER for deployment
4. Production deployment (Day 3 PM) - BLOCKER
5. Validation (Day 4) - BLOCKER for sign-off

**Total Critical Path**: 3.5 days

---

## 8. Final Recommendation

### 8.1 Executive Summary

**Implement Option 4: Hybrid Approach**

**Phase 1** (Next 48 hours): Deploy Docker API Quick Fix
**Phase 2** (Week 3): Migrate to NATS-Only Solution
**Defer**: Full libp2p migration until P2P discovery is truly needed

### 8.2 Rationale

#### Why NOT Full libp2p Migration Now?

1. **Complexity vs. Value**: libp2p requires 650+ LOC of changes, 7-11 days of implementation, and a steep learning curve. Current requirements (broadcast to all agents) don't justify this complexity.

2. **Infrastructure Already Exists**: NATS is already deployed (backbeat-nats service) and proven in production. Using existing infrastructure reduces risk and operational overhead.

3. **No P2P Discovery Needed Yet**: The current architecture has a centralized WHOOSH coordinator. True peer-to-peer discovery (mDNS/DHT) isn't required until agents need to discover each other without WHOOSH.

4. **Migration Path Preserved**: NATS pub/sub can later be replaced with libp2p gossipsub with minimal changes (both use topic-based messaging).

#### Why Docker API First?

1. **Immediate Unblock**: Solves the critical discovery problem in 1 day, unblocking E2E workflow testing.

2. **Minimal Risk**: An 87 LOC change, easy rollback, no architectural changes.

3. **Validation Period**: Gives 1 week to validate that council formation works before starting Phase 2.

4. **Dual-Mode Safety**: Allows testing the NATS migration while keeping Docker API as a fallback.

#### Why NATS Long-Term?

1. **Right Complexity Level**: Solves 90% of use cases (broadcast, persistence, reliability) with 20% of libp2p's complexity.

2. **Production-Ready**: NATS is battle-tested, well-documented, and the team has experience with it (BACKBEAT SDK).

3. **Performance**: Sub-millisecond message delivery; scales to millions of messages/sec.

4. **Operational Simplicity**: Centralized monitoring, simple debugging, clear failure modes.

5. **Future-Proof**: When/if P2P discovery is needed, NATS can remain for messaging while adding libp2p for discovery only.

### 8.3 Implementation Sequence

```
┌─────────────────────────────────────────────────────────┐
│ WEEK 1: Emergency Unblock                               │
├─────────────────────────────────────────────────────────┤
│ Friday: Docker API implementation (6 hours)             │
│ Result: 100% agent discovery, council formation works   │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ WEEK 2: Validation                                      │
├─────────────────────────────────────────────────────────┤
│ Mon-Fri: Monitor production, collect metrics            │
│ Result: Confidence in Docker API solution               │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ WEEK 3: NATS Migration                                  │
├─────────────────────────────────────────────────────────┤
│ Mon: WHOOSH publisher implementation                    │
│ Tue: CHORUS subscriber implementation                   │
│ Wed AM: Integration testing                             │
│ Wed PM: Production deployment (dual-mode)               │
│ Thu-Fri: Validation, remove Docker API fallback         │
│ Result: Production NATS-based pub/sub messaging         │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ FUTURE: libp2p (when needed)                            │
├─────────────────────────────────────────────────────────┤
│ Trigger: Need for P2P discovery, DHT storage, or        │
│          cross-datacenter mesh                          │
│ Approach: Add libp2p for discovery, keep NATS for       │
│           messaging (hybrid architecture)               │
└─────────────────────────────────────────────────────────┘
```

### 8.4 Success Metrics

**Phase 1 Success Criteria**:
- ✅ All 34 CHORUS agents discovered by WHOOSH
- ✅ Council broadcasts reach 100% of agents
- ✅ Council formation completes in <60 seconds
- ✅ Task execution begins successfully
- ✅ Zero production incidents

**Phase 2 Success Criteria**:
- ✅ NATS message delivery rate >99.9%
- ✅ Message latency <100ms (p95)
- ✅ Zero message loss
- ✅ Successful rollback test
- ✅ Zero production incidents during migration

### 8.5 Risk Mitigation Summary

| Risk | Mitigation |
|------|------------|
| Phase 1 deployment failure | Easy rollback (remove docker.sock mount) |
| Docker API performance issues | Monitor task list query latency, cache results |
| Phase 2 NATS connection failures | Keep HTTP fallback during transition, NATS clustering |
| Message delivery failures | JetStream persistence, at-least-once delivery guarantees |
| Coordinated deployment issues | Dual-mode operation (HTTP + NATS) during transition |
| Team knowledge gaps | NATS already used in BACKBEAT, extensive documentation |

### 8.6 Final Architecture Vision

```
┌────────────────────────────────────────────────────┐
│                FINAL ARCHITECTURE                  │
│                 (End of Week 3)                    │
└────────────────────────────────────────────────────┘

┌─────────────┐
│   WHOOSH    │  - NATS publisher
│             │  - Council orchestrator
└──────┬──────┘  - Docker API for health monitoring
       │
       │  Publish to NATS subject
       │  "chorus.councils.forming"
       ▼
┌──────────────────────────────────────────────────┐
│            NATS Server (JetStream)               │
│  - Already deployed as backbeat-nats             │
│  - Clustered for HA (future)                     │
│  - Message persistence enabled                   │
│  - Monitoring via NATS CLI / Prometheus          │
└──────┬──────┬──────┬──────┬──────┬───────────────┘
       │      │      │      │      │
   ┌───▼┐  ┌──▼─┐ ┌──▼─┐ ┌─▼──┐   (... 34 total)
   │CH1 │  │CH2 │ │CH3 │ │CH4 │   CHORUS agents
   │SUB │  │SUB │ │SUB │ │SUB │   - NATS subscribers
   └────┘  └────┘ └────┘ └────┘   - HTTP for health checks
                                  - libp2p for P2P comms (future)

Benefits:
✅ 100% agent discovery
✅ Sub-100ms message delivery
✅ At-least-once delivery guarantees
✅ Message persistence and replay
✅ Scalable to 1000+ agents
✅ Familiar technology stack
✅ Simple operational model
✅ Clear upgrade path to libp2p
```

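The publish path at the top of this diagram reduces to a small message envelope plus a well-known subject. The sketch below illustrates the shape of that payload; `CouncilOpportunity` and its field names are illustrative stand-ins, not the actual WHOOSH types, and the real publisher would hand the encoded bytes to `nats.Conn.Publish`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SubjectCouncilForming is the NATS subject WHOOSH publishes
// council-formation opportunities on.
const SubjectCouncilForming = "chorus.councils.forming"

// CouncilOpportunity is an illustrative envelope; the real WHOOSH
// message likely carries more fields (roles, repo, deadlines, ...).
type CouncilOpportunity struct {
	CouncilID string   `json:"council_id"`
	Roles     []string `json:"roles"`
	IssuedAt  int64    `json:"issued_at"`
}

// EncodeOpportunity renders the bytes a publisher would send with
// nc.Publish(SubjectCouncilForming, data).
func EncodeOpportunity(op CouncilOpportunity) ([]byte, error) {
	return json.Marshal(op)
}

func main() {
	data, err := EncodeOpportunity(CouncilOpportunity{
		CouncilID: "council-42",
		Roles:     []string{"architect", "reviewer"},
		IssuedAt:  1728518400,
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("PUB %s %s\n", SubjectCouncilForming, data)
}
```

Keeping the envelope as plain JSON means the same payload works over the existing HTTP fallback during the dual-mode transition.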
|
### 8.7 Action Items

**Immediate (Next 24 hours)**:
1. [ ] Approve Hybrid Approach recommendation
2. [ ] Schedule Phase 1 implementation (Friday)
3. [ ] Assign developer resources
4. [ ] Prepare rollback procedures
5. [ ] Set up monitoring dashboards

**Week 1**:
1. [ ] Implement Docker API discovery
2. [ ] Deploy to production Swarm
3. [ ] Validate 100% agent discovery
4. [ ] Verify council formation E2E
5. [ ] Document Phase 1 results

**Week 2**:
1. [ ] Monitor production stability
2. [ ] Collect performance metrics
3. [ ] Plan Phase 2 NATS migration
4. [ ] Review NATS clustering options
5. [ ] Schedule Phase 2 deployment window

**Week 3**:
1. [ ] Implement NATS publisher/subscriber
2. [ ] Integration testing
3. [ ] Production deployment (dual-mode)
4. [ ] Validate NATS messaging
5. [ ] Remove Docker API fallback
6. [ ] Document final architecture

---

## 9. Appendix

### 9.1 ASCII Architecture Diagram - Current vs. Future

**CURRENT (Broken)**:
```
┌─────────┐                       ┌─────────────────────┐
│ WHOOSH  │────HTTP GET──────────→│  Docker Swarm VIP   │
│         │   (chorus:8081)       │     10.0.13.26      │
└─────────┘                       └──────────┬──────────┘
                                             │
                                  Round-robin to random
                                             ↓
                                 ┌───────────┴───────────┐
                                 ▼                       ▼
                            ┌─────────┐            ┌─────────┐
                            │CHORUS #1│            │CHORUS #2│
                            │  FOUND  │            │  FOUND  │
                            └─────────┘            └─────────┘

❌ 32 other agents invisible!
```

**PHASE 1 (Docker API)**:
```
┌─────────┐                       ┌─────────────────────┐
│ WHOOSH  │────Docker API────────→│    Swarm Manager    │
│ + sock  │    ListTasks()        │ /var/run/docker.sock│
└─────────┘                       └──────────┬──────────┘
                                             │
                                   Returns ALL 34 tasks
                                         with IPs
                                             ↓
                     ┌───────────────────────┴────────────┐
                     ▼                 ▼                  ▼
                ┌─────────┐       ┌─────────┐       ┌──────────┐
                │CHORUS #1│       │CHORUS #2│  ...  │CHORUS #34│
                │10.0.13.1│       │10.0.13.2│       │10.0.13.34│
                └─────────┘       └─────────┘       └──────────┘

✅ ALL agents discovered via direct IP!
```

**PHASE 2 (NATS)**:
```
┌─────────┐                       ┌─────────────────────┐
│ WHOOSH  │────PUBLISH───────────→│     NATS Server     │
│ + NATS  │   subject:            │     (JetStream)     │
└─────────┘   chorus.councils.*   └──────────┬──────────┘
                                             │
                                       Broadcast to
                                     all subscribers
                                             │
                     ┌───────────────────────┴────────────┐
                     ▼                 ▼                  ▼
                ┌─────────┐       ┌─────────┐       ┌──────────┐
                │CHORUS #1│       │CHORUS #2│  ...  │CHORUS #34│
                │SUB:     │       │SUB:     │       │SUB:      │
                │forming  │       │forming  │       │forming   │
                └─────────┘       └─────────┘       └──────────┘

✅ Pub/sub messaging, all receive simultaneously!
```

### 9.2 Code File Changes Summary

**Phase 1: Docker API**

| File | Action | LOC | Description |
|------|--------|-----|-------------|
| `WHOOSH/internal/p2p/discovery.go` | MODIFY | +80 | Add discoverDockerSwarmAgents() |
| `WHOOSH/go.mod` | MODIFY | +2 | Add docker/docker dependency |
| `WHOOSH/docker-compose.swarm.yml` | MODIFY | +5 | Mount docker.sock |
| **Total** | | **87** | |

**Phase 2: NATS**

| File | Action | LOC | Description |
|------|--------|-----|-------------|
| `WHOOSH/internal/nats/publisher.go` | CREATE | +150 | NATS publisher implementation |
| `WHOOSH/internal/server/server.go` | MODIFY | +20 | Integrate NATS publisher |
| `WHOOSH/internal/p2p/broadcaster.go` | MODIFY | +30 | Use NATS for broadcasting |
| `WHOOSH/go.mod` | MODIFY | +2 | Add nats-io/nats.go |
| `CHORUS/internal/nats/subscriber.go` | CREATE | +180 | NATS subscriber implementation |
| `CHORUS/cmd/chorus-agent/main.go` | MODIFY | +30 | Initialize NATS subscriber |
| `CHORUS/go.mod` | MODIFY | +2 | Add nats-io/nats.go |
| **Total** | | **414** | |

### 9.3 Environment Variables Reference

**WHOOSH**:
```bash
# Phase 1
WHOOSH_DISCOVERY_MODE=docker-swarm
CHORUS_SERVICE_NAME=CHORUS_chorus
CHORUS_AGENT_PORT=8080

# Phase 2
NATS_URL=nats://backbeat-nats:4222
```

**CHORUS**:
```bash
# Phase 2
NATS_URL=nats://backbeat-nats:4222
CHORUS_AGENT_ID=auto-generated-uuid
CHORUS_AGENT_NAME=CHORUS-Agent-{slot}
```
### 9.4 Monitoring and Observability

**Phase 1 Metrics**:
```prometheus
# WHOOSH discovery metrics
whoosh_agent_discovery_total{method="docker_api"} 34
whoosh_agent_discovery_duration_seconds{method="docker_api"} 0.125
whoosh_broadcast_success_total{council_id="xxx"} 34
whoosh_broadcast_error_total{council_id="xxx"} 0
```

**Phase 2 Metrics**:
```prometheus
# NATS metrics (built-in)
nats_connections{name="WHOOSH"} 1
nats_subscriptions{subject="chorus.councils.forming"} 34
nats_messages_in{subject="chorus.councils.forming"} 156
nats_messages_out{subject="chorus.councils.forming"} 156
nats_message_latency_seconds{quantile="0.95"} 0.045

# Application metrics
whoosh_nats_publish_total{subject="chorus.councils.forming"} 156
whoosh_nats_publish_errors_total 0
chorus_nats_receive_total{subject="chorus.councils.forming"} 34
chorus_council_opportunity_handled_total 34
```
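A production service would emit these counters with prometheus/client_golang; as a dependency-free illustration of the exposition format above, the sketch below renders one counter by hand (the variable name `discoveryTotal` is illustrative).

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// discoveryTotal backs whoosh_agent_discovery_total{method="docker_api"}.
var discoveryTotal atomic.Int64

// RenderMetrics emits the counter in Prometheus text exposition format,
// matching the sample output above. Real code should use
// prometheus/client_golang rather than hand-rolling this.
func RenderMetrics() string {
	var b strings.Builder
	fmt.Fprintf(&b, "whoosh_agent_discovery_total{method=%q} %d\n",
		"docker_api", discoveryTotal.Load())
	return b.String()
}

func main() {
	discoveryTotal.Add(34) // 34 agents discovered this cycle
	fmt.Print(RenderMetrics())
}
```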
---

## Conclusion

The Hybrid Approach (Docker API → NATS) provides the optimal balance of:

- **Immediate value** (1-day unblock)
- **Low risk** (staged rollout)
- **Long-term quality** (production-grade pub/sub)
- **Pragmatic complexity** (simpler than libp2p, sufficient for needs)

This architecture will support 100+ agents and sub-second message delivery, with a future migration to full libp2p if and when P2P discovery becomes a requirement.

**Recommended Decision**: Approve the Hybrid Approach and begin Phase 1 implementation immediately.

---

**Document Version**: 1.0

**Last Updated**: 2025-10-10

**Review Date**: 2025-10-17 (after Phase 1 completion)