Files
WHOOSH/internal/backbeat/integration.go
Claude Code 33676bae6d Add WHOOSH search service with BACKBEAT integration
Complete implementation:
- Go-based search service with PostgreSQL and Redis backend
- BACKBEAT SDK integration for beat-aware search operations
- Docker containerization with multi-stage builds
- Comprehensive API endpoints for project analysis and search
- Database migrations and schema management
- GITEA integration for repository management
- Team composition analysis and recommendations

Key features:
- Beat-synchronized search operations with timing coordination
- Phase-based operation tracking (started → querying → ranking → completed)
- Docker Swarm deployment configuration
- Health checks and monitoring
- Secure configuration with environment variables

Architecture:
- Microservice design with clean API boundaries
- Background processing for long-running analysis
- Modular internal structure with proper separation of concerns
- Integration with CHORUS ecosystem via BACKBEAT timing

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-06 11:16:39 +10:00

406 lines
9.9 KiB
Go

package backbeat
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/chorus-services/backbeat/pkg/sdk"
"github.com/chorus-services/whoosh/internal/config"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// Integration manages WHOOSH's integration with the BACKBEAT timing system.
// It wraps the BACKBEAT SDK client, registers beat and downbeat callbacks in
// Start, and tracks in-flight search operations so that their progress can be
// emitted as status claims.
//
// NOTE(review): no field is guarded by a lock. activeSearches is iterated in
// onBeat and mutated by StartSearch/CompleteSearch/FailSearch; if the SDK
// invokes callbacks on its own goroutine this is a data race. Confirm against
// the SDK's callback contract.
type Integration struct {
// client is the BACKBEAT SDK client created in NewIntegration.
client sdk.Client
// config is the BACKBEAT configuration this integration was built from.
config *config.BackbeatConfig
// logger is the slog logger (backed by zerolog) that is handed to the SDK.
logger *slog.Logger
// ctx and cancel are set by Start; cancel is invoked by Stop.
ctx context.Context
cancel context.CancelFunc
// started is set to true at the end of Start and back to false by Stop.
started bool
// Search operation tracking: maps a search ID to its tracked operation.
activeSearches map[string]*SearchOperation
}
// SearchOperation tracks a search operation's progress through BACKBEAT.
// Instances are created by StartSearch and stored in Integration.activeSearches.
type SearchOperation struct {
// ID is the caller-supplied search identifier; it is used as the map key
// and as the TaskID of emitted status claims.
ID string
// Query is the search query text, included in status claim notes.
Query string
// StartBeat is the SDK's current beat at the moment the search was registered.
StartBeat int64
// EstimatedBeats is the caller's estimate of how many beats the search
// will take; it drives the beats-left and progress figures.
EstimatedBeats int
// Phase is the current phase of the search.
Phase SearchPhase
// Results is the result count most recently reported by the caller.
Results int
// StartTime is the wall-clock time at registration (not read elsewhere in
// the visible code).
StartTime time.Time
}
// SearchPhase identifies the stage a search operation has reached.
type SearchPhase int

// Phases of a search operation. Values are assigned by iota in declaration
// order, starting at zero.
const (
	PhaseStarted SearchPhase = iota
	PhaseIndexing
	PhaseQuerying
	PhaseRanking
	PhaseCompleted
	PhaseFailed
)

// String returns the lower-case name of the phase, or "unknown" for any value
// outside the declared set.
func (p SearchPhase) String() string {
	// Keyed array literal: each slot is tied to its constant, so the table
	// stays correct even if the constants are reordered.
	names := [...]string{
		PhaseStarted:   "started",
		PhaseIndexing:  "indexing",
		PhaseQuerying:  "querying",
		PhaseRanking:   "ranking",
		PhaseCompleted: "completed",
		PhaseFailed:    "failed",
	}
	if p < 0 || int(p) >= len(names) {
		return "unknown"
	}
	return names[p]
}
// NewIntegration creates a new BACKBEAT integration for WHOOSH.
//
// It returns an error when cfg is nil or when cfg.Enabled is false. Otherwise
// it builds an SDK client from the SDK's default configuration, overridden
// with the cluster ID, agent ID and NATS URL taken from cfg, and wires the
// global zerolog logger into the SDK through a slog adapter.
//
// The returned Integration is not yet started; call Start to connect.
func NewIntegration(cfg *config.BackbeatConfig) (*Integration, error) {
	// Guard against a nil config so callers get an error rather than a panic.
	if cfg == nil {
		return nil, fmt.Errorf("BACKBEAT config is nil")
	}
	if !cfg.Enabled {
		return nil, fmt.Errorf("BACKBEAT integration is disabled")
	}

	// Convert zerolog to slog for BACKBEAT SDK compatibility.
	slogger := slog.New(&zerologHandler{logger: log.Logger})

	// Create BACKBEAT SDK config.
	sdkConfig := sdk.DefaultConfig()
	sdkConfig.ClusterID = cfg.ClusterID
	sdkConfig.AgentID = cfg.AgentID
	sdkConfig.NATSUrl = cfg.NATSUrl
	sdkConfig.Logger = slogger

	return &Integration{
		client:         sdk.NewClient(sdkConfig),
		config:         cfg,
		logger:         slogger,
		activeSearches: make(map[string]*SearchOperation),
	}, nil
}
// Start initializes the BACKBEAT integration: it derives a cancellable
// context from ctx, starts the SDK client with it, and registers the beat and
// downbeat callbacks.
//
// It returns an error if the integration is already started, if the SDK
// client fails to start, or if either callback cannot be registered. On any
// failure the derived context is cancelled (and the client stopped if it had
// been started), so a failed Start leaves no resources behind and may be
// retried.
func (i *Integration) Start(ctx context.Context) error {
	if i.started {
		return fmt.Errorf("integration already started")
	}

	i.ctx, i.cancel = context.WithCancel(ctx)

	// release undoes the context setup on a failed start so the cancel func
	// is never leaked.
	release := func() {
		i.cancel()
		i.ctx, i.cancel = nil, nil
	}

	// Start the SDK client.
	if err := i.client.Start(i.ctx); err != nil {
		release()
		return fmt.Errorf("failed to start BACKBEAT client: %w", err)
	}

	// abort additionally shuts down the already-running client; same order
	// as Stop (cancel first, then stop the client).
	abort := func() {
		release()
		if stopErr := i.client.Stop(); stopErr != nil {
			log.Warn().Err(stopErr).Msg("Error stopping BACKBEAT client after failed start")
		}
	}

	// Register beat callbacks.
	if err := i.client.OnBeat(i.onBeat); err != nil {
		abort()
		return fmt.Errorf("failed to register beat callback: %w", err)
	}
	if err := i.client.OnDownbeat(i.onDownbeat); err != nil {
		abort()
		return fmt.Errorf("failed to register downbeat callback: %w", err)
	}

	i.started = true
	log.Info().
		Str("cluster_id", i.config.ClusterID).
		Str("agent_id", i.config.AgentID).
		Msg("🎵 WHOOSH BACKBEAT integration started")
	return nil
}
// Stop shuts the BACKBEAT integration down. When the integration is running
// it cancels the context created by Start, stops the SDK client (a failure
// there is logged as a warning, not returned) and clears the started flag.
// Calling Stop on an integration that is not started does nothing.
// The returned error is always nil.
func (i *Integration) Stop() error {
	if i.started {
		if cancel := i.cancel; cancel != nil {
			cancel()
		}

		stopErr := i.client.Stop()
		if stopErr != nil {
			log.Warn().Err(stopErr).Msg("Error stopping BACKBEAT client")
		}

		i.started = false
		log.Info().Msg("🎵 WHOOSH BACKBEAT integration stopped")
	}
	return nil
}
// onBeat handles regular beat events from BACKBEAT. For every beat it logs
// the frame at debug level and emits a status claim for each tracked search;
// on every eighth beat it also emits a general health claim.
//
// Emission failures are logged as warnings instead of being dropped: a beat
// callback has no caller to return an error to.
//
// NOTE(review): activeSearches is read here without synchronization; if the
// SDK calls this on its own goroutine it can race with StartSearch and the
// other mutators. Confirm the SDK's callback threading.
func (i *Integration) onBeat(beat sdk.BeatFrame) {
	// Health is reported every 8 beats (4 minutes at 2 BPM).
	const healthEveryBeats = 8

	log.Debug().
		Int64("beat_index", beat.BeatIndex).
		Str("phase", beat.Phase).
		Int("tempo_bpm", beat.TempoBPM).
		Str("window_id", beat.WindowID).
		Bool("downbeat", beat.Downbeat).
		Msg("🥁 BACKBEAT beat received")

	// Emit status claim for active searches.
	for _, search := range i.activeSearches {
		if err := i.emitSearchStatus(search); err != nil {
			log.Warn().
				Err(err).
				Str("search_id", search.ID).
				Msg("Failed to emit search status on beat")
		}
	}

	// Periodic health status emission.
	if beat.BeatIndex%healthEveryBeats == 0 {
		if err := i.emitHealthStatus(); err != nil {
			log.Warn().Err(err).Msg("Failed to emit health status on beat")
		}
	}
}
// onDownbeat is the callback registered for downbeat (bar start) frames. It
// logs the frame at info level and then runs the search cleanup pass.
func (i *Integration) onDownbeat(beat sdk.BeatFrame) {
	event := log.Info()
	event = event.Int64("beat_index", beat.BeatIndex)
	event = event.Str("phase", beat.Phase)
	event = event.Str("window_id", beat.WindowID)
	event.Msg("🎼 BACKBEAT downbeat - new bar started")

	// A new bar is the point at which finished searches are swept.
	i.cleanupCompletedSearches()
}
// StartSearch begins tracking a search under searchID and emits its first
// status claim.
//
// The operation records the SDK's current beat and the wall-clock time, and
// starts in PhaseStarted. An existing entry with the same ID is replaced.
// It returns an error if the integration has not been started, or the error
// from emitting the initial claim (the search remains tracked in that case).
func (i *Integration) StartSearch(searchID, query string, estimatedBeats int) error {
	if !i.started {
		return fmt.Errorf("BACKBEAT integration not started")
	}

	op := new(SearchOperation)
	op.ID = searchID
	op.Query = query
	op.StartBeat = i.client.GetCurrentBeat()
	op.EstimatedBeats = estimatedBeats
	op.Phase = PhaseStarted
	op.StartTime = time.Now()

	i.activeSearches[searchID] = op

	// Announce the new search right away rather than waiting for the next beat.
	return i.emitSearchStatus(op)
}
// UpdateSearchPhase records a new phase and result count for the tracked
// search searchID and emits a status claim reflecting them.
//
// It returns an error if no search with that ID is tracked, or the error from
// emitting the claim.
func (i *Integration) UpdateSearchPhase(searchID string, phase SearchPhase, results int) error {
	if op, ok := i.activeSearches[searchID]; ok {
		op.Phase, op.Results = phase, results
		// Publish the change immediately.
		return i.emitSearchStatus(op)
	}
	return fmt.Errorf("search %s not found", searchID)
}
// CompleteSearch moves the tracked search searchID to PhaseCompleted with the
// given result count, emits the final status claim and, if that succeeds,
// stops tracking the search.
//
// It returns an error if the search is not tracked, or the emission error; in
// the latter case the search stays in the active set.
func (i *Integration) CompleteSearch(searchID string, results int) error {
	op, ok := i.activeSearches[searchID]
	if !ok {
		return fmt.Errorf("search %s not found", searchID)
	}

	op.Phase, op.Results = PhaseCompleted, results

	// Only drop the entry once the completion claim has gone out.
	err := i.emitSearchStatus(op)
	if err == nil {
		delete(i.activeSearches, searchID)
	}
	return err
}
// FailSearch marks the tracked search searchID as failed, emits a failure
// status claim carrying reason and the query text, and, if that succeeds,
// stops tracking the search.
//
// The claim carries the search ID as its TaskID, like every other claim
// emitted for a search, so consumers can attribute the failure.
//
// It returns an error if the search is not tracked, or a wrapped error if the
// claim cannot be emitted; in the latter case the search stays in the active
// set.
func (i *Integration) FailSearch(searchID string, reason string) error {
	search, exists := i.activeSearches[searchID]
	if !exists {
		return fmt.Errorf("search %s not found", searchID)
	}

	search.Phase = PhaseFailed

	// Emit failure status claim.
	claim := sdk.StatusClaim{
		TaskID:    search.ID,
		State:     "failed",
		BeatsLeft: 0,
		Progress:  0.0,
		Notes:     fmt.Sprintf("Search failed: %s (query: %s)", reason, search.Query),
	}
	if err := i.client.EmitStatusClaim(claim); err != nil {
		return fmt.Errorf("failed to emit failure status: %w", err)
	}

	// Remove from active searches.
	delete(i.activeSearches, searchID)
	return nil
}
// emitSearchStatus emits a status claim for a search operation.
//
// Elapsed beats are the SDK's current beat minus search.StartBeat, floored at
// zero in case the beat counter has moved backwards. While elapsed beats are
// below a positive EstimatedBeats, beats-left is the remainder and progress
// is elapsed/estimate. Otherwise (estimate used up, or zero/negative estimate)
// beats-left is 0 and progress is 1.0. This keeps progress inside [0, 1] and
// avoids the NaN that 0/0 would produce for a zero estimate.
//
// Terminal phases override the computed figures: PhaseCompleted reports
// state "done" with progress 1.0, PhaseFailed reports "failed" with progress
// 0.0; every other phase reports "executing".
//
// It returns the error from the SDK's EmitStatusClaim.
func (i *Integration) emitSearchStatus(search *SearchOperation) error {
	beatsPassed := i.client.GetCurrentBeat() - search.StartBeat
	if beatsPassed < 0 {
		beatsPassed = 0
	}

	// Work in int64 so the elapsed beat count is never narrowed before the
	// subtraction; the remainder is at most EstimatedBeats and fits an int.
	budget := int64(search.EstimatedBeats)
	beatsLeft := 0
	progress := 1.0
	if budget > 0 && beatsPassed < budget {
		beatsLeft = int(budget - beatsPassed)
		progress = float64(beatsPassed) / float64(budget)
	}

	state := "executing"
	switch search.Phase {
	case PhaseCompleted:
		state, progress, beatsLeft = "done", 1.0, 0
	case PhaseFailed:
		state, progress, beatsLeft = "failed", 0.0, 0
	}

	claim := sdk.StatusClaim{
		TaskID:    search.ID,
		State:     state,
		BeatsLeft: beatsLeft,
		Progress:  progress,
		Notes:     fmt.Sprintf("Search %s: %s (query: %s, results: %d)", search.Phase.String(), search.ID, search.Query, search.Results),
	}
	return i.client.EmitStatusClaim(claim)
}
// emitHealthStatus publishes a status claim under the fixed task ID
// "whoosh-health" describing the integration as a whole.
//
// The state is "failed" when the SDK health report lists any errors,
// otherwise "executing" when at least one search is tracked, otherwise
// "waiting". The notes carry connection state, search count and tempo, plus
// the error count when there are errors. It returns the error from the SDK's
// EmitStatusClaim.
func (i *Integration) emitHealthStatus() error {
	report := i.client.Health()
	searchCount := len(i.activeSearches)
	errorCount := len(report.Errors)

	var state string
	switch {
	case errorCount > 0:
		state = "failed"
	case searchCount > 0:
		state = "executing"
	default:
		state = "waiting"
	}

	notes := fmt.Sprintf("WHOOSH healthy: connected=%v, searches=%d, tempo=%d BPM",
		report.Connected, searchCount, report.CurrentTempo)
	if errorCount > 0 {
		notes += fmt.Sprintf(", errors: %d", errorCount)
	}

	return i.client.EmitStatusClaim(sdk.StatusClaim{
		TaskID:    "whoosh-health",
		State:     state,
		BeatsLeft: 0,
		Progress:  1.0,
		Notes:     notes,
	})
}
// cleanupCompletedSearches removes every tracked search that is in a terminal
// phase (PhaseCompleted or PhaseFailed) and logs how many were removed and
// how many remain.
//
// CompleteSearch and FailSearch normally delete their entry themselves, but an
// entry can stay behind in a terminal phase when their status emission fails
// or when UpdateSearchPhase is used to set a terminal phase directly. Without
// this sweep such entries would be re-emitted on every beat indefinitely.
// It is called from onDownbeat.
func (i *Integration) cleanupCompletedSearches() {
	removed := 0
	// Deleting the current key while ranging over a map is permitted in Go.
	for id, search := range i.activeSearches {
		if search.Phase == PhaseCompleted || search.Phase == PhaseFailed {
			delete(i.activeSearches, id)
			removed++
		}
	}

	log.Debug().
		Int("active_searches", len(i.activeSearches)).
		Int("removed", removed).
		Msg("Active searches cleanup check")
}
// GetHealth reports the state of the BACKBEAT integration as a generic map.
//
// When the integration is not started the map holds only "enabled",
// "started" (false) and "connected" (false), and the SDK client is not
// queried. When it is started, the map also carries the figures from the SDK
// client's health report and the number of tracked searches.
func (i *Integration) GetHealth() map[string]interface{} {
	report := map[string]interface{}{
		"enabled":   i.config.Enabled,
		"started":   i.started,
		"connected": false,
	}
	if !i.started {
		return report
	}

	snapshot := i.client.Health()
	report["connected"] = snapshot.Connected
	report["current_beat"] = snapshot.LastBeat
	report["current_tempo"] = snapshot.CurrentTempo
	report["measured_bpm"] = snapshot.MeasuredBPM
	report["tempo_drift"] = snapshot.TempoDrift.String()
	report["reconnect_count"] = snapshot.ReconnectCount
	report["active_searches"] = len(i.activeSearches)
	report["local_degradation"] = snapshot.LocalDegradation
	report["errors"] = snapshot.Errors
	return report
}
// ExecuteWithBeatBudget runs fn under a budget of the given number of beats
// by delegating to the SDK client's WithBeatBudget. If the integration has
// not been started, fn is simply called directly with no budget applied.
// The result of whichever call is made is returned.
func (i *Integration) ExecuteWithBeatBudget(beats int, fn func() error) error {
	if i.started {
		return i.client.WithBeatBudget(beats, fn)
	}
	// Not connected to BACKBEAT: plain execution.
	return fn()
}
// zerologHandler adapts a zerolog.Logger to the slog.Handler interface, so
// that records produced through a *slog.Logger are written via zerolog.
type zerologHandler struct {
// logger is the zerolog logger that receives every handled record.
logger zerolog.Logger
}
// Enabled implements slog.Handler. It reports every level as enabled, so no
// record is rejected at this stage; presumably level filtering is left to
// the wrapped zerolog logger (verify against its configuration).
func (h *zerologHandler) Enabled(_ context.Context, _ slog.Level) bool {
	return true
}
// Handle implements slog.Handler by writing the record through the wrapped
// zerolog logger.
//
// The slog level is mapped by threshold rather than by exact match, so
// custom levels land in the nearest standard band at or below them: Error
// and above are logged as error, Warn and above as warn, Info and above as
// info, and everything lower as debug. Each attribute is added as a field
// under its key; values are resolved first (so slog.LogValuer values are
// expanded) and empty attributes are skipped, as the slog.Handler contract
// requires. The returned error is always nil.
func (h *zerologHandler) Handle(ctx context.Context, record slog.Record) error {
	var event *zerolog.Event
	switch {
	case record.Level >= slog.LevelError:
		event = h.logger.Error()
	case record.Level >= slog.LevelWarn:
		event = h.logger.Warn()
	case record.Level >= slog.LevelInfo:
		event = h.logger.Info()
	default:
		event = h.logger.Debug()
	}

	record.Attrs(func(attr slog.Attr) bool {
		// A zero Attr carries no information and should be ignored.
		if attr.Equal(slog.Attr{}) {
			return true
		}
		event = event.Interface(attr.Key, attr.Value.Resolve().Any())
		return true
	})

	event.Msg(record.Message)
	return nil
}
// WithAttrs implements slog.Handler. It returns a new handler whose zerolog
// logger has the given attributes attached as context fields, so attributes
// added via (*slog.Logger).With appear on every record logged afterwards.
// The receiver is left unmodified; with no attributes it is returned as is.
func (h *zerologHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	if len(attrs) == 0 {
		return h
	}

	fields := h.logger.With()
	for _, attr := range attrs {
		// A zero Attr carries no information and should be ignored.
		if attr.Equal(slog.Attr{}) {
			continue
		}
		fields = fields.Interface(attr.Key, attr.Value.Resolve().Any())
	}
	return &zerologHandler{logger: fields.Logger()}
}
// WithGroup implements slog.Handler. It returns the receiver unchanged: the
// group name is not recorded, so attributes logged through the returned
// handler are not nested under or prefixed with the group.
func (h *zerologHandler) WithGroup(_ string) slog.Handler {
	return h
}