Files
anthonyrawlins d96c931a29 Resolve import cycles and migrate to chorus.services module path
This comprehensive refactoring addresses critical architectural issues:

IMPORT CYCLE RESOLUTION:
• pkg/crypto ↔ pkg/slurp/roles: Created pkg/security/access_levels.go
• pkg/ucxl → pkg/dht: Created pkg/storage/interfaces.go
• pkg/slurp/leader → pkg/election → pkg/slurp/storage: Moved types to pkg/election/interfaces.go

MODULE PATH MIGRATION:
• Changed from github.com/anthonyrawlins/bzzz to chorus.services/bzzz
• Updated all import statements across 115+ files
• Maintains compatibility while removing personal GitHub account dependency

TYPE SYSTEM IMPROVEMENTS:
• Resolved duplicate type declarations in crypto package
• Added missing type definitions (RoleStatus, TimeRestrictions, KeyStatus, KeyRotationResult)
• Proper interface segregation to prevent future cycles

ARCHITECTURAL BENEFITS:
• Build now progresses past structural issues to normal dependency resolution
• Cleaner separation of concerns between packages
• Eliminates circular dependencies that prevented compilation
• Establishes foundation for scalable codebase growth

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-17 10:04:25 +10:00

371 lines
18 KiB
Go

package distribution
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"sync"
"time"
"chorus.services/bzzz/pkg/dht"
"chorus.services/bzzz/pkg/crypto"
"chorus.services/bzzz/pkg/election"
"chorus.services/bzzz/pkg/ucxl"
"chorus.services/bzzz/pkg/config"
slurpContext "chorus.services/bzzz/pkg/slurp/context"
)
// ContextDistributor handles distributed context operations via DHT
//
// This is the primary interface for distributing context data across the BZZZ
// cluster using the existing DHT infrastructure with role-based encryption
// and conflict resolution capabilities.
type ContextDistributor interface {
// DistributeContext encrypts and stores context in DHT for role-based access
// The context is encrypted for each specified role and distributed across
// the cluster with the configured replication factor
DistributeContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) error
// RetrieveContext gets context from DHT and decrypts for the requesting role
// Automatically handles role-based decryption and returns the resolved context
RetrieveContext(ctx context.Context, address ucxl.Address, role string) (*slurpContext.ResolvedContext, error)
// UpdateContext updates existing distributed context with conflict resolution
// Uses vector clocks and leader coordination for consistent updates
UpdateContext(ctx context.Context, node *slurpContext.ContextNode, roles []string) (*ConflictResolution, error)
// DeleteContext removes context from distributed storage
// Handles distributed deletion across all replicas
DeleteContext(ctx context.Context, address ucxl.Address) error
// ListDistributedContexts lists contexts available in the DHT for a role
// Provides efficient enumeration with role-based filtering
ListDistributedContexts(ctx context.Context, role string, criteria *DistributionCriteria) ([]*DistributedContextInfo, error)
// Sync synchronizes local state with distributed DHT
// Ensures eventual consistency by exchanging metadata with peers
Sync(ctx context.Context) (*SyncResult, error)
// Replicate ensures context has the desired replication factor
// Manages replica placement and health across cluster nodes
Replicate(ctx context.Context, address ucxl.Address, replicationFactor int) error
// GetReplicaHealth returns health status of context replicas
// Provides visibility into replication status and node health
GetReplicaHealth(ctx context.Context, address ucxl.Address) (*ReplicaHealth, error)
// GetDistributionStats returns distribution performance statistics
GetDistributionStats() (*DistributionStatistics, error)
// SetReplicationPolicy configures replication behavior
SetReplicationPolicy(policy *ReplicationPolicy) error
}
// DHTStorage provides direct DHT storage operations for context data
type DHTStorage interface {
// Put stores encrypted context data in the DHT
Put(ctx context.Context, key string, data []byte, options *DHTStoreOptions) error
// Get retrieves encrypted context data from the DHT
Get(ctx context.Context, key string) ([]byte, *DHTMetadata, error)
// Delete removes data from the DHT
Delete(ctx context.Context, key string) error
// Exists checks if data exists in the DHT
Exists(ctx context.Context, key string) (bool, error)
// FindProviders finds nodes that have the specified data
FindProviders(ctx context.Context, key string) ([]string, error)
// ListKeys lists all keys matching a pattern
ListKeys(ctx context.Context, pattern string) ([]string, error)
// GetStats returns DHT operation statistics
GetStats() (*DHTStatistics, error)
}
// ConflictResolver handles conflicts during concurrent context updates
type ConflictResolver interface {
// ResolveConflict resolves conflicts between concurrent context updates
// Uses vector clocks and semantic merging rules for resolution
ResolveConflict(ctx context.Context, local *slurpContext.ContextNode, remote *slurpContext.ContextNode) (*ConflictResolution, error)
// DetectConflicts detects potential conflicts before they occur
// Provides early warning for conflicting operations
DetectConflicts(ctx context.Context, update *slurpContext.ContextNode) ([]*PotentialConflict, error)
// MergeContexts merges multiple context versions semantically
// Combines changes from different sources intelligently
MergeContexts(ctx context.Context, contexts []*slurpContext.ContextNode) (*slurpContext.ContextNode, error)
// GetConflictHistory returns history of resolved conflicts
GetConflictHistory(ctx context.Context, address ucxl.Address) ([]*ConflictResolution, error)
// SetResolutionStrategy configures conflict resolution strategy
SetResolutionStrategy(strategy *ResolutionStrategy) error
}
// ReplicationManager manages context replication across cluster nodes
type ReplicationManager interface {
// EnsureReplication ensures context meets replication requirements
EnsureReplication(ctx context.Context, address ucxl.Address, factor int) error
// RepairReplicas repairs missing or corrupted replicas
RepairReplicas(ctx context.Context, address ucxl.Address) (*RepairResult, error)
// BalanceReplicas rebalances replicas across cluster nodes
BalanceReplicas(ctx context.Context) (*RebalanceResult, error)
// GetReplicationStatus returns current replication status
GetReplicationStatus(ctx context.Context, address ucxl.Address) (*ReplicationStatus, error)
// SetReplicationFactor sets the desired replication factor
SetReplicationFactor(factor int) error
// GetReplicationStats returns replication statistics
GetReplicationStats() (*ReplicationStatistics, error)
}
// GossipProtocol handles efficient metadata synchronization
type GossipProtocol interface {
// StartGossip begins gossip protocol for metadata synchronization
StartGossip(ctx context.Context) error
// StopGossip stops gossip protocol
StopGossip(ctx context.Context) error
// GossipMetadata exchanges metadata with peer nodes
GossipMetadata(ctx context.Context, peer string) error
// GetGossipState returns current gossip protocol state
GetGossipState() (*GossipState, error)
// SetGossipInterval configures gossip frequency
SetGossipInterval(interval time.Duration) error
// GetGossipStats returns gossip protocol statistics
GetGossipStats() (*GossipStatistics, error)
}
// NetworkManager handles network topology and partition detection
type NetworkManager interface {
// DetectPartition detects network partitions in the cluster
DetectPartition(ctx context.Context) (*PartitionInfo, error)
// GetTopology returns current network topology
GetTopology(ctx context.Context) (*NetworkTopology, error)
// GetPeers returns list of available peer nodes
GetPeers(ctx context.Context) ([]*PeerInfo, error)
// CheckConnectivity checks connectivity to peer nodes
CheckConnectivity(ctx context.Context, peers []string) (*ConnectivityReport, error)
// RecoverFromPartition attempts to recover from network partition
RecoverFromPartition(ctx context.Context) (*RecoveryResult, error)
// GetNetworkStats returns network performance statistics
GetNetworkStats() (*NetworkStatistics, error)
}
// Supporting types for distribution operations
// DistributionCriteria represents criteria for listing distributed contexts
type DistributionCriteria struct {
Tags []string `json:"tags"` // Required tags
Technologies []string `json:"technologies"` // Required technologies
MinReplicas int `json:"min_replicas"` // Minimum replica count
MaxAge *time.Duration `json:"max_age"` // Maximum age
HealthyOnly bool `json:"healthy_only"` // Only healthy replicas
Limit int `json:"limit"` // Maximum results
Offset int `json:"offset"` // Result offset
}
// DistributedContextInfo represents information about distributed context
type DistributedContextInfo struct {
Address ucxl.Address `json:"address"` // Context address
Roles []string `json:"roles"` // Accessible roles
ReplicaCount int `json:"replica_count"` // Number of replicas
HealthyReplicas int `json:"healthy_replicas"` // Healthy replica count
LastUpdated time.Time `json:"last_updated"` // Last update time
Version int64 `json:"version"` // Version number
Size int64 `json:"size"` // Data size
Checksum string `json:"checksum"` // Data checksum
}
// ConflictResolution represents the result of conflict resolution
type ConflictResolution struct {
Address ucxl.Address `json:"address"` // Context address
ResolutionType ResolutionType `json:"resolution_type"` // How conflict was resolved
MergedContext *slurpContext.ContextNode `json:"merged_context"` // Resulting merged context
ConflictingSources []string `json:"conflicting_sources"` // Sources of conflict
ResolutionTime time.Duration `json:"resolution_time"` // Time taken to resolve
ResolvedAt time.Time `json:"resolved_at"` // When resolved
Confidence float64 `json:"confidence"` // Confidence in resolution
ManualReview bool `json:"manual_review"` // Whether manual review needed
}
// ResolutionType represents different types of conflict resolution
type ResolutionType string
const (
ResolutionMerged ResolutionType = "merged" // Contexts were merged
ResolutionLastWriter ResolutionType = "last_writer" // Last writer wins
ResolutionLeaderDecision ResolutionType = "leader_decision" // Leader made decision
ResolutionManual ResolutionType = "manual" // Manual resolution required
ResolutionFailed ResolutionType = "failed" // Resolution failed
)
// PotentialConflict represents a detected potential conflict
type PotentialConflict struct {
Address ucxl.Address `json:"address"` // Context address
ConflictType ConflictType `json:"conflict_type"` // Type of conflict
Description string `json:"description"` // Conflict description
Severity ConflictSeverity `json:"severity"` // Conflict severity
AffectedFields []string `json:"affected_fields"` // Fields in conflict
Suggestions []string `json:"suggestions"` // Resolution suggestions
DetectedAt time.Time `json:"detected_at"` // When detected
}
// ConflictType represents different types of conflicts
type ConflictType string
const (
ConflictConcurrentUpdate ConflictType = "concurrent_update" // Concurrent updates
ConflictFieldMismatch ConflictType = "field_mismatch" // Field value mismatch
ConflictVersionSkew ConflictType = "version_skew" // Version inconsistency
ConflictRoleAccess ConflictType = "role_access" // Role access conflict
ConflictSchemaChange ConflictType = "schema_change" // Schema version conflict
)
// ConflictSeverity represents conflict severity levels
type ConflictSeverity string
const (
SeverityLow ConflictSeverity = "low" // Low severity - auto-resolvable
SeverityMedium ConflictSeverity = "medium" // Medium severity - may need review
SeverityHigh ConflictSeverity = "high" // High severity - needs attention
SeverityCritical ConflictSeverity = "critical" // Critical - manual intervention required
)
// ResolutionStrategy represents conflict resolution strategy configuration
type ResolutionStrategy struct {
DefaultResolution ResolutionType `json:"default_resolution"` // Default resolution method
FieldPriorities map[string]int `json:"field_priorities"` // Field priority mapping
AutoMergeEnabled bool `json:"auto_merge_enabled"` // Enable automatic merging
RequireConsensus bool `json:"require_consensus"` // Require node consensus
LeaderBreaksTies bool `json:"leader_breaks_ties"` // Leader resolves ties
MaxConflictAge time.Duration `json:"max_conflict_age"` // Max age before escalation
EscalationRoles []string `json:"escalation_roles"` // Roles for manual escalation
}
// SyncResult represents the result of synchronization operation
type SyncResult struct {
SyncedContexts int `json:"synced_contexts"` // Contexts synchronized
ConflictsResolved int `json:"conflicts_resolved"` // Conflicts resolved
Errors []string `json:"errors"` // Synchronization errors
SyncTime time.Duration `json:"sync_time"` // Total sync time
PeersContacted int `json:"peers_contacted"` // Number of peers contacted
DataTransferred int64 `json:"data_transferred"` // Bytes transferred
SyncedAt time.Time `json:"synced_at"` // When sync completed
}
// ReplicaHealth represents health status of context replicas
type ReplicaHealth struct {
Address ucxl.Address `json:"address"` // Context address
TotalReplicas int `json:"total_replicas"` // Total replica count
HealthyReplicas int `json:"healthy_replicas"` // Healthy replica count
FailedReplicas int `json:"failed_replicas"` // Failed replica count
ReplicaNodes []*ReplicaNode `json:"replica_nodes"` // Individual replica status
OverallHealth HealthStatus `json:"overall_health"` // Overall health status
LastChecked time.Time `json:"last_checked"` // When last checked
RepairNeeded bool `json:"repair_needed"` // Whether repair is needed
}
// ReplicaNode represents status of individual replica node
type ReplicaNode struct {
NodeID string `json:"node_id"` // Node identifier
Status ReplicaStatus `json:"status"` // Replica status
LastSeen time.Time `json:"last_seen"` // When last seen
Version int64 `json:"version"` // Context version
Checksum string `json:"checksum"` // Data checksum
Latency time.Duration `json:"latency"` // Network latency
NetworkAddress string `json:"network_address"` // Network address
}
// ReplicaStatus represents status of individual replica
type ReplicaStatus string
const (
ReplicaHealthy ReplicaStatus = "healthy" // Replica is healthy
ReplicaStale ReplicaStatus = "stale" // Replica is stale
ReplicaCorrupted ReplicaStatus = "corrupted" // Replica is corrupted
ReplicaUnreachable ReplicaStatus = "unreachable" // Replica is unreachable
ReplicaSyncing ReplicaStatus = "syncing" // Replica is syncing
)
// HealthStatus represents overall health status
type HealthStatus string
const (
HealthHealthy HealthStatus = "healthy" // All replicas healthy
HealthDegraded HealthStatus = "degraded" // Some replicas unhealthy
HealthCritical HealthStatus = "critical" // Most replicas unhealthy
HealthFailed HealthStatus = "failed" // All replicas failed
)
// ReplicationPolicy represents replication behavior configuration
type ReplicationPolicy struct {
DefaultFactor int `json:"default_factor"` // Default replication factor
MinFactor int `json:"min_factor"` // Minimum replication factor
MaxFactor int `json:"max_factor"` // Maximum replication factor
PreferredZones []string `json:"preferred_zones"` // Preferred availability zones
AvoidSameNode bool `json:"avoid_same_node"` // Avoid same physical node
ConsistencyLevel ConsistencyLevel `json:"consistency_level"` // Consistency requirements
RepairThreshold float64 `json:"repair_threshold"` // Health threshold for repair
RebalanceInterval time.Duration `json:"rebalance_interval"` // Rebalancing frequency
}
// ConsistencyLevel represents consistency requirements
type ConsistencyLevel string
const (
ConsistencyEventual ConsistencyLevel = "eventual" // Eventual consistency
ConsistencyQuorum ConsistencyLevel = "quorum" // Quorum-based consistency
ConsistencyStrong ConsistencyLevel = "strong" // Strong consistency
)
// DHTStoreOptions represents options for DHT storage operations
type DHTStoreOptions struct {
ReplicationFactor int `json:"replication_factor"` // Number of replicas
TTL *time.Duration `json:"ttl,omitempty"` // Time to live
Priority Priority `json:"priority"` // Storage priority
Compress bool `json:"compress"` // Whether to compress
Checksum bool `json:"checksum"` // Whether to checksum
Metadata map[string]interface{} `json:"metadata"` // Additional metadata
}
// Priority represents storage operation priority
type Priority string
const (
PriorityLow Priority = "low" // Low priority
PriorityNormal Priority = "normal" // Normal priority
PriorityHigh Priority = "high" // High priority
PriorityCritical Priority = "critical" // Critical priority
)
// DHTMetadata represents metadata for DHT stored data
type DHTMetadata struct {
StoredAt time.Time `json:"stored_at"` // When stored
UpdatedAt time.Time `json:"updated_at"` // When last updated
Version int64 `json:"version"` // Version number
Size int64 `json:"size"` // Data size
Checksum string `json:"checksum"` // Data checksum
ReplicationFactor int `json:"replication_factor"` // Number of replicas
TTL *time.Time `json:"ttl,omitempty"` // Time to live
Metadata map[string]interface{} `json:"metadata"` // Additional metadata
}