618 lines
16 KiB
Go
618 lines
16 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
bb "github.com/chorus-services/backbeat/internal/backbeat"
|
|
)
|
|
|
|
// PulseService implements the complete BACKBEAT pulse service
|
|
// with leader election, HLC timing, degradation mode, and admin API
|
|
type PulseService struct {
|
|
mu sync.RWMutex
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
logger zerolog.Logger
|
|
|
|
// Core components
|
|
state *bb.PulseState
|
|
elector *bb.LeaderElector
|
|
hlc *bb.HLC
|
|
degradation *bb.DegradationManager
|
|
metrics *bb.Metrics
|
|
adminServer *bb.AdminServer
|
|
|
|
// NATS connectivity
|
|
nc *nats.Conn
|
|
beatPublisher *nats.Conn
|
|
controlSub *nats.Subscription
|
|
|
|
// Timing control
|
|
ticker *time.Ticker
|
|
lastBeatTime time.Time
|
|
startTime time.Time
|
|
|
|
// Configuration
|
|
config PulseConfig
|
|
}
|
|
|
|
// PulseConfig is the complete set of settings for a PulseService; main fills
// it from command-line flags via parseFlags.
type PulseConfig struct {
	// Identity and tempo. MinBPM and MaxBPM bound the tempo that may be
	// requested through the control subject.
	ClusterID, NodeID          string
	InitialTempoBPM, BarLength int
	Phases                     []string
	MinBPM, MaxBPM             int

	// Network endpoints: NATS server URL, admin HTTP port, Raft bind address.
	NATSUrl      string
	AdminPort    int
	RaftBindAddr string

	// Raft cluster membership: whether to bootstrap, and the peer addresses.
	Bootstrap bool
	RaftPeers []string

	// DataDir is handed to the leader elector.
	DataDir string
}
|
|
|
|
// ctrlMsg is the JSON payload of a legacy control message received on the
// "backbeat.<cluster>.control" subject, kept for backward compatibility.
// handleControlMessage acts on Cmd, BPM and DurationBeats; To, Beats, Easing
// and Phases are decoded but not read by it.
type ctrlMsg struct {
	Cmd    string `json:"cmd"`
	BPM    int    `json:"bpm,omitempty"`
	To     int    `json:"to,omitempty"`
	Beats  int    `json:"beats,omitempty"`
	Easing string `json:"easing,omitempty"`

	// Phases maps phase names to integers; its meaning is not visible here.
	Phases map[string]int `json:"phases,omitempty"`

	// DurationBeats is the length of a "freeze"; handleControlMessage uses the
	// bar length instead when it is zero or negative.
	DurationBeats int `json:"duration_beats,omitempty"`
}
|
|
|
|
// main wires up and runs the pulse service. It parses flags, configures
// logging, creates and starts the service, blocks until SIGINT or SIGTERM, and
// then shuts the service down.
func main() {
	config := parseFlags()
	logger := setupLogging()

	service, err := NewPulseService(config, logger)
	if err != nil {
		logger.Fatal().Err(err).Msg("failed to create pulse service")
	}

	// ctx is cancelled by SIGINT/SIGTERM. Because Start receives it:
	//   - a signal aborts Start while it is still waiting for a leader;
	//   - the pulse loop and drift monitor (started on ctx) stop before
	//     Shutdown runs.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	shutdown := func() {
		if err := service.Shutdown(); err != nil {
			logger.Error().Err(err).Msg("error during shutdown")
		}
	}

	if err := service.Start(ctx); err != nil {
		if ctx.Err() != nil {
			// Interrupted during startup: this is a requested shutdown, not a
			// startup failure.
			logger.Info().Msg("shutdown signal received")
			shutdown()
			logger.Info().Msg("BACKBEAT pulse service shutdown complete")
			return
		}
		// Release NATS and Raft resources before exiting; Fatal skips defers.
		shutdown()
		logger.Fatal().Err(err).Msg("failed to start pulse service")
	}

	logger.Info().Msg("BACKBEAT pulse service started successfully")

	<-ctx.Done()
	// Restore default signal handling so a second signal terminates the
	// process immediately if the graceful shutdown hangs.
	stop()
	logger.Info().Msg("shutdown signal received")

	shutdown()
	logger.Info().Msg("BACKBEAT pulse service shutdown complete")
}
|
|
|
|
// parseFlags parses the command line into a PulseConfig.
//
// The list-valued flags -phases and -peers are comma-separated. Whitespace
// around each entry is trimmed and empty entries are dropped, so "plan, work"
// and "a:1,,b:2" behave as expected. If -phases yields no usable names, the
// default phase list is used. An empty -node-id is replaced with "pulse-"
// followed by the first 8 characters of a random UUID.
func parseFlags() PulseConfig {
	const defaultPhases = "plan,work,review"

	config := PulseConfig{}
	var phasesStr, peersStr string

	flag.StringVar(&config.ClusterID, "cluster", "chorus-aus-01", "cluster identifier")
	flag.StringVar(&config.NodeID, "node-id", "", "node identifier (auto-generated if empty)")
	// REQ: BACKBEAT-REQ-002 - Default tempo should be reasonable for distributed systems
	// 1 BPM = 60-second beats, good for low-intensity or recovery windows
	// 12 BPM = 5-second beats, reasonable for production
	flag.IntVar(&config.InitialTempoBPM, "bpm", 1, "initial tempo in BPM (1=60s beats, 12=5s beats)")
	flag.IntVar(&config.BarLength, "bar", 8, "beats per bar")
	flag.StringVar(&phasesStr, "phases", defaultPhases, "comma-separated phase names")
	flag.IntVar(&config.MinBPM, "min-bpm", 1, "minimum allowed BPM")
	flag.IntVar(&config.MaxBPM, "max-bpm", 24, "maximum allowed BPM")
	flag.StringVar(&config.NATSUrl, "nats", "nats://backbeat-nats:4222", "NATS server URL")
	flag.IntVar(&config.AdminPort, "admin-port", 8080, "admin API port")
	flag.StringVar(&config.RaftBindAddr, "raft-bind", "127.0.0.1:0", "Raft bind address")
	flag.BoolVar(&config.Bootstrap, "bootstrap", false, "bootstrap new cluster")
	flag.StringVar(&peersStr, "peers", "", "comma-separated Raft peer addresses")
	flag.StringVar(&config.DataDir, "data-dir", "", "data directory (auto-generated if empty)")

	flag.Parse()

	// Debug: Log all command line arguments
	log.Info().Strs("args", os.Args).Msg("command line arguments received")
	log.Info().Str("parsed_nats_url", config.NATSUrl).Msg("parsed NATS URL from flags")

	// splitList splits a comma-separated flag value, trimming each entry and
	// discarding empty ones. It returns nil when nothing remains.
	splitList := func(value string) []string {
		var out []string
		for _, part := range strings.Split(value, ",") {
			if part = strings.TrimSpace(part); part != "" {
				out = append(out, part)
			}
		}
		return out
	}

	config.Phases = splitList(phasesStr)
	if len(config.Phases) == 0 {
		log.Warn().Str("phases", phasesStr).Msg("no usable phase names given; using defaults")
		config.Phases = splitList(defaultPhases)
	}
	// nil when -peers is empty.
	config.RaftPeers = splitList(peersStr)

	// Generate node ID if not provided
	if config.NodeID == "" {
		config.NodeID = "pulse-" + uuid.New().String()[:8]
	}

	return config
}
|
|
|
|
// setupLogging sets zerolog's global timestamp format to RFC 3339. It returns
// a child of the global logger that tags every event with the service name and
// version.
func setupLogging() zerolog.Logger {
	zerolog.TimeFieldFormat = time.RFC3339

	fields := log.With()
	fields = fields.Str("service", "backbeat-pulse")
	fields = fields.Str("version", "2.0.0")
	return fields.Logger()
}
|
|
|
|
// NewPulseService validates config and builds a PulseService. The initial
// pulse state has beat index 1, is not leader, and uses InitialTempoBPM as both
// the current and pending tempo. It then creates the components in
// initializeComponents. Connecting to NATS and running the pulse loop happen in
// Start.
//
// An error is returned for a configuration that cannot drive the pulse loop:
//   - a tempo outside 1..60000 BPM (the beat interval is 60000/BPM
//     milliseconds, so 0 divides by zero and larger values give a zero
//     interval that time.NewTicker rejects);
//   - a non-positive bar length;
//   - no phases (the current phase is selected modulo len(Phases)).
func NewPulseService(config PulseConfig, logger zerolog.Logger) (*PulseService, error) {
	switch {
	case config.InitialTempoBPM <= 0 || config.InitialTempoBPM > 60000:
		return nil, fmt.Errorf("invalid initial tempo %d BPM: must be between 1 and 60000", config.InitialTempoBPM)
	case config.BarLength <= 0:
		return nil, fmt.Errorf("invalid bar length %d: must be positive", config.BarLength)
	case len(config.Phases) == 0:
		return nil, fmt.Errorf("invalid phases: at least one phase is required")
	}

	ctx, cancel := context.WithCancel(context.Background())
	now := time.Now()

	service := &PulseService{
		ctx:       ctx,
		cancel:    cancel,
		logger:    logger,
		config:    config,
		startTime: now,
	}

	service.state = &bb.PulseState{
		ClusterID:    config.ClusterID,
		NodeID:       config.NodeID,
		IsLeader:     false,
		BeatIndex:    1,
		TempoBPM:     config.InitialTempoBPM,
		PendingBPM:   config.InitialTempoBPM,
		BarLength:    config.BarLength,
		Phases:       config.Phases,
		CurrentPhase: 0,
		LastDownbeat: now,
		StartTime:    now,
		FrozenBeats:  0,
	}

	if err := service.initializeComponents(); err != nil {
		cancel()
		return nil, fmt.Errorf("failed to initialize components: %w", err)
	}

	return service, nil
}
|
|
|
|
// initializeComponents sets up all service components
|
|
func (s *PulseService) initializeComponents() error {
|
|
var err error
|
|
|
|
// Initialize metrics
|
|
s.metrics = bb.NewMetrics()
|
|
|
|
// Initialize HLC
|
|
s.hlc = bb.NewHLC(s.config.NodeID)
|
|
|
|
// Initialize degradation manager
|
|
degradationConfig := bb.DegradationConfig{
|
|
Logger: s.logger,
|
|
Metrics: s.metrics,
|
|
}
|
|
s.degradation = bb.NewDegradationManager(degradationConfig)
|
|
|
|
// Initialize leader elector
|
|
leaderConfig := bb.LeaderElectorConfig{
|
|
NodeID: s.config.NodeID,
|
|
BindAddr: s.config.RaftBindAddr,
|
|
DataDir: s.config.DataDir,
|
|
Logger: s.logger,
|
|
Bootstrap: s.config.Bootstrap,
|
|
Peers: s.config.RaftPeers,
|
|
OnBecomeLeader: s.onBecomeLeader,
|
|
OnLoseLeader: s.onLoseLeader,
|
|
}
|
|
|
|
s.elector, err = bb.NewLeaderElector(leaderConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create leader elector: %v", err)
|
|
}
|
|
|
|
// Initialize admin server
|
|
adminConfig := bb.AdminConfig{
|
|
PulseState: s.state,
|
|
Metrics: s.metrics,
|
|
Elector: s.elector,
|
|
HLC: s.hlc,
|
|
Logger: s.logger,
|
|
Degradation: s.degradation,
|
|
}
|
|
s.adminServer = bb.NewAdminServer(adminConfig)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Start connects to NATS (subscribing to the control subject), launches the
// admin HTTP server, and blocks in elector.WaitForLeader(ctx). It then starts
// drift monitoring and the pulse loop as goroutines on ctx. runPulseLoop
// returns when ctx is cancelled; MonitorDrift presumably does too (it is given
// the same ctx — confirm in bb).
//
// Errors from the NATS and leadership steps are wrapped with %w.
func (s *PulseService) Start(ctx context.Context) error {
	s.logger.Info().
		Str("cluster_id", s.config.ClusterID).
		Str("node_id", s.config.NodeID).
		Int("initial_bpm", s.config.InitialTempoBPM).
		Int("bar_length", s.config.BarLength).
		Strs("phases", s.config.Phases).
		Msg("starting BACKBEAT pulse service")

	if err := s.connectNATS(); err != nil {
		return fmt.Errorf("NATS connection failed: %w", err)
	}

	go s.startAdminServer()

	if err := s.elector.WaitForLeader(ctx); err != nil {
		return fmt.Errorf("failed to establish leadership: %w", err)
	}

	go s.degradation.MonitorDrift(ctx)
	go s.runPulseLoop(ctx)

	return nil
}
|
|
|
|
// connectNATS connects to s.config.NATSUrl and subscribes handleControlMessage
// to "backbeat.<cluster>.control".
//
// The initial connection is retried up to 10 times with a linearly growing
// pause (2s, 4s, ..., 18s) to tolerate NATS starting after this service. Once
// connected, the client reconnects indefinitely, so a NATS outage of any length
// does not leave the pulse permanently unable to publish.
func (s *PulseService) connectNATS() error {
	opts := []nats.Option{
		nats.Timeout(10 * time.Second),
		nats.ReconnectWait(2 * time.Second),
		// A negative limit means never stop reconnecting. With a small finite
		// limit, a short outage closes the connection for good and every later
		// beat publish fails until the process restarts.
		nats.MaxReconnects(-1),
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			s.logger.Warn().Err(err).Msg("NATS disconnected")
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			s.logger.Info().Msg("NATS reconnected")
		}),
		nats.ClosedHandler(func(nc *nats.Conn) {
			s.logger.Info().Msg("NATS connection closed")
		}),
	}

	// Retry the initial connection with a linearly increasing backoff; NATS may
	// come up after this service (e.g. under Docker Swarm).
	const maxRetries = 10
	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		s.logger.Info().Int("attempt", attempt).Str("url", s.config.NATSUrl).Msg("attempting NATS connection")

		s.nc, err = nats.Connect(s.config.NATSUrl, opts...)
		if err == nil {
			s.logger.Info().Str("url", s.config.NATSUrl).Msg("successfully connected to NATS")
			break
		}

		if attempt == maxRetries {
			return fmt.Errorf("failed to connect to NATS after %d attempts: %w", maxRetries, err)
		}

		backoff := time.Duration(attempt) * 2 * time.Second
		s.logger.Warn().Err(err).Int("attempt", attempt).Dur("backoff", backoff).Msg("NATS connection failed, retrying")
		time.Sleep(backoff)
	}

	// Setup control message subscription for backward compatibility
	controlSubject := fmt.Sprintf("backbeat.%s.control", s.config.ClusterID)
	s.controlSub, err = s.nc.Subscribe(controlSubject, s.handleControlMessage)
	if err != nil {
		return fmt.Errorf("failed to subscribe to control messages: %w", err)
	}

	s.logger.Info().
		Str("nats_url", s.config.NATSUrl).
		Str("control_subject", controlSubject).
		Msg("connected to NATS")

	return nil
}
|
|
|
|
// startAdminServer serves s.adminServer over HTTP on all interfaces at
// s.config.AdminPort. It blocks until the listener fails and logs any error
// other than http.ErrServerClosed; Start runs it in its own goroutine.
//
// NOTE(review): the *http.Server is local to this function, so Shutdown has
// no way to stop it gracefully.
func (s *PulseService) startAdminServer() {
	addr := fmt.Sprintf(":%d", s.config.AdminPort)

	server := &http.Server{
		Addr:    addr,
		Handler: s.adminServer,
		// Limit how long a client may take to send request headers, and how
		// long idle keep-alive connections linger, so slow or abandoned clients
		// (Slowloris) cannot pin connections and goroutines indefinitely.
		ReadHeaderTimeout: 10 * time.Second,
		IdleTimeout:       120 * time.Second,
	}

	s.logger.Info().
		Str("address", addr).
		Msg("starting admin API server")

	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		s.logger.Error().Err(err).Msg("admin server error")
	}
}
|
|
|
|
// runPulseLoop drives processBeat from a ticker until ctx is cancelled. The
// ticker starts at the current tempo's interval (60000/TempoBPM milliseconds);
// processBeat resets it when a tempo change takes effect.
func (s *PulseService) runPulseLoop(ctx context.Context) {
	// Create and store the ticker under s.mu. Shutdown reads s.ticker from
	// another goroutine, and processBeat resets it while holding the lock.
	s.mu.Lock()
	ticker := time.NewTicker(time.Duration(60000/s.state.TempoBPM) * time.Millisecond)
	s.ticker = ticker
	s.lastBeatTime = time.Now()
	s.mu.Unlock()
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			s.processBeat(now)
		}
	}
}
|
|
|
|
// processBeat handles one tick of the pulse loop.
//
// Every tick is recorded in s.lastBeatTime, but only the current leader
// publishes (BACKBEAT-REQ-001). On each tick the leader:
//  1. applies any pending tempo change on a downbeat that is not frozen,
//     resetting the ticker (BACKBEAT-REQ-004);
//  2. publishes a BeatFrame on "backbeat.<cluster>.beat";
//  3. records publish and jitter metrics and feeds the degradation manager;
//  4. advances the beat index, and on downbeats the phase.
//
// If marshalling or publishing fails, the beat index is not advanced, so the
// same beat index is attempted on the next tick.
//
// The whole method runs with s.mu held, so control messages cannot change
// tempo or freeze state part-way through a beat.
func (s *PulseService) processBeat(now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// beatDuration is the tick interval for a tempo, computed as in
	// runPulseLoop (60000/BPM whole milliseconds). It returns 0 for tempos
	// that do not yield a positive interval (BPM <= 0 or BPM > 60000).
	beatDuration := func(bpm int) time.Duration {
		if bpm <= 0 {
			return 0
		}
		return time.Duration(60000/bpm) * time.Millisecond
	}

	// Record this tick before any early return. The next jitter measurement is
	// then relative to the previous tick even when nothing was published
	// (follower ticks, marshal or publish failures).
	prevTick := s.lastBeatTime
	s.lastBeatTime = now

	// Only leader publishes beats (BACKBEAT-REQ-001)
	if !s.elector.IsLeader() {
		return
	}

	// The interval that just elapsed ran at the tempo in effect before any
	// change applied below. Measuring jitter against it keeps a tempo change
	// from registering as a jitter spike.
	elapsed := beatDuration(s.state.TempoBPM)

	// Check for downbeat and apply pending changes (BACKBEAT-REQ-004)
	isDownbeat := bb.IsDownbeat(s.state.BeatIndex, s.state.BarLength)
	if isDownbeat && s.state.FrozenBeats == 0 {
		if pending := s.state.PendingBPM; pending != s.state.TempoBPM {
			if interval := beatDuration(pending); interval <= 0 {
				// A non-positive interval would make Ticker.Reset panic and stop
				// the pulse; drop the request and keep the current tempo.
				s.logger.Warn().
					Int("pending_bpm", pending).
					Int64("beat_index", s.state.BeatIndex).
					Msg("discarding invalid pending tempo")
				s.state.PendingBPM = s.state.TempoBPM
			} else {
				s.logger.Info().
					Int("old_bpm", s.state.TempoBPM).
					Int("new_bpm", pending).
					Int64("beat_index", s.state.BeatIndex).
					Msg("applying tempo change at downbeat")

				s.state.TempoBPM = pending
				s.ticker.Reset(interval)
				s.metrics.UpdateTempoMetrics(s.state.TempoBPM)
			}
		}

		// NOTE(review): LastDownbeat is only refreshed on unfrozen downbeats;
		// confirm that admin consumers expect that.
		s.state.LastDownbeat = now
	}

	// FrozenBeats is a count of beats (the "freeze" control command stores
	// duration_beats in it), so it counts down on every beat the leader
	// processes, not just on downbeats.
	if s.state.FrozenBeats > 0 {
		s.state.FrozenBeats--
	}

	currentPhase := s.state.Phases[s.state.CurrentPhase%len(s.state.Phases)]

	// Generate window ID for downbeats (BACKBEAT-REQ-005)
	var windowID string
	if isDownbeat {
		downbeatIndex := bb.GetDownbeatIndex(s.state.BeatIndex, s.state.BarLength)
		windowID = bb.GenerateWindowID(s.state.ClusterID, downbeatIndex)
	}

	// Create BeatFrame per INT-A specification (BACKBEAT-REQ-002)
	beatFrame := bb.BeatFrame{
		Type:       "backbeat.beatframe.v1",
		ClusterID:  s.state.ClusterID,
		BeatIndex:  s.state.BeatIndex,
		Downbeat:   isDownbeat,
		Phase:      currentPhase,
		HLC:        s.hlc.Next(),
		DeadlineAt: now.Add(beatDuration(s.state.TempoBPM)),
		TempoBPM:   s.state.TempoBPM,
		WindowID:   windowID,
	}

	subject := fmt.Sprintf("backbeat.%s.beat", s.state.ClusterID)
	payload, err := json.Marshal(beatFrame)
	if err != nil {
		s.logger.Error().Err(err).Msg("failed to marshal beat frame")
		return
	}

	start := time.Now()
	if err := s.nc.Publish(subject, payload); err != nil {
		s.logger.Error().Err(err).Str("subject", subject).Msg("failed to publish beat")
		s.metrics.RecordNATSError("publish_error")
		return
	}
	publishDuration := time.Since(start)

	expectedTime := prevTick.Add(elapsed)
	jitter := now.Sub(expectedTime).Abs()

	s.metrics.RecordBeatPublish(publishDuration, len(payload), isDownbeat, currentPhase)
	s.metrics.RecordPulseJitter(jitter)
	s.metrics.RecordBeatTiming(expectedTime, now)

	s.degradation.UpdateBeatTiming(expectedTime, now, s.state.BeatIndex)

	// Advance beat index and, at the end of a bar, the phase.
	publishedIndex := s.state.BeatIndex
	s.state.BeatIndex++
	if isDownbeat {
		s.state.CurrentPhase = (s.state.CurrentPhase + 1) % len(s.state.Phases)
	}

	s.logger.Debug().
		Int64("beat_index", publishedIndex).
		Bool("downbeat", isDownbeat).
		Str("phase", currentPhase).
		Str("window_id", windowID).
		Dur("jitter", jitter).
		Msg("published beat frame")
}
|
|
|
|
// handleControlMessage handles a legacy control message (a JSON-encoded
// ctrlMsg) received on the control subject:
//
//   - "set_bpm": checks BPM against [MinBPM, MaxBPM] (always rejecting
//     BPM <= 0) and bb.ValidateTempoChange, then stores it as PendingBPM;
//     processBeat applies it on a later unfrozen downbeat.
//   - "freeze": stores DurationBeats (BarLength when <= 0) in FrozenBeats;
//     processBeat defers pending tempo changes while it is non-zero.
//   - "unfreeze": clears FrozenBeats.
//
// When the message has a reply subject, a JSON object with "ok" (plus "error"
// on failure) is published to it. This includes payloads that are not valid
// JSON, so requesters get an answer instead of timing out.
func (s *PulseService) handleControlMessage(msg *nats.Msg) {
	// reply publishes response to msg.Reply when the sender asked for one.
	reply := func(response map[string]any) {
		if msg.Reply == "" {
			return
		}
		// A map holding only strings and bools always marshals successfully.
		payload, _ := json.Marshal(response)
		if err := s.nc.Publish(msg.Reply, payload); err != nil {
			s.logger.Warn().Err(err).Str("reply_subject", msg.Reply).Msg("failed to send control response")
		}
	}

	var ctrl ctrlMsg
	if err := json.Unmarshal(msg.Data, &ctrl); err != nil {
		s.logger.Warn().Err(err).Msg("invalid control message")
		reply(map[string]any{
			"ok":    false,
			"error": "invalid control message: " + err.Error(),
		})
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	response := map[string]any{
		"ok":                true,
		"apply_at_downbeat": true,
		"policy_hash":       "v2",
	}

	switch ctrl.Cmd {
	case "set_bpm":
		// Non-positive tempos are rejected regardless of -min-bpm. processBeat
		// divides by the tempo, so e.g. an omitted "bpm" field (decoded as 0)
		// under -min-bpm 0 would otherwise crash the pulse loop.
		if ctrl.BPM <= 0 || ctrl.BPM < s.config.MinBPM || ctrl.BPM > s.config.MaxBPM {
			response["ok"] = false
			response["error"] = fmt.Sprintf("BPM %d out of range [%d, %d]", ctrl.BPM, s.config.MinBPM, s.config.MaxBPM)
			break
		}

		if err := bb.ValidateTempoChange(s.state.TempoBPM, ctrl.BPM); err != nil {
			response["ok"] = false
			response["error"] = err.Error()
			s.metrics.RecordTempoChangeError()
			break
		}

		s.state.PendingBPM = ctrl.BPM
		s.logger.Info().
			Int("requested_bpm", ctrl.BPM).
			Str("command", "set_bpm").
			Msg("tempo change requested via control message")

	case "freeze":
		duration := ctrl.DurationBeats
		if duration <= 0 {
			duration = s.state.BarLength
		}
		s.state.FrozenBeats = duration
		s.logger.Info().
			Int("duration_beats", duration).
			Msg("freeze requested via control message")

	case "unfreeze":
		s.state.FrozenBeats = 0
		s.logger.Info().Msg("unfreeze requested via control message")

	default:
		response["ok"] = false
		response["error"] = "unknown command: " + ctrl.Cmd
	}

	reply(response)
}
|
|
|
|
// onBecomeLeader is called when this node becomes the leader
|
|
func (s *PulseService) onBecomeLeader() {
|
|
s.mu.Lock()
|
|
s.state.IsLeader = true
|
|
s.mu.Unlock()
|
|
|
|
s.logger.Info().Msg("became pulse leader - starting beat generation")
|
|
s.metrics.RecordLeadershipChange(true)
|
|
s.metrics.UpdateLeadershipMetrics(true, 1) // TODO: get actual cluster size
|
|
|
|
// Exit degradation mode if active
|
|
if s.degradation.IsInDegradationMode() {
|
|
s.degradation.OnLeaderRecovered(s.state.TempoBPM, s.state.BeatIndex, s.hlc.Next())
|
|
}
|
|
}
|
|
|
|
// onLoseLeader is called when this node loses leadership
|
|
func (s *PulseService) onLoseLeader() {
|
|
s.mu.Lock()
|
|
s.state.IsLeader = false
|
|
s.mu.Unlock()
|
|
|
|
s.logger.Warn().Msg("lost pulse leadership - entering degradation mode")
|
|
s.metrics.RecordLeadershipChange(false)
|
|
s.metrics.UpdateLeadershipMetrics(false, 1) // TODO: get actual cluster size
|
|
|
|
// Enter degradation mode
|
|
s.degradation.OnLeaderLost(s.state.TempoBPM, s.state.BeatIndex)
|
|
}
|
|
|
|
// Shutdown stops the service in this order:
//  1. cancels the service context;
//  2. stops the beat ticker;
//  3. drains the NATS connection, waiting briefly for the drain to complete;
//  4. shuts down the leader elector.
//
// Only an elector error is returned; NATS drain problems are logged.
//
// NOTE(review): the admin HTTP server is not stopped here. The goroutines
// launched by Start run on the context passed to Start, not on s.ctx, so
// callers should cancel that context as well.
func (s *PulseService) Shutdown() error {
	s.logger.Info().Msg("shutting down pulse service")

	s.cancel()

	// s.ticker is created by the pulse-loop goroutine; read it under s.mu
	// rather than racing with that assignment.
	s.mu.Lock()
	if s.ticker != nil {
		s.ticker.Stop()
	}
	s.mu.Unlock()

	if s.nc != nil {
		if err := s.nc.Drain(); err != nil {
			s.logger.Warn().Err(err).Msg("error draining NATS connection")
			s.nc.Close()
		} else {
			// Drain only initiates draining and returns immediately. Wait (with
			// a bound) for the connection to close so buffered publishes, such
			// as control replies, are flushed before the process exits.
			deadline := time.Now().Add(2 * time.Second)
			for !s.nc.IsClosed() && time.Now().Before(deadline) {
				time.Sleep(20 * time.Millisecond)
			}
		}
	}

	if s.elector != nil {
		if err := s.elector.Shutdown(); err != nil {
			s.logger.Error().Err(err).Msg("error shutting down leader elector")
			return err
		}
	}

	return nil
}
|