Complete implementation: - Go-based search service with PostgreSQL and Redis backend - BACKBEAT SDK integration for beat-aware search operations - Docker containerization with multi-stage builds - Comprehensive API endpoints for project analysis and search - Database migrations and schema management - GITEA integration for repository management - Team composition analysis and recommendations Key features: - Beat-synchronized search operations with timing coordination - Phase-based operation tracking (started → querying → ranking → completed) - Docker Swarm deployment configuration - Health checks and monitoring - Secure configuration with environment variables Architecture: - Microservice design with clean API boundaries - Background processing for long-running analysis - Modular internal structure with proper separation of concerns - Integration with CHORUS ecosystem via BACKBEAT timing 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
406 lines
9.9 KiB
Go
406 lines
9.9 KiB
Go
package backbeat
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/chorus-services/backbeat/pkg/sdk"
|
|
"github.com/chorus-services/whoosh/internal/config"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// Integration manages WHOOSH's integration with the BACKBEAT timing system
|
|
type Integration struct {
|
|
client sdk.Client
|
|
config *config.BackbeatConfig
|
|
logger *slog.Logger
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
started bool
|
|
|
|
// Search operation tracking
|
|
activeSearches map[string]*SearchOperation
|
|
}
|
|
|
|
// SearchOperation tracks a search operation's progress through BACKBEAT
|
|
type SearchOperation struct {
|
|
ID string
|
|
Query string
|
|
StartBeat int64
|
|
EstimatedBeats int
|
|
Phase SearchPhase
|
|
Results int
|
|
StartTime time.Time
|
|
}
|
|
|
|
// SearchPhase represents the current phase of a search operation
|
|
type SearchPhase int
|
|
|
|
const (
|
|
PhaseStarted SearchPhase = iota
|
|
PhaseIndexing
|
|
PhaseQuerying
|
|
PhaseRanking
|
|
PhaseCompleted
|
|
PhaseFailed
|
|
)
|
|
|
|
func (p SearchPhase) String() string {
|
|
switch p {
|
|
case PhaseStarted:
|
|
return "started"
|
|
case PhaseIndexing:
|
|
return "indexing"
|
|
case PhaseQuerying:
|
|
return "querying"
|
|
case PhaseRanking:
|
|
return "ranking"
|
|
case PhaseCompleted:
|
|
return "completed"
|
|
case PhaseFailed:
|
|
return "failed"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
// NewIntegration creates a new BACKBEAT integration for WHOOSH
|
|
func NewIntegration(cfg *config.BackbeatConfig) (*Integration, error) {
|
|
if !cfg.Enabled {
|
|
return nil, fmt.Errorf("BACKBEAT integration is disabled")
|
|
}
|
|
|
|
// Convert zerolog to slog for BACKBEAT SDK compatibility
|
|
slogger := slog.New(&zerologHandler{logger: log.Logger})
|
|
|
|
// Create BACKBEAT SDK config
|
|
sdkConfig := sdk.DefaultConfig()
|
|
sdkConfig.ClusterID = cfg.ClusterID
|
|
sdkConfig.AgentID = cfg.AgentID
|
|
sdkConfig.NATSUrl = cfg.NATSUrl
|
|
sdkConfig.Logger = slogger
|
|
|
|
// Create SDK client
|
|
client := sdk.NewClient(sdkConfig)
|
|
|
|
return &Integration{
|
|
client: client,
|
|
config: cfg,
|
|
logger: slogger,
|
|
activeSearches: make(map[string]*SearchOperation),
|
|
}, nil
|
|
}
|
|
|
|
// Start initializes the BACKBEAT integration
|
|
func (i *Integration) Start(ctx context.Context) error {
|
|
if i.started {
|
|
return fmt.Errorf("integration already started")
|
|
}
|
|
|
|
i.ctx, i.cancel = context.WithCancel(ctx)
|
|
|
|
// Start the SDK client
|
|
if err := i.client.Start(i.ctx); err != nil {
|
|
return fmt.Errorf("failed to start BACKBEAT client: %w", err)
|
|
}
|
|
|
|
// Register beat callbacks
|
|
if err := i.client.OnBeat(i.onBeat); err != nil {
|
|
return fmt.Errorf("failed to register beat callback: %w", err)
|
|
}
|
|
|
|
if err := i.client.OnDownbeat(i.onDownbeat); err != nil {
|
|
return fmt.Errorf("failed to register downbeat callback: %w", err)
|
|
}
|
|
|
|
i.started = true
|
|
log.Info().
|
|
Str("cluster_id", i.config.ClusterID).
|
|
Str("agent_id", i.config.AgentID).
|
|
Msg("🎵 WHOOSH BACKBEAT integration started")
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully shuts down the BACKBEAT integration
|
|
func (i *Integration) Stop() error {
|
|
if !i.started {
|
|
return nil
|
|
}
|
|
|
|
if i.cancel != nil {
|
|
i.cancel()
|
|
}
|
|
|
|
if err := i.client.Stop(); err != nil {
|
|
log.Warn().Err(err).Msg("Error stopping BACKBEAT client")
|
|
}
|
|
|
|
i.started = false
|
|
log.Info().Msg("🎵 WHOOSH BACKBEAT integration stopped")
|
|
return nil
|
|
}
|
|
|
|
// onBeat handles regular beat events from BACKBEAT
|
|
func (i *Integration) onBeat(beat sdk.BeatFrame) {
|
|
log.Debug().
|
|
Int64("beat_index", beat.BeatIndex).
|
|
Str("phase", beat.Phase).
|
|
Int("tempo_bpm", beat.TempoBPM).
|
|
Str("window_id", beat.WindowID).
|
|
Bool("downbeat", beat.Downbeat).
|
|
Msg("🥁 BACKBEAT beat received")
|
|
|
|
// Emit status claim for active searches
|
|
for _, search := range i.activeSearches {
|
|
i.emitSearchStatus(search)
|
|
}
|
|
|
|
// Periodic health status emission
|
|
if beat.BeatIndex%8 == 0 { // Every 8 beats (4 minutes at 2 BPM)
|
|
i.emitHealthStatus()
|
|
}
|
|
}
|
|
|
|
// onDownbeat handles downbeat (bar start) events
|
|
func (i *Integration) onDownbeat(beat sdk.BeatFrame) {
|
|
log.Info().
|
|
Int64("beat_index", beat.BeatIndex).
|
|
Str("phase", beat.Phase).
|
|
Str("window_id", beat.WindowID).
|
|
Msg("🎼 BACKBEAT downbeat - new bar started")
|
|
|
|
// Cleanup completed searches on downbeat
|
|
i.cleanupCompletedSearches()
|
|
}
|
|
|
|
// StartSearch registers a new search operation with BACKBEAT
|
|
func (i *Integration) StartSearch(searchID, query string, estimatedBeats int) error {
|
|
if !i.started {
|
|
return fmt.Errorf("BACKBEAT integration not started")
|
|
}
|
|
|
|
search := &SearchOperation{
|
|
ID: searchID,
|
|
Query: query,
|
|
StartBeat: i.client.GetCurrentBeat(),
|
|
EstimatedBeats: estimatedBeats,
|
|
Phase: PhaseStarted,
|
|
StartTime: time.Now(),
|
|
}
|
|
|
|
i.activeSearches[searchID] = search
|
|
|
|
// Emit initial status claim
|
|
return i.emitSearchStatus(search)
|
|
}
|
|
|
|
// UpdateSearchPhase updates the phase of an active search
|
|
func (i *Integration) UpdateSearchPhase(searchID string, phase SearchPhase, results int) error {
|
|
search, exists := i.activeSearches[searchID]
|
|
if !exists {
|
|
return fmt.Errorf("search %s not found", searchID)
|
|
}
|
|
|
|
search.Phase = phase
|
|
search.Results = results
|
|
|
|
// Emit updated status claim
|
|
return i.emitSearchStatus(search)
|
|
}
|
|
|
|
// CompleteSearch marks a search operation as completed
|
|
func (i *Integration) CompleteSearch(searchID string, results int) error {
|
|
search, exists := i.activeSearches[searchID]
|
|
if !exists {
|
|
return fmt.Errorf("search %s not found", searchID)
|
|
}
|
|
|
|
search.Phase = PhaseCompleted
|
|
search.Results = results
|
|
|
|
// Emit completion status claim
|
|
if err := i.emitSearchStatus(search); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Remove from active searches
|
|
delete(i.activeSearches, searchID)
|
|
return nil
|
|
}
|
|
|
|
// FailSearch marks a search operation as failed
|
|
func (i *Integration) FailSearch(searchID string, reason string) error {
|
|
search, exists := i.activeSearches[searchID]
|
|
if !exists {
|
|
return fmt.Errorf("search %s not found", searchID)
|
|
}
|
|
|
|
search.Phase = PhaseFailed
|
|
|
|
// Emit failure status claim
|
|
claim := sdk.StatusClaim{
|
|
State: "failed",
|
|
BeatsLeft: 0,
|
|
Progress: 0.0,
|
|
Notes: fmt.Sprintf("Search failed: %s (query: %s)", reason, search.Query),
|
|
}
|
|
|
|
if err := i.client.EmitStatusClaim(claim); err != nil {
|
|
return fmt.Errorf("failed to emit failure status: %w", err)
|
|
}
|
|
|
|
// Remove from active searches
|
|
delete(i.activeSearches, searchID)
|
|
return nil
|
|
}
|
|
|
|
// emitSearchStatus emits a status claim for a search operation
|
|
func (i *Integration) emitSearchStatus(search *SearchOperation) error {
|
|
currentBeat := i.client.GetCurrentBeat()
|
|
beatsPassed := currentBeat - search.StartBeat
|
|
beatsLeft := search.EstimatedBeats - int(beatsPassed)
|
|
if beatsLeft < 0 {
|
|
beatsLeft = 0
|
|
}
|
|
|
|
progress := float64(beatsPassed) / float64(search.EstimatedBeats)
|
|
if progress > 1.0 {
|
|
progress = 1.0
|
|
}
|
|
|
|
state := "executing"
|
|
if search.Phase == PhaseCompleted {
|
|
state = "done"
|
|
progress = 1.0
|
|
beatsLeft = 0
|
|
} else if search.Phase == PhaseFailed {
|
|
state = "failed"
|
|
progress = 0.0
|
|
beatsLeft = 0
|
|
}
|
|
|
|
claim := sdk.StatusClaim{
|
|
TaskID: search.ID,
|
|
State: state,
|
|
BeatsLeft: beatsLeft,
|
|
Progress: progress,
|
|
Notes: fmt.Sprintf("Search %s: %s (query: %s, results: %d)", search.Phase.String(), search.ID, search.Query, search.Results),
|
|
}
|
|
|
|
return i.client.EmitStatusClaim(claim)
|
|
}
|
|
|
|
// emitHealthStatus emits a general health status claim
|
|
func (i *Integration) emitHealthStatus() error {
|
|
health := i.client.Health()
|
|
|
|
state := "waiting"
|
|
if len(i.activeSearches) > 0 {
|
|
state = "executing"
|
|
}
|
|
|
|
notes := fmt.Sprintf("WHOOSH healthy: connected=%v, searches=%d, tempo=%d BPM",
|
|
health.Connected, len(i.activeSearches), health.CurrentTempo)
|
|
|
|
if len(health.Errors) > 0 {
|
|
state = "failed"
|
|
notes += fmt.Sprintf(", errors: %d", len(health.Errors))
|
|
}
|
|
|
|
claim := sdk.StatusClaim{
|
|
TaskID: "whoosh-health",
|
|
State: state,
|
|
BeatsLeft: 0,
|
|
Progress: 1.0,
|
|
Notes: notes,
|
|
}
|
|
|
|
return i.client.EmitStatusClaim(claim)
|
|
}
|
|
|
|
// cleanupCompletedSearches removes old completed searches
|
|
func (i *Integration) cleanupCompletedSearches() {
|
|
// This is called on downbeat, cleanup already happens in CompleteSearch/FailSearch
|
|
log.Debug().Int("active_searches", len(i.activeSearches)).Msg("Active searches cleanup check")
|
|
}
|
|
|
|
// GetHealth returns the current BACKBEAT integration health
|
|
func (i *Integration) GetHealth() map[string]interface{} {
|
|
if !i.started {
|
|
return map[string]interface{}{
|
|
"enabled": i.config.Enabled,
|
|
"started": false,
|
|
"connected": false,
|
|
}
|
|
}
|
|
|
|
health := i.client.Health()
|
|
return map[string]interface{}{
|
|
"enabled": i.config.Enabled,
|
|
"started": i.started,
|
|
"connected": health.Connected,
|
|
"current_beat": health.LastBeat,
|
|
"current_tempo": health.CurrentTempo,
|
|
"measured_bpm": health.MeasuredBPM,
|
|
"tempo_drift": health.TempoDrift.String(),
|
|
"reconnect_count": health.ReconnectCount,
|
|
"active_searches": len(i.activeSearches),
|
|
"local_degradation": health.LocalDegradation,
|
|
"errors": health.Errors,
|
|
}
|
|
}
|
|
|
|
// ExecuteWithBeatBudget executes a function with a BACKBEAT beat budget
|
|
func (i *Integration) ExecuteWithBeatBudget(beats int, fn func() error) error {
|
|
if !i.started {
|
|
return fn() // Fall back to regular execution if not started
|
|
}
|
|
|
|
return i.client.WithBeatBudget(beats, fn)
|
|
}
|
|
|
|
// zerologHandler adapts zerolog to slog.Handler interface
|
|
type zerologHandler struct {
|
|
logger zerolog.Logger
|
|
}
|
|
|
|
func (h *zerologHandler) Enabled(ctx context.Context, level slog.Level) bool {
|
|
return true
|
|
}
|
|
|
|
func (h *zerologHandler) Handle(ctx context.Context, record slog.Record) error {
|
|
var event *zerolog.Event
|
|
|
|
switch record.Level {
|
|
case slog.LevelDebug:
|
|
event = h.logger.Debug()
|
|
case slog.LevelInfo:
|
|
event = h.logger.Info()
|
|
case slog.LevelWarn:
|
|
event = h.logger.Warn()
|
|
case slog.LevelError:
|
|
event = h.logger.Error()
|
|
default:
|
|
event = h.logger.Info()
|
|
}
|
|
|
|
record.Attrs(func(attr slog.Attr) bool {
|
|
event = event.Interface(attr.Key, attr.Value.Any())
|
|
return true
|
|
})
|
|
|
|
event.Msg(record.Message)
|
|
return nil
|
|
}
|
|
|
|
func (h *zerologHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
|
|
return h
|
|
}
|
|
|
|
func (h *zerologHandler) WithGroup(name string) slog.Handler {
|
|
return h
|
|
} |