- Health gates system for pre-scaling validation (KACHING, BACKBEAT, bootstrap peers)
- Assignment broker API for per-replica configuration management
- Bootstrap pool management with weighted peer selection and health monitoring
- Wave-based scaling algorithm with exponential backoff and failure recovery
- Enhanced SwarmManager with Docker service scaling capabilities
- Comprehensive scaling metrics collection and reporting system
- RESTful HTTP API for external scaling operations and monitoring
- Integration with CHORUS P2P networking and assignment systems

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
444 lines
12 KiB
Go
444 lines
12 KiB
Go
package orchestrator
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
|
|
"github.com/chorus-services/whoosh/internal/tracing"
|
|
)
|
|
|
|
// BootstrapPoolManager manages the pool of bootstrap peers for CHORUS instances.
// It periodically re-checks peer health (see Start) and hands out weighted
// random subsets of healthy peers to replicas (see GetSubset).
type BootstrapPoolManager struct {
	mu                 sync.RWMutex              // guards peers and chorusNodes
	peers              []BootstrapPeer           // current bootstrap peer pool (unordered)
	chorusNodes        map[string]CHORUSNodeInfo // known CHORUS nodes — NOTE(review): initialized but never read or written in this file; confirm it is used elsewhere
	updateInterval     time.Duration             // how often Start runs updatePeerHealth
	healthCheckTimeout time.Duration             // per-check timeout budget — NOTE(review): not consulted by pingPeer yet
	httpClient         *http.Client              // client for peer discovery HTTP calls (10s timeout)
}
|
|
|
|
// BootstrapPeer represents a bootstrap peer in the pool.
type BootstrapPeer struct {
	ID        string         `json:"id"`                  // Peer ID
	Addresses []string       `json:"addresses"`           // Multiaddresses; peers without addresses are never health-probed
	Priority  int            `json:"priority"`            // Selection weight (higher = more likely selected; values <= 0 are treated as 1)
	Healthy   bool           `json:"healthy"`             // Health status from the last check
	LastSeen  time.Time      `json:"last_seen"`           // Last seen timestamp; peers unseen for >10m are treated as stale
	NodeInfo  CHORUSNodeInfo `json:"node_info,omitempty"` // Associated CHORUS node info (populated during discovery)
}
|
|
|
|
// CHORUSNodeInfo represents information about a CHORUS node, as reported by
// the node's peer-discovery endpoint.
type CHORUSNodeInfo struct {
	AgentID        string    `json:"agent_id"`
	Role           string    `json:"role"` // used to assign bootstrap priority (see isPreferredRole)
	Specialization string    `json:"specialization"`
	Capabilities   []string  `json:"capabilities"`
	LastHeartbeat  time.Time `json:"last_heartbeat"`
	Healthy        bool      `json:"healthy"`
	IsBootstrap    bool      `json:"is_bootstrap"`
}
|
|
|
|
// BootstrapSubset represents a subset of bootstrap peers assigned to a single
// replica; produced by BootstrapPoolManager.GetSubset.
type BootstrapSubset struct {
	Peers       []BootstrapPeer `json:"peers"`                  // Selected peers (empty when no healthy peers exist)
	AssignedAt  time.Time       `json:"assigned_at"`            // When this subset was produced
	RequestedBy string          `json:"requested_by,omitempty"` // Optional identifier of the requester
}
|
|
|
|
// BootstrapPoolConfig represents configuration for the bootstrap pool.
//
// NOTE(review): of these fields only HealthCheckInterval is consumed by
// NewBootstrapPoolManager in this file; the others are defaulted but then
// discarded (staleness and preferred roles are hard-coded elsewhere) —
// confirm whether they are read in another file.
type BootstrapPoolConfig struct {
	MinPoolSize         int           `json:"min_pool_size"`         // Minimum peers to maintain (default 5)
	MaxPoolSize         int           `json:"max_pool_size"`         // Maximum peers in pool (default 30)
	HealthCheckInterval time.Duration `json:"health_check_interval"` // How often to check peer health (default 2m)
	StaleThreshold      time.Duration `json:"stale_threshold"`       // When to consider a peer stale (default 10m)
	PreferredRoles      []string      `json:"preferred_roles"`       // Preferred roles for bootstrap peers
}
|
|
|
|
// BootstrapPoolStats represents a point-in-time summary of the bootstrap
// pool, as produced by GetStats.
type BootstrapPoolStats struct {
	TotalPeers     int            `json:"total_peers"`
	HealthyPeers   int            `json:"healthy_peers"`
	UnhealthyPeers int            `json:"unhealthy_peers"`
	StalePeers     int            `json:"stale_peers"`   // peers not seen within the last 10 minutes
	PeersByRole    map[string]int `json:"peers_by_role"` // counts keyed by node role ("unknown" when role is empty)
	LastUpdated    time.Time      `json:"last_updated"`
	AvgLatency     float64        `json:"avg_latency_ms"` // NOTE(review): never populated in this file
}
|
|
|
|
// NewBootstrapPoolManager creates a new bootstrap pool manager
|
|
func NewBootstrapPoolManager(config BootstrapPoolConfig) *BootstrapPoolManager {
|
|
if config.MinPoolSize == 0 {
|
|
config.MinPoolSize = 5
|
|
}
|
|
if config.MaxPoolSize == 0 {
|
|
config.MaxPoolSize = 30
|
|
}
|
|
if config.HealthCheckInterval == 0 {
|
|
config.HealthCheckInterval = 2 * time.Minute
|
|
}
|
|
if config.StaleThreshold == 0 {
|
|
config.StaleThreshold = 10 * time.Minute
|
|
}
|
|
|
|
return &BootstrapPoolManager{
|
|
peers: make([]BootstrapPeer, 0),
|
|
chorusNodes: make(map[string]CHORUSNodeInfo),
|
|
updateInterval: config.HealthCheckInterval,
|
|
healthCheckTimeout: 10 * time.Second,
|
|
httpClient: &http.Client{Timeout: 10 * time.Second},
|
|
}
|
|
}
|
|
|
|
// Start begins the bootstrap pool management process
|
|
func (bpm *BootstrapPoolManager) Start(ctx context.Context) {
|
|
log.Info().Msg("Starting bootstrap pool manager")
|
|
|
|
// Start periodic health checks
|
|
ticker := time.NewTicker(bpm.updateInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Info().Msg("Bootstrap pool manager stopping")
|
|
return
|
|
case <-ticker.C:
|
|
if err := bpm.updatePeerHealth(ctx); err != nil {
|
|
log.Error().Err(err).Msg("Failed to update peer health")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// AddPeer adds a new peer to the bootstrap pool
|
|
func (bpm *BootstrapPoolManager) AddPeer(peer BootstrapPeer) {
|
|
bpm.mu.Lock()
|
|
defer bpm.mu.Unlock()
|
|
|
|
// Check if peer already exists
|
|
for i, existingPeer := range bpm.peers {
|
|
if existingPeer.ID == peer.ID {
|
|
// Update existing peer
|
|
bpm.peers[i] = peer
|
|
log.Debug().Str("peer_id", peer.ID).Msg("Updated existing bootstrap peer")
|
|
return
|
|
}
|
|
}
|
|
|
|
// Add new peer
|
|
peer.LastSeen = time.Now()
|
|
bpm.peers = append(bpm.peers, peer)
|
|
log.Info().Str("peer_id", peer.ID).Msg("Added new bootstrap peer")
|
|
}
|
|
|
|
// RemovePeer removes a peer from the bootstrap pool
|
|
func (bpm *BootstrapPoolManager) RemovePeer(peerID string) {
|
|
bpm.mu.Lock()
|
|
defer bpm.mu.Unlock()
|
|
|
|
for i, peer := range bpm.peers {
|
|
if peer.ID == peerID {
|
|
// Remove peer by swapping with last element
|
|
bpm.peers[i] = bpm.peers[len(bpm.peers)-1]
|
|
bpm.peers = bpm.peers[:len(bpm.peers)-1]
|
|
log.Info().Str("peer_id", peerID).Msg("Removed bootstrap peer")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetSubset returns a subset of healthy bootstrap peers
|
|
func (bpm *BootstrapPoolManager) GetSubset(count int) BootstrapSubset {
|
|
bpm.mu.RLock()
|
|
defer bpm.mu.RUnlock()
|
|
|
|
// Filter healthy peers
|
|
var healthyPeers []BootstrapPeer
|
|
for _, peer := range bpm.peers {
|
|
if peer.Healthy && time.Since(peer.LastSeen) < 10*time.Minute {
|
|
healthyPeers = append(healthyPeers, peer)
|
|
}
|
|
}
|
|
|
|
if len(healthyPeers) == 0 {
|
|
log.Warn().Msg("No healthy bootstrap peers available")
|
|
return BootstrapSubset{
|
|
Peers: []BootstrapPeer{},
|
|
AssignedAt: time.Now(),
|
|
}
|
|
}
|
|
|
|
// Ensure count doesn't exceed available peers
|
|
if count > len(healthyPeers) {
|
|
count = len(healthyPeers)
|
|
}
|
|
|
|
// Select peers with weighted random selection based on priority
|
|
selectedPeers := bpm.selectWeightedRandomPeers(healthyPeers, count)
|
|
|
|
return BootstrapSubset{
|
|
Peers: selectedPeers,
|
|
AssignedAt: time.Now(),
|
|
}
|
|
}
|
|
|
|
// selectWeightedRandomPeers selects peers using weighted random selection
|
|
func (bpm *BootstrapPoolManager) selectWeightedRandomPeers(peers []BootstrapPeer, count int) []BootstrapPeer {
|
|
if count >= len(peers) {
|
|
return peers
|
|
}
|
|
|
|
// Calculate total weight
|
|
totalWeight := 0
|
|
for _, peer := range peers {
|
|
weight := peer.Priority
|
|
if weight <= 0 {
|
|
weight = 1 // Minimum weight
|
|
}
|
|
totalWeight += weight
|
|
}
|
|
|
|
selected := make([]BootstrapPeer, 0, count)
|
|
usedIndices := make(map[int]bool)
|
|
|
|
for len(selected) < count {
|
|
// Random selection with weight
|
|
randWeight := rand.Intn(totalWeight)
|
|
currentWeight := 0
|
|
|
|
for i, peer := range peers {
|
|
if usedIndices[i] {
|
|
continue
|
|
}
|
|
|
|
weight := peer.Priority
|
|
if weight <= 0 {
|
|
weight = 1
|
|
}
|
|
currentWeight += weight
|
|
|
|
if randWeight < currentWeight {
|
|
selected = append(selected, peer)
|
|
usedIndices[i] = true
|
|
break
|
|
}
|
|
}
|
|
|
|
// Prevent infinite loop if we can't find more unique peers
|
|
if len(selected) == len(peers)-len(usedIndices) {
|
|
break
|
|
}
|
|
}
|
|
|
|
return selected
|
|
}
|
|
|
|
// DiscoverPeersFromCHORUS discovers bootstrap peers from existing CHORUS nodes
|
|
func (bpm *BootstrapPoolManager) DiscoverPeersFromCHORUS(ctx context.Context, chorusEndpoints []string) error {
|
|
ctx, span := tracing.Tracer.Start(ctx, "bootstrap_pool.discover_peers")
|
|
defer span.End()
|
|
|
|
discoveredCount := 0
|
|
|
|
for _, endpoint := range chorusEndpoints {
|
|
if err := bpm.discoverFromEndpoint(ctx, endpoint); err != nil {
|
|
log.Warn().Str("endpoint", endpoint).Err(err).Msg("Failed to discover peers from CHORUS endpoint")
|
|
continue
|
|
}
|
|
discoveredCount++
|
|
}
|
|
|
|
span.SetAttributes(
|
|
attribute.Int("discovery.endpoints_checked", len(chorusEndpoints)),
|
|
attribute.Int("discovery.successful_discoveries", discoveredCount),
|
|
)
|
|
|
|
log.Info().
|
|
Int("endpoints_checked", len(chorusEndpoints)).
|
|
Int("successful_discoveries", discoveredCount).
|
|
Msg("Completed peer discovery from CHORUS nodes")
|
|
|
|
return nil
|
|
}
|
|
|
|
// discoverFromEndpoint discovers peers from a single CHORUS endpoint
|
|
func (bpm *BootstrapPoolManager) discoverFromEndpoint(ctx context.Context, endpoint string) error {
|
|
url := fmt.Sprintf("%s/api/v1/peers", endpoint)
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create discovery request: %w", err)
|
|
}
|
|
|
|
resp, err := bpm.httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("discovery request failed: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("discovery request returned status %d", resp.StatusCode)
|
|
}
|
|
|
|
var peerInfo struct {
|
|
Peers []BootstrapPeer `json:"peers"`
|
|
NodeInfo CHORUSNodeInfo `json:"node_info"`
|
|
}
|
|
|
|
if err := json.NewDecoder(resp.Body).Decode(&peerInfo); err != nil {
|
|
return fmt.Errorf("failed to decode peer discovery response: %w", err)
|
|
}
|
|
|
|
// Add discovered peers to pool
|
|
for _, peer := range peerInfo.Peers {
|
|
peer.NodeInfo = peerInfo.NodeInfo
|
|
peer.Healthy = true
|
|
peer.LastSeen = time.Now()
|
|
|
|
// Set priority based on role
|
|
if bpm.isPreferredRole(peer.NodeInfo.Role) {
|
|
peer.Priority = 100
|
|
} else {
|
|
peer.Priority = 50
|
|
}
|
|
|
|
bpm.AddPeer(peer)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// isPreferredRole checks if a role is preferred for bootstrap peers
|
|
func (bpm *BootstrapPoolManager) isPreferredRole(role string) bool {
|
|
preferredRoles := []string{"admin", "coordinator", "stable"}
|
|
for _, preferred := range preferredRoles {
|
|
if role == preferred {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// updatePeerHealth updates the health status of all peers
|
|
func (bpm *BootstrapPoolManager) updatePeerHealth(ctx context.Context) error {
|
|
bpm.mu.Lock()
|
|
defer bpm.mu.Unlock()
|
|
|
|
ctx, span := tracing.Tracer.Start(ctx, "bootstrap_pool.update_health")
|
|
defer span.End()
|
|
|
|
healthyCount := 0
|
|
checkedCount := 0
|
|
|
|
for i := range bpm.peers {
|
|
peer := &bpm.peers[i]
|
|
|
|
// Check if peer is stale
|
|
if time.Since(peer.LastSeen) > 10*time.Minute {
|
|
peer.Healthy = false
|
|
continue
|
|
}
|
|
|
|
// Health check via ping (if addresses are available)
|
|
if len(peer.Addresses) > 0 {
|
|
if bpm.pingPeer(ctx, peer) {
|
|
peer.Healthy = true
|
|
peer.LastSeen = time.Now()
|
|
healthyCount++
|
|
} else {
|
|
peer.Healthy = false
|
|
}
|
|
checkedCount++
|
|
}
|
|
}
|
|
|
|
span.SetAttributes(
|
|
attribute.Int("health_check.checked_count", checkedCount),
|
|
attribute.Int("health_check.healthy_count", healthyCount),
|
|
attribute.Int("health_check.total_peers", len(bpm.peers)),
|
|
)
|
|
|
|
log.Debug().
|
|
Int("checked", checkedCount).
|
|
Int("healthy", healthyCount).
|
|
Int("total", len(bpm.peers)).
|
|
Msg("Updated bootstrap peer health")
|
|
|
|
return nil
|
|
}
|
|
|
|
// pingPeer performs a simple connectivity check to a peer
|
|
func (bpm *BootstrapPoolManager) pingPeer(ctx context.Context, peer *BootstrapPeer) bool {
|
|
// For now, just return true if the peer was seen recently
|
|
// In a real implementation, this would do a libp2p ping or HTTP health check
|
|
return time.Since(peer.LastSeen) < 5*time.Minute
|
|
}
|
|
|
|
// GetStats returns statistics about the bootstrap pool
|
|
func (bpm *BootstrapPoolManager) GetStats() BootstrapPoolStats {
|
|
bpm.mu.RLock()
|
|
defer bpm.mu.RUnlock()
|
|
|
|
stats := BootstrapPoolStats{
|
|
TotalPeers: len(bpm.peers),
|
|
PeersByRole: make(map[string]int),
|
|
LastUpdated: time.Now(),
|
|
}
|
|
|
|
staleCutoff := time.Now().Add(-10 * time.Minute)
|
|
|
|
for _, peer := range bpm.peers {
|
|
// Count by health status
|
|
if peer.Healthy {
|
|
stats.HealthyPeers++
|
|
} else {
|
|
stats.UnhealthyPeers++
|
|
}
|
|
|
|
// Count stale peers
|
|
if peer.LastSeen.Before(staleCutoff) {
|
|
stats.StalePeers++
|
|
}
|
|
|
|
// Count by role
|
|
role := peer.NodeInfo.Role
|
|
if role == "" {
|
|
role = "unknown"
|
|
}
|
|
stats.PeersByRole[role]++
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// GetHealthyPeerCount returns the number of healthy peers
|
|
func (bpm *BootstrapPoolManager) GetHealthyPeerCount() int {
|
|
bpm.mu.RLock()
|
|
defer bpm.mu.RUnlock()
|
|
|
|
count := 0
|
|
for _, peer := range bpm.peers {
|
|
if peer.Healthy && time.Since(peer.LastSeen) < 10*time.Minute {
|
|
count++
|
|
}
|
|
}
|
|
return count
|
|
}
|
|
|
|
// GetAllPeers returns all peers in the pool (for debugging)
|
|
func (bpm *BootstrapPoolManager) GetAllPeers() []BootstrapPeer {
|
|
bpm.mu.RLock()
|
|
defer bpm.mu.RUnlock()
|
|
|
|
peers := make([]BootstrapPeer, len(bpm.peers))
|
|
copy(peers, bpm.peers)
|
|
return peers
|
|
} |