chore: align slurp config and scaffolding

Author: anthonyrawlins
Date: 2025-09-27 21:03:12 +10:00
parent acc4361463
commit 4a77862289
47 changed files with 5133 additions and 4274 deletions


@@ -8,69 +8,68 @@ import (
"time"
"chorus/pkg/dht"
"chorus/pkg/types"
)
// DistributedStorageImpl implements the DistributedStorage interface
type DistributedStorageImpl struct {
mu sync.RWMutex
dht dht.DHT
nodeID string
metrics *DistributedStorageStats
replicas map[string][]string // key -> replica node IDs
heartbeat *HeartbeatManager
consensus *ConsensusManager
options *DistributedStorageOptions
}
// HeartbeatManager manages node heartbeats and health
type HeartbeatManager struct {
mu sync.RWMutex
nodes map[string]*NodeHealth
heartbeatInterval time.Duration
timeoutThreshold time.Duration
stopCh chan struct{}
}
// NodeHealth tracks the health of a distributed storage node
type NodeHealth struct {
NodeID string `json:"node_id"`
LastSeen time.Time `json:"last_seen"`
Latency time.Duration `json:"latency"`
IsActive bool `json:"is_active"`
FailureCount int `json:"failure_count"`
Load float64 `json:"load"`
}
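
The heartbeat manager above only carries state; as a reading aid (not part of this commit), here is a minimal sketch of how stale nodes could be flagged once timeoutThreshold elapses, using only the fields shown. The markStale helper itself is hypothetical:

// Sketch only: markStale is an assumed helper, not present in this diff.
func (hm *HeartbeatManager) markStale(now time.Time) {
	hm.mu.Lock()
	defer hm.mu.Unlock()
	for _, n := range hm.nodes {
		// Nodes unseen for longer than the timeout threshold are demoted.
		if now.Sub(n.LastSeen) > hm.timeoutThreshold {
			n.IsActive = false
			n.FailureCount++
		}
	}
}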
// ConsensusManager handles consensus operations for distributed storage
type ConsensusManager struct {
mu sync.RWMutex
pendingOps map[string]*ConsensusOperation
votingTimeout time.Duration
quorumSize int
}
// ConsensusOperation represents a distributed operation requiring consensus
type ConsensusOperation struct {
ID string `json:"id"`
Type string `json:"type"`
Key string `json:"key"`
Data interface{} `json:"data"`
Initiator string `json:"initiator"`
Votes map[string]bool `json:"votes"`
CreatedAt time.Time `json:"created_at"`
Status ConsensusStatus `json:"status"`
Callback func(bool, error) `json:"-"`
}
// ConsensusStatus represents the status of a consensus operation
type ConsensusStatus string
const (
ConsensusPending ConsensusStatus = "pending"
ConsensusApproved ConsensusStatus = "approved"
ConsensusRejected ConsensusStatus = "rejected"
ConsensusTimeout ConsensusStatus = "timeout"
)
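
How these statuses get assigned is not shown in this hunk; a rough sketch of a vote tally against the quorum settings above (the tally helper is an assumption, not code from this file):

// Sketch only: resolves an operation's status from its recorded votes.
func (cm *ConsensusManager) tally(op *ConsensusOperation) ConsensusStatus {
	approvals := 0
	for _, approved := range op.Votes {
		if approved {
			approvals++
		}
	}
	switch {
	case approvals >= cm.quorumSize:
		return ConsensusApproved
	case len(op.Votes)-approvals >= cm.quorumSize:
		return ConsensusRejected
	case time.Since(op.CreatedAt) > cm.votingTimeout:
		return ConsensusTimeout
	default:
		return ConsensusPending
	}
}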
// NewDistributedStorage creates a new distributed storage implementation
@@ -83,9 +82,9 @@ func NewDistributedStorage(
options = &DistributedStoreOptions{
ReplicationFactor: 3,
ConsistencyLevel: ConsistencyQuorum,
Timeout: 30 * time.Second,
PreferLocal: true,
SyncMode: SyncAsync,
}
}
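
For comparison, a caller overriding those defaults would populate the same options struct explicitly; a small usage sketch restricted to the fields visible in this diff:

// Sketch only: example override of the defaults shown above.
opts := &DistributedStoreOptions{
	ReplicationFactor: 5,
	ConsistencyLevel:  ConsistencyQuorum,
	Timeout:           10 * time.Second,
	PreferLocal:       false,
	SyncMode:          SyncAsync,
}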
@@ -98,10 +97,10 @@ func NewDistributedStorage(
LastRebalance: time.Now(),
},
heartbeat: &HeartbeatManager{
nodes: make(map[string]*NodeHealth),
heartbeatInterval: 30 * time.Second,
timeoutThreshold: 90 * time.Second,
stopCh: make(chan struct{}),
},
consensus: &ConsensusManager{
pendingOps: make(map[string]*ConsensusOperation),
@@ -125,8 +124,6 @@ func (ds *DistributedStorageImpl) Store(
data interface{},
options *DistributedStoreOptions,
) error {
-start := time.Now()
if options == nil {
options = ds.options
}
@@ -179,7 +176,7 @@ func (ds *DistributedStorageImpl) Retrieve(
// Try local first if prefer local is enabled
if ds.options.PreferLocal {
-if localData, err := ds.dht.Get(key); err == nil {
+if localData, err := ds.dht.GetValue(ctx, key); err == nil {
return ds.deserializeEntry(localData)
}
}
@@ -226,25 +223,9 @@ func (ds *DistributedStorageImpl) Exists(
ctx context.Context,
key string,
) (bool, error) {
-// Try local first
-if ds.options.PreferLocal {
-if exists, err := ds.dht.Exists(key); err == nil {
-return exists, nil
-}
+if _, err := ds.dht.GetValue(ctx, key); err == nil {
+return true, nil
+}
-// Check replicas
-replicas, err := ds.getReplicationNodes(key)
-if err != nil {
-return false, fmt.Errorf("failed to get replication nodes: %w", err)
-}
-for _, nodeID := range replicas {
-if exists, err := ds.checkExistsOnNode(ctx, nodeID, key); err == nil && exists {
-return true, nil
-}
-}
return false, nil
}
@@ -306,10 +287,7 @@ func (ds *DistributedStorageImpl) FindReplicas(
// Sync synchronizes with other DHT nodes
func (ds *DistributedStorageImpl) Sync(ctx context.Context) error {
start := time.Now()
-defer func() {
-ds.metrics.LastRebalance = time.Now()
-}()
+ds.metrics.LastRebalance = time.Now()
// Get list of active nodes
activeNodes := ds.heartbeat.getActiveNodes()
@@ -346,7 +324,7 @@ func (ds *DistributedStorageImpl) GetDistributedStats() (*DistributedStorageStat
healthyReplicas := int64(0)
underReplicated := int64(0)
-for key, replicas := range ds.replicas {
+for _, replicas := range ds.replicas {
totalReplicas += int64(len(replicas))
healthy := 0
for _, nodeID := range replicas {
@@ -371,14 +349,14 @@ func (ds *DistributedStorageImpl) GetDistributedStats() (*DistributedStorageStat
// DistributedEntry represents a distributed storage entry
type DistributedEntry struct {
Key string `json:"key"`
Data []byte `json:"data"`
ReplicationFactor int `json:"replication_factor"`
ConsistencyLevel ConsistencyLevel `json:"consistency_level"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Version int64 `json:"version"`
Checksum string `json:"checksum"`
}
// Helper methods implementation
@@ -394,7 +372,7 @@ func (ds *DistributedStorageImpl) selectReplicationNodes(key string, replication
// This is a simplified version - production would use proper consistent hashing
nodes := make([]string, 0, replicationFactor)
hash := ds.calculateKeyHash(key)
// Select nodes in a deterministic way based on key hash
for i := 0; i < replicationFactor && i < len(activeNodes); i++ {
nodeIndex := (int(hash) + i) % len(activeNodes)
@@ -405,13 +383,13 @@ func (ds *DistributedStorageImpl) selectReplicationNodes(key string, replication
}
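
To make the wrap-around selection above concrete: with five active nodes and a key hash landing on index 3, a replication factor of 3 picks nodes 3, 4 and 0. The same arithmetic as a standalone sketch (pickReplicas is illustrative, not the file's calculateKeyHash-backed version):

// Sketch only: deterministic wrap-around pick over the active node list.
func pickReplicas(activeNodes []string, hash uint64, replicationFactor int) []string {
	nodes := make([]string, 0, replicationFactor)
	for i := 0; i < replicationFactor && i < len(activeNodes); i++ {
		idx := (hash + uint64(i)) % uint64(len(activeNodes))
		nodes = append(nodes, activeNodes[idx])
	}
	return nodes
}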
func (ds *DistributedStorageImpl) storeEventual(ctx context.Context, entry *DistributedEntry, nodes []string) error {
-// Store asynchronously on all nodes
+// Store asynchronously on all nodes for SEC-SLURP-1.1a replication policy
errCh := make(chan error, len(nodes))
for _, nodeID := range nodes {
go func(node string) {
err := ds.storeOnNode(ctx, node, entry)
-errorCh <- err
+errCh <- err
}(nodeID)
}
@@ -429,7 +407,7 @@ func (ds *DistributedStorageImpl) storeEventual(ctx context.Context, entry *Dist
// If first failed, try to get at least one success
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
for i := 1; i < len(nodes); i++ {
select {
case err := <-errCh:
@@ -445,13 +423,13 @@ func (ds *DistributedStorageImpl) storeEventual(ctx context.Context, entry *Dist
}
func (ds *DistributedStorageImpl) storeStrong(ctx context.Context, entry *DistributedEntry, nodes []string) error {
-// Store synchronously on all nodes
+// Store synchronously on all nodes per SEC-SLURP-1.1a durability target
errCh := make(chan error, len(nodes))
for _, nodeID := range nodes {
go func(node string) {
err := ds.storeOnNode(ctx, node, entry)
-errorCh <- err
+errCh <- err
}(nodeID)
}
@@ -476,21 +454,21 @@ func (ds *DistributedStorageImpl) storeStrong(ctx context.Context, entry *Distri
}
func (ds *DistributedStorageImpl) storeQuorum(ctx context.Context, entry *DistributedEntry, nodes []string) error {
-// Store on quorum of nodes
+// Store on quorum of nodes per SEC-SLURP-1.1a availability guardrail
quorumSize := (len(nodes) / 2) + 1
errCh := make(chan error, len(nodes))
for _, nodeID := range nodes {
go func(node string) {
err := ds.storeOnNode(ctx, node, entry)
-errorCh <- err
+errCh <- err
}(nodeID)
}
// Wait for quorum
successCount := 0
errorCount := 0
for i := 0; i < len(nodes); i++ {
select {
case err := <-errCh:
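
The select above is cut off by the hunk boundary; end to end, the quorum write follows a fan-out-and-count pattern. A condensed sketch of what the body of storeQuorum plausibly amounts to (storeOnNode and the channel/counter names are taken from this file; the early-exit conditions are assumptions):

// Sketch only: fan out one write per node, then count results until a
// quorum succeeds or can no longer be reached.
errCh := make(chan error, len(nodes))
for _, nodeID := range nodes {
	go func(node string) {
		errCh <- ds.storeOnNode(ctx, node, entry)
	}(nodeID)
}
quorumSize := (len(nodes) / 2) + 1
successCount, errorCount := 0, 0
for i := 0; i < len(nodes); i++ {
	select {
	case err := <-errCh:
		if err != nil {
			errorCount++
		} else {
			successCount++
		}
		if successCount >= quorumSize {
			return nil // quorum reached; stragglers finish in the background
		}
		if errorCount > len(nodes)-quorumSize {
			return fmt.Errorf("quorum unreachable: %d of %d writes failed", errorCount, len(nodes))
		}
	case <-ctx.Done():
		return ctx.Err()
	}
}
return fmt.Errorf("quorum of %d not reached", quorumSize)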
@@ -537,7 +515,7 @@ func (ds *DistributedStorageImpl) generateOperationID() string {
func (ds *DistributedStorageImpl) updateLatencyMetrics(latency time.Duration) {
ds.mu.Lock()
defer ds.mu.Unlock()
if ds.metrics.NetworkLatency == 0 {
ds.metrics.NetworkLatency = latency
} else {
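
The else branch is truncated by the hunk, but the shape started here (seed on the first sample, then blend new samples in) matches a standard exponential moving average; a generic sketch, with the 0.8/0.2 weights purely as an assumed smoothing factor:

// Sketch only: typical EMA blend of the new latency sample.
ds.metrics.NetworkLatency = time.Duration(
	0.8*float64(ds.metrics.NetworkLatency) + 0.2*float64(latency))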
@@ -553,11 +531,11 @@ func (ds *DistributedStorageImpl) updateLatencyMetrics(latency time.Duration) {
func (ds *DistributedStorageImpl) getReplicationNodes(key string) ([]string, error) {
ds.mu.RLock()
defer ds.mu.RUnlock()
if replicas, exists := ds.replicas[key]; exists {
return replicas, nil
}
// Fall back to consistent hashing
return ds.selectReplicationNodes(key, ds.options.ReplicationFactor)
}