470 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			470 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package leader
 | |
| 
 | |
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"chorus/pkg/config"
	"chorus/pkg/dht"
	"chorus/pkg/election"
	slurpContext "chorus/pkg/slurp/context"
	"chorus/pkg/slurp/intelligence"
	"chorus/pkg/slurp/storage"
	"chorus/pubsub"
	libp2p "github.com/libp2p/go-libp2p/core/host"
)
 | |
| 
 | |
| // SLURPLeaderSystem represents the complete SLURP leader system integration
 | |
| type SLURPLeaderSystem struct {
 | |
| 	// Core components
 | |
| 	config             *SLURPLeaderConfig
 | |
| 	logger             *ContextLogger
 | |
| 	metricsCollector   *MetricsCollector
 | |
| 	
 | |
| 	// Election system
 | |
| 	slurpElection      *election.SLURPElectionManager
 | |
| 	
 | |
| 	// Context management
 | |
| 	contextManager     *ElectionIntegratedContextManager
 | |
| 	intelligenceEngine intelligence.IntelligenceEngine
 | |
| 	contextStore       storage.ContextStore
 | |
| 	contextResolver    slurpContext.ContextResolver
 | |
| 	
 | |
| 	// Distributed components
 | |
| 	dht                dht.DHT
 | |
| 	pubsub             *pubsub.PubSub
 | |
| 	host               libp2p.Host
 | |
| 	
 | |
| 	// Reliability components
 | |
| 	failoverManager    *FailoverManager
 | |
| 	
 | |
| 	// System state
 | |
| 	running            bool
 | |
| 	nodeID             string
 | |
| }
 | |
| 
 | |
| // NewSLURPLeaderSystem creates a new complete SLURP leader system
 | |
| func NewSLURPLeaderSystem(ctx context.Context, configPath string) (*SLURPLeaderSystem, error) {
 | |
| 	// Load configuration
 | |
| 	config, err := LoadSLURPLeaderConfig(configPath)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to load configuration: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Validate configuration
 | |
| 	if err := config.Validate(); err != nil {
 | |
| 		return nil, fmt.Errorf("invalid configuration: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Get effective configuration
 | |
| 	effectiveConfig := config.GetEffectiveConfig()
 | |
| 	nodeID := effectiveConfig.Core.NodeID
 | |
| 	
 | |
| 	// Initialize logging
 | |
| 	var logLevel LogLevel
 | |
| 	switch effectiveConfig.Observability.LogLevel {
 | |
| 	case "debug":
 | |
| 		logLevel = LogLevelDebug
 | |
| 	case "info":
 | |
| 		logLevel = LogLevelInfo
 | |
| 	case "warn":
 | |
| 		logLevel = LogLevelWarn
 | |
| 	case "error":
 | |
| 		logLevel = LogLevelError
 | |
| 	case "critical":
 | |
| 		logLevel = LogLevelCritical
 | |
| 	default:
 | |
| 		logLevel = LogLevelInfo
 | |
| 	}
 | |
| 	
 | |
| 	logger := NewContextLogger(nodeID, "slurp-leader", logLevel)
 | |
| 	
 | |
| 	// Add file output if configured
 | |
| 	if effectiveConfig.Observability.LogFile != "" {
 | |
| 		fileOutput, err := NewFileOutput(effectiveConfig.Observability.LogFile)
 | |
| 		if err != nil {
 | |
| 			logger.Warn("Failed to create file output: %v", err)
 | |
| 		} else {
 | |
| 			logger.AddOutput(fileOutput)
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	// Initialize metrics collector
 | |
| 	metricsCollector := NewMetricsCollector()
 | |
| 	
 | |
| 	system := &SLURPLeaderSystem{
 | |
| 		config:           effectiveConfig,
 | |
| 		logger:           logger,
 | |
| 		metricsCollector: metricsCollector,
 | |
| 		nodeID:           nodeID,
 | |
| 	}
 | |
| 	
 | |
| 	logger.Info("SLURP Leader System initialized with node ID: %s", nodeID)
 | |
| 	
 | |
| 	return system, nil
 | |
| }
 | |
| 
 | |
| // Start starts the complete SLURP leader system
 | |
| func (sys *SLURPLeaderSystem) Start(ctx context.Context) error {
 | |
| 	if sys.running {
 | |
| 		return fmt.Errorf("system already running")
 | |
| 	}
 | |
| 	
 | |
| 	sys.logger.Info("Starting SLURP Leader System")
 | |
| 	
 | |
| 	// Initialize distributed components
 | |
| 	if err := sys.initializeDistributedComponents(ctx); err != nil {
 | |
| 		return fmt.Errorf("failed to initialize distributed components: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Initialize context components
 | |
| 	if err := sys.initializeContextComponents(ctx); err != nil {
 | |
| 		return fmt.Errorf("failed to initialize context components: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Initialize election system
 | |
| 	if err := sys.initializeElectionSystem(ctx); err != nil {
 | |
| 		return fmt.Errorf("failed to initialize election system: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Initialize reliability components
 | |
| 	if err := sys.initializeReliabilityComponents(ctx); err != nil {
 | |
| 		return fmt.Errorf("failed to initialize reliability components: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Start all components
 | |
| 	if err := sys.startComponents(ctx); err != nil {
 | |
| 		return fmt.Errorf("failed to start components: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	sys.running = true
 | |
| 	sys.logger.Info("SLURP Leader System started successfully")
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Stop stops the complete SLURP leader system
 | |
| func (sys *SLURPLeaderSystem) Stop(ctx context.Context) error {
 | |
| 	if !sys.running {
 | |
| 		return nil
 | |
| 	}
 | |
| 	
 | |
| 	sys.logger.Info("Stopping SLURP Leader System")
 | |
| 	
 | |
| 	// Stop components in reverse order
 | |
| 	if err := sys.stopComponents(ctx); err != nil {
 | |
| 		sys.logger.Error("Error stopping components: %v", err)
 | |
| 	}
 | |
| 	
 | |
| 	sys.running = false
 | |
| 	sys.logger.Info("SLURP Leader System stopped")
 | |
| 	
 | |
| 	// Close logger
 | |
| 	if err := sys.logger.Close(); err != nil {
 | |
| 		log.Printf("Error closing logger: %v", err)
 | |
| 	}
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetStatus returns current system status
 | |
| func (sys *SLURPLeaderSystem) GetStatus() *SystemStatus {
 | |
| 	status := &SystemStatus{
 | |
| 		Running:          sys.running,
 | |
| 		NodeID:           sys.nodeID,
 | |
| 		Uptime:           time.Since(sys.metricsCollector.startTime),
 | |
| 		LastUpdate:       time.Now(),
 | |
| 	}
 | |
| 	
 | |
| 	// Get election status
 | |
| 	if sys.slurpElection != nil {
 | |
| 		status.IsLeader = sys.slurpElection.IsCurrentAdmin()
 | |
| 		status.IsContextLeader = sys.slurpElection.IsContextLeader()
 | |
| 		status.CurrentLeader = sys.slurpElection.GetCurrentAdmin()
 | |
| 		status.ElectionState = string(sys.slurpElection.GetElectionState())
 | |
| 	}
 | |
| 	
 | |
| 	// Get context generation status
 | |
| 	if sys.contextManager != nil {
 | |
| 		if genStatus, err := sys.contextManager.GetGenerationStatus(); err == nil {
 | |
| 			status.ContextGeneration = genStatus
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	// Get health status
 | |
| 	if sys.failoverManager != nil {
 | |
| 		// TODO: Get health status from health monitor
 | |
| 		status.HealthStatus = "healthy"
 | |
| 		status.HealthScore = 1.0
 | |
| 	}
 | |
| 	
 | |
| 	// Get metrics
 | |
| 	status.Metrics = sys.metricsCollector.GetMetrics()
 | |
| 	
 | |
| 	return status
 | |
| }
 | |
| 
 | |
| // RequestContextGeneration requests context generation for a file
 | |
| func (sys *SLURPLeaderSystem) RequestContextGeneration(req *ContextGenerationRequest) (*ContextGenerationResult, error) {
 | |
| 	if !sys.running {
 | |
| 		return nil, fmt.Errorf("system not running")
 | |
| 	}
 | |
| 	
 | |
| 	if sys.contextManager == nil {
 | |
| 		return nil, fmt.Errorf("context manager not initialized")
 | |
| 	}
 | |
| 	
 | |
| 	sys.logger.LogContextGeneration("request_received", req, nil, nil)
 | |
| 	
 | |
| 	// Forward to context manager
 | |
| 	return sys.contextManager.RequestFromLeader(req)
 | |
| }
 | |
| 
 | |
| // GetClusterHealth returns cluster health information
 | |
| func (sys *SLURPLeaderSystem) GetClusterHealth() (*ContextClusterHealth, error) {
 | |
| 	if sys.slurpElection == nil {
 | |
| 		return nil, fmt.Errorf("election system not initialized")
 | |
| 	}
 | |
| 	
 | |
| 	return sys.slurpElection.GetContextClusterHealth()
 | |
| }
 | |
| 
 | |
| // TransferLeadership initiates leadership transfer to another node
 | |
| func (sys *SLURPLeaderSystem) TransferLeadership(ctx context.Context, targetNodeID string) error {
 | |
| 	if sys.slurpElection == nil {
 | |
| 		return fmt.Errorf("election system not initialized")
 | |
| 	}
 | |
| 	
 | |
| 	sys.logger.LogLeadershipChange("transfer_initiated", sys.nodeID, targetNodeID, 0, 
 | |
| 		map[string]interface{}{"target": targetNodeID, "reason": "manual"})
 | |
| 	
 | |
| 	return sys.slurpElection.TransferContextLeadership(ctx, targetNodeID)
 | |
| }
 | |
| 
 | |
| // GetMetrics returns current system metrics
 | |
| func (sys *SLURPLeaderSystem) GetMetrics() *ContextMetrics {
 | |
| 	return sys.metricsCollector.GetMetrics()
 | |
| }
 | |
| 
 | |
| // GetFailoverHistory returns failover event history
 | |
| func (sys *SLURPLeaderSystem) GetFailoverHistory() ([]*FailoverEvent, error) {
 | |
| 	if sys.failoverManager == nil {
 | |
| 		return nil, fmt.Errorf("failover manager not initialized")
 | |
| 	}
 | |
| 	
 | |
| 	return sys.failoverManager.GetFailoverHistory()
 | |
| }
 | |
| 
 | |
| // Private initialization methods
 | |
| 
 | |
| func (sys *SLURPLeaderSystem) initializeDistributedComponents(ctx context.Context) error {
 | |
| 	sys.logger.Debug("Initializing distributed components")
 | |
| 	
 | |
| 	// TODO: Initialize libp2p host
 | |
| 	// TODO: Initialize DHT
 | |
| 	// TODO: Initialize pubsub
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (sys *SLURPLeaderSystem) initializeContextComponents(ctx context.Context) error {
 | |
| 	sys.logger.Debug("Initializing context components")
 | |
| 	
 | |
| 	// TODO: Initialize intelligence engine
 | |
| 	// TODO: Initialize context store
 | |
| 	// TODO: Initialize context resolver
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (sys *SLURPLeaderSystem) initializeElectionSystem(ctx context.Context) error {
 | |
| 	sys.logger.Debug("Initializing election system")
 | |
| 	
 | |
| 	// Convert to base CHORUS config
 | |
| 	chorusConfig := sys.config.ToBaseCHORUSConfig()
 | |
| 	
 | |
| 	// Create SLURP election configuration
 | |
| 	slurpElectionConfig := &election.SLURPElectionConfig{
 | |
| 		EnableContextLeadership:     sys.config.Core.ProjectManagerEnabled,
 | |
| 		ContextLeadershipWeight:     sys.config.Election.ContextLeadershipWeight,
 | |
| 		RequireContextCapability:    sys.config.Election.RequireContextCapability,
 | |
| 		AutoStartGeneration:         sys.config.Election.AutoStartGeneration,
 | |
| 		GenerationStartDelay:        sys.config.Election.GenerationStartDelay,
 | |
| 		GenerationStopTimeout:       sys.config.Election.GenerationStopTimeout,
 | |
| 		ContextFailoverTimeout:      sys.config.Failover.StateTransferTimeout,
 | |
| 		StateTransferTimeout:        sys.config.Failover.StateTransferTimeout,
 | |
| 		ValidationTimeout:           sys.config.Failover.ValidationTimeout,
 | |
| 		RequireStateValidation:      sys.config.Failover.RequireStateValidation,
 | |
| 		ContextHealthCheckInterval:  sys.config.Health.HealthCheckInterval,
 | |
| 		ClusterHealthThreshold:      sys.config.Health.HealthyThreshold,
 | |
| 		LeaderHealthThreshold:       sys.config.Health.HealthyThreshold,
 | |
| 		MaxQueueTransferSize:        sys.config.Failover.MaxJobsToTransfer,
 | |
| 		QueueDrainTimeout:           sys.config.ContextManagement.QueueDrainTimeout,
 | |
| 		PreserveCompletedJobs:       sys.config.Failover.PreserveCompletedJobs,
 | |
| 		CoordinationTimeout:         sys.config.ContextManagement.ProcessingTimeout,
 | |
| 		MaxCoordinationRetries:      sys.config.ContextManagement.RetryAttempts,
 | |
| 		CoordinationBackoff:         sys.config.ContextManagement.RetryBackoff,
 | |
| 	}
 | |
| 	
 | |
| 	// Create SLURP election manager
 | |
| 	sys.slurpElection = election.NewSLURPElectionManager(
 | |
| 		ctx,
 | |
| 		chorusConfig,
 | |
| 		sys.host,
 | |
| 		sys.pubsub,
 | |
| 		sys.nodeID,
 | |
| 		slurpElectionConfig,
 | |
| 	)
 | |
| 	
 | |
| 	// Create election-integrated context manager
 | |
| 	var err error
 | |
| 	sys.contextManager, err = NewElectionIntegratedContextManager(
 | |
| 		sys.slurpElection,
 | |
| 		sys.dht,
 | |
| 		sys.intelligenceEngine,
 | |
| 		sys.contextStore,
 | |
| 		sys.contextResolver,
 | |
| 		nil, // Use default integration config
 | |
| 	)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to create election-integrated context manager: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	sys.logger.Info("Election system initialized")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (sys *SLURPLeaderSystem) initializeReliabilityComponents(ctx context.Context) error {
 | |
| 	sys.logger.Debug("Initializing reliability components")
 | |
| 	
 | |
| 	// Get base context manager from integrated manager
 | |
| 	baseManager := sys.contextManager.LeaderContextManager
 | |
| 	
 | |
| 	// Create failover manager
 | |
| 	sys.failoverManager = NewFailoverManager(baseManager, sys.logger, sys.metricsCollector)
 | |
| 	
 | |
| 	sys.logger.Info("Reliability components initialized")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (sys *SLURPLeaderSystem) startComponents(ctx context.Context) error {
 | |
| 	sys.logger.Debug("Starting all components")
 | |
| 	
 | |
| 	// Start election system
 | |
| 	if err := sys.slurpElection.Start(); err != nil {
 | |
| 		return fmt.Errorf("failed to start election system: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	sys.logger.Info("All components started")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (sys *SLURPLeaderSystem) stopComponents(ctx context.Context) error {
 | |
| 	sys.logger.Debug("Stopping all components")
 | |
| 	
 | |
| 	// Stop context manager
 | |
| 	if sys.contextManager != nil {
 | |
| 		sys.contextManager.Stop()
 | |
| 	}
 | |
| 	
 | |
| 	// Stop election system
 | |
| 	if sys.slurpElection != nil {
 | |
| 		sys.slurpElection.Stop()
 | |
| 	}
 | |
| 	
 | |
| 	sys.logger.Info("All components stopped")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // SystemStatus represents current system status
 | |
| type SystemStatus struct {
 | |
| 	// Basic status
 | |
| 	Running             bool                   `json:"running"`
 | |
| 	NodeID              string                 `json:"node_id"`
 | |
| 	Uptime              time.Duration          `json:"uptime"`
 | |
| 	LastUpdate          time.Time              `json:"last_update"`
 | |
| 	
 | |
| 	// Leadership status
 | |
| 	IsLeader            bool                   `json:"is_leader"`
 | |
| 	IsContextLeader     bool                   `json:"is_context_leader"`
 | |
| 	CurrentLeader       string                 `json:"current_leader"`
 | |
| 	ElectionState       string                 `json:"election_state"`
 | |
| 	
 | |
| 	// Context generation status
 | |
| 	ContextGeneration   *GenerationStatus      `json:"context_generation,omitempty"`
 | |
| 	
 | |
| 	// Health status
 | |
| 	HealthStatus        string                 `json:"health_status"`
 | |
| 	HealthScore         float64                `json:"health_score"`
 | |
| 	
 | |
| 	// Performance metrics
 | |
| 	Metrics             *ContextMetrics        `json:"metrics,omitempty"`
 | |
| }
 | |
| 
 | |
| // Example usage function
 | |
| func ExampleSLURPLeaderUsage() {
 | |
| 	ctx := context.Background()
 | |
| 	
 | |
| 	// Create and start SLURP leader system
 | |
| 	system, err := NewSLURPLeaderSystem(ctx, "config.yaml")
 | |
| 	if err != nil {
 | |
| 		log.Fatalf("Failed to create SLURP leader system: %v", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Start the system
 | |
| 	if err := system.Start(ctx); err != nil {
 | |
| 		log.Fatalf("Failed to start SLURP leader system: %v", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Defer cleanup
 | |
| 	defer func() {
 | |
| 		if err := system.Stop(ctx); err != nil {
 | |
| 			log.Printf("Error stopping system: %v", err)
 | |
| 		}
 | |
| 	}()
 | |
| 	
 | |
| 	// Wait for leadership
 | |
| 	if err := system.contextManager.WaitForLeadership(ctx); err != nil {
 | |
| 		log.Printf("Failed to gain leadership: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("🎯 Became context leader!")
 | |
| 	
 | |
| 	// Request context generation
 | |
| 	req := &ContextGenerationRequest{
 | |
| 		ID:           "example-request-1",
 | |
| 		UCXLAddress:  "ucxl://example.com/path/to/file",
 | |
| 		FilePath:     "/path/to/file.go",
 | |
| 		Role:         "developer",
 | |
| 		Priority:     PriorityNormal,
 | |
| 		RequestedBy:  "example-user",
 | |
| 		CreatedAt:    time.Now(),
 | |
| 	}
 | |
| 	
 | |
| 	result, err := system.RequestContextGeneration(req)
 | |
| 	if err != nil {
 | |
| 		log.Printf("Failed to request context generation: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("✅ Context generation result: %+v", result)
 | |
| 	
 | |
| 	// Get system status
 | |
| 	status := system.GetStatus()
 | |
| 	log.Printf("📊 System status: Leader=%t, ContextLeader=%t, Health=%s", 
 | |
| 		status.IsLeader, status.IsContextLeader, status.HealthStatus)
 | |
| 	
 | |
| 	// Get metrics
 | |
| 	metrics := system.GetMetrics()
 | |
| 	log.Printf("📈 Metrics: Requests=%d, Success Rate=%.2f%%, Throughput=%.2f req/s",
 | |
| 		metrics.TotalRequests, metrics.SuccessRate*100, metrics.Throughput)
 | |
| 	
 | |
| 	// Keep running until interrupted
 | |
| 	select {
 | |
| 	case <-ctx.Done():
 | |
| 		log.Printf("Context cancelled, shutting down")
 | |
| 	}
 | |
| } | 
