664 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			664 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package storage
 | |
| 
 | |
import (
	"bytes"
	"compress/gzip"
	"context"
	"crypto/sha256"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"regexp"
	"sync"
	"syscall"
	"time"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)
 | |
| 
 | |
| // LocalStorageImpl implements the LocalStorage interface using LevelDB
 | |
| type LocalStorageImpl struct {
 | |
| 	mu       sync.RWMutex
 | |
| 	db       *leveldb.DB
 | |
| 	basePath string
 | |
| 	metrics  *LocalStorageStats
 | |
| 	options  *LocalStorageOptions
 | |
| }
 | |
| 
 | |
// LocalStorageOptions configures local storage behavior. Size fields use the
// unit named in their comment; NewLocalStorage converts them to bytes.
type LocalStorageOptions struct {
	// Compression enables compression.
	Compression bool `json:"compression"`
	// CacheSize is the cache size in MB.
	CacheSize int `json:"cache_size"`
	// WriteBuffer is the write buffer size in MB.
	WriteBuffer int `json:"write_buffer"`
	// MaxOpenFiles is the maximum number of open files.
	MaxOpenFiles int `json:"max_open_files"`
	// BlockSize is the block size in KB.
	BlockSize int `json:"block_size"`
	// SyncWrites makes writes synchronous.
	SyncWrites bool `json:"sync_writes"`
	// CompactionInterval is the auto-compaction interval; background
	// compaction is only started when it is greater than zero.
	CompactionInterval time.Duration `json:"compaction_interval"`
}
 | |
| 
 | |
| // DefaultLocalStorageOptions returns default options
 | |
| func DefaultLocalStorageOptions() *LocalStorageOptions {
 | |
| 	return &LocalStorageOptions{
 | |
| 		Compression:        true,
 | |
| 		CacheSize:          64, // 64MB cache
 | |
| 		WriteBuffer:        16, // 16MB write buffer
 | |
| 		MaxOpenFiles:       1000,
 | |
| 		BlockSize:          4, // 4KB blocks
 | |
| 		SyncWrites:         false,
 | |
| 		CompactionInterval: 24 * time.Hour,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NewLocalStorage creates a new local storage implementation
 | |
| func NewLocalStorage(basePath string, options *LocalStorageOptions) (*LocalStorageImpl, error) {
 | |
| 	if options == nil {
 | |
| 		options = DefaultLocalStorageOptions()
 | |
| 	}
 | |
| 
 | |
| 	// Ensure base directory exists
 | |
| 	if err := os.MkdirAll(basePath, 0755); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create storage directory: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Configure LevelDB options
 | |
| 	dbOptions := &opt.Options{
 | |
| 		Filter:                 filter.NewBloomFilter(10),
 | |
| 		Compression:            opt.DefaultCompression,
 | |
| 		BlockCacheCapacity:     options.CacheSize * 1024 * 1024, // Convert MB to bytes
 | |
| 		WriteBuffer:            options.WriteBuffer * 1024 * 1024,
 | |
| 		OpenFilesCacheCapacity: options.MaxOpenFiles,
 | |
| 		BlockSize:              options.BlockSize * 1024, // Convert KB to bytes
 | |
| 	}
 | |
| 
 | |
| 	if !options.Compression {
 | |
| 		dbOptions.Compression = opt.NoCompression
 | |
| 	}
 | |
| 
 | |
| 	// Open LevelDB database
 | |
| 	dbPath := filepath.Join(basePath, "leveldb")
 | |
| 	db, err := leveldb.OpenFile(dbPath, dbOptions)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to open LevelDB: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	ls := &LocalStorageImpl{
 | |
| 		db:       db,
 | |
| 		basePath: basePath,
 | |
| 		options:  options,
 | |
| 		metrics: &LocalStorageStats{
 | |
| 			LastCompaction: time.Now(),
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	// Start background compaction if enabled
 | |
| 	if options.CompactionInterval > 0 {
 | |
| 		go ls.backgroundCompaction()
 | |
| 	}
 | |
| 
 | |
| 	return ls, nil
 | |
| }
 | |
| 
 | |
| // Store stores context data locally with optional encryption
 | |
| func (ls *LocalStorageImpl) Store(
 | |
| 	ctx context.Context,
 | |
| 	key string,
 | |
| 	data interface{},
 | |
| 	options *StoreOptions,
 | |
| ) error {
 | |
| 	start := time.Now()
 | |
| 	ls.mu.Lock()
 | |
| 	defer ls.mu.Unlock()
 | |
| 
 | |
| 	// Update metrics
 | |
| 	defer func() {
 | |
| 		ls.metrics.WriteOperations++
 | |
| 		ls.updateAverageTime(&ls.metrics.AverageWriteTime, time.Since(start))
 | |
| 	}()
 | |
| 
 | |
| 	// Serialize data
 | |
| 	dataBytes, err := json.Marshal(data)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to marshal data: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Create storage entry with metadata
 | |
| 	entry := &StorageEntry{
 | |
| 		Key:       key,
 | |
| 		Data:      dataBytes,
 | |
| 		CreatedAt: time.Now(),
 | |
| 		UpdatedAt: time.Now(),
 | |
| 		Metadata:  make(map[string]interface{}),
 | |
| 	}
 | |
| 	entry.Checksum = ls.computeChecksum(dataBytes)
 | |
| 
 | |
| 	// Apply options
 | |
| 	if options != nil {
 | |
| 		entry.TTL = options.TTL
 | |
| 		entry.Compressed = options.Compress
 | |
| 		entry.AccessLevel = options.AccessLevel.String()
 | |
| 
 | |
| 		// Copy metadata
 | |
| 		for k, v := range options.Metadata {
 | |
| 			entry.Metadata[k] = v
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Compress if requested
 | |
| 	if entry.Compressed {
 | |
| 		compressedData, err := ls.compress(dataBytes)
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("failed to compress data: %w", err)
 | |
| 		}
 | |
| 		entry.Data = compressedData
 | |
| 		entry.OriginalSize = int64(len(dataBytes))
 | |
| 		entry.CompressedSize = int64(len(compressedData))
 | |
| 	}
 | |
| 
 | |
| 	// Serialize entry
 | |
| 	entryBytes, err := json.Marshal(entry)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to marshal storage entry: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Store in LevelDB
 | |
| 	writeOpt := &opt.WriteOptions{
 | |
| 		Sync: ls.options.SyncWrites,
 | |
| 	}
 | |
| 
 | |
| 	if err := ls.db.Put([]byte(key), entryBytes, writeOpt); err != nil {
 | |
| 		return fmt.Errorf("failed to store data: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Update size metrics
 | |
| 	ls.metrics.TotalSize += int64(len(entryBytes))
 | |
| 	if entry.Compressed {
 | |
| 		ls.metrics.CompressedSize += entry.CompressedSize
 | |
| 	}
 | |
| 	ls.updateFileMetricsLocked()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Retrieve retrieves context data from local storage
 | |
| func (ls *LocalStorageImpl) Retrieve(ctx context.Context, key string) (interface{}, error) {
 | |
| 	start := time.Now()
 | |
| 	ls.mu.RLock()
 | |
| 	defer ls.mu.RUnlock()
 | |
| 
 | |
| 	// Update metrics
 | |
| 	defer func() {
 | |
| 		ls.metrics.ReadOperations++
 | |
| 		ls.updateAverageTime(&ls.metrics.AverageReadTime, time.Since(start))
 | |
| 	}()
 | |
| 
 | |
| 	// Retrieve from LevelDB
 | |
| 	entryBytes, err := ls.db.Get([]byte(key), nil)
 | |
| 	if err != nil {
 | |
| 		if err == leveldb.ErrNotFound {
 | |
| 			return nil, fmt.Errorf("%w: %s", ErrNotFound, key)
 | |
| 		}
 | |
| 		return nil, fmt.Errorf("failed to retrieve data: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Deserialize entry
 | |
| 	var entry StorageEntry
 | |
| 	if err := json.Unmarshal(entryBytes, &entry); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to unmarshal storage entry: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Check TTL if set
 | |
| 	if entry.TTL != nil && time.Since(entry.CreatedAt) > *entry.TTL {
 | |
| 		// Data has expired, delete it
 | |
| 		go func() {
 | |
| 			ls.mu.Lock()
 | |
| 			defer ls.mu.Unlock()
 | |
| 			ls.db.Delete([]byte(key), nil)
 | |
| 		}()
 | |
| 		return nil, fmt.Errorf("data has expired for key: %s", key)
 | |
| 	}
 | |
| 
 | |
| 	// Decompress if needed
 | |
| 	dataBytes := entry.Data
 | |
| 	if entry.Compressed {
 | |
| 		decompressedData, err := ls.decompress(entry.Data)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("failed to decompress data: %w", err)
 | |
| 		}
 | |
| 		dataBytes = decompressedData
 | |
| 	}
 | |
| 
 | |
| 	// Verify integrity against stored checksum (SEC-SLURP-1.1a requirement)
 | |
| 	if entry.Checksum != "" {
 | |
| 		computed := ls.computeChecksum(dataBytes)
 | |
| 		if computed != entry.Checksum {
 | |
| 			return nil, fmt.Errorf("data integrity check failed for key %s", key)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Deserialize data
 | |
| 	var result interface{}
 | |
| 	if err := json.Unmarshal(dataBytes, &result); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to unmarshal data: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return result, nil
 | |
| }
 | |
| 
 | |
| // Delete removes data from local storage
 | |
| func (ls *LocalStorageImpl) Delete(ctx context.Context, key string) error {
 | |
| 	ls.mu.Lock()
 | |
| 	defer ls.mu.Unlock()
 | |
| 
 | |
| 	// Get size before deletion for metrics
 | |
| 	entryBytes, err := ls.db.Get([]byte(key), nil)
 | |
| 	if err != nil && err != leveldb.ErrNotFound {
 | |
| 		return fmt.Errorf("failed to get data for deletion metrics: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Delete from LevelDB
 | |
| 	if err := ls.db.Delete([]byte(key), nil); err != nil {
 | |
| 		return fmt.Errorf("failed to delete data: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Update metrics
 | |
| 	if entryBytes != nil {
 | |
| 		ls.metrics.TotalSize -= int64(len(entryBytes))
 | |
| 	}
 | |
| 	ls.updateFileMetricsLocked()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Exists checks if data exists locally
 | |
| func (ls *LocalStorageImpl) Exists(ctx context.Context, key string) (bool, error) {
 | |
| 	ls.mu.RLock()
 | |
| 	defer ls.mu.RUnlock()
 | |
| 
 | |
| 	_, err := ls.db.Get([]byte(key), nil)
 | |
| 	if err != nil {
 | |
| 		if err == leveldb.ErrNotFound {
 | |
| 			return false, nil
 | |
| 		}
 | |
| 		return false, fmt.Errorf("failed to check existence: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return true, nil
 | |
| }
 | |
| 
 | |
| // List lists all keys matching a pattern
 | |
| func (ls *LocalStorageImpl) List(ctx context.Context, pattern string) ([]string, error) {
 | |
| 	ls.mu.RLock()
 | |
| 	defer ls.mu.RUnlock()
 | |
| 
 | |
| 	// Compile regex pattern
 | |
| 	regex, err := regexp.Compile(pattern)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("invalid pattern: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	var keys []string
 | |
| 	iter := ls.db.NewIterator(nil, nil)
 | |
| 	defer iter.Release()
 | |
| 
 | |
| 	for iter.Next() {
 | |
| 		key := string(iter.Key())
 | |
| 		if regex.MatchString(key) {
 | |
| 			keys = append(keys, key)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if err := iter.Error(); err != nil {
 | |
| 		return nil, fmt.Errorf("iterator error: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return keys, nil
 | |
| }
 | |
| 
 | |
| // Size returns the size of stored data
 | |
| func (ls *LocalStorageImpl) Size(ctx context.Context, key string) (int64, error) {
 | |
| 	ls.mu.RLock()
 | |
| 	defer ls.mu.RUnlock()
 | |
| 
 | |
| 	entryBytes, err := ls.db.Get([]byte(key), nil)
 | |
| 	if err != nil {
 | |
| 		if err == leveldb.ErrNotFound {
 | |
| 			return 0, fmt.Errorf("%w: %s", ErrNotFound, key)
 | |
| 		}
 | |
| 		return 0, fmt.Errorf("failed to get data size: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Deserialize entry to get original size
 | |
| 	var entry StorageEntry
 | |
| 	if err := json.Unmarshal(entryBytes, &entry); err != nil {
 | |
| 		return int64(len(entryBytes)), nil // Return serialized size if can't deserialize
 | |
| 	}
 | |
| 
 | |
| 	if entry.OriginalSize > 0 {
 | |
| 		return entry.OriginalSize, nil
 | |
| 	}
 | |
| 
 | |
| 	return int64(len(entry.Data)), nil
 | |
| }
 | |
| 
 | |
| // Compact compacts local storage to reclaim space
 | |
| func (ls *LocalStorageImpl) Compact(ctx context.Context) error {
 | |
| 	ls.mu.Lock()
 | |
| 	defer ls.mu.Unlock()
 | |
| 
 | |
| 	start := time.Now()
 | |
| 
 | |
| 	// Perform compaction
 | |
| 	if err := ls.db.CompactRange(util.Range{}); err != nil {
 | |
| 		return fmt.Errorf("failed to compact database: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Update metrics
 | |
| 	ls.metrics.LastCompaction = time.Now()
 | |
| 	compactionTime := time.Since(start)
 | |
| 
 | |
| 	// Calculate new fragmentation ratio
 | |
| 	ls.updateFragmentationRatio()
 | |
| 
 | |
| 	// Log compaction
 | |
| 	fmt.Printf("Local storage compaction completed in %v\n", compactionTime)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetLocalStats returns local storage statistics
 | |
| func (ls *LocalStorageImpl) GetLocalStats() (*LocalStorageStats, error) {
 | |
| 	ls.mu.RLock()
 | |
| 	defer ls.mu.RUnlock()
 | |
| 
 | |
| 	// Get LevelDB stats
 | |
| 	dbStats := &leveldb.DBStats{}
 | |
| 	if err := ls.db.Stats(dbStats); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to get database stats: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Update file count
 | |
| 	ls.metrics.TotalFiles = int64(dbStats.IOWrite) // Approximate
 | |
| 
 | |
| 	// Get available space
 | |
| 	availableSpace, err := ls.getAvailableSpace()
 | |
| 	if err != nil {
 | |
| 		// Log but don't fail
 | |
| 		fmt.Printf("Failed to get available space: %v\n", err)
 | |
| 	}
 | |
| 	ls.metrics.AvailableSpace = availableSpace
 | |
| 
 | |
| 	// Return a copy
 | |
| 	statsCopy := *ls.metrics
 | |
| 	return &statsCopy, nil
 | |
| }
 | |
| 
 | |
// StorageEntry is the record that is JSON-encoded and written to LevelDB for
// each stored key.
type StorageEntry struct {
	// Key is the key the entry was stored under.
	Key string `json:"key"`
	// Data is the JSON payload, gzip-compressed when Compressed is set.
	Data []byte `json:"data"`
	// CreatedAt is when the entry was written; TTL expiry is measured from it.
	CreatedAt time.Time `json:"created_at"`
	// UpdatedAt is when the entry was last rewritten.
	UpdatedAt time.Time `json:"updated_at"`
	// TTL is the optional lifetime of the entry; nil means no expiry.
	TTL *time.Duration `json:"ttl,omitempty"`
	// Compressed tells readers to decompress Data before use.
	Compressed bool `json:"compressed"`
	// OriginalSize is the payload size before compression.
	OriginalSize int64 `json:"original_size"`
	// CompressedSize is the payload size after compression.
	CompressedSize int64 `json:"compressed_size"`
	// Checksum is the hex SHA-256 digest of the uncompressed payload.
	Checksum string `json:"checksum"`
	// AccessLevel is the string form of the access level given at store time.
	AccessLevel string `json:"access_level"`
	// Metadata carries caller-supplied key/value pairs.
	Metadata map[string]interface{} `json:"metadata"`
}
 | |
| 
 | |
| // Helper methods
 | |
| 
 | |
| func (ls *LocalStorageImpl) compress(data []byte) ([]byte, error) {
 | |
| 	// Use gzip compression for efficient data storage
 | |
| 	var buf bytes.Buffer
 | |
| 
 | |
| 	// Create gzip writer with best compression
 | |
| 	writer := gzip.NewWriter(&buf)
 | |
| 	writer.Header.Name = "storage_data"
 | |
| 	writer.Header.Comment = "CHORUS SLURP local storage compressed data"
 | |
| 
 | |
| 	// Write data to gzip writer
 | |
| 	if _, err := writer.Write(data); err != nil {
 | |
| 		writer.Close()
 | |
| 		return nil, fmt.Errorf("failed to write compressed data: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Close writer to flush data
 | |
| 	if err := writer.Close(); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to close gzip writer: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	compressed := buf.Bytes()
 | |
| 
 | |
| 	// Only return compressed data if it's actually smaller
 | |
| 	if len(compressed) >= len(data) {
 | |
| 		// Compression didn't help, return original data
 | |
| 		return data, nil
 | |
| 	}
 | |
| 
 | |
| 	return compressed, nil
 | |
| }
 | |
| 
 | |
| func (ls *LocalStorageImpl) computeChecksum(data []byte) string {
 | |
| 	// Compute SHA-256 checksum to satisfy SEC-SLURP-1.1a integrity tracking
 | |
| 	digest := sha256.Sum256(data)
 | |
| 	return fmt.Sprintf("%x", digest)
 | |
| }
 | |
| 
 | |
| func (ls *LocalStorageImpl) updateFileMetricsLocked() {
 | |
| 	// Refresh filesystem metrics using io/fs traversal (SEC-SLURP-1.1a durability telemetry)
 | |
| 	var fileCount int64
 | |
| 	var aggregateSize int64
 | |
| 
 | |
| 	walkErr := fs.WalkDir(os.DirFS(ls.basePath), ".", func(path string, d fs.DirEntry, err error) error {
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		if d.IsDir() {
 | |
| 			return nil
 | |
| 		}
 | |
| 		fileCount++
 | |
| 		if info, infoErr := d.Info(); infoErr == nil {
 | |
| 			aggregateSize += info.Size()
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	if walkErr != nil {
 | |
| 		fmt.Printf("filesystem metrics refresh failed: %v\n", walkErr)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	ls.metrics.TotalFiles = fileCount
 | |
| 	if aggregateSize > 0 {
 | |
| 		ls.metrics.TotalSize = aggregateSize
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (ls *LocalStorageImpl) decompress(data []byte) ([]byte, error) {
 | |
| 	// Create gzip reader
 | |
| 	reader, err := gzip.NewReader(bytes.NewReader(data))
 | |
| 	if err != nil {
 | |
| 		// Data might not be compressed (fallback case)
 | |
| 		return data, nil
 | |
| 	}
 | |
| 	defer reader.Close()
 | |
| 
 | |
| 	// Read decompressed data
 | |
| 	var buf bytes.Buffer
 | |
| 	if _, err := io.Copy(&buf, reader); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to decompress data: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return buf.Bytes(), nil
 | |
| }
 | |
| 
 | |
| func (ls *LocalStorageImpl) getAvailableSpace() (int64, error) {
 | |
| 	// Get filesystem stats for the storage directory using syscalls
 | |
| 	var stat syscall.Statfs_t
 | |
| 	if err := syscall.Statfs(ls.basePath, &stat); err != nil {
 | |
| 		return 0, fmt.Errorf("failed to get filesystem stats: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Calculate available space in bytes
 | |
| 	// Available blocks * block size
 | |
| 	availableBytes := int64(stat.Bavail) * int64(stat.Bsize)
 | |
| 
 | |
| 	return availableBytes, nil
 | |
| }
 | |
| 
 | |
| func (ls *LocalStorageImpl) updateFragmentationRatio() {
 | |
| 	// Simplified fragmentation calculation
 | |
| 	// In production, this would be more sophisticated
 | |
| 	ls.metrics.FragmentationRatio = 0.1 // Placeholder: 10%
 | |
| }
 | |
| 
 | |
| func (ls *LocalStorageImpl) updateAverageTime(currentAvg *time.Duration, newTime time.Duration) {
 | |
| 	// Simple exponential moving average
 | |
| 	if *currentAvg == 0 {
 | |
| 		*currentAvg = newTime
 | |
| 	} else {
 | |
| 		*currentAvg = time.Duration(float64(*currentAvg)*0.8 + float64(newTime)*0.2)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (ls *LocalStorageImpl) backgroundCompaction() {
 | |
| 	ticker := time.NewTicker(ls.options.CompactionInterval)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	for range ticker.C {
 | |
| 		if err := ls.Compact(context.Background()); err != nil {
 | |
| 			fmt.Printf("Background compaction failed: %v\n", err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // GetCompressionStats returns compression statistics
 | |
| func (ls *LocalStorageImpl) GetCompressionStats() (*CompressionStats, error) {
 | |
| 	ls.mu.RLock()
 | |
| 	defer ls.mu.RUnlock()
 | |
| 
 | |
| 	stats := &CompressionStats{
 | |
| 		TotalEntries:      0,
 | |
| 		CompressedEntries: 0,
 | |
| 		TotalSize:         ls.metrics.TotalSize,
 | |
| 		CompressedSize:    ls.metrics.CompressedSize,
 | |
| 		CompressionRatio:  0.0,
 | |
| 	}
 | |
| 
 | |
| 	// Iterate through all entries to get accurate stats
 | |
| 	iter := ls.db.NewIterator(nil, nil)
 | |
| 	defer iter.Release()
 | |
| 
 | |
| 	for iter.Next() {
 | |
| 		stats.TotalEntries++
 | |
| 
 | |
| 		// Try to parse entry to check if compressed
 | |
| 		var entry StorageEntry
 | |
| 		if err := json.Unmarshal(iter.Value(), &entry); err == nil {
 | |
| 			if entry.Compressed {
 | |
| 				stats.CompressedEntries++
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Calculate compression ratio
 | |
| 	if stats.TotalSize > 0 {
 | |
| 		stats.CompressionRatio = float64(stats.CompressedSize) / float64(stats.TotalSize)
 | |
| 	}
 | |
| 
 | |
| 	return stats, iter.Error()
 | |
| }
 | |
| 
 | |
| // OptimizeStorage performs compression optimization on existing data
 | |
| func (ls *LocalStorageImpl) OptimizeStorage(ctx context.Context, compressThreshold int64) error {
 | |
| 	ls.mu.Lock()
 | |
| 	defer ls.mu.Unlock()
 | |
| 
 | |
| 	optimized := 0
 | |
| 	skipped := 0
 | |
| 
 | |
| 	// Iterate through all entries
 | |
| 	iter := ls.db.NewIterator(nil, nil)
 | |
| 	defer iter.Release()
 | |
| 
 | |
| 	for iter.Next() {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return ctx.Err()
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		key := string(iter.Key())
 | |
| 
 | |
| 		// Parse existing entry
 | |
| 		var entry StorageEntry
 | |
| 		if err := json.Unmarshal(iter.Value(), &entry); err != nil {
 | |
| 			continue // Skip malformed entries
 | |
| 		}
 | |
| 
 | |
| 		// Skip if already compressed or too small
 | |
| 		if entry.Compressed || int64(len(entry.Data)) < compressThreshold {
 | |
| 			skipped++
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// Try compression
 | |
| 		compressedData, err := ls.compress(entry.Data)
 | |
| 		if err != nil {
 | |
| 			continue // Skip on compression error
 | |
| 		}
 | |
| 
 | |
| 		// Only update if compression helped
 | |
| 		if len(compressedData) < len(entry.Data) {
 | |
| 			entry.Compressed = true
 | |
| 			entry.OriginalSize = int64(len(entry.Data))
 | |
| 			entry.CompressedSize = int64(len(compressedData))
 | |
| 			entry.Data = compressedData
 | |
| 			entry.UpdatedAt = time.Now()
 | |
| 
 | |
| 			// Save updated entry
 | |
| 			entryBytes, err := json.Marshal(entry)
 | |
| 			if err != nil {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			writeOpt := &opt.WriteOptions{Sync: ls.options.SyncWrites}
 | |
| 			if err := ls.db.Put([]byte(key), entryBytes, writeOpt); err != nil {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			optimized++
 | |
| 		} else {
 | |
| 			skipped++
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	fmt.Printf("Storage optimization complete: %d entries compressed, %d skipped\n", optimized, skipped)
 | |
| 	return iter.Error()
 | |
| }
 | |
| 
 | |
// CompressionStats holds the compression statistics reported by
// GetCompressionStats.
type CompressionStats struct {
	// TotalEntries is the number of records found in the database.
	TotalEntries int64 `json:"total_entries"`
	// CompressedEntries is the number of records flagged as compressed.
	CompressedEntries int64 `json:"compressed_entries"`
	// TotalSize is the total size figure taken from the storage metrics.
	TotalSize int64 `json:"total_size"`
	// CompressedSize is the compressed size figure taken from the storage metrics.
	CompressedSize int64 `json:"compressed_size"`
	// CompressionRatio is CompressedSize divided by TotalSize.
	CompressionRatio float64 `json:"compression_ratio"`
}
 | |
| 
 | |
| // Close closes the local storage
 | |
| func (ls *LocalStorageImpl) Close() error {
 | |
| 	ls.mu.Lock()
 | |
| 	defer ls.mu.Unlock()
 | |
| 
 | |
| 	return ls.db.Close()
 | |
| }
 | 
