// CHORUS/pkg/slurp/storage/distributed_storage.go

package storage

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"chorus/pkg/dht"
)

// DistributedStorageImpl is the minimal DHT-backed implementation used by SEC-SLURP 1.1.
// The libp2p layer already handles gossip/replication, so we focus on deterministic
// marshaling of entries and bookkeeping for the metrics surface that SLURP consumes.
type DistributedStorageImpl struct {
	mu       sync.RWMutex
	dht      dht.DHT
	nodeID   string
	options  *DistributedStoreOptions
	metrics  *DistributedStorageStats
	replicas map[string][]string
}

// DistributedEntry is the canonical representation we persist in the DHT.
type DistributedEntry struct {
	Key               string           `json:"key"`
	Data              []byte           `json:"data"`
	ReplicationFactor int              `json:"replication_factor"`
	ConsistencyLevel  ConsistencyLevel `json:"consistency_level"`
	CreatedAt         time.Time        `json:"created_at"`
	UpdatedAt         time.Time        `json:"updated_at"`
	Version           int64            `json:"version"`
	Checksum          string           `json:"checksum"`
	Tombstone         bool             `json:"tombstone"`
}
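
// For illustration, a stored entry marshals to JSON roughly as below. The key and
// payload are hypothetical; note that encoding/json base64-encodes the []byte Data
// field, and the wire form of ConsistencyLevel depends on how that type is declared
// elsewhere in this package, so it is omitted here:
//
//	{
//	  "key": "slurp/context/example",
//	  "data": "eyJyb2xlIjoiZGV2In0=",
//	  "replication_factor": 3,
//	  "created_at": "2025-09-28T13:45:43+10:00",
//	  "updated_at": "2025-09-28T13:45:43+10:00",
//	  "version": 1,
//	  "checksum": "<sha256 hex of data>",
//	  "tombstone": false
//	}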

// NewDistributedStorage wires a DHT-backed storage facade. When no node identifier is
// provided we synthesise a unique one so replica bookkeeping still has an owner
// during testing.
func NewDistributedStorage(
	dhtInstance dht.DHT,
	nodeID string,
	options *DistributedStoreOptions,
) *DistributedStorageImpl {
	if options == nil {
		options = &DistributedStoreOptions{
			ReplicationFactor: 3,
			ConsistencyLevel:  ConsistencyQuorum,
			Timeout:           30 * time.Second,
			PreferLocal:       true,
			SyncMode:          SyncAsync,
		}
	}
	if nodeID == "" {
		nodeID = fmt.Sprintf("slurp-node-%d", time.Now().UnixNano())
	}
	metrics := &DistributedStorageStats{
		TotalNodes:      1,
		ActiveNodes:     1,
		FailedNodes:     0,
		TotalReplicas:   0,
		HealthyReplicas: 0,
		UnderReplicated: 0,
		LastRebalance:   time.Now(),
	}
	return &DistributedStorageImpl{
		dht:      dhtInstance,
		nodeID:   nodeID,
		options:  options,
		metrics:  metrics,
		replicas: make(map[string][]string),
	}
}
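
// A minimal construction-and-write sketch, assuming an existing dht.DHT value
// named node (the key and payload below are illustrative):
//
//	store := NewDistributedStorage(node, "", nil) // defaults: RF=3, quorum, async sync
//	if err := store.Store(ctx, "slurp/context/example", map[string]string{"role": "dev"}, nil); err != nil {
//		// handle write failure
//	}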

// Store persists an encoded entry to the DHT.
func (ds *DistributedStorageImpl) Store(
	ctx context.Context,
	key string,
	data interface{},
	options *DistributedStoreOptions,
) error {
	if ds.dht == nil {
		return fmt.Errorf("distributed storage requires DHT instance")
	}
	storeOpts := options
	if storeOpts == nil {
		storeOpts = ds.options
	}
	payload, err := normalisePayload(data)
	if err != nil {
		return fmt.Errorf("failed to encode distributed payload: %w", err)
	}
	now := time.Now()
	entry := &DistributedEntry{
		Key:               key,
		Data:              payload,
		ReplicationFactor: storeOpts.ReplicationFactor,
		ConsistencyLevel:  storeOpts.ConsistencyLevel,
		CreatedAt:         now,
		UpdatedAt:         now,
		Version:           1,
		Checksum:          ds.calculateChecksum(payload),
		Tombstone:         false,
	}
	encodedEntry, err := json.Marshal(entry)
	if err != nil {
		return fmt.Errorf("failed to marshal distributed entry: %w", err)
	}
	if err := ds.dht.PutValue(ctx, key, encodedEntry); err != nil {
		return fmt.Errorf("dht put failed: %w", err)
	}
	// Provide is best-effort advertisement; the write itself already succeeded.
	_ = ds.dht.Provide(ctx, key)
	ds.mu.Lock()
	if _, known := ds.replicas[key]; !known {
		// Count each key once; overwrites should not inflate the counters.
		ds.metrics.TotalReplicas++
		ds.metrics.HealthyReplicas++
	}
	ds.replicas[key] = []string{ds.nodeID}
	ds.mu.Unlock()
	return nil
}
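
// Per-call options override the adapter defaults, for example forcing a higher
// replication factor for a hot key (values illustrative):
//
//	opts := &DistributedStoreOptions{
//		ReplicationFactor: 5,
//		ConsistencyLevel:  ConsistencyQuorum,
//		Timeout:           10 * time.Second,
//		PreferLocal:       true,
//		SyncMode:          SyncAsync,
//	}
//	err := store.Store(ctx, "slurp/context/hot-key", payload, opts)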

// Retrieve loads an entry from the DHT and returns the raw payload bytes.
func (ds *DistributedStorageImpl) Retrieve(
	ctx context.Context,
	key string,
) (interface{}, error) {
	if ds.dht == nil {
		return nil, fmt.Errorf("distributed storage requires DHT instance")
	}
	raw, err := ds.dht.GetValue(ctx, key)
	if err != nil {
		return nil, err
	}
	entry, err := decodeEntry(raw)
	if err != nil {
		return nil, err
	}
	if entry.Tombstone {
		return nil, fmt.Errorf("distributed entry %s is tombstoned", key)
	}
	return entry.Data, nil
}
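
// Retrieve hands back []byte; decoding is the caller's job. A sketch, assuming the
// payload was stored as JSON (ContextPayload is a hypothetical caller-side type):
//
//	raw, err := store.Retrieve(ctx, "slurp/context/example")
//	if err != nil {
//		// handle miss or lookup failure
//	}
//	var payload ContextPayload
//	if err := json.Unmarshal(raw.([]byte), &payload); err != nil {
//		// handle decode failure
//	}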

// Delete writes a tombstone entry so future reads treat the key as absent.
func (ds *DistributedStorageImpl) Delete(
	ctx context.Context,
	key string,
) error {
	if ds.dht == nil {
		return fmt.Errorf("distributed storage requires DHT instance")
	}
	now := time.Now()
	entry := &DistributedEntry{
		Key:               key,
		Data:              nil,
		ReplicationFactor: ds.options.ReplicationFactor,
		ConsistencyLevel:  ds.options.ConsistencyLevel,
		CreatedAt:         now,
		UpdatedAt:         now,
		Version:           1,
		Checksum:          "",
		Tombstone:         true,
	}
	encoded, err := json.Marshal(entry)
	if err != nil {
		return fmt.Errorf("failed to marshal tombstone: %w", err)
	}
	if err := ds.dht.PutValue(ctx, key, encoded); err != nil {
		return fmt.Errorf("dht put tombstone failed: %w", err)
	}
	ds.mu.Lock()
	delete(ds.replicas, key)
	ds.mu.Unlock()
	return nil
}

// Exists checks whether a non-tombstoned entry is present in the DHT.
func (ds *DistributedStorageImpl) Exists(
	ctx context.Context,
	key string,
) (bool, error) {
	if ds.dht == nil {
		return false, fmt.Errorf("distributed storage requires DHT instance")
	}
	raw, err := ds.dht.GetValue(ctx, key)
	if err != nil {
		// A failed lookup is treated as absence rather than surfaced as an error.
		return false, nil
	}
	entry, err := decodeEntry(raw)
	if err != nil {
		return false, err
	}
	return !entry.Tombstone, nil
}

// Replicate triggers another Provide call so the libp2p layer can advertise the key.
// The requested replicationFactor is not enforced here; fan-out is left to the DHT.
func (ds *DistributedStorageImpl) Replicate(
	ctx context.Context,
	key string,
	replicationFactor int,
) error {
	if ds.dht == nil {
		return fmt.Errorf("distributed storage requires DHT instance")
	}
	ds.mu.RLock()
	_, known := ds.replicas[key]
	ds.mu.RUnlock()
	if !known {
		// Nothing cached locally, but we still attempt to provide the key.
		if _, err := ds.dht.GetValue(ctx, key); err != nil {
			return err
		}
	}
	return ds.dht.Provide(ctx, key)
}

// FindReplicas reports the local bookkeeping for which nodes supplied a key.
func (ds *DistributedStorageImpl) FindReplicas(
	ctx context.Context,
	key string,
) ([]string, error) {
	ds.mu.RLock()
	defer ds.mu.RUnlock()
	if replicas, ok := ds.replicas[key]; ok {
		return append([]string{}, replicas...), nil
	}
	return []string{}, nil
}

// Sync re-advertises every known key. This keeps bring-up deterministic while the
// richer replication manager is still under construction.
func (ds *DistributedStorageImpl) Sync(ctx context.Context) error {
	if ds.dht == nil {
		return fmt.Errorf("distributed storage requires DHT instance")
	}
	ds.mu.RLock()
	keys := make([]string, 0, len(ds.replicas))
	for key := range ds.replicas {
		keys = append(keys, key)
	}
	ds.mu.RUnlock()
	for _, key := range keys {
		if err := ds.dht.Provide(ctx, key); err != nil {
			return err
		}
	}
	return nil
}

// GetDistributedStats returns a snapshot of the adapter's internal counters.
func (ds *DistributedStorageImpl) GetDistributedStats() (*DistributedStorageStats, error) {
	ds.mu.RLock()
	defer ds.mu.RUnlock()
	statsCopy := *ds.metrics
	statsCopy.TotalReplicas = int64(len(ds.replicas))
	statsCopy.HealthyReplicas = statsCopy.TotalReplicas
	return &statsCopy, nil
}

// Helpers ----------------------------------------------------------------------

// normalisePayload converts the supported input shapes into raw bytes, falling
// back to JSON encoding for arbitrary values.
func normalisePayload(data interface{}) ([]byte, error) {
	switch v := data.(type) {
	case nil:
		return nil, nil
	case []byte:
		return v, nil
	case json.RawMessage:
		return []byte(v), nil
	default:
		return json.Marshal(v)
	}
}

// decodeEntry unmarshals the canonical JSON form back into a DistributedEntry.
func decodeEntry(raw []byte) (*DistributedEntry, error) {
	var entry DistributedEntry
	if err := json.Unmarshal(raw, &entry); err != nil {
		return nil, fmt.Errorf("failed to decode distributed entry: %w", err)
	}
	return &entry, nil
}

// calculateChecksum returns the hex-encoded SHA-256 of the payload, or the empty
// string for empty payloads (tombstones carry no checksum).
func (ds *DistributedStorageImpl) calculateChecksum(data []byte) string {
	if len(data) == 0 {
		return ""
	}
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:])
}
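
// verifyEntry is a read-side integrity sketch: Retrieve currently trusts
// entry.Checksum without recomputing it, so a cautious caller could run this
// check over a decoded entry. Illustrative only; nothing in the adapter calls it.
func verifyEntry(entry *DistributedEntry) error {
	if entry.Tombstone || len(entry.Data) == 0 {
		// Tombstones and empty payloads carry no checksum by construction.
		return nil
	}
	sum := sha256.Sum256(entry.Data)
	if hex.EncodeToString(sum[:]) != entry.Checksum {
		return fmt.Errorf("checksum mismatch for %s", entry.Key)
	}
	return nil
}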