Implement initial scan logic and council formation for WHOOSH project kickoffs
- Replace incremental sync with full scan for new repositories
- Add initial_scan status to bypass Since parameter filtering
- Implement council formation detection for Design Brief issues
- Add version display to WHOOSH UI header for debugging
- Fix Docker token authentication with trailing newline removal
- Add comprehensive council orchestration with Docker Swarm integration
- Include BACKBEAT prototype integration for distributed timing
- Support council-specific agent roles and deployment strategies
- Transition repositories to active status after content discovery

Key architectural improvements:
- Full scan approach for new project detection vs incremental sync
- Council formation triggered by chorus-entrypoint labeled Design Briefs
- Proper token handling and authentication for Gitea API calls
- Support for both initial discovery and ongoing task monitoring

This enables autonomous project kickoff workflows where Design Brief issues
automatically trigger formation of specialized agent councils for new projects.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
100
BACKBEAT-prototype/cmd/agent-sim/main.go
Normal file
100
BACKBEAT-prototype/cmd/agent-sim/main.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
bb "github.com/chorus-services/backbeat/internal/backbeat"
|
||||
"github.com/nats-io/nats.go"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// scoreFile is the on-disk YAML wrapper around a BACKBEAT score:
// the file's top-level "score" key maps onto the embedded bb.Score.
type scoreFile struct {
	Score bb.Score `yaml:"score"`
}
|
||||
|
||||
func main() {
|
||||
cluster := flag.String("cluster", "chorus-aus-01", "cluster id")
|
||||
agentID := flag.String("id", "bzzz-1", "agent id")
|
||||
scorePath := flag.String("score", "./configs/sample-score.yaml", "score yaml path")
|
||||
natsURL := flag.String("nats", nats.DefaultURL, "nats url")
|
||||
flag.Parse()
|
||||
|
||||
buf, err := os.ReadFile(*scorePath)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
var s scoreFile
|
||||
if err := yaml.Unmarshal(buf, &s); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
score := s.Score
|
||||
|
||||
nc, err := nats.Connect(*natsURL)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer nc.Drain()
|
||||
|
||||
hlc := bb.NewHLC(*agentID)
|
||||
state := "planning"
|
||||
waiting := 0
|
||||
beatsLeft := 0
|
||||
|
||||
nc.Subscribe(fmt.Sprintf("backbeat.%s.beat", *cluster), func(m *nats.Msg) {
|
||||
var bf bb.BeatFrame
|
||||
if err := json.Unmarshal(m.Data, &bf); err != nil {
|
||||
return
|
||||
}
|
||||
phase, _ := bb.PhaseFor(score.Phases, int(bf.BeatIndex))
|
||||
switch phase {
|
||||
case "plan":
|
||||
state = "planning"
|
||||
beatsLeft = 0
|
||||
case "work":
|
||||
if waiting == 0 && rand.Float64() < 0.3 {
|
||||
waiting = 1
|
||||
}
|
||||
if waiting > 0 {
|
||||
state = "waiting"
|
||||
beatsLeft = score.WaitBudget.Help - waiting
|
||||
waiting++
|
||||
if waiting > score.WaitBudget.Help {
|
||||
state = "executing"
|
||||
waiting = 0
|
||||
}
|
||||
} else {
|
||||
state = "executing"
|
||||
beatsLeft = 0
|
||||
}
|
||||
case "review":
|
||||
state = "review"
|
||||
waiting = 0
|
||||
beatsLeft = 0
|
||||
}
|
||||
|
||||
sc := bb.StatusClaim{
|
||||
AgentID: *agentID,
|
||||
TaskID: "ucxl://demo/task",
|
||||
BeatIndex: bf.BeatIndex,
|
||||
State: state,
|
||||
WaitFor: nil,
|
||||
BeatsLeft: beatsLeft,
|
||||
Progress: rand.Float64(),
|
||||
Notes: "proto",
|
||||
HLC: hlc.Next(),
|
||||
}
|
||||
payload, _ := json.Marshal(sc)
|
||||
nc.Publish("backbeat.status."+*agentID, payload)
|
||||
})
|
||||
|
||||
log.Printf("AgentSim %s started (cluster=%s)\n", *agentID, *cluster)
|
||||
for {
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
}
|
||||
617
BACKBEAT-prototype/cmd/pulse/main.go
Normal file
617
BACKBEAT-prototype/cmd/pulse/main.go
Normal file
@@ -0,0 +1,617 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
bb "github.com/chorus-services/backbeat/internal/backbeat"
|
||||
)
|
||||
|
||||
// PulseService implements the complete BACKBEAT pulse service
// with leader election, HLC timing, degradation mode, and admin API
type PulseService struct {
	mu     sync.RWMutex // guards state mutation in processBeat and callbacks
	ctx    context.Context
	cancel context.CancelFunc
	logger zerolog.Logger

	// Core components
	state       *bb.PulseState         // shared beat/tempo/phase state
	elector     *bb.LeaderElector      // Raft-based leader election
	hlc         *bb.HLC                // hybrid logical clock stamped onto beat frames
	degradation *bb.DegradationManager // local-timing fallback when leadership is lost
	metrics     *bb.Metrics
	adminServer *bb.AdminServer

	// NATS connectivity
	nc *nats.Conn
	// NOTE(review): beatPublisher is never assigned anywhere in this
	// file — confirm whether it is dead and can be removed.
	beatPublisher *nats.Conn
	controlSub    *nats.Subscription // legacy control-subject subscription

	// Timing control
	ticker       *time.Ticker // beat tick source; reset on tempo change
	lastBeatTime time.Time    // when the previous beat fired (jitter baseline)
	startTime    time.Time

	// Configuration
	config PulseConfig
}
|
||||
|
||||
// PulseConfig holds all configuration for the pulse service
type PulseConfig struct {
	ClusterID       string   // cluster this pulse serves
	NodeID          string   // unique node identity (auto-generated if empty)
	InitialTempoBPM int      // starting tempo in beats per minute
	BarLength       int      // beats per bar (window size)
	Phases          []string // phase names cycled one per bar
	MinBPM          int      // lower bound accepted by set_bpm control messages
	MaxBPM          int      // upper bound accepted by set_bpm control messages

	// Network
	NATSUrl      string // NATS server URL
	AdminPort    int    // admin HTTP API listen port
	RaftBindAddr string // address the Raft transport binds to

	// Cluster
	Bootstrap bool     // true to bootstrap a brand-new cluster
	RaftPeers []string // existing peer addresses to join

	// Paths
	DataDir string // Raft data directory (auto-generated if empty)
}
|
||||
|
||||
// Legacy control message for backward compatibility.
// ctrlMsg is the JSON payload accepted on backbeat.<cluster>.control;
// only the fields relevant to the given Cmd are used.
// NOTE(review): To, Beats, Easing, and Phases are not acted on by
// handleControlMessage — confirm they are reserved or dead.
type ctrlMsg struct {
	Cmd           string         `json:"cmd"` // "set_bpm", "freeze", or "unfreeze"
	BPM           int            `json:"bpm,omitempty"` // target tempo for set_bpm
	To            int            `json:"to,omitempty"`
	Beats         int            `json:"beats,omitempty"`
	Easing        string         `json:"easing,omitempty"`
	Phases        map[string]int `json:"phases,omitempty"`
	DurationBeats int            `json:"duration_beats,omitempty"` // freeze length in beats
}
|
||||
|
||||
func main() {
|
||||
// Parse command line flags
|
||||
config := parseFlags()
|
||||
|
||||
// Setup structured logging
|
||||
logger := setupLogging()
|
||||
|
||||
// Create and start pulse service
|
||||
service, err := NewPulseService(config, logger)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("failed to create pulse service")
|
||||
}
|
||||
|
||||
// Handle graceful shutdown
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// Start service
|
||||
if err := service.Start(ctx); err != nil {
|
||||
log.Fatal().Err(err).Msg("failed to start pulse service")
|
||||
}
|
||||
|
||||
logger.Info().Msg("BACKBEAT pulse service started successfully")
|
||||
|
||||
// Wait for shutdown signal
|
||||
<-sigCh
|
||||
logger.Info().Msg("shutdown signal received")
|
||||
|
||||
// Graceful shutdown
|
||||
if err := service.Shutdown(); err != nil {
|
||||
logger.Error().Err(err).Msg("error during shutdown")
|
||||
}
|
||||
|
||||
logger.Info().Msg("BACKBEAT pulse service shutdown complete")
|
||||
}
|
||||
|
||||
// parseFlags parses command line arguments
// and returns the assembled PulseConfig. List-valued flags are captured
// as raw comma-separated strings and split after flag.Parse.
func parseFlags() PulseConfig {
	config := PulseConfig{}

	var phasesStr, peersStr string

	flag.StringVar(&config.ClusterID, "cluster", "chorus-aus-01", "cluster identifier")
	flag.StringVar(&config.NodeID, "node-id", "", "node identifier (auto-generated if empty)")
	// REQ: BACKBEAT-REQ-002 - Default tempo should be reasonable for distributed systems
	// 2 BPM = 30-second beats, good for development and testing
	// 12 BPM = 5-second beats, reasonable for production
	flag.IntVar(&config.InitialTempoBPM, "bpm", 2, "initial tempo in BPM (2=30s beats, 12=5s beats)")
	flag.IntVar(&config.BarLength, "bar", 8, "beats per bar")
	flag.StringVar(&phasesStr, "phases", "plan,work,review", "comma-separated phase names")
	// NOTE(review): the default bpm (2) is below min-bpm (4); only later
	// set_bpm control requests are range-checked — confirm intentional.
	flag.IntVar(&config.MinBPM, "min-bpm", 4, "minimum allowed BPM")
	flag.IntVar(&config.MaxBPM, "max-bpm", 24, "maximum allowed BPM")
	flag.StringVar(&config.NATSUrl, "nats", "nats://backbeat-nats:4222", "NATS server URL")
	flag.IntVar(&config.AdminPort, "admin-port", 8080, "admin API port")
	flag.StringVar(&config.RaftBindAddr, "raft-bind", "127.0.0.1:0", "Raft bind address")
	flag.BoolVar(&config.Bootstrap, "bootstrap", false, "bootstrap new cluster")
	flag.StringVar(&peersStr, "peers", "", "comma-separated Raft peer addresses")
	flag.StringVar(&config.DataDir, "data-dir", "", "data directory (auto-generated if empty)")

	flag.Parse()

	// Debug: Log all command line arguments
	log.Info().Strs("args", os.Args).Msg("command line arguments received")
	log.Info().Str("parsed_nats_url", config.NATSUrl).Msg("parsed NATS URL from flags")

	// Process parsed values
	config.Phases = strings.Split(phasesStr, ",")
	if peersStr != "" {
		config.RaftPeers = strings.Split(peersStr, ",")
	}

	// Generate node ID if not provided
	if config.NodeID == "" {
		config.NodeID = "pulse-" + uuid.New().String()[:8]
	}

	return config
}
|
||||
|
||||
// setupLogging configures structured logging
|
||||
func setupLogging() zerolog.Logger {
|
||||
// Configure zerolog
|
||||
zerolog.TimeFieldFormat = time.RFC3339
|
||||
logger := log.With().
|
||||
Str("service", "backbeat-pulse").
|
||||
Str("version", "2.0.0").
|
||||
Logger()
|
||||
|
||||
return logger
|
||||
}
|
||||
|
||||
// NewPulseService creates a new pulse service instance:
// it seeds the initial PulseState (beat 1, initial tempo, first phase)
// and wires all subcomponents via initializeComponents. The returned
// service is not yet running; call Start.
func NewPulseService(config PulseConfig, logger zerolog.Logger) (*PulseService, error) {
	ctx, cancel := context.WithCancel(context.Background())

	service := &PulseService{
		ctx:       ctx,
		cancel:    cancel,
		logger:    logger,
		config:    config,
		startTime: time.Now(),
	}

	// Initialize pulse state
	service.state = &bb.PulseState{
		ClusterID:    config.ClusterID,
		NodeID:       config.NodeID,
		IsLeader:     false, // leadership is earned via election
		BeatIndex:    1,     // beats are 1-indexed
		TempoBPM:     config.InitialTempoBPM,
		PendingBPM:   config.InitialTempoBPM, // equal means no change pending
		BarLength:    config.BarLength,
		Phases:       config.Phases,
		CurrentPhase: 0,
		LastDownbeat: time.Now(),
		StartTime:    time.Now(),
		FrozenBeats:  0,
	}

	// Initialize components
	if err := service.initializeComponents(); err != nil {
		// Release the context before surfacing the error.
		cancel()
		return nil, fmt.Errorf("failed to initialize components: %v", err)
	}

	return service, nil
}
|
||||
|
||||
// initializeComponents sets up all service components
// (metrics, HLC, degradation manager, Raft leader elector, admin API)
// in dependency order. Called once from NewPulseService.
func (s *PulseService) initializeComponents() error {
	var err error

	// Initialize metrics
	s.metrics = bb.NewMetrics()

	// Initialize HLC
	s.hlc = bb.NewHLC(s.config.NodeID)

	// Initialize degradation manager
	degradationConfig := bb.DegradationConfig{
		Logger:  s.logger,
		Metrics: s.metrics,
	}
	s.degradation = bb.NewDegradationManager(degradationConfig)

	// Initialize leader elector; its callbacks flip this node between
	// beat generation (leader) and degradation mode (follower).
	leaderConfig := bb.LeaderElectorConfig{
		NodeID:         s.config.NodeID,
		BindAddr:       s.config.RaftBindAddr,
		DataDir:        s.config.DataDir,
		Logger:         s.logger,
		Bootstrap:      s.config.Bootstrap,
		Peers:          s.config.RaftPeers,
		OnBecomeLeader: s.onBecomeLeader,
		OnLoseLeader:   s.onLoseLeader,
	}

	s.elector, err = bb.NewLeaderElector(leaderConfig)
	if err != nil {
		return fmt.Errorf("failed to create leader elector: %v", err)
	}

	// Initialize admin server
	adminConfig := bb.AdminConfig{
		PulseState:  s.state,
		Metrics:     s.metrics,
		Elector:     s.elector,
		HLC:         s.hlc,
		Logger:      s.logger,
		Degradation: s.degradation,
	}
	s.adminServer = bb.NewAdminServer(adminConfig)

	return nil
}
|
||||
|
||||
// Start begins the pulse service operation
// in order: NATS connectivity, admin HTTP API, leadership election,
// drift monitoring, and finally the beat-generation loop. ctx bounds
// the leadership wait and the background goroutines.
func (s *PulseService) Start(ctx context.Context) error {
	s.logger.Info().
		Str("cluster_id", s.config.ClusterID).
		Str("node_id", s.config.NodeID).
		Int("initial_bpm", s.config.InitialTempoBPM).
		Int("bar_length", s.config.BarLength).
		Strs("phases", s.config.Phases).
		Msg("starting BACKBEAT pulse service")

	// Connect to NATS
	if err := s.connectNATS(); err != nil {
		return fmt.Errorf("NATS connection failed: %v", err)
	}

	// Start admin HTTP server
	go s.startAdminServer()

	// Wait for leadership to be established
	if err := s.elector.WaitForLeader(ctx); err != nil {
		return fmt.Errorf("failed to establish leadership: %v", err)
	}

	// Start drift monitoring
	go s.degradation.MonitorDrift(ctx)

	// Start pulse loop
	go s.runPulseLoop(ctx)

	return nil
}
|
||||
|
||||
// connectNATS establishes NATS connection and sets up subscriptions,
// retrying with backoff because Docker Swarm may start this service
// before NATS is reachable. Also subscribes to the legacy control
// subject backbeat.<cluster>.control.
func (s *PulseService) connectNATS() error {
	var err error

	// Connect to NATS with retry logic for Docker Swarm startup
	opts := []nats.Option{
		nats.Timeout(10 * time.Second),
		nats.ReconnectWait(2 * time.Second),
		nats.MaxReconnects(5),
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			s.logger.Warn().Err(err).Msg("NATS disconnected")
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			s.logger.Info().Msg("NATS reconnected")
		}),
	}

	// Retry connection up to 10 times with backoff
	// (linear, not exponential: attempt * 2s — 2s, 4s, 6s, ...).
	maxRetries := 10
	for attempt := 1; attempt <= maxRetries; attempt++ {
		s.logger.Info().Int("attempt", attempt).Str("url", s.config.NATSUrl).Msg("attempting NATS connection")

		s.nc, err = nats.Connect(s.config.NATSUrl, opts...)
		if err == nil {
			s.logger.Info().Str("url", s.config.NATSUrl).Msg("successfully connected to NATS")
			break
		}

		if attempt == maxRetries {
			return fmt.Errorf("failed to connect to NATS after %d attempts: %v", maxRetries, err)
		}

		backoff := time.Duration(attempt) * 2 * time.Second
		s.logger.Warn().Err(err).Int("attempt", attempt).Dur("backoff", backoff).Msg("NATS connection failed, retrying")
		time.Sleep(backoff)
	}

	// Setup control message subscription for backward compatibility
	controlSubject := fmt.Sprintf("backbeat.%s.control", s.config.ClusterID)
	s.controlSub, err = s.nc.Subscribe(controlSubject, s.handleControlMessage)
	if err != nil {
		return fmt.Errorf("failed to subscribe to control messages: %v", err)
	}

	s.logger.Info().
		Str("nats_url", s.config.NATSUrl).
		Str("control_subject", controlSubject).
		Msg("connected to NATS")

	return nil
}
|
||||
|
||||
// startAdminServer starts the HTTP admin server
|
||||
func (s *PulseService) startAdminServer() {
|
||||
addr := fmt.Sprintf(":%d", s.config.AdminPort)
|
||||
|
||||
server := &http.Server{
|
||||
Addr: addr,
|
||||
Handler: s.adminServer,
|
||||
}
|
||||
|
||||
s.logger.Info().
|
||||
Str("address", addr).
|
||||
Msg("starting admin API server")
|
||||
|
||||
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
s.logger.Error().Err(err).Msg("admin server error")
|
||||
}
|
||||
}
|
||||
|
||||
// runPulseLoop runs the main pulse generation loop
|
||||
func (s *PulseService) runPulseLoop(ctx context.Context) {
|
||||
// Calculate initial beat duration
|
||||
beatDuration := time.Duration(60000/s.state.TempoBPM) * time.Millisecond
|
||||
s.ticker = time.NewTicker(beatDuration)
|
||||
defer s.ticker.Stop()
|
||||
|
||||
s.lastBeatTime = time.Now()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case now := <-s.ticker.C:
|
||||
s.processBeat(now)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processBeat handles a single beat event
// on the leader: applies pending tempo changes at downbeats, builds the
// INT-A BeatFrame, publishes it, records timing/jitter metrics, and
// advances the beat index and phase cursor.
func (s *PulseService) processBeat(now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Only leader publishes beats (BACKBEAT-REQ-001)
	if !s.elector.IsLeader() {
		return
	}

	// Check for downbeat and apply pending changes (BACKBEAT-REQ-004)
	isDownbeat := bb.IsDownbeat(s.state.BeatIndex, s.state.BarLength)

	if isDownbeat && s.state.FrozenBeats == 0 {
		// Apply pending tempo changes on downbeat
		if s.state.PendingBPM != s.state.TempoBPM {
			s.logger.Info().
				Int("old_bpm", s.state.TempoBPM).
				Int("new_bpm", s.state.PendingBPM).
				Int64("beat_index", s.state.BeatIndex).
				Msg("applying tempo change at downbeat")

			s.state.TempoBPM = s.state.PendingBPM

			// Update ticker with new tempo
			beatDuration := time.Duration(60000/s.state.TempoBPM) * time.Millisecond
			s.ticker.Reset(beatDuration)

			// Update metrics
			s.metrics.UpdateTempoMetrics(s.state.TempoBPM)
		}

		s.state.LastDownbeat = now
	}

	// Handle frozen beats: while frozen, tempo changes are deferred and
	// the counter is decremented once per downbeat until it hits zero.
	if s.state.FrozenBeats > 0 && isDownbeat {
		s.state.FrozenBeats--
	}

	// Calculate current phase
	currentPhase := s.state.Phases[s.state.CurrentPhase%len(s.state.Phases)]

	// Generate window ID for downbeats (BACKBEAT-REQ-005)
	var windowID string
	if isDownbeat {
		downbeatIndex := bb.GetDownbeatIndex(s.state.BeatIndex, s.state.BarLength)
		windowID = bb.GenerateWindowID(s.state.ClusterID, downbeatIndex)
	}

	// Create BeatFrame per INT-A specification (BACKBEAT-REQ-002)
	beatFrame := bb.BeatFrame{
		Type:       "backbeat.beatframe.v1",
		ClusterID:  s.state.ClusterID,
		BeatIndex:  s.state.BeatIndex,
		Downbeat:   isDownbeat,
		Phase:      currentPhase,
		HLC:        s.hlc.Next(),
		DeadlineAt: now.Add(time.Duration(60000/s.state.TempoBPM) * time.Millisecond),
		TempoBPM:   s.state.TempoBPM,
		WindowID:   windowID,
	}

	// Publish beat frame
	subject := fmt.Sprintf("backbeat.%s.beat", s.state.ClusterID)
	payload, err := json.Marshal(beatFrame)
	if err != nil {
		s.logger.Error().Err(err).Msg("failed to marshal beat frame")
		return
	}

	start := time.Now()
	if err := s.nc.Publish(subject, payload); err != nil {
		s.logger.Error().Err(err).Str("subject", subject).Msg("failed to publish beat")
		s.metrics.RecordNATSError("publish_error")
		return
	}
	publishDuration := time.Since(start)

	// Record timing metrics; jitter is the absolute deviation of this
	// tick from the instant expected at the current tempo.
	expectedTime := s.lastBeatTime.Add(time.Duration(60000/s.state.TempoBPM) * time.Millisecond)
	jitter := now.Sub(expectedTime).Abs()

	s.metrics.RecordBeatPublish(publishDuration, len(payload), isDownbeat, currentPhase)
	s.metrics.RecordPulseJitter(jitter)
	s.metrics.RecordBeatTiming(expectedTime, now)

	// Update degradation manager with timing info
	s.degradation.UpdateBeatTiming(expectedTime, now, s.state.BeatIndex)

	s.lastBeatTime = now

	// Advance beat index and phase
	s.state.BeatIndex++
	if isDownbeat {
		// Move to next bar, cycle through phases
		s.state.CurrentPhase = (s.state.CurrentPhase + 1) % len(s.state.Phases)
	}

	s.logger.Debug().
		Int64("beat_index", s.state.BeatIndex-1).
		Bool("downbeat", isDownbeat).
		Str("phase", currentPhase).
		Str("window_id", windowID).
		Dur("jitter", jitter).
		Msg("published beat frame")
}
|
||||
|
||||
// handleControlMessage handles legacy control messages for backward compatibility
// on backbeat.<cluster>.control. Supported commands: set_bpm (validated,
// staged, and applied at the next downbeat by processBeat), freeze, and
// unfreeze. When the message carries a reply subject, a JSON ack/nack
// is published back.
func (s *PulseService) handleControlMessage(msg *nats.Msg) {
	var ctrl ctrlMsg
	if err := json.Unmarshal(msg.Data, &ctrl); err != nil {
		s.logger.Warn().Err(err).Msg("invalid control message")
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Optimistic success response; overwritten below on any failure.
	response := map[string]interface{}{
		"ok":                true,
		"apply_at_downbeat": true,
		"policy_hash":       "v2",
	}

	switch ctrl.Cmd {
	case "set_bpm":
		// Range check against the configured tempo bounds.
		if ctrl.BPM < s.config.MinBPM || ctrl.BPM > s.config.MaxBPM {
			response["ok"] = false
			response["error"] = fmt.Sprintf("BPM %d out of range [%d, %d]", ctrl.BPM, s.config.MinBPM, s.config.MaxBPM)
			break
		}

		// Validate tempo change
		if err := bb.ValidateTempoChange(s.state.TempoBPM, ctrl.BPM); err != nil {
			response["ok"] = false
			response["error"] = err.Error()
			s.metrics.RecordTempoChangeError()
			break
		}

		// Stage the new tempo; processBeat applies it at the downbeat.
		s.state.PendingBPM = ctrl.BPM
		s.logger.Info().
			Int("requested_bpm", ctrl.BPM).
			Str("command", "set_bpm").
			Msg("tempo change requested via control message")

	case "freeze":
		// Default freeze length is one full bar.
		duration := ctrl.DurationBeats
		if duration <= 0 {
			duration = s.state.BarLength
		}
		s.state.FrozenBeats = duration
		s.logger.Info().
			Int("duration_beats", duration).
			Msg("freeze requested via control message")

	case "unfreeze":
		s.state.FrozenBeats = 0
		s.logger.Info().Msg("unfreeze requested via control message")

	default:
		response["ok"] = false
		response["error"] = "unknown command: " + ctrl.Cmd
	}

	// Send response
	if msg.Reply != "" {
		// Marshal of this map of simple values cannot realistically
		// fail; the error is intentionally ignored.
		responseBytes, _ := json.Marshal(response)
		s.nc.Publish(msg.Reply, responseBytes)
	}
}
|
||||
|
||||
// onBecomeLeader is called when this node becomes the leader
|
||||
func (s *PulseService) onBecomeLeader() {
|
||||
s.mu.Lock()
|
||||
s.state.IsLeader = true
|
||||
s.mu.Unlock()
|
||||
|
||||
s.logger.Info().Msg("became pulse leader - starting beat generation")
|
||||
s.metrics.RecordLeadershipChange(true)
|
||||
s.metrics.UpdateLeadershipMetrics(true, 1) // TODO: get actual cluster size
|
||||
|
||||
// Exit degradation mode if active
|
||||
if s.degradation.IsInDegradationMode() {
|
||||
s.degradation.OnLeaderRecovered(s.state.TempoBPM, s.state.BeatIndex, s.hlc.Next())
|
||||
}
|
||||
}
|
||||
|
||||
// onLoseLeader is called when this node loses leadership
|
||||
func (s *PulseService) onLoseLeader() {
|
||||
s.mu.Lock()
|
||||
s.state.IsLeader = false
|
||||
s.mu.Unlock()
|
||||
|
||||
s.logger.Warn().Msg("lost pulse leadership - entering degradation mode")
|
||||
s.metrics.RecordLeadershipChange(false)
|
||||
s.metrics.UpdateLeadershipMetrics(false, 1) // TODO: get actual cluster size
|
||||
|
||||
// Enter degradation mode
|
||||
s.degradation.OnLeaderLost(s.state.TempoBPM, s.state.BeatIndex)
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the pulse service
|
||||
func (s *PulseService) Shutdown() error {
|
||||
s.logger.Info().Msg("shutting down pulse service")
|
||||
|
||||
// Cancel context
|
||||
s.cancel()
|
||||
|
||||
// Stop ticker
|
||||
if s.ticker != nil {
|
||||
s.ticker.Stop()
|
||||
}
|
||||
|
||||
// Close NATS connection
|
||||
if s.nc != nil {
|
||||
s.nc.Drain()
|
||||
}
|
||||
|
||||
// Shutdown leader elector
|
||||
if s.elector != nil {
|
||||
if err := s.elector.Shutdown(); err != nil {
|
||||
s.logger.Error().Err(err).Msg("error shutting down leader elector")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
585
BACKBEAT-prototype/cmd/reverb/main.go
Normal file
585
BACKBEAT-prototype/cmd/reverb/main.go
Normal file
@@ -0,0 +1,585 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
bb "github.com/chorus-services/backbeat/internal/backbeat"
|
||||
)
|
||||
|
||||
// ReverbService implements BACKBEAT-REQ-020, BACKBEAT-REQ-021, BACKBEAT-REQ-022
// Aggregates StatusClaims from agents and produces BarReports for each window
type ReverbService struct {
	clusterID string
	nodeID    string
	natsConn  *nats.Conn
	metrics   *bb.Metrics

	// Window management
	windowsMu sync.RWMutex                     // guards windows
	windows   map[string]*bb.WindowAggregation // windowID -> aggregation
	windowTTL time.Duration                    // retention after window completion
	barLength int                              // beats per bar/window

	// Pulse synchronization
	// NOTE(review): these are written by the beat-frame callback without
	// any lock — confirm no other goroutine reads them concurrently.
	currentBeat     int64
	currentWindowID string

	// Configuration
	maxWindowsRetained int           // hard cap to prevent memory leaks
	cleanupInterval    time.Duration // sweep period for stale windows

	// Control channels
	ctx    context.Context
	cancel context.CancelFunc
	done   chan struct{}
}
|
||||
|
||||
// NewReverbService creates a new reverb aggregation service
|
||||
func NewReverbService(clusterID, nodeID string, natsConn *nats.Conn, barLength int) *ReverbService {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &ReverbService{
|
||||
clusterID: clusterID,
|
||||
nodeID: nodeID,
|
||||
natsConn: natsConn,
|
||||
metrics: bb.NewMetrics(),
|
||||
windows: make(map[string]*bb.WindowAggregation),
|
||||
windowTTL: 5 * time.Minute, // Keep windows for 5 minutes after completion
|
||||
barLength: barLength,
|
||||
maxWindowsRetained: 100, // Prevent memory leaks
|
||||
cleanupInterval: 30 * time.Second,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start initializes and starts the reverb service
// BACKBEAT-REQ-020: Subscribe to INT-B StatusClaims; group by window_id
// BACKBEAT-REQ-021: Emit INT-C BarReport at each downbeat with KPIs
// It subscribes to the pulse beat channel (downbeat timing) and the
// agent status channel (aggregation), then launches the cleanup and
// HTTP goroutines.
//
// NOTE(review): agent-sim publishes claims to backbeat.status.<agent-id>,
// which does not match the backbeat.<cluster>.status subject subscribed
// here — confirm which subject scheme is canonical.
func (rs *ReverbService) Start() error {
	log.Info().
		Str("cluster_id", rs.clusterID).
		Str("node_id", rs.nodeID).
		Int("bar_length", rs.barLength).
		Msg("Starting BACKBEAT reverb service")

	// BACKBEAT-REQ-020: Subscribe to StatusClaims on status channel
	beatSubject := fmt.Sprintf("backbeat.%s.beat", rs.clusterID)
	statusSubject := fmt.Sprintf("backbeat.%s.status", rs.clusterID)

	// Subscribe to pulse BeatFrames for downbeat timing
	_, err := rs.natsConn.Subscribe(beatSubject, rs.handleBeatFrame)
	if err != nil {
		return fmt.Errorf("failed to subscribe to beat channel: %w", err)
	}
	log.Info().Str("subject", beatSubject).Msg("Subscribed to pulse beat channel")

	// Subscribe to StatusClaims for aggregation
	_, err = rs.natsConn.Subscribe(statusSubject, rs.handleStatusClaim)
	if err != nil {
		return fmt.Errorf("failed to subscribe to status channel: %w", err)
	}
	log.Info().Str("subject", statusSubject).Msg("Subscribed to agent status channel")

	// Start background cleanup goroutine
	go rs.cleanupRoutine()

	// Start HTTP server for health and metrics
	go rs.startHTTPServer()

	log.Info().Msg("BACKBEAT reverb service started successfully")
	return nil
}
|
||||
|
||||
// handleBeatFrame processes incoming BeatFrames to detect downbeats
// BACKBEAT-REQ-021: Emit INT-C BarReport at each downbeat with KPIs
// A downbeat carrying a new window ID closes out the previous window:
// its aggregated claims are rolled up into a BarReport.
func (rs *ReverbService) handleBeatFrame(msg *nats.Msg) {
	var bf bb.BeatFrame
	if err := json.Unmarshal(msg.Data, &bf); err != nil {
		log.Error().Err(err).Msg("Failed to unmarshal BeatFrame")
		rs.metrics.RecordNATSError("unmarshal_error")
		return
	}

	// NOTE(review): currentBeat/currentWindowID are mutated here without
	// synchronization — confirm no other goroutine reads them concurrently.
	rs.currentBeat = bf.BeatIndex

	// Process downbeat - emit BarReport for previous window
	if bf.Downbeat && rs.currentWindowID != "" && rs.currentWindowID != bf.WindowID {
		rs.processDownbeat(rs.currentWindowID)
	}

	// Update current window
	rs.currentWindowID = bf.WindowID

	log.Debug().
		Int64("beat_index", bf.BeatIndex).
		Bool("downbeat", bf.Downbeat).
		Str("window_id", bf.WindowID).
		Msg("Processed beat frame")
}
|
||||
|
||||
// handleStatusClaim processes incoming StatusClaims for aggregation
// BACKBEAT-REQ-020: Subscribe to INT-B StatusClaims; group by window_id
// Claims that fail INT-B validation or cannot be mapped to a window are
// dropped with a warning.
func (rs *ReverbService) handleStatusClaim(msg *nats.Msg) {
	var sc bb.StatusClaim
	if err := json.Unmarshal(msg.Data, &sc); err != nil {
		log.Error().Err(err).Msg("Failed to unmarshal StatusClaim")
		rs.metrics.RecordNATSError("unmarshal_error")
		return
	}

	// Validate StatusClaim according to INT-B specification
	if err := bb.ValidateStatusClaim(&sc); err != nil {
		log.Warn().Err(err).
			Str("agent_id", sc.AgentID).
			Str("task_id", sc.TaskID).
			Msg("Invalid StatusClaim received")
		return
	}

	// Determine window ID for this claim
	windowID := rs.getWindowIDForBeat(sc.BeatIndex)
	if windowID == "" {
		log.Warn().
			Int64("beat_index", sc.BeatIndex).
			Msg("Could not determine window ID for StatusClaim")
		return
	}

	// Add claim to appropriate window aggregation
	rs.addClaimToWindow(windowID, &sc)

	rs.metrics.RecordReverbClaim()

	log.Debug().
		Str("agent_id", sc.AgentID).
		Str("task_id", sc.TaskID).
		Str("state", sc.State).
		Str("window_id", windowID).
		Msg("Processed status claim")
}
|
||||
|
||||
// addClaimToWindow adds a StatusClaim to the appropriate window aggregation,
// creating the aggregation on first use. Safe for concurrent callers:
// the whole operation runs under the windows write lock.
func (rs *ReverbService) addClaimToWindow(windowID string, claim *bb.StatusClaim) {
	rs.windowsMu.Lock()
	defer rs.windowsMu.Unlock()

	// Get or create window aggregation
	window, exists := rs.windows[windowID]
	if !exists {
		// Create new window - calculate beat range
		fromBeat := rs.getWindowStartBeat(claim.BeatIndex)
		toBeat := fromBeat + int64(rs.barLength) - 1

		window = bb.NewWindowAggregation(windowID, fromBeat, toBeat)
		rs.windows[windowID] = window

		log.Info().
			Str("window_id", windowID).
			Int64("from_beat", fromBeat).
			Int64("to_beat", toBeat).
			Msg("Created new window aggregation")
	}

	// Add claim to window
	window.AddClaim(claim)

	// Update metrics
	rs.metrics.UpdateReverbActiveWindows(len(rs.windows))
}
|
||||
|
||||
// processDownbeat processes a completed window and emits BarReport
// BACKBEAT-REQ-021: Emit INT-C BarReport at each downbeat with KPIs
// BACKBEAT-PER-002: Reverb rollup complete ≤ 1 beat after downbeat
func (rs *ReverbService) processDownbeat(windowID string) {
	start := time.Now()

	rs.windowsMu.RLock()
	window, exists := rs.windows[windowID]
	rs.windowsMu.RUnlock()

	if !exists {
		log.Warn().Str("window_id", windowID).Msg("No aggregation found for completed window")
		return
	}

	// NOTE(review): window is read below after the lock is released
	// while addClaimToWindow may still append to it concurrently —
	// confirm WindowAggregation tolerates this, or widen the lock.
	log.Info().
		Str("window_id", windowID).
		Int("claims_count", len(window.Claims)).
		Int("agents_reporting", len(window.UniqueAgents)).
		Msg("Processing completed window")

	// Generate BarReport from aggregated data
	barReport := window.GenerateBarReport(rs.clusterID)

	// Serialize BarReport
	reportData, err := json.Marshal(barReport)
	if err != nil {
		log.Error().Err(err).Str("window_id", windowID).Msg("Failed to marshal BarReport")
		return
	}

	// BACKBEAT-REQ-021: Emit INT-C BarReport
	reverbSubject := fmt.Sprintf("backbeat.%s.reverb", rs.clusterID)
	if err := rs.natsConn.Publish(reverbSubject, reportData); err != nil {
		log.Error().Err(err).
			Str("window_id", windowID).
			Str("subject", reverbSubject).
			Msg("Failed to publish BarReport")
		rs.metrics.RecordNATSError("publish_error")
		return
	}

	processingTime := time.Since(start)

	// Record metrics
	rs.metrics.RecordReverbWindow(
		processingTime,
		len(window.Claims),
		barReport.AgentsReporting,
		barReport.OnTimeReviews,
		barReport.TempoDriftMS,
		len(reportData),
	)

	log.Info().
		Str("window_id", windowID).
		Int("claims_processed", len(window.Claims)).
		Int("agents_reporting", barReport.AgentsReporting).
		Int("on_time_reviews", barReport.OnTimeReviews).
		Dur("processing_time", processingTime).
		Int("report_size_bytes", len(reportData)).
		Msg("Published BarReport")

	// BACKBEAT-REQ-022: Optionally persist BarReports via DHT (placeholder)
	// TODO: Implement DHT persistence when available
	log.Debug().
		Str("window_id", windowID).
		Msg("DHT persistence placeholder - not yet implemented")
}
|
||||
|
||||
// getWindowIDForBeat determines the window ID for a given beat index
|
||||
func (rs *ReverbService) getWindowIDForBeat(beatIndex int64) string {
|
||||
if beatIndex <= 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Find the downbeat for this window
|
||||
downbeatIndex := bb.GetDownbeatIndex(beatIndex, rs.barLength)
|
||||
|
||||
// Generate deterministic window ID per BACKBEAT-REQ-005
|
||||
return bb.GenerateWindowID(rs.clusterID, downbeatIndex)
|
||||
}
|
||||
|
||||
// getWindowStartBeat calculates the starting beat for a window containing the given beat
|
||||
func (rs *ReverbService) getWindowStartBeat(beatIndex int64) int64 {
|
||||
return bb.GetDownbeatIndex(beatIndex, rs.barLength)
|
||||
}
|
||||
|
||||
// cleanupRoutine periodically cleans up old window aggregations
|
||||
func (rs *ReverbService) cleanupRoutine() {
|
||||
ticker := time.NewTicker(rs.cleanupInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-rs.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
rs.cleanupOldWindows()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupOldWindows removes expired window aggregations to prevent memory leaks
|
||||
func (rs *ReverbService) cleanupOldWindows() {
|
||||
rs.windowsMu.Lock()
|
||||
defer rs.windowsMu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
removedCount := 0
|
||||
|
||||
for windowID, window := range rs.windows {
|
||||
if now.Sub(window.LastUpdated) > rs.windowTTL {
|
||||
delete(rs.windows, windowID)
|
||||
removedCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Also enforce maximum window retention
|
||||
if len(rs.windows) > rs.maxWindowsRetained {
|
||||
// Remove oldest windows beyond limit (simple approach)
|
||||
excess := len(rs.windows) - rs.maxWindowsRetained
|
||||
for windowID := range rs.windows {
|
||||
if excess <= 0 {
|
||||
break
|
||||
}
|
||||
delete(rs.windows, windowID)
|
||||
removedCount++
|
||||
excess--
|
||||
}
|
||||
}
|
||||
|
||||
if removedCount > 0 {
|
||||
log.Info().
|
||||
Int("removed_count", removedCount).
|
||||
Int("remaining_windows", len(rs.windows)).
|
||||
Msg("Cleaned up old window aggregations")
|
||||
}
|
||||
|
||||
// Update metrics
|
||||
rs.metrics.UpdateReverbActiveWindows(len(rs.windows))
|
||||
}
|
||||
|
||||
// startHTTPServer starts the HTTP server for health checks and metrics
|
||||
func (rs *ReverbService) startHTTPServer() {
|
||||
router := mux.NewRouter()
|
||||
|
||||
// Health endpoint
|
||||
router.HandleFunc("/health", rs.healthHandler).Methods("GET")
|
||||
router.HandleFunc("/ready", rs.readinessHandler).Methods("GET")
|
||||
|
||||
// Metrics endpoint
|
||||
router.Handle("/metrics", promhttp.Handler()).Methods("GET")
|
||||
|
||||
// Admin API endpoints
|
||||
router.HandleFunc("/api/v1/windows", rs.listWindowsHandler).Methods("GET")
|
||||
router.HandleFunc("/api/v1/windows/{windowId}", rs.getWindowHandler).Methods("GET")
|
||||
router.HandleFunc("/api/v1/status", rs.statusHandler).Methods("GET")
|
||||
|
||||
server := &http.Server{
|
||||
Addr: ":8080",
|
||||
Handler: router,
|
||||
ReadTimeout: 10 * time.Second,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
log.Info().Str("address", ":8080").Msg("Starting HTTP server")
|
||||
|
||||
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
log.Error().Err(err).Msg("HTTP server error")
|
||||
}
|
||||
}
|
||||
|
||||
// Health check handlers
|
||||
func (rs *ReverbService) healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"status": "healthy",
|
||||
"service": "backbeat-reverb",
|
||||
"cluster_id": rs.clusterID,
|
||||
"node_id": rs.nodeID,
|
||||
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
||||
})
|
||||
}
|
||||
|
||||
func (rs *ReverbService) readinessHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// Check NATS connection
|
||||
if !rs.natsConn.IsConnected() {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
json.NewEncoder(w).Encode(map[string]string{
|
||||
"status": "not ready",
|
||||
"reason": "NATS connection lost",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"status": "ready",
|
||||
"active_windows": len(rs.windows),
|
||||
"current_beat": rs.currentBeat,
|
||||
"current_window_id": rs.currentWindowID,
|
||||
})
|
||||
}
|
||||
|
||||
// Admin API handlers
|
||||
func (rs *ReverbService) listWindowsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
rs.windowsMu.RLock()
|
||||
defer rs.windowsMu.RUnlock()
|
||||
|
||||
windows := make([]map[string]interface{}, 0, len(rs.windows))
|
||||
for windowID, window := range rs.windows {
|
||||
windows = append(windows, map[string]interface{}{
|
||||
"window_id": windowID,
|
||||
"from_beat": window.FromBeat,
|
||||
"to_beat": window.ToBeat,
|
||||
"claims_count": len(window.Claims),
|
||||
"agents_reporting": len(window.UniqueAgents),
|
||||
"last_updated": window.LastUpdated.UTC().Format(time.RFC3339),
|
||||
})
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"windows": windows,
|
||||
"total_count": len(windows),
|
||||
})
|
||||
}
|
||||
|
||||
func (rs *ReverbService) getWindowHandler(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
windowID := vars["windowId"]
|
||||
|
||||
rs.windowsMu.RLock()
|
||||
window, exists := rs.windows[windowID]
|
||||
rs.windowsMu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
json.NewEncoder(w).Encode(map[string]string{
|
||||
"error": "window not found",
|
||||
"window_id": windowID,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Generate current BarReport for this window
|
||||
barReport := window.GenerateBarReport(rs.clusterID)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"window_aggregation": map[string]interface{}{
|
||||
"window_id": window.WindowID,
|
||||
"from_beat": window.FromBeat,
|
||||
"to_beat": window.ToBeat,
|
||||
"claims_count": len(window.Claims),
|
||||
"unique_agents": len(window.UniqueAgents),
|
||||
"state_counts": window.StateCounts,
|
||||
"completed_tasks": window.CompletedTasks,
|
||||
"failed_tasks": window.FailedTasks,
|
||||
"last_updated": window.LastUpdated.UTC().Format(time.RFC3339),
|
||||
},
|
||||
"current_bar_report": barReport,
|
||||
})
|
||||
}
|
||||
|
||||
func (rs *ReverbService) statusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"service": "backbeat-reverb",
|
||||
"cluster_id": rs.clusterID,
|
||||
"node_id": rs.nodeID,
|
||||
"active_windows": len(rs.windows),
|
||||
"current_beat": rs.currentBeat,
|
||||
"current_window_id": rs.currentWindowID,
|
||||
"bar_length": rs.barLength,
|
||||
"window_ttl_seconds": int(rs.windowTTL.Seconds()),
|
||||
"max_windows_retained": rs.maxWindowsRetained,
|
||||
"nats_connected": rs.natsConn.IsConnected(),
|
||||
"uptime_seconds": time.Since(time.Now()).Seconds(), // Placeholder
|
||||
"version": "v1.0.0",
|
||||
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
||||
})
|
||||
}
|
||||
|
||||
// Stop gracefully shuts down the reverb service
|
||||
func (rs *ReverbService) Stop() {
|
||||
log.Info().Msg("Stopping BACKBEAT reverb service")
|
||||
rs.cancel()
|
||||
close(rs.done)
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Command line flags
|
||||
clusterID := flag.String("cluster", "chorus-aus-01", "Cluster identifier")
|
||||
natsURL := flag.String("nats", "nats://backbeat-nats:4222", "NATS server URL")
|
||||
nodeID := flag.String("node", "", "Node identifier (auto-generated if empty)")
|
||||
barLength := flag.Int("bar-length", 120, "Bar length in beats")
|
||||
logLevel := flag.String("log-level", "info", "Log level (debug, info, warn, error)")
|
||||
flag.Parse()
|
||||
|
||||
// Configure structured logging
|
||||
switch *logLevel {
|
||||
case "debug":
|
||||
zerolog.SetGlobalLevel(zerolog.DebugLevel)
|
||||
case "info":
|
||||
zerolog.SetGlobalLevel(zerolog.InfoLevel)
|
||||
case "warn":
|
||||
zerolog.SetGlobalLevel(zerolog.WarnLevel)
|
||||
case "error":
|
||||
zerolog.SetGlobalLevel(zerolog.ErrorLevel)
|
||||
default:
|
||||
zerolog.SetGlobalLevel(zerolog.InfoLevel)
|
||||
}
|
||||
|
||||
// Pretty logging in development
|
||||
if os.Getenv("BACKBEAT_ENV") != "production" {
|
||||
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
|
||||
}
|
||||
|
||||
// Generate node ID if not provided
|
||||
if *nodeID == "" {
|
||||
*nodeID = fmt.Sprintf("reverb-%d", time.Now().Unix())
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Str("cluster_id", *clusterID).
|
||||
Str("node_id", *nodeID).
|
||||
Str("nats_url", *natsURL).
|
||||
Int("bar_length", *barLength).
|
||||
Msg("Starting BACKBEAT reverb service")
|
||||
|
||||
// Connect to NATS
|
||||
nc, err := nats.Connect(*natsURL,
|
||||
nats.Timeout(10*time.Second),
|
||||
nats.ReconnectWait(2*time.Second),
|
||||
nats.MaxReconnects(-1),
|
||||
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
||||
log.Error().Err(err).Msg("NATS disconnected")
|
||||
}),
|
||||
nats.ReconnectHandler(func(nc *nats.Conn) {
|
||||
log.Info().Str("server", nc.ConnectedUrl()).Msg("NATS reconnected")
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to connect to NATS")
|
||||
}
|
||||
defer nc.Drain()
|
||||
|
||||
// Create and start reverb service
|
||||
service := NewReverbService(*clusterID, *nodeID, nc, *barLength)
|
||||
|
||||
if err := service.Start(); err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to start reverb service")
|
||||
}
|
||||
|
||||
// Handle graceful shutdown
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
log.Info().Msg("BACKBEAT reverb service is running. Press Ctrl+C to exit.")
|
||||
|
||||
// Wait for shutdown signal
|
||||
<-sigChan
|
||||
log.Info().Msg("Shutdown signal received")
|
||||
|
||||
// Graceful shutdown
|
||||
service.Stop()
|
||||
|
||||
// Wait for background tasks to complete
|
||||
select {
|
||||
case <-service.done:
|
||||
log.Info().Msg("BACKBEAT reverb service stopped gracefully")
|
||||
case <-time.After(30 * time.Second):
|
||||
log.Warn().Msg("Shutdown timeout exceeded")
|
||||
}
|
||||
}
|
||||
36
BACKBEAT-prototype/cmd/sdk-examples/main.go
Normal file
36
BACKBEAT-prototype/cmd/sdk-examples/main.go
Normal file
@@ -0,0 +1,36 @@
|
||||
// Command sdk-examples provides executable examples of BACKBEAT SDK usage
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/chorus-services/backbeat/pkg/sdk/examples"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var exampleName string
|
||||
flag.StringVar(&exampleName, "example", "simple", "Example to run: simple, task-processor, service-monitor")
|
||||
flag.Parse()
|
||||
|
||||
fmt.Printf("Running BACKBEAT SDK example: %s\n", exampleName)
|
||||
fmt.Println("Press Ctrl+C to stop")
|
||||
fmt.Println()
|
||||
|
||||
switch exampleName {
|
||||
case "simple":
|
||||
examples.SimpleAgent()
|
||||
case "task-processor":
|
||||
examples.TaskProcessor()
|
||||
case "service-monitor":
|
||||
examples.ServiceMonitor()
|
||||
default:
|
||||
fmt.Printf("Unknown example: %s\n", exampleName)
|
||||
fmt.Println("Available examples:")
|
||||
fmt.Println(" simple - Basic beat subscription and status emission")
|
||||
fmt.Println(" task-processor - Beat budget usage for task timeout management")
|
||||
fmt.Println(" service-monitor - Health monitoring with beat-aligned reporting")
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user