 9bdcbe0447
			
		
	
	9bdcbe0447
	
	
	
		
			
			Major integrations and fixes: - Added BACKBEAT SDK integration for P2P operation timing - Implemented beat-aware status tracking for distributed operations - Added Docker secrets support for secure license management - Resolved KACHING license validation via HTTPS/TLS - Updated docker-compose configuration for clean stack deployment - Disabled rollback policies to prevent deployment failures - Added license credential storage (CHORUS-DEV-MULTI-001) Technical improvements: - BACKBEAT P2P operation tracking with phase management - Enhanced configuration system with file-based secrets - Improved error handling for license validation - Clean separation of KACHING and CHORUS deployment stacks 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			889 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			889 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package temporal
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"chorus/pkg/ucxl"
 | |
| 	"chorus/pkg/slurp/storage"
 | |
| )
 | |
| 
 | |
| // persistenceManagerImpl handles persistence and synchronization of temporal graph data
 | |
| type persistenceManagerImpl struct {
 | |
| 	mu sync.RWMutex
 | |
| 	
 | |
| 	// Storage interfaces
 | |
| 	contextStore     storage.ContextStore
 | |
| 	localStorage     storage.LocalStorage
 | |
| 	distributedStore storage.DistributedStorage
 | |
| 	encryptedStore   storage.EncryptedStorage
 | |
| 	backupManager    storage.BackupManager
 | |
| 	
 | |
| 	// Reference to temporal graph
 | |
| 	graph *temporalGraphImpl
 | |
| 	
 | |
| 	// Persistence configuration
 | |
| 	config *PersistenceConfig
 | |
| 	
 | |
| 	// Synchronization state
 | |
| 	lastSyncTime     time.Time
 | |
| 	syncInProgress   bool
 | |
| 	pendingChanges   map[string]*PendingChange
 | |
| 	conflictResolver ConflictResolver
 | |
| 	
 | |
| 	// Performance optimization
 | |
| 	batchSize        int
 | |
| 	writeBuffer      []*TemporalNode
 | |
| 	bufferMutex      sync.Mutex
 | |
| 	flushInterval    time.Duration
 | |
| 	lastFlush        time.Time
 | |
| }
 | |
| 
 | |
| // PersistenceConfig represents configuration for temporal graph persistence
 | |
| type PersistenceConfig struct {
 | |
| 	// Storage settings
 | |
| 	EnableLocalStorage       bool              `json:"enable_local_storage"`
 | |
| 	EnableDistributedStorage bool              `json:"enable_distributed_storage"`
 | |
| 	EnableEncryption         bool              `json:"enable_encryption"`
 | |
| 	EncryptionRoles          []string          `json:"encryption_roles"`
 | |
| 	
 | |
| 	// Synchronization settings
 | |
| 	SyncInterval             time.Duration     `json:"sync_interval"`
 | |
| 	ConflictResolutionStrategy string          `json:"conflict_resolution_strategy"`
 | |
| 	EnableAutoSync           bool              `json:"enable_auto_sync"`
 | |
| 	MaxSyncRetries           int               `json:"max_sync_retries"`
 | |
| 	
 | |
| 	// Performance settings
 | |
| 	BatchSize                int               `json:"batch_size"`
 | |
| 	FlushInterval            time.Duration     `json:"flush_interval"`
 | |
| 	EnableWriteBuffer        bool              `json:"enable_write_buffer"`
 | |
| 	
 | |
| 	// Backup settings
 | |
| 	EnableAutoBackup         bool              `json:"enable_auto_backup"`
 | |
| 	BackupInterval           time.Duration     `json:"backup_interval"`
 | |
| 	RetainBackupCount        int               `json:"retain_backup_count"`
 | |
| 	
 | |
| 	// Storage keys and patterns
 | |
| 	KeyPrefix                string            `json:"key_prefix"`
 | |
| 	NodeKeyPattern           string            `json:"node_key_pattern"`
 | |
| 	GraphKeyPattern          string            `json:"graph_key_pattern"`
 | |
| 	MetadataKeyPattern       string            `json:"metadata_key_pattern"`
 | |
| }
 | |
| 
 | |
| // PendingChange represents a change waiting to be synchronized
 | |
| type PendingChange struct {
 | |
| 	ID          string                 `json:"id"`
 | |
| 	Type        ChangeType             `json:"type"`
 | |
| 	NodeID      string                 `json:"node_id"`
 | |
| 	Data        interface{}            `json:"data"`
 | |
| 	Timestamp   time.Time              `json:"timestamp"`
 | |
| 	Retries     int                    `json:"retries"`
 | |
| 	LastError   string                 `json:"last_error"`
 | |
| 	Metadata    map[string]interface{} `json:"metadata"`
 | |
| }
 | |
| 
 | |
| // ChangeType represents the type of change to be synchronized
 | |
| type ChangeType string
 | |
| 
 | |
| const (
 | |
| 	ChangeTypeNodeCreated  ChangeType = "node_created"
 | |
| 	ChangeTypeNodeUpdated  ChangeType = "node_updated"
 | |
| 	ChangeTypeNodeDeleted  ChangeType = "node_deleted"
 | |
| 	ChangeTypeGraphUpdated ChangeType = "graph_updated"
 | |
| 	ChangeTypeInfluenceAdded ChangeType = "influence_added"
 | |
| 	ChangeTypeInfluenceRemoved ChangeType = "influence_removed"
 | |
| )
 | |
| 
 | |
| // ConflictResolver defines how to resolve conflicts during synchronization
 | |
| type ConflictResolver interface {
 | |
| 	ResolveConflict(ctx context.Context, local, remote *TemporalNode) (*TemporalNode, error)
 | |
| 	ResolveGraphConflict(ctx context.Context, localGraph, remoteGraph *GraphSnapshot) (*GraphSnapshot, error)
 | |
| }
 | |
| 
 | |
| // GraphSnapshot represents a snapshot of the temporal graph for synchronization
 | |
| type GraphSnapshot struct {
 | |
| 	Timestamp     time.Time              `json:"timestamp"`
 | |
| 	Nodes         map[string]*TemporalNode `json:"nodes"`
 | |
| 	Influences    map[string][]string    `json:"influences"`
 | |
| 	InfluencedBy  map[string][]string    `json:"influenced_by"`
 | |
| 	Decisions     map[string]*DecisionMetadata `json:"decisions"`
 | |
| 	Metadata      *GraphMetadata         `json:"metadata"`
 | |
| 	Checksum      string                 `json:"checksum"`
 | |
| }
 | |
| 
 | |
| // GraphMetadata represents metadata about the temporal graph
 | |
| type GraphMetadata struct {
 | |
| 	Version         int       `json:"version"`
 | |
| 	LastModified    time.Time `json:"last_modified"`
 | |
| 	NodeCount       int       `json:"node_count"`
 | |
| 	EdgeCount       int       `json:"edge_count"`
 | |
| 	DecisionCount   int       `json:"decision_count"`
 | |
| 	CreatedBy       string    `json:"created_by"`
 | |
| 	CreatedAt       time.Time `json:"created_at"`
 | |
| }
 | |
| 
 | |
| // SyncResult represents the result of a synchronization operation
 | |
| type SyncResult struct {
 | |
| 	StartTime        time.Time     `json:"start_time"`
 | |
| 	EndTime          time.Time     `json:"end_time"`
 | |
| 	Duration         time.Duration `json:"duration"`
 | |
| 	NodesProcessed   int           `json:"nodes_processed"`
 | |
| 	NodesCreated     int           `json:"nodes_created"`
 | |
| 	NodesUpdated     int           `json:"nodes_updated"`
 | |
| 	NodesDeleted     int           `json:"nodes_deleted"`
 | |
| 	ConflictsFound   int           `json:"conflicts_found"`
 | |
| 	ConflictsResolved int          `json:"conflicts_resolved"`
 | |
| 	Errors           []string      `json:"errors"`
 | |
| 	Success          bool          `json:"success"`
 | |
| }
 | |
| 
 | |
| // NewPersistenceManager creates a new persistence manager
 | |
| func NewPersistenceManager(
 | |
| 	contextStore storage.ContextStore,
 | |
| 	localStorage storage.LocalStorage,
 | |
| 	distributedStore storage.DistributedStorage,
 | |
| 	encryptedStore storage.EncryptedStorage,
 | |
| 	backupManager storage.BackupManager,
 | |
| 	graph *temporalGraphImpl,
 | |
| 	config *PersistenceConfig,
 | |
| ) *persistenceManagerImpl {
 | |
| 	
 | |
| 	pm := &persistenceManagerImpl{
 | |
| 		contextStore:     contextStore,
 | |
| 		localStorage:     localStorage,
 | |
| 		distributedStore: distributedStore,
 | |
| 		encryptedStore:   encryptedStore,
 | |
| 		backupManager:    backupManager,
 | |
| 		graph:            graph,
 | |
| 		config:           config,
 | |
| 		pendingChanges:   make(map[string]*PendingChange),
 | |
| 		conflictResolver: NewDefaultConflictResolver(),
 | |
| 		batchSize:        config.BatchSize,
 | |
| 		writeBuffer:      make([]*TemporalNode, 0, config.BatchSize),
 | |
| 		flushInterval:    config.FlushInterval,
 | |
| 	}
 | |
| 	
 | |
| 	// Start background processes
 | |
| 	if config.EnableAutoSync {
 | |
| 		go pm.syncWorker()
 | |
| 	}
 | |
| 	
 | |
| 	if config.EnableWriteBuffer {
 | |
| 		go pm.flushWorker()
 | |
| 	}
 | |
| 	
 | |
| 	if config.EnableAutoBackup {
 | |
| 		go pm.backupWorker()
 | |
| 	}
 | |
| 	
 | |
| 	return pm
 | |
| }
 | |
| 
 | |
| // PersistTemporalNode persists a temporal node to storage
 | |
| func (pm *persistenceManagerImpl) PersistTemporalNode(ctx context.Context, node *TemporalNode) error {
 | |
| 	pm.mu.Lock()
 | |
| 	defer pm.mu.Unlock()
 | |
| 	
 | |
| 	// Add to write buffer if enabled
 | |
| 	if pm.config.EnableWriteBuffer {
 | |
| 		return pm.addToWriteBuffer(node)
 | |
| 	}
 | |
| 	
 | |
| 	// Direct persistence
 | |
| 	return pm.persistNodeDirect(ctx, node)
 | |
| }
 | |
| 
 | |
| // LoadTemporalGraph loads the temporal graph from storage
 | |
| func (pm *persistenceManagerImpl) LoadTemporalGraph(ctx context.Context) error {
 | |
| 	pm.mu.Lock()
 | |
| 	defer pm.mu.Unlock()
 | |
| 	
 | |
| 	// Load from different storage layers
 | |
| 	if pm.config.EnableLocalStorage {
 | |
| 		if err := pm.loadFromLocalStorage(ctx); err != nil {
 | |
| 			return fmt.Errorf("failed to load from local storage: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	if pm.config.EnableDistributedStorage {
 | |
| 		if err := pm.loadFromDistributedStorage(ctx); err != nil {
 | |
| 			return fmt.Errorf("failed to load from distributed storage: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // SynchronizeGraph synchronizes the temporal graph with distributed storage
 | |
| func (pm *persistenceManagerImpl) SynchronizeGraph(ctx context.Context) (*SyncResult, error) {
 | |
| 	pm.mu.Lock()
 | |
| 	if pm.syncInProgress {
 | |
| 		pm.mu.Unlock()
 | |
| 		return nil, fmt.Errorf("synchronization already in progress")
 | |
| 	}
 | |
| 	pm.syncInProgress = true
 | |
| 	pm.mu.Unlock()
 | |
| 	
 | |
| 	defer func() {
 | |
| 		pm.mu.Lock()
 | |
| 		pm.syncInProgress = false
 | |
| 		pm.lastSyncTime = time.Now()
 | |
| 		pm.mu.Unlock()
 | |
| 	}()
 | |
| 	
 | |
| 	result := &SyncResult{
 | |
| 		StartTime: time.Now(),
 | |
| 		Errors:    make([]string, 0),
 | |
| 	}
 | |
| 	
 | |
| 	// Create local snapshot
 | |
| 	localSnapshot, err := pm.createGraphSnapshot()
 | |
| 	if err != nil {
 | |
| 		result.Errors = append(result.Errors, fmt.Sprintf("failed to create local snapshot: %v", err))
 | |
| 		result.Success = false
 | |
| 		return result, err
 | |
| 	}
 | |
| 	
 | |
| 	// Get remote snapshot
 | |
| 	remoteSnapshot, err := pm.getRemoteSnapshot(ctx)
 | |
| 	if err != nil {
 | |
| 		// Remote might not exist yet, continue with local
 | |
| 		remoteSnapshot = nil
 | |
| 	}
 | |
| 	
 | |
| 	// Perform synchronization
 | |
| 	if remoteSnapshot != nil {
 | |
| 		err = pm.performBidirectionalSync(ctx, localSnapshot, remoteSnapshot, result)
 | |
| 	} else {
 | |
| 		err = pm.performInitialSync(ctx, localSnapshot, result)
 | |
| 	}
 | |
| 	
 | |
| 	if err != nil {
 | |
| 		result.Errors = append(result.Errors, fmt.Sprintf("sync failed: %v", err))
 | |
| 		result.Success = false
 | |
| 	} else {
 | |
| 		result.Success = true
 | |
| 	}
 | |
| 	
 | |
| 	result.EndTime = time.Now()
 | |
| 	result.Duration = result.EndTime.Sub(result.StartTime)
 | |
| 	
 | |
| 	return result, err
 | |
| }
 | |
| 
 | |
| // BackupGraph creates a backup of the temporal graph
 | |
| func (pm *persistenceManagerImpl) BackupGraph(ctx context.Context) error {
 | |
| 	pm.mu.RLock()
 | |
| 	defer pm.mu.RUnlock()
 | |
| 	
 | |
| 	if !pm.config.EnableAutoBackup {
 | |
| 		return fmt.Errorf("backup not enabled")
 | |
| 	}
 | |
| 	
 | |
| 	// Create graph snapshot
 | |
| 	snapshot, err := pm.createGraphSnapshot()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to create snapshot: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Serialize snapshot
 | |
| 	data, err := json.Marshal(snapshot)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to serialize snapshot: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Create backup configuration
 | |
| 	backupConfig := &storage.BackupConfig{
 | |
| 		Type:        "temporal_graph",
 | |
| 		Description: "Temporal graph backup",
 | |
| 		Tags:        []string{"temporal", "graph", "decision"},
 | |
| 		Metadata: map[string]interface{}{
 | |
| 			"node_count":     snapshot.Metadata.NodeCount,
 | |
| 			"edge_count":     snapshot.Metadata.EdgeCount,
 | |
| 			"decision_count": snapshot.Metadata.DecisionCount,
 | |
| 		},
 | |
| 	}
 | |
| 	
 | |
| 	// Create backup
 | |
| 	_, err = pm.backupManager.CreateBackup(ctx, backupConfig)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // RestoreGraph restores the temporal graph from a backup
 | |
| func (pm *persistenceManagerImpl) RestoreGraph(ctx context.Context, backupID string) error {
 | |
| 	pm.mu.Lock()
 | |
| 	defer pm.mu.Unlock()
 | |
| 	
 | |
| 	// Create restore configuration
 | |
| 	restoreConfig := &storage.RestoreConfig{
 | |
| 		OverwriteExisting: true,
 | |
| 		ValidateIntegrity: true,
 | |
| 	}
 | |
| 	
 | |
| 	// Restore from backup
 | |
| 	err := pm.backupManager.RestoreBackup(ctx, backupID, restoreConfig)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to restore backup: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Reload graph from storage
 | |
| 	return pm.LoadTemporalGraph(ctx)
 | |
| }
 | |
| 
 | |
| // Internal persistence methods
 | |
| 
 | |
| func (pm *persistenceManagerImpl) addToWriteBuffer(node *TemporalNode) error {
 | |
| 	pm.bufferMutex.Lock()
 | |
| 	defer pm.bufferMutex.Unlock()
 | |
| 	
 | |
| 	pm.writeBuffer = append(pm.writeBuffer, node)
 | |
| 	
 | |
| 	// Check if buffer is full
 | |
| 	if len(pm.writeBuffer) >= pm.batchSize {
 | |
| 		return pm.flushWriteBuffer()
 | |
| 	}
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) flushWriteBuffer() error {
 | |
| 	if len(pm.writeBuffer) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 	
 | |
| 	// Create batch store request
 | |
| 	batch := &storage.BatchStoreRequest{
 | |
| 		Operations: make([]*storage.BatchStoreOperation, len(pm.writeBuffer)),
 | |
| 	}
 | |
| 	
 | |
| 	for i, node := range pm.writeBuffer {
 | |
| 		key := pm.generateNodeKey(node)
 | |
| 		
 | |
| 		batch.Operations[i] = &storage.BatchStoreOperation{
 | |
| 			Type: "store",
 | |
| 			Key:  key,
 | |
| 			Data: node,
 | |
| 			Roles: pm.config.EncryptionRoles,
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	// Execute batch store
 | |
| 	ctx := context.Background()
 | |
| 	_, err := pm.contextStore.BatchStore(ctx, batch)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to flush write buffer: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Clear buffer
 | |
| 	pm.writeBuffer = pm.writeBuffer[:0]
 | |
| 	pm.lastFlush = time.Now()
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) persistNodeDirect(ctx context.Context, node *TemporalNode) error {
 | |
| 	key := pm.generateNodeKey(node)
 | |
| 	
 | |
| 	// Store in different layers
 | |
| 	if pm.config.EnableLocalStorage {
 | |
| 		if err := pm.localStorage.Store(ctx, key, node, nil); err != nil {
 | |
| 			return fmt.Errorf("failed to store in local storage: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	if pm.config.EnableDistributedStorage {
 | |
| 		if err := pm.distributedStore.Store(ctx, key, node, nil); err != nil {
 | |
| 			return fmt.Errorf("failed to store in distributed storage: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	if pm.config.EnableEncryption {
 | |
| 		if err := pm.encryptedStore.StoreEncrypted(ctx, key, node, pm.config.EncryptionRoles); err != nil {
 | |
| 			return fmt.Errorf("failed to store encrypted: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	// Add to pending changes for synchronization
 | |
| 	change := &PendingChange{
 | |
| 		ID:        fmt.Sprintf("%s-%d", node.ID, time.Now().UnixNano()),
 | |
| 		Type:      ChangeTypeNodeCreated,
 | |
| 		NodeID:    node.ID,
 | |
| 		Data:      node,
 | |
| 		Timestamp: time.Now(),
 | |
| 		Metadata:  make(map[string]interface{}),
 | |
| 	}
 | |
| 	
 | |
| 	pm.pendingChanges[change.ID] = change
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) loadFromLocalStorage(ctx context.Context) error {
 | |
| 	// Load graph metadata
 | |
| 	metadataKey := pm.generateMetadataKey()
 | |
| 	metadataData, err := pm.localStorage.Retrieve(ctx, metadataKey)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to load metadata: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	var metadata *GraphMetadata
 | |
| 	if err := json.Unmarshal(metadataData.([]byte), &metadata); err != nil {
 | |
| 		return fmt.Errorf("failed to unmarshal metadata: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Load all nodes
 | |
| 	pattern := pm.generateNodeKeyPattern()
 | |
| 	nodeKeys, err := pm.localStorage.List(ctx, pattern)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to list nodes: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Load nodes in batches
 | |
| 	batchReq := &storage.BatchRetrieveRequest{
 | |
| 		Keys: nodeKeys,
 | |
| 	}
 | |
| 	
 | |
| 	batchResult, err := pm.contextStore.BatchRetrieve(ctx, batchReq)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to batch retrieve nodes: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Reconstruct graph
 | |
| 	pm.graph.mu.Lock()
 | |
| 	defer pm.graph.mu.Unlock()
 | |
| 	
 | |
| 	pm.graph.nodes = make(map[string]*TemporalNode)
 | |
| 	pm.graph.addressToNodes = make(map[string][]*TemporalNode)
 | |
| 	pm.graph.influences = make(map[string][]string)
 | |
| 	pm.graph.influencedBy = make(map[string][]string)
 | |
| 	
 | |
| 	for key, result := range batchResult.Results {
 | |
| 		if result.Error != nil {
 | |
| 			continue // Skip failed retrievals
 | |
| 		}
 | |
| 		
 | |
| 		var node *TemporalNode
 | |
| 		if err := json.Unmarshal(result.Data.([]byte), &node); err != nil {
 | |
| 			continue // Skip invalid nodes
 | |
| 		}
 | |
| 		
 | |
| 		pm.reconstructGraphNode(node)
 | |
| 	}
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) loadFromDistributedStorage(ctx context.Context) error {
 | |
| 	// Similar to local storage but using distributed store
 | |
| 	// Implementation would be similar to loadFromLocalStorage
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) createGraphSnapshot() (*GraphSnapshot, error) {
 | |
| 	pm.graph.mu.RLock()
 | |
| 	defer pm.graph.mu.RUnlock()
 | |
| 	
 | |
| 	snapshot := &GraphSnapshot{
 | |
| 		Timestamp:    time.Now(),
 | |
| 		Nodes:        make(map[string]*TemporalNode),
 | |
| 		Influences:   make(map[string][]string),
 | |
| 		InfluencedBy: make(map[string][]string),
 | |
| 		Decisions:    make(map[string]*DecisionMetadata),
 | |
| 		Metadata: &GraphMetadata{
 | |
| 			Version:       1,
 | |
| 			LastModified:  time.Now(),
 | |
| 			NodeCount:     len(pm.graph.nodes),
 | |
| 			EdgeCount:     pm.calculateEdgeCount(),
 | |
| 			DecisionCount: len(pm.graph.decisions),
 | |
| 			CreatedBy:     "temporal_graph",
 | |
| 			CreatedAt:     time.Now(),
 | |
| 		},
 | |
| 	}
 | |
| 	
 | |
| 	// Copy nodes
 | |
| 	for id, node := range pm.graph.nodes {
 | |
| 		snapshot.Nodes[id] = node
 | |
| 	}
 | |
| 	
 | |
| 	// Copy influences
 | |
| 	for id, influences := range pm.graph.influences {
 | |
| 		snapshot.Influences[id] = make([]string, len(influences))
 | |
| 		copy(snapshot.Influences[id], influences)
 | |
| 	}
 | |
| 	
 | |
| 	// Copy influenced by
 | |
| 	for id, influencedBy := range pm.graph.influencedBy {
 | |
| 		snapshot.InfluencedBy[id] = make([]string, len(influencedBy))
 | |
| 		copy(snapshot.InfluencedBy[id], influencedBy)
 | |
| 	}
 | |
| 	
 | |
| 	// Copy decisions
 | |
| 	for id, decision := range pm.graph.decisions {
 | |
| 		snapshot.Decisions[id] = decision
 | |
| 	}
 | |
| 	
 | |
| 	// Calculate checksum
 | |
| 	snapshot.Checksum = pm.calculateSnapshotChecksum(snapshot)
 | |
| 	
 | |
| 	return snapshot, nil
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) getRemoteSnapshot(ctx context.Context) (*GraphSnapshot, error) {
 | |
| 	key := pm.generateGraphKey()
 | |
| 	
 | |
| 	data, err := pm.distributedStore.Retrieve(ctx, key)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	
 | |
| 	var snapshot *GraphSnapshot
 | |
| 	if err := json.Unmarshal(data.([]byte), &snapshot); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to unmarshal remote snapshot: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	return snapshot, nil
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) performBidirectionalSync(ctx context.Context, local, remote *GraphSnapshot, result *SyncResult) error {
 | |
| 	// Compare snapshots and identify differences
 | |
| 	conflicts := pm.identifyConflicts(local, remote)
 | |
| 	result.ConflictsFound = len(conflicts)
 | |
| 	
 | |
| 	// Resolve conflicts
 | |
| 	for _, conflict := range conflicts {
 | |
| 		resolved, err := pm.resolveConflict(ctx, conflict)
 | |
| 		if err != nil {
 | |
| 			result.Errors = append(result.Errors, fmt.Sprintf("failed to resolve conflict %s: %v", conflict.NodeID, err))
 | |
| 			continue
 | |
| 		}
 | |
| 		
 | |
| 		// Apply resolution
 | |
| 		if err := pm.applyConflictResolution(ctx, resolved); err != nil {
 | |
| 			result.Errors = append(result.Errors, fmt.Sprintf("failed to apply resolution for %s: %v", conflict.NodeID, err))
 | |
| 			continue
 | |
| 		}
 | |
| 		
 | |
| 		result.ConflictsResolved++
 | |
| 	}
 | |
| 	
 | |
| 	// Sync local changes to remote
 | |
| 	err := pm.syncLocalToRemote(ctx, local, remote, result)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to sync local to remote: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Sync remote changes to local
 | |
| 	err = pm.syncRemoteToLocal(ctx, remote, local, result)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to sync remote to local: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) performInitialSync(ctx context.Context, local *GraphSnapshot, result *SyncResult) error {
 | |
| 	// Store entire local snapshot to remote
 | |
| 	key := pm.generateGraphKey()
 | |
| 	
 | |
| 	data, err := json.Marshal(local)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to marshal snapshot: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	err = pm.distributedStore.Store(ctx, key, data, nil)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to store snapshot: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	result.NodesProcessed = len(local.Nodes)
 | |
| 	result.NodesCreated = len(local.Nodes)
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Background workers
 | |
| 
 | |
| func (pm *persistenceManagerImpl) syncWorker() {
 | |
| 	ticker := time.NewTicker(pm.config.SyncInterval)
 | |
| 	defer ticker.Stop()
 | |
| 	
 | |
| 	for range ticker.C {
 | |
| 		ctx := context.Background()
 | |
| 		if _, err := pm.SynchronizeGraph(ctx); err != nil {
 | |
| 			// Log error but continue
 | |
| 			fmt.Printf("sync worker error: %v\n", err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) flushWorker() {
 | |
| 	ticker := time.NewTicker(pm.flushInterval)
 | |
| 	defer ticker.Stop()
 | |
| 	
 | |
| 	for range ticker.C {
 | |
| 		pm.bufferMutex.Lock()
 | |
| 		if time.Since(pm.lastFlush) >= pm.flushInterval && len(pm.writeBuffer) > 0 {
 | |
| 			if err := pm.flushWriteBuffer(); err != nil {
 | |
| 				fmt.Printf("flush worker error: %v\n", err)
 | |
| 			}
 | |
| 		}
 | |
| 		pm.bufferMutex.Unlock()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) backupWorker() {
 | |
| 	ticker := time.NewTicker(pm.config.BackupInterval)
 | |
| 	defer ticker.Stop()
 | |
| 	
 | |
| 	for range ticker.C {
 | |
| 		ctx := context.Background()
 | |
| 		if err := pm.BackupGraph(ctx); err != nil {
 | |
| 			fmt.Printf("backup worker error: %v\n", err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Helper methods
 | |
| 
 | |
| func (pm *persistenceManagerImpl) generateNodeKey(node *TemporalNode) string {
 | |
| 	return fmt.Sprintf("%s/nodes/%s", pm.config.KeyPrefix, node.ID)
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) generateGraphKey() string {
 | |
| 	return fmt.Sprintf("%s/graph/snapshot", pm.config.KeyPrefix)
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) generateMetadataKey() string {
 | |
| 	return fmt.Sprintf("%s/graph/metadata", pm.config.KeyPrefix)
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) generateNodeKeyPattern() string {
 | |
| 	return fmt.Sprintf("%s/nodes/*", pm.config.KeyPrefix)
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) calculateEdgeCount() int {
 | |
| 	count := 0
 | |
| 	for _, influences := range pm.graph.influences {
 | |
| 		count += len(influences)
 | |
| 	}
 | |
| 	return count
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) calculateSnapshotChecksum(snapshot *GraphSnapshot) string {
 | |
| 	// Calculate checksum based on snapshot content
 | |
| 	data, _ := json.Marshal(snapshot.Nodes)
 | |
| 	return fmt.Sprintf("%x", data)[:16] // Simplified checksum
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) reconstructGraphNode(node *TemporalNode) {
 | |
| 	// Add node to graph
 | |
| 	pm.graph.nodes[node.ID] = node
 | |
| 	
 | |
| 	// Update address mapping
 | |
| 	addressKey := node.UCXLAddress.String()
 | |
| 	if existing, exists := pm.graph.addressToNodes[addressKey]; exists {
 | |
| 		pm.graph.addressToNodes[addressKey] = append(existing, node)
 | |
| 	} else {
 | |
| 		pm.graph.addressToNodes[addressKey] = []*TemporalNode{node}
 | |
| 	}
 | |
| 	
 | |
| 	// Reconstruct influence relationships
 | |
| 	pm.graph.influences[node.ID] = make([]string, 0)
 | |
| 	pm.graph.influencedBy[node.ID] = make([]string, 0)
 | |
| 	
 | |
| 	// These would be rebuilt from the influence data in the snapshot
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) identifyConflicts(local, remote *GraphSnapshot) []*SyncConflict {
 | |
| 	conflicts := make([]*SyncConflict, 0)
 | |
| 	
 | |
| 	// Compare nodes
 | |
| 	for nodeID, localNode := range local.Nodes {
 | |
| 		if remoteNode, exists := remote.Nodes[nodeID]; exists {
 | |
| 			if pm.hasNodeConflict(localNode, remoteNode) {
 | |
| 				conflict := &SyncConflict{
 | |
| 					Type:       ConflictTypeNodeMismatch,
 | |
| 					NodeID:     nodeID,
 | |
| 					LocalData:  localNode,
 | |
| 					RemoteData: remoteNode,
 | |
| 				}
 | |
| 				conflicts = append(conflicts, conflict)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	return conflicts
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) hasNodeConflict(local, remote *TemporalNode) bool {
 | |
| 	// Simple conflict detection based on timestamp and hash
 | |
| 	return local.Timestamp != remote.Timestamp || local.ContextHash != remote.ContextHash
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) resolveConflict(ctx context.Context, conflict *SyncConflict) (*ConflictResolution, error) {
 | |
| 	// Use conflict resolver to resolve the conflict
 | |
| 	localNode := conflict.LocalData.(*TemporalNode)
 | |
| 	remoteNode := conflict.RemoteData.(*TemporalNode)
 | |
| 	
 | |
| 	resolvedNode, err := pm.conflictResolver.ResolveConflict(ctx, localNode, remoteNode)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	
 | |
| 	return &ConflictResolution{
 | |
| 		ConflictID:    conflict.NodeID,
 | |
| 		Resolution:    "merged",
 | |
| 		ResolvedData:  resolvedNode,
 | |
| 		ResolvedAt:    time.Now(),
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) applyConflictResolution(ctx context.Context, resolution *ConflictResolution) error {
 | |
| 	// Apply the resolved node back to the graph
 | |
| 	resolvedNode := resolution.ResolvedData.(*TemporalNode)
 | |
| 	
 | |
| 	pm.graph.mu.Lock()
 | |
| 	pm.graph.nodes[resolvedNode.ID] = resolvedNode
 | |
| 	pm.graph.mu.Unlock()
 | |
| 	
 | |
| 	// Persist the resolved node
 | |
| 	return pm.persistNodeDirect(ctx, resolvedNode)
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) syncLocalToRemote(ctx context.Context, local, remote *GraphSnapshot, result *SyncResult) error {
 | |
| 	// Sync nodes that exist locally but not remotely, or are newer locally
 | |
| 	for nodeID, localNode := range local.Nodes {
 | |
| 		shouldSync := false
 | |
| 		
 | |
| 		if remoteNode, exists := remote.Nodes[nodeID]; exists {
 | |
| 			// Check if local is newer
 | |
| 			if localNode.Timestamp.After(remoteNode.Timestamp) {
 | |
| 				shouldSync = true
 | |
| 			}
 | |
| 		} else {
 | |
| 			// Node doesn't exist remotely
 | |
| 			shouldSync = true
 | |
| 			result.NodesCreated++
 | |
| 		}
 | |
| 		
 | |
| 		if shouldSync {
 | |
| 			key := pm.generateNodeKey(localNode)
 | |
| 			data, err := json.Marshal(localNode)
 | |
| 			if err != nil {
 | |
| 				result.Errors = append(result.Errors, fmt.Sprintf("failed to marshal node %s: %v", nodeID, err))
 | |
| 				continue
 | |
| 			}
 | |
| 			
 | |
| 			err = pm.distributedStore.Store(ctx, key, data, nil)
 | |
| 			if err != nil {
 | |
| 				result.Errors = append(result.Errors, fmt.Sprintf("failed to sync node %s to remote: %v", nodeID, err))
 | |
| 				continue
 | |
| 			}
 | |
| 			
 | |
| 			if remoteNode, exists := remote.Nodes[nodeID]; exists && localNode.Timestamp.After(remoteNode.Timestamp) {
 | |
| 				result.NodesUpdated++
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (pm *persistenceManagerImpl) syncRemoteToLocal(ctx context.Context, remote, local *GraphSnapshot, result *SyncResult) error {
 | |
| 	// Sync nodes that exist remotely but not locally, or are newer remotely
 | |
| 	for nodeID, remoteNode := range remote.Nodes {
 | |
| 		shouldSync := false
 | |
| 		
 | |
| 		if localNode, exists := local.Nodes[nodeID]; exists {
 | |
| 			// Check if remote is newer
 | |
| 			if remoteNode.Timestamp.After(localNode.Timestamp) {
 | |
| 				shouldSync = true
 | |
| 			}
 | |
| 		} else {
 | |
| 			// Node doesn't exist locally
 | |
| 			shouldSync = true
 | |
| 			result.NodesCreated++
 | |
| 		}
 | |
| 		
 | |
| 		if shouldSync {
 | |
| 			// Add to local graph
 | |
| 			pm.graph.mu.Lock()
 | |
| 			pm.graph.nodes[remoteNode.ID] = remoteNode
 | |
| 			pm.reconstructGraphNode(remoteNode)
 | |
| 			pm.graph.mu.Unlock()
 | |
| 			
 | |
| 			// Persist locally
 | |
| 			err := pm.persistNodeDirect(ctx, remoteNode)
 | |
| 			if err != nil {
 | |
| 				result.Errors = append(result.Errors, fmt.Sprintf("failed to sync node %s to local: %v", nodeID, err))
 | |
| 				continue
 | |
| 			}
 | |
| 			
 | |
| 			if localNode, exists := local.Nodes[nodeID]; exists && remoteNode.Timestamp.After(localNode.Timestamp) {
 | |
| 				result.NodesUpdated++
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Supporting types for conflict resolution
 | |
| 
 | |
| type SyncConflict struct {
 | |
| 	Type       ConflictType    `json:"type"`
 | |
| 	NodeID     string          `json:"node_id"`
 | |
| 	LocalData  interface{}     `json:"local_data"`
 | |
| 	RemoteData interface{}     `json:"remote_data"`
 | |
| 	Severity   string          `json:"severity"`
 | |
| }
 | |
| 
 | |
| type ConflictType string
 | |
| 
 | |
| const (
 | |
| 	ConflictTypeNodeMismatch     ConflictType = "node_mismatch"
 | |
| 	ConflictTypeInfluenceMismatch ConflictType = "influence_mismatch"
 | |
| 	ConflictTypeMetadataMismatch  ConflictType = "metadata_mismatch"
 | |
| )
 | |
| 
 | |
| type ConflictResolution struct {
 | |
| 	ConflictID   string      `json:"conflict_id"`
 | |
| 	Resolution   string      `json:"resolution"`
 | |
| 	ResolvedData interface{} `json:"resolved_data"`
 | |
| 	ResolvedAt   time.Time   `json:"resolved_at"`
 | |
| 	ResolvedBy   string      `json:"resolved_by"`
 | |
| }
 | |
| 
 | |
| // Default conflict resolver implementation
 | |
| 
 | |
| type defaultConflictResolver struct{}
 | |
| 
 | |
| func NewDefaultConflictResolver() ConflictResolver {
 | |
| 	return &defaultConflictResolver{}
 | |
| }
 | |
| 
 | |
| func (dcr *defaultConflictResolver) ResolveConflict(ctx context.Context, local, remote *TemporalNode) (*TemporalNode, error) {
 | |
| 	// Default strategy: choose the one with higher confidence, or more recent if equal
 | |
| 	if local.Confidence > remote.Confidence {
 | |
| 		return local, nil
 | |
| 	} else if remote.Confidence > local.Confidence {
 | |
| 		return remote, nil
 | |
| 	} else {
 | |
| 		// Equal confidence, choose more recent
 | |
| 		if local.Timestamp.After(remote.Timestamp) {
 | |
| 			return local, nil
 | |
| 		}
 | |
| 		return remote, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (dcr *defaultConflictResolver) ResolveGraphConflict(ctx context.Context, localGraph, remoteGraph *GraphSnapshot) (*GraphSnapshot, error) {
 | |
| 	// Default strategy: merge graphs, preferring more recent data
 | |
| 	if localGraph.Timestamp.After(remoteGraph.Timestamp) {
 | |
| 		return localGraph, nil
 | |
| 	}
 | |
| 	return remoteGraph, nil
 | |
| } |