Commit 28f02b61d1
- Add scaling system components to server initialization
- Register scaling API and assignment broker routes
- Start bootstrap pool manager in server lifecycle
- Add graceful shutdown for scaling controller
- Update API routing to use chi.Router instead of gorilla/mux
- Fix Docker API compatibility issues
- Configure health gates with placeholder URLs for KACHING and BACKBEAT

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
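A minimal sketch of the chi routing change noted above (the router layout and the Routes() handler methods are assumptions for illustration, not taken from this commit):

	// Hypothetical wiring with github.com/go-chi/chi/v5
	r := chi.NewRouter()
	r.Route("/api/v1", func(r chi.Router) {
		r.Mount("/scaling", scalingAPI.Routes())           // scaling API routes
		r.Mount("/assignments", assignmentBroker.Routes()) // assignment broker routes
	})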
607 lines · 20 KiB · Go

package orchestrator

import (
	"context"
	"fmt"
	"math"
	"math/rand"
	"sync"
	"time"

	"github.com/rs/zerolog/log"
	"go.opentelemetry.io/otel/attribute"

	"github.com/chorus-services/whoosh/internal/tracing"
)

// ScalingController manages wave-based scaling operations for CHORUS services
type ScalingController struct {
	mu               sync.RWMutex
	swarmManager     *SwarmManager
	healthGates      *HealthGates
	assignmentBroker *AssignmentBroker
	bootstrapManager *BootstrapPoolManager
	metricsCollector *ScalingMetricsCollector

	// Scaling configuration
	config ScalingConfig

	// Current scaling state
	currentOperations map[string]*ScalingOperation
	scalingActive     bool
	stopChan          chan struct{}
	ctx               context.Context
	cancel            context.CancelFunc
}

// ScalingConfig defines configuration for scaling operations
type ScalingConfig struct {
	MinWaveSize      int           `json:"min_wave_size"`      // Minimum replicas per wave
	MaxWaveSize      int           `json:"max_wave_size"`      // Maximum replicas per wave
	WaveInterval     time.Duration `json:"wave_interval"`      // Time between waves
	MaxConcurrentOps int           `json:"max_concurrent_ops"` // Maximum concurrent scaling operations

	// Backoff configuration
	InitialBackoff    time.Duration `json:"initial_backoff"`    // Initial backoff delay
	MaxBackoff        time.Duration `json:"max_backoff"`        // Maximum backoff delay
	BackoffMultiplier float64       `json:"backoff_multiplier"` // Backoff multiplier
	JitterPercentage  float64       `json:"jitter_percentage"`  // Jitter percentage (0.0-1.0)

	// Health gate configuration
	HealthCheckTimeout time.Duration `json:"health_check_timeout"`  // Timeout for health checks
	MinJoinSuccessRate float64       `json:"min_join_success_rate"` // Minimum join success rate
	SuccessRateWindow  int           `json:"success_rate_window"`   // Window size for success rate calculation
}
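
// Note (added for illustration): with encoding/json, the time.Duration fields
// above marshal as integer nanoseconds, so the defaults set in
// NewScalingController would serialize roughly as:
//
//	{"min_wave_size":3,"max_wave_size":8,"wave_interval":30000000000,
//	 "max_concurrent_ops":3,"initial_backoff":30000000000, ...}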

// ScalingOperation represents an ongoing scaling operation
type ScalingOperation struct {
	ID              string `json:"id"`
	ServiceName     string `json:"service_name"`
	CurrentReplicas int    `json:"current_replicas"`
	TargetReplicas  int    `json:"target_replicas"`

	// Wave state
	CurrentWave    int `json:"current_wave"`
	WavesCompleted int `json:"waves_completed"`
	WaveSize       int `json:"wave_size"`

	// Timing
	StartedAt           time.Time `json:"started_at"`
	LastWaveAt          time.Time `json:"last_wave_at,omitempty"`
	EstimatedCompletion time.Time `json:"estimated_completion,omitempty"`

	// Backoff state
	ConsecutiveFailures int           `json:"consecutive_failures"`
	NextWaveAt          time.Time     `json:"next_wave_at,omitempty"`
	BackoffDelay        time.Duration `json:"backoff_delay"`

	// Status
	Status    ScalingStatus `json:"status"`
	LastError string        `json:"last_error,omitempty"`

	// Configuration
	Template      string                 `json:"template"`
	ScalingParams map[string]interface{} `json:"scaling_params,omitempty"`
}

// ScalingStatus represents the status of a scaling operation
type ScalingStatus string

const (
	ScalingStatusPending   ScalingStatus = "pending"
	ScalingStatusRunning   ScalingStatus = "running"
	ScalingStatusWaiting   ScalingStatus = "waiting" // Waiting for health gates
	ScalingStatusBackoff   ScalingStatus = "backoff" // In backoff period
	ScalingStatusCompleted ScalingStatus = "completed"
	ScalingStatusFailed    ScalingStatus = "failed"
	ScalingStatusCancelled ScalingStatus = "cancelled"
)
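
// Typical status flow, as driven by executeScaling below (illustrative):
//
//	pending -> running -> (waiting -> running)* -> completed
//	           running -> backoff -> running (after NextWaveAt)
//	           any active state -> cancelled (via CancelOperation/StopScaling)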

// ScalingRequest represents a request to scale a service
type ScalingRequest struct {
	ServiceName    string                 `json:"service_name"`
	TargetReplicas int                    `json:"target_replicas"`
	Template       string                 `json:"template,omitempty"`
	ScalingParams  map[string]interface{} `json:"scaling_params,omitempty"`
	Force          bool                   `json:"force,omitempty"` // Skip health gates
}

// WaveResult represents the result of a scaling wave
type WaveResult struct {
	WaveNumber      int           `json:"wave_number"`
	RequestedCount  int           `json:"requested_count"`
	SuccessfulJoins int           `json:"successful_joins"`
	FailedJoins     int           `json:"failed_joins"`
	Duration        time.Duration `json:"duration"`
	CompletedAt     time.Time     `json:"completed_at"`
}

// NewScalingController creates a new scaling controller
func NewScalingController(
	swarmManager *SwarmManager,
	healthGates *HealthGates,
	assignmentBroker *AssignmentBroker,
	bootstrapManager *BootstrapPoolManager,
	metricsCollector *ScalingMetricsCollector,
) *ScalingController {
	ctx, cancel := context.WithCancel(context.Background())

	return &ScalingController{
		swarmManager:     swarmManager,
		healthGates:      healthGates,
		assignmentBroker: assignmentBroker,
		bootstrapManager: bootstrapManager,
		metricsCollector: metricsCollector,
		config: ScalingConfig{
			MinWaveSize:        3,
			MaxWaveSize:        8,
			WaveInterval:       30 * time.Second,
			MaxConcurrentOps:   3,
			InitialBackoff:     30 * time.Second,
			MaxBackoff:         2 * time.Minute,
			BackoffMultiplier:  1.5,
			JitterPercentage:   0.2,
			HealthCheckTimeout: 10 * time.Second,
			MinJoinSuccessRate: 0.8,
			SuccessRateWindow:  10,
		},
		currentOperations: make(map[string]*ScalingOperation),
		stopChan:          make(chan struct{}, 1),
		ctx:               ctx,
		cancel:            cancel,
	}
}
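
// Usage sketch (hypothetical wiring; the dependency values and service name
// below are assumptions, not defined in this file):
//
//	sc := NewScalingController(swarm, gates, broker, pool, metrics)
//	defer sc.Close()
//	waveID, err := sc.StartScaling(ctx, "chorus-agent", 20, 0, "default")
//	if err != nil {
//		log.Error().Err(err).Msg("scaling request rejected")
//	}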

// StartScaling initiates a scaling operation and returns the wave ID.
// Note: the waveSize argument is currently advisory only; the controller
// recalculates the wave size itself (see calculateWaveSize).
func (sc *ScalingController) StartScaling(ctx context.Context, serviceName string, targetReplicas, waveSize int, template string) (string, error) {
	request := ScalingRequest{
		ServiceName:    serviceName,
		TargetReplicas: targetReplicas,
		Template:       template,
	}

	operation, err := sc.startScalingOperation(ctx, request)
	if err != nil {
		return "", err
	}

	return operation.ID, nil
}

// startScalingOperation initiates a scaling operation
func (sc *ScalingController) startScalingOperation(ctx context.Context, request ScalingRequest) (*ScalingOperation, error) {
	ctx, span := tracing.Tracer.Start(ctx, "scaling_controller.start_scaling")
	defer span.End()

	sc.mu.Lock()
	defer sc.mu.Unlock()

	// Check if there's already an operation for this service
	if existingOp, exists := sc.currentOperations[request.ServiceName]; exists {
		if existingOp.Status == ScalingStatusRunning || existingOp.Status == ScalingStatusWaiting {
			return nil, fmt.Errorf("scaling operation already in progress for service %s", request.ServiceName)
		}
	}

	// Check concurrent operation limit
	runningOps := 0
	for _, op := range sc.currentOperations {
		if op.Status == ScalingStatusRunning || op.Status == ScalingStatusWaiting {
			runningOps++
		}
	}

	if runningOps >= sc.config.MaxConcurrentOps {
		return nil, fmt.Errorf("maximum concurrent scaling operations (%d) reached", sc.config.MaxConcurrentOps)
	}

	// Get current replica count
	currentReplicas, err := sc.swarmManager.GetServiceReplicas(ctx, request.ServiceName)
	if err != nil {
		return nil, fmt.Errorf("failed to get current replica count: %w", err)
	}

	// Calculate wave size
	waveSize := sc.calculateWaveSize(currentReplicas, request.TargetReplicas)

	// Create scaling operation
	operation := &ScalingOperation{
		ID:              fmt.Sprintf("scale-%s-%d", request.ServiceName, time.Now().Unix()),
		ServiceName:     request.ServiceName,
		CurrentReplicas: currentReplicas,
		TargetReplicas:  request.TargetReplicas,
		CurrentWave:     1,
		WaveSize:        waveSize,
		StartedAt:       time.Now(),
		Status:          ScalingStatusPending,
		Template:        request.Template,
		ScalingParams:   request.ScalingParams,
		BackoffDelay:    sc.config.InitialBackoff,
	}

	// Store operation
	sc.currentOperations[request.ServiceName] = operation

	// Start metrics tracking
	if sc.metricsCollector != nil {
		sc.metricsCollector.StartWave(ctx, operation.ID, operation.ServiceName, operation.TargetReplicas)
	}

	// Start scaling process in background
	go sc.executeScaling(context.Background(), operation, request.Force)

	span.SetAttributes(
		attribute.String("scaling.service_name", request.ServiceName),
		attribute.Int("scaling.current_replicas", currentReplicas),
		attribute.Int("scaling.target_replicas", request.TargetReplicas),
		attribute.Int("scaling.wave_size", waveSize),
		attribute.String("scaling.operation_id", operation.ID),
	)

	log.Info().
		Str("operation_id", operation.ID).
		Str("service_name", request.ServiceName).
		Int("current_replicas", currentReplicas).
		Int("target_replicas", request.TargetReplicas).
		Int("wave_size", waveSize).
		Msg("Started scaling operation")

	return operation, nil
}

// executeScaling executes the scaling operation with wave-based approach
func (sc *ScalingController) executeScaling(ctx context.Context, operation *ScalingOperation, force bool) {
	ctx, span := tracing.Tracer.Start(ctx, "scaling_controller.execute_scaling")
	defer span.End()

	defer func() {
		sc.mu.Lock()
		// Keep completed operations for a while for monitoring
		if operation.Status == ScalingStatusCompleted || operation.Status == ScalingStatusFailed {
			// Clean up after 1 hour
			go func() {
				time.Sleep(1 * time.Hour)
				sc.mu.Lock()
				delete(sc.currentOperations, operation.ServiceName)
				sc.mu.Unlock()
			}()
		}
		sc.mu.Unlock()
	}()

	operation.Status = ScalingStatusRunning

	for operation.CurrentReplicas < operation.TargetReplicas {
		// Check if we should wait for backoff
		if !operation.NextWaveAt.IsZero() && time.Now().Before(operation.NextWaveAt) {
			operation.Status = ScalingStatusBackoff
			waitTime := time.Until(operation.NextWaveAt)
			log.Info().
				Str("operation_id", operation.ID).
				Dur("wait_time", waitTime).
				Msg("Waiting for backoff period")

			select {
			case <-ctx.Done():
				operation.Status = ScalingStatusCancelled
				return
			case <-time.After(waitTime):
				// Continue after backoff
			}
		}

		operation.Status = ScalingStatusRunning

		// Check health gates (unless forced)
		if !force {
			if err := sc.waitForHealthGates(ctx, operation); err != nil {
				operation.LastError = err.Error()
				operation.ConsecutiveFailures++
				sc.applyBackoff(operation)
				continue
			}
		}

		// Execute scaling wave
		waveResult, err := sc.executeWave(ctx, operation)
		if err != nil {
			log.Error().
				Str("operation_id", operation.ID).
				Err(err).
				Msg("Scaling wave failed")

			operation.LastError = err.Error()
			operation.ConsecutiveFailures++
			sc.applyBackoff(operation)
			continue
		}

		// Update operation state
		operation.CurrentReplicas += waveResult.SuccessfulJoins
		operation.WavesCompleted++
		operation.LastWaveAt = time.Now()
		operation.ConsecutiveFailures = 0  // Reset on success
		operation.NextWaveAt = time.Time{} // Clear backoff

		// Per-wave scaling metrics are recorded by the metrics collector

		log.Info().
			Str("operation_id", operation.ID).
			Int("wave", operation.CurrentWave).
			Int("successful_joins", waveResult.SuccessfulJoins).
			Int("failed_joins", waveResult.FailedJoins).
			Int("current_replicas", operation.CurrentReplicas).
			Int("target_replicas", operation.TargetReplicas).
			Msg("Scaling wave completed")

		// Move to next wave
		operation.CurrentWave++

		// Wait between waves
		if operation.CurrentReplicas < operation.TargetReplicas {
			select {
			case <-ctx.Done():
				operation.Status = ScalingStatusCancelled
				return
			case <-time.After(sc.config.WaveInterval):
				// Continue to next wave
			}
		}
	}

	// Scaling completed successfully
	operation.Status = ScalingStatusCompleted
	operation.EstimatedCompletion = time.Now() // Records the actual completion time

	log.Info().
		Str("operation_id", operation.ID).
		Str("service_name", operation.ServiceName).
		Int("final_replicas", operation.CurrentReplicas).
		Int("waves_completed", operation.WavesCompleted).
		Dur("total_duration", time.Since(operation.StartedAt)).
		Msg("Scaling operation completed successfully")
}

// waitForHealthGates waits for health gates to be satisfied
func (sc *ScalingController) waitForHealthGates(ctx context.Context, operation *ScalingOperation) error {
	operation.Status = ScalingStatusWaiting

	ctx, cancel := context.WithTimeout(ctx, sc.config.HealthCheckTimeout)
	defer cancel()

	healthStatus, err := sc.healthGates.CheckHealth(ctx, nil)
	if err != nil {
		return fmt.Errorf("health gate check failed: %w", err)
	}

	if !healthStatus.Healthy {
		return fmt.Errorf("health gates not satisfied: %s", healthStatus.OverallReason)
	}

	return nil
}

// executeWave executes a single scaling wave
func (sc *ScalingController) executeWave(ctx context.Context, operation *ScalingOperation) (*WaveResult, error) {
	startTime := time.Now()

	// Calculate how many replicas to add in this wave
	remaining := operation.TargetReplicas - operation.CurrentReplicas
	waveSize := operation.WaveSize
	if remaining < waveSize {
		waveSize = remaining
	}

	// Create assignments for new replicas
	var assignments []*Assignment
	for i := 0; i < waveSize; i++ {
		assignReq := AssignmentRequest{
			ClusterID: "production", // TODO: Make configurable
			Template:  operation.Template,
		}

		assignment, err := sc.assignmentBroker.CreateAssignment(ctx, assignReq)
		if err != nil {
			return nil, fmt.Errorf("failed to create assignment: %w", err)
		}

		assignments = append(assignments, assignment)
	}

	// Deploy new replicas
	newReplicaCount := operation.CurrentReplicas + waveSize
	err := sc.swarmManager.ScaleService(ctx, operation.ServiceName, newReplicaCount)
	if err != nil {
		return nil, fmt.Errorf("failed to scale service: %w", err)
	}

	// Wait for replicas to come online and join successfully, counting only
	// replicas beyond the pre-wave baseline as new joins
	successfulJoins, failedJoins := sc.waitForReplicaJoins(ctx, operation.ServiceName, operation.CurrentReplicas, waveSize)

	result := &WaveResult{
		WaveNumber:      operation.CurrentWave,
		RequestedCount:  waveSize,
		SuccessfulJoins: successfulJoins,
		FailedJoins:     failedJoins,
		Duration:        time.Since(startTime),
		CompletedAt:     time.Now(),
	}

	return result, nil
}

// waitForReplicaJoins waits for new replicas to join the cluster. baseline is
// the replica count before the wave; only replicas beyond it count as joins.
func (sc *ScalingController) waitForReplicaJoins(ctx context.Context, serviceName string, baseline, expectedJoins int) (successful, failed int) {
	// Wait up to 2 minutes for replicas to join
	ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	startTime := time.Now()

	for {
		select {
		case <-ctx.Done():
			// Timeout reached; report whatever has joined so far
			return successful, expectedJoins - successful
		case <-ticker.C:
			// Check service status
			running, err := sc.swarmManager.GetRunningReplicas(ctx, serviceName)
			if err != nil {
				log.Warn().Err(err).Msg("Failed to get running replicas")
				continue
			}

			// Count only replicas beyond the pre-wave baseline as joins.
			// In a real implementation, this would check P2P network membership.
			joined := running - baseline
			if joined < 0 {
				joined = 0
			}
			successful = joined

			if joined >= expectedJoins {
				return expectedJoins, 0
			}

			// If we've been waiting too long with no progress, consider the rest failed
			if time.Since(startTime) > 90*time.Second {
				return joined, expectedJoins - joined
			}
		}
	}
}

// calculateWaveSize calculates the appropriate wave size for scaling
func (sc *ScalingController) calculateWaveSize(current, target int) int {
	totalNodes := 10 // TODO: Get actual node count from swarm

	// Wave size formula: min(max(MinWaveSize, floor(total_nodes/10)), MaxWaveSize)
	waveSize := int(math.Max(float64(sc.config.MinWaveSize), math.Floor(float64(totalNodes)/10)))
	if waveSize > sc.config.MaxWaveSize {
		waveSize = sc.config.MaxWaveSize
	}

	// Don't exceed remaining replicas needed
	remaining := target - current
	if waveSize > remaining {
		waveSize = remaining
	}

	return waveSize
}
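
// Worked example with the defaults (MinWaveSize 3, MaxWaveSize 8): for
// totalNodes = 10, max(3, floor(10/10)) = 3; a hypothetical totalNodes of
// 120 would give floor(120/10) = 12, capped to 8. Scaling from 18 to 20
// replicas then clamps the wave to the remaining 2.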

// applyBackoff applies exponential backoff to the operation
func (sc *ScalingController) applyBackoff(operation *ScalingOperation) {
	// Calculate backoff delay with exponential increase from the configured
	// initial delay, growing geometrically with consecutive failures
	backoff := time.Duration(float64(sc.config.InitialBackoff) * math.Pow(sc.config.BackoffMultiplier, float64(operation.ConsecutiveFailures-1)))

	// Cap at maximum backoff
	if backoff > sc.config.MaxBackoff {
		backoff = sc.config.MaxBackoff
	}

	// Add jitter: JitterPercentage spreads the delay by up to ±half that
	// fraction (0.2 → ±10%), since rand.Float64()-0.5 is in [-0.5, 0.5)
	jitter := time.Duration(float64(backoff) * sc.config.JitterPercentage * (rand.Float64() - 0.5))
	backoff += jitter

	operation.BackoffDelay = backoff
	operation.NextWaveAt = time.Now().Add(backoff)

	log.Warn().
		Str("operation_id", operation.ID).
		Int("consecutive_failures", operation.ConsecutiveFailures).
		Dur("backoff_delay", backoff).
		Time("next_wave_at", operation.NextWaveAt).
		Msg("Applied exponential backoff")
}
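
// Worked example with the defaults (InitialBackoff 30s, multiplier 1.5,
// MaxBackoff 2m): failure 1 -> 30s, 2 -> 45s, 3 -> 67.5s, 4 -> 101.25s,
// 5 -> 151.875s capped to 120s; each delay then shifts by up to ±10%
// jitter before NextWaveAt is set.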

// GetOperation returns a scaling operation by service name
func (sc *ScalingController) GetOperation(serviceName string) (*ScalingOperation, bool) {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	op, exists := sc.currentOperations[serviceName]
	return op, exists
}

// GetAllOperations returns all current scaling operations
func (sc *ScalingController) GetAllOperations() map[string]*ScalingOperation {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	operations := make(map[string]*ScalingOperation)
	for k, v := range sc.currentOperations {
		operations[k] = v
	}
	return operations
}

// CancelOperation cancels a scaling operation
func (sc *ScalingController) CancelOperation(serviceName string) error {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	operation, exists := sc.currentOperations[serviceName]
	if !exists {
		return fmt.Errorf("no scaling operation found for service %s", serviceName)
	}

	if operation.Status == ScalingStatusCompleted || operation.Status == ScalingStatusFailed {
		return fmt.Errorf("scaling operation already finished")
	}

	operation.Status = ScalingStatusCancelled
	log.Info().Str("operation_id", operation.ID).Msg("Scaling operation cancelled")

	// Complete metrics tracking
	if sc.metricsCollector != nil {
		currentReplicas, _ := sc.swarmManager.GetServiceReplicas(context.Background(), serviceName)
		sc.metricsCollector.CompleteWave(context.Background(), false, currentReplicas, "Operation cancelled", operation.ConsecutiveFailures)
	}

	return nil
}

// StopScaling stops all active scaling operations
func (sc *ScalingController) StopScaling(ctx context.Context) {
	ctx, span := tracing.Tracer.Start(ctx, "scaling_controller.stop_scaling")
	defer span.End()

	sc.mu.Lock()
	defer sc.mu.Unlock()

	cancelledCount := 0
	for serviceName, operation := range sc.currentOperations {
		if operation.Status == ScalingStatusRunning || operation.Status == ScalingStatusWaiting || operation.Status == ScalingStatusBackoff {
			operation.Status = ScalingStatusCancelled
			cancelledCount++

			// Complete metrics tracking for cancelled operations
			if sc.metricsCollector != nil {
				currentReplicas, _ := sc.swarmManager.GetServiceReplicas(ctx, serviceName)
				sc.metricsCollector.CompleteWave(ctx, false, currentReplicas, "Scaling stopped", operation.ConsecutiveFailures)
			}

			log.Info().Str("operation_id", operation.ID).Str("service_name", serviceName).Msg("Scaling operation stopped")
		}
	}

	// Signal stop to running operations
	select {
	case sc.stopChan <- struct{}{}:
	default:
	}

	span.SetAttributes(attribute.Int("stopped_operations", cancelledCount))
	log.Info().Int("cancelled_operations", cancelledCount).Msg("Stopped all scaling operations")
}

// Close shuts down the scaling controller
func (sc *ScalingController) Close() error {
	// Stop operations while the controller context is still live so the final
	// replica lookups in StopScaling can complete, then cancel the context
	sc.StopScaling(sc.ctx)
	sc.cancel()
	return nil
}
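
// Shutdown wiring sketch (hypothetical server integration, matching the
// commit note about graceful shutdown; srv is assumed to be an *http.Server
// defined elsewhere):
//
//	srv.RegisterOnShutdown(func() { _ = scalingController.Close() })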