Files
WHOOSH/internal/orchestrator/assignment_broker.go
Claude Code 28f02b61d1 Integrate wave-based scaling system with WHOOSH server
- Add scaling system components to server initialization
- Register scaling API and assignment broker routes
- Start bootstrap pool manager in server lifecycle
- Add graceful shutdown for scaling controller
- Update API routing to use chi.Router instead of gorilla/mux
- Fix Docker API compatibility issues
- Configure health gates with placeholder URLs for KACHING and BACKBEAT

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-22 13:59:01 +10:00

502 lines
15 KiB
Go

package orchestrator
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"strconv"
"sync"
"time"
"github.com/go-chi/chi/v5"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/attribute"
"github.com/chorus-services/whoosh/internal/tracing"
)
// AssignmentBroker hands out per-replica configuration assignments to CHORUS
// instances and stores the templates those assignments are built from.
type AssignmentBroker struct {
	// mu guards the assignments and templates maps.
	mu sync.RWMutex

	// assignments holds every issued assignment, keyed by assignment ID.
	assignments map[string]*Assignment
	// templates holds the known assignment templates, keyed by template name.
	templates map[string]*AssignmentTemplate

	// bootstrap supplies bootstrap peers for new assignments. It may be nil,
	// in which case assignments are created without bootstrap peers.
	bootstrap *BootstrapPoolManager
}
// Assignment is the configuration handed to a single CHORUS replica. It is
// serialized as JSON by the broker's HTTP handlers.
type Assignment struct {
	// ID is the broker-generated identifier used as the key in the broker's map.
	ID string `json:"id"`

	// TaskSlot, TaskID and ClusterID are copied from the originating request.
	TaskSlot  string `json:"task_slot,omitempty"`
	TaskID    string `json:"task_id,omitempty"`
	ClusterID string `json:"cluster_id"`

	// Role and Model come from the template unless the request overrides them.
	Role  string `json:"role"`
	Model string `json:"model"`

	// PromptUCXL, Specialization and Capabilities are taken from the template.
	PromptUCXL     string   `json:"prompt_ucxl,omitempty"`
	Specialization string   `json:"specialization"`
	Capabilities   []string `json:"capabilities"`

	// Environment is the template environment plus assignment-specific keys
	// (ASSIGNMENT_ID, CONFIG_EPOCH, ...) added when the assignment is created.
	Environment map[string]string `json:"environment,omitempty"`

	// BootstrapPeers lists peers as "<first address>/p2p/<peer ID>" strings.
	BootstrapPeers []string `json:"bootstrap_peers"`

	// JoinStaggerMS is a random delay below the template's MaxStaggerMS, or 0
	// when the template does not configure a positive maximum.
	JoinStaggerMS    int `json:"join_stagger_ms"`
	DialsPerSecond   int `json:"dials_per_second"`
	MaxConcurrentDHT int `json:"max_concurrent_dht"`

	// ConfigEpoch is the creation time in Unix seconds.
	ConfigEpoch int64     `json:"config_epoch"`
	AssignedAt  time.Time `json:"assigned_at"`

	// ExpiresAt is when the assignment stops counting as active; the zero
	// value is treated as "never expires" by the broker. Note that omitempty
	// has no effect on a time.Time, so the field is always emitted.
	ExpiresAt time.Time `json:"expires_at,omitempty"`
}
// AssignmentTemplate is a named blueprint from which assignments are created.
type AssignmentTemplate struct {
	// Name is the key under which the template is stored and looked up.
	Name string `json:"name"`

	// Role and Model are the defaults for assignments built from this
	// template; a request may override either.
	Role  string `json:"role"`
	Model string `json:"model"`

	// These values are copied onto every assignment built from the template.
	PromptUCXL     string            `json:"prompt_ucxl,omitempty"`
	Specialization string            `json:"specialization"`
	Capabilities   []string          `json:"capabilities"`
	Environment    map[string]string `json:"environment,omitempty"`

	// Scaling configuration.

	DialsPerSecond   int `json:"dials_per_second"`
	MaxConcurrentDHT int `json:"max_concurrent_dht"`
	// BootstrapPeerCount is how many bootstrap peers to request per assignment.
	BootstrapPeerCount int `json:"bootstrap_peer_count"`
	// MaxStaggerMS is the exclusive upper bound of the random join delay;
	// values <= 0 result in no stagger.
	MaxStaggerMS int `json:"max_stagger_ms"`
}
// AssignmentRequest carries the parameters of a single assignment request.
type AssignmentRequest struct {
	// TaskSlot and TaskID, when set, become part of the generated assignment ID.
	TaskSlot string `json:"task_slot,omitempty"`
	TaskID   string `json:"task_id,omitempty"`

	// ClusterID is copied onto the assignment and is used in the assignment ID
	// when no task slot is given.
	ClusterID string `json:"cluster_id"`

	// Template is the name of the template the assignment is built from.
	Template string `json:"template,omitempty"`

	// Role and Model, when non-empty, replace the template's values.
	Role  string `json:"role,omitempty"`
	Model string `json:"model,omitempty"`
}
// AssignmentStats is a point-in-time summary of the broker's contents.
type AssignmentStats struct {
	// TotalAssignments counts every stored assignment, expired or not.
	TotalAssignments int `json:"total_assignments"`

	// Per-role and per-model counts over all stored assignments.
	AssignmentsByRole  map[string]int `json:"assignments_by_role"`
	AssignmentsByModel map[string]int `json:"assignments_by_model"`

	// ActiveAssignments and ExpiredAssignments split the total by expiry.
	ActiveAssignments  int `json:"active_assignments"`
	ExpiredAssignments int `json:"expired_assignments"`

	// TemplateCount is the number of registered templates.
	TemplateCount int `json:"template_count"`

	// AvgStaggerMS is the mean join stagger over all stored assignments.
	AvgStaggerMS float64 `json:"avg_stagger_ms"`
}
// NewAssignmentBroker returns a broker with empty assignment storage and the
// built-in default templates already registered. bootstrapManager is kept as
// the source of bootstrap peers for new assignments.
func NewAssignmentBroker(bootstrapManager *BootstrapPoolManager) *AssignmentBroker {
	ab := new(AssignmentBroker)
	ab.bootstrap = bootstrapManager
	ab.assignments = map[string]*Assignment{}
	ab.templates = map[string]*AssignmentTemplate{}

	// Seed the template map before the broker is handed to the caller.
	ab.initializeDefaultTemplates()

	return ab
}
// initializeDefaultTemplates registers the built-in templates
// (general-developer, code-reviewer, task-coordinator, admin) in ab.templates.
// It does not take ab.mu; in this file it is only called from the constructor.
func (ab *AssignmentBroker) initializeDefaultTemplates() {
	builtins := [...]AssignmentTemplate{
		{
			Name:               "general-developer",
			Role:               "developer",
			Model:              "meta/llama-3.1-8b-instruct",
			Specialization:     "general_developer",
			Capabilities:       []string{"general_development", "task_coordination"},
			DialsPerSecond:     5,
			MaxConcurrentDHT:   16,
			BootstrapPeerCount: 3,
			MaxStaggerMS:       20000,
		},
		{
			Name:               "code-reviewer",
			Role:               "reviewer",
			Model:              "meta/llama-3.1-70b-instruct",
			Specialization:     "code_reviewer",
			Capabilities:       []string{"code_review", "quality_assurance"},
			DialsPerSecond:     3,
			MaxConcurrentDHT:   8,
			BootstrapPeerCount: 2,
			MaxStaggerMS:       15000,
		},
		{
			Name:               "task-coordinator",
			Role:               "coordinator",
			Model:              "meta/llama-3.1-8b-instruct",
			Specialization:     "task_coordinator",
			Capabilities:       []string{"task_coordination", "planning"},
			DialsPerSecond:     8,
			MaxConcurrentDHT:   24,
			BootstrapPeerCount: 4,
			MaxStaggerMS:       10000,
		},
		{
			Name:               "admin",
			Role:               "admin",
			Model:              "meta/llama-3.1-70b-instruct",
			Specialization:     "system_admin",
			Capabilities:       []string{"administration", "leadership", "slurp_operations"},
			DialsPerSecond:     10,
			MaxConcurrentDHT:   32,
			BootstrapPeerCount: 5,
			MaxStaggerMS:       5000,
		},
	}

	// Store a pointer to each entry under its template name.
	for i := range builtins {
		tmpl := &builtins[i]
		ab.templates[tmpl.Name] = tmpl
	}

	log.Info().Int("template_count", len(builtins)).Msg("Initialized default assignment templates")
}
// RegisterRoutes attaches the broker's HTTP endpoints to router. All paths are
// relative to wherever the caller mounts router.
func (ab *AssignmentBroker) RegisterRoutes(router chi.Router) {
	// Assignment endpoints.
	router.Get("/assign", ab.handleAssignRequest)
	router.Get("/", ab.handleListAssignments)
	router.Get("/{id}", ab.handleGetAssignment)
	router.Delete("/{id}", ab.handleDeleteAssignment)

	// Template endpoints are grouped on their own sub-router.
	templateRoutes := func(sub chi.Router) {
		sub.Get("/", ab.handleListTemplates)
		sub.Post("/", ab.handleCreateTemplate)
		sub.Get("/{name}", ab.handleGetTemplate)
	}
	router.Route("/templates", templateRoutes)

	// Aggregate statistics.
	router.Get("/stats", ab.handleGetStats)
}
// handleAssignRequest creates a new assignment from the query parameters
// slot, task, cluster, template, role and model, and writes it as JSON.
//
// cluster defaults to "default" and template to "general-developer" when they
// are absent. A request naming a template that does not exist is answered with
// 400 Bad Request; any other creation failure yields 500.
func (ab *AssignmentBroker) handleAssignRequest(w http.ResponseWriter, r *http.Request) {
	ctx, span := tracing.Tracer.Start(r.Context(), "assignment_broker.assign_request")
	defer span.End()

	// Parse the query string once; r.URL.Query() re-parses it on every call.
	query := r.URL.Query()
	req := AssignmentRequest{
		TaskSlot:  query.Get("slot"),
		TaskID:    query.Get("task"),
		ClusterID: query.Get("cluster"),
		Template:  query.Get("template"),
		Role:      query.Get("role"),
		Model:     query.Get("model"),
	}

	// Default cluster ID if not provided
	if req.ClusterID == "" {
		req.ClusterID = "default"
	}

	// Default template if not provided
	if req.Template == "" {
		req.Template = "general-developer"
	}

	span.SetAttributes(
		attribute.String("assignment.cluster_id", req.ClusterID),
		attribute.String("assignment.template", req.Template),
		attribute.String("assignment.task_slot", req.TaskSlot),
		attribute.String("assignment.task_id", req.TaskID),
	)

	// An unknown template is a mistake in the request, not a server failure,
	// so report it as a client error instead of letting it surface as a 500.
	ab.mu.RLock()
	_, known := ab.templates[req.Template]
	ab.mu.RUnlock()
	if !known {
		log.Warn().Str("template", req.Template).Msg("Assignment requested for unknown template")
		http.Error(w, fmt.Sprintf("Failed to create assignment: template '%s' not found", req.Template), http.StatusBadRequest)
		return
	}

	// Create assignment
	assignment, err := ab.CreateAssignment(ctx, req)
	if err != nil {
		log.Error().Err(err).Msg("Failed to create assignment")
		http.Error(w, fmt.Sprintf("Failed to create assignment: %v", err), http.StatusInternalServerError)
		return
	}

	log.Info().
		Str("assignment_id", assignment.ID).
		Str("role", assignment.Role).
		Str("model", assignment.Model).
		Str("cluster_id", assignment.ClusterID).
		Msg("Created assignment")

	w.Header().Set("Content-Type", "application/json")
	// The status line has already been sent once encoding starts, so a failure
	// here can only be logged.
	if err := json.NewEncoder(w).Encode(assignment); err != nil {
		log.Error().Err(err).Str("assignment_id", assignment.ID).Msg("Failed to write assignment response")
	}
}
// handleListAssignments writes all non-expired assignments as a JSON array.
// An assignment with a zero ExpiresAt is treated as never expiring.
func (ab *AssignmentBroker) handleListAssignments(w http.ResponseWriter, r *http.Request) {
	// Use one reference time so every assignment is judged against the same instant.
	now := time.Now()

	// Snapshot the active assignments under the read lock and release it
	// before touching the network: a slow client must not be able to hold the
	// lock and stall writers (and, behind them, every other reader).
	ab.mu.RLock()
	active := make([]*Assignment, 0, len(ab.assignments))
	for _, assignment := range ab.assignments {
		if assignment.ExpiresAt.IsZero() || now.Before(assignment.ExpiresAt) {
			active = append(active, assignment)
		}
	}
	ab.mu.RUnlock()

	w.Header().Set("Content-Type", "application/json")
	// The status line has already been sent once encoding starts, so a failure
	// here can only be logged.
	if err := json.NewEncoder(w).Encode(active); err != nil {
		log.Error().Err(err).Msg("Failed to write assignment list response")
	}
}
// handleGetAssignment writes the assignment named by the {id} URL parameter as
// JSON, or responds with 404 when no such assignment is stored.
func (ab *AssignmentBroker) handleGetAssignment(w http.ResponseWriter, r *http.Request) {
	// GetAssignment performs the lookup under the broker's read lock.
	found, ok := ab.GetAssignment(chi.URLParam(r, "id"))
	if ok {
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(found)
		return
	}

	http.Error(w, "Assignment not found", http.StatusNotFound)
}
// handleDeleteAssignment removes the assignment named by the {id} URL
// parameter. It responds with 204 on success and 404 when the ID is unknown.
func (ab *AssignmentBroker) handleDeleteAssignment(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "id")

	ab.mu.Lock()
	defer ab.mu.Unlock()

	_, found := ab.assignments[id]
	if found {
		delete(ab.assignments, id)
		log.Info().Str("assignment_id", id).Msg("Deleted assignment")
		w.WriteHeader(http.StatusNoContent)
		return
	}

	http.Error(w, "Assignment not found", http.StatusNotFound)
}
// handleListTemplates writes every registered template as a JSON array. The
// order of the array follows map iteration and is therefore unspecified.
func (ab *AssignmentBroker) handleListTemplates(w http.ResponseWriter, r *http.Request) {
	// Snapshot the template pointers under the read lock and release it before
	// writing the response, so a slow client cannot hold the lock and block
	// template creation or assignment requests.
	ab.mu.RLock()
	templates := make([]*AssignmentTemplate, 0, len(ab.templates))
	for _, template := range ab.templates {
		templates = append(templates, template)
	}
	ab.mu.RUnlock()

	w.Header().Set("Content-Type", "application/json")
	// The status line has already been sent once encoding starts, so a failure
	// here can only be logged.
	if err := json.NewEncoder(w).Encode(templates); err != nil {
		log.Error().Err(err).Msg("Failed to write template list response")
	}
}
// handleCreateTemplate decodes an AssignmentTemplate from the JSON request
// body and stores it under its Name, replacing any template already
// registered with that name. It responds with 201 and the stored template, or
// 400 when the body is not valid JSON, is too large, or has no name.
func (ab *AssignmentBroker) handleCreateTemplate(w http.ResponseWriter, r *http.Request) {
	// Bound the body so a client cannot make the server buffer an arbitrarily
	// large payload; a template is a small document.
	const maxTemplateBytes = 1 << 20 // 1 MiB
	r.Body = http.MaxBytesReader(w, r.Body, maxTemplateBytes)

	var template AssignmentTemplate
	if err := json.NewDecoder(r.Body).Decode(&template); err != nil {
		http.Error(w, "Invalid template data", http.StatusBadRequest)
		return
	}

	if template.Name == "" {
		http.Error(w, "Template name is required", http.StatusBadRequest)
		return
	}

	ab.mu.Lock()
	ab.templates[template.Name] = &template
	ab.mu.Unlock()

	log.Info().Str("template_name", template.Name).Msg("Created assignment template")

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	// The status line is already out, so an encoding failure can only be logged.
	if err := json.NewEncoder(w).Encode(&template); err != nil {
		log.Error().Err(err).Str("template_name", template.Name).Msg("Failed to write template response")
	}
}
// handleGetTemplate writes the template named by the {name} URL parameter as
// JSON, or responds with 404 when no such template is registered.
func (ab *AssignmentBroker) handleGetTemplate(w http.ResponseWriter, r *http.Request) {
	name := chi.URLParam(r, "name")

	// Hold the read lock only for the map lookup.
	ab.mu.RLock()
	tmpl, found := ab.templates[name]
	ab.mu.RUnlock()

	if found {
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(tmpl)
		return
	}

	http.Error(w, "Template not found", http.StatusNotFound)
}
// handleGetStats writes the broker's current statistics as JSON.
func (ab *AssignmentBroker) handleGetStats(w http.ResponseWriter, r *http.Request) {
	snapshot := ab.GetStats()

	header := w.Header()
	header.Set("Content-Type", "application/json")

	encoder := json.NewEncoder(w)
	encoder.Encode(snapshot)
}
// CreateAssignment builds an assignment from the template named in req,
// stores it in the broker and returns it.
//
// Role and Model come from the template unless req overrides them. When a
// bootstrap manager is configured, up to template.BootstrapPeerCount peers are
// requested from it and peers without addresses are skipped. The assignment
// expires 24 hours after creation.
//
// An error is returned when req.Template does not name a registered template.
func (ab *AssignmentBroker) CreateAssignment(ctx context.Context, req AssignmentRequest) (*Assignment, error) {
	ab.mu.Lock()
	defer ab.mu.Unlock()

	// Get template
	template, exists := ab.templates[req.Template]
	if !exists {
		return nil, fmt.Errorf("template '%s' not found", req.Template)
	}

	// Generate assignment ID (must happen while ab.mu is held).
	assignmentID := ab.generateAssignmentID(req)

	// Get bootstrap peer subset
	var bootstrapPeers []string
	if ab.bootstrap != nil {
		subset := ab.bootstrap.GetSubset(template.BootstrapPeerCount)
		for _, peer := range subset.Peers {
			if len(peer.Addresses) > 0 {
				bootstrapPeers = append(bootstrapPeers, fmt.Sprintf("%s/p2p/%s", peer.Addresses[0], peer.ID))
			}
		}
	}

	// Generate stagger delay in [0, MaxStaggerMS); no stagger when the
	// template does not configure a positive maximum.
	staggerMS := 0
	if template.MaxStaggerMS > 0 {
		staggerMS = rand.Intn(template.MaxStaggerMS)
	}

	// Give the assignment its own copy of the capability list so a caller
	// mutating the returned assignment cannot alter the template (and through
	// it every other assignment). A nil list stays nil to keep the JSON output
	// unchanged.
	var capabilities []string
	if template.Capabilities != nil {
		capabilities = make([]string, len(template.Capabilities))
		copy(capabilities, template.Capabilities)
	}

	// Read the clock once so ConfigEpoch, AssignedAt and ExpiresAt agree.
	now := time.Now()

	// Create assignment. The environment map is sized for the template's
	// entries plus the six assignment-specific keys added below.
	assignment := &Assignment{
		ID:               assignmentID,
		TaskSlot:         req.TaskSlot,
		TaskID:           req.TaskID,
		ClusterID:        req.ClusterID,
		Role:             template.Role,
		Model:            template.Model,
		PromptUCXL:       template.PromptUCXL,
		Specialization:   template.Specialization,
		Capabilities:     capabilities,
		Environment:      make(map[string]string, len(template.Environment)+6),
		BootstrapPeers:   bootstrapPeers,
		JoinStaggerMS:    staggerMS,
		DialsPerSecond:   template.DialsPerSecond,
		MaxConcurrentDHT: template.MaxConcurrentDHT,
		ConfigEpoch:      now.Unix(),
		AssignedAt:       now,
		ExpiresAt:        now.Add(24 * time.Hour), // 24 hour default expiry
	}

	// Apply request overrides
	if req.Role != "" {
		assignment.Role = req.Role
	}
	if req.Model != "" {
		assignment.Model = req.Model
	}

	// Copy environment from template
	for key, value := range template.Environment {
		assignment.Environment[key] = value
	}

	// Add assignment-specific environment; these keys take precedence over
	// same-named entries coming from the template.
	assignment.Environment["ASSIGNMENT_ID"] = assignmentID
	assignment.Environment["CONFIG_EPOCH"] = strconv.FormatInt(assignment.ConfigEpoch, 10)
	assignment.Environment["DISABLE_MDNS"] = "true"
	assignment.Environment["DIALS_PER_SEC"] = strconv.Itoa(assignment.DialsPerSecond)
	assignment.Environment["MAX_CONCURRENT_DHT"] = strconv.Itoa(assignment.MaxConcurrentDHT)
	assignment.Environment["JOIN_STAGGER_MS"] = strconv.Itoa(assignment.JoinStaggerMS)

	// Store assignment
	ab.assignments[assignmentID] = assignment

	return assignment, nil
}
// generateAssignmentID returns an ID for a new assignment that is not yet
// present in ab.assignments.
//
// The base form is "assign-<slot>-<task>-<unix>", "assign-<slot>-<unix>" or
// "assign-<cluster>-<unix>", depending on which request fields are set. The
// timestamp only has one-second resolution, so several requests for the same
// slot or cluster within one second would otherwise share an ID and overwrite
// each other in the map; when the base ID is already taken a numeric suffix
// ("-1", "-2", ...) is appended until a free ID is found.
//
// The caller must hold ab.mu, because the assignments map is consulted.
func (ab *AssignmentBroker) generateAssignmentID(req AssignmentRequest) string {
	timestamp := time.Now().Unix()

	var base string
	switch {
	case req.TaskSlot != "" && req.TaskID != "":
		base = fmt.Sprintf("assign-%s-%s-%d", req.TaskSlot, req.TaskID, timestamp)
	case req.TaskSlot != "":
		base = fmt.Sprintf("assign-%s-%d", req.TaskSlot, timestamp)
	default:
		base = fmt.Sprintf("assign-%s-%d", req.ClusterID, timestamp)
	}

	// Fast path: the base ID is free. Otherwise probe suffixed variants.
	id := base
	for seq := 1; ; seq++ {
		if _, taken := ab.assignments[id]; !taken {
			return id
		}
		id = fmt.Sprintf("%s-%d", base, seq)
	}
}
// GetStats computes a summary of the stored assignments and templates under
// the broker's read lock. Expired assignments are included in the totals, the
// per-role and per-model counts, and the stagger average.
func (ab *AssignmentBroker) GetStats() *AssignmentStats {
	ab.mu.RLock()
	defer ab.mu.RUnlock()

	total := len(ab.assignments)
	result := &AssignmentStats{
		TotalAssignments:   total,
		AssignmentsByRole:  make(map[string]int),
		AssignmentsByModel: make(map[string]int),
		TemplateCount:      len(ab.templates),
	}

	staggerSum := 0
	cutoff := time.Now()
	for _, a := range ab.assignments {
		result.AssignmentsByRole[a.Role]++
		result.AssignmentsByModel[a.Model]++
		staggerSum += a.JoinStaggerMS

		// An assignment is expired once its non-zero expiry is no longer in
		// the future; a zero expiry never expires.
		expired := !a.ExpiresAt.IsZero() && !cutoff.Before(a.ExpiresAt)
		if expired {
			result.ExpiredAssignments++
		} else {
			result.ActiveAssignments++
		}
	}

	// Leave the average at zero when there is nothing to average.
	if total > 0 {
		result.AvgStaggerMS = float64(staggerSum) / float64(total)
	}

	return result
}
// CleanupExpiredAssignments deletes every assignment whose non-zero ExpiresAt
// lies strictly in the past and logs how many were removed, if any.
func (ab *AssignmentBroker) CleanupExpiredAssignments() {
	ab.mu.Lock()
	defer ab.mu.Unlock()

	cutoff := time.Now()
	sizeBefore := len(ab.assignments)

	for id, a := range ab.assignments {
		// Keep assignments that never expire or have not expired yet.
		if a.ExpiresAt.IsZero() || !cutoff.After(a.ExpiresAt) {
			continue
		}
		delete(ab.assignments, id)
	}

	removed := sizeBefore - len(ab.assignments)
	if removed == 0 {
		return
	}
	log.Info().Int("expired_count", removed).Msg("Cleaned up expired assignments")
}
// GetAssignment looks up the assignment stored under id. The boolean reports
// whether it was found; expiry is not checked.
func (ab *AssignmentBroker) GetAssignment(id string) (*Assignment, bool) {
	ab.mu.RLock()
	found, ok := ab.assignments[id]
	ab.mu.RUnlock()

	return found, ok
}