package election

import (
	"context"
	"crypto/md5"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"chorus/pkg/config"
	"chorus/pubsub"

	libp2p "github.com/libp2p/go-libp2p/core/host"
)

// SLURPElectionManager extends ElectionManager with SLURP contextual intelligence capabilities
type SLURPElectionManager struct {
	*ElectionManager // Embed base election manager

	// SLURP-specific state
	contextMu        sync.RWMutex
	contextManager   ContextManager
	slurpConfig      *SLURPElectionConfig
	contextCallbacks *ContextLeadershipCallbacks

	// Context leadership state
	isContextLeader  bool
	contextTerm      int64
	contextStartedAt *time.Time
	lastHealthCheck  time.Time

	// Failover state
	failoverState      *ContextFailoverState
	transferInProgress bool

	// Monitoring
	healthMonitor    *ContextHealthMonitor
	metricsCollector *ContextMetricsCollector

	// Shutdown coordination
	contextShutdown chan struct{}
	contextWg       sync.WaitGroup
}

// NewSLURPElectionManager creates a new SLURP-enhanced election manager
func NewSLURPElectionManager(
	ctx context.Context,
	cfg *config.Config,
	host libp2p.Host,
	ps *pubsub.PubSub,
	nodeID string,
	slurpConfig *SLURPElectionConfig,
) *SLURPElectionManager {
	// Create base election manager
	baseManager := NewElectionManager(ctx, cfg, host, ps, nodeID)

	if slurpConfig == nil {
		slurpConfig = DefaultSLURPElectionConfig()
	}

	sem := &SLURPElectionManager{
		ElectionManager:  baseManager,
		slurpConfig:      slurpConfig,
		contextShutdown:  make(chan struct{}),
		healthMonitor:    NewContextHealthMonitor(),
		metricsCollector: NewContextMetricsCollector(),
	}

	// Override base callbacks to include SLURP handling
	sem.setupSLURPCallbacks()

	return sem
}

// RegisterContextManager registers a SLURP context manager for leader duties
func (sem *SLURPElectionManager) RegisterContextManager(manager ContextManager) error {
	sem.contextMu.Lock()
	defer sem.contextMu.Unlock()

	if sem.contextManager != nil {
		return fmt.Errorf("context manager already registered")
	}

	sem.contextManager = manager

	// If we're already the leader, start context generation
	if sem.IsCurrentAdmin() && sem.slurpConfig.AutoStartGeneration {
		go sem.startContextGenerationDelayed()
	}

	log.Printf("✅ Context manager registered with SLURP election")
	return nil
}

// IsContextLeader returns whether this node is the current context generation leader
func (sem *SLURPElectionManager) IsContextLeader() bool {
	sem.contextMu.RLock()
	defer sem.contextMu.RUnlock()
	return sem.isContextLeader && sem.IsCurrentAdmin()
}

// GetContextManager returns the registered context manager (if leader)
func (sem *SLURPElectionManager) GetContextManager() (ContextManager, error) {
	sem.contextMu.RLock()
	defer sem.contextMu.RUnlock()

	if !sem.isContextLeader {
		return nil, fmt.Errorf("not context leader")
	}

	if sem.contextManager == nil {
		return nil, fmt.Errorf("no context manager registered")
	}

	return sem.contextManager, nil
}
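// Example registration flow, as a sketch; the concrete ContextManager
// implementation ("myManager") is hypothetical and not defined in this file:
//
//	mgr := NewSLURPElectionManager(ctx, cfg, host, ps, nodeID, nil)
//	if err := mgr.RegisterContextManager(myManager); err != nil {
//		log.Printf("register: %v", err)
//	}
//	if mgr.IsContextLeader() {
//		cm, _ := mgr.GetContextManager() // leader-only access
//		_ = cm
//	}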
fmt.Errorf("failed to prepare context failover: %w", err) } // Send transfer message transferMsg := ElectionMessage{ Type: "context_leadership_transfer", NodeID: sem.nodeID, Timestamp: time.Now(), Term: int(sem.contextTerm), Data: map[string]interface{}{ "target_node": targetNodeID, "failover_state": state, "reason": "manual_transfer", }, } if err := sem.publishElectionMessage(transferMsg); err != nil { return fmt.Errorf("failed to send transfer message: %w", err) } // Stop context generation if err := sem.StopContextGeneration(ctx); err != nil { log.Printf("âš ī¸ Error stopping context generation during transfer: %v", err) } // Trigger new election if needed sem.TriggerElection(TriggerManual) log.Printf("✅ Context leadership transfer initiated") return nil } // GetContextLeaderInfo returns information about current context leader func (sem *SLURPElectionManager) GetContextLeaderInfo() (*LeaderInfo, error) { sem.contextMu.RLock() defer sem.contextMu.RUnlock() leaderID := sem.GetCurrentAdmin() if leaderID == "" { return nil, fmt.Errorf("no current leader") } info := &LeaderInfo{ NodeID: leaderID, Term: sem.contextTerm, ElectedAt: time.Now(), // TODO: Track actual election time // Version: "1.0.0", // TODO: Add Version field to LeaderInfo struct } // TODO: Add missing fields to LeaderInfo struct // if sem.isContextLeader && sem.contextStartedAt != nil { // info.ActiveSince = time.Since(*sem.contextStartedAt) // } // Add generation capacity and load info // if sem.contextManager != nil && sem.isContextLeader { // if status, err := sem.contextManager.GetGenerationStatus(); err == nil { // info.GenerationCapacity = 100 // TODO: Get from config // if status.ActiveTasks > 0 { // info.CurrentLoad = float64(status.ActiveTasks) / float64(info.GenerationCapacity) // } // info.HealthStatus = "healthy" // TODO: Get from health monitor // } // } return info, nil } // StartContextGeneration begins context generation operations (leader only) func (sem *SLURPElectionManager) StartContextGeneration(ctx context.Context) error { if !sem.IsCurrentAdmin() { return fmt.Errorf("not admin, cannot start context generation") } sem.contextMu.Lock() defer sem.contextMu.Unlock() if sem.isContextLeader { return fmt.Errorf("context generation already active") } if sem.contextManager == nil { return fmt.Errorf("no context manager registered") } log.Printf("🚀 Starting context generation as leader") // Mark as context leader sem.isContextLeader = true sem.contextTerm++ now := time.Now() sem.contextStartedAt = &now // Start background processes sem.contextWg.Add(2) go sem.runHealthMonitoring() go sem.runMetricsCollection() // Call callback if sem.contextCallbacks != nil && sem.contextCallbacks.OnBecomeContextLeader != nil { if err := sem.contextCallbacks.OnBecomeContextLeader(ctx, sem.contextTerm); err != nil { log.Printf("âš ī¸ Context leadership callback error: %v", err) } } if sem.contextCallbacks != nil && sem.contextCallbacks.OnContextGenerationStarted != nil { sem.contextCallbacks.OnContextGenerationStarted(sem.nodeID) } // Broadcast context leadership start startMsg := ElectionMessage{ Type: "context_generation_started", NodeID: sem.nodeID, Timestamp: time.Now(), Term: int(sem.contextTerm), Data: map[string]interface{}{ "leader_id": sem.nodeID, }, } if err := sem.publishElectionMessage(startMsg); err != nil { log.Printf("âš ī¸ Failed to broadcast context generation start: %v", err) } log.Printf("✅ Context generation started successfully") return nil } // StopContextGeneration stops context generation operations 
// StopContextGeneration stops context generation operations
func (sem *SLURPElectionManager) StopContextGeneration(ctx context.Context) error {
	sem.contextMu.Lock()
	isLeader := sem.isContextLeader
	sem.contextMu.Unlock()

	if !isLeader {
		return nil // Already stopped
	}

	log.Printf("⏹️ Stopping context generation")

	// Signal shutdown to background processes
	select {
	case <-sem.contextShutdown:
		// Already shutting down
	default:
		close(sem.contextShutdown)
	}

	// Wait for background processes with timeout
	done := make(chan struct{})
	go func() {
		sem.contextWg.Wait()
		close(done)
	}()

	select {
	case <-done:
		log.Printf("✅ Background processes stopped cleanly")
	case <-time.After(sem.slurpConfig.GenerationStopTimeout):
		log.Printf("⚠️ Timeout waiting for background processes to stop")
	}

	sem.contextMu.Lock()
	sem.isContextLeader = false
	sem.contextStartedAt = nil
	// Reset shutdown channel for the next start while holding the lock,
	// so concurrent readers never observe the swap mid-write.
	sem.contextShutdown = make(chan struct{})
	sem.contextMu.Unlock()

	// Call callbacks
	if sem.contextCallbacks != nil && sem.contextCallbacks.OnLoseContextLeadership != nil {
		if err := sem.contextCallbacks.OnLoseContextLeadership(ctx, ""); err != nil {
			log.Printf("⚠️ Context leadership loss callback error: %v", err)
		}
	}

	if sem.contextCallbacks != nil && sem.contextCallbacks.OnContextGenerationStopped != nil {
		sem.contextCallbacks.OnContextGenerationStopped(sem.nodeID, "leadership_lost")
	}

	// Broadcast context generation stop
	stopMsg := ElectionMessage{
		Type:      "context_generation_stopped",
		NodeID:    sem.nodeID,
		Timestamp: time.Now(),
		Term:      int(sem.contextTerm),
		Data: map[string]interface{}{
			"reason": "leadership_lost",
		},
	}

	if err := sem.publishElectionMessage(stopMsg); err != nil {
		log.Printf("⚠️ Failed to broadcast context generation stop: %v", err)
	}

	log.Printf("✅ Context generation stopped")
	return nil
}

// GetContextGenerationStatus returns status of context operations
func (sem *SLURPElectionManager) GetContextGenerationStatus() (*GenerationStatus, error) {
	sem.contextMu.RLock()
	manager := sem.contextManager
	// isLeader := sem.isContextLeader // TODO: Use when IsLeader field is added
	sem.contextMu.RUnlock()

	if manager == nil {
		return &GenerationStatus{
			// IsLeader: false, // TODO: Add IsLeader field to GenerationStatus
			LeaderID: sem.GetCurrentAdmin(),
			// LastUpdate: time.Now(), // TODO: Add LastUpdate field to GenerationStatus
		}, nil
	}

	status, err := manager.GetGenerationStatus()
	if err != nil {
		return nil, err
	}

	// Override leader status from election state
	// status.IsLeader = isLeader // TODO: Add IsLeader field to GenerationStatus
	status.LeaderID = sem.GetCurrentAdmin()

	return status, nil
}

// RequestContextGeneration queues a context generation request
func (sem *SLURPElectionManager) RequestContextGeneration(req *ContextGenerationRequest) error {
	sem.contextMu.RLock()
	manager := sem.contextManager
	isLeader := sem.isContextLeader
	sem.contextMu.RUnlock()

	if !isLeader {
		return fmt.Errorf("not context leader")
	}

	if manager == nil {
		return fmt.Errorf("no context manager registered")
	}

	return manager.RequestContextGeneration(req)
}

// SetContextLeadershipCallbacks sets callbacks for context leadership changes
func (sem *SLURPElectionManager) SetContextLeadershipCallbacks(callbacks *ContextLeadershipCallbacks) error {
	sem.contextMu.Lock()
	defer sem.contextMu.Unlock()

	sem.contextCallbacks = callbacks
	return nil
}

// GetContextClusterHealth returns health of context generation cluster
func (sem *SLURPElectionManager) GetContextClusterHealth() (*ContextClusterHealth, error) {
	return sem.healthMonitor.GetClusterHealth(), nil
}
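// Wiring leadership callbacks, as a sketch; the OnBecomeContextLeader
// signature is inferred from its call site in StartContextGeneration:
//
//	_ = mgr.SetContextLeadershipCallbacks(&ContextLeadershipCallbacks{
//		OnBecomeContextLeader: func(ctx context.Context, term int64) error {
//			log.Printf("became context leader (term %d)", term)
//			return nil
//		},
//	})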
// PrepareContextFailover prepares context state for leadership failover
func (sem *SLURPElectionManager) PrepareContextFailover(ctx context.Context) (*ContextFailoverState, error) {
	if !sem.IsContextLeader() {
		return nil, fmt.Errorf("not context leader")
	}

	sem.contextMu.Lock()
	defer sem.contextMu.Unlock()

	log.Printf("📦 Preparing context failover state")

	state := &ContextFailoverState{
		LeaderID:     sem.nodeID,
		Term:         sem.contextTerm,
		TransferTime: time.Now(),
		StateVersion: time.Now().Unix(),
	}

	// Get current state from context manager
	if sem.contextManager != nil {
		// Get queued requests (if supported)
		// TODO: Add interface method to get queued requests
		state.QueuedRequests = []*ContextGenerationRequest{}

		// Get active jobs (if supported)
		// TODO: Add interface method to get active jobs
		state.ActiveJobs = make(map[string]*ContextGenerationJob)

		// Get manager configuration
		// TODO: Add interface method to get configuration
		state.ManagerConfig = DefaultManagerConfig()
	}

	// Get cluster health snapshot
	if health, err := sem.GetContextClusterHealth(); err == nil {
		state.HealthSnapshot = health
	}

	// Calculate checksum (md5 is used here as a fast integrity check,
	// not for cryptographic security)
	if data, err := json.Marshal(state); err == nil {
		hash := md5.Sum(data)
		state.Checksum = fmt.Sprintf("%x", hash)
	}

	sem.failoverState = state

	log.Printf("✅ Context failover state prepared (version: %d)", state.StateVersion)
	return state, nil
}

// ExecuteContextFailover executes context leadership failover
func (sem *SLURPElectionManager) ExecuteContextFailover(ctx context.Context, state *ContextFailoverState) error {
	if sem.IsContextLeader() {
		return fmt.Errorf("already context leader")
	}

	log.Printf("🔄 Executing context failover from state (version: %d)", state.StateVersion)

	// Validate state first
	validation, err := sem.ValidateContextState(state)
	if err != nil {
		return fmt.Errorf("failed to validate failover state: %w", err)
	}

	if !validation.Valid {
		return fmt.Errorf("invalid failover state: %v", validation.Issues)
	}

	sem.contextMu.Lock()
	defer sem.contextMu.Unlock()

	// Restore context leadership state
	sem.isContextLeader = true
	sem.contextTerm = state.Term + 1 // Increment term
	now := time.Now()
	sem.contextStartedAt = &now

	// TODO: Restore queued requests to context manager
	// TODO: Restore active jobs to context manager
	// TODO: Apply manager configuration

	// Start background processes
	sem.contextWg.Add(2)
	go sem.runHealthMonitoring()
	go sem.runMetricsCollection()

	log.Printf("✅ Context failover executed successfully (new term: %d)", sem.contextTerm)
	return nil
}
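// Failover handoff between two managers, as a sketch; transporting the state
// blob between nodes is assumed to happen out of band (e.g. via pubsub), and
// ExecuteContextFailover re-validates it before taking over:
//
//	state, err := oldLeader.PrepareContextFailover(ctx)
//	if err == nil {
//		if err := newLeader.ExecuteContextFailover(ctx, state); err != nil {
//			log.Printf("failover: %v", err)
//		}
//	}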
// ValidateContextState validates context failover state
func (sem *SLURPElectionManager) ValidateContextState(state *ContextFailoverState) (*ContextStateValidation, error) {
	if state == nil {
		return &ContextStateValidation{
			Valid:       false,
			Issues:      []string{"nil failover state"},
			ValidatedAt: time.Now(),
		}, nil
	}

	validation := &ContextStateValidation{
		ValidatedAt: time.Now(),
		ValidatedBy: sem.nodeID,
		Valid:       true,
	}

	// Check basic fields
	if state.LeaderID == "" {
		validation.Issues = append(validation.Issues, "missing leader ID")
		validation.Valid = false
	}

	if state.Term <= 0 {
		validation.Issues = append(validation.Issues, "invalid term")
		validation.Valid = false
	}

	if state.StateVersion <= 0 {
		validation.Issues = append(validation.Issues, "invalid state version")
		validation.Valid = false
	}

	// Validate checksum
	if state.Checksum != "" {
		tempState := *state
		tempState.Checksum = ""
		if data, err := json.Marshal(tempState); err == nil {
			hash := md5.Sum(data)
			expectedChecksum := fmt.Sprintf("%x", hash)
			validation.ChecksumValid = expectedChecksum == state.Checksum
			if !validation.ChecksumValid {
				validation.Issues = append(validation.Issues, "checksum validation failed")
				validation.Valid = false
			}
		}
	}

	// Validate timestamps
	if state.TransferTime.IsZero() {
		validation.Issues = append(validation.Issues, "missing transfer time")
		validation.TimestampValid = false
		validation.Valid = false
	} else {
		validation.TimestampValid = true
	}

	// Version consistency check
	validation.VersionConsistent = true // TODO: Implement actual version checking

	// Queue state validation
	validation.QueueStateValid = state.QueuedRequests != nil
	if !validation.QueueStateValid {
		validation.Issues = append(validation.Issues, "invalid queue state")
	}

	// Cluster state validation
	validation.ClusterStateValid = state.ClusterState != nil
	if !validation.ClusterStateValid {
		validation.Issues = append(validation.Issues, "missing cluster state")
	}

	// Config validation
	validation.ConfigValid = state.ManagerConfig != nil
	if !validation.ConfigValid {
		validation.Issues = append(validation.Issues, "missing manager configuration")
	}

	// Set recovery requirements
	if len(validation.Issues) > 0 {
		validation.RequiresRecovery = true
		validation.RecoverySteps = []string{
			"Review validation issues",
			"Perform partial state recovery",
			"Restart context generation with defaults",
		}
	}

	validation.ValidationDuration = time.Since(validation.ValidatedAt)

	return validation, nil
}

// setupSLURPCallbacks configures the base election manager with SLURP-aware callbacks
func (sem *SLURPElectionManager) setupSLURPCallbacks() {
	sem.SetCallbacks(
		sem.onAdminChangedSLURP,
		sem.onElectionCompleteSLURP,
	)
}

// onAdminChangedSLURP handles admin changes with SLURP context awareness
func (sem *SLURPElectionManager) onAdminChangedSLURP(oldAdmin, newAdmin string) {
	log.Printf("🔄 Admin changed: %s -> %s (SLURP-aware)", oldAdmin, newAdmin)

	// If we lost leadership, stop context generation
	if oldAdmin == sem.nodeID && newAdmin != sem.nodeID {
		if err := sem.StopContextGeneration(context.Background()); err != nil {
			log.Printf("⚠️ Error stopping context generation: %v", err)
		}
	}

	// If we gained leadership, start context generation
	if newAdmin == sem.nodeID && oldAdmin != sem.nodeID {
		if sem.slurpConfig.AutoStartGeneration {
			go sem.startContextGenerationDelayed()
		}
	}

	// Call context callbacks
	if sem.contextCallbacks != nil && sem.contextCallbacks.OnContextLeaderChanged != nil {
		sem.contextCallbacks.OnContextLeaderChanged(oldAdmin, newAdmin, sem.contextTerm)
	}
}

// onElectionCompleteSLURP handles election completion with SLURP context awareness
func (sem *SLURPElectionManager) onElectionCompleteSLURP(winner string) {
	log.Printf("🏆 Election complete: %s (SLURP-aware)", winner)

	// Update context term on election completion
	sem.contextMu.Lock()
	sem.contextTerm++
	sem.contextMu.Unlock()
}

// startContextGenerationDelayed starts context generation after a delay
func (sem *SLURPElectionManager) startContextGenerationDelayed() {
	time.Sleep(sem.slurpConfig.GenerationStartDelay)

	if err := sem.StartContextGeneration(context.Background()); err != nil {
		log.Printf("⚠️ Error starting context generation: %v", err)
	}
}

// runHealthMonitoring runs background health monitoring
func (sem *SLURPElectionManager) runHealthMonitoring() {
	defer sem.contextWg.Done()

	ticker := time.NewTicker(sem.slurpConfig.ContextHealthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			sem.performHealthCheck()
		case <-sem.contextShutdown:
			return
		}
	}
}
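// The background loops above and below share the same ticker-plus-shutdown
// select pattern; shown in isolation as a sketch ("interval", "doWork", and
// "shutdown" are hypothetical stand-ins):
//
//	ticker := time.NewTicker(interval)
//	defer ticker.Stop()
//	for {
//		select {
//		case <-ticker.C:
//			doWork()
//		case <-shutdown:
//			return
//		}
//	}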
// runMetricsCollection runs background metrics collection
func (sem *SLURPElectionManager) runMetricsCollection() {
	defer sem.contextWg.Done()

	ticker := time.NewTicker(30 * time.Second) // TODO: Make configurable
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			sem.collectMetrics()
		case <-sem.contextShutdown:
			return
		}
	}
}

// performHealthCheck performs a context health check
func (sem *SLURPElectionManager) performHealthCheck() {
	sem.contextMu.Lock()
	sem.lastHealthCheck = time.Now()
	sem.contextMu.Unlock()

	// TODO: Implement actual health checking logic
	if sem.contextManager != nil && sem.isContextLeader {
		if status, err := sem.contextManager.GetGenerationStatus(); err != nil {
			if sem.contextCallbacks != nil && sem.contextCallbacks.OnContextError != nil {
				sem.contextCallbacks.OnContextError(err, ErrorSeverityMedium)
			}
		} else {
			// Update health monitor with status
			sem.healthMonitor.UpdateGenerationStatus(status)
		}
	}
}

// collectMetrics collects context generation metrics
func (sem *SLURPElectionManager) collectMetrics() {
	// TODO: Implement metrics collection
	sem.metricsCollector.CollectMetrics(sem)
}

// Stop overrides the base Stop to include SLURP cleanup
func (sem *SLURPElectionManager) Stop() {
	log.Printf("🛑 Stopping SLURP election manager")

	// Stop context generation first
	if err := sem.StopContextGeneration(context.Background()); err != nil {
		log.Printf("⚠️ Error stopping context generation: %v", err)
	}

	// Stop base election manager
	sem.ElectionManager.Stop()

	log.Printf("✅ SLURP election manager stopped")
}

// Placeholder types for health monitoring and metrics collection

// ContextHealthMonitor monitors the health of context generation cluster
type ContextHealthMonitor struct {
	mu         sync.RWMutex
	lastHealth *ContextClusterHealth
	lastUpdate time.Time
}

// NewContextHealthMonitor creates a new context health monitor
func NewContextHealthMonitor() *ContextHealthMonitor {
	return &ContextHealthMonitor{
		lastUpdate: time.Now(),
	}
}

// GetClusterHealth returns current cluster health
func (chm *ContextHealthMonitor) GetClusterHealth() *ContextClusterHealth {
	chm.mu.RLock()
	defer chm.mu.RUnlock()

	if chm.lastHealth == nil {
		return &ContextClusterHealth{
			TotalNodes:         1,
			HealthyNodes:       1,
			GenerationActive:   false,
			OverallHealthScore: 1.0,
			LastElection:       time.Now(),
			NextHealthCheck:    time.Now().Add(30 * time.Second),
		}
	}

	return chm.lastHealth
}

// UpdateGenerationStatus updates health based on generation status
func (chm *ContextHealthMonitor) UpdateGenerationStatus(status *GenerationStatus) {
	chm.mu.Lock()
	defer chm.mu.Unlock()

	// TODO: Implement health status update based on generation status
	chm.lastUpdate = time.Now()
}

// ContextMetricsCollector collects metrics for context operations
type ContextMetricsCollector struct {
	mu             sync.RWMutex
	lastCollection time.Time
}

// NewContextMetricsCollector creates a new context metrics collector
func NewContextMetricsCollector() *ContextMetricsCollector {
	return &ContextMetricsCollector{}
}

// CollectMetrics collects current metrics
func (cmc *ContextMetricsCollector) CollectMetrics(manager *SLURPElectionManager) {
	cmc.mu.Lock()
	defer cmc.mu.Unlock()

	// TODO: Implement metrics collection
	cmc.lastCollection = time.Now()
}
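// End-to-end lifecycle, as a sketch; "host", "ps", and "myManager" are
// assumed to be constructed elsewhere:
//
//	mgr := NewSLURPElectionManager(ctx, cfg, host, ps, nodeID, DefaultSLURPElectionConfig())
//	_ = mgr.RegisterContextManager(myManager)
//	// ... elections run; SLURP callbacks fire on leadership changes ...
//	mgr.Stop() // stops context generation, then the base election manager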