 9bdcbe0447
			
		
	
	9bdcbe0447
	
	
	
		
			
			Major integrations and fixes: - Added BACKBEAT SDK integration for P2P operation timing - Implemented beat-aware status tracking for distributed operations - Added Docker secrets support for secure license management - Resolved KACHING license validation via HTTPS/TLS - Updated docker-compose configuration for clean stack deployment - Disabled rollback policies to prevent deployment failures - Added license credential storage (CHORUS-DEV-MULTI-001) Technical improvements: - BACKBEAT P2P operation tracking with phase management - Enhanced configuration system with file-based secrets - Improved error handling for license validation - Clean separation of KACHING and CHORUS deployment stacks 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			772 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			772 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package election
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"crypto/md5"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"log"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"chorus/pkg/config"
 | |
| 	"chorus/pubsub"
 | |
| 	libp2p "github.com/libp2p/go-libp2p/core/host"
 | |
| )
 | |
| 
 | |
| // SLURPElectionManager extends ElectionManager with SLURP contextual intelligence capabilities
 | |
| type SLURPElectionManager struct {
 | |
| 	*ElectionManager // Embed base election manager
 | |
| 	
 | |
| 	// SLURP-specific state
 | |
| 	contextMu          sync.RWMutex
 | |
| 	contextManager     ContextManager
 | |
| 	slurpConfig        *SLURPElectionConfig
 | |
| 	contextCallbacks   *ContextLeadershipCallbacks
 | |
| 	
 | |
| 	// Context leadership state
 | |
| 	isContextLeader    bool
 | |
| 	contextTerm        int64
 | |
| 	contextStartedAt   *time.Time
 | |
| 	lastHealthCheck    time.Time
 | |
| 	
 | |
| 	// Failover state
 | |
| 	failoverState      *ContextFailoverState
 | |
| 	transferInProgress bool
 | |
| 	
 | |
| 	// Monitoring
 | |
| 	healthMonitor      *ContextHealthMonitor
 | |
| 	metricsCollector   *ContextMetricsCollector
 | |
| 	
 | |
| 	// Shutdown coordination
 | |
| 	contextShutdown    chan struct{}
 | |
| 	contextWg          sync.WaitGroup
 | |
| }
 | |
| 
 | |
| // NewSLURPElectionManager creates a new SLURP-enhanced election manager
 | |
| func NewSLURPElectionManager(
 | |
| 	ctx context.Context,
 | |
| 	cfg *config.Config,
 | |
| 	host libp2p.Host,
 | |
| 	ps *pubsub.PubSub,
 | |
| 	nodeID string,
 | |
| 	slurpConfig *SLURPElectionConfig,
 | |
| ) *SLURPElectionManager {
 | |
| 	// Create base election manager
 | |
| 	baseManager := NewElectionManager(ctx, cfg, host, ps, nodeID)
 | |
| 	
 | |
| 	if slurpConfig == nil {
 | |
| 		slurpConfig = DefaultSLURPElectionConfig()
 | |
| 	}
 | |
| 	
 | |
| 	sem := &SLURPElectionManager{
 | |
| 		ElectionManager:    baseManager,
 | |
| 		slurpConfig:        slurpConfig,
 | |
| 		contextShutdown:    make(chan struct{}),
 | |
| 		healthMonitor:      NewContextHealthMonitor(),
 | |
| 		metricsCollector:   NewContextMetricsCollector(),
 | |
| 	}
 | |
| 	
 | |
| 	// Override base callbacks to include SLURP handling
 | |
| 	sem.setupSLURPCallbacks()
 | |
| 	
 | |
| 	return sem
 | |
| }
 | |
| 
 | |
| // RegisterContextManager registers a SLURP context manager for leader duties
 | |
| func (sem *SLURPElectionManager) RegisterContextManager(manager ContextManager) error {
 | |
| 	sem.contextMu.Lock()
 | |
| 	defer sem.contextMu.Unlock()
 | |
| 	
 | |
| 	if sem.contextManager != nil {
 | |
| 		return fmt.Errorf("context manager already registered")
 | |
| 	}
 | |
| 	
 | |
| 	sem.contextManager = manager
 | |
| 	
 | |
| 	// If we're already the leader, start context generation
 | |
| 	if sem.IsCurrentAdmin() && sem.slurpConfig.AutoStartGeneration {
 | |
| 		go sem.startContextGenerationDelayed()
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("✅ Context manager registered with SLURP election")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // IsContextLeader returns whether this node is the current context generation leader
 | |
| func (sem *SLURPElectionManager) IsContextLeader() bool {
 | |
| 	sem.contextMu.RLock()
 | |
| 	defer sem.contextMu.RUnlock()
 | |
| 	return sem.isContextLeader && sem.IsCurrentAdmin()
 | |
| }
 | |
| 
 | |
| // GetContextManager returns the registered context manager (if leader)
 | |
| func (sem *SLURPElectionManager) GetContextManager() (ContextManager, error) {
 | |
| 	sem.contextMu.RLock()
 | |
| 	defer sem.contextMu.RUnlock()
 | |
| 	
 | |
| 	if !sem.isContextLeader {
 | |
| 		return nil, fmt.Errorf("not context leader")
 | |
| 	}
 | |
| 	
 | |
| 	if sem.contextManager == nil {
 | |
| 		return nil, fmt.Errorf("no context manager registered")
 | |
| 	}
 | |
| 	
 | |
| 	return sem.contextManager, nil
 | |
| }
 | |
| 
 | |
| // TransferContextLeadership initiates graceful context leadership transfer
 | |
| func (sem *SLURPElectionManager) TransferContextLeadership(ctx context.Context, targetNodeID string) error {
 | |
| 	if !sem.IsContextLeader() {
 | |
| 		return fmt.Errorf("not context leader, cannot transfer")
 | |
| 	}
 | |
| 	
 | |
| 	sem.contextMu.Lock()
 | |
| 	if sem.transferInProgress {
 | |
| 		sem.contextMu.Unlock()
 | |
| 		return fmt.Errorf("transfer already in progress")
 | |
| 	}
 | |
| 	sem.transferInProgress = true
 | |
| 	sem.contextMu.Unlock()
 | |
| 	
 | |
| 	defer func() {
 | |
| 		sem.contextMu.Lock()
 | |
| 		sem.transferInProgress = false
 | |
| 		sem.contextMu.Unlock()
 | |
| 	}()
 | |
| 	
 | |
| 	log.Printf("🔄 Initiating context leadership transfer to %s", targetNodeID)
 | |
| 	
 | |
| 	// Prepare failover state
 | |
| 	state, err := sem.PrepareContextFailover(ctx)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to prepare context failover: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Send transfer message
 | |
| 	transferMsg := ElectionMessage{
 | |
| 		Type:      "context_leadership_transfer",
 | |
| 		NodeID:    sem.nodeID,
 | |
| 		Timestamp: time.Now(),
 | |
| 		Term:      int(sem.contextTerm),
 | |
| 		Data: map[string]interface{}{
 | |
| 			"target_node":    targetNodeID,
 | |
| 			"failover_state": state,
 | |
| 			"reason":         "manual_transfer",
 | |
| 		},
 | |
| 	}
 | |
| 	
 | |
| 	if err := sem.publishElectionMessage(transferMsg); err != nil {
 | |
| 		return fmt.Errorf("failed to send transfer message: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Stop context generation
 | |
| 	if err := sem.StopContextGeneration(ctx); err != nil {
 | |
| 		log.Printf("⚠️ Error stopping context generation during transfer: %v", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Trigger new election if needed
 | |
| 	sem.TriggerElection(TriggerManual)
 | |
| 	
 | |
| 	log.Printf("✅ Context leadership transfer initiated")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetContextLeaderInfo returns information about current context leader
 | |
| func (sem *SLURPElectionManager) GetContextLeaderInfo() (*LeaderInfo, error) {
 | |
| 	sem.contextMu.RLock()
 | |
| 	defer sem.contextMu.RUnlock()
 | |
| 	
 | |
| 	leaderID := sem.GetCurrentAdmin()
 | |
| 	if leaderID == "" {
 | |
| 		return nil, fmt.Errorf("no current leader")
 | |
| 	}
 | |
| 	
 | |
| 	info := &LeaderInfo{
 | |
| 		NodeID:     leaderID,
 | |
| 		Term:       sem.contextTerm,
 | |
| 		ElectedAt:  time.Now(), // TODO: Track actual election time
 | |
| 		// Version:    "1.0.0",    // TODO: Add Version field to LeaderInfo struct
 | |
| 	}
 | |
| 	
 | |
| 	// TODO: Add missing fields to LeaderInfo struct
 | |
| 	// if sem.isContextLeader && sem.contextStartedAt != nil {
 | |
| 	//	info.ActiveSince = time.Since(*sem.contextStartedAt)
 | |
| 	// }
 | |
| 	
 | |
| 	// Add generation capacity and load info
 | |
| 	// if sem.contextManager != nil && sem.isContextLeader {
 | |
| 	//	if status, err := sem.contextManager.GetGenerationStatus(); err == nil {
 | |
| 	//		info.GenerationCapacity = 100 // TODO: Get from config
 | |
| 	//		if status.ActiveTasks > 0 {
 | |
| 	//			info.CurrentLoad = float64(status.ActiveTasks) / float64(info.GenerationCapacity)
 | |
| 	//		}
 | |
| 	//		info.HealthStatus = "healthy" // TODO: Get from health monitor
 | |
| 	//	}
 | |
| 	// }
 | |
| 	
 | |
| 	return info, nil
 | |
| }
 | |
| 
 | |
| // StartContextGeneration begins context generation operations (leader only)
 | |
| func (sem *SLURPElectionManager) StartContextGeneration(ctx context.Context) error {
 | |
| 	if !sem.IsCurrentAdmin() {
 | |
| 		return fmt.Errorf("not admin, cannot start context generation")
 | |
| 	}
 | |
| 	
 | |
| 	sem.contextMu.Lock()
 | |
| 	defer sem.contextMu.Unlock()
 | |
| 	
 | |
| 	if sem.isContextLeader {
 | |
| 		return fmt.Errorf("context generation already active")
 | |
| 	}
 | |
| 	
 | |
| 	if sem.contextManager == nil {
 | |
| 		return fmt.Errorf("no context manager registered")
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("🚀 Starting context generation as leader")
 | |
| 	
 | |
| 	// Mark as context leader
 | |
| 	sem.isContextLeader = true
 | |
| 	sem.contextTerm++
 | |
| 	now := time.Now()
 | |
| 	sem.contextStartedAt = &now
 | |
| 	
 | |
| 	// Start background processes
 | |
| 	sem.contextWg.Add(2)
 | |
| 	go sem.runHealthMonitoring()
 | |
| 	go sem.runMetricsCollection()
 | |
| 	
 | |
| 	// Call callback
 | |
| 	if sem.contextCallbacks != nil && sem.contextCallbacks.OnBecomeContextLeader != nil {
 | |
| 		if err := sem.contextCallbacks.OnBecomeContextLeader(ctx, sem.contextTerm); err != nil {
 | |
| 			log.Printf("⚠️ Context leadership callback error: %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	if sem.contextCallbacks != nil && sem.contextCallbacks.OnContextGenerationStarted != nil {
 | |
| 		sem.contextCallbacks.OnContextGenerationStarted(sem.nodeID)
 | |
| 	}
 | |
| 	
 | |
| 	// Broadcast context leadership start
 | |
| 	startMsg := ElectionMessage{
 | |
| 		Type:      "context_generation_started",
 | |
| 		NodeID:    sem.nodeID,
 | |
| 		Timestamp: time.Now(),
 | |
| 		Term:      int(sem.contextTerm),
 | |
| 		Data: map[string]interface{}{
 | |
| 			"leader_id": sem.nodeID,
 | |
| 		},
 | |
| 	}
 | |
| 	
 | |
| 	if err := sem.publishElectionMessage(startMsg); err != nil {
 | |
| 		log.Printf("⚠️ Failed to broadcast context generation start: %v", err)
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("✅ Context generation started successfully")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // StopContextGeneration stops context generation operations
 | |
| func (sem *SLURPElectionManager) StopContextGeneration(ctx context.Context) error {
 | |
| 	sem.contextMu.Lock()
 | |
| 	isLeader := sem.isContextLeader
 | |
| 	sem.contextMu.Unlock()
 | |
| 	
 | |
| 	if !isLeader {
 | |
| 		return nil // Already stopped
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("⏹️ Stopping context generation")
 | |
| 	
 | |
| 	// Signal shutdown to background processes
 | |
| 	select {
 | |
| 	case <-sem.contextShutdown:
 | |
| 		// Already shutting down
 | |
| 	default:
 | |
| 		close(sem.contextShutdown)
 | |
| 	}
 | |
| 	
 | |
| 	// Wait for background processes with timeout
 | |
| 	done := make(chan struct{})
 | |
| 	go func() {
 | |
| 		sem.contextWg.Wait()
 | |
| 		close(done)
 | |
| 	}()
 | |
| 	
 | |
| 	select {
 | |
| 	case <-done:
 | |
| 		log.Printf("✅ Background processes stopped cleanly")
 | |
| 	case <-time.After(sem.slurpConfig.GenerationStopTimeout):
 | |
| 		log.Printf("⚠️ Timeout waiting for background processes to stop")
 | |
| 	}
 | |
| 	
 | |
| 	sem.contextMu.Lock()
 | |
| 	sem.isContextLeader = false
 | |
| 	sem.contextStartedAt = nil
 | |
| 	sem.contextMu.Unlock()
 | |
| 	
 | |
| 	// Call callbacks
 | |
| 	if sem.contextCallbacks != nil && sem.contextCallbacks.OnLoseContextLeadership != nil {
 | |
| 		if err := sem.contextCallbacks.OnLoseContextLeadership(ctx, ""); err != nil {
 | |
| 			log.Printf("⚠️ Context leadership loss callback error: %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	if sem.contextCallbacks != nil && sem.contextCallbacks.OnContextGenerationStopped != nil {
 | |
| 		sem.contextCallbacks.OnContextGenerationStopped(sem.nodeID, "leadership_lost")
 | |
| 	}
 | |
| 	
 | |
| 	// Broadcast context generation stop
 | |
| 	stopMsg := ElectionMessage{
 | |
| 		Type:      "context_generation_stopped",
 | |
| 		NodeID:    sem.nodeID,
 | |
| 		Timestamp: time.Now(),
 | |
| 		Term:      int(sem.contextTerm),
 | |
| 		Data: map[string]interface{}{
 | |
| 			"reason": "leadership_lost",
 | |
| 		},
 | |
| 	}
 | |
| 	
 | |
| 	if err := sem.publishElectionMessage(stopMsg); err != nil {
 | |
| 		log.Printf("⚠️ Failed to broadcast context generation stop: %v", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Reset shutdown channel for next start
 | |
| 	sem.contextShutdown = make(chan struct{})
 | |
| 	
 | |
| 	log.Printf("✅ Context generation stopped")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetContextGenerationStatus returns status of context operations
 | |
| func (sem *SLURPElectionManager) GetContextGenerationStatus() (*GenerationStatus, error) {
 | |
| 	sem.contextMu.RLock()
 | |
| 	manager := sem.contextManager
 | |
| 	// isLeader := sem.isContextLeader // TODO: Use when IsLeader field is added
 | |
| 	sem.contextMu.RUnlock()
 | |
| 	
 | |
| 	if manager == nil {
 | |
| 		return &GenerationStatus{
 | |
| 			// IsLeader:   false,      // TODO: Add IsLeader field to GenerationStatus
 | |
| 			LeaderID:   sem.GetCurrentAdmin(),
 | |
| 			// LastUpdate: time.Now(), // TODO: Add LastUpdate field to GenerationStatus
 | |
| 		}, nil
 | |
| 	}
 | |
| 	
 | |
| 	status, err := manager.GetGenerationStatus()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	
 | |
| 	// Override leader status from election state
 | |
| 	// status.IsLeader = isLeader // TODO: Add IsLeader field to GenerationStatus
 | |
| 	status.LeaderID = sem.GetCurrentAdmin()
 | |
| 	
 | |
| 	return status, nil
 | |
| }
 | |
| 
 | |
| // RequestContextGeneration queues a context generation request
 | |
| func (sem *SLURPElectionManager) RequestContextGeneration(req *ContextGenerationRequest) error {
 | |
| 	sem.contextMu.RLock()
 | |
| 	manager := sem.contextManager
 | |
| 	isLeader := sem.isContextLeader
 | |
| 	sem.contextMu.RUnlock()
 | |
| 	
 | |
| 	if !isLeader {
 | |
| 		return fmt.Errorf("not context leader")
 | |
| 	}
 | |
| 	
 | |
| 	if manager == nil {
 | |
| 		return fmt.Errorf("no context manager registered")
 | |
| 	}
 | |
| 	
 | |
| 	return manager.RequestContextGeneration(req)
 | |
| }
 | |
| 
 | |
| // SetContextLeadershipCallbacks sets callbacks for context leadership changes
 | |
| func (sem *SLURPElectionManager) SetContextLeadershipCallbacks(callbacks *ContextLeadershipCallbacks) error {
 | |
| 	sem.contextMu.Lock()
 | |
| 	defer sem.contextMu.Unlock()
 | |
| 	
 | |
| 	sem.contextCallbacks = callbacks
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetContextClusterHealth returns health of context generation cluster
 | |
| func (sem *SLURPElectionManager) GetContextClusterHealth() (*ContextClusterHealth, error) {
 | |
| 	return sem.healthMonitor.GetClusterHealth(), nil
 | |
| }
 | |
| 
 | |
| // PrepareContextFailover prepares context state for leadership failover
 | |
| func (sem *SLURPElectionManager) PrepareContextFailover(ctx context.Context) (*ContextFailoverState, error) {
 | |
| 	if !sem.IsContextLeader() {
 | |
| 		return nil, fmt.Errorf("not context leader")
 | |
| 	}
 | |
| 	
 | |
| 	sem.contextMu.Lock()
 | |
| 	defer sem.contextMu.Unlock()
 | |
| 	
 | |
| 	log.Printf("📦 Preparing context failover state")
 | |
| 	
 | |
| 	state := &ContextFailoverState{
 | |
| 		LeaderID:     sem.nodeID,
 | |
| 		Term:         sem.contextTerm,
 | |
| 		TransferTime: time.Now(),
 | |
| 		StateVersion: time.Now().Unix(),
 | |
| 	}
 | |
| 	
 | |
| 	// Get current state from context manager
 | |
| 	if sem.contextManager != nil {
 | |
| 		// Get queued requests (if supported)
 | |
| 		// TODO: Add interface method to get queued requests
 | |
| 		state.QueuedRequests = []*ContextGenerationRequest{}
 | |
| 		
 | |
| 		// Get active jobs (if supported)
 | |
| 		// TODO: Add interface method to get active jobs
 | |
| 		state.ActiveJobs = make(map[string]*ContextGenerationJob)
 | |
| 		
 | |
| 		// Get manager configuration
 | |
| 		// TODO: Add interface method to get configuration
 | |
| 		state.ManagerConfig = DefaultManagerConfig()
 | |
| 	}
 | |
| 	
 | |
| 	// Get cluster health snapshot
 | |
| 	if health, err := sem.GetContextClusterHealth(); err == nil {
 | |
| 		state.HealthSnapshot = health
 | |
| 	}
 | |
| 	
 | |
| 	// Calculate checksum
 | |
| 	if data, err := json.Marshal(state); err == nil {
 | |
| 		hash := md5.Sum(data)
 | |
| 		state.Checksum = fmt.Sprintf("%x", hash)
 | |
| 	}
 | |
| 	
 | |
| 	sem.failoverState = state
 | |
| 	
 | |
| 	log.Printf("✅ Context failover state prepared (version: %d)", state.StateVersion)
 | |
| 	return state, nil
 | |
| }
 | |
| 
 | |
| // ExecuteContextFailover executes context leadership failover
 | |
| func (sem *SLURPElectionManager) ExecuteContextFailover(ctx context.Context, state *ContextFailoverState) error {
 | |
| 	if sem.IsContextLeader() {
 | |
| 		return fmt.Errorf("already context leader")
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("🔄 Executing context failover from state (version: %d)", state.StateVersion)
 | |
| 	
 | |
| 	// Validate state first
 | |
| 	validation, err := sem.ValidateContextState(state)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to validate failover state: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	if !validation.Valid {
 | |
| 		return fmt.Errorf("invalid failover state: %v", validation.Issues)
 | |
| 	}
 | |
| 	
 | |
| 	sem.contextMu.Lock()
 | |
| 	defer sem.contextMu.Unlock()
 | |
| 	
 | |
| 	// Restore context leadership state
 | |
| 	sem.isContextLeader = true
 | |
| 	sem.contextTerm = state.Term + 1 // Increment term
 | |
| 	now := time.Now()
 | |
| 	sem.contextStartedAt = &now
 | |
| 	
 | |
| 	// TODO: Restore queued requests to context manager
 | |
| 	// TODO: Restore active jobs to context manager
 | |
| 	// TODO: Apply manager configuration
 | |
| 	
 | |
| 	// Start background processes
 | |
| 	sem.contextWg.Add(2)
 | |
| 	go sem.runHealthMonitoring()
 | |
| 	go sem.runMetricsCollection()
 | |
| 	
 | |
| 	log.Printf("✅ Context failover executed successfully (new term: %d)", sem.contextTerm)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ValidateContextState validates context failover state
 | |
| func (sem *SLURPElectionManager) ValidateContextState(state *ContextFailoverState) (*ContextStateValidation, error) {
 | |
| 	if state == nil {
 | |
| 		return &ContextStateValidation{
 | |
| 			Valid:      false,
 | |
| 			Issues:     []string{"nil failover state"},
 | |
| 			ValidatedAt: time.Now(),
 | |
| 		}, nil
 | |
| 	}
 | |
| 	
 | |
| 	validation := &ContextStateValidation{
 | |
| 		ValidatedAt: time.Now(),
 | |
| 		ValidatedBy: sem.nodeID,
 | |
| 		Valid:       true,
 | |
| 	}
 | |
| 	
 | |
| 	// Check basic fields
 | |
| 	if state.LeaderID == "" {
 | |
| 		validation.Issues = append(validation.Issues, "missing leader ID")
 | |
| 		validation.Valid = false
 | |
| 	}
 | |
| 	
 | |
| 	if state.Term <= 0 {
 | |
| 		validation.Issues = append(validation.Issues, "invalid term")
 | |
| 		validation.Valid = false
 | |
| 	}
 | |
| 	
 | |
| 	if state.StateVersion <= 0 {
 | |
| 		validation.Issues = append(validation.Issues, "invalid state version")
 | |
| 		validation.Valid = false
 | |
| 	}
 | |
| 	
 | |
| 	// Validate checksum
 | |
| 	if state.Checksum != "" {
 | |
| 		tempState := *state
 | |
| 		tempState.Checksum = ""
 | |
| 		if data, err := json.Marshal(tempState); err == nil {
 | |
| 			hash := md5.Sum(data)
 | |
| 			expectedChecksum := fmt.Sprintf("%x", hash)
 | |
| 			validation.ChecksumValid = expectedChecksum == state.Checksum
 | |
| 			if !validation.ChecksumValid {
 | |
| 				validation.Issues = append(validation.Issues, "checksum validation failed")
 | |
| 				validation.Valid = false
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	// Validate timestamps
 | |
| 	if state.TransferTime.IsZero() {
 | |
| 		validation.Issues = append(validation.Issues, "missing transfer time")
 | |
| 		validation.TimestampValid = false
 | |
| 		validation.Valid = false
 | |
| 	} else {
 | |
| 		validation.TimestampValid = true
 | |
| 	}
 | |
| 	
 | |
| 	// Version consistency check
 | |
| 	validation.VersionConsistent = true // TODO: Implement actual version checking
 | |
| 	
 | |
| 	// Queue state validation
 | |
| 	validation.QueueStateValid = state.QueuedRequests != nil
 | |
| 	if !validation.QueueStateValid {
 | |
| 		validation.Issues = append(validation.Issues, "invalid queue state")
 | |
| 	}
 | |
| 	
 | |
| 	// Cluster state validation
 | |
| 	validation.ClusterStateValid = state.ClusterState != nil
 | |
| 	if !validation.ClusterStateValid {
 | |
| 		validation.Issues = append(validation.Issues, "missing cluster state")
 | |
| 	}
 | |
| 	
 | |
| 	// Config validation
 | |
| 	validation.ConfigValid = state.ManagerConfig != nil
 | |
| 	if !validation.ConfigValid {
 | |
| 		validation.Issues = append(validation.Issues, "missing manager configuration")
 | |
| 	}
 | |
| 	
 | |
| 	// Set recovery requirements
 | |
| 	if len(validation.Issues) > 0 {
 | |
| 		validation.RequiresRecovery = true
 | |
| 		validation.RecoverySteps = []string{
 | |
| 			"Review validation issues",
 | |
| 			"Perform partial state recovery",
 | |
| 			"Restart context generation with defaults",
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	validation.ValidationDuration = time.Since(validation.ValidatedAt)
 | |
| 	
 | |
| 	return validation, nil
 | |
| }
 | |
| 
 | |
| // setupSLURPCallbacks configures the base election manager with SLURP-aware callbacks
 | |
| func (sem *SLURPElectionManager) setupSLURPCallbacks() {
 | |
| 	sem.SetCallbacks(
 | |
| 		sem.onAdminChangedSLURP,
 | |
| 		sem.onElectionCompleteSLURP,
 | |
| 	)
 | |
| }
 | |
| 
 | |
| // onAdminChangedSLURP handles admin changes with SLURP context awareness
 | |
| func (sem *SLURPElectionManager) onAdminChangedSLURP(oldAdmin, newAdmin string) {
 | |
| 	log.Printf("🔄 Admin changed: %s -> %s (SLURP-aware)", oldAdmin, newAdmin)
 | |
| 	
 | |
| 	// If we lost leadership, stop context generation
 | |
| 	if oldAdmin == sem.nodeID && newAdmin != sem.nodeID {
 | |
| 		if err := sem.StopContextGeneration(context.Background()); err != nil {
 | |
| 			log.Printf("⚠️ Error stopping context generation: %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	// If we gained leadership, start context generation
 | |
| 	if newAdmin == sem.nodeID && oldAdmin != sem.nodeID {
 | |
| 		if sem.slurpConfig.AutoStartGeneration {
 | |
| 			go sem.startContextGenerationDelayed()
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	// Call context callbacks
 | |
| 	if sem.contextCallbacks != nil && sem.contextCallbacks.OnContextLeaderChanged != nil {
 | |
| 		sem.contextCallbacks.OnContextLeaderChanged(oldAdmin, newAdmin, sem.contextTerm)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // onElectionCompleteSLURP handles election completion with SLURP context awareness
 | |
| func (sem *SLURPElectionManager) onElectionCompleteSLURP(winner string) {
 | |
| 	log.Printf("🏆 Election complete: %s (SLURP-aware)", winner)
 | |
| 	
 | |
| 	// Update context term on election completion
 | |
| 	sem.contextMu.Lock()
 | |
| 	sem.contextTerm++
 | |
| 	sem.contextMu.Unlock()
 | |
| }
 | |
| 
 | |
| // startContextGenerationDelayed starts context generation after a delay
 | |
| func (sem *SLURPElectionManager) startContextGenerationDelayed() {
 | |
| 	time.Sleep(sem.slurpConfig.GenerationStartDelay)
 | |
| 	
 | |
| 	if err := sem.StartContextGeneration(context.Background()); err != nil {
 | |
| 		log.Printf("⚠️ Error starting context generation: %v", err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // runHealthMonitoring runs background health monitoring
 | |
| func (sem *SLURPElectionManager) runHealthMonitoring() {
 | |
| 	defer sem.contextWg.Done()
 | |
| 	
 | |
| 	ticker := time.NewTicker(sem.slurpConfig.ContextHealthCheckInterval)
 | |
| 	defer ticker.Stop()
 | |
| 	
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ticker.C:
 | |
| 			sem.performHealthCheck()
 | |
| 		case <-sem.contextShutdown:
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // runMetricsCollection runs background metrics collection
 | |
| func (sem *SLURPElectionManager) runMetricsCollection() {
 | |
| 	defer sem.contextWg.Done()
 | |
| 	
 | |
| 	ticker := time.NewTicker(30 * time.Second) // TODO: Make configurable
 | |
| 	defer ticker.Stop()
 | |
| 	
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ticker.C:
 | |
| 			sem.collectMetrics()
 | |
| 		case <-sem.contextShutdown:
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // performHealthCheck performs a context health check
 | |
| func (sem *SLURPElectionManager) performHealthCheck() {
 | |
| 	sem.contextMu.Lock()
 | |
| 	sem.lastHealthCheck = time.Now()
 | |
| 	sem.contextMu.Unlock()
 | |
| 	
 | |
| 	// TODO: Implement actual health checking logic
 | |
| 	if sem.contextManager != nil && sem.isContextLeader {
 | |
| 		if status, err := sem.contextManager.GetGenerationStatus(); err != nil {
 | |
| 			if sem.contextCallbacks != nil && sem.contextCallbacks.OnContextError != nil {
 | |
| 				sem.contextCallbacks.OnContextError(err, ErrorSeverityMedium)
 | |
| 			}
 | |
| 		} else {
 | |
| 			// Update health monitor with status
 | |
| 			sem.healthMonitor.UpdateGenerationStatus(status)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // collectMetrics collects context generation metrics
 | |
| func (sem *SLURPElectionManager) collectMetrics() {
 | |
| 	// TODO: Implement metrics collection
 | |
| 	sem.metricsCollector.CollectMetrics(sem)
 | |
| }
 | |
| 
 | |
| // Stop overrides the base Stop to include SLURP cleanup
 | |
| func (sem *SLURPElectionManager) Stop() {
 | |
| 	log.Printf("🛑 Stopping SLURP election manager")
 | |
| 	
 | |
| 	// Stop context generation first
 | |
| 	if err := sem.StopContextGeneration(context.Background()); err != nil {
 | |
| 		log.Printf("⚠️ Error stopping context generation: %v", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Stop base election manager
 | |
| 	sem.ElectionManager.Stop()
 | |
| 	
 | |
| 	log.Printf("✅ SLURP election manager stopped")
 | |
| }
 | |
| 
 | |
| // Placeholder types for health monitoring and metrics collection
 | |
| 
 | |
| // ContextHealthMonitor monitors the health of context generation cluster
 | |
| type ContextHealthMonitor struct {
 | |
| 	mu           sync.RWMutex
 | |
| 	lastHealth   *ContextClusterHealth
 | |
| 	lastUpdate   time.Time
 | |
| }
 | |
| 
 | |
| // NewContextHealthMonitor creates a new context health monitor
 | |
| func NewContextHealthMonitor() *ContextHealthMonitor {
 | |
| 	return &ContextHealthMonitor{
 | |
| 		lastUpdate: time.Now(),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // GetClusterHealth returns current cluster health
 | |
| func (chm *ContextHealthMonitor) GetClusterHealth() *ContextClusterHealth {
 | |
| 	chm.mu.RLock()
 | |
| 	defer chm.mu.RUnlock()
 | |
| 	
 | |
| 	if chm.lastHealth == nil {
 | |
| 		return &ContextClusterHealth{
 | |
| 			TotalNodes:         1,
 | |
| 			HealthyNodes:       1,
 | |
| 			GenerationActive:   false,
 | |
| 			OverallHealthScore: 1.0,
 | |
| 			LastElection:       time.Now(),
 | |
| 			NextHealthCheck:    time.Now().Add(30 * time.Second),
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	return chm.lastHealth
 | |
| }
 | |
| 
 | |
| // UpdateGenerationStatus updates health based on generation status
 | |
| func (chm *ContextHealthMonitor) UpdateGenerationStatus(status *GenerationStatus) {
 | |
| 	chm.mu.Lock()
 | |
| 	defer chm.mu.Unlock()
 | |
| 	
 | |
| 	// TODO: Implement health status update based on generation status
 | |
| 	chm.lastUpdate = time.Now()
 | |
| }
 | |
| 
 | |
// ContextMetricsCollector tracks when context operation metrics were last
// collected.
type ContextMetricsCollector struct {
	mu sync.RWMutex

	// lastCollection is the time of the most recent CollectMetrics call.
	lastCollection time.Time
}
 | |
| 
 | |
| // NewContextMetricsCollector creates a new context metrics collector
 | |
| func NewContextMetricsCollector() *ContextMetricsCollector {
 | |
| 	return &ContextMetricsCollector{}
 | |
| }
 | |
| 
 | |
| // CollectMetrics collects current metrics
 | |
| func (cmc *ContextMetricsCollector) CollectMetrics(manager *SLURPElectionManager) {
 | |
| 	cmc.mu.Lock()
 | |
| 	defer cmc.mu.Unlock()
 | |
| 	
 | |
| 	// TODO: Implement metrics collection
 | |
| 	cmc.lastCollection = time.Now()
 | |
| } |