package leader

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sort"
	"sync"
	"time"

	"chorus/pkg/dht"
	"chorus/pkg/election"
	slurpContext "chorus/pkg/slurp/context"
	"chorus/pkg/slurp/intelligence"
	"chorus/pkg/slurp/storage"
)

// ContextManager handles leader-only context generation duties
//
// This is the primary interface for managing contextual intelligence
// operations that require cluster-wide coordination and can only be
// performed by the elected leader node.
type ContextManager interface {
	// RequestContextGeneration queues a context generation request.
	// Only the leader processes these requests to prevent conflicts.
	RequestContextGeneration(req *ContextGenerationRequest) error

	// RequestFromLeader allows non-leader nodes to request context from the leader
	RequestFromLeader(req *ContextGenerationRequest) (*ContextGenerationResult, error)

	// GetGenerationStatus returns status of context generation operations
	GetGenerationStatus() (*GenerationStatus, error)

	// GetQueueStatus returns status of the generation queue
	GetQueueStatus() (*QueueStatus, error)

	// CancelGeneration cancels a pending or active generation task
	CancelGeneration(taskID string) error

	// PrioritizeGeneration changes the priority of a queued generation task
	PrioritizeGeneration(taskID string, priority Priority) error

	// IsLeader returns whether this node is the current leader
	IsLeader() bool

	// WaitForLeadership blocks until this node becomes leader
	WaitForLeadership(ctx context.Context) error

	// GetLeaderInfo returns information about the current leader
	GetLeaderInfo() (*LeaderInfo, error)

	// TransferLeadership initiates a graceful leadership transfer
	TransferLeadership(ctx context.Context, targetNodeID string) error

	// GetManagerStats returns manager performance statistics
	GetManagerStats() (*ManagerStatistics, error)
}
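
// exampleSubmitRequest is a hedged usage sketch, not part of the production
// API: it shows how a caller might drive ContextManager and react to the
// sentinel errors defined at the bottom of this file. The literal field
// values are placeholders; only UCXLAddress, FilePath, and Role are assumed,
// since those are the fields validateRequest checks.
func exampleSubmitRequest(cm ContextManager) error {
	req := &ContextGenerationRequest{
		UCXLAddress: "ucxl://example/path", // placeholder address
		FilePath:    "pkg/example/file.go", // placeholder path
		Role:        "developer",           // placeholder role
	}

	err := cm.RequestContextGeneration(req)
	switch err {
	case nil:
		return nil
	case ErrNotLeader:
		// Not the leader: ask the leader to generate on our behalf.
		_, ferr := cm.RequestFromLeader(req)
		return ferr
	case ErrQueueFull:
		// Back off and retry later; the retry policy is caller-defined.
		return err
	default:
		return err
	}
}
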
// GenerationCoordinator coordinates context generation across the cluster
//
// Manages the distribution and coordination of context generation tasks,
// ensuring efficient resource utilization and preventing duplicate work.
type GenerationCoordinator interface {
	// CoordinateGeneration coordinates generation of context across the cluster
	CoordinateGeneration(ctx context.Context, req *ContextGenerationRequest) (*CoordinationResult, error)

	// DistributeGeneration distributes a generation task to an appropriate node
	DistributeGeneration(ctx context.Context, task *GenerationTask) error

	// CollectGenerationResults collects results from distributed generation
	CollectGenerationResults(ctx context.Context, taskID string) (*GenerationResults, error)

	// CheckGenerationStatus checks status of distributed generation
	CheckGenerationStatus(ctx context.Context, taskID string) (*TaskStatus, error)

	// RebalanceLoad rebalances generation load across cluster nodes
	RebalanceLoad(ctx context.Context) (*RebalanceResult, error)

	// GetClusterCapacity returns current cluster generation capacity
	GetClusterCapacity() (*ClusterCapacity, error)

	// SetGenerationPolicy configures the generation coordination policy
	SetGenerationPolicy(policy *GenerationPolicy) error

	// GetCoordinationStats returns coordination performance statistics
	GetCoordinationStats() (*CoordinationStatistics, error)
}
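
// collectResultsSketch is a hedged sketch of the check-then-collect flow the
// coordinator implies. TaskStatus's completion fields live in another file,
// so the status check here only verifies the task is known before collecting.
func collectResultsSketch(ctx context.Context, gc GenerationCoordinator, taskID string) (*GenerationResults, error) {
	if _, err := gc.CheckGenerationStatus(ctx, taskID); err != nil {
		return nil, fmt.Errorf("check status of %s: %w", taskID, err)
	}
	return gc.CollectGenerationResults(ctx, taskID)
}
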
// QueueManager manages context generation request queues
//
// Handles prioritization, scheduling, and lifecycle management of
// context generation requests with support for different priority
// levels and fair resource allocation.
type QueueManager interface {
	// EnqueueRequest adds a request to the generation queue
	EnqueueRequest(req *ContextGenerationRequest) error

	// DequeueRequest gets the next request from the queue
	DequeueRequest() (*ContextGenerationRequest, error)

	// PeekQueue shows the next request without removing it
	PeekQueue() (*ContextGenerationRequest, error)

	// UpdateRequestPriority changes the priority of a queued request
	UpdateRequestPriority(requestID string, priority Priority) error

	// CancelRequest removes a request from the queue
	CancelRequest(requestID string) error

	// GetQueueLength returns the current queue length
	GetQueueLength() int

	// GetQueuedRequests returns all queued requests
	GetQueuedRequests() ([]*ContextGenerationRequest, error)

	// ClearQueue removes all requests from the queue
	ClearQueue() error

	// SetQueuePolicy configures the queue management policy
	SetQueuePolicy(policy *QueuePolicy) error

	// GetQueueStats returns queue performance statistics
	GetQueueStats() (*QueueStatistics, error)
}
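
// drainQueueOnce is a hedged sketch of the intended QueueManager call
// pattern: peek first, dequeue only when a worker slot is free. The
// workerAvailable predicate is hypothetical, supplied by the caller.
func drainQueueOnce(qm QueueManager, workerAvailable func() bool) (*ContextGenerationRequest, error) {
	if _, err := qm.PeekQueue(); err != nil {
		return nil, err // nothing queued, or the queue is unavailable
	}
	if !workerAvailable() {
		return nil, nil // leave the request queued for the next pass
	}
	return qm.DequeueRequest()
}
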
// FailoverManager handles leader failover and state transfer
//
// Ensures continuity of context generation operations during leadership
// changes with minimal disruption and no loss of queued requests.
type FailoverManager interface {
	// PrepareFailover prepares current state for potential failover
	PrepareFailover(ctx context.Context) (*FailoverState, error)

	// ExecuteFailover executes failover to become the new leader
	ExecuteFailover(ctx context.Context, previousState *FailoverState) error

	// TransferState transfers leadership state to another node
	TransferState(ctx context.Context, targetNodeID string) error

	// ReceiveState receives leadership state from the previous leader
	ReceiveState(ctx context.Context, state *FailoverState) error

	// ValidateState validates received failover state
	ValidateState(state *FailoverState) (*StateValidation, error)

	// RecoverFromFailover recovers operations after failover
	RecoverFromFailover(ctx context.Context) (*RecoveryResult, error)

	// GetFailoverHistory returns the history of failover events
	GetFailoverHistory() ([]*FailoverEvent, error)

	// GetFailoverStats returns failover statistics
	GetFailoverStats() (*FailoverStatistics, error)
}
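
// handoverSketch is a hedged illustration of the failover sequence this
// interface implies: the outgoing leader snapshots state, the incoming
// leader validates it and then takes over. Error handling is minimal by design.
func handoverSketch(ctx context.Context, outgoing, incoming FailoverManager) error {
	state, err := outgoing.PrepareFailover(ctx)
	if err != nil {
		return fmt.Errorf("prepare failover: %w", err)
	}
	if _, err := incoming.ValidateState(state); err != nil {
		return fmt.Errorf("validate failover state: %w", err)
	}
	return incoming.ExecuteFailover(ctx, state)
}
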
// ClusterCoordinator manages cluster-wide context operations
//
// Coordinates context-related operations across all nodes in the cluster,
// including synchronization, health monitoring, and resource management.
type ClusterCoordinator interface {
	// SynchronizeCluster synchronizes context state across the cluster
	SynchronizeCluster(ctx context.Context) (*SyncResult, error)

	// GetClusterState returns the current cluster state
	GetClusterState() (*ClusterState, error)

	// GetNodeHealth returns the health status of cluster nodes
	GetNodeHealth() (map[string]*NodeHealth, error)

	// EvictNode removes an unresponsive node from cluster operations
	EvictNode(ctx context.Context, nodeID string) error

	// AddNode adds a new node to cluster operations
	AddNode(ctx context.Context, nodeID string, nodeInfo *NodeInfo) error

	// BroadcastMessage broadcasts a message to all cluster nodes
	BroadcastMessage(ctx context.Context, message *ClusterMessage) error

	// GetClusterMetrics returns cluster performance metrics
	GetClusterMetrics() (*ClusterMetrics, error)

	// ConfigureCluster configures cluster coordination parameters
	ConfigureCluster(config *ClusterConfig) error
}
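
// unhealthyNodesSketch is a hedged helper sketch over GetNodeHealth. The
// real health criterion depends on NodeHealth's fields (defined elsewhere);
// a nil entry is used here purely as a placeholder predicate.
func unhealthyNodesSketch(cc ClusterCoordinator) ([]string, error) {
	health, err := cc.GetNodeHealth()
	if err != nil {
		return nil, err
	}
	var suspect []string
	for nodeID, h := range health {
		if h == nil { // placeholder criterion; substitute real field checks
			suspect = append(suspect, nodeID)
		}
	}
	return suspect, nil
}
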
// HealthMonitor monitors cluster and context system health
//
// Provides health monitoring for the distributed context system,
// including node health, queue health, and overall system status.
type HealthMonitor interface {
	// CheckHealth performs a comprehensive health check
	CheckHealth(ctx context.Context) (*HealthStatus, error)

	// CheckNodeHealth checks the health of a specific node
	CheckNodeHealth(ctx context.Context, nodeID string) (*NodeHealth, error)

	// CheckQueueHealth checks the health of the generation queue
	CheckQueueHealth() (*QueueHealth, error)

	// CheckLeaderHealth checks the health of the leader node
	CheckLeaderHealth() (*LeaderHealth, error)

	// GetHealthMetrics returns health monitoring metrics
	GetHealthMetrics() (*HealthMetrics, error)

	// SetHealthPolicy configures the health monitoring policy
	SetHealthPolicy(policy *HealthPolicy) error

	// GetHealthHistory returns the history of health events
	GetHealthHistory(timeRange time.Duration) ([]*HealthEvent, error)

	// SubscribeToHealthEvents subscribes to health event notifications
	SubscribeToHealthEvents(handler HealthEventHandler) error
}
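
// pollHealth is a hedged sketch of a caller-side polling loop over
// HealthMonitor. The interval and the reaction to a failed check are
// assumptions, not part of this package's contract.
func pollHealth(ctx context.Context, hm HealthMonitor, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if _, err := hm.CheckHealth(ctx); err != nil {
				fmt.Printf("health check failed: %v\n", err) // escalation is caller-defined
			}
		}
	}
}
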
// ResourceManager manages resource allocation for context operations
type ResourceManager interface {
	// AllocateResources allocates resources for context generation
	AllocateResources(req *ResourceRequest) (*ResourceAllocation, error)

	// ReleaseResources releases allocated resources
	ReleaseResources(allocationID string) error

	// GetAvailableResources returns currently available resources
	GetAvailableResources() (*AvailableResources, error)

	// SetResourceLimits configures resource usage limits
	SetResourceLimits(limits *ResourceLimits) error

	// GetResourceUsage returns current resource usage statistics
	GetResourceUsage() (*ResourceUsage, error)

	// RebalanceResources rebalances resources across operations
	RebalanceResources(ctx context.Context) (*ResourceRebalanceResult, error)
}
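
// withResourcesSketch is a hedged sketch of the allocate/release discipline
// ResourceManager implies: release is deferred so a failure in fn cannot leak
// the allocation. ResourceAllocation's ID field is defined elsewhere, so an
// accessor is taken rather than assuming its name.
func withResourcesSketch(rm ResourceManager, req *ResourceRequest, allocationID func(*ResourceAllocation) string, fn func() error) error {
	alloc, err := rm.AllocateResources(req)
	if err != nil {
		return err
	}
	defer rm.ReleaseResources(allocationID(alloc))
	return fn()
}
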
// LeaderContextManager is the concrete implementation of context management
type LeaderContextManager struct {
	mu              sync.RWMutex
	isLeader        bool
	election        election.Election
	dht             dht.DHT
	intelligence    intelligence.IntelligenceEngine
	storage         storage.ContextStore
	contextResolver slurpContext.ContextResolver

	// Context generation state
	generationQueue chan *ContextGenerationRequest
	activeJobs      map[string]*ContextGenerationJob
	completedJobs   map[string]*ContextGenerationJob

	// Coordination components
	coordinator     GenerationCoordinator
	queueManager    QueueManager
	failoverManager FailoverManager
	clusterCoord    ClusterCoordinator
	healthMonitor   HealthMonitor
	resourceManager ResourceManager

	// Configuration
	config *ManagerConfig

	// Statistics
	stats *ManagerStatistics

	// Shutdown coordination
	shutdownChan chan struct{}
	shutdownOnce sync.Once
}

// NewContextManager creates a new leader context manager
func NewContextManager(
	election election.Election,
	dht dht.DHT,
	intelligence intelligence.IntelligenceEngine,
	storage storage.ContextStore,
	resolver slurpContext.ContextResolver,
) *LeaderContextManager {
	config := DefaultManagerConfig()

	cm := &LeaderContextManager{
		election:        election,
		dht:             dht,
		intelligence:    intelligence,
		storage:         storage,
		contextResolver: resolver,
		// Size the queue from config so the channel capacity and the
		// MaxQueueSize reported by GetQueueStatus agree.
		generationQueue: make(chan *ContextGenerationRequest, config.QueueSize),
		activeJobs:      make(map[string]*ContextGenerationJob),
		completedJobs:   make(map[string]*ContextGenerationJob),
		shutdownChan:    make(chan struct{}),
		config:          config,
		stats:           &ManagerStatistics{},
	}

	// Initialize coordination components
	cm.coordinator = NewGenerationCoordinator(cm)
	cm.queueManager = NewQueueManager(cm)
	cm.failoverManager = NewFailoverManager(cm)
	cm.clusterCoord = NewClusterCoordinator(cm)
	cm.healthMonitor = NewHealthMonitor(cm)
	cm.resourceManager = NewResourceManager(cm)

	// Start background processes
	go cm.watchLeadershipChanges()
	go cm.processContextGeneration()
	go cm.monitorHealth()
	go cm.syncCluster()

	return cm
}

// RequestContextGeneration queues a context generation request
func (cm *LeaderContextManager) RequestContextGeneration(req *ContextGenerationRequest) error {
	if !cm.IsLeader() {
		return ErrNotLeader
	}

	// Validate request
	if err := cm.validateRequest(req); err != nil {
		return err
	}

	// Check for duplicates
	if cm.isDuplicate(req) {
		return ErrDuplicateRequest
	}

	// Enqueue request; the stats counters are guarded by cm.mu because they
	// are read concurrently by GetManagerStats.
	select {
	case cm.generationQueue <- req:
		cm.mu.Lock()
		cm.stats.TotalRequests++
		cm.mu.Unlock()
		return nil
	default:
		cm.mu.Lock()
		cm.stats.DroppedRequests++
		cm.mu.Unlock()
		return ErrQueueFull
	}
}

// IsLeader returns whether this node is the current leader
func (cm *LeaderContextManager) IsLeader() bool {
	cm.mu.RLock()
	defer cm.mu.RUnlock()
	return cm.isLeader
}

// GetGenerationStatus returns status of context generation operations
func (cm *LeaderContextManager) GetGenerationStatus() (*GenerationStatus, error) {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	status := &GenerationStatus{
		ActiveTasks:    len(cm.activeJobs),
		QueuedTasks:    len(cm.generationQueue),
		CompletedTasks: len(cm.completedJobs),
		IsLeader:       cm.isLeader,
		LastUpdate:     time.Now(),
	}

	// Calculate estimated completion time
	if status.ActiveTasks > 0 || status.QueuedTasks > 0 {
		avgJobTime := cm.calculateAverageJobTime()
		totalRemaining := time.Duration(status.ActiveTasks+status.QueuedTasks) * avgJobTime
		status.EstimatedCompletion = time.Now().Add(totalRemaining)
	}

	return status, nil
}

// watchLeadershipChanges monitors leadership changes
func (cm *LeaderContextManager) watchLeadershipChanges() {
	ticker := time.NewTicker(cm.config.LeadershipCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-cm.shutdownChan:
			return
		case <-ticker.C:
			// Check leadership status
			newIsLeader := cm.election.IsLeader()

			cm.mu.Lock()
			oldIsLeader := cm.isLeader
			cm.isLeader = newIsLeader
			cm.mu.Unlock()

			// Handle leadership change
			if oldIsLeader != newIsLeader {
				if newIsLeader {
					cm.onBecomeLeader()
				} else {
					cm.onLoseLeadership()
				}
			}
		}
	}
}

// processContextGeneration processes context generation requests
func (cm *LeaderContextManager) processContextGeneration() {
	// Bound concurrency with a semaphore sized from config so a burst of
	// requests cannot spawn an unbounded number of generation goroutines.
	sem := make(chan struct{}, cm.config.MaxConcurrentJobs)

	for {
		select {
		case req := <-cm.generationQueue:
			if cm.IsLeader() {
				sem <- struct{}{}
				go func(r *ContextGenerationRequest) {
					defer func() { <-sem }()
					cm.handleGenerationRequest(r)
				}(req)
			} else {
				// Not leader anymore, requeue or forward to leader
				cm.handleNonLeaderRequest(req)
			}
		case <-cm.shutdownChan:
			return
		}
	}
}

// handleGenerationRequest handles a single context generation request
func (cm *LeaderContextManager) handleGenerationRequest(req *ContextGenerationRequest) {
	job := &ContextGenerationJob{
		ID:        generateJobID(),
		Request:   req,
		Status:    JobStatusRunning,
		StartedAt: time.Now(),
	}

	cm.mu.Lock()
	cm.activeJobs[job.ID] = job
	cm.mu.Unlock()

	defer func() {
		cm.mu.Lock()
		delete(cm.activeJobs, job.ID)
		cm.completedJobs[job.ID] = job
		cm.mu.Unlock()

		// Clean up old completed jobs
		cm.cleanupCompletedJobs()
	}()

	// Generate context using the intelligence engine, bounded by the
	// configured job timeout so a stuck analysis cannot hold a worker forever.
	ctx, cancel := context.WithTimeout(context.Background(), cm.config.JobTimeout)
	defer cancel()

	contextNode, err := cm.intelligence.AnalyzeFile(ctx, req.FilePath, req.Role)

	// Record the outcome under the lock: the job is still visible to readers
	// of activeJobs until the deferred cleanup above runs.
	completedAt := time.Now()
	cm.mu.Lock()
	job.CompletedAt = &completedAt
	if err != nil {
		job.Status = JobStatusFailed
		job.Error = err
		cm.stats.FailedJobs++
	} else {
		job.Status = JobStatusCompleted
		job.Result = contextNode
		cm.stats.CompletedJobs++
	}
	cm.mu.Unlock()

	if err == nil {
		// Store generated context
		if err := cm.storage.StoreContext(ctx, contextNode, []string{req.Role}); err != nil {
			// Log storage error but don't fail the job
			// TODO: Add proper logging
		}
	}
}

// Helper methods

func (cm *LeaderContextManager) validateRequest(req *ContextGenerationRequest) error {
	if req == nil {
		return ErrInvalidRequest
	}
	if req.UCXLAddress == "" {
		return ErrMissingUCXLAddress
	}
	if req.FilePath == "" {
		return ErrMissingFilePath
	}
	if req.Role == "" {
		return ErrMissingRole
	}
	return nil
}

func (cm *LeaderContextManager) isDuplicate(req *ContextGenerationRequest) bool {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	// Check active jobs for a matching address/role pair
	for _, job := range cm.activeJobs {
		if job.Request.UCXLAddress == req.UCXLAddress && job.Request.Role == req.Role {
			return true
		}
	}
	return false
}

// calculateAverageJobTime returns the mean duration of completed jobs.
// Callers must hold cm.mu (read or write lock).
func (cm *LeaderContextManager) calculateAverageJobTime() time.Duration {
	if len(cm.completedJobs) == 0 {
		return time.Minute // Default estimate
	}

	var totalTime time.Duration
	count := 0

	for _, job := range cm.completedJobs {
		if job.CompletedAt != nil {
			totalTime += job.CompletedAt.Sub(job.StartedAt)
			count++
		}
	}

	if count == 0 {
		return time.Minute
	}

	return totalTime / time.Duration(count)
}

// calculateAverageWaitTime estimates the average wait time for queued
// requests. Callers must hold cm.mu (read or write lock).
func (cm *LeaderContextManager) calculateAverageWaitTime() time.Duration {
	// TODO: Track actual wait times for requests.
	// For now, estimate based on queue length and processing rate.
	queueLength := len(cm.generationQueue)
	if queueLength == 0 {
		return 0
	}

	avgJobTime := cm.calculateAverageJobTime()
	concurrency := cm.config.MaxConcurrentJobs
	if concurrency <= 0 {
		concurrency = 1
	}

	// Round up so a partially filled batch still counts as one wave of work.
	waves := (queueLength + concurrency - 1) / concurrency
	return time.Duration(waves) * avgJobTime
}

// GetQueueStatus returns status of the generation queue
func (cm *LeaderContextManager) GetQueueStatus() (*QueueStatus, error) {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	status := &QueueStatus{
		QueueLength:          len(cm.generationQueue),
		MaxQueueSize:         cm.config.QueueSize,
		QueuedRequests:       []*ContextGenerationRequest{},
		PriorityDistribution: make(map[Priority]int),
		AverageWaitTime:      cm.calculateAverageWaitTime(),
	}

	// The channel-backed queue cannot be peeked without draining it, so the
	// oldest-request timestamp is approximated with the current time.
	// TODO: Track enqueue timestamps for an accurate value.
	if len(cm.generationQueue) > 0 {
		oldest := time.Now()
		status.OldestRequest = &oldest
	}

	return status, nil
}

// CancelGeneration cancels a pending or active generation task.
//
// Note: marking an active job cancelled does not interrupt a generation that
// is already in flight; the worker goroutine overwrites the terminal status
// when it finishes. TODO: plumb context cancellation into handleGenerationRequest.
func (cm *LeaderContextManager) CancelGeneration(taskID string) error {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	// Check if task is active
	if job, exists := cm.activeJobs[taskID]; exists {
		job.Status = JobStatusCancelled
		job.Error = fmt.Errorf("task cancelled by user")
		completedAt := time.Now()
		job.CompletedAt = &completedAt

		delete(cm.activeJobs, taskID)
		cm.completedJobs[taskID] = job
		cm.stats.CancelledJobs++

		return nil
	}

	// TODO: Remove from queue if pending
	return fmt.Errorf("task %s not found", taskID)
}

// PrioritizeGeneration changes the priority of a queued generation task
func (cm *LeaderContextManager) PrioritizeGeneration(taskID string, priority Priority) error {
	// TODO: Implement priority change for queued tasks
	return fmt.Errorf("priority change not implemented")
}

// GetManagerStats returns manager performance statistics
func (cm *LeaderContextManager) GetManagerStats() (*ManagerStatistics, error) {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	stats := *cm.stats // Copy current stats
	stats.AverageJobTime = cm.calculateAverageJobTime()

	// Fold the current queue length into the reported high-water mark, since
	// the enqueue path does not yet track it.
	// TODO: Update the mark on enqueue instead.
	if l := len(cm.generationQueue); l > stats.HighestQueueLength {
		stats.HighestQueueLength = l
	}

	return &stats, nil
}

func (cm *LeaderContextManager) onBecomeLeader() {
	// Initialize leader-specific state
	cm.mu.Lock()
	cm.stats.LeadershipChanges++
	cm.stats.LastBecameLeader = time.Now()
	cm.mu.Unlock()

	// Recover any pending state from the previous leader
	if _, err := cm.failoverManager.RecoverFromFailover(context.Background()); err != nil {
		// Log error but continue - we're the leader now
		// TODO: Add proper logging
	}
}

func (cm *LeaderContextManager) onLoseLeadership() {
	// Prepare state for transfer to the next leader
	if state, err := cm.failoverManager.PrepareFailover(context.Background()); err == nil {
		// TODO: Send state to new leader
		_ = state
	}

	cm.mu.Lock()
	cm.stats.LastLostLeadership = time.Now()
	cm.mu.Unlock()
}

func (cm *LeaderContextManager) handleNonLeaderRequest(req *ContextGenerationRequest) {
	// Forward request to current leader or queue for later
	// TODO: Implement leader forwarding
}

func (cm *LeaderContextManager) monitorHealth() {
	ticker := time.NewTicker(cm.config.HealthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if _, err := cm.healthMonitor.CheckHealth(context.Background()); err != nil {
				// Handle health issues
				// TODO: Implement health issue handling
			}
		case <-cm.shutdownChan:
			return
		}
	}
}

func (cm *LeaderContextManager) syncCluster() {
	ticker := time.NewTicker(cm.config.ClusterSyncInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if cm.IsLeader() {
				if _, err := cm.clusterCoord.SynchronizeCluster(context.Background()); err != nil {
					// Handle sync errors
					// TODO: Implement sync error handling
				}
			}
		case <-cm.shutdownChan:
			return
		}
	}
}

func (cm *LeaderContextManager) cleanupCompletedJobs() {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	if len(cm.completedJobs) <= cm.config.MaxCompletedJobs {
		return
	}

	// Remove oldest completed jobs based on completion time
	type jobWithTime struct {
		id   string
		job  *ContextGenerationJob
		time time.Time
	}

	var jobs []jobWithTime
	for id, job := range cm.completedJobs {
		completedAt := time.Now()
		if job.CompletedAt != nil {
			completedAt = *job.CompletedAt
		}
		jobs = append(jobs, jobWithTime{id: id, job: job, time: completedAt})
	}

	// Sort by completion time (oldest first)
	sort.Slice(jobs, func(i, j int) bool {
		return jobs[i].time.Before(jobs[j].time)
	})

	// Remove oldest jobs to get back to limit
	toRemove := len(jobs) - cm.config.MaxCompletedJobs
	for i := 0; i < toRemove; i++ {
		delete(cm.completedJobs, jobs[i].id)
	}
}

// generateJobID returns a process-unique job ID built from a Unix timestamp
// and a 24-bit random suffix (not a true UUID; collisions are unlikely but possible).
func generateJobID() string {
	timestamp := time.Now().Unix()
	random := rand.Int63()
	return fmt.Sprintf("ctx-job-%d-%x", timestamp, random&0xFFFFFF)
}

// Error definitions
var (
	ErrNotLeader          = &LeaderError{Code: "NOT_LEADER", Message: "Node is not the leader"}
	ErrQueueFull          = &LeaderError{Code: "QUEUE_FULL", Message: "Generation queue is full"}
	ErrDuplicateRequest   = &LeaderError{Code: "DUPLICATE_REQUEST", Message: "Duplicate generation request"}
	ErrInvalidRequest     = &LeaderError{Code: "INVALID_REQUEST", Message: "Invalid generation request"}
	ErrMissingUCXLAddress = &LeaderError{Code: "MISSING_UCXL_ADDRESS", Message: "Missing UCXL address"}
	ErrMissingFilePath    = &LeaderError{Code: "MISSING_FILE_PATH", Message: "Missing file path"}
	ErrMissingRole        = &LeaderError{Code: "MISSING_ROLE", Message: "Missing role"}
)
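
// classifyLeaderError is a hedged sketch showing how callers can branch on
// the error codes above; errors.As is used so wrapped errors still match.
func classifyLeaderError(err error) (code string, ok bool) {
	var lerr *LeaderError
	if errors.As(err, &lerr) {
		return lerr.Code, true
	}
	return "", false
}
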
// LeaderError represents errors specific to leader operations
type LeaderError struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

func (e *LeaderError) Error() string {
	return e.Message
}

// DefaultManagerConfig returns the default manager configuration
func DefaultManagerConfig() *ManagerConfig {
	return &ManagerConfig{
		LeadershipCheckInterval: 5 * time.Second,
		HealthCheckInterval:     30 * time.Second,
		ClusterSyncInterval:     60 * time.Second,
		MaxCompletedJobs:        1000,
		QueueSize:               10000,
		MaxConcurrentJobs:       10,
		JobTimeout:              10 * time.Minute,
	}
}
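
// smallClusterConfig is a hedged example of overriding the defaults for a
// small deployment; the specific values are illustrative only, and nothing
// in this file validates the overrides.
func smallClusterConfig() *ManagerConfig {
	cfg := DefaultManagerConfig()
	cfg.MaxConcurrentJobs = 4
	cfg.QueueSize = 1000
	cfg.LeadershipCheckInterval = 2 * time.Second
	return cfg
}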