Files
WHOOSH/internal/p2p/swarm_discovery.go
Claude Code 2826b28645 Phase 1: Implement Docker Swarm API agent discovery
Replaces DNS-based discovery (2/34 agents) with Docker API enumeration
to discover ALL running CHORUS containers.

Implementation:
- NEW: internal/p2p/swarm_discovery.go (261 lines)
  * Docker API client for Swarm task enumeration
  * Extracts container IPs from network attachments
  * Optional health verification before registration
  * Comprehensive error handling and logging

- MODIFIED: internal/p2p/discovery.go (~50 lines)
  * Integrated Swarm discovery with fallback to DNS
  * New config: DISCOVERY_METHOD (swarm/dns/auto)
  * Tries Swarm first, falls back gracefully
  * Backward compatible with existing DNS discovery

- NEW: IMPLEMENTATION-SUMMARY-Phase1-Swarm-Discovery.md
  * Complete deployment guide
  * Testing checklist
  * Performance metrics
  * Phase 2 roadmap

Expected Results:
- Discovery: 34/34 agents (100% vs previous ~6%)
- Council activation: Both core roles claimed
- Task execution: Unblocked

Security:
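- Read-only Docker socket mount (caveat: `:ro` only makes the socket file itself read-only; it does not limit which Docker API calls can be made through it — use a filtering socket proxy if write access must actually be blocked)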
- No privileged mode required
- Minimal API surface (TaskList + Ping only)

Next: Build image, deploy, verify discovery, activate council

Part of hybrid approach:
- Phase 1: Docker API (this commit) 
- Phase 2: NATS migration (planned Week 3)

Related:
- /home/tony/chorus/docs/DIAGNOSIS-Agent-Discovery-And-P2P-Architecture.md
- /home/tony/chorus/docs/ARCHITECTURE-ANALYSIS-LibP2P-HMMM-Migration.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-10 09:48:16 +11:00

262 lines
7.3 KiB
Go

package p2p
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
"github.com/rs/zerolog/log"
)
// SwarmDiscovery enumerates CHORUS agents by asking the Docker Engine API for
// every running task of a Swarm service, instead of resolving the service name
// through Swarm DNS.
//
// Why not DNS: the service name resolves to a single virtual IP that
// load-balances across replicas, so lookups only ever reach whichever
// containers the balancer happens to pick (only 2 of 34 agents in practice).
// Every Swarm task, on the other hand, carries a network attachment holding the
// real container address, so listing the service's tasks yields all replicas.
type SwarmDiscovery struct {
	// client talks to the Docker Engine API; it is released by Close.
	client *client.Client

	// serviceName is the Swarm service whose tasks are enumerated.
	serviceName string

	// networkName selects which network attachment supplies the container IP
	// (matched against the attached network's name); empty means "first
	// attachment that has an address".
	networkName string

	// agentPort is the port the agent's HTTP API listens on in each container.
	agentPort int
}
// NewSwarmDiscovery creates a Docker Swarm-based discovery client and verifies
// that the Docker API is reachable.
//
// Connection settings start from the standard Docker environment variables
// (client.FromEnv) with API version negotiation enabled. A non-empty
// dockerHost (typically "unix:///var/run/docker.sock" in production) is
// applied after FromEnv and therefore overrides the host from the environment.
//
// serviceName is the Swarm service whose tasks are enumerated, networkName
// selects the network whose container IP is used (empty means the first
// attached network with an address), and agentPort is the agent's HTTP port.
//
// The daemon is pinged with a 5-second timeout. If the ping fails, the client
// is closed and an error is returned. On success the caller owns the returned
// value and must call Close when finished.
func NewSwarmDiscovery(dockerHost, serviceName, networkName string, agentPort int) (*SwarmDiscovery, error) {
	opts := []client.Opt{
		client.FromEnv,
		client.WithAPIVersionNegotiation(),
	}
	if dockerHost != "" {
		opts = append(opts, client.WithHost(dockerHost))
	}

	cli, err := client.NewClientWithOpts(opts...)
	if err != nil {
		return nil, fmt.Errorf("failed to create Docker client: %w", err)
	}

	// Fail fast if the daemon is unreachable (e.g. the socket is not mounted).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := cli.Ping(ctx); err != nil {
		// The caller never receives cli, so release its transport here.
		// The close error is secondary to the ping failure being reported.
		_ = cli.Close()
		return nil, fmt.Errorf("failed to ping Docker API: %w", err)
	}

	log.Info().
		// Log the effective host: dockerHost is empty when the
		// environment defaults are used.
		Str("docker_host", cli.DaemonHost()).
		Str("service_name", serviceName).
		Str("network_name", networkName).
		Int("agent_port", agentPort).
		Msg("✅ Docker Swarm discovery client initialized")

	return &SwarmDiscovery{
		client:      cli,
		serviceName: serviceName,
		networkName: networkName,
		agentPort:   agentPort,
	}, nil
}
// DiscoverAgents queries the Docker Swarm API for every running task of the
// configured service and returns one Agent per task that has a usable
// container IP.
//
// Steps:
//  1. List tasks of sd.serviceName whose desired state is "running".
//  2. Convert each task with taskToAgent. Tasks that are not actually running,
//     or that have no IP, are logged and skipped.
//  3. If verifyHealth is true, keep only agents whose health endpoint answers
//     200 OK.
//
// Health checks run one agent at a time, so a pass with verifyHealth can take
// a while. Bound it with a ctx deadline. If ctx is cancelled or expires during
// the pass, DiscoverAgents returns an error wrapping ctx.Err() instead of a
// partial list, so callers do not mistake an interrupted scan for "no agents
// available".
//
// When the service has no running tasks, an empty non-nil slice and a nil
// error are returned.
func (sd *SwarmDiscovery) DiscoverAgents(ctx context.Context, verifyHealth bool) ([]*Agent, error) {
	log.Debug().
		Str("service_name", sd.serviceName).
		Bool("verify_health", verifyHealth).
		Msg("🔍 Starting Docker Swarm agent discovery")

	// Ask for every task of the service whose desired state is running.
	taskFilters := filters.NewArgs()
	taskFilters.Add("service", sd.serviceName)
	taskFilters.Add("desired-state", "running")

	tasks, err := sd.client.TaskList(ctx, types.TaskListOptions{
		Filters: taskFilters,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to list Docker tasks: %w", err)
	}

	if len(tasks) == 0 {
		log.Warn().
			Str("service_name", sd.serviceName).
			Msg("⚠️ No running tasks found for CHORUS service")
		return []*Agent{}, nil
	}

	log.Debug().
		Int("task_count", len(tasks)).
		Msg("📋 Found Docker Swarm tasks")

	agents := make([]*Agent, 0, len(tasks))
	for _, task := range tasks {
		// Stop as soon as the caller gives up. Without this check, a cancelled
		// ctx makes every health probe fail and the pass "succeeds" with zero
		// agents.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, fmt.Errorf("agent discovery interrupted: %w", ctxErr)
		}

		agent, convErr := sd.taskToAgent(task)
		if convErr != nil {
			log.Warn().
				Err(convErr).
				Str("task_id", task.ID).
				Msg("⚠️ Failed to convert task to agent")
			continue
		}

		if verifyHealth && !sd.verifyAgentHealth(ctx, agent) {
			// A probe that failed because ctx ended is not evidence that the
			// agent is down; report the interruption instead.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return nil, fmt.Errorf("agent discovery interrupted: %w", ctxErr)
			}
			log.Debug().
				Str("agent_id", agent.ID).
				Str("endpoint", agent.Endpoint).
				Msg("⚠️ Agent health check failed, skipping")
			continue
		}

		agents = append(agents, agent)
	}

	log.Info().
		Int("discovered_count", len(agents)).
		Int("total_tasks", len(tasks)).
		Msg("✅ Docker Swarm agent discovery completed")

	return agents, nil
}
// taskToAgent converts a Docker Swarm task into an Agent.
//
// The agent endpoint is "http://<container-ip>:<sd.agentPort>". The IP comes
// from the task's attachment to the configured network (see
// taskContainerIP). An error is returned if the task is not actually in the
// running state, or if no usable IP address is attached.
//
// Agent IDs and names embed the first 12 characters of the task ID. Shorter
// IDs are used whole rather than causing a slice-bounds panic.
func (sd *SwarmDiscovery) taskToAgent(task swarm.Task) (*Agent, error) {
	// The TaskList filter only constrains the desired state. The observed
	// state can lag behind it, e.g. while a replica is still starting.
	if task.Status.State != swarm.TaskStateRunning {
		return nil, fmt.Errorf("task not in running state: %s", task.Status.State)
	}

	containerIP := sd.taskContainerIP(task)
	if containerIP == "" {
		return nil, fmt.Errorf("no IP address found in network attachments for task %s", task.ID)
	}

	endpoint := "http://" + swarmJoinHostPort(containerIP, sd.agentPort)
	shortID := swarmShortTaskID(task.ID)

	agent := &Agent{
		ID:       fmt.Sprintf("chorus-agent-%s", shortID),
		Name:     fmt.Sprintf("CHORUS Agent (Task: %s)", shortID),
		Status:   "online",
		Endpoint: endpoint,
		LastSeen: time.Now(),
		// The P2P port is fixed at 9000 (future use).
		P2PAddr: swarmJoinHostPort(containerIP, 9000),
		// NOTE(review): capabilities and model are hard-coded defaults; they
		// are not read from the task or queried from the agent.
		Capabilities: []string{
			"general_development",
			"task_coordination",
			"ai_integration",
			"code_analysis",
			"autonomous_development",
		},
		Model:     "llama3.1:8b",
		ClusterID: "docker-swarm",
	}

	log.Debug().
		Str("task_id", shortID).
		Str("node_id", task.NodeID).
		Str("container_ip", containerIP).
		Str("endpoint", endpoint).
		Msg("🤖 Converted task to agent")

	return agent, nil
}

// taskContainerIP returns the IP (with the prefix length stripped) of the
// task's attachment to the configured network, or "" if none is found.
//
// If sd.networkName is empty, the first attachment that has a non-empty
// address wins. Otherwise:
//   - an attachment whose network name equals sd.networkName exactly is
//     preferred;
//   - failing that, the first attachment whose name merely contains it is
//     used (presumably to tolerate stack-prefixed names like
//     "<stack>_<network>").
//
// Preferring the exact match keeps the choice stable when several attached
// networks share the configured name as a substring.
func (sd *SwarmDiscovery) taskContainerIP(task swarm.Task) string {
	var fallback string
	for _, attachment := range task.NetworksAttachments {
		name := attachment.Network.Spec.Name
		if sd.networkName != "" && !strings.Contains(name, sd.networkName) {
			continue
		}

		// Addresses are in CIDR form (e.g. "10.0.13.5/24"). Take the first
		// one that yields a non-empty IP.
		ip := ""
		for _, addr := range attachment.Addresses {
			if ip = stripCIDR(addr); ip != "" {
				break
			}
		}
		if ip == "" {
			continue
		}

		if sd.networkName == "" || name == sd.networkName {
			return ip
		}
		if fallback == "" {
			fallback = ip
		}
	}
	return fallback
}

// swarmShortTaskID returns the first 12 characters of a task ID, or the whole
// ID when it is shorter.
func swarmShortTaskID(id string) string {
	const shortLen = 12
	if len(id) <= shortLen {
		return id
	}
	return id[:shortLen]
}

// swarmJoinHostPort formats ip and port as "host:port". IPv6 literals are
// wrapped in brackets ("[fd00::5]:8080"), the same way net.JoinHostPort does,
// so the result is valid inside a URL.
func swarmJoinHostPort(ip string, port int) string {
	if strings.Contains(ip, ":") {
		return fmt.Sprintf("[%s]:%d", ip, port)
	}
	return fmt.Sprintf("%s:%d", ip, port)
}
// swarmHealthCheckClient is shared by every agent health probe, so idle
// connections can be reused instead of building a new client per check.
// http.Client is safe for concurrent use.
var swarmHealthCheckClient = &http.Client{
	Timeout: 5 * time.Second,
}

// verifyAgentHealth reports whether the agent answers 200 OK on any of its
// known health endpoints ("/health", "/api/health", "/api/v1/health"), tried
// in that order.
//
// Each request is bounded by a 5-second client timeout and by ctx. Once ctx
// is done, no further paths are tried and false is returned.
func (sd *SwarmDiscovery) verifyAgentHealth(ctx context.Context, agent *Agent) bool {
	healthPaths := []string{"/health", "/api/health", "/api/v1/health"}
	for _, path := range healthPaths {
		if ctx.Err() != nil {
			return false
		}

		healthURL := agent.Endpoint + path
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, nil)
		if err != nil {
			continue
		}

		resp, err := swarmHealthCheckClient.Do(req)
		if err != nil {
			continue
		}
		// Only the status code matters; the body is closed unread.
		resp.Body.Close()

		if resp.StatusCode == http.StatusOK {
			log.Debug().
				Str("agent_id", agent.ID).
				Str("health_url", healthURL).
				Msg("✅ Agent health check passed")
			return true
		}
	}
	return false
}
// Close releases the underlying Docker API client.
//
// It is safe to call on a nil *SwarmDiscovery, or on one whose client was
// never set. In both cases it does nothing and returns nil, so callers can
// defer Close without first checking whether construction succeeded.
func (sd *SwarmDiscovery) Close() error {
	if sd == nil || sd.client == nil {
		return nil
	}
	return sd.client.Close()
}
// stripCIDR drops everything from the first '/' onward, turning a CIDR
// address such as "10.0.13.5/24" into the bare IP "10.0.13.5". Input without
// a '/' is returned unchanged.
func stripCIDR(cidrIP string) string {
	slash := strings.IndexByte(cidrIP, '/')
	if slash < 0 {
		return cidrIP
	}
	return cidrIP[:slash]
}