package storage

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"chorus/pkg/dht"
)

// DistributedStorageImpl implements the DistributedStorage interface
type DistributedStorageImpl struct {
	mu        sync.RWMutex
	dht       dht.DHT
	nodeID    string
	metrics   *DistributedStorageStats
	replicas  map[string][]string // key -> replica node IDs
	heartbeat *HeartbeatManager
	consensus *ConsensusManager
	options   *DistributedStoreOptions
}

// HeartbeatManager manages node heartbeats and health
type HeartbeatManager struct {
	mu                sync.RWMutex
	nodes             map[string]*NodeHealth
	heartbeatInterval time.Duration
	timeoutThreshold  time.Duration
	stopCh            chan struct{}
}

// NodeHealth tracks the health of a distributed storage node
type NodeHealth struct {
	NodeID       string        `json:"node_id"`
	LastSeen     time.Time     `json:"last_seen"`
	Latency      time.Duration `json:"latency"`
	IsActive     bool          `json:"is_active"`
	FailureCount int           `json:"failure_count"`
	Load         float64       `json:"load"`
}

// ConsensusManager handles consensus operations for distributed storage
type ConsensusManager struct {
	mu            sync.RWMutex
	pendingOps    map[string]*ConsensusOperation
	votingTimeout time.Duration
	quorumSize    int
}

// ConsensusOperation represents a distributed operation requiring consensus
type ConsensusOperation struct {
	ID        string            `json:"id"`
	Type      string            `json:"type"`
	Key       string            `json:"key"`
	Data      interface{}       `json:"data"`
	Initiator string            `json:"initiator"`
	Votes     map[string]bool   `json:"votes"`
	CreatedAt time.Time         `json:"created_at"`
	Status    ConsensusStatus   `json:"status"`
	Callback  func(bool, error) `json:"-"`
}

// ConsensusStatus represents the status of a consensus operation
type ConsensusStatus string

const (
	ConsensusPending  ConsensusStatus = "pending"
	ConsensusApproved ConsensusStatus = "approved"
	ConsensusRejected ConsensusStatus = "rejected"
	ConsensusTimeout  ConsensusStatus = "timeout"
)

// NewDistributedStorage creates a new distributed storage implementation
func NewDistributedStorage(
	dht dht.DHT,
	nodeID string,
	options *DistributedStoreOptions,
) *DistributedStorageImpl {
	if options == nil {
		options = &DistributedStoreOptions{
			ReplicationFactor: 3,
			ConsistencyLevel:  ConsistencyQuorum,
			Timeout:           30 * time.Second,
			PreferLocal:       true,
			SyncMode:          SyncAsync,
		}
	}

	ds := &DistributedStorageImpl{
		dht:      dht,
		nodeID:   nodeID,
		options:  options,
		replicas: make(map[string][]string),
		metrics: &DistributedStorageStats{
			LastRebalance: time.Now(),
		},
		heartbeat: &HeartbeatManager{
			nodes:             make(map[string]*NodeHealth),
			heartbeatInterval: 30 * time.Second,
			timeoutThreshold:  90 * time.Second,
			stopCh:            make(chan struct{}),
		},
		consensus: &ConsensusManager{
			pendingOps:    make(map[string]*ConsensusOperation),
			votingTimeout: 10 * time.Second,
			quorumSize:    (options.ReplicationFactor / 2) + 1,
		},
	}

	// Start background processes
	go ds.heartbeat.start()
	go ds.consensusMonitor()
	go ds.rebalanceMonitor()

	return ds
}
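
// Example usage (an illustrative sketch, not part of this package: the DHT
// value, node ID, and payload below are assumptions):
//
//	ds := NewDistributedStorage(myDHT, "node-1", nil)
//	if err := ds.Store(ctx, "some/key", payload, nil); err != nil {
//		// handle error
//	}
//
// With the default ReplicationFactor of 3, quorumSize works out to
// (3/2)+1 = 2, so consensus operations tolerate one unresponsive replica.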

// Store stores data in the distributed DHT with replication
func (ds *DistributedStorageImpl) Store(
	ctx context.Context,
	key string,
	data interface{},
	options *DistributedStoreOptions,
) error {
	start := time.Now()
	defer func() {
		ds.updateLatencyMetrics(time.Since(start))
	}()

	if options == nil {
		options = ds.options
	}

	// Serialize data
	dataBytes, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("failed to marshal data: %w", err)
	}

	// Create distributed entry
	entry := &DistributedEntry{
		Key:               key,
		Data:              dataBytes,
		ReplicationFactor: options.ReplicationFactor,
		ConsistencyLevel:  options.ConsistencyLevel,
		CreatedAt:         time.Now(),
		Version:           1,
		Checksum:          ds.calculateChecksum(dataBytes),
	}

	// Determine target nodes for replication
	targetNodes, err := ds.selectReplicationNodes(key, options.ReplicationFactor)
	if err != nil {
		return fmt.Errorf("failed to select replication nodes: %w", err)
	}

	// Store based on consistency level
	switch options.ConsistencyLevel {
	case ConsistencyEventual:
		return ds.storeEventual(ctx, entry, targetNodes)
	case ConsistencyStrong:
		return ds.storeStrong(ctx, entry, targetNodes)
	case ConsistencyQuorum:
		return ds.storeQuorum(ctx, entry, targetNodes)
	default:
		return fmt.Errorf("unsupported consistency level: %s", options.ConsistencyLevel)
	}
}

// Retrieve retrieves data from the distributed DHT
func (ds *DistributedStorageImpl) Retrieve(
	ctx context.Context,
	key string,
) (interface{}, error) {
	start := time.Now()
	defer func() {
		ds.updateLatencyMetrics(time.Since(start))
	}()

	// Try local first if prefer local is enabled
	if ds.options.PreferLocal {
		if localData, err := ds.dht.Get(key); err == nil {
			return ds.deserializeEntry(localData)
		}
	}

	// Get replica nodes for this key
	replicas, err := ds.getReplicationNodes(key)
	if err != nil {
		return nil, fmt.Errorf("failed to get replication nodes: %w", err)
	}

	// Retrieve from replicas
	return ds.retrieveFromReplicas(ctx, key, replicas)
}

// Delete removes data from the distributed DHT
func (ds *DistributedStorageImpl) Delete(
	ctx context.Context,
	key string,
) error {
	// Get replica nodes
	replicas, err := ds.getReplicationNodes(key)
	if err != nil {
		return fmt.Errorf("failed to get replication nodes: %w", err)
	}

	// Create consensus operation for deletion
	opID := ds.generateOperationID()
	op := &ConsensusOperation{
		ID:        opID,
		Type:      "delete",
		Key:       key,
		Initiator: ds.nodeID,
		Votes:     make(map[string]bool),
		CreatedAt: time.Now(),
		Status:    ConsensusPending,
	}

	// Execute consensus deletion
	return ds.executeConsensusOperation(ctx, op, replicas)
}

// Exists checks if data exists in the DHT
func (ds *DistributedStorageImpl) Exists(
	ctx context.Context,
	key string,
) (bool, error) {
	// Try local first
	if ds.options.PreferLocal {
		if exists, err := ds.dht.Exists(key); err == nil {
			return exists, nil
		}
	}

	// Check replicas
	replicas, err := ds.getReplicationNodes(key)
	if err != nil {
		return false, fmt.Errorf("failed to get replication nodes: %w", err)
	}

	for _, nodeID := range replicas {
		if exists, err := ds.checkExistsOnNode(ctx, nodeID, key); err == nil && exists {
			return true, nil
		}
	}

	return false, nil
}

// Replicate ensures data is replicated across nodes
func (ds *DistributedStorageImpl) Replicate(
	ctx context.Context,
	key string,
	replicationFactor int,
) error {
	// Get current replicas
	currentReplicas, err := ds.getReplicationNodes(key)
	if err != nil {
		return fmt.Errorf("failed to get current replicas: %w", err)
	}

	// If we already have enough replicas, return
	if len(currentReplicas) >= replicationFactor {
		return nil
	}

	// Get the data to replicate
	data, err := ds.Retrieve(ctx, key)
	if err != nil {
		return fmt.Errorf("failed to retrieve data for replication: %w", err)
	}

	// Select additional nodes for replication
	neededReplicas := replicationFactor - len(currentReplicas)
	newNodes, err := ds.selectAdditionalNodes(key, currentReplicas, neededReplicas)
	if err != nil {
		return fmt.Errorf("failed to select additional nodes: %w", err)
	}

	// Replicate to new nodes
	for _, nodeID := range newNodes {
		if err := ds.replicateToNode(ctx, nodeID, key, data); err != nil {
			// Log but continue with other nodes
			fmt.Printf("Failed to replicate to node %s: %v\n", nodeID, err)
			continue
		}
		currentReplicas = append(currentReplicas, nodeID)
	}

	// Update replica tracking
	ds.mu.Lock()
	ds.replicas[key] = currentReplicas
	ds.mu.Unlock()

	return nil
}

// FindReplicas finds all replicas of data
func (ds *DistributedStorageImpl) FindReplicas(
	ctx context.Context,
	key string,
) ([]string, error) {
	return ds.getReplicationNodes(key)
}

// Sync synchronizes with other DHT nodes
func (ds *DistributedStorageImpl) Sync(ctx context.Context) error {
	defer func() {
		ds.metrics.LastRebalance = time.Now()
	}()

	// Get list of active nodes
	activeNodes := ds.heartbeat.getActiveNodes()

	// Sync with each active node
	for _, nodeID := range activeNodes {
		if nodeID == ds.nodeID {
			continue // Skip self
		}

		if err := ds.syncWithNode(ctx, nodeID); err != nil {
			// Log but continue with other nodes
			fmt.Printf("Failed to sync with node %s: %v\n", nodeID, err)
			continue
		}
	}

	return nil
}

// GetDistributedStats returns distributed storage statistics
func (ds *DistributedStorageImpl) GetDistributedStats() (*DistributedStorageStats, error) {
	// Take the write lock: this method updates the cached metrics below.
	ds.mu.Lock()
	defer ds.mu.Unlock()

	// Update current stats
	activeNodes := ds.heartbeat.getActiveNodes()
	ds.metrics.ActiveNodes = len(activeNodes)
	ds.metrics.TotalNodes = len(ds.heartbeat.nodes)
	ds.metrics.FailedNodes = ds.metrics.TotalNodes - ds.metrics.ActiveNodes

	// Calculate replica health
	totalReplicas := int64(0)
	healthyReplicas := int64(0)
	underReplicated := int64(0)

	for _, replicas := range ds.replicas {
		totalReplicas += int64(len(replicas))
		healthy := 0
		for _, nodeID := range replicas {
			if ds.heartbeat.isNodeHealthy(nodeID) {
				healthy++
			}
		}
		healthyReplicas += int64(healthy)
		if healthy < ds.options.ReplicationFactor {
			underReplicated++
		}
	}

	ds.metrics.TotalReplicas = totalReplicas
	ds.metrics.HealthyReplicas = healthyReplicas
	ds.metrics.UnderReplicated = underReplicated

	// Return copy
	statsCopy := *ds.metrics
	return &statsCopy, nil
}

// DistributedEntry represents a distributed storage entry
type DistributedEntry struct {
	Key               string           `json:"key"`
	Data              []byte           `json:"data"`
	ReplicationFactor int              `json:"replication_factor"`
	ConsistencyLevel  ConsistencyLevel `json:"consistency_level"`
	CreatedAt         time.Time        `json:"created_at"`
	UpdatedAt         time.Time        `json:"updated_at"`
	Version           int64            `json:"version"`
	Checksum          string           `json:"checksum"`
}

// Helper methods implementation

func (ds *DistributedStorageImpl) selectReplicationNodes(key string, replicationFactor int) ([]string, error) {
	// Get active nodes
	activeNodes := ds.heartbeat.getActiveNodes()
	if len(activeNodes) < replicationFactor {
		return nil, fmt.Errorf("insufficient active nodes: need %d, have %d", replicationFactor, len(activeNodes))
	}

	// Use consistent hashing to determine primary replicas
	// This is a simplified version - production would use proper consistent hashing
	nodes := make([]string, 0, replicationFactor)
	hash := ds.calculateKeyHash(key)

	// Select nodes in a deterministic way based on key hash
	for i := 0; i < replicationFactor && i < len(activeNodes); i++ {
		nodeIndex := (int(hash) + i) % len(activeNodes)
		nodes = append(nodes, activeNodes[nodeIndex])
	}

	return nodes, nil
}
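
// Note: the modulo walk above shifts many keys when membership changes, and
// it depends on the iteration order of getActiveNodes, which is not stable
// for Go maps. Rendezvous (highest-random-weight) selection is one common
// fix; a minimal sketch, not wired into this file (assumes "hash/fnv" and
// "sort" are imported):
//
//	func rendezvousSelect(key string, nodes []string, n int) []string {
//		type scored struct {
//			node  string
//			score uint64
//		}
//		scores := make([]scored, 0, len(nodes))
//		for _, node := range nodes {
//			h := fnv.New64a()
//			h.Write([]byte(node + "|" + key))
//			scores = append(scores, scored{node, h.Sum64()})
//		}
//		sort.Slice(scores, func(i, j int) bool { return scores[i].score > scores[j].score })
//		if n > len(scores) {
//			n = len(scores)
//		}
//		out := make([]string, 0, n)
//		for _, s := range scores[:n] {
//			out = append(out, s.node)
//		}
//		return out
//	}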

func (ds *DistributedStorageImpl) storeEventual(ctx context.Context, entry *DistributedEntry, nodes []string) error {
	// Store asynchronously on all nodes
	errCh := make(chan error, len(nodes))

	for _, nodeID := range nodes {
		go func(node string) {
			err := ds.storeOnNode(ctx, node, entry)
			errCh <- err
		}(nodeID)
	}

	// Don't wait for all nodes - eventual consistency
	// Just ensure at least one succeeds
	select {
	case err := <-errCh:
		if err == nil {
			return nil // First success
		}
	case <-time.After(5 * time.Second):
		return fmt.Errorf("timeout waiting for eventual store")
	}

	// If the first response failed, wait for at least one success
	timer := time.NewTimer(10 * time.Second)
	defer timer.Stop()

	for i := 1; i < len(nodes); i++ {
		select {
		case err := <-errCh:
			if err == nil {
				return nil
			}
		case <-timer.C:
			return fmt.Errorf("timeout waiting for eventual store success")
		}
	}

	return fmt.Errorf("failed to store on any node")
}

func (ds *DistributedStorageImpl) storeStrong(ctx context.Context, entry *DistributedEntry, nodes []string) error {
	// Store synchronously on all nodes
	errCh := make(chan error, len(nodes))

	for _, nodeID := range nodes {
		go func(node string) {
			err := ds.storeOnNode(ctx, node, entry)
			errCh <- err
		}(nodeID)
	}

	// Wait for all nodes to complete
	var errors []error
	for i := 0; i < len(nodes); i++ {
		select {
		case err := <-errCh:
			if err != nil {
				errors = append(errors, err)
			}
		case <-time.After(30 * time.Second):
			return fmt.Errorf("timeout waiting for strong consistency store")
		}
	}

	if len(errors) > 0 {
		return fmt.Errorf("strong consistency store failed: %v", errors)
	}

	return nil
}

func (ds *DistributedStorageImpl) storeQuorum(ctx context.Context, entry *DistributedEntry, nodes []string) error {
	// Store on quorum of nodes
	quorumSize := (len(nodes) / 2) + 1
	errCh := make(chan error, len(nodes))

	for _, nodeID := range nodes {
		go func(node string) {
			err := ds.storeOnNode(ctx, node, entry)
			errCh <- err
		}(nodeID)
	}

	// Wait for quorum
	successCount := 0
	errorCount := 0

	for i := 0; i < len(nodes); i++ {
		select {
		case err := <-errCh:
			if err == nil {
				successCount++
				if successCount >= quorumSize {
					return nil // Quorum achieved
				}
			} else {
				errorCount++
				if errorCount > len(nodes)-quorumSize {
					return fmt.Errorf("quorum store failed: too many errors")
				}
			}
		case <-time.After(20 * time.Second):
			return fmt.Errorf("timeout waiting for quorum store")
		}
	}

	return fmt.Errorf("quorum store failed")
}
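
// Quorum sizing note: with N replicas and W = (N/2)+1 write acks required
// above, a read that also contacts R = (N/2)+1 replicas satisfies W+R > N,
// so every read quorum overlaps the most recent write quorum. For N = 3,
// W = R = 2: any two replicas read must include at least one of the two
// replicas written.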

// Additional helper method implementations would continue here...
// This is a substantial implementation showing the architecture

func (ds *DistributedStorageImpl) calculateChecksum(data []byte) string {
	// Simple checksum calculation - would use proper hashing in production
	return fmt.Sprintf("%x", len(data)) // Placeholder
}
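
// A content hash is the usual production choice here; the length-based
// placeholder above collides for any two payloads of equal size. A minimal
// sketch (assumes "crypto/sha256" and "encoding/hex" are imported):
//
//	func checksumSHA256(data []byte) string {
//		sum := sha256.Sum256(data)
//		return hex.EncodeToString(sum[:])
//	}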

func (ds *DistributedStorageImpl) calculateKeyHash(key string) uint32 {
	// Simple hash function - would use proper consistent hashing in production
	hash := uint32(0)
	for _, c := range key {
		hash = hash*31 + uint32(c)
	}
	return hash
}
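
// For roughly the same cost, the standard library's FNV-1a disperses keys
// better than the 31x polynomial above; a sketch (assumes "hash/fnv" is
// imported):
//
//	func keyHashFNV(key string) uint32 {
//		h := fnv.New32a()
//		h.Write([]byte(key))
//		return h.Sum32()
//	}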

func (ds *DistributedStorageImpl) generateOperationID() string {
	return fmt.Sprintf("%s-%d", ds.nodeID, time.Now().UnixNano())
}

func (ds *DistributedStorageImpl) updateLatencyMetrics(latency time.Duration) {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	if ds.metrics.NetworkLatency == 0 {
		ds.metrics.NetworkLatency = latency
	} else {
		// Exponential moving average
		ds.metrics.NetworkLatency = time.Duration(
			float64(ds.metrics.NetworkLatency)*0.8 + float64(latency)*0.2,
		)
	}
}

// Placeholder implementations for remaining methods

func (ds *DistributedStorageImpl) getReplicationNodes(key string) ([]string, error) {
	ds.mu.RLock()
	defer ds.mu.RUnlock()

	if replicas, exists := ds.replicas[key]; exists {
		return replicas, nil
	}

	// Fall back to consistent hashing
	return ds.selectReplicationNodes(key, ds.options.ReplicationFactor)
}

func (ds *DistributedStorageImpl) retrieveFromReplicas(ctx context.Context, key string, replicas []string) (interface{}, error) {
	// Try each replica until success
	for _, nodeID := range replicas {
		if data, err := ds.retrieveFromNode(ctx, nodeID, key); err == nil {
			return ds.deserializeEntry(data)
		}
	}
	return nil, fmt.Errorf("failed to retrieve from any replica")
}

func (ds *DistributedStorageImpl) deserializeEntry(data interface{}) (interface{}, error) {
	// Deserialize distributed entry
	return data, nil // Placeholder
}
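
// A complete deserializeEntry would decode the stored DistributedEntry and
// verify its checksum before handing back the payload. A sketch under the
// assumption that nodes return the JSON-encoded entry as []byte:
//
//	func (ds *DistributedStorageImpl) deserializeEntryBytes(raw []byte) (interface{}, error) {
//		var entry DistributedEntry
//		if err := json.Unmarshal(raw, &entry); err != nil {
//			return nil, fmt.Errorf("failed to unmarshal entry: %w", err)
//		}
//		if ds.calculateChecksum(entry.Data) != entry.Checksum {
//			return nil, fmt.Errorf("checksum mismatch for key %s", entry.Key)
//		}
//		var payload interface{}
//		if err := json.Unmarshal(entry.Data, &payload); err != nil {
//			return nil, fmt.Errorf("failed to unmarshal payload: %w", err)
//		}
//		return payload, nil
//	}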

// Heartbeat manager methods

func (hm *HeartbeatManager) start() {
	ticker := time.NewTicker(hm.heartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			hm.checkNodeHealth()
		case <-hm.stopCh:
			return
		}
	}
}

func (hm *HeartbeatManager) getActiveNodes() []string {
	hm.mu.RLock()
	defer hm.mu.RUnlock()

	var activeNodes []string
	for nodeID, health := range hm.nodes {
		if health.IsActive {
			activeNodes = append(activeNodes, nodeID)
		}
	}
	return activeNodes
}

func (hm *HeartbeatManager) isNodeHealthy(nodeID string) bool {
	hm.mu.RLock()
	defer hm.mu.RUnlock()

	health, exists := hm.nodes[nodeID]
	return exists && health.IsActive
}

func (hm *HeartbeatManager) checkNodeHealth() {
	// Placeholder implementation
	// Would send heartbeats and update node health
}
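
// One way to make checkNodeHealth concrete is a timeout sweep against the
// configured timeoutThreshold (a sketch; actually probing peers is
// transport-specific and elided here):
//
//	func (hm *HeartbeatManager) sweepTimedOutNodes() {
//		hm.mu.Lock()
//		defer hm.mu.Unlock()
//		now := time.Now()
//		for _, health := range hm.nodes {
//			if health.IsActive && now.Sub(health.LastSeen) > hm.timeoutThreshold {
//				health.IsActive = false
//				health.FailureCount++
//			}
//		}
//	}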

// Consensus monitor and other background processes

func (ds *DistributedStorageImpl) consensusMonitor() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		ds.cleanupExpiredOperations()
	}
}

func (ds *DistributedStorageImpl) rebalanceMonitor() {
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()

	for range ticker.C {
		ds.rebalanceReplicas()
	}
}

func (ds *DistributedStorageImpl) cleanupExpiredOperations() {
	// Cleanup expired consensus operations
}
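
// An expiry sweep for cleanupExpiredOperations could time out pending votes
// using the manager's votingTimeout (a sketch; vote accounting is elided):
//
//	func (cm *ConsensusManager) expirePendingOps() {
//		cm.mu.Lock()
//		defer cm.mu.Unlock()
//		now := time.Now()
//		for id, op := range cm.pendingOps {
//			if op.Status == ConsensusPending && now.Sub(op.CreatedAt) > cm.votingTimeout {
//				op.Status = ConsensusTimeout
//				if op.Callback != nil {
//					op.Callback(false, fmt.Errorf("consensus operation %s timed out", id))
//				}
//				delete(cm.pendingOps, id)
//			}
//		}
//	}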

func (ds *DistributedStorageImpl) rebalanceReplicas() {
	// Rebalance replicas across healthy nodes
}

// Placeholder method stubs for remaining functionality

func (ds *DistributedStorageImpl) storeOnNode(ctx context.Context, nodeID string, entry *DistributedEntry) error {
	// Store entry on specific node
	return nil
}

func (ds *DistributedStorageImpl) retrieveFromNode(ctx context.Context, nodeID string, key string) (interface{}, error) {
	// Retrieve from specific node
	return nil, nil
}

func (ds *DistributedStorageImpl) checkExistsOnNode(ctx context.Context, nodeID string, key string) (bool, error) {
	// Check if key exists on specific node
	return false, nil
}

func (ds *DistributedStorageImpl) replicateToNode(ctx context.Context, nodeID string, key string, data interface{}) error {
	// Replicate data to specific node
	return nil
}

func (ds *DistributedStorageImpl) selectAdditionalNodes(key string, currentReplicas []string, needed int) ([]string, error) {
	// Select additional nodes for replication
	return nil, nil
}

func (ds *DistributedStorageImpl) syncWithNode(ctx context.Context, nodeID string) error {
	// Sync with specific node
	return nil
}

func (ds *DistributedStorageImpl) executeConsensusOperation(ctx context.Context, op *ConsensusOperation, nodes []string) error {
	// Execute consensus operation across nodes
	return nil
}