//go:build slurp_full
// +build slurp_full

// Package distribution provides the DHT-based context distribution implementation.
package distribution

import (
    "context"
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "sync"
    "time"

    "chorus/pkg/config"
    "chorus/pkg/crypto"
    "chorus/pkg/dht"
    "chorus/pkg/election"
    slurpContext "chorus/pkg/slurp/context"
    "chorus/pkg/ucxl"
)

// DHTContextDistributor implements ContextDistributor using CHORUS DHT infrastructure.
type DHTContextDistributor struct {
    mu               sync.RWMutex
    dht              dht.DHT
    roleCrypto       *crypto.RoleCrypto
    election         election.Election
    config           *config.Config
    deploymentID     string
    stats            *DistributionStatistics
    replicationMgr   ReplicationManager
    conflictResolver ConflictResolver
    gossipProtocol   GossipProtocol
    networkMgr       NetworkManager
    keyGenerator     KeyGenerator
    vectorClockMgr   VectorClockManager
}

// NewDHTContextDistributor creates a new DHT-based context distributor.
func NewDHTContextDistributor(
    dht dht.DHT,
    roleCrypto *crypto.RoleCrypto,
    election election.Election,
    config *config.Config,
) (*DHTContextDistributor, error) {
    if dht == nil {
        return nil, fmt.Errorf("DHT instance is required")
    }
    if roleCrypto == nil {
        return nil, fmt.Errorf("role crypto instance is required")
    }
    if config == nil {
        return nil, fmt.Errorf("config is required")
    }

    deploymentID := fmt.Sprintf("CHORUS-slurp-%s", config.Agent.ID)

    dist := &DHTContextDistributor{
        dht:          dht,
        roleCrypto:   roleCrypto,
        election:     election,
        config:       config,
        deploymentID: deploymentID,
        stats: &DistributionStatistics{
            LastResetTime: time.Now(),
            CollectedAt:   time.Now(),
        },
        keyGenerator: NewDHTKeyGenerator(deploymentID),
    }

    // Initialize components
    if err := dist.initializeComponents(); err != nil {
        return nil, fmt.Errorf("failed to initialize components: %w", err)
    }

    return dist, nil
}

// initializeComponents initializes all sub-components.
func (d *DHTContextDistributor) initializeComponents() error {
    // Initialize replication manager
    replicationMgr, err := NewReplicationManager(d.dht, d.config)
    if err != nil {
        return fmt.Errorf("failed to create replication manager: %w", err)
    }
    d.replicationMgr = replicationMgr

    // Initialize conflict resolver
    conflictResolver, err := NewConflictResolver(d.dht, d.config)
    if err != nil {
        return fmt.Errorf("failed to create conflict resolver: %w", err)
    }
    d.conflictResolver = conflictResolver

    // Initialize gossip protocol
    gossipProtocol, err := NewGossipProtocol(d.dht, d.config)
    if err != nil {
        return fmt.Errorf("failed to create gossip protocol: %w", err)
    }
    d.gossipProtocol = gossipProtocol

    // Initialize network manager
    networkMgr, err := NewNetworkManager(d.dht, d.config)
    if err != nil {
        return fmt.Errorf("failed to create network manager: %w", err)
    }
    d.networkMgr = networkMgr

    // Initialize vector clock manager
    vectorClockMgr, err := NewVectorClockManager(d.dht, d.config.Agent.ID)
    if err != nil {
        return fmt.Errorf("failed to create vector clock manager: %w", err)
    }
    d.vectorClockMgr = vectorClockMgr

    return nil
}

// DistributeContext encrypts and stores context in the DHT for role-based access.
func (d *DHTContextDistributor) DistributeContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) error {
    start := time.Now()
    d.mu.Lock()
    d.stats.TotalDistributions++
    d.mu.Unlock()
    defer func() {
        // Cheap running estimate that weights the latest sample at 50%;
        // not a true mean, but adequate for coarse health reporting.
        duration := time.Since(start)
        d.mu.Lock()
        d.stats.AverageDistributionTime = (d.stats.AverageDistributionTime + duration) / 2
        d.mu.Unlock()
    }()

    if node == nil {
        return d.recordError("node cannot be nil")
    }
    if len(roles) == 0 {
        return d.recordError("roles cannot be empty")
    }

    // Validate context node
    if err := node.Validate(); err != nil {
        return d.recordError(fmt.Sprintf("context validation failed: %v", err))
    }

    // Get current vector clock
    clock, err := d.vectorClockMgr.GetClock(d.config.Agent.ID)
    if err != nil {
        return d.recordError(fmt.Sprintf("failed to get vector clock: %v", err))
    }

    // Prepare context payload for role encryption
    rawContext, err := json.Marshal(node)
    if err != nil {
        return d.recordError(fmt.Sprintf("failed to marshal context: %v", err))
    }

    // Create distribution metadata. The struct is shared across roles below;
    // the Checksum field is overwritten per role before each serialization,
    // so every stored package carries the checksum of its own ciphertext.
    metadata := &DistributionMetadata{
        Address:           node.UCXLAddress,
        Roles:             roles,
        Version:           1,
        VectorClock:       clock,
        DistributedBy:     d.config.Agent.ID,
        DistributedAt:     time.Now(),
        ReplicationFactor: d.getReplicationFactor(),
    }

    // Store encrypted data in the DHT for each role.
    for _, role := range roles {
        key := d.keyGenerator.GenerateContextKey(node.UCXLAddress.String(), role)

        cipher, fingerprint, err := d.roleCrypto.EncryptForRole(rawContext, role)
        if err != nil {
            return d.recordError(fmt.Sprintf("failed to encrypt context for role %s: %v", role, err))
        }

        metadata.Checksum = d.calculateChecksum(cipher)

        // Create role-specific storage package
        storagePackage := &ContextStoragePackage{
            EncryptedData:  cipher,
            KeyFingerprint: fingerprint,
            Metadata:       metadata,
            Role:           role,
            StoredAt:       time.Now(),
        }

        // Serialize for storage
        storageBytes, err := json.Marshal(storagePackage)
        if err != nil {
            return d.recordError(fmt.Sprintf("failed to serialize storage package: %v", err))
        }

        // Store in DHT with replication
        if err := d.dht.PutValue(ctx, key, storageBytes); err != nil {
            return d.recordError(fmt.Sprintf("failed to store in DHT for role %s: %v", role, err))
        }

        // Announce that we provide this context. A failure here only hurts
        // discovery optimization, so we move on rather than failing the write.
        if err := d.dht.Provide(ctx, key); err != nil {
            continue
        }
    }

    // Ensure replication. Failures are tolerated because replication is
    // allowed to converge eventually in the background.
    if err := d.replicationMgr.EnsureReplication(ctx, node.UCXLAddress, d.getReplicationFactor()); err != nil {
        // Intentionally not fatal; see comment above.
    }

    // Update statistics
    d.mu.Lock()
    d.stats.SuccessfulDistributions++
    d.stats.TotalContextsStored++
    d.stats.LastSyncTime = time.Now()
    d.mu.Unlock()

    return nil
}
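
// exampleRoundTrip is a hedged sketch of the distribute/retrieve pairing:
// a context encrypted for a role can only be read back by that same role,
// because both sides derive the same role-specific DHT key. The role name
// below is illustrative, not a canonical role list.
func exampleRoundTrip(ctx context.Context, dist *DHTContextDistributor, node *slurpContext.ContextNode) (*slurpContext.ResolvedContext, error) {
    const role = "backend_developer" // illustrative role
    if err := dist.DistributeContext(ctx, node, []string{role}); err != nil {
        return nil, err
    }
    // Retrieval decrypts the role-specific copy stored under the same key.
    return dist.RetrieveContext(ctx, node.UCXLAddress, role)
}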

// RetrieveContext gets context from the DHT and decrypts it for the requesting role.
func (d *DHTContextDistributor) RetrieveContext(ctx context.Context, address ucxl.Address, role string) (*slurpContext.ResolvedContext, error) {
    start := time.Now()
    d.mu.Lock()
    d.stats.TotalRetrievals++
    d.mu.Unlock()
    defer func() {
        duration := time.Since(start)
        d.mu.Lock()
        d.stats.AverageRetrievalTime = (d.stats.AverageRetrievalTime + duration) / 2
        d.mu.Unlock()
    }()

    // Generate key for the role
    key := d.keyGenerator.GenerateContextKey(address.String(), role)

    // Retrieve from DHT
    storageBytes, err := d.dht.GetValue(ctx, key)
    if err != nil {
        // Try to find providers if direct lookup fails
        providers, findErr := d.dht.FindProviders(ctx, key, 5)
        if findErr != nil || len(providers) == 0 {
            return nil, d.recordRetrievalError(fmt.Sprintf("context not found for role %s: %v", role, err))
        }
        // Try retrieving from providers. In a real implementation we would
        // connect to each provider; for now we return the original error.
        for _, provider := range providers {
            _ = provider
        }
        return nil, d.recordRetrievalError(fmt.Sprintf("context not found for role %s: %v", role, err))
    }

    // Deserialize storage package
    var storagePackage ContextStoragePackage
    if err := json.Unmarshal(storageBytes, &storagePackage); err != nil {
        return nil, d.recordRetrievalError(fmt.Sprintf("failed to deserialize storage package: %v", err))
    }

    // Decrypt context for role
    plain, err := d.roleCrypto.DecryptForRole(storagePackage.EncryptedData, role, storagePackage.KeyFingerprint)
    if err != nil {
        return nil, d.recordRetrievalError(fmt.Sprintf("failed to decrypt context: %v", err))
    }

    var contextNode slurpContext.ContextNode
    if err := json.Unmarshal(plain, &contextNode); err != nil {
        return nil, d.recordRetrievalError(fmt.Sprintf("failed to decode context: %v", err))
    }

    // Convert to resolved context
    resolvedContext := &slurpContext.ResolvedContext{
        UCXLAddress:           contextNode.UCXLAddress,
        Summary:               contextNode.Summary,
        Purpose:               contextNode.Purpose,
        Technologies:          contextNode.Technologies,
        Tags:                  contextNode.Tags,
        Insights:              contextNode.Insights,
        ContextSourcePath:     contextNode.Path,
        InheritanceChain:      []string{contextNode.Path},
        ResolutionConfidence:  contextNode.RAGConfidence,
        BoundedDepth:          1,
        GlobalContextsApplied: false,
        ResolvedAt:            time.Now(),
    }

    // Update statistics
    d.mu.Lock()
    d.stats.SuccessfulRetrievals++
    d.mu.Unlock()

    return resolvedContext, nil
}

// UpdateContext updates existing distributed context with conflict resolution.
func (d *DHTContextDistributor) UpdateContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) (*ConflictResolution, error) {
    start := time.Now()

    // Check whether the context already exists.
    existingContext, err := d.RetrieveContext(ctx, node.UCXLAddress, d.config.Agent.Role)
    if err != nil {
        // Context doesn't exist, so treat this as a new distribution.
        if err := d.DistributeContext(ctx, node, roles); err != nil {
            return nil, fmt.Errorf("failed to distribute new context: %w", err)
        }
        return &ConflictResolution{
            Address:        node.UCXLAddress,
            ResolutionType: ResolutionMerged,
            MergedContext:  node,
            ResolutionTime: time.Since(start),
            ResolvedAt:     time.Now(),
            Confidence:     1.0,
        }, nil
    }

    // Convert the existing resolved context back to a context node for comparison.
    existingNode := &slurpContext.ContextNode{
        Path:          existingContext.ContextSourcePath,
        UCXLAddress:   existingContext.UCXLAddress,
        Summary:       existingContext.Summary,
        Purpose:       existingContext.Purpose,
        Technologies:  existingContext.Technologies,
        Tags:          existingContext.Tags,
        Insights:      existingContext.Insights,
        RAGConfidence: existingContext.ResolutionConfidence,
        GeneratedAt:   existingContext.ResolvedAt,
    }

    // Use the conflict resolver to handle the update.
    resolution, err := d.conflictResolver.ResolveConflict(ctx, node, existingNode)
    if err != nil {
        return nil, fmt.Errorf("failed to resolve conflict: %w", err)
    }

    // Distribute the resolved context.
    if resolution.MergedContext != nil {
        if err := d.DistributeContext(ctx, resolution.MergedContext, roles); err != nil {
            return nil, fmt.Errorf("failed to distribute merged context: %w", err)
        }
    }

    return resolution, nil
}
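
// exampleUpdateFlow is a hedged sketch of UpdateContext semantics: when no
// prior copy exists the node is distributed fresh; otherwise the conflict
// resolver merges local and existing state and the merged context is
// re-distributed. The confidence threshold below is illustrative only.
func exampleUpdateFlow(ctx context.Context, dist *DHTContextDistributor, node *slurpContext.ContextNode, roles []string) error {
    resolution, err := dist.UpdateContext(ctx, node, roles)
    if err != nil {
        return err
    }
    if resolution.Confidence < 0.5 { // illustrative audit threshold
        return fmt.Errorf("low-confidence merge for %s", resolution.Address.String())
    }
    return nil
}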

// DeleteContext removes context from distributed storage.
func (d *DHTContextDistributor) DeleteContext(ctx context.Context, address ucxl.Address) error {
    // Get the list of roles that may have access to this context.
    // This is simplified - in production we'd maintain an index.
    allRoles := []string{"senior_architect", "project_manager", "devops_engineer", "backend_developer", "frontend_developer"}

    // The DHT exposes no delete primitive, so write an empty value as a
    // tombstone under each role-specific key.
    var errs []string
    for _, role := range allRoles {
        key := d.keyGenerator.GenerateContextKey(address.String(), role)
        if err := d.dht.PutValue(ctx, key, []byte{}); err != nil {
            errs = append(errs, fmt.Sprintf("failed to delete for role %s: %v", role, err))
        }
    }

    if len(errs) > 0 {
        return fmt.Errorf("deletion errors: %v", errs)
    }
    return nil
}

// ListDistributedContexts lists contexts available in the DHT for a role.
func (d *DHTContextDistributor) ListDistributedContexts(ctx context.Context, role string, criteria *DistributionCriteria) ([]*DistributedContextInfo, error) {
    // This is a simplified implementation. In production we'd maintain
    // proper indexes and filtering.
    results := []*DistributedContextInfo{}

    limit := 100
    if criteria != nil && criteria.Limit > 0 {
        limit = criteria.Limit
    }

    // For now, return an empty list - a proper implementation would require
    // maintaining an index of all contexts in the cluster.
    _ = limit
    return results, nil
}

// Sync synchronizes local state with the distributed DHT.
func (d *DHTContextDistributor) Sync(ctx context.Context) (*SyncResult, error) {
    start := time.Now()

    // Use the gossip protocol to sync metadata.
    if err := d.gossipProtocol.StartGossip(ctx); err != nil {
        return nil, fmt.Errorf("failed to start gossip sync: %w", err)
    }

    result := &SyncResult{
        SyncedContexts:    0, // Would be populated in a real implementation
        ConflictsResolved: 0,
        Errors:            []string{},
        SyncTime:          time.Since(start),
        PeersContacted:    len(d.dht.GetConnectedPeers()),
        DataTransferred:   0,
        SyncedAt:          time.Now(),
    }
    return result, nil
}

// Replicate ensures context has the desired replication factor.
func (d *DHTContextDistributor) Replicate(ctx context.Context, address ucxl.Address, replicationFactor int) error {
    return d.replicationMgr.EnsureReplication(ctx, address, replicationFactor)
}

// GetReplicaHealth returns health status of context replicas.
func (d *DHTContextDistributor) GetReplicaHealth(ctx context.Context, address ucxl.Address) (*ReplicaHealth, error) {
    return d.replicationMgr.GetReplicationStatus(ctx, address)
}

// GetDistributionStats returns distribution performance statistics.
func (d *DHTContextDistributor) GetDistributionStats() (*DistributionStatistics, error) {
    // Take the write lock: the fields below are mutated, so a read lock
    // would race with concurrent callers.
    d.mu.Lock()
    defer d.mu.Unlock()

    // Update collection timestamp and the current peer-health snapshot.
    d.stats.CollectedAt = time.Now()
    d.stats.HealthyNodes = len(d.dht.GetConnectedPeers())

    // Return a copy so callers can't race with later updates.
    statsCopy := *d.stats
    return &statsCopy, nil
}

// SetReplicationPolicy configures replication behavior.
func (d *DHTContextDistributor) SetReplicationPolicy(policy *ReplicationPolicy) error {
    return d.replicationMgr.SetReplicationFactor(policy.DefaultFactor)
}

// Helper methods

func (d *DHTContextDistributor) recordError(message string) error {
    d.mu.Lock()
    d.stats.FailedDistributions++
    d.mu.Unlock()
    // Use an explicit format verb so a message containing '%' can't be
    // misinterpreted as a format string.
    return fmt.Errorf("%s", message)
}

func (d *DHTContextDistributor) recordRetrievalError(message string) error {
    d.mu.Lock()
    d.stats.FailedRetrievals++
    d.mu.Unlock()
    return fmt.Errorf("%s", message)
}

func (d *DHTContextDistributor) getReplicationFactor() int {
    return 3 // Default replication factor
}

// calculateChecksum returns the hex-encoded SHA-256 of the JSON encoding of
// data, or an empty string if marshaling fails.
func (d *DHTContextDistributor) calculateChecksum(data interface{}) string {
    bytes, err := json.Marshal(data)
    if err != nil {
        return ""
    }
    hash := sha256.Sum256(bytes)
    return hex.EncodeToString(hash[:])
}
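
// exampleVerifyChecksum is a hedged sketch, not part of the current flow:
// retrieval-side code could recompute the checksum over the stored
// ciphertext and compare it with DistributionMetadata.Checksum to detect
// corruption. Nothing in this file performs this verification yet.
func (d *DHTContextDistributor) exampleVerifyChecksum(pkg *ContextStoragePackage) bool {
    if pkg == nil || pkg.Metadata == nil {
        return false
    }
    return d.calculateChecksum(pkg.EncryptedData) == pkg.Metadata.Checksum
}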

// Start starts the distribution service.
func (d *DHTContextDistributor) Start(ctx context.Context) error {
    if d.gossipProtocol != nil {
        if err := d.gossipProtocol.StartGossip(ctx); err != nil {
            return fmt.Errorf("failed to start gossip protocol: %w", err)
        }
    }
    return nil
}

// Stop stops the distribution service.
func (d *DHTContextDistributor) Stop(ctx context.Context) error {
    // A full implementation would stop all background processes here.
    return nil
}

// Supporting types and structures

// ContextStoragePackage represents a complete package for DHT storage.
type ContextStoragePackage struct {
    EncryptedData  []byte                `json:"encrypted_data"`
    KeyFingerprint string                `json:"key_fingerprint,omitempty"`
    Metadata       *DistributionMetadata `json:"metadata"`
    Role           string                `json:"role"`
    StoredAt       time.Time             `json:"stored_at"`
}

// DistributionMetadata contains metadata for distributed context.
type DistributionMetadata struct {
    Address           ucxl.Address `json:"address"`
    Roles             []string     `json:"roles"`
    Version           int64        `json:"version"`
    VectorClock       *VectorClock `json:"vector_clock"`
    DistributedBy     string       `json:"distributed_by"`
    DistributedAt     time.Time    `json:"distributed_at"`
    ReplicationFactor int          `json:"replication_factor"`
    Checksum          string       `json:"checksum"`
}

// DHTKeyGenerator implements the KeyGenerator interface.
type DHTKeyGenerator struct {
    deploymentID string
}

func NewDHTKeyGenerator(deploymentID string) *DHTKeyGenerator {
    return &DHTKeyGenerator{
        deploymentID: deploymentID,
    }
}

func (kg *DHTKeyGenerator) GenerateContextKey(address string, role string) string {
    return fmt.Sprintf("%s:context:%s:%s", kg.deploymentID, address, role)
}

func (kg *DHTKeyGenerator) GenerateMetadataKey(address string) string {
    return fmt.Sprintf("%s:metadata:%s", kg.deploymentID, address)
}

func (kg *DHTKeyGenerator) GenerateReplicationKey(address string) string {
    return fmt.Sprintf("%s:replication:%s", kg.deploymentID, address)
}
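
// exampleKeys is a hedged sketch of the key layout the generator produces;
// the deployment ID and UCXL address below are illustrative values.
func exampleKeys() []string {
    kg := NewDHTKeyGenerator("CHORUS-slurp-agent-1")
    addr := "ucxl://example/path" // illustrative address string
    return []string{
        kg.GenerateContextKey(addr, "backend_developer"), // CHORUS-slurp-agent-1:context:ucxl://example/path:backend_developer
        kg.GenerateMetadataKey(addr),                     // CHORUS-slurp-agent-1:metadata:ucxl://example/path
        kg.GenerateReplicationKey(addr),                  // CHORUS-slurp-agent-1:replication:ucxl://example/path
    }
}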

// Component constructors - these would be implemented in separate files.

// NewReplicationManager creates a new replication manager.
func NewReplicationManager(dht dht.DHT, config *config.Config) (ReplicationManager, error) {
    impl, err := NewReplicationManagerImpl(dht, config)
    if err != nil {
        return nil, err
    }
    return impl, nil
}

// NewConflictResolver creates a new conflict resolver.
func NewConflictResolver(dht dht.DHT, config *config.Config) (ConflictResolver, error) {
    // Placeholder implementation until the full resolver is wired.
    return &ConflictResolverImpl{}, nil
}

// NewGossipProtocol creates a new gossip protocol.
func NewGossipProtocol(dht dht.DHT, config *config.Config) (GossipProtocol, error) {
    impl, err := NewGossipProtocolImpl(dht, config)
    if err != nil {
        return nil, err
    }
    return impl, nil
}

// NewNetworkManager creates a new network manager.
func NewNetworkManager(dht dht.DHT, config *config.Config) (NetworkManager, error) {
    impl, err := NewNetworkManagerImpl(dht, config)
    if err != nil {
        return nil, err
    }
    return impl, nil
}

// NewVectorClockManager creates a new vector clock manager. The dht and
// nodeID parameters are currently unused by the default in-memory store.
func NewVectorClockManager(dht dht.DHT, nodeID string) (VectorClockManager, error) {
    return &defaultVectorClockManager{
        clocks: make(map[string]*VectorClock),
    }, nil
}

// ConflictResolverImpl is a temporary stub until the full resolver is
// implemented. It unconditionally prefers the local context and reports a
// fixed confidence value.
type ConflictResolverImpl struct{}

func (cr *ConflictResolverImpl) ResolveConflict(ctx context.Context, local, remote *slurpContext.ContextNode) (*ConflictResolution, error) {
    return &ConflictResolution{
        Address:        local.UCXLAddress,
        ResolutionType: ResolutionMerged,
        MergedContext:  local,
        ResolutionTime: time.Millisecond,
        ResolvedAt:     time.Now(),
        Confidence:     0.95,
    }, nil
}

// defaultVectorClockManager provides a minimal vector clock store for SEC-SLURP scaffolding.
type defaultVectorClockManager struct {
    mu     sync.Mutex
    clocks map[string]*VectorClock
}

func (vcm *defaultVectorClockManager) GetClock(nodeID string) (*VectorClock, error) {
    vcm.mu.Lock()
    defer vcm.mu.Unlock()
    if clock, ok := vcm.clocks[nodeID]; ok {
        return clock, nil
    }
    clock := &VectorClock{
        Clock:     map[string]int64{nodeID: time.Now().Unix()},
        UpdatedAt: time.Now(),
    }
    vcm.clocks[nodeID] = clock
    return clock, nil
}

func (vcm *defaultVectorClockManager) UpdateClock(nodeID string, clock *VectorClock) error {
    vcm.mu.Lock()
    defer vcm.mu.Unlock()
    vcm.clocks[nodeID] = clock
    return nil
}

// CompareClock approximates causal ordering using the UpdatedAt wall-clock
// timestamps rather than a true per-node vector comparison, so it is only
// as reliable as the nodes' clocks.
func (vcm *defaultVectorClockManager) CompareClock(clock1, clock2 *VectorClock) ClockRelation {
    if clock1 == nil || clock2 == nil {
        return ClockConcurrent
    }
    if clock1.UpdatedAt.Before(clock2.UpdatedAt) {
        return ClockBefore
    }
    if clock1.UpdatedAt.After(clock2.UpdatedAt) {
        return ClockAfter
    }
    return ClockEqual
}

func (vcm *defaultVectorClockManager) MergeClock(clocks []*VectorClock) *VectorClock {
    if len(clocks) == 0 {
        return &VectorClock{
            Clock:     map[string]int64{},
            UpdatedAt: time.Now(),
        }
    }
    // Start from the zero timestamp rather than clocks[0].UpdatedAt so a nil
    // first entry cannot cause a nil-pointer dereference; nil entries are
    // skipped in the loop below.
    merged := &VectorClock{
        Clock: make(map[string]int64),
    }
    for _, clock := range clocks {
        if clock == nil {
            continue
        }
        if clock.UpdatedAt.After(merged.UpdatedAt) {
            merged.UpdatedAt = clock.UpdatedAt
        }
        for node, value := range clock.Clock {
            if existing, ok := merged.Clock[node]; !ok || value > existing {
                merged.Clock[node] = value
            }
        }
    }
    return merged
}
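
// exampleMergeClocks is a hedged sketch of MergeClock semantics: for each
// node the merged clock keeps the maximum counter seen, and UpdatedAt is
// the latest timestamp across the inputs.
func exampleMergeClocks() *VectorClock {
    vcm := &defaultVectorClockManager{clocks: make(map[string]*VectorClock)}
    now := time.Now()
    a := &VectorClock{Clock: map[string]int64{"node-a": 3, "node-b": 1}, UpdatedAt: now}
    b := &VectorClock{Clock: map[string]int64{"node-b": 5}, UpdatedAt: now.Add(time.Second)}
    // Result: {"node-a": 3, "node-b": 5} with UpdatedAt = now + 1s.
    return vcm.MergeClock([]*VectorClock{a, b})
}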