//go:build slurp_full // +build slurp_full // Package distribution provides centralized coordination for distributed context operations package distribution import ( "context" "fmt" "sync" "time" "chorus/pkg/config" "chorus/pkg/crypto" "chorus/pkg/dht" "chorus/pkg/election" slurpContext "chorus/pkg/slurp/context" "chorus/pkg/ucxl" ) // DistributionCoordinator orchestrates distributed context operations across the cluster type DistributionCoordinator struct { mu sync.RWMutex config *config.Config dht dht.DHT roleCrypto *crypto.RoleCrypto election election.Election distributor ContextDistributor replicationMgr ReplicationManager conflictResolver ConflictResolver gossipProtocol GossipProtocol networkMgr NetworkManager // Coordination state isLeader bool leaderID string coordinationTasks chan *CoordinationTask distributionQueue chan *DistributionRequest roleFilters map[string]*RoleFilter healthMonitors map[string]*HealthMonitor // Statistics and metrics stats *CoordinationStatistics performanceMetrics *PerformanceMetrics // Configuration maxConcurrentTasks int healthCheckInterval time.Duration leaderElectionTTL time.Duration distributionTimeout time.Duration } // CoordinationTask represents a task for the coordinator type CoordinationTask struct { TaskID string `json:"task_id"` TaskType CoordinationTaskType `json:"task_type"` Priority Priority `json:"priority"` CreatedAt time.Time `json:"created_at"` RequestedBy string `json:"requested_by"` Payload interface{} `json:"payload"` Context context.Context `json:"-"` Callback func(error) `json:"-"` } // CoordinationTaskType represents different types of coordination tasks type CoordinationTaskType string const ( TaskTypeDistribution CoordinationTaskType = "distribution" TaskTypeReplication CoordinationTaskType = "replication" TaskTypeConflictResolve CoordinationTaskType = "conflict_resolve" TaskTypeHealthCheck CoordinationTaskType = "health_check" TaskTypeNetworkRepair CoordinationTaskType = "network_repair" TaskTypeLoadBalance CoordinationTaskType = "load_balance" TaskTypeRoleSync CoordinationTaskType = "role_sync" ) // DistributionRequest represents a request for context distribution type DistributionRequest struct { RequestID string `json:"request_id"` ContextNode *slurpContext.ContextNode `json:"context_node"` TargetRoles []string `json:"target_roles"` Priority Priority `json:"priority"` RequesterID string `json:"requester_id"` CreatedAt time.Time `json:"created_at"` Options *DistributionOptions `json:"options"` Callback func(*DistributionResult, error) `json:"-"` } // DistributionOptions contains options for context distribution type DistributionOptions struct { ReplicationFactor int `json:"replication_factor"` ConsistencyLevel ConsistencyLevel `json:"consistency_level"` EncryptionLevel crypto.AccessLevel `json:"encryption_level"` TTL *time.Duration `json:"ttl,omitempty"` PreferredZones []string `json:"preferred_zones"` ExcludedNodes []string `json:"excluded_nodes"` ConflictResolution ResolutionType `json:"conflict_resolution"` } // DistributionResult represents the result of a distribution operation type DistributionResult struct { RequestID string `json:"request_id"` Success bool `json:"success"` DistributedNodes []string `json:"distributed_nodes"` ReplicationFactor int `json:"replication_factor"` ProcessingTime time.Duration `json:"processing_time"` Errors []string `json:"errors"` ConflictResolved *ConflictResolution `json:"conflict_resolved,omitempty"` CompletedAt time.Time `json:"completed_at"` } // RoleFilter manages role-based filtering for context access type RoleFilter struct { RoleID string `json:"role_id"` AccessLevel crypto.AccessLevel `json:"access_level"` AllowedCompartments []string `json:"allowed_compartments"` FilterRules []*FilterRule `json:"filter_rules"` LastUpdated time.Time `json:"last_updated"` } // FilterRule represents a single filtering rule type FilterRule struct { RuleID string `json:"rule_id"` RuleType FilterRuleType `json:"rule_type"` Pattern string `json:"pattern"` Action FilterAction `json:"action"` Metadata map[string]interface{} `json:"metadata"` } // FilterRuleType represents different types of filter rules type FilterRuleType string const ( FilterRuleTypeTag FilterRuleType = "tag" FilterRuleTypePath FilterRuleType = "path" FilterRuleTypeTechnology FilterRuleType = "technology" FilterRuleTypeContent FilterRuleType = "content" ) // FilterAction represents the action to take when a rule matches type FilterAction string const ( FilterActionAllow FilterAction = "allow" FilterActionDeny FilterAction = "deny" FilterActionModify FilterAction = "modify" FilterActionAudit FilterAction = "audit" ) // HealthMonitor monitors the health of a specific component type HealthMonitor struct { ComponentID string `json:"component_id"` ComponentType ComponentType `json:"component_type"` Status HealthStatus `json:"status"` LastHealthCheck time.Time `json:"last_health_check"` HealthScore float64 `json:"health_score"` Metrics map[string]interface{} `json:"metrics"` AlertThresholds *AlertThresholds `json:"alert_thresholds"` } // ComponentType represents different types of components to monitor type ComponentType string const ( ComponentTypeDHT ComponentType = "dht" ComponentTypeReplication ComponentType = "replication" ComponentTypeGossip ComponentType = "gossip" ComponentTypeNetwork ComponentType = "network" ComponentTypeConflictResolver ComponentType = "conflict_resolver" ) // AlertThresholds defines thresholds for health alerts type AlertThresholds struct { WarningThreshold float64 `json:"warning_threshold"` CriticalThreshold float64 `json:"critical_threshold"` RecoveryThreshold float64 `json:"recovery_threshold"` } // CoordinationStatistics tracks coordination performance type CoordinationStatistics struct { TotalTasks int64 `json:"total_tasks"` CompletedTasks int64 `json:"completed_tasks"` FailedTasks int64 `json:"failed_tasks"` QueuedTasks int64 `json:"queued_tasks"` AverageProcessTime time.Duration `json:"average_process_time"` LeaderElections int64 `json:"leader_elections"` LastLeaderChange time.Time `json:"last_leader_change"` DistributionSuccess float64 `json:"distribution_success_rate"` ConflictResolutions int64 `json:"conflict_resolutions"` LastUpdated time.Time `json:"last_updated"` } // PerformanceMetrics tracks detailed performance metrics type PerformanceMetrics struct { ThroughputPerSecond float64 `json:"throughput_per_second"` LatencyPercentiles map[string]float64 `json:"latency_percentiles"` ErrorRateByType map[string]float64 `json:"error_rate_by_type"` ResourceUtilization map[string]float64 `json:"resource_utilization"` NetworkMetrics *NetworkMetrics `json:"network_metrics"` StorageMetrics *StorageMetrics `json:"storage_metrics"` LastCalculated time.Time `json:"last_calculated"` } // NetworkMetrics tracks network-related performance type NetworkMetrics struct { BandwidthUtilization float64 `json:"bandwidth_utilization"` AverageLatency time.Duration `json:"average_latency"` PacketLossRate float64 `json:"packet_loss_rate"` ConnectionCount int `json:"connection_count"` MessageThroughput float64 `json:"message_throughput"` } // StorageMetrics tracks storage-related performance type StorageMetrics struct { TotalContexts int64 `json:"total_contexts"` StorageUtilization float64 `json:"storage_utilization"` CompressionRatio float64 `json:"compression_ratio"` ReplicationEfficiency float64 `json:"replication_efficiency"` CacheHitRate float64 `json:"cache_hit_rate"` } // NewDistributionCoordinator creates a new distribution coordinator func NewDistributionCoordinator( config *config.Config, dhtInstance dht.DHT, roleCrypto *crypto.RoleCrypto, election election.Election, ) (*DistributionCoordinator, error) { if config == nil { return nil, fmt.Errorf("config is required") } if dhtInstance == nil { return nil, fmt.Errorf("DHT instance is required") } if roleCrypto == nil { return nil, fmt.Errorf("role crypto instance is required") } if election == nil { return nil, fmt.Errorf("election instance is required") } // Create distributor distributor, err := NewDHTContextDistributor(dhtInstance, roleCrypto, election, config) if err != nil { return nil, fmt.Errorf("failed to create context distributor: %w", err) } coord := &DistributionCoordinator{ config: config, dht: dhtInstance, roleCrypto: roleCrypto, election: election, distributor: distributor, coordinationTasks: make(chan *CoordinationTask, 1000), distributionQueue: make(chan *DistributionRequest, 500), roleFilters: make(map[string]*RoleFilter), healthMonitors: make(map[string]*HealthMonitor), maxConcurrentTasks: 10, healthCheckInterval: 30 * time.Second, leaderElectionTTL: 60 * time.Second, distributionTimeout: 30 * time.Second, stats: &CoordinationStatistics{ LastUpdated: time.Now(), }, performanceMetrics: &PerformanceMetrics{ LatencyPercentiles: make(map[string]float64), ErrorRateByType: make(map[string]float64), ResourceUtilization: make(map[string]float64), NetworkMetrics: &NetworkMetrics{}, StorageMetrics: &StorageMetrics{}, LastCalculated: time.Now(), }, } // Initialize components if err := coord.initializeComponents(); err != nil { return nil, fmt.Errorf("failed to initialize components: %w", err) } // Initialize role filters coord.initializeRoleFilters() // Initialize health monitors coord.initializeHealthMonitors() return coord, nil } // Start starts the distribution coordinator func (dc *DistributionCoordinator) Start(ctx context.Context) error { // Start distributor if err := dc.distributor.Start(ctx); err != nil { return fmt.Errorf("failed to start distributor: %w", err) } // Start background workers go dc.coordinationWorker(ctx) go dc.distributionWorker(ctx) go dc.healthMonitorWorker(ctx) go dc.leaderElectionWorker(ctx) go dc.metricsCollector(ctx) return nil } // Stop stops the distribution coordinator func (dc *DistributionCoordinator) Stop(ctx context.Context) error { // Stop distributor if err := dc.distributor.Stop(ctx); err != nil { return fmt.Errorf("failed to stop distributor: %w", err) } close(dc.coordinationTasks) close(dc.distributionQueue) return nil } // DistributeContext distributes context with coordination func (dc *DistributionCoordinator) DistributeContext( ctx context.Context, node *slurpContext.ContextNode, roles []string, options *DistributionOptions, ) (*DistributionResult, error) { // Apply role filtering filteredRoles := dc.applyRoleFilters(roles, node) // Create distribution request request := &DistributionRequest{ RequestID: dc.generateRequestID(), ContextNode: node, TargetRoles: filteredRoles, Priority: PriorityNormal, RequesterID: dc.config.Agent.ID, CreatedAt: time.Now(), Options: options, } if options == nil { request.Options = dc.getDefaultDistributionOptions() } // Execute distribution return dc.executeDistribution(ctx, request) } // CoordinateReplication coordinates replication across the cluster func (dc *DistributionCoordinator) CoordinateReplication( ctx context.Context, address ucxl.Address, targetFactor int, ) error { task := &CoordinationTask{ TaskID: dc.generateTaskID(), TaskType: TaskTypeReplication, Priority: PriorityNormal, CreatedAt: time.Now(), RequestedBy: dc.config.Agent.ID, Payload: map[string]interface{}{ "address": address, "target_factor": targetFactor, }, Context: ctx, } return dc.submitTask(task) } // ResolveConflicts resolves conflicts in distributed contexts func (dc *DistributionCoordinator) ResolveConflicts( ctx context.Context, conflicts []*PotentialConflict, ) ([]*ConflictResolution, error) { results := make([]*ConflictResolution, 0, len(conflicts)) for _, conflict := range conflicts { task := &CoordinationTask{ TaskID: dc.generateTaskID(), TaskType: TaskTypeConflictResolve, Priority: dc.priorityFromSeverity(conflict.Severity), CreatedAt: time.Now(), RequestedBy: dc.config.Agent.ID, Payload: conflict, Context: ctx, } if err := dc.submitTask(task); err != nil { // Log error but continue with other conflicts continue } } return results, nil } // GetClusterHealth returns the overall health of the cluster func (dc *DistributionCoordinator) GetClusterHealth() (*ClusterHealth, error) { dc.mu.RLock() defer dc.mu.RUnlock() health := &ClusterHealth{ OverallStatus: dc.calculateOverallHealth(), NodeCount: len(dc.healthMonitors) + 1, // Placeholder count including current node HealthyNodes: 0, UnhealthyNodes: 0, ComponentHealth: make(map[string]*ComponentHealth), LastUpdated: time.Now(), Alerts: []string{}, Recommendations: []string{}, } // Calculate component health for componentID, monitor := range dc.healthMonitors { health.ComponentHealth[componentID] = &ComponentHealth{ ComponentType: monitor.ComponentType, Status: monitor.Status, HealthScore: monitor.HealthScore, LastCheck: monitor.LastHealthCheck, Metrics: monitor.Metrics, } if monitor.Status == HealthHealthy { health.HealthyNodes++ } else { health.UnhealthyNodes++ } } return health, nil } // GetCoordinationStats returns coordination statistics func (dc *DistributionCoordinator) GetCoordinationStats() (*CoordinationStatistics, error) { dc.mu.RLock() defer dc.mu.RUnlock() // Update real-time stats dc.stats.QueuedTasks = int64(len(dc.coordinationTasks) + len(dc.distributionQueue)) dc.stats.LastUpdated = time.Now() return dc.stats, nil } // GetPerformanceMetrics returns detailed performance metrics func (dc *DistributionCoordinator) GetPerformanceMetrics() (*PerformanceMetrics, error) { dc.mu.RLock() defer dc.mu.RUnlock() // Update calculated metrics dc.updatePerformanceMetrics() return dc.performanceMetrics, nil } // Background workers func (dc *DistributionCoordinator) coordinationWorker(ctx context.Context) { // Create worker pool workerCount := dc.maxConcurrentTasks for i := 0; i < workerCount; i++ { go dc.taskWorker(ctx, i) } // Task dispatcher for { select { case <-ctx.Done(): return case task := <-dc.coordinationTasks: if task == nil { return // Channel closed } // Task is picked up by worker pool } } } func (dc *DistributionCoordinator) taskWorker(ctx context.Context, workerID int) { for { select { case <-ctx.Done(): return case task := <-dc.coordinationTasks: if task == nil { return // Channel closed } dc.processCoordinationTask(task) } } } func (dc *DistributionCoordinator) distributionWorker(ctx context.Context) { for { select { case <-ctx.Done(): return case request := <-dc.distributionQueue: if request == nil { return // Channel closed } result, err := dc.executeDistributionRequest(ctx, request) if request.Callback != nil { go request.Callback(result, err) } } } } func (dc *DistributionCoordinator) healthMonitorWorker(ctx context.Context) { ticker := time.NewTicker(dc.healthCheckInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: dc.performHealthChecks(ctx) } } } func (dc *DistributionCoordinator) leaderElectionWorker(ctx context.Context) { ticker := time.NewTicker(dc.leaderElectionTTL / 2) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: dc.checkLeadershipStatus() } } } func (dc *DistributionCoordinator) metricsCollector(ctx context.Context) { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: dc.collectMetrics() } } } // Helper methods func (dc *DistributionCoordinator) initializeComponents() error { var err error // Initialize replication manager dc.replicationMgr, err = NewReplicationManager(dc.dht, dc.config) if err != nil { return fmt.Errorf("failed to create replication manager: %w", err) } // Initialize conflict resolver dc.conflictResolver, err = NewConflictResolver(dc.dht, dc.config) if err != nil { return fmt.Errorf("failed to create conflict resolver: %w", err) } // Initialize gossip protocol dc.gossipProtocol, err = NewGossipProtocol(dc.dht, dc.config) if err != nil { return fmt.Errorf("failed to create gossip protocol: %w", err) } // Initialize network manager dc.networkMgr, err = NewNetworkManager(dc.dht, dc.config) if err != nil { return fmt.Errorf("failed to create network manager: %w", err) } return nil } func (dc *DistributionCoordinator) initializeRoleFilters() { // Initialize role filters based on configuration roles := []string{"senior_architect", "project_manager", "devops_engineer", "backend_developer", "frontend_developer"} for _, role := range roles { dc.roleFilters[role] = &RoleFilter{ RoleID: role, AccessLevel: dc.getAccessLevelForRole(role), AllowedCompartments: dc.getAllowedCompartments(role), FilterRules: dc.getDefaultFilterRules(role), LastUpdated: time.Now(), } } } func (dc *DistributionCoordinator) initializeHealthMonitors() { components := map[string]ComponentType{ "dht": ComponentTypeDHT, "replication": ComponentTypeReplication, "gossip": ComponentTypeGossip, "network": ComponentTypeNetwork, "conflict_resolver": ComponentTypeConflictResolver, } for componentID, componentType := range components { dc.healthMonitors[componentID] = &HealthMonitor{ ComponentID: componentID, ComponentType: componentType, Status: HealthHealthy, LastHealthCheck: time.Now(), HealthScore: 1.0, Metrics: make(map[string]interface{}), AlertThresholds: &AlertThresholds{ WarningThreshold: 0.8, CriticalThreshold: 0.5, RecoveryThreshold: 0.9, }, } } } func (dc *DistributionCoordinator) applyRoleFilters(roles []string, node *slurpContext.ContextNode) []string { filtered := []string{} for _, role := range roles { if filter, exists := dc.roleFilters[role]; exists { if dc.passesFilter(filter, node) { filtered = append(filtered, role) } } else { // No filter defined, allow by default filtered = append(filtered, role) } } return filtered } func (dc *DistributionCoordinator) passesFilter(filter *RoleFilter, node *slurpContext.ContextNode) bool { // Apply filter rules for _, rule := range filter.FilterRules { if dc.ruleMatches(rule, node) { switch rule.Action { case FilterActionDeny: return false case FilterActionAllow: return true } } } return true // Default allow if no rules match } func (dc *DistributionCoordinator) ruleMatches(rule *FilterRule, node *slurpContext.ContextNode) bool { switch rule.RuleType { case FilterRuleTypeTag: for _, tag := range node.Tags { if tag == rule.Pattern { return true } } case FilterRuleTypePath: return node.Path == rule.Pattern case FilterRuleTypeTechnology: for _, tech := range node.Technologies { if tech == rule.Pattern { return true } } } return false } func (dc *DistributionCoordinator) executeDistribution(ctx context.Context, request *DistributionRequest) (*DistributionResult, error) { start := time.Now() result := &DistributionResult{ RequestID: request.RequestID, Success: false, DistributedNodes: []string{}, ProcessingTime: 0, Errors: []string{}, CompletedAt: time.Now(), } // Execute distribution via distributor if err := dc.distributor.DistributeContext(ctx, request.ContextNode, request.TargetRoles); err != nil { result.Errors = append(result.Errors, err.Error()) return result, err } result.Success = true result.ProcessingTime = time.Since(start) result.ReplicationFactor = request.Options.ReplicationFactor return result, nil } // Placeholder implementations for supporting types and methods // ClusterHealth represents overall cluster health type ClusterHealth struct { OverallStatus HealthStatus `json:"overall_status"` NodeCount int `json:"node_count"` HealthyNodes int `json:"healthy_nodes"` UnhealthyNodes int `json:"unhealthy_nodes"` ComponentHealth map[string]*ComponentHealth `json:"component_health"` LastUpdated time.Time `json:"last_updated"` Alerts []string `json:"alerts"` Recommendations []string `json:"recommendations"` } // ComponentHealth represents individual component health type ComponentHealth struct { ComponentType ComponentType `json:"component_type"` Status HealthStatus `json:"status"` HealthScore float64 `json:"health_score"` LastCheck time.Time `json:"last_check"` Metrics map[string]interface{} `json:"metrics"` } // Placeholder methods - these would have full implementations func (dc *DistributionCoordinator) generateRequestID() string { return fmt.Sprintf("req-%s-%d", dc.config.Agent.ID, time.Now().UnixNano()) } func (dc *DistributionCoordinator) generateTaskID() string { return fmt.Sprintf("task-%s-%d", dc.config.Agent.ID, time.Now().UnixNano()) } func (dc *DistributionCoordinator) getDefaultDistributionOptions() *DistributionOptions { return &DistributionOptions{ ReplicationFactor: 3, ConsistencyLevel: ConsistencyEventual, EncryptionLevel: crypto.AccessLevel(slurpContext.AccessMedium), ConflictResolution: ResolutionMerged, } } func (dc *DistributionCoordinator) getAccessLevelForRole(role string) crypto.AccessLevel { // Placeholder implementation return crypto.AccessLevel(slurpContext.AccessMedium) } func (dc *DistributionCoordinator) getAllowedCompartments(role string) []string { // Placeholder implementation return []string{"general"} } func (dc *DistributionCoordinator) getDefaultFilterRules(role string) []*FilterRule { // Placeholder implementation return []*FilterRule{} } func (dc *DistributionCoordinator) submitTask(task *CoordinationTask) error { select { case dc.coordinationTasks <- task: return nil default: return fmt.Errorf("coordination task queue is full") } } func (dc *DistributionCoordinator) processCoordinationTask(task *CoordinationTask) { // Placeholder implementation } func (dc *DistributionCoordinator) executeDistributionRequest(ctx context.Context, request *DistributionRequest) (*DistributionResult, error) { return dc.executeDistribution(ctx, request) } func (dc *DistributionCoordinator) performHealthChecks(ctx context.Context) { // Placeholder implementation } func (dc *DistributionCoordinator) checkLeadershipStatus() { // Placeholder implementation } func (dc *DistributionCoordinator) collectMetrics() { // Placeholder implementation } func (dc *DistributionCoordinator) calculateOverallHealth() HealthStatus { // Placeholder implementation return HealthHealthy } func (dc *DistributionCoordinator) updatePerformanceMetrics() { // Placeholder implementation } func (dc *DistributionCoordinator) priorityFromSeverity(severity ConflictSeverity) Priority { switch severity { case ConflictSeverityCritical: return PriorityCritical case ConflictSeverityHigh: return PriorityHigh case ConflictSeverityMedium: return PriorityNormal default: return PriorityLow } }