 3373f7b462
			
		
	
	3373f7b462
	
	
		
			
	
		
	
	**Problem**: The standardized label set was missing the `chorus-entrypoint` label, which is present in CHORUS repository and required for triggering council formation for project kickoffs. **Changes**: - Added `chorus-entrypoint` label (#ff6b6b) to `EnsureRequiredLabels()` in `internal/gitea/client.go` - Now creates 9 standard labels (was 8): 1. bug 2. bzzz-task 3. chorus-entrypoint (NEW) 4. duplicate 5. enhancement 6. help wanted 7. invalid 8. question 9. wontfix **Testing**: - Rebuilt and deployed WHOOSH with updated label configuration - Synced labels to all 5 monitored repositories (whoosh-ui, SequentialThinkingForCHORUS, TEST, WHOOSH, CHORUS) - Verified all repositories now have complete 9-label set **Impact**: All CHORUS ecosystem repositories now have consistent labeling matching the CHORUS repository standard, enabling proper council formation triggers. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			600 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			600 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package p2p
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"net"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"os"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/rs/zerolog/log"
 | |
| )
 | |
| 
 | |
| // Agent represents a CHORUS agent discovered via P2P networking within the Docker Swarm cluster.
 | |
| // This struct defines the complete metadata we track for each AI agent, enabling intelligent
 | |
| // team formation and workload distribution.
 | |
| //
 | |
| // Design decision: We use JSON tags for API serialization since this data is exposed via
 | |
| // REST endpoints to the WHOOSH UI. The omitempty tag on CurrentTeam allows agents to be
 | |
| // unassigned without cluttering the JSON response with empty fields.
 | |
| type Agent struct {
 | |
| 	ID             string    `json:"id"`                     // Unique identifier (e.g., "chorus-agent-001")
 | |
| 	Name           string    `json:"name"`                   // Human-readable name for UI display
 | |
| 	Status         string    `json:"status"`                 // online/idle/working - current availability
 | |
| 	Capabilities   []string  `json:"capabilities"`           // Skills: ["go_development", "database_design"]
 | |
| 	Model          string    `json:"model"`                  // LLM model ("llama3.1:8b", "codellama", etc.)
 | |
| 	Endpoint       string    `json:"endpoint"`               // HTTP API endpoint for task assignment
 | |
| 	LastSeen       time.Time `json:"last_seen"`              // Timestamp of last health check response
 | |
| 	TasksCompleted int       `json:"tasks_completed"`        // Performance metric for load balancing
 | |
| 	CurrentTeam    string    `json:"current_team,omitempty"` // Active team assignment (optional)
 | |
| 	P2PAddr        string    `json:"p2p_addr"`               // Peer-to-peer communication address
 | |
| 	PeerID         string    `json:"peer_id"`                // libp2p peer ID for bootstrap coordination
 | |
| 	ClusterID      string    `json:"cluster_id"`             // Docker Swarm cluster identifier
 | |
| }
 | |
| 
 | |
| // Discovery handles P2P agent discovery for CHORUS agents within the Docker Swarm network.
 | |
| // This service maintains a real-time registry of available agents and their capabilities,
 | |
| // enabling the WHOOSH orchestrator to make intelligent team formation decisions.
 | |
| //
 | |
| // Design decisions:
 | |
| // 1. RWMutex for thread-safe concurrent access (many readers, few writers)
 | |
| // 2. Context-based cancellation for clean shutdown in Docker containers
 | |
| // 3. Map storage for O(1) agent lookup by ID
 | |
| // 4. Separate channels for different types of shutdown signaling
 | |
| // 5. SwarmDiscovery for direct Docker API enumeration (bypasses DNS VIP limitation)
 | |
| type Discovery struct {
 | |
| 	agents         map[string]*Agent  // Thread-safe registry of discovered agents
 | |
| 	mu             sync.RWMutex       // Protects agents map from concurrent access
 | |
| 	listeners      []net.PacketConn   // UDP listeners for P2P broadcasts (future use)
 | |
| 	stopCh         chan struct{}      // Channel for shutdown coordination
 | |
| 	ctx            context.Context    // Context for graceful cancellation
 | |
| 	cancel         context.CancelFunc // Function to trigger context cancellation
 | |
| 	config         *DiscoveryConfig   // Configuration for discovery behavior
 | |
| 	swarmDiscovery *SwarmDiscovery    // Docker Swarm API client for agent enumeration
 | |
| }
 | |
| 
 | |
| // DiscoveryConfig configures discovery behavior and service endpoints
 | |
| type DiscoveryConfig struct {
 | |
| 	// Service discovery endpoints
 | |
| 	KnownEndpoints []string `json:"known_endpoints"`
 | |
| 	ServicePorts   []int    `json:"service_ports"`
 | |
| 
 | |
| 	// Docker Swarm discovery
 | |
| 	DockerEnabled   bool   `json:"docker_enabled"`
 | |
| 	DockerHost      string `json:"docker_host"`
 | |
| 	ServiceName     string `json:"service_name"`
 | |
| 	NetworkName     string `json:"network_name"`
 | |
| 	AgentPort       int    `json:"agent_port"`
 | |
| 	VerifyHealth    bool   `json:"verify_health"`
 | |
| 	DiscoveryMethod string `json:"discovery_method"` // "swarm", "dns", or "auto"
 | |
| 
 | |
| 	// Health check configuration
 | |
| 	HealthTimeout time.Duration `json:"health_timeout"`
 | |
| 	RetryAttempts int           `json:"retry_attempts"`
 | |
| 
 | |
| 	// Agent filtering
 | |
| 	RequiredCapabilities []string      `json:"required_capabilities"`
 | |
| 	MinLastSeenThreshold time.Duration `json:"min_last_seen_threshold"`
 | |
| }
 | |
| 
 | |
| // DefaultDiscoveryConfig returns a sensible default configuration
 | |
| func DefaultDiscoveryConfig() *DiscoveryConfig {
 | |
| 	// Determine default discovery method from environment
 | |
| 	discoveryMethod := os.Getenv("DISCOVERY_METHOD")
 | |
| 	if discoveryMethod == "" {
 | |
| 		discoveryMethod = "auto" // Try swarm first, fall back to DNS
 | |
| 	}
 | |
| 
 | |
| 	return &DiscoveryConfig{
 | |
| 		KnownEndpoints: []string{
 | |
| 			"http://chorus:8081",
 | |
| 			"http://chorus-agent:8081",
 | |
| 			"http://localhost:8081",
 | |
| 		},
 | |
| 		ServicePorts:         []int{8080, 8081, 9000},
 | |
| 		DockerEnabled:        true,
 | |
| 		DockerHost:           "unix:///var/run/docker.sock",
 | |
| 		ServiceName:          "CHORUS_chorus",
 | |
| 		NetworkName:          "chorus_net", // Match CHORUS_chorus_net (service prefix added automatically)
 | |
| 		AgentPort:            8080,
 | |
| 		VerifyHealth:         false, // Set to true for stricter discovery
 | |
| 		DiscoveryMethod:      discoveryMethod,
 | |
| 		HealthTimeout:        10 * time.Second,
 | |
| 		RetryAttempts:        3,
 | |
| 		RequiredCapabilities: []string{},
 | |
| 		MinLastSeenThreshold: 5 * time.Minute,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NewDiscovery creates a new P2P discovery service with proper initialization.
 | |
| // This constructor ensures all channels and contexts are properly set up for
 | |
| // concurrent operation within the Docker Swarm environment.
 | |
| //
 | |
| // Implementation decision: We use context.WithCancel rather than a timeout context
 | |
| // because agent discovery should run indefinitely until explicitly stopped.
 | |
| func NewDiscovery() *Discovery {
 | |
| 	return NewDiscoveryWithConfig(DefaultDiscoveryConfig())
 | |
| }
 | |
| 
 | |
| // NewDiscoveryWithConfig creates a new P2P discovery service with custom configuration
 | |
| func NewDiscoveryWithConfig(config *DiscoveryConfig) *Discovery {
 | |
| 	// Create cancellable context for graceful shutdown coordination
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 
 | |
| 	if config == nil {
 | |
| 		config = DefaultDiscoveryConfig()
 | |
| 	}
 | |
| 
 | |
| 	d := &Discovery{
 | |
| 		agents: make(map[string]*Agent), // Initialize empty agent registry
 | |
| 		stopCh: make(chan struct{}),     // Unbuffered channel for shutdown signaling
 | |
| 		ctx:    ctx,                     // Parent context for all goroutines
 | |
| 		cancel: cancel,                  // Cancellation function for cleanup
 | |
| 		config: config,                  // Discovery configuration
 | |
| 	}
 | |
| 
 | |
| 	// Initialize Docker Swarm discovery if enabled
 | |
| 	if config.DockerEnabled && (config.DiscoveryMethod == "swarm" || config.DiscoveryMethod == "auto") {
 | |
| 		swarmDiscovery, err := NewSwarmDiscovery(
 | |
| 			config.DockerHost,
 | |
| 			config.ServiceName,
 | |
| 			config.NetworkName,
 | |
| 			config.AgentPort,
 | |
| 		)
 | |
| 		if err != nil {
 | |
| 			log.Warn().
 | |
| 				Err(err).
 | |
| 				Str("discovery_method", config.DiscoveryMethod).
 | |
| 				Msg("⚠️ Failed to initialize Docker Swarm discovery, will fall back to DNS-based discovery")
 | |
| 		} else {
 | |
| 			d.swarmDiscovery = swarmDiscovery
 | |
| 			log.Info().
 | |
| 				Str("discovery_method", config.DiscoveryMethod).
 | |
| 				Msg("✅ Docker Swarm discovery initialized")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return d
 | |
| }
 | |
| 
 | |
| func normalizeAPIEndpoint(raw string) (string, string) {
 | |
| 	parsed, err := url.Parse(raw)
 | |
| 	if err != nil {
 | |
| 		return raw, ""
 | |
| 	}
 | |
| 	host := parsed.Hostname()
 | |
| 	if host == "" {
 | |
| 		return raw, ""
 | |
| 	}
 | |
| 	scheme := parsed.Scheme
 | |
| 	if scheme == "" {
 | |
| 		scheme = "http"
 | |
| 	}
 | |
| 	apiURL := fmt.Sprintf("%s://%s:%d", scheme, host, 8080)
 | |
| 	return apiURL, host
 | |
| }
 | |
| 
 | |
| // Start begins listening for CHORUS agent P2P broadcasts and starts background services.
 | |
| // This method launches goroutines for agent discovery and cleanup, enabling real-time
 | |
| // monitoring of the CHORUS agent ecosystem.
 | |
| //
 | |
| // Implementation decision: We use goroutines rather than a worker pool because the
 | |
| // workload is I/O bound (HTTP health checks) and we want immediate responsiveness.
 | |
| func (d *Discovery) Start() error {
 | |
| 	log.Info().Msg("🔍 Starting CHORUS P2P agent discovery")
 | |
| 
 | |
| 	// Launch agent discovery in separate goroutine to avoid blocking startup.
 | |
| 	// This continuously polls CHORUS agents via their health endpoints to
 | |
| 	// maintain an up-to-date registry of available agents and capabilities.
 | |
| 	go d.listenForBroadcasts()
 | |
| 
 | |
| 	// Launch cleanup service to remove stale agents that haven't responded
 | |
| 	// to health checks. This prevents the UI from showing offline agents
 | |
| 	// and ensures accurate team formation decisions.
 | |
| 	go d.cleanupStaleAgents()
 | |
| 
 | |
| 	return nil // Always succeeds since goroutines handle errors internally
 | |
| }
 | |
| 
 | |
| // Stop shuts down the P2P discovery service
 | |
| func (d *Discovery) Stop() error {
 | |
| 	log.Info().Msg("🔍 Stopping CHORUS P2P agent discovery")
 | |
| 
 | |
| 	d.cancel()
 | |
| 	close(d.stopCh)
 | |
| 
 | |
| 	for _, listener := range d.listeners {
 | |
| 		listener.Close()
 | |
| 	}
 | |
| 
 | |
| 	// Close Docker Swarm discovery client
 | |
| 	if d.swarmDiscovery != nil {
 | |
| 		if err := d.swarmDiscovery.Close(); err != nil {
 | |
| 			log.Warn().Err(err).Msg("Failed to close Docker Swarm discovery client")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetAgents returns all currently discovered agents
 | |
| func (d *Discovery) GetAgents() []*Agent {
 | |
| 	d.mu.RLock()
 | |
| 	defer d.mu.RUnlock()
 | |
| 
 | |
| 	agents := make([]*Agent, 0, len(d.agents))
 | |
| 	for _, agent := range d.agents {
 | |
| 		agents = append(agents, agent)
 | |
| 	}
 | |
| 
 | |
| 	return agents
 | |
| }
 | |
| 
 | |
| // listenForBroadcasts listens for CHORUS agent P2P broadcasts
 | |
| func (d *Discovery) listenForBroadcasts() {
 | |
| 	log.Info().Msg("🔍 Starting real CHORUS agent discovery")
 | |
| 
 | |
| 	// Real discovery polling every 30 seconds to avoid overwhelming the service
 | |
| 	ticker := time.NewTicker(30 * time.Second)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	// Run initial discovery immediately
 | |
| 	d.discoverRealCHORUSAgents()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-d.ctx.Done():
 | |
| 			return
 | |
| 		case <-ticker.C:
 | |
| 			d.discoverRealCHORUSAgents()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // discoverRealCHORUSAgents discovers actual CHORUS agents by querying their health endpoints
 | |
| func (d *Discovery) discoverRealCHORUSAgents() {
 | |
| 	log.Debug().Msg("🔍 Discovering real CHORUS agents via health endpoints")
 | |
| 
 | |
| 	// Try Docker Swarm API discovery first (most reliable for production)
 | |
| 	if d.swarmDiscovery != nil && (d.config.DiscoveryMethod == "swarm" || d.config.DiscoveryMethod == "auto") {
 | |
| 		agents, err := d.swarmDiscovery.DiscoverAgents(d.ctx, d.config.VerifyHealth)
 | |
| 		if err != nil {
 | |
| 			log.Warn().
 | |
| 				Err(err).
 | |
| 				Str("discovery_method", d.config.DiscoveryMethod).
 | |
| 				Msg("⚠️ Docker Swarm discovery failed, falling back to DNS-based discovery")
 | |
| 		} else if len(agents) > 0 {
 | |
| 			// Successfully discovered agents via Docker Swarm API
 | |
| 			log.Info().
 | |
| 				Int("agent_count", len(agents)).
 | |
| 				Msg("✅ Successfully discovered agents via Docker Swarm API")
 | |
| 
 | |
| 			// Add all discovered agents to the registry
 | |
| 			for _, agent := range agents {
 | |
| 				d.addOrUpdateAgent(agent)
 | |
| 			}
 | |
| 
 | |
| 			// If we're in "swarm" mode (not "auto"), return here and skip DNS discovery
 | |
| 			if d.config.DiscoveryMethod == "swarm" {
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Fall back to DNS-based discovery methods
 | |
| 	// Query multiple potential CHORUS services
 | |
| 	d.queryActualCHORUSService()
 | |
| 	d.discoverDockerSwarmAgents()
 | |
| 	d.discoverKnownEndpoints()
 | |
| }
 | |
| 
 | |
| // queryActualCHORUSService queries the real CHORUS service to discover actual running agents.
 | |
| // This function replaces the previous simulation and discovers only what's actually running.
 | |
| func (d *Discovery) queryActualCHORUSService() {
 | |
| 	client := &http.Client{Timeout: 10 * time.Second}
 | |
| 
 | |
| 	// Try to query the CHORUS health endpoint
 | |
| 	endpoint := "http://chorus:8081/health"
 | |
| 	resp, err := client.Get(endpoint)
 | |
| 	if err != nil {
 | |
| 		log.Debug().
 | |
| 			Err(err).
 | |
| 			Str("endpoint", endpoint).
 | |
| 			Msg("Failed to reach CHORUS health endpoint")
 | |
| 		return
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	if resp.StatusCode != http.StatusOK {
 | |
| 		log.Debug().
 | |
| 			Int("status_code", resp.StatusCode).
 | |
| 			Str("endpoint", endpoint).
 | |
| 			Msg("CHORUS health endpoint returned non-200 status")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// CHORUS is responding, so create a single agent entry for the actual instance
 | |
| 	agentID := "chorus-agent-001"
 | |
| 	agent := &Agent{
 | |
| 		ID:     agentID,
 | |
| 		Name:   "CHORUS Agent",
 | |
| 		Status: "online",
 | |
| 		Capabilities: []string{
 | |
| 			"general_development",
 | |
| 			"task_coordination",
 | |
| 			"ai_integration",
 | |
| 			"code_analysis",
 | |
| 			"autonomous_development",
 | |
| 		},
 | |
| 		Model:          "llama3.1:8b",
 | |
| 		Endpoint:       "http://chorus:8080",
 | |
| 		LastSeen:       time.Now(),
 | |
| 		TasksCompleted: 0, // Will be updated by actual task completion tracking
 | |
| 		P2PAddr:        "chorus:9000",
 | |
| 		ClusterID:      "docker-unified-stack",
 | |
| 	}
 | |
| 
 | |
| 	// Check if CHORUS has an API endpoint that provides more detailed info
 | |
| 	// For now, we'll just use the single discovered instance
 | |
| 	d.addOrUpdateAgent(agent)
 | |
| 
 | |
| 	log.Info().
 | |
| 		Str("agent_id", agentID).
 | |
| 		Str("endpoint", endpoint).
 | |
| 		Msg("🤖 Discovered real CHORUS agent")
 | |
| }
 | |
| 
 | |
| // addOrUpdateAgent adds or updates an agent in the discovery cache
 | |
| func (d *Discovery) addOrUpdateAgent(agent *Agent) {
 | |
| 	d.mu.Lock()
 | |
| 	defer d.mu.Unlock()
 | |
| 
 | |
| 	existing, exists := d.agents[agent.ID]
 | |
| 	if exists {
 | |
| 		// Update existing agent
 | |
| 		existing.Status = agent.Status
 | |
| 		existing.LastSeen = agent.LastSeen
 | |
| 		existing.TasksCompleted = agent.TasksCompleted
 | |
| 		existing.CurrentTeam = agent.CurrentTeam
 | |
| 	} else {
 | |
| 		// Add new agent
 | |
| 		d.agents[agent.ID] = agent
 | |
| 		log.Info().
 | |
| 			Str("agent_id", agent.ID).
 | |
| 			Str("p2p_addr", agent.P2PAddr).
 | |
| 			Msg("🤖 Discovered new CHORUS agent")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // cleanupStaleAgents removes agents that haven't been seen recently
 | |
| func (d *Discovery) cleanupStaleAgents() {
 | |
| 	ticker := time.NewTicker(60 * time.Second)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-d.ctx.Done():
 | |
| 			return
 | |
| 		case <-ticker.C:
 | |
| 			d.removeStaleAgents()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // removeStaleAgents removes agents that haven't been seen in 5 minutes
 | |
| func (d *Discovery) removeStaleAgents() {
 | |
| 	d.mu.Lock()
 | |
| 	defer d.mu.Unlock()
 | |
| 
 | |
| 	staleThreshold := time.Now().Add(-5 * time.Minute)
 | |
| 
 | |
| 	for id, agent := range d.agents {
 | |
| 		if agent.LastSeen.Before(staleThreshold) {
 | |
| 			delete(d.agents, id)
 | |
| 			log.Info().
 | |
| 				Str("agent_id", id).
 | |
| 				Time("last_seen", agent.LastSeen).
 | |
| 				Msg("🧹 Removed stale agent")
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // discoverDockerSwarmAgents discovers CHORUS agents running in Docker Swarm
 | |
| func (d *Discovery) discoverDockerSwarmAgents() {
 | |
| 	if !d.config.DockerEnabled {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Query Docker Swarm API to find running services
 | |
| 	// For production deployment, this would query the Docker API
 | |
| 	// For MVP, we'll check for service-specific health endpoints
 | |
| 
 | |
| 	servicePorts := d.config.ServicePorts
 | |
| 	serviceHosts := []string{"chorus", "chorus-agent", d.config.ServiceName}
 | |
| 
 | |
| 	for _, host := range serviceHosts {
 | |
| 		for _, port := range servicePorts {
 | |
| 			d.checkServiceEndpoint(host, port)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // discoverKnownEndpoints checks configured known endpoints for CHORUS agents
 | |
| func (d *Discovery) discoverKnownEndpoints() {
 | |
| 	for _, endpoint := range d.config.KnownEndpoints {
 | |
| 		d.queryServiceEndpoint(endpoint)
 | |
| 	}
 | |
| 
 | |
| 	// Check environment variables for additional endpoints
 | |
| 	if endpoints := os.Getenv("CHORUS_DISCOVERY_ENDPOINTS"); endpoints != "" {
 | |
| 		for _, endpoint := range strings.Split(endpoints, ",") {
 | |
| 			endpoint = strings.TrimSpace(endpoint)
 | |
| 			if endpoint != "" {
 | |
| 				d.queryServiceEndpoint(endpoint)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // checkServiceEndpoint checks a specific host:port combination for a CHORUS agent
 | |
| func (d *Discovery) checkServiceEndpoint(host string, port int) {
 | |
| 	endpoint := fmt.Sprintf("http://%s:%d", host, port)
 | |
| 	d.queryServiceEndpoint(endpoint)
 | |
| }
 | |
| 
 | |
| // queryServiceEndpoint attempts to discover a CHORUS agent at the given endpoint
 | |
| func (d *Discovery) queryServiceEndpoint(endpoint string) {
 | |
| 	client := &http.Client{Timeout: d.config.HealthTimeout}
 | |
| 
 | |
| 	// Try multiple health check paths
 | |
| 	healthPaths := []string{"/health", "/api/health", "/api/v1/health", "/status"}
 | |
| 
 | |
| 	for _, path := range healthPaths {
 | |
| 		fullURL := endpoint + path
 | |
| 		resp, err := client.Get(fullURL)
 | |
| 		if err != nil {
 | |
| 			log.Debug().
 | |
| 				Err(err).
 | |
| 				Str("endpoint", fullURL).
 | |
| 				Msg("Failed to reach service endpoint")
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if resp.StatusCode == http.StatusOK {
 | |
| 			d.processServiceResponse(endpoint, resp)
 | |
| 			resp.Body.Close()
 | |
| 			return // Found working endpoint
 | |
| 		}
 | |
| 		resp.Body.Close()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // processServiceResponse processes a successful health check response
 | |
| func (d *Discovery) processServiceResponse(endpoint string, resp *http.Response) {
 | |
| 	// Try to parse response for agent metadata
 | |
| 	var agentInfo struct {
 | |
| 		ID           string                 `json:"id"`
 | |
| 		Name         string                 `json:"name"`
 | |
| 		Status       string                 `json:"status"`
 | |
| 		Capabilities []string               `json:"capabilities"`
 | |
| 		Model        string                 `json:"model"`
 | |
| 		PeerID       string                 `json:"peer_id"`
 | |
| 		Metadata     map[string]interface{} `json:"metadata"`
 | |
| 	}
 | |
| 
 | |
| 	if err := json.NewDecoder(resp.Body).Decode(&agentInfo); err != nil {
 | |
| 		// If parsing fails, create a basic agent entry
 | |
| 		d.createBasicAgentFromEndpoint(endpoint)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	apiEndpoint, host := normalizeAPIEndpoint(endpoint)
 | |
| 	p2pAddr := endpoint
 | |
| 	if host != "" {
 | |
| 		p2pAddr = fmt.Sprintf("%s:%d", host, 9000)
 | |
| 	}
 | |
| 
 | |
| 	// Build multiaddr from peer_id if available
 | |
| 	if agentInfo.PeerID != "" && host != "" {
 | |
| 		p2pAddr = fmt.Sprintf("/ip4/%s/tcp/9000/p2p/%s", host, agentInfo.PeerID)
 | |
| 	}
 | |
| 
 | |
| 	// Create detailed agent from parsed info
 | |
| 	agent := &Agent{
 | |
| 		ID:           agentInfo.ID,
 | |
| 		Name:         agentInfo.Name,
 | |
| 		Status:       agentInfo.Status,
 | |
| 		Capabilities: agentInfo.Capabilities,
 | |
| 		Model:        agentInfo.Model,
 | |
| 		PeerID:       agentInfo.PeerID,
 | |
| 		Endpoint:     apiEndpoint,
 | |
| 		LastSeen:     time.Now(),
 | |
| 		P2PAddr:      p2pAddr,
 | |
| 		ClusterID:    "docker-unified-stack",
 | |
| 	}
 | |
| 
 | |
| 	// Set defaults if fields are empty
 | |
| 	if agent.ID == "" {
 | |
| 		agent.ID = fmt.Sprintf("chorus-agent-%s", strings.ReplaceAll(apiEndpoint, ":", "-"))
 | |
| 	}
 | |
| 	if agent.Name == "" {
 | |
| 		agent.Name = "CHORUS Agent"
 | |
| 	}
 | |
| 	if agent.Status == "" {
 | |
| 		agent.Status = "online"
 | |
| 	}
 | |
| 	if len(agent.Capabilities) == 0 {
 | |
| 		agent.Capabilities = []string{
 | |
| 			"general_development",
 | |
| 			"task_coordination",
 | |
| 			"ai_integration",
 | |
| 			"code_analysis",
 | |
| 			"autonomous_development",
 | |
| 		}
 | |
| 	}
 | |
| 	if agent.Model == "" {
 | |
| 		agent.Model = "llama3.1:8b"
 | |
| 	}
 | |
| 
 | |
| 	d.addOrUpdateAgent(agent)
 | |
| 
 | |
| 	log.Info().
 | |
| 		Str("agent_id", agent.ID).
 | |
| 		Str("peer_id", agent.PeerID).
 | |
| 		Str("endpoint", endpoint).
 | |
| 		Msg("🤖 Discovered CHORUS agent with metadata")
 | |
| }
 | |
| 
 | |
| // createBasicAgentFromEndpoint creates a basic agent entry when detailed info isn't available
 | |
| func (d *Discovery) createBasicAgentFromEndpoint(endpoint string) {
 | |
| 	apiEndpoint, host := normalizeAPIEndpoint(endpoint)
 | |
| 	agentID := fmt.Sprintf("chorus-agent-%s", strings.ReplaceAll(apiEndpoint, ":", "-"))
 | |
| 
 | |
| 	p2pAddr := endpoint
 | |
| 	if host != "" {
 | |
| 		p2pAddr = fmt.Sprintf("%s:%d", host, 9000)
 | |
| 	}
 | |
| 
 | |
| 	agent := &Agent{
 | |
| 		ID:     agentID,
 | |
| 		Name:   "CHORUS Agent",
 | |
| 		Status: "online",
 | |
| 		Capabilities: []string{
 | |
| 			"general_development",
 | |
| 			"task_coordination",
 | |
| 			"ai_integration",
 | |
| 		},
 | |
| 		Model:          "llama3.1:8b",
 | |
| 		Endpoint:       apiEndpoint,
 | |
| 		LastSeen:       time.Now(),
 | |
| 		TasksCompleted: 0,
 | |
| 		P2PAddr:        p2pAddr,
 | |
| 		ClusterID:      "docker-unified-stack",
 | |
| 	}
 | |
| 
 | |
| 	d.addOrUpdateAgent(agent)
 | |
| 
 | |
| 	log.Info().
 | |
| 		Str("agent_id", agentID).
 | |
| 		Str("endpoint", endpoint).
 | |
| 		Msg("🤖 Discovered basic CHORUS agent")
 | |
| }
 | |
| 
 | |
| // AgentHealthResponse represents the expected health response format
 | |
| type AgentHealthResponse struct {
 | |
| 	ID             string                 `json:"id"`
 | |
| 	Name           string                 `json:"name"`
 | |
| 	Status         string                 `json:"status"`
 | |
| 	Capabilities   []string               `json:"capabilities"`
 | |
| 	Model          string                 `json:"model"`
 | |
| 	PeerID         string                 `json:"peer_id"`
 | |
| 	LastSeen       time.Time              `json:"last_seen"`
 | |
| 	TasksCompleted int                    `json:"tasks_completed"`
 | |
| 	Metadata       map[string]interface{} `json:"metadata"`
 | |
| }
 |