bzzz/pkg/slurp/temporal/persistence.go
anthonyrawlins d96c931a29 Resolve import cycles and migrate to chorus.services module path
This comprehensive refactoring addresses critical architectural issues:

IMPORT CYCLE RESOLUTION:
• pkg/crypto ↔ pkg/slurp/roles: Created pkg/security/access_levels.go
• pkg/ucxl → pkg/dht: Created pkg/storage/interfaces.go
• pkg/slurp/leader → pkg/election → pkg/slurp/storage: Moved types to pkg/election/interfaces.go

MODULE PATH MIGRATION:
• Changed from github.com/anthonyrawlins/bzzz to chorus.services/bzzz
• Updated all import statements across 115+ files
• Maintains compatibility while removing personal GitHub account dependency

TYPE SYSTEM IMPROVEMENTS:
• Resolved duplicate type declarations in crypto package
• Added missing type definitions (RoleStatus, TimeRestrictions, KeyStatus, KeyRotationResult)
• Proper interface segregation to prevent future cycles

ARCHITECTURAL BENEFITS:
• Build now progresses past structural issues to normal dependency resolution
• Cleaner separation of concerns between packages
• Eliminates circular dependencies that prevented compilation
• Establishes foundation for scalable codebase growth

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-17 10:04:25 +10:00

889 lines · 26 KiB · Go

package temporal

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"chorus.services/bzzz/pkg/slurp/storage"
	"chorus.services/bzzz/pkg/ucxl"
)
// persistenceManagerImpl handles persistence and synchronization of temporal graph data.
type persistenceManagerImpl struct {
	mu sync.RWMutex

	// Storage interfaces
	contextStore     storage.ContextStore
	localStorage     storage.LocalStorage
	distributedStore storage.DistributedStorage
	encryptedStore   storage.EncryptedStorage
	backupManager    storage.BackupManager

	// Reference to the temporal graph
	graph *temporalGraphImpl

	// Persistence configuration
	config *PersistenceConfig

	// Synchronization state
	lastSyncTime     time.Time
	syncInProgress   bool
	pendingChanges   map[string]*PendingChange
	conflictResolver ConflictResolver

	// Performance optimization
	batchSize     int
	writeBuffer   []*TemporalNode
	bufferMutex   sync.Mutex
	flushInterval time.Duration
	lastFlush     time.Time
}
// PersistenceConfig represents configuration for temporal graph persistence.
type PersistenceConfig struct {
	// Storage settings
	EnableLocalStorage       bool     `json:"enable_local_storage"`
	EnableDistributedStorage bool     `json:"enable_distributed_storage"`
	EnableEncryption         bool     `json:"enable_encryption"`
	EncryptionRoles          []string `json:"encryption_roles"`

	// Synchronization settings
	SyncInterval               time.Duration `json:"sync_interval"`
	ConflictResolutionStrategy string        `json:"conflict_resolution_strategy"`
	EnableAutoSync             bool          `json:"enable_auto_sync"`
	MaxSyncRetries             int           `json:"max_sync_retries"`

	// Performance settings
	BatchSize         int           `json:"batch_size"`
	FlushInterval     time.Duration `json:"flush_interval"`
	EnableWriteBuffer bool          `json:"enable_write_buffer"`

	// Backup settings
	EnableAutoBackup  bool          `json:"enable_auto_backup"`
	BackupInterval    time.Duration `json:"backup_interval"`
	RetainBackupCount int           `json:"retain_backup_count"`

	// Storage keys and patterns
	KeyPrefix          string `json:"key_prefix"`
	NodeKeyPattern     string `json:"node_key_pattern"`
	GraphKeyPattern    string `json:"graph_key_pattern"`
	MetadataKeyPattern string `json:"metadata_key_pattern"`
}
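
// DefaultPersistenceConfig returns an illustrative configuration. The values
// below are a sketch of plausible defaults, not settings mandated anywhere in
// this package; callers are expected to tune them for their deployment.
func DefaultPersistenceConfig() *PersistenceConfig {
	return &PersistenceConfig{
		EnableLocalStorage:         true,
		EnableDistributedStorage:   true,
		EnableEncryption:           false,
		SyncInterval:               5 * time.Minute,
		ConflictResolutionStrategy: "latest_wins", // illustrative value; see the sketch resolver at the end of this file
		EnableAutoSync:             true,
		MaxSyncRetries:             3,
		BatchSize:                  100,
		FlushInterval:              30 * time.Second,
		EnableWriteBuffer:          true,
		EnableAutoBackup:           false,
		BackupInterval:             24 * time.Hour,
		RetainBackupCount:          7,
		KeyPrefix:                  "temporal",
		// The pattern fields below are currently informational; the key
		// helpers further down hardcode their layouts.
		NodeKeyPattern:     "%s/nodes/%s",
		GraphKeyPattern:    "%s/graph/snapshot",
		MetadataKeyPattern: "%s/graph/metadata",
	}
}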
// PendingChange represents a change waiting to be synchronized.
type PendingChange struct {
	ID        string                 `json:"id"`
	Type      ChangeType             `json:"type"`
	NodeID    string                 `json:"node_id"`
	Data      interface{}            `json:"data"`
	Timestamp time.Time              `json:"timestamp"`
	Retries   int                    `json:"retries"`
	LastError string                 `json:"last_error"`
	Metadata  map[string]interface{} `json:"metadata"`
}

// ChangeType represents the type of change to be synchronized.
type ChangeType string

const (
	ChangeTypeNodeCreated      ChangeType = "node_created"
	ChangeTypeNodeUpdated      ChangeType = "node_updated"
	ChangeTypeNodeDeleted      ChangeType = "node_deleted"
	ChangeTypeGraphUpdated     ChangeType = "graph_updated"
	ChangeTypeInfluenceAdded   ChangeType = "influence_added"
	ChangeTypeInfluenceRemoved ChangeType = "influence_removed"
)

// ConflictResolver defines how to resolve conflicts during synchronization.
type ConflictResolver interface {
	ResolveConflict(ctx context.Context, local, remote *TemporalNode) (*TemporalNode, error)
	ResolveGraphConflict(ctx context.Context, localGraph, remoteGraph *GraphSnapshot) (*GraphSnapshot, error)
}

// GraphSnapshot represents a snapshot of the temporal graph for synchronization.
type GraphSnapshot struct {
	Timestamp    time.Time                    `json:"timestamp"`
	Nodes        map[string]*TemporalNode     `json:"nodes"`
	Influences   map[string][]string          `json:"influences"`
	InfluencedBy map[string][]string          `json:"influenced_by"`
	Decisions    map[string]*DecisionMetadata `json:"decisions"`
	Metadata     *GraphMetadata               `json:"metadata"`
	Checksum     string                       `json:"checksum"`
}

// GraphMetadata represents metadata about the temporal graph.
type GraphMetadata struct {
	Version       int       `json:"version"`
	LastModified  time.Time `json:"last_modified"`
	NodeCount     int       `json:"node_count"`
	EdgeCount     int       `json:"edge_count"`
	DecisionCount int       `json:"decision_count"`
	CreatedBy     string    `json:"created_by"`
	CreatedAt     time.Time `json:"created_at"`
}

// SyncResult represents the result of a synchronization operation.
type SyncResult struct {
	StartTime         time.Time     `json:"start_time"`
	EndTime           time.Time     `json:"end_time"`
	Duration          time.Duration `json:"duration"`
	NodesProcessed    int           `json:"nodes_processed"`
	NodesCreated      int           `json:"nodes_created"`
	NodesUpdated      int           `json:"nodes_updated"`
	NodesDeleted      int           `json:"nodes_deleted"`
	ConflictsFound    int           `json:"conflicts_found"`
	ConflictsResolved int           `json:"conflicts_resolved"`
	Errors            []string      `json:"errors"`
	Success           bool          `json:"success"`
}
// NewPersistenceManager creates a new persistence manager.
func NewPersistenceManager(
	contextStore storage.ContextStore,
	localStorage storage.LocalStorage,
	distributedStore storage.DistributedStorage,
	encryptedStore storage.EncryptedStorage,
	backupManager storage.BackupManager,
	graph *temporalGraphImpl,
	config *PersistenceConfig,
) *persistenceManagerImpl {
	pm := &persistenceManagerImpl{
		contextStore:     contextStore,
		localStorage:     localStorage,
		distributedStore: distributedStore,
		encryptedStore:   encryptedStore,
		backupManager:    backupManager,
		graph:            graph,
		config:           config,
		pendingChanges:   make(map[string]*PendingChange),
		conflictResolver: NewDefaultConflictResolver(),
		batchSize:        config.BatchSize,
		writeBuffer:      make([]*TemporalNode, 0, config.BatchSize),
		flushInterval:    config.FlushInterval,
	}

	// Start background workers. They run for the lifetime of the process;
	// no shutdown mechanism is provided here.
	if config.EnableAutoSync {
		go pm.syncWorker()
	}
	if config.EnableWriteBuffer {
		go pm.flushWorker()
	}
	if config.EnableAutoBackup {
		go pm.backupWorker()
	}

	return pm
}
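
// A sketch of typical wiring (the concrete storage values are assumptions;
// this package only sees them through the pkg/slurp/storage interfaces):
//
//	pm := NewPersistenceManager(ctxStore, local, dist, enc, backups, graph,
//		DefaultPersistenceConfig())
//	if err := pm.LoadTemporalGraph(ctx); err != nil {
//		// handle load failure
//	}
//	_ = pm.PersistTemporalNode(ctx, node) // buffered when EnableWriteBuffer is set
//	res, _ := pm.SynchronizeGraph(ctx)    // manual sync when EnableAutoSync is off
//	_ = res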
// PersistTemporalNode persists a temporal node to storage.
func (pm *persistenceManagerImpl) PersistTemporalNode(ctx context.Context, node *TemporalNode) error {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	// Add to the write buffer if enabled
	if pm.config.EnableWriteBuffer {
		return pm.addToWriteBuffer(node)
	}

	// Direct persistence
	return pm.persistNodeDirect(ctx, node)
}
// LoadTemporalGraph loads the temporal graph from storage.
func (pm *persistenceManagerImpl) LoadTemporalGraph(ctx context.Context) error {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	// Load from the configured storage layers
	if pm.config.EnableLocalStorage {
		if err := pm.loadFromLocalStorage(ctx); err != nil {
			return fmt.Errorf("failed to load from local storage: %w", err)
		}
	}
	if pm.config.EnableDistributedStorage {
		if err := pm.loadFromDistributedStorage(ctx); err != nil {
			return fmt.Errorf("failed to load from distributed storage: %w", err)
		}
	}
	return nil
}
// SynchronizeGraph synchronizes the temporal graph with distributed storage.
func (pm *persistenceManagerImpl) SynchronizeGraph(ctx context.Context) (*SyncResult, error) {
	pm.mu.Lock()
	if pm.syncInProgress {
		pm.mu.Unlock()
		return nil, fmt.Errorf("synchronization already in progress")
	}
	pm.syncInProgress = true
	pm.mu.Unlock()

	defer func() {
		pm.mu.Lock()
		pm.syncInProgress = false
		pm.lastSyncTime = time.Now()
		pm.mu.Unlock()
	}()

	result := &SyncResult{
		StartTime: time.Now(),
		Errors:    make([]string, 0),
	}

	// Create a local snapshot
	localSnapshot, err := pm.createGraphSnapshot()
	if err != nil {
		result.Errors = append(result.Errors, fmt.Sprintf("failed to create local snapshot: %v", err))
		result.Success = false
		return result, err
	}

	// Get the remote snapshot; it may not exist yet, in which case we fall
	// back to an initial sync of the local snapshot.
	remoteSnapshot, err := pm.getRemoteSnapshot(ctx)
	if err != nil {
		remoteSnapshot = nil
	}

	// Perform synchronization
	if remoteSnapshot != nil {
		err = pm.performBidirectionalSync(ctx, localSnapshot, remoteSnapshot, result)
	} else {
		err = pm.performInitialSync(ctx, localSnapshot, result)
	}
	if err != nil {
		result.Errors = append(result.Errors, fmt.Sprintf("sync failed: %v", err))
		result.Success = false
	} else {
		result.Success = true
	}

	result.EndTime = time.Now()
	result.Duration = result.EndTime.Sub(result.StartTime)
	return result, err
}
// BackupGraph creates a backup of the temporal graph.
func (pm *persistenceManagerImpl) BackupGraph(ctx context.Context) error {
	pm.mu.RLock()
	defer pm.mu.RUnlock()

	if !pm.config.EnableAutoBackup {
		return fmt.Errorf("backup not enabled")
	}

	// Create a graph snapshot
	snapshot, err := pm.createGraphSnapshot()
	if err != nil {
		return fmt.Errorf("failed to create snapshot: %w", err)
	}

	// Serialize the snapshot and persist it under the graph key so the
	// backup captures a consistent view of the graph state. (The original
	// code marshaled the snapshot without using the result.)
	data, err := json.Marshal(snapshot)
	if err != nil {
		return fmt.Errorf("failed to serialize snapshot: %w", err)
	}
	if pm.config.EnableLocalStorage {
		if err := pm.localStorage.Store(ctx, pm.generateGraphKey(), data, nil); err != nil {
			return fmt.Errorf("failed to persist snapshot for backup: %w", err)
		}
	}

	// Create the backup configuration
	backupConfig := &storage.BackupConfig{
		Type:        "temporal_graph",
		Description: "Temporal graph backup",
		Tags:        []string{"temporal", "graph", "decision"},
		Metadata: map[string]interface{}{
			"node_count":     snapshot.Metadata.NodeCount,
			"edge_count":     snapshot.Metadata.EdgeCount,
			"decision_count": snapshot.Metadata.DecisionCount,
		},
	}

	// Create the backup
	_, err = pm.backupManager.CreateBackup(ctx, backupConfig)
	return err
}
// RestoreGraph restores the temporal graph from a backup.
func (pm *persistenceManagerImpl) RestoreGraph(ctx context.Context, backupID string) error {
	// Create the restore configuration
	restoreConfig := &storage.RestoreConfig{
		OverwriteExisting: true,
		ValidateIntegrity: true,
	}

	// Restore from the backup. pm.mu is released before reloading because
	// LoadTemporalGraph acquires it itself; sync.RWMutex is not reentrant,
	// so holding the lock across that call would deadlock.
	pm.mu.Lock()
	err := pm.backupManager.RestoreBackup(ctx, backupID, restoreConfig)
	pm.mu.Unlock()
	if err != nil {
		return fmt.Errorf("failed to restore backup: %w", err)
	}

	// Reload the graph from storage
	return pm.LoadTemporalGraph(ctx)
}
// Internal persistence methods

// addToWriteBuffer queues a node for batched persistence, flushing when the
// buffer reaches the configured batch size.
func (pm *persistenceManagerImpl) addToWriteBuffer(node *TemporalNode) error {
	pm.bufferMutex.Lock()
	defer pm.bufferMutex.Unlock()

	pm.writeBuffer = append(pm.writeBuffer, node)

	// Flush if the buffer is full
	if len(pm.writeBuffer) >= pm.batchSize {
		return pm.flushWriteBuffer()
	}
	return nil
}
// flushWriteBuffer writes all buffered nodes as a single batch. Callers must
// hold bufferMutex.
func (pm *persistenceManagerImpl) flushWriteBuffer() error {
	if len(pm.writeBuffer) == 0 {
		return nil
	}

	// Build the batch store request
	batch := &storage.BatchStoreRequest{
		Operations: make([]*storage.BatchStoreOperation, len(pm.writeBuffer)),
	}
	for i, node := range pm.writeBuffer {
		key := pm.generateNodeKey(node)
		batch.Operations[i] = &storage.BatchStoreOperation{
			Type:  "store",
			Key:   key,
			Data:  node,
			Roles: pm.config.EncryptionRoles,
		}
	}

	// Execute the batch store
	ctx := context.Background()
	if _, err := pm.contextStore.BatchStore(ctx, batch); err != nil {
		return fmt.Errorf("failed to flush write buffer: %w", err)
	}

	// Clear the buffer
	pm.writeBuffer = pm.writeBuffer[:0]
	pm.lastFlush = time.Now()
	return nil
}
// persistNodeDirect writes a node to every enabled storage layer and records
// a pending change for later synchronization. Callers must hold pm.mu, which
// guards pendingChanges.
func (pm *persistenceManagerImpl) persistNodeDirect(ctx context.Context, node *TemporalNode) error {
	key := pm.generateNodeKey(node)

	// Store in the enabled layers
	if pm.config.EnableLocalStorage {
		if err := pm.localStorage.Store(ctx, key, node, nil); err != nil {
			return fmt.Errorf("failed to store in local storage: %w", err)
		}
	}
	if pm.config.EnableDistributedStorage {
		if err := pm.distributedStore.Store(ctx, key, node, nil); err != nil {
			return fmt.Errorf("failed to store in distributed storage: %w", err)
		}
	}
	if pm.config.EnableEncryption {
		if err := pm.encryptedStore.StoreEncrypted(ctx, key, node, pm.config.EncryptionRoles); err != nil {
			return fmt.Errorf("failed to store encrypted: %w", err)
		}
	}

	// Record a pending change for synchronization. Updates are currently
	// recorded as creations; the sync logic reconciles by timestamp.
	change := &PendingChange{
		ID:        fmt.Sprintf("%s-%d", node.ID, time.Now().UnixNano()),
		Type:      ChangeTypeNodeCreated,
		NodeID:    node.ID,
		Data:      node,
		Timestamp: time.Now(),
		Metadata:  make(map[string]interface{}),
	}
	pm.pendingChanges[change.ID] = change
	return nil
}
func (pm *persistenceManagerImpl) loadFromLocalStorage(ctx context.Context) error {
	// Load the graph metadata (currently informational only)
	metadataKey := pm.generateMetadataKey()
	metadataData, err := pm.localStorage.Retrieve(ctx, metadataKey)
	if err != nil {
		return fmt.Errorf("failed to load metadata: %w", err)
	}
	raw, ok := metadataData.([]byte)
	if !ok {
		return fmt.Errorf("unexpected metadata payload type %T", metadataData)
	}
	var metadata *GraphMetadata
	if err := json.Unmarshal(raw, &metadata); err != nil {
		return fmt.Errorf("failed to unmarshal metadata: %w", err)
	}

	// List all node keys
	pattern := pm.generateNodeKeyPattern()
	nodeKeys, err := pm.localStorage.List(ctx, pattern)
	if err != nil {
		return fmt.Errorf("failed to list nodes: %w", err)
	}

	// Load the nodes in a single batch
	batchReq := &storage.BatchRetrieveRequest{
		Keys: nodeKeys,
	}
	batchResult, err := pm.contextStore.BatchRetrieve(ctx, batchReq)
	if err != nil {
		return fmt.Errorf("failed to batch retrieve nodes: %w", err)
	}

	// Reconstruct the graph
	pm.graph.mu.Lock()
	defer pm.graph.mu.Unlock()

	pm.graph.nodes = make(map[string]*TemporalNode)
	pm.graph.addressToNodes = make(map[string][]*TemporalNode)
	pm.graph.influences = make(map[string][]string)
	pm.graph.influencedBy = make(map[string][]string)

	for _, result := range batchResult.Results {
		if result.Error != nil {
			continue // Skip failed retrievals
		}
		data, ok := result.Data.([]byte)
		if !ok {
			continue // Skip payloads of unexpected type
		}
		var node *TemporalNode
		if err := json.Unmarshal(data, &node); err != nil {
			continue // Skip invalid nodes
		}
		pm.reconstructGraphNode(node)
	}
	return nil
}
func (pm *persistenceManagerImpl) loadFromDistributedStorage(ctx context.Context) error {
	// TODO: stub. The implementation would mirror loadFromLocalStorage but
	// read through the distributed store instead.
	return nil
}
func (pm *persistenceManagerImpl) createGraphSnapshot() (*GraphSnapshot, error) {
	pm.graph.mu.RLock()
	defer pm.graph.mu.RUnlock()

	snapshot := &GraphSnapshot{
		Timestamp:    time.Now(),
		Nodes:        make(map[string]*TemporalNode),
		Influences:   make(map[string][]string),
		InfluencedBy: make(map[string][]string),
		Decisions:    make(map[string]*DecisionMetadata),
		Metadata: &GraphMetadata{
			Version:       1,
			LastModified:  time.Now(),
			NodeCount:     len(pm.graph.nodes),
			EdgeCount:     pm.calculateEdgeCount(),
			DecisionCount: len(pm.graph.decisions),
			CreatedBy:     "temporal_graph",
			CreatedAt:     time.Now(),
		},
	}

	// Copy nodes
	for id, node := range pm.graph.nodes {
		snapshot.Nodes[id] = node
	}

	// Copy influence edges in both directions
	for id, influences := range pm.graph.influences {
		snapshot.Influences[id] = make([]string, len(influences))
		copy(snapshot.Influences[id], influences)
	}
	for id, influencedBy := range pm.graph.influencedBy {
		snapshot.InfluencedBy[id] = make([]string, len(influencedBy))
		copy(snapshot.InfluencedBy[id], influencedBy)
	}

	// Copy decisions
	for id, decision := range pm.graph.decisions {
		snapshot.Decisions[id] = decision
	}

	// Calculate the checksum
	snapshot.Checksum = pm.calculateSnapshotChecksum(snapshot)
	return snapshot, nil
}
func (pm *persistenceManagerImpl) getRemoteSnapshot(ctx context.Context) (*GraphSnapshot, error) {
	key := pm.generateGraphKey()
	data, err := pm.distributedStore.Retrieve(ctx, key)
	if err != nil {
		return nil, err
	}
	raw, ok := data.([]byte)
	if !ok {
		return nil, fmt.Errorf("unexpected snapshot payload type %T", data)
	}
	var snapshot *GraphSnapshot
	if err := json.Unmarshal(raw, &snapshot); err != nil {
		return nil, fmt.Errorf("failed to unmarshal remote snapshot: %w", err)
	}
	return snapshot, nil
}
func (pm *persistenceManagerImpl) performBidirectionalSync(ctx context.Context, local, remote *GraphSnapshot, result *SyncResult) error {
	// Compare snapshots and identify differences
	conflicts := pm.identifyConflicts(local, remote)
	result.ConflictsFound = len(conflicts)

	// Resolve conflicts
	for _, conflict := range conflicts {
		resolved, err := pm.resolveConflict(ctx, conflict)
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("failed to resolve conflict %s: %v", conflict.NodeID, err))
			continue
		}
		// Apply the resolution
		if err := pm.applyConflictResolution(ctx, resolved); err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("failed to apply resolution for %s: %v", conflict.NodeID, err))
			continue
		}
		result.ConflictsResolved++
	}

	// Sync local changes to remote
	if err := pm.syncLocalToRemote(ctx, local, remote, result); err != nil {
		return fmt.Errorf("failed to sync local to remote: %w", err)
	}

	// Sync remote changes to local
	if err := pm.syncRemoteToLocal(ctx, remote, local, result); err != nil {
		return fmt.Errorf("failed to sync remote to local: %w", err)
	}
	return nil
}
func (pm *persistenceManagerImpl) performInitialSync(ctx context.Context, local *GraphSnapshot, result *SyncResult) error {
	// Store the entire local snapshot to remote
	key := pm.generateGraphKey()
	data, err := json.Marshal(local)
	if err != nil {
		return fmt.Errorf("failed to marshal snapshot: %w", err)
	}
	if err := pm.distributedStore.Store(ctx, key, data, nil); err != nil {
		return fmt.Errorf("failed to store snapshot: %w", err)
	}

	result.NodesProcessed = len(local.Nodes)
	result.NodesCreated = len(local.Nodes)
	return nil
}
// Background workers. Each runs until process exit; the tickers are never
// stopped from outside.

func (pm *persistenceManagerImpl) syncWorker() {
	ticker := time.NewTicker(pm.config.SyncInterval)
	defer ticker.Stop()
	for range ticker.C {
		ctx := context.Background()
		if _, err := pm.SynchronizeGraph(ctx); err != nil {
			// Log the error but keep the worker running
			fmt.Printf("sync worker error: %v\n", err)
		}
	}
}

func (pm *persistenceManagerImpl) flushWorker() {
	ticker := time.NewTicker(pm.flushInterval)
	defer ticker.Stop()
	for range ticker.C {
		pm.bufferMutex.Lock()
		if time.Since(pm.lastFlush) >= pm.flushInterval && len(pm.writeBuffer) > 0 {
			if err := pm.flushWriteBuffer(); err != nil {
				fmt.Printf("flush worker error: %v\n", err)
			}
		}
		pm.bufferMutex.Unlock()
	}
}

func (pm *persistenceManagerImpl) backupWorker() {
	ticker := time.NewTicker(pm.config.BackupInterval)
	defer ticker.Stop()
	for range ticker.C {
		ctx := context.Background()
		if err := pm.BackupGraph(ctx); err != nil {
			fmt.Printf("backup worker error: %v\n", err)
		}
	}
}
// Helper methods
//
// Note: the *KeyPattern fields in PersistenceConfig are not yet consulted
// here; the key layouts below are hardcoded.

func (pm *persistenceManagerImpl) generateNodeKey(node *TemporalNode) string {
	return fmt.Sprintf("%s/nodes/%s", pm.config.KeyPrefix, node.ID)
}

func (pm *persistenceManagerImpl) generateGraphKey() string {
	return fmt.Sprintf("%s/graph/snapshot", pm.config.KeyPrefix)
}

func (pm *persistenceManagerImpl) generateMetadataKey() string {
	return fmt.Sprintf("%s/graph/metadata", pm.config.KeyPrefix)
}

func (pm *persistenceManagerImpl) generateNodeKeyPattern() string {
	return fmt.Sprintf("%s/nodes/*", pm.config.KeyPrefix)
}

func (pm *persistenceManagerImpl) calculateEdgeCount() int {
	count := 0
	for _, influences := range pm.graph.influences {
		count += len(influences)
	}
	return count
}
func (pm *persistenceManagerImpl) calculateSnapshotChecksum(snapshot *GraphSnapshot) string {
	// Hash the serialized node set. json.Marshal sorts map keys, so the
	// encoding is deterministic; a truncated SHA-256 digest (16 hex chars,
	// matching the previous checksum width) is enough to detect drift.
	data, _ := json.Marshal(snapshot.Nodes)
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:8])
}
// reconstructGraphNode inserts a loaded node into the in-memory graph.
// Callers must hold pm.graph.mu.
func (pm *persistenceManagerImpl) reconstructGraphNode(node *TemporalNode) {
	// Add the node to the graph
	pm.graph.nodes[node.ID] = node

	// Update the address mapping
	addressKey := node.UCXLAddress.String()
	pm.graph.addressToNodes[addressKey] = append(pm.graph.addressToNodes[addressKey], node)

	// Initialize empty influence slots; the actual edges are expected to be
	// rebuilt from the snapshot's influence maps after all nodes are loaded.
	pm.graph.influences[node.ID] = make([]string, 0)
	pm.graph.influencedBy[node.ID] = make([]string, 0)
}
func (pm *persistenceManagerImpl) identifyConflicts(local, remote *GraphSnapshot) []*SyncConflict {
	conflicts := make([]*SyncConflict, 0)

	// Compare nodes present in both snapshots
	for nodeID, localNode := range local.Nodes {
		if remoteNode, exists := remote.Nodes[nodeID]; exists {
			if pm.hasNodeConflict(localNode, remoteNode) {
				conflicts = append(conflicts, &SyncConflict{
					Type:       ConflictTypeNodeMismatch,
					NodeID:     nodeID,
					LocalData:  localNode,
					RemoteData: remoteNode,
				})
			}
		}
	}
	return conflicts
}

func (pm *persistenceManagerImpl) hasNodeConflict(local, remote *TemporalNode) bool {
	// Simple conflict detection based on timestamp and content hash.
	// time.Time values are compared with Equal rather than != to avoid
	// spurious mismatches from the monotonic clock component.
	return !local.Timestamp.Equal(remote.Timestamp) || local.ContextHash != remote.ContextHash
}
func (pm *persistenceManagerImpl) resolveConflict(ctx context.Context, conflict *SyncConflict) (*ConflictResolution, error) {
	// Delegate to the configured conflict resolver; guard the type
	// assertions so a malformed conflict cannot panic the sync.
	localNode, lok := conflict.LocalData.(*TemporalNode)
	remoteNode, rok := conflict.RemoteData.(*TemporalNode)
	if !lok || !rok {
		return nil, fmt.Errorf("unsupported conflict payload for node %s", conflict.NodeID)
	}
	resolvedNode, err := pm.conflictResolver.ResolveConflict(ctx, localNode, remoteNode)
	if err != nil {
		return nil, err
	}
	return &ConflictResolution{
		ConflictID:   conflict.NodeID,
		Resolution:   "merged",
		ResolvedData: resolvedNode,
		ResolvedAt:   time.Now(),
	}, nil
}

func (pm *persistenceManagerImpl) applyConflictResolution(ctx context.Context, resolution *ConflictResolution) error {
	// Apply the resolved node back to the in-memory graph
	resolvedNode, ok := resolution.ResolvedData.(*TemporalNode)
	if !ok {
		return fmt.Errorf("unsupported resolution payload for %s", resolution.ConflictID)
	}
	pm.graph.mu.Lock()
	pm.graph.nodes[resolvedNode.ID] = resolvedNode
	pm.graph.mu.Unlock()

	// Persist the resolved node. pm.mu guards pendingChanges inside
	// persistNodeDirect and is not held at this point in the sync flow.
	pm.mu.Lock()
	defer pm.mu.Unlock()
	return pm.persistNodeDirect(ctx, resolvedNode)
}
func (pm *persistenceManagerImpl) syncLocalToRemote(ctx context.Context, local, remote *GraphSnapshot, result *SyncResult) error {
	// Push nodes that exist only locally, or that are newer locally
	for nodeID, localNode := range local.Nodes {
		remoteNode, exists := remote.Nodes[nodeID]
		isUpdate := exists && localNode.Timestamp.After(remoteNode.Timestamp)
		if exists && !isUpdate {
			continue // Remote copy is current
		}

		result.NodesProcessed++
		key := pm.generateNodeKey(localNode)
		data, err := json.Marshal(localNode)
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("failed to marshal node %s: %v", nodeID, err))
			continue
		}
		if err := pm.distributedStore.Store(ctx, key, data, nil); err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("failed to sync node %s to remote: %v", nodeID, err))
			continue
		}

		// Count the node only once the store has succeeded
		if isUpdate {
			result.NodesUpdated++
		} else {
			result.NodesCreated++
		}
	}
	return nil
}
func (pm *persistenceManagerImpl) syncRemoteToLocal(ctx context.Context, remote, local *GraphSnapshot, result *SyncResult) error {
	// Pull nodes that exist only remotely, or that are newer remotely
	for nodeID, remoteNode := range remote.Nodes {
		localNode, exists := local.Nodes[nodeID]
		isUpdate := exists && remoteNode.Timestamp.After(localNode.Timestamp)
		if exists && !isUpdate {
			continue // Local copy is current
		}

		result.NodesProcessed++

		// Add to the local graph (reconstructGraphNode stores the node and
		// initializes its influence slots)
		pm.graph.mu.Lock()
		pm.reconstructGraphNode(remoteNode)
		pm.graph.mu.Unlock()

		// Persist locally. pm.mu guards pendingChanges inside
		// persistNodeDirect and is not held at this point in the sync flow.
		pm.mu.Lock()
		err := pm.persistNodeDirect(ctx, remoteNode)
		pm.mu.Unlock()
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("failed to sync node %s to local: %v", nodeID, err))
			continue
		}

		if isUpdate {
			result.NodesUpdated++
		} else {
			result.NodesCreated++
		}
	}
	return nil
}
// Supporting types for conflict resolution

type SyncConflict struct {
	Type       ConflictType `json:"type"`
	NodeID     string       `json:"node_id"`
	LocalData  interface{}  `json:"local_data"`
	RemoteData interface{}  `json:"remote_data"`
	Severity   string       `json:"severity"`
}

type ConflictType string

const (
	ConflictTypeNodeMismatch      ConflictType = "node_mismatch"
	ConflictTypeInfluenceMismatch ConflictType = "influence_mismatch"
	ConflictTypeMetadataMismatch  ConflictType = "metadata_mismatch"
)

type ConflictResolution struct {
	ConflictID   string      `json:"conflict_id"`
	Resolution   string      `json:"resolution"`
	ResolvedData interface{} `json:"resolved_data"`
	ResolvedAt   time.Time   `json:"resolved_at"`
	ResolvedBy   string      `json:"resolved_by"`
}
// Default conflict resolver implementation

type defaultConflictResolver struct{}

func NewDefaultConflictResolver() ConflictResolver {
	return &defaultConflictResolver{}
}

func (dcr *defaultConflictResolver) ResolveConflict(ctx context.Context, local, remote *TemporalNode) (*TemporalNode, error) {
	// Default strategy: prefer the node with higher confidence; fall back to
	// recency when confidence is equal.
	if local.Confidence > remote.Confidence {
		return local, nil
	}
	if remote.Confidence > local.Confidence {
		return remote, nil
	}
	if local.Timestamp.After(remote.Timestamp) {
		return local, nil
	}
	return remote, nil
}

func (dcr *defaultConflictResolver) ResolveGraphConflict(ctx context.Context, localGraph, remoteGraph *GraphSnapshot) (*GraphSnapshot, error) {
	// Default strategy: keep the more recent snapshot wholesale (no
	// per-node merging is performed here).
	if localGraph.Timestamp.After(remoteGraph.Timestamp) {
		return localGraph, nil
	}
	return remoteGraph, nil
}
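
// The resolver below is an illustrative sketch, not part of the original
// API: it shows how an alternative ConflictResolver could implement a
// "latest wins" policy of the kind the conflict_resolution_strategy config
// knob might name, ignoring confidence entirely and keeping whichever side
// was written most recently.

type latestWinsConflictResolver struct{}

// NewLatestWinsConflictResolver returns the illustrative latest-wins resolver.
func NewLatestWinsConflictResolver() ConflictResolver {
	return &latestWinsConflictResolver{}
}

func (r *latestWinsConflictResolver) ResolveConflict(ctx context.Context, local, remote *TemporalNode) (*TemporalNode, error) {
	if local.Timestamp.After(remote.Timestamp) {
		return local, nil
	}
	return remote, nil
}

func (r *latestWinsConflictResolver) ResolveGraphConflict(ctx context.Context, localGraph, remoteGraph *GraphSnapshot) (*GraphSnapshot, error) {
	if localGraph.Timestamp.After(remoteGraph.Timestamp) {
		return localGraph, nil
	}
	return remoteGraph, nil
}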