diff --git a/pkg/slurp/slurp.go b/pkg/slurp/slurp.go index cf18f8e..7ea9bfa 100644 --- a/pkg/slurp/slurp.go +++ b/pkg/slurp/slurp.go @@ -42,6 +42,7 @@ import ( "chorus/pkg/election" slurpContext "chorus/pkg/slurp/context" "chorus/pkg/slurp/storage" + "chorus/pkg/slurp/temporal" "chorus/pkg/ucxl" ) @@ -71,6 +72,8 @@ type SLURP struct { // Core components contextResolver ContextResolver temporalGraph TemporalGraph + temporalSystem *temporal.TemporalGraphSystem + temporalStore storage.ContextStore storage DistributedStorage intelligence ContextGenerator retrieval QueryEngine @@ -456,6 +459,10 @@ func (s *SLURP) Initialize(ctx context.Context) error { return fmt.Errorf("failed to load persisted contexts: %w", err) } + if err := s.initializeTemporalSystem(s.ctx); err != nil { + return fmt.Errorf("failed to initialize temporal system: %w", err) + } + // TODO: Initialize components in dependency order // 1. Initialize storage layer first // 2. Initialize context resolver with storage @@ -1179,6 +1186,60 @@ func (s *SLURP) setupPersistentStorage() error { return nil } +// initializeTemporalSystem wires the temporal graph to the DHT-backed persistence layer. +func (s *SLURP) initializeTemporalSystem(ctx context.Context) error { + if s.temporalGraph != nil { + return nil + } + + if s.localStorage == nil { + return fmt.Errorf("temporal persistence requires local storage") + } + + if s.temporalStore == nil { + s.temporalStore = storage.NewInMemoryContextStore() + } + + cfg := temporal.DefaultTemporalConfig() + if cfg.PersistenceConfig == nil { + cfg.PersistenceConfig = &temporal.PersistenceConfig{} + } + + cfg.PersistenceConfig.EnableWriteBuffer = false + cfg.PersistenceConfig.EnableAutoSync = false + cfg.PersistenceConfig.EnableAutoBackup = false + cfg.PersistenceConfig.EnableLocalStorage = true + cfg.PersistenceConfig.EnableDistributedStorage = s.dht != nil + cfg.PersistenceConfig.EnableEncryption = false + cfg.PersistenceConfig.BatchSize = 1 + cfg.PersistenceConfig.FlushInterval = 30 * time.Second + if len(cfg.PersistenceConfig.EncryptionRoles) == 0 { + cfg.PersistenceConfig.EncryptionRoles = []string{"default"} + } + + nodeID := s.config.Agent.ID + if nodeID == "" { + nodeID = fmt.Sprintf("slurp-node-%d", time.Now().UnixNano()) + } + + system, err := temporal.NewDHTBackedTemporalGraphSystem( + s.runtimeContext(ctx), + s.temporalStore, + s.localStorage, + s.dht, + nodeID, + cfg, + ) + if err != nil { + return fmt.Errorf("failed to build DHT temporal system: %w", err) + } + + s.temporalSystem = system + s.temporalGraph = system.Graph + + return nil +} + // loadPersistedContexts warms caches from disk (Roadmap: SEC-SLURP 1.1). func (s *SLURP) loadPersistedContexts(ctx context.Context) error { if s.localStorage == nil { diff --git a/pkg/slurp/storage/context_store_inmemory.go b/pkg/slurp/storage/context_store_inmemory.go new file mode 100644 index 0000000..9ad61f8 --- /dev/null +++ b/pkg/slurp/storage/context_store_inmemory.go @@ -0,0 +1,155 @@ +package storage + +import ( + "context" + "sync" + "time" + + slurpContext "chorus/pkg/slurp/context" + "chorus/pkg/ucxl" +) + +// inMemoryContextStore offers a lightweight ContextStore implementation suitable for +// local development and SEC-SLURP bootstrap scenarios. It keeps all context nodes in +// process memory, providing the minimal surface required by the temporal subsystem until +// the production storage stack is wired in. +type inMemoryContextStore struct { + mu sync.RWMutex + contexts map[string]*slurpContext.ContextNode +} + +// NewInMemoryContextStore constructs an in-memory ContextStore. +func NewInMemoryContextStore() ContextStore { + return &inMemoryContextStore{ + contexts: make(map[string]*slurpContext.ContextNode), + } +} + +func (s *inMemoryContextStore) StoreContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) error { + if node == nil { + return nil + } + s.mu.Lock() + defer s.mu.Unlock() + s.contexts[node.UCXLAddress.String()] = node + return nil +} + +func (s *inMemoryContextStore) RetrieveContext(ctx context.Context, address ucxl.Address, role string) (*slurpContext.ContextNode, error) { + s.mu.RLock() + defer s.mu.RUnlock() + node, ok := s.contexts[address.String()] + if !ok { + return nil, ErrNotFound + } + return node, nil +} + +func (s *inMemoryContextStore) UpdateContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) error { + if node == nil { + return nil + } + s.mu.Lock() + defer s.mu.Unlock() + s.contexts[node.UCXLAddress.String()] = node + return nil +} + +func (s *inMemoryContextStore) DeleteContext(ctx context.Context, address ucxl.Address) error { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.contexts, address.String()) + return nil +} + +func (s *inMemoryContextStore) ExistsContext(ctx context.Context, address ucxl.Address) (bool, error) { + s.mu.RLock() + defer s.mu.RUnlock() + _, ok := s.contexts[address.String()] + return ok, nil +} + +func (s *inMemoryContextStore) ListContexts(ctx context.Context, criteria *ListCriteria) ([]*slurpContext.ContextNode, error) { + s.mu.RLock() + defer s.mu.RUnlock() + results := make([]*slurpContext.ContextNode, 0, len(s.contexts)) + for _, node := range s.contexts { + results = append(results, node) + } + return results, nil +} + +func (s *inMemoryContextStore) SearchContexts(ctx context.Context, query *SearchQuery) (*SearchResults, error) { + return &SearchResults{ + Results: []*SearchResult{}, + TotalResults: 0, + ProcessingTime: 0, + Facets: map[string]map[string]int{}, + Suggestions: []string{}, + ProcessedAt: time.Now(), + }, nil +} + +func (s *inMemoryContextStore) BatchStore(ctx context.Context, batch *BatchStoreRequest) (*BatchStoreResult, error) { + if batch == nil { + return &BatchStoreResult{}, nil + } + s.mu.Lock() + defer s.mu.Unlock() + success := 0 + for _, item := range batch.Contexts { + if item == nil || item.Context == nil { + continue + } + s.contexts[item.Context.UCXLAddress.String()] = item.Context + success++ + } + return &BatchStoreResult{SuccessCount: success}, nil +} + +func (s *inMemoryContextStore) BatchRetrieve(ctx context.Context, batch *BatchRetrieveRequest) (*BatchRetrieveResult, error) { + result := &BatchRetrieveResult{ + Contexts: make(map[string]*slurpContext.ContextNode), + Errors: make(map[string]error), + ProcessedAt: time.Now(), + ProcessingTime: 0, + } + if batch == nil { + return result, nil + } + s.mu.RLock() + defer s.mu.RUnlock() + for _, address := range batch.Addresses { + key := address.String() + if node, ok := s.contexts[key]; ok { + result.Contexts[key] = node + result.SuccessCount++ + } else { + result.Errors[key] = ErrNotFound + result.ErrorCount++ + } + } + return result, nil +} + +func (s *inMemoryContextStore) GetStorageStats(ctx context.Context) (*StorageStatistics, error) { + s.mu.RLock() + defer s.mu.RUnlock() + return &StorageStatistics{ + TotalContexts: int64(len(s.contexts)), + LocalContexts: int64(len(s.contexts)), + LastSyncTime: time.Now(), + }, nil +} + +func (s *inMemoryContextStore) Sync(ctx context.Context) error { + return nil +} + +func (s *inMemoryContextStore) Backup(ctx context.Context, destination string) error { + return nil +} + +func (s *inMemoryContextStore) Restore(ctx context.Context, source string) error { + return nil +}