586 lines
17 KiB
Go
586 lines
17 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/gorilla/mux"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
bb "github.com/chorus-services/backbeat/internal/backbeat"
|
|
)
|
|
|
|
// ReverbService implements BACKBEAT-REQ-020, BACKBEAT-REQ-021, BACKBEAT-REQ-022
|
|
// Aggregates StatusClaims from agents and produces BarReports for each window
|
|
type ReverbService struct {
|
|
clusterID string
|
|
nodeID string
|
|
natsConn *nats.Conn
|
|
metrics *bb.Metrics
|
|
|
|
// Window management
|
|
windowsMu sync.RWMutex
|
|
windows map[string]*bb.WindowAggregation // windowID -> aggregation
|
|
windowTTL time.Duration
|
|
barLength int
|
|
|
|
// Pulse synchronization
|
|
currentBeat int64
|
|
currentWindowID string
|
|
|
|
// Configuration
|
|
maxWindowsRetained int
|
|
cleanupInterval time.Duration
|
|
|
|
// Control channels
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
done chan struct{}
|
|
}
|
|
|
|
// NewReverbService creates a new reverb aggregation service
|
|
func NewReverbService(clusterID, nodeID string, natsConn *nats.Conn, barLength int) *ReverbService {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &ReverbService{
|
|
clusterID: clusterID,
|
|
nodeID: nodeID,
|
|
natsConn: natsConn,
|
|
metrics: bb.NewMetrics(),
|
|
windows: make(map[string]*bb.WindowAggregation),
|
|
windowTTL: 5 * time.Minute, // Keep windows for 5 minutes after completion
|
|
barLength: barLength,
|
|
maxWindowsRetained: 100, // Prevent memory leaks
|
|
cleanupInterval: 30 * time.Second,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
done: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Start initializes and starts the reverb service
|
|
// BACKBEAT-REQ-020: Subscribe to INT-B StatusClaims; group by window_id
|
|
// BACKBEAT-REQ-021: Emit INT-C BarReport at each downbeat with KPIs
|
|
func (rs *ReverbService) Start() error {
|
|
log.Info().
|
|
Str("cluster_id", rs.clusterID).
|
|
Str("node_id", rs.nodeID).
|
|
Int("bar_length", rs.barLength).
|
|
Msg("Starting BACKBEAT reverb service")
|
|
|
|
// BACKBEAT-REQ-020: Subscribe to StatusClaims on status channel
|
|
beatSubject := fmt.Sprintf("backbeat.%s.beat", rs.clusterID)
|
|
statusSubject := fmt.Sprintf("backbeat.%s.status", rs.clusterID)
|
|
|
|
// Subscribe to pulse BeatFrames for downbeat timing
|
|
_, err := rs.natsConn.Subscribe(beatSubject, rs.handleBeatFrame)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe to beat channel: %w", err)
|
|
}
|
|
log.Info().Str("subject", beatSubject).Msg("Subscribed to pulse beat channel")
|
|
|
|
// Subscribe to StatusClaims for aggregation
|
|
_, err = rs.natsConn.Subscribe(statusSubject, rs.handleStatusClaim)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe to status channel: %w", err)
|
|
}
|
|
log.Info().Str("subject", statusSubject).Msg("Subscribed to agent status channel")
|
|
|
|
// Start background cleanup goroutine
|
|
go rs.cleanupRoutine()
|
|
|
|
// Start HTTP server for health and metrics
|
|
go rs.startHTTPServer()
|
|
|
|
log.Info().Msg("BACKBEAT reverb service started successfully")
|
|
return nil
|
|
}
|
|
|
|
// handleBeatFrame processes incoming BeatFrames to detect downbeats
|
|
// BACKBEAT-REQ-021: Emit INT-C BarReport at each downbeat with KPIs
|
|
func (rs *ReverbService) handleBeatFrame(msg *nats.Msg) {
|
|
var bf bb.BeatFrame
|
|
if err := json.Unmarshal(msg.Data, &bf); err != nil {
|
|
log.Error().Err(err).Msg("Failed to unmarshal BeatFrame")
|
|
rs.metrics.RecordNATSError("unmarshal_error")
|
|
return
|
|
}
|
|
|
|
rs.currentBeat = bf.BeatIndex
|
|
|
|
// Process downbeat - emit BarReport for previous window
|
|
if bf.Downbeat && rs.currentWindowID != "" && rs.currentWindowID != bf.WindowID {
|
|
rs.processDownbeat(rs.currentWindowID)
|
|
}
|
|
|
|
// Update current window
|
|
rs.currentWindowID = bf.WindowID
|
|
|
|
log.Debug().
|
|
Int64("beat_index", bf.BeatIndex).
|
|
Bool("downbeat", bf.Downbeat).
|
|
Str("window_id", bf.WindowID).
|
|
Msg("Processed beat frame")
|
|
}
|
|
|
|
// handleStatusClaim processes incoming StatusClaims for aggregation
|
|
// BACKBEAT-REQ-020: Subscribe to INT-B StatusClaims; group by window_id
|
|
func (rs *ReverbService) handleStatusClaim(msg *nats.Msg) {
|
|
var sc bb.StatusClaim
|
|
if err := json.Unmarshal(msg.Data, &sc); err != nil {
|
|
log.Error().Err(err).Msg("Failed to unmarshal StatusClaim")
|
|
rs.metrics.RecordNATSError("unmarshal_error")
|
|
return
|
|
}
|
|
|
|
// Validate StatusClaim according to INT-B specification
|
|
if err := bb.ValidateStatusClaim(&sc); err != nil {
|
|
log.Warn().Err(err).
|
|
Str("agent_id", sc.AgentID).
|
|
Str("task_id", sc.TaskID).
|
|
Msg("Invalid StatusClaim received")
|
|
return
|
|
}
|
|
|
|
// Determine window ID for this claim
|
|
windowID := rs.getWindowIDForBeat(sc.BeatIndex)
|
|
if windowID == "" {
|
|
log.Warn().
|
|
Int64("beat_index", sc.BeatIndex).
|
|
Msg("Could not determine window ID for StatusClaim")
|
|
return
|
|
}
|
|
|
|
// Add claim to appropriate window aggregation
|
|
rs.addClaimToWindow(windowID, &sc)
|
|
|
|
rs.metrics.RecordReverbClaim()
|
|
|
|
log.Debug().
|
|
Str("agent_id", sc.AgentID).
|
|
Str("task_id", sc.TaskID).
|
|
Str("state", sc.State).
|
|
Str("window_id", windowID).
|
|
Msg("Processed status claim")
|
|
}
|
|
|
|
// addClaimToWindow adds a StatusClaim to the appropriate window aggregation
|
|
func (rs *ReverbService) addClaimToWindow(windowID string, claim *bb.StatusClaim) {
|
|
rs.windowsMu.Lock()
|
|
defer rs.windowsMu.Unlock()
|
|
|
|
// Get or create window aggregation
|
|
window, exists := rs.windows[windowID]
|
|
if !exists {
|
|
// Create new window - calculate beat range
|
|
fromBeat := rs.getWindowStartBeat(claim.BeatIndex)
|
|
toBeat := fromBeat + int64(rs.barLength) - 1
|
|
|
|
window = bb.NewWindowAggregation(windowID, fromBeat, toBeat)
|
|
rs.windows[windowID] = window
|
|
|
|
log.Info().
|
|
Str("window_id", windowID).
|
|
Int64("from_beat", fromBeat).
|
|
Int64("to_beat", toBeat).
|
|
Msg("Created new window aggregation")
|
|
}
|
|
|
|
// Add claim to window
|
|
window.AddClaim(claim)
|
|
|
|
// Update metrics
|
|
rs.metrics.UpdateReverbActiveWindows(len(rs.windows))
|
|
}
|
|
|
|
// processDownbeat processes a completed window and emits BarReport
|
|
// BACKBEAT-REQ-021: Emit INT-C BarReport at each downbeat with KPIs
|
|
// BACKBEAT-PER-002: Reverb rollup complete ≤ 1 beat after downbeat
|
|
func (rs *ReverbService) processDownbeat(windowID string) {
|
|
start := time.Now()
|
|
|
|
rs.windowsMu.RLock()
|
|
window, exists := rs.windows[windowID]
|
|
rs.windowsMu.RUnlock()
|
|
|
|
if !exists {
|
|
log.Warn().Str("window_id", windowID).Msg("No aggregation found for completed window")
|
|
return
|
|
}
|
|
|
|
log.Info().
|
|
Str("window_id", windowID).
|
|
Int("claims_count", len(window.Claims)).
|
|
Int("agents_reporting", len(window.UniqueAgents)).
|
|
Msg("Processing completed window")
|
|
|
|
// Generate BarReport from aggregated data
|
|
barReport := window.GenerateBarReport(rs.clusterID)
|
|
|
|
// Serialize BarReport
|
|
reportData, err := json.Marshal(barReport)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("window_id", windowID).Msg("Failed to marshal BarReport")
|
|
return
|
|
}
|
|
|
|
// BACKBEAT-REQ-021: Emit INT-C BarReport
|
|
reverbSubject := fmt.Sprintf("backbeat.%s.reverb", rs.clusterID)
|
|
if err := rs.natsConn.Publish(reverbSubject, reportData); err != nil {
|
|
log.Error().Err(err).
|
|
Str("window_id", windowID).
|
|
Str("subject", reverbSubject).
|
|
Msg("Failed to publish BarReport")
|
|
rs.metrics.RecordNATSError("publish_error")
|
|
return
|
|
}
|
|
|
|
processingTime := time.Since(start)
|
|
|
|
// Record metrics
|
|
rs.metrics.RecordReverbWindow(
|
|
processingTime,
|
|
len(window.Claims),
|
|
barReport.AgentsReporting,
|
|
barReport.OnTimeReviews,
|
|
barReport.TempoDriftMS,
|
|
len(reportData),
|
|
)
|
|
|
|
log.Info().
|
|
Str("window_id", windowID).
|
|
Int("claims_processed", len(window.Claims)).
|
|
Int("agents_reporting", barReport.AgentsReporting).
|
|
Int("on_time_reviews", barReport.OnTimeReviews).
|
|
Dur("processing_time", processingTime).
|
|
Int("report_size_bytes", len(reportData)).
|
|
Msg("Published BarReport")
|
|
|
|
// BACKBEAT-REQ-022: Optionally persist BarReports via DHT (placeholder)
|
|
// TODO: Implement DHT persistence when available
|
|
log.Debug().
|
|
Str("window_id", windowID).
|
|
Msg("DHT persistence placeholder - not yet implemented")
|
|
}
|
|
|
|
// getWindowIDForBeat determines the window ID for a given beat index
|
|
func (rs *ReverbService) getWindowIDForBeat(beatIndex int64) string {
|
|
if beatIndex <= 0 {
|
|
return ""
|
|
}
|
|
|
|
// Find the downbeat for this window
|
|
downbeatIndex := bb.GetDownbeatIndex(beatIndex, rs.barLength)
|
|
|
|
// Generate deterministic window ID per BACKBEAT-REQ-005
|
|
return bb.GenerateWindowID(rs.clusterID, downbeatIndex)
|
|
}
|
|
|
|
// getWindowStartBeat calculates the starting beat for a window containing the given beat
|
|
func (rs *ReverbService) getWindowStartBeat(beatIndex int64) int64 {
|
|
return bb.GetDownbeatIndex(beatIndex, rs.barLength)
|
|
}
|
|
|
|
// cleanupRoutine periodically cleans up old window aggregations
|
|
func (rs *ReverbService) cleanupRoutine() {
|
|
ticker := time.NewTicker(rs.cleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-rs.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
rs.cleanupOldWindows()
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanupOldWindows removes expired window aggregations to prevent memory leaks
|
|
func (rs *ReverbService) cleanupOldWindows() {
|
|
rs.windowsMu.Lock()
|
|
defer rs.windowsMu.Unlock()
|
|
|
|
now := time.Now()
|
|
removedCount := 0
|
|
|
|
for windowID, window := range rs.windows {
|
|
if now.Sub(window.LastUpdated) > rs.windowTTL {
|
|
delete(rs.windows, windowID)
|
|
removedCount++
|
|
}
|
|
}
|
|
|
|
// Also enforce maximum window retention
|
|
if len(rs.windows) > rs.maxWindowsRetained {
|
|
// Remove oldest windows beyond limit (simple approach)
|
|
excess := len(rs.windows) - rs.maxWindowsRetained
|
|
for windowID := range rs.windows {
|
|
if excess <= 0 {
|
|
break
|
|
}
|
|
delete(rs.windows, windowID)
|
|
removedCount++
|
|
excess--
|
|
}
|
|
}
|
|
|
|
if removedCount > 0 {
|
|
log.Info().
|
|
Int("removed_count", removedCount).
|
|
Int("remaining_windows", len(rs.windows)).
|
|
Msg("Cleaned up old window aggregations")
|
|
}
|
|
|
|
// Update metrics
|
|
rs.metrics.UpdateReverbActiveWindows(len(rs.windows))
|
|
}
|
|
|
|
// startHTTPServer starts the HTTP server for health checks and metrics
|
|
func (rs *ReverbService) startHTTPServer() {
|
|
router := mux.NewRouter()
|
|
|
|
// Health endpoint
|
|
router.HandleFunc("/health", rs.healthHandler).Methods("GET")
|
|
router.HandleFunc("/ready", rs.readinessHandler).Methods("GET")
|
|
|
|
// Metrics endpoint
|
|
router.Handle("/metrics", promhttp.Handler()).Methods("GET")
|
|
|
|
// Admin API endpoints
|
|
router.HandleFunc("/api/v1/windows", rs.listWindowsHandler).Methods("GET")
|
|
router.HandleFunc("/api/v1/windows/{windowId}", rs.getWindowHandler).Methods("GET")
|
|
router.HandleFunc("/api/v1/status", rs.statusHandler).Methods("GET")
|
|
|
|
server := &http.Server{
|
|
Addr: ":8080",
|
|
Handler: router,
|
|
ReadTimeout: 10 * time.Second,
|
|
WriteTimeout: 10 * time.Second,
|
|
}
|
|
|
|
log.Info().Str("address", ":8080").Msg("Starting HTTP server")
|
|
|
|
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Error().Err(err).Msg("HTTP server error")
|
|
}
|
|
}
|
|
|
|
// Health check handlers
|
|
func (rs *ReverbService) healthHandler(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"status": "healthy",
|
|
"service": "backbeat-reverb",
|
|
"cluster_id": rs.clusterID,
|
|
"node_id": rs.nodeID,
|
|
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
|
|
func (rs *ReverbService) readinessHandler(w http.ResponseWriter, r *http.Request) {
|
|
// Check NATS connection
|
|
if !rs.natsConn.IsConnected() {
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
json.NewEncoder(w).Encode(map[string]string{
|
|
"status": "not ready",
|
|
"reason": "NATS connection lost",
|
|
})
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"status": "ready",
|
|
"active_windows": len(rs.windows),
|
|
"current_beat": rs.currentBeat,
|
|
"current_window_id": rs.currentWindowID,
|
|
})
|
|
}
|
|
|
|
// Admin API handlers
|
|
func (rs *ReverbService) listWindowsHandler(w http.ResponseWriter, r *http.Request) {
|
|
rs.windowsMu.RLock()
|
|
defer rs.windowsMu.RUnlock()
|
|
|
|
windows := make([]map[string]interface{}, 0, len(rs.windows))
|
|
for windowID, window := range rs.windows {
|
|
windows = append(windows, map[string]interface{}{
|
|
"window_id": windowID,
|
|
"from_beat": window.FromBeat,
|
|
"to_beat": window.ToBeat,
|
|
"claims_count": len(window.Claims),
|
|
"agents_reporting": len(window.UniqueAgents),
|
|
"last_updated": window.LastUpdated.UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"windows": windows,
|
|
"total_count": len(windows),
|
|
})
|
|
}
|
|
|
|
func (rs *ReverbService) getWindowHandler(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
windowID := vars["windowId"]
|
|
|
|
rs.windowsMu.RLock()
|
|
window, exists := rs.windows[windowID]
|
|
rs.windowsMu.RUnlock()
|
|
|
|
if !exists {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
json.NewEncoder(w).Encode(map[string]string{
|
|
"error": "window not found",
|
|
"window_id": windowID,
|
|
})
|
|
return
|
|
}
|
|
|
|
// Generate current BarReport for this window
|
|
barReport := window.GenerateBarReport(rs.clusterID)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"window_aggregation": map[string]interface{}{
|
|
"window_id": window.WindowID,
|
|
"from_beat": window.FromBeat,
|
|
"to_beat": window.ToBeat,
|
|
"claims_count": len(window.Claims),
|
|
"unique_agents": len(window.UniqueAgents),
|
|
"state_counts": window.StateCounts,
|
|
"completed_tasks": window.CompletedTasks,
|
|
"failed_tasks": window.FailedTasks,
|
|
"last_updated": window.LastUpdated.UTC().Format(time.RFC3339),
|
|
},
|
|
"current_bar_report": barReport,
|
|
})
|
|
}
|
|
|
|
func (rs *ReverbService) statusHandler(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"service": "backbeat-reverb",
|
|
"cluster_id": rs.clusterID,
|
|
"node_id": rs.nodeID,
|
|
"active_windows": len(rs.windows),
|
|
"current_beat": rs.currentBeat,
|
|
"current_window_id": rs.currentWindowID,
|
|
"bar_length": rs.barLength,
|
|
"window_ttl_seconds": int(rs.windowTTL.Seconds()),
|
|
"max_windows_retained": rs.maxWindowsRetained,
|
|
"nats_connected": rs.natsConn.IsConnected(),
|
|
"uptime_seconds": time.Since(time.Now()).Seconds(), // Placeholder
|
|
"version": "v1.0.0",
|
|
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
|
|
// Stop gracefully shuts down the reverb service
|
|
func (rs *ReverbService) Stop() {
|
|
log.Info().Msg("Stopping BACKBEAT reverb service")
|
|
rs.cancel()
|
|
close(rs.done)
|
|
}
|
|
|
|
func main() {
|
|
// Command line flags
|
|
clusterID := flag.String("cluster", "chorus-aus-01", "Cluster identifier")
|
|
natsURL := flag.String("nats", "nats://backbeat-nats:4222", "NATS server URL")
|
|
nodeID := flag.String("node", "", "Node identifier (auto-generated if empty)")
|
|
barLength := flag.Int("bar-length", 120, "Bar length in beats")
|
|
logLevel := flag.String("log-level", "info", "Log level (debug, info, warn, error)")
|
|
flag.Parse()
|
|
|
|
// Configure structured logging
|
|
switch *logLevel {
|
|
case "debug":
|
|
zerolog.SetGlobalLevel(zerolog.DebugLevel)
|
|
case "info":
|
|
zerolog.SetGlobalLevel(zerolog.InfoLevel)
|
|
case "warn":
|
|
zerolog.SetGlobalLevel(zerolog.WarnLevel)
|
|
case "error":
|
|
zerolog.SetGlobalLevel(zerolog.ErrorLevel)
|
|
default:
|
|
zerolog.SetGlobalLevel(zerolog.InfoLevel)
|
|
}
|
|
|
|
// Pretty logging in development
|
|
if os.Getenv("BACKBEAT_ENV") != "production" {
|
|
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
|
|
}
|
|
|
|
// Generate node ID if not provided
|
|
if *nodeID == "" {
|
|
*nodeID = fmt.Sprintf("reverb-%d", time.Now().Unix())
|
|
}
|
|
|
|
log.Info().
|
|
Str("cluster_id", *clusterID).
|
|
Str("node_id", *nodeID).
|
|
Str("nats_url", *natsURL).
|
|
Int("bar_length", *barLength).
|
|
Msg("Starting BACKBEAT reverb service")
|
|
|
|
// Connect to NATS
|
|
nc, err := nats.Connect(*natsURL,
|
|
nats.Timeout(10*time.Second),
|
|
nats.ReconnectWait(2*time.Second),
|
|
nats.MaxReconnects(-1),
|
|
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
|
log.Error().Err(err).Msg("NATS disconnected")
|
|
}),
|
|
nats.ReconnectHandler(func(nc *nats.Conn) {
|
|
log.Info().Str("server", nc.ConnectedUrl()).Msg("NATS reconnected")
|
|
}),
|
|
)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to connect to NATS")
|
|
}
|
|
defer nc.Drain()
|
|
|
|
// Create and start reverb service
|
|
service := NewReverbService(*clusterID, *nodeID, nc, *barLength)
|
|
|
|
if err := service.Start(); err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to start reverb service")
|
|
}
|
|
|
|
// Handle graceful shutdown
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
log.Info().Msg("BACKBEAT reverb service is running. Press Ctrl+C to exit.")
|
|
|
|
// Wait for shutdown signal
|
|
<-sigChan
|
|
log.Info().Msg("Shutdown signal received")
|
|
|
|
// Graceful shutdown
|
|
service.Stop()
|
|
|
|
// Wait for background tasks to complete
|
|
select {
|
|
case <-service.done:
|
|
log.Info().Msg("BACKBEAT reverb service stopped gracefully")
|
|
case <-time.After(30 * time.Second):
|
|
log.Warn().Msg("Shutdown timeout exceeded")
|
|
}
|
|
}
|