// Package backbeat connects WHOOSH search operations to the BACKBEAT timing
// system: it tracks active searches and reports their progress as status
// claims on every beat.
package backbeat

import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"github.com/chorus-services/backbeat/pkg/sdk"
	"github.com/chorus-services/whoosh/internal/config"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
)

// Integration manages WHOOSH's integration with the BACKBEAT timing system.
//
// The public search methods and the beat callbacks registered with the SDK
// both touch the active-search table. The callbacks are presumably delivered
// on an SDK-owned goroutine (NOTE(review): confirm against the SDK), so all
// access to started, activeSearches and the tracked SearchOperation values is
// guarded by mu. The lock is never held while calling into the SDK client.
type Integration struct {
	client sdk.Client
	config *config.BackbeatConfig
	logger *slog.Logger
	ctx    context.Context
	cancel context.CancelFunc

	mu      sync.Mutex // guards started, activeSearches and their entries
	started bool

	// Search operation tracking.
	activeSearches map[string]*SearchOperation
}

// SearchOperation tracks a search operation's progress through BACKBEAT.
type SearchOperation struct {
	ID             string
	Query          string
	StartBeat      int64
	EstimatedBeats int
	Phase          SearchPhase
	Results        int
	StartTime      time.Time
}

// SearchPhase represents the current phase of a search operation.
type SearchPhase int

// Phases a search moves through, in order. PhaseCompleted and PhaseFailed are
// terminal.
const (
	PhaseStarted SearchPhase = iota
	PhaseIndexing
	PhaseQuerying
	PhaseRanking
	PhaseCompleted
	PhaseFailed
)

// String returns the lowercase name of the phase, or "unknown" for values
// outside the declared set.
func (p SearchPhase) String() string {
	switch p {
	case PhaseStarted:
		return "started"
	case PhaseIndexing:
		return "indexing"
	case PhaseQuerying:
		return "querying"
	case PhaseRanking:
		return "ranking"
	case PhaseCompleted:
		return "completed"
	case PhaseFailed:
		return "failed"
	default:
		return "unknown"
	}
}

// NewIntegration creates a new BACKBEAT integration for WHOOSH.
//
// It returns an error when cfg is nil or when the integration is disabled in
// cfg. The returned Integration is not running until Start is called.
func NewIntegration(cfg *config.BackbeatConfig) (*Integration, error) {
	if cfg == nil {
		return nil, fmt.Errorf("BACKBEAT configuration is nil")
	}
	if !cfg.Enabled {
		return nil, fmt.Errorf("BACKBEAT integration is disabled")
	}

	// Convert zerolog to slog for BACKBEAT SDK compatibility.
	slogger := slog.New(&zerologHandler{logger: log.Logger})

	// Create BACKBEAT SDK config.
	sdkConfig := sdk.DefaultConfig()
	sdkConfig.ClusterID = cfg.ClusterID
	sdkConfig.AgentID = cfg.AgentID
	sdkConfig.NATSUrl = cfg.NATSUrl
	sdkConfig.Logger = slogger

	return &Integration{
		client:         sdk.NewClient(sdkConfig),
		config:         cfg,
		logger:         slogger,
		activeSearches: make(map[string]*SearchOperation),
	}, nil
}

// isStarted reports whether the integration is currently running.
func (i *Integration) isStarted() bool {
	i.mu.Lock()
	defer i.mu.Unlock()
	return i.started
}

// Start initializes the BACKBEAT integration: it starts the SDK client with a
// context derived from ctx and registers the beat and downbeat callbacks.
//
// It returns an error if the integration is already started, if the client
// fails to start, or if a callback cannot be registered. On any failure the
// derived context is cancelled and a client that was already started is
// stopped again, so nothing is left running. Start and Stop are expected to
// be driven from a single lifecycle goroutine.
func (i *Integration) Start(ctx context.Context) error {
	if i.isStarted() {
		return fmt.Errorf("integration already started")
	}

	runCtx, cancel := context.WithCancel(ctx)

	// Start the SDK client.
	if err := i.client.Start(runCtx); err != nil {
		cancel()
		return fmt.Errorf("failed to start BACKBEAT client: %w", err)
	}

	// Register beat callbacks; undo the client start if that fails.
	if err := i.registerCallbacks(); err != nil {
		if stopErr := i.client.Stop(); stopErr != nil {
			log.Warn().Err(stopErr).Msg("Error stopping BACKBEAT client")
		}
		cancel()
		return err
	}

	i.mu.Lock()
	i.ctx, i.cancel = runCtx, cancel
	i.started = true
	i.mu.Unlock()

	log.Info().
		Str("cluster_id", i.config.ClusterID).
		Str("agent_id", i.config.AgentID).
		Msg("🎵 WHOOSH BACKBEAT integration started")

	return nil
}

// registerCallbacks hooks onBeat and onDownbeat into the SDK client.
func (i *Integration) registerCallbacks() error {
	if err := i.client.OnBeat(i.onBeat); err != nil {
		return fmt.Errorf("failed to register beat callback: %w", err)
	}
	if err := i.client.OnDownbeat(i.onDownbeat); err != nil {
		return fmt.Errorf("failed to register downbeat callback: %w", err)
	}
	return nil
}

// Stop gracefully shuts down the BACKBEAT integration. It is a no-op when the
// integration is not started. An error from stopping the SDK client is logged
// as a warning and not returned, so Stop always returns nil.
func (i *Integration) Stop() error {
	i.mu.Lock()
	if !i.started {
		i.mu.Unlock()
		return nil
	}
	cancel := i.cancel
	i.started = false
	i.mu.Unlock()

	// The lock is released before calling into the SDK so that a callback
	// waiting on mu cannot block the client's shutdown.
	if cancel != nil {
		cancel()
	}

	if err := i.client.Stop(); err != nil {
		log.Warn().Err(err).Msg("Error stopping BACKBEAT client")
	}

	log.Info().Msg("🎵 WHOOSH BACKBEAT integration stopped")
	return nil
}

// onBeat handles regular beat events from BACKBEAT: it re-emits a status claim
// for every tracked search and, every 8th beat, a health claim. Emission
// failures are logged; the callback has no way to return them.
func (i *Integration) onBeat(beat sdk.BeatFrame) {
	log.Debug().
		Int64("beat_index", beat.BeatIndex).
		Str("phase", beat.Phase).
		Int("tempo_bpm", beat.TempoBPM).
		Str("window_id", beat.WindowID).
		Bool("downbeat", beat.Downbeat).
		Msg("🥁 BACKBEAT beat received")

	// Copy the searches under the lock, then emit without holding it.
	i.mu.Lock()
	snapshot := make([]SearchOperation, 0, len(i.activeSearches))
	for _, search := range i.activeSearches {
		snapshot = append(snapshot, *search)
	}
	i.mu.Unlock()

	// Emit status claim for active searches.
	for idx := range snapshot {
		if err := i.emitSearchStatus(&snapshot[idx]); err != nil {
			log.Warn().
				Err(err).
				Str("search_id", snapshot[idx].ID).
				Msg("Failed to emit search status claim")
		}
	}

	// Periodic health status emission.
	if beat.BeatIndex%8 == 0 { // Every 8 beats (4 minutes at 2 BPM)
		if err := i.emitHealthStatus(); err != nil {
			log.Warn().Err(err).Msg("Failed to emit health status claim")
		}
	}
}

// onDownbeat handles downbeat (bar start) events by pruning searches that have
// reached a terminal phase.
func (i *Integration) onDownbeat(beat sdk.BeatFrame) {
	log.Info().
		Int64("beat_index", beat.BeatIndex).
		Str("phase", beat.Phase).
		Str("window_id", beat.WindowID).
		Msg("🎼 BACKBEAT downbeat - new bar started")

	// Cleanup completed searches on downbeat.
	i.cleanupCompletedSearches()
}

// StartSearch registers a new search operation with BACKBEAT and emits its
// initial status claim. An existing search with the same searchID is replaced.
// It returns an error if the integration is not started or the claim cannot be
// emitted; in the latter case the search remains registered.
func (i *Integration) StartSearch(searchID, query string, estimatedBeats int) error {
	if !i.isStarted() {
		return fmt.Errorf("BACKBEAT integration not started")
	}

	search := &SearchOperation{
		ID:             searchID,
		Query:          query,
		StartBeat:      i.client.GetCurrentBeat(),
		EstimatedBeats: estimatedBeats,
		Phase:          PhaseStarted,
		StartTime:      time.Now(),
	}

	i.mu.Lock()
	i.activeSearches[searchID] = search
	snapshot := *search
	i.mu.Unlock()

	// Emit initial status claim.
	return i.emitSearchStatus(&snapshot)
}

// UpdateSearchPhase updates the phase and result count of an active search and
// emits the updated status claim. It returns an error if searchID is unknown.
func (i *Integration) UpdateSearchPhase(searchID string, phase SearchPhase, results int) error {
	i.mu.Lock()
	search, exists := i.activeSearches[searchID]
	if !exists {
		i.mu.Unlock()
		return fmt.Errorf("search %s not found", searchID)
	}
	search.Phase = phase
	search.Results = results
	snapshot := *search
	i.mu.Unlock()

	// Emit updated status claim.
	return i.emitSearchStatus(&snapshot)
}

// CompleteSearch marks a search operation as completed, emits the completion
// claim and stops tracking the search.
//
// If the claim cannot be emitted the error is returned and the search stays
// tracked in PhaseCompleted: it is re-reported on following beats and pruned
// at the next downbeat.
func (i *Integration) CompleteSearch(searchID string, results int) error {
	i.mu.Lock()
	search, exists := i.activeSearches[searchID]
	if !exists {
		i.mu.Unlock()
		return fmt.Errorf("search %s not found", searchID)
	}
	search.Phase = PhaseCompleted
	search.Results = results
	snapshot := *search
	i.mu.Unlock()

	// Emit completion status claim.
	if err := i.emitSearchStatus(&snapshot); err != nil {
		return err
	}

	// Remove from active searches.
	i.removeSearch(searchID, search)
	return nil
}

// FailSearch marks a search operation as failed, emits a failure claim that
// carries reason, and stops tracking the search.
//
// If the claim cannot be emitted the error is returned and the search stays
// tracked in PhaseFailed until the next downbeat prunes it.
func (i *Integration) FailSearch(searchID string, reason string) error {
	i.mu.Lock()
	search, exists := i.activeSearches[searchID]
	if !exists {
		i.mu.Unlock()
		return fmt.Errorf("search %s not found", searchID)
	}
	search.Phase = PhaseFailed
	query := search.Query
	i.mu.Unlock()

	// Emit failure status claim. TaskID ties the claim to the same task the
	// progress claims were reported under.
	claim := sdk.StatusClaim{
		TaskID:    searchID,
		State:     "failed",
		BeatsLeft: 0,
		Progress:  0.0,
		Notes:     fmt.Sprintf("Search failed: %s (query: %s)", reason, query),
	}

	if err := i.client.EmitStatusClaim(claim); err != nil {
		return fmt.Errorf("failed to emit failure status: %w", err)
	}

	// Remove from active searches.
	i.removeSearch(searchID, search)
	return nil
}

// removeSearch drops searchID from the active table, but only while it still
// maps to search, so a search re-registered under the same ID in the meantime
// is left alone.
func (i *Integration) removeSearch(searchID string, search *SearchOperation) {
	i.mu.Lock()
	defer i.mu.Unlock()
	if i.activeSearches[searchID] == search {
		delete(i.activeSearches, searchID)
	}
}

// searchProgress returns the fraction of the beat budget already consumed,
// clamped to [0, 1]. A non-positive estimate has no meaningful ratio (the
// division would produce NaN or Inf), so it reports 0.
func searchProgress(beatsPassed int64, estimatedBeats int) float64 {
	if estimatedBeats <= 0 || beatsPassed <= 0 {
		return 0.0
	}
	return min(float64(beatsPassed)/float64(estimatedBeats), 1.0)
}

// emitSearchStatus emits a status claim for a search operation.
//
// search must not be mutated concurrently; callers pass a private copy taken
// under the lock. Completed searches report state "done" with progress 1,
// failed searches report "failed" with progress 0, and everything else reports
// "executing" with progress derived from the beats elapsed since StartBeat.
func (i *Integration) emitSearchStatus(search *SearchOperation) error {
	beatsPassed := i.client.GetCurrentBeat() - search.StartBeat

	state := "executing"
	beatsLeft := max(search.EstimatedBeats-int(beatsPassed), 0)
	progress := searchProgress(beatsPassed, search.EstimatedBeats)

	switch search.Phase {
	case PhaseCompleted:
		state, progress, beatsLeft = "done", 1.0, 0
	case PhaseFailed:
		state, progress, beatsLeft = "failed", 0.0, 0
	}

	claim := sdk.StatusClaim{
		TaskID:    search.ID,
		State:     state,
		BeatsLeft: beatsLeft,
		Progress:  progress,
		Notes: fmt.Sprintf("Search %s: %s (query: %s, results: %d)",
			search.Phase.String(), search.ID, search.Query, search.Results),
	}

	return i.client.EmitStatusClaim(claim)
}

// emitHealthStatus emits a general health status claim under the task ID
// "whoosh-health". The state is "failed" when the client reports errors,
// "executing" when searches are active, and "waiting" otherwise.
func (i *Integration) emitHealthStatus() error {
	health := i.client.Health()

	i.mu.Lock()
	activeCount := len(i.activeSearches)
	i.mu.Unlock()

	state := "waiting"
	if activeCount > 0 {
		state = "executing"
	}

	notes := fmt.Sprintf("WHOOSH healthy: connected=%v, searches=%d, tempo=%d BPM",
		health.Connected, activeCount, health.CurrentTempo)

	if len(health.Errors) > 0 {
		state = "failed"
		notes += fmt.Sprintf(", errors: %d", len(health.Errors))
	}

	claim := sdk.StatusClaim{
		TaskID:    "whoosh-health",
		State:     state,
		BeatsLeft: 0,
		Progress:  1.0,
		Notes:     notes,
	}

	return i.client.EmitStatusClaim(claim)
}

// cleanupCompletedSearches removes searches that are still tracked although
// they reached a terminal phase. CompleteSearch and FailSearch remove their
// search themselves on success; entries survive only when the final claim
// failed to emit or the terminal phase was set through UpdateSearchPhase.
func (i *Integration) cleanupCompletedSearches() {
	i.mu.Lock()
	removed := 0
	for id, search := range i.activeSearches {
		if search.Phase == PhaseCompleted || search.Phase == PhaseFailed {
			delete(i.activeSearches, id)
			removed++
		}
	}
	remaining := len(i.activeSearches)
	i.mu.Unlock()

	log.Debug().
		Int("active_searches", remaining).
		Int("removed_searches", removed).
		Msg("Active searches cleanup check")
}

// GetHealth returns the current BACKBEAT integration health as a generic map.
// When the integration is not started only "enabled", "started" and
// "connected" are present; otherwise the map also carries the client's beat,
// tempo, drift, reconnect, degradation and error details and the number of
// active searches.
func (i *Integration) GetHealth() map[string]any {
	i.mu.Lock()
	started := i.started
	activeCount := len(i.activeSearches)
	i.mu.Unlock()

	if !started {
		return map[string]any{
			"enabled":   i.config.Enabled,
			"started":   false,
			"connected": false,
		}
	}

	health := i.client.Health()
	return map[string]any{
		"enabled":           i.config.Enabled,
		"started":           started,
		"connected":         health.Connected,
		"current_beat":      health.LastBeat,
		"current_tempo":     health.CurrentTempo,
		"measured_bpm":      health.MeasuredBPM,
		"tempo_drift":       health.TempoDrift.String(),
		"reconnect_count":   health.ReconnectCount,
		"active_searches":   activeCount,
		"local_degradation": health.LocalDegradation,
		"errors":            health.Errors,
	}
}

// ExecuteWithBeatBudget executes fn with a BACKBEAT beat budget of beats. When
// the integration is not started it falls back to calling fn directly.
func (i *Integration) ExecuteWithBeatBudget(beats int, fn func() error) error {
	if !i.isStarted() {
		return fn() // Fall back to regular execution if not started
	}

	return i.client.WithBeatBudget(beats, fn)
}

// zerologHandler adapts zerolog to the slog.Handler interface.
type zerologHandler struct {
	logger zerolog.Logger
	// prefix qualifies attribute keys with the groups opened via WithGroup,
	// e.g. "request." for a handler derived with WithGroup("request").
	prefix string
}

// Compile-time check that zerologHandler satisfies slog.Handler.
var _ slog.Handler = (*zerologHandler)(nil)

// Enabled always reports true; level filtering is left to the wrapped zerolog
// logger.
func (h *zerologHandler) Enabled(_ context.Context, _ slog.Level) bool {
	return true
}

// Handle writes record through the wrapped zerolog logger. Levels are mapped
// by range, so intermediate and out-of-band slog levels land on the nearest
// lower zerolog level instead of defaulting to info. It always returns nil.
func (h *zerologHandler) Handle(_ context.Context, record slog.Record) error {
	var event *zerolog.Event
	switch {
	case record.Level >= slog.LevelError:
		event = h.logger.Error()
	case record.Level >= slog.LevelWarn:
		event = h.logger.Warn()
	case record.Level >= slog.LevelInfo:
		event = h.logger.Info()
	default:
		event = h.logger.Debug()
	}

	record.Attrs(func(attr slog.Attr) bool {
		// Resolve unwraps slog.LogValuer values before they are serialized.
		event = event.Interface(h.prefix+attr.Key, attr.Value.Resolve().Any())
		return true
	})

	event.Msg(record.Message)
	return nil
}

// WithAttrs returns a handler whose logger carries attrs on every record. The
// receiver is returned unchanged when attrs is empty.
func (h *zerologHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	if len(attrs) == 0 {
		return h
	}

	logCtx := h.logger.With()
	for _, attr := range attrs {
		logCtx = logCtx.Interface(h.prefix+attr.Key, attr.Value.Resolve().Any())
	}
	return &zerologHandler{logger: logCtx.Logger(), prefix: h.prefix}
}

// WithGroup returns a handler that qualifies all subsequent attribute keys
// with name, joined by a dot. The receiver is returned unchanged when name is
// empty.
func (h *zerologHandler) WithGroup(name string) slog.Handler {
	if name == "" {
		return h
	}
	return &zerologHandler{logger: h.logger, prefix: h.prefix + name + "."}
}