- Add scaling system components to server initialization
- Register scaling API and assignment broker routes
- Start bootstrap pool manager in server lifecycle
- Add graceful shutdown for scaling controller
- Update API routing to use chi.Router instead of gorilla/mux
- Fix Docker API compatibility issues
- Configure health gates with placeholder URLs for KACHING and BACKBEAT

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
502 lines
15 KiB
Go
502 lines
15 KiB
Go
package orchestrator
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"net/http"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/rs/zerolog/log"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
|
|
"github.com/chorus-services/whoosh/internal/tracing"
|
|
)
|
|
|
|
// AssignmentBroker manages per-replica assignments for CHORUS instances
|
|
type AssignmentBroker struct {
|
|
mu sync.RWMutex
|
|
assignments map[string]*Assignment
|
|
templates map[string]*AssignmentTemplate
|
|
bootstrap *BootstrapPoolManager
|
|
}
|
|
|
|
// Assignment is the configuration handed to a single CHORUS replica. It is
// exchanged as JSON using the field tags below.
type Assignment struct {
	// Identity of the assignment and the work it belongs to.
	ID        string `json:"id"`
	TaskSlot  string `json:"task_slot,omitempty"`
	TaskID    string `json:"task_id,omitempty"`
	ClusterID string `json:"cluster_id"`

	// Agent profile and extra environment variables for the replica.
	Role           string            `json:"role"`
	Model          string            `json:"model"`
	PromptUCXL     string            `json:"prompt_ucxl,omitempty"`
	Specialization string            `json:"specialization"`
	Capabilities   []string          `json:"capabilities"`
	Environment    map[string]string `json:"environment,omitempty"`

	// Network join and rate-limit settings.
	BootstrapPeers   []string `json:"bootstrap_peers"`
	JoinStaggerMS    int      `json:"join_stagger_ms"`
	DialsPerSecond   int      `json:"dials_per_second"`
	MaxConcurrentDHT int      `json:"max_concurrent_dht"`

	// Versioning and lifetime. A zero ExpiresAt is treated by the broker as
	// "never expires".
	// NOTE(review): encoding/json's omitempty never omits a struct value such
	// as time.Time, so expires_at is always present in the encoded output.
	ConfigEpoch int64     `json:"config_epoch"`
	AssignedAt  time.Time `json:"assigned_at"`
	ExpiresAt   time.Time `json:"expires_at,omitempty"`
}
|
|
|
|
// AssignmentTemplate is a named blueprint from which assignments are created.
type AssignmentTemplate struct {
	// Name is the key under which the broker stores and looks up the template.
	Name string `json:"name"`

	// Agent profile copied into assignments built from this template.
	Role           string            `json:"role"`
	Model          string            `json:"model"`
	PromptUCXL     string            `json:"prompt_ucxl,omitempty"`
	Specialization string            `json:"specialization"`
	Capabilities   []string          `json:"capabilities"`
	Environment    map[string]string `json:"environment,omitempty"`

	// Scaling configuration.
	DialsPerSecond     int `json:"dials_per_second"`
	MaxConcurrentDHT   int `json:"max_concurrent_dht"`
	BootstrapPeerCount int `json:"bootstrap_peer_count"` // number of bootstrap peers to assign
	MaxStaggerMS       int `json:"max_stagger_ms"`       // maximum join stagger delay in milliseconds
}
|
|
|
|
// AssignmentRequest carries the inputs used to create an assignment: which
// task and cluster it is for, which template to instantiate, and optional
// overrides for values the template would otherwise supply.
type AssignmentRequest struct {
	TaskSlot  string `json:"task_slot,omitempty"`
	TaskID    string `json:"task_id,omitempty"`
	ClusterID string `json:"cluster_id"`
	Template  string `json:"template,omitempty"` // name of the template to instantiate
	Role      string `json:"role,omitempty"`     // replaces the template role when non-empty
	Model     string `json:"model,omitempty"`    // replaces the template model when non-empty
}
|
|
|
|
// AssignmentStats is a point-in-time summary of the broker's assignments and
// templates, as reported by GetStats.
type AssignmentStats struct {
	TotalAssignments   int            `json:"total_assignments"`
	AssignmentsByRole  map[string]int `json:"assignments_by_role"`
	AssignmentsByModel map[string]int `json:"assignments_by_model"`
	ActiveAssignments  int            `json:"active_assignments"`
	ExpiredAssignments int            `json:"expired_assignments"`
	TemplateCount      int            `json:"template_count"`
	AvgStaggerMS       float64        `json:"avg_stagger_ms"`
}
|
|
|
|
// NewAssignmentBroker creates a new assignment broker
|
|
func NewAssignmentBroker(bootstrapManager *BootstrapPoolManager) *AssignmentBroker {
|
|
broker := &AssignmentBroker{
|
|
assignments: make(map[string]*Assignment),
|
|
templates: make(map[string]*AssignmentTemplate),
|
|
bootstrap: bootstrapManager,
|
|
}
|
|
|
|
// Initialize default templates
|
|
broker.initializeDefaultTemplates()
|
|
|
|
return broker
|
|
}
|
|
|
|
// initializeDefaultTemplates sets up default assignment templates
|
|
func (ab *AssignmentBroker) initializeDefaultTemplates() {
|
|
defaultTemplates := []*AssignmentTemplate{
|
|
{
|
|
Name: "general-developer",
|
|
Role: "developer",
|
|
Model: "meta/llama-3.1-8b-instruct",
|
|
Specialization: "general_developer",
|
|
Capabilities: []string{"general_development", "task_coordination"},
|
|
DialsPerSecond: 5,
|
|
MaxConcurrentDHT: 16,
|
|
BootstrapPeerCount: 3,
|
|
MaxStaggerMS: 20000,
|
|
},
|
|
{
|
|
Name: "code-reviewer",
|
|
Role: "reviewer",
|
|
Model: "meta/llama-3.1-70b-instruct",
|
|
Specialization: "code_reviewer",
|
|
Capabilities: []string{"code_review", "quality_assurance"},
|
|
DialsPerSecond: 3,
|
|
MaxConcurrentDHT: 8,
|
|
BootstrapPeerCount: 2,
|
|
MaxStaggerMS: 15000,
|
|
},
|
|
{
|
|
Name: "task-coordinator",
|
|
Role: "coordinator",
|
|
Model: "meta/llama-3.1-8b-instruct",
|
|
Specialization: "task_coordinator",
|
|
Capabilities: []string{"task_coordination", "planning"},
|
|
DialsPerSecond: 8,
|
|
MaxConcurrentDHT: 24,
|
|
BootstrapPeerCount: 4,
|
|
MaxStaggerMS: 10000,
|
|
},
|
|
{
|
|
Name: "admin",
|
|
Role: "admin",
|
|
Model: "meta/llama-3.1-70b-instruct",
|
|
Specialization: "system_admin",
|
|
Capabilities: []string{"administration", "leadership", "slurp_operations"},
|
|
DialsPerSecond: 10,
|
|
MaxConcurrentDHT: 32,
|
|
BootstrapPeerCount: 5,
|
|
MaxStaggerMS: 5000,
|
|
},
|
|
}
|
|
|
|
for _, template := range defaultTemplates {
|
|
ab.templates[template.Name] = template
|
|
}
|
|
|
|
log.Info().Int("template_count", len(defaultTemplates)).Msg("Initialized default assignment templates")
|
|
}
|
|
|
|
// RegisterRoutes registers HTTP routes for the assignment broker
|
|
func (ab *AssignmentBroker) RegisterRoutes(router chi.Router) {
|
|
router.Get("/assign", ab.handleAssignRequest)
|
|
router.Get("/", ab.handleListAssignments)
|
|
router.Get("/{id}", ab.handleGetAssignment)
|
|
router.Delete("/{id}", ab.handleDeleteAssignment)
|
|
router.Route("/templates", func(r chi.Router) {
|
|
r.Get("/", ab.handleListTemplates)
|
|
r.Post("/", ab.handleCreateTemplate)
|
|
r.Get("/{name}", ab.handleGetTemplate)
|
|
})
|
|
router.Get("/stats", ab.handleGetStats)
|
|
}
|
|
|
|
// handleAssignRequest handles requests for new assignments
|
|
func (ab *AssignmentBroker) handleAssignRequest(w http.ResponseWriter, r *http.Request) {
|
|
ctx, span := tracing.Tracer.Start(r.Context(), "assignment_broker.assign_request")
|
|
defer span.End()
|
|
|
|
// Parse query parameters
|
|
req := AssignmentRequest{
|
|
TaskSlot: r.URL.Query().Get("slot"),
|
|
TaskID: r.URL.Query().Get("task"),
|
|
ClusterID: r.URL.Query().Get("cluster"),
|
|
Template: r.URL.Query().Get("template"),
|
|
Role: r.URL.Query().Get("role"),
|
|
Model: r.URL.Query().Get("model"),
|
|
}
|
|
|
|
// Default cluster ID if not provided
|
|
if req.ClusterID == "" {
|
|
req.ClusterID = "default"
|
|
}
|
|
|
|
// Default template if not provided
|
|
if req.Template == "" {
|
|
req.Template = "general-developer"
|
|
}
|
|
|
|
span.SetAttributes(
|
|
attribute.String("assignment.cluster_id", req.ClusterID),
|
|
attribute.String("assignment.template", req.Template),
|
|
attribute.String("assignment.task_slot", req.TaskSlot),
|
|
attribute.String("assignment.task_id", req.TaskID),
|
|
)
|
|
|
|
// Create assignment
|
|
assignment, err := ab.CreateAssignment(ctx, req)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to create assignment")
|
|
http.Error(w, fmt.Sprintf("Failed to create assignment: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
log.Info().
|
|
Str("assignment_id", assignment.ID).
|
|
Str("role", assignment.Role).
|
|
Str("model", assignment.Model).
|
|
Str("cluster_id", assignment.ClusterID).
|
|
Msg("Created assignment")
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(assignment)
|
|
}
|
|
|
|
// handleListAssignments returns all active assignments
|
|
func (ab *AssignmentBroker) handleListAssignments(w http.ResponseWriter, r *http.Request) {
|
|
ab.mu.RLock()
|
|
defer ab.mu.RUnlock()
|
|
|
|
assignments := make([]*Assignment, 0, len(ab.assignments))
|
|
for _, assignment := range ab.assignments {
|
|
// Only return non-expired assignments
|
|
if assignment.ExpiresAt.IsZero() || time.Now().Before(assignment.ExpiresAt) {
|
|
assignments = append(assignments, assignment)
|
|
}
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(assignments)
|
|
}
|
|
|
|
// handleGetAssignment returns a specific assignment by ID
|
|
func (ab *AssignmentBroker) handleGetAssignment(w http.ResponseWriter, r *http.Request) {
|
|
assignmentID := chi.URLParam(r, "id")
|
|
|
|
ab.mu.RLock()
|
|
assignment, exists := ab.assignments[assignmentID]
|
|
ab.mu.RUnlock()
|
|
|
|
if !exists {
|
|
http.Error(w, "Assignment not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(assignment)
|
|
}
|
|
|
|
// handleDeleteAssignment deletes an assignment
|
|
func (ab *AssignmentBroker) handleDeleteAssignment(w http.ResponseWriter, r *http.Request) {
|
|
assignmentID := chi.URLParam(r, "id")
|
|
|
|
ab.mu.Lock()
|
|
defer ab.mu.Unlock()
|
|
|
|
if _, exists := ab.assignments[assignmentID]; !exists {
|
|
http.Error(w, "Assignment not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
delete(ab.assignments, assignmentID)
|
|
log.Info().Str("assignment_id", assignmentID).Msg("Deleted assignment")
|
|
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
// handleListTemplates returns all available templates
|
|
func (ab *AssignmentBroker) handleListTemplates(w http.ResponseWriter, r *http.Request) {
|
|
ab.mu.RLock()
|
|
defer ab.mu.RUnlock()
|
|
|
|
templates := make([]*AssignmentTemplate, 0, len(ab.templates))
|
|
for _, template := range ab.templates {
|
|
templates = append(templates, template)
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(templates)
|
|
}
|
|
|
|
// handleCreateTemplate creates a new assignment template
|
|
func (ab *AssignmentBroker) handleCreateTemplate(w http.ResponseWriter, r *http.Request) {
|
|
var template AssignmentTemplate
|
|
if err := json.NewDecoder(r.Body).Decode(&template); err != nil {
|
|
http.Error(w, "Invalid template data", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if template.Name == "" {
|
|
http.Error(w, "Template name is required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
ab.mu.Lock()
|
|
ab.templates[template.Name] = &template
|
|
ab.mu.Unlock()
|
|
|
|
log.Info().Str("template_name", template.Name).Msg("Created assignment template")
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusCreated)
|
|
json.NewEncoder(w).Encode(&template)
|
|
}
|
|
|
|
// handleGetTemplate returns a specific template
|
|
func (ab *AssignmentBroker) handleGetTemplate(w http.ResponseWriter, r *http.Request) {
|
|
templateName := chi.URLParam(r, "name")
|
|
|
|
ab.mu.RLock()
|
|
template, exists := ab.templates[templateName]
|
|
ab.mu.RUnlock()
|
|
|
|
if !exists {
|
|
http.Error(w, "Template not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(template)
|
|
}
|
|
|
|
// handleGetStats returns assignment statistics
|
|
func (ab *AssignmentBroker) handleGetStats(w http.ResponseWriter, r *http.Request) {
|
|
stats := ab.GetStats()
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(stats)
|
|
}
|
|
|
|
// CreateAssignment creates a new assignment from a request
|
|
func (ab *AssignmentBroker) CreateAssignment(ctx context.Context, req AssignmentRequest) (*Assignment, error) {
|
|
ab.mu.Lock()
|
|
defer ab.mu.Unlock()
|
|
|
|
// Get template
|
|
template, exists := ab.templates[req.Template]
|
|
if !exists {
|
|
return nil, fmt.Errorf("template '%s' not found", req.Template)
|
|
}
|
|
|
|
// Generate assignment ID
|
|
assignmentID := ab.generateAssignmentID(req)
|
|
|
|
// Get bootstrap peer subset
|
|
var bootstrapPeers []string
|
|
if ab.bootstrap != nil {
|
|
subset := ab.bootstrap.GetSubset(template.BootstrapPeerCount)
|
|
for _, peer := range subset.Peers {
|
|
if len(peer.Addresses) > 0 {
|
|
bootstrapPeers = append(bootstrapPeers, fmt.Sprintf("%s/p2p/%s", peer.Addresses[0], peer.ID))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Generate stagger delay
|
|
staggerMS := 0
|
|
if template.MaxStaggerMS > 0 {
|
|
staggerMS = rand.Intn(template.MaxStaggerMS)
|
|
}
|
|
|
|
// Create assignment
|
|
assignment := &Assignment{
|
|
ID: assignmentID,
|
|
TaskSlot: req.TaskSlot,
|
|
TaskID: req.TaskID,
|
|
ClusterID: req.ClusterID,
|
|
Role: template.Role,
|
|
Model: template.Model,
|
|
PromptUCXL: template.PromptUCXL,
|
|
Specialization: template.Specialization,
|
|
Capabilities: template.Capabilities,
|
|
Environment: make(map[string]string),
|
|
BootstrapPeers: bootstrapPeers,
|
|
JoinStaggerMS: staggerMS,
|
|
DialsPerSecond: template.DialsPerSecond,
|
|
MaxConcurrentDHT: template.MaxConcurrentDHT,
|
|
ConfigEpoch: time.Now().Unix(),
|
|
AssignedAt: time.Now(),
|
|
ExpiresAt: time.Now().Add(24 * time.Hour), // 24 hour default expiry
|
|
}
|
|
|
|
// Apply request overrides
|
|
if req.Role != "" {
|
|
assignment.Role = req.Role
|
|
}
|
|
if req.Model != "" {
|
|
assignment.Model = req.Model
|
|
}
|
|
|
|
// Copy environment from template
|
|
for key, value := range template.Environment {
|
|
assignment.Environment[key] = value
|
|
}
|
|
|
|
// Add assignment-specific environment
|
|
assignment.Environment["ASSIGNMENT_ID"] = assignmentID
|
|
assignment.Environment["CONFIG_EPOCH"] = strconv.FormatInt(assignment.ConfigEpoch, 10)
|
|
assignment.Environment["DISABLE_MDNS"] = "true"
|
|
assignment.Environment["DIALS_PER_SEC"] = strconv.Itoa(assignment.DialsPerSecond)
|
|
assignment.Environment["MAX_CONCURRENT_DHT"] = strconv.Itoa(assignment.MaxConcurrentDHT)
|
|
assignment.Environment["JOIN_STAGGER_MS"] = strconv.Itoa(assignment.JoinStaggerMS)
|
|
|
|
// Store assignment
|
|
ab.assignments[assignmentID] = assignment
|
|
|
|
return assignment, nil
|
|
}
|
|
|
|
// generateAssignmentID generates a unique assignment ID
|
|
func (ab *AssignmentBroker) generateAssignmentID(req AssignmentRequest) string {
|
|
timestamp := time.Now().Unix()
|
|
|
|
if req.TaskSlot != "" && req.TaskID != "" {
|
|
return fmt.Sprintf("assign-%s-%s-%d", req.TaskSlot, req.TaskID, timestamp)
|
|
}
|
|
|
|
if req.TaskSlot != "" {
|
|
return fmt.Sprintf("assign-%s-%d", req.TaskSlot, timestamp)
|
|
}
|
|
|
|
return fmt.Sprintf("assign-%s-%d", req.ClusterID, timestamp)
|
|
}
|
|
|
|
// GetStats returns assignment statistics
|
|
func (ab *AssignmentBroker) GetStats() *AssignmentStats {
|
|
ab.mu.RLock()
|
|
defer ab.mu.RUnlock()
|
|
|
|
stats := &AssignmentStats{
|
|
TotalAssignments: len(ab.assignments),
|
|
AssignmentsByRole: make(map[string]int),
|
|
AssignmentsByModel: make(map[string]int),
|
|
TemplateCount: len(ab.templates),
|
|
}
|
|
|
|
var totalStagger int
|
|
activeCount := 0
|
|
expiredCount := 0
|
|
now := time.Now()
|
|
|
|
for _, assignment := range ab.assignments {
|
|
// Count by role
|
|
stats.AssignmentsByRole[assignment.Role]++
|
|
|
|
// Count by model
|
|
stats.AssignmentsByModel[assignment.Model]++
|
|
|
|
// Track stagger for average
|
|
totalStagger += assignment.JoinStaggerMS
|
|
|
|
// Count active vs expired
|
|
if assignment.ExpiresAt.IsZero() || now.Before(assignment.ExpiresAt) {
|
|
activeCount++
|
|
} else {
|
|
expiredCount++
|
|
}
|
|
}
|
|
|
|
stats.ActiveAssignments = activeCount
|
|
stats.ExpiredAssignments = expiredCount
|
|
|
|
if len(ab.assignments) > 0 {
|
|
stats.AvgStaggerMS = float64(totalStagger) / float64(len(ab.assignments))
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// CleanupExpiredAssignments removes expired assignments
|
|
func (ab *AssignmentBroker) CleanupExpiredAssignments() {
|
|
ab.mu.Lock()
|
|
defer ab.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
expiredCount := 0
|
|
|
|
for id, assignment := range ab.assignments {
|
|
if !assignment.ExpiresAt.IsZero() && now.After(assignment.ExpiresAt) {
|
|
delete(ab.assignments, id)
|
|
expiredCount++
|
|
}
|
|
}
|
|
|
|
if expiredCount > 0 {
|
|
log.Info().Int("expired_count", expiredCount).Msg("Cleaned up expired assignments")
|
|
}
|
|
}
|
|
|
|
// GetAssignment returns an assignment by ID
|
|
func (ab *AssignmentBroker) GetAssignment(id string) (*Assignment, bool) {
|
|
ab.mu.RLock()
|
|
defer ab.mu.RUnlock()
|
|
|
|
assignment, exists := ab.assignments[id]
|
|
return assignment, exists
|
|
} |