diff --git a/Makefile b/Makefile index 5529643..1f95d1b 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ BINARY_NAME_AGENT = chorus-agent BINARY_NAME_HAP = chorus-hap BINARY_NAME_COMPAT = chorus BINARY_NAME_SEQTHINK = seqthink-wrapper -VERSION ?= 0.5.28 +VERSION ?= 0.5.40 COMMIT_HASH ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown") BUILD_DATE ?= $(shell date -u '+%Y-%m-%d_%H:%M:%S') @@ -31,15 +31,15 @@ build: build-agent build-hap build-compat build-agent: @echo "πŸ€– Building CHORUS autonomous agent..." @mkdir -p $(BUILD_DIR) - go build $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_AGENT) ./$(CMD_DIR)/agent + GOWORK=off go build -mod=mod $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_AGENT) ./$(CMD_DIR)/agent @echo "βœ… Agent binary built: $(BUILD_DIR)/$(BINARY_NAME_AGENT)" -# Build human agent portal binary +# Build human agent portal binary .PHONY: build-hap build-hap: @echo "πŸ‘€ Building CHORUS human agent portal..." @mkdir -p $(BUILD_DIR) - go build $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_HAP) ./$(CMD_DIR)/hap + GOWORK=off go build -mod=mod $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_HAP) ./$(CMD_DIR)/hap @echo "βœ… HAP binary built: $(BUILD_DIR)/$(BINARY_NAME_HAP)" # Build compatibility wrapper (deprecated) @@ -47,7 +47,7 @@ build-hap: build-compat: @echo "⚠️ Building CHORUS compatibility wrapper (deprecated)..." @mkdir -p $(BUILD_DIR) - go build $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_COMPAT) ./$(CMD_DIR)/chorus + GOWORK=off go build -mod=mod $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_COMPAT) ./$(CMD_DIR)/chorus @echo "βœ… Compatibility wrapper built: $(BUILD_DIR)/$(BINARY_NAME_COMPAT)" # Build Sequential Thinking age-encrypted wrapper diff --git a/api/http_server.go b/api/http_server.go index 3e1d494..07a60a3 100644 --- a/api/http_server.go +++ b/api/http_server.go @@ -16,6 +16,7 @@ import ( "chorus/pubsub" "github.com/gorilla/mux" + "github.com/rs/zerolog" ) // HTTPServer provides HTTP API endpoints for CHORUS @@ -23,9 +24,11 @@ type HTTPServer struct { port int hypercoreLog *logging.HypercoreLog pubsub *pubsub.PubSub + node *p2p.Node // P2P node for peer ID and network info server *http.Server CouncilManager *council.Manager // Exported for brief processing whooshEndpoint string + logger zerolog.Logger } // NewHTTPServer creates a new HTTP server for CHORUS API @@ -47,11 +50,18 @@ func NewHTTPServer(cfg *config.Config, node *p2p.Node, hlog *logging.HypercoreLo port: cfg.Network.APIPort, hypercoreLog: hlog, pubsub: ps, + node: node, CouncilManager: councilMgr, whooshEndpoint: strings.TrimRight(whooshEndpoint, "/"), + logger: logging.ForComponent(logging.ComponentServer), } } +// WhooshEndpoint returns the WHOOSH base endpoint configured for this agent. +func (h *HTTPServer) WhooshEndpoint() string { + return h.whooshEndpoint +} + func deriveAgentName(cfg *config.Config) string { if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_NAME")); v != "" { return v @@ -161,7 +171,7 @@ func (h *HTTPServer) Start() error { IdleTimeout: 60 * time.Second, } - fmt.Printf("🌐 Starting HTTP API server on port %d\n", h.port) + h.logger.Info().Int("port", h.port).Msg("Starting HTTP API server") return h.server.ListenAndServe() } @@ -304,7 +314,7 @@ func (h *HTTPServer) handleGetLogStats(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(stats) } -// handleHealth returns health status +// handleHealth returns health status with P2P network information func (h *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -314,6 +324,89 @@ func (h *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) { "log_entries": h.hypercoreLog.Length(), } + // Add P2P network information if node is available + if h.node != nil { + // Get peer ID + health["peer_id"] = h.node.ID().String() + + // Build complete multiaddrs with peer ID using actual container IPs + // This is required for Docker Swarm because the service VIP load-balances + // and would cause peer ID mismatches when connecting to different replicas + var multiaddrs []string + rawAddrs := h.node.Addresses() + + // Log what addresses we're getting from the node + h.logger.Debug().Int("address_count", len(rawAddrs)).Msg("Processing node addresses") + for i, addr := range rawAddrs { + h.logger.Debug().Int("index", i).Str("address", addr.String()).Msg("Raw address") + } + + for _, addr := range rawAddrs { + addrStr := addr.String() + + // Extract IP and port from multiaddr + var ip, port string + if strings.Contains(addrStr, "/ip4/") && strings.Contains(addrStr, "/tcp/") { + parts := strings.Split(addrStr, "/") + for i := 0; i < len(parts)-1; i++ { + if parts[i] == "ip4" { + ip = parts[i+1] + } + if parts[i] == "tcp" { + port = parts[i+1] + } + } + } + + // Skip localhost addresses + if ip == "127.0.0.1" || ip == "::1" { + continue + } + + // Build IP-based multiaddr for direct P2P connections + // This bypasses the Docker Swarm VIP and allows direct connection to this specific replica + if ip != "" && port != "" { + multiaddr := fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", ip, port, h.node.ID().String()) + h.logger.Debug().Str("multiaddr", multiaddr).Msg("Built multiaddr") + multiaddrs = append(multiaddrs, multiaddr) + } + } + health["multiaddrs"] = multiaddrs + + // Add connected peer count + connectedPeers := h.node.ConnectedPeers() + health["connected_peers"] = connectedPeers + + // P2P Connectivity Status - critical for detecting mesh issues + p2pStatus := "healthy" + if connectedPeers == 0 { + p2pStatus = "isolated" // No peers - serious issue + health["status"] = "degraded" + } else if connectedPeers < 3 { + p2pStatus = "limited" // Few peers - potential discovery issue + } + health["p2p_status"] = p2pStatus + + // Add DHT status if available + if h.node.DHT() != nil { + health["dht_enabled"] = true + // DHT routing table size indicates how many nodes we know about + health["dht_routing_table_size"] = h.node.DHT().GetDHTSize() + } else { + health["dht_enabled"] = false + } + + // Add GossipSub topics (static topics that agents join) + health["gossipsub_topics"] = []string{ + "CHORUS/coordination/v1", + "hmmm/meta-discussion/v1", + "CHORUS/context-feedback/v1", + } + + // Add bootstrap status + health["bootstrap_peers_configured"] = len(h.node.BootstrapPeers()) + } + json.NewEncoder(w).Encode(health) } @@ -350,34 +443,43 @@ func (h *HTTPServer) handleCouncilOpportunity(w http.ResponseWriter, r *http.Req "core_roles": len(opportunity.CoreRoles), "optional_roles": len(opportunity.OptionalRoles), "ucxl_address": opportunity.UCXLAddress, - "message": fmt.Sprintf("πŸ“‘ Received council opportunity for project: %s", opportunity.ProjectName), + "message": fmt.Sprintf("Received council opportunity for project: %s", opportunity.ProjectName), } if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil { - fmt.Printf("Failed to log council opportunity: %v\n", err) + h.logger.Warn().Err(err).Msg("Failed to log council opportunity") } - // Log to console for immediate visibility - fmt.Printf("\nπŸ“‘ COUNCIL OPPORTUNITY RECEIVED\n") - fmt.Printf(" Council ID: %s\n", opportunity.CouncilID) - fmt.Printf(" Project: %s\n", opportunity.ProjectName) - fmt.Printf(" Repository: %s\n", opportunity.Repository) - fmt.Printf(" Core Roles: %d\n", len(opportunity.CoreRoles)) - fmt.Printf(" Optional Roles: %d\n", len(opportunity.OptionalRoles)) - fmt.Printf(" UCXL: %s\n", opportunity.UCXLAddress) - fmt.Printf("\n Available Roles:\n") + // Log council opportunity with structured logging + h.logger.Info(). + Str("council_id", opportunity.CouncilID). + Str("project_name", opportunity.ProjectName). + Str("repository", opportunity.Repository). + Int("core_roles", len(opportunity.CoreRoles)). + Int("optional_roles", len(opportunity.OptionalRoles)). + Str("ucxl_address", opportunity.UCXLAddress). + Msg("Council opportunity received") + + // Log available roles for _, role := range opportunity.CoreRoles { - fmt.Printf(" - %s (%s) [CORE]\n", role.AgentName, role.RoleName) + h.logger.Info(). + Str("agent_name", role.AgentName). + Str("role_name", role.RoleName). + Str("role_type", "CORE"). + Msg("Available role") } for _, role := range opportunity.OptionalRoles { - fmt.Printf(" - %s (%s) [OPTIONAL]\n", role.AgentName, role.RoleName) + h.logger.Info(). + Str("agent_name", role.AgentName). + Str("role_name", role.RoleName). + Str("role_type", "OPTIONAL"). + Msg("Available role") } - fmt.Printf("\n") // Evaluate the opportunity and claim a role if suitable go func() { if err := h.CouncilManager.EvaluateOpportunity(&opportunity, h.whooshEndpoint); err != nil { - fmt.Printf("Failed to evaluate/claim council role: %v\n", err) + h.logger.Warn().Err(err).Msg("Failed to evaluate/claim council role") } }() @@ -453,18 +555,19 @@ func (h *HTTPServer) handleCouncilStatusUpdate(w http.ResponseWriter, r *http.Re } if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil { - fmt.Printf("Failed to log council status update: %v\n", err) + h.logger.Warn().Err(err).Msg("Failed to log council status update") } - fmt.Printf("\n🏁 COUNCIL STATUS UPDATE\n") - fmt.Printf(" Council ID: %s\n", payload.CouncilID) - if payload.ProjectName != "" { - fmt.Printf(" Project: %s\n", payload.ProjectName) - } - fmt.Printf(" Status: %s\n", payload.Status) - fmt.Printf(" Core Roles: %d/%d claimed\n", payload.CoreRoles.Claimed, payload.CoreRoles.Total) - fmt.Printf(" Optional Roles: %d/%d claimed\n", payload.Optional.Claimed, payload.Optional.Total) - fmt.Printf(" Message: %s\n\n", payload.Message) + h.logger.Info(). + Str("council_id", payload.CouncilID). + Str("project_name", payload.ProjectName). + Str("status", payload.Status). + Int("core_roles_claimed", payload.CoreRoles.Claimed). + Int("core_roles_total", payload.CoreRoles.Total). + Int("optional_roles_claimed", payload.Optional.Claimed). + Int("optional_roles_total", payload.Optional.Total). + Str("message", payload.Message). + Msg("Council status update") response := map[string]interface{}{ "status": "received", @@ -497,13 +600,12 @@ func (h *HTTPServer) handleCouncilBrief(w http.ResponseWriter, r *http.Request) brief.CouncilID = councilID brief.RoleName = roleName - fmt.Printf("\nπŸ“¦ Received council brief for %s (%s)\n", councilID, roleName) - if brief.BriefURL != "" { - fmt.Printf(" Brief URL: %s\n", brief.BriefURL) - } - if brief.Summary != "" { - fmt.Printf(" Summary: %s\n", brief.Summary) - } + h.logger.Info(). + Str("council_id", councilID). + Str("role_name", roleName). + Str("brief_url", brief.BriefURL). + Str("summary", brief.Summary). + Msg("Received council brief") if h.CouncilManager != nil { h.CouncilManager.HandleCouncilBrief(councilID, roleName, &brief) @@ -523,7 +625,7 @@ func (h *HTTPServer) handleCouncilBrief(w http.ResponseWriter, r *http.Request) } if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil { - fmt.Printf("Failed to log council brief: %v\n", err) + h.logger.Warn().Err(err).Msg("Failed to log council brief") } response := map[string]interface{}{ diff --git a/discovery/mdns.go b/discovery/mdns.go index 06782f0..a5082aa 100644 --- a/discovery/mdns.go +++ b/discovery/mdns.go @@ -5,9 +5,11 @@ import ( "fmt" "time" + "chorus/internal/logging" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/discovery/mdns" + "github.com/rs/zerolog" ) // MDNSDiscovery handles mDNS peer discovery for local network @@ -18,6 +20,7 @@ type MDNSDiscovery struct { ctx context.Context cancel context.CancelFunc serviceTag string + logger zerolog.Logger } // mdnsNotifee handles discovered peers @@ -25,6 +28,7 @@ type mdnsNotifee struct { h host.Host ctx context.Context peersChan chan peer.AddrInfo + logger zerolog.Logger } // NewMDNSDiscovery creates a new mDNS discovery service @@ -35,11 +39,14 @@ func NewMDNSDiscovery(ctx context.Context, h host.Host, serviceTag string) (*MDN discoveryCtx, cancel := context.WithCancel(ctx) + logger := logging.ForComponent(logging.ComponentP2P) + // Create notifee to handle discovered peers notifee := &mdnsNotifee{ h: h, ctx: discoveryCtx, peersChan: make(chan peer.AddrInfo, 10), + logger: logger, } // Create mDNS service @@ -52,6 +59,7 @@ func NewMDNSDiscovery(ctx context.Context, h host.Host, serviceTag string) (*MDN ctx: discoveryCtx, cancel: cancel, serviceTag: serviceTag, + logger: logger, } // Start the service @@ -63,7 +71,7 @@ func NewMDNSDiscovery(ctx context.Context, h host.Host, serviceTag string) (*MDN // Start background peer connection handler go discovery.handleDiscoveredPeers() - fmt.Printf("πŸ” mDNS Discovery started with service tag: %s\n", serviceTag) + logger.Info().Str("service_tag", serviceTag).Msg("mDNS Discovery started") return discovery, nil } @@ -90,13 +98,13 @@ func (d *MDNSDiscovery) handleDiscoveredPeers() { } // Attempt to connect - fmt.Printf("🀝 Discovered peer %s, attempting connection...\n", peerInfo.ID.ShortString()) - + d.logger.Info().Str("peer_id", peerInfo.ID.ShortString()).Msg("Discovered peer, attempting connection") + connectCtx, cancel := context.WithTimeout(d.ctx, 10*time.Second) if err := d.host.Connect(connectCtx, peerInfo); err != nil { - fmt.Printf("❌ Failed to connect to peer %s: %v\n", peerInfo.ID.ShortString(), err) + d.logger.Warn().Err(err).Str("peer_id", peerInfo.ID.ShortString()).Msg("Failed to connect to peer") } else { - fmt.Printf("βœ… Successfully connected to peer %s\n", peerInfo.ID.ShortString()) + d.logger.Info().Str("peer_id", peerInfo.ID.ShortString()).Msg("Successfully connected to peer") } cancel() } @@ -119,6 +127,6 @@ func (n *mdnsNotifee) HandlePeerFound(pi peer.AddrInfo) { // Peer info sent to channel default: // Channel is full, skip this peer - fmt.Printf("⚠️ Discovery channel full, skipping peer %s\n", pi.ID.ShortString()) + n.logger.Warn().Str("peer_id", pi.ID.ShortString()).Msg("Discovery channel full, skipping peer") } } \ No newline at end of file diff --git a/internal/runtime/shared.go b/internal/runtime/shared.go index 615af32..ec822b8 100644 --- a/internal/runtime/shared.go +++ b/internal/runtime/shared.go @@ -2,8 +2,8 @@ package runtime import ( "context" + "encoding/json" "fmt" - "log" "net/http" "os" "path/filepath" @@ -16,6 +16,7 @@ import ( "chorus/internal/backbeat" "chorus/internal/licensing" "chorus/internal/logging" + councilnats "chorus/internal/nats" "chorus/p2p" "chorus/pkg/config" "chorus/pkg/dht" @@ -32,29 +33,38 @@ import ( "chorus/reasoning" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "github.com/rs/zerolog" ) // Build information - set by main package var ( AppName = "CHORUS" - AppVersion = "0.1.0-dev" + AppVersion = "0.5.32" AppCommitHash = "unknown" AppBuildDate = "unknown" ) -// SimpleLogger provides basic logging implementation -type SimpleLogger struct{} +// SimpleLogger provides structured logging implementation via zerolog +type SimpleLogger struct { + logger zerolog.Logger +} + +func NewSimpleLogger(component string) *SimpleLogger { + return &SimpleLogger{ + logger: logging.ForComponent(component), + } +} func (l *SimpleLogger) Info(msg string, args ...interface{}) { - log.Printf("[INFO] "+msg, args...) + l.logger.Info().Msgf(msg, args...) } func (l *SimpleLogger) Warn(msg string, args ...interface{}) { - log.Printf("[WARN] "+msg, args...) + l.logger.Warn().Msgf(msg, args...) } func (l *SimpleLogger) Error(msg string, args ...interface{}) { - log.Printf("[ERROR] "+msg, args...) + l.logger.Error().Msgf(msg, args...) } // SimpleTaskTracker tracks active tasks for availability reporting @@ -62,6 +72,7 @@ type SimpleTaskTracker struct { maxTasks int activeTasks map[string]bool decisionPublisher *ucxl.DecisionPublisher + logger zerolog.Logger } // GetActiveTasks returns list of active task IDs @@ -100,9 +111,14 @@ func (t *SimpleTaskTracker) publishTaskCompletion(taskID string, success bool, s } if err := t.decisionPublisher.PublishTaskCompletion(taskID, success, summary, filesModified); err != nil { - fmt.Printf("⚠️ Failed to publish task completion for %s: %v\n", taskID, err) + t.logger.Warn(). + Err(err). + Str("task_id", taskID). + Msg("Failed to publish task completion") } else { - fmt.Printf("πŸ“€ Published task completion decision for: %s\n", taskID) + t.logger.Debug(). + Str("task_id", taskID). + Msg("Published task completion decision") } } @@ -131,52 +147,53 @@ type SharedRuntime struct { TaskTracker *SimpleTaskTracker Metrics *metrics.CHORUSMetrics Shhh *shhh.Sentinel + CouncilSubscriber *councilnats.CouncilSubscriber } // Initialize sets up all shared P2P infrastructure components func Initialize(appMode string) (*SharedRuntime, error) { runtime := &SharedRuntime{} - runtime.Logger = &SimpleLogger{} + runtime.Logger = NewSimpleLogger(logging.ComponentRuntime) ctx, cancel := context.WithCancel(context.Background()) runtime.Context = ctx runtime.Cancel = cancel - runtime.Logger.Info("🎭 Starting CHORUS v%s (build: %s, %s) - Container-First P2P Task Coordination", AppVersion, AppCommitHash, AppBuildDate) + runtime.Logger.Info("Starting CHORUS v%s (build: %s, %s) - Container-First P2P Task Coordination", AppVersion, AppCommitHash, AppBuildDate) runtime.Logger.Info("πŸ“¦ Container deployment - Mode: %s", appMode) // Load configuration from environment (no config files in containers) - runtime.Logger.Info("πŸ“‹ Loading configuration from environment variables...") + runtime.Logger.Info("Loading configuration from environment variables...") cfg, err := config.LoadFromEnvironment() if err != nil { return nil, fmt.Errorf("configuration error: %v", err) } runtime.Config = cfg - runtime.Logger.Info("βœ… Configuration loaded successfully") + runtime.Logger.Info("Configuration loaded successfully") // Initialize runtime configuration with assignment support runtime.RuntimeConfig = config.NewRuntimeConfig(cfg) // Load assignment if ASSIGN_URL is configured if assignURL := os.Getenv("ASSIGN_URL"); assignURL != "" { - runtime.Logger.Info("πŸ“‘ Loading assignment from WHOOSH: %s", assignURL) + runtime.Logger.Info("Loading assignment from WHOOSH: %s", assignURL) ctx, cancel := context.WithTimeout(runtime.Context, 10*time.Second) if err := runtime.RuntimeConfig.LoadAssignment(ctx, assignURL); err != nil { - runtime.Logger.Warn("⚠️ Failed to load assignment (continuing with base config): %v", err) + runtime.Logger.Warn("Failed to load assignment (continuing with base config): %v", err) } else { - runtime.Logger.Info("βœ… Assignment loaded successfully") + runtime.Logger.Info("Assignment loaded successfully") } cancel() // Start reload handler for SIGHUP runtime.RuntimeConfig.StartReloadHandler(runtime.Context, assignURL) - runtime.Logger.Info("πŸ“‘ SIGHUP reload handler started for assignment updates") + runtime.Logger.Info("SIGHUP reload handler started for assignment updates") } else { runtime.Logger.Info("βšͺ No ASSIGN_URL configured, using static configuration") } - runtime.Logger.Info("πŸ€– Agent ID: %s", cfg.Agent.ID) + runtime.Logger.Info("Agent ID: %s", cfg.Agent.ID) runtime.Logger.Info("🎯 Specialization: %s", cfg.Agent.Specialization) // CRITICAL: Validate license before any P2P operations @@ -185,18 +202,19 @@ func Initialize(appMode string) (*SharedRuntime, error) { LicenseID: cfg.License.LicenseID, ClusterID: cfg.License.ClusterID, KachingURL: cfg.License.KachingURL, + Version: AppVersion, }) if err := licenseValidator.Validate(); err != nil { return nil, fmt.Errorf("license validation failed: %v", err) } - runtime.Logger.Info("βœ… License validation successful - CHORUS authorized to run") + runtime.Logger.Info("License validation successful - CHORUS authorized to run") // Initialize AI provider configuration runtime.Logger.Info("🧠 Configuring AI provider: %s", cfg.AI.Provider) if err := initializeAIProvider(cfg, runtime.Logger); err != nil { return nil, fmt.Errorf("AI provider initialization failed: %v", err) } - runtime.Logger.Info("βœ… AI provider configured successfully") + runtime.Logger.Info("AI provider configured successfully") // Initialize metrics collector runtime.Metrics = metrics.NewCHORUSMetrics(nil) @@ -217,11 +235,11 @@ func Initialize(appMode string) (*SharedRuntime, error) { var backbeatIntegration *backbeat.Integration backbeatIntegration, err = backbeat.NewIntegration(cfg, cfg.Agent.ID, runtime.Logger) if err != nil { - runtime.Logger.Warn("⚠️ BACKBEAT integration initialization failed: %v", err) + runtime.Logger.Warn("BACKBEAT integration initialization failed: %v", err) runtime.Logger.Info("πŸ“ P2P operations will run without beat synchronization") } else { if err := backbeatIntegration.Start(ctx); err != nil { - runtime.Logger.Warn("⚠️ Failed to start BACKBEAT integration: %v", err) + runtime.Logger.Warn("Failed to start BACKBEAT integration: %v", err) backbeatIntegration = nil } else { runtime.Logger.Info("🎡 BACKBEAT integration started successfully") @@ -229,6 +247,29 @@ func Initialize(appMode string) (*SharedRuntime, error) { } runtime.BackbeatIntegration = backbeatIntegration + // Fetch bootstrap peers from WHOOSH for P2P mesh formation + runtime.Logger.Info("Fetching bootstrap peers from WHOOSH...") + bootstrapPeers, err := fetchBootstrapPeers(cfg.WHOOSHAPI.BaseURL, runtime.Logger) + if err != nil { + runtime.Logger.Warn("Failed to fetch bootstrap peers from WHOOSH: %v", err) + runtime.Logger.Info("Falling back to static bootstrap configuration") + bootstrapPeers = getStaticBootstrapPeers(runtime.Logger) + } else { + runtime.Logger.Info("Fetched %d bootstrap peers from WHOOSH", len(bootstrapPeers)) + } + + // Set bootstrap peers in config for P2P node initialization + if len(bootstrapPeers) > 0 { + cfg.V2.DHT.BootstrapPeers = make([]string, len(bootstrapPeers)) + for i, peer := range bootstrapPeers { + for _, addr := range peer.Addrs { + // Convert to full multiaddr with peer ID + cfg.V2.DHT.BootstrapPeers[i] = fmt.Sprintf("%s/p2p/%s", addr.String(), peer.ID.String()) + break // Use first address + } + } + } + // Initialize P2P node node, err := p2p.NewNode(ctx) if err != nil { @@ -243,6 +284,35 @@ func Initialize(appMode string) (*SharedRuntime, error) { runtime.Logger.Info(" %s/p2p/%s", addr, node.ID()) } + // Wait for bootstrap peers to connect before proceeding + // This prevents election race conditions where elections start before peer discovery + // Increased from 5s to 15s to allow more time for P2P mesh formation + if len(bootstrapPeers) > 0 { + runtime.Logger.Info("Waiting 15 seconds for bootstrap peer connections to establish...") + runtime.Logger.Info(" Target peers: %d bootstrap peers", len(bootstrapPeers)) + + // Poll connectivity every 3 seconds to provide feedback + for i := 0; i < 5; i++ { + time.Sleep(3 * time.Second) + connectedPeers := len(node.Peers()) + runtime.Logger.Info(" [%ds] Connected to %d peers", (i+1)*3, connectedPeers) + + // If we've connected to at least half the bootstrap peers, we're in good shape + if connectedPeers >= len(bootstrapPeers)/2 && connectedPeers > 0 { + runtime.Logger.Info("Bootstrap connectivity achieved (%d/%d peers), proceeding early", + connectedPeers, len(bootstrapPeers)) + break + } + } + + finalConnected := len(node.Peers()) + if finalConnected == 0 { + runtime.Logger.Warn("Bootstrap complete but NO peers connected - mesh may be isolated") + } else { + runtime.Logger.Info("Bootstrap grace period complete - %d peers connected", finalConnected) + } + } + // Initialize Hypercore-style logger for P2P coordination hlog := logging.NewHypercoreLog(node.ID()) if runtime.Shhh != nil { @@ -269,7 +339,7 @@ func Initialize(appMode string) (*SharedRuntime, error) { } runtime.PubSub = ps - runtime.Logger.Info("πŸ“‘ PubSub system initialized") + runtime.Logger.Info("PubSub system initialized") // Join role-based topics if role is configured if cfg.Agent.Role != "" { @@ -278,7 +348,7 @@ func Initialize(appMode string) (*SharedRuntime, error) { reportsTo = []string{cfg.Agent.ReportsTo} } if err := ps.JoinRoleBasedTopics(cfg.Agent.Role, cfg.Agent.Expertise, reportsTo); err != nil { - runtime.Logger.Warn("⚠️ Failed to join role-based topics: %v", err) + runtime.Logger.Warn("Failed to join role-based topics: %v", err) } else { runtime.Logger.Info("🎯 Joined role-based collaboration topics") } @@ -302,7 +372,7 @@ func Initialize(appMode string) (*SharedRuntime, error) { // Cleanup properly shuts down all runtime components func (r *SharedRuntime) Cleanup() { - r.Logger.Info("πŸ”„ Starting graceful shutdown...") + r.Logger.Info("Starting graceful shutdown...") if r.BackbeatIntegration != nil { r.BackbeatIntegration.Stop() @@ -310,7 +380,7 @@ func (r *SharedRuntime) Cleanup() { if r.MDNSDiscovery != nil { r.MDNSDiscovery.Close() - r.Logger.Info("πŸ” mDNS discovery closed") + r.Logger.Info("mDNS discovery closed") } if r.PubSub != nil { @@ -329,6 +399,12 @@ func (r *SharedRuntime) Cleanup() { r.HTTPServer.Stop() } + if r.CouncilSubscriber != nil { + if err := r.CouncilSubscriber.Close(); err != nil { + r.Logger.Warn("Failed to close council NATS subscriber: %v", err) + } + } + if r.UCXIServer != nil { r.UCXIServer.Stop() } @@ -341,7 +417,7 @@ func (r *SharedRuntime) Cleanup() { r.Cancel() } - r.Logger.Info("βœ… CHORUS shutdown completed") + r.Logger.Info("CHORUS shutdown completed") } // Helper methods for initialization (extracted from main.go) @@ -349,6 +425,15 @@ func (r *SharedRuntime) initializeElectionSystem() error { // === Admin Election System === electionManager := election.NewElectionManager(r.Context, r.Config, r.Node.Host(), r.PubSub, r.Node.ID().ShortString()) + if r.BackbeatIntegration != nil { + electionManager.SetTempoResolver(func() int { + return r.BackbeatIntegration.CurrentTempoBPM() + }) + electionManager.SetBeatGapResolver(func() time.Duration { + return r.BackbeatIntegration.TimeSinceLastBeat() + }) + } + // Set election callbacks with BACKBEAT integration electionManager.SetCallbacks( func(oldAdmin, newAdmin string) { @@ -372,7 +457,7 @@ func (r *SharedRuntime) initializeElectionSystem() error { r.Config.Slurp.Enabled = true // Apply admin role configuration if err := r.Config.ApplyRoleDefinition("admin"); err != nil { - r.Logger.Warn("⚠️ Failed to apply admin role: %v", err) + r.Logger.Warn("Failed to apply admin role: %v", err) } } }, @@ -396,7 +481,7 @@ func (r *SharedRuntime) initializeElectionSystem() error { return fmt.Errorf("failed to start election manager: %v", err) } r.ElectionManager = electionManager - r.Logger.Info("βœ… Election manager started with automated heartbeat management") + r.Logger.Info("Election manager started with automated heartbeat management") return nil } @@ -412,7 +497,7 @@ func (r *SharedRuntime) initializeDHTStorage() error { var err error dhtNode, err = dht.NewLibP2PDHT(r.Context, r.Node.Host()) if err != nil { - r.Logger.Warn("⚠️ Failed to create DHT: %v", err) + r.Logger.Warn("Failed to create DHT: %v", err) } else { r.Logger.Info("πŸ•ΈοΈ DHT initialized") @@ -424,14 +509,14 @@ func (r *SharedRuntime) initializeDHTStorage() error { } if err := dhtNode.Bootstrap(); err != nil { - r.Logger.Warn("⚠️ DHT bootstrap failed: %v", err) + r.Logger.Warn("DHT bootstrap failed: %v", err) r.BackbeatIntegration.FailP2POperation(operationID, err.Error()) } else { r.BackbeatIntegration.CompleteP2POperation(operationID, 1) } } else { if err := dhtNode.Bootstrap(); err != nil { - r.Logger.Warn("⚠️ DHT bootstrap failed: %v", err) + r.Logger.Warn("DHT bootstrap failed: %v", err) } } @@ -451,14 +536,14 @@ func (r *SharedRuntime) initializeDHTStorage() error { for _, addrStr := range bootstrapPeers { addr, err := multiaddr.NewMultiaddr(addrStr) if err != nil { - r.Logger.Warn("⚠️ Invalid bootstrap address %s: %v", addrStr, err) + r.Logger.Warn("Invalid bootstrap address %s: %v", addrStr, err) continue } // Extract peer info from multiaddr info, err := peer.AddrInfoFromP2pAddr(addr) if err != nil { - r.Logger.Warn("⚠️ Failed to parse peer info from %s: %v", addrStr, err) + r.Logger.Warn("Failed to parse peer info from %s: %v", addrStr, err) continue } @@ -471,7 +556,7 @@ func (r *SharedRuntime) initializeDHTStorage() error { r.BackbeatIntegration.UpdateP2POperationPhase(operationID, backbeat.PhaseConnecting, 0) if err := r.Node.Host().Connect(r.Context, *info); err != nil { - r.Logger.Warn("⚠️ Failed to connect to bootstrap peer %s: %v", addrStr, err) + r.Logger.Warn("Failed to connect to bootstrap peer %s: %v", addrStr, err) r.BackbeatIntegration.FailP2POperation(operationID, err.Error()) } else { r.Logger.Info("πŸ”— Connected to DHT bootstrap peer: %s", addrStr) @@ -480,7 +565,7 @@ func (r *SharedRuntime) initializeDHTStorage() error { } } else { if err := r.Node.Host().Connect(r.Context, *info); err != nil { - r.Logger.Warn("⚠️ Failed to connect to bootstrap peer %s: %v", addrStr, err) + r.Logger.Warn("Failed to connect to bootstrap peer %s: %v", addrStr, err) } else { r.Logger.Info("πŸ”— Connected to DHT bootstrap peer: %s", addrStr) } @@ -508,7 +593,7 @@ func (r *SharedRuntime) initializeDHTStorage() error { r.Node.ID().ShortString(), r.Config.Agent.ID, ) - r.Logger.Info("πŸ“€ Decision publisher initialized") + r.Logger.Info("Decision publisher initialized") } } else { r.Logger.Info("βšͺ DHT disabled in configuration") @@ -526,12 +611,13 @@ func (r *SharedRuntime) initializeServices() error { taskTracker := &SimpleTaskTracker{ maxTasks: r.Config.Agent.MaxTasks, activeTasks: make(map[string]bool), + logger: logging.ForComponent(logging.ComponentRuntime), } // Connect decision publisher to task tracker if available if r.DecisionPublisher != nil { taskTracker.decisionPublisher = r.DecisionPublisher - r.Logger.Info("πŸ“€ Task completion decisions will be published to DHT") + r.Logger.Info("Task completion decisions will be published to DHT") } r.TaskTracker = taskTracker @@ -548,18 +634,34 @@ func (r *SharedRuntime) initializeServices() error { taskCoordinator.Start() r.TaskCoordinator = taskCoordinator - r.Logger.Info("βœ… Task coordination system active") + r.Logger.Info("Task coordination system active") // Start HTTP API server - httpServer := api.NewHTTPServer(r.Config.Network.APIPort, r.HypercoreLog, r.PubSub) + httpServer := api.NewHTTPServer(r.Config, r.Node, r.HypercoreLog, r.PubSub) go func() { - r.Logger.Info("🌐 HTTP API server starting on :%d", r.Config.Network.APIPort) + r.Logger.Info("HTTP API server starting on :%d", r.Config.Network.APIPort) if err := httpServer.Start(); err != nil && err != http.ErrServerClosed { - r.Logger.Error("❌ HTTP server error: %v", err) + r.Logger.Error("HTTP server error: %v", err) } }() r.HTTPServer = httpServer + // Enable NATS-based council opportunity delivery. + natsURL := strings.TrimSpace(os.Getenv("CHORUS_COUNCIL_NATS_URL")) + if natsURL == "" { + natsURL = strings.TrimSpace(os.Getenv("CHORUS_BACKBEAT_NATS_URL")) + } + if natsURL == "" { + natsURL = "nats://backbeat-nats:4222" + } + + if subscriber, err := councilnats.NewCouncilSubscriber(natsURL, httpServer.CouncilManager, httpServer.WhooshEndpoint()); err != nil { + r.Logger.Warn("Council NATS subscriber disabled: %v", err) + } else { + r.CouncilSubscriber = subscriber + r.Logger.Info("Council opportunities via NATS enabled (url=%s)", natsURL) + } + // === UCXI Server Integration === var ucxiServer *ucxi.Server if r.Config.UCXL.Enabled && r.Config.UCXL.Server.Enabled { @@ -570,7 +672,7 @@ func (r *SharedRuntime) initializeServices() error { storage, err := ucxi.NewBasicContentStorage(storageDir) if err != nil { - r.Logger.Warn("⚠️ Failed to create UCXI storage: %v", err) + r.Logger.Warn("Failed to create UCXI storage: %v", err) } else { resolver := ucxi.NewBasicAddressResolver(r.Node.ID().ShortString()) resolver.SetDefaultTTL(r.Config.UCXL.Resolution.CacheTTL) @@ -580,14 +682,14 @@ func (r *SharedRuntime) initializeServices() error { BasePath: r.Config.UCXL.Server.BasePath, Resolver: resolver, Storage: storage, - Logger: ucxi.SimpleLogger{}, + Logger: ucxi.NewSimpleLogger(logging.ComponentUCXI), } ucxiServer = ucxi.NewServer(ucxiConfig) go func() { r.Logger.Info("πŸ”— UCXI server starting on :%d", r.Config.UCXL.Server.Port) if err := ucxiServer.Start(); err != nil && err != http.ErrServerClosed { - r.Logger.Error("❌ UCXI server error: %v", err) + r.Logger.Error("UCXI server error: %v", err) } }() } @@ -637,7 +739,7 @@ func initializeAIProvider(cfg *config.Config, logger *SimpleLogger) error { Timeout: cfg.AI.ResetData.Timeout, } reasoning.SetResetDataConfig(resetdataConfig) - logger.Info("🌐 ResetData AI provider configured - Endpoint: %s, Model: %s", + logger.Info("ResetData AI provider configured - Endpoint: %s, Model: %s", cfg.AI.ResetData.BaseURL, cfg.AI.ResetData.Model) case "ollama": @@ -645,7 +747,7 @@ func initializeAIProvider(cfg *config.Config, logger *SimpleLogger) error { logger.Info("πŸ¦™ Ollama AI provider configured - Endpoint: %s", cfg.AI.Ollama.Endpoint) default: - logger.Warn("⚠️ Unknown AI provider '%s', defaulting to resetdata", cfg.AI.Provider) + logger.Warn("Unknown AI provider '%s', defaulting to resetdata", cfg.AI.Provider) if cfg.AI.ResetData.APIKey == "" { return fmt.Errorf("RESETDATA_API_KEY environment variable is required for default resetdata provider") } @@ -700,9 +802,95 @@ func initializeAIProvider(cfg *config.Config, logger *SimpleLogger) error { logger.Info("πŸ“š LightRAG RAG system enabled - Endpoint: %s, Mode: %s", cfg.LightRAG.BaseURL, cfg.LightRAG.DefaultMode) } else { - logger.Warn("⚠️ LightRAG enabled but server not healthy at %s", cfg.LightRAG.BaseURL) + logger.Warn("LightRAG enabled but server not healthy at %s", cfg.LightRAG.BaseURL) } } return nil } + +// fetchBootstrapPeers fetches bootstrap peer list from WHOOSH +func fetchBootstrapPeers(whooshURL string, logger *SimpleLogger) ([]peer.AddrInfo, error) { + client := &http.Client{Timeout: 10 * time.Second} + + url := fmt.Sprintf("%s/api/v1/bootstrap-peers", whooshURL) + logger.Info("Fetching bootstrap peers from: %s", url) + + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to fetch bootstrap peers: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("bootstrap endpoint returned status %d", resp.StatusCode) + } + + var result struct { + BootstrapPeers []struct { + Multiaddr string `json:"multiaddr"` + PeerID string `json:"peer_id"` + Name string `json:"name"` + Priority int `json:"priority"` + } `json:"bootstrap_peers"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode bootstrap peers: %w", err) + } + + // Convert to peer.AddrInfo format + peers := make([]peer.AddrInfo, 0, len(result.BootstrapPeers)) + for _, bp := range result.BootstrapPeers { + maddr, err := multiaddr.NewMultiaddr(bp.Multiaddr) + if err != nil { + logger.Warn("Invalid multiaddr %s: %v", bp.Multiaddr, err) + continue + } + + peerID, err := peer.Decode(bp.PeerID) + if err != nil { + logger.Warn("Invalid peer ID %s: %v", bp.PeerID, err) + continue + } + + peers = append(peers, peer.AddrInfo{ + ID: peerID, + Addrs: []multiaddr.Multiaddr{maddr}, + }) + + logger.Info(" Bootstrap peer: %s (%s, priority %d)", bp.Name, bp.PeerID, bp.Priority) + } + + return peers, nil +} + +// getStaticBootstrapPeers returns a static fallback list of bootstrap peers +func getStaticBootstrapPeers(logger *SimpleLogger) []peer.AddrInfo { + logger.Warn("Using static bootstrap peer configuration (fallback)") + + // Static HMMM monitor peer (if WHOOSH is unavailable) + staticPeers := []string{ + "/ip4/172.27.0.6/tcp/9001/p2p/12D3KooWBhVfNETuGyjsrGwmhny7vnJzP1y7H59oqmq1VAPTzQMW", + } + + peers := make([]peer.AddrInfo, 0, len(staticPeers)) + for _, peerStr := range staticPeers { + maddr, err := multiaddr.NewMultiaddr(peerStr) + if err != nil { + logger.Warn("Invalid static multiaddr %s: %v", peerStr, err) + continue + } + + addrInfo, err := peer.AddrInfoFromP2pAddr(maddr) + if err != nil { + logger.Warn("Failed to parse static peer address %s: %v", peerStr, err) + continue + } + + peers = append(peers, *addrInfo) + logger.Info(" πŸ“Œ Static bootstrap peer: %s", addrInfo.ID.ShortString()) + } + + return peers +} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 53be0c5..67dc2f9 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -8,10 +8,12 @@ import ( "sync" "time" + "chorus/internal/logging" "chorus/pkg/shhh" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/rs/zerolog" ) // PubSub handles publish/subscribe messaging for Bzzz coordination and HMMM meta-discussion @@ -56,6 +58,9 @@ type PubSub struct { // SHHH sentinel redactor *shhh.Sentinel redactorMux sync.RWMutex + + // Structured logger + logger zerolog.Logger } // HypercoreLogger interface for dependency injection @@ -168,6 +173,7 @@ func NewPubSubWithLogger(ctx context.Context, h host.Host, chorusTopic, hmmmTopi dynamicSubs: make(map[string]*pubsub.Subscription), dynamicHandlers: make(map[string]func([]byte, peer.ID)), hypercoreLog: logger, + logger: logging.ForComponent(logging.ComponentP2P), } // Join static topics @@ -181,7 +187,11 @@ func NewPubSubWithLogger(ctx context.Context, h host.Host, chorusTopic, hmmmTopi go p.handleHmmmMessages() go p.handleContextFeedbackMessages() - fmt.Printf("πŸ“‘ PubSub initialized - Bzzz: %s, HMMM: %s, Context: %s\n", chorusTopic, hmmmTopic, contextTopic) + p.logger.Info(). + Str("bzzz_topic", chorusTopic). + Str("hmmm_topic", hmmmTopic). + Str("context_topic", contextTopic). + Msg("PubSub initialized") return p, nil } @@ -297,7 +307,7 @@ func (p *PubSub) subscribeDynamicTopic(topicName string, handler func([]byte, pe go p.handleDynamicMessages(topicName, sub) - fmt.Printf("βœ… Joined dynamic topic: %s\n", topicName) + p.logger.Info().Str("topic_name", topicName).Msg("Joined dynamic topic") return nil } @@ -339,12 +349,12 @@ func (p *PubSub) JoinRoleBasedTopics(role string, expertise []string, reportsTo // Join all identified topics for _, topicName := range topicsToJoin { if err := p.JoinDynamicTopic(topicName); err != nil { - fmt.Printf("⚠️ Failed to join role-based topic %s: %v\n", topicName, err) + p.logger.Warn().Err(err).Str("topic_name", topicName).Msg("Failed to join role-based topic") continue } } - fmt.Printf("🎯 Joined %d role-based topics for role: %s\n", len(topicsToJoin), role) + p.logger.Info().Int("topic_count", len(topicsToJoin)).Str("role", role).Msg("Joined role-based topics") return nil } @@ -379,7 +389,7 @@ func (p *PubSub) LeaveDynamicTopic(topicName string) { delete(p.dynamicHandlers, topicName) p.dynamicHandlersMux.Unlock() - fmt.Printf("πŸ—‘οΈ Left dynamic topic: %s\n", topicName) + p.logger.Info().Str("topic_name", topicName).Msg("Left dynamic topic") } // PublishToDynamicTopic publishes a message to a specific dynamic topic @@ -588,7 +598,7 @@ func (p *PubSub) handleBzzzMessages() { if p.ctx.Err() != nil { return // Context cancelled } - fmt.Printf("❌ Error receiving Bzzz message: %v\n", err) + p.logger.Warn().Err(err).Msg("Error receiving Bzzz message") continue } @@ -598,7 +608,7 @@ func (p *PubSub) handleBzzzMessages() { var chorusMsg Message if err := json.Unmarshal(msg.Data, &chorusMsg); err != nil { - fmt.Printf("❌ Failed to unmarshal Bzzz message: %v\n", err) + p.logger.Warn().Err(err).Msg("Failed to unmarshal Bzzz message") continue } @@ -614,7 +624,7 @@ func (p *PubSub) handleHmmmMessages() { if p.ctx.Err() != nil { return // Context cancelled } - fmt.Printf("❌ Error receiving HMMM message: %v\n", err) + p.logger.Warn().Err(err).Msg("Error receiving HMMM message") continue } @@ -624,7 +634,7 @@ func (p *PubSub) handleHmmmMessages() { var hmmmMsg Message if err := json.Unmarshal(msg.Data, &hmmmMsg); err != nil { - fmt.Printf("❌ Failed to unmarshal HMMM message: %v\n", err) + p.logger.Warn().Err(err).Msg("Failed to unmarshal HMMM message") continue } @@ -644,7 +654,7 @@ func (p *PubSub) handleContextFeedbackMessages() { if p.ctx.Err() != nil { return // Context cancelled } - fmt.Printf("❌ Error receiving Context Feedback message: %v\n", err) + p.logger.Warn().Err(err).Msg("Error receiving Context Feedback message") continue } @@ -654,7 +664,7 @@ func (p *PubSub) handleContextFeedbackMessages() { var contextMsg Message if err := json.Unmarshal(msg.Data, &contextMsg); err != nil { - fmt.Printf("❌ Failed to unmarshal Context Feedback message: %v\n", err) + p.logger.Warn().Err(err).Msg("Failed to unmarshal Context Feedback message") continue } @@ -682,7 +692,7 @@ func (p *PubSub) handleDynamicMessages(topicName string, sub *pubsub.Subscriptio if p.ctx.Err() != nil || err.Error() == "subscription cancelled" { return // Subscription was cancelled, exit handler } - fmt.Printf("❌ Error receiving dynamic message on %s: %v\n", topicName, err) + p.logger.Warn().Err(err).Str("topic_name", topicName).Msg("Error receiving dynamic message") continue } @@ -697,7 +707,7 @@ func (p *PubSub) handleDynamicMessages(topicName string, sub *pubsub.Subscriptio var dynamicMsg Message if err := json.Unmarshal(msg.Data, &dynamicMsg); err != nil { - fmt.Printf("❌ Failed to unmarshal dynamic message on %s: %v\n", topicName, err) + p.logger.Warn().Err(err).Str("topic_name", topicName).Msg("Failed to unmarshal dynamic message") continue } @@ -710,7 +720,11 @@ func (p *PubSub) handleDynamicMessages(topicName string, sub *pubsub.Subscriptio // processBzzzMessage handles different types of Bzzz coordination messages func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) { - fmt.Printf("🐝 Bzzz [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data) + p.logger.Debug(). + Str("message_type", string(msg.Type)). + Str("from_peer", from.ShortString()). + Interface("data", msg.Data). + Msg("Bzzz message received") // Log to hypercore if logger is available if p.hypercoreLog != nil { @@ -743,15 +757,18 @@ func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) { } if err := p.hypercoreLog.AppendString(logType, logData); err != nil { - fmt.Printf("❌ Failed to log Bzzz message to hypercore: %v\n", err) + p.logger.Warn().Err(err).Msg("Failed to log Bzzz message to hypercore") } } } // processHmmmMessage provides default handling for HMMM messages if no external handler is set func (p *PubSub) processHmmmMessage(msg Message, from peer.ID) { - fmt.Printf("🎯 Default HMMM Handler [%s] from %s: %v\n", - msg.Type, from.ShortString(), msg.Data) + p.logger.Debug(). + Str("message_type", string(msg.Type)). + Str("from_peer", from.ShortString()). + Interface("data", msg.Data). + Msg("Default HMMM Handler") // Log to hypercore if logger is available if p.hypercoreLog != nil { @@ -794,15 +811,18 @@ func (p *PubSub) processHmmmMessage(msg Message, from peer.ID) { } if err := p.hypercoreLog.AppendString(logType, logData); err != nil { - fmt.Printf("❌ Failed to log HMMM message to hypercore: %v\n", err) + p.logger.Warn().Err(err).Msg("Failed to log HMMM message to hypercore") } } } // processContextFeedbackMessage provides default handling for context feedback messages if no external handler is set func (p *PubSub) processContextFeedbackMessage(msg Message, from peer.ID) { - fmt.Printf("🧠 Context Feedback [%s] from %s: %v\n", - msg.Type, from.ShortString(), msg.Data) + p.logger.Debug(). + Str("message_type", string(msg.Type)). + Str("from_peer", from.ShortString()). + Interface("data", msg.Data). + Msg("Context Feedback") // Log to hypercore if logger is available if p.hypercoreLog != nil { @@ -834,7 +854,7 @@ func (p *PubSub) processContextFeedbackMessage(msg Message, from peer.ID) { } if err := p.hypercoreLog.AppendString(logType, logData); err != nil { - fmt.Printf("❌ Failed to log Context Feedback message to hypercore: %v\n", err) + p.logger.Warn().Err(err).Msg("Failed to log Context Feedback message to hypercore") } } }