// Package distribution provides replication management for distributed contexts
package distribution

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/anthonyrawlins/bzzz/pkg/config"
	"github.com/anthonyrawlins/bzzz/pkg/dht"
	"github.com/anthonyrawlins/bzzz/pkg/ucxl"
	"github.com/libp2p/go-libp2p/core/peer"
)

// ReplicationManagerImpl implements the ReplicationManager interface.
type ReplicationManagerImpl struct {
	mu             sync.RWMutex
	dht            *dht.DHT
	config         *config.Config
	replicationMap map[string]*ReplicationStatus
	repairQueue    chan *RepairRequest
	rebalanceQueue chan *RebalanceRequest
	consistentHash ConsistentHashing
	policy         *ReplicationPolicy
	stats          *ReplicationStatistics
	running        bool
}

// RepairRequest represents a request to repair replicas for an address.
type RepairRequest struct {
	Address     ucxl.Address
	RequestedBy string
	Priority    Priority
	RequestTime time.Time
}

// RebalanceRequest represents a request to rebalance replicas across nodes.
type RebalanceRequest struct {
	Reason      string
	RequestedBy string
	RequestTime time.Time
}

// NewReplicationManagerImpl creates a new replication manager implementation.
func NewReplicationManagerImpl(dhtInstance *dht.DHT, cfg *config.Config) (*ReplicationManagerImpl, error) {
	if dhtInstance == nil {
		return nil, fmt.Errorf("DHT instance is required")
	}
	if cfg == nil {
		return nil, fmt.Errorf("config is required")
	}

	rm := &ReplicationManagerImpl{
		dht:            dhtInstance,
		config:         cfg,
		replicationMap: make(map[string]*ReplicationStatus),
		repairQueue:    make(chan *RepairRequest, 1000),
		rebalanceQueue: make(chan *RebalanceRequest, 100),
		policy: &ReplicationPolicy{
			DefaultFactor:     3,
			MinFactor:         2,
			MaxFactor:         7,
			PreferredZones:    []string{"zone-a", "zone-b", "zone-c"},
			AvoidSameNode:     true,
			ConsistencyLevel:  ConsistencyEventual,
			RepairThreshold:   0.8,
			RebalanceInterval: 6 * time.Hour,
		},
		stats: &ReplicationStatistics{
			LastUpdated: time.Now(),
		},
	}

	// Initialize consistent hashing.
	consistentHash, err := NewConsistentHashingImpl()
	if err != nil {
		return nil, fmt.Errorf("failed to create consistent hashing: %w", err)
	}
	rm.consistentHash = consistentHash

	// Add known peers to the consistent hash ring.
	for _, peerID := range dhtInstance.GetConnectedPeers() {
		rm.consistentHash.AddNode(peerID.String())
	}

	return rm, nil
}

// Start launches the background repair, rebalance, and health-check workers.
func (rm *ReplicationManagerImpl) Start(ctx context.Context) error {
	rm.mu.Lock()
	if rm.running {
		rm.mu.Unlock()
		return fmt.Errorf("replication manager already running")
	}
	rm.running = true
	rm.mu.Unlock()

	// Start background workers.
	go rm.repairWorker(ctx)
	go rm.rebalanceWorker(ctx)
	go rm.healthChecker(ctx)

	return nil
}

// Stop stops the replication manager. It is safe to call only once after
// Start; closing the queues signals the workers to exit.
func (rm *ReplicationManagerImpl) Stop() error {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	if !rm.running {
		return nil // already stopped (or never started); avoid double close
	}
	rm.running = false
	close(rm.repairQueue)
	close(rm.rebalanceQueue)

	return nil
}
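// Typical wiring (an illustrative sketch, assuming a connected *dht.DHT and a
// loaded *config.Config are already available; error handling abbreviated):
//
//	rm, err := NewReplicationManagerImpl(dhtInstance, cfg)
//	if err != nil {
//		log.Fatal(err)
//	}
//	ctx, cancel := context.WithCancel(context.Background())
//	if err := rm.Start(ctx); err != nil {
//		log.Fatal(err)
//	}
//	defer func() {
//		cancel()
//		rm.Stop()
//	}()
//	// Ask for three replicas of a context address.
//	if err := rm.EnsureReplication(ctx, addr, 3); err != nil {
//		log.Printf("replication failed: %v", err)
//	}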
// EnsureReplication ensures a context meets its replication requirements.
func (rm *ReplicationManagerImpl) EnsureReplication(ctx context.Context, address ucxl.Address, factor int) error {
	// Clamp the requested factor to the policy bounds.
	if factor < rm.policy.MinFactor {
		factor = rm.policy.MinFactor
	}
	if factor > rm.policy.MaxFactor {
		factor = rm.policy.MaxFactor
	}

	// Get the current replication status.
	status, err := rm.GetReplicationStatus(ctx, address)
	if err != nil {
		return fmt.Errorf("failed to get replication status: %w", err)
	}

	if status.TotalReplicas >= factor {
		return nil // already sufficiently replicated
	}

	// Calculate how many more replicas we need.
	needed := factor - status.TotalReplicas

	// Select target nodes for the additional replicas.
	targetNodes, err := rm.selectReplicationNodes(address, needed)
	if err != nil {
		return fmt.Errorf("failed to select replication nodes: %w", err)
	}

	// Create replicas on the target nodes.
	created := 0
	for _, nodeID := range targetNodes {
		if err := rm.createReplica(ctx, address, nodeID); err != nil {
			// Creation failed on this node; try the remaining candidates.
			continue
		}
		created++
	}

	// Record only the replicas that were actually created.
	rm.updateReplicationStatus(address, status.TotalReplicas+created)

	return nil
}

// RepairReplicas repairs missing or corrupted replicas for an address.
func (rm *ReplicationManagerImpl) RepairReplicas(ctx context.Context, address ucxl.Address) (*RepairResult, error) {
	start := time.Now()
	result := &RepairResult{
		Address:          address.String(),
		RepairTime:       0,
		RepairSuccessful: false,
		Errors:           []string{},
		RepairedAt:       time.Now(),
	}

	// Ensure a status entry exists (this also discovers replicas on first sight).
	if _, err := rm.GetReplicationStatus(ctx, address); err != nil {
		result.Errors = append(result.Errors, fmt.Sprintf("failed to get replication status: %v", err))
		return result, err
	}

	// Identify nodes that should hold a replica but do not.
	rm.mu.RLock()
	unhealthyNodes := []string{}
	if status, exists := rm.replicationMap[address.String()]; exists {
		for nodeID, replicas := range status.ReplicaDistribution {
			if replicas == 0 {
				unhealthyNodes = append(unhealthyNodes, nodeID)
			}
		}
	}
	rm.mu.RUnlock()

	// Repair missing replicas.
	repaired := 0
	for _, nodeID := range unhealthyNodes {
		if err := rm.createReplica(ctx, address, nodeID); err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("failed to repair replica on node %s: %v", nodeID, err))
		} else {
			repaired++
		}
	}

	result.RepairedReplicas = repaired
	result.RepairTime = time.Since(start)
	result.RepairSuccessful = len(result.Errors) == 0

	rm.mu.Lock()
	rm.stats.RepairRequests++
	if result.RepairSuccessful {
		rm.stats.SuccessfulRepairs++
	} else {
		rm.stats.FailedRepairs++
	}
	// Crude smoothing: average the previous mean with the latest sample.
	rm.stats.AverageRepairTime = (rm.stats.AverageRepairTime + result.RepairTime) / 2
	rm.stats.LastUpdated = time.Now()
	rm.mu.Unlock()

	return result, nil
}

// BalanceReplicas rebalances replicas across cluster nodes.
func (rm *ReplicationManagerImpl) BalanceReplicas(ctx context.Context) (*RebalanceResult, error) {
	start := time.Now()
	result := &RebalanceResult{
		RebalanceTime:       0,
		RebalanceSuccessful: false,
		Errors:              []string{},
		RebalancedAt:        time.Now(),
	}

	// Get the current cluster topology.
	peers := rm.dht.GetConnectedPeers()
	if len(peers) < rm.policy.MinFactor {
		result.Errors = append(result.Errors, "insufficient peers for rebalancing")
		return result, fmt.Errorf("insufficient peers for rebalancing")
	}

	// Compare the ideal distribution against the current one and plan moves.
	idealDistribution := rm.calculateIdealDistribution(peers)
	currentDistribution := rm.getCurrentDistribution(ctx)
	moves := rm.calculateRebalanceMoves(currentDistribution, idealDistribution)

	// Execute the moves.
	moved := 0
	for _, move := range moves {
		if err := rm.moveReplica(ctx, move); err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("failed to move replica: %v", err))
		} else {
			moved++
		}
	}

	result.MovedReplicas = moved
	result.RebalanceTime = time.Since(start)
	result.RebalanceSuccessful = len(result.Errors) == 0

	// Report the fraction of planned moves that succeeded.
	if len(moves) > 0 {
		result.LoadBalanceImprovement = float64(moved) / float64(len(moves))
	}

	rm.mu.Lock()
	rm.stats.RebalanceOperations++
	rm.stats.LastRebalanceTime = time.Now()
	rm.stats.LastUpdated = time.Now()
	rm.mu.Unlock()

	return result, nil
}
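// RequestRepair is an illustrative convenience method (an addition sketched
// here, not part of the original interface): it enqueues a repair for the
// background worker without blocking and reports whether the queue accepted
// the request. It must not be called after Stop, since the queue is closed.
func (rm *ReplicationManagerImpl) RequestRepair(address ucxl.Address, requestedBy string) bool {
	select {
	case rm.repairQueue <- &RepairRequest{
		Address:     address,
		RequestedBy: requestedBy,
		Priority:    PriorityNormal,
		RequestTime: time.Now(),
	}:
		return true
	default:
		return false // queue full; the caller may retry later
	}
}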
// GetReplicationStatus returns the current replication health for an address.
func (rm *ReplicationManagerImpl) GetReplicationStatus(ctx context.Context, address ucxl.Address) (*ReplicaHealth, error) {
	rm.mu.RLock()
	status, exists := rm.replicationMap[address.String()]
	rm.mu.RUnlock()

	if !exists {
		// Create a new status entry.
		status = &ReplicationStatus{
			Address:             address.String(),
			DesiredReplicas:     rm.policy.DefaultFactor,
			CurrentReplicas:     0,
			HealthyReplicas:     0,
			ReplicationHealth:   0.0,
			ReplicaDistribution: make(map[string]int),
			LastReplication:     time.Time{},
			ReplicationErrors:   []string{},
			Status:              "unknown",
		}

		// Try to discover existing replicas.
		rm.discoverReplicas(ctx, address, status)

		rm.mu.Lock()
		// Re-check under the write lock in case another goroutine won the race.
		if existing, ok := rm.replicationMap[address.String()]; ok {
			status = existing
		} else {
			rm.replicationMap[address.String()] = status
		}
		rm.mu.Unlock()
	}

	// Convert to the ReplicaHealth format.
	health := &ReplicaHealth{
		Address:         address,
		TotalReplicas:   status.CurrentReplicas,
		HealthyReplicas: status.HealthyReplicas,
		FailedReplicas:  status.CurrentReplicas - status.HealthyReplicas,
		ReplicaNodes:    []*ReplicaNode{},
		OverallHealth:   rm.determineOverallHealth(status),
		LastChecked:     time.Now(),
		RepairNeeded:    status.HealthyReplicas < status.DesiredReplicas,
	}

	// Populate the replica nodes.
	for nodeID, count := range status.ReplicaDistribution {
		if count > 0 {
			health.ReplicaNodes = append(health.ReplicaNodes, &ReplicaNode{
				NodeID:         nodeID,
				Status:         rm.getNodeReplicaStatus(nodeID),
				LastSeen:       time.Now(),
				Version:        1,
				Checksum:       "",
				Latency:        0,
				NetworkAddress: nodeID,
			})
		}
	}

	return health, nil
}

// SetReplicationFactor sets the desired default replication factor.
func (rm *ReplicationManagerImpl) SetReplicationFactor(factor int) error {
	if factor < 1 {
		return fmt.Errorf("replication factor must be at least 1")
	}
	if factor > 10 {
		return fmt.Errorf("replication factor cannot exceed 10")
	}

	rm.mu.Lock()
	rm.policy.DefaultFactor = factor
	rm.mu.Unlock()

	return nil
}

// GetReplicationStats returns a snapshot of the replication statistics.
func (rm *ReplicationManagerImpl) GetReplicationStats() (*ReplicationStatistics, error) {
	// Take the write lock: the calculated fields are mutated here, and the
	// helpers below assume rm.mu is already held.
	rm.mu.Lock()
	defer rm.mu.Unlock()

	rm.stats.AverageReplicationFactor = rm.calculateAverageReplicationFactor()
	rm.stats.ReplicationEfficiency = rm.calculateReplicationEfficiency()

	// Return a copy so callers cannot race with internal updates.
	statsCopy := *rm.stats
	return &statsCopy, nil
}

// Background workers

func (rm *ReplicationManagerImpl) repairWorker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-rm.repairQueue:
			if req == nil {
				return // channel closed
			}
			// Errors are recorded in the result and the statistics.
			rm.RepairReplicas(ctx, req.Address)
		}
	}
}

func (rm *ReplicationManagerImpl) rebalanceWorker(ctx context.Context) {
	ticker := time.NewTicker(rm.policy.RebalanceInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rm.BalanceReplicas(ctx)
		case req := <-rm.rebalanceQueue:
			if req == nil {
				return // channel closed
			}
			rm.BalanceReplicas(ctx)
		}
	}
}

func (rm *ReplicationManagerImpl) healthChecker(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rm.checkReplicaHealth(ctx)
		}
	}
}

// Helper methods

func (rm *ReplicationManagerImpl) selectReplicationNodes(address ucxl.Address, count int) ([]string, error) {
	// Use consistent hashing to select candidate nodes; fetch more than
	// needed so that filtered-out candidates can be skipped.
	candidates, err := rm.consistentHash.GetNodes(address.String(), count*2)
	if err != nil {
		return nil, err
	}

	// Filter out nodes that already hold a replica and apply the (simplified)
	// placement policy: skip overloaded nodes when AvoidSameNode is set.
	selectedNodes := []string{}
	for _, nodeID := range candidates {
		if len(selectedNodes) >= count {
			break
		}

		if rm.hasReplica(address, nodeID) {
			continue
		}

		if rm.policy.AvoidSameNode && rm.isNodeOverloaded(nodeID) {
			continue
		}

		selectedNodes = append(selectedNodes, nodeID)
	}

	return selectedNodes, nil
}

func (rm *ReplicationManagerImpl) createReplica(ctx context.Context, address ucxl.Address, nodeID string) error {
	// In a real implementation, this would:
	//   1. Connect to the target node
	//   2. Transfer the context data
	//   3. Verify successful storage
	// For now, simulate success.
	return nil
}
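// A production createReplica would follow the three steps above. One possible
// shape, using hypothetical helpers (fetchLocalContext, transferToNode, and
// verifyRemoteChecksum do not exist in this codebase):
//
//	data, err := fetchLocalContext(address)
//	if err != nil {
//		return fmt.Errorf("read local context: %w", err)
//	}
//	if err := transferToNode(ctx, nodeID, address, data); err != nil {
//		return fmt.Errorf("transfer to %s: %w", nodeID, err)
//	}
//	// Confirm the remote copy matches before counting the replica.
//	return verifyRemoteChecksum(ctx, nodeID, address, checksum(data))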
func (rm *ReplicationManagerImpl) updateReplicationStatus(address ucxl.Address, currentReplicas int) {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	if status, exists := rm.replicationMap[address.String()]; exists {
		status.CurrentReplicas = currentReplicas
		status.LastReplication = time.Now()
	}
}

func (rm *ReplicationManagerImpl) discoverReplicas(ctx context.Context, address ucxl.Address, status *ReplicationStatus) {
	// In a real implementation, this would query the DHT to discover existing
	// replicas. For now, simulate replicas on the first connected peers.
	peers := rm.dht.GetConnectedPeers()
	if len(peers) == 0 {
		return
	}

	status.CurrentReplicas = min(len(peers), rm.policy.DefaultFactor)
	status.HealthyReplicas = status.CurrentReplicas
	for i, p := range peers {
		if i >= status.CurrentReplicas {
			break
		}
		status.ReplicaDistribution[p.String()] = 1
	}
}
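// A production discoverReplicas would consult the DHT rather than assume that
// connected peers hold replicas. A sketch using a hypothetical provider
// lookup (FindProviders is not part of pkg/dht today):
//
//	providers, err := rm.dht.FindProviders(ctx, address.String())
//	if err != nil {
//		return // leave the status at zero replicas
//	}
//	for _, p := range providers {
//		status.ReplicaDistribution[p.String()]++
//	}
//	status.CurrentReplicas = len(providers)
//	status.HealthyReplicas = len(providers) // refined by later health checks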
func (rm *ReplicationManagerImpl) determineOverallHealth(status *ReplicationStatus) HealthStatus {
	if status.HealthyReplicas == 0 {
		return HealthFailed
	}

	healthRatio := float64(status.HealthyReplicas) / float64(status.DesiredReplicas)
	switch {
	case healthRatio >= 1.0:
		return HealthHealthy
	case healthRatio >= 0.7:
		return HealthDegraded
	case healthRatio >= 0.3:
		return HealthCritical
	default:
		return HealthFailed
	}
}

func (rm *ReplicationManagerImpl) getNodeReplicaStatus(nodeID string) ReplicaStatus {
	// In a real implementation, this would check the actual status of the
	// replica on the node. For now, assume healthy.
	return ReplicaHealthy
}

// calculateAverageReplicationFactor computes the mean replica count per
// tracked address. The caller must hold rm.mu (read or write); acquiring the
// lock here as well would recursively take RLock and risk deadlock.
func (rm *ReplicationManagerImpl) calculateAverageReplicationFactor() float64 {
	if len(rm.replicationMap) == 0 {
		return 0
	}

	total := 0
	for _, status := range rm.replicationMap {
		total += status.CurrentReplicas
	}
	return float64(total) / float64(len(rm.replicationMap))
}

// calculateReplicationEfficiency reports the fraction of tracked addresses
// whose healthy replicas meet the desired factor. The caller must hold rm.mu.
func (rm *ReplicationManagerImpl) calculateReplicationEfficiency() float64 {
	if len(rm.replicationMap) == 0 {
		return 1.0
	}

	efficient := 0
	for _, status := range rm.replicationMap {
		if status.HealthyReplicas >= status.DesiredReplicas {
			efficient++
		}
	}
	return float64(efficient) / float64(len(rm.replicationMap))
}

func (rm *ReplicationManagerImpl) checkReplicaHealth(ctx context.Context) {
	rm.mu.RLock()
	addresses := make([]string, 0, len(rm.replicationMap))
	for addr := range rm.replicationMap {
		addresses = append(addresses, addr)
	}
	rm.mu.RUnlock()

	for _, addrStr := range addresses {
		addr, err := ucxl.ParseAddress(addrStr)
		if err != nil {
			continue
		}

		// Check whether a repair is needed.
		health, err := rm.GetReplicationStatus(ctx, addr)
		if err != nil {
			continue
		}

		if health.RepairNeeded {
			select {
			case rm.repairQueue <- &RepairRequest{
				Address:     addr,
				RequestedBy: "health_checker",
				Priority:    PriorityNormal,
				RequestTime: time.Now(),
			}:
			default:
				// Queue is full; skip this repair until the next pass.
			}
		}
	}
}

func (rm *ReplicationManagerImpl) calculateIdealDistribution(peers []peer.ID) map[string]int {
	// Placeholder: every node starts at zero. A real implementation would
	// target an even spread of replicas per node.
	distribution := make(map[string]int)
	for _, p := range peers {
		distribution[p.String()] = 0
	}
	return distribution
}

// getCurrentDistribution returns the current distribution as
// address -> node -> replica count.
func (rm *ReplicationManagerImpl) getCurrentDistribution(ctx context.Context) map[string]map[string]int {
	distribution := make(map[string]map[string]int)

	rm.mu.RLock()
	for addr, status := range rm.replicationMap {
		distribution[addr] = make(map[string]int)
		for nodeID, count := range status.ReplicaDistribution {
			distribution[addr][nodeID] = count
		}
	}
	rm.mu.RUnlock()

	return distribution
}

func (rm *ReplicationManagerImpl) calculateRebalanceMoves(current, ideal map[string]map[string]int) []*RebalanceMove {
	// Simplified implementation; production code would use a real placement
	// algorithm. See greedyRebalanceMoves at the end of this file for an
	// illustrative sketch.
	return []*RebalanceMove{}
}

func (rm *ReplicationManagerImpl) moveReplica(ctx context.Context, move *RebalanceMove) error {
	// A real implementation would copy the replica from move.FromNode to
	// move.ToNode and verify the transfer before removing the source copy.
	return nil
}

func (rm *ReplicationManagerImpl) hasReplica(address ucxl.Address, nodeID string) bool {
	rm.mu.RLock()
	defer rm.mu.RUnlock()

	if status, exists := rm.replicationMap[address.String()]; exists {
		return status.ReplicaDistribution[nodeID] > 0
	}
	return false
}

func (rm *ReplicationManagerImpl) isNodeOverloaded(nodeID string) bool {
	rm.mu.RLock()
	defer rm.mu.RUnlock()

	// Count this node's replicas across all tracked addresses.
	totalReplicas := 0
	for _, status := range rm.replicationMap {
		totalReplicas += status.ReplicaDistribution[nodeID]
	}

	// Consider the node overloaded at more than 150% of the average
	// replication factor (a rough per-address proxy for node load).
	averageLoad := rm.calculateAverageReplicationFactor()
	return float64(totalReplicas) > averageLoad*1.5
}

// RebalanceMove represents a replica move operation.
type RebalanceMove struct {
	Address  ucxl.Address `json:"address"`
	FromNode string       `json:"from_node"`
	ToNode   string       `json:"to_node"`
	Priority Priority     `json:"priority"`
	Reason   string       `json:"reason"`
}

// min returns the smaller of two ints (kept for pre-Go-1.21 toolchains,
// which lack the built-in min).
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}
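// greedyRebalanceMoves is an illustrative sketch (not wired into the manager)
// of one way calculateRebalanceMoves could be filled in: for each address,
// move one replica from its most-loaded holder to the least-loaded node that
// does not yet hold it. The load map (node -> replica count) is a caller
// assumption; this is a simplification, not a production placement algorithm.
func greedyRebalanceMoves(current map[string]map[string]int, load map[string]int) []*RebalanceMove {
	moves := []*RebalanceMove{}
	for addrStr, nodes := range current {
		addr, err := ucxl.ParseAddress(addrStr)
		if err != nil {
			continue
		}

		// Find the busiest holder and the idlest non-holder.
		from, to := "", ""
		for nodeID, count := range nodes {
			if count > 0 && (from == "" || load[nodeID] > load[from]) {
				from = nodeID
			}
		}
		for nodeID := range load {
			if nodes[nodeID] == 0 && (to == "" || load[nodeID] < load[to]) {
				to = nodeID
			}
		}

		// Only move when it strictly improves balance between the two nodes.
		if from != "" && to != "" && load[from]-load[to] > 1 {
			moves = append(moves, &RebalanceMove{
				Address:  addr,
				FromNode: from,
				ToNode:   to,
				Priority: PriorityNormal,
				Reason:   "load imbalance",
			})
			load[from]--
			load[to]++
		}
	}
	return moves
}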