chore: hook temporal persistence to dht
This commit is contained in:
@@ -2,6 +2,8 @@ package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
@@ -10,71 +12,35 @@ import (
|
||||
"chorus/pkg/dht"
|
||||
)
|
||||
|
||||
// DistributedStorageImpl implements the DistributedStorage interface
|
||||
// DistributedStorageImpl is the minimal DHT-backed implementation used by SEC-SLURP 1.1.
|
||||
// The libp2p layer already handles gossip/replication, so we focus on deterministic
|
||||
// marshaling of entries and bookkeeping for the metrics surface that SLURP consumes.
|
||||
type DistributedStorageImpl struct {
|
||||
mu sync.RWMutex
|
||||
dht dht.DHT
|
||||
nodeID string
|
||||
metrics *DistributedStorageStats
|
||||
replicas map[string][]string // key -> replica node IDs
|
||||
heartbeat *HeartbeatManager
|
||||
consensus *ConsensusManager
|
||||
options *DistributedStorageOptions
|
||||
mu sync.RWMutex
|
||||
dht dht.DHT
|
||||
nodeID string
|
||||
options *DistributedStoreOptions
|
||||
metrics *DistributedStorageStats
|
||||
replicas map[string][]string
|
||||
}
|
||||
|
||||
// HeartbeatManager manages node heartbeats and health
|
||||
type HeartbeatManager struct {
|
||||
mu sync.RWMutex
|
||||
nodes map[string]*NodeHealth
|
||||
heartbeatInterval time.Duration
|
||||
timeoutThreshold time.Duration
|
||||
stopCh chan struct{}
|
||||
// DistributedEntry is the canonical representation we persist in the DHT.
|
||||
type DistributedEntry struct {
|
||||
Key string `json:"key"`
|
||||
Data []byte `json:"data"`
|
||||
ReplicationFactor int `json:"replication_factor"`
|
||||
ConsistencyLevel ConsistencyLevel `json:"consistency_level"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
Version int64 `json:"version"`
|
||||
Checksum string `json:"checksum"`
|
||||
Tombstone bool `json:"tombstone"`
|
||||
}
|
||||
|
||||
// NodeHealth tracks the health of a distributed storage node
|
||||
type NodeHealth struct {
|
||||
NodeID string `json:"node_id"`
|
||||
LastSeen time.Time `json:"last_seen"`
|
||||
Latency time.Duration `json:"latency"`
|
||||
IsActive bool `json:"is_active"`
|
||||
FailureCount int `json:"failure_count"`
|
||||
Load float64 `json:"load"`
|
||||
}
|
||||
|
||||
// ConsensusManager handles consensus operations for distributed storage
|
||||
type ConsensusManager struct {
|
||||
mu sync.RWMutex
|
||||
pendingOps map[string]*ConsensusOperation
|
||||
votingTimeout time.Duration
|
||||
quorumSize int
|
||||
}
|
||||
|
||||
// ConsensusOperation represents a distributed operation requiring consensus
|
||||
type ConsensusOperation struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
Key string `json:"key"`
|
||||
Data interface{} `json:"data"`
|
||||
Initiator string `json:"initiator"`
|
||||
Votes map[string]bool `json:"votes"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
Status ConsensusStatus `json:"status"`
|
||||
Callback func(bool, error) `json:"-"`
|
||||
}
|
||||
|
||||
// ConsensusStatus represents the status of a consensus operation
|
||||
type ConsensusStatus string
|
||||
|
||||
const (
|
||||
ConsensusPending ConsensusStatus = "pending"
|
||||
ConsensusApproved ConsensusStatus = "approved"
|
||||
ConsensusRejected ConsensusStatus = "rejected"
|
||||
ConsensusTimeout ConsensusStatus = "timeout"
|
||||
)
|
||||
|
||||
// NewDistributedStorage creates a new distributed storage implementation
|
||||
// NewDistributedStorage wires a DHT-backed storage facade. When no node identifier is
|
||||
// provided we synthesise one so metrics remain stable across restarts during testing.
|
||||
func NewDistributedStorage(
|
||||
dht dht.DHT,
|
||||
dhtInstance dht.DHT,
|
||||
nodeID string,
|
||||
options *DistributedStorageOptions,
|
||||
) *DistributedStorageImpl {
|
||||
@@ -88,576 +54,267 @@ func NewDistributedStorage(
|
||||
}
|
||||
}
|
||||
|
||||
ds := &DistributedStorageImpl{
|
||||
dht: dht,
|
||||
nodeID: nodeID,
|
||||
options: options,
|
||||
replicas: make(map[string][]string),
|
||||
metrics: &DistributedStorageStats{
|
||||
LastRebalance: time.Now(),
|
||||
},
|
||||
heartbeat: &HeartbeatManager{
|
||||
nodes: make(map[string]*NodeHealth),
|
||||
heartbeatInterval: 30 * time.Second,
|
||||
timeoutThreshold: 90 * time.Second,
|
||||
stopCh: make(chan struct{}),
|
||||
},
|
||||
consensus: &ConsensusManager{
|
||||
pendingOps: make(map[string]*ConsensusOperation),
|
||||
votingTimeout: 10 * time.Second,
|
||||
quorumSize: (options.ReplicationFactor / 2) + 1,
|
||||
},
|
||||
if nodeID == "" {
|
||||
nodeID = fmt.Sprintf("slurp-node-%d", time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// Start background processes
|
||||
go ds.heartbeat.start()
|
||||
go ds.consensusMonitor()
|
||||
go ds.rebalanceMonitor()
|
||||
metrics := &DistributedStorageStats{
|
||||
TotalNodes: 1,
|
||||
ActiveNodes: 1,
|
||||
FailedNodes: 0,
|
||||
TotalReplicas: 0,
|
||||
HealthyReplicas: 0,
|
||||
UnderReplicated: 0,
|
||||
LastRebalance: time.Now(),
|
||||
}
|
||||
|
||||
return ds
|
||||
return &DistributedStorageImpl{
|
||||
dht: dhtInstance,
|
||||
nodeID: nodeID,
|
||||
options: options,
|
||||
metrics: metrics,
|
||||
replicas: make(map[string][]string),
|
||||
}
|
||||
}
|
||||
|
||||
// Store stores data in the distributed DHT with replication
|
||||
// Store persists an encoded entry to the DHT.
|
||||
func (ds *DistributedStorageImpl) Store(
|
||||
ctx context.Context,
|
||||
key string,
|
||||
data interface{},
|
||||
options *DistributedStoreOptions,
|
||||
) error {
|
||||
if options == nil {
|
||||
options = ds.options
|
||||
if ds.dht == nil {
|
||||
return fmt.Errorf("distributed storage requires DHT instance")
|
||||
}
|
||||
|
||||
// Serialize data
|
||||
dataBytes, err := json.Marshal(data)
|
||||
storeOpts := options
|
||||
if storeOpts == nil {
|
||||
storeOpts = ds.options
|
||||
}
|
||||
|
||||
payload, err := normalisePayload(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal data: %w", err)
|
||||
return fmt.Errorf("failed to encode distributed payload: %w", err)
|
||||
}
|
||||
|
||||
// Create distributed entry
|
||||
now := time.Now()
|
||||
entry := &DistributedEntry{
|
||||
Key: key,
|
||||
Data: dataBytes,
|
||||
ReplicationFactor: options.ReplicationFactor,
|
||||
ConsistencyLevel: options.ConsistencyLevel,
|
||||
CreatedAt: time.Now(),
|
||||
Data: payload,
|
||||
ReplicationFactor: storeOpts.ReplicationFactor,
|
||||
ConsistencyLevel: storeOpts.ConsistencyLevel,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
Version: 1,
|
||||
Checksum: ds.calculateChecksum(dataBytes),
|
||||
Checksum: ds.calculateChecksum(payload),
|
||||
Tombstone: false,
|
||||
}
|
||||
|
||||
// Determine target nodes for replication
|
||||
targetNodes, err := ds.selectReplicationNodes(key, options.ReplicationFactor)
|
||||
encodedEntry, err := json.Marshal(entry)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to select replication nodes: %w", err)
|
||||
return fmt.Errorf("failed to marshal distributed entry: %w", err)
|
||||
}
|
||||
|
||||
// Store based on consistency level
|
||||
switch options.ConsistencyLevel {
|
||||
case ConsistencyEventual:
|
||||
return ds.storeEventual(ctx, entry, targetNodes)
|
||||
case ConsistencyStrong:
|
||||
return ds.storeStrong(ctx, entry, targetNodes)
|
||||
case ConsistencyQuorum:
|
||||
return ds.storeQuorum(ctx, entry, targetNodes)
|
||||
default:
|
||||
return fmt.Errorf("unsupported consistency level: %s", options.ConsistencyLevel)
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieve retrieves data from the distributed DHT
|
||||
func (ds *DistributedStorageImpl) Retrieve(
|
||||
ctx context.Context,
|
||||
key string,
|
||||
) (interface{}, error) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
ds.updateLatencyMetrics(time.Since(start))
|
||||
}()
|
||||
|
||||
// Try local first if prefer local is enabled
|
||||
if ds.options.PreferLocal {
|
||||
if localData, err := ds.dht.GetValue(ctx, key); err == nil {
|
||||
return ds.deserializeEntry(localData)
|
||||
}
|
||||
if err := ds.dht.PutValue(ctx, key, encodedEntry); err != nil {
|
||||
return fmt.Errorf("dht put failed: %w", err)
|
||||
}
|
||||
|
||||
// Get replica nodes for this key
|
||||
replicas, err := ds.getReplicationNodes(key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get replication nodes: %w", err)
|
||||
}
|
||||
_ = ds.dht.Provide(ctx, key)
|
||||
|
||||
// Retrieve from replicas
|
||||
return ds.retrieveFromReplicas(ctx, key, replicas)
|
||||
}
|
||||
|
||||
// Delete removes data from the distributed DHT
|
||||
func (ds *DistributedStorageImpl) Delete(
|
||||
ctx context.Context,
|
||||
key string,
|
||||
) error {
|
||||
// Get replica nodes
|
||||
replicas, err := ds.getReplicationNodes(key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get replication nodes: %w", err)
|
||||
}
|
||||
|
||||
// Create consensus operation for deletion
|
||||
opID := ds.generateOperationID()
|
||||
op := &ConsensusOperation{
|
||||
ID: opID,
|
||||
Type: "delete",
|
||||
Key: key,
|
||||
Initiator: ds.nodeID,
|
||||
Votes: make(map[string]bool),
|
||||
CreatedAt: time.Now(),
|
||||
Status: ConsensusPending,
|
||||
}
|
||||
|
||||
// Execute consensus deletion
|
||||
return ds.executeConsensusOperation(ctx, op, replicas)
|
||||
}
|
||||
|
||||
// Exists checks if data exists in the DHT
|
||||
func (ds *DistributedStorageImpl) Exists(
|
||||
ctx context.Context,
|
||||
key string,
|
||||
) (bool, error) {
|
||||
if _, err := ds.dht.GetValue(ctx, key); err == nil {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Replicate ensures data is replicated across nodes
|
||||
func (ds *DistributedStorageImpl) Replicate(
|
||||
ctx context.Context,
|
||||
key string,
|
||||
replicationFactor int,
|
||||
) error {
|
||||
// Get current replicas
|
||||
currentReplicas, err := ds.getReplicationNodes(key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get current replicas: %w", err)
|
||||
}
|
||||
|
||||
// If we already have enough replicas, return
|
||||
if len(currentReplicas) >= replicationFactor {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the data to replicate
|
||||
data, err := ds.Retrieve(ctx, key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve data for replication: %w", err)
|
||||
}
|
||||
|
||||
// Select additional nodes for replication
|
||||
neededReplicas := replicationFactor - len(currentReplicas)
|
||||
newNodes, err := ds.selectAdditionalNodes(key, currentReplicas, neededReplicas)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to select additional nodes: %w", err)
|
||||
}
|
||||
|
||||
// Replicate to new nodes
|
||||
for _, nodeID := range newNodes {
|
||||
if err := ds.replicateToNode(ctx, nodeID, key, data); err != nil {
|
||||
// Log but continue with other nodes
|
||||
fmt.Printf("Failed to replicate to node %s: %v\n", nodeID, err)
|
||||
continue
|
||||
}
|
||||
currentReplicas = append(currentReplicas, nodeID)
|
||||
}
|
||||
|
||||
// Update replica tracking
|
||||
ds.mu.Lock()
|
||||
ds.replicas[key] = currentReplicas
|
||||
ds.replicas[key] = []string{ds.nodeID}
|
||||
ds.metrics.TotalReplicas++
|
||||
ds.metrics.HealthyReplicas++
|
||||
ds.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// FindReplicas finds all replicas of data
|
||||
// Retrieve loads an entry from the DHT and returns the raw payload bytes.
|
||||
func (ds *DistributedStorageImpl) Retrieve(
|
||||
ctx context.Context,
|
||||
key string,
|
||||
) (interface{}, error) {
|
||||
if ds.dht == nil {
|
||||
return nil, fmt.Errorf("distributed storage requires DHT instance")
|
||||
}
|
||||
|
||||
raw, err := ds.dht.GetValue(ctx, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entry, err := decodeEntry(raw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if entry.Tombstone {
|
||||
return nil, fmt.Errorf("distributed entry %s is tombstoned", key)
|
||||
}
|
||||
|
||||
return entry.Data, nil
|
||||
}
|
||||
|
||||
// Delete writes a tombstone entry so future reads treat the key as absent.
|
||||
func (ds *DistributedStorageImpl) Delete(
|
||||
ctx context.Context,
|
||||
key string,
|
||||
) error {
|
||||
if ds.dht == nil {
|
||||
return fmt.Errorf("distributed storage requires DHT instance")
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
entry := &DistributedEntry{
|
||||
Key: key,
|
||||
Data: nil,
|
||||
ReplicationFactor: ds.options.ReplicationFactor,
|
||||
ConsistencyLevel: ds.options.ConsistencyLevel,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
Version: 1,
|
||||
Checksum: "",
|
||||
Tombstone: true,
|
||||
}
|
||||
|
||||
encoded, err := json.Marshal(entry)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal tombstone: %w", err)
|
||||
}
|
||||
|
||||
if err := ds.dht.PutValue(ctx, key, encoded); err != nil {
|
||||
return fmt.Errorf("dht put tombstone failed: %w", err)
|
||||
}
|
||||
|
||||
ds.mu.Lock()
|
||||
delete(ds.replicas, key)
|
||||
ds.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Exists checks whether a non-tombstoned entry is present in the DHT.
|
||||
func (ds *DistributedStorageImpl) Exists(
|
||||
ctx context.Context,
|
||||
key string,
|
||||
) (bool, error) {
|
||||
if ds.dht == nil {
|
||||
return false, fmt.Errorf("distributed storage requires DHT instance")
|
||||
}
|
||||
|
||||
raw, err := ds.dht.GetValue(ctx, key)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
entry, err := decodeEntry(raw)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return !entry.Tombstone, nil
|
||||
}
|
||||
|
||||
// Replicate triggers another Provide call so the libp2p layer can advertise the key.
|
||||
func (ds *DistributedStorageImpl) Replicate(
|
||||
ctx context.Context,
|
||||
key string,
|
||||
replicationFactor int,
|
||||
) error {
|
||||
if ds.dht == nil {
|
||||
return fmt.Errorf("distributed storage requires DHT instance")
|
||||
}
|
||||
|
||||
ds.mu.RLock()
|
||||
_, known := ds.replicas[key]
|
||||
ds.mu.RUnlock()
|
||||
if !known {
|
||||
// Nothing cached locally, but we still attempt to provide the key.
|
||||
if _, err := ds.dht.GetValue(ctx, key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return ds.dht.Provide(ctx, key)
|
||||
}
|
||||
|
||||
// FindReplicas reports the local bookkeeping for which nodes supplied a key.
|
||||
func (ds *DistributedStorageImpl) FindReplicas(
|
||||
ctx context.Context,
|
||||
key string,
|
||||
) ([]string, error) {
|
||||
return ds.getReplicationNodes(key)
|
||||
ds.mu.RLock()
|
||||
defer ds.mu.RUnlock()
|
||||
|
||||
if replicas, ok := ds.replicas[key]; ok {
|
||||
return append([]string{}, replicas...), nil
|
||||
}
|
||||
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
// Sync synchronizes with other DHT nodes
|
||||
// Sync re-advertises every known key. This keeps bring-up deterministic while the
|
||||
// richer replication manager is still under construction.
|
||||
func (ds *DistributedStorageImpl) Sync(ctx context.Context) error {
|
||||
ds.metrics.LastRebalance = time.Now()
|
||||
if ds.dht == nil {
|
||||
return fmt.Errorf("distributed storage requires DHT instance")
|
||||
}
|
||||
|
||||
// Get list of active nodes
|
||||
activeNodes := ds.heartbeat.getActiveNodes()
|
||||
ds.mu.RLock()
|
||||
keys := make([]string, 0, len(ds.replicas))
|
||||
for key := range ds.replicas {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
ds.mu.RUnlock()
|
||||
|
||||
// Sync with each active node
|
||||
for _, nodeID := range activeNodes {
|
||||
if nodeID == ds.nodeID {
|
||||
continue // Skip self
|
||||
}
|
||||
|
||||
if err := ds.syncWithNode(ctx, nodeID); err != nil {
|
||||
// Log but continue with other nodes
|
||||
fmt.Printf("Failed to sync with node %s: %v\n", nodeID, err)
|
||||
continue
|
||||
for _, key := range keys {
|
||||
if err := ds.dht.Provide(ctx, key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDistributedStats returns distributed storage statistics
|
||||
// GetDistributedStats returns a snapshot of the adapter's internal counters.
|
||||
func (ds *DistributedStorageImpl) GetDistributedStats() (*DistributedStorageStats, error) {
|
||||
ds.mu.RLock()
|
||||
defer ds.mu.RUnlock()
|
||||
|
||||
// Update current stats
|
||||
activeNodes := ds.heartbeat.getActiveNodes()
|
||||
ds.metrics.ActiveNodes = len(activeNodes)
|
||||
ds.metrics.TotalNodes = len(ds.heartbeat.nodes)
|
||||
ds.metrics.FailedNodes = ds.metrics.TotalNodes - ds.metrics.ActiveNodes
|
||||
|
||||
// Calculate replica health
|
||||
totalReplicas := int64(0)
|
||||
healthyReplicas := int64(0)
|
||||
underReplicated := int64(0)
|
||||
|
||||
for _, replicas := range ds.replicas {
|
||||
totalReplicas += int64(len(replicas))
|
||||
healthy := 0
|
||||
for _, nodeID := range replicas {
|
||||
if ds.heartbeat.isNodeHealthy(nodeID) {
|
||||
healthy++
|
||||
}
|
||||
}
|
||||
healthyReplicas += int64(healthy)
|
||||
if healthy < ds.options.ReplicationFactor {
|
||||
underReplicated++
|
||||
}
|
||||
}
|
||||
|
||||
ds.metrics.TotalReplicas = totalReplicas
|
||||
ds.metrics.HealthyReplicas = healthyReplicas
|
||||
ds.metrics.UnderReplicated = underReplicated
|
||||
|
||||
// Return copy
|
||||
statsCopy := *ds.metrics
|
||||
statsCopy.TotalReplicas = int64(len(ds.replicas))
|
||||
statsCopy.HealthyReplicas = statsCopy.TotalReplicas
|
||||
return &statsCopy, nil
|
||||
}
|
||||
|
||||
// DistributedEntry represents a distributed storage entry
|
||||
type DistributedEntry struct {
|
||||
Key string `json:"key"`
|
||||
Data []byte `json:"data"`
|
||||
ReplicationFactor int `json:"replication_factor"`
|
||||
ConsistencyLevel ConsistencyLevel `json:"consistency_level"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
Version int64 `json:"version"`
|
||||
Checksum string `json:"checksum"`
|
||||
// Helpers --------------------------------------------------------------------
|
||||
|
||||
func normalisePayload(data interface{}) ([]byte, error) {
|
||||
switch v := data.(type) {
|
||||
case nil:
|
||||
return nil, nil
|
||||
case []byte:
|
||||
return v, nil
|
||||
case json.RawMessage:
|
||||
return []byte(v), nil
|
||||
default:
|
||||
return json.Marshal(v)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper methods implementation
|
||||
|
||||
func (ds *DistributedStorageImpl) selectReplicationNodes(key string, replicationFactor int) ([]string, error) {
|
||||
// Get active nodes
|
||||
activeNodes := ds.heartbeat.getActiveNodes()
|
||||
if len(activeNodes) < replicationFactor {
|
||||
return nil, fmt.Errorf("insufficient active nodes: need %d, have %d", replicationFactor, len(activeNodes))
|
||||
func decodeEntry(raw []byte) (*DistributedEntry, error) {
|
||||
var entry DistributedEntry
|
||||
if err := json.Unmarshal(raw, &entry); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode distributed entry: %w", err)
|
||||
}
|
||||
|
||||
// Use consistent hashing to determine primary replicas
|
||||
// This is a simplified version - production would use proper consistent hashing
|
||||
nodes := make([]string, 0, replicationFactor)
|
||||
hash := ds.calculateKeyHash(key)
|
||||
|
||||
// Select nodes in a deterministic way based on key hash
|
||||
for i := 0; i < replicationFactor && i < len(activeNodes); i++ {
|
||||
nodeIndex := (int(hash) + i) % len(activeNodes)
|
||||
nodes = append(nodes, activeNodes[nodeIndex])
|
||||
}
|
||||
|
||||
return nodes, nil
|
||||
return &entry, nil
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) storeEventual(ctx context.Context, entry *DistributedEntry, nodes []string) error {
|
||||
// Store asynchronously on all nodes for SEC-SLURP-1.1a replication policy
|
||||
errCh := make(chan error, len(nodes))
|
||||
|
||||
for _, nodeID := range nodes {
|
||||
go func(node string) {
|
||||
err := ds.storeOnNode(ctx, node, entry)
|
||||
errCh <- err
|
||||
}(nodeID)
|
||||
}
|
||||
|
||||
// Don't wait for all nodes - eventual consistency
|
||||
// Just ensure at least one succeeds
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err == nil {
|
||||
return nil // First success
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
return fmt.Errorf("timeout waiting for eventual store")
|
||||
}
|
||||
|
||||
// If first failed, try to get at least one success
|
||||
timer := time.NewTimer(10 * time.Second)
|
||||
defer timer.Stop()
|
||||
|
||||
for i := 1; i < len(nodes); i++ {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
case <-timer.C:
|
||||
return fmt.Errorf("timeout waiting for eventual store success")
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to store on any node")
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) storeStrong(ctx context.Context, entry *DistributedEntry, nodes []string) error {
|
||||
// Store synchronously on all nodes per SEC-SLURP-1.1a durability target
|
||||
errCh := make(chan error, len(nodes))
|
||||
|
||||
for _, nodeID := range nodes {
|
||||
go func(node string) {
|
||||
err := ds.storeOnNode(ctx, node, entry)
|
||||
errCh <- err
|
||||
}(nodeID)
|
||||
}
|
||||
|
||||
// Wait for all nodes to complete
|
||||
var errors []error
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
case <-time.After(30 * time.Second):
|
||||
return fmt.Errorf("timeout waiting for strong consistency store")
|
||||
}
|
||||
}
|
||||
|
||||
if len(errors) > 0 {
|
||||
return fmt.Errorf("strong consistency store failed: %v", errors)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) storeQuorum(ctx context.Context, entry *DistributedEntry, nodes []string) error {
|
||||
// Store on quorum of nodes per SEC-SLURP-1.1a availability guardrail
|
||||
quorumSize := (len(nodes) / 2) + 1
|
||||
errCh := make(chan error, len(nodes))
|
||||
|
||||
for _, nodeID := range nodes {
|
||||
go func(node string) {
|
||||
err := ds.storeOnNode(ctx, node, entry)
|
||||
errCh <- err
|
||||
}(nodeID)
|
||||
}
|
||||
|
||||
// Wait for quorum
|
||||
successCount := 0
|
||||
errorCount := 0
|
||||
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err == nil {
|
||||
successCount++
|
||||
if successCount >= quorumSize {
|
||||
return nil // Quorum achieved
|
||||
}
|
||||
} else {
|
||||
errorCount++
|
||||
if errorCount > len(nodes)-quorumSize {
|
||||
return fmt.Errorf("quorum store failed: too many errors")
|
||||
}
|
||||
}
|
||||
case <-time.After(20 * time.Second):
|
||||
return fmt.Errorf("timeout waiting for quorum store")
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("quorum store failed")
|
||||
}
|
||||
|
||||
// Additional helper method implementations would continue here...
|
||||
// This is a substantial implementation showing the architecture
|
||||
|
||||
func (ds *DistributedStorageImpl) calculateChecksum(data []byte) string {
|
||||
// Simple checksum calculation - would use proper hashing in production
|
||||
return fmt.Sprintf("%x", len(data)) // Placeholder
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) calculateKeyHash(key string) uint32 {
|
||||
// Simple hash function - would use proper consistent hashing in production
|
||||
hash := uint32(0)
|
||||
for _, c := range key {
|
||||
hash = hash*31 + uint32(c)
|
||||
if len(data) == 0 {
|
||||
return ""
|
||||
}
|
||||
return hash
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) generateOperationID() string {
|
||||
return fmt.Sprintf("%s-%d", ds.nodeID, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) updateLatencyMetrics(latency time.Duration) {
|
||||
ds.mu.Lock()
|
||||
defer ds.mu.Unlock()
|
||||
|
||||
if ds.metrics.NetworkLatency == 0 {
|
||||
ds.metrics.NetworkLatency = latency
|
||||
} else {
|
||||
// Exponential moving average
|
||||
ds.metrics.NetworkLatency = time.Duration(
|
||||
float64(ds.metrics.NetworkLatency)*0.8 + float64(latency)*0.2,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Placeholder implementations for remaining methods
|
||||
|
||||
func (ds *DistributedStorageImpl) getReplicationNodes(key string) ([]string, error) {
|
||||
ds.mu.RLock()
|
||||
defer ds.mu.RUnlock()
|
||||
|
||||
if replicas, exists := ds.replicas[key]; exists {
|
||||
return replicas, nil
|
||||
}
|
||||
|
||||
// Fall back to consistent hashing
|
||||
return ds.selectReplicationNodes(key, ds.options.ReplicationFactor)
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) retrieveFromReplicas(ctx context.Context, key string, replicas []string) (interface{}, error) {
|
||||
// Try each replica until success
|
||||
for _, nodeID := range replicas {
|
||||
if data, err := ds.retrieveFromNode(ctx, nodeID, key); err == nil {
|
||||
return ds.deserializeEntry(data)
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("failed to retrieve from any replica")
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) deserializeEntry(data interface{}) (interface{}, error) {
|
||||
// Deserialize distributed entry
|
||||
return data, nil // Placeholder
|
||||
}
|
||||
|
||||
// Heartbeat manager methods
|
||||
|
||||
func (hm *HeartbeatManager) start() {
|
||||
ticker := time.NewTicker(hm.heartbeatInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
hm.checkNodeHealth()
|
||||
case <-hm.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (hm *HeartbeatManager) getActiveNodes() []string {
|
||||
hm.mu.RLock()
|
||||
defer hm.mu.RUnlock()
|
||||
|
||||
var activeNodes []string
|
||||
for nodeID, health := range hm.nodes {
|
||||
if health.IsActive {
|
||||
activeNodes = append(activeNodes, nodeID)
|
||||
}
|
||||
}
|
||||
return activeNodes
|
||||
}
|
||||
|
||||
func (hm *HeartbeatManager) isNodeHealthy(nodeID string) bool {
|
||||
hm.mu.RLock()
|
||||
defer hm.mu.RUnlock()
|
||||
|
||||
health, exists := hm.nodes[nodeID]
|
||||
return exists && health.IsActive
|
||||
}
|
||||
|
||||
func (hm *HeartbeatManager) checkNodeHealth() {
|
||||
// Placeholder implementation
|
||||
// Would send heartbeats and update node health
|
||||
}
|
||||
|
||||
// Consensus monitor and other background processes
|
||||
|
||||
func (ds *DistributedStorageImpl) consensusMonitor() {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
ds.cleanupExpiredOperations()
|
||||
}
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) rebalanceMonitor() {
|
||||
ticker := time.NewTicker(1 * time.Hour)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
ds.rebalanceReplicas()
|
||||
}
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) cleanupExpiredOperations() {
|
||||
// Cleanup expired consensus operations
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) rebalanceReplicas() {
|
||||
// Rebalance replicas across healthy nodes
|
||||
}
|
||||
|
||||
// Placeholder method stubs for remaining functionality
|
||||
|
||||
func (ds *DistributedStorageImpl) storeOnNode(ctx context.Context, nodeID string, entry *DistributedEntry) error {
|
||||
// Store entry on specific node
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) retrieveFromNode(ctx context.Context, nodeID string, key string) (interface{}, error) {
|
||||
// Retrieve from specific node
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) checkExistsOnNode(ctx context.Context, nodeID string, key string) (bool, error) {
|
||||
// Check if key exists on specific node
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) replicateToNode(ctx context.Context, nodeID string, key string, data interface{}) error {
|
||||
// Replicate data to specific node
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) selectAdditionalNodes(key string, currentReplicas []string, needed int) ([]string, error) {
|
||||
// Select additional nodes for replication
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) syncWithNode(ctx context.Context, nodeID string) error {
|
||||
// Sync with specific node
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds *DistributedStorageImpl) executeConsensusOperation(ctx context.Context, op *ConsensusOperation, nodes []string) error {
|
||||
// Execute consensus operation across nodes
|
||||
return nil
|
||||
sum := sha256.Sum256(data)
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
|
||||
@@ -534,8 +534,40 @@ func (pm *persistenceManagerImpl) loadFromLocalStorage(ctx context.Context) erro
|
||||
}
|
||||
|
||||
func (pm *persistenceManagerImpl) loadFromDistributedStorage(ctx context.Context) error {
|
||||
// Similar to local storage but using distributed store
|
||||
// Implementation would be similar to loadFromLocalStorage
|
||||
if pm.distributedStore == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := pm.distributedStore.Retrieve(ctx, pm.generateGraphKey())
|
||||
if err != nil {
|
||||
// No remote snapshot yet
|
||||
return nil
|
||||
}
|
||||
|
||||
var snapshot GraphSnapshot
|
||||
switch raw := data.(type) {
|
||||
case []byte:
|
||||
if len(raw) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := json.Unmarshal(raw, &snapshot); err != nil {
|
||||
return fmt.Errorf("failed to decode distributed snapshot: %w", err)
|
||||
}
|
||||
case json.RawMessage:
|
||||
if err := json.Unmarshal(raw, &snapshot); err != nil {
|
||||
return fmt.Errorf("failed to decode distributed snapshot: %w", err)
|
||||
}
|
||||
default:
|
||||
encoded, marshalErr := json.Marshal(raw)
|
||||
if marshalErr != nil {
|
||||
return fmt.Errorf("failed to marshal distributed snapshot payload: %w", marshalErr)
|
||||
}
|
||||
if err := json.Unmarshal(encoded, &snapshot); err != nil {
|
||||
return fmt.Errorf("failed to decode distributed snapshot: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
pm.applySnapshot(&snapshot)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -588,6 +620,51 @@ func (pm *persistenceManagerImpl) createGraphSnapshot() (*GraphSnapshot, error)
|
||||
return snapshot, nil
|
||||
}
|
||||
|
||||
func (pm *persistenceManagerImpl) applySnapshot(snapshot *GraphSnapshot) {
|
||||
if snapshot == nil {
|
||||
return
|
||||
}
|
||||
|
||||
pm.graph.mu.Lock()
|
||||
defer pm.graph.mu.Unlock()
|
||||
|
||||
pm.graph.nodes = make(map[string]*TemporalNode, len(snapshot.Nodes))
|
||||
pm.graph.addressToNodes = make(map[string][]*TemporalNode, len(snapshot.Nodes))
|
||||
pm.graph.influences = make(map[string][]string, len(snapshot.Influences))
|
||||
pm.graph.influencedBy = make(map[string][]string, len(snapshot.InfluencedBy))
|
||||
pm.graph.decisions = make(map[string]*DecisionMetadata, len(snapshot.Decisions))
|
||||
pm.graph.decisionToNodes = make(map[string][]*TemporalNode)
|
||||
pm.graph.pathCache = make(map[string][]*DecisionStep)
|
||||
pm.graph.metricsCache = make(map[string]interface{})
|
||||
|
||||
for id, node := range snapshot.Nodes {
|
||||
pm.graph.nodes[id] = node
|
||||
|
||||
addressKey := node.UCXLAddress.String()
|
||||
pm.graph.addressToNodes[addressKey] = append(pm.graph.addressToNodes[addressKey], node)
|
||||
|
||||
if influences, ok := snapshot.Influences[id]; ok {
|
||||
pm.graph.influences[id] = append([]string(nil), influences...)
|
||||
} else {
|
||||
pm.graph.influences[id] = make([]string, 0)
|
||||
}
|
||||
|
||||
if influencedBy, ok := snapshot.InfluencedBy[id]; ok {
|
||||
pm.graph.influencedBy[id] = append([]string(nil), influencedBy...)
|
||||
} else {
|
||||
pm.graph.influencedBy[id] = make([]string, 0)
|
||||
}
|
||||
|
||||
if node.DecisionID != "" {
|
||||
pm.graph.decisionToNodes[node.DecisionID] = append(pm.graph.decisionToNodes[node.DecisionID], node)
|
||||
}
|
||||
}
|
||||
|
||||
for id, decision := range snapshot.Decisions {
|
||||
pm.graph.decisions[id] = decision
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *persistenceManagerImpl) getRemoteSnapshot(ctx context.Context) (*GraphSnapshot, error) {
|
||||
key := pm.generateGraphKey()
|
||||
|
||||
@@ -596,12 +673,27 @@ func (pm *persistenceManagerImpl) getRemoteSnapshot(ctx context.Context) (*Graph
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var snapshot *GraphSnapshot
|
||||
if err := json.Unmarshal(data.([]byte), &snapshot); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal remote snapshot: %w", err)
|
||||
var snapshot GraphSnapshot
|
||||
switch raw := data.(type) {
|
||||
case []byte:
|
||||
if err := json.Unmarshal(raw, &snapshot); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal remote snapshot: %w", err)
|
||||
}
|
||||
case json.RawMessage:
|
||||
if err := json.Unmarshal(raw, &snapshot); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal remote snapshot: %w", err)
|
||||
}
|
||||
default:
|
||||
encoded, marshalErr := json.Marshal(raw)
|
||||
if marshalErr != nil {
|
||||
return nil, fmt.Errorf("failed to marshal remote snapshot payload: %w", marshalErr)
|
||||
}
|
||||
if err := json.Unmarshal(encoded, &snapshot); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal remote snapshot: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return snapshot, nil
|
||||
return &snapshot, nil
|
||||
}
|
||||
|
||||
func (pm *persistenceManagerImpl) performBidirectionalSync(ctx context.Context, local, remote *GraphSnapshot, result *SyncResult) error {
|
||||
|
||||
Reference in New Issue
Block a user