package runtime import ( "bytes" "context" "encoding/json" "fmt" "net/http" "time" "chorus/internal/council" "chorus/internal/logging" "chorus/pkg/ai" "chorus/pkg/dht" "chorus/pkg/execution" "chorus/pkg/health" "chorus/pkg/shutdown" "chorus/pubsub" ) // simpleLogger implements basic logging for shutdown and health systems type simpleLogger struct { logger logging.Logger } func (l *simpleLogger) Info(msg string, args ...interface{}) { l.logger.Info(msg, args...) } func (l *simpleLogger) Warn(msg string, args ...interface{}) { l.logger.Warn(msg, args...) } func (l *simpleLogger) Error(msg string, args ...interface{}) { l.logger.Error(msg, args...) } // StartAgentMode runs the autonomous agent with all standard behaviors func (r *SharedRuntime) StartAgentMode() error { // Announce capabilities and role go r.announceAvailability() go r.announceCapabilitiesOnChange() go r.announceRoleOnStartup() // Start status reporting go r.statusReporter() // Start council brief processing ctx := context.Background() go r.processBriefs(ctx) r.Logger.Info("🔍 Listening for peers on container network...") r.Logger.Info("📡 Ready for task coordination and meta-discussion") r.Logger.Info("🎯 HMMM collaborative reasoning enabled") // === Comprehensive Health Monitoring & Graceful Shutdown === shutdownManager := shutdown.NewManager(30*time.Second, &simpleLogger{logger: r.Logger}) healthManager := health.NewManager(r.Node.ID().ShortString(), AppVersion, &simpleLogger{logger: r.Logger}) healthManager.SetShutdownManager(shutdownManager) // Register health checks r.setupHealthChecks(healthManager) // Register components for graceful shutdown r.setupGracefulShutdown(shutdownManager, healthManager) // Start health monitoring if err := healthManager.Start(); err != nil { return err } r.HealthManager = healthManager r.Logger.Info("❤️ Health monitoring started") // Start health HTTP server if err := healthManager.StartHTTPServer(r.Config.Network.HealthPort); err != nil { r.Logger.Error("❌ Failed to start health HTTP server: %v", err) } else { r.Logger.Info("🏥 Health endpoints available at http://localhost:%d/health", r.Config.Network.HealthPort) } // Start shutdown manager shutdownManager.Start() r.ShutdownManager = shutdownManager r.Logger.Info("🛡️ Graceful shutdown manager started") r.Logger.Info("✅ CHORUS agent system fully operational with health monitoring") // Wait for graceful shutdown shutdownManager.Wait() r.Logger.Info("✅ CHORUS agent system shutdown completed") return nil } // announceAvailability broadcasts current working status for task assignment func (r *SharedRuntime) announceAvailability() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for ; ; <-ticker.C { currentTasks := r.TaskTracker.GetActiveTasks() maxTasks := r.TaskTracker.GetMaxTasks() isAvailable := len(currentTasks) < maxTasks status := "ready" if len(currentTasks) >= maxTasks { status = "busy" } else if len(currentTasks) > 0 { status = "working" } availability := map[string]interface{}{ "node_id": r.Node.ID().ShortString(), "available_for_work": isAvailable, "current_tasks": len(currentTasks), "max_tasks": maxTasks, "last_activity": time.Now().Unix(), "status": status, "timestamp": time.Now().Unix(), } if err := r.PubSub.PublishBzzzMessage(pubsub.AvailabilityBcast, availability); err != nil { r.Logger.Error("❌ Failed to announce availability: %v", err) } } } // statusReporter provides periodic status updates func (r *SharedRuntime) statusReporter() { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() for ; ; <-ticker.C { peers := r.Node.ConnectedPeers() r.Logger.Info("📊 Status: %d connected peers", peers) } } // announceCapabilitiesOnChange announces capabilities when they change func (r *SharedRuntime) announceCapabilitiesOnChange() { if r.PubSub == nil { r.Logger.Warn("⚠️ Capability broadcast skipped: PubSub not initialized") return } r.Logger.Info("📢 Broadcasting agent capabilities to network") activeTaskCount := 0 if r.TaskTracker != nil { activeTaskCount = len(r.TaskTracker.GetActiveTasks()) } announcement := map[string]interface{}{ "agent_id": r.Config.Agent.ID, "node_id": r.Node.ID().ShortString(), "version": AppVersion, "capabilities": r.Config.Agent.Capabilities, "expertise": r.Config.Agent.Expertise, "models": r.Config.Agent.Models, "specialization": r.Config.Agent.Specialization, "max_tasks": r.Config.Agent.MaxTasks, "current_tasks": activeTaskCount, "timestamp": time.Now().Unix(), "availability": "ready", } if err := r.PubSub.PublishBzzzMessage(pubsub.CapabilityBcast, announcement); err != nil { r.Logger.Error("❌ Failed to broadcast capabilities: %v", err) return } r.Logger.Info("✅ Capabilities broadcast published") // TODO: Watch for live capability changes (role updates, model changes) and re-broadcast } // announceRoleOnStartup announces role when the agent starts func (r *SharedRuntime) announceRoleOnStartup() { role := r.Config.Agent.Role if role == "" { r.Logger.Info("🎭 No agent role configured; skipping role announcement") return } if r.PubSub == nil { r.Logger.Warn("⚠️ Role announcement skipped: PubSub not initialized") return } r.Logger.Info("🎭 Announcing agent role to collaboration mesh") announcement := map[string]interface{}{ "agent_id": r.Config.Agent.ID, "node_id": r.Node.ID().ShortString(), "role": role, "expertise": r.Config.Agent.Expertise, "capabilities": r.Config.Agent.Capabilities, "reports_to": r.Config.Agent.ReportsTo, "specialization": r.Config.Agent.Specialization, "timestamp": time.Now().Unix(), } opts := pubsub.MessageOptions{ FromRole: role, Priority: "medium", ThreadID: fmt.Sprintf("role:%s", role), } if err := r.PubSub.PublishRoleBasedMessage(pubsub.RoleAnnouncement, announcement, opts); err != nil { r.Logger.Error("❌ Failed to announce role: %v", err) return } r.Logger.Info("✅ Role announcement published") } func (r *SharedRuntime) setupHealthChecks(healthManager *health.Manager) { // Add BACKBEAT health check if r.BackbeatIntegration != nil { backbeatCheck := &health.HealthCheck{ Name: "backbeat", Description: "BACKBEAT timing integration health", Interval: 30 * time.Second, Timeout: 10 * time.Second, Enabled: true, Critical: false, Checker: func(ctx context.Context) health.CheckResult { healthInfo := r.BackbeatIntegration.GetHealth() connected, _ := healthInfo["connected"].(bool) result := health.CheckResult{ Healthy: connected, Details: healthInfo, Timestamp: time.Now(), } if connected { result.Message = "BACKBEAT integration healthy and connected" } else { result.Message = "BACKBEAT integration not connected" } return result }, } healthManager.RegisterCheck(backbeatCheck) } // Register enhanced health instrumentation when core subsystems are available if r.PubSub == nil { r.Logger.Warn("⚠️ Skipping enhanced health checks: PubSub not initialized") return } if r.ElectionManager == nil { r.Logger.Warn("⚠️ Skipping enhanced health checks: election manager not ready") return } var replication *dht.ReplicationManager if r.DHTNode != nil { replication = r.DHTNode.ReplicationManager() } enhanced := health.NewEnhancedHealthChecks( healthManager, r.ElectionManager, r.DHTNode, r.PubSub, replication, &simpleLogger{logger: r.Logger}, ) r.EnhancedHealth = enhanced r.Logger.Info("🩺 Enhanced health checks registered") } func (r *SharedRuntime) setupGracefulShutdown(shutdownManager *shutdown.Manager, healthManager *health.Manager) { if shutdownManager == nil { r.Logger.Warn("⚠️ Shutdown manager not initialized; graceful teardown skipped") return } if r.HTTPServer != nil { httpComponent := shutdown.NewGenericComponent("http-api-server", 10, true). SetShutdownFunc(func(ctx context.Context) error { return r.HTTPServer.Stop() }) shutdownManager.Register(httpComponent) } if healthManager != nil { healthComponent := shutdown.NewGenericComponent("health-manager", 15, true). SetShutdownFunc(func(ctx context.Context) error { return healthManager.Stop() }) shutdownManager.Register(healthComponent) } if r.UCXIServer != nil { ucxiComponent := shutdown.NewGenericComponent("ucxi-server", 20, true). SetShutdownFunc(func(ctx context.Context) error { return r.UCXIServer.Stop() }) shutdownManager.Register(ucxiComponent) } if r.PubSub != nil { shutdownManager.Register(shutdown.NewPubSubComponent("pubsub", r.PubSub.Close, 30)) } if r.DHTNode != nil { dhtComponent := shutdown.NewGenericComponent("dht-node", 35, true). SetCloser(r.DHTNode.Close) shutdownManager.Register(dhtComponent) } if r.Node != nil { shutdownManager.Register(shutdown.NewP2PNodeComponent("p2p-node", r.Node.Close, 40)) } if r.ElectionManager != nil { shutdownManager.Register(shutdown.NewElectionManagerComponent("election-manager", r.ElectionManager.Stop, 45)) } if r.BackbeatIntegration != nil { backbeatComponent := shutdown.NewGenericComponent("backbeat-integration", 50, true). SetShutdownFunc(func(ctx context.Context) error { return r.BackbeatIntegration.Stop() }) shutdownManager.Register(backbeatComponent) } r.Logger.Info("🛡️ Graceful shutdown components registered") } // processBriefs polls for council briefs and executes them func (r *SharedRuntime) processBriefs(ctx context.Context) { ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() r.Logger.Info("📦 Brief processing loop started") for { select { case <-ctx.Done(): r.Logger.Info("📦 Brief processing loop stopped") return case <-ticker.C: if r.HTTPServer == nil || r.HTTPServer.CouncilManager == nil { continue } assignment := r.HTTPServer.CouncilManager.GetCurrentAssignment() if assignment == nil || assignment.Brief == nil { continue } // Check if we have a brief to execute brief := assignment.Brief if brief.BriefURL == "" && brief.Summary == "" { continue } r.Logger.Info("📦 Processing design brief for council %s, role %s", assignment.CouncilID, assignment.RoleName) // Execute the brief if err := r.executeBrief(ctx, assignment); err != nil { r.Logger.Error("❌ Failed to execute brief: %v", err) continue } r.Logger.Info("✅ Brief execution completed for council %s", assignment.CouncilID) // Clear the brief after execution to prevent re-execution assignment.Brief = nil } } } // executeBrief executes a council brief using the ExecutionEngine func (r *SharedRuntime) executeBrief(ctx context.Context, assignment *council.RoleAssignment) error { brief := assignment.Brief if brief == nil { return fmt.Errorf("no brief to execute") } // Create execution engine engine := execution.NewTaskExecutionEngine() // Create AI provider factory with proper configuration aiFactory := ai.NewProviderFactory() // Register the configured provider providerConfig := ai.ProviderConfig{ Type: r.Config.AI.Provider, Endpoint: r.Config.AI.Ollama.Endpoint, DefaultModel: "llama3.1:8b", Timeout: r.Config.AI.Ollama.Timeout, } if err := aiFactory.RegisterProvider(r.Config.AI.Provider, providerConfig); err != nil { r.Logger.Warn("⚠️ Failed to register AI provider: %v", err) } // Set role mapping with default provider // This ensures GetProviderForRole() can find a provider for any role roleMapping := ai.RoleModelMapping{ DefaultProvider: r.Config.AI.Provider, FallbackProvider: r.Config.AI.Provider, Roles: make(map[string]ai.RoleConfig), } aiFactory.SetRoleMapping(roleMapping) engineConfig := &execution.EngineConfig{ AIProviderFactory: aiFactory, MaxConcurrentTasks: 1, DefaultTimeout: time.Hour, EnableMetrics: true, LogLevel: "info", } if err := engine.Initialize(ctx, engineConfig); err != nil { return fmt.Errorf("failed to initialize execution engine: %w", err) } defer engine.Shutdown() // Build execution request request := r.buildExecutionRequest(assignment) r.Logger.Info("🚀 Executing brief for council %s, role %s", assignment.CouncilID, assignment.RoleName) // Track task taskID := fmt.Sprintf("council-%s-%s", assignment.CouncilID, assignment.RoleName) r.TaskTracker.AddTask(taskID) defer r.TaskTracker.RemoveTask(taskID) // Execute the task result, err := engine.ExecuteTask(ctx, request) if err != nil { return fmt.Errorf("task execution failed: %w", err) } r.Logger.Info("✅ Task execution successful. Output: %s", result.Output) // Upload results to WHOOSH if err := r.uploadResults(assignment, result); err != nil { r.Logger.Error("⚠️ Failed to upload results to WHOOSH: %v", err) // Don't fail the execution if upload fails } return nil } // buildExecutionRequest converts a council brief to an execution request func (r *SharedRuntime) buildExecutionRequest(assignment *council.RoleAssignment) *execution.TaskExecutionRequest { brief := assignment.Brief // Build task description from brief taskDescription := brief.Summary if taskDescription == "" { taskDescription = "Execute council brief" } // Add additional context additionalContext := map[string]interface{}{ "council_id": assignment.CouncilID, "role_name": assignment.RoleName, "brief_url": brief.BriefURL, "expected_artifacts": brief.ExpectedArtifacts, "hmmm_topic": brief.HMMMTopic, "persona": assignment.Persona, } return &execution.TaskExecutionRequest{ ID: fmt.Sprintf("council-%s-%s", assignment.CouncilID, assignment.RoleName), Type: "council_brief", Description: taskDescription, Context: additionalContext, Requirements: &execution.TaskRequirements{ AIModel: r.Config.AI.Provider, SandboxType: "docker", RequiredTools: []string{}, }, Timeout: time.Hour, } } // uploadResults uploads execution results to WHOOSH func (r *SharedRuntime) uploadResults(assignment *council.RoleAssignment, result *execution.TaskExecutionResult) error { // Get WHOOSH endpoint from environment or config whooshEndpoint := r.Config.WHOOSHAPI.BaseURL if whooshEndpoint == "" { whooshEndpoint = "http://whoosh:8080" } // Build result payload payload := map[string]interface{}{ "council_id": assignment.CouncilID, "role_name": assignment.RoleName, "agent_id": r.Config.Agent.ID, "ucxl_address": assignment.UCXLAddress, "output": result.Output, "artifacts": result.Artifacts, "success": result.Success, "error_message": result.ErrorMessage, "execution_time": result.Metrics.Duration.Seconds(), "timestamp": time.Now().Unix(), } jsonData, err := json.Marshal(payload) if err != nil { return fmt.Errorf("failed to marshal result payload: %w", err) } // Send to WHOOSH url := fmt.Sprintf("%s/api/councils/%s/results", whooshEndpoint, assignment.CouncilID) req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { return fmt.Errorf("failed to create HTTP request: %w", err) } req.Header.Set("Content-Type", "application/json") client := &http.Client{Timeout: 30 * time.Second} resp, err := client.Do(req) if err != nil { return fmt.Errorf("failed to send results to WHOOSH: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { return fmt.Errorf("WHOOSH returned status %d", resp.StatusCode) } r.Logger.Info("📤 Results uploaded to WHOOSH for council %s", assignment.CouncilID) return nil }