package storage import ( "context" "encoding/json" "fmt" "regexp" "sync" "time" "github.com/go-redis/redis/v8" ) // CacheManagerImpl implements the CacheManager interface using Redis type CacheManagerImpl struct { mu sync.RWMutex client *redis.Client stats *CacheStatistics policy *CachePolicy prefix string nodeID string warmupKeys map[string]bool } // NewCacheManager creates a new cache manager with Redis backend func NewCacheManager(redisAddr, nodeID string, policy *CachePolicy) (*CacheManagerImpl, error) { if policy == nil { policy = DefaultCachePolicy() } // Create Redis client client := redis.NewClient(&redis.Options{ Addr: redisAddr, Password: "", // No password for local Redis DB: 0, // Default DB DialTimeout: 10 * time.Second, ReadTimeout: 5 * time.Second, WriteTimeout: 5 * time.Second, PoolSize: 10, MinIdleConns: 5, }) // Test connection ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := client.Ping(ctx).Err(); err != nil { return nil, fmt.Errorf("failed to connect to Redis: %w", err) } cm := &CacheManagerImpl{ client: client, policy: policy, prefix: fmt.Sprintf("slurp:%s", nodeID), nodeID: nodeID, warmupKeys: make(map[string]bool), stats: &CacheStatistics{ MaxSize: policy.MaxSize, }, } // Start background maintenance if needed go cm.maintenanceLoop() return cm, nil } // DefaultCachePolicy returns default caching policy func DefaultCachePolicy() *CachePolicy { return &CachePolicy{ TTL: 24 * time.Hour, MaxSize: 1024 * 1024 * 1024, // 1GB EvictionPolicy: "LRU", RefreshThreshold: 0.8, // Refresh when 80% of TTL elapsed WarmupEnabled: true, CompressEntries: true, MaxEntrySize: 10 * 1024 * 1024, // 10MB } } // Get retrieves data from cache func (cm *CacheManagerImpl) Get( ctx context.Context, key string, ) (interface{}, bool, error) { start := time.Now() defer func() { cm.updateAccessStats(time.Since(start)) }() cacheKey := cm.buildCacheKey(key) // Get from Redis result, err := cm.client.Get(ctx, cacheKey).Result() if err != nil { if err == redis.Nil { // Cache miss cm.recordMiss() return nil, false, nil } return nil, false, fmt.Errorf("cache get error: %w", err) } // Deserialize cached entry var entry CacheEntry if err := json.Unmarshal([]byte(result), &entry); err != nil { return nil, false, fmt.Errorf("cache entry deserialization error: %w", err) } // Check if entry has expired (additional check beyond Redis TTL) if time.Now().After(entry.ExpiresAt) { // Entry expired, delete it go func() { delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() cm.client.Del(delCtx, cacheKey) }() cm.recordMiss() return nil, false, nil } // Update access statistics go func() { updCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() cm.updateEntryAccess(updCtx, cacheKey) }() // Deserialize the actual data var data interface{} if err := json.Unmarshal(entry.Data, &data); err != nil { return nil, false, fmt.Errorf("data deserialization error: %w", err) } cm.recordHit() return data, true, nil } // Set stores data in cache with TTL func (cm *CacheManagerImpl) Set( ctx context.Context, key string, data interface{}, ttl time.Duration, ) error { start := time.Now() defer func() { cm.updateAccessStats(time.Since(start)) }() // Serialize the data dataBytes, err := json.Marshal(data) if err != nil { return fmt.Errorf("data serialization error: %w", err) } // Check size limits if len(dataBytes) > int(cm.policy.MaxEntrySize) { return fmt.Errorf("data too large: %d bytes exceeds limit of %d", len(dataBytes), cm.policy.MaxEntrySize) } // Create cache entry entry := CacheEntry{ Key: key, Data: dataBytes, CreatedAt: time.Now(), ExpiresAt: time.Now().Add(ttl), TTL: ttl, AccessCount: 1, NodeID: cm.nodeID, } // Apply compression if enabled and beneficial if cm.policy.CompressEntries && len(dataBytes) > 1024 { compressed, err := cm.compress(dataBytes) if err == nil && len(compressed) < len(dataBytes) { entry.Data = compressed entry.Compressed = true entry.OriginalSize = int64(len(dataBytes)) entry.CompressedSize = int64(len(compressed)) } } // Serialize cache entry entryBytes, err := json.Marshal(entry) if err != nil { return fmt.Errorf("cache entry serialization error: %w", err) } cacheKey := cm.buildCacheKey(key) // Store in Redis with TTL if err := cm.client.Set(ctx, cacheKey, entryBytes, ttl).Err(); err != nil { return fmt.Errorf("cache set error: %w", err) } // Update statistics cm.updateCacheSize(int64(len(entryBytes))) return nil } // Delete removes data from cache func (cm *CacheManagerImpl) Delete(ctx context.Context, key string) error { cacheKey := cm.buildCacheKey(key) if err := cm.client.Del(ctx, cacheKey).Err(); err != nil { return fmt.Errorf("cache delete error: %w", err) } return nil } // DeletePattern removes cache entries matching pattern func (cm *CacheManagerImpl) DeletePattern(ctx context.Context, pattern string) error { // Build full pattern with prefix fullPattern := cm.buildCacheKey(pattern) // Use Redis SCAN to find matching keys var cursor uint64 var keys []string for { result, nextCursor, err := cm.client.Scan(ctx, cursor, fullPattern, 100).Result() if err != nil { return fmt.Errorf("cache scan error: %w", err) } keys = append(keys, result...) cursor = nextCursor if cursor == 0 { break } } // Delete found keys in batches if len(keys) > 0 { pipeline := cm.client.Pipeline() for _, key := range keys { pipeline.Del(ctx, key) } if _, err := pipeline.Exec(ctx); err != nil { return fmt.Errorf("cache batch delete error: %w", err) } } return nil } // Clear clears all cache entries func (cm *CacheManagerImpl) Clear(ctx context.Context) error { // Use pattern to delete all entries with our prefix return cm.DeletePattern(ctx, "*") } // Warm pre-loads cache with frequently accessed data func (cm *CacheManagerImpl) Warm(ctx context.Context, keys []string) error { if !cm.policy.WarmupEnabled { return nil } cm.mu.Lock() for _, key := range keys { cm.warmupKeys[key] = true } cm.mu.Unlock() // Warmup process would typically be implemented by the caller // who has access to the actual data sources // Here we just mark keys as warmup candidates return nil } // GetCacheStats returns cache performance statistics func (cm *CacheManagerImpl) GetCacheStats() (*CacheStatistics, error) { cm.mu.RLock() defer cm.mu.RUnlock() // Update Redis memory usage ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() info, err := cm.client.Info(ctx, "memory").Result() if err == nil { // Parse memory info to get actual usage // This is a simplified implementation cm.stats.MemoryUsage = cm.parseMemoryUsage(info) } // Calculate hit rate if cm.stats.TotalHits+cm.stats.TotalMisses > 0 { cm.stats.HitRate = float64(cm.stats.TotalHits) / float64(cm.stats.TotalHits+cm.stats.TotalMisses) cm.stats.MissRate = 1.0 - cm.stats.HitRate } // Return a copy statsCopy := *cm.stats return &statsCopy, nil } // SetCachePolicy sets caching policy func (cm *CacheManagerImpl) SetCachePolicy(policy *CachePolicy) error { cm.mu.Lock() defer cm.mu.Unlock() cm.policy = policy cm.stats.MaxSize = policy.MaxSize return nil } // CacheEntry represents a cached data entry with metadata type CacheEntry struct { Key string `json:"key"` Data []byte `json:"data"` CreatedAt time.Time `json:"created_at"` ExpiresAt time.Time `json:"expires_at"` TTL time.Duration `json:"ttl"` AccessCount int64 `json:"access_count"` LastAccessedAt time.Time `json:"last_accessed_at"` Compressed bool `json:"compressed"` OriginalSize int64 `json:"original_size"` CompressedSize int64 `json:"compressed_size"` NodeID string `json:"node_id"` } // Helper methods func (cm *CacheManagerImpl) buildCacheKey(key string) string { return fmt.Sprintf("%s:%s", cm.prefix, key) } func (cm *CacheManagerImpl) compress(data []byte) ([]byte, error) { // Implement compression (gzip, lz4, etc.) // For now, return as-is return data, nil } func (cm *CacheManagerImpl) decompress(data []byte) ([]byte, error) { // Implement decompression // For now, return as-is return data, nil } func (cm *CacheManagerImpl) recordHit() { cm.mu.Lock() defer cm.mu.Unlock() cm.stats.TotalHits++ cm.stats.LastEviction = time.Now() // Update last activity time } func (cm *CacheManagerImpl) recordMiss() { cm.mu.Lock() defer cm.mu.Unlock() cm.stats.TotalMisses++ } func (cm *CacheManagerImpl) updateAccessStats(duration time.Duration) { cm.mu.Lock() defer cm.mu.Unlock() if cm.stats.AverageLoadTime == 0 { cm.stats.AverageLoadTime = duration } else { // Exponential moving average cm.stats.AverageLoadTime = time.Duration( float64(cm.stats.AverageLoadTime)*0.8 + float64(duration)*0.2, ) } } func (cm *CacheManagerImpl) updateCacheSize(sizeChange int64) { cm.mu.Lock() defer cm.mu.Unlock() cm.stats.CurrentSize += sizeChange } func (cm *CacheManagerImpl) updateEntryAccess(ctx context.Context, cacheKey string) { // Get current entry result, err := cm.client.Get(ctx, cacheKey).Result() if err != nil { return // Entry may have been evicted } var entry CacheEntry if err := json.Unmarshal([]byte(result), &entry); err != nil { return } // Update access statistics entry.AccessCount++ entry.LastAccessedAt = time.Now() // Update entry in cache entryBytes, err := json.Marshal(entry) if err != nil { return } // Update with remaining TTL remaining := cm.client.TTL(ctx, cacheKey).Val() cm.client.Set(ctx, cacheKey, entryBytes, remaining) } func (cm *CacheManagerImpl) parseMemoryUsage(memInfo string) int64 { // Parse Redis memory info to extract usage // This is a simplified implementation // In production, you'd parse the actual INFO memory output return 0 // Placeholder } func (cm *CacheManagerImpl) maintenanceLoop() { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for range ticker.C { cm.performMaintenance() } } func (cm *CacheManagerImpl) performMaintenance() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // Check for expired entries that Redis might have missed // This is additional cleanup beyond Redis's native expiration cm.cleanupExpiredEntries(ctx) // Update memory statistics cm.updateMemoryStats(ctx) } func (cm *CacheManagerImpl) cleanupExpiredEntries(ctx context.Context) { // Scan for entries that might be expired var cursor uint64 pattern := cm.buildCacheKey("*") for { keys, nextCursor, err := cm.client.Scan(ctx, cursor, pattern, 100).Result() if err != nil { break } // Check each key for expiration for _, key := range keys { // Check TTL ttl := cm.client.TTL(ctx, key).Val() if ttl < 0 { // Key has no TTL or is expired cm.client.Del(ctx, key) cm.mu.Lock() cm.stats.EvictionCount++ cm.mu.Unlock() } } cursor = nextCursor if cursor == 0 { break } } } func (cm *CacheManagerImpl) updateMemoryStats(ctx context.Context) { // Get Redis memory statistics info, err := cm.client.Info(ctx, "memory").Result() if err != nil { return } cm.mu.Lock() defer cm.mu.Unlock() cm.stats.MemoryUsage = cm.parseMemoryUsage(info) } // Close closes the cache manager and cleans up resources func (cm *CacheManagerImpl) Close() error { return cm.client.Close() }