 d96c931a29
			
		
	
	d96c931a29
	
	
	
		
			
			This comprehensive refactoring addresses critical architectural issues: IMPORT CYCLE RESOLUTION: • pkg/crypto ↔ pkg/slurp/roles: Created pkg/security/access_levels.go • pkg/ucxl → pkg/dht: Created pkg/storage/interfaces.go • pkg/slurp/leader → pkg/election → pkg/slurp/storage: Moved types to pkg/election/interfaces.go MODULE PATH MIGRATION: • Changed from github.com/anthonyrawlins/bzzz to chorus.services/bzzz • Updated all import statements across 115+ files • Maintains compatibility while removing personal GitHub account dependency TYPE SYSTEM IMPROVEMENTS: • Resolved duplicate type declarations in crypto package • Added missing type definitions (RoleStatus, TimeRestrictions, KeyStatus, KeyRotationResult) • Proper interface segregation to prevent future cycles ARCHITECTURAL BENEFITS: • Build now progresses past structural issues to normal dependency resolution • Cleaner separation of concerns between packages • Eliminates circular dependencies that prevented compilation • Establishes foundation for scalable codebase growth 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			766 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			766 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package storage
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"chorus.services/bzzz/pkg/crypto"
 | |
| 	"chorus.services/bzzz/pkg/dht"
 | |
| 	"chorus.services/bzzz/pkg/ucxl"
 | |
| 	slurpContext "chorus.services/bzzz/pkg/slurp/context"
 | |
| )
 | |
| 
 | |
| // ContextStoreImpl is the main implementation of the ContextStore interface
 | |
| // It coordinates between local storage, distributed storage, encryption, caching, and indexing
 | |
| type ContextStoreImpl struct {
 | |
| 	mu                sync.RWMutex
 | |
| 	localStorage      LocalStorage
 | |
| 	distributedStorage DistributedStorage
 | |
| 	encryptedStorage  EncryptedStorage
 | |
| 	cacheManager      CacheManager
 | |
| 	indexManager      IndexManager
 | |
| 	backupManager     BackupManager
 | |
| 	eventNotifier     EventNotifier
 | |
| 	
 | |
| 	// Configuration
 | |
| 	nodeID            string
 | |
| 	options           *ContextStoreOptions
 | |
| 	
 | |
| 	// Statistics and monitoring
 | |
| 	statistics        *StorageStatistics
 | |
| 	metricsCollector  *MetricsCollector
 | |
| 	
 | |
| 	// Background processes
 | |
| 	stopCh            chan struct{}
 | |
| 	syncTicker        *time.Ticker
 | |
| 	compactionTicker  *time.Ticker
 | |
| 	cleanupTicker     *time.Ticker
 | |
| }
 | |
| 
 | |
// ContextStoreOptions holds the tunables that control how a context store
// persists, caches, indexes and maintains its data.
type ContextStoreOptions struct {
	// Storage behaviour.
	PreferLocal        bool `json:"prefer_local"`        // consult local storage before distributed storage
	AutoReplicate      bool `json:"auto_replicate"`      // replicate stored contexts to distributed storage
	DefaultReplicas    int  `json:"default_replicas"`    // replication factor passed to distributed stores
	EncryptionEnabled  bool `json:"encryption_enabled"`  // route reads/writes through the encrypted storage
	CompressionEnabled bool `json:"compression_enabled"` // forwarded as the Compress store option

	// Caching.
	CachingEnabled bool          `json:"caching_enabled"`
	CacheTTL       time.Duration `json:"cache_ttl"`
	CacheSize      int64         `json:"cache_size"`

	// Indexing.
	IndexingEnabled      bool          `json:"indexing_enabled"`
	IndexRefreshInterval time.Duration `json:"index_refresh_interval"`

	// Background loop periods; a non-positive value disables the loop.
	SyncInterval       time.Duration `json:"sync_interval"`
	CompactionInterval time.Duration `json:"compaction_interval"`
	CleanupInterval    time.Duration `json:"cleanup_interval"`

	// Performance tuning.
	BatchSize        int           `json:"batch_size"`
	MaxConcurrentOps int           `json:"max_concurrent_ops"`
	OperationTimeout time.Duration `json:"operation_timeout"`
}
 | |
| 
 | |
// MetricsCollector accumulates per-operation counters, latency samples and
// error counts for the context store. All maps are keyed by operation name.
type MetricsCollector struct {
	mu sync.RWMutex // guards every field below

	operationCount   map[string]int64           // completed operations per name
	latencyHistogram map[string][]time.Duration // recent latency samples per name
	errorCount       map[string]int64           // recorded errors per name
	lastCollected    time.Time                  // reference point for ops-per-second
}
 | |
| 
 | |
| // DefaultContextStoreOptions returns sensible defaults
 | |
| func DefaultContextStoreOptions() *ContextStoreOptions {
 | |
| 	return &ContextStoreOptions{
 | |
| 		PreferLocal:          true,
 | |
| 		AutoReplicate:        true,
 | |
| 		DefaultReplicas:      3,
 | |
| 		EncryptionEnabled:    true,
 | |
| 		CompressionEnabled:   true,
 | |
| 		CachingEnabled:       true,
 | |
| 		CacheTTL:            24 * time.Hour,
 | |
| 		CacheSize:           1024 * 1024 * 1024, // 1GB
 | |
| 		IndexingEnabled:     true,
 | |
| 		IndexRefreshInterval: 5 * time.Minute,
 | |
| 		SyncInterval:        10 * time.Minute,
 | |
| 		CompactionInterval:  24 * time.Hour,
 | |
| 		CleanupInterval:     1 * time.Hour,
 | |
| 		BatchSize:           100,
 | |
| 		MaxConcurrentOps:    10,
 | |
| 		OperationTimeout:    30 * time.Second,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NewContextStore creates a new context store with all components
 | |
| func NewContextStore(
 | |
| 	nodeID string,
 | |
| 	localStorage LocalStorage,
 | |
| 	distributedStorage DistributedStorage,
 | |
| 	encryptedStorage EncryptedStorage,
 | |
| 	cacheManager CacheManager,
 | |
| 	indexManager IndexManager,
 | |
| 	backupManager BackupManager,
 | |
| 	eventNotifier EventNotifier,
 | |
| 	options *ContextStoreOptions,
 | |
| ) *ContextStoreImpl {
 | |
| 	if options == nil {
 | |
| 		options = DefaultContextStoreOptions()
 | |
| 	}
 | |
| 
 | |
| 	cs := &ContextStoreImpl{
 | |
| 		localStorage:       localStorage,
 | |
| 		distributedStorage: distributedStorage,
 | |
| 		encryptedStorage:   encryptedStorage,
 | |
| 		cacheManager:       cacheManager,
 | |
| 		indexManager:       indexManager,
 | |
| 		backupManager:      backupManager,
 | |
| 		eventNotifier:      eventNotifier,
 | |
| 		nodeID:            nodeID,
 | |
| 		options:           options,
 | |
| 		statistics: &StorageStatistics{
 | |
| 			LastSyncTime: time.Now(),
 | |
| 		},
 | |
| 		metricsCollector: &MetricsCollector{
 | |
| 			operationCount:   make(map[string]int64),
 | |
| 			latencyHistogram: make(map[string][]time.Duration),
 | |
| 			errorCount:       make(map[string]int64),
 | |
| 			lastCollected:    time.Now(),
 | |
| 		},
 | |
| 		stopCh: make(chan struct{}),
 | |
| 	}
 | |
| 
 | |
| 	// Start background processes
 | |
| 	cs.startBackgroundProcesses()
 | |
| 
 | |
| 	return cs
 | |
| }
 | |
| 
 | |
| // StoreContext stores a context node with role-based encryption
 | |
| func (cs *ContextStoreImpl) StoreContext(
 | |
| 	ctx context.Context,
 | |
| 	node *slurpContext.ContextNode,
 | |
| 	roles []string,
 | |
| ) error {
 | |
| 	start := time.Now()
 | |
| 	defer func() {
 | |
| 		cs.recordLatency("store", time.Since(start))
 | |
| 	}()
 | |
| 
 | |
| 	// Validate input
 | |
| 	if node == nil {
 | |
| 		return fmt.Errorf("context node cannot be nil")
 | |
| 	}
 | |
| 	if len(roles) == 0 {
 | |
| 		return fmt.Errorf("at least one role must be specified")
 | |
| 	}
 | |
| 
 | |
| 	// Generate storage key
 | |
| 	storageKey := cs.generateStorageKey(node.UCXLAddress)
 | |
| 
 | |
| 	// Store based on configuration
 | |
| 	var storeErr error
 | |
| 	if cs.options.EncryptionEnabled {
 | |
| 		// Store encrypted for each role
 | |
| 		storeErr = cs.encryptedStorage.StoreEncrypted(ctx, storageKey, node, roles)
 | |
| 	} else {
 | |
| 		// Store unencrypted
 | |
| 		storeOptions := &StoreOptions{
 | |
| 			Encrypt:     false,
 | |
| 			Replicate:   cs.options.AutoReplicate,
 | |
| 			Index:       cs.options.IndexingEnabled,
 | |
| 			Cache:       cs.options.CachingEnabled,
 | |
| 			Compress:    cs.options.CompressionEnabled,
 | |
| 		}
 | |
| 		storeErr = cs.localStorage.Store(ctx, storageKey, node, storeOptions)
 | |
| 	}
 | |
| 
 | |
| 	if storeErr != nil {
 | |
| 		cs.recordError("store", storeErr)
 | |
| 		return fmt.Errorf("failed to store context: %w", storeErr)
 | |
| 	}
 | |
| 
 | |
| 	// Update search indexes if enabled
 | |
| 	if cs.options.IndexingEnabled {
 | |
| 		if err := cs.updateSearchIndexes(ctx, node); err != nil {
 | |
| 			// Log but don't fail the store operation
 | |
| 			cs.recordError("index_update", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Cache the context if enabled
 | |
| 	if cs.options.CachingEnabled {
 | |
| 		for _, role := range roles {
 | |
| 			cacheKey := cs.generateCacheKey(node.UCXLAddress, role)
 | |
| 			if err := cs.cacheManager.Set(ctx, cacheKey, node, cs.options.CacheTTL); err != nil {
 | |
| 				// Log but don't fail
 | |
| 				cs.recordError("cache_set", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Replicate to distributed storage if enabled
 | |
| 	if cs.options.AutoReplicate && cs.distributedStorage != nil {
 | |
| 		go func() {
 | |
| 			replicateCtx, cancel := context.WithTimeout(context.Background(), cs.options.OperationTimeout)
 | |
| 			defer cancel()
 | |
| 			
 | |
| 			distOptions := &DistributedStoreOptions{
 | |
| 				ReplicationFactor: cs.options.DefaultReplicas,
 | |
| 				ConsistencyLevel:  ConsistencyQuorum,
 | |
| 				Timeout:          cs.options.OperationTimeout,
 | |
| 				SyncMode:         SyncAsync,
 | |
| 			}
 | |
| 			
 | |
| 			if err := cs.distributedStorage.Store(replicateCtx, storageKey, node, distOptions); err != nil {
 | |
| 				cs.recordError("replicate", err)
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	// Notify event listeners
 | |
| 	event := &StorageEvent{
 | |
| 		Type:      EventStored,
 | |
| 		Key:       storageKey,
 | |
| 		Data:      node,
 | |
| 		Timestamp: time.Now(),
 | |
| 		Metadata: map[string]interface{}{
 | |
| 			"roles":        roles,
 | |
| 			"ucxl_address": node.UCXLAddress.String(),
 | |
| 			"node_id":      cs.nodeID,
 | |
| 		},
 | |
| 	}
 | |
| 	cs.eventNotifier.NotifyStored(ctx, event)
 | |
| 
 | |
| 	cs.recordOperation("store")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // RetrieveContext retrieves context for a UCXL address and role
 | |
| func (cs *ContextStoreImpl) RetrieveContext(
 | |
| 	ctx context.Context,
 | |
| 	address ucxl.Address,
 | |
| 	role string,
 | |
| ) (*slurpContext.ContextNode, error) {
 | |
| 	start := time.Now()
 | |
| 	defer func() {
 | |
| 		cs.recordLatency("retrieve", time.Since(start))
 | |
| 	}()
 | |
| 
 | |
| 	storageKey := cs.generateStorageKey(address)
 | |
| 	cacheKey := cs.generateCacheKey(address, role)
 | |
| 
 | |
| 	// Try cache first if enabled
 | |
| 	if cs.options.CachingEnabled {
 | |
| 		if cachedData, found, err := cs.cacheManager.Get(ctx, cacheKey); err == nil && found {
 | |
| 			if contextNode, ok := cachedData.(*slurpContext.ContextNode); ok {
 | |
| 				cs.recordOperation("cache_hit")
 | |
| 				return contextNode, nil
 | |
| 			}
 | |
| 		}
 | |
| 		cs.recordOperation("cache_miss")
 | |
| 	}
 | |
| 
 | |
| 	// Retrieve from appropriate storage
 | |
| 	var retrievedData interface{}
 | |
| 	var retrieveErr error
 | |
| 
 | |
| 	if cs.options.EncryptionEnabled {
 | |
| 		// Retrieve and decrypt for role
 | |
| 		retrievedData, retrieveErr = cs.encryptedStorage.RetrieveDecrypted(ctx, storageKey, role)
 | |
| 	} else if cs.options.PreferLocal {
 | |
| 		// Try local first
 | |
| 		retrievedData, retrieveErr = cs.localStorage.Retrieve(ctx, storageKey)
 | |
| 		if retrieveErr != nil && cs.distributedStorage != nil {
 | |
| 			// Fallback to distributed
 | |
| 			retrievedData, retrieveErr = cs.distributedStorage.Retrieve(ctx, storageKey)
 | |
| 		}
 | |
| 	} else {
 | |
| 		// Try distributed first
 | |
| 		if cs.distributedStorage != nil {
 | |
| 			retrievedData, retrieveErr = cs.distributedStorage.Retrieve(ctx, storageKey)
 | |
| 		}
 | |
| 		if retrieveErr != nil {
 | |
| 			// Fallback to local
 | |
| 			retrievedData, retrieveErr = cs.localStorage.Retrieve(ctx, storageKey)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if retrieveErr != nil {
 | |
| 		cs.recordError("retrieve", retrieveErr)
 | |
| 		return nil, fmt.Errorf("failed to retrieve context: %w", retrieveErr)
 | |
| 	}
 | |
| 
 | |
| 	// Cast to context node
 | |
| 	contextNode, ok := retrievedData.(*slurpContext.ContextNode)
 | |
| 	if !ok {
 | |
| 		return nil, fmt.Errorf("invalid context node type")
 | |
| 	}
 | |
| 
 | |
| 	// Cache the result if caching is enabled
 | |
| 	if cs.options.CachingEnabled {
 | |
| 		if err := cs.cacheManager.Set(ctx, cacheKey, contextNode, cs.options.CacheTTL); err != nil {
 | |
| 			cs.recordError("cache_set", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Notify event listeners
 | |
| 	event := &StorageEvent{
 | |
| 		Type:      EventRetrieved,
 | |
| 		Key:       storageKey,
 | |
| 		Data:      contextNode,
 | |
| 		Timestamp: time.Now(),
 | |
| 		Metadata: map[string]interface{}{
 | |
| 			"role":         role,
 | |
| 			"ucxl_address": address.String(),
 | |
| 			"node_id":      cs.nodeID,
 | |
| 		},
 | |
| 	}
 | |
| 	cs.eventNotifier.NotifyRetrieved(ctx, event)
 | |
| 
 | |
| 	cs.recordOperation("retrieve")
 | |
| 	return contextNode, nil
 | |
| }
 | |
| 
 | |
| // UpdateContext updates an existing context node
 | |
| func (cs *ContextStoreImpl) UpdateContext(
 | |
| 	ctx context.Context,
 | |
| 	node *slurpContext.ContextNode,
 | |
| 	roles []string,
 | |
| ) error {
 | |
| 	start := time.Now()
 | |
| 	defer func() {
 | |
| 		cs.recordLatency("update", time.Since(start))
 | |
| 	}()
 | |
| 
 | |
| 	// Check if context exists
 | |
| 	storageKey := cs.generateStorageKey(node.UCXLAddress)
 | |
| 	exists, err := cs.ExistsContext(ctx, node.UCXLAddress)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to check context existence: %w", err)
 | |
| 	}
 | |
| 	if !exists {
 | |
| 		return fmt.Errorf("context does not exist for address: %s", node.UCXLAddress.String())
 | |
| 	}
 | |
| 
 | |
| 	// Update is essentially a store operation with additional logic
 | |
| 	if err := cs.StoreContext(ctx, node, roles); err != nil {
 | |
| 		return fmt.Errorf("failed to update context: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Invalidate cache entries
 | |
| 	if cs.options.CachingEnabled {
 | |
| 		for _, role := range roles {
 | |
| 			cacheKey := cs.generateCacheKey(node.UCXLAddress, role)
 | |
| 			if err := cs.cacheManager.Delete(ctx, cacheKey); err != nil {
 | |
| 				cs.recordError("cache_delete", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Notify update event
 | |
| 	event := &StorageEvent{
 | |
| 		Type:      EventUpdated,
 | |
| 		Key:       storageKey,
 | |
| 		Data:      node,
 | |
| 		Timestamp: time.Now(),
 | |
| 		Metadata: map[string]interface{}{
 | |
| 			"roles":        roles,
 | |
| 			"ucxl_address": node.UCXLAddress.String(),
 | |
| 			"node_id":      cs.nodeID,
 | |
| 		},
 | |
| 	}
 | |
| 	cs.eventNotifier.NotifyUpdated(ctx, event)
 | |
| 
 | |
| 	cs.recordOperation("update")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // DeleteContext removes a context node from storage
 | |
| func (cs *ContextStoreImpl) DeleteContext(
 | |
| 	ctx context.Context,
 | |
| 	address ucxl.Address,
 | |
| ) error {
 | |
| 	start := time.Now()
 | |
| 	defer func() {
 | |
| 		cs.recordLatency("delete", time.Since(start))
 | |
| 	}()
 | |
| 
 | |
| 	storageKey := cs.generateStorageKey(address)
 | |
| 
 | |
| 	// Delete from all storage layers
 | |
| 	var errors []error
 | |
| 
 | |
| 	// Delete from local storage
 | |
| 	if err := cs.localStorage.Delete(ctx, storageKey); err != nil {
 | |
| 		errors = append(errors, fmt.Errorf("local delete failed: %w", err))
 | |
| 	}
 | |
| 
 | |
| 	// Delete from distributed storage if available
 | |
| 	if cs.distributedStorage != nil {
 | |
| 		if err := cs.distributedStorage.Delete(ctx, storageKey); err != nil {
 | |
| 			errors = append(errors, fmt.Errorf("distributed delete failed: %w", err))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Delete from cache (all role variants)
 | |
| 	if cs.options.CachingEnabled {
 | |
| 		cachePattern := fmt.Sprintf("context:%s:*", address.String())
 | |
| 		if err := cs.cacheManager.DeletePattern(ctx, cachePattern); err != nil {
 | |
| 			errors = append(errors, fmt.Errorf("cache delete failed: %w", err))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Remove from search indexes
 | |
| 	if cs.options.IndexingEnabled {
 | |
| 		indexes, err := cs.indexManager.ListIndexes(ctx)
 | |
| 		if err == nil {
 | |
| 			for _, indexName := range indexes {
 | |
| 				if err := cs.indexManager.DeleteFromIndex(ctx, indexName, storageKey); err != nil {
 | |
| 					errors = append(errors, fmt.Errorf("index delete failed: %w", err))
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if len(errors) > 0 {
 | |
| 		// At least one deletion failed
 | |
| 		cs.recordError("delete", fmt.Errorf("partial delete failure: %v", errors))
 | |
| 		// Don't return error if at least one deletion succeeded
 | |
| 		// Log the issues but allow the operation to continue
 | |
| 	}
 | |
| 
 | |
| 	// Notify deletion event
 | |
| 	event := &StorageEvent{
 | |
| 		Type:      EventDeleted,
 | |
| 		Key:       storageKey,
 | |
| 		Timestamp: time.Now(),
 | |
| 		Metadata: map[string]interface{}{
 | |
| 			"ucxl_address": address.String(),
 | |
| 			"node_id":      cs.nodeID,
 | |
| 		},
 | |
| 	}
 | |
| 	cs.eventNotifier.NotifyDeleted(ctx, event)
 | |
| 
 | |
| 	cs.recordOperation("delete")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ExistsContext checks if context exists for an address
 | |
| func (cs *ContextStoreImpl) ExistsContext(
 | |
| 	ctx context.Context,
 | |
| 	address ucxl.Address,
 | |
| ) (bool, error) {
 | |
| 	storageKey := cs.generateStorageKey(address)
 | |
| 
 | |
| 	// Check local storage first if preferring local
 | |
| 	if cs.options.PreferLocal {
 | |
| 		if exists, err := cs.localStorage.Exists(ctx, storageKey); err == nil {
 | |
| 			return exists, nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Check distributed storage if available
 | |
| 	if cs.distributedStorage != nil {
 | |
| 		if exists, err := cs.distributedStorage.Exists(ctx, storageKey); err == nil {
 | |
| 			return exists, nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Fallback to local if not preferring local
 | |
| 	if !cs.options.PreferLocal {
 | |
| 		return cs.localStorage.Exists(ctx, storageKey)
 | |
| 	}
 | |
| 
 | |
| 	return false, nil
 | |
| }
 | |
| 
 | |
| // Additional methods would continue here...
 | |
| // This is a comprehensive implementation showing the multi-tier architecture
 | |
| 
 | |
| // Helper methods
 | |
| 
 | |
| func (cs *ContextStoreImpl) generateStorageKey(address ucxl.Address) string {
 | |
| 	return fmt.Sprintf("context:%s", address.String())
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) generateCacheKey(address ucxl.Address, role string) string {
 | |
| 	return fmt.Sprintf("context:%s:role:%s", address.String(), role)
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) updateSearchIndexes(ctx context.Context, node *slurpContext.ContextNode) error {
 | |
| 	// Update various search indexes
 | |
| 	indexes, err := cs.indexManager.ListIndexes(ctx)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to list indexes: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	for _, indexName := range indexes {
 | |
| 		storageKey := cs.generateStorageKey(node.UCXLAddress)
 | |
| 		if err := cs.indexManager.UpdateIndex(ctx, indexName, storageKey, node); err != nil {
 | |
| 			// Log but continue with other indexes
 | |
| 			cs.recordError("index_update", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) recordOperation(operation string) {
 | |
| 	cs.metricsCollector.mu.Lock()
 | |
| 	defer cs.metricsCollector.mu.Unlock()
 | |
| 	cs.metricsCollector.operationCount[operation]++
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) recordLatency(operation string, latency time.Duration) {
 | |
| 	cs.metricsCollector.mu.Lock()
 | |
| 	defer cs.metricsCollector.mu.Unlock()
 | |
| 	
 | |
| 	if cs.metricsCollector.latencyHistogram[operation] == nil {
 | |
| 		cs.metricsCollector.latencyHistogram[operation] = make([]time.Duration, 0, 100)
 | |
| 	}
 | |
| 	
 | |
| 	// Keep only last 100 samples
 | |
| 	histogram := cs.metricsCollector.latencyHistogram[operation]
 | |
| 	if len(histogram) >= 100 {
 | |
| 		histogram = histogram[1:]
 | |
| 	}
 | |
| 	histogram = append(histogram, latency)
 | |
| 	cs.metricsCollector.latencyHistogram[operation] = histogram
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) recordError(operation string, err error) {
 | |
| 	cs.metricsCollector.mu.Lock()
 | |
| 	defer cs.metricsCollector.mu.Unlock()
 | |
| 	cs.metricsCollector.errorCount[operation]++
 | |
| 	
 | |
| 	// Log the error (in production, use proper logging)
 | |
| 	fmt.Printf("Storage error in %s: %v\n", operation, err)
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) startBackgroundProcesses() {
 | |
| 	// Sync process
 | |
| 	if cs.options.SyncInterval > 0 {
 | |
| 		cs.syncTicker = time.NewTicker(cs.options.SyncInterval)
 | |
| 		go cs.syncProcess()
 | |
| 	}
 | |
| 
 | |
| 	// Compaction process
 | |
| 	if cs.options.CompactionInterval > 0 {
 | |
| 		cs.compactionTicker = time.NewTicker(cs.options.CompactionInterval)
 | |
| 		go cs.compactionProcess()
 | |
| 	}
 | |
| 
 | |
| 	// Cleanup process
 | |
| 	if cs.options.CleanupInterval > 0 {
 | |
| 		cs.cleanupTicker = time.NewTicker(cs.options.CleanupInterval)
 | |
| 		go cs.cleanupProcess()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) syncProcess() {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-cs.syncTicker.C:
 | |
| 			ctx, cancel := context.WithTimeout(context.Background(), cs.options.OperationTimeout)
 | |
| 			if err := cs.Sync(ctx); err != nil {
 | |
| 				cs.recordError("sync", err)
 | |
| 			}
 | |
| 			cancel()
 | |
| 		case <-cs.stopCh:
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) compactionProcess() {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-cs.compactionTicker.C:
 | |
| 			ctx, cancel := context.WithTimeout(context.Background(), cs.options.OperationTimeout*2)
 | |
| 			if err := cs.localStorage.Compact(ctx); err != nil {
 | |
| 				cs.recordError("compaction", err)
 | |
| 			}
 | |
| 			cancel()
 | |
| 		case <-cs.stopCh:
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) cleanupProcess() {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-cs.cleanupTicker.C:
 | |
| 			ctx, cancel := context.WithTimeout(context.Background(), cs.options.OperationTimeout)
 | |
| 			cs.performCleanup(ctx)
 | |
| 			cancel()
 | |
| 		case <-cs.stopCh:
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) performCleanup(ctx context.Context) {
 | |
| 	// Clean expired cache entries
 | |
| 	if err := cs.cacheManager.Clear(ctx); err != nil {
 | |
| 		cs.recordError("cache_cleanup", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Clean old metrics
 | |
| 	cs.cleanupMetrics()
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) cleanupMetrics() {
 | |
| 	cs.metricsCollector.mu.Lock()
 | |
| 	defer cs.metricsCollector.mu.Unlock()
 | |
| 	
 | |
| 	// Reset histograms that are too large
 | |
| 	for operation, histogram := range cs.metricsCollector.latencyHistogram {
 | |
| 		if len(histogram) > 1000 {
 | |
| 			// Keep only the most recent 100 samples
 | |
| 			cs.metricsCollector.latencyHistogram[operation] = histogram[len(histogram)-100:]
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // GetStorageStats returns storage statistics and health information
 | |
| func (cs *ContextStoreImpl) GetStorageStats(ctx context.Context) (*StorageStatistics, error) {
 | |
| 	cs.mu.RLock()
 | |
| 	defer cs.mu.RUnlock()
 | |
| 
 | |
| 	// Aggregate stats from all storage layers
 | |
| 	localStats, err := cs.localStorage.GetLocalStats()
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to get local stats: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Update main statistics
 | |
| 	cs.statistics.LocalContexts = localStats.TotalFiles
 | |
| 	cs.statistics.TotalSize = localStats.TotalSize
 | |
| 	cs.statistics.CompressedSize = localStats.CompressedSize
 | |
| 	cs.statistics.AvailableSpace = localStats.AvailableSpace
 | |
| 	cs.statistics.AverageLatency = cs.calculateAverageLatency()
 | |
| 	cs.statistics.OperationsPerSecond = cs.calculateOpsPerSecond()
 | |
| 
 | |
| 	if cs.distributedStorage != nil {
 | |
| 		distStats, err := cs.distributedStorage.GetDistributedStats()
 | |
| 		if err == nil {
 | |
| 			cs.statistics.DistributedContexts = distStats.TotalReplicas
 | |
| 			cs.statistics.ReplicationFactor = float64(distStats.TotalReplicas) / float64(localStats.TotalFiles)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if cs.options.CachingEnabled {
 | |
| 		cacheStats, err := cs.cacheManager.GetCacheStats()
 | |
| 		if err == nil {
 | |
| 			cs.statistics.CacheSize = cacheStats.CurrentSize
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Return a copy
 | |
| 	statsCopy := *cs.statistics
 | |
| 	return &statsCopy, nil
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) calculateAverageLatency() time.Duration {
 | |
| 	cs.metricsCollector.mu.RLock()
 | |
| 	defer cs.metricsCollector.mu.RUnlock()
 | |
| 
 | |
| 	var totalLatency time.Duration
 | |
| 	var totalSamples int
 | |
| 
 | |
| 	for _, histogram := range cs.metricsCollector.latencyHistogram {
 | |
| 		for _, latency := range histogram {
 | |
| 			totalLatency += latency
 | |
| 			totalSamples++
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if totalSamples == 0 {
 | |
| 		return 0
 | |
| 	}
 | |
| 	return totalLatency / time.Duration(totalSamples)
 | |
| }
 | |
| 
 | |
| func (cs *ContextStoreImpl) calculateOpsPerSecond() float64 {
 | |
| 	cs.metricsCollector.mu.RLock()
 | |
| 	defer cs.metricsCollector.mu.RUnlock()
 | |
| 
 | |
| 	timeSinceLastCollection := time.Since(cs.metricsCollector.lastCollected)
 | |
| 	if timeSinceLastCollection == 0 {
 | |
| 		return 0
 | |
| 	}
 | |
| 
 | |
| 	totalOps := int64(0)
 | |
| 	for _, count := range cs.metricsCollector.operationCount {
 | |
| 		totalOps += count
 | |
| 	}
 | |
| 
 | |
| 	return float64(totalOps) / timeSinceLastCollection.Seconds()
 | |
| }
 | |
| 
 | |
| // Sync synchronizes with distributed storage
 | |
| func (cs *ContextStoreImpl) Sync(ctx context.Context) error {
 | |
| 	if cs.distributedStorage == nil {
 | |
| 		return nil // No distributed storage to sync with
 | |
| 	}
 | |
| 
 | |
| 	start := time.Now()
 | |
| 	defer func() {
 | |
| 		cs.statistics.LastSyncTime = time.Now()
 | |
| 	}()
 | |
| 
 | |
| 	if err := cs.distributedStorage.Sync(ctx); err != nil {
 | |
| 		cs.statistics.SyncErrors++
 | |
| 		return fmt.Errorf("distributed sync failed: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Notify sync event
 | |
| 	event := &StorageEvent{
 | |
| 		Type:      EventSynced,
 | |
| 		Timestamp: time.Now(),
 | |
| 		Metadata: map[string]interface{}{
 | |
| 			"node_id":    cs.nodeID,
 | |
| 			"sync_time": time.Since(start),
 | |
| 		},
 | |
| 	}
 | |
| 	cs.eventNotifier.NotifyStored(ctx, event) // Reuse stored notification
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Close shuts down the context store and cleans up resources
 | |
| func (cs *ContextStoreImpl) Close() error {
 | |
| 	// Signal background processes to stop
 | |
| 	close(cs.stopCh)
 | |
| 
 | |
| 	// Stop tickers
 | |
| 	if cs.syncTicker != nil {
 | |
| 		cs.syncTicker.Stop()
 | |
| 	}
 | |
| 	if cs.compactionTicker != nil {
 | |
| 		cs.compactionTicker.Stop()
 | |
| 	}
 | |
| 	if cs.cleanupTicker != nil {
 | |
| 		cs.cleanupTicker.Stop()
 | |
| 	}
 | |
| 
 | |
| 	// Close storage implementations if they support it
 | |
| 	if closer, ok := cs.localStorage.(*LocalStorageImpl); ok {
 | |
| 		if err := closer.Close(); err != nil {
 | |
| 			return fmt.Errorf("failed to close local storage: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |