package storage

import (
	"context"
	"fmt"
	"sync"
	"time"

	slurpContext "chorus/pkg/slurp/context"
	"chorus/pkg/ucxl"
)

// ContextStoreImpl is the main implementation of the ContextStore interface.
// It coordinates between local storage, distributed storage, encryption,
// caching, and indexing.
type ContextStoreImpl struct {
	mu                 sync.RWMutex
	localStorage       LocalStorage
	distributedStorage DistributedStorage
	encryptedStorage   EncryptedStorage
	cacheManager       CacheManager
	indexManager       IndexManager
	backupManager      BackupManager
	eventNotifier      EventNotifier

	// Configuration
	nodeID  string
	options *ContextStoreOptions

	// Statistics and monitoring
	statistics       *StorageStatistics
	metricsCollector *MetricsCollector

	// Background processes
	stopCh           chan struct{}
	syncTicker       *time.Ticker
	compactionTicker *time.Ticker
	cleanupTicker    *time.Ticker
}

// ContextStoreOptions configures the context store behavior.
type ContextStoreOptions struct {
	// Storage configuration
	PreferLocal        bool `json:"prefer_local"`
	AutoReplicate      bool `json:"auto_replicate"`
	DefaultReplicas    int  `json:"default_replicas"`
	EncryptionEnabled  bool `json:"encryption_enabled"`
	CompressionEnabled bool `json:"compression_enabled"`

	// Caching configuration
	CachingEnabled bool          `json:"caching_enabled"`
	CacheTTL       time.Duration `json:"cache_ttl"`
	CacheSize      int64         `json:"cache_size"`

	// Indexing configuration
	IndexingEnabled      bool          `json:"indexing_enabled"`
	IndexRefreshInterval time.Duration `json:"index_refresh_interval"`

	// Background processes
	SyncInterval       time.Duration `json:"sync_interval"`
	CompactionInterval time.Duration `json:"compaction_interval"`
	CleanupInterval    time.Duration `json:"cleanup_interval"`

	// Performance tuning
	BatchSize        int           `json:"batch_size"`
	MaxConcurrentOps int           `json:"max_concurrent_ops"`
	OperationTimeout time.Duration `json:"operation_timeout"`
}

// MetricsCollector collects and aggregates storage metrics.
type MetricsCollector struct {
	mu               sync.RWMutex
	operationCount   map[string]int64
	latencyHistogram map[string][]time.Duration
	errorCount       map[string]int64
	lastCollected    time.Time
}

// DefaultContextStoreOptions returns sensible defaults.
func DefaultContextStoreOptions() *ContextStoreOptions {
	return &ContextStoreOptions{
		PreferLocal:          true,
		AutoReplicate:        true,
		DefaultReplicas:      3,
		EncryptionEnabled:    true,
		CompressionEnabled:   true,
		CachingEnabled:       true,
		CacheTTL:             24 * time.Hour,
		CacheSize:            1024 * 1024 * 1024, // 1GB
		IndexingEnabled:      true,
		IndexRefreshInterval: 5 * time.Minute,
		SyncInterval:         10 * time.Minute,
		CompactionInterval:   24 * time.Hour,
		CleanupInterval:      1 * time.Hour,
		BatchSize:            100,
		MaxConcurrentOps:     10,
		OperationTimeout:     30 * time.Second,
	}
}

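// exampleTunedOptions is an illustrative sketch of overriding the defaults
// for a latency-sensitive deployment. The specific values here are
// assumptions for demonstration, not recommendations from this package.
func exampleTunedOptions() *ContextStoreOptions {
	opts := DefaultContextStoreOptions()
	opts.CacheTTL = 5 * time.Minute // favor freshness over hit rate
	opts.DefaultReplicas = 5        // wider replication than the default 3
	opts.SyncInterval = 1 * time.Minute
	opts.OperationTimeout = 5 * time.Second
	return opts
}
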
// NewContextStore creates a new context store with all components.
func NewContextStore(
	nodeID string,
	localStorage LocalStorage,
	distributedStorage DistributedStorage,
	encryptedStorage EncryptedStorage,
	cacheManager CacheManager,
	indexManager IndexManager,
	backupManager BackupManager,
	eventNotifier EventNotifier,
	options *ContextStoreOptions,
) *ContextStoreImpl {
	if options == nil {
		options = DefaultContextStoreOptions()
	}

	cs := &ContextStoreImpl{
		localStorage:       localStorage,
		distributedStorage: distributedStorage,
		encryptedStorage:   encryptedStorage,
		cacheManager:       cacheManager,
		indexManager:       indexManager,
		backupManager:      backupManager,
		eventNotifier:      eventNotifier,
		nodeID:             nodeID,
		options:            options,
		statistics: &StorageStatistics{
			LastSyncTime: time.Now(),
		},
		metricsCollector: &MetricsCollector{
			operationCount:   make(map[string]int64),
			latencyHistogram: make(map[string][]time.Duration),
			errorCount:       make(map[string]int64),
			lastCollected:    time.Now(),
		},
		stopCh: make(chan struct{}),
	}

	// Start background processes
	cs.startBackgroundProcesses()

	return cs
}

// StoreContext stores a context node with role-based encryption.
func (cs *ContextStoreImpl) StoreContext(
	ctx context.Context,
	node *slurpContext.ContextNode,
	roles []string,
) error {
	start := time.Now()
	defer func() {
		cs.recordLatency("store", time.Since(start))
	}()

	// Validate input
	if node == nil {
		return fmt.Errorf("context node cannot be nil")
	}
	if len(roles) == 0 {
		return fmt.Errorf("at least one role must be specified")
	}

	// Generate storage key
	storageKey := cs.generateStorageKey(node.UCXLAddress)

	// Store based on configuration
	var storeErr error
	if cs.options.EncryptionEnabled {
		// Store encrypted for each role
		storeErr = cs.encryptedStorage.StoreEncrypted(ctx, storageKey, node, roles)
	} else {
		// Store unencrypted
		storeOptions := &StoreOptions{
			Encrypt:   false,
			Replicate: cs.options.AutoReplicate,
			Index:     cs.options.IndexingEnabled,
			Cache:     cs.options.CachingEnabled,
			Compress:  cs.options.CompressionEnabled,
		}
		storeErr = cs.localStorage.Store(ctx, storageKey, node, storeOptions)
	}

	if storeErr != nil {
		cs.recordError("store", storeErr)
		return fmt.Errorf("failed to store context: %w", storeErr)
	}

	// Update search indexes if enabled
	if cs.options.IndexingEnabled {
		if err := cs.updateSearchIndexes(ctx, node); err != nil {
			// Log but don't fail the store operation
			cs.recordError("index_update", err)
		}
	}

	// Cache the context if enabled
	if cs.options.CachingEnabled {
		for _, role := range roles {
			cacheKey := cs.generateCacheKey(node.UCXLAddress, role)
			if err := cs.cacheManager.Set(ctx, cacheKey, node, cs.options.CacheTTL); err != nil {
				// Log but don't fail
				cs.recordError("cache_set", err)
			}
		}
	}

	// Replicate to distributed storage if enabled
	if cs.options.AutoReplicate && cs.distributedStorage != nil {
		go func() {
			replicateCtx, cancel := context.WithTimeout(context.Background(), cs.options.OperationTimeout)
			defer cancel()

			distOptions := &DistributedStoreOptions{
				ReplicationFactor: cs.options.DefaultReplicas,
				ConsistencyLevel:  ConsistencyQuorum,
				Timeout:           cs.options.OperationTimeout,
				SyncMode:          SyncAsync,
			}

			if err := cs.distributedStorage.Store(replicateCtx, storageKey, node, distOptions); err != nil {
				cs.recordError("replicate", err)
			}
		}()
	}

	// Notify event listeners
	event := &StorageEvent{
		Type:      EventStored,
		Key:       storageKey,
		Data:      node,
		Timestamp: time.Now(),
		Metadata: map[string]interface{}{
			"roles":        roles,
			"ucxl_address": node.UCXLAddress.String(),
			"node_id":      cs.nodeID,
		},
	}
	cs.eventNotifier.NotifyStored(ctx, event)

	cs.recordOperation("store")
	return nil
}

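// Round-trip usage sketch for StoreContext/RetrieveContext. It assumes a
// constructed store `cs` and an already-parsed ucxl.Address; the role names
// are illustrative and error handling is elided:
//
//	node := &slurpContext.ContextNode{UCXLAddress: addr}
//	_ = cs.StoreContext(ctx, node, []string{"architect", "developer"})
//	got, _ := cs.RetrieveContext(ctx, addr, "developer")
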
// RetrieveContext retrieves context for a UCXL address and role.
func (cs *ContextStoreImpl) RetrieveContext(
	ctx context.Context,
	address ucxl.Address,
	role string,
) (*slurpContext.ContextNode, error) {
	start := time.Now()
	defer func() {
		cs.recordLatency("retrieve", time.Since(start))
	}()

	storageKey := cs.generateStorageKey(address)
	cacheKey := cs.generateCacheKey(address, role)

	// Try cache first if enabled
	if cs.options.CachingEnabled {
		if cachedData, found, err := cs.cacheManager.Get(ctx, cacheKey); err == nil && found {
			if contextNode, ok := cachedData.(*slurpContext.ContextNode); ok {
				cs.recordOperation("cache_hit")
				return contextNode, nil
			}
		}
		cs.recordOperation("cache_miss")
	}

	// Retrieve from the appropriate storage tier
	var retrievedData interface{}
	var retrieveErr error

	if cs.options.EncryptionEnabled {
		// Retrieve and decrypt for role
		retrievedData, retrieveErr = cs.encryptedStorage.RetrieveDecrypted(ctx, storageKey, role)
	} else if cs.options.PreferLocal {
		// Try local first
		retrievedData, retrieveErr = cs.localStorage.Retrieve(ctx, storageKey)
		if retrieveErr != nil && cs.distributedStorage != nil {
			// Fallback to distributed
			retrievedData, retrieveErr = cs.distributedStorage.Retrieve(ctx, storageKey)
		}
	} else {
		// Try distributed first; if it is not configured or fails, fall back
		// to local storage
		if cs.distributedStorage != nil {
			retrievedData, retrieveErr = cs.distributedStorage.Retrieve(ctx, storageKey)
		}
		if cs.distributedStorage == nil || retrieveErr != nil {
			retrievedData, retrieveErr = cs.localStorage.Retrieve(ctx, storageKey)
		}
	}

	if retrieveErr != nil {
		cs.recordError("retrieve", retrieveErr)
		return nil, fmt.Errorf("failed to retrieve context: %w", retrieveErr)
	}

	// Cast to context node
	contextNode, ok := retrievedData.(*slurpContext.ContextNode)
	if !ok {
		return nil, fmt.Errorf("invalid context node type")
	}

	// Cache the result if caching is enabled
	if cs.options.CachingEnabled {
		if err := cs.cacheManager.Set(ctx, cacheKey, contextNode, cs.options.CacheTTL); err != nil {
			cs.recordError("cache_set", err)
		}
	}

	// Notify event listeners
	event := &StorageEvent{
		Type:      EventRetrieved,
		Key:       storageKey,
		Data:      contextNode,
		Timestamp: time.Now(),
		Metadata: map[string]interface{}{
			"role":         role,
			"ucxl_address": address.String(),
			"node_id":      cs.nodeID,
		},
	}
	cs.eventNotifier.NotifyRetrieved(ctx, event)

	cs.recordOperation("retrieve")
	return contextNode, nil
}

// UpdateContext updates an existing context node.
func (cs *ContextStoreImpl) UpdateContext(
	ctx context.Context,
	node *slurpContext.ContextNode,
	roles []string,
) error {
	start := time.Now()
	defer func() {
		cs.recordLatency("update", time.Since(start))
	}()

	// Check if context exists
	storageKey := cs.generateStorageKey(node.UCXLAddress)
	exists, err := cs.ExistsContext(ctx, node.UCXLAddress)
	if err != nil {
		return fmt.Errorf("failed to check context existence: %w", err)
	}
	if !exists {
		return fmt.Errorf("context does not exist for address: %s", node.UCXLAddress.String())
	}

	// Update is essentially a store operation with additional logic
	if err := cs.StoreContext(ctx, node, roles); err != nil {
		return fmt.Errorf("failed to update context: %w", err)
	}

	// Drop per-role cache entries so subsequent reads repopulate from
	// authoritative storage
	if cs.options.CachingEnabled {
		for _, role := range roles {
			cacheKey := cs.generateCacheKey(node.UCXLAddress, role)
			if err := cs.cacheManager.Delete(ctx, cacheKey); err != nil {
				cs.recordError("cache_delete", err)
			}
		}
	}

	// Notify update event
	event := &StorageEvent{
		Type:      EventUpdated,
		Key:       storageKey,
		Data:      node,
		Timestamp: time.Now(),
		Metadata: map[string]interface{}{
			"roles":        roles,
			"ucxl_address": node.UCXLAddress.String(),
			"node_id":      cs.nodeID,
		},
	}
	cs.eventNotifier.NotifyUpdated(ctx, event)

	cs.recordOperation("update")
	return nil
}

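// Update usage sketch: callers re-submit the complete node, and UpdateContext
// verifies existence, re-stores it, and drops the per-role cache entries.
// The node variable is assumed to hold a previously retrieved context;
// error handling is elided:
//
//	updated := existingNode // *slurpContext.ContextNode with modified fields
//	_ = cs.UpdateContext(ctx, updated, []string{"developer"})
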
// DeleteContext removes a context node from storage.
func (cs *ContextStoreImpl) DeleteContext(
	ctx context.Context,
	address ucxl.Address,
) error {
	start := time.Now()
	defer func() {
		cs.recordLatency("delete", time.Since(start))
	}()

	storageKey := cs.generateStorageKey(address)

	// Delete from all storage layers
	var errors []error

	// Delete from local storage
	if err := cs.localStorage.Delete(ctx, storageKey); err != nil {
		errors = append(errors, fmt.Errorf("local delete failed: %w", err))
	}

	// Delete from distributed storage if available
	if cs.distributedStorage != nil {
		if err := cs.distributedStorage.Delete(ctx, storageKey); err != nil {
			errors = append(errors, fmt.Errorf("distributed delete failed: %w", err))
		}
	}

	// Delete from cache (all role variants)
	if cs.options.CachingEnabled {
		cachePattern := fmt.Sprintf("context:%s:*", address.String())
		if err := cs.cacheManager.DeletePattern(ctx, cachePattern); err != nil {
			errors = append(errors, fmt.Errorf("cache delete failed: %w", err))
		}
	}

	// Remove from search indexes
	if cs.options.IndexingEnabled {
		indexes, err := cs.indexManager.ListIndexes(ctx)
		if err == nil {
			for _, indexName := range indexes {
				if err := cs.indexManager.DeleteFromIndex(ctx, indexName, storageKey); err != nil {
					errors = append(errors, fmt.Errorf("index delete failed: %w", err))
				}
			}
		}
	}

	if len(errors) > 0 {
		// Deletion is best-effort across layers: record the failures but do
		// not fail the operation, since other layers may have succeeded
		cs.recordError("delete", fmt.Errorf("partial delete failure: %v", errors))
	}

	// Notify deletion event
	event := &StorageEvent{
		Type:      EventDeleted,
		Key:       storageKey,
		Timestamp: time.Now(),
		Metadata: map[string]interface{}{
			"ucxl_address": address.String(),
			"node_id":      cs.nodeID,
		},
	}
	cs.eventNotifier.NotifyDeleted(ctx, event)

	cs.recordOperation("delete")
	return nil
}

// ExistsContext checks if context exists for an address.
func (cs *ContextStoreImpl) ExistsContext(
	ctx context.Context,
	address ucxl.Address,
) (bool, error) {
	storageKey := cs.generateStorageKey(address)

	// Check local storage first if preferring local
	if cs.options.PreferLocal {
		if exists, err := cs.localStorage.Exists(ctx, storageKey); err == nil {
			return exists, nil
		}
	}

	// Check distributed storage if available
	if cs.distributedStorage != nil {
		if exists, err := cs.distributedStorage.Exists(ctx, storageKey); err == nil {
			return exists, nil
		}
	}

	// Fall back to local storage when it has not been checked yet
	if !cs.options.PreferLocal {
		return cs.localStorage.Exists(ctx, storageKey)
	}

	return false, nil
}

// Additional methods would continue here...
// This implementation illustrates the multi-tier storage architecture.

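// exampleStoreBatch is a hedged sketch, not part of the ContextStore
// interface, showing how a hypothetical batch helper could honor the
// MaxConcurrentOps option with a simple semaphore channel.
func (cs *ContextStoreImpl) exampleStoreBatch(
	ctx context.Context,
	nodes []*slurpContext.ContextNode,
	roles []string,
) error {
	limit := cs.options.MaxConcurrentOps
	if limit <= 0 {
		limit = 1
	}
	sem := make(chan struct{}, limit)
	errCh := make(chan error, len(nodes))

	for _, node := range nodes {
		sem <- struct{}{} // acquire a concurrency slot
		go func(n *slurpContext.ContextNode) {
			defer func() { <-sem }() // release the slot
			errCh <- cs.StoreContext(ctx, n, roles)
		}(node)
	}

	// Collect all results; report the first failure, if any
	var firstErr error
	for range nodes {
		if err := <-errCh; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
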
// Helper methods

func (cs *ContextStoreImpl) generateStorageKey(address ucxl.Address) string {
	return fmt.Sprintf("context:%s", address.String())
}

func (cs *ContextStoreImpl) generateCacheKey(address ucxl.Address, role string) string {
	return fmt.Sprintf("context:%s:role:%s", address.String(), role)
}

func (cs *ContextStoreImpl) updateSearchIndexes(ctx context.Context, node *slurpContext.ContextNode) error {
	// Update all registered search indexes
	indexes, err := cs.indexManager.ListIndexes(ctx)
	if err != nil {
		return fmt.Errorf("failed to list indexes: %w", err)
	}

	for _, indexName := range indexes {
		storageKey := cs.generateStorageKey(node.UCXLAddress)
		if err := cs.indexManager.UpdateIndex(ctx, indexName, storageKey, node); err != nil {
			// Log but continue with other indexes
			cs.recordError("index_update", err)
		}
	}

	return nil
}

func (cs *ContextStoreImpl) recordOperation(operation string) {
	cs.metricsCollector.mu.Lock()
	defer cs.metricsCollector.mu.Unlock()
	cs.metricsCollector.operationCount[operation]++
}

func (cs *ContextStoreImpl) recordLatency(operation string, latency time.Duration) {
	cs.metricsCollector.mu.Lock()
	defer cs.metricsCollector.mu.Unlock()

	if cs.metricsCollector.latencyHistogram[operation] == nil {
		cs.metricsCollector.latencyHistogram[operation] = make([]time.Duration, 0, 100)
	}

	// Keep only the last 100 samples
	histogram := cs.metricsCollector.latencyHistogram[operation]
	if len(histogram) >= 100 {
		histogram = histogram[1:]
	}
	histogram = append(histogram, latency)
	cs.metricsCollector.latencyHistogram[operation] = histogram
}

func (cs *ContextStoreImpl) recordError(operation string, err error) {
	cs.metricsCollector.mu.Lock()
	defer cs.metricsCollector.mu.Unlock()
	cs.metricsCollector.errorCount[operation]++

	// Log the error (in production, use proper logging)
	fmt.Printf("Storage error in %s: %v\n", operation, err)
}

func (cs *ContextStoreImpl) startBackgroundProcesses() {
	// Sync process
	if cs.options.SyncInterval > 0 {
		cs.syncTicker = time.NewTicker(cs.options.SyncInterval)
		go cs.syncProcess()
	}

	// Compaction process
	if cs.options.CompactionInterval > 0 {
		cs.compactionTicker = time.NewTicker(cs.options.CompactionInterval)
		go cs.compactionProcess()
	}

	// Cleanup process
	if cs.options.CleanupInterval > 0 {
		cs.cleanupTicker = time.NewTicker(cs.options.CleanupInterval)
		go cs.cleanupProcess()
	}
}

func (cs *ContextStoreImpl) syncProcess() {
	for {
		select {
		case <-cs.syncTicker.C:
			ctx, cancel := context.WithTimeout(context.Background(), cs.options.OperationTimeout)
			if err := cs.Sync(ctx); err != nil {
				cs.recordError("sync", err)
			}
			cancel()
		case <-cs.stopCh:
			return
		}
	}
}

func (cs *ContextStoreImpl) compactionProcess() {
	for {
		select {
		case <-cs.compactionTicker.C:
			ctx, cancel := context.WithTimeout(context.Background(), cs.options.OperationTimeout*2)
			if err := cs.localStorage.Compact(ctx); err != nil {
				cs.recordError("compaction", err)
			}
			cancel()
		case <-cs.stopCh:
			return
		}
	}
}

func (cs *ContextStoreImpl) cleanupProcess() {
	for {
		select {
		case <-cs.cleanupTicker.C:
			ctx, cancel := context.WithTimeout(context.Background(), cs.options.OperationTimeout)
			cs.performCleanup(ctx)
			cancel()
		case <-cs.stopCh:
			return
		}
	}
}

func (cs *ContextStoreImpl) performCleanup(ctx context.Context) {
	// Clean expired cache entries
	if err := cs.cacheManager.Clear(ctx); err != nil {
		cs.recordError("cache_cleanup", err)
	}

	// Clean old metrics
	cs.cleanupMetrics()
}

func (cs *ContextStoreImpl) cleanupMetrics() {
	cs.metricsCollector.mu.Lock()
	defer cs.metricsCollector.mu.Unlock()

	// Trim histograms that have grown too large
	for operation, histogram := range cs.metricsCollector.latencyHistogram {
		if len(histogram) > 1000 {
			// Keep only the most recent 100 samples
			cs.metricsCollector.latencyHistogram[operation] = histogram[len(histogram)-100:]
		}
	}
}

// GetStorageStats returns storage statistics and health information.
func (cs *ContextStoreImpl) GetStorageStats(ctx context.Context) (*StorageStatistics, error) {
	// Take the write lock: this method mutates cs.statistics below
	cs.mu.Lock()
	defer cs.mu.Unlock()

	// Aggregate stats from all storage layers
	localStats, err := cs.localStorage.GetLocalStats()
	if err != nil {
		return nil, fmt.Errorf("failed to get local stats: %w", err)
	}

	// Update main statistics
	cs.statistics.LocalContexts = localStats.TotalFiles
	cs.statistics.TotalSize = localStats.TotalSize
	cs.statistics.CompressedSize = localStats.CompressedSize
	cs.statistics.AvailableSpace = localStats.AvailableSpace
	cs.statistics.AverageLatency = cs.calculateAverageLatency()
	cs.statistics.OperationsPerSecond = cs.calculateOpsPerSecond()

	if cs.distributedStorage != nil {
		distStats, err := cs.distributedStorage.GetDistributedStats()
		if err == nil {
			cs.statistics.DistributedContexts = distStats.TotalReplicas
			if localStats.TotalFiles > 0 {
				cs.statistics.ReplicationFactor = float64(distStats.TotalReplicas) / float64(localStats.TotalFiles)
			}
		}
	}

	if cs.options.CachingEnabled {
		cacheStats, err := cs.cacheManager.GetCacheStats()
		if err == nil {
			cs.statistics.CacheSize = cacheStats.CurrentSize
		}
	}

	// Return a copy
	statsCopy := *cs.statistics
	return &statsCopy, nil
}

func (cs *ContextStoreImpl) calculateAverageLatency() time.Duration {
	cs.metricsCollector.mu.RLock()
	defer cs.metricsCollector.mu.RUnlock()

	var totalLatency time.Duration
	var totalSamples int

	for _, histogram := range cs.metricsCollector.latencyHistogram {
		for _, latency := range histogram {
			totalLatency += latency
			totalSamples++
		}
	}

	if totalSamples == 0 {
		return 0
	}
	return totalLatency / time.Duration(totalSamples)
}

func (cs *ContextStoreImpl) calculateOpsPerSecond() float64 {
	cs.metricsCollector.mu.RLock()
	defer cs.metricsCollector.mu.RUnlock()

	timeSinceLastCollection := time.Since(cs.metricsCollector.lastCollected)
	if timeSinceLastCollection == 0 {
		return 0
	}

	totalOps := int64(0)
	for _, count := range cs.metricsCollector.operationCount {
		totalOps += count
	}

	return float64(totalOps) / timeSinceLastCollection.Seconds()
}

// Sync synchronizes with distributed storage.
func (cs *ContextStoreImpl) Sync(ctx context.Context) error {
	if cs.distributedStorage == nil {
		return nil // No distributed storage to sync with
	}

	start := time.Now()
	defer func() {
		// Guard statistics writes with the store mutex
		cs.mu.Lock()
		cs.statistics.LastSyncTime = time.Now()
		cs.mu.Unlock()
	}()

	if err := cs.distributedStorage.Sync(ctx); err != nil {
		cs.mu.Lock()
		cs.statistics.SyncErrors++
		cs.mu.Unlock()
		return fmt.Errorf("distributed sync failed: %w", err)
	}

	// Notify sync event
	event := &StorageEvent{
		Type:      EventSynced,
		Timestamp: time.Now(),
		Metadata: map[string]interface{}{
			"node_id":   cs.nodeID,
			"sync_time": time.Since(start),
		},
	}
	cs.eventNotifier.NotifyStored(ctx, event) // Reuse stored notification

	return nil
}

fmt.Errorf("failed to close local storage: %w", err) } } return nil }