package p2p import ( "context" "fmt" "net/http" "strings" "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/client" "github.com/rs/zerolog/log" ) // SwarmDiscovery handles Docker Swarm-based agent discovery by directly querying // the Docker API to enumerate all running tasks for the CHORUS service. // This approach solves the DNS VIP limitation where only 2 of 34 agents are discovered. // // Design rationale: // - Docker Swarm DNS returns a single VIP that load-balances to random containers // - We need to discover ALL containers, not just the ones we randomly connect to // - By querying the Docker API directly, we can enumerate all running tasks // - Each task has a network attachment with the actual container IP type SwarmDiscovery struct { client *client.Client serviceName string networkName string agentPort int } // NewSwarmDiscovery creates a new Docker Swarm-based discovery client. // The dockerHost parameter should be "unix:///var/run/docker.sock" in production. func NewSwarmDiscovery(dockerHost, serviceName, networkName string, agentPort int) (*SwarmDiscovery, error) { // Create Docker client with environment defaults if dockerHost is empty opts := []client.Opt{ client.FromEnv, client.WithAPIVersionNegotiation(), } if dockerHost != "" { opts = append(opts, client.WithHost(dockerHost)) } cli, err := client.NewClientWithOpts(opts...) if err != nil { return nil, fmt.Errorf("failed to create Docker client: %w", err) } // Verify we can connect to Docker API ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if _, err := cli.Ping(ctx); err != nil { return nil, fmt.Errorf("failed to ping Docker API: %w", err) } log.Info(). Str("docker_host", dockerHost). Str("service_name", serviceName). Str("network_name", networkName). Int("agent_port", agentPort). Msg("✅ Docker Swarm discovery client initialized") return &SwarmDiscovery{ client: cli, serviceName: serviceName, networkName: networkName, agentPort: agentPort, }, nil } // DiscoverAgents queries the Docker Swarm API to find all running CHORUS agent containers. // It returns a slice of Agent structs with endpoints constructed from container IPs. // // Implementation details: // 1. List all tasks for the specified service // 2. Filter for tasks in "running" desired state // 3. Extract container IPs from network attachments // 4. Build HTTP endpoints (http://:) // 5. Optionally verify agents are responsive via health check func (sd *SwarmDiscovery) DiscoverAgents(ctx context.Context, verifyHealth bool) ([]*Agent, error) { log.Debug(). Str("service_name", sd.serviceName). Bool("verify_health", verifyHealth). Msg("🔍 Starting Docker Swarm agent discovery") // List all tasks for the CHORUS service taskFilters := filters.NewArgs() taskFilters.Add("service", sd.serviceName) taskFilters.Add("desired-state", "running") tasks, err := sd.client.TaskList(ctx, types.TaskListOptions{ Filters: taskFilters, }) if err != nil { return nil, fmt.Errorf("failed to list Docker tasks: %w", err) } if len(tasks) == 0 { log.Warn(). Str("service_name", sd.serviceName). Msg("⚠️ No running tasks found for CHORUS service") return []*Agent{}, nil } log.Debug(). Int("task_count", len(tasks)). Msg("📋 Found Docker Swarm tasks") agents := make([]*Agent, 0, len(tasks)) for _, task := range tasks { agent, err := sd.taskToAgent(task) if err != nil { log.Warn(). Err(err). Str("task_id", task.ID). Msg("⚠️ Failed to convert task to agent") continue } // Optionally verify the agent is responsive if verifyHealth { if !sd.verifyAgentHealth(ctx, agent) { log.Debug(). Str("agent_id", agent.ID). Str("endpoint", agent.Endpoint). Msg("⚠️ Agent health check failed, skipping") continue } } agents = append(agents, agent) } log.Info(). Int("discovered_count", len(agents)). Int("total_tasks", len(tasks)). Msg("✅ Docker Swarm agent discovery completed") return agents, nil } // taskToAgent converts a Docker Swarm task to an Agent struct. // It extracts the container IP from network attachments and builds the agent endpoint. func (sd *SwarmDiscovery) taskToAgent(task swarm.Task) (*Agent, error) { // Verify task is actually running if task.Status.State != swarm.TaskStateRunning { return nil, fmt.Errorf("task not in running state: %s", task.Status.State) } // Extract container IP from network attachments var containerIP string for _, attachment := range task.NetworksAttachments { // Look for the correct network if sd.networkName != "" && !strings.Contains(attachment.Network.Spec.Name, sd.networkName) { continue } // Get the first IP address from this network if len(attachment.Addresses) > 0 { // Addresses are in CIDR format (e.g., "10.0.13.5/24") // Strip the subnet mask to get just the IP containerIP = stripCIDR(attachment.Addresses[0]) break } } if containerIP == "" { return nil, fmt.Errorf("no IP address found in network attachments for task %s", task.ID) } // Build endpoint URL endpoint := fmt.Sprintf("http://%s:%d", containerIP, sd.agentPort) // Extract node information for debugging nodeID := task.NodeID // Create agent struct agent := &Agent{ ID: fmt.Sprintf("chorus-agent-%s", task.ID[:12]), // Use short task ID Name: fmt.Sprintf("CHORUS Agent (Task: %s)", task.ID[:12]), Status: "online", Endpoint: endpoint, LastSeen: time.Now(), P2PAddr: fmt.Sprintf("%s:%d", containerIP, 9000), // P2P port (future use) Capabilities: []string{ "general_development", "task_coordination", "ai_integration", "code_analysis", "autonomous_development", }, Model: "llama3.1:8b", ClusterID: "docker-swarm", } log.Debug(). Str("task_id", task.ID[:12]). Str("node_id", nodeID). Str("container_ip", containerIP). Str("endpoint", endpoint). Msg("🤖 Converted task to agent") return agent, nil } // verifyAgentHealth performs a quick health check on the agent endpoint. // Returns true if the agent responds successfully to a health check. func (sd *SwarmDiscovery) verifyAgentHealth(ctx context.Context, agent *Agent) bool { client := &http.Client{ Timeout: 5 * time.Second, } // Try multiple health check endpoints healthPaths := []string{"/health", "/api/health", "/api/v1/health"} for _, path := range healthPaths { healthURL := agent.Endpoint + path req, err := http.NewRequestWithContext(ctx, "GET", healthURL, nil) if err != nil { continue } resp, err := client.Do(req) if err != nil { continue } resp.Body.Close() if resp.StatusCode == http.StatusOK { log.Debug(). Str("agent_id", agent.ID). Str("health_url", healthURL). Msg("✅ Agent health check passed") return true } } return false } // Close releases resources held by the SwarmDiscovery client func (sd *SwarmDiscovery) Close() error { if sd.client != nil { return sd.client.Close() } return nil } // stripCIDR removes the subnet mask from a CIDR-formatted IP address. // Example: "10.0.13.5/24" -> "10.0.13.5" func stripCIDR(cidrIP string) string { if idx := strings.Index(cidrIP, "/"); idx != -1 { return cidrIP[:idx] } return cidrIP }