This comprehensive refactoring addresses critical architectural issues: IMPORT CYCLE RESOLUTION: • pkg/crypto ↔ pkg/slurp/roles: Created pkg/security/access_levels.go • pkg/ucxl → pkg/dht: Created pkg/storage/interfaces.go • pkg/slurp/leader → pkg/election → pkg/slurp/storage: Moved types to pkg/election/interfaces.go MODULE PATH MIGRATION: • Changed from github.com/anthonyrawlins/bzzz to chorus.services/bzzz • Updated all import statements across 115+ files • Maintains compatibility while removing personal GitHub account dependency TYPE SYSTEM IMPROVEMENTS: • Resolved duplicate type declarations in crypto package • Added missing type definitions (RoleStatus, TimeRestrictions, KeyStatus, KeyRotationResult) • Proper interface segregation to prevent future cycles ARCHITECTURAL BENEFITS: • Build now progresses past structural issues to normal dependency resolution • Cleaner separation of concerns between packages • Eliminates circular dependencies that prevented compilation • Establishes foundation for scalable codebase growth 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
470 lines
14 KiB
Go
470 lines
14 KiB
Go
package leader
|
|
|
|
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"chorus.services/bzzz/pkg/config"
	"chorus.services/bzzz/pkg/dht"
	"chorus.services/bzzz/pkg/election"
	slurpContext "chorus.services/bzzz/pkg/slurp/context"
	"chorus.services/bzzz/pkg/slurp/intelligence"
	"chorus.services/bzzz/pkg/slurp/storage"
	"chorus.services/bzzz/pubsub"
	libp2p "github.com/libp2p/go-libp2p/core/host"
)
|
|
|
|
// SLURPLeaderSystem represents the complete SLURP leader system integration
|
|
type SLURPLeaderSystem struct {
|
|
// Core components
|
|
config *SLURPLeaderConfig
|
|
logger *ContextLogger
|
|
metricsCollector *MetricsCollector
|
|
|
|
// Election system
|
|
slurpElection *election.SLURPElectionManager
|
|
|
|
// Context management
|
|
contextManager *ElectionIntegratedContextManager
|
|
intelligenceEngine intelligence.IntelligenceEngine
|
|
contextStore storage.ContextStore
|
|
contextResolver slurpContext.ContextResolver
|
|
|
|
// Distributed components
|
|
dht dht.DHT
|
|
pubsub *pubsub.PubSub
|
|
host libp2p.Host
|
|
|
|
// Reliability components
|
|
failoverManager *FailoverManager
|
|
|
|
// System state
|
|
running bool
|
|
nodeID string
|
|
}
|
|
|
|
// NewSLURPLeaderSystem creates a new complete SLURP leader system
|
|
func NewSLURPLeaderSystem(ctx context.Context, configPath string) (*SLURPLeaderSystem, error) {
|
|
// Load configuration
|
|
config, err := LoadSLURPLeaderConfig(configPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load configuration: %w", err)
|
|
}
|
|
|
|
// Validate configuration
|
|
if err := config.Validate(); err != nil {
|
|
return nil, fmt.Errorf("invalid configuration: %w", err)
|
|
}
|
|
|
|
// Get effective configuration
|
|
effectiveConfig := config.GetEffectiveConfig()
|
|
nodeID := effectiveConfig.Core.NodeID
|
|
|
|
// Initialize logging
|
|
var logLevel LogLevel
|
|
switch effectiveConfig.Observability.LogLevel {
|
|
case "debug":
|
|
logLevel = LogLevelDebug
|
|
case "info":
|
|
logLevel = LogLevelInfo
|
|
case "warn":
|
|
logLevel = LogLevelWarn
|
|
case "error":
|
|
logLevel = LogLevelError
|
|
case "critical":
|
|
logLevel = LogLevelCritical
|
|
default:
|
|
logLevel = LogLevelInfo
|
|
}
|
|
|
|
logger := NewContextLogger(nodeID, "slurp-leader", logLevel)
|
|
|
|
// Add file output if configured
|
|
if effectiveConfig.Observability.LogFile != "" {
|
|
fileOutput, err := NewFileOutput(effectiveConfig.Observability.LogFile)
|
|
if err != nil {
|
|
logger.Warn("Failed to create file output: %v", err)
|
|
} else {
|
|
logger.AddOutput(fileOutput)
|
|
}
|
|
}
|
|
|
|
// Initialize metrics collector
|
|
metricsCollector := NewMetricsCollector()
|
|
|
|
system := &SLURPLeaderSystem{
|
|
config: effectiveConfig,
|
|
logger: logger,
|
|
metricsCollector: metricsCollector,
|
|
nodeID: nodeID,
|
|
}
|
|
|
|
logger.Info("SLURP Leader System initialized with node ID: %s", nodeID)
|
|
|
|
return system, nil
|
|
}
|
|
|
|
// Start starts the complete SLURP leader system
|
|
func (sys *SLURPLeaderSystem) Start(ctx context.Context) error {
|
|
if sys.running {
|
|
return fmt.Errorf("system already running")
|
|
}
|
|
|
|
sys.logger.Info("Starting SLURP Leader System")
|
|
|
|
// Initialize distributed components
|
|
if err := sys.initializeDistributedComponents(ctx); err != nil {
|
|
return fmt.Errorf("failed to initialize distributed components: %w", err)
|
|
}
|
|
|
|
// Initialize context components
|
|
if err := sys.initializeContextComponents(ctx); err != nil {
|
|
return fmt.Errorf("failed to initialize context components: %w", err)
|
|
}
|
|
|
|
// Initialize election system
|
|
if err := sys.initializeElectionSystem(ctx); err != nil {
|
|
return fmt.Errorf("failed to initialize election system: %w", err)
|
|
}
|
|
|
|
// Initialize reliability components
|
|
if err := sys.initializeReliabilityComponents(ctx); err != nil {
|
|
return fmt.Errorf("failed to initialize reliability components: %w", err)
|
|
}
|
|
|
|
// Start all components
|
|
if err := sys.startComponents(ctx); err != nil {
|
|
return fmt.Errorf("failed to start components: %w", err)
|
|
}
|
|
|
|
sys.running = true
|
|
sys.logger.Info("SLURP Leader System started successfully")
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops the complete SLURP leader system
|
|
func (sys *SLURPLeaderSystem) Stop(ctx context.Context) error {
|
|
if !sys.running {
|
|
return nil
|
|
}
|
|
|
|
sys.logger.Info("Stopping SLURP Leader System")
|
|
|
|
// Stop components in reverse order
|
|
if err := sys.stopComponents(ctx); err != nil {
|
|
sys.logger.Error("Error stopping components: %v", err)
|
|
}
|
|
|
|
sys.running = false
|
|
sys.logger.Info("SLURP Leader System stopped")
|
|
|
|
// Close logger
|
|
if err := sys.logger.Close(); err != nil {
|
|
log.Printf("Error closing logger: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetStatus returns current system status
|
|
func (sys *SLURPLeaderSystem) GetStatus() *SystemStatus {
|
|
status := &SystemStatus{
|
|
Running: sys.running,
|
|
NodeID: sys.nodeID,
|
|
Uptime: time.Since(sys.metricsCollector.startTime),
|
|
LastUpdate: time.Now(),
|
|
}
|
|
|
|
// Get election status
|
|
if sys.slurpElection != nil {
|
|
status.IsLeader = sys.slurpElection.IsCurrentAdmin()
|
|
status.IsContextLeader = sys.slurpElection.IsContextLeader()
|
|
status.CurrentLeader = sys.slurpElection.GetCurrentAdmin()
|
|
status.ElectionState = string(sys.slurpElection.GetElectionState())
|
|
}
|
|
|
|
// Get context generation status
|
|
if sys.contextManager != nil {
|
|
if genStatus, err := sys.contextManager.GetGenerationStatus(); err == nil {
|
|
status.ContextGeneration = genStatus
|
|
}
|
|
}
|
|
|
|
// Get health status
|
|
if sys.failoverManager != nil {
|
|
// TODO: Get health status from health monitor
|
|
status.HealthStatus = "healthy"
|
|
status.HealthScore = 1.0
|
|
}
|
|
|
|
// Get metrics
|
|
status.Metrics = sys.metricsCollector.GetMetrics()
|
|
|
|
return status
|
|
}
|
|
|
|
// RequestContextGeneration requests context generation for a file
|
|
func (sys *SLURPLeaderSystem) RequestContextGeneration(req *ContextGenerationRequest) (*ContextGenerationResult, error) {
|
|
if !sys.running {
|
|
return nil, fmt.Errorf("system not running")
|
|
}
|
|
|
|
if sys.contextManager == nil {
|
|
return nil, fmt.Errorf("context manager not initialized")
|
|
}
|
|
|
|
sys.logger.LogContextGeneration("request_received", req, nil, nil)
|
|
|
|
// Forward to context manager
|
|
return sys.contextManager.RequestFromLeader(req)
|
|
}
|
|
|
|
// GetClusterHealth returns cluster health information
|
|
func (sys *SLURPLeaderSystem) GetClusterHealth() (*ContextClusterHealth, error) {
|
|
if sys.slurpElection == nil {
|
|
return nil, fmt.Errorf("election system not initialized")
|
|
}
|
|
|
|
return sys.slurpElection.GetContextClusterHealth()
|
|
}
|
|
|
|
// TransferLeadership initiates leadership transfer to another node
|
|
func (sys *SLURPLeaderSystem) TransferLeadership(ctx context.Context, targetNodeID string) error {
|
|
if sys.slurpElection == nil {
|
|
return fmt.Errorf("election system not initialized")
|
|
}
|
|
|
|
sys.logger.LogLeadershipChange("transfer_initiated", sys.nodeID, targetNodeID, 0,
|
|
map[string]interface{}{"target": targetNodeID, "reason": "manual"})
|
|
|
|
return sys.slurpElection.TransferContextLeadership(ctx, targetNodeID)
|
|
}
|
|
|
|
// GetMetrics returns current system metrics
|
|
func (sys *SLURPLeaderSystem) GetMetrics() *ContextMetrics {
|
|
return sys.metricsCollector.GetMetrics()
|
|
}
|
|
|
|
// GetFailoverHistory returns failover event history
|
|
func (sys *SLURPLeaderSystem) GetFailoverHistory() ([]*FailoverEvent, error) {
|
|
if sys.failoverManager == nil {
|
|
return nil, fmt.Errorf("failover manager not initialized")
|
|
}
|
|
|
|
return sys.failoverManager.GetFailoverHistory()
|
|
}
|
|
|
|
// Private initialization methods
|
|
|
|
func (sys *SLURPLeaderSystem) initializeDistributedComponents(ctx context.Context) error {
|
|
sys.logger.Debug("Initializing distributed components")
|
|
|
|
// TODO: Initialize libp2p host
|
|
// TODO: Initialize DHT
|
|
// TODO: Initialize pubsub
|
|
|
|
return nil
|
|
}
|
|
|
|
func (sys *SLURPLeaderSystem) initializeContextComponents(ctx context.Context) error {
|
|
sys.logger.Debug("Initializing context components")
|
|
|
|
// TODO: Initialize intelligence engine
|
|
// TODO: Initialize context store
|
|
// TODO: Initialize context resolver
|
|
|
|
return nil
|
|
}
|
|
|
|
func (sys *SLURPLeaderSystem) initializeElectionSystem(ctx context.Context) error {
|
|
sys.logger.Debug("Initializing election system")
|
|
|
|
// Convert to base BZZZ config
|
|
bzzzConfig := sys.config.ToBaseBZZZConfig()
|
|
|
|
// Create SLURP election configuration
|
|
slurpElectionConfig := &election.SLURPElectionConfig{
|
|
EnableContextLeadership: sys.config.Core.ProjectManagerEnabled,
|
|
ContextLeadershipWeight: sys.config.Election.ContextLeadershipWeight,
|
|
RequireContextCapability: sys.config.Election.RequireContextCapability,
|
|
AutoStartGeneration: sys.config.Election.AutoStartGeneration,
|
|
GenerationStartDelay: sys.config.Election.GenerationStartDelay,
|
|
GenerationStopTimeout: sys.config.Election.GenerationStopTimeout,
|
|
ContextFailoverTimeout: sys.config.Failover.StateTransferTimeout,
|
|
StateTransferTimeout: sys.config.Failover.StateTransferTimeout,
|
|
ValidationTimeout: sys.config.Failover.ValidationTimeout,
|
|
RequireStateValidation: sys.config.Failover.RequireStateValidation,
|
|
ContextHealthCheckInterval: sys.config.Health.HealthCheckInterval,
|
|
ClusterHealthThreshold: sys.config.Health.HealthyThreshold,
|
|
LeaderHealthThreshold: sys.config.Health.HealthyThreshold,
|
|
MaxQueueTransferSize: sys.config.Failover.MaxJobsToTransfer,
|
|
QueueDrainTimeout: sys.config.ContextManagement.QueueDrainTimeout,
|
|
PreserveCompletedJobs: sys.config.Failover.PreserveCompletedJobs,
|
|
CoordinationTimeout: sys.config.ContextManagement.ProcessingTimeout,
|
|
MaxCoordinationRetries: sys.config.ContextManagement.RetryAttempts,
|
|
CoordinationBackoff: sys.config.ContextManagement.RetryBackoff,
|
|
}
|
|
|
|
// Create SLURP election manager
|
|
sys.slurpElection = election.NewSLURPElectionManager(
|
|
ctx,
|
|
bzzzConfig,
|
|
sys.host,
|
|
sys.pubsub,
|
|
sys.nodeID,
|
|
slurpElectionConfig,
|
|
)
|
|
|
|
// Create election-integrated context manager
|
|
var err error
|
|
sys.contextManager, err = NewElectionIntegratedContextManager(
|
|
sys.slurpElection,
|
|
sys.dht,
|
|
sys.intelligenceEngine,
|
|
sys.contextStore,
|
|
sys.contextResolver,
|
|
nil, // Use default integration config
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create election-integrated context manager: %w", err)
|
|
}
|
|
|
|
sys.logger.Info("Election system initialized")
|
|
return nil
|
|
}
|
|
|
|
func (sys *SLURPLeaderSystem) initializeReliabilityComponents(ctx context.Context) error {
|
|
sys.logger.Debug("Initializing reliability components")
|
|
|
|
// Get base context manager from integrated manager
|
|
baseManager := sys.contextManager.LeaderContextManager
|
|
|
|
// Create failover manager
|
|
sys.failoverManager = NewFailoverManager(baseManager, sys.logger, sys.metricsCollector)
|
|
|
|
sys.logger.Info("Reliability components initialized")
|
|
return nil
|
|
}
|
|
|
|
func (sys *SLURPLeaderSystem) startComponents(ctx context.Context) error {
|
|
sys.logger.Debug("Starting all components")
|
|
|
|
// Start election system
|
|
if err := sys.slurpElection.Start(); err != nil {
|
|
return fmt.Errorf("failed to start election system: %w", err)
|
|
}
|
|
|
|
sys.logger.Info("All components started")
|
|
return nil
|
|
}
|
|
|
|
func (sys *SLURPLeaderSystem) stopComponents(ctx context.Context) error {
|
|
sys.logger.Debug("Stopping all components")
|
|
|
|
// Stop context manager
|
|
if sys.contextManager != nil {
|
|
sys.contextManager.Stop()
|
|
}
|
|
|
|
// Stop election system
|
|
if sys.slurpElection != nil {
|
|
sys.slurpElection.Stop()
|
|
}
|
|
|
|
sys.logger.Info("All components stopped")
|
|
return nil
|
|
}
|
|
|
|
// SystemStatus represents current system status
|
|
type SystemStatus struct {
|
|
// Basic status
|
|
Running bool `json:"running"`
|
|
NodeID string `json:"node_id"`
|
|
Uptime time.Duration `json:"uptime"`
|
|
LastUpdate time.Time `json:"last_update"`
|
|
|
|
// Leadership status
|
|
IsLeader bool `json:"is_leader"`
|
|
IsContextLeader bool `json:"is_context_leader"`
|
|
CurrentLeader string `json:"current_leader"`
|
|
ElectionState string `json:"election_state"`
|
|
|
|
// Context generation status
|
|
ContextGeneration *GenerationStatus `json:"context_generation,omitempty"`
|
|
|
|
// Health status
|
|
HealthStatus string `json:"health_status"`
|
|
HealthScore float64 `json:"health_score"`
|
|
|
|
// Performance metrics
|
|
Metrics *ContextMetrics `json:"metrics,omitempty"`
|
|
}
|
|
|
|
// Example usage function
|
|
func ExampleSLURPLeaderUsage() {
|
|
ctx := context.Background()
|
|
|
|
// Create and start SLURP leader system
|
|
system, err := NewSLURPLeaderSystem(ctx, "config.yaml")
|
|
if err != nil {
|
|
log.Fatalf("Failed to create SLURP leader system: %v", err)
|
|
}
|
|
|
|
// Start the system
|
|
if err := system.Start(ctx); err != nil {
|
|
log.Fatalf("Failed to start SLURP leader system: %v", err)
|
|
}
|
|
|
|
// Defer cleanup
|
|
defer func() {
|
|
if err := system.Stop(ctx); err != nil {
|
|
log.Printf("Error stopping system: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Wait for leadership
|
|
if err := system.contextManager.WaitForLeadership(ctx); err != nil {
|
|
log.Printf("Failed to gain leadership: %v", err)
|
|
return
|
|
}
|
|
|
|
log.Printf("🎯 Became context leader!")
|
|
|
|
// Request context generation
|
|
req := &ContextGenerationRequest{
|
|
ID: "example-request-1",
|
|
UCXLAddress: "ucxl://example.com/path/to/file",
|
|
FilePath: "/path/to/file.go",
|
|
Role: "developer",
|
|
Priority: PriorityNormal,
|
|
RequestedBy: "example-user",
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
result, err := system.RequestContextGeneration(req)
|
|
if err != nil {
|
|
log.Printf("Failed to request context generation: %v", err)
|
|
return
|
|
}
|
|
|
|
log.Printf("✅ Context generation result: %+v", result)
|
|
|
|
// Get system status
|
|
status := system.GetStatus()
|
|
log.Printf("📊 System status: Leader=%t, ContextLeader=%t, Health=%s",
|
|
status.IsLeader, status.IsContextLeader, status.HealthStatus)
|
|
|
|
// Get metrics
|
|
metrics := system.GetMetrics()
|
|
log.Printf("📈 Metrics: Requests=%d, Success Rate=%.2f%%, Throughput=%.2f req/s",
|
|
metrics.TotalRequests, metrics.SuccessRate*100, metrics.Throughput)
|
|
|
|
// Keep running until interrupted
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Printf("Context cancelled, shutting down")
|
|
}
|
|
} |