package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"sort"
	"sync"
	"syscall"
	"time"

	"github.com/gorilla/mux"
	"github.com/nats-io/nats.go"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"

	bb "github.com/chorus-services/backbeat/internal/backbeat"
)

const (
	// httpListenAddr is the address the health/metrics/admin HTTP server binds to.
	httpListenAddr = ":8080"

	// httpShutdownTimeout bounds how long Stop waits for in-flight HTTP
	// requests before forcibly closing the server.
	httpShutdownTimeout = 10 * time.Second

	// shutdownTimeout bounds how long main waits for Stop to finish.
	shutdownTimeout = 30 * time.Second
)

// ReverbService implements BACKBEAT-REQ-020, BACKBEAT-REQ-021, BACKBEAT-REQ-022.
// It aggregates StatusClaims from agents and produces a BarReport for each window.
type ReverbService struct {
	clusterID string
	nodeID    string
	natsConn  *nats.Conn
	metrics   *bb.Metrics

	// Window management. windowsMu guards the windows map and the
	// WindowAggregation values stored in it; nothing visible in this file
	// shows WindowAggregation doing its own locking, so every access to a
	// window's fields or methods happens while windowsMu is held.
	windowsMu sync.RWMutex
	windows   map[string]*bb.WindowAggregation // windowID -> aggregation
	windowTTL time.Duration
	barLength int

	// Pulse synchronization. pulseMu guards currentBeat and currentWindowID,
	// which are written by the beat handler and read by the HTTP handlers.
	pulseMu         sync.RWMutex
	currentBeat     int64
	currentWindowID string

	// Configuration
	maxWindowsRetained int
	cleanupInterval    time.Duration

	// Lifecycle bookkeeping. subs and httpServer are written by Start and
	// read by Stop; Stop must only be called after Start has returned.
	startedAt  time.Time
	subs       []*nats.Subscription
	httpServer *http.Server
	wg         sync.WaitGroup // tracks the cleanup and HTTP goroutines
	stopOnce   sync.Once      // makes Stop idempotent

	// Control channels
	ctx    context.Context
	cancel context.CancelFunc
	done   chan struct{} // closed once Stop has finished tearing everything down
}

// NewReverbService creates a new reverb aggregation service for the given
// cluster and node. barLength is the number of beats per window and is
// expected to be positive. The returned service is idle until Start is called.
func NewReverbService(clusterID, nodeID string, natsConn *nats.Conn, barLength int) *ReverbService {
	ctx, cancel := context.WithCancel(context.Background())

	return &ReverbService{
		clusterID:          clusterID,
		nodeID:             nodeID,
		natsConn:           natsConn,
		metrics:            bb.NewMetrics(),
		windows:            make(map[string]*bb.WindowAggregation),
		windowTTL:          5 * time.Minute, // Keep windows for 5 minutes after their last update
		barLength:          barLength,
		maxWindowsRetained: 100, // Prevent memory leaks
		cleanupInterval:    30 * time.Second,
		startedAt:          time.Now(),
		ctx:                ctx,
		cancel:             cancel,
		done:               make(chan struct{}),
	}
}

// Start subscribes to the beat and status subjects and launches the
// background cleanup loop and the HTTP server.
//
// BACKBEAT-REQ-020: Subscribe to INT-B StatusClaims; group by window_id
// BACKBEAT-REQ-021: Emit INT-C BarReport at each downbeat with KPIs
//
// It returns an error if either subscription fails; in that case no
// subscription is left active and no goroutine has been started.
func (rs *ReverbService) Start() error {
	log.Info().
		Str("cluster_id", rs.clusterID).
		Str("node_id", rs.nodeID).
		Int("bar_length", rs.barLength).
		Msg("Starting BACKBEAT reverb service")

	beatSubject := fmt.Sprintf("backbeat.%s.beat", rs.clusterID)
	statusSubject := fmt.Sprintf("backbeat.%s.status", rs.clusterID)

	// Subscribe to pulse BeatFrames for downbeat timing
	beatSub, err := rs.natsConn.Subscribe(beatSubject, rs.handleBeatFrame)
	if err != nil {
		return fmt.Errorf("failed to subscribe to beat channel: %w", err)
	}
	log.Info().Str("subject", beatSubject).Msg("Subscribed to pulse beat channel")

	// BACKBEAT-REQ-020: Subscribe to StatusClaims for aggregation
	statusSub, err := rs.natsConn.Subscribe(statusSubject, rs.handleStatusClaim)
	if err != nil {
		// Do not leave the beat subscription dangling on a failed start.
		if unsubErr := beatSub.Unsubscribe(); unsubErr != nil {
			log.Warn().Err(unsubErr).Str("subject", beatSubject).Msg("Failed to unsubscribe after start failure")
		}
		return fmt.Errorf("failed to subscribe to status channel: %w", err)
	}
	log.Info().Str("subject", statusSubject).Msg("Subscribed to agent status channel")

	rs.subs = append(rs.subs, beatSub, statusSub)

	// Build the server before its goroutine starts so Stop can always reach it.
	rs.httpServer = rs.newHTTPServer()

	rs.wg.Add(2)
	// Background cleanup of expired windows
	go func() {
		defer rs.wg.Done()
		rs.cleanupRoutine()
	}()
	// HTTP server for health, metrics and admin endpoints
	go func() {
		defer rs.wg.Done()
		rs.startHTTPServer()
	}()

	log.Info().Msg("BACKBEAT reverb service started successfully")
	return nil
}

// handleBeatFrame processes an incoming BeatFrame, tracks the current beat
// and window, and triggers the rollup of the previous window when a downbeat
// announces a new window ID.
//
// BACKBEAT-REQ-021: Emit INT-C BarReport at each downbeat with KPIs
func (rs *ReverbService) handleBeatFrame(msg *nats.Msg) {
	var bf bb.BeatFrame
	if err := json.Unmarshal(msg.Data, &bf); err != nil {
		log.Error().Err(err).Msg("Failed to unmarshal BeatFrame")
		rs.metrics.RecordNATSError("unmarshal_error")
		return
	}

	// Swap in the new pulse state under the lock and remember the window that
	// was current before this frame; HTTP handlers read these fields
	// concurrently.
	rs.pulseMu.Lock()
	previousWindowID := rs.currentWindowID
	rs.currentBeat = bf.BeatIndex
	rs.currentWindowID = bf.WindowID
	rs.pulseMu.Unlock()

	// Process downbeat - emit BarReport for the window that just ended
	if bf.Downbeat && previousWindowID != "" && previousWindowID != bf.WindowID {
		rs.processDownbeat(previousWindowID)
	}

	log.Debug().
		Int64("beat_index", bf.BeatIndex).
		Bool("downbeat", bf.Downbeat).
		Str("window_id", bf.WindowID).
		Msg("Processed beat frame")
}

// handleStatusClaim decodes and validates an incoming StatusClaim and adds it
// to the aggregation of the window its beat index falls into. Claims that
// fail to decode, fail validation, or cannot be mapped to a window are
// logged and dropped.
//
// BACKBEAT-REQ-020: Subscribe to INT-B StatusClaims; group by window_id
func (rs *ReverbService) handleStatusClaim(msg *nats.Msg) {
	var sc bb.StatusClaim
	if err := json.Unmarshal(msg.Data, &sc); err != nil {
		log.Error().Err(err).Msg("Failed to unmarshal StatusClaim")
		rs.metrics.RecordNATSError("unmarshal_error")
		return
	}

	// Validate StatusClaim according to INT-B specification
	if err := bb.ValidateStatusClaim(&sc); err != nil {
		log.Warn().Err(err).
			Str("agent_id", sc.AgentID).
			Str("task_id", sc.TaskID).
			Msg("Invalid StatusClaim received")
		return
	}

	// Determine window ID for this claim
	windowID := rs.getWindowIDForBeat(sc.BeatIndex)
	if windowID == "" {
		log.Warn().
			Int64("beat_index", sc.BeatIndex).
			Msg("Could not determine window ID for StatusClaim")
		return
	}

	// Add claim to appropriate window aggregation
	rs.addClaimToWindow(windowID, &sc)
	rs.metrics.RecordReverbClaim()

	log.Debug().
		Str("agent_id", sc.AgentID).
		Str("task_id", sc.TaskID).
		Str("state", sc.State).
		Str("window_id", windowID).
		Msg("Processed status claim")
}

// addClaimToWindow adds a StatusClaim to the aggregation for windowID,
// creating the aggregation (with its beat range derived from the claim's beat
// index and the bar length) if it does not exist yet.
func (rs *ReverbService) addClaimToWindow(windowID string, claim *bb.StatusClaim) {
	rs.windowsMu.Lock()
	defer rs.windowsMu.Unlock()

	// Get or create window aggregation
	window, exists := rs.windows[windowID]
	if !exists {
		// Create new window - calculate beat range
		fromBeat := rs.getWindowStartBeat(claim.BeatIndex)
		toBeat := fromBeat + int64(rs.barLength) - 1

		window = bb.NewWindowAggregation(windowID, fromBeat, toBeat)
		rs.windows[windowID] = window

		log.Info().
			Str("window_id", windowID).
			Int64("from_beat", fromBeat).
			Int64("to_beat", toBeat).
			Msg("Created new window aggregation")
	}

	// Add claim to window
	window.AddClaim(claim)

	// Update metrics
	rs.metrics.UpdateReverbActiveWindows(len(rs.windows))
}

// processDownbeat rolls up the completed window identified by windowID and
// publishes its BarReport on the cluster's reverb subject. If no aggregation
// exists for the window, or the report cannot be marshalled or published,
// the problem is logged and nothing is emitted.
//
// BACKBEAT-REQ-021: Emit INT-C BarReport at each downbeat with KPIs
// BACKBEAT-PER-002: Reverb rollup complete ≤ 1 beat after downbeat
func (rs *ReverbService) processDownbeat(windowID string) {
	start := time.Now()

	// Take the exclusive lock while touching the aggregation: claims for this
	// window may still arrive via addClaimToWindow, and GenerateBarReport is
	// opaque from here, so it is not assumed to be safe for concurrent use.
	rs.windowsMu.Lock()
	window, exists := rs.windows[windowID]
	if !exists {
		rs.windowsMu.Unlock()
		log.Warn().Str("window_id", windowID).Msg("No aggregation found for completed window")
		return
	}
	claimsCount := len(window.Claims)
	agentsCount := len(window.UniqueAgents)
	// Generate BarReport from aggregated data
	barReport := window.GenerateBarReport(rs.clusterID)
	rs.windowsMu.Unlock()

	log.Info().
		Str("window_id", windowID).
		Int("claims_count", claimsCount).
		Int("agents_reporting", agentsCount).
		Msg("Processing completed window")

	// Serialize BarReport
	reportData, err := json.Marshal(barReport)
	if err != nil {
		log.Error().Err(err).Str("window_id", windowID).Msg("Failed to marshal BarReport")
		return
	}

	// BACKBEAT-REQ-021: Emit INT-C BarReport
	reverbSubject := fmt.Sprintf("backbeat.%s.reverb", rs.clusterID)
	if err := rs.natsConn.Publish(reverbSubject, reportData); err != nil {
		log.Error().Err(err).
			Str("window_id", windowID).
			Str("subject", reverbSubject).
			Msg("Failed to publish BarReport")
		rs.metrics.RecordNATSError("publish_error")
		return
	}

	processingTime := time.Since(start)

	// Record metrics
	rs.metrics.RecordReverbWindow(
		processingTime,
		claimsCount,
		barReport.AgentsReporting,
		barReport.OnTimeReviews,
		barReport.TempoDriftMS,
		len(reportData),
	)

	log.Info().
		Str("window_id", windowID).
		Int("claims_processed", claimsCount).
		Int("agents_reporting", barReport.AgentsReporting).
		Int("on_time_reviews", barReport.OnTimeReviews).
		Dur("processing_time", processingTime).
		Int("report_size_bytes", len(reportData)).
		Msg("Published BarReport")

	// BACKBEAT-REQ-022: Optionally persist BarReports via DHT (placeholder)
	// TODO: Implement DHT persistence when available
	log.Debug().
		Str("window_id", windowID).
		Msg("DHT persistence placeholder - not yet implemented")
}

// getWindowIDForBeat determines the window ID for a given beat index.
// It returns the empty string for non-positive beat indices.
func (rs *ReverbService) getWindowIDForBeat(beatIndex int64) string {
	if beatIndex <= 0 {
		return ""
	}

	// Find the downbeat for this window
	downbeatIndex := bb.GetDownbeatIndex(beatIndex, rs.barLength)

	// Generate deterministic window ID per BACKBEAT-REQ-005
	return bb.GenerateWindowID(rs.clusterID, downbeatIndex)
}

// getWindowStartBeat calculates the starting beat for the window containing
// the given beat.
func (rs *ReverbService) getWindowStartBeat(beatIndex int64) int64 {
	return bb.GetDownbeatIndex(beatIndex, rs.barLength)
}

// cleanupRoutine runs cleanupOldWindows every cleanupInterval until the
// service context is cancelled.
func (rs *ReverbService) cleanupRoutine() {
	ticker := time.NewTicker(rs.cleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-rs.ctx.Done():
			return
		case <-ticker.C:
			rs.cleanupOldWindows()
		}
	}
}

// cleanupOldWindows removes window aggregations that have not been updated
// within windowTTL and then, if more than maxWindowsRetained remain, evicts
// the least recently updated ones until the cap is met.
func (rs *ReverbService) cleanupOldWindows() {
	rs.windowsMu.Lock()
	defer rs.windowsMu.Unlock()

	now := time.Now()
	removedCount := 0

	for windowID, window := range rs.windows {
		if now.Sub(window.LastUpdated) > rs.windowTTL {
			delete(rs.windows, windowID)
			removedCount++
		}
	}

	// Also enforce maximum window retention. Map iteration order is random,
	// so the IDs are sorted by LastUpdated to make sure it really is the
	// stalest windows that get evicted rather than, say, the live one.
	if excess := len(rs.windows) - rs.maxWindowsRetained; excess > 0 {
		ids := make([]string, 0, len(rs.windows))
		for windowID := range rs.windows {
			ids = append(ids, windowID)
		}
		sort.Slice(ids, func(i, j int) bool {
			return rs.windows[ids[i]].LastUpdated.Before(rs.windows[ids[j]].LastUpdated)
		})

		for _, windowID := range ids[:excess] {
			delete(rs.windows, windowID)
			removedCount++
		}
	}

	if removedCount > 0 {
		log.Info().
			Int("removed_count", removedCount).
			Int("remaining_windows", len(rs.windows)).
			Msg("Cleaned up old window aggregations")
	}

	// Update metrics
	rs.metrics.UpdateReverbActiveWindows(len(rs.windows))
}

// pulseSnapshot returns the most recently observed beat index and window ID.
func (rs *ReverbService) pulseSnapshot() (int64, string) {
	rs.pulseMu.RLock()
	defer rs.pulseMu.RUnlock()

	return rs.currentBeat, rs.currentWindowID
}

// activeWindowCount returns the number of window aggregations currently held.
func (rs *ReverbService) activeWindowCount() int {
	rs.windowsMu.RLock()
	defer rs.windowsMu.RUnlock()

	return len(rs.windows)
}

// writeJSON marshals payload and writes it with the given status code and a
// JSON content type. A marshalling failure is logged and answered with 500.
func writeJSON(w http.ResponseWriter, status int, payload interface{}) {
	body, err := json.Marshal(payload)
	if err != nil {
		log.Error().Err(err).Msg("Failed to marshal HTTP response")
		http.Error(w, "internal error", http.StatusInternalServerError)
		return
	}
	writeJSONBytes(w, status, body)
}

// writeJSONBytes writes an already-encoded JSON document followed by a
// newline. Headers are set before WriteHeader because headers changed after
// that call are not sent.
func writeJSONBytes(w http.ResponseWriter, status int, body []byte) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if _, err := w.Write(append(body, '\n')); err != nil {
		log.Debug().Err(err).Msg("Failed to write HTTP response")
	}
}

// newHTTPServer builds the HTTP server exposing the health, readiness,
// metrics and admin endpoints.
func (rs *ReverbService) newHTTPServer() *http.Server {
	router := mux.NewRouter()

	// Health endpoints
	router.HandleFunc("/health", rs.healthHandler).Methods("GET")
	router.HandleFunc("/ready", rs.readinessHandler).Methods("GET")

	// Metrics endpoint
	router.Handle("/metrics", promhttp.Handler()).Methods("GET")

	// Admin API endpoints
	router.HandleFunc("/api/v1/windows", rs.listWindowsHandler).Methods("GET")
	router.HandleFunc("/api/v1/windows/{windowId}", rs.getWindowHandler).Methods("GET")
	router.HandleFunc("/api/v1/status", rs.statusHandler).Methods("GET")

	return &http.Server{
		Addr:         httpListenAddr,
		Handler:      router,
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 10 * time.Second,
	}
}

// startHTTPServer serves the health, metrics and admin endpoints and blocks
// until the server is shut down or fails. Start normally prepares the server
// beforehand so that Stop can shut it down.
func (rs *ReverbService) startHTTPServer() {
	if rs.httpServer == nil {
		rs.httpServer = rs.newHTTPServer()
	}

	log.Info().Str("address", rs.httpServer.Addr).Msg("Starting HTTP server")
	if err := rs.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		log.Error().Err(err).Msg("HTTP server error")
	}
}

// healthHandler reports liveness together with the service identity.
func (rs *ReverbService) healthHandler(w http.ResponseWriter, r *http.Request) {
	writeJSON(w, http.StatusOK, map[string]interface{}{
		"status":     "healthy",
		"service":    "backbeat-reverb",
		"cluster_id": rs.clusterID,
		"node_id":    rs.nodeID,
		"timestamp":  time.Now().UTC().Format(time.RFC3339),
	})
}

// readinessHandler answers 503 while the NATS connection is down and 200 with
// a short state summary otherwise.
func (rs *ReverbService) readinessHandler(w http.ResponseWriter, r *http.Request) {
	// Check NATS connection
	if !rs.natsConn.IsConnected() {
		writeJSON(w, http.StatusServiceUnavailable, map[string]string{
			"status": "not ready",
			"reason": "NATS connection lost",
		})
		return
	}

	beat, windowID := rs.pulseSnapshot()
	writeJSON(w, http.StatusOK, map[string]interface{}{
		"status":            "ready",
		"active_windows":    rs.activeWindowCount(),
		"current_beat":      beat,
		"current_window_id": windowID,
	})
}

// listWindowsHandler returns a summary of every window aggregation currently held.
func (rs *ReverbService) listWindowsHandler(w http.ResponseWriter, r *http.Request) {
	// Copy the summaries out under the lock, then encode without holding it
	// so a slow client cannot stall claim ingestion.
	rs.windowsMu.RLock()
	windows := make([]map[string]interface{}, 0, len(rs.windows))
	for windowID, window := range rs.windows {
		windows = append(windows, map[string]interface{}{
			"window_id":        windowID,
			"from_beat":        window.FromBeat,
			"to_beat":          window.ToBeat,
			"claims_count":     len(window.Claims),
			"agents_reporting": len(window.UniqueAgents),
			"last_updated":     window.LastUpdated.UTC().Format(time.RFC3339),
		})
	}
	rs.windowsMu.RUnlock()

	writeJSON(w, http.StatusOK, map[string]interface{}{
		"windows":     windows,
		"total_count": len(windows),
	})
}

// getWindowHandler returns the aggregation details and a freshly generated
// BarReport for the window named in the URL, or 404 if it is unknown.
func (rs *ReverbService) getWindowHandler(w http.ResponseWriter, r *http.Request) {
	windowID := mux.Vars(r)["windowId"]

	var (
		body []byte
		err  error
	)

	// The response is encoded while the lock is held because it references
	// the aggregation's own fields (e.g. StateCounts), which addClaimToWindow
	// may be updating. The exclusive lock is used since GenerateBarReport is
	// opaque from here and not assumed to be safe for concurrent use.
	rs.windowsMu.Lock()
	window, exists := rs.windows[windowID]
	if exists {
		// Generate current BarReport for this window
		barReport := window.GenerateBarReport(rs.clusterID)

		body, err = json.Marshal(map[string]interface{}{
			"window_aggregation": map[string]interface{}{
				"window_id":       window.WindowID,
				"from_beat":       window.FromBeat,
				"to_beat":         window.ToBeat,
				"claims_count":    len(window.Claims),
				"unique_agents":   len(window.UniqueAgents),
				"state_counts":    window.StateCounts,
				"completed_tasks": window.CompletedTasks,
				"failed_tasks":    window.FailedTasks,
				"last_updated":    window.LastUpdated.UTC().Format(time.RFC3339),
			},
			"current_bar_report": barReport,
		})
	}
	rs.windowsMu.Unlock()

	if !exists {
		writeJSON(w, http.StatusNotFound, map[string]string{
			"error":     "window not found",
			"window_id": windowID,
		})
		return
	}
	if err != nil {
		log.Error().Err(err).Str("window_id", windowID).Msg("Failed to marshal window response")
		http.Error(w, "internal error", http.StatusInternalServerError)
		return
	}

	writeJSONBytes(w, http.StatusOK, body)
}

// statusHandler returns the service configuration and current runtime state.
func (rs *ReverbService) statusHandler(w http.ResponseWriter, r *http.Request) {
	beat, windowID := rs.pulseSnapshot()

	writeJSON(w, http.StatusOK, map[string]interface{}{
		"service":              "backbeat-reverb",
		"cluster_id":           rs.clusterID,
		"node_id":              rs.nodeID,
		"active_windows":       rs.activeWindowCount(),
		"current_beat":         beat,
		"current_window_id":    windowID,
		"bar_length":           rs.barLength,
		"window_ttl_seconds":   int(rs.windowTTL.Seconds()),
		"max_windows_retained": rs.maxWindowsRetained,
		"nats_connected":       rs.natsConn.IsConnected(),
		"uptime_seconds":       time.Since(rs.startedAt).Seconds(), // measured from service construction
		"version":              "v1.0.0",
		"timestamp":            time.Now().UTC().Format(time.RFC3339),
	})
}

// Stop gracefully shuts down the reverb service: it cancels the service
// context, removes the NATS subscriptions, shuts down the HTTP server, waits
// for the background goroutines to exit and finally closes the done channel.
// It is safe to call more than once; calls after the first do nothing.
func (rs *ReverbService) Stop() {
	rs.stopOnce.Do(func() {
		log.Info().Msg("Stopping BACKBEAT reverb service")
		rs.cancel()

		for _, sub := range rs.subs {
			if err := sub.Unsubscribe(); err != nil {
				log.Warn().Err(err).Str("subject", sub.Subject).Msg("Failed to unsubscribe")
			}
		}

		if rs.httpServer != nil {
			shutdownCtx, cancel := context.WithTimeout(context.Background(), httpShutdownTimeout)
			if err := rs.httpServer.Shutdown(shutdownCtx); err != nil {
				log.Warn().Err(err).Msg("HTTP server did not shut down cleanly; forcing close")
				if closeErr := rs.httpServer.Close(); closeErr != nil {
					log.Warn().Err(closeErr).Msg("Failed to close HTTP server")
				}
			}
			cancel()
		}

		rs.wg.Wait()
		close(rs.done)
	})
}

func main() {
	// Command line flags
	clusterID := flag.String("cluster", "chorus-aus-01", "Cluster identifier")
	natsURL := flag.String("nats", "nats://backbeat-nats:4222", "NATS server URL")
	nodeID := flag.String("node", "", "Node identifier (auto-generated if empty)")
	barLength := flag.Int("bar-length", 120, "Bar length in beats")
	logLevel := flag.String("log-level", "info", "Log level (debug, info, warn, error)")
	flag.Parse()

	// Configure structured logging; unknown levels fall back to info.
	switch *logLevel {
	case "debug":
		zerolog.SetGlobalLevel(zerolog.DebugLevel)
	case "warn":
		zerolog.SetGlobalLevel(zerolog.WarnLevel)
	case "error":
		zerolog.SetGlobalLevel(zerolog.ErrorLevel)
	default:
		zerolog.SetGlobalLevel(zerolog.InfoLevel)
	}

	// Pretty logging in development
	if os.Getenv("BACKBEAT_ENV") != "production" {
		log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
	}

	// Generate node ID if not provided
	if *nodeID == "" {
		*nodeID = fmt.Sprintf("reverb-%d", time.Now().Unix())
	}

	log.Info().
		Str("cluster_id", *clusterID).
		Str("node_id", *nodeID).
		Str("nats_url", *natsURL).
		Int("bar_length", *barLength).
		Msg("Starting BACKBEAT reverb service")

	// A window spans bar-length beats, so a non-positive value cannot
	// describe a valid window range.
	if *barLength <= 0 {
		log.Fatal().Int("bar_length", *barLength).Msg("bar-length must be a positive number of beats")
	}

	// Connect to NATS
	nc, err := nats.Connect(*natsURL,
		nats.Timeout(10*time.Second),
		nats.ReconnectWait(2*time.Second),
		nats.MaxReconnects(-1),
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			log.Error().Err(err).Msg("NATS disconnected")
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			log.Info().Str("server", nc.ConnectedUrl()).Msg("NATS reconnected")
		}),
	)
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to connect to NATS")
	}
	defer func() {
		if err := nc.Drain(); err != nil {
			log.Warn().Err(err).Msg("Failed to drain NATS connection")
		}
	}()

	// Create and start reverb service
	service := NewReverbService(*clusterID, *nodeID, nc, *barLength)
	if err := service.Start(); err != nil {
		// log.Fatal exits the process without running deferred calls, so
		// release the connection explicitly first.
		nc.Close()
		log.Fatal().Err(err).Msg("Failed to start reverb service")
	}

	// Handle graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	log.Info().Msg("BACKBEAT reverb service is running. Press Ctrl+C to exit.")

	// Wait for shutdown signal
	<-sigChan
	log.Info().Msg("Shutdown signal received")

	// Graceful shutdown. Stop runs in its own goroutine so that the timeout
	// below still applies if teardown hangs.
	go service.Stop()

	// Wait for background tasks to complete
	select {
	case <-service.done:
		log.Info().Msg("BACKBEAT reverb service stopped gracefully")
	case <-time.After(shutdownTimeout):
		log.Warn().Msg("Shutdown timeout exceeded")
	}
}