// Package backbeat integrates CHORUS with the BACKBEAT timing system.
package backbeat

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	"chorus/pkg/config"
	"github.com/chorus-services/backbeat/pkg/sdk"
)

// Integration manages CHORUS's integration with the BACKBEAT timing system.
type Integration struct {
	client  sdk.Client
	config  *BackbeatConfig
	logger  Logger
	ctx     context.Context
	cancel  context.CancelFunc
	started bool
	nodeID  string

	// P2P operation tracking.
	//
	// mu guards activeOperations and the mutable fields (Phase, PeerCount) of
	// the operations stored in it. The map is touched both by the beat
	// callbacks and by the exported *P2POperation methods.
	// NOTE(review): the callbacks are presumably invoked from a goroutine
	// owned by the SDK client — confirm against the SDK.
	mu               sync.Mutex
	activeOperations map[string]*P2POperation
}

// BackbeatConfig holds BACKBEAT-specific configuration.
type BackbeatConfig struct {
	Enabled   bool
	ClusterID string
	AgentID   string
	NATSUrl   string
}

// Logger is the logging interface the integration uses to report through
// CHORUS logging. Call sites in this file pass printf-style format strings
// followed by their arguments.
type Logger interface {
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

// P2POperation tracks a P2P coordination operation's progress through BACKBEAT.
type P2POperation struct {
	ID             string
	Type           string // "election", "dht_store", "pubsub_sync", "peer_discovery"
	StartBeat      int64
	EstimatedBeats int
	Phase          OperationPhase
	PeerCount      int
	StartTime      time.Time
	Data           any
}

// OperationPhase represents the current phase of a P2P operation.
type OperationPhase int

// Phases of a P2P operation. PhaseCompleted and PhaseFailed are terminal.
const (
	PhaseStarted OperationPhase = iota
	PhaseConnecting
	PhaseNegotiating
	PhaseExecuting
	PhaseCompleted
	PhaseFailed
)

// String returns the lowercase name of the phase, or "unknown" for a value
// outside the declared constants.
func (p OperationPhase) String() string {
	switch p {
	case PhaseStarted:
		return "started"
	case PhaseConnecting:
		return "connecting"
	case PhaseNegotiating:
		return "negotiating"
	case PhaseExecuting:
		return "executing"
	case PhaseCompleted:
		return "completed"
	case PhaseFailed:
		return "failed"
	default:
		return "unknown"
	}
}

// NewIntegration creates a new BACKBEAT integration for CHORUS.
//
// It returns an error when cfg is nil or when the integration is disabled
// (see extractBackbeatConfig). The returned Integration is not started; call
// Start to connect the SDK client.
func NewIntegration(cfg *config.Config, nodeID string, logger Logger) (*Integration, error) {
	// extractBackbeatConfig dereferences cfg; fail cleanly instead of panicking.
	if cfg == nil {
		return nil, fmt.Errorf("BACKBEAT integration requires a non-nil CHORUS config")
	}

	backbeatCfg := extractBackbeatConfig(cfg)
	if !backbeatCfg.Enabled {
		return nil, fmt.Errorf("BACKBEAT integration is disabled")
	}

	// Create BACKBEAT SDK config with the default slog logger.
	sdkConfig := sdk.DefaultConfig()
	sdkConfig.ClusterID = backbeatCfg.ClusterID
	sdkConfig.AgentID = backbeatCfg.AgentID
	sdkConfig.NATSUrl = backbeatCfg.NATSUrl
	sdkConfig.Logger = slog.Default()

	return &Integration{
		client:           sdk.NewClient(sdkConfig),
		config:           backbeatCfg,
		logger:           logger,
		nodeID:           nodeID,
		activeOperations: make(map[string]*P2POperation),
	}, nil
}

// extractBackbeatConfig builds the BACKBEAT configuration from CHORUS_BACKBEAT_*
// environment variables. The only value taken from cfg is the agent ID, which
// forms the default AgentID ("chorus-<agent id>"). cfg must be non-nil.
func extractBackbeatConfig(cfg *config.Config) *BackbeatConfig {
	return &BackbeatConfig{
		Enabled:   getEnvBool("CHORUS_BACKBEAT_ENABLED", true),
		ClusterID: getEnv("CHORUS_BACKBEAT_CLUSTER_ID", "chorus-production"),
		AgentID:   getEnv("CHORUS_BACKBEAT_AGENT_ID", fmt.Sprintf("chorus-%s", cfg.Agent.ID)),
		NATSUrl:   getEnv("CHORUS_BACKBEAT_NATS_URL", "nats://backbeat-nats:4222"),
	}
}

// Start initializes the BACKBEAT integration: it starts the SDK client with a
// context derived from ctx and registers the beat and downbeat callbacks.
//
// It returns an error if the integration is already started, if the client
// fails to start, or if a callback cannot be registered. On failure the
// derived context is cancelled and, if the client had already been started,
// the client is stopped again so nothing is left running.
func (i *Integration) Start(ctx context.Context) error {
	if i.started {
		return fmt.Errorf("integration already started")
	}

	runCtx, cancel := context.WithCancel(ctx)
	i.ctx, i.cancel = runCtx, cancel
	// abort releases the derived context when start-up does not complete.
	abort := func() {
		cancel()
		i.ctx, i.cancel = nil, nil
	}

	// Start the SDK client.
	if err := i.client.Start(runCtx); err != nil {
		abort()
		return fmt.Errorf("failed to start BACKBEAT client: %w", err)
	}

	// Register beat callbacks; undo the client start if registration fails.
	if err := i.registerCallbacks(); err != nil {
		abort()
		if stopErr := i.client.Stop(); stopErr != nil {
			i.logger.Warn("⚠️ Error stopping BACKBEAT client: %v", stopErr)
		}
		return err
	}

	i.started = true
	i.logger.Info("🎵 CHORUS BACKBEAT integration started - cluster=%s agent=%s",
		i.config.ClusterID, i.config.AgentID)
	return nil
}

// registerCallbacks hooks onBeat and onDownbeat into the SDK client.
func (i *Integration) registerCallbacks() error {
	if err := i.client.OnBeat(i.onBeat); err != nil {
		return fmt.Errorf("failed to register beat callback: %w", err)
	}
	if err := i.client.OnDownbeat(i.onDownbeat); err != nil {
		return fmt.Errorf("failed to register downbeat callback: %w", err)
	}
	return nil
}

// Stop gracefully shuts down the BACKBEAT integration. It is a no-op when the
// integration is not started. An error from stopping the SDK client is logged
// as a warning rather than returned (best-effort shutdown), so Stop always
// returns nil.
func (i *Integration) Stop() error {
	if !i.started {
		return nil
	}

	if i.cancel != nil {
		i.cancel()
	}

	if err := i.client.Stop(); err != nil {
		i.logger.Warn("⚠️ Error stopping BACKBEAT client: %v", err)
	}

	i.started = false
	i.logger.Info("🎵 CHORUS BACKBEAT integration stopped")
	return nil
}

// onBeat handles regular beat events from BACKBEAT: it emits a status claim
// for every tracked operation and, every 8th beat, a health status claim.
// Emission failures are logged as warnings; a callback has no caller to
// return them to.
func (i *Integration) onBeat(beat sdk.BeatFrame) {
	i.logger.Info("🥁 BACKBEAT beat received - beat=%d phase=%s tempo=%d window=%s",
		beat.BeatIndex, beat.Phase, beat.TempoBPM, beat.WindowID)

	// Emit status claims from snapshots so the lock is not held while
	// talking to the SDK client.
	ops := i.snapshotOperations()
	for idx := range ops {
		if err := i.emitOperationStatus(&ops[idx]); err != nil {
			i.logger.Warn("⚠️ Failed to emit status for operation %s: %v", ops[idx].ID, err)
		}
	}

	// Periodic health status emission.
	if beat.BeatIndex%8 == 0 { // Every 8 beats (4 minutes at 2 BPM)
		if err := i.emitHealthStatus(); err != nil {
			i.logger.Warn("⚠️ Failed to emit health status: %v", err)
		}
	}
}

// onDownbeat handles downbeat (bar start) events by purging finished operations.
func (i *Integration) onDownbeat(beat sdk.BeatFrame) {
	i.logger.Info("🎼 BACKBEAT downbeat - new bar started - beat=%d window=%s",
		beat.BeatIndex, beat.WindowID)

	// Cleanup completed operations on downbeat.
	i.cleanupCompletedOperations()
}

// StartP2POperation registers a new P2P operation with BACKBEAT and emits its
// initial status claim.
//
// The operation is recorded at the client's current beat in PhaseStarted. An
// existing operation with the same operationID is replaced. It returns an
// error if the integration is not started or if the status claim cannot be
// emitted; in the latter case the operation remains registered.
func (i *Integration) StartP2POperation(operationID, operationType string, estimatedBeats int, data any) error {
	if !i.started {
		return fmt.Errorf("BACKBEAT integration not started")
	}

	operation := &P2POperation{
		ID:             operationID,
		Type:           operationType,
		StartBeat:      i.client.GetCurrentBeat(),
		EstimatedBeats: estimatedBeats,
		Phase:          PhaseStarted,
		StartTime:      time.Now(),
		Data:           data,
	}

	i.mu.Lock()
	if i.activeOperations == nil {
		// Writing to a nil map panics; tolerate a zero-value Integration.
		i.activeOperations = make(map[string]*P2POperation)
	}
	i.activeOperations[operationID] = operation
	snapshot := *operation
	i.mu.Unlock()

	// Emit initial status claim.
	return i.emitOperationStatus(&snapshot)
}

// UpdateP2POperationPhase updates the phase and peer count of an active P2P
// operation and emits an updated status claim. It returns an error if the
// operation is not tracked or the claim cannot be emitted.
func (i *Integration) UpdateP2POperationPhase(operationID string, phase OperationPhase, peerCount int) error {
	snapshot, err := i.mutateOperation(operationID, func(op *P2POperation) {
		op.Phase = phase
		op.PeerCount = peerCount
	})
	if err != nil {
		return err
	}

	// Emit updated status claim.
	return i.emitOperationStatus(&snapshot)
}

// CompleteP2POperation marks a P2P operation as completed, emits the
// completion status claim and stops tracking the operation.
//
// If the claim cannot be emitted the error is returned and the operation
// stays tracked in PhaseCompleted, so the completion is re-emitted on
// subsequent beats until the next downbeat purges it.
func (i *Integration) CompleteP2POperation(operationID string, peerCount int) error {
	snapshot, err := i.mutateOperation(operationID, func(op *P2POperation) {
		op.Phase = PhaseCompleted
		op.PeerCount = peerCount
	})
	if err != nil {
		return err
	}

	// Emit completion status claim.
	if err := i.emitOperationStatus(&snapshot); err != nil {
		return err
	}

	// Remove from active operations.
	i.removeOperation(operationID)
	return nil
}

// FailP2POperation marks a P2P operation as failed, emits a failure status
// claim carrying reason, and stops tracking the operation.
//
// If the claim cannot be emitted the wrapped error is returned and the
// operation stays tracked in PhaseFailed until the next downbeat purges it.
func (i *Integration) FailP2POperation(operationID string, reason string) error {
	snapshot, err := i.mutateOperation(operationID, func(op *P2POperation) {
		op.Phase = PhaseFailed
	})
	if err != nil {
		return err
	}

	// Emit failure status claim. TaskID ties the claim to the operation,
	// matching the claims produced by emitOperationStatus.
	claim := sdk.StatusClaim{
		TaskID:    snapshot.ID,
		State:     "failed",
		BeatsLeft: 0,
		Progress:  0.0,
		Notes:     fmt.Sprintf("P2P operation failed: %s (type: %s)", reason, snapshot.Type),
	}
	if err := i.client.EmitStatusClaim(claim); err != nil {
		return fmt.Errorf("failed to emit failure status: %w", err)
	}

	// Remove from active operations.
	i.removeOperation(operationID)
	return nil
}

// mutateOperation applies mutate to the tracked operation with the given ID
// while holding the lock and returns a value copy of the result. It returns
// an error if the operation is not tracked.
func (i *Integration) mutateOperation(operationID string, mutate func(op *P2POperation)) (P2POperation, error) {
	i.mu.Lock()
	defer i.mu.Unlock()

	op, exists := i.activeOperations[operationID]
	if !exists {
		return P2POperation{}, fmt.Errorf("operation %s not found", operationID)
	}
	mutate(op)
	return *op, nil
}

// removeOperation stops tracking the operation with the given ID.
func (i *Integration) removeOperation(operationID string) {
	i.mu.Lock()
	defer i.mu.Unlock()
	delete(i.activeOperations, operationID)
}

// snapshotOperations returns value copies of all tracked operations.
func (i *Integration) snapshotOperations() []P2POperation {
	i.mu.Lock()
	defer i.mu.Unlock()

	ops := make([]P2POperation, 0, len(i.activeOperations))
	for _, op := range i.activeOperations {
		ops = append(ops, *op)
	}
	return ops
}

// activeOperationCount returns the number of tracked operations.
func (i *Integration) activeOperationCount() int {
	i.mu.Lock()
	defer i.mu.Unlock()
	return len(i.activeOperations)
}

// emitOperationStatus emits a status claim for a P2P operation.
//
// State is "done" for PhaseCompleted, "failed" for PhaseFailed and
// "executing" otherwise. For non-terminal phases BeatsLeft and Progress are
// derived from the beats elapsed since operation.StartBeat relative to
// operation.EstimatedBeats (see beatProgress).
func (i *Integration) emitOperationStatus(operation *P2POperation) error {
	beatsPassed := i.client.GetCurrentBeat() - operation.StartBeat
	beatsLeft, progress := beatProgress(beatsPassed, operation.EstimatedBeats)

	state := "executing"
	switch operation.Phase {
	case PhaseCompleted:
		state, progress, beatsLeft = "done", 1.0, 0
	case PhaseFailed:
		state, progress, beatsLeft = "failed", 0.0, 0
	}

	claim := sdk.StatusClaim{
		TaskID:    operation.ID,
		State:     state,
		BeatsLeft: beatsLeft,
		Progress:  progress,
		Notes: fmt.Sprintf("P2P %s: %s (peers: %d, node: %s)",
			operation.Type, operation.Phase.String(), operation.PeerCount, i.nodeID),
	}

	return i.client.EmitStatusClaim(claim)
}

// beatProgress converts elapsed beats and a beat estimate into the remaining
// beats (never negative) and a progress fraction in [0, 1].
//
// A negative beatsPassed is treated as 0. When estimatedBeats is not positive
// there is no budget to measure against, so the budget is reported as
// exhausted (0 beats left, progress 1.0); this avoids the NaN that 0/0 would
// otherwise produce.
func beatProgress(beatsPassed int64, estimatedBeats int) (beatsLeft int, progress float64) {
	if beatsPassed < 0 {
		beatsPassed = 0
	}
	if estimatedBeats <= 0 {
		return 0, 1.0
	}

	// Subtract in int64 so a large beat delta cannot be truncated; the
	// clamped result never exceeds estimatedBeats, so it fits in an int.
	remaining := int64(estimatedBeats) - beatsPassed
	if remaining < 0 {
		remaining = 0
	}

	progress = float64(beatsPassed) / float64(estimatedBeats)
	if progress > 1.0 {
		progress = 1.0
	}
	return int(remaining), progress
}

// emitHealthStatus emits a general health status claim under the task ID
// "chorus-p2p-health". State is "failed" when the client reports errors,
// otherwise "executing" when operations are tracked and "waiting" when none are.
func (i *Integration) emitHealthStatus() error {
	health := i.client.Health()
	activeCount := i.activeOperationCount()

	state := "waiting"
	if activeCount > 0 {
		state = "executing"
	}

	notes := fmt.Sprintf("CHORUS P2P healthy: connected=%v, operations=%d, tempo=%d BPM, node=%s",
		health.Connected, activeCount, health.CurrentTempo, i.nodeID)

	if len(health.Errors) > 0 {
		state = "failed"
		notes += fmt.Sprintf(", errors: %d", len(health.Errors))
	}

	claim := sdk.StatusClaim{
		TaskID:    "chorus-p2p-health",
		State:     state,
		BeatsLeft: 0,
		Progress:  1.0,
		Notes:     notes,
	}

	return i.client.EmitStatusClaim(claim)
}

// cleanupCompletedOperations removes tracked operations that are in a
// terminal phase (PhaseCompleted or PhaseFailed) and logs how many operations
// remain.
//
// CompleteP2POperation and FailP2POperation remove an operation themselves
// once its final claim has been emitted; this catches operations whose final
// claim could not be emitted, which would otherwise stay tracked indefinitely.
func (i *Integration) cleanupCompletedOperations() {
	i.mu.Lock()
	for id, op := range i.activeOperations {
		if op.Phase == PhaseCompleted || op.Phase == PhaseFailed {
			delete(i.activeOperations, id)
		}
	}
	remaining := len(i.activeOperations)
	i.mu.Unlock()

	i.logger.Info("🧹 BACKBEAT operations cleanup check - active: %d", remaining)
}

// GetHealth returns the current BACKBEAT integration health as a key/value
// map. When the integration is not started only "enabled", "started" and
// "connected" are reported; otherwise the map also carries the SDK client's
// health figures, the number of tracked operations and the node ID.
func (i *Integration) GetHealth() map[string]any {
	if !i.started {
		return map[string]any{
			"enabled":   i.config.Enabled,
			"started":   false,
			"connected": false,
		}
	}

	health := i.client.Health()
	return map[string]any{
		"enabled":           i.config.Enabled,
		"started":           i.started,
		"connected":         health.Connected,
		"current_beat":      health.LastBeat,
		"current_tempo":     health.CurrentTempo,
		"measured_bpm":      health.MeasuredBPM,
		"tempo_drift":       health.TempoDrift.String(),
		"reconnect_count":   health.ReconnectCount,
		"active_operations": i.activeOperationCount(),
		"local_degradation": health.LocalDegradation,
		"errors":            health.Errors,
		"node_id":           i.nodeID,
	}
}

// ExecuteWithBeatBudget executes fn with a BACKBEAT beat budget of beats.
// When the integration is not started fn is simply called directly.
func (i *Integration) ExecuteWithBeatBudget(beats int, fn func() error) error {
	if !i.started {
		return fn() // Fall back to regular execution if not started.
	}

	return i.client.WithBeatBudget(beats, fn)
}

// Utility functions for environment variable handling.

// getEnv returns the value of the environment variable key, or defaultValue
// when the variable is unset or empty.
func getEnv(key, defaultValue string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	return defaultValue
}

// getEnvBool interprets the environment variable key as a boolean.
//
// It returns defaultValue when the variable is unset or blank. Otherwise the
// value is trimmed and compared case-insensitively: "true", "1", "yes" and
// "on" yield true, and any other value yields false.
func getEnvBool(key string, defaultValue bool) bool {
	switch strings.ToLower(strings.TrimSpace(os.Getenv(key))) {
	case "":
		return defaultValue
	case "true", "1", "yes", "on":
		return true
	default:
		return false
	}
}