From 8f4c80f63d68417af80f0efd3d760d7644657dcd Mon Sep 17 00:00:00 2001 From: anthonyrawlins Date: Sun, 28 Sep 2025 11:59:52 +1000 Subject: [PATCH] Add helper for DHT-backed temporal persistence --- pkg/slurp/temporal/dht_builder.go | 67 +++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 pkg/slurp/temporal/dht_builder.go diff --git a/pkg/slurp/temporal/dht_builder.go b/pkg/slurp/temporal/dht_builder.go new file mode 100644 index 0000000..02e3b1d --- /dev/null +++ b/pkg/slurp/temporal/dht_builder.go @@ -0,0 +1,67 @@ +package temporal + +import ( + "context" + "fmt" + "time" + + "chorus/pkg/dht" + "chorus/pkg/slurp/storage" +) + +// NewDHTBackedTemporalGraphSystem constructs a temporal graph system whose persistence +// layer replicates snapshots through the provided libp2p DHT. When no DHT instance is +// supplied the function falls back to local-only persistence so callers can degrade +// gracefully during bring-up. +func NewDHTBackedTemporalGraphSystem( + ctx context.Context, + contextStore storage.ContextStore, + localStorage storage.LocalStorage, + dhtInstance dht.DHT, + nodeID string, + cfg *TemporalConfig, +) (*TemporalGraphSystem, error) { + if contextStore == nil { + return nil, fmt.Errorf("context store is required") + } + if localStorage == nil { + return nil, fmt.Errorf("local storage is required") + } + if cfg == nil { + cfg = DefaultTemporalConfig() + } + + // Ensure persistence is configured for distributed replication when a DHT is present. + if cfg.PersistenceConfig == nil { + cfg.PersistenceConfig = defaultPersistenceConfig() + } + cfg.PersistenceConfig.EnableLocalStorage = true + cfg.PersistenceConfig.EnableDistributedStorage = dhtInstance != nil + + // Disable write buffering by default so we do not depend on ContextStore batch APIs + // when callers only wire the DHT layer. + cfg.PersistenceConfig.EnableWriteBuffer = false + cfg.PersistenceConfig.BatchSize = 1 + + if nodeID == "" { + nodeID = fmt.Sprintf("slurp-node-%d", time.Now().UnixNano()) + } + + var distributed storage.DistributedStorage + if dhtInstance != nil { + distributed = storage.NewDistributedStorage(dhtInstance, nodeID, nil) + } + + factory := NewTemporalGraphFactory(contextStore, cfg) + + system, err := factory.CreateTemporalGraphSystem(localStorage, distributed, nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to create temporal graph system: %w", err) + } + + if err := system.PersistenceManager.LoadTemporalGraph(ctx); err != nil { + return nil, fmt.Errorf("failed to load temporal graph: %w", err) + } + + return system, nil +}