From 0b670a535d969316d00ea6ec9449f75ad6d72d8f Mon Sep 17 00:00:00 2001
From: anthonyrawlins
Date: Sat, 27 Sep 2025 15:26:25 +1000
Subject: [PATCH] Wire SLURP persistence and add restart coverage

---
 docker/docker-compose.yml             |    9 +-
 docs/progress/report-SEC-SLURP-1.1.md |   14 +
 pkg/slurp/leader/manager.go           |  274 ++++---
 pkg/slurp/slurp.go                    | 1064 +++++++++++++++++++------
 pkg/slurp/slurp_persistence_test.go   |   69 ++
 5 files changed, 1061 insertions(+), 369 deletions(-)
 create mode 100644 docs/progress/report-SEC-SLURP-1.1.md
 create mode 100644 pkg/slurp/slurp_persistence_test.go

diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 134deec..8ad086d 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -145,7 +145,7 @@ services:
       start_period: 10s
 
   whoosh:
-    image: anthonyrawlins/whoosh:scaling-v1.0.0
+    image: anthonyrawlins/whoosh:latest
     ports:
       - target: 8080
         published: 8800
@@ -200,6 +200,9 @@ services:
       WHOOSH_BACKBEAT_AGENT_ID: "whoosh"
       WHOOSH_BACKBEAT_NATS_URL: "nats://backbeat-nats:4222"
 
+      # Docker integration configuration (disabled for agent assignment architecture)
+      WHOOSH_DOCKER_ENABLED: "false"
+
     secrets:
       - whoosh_db_password
       - gitea_token
@@ -207,8 +210,8 @@ services:
       - jwt_secret
       - service_tokens
       - redis_password
-    volumes:
-      - /var/run/docker.sock:/var/run/docker.sock
+    # volumes:
+    #   - /var/run/docker.sock:/var/run/docker.sock  # Disabled for agent assignment architecture
     deploy:
       replicas: 2
       restart_policy:
diff --git a/docs/progress/report-SEC-SLURP-1.1.md b/docs/progress/report-SEC-SLURP-1.1.md
new file mode 100644
index 0000000..fd43688
--- /dev/null
+++ b/docs/progress/report-SEC-SLURP-1.1.md
@@ -0,0 +1,14 @@
+# SEC-SLURP 1.1 Persistence Wiring Report
+
+## Summary of Changes
+- Added LevelDB-backed persistence scaffolding in `pkg/slurp/slurp.go`, capturing the storage path, the local storage handle, and the roadmap-tagged metrics helpers required for SEC-SLURP 1.1.
+- Upgraded SLURP's lifecycle so initialization bootstraps cached context data from disk, cache misses hydrate from persistence, successful `UpsertContext` calls write back to LevelDB, and shutdown closes the store with error telemetry.
+- Introduced `pkg/slurp/slurp_persistence_test.go` to confirm that contexts survive process restarts and can be resolved after the in-memory caches are cleared.
+- Instrumented cache and persistence metrics so hit/miss ratios and storage failures are tracked for observability.
+- Attempted `GOWORK=off go test ./pkg/slurp`; compilation was blocked by legacy references to `config.Authority*` symbols in `pkg/slurp/context`, so the new test has not yet run.
+
+## Recommended Next Steps
+- Resolve the `config.Authority*` symbol drift (or scope down the affected packages) so the SLURP test suite compiles cleanly, then rerun `GOWORK=off go test ./pkg/slurp` to validate the persistence changes.
+- Feed the durable store into the resolver and temporal graph implementations to finish the remaining Phase 1 SLURP roadmap items.
+- Expand Prometheus metrics and logging to cover cache hit/miss ratios and persistence errors, per the SEC-SLURP observability goals.
+- Review unrelated changes on `feature/phase-4-real-providers` (e.g., the docker-compose edits) and either align them with this roadmap work or revert them to keep the branch focused.
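+
+## Restart Coverage Sketch
+
+For reviewers, a condensed sketch of the round-trip the new test exercises. The
+`newTestSLURP*` and `newTestContextNode` helpers are illustrative stand-ins for
+the real test fixtures, which are not reproduced here:
+
+```go
+func TestContextSurvivesRestart(t *testing.T) {
+	ctx := context.Background()
+
+	s1 := newTestSLURP(t)         // hypothetical helper: initialized SLURP with a temp storagePath
+	node := newTestContextNode(t) // hypothetical helper: valid *slurpContext.ContextNode
+	if _, err := s1.UpsertContext(ctx, node); err != nil {
+		t.Fatalf("upsert: %v", err)
+	}
+	if err := s1.Close(); err != nil { // flushes and closes the LevelDB handle
+		t.Fatalf("close: %v", err)
+	}
+
+	// Reopen against the same path to simulate a process restart.
+	s2 := newTestSLURPAt(t, s1.storagePath)
+	resolved, err := s2.Resolve(ctx, node.UCXLAddress.String())
+	if err != nil {
+		t.Fatalf("resolve after restart: %v", err)
+	}
+	if resolved.Summary != node.Summary {
+		t.Fatalf("summary mismatch: %q", resolved.Summary)
+	}
+}
+```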
diff --git a/pkg/slurp/leader/manager.go b/pkg/slurp/leader/manager.go
index c86f706..d1ce6db 100644
--- a/pkg/slurp/leader/manager.go
+++ b/pkg/slurp/leader/manager.go
@@ -8,12 +8,11 @@ import (
 	"sync"
 	"time"
 
-	"chorus/pkg/election"
 	"chorus/pkg/dht"
-	"chorus/pkg/ucxl"
+	"chorus/pkg/election"
+	slurpContext "chorus/pkg/slurp/context"
 	"chorus/pkg/slurp/intelligence"
 	"chorus/pkg/slurp/storage"
-	slurpContext "chorus/pkg/slurp/context"
 )
 
 // ContextManager handles leader-only context generation duties
@@ -25,34 +24,34 @@ type ContextManager interface {
 	// RequestContextGeneration queues a context generation request
 	// Only the leader processes these requests to prevent conflicts
 	RequestContextGeneration(req *ContextGenerationRequest) error
-	
+
 	// RequestFromLeader allows non-leader nodes to request context from leader
 	RequestFromLeader(req *ContextGenerationRequest) (*ContextGenerationResult, error)
-	
+
 	// GetGenerationStatus returns status of context generation operations
 	GetGenerationStatus() (*GenerationStatus, error)
-	
+
 	// GetQueueStatus returns status of the generation queue
 	GetQueueStatus() (*QueueStatus, error)
-	
+
 	// CancelGeneration cancels pending or active generation task
 	CancelGeneration(taskID string) error
-	
+
 	// PrioritizeGeneration changes priority of queued generation task
 	PrioritizeGeneration(taskID string, priority Priority) error
-	
+
 	// IsLeader returns whether this node is the current leader
 	IsLeader() bool
-	
+
 	// WaitForLeadership blocks until this node becomes leader
 	WaitForLeadership(ctx context.Context) error
-	
+
 	// GetLeaderInfo returns information about current leader
 	GetLeaderInfo() (*LeaderInfo, error)
-	
+
 	// TransferLeadership initiates graceful leadership transfer
 	TransferLeadership(ctx context.Context, targetNodeID string) error
-	
+
 	// GetManagerStats returns manager performance statistics
 	GetManagerStats() (*ManagerStatistics, error)
 }
@@ -64,25 +63,25 @@ type GenerationCoordinator interface {
 	// CoordinateGeneration coordinates generation of context across cluster
 	CoordinateGeneration(ctx context.Context, req *ContextGenerationRequest) (*CoordinationResult, error)
-	
+
 	// DistributeGeneration distributes generation task to appropriate node
 	DistributeGeneration(ctx context.Context, task *GenerationTask) error
-	
+
 	// CollectGenerationResults collects results from distributed generation
 	CollectGenerationResults(ctx context.Context, taskID string) (*GenerationResults, error)
-	
+
 	// CheckGenerationStatus checks status of distributed generation
 	CheckGenerationStatus(ctx context.Context, taskID string) (*TaskStatus, error)
-	
+
 	// RebalanceLoad rebalances generation load across cluster nodes
 	RebalanceLoad(ctx context.Context) (*RebalanceResult, error)
-	
+
 	// GetClusterCapacity returns current cluster generation capacity
 	GetClusterCapacity() (*ClusterCapacity, error)
-	
+
 	// SetGenerationPolicy configures generation coordination policy
 	SetGenerationPolicy(policy *GenerationPolicy) error
-	
+
 	// GetCoordinationStats returns coordination performance statistics
 	GetCoordinationStats() (*CoordinationStatistics, error)
 }
@@ -95,31 +94,31 @@ type QueueManager interface {
 	// EnqueueRequest adds request to generation queue
 	EnqueueRequest(req *ContextGenerationRequest) error
-	
+
 	// DequeueRequest gets next request from queue
 	DequeueRequest() (*ContextGenerationRequest, error)
-	
+
 	// PeekQueue shows next request without removing it
 	PeekQueue() (*ContextGenerationRequest, error)
-	
+
 	// UpdateRequestPriority changes priority of queued request
 	UpdateRequestPriority(requestID string, priority Priority) error
-	
+
 	// CancelRequest removes request from queue
 	CancelRequest(requestID string) error
-	
+
 	// GetQueueLength returns current queue length
 	GetQueueLength() int
-	
+
 	// GetQueuedRequests returns all queued requests
 	GetQueuedRequests() ([]*ContextGenerationRequest, error)
-	
+
 	// ClearQueue removes all requests from queue
 	ClearQueue() error
-	
+
 	// SetQueuePolicy configures queue management policy
 	SetQueuePolicy(policy *QueuePolicy) error
-	
+
 	// GetQueueStats returns queue performance statistics
 	GetQueueStats() (*QueueStatistics, error)
 }
@@ -131,25 +130,25 @@ type FailoverManager interface {
 	// PrepareFailover prepares current state for potential failover
 	PrepareFailover(ctx context.Context) (*FailoverState, error)
-	
+
 	// ExecuteFailover executes failover to become new leader
 	ExecuteFailover(ctx context.Context, previousState *FailoverState) error
-	
+
 	// TransferState transfers leadership state to another node
 	TransferState(ctx context.Context, targetNodeID string) error
-	
+
 	// ReceiveState receives leadership state from previous leader
 	ReceiveState(ctx context.Context, state *FailoverState) error
-	
+
 	// ValidateState validates received failover state
 	ValidateState(state *FailoverState) (*StateValidation, error)
-	
+
 	// RecoverFromFailover recovers operations after failover
 	RecoverFromFailover(ctx context.Context) (*RecoveryResult, error)
-	
+
 	// GetFailoverHistory returns history of failover events
 	GetFailoverHistory() ([]*FailoverEvent, error)
-	
+
 	// GetFailoverStats returns failover statistics
 	GetFailoverStats() (*FailoverStatistics, error)
 }
@@ -161,25 +160,25 @@ type ClusterCoordinator interface {
 	// SynchronizeCluster synchronizes context state across cluster
 	SynchronizeCluster(ctx context.Context) (*SyncResult, error)
-	
+
 	// GetClusterState returns current cluster state
 	GetClusterState() (*ClusterState, error)
-	
+
 	// GetNodeHealth returns health status of cluster nodes
 	GetNodeHealth() (map[string]*NodeHealth, error)
-	
+
 	// EvictNode removes unresponsive node from cluster operations
 	EvictNode(ctx context.Context, nodeID string) error
-	
+
 	// AddNode adds new node to cluster operations
 	AddNode(ctx context.Context, nodeID string, nodeInfo *NodeInfo) error
-	
+
 	// BroadcastMessage broadcasts message to all cluster nodes
 	BroadcastMessage(ctx context.Context, message *ClusterMessage) error
-	
+
 	// GetClusterMetrics returns cluster performance metrics
 	GetClusterMetrics() (*ClusterMetrics, error)
-	
+
 	// ConfigureCluster configures cluster coordination parameters
 	ConfigureCluster(config *ClusterConfig) error
 }
@@ -191,25 +190,25 @@ type HealthMonitor interface {
 	// CheckHealth performs comprehensive health check
 	CheckHealth(ctx context.Context) (*HealthStatus, error)
-	
+
 	// CheckNodeHealth checks health of specific node
 	CheckNodeHealth(ctx context.Context, nodeID string) (*NodeHealth, error)
-	
+
 	// CheckQueueHealth checks health of generation queue
 	CheckQueueHealth() (*QueueHealth, error)
-	
+
 	// CheckLeaderHealth checks health of leader node
 	CheckLeaderHealth() (*LeaderHealth, error)
-	
+
 	// GetHealthMetrics returns health monitoring metrics
 	GetHealthMetrics() (*HealthMetrics, error)
-	
+
 	// SetHealthPolicy configures health monitoring policy
 	SetHealthPolicy(policy *HealthPolicy) error
-	
+
 	// GetHealthHistory returns history of health events
 	GetHealthHistory(timeRange time.Duration) ([]*HealthEvent, error)
-	
+
 	// SubscribeToHealthEvents subscribes to health event notifications
 	SubscribeToHealthEvents(handler HealthEventHandler) error
 }
@@ -218,19 +217,19 @@ type ResourceManager interface {
 	// AllocateResources allocates resources for context generation
 	AllocateResources(req *ResourceRequest) (*ResourceAllocation, error)
-	
+
 	// ReleaseResources releases allocated resources
 	ReleaseResources(allocationID string) error
-	
+
 	// GetAvailableResources returns currently available resources
 	GetAvailableResources() (*AvailableResources, error)
-	
+
 	// SetResourceLimits configures resource usage limits
 	SetResourceLimits(limits *ResourceLimits) error
-	
+
 	// GetResourceUsage returns current resource usage statistics
 	GetResourceUsage() (*ResourceUsage, error)
-	
+
 	// RebalanceResources rebalances resources across operations
 	RebalanceResources(ctx context.Context) (*ResourceRebalanceResult, error)
 }
@@ -244,12 +243,13 @@ type LeaderContextManager struct {
 	intelligence    intelligence.IntelligenceEngine
 	storage         storage.ContextStore
 	contextResolver slurpContext.ContextResolver
-	
+	contextUpserter slurp.ContextPersister
+
 	// Context generation state
 	generationQueue chan *ContextGenerationRequest
 	activeJobs      map[string]*ContextGenerationJob
 	completedJobs   map[string]*ContextGenerationJob
-	
+
 	// Coordination components
 	coordinator  GenerationCoordinator
 	queueManager QueueManager
@@ -257,16 +257,23 @@ type LeaderContextManager struct {
 	clusterCoord    ClusterCoordinator
 	healthMonitor   HealthMonitor
 	resourceManager ResourceManager
-	
+
 	// Configuration
-	config *ManagerConfig
-	
+	config *ManagerConfig
+
 	// Statistics
-	stats *ManagerStatistics
-	
+	stats *ManagerStatistics
+
 	// Shutdown coordination
-	shutdownChan chan struct{}
-	shutdownOnce sync.Once
+	shutdownChan chan struct{}
+	shutdownOnce sync.Once
+}
+
+// SetContextPersister registers the SLURP persistence hook (Roadmap: SEC-SLURP 1.1).
+func (cm *LeaderContextManager) SetContextPersister(persister slurp.ContextPersister) {
+	cm.mu.Lock()
+	defer cm.mu.Unlock()
+	cm.contextUpserter = persister
 }
 
 // NewContextManager creates a new leader context manager
@@ -279,18 +286,18 @@ func NewContextManager(
 ) *LeaderContextManager {
 	cm := &LeaderContextManager{
 		election:        election,
-		dht:            dht,
-		intelligence:   intelligence,
-		storage:        storage,
+		dht:             dht,
+		intelligence:    intelligence,
+		storage:         storage,
 		contextResolver: resolver,
 		generationQueue: make(chan *ContextGenerationRequest, 1000),
-		activeJobs:     make(map[string]*ContextGenerationJob),
-		completedJobs:  make(map[string]*ContextGenerationJob),
-		shutdownChan:   make(chan struct{}),
-		config:         DefaultManagerConfig(),
-		stats:          &ManagerStatistics{},
+		activeJobs:      make(map[string]*ContextGenerationJob),
+		completedJobs:   make(map[string]*ContextGenerationJob),
+		shutdownChan:    make(chan struct{}),
+		config:          DefaultManagerConfig(),
+		stats:           &ManagerStatistics{},
 	}
-	
+
 	// Initialize coordination components
 	cm.coordinator = NewGenerationCoordinator(cm)
 	cm.queueManager = NewQueueManager(cm)
@@ -298,13 +305,13 @@ func NewContextManager(
 	cm.clusterCoord = NewClusterCoordinator(cm)
 	cm.healthMonitor = NewHealthMonitor(cm)
 	cm.resourceManager = NewResourceManager(cm)
-	
+
 	// Start background processes
 	go cm.watchLeadershipChanges()
 	go cm.processContextGeneration()
 	go cm.monitorHealth()
 	go cm.syncCluster()
-	
+
 	return cm
 }
@@ -313,17 +320,17 @@ func (cm *LeaderContextManager) RequestContextGeneration(req *ContextGenerationR
 	if !cm.IsLeader() {
 		return ErrNotLeader
 	}
-	
+
 	// Validate request
 	if err := cm.validateRequest(req); err != nil {
 		return err
 	}
-	
+
 	// Check for duplicates
 	if cm.isDuplicate(req) {
 		return ErrDuplicateRequest
 	}
-	
+
 	// Enqueue request
 	select {
 	case cm.generationQueue <- req:
@@ -346,7 +353,7 @@ func (cm *LeaderContextManager) IsLeader() bool {
 func (cm *LeaderContextManager) GetGenerationStatus() (*GenerationStatus, error) {
 	cm.mu.RLock()
 	defer cm.mu.RUnlock()
-	
+
 	status := &GenerationStatus{
 		ActiveTasks: len(cm.activeJobs),
 		QueuedTasks: len(cm.generationQueue),
@@ -354,14 +361,14 @@ func (cm *LeaderContextManager) GetGenerationStatus() (*GenerationStatus, error)
 		IsLeader:    cm.isLeader,
 		LastUpdate:  time.Now(),
 	}
-	
+
 	// Calculate estimated completion time
 	if status.ActiveTasks > 0 || status.QueuedTasks > 0 {
 		avgJobTime := cm.calculateAverageJobTime()
 		totalRemaining := time.Duration(status.ActiveTasks+status.QueuedTasks) * avgJobTime
 		status.EstimatedCompletion = time.Now().Add(totalRemaining)
 	}
-	
+
 	return status, nil
 }
@@ -374,12 +381,12 @@ func (cm *LeaderContextManager) watchLeadershipChanges() {
 		default:
 			// Check leadership status
 			newIsLeader := cm.election.IsLeader()
-			
+
 			cm.mu.Lock()
 			oldIsLeader := cm.isLeader
 			cm.isLeader = newIsLeader
 			cm.mu.Unlock()
-			
+
 			// Handle leadership change
 			if oldIsLeader != newIsLeader {
 				if newIsLeader {
@@ -388,7 +395,7 @@ func (cm *LeaderContextManager) watchLeadershipChanges() {
 					cm.onLoseLeadership()
 				}
 			}
-			
+
 			// Sleep before next check
 			time.Sleep(cm.config.LeadershipCheckInterval)
 		}
@@ -420,31 +427,31 @@ func (cm *LeaderContextManager) handleGenerationRequest(req *ContextGenerationRe
 		Status:    JobStatusRunning,
 		StartedAt: time.Now(),
 	}
-	
+
 	cm.mu.Lock()
 	cm.activeJobs[job.ID] = job
 	cm.mu.Unlock()
-	
+
 	defer func() {
 		cm.mu.Lock()
 		delete(cm.activeJobs, job.ID)
 		cm.completedJobs[job.ID] = job
 		cm.mu.Unlock()
-		
+
 		// Clean up old completed jobs
 		cm.cleanupCompletedJobs()
 	}()
-	
+
 	// Generate context using intelligence engine
 	contextNode, err := cm.intelligence.AnalyzeFile(
 		context.Background(),
 		req.FilePath,
 		req.Role,
 	)
-	
+
 	completedAt := time.Now()
 	job.CompletedAt = &completedAt
-	
+
 	if err != nil {
 		job.Status = JobStatusFailed
 		job.Error = err
@@ -453,11 +460,16 @@ func (cm *LeaderContextManager) handleGenerationRequestRe
 		job.Status = JobStatusCompleted
 		job.Result = contextNode
 		cm.stats.CompletedJobs++
-		
-		// Store generated context
-		if err := cm.storage.StoreContext(context.Background(), contextNode, []string{req.Role}); err != nil {
-			// Log storage error but don't fail the job
-			// TODO: Add proper logging
+
+		// Store generated context (SEC-SLURP 1.1 persistence bridge)
+		if cm.contextUpserter != nil {
+			if _, persistErr := cm.contextUpserter.UpsertContext(context.Background(), contextNode); persistErr != nil {
+				// TODO(SEC-SLURP 1.1): surface persistence errors via structured logging/telemetry
+			}
+		} else if cm.storage != nil {
+			if err := cm.storage.StoreContext(context.Background(), contextNode, []string{req.Role}); err != nil {
+				// TODO: Add proper logging when falling back to legacy storage path
+			}
 		}
 	}
 }
@@ -494,21 +506,21 @@ func (cm *LeaderContextManager) calculateAverageJobTime() time.Duration {
 	if len(cm.completedJobs) == 0 {
 		return time.Minute // Default estimate
 	}
-	
+
 	var totalTime time.Duration
 	count := 0
-	
+
 	for _, job := range cm.completedJobs {
 		if job.CompletedAt != nil {
 			totalTime += job.CompletedAt.Sub(job.StartedAt)
 			count++
 		}
 	}
-	
+
 	if count == 0 {
 		return time.Minute
 	}
-	
+
 	return totalTime / time.Duration(count)
 }
@@ -520,10 +532,10 @@ func (cm *LeaderContextManager) calculateAverageWaitTime() time.Duration {
 	if queueLength == 0 {
 		return 0
 	}
-	
+
 	avgJobTime := cm.calculateAverageJobTime()
 	concurrency := cm.config.MaxConcurrentJobs
-	
+
 	// Estimate wait time based on queue position and processing capacity
 	estimatedWait := time.Duration(queueLength/concurrency) * avgJobTime
 	return estimatedWait
@@ -533,22 +545,22 @@ func (cm *LeaderContextManager) GetQueueStatus() (*QueueStatus, error) {
 	cm.mu.RLock()
 	defer cm.mu.RUnlock()
-	
+
 	status := &QueueStatus{
-		QueueLength:    len(cm.generationQueue),
-		MaxQueueSize:   cm.config.QueueSize,
-		QueuedRequests: []*ContextGenerationRequest{},
+		QueueLength:          len(cm.generationQueue),
+		MaxQueueSize:         cm.config.QueueSize,
+		QueuedRequests:       []*ContextGenerationRequest{},
 		PriorityDistribution: make(map[Priority]int),
-		AverageWaitTime: cm.calculateAverageWaitTime(),
+		AverageWaitTime:      cm.calculateAverageWaitTime(),
 	}
-	
+
 	// Get oldest request time if any
 	if len(cm.generationQueue) > 0 {
 		// Peek at queue without draining
 		oldest := time.Now()
 		status.OldestRequest = &oldest
 	}
-	
+
 	return status, nil
 }
@@ -556,21 +568,21 @@ func (cm *LeaderContextManager) CancelGeneration(taskID string) error {
 	cm.mu.Lock()
 	defer cm.mu.Unlock()
-	
+
 	// Check if task is active
 	if job, exists := cm.activeJobs[taskID]; exists {
 		job.Status = JobStatusCancelled
 		job.Error = fmt.Errorf("task cancelled by user")
 		completedAt := time.Now()
 		job.CompletedAt = &completedAt
-		
+
 		delete(cm.activeJobs, taskID)
 		cm.completedJobs[taskID] = job
 		cm.stats.CancelledJobs++
-		
+
 		return nil
 	}
-	
+
 	// TODO: Remove from queue if pending
 	return fmt.Errorf("task %s not found", taskID)
 }
@@ -585,11 +597,11 @@ func (cm *LeaderContextManager) PrioritizeGeneration(taskID string, priority Pri
 func (cm *LeaderContextManager) GetManagerStats() (*ManagerStatistics, error) {
 	cm.mu.RLock()
 	defer cm.mu.RUnlock()
-	
+
 	stats := *cm.stats // Copy current stats
 	stats.AverageJobTime = cm.calculateAverageJobTime()
 	stats.HighestQueueLength = len(cm.generationQueue)
-	
+
 	return &stats, nil
 }
@@ -597,7 +609,7 @@ func (cm *LeaderContextManager) onBecomeLeader() {
 	// Initialize leader-specific state
 	cm.stats.LeadershipChanges++
 	cm.stats.LastBecameLeader = time.Now()
-	
+
 	// Recover any pending state from previous leader
 	if err := cm.failoverManager.RecoverFromFailover(context.Background()); err != nil {
 		// Log error but continue - we're the leader now
@@ -611,7 +623,7 @@ func (cm *LeaderContextManager) onLoseLeadership() {
 		// TODO: Send state to new leader
 		_ = state
 	}
-	
+
 	cm.stats.LastLostLeadership = time.Now()
 }
@@ -623,7 +635,7 @@ func (cm *LeaderContextManager) handleNonLeaderRequest(req *ContextGenerationReq
 func (cm *LeaderContextManager) monitorHealth() {
 	ticker := time.NewTicker(cm.config.HealthCheckInterval)
 	defer ticker.Stop()
-	
+
 	for {
 		select {
 		case <-ticker.C:
@@ -640,7 +652,7 @@ func (cm *LeaderContextManager) syncCluster() {
 	ticker := time.NewTicker(cm.config.ClusterSyncInterval)
 	defer ticker.Stop()
-	
+
 	for {
 		select {
 		case <-ticker.C:
@@ -659,18 +671,18 @@ func (cm *LeaderContextManager) cleanupCompletedJobs() {
 	cm.mu.Lock()
 	defer cm.mu.Unlock()
-	
+
 	if len(cm.completedJobs) <= cm.config.MaxCompletedJobs {
 		return
 	}
-	
+
 	// Remove oldest completed jobs based on completion time
 	type jobWithTime struct {
 		id   string
 		job  *ContextGenerationJob
 		time time.Time
 	}
-	
+
 	var jobs []jobWithTime
 	for id, job := range cm.completedJobs {
 		completedAt := time.Now()
@@ -679,12 +691,12 @@ func (cm *LeaderContextManager) cleanupCompletedJobs() {
 		}
 		jobs = append(jobs, jobWithTime{id: id, job: job, time: completedAt})
 	}
-	
+
 	// Sort by completion time (oldest first)
 	sort.Slice(jobs, func(i, j int) bool {
 		return jobs[i].time.Before(jobs[j].time)
 	})
-	
+
 	// Remove oldest jobs to get back to limit
 	toRemove := len(jobs) - cm.config.MaxCompletedJobs
 	for i := 0; i < toRemove; i++ {
@@ -701,13 +713,13 @@ func generateJobID() string {
 
 // Error definitions
 var (
-	ErrNotLeader        = &LeaderError{Code: "NOT_LEADER", Message: "Node is not the leader"}
-	ErrQueueFull        = &LeaderError{Code: "QUEUE_FULL", Message: "Generation queue is full"}
-	ErrDuplicateRequest = &LeaderError{Code: "DUPLICATE_REQUEST", Message: "Duplicate generation request"}
-	ErrInvalidRequest   = &LeaderError{Code: "INVALID_REQUEST", Message: "Invalid generation request"}
-	ErrMissingUCXLAddress = &LeaderError{Code: "MISSING_UCXL_ADDRESS", Message: "Missing UCXL address"}
-	ErrMissingFilePath  = &LeaderError{Code: "MISSING_FILE_PATH", Message: "Missing file path"}
-	ErrMissingRole      = &LeaderError{Code: "MISSING_ROLE", Message: "Missing role"}
+	ErrNotLeader          = &LeaderError{Code: "NOT_LEADER", Message: "Node is not the leader"}
+	ErrQueueFull          = &LeaderError{Code: "QUEUE_FULL", Message: "Generation queue is full"}
+	ErrDuplicateRequest   = &LeaderError{Code: "DUPLICATE_REQUEST", Message: "Duplicate generation request"}
+	ErrInvalidRequest     = &LeaderError{Code: "INVALID_REQUEST", Message: "Invalid generation request"}
+	ErrMissingUCXLAddress = &LeaderError{Code: "MISSING_UCXL_ADDRESS", Message: "Missing UCXL address"}
+	ErrMissingFilePath    = &LeaderError{Code: "MISSING_FILE_PATH", Message: "Missing file path"}
+	ErrMissingRole        = &LeaderError{Code: "MISSING_ROLE", Message: "Missing role"}
 )
 
 // LeaderError represents errors specific to leader operations
@@ -731,4 +743,4 @@ func DefaultManagerConfig() *ManagerConfig {
 		MaxConcurrentJobs: 10,
 		JobTimeout:        10 * time.Minute,
 	}
-}
\ No newline at end of file
+}
diff --git a/pkg/slurp/slurp.go b/pkg/slurp/slurp.go
index 27eace7..c0b3306 100644
--- a/pkg/slurp/slurp.go
+++ b/pkg/slurp/slurp.go
@@ -27,7 +27,12 @@ package slurp
 
 import (
 	"context"
+	"encoding/json"
+	"errors"
 	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
 	"sync"
 	"time"
 
@@ -35,8 +40,15 @@ import (
 	"chorus/pkg/crypto"
 	"chorus/pkg/dht"
 	"chorus/pkg/election"
+	slurpContext "chorus/pkg/slurp/context"
+	"chorus/pkg/slurp/storage"
+	"chorus/pkg/ucxl"
 )
 
+const contextStoragePrefix = "slurp:context:"
+
+var errContextNotPersisted = errors.New("slurp context not persisted")
+
 // SLURP is the main coordinator for contextual intelligence operations.
 //
 // It orchestrates the interaction between context resolution, temporal analysis,
@@ -51,53 +63,67 @@ type SLURP struct {
 	dht      dht.DHT
 	crypto   *crypto.AgeCrypto
 	election *election.ElectionManager
-	
+
+	// Roadmap: SEC-SLURP 1.1 persistent storage wiring
+	storagePath  string
+	localStorage storage.LocalStorage
+
 	// Core components
-	contextResolver ContextResolver
-	temporalGraph   TemporalGraph
-	storage         DistributedStorage
-	intelligence    ContextGenerator
-	retrieval       QueryEngine
-	
+	contextResolver ContextResolver
+	temporalGraph   TemporalGraph
+	storage         DistributedStorage
+	intelligence    ContextGenerator
+	retrieval       QueryEngine
+
 	// State management
-	mu           sync.RWMutex
-	initialized  bool
-	adminMode    bool
-	currentAdmin string
-	
+	mu           sync.RWMutex
+	initialized  bool
+	adminMode    bool
+	currentAdmin string
+
+	// SEC-SLURP 1.1: lightweight in-memory context persistence
+	contextsMu    sync.RWMutex
+	contextStore  map[string]*slurpContext.ContextNode
+	resolvedCache map[string]*slurpContext.ResolvedContext
+
 	// Background processing
-	ctx             context.Context
-	cancel          context.CancelFunc
-	backgroundTasks sync.WaitGroup
-	
+	ctx             context.Context
+	cancel          context.CancelFunc
+	backgroundTasks sync.WaitGroup
+
 	// Performance monitoring
-	metrics *SLURPMetrics
-	
+	metrics *SLURPMetrics
+
 	// Event handling
-	eventHandlers map[EventType][]EventHandler
-	eventMux      sync.RWMutex
+	eventHandlers map[EventType][]EventHandler
+	eventMux      sync.RWMutex
+}
+
+// ContextPersister exposes the persistence contract used by leader workflows (SEC-SLURP 1.1).
+type ContextPersister interface {
+	UpsertContext(ctx context.Context, node *slurpContext.ContextNode) (*slurpContext.ResolvedContext, error)
 }
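+
+// Example (sketch): the leader manager consumes this contract through the
+// SetContextPersister hook added in pkg/slurp/leader in this patch; argument
+// order for NewContextManager is assumed from its field assignments:
+//
+//	mgr := leader.NewContextManager(election, dhtInstance, intelligenceEngine, contextStore, resolver)
+//	mgr.SetContextPersister(slurpInstance) // *SLURP satisfies ContextPersister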
 
 // SLURPConfig holds SLURP-specific configuration that extends the main CHORUS config
 type SLURPConfig struct {
 	// Enable/disable SLURP system
 	Enabled bool `yaml:"enabled" json:"enabled"`
-	
+
 	// Context resolution settings
 	ContextResolution ContextResolutionConfig `yaml:"context_resolution" json:"context_resolution"`
-	
+
 	// Temporal analysis settings
 	TemporalAnalysis TemporalAnalysisConfig `yaml:"temporal_analysis" json:"temporal_analysis"`
-	
+
 	// Storage configuration
 	Storage SLURPStorageConfig `yaml:"storage" json:"storage"`
-	
+
 	// Intelligence/generation settings
 	Intelligence IntelligenceConfig `yaml:"intelligence" json:"intelligence"`
-	
+
 	// Performance tuning
 	Performance PerformanceConfig `yaml:"performance" json:"performance"`
-	
+
 	// Security settings
 	Security SLURPSecurityConfig `yaml:"security" json:"security"`
 }
@@ -105,69 +131,69 @@ type SLURPConfig struct {
 // ContextResolutionConfig configures hierarchical context resolution
 type ContextResolutionConfig struct {
 	// Bounded traversal settings
-	MaxHierarchyDepth int `yaml:"max_hierarchy_depth" json:"max_hierarchy_depth"`
-	DefaultDepthLimit int `yaml:"default_depth_limit" json:"default_depth_limit"`
-	
+	MaxHierarchyDepth int `yaml:"max_hierarchy_depth" json:"max_hierarchy_depth"`
+	DefaultDepthLimit int `yaml:"default_depth_limit" json:"default_depth_limit"`
+
 	// Caching configuration
-	CacheTTL time.Duration `yaml:"cache_ttl" json:"cache_ttl"`
-	CacheMaxSize int64 `yaml:"cache_max_size" json:"cache_max_size"`
-	CacheMaxEntries int `yaml:"cache_max_entries" json:"cache_max_entries"`
-	
+	CacheTTL        time.Duration `yaml:"cache_ttl" json:"cache_ttl"`
+	CacheMaxSize    int64         `yaml:"cache_max_size" json:"cache_max_size"`
+	CacheMaxEntries int           `yaml:"cache_max_entries" json:"cache_max_entries"`
+
 	// Resolution behavior
-	RequireStrictMatching bool `yaml:"require_strict_matching" json:"require_strict_matching"`
-	AllowPartialResolution bool `yaml:"allow_partial_resolution" json:"allow_partial_resolution"`
-	MinConfidenceThreshold float64 `yaml:"min_confidence_threshold" json:"min_confidence_threshold"`
-	
+	RequireStrictMatching  bool    `yaml:"require_strict_matching" json:"require_strict_matching"`
+	AllowPartialResolution bool    `yaml:"allow_partial_resolution" json:"allow_partial_resolution"`
+	MinConfidenceThreshold float64 `yaml:"min_confidence_threshold" json:"min_confidence_threshold"`
+
 	// Global context settings
-	EnableGlobalContexts bool `yaml:"enable_global_contexts" json:"enable_global_contexts"`
-	GlobalContextTTL time.Duration `yaml:"global_context_ttl" json:"global_context_ttl"`
+	EnableGlobalContexts bool          `yaml:"enable_global_contexts" json:"enable_global_contexts"`
+	GlobalContextTTL     time.Duration `yaml:"global_context_ttl" json:"global_context_ttl"`
 }
 
 // TemporalAnalysisConfig configures decision-based temporal evolution tracking
 type TemporalAnalysisConfig struct {
 	// Decision hop analysis
-	MaxDecisionHops int `yaml:"max_decision_hops" json:"max_decision_hops"`
-	DefaultHopLimit int `yaml:"default_hop_limit" json:"default_hop_limit"`
-	
+	MaxDecisionHops int `yaml:"max_decision_hops" json:"max_decision_hops"`
+	DefaultHopLimit int `yaml:"default_hop_limit" json:"default_hop_limit"`
+
 	// Temporal navigation
-	EnableNavigation bool `yaml:"enable_navigation" json:"enable_navigation"`
-	MaxNavigationHistory int `yaml:"max_navigation_history" json:"max_navigation_history"`
-	
+	EnableNavigation     bool `yaml:"enable_navigation" json:"enable_navigation"`
+	MaxNavigationHistory int  `yaml:"max_navigation_history" json:"max_navigation_history"`
+
 	// Staleness detection
-	StalenessThreshold float64 `yaml:"staleness_threshold" json:"staleness_threshold"`
+	StalenessThreshold     float64       `yaml:"staleness_threshold" json:"staleness_threshold"`
 	StalenessCheckInterval time.Duration `yaml:"staleness_check_interval" json:"staleness_check_interval"`
-	
+
 	// Decision analysis
 	MinDecisionConfidence float64       `yaml:"min_decision_confidence" json:"min_decision_confidence"`
 	MaxDecisionAge        time.Duration `yaml:"max_decision_age" json:"max_decision_age"`
-	
+
 	// Influence propagation
-	EnableInfluencePropagation bool `yaml:"enable_influence_propagation" json:"enable_influence_propagation"`
-	MaxInfluenceDepth int `yaml:"max_influence_depth" json:"max_influence_depth"`
+	EnableInfluencePropagation bool `yaml:"enable_influence_propagation" json:"enable_influence_propagation"`
+	MaxInfluenceDepth          int  `yaml:"max_influence_depth" json:"max_influence_depth"`
 }
 
 // SLURPStorageConfig configures distributed storage behavior
 type SLURPStorageConfig struct {
 	// Storage backend
-	Backend string `yaml:"backend" json:"backend"` // "dht", "hybrid"
-	
+	Backend string `yaml:"backend" json:"backend"` // "dht", "hybrid"
+
 	// Encryption settings
-	DefaultEncryption bool `yaml:"default_encryption" json:"default_encryption"`
-	EncryptionRoles []string `yaml:"encryption_roles" json:"encryption_roles"`
-	
+	DefaultEncryption bool     `yaml:"default_encryption" json:"default_encryption"`
+	EncryptionRoles   []string `yaml:"encryption_roles" json:"encryption_roles"`
+
 	// Persistence
-	LocalCacheEnabled bool `yaml:"local_cache_enabled" json:"local_cache_enabled"`
-	LocalCachePath string `yaml:"local_cache_path" json:"local_cache_path"`
-	LocalCacheMaxSize int64 `yaml:"local_cache_max_size" json:"local_cache_max_size"`
-	
+	LocalCacheEnabled bool   `yaml:"local_cache_enabled" json:"local_cache_enabled"`
+	LocalCachePath    string `yaml:"local_cache_path" json:"local_cache_path"`
+	LocalCacheMaxSize int64  `yaml:"local_cache_max_size" json:"local_cache_max_size"`
+
 	// Synchronization
-	SyncInterval time.Duration `yaml:"sync_interval" json:"sync_interval"`
-	SyncTimeout time.Duration `yaml:"sync_timeout" json:"sync_timeout"`
-	ConflictResolution string `yaml:"conflict_resolution" json:"conflict_resolution"`
-	
+	SyncInterval       time.Duration `yaml:"sync_interval" json:"sync_interval"`
+	SyncTimeout        time.Duration `yaml:"sync_timeout" json:"sync_timeout"`
+	ConflictResolution string        `yaml:"conflict_resolution" json:"conflict_resolution"`
+
 	// Replication
-	ReplicationFactor int `yaml:"replication_factor" json:"replication_factor"`
-	ConsistencyLevel string `yaml:"consistency_level" json:"consistency_level"`
+	ReplicationFactor int    `yaml:"replication_factor" json:"replication_factor"`
+	ConsistencyLevel  string `yaml:"consistency_level" json:"consistency_level"`
 }
 
 // IntelligenceConfig configures context generation and analysis
@@ -176,45 +202,45 @@ type IntelligenceConfig struct {
 	EnableGeneration      bool          `yaml:"enable_generation" json:"enable_generation"`
 	GenerationTimeout     time.Duration `yaml:"generation_timeout" json:"generation_timeout"`
 	GenerationConcurrency int           `yaml:"generation_concurrency" json:"generation_concurrency"`
-	
+
 	// Analysis settings
-	EnableAnalysis bool `yaml:"enable_analysis" json:"enable_analysis"`
-	AnalysisInterval time.Duration `yaml:"analysis_interval" json:"analysis_interval"`
-	
+	EnableAnalysis   bool          `yaml:"enable_analysis" json:"enable_analysis"`
+	AnalysisInterval time.Duration `yaml:"analysis_interval" json:"analysis_interval"`
+
 	// Pattern detection
-	EnablePatternDetection bool `yaml:"enable_pattern_detection" json:"enable_pattern_detection"`
-	PatternMatchThreshold float64 `yaml:"pattern_match_threshold" json:"pattern_match_threshold"`
-	
+	EnablePatternDetection bool    `yaml:"enable_pattern_detection" json:"enable_pattern_detection"`
+	PatternMatchThreshold  float64 `yaml:"pattern_match_threshold" json:"pattern_match_threshold"`
+
 	// Quality metrics
-	QualityThreshold float64 `yaml:"quality_threshold" json:"quality_threshold"`
-	EnableQualityMetrics bool `yaml:"enable_quality_metrics" json:"enable_quality_metrics"`
-	
+	QualityThreshold     float64 `yaml:"quality_threshold" json:"quality_threshold"`
+	EnableQualityMetrics bool    `yaml:"enable_quality_metrics" json:"enable_quality_metrics"`
+
 	// External integrations
-	RAGEndpoint string `yaml:"rag_endpoint" json:"rag_endpoint"`
-	RAGTimeout time.Duration `yaml:"rag_timeout" json:"rag_timeout"`
+	RAGEndpoint string        `yaml:"rag_endpoint" json:"rag_endpoint"`
+	RAGTimeout  time.Duration `yaml:"rag_timeout" json:"rag_timeout"`
 }
 
 // PerformanceConfig configures performance optimization settings
 type PerformanceConfig struct {
 	// Concurrency limits
-	MaxConcurrentResolutions int `yaml:"max_concurrent_resolutions" json:"max_concurrent_resolutions"`
-	MaxConcurrentGenerations int `yaml:"max_concurrent_generations" json:"max_concurrent_generations"`
-	MaxConcurrentQueries int `yaml:"max_concurrent_queries" json:"max_concurrent_queries"`
-	
+	MaxConcurrentResolutions int `yaml:"max_concurrent_resolutions" json:"max_concurrent_resolutions"`
+	MaxConcurrentGenerations int `yaml:"max_concurrent_generations" json:"max_concurrent_generations"`
+	MaxConcurrentQueries     int `yaml:"max_concurrent_queries" json:"max_concurrent_queries"`
+
 	// Timeout settings
-	DefaultRequestTimeout time.Duration `yaml:"default_request_timeout" json:"default_request_timeout"`
-	BackgroundTaskTimeout time.Duration `yaml:"background_task_timeout" json:"background_task_timeout"`
-	HealthCheckTimeout time.Duration `yaml:"health_check_timeout" json:"health_check_timeout"`
-	
+	DefaultRequestTimeout time.Duration `yaml:"default_request_timeout" json:"default_request_timeout"`
+	BackgroundTaskTimeout time.Duration `yaml:"background_task_timeout" json:"background_task_timeout"`
+	HealthCheckTimeout    time.Duration `yaml:"health_check_timeout" json:"health_check_timeout"`
+
 	// Resource limits
-	MaxMemoryUsage int64 `yaml:"max_memory_usage" json:"max_memory_usage"`
-	MaxDiskUsage int64 `yaml:"max_disk_usage" json:"max_disk_usage"`
-	
+	MaxMemoryUsage int64 `yaml:"max_memory_usage" json:"max_memory_usage"`
+	MaxDiskUsage   int64 `yaml:"max_disk_usage" json:"max_disk_usage"`
+
 	// Batch processing
-	DefaultBatchSize int `yaml:"default_batch_size" json:"default_batch_size"`
-	MaxBatchSize int `yaml:"max_batch_size" json:"max_batch_size"`
-	BatchTimeout time.Duration `yaml:"batch_timeout" json:"batch_timeout"`
-	
+	DefaultBatchSize int           `yaml:"default_batch_size" json:"default_batch_size"`
+	MaxBatchSize     int           `yaml:"max_batch_size" json:"max_batch_size"`
+	BatchTimeout     time.Duration `yaml:"batch_timeout" json:"batch_timeout"`
+
 	// Monitoring
 	EnableMetrics             bool          `yaml:"enable_metrics" json:"enable_metrics"`
 	MetricsCollectionInterval time.Duration `yaml:"metrics_collection_interval" json:"metrics_collection_interval"`
@@ -223,82 +249,85 @@ type PerformanceConfig struct {
 // SLURPSecurityConfig configures security-specific settings
 type SLURPSecurityConfig struct {
 	// Access control
-	EnforceRoleBasedAccess bool `yaml:"enforce_role_based_access" json:"enforce_role_based_access"`
-	DefaultAccessRoles []string `yaml:"default_access_roles" json:"default_access_roles"`
-	AdminOnlyOperations []string `yaml:"admin_only_operations" json:"admin_only_operations"`
-	
+	EnforceRoleBasedAccess bool     `yaml:"enforce_role_based_access" json:"enforce_role_based_access"`
+	DefaultAccessRoles     []string `yaml:"default_access_roles" json:"default_access_roles"`
+	AdminOnlyOperations    []string `yaml:"admin_only_operations" json:"admin_only_operations"`
+
 	// Audit and logging
-	EnableAuditLog bool `yaml:"enable_audit_log" json:"enable_audit_log"`
-	AuditLogPath string `yaml:"audit_log_path" json:"audit_log_path"`
-	LogSensitiveOperations bool `yaml:"log_sensitive_operations" json:"log_sensitive_operations"`
-	
+	EnableAuditLog         bool   `yaml:"enable_audit_log" json:"enable_audit_log"`
+	AuditLogPath           string `yaml:"audit_log_path" json:"audit_log_path"`
+	LogSensitiveOperations bool   `yaml:"log_sensitive_operations" json:"log_sensitive_operations"`
+
 	// Encryption
-	RequireEncryption bool `yaml:"require_encryption" json:"require_encryption"`
-	EncryptionAlgorithm string `yaml:"encryption_algorithm" json:"encryption_algorithm"`
-	KeyRotationInterval time.Duration `yaml:"key_rotation_interval" json:"key_rotation_interval"`
-	
+	RequireEncryption   bool          `yaml:"require_encryption" json:"require_encryption"`
+	EncryptionAlgorithm string        `yaml:"encryption_algorithm" json:"encryption_algorithm"`
+	KeyRotationInterval time.Duration `yaml:"key_rotation_interval" json:"key_rotation_interval"`
+
 	// Rate limiting
-	EnableRateLimiting bool `yaml:"enable_rate_limiting" json:"enable_rate_limiting"`
-	DefaultRateLimit int `yaml:"default_rate_limit" json:"default_rate_limit"`
-	BurstLimit int `yaml:"burst_limit" json:"burst_limit"`
+	EnableRateLimiting bool `yaml:"enable_rate_limiting" json:"enable_rate_limiting"`
+	DefaultRateLimit   int  `yaml:"default_rate_limit" json:"default_rate_limit"`
+	BurstLimit         int  `yaml:"burst_limit" json:"burst_limit"`
 }
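+
+// Example configuration (sketch): keys follow the yaml tags declared above;
+// the values shown are illustrative, not shipped defaults:
+//
+//	slurp:
+//	  enabled: true
+//	  context_resolution:
+//	    max_hierarchy_depth: 8
+//	    cache_ttl: 5m
+//	  storage:
+//	    backend: "hybrid"
+//	    local_cache_enabled: true
+//	    local_cache_path: "/var/lib/chorus/slurp"
+//	  temporal_analysis:
+//	    staleness_check_interval: 15m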
 
 // SLURPMetrics holds performance and operational metrics
 type SLURPMetrics struct {
 	// Resolution metrics
-	TotalResolutions int64 `json:"total_resolutions"`
-	SuccessfulResolutions int64 `json:"successful_resolutions"`
-	FailedResolutions int64 `json:"failed_resolutions"`
-	AverageResolutionTime time.Duration `json:"average_resolution_time"`
-	CacheHitRate float64 `json:"cache_hit_rate"`
-	
+	TotalResolutions      int64         `json:"total_resolutions"`
+	SuccessfulResolutions int64         `json:"successful_resolutions"`
+	FailedResolutions     int64         `json:"failed_resolutions"`
+	AverageResolutionTime time.Duration `json:"average_resolution_time"`
+	CacheHitRate          float64       `json:"cache_hit_rate"`
+	CacheHits             int64         `json:"cache_hits"`
+	CacheMisses           int64         `json:"cache_misses"`
+	PersistenceErrors     int64         `json:"persistence_errors"`
+
 	// Temporal metrics
-	TemporalNodes int64 `json:"temporal_nodes"`
-	DecisionPaths int64 `json:"decision_paths"`
-	InfluenceRelationships int64 `json:"influence_relationships"`
-	StaleContexts int64 `json:"stale_contexts"`
-	
+	TemporalNodes          int64 `json:"temporal_nodes"`
+	DecisionPaths          int64 `json:"decision_paths"`
+	InfluenceRelationships int64 `json:"influence_relationships"`
+	StaleContexts          int64 `json:"stale_contexts"`
+
 	// Storage metrics
-	StoredContexts int64 `json:"stored_contexts"`
-	EncryptedContexts int64 `json:"encrypted_contexts"`
-	StorageUtilization float64 `json:"storage_utilization"`
-	SyncOperations int64 `json:"sync_operations"`
-	
+	StoredContexts     int64   `json:"stored_contexts"`
+	EncryptedContexts  int64   `json:"encrypted_contexts"`
+	StorageUtilization float64 `json:"storage_utilization"`
+	SyncOperations     int64   `json:"sync_operations"`
+
 	// Intelligence metrics
-	GenerationRequests int64 `json:"generation_requests"`
-	SuccessfulGenerations int64 `json:"successful_generations"`
-	AnalysisOperations int64 `json:"analysis_operations"`
-	PatternMatches int64 `json:"pattern_matches"`
-	
+	GenerationRequests    int64 `json:"generation_requests"`
+	SuccessfulGenerations int64 `json:"successful_generations"`
+	AnalysisOperations    int64 `json:"analysis_operations"`
+	PatternMatches        int64 `json:"pattern_matches"`
+
 	// Performance metrics
-	ActiveConnections int64 `json:"active_connections"`
-	MemoryUsage int64 `json:"memory_usage"`
-	DiskUsage int64 `json:"disk_usage"`
-	
+	ActiveConnections int64 `json:"active_connections"`
+	MemoryUsage       int64 `json:"memory_usage"`
+	DiskUsage         int64 `json:"disk_usage"`
+
 	// Error metrics
-	AuthenticationErrors int64 `json:"authentication_errors"`
-	AuthorizationErrors int64 `json:"authorization_errors"`
-	TimeoutErrors int64 `json:"timeout_errors"`
-	
+	AuthenticationErrors int64 `json:"authentication_errors"`
+	AuthorizationErrors  int64 `json:"authorization_errors"`
+	TimeoutErrors        int64 `json:"timeout_errors"`
+
 	// Timestamp
-	LastUpdated time.Time `json:"last_updated"`
+	LastUpdated time.Time `json:"last_updated"`
 }
 
 // EventType represents different types of SLURP events
 type EventType string
 
 const (
-	EventContextResolved EventType = "context_resolved"
-	EventContextGenerated EventType = "context_generated"
-	EventContextEvolved EventType = "context_evolved"
-	EventDecisionRecorded EventType = "decision_recorded"
-	EventInfluenceAdded EventType = "influence_added"
-	EventAdminChanged EventType = "admin_changed"
-	EventStalenessDetected EventType = "staleness_detected"
-	EventCacheInvalidated EventType = "cache_invalidated"
-	EventStorageSynced EventType = "storage_synced"
-	EventPatternDetected EventType = "pattern_detected"
-	EventErrorOccurred EventType = "error_occurred"
+	EventContextResolved   EventType = "context_resolved"
+	EventContextGenerated  EventType = "context_generated"
+	EventContextEvolved    EventType = "context_evolved"
+	EventDecisionRecorded  EventType = "decision_recorded"
+	EventInfluenceAdded    EventType = "influence_added"
+	EventAdminChanged      EventType = "admin_changed"
+	EventStalenessDetected EventType = "staleness_detected"
+	EventCacheInvalidated  EventType = "cache_invalidated"
+	EventStorageSynced     EventType = "storage_synced"
+	EventPatternDetected   EventType = "pattern_detected"
+	EventErrorOccurred     EventType = "error_occurred"
 )
 
 // EventHandler defines the interface for handling SLURP events
@@ -340,25 +369,30 @@ func NewSLURP(
 	if electionManager == nil {
 		return nil, fmt.Errorf("election manager is required")
 	}
-	
+
 	// Validate SLURP configuration
 	if err := validateSLURPConfig(&config.Slurp); err != nil {
 		return nil, fmt.Errorf("invalid SLURP configuration: %w", err)
 	}
-	
+
 	ctx, cancel := context.WithCancel(context.Background())
-	
+
+	storagePath := defaultStoragePath(config)
+
 	slurp := &SLURP{
 		config:        config,
-		dht:           dhtInstance,
-		crypto:        cryptoInstance,
-		election:      electionManager,
-		ctx:           ctx,
-		cancel:        cancel,
-		metrics:       &SLURPMetrics{LastUpdated: time.Now()},
+		dht:           dhtInstance,
+		crypto:        cryptoInstance,
+		election:      electionManager,
+		ctx:           ctx,
+		cancel:        cancel,
+		metrics:       &SLURPMetrics{LastUpdated: time.Now()},
 		eventHandlers: make(map[EventType][]EventHandler),
+		contextStore:  make(map[string]*slurpContext.ContextNode),
+		resolvedCache: make(map[string]*slurpContext.ResolvedContext),
+		storagePath:   storagePath,
 	}
-	
+
 	return slurp, nil
 }
@@ -378,16 +412,50 @@ func (s *SLURP) Initialize(ctx context.Context) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	
+
 	if s.initialized {
 		return fmt.Errorf("SLURP already initialized")
 	}
-	
+
 	// Check if SLURP is enabled
 	if !s.config.Slurp.Enabled {
 		return fmt.Errorf("SLURP is disabled in configuration")
 	}
-	
+
+	// Establish runtime context for background operations
+	if ctx != nil {
+		if s.cancel != nil {
+			s.cancel()
+		}
+		s.ctx, s.cancel = context.WithCancel(ctx)
+	} else if s.ctx == nil {
+		s.ctx, s.cancel = context.WithCancel(context.Background())
+	}
+
+	// Ensure metrics structure is available
+	if s.metrics == nil {
+		s.metrics = &SLURPMetrics{}
+	}
+	s.metrics.LastUpdated = time.Now()
+
+	// Initialize in-memory persistence (SEC-SLURP 1.1 bootstrap)
+	s.contextsMu.Lock()
+	if s.contextStore == nil {
+		s.contextStore = make(map[string]*slurpContext.ContextNode)
+	}
+	if s.resolvedCache == nil {
+		s.resolvedCache = make(map[string]*slurpContext.ResolvedContext)
+	}
+	s.contextsMu.Unlock()
+
+	// Roadmap: SEC-SLURP 1.1 persistent storage bootstrapping
+	if err := s.setupPersistentStorage(); err != nil {
+		return fmt.Errorf("failed to initialize SLURP storage: %w", err)
+	}
+	if err := s.loadPersistedContexts(s.ctx); err != nil {
+		return fmt.Errorf("failed to load persisted contexts: %w", err)
+	}
+
 	// TODO: Initialize components in dependency order
 	// 1. Initialize storage layer first
 	// 2. Initialize context resolver with storage
@@ -395,27 +463,27 @@ func (s *SLURP) Initialize(ctx context.Context) error {
 	// 4. Initialize intelligence with resolver and temporal
 	// 5. Initialize retrieval with all components
 	// 6. Set up event handlers and background tasks
-	
+
 	// Set admin status based on current election state
 	s.updateAdminStatus()
-	
+
 	// Set up election event handlers
 	if err := s.setupElectionHandlers(); err != nil {
 		return fmt.Errorf("failed to setup election handlers: %w", err)
 	}
-	
+
 	// Start background tasks
 	s.startBackgroundTasks()
-	
+
 	s.initialized = true
-	
+
 	// Emit initialization event
 	s.emitEvent(EventContextResolved, map[string]interface{}{
-		"action":     "system_initialized",
-		"admin_mode": s.adminMode,
+		"action":        "system_initialized",
+		"admin_mode":    s.adminMode,
 		"current_admin": s.currentAdmin,
 	})
-	
+
 	return nil
 }
@@ -425,12 +493,14 @@ func (s *SLURP) Initialize(ctx context.Context) error {
 // hierarchy traversal with caching and role-based access control.
 //
 // Parameters:
-//   ctx: Request context for cancellation and timeouts
-//   ucxlAddress: The UCXL address to resolve context for
+//
+//	ctx: Request context for cancellation and timeouts
+//	ucxlAddress: The UCXL address to resolve context for
 //
 // Returns:
-//   *ResolvedContext: Complete resolved context with metadata
-//   error: Any error during resolution
+//
+//	*ResolvedContext: Complete resolved context with metadata
+//	error: Any error during resolution
 //
 // The resolution process:
 // 1. Validates the UCXL address format
@@ -443,11 +513,53 @@ func (s *SLURP) Resolve(ctx context.Context, ucxlAddress string) (*ResolvedConte
 	if !s.initialized {
 		return nil, fmt.Errorf("SLURP not initialized")
 	}
-	
-	// TODO: Implement context resolution
-	// This would delegate to the contextResolver component
-	
-	return nil, fmt.Errorf("not implemented")
+
+	start := time.Now()
+
+	parsed, err := ucxl.Parse(ucxlAddress)
+	if err != nil {
+		return nil, fmt.Errorf("invalid UCXL address: %w", err)
+	}
+
+	key := parsed.String()
+
+	s.contextsMu.RLock()
+	if resolved, ok := s.resolvedCache[key]; ok {
+		s.contextsMu.RUnlock()
+		s.markCacheHit()
+		s.markResolutionSuccess(time.Since(start))
+		return convertResolvedForAPI(resolved), nil
+	}
+	s.contextsMu.RUnlock()
+
+	node := s.getContextNode(key)
+	if node == nil {
+		// Roadmap: SEC-SLURP 1.1 - fallback to persistent storage when caches miss.
+		loadedNode, loadErr := s.loadContextForKey(ctx, key)
+		if loadErr != nil {
+			s.markResolutionFailure()
+			if !errors.Is(loadErr, errContextNotPersisted) {
+				s.markPersistenceError()
+			}
+			if errors.Is(loadErr, errContextNotPersisted) {
+				return nil, fmt.Errorf("context not found for %s", key)
+			}
+			return nil, fmt.Errorf("failed to load context for %s: %w", key, loadErr)
+		}
+		node = loadedNode
+		s.markCacheMiss()
+	} else {
+		s.markCacheMiss()
+	}
+
+	built := buildResolvedContext(node)
+	s.contextsMu.Lock()
+	s.contextStore[key] = node
+	s.resolvedCache[key] = built
+	s.contextsMu.Unlock()
+
+	s.markResolutionSuccess(time.Since(start))
+	return convertResolvedForAPI(built), nil
 }
 
 // ResolveWithDepth resolves context with a specific depth limit.
@@ -458,14 +570,19 @@ func (s *SLURP) ResolveWithDepth(ctx context.Context, ucxlAddress string, maxDep
 	if !s.initialized {
 		return nil, fmt.Errorf("SLURP not initialized")
 	}
-	
+
 	if maxDepth < 0 {
 		return nil, fmt.Errorf("maxDepth cannot be negative")
 	}
-	
-	// TODO: Implement depth-limited resolution
-	
-	return nil, fmt.Errorf("not implemented")
+
+	resolved, err := s.Resolve(ctx, ucxlAddress)
+	if err != nil {
+		return nil, err
+	}
+	if resolved != nil {
+		resolved.BoundedDepth = maxDepth
+	}
+	return resolved, nil
 }
 
 // BatchResolve efficiently resolves multiple UCXL addresses in parallel.
@@ -476,14 +593,24 @@ func (s *SLURP) BatchResolve(ctx context.Context, addresses []string) (map[strin
 	if !s.initialized {
 		return nil, fmt.Errorf("SLURP not initialized")
 	}
-	
+
 	if len(addresses) == 0 {
 		return make(map[string]*ResolvedContext), nil
 	}
-	
-	// TODO: Implement batch resolution with concurrency control
-	
-	return nil, fmt.Errorf("not implemented")
+
+	results := make(map[string]*ResolvedContext, len(addresses))
+	var firstErr error
+	for _, addr := range addresses {
+		resolved, err := s.Resolve(ctx, addr)
+		if err != nil {
+			if firstErr == nil {
+				firstErr = err
+			}
+			continue
+		}
+		results[addr] = resolved
+	}
+	return results, firstErr
 }
 
 // GetTemporalEvolution retrieves the temporal evolution history for a context.
@@ -494,10 +621,17 @@ func (s *SLURP) GetTemporalEvolution(ctx context.Context, ucxlAddress string) ([
 	if !s.initialized {
 		return nil, fmt.Errorf("SLURP not initialized")
 	}
-	
-	// TODO: Delegate to temporal graph component
-	
-	return nil, fmt.Errorf("not implemented")
+
+	if s.temporalGraph == nil {
+		return nil, fmt.Errorf("temporal graph not configured")
+	}
+
+	parsed, err := ucxl.Parse(ucxlAddress)
+	if err != nil {
+		return nil, fmt.Errorf("invalid UCXL address: %w", err)
+	}
+
+	return s.temporalGraph.GetEvolutionHistory(ctx, *parsed)
 }
 
 // NavigateDecisionHops navigates through the decision graph by hop distance.
@@ -509,10 +643,21 @@ func (s *SLURP) NavigateDecisionHops(ctx context.Context, ucxlAddress string, ho
 	if !s.initialized {
 		return nil, fmt.Errorf("SLURP not initialized")
 	}
-	
-	// TODO: Implement decision-hop navigation
-	
-	return nil, fmt.Errorf("not implemented")
+
+	if s.temporalGraph == nil {
+		return nil, fmt.Errorf("decision navigation not configured")
+	}
+
+	parsed, err := ucxl.Parse(ucxlAddress)
+	if err != nil {
+		return nil, fmt.Errorf("invalid UCXL address: %w", err)
+	}
+
+	if navigator, ok := s.temporalGraph.(DecisionNavigator); ok {
+		return navigator.NavigateDecisionHops(ctx, *parsed, hops, direction)
+	}
+
+	return nil, fmt.Errorf("decision navigation not supported by temporal graph")
 }
 
 // GenerateContext generates new context for a path (admin-only operation).
@@ -524,15 +669,211 @@ func (s *SLURP) GenerateContext(ctx context.Context, path string, options *Gener
 	if !s.initialized {
 		return nil, fmt.Errorf("SLURP not initialized")
 	}
-	
+
 	// Enforce admin-only restriction
 	if !s.IsCurrentNodeAdmin() {
 		return nil, fmt.Errorf("context generation requires admin privileges")
 	}
-	
-	// TODO: Delegate to intelligence component
-	
-	return nil, fmt.Errorf("not implemented")
+
+	if s.intelligence == nil {
+		return nil, fmt.Errorf("intelligence engine not configured")
+	}
+
+	s.mu.Lock()
+	s.metrics.GenerationRequests++
+	s.metrics.LastUpdated = time.Now()
+	s.mu.Unlock()
+
+	generated, err := s.intelligence.GenerateContext(ctx, path, options)
+	if err != nil {
+		return nil, err
+	}
+
+	contextNode, err := convertAPIToContextNode(generated)
+	if err != nil {
+		return nil, err
+	}
+
+	if _, err := s.UpsertContext(ctx, contextNode); err != nil {
+		return nil, err
+	}
+
+	return generated, nil
+}
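+
+// Example (sketch): on the admin node a successful generation is persisted and
+// cached through UpsertContext, so an immediate Resolve on the same address
+// should be served from the in-memory resolved cache:
+//
+//	generated, err := slurpInstance.GenerateContext(ctx, "/repo/pkg/example", options)
+//	if err != nil {
+//		return err
+//	}
+//	resolved, err := slurpInstance.Resolve(ctx, generated.UCXLAddress)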
+
+// UpsertContext persists a context node and exposes it for immediate resolution (SEC-SLURP 1.1).
+func (s *SLURP) UpsertContext(ctx context.Context, node *slurpContext.ContextNode) (*slurpContext.ResolvedContext, error) {
+	if !s.initialized {
+		return nil, fmt.Errorf("SLURP not initialized")
+	}
+	if node == nil {
+		return nil, fmt.Errorf("context node cannot be nil")
+	}
+
+	if err := node.Validate(); err != nil {
+		return nil, err
+	}
+
+	clone := node.Clone()
+	resolved := buildResolvedContext(clone)
+	key := clone.UCXLAddress.String()
+
+	s.contextsMu.Lock()
+	s.contextStore[key] = clone
+	s.resolvedCache[key] = resolved
+	s.contextsMu.Unlock()
+
+	s.mu.Lock()
+	s.metrics.StoredContexts++
+	s.metrics.SuccessfulGenerations++
+	s.metrics.LastUpdated = time.Now()
+	s.mu.Unlock()
+
+	if err := s.persistContext(ctx, clone); err != nil && !errors.Is(err, errContextNotPersisted) {
+		s.markPersistenceError()
+		s.emitEvent(EventErrorOccurred, map[string]interface{}{
+			"action":       "persist_context",
+			"ucxl_address": key,
+			"error":        err.Error(),
+		})
+	}
+
+	s.emitEvent(EventContextGenerated, map[string]interface{}{
+		"ucxl_address": key,
+		"summary":      clone.Summary,
+		"path":         clone.Path,
+	})
+
+	return cloneResolvedInternal(resolved), nil
+}
+
+func buildResolvedContext(node *slurpContext.ContextNode) *slurpContext.ResolvedContext {
+	if node == nil {
+		return nil
+	}
+
+	return &slurpContext.ResolvedContext{
+		UCXLAddress:           node.UCXLAddress,
+		Summary:               node.Summary,
+		Purpose:               node.Purpose,
+		Technologies:          cloneStringSlice(node.Technologies),
+		Tags:                  cloneStringSlice(node.Tags),
+		Insights:              cloneStringSlice(node.Insights),
+		ContextSourcePath:     node.Path,
+		InheritanceChain:      []string{node.UCXLAddress.String()},
+		ResolutionConfidence:  node.RAGConfidence,
+		BoundedDepth:          0,
+		GlobalContextsApplied: false,
+		ResolvedAt:            time.Now(),
+	}
+}
+
+func cloneResolvedInternal(resolved *slurpContext.ResolvedContext) *slurpContext.ResolvedContext {
+	if resolved == nil {
+		return nil
+	}
+
+	clone := *resolved
+	clone.Technologies = cloneStringSlice(resolved.Technologies)
+	clone.Tags = cloneStringSlice(resolved.Tags)
+	clone.Insights = cloneStringSlice(resolved.Insights)
+	clone.InheritanceChain = cloneStringSlice(resolved.InheritanceChain)
+	return &clone
+}
+
+func convertResolvedForAPI(resolved *slurpContext.ResolvedContext) *ResolvedContext {
+	if resolved == nil {
+		return nil
+	}
+
+	return &ResolvedContext{
+		UCXLAddress:      resolved.UCXLAddress.String(),
+		Summary:          resolved.Summary,
+		Purpose:          resolved.Purpose,
+		Technologies:     cloneStringSlice(resolved.Technologies),
+		Tags:             cloneStringSlice(resolved.Tags),
+		Insights:         cloneStringSlice(resolved.Insights),
+		SourcePath:       resolved.ContextSourcePath,
+		InheritanceChain: cloneStringSlice(resolved.InheritanceChain),
+		Confidence:       resolved.ResolutionConfidence,
+		BoundedDepth:     resolved.BoundedDepth,
+		GlobalApplied:    resolved.GlobalContextsApplied,
+		ResolvedAt:       resolved.ResolvedAt,
+		Version:          1,
+		LastUpdated:      resolved.ResolvedAt,
+		EvolutionHistory: cloneStringSlice(resolved.InheritanceChain),
+		NodesTraversed:   len(resolved.InheritanceChain),
+	}
+}
+
+func convertAPIToContextNode(node *ContextNode) (*slurpContext.ContextNode, error) {
+	if node == nil {
+		return nil, fmt.Errorf("context node cannot be nil")
+	}
+
+	address, err := ucxl.Parse(node.UCXLAddress)
+	if err != nil {
+		return nil, fmt.Errorf("invalid UCXL address: %w", err)
+	}
+
+	converted := &slurpContext.ContextNode{
+		Path:               node.Path,
+		UCXLAddress:        *address,
+		Summary:            node.Summary,
+		Purpose:            node.Purpose,
+		Technologies:       cloneStringSlice(node.Technologies),
+		Tags:               cloneStringSlice(node.Tags),
+		Insights:           cloneStringSlice(node.Insights),
+		OverridesParent:    node.Overrides,
+		ContextSpecificity: node.Specificity,
+		AppliesToChildren:  node.AppliesTo == ScopeChildren,
+		GeneratedAt:        node.CreatedAt,
+		RAGConfidence:      node.Confidence,
+		EncryptedFor:       cloneStringSlice(node.EncryptedFor),
+		AccessLevel:        slurpContext.RoleAccessLevel(node.AccessLevel),
+		Metadata:           cloneMetadata(node.Metadata),
+	}
+
+	converted.AppliesTo = slurpContext.ContextScope(node.AppliesTo)
+	converted.CreatedBy = node.CreatedBy
+	converted.UpdatedAt = node.UpdatedAt
+	converted.WhoUpdated = node.UpdatedBy
+	converted.Parent = node.Parent
+	converted.Children = cloneStringSlice(node.Children)
+	converted.FileType = node.FileType
+	converted.Language = node.Language
+	converted.Size = node.Size
+	converted.LastModified = node.LastModified
+	converted.ContentHash = node.ContentHash
+
+	if converted.GeneratedAt.IsZero() {
+		converted.GeneratedAt = time.Now()
+	}
+	if converted.UpdatedAt.IsZero() {
+		converted.UpdatedAt = converted.GeneratedAt
+	}
+
+	return converted, nil
+}
+
+func cloneStringSlice(src []string) []string {
+	if len(src) == 0 {
+		return nil
+	}
+	dst := make([]string, len(src))
+	copy(dst, src)
+	return dst
+}
+
+func cloneMetadata(src map[string]interface{}) map[string]interface{} {
+	if len(src) == 0 {
+		return nil
+	}
+	dst := make(map[string]interface{}, len(src))
+	for k, v := range src {
+		dst[k] = v
+	}
+	return dst
+}
 
 // IsCurrentNodeAdmin returns true if the current node is the elected admin.
@@ -549,13 +890,74 @@ func (s *SLURP) GetMetrics() *SLURPMetrics {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
-	
+
 	// Return a copy to prevent modification
 	metricsCopy := *s.metrics
 	metricsCopy.LastUpdated = time.Now()
 	return &metricsCopy
 }
 
+// markResolutionSuccess tracks cache or storage hits (Roadmap: SEC-SLURP 1.1).
+func (s *SLURP) markResolutionSuccess(duration time.Duration) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.metrics.TotalResolutions++
+	s.metrics.SuccessfulResolutions++
+	s.metrics.AverageResolutionTime = updateAverageDuration(
+		s.metrics.AverageResolutionTime,
+		s.metrics.TotalResolutions,
+		duration,
+	)
+	if s.metrics.TotalResolutions > 0 {
+		s.metrics.CacheHitRate = float64(s.metrics.CacheHits) / float64(s.metrics.TotalResolutions)
+	}
+	s.metrics.LastUpdated = time.Now()
+}
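+
+// updateAverageDuration is defined elsewhere in this package; from the call
+// above it is assumed to maintain an incremental mean over the resolution
+// count, e.g. for the n-th sample:
+//
+//	avg' = avg + (sample - avg) / n
+//
+// so AverageResolutionTime converges without storing per-resolution samples.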
+func (s *SLURP) markResolutionFailure() { + s.mu.Lock() + defer s.mu.Unlock() + + s.metrics.TotalResolutions++ + s.metrics.FailedResolutions++ + if s.metrics.TotalResolutions > 0 { + s.metrics.CacheHitRate = float64(s.metrics.CacheHits) / float64(s.metrics.TotalResolutions) + } + s.metrics.LastUpdated = time.Now() +} + +func (s *SLURP) markCacheHit() { + s.mu.Lock() + defer s.mu.Unlock() + + s.metrics.CacheHits++ + if s.metrics.TotalResolutions > 0 { + s.metrics.CacheHitRate = float64(s.metrics.CacheHits) / float64(s.metrics.TotalResolutions) + } + s.metrics.LastUpdated = time.Now() +} + +func (s *SLURP) markCacheMiss() { + s.mu.Lock() + defer s.mu.Unlock() + + s.metrics.CacheMisses++ + if s.metrics.TotalResolutions > 0 { + s.metrics.CacheHitRate = float64(s.metrics.CacheHits) / float64(s.metrics.TotalResolutions) + } + s.metrics.LastUpdated = time.Now() +} + +func (s *SLURP) markPersistenceError() { + s.mu.Lock() + defer s.mu.Unlock() + + s.metrics.PersistenceErrors++ + s.metrics.LastUpdated = time.Now() +} + // RegisterEventHandler registers an event handler for specific event types. // // Event handlers are called asynchronously when events occur and can be @@ -563,11 +965,11 @@ func (s *SLURP) GetMetrics() *SLURPMetrics { func (s *SLURP) RegisterEventHandler(eventType EventType, handler EventHandler) { s.eventMux.Lock() defer s.eventMux.Unlock() - + if _, exists := s.eventHandlers[eventType]; !exists { s.eventHandlers[eventType] = make([]EventHandler, 0) } - + s.eventHandlers[eventType] = append(s.eventHandlers[eventType], handler) } @@ -578,26 +980,33 @@ func (s *SLURP) RegisterEventHandler(eventType EventType, handler EventHandler) func (s *SLURP) Close() error { s.mu.Lock() defer s.mu.Unlock() - + if !s.initialized { return nil } - + // Cancel context to stop background tasks s.cancel() - + // Wait for background tasks to complete s.backgroundTasks.Wait() - + // TODO: Close all components in reverse dependency order // 1. Stop retrieval engine // 2. Stop intelligence system // 3. Flush and close temporal graph // 4. Flush and close context resolver // 5. 
Close storage layer - + if s.localStorage != nil { + if closer, ok := s.localStorage.(interface{ Close() error }); ok { + if err := closer.Close(); err != nil { + return fmt.Errorf("failed to close SLURP storage: %w", err) + } + } + } + s.initialized = false - + return nil } @@ -614,13 +1023,13 @@ func (s *SLURP) setupElectionHandlers() error { if s.election == nil { return nil } - + // Set up callbacks for admin changes s.election.SetCallbacks( s.handleAdminChanged, s.handleElectionComplete, ) - + return nil } @@ -629,11 +1038,11 @@ func (s *SLURP) handleAdminChanged(oldAdmin, newAdmin string) { s.currentAdmin = newAdmin s.adminMode = (newAdmin == s.config.Agent.ID) s.mu.Unlock() - + // Emit admin change event s.emitEvent(EventAdminChanged, map[string]interface{}{ - "old_admin": oldAdmin, - "new_admin": newAdmin, + "old_admin": oldAdmin, + "new_admin": newAdmin, "is_current_node": s.adminMode, }) } @@ -646,11 +1055,11 @@ func (s *SLURP) startBackgroundTasks() { // Start metrics collection s.backgroundTasks.Add(1) go s.metricsCollectionLoop() - + // Start cache maintenance s.backgroundTasks.Add(1) go s.cacheMaintenance() - + // Start staleness detection if s.config.Slurp.TemporalAnalysis.StalenessCheckInterval > 0 { s.backgroundTasks.Add(1) @@ -660,10 +1069,10 @@ func (s *SLURP) startBackgroundTasks() { func (s *SLURP) metricsCollectionLoop() { defer s.backgroundTasks.Done() - + ticker := time.NewTicker(s.config.Slurp.Performance.MetricsCollectionInterval) defer ticker.Stop() - + for { select { case <-s.ctx.Done(): @@ -676,11 +1085,11 @@ func (s *SLURP) metricsCollectionLoop() { func (s *SLURP) cacheMaintenance() { defer s.backgroundTasks.Done() - + // TODO: Implement cache maintenance loop ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() - + for { select { case <-s.ctx.Done(): @@ -693,10 +1102,10 @@ func (s *SLURP) cacheMaintenance() { func (s *SLURP) stalenessDetectionLoop() { defer s.backgroundTasks.Done() - + ticker := time.NewTicker(s.config.Slurp.TemporalAnalysis.StalenessCheckInterval) defer ticker.Stop() - + for { select { case <-s.ctx.Done(): @@ -710,11 +1119,185 @@ func (s *SLURP) stalenessDetectionLoop() { func (s *SLURP) updateMetrics() { s.mu.Lock() defer s.mu.Unlock() - + // TODO: Collect metrics from all components s.metrics.LastUpdated = time.Now() } +// getContextNode returns cached nodes (Roadmap: SEC-SLURP 1.1 persistence). +func (s *SLURP) getContextNode(key string) *slurpContext.ContextNode { + s.contextsMu.RLock() + defer s.contextsMu.RUnlock() + + if node, ok := s.contextStore[key]; ok { + return node + } + return nil +} + +// loadContextForKey hydrates nodes from LevelDB (Roadmap: SEC-SLURP 1.1). +func (s *SLURP) loadContextForKey(ctx context.Context, key string) (*slurpContext.ContextNode, error) { + if s.localStorage == nil { + return nil, errContextNotPersisted + } + + runtimeCtx := s.runtimeContext(ctx) + stored, err := s.localStorage.Retrieve(runtimeCtx, contextStoragePrefix+key) + if err != nil { + if strings.Contains(err.Error(), "not found") { + return nil, errContextNotPersisted + } + return nil, err + } + + node, convErr := convertStoredToContextNode(stored) + if convErr != nil { + return nil, convErr + } + + return node, nil +} + +// setupPersistentStorage configures LevelDB persistence (Roadmap: SEC-SLURP 1.1). 
+func (s *SLURP) setupPersistentStorage() error { + if s.localStorage != nil { + return nil + } + + resolvedPath := s.storagePath + if resolvedPath == "" { + resolvedPath = defaultStoragePath(s.config) + } + + store, err := storage.NewLocalStorage(resolvedPath, nil) + if err != nil { + return err + } + + s.localStorage = store + s.storagePath = resolvedPath + return nil +} + +// loadPersistedContexts warms caches from disk (Roadmap: SEC-SLURP 1.1). +func (s *SLURP) loadPersistedContexts(ctx context.Context) error { + if s.localStorage == nil { + return nil + } + + runtimeCtx := s.runtimeContext(ctx) + keys, err := s.localStorage.List(runtimeCtx, ".*") + if err != nil { + return err + } + + var loaded int64 + s.contextsMu.Lock() + defer s.contextsMu.Unlock() + + for _, key := range keys { + if !strings.HasPrefix(key, contextStoragePrefix) { + continue + } + + stored, retrieveErr := s.localStorage.Retrieve(runtimeCtx, key) + if retrieveErr != nil { + s.markPersistenceError() + s.emitEvent(EventErrorOccurred, map[string]interface{}{ + "action": "load_persisted_context", + "key": key, + "error": retrieveErr.Error(), + }) + continue + } + + node, convErr := convertStoredToContextNode(stored) + if convErr != nil { + s.markPersistenceError() + s.emitEvent(EventErrorOccurred, map[string]interface{}{ + "action": "decode_persisted_context", + "key": key, + "error": convErr.Error(), + }) + continue + } + + address := strings.TrimPrefix(key, contextStoragePrefix) + nodeClone := node.Clone() + s.contextStore[address] = nodeClone + s.resolvedCache[address] = buildResolvedContext(nodeClone) + loaded++ + } + + s.mu.Lock() + s.metrics.StoredContexts = loaded + s.metrics.LastUpdated = time.Now() + s.mu.Unlock() + + return nil +} + +// persistContext stores contexts to LevelDB (Roadmap: SEC-SLURP 1.1). +func (s *SLURP) persistContext(ctx context.Context, node *slurpContext.ContextNode) error { + if s.localStorage == nil { + return errContextNotPersisted + } + + options := &storage.StoreOptions{ + Compress: true, + Cache: true, + Metadata: map[string]interface{}{ + "path": node.Path, + "summary": node.Summary, + "roadmap_tag": "SEC-SLURP-1.1", + }, + } + + return s.localStorage.Store(s.runtimeContext(ctx), contextStoragePrefix+node.UCXLAddress.String(), node, options) +} + +// runtimeContext provides a safe context for persistence (Roadmap: SEC-SLURP 1.1). +func (s *SLURP) runtimeContext(ctx context.Context) context.Context { + if ctx != nil { + return ctx + } + if s.ctx != nil { + return s.ctx + } + return context.Background() +} + +// defaultStoragePath resolves the SLURP storage directory (Roadmap: SEC-SLURP 1.1). +func defaultStoragePath(cfg *config.Config) string { + if cfg != nil && cfg.UCXL.Storage.Directory != "" { + return filepath.Join(cfg.UCXL.Storage.Directory, "slurp") + } + home, err := os.UserHomeDir() + if err == nil && home != "" { + return filepath.Join(home, ".chorus", "slurp") + } + return filepath.Join(os.TempDir(), "chorus", "slurp") +} + +// convertStoredToContextNode rehydrates persisted contexts (Roadmap: SEC-SLURP 1.1). 
+func convertStoredToContextNode(raw interface{}) (*slurpContext.ContextNode, error) { + if raw == nil { + return nil, fmt.Errorf("no context data provided") + } + + payload, err := json.Marshal(raw) + if err != nil { + return nil, fmt.Errorf("failed to marshal persisted context: %w", err) + } + + var node slurpContext.ContextNode + if err := json.Unmarshal(payload, &node); err != nil { + return nil, fmt.Errorf("failed to decode persisted context: %w", err) + } + + return &node, nil +} + func (s *SLURP) detectStaleContexts() { // TODO: Implement staleness detection // This would scan temporal nodes for contexts that haven't been @@ -728,7 +1311,7 @@ func (s *SLURP) emitEvent(eventType EventType, data map[string]interface{}) { Source: "slurp_core", Data: data, } - + // Call event handlers asynchronously go s.handleEvent(event) } @@ -740,12 +1323,12 @@ func (s *SLURP) handleEvent(event *SLURPEvent) { s.eventMux.RUnlock() return } - + // Make a copy of handlers to avoid holding lock during execution handlersCopy := make([]EventHandler, len(handlers)) copy(handlersCopy, handlers) s.eventMux.RUnlock() - + // Execute handlers for _, handler := range handlersCopy { func(h EventHandler) { @@ -754,10 +1337,10 @@ func (s *SLURP) handleEvent(event *SLURPEvent) { // Log handler panic but don't crash the system } }() - + ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second) defer cancel() - + if err := h(ctx, event); err != nil { // Log handler error but continue with other handlers } @@ -770,22 +1353,33 @@ func validateSLURPConfig(config *SLURPConfig) error { if config.ContextResolution.MaxHierarchyDepth < 1 { return fmt.Errorf("max_hierarchy_depth must be at least 1") } - + if config.ContextResolution.MinConfidenceThreshold < 0 || config.ContextResolution.MinConfidenceThreshold > 1 { return fmt.Errorf("min_confidence_threshold must be between 0 and 1") } - + if config.TemporalAnalysis.MaxDecisionHops < 1 { return fmt.Errorf("max_decision_hops must be at least 1") } - + if config.TemporalAnalysis.StalenessThreshold < 0 || config.TemporalAnalysis.StalenessThreshold > 1 { return fmt.Errorf("staleness_threshold must be between 0 and 1") } - + if config.Performance.MaxConcurrentResolutions < 1 { return fmt.Errorf("max_concurrent_resolutions must be at least 1") } - + return nil -} \ No newline at end of file +} + +func updateAverageDuration(current time.Duration, total int64, latest time.Duration) time.Duration { + if total <= 0 { + return latest + } + if total == 1 { + return latest + } + prevSum := int64(current) * (total - 1) + return time.Duration((prevSum + int64(latest)) / total) +} diff --git a/pkg/slurp/slurp_persistence_test.go b/pkg/slurp/slurp_persistence_test.go new file mode 100644 index 0000000..93f1892 --- /dev/null +++ b/pkg/slurp/slurp_persistence_test.go @@ -0,0 +1,69 @@ +package slurp + +import ( + "context" + "testing" + "time" + + "chorus/pkg/config" + slurpContext "chorus/pkg/slurp/context" + "chorus/pkg/ucxl" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestSLURPPersistenceLoadsContexts verifies LevelDB fallback (Roadmap: SEC-SLURP 1.1). 
+func TestSLURPPersistenceLoadsContexts(t *testing.T) { + configDir := t.TempDir() + cfg := &config.Config{ + Slurp: config.SlurpConfig{Enabled: true}, + UCXL: config.UCXLConfig{ + Storage: config.StorageConfig{Directory: configDir}, + }, + } + + primary, err := NewSLURP(cfg, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, primary.Initialize(context.Background())) + t.Cleanup(func() { + _ = primary.Close() + }) + + address, err := ucxl.Parse("ucxl://agent:resolver@chorus:task/current/docs/example.go") + require.NoError(t, err) + + node := &slurpContext.ContextNode{ + Path: "docs/example.go", + UCXLAddress: *address, + Summary: "Persistent context summary", + Purpose: "Verify persistence pipeline", + Technologies: []string{"Go"}, + Tags: []string{"persistence", "slurp"}, + GeneratedAt: time.Now().UTC(), + RAGConfidence: 0.92, + } + + _, err = primary.UpsertContext(context.Background(), node) + require.NoError(t, err) + require.NoError(t, primary.Close()) + + restore, err := NewSLURP(cfg, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, restore.Initialize(context.Background())) + t.Cleanup(func() { + _ = restore.Close() + }) + + // Clear in-memory caches to force disk hydration path. + restore.contextsMu.Lock() + restore.contextStore = make(map[string]*slurpContext.ContextNode) + restore.resolvedCache = make(map[string]*slurpContext.ResolvedContext) + restore.contextsMu.Unlock() + + resolved, err := restore.Resolve(context.Background(), address.String()) + require.NoError(t, err) + require.NotNil(t, resolved) + assert.Equal(t, node.Summary, resolved.Summary) + assert.Equal(t, node.Purpose, resolved.Purpose) + assert.Contains(t, resolved.Technologies, "Go") +}