 9bdcbe0447
			
		
	
	9bdcbe0447
	
	
	
		
			
			Major integrations and fixes:
			- Added BACKBEAT SDK integration for P2P operation timing
			- Implemented beat-aware status tracking for distributed operations
			- Added Docker secrets support for secure license management
			- Resolved KACHING license validation via HTTPS/TLS
			- Updated docker-compose configuration for clean stack deployment
			- Disabled rollback policies to prevent deployment failures
			- Added license credential storage (CHORUS-DEV-MULTI-001)

			Technical improvements:
			- BACKBEAT P2P operation tracking with phase management
			- Enhanced configuration system with file-based secrets
			- Improved error handling for license validation
			- Clean separation of KACHING and CHORUS deployment stacks

			🤖 Generated with [Claude Code](https://claude.ai/code)

			Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			795 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			795 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package dht
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"crypto/sha256"
 | |
| 	"encoding/base64"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"log"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"chorus/pkg/config"
 | |
| 	"chorus/pkg/crypto"
 | |
| 	"chorus/pkg/storage"
 | |
| 	"chorus/pkg/ucxl"
 | |
| 	"github.com/libp2p/go-libp2p/core/host"
 | |
| 	"github.com/libp2p/go-libp2p/core/peer"
 | |
| )
 | |
| 
 | |
| // EncryptedDHTStorage handles encrypted UCXL content storage in DHT
 | |
| type EncryptedDHTStorage struct {
 | |
| 	ctx       context.Context
 | |
| 	host      host.Host
 | |
| 	dht       *LibP2PDHT
 | |
| 	crypto    *crypto.AgeCrypto
 | |
| 	config    *config.Config
 | |
| 	nodeID    string
 | |
| 	
 | |
| 	// Local cache for performance
 | |
| 	cache     map[string]*CachedEntry
 | |
| 	cacheMu   sync.RWMutex
 | |
| 	
 | |
| 	// Metrics
 | |
| 	metrics   *StorageMetrics
 | |
| }
 | |
| 
 | |
| // CachedEntry represents a cached DHT entry
 | |
| type CachedEntry struct {
 | |
| 	Content   []byte
 | |
| 	Metadata  *UCXLMetadata
 | |
| 	CachedAt  time.Time
 | |
| 	ExpiresAt time.Time
 | |
| }
 | |
| 
 | |
// UCXLMetadata describes one piece of UCXL content held in the DHT. It is
// serialized to JSON as part of each StorageEntry.
type UCXLMetadata struct {
	Address           string    `json:"address"`            // UCXL address of the content
	CreatorRole       string    `json:"creator_role"`       // role that created the content
	EncryptedFor      []string  `json:"encrypted_for"`      // roles able to decrypt it
	ContentType       string    `json:"content_type"`       // kind of content (decision, suggestion, etc)
	Timestamp         time.Time `json:"timestamp"`          // creation time
	Size              int       `json:"size"`               // content size in bytes
	Hash              string    `json:"hash"`               // SHA256 hash of the encrypted content
	DHTPeers          []string  `json:"dht_peers"`          // peers that hold this content
	ReplicationFactor int       `json:"replication_factor"` // number of peers storing this content
}
 | |
| 
 | |
// StorageMetrics collects counters and timings for DHT storage operations.
type StorageMetrics struct {
	StoredItems         int64         `json:"stored_items"`          // successful store operations
	RetrievedItems      int64         `json:"retrieved_items"`       // successful retrieve operations
	CacheHits           int64         `json:"cache_hits"`            // retrievals that found a cached entry
	CacheMisses         int64         `json:"cache_misses"`          // retrievals that went to the DHT
	EncryptionOps       int64         `json:"encryption_ops"`        // completed encryptions
	DecryptionOps       int64         `json:"decryption_ops"`        // completed decryptions
	AverageStoreTime    time.Duration `json:"average_store_time"`    // store timing
	AverageRetrieveTime time.Duration `json:"average_retrieve_time"` // retrieve timing
	LastUpdate          time.Time     `json:"last_update"`           // when the metrics were last touched
}
 | |
| 
 | |
| // NewEncryptedDHTStorage creates a new encrypted DHT storage instance
 | |
| func NewEncryptedDHTStorage(
 | |
| 	ctx context.Context,
 | |
| 	host host.Host,
 | |
| 	libp2pDHT *LibP2PDHT,
 | |
| 	config *config.Config,
 | |
| 	nodeID string,
 | |
| ) *EncryptedDHTStorage {
 | |
| 	ageCrypto := crypto.NewAgeCrypto(config)
 | |
| 	
 | |
| 	return &EncryptedDHTStorage{
 | |
| 		ctx:     ctx,
 | |
| 		host:    host,
 | |
| 		dht:     libp2pDHT,
 | |
| 		crypto:  ageCrypto,
 | |
| 		config:  config,
 | |
| 		nodeID:  nodeID,
 | |
| 		cache:   make(map[string]*CachedEntry),
 | |
| 		metrics: &StorageMetrics{
 | |
| 			LastUpdate: time.Now(),
 | |
| 		},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // StoreUCXLContent stores encrypted UCXL content in the DHT
 | |
| func (eds *EncryptedDHTStorage) StoreUCXLContent(
 | |
| 	ucxlAddress string,
 | |
| 	content []byte,
 | |
| 	creatorRole string,
 | |
| 	contentType string,
 | |
| ) error {
 | |
| 	startTime := time.Now()
 | |
| 	defer func() {
 | |
| 		eds.metrics.AverageStoreTime = time.Since(startTime)
 | |
| 		eds.metrics.LastUpdate = time.Now()
 | |
| 	}()
 | |
| 	
 | |
| 	// Validate UCXL address format
 | |
| 	parsedAddr, err := ucxl.Parse(ucxlAddress)
 | |
| 	if err != nil {
 | |
| 		if validationErr, ok := err.(*ucxl.ValidationError); ok {
 | |
| 			return fmt.Errorf("UCXL-400-INVALID_ADDRESS in %s: %s (address: %s)", 
 | |
| 				validationErr.Field, validationErr.Message, validationErr.Raw)
 | |
| 		}
 | |
| 		return fmt.Errorf("invalid UCXL address: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("✅ UCXL address validated: %s", parsedAddr.String())
 | |
| 	
 | |
| 	log.Printf("📦 Storing UCXL content: %s (creator: %s)", ucxlAddress, creatorRole)
 | |
| 	
 | |
| 	// Audit logging for Store operation
 | |
| 	if eds.config.Security.AuditLogging {
 | |
| 		eds.auditStoreOperation(ucxlAddress, creatorRole, contentType, len(content), true, "")
 | |
| 	}
 | |
| 	
 | |
| 	// Role-based access policy check
 | |
| 	if err := eds.checkStoreAccessPolicy(creatorRole, ucxlAddress, contentType); err != nil {
 | |
| 		// Audit failed access attempt
 | |
| 		if eds.config.Security.AuditLogging {
 | |
| 			eds.auditStoreOperation(ucxlAddress, creatorRole, contentType, len(content), false, err.Error())
 | |
| 		}
 | |
| 		return fmt.Errorf("store access denied: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Encrypt content for the creator role
 | |
| 	encryptedContent, err := eds.crypto.EncryptUCXLContent(content, creatorRole)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to encrypt content: %w", err)
 | |
| 	}
 | |
| 	eds.metrics.EncryptionOps++
 | |
| 	
 | |
| 	// Get roles that can decrypt this content
 | |
| 	decryptableRoles, err := eds.getDecryptableRoles(creatorRole)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to determine decryptable roles: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Create metadata
 | |
| 	metadata := &UCXLMetadata{
 | |
| 		Address:           ucxlAddress,
 | |
| 		CreatorRole:       creatorRole,
 | |
| 		EncryptedFor:      decryptableRoles,
 | |
| 		ContentType:       contentType,
 | |
| 		Timestamp:         time.Now(),
 | |
| 		Size:              len(encryptedContent),
 | |
| 		Hash:              fmt.Sprintf("%x", sha256.Sum256(encryptedContent)),
 | |
| 		ReplicationFactor: 3, // Default replication
 | |
| 	}
 | |
| 	
 | |
| 	// Create storage entry
 | |
| 	entry := &StorageEntry{
 | |
| 		Metadata:         metadata,
 | |
| 		EncryptedContent: encryptedContent,
 | |
| 		StoredBy:         eds.nodeID,
 | |
| 		StoredAt:         time.Now(),
 | |
| 	}
 | |
| 	
 | |
| 	// Serialize entry
 | |
| 	entryData, err := json.Marshal(entry)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to serialize storage entry: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Generate DHT key from UCXL address
 | |
| 	dhtKey := eds.generateDHTKey(ucxlAddress)
 | |
| 	
 | |
| 	// Store in DHT
 | |
| 	if err := eds.dht.PutValue(eds.ctx, dhtKey, entryData); err != nil {
 | |
| 		return fmt.Errorf("failed to store in DHT: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Cache locally for performance
 | |
| 	eds.cacheEntry(ucxlAddress, &CachedEntry{
 | |
| 		Content:   encryptedContent,
 | |
| 		Metadata:  metadata,
 | |
| 		CachedAt:  time.Now(),
 | |
| 		ExpiresAt: time.Now().Add(10 * time.Minute), // Cache for 10 minutes
 | |
| 	})
 | |
| 	
 | |
| 	log.Printf("✅ Stored UCXL content in DHT: %s (size: %d bytes)", ucxlAddress, len(encryptedContent))
 | |
| 	eds.metrics.StoredItems++
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // RetrieveUCXLContent retrieves and decrypts UCXL content from DHT
 | |
| func (eds *EncryptedDHTStorage) RetrieveUCXLContent(ucxlAddress string) ([]byte, *storage.UCXLMetadata, error) {
 | |
| 	startTime := time.Now()
 | |
| 	defer func() {
 | |
| 		eds.metrics.AverageRetrieveTime = time.Since(startTime)
 | |
| 		eds.metrics.LastUpdate = time.Now()
 | |
| 	}()
 | |
| 	
 | |
| 	// Validate UCXL address format
 | |
| 	parsedAddr, err := ucxl.Parse(ucxlAddress)
 | |
| 	if err != nil {
 | |
| 		if validationErr, ok := err.(*ucxl.ValidationError); ok {
 | |
| 			return nil, nil, fmt.Errorf("UCXL-400-INVALID_ADDRESS in %s: %s (address: %s)", 
 | |
| 				validationErr.Field, validationErr.Message, validationErr.Raw)
 | |
| 		}
 | |
| 		return nil, nil, fmt.Errorf("invalid UCXL address: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("📥 Retrieving UCXL content: %s", parsedAddr.String())
 | |
| 	
 | |
| 	// Get current role for audit logging
 | |
| 	currentRole := eds.getCurrentRole()
 | |
| 	
 | |
| 	// Role-based access policy check for retrieval
 | |
| 	if err := eds.checkRetrieveAccessPolicy(currentRole, ucxlAddress); err != nil {
 | |
| 		// Audit failed access attempt
 | |
| 		if eds.config.Security.AuditLogging {
 | |
| 			eds.auditRetrieveOperation(ucxlAddress, currentRole, false, err.Error())
 | |
| 		}
 | |
| 		return nil, nil, fmt.Errorf("retrieve access denied: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Check cache first
 | |
| 	if cachedEntry := eds.getCachedEntry(ucxlAddress); cachedEntry != nil {
 | |
| 		log.Printf("💾 Cache hit for %s", ucxlAddress)
 | |
| 		eds.metrics.CacheHits++
 | |
| 		
 | |
| 		// Decrypt content
 | |
| 		decryptedContent, err := eds.crypto.DecryptWithRole(cachedEntry.Content)
 | |
| 		if err != nil {
 | |
| 			// If decryption fails, remove from cache and fall through to DHT
 | |
| 			log.Printf("⚠️ Failed to decrypt cached content: %v", err)
 | |
| 			eds.invalidateCacheEntry(ucxlAddress)
 | |
| 		} else {
 | |
| 			eds.metrics.DecryptionOps++
 | |
| 			eds.metrics.RetrievedItems++
 | |
| 			// Convert to storage.UCXLMetadata
 | |
| 			storageMetadata := &storage.UCXLMetadata{
 | |
| 				Address:     cachedEntry.Metadata.Address,
 | |
| 				CreatorRole: cachedEntry.Metadata.CreatorRole,
 | |
| 				ContentType: cachedEntry.Metadata.ContentType,
 | |
| 				CreatedAt:   cachedEntry.Metadata.Timestamp,
 | |
| 				Size:        int64(cachedEntry.Metadata.Size),
 | |
| 				Encrypted:   true,
 | |
| 			}
 | |
| 			return decryptedContent, storageMetadata, nil
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	eds.metrics.CacheMisses++
 | |
| 	
 | |
| 	// Generate DHT key
 | |
| 	dhtKey := eds.generateDHTKey(ucxlAddress)
 | |
| 	
 | |
| 	// Retrieve from DHT
 | |
| 	value, err := eds.dht.GetValue(eds.ctx, dhtKey)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, fmt.Errorf("failed to retrieve from DHT: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Deserialize entry
 | |
| 	var entry StorageEntry
 | |
| 	if err := json.Unmarshal(value, &entry); err != nil {
 | |
| 		return nil, nil, fmt.Errorf("failed to deserialize storage entry: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Check if current role can decrypt this content
 | |
| 	canDecrypt, err := eds.crypto.CanDecryptContent(entry.Metadata.CreatorRole)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, fmt.Errorf("failed to check decryption permission: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	if !canDecrypt {
 | |
| 		return nil, nil, fmt.Errorf("current role cannot decrypt content from role: %s", entry.Metadata.CreatorRole)
 | |
| 	}
 | |
| 	
 | |
| 	// Decrypt content
 | |
| 	decryptedContent, err := eds.crypto.DecryptWithRole(entry.EncryptedContent)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, fmt.Errorf("failed to decrypt content: %w", err)
 | |
| 	}
 | |
| 	eds.metrics.DecryptionOps++
 | |
| 	
 | |
| 	// Cache the entry
 | |
| 	eds.cacheEntry(ucxlAddress, &CachedEntry{
 | |
| 		Content:   entry.EncryptedContent,
 | |
| 		Metadata:  entry.Metadata,
 | |
| 		CachedAt:  time.Now(),
 | |
| 		ExpiresAt: time.Now().Add(10 * time.Minute),
 | |
| 	})
 | |
| 	
 | |
| 	log.Printf("✅ Retrieved and decrypted UCXL content: %s (size: %d bytes)", ucxlAddress, len(decryptedContent))
 | |
| 	eds.metrics.RetrievedItems++
 | |
| 	
 | |
| 	// Audit successful retrieval
 | |
| 	if eds.config.Security.AuditLogging {
 | |
| 		eds.auditRetrieveOperation(ucxlAddress, currentRole, true, "")
 | |
| 	}
 | |
| 	
 | |
| 	// Convert to storage.UCXLMetadata interface
 | |
| 	storageMetadata := &storage.UCXLMetadata{
 | |
| 		Address:     entry.Metadata.Address,
 | |
| 		CreatorRole: entry.Metadata.CreatorRole,
 | |
| 		ContentType: entry.Metadata.ContentType,
 | |
| 		CreatedAt:   entry.Metadata.Timestamp,
 | |
| 		Size:        int64(entry.Metadata.Size),
 | |
| 		Encrypted:   true, // Always encrypted in DHT storage
 | |
| 	}
 | |
| 	
 | |
| 	return decryptedContent, storageMetadata, nil
 | |
| }
 | |
| 
 | |
| // ListContentByRole lists all content accessible by the current role
 | |
| func (eds *EncryptedDHTStorage) ListContentByRole(roleFilter string, limit int) ([]*UCXLMetadata, error) {
 | |
| 	// This is a simplified implementation
 | |
| 	// In a real system, you'd maintain an index or use DHT range queries
 | |
| 	
 | |
| 	log.Printf("📋 Listing content for role: %s (limit: %d)", roleFilter, limit)
 | |
| 	
 | |
| 	var results []*UCXLMetadata
 | |
| 	count := 0
 | |
| 	
 | |
| 	// For now, return cached entries that match the role filter
 | |
| 	eds.cacheMu.RLock()
 | |
| 	for _, entry := range eds.cache {
 | |
| 		if count >= limit {
 | |
| 			break
 | |
| 		}
 | |
| 		
 | |
| 		// Check if the role can access this content
 | |
| 		for _, role := range entry.Metadata.EncryptedFor {
 | |
| 			if role == roleFilter || role == "*" {
 | |
| 				results = append(results, entry.Metadata)
 | |
| 				count++
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	eds.cacheMu.RUnlock()
 | |
| 	
 | |
| 	log.Printf("📋 Found %d content items for role %s", len(results), roleFilter)
 | |
| 	return results, nil
 | |
| }
 | |
| 
 | |
| // SearchContent searches for UCXL content by various criteria  
 | |
| func (eds *EncryptedDHTStorage) SearchContent(query *storage.SearchQuery) ([]*storage.UCXLMetadata, error) {
 | |
| 	log.Printf("🔍 Searching content: %+v", query)
 | |
| 	
 | |
| 	var results []*storage.UCXLMetadata
 | |
| 	
 | |
| 	eds.cacheMu.RLock()
 | |
| 	defer eds.cacheMu.RUnlock()
 | |
| 	
 | |
| 	for _, entry := range eds.cache {
 | |
| 		if eds.matchesQuery(entry.Metadata, query) {
 | |
| 			// Convert to storage.UCXLMetadata
 | |
| 			storageMetadata := &storage.UCXLMetadata{
 | |
| 				Address:     entry.Metadata.Address,
 | |
| 				CreatorRole: entry.Metadata.CreatorRole,
 | |
| 				ContentType: entry.Metadata.ContentType,
 | |
| 				CreatedAt:   entry.Metadata.Timestamp,
 | |
| 				Size:        int64(entry.Metadata.Size),
 | |
| 				Encrypted:   true,
 | |
| 			}
 | |
| 			results = append(results, storageMetadata)
 | |
| 			if len(results) >= query.Limit {
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("🔍 Search found %d results", len(results))
 | |
| 	return results, nil
 | |
| }
 | |
| 
 | |
// SearchQuery holds the criteria used to search for UCXL content. Empty
// string fields are left out of the JSON encoding.
//
// Note that the omitempty option has no effect on the two time.Time fields:
// encoding/json never treats a struct value as empty, so zero times are still
// written out.
type SearchQuery struct {
	Agent         string    `json:"agent,omitempty"`          // agent to match
	Role          string    `json:"role,omitempty"`           // role to match
	Project       string    `json:"project,omitempty"`        // project to match
	Task          string    `json:"task,omitempty"`           // task to match
	ContentType   string    `json:"content_type,omitempty"`   // content type to match
	CreatedAfter  time.Time `json:"created_after,omitempty"`  // lower bound on creation time
	CreatedBefore time.Time `json:"created_before,omitempty"` // upper bound on creation time
	Limit         int       `json:"limit"`                    // maximum number of results
}
 | |
| 
 | |
| // StorageEntry represents a complete DHT storage entry
 | |
| type StorageEntry struct {
 | |
| 	Metadata         *UCXLMetadata `json:"metadata"`
 | |
| 	EncryptedContent []byte        `json:"encrypted_content"`
 | |
| 	StoredBy         string        `json:"stored_by"`
 | |
| 	StoredAt         time.Time     `json:"stored_at"`
 | |
| }
 | |
| 
 | |
| // generateDHTKey generates a consistent DHT key for a UCXL address
 | |
| func (eds *EncryptedDHTStorage) generateDHTKey(ucxlAddress string) string {
 | |
| 	// Use SHA256 hash of the UCXL address as DHT key
 | |
| 	hash := sha256.Sum256([]byte(ucxlAddress))
 | |
| 	return "/CHORUS/ucxl/" + base64.URLEncoding.EncodeToString(hash[:])
 | |
| }
 | |
| 
 | |
| // getDecryptableRoles determines which roles can decrypt content from a creator
 | |
| func (eds *EncryptedDHTStorage) getDecryptableRoles(creatorRole string) ([]string, error) {
 | |
| 	roles := config.GetPredefinedRoles()
 | |
| 	_, exists := roles[creatorRole]
 | |
| 	if !exists {
 | |
| 		return nil, fmt.Errorf("creator role '%s' not found", creatorRole)
 | |
| 	}
 | |
| 	
 | |
| 	// Start with the creator role itself
 | |
| 	decryptableRoles := []string{creatorRole}
 | |
| 	
 | |
| 	// Add all roles that have authority to decrypt this creator's content
 | |
| 	for roleName, role := range roles {
 | |
| 		if roleName == creatorRole {
 | |
| 			continue
 | |
| 		}
 | |
| 		
 | |
| 		// Check if this role can decrypt the creator's content
 | |
| 		for _, decryptableRole := range role.CanDecrypt {
 | |
| 			if decryptableRole == creatorRole || decryptableRole == "*" {
 | |
| 				decryptableRoles = append(decryptableRoles, roleName)
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	return decryptableRoles, nil
 | |
| }
 | |
| 
 | |
| // cacheEntry adds an entry to the local cache
 | |
| func (eds *EncryptedDHTStorage) cacheEntry(ucxlAddress string, entry *CachedEntry) {
 | |
| 	eds.cacheMu.Lock()
 | |
| 	defer eds.cacheMu.Unlock()
 | |
| 	eds.cache[ucxlAddress] = entry
 | |
| }
 | |
| 
 | |
| // getCachedEntry retrieves an entry from the local cache
 | |
| func (eds *EncryptedDHTStorage) getCachedEntry(ucxlAddress string) *CachedEntry {
 | |
| 	eds.cacheMu.RLock()
 | |
| 	defer eds.cacheMu.RUnlock()
 | |
| 	
 | |
| 	entry, exists := eds.cache[ucxlAddress]
 | |
| 	if !exists {
 | |
| 		return nil
 | |
| 	}
 | |
| 	
 | |
| 	// Check if entry has expired
 | |
| 	if time.Now().After(entry.ExpiresAt) {
 | |
| 		// Remove expired entry asynchronously
 | |
| 		go eds.invalidateCacheEntry(ucxlAddress)
 | |
| 		return nil
 | |
| 	}
 | |
| 	
 | |
| 	return entry
 | |
| }
 | |
| 
 | |
| // invalidateCacheEntry removes an entry from the cache
 | |
| func (eds *EncryptedDHTStorage) invalidateCacheEntry(ucxlAddress string) {
 | |
| 	eds.cacheMu.Lock()
 | |
| 	defer eds.cacheMu.Unlock()
 | |
| 	delete(eds.cache, ucxlAddress)
 | |
| }
 | |
| 
 | |
| // matchesQuery checks if metadata matches a search query
 | |
| func (eds *EncryptedDHTStorage) matchesQuery(metadata *UCXLMetadata, query *storage.SearchQuery) bool {
 | |
| 	// Parse UCXL address properly
 | |
| 	parsedAddr, err := ucxl.Parse(metadata.Address)
 | |
| 	if err != nil {
 | |
| 		log.Printf("⚠️ Invalid UCXL address in search: %s", metadata.Address)
 | |
| 		return false // Skip invalid addresses
 | |
| 	}
 | |
| 	
 | |
| 	// Check agent filter
 | |
| 	if query.Agent != "" && parsedAddr.Agent != query.Agent {
 | |
| 		return false
 | |
| 	}
 | |
| 	
 | |
| 	// Check role filter
 | |
| 	if query.Role != "" && parsedAddr.Role != query.Role {
 | |
| 		return false
 | |
| 	}
 | |
| 	
 | |
| 	// Check project filter
 | |
| 	if query.Project != "" && parsedAddr.Project != query.Project {
 | |
| 		return false
 | |
| 	}
 | |
| 	
 | |
| 	// Check task filter
 | |
| 	if query.Task != "" && parsedAddr.Task != query.Task {
 | |
| 		return false
 | |
| 	}
 | |
| 	
 | |
| 	// Check content type filter
 | |
| 	if query.ContentType != "" && metadata.ContentType != query.ContentType {
 | |
| 		return false
 | |
| 	}
 | |
| 	
 | |
| 	// Check date filters
 | |
| 	if !query.CreatedAfter.IsZero() && metadata.Timestamp.Before(query.CreatedAfter) {
 | |
| 		return false
 | |
| 	}
 | |
| 	
 | |
| 	if !query.CreatedBefore.IsZero() && metadata.Timestamp.After(query.CreatedBefore) {
 | |
| 		return false
 | |
| 	}
 | |
| 	
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // GetMetrics returns current storage metrics
 | |
| func (eds *EncryptedDHTStorage) GetMetrics() map[string]interface{} {
 | |
| 	// Update cache statistics
 | |
| 	eds.cacheMu.RLock()
 | |
| 	cacheSize := len(eds.cache)
 | |
| 	eds.cacheMu.RUnlock()
 | |
| 	
 | |
| 	metrics := *eds.metrics // Copy metrics
 | |
| 	metrics.LastUpdate = time.Now()
 | |
| 	
 | |
| 	// Convert to map[string]interface{} for interface compatibility
 | |
| 	result := map[string]interface{}{
 | |
| 		"stored_items":     metrics.StoredItems,
 | |
| 		"retrieved_items":  metrics.RetrievedItems,
 | |
| 		"cache_hits":       metrics.CacheHits,
 | |
| 		"cache_misses":     metrics.CacheMisses,
 | |
| 		"encryption_ops":   metrics.EncryptionOps,
 | |
| 		"decryption_ops":   metrics.DecryptionOps,
 | |
| 		"cache_size":       cacheSize,
 | |
| 		"last_update":      metrics.LastUpdate,
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("📊 DHT Storage Metrics: stored=%d, retrieved=%d, cache_size=%d", 
 | |
| 		metrics.StoredItems, metrics.RetrievedItems, cacheSize)
 | |
| 	
 | |
| 	return result
 | |
| }
 | |
| 
 | |
| // CleanupCache removes expired entries from the cache
 | |
| func (eds *EncryptedDHTStorage) CleanupCache() {
 | |
| 	eds.cacheMu.Lock()
 | |
| 	defer eds.cacheMu.Unlock()
 | |
| 	
 | |
| 	now := time.Now()
 | |
| 	expired := 0
 | |
| 	
 | |
| 	for address, entry := range eds.cache {
 | |
| 		if now.After(entry.ExpiresAt) {
 | |
| 			delete(eds.cache, address)
 | |
| 			expired++
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	if expired > 0 {
 | |
| 		log.Printf("🧹 Cleaned up %d expired cache entries", expired)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // StartCacheCleanup starts a background goroutine to clean up expired cache entries
 | |
| func (eds *EncryptedDHTStorage) StartCacheCleanup(interval time.Duration) {
 | |
| 	ticker := time.NewTicker(interval)
 | |
| 	
 | |
| 	go func() {
 | |
| 		defer ticker.Stop()
 | |
| 		
 | |
| 		for {
 | |
| 			select {
 | |
| 			case <-eds.ctx.Done():
 | |
| 				return
 | |
| 			case <-ticker.C:
 | |
| 				eds.CleanupCache()
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // AnnounceContent announces that this node has specific UCXL content
 | |
| func (eds *EncryptedDHTStorage) AnnounceContent(ucxlAddress string) error {
 | |
| 	// Get current role for audit logging
 | |
| 	currentRole := eds.getCurrentRole()
 | |
| 	
 | |
| 	// Role-based access policy check for announce
 | |
| 	if err := eds.checkAnnounceAccessPolicy(currentRole, ucxlAddress); err != nil {
 | |
| 		// Audit failed announce attempt
 | |
| 		if eds.config.Security.AuditLogging {
 | |
| 			eds.auditAnnounceOperation(ucxlAddress, currentRole, false, err.Error())
 | |
| 		}
 | |
| 		return fmt.Errorf("announce access denied: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Create announcement
 | |
| 	announcement := map[string]interface{}{
 | |
| 		"node_id":      eds.nodeID,
 | |
| 		"ucxl_address": ucxlAddress,
 | |
| 		"timestamp":    time.Now(),
 | |
| 		"peer_id":      eds.host.ID().String(),
 | |
| 	}
 | |
| 	
 | |
| 	announcementData, err := json.Marshal(announcement)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to marshal announcement: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Announce via DHT
 | |
| 	dhtKey := "/CHORUS/announcements/" + eds.generateDHTKey(ucxlAddress)
 | |
| 	err = eds.dht.PutValue(eds.ctx, dhtKey, announcementData)
 | |
| 	
 | |
| 	// Audit the announce operation
 | |
| 	if eds.config.Security.AuditLogging {
 | |
| 		if err != nil {
 | |
| 			eds.auditAnnounceOperation(ucxlAddress, currentRole, false, err.Error())
 | |
| 		} else {
 | |
| 			eds.auditAnnounceOperation(ucxlAddress, currentRole, true, "")
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // DiscoverContentPeers discovers peers that have specific UCXL content
 | |
| func (eds *EncryptedDHTStorage) DiscoverContentPeers(ucxlAddress string) ([]peer.ID, error) {
 | |
| 	dhtKey := "/CHORUS/announcements/" + eds.generateDHTKey(ucxlAddress)
 | |
| 	
 | |
| 	// This is a simplified implementation
 | |
| 	// In a real system, you'd query multiple announcement keys
 | |
| 	value, err := eds.dht.GetValue(eds.ctx, dhtKey)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to discover peers: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	var announcement map[string]interface{}
 | |
| 	if err := json.Unmarshal(value, &announcement); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to parse announcement: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	// Extract peer ID
 | |
| 	peerIDStr, ok := announcement["peer_id"].(string)
 | |
| 	if !ok {
 | |
| 		return nil, fmt.Errorf("invalid peer ID in announcement")
 | |
| 	}
 | |
| 	
 | |
| 	peerID, err := peer.Decode(peerIDStr)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to decode peer ID: %w", err)
 | |
| 	}
 | |
| 	
 | |
| 	return []peer.ID{peerID}, nil
 | |
| }
 | |
| 
 | |
| // Security policy and audit methods
 | |
| 
 | |
| // getCurrentRole gets the current role from the agent configuration
 | |
| func (eds *EncryptedDHTStorage) getCurrentRole() string {
 | |
| 	if eds.config.Agent.Role == "" {
 | |
| 		return "unknown"
 | |
| 	}
 | |
| 	return eds.config.Agent.Role
 | |
| }
 | |
| 
 | |
| // checkStoreAccessPolicy checks if the current role can store content
 | |
| func (eds *EncryptedDHTStorage) checkStoreAccessPolicy(creatorRole, ucxlAddress, contentType string) error {
 | |
| 	// Basic role validation
 | |
| 	roles := config.GetPredefinedRoles()
 | |
| 	if _, exists := roles[creatorRole]; !exists {
 | |
| 		return fmt.Errorf("unknown creator role: %s", creatorRole)
 | |
| 	}
 | |
| 	
 | |
| 	// Check if role has authority to create content
 | |
| 	role := roles[creatorRole]
 | |
| 	if role.AuthorityLevel == config.AuthorityReadOnly {
 | |
| 		return fmt.Errorf("role %s has read-only authority and cannot store content", creatorRole)
 | |
| 	}
 | |
| 	
 | |
| 	// Additional policy checks can be added here
 | |
| 	// For now, allow all valid roles except read-only to store content
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // checkRetrieveAccessPolicy checks if the current role can retrieve content
 | |
| func (eds *EncryptedDHTStorage) checkRetrieveAccessPolicy(currentRole, ucxlAddress string) error {
 | |
| 	// Basic role validation
 | |
| 	roles := config.GetPredefinedRoles()
 | |
| 	if _, exists := roles[currentRole]; !exists {
 | |
| 		return fmt.Errorf("unknown current role: %s", currentRole)
 | |
| 	}
 | |
| 	
 | |
| 	// All valid roles can retrieve content (encryption handles access control)
 | |
| 	// Additional fine-grained policies can be added here
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // checkAnnounceAccessPolicy checks if the current role can announce content
 | |
| func (eds *EncryptedDHTStorage) checkAnnounceAccessPolicy(currentRole, ucxlAddress string) error {
 | |
| 	// Basic role validation
 | |
| 	roles := config.GetPredefinedRoles()
 | |
| 	if _, exists := roles[currentRole]; !exists {
 | |
| 		return fmt.Errorf("unknown current role: %s", currentRole)
 | |
| 	}
 | |
| 	
 | |
| 	// Check if role has coordination or higher authority to announce
 | |
| 	role := roles[currentRole]
 | |
| 	if role.AuthorityLevel == config.AuthorityReadOnly || role.AuthorityLevel == config.AuthoritySuggestion {
 | |
| 		return fmt.Errorf("role %s lacks authority to announce content", currentRole)
 | |
| 	}
 | |
| 	
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // auditStoreOperation logs a store operation for audit purposes
 | |
| func (eds *EncryptedDHTStorage) auditStoreOperation(ucxlAddress, role, contentType string, contentSize int, success bool, errorMsg string) {
 | |
| 	// Create audit logger if needed (in production, inject via constructor)
 | |
| 	if eds.config.Security.AuditPath == "" {
 | |
| 		return // No audit path configured
 | |
| 	}
 | |
| 	
 | |
| 	// Log to file or audit system
 | |
| 	auditEntry := map[string]interface{}{
 | |
| 		"timestamp":     time.Now(),
 | |
| 		"operation":     "store",
 | |
| 		"node_id":       eds.nodeID,
 | |
| 		"ucxl_address":  ucxlAddress,
 | |
| 		"role":          role,
 | |
| 		"content_type":  contentType,
 | |
| 		"content_size":  contentSize,
 | |
| 		"success":       success,
 | |
| 		"error_message": errorMsg,
 | |
| 		"audit_trail":   fmt.Sprintf("DHT-STORE-%s-%d", ucxlAddress, time.Now().Unix()),
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("🔍 AUDIT STORE: %+v", auditEntry)
 | |
| 	
 | |
| 	// In production, write to audit log file or send to audit service
 | |
| 	// For now, just log to console and update metrics
 | |
| 	if success {
 | |
| 		eds.metrics.StoredItems++
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // auditRetrieveOperation logs a retrieve operation for audit purposes
 | |
| func (eds *EncryptedDHTStorage) auditRetrieveOperation(ucxlAddress, role string, success bool, errorMsg string) {
 | |
| 	// Create audit logger if needed
 | |
| 	if eds.config.Security.AuditPath == "" {
 | |
| 		return // No audit path configured
 | |
| 	}
 | |
| 	
 | |
| 	auditEntry := map[string]interface{}{
 | |
| 		"timestamp":     time.Now(),
 | |
| 		"operation":     "retrieve",
 | |
| 		"node_id":       eds.nodeID,
 | |
| 		"ucxl_address":  ucxlAddress,
 | |
| 		"role":          role,
 | |
| 		"success":       success,
 | |
| 		"error_message": errorMsg,
 | |
| 		"audit_trail":   fmt.Sprintf("DHT-RETRIEVE-%s-%d", ucxlAddress, time.Now().Unix()),
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("🔍 AUDIT RETRIEVE: %+v", auditEntry)
 | |
| 	
 | |
| 	// In production, write to audit log file or send to audit service
 | |
| 	if success {
 | |
| 		eds.metrics.RetrievedItems++
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // auditAnnounceOperation logs an announce operation for audit purposes
 | |
| func (eds *EncryptedDHTStorage) auditAnnounceOperation(ucxlAddress, role string, success bool, errorMsg string) {
 | |
| 	// Create audit logger if needed
 | |
| 	if eds.config.Security.AuditPath == "" {
 | |
| 		return // No audit path configured
 | |
| 	}
 | |
| 	
 | |
| 	auditEntry := map[string]interface{}{
 | |
| 		"timestamp":     time.Now(),
 | |
| 		"operation":     "announce",
 | |
| 		"node_id":       eds.nodeID,
 | |
| 		"ucxl_address":  ucxlAddress,
 | |
| 		"role":          role,
 | |
| 		"success":       success,
 | |
| 		"error_message": errorMsg,
 | |
| 		"audit_trail":   fmt.Sprintf("DHT-ANNOUNCE-%s-%d", ucxlAddress, time.Now().Unix()),
 | |
| 		"peer_id":       eds.host.ID().String(),
 | |
| 	}
 | |
| 	
 | |
| 	log.Printf("🔍 AUDIT ANNOUNCE: %+v", auditEntry)
 | |
| 	
 | |
| 	// In production, write to audit log file or send to audit service
 | |
| } |