CHORUS/pkg/slurp/storage/local_storage.go
anthonyrawlins 9bdcbe0447 Integrate BACKBEAT SDK and resolve KACHING license validation
Major integrations and fixes:
- Added BACKBEAT SDK integration for P2P operation timing
- Implemented beat-aware status tracking for distributed operations
- Added Docker secrets support for secure license management
- Resolved KACHING license validation via HTTPS/TLS
- Updated docker-compose configuration for clean stack deployment
- Disabled rollback policies to prevent deployment failures
- Added license credential storage (CHORUS-DEV-MULTI-001)

Technical improvements:
- BACKBEAT P2P operation tracking with phase management
- Enhanced configuration system with file-based secrets
- Improved error handling for license validation
- Clean separation of KACHING and CHORUS deployment stacks

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-06 07:56:26 +10:00


package storage

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"sync"
	"syscall"
	"time"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// LocalStorageImpl implements the LocalStorage interface using LevelDB
type LocalStorageImpl struct {
	mu       sync.RWMutex
	db       *leveldb.DB
	basePath string
	metrics  *LocalStorageStats
	options  *LocalStorageOptions
	stopCh   chan struct{} // closed by Close to stop background compaction
}

// LocalStorageOptions configures local storage behavior
type LocalStorageOptions struct {
	Compression        bool          `json:"compression"`         // Enable compression
	CacheSize          int           `json:"cache_size"`          // Cache size in MB
	WriteBuffer        int           `json:"write_buffer"`        // Write buffer size in MB
	MaxOpenFiles       int           `json:"max_open_files"`      // Maximum open files
	BlockSize          int           `json:"block_size"`          // Block size in KB
	SyncWrites         bool          `json:"sync_writes"`         // Synchronous writes
	CompactionInterval time.Duration `json:"compaction_interval"` // Auto-compaction interval
}

// DefaultLocalStorageOptions returns default options
func DefaultLocalStorageOptions() *LocalStorageOptions {
	return &LocalStorageOptions{
		Compression:        true,
		CacheSize:          64, // 64MB cache
		WriteBuffer:        16, // 16MB write buffer
		MaxOpenFiles:       1000,
		BlockSize:          4, // 4KB blocks
		SyncWrites:         false,
		CompactionInterval: 24 * time.Hour,
	}
}

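// For reference, a JSON document matching the struct tags above would look
// like this (the values shown are the defaults and purely illustrative):
//
//	{
//	  "compression": true,
//	  "cache_size": 64,
//	  "write_buffer": 16,
//	  "max_open_files": 1000,
//	  "block_size": 4,
//	  "sync_writes": false,
//	  "compaction_interval": 86400000000000
//	}
//
// Note that time.Duration has no custom JSON marshaler, so it serializes as
// integer nanoseconds, hence the large compaction_interval value for 24h.
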
// NewLocalStorage creates a new local storage implementation
func NewLocalStorage(basePath string, options *LocalStorageOptions) (*LocalStorageImpl, error) {
	if options == nil {
		options = DefaultLocalStorageOptions()
	}

	// Ensure base directory exists
	if err := os.MkdirAll(basePath, 0755); err != nil {
		return nil, fmt.Errorf("failed to create storage directory: %w", err)
	}

	// Configure LevelDB options
	dbOptions := &opt.Options{
		Filter:                 filter.NewBloomFilter(10),
		Compression:            opt.DefaultCompression,
		BlockCacheCapacity:     options.CacheSize * 1024 * 1024, // Convert MB to bytes
		WriteBuffer:            options.WriteBuffer * 1024 * 1024,
		OpenFilesCacheCapacity: options.MaxOpenFiles,
		BlockSize:              options.BlockSize * 1024, // Convert KB to bytes
	}
	if !options.Compression {
		dbOptions.Compression = opt.NoCompression
	}

	// Open LevelDB database
	dbPath := filepath.Join(basePath, "leveldb")
	db, err := leveldb.OpenFile(dbPath, dbOptions)
	if err != nil {
		return nil, fmt.Errorf("failed to open LevelDB: %w", err)
	}
	ls := &LocalStorageImpl{
		db:       db,
		basePath: basePath,
		options:  options,
		metrics: &LocalStorageStats{
			LastCompaction: time.Now(),
		},
		stopCh: make(chan struct{}),
	}

	// Start background compaction if enabled
	if options.CompactionInterval > 0 {
		go ls.backgroundCompaction()
	}
	return ls, nil
}

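// Construction sketch (illustrative only; the path is a placeholder):
//
//	ls, err := NewLocalStorage("/var/lib/slurp", nil) // nil -> DefaultLocalStorageOptions()
//	if err != nil {
//		return err
//	}
//	defer ls.Close()
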
// Store stores context data locally with optional compression
func (ls *LocalStorageImpl) Store(
	ctx context.Context,
	key string,
	data interface{},
	options *StoreOptions,
) error {
	start := time.Now()
	ls.mu.Lock()
	defer ls.mu.Unlock()

	// Update metrics on the way out
	defer func() {
		ls.metrics.WriteOperations++
		ls.updateAverageTime(&ls.metrics.AverageWriteTime, time.Since(start))
	}()

	// Serialize data
	dataBytes, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("failed to marshal data: %w", err)
	}

	// Create storage entry with metadata
	entry := &StorageEntry{
		Key:       key,
		Data:      dataBytes,
		CreatedAt: time.Now(),
		UpdatedAt: time.Now(),
		Metadata:  make(map[string]interface{}),
	}

	// Apply options
	if options != nil {
		entry.TTL = options.TTL
		entry.Compressed = options.Compress
		entry.AccessLevel = string(options.AccessLevel)
		// Copy metadata
		for k, v := range options.Metadata {
			entry.Metadata[k] = v
		}
	}

	// Compress if requested. Only mark the entry compressed when compression
	// actually shrank the payload, so decompress never sees verbatim data
	// flagged as gzip.
	if entry.Compressed {
		compressedData, err := ls.compress(dataBytes)
		if err != nil {
			return fmt.Errorf("failed to compress data: %w", err)
		}
		if len(compressedData) < len(dataBytes) {
			entry.Data = compressedData
			entry.OriginalSize = int64(len(dataBytes))
			entry.CompressedSize = int64(len(compressedData))
		} else {
			entry.Compressed = false
		}
	}

	// Serialize entry
	entryBytes, err := json.Marshal(entry)
	if err != nil {
		return fmt.Errorf("failed to marshal storage entry: %w", err)
	}

	// Store in LevelDB
	writeOpt := &opt.WriteOptions{
		Sync: ls.options.SyncWrites,
	}
	if err := ls.db.Put([]byte(key), entryBytes, writeOpt); err != nil {
		return fmt.Errorf("failed to store data: %w", err)
	}

	// Update size metrics
	ls.metrics.TotalSize += int64(len(entryBytes))
	if entry.Compressed {
		ls.metrics.CompressedSize += entry.CompressedSize
	}
	return nil
}

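// Store usage sketch (illustrative; StoreOptions is defined elsewhere in this
// package, and only the fields referenced above are shown):
//
//	ttl := time.Hour
//	err := ls.Store(ctx, "context:node-1", payload, &StoreOptions{
//		Compress: true,
//		TTL:      &ttl,
//	})
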
// Retrieve retrieves context data from local storage. The value is returned
// in generic JSON-decoded form (maps, slices, strings, float64 numbers), not
// as the caller's original Go type.
func (ls *LocalStorageImpl) Retrieve(ctx context.Context, key string) (interface{}, error) {
	start := time.Now()
	ls.mu.RLock()
	defer ls.mu.RUnlock()

	// Update metrics on the way out.
	// NOTE: these counters are mutated under the read lock; switch to atomics
	// if the race detector is run against concurrent retrievals.
	defer func() {
		ls.metrics.ReadOperations++
		ls.updateAverageTime(&ls.metrics.AverageReadTime, time.Since(start))
	}()

	// Retrieve from LevelDB
	entryBytes, err := ls.db.Get([]byte(key), nil)
	if err != nil {
		if err == leveldb.ErrNotFound {
			return nil, fmt.Errorf("key not found: %s", key)
		}
		return nil, fmt.Errorf("failed to retrieve data: %w", err)
	}

	// Deserialize entry
	var entry StorageEntry
	if err := json.Unmarshal(entryBytes, &entry); err != nil {
		return nil, fmt.Errorf("failed to unmarshal storage entry: %w", err)
	}

	// Check TTL if set
	if entry.TTL != nil && time.Since(entry.CreatedAt) > *entry.TTL {
		// Data has expired; delete it asynchronously. The goroutine blocks on
		// the write lock until this read releases its lock, so there is no
		// deadlock.
		go func() {
			ls.mu.Lock()
			defer ls.mu.Unlock()
			ls.db.Delete([]byte(key), nil)
		}()
		return nil, fmt.Errorf("data has expired for key: %s", key)
	}

	// Decompress if needed
	dataBytes := entry.Data
	if entry.Compressed {
		decompressedData, err := ls.decompress(entry.Data)
		if err != nil {
			return nil, fmt.Errorf("failed to decompress data: %w", err)
		}
		dataBytes = decompressedData
	}

	// Deserialize data
	var result interface{}
	if err := json.Unmarshal(dataBytes, &result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal data: %w", err)
	}
	return result, nil
}

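// Retrieve usage sketch: because values come back as generic JSON-decoded
// types, callers typically re-marshal into their own struct (MyContextType is
// a hypothetical caller-side type):
//
//	raw, err := ls.Retrieve(ctx, "context:node-1")
//	if err != nil {
//		return err
//	}
//	buf, _ := json.Marshal(raw)
//	var decoded MyContextType
//	err = json.Unmarshal(buf, &decoded)
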
// Delete removes data from local storage
func (ls *LocalStorageImpl) Delete(ctx context.Context, key string) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	// Get size before deletion for metrics
	entryBytes, err := ls.db.Get([]byte(key), nil)
	if err != nil && err != leveldb.ErrNotFound {
		return fmt.Errorf("failed to get data for deletion metrics: %w", err)
	}

	// Delete from LevelDB
	if err := ls.db.Delete([]byte(key), nil); err != nil {
		return fmt.Errorf("failed to delete data: %w", err)
	}

	// Update metrics
	if entryBytes != nil {
		ls.metrics.TotalSize -= int64(len(entryBytes))
	}
	return nil
}

// Exists checks if data exists locally
func (ls *LocalStorageImpl) Exists(ctx context.Context, key string) (bool, error) {
	ls.mu.RLock()
	defer ls.mu.RUnlock()

	// Use Has rather than Get to avoid loading the value
	found, err := ls.db.Has([]byte(key), nil)
	if err != nil {
		return false, fmt.Errorf("failed to check existence: %w", err)
	}
	return found, nil
}

// List lists all keys matching a pattern, interpreted as a Go regular
// expression applied to every key
func (ls *LocalStorageImpl) List(ctx context.Context, pattern string) ([]string, error) {
	ls.mu.RLock()
	defer ls.mu.RUnlock()

	// Compile regex pattern
	regex, err := regexp.Compile(pattern)
	if err != nil {
		return nil, fmt.Errorf("invalid pattern: %w", err)
	}

	var keys []string
	iter := ls.db.NewIterator(nil, nil)
	defer iter.Release()
	for iter.Next() {
		key := string(iter.Key())
		if regex.MatchString(key) {
			keys = append(keys, key)
		}
	}
	if err := iter.Error(); err != nil {
		return nil, fmt.Errorf("iterator error: %w", err)
	}
	return keys, nil
}

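// List usage sketch (the key prefix is illustrative):
//
//	keys, err := ls.List(ctx, "^context:")
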
// Size returns the size of the stored data in bytes; for compressed entries
// this is the original, uncompressed size
func (ls *LocalStorageImpl) Size(ctx context.Context, key string) (int64, error) {
	ls.mu.RLock()
	defer ls.mu.RUnlock()

	entryBytes, err := ls.db.Get([]byte(key), nil)
	if err != nil {
		if err == leveldb.ErrNotFound {
			return 0, fmt.Errorf("key not found: %s", key)
		}
		return 0, fmt.Errorf("failed to get data size: %w", err)
	}

	// Deserialize entry to get the original size
	var entry StorageEntry
	if err := json.Unmarshal(entryBytes, &entry); err != nil {
		return int64(len(entryBytes)), nil // Fall back to the serialized size
	}
	if entry.OriginalSize > 0 {
		return entry.OriginalSize, nil
	}
	return int64(len(entry.Data)), nil
}

// Compact compacts local storage to reclaim space
func (ls *LocalStorageImpl) Compact(ctx context.Context) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	start := time.Now()

	// Compact the entire key range (a zero util.Range covers everything)
	if err := ls.db.CompactRange(util.Range{}); err != nil {
		return fmt.Errorf("failed to compact database: %w", err)
	}

	// Update metrics
	ls.metrics.LastCompaction = time.Now()
	compactionTime := time.Since(start)

	// Recalculate the fragmentation ratio
	ls.updateFragmentationRatio()

	// Log compaction
	fmt.Printf("Local storage compaction completed in %v\n", compactionTime)
	return nil
}

// GetLocalStats returns local storage statistics
func (ls *LocalStorageImpl) GetLocalStats() (*LocalStorageStats, error) {
	// Take the write lock: the cached metrics are updated below
	ls.mu.Lock()
	defer ls.mu.Unlock()

	// Get LevelDB stats
	dbStats := &leveldb.DBStats{}
	if err := ls.db.Stats(dbStats); err != nil {
		return nil, fmt.Errorf("failed to get database stats: %w", err)
	}

	// Approximate the on-disk file count from the per-level table counts
	var tableCount int64
	for _, n := range dbStats.LevelTablesCounts {
		tableCount += int64(n)
	}
	ls.metrics.TotalFiles = tableCount

	// Get available space
	availableSpace, err := ls.getAvailableSpace()
	if err != nil {
		// Log but don't fail
		fmt.Printf("Failed to get available space: %v\n", err)
	}
	ls.metrics.AvailableSpace = availableSpace

	// Return a copy
	statsCopy := *ls.metrics
	return &statsCopy, nil
}

// StorageEntry represents a stored data entry
type StorageEntry struct {
	Key            string                 `json:"key"`
	Data           []byte                 `json:"data"`
	CreatedAt      time.Time              `json:"created_at"`
	UpdatedAt      time.Time              `json:"updated_at"`
	TTL            *time.Duration         `json:"ttl,omitempty"`
	Compressed     bool                   `json:"compressed"`
	OriginalSize   int64                  `json:"original_size"`
	CompressedSize int64                  `json:"compressed_size"`
	AccessLevel    string                 `json:"access_level"`
	Metadata       map[string]interface{} `json:"metadata"`
}

// Helper methods

func (ls *LocalStorageImpl) compress(data []byte) ([]byte, error) {
	// Use gzip (default compression level) for efficient data storage
	var buf bytes.Buffer
	writer := gzip.NewWriter(&buf)
	writer.Header.Name = "storage_data"
	writer.Header.Comment = "CHORUS SLURP local storage compressed data"

	// Write data to the gzip writer
	if _, err := writer.Write(data); err != nil {
		writer.Close()
		return nil, fmt.Errorf("failed to write compressed data: %w", err)
	}

	// Close the writer to flush remaining data
	if err := writer.Close(); err != nil {
		return nil, fmt.Errorf("failed to close gzip writer: %w", err)
	}

	compressed := buf.Bytes()
	// If compression didn't help, return the original bytes; callers compare
	// lengths against the input to decide whether to mark the entry compressed
	if len(compressed) >= len(data) {
		return data, nil
	}
	return compressed, nil
}

func (ls *LocalStorageImpl) decompress(data []byte) ([]byte, error) {
	// Create gzip reader
	reader, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		// Not a gzip stream: treat as uncompressed (fallback for entries that
		// were flagged compressed but stored verbatim)
		return data, nil
	}
	defer reader.Close()

	// Read decompressed data
	var buf bytes.Buffer
	if _, err := io.Copy(&buf, reader); err != nil {
		return nil, fmt.Errorf("failed to decompress data: %w", err)
	}
	return buf.Bytes(), nil
}

func (ls *LocalStorageImpl) getAvailableSpace() (int64, error) {
	// Get filesystem stats for the storage directory. Note that
	// syscall.Statfs is Unix-only; this will not build on Windows.
	var stat syscall.Statfs_t
	if err := syscall.Statfs(ls.basePath, &stat); err != nil {
		return 0, fmt.Errorf("failed to get filesystem stats: %w", err)
	}

	// Available space in bytes = available blocks * block size
	return int64(stat.Bavail) * int64(stat.Bsize), nil
}

func (ls *LocalStorageImpl) updateFragmentationRatio() {
	// Simplified fragmentation calculation; a production implementation could
	// derive this from LevelDB's per-level sizes instead
	ls.metrics.FragmentationRatio = 0.1 // Placeholder: 10%
}

func (ls *LocalStorageImpl) updateAverageTime(currentAvg *time.Duration, newTime time.Duration) {
	// Simple exponential moving average with a 0.2 smoothing factor
	if *currentAvg == 0 {
		*currentAvg = newTime
	} else {
		*currentAvg = time.Duration(float64(*currentAvg)*0.8 + float64(newTime)*0.2)
	}
}

func (ls *LocalStorageImpl) backgroundCompaction() {
	ticker := time.NewTicker(ls.options.CompactionInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ls.stopCh:
			return // storage closed; stop compacting
		case <-ticker.C:
			if err := ls.Compact(context.Background()); err != nil {
				fmt.Printf("Background compaction failed: %v\n", err)
			}
		}
	}
}

// GetCompressionStats returns compression statistics
func (ls *LocalStorageImpl) GetCompressionStats() (*CompressionStats, error) {
	ls.mu.RLock()
	defer ls.mu.RUnlock()

	stats := &CompressionStats{
		TotalSize:      ls.metrics.TotalSize,
		CompressedSize: ls.metrics.CompressedSize,
	}

	// Iterate through all entries to get accurate counts
	iter := ls.db.NewIterator(nil, nil)
	defer iter.Release()
	for iter.Next() {
		stats.TotalEntries++
		// Parse the entry to check whether it is compressed
		var entry StorageEntry
		if err := json.Unmarshal(iter.Value(), &entry); err == nil {
			if entry.Compressed {
				stats.CompressedEntries++
			}
		}
	}

	// Calculate compression ratio
	if stats.TotalSize > 0 {
		stats.CompressionRatio = float64(stats.CompressedSize) / float64(stats.TotalSize)
	}
	return stats, iter.Error()
}

// OptimizeStorage performs compression optimization on existing data
func (ls *LocalStorageImpl) OptimizeStorage(ctx context.Context, compressThreshold int64) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	optimized := 0
	skipped := 0

	// Iterate through all entries. The iterator sees a consistent snapshot,
	// so writing updated entries back during iteration is safe.
	iter := ls.db.NewIterator(nil, nil)
	defer iter.Release()
	for iter.Next() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		key := string(iter.Key())

		// Parse existing entry
		var entry StorageEntry
		if err := json.Unmarshal(iter.Value(), &entry); err != nil {
			continue // Skip malformed entries
		}

		// Skip if already compressed or below the size threshold
		if entry.Compressed || int64(len(entry.Data)) < compressThreshold {
			skipped++
			continue
		}

		// Try compression
		compressedData, err := ls.compress(entry.Data)
		if err != nil {
			continue // Skip on compression error
		}

		// Only update the entry if compression actually helped
		if len(compressedData) < len(entry.Data) {
			entry.OriginalSize = int64(len(entry.Data))
			entry.CompressedSize = int64(len(compressedData))
			entry.Compressed = true
			entry.Data = compressedData
			entry.UpdatedAt = time.Now()

			// Save the updated entry
			entryBytes, err := json.Marshal(entry)
			if err != nil {
				continue
			}
			writeOpt := &opt.WriteOptions{Sync: ls.options.SyncWrites}
			if err := ls.db.Put([]byte(key), entryBytes, writeOpt); err != nil {
				continue
			}
			optimized++
		} else {
			skipped++
		}
	}

	fmt.Printf("Storage optimization complete: %d entries compressed, %d skipped\n", optimized, skipped)
	return iter.Error()
}

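// OptimizeStorage usage sketch: retroactively compress every uncompressed
// entry of at least 1 KiB (the threshold value is illustrative):
//
//	if err := ls.OptimizeStorage(ctx, 1024); err != nil {
//		return err
//	}
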
// CompressionStats holds compression statistics
type CompressionStats struct {
	TotalEntries      int64   `json:"total_entries"`
	CompressedEntries int64   `json:"compressed_entries"`
	TotalSize         int64   `json:"total_size"`
	CompressedSize    int64   `json:"compressed_size"`
	CompressionRatio  float64 `json:"compression_ratio"`
}

// Close stops background compaction and closes the local storage
func (ls *LocalStorageImpl) Close() error {
	ls.mu.Lock()
	defer ls.mu.Unlock()
	if ls.stopCh != nil {
		close(ls.stopCh) // stop the background compaction goroutine
		ls.stopCh = nil
	}
	return ls.db.Close()
}
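
// exampleLocalStorageLifecycle sketches a full store/retrieve/compact cycle.
// It is illustrative only: the path, key, and payload are placeholders, and
// StoreOptions (defined elsewhere in this package) may carry more fields than
// the two used here.
func exampleLocalStorageLifecycle() error {
	ctx := context.Background()

	ls, err := NewLocalStorage("/tmp/slurp-example", nil) // nil -> DefaultLocalStorageOptions()
	if err != nil {
		return err
	}
	defer ls.Close()

	// Store a value with compression and a one-hour TTL
	ttl := time.Hour
	opts := &StoreOptions{Compress: true, TTL: &ttl}
	if err := ls.Store(ctx, "context:example", map[string]string{"hello": "world"}, opts); err != nil {
		return err
	}

	// Values come back in generic JSON-decoded form
	value, err := ls.Retrieve(ctx, "context:example")
	if err != nil {
		return err
	}
	fmt.Printf("retrieved: %v\n", value)

	// Reclaim space once the working set has churned
	return ls.Compact(ctx)
}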