Fix temporal persistence wiring and restore slurp_full suite

This commit is contained in:
anthonyrawlins
2025-09-28 11:39:03 +10:00
parent 9c32755632
commit 2ff408729c
16 changed files with 1195 additions and 802 deletions

View File

@@ -19,7 +19,8 @@ type temporalGraphImpl struct {
mu sync.RWMutex
// Core storage
storage storage.ContextStore
storage storage.ContextStore
persistence nodePersister
// In-memory graph structures for fast access
nodes map[string]*TemporalNode // nodeID -> TemporalNode
@@ -42,6 +43,10 @@ type temporalGraphImpl struct {
stalenessWeight *StalenessWeights
}
type nodePersister interface {
PersistTemporalNode(ctx context.Context, node *TemporalNode) error
}
// NewTemporalGraph creates a new temporal graph implementation
func NewTemporalGraph(storage storage.ContextStore) TemporalGraph {
return &temporalGraphImpl{
@@ -177,16 +182,40 @@ func (tg *temporalGraphImpl) EvolveContext(ctx context.Context, address ucxl.Add
}
// Copy influence relationships from parent
if len(latestNode.Influences) > 0 {
temporalNode.Influences = append([]ucxl.Address(nil), latestNode.Influences...)
} else {
temporalNode.Influences = make([]ucxl.Address, 0)
}
if len(latestNode.InfluencedBy) > 0 {
temporalNode.InfluencedBy = append([]ucxl.Address(nil), latestNode.InfluencedBy...)
} else {
temporalNode.InfluencedBy = make([]ucxl.Address, 0)
}
if latestNodeInfluences, exists := tg.influences[latestNode.ID]; exists {
tg.influences[nodeID] = make([]string, len(latestNodeInfluences))
copy(tg.influences[nodeID], latestNodeInfluences)
cloned := append([]string(nil), latestNodeInfluences...)
tg.influences[nodeID] = cloned
for _, targetID := range cloned {
tg.influencedBy[targetID] = ensureString(tg.influencedBy[targetID], nodeID)
if targetNode, ok := tg.nodes[targetID]; ok {
targetNode.InfluencedBy = ensureAddress(targetNode.InfluencedBy, address)
}
}
} else {
tg.influences[nodeID] = make([]string, 0)
}
if latestNodeInfluencedBy, exists := tg.influencedBy[latestNode.ID]; exists {
tg.influencedBy[nodeID] = make([]string, len(latestNodeInfluencedBy))
copy(tg.influencedBy[nodeID], latestNodeInfluencedBy)
cloned := append([]string(nil), latestNodeInfluencedBy...)
tg.influencedBy[nodeID] = cloned
for _, sourceID := range cloned {
tg.influences[sourceID] = ensureString(tg.influences[sourceID], nodeID)
if sourceNode, ok := tg.nodes[sourceID]; ok {
sourceNode.Influences = ensureAddress(sourceNode.Influences, address)
}
}
} else {
tg.influencedBy[nodeID] = make([]string, 0)
}
@@ -534,8 +563,7 @@ func (tg *temporalGraphImpl) FindDecisionPath(ctx context.Context, from, to ucxl
return nil, fmt.Errorf("from node not found: %w", err)
}
_, err := tg.getLatestNodeUnsafe(to)
if err != nil {
if _, err := tg.getLatestNodeUnsafe(to); err != nil {
return nil, fmt.Errorf("to node not found: %w", err)
}
@@ -750,31 +778,73 @@ func (tg *temporalGraphImpl) CompactHistory(ctx context.Context, beforeTime time
compacted := 0
// For each address, keep only the latest version and major milestones before the cutoff
for address, nodes := range tg.addressToNodes {
toKeep := make([]*TemporalNode, 0)
if len(nodes) == 0 {
continue
}
latestNode := nodes[len(nodes)-1]
toKeep := make([]*TemporalNode, 0, len(nodes))
toRemove := make([]*TemporalNode, 0)
for _, node := range nodes {
// Always keep nodes after the cutoff time
if node.Timestamp.After(beforeTime) {
if node == latestNode {
toKeep = append(toKeep, node)
continue
}
// Keep major changes and influential decisions
if tg.isMajorChange(node) || tg.isInfluentialDecision(node) {
if node.Timestamp.After(beforeTime) || tg.isMajorChange(node) || tg.isInfluentialDecision(node) {
toKeep = append(toKeep, node)
} else {
toRemove = append(toRemove, node)
continue
}
toRemove = append(toRemove, node)
}
// Update the address mapping
if len(toKeep) == 0 {
toKeep = append(toKeep, latestNode)
}
sort.Slice(toKeep, func(i, j int) bool {
return toKeep[i].Version < toKeep[j].Version
})
tg.addressToNodes[address] = toKeep
// Remove old nodes from main maps
for _, node := range toRemove {
if outgoing, exists := tg.influences[node.ID]; exists {
for _, targetID := range outgoing {
tg.influencedBy[targetID] = tg.removeFromSlice(tg.influencedBy[targetID], node.ID)
if targetNode, ok := tg.nodes[targetID]; ok {
targetNode.InfluencedBy = tg.removeAddressFromSlice(targetNode.InfluencedBy, node.UCXLAddress)
}
}
}
if incoming, exists := tg.influencedBy[node.ID]; exists {
for _, sourceID := range incoming {
tg.influences[sourceID] = tg.removeFromSlice(tg.influences[sourceID], node.ID)
if sourceNode, ok := tg.nodes[sourceID]; ok {
sourceNode.Influences = tg.removeAddressFromSlice(sourceNode.Influences, node.UCXLAddress)
}
}
}
if decisionNodes, exists := tg.decisionToNodes[node.DecisionID]; exists {
filtered := make([]*TemporalNode, 0, len(decisionNodes))
for _, candidate := range decisionNodes {
if candidate.ID != node.ID {
filtered = append(filtered, candidate)
}
}
if len(filtered) == 0 {
delete(tg.decisionToNodes, node.DecisionID)
delete(tg.decisions, node.DecisionID)
} else {
tg.decisionToNodes[node.DecisionID] = filtered
}
}
delete(tg.nodes, node.ID)
delete(tg.influences, node.ID)
delete(tg.influencedBy, node.ID)
@@ -782,7 +852,6 @@ func (tg *temporalGraphImpl) CompactHistory(ctx context.Context, beforeTime time
}
}
// Clear caches after compaction
tg.pathCache = make(map[string][]*DecisionStep)
tg.metricsCache = make(map[string]interface{})
@@ -901,12 +970,62 @@ func (tg *temporalGraphImpl) isInfluentialDecision(node *TemporalNode) bool {
}
func (tg *temporalGraphImpl) persistTemporalNode(ctx context.Context, node *TemporalNode) error {
// Convert to storage format and persist
// This would integrate with the storage system
// For now, we'll assume persistence happens in memory
if node == nil {
return fmt.Errorf("temporal node cannot be nil")
}
if tg.persistence != nil {
if err := tg.persistence.PersistTemporalNode(ctx, node); err != nil {
return fmt.Errorf("failed to persist temporal node: %w", err)
}
}
if tg.storage == nil || node.Context == nil {
return nil
}
roles := node.Context.EncryptedFor
if len(roles) == 0 {
roles = []string{"default"}
}
exists, err := tg.storage.ExistsContext(ctx, node.Context.UCXLAddress)
if err != nil {
return fmt.Errorf("failed to check context existence: %w", err)
}
if exists {
if err := tg.storage.UpdateContext(ctx, node.Context, roles); err != nil {
return fmt.Errorf("failed to update context for %s: %w", node.Context.UCXLAddress.String(), err)
}
return nil
}
if err := tg.storage.StoreContext(ctx, node.Context, roles); err != nil {
return fmt.Errorf("failed to store context for %s: %w", node.Context.UCXLAddress.String(), err)
}
return nil
}
func ensureString(list []string, value string) []string {
for _, existing := range list {
if existing == value {
return list
}
}
return append(list, value)
}
func ensureAddress(list []ucxl.Address, value ucxl.Address) []ucxl.Address {
for _, existing := range list {
if existing.String() == value.String() {
return list
}
}
return append(list, value)
}
func contains(s, substr string) bool {
return len(s) >= len(substr) && (s == substr ||
(len(s) > len(substr) && (s[:len(substr)] == substr || s[len(s)-len(substr):] == substr)))