package runtime

import (
	"context"
	"fmt"
	"time"

	"chorus/internal/logging"
	"chorus/pkg/dht"
	"chorus/pkg/health"
	"chorus/pkg/shutdown"
	"chorus/pubsub"
)

// simpleLogger adapts a logging.Logger to the Info/Warn/Error logger that
// the shutdown and health managers are constructed with in this file.
type simpleLogger struct {
	logger logging.Logger
}

// Info forwards an informational message and its format args to the wrapped logger.
func (l *simpleLogger) Info(msg string, args ...interface{}) {
	l.logger.Info(msg, args...)
}

// Warn forwards a warning message and its format args to the wrapped logger.
func (l *simpleLogger) Warn(msg string, args ...interface{}) {
	l.logger.Warn(msg, args...)
}

// Error forwards an error message and its format args to the wrapped logger.
func (l *simpleLogger) Error(msg string, args ...interface{}) {
	l.logger.Error(msg, args...)
}

// StartAgentMode runs the autonomous agent with all standard behaviors.
//
// It launches the availability, capability, role-announcement and status
// goroutines, builds the health and shutdown managers, registers health checks
// and shutdown components, and starts both managers. It then calls
// shutdownManager.Wait(), which presumably blocks until graceful shutdown has
// finished — confirm against the shutdown package.
//
// It returns an error only when the health manager fails to start. A failure
// to start the health HTTP server is logged and tolerated.
func (r *SharedRuntime) StartAgentMode() error {
	// Announce capabilities and role.
	go r.announceAvailability()
	go r.announceCapabilitiesOnChange()
	go r.announceRoleOnStartup()

	// Start status reporting.
	go r.statusReporter()

	r.Logger.Info("🔍 Listening for peers on container network...")
	r.Logger.Info("📡 Ready for task coordination and meta-discussion")
	r.Logger.Info("🎯 HMMM collaborative reasoning enabled")

	// === Comprehensive Health Monitoring & Graceful Shutdown ===
	// NOTE(review): 30s is presumably the overall shutdown timeout; confirm in shutdown.NewManager.
	shutdownManager := shutdown.NewManager(30*time.Second, &simpleLogger{logger: r.Logger})
	healthManager := health.NewManager(r.Node.ID().ShortString(), AppVersion, &simpleLogger{logger: r.Logger})
	healthManager.SetShutdownManager(shutdownManager)

	// Register health checks.
	r.setupHealthChecks(healthManager)

	// Register components for graceful shutdown.
	r.setupGracefulShutdown(shutdownManager, healthManager)

	// Start health monitoring.
	if err := healthManager.Start(); err != nil {
		return fmt.Errorf("start health manager: %w", err)
	}
	r.HealthManager = healthManager
	r.Logger.Info("❤️ Health monitoring started")

	// Start the health HTTP server; failure here is non-fatal by design.
	if err := healthManager.StartHTTPServer(r.Config.Network.HealthPort); err != nil {
		r.Logger.Error("❌ Failed to start health HTTP server: %v", err)
	} else {
		r.Logger.Info("🏥 Health endpoints available at http://localhost:%d/health", r.Config.Network.HealthPort)
	}

	// Start shutdown manager.
	shutdownManager.Start()
	r.ShutdownManager = shutdownManager
	r.Logger.Info("🛡️ Graceful shutdown manager started")

	r.Logger.Info("✅ CHORUS agent system fully operational with health monitoring")

	// Wait for graceful shutdown.
	shutdownManager.Wait()
	r.Logger.Info("✅ CHORUS agent system shutdown completed")
	return nil
}

// announceAvailability broadcasts the node's current working status on
// pubsub.AvailabilityBcast. It sends once immediately and then every 30
// seconds, so peers can decide on task assignment.
//
// The status is "busy" when active tasks have reached the tracker's maximum,
// "working" when at least one task is active, and "ready" otherwise.
//
// It warns and returns if PubSub or TaskTracker is nil. This function runs in
// its own goroutine, so dereferencing a nil there would panic and bring down
// the whole process. The loop has no stop condition and runs for the process
// lifetime.
func (r *SharedRuntime) announceAvailability() {
	if r.PubSub == nil {
		r.Logger.Warn("⚠️ Availability broadcast skipped: PubSub not initialized")
		return
	}
	if r.TaskTracker == nil {
		r.Logger.Warn("⚠️ Availability broadcast skipped: task tracker not initialized")
		return
	}

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	// Empty init with "<-ticker.C" as the post statement: the body runs once
	// right away, then after each tick.
	for ; ; <-ticker.C {
		activeCount := len(r.TaskTracker.GetActiveTasks())
		maxTasks := r.TaskTracker.GetMaxTasks()

		status := "ready"
		switch {
		case activeCount >= maxTasks:
			status = "busy"
		case activeCount > 0:
			status = "working"
		}

		// One clock read so last_activity and timestamp always agree.
		now := time.Now().Unix()
		availability := map[string]interface{}{
			"node_id":            r.Node.ID().ShortString(),
			"available_for_work": activeCount < maxTasks,
			"current_tasks":      activeCount,
			"max_tasks":          maxTasks,
			"last_activity":      now,
			"status":             status,
			"timestamp":          now,
		}

		if err := r.PubSub.PublishBzzzMessage(pubsub.AvailabilityBcast, availability); err != nil {
			r.Logger.Error("❌ Failed to announce availability: %v", err)
		}
	}
}

// statusReporter logs the number of connected peers once immediately and then
// every 60 seconds, for the lifetime of the process.
func (r *SharedRuntime) statusReporter() {
	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()

	for ; ; <-ticker.C {
		peers := r.Node.ConnectedPeers()
		r.Logger.Info("📊 Status: %d connected peers", peers)
	}
}

// announceCapabilitiesOnChange publishes a single capability announcement on
// pubsub.CapabilityBcast. The announcement contains the agent ID, node ID,
// version, configured capabilities, expertise, models, specialization,
// max_tasks and the current active task count.
//
// Despite the name, it currently broadcasts only once; see the TODO below.
// It warns and returns if PubSub is nil. A nil TaskTracker is treated as
// zero active tasks.
func (r *SharedRuntime) announceCapabilitiesOnChange() {
	if r.PubSub == nil {
		r.Logger.Warn("⚠️ Capability broadcast skipped: PubSub not initialized")
		return
	}

	r.Logger.Info("📢 Broadcasting agent capabilities to network")

	activeTaskCount := 0
	if r.TaskTracker != nil {
		activeTaskCount = len(r.TaskTracker.GetActiveTasks())
	}

	announcement := map[string]interface{}{
		"agent_id":       r.Config.Agent.ID,
		"node_id":        r.Node.ID().ShortString(),
		"version":        AppVersion,
		"capabilities":   r.Config.Agent.Capabilities,
		"expertise":      r.Config.Agent.Expertise,
		"models":         r.Config.Agent.Models,
		"specialization": r.Config.Agent.Specialization,
		"max_tasks":      r.Config.Agent.MaxTasks,
		"current_tasks":  activeTaskCount,
		"timestamp":      time.Now().Unix(),
		"availability":   "ready",
	}

	if err := r.PubSub.PublishBzzzMessage(pubsub.CapabilityBcast, announcement); err != nil {
		r.Logger.Error("❌ Failed to broadcast capabilities: %v", err)
		return
	}

	r.Logger.Info("✅ Capabilities broadcast published")

	// TODO: Watch for live capability changes (role updates, model changes) and re-broadcast
}

// announceRoleOnStartup publishes the configured agent role once via
// PublishRoleBasedMessage(pubsub.RoleAnnouncement, ...). The message has
// medium priority and thread ID "role:<role>".
//
// It is a no-op when no role is configured. It warns and returns when PubSub
// is nil.
func (r *SharedRuntime) announceRoleOnStartup() {
	role := r.Config.Agent.Role
	if role == "" {
		r.Logger.Info("🎭 No agent role configured; skipping role announcement")
		return
	}

	if r.PubSub == nil {
		r.Logger.Warn("⚠️ Role announcement skipped: PubSub not initialized")
		return
	}

	r.Logger.Info("🎭 Announcing agent role to collaboration mesh")

	announcement := map[string]interface{}{
		"agent_id":       r.Config.Agent.ID,
		"node_id":        r.Node.ID().ShortString(),
		"role":           role,
		"expertise":      r.Config.Agent.Expertise,
		"capabilities":   r.Config.Agent.Capabilities,
		"reports_to":     r.Config.Agent.ReportsTo,
		"specialization": r.Config.Agent.Specialization,
		"timestamp":      time.Now().Unix(),
	}

	opts := pubsub.MessageOptions{
		FromRole: role,
		Priority: "medium",
		ThreadID: fmt.Sprintf("role:%s", role),
	}

	if err := r.PubSub.PublishRoleBasedMessage(pubsub.RoleAnnouncement, announcement, opts); err != nil {
		r.Logger.Error("❌ Failed to announce role: %v", err)
		return
	}

	r.Logger.Info("✅ Role announcement published")
}

// setupHealthChecks registers health checks on healthManager.
//
//   - When BackbeatIntegration is set, it adds a non-critical "backbeat" check
//     (30s interval, 10s timeout). The check reports healthy iff the
//     integration's GetHealth() map has "connected" == true.
//   - When PubSub and ElectionManager are both available, it builds the
//     enhanced health checks (passing the DHT replication manager when a DHT
//     node exists) and stores them in r.EnhancedHealth. Otherwise it logs a
//     warning and skips them.
func (r *SharedRuntime) setupHealthChecks(healthManager *health.Manager) {
	// Add BACKBEAT health check.
	if r.BackbeatIntegration != nil {
		backbeatCheck := &health.HealthCheck{
			Name:        "backbeat",
			Description: "BACKBEAT timing integration health",
			Interval:    30 * time.Second,
			Timeout:     10 * time.Second,
			Enabled:     true,
			Critical:    false,
			Checker: func(ctx context.Context) health.CheckResult {
				healthInfo := r.BackbeatIntegration.GetHealth()
				// A missing or non-bool "connected" entry counts as disconnected.
				connected, _ := healthInfo["connected"].(bool)

				result := health.CheckResult{
					Healthy:   connected,
					Details:   healthInfo,
					Timestamp: time.Now(),
				}

				if connected {
					result.Message = "BACKBEAT integration healthy and connected"
				} else {
					result.Message = "BACKBEAT integration not connected"
				}

				return result
			},
		}
		healthManager.RegisterCheck(backbeatCheck)
	}

	// Register enhanced health instrumentation when core subsystems are available.
	if r.PubSub == nil {
		r.Logger.Warn("⚠️ Skipping enhanced health checks: PubSub not initialized")
		return
	}
	if r.ElectionManager == nil {
		r.Logger.Warn("⚠️ Skipping enhanced health checks: election manager not ready")
		return
	}

	var replication *dht.ReplicationManager
	if r.DHTNode != nil {
		replication = r.DHTNode.ReplicationManager()
	}

	// NOTE(review): if NewEnhancedHealthChecks takes an interface for the DHT
	// node, a nil r.DHTNode pointer arrives as a non-nil interface — confirm it
	// is handled there.
	enhanced := health.NewEnhancedHealthChecks(
		healthManager,
		r.ElectionManager,
		r.DHTNode,
		r.PubSub,
		replication,
		&simpleLogger{logger: r.Logger},
	)

	r.EnhancedHealth = enhanced
	r.Logger.Info("🩺 Enhanced health checks registered")
}

// setupGracefulShutdown registers every initialized subsystem with
// shutdownManager so it is stopped during graceful shutdown. Subsystems left
// nil are skipped.
//
// The integer argument to each constructor is the component's priority:
// http 10, health 15, ucxi 20, pubsub 30, dht 35, p2p 40, election 45,
// backbeat 50. Presumably lower values stop first — confirm in the shutdown
// package before reordering.
func (r *SharedRuntime) setupGracefulShutdown(shutdownManager *shutdown.Manager, healthManager *health.Manager) {
	if shutdownManager == nil {
		r.Logger.Warn("⚠️ Shutdown manager not initialized; graceful teardown skipped")
		return
	}

	if r.HTTPServer != nil {
		httpComponent := shutdown.NewGenericComponent("http-api-server", 10, true).
			SetShutdownFunc(func(ctx context.Context) error {
				return r.HTTPServer.Stop()
			})
		shutdownManager.Register(httpComponent)
	}

	if healthManager != nil {
		healthComponent := shutdown.NewGenericComponent("health-manager", 15, true).
			SetShutdownFunc(func(ctx context.Context) error {
				return healthManager.Stop()
			})
		shutdownManager.Register(healthComponent)
	}

	if r.UCXIServer != nil {
		ucxiComponent := shutdown.NewGenericComponent("ucxi-server", 20, true).
			SetShutdownFunc(func(ctx context.Context) error {
				return r.UCXIServer.Stop()
			})
		shutdownManager.Register(ucxiComponent)
	}

	if r.PubSub != nil {
		shutdownManager.Register(shutdown.NewPubSubComponent("pubsub", r.PubSub.Close, 30))
	}

	if r.DHTNode != nil {
		dhtComponent := shutdown.NewGenericComponent("dht-node", 35, true).
			SetCloser(r.DHTNode.Close)
		shutdownManager.Register(dhtComponent)
	}

	if r.Node != nil {
		shutdownManager.Register(shutdown.NewP2PNodeComponent("p2p-node", r.Node.Close, 40))
	}

	if r.ElectionManager != nil {
		shutdownManager.Register(shutdown.NewElectionManagerComponent("election-manager", r.ElectionManager.Stop, 45))
	}

	if r.BackbeatIntegration != nil {
		backbeatComponent := shutdown.NewGenericComponent("backbeat-integration", 50, true).
			SetShutdownFunc(func(ctx context.Context) error {
				return r.BackbeatIntegration.Stop()
			})
		shutdownManager.Register(backbeatComponent)
	}

	r.Logger.Info("🛡️ Graceful shutdown components registered")
}