//go:build slurp_full // +build slurp_full // Package distribution provides DHT-based context distribution implementation package distribution import ( "context" "crypto/sha256" "encoding/hex" "encoding/json" "fmt" "sync" "time" "chorus/pkg/config" "chorus/pkg/crypto" "chorus/pkg/dht" "chorus/pkg/election" slurpContext "chorus/pkg/slurp/context" "chorus/pkg/ucxl" ) // DHTContextDistributor implements ContextDistributor using CHORUS DHT infrastructure type DHTContextDistributor struct { mu sync.RWMutex dht dht.DHT roleCrypto *crypto.RoleCrypto election election.Election config *config.Config deploymentID string stats *DistributionStatistics replicationMgr ReplicationManager conflictResolver ConflictResolver gossipProtocol GossipProtocol networkMgr NetworkManager keyGenerator KeyGenerator vectorClockMgr VectorClockManager } // NewDHTContextDistributor creates a new DHT-based context distributor func NewDHTContextDistributor( dht dht.DHT, roleCrypto *crypto.RoleCrypto, election election.Election, config *config.Config, ) (*DHTContextDistributor, error) { if dht == nil { return nil, fmt.Errorf("DHT instance is required") } if roleCrypto == nil { return nil, fmt.Errorf("role crypto instance is required") } if config == nil { return nil, fmt.Errorf("config is required") } deploymentID := fmt.Sprintf("CHORUS-slurp-%s", config.Agent.ID) dist := &DHTContextDistributor{ dht: dht, roleCrypto: roleCrypto, election: election, config: config, deploymentID: deploymentID, stats: &DistributionStatistics{ LastResetTime: time.Now(), CollectedAt: time.Now(), }, keyGenerator: NewDHTKeyGenerator(deploymentID), } // Initialize components if err := dist.initializeComponents(); err != nil { return nil, fmt.Errorf("failed to initialize components: %w", err) } return dist, nil } // initializeComponents initializes all sub-components func (d *DHTContextDistributor) initializeComponents() error { // Initialize replication manager replicationMgr, err := NewReplicationManager(d.dht, d.config) if err != nil { return fmt.Errorf("failed to create replication manager: %w", err) } d.replicationMgr = replicationMgr // Initialize conflict resolver conflictResolver, err := NewConflictResolver(d.dht, d.config) if err != nil { return fmt.Errorf("failed to create conflict resolver: %w", err) } d.conflictResolver = conflictResolver // Initialize gossip protocol gossipProtocol, err := NewGossipProtocol(d.dht, d.config) if err != nil { return fmt.Errorf("failed to create gossip protocol: %w", err) } d.gossipProtocol = gossipProtocol // Initialize network manager networkMgr, err := NewNetworkManager(d.dht, d.config) if err != nil { return fmt.Errorf("failed to create network manager: %w", err) } d.networkMgr = networkMgr // Initialize vector clock manager vectorClockMgr, err := NewVectorClockManager(d.dht, d.config.Agent.ID) if err != nil { return fmt.Errorf("failed to create vector clock manager: %w", err) } d.vectorClockMgr = vectorClockMgr return nil } // DistributeContext encrypts and stores context in DHT for role-based access func (d *DHTContextDistributor) DistributeContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) error { start := time.Now() d.mu.Lock() d.stats.TotalDistributions++ d.mu.Unlock() defer func() { duration := time.Since(start) d.mu.Lock() d.stats.AverageDistributionTime = (d.stats.AverageDistributionTime + duration) / 2 d.mu.Unlock() }() if node == nil { return d.recordError("node cannot be nil") } if len(roles) == 0 { return d.recordError("roles cannot be empty") } // Validate context node if err := node.Validate(); err != nil { return d.recordError(fmt.Sprintf("context validation failed: %v", err)) } // Get current vector clock clock, err := d.vectorClockMgr.GetClock(d.config.Agent.ID) if err != nil { return d.recordError(fmt.Sprintf("failed to get vector clock: %v", err)) } // Prepare context payload for role encryption rawContext, err := json.Marshal(node) if err != nil { return d.recordError(fmt.Sprintf("failed to marshal context: %v", err)) } // Create distribution metadata (checksum calculated per-role below) metadata := &DistributionMetadata{ Address: node.UCXLAddress, Roles: roles, Version: 1, VectorClock: clock, DistributedBy: d.config.Agent.ID, DistributedAt: time.Now(), ReplicationFactor: d.getReplicationFactor(), } // Store encrypted data in DHT for each role for _, role := range roles { key := d.keyGenerator.GenerateContextKey(node.UCXLAddress.String(), role) cipher, fingerprint, err := d.roleCrypto.EncryptForRole(rawContext, role) if err != nil { return d.recordError(fmt.Sprintf("failed to encrypt context for role %s: %v", role, err)) } // Create role-specific storage package storagePackage := &ContextStoragePackage{ EncryptedData: cipher, KeyFingerprint: fingerprint, Metadata: metadata, Role: role, StoredAt: time.Now(), } metadata.Checksum = d.calculateChecksum(cipher) // Serialize for storage storageBytes, err := json.Marshal(storagePackage) if err != nil { return d.recordError(fmt.Sprintf("failed to serialize storage package: %v", err)) } // Store in DHT with replication if err := d.dht.PutValue(ctx, key, storageBytes); err != nil { return d.recordError(fmt.Sprintf("failed to store in DHT for role %s: %v", role, err)) } // Announce that we provide this context if err := d.dht.Provide(ctx, key); err != nil { // Log warning but don't fail - this is for discovery optimization continue } } // Ensure replication if err := d.replicationMgr.EnsureReplication(ctx, node.UCXLAddress, d.getReplicationFactor()); err != nil { // Log warning but don't fail - replication can be eventually consistent } // Update statistics d.mu.Lock() d.stats.SuccessfulDistributions++ d.stats.TotalContextsStored++ d.stats.LastSyncTime = time.Now() d.mu.Unlock() return nil } // RetrieveContext gets context from DHT and decrypts for the requesting role func (d *DHTContextDistributor) RetrieveContext(ctx context.Context, address ucxl.Address, role string) (*slurpContext.ResolvedContext, error) { start := time.Now() d.mu.Lock() d.stats.TotalRetrievals++ d.mu.Unlock() defer func() { duration := time.Since(start) d.mu.Lock() d.stats.AverageRetrievalTime = (d.stats.AverageRetrievalTime + duration) / 2 d.mu.Unlock() }() // Generate key for the role key := d.keyGenerator.GenerateContextKey(address.String(), role) // Retrieve from DHT storageBytes, err := d.dht.GetValue(ctx, key) if err != nil { // Try to find providers if direct lookup fails providers, findErr := d.dht.FindProviders(ctx, key, 5) if findErr != nil || len(providers) == 0 { return nil, d.recordRetrievalError(fmt.Sprintf("context not found for role %s: %v", role, err)) } // Try retrieving from providers for _, provider := range providers { // In a real implementation, we would connect to the provider // For now, we'll just return the original error _ = provider } return nil, d.recordRetrievalError(fmt.Sprintf("context not found for role %s: %v", role, err)) } // Deserialize storage package var storagePackage ContextStoragePackage if err := json.Unmarshal(storageBytes, &storagePackage); err != nil { return nil, d.recordRetrievalError(fmt.Sprintf("failed to deserialize storage package: %v", err)) } // Decrypt context for role plain, err := d.roleCrypto.DecryptForRole(storagePackage.EncryptedData, role, storagePackage.KeyFingerprint) if err != nil { return nil, d.recordRetrievalError(fmt.Sprintf("failed to decrypt context: %v", err)) } var contextNode slurpContext.ContextNode if err := json.Unmarshal(plain, &contextNode); err != nil { return nil, d.recordRetrievalError(fmt.Sprintf("failed to decode context: %v", err)) } // Convert to resolved context resolvedContext := &slurpContext.ResolvedContext{ UCXLAddress: contextNode.UCXLAddress, Summary: contextNode.Summary, Purpose: contextNode.Purpose, Technologies: contextNode.Technologies, Tags: contextNode.Tags, Insights: contextNode.Insights, ContextSourcePath: contextNode.Path, InheritanceChain: []string{contextNode.Path}, ResolutionConfidence: contextNode.RAGConfidence, BoundedDepth: 1, GlobalContextsApplied: false, ResolvedAt: time.Now(), } // Update statistics d.mu.Lock() d.stats.SuccessfulRetrievals++ d.mu.Unlock() return resolvedContext, nil } // UpdateContext updates existing distributed context with conflict resolution func (d *DHTContextDistributor) UpdateContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) (*ConflictResolution, error) { start := time.Now() // Check if context already exists existingContext, err := d.RetrieveContext(ctx, node.UCXLAddress, d.config.Agent.Role) if err != nil { // Context doesn't exist, treat as new distribution if err := d.DistributeContext(ctx, node, roles); err != nil { return nil, fmt.Errorf("failed to distribute new context: %w", err) } return &ConflictResolution{ Address: node.UCXLAddress, ResolutionType: ResolutionMerged, MergedContext: node, ResolutionTime: time.Since(start), ResolvedAt: time.Now(), Confidence: 1.0, }, nil } // Convert existing resolved context back to context node for comparison existingNode := &slurpContext.ContextNode{ Path: existingContext.ContextSourcePath, UCXLAddress: existingContext.UCXLAddress, Summary: existingContext.Summary, Purpose: existingContext.Purpose, Technologies: existingContext.Technologies, Tags: existingContext.Tags, Insights: existingContext.Insights, RAGConfidence: existingContext.ResolutionConfidence, GeneratedAt: existingContext.ResolvedAt, } // Use conflict resolver to handle the update resolution, err := d.conflictResolver.ResolveConflict(ctx, node, existingNode) if err != nil { return nil, fmt.Errorf("failed to resolve conflict: %w", err) } // Distribute the resolved context if resolution.MergedContext != nil { if err := d.DistributeContext(ctx, resolution.MergedContext, roles); err != nil { return nil, fmt.Errorf("failed to distribute merged context: %w", err) } } return resolution, nil } // DeleteContext removes context from distributed storage func (d *DHTContextDistributor) DeleteContext(ctx context.Context, address ucxl.Address) error { // Get list of roles that have access to this context // This is simplified - in production, we'd maintain an index allRoles := []string{"senior_architect", "project_manager", "devops_engineer", "backend_developer", "frontend_developer"} // Delete from DHT for each role var errors []string for _, role := range allRoles { key := d.keyGenerator.GenerateContextKey(address.String(), role) if err := d.dht.PutValue(ctx, key, []byte{}); err != nil { errors = append(errors, fmt.Sprintf("failed to delete for role %s: %v", role, err)) } } if len(errors) > 0 { return fmt.Errorf("deletion errors: %v", errors) } return nil } // ListDistributedContexts lists contexts available in the DHT for a role func (d *DHTContextDistributor) ListDistributedContexts(ctx context.Context, role string, criteria *DistributionCriteria) ([]*DistributedContextInfo, error) { // This is a simplified implementation // In production, we'd maintain proper indexes and filtering results := []*DistributedContextInfo{} limit := 100 if criteria != nil && criteria.Limit > 0 { limit = criteria.Limit } // For now, return empty list - proper implementation would require // maintaining an index of all contexts in the cluster _ = limit return results, nil } // Sync synchronizes local state with distributed DHT func (d *DHTContextDistributor) Sync(ctx context.Context) (*SyncResult, error) { start := time.Now() // Use gossip protocol to sync metadata if err := d.gossipProtocol.StartGossip(ctx); err != nil { return nil, fmt.Errorf("failed to start gossip sync: %w", err) } result := &SyncResult{ SyncedContexts: 0, // Would be populated in real implementation ConflictsResolved: 0, Errors: []string{}, SyncTime: time.Since(start), PeersContacted: len(d.dht.GetConnectedPeers()), DataTransferred: 0, SyncedAt: time.Now(), } return result, nil } // Replicate ensures context has the desired replication factor func (d *DHTContextDistributor) Replicate(ctx context.Context, address ucxl.Address, replicationFactor int) error { return d.replicationMgr.EnsureReplication(ctx, address, replicationFactor) } // GetReplicaHealth returns health status of context replicas func (d *DHTContextDistributor) GetReplicaHealth(ctx context.Context, address ucxl.Address) (*ReplicaHealth, error) { return d.replicationMgr.GetReplicationStatus(ctx, address) } // GetDistributionStats returns distribution performance statistics func (d *DHTContextDistributor) GetDistributionStats() (*DistributionStatistics, error) { d.mu.RLock() defer d.mu.RUnlock() // Update collection timestamp d.stats.CollectedAt = time.Now() // Calculate derived metrics totalOps := d.stats.TotalDistributions + d.stats.TotalRetrievals if totalOps > 0 { d.stats.HealthyNodes = len(d.dht.GetConnectedPeers()) } return d.stats, nil } // SetReplicationPolicy configures replication behavior func (d *DHTContextDistributor) SetReplicationPolicy(policy *ReplicationPolicy) error { return d.replicationMgr.SetReplicationFactor(policy.DefaultFactor) } // Helper methods func (d *DHTContextDistributor) recordError(message string) error { d.mu.Lock() d.stats.FailedDistributions++ d.mu.Unlock() return fmt.Errorf(message) } func (d *DHTContextDistributor) recordRetrievalError(message string) error { d.mu.Lock() d.stats.FailedRetrievals++ d.mu.Unlock() return fmt.Errorf(message) } func (d *DHTContextDistributor) getReplicationFactor() int { return 3 // Default replication factor } func (d *DHTContextDistributor) calculateChecksum(data interface{}) string { bytes, err := json.Marshal(data) if err != nil { return "" } hash := sha256.Sum256(bytes) return hex.EncodeToString(hash[:]) } // Start starts the distribution service func (d *DHTContextDistributor) Start(ctx context.Context) error { if d.gossipProtocol != nil { if err := d.gossipProtocol.StartGossip(ctx); err != nil { return fmt.Errorf("failed to start gossip protocol: %w", err) } } return nil } // Stop stops the distribution service func (d *DHTContextDistributor) Stop(ctx context.Context) error { // Implementation would stop all background processes return nil } // Supporting types and structures // ContextStoragePackage represents a complete package for DHT storage type ContextStoragePackage struct { EncryptedData []byte `json:"encrypted_data"` KeyFingerprint string `json:"key_fingerprint,omitempty"` Metadata *DistributionMetadata `json:"metadata"` Role string `json:"role"` StoredAt time.Time `json:"stored_at"` } // DistributionMetadata contains metadata for distributed context type DistributionMetadata struct { Address ucxl.Address `json:"address"` Roles []string `json:"roles"` Version int64 `json:"version"` VectorClock *VectorClock `json:"vector_clock"` DistributedBy string `json:"distributed_by"` DistributedAt time.Time `json:"distributed_at"` ReplicationFactor int `json:"replication_factor"` Checksum string `json:"checksum"` } // DHTKeyGenerator implements KeyGenerator interface type DHTKeyGenerator struct { deploymentID string } func NewDHTKeyGenerator(deploymentID string) *DHTKeyGenerator { return &DHTKeyGenerator{ deploymentID: deploymentID, } } func (kg *DHTKeyGenerator) GenerateContextKey(address string, role string) string { return fmt.Sprintf("%s:context:%s:%s", kg.deploymentID, address, role) } func (kg *DHTKeyGenerator) GenerateMetadataKey(address string) string { return fmt.Sprintf("%s:metadata:%s", kg.deploymentID, address) } func (kg *DHTKeyGenerator) GenerateReplicationKey(address string) string { return fmt.Sprintf("%s:replication:%s", kg.deploymentID, address) } // Component constructors - these would be implemented in separate files // NewReplicationManager creates a new replication manager func NewReplicationManager(dht dht.DHT, config *config.Config) (ReplicationManager, error) { impl, err := NewReplicationManagerImpl(dht, config) if err != nil { return nil, err } return impl, nil } // NewConflictResolver creates a new conflict resolver func NewConflictResolver(dht dht.DHT, config *config.Config) (ConflictResolver, error) { // Placeholder implementation until full resolver is wired return &ConflictResolverImpl{}, nil } // NewGossipProtocol creates a new gossip protocol func NewGossipProtocol(dht dht.DHT, config *config.Config) (GossipProtocol, error) { impl, err := NewGossipProtocolImpl(dht, config) if err != nil { return nil, err } return impl, nil } // NewNetworkManager creates a new network manager func NewNetworkManager(dht dht.DHT, config *config.Config) (NetworkManager, error) { impl, err := NewNetworkManagerImpl(dht, config) if err != nil { return nil, err } return impl, nil } // NewVectorClockManager creates a new vector clock manager func NewVectorClockManager(dht dht.DHT, nodeID string) (VectorClockManager, error) { return &defaultVectorClockManager{ clocks: make(map[string]*VectorClock), }, nil } // ConflictResolverImpl is a temporary stub until the full resolver is implemented type ConflictResolverImpl struct{} func (cr *ConflictResolverImpl) ResolveConflict(ctx context.Context, local, remote *slurpContext.ContextNode) (*ConflictResolution, error) { return &ConflictResolution{ Address: local.UCXLAddress, ResolutionType: ResolutionMerged, MergedContext: local, ResolutionTime: time.Millisecond, ResolvedAt: time.Now(), Confidence: 0.95, }, nil } // defaultVectorClockManager provides a minimal vector clock store for SEC-SLURP scaffolding. type defaultVectorClockManager struct { mu sync.Mutex clocks map[string]*VectorClock } func (vcm *defaultVectorClockManager) GetClock(nodeID string) (*VectorClock, error) { vcm.mu.Lock() defer vcm.mu.Unlock() if clock, ok := vcm.clocks[nodeID]; ok { return clock, nil } clock := &VectorClock{ Clock: map[string]int64{nodeID: time.Now().Unix()}, UpdatedAt: time.Now(), } vcm.clocks[nodeID] = clock return clock, nil } func (vcm *defaultVectorClockManager) UpdateClock(nodeID string, clock *VectorClock) error { vcm.mu.Lock() defer vcm.mu.Unlock() vcm.clocks[nodeID] = clock return nil } func (vcm *defaultVectorClockManager) CompareClock(clock1, clock2 *VectorClock) ClockRelation { if clock1 == nil || clock2 == nil { return ClockConcurrent } if clock1.UpdatedAt.Before(clock2.UpdatedAt) { return ClockBefore } if clock1.UpdatedAt.After(clock2.UpdatedAt) { return ClockAfter } return ClockEqual } func (vcm *defaultVectorClockManager) MergeClock(clocks []*VectorClock) *VectorClock { if len(clocks) == 0 { return &VectorClock{ Clock: map[string]int64{}, UpdatedAt: time.Now(), } } merged := &VectorClock{ Clock: make(map[string]int64), UpdatedAt: clocks[0].UpdatedAt, } for _, clock := range clocks { if clock == nil { continue } if clock.UpdatedAt.After(merged.UpdatedAt) { merged.UpdatedAt = clock.UpdatedAt } for node, value := range clock.Clock { if existing, ok := merged.Clock[node]; !ok || value > existing { merged.Clock[node] = value } } } return merged }