feat: bootstrap temporal graph via dht-backed init
This commit is contained in:
@@ -42,6 +42,7 @@ import (
|
|||||||
"chorus/pkg/election"
|
"chorus/pkg/election"
|
||||||
slurpContext "chorus/pkg/slurp/context"
|
slurpContext "chorus/pkg/slurp/context"
|
||||||
"chorus/pkg/slurp/storage"
|
"chorus/pkg/slurp/storage"
|
||||||
|
"chorus/pkg/slurp/temporal"
|
||||||
"chorus/pkg/ucxl"
|
"chorus/pkg/ucxl"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -71,6 +72,8 @@ type SLURP struct {
|
|||||||
// Core components
|
// Core components
|
||||||
contextResolver ContextResolver
|
contextResolver ContextResolver
|
||||||
temporalGraph TemporalGraph
|
temporalGraph TemporalGraph
|
||||||
|
temporalSystem *temporal.TemporalGraphSystem
|
||||||
|
temporalStore storage.ContextStore
|
||||||
storage DistributedStorage
|
storage DistributedStorage
|
||||||
intelligence ContextGenerator
|
intelligence ContextGenerator
|
||||||
retrieval QueryEngine
|
retrieval QueryEngine
|
||||||
@@ -456,6 +459,10 @@ func (s *SLURP) Initialize(ctx context.Context) error {
|
|||||||
return fmt.Errorf("failed to load persisted contexts: %w", err)
|
return fmt.Errorf("failed to load persisted contexts: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := s.initializeTemporalSystem(s.ctx); err != nil {
|
||||||
|
return fmt.Errorf("failed to initialize temporal system: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Initialize components in dependency order
|
// TODO: Initialize components in dependency order
|
||||||
// 1. Initialize storage layer first
|
// 1. Initialize storage layer first
|
||||||
// 2. Initialize context resolver with storage
|
// 2. Initialize context resolver with storage
|
||||||
@@ -1179,6 +1186,60 @@ func (s *SLURP) setupPersistentStorage() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// initializeTemporalSystem wires the temporal graph to the DHT-backed persistence layer.
|
||||||
|
func (s *SLURP) initializeTemporalSystem(ctx context.Context) error {
|
||||||
|
if s.temporalGraph != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.localStorage == nil {
|
||||||
|
return fmt.Errorf("temporal persistence requires local storage")
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.temporalStore == nil {
|
||||||
|
s.temporalStore = storage.NewInMemoryContextStore()
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := temporal.DefaultTemporalConfig()
|
||||||
|
if cfg.PersistenceConfig == nil {
|
||||||
|
cfg.PersistenceConfig = &temporal.PersistenceConfig{}
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.PersistenceConfig.EnableWriteBuffer = false
|
||||||
|
cfg.PersistenceConfig.EnableAutoSync = false
|
||||||
|
cfg.PersistenceConfig.EnableAutoBackup = false
|
||||||
|
cfg.PersistenceConfig.EnableLocalStorage = true
|
||||||
|
cfg.PersistenceConfig.EnableDistributedStorage = s.dht != nil
|
||||||
|
cfg.PersistenceConfig.EnableEncryption = false
|
||||||
|
cfg.PersistenceConfig.BatchSize = 1
|
||||||
|
cfg.PersistenceConfig.FlushInterval = 30 * time.Second
|
||||||
|
if len(cfg.PersistenceConfig.EncryptionRoles) == 0 {
|
||||||
|
cfg.PersistenceConfig.EncryptionRoles = []string{"default"}
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeID := s.config.Agent.ID
|
||||||
|
if nodeID == "" {
|
||||||
|
nodeID = fmt.Sprintf("slurp-node-%d", time.Now().UnixNano())
|
||||||
|
}
|
||||||
|
|
||||||
|
system, err := temporal.NewDHTBackedTemporalGraphSystem(
|
||||||
|
s.runtimeContext(ctx),
|
||||||
|
s.temporalStore,
|
||||||
|
s.localStorage,
|
||||||
|
s.dht,
|
||||||
|
nodeID,
|
||||||
|
cfg,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to build DHT temporal system: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.temporalSystem = system
|
||||||
|
s.temporalGraph = system.Graph
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// loadPersistedContexts warms caches from disk (Roadmap: SEC-SLURP 1.1).
|
// loadPersistedContexts warms caches from disk (Roadmap: SEC-SLURP 1.1).
|
||||||
func (s *SLURP) loadPersistedContexts(ctx context.Context) error {
|
func (s *SLURP) loadPersistedContexts(ctx context.Context) error {
|
||||||
if s.localStorage == nil {
|
if s.localStorage == nil {
|
||||||
|
|||||||
155
pkg/slurp/storage/context_store_inmemory.go
Normal file
155
pkg/slurp/storage/context_store_inmemory.go
Normal file
@@ -0,0 +1,155 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
slurpContext "chorus/pkg/slurp/context"
|
||||||
|
"chorus/pkg/ucxl"
|
||||||
|
)
|
||||||
|
|
||||||
|
// inMemoryContextStore offers a lightweight ContextStore implementation suitable for
|
||||||
|
// local development and SEC-SLURP bootstrap scenarios. It keeps all context nodes in
|
||||||
|
// process memory, providing the minimal surface required by the temporal subsystem until
|
||||||
|
// the production storage stack is wired in.
|
||||||
|
type inMemoryContextStore struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
contexts map[string]*slurpContext.ContextNode
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewInMemoryContextStore constructs an in-memory ContextStore.
|
||||||
|
func NewInMemoryContextStore() ContextStore {
|
||||||
|
return &inMemoryContextStore{
|
||||||
|
contexts: make(map[string]*slurpContext.ContextNode),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) StoreContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) error {
|
||||||
|
if node == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.contexts[node.UCXLAddress.String()] = node
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) RetrieveContext(ctx context.Context, address ucxl.Address, role string) (*slurpContext.ContextNode, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
node, ok := s.contexts[address.String()]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
return node, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) UpdateContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) error {
|
||||||
|
if node == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.contexts[node.UCXLAddress.String()] = node
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) DeleteContext(ctx context.Context, address ucxl.Address) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
delete(s.contexts, address.String())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) ExistsContext(ctx context.Context, address ucxl.Address) (bool, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
_, ok := s.contexts[address.String()]
|
||||||
|
return ok, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) ListContexts(ctx context.Context, criteria *ListCriteria) ([]*slurpContext.ContextNode, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
results := make([]*slurpContext.ContextNode, 0, len(s.contexts))
|
||||||
|
for _, node := range s.contexts {
|
||||||
|
results = append(results, node)
|
||||||
|
}
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) SearchContexts(ctx context.Context, query *SearchQuery) (*SearchResults, error) {
|
||||||
|
return &SearchResults{
|
||||||
|
Results: []*SearchResult{},
|
||||||
|
TotalResults: 0,
|
||||||
|
ProcessingTime: 0,
|
||||||
|
Facets: map[string]map[string]int{},
|
||||||
|
Suggestions: []string{},
|
||||||
|
ProcessedAt: time.Now(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) BatchStore(ctx context.Context, batch *BatchStoreRequest) (*BatchStoreResult, error) {
|
||||||
|
if batch == nil {
|
||||||
|
return &BatchStoreResult{}, nil
|
||||||
|
}
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
success := 0
|
||||||
|
for _, item := range batch.Contexts {
|
||||||
|
if item == nil || item.Context == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
s.contexts[item.Context.UCXLAddress.String()] = item.Context
|
||||||
|
success++
|
||||||
|
}
|
||||||
|
return &BatchStoreResult{SuccessCount: success}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) BatchRetrieve(ctx context.Context, batch *BatchRetrieveRequest) (*BatchRetrieveResult, error) {
|
||||||
|
result := &BatchRetrieveResult{
|
||||||
|
Contexts: make(map[string]*slurpContext.ContextNode),
|
||||||
|
Errors: make(map[string]error),
|
||||||
|
ProcessedAt: time.Now(),
|
||||||
|
ProcessingTime: 0,
|
||||||
|
}
|
||||||
|
if batch == nil {
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
for _, address := range batch.Addresses {
|
||||||
|
key := address.String()
|
||||||
|
if node, ok := s.contexts[key]; ok {
|
||||||
|
result.Contexts[key] = node
|
||||||
|
result.SuccessCount++
|
||||||
|
} else {
|
||||||
|
result.Errors[key] = ErrNotFound
|
||||||
|
result.ErrorCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) GetStorageStats(ctx context.Context) (*StorageStatistics, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return &StorageStatistics{
|
||||||
|
TotalContexts: int64(len(s.contexts)),
|
||||||
|
LocalContexts: int64(len(s.contexts)),
|
||||||
|
LastSyncTime: time.Now(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) Sync(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) Backup(ctx context.Context, destination string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryContextStore) Restore(ctx context.Context, source string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user