package leader import ( "context" "fmt" "log" "sync" "time" "chorus.services/bzzz/pkg/election" "chorus.services/bzzz/pkg/health" "chorus.services/bzzz/pkg/metrics" "chorus.services/bzzz/pkg/slurp/intelligence" "chorus.services/bzzz/pkg/slurp/storage" slurpContext "chorus.services/bzzz/pkg/slurp/context" ) // EnhancedLeaderManager provides enhanced leadership lifecycle management for SLURP type EnhancedLeaderManager struct { *LeaderContextManager // Enhanced components healthMonitor *SLURPHealthMonitor metricsCollector *metrics.BZZZMetrics leadershipHistory *LeadershipHistory // Lifecycle management lifecycleState LifecycleState transitionMutex sync.RWMutex // Health probing healthProbes map[string]*HealthProbe probeScheduler *ProbeScheduler // Configuration config *EnhancedManagerConfig // Event handlers onLeadershipGained func(context.Context) error onLeadershipLost func(context.Context) error onHealthDegraded func(*HealthReport) error logger func(string, ...interface{}) } // LifecycleState represents the current state of leadership lifecycle type LifecycleState int const ( StateInitializing LifecycleState = iota StateFollower StateCandidating StateLeader StateTransitioning StateDegradedLeader StateStopping ) // EnhancedManagerConfig provides enhanced configuration options type EnhancedManagerConfig struct { *ManagerConfig // Health monitoring HealthCheckInterval time.Duration HealthDegradationTimeout time.Duration CriticalHealthThreshold float64 // Leadership lifecycle LeadershipTransitionTimeout time.Duration GracefulHandoverTimeout time.Duration StateTransitionRetries int // Performance monitoring MetricsReportingInterval time.Duration PerformanceAlertThreshold time.Duration ResourceUsageAlertThreshold float64 // Probe configuration ProbeSchedulingInterval time.Duration ProbeTimeout time.Duration ProbeFailureThreshold int // Advanced features EnablePredictiveFailover bool EnablePerformanceOptimization bool EnableDetailedMetrics bool } // SLURPHealthMonitor monitors SLURP-specific health metrics type SLURPHealthMonitor struct { mu sync.RWMutex manager *EnhancedLeaderManager healthChecks map[string]*health.HealthCheck lastHealthReport *HealthReport healthHistory []*HealthReport // Health metrics generationSuccessRate float64 averageGenerationTime time.Duration queueHealthScore float64 leadershipStabilityScore float64 config *HealthMonitorConfig } // HealthMonitorConfig configures SLURP health monitoring type HealthMonitorConfig struct { HistoryRetention time.Duration MaxHistoryEntries int HealthReportInterval time.Duration CriticalHealthThreshold float64 WarningHealthThreshold float64 } // HealthReport provides comprehensive health information type HealthReport struct { Timestamp time.Time OverallHealth float64 ComponentHealth map[string]float64 PerformanceMetrics *PerformanceMetrics ResourceUtilization *ResourceUtilization LeadershipMetrics *LeadershipMetrics Issues []HealthIssue Recommendations []HealthRecommendation } // PerformanceMetrics tracks SLURP performance indicators type PerformanceMetrics struct { AverageGenerationTime time.Duration GenerationThroughput float64 SuccessRate float64 QueueLength int ActiveJobs int ErrorRate float64 } // ResourceUtilization tracks resource usage type ResourceUtilization struct { CPUUsage float64 MemoryUsage float64 DiskUsage float64 NetworkBandwidth float64 GoroutineCount int } // LeadershipMetrics tracks leadership-related metrics type LeadershipMetrics struct { LeadershipDuration time.Duration TransitionsCount int64 LastTransitionTime time.Time StabilityScore float64 FailoverCount int64 } // HealthIssue represents a specific health concern type HealthIssue struct { Severity IssueSeverity Component string Description string Impact string Timestamp time.Time Resolved bool } // HealthRecommendation suggests actions to improve health type HealthRecommendation struct { Priority RecommendationPriority Action string Description string Impact string Effort EstimatedEffort } // Issue and recommendation types type IssueSeverity int type RecommendationPriority int type EstimatedEffort int const ( SeverityCritical IssueSeverity = iota SeverityHigh SeverityMedium SeverityLow ) const ( PriorityUrgent RecommendationPriority = iota PriorityHigh PriorityMedium PriorityLow ) const ( EffortLow EstimatedEffort = iota EffortMedium EffortHigh ) // LeadershipHistory tracks leadership events and transitions type LeadershipHistory struct { mu sync.RWMutex events []*LeadershipEvent maxEvents int startTime time.Time } // LeadershipEvent represents a leadership-related event type LeadershipEvent struct { Type LeadershipEventType Timestamp time.Time NodeID string PreviousLeader string Duration time.Duration Reason string Metadata map[string]interface{} } // LeadershipEventType defines types of leadership events type LeadershipEventType int const ( EventTypeElectionStarted LeadershipEventType = iota EventTypeLeaderElected EventTypeLeadershipLost EventTypeFailover EventTypeGracefulTransition EventTypeHealthDegradation EventTypePerformanceAlert ) // HealthProbe defines a health probe configuration type HealthProbe struct { Name string Description string ProbeFunc func(context.Context) *ProbeResult Interval time.Duration Timeout time.Duration FailureThreshold int // State tracking consecutiveFailures int lastProbeTime time.Time lastResult *ProbeResult enabled bool } // ProbeResult contains the result of a health probe type ProbeResult struct { Healthy bool Message string Latency time.Duration Metadata map[string]interface{} Error error Timestamp time.Time } // ProbeScheduler manages the scheduling and execution of health probes type ProbeScheduler struct { mu sync.RWMutex probes map[string]*HealthProbe scheduler *time.Ticker stopCh chan struct{} running bool } // NewEnhancedLeaderManager creates an enhanced leader manager func NewEnhancedLeaderManager( election election.Election, intelligence intelligence.IntelligenceEngine, storage storage.ContextStore, resolver slurpContext.ContextResolver, metricsCollector *metrics.BZZZMetrics, config *EnhancedManagerConfig, ) *EnhancedLeaderManager { if config == nil { config = DefaultEnhancedManagerConfig() } // Create base manager baseManager := NewContextManager(election, nil, intelligence, storage, resolver).(*LeaderContextManager) elm := &EnhancedLeaderManager{ LeaderContextManager: baseManager, metricsCollector: metricsCollector, lifecycleState: StateInitializing, healthProbes: make(map[string]*HealthProbe), config: config, logger: func(msg string, args ...interface{}) { log.Printf("[SLURP-LEADER] "+msg, args...) }, } // Initialize components elm.healthMonitor = NewSLURPHealthMonitor(elm) elm.leadershipHistory = NewLeadershipHistory(1000) elm.probeScheduler = NewProbeScheduler() // Register default health probes elm.registerDefaultHealthProbes() // Start background processes go elm.runLifecycleManager() go elm.runHealthMonitoring() go elm.runMetricsCollection() elm.logger("Enhanced SLURP leader manager initialized") return elm } // DefaultEnhancedManagerConfig returns default enhanced configuration func DefaultEnhancedManagerConfig() *EnhancedManagerConfig { return &EnhancedManagerConfig{ ManagerConfig: DefaultManagerConfig(), HealthCheckInterval: 30 * time.Second, HealthDegradationTimeout: 5 * time.Minute, CriticalHealthThreshold: 0.3, LeadershipTransitionTimeout: 60 * time.Second, GracefulHandoverTimeout: 30 * time.Second, StateTransitionRetries: 3, MetricsReportingInterval: 15 * time.Second, PerformanceAlertThreshold: 2 * time.Minute, ResourceUsageAlertThreshold: 0.85, ProbeSchedulingInterval: 10 * time.Second, ProbeTimeout: 5 * time.Second, ProbeFailureThreshold: 3, EnablePredictiveFailover: true, EnablePerformanceOptimization: true, EnableDetailedMetrics: true, } } // runLifecycleManager manages the leadership lifecycle func (elm *EnhancedLeaderManager) runLifecycleManager() { ticker := time.NewTicker(elm.config.LeadershipCheckInterval) defer ticker.Stop() for { select { case <-ticker.C: elm.processLifecycleTransitions() case <-elm.shutdownChan: elm.handleShutdown() return } } } // processLifecycleTransitions handles state transitions func (elm *EnhancedLeaderManager) processLifecycleTransitions() { elm.transitionMutex.Lock() defer elm.transitionMutex.Unlock() currentState := elm.lifecycleState isLeader := elm.IsLeader() healthScore := elm.healthMonitor.GetOverallHealthScore() // Determine target state var targetState LifecycleState switch currentState { case StateInitializing: if isLeader { targetState = StateLeader } else { targetState = StateFollower } case StateFollower: if isLeader { targetState = StateCandidating } case StateCandidating: if isLeader { targetState = StateLeader } else { targetState = StateFollower } case StateLeader: if !isLeader { targetState = StateFollower } else if healthScore < elm.config.CriticalHealthThreshold { targetState = StateDegradedLeader } case StateDegradedLeader: if !isLeader { targetState = StateFollower } else if healthScore >= elm.config.CriticalHealthThreshold { targetState = StateLeader } default: targetState = currentState } // Execute transition if needed if targetState != currentState { elm.executeStateTransition(currentState, targetState) } } // executeStateTransition performs a state transition func (elm *EnhancedLeaderManager) executeStateTransition(from, to LifecycleState) { elm.logger("Transitioning from %v to %v", from, to) // Record transition event event := &LeadershipEvent{ Type: elm.getEventTypeForTransition(from, to), Timestamp: time.Now(), NodeID: elm.nodeID, Reason: elm.getTransitionReason(from, to), Metadata: make(map[string]interface{}), } elm.leadershipHistory.AddEvent(event) // Execute transition logic switch to { case StateLeader: elm.transitionToLeader(from) case StateFollower: elm.transitionToFollower(from) case StateDegradedLeader: elm.transitionToDegradedLeader(from) } elm.lifecycleState = to // Update metrics if elm.metricsCollector != nil { elm.metricsCollector.IncrementSLURPGenerated("state_transition", "success") } elm.logger("Successfully transitioned to %v", to) } // transitionToLeader handles transition to leader state func (elm *EnhancedLeaderManager) transitionToLeader(fromState LifecycleState) { elm.logger("Becoming SLURP leader") // Start leadership responsibilities elm.startLeadershipDuties() // Enable enhanced health monitoring elm.healthMonitor.EnableLeadershipMonitoring() // Start enhanced probe schedule elm.probeScheduler.EnableLeadershipProbes() // Execute callback if set if elm.onLeadershipGained != nil { go func() { ctx, cancel := context.WithTimeout(context.Background(), elm.config.LeadershipTransitionTimeout) defer cancel() if err := elm.onLeadershipGained(ctx); err != nil { elm.logger("Error in leadership gained callback: %v", err) } }() } } // transitionToFollower handles transition to follower state func (elm *EnhancedLeaderManager) transitionToFollower(fromState LifecycleState) { elm.logger("Becoming SLURP follower") // Stop leadership responsibilities elm.stopLeadershipDuties() // Disable leadership-specific monitoring elm.healthMonitor.DisableLeadershipMonitoring() // Use follower probe schedule elm.probeScheduler.EnableFollowerProbes() // Execute callback if set if elm.onLeadershipLost != nil { go func() { ctx, cancel := context.WithTimeout(context.Background(), elm.config.LeadershipTransitionTimeout) defer cancel() if err := elm.onLeadershipLost(ctx); err != nil { elm.logger("Error in leadership lost callback: %v", err) } }() } } // transitionToDegradedLeader handles transition to degraded leader state func (elm *EnhancedLeaderManager) transitionToDegradedLeader(fromState LifecycleState) { elm.logger("Transitioning to degraded leader state") // Enable degraded mode operations elm.enableDegradedMode() // Increase health monitoring frequency elm.healthMonitor.EnableDegradedMonitoring() // Execute callback if set if elm.onHealthDegraded != nil { go func() { report := elm.healthMonitor.GenerateHealthReport() if err := elm.onHealthDegraded(report); err != nil { elm.logger("Error in health degraded callback: %v", err) } }() } } // startLeadershipDuties starts leader-specific background tasks func (elm *EnhancedLeaderManager) startLeadershipDuties() { // Start context generation processing elm.resumeContextGeneration() // Start cluster coordination elm.startClusterCoordination() // Enable advanced metrics collection if elm.config.EnableDetailedMetrics { elm.enableDetailedMetrics() } } // stopLeadershipDuties stops leader-specific tasks func (elm *EnhancedLeaderManager) stopLeadershipDuties() { // Pause context generation processing elm.pauseContextGeneration() // Stop cluster coordination elm.stopClusterCoordination() // Disable advanced metrics collection elm.disableDetailedMetrics() } // registerDefaultHealthProbes sets up default health monitoring probes func (elm *EnhancedLeaderManager) registerDefaultHealthProbes() { // Generation performance probe elm.RegisterHealthProbe(&HealthProbe{ Name: "slurp_generation_performance", Description: "Monitors context generation performance", ProbeFunc: elm.probeGenerationPerformance, Interval: elm.config.ProbeSchedulingInterval, Timeout: elm.config.ProbeTimeout, FailureThreshold: elm.config.ProbeFailureThreshold, enabled: true, }) // Queue health probe elm.RegisterHealthProbe(&HealthProbe{ Name: "slurp_queue_health", Description: "Monitors generation queue health", ProbeFunc: elm.probeQueueHealth, Interval: elm.config.ProbeSchedulingInterval, Timeout: elm.config.ProbeTimeout, FailureThreshold: elm.config.ProbeFailureThreshold, enabled: true, }) // Resource utilization probe elm.RegisterHealthProbe(&HealthProbe{ Name: "slurp_resource_utilization", Description: "Monitors SLURP resource usage", ProbeFunc: elm.probeResourceUtilization, Interval: elm.config.ProbeSchedulingInterval * 2, Timeout: elm.config.ProbeTimeout, FailureThreshold: elm.config.ProbeFailureThreshold, enabled: true, }) // Leadership stability probe elm.RegisterHealthProbe(&HealthProbe{ Name: "slurp_leadership_stability", Description: "Monitors leadership stability", ProbeFunc: elm.probeLeadershipStability, Interval: elm.config.ProbeSchedulingInterval * 3, Timeout: elm.config.ProbeTimeout, FailureThreshold: elm.config.ProbeFailureThreshold, enabled: true, }) } // RegisterHealthProbe registers a new health probe func (elm *EnhancedLeaderManager) RegisterHealthProbe(probe *HealthProbe) { elm.mu.Lock() defer elm.mu.Unlock() elm.healthProbes[probe.Name] = probe elm.probeScheduler.AddProbe(probe) elm.logger("Registered health probe: %s", probe.Name) } // Probe implementations func (elm *EnhancedLeaderManager) probeGenerationPerformance(ctx context.Context) *ProbeResult { stats, err := elm.GetManagerStats() if err != nil { return &ProbeResult{ Healthy: false, Message: fmt.Sprintf("Failed to get manager stats: %v", err), Error: err, Timestamp: time.Now(), } } // Check if generation time is within acceptable limits acceptable := stats.AverageJobTime < elm.config.PerformanceAlertThreshold return &ProbeResult{ Healthy: acceptable, Message: fmt.Sprintf("Average generation time: %v", stats.AverageJobTime), Metadata: map[string]interface{}{ "average_time": stats.AverageJobTime.Seconds(), "total_jobs": stats.CompletedJobs, "failed_jobs": stats.FailedJobs, }, Timestamp: time.Now(), } } func (elm *EnhancedLeaderManager) probeQueueHealth(ctx context.Context) *ProbeResult { status, err := elm.GetQueueStatus() if err != nil { return &ProbeResult{ Healthy: false, Message: fmt.Sprintf("Failed to get queue status: %v", err), Error: err, Timestamp: time.Now(), } } // Check queue health queueUtilization := float64(status.QueueLength) / float64(status.MaxQueueSize) healthy := queueUtilization < 0.8 // Alert if queue is 80% full return &ProbeResult{ Healthy: healthy, Message: fmt.Sprintf("Queue utilization: %.1f%%", queueUtilization*100), Metadata: map[string]interface{}{ "queue_length": status.QueueLength, "max_size": status.MaxQueueSize, "utilization": queueUtilization, "wait_time": status.AverageWaitTime.Seconds(), }, Timestamp: time.Now(), } } func (elm *EnhancedLeaderManager) probeResourceUtilization(ctx context.Context) *ProbeResult { // This would integrate with actual resource monitoring // For now, simulate resource checks cpuUsage := 0.45 // 45% memoryUsage := 0.62 // 62% healthy := cpuUsage < elm.config.ResourceUsageAlertThreshold && memoryUsage < elm.config.ResourceUsageAlertThreshold return &ProbeResult{ Healthy: healthy, Message: fmt.Sprintf("CPU: %.1f%%, Memory: %.1f%%", cpuUsage*100, memoryUsage*100), Metadata: map[string]interface{}{ "cpu_usage": cpuUsage, "memory_usage": memoryUsage, "threshold": elm.config.ResourceUsageAlertThreshold, }, Timestamp: time.Now(), } } func (elm *EnhancedLeaderManager) probeLeadershipStability(ctx context.Context) *ProbeResult { stabilityScore := elm.leadershipHistory.GetStabilityScore() recentTransitions := elm.leadershipHistory.GetRecentTransitionCount(1 * time.Hour) healthy := stabilityScore > 0.8 && recentTransitions < 3 return &ProbeResult{ Healthy: healthy, Message: fmt.Sprintf("Stability score: %.2f, recent transitions: %d", stabilityScore, recentTransitions), Metadata: map[string]interface{}{ "stability_score": stabilityScore, "recent_transitions": recentTransitions, "leadership_duration": elm.getLeadershipDuration().Seconds(), }, Timestamp: time.Now(), } } // Helper methods func (elm *EnhancedLeaderManager) getEventTypeForTransition(from, to LifecycleState) LeadershipEventType { if to == StateLeader { return EventTypeLeaderElected } else if from == StateLeader { return EventTypeLeadershipLost } return EventTypeElectionStarted } func (elm *EnhancedLeaderManager) getTransitionReason(from, to LifecycleState) string { switch { case from == StateFollower && to == StateLeader: return "elected_as_leader" case from == StateLeader && to == StateFollower: return "lost_leadership" case from == StateLeader && to == StateDegradedLeader: return "health_degradation" case from == StateDegradedLeader && to == StateLeader: return "health_recovered" default: return fmt.Sprintf("transition_%v_to_%v", from, to) } } // Additional helper methods would be implemented here... // Placeholder implementations for methods referenced but not fully defined func (elm *EnhancedLeaderManager) resumeContextGeneration() {} func (elm *EnhancedLeaderManager) pauseContextGeneration() {} func (elm *EnhancedLeaderManager) startClusterCoordination() {} func (elm *EnhancedLeaderManager) stopClusterCoordination() {} func (elm *EnhancedLeaderManager) enableDetailedMetrics() {} func (elm *EnhancedLeaderManager) disableDetailedMetrics() {} func (elm *EnhancedLeaderManager) enableDegradedMode() {} func (elm *EnhancedLeaderManager) runHealthMonitoring() {} func (elm *EnhancedLeaderManager) runMetricsCollection() {} func (elm *EnhancedLeaderManager) handleShutdown() {} func (elm *EnhancedLeaderManager) getLeadershipDuration() time.Duration { return time.Hour } // Stub implementations for component types func NewSLURPHealthMonitor(manager *EnhancedLeaderManager) *SLURPHealthMonitor { return &SLURPHealthMonitor{manager: manager} } func (shm *SLURPHealthMonitor) GetOverallHealthScore() float64 { return 0.9 } func (shm *SLURPHealthMonitor) EnableLeadershipMonitoring() {} func (shm *SLURPHealthMonitor) DisableLeadershipMonitoring() {} func (shm *SLURPHealthMonitor) EnableDegradedMonitoring() {} func (shm *SLURPHealthMonitor) GenerateHealthReport() *HealthReport { return &HealthReport{} } func NewLeadershipHistory(maxEvents int) *LeadershipHistory { return &LeadershipHistory{maxEvents: maxEvents, startTime: time.Now()} } func (lh *LeadershipHistory) AddEvent(event *LeadershipEvent) {} func (lh *LeadershipHistory) GetStabilityScore() float64 { return 0.9 } func (lh *LeadershipHistory) GetRecentTransitionCount(duration time.Duration) int { return 1 } func NewProbeScheduler() *ProbeScheduler { return &ProbeScheduler{ probes: make(map[string]*HealthProbe), stopCh: make(chan struct{}), } } func (ps *ProbeScheduler) AddProbe(probe *HealthProbe) {} func (ps *ProbeScheduler) EnableLeadershipProbes() {} func (ps *ProbeScheduler) EnableFollowerProbes() {}