diff --git a/api/http_server.go b/api/http_server.go index e59a3a5..3e1d494 100644 --- a/api/http_server.go +++ b/api/http_server.go @@ -4,10 +4,15 @@ import ( "encoding/json" "fmt" "net/http" + "os" "strconv" + "strings" "time" + "chorus/internal/council" "chorus/internal/logging" + "chorus/p2p" + "chorus/pkg/config" "chorus/pubsub" "github.com/gorilla/mux" @@ -15,19 +20,96 @@ import ( // HTTPServer provides HTTP API endpoints for CHORUS type HTTPServer struct { - port int - hypercoreLog *logging.HypercoreLog - pubsub *pubsub.PubSub - server *http.Server + port int + hypercoreLog *logging.HypercoreLog + pubsub *pubsub.PubSub + server *http.Server + CouncilManager *council.Manager // Exported for brief processing + whooshEndpoint string } // NewHTTPServer creates a new HTTP server for CHORUS API -func NewHTTPServer(port int, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer { - return &HTTPServer{ - port: port, - hypercoreLog: hlog, - pubsub: ps, +func NewHTTPServer(cfg *config.Config, node *p2p.Node, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer { + agentID := cfg.Agent.ID + agentName := deriveAgentName(cfg) + endpoint := deriveAgentEndpoint(cfg) + p2pAddr := deriveAgentP2PAddress(cfg, node) + capabilities := cfg.Agent.Capabilities + if len(capabilities) == 0 { + capabilities = []string{"general_development", "task_coordination"} } + + councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities) + + whooshEndpoint := overrideWhooshEndpoint(cfg) + + return &HTTPServer{ + port: cfg.Network.APIPort, + hypercoreLog: hlog, + pubsub: ps, + CouncilManager: councilMgr, + whooshEndpoint: strings.TrimRight(whooshEndpoint, "/"), + } +} + +func deriveAgentName(cfg *config.Config) string { + if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_NAME")); v != "" { + return v + } + if cfg.Agent.Specialization != "" { + return cfg.Agent.Specialization + } + return cfg.Agent.ID +} + +func deriveAgentEndpoint(cfg *config.Config) string { + if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_ENDPOINT")); v != "" { + return strings.TrimRight(v, "/") + } + host := strings.TrimSpace(os.Getenv("CHORUS_AGENT_SERVICE_HOST")) + if host == "" { + host = "chorus" + } + scheme := strings.TrimSpace(os.Getenv("CHORUS_AGENT_ENDPOINT_SCHEME")) + if scheme == "" { + scheme = "http" + } + return fmt.Sprintf("%s://%s:%d", scheme, host, cfg.Network.APIPort) +} + +func deriveAgentP2PAddress(cfg *config.Config, node *p2p.Node) string { + if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_P2P_ENDPOINT")); v != "" { + return v + } + if node != nil { + addrs := node.Addresses() + if len(addrs) > 0 { + return fmt.Sprintf("%s/p2p/%s", addrs[0], node.ID()) + } + } + host := strings.TrimSpace(os.Getenv("CHORUS_AGENT_SERVICE_HOST")) + if host == "" { + host = "chorus" + } + return fmt.Sprintf("%s:%d", host, cfg.Network.P2PPort) +} + +func overrideWhooshEndpoint(cfg *config.Config) string { + if v := strings.TrimSpace(os.Getenv("CHORUS_WHOOSH_ENDPOINT")); v != "" { + return strings.TrimRight(v, "/") + } + candidate := cfg.WHOOSHAPI.BaseURL + if candidate == "" { + candidate = cfg.WHOOSHAPI.URL + } + if candidate == "" { + return "http://whoosh:8080" + } + trimmed := strings.TrimRight(candidate, "/") + if strings.Contains(trimmed, "localhost") || strings.Contains(trimmed, "127.0.0.1") { + return "http://whoosh:8080" + } + return trimmed } // Start starts the HTTP server @@ -65,6 +147,12 @@ func (h *HTTPServer) Start() error { // Status endpoint api.HandleFunc("/status", h.handleStatus).Methods("GET") + // Council opportunity endpoints (v1) + v1 := api.PathPrefix("/v1").Subrouter() + v1.HandleFunc("/opportunities/council", h.handleCouncilOpportunity).Methods("POST") + v1.HandleFunc("/councils/status", h.handleCouncilStatusUpdate).Methods("POST") + v1.HandleFunc("/councils/{councilID}/roles/{roleName}/brief", h.handleCouncilBrief).Methods("POST") + h.server = &http.Server{ Addr: fmt.Sprintf(":%d", h.port), Handler: router, @@ -242,3 +330,209 @@ func (h *HTTPServer) handleStatus(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(status) } + +// handleCouncilOpportunity receives council formation opportunities from WHOOSH +func (h *HTTPServer) handleCouncilOpportunity(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + var opportunity council.CouncilOpportunity + if err := json.NewDecoder(r.Body).Decode(&opportunity); err != nil { + http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest) + return + } + + // Log the received opportunity to hypercore + logData := map[string]interface{}{ + "event": "council_opportunity_received", + "council_id": opportunity.CouncilID, + "project_name": opportunity.ProjectName, + "repository": opportunity.Repository, + "core_roles": len(opportunity.CoreRoles), + "optional_roles": len(opportunity.OptionalRoles), + "ucxl_address": opportunity.UCXLAddress, + "message": fmt.Sprintf("šŸ“” Received council opportunity for project: %s", opportunity.ProjectName), + } + + if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil { + fmt.Printf("Failed to log council opportunity: %v\n", err) + } + + // Log to console for immediate visibility + fmt.Printf("\nšŸ“” COUNCIL OPPORTUNITY RECEIVED\n") + fmt.Printf(" Council ID: %s\n", opportunity.CouncilID) + fmt.Printf(" Project: %s\n", opportunity.ProjectName) + fmt.Printf(" Repository: %s\n", opportunity.Repository) + fmt.Printf(" Core Roles: %d\n", len(opportunity.CoreRoles)) + fmt.Printf(" Optional Roles: %d\n", len(opportunity.OptionalRoles)) + fmt.Printf(" UCXL: %s\n", opportunity.UCXLAddress) + fmt.Printf("\n Available Roles:\n") + for _, role := range opportunity.CoreRoles { + fmt.Printf(" - %s (%s) [CORE]\n", role.AgentName, role.RoleName) + } + for _, role := range opportunity.OptionalRoles { + fmt.Printf(" - %s (%s) [OPTIONAL]\n", role.AgentName, role.RoleName) + } + fmt.Printf("\n") + + // Evaluate the opportunity and claim a role if suitable + go func() { + if err := h.CouncilManager.EvaluateOpportunity(&opportunity, h.whooshEndpoint); err != nil { + fmt.Printf("Failed to evaluate/claim council role: %v\n", err) + } + }() + + response := map[string]interface{}{ + "status": "received", + "council_id": opportunity.CouncilID, + "message": "Council opportunity received and being evaluated", + "timestamp": time.Now().Unix(), + "agent_id": h.CouncilManager.AgentID(), + } + + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(response) +} + +// handleCouncilStatusUpdate receives council staffing updates from WHOOSH +func (h *HTTPServer) handleCouncilStatusUpdate(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + type roleCountsPayload struct { + Total int `json:"total"` + Claimed int `json:"claimed"` + } + + type councilStatusPayload struct { + CouncilID string `json:"council_id"` + ProjectName string `json:"project_name"` + Status string `json:"status"` + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` + CoreRoles roleCountsPayload `json:"core_roles"` + Optional roleCountsPayload `json:"optional_roles"` + } + + var payload councilStatusPayload + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest) + return + } + + if payload.CouncilID == "" { + http.Error(w, "council_id is required", http.StatusBadRequest) + return + } + + if payload.Status == "" { + payload.Status = "unknown" + } + + if payload.Timestamp.IsZero() { + payload.Timestamp = time.Now() + } + + if payload.Message == "" { + payload.Message = fmt.Sprintf("Council status update: %s (core %d/%d, optional %d/%d)", + payload.Status, + payload.CoreRoles.Claimed, payload.CoreRoles.Total, + payload.Optional.Claimed, payload.Optional.Total, + ) + } + + logData := map[string]interface{}{ + "event": "council_status_update", + "council_id": payload.CouncilID, + "project_name": payload.ProjectName, + "status": payload.Status, + "message": payload.Message, + "timestamp": payload.Timestamp.Format(time.RFC3339), + "core_roles_total": payload.CoreRoles.Total, + "core_roles_claimed": payload.CoreRoles.Claimed, + "optional_roles_total": payload.Optional.Total, + "optional_roles_claimed": payload.Optional.Claimed, + } + + if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil { + fmt.Printf("Failed to log council status update: %v\n", err) + } + + fmt.Printf("\nšŸ COUNCIL STATUS UPDATE\n") + fmt.Printf(" Council ID: %s\n", payload.CouncilID) + if payload.ProjectName != "" { + fmt.Printf(" Project: %s\n", payload.ProjectName) + } + fmt.Printf(" Status: %s\n", payload.Status) + fmt.Printf(" Core Roles: %d/%d claimed\n", payload.CoreRoles.Claimed, payload.CoreRoles.Total) + fmt.Printf(" Optional Roles: %d/%d claimed\n", payload.Optional.Claimed, payload.Optional.Total) + fmt.Printf(" Message: %s\n\n", payload.Message) + + response := map[string]interface{}{ + "status": "received", + "council_id": payload.CouncilID, + "timestamp": payload.Timestamp.Unix(), + } + + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(response) +} + +func (h *HTTPServer) handleCouncilBrief(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + vars := mux.Vars(r) + councilID := vars["councilID"] + roleName := vars["roleName"] + + if councilID == "" || roleName == "" { + http.Error(w, "councilID and roleName are required", http.StatusBadRequest) + return + } + + var brief council.CouncilBrief + if err := json.NewDecoder(r.Body).Decode(&brief); err != nil { + http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest) + return + } + + brief.CouncilID = councilID + brief.RoleName = roleName + + fmt.Printf("\nšŸ“¦ Received council brief for %s (%s)\n", councilID, roleName) + if brief.BriefURL != "" { + fmt.Printf(" Brief URL: %s\n", brief.BriefURL) + } + if brief.Summary != "" { + fmt.Printf(" Summary: %s\n", brief.Summary) + } + + if h.CouncilManager != nil { + h.CouncilManager.HandleCouncilBrief(councilID, roleName, &brief) + } + + logData := map[string]interface{}{ + "event": "council_brief_received", + "council_id": councilID, + "role_name": roleName, + "project_name": brief.ProjectName, + "repository": brief.Repository, + "brief_url": brief.BriefURL, + "ucxl_address": brief.UCXLAddress, + "hmmm_topic": brief.HMMMTopic, + "expected_artifacts": brief.ExpectedArtifacts, + "timestamp": time.Now().Format(time.RFC3339), + } + + if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil { + fmt.Printf("Failed to log council brief: %v\n", err) + } + + response := map[string]interface{}{ + "status": "received", + "council_id": councilID, + "role_name": roleName, + "timestamp": time.Now().Unix(), + } + + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(response) +} diff --git a/internal/council/manager.go b/internal/council/manager.go index 3e60fe5..e279ae8 100644 --- a/internal/council/manager.go +++ b/internal/council/manager.go @@ -433,6 +433,11 @@ func (m *Manager) currentAssignmentSnapshot() *RoleAssignment { return m.currentAssignment } +// GetCurrentAssignment returns the current role assignment (public accessor) +func (m *Manager) GetCurrentAssignment() *RoleAssignment { + return m.currentAssignmentSnapshot() +} + // roleClaimResponse mirrors WHOOSH role claim response payload. type roleClaimResponse struct { Status string `json:"status"` diff --git a/internal/runtime/agent_support.go b/internal/runtime/agent_support.go index 04519d8..951e24f 100644 --- a/internal/runtime/agent_support.go +++ b/internal/runtime/agent_support.go @@ -1,12 +1,18 @@ package runtime import ( + "bytes" "context" + "encoding/json" "fmt" + "net/http" "time" + "chorus/internal/council" "chorus/internal/logging" + "chorus/pkg/ai" "chorus/pkg/dht" + "chorus/pkg/execution" "chorus/pkg/health" "chorus/pkg/shutdown" "chorus/pubsub" @@ -39,6 +45,10 @@ func (r *SharedRuntime) StartAgentMode() error { // Start status reporting go r.statusReporter() + // Start council brief processing + ctx := context.Background() + go r.processBriefs(ctx) + r.Logger.Info("šŸ” Listening for peers on container network...") r.Logger.Info("šŸ“” Ready for task coordination and meta-discussion") r.Logger.Info("šŸŽÆ HMMM collaborative reasoning enabled") @@ -321,3 +331,185 @@ func (r *SharedRuntime) setupGracefulShutdown(shutdownManager *shutdown.Manager, r.Logger.Info("šŸ›”ļø Graceful shutdown components registered") } + +// processBriefs polls for council briefs and executes them +func (r *SharedRuntime) processBriefs(ctx context.Context) { + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + r.Logger.Info("šŸ“¦ Brief processing loop started") + + for { + select { + case <-ctx.Done(): + r.Logger.Info("šŸ“¦ Brief processing loop stopped") + return + case <-ticker.C: + if r.HTTPServer == nil || r.HTTPServer.CouncilManager == nil { + continue + } + + assignment := r.HTTPServer.CouncilManager.GetCurrentAssignment() + if assignment == nil || assignment.Brief == nil { + continue + } + + // Check if we have a brief to execute + brief := assignment.Brief + if brief.BriefURL == "" && brief.Summary == "" { + continue + } + + r.Logger.Info("šŸ“¦ Processing design brief for council %s, role %s", assignment.CouncilID, assignment.RoleName) + + // Execute the brief + if err := r.executeBrief(ctx, assignment); err != nil { + r.Logger.Error("āŒ Failed to execute brief: %v", err) + continue + } + + r.Logger.Info("āœ… Brief execution completed for council %s", assignment.CouncilID) + + // Clear the brief after execution to prevent re-execution + assignment.Brief = nil + } + } +} + +// executeBrief executes a council brief using the ExecutionEngine +func (r *SharedRuntime) executeBrief(ctx context.Context, assignment *council.RoleAssignment) error { + brief := assignment.Brief + if brief == nil { + return fmt.Errorf("no brief to execute") + } + + // Create execution engine + engine := execution.NewTaskExecutionEngine() + + // Create AI provider factory + aiFactory := ai.NewProviderFactory() + + engineConfig := &execution.EngineConfig{ + AIProviderFactory: aiFactory, + MaxConcurrentTasks: 1, + DefaultTimeout: time.Hour, + EnableMetrics: true, + LogLevel: "info", + } + + if err := engine.Initialize(ctx, engineConfig); err != nil { + return fmt.Errorf("failed to initialize execution engine: %w", err) + } + defer engine.Shutdown() + + // Build execution request + request := r.buildExecutionRequest(assignment) + + r.Logger.Info("šŸš€ Executing brief for council %s, role %s", assignment.CouncilID, assignment.RoleName) + + // Track task + taskID := fmt.Sprintf("council-%s-%s", assignment.CouncilID, assignment.RoleName) + r.TaskTracker.AddTask(taskID) + defer r.TaskTracker.RemoveTask(taskID) + + // Execute the task + result, err := engine.ExecuteTask(ctx, request) + if err != nil { + return fmt.Errorf("task execution failed: %w", err) + } + + r.Logger.Info("āœ… Task execution successful. Output: %s", result.Output) + + // Upload results to WHOOSH + if err := r.uploadResults(assignment, result); err != nil { + r.Logger.Error("āš ļø Failed to upload results to WHOOSH: %v", err) + // Don't fail the execution if upload fails + } + + return nil +} + +// buildExecutionRequest converts a council brief to an execution request +func (r *SharedRuntime) buildExecutionRequest(assignment *council.RoleAssignment) *execution.TaskExecutionRequest { + brief := assignment.Brief + + // Build task description from brief + taskDescription := brief.Summary + if taskDescription == "" { + taskDescription = "Execute council brief" + } + + // Add additional context + additionalContext := map[string]interface{}{ + "council_id": assignment.CouncilID, + "role_name": assignment.RoleName, + "brief_url": brief.BriefURL, + "expected_artifacts": brief.ExpectedArtifacts, + "hmmm_topic": brief.HMMMTopic, + "persona": assignment.Persona, + } + + return &execution.TaskExecutionRequest{ + ID: fmt.Sprintf("council-%s-%s", assignment.CouncilID, assignment.RoleName), + Type: "council_brief", + Description: taskDescription, + Context: additionalContext, + Requirements: &execution.TaskRequirements{ + AIModel: r.Config.AI.Provider, + SandboxType: "docker", + RequiredTools: []string{}, + }, + Timeout: time.Hour, + } +} + +// uploadResults uploads execution results to WHOOSH +func (r *SharedRuntime) uploadResults(assignment *council.RoleAssignment, result *execution.TaskExecutionResult) error { + // Get WHOOSH endpoint from environment or config + whooshEndpoint := r.Config.WHOOSHAPI.BaseURL + if whooshEndpoint == "" { + whooshEndpoint = "http://whoosh:8080" + } + + // Build result payload + payload := map[string]interface{}{ + "council_id": assignment.CouncilID, + "role_name": assignment.RoleName, + "agent_id": r.Config.Agent.ID, + "ucxl_address": assignment.UCXLAddress, + "output": result.Output, + "artifacts": result.Artifacts, + "success": result.Success, + "error_message": result.ErrorMessage, + "execution_time": result.Metrics.Duration.Seconds(), + "timestamp": time.Now().Unix(), + } + + jsonData, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal result payload: %w", err) + } + + // Send to WHOOSH + url := fmt.Sprintf("%s/api/councils/%s/results", whooshEndpoint, assignment.CouncilID) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send results to WHOOSH: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + return fmt.Errorf("WHOOSH returned status %d", resp.StatusCode) + } + + r.Logger.Info("šŸ“¤ Results uploaded to WHOOSH for council %s", assignment.CouncilID) + return nil +}