package orchestrator

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math/rand"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/rs/zerolog/log"
	"go.opentelemetry.io/otel/attribute"

	"github.com/chorus-services/whoosh/internal/tracing"
)

// AssignmentBroker manages per-replica assignments for CHORUS instances.
//
// Assignments and templates are kept in memory only. Both maps are guarded by
// mu. Values stored in the maps are never mutated after insertion (templates
// are replaced wholesale), which is what allows handlers to encode a stored
// pointer after releasing the lock.
type AssignmentBroker struct {
	mu          sync.RWMutex
	assignments map[string]*Assignment
	templates   map[string]*AssignmentTemplate
	bootstrap   *BootstrapPoolManager
}

// Assignment represents a configuration assignment for a CHORUS replica.
type Assignment struct {
	ID               string            `json:"id"`
	TaskSlot         string            `json:"task_slot,omitempty"`
	TaskID           string            `json:"task_id,omitempty"`
	ClusterID        string            `json:"cluster_id"`
	Role             string            `json:"role"`
	Model            string            `json:"model"`
	PromptUCXL       string            `json:"prompt_ucxl,omitempty"`
	Specialization   string            `json:"specialization"`
	Capabilities     []string          `json:"capabilities"`
	Environment      map[string]string `json:"environment,omitempty"`
	BootstrapPeers   []string          `json:"bootstrap_peers"`
	JoinStaggerMS    int               `json:"join_stagger_ms"`
	DialsPerSecond   int               `json:"dials_per_second"`
	MaxConcurrentDHT int               `json:"max_concurrent_dht"`
	ConfigEpoch      int64             `json:"config_epoch"`
	AssignedAt       time.Time         `json:"assigned_at"`
	// NOTE: omitempty has no effect on a time.Time struct value in
	// encoding/json, so this field is always serialized. The tag is kept
	// unchanged to preserve the wire format.
	ExpiresAt time.Time `json:"expires_at,omitempty"`
}

// AssignmentTemplate defines a template for creating assignments.
type AssignmentTemplate struct {
	Name           string            `json:"name"`
	Role           string            `json:"role"`
	Model          string            `json:"model"`
	PromptUCXL     string            `json:"prompt_ucxl,omitempty"`
	Specialization string            `json:"specialization"`
	Capabilities   []string          `json:"capabilities"`
	Environment    map[string]string `json:"environment,omitempty"`

	// Scaling configuration
	DialsPerSecond     int `json:"dials_per_second"`
	MaxConcurrentDHT   int `json:"max_concurrent_dht"`
	BootstrapPeerCount int `json:"bootstrap_peer_count"` // How many bootstrap peers to assign
	MaxStaggerMS       int `json:"max_stagger_ms"`       // Maximum stagger delay
}

// AssignmentRequest represents a request for assignment.
type AssignmentRequest struct {
	TaskSlot  string `json:"task_slot,omitempty"`
	TaskID    string `json:"task_id,omitempty"`
	ClusterID string `json:"cluster_id"`
	Template  string `json:"template,omitempty"` // Template name to use
	Role      string `json:"role,omitempty"`     // Override role
	Model     string `json:"model,omitempty"`    // Override model
}

// AssignmentStats represents statistics about assignments.
type AssignmentStats struct {
	TotalAssignments   int            `json:"total_assignments"`
	AssignmentsByRole  map[string]int `json:"assignments_by_role"`
	AssignmentsByModel map[string]int `json:"assignments_by_model"`
	ActiveAssignments  int            `json:"active_assignments"`
	ExpiredAssignments int            `json:"expired_assignments"`
	TemplateCount      int            `json:"template_count"`
	AvgStaggerMS       float64        `json:"avg_stagger_ms"`
}

// assignmentTemplateNotFoundError reports that a request named a template the
// broker does not know. It is a distinct type so HTTP handlers can map it to a
// client error instead of a server error.
type assignmentTemplateNotFoundError struct {
	name string
}

// Error implements the error interface. The message format matches what
// CreateAssignment has always returned for an unknown template.
func (e *assignmentTemplateNotFoundError) Error() string {
	return fmt.Sprintf("template '%s' not found", e.name)
}

// NewAssignmentBroker creates a new assignment broker with the default
// templates registered. bootstrapManager may be nil, in which case created
// assignments carry no bootstrap peers.
func NewAssignmentBroker(bootstrapManager *BootstrapPoolManager) *AssignmentBroker {
	broker := &AssignmentBroker{
		assignments: make(map[string]*Assignment),
		templates:   make(map[string]*AssignmentTemplate),
		bootstrap:   bootstrapManager,
	}

	// Initialize default templates
	broker.initializeDefaultTemplates()

	return broker
}

// initializeDefaultTemplates sets up default assignment templates.
//
// It writes to ab.templates without taking the lock, so it must only be called
// before the broker is shared between goroutines (as NewAssignmentBroker does).
func (ab *AssignmentBroker) initializeDefaultTemplates() {
	defaultTemplates := []*AssignmentTemplate{
		{
			Name:               "general-developer",
			Role:               "developer",
			Model:              "meta/llama-3.1-8b-instruct",
			Specialization:     "general_developer",
			Capabilities:       []string{"general_development", "task_coordination"},
			DialsPerSecond:     5,
			MaxConcurrentDHT:   16,
			BootstrapPeerCount: 3,
			MaxStaggerMS:       20000,
		},
		{
			Name:               "code-reviewer",
			Role:               "reviewer",
			Model:              "meta/llama-3.1-70b-instruct",
			Specialization:     "code_reviewer",
			Capabilities:       []string{"code_review", "quality_assurance"},
			DialsPerSecond:     3,
			MaxConcurrentDHT:   8,
			BootstrapPeerCount: 2,
			MaxStaggerMS:       15000,
		},
		{
			Name:               "task-coordinator",
			Role:               "coordinator",
			Model:              "meta/llama-3.1-8b-instruct",
			Specialization:     "task_coordinator",
			Capabilities:       []string{"task_coordination", "planning"},
			DialsPerSecond:     8,
			MaxConcurrentDHT:   24,
			BootstrapPeerCount: 4,
			MaxStaggerMS:       10000,
		},
		{
			Name:               "admin",
			Role:               "admin",
			Model:              "meta/llama-3.1-70b-instruct",
			Specialization:     "system_admin",
			Capabilities:       []string{"administration", "leadership", "slurp_operations"},
			DialsPerSecond:     10,
			MaxConcurrentDHT:   32,
			BootstrapPeerCount: 5,
			MaxStaggerMS:       5000,
		},
	}

	for _, template := range defaultTemplates {
		ab.templates[template.Name] = template
	}

	log.Info().Int("template_count", len(defaultTemplates)).Msg("Initialized default assignment templates")
}

// RegisterRoutes registers HTTP routes for the assignment broker.
//
// Note that GET /assign is not a read-only endpoint: each call creates and
// stores a new assignment.
func (ab *AssignmentBroker) RegisterRoutes(router chi.Router) {
	router.Get("/assign", ab.handleAssignRequest)
	router.Get("/", ab.handleListAssignments)
	router.Get("/{id}", ab.handleGetAssignment)
	router.Delete("/{id}", ab.handleDeleteAssignment)

	router.Route("/templates", func(r chi.Router) {
		r.Get("/", ab.handleListTemplates)
		r.Post("/", ab.handleCreateTemplate)
		r.Get("/{name}", ab.handleGetTemplate)
	})

	router.Get("/stats", ab.handleGetStats)
}

// writeJSON sends payload as a JSON response with the given status code.
//
// An encoding failure cannot be reported to the client because the status line
// has already been sent by then, so it is logged instead of being dropped.
func (ab *AssignmentBroker) writeJSON(w http.ResponseWriter, status int, payload interface{}) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if err := json.NewEncoder(w).Encode(payload); err != nil {
		log.Error().Err(err).Msg("Failed to encode JSON response")
	}
}

// handleAssignRequest handles requests for new assignments.
//
// Query parameters: slot, task, cluster (defaults to "default"), template
// (defaults to "general-developer"), role and model. An unknown template name
// yields 400 Bad Request; any other creation failure yields 500.
func (ab *AssignmentBroker) handleAssignRequest(w http.ResponseWriter, r *http.Request) {
	ctx, span := tracing.Tracer.Start(r.Context(), "assignment_broker.assign_request")
	defer span.End()

	// Parse the query string once; r.URL.Query() re-parses on every call.
	query := r.URL.Query()
	req := AssignmentRequest{
		TaskSlot:  query.Get("slot"),
		TaskID:    query.Get("task"),
		ClusterID: query.Get("cluster"),
		Template:  query.Get("template"),
		Role:      query.Get("role"),
		Model:     query.Get("model"),
	}

	// Default cluster ID if not provided
	if req.ClusterID == "" {
		req.ClusterID = "default"
	}

	// Default template if not provided
	if req.Template == "" {
		req.Template = "general-developer"
	}

	span.SetAttributes(
		attribute.String("assignment.cluster_id", req.ClusterID),
		attribute.String("assignment.template", req.Template),
		attribute.String("assignment.task_slot", req.TaskSlot),
		attribute.String("assignment.task_id", req.TaskID),
	)

	// Create assignment
	assignment, err := ab.CreateAssignment(ctx, req)
	if err != nil {
		// A template name supplied by the caller that does not exist is a
		// client error, not a server fault.
		status := http.StatusInternalServerError
		var notFound *assignmentTemplateNotFoundError
		if errors.As(err, &notFound) {
			status = http.StatusBadRequest
		}

		log.Error().Err(err).Msg("Failed to create assignment")
		http.Error(w, fmt.Sprintf("Failed to create assignment: %v", err), status)
		return
	}

	log.Info().
		Str("assignment_id", assignment.ID).
		Str("role", assignment.Role).
		Str("model", assignment.Model).
		Str("cluster_id", assignment.ClusterID).
		Msg("Created assignment")

	ab.writeJSON(w, http.StatusOK, assignment)
}

// handleListAssignments returns all active (non-expired) assignments.
func (ab *AssignmentBroker) handleListAssignments(w http.ResponseWriter, r *http.Request) {
	now := time.Now()

	// Snapshot under the lock and encode afterwards, so a slow client cannot
	// keep the read lock held and stall writers.
	ab.mu.RLock()
	active := make([]*Assignment, 0, len(ab.assignments))
	for _, assignment := range ab.assignments {
		// Only return non-expired assignments
		if assignment.ExpiresAt.IsZero() || now.Before(assignment.ExpiresAt) {
			active = append(active, assignment)
		}
	}
	ab.mu.RUnlock()

	ab.writeJSON(w, http.StatusOK, active)
}

// handleGetAssignment returns a specific assignment by ID, or 404 if no
// assignment with that ID is stored. Expiry is not checked here.
func (ab *AssignmentBroker) handleGetAssignment(w http.ResponseWriter, r *http.Request) {
	assignmentID := chi.URLParam(r, "id")

	assignment, exists := ab.GetAssignment(assignmentID)
	if !exists {
		http.Error(w, "Assignment not found", http.StatusNotFound)
		return
	}

	ab.writeJSON(w, http.StatusOK, assignment)
}

// handleDeleteAssignment deletes an assignment, responding 204 on success and
// 404 if the ID is unknown.
func (ab *AssignmentBroker) handleDeleteAssignment(w http.ResponseWriter, r *http.Request) {
	assignmentID := chi.URLParam(r, "id")

	// Hold the write lock only for the map operation, not for response I/O.
	ab.mu.Lock()
	_, exists := ab.assignments[assignmentID]
	if exists {
		delete(ab.assignments, assignmentID)
	}
	ab.mu.Unlock()

	if !exists {
		http.Error(w, "Assignment not found", http.StatusNotFound)
		return
	}

	log.Info().Str("assignment_id", assignmentID).Msg("Deleted assignment")
	w.WriteHeader(http.StatusNoContent)
}

// handleListTemplates returns all available templates.
func (ab *AssignmentBroker) handleListTemplates(w http.ResponseWriter, r *http.Request) {
	// Snapshot under the lock; encode after releasing it.
	ab.mu.RLock()
	templates := make([]*AssignmentTemplate, 0, len(ab.templates))
	for _, template := range ab.templates {
		templates = append(templates, template)
	}
	ab.mu.RUnlock()

	ab.writeJSON(w, http.StatusOK, templates)
}

// handleCreateTemplate creates a new assignment template from the JSON request
// body. A template with the same name as an existing one replaces it.
func (ab *AssignmentBroker) handleCreateTemplate(w http.ResponseWriter, r *http.Request) {
	// The body comes from an untrusted client; cap it at 1 MiB so an oversized
	// payload cannot exhaust memory.
	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)

	var template AssignmentTemplate
	if err := json.NewDecoder(r.Body).Decode(&template); err != nil {
		http.Error(w, "Invalid template data", http.StatusBadRequest)
		return
	}

	if template.Name == "" {
		http.Error(w, "Template name is required", http.StatusBadRequest)
		return
	}

	ab.mu.Lock()
	ab.templates[template.Name] = &template
	ab.mu.Unlock()

	log.Info().Str("template_name", template.Name).Msg("Created assignment template")

	ab.writeJSON(w, http.StatusCreated, &template)
}

// handleGetTemplate returns a specific template by name, or 404 if unknown.
func (ab *AssignmentBroker) handleGetTemplate(w http.ResponseWriter, r *http.Request) {
	templateName := chi.URLParam(r, "name")

	ab.mu.RLock()
	template, exists := ab.templates[templateName]
	ab.mu.RUnlock()

	if !exists {
		http.Error(w, "Template not found", http.StatusNotFound)
		return
	}

	ab.writeJSON(w, http.StatusOK, template)
}

// handleGetStats returns assignment statistics.
func (ab *AssignmentBroker) handleGetStats(w http.ResponseWriter, r *http.Request) {
	ab.writeJSON(w, http.StatusOK, ab.GetStats())
}

// CreateAssignment creates a new assignment from a request and stores it.
//
// The template named by req.Template supplies the role, model, capabilities,
// environment and scaling settings; req.Role and req.Model override the
// template values when non-empty. The assignment expires 24 hours after
// creation. An error is returned if the template does not exist. ctx is
// currently unused.
func (ab *AssignmentBroker) CreateAssignment(ctx context.Context, req AssignmentRequest) (*Assignment, error) {
	ab.mu.Lock()
	defer ab.mu.Unlock()

	// Get template
	template, exists := ab.templates[req.Template]
	if !exists {
		return nil, &assignmentTemplateNotFoundError{name: req.Template}
	}

	// Generate an assignment ID that is not already in use.
	assignmentID := ab.uniqueAssignmentIDLocked(req)

	// Get bootstrap peer subset
	var bootstrapPeers []string
	if ab.bootstrap != nil {
		subset := ab.bootstrap.GetSubset(template.BootstrapPeerCount)
		for _, peer := range subset.Peers {
			// Peers without any address cannot be dialed; skip them.
			if len(peer.Addresses) > 0 {
				bootstrapPeers = append(bootstrapPeers, fmt.Sprintf("%s/p2p/%s", peer.Addresses[0], peer.ID))
			}
		}
	}

	// Generate stagger delay. rand.Intn panics for n <= 0, hence the guard.
	staggerMS := 0
	if template.MaxStaggerMS > 0 {
		staggerMS = rand.Intn(template.MaxStaggerMS)
	}

	// Use one timestamp so ConfigEpoch, AssignedAt and ExpiresAt agree.
	now := time.Now()

	// Create assignment
	assignment := &Assignment{
		ID:               assignmentID,
		TaskSlot:         req.TaskSlot,
		TaskID:           req.TaskID,
		ClusterID:        req.ClusterID,
		Role:             template.Role,
		Model:            template.Model,
		PromptUCXL:       template.PromptUCXL,
		Specialization:   template.Specialization,
		Capabilities:     template.Capabilities,
		Environment:      make(map[string]string, len(template.Environment)+6),
		BootstrapPeers:   bootstrapPeers,
		JoinStaggerMS:    staggerMS,
		DialsPerSecond:   template.DialsPerSecond,
		MaxConcurrentDHT: template.MaxConcurrentDHT,
		ConfigEpoch:      now.Unix(),
		AssignedAt:       now,
		ExpiresAt:        now.Add(24 * time.Hour), // 24 hour default expiry
	}

	// Apply request overrides
	if req.Role != "" {
		assignment.Role = req.Role
	}
	if req.Model != "" {
		assignment.Model = req.Model
	}

	// Copy environment from template
	for key, value := range template.Environment {
		assignment.Environment[key] = value
	}

	// Add assignment-specific environment. These are written after the
	// template copy, so they win over template entries with the same key.
	assignment.Environment["ASSIGNMENT_ID"] = assignmentID
	assignment.Environment["CONFIG_EPOCH"] = strconv.FormatInt(assignment.ConfigEpoch, 10)
	assignment.Environment["DISABLE_MDNS"] = "true"
	assignment.Environment["DIALS_PER_SEC"] = strconv.Itoa(assignment.DialsPerSecond)
	assignment.Environment["MAX_CONCURRENT_DHT"] = strconv.Itoa(assignment.MaxConcurrentDHT)
	assignment.Environment["JOIN_STAGGER_MS"] = strconv.Itoa(assignment.JoinStaggerMS)

	// Store assignment
	ab.assignments[assignmentID] = assignment

	return assignment, nil
}

// uniqueAssignmentIDLocked returns an assignment ID for req that is not
// currently present in ab.assignments. The caller must hold ab.mu.
//
// generateAssignmentID has one-second resolution, so two requests for the same
// slot/task/cluster within the same second produce the same base ID. Without
// this check the second assignment would silently overwrite the first. On a
// collision a numeric suffix ("-2", "-3", ...) is appended; IDs that do not
// collide keep their original form.
func (ab *AssignmentBroker) uniqueAssignmentIDLocked(req AssignmentRequest) string {
	base := ab.generateAssignmentID(req)
	id := base
	for n := 2; ; n++ {
		if _, taken := ab.assignments[id]; !taken {
			return id
		}
		id = fmt.Sprintf("%s-%d", base, n)
	}
}

// generateAssignmentID builds an assignment ID from the request and the
// current Unix time in seconds. The most specific identifiers available are
// used: slot and task, then slot alone, then the cluster ID.
//
// The result is not guaranteed to be unique on its own; see
// uniqueAssignmentIDLocked.
func (ab *AssignmentBroker) generateAssignmentID(req AssignmentRequest) string {
	timestamp := time.Now().Unix()

	switch {
	case req.TaskSlot != "" && req.TaskID != "":
		return fmt.Sprintf("assign-%s-%s-%d", req.TaskSlot, req.TaskID, timestamp)
	case req.TaskSlot != "":
		return fmt.Sprintf("assign-%s-%d", req.TaskSlot, timestamp)
	default:
		return fmt.Sprintf("assign-%s-%d", req.ClusterID, timestamp)
	}
}

// GetStats returns assignment statistics computed over every stored
// assignment, including expired ones that have not been cleaned up yet.
func (ab *AssignmentBroker) GetStats() *AssignmentStats {
	ab.mu.RLock()
	defer ab.mu.RUnlock()

	stats := &AssignmentStats{
		TotalAssignments:   len(ab.assignments),
		AssignmentsByRole:  make(map[string]int),
		AssignmentsByModel: make(map[string]int),
		TemplateCount:      len(ab.templates),
	}

	var totalStagger int
	now := time.Now()

	for _, assignment := range ab.assignments {
		// Count by role
		stats.AssignmentsByRole[assignment.Role]++

		// Count by model
		stats.AssignmentsByModel[assignment.Model]++

		// Track stagger for average
		totalStagger += assignment.JoinStaggerMS

		// Count active vs expired
		if assignment.ExpiresAt.IsZero() || now.Before(assignment.ExpiresAt) {
			stats.ActiveAssignments++
		} else {
			stats.ExpiredAssignments++
		}
	}

	// Avoid dividing by zero when there are no assignments.
	if len(ab.assignments) > 0 {
		stats.AvgStaggerMS = float64(totalStagger) / float64(len(ab.assignments))
	}

	return stats
}

// CleanupExpiredAssignments removes expired assignments. Assignments with a
// zero ExpiresAt never expire.
func (ab *AssignmentBroker) CleanupExpiredAssignments() {
	ab.mu.Lock()
	defer ab.mu.Unlock()

	now := time.Now()
	expiredCount := 0

	// Deleting from a map while ranging over it is safe in Go.
	for id, assignment := range ab.assignments {
		if !assignment.ExpiresAt.IsZero() && now.After(assignment.ExpiresAt) {
			delete(ab.assignments, id)
			expiredCount++
		}
	}

	if expiredCount > 0 {
		log.Info().Int("expired_count", expiredCount).Msg("Cleaned up expired assignments")
	}
}

// GetAssignment returns an assignment by ID and whether it was found. The
// returned pointer refers to the stored value and must be treated as
// read-only.
func (ab *AssignmentBroker) GetAssignment(id string) (*Assignment, bool) {
	ab.mu.RLock()
	defer ab.mu.RUnlock()

	assignment, exists := ab.assignments[id]
	return assignment, exists
}