package runtime

import (
	"context"
	"fmt"
	"time"

	"chorus.services/bzzz/p2p"
	"chorus.services/bzzz/pkg/dht"
	"chorus.services/bzzz/pkg/health"
	"chorus.services/bzzz/pkg/shutdown"
	"chorus.services/bzzz/pubsub"
)

// setupHealthChecks registers every health check this runtime exposes on
// healthManager, in a fixed order:
//
//  1. "p2p-connectivity" (critical): unhealthy when node reports fewer than
//     one connected peer.
//  2. The active PubSub probe built from ps.
//  3. The active DHT probe, only when dhtNode is non-nil.
//  4. "pubsub-system-static": legacy check that always reports healthy.
//  5. "dht-system-static": legacy check that always reports healthy, only
//     when dhtNode is non-nil.
//  6. A memory check created with threshold 0.85.
//  7. A disk-space check for "/tmp" created with threshold 0.90.
//
// The function itself returns nothing; any failure handling is left to the
// health package.
func (r *StandardRuntime) setupHealthChecks(healthManager *health.Manager, ps *pubsub.PubSub, node *p2p.Node, dhtNode *dht.LibP2PDHT) {
	// dhtNode is never reassigned, so the nil test is evaluated once and
	// reused for both DHT-related registrations below.
	dhtEnabled := dhtNode != nil

	// Critical P2P connectivity check.
	healthManager.RegisterCheck(&health.HealthCheck{
		Name:        "p2p-connectivity",
		Description: "P2P network connectivity and peer count",
		Enabled:     true,
		Critical:    true,
		Interval:    15 * time.Second,
		Timeout:     10 * time.Second,
		Checker: func(_ context.Context) health.CheckResult {
			minPeers := 1
			peers := node.ConnectedPeers()

			// Both outcomes report the same diagnostic details.
			details := map[string]interface{}{
				"connected_peers": peers,
				"min_peers":       minPeers,
				"node_id":         node.ID().ShortString(),
			}

			var message string
			if peers < minPeers {
				message = fmt.Sprintf("Insufficient P2P peers: %d < %d", peers, minPeers)
			} else {
				message = fmt.Sprintf("P2P connectivity OK: %d peers connected", peers)
			}

			return health.CheckResult{
				Healthy:   peers >= minPeers,
				Message:   message,
				Details:   details,
				Timestamp: time.Now(),
			}
		},
	})

	// Active PubSub probe.
	healthManager.RegisterCheck(health.CreateActivePubSubCheck(health.NewPubSubAdapter(ps)))
	r.logger.Info("✅ Active PubSub health probe registered")

	// Active DHT probe, skipped when no DHT node was supplied.
	if dhtEnabled {
		healthManager.RegisterCheck(health.CreateActiveDHTCheck(health.NewDHTAdapter(dhtNode)))
		r.logger.Info("✅ Active DHT health probe registered")
	}

	// Legacy static checks, kept for backward compatibility. They do not
	// inspect any state and always report healthy.
	healthManager.RegisterCheck(&health.HealthCheck{
		Name:        "pubsub-system-static",
		Description: "Static PubSub messaging system health",
		Enabled:     true,
		Critical:    false,
		Interval:    30 * time.Second,
		Timeout:     5 * time.Second,
		Checker: func(_ context.Context) health.CheckResult {
			return health.CheckResult{
				Healthy:   true,
				Message:   "PubSub system operational (static check)",
				Timestamp: time.Now(),
			}
		},
	})

	if dhtEnabled {
		healthManager.RegisterCheck(&health.HealthCheck{
			Name:        "dht-system-static",
			Description: "Static Distributed Hash Table system health",
			Enabled:     true,
			Critical:    false,
			Interval:    60 * time.Second,
			Timeout:     15 * time.Second,
			Checker: func(_ context.Context) health.CheckResult {
				return health.CheckResult{
					Healthy: true,
					Message: "DHT system operational (static check)",
					Details: map[string]interface{}{
						"dht_enabled": true,
					},
					Timestamp: time.Now(),
				}
			},
		})
	}

	// Resource checks: memory above 85% and "/tmp" disk usage above 90%.
	healthManager.RegisterCheck(health.CreateMemoryCheck(0.85))
	healthManager.RegisterCheck(health.CreateDiskSpaceCheck("/tmp", 0.90))
}

// setupGracefulShutdown registers the runtime's components with
// shutdownManager and then installs the phase hooks via setupShutdownHooks.
//
// Optional services (HTTP server, UCXI server, task coordinator, DHT,
// PubSub, mDNS discovery, election manager) are registered only when the
// corresponding field of services is non-nil. The health manager and the
// P2P node are always registered.
//
// NOTE(review): the numeric argument (10 … 60) is presumably a priority in
// which lower values shut down first, and the boolean presumably marks the
// component as required — confirm both against the shutdown package.
func (r *StandardRuntime) setupGracefulShutdown(shutdownManager *shutdown.Manager, healthManager *health.Manager, services *RuntimeServices) {
	// Health manager: registered with the lowest number so health checks
	// are stopped early.
	shutdownManager.Register(
		shutdown.NewGenericComponent("health-manager", 10, true).
			SetShutdownFunc(func(_ context.Context) error {
				return healthManager.Stop()
			}),
	)

	// HTTP servers.
	if services.HTTPServer != nil {
		shutdownManager.Register(
			shutdown.NewGenericComponent("main-http-server", 20, true).
				SetShutdownFunc(func(_ context.Context) error {
					return services.HTTPServer.Stop()
				}),
		)
	}

	if services.UCXIServer != nil {
		shutdownManager.Register(
			shutdown.NewGenericComponent("ucxi-server", 21, true).
				SetShutdownFunc(func(_ context.Context) error {
					// Stop's result (if any) is not propagated.
					services.UCXIServer.Stop()
					return nil
				}),
		)
	}

	// Task coordination system.
	if services.TaskCoordinator != nil {
		shutdownManager.Register(
			shutdown.NewGenericComponent("task-coordinator", 30, true).
				SetCloser(func() error {
					// Placeholder: a real implementation would stop the
					// task coordinator gracefully here.
					return nil
				}),
		)
	}

	// DHT system.
	if services.DHT != nil {
		shutdownManager.Register(
			shutdown.NewGenericComponent("dht-node", 35, true).
				SetCloser(func() error {
					return services.DHT.Close()
				}),
		)
	}

	// PubSub system.
	if services.PubSub != nil {
		shutdownManager.Register(
			shutdown.NewGenericComponent("pubsub-system", 40, true).
				SetCloser(func() error {
					return services.PubSub.Close()
				}),
		)
	}

	// mDNS discovery.
	if services.MDNSDiscovery != nil {
		shutdownManager.Register(
			shutdown.NewGenericComponent("mdns-discovery", 50, true).
				SetCloser(func() error {
					// Placeholder: a real implementation would close mDNS
					// discovery properly here.
					return nil
				}),
		)
	}

	// Election manager.
	if services.ElectionManager != nil {
		shutdownManager.Register(
			shutdown.NewGenericComponent("election-manager", 55, true).
				SetCloser(func() error {
					services.ElectionManager.Stop()
					return nil
				}),
		)
	}

	// P2P node: given the highest number because the other components
	// depend on it and it should be closed last.
	closeNode := func() error {
		return services.Node.Close()
	}
	shutdownManager.Register(shutdown.NewP2PNodeComponent("p2p-node", closeNode, 60))

	// Phase hooks.
	r.setupShutdownHooks(shutdownManager)
}

// setupShutdownHooks attaches one hook to each of the pre-shutdown,
// post-shutdown and cleanup phases of shutdownManager. Each hook currently
// only writes an informational log line and returns nil; the real work is
// still to be implemented.
func (r *StandardRuntime) setupShutdownHooks(shutdownManager *shutdown.Manager) {
	// Pre-shutdown: intended for notifying peers and saving critical state.
	preShutdown := func(_ context.Context) error {
		r.logger.Info("🔄 Pre-shutdown: Notifying peers and saving state...")
		return nil
	}

	// Post-shutdown: intended for flushing logs and removing temporary files.
	postShutdown := func(_ context.Context) error {
		r.logger.Info("🔄 Post-shutdown: Performing final cleanup...")
		return nil
	}

	// Cleanup: intended for persisting final state and releasing resources.
	cleanup := func(_ context.Context) error {
		r.logger.Info("🔄 Cleanup: Finalizing shutdown...")
		return nil
	}

	shutdownManager.AddHook(shutdown.PhasePreShutdown, preShutdown)
	shutdownManager.AddHook(shutdown.PhasePostShutdown, postShutdown)
	shutdownManager.AddHook(shutdown.PhaseCleanup, cleanup)
}