d96c931a29
This comprehensive refactoring addresses critical architectural issues:

IMPORT CYCLE RESOLUTION:
• pkg/crypto ↔ pkg/slurp/roles: Created pkg/security/access_levels.go
• pkg/ucxl → pkg/dht: Created pkg/storage/interfaces.go
• pkg/slurp/leader → pkg/election → pkg/slurp/storage: Moved types to pkg/election/interfaces.go

MODULE PATH MIGRATION:
• Changed from github.com/anthonyrawlins/bzzz to chorus.services/bzzz
• Updated all import statements across 115+ files
• Maintains compatibility while removing the personal GitHub account dependency

TYPE SYSTEM IMPROVEMENTS:
• Resolved duplicate type declarations in the crypto package
• Added missing type definitions (RoleStatus, TimeRestrictions, KeyStatus, KeyRotationResult)
• Proper interface segregation to prevent future cycles

ARCHITECTURAL BENEFITS:
• Build now progresses past structural issues to normal dependency resolution
• Cleaner separation of concerns between packages
• Eliminates circular dependencies that prevented compilation
• Establishes a foundation for scalable codebase growth

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
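The import-cycle fixes above all use the same interface-extraction pattern: the shared types move into a small leaf package that both sides can import without importing each other. A minimal sketch of the idea follows; the type and method names are illustrative, not the actual contents of pkg/security/access_levels.go:

	// pkg/security/access_levels.go (illustrative)
	package security

	// AccessLevel is the shared vocabulary that both pkg/crypto and
	// pkg/slurp/roles need; hoisting it into a leaf package means
	// neither has to import the other.
	type AccessLevel int

	const (
		AccessPublic AccessLevel = iota
		AccessInternal
		AccessRestricted
	)

	// RoleAccessor is the narrow interface the crypto side consumes;
	// the roles side implements it without importing pkg/crypto.
	type RoleAccessor interface {
		AccessLevelFor(role string) AccessLevel
	}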
// Package distribution provides replication management for distributed contexts
package distribution

import (
	"context"
	"fmt"
	"sync"
	"time"

	"chorus.services/bzzz/pkg/config"
	"chorus.services/bzzz/pkg/dht"
	"chorus.services/bzzz/pkg/ucxl"
	"github.com/libp2p/go-libp2p/core/peer"
)

// ReplicationManagerImpl implements the ReplicationManager interface.
type ReplicationManagerImpl struct {
	mu             sync.RWMutex
	dht            *dht.DHT
	config         *config.Config
	replicationMap map[string]*ReplicationStatus
	repairQueue    chan *RepairRequest
	rebalanceQueue chan *RebalanceRequest
	consistentHash ConsistentHashing
	policy         *ReplicationPolicy
	stats          *ReplicationStatistics
	running        bool
}

// RepairRequest is a queued request to repair the replicas of one address.
type RepairRequest struct {
	Address     ucxl.Address
	RequestedBy string
	Priority    Priority
	RequestTime time.Time
}

// RebalanceRequest is a queued request to rebalance replicas across the cluster.
type RebalanceRequest struct {
	Reason      string
	RequestedBy string
	RequestTime time.Time
}

// NewReplicationManagerImpl creates a new replication manager implementation.
func NewReplicationManagerImpl(d *dht.DHT, cfg *config.Config) (*ReplicationManagerImpl, error) {
	if d == nil {
		return nil, fmt.Errorf("DHT instance is required")
	}
	if cfg == nil {
		return nil, fmt.Errorf("config is required")
	}

	rm := &ReplicationManagerImpl{
		dht:            d,
		config:         cfg,
		replicationMap: make(map[string]*ReplicationStatus),
		repairQueue:    make(chan *RepairRequest, 1000),
		rebalanceQueue: make(chan *RebalanceRequest, 100),
		policy: &ReplicationPolicy{
			DefaultFactor:     3,
			MinFactor:         2,
			MaxFactor:         7,
			PreferredZones:    []string{"zone-a", "zone-b", "zone-c"},
			AvoidSameNode:     true,
			ConsistencyLevel:  ConsistencyEventual,
			RepairThreshold:   0.8,
			RebalanceInterval: 6 * time.Hour,
		},
		stats: &ReplicationStatistics{
			LastUpdated: time.Now(),
		},
	}

	// Initialize consistent hashing
	consistentHash, err := NewConsistentHashingImpl()
	if err != nil {
		return nil, fmt.Errorf("failed to create consistent hashing: %w", err)
	}
	rm.consistentHash = consistentHash

	// Seed the hash ring with the currently connected peers
	for _, peerID := range d.GetConnectedPeers() {
		rm.consistentHash.AddNode(peerID.String())
	}

	return rm, nil
}

// Start launches the replication manager's background workers.
func (rm *ReplicationManagerImpl) Start(ctx context.Context) error {
	rm.mu.Lock()
	if rm.running {
		rm.mu.Unlock()
		return fmt.Errorf("replication manager already running")
	}
	rm.running = true
	rm.mu.Unlock()

	// Start background workers; they all exit when ctx is cancelled.
	go rm.repairWorker(ctx)
	go rm.rebalanceWorker(ctx)
	go rm.healthChecker(ctx)

	return nil
}

// Stop stops the replication manager. The queues are deliberately left open:
// the health checker may still be enqueueing repairs, and sending on a closed
// channel panics. Workers shut down via cancellation of the context passed to
// Start instead.
func (rm *ReplicationManagerImpl) Stop() error {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	rm.running = false
	return nil
}

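// Typical lifecycle (illustrative sketch, not taken from the bzzz wiring;
// assumes an initialized *dht.DHT and *config.Config are already available):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//
//	rm, err := NewReplicationManagerImpl(d, cfg)
//	if err != nil {
//		return err
//	}
//	if err := rm.Start(ctx); err != nil {
//		return err
//	}
//	defer rm.Stop() // cancel() is what actually stops the workers
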
// EnsureReplication ensures a context meets its replication requirements.
func (rm *ReplicationManagerImpl) EnsureReplication(ctx context.Context, address ucxl.Address, factor int) error {
	// Clamp the requested factor to the policy bounds
	if factor < rm.policy.MinFactor {
		factor = rm.policy.MinFactor
	}
	if factor > rm.policy.MaxFactor {
		factor = rm.policy.MaxFactor
	}

	// Get current replication status
	health, err := rm.GetReplicationStatus(ctx, address)
	if err != nil {
		return fmt.Errorf("failed to get replication status: %w", err)
	}

	if health.TotalReplicas >= factor {
		return nil // Already sufficiently replicated
	}

	// Calculate how many more replicas we need
	needed := factor - health.TotalReplicas

	// Select target nodes for the additional replicas
	targetNodes, err := rm.selectReplicationNodes(address, needed)
	if err != nil {
		return fmt.Errorf("failed to select replication nodes: %w", err)
	}

	// Create replicas on the target nodes, counting only the successes
	created := 0
	for _, nodeID := range targetNodes {
		if err := rm.createReplica(ctx, address, nodeID); err != nil {
			continue // log-and-continue: try the remaining nodes
		}
		created++
	}

	// Update replication status with the replicas actually created
	rm.updateReplicationStatus(address, health.TotalReplicas+created)

	return nil
}

// RepairReplicas repairs missing or corrupted replicas for an address.
func (rm *ReplicationManagerImpl) RepairReplicas(ctx context.Context, address ucxl.Address) (*RepairResult, error) {
	start := time.Now()

	result := &RepairResult{
		Address:          address.String(),
		RepairTime:       0,
		RepairSuccessful: false,
		Errors:           []string{},
		RepairedAt:       time.Now(),
	}

	// Ensure a status entry exists (this also triggers replica discovery)
	if _, err := rm.GetReplicationStatus(ctx, address); err != nil {
		result.Errors = append(result.Errors, fmt.Sprintf("failed to get replication status: %v", err))
		return result, err
	}

	// Identify nodes that should hold a replica but currently don't
	unhealthyNodes := []string{}
	rm.mu.RLock()
	if status, exists := rm.replicationMap[address.String()]; exists {
		for nodeID, replicas := range status.ReplicaDistribution {
			if replicas == 0 {
				unhealthyNodes = append(unhealthyNodes, nodeID)
			}
		}
	}
	rm.mu.RUnlock()

	// Repair missing replicas
	repaired := 0
	for _, nodeID := range unhealthyNodes {
		if err := rm.createReplica(ctx, address, nodeID); err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("failed to repair replica on node %s: %v", nodeID, err))
		} else {
			repaired++
		}
	}

	result.RepairedReplicas = repaired
	result.RepairTime = time.Since(start)
	result.RepairSuccessful = len(result.Errors) == 0

	rm.mu.Lock()
	rm.stats.RepairRequests++
	if result.RepairSuccessful {
		rm.stats.SuccessfulRepairs++
	} else {
		rm.stats.FailedRepairs++
	}
	// Crude smoothing of the repair latency, not a true running mean
	rm.stats.AverageRepairTime = (rm.stats.AverageRepairTime + result.RepairTime) / 2
	rm.stats.LastUpdated = time.Now()
	rm.mu.Unlock()

	return result, nil
}

// BalanceReplicas rebalances replicas across cluster nodes.
func (rm *ReplicationManagerImpl) BalanceReplicas(ctx context.Context) (*RebalanceResult, error) {
	start := time.Now()

	result := &RebalanceResult{
		RebalanceTime:       0,
		RebalanceSuccessful: false,
		Errors:              []string{},
		RebalancedAt:        time.Now(),
	}

	// Get current cluster topology
	peers := rm.dht.GetConnectedPeers()
	if len(peers) < rm.policy.MinFactor {
		result.Errors = append(result.Errors, "insufficient peers for rebalancing")
		return result, fmt.Errorf("insufficient peers for rebalancing")
	}

	// Calculate the ideal distribution
	idealDistribution := rm.calculateIdealDistribution(peers)

	// Get the current distribution for all contexts
	currentDistribution := rm.getCurrentDistribution(ctx)

	// Calculate the moves needed to converge on the ideal distribution
	moves := rm.calculateRebalanceMoves(currentDistribution, idealDistribution)

	// Execute the moves
	moved := 0
	for _, move := range moves {
		if err := rm.moveReplica(ctx, move); err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("failed to move replica: %v", err))
		} else {
			moved++
		}
	}

	result.MovedReplicas = moved
	result.RebalanceTime = time.Since(start)
	result.RebalanceSuccessful = len(result.Errors) == 0

	// Report the fraction of planned moves that succeeded as a rough
	// proxy for load-balance improvement
	if len(moves) > 0 {
		result.LoadBalanceImprovement = float64(moved) / float64(len(moves))
	}

	rm.mu.Lock()
	rm.stats.RebalanceOperations++
	rm.stats.LastRebalanceTime = time.Now()
	rm.stats.LastUpdated = time.Now()
	rm.mu.Unlock()

	return result, nil
}

// GetReplicationStatus returns the current replication health for an address.
func (rm *ReplicationManagerImpl) GetReplicationStatus(ctx context.Context, address ucxl.Address) (*ReplicaHealth, error) {
	rm.mu.RLock()
	status, exists := rm.replicationMap[address.String()]
	rm.mu.RUnlock()

	if !exists {
		// Create a new status entry
		status = &ReplicationStatus{
			Address:             address.String(),
			DesiredReplicas:     rm.policy.DefaultFactor,
			CurrentReplicas:     0,
			HealthyReplicas:     0,
			ReplicationHealth:   0.0,
			ReplicaDistribution: make(map[string]int),
			LastReplication:     time.Time{},
			ReplicationErrors:   []string{},
			Status:              "unknown",
		}

		// Try to discover existing replicas
		rm.discoverReplicas(ctx, address, status)

		rm.mu.Lock()
		// Re-check under the write lock: another goroutine may have
		// created the entry while we were discovering replicas.
		if existing, ok := rm.replicationMap[address.String()]; ok {
			status = existing
		} else {
			rm.replicationMap[address.String()] = status
		}
		rm.mu.Unlock()
	}

	// Convert to the ReplicaHealth format
	health := &ReplicaHealth{
		Address:         address,
		TotalReplicas:   status.CurrentReplicas,
		HealthyReplicas: status.HealthyReplicas,
		FailedReplicas:  status.CurrentReplicas - status.HealthyReplicas,
		ReplicaNodes:    []*ReplicaNode{},
		OverallHealth:   rm.determineOverallHealth(status),
		LastChecked:     time.Now(),
		RepairNeeded:    status.HealthyReplicas < status.DesiredReplicas,
	}

	// Populate the replica nodes
	for nodeID, count := range status.ReplicaDistribution {
		if count > 0 {
			health.ReplicaNodes = append(health.ReplicaNodes, &ReplicaNode{
				NodeID:         nodeID,
				Status:         rm.getNodeReplicaStatus(nodeID),
				LastSeen:       time.Now(),
				Version:        1,
				Checksum:       "",
				Latency:        0,
				NetworkAddress: nodeID,
			})
		}
	}

	return health, nil
}

// SetReplicationFactor sets the default desired replication factor.
func (rm *ReplicationManagerImpl) SetReplicationFactor(factor int) error {
	if factor < 1 {
		return fmt.Errorf("replication factor must be at least 1")
	}
	if factor > 10 {
		return fmt.Errorf("replication factor cannot exceed 10")
	}

	rm.mu.Lock()
	rm.policy.DefaultFactor = factor
	rm.mu.Unlock()

	return nil
}

// GetReplicationStats returns a snapshot of the replication statistics.
// It takes the write lock because it updates the calculated fields, and it
// returns a copy so callers cannot race with later updates.
func (rm *ReplicationManagerImpl) GetReplicationStats() (*ReplicationStatistics, error) {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	// Update calculated fields (the helpers assume the lock is held)
	rm.stats.AverageReplicationFactor = rm.calculateAverageReplicationFactor()
	rm.stats.ReplicationEfficiency = rm.calculateReplicationEfficiency()

	statsCopy := *rm.stats
	return &statsCopy, nil
}

// Background workers

func (rm *ReplicationManagerImpl) repairWorker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-rm.repairQueue:
			if req == nil {
				return // Channel closed
			}
			// Errors are already recorded in the repair result and stats
			rm.RepairReplicas(ctx, req.Address)
		}
	}
}

func (rm *ReplicationManagerImpl) rebalanceWorker(ctx context.Context) {
	ticker := time.NewTicker(rm.policy.RebalanceInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rm.BalanceReplicas(ctx)
		case req := <-rm.rebalanceQueue:
			if req == nil {
				return // Channel closed
			}
			rm.BalanceReplicas(ctx)
		}
	}
}

func (rm *ReplicationManagerImpl) healthChecker(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rm.checkReplicaHealth(ctx)
		}
	}
}

// Helper methods

func (rm *ReplicationManagerImpl) selectReplicationNodes(address ucxl.Address, count int) ([]string, error) {
	// Use consistent hashing to select candidate nodes; fetch more than
	// needed so the placement filters still leave enough survivors
	candidates, err := rm.consistentHash.GetNodes(address.String(), count*2)
	if err != nil {
		return nil, err
	}

	// Filter out nodes that already hold a replica and apply placement policies
	selectedNodes := []string{}
	for _, nodeID := range candidates {
		if len(selectedNodes) >= count {
			break
		}

		// Skip nodes that already hold this replica
		if rm.hasReplica(address, nodeID) {
			continue
		}

		// Placement policy: skip overloaded nodes
		if rm.policy.AvoidSameNode && rm.isNodeOverloaded(nodeID) {
			continue
		}

		selectedNodes = append(selectedNodes, nodeID)
	}

	return selectedNodes, nil
}

func (rm *ReplicationManagerImpl) createReplica(ctx context.Context, address ucxl.Address, nodeID string) error {
	// In a real implementation, this would:
	// 1. Connect to the target node
	// 2. Transfer the context data
	// 3. Verify successful storage
	// For now, we simulate success.
	return nil
}

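// A real createReplica would roughly follow the three steps listed above.
// A minimal sketch of the shape it might take; every helper named here
// (fetchContextData, pushToPeer, verifyReplica) is hypothetical, not part
// of the actual pkg/dht or distribution API:
//
//	data, err := rm.fetchContextData(ctx, address)
//	if err != nil {
//		return fmt.Errorf("fetch context %s: %w", address.String(), err)
//	}
//	if err := rm.pushToPeer(ctx, nodeID, address.String(), data); err != nil {
//		return fmt.Errorf("transfer to node %s: %w", nodeID, err)
//	}
//	return rm.verifyReplica(ctx, address, nodeID) // e.g. checksum comparison
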
func (rm *ReplicationManagerImpl) updateReplicationStatus(address ucxl.Address, currentReplicas int) {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	addressStr := address.String()
	if status, exists := rm.replicationMap[addressStr]; exists {
		status.CurrentReplicas = currentReplicas
		status.LastReplication = time.Now()
	}
}

func (rm *ReplicationManagerImpl) discoverReplicas(ctx context.Context, address ucxl.Address, status *ReplicationStatus) {
	// In a real implementation, this would query the DHT to discover existing
	// replicas. For now, we simulate some replicas from the connected peers.
	peers := rm.dht.GetConnectedPeers()
	if len(peers) > 0 {
		status.CurrentReplicas = min(len(peers), rm.policy.DefaultFactor)
		status.HealthyReplicas = status.CurrentReplicas

		for i, peer := range peers {
			if i >= status.CurrentReplicas {
				break
			}
			status.ReplicaDistribution[peer.String()] = 1
		}
	}
}

func (rm *ReplicationManagerImpl) determineOverallHealth(status *ReplicationStatus) HealthStatus {
	if status.HealthyReplicas == 0 {
		return HealthFailed
	}

	healthRatio := float64(status.HealthyReplicas) / float64(status.DesiredReplicas)

	switch {
	case healthRatio >= 1.0:
		return HealthHealthy
	case healthRatio >= 0.7:
		return HealthDegraded
	case healthRatio >= 0.3:
		return HealthCritical
	default:
		return HealthFailed
	}
}

func (rm *ReplicationManagerImpl) getNodeReplicaStatus(nodeID string) ReplicaStatus {
	// In a real implementation, this would check the actual status of the
	// replica on the node. For now, assume healthy.
	return ReplicaHealthy
}

// calculateAverageReplicationFactor returns the mean number of current
// replicas per tracked address. Callers must hold rm.mu (read or write);
// locking here would recursively acquire an already-held RWMutex, which
// can deadlock once a writer is waiting.
func (rm *ReplicationManagerImpl) calculateAverageReplicationFactor() float64 {
	if len(rm.replicationMap) == 0 {
		return 0
	}

	total := 0
	for _, status := range rm.replicationMap {
		total += status.CurrentReplicas
	}

	return float64(total) / float64(len(rm.replicationMap))
}

// calculateReplicationEfficiency returns the fraction of tracked addresses
// whose healthy replica count meets the desired count. Callers must hold rm.mu.
func (rm *ReplicationManagerImpl) calculateReplicationEfficiency() float64 {
	if len(rm.replicationMap) == 0 {
		return 1.0
	}

	efficient := 0
	for _, status := range rm.replicationMap {
		if status.HealthyReplicas >= status.DesiredReplicas {
			efficient++
		}
	}

	return float64(efficient) / float64(len(rm.replicationMap))
}

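// Worked example for the two helpers above: with three tracked addresses
// replicated 3, 2, and 4 times, the average replication factor is
// (3+2+4)/3 = 3.0; if all three desire 3 healthy replicas but only the
// first and third meet that, efficiency is 2/3 ≈ 0.67.
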
func (rm *ReplicationManagerImpl) checkReplicaHealth(ctx context.Context) {
	rm.mu.RLock()
	addresses := make([]string, 0, len(rm.replicationMap))
	for addr := range rm.replicationMap {
		addresses = append(addresses, addr)
	}
	rm.mu.RUnlock()

	for _, addrStr := range addresses {
		addr, err := ucxl.ParseAddress(addrStr)
		if err != nil {
			continue
		}

		// Check whether a repair is needed
		health, err := rm.GetReplicationStatus(ctx, addr)
		if err != nil {
			continue
		}

		if health.RepairNeeded {
			select {
			case rm.repairQueue <- &RepairRequest{
				Address:     addr,
				RequestedBy: "health_checker",
				Priority:    PriorityNormal,
				RequestTime: time.Now(),
			}:
			default:
				// Queue is full; skip this repair until the next pass
			}
		}
	}
}

func (rm *ReplicationManagerImpl) calculateIdealDistribution(peers []peer.ID) map[string]int {
	// Simple ideal distribution: an equal number of replicas per node
	distribution := make(map[string]int)
	for _, peer := range peers {
		distribution[peer.String()] = 0
	}
	return distribution
}

func (rm *ReplicationManagerImpl) getCurrentDistribution(ctx context.Context) map[string]map[string]int {
	// Returns the current distribution: address -> node -> replica count
	distribution := make(map[string]map[string]int)

	rm.mu.RLock()
	for addr, status := range rm.replicationMap {
		distribution[addr] = make(map[string]int)
		for nodeID, count := range status.ReplicaDistribution {
			distribution[addr][nodeID] = count
		}
	}
	rm.mu.RUnlock()

	return distribution
}

func (rm *ReplicationManagerImpl) calculateRebalanceMoves(current, ideal map[string]map[string]int) []*RebalanceMove {
	moves := []*RebalanceMove{}
	// Simplified implementation; production code would use a real placement
	// algorithm to diff the current distribution against the ideal one
	return moves
}

func (rm *ReplicationManagerImpl) moveReplica(ctx context.Context, move *RebalanceMove) error {
	// A full implementation would copy the replica from the source node to
	// the target node and verify it before deleting the original
	return nil
}

func (rm *ReplicationManagerImpl) hasReplica(address ucxl.Address, nodeID string) bool {
	rm.mu.RLock()
	defer rm.mu.RUnlock()

	if status, exists := rm.replicationMap[address.String()]; exists {
		return status.ReplicaDistribution[nodeID] > 0
	}
	return false
}

func (rm *ReplicationManagerImpl) isNodeOverloaded(nodeID string) bool {
	rm.mu.RLock()
	defer rm.mu.RUnlock()

	// Compare this node's replica count against the mean per-node load.
	// (Comparing against the per-address replication factor, as an earlier
	// version did, flags every node as overloaded once the number of
	// tracked addresses grows.)
	nodeReplicas := 0
	totalReplicas := 0
	nodes := make(map[string]struct{})
	for _, status := range rm.replicationMap {
		for id, count := range status.ReplicaDistribution {
			totalReplicas += count
			nodes[id] = struct{}{}
		}
		nodeReplicas += status.ReplicaDistribution[nodeID]
	}
	if len(nodes) == 0 {
		return false
	}

	// Consider the node overloaded at more than 150% of the mean load
	averagePerNode := float64(totalReplicas) / float64(len(nodes))
	return float64(nodeReplicas) > averagePerNode*1.5
}

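// Worked example for isNodeOverloaded: with 9 replicas spread across 3
// nodes, the mean per-node load is 3 and the overload threshold is 4.5;
// a node holding 5 replicas is considered overloaded, one holding 4 is not.
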
// RebalanceMove represents a single replica move operation.
type RebalanceMove struct {
	Address  ucxl.Address `json:"address"`
	FromNode string       `json:"from_node"`
	ToNode   string       `json:"to_node"`
	Priority Priority     `json:"priority"`
	Reason   string       `json:"reason"`
}

// Utility functions

// min returns the smaller of two ints.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}
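
// Example: ensuring a context is replicated and inspecting the result.
// Illustrative sketch only; rawAddress stands in for a valid ucxl address
// string, and rm is a started manager as in the lifecycle sketch above:
//
//	addr, err := ucxl.ParseAddress(rawAddress)
//	if err != nil {
//		return err
//	}
//	if err := rm.EnsureReplication(ctx, addr, 3); err != nil {
//		return err
//	}
//	health, err := rm.GetReplicationStatus(ctx, addr)
//	if err != nil {
//		return err
//	}
//	fmt.Printf("replicas: %d/%d healthy, repair needed: %v\n",
//		health.HealthyReplicas, health.TotalReplicas, health.RepairNeeded)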