package storage

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/anthonyrawlins/bzzz/pkg/dht"
)

// DistributedStorageImpl implements the DistributedStorage interface.
type DistributedStorageImpl struct {
	mu        sync.RWMutex
	dht       dht.DHT
	nodeID    string
	metrics   *DistributedStorageStats
	replicas  map[string][]string // key -> replica node IDs
	heartbeat *HeartbeatManager
	consensus *ConsensusManager
	options   *DistributedStoreOptions
}

// HeartbeatManager manages node heartbeats and health.
type HeartbeatManager struct {
	mu                sync.RWMutex
	nodes             map[string]*NodeHealth
	heartbeatInterval time.Duration
	timeoutThreshold  time.Duration
	stopCh            chan struct{}
}

// NodeHealth tracks the health of a distributed storage node.
type NodeHealth struct {
	NodeID       string        `json:"node_id"`
	LastSeen     time.Time     `json:"last_seen"`
	Latency      time.Duration `json:"latency"`
	IsActive     bool          `json:"is_active"`
	FailureCount int           `json:"failure_count"`
	Load         float64       `json:"load"`
}

// ConsensusManager handles consensus operations for distributed storage.
type ConsensusManager struct {
	mu            sync.RWMutex
	pendingOps    map[string]*ConsensusOperation
	votingTimeout time.Duration
	quorumSize    int
}

// ConsensusOperation represents a distributed operation requiring consensus.
type ConsensusOperation struct {
	ID        string            `json:"id"`
	Type      string            `json:"type"`
	Key       string            `json:"key"`
	Data      interface{}       `json:"data"`
	Initiator string            `json:"initiator"`
	Votes     map[string]bool   `json:"votes"`
	CreatedAt time.Time         `json:"created_at"`
	Status    ConsensusStatus   `json:"status"`
	Callback  func(bool, error) `json:"-"`
}

// ConsensusStatus represents the status of a consensus operation.
type ConsensusStatus string

const (
	ConsensusPending  ConsensusStatus = "pending"
	ConsensusApproved ConsensusStatus = "approved"
	ConsensusRejected ConsensusStatus = "rejected"
	ConsensusTimeout  ConsensusStatus = "timeout"
)

// NewDistributedStorage creates a new distributed storage implementation.
func NewDistributedStorage(
	dht dht.DHT,
	nodeID string,
	options *DistributedStoreOptions,
) *DistributedStorageImpl {
	if options == nil {
		options = &DistributedStoreOptions{
			ReplicationFactor: 3,
			ConsistencyLevel:  ConsistencyQuorum,
			Timeout:           30 * time.Second,
			PreferLocal:       true,
			SyncMode:          SyncAsync,
		}
	}

	ds := &DistributedStorageImpl{
		dht:      dht,
		nodeID:   nodeID,
		options:  options,
		replicas: make(map[string][]string),
		metrics: &DistributedStorageStats{
			LastRebalance: time.Now(),
		},
		heartbeat: &HeartbeatManager{
			nodes:             make(map[string]*NodeHealth),
			heartbeatInterval: 30 * time.Second,
			timeoutThreshold:  90 * time.Second,
			stopCh:            make(chan struct{}),
		},
		consensus: &ConsensusManager{
			pendingOps:    make(map[string]*ConsensusOperation),
			votingTimeout: 10 * time.Second,
			quorumSize:    (options.ReplicationFactor / 2) + 1,
		},
	}

	// Start background processes
	go ds.heartbeat.start()
	go ds.consensusMonitor()
	go ds.rebalanceMonitor()

	return ds
}
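// exampleUsage is an illustrative sketch only (not part of the
// DistributedStorage interface): it shows how a caller might wire up the
// store and perform a simple round trip. The concrete dht.DHT value, the
// node ID, and the key used here are placeholders.
func exampleUsage(ctx context.Context, d dht.DHT) error {
	store := NewDistributedStorage(d, "node-1", nil) // nil -> default options

	// Store with the default (quorum) consistency level.
	if err := store.Store(ctx, "example-key", map[string]string{"hello": "world"}, nil); err != nil {
		return err
	}

	// Read the value back; with PreferLocal enabled this tries the local DHT first.
	_, err := store.Retrieve(ctx, "example-key")
	return err
}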
// Store stores data in the distributed DHT with replication.
func (ds *DistributedStorageImpl) Store(
	ctx context.Context,
	key string,
	data interface{},
	options *DistributedStoreOptions,
) error {
	start := time.Now()
	defer func() {
		ds.updateLatencyMetrics(time.Since(start))
	}()

	if options == nil {
		options = ds.options
	}

	// Serialize data
	dataBytes, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("failed to marshal data: %w", err)
	}

	// Create distributed entry
	entry := &DistributedEntry{
		Key:               key,
		Data:              dataBytes,
		ReplicationFactor: options.ReplicationFactor,
		ConsistencyLevel:  options.ConsistencyLevel,
		CreatedAt:         time.Now(),
		Version:           1,
		Checksum:          ds.calculateChecksum(dataBytes),
	}

	// Determine target nodes for replication
	targetNodes, err := ds.selectReplicationNodes(key, options.ReplicationFactor)
	if err != nil {
		return fmt.Errorf("failed to select replication nodes: %w", err)
	}

	// Store based on consistency level
	switch options.ConsistencyLevel {
	case ConsistencyEventual:
		return ds.storeEventual(ctx, entry, targetNodes)
	case ConsistencyStrong:
		return ds.storeStrong(ctx, entry, targetNodes)
	case ConsistencyQuorum:
		return ds.storeQuorum(ctx, entry, targetNodes)
	default:
		return fmt.Errorf("unsupported consistency level: %s", options.ConsistencyLevel)
	}
}

// Retrieve retrieves data from the distributed DHT.
func (ds *DistributedStorageImpl) Retrieve(
	ctx context.Context,
	key string,
) (interface{}, error) {
	start := time.Now()
	defer func() {
		ds.updateLatencyMetrics(time.Since(start))
	}()

	// Try local first if prefer-local is enabled
	if ds.options.PreferLocal {
		if localData, err := ds.dht.Get(key); err == nil {
			return ds.deserializeEntry(localData)
		}
	}

	// Get replica nodes for this key
	replicas, err := ds.getReplicationNodes(key)
	if err != nil {
		return nil, fmt.Errorf("failed to get replication nodes: %w", err)
	}

	// Retrieve from replicas
	return ds.retrieveFromReplicas(ctx, key, replicas)
}

// Delete removes data from the distributed DHT.
func (ds *DistributedStorageImpl) Delete(
	ctx context.Context,
	key string,
) error {
	// Get replica nodes
	replicas, err := ds.getReplicationNodes(key)
	if err != nil {
		return fmt.Errorf("failed to get replication nodes: %w", err)
	}

	// Create consensus operation for deletion
	opID := ds.generateOperationID()
	op := &ConsensusOperation{
		ID:        opID,
		Type:      "delete",
		Key:       key,
		Initiator: ds.nodeID,
		Votes:     make(map[string]bool),
		CreatedAt: time.Now(),
		Status:    ConsensusPending,
	}

	// Execute consensus deletion
	return ds.executeConsensusOperation(ctx, op, replicas)
}

// Exists checks if data exists in the DHT.
func (ds *DistributedStorageImpl) Exists(
	ctx context.Context,
	key string,
) (bool, error) {
	// Try local first
	if ds.options.PreferLocal {
		if exists, err := ds.dht.Exists(key); err == nil {
			return exists, nil
		}
	}

	// Check replicas
	replicas, err := ds.getReplicationNodes(key)
	if err != nil {
		return false, fmt.Errorf("failed to get replication nodes: %w", err)
	}

	for _, nodeID := range replicas {
		if exists, err := ds.checkExistsOnNode(ctx, nodeID, key); err == nil && exists {
			return true, nil
		}
	}

	return false, nil
}

// Replicate ensures data is replicated across nodes.
func (ds *DistributedStorageImpl) Replicate(
	ctx context.Context,
	key string,
	replicationFactor int,
) error {
	// Get current replicas
	currentReplicas, err := ds.getReplicationNodes(key)
	if err != nil {
		return fmt.Errorf("failed to get current replicas: %w", err)
	}

	// If we already have enough replicas, return
	if len(currentReplicas) >= replicationFactor {
		return nil
	}

	// Get the data to replicate
	data, err := ds.Retrieve(ctx, key)
	if err != nil {
		return fmt.Errorf("failed to retrieve data for replication: %w", err)
	}

	// Select additional nodes for replication
	neededReplicas := replicationFactor - len(currentReplicas)
	newNodes, err := ds.selectAdditionalNodes(key, currentReplicas, neededReplicas)
	if err != nil {
		return fmt.Errorf("failed to select additional nodes: %w", err)
	}

	// Replicate to new nodes
	for _, nodeID := range newNodes {
		if err := ds.replicateToNode(ctx, nodeID, key, data); err != nil {
			// Log but continue with other nodes
			fmt.Printf("Failed to replicate to node %s: %v\n", nodeID, err)
			continue
		}
		currentReplicas = append(currentReplicas, nodeID)
	}

	// Update replica tracking
	ds.mu.Lock()
	ds.replicas[key] = currentReplicas
	ds.mu.Unlock()

	return nil
}
// FindReplicas finds all replicas of data.
func (ds *DistributedStorageImpl) FindReplicas(
	ctx context.Context,
	key string,
) ([]string, error) {
	return ds.getReplicationNodes(key)
}

// Sync synchronizes with other DHT nodes.
func (ds *DistributedStorageImpl) Sync(ctx context.Context) error {
	defer func() {
		ds.mu.Lock()
		ds.metrics.LastRebalance = time.Now()
		ds.mu.Unlock()
	}()

	// Get list of active nodes
	activeNodes := ds.heartbeat.getActiveNodes()

	// Sync with each active node
	for _, nodeID := range activeNodes {
		if nodeID == ds.nodeID {
			continue // Skip self
		}

		if err := ds.syncWithNode(ctx, nodeID); err != nil {
			// Log but continue with other nodes
			fmt.Printf("Failed to sync with node %s: %v\n", nodeID, err)
			continue
		}
	}

	return nil
}

// GetDistributedStats returns distributed storage statistics.
func (ds *DistributedStorageImpl) GetDistributedStats() (*DistributedStorageStats, error) {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	// Update current stats
	activeNodes := ds.heartbeat.getActiveNodes()
	ds.heartbeat.mu.RLock()
	totalNodes := len(ds.heartbeat.nodes)
	ds.heartbeat.mu.RUnlock()

	ds.metrics.ActiveNodes = len(activeNodes)
	ds.metrics.TotalNodes = totalNodes
	ds.metrics.FailedNodes = ds.metrics.TotalNodes - ds.metrics.ActiveNodes

	// Calculate replica health
	totalReplicas := int64(0)
	healthyReplicas := int64(0)
	underReplicated := int64(0)

	for _, replicas := range ds.replicas {
		totalReplicas += int64(len(replicas))
		healthy := 0
		for _, nodeID := range replicas {
			if ds.heartbeat.isNodeHealthy(nodeID) {
				healthy++
			}
		}
		healthyReplicas += int64(healthy)
		if healthy < ds.options.ReplicationFactor {
			underReplicated++
		}
	}

	ds.metrics.TotalReplicas = totalReplicas
	ds.metrics.HealthyReplicas = healthyReplicas
	ds.metrics.UnderReplicated = underReplicated

	// Return a copy
	statsCopy := *ds.metrics
	return &statsCopy, nil
}

// DistributedEntry represents a distributed storage entry.
type DistributedEntry struct {
	Key               string           `json:"key"`
	Data              []byte           `json:"data"`
	ReplicationFactor int              `json:"replication_factor"`
	ConsistencyLevel  ConsistencyLevel `json:"consistency_level"`
	CreatedAt         time.Time        `json:"created_at"`
	UpdatedAt         time.Time        `json:"updated_at"`
	Version           int64            `json:"version"`
	Checksum          string           `json:"checksum"`
}

// Helper methods implementation

func (ds *DistributedStorageImpl) selectReplicationNodes(key string, replicationFactor int) ([]string, error) {
	// Get active nodes
	activeNodes := ds.heartbeat.getActiveNodes()
	if len(activeNodes) < replicationFactor {
		return nil, fmt.Errorf("insufficient active nodes: need %d, have %d",
			replicationFactor, len(activeNodes))
	}

	// Use consistent hashing to determine primary replicas.
	// This is a simplified version - production would use proper consistent hashing.
	nodes := make([]string, 0, replicationFactor)
	hash := ds.calculateKeyHash(key)

	// Select nodes deterministically based on the key hash
	for i := 0; i < replicationFactor && i < len(activeNodes); i++ {
		nodeIndex := (int(hash%uint32(len(activeNodes))) + i) % len(activeNodes)
		nodes = append(nodes, activeNodes[nodeIndex])
	}

	return nodes, nil
}

func (ds *DistributedStorageImpl) storeEventual(ctx context.Context, entry *DistributedEntry, nodes []string) error {
	// Store asynchronously on all nodes
	errCh := make(chan error, len(nodes))

	for _, nodeID := range nodes {
		go func(node string) {
			errCh <- ds.storeOnNode(ctx, node, entry)
		}(nodeID)
	}

	// Eventual consistency: don't wait for every node, just require that at
	// least one store succeeds before the timeout.
	timer := time.NewTimer(10 * time.Second)
	defer timer.Stop()

	for i := 0; i < len(nodes); i++ {
		select {
		case err := <-errCh:
			if err == nil {
				return nil // First success is enough
			}
		case <-timer.C:
			return fmt.Errorf("timeout waiting for eventual store")
		}
	}

	return fmt.Errorf("failed to store on any node")
}
func (ds *DistributedStorageImpl) storeStrong(ctx context.Context, entry *DistributedEntry, nodes []string) error {
	// Store synchronously on all nodes
	errCh := make(chan error, len(nodes))

	for _, nodeID := range nodes {
		go func(node string) {
			errCh <- ds.storeOnNode(ctx, node, entry)
		}(nodeID)
	}

	// Wait for all nodes to complete
	var errs []error
	for i := 0; i < len(nodes); i++ {
		select {
		case err := <-errCh:
			if err != nil {
				errs = append(errs, err)
			}
		case <-time.After(30 * time.Second):
			return fmt.Errorf("timeout waiting for strong consistency store")
		}
	}

	if len(errs) > 0 {
		return fmt.Errorf("strong consistency store failed: %v", errs)
	}

	return nil
}

func (ds *DistributedStorageImpl) storeQuorum(ctx context.Context, entry *DistributedEntry, nodes []string) error {
	// Store on a quorum of nodes
	quorumSize := (len(nodes) / 2) + 1
	errCh := make(chan error, len(nodes))

	for _, nodeID := range nodes {
		go func(node string) {
			errCh <- ds.storeOnNode(ctx, node, entry)
		}(nodeID)
	}

	// Wait for quorum
	successCount := 0
	errorCount := 0

	for i := 0; i < len(nodes); i++ {
		select {
		case err := <-errCh:
			if err == nil {
				successCount++
				if successCount >= quorumSize {
					return nil // Quorum achieved
				}
			} else {
				errorCount++
				if errorCount > len(nodes)-quorumSize {
					return fmt.Errorf("quorum store failed: too many errors")
				}
			}
		case <-time.After(20 * time.Second):
			return fmt.Errorf("timeout waiting for quorum store")
		}
	}

	return fmt.Errorf("quorum store failed")
}
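// storeWithConsistency is an illustrative sketch (not part of the
// DistributedStorage interface) showing how a caller could override the
// consistency level on a per-write basis. The option values below simply
// mirror the defaults used in NewDistributedStorage and are assumptions,
// not requirements.
func storeWithConsistency(ctx context.Context, ds *DistributedStorageImpl, key string, value interface{}, level ConsistencyLevel) error {
	opts := &DistributedStoreOptions{
		ReplicationFactor: 3,
		ConsistencyLevel:  level, // ConsistencyEventual, ConsistencyQuorum, or ConsistencyStrong
		Timeout:           30 * time.Second,
		PreferLocal:       true,
		SyncMode:          SyncAsync,
	}
	return ds.Store(ctx, key, value, opts)
}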
// Helper and lifecycle methods. Several of the node-level operations below are
// placeholders that outline the intended architecture.

func (ds *DistributedStorageImpl) calculateChecksum(data []byte) string {
	// SHA-256 checksum of the serialized entry, hex-encoded.
	return fmt.Sprintf("%x", sha256.Sum256(data))
}

func (ds *DistributedStorageImpl) calculateKeyHash(key string) uint32 {
	// Simple hash function - production would use proper consistent hashing
	hash := uint32(0)
	for _, c := range key {
		hash = hash*31 + uint32(c)
	}
	return hash
}

func (ds *DistributedStorageImpl) generateOperationID() string {
	return fmt.Sprintf("%s-%d", ds.nodeID, time.Now().UnixNano())
}

func (ds *DistributedStorageImpl) updateLatencyMetrics(latency time.Duration) {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	if ds.metrics.NetworkLatency == 0 {
		ds.metrics.NetworkLatency = latency
	} else {
		// Exponential moving average
		ds.metrics.NetworkLatency = time.Duration(
			float64(ds.metrics.NetworkLatency)*0.8 + float64(latency)*0.2,
		)
	}
}

// Placeholder implementations for remaining methods

func (ds *DistributedStorageImpl) getReplicationNodes(key string) ([]string, error) {
	ds.mu.RLock()
	defer ds.mu.RUnlock()

	if replicas, exists := ds.replicas[key]; exists {
		return replicas, nil
	}

	// Fall back to consistent hashing
	return ds.selectReplicationNodes(key, ds.options.ReplicationFactor)
}

func (ds *DistributedStorageImpl) retrieveFromReplicas(ctx context.Context, key string, replicas []string) (interface{}, error) {
	// Try each replica until one succeeds
	for _, nodeID := range replicas {
		if data, err := ds.retrieveFromNode(ctx, nodeID, key); err == nil {
			return ds.deserializeEntry(data)
		}
	}
	return nil, fmt.Errorf("failed to retrieve from any replica")
}

func (ds *DistributedStorageImpl) deserializeEntry(data interface{}) (interface{}, error) {
	// Deserialize distributed entry (placeholder)
	return data, nil
}

// Heartbeat manager methods

func (hm *HeartbeatManager) start() {
	ticker := time.NewTicker(hm.heartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			hm.checkNodeHealth()
		case <-hm.stopCh:
			return
		}
	}
}

func (hm *HeartbeatManager) getActiveNodes() []string {
	hm.mu.RLock()
	defer hm.mu.RUnlock()

	var activeNodes []string
	for nodeID, health := range hm.nodes {
		if health.IsActive {
			activeNodes = append(activeNodes, nodeID)
		}
	}
	return activeNodes
}

func (hm *HeartbeatManager) isNodeHealthy(nodeID string) bool {
	hm.mu.RLock()
	defer hm.mu.RUnlock()

	health, exists := hm.nodes[nodeID]
	return exists && health.IsActive
}

func (hm *HeartbeatManager) checkNodeHealth() {
	// Placeholder implementation: would send heartbeats and update node health.
}

// Consensus monitor and other background processes

func (ds *DistributedStorageImpl) consensusMonitor() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		ds.cleanupExpiredOperations()
	}
}

func (ds *DistributedStorageImpl) rebalanceMonitor() {
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()

	for range ticker.C {
		ds.rebalanceReplicas()
	}
}

func (ds *DistributedStorageImpl) cleanupExpiredOperations() {
	// Cleanup expired consensus operations (placeholder).
}
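// expirePendingOperations is an illustrative sketch of what
// cleanupExpiredOperations (left as a stub above) might do: mark pending
// operations older than the voting timeout as timed out, notify any callback,
// and drop them. The method name and the notification behaviour are
// assumptions, not part of the original implementation.
func (cm *ConsensusManager) expirePendingOperations(now time.Time) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	for id, op := range cm.pendingOps {
		if op.Status == ConsensusPending && now.Sub(op.CreatedAt) > cm.votingTimeout {
			op.Status = ConsensusTimeout
			if op.Callback != nil {
				op.Callback(false, fmt.Errorf("consensus operation %s timed out", id))
			}
			delete(cm.pendingOps, id)
		}
	}
}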
func (ds *DistributedStorageImpl) rebalanceReplicas() {
	// Rebalance replicas across healthy nodes (placeholder).
}

// Placeholder method stubs for the remaining node-level functionality

func (ds *DistributedStorageImpl) storeOnNode(ctx context.Context, nodeID string, entry *DistributedEntry) error {
	// Store entry on a specific node
	return nil
}

func (ds *DistributedStorageImpl) retrieveFromNode(ctx context.Context, nodeID string, key string) (interface{}, error) {
	// Retrieve from a specific node
	return nil, nil
}

func (ds *DistributedStorageImpl) checkExistsOnNode(ctx context.Context, nodeID string, key string) (bool, error) {
	// Check if the key exists on a specific node
	return false, nil
}

func (ds *DistributedStorageImpl) replicateToNode(ctx context.Context, nodeID string, key string, data interface{}) error {
	// Replicate data to a specific node
	return nil
}

func (ds *DistributedStorageImpl) selectAdditionalNodes(key string, currentReplicas []string, needed int) ([]string, error) {
	// Select additional nodes for replication
	return nil, nil
}

func (ds *DistributedStorageImpl) syncWithNode(ctx context.Context, nodeID string) error {
	// Sync with a specific node
	return nil
}

func (ds *DistributedStorageImpl) executeConsensusOperation(ctx context.Context, op *ConsensusOperation, nodes []string) error {
	// Execute a consensus operation across nodes
	return nil
}
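// monitorLoop is an illustrative operational sketch, not part of the original
// implementation: a caller-side loop that periodically triggers Sync and
// inspects replica health via GetDistributedStats. The interval and the use
// of fmt for logging are assumptions.
func monitorLoop(ctx context.Context, ds *DistributedStorageImpl, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := ds.Sync(ctx); err != nil {
				fmt.Printf("sync failed: %v\n", err)
			}
			if stats, err := ds.GetDistributedStats(); err == nil {
				fmt.Printf("replicas healthy: %d/%d (under-replicated keys: %d)\n",
					stats.HealthyReplicas, stats.TotalReplicas, stats.UnderReplicated)
			}
		}
	}
}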