package leader

import (
	"context"
	"fmt"
	"math/rand"
	"sort"
	"sync"
	"time"

	"chorus/pkg/dht"
	"chorus/pkg/election"
	"chorus/pkg/slurp" // provides ContextPersister; this import was missing in the original
	slurpContext "chorus/pkg/slurp/context"
	"chorus/pkg/slurp/intelligence"
	"chorus/pkg/slurp/storage"
)

// ContextManager handles leader-only context generation duties.
//
// This is the primary interface for managing contextual intelligence
// operations that require cluster-wide coordination and can only be
// performed by the elected leader node.
type ContextManager interface {
	// RequestContextGeneration queues a context generation request.
	// Only the leader processes these requests, to prevent conflicts.
	RequestContextGeneration(req *ContextGenerationRequest) error

	// RequestFromLeader allows non-leader nodes to request context from the leader.
	RequestFromLeader(req *ContextGenerationRequest) (*ContextGenerationResult, error)

	// GetGenerationStatus returns the status of context generation operations.
	GetGenerationStatus() (*GenerationStatus, error)

	// GetQueueStatus returns the status of the generation queue.
	GetQueueStatus() (*QueueStatus, error)

	// CancelGeneration cancels a pending or active generation task.
	CancelGeneration(taskID string) error

	// PrioritizeGeneration changes the priority of a queued generation task.
	PrioritizeGeneration(taskID string, priority Priority) error

	// IsLeader returns whether this node is the current leader.
	IsLeader() bool

	// WaitForLeadership blocks until this node becomes leader.
	WaitForLeadership(ctx context.Context) error

	// GetLeaderInfo returns information about the current leader.
	GetLeaderInfo() (*LeaderInfo, error)

	// TransferLeadership initiates a graceful leadership transfer.
	TransferLeadership(ctx context.Context, targetNodeID string) error

	// GetManagerStats returns manager performance statistics.
	GetManagerStats() (*ManagerStatistics, error)
}

// GenerationCoordinator coordinates context generation across the cluster.
//
// Manages the distribution and coordination of context generation tasks,
// ensuring efficient resource utilization and preventing duplicate work.
type GenerationCoordinator interface {
	// CoordinateGeneration coordinates generation of context across the cluster.
	CoordinateGeneration(ctx context.Context, req *ContextGenerationRequest) (*CoordinationResult, error)

	// DistributeGeneration distributes a generation task to an appropriate node.
	DistributeGeneration(ctx context.Context, task *GenerationTask) error

	// CollectGenerationResults collects results from distributed generation.
	CollectGenerationResults(ctx context.Context, taskID string) (*GenerationResults, error)

	// CheckGenerationStatus checks the status of distributed generation.
	CheckGenerationStatus(ctx context.Context, taskID string) (*TaskStatus, error)

	// RebalanceLoad rebalances generation load across cluster nodes.
	RebalanceLoad(ctx context.Context) (*RebalanceResult, error)

	// GetClusterCapacity returns current cluster generation capacity.
	GetClusterCapacity() (*ClusterCapacity, error)

	// SetGenerationPolicy configures the generation coordination policy.
	SetGenerationPolicy(policy *GenerationPolicy) error

	// GetCoordinationStats returns coordination performance statistics.
	GetCoordinationStats() (*CoordinationStatistics, error)
}
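// Example (sketch, not part of the API surface): submitting a request through
// ContextManager and falling back to the leader when this node does not hold
// leadership. The field values are illustrative assumptions; see
// validateRequest below for the fields that are actually required.
//
//	req := &ContextGenerationRequest{
//		UCXLAddress: "ucxl://project/src/main.go",
//		FilePath:    "src/main.go",
//		Role:        "developer",
//	}
//	if err := mgr.RequestContextGeneration(req); errors.Is(err, ErrNotLeader) {
//		result, err := mgr.RequestFromLeader(req)
//		_ = result
//		_ = err
//	}
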
// QueueManager manages context generation request queues.
//
// Handles prioritization, scheduling, and lifecycle management of
// context generation requests, with support for different priority
// levels and fair resource allocation.
type QueueManager interface {
	// EnqueueRequest adds a request to the generation queue.
	EnqueueRequest(req *ContextGenerationRequest) error

	// DequeueRequest gets the next request from the queue.
	DequeueRequest() (*ContextGenerationRequest, error)

	// PeekQueue shows the next request without removing it.
	PeekQueue() (*ContextGenerationRequest, error)

	// UpdateRequestPriority changes the priority of a queued request.
	UpdateRequestPriority(requestID string, priority Priority) error

	// CancelRequest removes a request from the queue.
	CancelRequest(requestID string) error

	// GetQueueLength returns the current queue length.
	GetQueueLength() int

	// GetQueuedRequests returns all queued requests.
	GetQueuedRequests() ([]*ContextGenerationRequest, error)

	// ClearQueue removes all requests from the queue.
	ClearQueue() error

	// SetQueuePolicy configures the queue management policy.
	SetQueuePolicy(policy *QueuePolicy) error

	// GetQueueStats returns queue performance statistics.
	GetQueueStats() (*QueueStatistics, error)
}

// FailoverManager handles leader failover and state transfer.
//
// Ensures continuity of context generation operations during leadership
// changes, with minimal disruption and no loss of queued requests.
type FailoverManager interface {
	// PrepareFailover prepares current state for a potential failover.
	PrepareFailover(ctx context.Context) (*FailoverState, error)

	// ExecuteFailover executes a failover to become the new leader.
	ExecuteFailover(ctx context.Context, previousState *FailoverState) error

	// TransferState transfers leadership state to another node.
	TransferState(ctx context.Context, targetNodeID string) error

	// ReceiveState receives leadership state from the previous leader.
	ReceiveState(ctx context.Context, state *FailoverState) error

	// ValidateState validates received failover state.
	ValidateState(state *FailoverState) (*StateValidation, error)

	// RecoverFromFailover recovers operations after a failover.
	RecoverFromFailover(ctx context.Context) (*RecoveryResult, error)

	// GetFailoverHistory returns the history of failover events.
	GetFailoverHistory() ([]*FailoverEvent, error)

	// GetFailoverStats returns failover statistics.
	GetFailoverStats() (*FailoverStatistics, error)
}

// ClusterCoordinator manages cluster-wide context operations.
//
// Coordinates context-related operations across all nodes in the cluster,
// including synchronization, health monitoring, and resource management.
type ClusterCoordinator interface {
	// SynchronizeCluster synchronizes context state across the cluster.
	SynchronizeCluster(ctx context.Context) (*SyncResult, error)

	// GetClusterState returns the current cluster state.
	GetClusterState() (*ClusterState, error)

	// GetNodeHealth returns the health status of cluster nodes.
	GetNodeHealth() (map[string]*NodeHealth, error)

	// EvictNode removes an unresponsive node from cluster operations.
	EvictNode(ctx context.Context, nodeID string) error

	// AddNode adds a new node to cluster operations.
	AddNode(ctx context.Context, nodeID string, nodeInfo *NodeInfo) error

	// BroadcastMessage broadcasts a message to all cluster nodes.
	BroadcastMessage(ctx context.Context, message *ClusterMessage) error

	// GetClusterMetrics returns cluster performance metrics.
	GetClusterMetrics() (*ClusterMetrics, error)

	// ConfigureCluster configures cluster coordination parameters.
	ConfigureCluster(config *ClusterConfig) error
}
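// Example (sketch): one plausible failover handoff between an outgoing and an
// incoming leader using the FailoverManager methods above. Error handling is
// elided, and the ordering is an assumption for illustration, not a protocol
// documented in this file.
//
//	// On the node losing leadership:
//	state, _ := fm.PrepareFailover(ctx)
//	_ = fm.TransferState(ctx, newLeaderID)
//
//	// On the node gaining leadership:
//	_ = fm.ReceiveState(ctx, state)
//	if validation, _ := fm.ValidateState(state); validation != nil {
//		_ = fm.ExecuteFailover(ctx, state)
//	}
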
// HealthMonitor monitors cluster and context system health.
//
// Provides health monitoring for the distributed context system,
// including node health, queue health, and overall system status.
type HealthMonitor interface {
	// CheckHealth performs a comprehensive health check.
	CheckHealth(ctx context.Context) (*HealthStatus, error)

	// CheckNodeHealth checks the health of a specific node.
	CheckNodeHealth(ctx context.Context, nodeID string) (*NodeHealth, error)

	// CheckQueueHealth checks the health of the generation queue.
	CheckQueueHealth() (*QueueHealth, error)

	// CheckLeaderHealth checks the health of the leader node.
	CheckLeaderHealth() (*LeaderHealth, error)

	// GetHealthMetrics returns health monitoring metrics.
	GetHealthMetrics() (*HealthMetrics, error)

	// SetHealthPolicy configures the health monitoring policy.
	SetHealthPolicy(policy *HealthPolicy) error

	// GetHealthHistory returns the history of health events.
	GetHealthHistory(timeRange time.Duration) ([]*HealthEvent, error)

	// SubscribeToHealthEvents subscribes to health event notifications.
	SubscribeToHealthEvents(handler HealthEventHandler) error
}

// ResourceManager manages resource allocation for context operations.
type ResourceManager interface {
	// AllocateResources allocates resources for context generation.
	AllocateResources(req *ResourceRequest) (*ResourceAllocation, error)

	// ReleaseResources releases allocated resources.
	ReleaseResources(allocationID string) error

	// GetAvailableResources returns currently available resources.
	GetAvailableResources() (*AvailableResources, error)

	// SetResourceLimits configures resource usage limits.
	SetResourceLimits(limits *ResourceLimits) error

	// GetResourceUsage returns current resource usage statistics.
	GetResourceUsage() (*ResourceUsage, error)

	// RebalanceResources rebalances resources across operations.
	RebalanceResources(ctx context.Context) (*ResourceRebalanceResult, error)
}

// LeaderContextManager is the concrete implementation of context management.
type LeaderContextManager struct {
	mu              sync.RWMutex
	isLeader        bool
	election        election.Election
	dht             dht.DHT
	intelligence    intelligence.IntelligenceEngine
	storage         storage.ContextStore
	contextResolver slurpContext.ContextResolver
	contextUpserter slurp.ContextPersister

	// Context generation state
	generationQueue chan *ContextGenerationRequest
	activeJobs      map[string]*ContextGenerationJob
	completedJobs   map[string]*ContextGenerationJob

	// Coordination components
	coordinator     GenerationCoordinator
	queueManager    QueueManager
	failoverManager FailoverManager
	clusterCoord    ClusterCoordinator
	healthMonitor   HealthMonitor
	resourceManager ResourceManager

	// Configuration
	config *ManagerConfig

	// Statistics
	stats *ManagerStatistics

	// Shutdown coordination
	shutdownChan chan struct{}
	shutdownOnce sync.Once
}
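// Example (sketch): pairing an allocation with its release via ResourceManager.
// The empty ResourceRequest literal and the AllocationID field name are
// assumptions for illustration; only the allocate/release pairing is implied
// by the interface above.
//
//	alloc, err := rm.AllocateResources(&ResourceRequest{ /* ... */ })
//	if err != nil {
//		return err
//	}
//	defer rm.ReleaseResources(alloc.AllocationID)
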
// SetContextPersister registers the SLURP persistence hook (Roadmap: SEC-SLURP 1.1).
func (cm *LeaderContextManager) SetContextPersister(persister slurp.ContextPersister) {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	cm.contextUpserter = persister
}

// NewContextManager creates a new leader context manager.
func NewContextManager(
	election election.Election,
	dht dht.DHT,
	intelligence intelligence.IntelligenceEngine,
	storage storage.ContextStore,
	resolver slurpContext.ContextResolver,
) *LeaderContextManager {
	cm := &LeaderContextManager{
		election:        election,
		dht:             dht,
		intelligence:    intelligence,
		storage:         storage,
		contextResolver: resolver,
		generationQueue: make(chan *ContextGenerationRequest, 1000),
		activeJobs:      make(map[string]*ContextGenerationJob),
		completedJobs:   make(map[string]*ContextGenerationJob),
		shutdownChan:    make(chan struct{}),
		config:          DefaultManagerConfig(),
		stats:           &ManagerStatistics{},
	}

	// Initialize coordination components.
	cm.coordinator = NewGenerationCoordinator(cm)
	cm.queueManager = NewQueueManager(cm)
	cm.failoverManager = NewFailoverManager(cm)
	cm.clusterCoord = NewClusterCoordinator(cm)
	cm.healthMonitor = NewHealthMonitor(cm)
	cm.resourceManager = NewResourceManager(cm)

	// Start background processes.
	go cm.watchLeadershipChanges()
	go cm.processContextGeneration()
	go cm.monitorHealth()
	go cm.syncCluster()

	return cm
}

// RequestContextGeneration queues a context generation request.
func (cm *LeaderContextManager) RequestContextGeneration(req *ContextGenerationRequest) error {
	if !cm.IsLeader() {
		return ErrNotLeader
	}

	// Validate the request.
	if err := cm.validateRequest(req); err != nil {
		return err
	}

	// Reject duplicates of work that is already in flight.
	if cm.isDuplicate(req) {
		return ErrDuplicateRequest
	}

	// Enqueue the request. Stats counters are guarded by cm.mu because they
	// are read concurrently by the status and stats accessors.
	select {
	case cm.generationQueue <- req:
		cm.mu.Lock()
		cm.stats.TotalRequests++
		cm.mu.Unlock()
		return nil
	default:
		cm.mu.Lock()
		cm.stats.DroppedRequests++
		cm.mu.Unlock()
		return ErrQueueFull
	}
}

// IsLeader returns whether this node is the current leader.
func (cm *LeaderContextManager) IsLeader() bool {
	cm.mu.RLock()
	defer cm.mu.RUnlock()
	return cm.isLeader
}

// GetGenerationStatus returns the status of context generation operations.
func (cm *LeaderContextManager) GetGenerationStatus() (*GenerationStatus, error) {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	status := &GenerationStatus{
		ActiveTasks:    len(cm.activeJobs),
		QueuedTasks:    len(cm.generationQueue),
		CompletedTasks: len(cm.completedJobs),
		IsLeader:       cm.isLeader,
		LastUpdate:     time.Now(),
	}

	// Estimate completion time from the average job duration.
	if status.ActiveTasks > 0 || status.QueuedTasks > 0 {
		avgJobTime := cm.calculateAverageJobTime()
		totalRemaining := time.Duration(status.ActiveTasks+status.QueuedTasks) * avgJobTime
		status.EstimatedCompletion = time.Now().Add(totalRemaining)
	}

	return status, nil
}

// watchLeadershipChanges monitors leadership changes.
func (cm *LeaderContextManager) watchLeadershipChanges() {
	for {
		select {
		case <-cm.shutdownChan:
			return
		default:
			// Check leadership status.
			newIsLeader := cm.election.IsLeader()

			cm.mu.Lock()
			oldIsLeader := cm.isLeader
			cm.isLeader = newIsLeader
			cm.mu.Unlock()

			// Handle a leadership change.
			if oldIsLeader != newIsLeader {
				if newIsLeader {
					cm.onBecomeLeader()
				} else {
					cm.onLoseLeadership()
				}
			}

			// Sleep before the next check.
			time.Sleep(cm.config.LeadershipCheckInterval)
		}
	}
}

// processContextGeneration processes context generation requests.
func (cm *LeaderContextManager) processContextGeneration() {
	for {
		select {
		case req := <-cm.generationQueue:
			if cm.IsLeader() {
				go cm.handleGenerationRequest(req)
			} else {
				// No longer the leader: requeue or forward to the leader.
				cm.handleNonLeaderRequest(req)
			}
		case <-cm.shutdownChan:
			return
		}
	}
}
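// Example (sketch): constructing a manager at startup. The dependency values
// (electionSvc, dhtNode, engine, store, resolver) are assumed to come from the
// caller's wiring; background goroutines start immediately, and this file
// exposes no shutdown method.
//
//	mgr := NewContextManager(electionSvc, dhtNode, engine, store, resolver)
//	if err := mgr.RequestContextGeneration(req); err != nil {
//		// e.g. ErrNotLeader on a follower node, ErrQueueFull under load
//	}
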
// handleGenerationRequest handles a single context generation request.
func (cm *LeaderContextManager) handleGenerationRequest(req *ContextGenerationRequest) {
	job := &ContextGenerationJob{
		ID:        generateJobID(),
		Request:   req,
		Status:    JobStatusRunning,
		StartedAt: time.Now(),
	}

	cm.mu.Lock()
	cm.activeJobs[job.ID] = job
	cm.mu.Unlock()

	defer func() {
		cm.mu.Lock()
		delete(cm.activeJobs, job.ID)
		cm.completedJobs[job.ID] = job
		cm.mu.Unlock()

		// Clean up old completed jobs.
		cm.cleanupCompletedJobs()
	}()

	// Generate context using the intelligence engine.
	contextNode, err := cm.intelligence.AnalyzeFile(
		context.Background(),
		req.FilePath,
		req.Role,
	)

	completedAt := time.Now()
	job.CompletedAt = &completedAt

	if err != nil {
		job.Status = JobStatusFailed
		job.Error = err
		cm.mu.Lock()
		cm.stats.FailedJobs++
		cm.mu.Unlock()
	} else {
		job.Status = JobStatusCompleted
		job.Result = contextNode
		cm.mu.Lock()
		cm.stats.CompletedJobs++
		cm.mu.Unlock()

		// Store the generated context (SEC-SLURP 1.1 persistence bridge).
		if cm.contextUpserter != nil {
			if _, persistErr := cm.contextUpserter.UpsertContext(context.Background(), contextNode); persistErr != nil {
				// TODO(SEC-SLURP 1.1): surface persistence errors via structured logging/telemetry.
			}
		} else if cm.storage != nil {
			if err := cm.storage.StoreContext(context.Background(), contextNode, []string{req.Role}); err != nil {
				// TODO: add proper logging when falling back to the legacy storage path.
			}
		}
	}
}

// Helper methods

func (cm *LeaderContextManager) validateRequest(req *ContextGenerationRequest) error {
	if req == nil {
		return ErrInvalidRequest
	}
	if req.UCXLAddress == "" {
		return ErrMissingUCXLAddress
	}
	if req.FilePath == "" {
		return ErrMissingFilePath
	}
	if req.Role == "" {
		return ErrMissingRole
	}
	return nil
}

// isDuplicate reports whether an active job already covers the same
// address/role pair. It takes the read lock because its callers do not.
func (cm *LeaderContextManager) isDuplicate(req *ContextGenerationRequest) bool {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	for _, job := range cm.activeJobs {
		if job.Request.UCXLAddress == req.UCXLAddress && job.Request.Role == req.Role {
			return true
		}
	}
	return false
}

// calculateAverageJobTime averages the durations of completed jobs.
// Callers must hold cm.mu (read or write).
func (cm *LeaderContextManager) calculateAverageJobTime() time.Duration {
	if len(cm.completedJobs) == 0 {
		return time.Minute // Default estimate.
	}

	var totalTime time.Duration
	count := 0
	for _, job := range cm.completedJobs {
		if job.CompletedAt != nil {
			totalTime += job.CompletedAt.Sub(job.StartedAt)
			count++
		}
	}

	if count == 0 {
		return time.Minute
	}
	return totalTime / time.Duration(count)
}

// calculateAverageWaitTime estimates the average wait time for queued
// requests. Callers must hold cm.mu.
func (cm *LeaderContextManager) calculateAverageWaitTime() time.Duration {
	// TODO: track actual wait times per request. For now, estimate from the
	// queue length and processing capacity.
	queueLength := len(cm.generationQueue)
	if queueLength == 0 {
		return 0
	}

	avgJobTime := cm.calculateAverageJobTime()
	concurrency := cm.config.MaxConcurrentJobs
	if concurrency <= 0 {
		concurrency = 1
	}

	// Ceiling division: a partially filled batch still costs one job time.
	batches := (queueLength + concurrency - 1) / concurrency
	return time.Duration(batches) * avgJobTime
}

// GetQueueStatus returns the status of the generation queue.
func (cm *LeaderContextManager) GetQueueStatus() (*QueueStatus, error) {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	status := &QueueStatus{
		QueueLength:          len(cm.generationQueue),
		MaxQueueSize:         cm.config.QueueSize,
		QueuedRequests:       []*ContextGenerationRequest{},
		PriorityDistribution: make(map[Priority]int),
		AverageWaitTime:      cm.calculateAverageWaitTime(),
	}

	// TODO: track enqueue timestamps so the oldest-request age is real. A
	// buffered channel cannot be peeked, so "now" is reported as a placeholder.
	if len(cm.generationQueue) > 0 {
		oldest := time.Now()
		status.OldestRequest = &oldest
	}

	return status, nil
}
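// Worked example for the estimate above: with 25 queued requests,
// MaxConcurrentJobs of 10, and an average job time of 2m, the queue drains in
// ceil(25/10) = 3 batches, so the estimated wait is about 6m. The numbers are
// illustrative only.
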
// CancelGeneration cancels a pending or active generation task.
func (cm *LeaderContextManager) CancelGeneration(taskID string) error {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	// Check whether the task is active.
	if job, exists := cm.activeJobs[taskID]; exists {
		job.Status = JobStatusCancelled
		job.Error = fmt.Errorf("task cancelled by user")
		completedAt := time.Now()
		job.CompletedAt = &completedAt

		delete(cm.activeJobs, taskID)
		cm.completedJobs[taskID] = job
		cm.stats.CancelledJobs++
		return nil
	}

	// TODO: remove the task from the queue if it is still pending.
	return fmt.Errorf("task %s not found", taskID)
}

// PrioritizeGeneration changes the priority of a queued generation task.
func (cm *LeaderContextManager) PrioritizeGeneration(taskID string, priority Priority) error {
	// TODO: implement priority changes for queued tasks.
	return fmt.Errorf("priority change not implemented")
}

// GetManagerStats returns manager performance statistics.
func (cm *LeaderContextManager) GetManagerStats() (*ManagerStatistics, error) {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	stats := *cm.stats // Copy current stats.
	stats.AverageJobTime = cm.calculateAverageJobTime()
	// TODO: track a true high-water mark; this reports the current length.
	stats.HighestQueueLength = len(cm.generationQueue)

	return &stats, nil
}

func (cm *LeaderContextManager) onBecomeLeader() {
	// Initialize leader-specific state.
	cm.mu.Lock()
	cm.stats.LeadershipChanges++
	cm.stats.LastBecameLeader = time.Now()
	cm.mu.Unlock()

	// Recover any pending state from the previous leader.
	if _, err := cm.failoverManager.RecoverFromFailover(context.Background()); err != nil {
		// Log the error but continue; we are the leader now.
		// TODO: add proper logging.
	}
}

func (cm *LeaderContextManager) onLoseLeadership() {
	// Prepare state for transfer to the next leader.
	if state, err := cm.failoverManager.PrepareFailover(context.Background()); err == nil {
		// TODO: send the state to the new leader.
		_ = state
	}

	cm.mu.Lock()
	cm.stats.LastLostLeadership = time.Now()
	cm.mu.Unlock()
}

func (cm *LeaderContextManager) handleNonLeaderRequest(req *ContextGenerationRequest) {
	// Forward the request to the current leader, or queue it for later.
	// TODO: implement leader forwarding.
}

func (cm *LeaderContextManager) monitorHealth() {
	ticker := time.NewTicker(cm.config.HealthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if _, err := cm.healthMonitor.CheckHealth(context.Background()); err != nil {
				// TODO: handle health issues.
			}
		case <-cm.shutdownChan:
			return
		}
	}
}

func (cm *LeaderContextManager) syncCluster() {
	ticker := time.NewTicker(cm.config.ClusterSyncInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if cm.IsLeader() {
				if _, err := cm.clusterCoord.SynchronizeCluster(context.Background()); err != nil {
					// TODO: handle sync errors.
				}
			}
		case <-cm.shutdownChan:
			return
		}
	}
}

func (cm *LeaderContextManager) cleanupCompletedJobs() {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	if len(cm.completedJobs) <= cm.config.MaxCompletedJobs {
		return
	}

	// Remove the oldest completed jobs, ordered by completion time.
	type jobWithTime struct {
		id   string
		job  *ContextGenerationJob
		time time.Time
	}

	var jobs []jobWithTime
	for id, job := range cm.completedJobs {
		completedAt := time.Now()
		if job.CompletedAt != nil {
			completedAt = *job.CompletedAt
		}
		jobs = append(jobs, jobWithTime{id: id, job: job, time: completedAt})
	}

	// Sort by completion time (oldest first).
	sort.Slice(jobs, func(i, j int) bool {
		return jobs[i].time.Before(jobs[j].time)
	})

	// Remove the oldest jobs to get back under the limit.
	toRemove := len(jobs) - cm.config.MaxCompletedJobs
	for i := 0; i < toRemove; i++ {
		delete(cm.completedJobs, jobs[i].id)
	}
}

// generateJobID builds a job ID from a Unix timestamp and a 24-bit random
// suffix, e.g. "ctx-job-1718000000-3e7f2a".
func generateJobID() string {
	timestamp := time.Now().Unix()
	random := rand.Int63()
	return fmt.Sprintf("ctx-job-%d-%x", timestamp, random&0xFFFFFF)
}
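// Example (sketch): polling generation status and cancelling a task. The
// taskID is assumed to come from the caller's own bookkeeping; this file does
// not expose job IDs through the status API.
//
//	status, _ := mgr.GetGenerationStatus()
//	if status.ActiveTasks > 0 {
//		_ = mgr.CancelGeneration(taskID)
//	}
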
// Error definitions.
var (
	ErrNotLeader          = &LeaderError{Code: "NOT_LEADER", Message: "Node is not the leader"}
	ErrQueueFull          = &LeaderError{Code: "QUEUE_FULL", Message: "Generation queue is full"}
	ErrDuplicateRequest   = &LeaderError{Code: "DUPLICATE_REQUEST", Message: "Duplicate generation request"}
	ErrInvalidRequest     = &LeaderError{Code: "INVALID_REQUEST", Message: "Invalid generation request"}
	ErrMissingUCXLAddress = &LeaderError{Code: "MISSING_UCXL_ADDRESS", Message: "Missing UCXL address"}
	ErrMissingFilePath    = &LeaderError{Code: "MISSING_FILE_PATH", Message: "Missing file path"}
	ErrMissingRole        = &LeaderError{Code: "MISSING_ROLE", Message: "Missing role"}
)

// LeaderError represents errors specific to leader operations.
type LeaderError struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

func (e *LeaderError) Error() string {
	return e.Message
}

// DefaultManagerConfig returns the default manager configuration.
func DefaultManagerConfig() *ManagerConfig {
	return &ManagerConfig{
		LeadershipCheckInterval: 5 * time.Second,
		HealthCheckInterval:     30 * time.Second,
		ClusterSyncInterval:     60 * time.Second,
		MaxCompletedJobs:        1000,
		QueueSize:               10000,
		MaxConcurrentJobs:       10,
		JobTimeout:              10 * time.Minute,
	}
}
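// Example (sketch): adjusting defaults for a smaller deployment. Note that
// NewContextManager currently installs DefaultManagerConfig() itself, so
// applying a custom config would need an additional setter or constructor
// argument not present in this file.
//
//	cfg := DefaultManagerConfig()
//	cfg.MaxConcurrentJobs = 4
//	cfg.QueueSize = 2000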