 33676bae6d
			
		
	
	33676bae6d
	
	
	
		
			
			Complete implementation: - Go-based search service with PostgreSQL and Redis backend - BACKBEAT SDK integration for beat-aware search operations - Docker containerization with multi-stage builds - Comprehensive API endpoints for project analysis and search - Database migrations and schema management - GITEA integration for repository management - Team composition analysis and recommendations Key features: - Beat-synchronized search operations with timing coordination - Phase-based operation tracking (started → querying → ranking → completed) - Docker Swarm deployment configuration - Health checks and monitoring - Secure configuration with environment variables Architecture: - Microservice design with clean API boundaries - Background processing for long-running analysis - Modular internal structure with proper separation of concerns - Integration with CHORUS ecosystem via BACKBEAT timing 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			406 lines
		
	
	
		
			9.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			406 lines
		
	
	
		
			9.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package backbeat
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"log/slog"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/chorus-services/backbeat/pkg/sdk"
 | |
| 	"github.com/chorus-services/whoosh/internal/config"
 | |
| 	"github.com/rs/zerolog"
 | |
| 	"github.com/rs/zerolog/log"
 | |
| )
 | |
| 
 | |
| // Integration manages WHOOSH's integration with the BACKBEAT timing system
 | |
| type Integration struct {
 | |
| 	client   sdk.Client
 | |
| 	config   *config.BackbeatConfig
 | |
| 	logger   *slog.Logger
 | |
| 	ctx      context.Context
 | |
| 	cancel   context.CancelFunc
 | |
| 	started  bool
 | |
| 	
 | |
| 	// Search operation tracking
 | |
| 	activeSearches map[string]*SearchOperation
 | |
| }
 | |
| 
 | |
| // SearchOperation tracks a search operation's progress through BACKBEAT
 | |
| type SearchOperation struct {
 | |
| 	ID          string
 | |
| 	Query       string
 | |
| 	StartBeat   int64
 | |
| 	EstimatedBeats int
 | |
| 	Phase       SearchPhase
 | |
| 	Results     int
 | |
| 	StartTime   time.Time
 | |
| }
 | |
| 
 | |
| // SearchPhase represents the current phase of a search operation
 | |
| type SearchPhase int
 | |
| 
 | |
| const (
 | |
| 	PhaseStarted SearchPhase = iota
 | |
| 	PhaseIndexing
 | |
| 	PhaseQuerying
 | |
| 	PhaseRanking
 | |
| 	PhaseCompleted
 | |
| 	PhaseFailed
 | |
| )
 | |
| 
 | |
| func (p SearchPhase) String() string {
 | |
| 	switch p {
 | |
| 	case PhaseStarted:
 | |
| 		return "started"
 | |
| 	case PhaseIndexing:
 | |
| 		return "indexing"
 | |
| 	case PhaseQuerying:
 | |
| 		return "querying"
 | |
| 	case PhaseRanking:
 | |
| 		return "ranking"
 | |
| 	case PhaseCompleted:
 | |
| 		return "completed"
 | |
| 	case PhaseFailed:
 | |
| 		return "failed"
 | |
| 	default:
 | |
| 		return "unknown"
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NewIntegration creates a new BACKBEAT integration for WHOOSH
 | |
| func NewIntegration(cfg *config.BackbeatConfig) (*Integration, error) {
 | |
| 	if !cfg.Enabled {
 | |
| 		return nil, fmt.Errorf("BACKBEAT integration is disabled")
 | |
| 	}
 | |
| 
 | |
| 	// Convert zerolog to slog for BACKBEAT SDK compatibility
 | |
| 	slogger := slog.New(&zerologHandler{logger: log.Logger})
 | |
| 
 | |
| 	// Create BACKBEAT SDK config
 | |
| 	sdkConfig := sdk.DefaultConfig()
 | |
| 	sdkConfig.ClusterID = cfg.ClusterID
 | |
| 	sdkConfig.AgentID = cfg.AgentID
 | |
| 	sdkConfig.NATSUrl = cfg.NATSUrl
 | |
| 	sdkConfig.Logger = slogger
 | |
| 
 | |
| 	// Create SDK client
 | |
| 	client := sdk.NewClient(sdkConfig)
 | |
| 
 | |
| 	return &Integration{
 | |
| 		client:         client,
 | |
| 		config:         cfg,
 | |
| 		logger:         slogger,
 | |
| 		activeSearches: make(map[string]*SearchOperation),
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // Start initializes the BACKBEAT integration
 | |
| func (i *Integration) Start(ctx context.Context) error {
 | |
| 	if i.started {
 | |
| 		return fmt.Errorf("integration already started")
 | |
| 	}
 | |
| 
 | |
| 	i.ctx, i.cancel = context.WithCancel(ctx)
 | |
| 
 | |
| 	// Start the SDK client
 | |
| 	if err := i.client.Start(i.ctx); err != nil {
 | |
| 		return fmt.Errorf("failed to start BACKBEAT client: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Register beat callbacks
 | |
| 	if err := i.client.OnBeat(i.onBeat); err != nil {
 | |
| 		return fmt.Errorf("failed to register beat callback: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	if err := i.client.OnDownbeat(i.onDownbeat); err != nil {
 | |
| 		return fmt.Errorf("failed to register downbeat callback: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	i.started = true
 | |
| 	log.Info().
 | |
| 		Str("cluster_id", i.config.ClusterID).
 | |
| 		Str("agent_id", i.config.AgentID).
 | |
| 		Msg("🎵 WHOOSH BACKBEAT integration started")
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Stop gracefully shuts down the BACKBEAT integration
 | |
| func (i *Integration) Stop() error {
 | |
| 	if !i.started {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if i.cancel != nil {
 | |
| 		i.cancel()
 | |
| 	}
 | |
| 
 | |
| 	if err := i.client.Stop(); err != nil {
 | |
| 		log.Warn().Err(err).Msg("Error stopping BACKBEAT client")
 | |
| 	}
 | |
| 
 | |
| 	i.started = false
 | |
| 	log.Info().Msg("🎵 WHOOSH BACKBEAT integration stopped")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // onBeat handles regular beat events from BACKBEAT
 | |
| func (i *Integration) onBeat(beat sdk.BeatFrame) {
 | |
| 	log.Debug().
 | |
| 		Int64("beat_index", beat.BeatIndex).
 | |
| 		Str("phase", beat.Phase).
 | |
| 		Int("tempo_bpm", beat.TempoBPM).
 | |
| 		Str("window_id", beat.WindowID).
 | |
| 		Bool("downbeat", beat.Downbeat).
 | |
| 		Msg("🥁 BACKBEAT beat received")
 | |
| 
 | |
| 	// Emit status claim for active searches
 | |
| 	for _, search := range i.activeSearches {
 | |
| 		i.emitSearchStatus(search)
 | |
| 	}
 | |
| 
 | |
| 	// Periodic health status emission
 | |
| 	if beat.BeatIndex%8 == 0 { // Every 8 beats (4 minutes at 2 BPM)
 | |
| 		i.emitHealthStatus()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // onDownbeat handles downbeat (bar start) events
 | |
| func (i *Integration) onDownbeat(beat sdk.BeatFrame) {
 | |
| 	log.Info().
 | |
| 		Int64("beat_index", beat.BeatIndex).
 | |
| 		Str("phase", beat.Phase).
 | |
| 		Str("window_id", beat.WindowID).
 | |
| 		Msg("🎼 BACKBEAT downbeat - new bar started")
 | |
| 
 | |
| 	// Cleanup completed searches on downbeat
 | |
| 	i.cleanupCompletedSearches()
 | |
| }
 | |
| 
 | |
| // StartSearch registers a new search operation with BACKBEAT
 | |
| func (i *Integration) StartSearch(searchID, query string, estimatedBeats int) error {
 | |
| 	if !i.started {
 | |
| 		return fmt.Errorf("BACKBEAT integration not started")
 | |
| 	}
 | |
| 
 | |
| 	search := &SearchOperation{
 | |
| 		ID:             searchID,
 | |
| 		Query:          query,
 | |
| 		StartBeat:      i.client.GetCurrentBeat(),
 | |
| 		EstimatedBeats: estimatedBeats,
 | |
| 		Phase:          PhaseStarted,
 | |
| 		StartTime:      time.Now(),
 | |
| 	}
 | |
| 
 | |
| 	i.activeSearches[searchID] = search
 | |
| 
 | |
| 	// Emit initial status claim
 | |
| 	return i.emitSearchStatus(search)
 | |
| }
 | |
| 
 | |
| // UpdateSearchPhase updates the phase of an active search
 | |
| func (i *Integration) UpdateSearchPhase(searchID string, phase SearchPhase, results int) error {
 | |
| 	search, exists := i.activeSearches[searchID]
 | |
| 	if !exists {
 | |
| 		return fmt.Errorf("search %s not found", searchID)
 | |
| 	}
 | |
| 
 | |
| 	search.Phase = phase
 | |
| 	search.Results = results
 | |
| 
 | |
| 	// Emit updated status claim
 | |
| 	return i.emitSearchStatus(search)
 | |
| }
 | |
| 
 | |
| // CompleteSearch marks a search operation as completed
 | |
| func (i *Integration) CompleteSearch(searchID string, results int) error {
 | |
| 	search, exists := i.activeSearches[searchID]
 | |
| 	if !exists {
 | |
| 		return fmt.Errorf("search %s not found", searchID)
 | |
| 	}
 | |
| 
 | |
| 	search.Phase = PhaseCompleted
 | |
| 	search.Results = results
 | |
| 
 | |
| 	// Emit completion status claim
 | |
| 	if err := i.emitSearchStatus(search); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Remove from active searches
 | |
| 	delete(i.activeSearches, searchID)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // FailSearch marks a search operation as failed
 | |
| func (i *Integration) FailSearch(searchID string, reason string) error {
 | |
| 	search, exists := i.activeSearches[searchID]
 | |
| 	if !exists {
 | |
| 		return fmt.Errorf("search %s not found", searchID)
 | |
| 	}
 | |
| 
 | |
| 	search.Phase = PhaseFailed
 | |
| 
 | |
| 	// Emit failure status claim
 | |
| 	claim := sdk.StatusClaim{
 | |
| 		State:     "failed",
 | |
| 		BeatsLeft: 0,
 | |
| 		Progress:  0.0,
 | |
| 		Notes:     fmt.Sprintf("Search failed: %s (query: %s)", reason, search.Query),
 | |
| 	}
 | |
| 
 | |
| 	if err := i.client.EmitStatusClaim(claim); err != nil {
 | |
| 		return fmt.Errorf("failed to emit failure status: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Remove from active searches
 | |
| 	delete(i.activeSearches, searchID)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // emitSearchStatus emits a status claim for a search operation
 | |
| func (i *Integration) emitSearchStatus(search *SearchOperation) error {
 | |
| 	currentBeat := i.client.GetCurrentBeat()
 | |
| 	beatsPassed := currentBeat - search.StartBeat
 | |
| 	beatsLeft := search.EstimatedBeats - int(beatsPassed)
 | |
| 	if beatsLeft < 0 {
 | |
| 		beatsLeft = 0
 | |
| 	}
 | |
| 
 | |
| 	progress := float64(beatsPassed) / float64(search.EstimatedBeats)
 | |
| 	if progress > 1.0 {
 | |
| 		progress = 1.0
 | |
| 	}
 | |
| 
 | |
| 	state := "executing"
 | |
| 	if search.Phase == PhaseCompleted {
 | |
| 		state = "done"
 | |
| 		progress = 1.0
 | |
| 		beatsLeft = 0
 | |
| 	} else if search.Phase == PhaseFailed {
 | |
| 		state = "failed"
 | |
| 		progress = 0.0
 | |
| 		beatsLeft = 0
 | |
| 	}
 | |
| 
 | |
| 	claim := sdk.StatusClaim{
 | |
| 		TaskID:    search.ID,
 | |
| 		State:     state,
 | |
| 		BeatsLeft: beatsLeft,
 | |
| 		Progress:  progress,
 | |
| 		Notes:     fmt.Sprintf("Search %s: %s (query: %s, results: %d)", search.Phase.String(), search.ID, search.Query, search.Results),
 | |
| 	}
 | |
| 
 | |
| 	return i.client.EmitStatusClaim(claim)
 | |
| }
 | |
| 
 | |
| // emitHealthStatus emits a general health status claim
 | |
| func (i *Integration) emitHealthStatus() error {
 | |
| 	health := i.client.Health()
 | |
| 	
 | |
| 	state := "waiting"
 | |
| 	if len(i.activeSearches) > 0 {
 | |
| 		state = "executing"
 | |
| 	}
 | |
| 
 | |
| 	notes := fmt.Sprintf("WHOOSH healthy: connected=%v, searches=%d, tempo=%d BPM", 
 | |
| 		health.Connected, len(i.activeSearches), health.CurrentTempo)
 | |
| 	
 | |
| 	if len(health.Errors) > 0 {
 | |
| 		state = "failed"
 | |
| 		notes += fmt.Sprintf(", errors: %d", len(health.Errors))
 | |
| 	}
 | |
| 
 | |
| 	claim := sdk.StatusClaim{
 | |
| 		TaskID:    "whoosh-health",
 | |
| 		State:     state,
 | |
| 		BeatsLeft: 0,
 | |
| 		Progress:  1.0,
 | |
| 		Notes:     notes,
 | |
| 	}
 | |
| 
 | |
| 	return i.client.EmitStatusClaim(claim)
 | |
| }
 | |
| 
 | |
| // cleanupCompletedSearches removes old completed searches
 | |
| func (i *Integration) cleanupCompletedSearches() {
 | |
| 	// This is called on downbeat, cleanup already happens in CompleteSearch/FailSearch
 | |
| 	log.Debug().Int("active_searches", len(i.activeSearches)).Msg("Active searches cleanup check")
 | |
| }
 | |
| 
 | |
| // GetHealth returns the current BACKBEAT integration health
 | |
| func (i *Integration) GetHealth() map[string]interface{} {
 | |
| 	if !i.started {
 | |
| 		return map[string]interface{}{
 | |
| 			"enabled":   i.config.Enabled,
 | |
| 			"started":   false,
 | |
| 			"connected": false,
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	health := i.client.Health()
 | |
| 	return map[string]interface{}{
 | |
| 		"enabled":          i.config.Enabled,
 | |
| 		"started":          i.started,
 | |
| 		"connected":        health.Connected,
 | |
| 		"current_beat":     health.LastBeat,
 | |
| 		"current_tempo":    health.CurrentTempo,
 | |
| 		"measured_bpm":     health.MeasuredBPM,
 | |
| 		"tempo_drift":      health.TempoDrift.String(),
 | |
| 		"reconnect_count":  health.ReconnectCount,
 | |
| 		"active_searches":  len(i.activeSearches),
 | |
| 		"local_degradation": health.LocalDegradation,
 | |
| 		"errors":           health.Errors,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // ExecuteWithBeatBudget executes a function with a BACKBEAT beat budget
 | |
| func (i *Integration) ExecuteWithBeatBudget(beats int, fn func() error) error {
 | |
| 	if !i.started {
 | |
| 		return fn() // Fall back to regular execution if not started
 | |
| 	}
 | |
| 
 | |
| 	return i.client.WithBeatBudget(beats, fn)
 | |
| }
 | |
| 
 | |
| // zerologHandler adapts zerolog to slog.Handler interface
 | |
| type zerologHandler struct {
 | |
| 	logger zerolog.Logger
 | |
| }
 | |
| 
 | |
| func (h *zerologHandler) Enabled(ctx context.Context, level slog.Level) bool {
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (h *zerologHandler) Handle(ctx context.Context, record slog.Record) error {
 | |
| 	var event *zerolog.Event
 | |
| 	
 | |
| 	switch record.Level {
 | |
| 	case slog.LevelDebug:
 | |
| 		event = h.logger.Debug()
 | |
| 	case slog.LevelInfo:
 | |
| 		event = h.logger.Info()
 | |
| 	case slog.LevelWarn:
 | |
| 		event = h.logger.Warn()
 | |
| 	case slog.LevelError:
 | |
| 		event = h.logger.Error()
 | |
| 	default:
 | |
| 		event = h.logger.Info()
 | |
| 	}
 | |
| 
 | |
| 	record.Attrs(func(attr slog.Attr) bool {
 | |
| 		event = event.Interface(attr.Key, attr.Value.Any())
 | |
| 		return true
 | |
| 	})
 | |
| 
 | |
| 	event.Msg(record.Message)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (h *zerologHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
 | |
| 	return h
 | |
| }
 | |
| 
 | |
| func (h *zerologHandler) WithGroup(name string) slog.Handler {
 | |
| 	return h
 | |
| } |