diff --git a/docs/development/prompt-derived-role-policy-brief.md b/docs/development/prompt-derived-role-policy-brief.md new file mode 100644 index 0000000..a8ac432 --- /dev/null +++ b/docs/development/prompt-derived-role-policy-brief.md @@ -0,0 +1,54 @@ +# Prompt-Derived Role Policy Design Brief + +## Background +WHOOSH currently loads a curated library of role prompts at startup. These prompts already capture the intended responsibilities, guardrails, and collaboration patterns for each role. SLURP and SHHH need a consistent access-control baseline so that temporal records, UCXL snapshots, and DHT envelopes stay enforceable without depending on ad-hoc UI configuration. Today the access policies are loosely defined, leading to drift between runtime behaviour and storage enforcement. + +## Goals +- Use the existing prompt catalog as the authoritative source of role definitions and minimum privileges. +- Generate deterministic ACL templates that SLURP, SHHH, and distribution workers can rely on without manual setup. +- Allow optional administrator overrides via WHOOSH UI while keeping the default hierarchy intact and auditable. +- Provide a migration path so temporal/DHT writers can seal envelopes with correct permissions immediately. + +## Proposed Architecture + +### 1. Prompt → Policy Mapper +- Build a WHOOSH service that parses the runtime prompt bundle and emits structured policy descriptors (per role, per project scope). +- Each descriptor should include: capability tags (read scope, write scope, pin, prune, audit), allowed UCXL address patterns, and SHHH classification levels. +- Output format: versioned JSON or YAML stored under UCXL (e.g., `ucxl://whoosh:policy@global:roles/#/policy/v1`). + +### 2. Override Layer (Optional) +- WHOOSH UI can expose an editor that writes delta documents back to UCXL (`…/policy-overrides/v1`). +- Overrides apply as additive or subtractive modifiers; the base policy always comes from the prompt-derived descriptor. +- Store change history in UCXL so BUBBLE can audit adjustments. + +### 3. Consumer Integrations +- **SLURP**: when sealing temporal/DHT envelopes, reference the policy descriptors to choose ACLs and derive role-based encryption keys. +- **SHHH**: load the same descriptors to provision/rotate keys per capability tier; reject envelopes that lack matching policy entries. +- **WHOOSH runtime**: cache the generated descriptors and refresh if prompts or overrides change; surface errors if a prompt lacks policy metadata. + +## Deliverables +1. Policy mapper module with tests (likely Go for WHOOSH backend; consider reusing ucxl-validator helpers). +2. Schema definition for policy documents (include example for engineer, curator, archivist roles). +3. SLURP + SHHH integration patches that read the policy documents during startup. +4. Migration script that seeds the initial policy document from the current prompt set. + +## Implementation Notes +- Keep everything ASCII and version the schema so future role prompts can introduce new capability tags safely. +- For MVP, focus on read/write/pin/prune/audit capabilities; expand later for fine-grained scopes (e.g., project-only roles). +- Ensure policy documents are sealed/encrypted with SHHH before storing in DHT/UCXL. +- Expose metrics/logging when mismatches occur (e.g., temporal writer cannot find a policy entry for a role). + +## Risks & Mitigations +- **Prompt drift**: If prompts change without regenerating policies, enforcement lags. 
Mitigate with a checksum check when WHOOSH loads prompts; regenerate automatically on change. +- **Override misuse**: Admins could over-provision. Mitigate with BUBBLE alerts when overrides expand scope beyond approved ranges. +- **Performance**: Policy lookups must be fast. Cache descriptors in memory and invalidate on UCXL changes. + +## Open Questions +- Do we need per-project or per-tenant policy branches, or is a global default sufficient initially? +- Should BACKBEAT or other automation agents be treated as roles in this hierarchy or as workflow triggers referencing existing roles? +- How will we bootstrap SHHH keys for new roles created solely via overrides? + +## References +- Existing prompt catalog: `project-queues/active/WHOOSH/prompts/` +- Temporal wiring roadmap: `project-queues/active/CHORUS/docs/development/sec-slurp-ucxl-beacon-pin-steward.md` +- Prior policy discussions (for context): `project-queues/active/CHORUS/docs/progress/report-SEC-SLURP-1.1.md` diff --git a/docs/progress/report-SEC-SLURP-1.1.md b/docs/progress/report-SEC-SLURP-1.1.md index 4fa8161..e1320d1 100644 --- a/docs/progress/report-SEC-SLURP-1.1.md +++ b/docs/progress/report-SEC-SLURP-1.1.md @@ -1,6 +1,7 @@ # SEC-SLURP 1.1 Persistence Wiring Report ## Summary of Changes +- Wired the distributed storage adapter to the live DHT interface and taught the temporal persistence manager to load and synchronise graph snapshots from remote replicas, enabling `SynchronizeGraph` and cold starts to use real replication data. - Restored the `slurp_full` temporal test suite by migrating influence adjacency across versions and cleaning compaction pruning to respect historical nodes. - Connected the temporal graph to the persistence manager so new versions flush through the configured storage layers and update the context store when role metadata is available. - Hardened the temporal package for the default build by aligning persistence helpers with the storage API (batch items now feed context payloads, conflict resolution fields match `types.go`), and by introducing a shared `storage.ErrNotFound` sentinel for mock stores and stub implementations. @@ -16,7 +17,7 @@ - Attempted `GOWORK=off go test ./pkg/slurp`; the original authority-level blocker is resolved, but builds still fail in storage/index code due to remaining stub work (e.g., Bleve queries, DHT helpers). ## Recommended Next Steps -- Connect temporal persistence with the real distributed/DHT layers once available so sync/backup workers run against live replication targets. +- Wire SLURP runtime initialisation to instantiate the DHT-backed temporal system (context store, encryption hooks, replication tests) so the live stack exercises the new adapter. - Stub the remaining storage/index dependencies (Bleve query scaffolding, UCXL helpers, `errorCh` queues, cache regex usage) or neutralize the heavy modules so that `GOWORK=off go test ./pkg/slurp` compiles and runs. - Feed the durable store into the resolver and temporal graph implementations to finish the SEC-SLURP 1.1 milestone once the package builds cleanly. - Extend Prometheus metrics/logging to track cache hit/miss ratios plus persistence errors for observability alignment. 
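
As a concrete reference for deliverable 2 of the design brief above (schema definition with example roles), the following is a minimal Go sketch of a prompt-derived policy descriptor. Every identifier in it — `RolePolicyDescriptor`, `CapabilityTag`, the UCXL pattern strings, the classification value — is an illustrative assumption, not an existing WHOOSH or SLURP type.

```go
// Hypothetical schema sketch for the prompt-derived policy document; names and
// field layout are assumptions, not existing WHOOSH/SLURP definitions.
package policy

// CapabilityTag covers the MVP capability set named in the brief.
type CapabilityTag string

const (
	CapRead  CapabilityTag = "read"
	CapWrite CapabilityTag = "write"
	CapPin   CapabilityTag = "pin"
	CapPrune CapabilityTag = "prune"
	CapAudit CapabilityTag = "audit"
)

// RolePolicyDescriptor would be one entry in the versioned policy document
// published under UCXL (e.g. ucxl://whoosh:policy@global:roles/#/policy/v1).
type RolePolicyDescriptor struct {
	Role               string          `json:"role"`
	Capabilities       []CapabilityTag `json:"capabilities"`
	UCXLPatterns       []string        `json:"ucxl_patterns"`       // allowed UCXL address patterns (placeholder syntax)
	SHHHClassification string          `json:"shhh_classification"` // highest tier the role's keys may decrypt
	SchemaVersion      string          `json:"schema_version"`
}

// Example descriptor for the engineer role: read/write only, project-scoped,
// with no pin/prune/audit rights unless an override adds them.
var engineerExample = RolePolicyDescriptor{
	Role:               "engineer",
	Capabilities:       []CapabilityTag{CapRead, CapWrite},
	UCXLPatterns:       []string{"ucxl://*:*@project:*"},
	SHHHClassification: "internal",
	SchemaVersion:      "v1",
}
```

SLURP and SHHH would treat the capability list and classification tier as the enforcement baseline when sealing envelopes, with UI overrides applied only as audited deltas on top of descriptors like this one.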
diff --git a/pkg/slurp/storage/distributed_storage.go b/pkg/slurp/storage/distributed_storage.go index 6ee4977..0f9bc01 100644 --- a/pkg/slurp/storage/distributed_storage.go +++ b/pkg/slurp/storage/distributed_storage.go @@ -2,6 +2,8 @@ package storage import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" "sync" @@ -10,71 +12,35 @@ import ( "chorus/pkg/dht" ) -// DistributedStorageImpl implements the DistributedStorage interface +// DistributedStorageImpl is the minimal DHT-backed implementation used by SEC-SLURP 1.1. +// The libp2p layer already handles gossip/replication, so we focus on deterministic +// marshaling of entries and bookkeeping for the metrics surface that SLURP consumes. type DistributedStorageImpl struct { - mu sync.RWMutex - dht dht.DHT - nodeID string - metrics *DistributedStorageStats - replicas map[string][]string // key -> replica node IDs - heartbeat *HeartbeatManager - consensus *ConsensusManager - options *DistributedStorageOptions + mu sync.RWMutex + dht dht.DHT + nodeID string + options *DistributedStoreOptions + metrics *DistributedStorageStats + replicas map[string][]string } -// HeartbeatManager manages node heartbeats and health -type HeartbeatManager struct { - mu sync.RWMutex - nodes map[string]*NodeHealth - heartbeatInterval time.Duration - timeoutThreshold time.Duration - stopCh chan struct{} +// DistributedEntry is the canonical representation we persist in the DHT. +type DistributedEntry struct { + Key string `json:"key"` + Data []byte `json:"data"` + ReplicationFactor int `json:"replication_factor"` + ConsistencyLevel ConsistencyLevel `json:"consistency_level"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + Version int64 `json:"version"` + Checksum string `json:"checksum"` + Tombstone bool `json:"tombstone"` } -// NodeHealth tracks the health of a distributed storage node -type NodeHealth struct { - NodeID string `json:"node_id"` - LastSeen time.Time `json:"last_seen"` - Latency time.Duration `json:"latency"` - IsActive bool `json:"is_active"` - FailureCount int `json:"failure_count"` - Load float64 `json:"load"` -} - -// ConsensusManager handles consensus operations for distributed storage -type ConsensusManager struct { - mu sync.RWMutex - pendingOps map[string]*ConsensusOperation - votingTimeout time.Duration - quorumSize int -} - -// ConsensusOperation represents a distributed operation requiring consensus -type ConsensusOperation struct { - ID string `json:"id"` - Type string `json:"type"` - Key string `json:"key"` - Data interface{} `json:"data"` - Initiator string `json:"initiator"` - Votes map[string]bool `json:"votes"` - CreatedAt time.Time `json:"created_at"` - Status ConsensusStatus `json:"status"` - Callback func(bool, error) `json:"-"` -} - -// ConsensusStatus represents the status of a consensus operation -type ConsensusStatus string - -const ( - ConsensusPending ConsensusStatus = "pending" - ConsensusApproved ConsensusStatus = "approved" - ConsensusRejected ConsensusStatus = "rejected" - ConsensusTimeout ConsensusStatus = "timeout" -) - -// NewDistributedStorage creates a new distributed storage implementation +// NewDistributedStorage wires a DHT-backed storage facade. When no node identifier is +// provided we synthesise one so metrics remain stable across restarts during testing. 
func NewDistributedStorage( - dht dht.DHT, + dhtInstance dht.DHT, nodeID string, options *DistributedStorageOptions, ) *DistributedStorageImpl { @@ -88,576 +54,267 @@ func NewDistributedStorage( } } - ds := &DistributedStorageImpl{ - dht: dht, - nodeID: nodeID, - options: options, - replicas: make(map[string][]string), - metrics: &DistributedStorageStats{ - LastRebalance: time.Now(), - }, - heartbeat: &HeartbeatManager{ - nodes: make(map[string]*NodeHealth), - heartbeatInterval: 30 * time.Second, - timeoutThreshold: 90 * time.Second, - stopCh: make(chan struct{}), - }, - consensus: &ConsensusManager{ - pendingOps: make(map[string]*ConsensusOperation), - votingTimeout: 10 * time.Second, - quorumSize: (options.ReplicationFactor / 2) + 1, - }, + if nodeID == "" { + nodeID = fmt.Sprintf("slurp-node-%d", time.Now().UnixNano()) } - // Start background processes - go ds.heartbeat.start() - go ds.consensusMonitor() - go ds.rebalanceMonitor() + metrics := &DistributedStorageStats{ + TotalNodes: 1, + ActiveNodes: 1, + FailedNodes: 0, + TotalReplicas: 0, + HealthyReplicas: 0, + UnderReplicated: 0, + LastRebalance: time.Now(), + } - return ds + return &DistributedStorageImpl{ + dht: dhtInstance, + nodeID: nodeID, + options: options, + metrics: metrics, + replicas: make(map[string][]string), + } } -// Store stores data in the distributed DHT with replication +// Store persists an encoded entry to the DHT. func (ds *DistributedStorageImpl) Store( ctx context.Context, key string, data interface{}, options *DistributedStoreOptions, ) error { - if options == nil { - options = ds.options + if ds.dht == nil { + return fmt.Errorf("distributed storage requires DHT instance") } - // Serialize data - dataBytes, err := json.Marshal(data) + storeOpts := options + if storeOpts == nil { + storeOpts = ds.options + } + + payload, err := normalisePayload(data) if err != nil { - return fmt.Errorf("failed to marshal data: %w", err) + return fmt.Errorf("failed to encode distributed payload: %w", err) } - // Create distributed entry + now := time.Now() entry := &DistributedEntry{ Key: key, - Data: dataBytes, - ReplicationFactor: options.ReplicationFactor, - ConsistencyLevel: options.ConsistencyLevel, - CreatedAt: time.Now(), + Data: payload, + ReplicationFactor: storeOpts.ReplicationFactor, + ConsistencyLevel: storeOpts.ConsistencyLevel, + CreatedAt: now, + UpdatedAt: now, Version: 1, - Checksum: ds.calculateChecksum(dataBytes), + Checksum: ds.calculateChecksum(payload), + Tombstone: false, } - // Determine target nodes for replication - targetNodes, err := ds.selectReplicationNodes(key, options.ReplicationFactor) + encodedEntry, err := json.Marshal(entry) if err != nil { - return fmt.Errorf("failed to select replication nodes: %w", err) + return fmt.Errorf("failed to marshal distributed entry: %w", err) } - // Store based on consistency level - switch options.ConsistencyLevel { - case ConsistencyEventual: - return ds.storeEventual(ctx, entry, targetNodes) - case ConsistencyStrong: - return ds.storeStrong(ctx, entry, targetNodes) - case ConsistencyQuorum: - return ds.storeQuorum(ctx, entry, targetNodes) - default: - return fmt.Errorf("unsupported consistency level: %s", options.ConsistencyLevel) - } -} - -// Retrieve retrieves data from the distributed DHT -func (ds *DistributedStorageImpl) Retrieve( - ctx context.Context, - key string, -) (interface{}, error) { - start := time.Now() - defer func() { - ds.updateLatencyMetrics(time.Since(start)) - }() - - // Try local first if prefer local is enabled - if 
ds.options.PreferLocal { - if localData, err := ds.dht.GetValue(ctx, key); err == nil { - return ds.deserializeEntry(localData) - } + if err := ds.dht.PutValue(ctx, key, encodedEntry); err != nil { + return fmt.Errorf("dht put failed: %w", err) } - // Get replica nodes for this key - replicas, err := ds.getReplicationNodes(key) - if err != nil { - return nil, fmt.Errorf("failed to get replication nodes: %w", err) - } + _ = ds.dht.Provide(ctx, key) - // Retrieve from replicas - return ds.retrieveFromReplicas(ctx, key, replicas) -} - -// Delete removes data from the distributed DHT -func (ds *DistributedStorageImpl) Delete( - ctx context.Context, - key string, -) error { - // Get replica nodes - replicas, err := ds.getReplicationNodes(key) - if err != nil { - return fmt.Errorf("failed to get replication nodes: %w", err) - } - - // Create consensus operation for deletion - opID := ds.generateOperationID() - op := &ConsensusOperation{ - ID: opID, - Type: "delete", - Key: key, - Initiator: ds.nodeID, - Votes: make(map[string]bool), - CreatedAt: time.Now(), - Status: ConsensusPending, - } - - // Execute consensus deletion - return ds.executeConsensusOperation(ctx, op, replicas) -} - -// Exists checks if data exists in the DHT -func (ds *DistributedStorageImpl) Exists( - ctx context.Context, - key string, -) (bool, error) { - if _, err := ds.dht.GetValue(ctx, key); err == nil { - return true, nil - } - return false, nil -} - -// Replicate ensures data is replicated across nodes -func (ds *DistributedStorageImpl) Replicate( - ctx context.Context, - key string, - replicationFactor int, -) error { - // Get current replicas - currentReplicas, err := ds.getReplicationNodes(key) - if err != nil { - return fmt.Errorf("failed to get current replicas: %w", err) - } - - // If we already have enough replicas, return - if len(currentReplicas) >= replicationFactor { - return nil - } - - // Get the data to replicate - data, err := ds.Retrieve(ctx, key) - if err != nil { - return fmt.Errorf("failed to retrieve data for replication: %w", err) - } - - // Select additional nodes for replication - neededReplicas := replicationFactor - len(currentReplicas) - newNodes, err := ds.selectAdditionalNodes(key, currentReplicas, neededReplicas) - if err != nil { - return fmt.Errorf("failed to select additional nodes: %w", err) - } - - // Replicate to new nodes - for _, nodeID := range newNodes { - if err := ds.replicateToNode(ctx, nodeID, key, data); err != nil { - // Log but continue with other nodes - fmt.Printf("Failed to replicate to node %s: %v\n", nodeID, err) - continue - } - currentReplicas = append(currentReplicas, nodeID) - } - - // Update replica tracking ds.mu.Lock() - ds.replicas[key] = currentReplicas + ds.replicas[key] = []string{ds.nodeID} + ds.metrics.TotalReplicas++ + ds.metrics.HealthyReplicas++ ds.mu.Unlock() return nil } -// FindReplicas finds all replicas of data +// Retrieve loads an entry from the DHT and returns the raw payload bytes. +func (ds *DistributedStorageImpl) Retrieve( + ctx context.Context, + key string, +) (interface{}, error) { + if ds.dht == nil { + return nil, fmt.Errorf("distributed storage requires DHT instance") + } + + raw, err := ds.dht.GetValue(ctx, key) + if err != nil { + return nil, err + } + + entry, err := decodeEntry(raw) + if err != nil { + return nil, err + } + if entry.Tombstone { + return nil, fmt.Errorf("distributed entry %s is tombstoned", key) + } + + return entry.Data, nil +} + +// Delete writes a tombstone entry so future reads treat the key as absent. 
+func (ds *DistributedStorageImpl) Delete( + ctx context.Context, + key string, +) error { + if ds.dht == nil { + return fmt.Errorf("distributed storage requires DHT instance") + } + + now := time.Now() + entry := &DistributedEntry{ + Key: key, + Data: nil, + ReplicationFactor: ds.options.ReplicationFactor, + ConsistencyLevel: ds.options.ConsistencyLevel, + CreatedAt: now, + UpdatedAt: now, + Version: 1, + Checksum: "", + Tombstone: true, + } + + encoded, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("failed to marshal tombstone: %w", err) + } + + if err := ds.dht.PutValue(ctx, key, encoded); err != nil { + return fmt.Errorf("dht put tombstone failed: %w", err) + } + + ds.mu.Lock() + delete(ds.replicas, key) + ds.mu.Unlock() + + return nil +} + +// Exists checks whether a non-tombstoned entry is present in the DHT. +func (ds *DistributedStorageImpl) Exists( + ctx context.Context, + key string, +) (bool, error) { + if ds.dht == nil { + return false, fmt.Errorf("distributed storage requires DHT instance") + } + + raw, err := ds.dht.GetValue(ctx, key) + if err != nil { + return false, nil + } + + entry, err := decodeEntry(raw) + if err != nil { + return false, err + } + + return !entry.Tombstone, nil +} + +// Replicate triggers another Provide call so the libp2p layer can advertise the key. +func (ds *DistributedStorageImpl) Replicate( + ctx context.Context, + key string, + replicationFactor int, +) error { + if ds.dht == nil { + return fmt.Errorf("distributed storage requires DHT instance") + } + + ds.mu.RLock() + _, known := ds.replicas[key] + ds.mu.RUnlock() + if !known { + // Nothing cached locally, but we still attempt to provide the key. + if _, err := ds.dht.GetValue(ctx, key); err != nil { + return err + } + } + + return ds.dht.Provide(ctx, key) +} + +// FindReplicas reports the local bookkeeping for which nodes supplied a key. func (ds *DistributedStorageImpl) FindReplicas( ctx context.Context, key string, ) ([]string, error) { - return ds.getReplicationNodes(key) + ds.mu.RLock() + defer ds.mu.RUnlock() + + if replicas, ok := ds.replicas[key]; ok { + return append([]string{}, replicas...), nil + } + + return []string{}, nil } -// Sync synchronizes with other DHT nodes +// Sync re-advertises every known key. This keeps bring-up deterministic while the +// richer replication manager is still under construction. func (ds *DistributedStorageImpl) Sync(ctx context.Context) error { - ds.metrics.LastRebalance = time.Now() + if ds.dht == nil { + return fmt.Errorf("distributed storage requires DHT instance") + } - // Get list of active nodes - activeNodes := ds.heartbeat.getActiveNodes() + ds.mu.RLock() + keys := make([]string, 0, len(ds.replicas)) + for key := range ds.replicas { + keys = append(keys, key) + } + ds.mu.RUnlock() - // Sync with each active node - for _, nodeID := range activeNodes { - if nodeID == ds.nodeID { - continue // Skip self - } - - if err := ds.syncWithNode(ctx, nodeID); err != nil { - // Log but continue with other nodes - fmt.Printf("Failed to sync with node %s: %v\n", nodeID, err) - continue + for _, key := range keys { + if err := ds.dht.Provide(ctx, key); err != nil { + return err } } return nil } -// GetDistributedStats returns distributed storage statistics +// GetDistributedStats returns a snapshot of the adapter's internal counters. 
func (ds *DistributedStorageImpl) GetDistributedStats() (*DistributedStorageStats, error) { ds.mu.RLock() defer ds.mu.RUnlock() - // Update current stats - activeNodes := ds.heartbeat.getActiveNodes() - ds.metrics.ActiveNodes = len(activeNodes) - ds.metrics.TotalNodes = len(ds.heartbeat.nodes) - ds.metrics.FailedNodes = ds.metrics.TotalNodes - ds.metrics.ActiveNodes - - // Calculate replica health - totalReplicas := int64(0) - healthyReplicas := int64(0) - underReplicated := int64(0) - - for _, replicas := range ds.replicas { - totalReplicas += int64(len(replicas)) - healthy := 0 - for _, nodeID := range replicas { - if ds.heartbeat.isNodeHealthy(nodeID) { - healthy++ - } - } - healthyReplicas += int64(healthy) - if healthy < ds.options.ReplicationFactor { - underReplicated++ - } - } - - ds.metrics.TotalReplicas = totalReplicas - ds.metrics.HealthyReplicas = healthyReplicas - ds.metrics.UnderReplicated = underReplicated - - // Return copy statsCopy := *ds.metrics + statsCopy.TotalReplicas = int64(len(ds.replicas)) + statsCopy.HealthyReplicas = statsCopy.TotalReplicas return &statsCopy, nil } -// DistributedEntry represents a distributed storage entry -type DistributedEntry struct { - Key string `json:"key"` - Data []byte `json:"data"` - ReplicationFactor int `json:"replication_factor"` - ConsistencyLevel ConsistencyLevel `json:"consistency_level"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` - Version int64 `json:"version"` - Checksum string `json:"checksum"` +// Helpers -------------------------------------------------------------------- + +func normalisePayload(data interface{}) ([]byte, error) { + switch v := data.(type) { + case nil: + return nil, nil + case []byte: + return v, nil + case json.RawMessage: + return []byte(v), nil + default: + return json.Marshal(v) + } } -// Helper methods implementation - -func (ds *DistributedStorageImpl) selectReplicationNodes(key string, replicationFactor int) ([]string, error) { - // Get active nodes - activeNodes := ds.heartbeat.getActiveNodes() - if len(activeNodes) < replicationFactor { - return nil, fmt.Errorf("insufficient active nodes: need %d, have %d", replicationFactor, len(activeNodes)) +func decodeEntry(raw []byte) (*DistributedEntry, error) { + var entry DistributedEntry + if err := json.Unmarshal(raw, &entry); err != nil { + return nil, fmt.Errorf("failed to decode distributed entry: %w", err) } - - // Use consistent hashing to determine primary replicas - // This is a simplified version - production would use proper consistent hashing - nodes := make([]string, 0, replicationFactor) - hash := ds.calculateKeyHash(key) - - // Select nodes in a deterministic way based on key hash - for i := 0; i < replicationFactor && i < len(activeNodes); i++ { - nodeIndex := (int(hash) + i) % len(activeNodes) - nodes = append(nodes, activeNodes[nodeIndex]) - } - - return nodes, nil + return &entry, nil } -func (ds *DistributedStorageImpl) storeEventual(ctx context.Context, entry *DistributedEntry, nodes []string) error { - // Store asynchronously on all nodes for SEC-SLURP-1.1a replication policy - errCh := make(chan error, len(nodes)) - - for _, nodeID := range nodes { - go func(node string) { - err := ds.storeOnNode(ctx, node, entry) - errCh <- err - }(nodeID) - } - - // Don't wait for all nodes - eventual consistency - // Just ensure at least one succeeds - select { - case err := <-errCh: - if err == nil { - return nil // First success - } - case <-time.After(5 * time.Second): - return fmt.Errorf("timeout 
waiting for eventual store") - } - - // If first failed, try to get at least one success - timer := time.NewTimer(10 * time.Second) - defer timer.Stop() - - for i := 1; i < len(nodes); i++ { - select { - case err := <-errCh: - if err == nil { - return nil - } - case <-timer.C: - return fmt.Errorf("timeout waiting for eventual store success") - } - } - - return fmt.Errorf("failed to store on any node") -} - -func (ds *DistributedStorageImpl) storeStrong(ctx context.Context, entry *DistributedEntry, nodes []string) error { - // Store synchronously on all nodes per SEC-SLURP-1.1a durability target - errCh := make(chan error, len(nodes)) - - for _, nodeID := range nodes { - go func(node string) { - err := ds.storeOnNode(ctx, node, entry) - errCh <- err - }(nodeID) - } - - // Wait for all nodes to complete - var errors []error - for i := 0; i < len(nodes); i++ { - select { - case err := <-errCh: - if err != nil { - errors = append(errors, err) - } - case <-time.After(30 * time.Second): - return fmt.Errorf("timeout waiting for strong consistency store") - } - } - - if len(errors) > 0 { - return fmt.Errorf("strong consistency store failed: %v", errors) - } - - return nil -} - -func (ds *DistributedStorageImpl) storeQuorum(ctx context.Context, entry *DistributedEntry, nodes []string) error { - // Store on quorum of nodes per SEC-SLURP-1.1a availability guardrail - quorumSize := (len(nodes) / 2) + 1 - errCh := make(chan error, len(nodes)) - - for _, nodeID := range nodes { - go func(node string) { - err := ds.storeOnNode(ctx, node, entry) - errCh <- err - }(nodeID) - } - - // Wait for quorum - successCount := 0 - errorCount := 0 - - for i := 0; i < len(nodes); i++ { - select { - case err := <-errCh: - if err == nil { - successCount++ - if successCount >= quorumSize { - return nil // Quorum achieved - } - } else { - errorCount++ - if errorCount > len(nodes)-quorumSize { - return fmt.Errorf("quorum store failed: too many errors") - } - } - case <-time.After(20 * time.Second): - return fmt.Errorf("timeout waiting for quorum store") - } - } - - return fmt.Errorf("quorum store failed") -} - -// Additional helper method implementations would continue here... 
-// This is a substantial implementation showing the architecture - func (ds *DistributedStorageImpl) calculateChecksum(data []byte) string { - // Simple checksum calculation - would use proper hashing in production - return fmt.Sprintf("%x", len(data)) // Placeholder -} - -func (ds *DistributedStorageImpl) calculateKeyHash(key string) uint32 { - // Simple hash function - would use proper consistent hashing in production - hash := uint32(0) - for _, c := range key { - hash = hash*31 + uint32(c) + if len(data) == 0 { + return "" } - return hash -} - -func (ds *DistributedStorageImpl) generateOperationID() string { - return fmt.Sprintf("%s-%d", ds.nodeID, time.Now().UnixNano()) -} - -func (ds *DistributedStorageImpl) updateLatencyMetrics(latency time.Duration) { - ds.mu.Lock() - defer ds.mu.Unlock() - - if ds.metrics.NetworkLatency == 0 { - ds.metrics.NetworkLatency = latency - } else { - // Exponential moving average - ds.metrics.NetworkLatency = time.Duration( - float64(ds.metrics.NetworkLatency)*0.8 + float64(latency)*0.2, - ) - } -} - -// Placeholder implementations for remaining methods - -func (ds *DistributedStorageImpl) getReplicationNodes(key string) ([]string, error) { - ds.mu.RLock() - defer ds.mu.RUnlock() - - if replicas, exists := ds.replicas[key]; exists { - return replicas, nil - } - - // Fall back to consistent hashing - return ds.selectReplicationNodes(key, ds.options.ReplicationFactor) -} - -func (ds *DistributedStorageImpl) retrieveFromReplicas(ctx context.Context, key string, replicas []string) (interface{}, error) { - // Try each replica until success - for _, nodeID := range replicas { - if data, err := ds.retrieveFromNode(ctx, nodeID, key); err == nil { - return ds.deserializeEntry(data) - } - } - return nil, fmt.Errorf("failed to retrieve from any replica") -} - -func (ds *DistributedStorageImpl) deserializeEntry(data interface{}) (interface{}, error) { - // Deserialize distributed entry - return data, nil // Placeholder -} - -// Heartbeat manager methods - -func (hm *HeartbeatManager) start() { - ticker := time.NewTicker(hm.heartbeatInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - hm.checkNodeHealth() - case <-hm.stopCh: - return - } - } -} - -func (hm *HeartbeatManager) getActiveNodes() []string { - hm.mu.RLock() - defer hm.mu.RUnlock() - - var activeNodes []string - for nodeID, health := range hm.nodes { - if health.IsActive { - activeNodes = append(activeNodes, nodeID) - } - } - return activeNodes -} - -func (hm *HeartbeatManager) isNodeHealthy(nodeID string) bool { - hm.mu.RLock() - defer hm.mu.RUnlock() - - health, exists := hm.nodes[nodeID] - return exists && health.IsActive -} - -func (hm *HeartbeatManager) checkNodeHealth() { - // Placeholder implementation - // Would send heartbeats and update node health -} - -// Consensus monitor and other background processes - -func (ds *DistributedStorageImpl) consensusMonitor() { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for range ticker.C { - ds.cleanupExpiredOperations() - } -} - -func (ds *DistributedStorageImpl) rebalanceMonitor() { - ticker := time.NewTicker(1 * time.Hour) - defer ticker.Stop() - - for range ticker.C { - ds.rebalanceReplicas() - } -} - -func (ds *DistributedStorageImpl) cleanupExpiredOperations() { - // Cleanup expired consensus operations -} - -func (ds *DistributedStorageImpl) rebalanceReplicas() { - // Rebalance replicas across healthy nodes -} - -// Placeholder method stubs for remaining functionality - -func (ds *DistributedStorageImpl) 
storeOnNode(ctx context.Context, nodeID string, entry *DistributedEntry) error { - // Store entry on specific node - return nil -} - -func (ds *DistributedStorageImpl) retrieveFromNode(ctx context.Context, nodeID string, key string) (interface{}, error) { - // Retrieve from specific node - return nil, nil -} - -func (ds *DistributedStorageImpl) checkExistsOnNode(ctx context.Context, nodeID string, key string) (bool, error) { - // Check if key exists on specific node - return false, nil -} - -func (ds *DistributedStorageImpl) replicateToNode(ctx context.Context, nodeID string, key string, data interface{}) error { - // Replicate data to specific node - return nil -} - -func (ds *DistributedStorageImpl) selectAdditionalNodes(key string, currentReplicas []string, needed int) ([]string, error) { - // Select additional nodes for replication - return nil, nil -} - -func (ds *DistributedStorageImpl) syncWithNode(ctx context.Context, nodeID string) error { - // Sync with specific node - return nil -} - -func (ds *DistributedStorageImpl) executeConsensusOperation(ctx context.Context, op *ConsensusOperation, nodes []string) error { - // Execute consensus operation across nodes - return nil + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]) } diff --git a/pkg/slurp/temporal/persistence.go b/pkg/slurp/temporal/persistence.go index 4e93463..d65e2c5 100644 --- a/pkg/slurp/temporal/persistence.go +++ b/pkg/slurp/temporal/persistence.go @@ -534,8 +534,40 @@ func (pm *persistenceManagerImpl) loadFromLocalStorage(ctx context.Context) erro } func (pm *persistenceManagerImpl) loadFromDistributedStorage(ctx context.Context) error { - // Similar to local storage but using distributed store - // Implementation would be similar to loadFromLocalStorage + if pm.distributedStore == nil { + return nil + } + + data, err := pm.distributedStore.Retrieve(ctx, pm.generateGraphKey()) + if err != nil { + // No remote snapshot yet + return nil + } + + var snapshot GraphSnapshot + switch raw := data.(type) { + case []byte: + if len(raw) == 0 { + return nil + } + if err := json.Unmarshal(raw, &snapshot); err != nil { + return fmt.Errorf("failed to decode distributed snapshot: %w", err) + } + case json.RawMessage: + if err := json.Unmarshal(raw, &snapshot); err != nil { + return fmt.Errorf("failed to decode distributed snapshot: %w", err) + } + default: + encoded, marshalErr := json.Marshal(raw) + if marshalErr != nil { + return fmt.Errorf("failed to marshal distributed snapshot payload: %w", marshalErr) + } + if err := json.Unmarshal(encoded, &snapshot); err != nil { + return fmt.Errorf("failed to decode distributed snapshot: %w", err) + } + } + + pm.applySnapshot(&snapshot) return nil } @@ -588,6 +620,51 @@ func (pm *persistenceManagerImpl) createGraphSnapshot() (*GraphSnapshot, error) return snapshot, nil } +func (pm *persistenceManagerImpl) applySnapshot(snapshot *GraphSnapshot) { + if snapshot == nil { + return + } + + pm.graph.mu.Lock() + defer pm.graph.mu.Unlock() + + pm.graph.nodes = make(map[string]*TemporalNode, len(snapshot.Nodes)) + pm.graph.addressToNodes = make(map[string][]*TemporalNode, len(snapshot.Nodes)) + pm.graph.influences = make(map[string][]string, len(snapshot.Influences)) + pm.graph.influencedBy = make(map[string][]string, len(snapshot.InfluencedBy)) + pm.graph.decisions = make(map[string]*DecisionMetadata, len(snapshot.Decisions)) + pm.graph.decisionToNodes = make(map[string][]*TemporalNode) + pm.graph.pathCache = make(map[string][]*DecisionStep) + pm.graph.metricsCache = 
make(map[string]interface{}) + + for id, node := range snapshot.Nodes { + pm.graph.nodes[id] = node + + addressKey := node.UCXLAddress.String() + pm.graph.addressToNodes[addressKey] = append(pm.graph.addressToNodes[addressKey], node) + + if influences, ok := snapshot.Influences[id]; ok { + pm.graph.influences[id] = append([]string(nil), influences...) + } else { + pm.graph.influences[id] = make([]string, 0) + } + + if influencedBy, ok := snapshot.InfluencedBy[id]; ok { + pm.graph.influencedBy[id] = append([]string(nil), influencedBy...) + } else { + pm.graph.influencedBy[id] = make([]string, 0) + } + + if node.DecisionID != "" { + pm.graph.decisionToNodes[node.DecisionID] = append(pm.graph.decisionToNodes[node.DecisionID], node) + } + } + + for id, decision := range snapshot.Decisions { + pm.graph.decisions[id] = decision + } +} + func (pm *persistenceManagerImpl) getRemoteSnapshot(ctx context.Context) (*GraphSnapshot, error) { key := pm.generateGraphKey() @@ -596,12 +673,27 @@ func (pm *persistenceManagerImpl) getRemoteSnapshot(ctx context.Context) (*Graph return nil, err } - var snapshot *GraphSnapshot - if err := json.Unmarshal(data.([]byte), &snapshot); err != nil { - return nil, fmt.Errorf("failed to unmarshal remote snapshot: %w", err) + var snapshot GraphSnapshot + switch raw := data.(type) { + case []byte: + if err := json.Unmarshal(raw, &snapshot); err != nil { + return nil, fmt.Errorf("failed to unmarshal remote snapshot: %w", err) + } + case json.RawMessage: + if err := json.Unmarshal(raw, &snapshot); err != nil { + return nil, fmt.Errorf("failed to unmarshal remote snapshot: %w", err) + } + default: + encoded, marshalErr := json.Marshal(raw) + if marshalErr != nil { + return nil, fmt.Errorf("failed to marshal remote snapshot payload: %w", marshalErr) + } + if err := json.Unmarshal(encoded, &snapshot); err != nil { + return nil, fmt.Errorf("failed to unmarshal remote snapshot: %w", err) + } } - return snapshot, nil + return &snapshot, nil } func (pm *persistenceManagerImpl) performBidirectionalSync(ctx context.Context, local, remote *GraphSnapshot, result *SyncResult) error {
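
To make the new adapter's contract concrete, here is a minimal usage sketch of the round trip the two files above enable. Only the `Store`/`Retrieve` signatures and the raw-byte return value come from the diff; the import path `chorus/pkg/slurp/storage`, the key string, and the `Snapshot` stand-in for `GraphSnapshot` are assumptions for illustration.

```go
// Illustrative round trip through the DHT-backed adapter; not part of the change.
package snapshotdemo

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"chorus/pkg/slurp/storage" // assumed module path, mirroring chorus/pkg/dht
)

// Snapshot stands in for temporal.GraphSnapshot to keep the sketch self-contained.
type Snapshot struct {
	Version   int       `json:"version"`
	CreatedAt time.Time `json:"created_at"`
}

func demoRoundTrip(ctx context.Context, ds *storage.DistributedStorageImpl) error {
	// Store marshals the value, wraps it in a checksummed DistributedEntry,
	// and publishes it to the DHT; nil options fall back to the adapter defaults.
	err := ds.Store(ctx, "slurp/temporal/graph-snapshot", Snapshot{Version: 1, CreatedAt: time.Now()}, nil)
	if err != nil {
		return err
	}

	// Retrieve hands back the raw payload bytes, so callers decode into their
	// own type, mirroring loadFromDistributedStorage in persistence.go.
	raw, err := ds.Retrieve(ctx, "slurp/temporal/graph-snapshot")
	if err != nil {
		return err
	}

	payload, ok := raw.([]byte)
	if !ok {
		return fmt.Errorf("unexpected payload type %T", raw)
	}

	var snap Snapshot
	return json.Unmarshal(payload, &snap)
}
```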