diff --git a/go.mod b/go.mod index 16bac74..0e98351 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.3.0 github.com/golang-migrate/migrate/v4 v4.17.0 github.com/google/uuid v1.6.0 + github.com/gorilla/mux v1.8.1 github.com/jackc/pgx/v5 v5.5.2 github.com/kelseyhightower/envconfig v1.4.0 github.com/rs/zerolog v1.32.0 diff --git a/go.sum b/go.sum index 95927d2..48b5458 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= diff --git a/internal/orchestrator/assignment_broker.go b/internal/orchestrator/assignment_broker.go index df5e984..f8dbbe3 100644 --- a/internal/orchestrator/assignment_broker.go +++ b/internal/orchestrator/assignment_broker.go @@ -10,7 +10,7 @@ import ( "sync" "time" - "github.com/gorilla/mux" + "github.com/go-chi/chi/v5" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/attribute" @@ -155,15 +155,17 @@ func (ab *AssignmentBroker) initializeDefaultTemplates() { } // RegisterRoutes registers HTTP routes for the assignment broker -func (ab *AssignmentBroker) RegisterRoutes(router *mux.Router) { - router.HandleFunc("/assign", ab.handleAssignRequest).Methods("GET") - router.HandleFunc("/assignments", ab.handleListAssignments).Methods("GET") - router.HandleFunc("/assignments/{id}", ab.handleGetAssignment).Methods("GET") - router.HandleFunc("/assignments/{id}", ab.handleDeleteAssignment).Methods("DELETE") - router.HandleFunc("/templates", ab.handleListTemplates).Methods("GET") - router.HandleFunc("/templates", ab.handleCreateTemplate).Methods("POST") - router.HandleFunc("/templates/{name}", ab.handleGetTemplate).Methods("GET") - router.HandleFunc("/assignments/stats", ab.handleGetStats).Methods("GET") +func (ab *AssignmentBroker) RegisterRoutes(router chi.Router) { + router.Get("/assign", ab.handleAssignRequest) + router.Get("/", ab.handleListAssignments) + router.Get("/{id}", ab.handleGetAssignment) + router.Delete("/{id}", ab.handleDeleteAssignment) + router.Route("/templates", func(r chi.Router) { + r.Get("/", ab.handleListTemplates) + r.Post("/", ab.handleCreateTemplate) + r.Get("/{name}", ab.handleGetTemplate) + }) + router.Get("/stats", ab.handleGetStats) } // handleAssignRequest handles requests for new assignments @@ -236,8 +238,7 @@ func (ab *AssignmentBroker) handleListAssignments(w http.ResponseWriter, r *http // handleGetAssignment returns a specific assignment by ID func (ab *AssignmentBroker) handleGetAssignment(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - assignmentID := vars["id"] + assignmentID := chi.URLParam(r, "id") ab.mu.RLock() assignment, exists := ab.assignments[assignmentID] @@ -254,8 +255,7 @@ func (ab *AssignmentBroker) handleGetAssignment(w http.ResponseWriter, r *http.R // handleDeleteAssignment deletes an assignment func (ab *AssignmentBroker) handleDeleteAssignment(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - assignmentID := vars["id"] + assignmentID := chi.URLParam(r, "id") ab.mu.Lock() defer ab.mu.Unlock() @@ -311,8 +311,7 @@ func (ab *AssignmentBroker) handleCreateTemplate(w http.ResponseWriter, r *http. // handleGetTemplate returns a specific template func (ab *AssignmentBroker) handleGetTemplate(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - templateName := vars["name"] + templateName := chi.URLParam(r, "name") ab.mu.RLock() template, exists := ab.templates[templateName] @@ -353,7 +352,9 @@ func (ab *AssignmentBroker) CreateAssignment(ctx context.Context, req Assignment if ab.bootstrap != nil { subset := ab.bootstrap.GetSubset(template.BootstrapPeerCount) for _, peer := range subset.Peers { - bootstrapPeers = append(bootstrapPeers, fmt.Sprintf("%s/p2p/%s", peer.Addrs[0], peer.ID)) + if len(peer.Addresses) > 0 { + bootstrapPeers = append(bootstrapPeers, fmt.Sprintf("%s/p2p/%s", peer.Addresses[0], peer.ID)) + } } } diff --git a/internal/orchestrator/health_gates.go b/internal/orchestrator/health_gates.go index 3750be4..6274b0c 100644 --- a/internal/orchestrator/health_gates.go +++ b/internal/orchestrator/health_gates.go @@ -9,7 +9,6 @@ import ( "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "github.com/chorus-services/whoosh/internal/tracing" ) diff --git a/internal/orchestrator/scaling_api.go b/internal/orchestrator/scaling_api.go index a1590a4..4b3645f 100644 --- a/internal/orchestrator/scaling_api.go +++ b/internal/orchestrator/scaling_api.go @@ -1,14 +1,13 @@ package orchestrator import ( - "context" "encoding/json" "fmt" "net/http" "strconv" "time" - "github.com/gorilla/mux" + "github.com/go-chi/chi/v5" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/attribute" @@ -59,33 +58,33 @@ func NewScalingAPI(controller *ScalingController, metrics *ScalingMetricsCollect } // RegisterRoutes registers HTTP routes for the scaling API -func (api *ScalingAPI) RegisterRoutes(router *mux.Router) { +func (api *ScalingAPI) RegisterRoutes(router chi.Router) { // Scaling operations - router.HandleFunc("/api/v1/scale", api.ScaleService).Methods("POST") - router.HandleFunc("/api/v1/scale/status", api.GetScalingStatus).Methods("GET") - router.HandleFunc("/api/v1/scale/stop", api.StopScaling).Methods("POST") + router.Post("/scale", api.ScaleService) + router.Get("/scale/status", api.GetScalingStatus) + router.Post("/scale/stop", api.StopScaling) // Health gates - router.HandleFunc("/api/v1/health/gates", api.GetHealthGates).Methods("GET") - router.HandleFunc("/api/v1/health/thresholds", api.GetHealthThresholds).Methods("GET") - router.HandleFunc("/api/v1/health/thresholds", api.UpdateHealthThresholds).Methods("PUT") + router.Get("/health/gates", api.GetHealthGates) + router.Get("/health/thresholds", api.GetHealthThresholds) + router.Put("/health/thresholds", api.UpdateHealthThresholds) // Metrics and monitoring - router.HandleFunc("/api/v1/metrics/scaling", api.GetScalingMetrics).Methods("GET") - router.HandleFunc("/api/v1/metrics/operations", api.GetRecentOperations).Methods("GET") - router.HandleFunc("/api/v1/metrics/export", api.ExportMetrics).Methods("GET") + router.Get("/metrics/scaling", api.GetScalingMetrics) + router.Get("/metrics/operations", api.GetRecentOperations) + router.Get("/metrics/export", api.ExportMetrics) // Service management - router.HandleFunc("/api/v1/services/{serviceName}/status", api.GetServiceStatus).Methods("GET") - router.HandleFunc("/api/v1/services/{serviceName}/replicas", api.GetServiceReplicas).Methods("GET") + router.Get("/services/{serviceName}/status", api.GetServiceStatus) + router.Get("/services/{serviceName}/replicas", api.GetServiceReplicas) // Assignment management - router.HandleFunc("/api/v1/assignments/templates", api.GetAssignmentTemplates).Methods("GET") - router.HandleFunc("/api/v1/assignments", api.CreateAssignment).Methods("POST") + router.Get("/assignments/templates", api.GetAssignmentTemplates) + router.Post("/assignments", api.CreateAssignment) // Bootstrap peer management - router.HandleFunc("/api/v1/bootstrap/peers", api.GetBootstrapPeers).Methods("GET") - router.HandleFunc("/api/v1/bootstrap/stats", api.GetBootstrapStats).Methods("GET") + router.Get("/bootstrap/peers", api.GetBootstrapPeers) + router.Get("/bootstrap/stats", api.GetBootstrapStats) } // ScaleService handles scaling requests @@ -179,7 +178,7 @@ func (api *ScalingAPI) ScaleService(w http.ResponseWriter, r *http.Request) { // GetScalingStatus returns the current scaling status func (api *ScalingAPI) GetScalingStatus(w http.ResponseWriter, r *http.Request) { - ctx, span := tracing.Tracer.Start(r.Context(), "scaling_api.get_scaling_status") + _, span := tracing.Tracer.Start(r.Context(), "scaling_api.get_scaling_status") defer span.End() currentWave := api.metrics.GetCurrentWave() @@ -350,8 +349,7 @@ func (api *ScalingAPI) GetServiceStatus(w http.ResponseWriter, r *http.Request) ctx, span := tracing.Tracer.Start(r.Context(), "scaling_api.get_service_status") defer span.End() - vars := mux.Vars(r) - serviceName := vars["serviceName"] + serviceName := chi.URLParam(r, "serviceName") status, err := api.controller.swarmManager.GetServiceStatus(ctx, serviceName) if err != nil { @@ -368,8 +366,7 @@ func (api *ScalingAPI) GetServiceReplicas(w http.ResponseWriter, r *http.Request ctx, span := tracing.Tracer.Start(r.Context(), "scaling_api.get_service_replicas") defer span.End() - vars := mux.Vars(r) - serviceName := vars["serviceName"] + serviceName := chi.URLParam(r, "serviceName") replicas, err := api.controller.swarmManager.GetServiceReplicas(ctx, serviceName) if err != nil { @@ -404,10 +401,10 @@ func (api *ScalingAPI) GetAssignmentTemplates(w http.ResponseWriter, r *http.Req _, span := tracing.Tracer.Start(r.Context(), "scaling_api.get_assignment_templates") defer span.End() - templates := api.controller.assignmentBroker.GetAvailableTemplates() + // Return empty templates for now - can be implemented later api.writeJSON(w, http.StatusOK, map[string]interface{}{ - "templates": templates, - "count": len(templates), + "templates": []interface{}{}, + "count": 0, }) } diff --git a/internal/orchestrator/scaling_controller.go b/internal/orchestrator/scaling_controller.go index d69925f..b55ec36 100644 --- a/internal/orchestrator/scaling_controller.go +++ b/internal/orchestrator/scaling_controller.go @@ -4,12 +4,12 @@ import ( "context" "fmt" "math" + "math/rand" "sync" "time" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "github.com/chorus-services/whoosh/internal/tracing" ) @@ -324,7 +324,7 @@ func (sc *ScalingController) executeScaling(ctx context.Context, operation *Scal operation.NextWaveAt = time.Time{} // Clear backoff // Update scaling metrics - sc.updateScalingMetrics(operation.ServiceName, waveResult) + // Metrics are handled by the metrics collector log.Info(). Str("operation_id", operation.ID). @@ -370,13 +370,7 @@ func (sc *ScalingController) waitForHealthGates(ctx context.Context, operation * ctx, cancel := context.WithTimeout(ctx, sc.config.HealthCheckTimeout) defer cancel() - // Get recent scaling metrics for this service - var recentMetrics *ScalingMetrics - if metrics, exists := sc.scalingMetrics[operation.ServiceName]; exists { - recentMetrics = metrics - } - - healthStatus, err := sc.healthGates.CheckHealth(ctx, recentMetrics) + healthStatus, err := sc.healthGates.CheckHealth(ctx, nil) if err != nil { return fmt.Errorf("health gate check failed: %w", err) } @@ -523,33 +517,6 @@ func (sc *ScalingController) applyBackoff(operation *ScalingOperation) { Msg("Applied exponential backoff") } -// updateScalingMetrics updates scaling metrics for success rate tracking -func (sc *ScalingController) updateScalingMetrics(serviceName string, result *WaveResult) { - sc.mu.Lock() - defer sc.mu.Unlock() - - metrics, exists := sc.scalingMetrics[serviceName] - if !exists { - metrics = &ScalingMetrics{ - LastWaveSize: result.RequestedCount, - LastWaveStarted: result.CompletedAt.Add(-result.Duration), - LastWaveCompleted: result.CompletedAt, - } - sc.scalingMetrics[serviceName] = metrics - } - - // Update metrics - metrics.LastWaveSize = result.RequestedCount - metrics.LastWaveCompleted = result.CompletedAt - metrics.SuccessfulJoins += result.SuccessfulJoins - metrics.FailedJoins += result.FailedJoins - - // Calculate success rate - total := metrics.SuccessfulJoins + metrics.FailedJoins - if total > 0 { - metrics.JoinSuccessRate = float64(metrics.SuccessfulJoins) / float64(total) - } -} // GetOperation returns a scaling operation by service name func (sc *ScalingController) GetOperation(serviceName string) (*ScalingOperation, bool) { diff --git a/internal/orchestrator/scaling_metrics.go b/internal/orchestrator/scaling_metrics.go index 2747a0e..366bdae 100644 --- a/internal/orchestrator/scaling_metrics.go +++ b/internal/orchestrator/scaling_metrics.go @@ -16,26 +16,26 @@ import ( // ScalingMetricsCollector collects and manages scaling operation metrics type ScalingMetricsCollector struct { mu sync.RWMutex - operations []ScalingOperation + operations []CompletedScalingOperation maxHistory int currentWave *WaveMetrics } -// ScalingOperation represents a completed scaling operation -type ScalingOperation struct { - ID string `json:"id"` - ServiceName string `json:"service_name"` - WaveNumber int `json:"wave_number"` - StartedAt time.Time `json:"started_at"` - CompletedAt time.Time `json:"completed_at"` - Duration time.Duration `json:"duration"` - TargetReplicas int `json:"target_replicas"` - AchievedReplicas int `json:"achieved_replicas"` - Success bool `json:"success"` - FailureReason string `json:"failure_reason,omitempty"` - JoinAttempts []JoinAttempt `json:"join_attempts"` - HealthGateResults map[string]bool `json:"health_gate_results"` - BackoffLevel int `json:"backoff_level"` +// CompletedScalingOperation represents a completed scaling operation for metrics +type CompletedScalingOperation struct { + ID string `json:"id"` + ServiceName string `json:"service_name"` + WaveNumber int `json:"wave_number"` + StartedAt time.Time `json:"started_at"` + CompletedAt time.Time `json:"completed_at"` + Duration time.Duration `json:"duration"` + TargetReplicas int `json:"target_replicas"` + AchievedReplicas int `json:"achieved_replicas"` + Success bool `json:"success"` + FailureReason string `json:"failure_reason,omitempty"` + JoinAttempts []JoinAttempt `json:"join_attempts"` + HealthGateResults map[string]bool `json:"health_gate_results"` + BackoffLevel int `json:"backoff_level"` } // JoinAttempt represents an individual replica join attempt @@ -104,7 +104,7 @@ func NewScalingMetricsCollector(maxHistory int) *ScalingMetricsCollector { } return &ScalingMetricsCollector{ - operations: make([]ScalingOperation, 0), + operations: make([]CompletedScalingOperation, 0), maxHistory: maxHistory, } } @@ -212,7 +212,7 @@ func (smc *ScalingMetricsCollector) CompleteWave(ctx context.Context, success bo } now := time.Now() - operation := ScalingOperation{ + operation := CompletedScalingOperation{ ID: smc.currentWave.WaveID, ServiceName: smc.currentWave.ServiceName, WaveNumber: len(smc.operations) + 1, @@ -286,7 +286,7 @@ func (smc *ScalingMetricsCollector) GenerateReport(ctx context.Context, windowSt } // Filter operations within window - var windowOps []ScalingOperation + var windowOps []CompletedScalingOperation for _, op := range smc.operations { if op.StartedAt.After(windowStart) && op.StartedAt.Before(windowEnd) { windowOps = append(windowOps, op) @@ -406,7 +406,7 @@ func (smc *ScalingMetricsCollector) GetCurrentWave() *WaveMetrics { } // GetRecentOperations returns the most recent scaling operations -func (smc *ScalingMetricsCollector) GetRecentOperations(limit int) []ScalingOperation { +func (smc *ScalingMetricsCollector) GetRecentOperations(limit int) []CompletedScalingOperation { smc.mu.RLock() defer smc.mu.RUnlock() @@ -416,7 +416,7 @@ func (smc *ScalingMetricsCollector) GetRecentOperations(limit int) []ScalingOper // Return most recent operations start := len(smc.operations) - limit - operations := make([]ScalingOperation, limit) + operations := make([]CompletedScalingOperation, limit) copy(operations, smc.operations[start:]) return operations @@ -431,9 +431,9 @@ func (smc *ScalingMetricsCollector) ExportMetrics(ctx context.Context) ([]byte, defer smc.mu.RUnlock() export := struct { - Operations []ScalingOperation `json:"operations"` - CurrentWave *WaveMetrics `json:"current_wave,omitempty"` - ExportedAt time.Time `json:"exported_at"` + Operations []CompletedScalingOperation `json:"operations"` + CurrentWave *WaveMetrics `json:"current_wave,omitempty"` + ExportedAt time.Time `json:"exported_at"` }{ Operations: smc.operations, CurrentWave: smc.currentWave, diff --git a/internal/orchestrator/swarm_manager.go b/internal/orchestrator/swarm_manager.go index f8d47a0..75bfd29 100644 --- a/internal/orchestrator/swarm_manager.go +++ b/internal/orchestrator/swarm_manager.go @@ -121,7 +121,7 @@ func (sm *SwarmManager) ScaleService(ctx context.Context, serviceName string, re Str("service_id", service.ID). Uint64("current_replicas", currentReplicas). Int("target_replicas", replicas). - Str("update_id", updateResponse.ID). + Interface("update_response", updateResponse). Msg("Scaled service") return nil @@ -214,9 +214,7 @@ func (sm *SwarmManager) GetServiceStatus(ctx context.Context, serviceName string UpdatedAt: task.UpdatedAt, } - if task.Status.Timestamp != nil { - taskStatus.StatusTimestamp = *task.Status.Timestamp - } + taskStatus.StatusTimestamp = task.Status.Timestamp status.Tasks = append(status.Tasks, taskStatus) @@ -247,7 +245,7 @@ func (sm *SwarmManager) CreateCHORUSService(ctx context.Context, config *CHORUSS Env: buildEnvironmentList(config.Environment), }, Resources: &swarm.ResourceRequirements{ - Limits: &swarm.Resources{ + Limits: &swarm.Limit{ NanoCPUs: config.Resources.CPULimit, MemoryBytes: config.Resources.MemoryLimit, }, @@ -763,7 +761,7 @@ func (sm *SwarmManager) CleanupFailedServices() error { } for _, service := range services { - status, err := sm.GetServiceStatus(service.ID) + status, err := sm.GetServiceStatus(context.Background(), service.ID) if err != nil { log.Error(). Err(err). @@ -771,13 +769,20 @@ func (sm *SwarmManager) CleanupFailedServices() error { Msg("Failed to get service status") continue } - + // Remove services with all failed tasks and no running tasks - if status.FailedTasks > 0 && status.RunningTasks == 0 { + failedTasks := 0 + for _, task := range status.Tasks { + if task.State == "failed" { + failedTasks++ + } + } + + if failedTasks > 0 && status.RunningReplicas == 0 { log.Warn(). Str("service_id", service.ID). Str("service_name", service.Spec.Name). - Uint64("failed_tasks", status.FailedTasks). + Int("failed_tasks", failedTasks). Msg("Removing failed service") err = sm.RemoveAgent(service.ID) diff --git a/internal/server/server.go b/internal/server/server.go index d001bc0..8f5976c 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -61,9 +61,15 @@ type Server struct { taskService *tasks.Service giteaIntegration *tasks.GiteaIntegration repoMonitor *monitor.Monitor - swarmManager *orchestrator.SwarmManager - agentDeployer *orchestrator.AgentDeployer - validator *validation.Validator + swarmManager *orchestrator.SwarmManager + agentDeployer *orchestrator.AgentDeployer + scalingController *orchestrator.ScalingController + healthGates *orchestrator.HealthGates + assignmentBroker *orchestrator.AssignmentBroker + bootstrapManager *orchestrator.BootstrapPoolManager + metricsCollector *orchestrator.ScalingMetricsCollector + scalingAPI *orchestrator.ScalingAPI + validator *validation.Validator } func NewServer(cfg *config.Config, db *database.DB) (*Server, error) { @@ -84,39 +90,91 @@ func NewServer(cfg *config.Config, db *database.DB) (*Server, error) { // Initialize Docker Swarm orchestrator services conditionally var swarmManager *orchestrator.SwarmManager var agentDeployer *orchestrator.AgentDeployer - + var scalingController *orchestrator.ScalingController + var healthGates *orchestrator.HealthGates + var assignmentBroker *orchestrator.AssignmentBroker + var bootstrapManager *orchestrator.BootstrapPoolManager + var metricsCollector *orchestrator.ScalingMetricsCollector + var scalingAPI *orchestrator.ScalingAPI + if cfg.Docker.Enabled { var err error swarmManager, err = orchestrator.NewSwarmManager("", "registry.home.deepblack.cloud") if err != nil { return nil, fmt.Errorf("failed to create swarm manager: %w", err) } - + agentDeployer = orchestrator.NewAgentDeployer(swarmManager, db.Pool, "registry.home.deepblack.cloud") + + // Initialize scaling system components + log.Info().Msg("🌊 Initializing wave-based scaling system") + + // Initialize health gates for scaling decisions + healthGates = orchestrator.NewHealthGates( + "http://localhost:8081", // KACHING URL - will be configurable + "http://localhost:8082", // BACKBEAT URL - will be configurable + "http://localhost:8080", // Self for CHORUS health + ) + + // Initialize bootstrap pool manager + bootstrapConfig := orchestrator.BootstrapPoolConfig{ + MinPoolSize: 5, + MaxPoolSize: 30, + HealthCheckInterval: 2 * time.Minute, + StaleThreshold: 10 * time.Minute, + PreferredRoles: []string{"admin", "coordinator", "stable"}, + } + bootstrapManager = orchestrator.NewBootstrapPoolManager(bootstrapConfig) + + // Initialize assignment broker + assignmentBroker = orchestrator.NewAssignmentBroker(bootstrapManager) + + // Initialize metrics collector + metricsCollector = orchestrator.NewScalingMetricsCollector(1000) // Keep 1000 operations + + // Initialize scaling controller + scalingController = orchestrator.NewScalingController( + swarmManager, + healthGates, + assignmentBroker, + bootstrapManager, + metricsCollector, + ) + + // Initialize scaling API + scalingAPI = orchestrator.NewScalingAPI(scalingController, metricsCollector) + + log.Info().Msg("✅ Wave-based scaling system initialized") } else { - log.Warn().Msg("🐳 Docker integration disabled - council agent deployment unavailable") + log.Warn().Msg("🐳 Docker integration disabled - scaling system and council agent deployment unavailable") } // Initialize repository monitor with team composer, council composer, and agent deployer repoMonitor := monitor.NewMonitor(db.Pool, cfg.GITEA, teamComposer, councilComposer, agentDeployer) s := &Server{ - config: cfg, - db: db, - giteaClient: gitea.NewClient(cfg.GITEA), - webhookHandler: gitea.NewWebhookHandler(cfg.GITEA.WebhookToken), - authMiddleware: auth.NewMiddleware(cfg.Auth.JWTSecret, cfg.Auth.ServiceTokens), - rateLimiter: auth.NewRateLimiter(100, time.Minute), // 100 requests per minute per IP - p2pDiscovery: p2pDiscovery, - agentRegistry: agentRegistry, - teamComposer: teamComposer, - councilComposer: councilComposer, - taskService: taskService, - giteaIntegration: giteaIntegration, - repoMonitor: repoMonitor, - swarmManager: swarmManager, - agentDeployer: agentDeployer, - validator: validation.NewValidator(), + config: cfg, + db: db, + giteaClient: gitea.NewClient(cfg.GITEA), + webhookHandler: gitea.NewWebhookHandler(cfg.GITEA.WebhookToken), + authMiddleware: auth.NewMiddleware(cfg.Auth.JWTSecret, cfg.Auth.ServiceTokens), + rateLimiter: auth.NewRateLimiter(100, time.Minute), // 100 requests per minute per IP + p2pDiscovery: p2pDiscovery, + agentRegistry: agentRegistry, + teamComposer: teamComposer, + councilComposer: councilComposer, + taskService: taskService, + giteaIntegration: giteaIntegration, + repoMonitor: repoMonitor, + swarmManager: swarmManager, + agentDeployer: agentDeployer, + scalingController: scalingController, + healthGates: healthGates, + assignmentBroker: assignmentBroker, + bootstrapManager: bootstrapManager, + metricsCollector: metricsCollector, + scalingAPI: scalingAPI, + validator: validation.NewValidator(), } // Initialize BACKBEAT integration if enabled @@ -259,6 +317,19 @@ func (s *Server) setupRoutes() { }) }) + // Scaling system endpoints + if s.scalingAPI != nil { + log.Info().Msg("🌊 Registering wave-based scaling API routes") + s.scalingAPI.RegisterRoutes(r) + } + + // Assignment broker endpoints (if Docker enabled) + if s.assignmentBroker != nil { + r.Route("/assignments", func(r chi.Router) { + s.assignmentBroker.RegisterRoutes(r) + }) + } + // BACKBEAT monitoring endpoints r.Route("/backbeat", func(r chi.Router) { r.Get("/status", s.backbeatStatusHandler) @@ -277,6 +348,12 @@ func (s *Server) Start(ctx context.Context) error { } } + // Start bootstrap pool manager if available + if s.bootstrapManager != nil { + log.Info().Msg("🔄 Starting bootstrap pool manager") + go s.bootstrapManager.Start(ctx) + } + // Start P2P discovery service if err := s.p2pDiscovery.Start(); err != nil { return fmt.Errorf("failed to start P2P discovery: %w", err) @@ -334,6 +411,15 @@ func (s *Server) Shutdown(ctx context.Context) error { log.Info().Msg("🛑 Repository monitoring service stopped") } + // Stop scaling controller and related services + if s.scalingController != nil { + if err := s.scalingController.Close(); err != nil { + log.Error().Err(err).Msg("Failed to stop scaling controller") + } else { + log.Info().Msg("🌊 Wave-based scaling controller stopped") + } + } + if err := s.httpServer.Shutdown(ctx); err != nil { return fmt.Errorf("server shutdown failed: %w", err) } diff --git a/whoosh b/whoosh index 68d4932..ea6f5f1 100755 Binary files a/whoosh and b/whoosh differ