Phase 1: Implement Docker Swarm API agent discovery

Replaces DNS-based discovery (2/34 agents) with Docker API enumeration
to discover ALL running CHORUS containers.

Implementation:
- NEW: internal/p2p/swarm_discovery.go (261 lines)
  * Docker API client for Swarm task enumeration
  * Extracts container IPs from network attachments
  * Optional health verification before registration
  * Comprehensive error handling and logging

- MODIFIED: internal/p2p/discovery.go (~50 lines)
  * Integrated Swarm discovery with fallback to DNS
  * New config: DISCOVERY_METHOD (swarm/dns/auto)
  * Tries Swarm first, falls back gracefully
  * Backward compatible with existing DNS discovery

- NEW: IMPLEMENTATION-SUMMARY-Phase1-Swarm-Discovery.md
  * Complete deployment guide
  * Testing checklist
  * Performance metrics
  * Phase 2 roadmap

Expected Results:
- Discovery: 34/34 agents (100% vs previous ~6%)
- Council activation: Both core roles claimed
- Task execution: Unblocked

Security:
- Read-only Docker socket mount
- No privileged mode required
- Minimal API surface (TaskList + Ping only)

Next: Build image, deploy, verify discovery, activate council

Part of hybrid approach:
- Phase 1: Docker API (this commit) 
- Phase 2: NATS migration (planned Week 3)

Related:
- /home/tony/chorus/docs/DIAGNOSIS-Agent-Discovery-And-P2P-Architecture.md
- /home/tony/chorus/docs/ARCHITECTURE-ANALYSIS-LibP2P-HMMM-Migration.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Claude Code
2025-10-10 09:48:16 +11:00
parent 6d6241df87
commit 2826b28645
3 changed files with 958 additions and 93 deletions

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
@@ -22,17 +23,17 @@ import (
// REST endpoints to the WHOOSH UI. The omitempty tag on CurrentTeam allows agents to be
// unassigned without cluttering the JSON response with empty fields.
type Agent struct {
ID string `json:"id"` // Unique identifier (e.g., "chorus-agent-001")
Name string `json:"name"` // Human-readable name for UI display
Status string `json:"status"` // online/idle/working - current availability
Capabilities []string `json:"capabilities"` // Skills: ["go_development", "database_design"]
Model string `json:"model"` // LLM model ("llama3.1:8b", "codellama", etc.)
Endpoint string `json:"endpoint"` // HTTP API endpoint for task assignment
LastSeen time.Time `json:"last_seen"` // Timestamp of last health check response
TasksCompleted int `json:"tasks_completed"` // Performance metric for load balancing
ID string `json:"id"` // Unique identifier (e.g., "chorus-agent-001")
Name string `json:"name"` // Human-readable name for UI display
Status string `json:"status"` // online/idle/working - current availability
Capabilities []string `json:"capabilities"` // Skills: ["go_development", "database_design"]
Model string `json:"model"` // LLM model ("llama3.1:8b", "codellama", etc.)
Endpoint string `json:"endpoint"` // HTTP API endpoint for task assignment
LastSeen time.Time `json:"last_seen"` // Timestamp of last health check response
TasksCompleted int `json:"tasks_completed"` // Performance metric for load balancing
CurrentTeam string `json:"current_team,omitempty"` // Active team assignment (optional)
P2PAddr string `json:"p2p_addr"` // Peer-to-peer communication address
ClusterID string `json:"cluster_id"` // Docker Swarm cluster identifier
P2PAddr string `json:"p2p_addr"` // Peer-to-peer communication address
ClusterID string `json:"cluster_id"` // Docker Swarm cluster identifier
}
// Discovery handles P2P agent discovery for CHORUS agents within the Docker Swarm network.
@@ -44,14 +45,16 @@ type Agent struct {
// 2. Context-based cancellation for clean shutdown in Docker containers
// 3. Map storage for O(1) agent lookup by ID
// 4. Separate channels for different types of shutdown signaling
// 5. SwarmDiscovery for direct Docker API enumeration (bypasses DNS VIP limitation)
type Discovery struct {
agents map[string]*Agent // Thread-safe registry of discovered agents
mu sync.RWMutex // Protects agents map from concurrent access
listeners []net.PacketConn // UDP listeners for P2P broadcasts (future use)
stopCh chan struct{} // Channel for shutdown coordination
ctx context.Context // Context for graceful cancellation
cancel context.CancelFunc // Function to trigger context cancellation
config *DiscoveryConfig // Configuration for discovery behavior
agents map[string]*Agent // Thread-safe registry of discovered agents
mu sync.RWMutex // Protects agents map from concurrent access
listeners []net.PacketConn // UDP listeners for P2P broadcasts (future use)
stopCh chan struct{} // Channel for shutdown coordination
ctx context.Context // Context for graceful cancellation
cancel context.CancelFunc // Function to trigger context cancellation
config *DiscoveryConfig // Configuration for discovery behavior
swarmDiscovery *SwarmDiscovery // Docker Swarm API client for agent enumeration
}
// DiscoveryConfig configures discovery behavior and service endpoints
@@ -59,33 +62,49 @@ type DiscoveryConfig struct {
// Service discovery endpoints
KnownEndpoints []string `json:"known_endpoints"`
ServicePorts []int `json:"service_ports"`
// Docker Swarm discovery
DockerEnabled bool `json:"docker_enabled"`
ServiceName string `json:"service_name"`
DockerEnabled bool `json:"docker_enabled"`
DockerHost string `json:"docker_host"`
ServiceName string `json:"service_name"`
NetworkName string `json:"network_name"`
AgentPort int `json:"agent_port"`
VerifyHealth bool `json:"verify_health"`
DiscoveryMethod string `json:"discovery_method"` // "swarm", "dns", or "auto"
// Health check configuration
HealthTimeout time.Duration `json:"health_timeout"`
RetryAttempts int `json:"retry_attempts"`
// Agent filtering
RequiredCapabilities []string `json:"required_capabilities"`
RequiredCapabilities []string `json:"required_capabilities"`
MinLastSeenThreshold time.Duration `json:"min_last_seen_threshold"`
}
// DefaultDiscoveryConfig returns a sensible default configuration
func DefaultDiscoveryConfig() *DiscoveryConfig {
// Determine default discovery method from environment
discoveryMethod := os.Getenv("DISCOVERY_METHOD")
if discoveryMethod == "" {
discoveryMethod = "auto" // Try swarm first, fall back to DNS
}
return &DiscoveryConfig{
KnownEndpoints: []string{
"http://chorus:8081",
"http://chorus-agent:8081",
"http://localhost:8081",
},
ServicePorts: []int{8080, 8081, 9000},
DockerEnabled: true,
ServiceName: "chorus",
HealthTimeout: 10 * time.Second,
RetryAttempts: 3,
ServicePorts: []int{8080, 8081, 9000},
DockerEnabled: true,
DockerHost: "unix:///var/run/docker.sock",
ServiceName: "CHORUS_chorus",
NetworkName: "chorus_default",
AgentPort: 8080,
VerifyHealth: false, // Set to true for stricter discovery
DiscoveryMethod: discoveryMethod,
HealthTimeout: 10 * time.Second,
RetryAttempts: 3,
RequiredCapabilities: []string{},
MinLastSeenThreshold: 5 * time.Minute,
}
@@ -105,18 +124,58 @@ func NewDiscovery() *Discovery {
func NewDiscoveryWithConfig(config *DiscoveryConfig) *Discovery {
// Create cancellable context for graceful shutdown coordination
ctx, cancel := context.WithCancel(context.Background())
if config == nil {
config = DefaultDiscoveryConfig()
}
return &Discovery{
d := &Discovery{
agents: make(map[string]*Agent), // Initialize empty agent registry
stopCh: make(chan struct{}), // Unbuffered channel for shutdown signaling
ctx: ctx, // Parent context for all goroutines
cancel: cancel, // Cancellation function for cleanup
config: config, // Discovery configuration
}
// Initialize Docker Swarm discovery if enabled
if config.DockerEnabled && (config.DiscoveryMethod == "swarm" || config.DiscoveryMethod == "auto") {
swarmDiscovery, err := NewSwarmDiscovery(
config.DockerHost,
config.ServiceName,
config.NetworkName,
config.AgentPort,
)
if err != nil {
log.Warn().
Err(err).
Str("discovery_method", config.DiscoveryMethod).
Msg("⚠️ Failed to initialize Docker Swarm discovery, will fall back to DNS-based discovery")
} else {
d.swarmDiscovery = swarmDiscovery
log.Info().
Str("discovery_method", config.DiscoveryMethod).
Msg("✅ Docker Swarm discovery initialized")
}
}
return d
}
func normalizeAPIEndpoint(raw string) (string, string) {
parsed, err := url.Parse(raw)
if err != nil {
return raw, ""
}
host := parsed.Hostname()
if host == "" {
return raw, ""
}
scheme := parsed.Scheme
if scheme == "" {
scheme = "http"
}
apiURL := fmt.Sprintf("%s://%s:%d", scheme, host, 8080)
return apiURL, host
}
// Start begins listening for CHORUS agent P2P broadcasts and starts background services.
@@ -132,7 +191,7 @@ func (d *Discovery) Start() error {
// This continuously polls CHORUS agents via their health endpoints to
// maintain an up-to-date registry of available agents and capabilities.
go d.listenForBroadcasts()
// Launch cleanup service to remove stale agents that haven't responded
// to health checks. This prevents the UI from showing offline agents
// and ensures accurate team formation decisions.
@@ -144,14 +203,21 @@ func (d *Discovery) Start() error {
// Stop shuts down the P2P discovery service
func (d *Discovery) Stop() error {
log.Info().Msg("🔍 Stopping CHORUS P2P agent discovery")
d.cancel()
close(d.stopCh)
for _, listener := range d.listeners {
listener.Close()
}
// Close Docker Swarm discovery client
if d.swarmDiscovery != nil {
if err := d.swarmDiscovery.Close(); err != nil {
log.Warn().Err(err).Msg("Failed to close Docker Swarm discovery client")
}
}
return nil
}
@@ -159,26 +225,26 @@ func (d *Discovery) Stop() error {
func (d *Discovery) GetAgents() []*Agent {
d.mu.RLock()
defer d.mu.RUnlock()
agents := make([]*Agent, 0, len(d.agents))
for _, agent := range d.agents {
agents = append(agents, agent)
}
return agents
}
// listenForBroadcasts listens for CHORUS agent P2P broadcasts
func (d *Discovery) listenForBroadcasts() {
log.Info().Msg("🔍 Starting real CHORUS agent discovery")
// Real discovery polling every 30 seconds to avoid overwhelming the service
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
// Run initial discovery immediately
d.discoverRealCHORUSAgents()
for {
select {
case <-d.ctx.Done():
@@ -192,7 +258,34 @@ func (d *Discovery) listenForBroadcasts() {
// discoverRealCHORUSAgents discovers actual CHORUS agents by querying their health endpoints
func (d *Discovery) discoverRealCHORUSAgents() {
log.Debug().Msg("🔍 Discovering real CHORUS agents via health endpoints")
// Try Docker Swarm API discovery first (most reliable for production)
if d.swarmDiscovery != nil && (d.config.DiscoveryMethod == "swarm" || d.config.DiscoveryMethod == "auto") {
agents, err := d.swarmDiscovery.DiscoverAgents(d.ctx, d.config.VerifyHealth)
if err != nil {
log.Warn().
Err(err).
Str("discovery_method", d.config.DiscoveryMethod).
Msg("⚠️ Docker Swarm discovery failed, falling back to DNS-based discovery")
} else if len(agents) > 0 {
// Successfully discovered agents via Docker Swarm API
log.Info().
Int("agent_count", len(agents)).
Msg("✅ Successfully discovered agents via Docker Swarm API")
// Add all discovered agents to the registry
for _, agent := range agents {
d.addOrUpdateAgent(agent)
}
// If we're in "swarm" mode (not "auto"), return here and skip DNS discovery
if d.config.DiscoveryMethod == "swarm" {
return
}
}
}
// Fall back to DNS-based discovery methods
// Query multiple potential CHORUS services
d.queryActualCHORUSService()
d.discoverDockerSwarmAgents()
@@ -203,7 +296,7 @@ func (d *Discovery) discoverRealCHORUSAgents() {
// This function replaces the previous simulation and discovers only what's actually running.
func (d *Discovery) queryActualCHORUSService() {
client := &http.Client{Timeout: 10 * time.Second}
// Try to query the CHORUS health endpoint
endpoint := "http://chorus:8081/health"
resp, err := client.Get(endpoint)
@@ -215,7 +308,7 @@ func (d *Discovery) queryActualCHORUSService() {
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Debug().
Int("status_code", resp.StatusCode).
@@ -223,7 +316,7 @@ func (d *Discovery) queryActualCHORUSService() {
Msg("CHORUS health endpoint returned non-200 status")
return
}
// CHORUS is responding, so create a single agent entry for the actual instance
agentID := "chorus-agent-001"
agent := &Agent{
@@ -232,7 +325,7 @@ func (d *Discovery) queryActualCHORUSService() {
Status: "online",
Capabilities: []string{
"general_development",
"task_coordination",
"task_coordination",
"ai_integration",
"code_analysis",
"autonomous_development",
@@ -244,11 +337,11 @@ func (d *Discovery) queryActualCHORUSService() {
P2PAddr: "chorus:9000",
ClusterID: "docker-unified-stack",
}
// Check if CHORUS has an API endpoint that provides more detailed info
// For now, we'll just use the single discovered instance
d.addOrUpdateAgent(agent)
log.Info().
Str("agent_id", agentID).
Str("endpoint", endpoint).
@@ -259,7 +352,7 @@ func (d *Discovery) queryActualCHORUSService() {
func (d *Discovery) addOrUpdateAgent(agent *Agent) {
d.mu.Lock()
defer d.mu.Unlock()
existing, exists := d.agents[agent.ID]
if exists {
// Update existing agent
@@ -281,7 +374,7 @@ func (d *Discovery) addOrUpdateAgent(agent *Agent) {
func (d *Discovery) cleanupStaleAgents() {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
for {
select {
case <-d.ctx.Done():
@@ -296,9 +389,9 @@ func (d *Discovery) cleanupStaleAgents() {
func (d *Discovery) removeStaleAgents() {
d.mu.Lock()
defer d.mu.Unlock()
staleThreshold := time.Now().Add(-5 * time.Minute)
for id, agent := range d.agents {
if agent.LastSeen.Before(staleThreshold) {
delete(d.agents, id)
@@ -319,10 +412,10 @@ func (d *Discovery) discoverDockerSwarmAgents() {
// Query Docker Swarm API to find running services
// For production deployment, this would query the Docker API
// For MVP, we'll check for service-specific health endpoints
servicePorts := d.config.ServicePorts
serviceHosts := []string{"chorus", "chorus-agent", d.config.ServiceName}
for _, host := range serviceHosts {
for _, port := range servicePorts {
d.checkServiceEndpoint(host, port)
@@ -335,7 +428,7 @@ func (d *Discovery) discoverKnownEndpoints() {
for _, endpoint := range d.config.KnownEndpoints {
d.queryServiceEndpoint(endpoint)
}
// Check environment variables for additional endpoints
if endpoints := os.Getenv("CHORUS_DISCOVERY_ENDPOINTS"); endpoints != "" {
for _, endpoint := range strings.Split(endpoints, ",") {
@@ -356,10 +449,10 @@ func (d *Discovery) checkServiceEndpoint(host string, port int) {
// queryServiceEndpoint attempts to discover a CHORUS agent at the given endpoint
func (d *Discovery) queryServiceEndpoint(endpoint string) {
client := &http.Client{Timeout: d.config.HealthTimeout}
// Try multiple health check paths
healthPaths := []string{"/health", "/api/health", "/api/v1/health", "/status"}
for _, path := range healthPaths {
fullURL := endpoint + path
resp, err := client.Get(fullURL)
@@ -370,7 +463,7 @@ func (d *Discovery) queryServiceEndpoint(endpoint string) {
Msg("Failed to reach service endpoint")
continue
}
if resp.StatusCode == http.StatusOK {
d.processServiceResponse(endpoint, resp)
resp.Body.Close()
@@ -384,36 +477,42 @@ func (d *Discovery) queryServiceEndpoint(endpoint string) {
func (d *Discovery) processServiceResponse(endpoint string, resp *http.Response) {
// Try to parse response for agent metadata
var agentInfo struct {
ID string `json:"id"`
Name string `json:"name"`
Status string `json:"status"`
Capabilities []string `json:"capabilities"`
Model string `json:"model"`
ID string `json:"id"`
Name string `json:"name"`
Status string `json:"status"`
Capabilities []string `json:"capabilities"`
Model string `json:"model"`
Metadata map[string]interface{} `json:"metadata"`
}
if err := json.NewDecoder(resp.Body).Decode(&agentInfo); err != nil {
// If parsing fails, create a basic agent entry
d.createBasicAgentFromEndpoint(endpoint)
return
}
apiEndpoint, host := normalizeAPIEndpoint(endpoint)
p2pAddr := endpoint
if host != "" {
p2pAddr = fmt.Sprintf("%s:%d", host, 9000)
}
// Create detailed agent from parsed info
agent := &Agent{
ID: agentInfo.ID,
Name: agentInfo.Name,
Status: agentInfo.Status,
ID: agentInfo.ID,
Name: agentInfo.Name,
Status: agentInfo.Status,
Capabilities: agentInfo.Capabilities,
Model: agentInfo.Model,
Endpoint: endpoint,
LastSeen: time.Now(),
P2PAddr: endpoint,
ClusterID: "docker-unified-stack",
Model: agentInfo.Model,
Endpoint: apiEndpoint,
LastSeen: time.Now(),
P2PAddr: p2pAddr,
ClusterID: "docker-unified-stack",
}
// Set defaults if fields are empty
if agent.ID == "" {
agent.ID = fmt.Sprintf("chorus-agent-%s", strings.ReplaceAll(endpoint, ":", "-"))
agent.ID = fmt.Sprintf("chorus-agent-%s", strings.ReplaceAll(apiEndpoint, ":", "-"))
}
if agent.Name == "" {
agent.Name = "CHORUS Agent"
@@ -424,7 +523,7 @@ func (d *Discovery) processServiceResponse(endpoint string, resp *http.Response)
if len(agent.Capabilities) == 0 {
agent.Capabilities = []string{
"general_development",
"task_coordination",
"task_coordination",
"ai_integration",
"code_analysis",
"autonomous_development",
@@ -433,9 +532,9 @@ func (d *Discovery) processServiceResponse(endpoint string, resp *http.Response)
if agent.Model == "" {
agent.Model = "llama3.1:8b"
}
d.addOrUpdateAgent(agent)
log.Info().
Str("agent_id", agent.ID).
Str("endpoint", endpoint).
@@ -444,27 +543,33 @@ func (d *Discovery) processServiceResponse(endpoint string, resp *http.Response)
// createBasicAgentFromEndpoint creates a basic agent entry when detailed info isn't available
func (d *Discovery) createBasicAgentFromEndpoint(endpoint string) {
agentID := fmt.Sprintf("chorus-agent-%s", strings.ReplaceAll(endpoint, ":", "-"))
apiEndpoint, host := normalizeAPIEndpoint(endpoint)
agentID := fmt.Sprintf("chorus-agent-%s", strings.ReplaceAll(apiEndpoint, ":", "-"))
p2pAddr := endpoint
if host != "" {
p2pAddr = fmt.Sprintf("%s:%d", host, 9000)
}
agent := &Agent{
ID: agentID,
Name: "CHORUS Agent",
Status: "online",
Capabilities: []string{
"general_development",
"task_coordination",
"task_coordination",
"ai_integration",
},
Model: "llama3.1:8b",
Endpoint: endpoint,
Endpoint: apiEndpoint,
LastSeen: time.Now(),
TasksCompleted: 0,
P2PAddr: endpoint,
P2PAddr: p2pAddr,
ClusterID: "docker-unified-stack",
}
d.addOrUpdateAgent(agent)
log.Info().
Str("agent_id", agentID).
Str("endpoint", endpoint).
@@ -473,12 +578,12 @@ func (d *Discovery) createBasicAgentFromEndpoint(endpoint string) {
// AgentHealthResponse represents the expected health response format
type AgentHealthResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Status string `json:"status"`
Capabilities []string `json:"capabilities"`
Model string `json:"model"`
LastSeen time.Time `json:"last_seen"`
TasksCompleted int `json:"tasks_completed"`
Metadata map[string]interface{} `json:"metadata"`
}
ID string `json:"id"`
Name string `json:"name"`
Status string `json:"status"`
Capabilities []string `json:"capabilities"`
Model string `json:"model"`
LastSeen time.Time `json:"last_seen"`
TasksCompleted int `json:"tasks_completed"`
Metadata map[string]interface{} `json:"metadata"`
}