package bootstrap import ( "context" "encoding/json" "fmt" "io/ioutil" "math/rand" "net/http" "os" "strings" "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" ) // BootstrapPool manages a pool of bootstrap peers for DHT joining type BootstrapPool struct { peers []peer.AddrInfo dialsPerSecond int maxConcurrent int staggerDelay time.Duration httpClient *http.Client } // BootstrapConfig represents the JSON configuration for bootstrap peers type BootstrapConfig struct { Peers []BootstrapPeer `json:"peers"` Meta BootstrapMeta `json:"meta,omitempty"` } // BootstrapPeer represents a single bootstrap peer type BootstrapPeer struct { ID string `json:"id"` // Peer ID Addresses []string `json:"addresses"` // Multiaddresses Priority int `json:"priority"` // Priority (higher = more likely to be selected) Healthy bool `json:"healthy"` // Health status LastSeen string `json:"last_seen"` // Last seen timestamp } // BootstrapMeta contains metadata about the bootstrap configuration type BootstrapMeta struct { UpdatedAt string `json:"updated_at"` Version int `json:"version"` ClusterID string `json:"cluster_id"` TotalPeers int `json:"total_peers"` HealthyPeers int `json:"healthy_peers"` } // BootstrapSubset represents a subset of peers assigned to a replica type BootstrapSubset struct { Peers []peer.AddrInfo `json:"peers"` StaggerDelayMS int `json:"stagger_delay_ms"` AssignedAt time.Time `json:"assigned_at"` } // NewBootstrapPool creates a new bootstrap pool manager func NewBootstrapPool(dialsPerSecond, maxConcurrent int, staggerMS int) *BootstrapPool { return &BootstrapPool{ peers: []peer.AddrInfo{}, dialsPerSecond: dialsPerSecond, maxConcurrent: maxConcurrent, staggerDelay: time.Duration(staggerMS) * time.Millisecond, httpClient: &http.Client{Timeout: 10 * time.Second}, } } // LoadFromFile loads bootstrap configuration from a JSON file func (bp *BootstrapPool) LoadFromFile(filePath string) error { if filePath == "" { return nil // No file configured } data, err := ioutil.ReadFile(filePath) if err != nil { return fmt.Errorf("failed to read bootstrap file %s: %w", filePath, err) } return bp.loadFromJSON(data) } // LoadFromURL loads bootstrap configuration from a URL (WHOOSH endpoint) func (bp *BootstrapPool) LoadFromURL(ctx context.Context, url string) error { if url == "" { return nil // No URL configured } req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return fmt.Errorf("failed to create bootstrap request: %w", err) } resp, err := bp.httpClient.Do(req) if err != nil { return fmt.Errorf("bootstrap request failed: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("bootstrap request failed with status %d", resp.StatusCode) } data, err := ioutil.ReadAll(resp.Body) if err != nil { return fmt.Errorf("failed to read bootstrap response: %w", err) } return bp.loadFromJSON(data) } // loadFromJSON parses JSON bootstrap configuration func (bp *BootstrapPool) loadFromJSON(data []byte) error { var config BootstrapConfig if err := json.Unmarshal(data, &config); err != nil { return fmt.Errorf("failed to parse bootstrap JSON: %w", err) } // Convert bootstrap peers to AddrInfo var peers []peer.AddrInfo for _, bsPeer := range config.Peers { // Only include healthy peers if !bsPeer.Healthy { continue } // Parse peer ID peerID, err := peer.Decode(bsPeer.ID) if err != nil { fmt.Printf("⚠️ Invalid peer ID %s: %v\n", bsPeer.ID, err) continue } // Parse multiaddresses var addrs []multiaddr.Multiaddr for _, addrStr := range bsPeer.Addresses { addr, err := multiaddr.NewMultiaddr(addrStr) if err != nil { fmt.Printf("⚠️ Invalid multiaddress %s: %v\n", addrStr, err) continue } addrs = append(addrs, addr) } if len(addrs) > 0 { peers = append(peers, peer.AddrInfo{ ID: peerID, Addrs: addrs, }) } } bp.peers = peers fmt.Printf("📋 Loaded %d healthy bootstrap peers from configuration\n", len(peers)) return nil } // LoadFromEnvironment loads bootstrap configuration from environment variables func (bp *BootstrapPool) LoadFromEnvironment() error { // Try loading from file first if bootstrapFile := os.Getenv("BOOTSTRAP_JSON"); bootstrapFile != "" { if err := bp.LoadFromFile(bootstrapFile); err != nil { fmt.Printf("⚠️ Failed to load bootstrap from file: %v\n", err) } else { return nil // Successfully loaded from file } } // Try loading from URL if bootstrapURL := os.Getenv("BOOTSTRAP_URL"); bootstrapURL != "" { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := bp.LoadFromURL(ctx, bootstrapURL); err != nil { fmt.Printf("⚠️ Failed to load bootstrap from URL: %v\n", err) } else { return nil // Successfully loaded from URL } } // Fallback to legacy environment variable if bootstrapPeersEnv := os.Getenv("CHORUS_BOOTSTRAP_PEERS"); bootstrapPeersEnv != "" { return bp.loadFromLegacyEnv(bootstrapPeersEnv) } return nil // No bootstrap configuration found } // loadFromLegacyEnv loads from comma-separated multiaddress list func (bp *BootstrapPool) loadFromLegacyEnv(peersEnv string) error { peerStrs := strings.Split(peersEnv, ",") var peers []peer.AddrInfo for _, peerStr := range peerStrs { peerStr = strings.TrimSpace(peerStr) if peerStr == "" { continue } // Parse multiaddress addr, err := multiaddr.NewMultiaddr(peerStr) if err != nil { fmt.Printf("⚠️ Invalid bootstrap peer %s: %v\n", peerStr, err) continue } // Extract peer info info, err := peer.AddrInfoFromP2pAddr(addr) if err != nil { fmt.Printf("⚠️ Failed to parse peer info from %s: %v\n", peerStr, err) continue } peers = append(peers, *info) } bp.peers = peers fmt.Printf("📋 Loaded %d bootstrap peers from legacy environment\n", len(peers)) return nil } // GetSubset returns a subset of bootstrap peers for a replica func (bp *BootstrapPool) GetSubset(count int) BootstrapSubset { if len(bp.peers) == 0 { return BootstrapSubset{ Peers: []peer.AddrInfo{}, StaggerDelayMS: 0, AssignedAt: time.Now(), } } // Ensure count doesn't exceed available peers if count > len(bp.peers) { count = len(bp.peers) } // Randomly select peers from the pool selectedPeers := make([]peer.AddrInfo, 0, count) indices := rand.Perm(len(bp.peers)) for i := 0; i < count; i++ { selectedPeers = append(selectedPeers, bp.peers[indices[i]]) } // Generate random stagger delay (0 to configured max) staggerMS := 0 if bp.staggerDelay > 0 { staggerMS = rand.Intn(int(bp.staggerDelay.Milliseconds())) } return BootstrapSubset{ Peers: selectedPeers, StaggerDelayMS: staggerMS, AssignedAt: time.Now(), } } // ConnectWithRateLimit connects to bootstrap peers with rate limiting func (bp *BootstrapPool) ConnectWithRateLimit(ctx context.Context, h host.Host, subset BootstrapSubset) error { if len(subset.Peers) == 0 { return nil // No peers to connect to } // Apply stagger delay if subset.StaggerDelayMS > 0 { delay := time.Duration(subset.StaggerDelayMS) * time.Millisecond fmt.Printf("⏱️ Applying join stagger delay: %v\n", delay) select { case <-ctx.Done(): return ctx.Err() case <-time.After(delay): // Continue after delay } } // Create rate limiter for dials ticker := time.NewTicker(time.Second / time.Duration(bp.dialsPerSecond)) defer ticker.Stop() // Semaphore for concurrent dials semaphore := make(chan struct{}, bp.maxConcurrent) // Connect to each peer with rate limiting for i, peerInfo := range subset.Peers { // Wait for rate limiter select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: // Rate limit satisfied } // Acquire semaphore select { case <-ctx.Done(): return ctx.Err() case semaphore <- struct{}{}: // Semaphore acquired } // Connect to peer in goroutine go func(info peer.AddrInfo, index int) { defer func() { <-semaphore }() // Release semaphore ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if err := h.Connect(ctx, info); err != nil { fmt.Printf("⚠️ Failed to connect to bootstrap peer %s (%d/%d): %v\n", info.ID.ShortString(), index+1, len(subset.Peers), err) } else { fmt.Printf("🔗 Connected to bootstrap peer %s (%d/%d)\n", info.ID.ShortString(), index+1, len(subset.Peers)) } }(peerInfo, i) } // Wait for all connections to complete or timeout for i := 0; i < bp.maxConcurrent && i < len(subset.Peers); i++ { select { case <-ctx.Done(): return ctx.Err() case semaphore <- struct{}{}: <-semaphore // Immediately release } } return nil } // GetPeerCount returns the number of available bootstrap peers func (bp *BootstrapPool) GetPeerCount() int { return len(bp.peers) } // GetPeers returns all bootstrap peers (for debugging) func (bp *BootstrapPool) GetPeers() []peer.AddrInfo { return bp.peers } // GetStats returns bootstrap pool statistics func (bp *BootstrapPool) GetStats() map[string]interface{} { return map[string]interface{}{ "peer_count": len(bp.peers), "dials_per_second": bp.dialsPerSecond, "max_concurrent": bp.maxConcurrent, "stagger_delay_ms": bp.staggerDelay.Milliseconds(), } }