package api import ( "encoding/json" "fmt" "net/http" "os" "strconv" "strings" "time" "chorus/internal/council" "chorus/internal/logging" "chorus/p2p" "chorus/pkg/config" "chorus/pubsub" "github.com/gorilla/mux" "github.com/rs/zerolog" ) // HTTPServer provides HTTP API endpoints for CHORUS type HTTPServer struct { port int hypercoreLog *logging.HypercoreLog pubsub *pubsub.PubSub node *p2p.Node // P2P node for peer ID and network info server *http.Server CouncilManager *council.Manager // Exported for brief processing whooshEndpoint string logger zerolog.Logger } // NewHTTPServer creates a new HTTP server for CHORUS API func NewHTTPServer(cfg *config.Config, node *p2p.Node, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer { agentID := cfg.Agent.ID agentName := deriveAgentName(cfg) endpoint := deriveAgentEndpoint(cfg) p2pAddr := deriveAgentP2PAddress(cfg, node) capabilities := cfg.Agent.Capabilities if len(capabilities) == 0 { capabilities = []string{"general_development", "task_coordination"} } councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities) whooshEndpoint := overrideWhooshEndpoint(cfg) return &HTTPServer{ port: cfg.Network.APIPort, hypercoreLog: hlog, pubsub: ps, node: node, CouncilManager: councilMgr, whooshEndpoint: strings.TrimRight(whooshEndpoint, "/"), logger: logging.ForComponent(logging.ComponentServer), } } // WhooshEndpoint returns the WHOOSH base endpoint configured for this agent. func (h *HTTPServer) WhooshEndpoint() string { return h.whooshEndpoint } func deriveAgentName(cfg *config.Config) string { if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_NAME")); v != "" { return v } if cfg.Agent.Specialization != "" { return cfg.Agent.Specialization } return cfg.Agent.ID } func deriveAgentEndpoint(cfg *config.Config) string { if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_ENDPOINT")); v != "" { return strings.TrimRight(v, "/") } host := strings.TrimSpace(os.Getenv("CHORUS_AGENT_SERVICE_HOST")) if host == "" { host = "chorus" } scheme := strings.TrimSpace(os.Getenv("CHORUS_AGENT_ENDPOINT_SCHEME")) if scheme == "" { scheme = "http" } return fmt.Sprintf("%s://%s:%d", scheme, host, cfg.Network.APIPort) } func deriveAgentP2PAddress(cfg *config.Config, node *p2p.Node) string { if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_P2P_ENDPOINT")); v != "" { return v } if node != nil { addrs := node.Addresses() if len(addrs) > 0 { return fmt.Sprintf("%s/p2p/%s", addrs[0], node.ID()) } } host := strings.TrimSpace(os.Getenv("CHORUS_AGENT_SERVICE_HOST")) if host == "" { host = "chorus" } return fmt.Sprintf("%s:%d", host, cfg.Network.P2PPort) } func overrideWhooshEndpoint(cfg *config.Config) string { if v := strings.TrimSpace(os.Getenv("CHORUS_WHOOSH_ENDPOINT")); v != "" { return strings.TrimRight(v, "/") } candidate := cfg.WHOOSHAPI.BaseURL if candidate == "" { candidate = cfg.WHOOSHAPI.URL } if candidate == "" { return "http://whoosh:8080" } trimmed := strings.TrimRight(candidate, "/") if strings.Contains(trimmed, "localhost") || strings.Contains(trimmed, "127.0.0.1") { return "http://whoosh:8080" } return trimmed } // Start starts the HTTP server func (h *HTTPServer) Start() error { router := mux.NewRouter() // Enable CORS for all routes router.Use(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") if r.Method == "OPTIONS" { w.WriteHeader(http.StatusOK) return } next.ServeHTTP(w, r) }) }) // API routes api := router.PathPrefix("/api").Subrouter() // Hypercore log endpoints api.HandleFunc("/hypercore/logs", h.handleGetLogs).Methods("GET") api.HandleFunc("/hypercore/logs/recent", h.handleGetRecentLogs).Methods("GET") api.HandleFunc("/hypercore/logs/stats", h.handleGetLogStats).Methods("GET") api.HandleFunc("/hypercore/logs/since/{index}", h.handleGetLogsSince).Methods("GET") // Health check api.HandleFunc("/health", h.handleHealth).Methods("GET") // Status endpoint api.HandleFunc("/status", h.handleStatus).Methods("GET") // Council opportunity endpoints (v1) v1 := api.PathPrefix("/v1").Subrouter() v1.HandleFunc("/opportunities/council", h.handleCouncilOpportunity).Methods("POST") v1.HandleFunc("/councils/status", h.handleCouncilStatusUpdate).Methods("POST") v1.HandleFunc("/councils/{councilID}/roles/{roleName}/brief", h.handleCouncilBrief).Methods("POST") h.server = &http.Server{ Addr: fmt.Sprintf(":%d", h.port), Handler: router, ReadTimeout: 15 * time.Second, WriteTimeout: 15 * time.Second, IdleTimeout: 60 * time.Second, } h.logger.Info().Int("port", h.port).Msg("Starting HTTP API server") return h.server.ListenAndServe() } // Stop stops the HTTP server func (h *HTTPServer) Stop() error { if h.server != nil { return h.server.Close() } return nil } // handleGetLogs returns hypercore log entries func (h *HTTPServer) handleGetLogs(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") // Parse query parameters query := r.URL.Query() startStr := query.Get("start") endStr := query.Get("end") limitStr := query.Get("limit") var start, end uint64 var err error if startStr != "" { start, err = strconv.ParseUint(startStr, 10, 64) if err != nil { http.Error(w, "Invalid start parameter", http.StatusBadRequest) return } } if endStr != "" { end, err = strconv.ParseUint(endStr, 10, 64) if err != nil { http.Error(w, "Invalid end parameter", http.StatusBadRequest) return } } else { end = h.hypercoreLog.Length() } var limit int = 100 // Default limit if limitStr != "" { limit, err = strconv.Atoi(limitStr) if err != nil || limit <= 0 || limit > 1000 { limit = 100 } } // Get log entries var entries []logging.LogEntry if endStr != "" || startStr != "" { entries, err = h.hypercoreLog.GetRange(start, end) } else { entries, err = h.hypercoreLog.GetRecentEntries(limit) } if err != nil { http.Error(w, fmt.Sprintf("Failed to get log entries: %v", err), http.StatusInternalServerError) return } response := map[string]interface{}{ "entries": entries, "count": len(entries), "timestamp": time.Now().Unix(), "total": h.hypercoreLog.Length(), } json.NewEncoder(w).Encode(response) } // handleGetRecentLogs returns the most recent log entries func (h *HTTPServer) handleGetRecentLogs(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") // Parse limit parameter query := r.URL.Query() limitStr := query.Get("limit") limit := 50 // Default if limitStr != "" { if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { limit = l } } entries, err := h.hypercoreLog.GetRecentEntries(limit) if err != nil { http.Error(w, fmt.Sprintf("Failed to get recent entries: %v", err), http.StatusInternalServerError) return } response := map[string]interface{}{ "entries": entries, "count": len(entries), "timestamp": time.Now().Unix(), "total": h.hypercoreLog.Length(), } json.NewEncoder(w).Encode(response) } // handleGetLogsSince returns log entries since a given index func (h *HTTPServer) handleGetLogsSince(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") vars := mux.Vars(r) indexStr := vars["index"] index, err := strconv.ParseUint(indexStr, 10, 64) if err != nil { http.Error(w, "Invalid index parameter", http.StatusBadRequest) return } entries, err := h.hypercoreLog.GetEntriesSince(index) if err != nil { http.Error(w, fmt.Sprintf("Failed to get entries since index: %v", err), http.StatusInternalServerError) return } response := map[string]interface{}{ "entries": entries, "count": len(entries), "since_index": index, "timestamp": time.Now().Unix(), "total": h.hypercoreLog.Length(), } json.NewEncoder(w).Encode(response) } // handleGetLogStats returns statistics about the hypercore log func (h *HTTPServer) handleGetLogStats(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") stats := h.hypercoreLog.GetStats() json.NewEncoder(w).Encode(stats) } // handleHealth returns health status with P2P network information func (h *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") health := map[string]interface{}{ "status": "healthy", "timestamp": time.Now().Unix(), "log_entries": h.hypercoreLog.Length(), } // Add P2P network information if node is available if h.node != nil { // Get peer ID health["peer_id"] = h.node.ID().String() // Build complete multiaddrs with peer ID using actual container IPs // This is required for Docker Swarm because the service VIP load-balances // and would cause peer ID mismatches when connecting to different replicas var multiaddrs []string rawAddrs := h.node.Addresses() // Log what addresses we're getting from the node h.logger.Debug().Int("address_count", len(rawAddrs)).Msg("Processing node addresses") for i, addr := range rawAddrs { h.logger.Debug().Int("index", i).Str("address", addr.String()).Msg("Raw address") } for _, addr := range rawAddrs { addrStr := addr.String() // Extract IP and port from multiaddr var ip, port string if strings.Contains(addrStr, "/ip4/") && strings.Contains(addrStr, "/tcp/") { parts := strings.Split(addrStr, "/") for i := 0; i < len(parts)-1; i++ { if parts[i] == "ip4" { ip = parts[i+1] } if parts[i] == "tcp" { port = parts[i+1] } } } // Skip localhost addresses if ip == "127.0.0.1" || ip == "::1" { continue } // Build IP-based multiaddr for direct P2P connections // This bypasses the Docker Swarm VIP and allows direct connection to this specific replica if ip != "" && port != "" { multiaddr := fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", ip, port, h.node.ID().String()) h.logger.Debug().Str("multiaddr", multiaddr).Msg("Built multiaddr") multiaddrs = append(multiaddrs, multiaddr) } } health["multiaddrs"] = multiaddrs // Add connected peer count connectedPeers := h.node.ConnectedPeers() health["connected_peers"] = connectedPeers // P2P Connectivity Status - critical for detecting mesh issues p2pStatus := "healthy" if connectedPeers == 0 { p2pStatus = "isolated" // No peers - serious issue health["status"] = "degraded" } else if connectedPeers < 3 { p2pStatus = "limited" // Few peers - potential discovery issue } health["p2p_status"] = p2pStatus // Add DHT status if available if h.node.DHT() != nil { health["dht_enabled"] = true // DHT routing table size indicates how many nodes we know about health["dht_routing_table_size"] = h.node.DHT().GetDHTSize() } else { health["dht_enabled"] = false } // Add GossipSub topics (static topics that agents join) health["gossipsub_topics"] = []string{ "CHORUS/coordination/v1", "hmmm/meta-discussion/v1", "CHORUS/context-feedback/v1", } // Add bootstrap status health["bootstrap_peers_configured"] = len(h.node.BootstrapPeers()) } json.NewEncoder(w).Encode(health) } // handleStatus returns detailed status information func (h *HTTPServer) handleStatus(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") status := map[string]interface{}{ "status": "running", "timestamp": time.Now().Unix(), "hypercore": h.hypercoreLog.GetStats(), "api_version": "1.0.0", } json.NewEncoder(w).Encode(status) } // handleCouncilOpportunity receives council formation opportunities from WHOOSH func (h *HTTPServer) handleCouncilOpportunity(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") var opportunity council.CouncilOpportunity if err := json.NewDecoder(r.Body).Decode(&opportunity); err != nil { http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest) return } // Log the received opportunity to hypercore logData := map[string]interface{}{ "event": "council_opportunity_received", "council_id": opportunity.CouncilID, "project_name": opportunity.ProjectName, "repository": opportunity.Repository, "core_roles": len(opportunity.CoreRoles), "optional_roles": len(opportunity.OptionalRoles), "ucxl_address": opportunity.UCXLAddress, "message": fmt.Sprintf("Received council opportunity for project: %s", opportunity.ProjectName), } if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil { h.logger.Warn().Err(err).Msg("Failed to log council opportunity") } // Log council opportunity with structured logging h.logger.Info(). Str("council_id", opportunity.CouncilID). Str("project_name", opportunity.ProjectName). Str("repository", opportunity.Repository). Int("core_roles", len(opportunity.CoreRoles)). Int("optional_roles", len(opportunity.OptionalRoles)). Str("ucxl_address", opportunity.UCXLAddress). Msg("Council opportunity received") // Log available roles for _, role := range opportunity.CoreRoles { h.logger.Info(). Str("agent_name", role.AgentName). Str("role_name", role.RoleName). Str("role_type", "CORE"). Msg("Available role") } for _, role := range opportunity.OptionalRoles { h.logger.Info(). Str("agent_name", role.AgentName). Str("role_name", role.RoleName). Str("role_type", "OPTIONAL"). Msg("Available role") } // Evaluate the opportunity and claim a role if suitable go func() { if err := h.CouncilManager.EvaluateOpportunity(&opportunity, h.whooshEndpoint); err != nil { h.logger.Warn().Err(err).Msg("Failed to evaluate/claim council role") } }() response := map[string]interface{}{ "status": "received", "council_id": opportunity.CouncilID, "message": "Council opportunity received and being evaluated", "timestamp": time.Now().Unix(), "agent_id": h.CouncilManager.AgentID(), } w.WriteHeader(http.StatusAccepted) json.NewEncoder(w).Encode(response) } // handleCouncilStatusUpdate receives council staffing updates from WHOOSH func (h *HTTPServer) handleCouncilStatusUpdate(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") type roleCountsPayload struct { Total int `json:"total"` Claimed int `json:"claimed"` } type councilStatusPayload struct { CouncilID string `json:"council_id"` ProjectName string `json:"project_name"` Status string `json:"status"` Message string `json:"message"` Timestamp time.Time `json:"timestamp"` CoreRoles roleCountsPayload `json:"core_roles"` Optional roleCountsPayload `json:"optional_roles"` } var payload councilStatusPayload if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest) return } if payload.CouncilID == "" { http.Error(w, "council_id is required", http.StatusBadRequest) return } if payload.Status == "" { payload.Status = "unknown" } if payload.Timestamp.IsZero() { payload.Timestamp = time.Now() } if payload.Message == "" { payload.Message = fmt.Sprintf("Council status update: %s (core %d/%d, optional %d/%d)", payload.Status, payload.CoreRoles.Claimed, payload.CoreRoles.Total, payload.Optional.Claimed, payload.Optional.Total, ) } logData := map[string]interface{}{ "event": "council_status_update", "council_id": payload.CouncilID, "project_name": payload.ProjectName, "status": payload.Status, "message": payload.Message, "timestamp": payload.Timestamp.Format(time.RFC3339), "core_roles_total": payload.CoreRoles.Total, "core_roles_claimed": payload.CoreRoles.Claimed, "optional_roles_total": payload.Optional.Total, "optional_roles_claimed": payload.Optional.Claimed, } if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil { h.logger.Warn().Err(err).Msg("Failed to log council status update") } h.logger.Info(). Str("council_id", payload.CouncilID). Str("project_name", payload.ProjectName). Str("status", payload.Status). Int("core_roles_claimed", payload.CoreRoles.Claimed). Int("core_roles_total", payload.CoreRoles.Total). Int("optional_roles_claimed", payload.Optional.Claimed). Int("optional_roles_total", payload.Optional.Total). Str("message", payload.Message). Msg("Council status update") response := map[string]interface{}{ "status": "received", "council_id": payload.CouncilID, "timestamp": payload.Timestamp.Unix(), } w.WriteHeader(http.StatusAccepted) json.NewEncoder(w).Encode(response) } func (h *HTTPServer) handleCouncilBrief(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") vars := mux.Vars(r) councilID := vars["councilID"] roleName := vars["roleName"] if councilID == "" || roleName == "" { http.Error(w, "councilID and roleName are required", http.StatusBadRequest) return } var brief council.CouncilBrief if err := json.NewDecoder(r.Body).Decode(&brief); err != nil { http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest) return } brief.CouncilID = councilID brief.RoleName = roleName h.logger.Info(). Str("council_id", councilID). Str("role_name", roleName). Str("brief_url", brief.BriefURL). Str("summary", brief.Summary). Msg("Received council brief") if h.CouncilManager != nil { h.CouncilManager.HandleCouncilBrief(councilID, roleName, &brief) } logData := map[string]interface{}{ "event": "council_brief_received", "council_id": councilID, "role_name": roleName, "project_name": brief.ProjectName, "repository": brief.Repository, "brief_url": brief.BriefURL, "ucxl_address": brief.UCXLAddress, "hmmm_topic": brief.HMMMTopic, "expected_artifacts": brief.ExpectedArtifacts, "timestamp": time.Now().Format(time.RFC3339), } if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil { h.logger.Warn().Err(err).Msg("Failed to log council brief") } response := map[string]interface{}{ "status": "received", "council_id": councilID, "role_name": roleName, "timestamp": time.Now().Unix(), } w.WriteHeader(http.StatusAccepted) json.NewEncoder(w).Encode(response) }