CHORUS/pkg/dht/replication_manager.go
anthonyrawlins 543ab216f9 Complete BZZZ functionality port to CHORUS
🎭 CHORUS now contains full BZZZ functionality adapted for containers

Core systems ported:
- P2P networking (libp2p with DHT and PubSub)
- Task coordination (COOEE protocol)
- HMMM collaborative reasoning
- SHHH encryption and security
- SLURP admin election system
- UCXL content addressing
- UCXI server integration
- Hypercore logging system
- Health monitoring and graceful shutdown
- License validation with KACHING

Container adaptations:
- Environment variable configuration (no YAML files)
- Container-optimized logging to stdout/stderr
- Auto-generated agent IDs for container deployments
- Docker-first architecture

All proven BZZZ P2P protocols, AI integration, and collaboration
features are now available in containerized form.

Next: Build and test container deployment.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-02 20:02:37 +10:00

547 lines
14 KiB
Go

package dht

import (
    "context"
    "crypto/sha256"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/ipfs/go-cid"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/libp2p/go-libp2p/core/routing"
    "github.com/multiformats/go-multihash"
)
// ReplicationManager manages DHT data replication and provider records
type ReplicationManager struct {
    dht    routing.Routing
    ctx    context.Context
    cancel context.CancelFunc
    config *ReplicationConfig

    // Provider tracking
    providers      map[string]*ProviderRecord
    providersMutex sync.RWMutex

    // Replication tracking
    contentKeys map[string]*ContentRecord
    keysMutex   sync.RWMutex

    // Background tasks
    reprovideTimer *time.Timer
    cleanupTimer   *time.Timer

    // Metrics
    metrics *ReplicationMetrics
    logger  func(msg string, args ...interface{})
}

// ReplicationConfig holds replication configuration
type ReplicationConfig struct {
    // Target replication factor for content
    ReplicationFactor int

    // Interval for reproviding content
    ReprovideInterval time.Duration

    // Cleanup interval for stale records
    CleanupInterval time.Duration

    // Provider record TTL
    ProviderTTL time.Duration

    // Maximum number of providers to track per key
    MaxProvidersPerKey int

    // Enable automatic replication
    EnableAutoReplication bool

    // Enable periodic reproviding
    EnableReprovide bool

    // Maximum concurrent replication operations
    MaxConcurrentReplications int
}

// ProviderRecord tracks providers for a specific content key
type ProviderRecord struct {
    Key        string
    Providers  []ProviderInfo
    LastUpdate time.Time
    TTL        time.Duration
}

// ProviderInfo contains information about a content provider
type ProviderInfo struct {
    PeerID   peer.ID
    AddedAt  time.Time
    LastSeen time.Time
    Quality  float64 // Quality score 0.0-1.0
    Distance uint32  // XOR distance from key
}

// ContentRecord tracks local content for replication
type ContentRecord struct {
    Key              string
    Size             int64
    CreatedAt        time.Time
    LastProvided     time.Time
    ReplicationCount int
    Priority         int // Higher priority gets replicated first
}

// ReplicationMetrics tracks replication statistics
type ReplicationMetrics struct {
    mu                     sync.RWMutex
    TotalKeys              int64
    TotalProviders         int64
    ReprovideOperations    int64
    SuccessfulReplications int64
    FailedReplications     int64
    LastReprovideTime      time.Time
    LastCleanupTime        time.Time
    AverageReplication     float64
}
// DefaultReplicationConfig returns default replication configuration
func DefaultReplicationConfig() *ReplicationConfig {
    return &ReplicationConfig{
        ReplicationFactor:         3,
        ReprovideInterval:         12 * time.Hour,
        CleanupInterval:           1 * time.Hour,
        ProviderTTL:               24 * time.Hour,
        MaxProvidersPerKey:        10,
        EnableAutoReplication:     true,
        EnableReprovide:           true,
        MaxConcurrentReplications: 5,
    }
}
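
// containerConfigExample is an illustrative sketch, not part of the original
// BZZZ port: it shows how a deployment might tighten the defaults above. The
// specific values are hypothetical, chosen only to demonstrate the overrides.
func containerConfigExample() *ReplicationConfig {
    cfg := DefaultReplicationConfig()
    cfg.ReplicationFactor = 5             // assume more replicas for short-lived containers
    cfg.ReprovideInterval = 6 * time.Hour // reprovide more often than the 12h default
    return cfg
}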
// NewReplicationManager creates a new replication manager
func NewReplicationManager(ctx context.Context, dht routing.Routing, config *ReplicationConfig) *ReplicationManager {
    if config == nil {
        config = DefaultReplicationConfig()
    }

    rmCtx, cancel := context.WithCancel(ctx)

    rm := &ReplicationManager{
        dht:         dht,
        ctx:         rmCtx,
        cancel:      cancel,
        config:      config,
        providers:   make(map[string]*ProviderRecord),
        contentKeys: make(map[string]*ContentRecord),
        metrics:     &ReplicationMetrics{},
        logger: func(msg string, args ...interface{}) {
            log.Printf("[REPLICATION] "+msg, args...)
        },
    }

    // Start background tasks
    rm.startBackgroundTasks()

    return rm
}
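
// setupReplicationExample is an illustrative sketch, not part of the original
// BZZZ port: it shows one way a caller might wire up the manager. The DHT is
// assumed to be a libp2p Kademlia instance satisfying routing.Routing; its
// construction is outside this file. Key, size, and priority are hypothetical.
func setupReplicationExample(ctx context.Context, kadDHT routing.Routing) (*ReplicationManager, error) {
    rm := NewReplicationManager(ctx, kadDHT, DefaultReplicationConfig())

    // Register a piece of locally held content so the manager tracks and
    // re-provides it in the background.
    if err := rm.AddContent("ucxl://example/content", 4096, 1); err != nil {
        return nil, err
    }
    return rm, nil
}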
// AddContent registers content for replication management
func (rm *ReplicationManager) AddContent(key string, size int64, priority int) error {
    rm.keysMutex.Lock()
    defer rm.keysMutex.Unlock()

    record := &ContentRecord{
        Key:              key,
        Size:             size,
        CreatedAt:        time.Now(),
        LastProvided:     time.Time{}, // Will be set on first provide
        ReplicationCount: 0,
        Priority:         priority,
    }

    rm.contentKeys[key] = record
    rm.updateMetrics()

    rm.logger("Added content for replication: %s (size: %d, priority: %d)", key, size, priority)

    // Immediately provide if auto-replication is enabled
    if rm.config.EnableAutoReplication {
        go rm.provideContent(key)
    }

    return nil
}
// RemoveContent removes content from replication management
func (rm *ReplicationManager) RemoveContent(key string) error {
    rm.providersMutex.Lock()
    delete(rm.providers, key)
    rm.providersMutex.Unlock()

    rm.keysMutex.Lock()
    delete(rm.contentKeys, key)
    rm.updateMetrics() // updateMetrics expects keysMutex to be held
    rm.keysMutex.Unlock()

    rm.logger("Removed content from replication: %s", key)
    return nil
}
// ProvideContent announces this node as a provider for the given key
func (rm *ReplicationManager) ProvideContent(key string) error {
    return rm.provideContent(key)
}
// FindProviders discovers providers for a given content key
func (rm *ReplicationManager) FindProviders(ctx context.Context, key string, limit int) ([]ProviderInfo, error) {
    // First check our local provider cache
    rm.providersMutex.RLock()
    if record, exists := rm.providers[key]; exists && time.Since(record.LastUpdate) < record.TTL {
        rm.providersMutex.RUnlock()

        // Return cached providers (up to limit)
        providers := make([]ProviderInfo, 0, len(record.Providers))
        for i, provider := range record.Providers {
            if i >= limit {
                break
            }
            providers = append(providers, provider)
        }
        return providers, nil
    }
    rm.providersMutex.RUnlock()

    // Query DHT for providers
    keyHash := sha256.Sum256([]byte(key))

    // Create a proper CID from the hash
    mh, err := multihash.EncodeName(keyHash[:], "sha2-256")
    if err != nil {
        return nil, fmt.Errorf("failed to encode multihash: %w", err)
    }
    contentID := cid.NewCidV1(cid.Raw, mh)

    // Use DHT to find providers
    providerCh := rm.dht.FindProvidersAsync(ctx, contentID, limit)

    var providers []ProviderInfo
    for providerInfo := range providerCh {
        if len(providers) >= limit {
            break
        }

        provider := ProviderInfo{
            PeerID:   providerInfo.ID,
            AddedAt:  time.Now(),
            LastSeen: time.Now(),
            Quality:  1.0, // Default quality
            Distance: calculateDistance(keyHash[:], providerInfo.ID),
        }
        providers = append(providers, provider)
    }

    // Cache the results
    rm.updateProviderCache(key, providers)

    rm.logger("Found %d providers for key: %s", len(providers), key)
    return providers, nil
}
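
// findProvidersExample is an illustrative sketch, not part of the original
// BZZZ port: it shows a bounded provider lookup so a slow DHT walk cannot
// stall the caller. The key and timeout are hypothetical.
func findProvidersExample(ctx context.Context, rm *ReplicationManager, key string) ([]ProviderInfo, error) {
    lookupCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    // Ask for at most as many providers as the configured replication factor.
    return rm.FindProviders(lookupCtx, key, rm.config.ReplicationFactor)
}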
// GetReplicationStatus returns replication status for a specific key
func (rm *ReplicationManager) GetReplicationStatus(key string) (*ReplicationStatus, error) {
    rm.keysMutex.RLock()
    content, contentExists := rm.contentKeys[key]
    rm.keysMutex.RUnlock()

    rm.providersMutex.RLock()
    providers, providersExist := rm.providers[key]
    rm.providersMutex.RUnlock()

    status := &ReplicationStatus{
        Key:              key,
        TargetReplicas:   rm.config.ReplicationFactor,
        ActualReplicas:   0,
        LastReprovided:   time.Time{},
        HealthyProviders: 0,
        IsLocal:          contentExists,
    }

    if contentExists {
        status.LastReprovided = content.LastProvided
        status.CreatedAt = content.CreatedAt
        status.Size = content.Size
        status.Priority = content.Priority
    }

    if providersExist {
        status.ActualReplicas = len(providers.Providers)

        // Count healthy providers (seen recently)
        cutoff := time.Now().Add(-rm.config.ProviderTTL / 2)
        for _, provider := range providers.Providers {
            if provider.LastSeen.After(cutoff) {
                status.HealthyProviders++
            }
        }
        status.Providers = providers.Providers
    }

    // Determine health status
    if status.ActualReplicas >= status.TargetReplicas {
        status.Health = "healthy"
    } else if status.ActualReplicas > 0 {
        status.Health = "degraded"
    } else {
        status.Health = "critical"
    }

    return status, nil
}
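
// checkReplicationHealthExample is an illustrative sketch, not part of the
// original BZZZ port: it shows how GetReplicationStatus might drive a manual
// re-provide decision for locally held content. The key is hypothetical.
func checkReplicationHealthExample(rm *ReplicationManager, key string) error {
    status, err := rm.GetReplicationStatus(key)
    if err != nil {
        return err
    }

    // Re-announce local content that has fallen below its target replica count.
    if status.IsLocal && status.Health != "healthy" {
        return rm.ProvideContent(key)
    }
    return nil
}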
// GetMetrics returns a point-in-time copy of the replication metrics
func (rm *ReplicationManager) GetMetrics() *ReplicationMetrics {
    rm.metrics.mu.RLock()
    defer rm.metrics.mu.RUnlock()

    // Copy field-by-field so the embedded mutex is not copied along with the data
    return &ReplicationMetrics{
        TotalKeys:              rm.metrics.TotalKeys,
        TotalProviders:         rm.metrics.TotalProviders,
        ReprovideOperations:    rm.metrics.ReprovideOperations,
        SuccessfulReplications: rm.metrics.SuccessfulReplications,
        FailedReplications:     rm.metrics.FailedReplications,
        LastReprovideTime:      rm.metrics.LastReprovideTime,
        LastCleanupTime:        rm.metrics.LastCleanupTime,
        AverageReplication:     rm.metrics.AverageReplication,
    }
}
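
// logMetricsExample is an illustrative sketch, not part of the original BZZZ
// port: it shows how the metrics snapshot could be logged on a fixed interval,
// e.g. for container stdout monitoring. The interval is supplied by the caller.
func logMetricsExample(ctx context.Context, rm *ReplicationManager, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            m := rm.GetMetrics()
            log.Printf("[REPLICATION] keys=%d providers=%d avg_replication=%.2f",
                m.TotalKeys, m.TotalProviders, m.AverageReplication)
        }
    }
}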
// provideContent performs the actual content provision operation
func (rm *ReplicationManager) provideContent(key string) error {
    ctx, cancel := context.WithTimeout(rm.ctx, 30*time.Second)
    defer cancel()

    keyHash := sha256.Sum256([]byte(key))

    // Create a proper CID from the hash
    mh, err := multihash.EncodeName(keyHash[:], "sha2-256")
    if err != nil {
        rm.metrics.mu.Lock()
        rm.metrics.FailedReplications++
        rm.metrics.mu.Unlock()
        return fmt.Errorf("failed to encode multihash: %w", err)
    }
    contentID := cid.NewCidV1(cid.Raw, mh)

    // Provide the content to the DHT
    if err := rm.dht.Provide(ctx, contentID, true); err != nil {
        rm.metrics.mu.Lock()
        rm.metrics.FailedReplications++
        rm.metrics.mu.Unlock()
        return fmt.Errorf("failed to provide content %s: %w", key, err)
    }

    // Update local records
    rm.keysMutex.Lock()
    if record, exists := rm.contentKeys[key]; exists {
        record.LastProvided = time.Now()
        record.ReplicationCount++
    }
    rm.keysMutex.Unlock()

    rm.metrics.mu.Lock()
    rm.metrics.SuccessfulReplications++
    rm.metrics.mu.Unlock()

    rm.logger("Successfully provided content: %s", key)
    return nil
}
// updateProviderCache updates the provider cache for a key
func (rm *ReplicationManager) updateProviderCache(key string, providers []ProviderInfo) {
    rm.providersMutex.Lock()
    defer rm.providersMutex.Unlock()

    record := &ProviderRecord{
        Key:        key,
        Providers:  providers,
        LastUpdate: time.Now(),
        TTL:        rm.config.ProviderTTL,
    }

    // Limit the number of providers
    if len(record.Providers) > rm.config.MaxProvidersPerKey {
        record.Providers = record.Providers[:rm.config.MaxProvidersPerKey]
    }

    rm.providers[key] = record
}
// startBackgroundTasks starts periodic maintenance tasks
func (rm *ReplicationManager) startBackgroundTasks() {
    // Reprovide task
    if rm.config.EnableReprovide {
        rm.reprovideTimer = time.AfterFunc(rm.config.ReprovideInterval, func() {
            rm.performReprovide()
            // Reschedule
            rm.reprovideTimer.Reset(rm.config.ReprovideInterval)
        })
    }

    // Cleanup task
    rm.cleanupTimer = time.AfterFunc(rm.config.CleanupInterval, func() {
        rm.performCleanup()
        // Reschedule
        rm.cleanupTimer.Reset(rm.config.CleanupInterval)
    })
}
// performReprovide re-provides all local content
func (rm *ReplicationManager) performReprovide() {
    rm.logger("Starting reprovide operation")
    start := time.Now()

    rm.keysMutex.RLock()
    keys := make([]string, 0, len(rm.contentKeys))
    for key := range rm.contentKeys {
        keys = append(keys, key)
    }
    rm.keysMutex.RUnlock()

    // Provide all keys with a concurrency limit
    semaphore := make(chan struct{}, rm.config.MaxConcurrentReplications)
    var wg sync.WaitGroup
    var countMutex sync.Mutex // guards successful/failed across worker goroutines
    var successful, failed int64

    for _, key := range keys {
        wg.Add(1)
        go func(k string) {
            defer wg.Done()
            semaphore <- struct{}{}        // Acquire
            defer func() { <-semaphore }() // Release

            err := rm.provideContent(k)

            countMutex.Lock()
            if err != nil {
                rm.logger("Failed to reprovide %s: %v", k, err)
                failed++
            } else {
                successful++
            }
            countMutex.Unlock()
        }(key)
    }
    wg.Wait()

    rm.metrics.mu.Lock()
    rm.metrics.ReprovideOperations++
    rm.metrics.LastReprovideTime = time.Now()
    rm.metrics.mu.Unlock()

    duration := time.Since(start)
    rm.logger("Reprovide operation completed: %d successful, %d failed, took %v",
        successful, failed, duration)
}
// performCleanup removes stale provider records
func (rm *ReplicationManager) performCleanup() {
    rm.logger("Starting cleanup operation")

    rm.providersMutex.Lock()
    defer rm.providersMutex.Unlock()

    cutoff := time.Now().Add(-rm.config.ProviderTTL)
    removed := 0

    for key, record := range rm.providers {
        if record.LastUpdate.Before(cutoff) {
            delete(rm.providers, key)
            removed++
        } else {
            // Clean up individual providers within the record
            validProviders := make([]ProviderInfo, 0, len(record.Providers))
            for _, provider := range record.Providers {
                if provider.LastSeen.After(cutoff) {
                    validProviders = append(validProviders, provider)
                }
            }
            record.Providers = validProviders
        }
    }

    rm.metrics.mu.Lock()
    rm.metrics.LastCleanupTime = time.Now()
    rm.metrics.mu.Unlock()

    rm.logger("Cleanup operation completed: removed %d stale records", removed)
}
// updateMetrics recalculates aggregate metrics. Callers must hold keysMutex;
// providersMutex is acquired internally.
func (rm *ReplicationManager) updateMetrics() {
    rm.providersMutex.RLock()
    totalProviders := int64(0)
    for _, record := range rm.providers {
        totalProviders += int64(len(record.Providers))
    }
    rm.providersMutex.RUnlock()

    totalReplications := int64(0)
    for _, content := range rm.contentKeys {
        totalReplications += int64(content.ReplicationCount)
    }

    rm.metrics.mu.Lock()
    defer rm.metrics.mu.Unlock()

    rm.metrics.TotalKeys = int64(len(rm.contentKeys))
    rm.metrics.TotalProviders = totalProviders
    if rm.metrics.TotalKeys > 0 {
        rm.metrics.AverageReplication = float64(totalReplications) / float64(rm.metrics.TotalKeys)
    }
}
// Stop stops the replication manager
func (rm *ReplicationManager) Stop() error {
    rm.cancel()

    if rm.reprovideTimer != nil {
        rm.reprovideTimer.Stop()
    }
    if rm.cleanupTimer != nil {
        rm.cleanupTimer.Stop()
    }

    rm.logger("Replication manager stopped")
    return nil
}
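
// shutdownExample is an illustrative sketch, not part of the original BZZZ
// port: it shows how the manager might be stopped alongside other services
// when a container's root context is cancelled during graceful shutdown.
func shutdownExample(ctx context.Context, rm *ReplicationManager) {
    go func() {
        <-ctx.Done()
        if err := rm.Stop(); err != nil {
            log.Printf("[REPLICATION] shutdown error: %v", err)
        }
    }()
}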
// ReplicationStatus holds the replication status of a specific key
type ReplicationStatus struct {
    Key              string
    TargetReplicas   int
    ActualReplicas   int
    HealthyProviders int
    LastReprovided   time.Time
    CreatedAt        time.Time
    Size             int64
    Priority         int
    Health           string // "healthy", "degraded", "critical"
    IsLocal          bool
    Providers        []ProviderInfo
}
// calculateDistance derives a coarse distance score between a key hash and a
// peer ID by XOR-folding the overlapping bytes into a uint32. It is a
// simplified stand-in for a full Kademlia XOR metric, used only as a relative
// quality hint in ProviderInfo.
func calculateDistance(key []byte, peerID peer.ID) uint32 {
    peerBytes := []byte(peerID)

    var distance uint32
    minLen := len(key)
    if len(peerBytes) < minLen {
        minLen = len(peerBytes)
    }

    for i := 0; i < minLen; i++ {
        distance ^= uint32(key[i] ^ peerBytes[i])
    }

    return distance
}