package dht

import (
	"context"
	"crypto/sha256"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/routing"
	"github.com/multiformats/go-multihash"
)

// ReplicationManager manages DHT data replication and provider records.
type ReplicationManager struct {
	dht    routing.Routing
	ctx    context.Context
	cancel context.CancelFunc
	config *ReplicationConfig

	// Provider tracking
	providers      map[string]*ProviderRecord
	providersMutex sync.RWMutex

	// Replication tracking
	contentKeys map[string]*ContentRecord
	keysMutex   sync.RWMutex

	// Background tasks
	reprovideTimer *time.Timer
	cleanupTimer   *time.Timer

	// Metrics
	metrics *ReplicationMetrics
	logger  func(msg string, args ...interface{})
}

// ReplicationConfig holds replication configuration.
type ReplicationConfig struct {
	// Target replication factor for content
	ReplicationFactor int

	// Interval for reproviding content
	ReprovideInterval time.Duration

	// Cleanup interval for stale records
	CleanupInterval time.Duration

	// Provider record TTL
	ProviderTTL time.Duration

	// Maximum number of providers to track per key
	MaxProvidersPerKey int

	// Enable automatic replication
	EnableAutoReplication bool

	// Enable periodic reproviding
	EnableReprovide bool

	// Maximum concurrent replication operations
	MaxConcurrentReplications int
}

// ProviderRecord tracks providers for a specific content key.
type ProviderRecord struct {
	Key        string
	Providers  []ProviderInfo
	LastUpdate time.Time
	TTL        time.Duration
}

// ProviderInfo contains information about a content provider.
type ProviderInfo struct {
	PeerID   peer.ID
	AddedAt  time.Time
	LastSeen time.Time
	Quality  float64 // Quality score 0.0-1.0
	Distance uint32  // XOR distance from key
}

// ContentRecord tracks local content for replication.
type ContentRecord struct {
	Key              string
	Size             int64
	CreatedAt        time.Time
	LastProvided     time.Time
	ReplicationCount int
	Priority         int // Higher priority gets replicated first
}

// ReplicationMetrics tracks replication statistics.
type ReplicationMetrics struct {
	mu                     sync.RWMutex
	TotalKeys              int64
	TotalProviders         int64
	ReprovideOperations    int64
	SuccessfulReplications int64
	FailedReplications     int64
	LastReprovideTime      time.Time
	LastCleanupTime        time.Time
	AverageReplication     float64
}

// DefaultReplicationConfig returns the default replication configuration.
func DefaultReplicationConfig() *ReplicationConfig {
	return &ReplicationConfig{
		ReplicationFactor:         3,
		ReprovideInterval:         12 * time.Hour,
		CleanupInterval:           1 * time.Hour,
		ProviderTTL:               24 * time.Hour,
		MaxProvidersPerKey:        10,
		EnableAutoReplication:     true,
		EnableReprovide:           true,
		MaxConcurrentReplications: 5,
	}
}

// NewReplicationManager creates a new replication manager.
func NewReplicationManager(ctx context.Context, dht routing.Routing, config *ReplicationConfig) *ReplicationManager {
	if config == nil {
		config = DefaultReplicationConfig()
	}

	rmCtx, cancel := context.WithCancel(ctx)

	rm := &ReplicationManager{
		dht:         dht,
		ctx:         rmCtx,
		cancel:      cancel,
		config:      config,
		providers:   make(map[string]*ProviderRecord),
		contentKeys: make(map[string]*ContentRecord),
		metrics:     &ReplicationMetrics{},
		logger: func(msg string, args ...interface{}) {
			log.Printf("[REPLICATION] "+msg, args...)
		},
	}

	// Start background tasks
	rm.startBackgroundTasks()

	return rm
}

// AddContent registers content for replication management.
func (rm *ReplicationManager) AddContent(key string, size int64, priority int) error {
	record := &ContentRecord{
		Key:              key,
		Size:             size,
		CreatedAt:        time.Now(),
		LastProvided:     time.Time{}, // Will be set on first provide
		ReplicationCount: 0,
		Priority:         priority,
	}

	rm.keysMutex.Lock()
	rm.contentKeys[key] = record
	rm.keysMutex.Unlock()

	// updateMetrics takes the locks itself, so call it after releasing keysMutex.
	rm.updateMetrics()
	rm.logger("Added content for replication: %s (size: %d, priority: %d)", key, size, priority)

	// Immediately provide if auto-replication is enabled
	if rm.config.EnableAutoReplication {
		go func() {
			if err := rm.provideContent(key); err != nil {
				rm.logger("Initial provide failed for %s: %v", key, err)
			}
		}()
	}

	return nil
}

// RemoveContent removes content from replication management.
func (rm *ReplicationManager) RemoveContent(key string) error {
	rm.keysMutex.Lock()
	delete(rm.contentKeys, key)
	rm.keysMutex.Unlock()

	rm.providersMutex.Lock()
	delete(rm.providers, key)
	rm.providersMutex.Unlock()

	rm.updateMetrics()
	rm.logger("Removed content from replication: %s", key)

	return nil
}

// ProvideContent announces this node as a provider for the given key.
func (rm *ReplicationManager) ProvideContent(key string) error {
	return rm.provideContent(key)
}

// FindProviders discovers providers for a given content key.
func (rm *ReplicationManager) FindProviders(ctx context.Context, key string, limit int) ([]ProviderInfo, error) {
	// First check the local provider cache
	rm.providersMutex.RLock()
	if record, exists := rm.providers[key]; exists && time.Since(record.LastUpdate) < record.TTL {
		// Return cached providers (up to limit), copied while the lock is held
		providers := make([]ProviderInfo, 0, len(record.Providers))
		for i, provider := range record.Providers {
			if i >= limit {
				break
			}
			providers = append(providers, provider)
		}
		rm.providersMutex.RUnlock()
		return providers, nil
	}
	rm.providersMutex.RUnlock()

	// Query the DHT for providers
	keyHash := sha256.Sum256([]byte(key))

	// Create a proper CID from the hash
	mh, err := multihash.EncodeName(keyHash[:], "sha2-256")
	if err != nil {
		return nil, fmt.Errorf("failed to encode multihash: %w", err)
	}
	contentID := cid.NewCidV1(cid.Raw, mh)

	// Use the DHT to find providers
	providerCh := rm.dht.FindProvidersAsync(ctx, contentID, limit)

	var providers []ProviderInfo
	for providerInfo := range providerCh {
		if len(providers) >= limit {
			break
		}

		provider := ProviderInfo{
			PeerID:   providerInfo.ID,
			AddedAt:  time.Now(),
			LastSeen: time.Now(),
			Quality:  1.0, // Default quality
			Distance: calculateDistance(keyHash[:], providerInfo.ID),
		}
		providers = append(providers, provider)
	}

	// Cache the results
	rm.updateProviderCache(key, providers)

	rm.logger("Found %d providers for key: %s", len(providers), key)
	return providers, nil
}

// GetReplicationStatus returns the replication status for a specific key.
func (rm *ReplicationManager) GetReplicationStatus(key string) (*ReplicationStatus, error) {
	status := &ReplicationStatus{
		Key:            key,
		TargetReplicas: rm.config.ReplicationFactor,
		ActualReplicas: 0,
		LastReprovided: time.Time{},
	}

	// Copy content fields while holding the lock to avoid racing with provideContent
	rm.keysMutex.RLock()
	if content, exists := rm.contentKeys[key]; exists {
		status.IsLocal = true
		status.LastReprovided = content.LastProvided
		status.CreatedAt = content.CreatedAt
		status.Size = content.Size
		status.Priority = content.Priority
	}
	rm.keysMutex.RUnlock()

	// Copy the provider list while holding the lock to avoid racing with performCleanup
	rm.providersMutex.RLock()
	var cachedProviders []ProviderInfo
	if record, exists := rm.providers[key]; exists {
		cachedProviders = append([]ProviderInfo(nil), record.Providers...)
	}
	rm.providersMutex.RUnlock()

	if cachedProviders != nil {
		status.ActualReplicas = len(cachedProviders)
		status.Providers = cachedProviders

		// Count healthy providers (seen recently)
		cutoff := time.Now().Add(-rm.config.ProviderTTL / 2)
		for _, provider := range cachedProviders {
			if provider.LastSeen.After(cutoff) {
				status.HealthyProviders++
			}
		}
	}

	// Determine health status
	if status.ActualReplicas >= status.TargetReplicas {
		status.Health = "healthy"
	} else if status.ActualReplicas > 0 {
		status.Health = "degraded"
	} else {
		status.Health = "critical"
	}

	return status, nil
}

// GetMetrics returns a snapshot of the replication metrics.
func (rm *ReplicationManager) GetMetrics() *ReplicationMetrics {
	rm.metrics.mu.RLock()
	defer rm.metrics.mu.RUnlock()

	// Copy field by field so the embedded mutex is not copied
	return &ReplicationMetrics{
		TotalKeys:              rm.metrics.TotalKeys,
		TotalProviders:         rm.metrics.TotalProviders,
		ReprovideOperations:    rm.metrics.ReprovideOperations,
		SuccessfulReplications: rm.metrics.SuccessfulReplications,
		FailedReplications:     rm.metrics.FailedReplications,
		LastReprovideTime:      rm.metrics.LastReprovideTime,
		LastCleanupTime:        rm.metrics.LastCleanupTime,
		AverageReplication:     rm.metrics.AverageReplication,
	}
}

// provideContent performs the actual content provision operation.
func (rm *ReplicationManager) provideContent(key string) error {
	ctx, cancel := context.WithTimeout(rm.ctx, 30*time.Second)
	defer cancel()

	keyHash := sha256.Sum256([]byte(key))

	// Create a proper CID from the hash
	mh, err := multihash.EncodeName(keyHash[:], "sha2-256")
	if err != nil {
		rm.metrics.mu.Lock()
		rm.metrics.FailedReplications++
		rm.metrics.mu.Unlock()
		return fmt.Errorf("failed to encode multihash: %w", err)
	}
	contentID := cid.NewCidV1(cid.Raw, mh)

	// Provide the content to the DHT
	if err := rm.dht.Provide(ctx, contentID, true); err != nil {
		rm.metrics.mu.Lock()
		rm.metrics.FailedReplications++
		rm.metrics.mu.Unlock()
		return fmt.Errorf("failed to provide content %s: %w", key, err)
	}

	// Update local records
	rm.keysMutex.Lock()
	if record, exists := rm.contentKeys[key]; exists {
		record.LastProvided = time.Now()
		record.ReplicationCount++
	}
	rm.keysMutex.Unlock()

	rm.metrics.mu.Lock()
	rm.metrics.SuccessfulReplications++
	rm.metrics.mu.Unlock()

	rm.logger("Successfully provided content: %s", key)
	return nil
}

// updateProviderCache updates the provider cache for a key.
func (rm *ReplicationManager) updateProviderCache(key string, providers []ProviderInfo) {
	rm.providersMutex.Lock()
	defer rm.providersMutex.Unlock()

	record := &ProviderRecord{
		Key:        key,
		Providers:  providers,
		LastUpdate: time.Now(),
		TTL:        rm.config.ProviderTTL,
	}

	// Limit the number of providers
	if len(record.Providers) > rm.config.MaxProvidersPerKey {
		record.Providers = record.Providers[:rm.config.MaxProvidersPerKey]
	}

	rm.providers[key] = record
}

// startBackgroundTasks starts periodic maintenance tasks.
func (rm *ReplicationManager) startBackgroundTasks() {
	// Reprovide task
	if rm.config.EnableReprovide {
		rm.reprovideTimer = time.AfterFunc(rm.config.ReprovideInterval, func() {
			rm.performReprovide()
			// Reschedule unless the manager has been stopped
			if rm.ctx.Err() == nil {
				rm.reprovideTimer.Reset(rm.config.ReprovideInterval)
			}
		})
	}

	// Cleanup task
	rm.cleanupTimer = time.AfterFunc(rm.config.CleanupInterval, func() {
		rm.performCleanup()
		// Reschedule unless the manager has been stopped
		if rm.ctx.Err() == nil {
			rm.cleanupTimer.Reset(rm.config.CleanupInterval)
		}
	})
}

// performReprovide re-provides all local content.
func (rm *ReplicationManager) performReprovide() {
	rm.logger("Starting reprovide operation")
	start := time.Now()

	rm.keysMutex.RLock()
	keys := make([]string, 0, len(rm.contentKeys))
	for key := range rm.contentKeys {
		keys = append(keys, key)
	}
	rm.keysMutex.RUnlock()

	// Provide all keys with a concurrency limit
	semaphore := make(chan struct{}, rm.config.MaxConcurrentReplications)
	var wg sync.WaitGroup
	var countersMu sync.Mutex // guards the counters below across worker goroutines
	var successful, failed int64

	for _, key := range keys {
		wg.Add(1)
		go func(k string) {
			defer wg.Done()

			semaphore <- struct{}{}        // Acquire
			defer func() { <-semaphore }() // Release

			if err := rm.provideContent(k); err != nil {
				rm.logger("Failed to reprovide %s: %v", k, err)
				countersMu.Lock()
				failed++
				countersMu.Unlock()
			} else {
				countersMu.Lock()
				successful++
				countersMu.Unlock()
			}
		}(key)
	}

	wg.Wait()
	rm.metrics.mu.Lock()
	rm.metrics.ReprovideOperations++
	rm.metrics.LastReprovideTime = time.Now()
	rm.metrics.mu.Unlock()

	duration := time.Since(start)
	rm.logger("Reprovide operation completed: %d successful, %d failed, took %v",
		successful, failed, duration)
}

// performCleanup removes stale provider records.
func (rm *ReplicationManager) performCleanup() {
	rm.logger("Starting cleanup operation")

	rm.providersMutex.Lock()
	defer rm.providersMutex.Unlock()

	cutoff := time.Now().Add(-rm.config.ProviderTTL)
	removed := 0

	for key, record := range rm.providers {
		if record.LastUpdate.Before(cutoff) {
			delete(rm.providers, key)
			removed++
		} else {
			// Clean up individual providers within the record
			validProviders := make([]ProviderInfo, 0, len(record.Providers))
			for _, provider := range record.Providers {
				if provider.LastSeen.After(cutoff) {
					validProviders = append(validProviders, provider)
				}
			}
			record.Providers = validProviders
		}
	}

	rm.metrics.mu.Lock()
	rm.metrics.LastCleanupTime = time.Now()
	rm.metrics.mu.Unlock()

	rm.logger("Cleanup operation completed: removed %d stale records", removed)
}

// updateMetrics recalculates the aggregate metrics. It takes the key and
// provider locks itself, so callers must not hold them.
func (rm *ReplicationManager) updateMetrics() {
	rm.keysMutex.RLock()
	totalKeys := int64(len(rm.contentKeys))
	totalReplications := int64(0)
	for _, content := range rm.contentKeys {
		totalReplications += int64(content.ReplicationCount)
	}
	rm.keysMutex.RUnlock()

	rm.providersMutex.RLock()
	totalProviders := int64(0)
	for _, record := range rm.providers {
		totalProviders += int64(len(record.Providers))
	}
	rm.providersMutex.RUnlock()

	rm.metrics.mu.Lock()
	defer rm.metrics.mu.Unlock()

	rm.metrics.TotalKeys = totalKeys
	rm.metrics.TotalProviders = totalProviders
	if totalKeys > 0 {
		rm.metrics.AverageReplication = float64(totalReplications) / float64(totalKeys)
	}
}

// Stop stops the replication manager.
func (rm *ReplicationManager) Stop() error {
	rm.cancel()

	if rm.reprovideTimer != nil {
		rm.reprovideTimer.Stop()
	}
	if rm.cleanupTimer != nil {
		rm.cleanupTimer.Stop()
	}

	rm.logger("Replication manager stopped")
	return nil
}

// ReplicationStatus holds the replication status of a specific key.
type ReplicationStatus struct {
	Key              string
	TargetReplicas   int
	ActualReplicas   int
	HealthyProviders int
	LastReprovided   time.Time
	CreatedAt        time.Time
	Size             int64
	Priority         int
	Health           string // "healthy", "degraded", "critical"
	IsLocal          bool
	Providers        []ProviderInfo
}

// calculateDistance calculates an approximate XOR distance between a key hash
// and a peer ID by folding the byte-wise XOR into a uint32. This is a rough
// ordering heuristic, not the full 256-bit Kademlia distance.
func calculateDistance(key []byte, peerID peer.ID) uint32 {
	peerBytes := []byte(peerID)

	var distance uint32
	minLen := len(key)
	if len(peerBytes) < minLen {
		minLen = len(peerBytes)
	}

	for i := 0; i < minLen; i++ {
		distance ^= uint32(key[i] ^ peerBytes[i])
	}

	return distance
}
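
// Example usage (illustrative sketch only, not part of this package's API):
// wiring the manager to a libp2p Kademlia DHT. The constructors below come
// from github.com/libp2p/go-libp2p and github.com/libp2p/go-libp2p-kad-dht
// (imported here as kaddht to avoid clashing with this package's name); the
// content key and sizes are placeholders — adapt them to your own node setup.
//
//	ctx := context.Background()
//	host, _ := libp2p.New()
//	kadDHT, _ := kaddht.New(ctx, host)
//
//	rm := NewReplicationManager(ctx, kadDHT, DefaultReplicationConfig())
//	defer rm.Stop()
//
//	// Register a content key; with EnableAutoReplication it is provided immediately.
//	_ = rm.AddContent("example-content-key", 4096, 1)
//
//	// Later, inspect how well the key is replicated.
//	status, _ := rm.GetReplicationStatus("example-content-key")
//	fmt.Printf("replicas: %d/%d (%s)\n", status.ActualReplicas, status.TargetReplicas, status.Health)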