package dht

import (
	"context"
	"fmt"
	"sync"
	"time"

	"chorus.services/bzzz/pkg/config"
)

// Backend identifiers. They are used for routing, as keys of the health map
// returned by GetBackendHealth, and as the argument accepted by SwitchBackend.
const (
	backendMock = "mock"
	backendReal = "real"
)

// HybridDHT provides a switchable interface between mock and real DHT
// implementations. Operations are routed to the current backend; when the
// real backend fails and fallback is enabled in the config, the operation is
// retried on the mock backend and subsequent traffic is routed to mock.
type HybridDHT struct {
	mockDHT DHT
	realDHT DHT // nil when the real DHT is disabled or failed to initialize
	config  *config.HybridConfig

	// State management (guarded by mu).
	currentBackend string
	fallbackActive bool
	healthStatus   map[string]*BackendHealth

	// Synchronization
	mu sync.RWMutex

	// Monitoring
	metrics *HybridMetrics
	logger  Logger

	// Lifecycle of the background monitoring goroutines started by
	// NewHybridDHT. stopCh is closed (once) by Close; wg tracks the goroutines.
	stopCh   chan struct{}
	stopOnce sync.Once
	wg       sync.WaitGroup
}

// BackendHealth tracks health status of DHT backends.
type BackendHealth struct {
	Backend     string        `json:"backend"`
	Status      HealthStatus  `json:"status"`
	LastCheck   time.Time     `json:"last_check"`
	ErrorCount  int           `json:"error_count"`
	Latency     time.Duration `json:"latency"`
	Consecutive int           `json:"consecutive_failures"`
}

// markFailure records one failure observed at now: it bumps the total and
// consecutive failure counters and downgrades Status to degraded, or to
// failed once three consecutive failures have accumulated.
func (b *BackendHealth) markFailure(now time.Time) {
	const failedAfter = 3

	b.ErrorCount++
	b.Consecutive++
	b.LastCheck = now
	if b.Consecutive >= failedAfter {
		b.Status = HealthStatusFailed
	} else {
		b.Status = HealthStatusDegraded
	}
}

// HealthStatus is the coarse health classification of a backend.
type HealthStatus string

// Health states assigned to a backend by operation results and health checks.
const (
	HealthStatusHealthy  HealthStatus = "healthy"
	HealthStatusDegraded HealthStatus = "degraded"
	HealthStatusFailed   HealthStatus = "failed"
)

// HybridMetrics tracks hybrid DHT performance and behavior.
//
// The struct embeds a mutex and therefore must not be copied by value; use
// GetHybridMetrics to obtain a snapshot.
type HybridMetrics struct {
	mu sync.RWMutex

	MockRequests   uint64 `json:"mock_requests"`
	RealRequests   uint64 `json:"real_requests"`
	FallbackEvents uint64 `json:"fallback_events"`
	RecoveryEvents uint64 `json:"recovery_events"`

	MockLatency time.Duration `json:"mock_latency_avg"`
	RealLatency time.Duration `json:"real_latency_avg"`

	MockErrorRate float64 `json:"mock_error_rate"`
	RealErrorRate float64 `json:"real_error_rate"`

	TotalOperations  uint64    `json:"total_operations"`
	LastMetricUpdate time.Time `json:"last_update"`
}

// Logger interface for structured logging. fields are passed through as
// alternating key/value pairs by the callers in this file.
type Logger interface {
	Info(msg string, fields ...interface{})
	Warn(msg string, fields ...interface{})
	Error(msg string, fields ...interface{})
	Debug(msg string, fields ...interface{})
}

// NewHybridDHT creates a new hybrid DHT instance.
//
// The mock backend is always created. The real backend is created only when
// config.IsRealDHTEnabled() reports true; if its initialization fails, the
// instance starts on the mock backend with fallback marked active. Two
// background goroutines (health checks and metrics logging) are started and
// run until Close is called.
//
// It returns an error when config or logger is nil.
func NewHybridDHT(config *config.HybridConfig, logger Logger) (*HybridDHT, error) {
	if config == nil {
		return nil, fmt.Errorf("hybrid DHT config is nil")
	}
	if logger == nil {
		return nil, fmt.Errorf("hybrid DHT logger is nil")
	}

	hybrid := &HybridDHT{
		mockDHT:        NewMockDHT(), // mock is always available
		config:         config,
		logger:         logger,
		currentBackend: backendMock,
		healthStatus: map[string]*BackendHealth{
			backendMock: {
				Backend:   backendMock,
				Status:    HealthStatusHealthy,
				LastCheck: time.Now(),
			},
		},
		metrics: &HybridMetrics{},
		stopCh:  make(chan struct{}),
	}

	// Initialize real DHT if enabled.
	if config.IsRealDHTEnabled() {
		realDHT, err := NewRealDHT(config)
		if err != nil {
			logger.Warn("Failed to initialize real DHT, falling back to mock", "error", err)
			hybrid.fallbackActive = true
		} else {
			hybrid.realDHT = realDHT
			hybrid.currentBackend = backendReal
			hybrid.healthStatus[backendReal] = &BackendHealth{
				Backend:   backendReal,
				Status:    HealthStatusHealthy,
				LastCheck: time.Now(),
			}
		}
	}

	// Start the monitors. They are tracked by wg so Close can wait for them.
	hybrid.wg.Add(2)
	go func() {
		defer hybrid.wg.Done()
		hybrid.startHealthMonitoring()
	}()
	go func() {
		defer hybrid.wg.Done()
		hybrid.startMetricsCollection()
	}()

	logger.Info("Hybrid DHT initialized",
		"backend", hybrid.currentBackend,
		"fallback_enabled", config.IsFallbackEnabled())

	return hybrid, nil
}

// PutValue stores a key-value pair using the current backend, falling back to
// the mock backend as described on dispatch.
func (h *HybridDHT) PutValue(ctx context.Context, key string, value []byte) error {
	return h.dispatch("PutValue", key, func(d DHT) error {
		return d.PutValue(ctx, key, value)
	})
}

// GetValue retrieves a value by key using the current backend, falling back
// to the mock backend as described on dispatch. The returned slice is nil
// whenever the error is non-nil.
func (h *HybridDHT) GetValue(ctx context.Context, key string) ([]byte, error) {
	var value []byte
	err := h.dispatch("GetValue", key, func(d DHT) error {
		var callErr error
		value, callErr = d.GetValue(ctx, key)
		return callErr
	})
	if err != nil {
		return nil, err
	}
	return value, nil
}

// Provide announces that this node provides a value for the given key, using
// the current backend and falling back to mock as described on dispatch.
func (h *HybridDHT) Provide(ctx context.Context, key, providerId string) error {
	return h.dispatch("Provide", key, func(d DHT) error {
		return d.Provide(ctx, key, providerId)
	})
}

// FindProviders finds providers for the given key using the current backend,
// falling back to mock as described on dispatch. The returned slice is nil
// whenever the error is non-nil.
func (h *HybridDHT) FindProviders(ctx context.Context, key string) ([]string, error) {
	var providers []string
	err := h.dispatch("FindProviders", key, func(d DHT) error {
		var callErr error
		providers, callErr = d.FindProviders(ctx, key)
		return callErr
	})
	if err != nil {
		return nil, err
	}
	return providers, nil
}

// dispatch runs call against the current backend and performs the shared
// bookkeeping for every DHT operation.
//
// op is the operation name used in the fallback log message and key is the
// DHT key being operated on. call is invoked with the selected backend and,
// if the real backend fails while fallback is enabled, a second time with the
// mock backend. When that retry succeeds the instance switches to mock and
// dispatch returns nil; when it also fails, the returned error wraps the real
// backend's error and includes the mock error text.
func (h *HybridDHT) dispatch(op, key string, call func(d DHT) error) error {
	start := time.Now()
	backend := h.getCurrentBackend()

	var err error
	switch backend {
	case backendMock:
		err = call(h.mockDHT)
		h.updateMetrics(backendMock, start, err)

	case backendReal:
		if h.realDHT == nil {
			return fmt.Errorf("real DHT not available")
		}
		err = call(h.realDHT)
		h.updateMetrics(backendReal, start, err)

		// Handle fallback on error.
		if err != nil && h.config.IsFallbackEnabled() {
			h.logger.Warn("Real DHT "+op+" failed, trying fallback", "key", key, "error", err)
			h.recordBackendError(backendReal)

			// Time the mock attempt on its own clock so the failed real call
			// does not inflate the mock latency average.
			fallbackStart := time.Now()
			fallbackErr := call(h.mockDHT)
			h.updateMetrics(backendMock, fallbackStart, fallbackErr)

			if fallbackErr == nil {
				h.triggerFallback(backendReal, backendMock)
				return nil
			}
			return fmt.Errorf("both real and mock DHT failed: real=%w, mock=%v", err, fallbackErr)
		}

	default:
		// Not reachable through NewHybridDHT/SwitchBackend; reported instead
		// of silently succeeding without touching any backend.
		return fmt.Errorf("unknown backend: %s", backend)
	}

	if err != nil {
		h.recordBackendError(backend)
	} else {
		h.recordBackendSuccess(backend)
	}
	return err
}

// GetStats returns statistics from the current backend. The mock backend's
// statistics are returned when the real backend is selected but unavailable.
func (h *HybridDHT) GetStats() DHTStats {
	if h.getCurrentBackend() == backendReal && h.realDHT != nil {
		return h.realDHT.GetStats()
	}
	return h.mockDHT.GetStats()
}

// GetHybridMetrics returns a snapshot of the hybrid-specific metrics. The
// snapshot is an independent value: later updates do not affect it.
func (h *HybridDHT) GetHybridMetrics() *HybridMetrics {
	m := h.metrics
	m.mu.RLock()
	defer m.mu.RUnlock()

	// Copy field by field: copying the struct wholesale would also copy the
	// (currently read-locked) mutex.
	return &HybridMetrics{
		MockRequests:     m.MockRequests,
		RealRequests:     m.RealRequests,
		FallbackEvents:   m.FallbackEvents,
		RecoveryEvents:   m.RecoveryEvents,
		MockLatency:      m.MockLatency,
		RealLatency:      m.RealLatency,
		MockErrorRate:    m.MockErrorRate,
		RealErrorRate:    m.RealErrorRate,
		TotalOperations:  m.TotalOperations,
		LastMetricUpdate: m.LastMetricUpdate,
	}
}

// GetBackendHealth returns health status for all backends, keyed by backend
// name. The result is a deep copy and may be modified by the caller.
func (h *HybridDHT) GetBackendHealth() map[string]*BackendHealth {
	h.mu.RLock()
	defer h.mu.RUnlock()

	health := make(map[string]*BackendHealth, len(h.healthStatus))
	for name, entry := range h.healthStatus {
		entryCopy := *entry
		health[name] = &entryCopy
	}
	return health
}

// SwitchBackend manually switches to a specific backend ("mock" or "real").
//
// Switching to "real" clears the fallback flag; switching to "mock" leaves it
// unchanged. An error is returned for an unknown name or when the requested
// backend is not available.
func (h *HybridDHT) SwitchBackend(backend string) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	switch backend {
	case backendMock:
		if h.mockDHT == nil {
			return fmt.Errorf("mock DHT not available")
		}
		h.currentBackend = backendMock
		h.logger.Info("Manually switched to mock DHT")

	case backendReal:
		if h.realDHT == nil {
			return fmt.Errorf("real DHT not available")
		}
		h.currentBackend = backendReal
		h.fallbackActive = false
		h.logger.Info("Manually switched to real DHT")

	default:
		return fmt.Errorf("unknown backend: %s", backend)
	}

	return nil
}

// Close shuts down the hybrid DHT.
//
// It first stops the background monitors and waits for them to return (an
// in-flight health probe is bounded by its own timeout), so no probe runs
// against a backend that has already been closed. It then closes each backend
// that implements Close() error and reports all close failures in one error.
func (h *HybridDHT) Close() error {
	h.logger.Info("Shutting down hybrid DHT")

	h.stopOnce.Do(func() {
		// stopCh is nil only for instances not built by NewHybridDHT.
		if h.stopCh != nil {
			close(h.stopCh)
		}
	})
	h.wg.Wait()

	var errs []error

	if h.realDHT != nil {
		if closer, ok := h.realDHT.(interface{ Close() error }); ok {
			if err := closer.Close(); err != nil {
				errs = append(errs, fmt.Errorf("real DHT close error: %w", err))
			}
		}
	}

	if h.mockDHT != nil {
		if closer, ok := h.mockDHT.(interface{ Close() error }); ok {
			if err := closer.Close(); err != nil {
				errs = append(errs, fmt.Errorf("mock DHT close error: %w", err))
			}
		}
	}

	if len(errs) > 0 {
		return fmt.Errorf("errors during close: %v", errs)
	}
	return nil
}

// Private methods

// getCurrentBackend returns the name of the backend operations are routed to.
func (h *HybridDHT) getCurrentBackend() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.currentBackend
}

// triggerFallback switches routing to backend to and marks fallback active.
// It is a no-op when to is already the current backend, so concurrent
// failures count as a single fallback event.
func (h *HybridDHT) triggerFallback(from, to string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	if h.currentBackend == to {
		return
	}

	h.currentBackend = to
	h.fallbackActive = true

	h.metrics.mu.Lock()
	h.metrics.FallbackEvents++
	h.metrics.mu.Unlock()

	h.logger.Warn("Fallback triggered", "from", from, "to", to)
}

// recordBackendError records a failed operation against backend's health
// entry. Backends without a health entry are ignored.
func (h *HybridDHT) recordBackendError(backend string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	if health, exists := h.healthStatus[backend]; exists {
		health.markFailure(time.Now())
	}
}

// recordBackendSuccess records a successful operation against backend's
// health entry and, if applicable, recovers from fallback (see
// markSuccessLocked). Backends without a health entry are ignored.
func (h *HybridDHT) recordBackendSuccess(backend string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	if health, exists := h.healthStatus[backend]; exists {
		h.markSuccessLocked(backend, health)
	}
}

// markSuccessLocked resets health to the healthy state and, when the real
// backend succeeded while fallback is active and the real DHT is enabled,
// routes traffic back to the real backend. The caller must hold h.mu.
func (h *HybridDHT) markSuccessLocked(backend string, health *BackendHealth) {
	health.Consecutive = 0 // reset consecutive failures
	health.LastCheck = time.Now()
	health.Status = HealthStatusHealthy

	recoverable := h.fallbackActive &&
		backend == backendReal &&
		h.realDHT != nil &&
		h.config.IsRealDHTEnabled()
	if !recoverable {
		return
	}

	h.currentBackend = backendReal
	h.fallbackActive = false

	h.metrics.mu.Lock()
	h.metrics.RecoveryEvents++
	h.metrics.mu.Unlock()

	h.logger.Info("Recovery triggered, switched back to real DHT")
}

// updateMetrics folds one completed call into the counters for backend.
// start is when the call began; err is its result. Backend names other than
// "mock" and "real" only affect TotalOperations and LastMetricUpdate.
func (h *HybridDHT) updateMetrics(backend string, start time.Time, err error) {
	m := h.metrics
	m.mu.Lock()
	defer m.mu.Unlock()

	latency := time.Since(start)
	failed := err != nil

	m.TotalOperations++
	m.LastMetricUpdate = time.Now()

	switch backend {
	case backendMock:
		m.MockRequests++
		m.MockLatency = h.updateAverageLatency(m.MockLatency, latency, m.MockRequests)
		m.MockErrorRate = h.updateErrorRate(m.MockErrorRate, failed, m.MockRequests)
	case backendReal:
		m.RealRequests++
		m.RealLatency = h.updateAverageLatency(m.RealLatency, latency, m.RealRequests)
		m.RealErrorRate = h.updateErrorRate(m.RealErrorRate, failed, m.RealRequests)
	}
}

// updateAverageLatency returns the running mean of latencies after adding
// newLatency as the count-th sample, where currentAvg is the mean of the
// previous count-1 samples.
func (h *HybridDHT) updateAverageLatency(currentAvg, newLatency time.Duration, count uint64) time.Duration {
	if count <= 1 {
		return newLatency
	}

	// Incremental (cumulative) mean: the new sample carries weight 1/count.
	weight := 1.0 / float64(count)
	return time.Duration(float64(currentAvg)*(1-weight) + float64(newLatency)*weight)
}

// updateErrorRate returns the running fraction of failed calls after adding
// the count-th sample, where currentRate is the fraction over the previous
// count-1 samples.
func (h *HybridDHT) updateErrorRate(currentRate float64, isError bool, count uint64) float64 {
	errorValue := 0.0
	if isError {
		errorValue = 1.0
	}
	if count <= 1 {
		return errorValue
	}

	// Incremental (cumulative) mean: the new sample carries weight 1/count.
	weight := 1.0 / float64(count)
	return currentRate*(1-weight) + errorValue*weight
}

// startHealthMonitoring runs performHealthChecks every
// config.DHT.HealthCheckInterval until Close is called. It blocks and is
// meant to run in its own goroutine.
func (h *HybridDHT) startHealthMonitoring() {
	h.runEvery("health monitoring", h.config.DHT.HealthCheckInterval, h.performHealthChecks)
}

// startMetricsCollection runs collectAndLogMetrics every
// config.Monitoring.MetricsInterval until Close is called. It blocks and is
// meant to run in its own goroutine.
func (h *HybridDHT) startMetricsCollection() {
	h.runEvery("metrics collection", h.config.Monitoring.MetricsInterval, h.collectAndLogMetrics)
}

// runEvery calls task on every tick of interval until stopCh is closed.
// A non-positive interval disables the task with a warning, because
// time.NewTicker panics on such values.
func (h *HybridDHT) runEvery(name string, interval time.Duration, task func()) {
	if interval <= 0 {
		h.logger.Warn("Hybrid DHT periodic task disabled: non-positive interval",
			"task", name,
			"interval", interval)
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-h.stopCh: // a nil stopCh never fires, so the loop just ticks
			return
		case <-ticker.C:
			task()
		}
	}
}

// performHealthChecks probes the backends once and updates their health
// entries. The real backend is probed with a GetValue call bounded by a
// timeout; a successful probe goes through the same path as a successful
// operation and can therefore end an active fallback. The mock backend is
// always marked healthy.
func (h *HybridDHT) performHealthChecks() {
	const (
		probeKey     = "health-check-key"
		probeTimeout = 5 * time.Second
	)

	// Health check for real DHT.
	if h.realDHT != nil {
		ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
		start := time.Now()
		_, err := h.realDHT.GetValue(ctx, probeKey)
		latency := time.Since(start)
		cancel()

		h.mu.Lock()
		if health, exists := h.healthStatus[backendReal]; exists {
			health.Latency = latency
			if err != nil {
				health.markFailure(time.Now())
			} else {
				h.markSuccessLocked(backendReal, health)
			}
		}
		h.mu.Unlock()
	}

	// Health check for mock DHT (should always be healthy).
	h.mu.Lock()
	if health, exists := h.healthStatus[backendMock]; exists {
		health.LastCheck = time.Now()
		health.Status = HealthStatusHealthy
		health.Latency = 1 * time.Millisecond // mock is always fast
	}
	h.mu.Unlock()
}

// collectAndLogMetrics logs a snapshot of the routing state and metrics at
// Info level, followed by one Debug line per backend health entry.
func (h *HybridDHT) collectAndLogMetrics() {
	metrics := h.GetHybridMetrics()
	health := h.GetBackendHealth()

	// Read both routing fields under one lock so they are consistent with
	// each other and the read does not race with fallback/recovery.
	h.mu.RLock()
	backend, fallback := h.currentBackend, h.fallbackActive
	h.mu.RUnlock()

	h.logger.Info("Hybrid DHT metrics",
		"current_backend", backend,
		"fallback_active", fallback,
		"mock_requests", metrics.MockRequests,
		"real_requests", metrics.RealRequests,
		"fallback_events", metrics.FallbackEvents,
		"recovery_events", metrics.RecoveryEvents,
		"mock_latency_ms", metrics.MockLatency.Milliseconds(),
		"real_latency_ms", metrics.RealLatency.Milliseconds(),
		"mock_error_rate", metrics.MockErrorRate,
		"real_error_rate", metrics.RealErrorRate,
		"total_operations", metrics.TotalOperations)

	// Log health status.
	for name, entry := range health {
		h.logger.Debug("Backend health",
			"backend", name,
			"status", entry.Status,
			"error_count", entry.ErrorCount,
			"consecutive_failures", entry.Consecutive,
			"latency_ms", entry.Latency.Milliseconds())
	}
}