package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"

	bb "github.com/chorus-services/backbeat/internal/backbeat"
)

const (
	// maxSupportedBPM is the highest tempo for which beatDuration still yields
	// a non-zero interval (60000/bpm whole milliseconds).
	maxSupportedBPM = 60000

	// adminReadHeaderTimeout bounds how long the admin server waits for
	// request headers.
	adminReadHeaderTimeout = 10 * time.Second

	// adminShutdownTimeout bounds the graceful shutdown of the admin server.
	adminShutdownTimeout = 5 * time.Second
)

// PulseService implements the complete BACKBEAT pulse service
// with leader election, HLC timing, degradation mode, and admin API.
type PulseService struct {
	// mu guards state and ticker.
	mu     sync.RWMutex
	ctx    context.Context
	cancel context.CancelFunc
	logger zerolog.Logger

	// Core components
	state       *bb.PulseState
	elector     *bb.LeaderElector
	hlc         *bb.HLC
	degradation *bb.DegradationManager
	metrics     *bb.Metrics
	adminServer *bb.AdminServer
	httpServer  *http.Server

	// NATS connectivity
	nc            *nats.Conn
	beatPublisher *nats.Conn
	controlSub    *nats.Subscription

	// Timing control
	ticker *time.Ticker
	// lastBeatTime is only touched by the pulse-loop goroutine.
	lastBeatTime time.Time
	startTime    time.Time

	// Configuration
	config PulseConfig
}

// PulseConfig holds all configuration for the pulse service.
type PulseConfig struct {
	ClusterID       string
	NodeID          string
	InitialTempoBPM int
	BarLength       int
	Phases          []string
	MinBPM          int
	MaxBPM          int

	// Network
	NATSUrl      string
	AdminPort    int
	RaftBindAddr string

	// Cluster
	Bootstrap bool
	RaftPeers []string

	// Paths
	DataDir string
}

// ctrlMsg is the legacy control message kept for backward compatibility.
// Only Cmd, BPM and DurationBeats are consumed by handleControlMessage.
type ctrlMsg struct {
	Cmd           string         `json:"cmd"`
	BPM           int            `json:"bpm,omitempty"`
	To            int            `json:"to,omitempty"`
	Beats         int            `json:"beats,omitempty"`
	Easing        string         `json:"easing,omitempty"`
	Phases        map[string]int `json:"phases,omitempty"`
	DurationBeats int            `json:"duration_beats,omitempty"`
}

func main() {
	// Parse command line flags
	config := parseFlags()

	// Setup structured logging
	logger := setupLogging()

	// Create pulse service
	service, err := NewPulseService(config, logger)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to create pulse service")
	}

	// The context is cancelled on SIGINT/SIGTERM, so a signal that arrives
	// while Start is still blocked (NATS retries, leader wait) aborts startup
	// instead of being ignored until Start returns.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Start service
	if err := service.Start(ctx); err != nil {
		log.Fatal().Err(err).Msg("failed to start pulse service")
	}

	logger.Info().Msg("BACKBEAT pulse service started successfully")

	// Wait for shutdown signal
	<-ctx.Done()
	logger.Info().Msg("shutdown signal received")

	// Restore default signal handling so a second signal terminates at once.
	stop()

	// Graceful shutdown
	if err := service.Shutdown(); err != nil {
		logger.Error().Err(err).Msg("error during shutdown")
	}

	logger.Info().Msg("BACKBEAT pulse service shutdown complete")
}

// parseFlags parses command line arguments into a PulseConfig.
// A node ID is generated when none is supplied.
func parseFlags() PulseConfig {
	config := PulseConfig{}

	var phasesStr, peersStr string

	flag.StringVar(&config.ClusterID, "cluster", "chorus-aus-01", "cluster identifier")
	flag.StringVar(&config.NodeID, "node-id", "", "node identifier (auto-generated if empty)")
	// REQ: BACKBEAT-REQ-002 - Default tempo should be reasonable for distributed systems
	// 1 BPM = 60-second beats, good for low-intensity or recovery windows
	// 12 BPM = 5-second beats, reasonable for production
	flag.IntVar(&config.InitialTempoBPM, "bpm", 1, "initial tempo in BPM (1=60s beats, 12=5s beats)")
	flag.IntVar(&config.BarLength, "bar", 8, "beats per bar")
	flag.StringVar(&phasesStr, "phases", "plan,work,review", "comma-separated phase names")
	flag.IntVar(&config.MinBPM, "min-bpm", 1, "minimum allowed BPM")
	flag.IntVar(&config.MaxBPM, "max-bpm", 24, "maximum allowed BPM")
	flag.StringVar(&config.NATSUrl, "nats", "nats://backbeat-nats:4222", "NATS server URL")
	flag.IntVar(&config.AdminPort, "admin-port", 8080, "admin API port")
	flag.StringVar(&config.RaftBindAddr, "raft-bind", "127.0.0.1:0", "Raft bind address")
	flag.BoolVar(&config.Bootstrap, "bootstrap", false, "bootstrap new cluster")
	flag.StringVar(&peersStr, "peers", "", "comma-separated Raft peer addresses")
	flag.StringVar(&config.DataDir, "data-dir", "", "data directory (auto-generated if empty)")

	flag.Parse()

	// Debug: Log all command line arguments
	log.Info().Strs("args", os.Args).Msg("command line arguments received")
	log.Info().Str("parsed_nats_url", config.NATSUrl).Msg("parsed NATS URL from flags")

	// Process parsed values. Entries are trimmed and empty entries dropped so
	// that "plan, work" or a trailing comma does not create bogus names.
	config.Phases = splitCSV(phasesStr)
	config.RaftPeers = splitCSV(peersStr)

	// Generate node ID if not provided
	if config.NodeID == "" {
		config.NodeID = "pulse-" + uuid.New().String()[:8]
	}

	return config
}

// splitCSV splits a comma-separated list, trimming surrounding whitespace
// and discarding empty entries. It returns nil when nothing remains.
func splitCSV(s string) []string {
	var out []string
	for _, part := range strings.Split(s, ",") {
		if part = strings.TrimSpace(part); part != "" {
			out = append(out, part)
		}
	}
	return out
}

// beatDuration returns the interval between beats at the given tempo, using
// whole milliseconds. bpm must be in [1, maxSupportedBPM]; zero would divide
// by zero and larger values would truncate to a zero interval.
func beatDuration(bpm int) time.Duration {
	return time.Duration(60000/bpm) * time.Millisecond
}

// validateConfig rejects configurations that would make the pulse loop panic
// (division by zero, non-positive ticker interval, indexing an empty phase
// list).
func validateConfig(c PulseConfig) error {
	switch {
	case c.InitialTempoBPM < 1 || c.InitialTempoBPM > maxSupportedBPM:
		return fmt.Errorf("initial tempo %d BPM out of supported range [1, %d]", c.InitialTempoBPM, maxSupportedBPM)
	case c.BarLength < 1:
		return fmt.Errorf("bar length must be at least 1, got %d", c.BarLength)
	case len(c.Phases) == 0:
		return fmt.Errorf("at least one phase is required")
	}
	return nil
}

// setupLogging configures structured logging and returns a logger tagged
// with the service name and version.
func setupLogging() zerolog.Logger {
	// Configure zerolog
	zerolog.TimeFieldFormat = time.RFC3339

	logger := log.With().
		Str("service", "backbeat-pulse").
		Str("version", "2.0.0").
		Logger()

	return logger
}

// NewPulseService creates a new pulse service instance. It returns an error
// when the configuration is unusable or a component cannot be initialized.
func NewPulseService(config PulseConfig, logger zerolog.Logger) (*PulseService, error) {
	if err := validateConfig(config); err != nil {
		return nil, fmt.Errorf("invalid configuration: %w", err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	service := &PulseService{
		ctx:       ctx,
		cancel:    cancel,
		logger:    logger,
		config:    config,
		startTime: time.Now(),
	}

	// Initialize pulse state
	service.state = &bb.PulseState{
		ClusterID:    config.ClusterID,
		NodeID:       config.NodeID,
		IsLeader:     false,
		BeatIndex:    1,
		TempoBPM:     config.InitialTempoBPM,
		PendingBPM:   config.InitialTempoBPM,
		BarLength:    config.BarLength,
		Phases:       config.Phases,
		CurrentPhase: 0,
		LastDownbeat: time.Now(),
		StartTime:    time.Now(),
		FrozenBeats:  0,
	}

	// Initialize components
	if err := service.initializeComponents(); err != nil {
		cancel()
		return nil, fmt.Errorf("failed to initialize components: %w", err)
	}

	return service, nil
}

// initializeComponents sets up all service components: metrics, HLC,
// degradation manager, leader elector, admin handler and its HTTP server.
func (s *PulseService) initializeComponents() error {
	var err error

	// Initialize metrics
	s.metrics = bb.NewMetrics()

	// Initialize HLC
	s.hlc = bb.NewHLC(s.config.NodeID)

	// Initialize degradation manager
	degradationConfig := bb.DegradationConfig{
		Logger:  s.logger,
		Metrics: s.metrics,
	}
	s.degradation = bb.NewDegradationManager(degradationConfig)

	// Initialize leader elector
	leaderConfig := bb.LeaderElectorConfig{
		NodeID:         s.config.NodeID,
		BindAddr:       s.config.RaftBindAddr,
		DataDir:        s.config.DataDir,
		Logger:         s.logger,
		Bootstrap:      s.config.Bootstrap,
		Peers:          s.config.RaftPeers,
		OnBecomeLeader: s.onBecomeLeader,
		OnLoseLeader:   s.onLoseLeader,
	}

	s.elector, err = bb.NewLeaderElector(leaderConfig)
	if err != nil {
		return fmt.Errorf("failed to create leader elector: %w", err)
	}

	// Initialize admin server
	adminConfig := bb.AdminConfig{
		PulseState:  s.state,
		Metrics:     s.metrics,
		Elector:     s.elector,
		HLC:         s.hlc,
		Logger:      s.logger,
		Degradation: s.degradation,
	}
	s.adminServer = bb.NewAdminServer(adminConfig)

	// The HTTP server is built here, before any goroutine starts, so that
	// Shutdown can reach it without racing startAdminServer.
	s.httpServer = &http.Server{
		Addr:              fmt.Sprintf(":%d", s.config.AdminPort),
		Handler:           s.adminServer,
		ReadHeaderTimeout: adminReadHeaderTimeout,
	}

	return nil
}

// Start begins the pulse service operation: it connects to NATS, launches
// the admin server, waits for a leader, then starts drift monitoring and the
// pulse loop. Background work stops when ctx is cancelled or Shutdown is
// called.
func (s *PulseService) Start(ctx context.Context) error {
	s.logger.Info().
		Str("cluster_id", s.config.ClusterID).
		Str("node_id", s.config.NodeID).
		Int("initial_bpm", s.config.InitialTempoBPM).
		Int("bar_length", s.config.BarLength).
		Strs("phases", s.config.Phases).
		Msg("starting BACKBEAT pulse service")

	// Re-derive the service context from the caller's context. Previously the
	// goroutines ran on ctx while Shutdown cancelled an unrelated context, so
	// Shutdown never actually stopped them.
	s.cancel()
	s.ctx, s.cancel = context.WithCancel(ctx)

	// Connect to NATS
	if err := s.connectNATS(); err != nil {
		return fmt.Errorf("NATS connection failed: %w", err)
	}

	// Start admin HTTP server
	go s.startAdminServer()

	// Wait for leadership to be established
	if err := s.elector.WaitForLeader(s.ctx); err != nil {
		return fmt.Errorf("failed to establish leadership: %w", err)
	}

	// Start drift monitoring
	go s.degradation.MonitorDrift(s.ctx)

	// Start pulse loop
	go s.runPulseLoop(s.ctx)

	return nil
}

// connectNATS establishes the NATS connection and subscribes to the legacy
// control subject. Connection attempts are retried with a linearly growing
// backoff; the wait is abandoned when the service context is cancelled.
func (s *PulseService) connectNATS() error {
	var err error

	// Connect to NATS with retry logic for Docker Swarm startup.
	// NOTE(review): MaxReconnects(5) with a 2s wait means the client gives up
	// for good after a short outage - confirm this is intended.
	opts := []nats.Option{
		nats.Timeout(10 * time.Second),
		nats.ReconnectWait(2 * time.Second),
		nats.MaxReconnects(5),
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			s.logger.Warn().Err(err).Msg("NATS disconnected")
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			s.logger.Info().Msg("NATS reconnected")
		}),
	}

	// Retry connection up to 10 times with linear backoff (2s, 4s, 6s, ...)
	maxRetries := 10
	for attempt := 1; attempt <= maxRetries; attempt++ {
		s.logger.Info().Int("attempt", attempt).Str("url", s.config.NATSUrl).Msg("attempting NATS connection")

		s.nc, err = nats.Connect(s.config.NATSUrl, opts...)
		if err == nil {
			s.logger.Info().Str("url", s.config.NATSUrl).Msg("successfully connected to NATS")
			break
		}

		if attempt == maxRetries {
			return fmt.Errorf("failed to connect to NATS after %d attempts: %w", maxRetries, err)
		}

		backoff := time.Duration(attempt) * 2 * time.Second
		s.logger.Warn().Err(err).Int("attempt", attempt).Dur("backoff", backoff).Msg("NATS connection failed, retrying")

		// Sleep for the backoff, but wake immediately on cancellation.
		timer := time.NewTimer(backoff)
		select {
		case <-s.ctx.Done():
			timer.Stop()
			return fmt.Errorf("NATS connection aborted: %w", s.ctx.Err())
		case <-timer.C:
		}
	}

	// Setup control message subscription for backward compatibility
	controlSubject := fmt.Sprintf("backbeat.%s.control", s.config.ClusterID)
	s.controlSub, err = s.nc.Subscribe(controlSubject, s.handleControlMessage)
	if err != nil {
		return fmt.Errorf("failed to subscribe to control messages: %w", err)
	}

	s.logger.Info().
		Str("nats_url", s.config.NATSUrl).
		Str("control_subject", controlSubject).
		Msg("connected to NATS")

	return nil
}

// startAdminServer serves the admin API and blocks until the server stops.
// A stop caused by Shutdown (http.ErrServerClosed) is not logged as an error.
func (s *PulseService) startAdminServer() {
	s.logger.Info().
		Str("address", s.httpServer.Addr).
		Msg("starting admin API server")

	if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		s.logger.Error().Err(err).Msg("admin server error")
	}
}

// runPulseLoop runs the main pulse generation loop until ctx is cancelled,
// calling processBeat on every tick.
func (s *PulseService) runPulseLoop(ctx context.Context) {
	// The ticker is published under the mutex because Shutdown reads it from
	// another goroutine.
	s.mu.Lock()
	ticker := time.NewTicker(beatDuration(s.state.TempoBPM))
	s.ticker = ticker
	s.mu.Unlock()
	defer ticker.Stop()

	s.lastBeatTime = time.Now()

	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			s.processBeat(now)
		}
	}
}

// processBeat handles a single tick. On the leader it applies pending tempo
// changes on downbeats, builds and publishes a BeatFrame, records timing
// metrics and advances the beat index and phase. On a follower it only
// refreshes the tick timestamp.
func (s *PulseService) processBeat(now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// The expected arrival time is derived from the tempo that governed the
	// interval that just elapsed, so it is computed before any tempo change
	// is applied. lastBeatTime is refreshed on every tick (including follower
	// ticks and failed publishes) so the next measurement is not skewed by a
	// stale baseline.
	expectedTime := s.lastBeatTime.Add(beatDuration(s.state.TempoBPM))
	s.lastBeatTime = now

	// Only leader publishes beats (BACKBEAT-REQ-001)
	if !s.elector.IsLeader() {
		return
	}

	// Check for downbeat and apply pending changes (BACKBEAT-REQ-004)
	isDownbeat := bb.IsDownbeat(s.state.BeatIndex, s.state.BarLength)

	if isDownbeat && s.state.FrozenBeats == 0 {
		// Apply pending tempo changes on downbeat
		if pending := s.state.PendingBPM; pending != s.state.TempoBPM {
			if pending < 1 || pending > maxSupportedBPM {
				// An unusable tempo would panic in beatDuration or
				// Ticker.Reset; discard it and keep the current tempo.
				s.logger.Warn().
					Int("pending_bpm", pending).
					Int("current_bpm", s.state.TempoBPM).
					Msg("discarding invalid pending tempo")
				s.state.PendingBPM = s.state.TempoBPM
			} else {
				s.logger.Info().
					Int("old_bpm", s.state.TempoBPM).
					Int("new_bpm", pending).
					Int64("beat_index", s.state.BeatIndex).
					Msg("applying tempo change at downbeat")

				s.state.TempoBPM = pending

				// Update ticker with new tempo
				s.ticker.Reset(beatDuration(s.state.TempoBPM))

				// Update metrics
				s.metrics.UpdateTempoMetrics(s.state.TempoBPM)
			}
		}

		s.state.LastDownbeat = now
	}

	// Handle frozen beats.
	// NOTE(review): the counter is decremented once per downbeat, not once
	// per beat, so a freeze of N "beats" spans N bars - confirm intent.
	if s.state.FrozenBeats > 0 && isDownbeat {
		s.state.FrozenBeats--
	}

	// Calculate current phase
	currentPhase := s.state.Phases[s.state.CurrentPhase%len(s.state.Phases)]

	// Generate window ID for downbeats (BACKBEAT-REQ-005)
	var windowID string
	if isDownbeat {
		downbeatIndex := bb.GetDownbeatIndex(s.state.BeatIndex, s.state.BarLength)
		windowID = bb.GenerateWindowID(s.state.ClusterID, downbeatIndex)
	}

	// Create BeatFrame per INT-A specification (BACKBEAT-REQ-002)
	beatIndex := s.state.BeatIndex
	beatFrame := bb.BeatFrame{
		Type:       "backbeat.beatframe.v1",
		ClusterID:  s.state.ClusterID,
		BeatIndex:  beatIndex,
		Downbeat:   isDownbeat,
		Phase:      currentPhase,
		HLC:        s.hlc.Next(),
		DeadlineAt: now.Add(beatDuration(s.state.TempoBPM)),
		TempoBPM:   s.state.TempoBPM,
		WindowID:   windowID,
	}

	// Publish beat frame
	subject := fmt.Sprintf("backbeat.%s.beat", s.state.ClusterID)
	payload, err := json.Marshal(beatFrame)
	if err != nil {
		s.logger.Error().Err(err).Msg("failed to marshal beat frame")
		return
	}

	start := time.Now()
	if err := s.nc.Publish(subject, payload); err != nil {
		s.logger.Error().Err(err).Str("subject", subject).Msg("failed to publish beat")
		s.metrics.RecordNATSError("publish_error")
		return
	}
	publishDuration := time.Since(start)

	// Record timing metrics
	jitter := now.Sub(expectedTime).Abs()

	s.metrics.RecordBeatPublish(publishDuration, len(payload), isDownbeat, currentPhase)
	s.metrics.RecordPulseJitter(jitter)
	s.metrics.RecordBeatTiming(expectedTime, now)

	// Update degradation manager with timing info
	s.degradation.UpdateBeatTiming(expectedTime, now, beatIndex)

	// Advance beat index and phase
	s.state.BeatIndex++
	if isDownbeat {
		// Move to next bar, cycle through phases
		s.state.CurrentPhase = (s.state.CurrentPhase + 1) % len(s.state.Phases)
	}

	s.logger.Debug().
		Int64("beat_index", beatIndex).
		Bool("downbeat", isDownbeat).
		Str("phase", currentPhase).
		Str("window_id", windowID).
		Dur("jitter", jitter).
		Msg("published beat frame")
}

// handleControlMessage handles legacy control messages for backward
// compatibility. Supported commands are "set_bpm", "freeze" and "unfreeze";
// when the message carries a reply subject a JSON status is sent back.
func (s *PulseService) handleControlMessage(msg *nats.Msg) {
	var ctrl ctrlMsg
	if err := json.Unmarshal(msg.Data, &ctrl); err != nil {
		s.logger.Warn().Err(err).Msg("invalid control message")
		return
	}

	s.mu.Lock()
	cmdErr := s.applyControlCommand(ctrl)
	s.mu.Unlock()

	// Nothing to send when the caller did not ask for a reply.
	if msg.Reply == "" {
		return
	}

	response := map[string]any{
		"ok":                cmdErr == nil,
		"apply_at_downbeat": true,
		"policy_hash":       "v2",
	}
	if cmdErr != nil {
		response["error"] = cmdErr.Error()
	}

	responseBytes, err := json.Marshal(response)
	if err != nil {
		s.logger.Error().Err(err).Msg("failed to marshal control response")
		return
	}
	if err := s.nc.Publish(msg.Reply, responseBytes); err != nil {
		s.logger.Error().Err(err).Str("subject", msg.Reply).Msg("failed to publish control response")
	}
}

// applyControlCommand applies one legacy control command to the pulse state
// and returns an error describing why the command was rejected, if it was.
// The caller must hold s.mu.
func (s *PulseService) applyControlCommand(ctrl ctrlMsg) error {
	switch ctrl.Cmd {
	case "set_bpm":
		// Hard limits first: values outside them would make beatDuration
		// divide by zero or return a zero interval, regardless of the
		// configured range.
		if ctrl.BPM < 1 || ctrl.BPM > maxSupportedBPM {
			return fmt.Errorf("BPM %d out of supported range [1, %d]", ctrl.BPM, maxSupportedBPM)
		}
		if ctrl.BPM < s.config.MinBPM || ctrl.BPM > s.config.MaxBPM {
			return fmt.Errorf("BPM %d out of range [%d, %d]", ctrl.BPM, s.config.MinBPM, s.config.MaxBPM)
		}

		// Validate tempo change
		if err := bb.ValidateTempoChange(s.state.TempoBPM, ctrl.BPM); err != nil {
			s.metrics.RecordTempoChangeError()
			return err
		}

		s.state.PendingBPM = ctrl.BPM
		s.logger.Info().
			Int("requested_bpm", ctrl.BPM).
			Str("command", "set_bpm").
			Msg("tempo change requested via control message")

	case "freeze":
		// A missing or non-positive duration defaults to one bar length.
		duration := ctrl.DurationBeats
		if duration <= 0 {
			duration = s.state.BarLength
		}
		s.state.FrozenBeats = duration
		s.logger.Info().
			Int("duration_beats", duration).
			Msg("freeze requested via control message")

	case "unfreeze":
		s.state.FrozenBeats = 0
		s.logger.Info().Msg("unfreeze requested via control message")

	default:
		return fmt.Errorf("unknown command: %s", ctrl.Cmd)
	}

	return nil
}

// onBecomeLeader is called when this node becomes the leader. It marks the
// state as leader, records leadership metrics and leaves degradation mode if
// it was active.
func (s *PulseService) onBecomeLeader() {
	// Snapshot tempo and beat index under the lock; processBeat mutates them.
	s.mu.Lock()
	s.state.IsLeader = true
	tempo, beatIndex := s.state.TempoBPM, s.state.BeatIndex
	s.mu.Unlock()

	s.logger.Info().Msg("became pulse leader - starting beat generation")
	s.metrics.RecordLeadershipChange(true)
	s.metrics.UpdateLeadershipMetrics(true, 1) // TODO: get actual cluster size

	// Exit degradation mode if active
	if s.degradation.IsInDegradationMode() {
		s.degradation.OnLeaderRecovered(tempo, beatIndex, s.hlc.Next())
	}
}

// onLoseLeader is called when this node loses leadership. It clears the
// leader flag, records leadership metrics and enters degradation mode.
func (s *PulseService) onLoseLeader() {
	// Snapshot tempo and beat index under the lock; processBeat mutates them.
	s.mu.Lock()
	s.state.IsLeader = false
	tempo, beatIndex := s.state.TempoBPM, s.state.BeatIndex
	s.mu.Unlock()

	s.logger.Warn().Msg("lost pulse leadership - entering degradation mode")
	s.metrics.RecordLeadershipChange(false)
	s.metrics.UpdateLeadershipMetrics(false, 1) // TODO: get actual cluster size

	// Enter degradation mode
	s.degradation.OnLeaderLost(tempo, beatIndex)
}

// Shutdown gracefully shuts down the pulse service: it cancels the service
// context, stops the ticker, shuts down the admin server, drains NATS and
// shuts down the leader elector. Only a leader-elector failure is returned;
// admin-server and NATS errors are logged.
func (s *PulseService) Shutdown() error {
	s.logger.Info().Msg("shutting down pulse service")

	// Cancel context; this stops the pulse loop and drift monitor.
	s.cancel()

	// Stop ticker (guarded: the pulse loop assigns it concurrently).
	s.mu.Lock()
	if s.ticker != nil {
		s.ticker.Stop()
	}
	s.mu.Unlock()

	// Stop the admin HTTP server, giving in-flight requests a bounded grace
	// period.
	if s.httpServer != nil {
		ctx, cancel := context.WithTimeout(context.Background(), adminShutdownTimeout)
		if err := s.httpServer.Shutdown(ctx); err != nil {
			s.logger.Warn().Err(err).Msg("error shutting down admin server")
		}
		cancel()
	}

	// Close NATS connection
	if s.nc != nil {
		if err := s.nc.Drain(); err != nil {
			s.logger.Warn().Err(err).Msg("error draining NATS connection")
		}
	}

	// Shutdown leader elector
	if s.elector != nil {
		if err := s.elector.Shutdown(); err != nil {
			s.logger.Error().Err(err).Msg("error shutting down leader elector")
			return err
		}
	}

	return nil
}