// Package runtime assembles the shared CHORUS P2P infrastructure: configuration,
// license validation, the libp2p node, pubsub, DHT storage, admin elections and
// the HTTP/UCXI services.
package runtime

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"

	"chorus/api"
	"chorus/coordinator"
	"chorus/discovery"
	"chorus/internal/backbeat"
	"chorus/internal/licensing"
	"chorus/internal/logging"
	councilnats "chorus/internal/nats"
	"chorus/p2p"
	"chorus/pkg/config"
	"chorus/pkg/dht"
	"chorus/pkg/election"
	"chorus/pkg/health"
	"chorus/pkg/mcp"
	"chorus/pkg/metrics"
	"chorus/pkg/prompt"
	"chorus/pkg/shhh"
	"chorus/pkg/shutdown"
	"chorus/pkg/ucxi"
	"chorus/pkg/ucxl"
	"chorus/pubsub"
	"chorus/reasoning"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
	"github.com/rs/zerolog"
)

// Build information - set by main package.
var (
	AppName       = "CHORUS"
	AppVersion    = "0.5.32"
	AppCommitHash = "unknown"
	AppBuildDate  = "unknown"
)

// SimpleLogger provides a printf-style logging facade backed by zerolog.
type SimpleLogger struct {
	logger zerolog.Logger
}

// NewSimpleLogger returns a SimpleLogger bound to the given component name.
func NewSimpleLogger(component string) *SimpleLogger {
	return &SimpleLogger{
		logger: logging.ForComponent(component),
	}
}

// Info logs a formatted message at info level.
func (l *SimpleLogger) Info(msg string, args ...interface{}) {
	l.logger.Info().Msgf(msg, args...)
}

// Warn logs a formatted message at warn level.
func (l *SimpleLogger) Warn(msg string, args ...interface{}) {
	l.logger.Warn().Msgf(msg, args...)
}

// Error logs a formatted message at error level.
func (l *SimpleLogger) Error(msg string, args ...interface{}) {
	l.logger.Error().Msgf(msg, args...)
}

// SimpleTaskTracker tracks active tasks for availability reporting.
//
// The tracker performs no locking around its map; callers that use it from
// multiple goroutines must serialize access themselves.
type SimpleTaskTracker struct {
	maxTasks          int
	activeTasks       map[string]bool
	decisionPublisher *ucxl.DecisionPublisher
	logger            zerolog.Logger
}

// GetActiveTasks returns the IDs of all active tasks in unspecified order.
func (t *SimpleTaskTracker) GetActiveTasks() []string {
	tasks := make([]string, 0, len(t.activeTasks))
	for taskID := range t.activeTasks {
		tasks = append(tasks, taskID)
	}
	return tasks
}

// GetMaxTasks returns the maximum number of concurrent tasks.
func (t *SimpleTaskTracker) GetMaxTasks() int {
	return t.maxTasks
}

// AddTask marks a task as active.
func (t *SimpleTaskTracker) AddTask(taskID string) {
	t.activeTasks[taskID] = true
}

// RemoveTask marks a task as completed and, when a decision publisher is
// attached, publishes a successful completion decision for it.
func (t *SimpleTaskTracker) RemoveTask(taskID string) {
	delete(t.activeTasks, taskID)

	if t.decisionPublisher != nil {
		t.publishTaskCompletion(taskID, true, "Task completed successfully", nil)
	}
}

// publishTaskCompletion publishes a task completion decision through the
// decision publisher. Publishing is best effort: failures are logged, not
// returned.
func (t *SimpleTaskTracker) publishTaskCompletion(taskID string, success bool, summary string, filesModified []string) {
	if t.decisionPublisher == nil {
		return
	}

	if err := t.decisionPublisher.PublishTaskCompletion(taskID, success, summary, filesModified); err != nil {
		t.logger.Warn().
			Err(err).
			Str("task_id", taskID).
			Msg("Failed to publish task completion")
		return
	}

	t.logger.Debug().
		Str("task_id", taskID).
		Msg("Published task completion decision")
}

// SharedRuntime contains all the shared P2P infrastructure components.
type SharedRuntime struct {
	Config              *config.Config
	RuntimeConfig       *config.RuntimeConfig
	Logger              *SimpleLogger
	Context             context.Context
	Cancel              context.CancelFunc
	Node                *p2p.Node
	PubSub              *pubsub.PubSub
	HypercoreLog        *logging.HypercoreLog
	MDNSDiscovery       *discovery.MDNSDiscovery
	BackbeatIntegration *backbeat.Integration
	DHTNode             *dht.LibP2PDHT
	EncryptedStorage    *dht.EncryptedDHTStorage
	DecisionPublisher   *ucxl.DecisionPublisher
	ElectionManager     *election.ElectionManager
	TaskCoordinator     *coordinator.TaskCoordinator
	HTTPServer          *api.HTTPServer
	UCXIServer          *ucxi.Server
	HealthManager       *health.Manager
	EnhancedHealth      *health.EnhancedHealthChecks
	ShutdownManager     *shutdown.Manager
	TaskTracker         *SimpleTaskTracker
	Metrics             *metrics.CHORUSMetrics
	Shhh                *shhh.Sentinel
	CouncilSubscriber   *councilnats.CouncilSubscriber
}

// Initialize sets up all shared P2P infrastructure components.
//
// appMode is only used for logging. On success the returned runtime owns a
// cancellable context and every started component; the caller is expected to
// call Cleanup when done. On failure nil is returned together with the error,
// and everything that had already been started is released here because the
// caller has no handle to do so.
func Initialize(appMode string) (*SharedRuntime, error) {
	runtime := &SharedRuntime{}
	runtime.Logger = NewSimpleLogger(logging.ComponentRuntime)

	ctx, cancel := context.WithCancel(context.Background())
	runtime.Context = ctx
	runtime.Cancel = cancel

	if err := runtime.initialize(appMode); err != nil {
		// Cleanup nil-checks every component, so it is safe on a partially
		// initialized runtime; it also cancels the context created above.
		runtime.Cleanup()
		return nil, err
	}
	return runtime, nil
}

// initialize performs the ordered startup sequence for Initialize. It expects
// r.Logger, r.Context and r.Cancel to be set already.
func (r *SharedRuntime) initialize(appMode string) error {
	ctx := r.Context

	r.Logger.Info("Starting CHORUS v%s (build: %s, %s) - Container-First P2P Task Coordination", AppVersion, AppCommitHash, AppBuildDate)
	r.Logger.Info("πŸ“¦ Container deployment - Mode: %s", appMode)

	// Load configuration from environment (no config files in containers)
	r.Logger.Info("Loading configuration from environment variables...")
	cfg, err := config.LoadFromEnvironment()
	if err != nil {
		return fmt.Errorf("configuration error: %w", err)
	}
	r.Config = cfg
	r.Logger.Info("Configuration loaded successfully")

	// Initialize runtime configuration with assignment support
	r.RuntimeConfig = config.NewRuntimeConfig(cfg)

	// Load assignment if ASSIGN_URL is configured
	if assignURL := os.Getenv("ASSIGN_URL"); assignURL != "" {
		r.Logger.Info("Loading assignment from WHOOSH: %s", assignURL)

		loadCtx, loadCancel := context.WithTimeout(ctx, 10*time.Second)
		loadErr := r.RuntimeConfig.LoadAssignment(loadCtx, assignURL)
		loadCancel()
		if loadErr != nil {
			r.Logger.Warn("Failed to load assignment (continuing with base config): %v", loadErr)
		} else {
			r.Logger.Info("Assignment loaded successfully")
		}

		// Start reload handler for SIGHUP
		r.RuntimeConfig.StartReloadHandler(ctx, assignURL)
		r.Logger.Info("SIGHUP reload handler started for assignment updates")
	} else {
		r.Logger.Info("βšͺ No ASSIGN_URL configured, using static configuration")
	}

	r.Logger.Info("Agent ID: %s", cfg.Agent.ID)
	r.Logger.Info("🎯 Specialization: %s", cfg.Agent.Specialization)

	// CRITICAL: Validate license before any P2P operations
	r.Logger.Info("πŸ” Validating CHORUS license with KACHING...")
	licenseValidator := licensing.NewValidator(licensing.LicenseConfig{
		LicenseID:  cfg.License.LicenseID,
		ClusterID:  cfg.License.ClusterID,
		KachingURL: cfg.License.KachingURL,
		Version:    AppVersion,
	})
	if err := licenseValidator.Validate(); err != nil {
		return fmt.Errorf("license validation failed: %w", err)
	}
	r.Logger.Info("License validation successful - CHORUS authorized to run")

	// Initialize AI provider configuration
	r.Logger.Info("🧠 Configuring AI provider: %s", cfg.AI.Provider)
	if err := initializeAIProvider(cfg, r.Logger); err != nil {
		return fmt.Errorf("AI provider initialization failed: %w", err)
	}
	r.Logger.Info("AI provider configured successfully")

	// Initialize metrics collector
	r.Metrics = metrics.NewCHORUSMetrics(nil)

	// Initialize SHHH sentinel
	sentinel, err := shhh.NewSentinel(
		shhh.Config{},
		shhh.WithFindingObserver(r.handleShhhFindings),
	)
	if err != nil {
		return fmt.Errorf("failed to initialize SHHH sentinel: %w", err)
	}
	sentinel.SetAuditSink(&shhhAuditSink{logger: r.Logger})
	r.Shhh = sentinel
	r.Logger.Info("πŸ›‘οΈ SHHH sentinel initialized")

	// Initialize BACKBEAT integration. It is optional: any failure leaves
	// r.BackbeatIntegration nil and startup continues without it.
	backbeatIntegration, err := backbeat.NewIntegration(cfg, cfg.Agent.ID, r.Logger)
	if err != nil {
		r.Logger.Warn("BACKBEAT integration initialization failed: %v", err)
		r.Logger.Info("πŸ“ P2P operations will run without beat synchronization")
		backbeatIntegration = nil
	} else if err := backbeatIntegration.Start(ctx); err != nil {
		r.Logger.Warn("Failed to start BACKBEAT integration: %v", err)
		backbeatIntegration = nil
	} else {
		r.Logger.Info("🎡 BACKBEAT integration started successfully")
	}
	r.BackbeatIntegration = backbeatIntegration

	// Fetch bootstrap peers from WHOOSH for P2P mesh formation
	r.Logger.Info("Fetching bootstrap peers from WHOOSH...")
	bootstrapPeers, err := fetchBootstrapPeers(cfg.WHOOSHAPI.BaseURL, r.Logger)
	if err != nil {
		r.Logger.Warn("Failed to fetch bootstrap peers from WHOOSH: %v", err)
		r.Logger.Info("Falling back to static bootstrap configuration")
		bootstrapPeers = getStaticBootstrapPeers(r.Logger)
	} else {
		r.Logger.Info("Fetched %d bootstrap peers from WHOOSH", len(bootstrapPeers))
	}

	// Set bootstrap peers in config for P2P node initialization. Only
	// overwrite the configured list when at least one usable address exists.
	if addrs := bootstrapPeerAddrs(bootstrapPeers); len(addrs) > 0 {
		cfg.V2.DHT.BootstrapPeers = addrs
	}

	// Initialize P2P node
	node, err := p2p.NewNode(ctx)
	if err != nil {
		return fmt.Errorf("failed to create P2P node: %w", err)
	}
	r.Node = node

	r.Logger.Info("🐝 CHORUS node started successfully")
	r.Logger.Info("πŸ“ Node ID: %s", node.ID().ShortString())
	r.Logger.Info("πŸ”— Listening addresses:")
	for _, addr := range node.Addresses() {
		r.Logger.Info(" %s/p2p/%s", addr, node.ID())
	}

	// Wait for bootstrap peers to connect before proceeding.
	// This prevents election race conditions where elections start before peer discovery.
	if len(bootstrapPeers) > 0 {
		r.waitForBootstrapPeers(node, len(bootstrapPeers))
	}

	// Initialize Hypercore-style logger for P2P coordination
	hlog := logging.NewHypercoreLog(node.ID())
	if r.Shhh != nil {
		hlog.SetRedactor(r.Shhh)
	}
	hlog.Append(logging.PeerJoined, map[string]interface{}{"status": "started"})
	r.HypercoreLog = hlog
	r.Logger.Info("πŸ“ Hypercore logger initialized")

	// Initialize mDNS discovery
	mdnsDiscovery, err := discovery.NewMDNSDiscovery(ctx, node.Host(), "chorus-peer-discovery")
	if err != nil {
		return fmt.Errorf("failed to create mDNS discovery: %w", err)
	}
	r.MDNSDiscovery = mdnsDiscovery

	// Initialize PubSub with hypercore logging
	ps, err := pubsub.NewPubSubWithLogger(ctx, node.Host(), "chorus/coordination/v1", "hmmm/meta-discussion/v1", hlog)
	if err != nil {
		return fmt.Errorf("failed to create PubSub: %w", err)
	}
	if r.Shhh != nil {
		ps.SetRedactor(r.Shhh)
	}
	r.PubSub = ps
	r.Logger.Info("PubSub system initialized")

	// Join role-based topics if role is configured
	if cfg.Agent.Role != "" {
		reportsTo := []string{}
		if cfg.Agent.ReportsTo != "" {
			reportsTo = []string{cfg.Agent.ReportsTo}
		}
		if err := ps.JoinRoleBasedTopics(cfg.Agent.Role, cfg.Agent.Expertise, reportsTo); err != nil {
			r.Logger.Warn("Failed to join role-based topics: %v", err)
		} else {
			r.Logger.Info("🎯 Joined role-based collaboration topics")
		}
	}

	// Initialize remaining components
	if err := r.initializeElectionSystem(); err != nil {
		return fmt.Errorf("failed to initialize election system: %w", err)
	}
	if err := r.initializeDHTStorage(); err != nil {
		return fmt.Errorf("failed to initialize DHT storage: %w", err)
	}
	if err := r.initializeServices(); err != nil {
		return fmt.Errorf("failed to initialize services: %w", err)
	}

	return nil
}

// bootstrapPeerAddrs converts peers into full "<addr>/p2p/<peer-id>" strings,
// using the first address of each peer. Peers without any address are skipped
// so the result never contains empty entries.
func bootstrapPeerAddrs(peers []peer.AddrInfo) []string {
	addrs := make([]string, 0, len(peers))
	for _, p := range peers {
		if len(p.Addrs) == 0 {
			continue
		}
		addrs = append(addrs, fmt.Sprintf("%s/p2p/%s", p.Addrs[0].String(), p.ID.String()))
	}
	return addrs
}

// waitForBootstrapPeers blocks for up to 15 seconds, polling the node's peer
// count every 3 seconds, and returns early once at least half of the target
// bootstrap peers (and at least one peer) are connected.
func (r *SharedRuntime) waitForBootstrapPeers(node *p2p.Node, targetPeers int) {
	const (
		pollInterval = 3 * time.Second
		pollAttempts = 5
	)

	r.Logger.Info("Waiting 15 seconds for bootstrap peer connections to establish...")
	r.Logger.Info(" Target peers: %d bootstrap peers", targetPeers)

	// Poll connectivity periodically to provide feedback.
	for i := 0; i < pollAttempts; i++ {
		time.Sleep(pollInterval)
		connectedPeers := len(node.Peers())
		r.Logger.Info(" [%ds] Connected to %d peers", (i+1)*3, connectedPeers)

		// If we've connected to at least half the bootstrap peers, we're in good shape.
		if connectedPeers >= targetPeers/2 && connectedPeers > 0 {
			r.Logger.Info("Bootstrap connectivity achieved (%d/%d peers), proceeding early", connectedPeers, targetPeers)
			break
		}
	}

	finalConnected := len(node.Peers())
	if finalConnected == 0 {
		r.Logger.Warn("Bootstrap complete but NO peers connected - mesh may be isolated")
		return
	}
	r.Logger.Info("Bootstrap grace period complete - %d peers connected", finalConnected)
}

// Cleanup shuts down every runtime component that was started and finally
// cancels the runtime context. Components that were never created are skipped.
func (r *SharedRuntime) Cleanup() {
	r.Logger.Info("Starting graceful shutdown...")

	if r.BackbeatIntegration != nil {
		r.BackbeatIntegration.Stop()
	}

	if r.MDNSDiscovery != nil {
		r.MDNSDiscovery.Close()
		r.Logger.Info("mDNS discovery closed")
	}

	if r.PubSub != nil {
		r.PubSub.Close()
	}

	if r.DHTNode != nil {
		r.DHTNode.Close()
	}

	if r.Node != nil {
		r.Node.Close()
	}

	if r.HTTPServer != nil {
		r.HTTPServer.Stop()
	}

	if r.CouncilSubscriber != nil {
		if err := r.CouncilSubscriber.Close(); err != nil {
			r.Logger.Warn("Failed to close council NATS subscriber: %v", err)
		}
	}

	if r.UCXIServer != nil {
		r.UCXIServer.Stop()
	}

	if r.ElectionManager != nil {
		r.ElectionManager.Stop()
	}

	if r.Cancel != nil {
		r.Cancel()
	}

	r.Logger.Info("CHORUS shutdown completed")
}

// Helper methods for initialization (extracted from main.go)

// initializeElectionSystem creates and starts the admin election manager,
// wiring BACKBEAT tempo resolvers and tracking callbacks when BACKBEAT is
// available. It returns an error only if the election manager fails to start.
func (r *SharedRuntime) initializeElectionSystem() error {
	// === Admin Election System ===
	electionManager := election.NewElectionManager(r.Context, r.Config, r.Node.Host(), r.PubSub, r.Node.ID().ShortString())

	if r.BackbeatIntegration != nil {
		electionManager.SetTempoResolver(func() int {
			return r.BackbeatIntegration.CurrentTempoBPM()
		})
		electionManager.SetBeatGapResolver(func() time.Duration {
			return r.BackbeatIntegration.TimeSinceLastBeat()
		})
	}

	// Set election callbacks with BACKBEAT integration
	electionManager.SetCallbacks(
		func(oldAdmin, newAdmin string) {
			r.Logger.Info("πŸ‘‘ Admin changed: %s -> %s", oldAdmin, newAdmin)

			// Track admin change with BACKBEAT if available
			if r.BackbeatIntegration != nil {
				operationID := fmt.Sprintf("admin-change-%d", time.Now().Unix())
				if err := r.BackbeatIntegration.StartP2POperation(operationID, "admin_change", 2, map[string]interface{}{
					"old_admin": oldAdmin,
					"new_admin": newAdmin,
				}); err == nil {
					// Complete immediately as this is a state change, not a long operation
					r.BackbeatIntegration.CompleteP2POperation(operationID, 1)
				}
			}

			// If this node becomes admin, enable SLURP functionality
			if newAdmin == r.Node.ID().ShortString() {
				r.Logger.Info("🎯 This node is now admin - enabling SLURP functionality")
				r.Config.Slurp.Enabled = true
				// Apply admin role configuration
				if err := r.Config.ApplyRoleDefinition("admin"); err != nil {
					r.Logger.Warn("Failed to apply admin role: %v", err)
				}
			}
		},
		func(winner string) {
			r.Logger.Info("πŸ† Election completed, winner: %s", winner)

			// Track election completion with BACKBEAT if available
			if r.BackbeatIntegration != nil {
				operationID := fmt.Sprintf("election-completed-%d", time.Now().Unix())
				if err := r.BackbeatIntegration.StartP2POperation(operationID, "election", 1, map[string]interface{}{
					"winner":  winner,
					"node_id": r.Node.ID().ShortString(),
				}); err == nil {
					r.BackbeatIntegration.CompleteP2POperation(operationID, 1)
				}
			}
		},
	)

	if err := electionManager.Start(); err != nil {
		return fmt.Errorf("failed to start election manager: %w", err)
	}
	r.ElectionManager = electionManager
	r.Logger.Info("Election manager started with automated heartbeat management")

	return nil
}

// initializeDHTStorage sets up the DHT, connects to bootstrap peers and creates
// the encrypted storage and decision publisher. All failures are logged and
// tolerated; the function currently always returns nil, leaving r.DHTNode,
// r.EncryptedStorage and r.DecisionPublisher nil when the DHT is disabled or
// could not be created.
func (r *SharedRuntime) initializeDHTStorage() error {
	// === DHT Storage and Decision Publishing ===
	r.DHTNode = nil
	r.EncryptedStorage = nil
	r.DecisionPublisher = nil

	if !r.Config.V2.DHT.Enabled {
		r.Logger.Info("βšͺ DHT disabled in configuration")
		return nil
	}

	dhtNode, err := dht.NewLibP2PDHT(r.Context, r.Node.Host())
	if err != nil {
		r.Logger.Warn("Failed to create DHT: %v", err)
		return nil
	}
	r.Logger.Info("πŸ•ΈοΈ DHT initialized")

	r.bootstrapDHT(dhtNode)

	// Connect to bootstrap peers (with assignment override support)
	bootstrapPeers := r.RuntimeConfig.GetBootstrapPeers()
	if len(bootstrapPeers) == 0 {
		bootstrapPeers = r.Config.V2.DHT.BootstrapPeers
	}

	// Apply join stagger if configured
	if joinStagger := r.RuntimeConfig.GetJoinStagger(); joinStagger > 0 {
		r.Logger.Info("⏱️ Applying join stagger delay: %v", joinStagger)
		time.Sleep(joinStagger)
	}

	for i, addrStr := range bootstrapPeers {
		r.connectBootstrapPeer(i, addrStr)
	}

	// Initialize encrypted storage
	encryptedStorage := dht.NewEncryptedDHTStorage(
		r.Context,
		r.Node.Host(),
		dhtNode,
		r.Config,
		r.Node.ID().ShortString(),
	)

	// Start cache cleanup
	encryptedStorage.StartCacheCleanup(5 * time.Minute)
	r.Logger.Info("πŸ” Encrypted DHT storage initialized")

	// Initialize decision publisher
	decisionPublisher := ucxl.NewDecisionPublisher(
		r.Context,
		r.Config,
		encryptedStorage,
		r.Node.ID().ShortString(),
		r.Config.Agent.ID,
	)
	r.Logger.Info("Decision publisher initialized")

	r.DHTNode = dhtNode
	r.EncryptedStorage = encryptedStorage
	r.DecisionPublisher = decisionPublisher

	return nil
}

// bootstrapDHT bootstraps the DHT and, when BACKBEAT is available and the
// tracking operation could be started, reports the outcome to it. A bootstrap
// failure is logged but not returned.
func (r *SharedRuntime) bootstrapDHT(dhtNode *dht.LibP2PDHT) {
	operationID := fmt.Sprintf("dht-bootstrap-%d", time.Now().Unix())

	// tracked is true only when BACKBEAT accepted the operation, so that
	// Fail/Complete are never reported for an operation that was not started.
	tracked := false
	if r.BackbeatIntegration != nil {
		if err := r.BackbeatIntegration.StartP2POperation(operationID, "dht_bootstrap", 4, nil); err == nil {
			tracked = true
			r.BackbeatIntegration.UpdateP2POperationPhase(operationID, backbeat.PhaseConnecting, 0)
		}
	}

	err := dhtNode.Bootstrap()
	if err != nil {
		r.Logger.Warn("DHT bootstrap failed: %v", err)
	}

	if !tracked {
		return
	}
	if err != nil {
		r.BackbeatIntegration.FailP2POperation(operationID, err.Error())
		return
	}
	r.BackbeatIntegration.CompleteP2POperation(operationID, 1)
}

// connectBootstrapPeer parses addrStr and dials the peer it describes. The
// connection attempt is always made; BACKBEAT tracking is layered on top and a
// tracking failure never prevents the dial. index is the peer's position in
// the bootstrap list and keeps operation IDs distinct for peers handled within
// the same second.
func (r *SharedRuntime) connectBootstrapPeer(index int, addrStr string) {
	addr, err := multiaddr.NewMultiaddr(addrStr)
	if err != nil {
		r.Logger.Warn("Invalid bootstrap address %s: %v", addrStr, err)
		return
	}

	// Extract peer info from multiaddr
	info, err := peer.AddrInfoFromP2pAddr(addr)
	if err != nil {
		r.Logger.Warn("Failed to parse peer info from %s: %v", addrStr, err)
		return
	}

	// Track peer discovery with BACKBEAT if available
	operationID := fmt.Sprintf("peer-discovery-%d-%d", time.Now().Unix(), index)
	tracked := false
	if r.BackbeatIntegration != nil {
		if err := r.BackbeatIntegration.StartP2POperation(operationID, "peer_discovery", 2, map[string]interface{}{
			"peer_addr": addrStr,
		}); err == nil {
			tracked = true
			r.BackbeatIntegration.UpdateP2POperationPhase(operationID, backbeat.PhaseConnecting, 0)
		}
	}

	if err := r.Node.Host().Connect(r.Context, *info); err != nil {
		r.Logger.Warn("Failed to connect to bootstrap peer %s: %v", addrStr, err)
		if tracked {
			r.BackbeatIntegration.FailP2POperation(operationID, err.Error())
		}
		return
	}

	r.Logger.Info("πŸ”— Connected to DHT bootstrap peer: %s", addrStr)
	if tracked {
		r.BackbeatIntegration.CompleteP2POperation(operationID, 1)
	}
}

// initializeServices creates the task tracker and coordinator, starts the HTTP
// API server, the optional council NATS subscriber and the optional UCXI
// server. Failures of optional services are logged; the function currently
// always returns nil.
func (r *SharedRuntime) initializeServices() error {
	// Create simple task tracker ahead of coordinator so broadcasts stay accurate
	taskTracker := &SimpleTaskTracker{
		maxTasks:    r.Config.Agent.MaxTasks,
		activeTasks: make(map[string]bool),
		logger:      logging.ForComponent(logging.ComponentRuntime),
	}

	// Connect decision publisher to task tracker if available
	if r.DecisionPublisher != nil {
		taskTracker.decisionPublisher = r.DecisionPublisher
		r.Logger.Info("Task completion decisions will be published to DHT")
	}
	r.TaskTracker = taskTracker

	// === Task Coordination Integration ===
	taskCoordinator := coordinator.NewTaskCoordinator(
		r.Context,
		r.PubSub,
		r.HypercoreLog,
		r.Config,
		r.Node.ID().ShortString(),
		nil, // HMMM router placeholder
		taskTracker,
	)

	taskCoordinator.Start()
	r.TaskCoordinator = taskCoordinator
	r.Logger.Info("Task coordination system active")

	// Start HTTP API server
	httpServer := api.NewHTTPServer(r.Config, r.Node, r.HypercoreLog, r.PubSub)
	go func() {
		r.Logger.Info("HTTP API server starting on :%d", r.Config.Network.APIPort)
		if err := httpServer.Start(); err != nil && err != http.ErrServerClosed {
			r.Logger.Error("HTTP server error: %v", err)
		}
	}()
	r.HTTPServer = httpServer

	// Enable NATS-based council opportunity delivery. The URL is taken from
	// CHORUS_COUNCIL_NATS_URL, then CHORUS_BACKBEAT_NATS_URL, then a default.
	natsURL := strings.TrimSpace(os.Getenv("CHORUS_COUNCIL_NATS_URL"))
	if natsURL == "" {
		natsURL = strings.TrimSpace(os.Getenv("CHORUS_BACKBEAT_NATS_URL"))
	}
	if natsURL == "" {
		natsURL = "nats://backbeat-nats:4222"
	}

	if subscriber, err := councilnats.NewCouncilSubscriber(natsURL, httpServer.CouncilManager, httpServer.WhooshEndpoint()); err != nil {
		r.Logger.Warn("Council NATS subscriber disabled: %v", err)
	} else {
		r.CouncilSubscriber = subscriber
		r.Logger.Info("Council opportunities via NATS enabled (url=%s)", natsURL)
	}

	// === UCXI Server Integration ===
	var ucxiServer *ucxi.Server
	if r.Config.UCXL.Enabled && r.Config.UCXL.Server.Enabled {
		storageDir := r.Config.UCXL.Storage.Directory
		if storageDir == "" {
			storageDir = filepath.Join(os.TempDir(), "chorus-ucxi-storage")
		}

		storage, err := ucxi.NewBasicContentStorage(storageDir)
		if err != nil {
			r.Logger.Warn("Failed to create UCXI storage: %v", err)
		} else {
			resolver := ucxi.NewBasicAddressResolver(r.Node.ID().ShortString())
			resolver.SetDefaultTTL(r.Config.UCXL.Resolution.CacheTTL)

			ucxiConfig := ucxi.ServerConfig{
				Port:     r.Config.UCXL.Server.Port,
				BasePath: r.Config.UCXL.Server.BasePath,
				Resolver: resolver,
				Storage:  storage,
				Logger:   ucxi.NewSimpleLogger(logging.ComponentUCXI),
			}

			ucxiServer = ucxi.NewServer(ucxiConfig)
			go func() {
				r.Logger.Info("πŸ”— UCXI server starting on :%d", r.Config.UCXL.Server.Port)
				if err := ucxiServer.Start(); err != nil && err != http.ErrServerClosed {
					r.Logger.Error("UCXI server error: %v", err)
				}
			}()
		}
	} else {
		r.Logger.Info("βšͺ UCXI server disabled")
	}
	r.UCXIServer = ucxiServer

	return nil
}

// handleShhhFindings records each SHHH finding in the metrics collector. It is
// a no-op when the runtime or its metrics are nil.
func (r *SharedRuntime) handleShhhFindings(ctx context.Context, findings []shhh.Finding) {
	if r == nil || r.Metrics == nil {
		return
	}
	for _, finding := range findings {
		r.Metrics.IncrementSHHHFindings(finding.Rule, string(finding.Severity), finding.Count)
	}
}

// shhhAuditSink logs SHHH redaction events through a SimpleLogger.
type shhhAuditSink struct {
	logger *SimpleLogger
}

// RecordRedaction logs a warning describing the redaction event. It is a no-op
// when the sink or its logger is nil.
func (s *shhhAuditSink) RecordRedaction(_ context.Context, event shhh.AuditEvent) {
	if s == nil || s.logger == nil {
		return
	}
	s.logger.Warn("πŸ”’ SHHH redaction applied (rule=%s severity=%s path=%s)", event.Rule, event.Severity, event.Path)
}

// initializeAIProvider configures the reasoning engine with the AI provider
// selected in cfg, the model selection settings, the default system prompt and
// (when enabled and healthy) the LightRAG client.
//
// It returns an error only when the resetdata provider is selected, explicitly
// or as the fallback for an unknown provider, and no API key is configured.
func initializeAIProvider(cfg *config.Config, logger *SimpleLogger) error {
	// Set the AI provider
	reasoning.SetAIProvider(cfg.AI.Provider)

	// Configure the selected provider
	switch cfg.AI.Provider {
	case "resetdata":
		if cfg.AI.ResetData.APIKey == "" {
			return fmt.Errorf("RESETDATA_API_KEY environment variable is required for resetdata provider")
		}
		applyResetDataConfig(cfg)
		logger.Info("ResetData AI provider configured - Endpoint: %s, Model: %s",
			cfg.AI.ResetData.BaseURL, cfg.AI.ResetData.Model)

	case "ollama":
		reasoning.SetOllamaEndpoint(cfg.AI.Ollama.Endpoint)
		logger.Info("πŸ¦™ Ollama AI provider configured - Endpoint: %s", cfg.AI.Ollama.Endpoint)

	default:
		logger.Warn("Unknown AI provider '%s', defaulting to resetdata", cfg.AI.Provider)
		if cfg.AI.ResetData.APIKey == "" {
			return fmt.Errorf("RESETDATA_API_KEY environment variable is required for default resetdata provider")
		}
		applyResetDataConfig(cfg)
		reasoning.SetAIProvider("resetdata")
	}

	// Configure model selection
	reasoning.SetModelConfig(
		cfg.Agent.Models,
		cfg.Agent.ModelSelectionWebhook,
		cfg.Agent.DefaultReasoningModel,
	)

	// Initialize prompt sources (roles + default instructions) from Docker-mounted directory.
	// Prompt loading is best effort: a failure is reported but does not abort startup.
	promptsDir := os.Getenv("CHORUS_PROMPTS_DIR")
	defaultInstrPath := os.Getenv("CHORUS_DEFAULT_INSTRUCTIONS_PATH")
	if err := prompt.Initialize(promptsDir, defaultInstrPath); err != nil {
		logger.Warn("Failed to initialize prompt sources (continuing without them): %v", err)
	}

	// Compose S + D for the agent role if available; otherwise use D only
	systemPrompt := ""
	if cfg.Agent.Role != "" {
		if composed, err := prompt.ComposeSystemPrompt(cfg.Agent.Role); err == nil {
			systemPrompt = composed
		}
	}
	if strings.TrimSpace(systemPrompt) == "" {
		systemPrompt = prompt.GetDefaultInstructions()
	}
	if strings.TrimSpace(systemPrompt) != "" {
		reasoning.SetDefaultSystemPrompt(systemPrompt)
	}

	// Initialize LightRAG client if enabled
	if cfg.LightRAG.Enabled {
		lightragClient := mcp.NewLightRAGClient(mcp.LightRAGConfig{
			BaseURL: cfg.LightRAG.BaseURL,
			Timeout: cfg.LightRAG.Timeout,
			APIKey:  cfg.LightRAG.APIKey,
		})

		// Test connectivity
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if lightragClient.IsHealthy(ctx) {
			reasoning.SetLightRAGClient(lightragClient)
			logger.Info("πŸ“š LightRAG RAG system enabled - Endpoint: %s, Mode: %s",
				cfg.LightRAG.BaseURL, cfg.LightRAG.DefaultMode)
		} else {
			logger.Warn("LightRAG enabled but server not healthy at %s", cfg.LightRAG.BaseURL)
		}
	}

	return nil
}

// applyResetDataConfig hands the ResetData settings from cfg to the reasoning
// package.
func applyResetDataConfig(cfg *config.Config) {
	reasoning.SetResetDataConfig(reasoning.ResetDataConfig{
		BaseURL: cfg.AI.ResetData.BaseURL,
		APIKey:  cfg.AI.ResetData.APIKey,
		Model:   cfg.AI.ResetData.Model,
		Timeout: cfg.AI.ResetData.Timeout,
	})
}

// fetchBootstrapPeers fetches the bootstrap peer list from WHOOSH at
// <whooshURL>/api/v1/bootstrap-peers using a 10 second client timeout.
//
// Entries with an invalid multiaddr or peer ID are logged and skipped. An
// error is returned when the request fails, the endpoint does not answer with
// 200 OK, or the response body cannot be decoded.
func fetchBootstrapPeers(whooshURL string, logger *SimpleLogger) ([]peer.AddrInfo, error) {
	client := &http.Client{Timeout: 10 * time.Second}

	// Trim trailing slashes so a base URL such as "http://host/" does not
	// produce a "//api/..." request path.
	endpoint := fmt.Sprintf("%s/api/v1/bootstrap-peers", strings.TrimRight(whooshURL, "/"))
	logger.Info("Fetching bootstrap peers from: %s", endpoint)

	resp, err := client.Get(endpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch bootstrap peers: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("bootstrap endpoint returned status %d", resp.StatusCode)
	}

	var result struct {
		BootstrapPeers []struct {
			Multiaddr string `json:"multiaddr"`
			PeerID    string `json:"peer_id"`
			Name      string `json:"name"`
			Priority  int    `json:"priority"`
		} `json:"bootstrap_peers"`
	}

	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("failed to decode bootstrap peers: %w", err)
	}

	// Convert to peer.AddrInfo format
	peers := make([]peer.AddrInfo, 0, len(result.BootstrapPeers))
	for _, bp := range result.BootstrapPeers {
		maddr, err := multiaddr.NewMultiaddr(bp.Multiaddr)
		if err != nil {
			logger.Warn("Invalid multiaddr %s: %v", bp.Multiaddr, err)
			continue
		}

		peerID, err := peer.Decode(bp.PeerID)
		if err != nil {
			logger.Warn("Invalid peer ID %s: %v", bp.PeerID, err)
			continue
		}

		peers = append(peers, peer.AddrInfo{
			ID:    peerID,
			Addrs: []multiaddr.Multiaddr{maddr},
		})

		logger.Info(" Bootstrap peer: %s (%s, priority %d)", bp.Name, bp.PeerID, bp.Priority)
	}

	return peers, nil
}

// getStaticBootstrapPeers returns a static fallback list of bootstrap peers.
// Entries that fail to parse are logged and skipped.
func getStaticBootstrapPeers(logger *SimpleLogger) []peer.AddrInfo {
	logger.Warn("Using static bootstrap peer configuration (fallback)")

	// Static HMMM monitor peer (if WHOOSH is unavailable)
	staticPeers := []string{
		"/ip4/172.27.0.6/tcp/9001/p2p/12D3KooWBhVfNETuGyjsrGwmhny7vnJzP1y7H59oqmq1VAPTzQMW",
	}

	peers := make([]peer.AddrInfo, 0, len(staticPeers))
	for _, peerStr := range staticPeers {
		maddr, err := multiaddr.NewMultiaddr(peerStr)
		if err != nil {
			logger.Warn("Invalid static multiaddr %s: %v", peerStr, err)
			continue
		}

		addrInfo, err := peer.AddrInfoFromP2pAddr(maddr)
		if err != nil {
			logger.Warn("Failed to parse static peer address %s: %v", peerStr, err)
			continue
		}

		peers = append(peers, *addrInfo)
		logger.Info(" πŸ“Œ Static bootstrap peer: %s", addrInfo.ID.ShortString())
	}

	return peers
}