package p2p import ( "context" "encoding/json" "fmt" "net" "net/http" "os" "strings" "sync" "time" "github.com/rs/zerolog/log" ) // Agent represents a CHORUS agent discovered via P2P networking within the Docker Swarm cluster. // This struct defines the complete metadata we track for each AI agent, enabling intelligent // team formation and workload distribution. // // Design decision: We use JSON tags for API serialization since this data is exposed via // REST endpoints to the WHOOSH UI. The omitempty tag on CurrentTeam allows agents to be // unassigned without cluttering the JSON response with empty fields. type Agent struct { ID string `json:"id"` // Unique identifier (e.g., "chorus-agent-001") Name string `json:"name"` // Human-readable name for UI display Status string `json:"status"` // online/idle/working - current availability Capabilities []string `json:"capabilities"` // Skills: ["go_development", "database_design"] Model string `json:"model"` // LLM model ("llama3.1:8b", "codellama", etc.) Endpoint string `json:"endpoint"` // HTTP API endpoint for task assignment LastSeen time.Time `json:"last_seen"` // Timestamp of last health check response TasksCompleted int `json:"tasks_completed"` // Performance metric for load balancing CurrentTeam string `json:"current_team,omitempty"` // Active team assignment (optional) P2PAddr string `json:"p2p_addr"` // Peer-to-peer communication address ClusterID string `json:"cluster_id"` // Docker Swarm cluster identifier } // Discovery handles P2P agent discovery for CHORUS agents within the Docker Swarm network. // This service maintains a real-time registry of available agents and their capabilities, // enabling the WHOOSH orchestrator to make intelligent team formation decisions. // // Design decisions: // 1. RWMutex for thread-safe concurrent access (many readers, few writers) // 2. Context-based cancellation for clean shutdown in Docker containers // 3. Map storage for O(1) agent lookup by ID // 4. Separate channels for different types of shutdown signaling type Discovery struct { agents map[string]*Agent // Thread-safe registry of discovered agents mu sync.RWMutex // Protects agents map from concurrent access listeners []net.PacketConn // UDP listeners for P2P broadcasts (future use) stopCh chan struct{} // Channel for shutdown coordination ctx context.Context // Context for graceful cancellation cancel context.CancelFunc // Function to trigger context cancellation config *DiscoveryConfig // Configuration for discovery behavior } // DiscoveryConfig configures discovery behavior and service endpoints type DiscoveryConfig struct { // Service discovery endpoints KnownEndpoints []string `json:"known_endpoints"` ServicePorts []int `json:"service_ports"` // Docker Swarm discovery DockerEnabled bool `json:"docker_enabled"` ServiceName string `json:"service_name"` // Health check configuration HealthTimeout time.Duration `json:"health_timeout"` RetryAttempts int `json:"retry_attempts"` // Agent filtering RequiredCapabilities []string `json:"required_capabilities"` MinLastSeenThreshold time.Duration `json:"min_last_seen_threshold"` } // DefaultDiscoveryConfig returns a sensible default configuration func DefaultDiscoveryConfig() *DiscoveryConfig { return &DiscoveryConfig{ KnownEndpoints: []string{ "http://chorus:8081", "http://chorus-agent:8081", "http://localhost:8081", }, ServicePorts: []int{8080, 8081, 9000}, DockerEnabled: true, ServiceName: "chorus", HealthTimeout: 10 * time.Second, RetryAttempts: 3, RequiredCapabilities: []string{}, MinLastSeenThreshold: 5 * time.Minute, } } // NewDiscovery creates a new P2P discovery service with proper initialization. // This constructor ensures all channels and contexts are properly set up for // concurrent operation within the Docker Swarm environment. // // Implementation decision: We use context.WithCancel rather than a timeout context // because agent discovery should run indefinitely until explicitly stopped. func NewDiscovery() *Discovery { return NewDiscoveryWithConfig(DefaultDiscoveryConfig()) } // NewDiscoveryWithConfig creates a new P2P discovery service with custom configuration func NewDiscoveryWithConfig(config *DiscoveryConfig) *Discovery { // Create cancellable context for graceful shutdown coordination ctx, cancel := context.WithCancel(context.Background()) if config == nil { config = DefaultDiscoveryConfig() } return &Discovery{ agents: make(map[string]*Agent), // Initialize empty agent registry stopCh: make(chan struct{}), // Unbuffered channel for shutdown signaling ctx: ctx, // Parent context for all goroutines cancel: cancel, // Cancellation function for cleanup config: config, // Discovery configuration } } // Start begins listening for CHORUS agent P2P broadcasts and starts background services. // This method launches goroutines for agent discovery and cleanup, enabling real-time // monitoring of the CHORUS agent ecosystem. // // Implementation decision: We use goroutines rather than a worker pool because the // workload is I/O bound (HTTP health checks) and we want immediate responsiveness. func (d *Discovery) Start() error { log.Info().Msg("๐Ÿ” Starting CHORUS P2P agent discovery") // Launch agent discovery in separate goroutine to avoid blocking startup. // This continuously polls CHORUS agents via their health endpoints to // maintain an up-to-date registry of available agents and capabilities. go d.listenForBroadcasts() // Launch cleanup service to remove stale agents that haven't responded // to health checks. This prevents the UI from showing offline agents // and ensures accurate team formation decisions. go d.cleanupStaleAgents() return nil // Always succeeds since goroutines handle errors internally } // Stop shuts down the P2P discovery service func (d *Discovery) Stop() error { log.Info().Msg("๐Ÿ” Stopping CHORUS P2P agent discovery") d.cancel() close(d.stopCh) for _, listener := range d.listeners { listener.Close() } return nil } // GetAgents returns all currently discovered agents func (d *Discovery) GetAgents() []*Agent { d.mu.RLock() defer d.mu.RUnlock() agents := make([]*Agent, 0, len(d.agents)) for _, agent := range d.agents { agents = append(agents, agent) } return agents } // listenForBroadcasts listens for CHORUS agent P2P broadcasts func (d *Discovery) listenForBroadcasts() { log.Info().Msg("๐Ÿ” Starting real CHORUS agent discovery") // Real discovery polling every 30 seconds to avoid overwhelming the service ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() // Run initial discovery immediately d.discoverRealCHORUSAgents() for { select { case <-d.ctx.Done(): return case <-ticker.C: d.discoverRealCHORUSAgents() } } } // discoverRealCHORUSAgents discovers actual CHORUS agents by querying their health endpoints func (d *Discovery) discoverRealCHORUSAgents() { log.Debug().Msg("๐Ÿ” Discovering real CHORUS agents via health endpoints") // Query multiple potential CHORUS services d.queryActualCHORUSService() d.discoverDockerSwarmAgents() d.discoverKnownEndpoints() } // queryActualCHORUSService queries the real CHORUS service to discover actual running agents. // This function replaces the previous simulation and discovers only what's actually running. func (d *Discovery) queryActualCHORUSService() { client := &http.Client{Timeout: 10 * time.Second} // Try to query the CHORUS health endpoint endpoint := "http://chorus:8081/health" resp, err := client.Get(endpoint) if err != nil { log.Debug(). Err(err). Str("endpoint", endpoint). Msg("Failed to reach CHORUS health endpoint") return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { log.Debug(). Int("status_code", resp.StatusCode). Str("endpoint", endpoint). Msg("CHORUS health endpoint returned non-200 status") return } // CHORUS is responding, so create a single agent entry for the actual instance agentID := "chorus-agent-001" agent := &Agent{ ID: agentID, Name: "CHORUS Agent", Status: "online", Capabilities: []string{ "general_development", "task_coordination", "ai_integration", "code_analysis", "autonomous_development", }, Model: "llama3.1:8b", Endpoint: "http://chorus:8080", LastSeen: time.Now(), TasksCompleted: 0, // Will be updated by actual task completion tracking P2PAddr: "chorus:9000", ClusterID: "docker-unified-stack", } // Check if CHORUS has an API endpoint that provides more detailed info // For now, we'll just use the single discovered instance d.addOrUpdateAgent(agent) log.Info(). Str("agent_id", agentID). Str("endpoint", endpoint). Msg("๐Ÿค– Discovered real CHORUS agent") } // addOrUpdateAgent adds or updates an agent in the discovery cache func (d *Discovery) addOrUpdateAgent(agent *Agent) { d.mu.Lock() defer d.mu.Unlock() existing, exists := d.agents[agent.ID] if exists { // Update existing agent existing.Status = agent.Status existing.LastSeen = agent.LastSeen existing.TasksCompleted = agent.TasksCompleted existing.CurrentTeam = agent.CurrentTeam } else { // Add new agent d.agents[agent.ID] = agent log.Info(). Str("agent_id", agent.ID). Str("p2p_addr", agent.P2PAddr). Msg("๐Ÿค– Discovered new CHORUS agent") } } // cleanupStaleAgents removes agents that haven't been seen recently func (d *Discovery) cleanupStaleAgents() { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() for { select { case <-d.ctx.Done(): return case <-ticker.C: d.removeStaleAgents() } } } // removeStaleAgents removes agents that haven't been seen in 5 minutes func (d *Discovery) removeStaleAgents() { d.mu.Lock() defer d.mu.Unlock() staleThreshold := time.Now().Add(-5 * time.Minute) for id, agent := range d.agents { if agent.LastSeen.Before(staleThreshold) { delete(d.agents, id) log.Info(). Str("agent_id", id). Time("last_seen", agent.LastSeen). Msg("๐Ÿงน Removed stale agent") } } } // discoverDockerSwarmAgents discovers CHORUS agents running in Docker Swarm func (d *Discovery) discoverDockerSwarmAgents() { if !d.config.DockerEnabled { return } // Query Docker Swarm API to find running services // For production deployment, this would query the Docker API // For MVP, we'll check for service-specific health endpoints servicePorts := d.config.ServicePorts serviceHosts := []string{"chorus", "chorus-agent", d.config.ServiceName} for _, host := range serviceHosts { for _, port := range servicePorts { d.checkServiceEndpoint(host, port) } } } // discoverKnownEndpoints checks configured known endpoints for CHORUS agents func (d *Discovery) discoverKnownEndpoints() { for _, endpoint := range d.config.KnownEndpoints { d.queryServiceEndpoint(endpoint) } // Check environment variables for additional endpoints if endpoints := os.Getenv("CHORUS_DISCOVERY_ENDPOINTS"); endpoints != "" { for _, endpoint := range strings.Split(endpoints, ",") { endpoint = strings.TrimSpace(endpoint) if endpoint != "" { d.queryServiceEndpoint(endpoint) } } } } // checkServiceEndpoint checks a specific host:port combination for a CHORUS agent func (d *Discovery) checkServiceEndpoint(host string, port int) { endpoint := fmt.Sprintf("http://%s:%d", host, port) d.queryServiceEndpoint(endpoint) } // queryServiceEndpoint attempts to discover a CHORUS agent at the given endpoint func (d *Discovery) queryServiceEndpoint(endpoint string) { client := &http.Client{Timeout: d.config.HealthTimeout} // Try multiple health check paths healthPaths := []string{"/health", "/api/health", "/api/v1/health", "/status"} for _, path := range healthPaths { fullURL := endpoint + path resp, err := client.Get(fullURL) if err != nil { log.Debug(). Err(err). Str("endpoint", fullURL). Msg("Failed to reach service endpoint") continue } if resp.StatusCode == http.StatusOK { d.processServiceResponse(endpoint, resp) resp.Body.Close() return // Found working endpoint } resp.Body.Close() } } // processServiceResponse processes a successful health check response func (d *Discovery) processServiceResponse(endpoint string, resp *http.Response) { // Try to parse response for agent metadata var agentInfo struct { ID string `json:"id"` Name string `json:"name"` Status string `json:"status"` Capabilities []string `json:"capabilities"` Model string `json:"model"` Metadata map[string]interface{} `json:"metadata"` } if err := json.NewDecoder(resp.Body).Decode(&agentInfo); err != nil { // If parsing fails, create a basic agent entry d.createBasicAgentFromEndpoint(endpoint) return } // Create detailed agent from parsed info agent := &Agent{ ID: agentInfo.ID, Name: agentInfo.Name, Status: agentInfo.Status, Capabilities: agentInfo.Capabilities, Model: agentInfo.Model, Endpoint: endpoint, LastSeen: time.Now(), P2PAddr: endpoint, ClusterID: "docker-unified-stack", } // Set defaults if fields are empty if agent.ID == "" { agent.ID = fmt.Sprintf("chorus-agent-%s", strings.ReplaceAll(endpoint, ":", "-")) } if agent.Name == "" { agent.Name = "CHORUS Agent" } if agent.Status == "" { agent.Status = "online" } if len(agent.Capabilities) == 0 { agent.Capabilities = []string{ "general_development", "task_coordination", "ai_integration", "code_analysis", "autonomous_development", } } if agent.Model == "" { agent.Model = "llama3.1:8b" } d.addOrUpdateAgent(agent) log.Info(). Str("agent_id", agent.ID). Str("endpoint", endpoint). Msg("๐Ÿค– Discovered CHORUS agent with metadata") } // createBasicAgentFromEndpoint creates a basic agent entry when detailed info isn't available func (d *Discovery) createBasicAgentFromEndpoint(endpoint string) { agentID := fmt.Sprintf("chorus-agent-%s", strings.ReplaceAll(endpoint, ":", "-")) agent := &Agent{ ID: agentID, Name: "CHORUS Agent", Status: "online", Capabilities: []string{ "general_development", "task_coordination", "ai_integration", }, Model: "llama3.1:8b", Endpoint: endpoint, LastSeen: time.Now(), TasksCompleted: 0, P2PAddr: endpoint, ClusterID: "docker-unified-stack", } d.addOrUpdateAgent(agent) log.Info(). Str("agent_id", agentID). Str("endpoint", endpoint). Msg("๐Ÿค– Discovered basic CHORUS agent") } // AgentHealthResponse represents the expected health response format type AgentHealthResponse struct { ID string `json:"id"` Name string `json:"name"` Status string `json:"status"` Capabilities []string `json:"capabilities"` Model string `json:"model"` LastSeen time.Time `json:"last_seen"` TasksCompleted int `json:"tasks_completed"` Metadata map[string]interface{} `json:"metadata"` }