482 lines
12 KiB
Go
482 lines
12 KiB
Go
package storage
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
)
|
|
|
|
// CacheManagerImpl implements the CacheManager interface using Redis
|
|
type CacheManagerImpl struct {
|
|
mu sync.RWMutex
|
|
client *redis.Client
|
|
stats *CacheStatistics
|
|
policy *CachePolicy
|
|
prefix string
|
|
nodeID string
|
|
warmupKeys map[string]bool
|
|
}
|
|
|
|
// NewCacheManager creates a new cache manager with Redis backend
|
|
func NewCacheManager(redisAddr, nodeID string, policy *CachePolicy) (*CacheManagerImpl, error) {
|
|
if policy == nil {
|
|
policy = DefaultCachePolicy()
|
|
}
|
|
|
|
// Create Redis client
|
|
client := redis.NewClient(&redis.Options{
|
|
Addr: redisAddr,
|
|
Password: "", // No password for local Redis
|
|
DB: 0, // Default DB
|
|
DialTimeout: 10 * time.Second,
|
|
ReadTimeout: 5 * time.Second,
|
|
WriteTimeout: 5 * time.Second,
|
|
PoolSize: 10,
|
|
MinIdleConns: 5,
|
|
})
|
|
|
|
// Test connection
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
if err := client.Ping(ctx).Err(); err != nil {
|
|
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
|
|
}
|
|
|
|
cm := &CacheManagerImpl{
|
|
client: client,
|
|
policy: policy,
|
|
prefix: fmt.Sprintf("slurp:%s", nodeID),
|
|
nodeID: nodeID,
|
|
warmupKeys: make(map[string]bool),
|
|
stats: &CacheStatistics{
|
|
MaxSize: policy.MaxSize,
|
|
},
|
|
}
|
|
|
|
// Start background maintenance if needed
|
|
go cm.maintenanceLoop()
|
|
|
|
return cm, nil
|
|
}
|
|
|
|
// DefaultCachePolicy returns default caching policy
|
|
func DefaultCachePolicy() *CachePolicy {
|
|
return &CachePolicy{
|
|
TTL: 24 * time.Hour,
|
|
MaxSize: 1024 * 1024 * 1024, // 1GB
|
|
EvictionPolicy: "LRU",
|
|
RefreshThreshold: 0.8, // Refresh when 80% of TTL elapsed
|
|
WarmupEnabled: true,
|
|
CompressEntries: true,
|
|
MaxEntrySize: 10 * 1024 * 1024, // 10MB
|
|
}
|
|
}
|
|
|
|
// Get retrieves data from cache
|
|
func (cm *CacheManagerImpl) Get(
|
|
ctx context.Context,
|
|
key string,
|
|
) (interface{}, bool, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
cm.updateAccessStats(time.Since(start))
|
|
}()
|
|
|
|
cacheKey := cm.buildCacheKey(key)
|
|
|
|
// Get from Redis
|
|
result, err := cm.client.Get(ctx, cacheKey).Result()
|
|
if err != nil {
|
|
if err == redis.Nil {
|
|
// Cache miss
|
|
cm.recordMiss()
|
|
return nil, false, nil
|
|
}
|
|
return nil, false, fmt.Errorf("cache get error: %w", err)
|
|
}
|
|
|
|
// Deserialize cached entry
|
|
var entry CacheEntry
|
|
if err := json.Unmarshal([]byte(result), &entry); err != nil {
|
|
return nil, false, fmt.Errorf("cache entry deserialization error: %w", err)
|
|
}
|
|
|
|
// Check if entry has expired (additional check beyond Redis TTL)
|
|
if time.Now().After(entry.ExpiresAt) {
|
|
// Entry expired, delete it
|
|
go func() {
|
|
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
cm.client.Del(delCtx, cacheKey)
|
|
}()
|
|
cm.recordMiss()
|
|
return nil, false, nil
|
|
}
|
|
|
|
// Update access statistics
|
|
go func() {
|
|
updCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
cm.updateEntryAccess(updCtx, cacheKey)
|
|
}()
|
|
|
|
// Deserialize the actual data
|
|
var data interface{}
|
|
if err := json.Unmarshal(entry.Data, &data); err != nil {
|
|
return nil, false, fmt.Errorf("data deserialization error: %w", err)
|
|
}
|
|
|
|
cm.recordHit()
|
|
return data, true, nil
|
|
}
|
|
|
|
// Set stores data in cache with TTL
|
|
func (cm *CacheManagerImpl) Set(
|
|
ctx context.Context,
|
|
key string,
|
|
data interface{},
|
|
ttl time.Duration,
|
|
) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
cm.updateAccessStats(time.Since(start))
|
|
}()
|
|
|
|
// Serialize the data
|
|
dataBytes, err := json.Marshal(data)
|
|
if err != nil {
|
|
return fmt.Errorf("data serialization error: %w", err)
|
|
}
|
|
|
|
// Check size limits
|
|
if len(dataBytes) > int(cm.policy.MaxEntrySize) {
|
|
return fmt.Errorf("data too large: %d bytes exceeds limit of %d", len(dataBytes), cm.policy.MaxEntrySize)
|
|
}
|
|
|
|
// Create cache entry
|
|
entry := CacheEntry{
|
|
Key: key,
|
|
Data: dataBytes,
|
|
CreatedAt: time.Now(),
|
|
ExpiresAt: time.Now().Add(ttl),
|
|
TTL: ttl,
|
|
AccessCount: 1,
|
|
NodeID: cm.nodeID,
|
|
}
|
|
|
|
// Apply compression if enabled and beneficial
|
|
if cm.policy.CompressEntries && len(dataBytes) > 1024 {
|
|
compressed, err := cm.compress(dataBytes)
|
|
if err == nil && len(compressed) < len(dataBytes) {
|
|
entry.Data = compressed
|
|
entry.Compressed = true
|
|
entry.OriginalSize = int64(len(dataBytes))
|
|
entry.CompressedSize = int64(len(compressed))
|
|
}
|
|
}
|
|
|
|
// Serialize cache entry
|
|
entryBytes, err := json.Marshal(entry)
|
|
if err != nil {
|
|
return fmt.Errorf("cache entry serialization error: %w", err)
|
|
}
|
|
|
|
cacheKey := cm.buildCacheKey(key)
|
|
|
|
// Store in Redis with TTL
|
|
if err := cm.client.Set(ctx, cacheKey, entryBytes, ttl).Err(); err != nil {
|
|
return fmt.Errorf("cache set error: %w", err)
|
|
}
|
|
|
|
// Update statistics
|
|
cm.updateCacheSize(int64(len(entryBytes)))
|
|
|
|
return nil
|
|
}
|
|
|
|
// Delete removes data from cache
|
|
func (cm *CacheManagerImpl) Delete(ctx context.Context, key string) error {
|
|
cacheKey := cm.buildCacheKey(key)
|
|
|
|
if err := cm.client.Del(ctx, cacheKey).Err(); err != nil {
|
|
return fmt.Errorf("cache delete error: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeletePattern removes cache entries matching pattern
|
|
func (cm *CacheManagerImpl) DeletePattern(ctx context.Context, pattern string) error {
|
|
// Build full pattern with prefix
|
|
fullPattern := cm.buildCacheKey(pattern)
|
|
|
|
// Use Redis SCAN to find matching keys
|
|
var cursor uint64
|
|
var keys []string
|
|
|
|
for {
|
|
result, nextCursor, err := cm.client.Scan(ctx, cursor, fullPattern, 100).Result()
|
|
if err != nil {
|
|
return fmt.Errorf("cache scan error: %w", err)
|
|
}
|
|
|
|
keys = append(keys, result...)
|
|
cursor = nextCursor
|
|
|
|
if cursor == 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
// Delete found keys in batches
|
|
if len(keys) > 0 {
|
|
pipeline := cm.client.Pipeline()
|
|
for _, key := range keys {
|
|
pipeline.Del(ctx, key)
|
|
}
|
|
|
|
if _, err := pipeline.Exec(ctx); err != nil {
|
|
return fmt.Errorf("cache batch delete error: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Clear clears all cache entries
|
|
func (cm *CacheManagerImpl) Clear(ctx context.Context) error {
|
|
// Use pattern to delete all entries with our prefix
|
|
return cm.DeletePattern(ctx, "*")
|
|
}
|
|
|
|
// Warm pre-loads cache with frequently accessed data
|
|
func (cm *CacheManagerImpl) Warm(ctx context.Context, keys []string) error {
|
|
if !cm.policy.WarmupEnabled {
|
|
return nil
|
|
}
|
|
|
|
cm.mu.Lock()
|
|
for _, key := range keys {
|
|
cm.warmupKeys[key] = true
|
|
}
|
|
cm.mu.Unlock()
|
|
|
|
// Warmup process would typically be implemented by the caller
|
|
// who has access to the actual data sources
|
|
// Here we just mark keys as warmup candidates
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetCacheStats returns cache performance statistics
|
|
func (cm *CacheManagerImpl) GetCacheStats() (*CacheStatistics, error) {
|
|
cm.mu.RLock()
|
|
defer cm.mu.RUnlock()
|
|
|
|
// Update Redis memory usage
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
info, err := cm.client.Info(ctx, "memory").Result()
|
|
if err == nil {
|
|
// Parse memory info to get actual usage
|
|
// This is a simplified implementation
|
|
cm.stats.MemoryUsage = cm.parseMemoryUsage(info)
|
|
}
|
|
|
|
// Calculate hit rate
|
|
if cm.stats.TotalHits+cm.stats.TotalMisses > 0 {
|
|
cm.stats.HitRate = float64(cm.stats.TotalHits) / float64(cm.stats.TotalHits+cm.stats.TotalMisses)
|
|
cm.stats.MissRate = 1.0 - cm.stats.HitRate
|
|
}
|
|
|
|
// Return a copy
|
|
statsCopy := *cm.stats
|
|
return &statsCopy, nil
|
|
}
|
|
|
|
// SetCachePolicy sets caching policy
|
|
func (cm *CacheManagerImpl) SetCachePolicy(policy *CachePolicy) error {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
cm.policy = policy
|
|
cm.stats.MaxSize = policy.MaxSize
|
|
|
|
return nil
|
|
}
|
|
|
|
// CacheEntry is the JSON envelope stored in Redis for every cached value. It
// carries the serialized payload together with bookkeeping metadata.
type CacheEntry struct {
	// Key is the caller-supplied key (without the manager's prefix) and Data
	// is the JSON-encoded payload, possibly compressed (see Compressed).
	Key  string `json:"key"`
	Data []byte `json:"data"`

	// Lifetime of the entry: creation time, absolute expiry and the TTL it
	// was stored with.
	CreatedAt time.Time     `json:"created_at"`
	ExpiresAt time.Time     `json:"expires_at"`
	TTL       time.Duration `json:"ttl"`

	// Access bookkeeping maintained by updateEntryAccess.
	AccessCount    int64     `json:"access_count"`
	LastAccessedAt time.Time `json:"last_accessed_at"`

	// Compressed reports whether Data holds the compressed payload; the two
	// sizes are recorded by Set only when compression was applied.
	Compressed     bool  `json:"compressed"`
	OriginalSize   int64 `json:"original_size"`
	CompressedSize int64 `json:"compressed_size"`

	// NodeID identifies the cache manager that wrote the entry.
	NodeID string `json:"node_id"`
}
|
|
|
|
// Helper methods
|
|
|
|
func (cm *CacheManagerImpl) buildCacheKey(key string) string {
|
|
return fmt.Sprintf("%s:%s", cm.prefix, key)
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) compress(data []byte) ([]byte, error) {
|
|
// Implement compression (gzip, lz4, etc.)
|
|
// For now, return as-is
|
|
return data, nil
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) decompress(data []byte) ([]byte, error) {
|
|
// Implement decompression
|
|
// For now, return as-is
|
|
return data, nil
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) recordHit() {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
cm.stats.TotalHits++
|
|
cm.stats.LastEviction = time.Now() // Update last activity time
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) recordMiss() {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
cm.stats.TotalMisses++
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) updateAccessStats(duration time.Duration) {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
if cm.stats.AverageLoadTime == 0 {
|
|
cm.stats.AverageLoadTime = duration
|
|
} else {
|
|
// Exponential moving average
|
|
cm.stats.AverageLoadTime = time.Duration(
|
|
float64(cm.stats.AverageLoadTime)*0.8 + float64(duration)*0.2,
|
|
)
|
|
}
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) updateCacheSize(sizeChange int64) {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
cm.stats.CurrentSize += sizeChange
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) updateEntryAccess(ctx context.Context, cacheKey string) {
|
|
// Get current entry
|
|
result, err := cm.client.Get(ctx, cacheKey).Result()
|
|
if err != nil {
|
|
return // Entry may have been evicted
|
|
}
|
|
|
|
var entry CacheEntry
|
|
if err := json.Unmarshal([]byte(result), &entry); err != nil {
|
|
return
|
|
}
|
|
|
|
// Update access statistics
|
|
entry.AccessCount++
|
|
entry.LastAccessedAt = time.Now()
|
|
|
|
// Update entry in cache
|
|
entryBytes, err := json.Marshal(entry)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Update with remaining TTL
|
|
remaining := cm.client.TTL(ctx, cacheKey).Val()
|
|
cm.client.Set(ctx, cacheKey, entryBytes, remaining)
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) parseMemoryUsage(memInfo string) int64 {
|
|
// Parse Redis memory info to extract usage
|
|
// This is a simplified implementation
|
|
// In production, you'd parse the actual INFO memory output
|
|
return 0 // Placeholder
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) maintenanceLoop() {
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
cm.performMaintenance()
|
|
}
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) performMaintenance() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Check for expired entries that Redis might have missed
|
|
// This is additional cleanup beyond Redis's native expiration
|
|
cm.cleanupExpiredEntries(ctx)
|
|
|
|
// Update memory statistics
|
|
cm.updateMemoryStats(ctx)
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) cleanupExpiredEntries(ctx context.Context) {
|
|
// Scan for entries that might be expired
|
|
var cursor uint64
|
|
pattern := cm.buildCacheKey("*")
|
|
|
|
for {
|
|
keys, nextCursor, err := cm.client.Scan(ctx, cursor, pattern, 100).Result()
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
// Check each key for expiration
|
|
for _, key := range keys {
|
|
// Check TTL
|
|
ttl := cm.client.TTL(ctx, key).Val()
|
|
if ttl < 0 {
|
|
// Key has no TTL or is expired
|
|
cm.client.Del(ctx, key)
|
|
cm.mu.Lock()
|
|
cm.stats.EvictionCount++
|
|
cm.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
cursor = nextCursor
|
|
if cursor == 0 {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cm *CacheManagerImpl) updateMemoryStats(ctx context.Context) {
|
|
// Get Redis memory statistics
|
|
info, err := cm.client.Info(ctx, "memory").Result()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
cm.stats.MemoryUsage = cm.parseMemoryUsage(info)
|
|
}
|
|
|
|
// Close closes the cache manager and cleans up resources
|
|
func (cm *CacheManagerImpl) Close() error {
|
|
return cm.client.Close()
|
|
}
|