Files
CHORUS/internal/backbeat/integration.go
anthonyrawlins 9bdcbe0447 Integrate BACKBEAT SDK and resolve KACHING license validation
Major integrations and fixes:
- Added BACKBEAT SDK integration for P2P operation timing
- Implemented beat-aware status tracking for distributed operations
- Added Docker secrets support for secure license management
- Resolved KACHING license validation via HTTPS/TLS
- Updated docker-compose configuration for clean stack deployment
- Disabled rollback policies to prevent deployment failures
- Added license credential storage (CHORUS-DEV-MULTI-001)

Technical improvements:
- BACKBEAT P2P operation tracking with phase management
- Enhanced configuration system with file-based secrets
- Improved error handling for license validation
- Clean separation of KACHING and CHORUS deployment stacks

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-06 07:56:26 +10:00

400 lines
11 KiB
Go

package backbeat
import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"strings"
	"time"

	"github.com/chorus-services/backbeat/pkg/sdk"

	"chorus/pkg/config"
)
// Integration manages CHORUS's integration with the BACKBEAT timing system.
// It wraps a BACKBEAT SDK client, registers beat and downbeat callbacks on it,
// and reports the progress of tracked P2P operations as status claims.
//
// NOTE(review): activeOperations is read and written by the exported methods
// and by the beat callbacks with no locking visible in this file — confirm
// which goroutine the SDK invokes callbacks on.
type Integration struct {
// client is the BACKBEAT SDK client built by NewIntegration.
client sdk.Client
// config holds the resolved BACKBEAT settings (see extractBackbeatConfig).
config *BackbeatConfig
// logger receives the integration's printf-style log messages.
logger Logger
// ctx and cancel are derived from the caller's context in Start; Stop calls cancel.
ctx context.Context
cancel context.CancelFunc
// started is set at the end of a successful Start and cleared by Stop.
started bool
// nodeID identifies this node in status-claim notes and health output.
nodeID string
// activeOperations maps an operation ID to its in-flight tracking record.
activeOperations map[string]*P2POperation
}
// BackbeatConfig holds BACKBEAT-specific configuration.
// ClusterID, AgentID and NATSUrl are copied verbatim into the SDK client
// configuration by NewIntegration.
type BackbeatConfig struct {
// Enabled gates the integration: NewIntegration returns an error when false.
Enabled bool
// ClusterID is the BACKBEAT cluster identifier passed to the SDK.
ClusterID string
// AgentID is the agent identifier passed to the SDK.
AgentID string
// NATSUrl is the NATS endpoint passed to the SDK.
NATSUrl string
}
// Logger is the minimal logging surface the integration needs from CHORUS.
//
// Every call made in this package passes a printf-style format string as msg
// followed by its arguments; how they are rendered is up to the implementation.
type Logger interface {
	// Info records routine lifecycle and beat events.
	Info(msg string, args ...any)
	// Warn records recoverable problems.
	Warn(msg string, args ...any)
	// Error records failures.
	Error(msg string, args ...any)
}
// P2POperation is the tracking record for one P2P coordination operation
// whose progress is reported through BACKBEAT status claims.
type P2POperation struct {
	// ID keys the operation in the active set and is used as the claim's TaskID.
	ID string
	// Type is a free-form label, e.g. "election", "dht_store", "pubsub_sync",
	// "peer_discovery".
	Type string
	// StartBeat is the client's current beat at registration time.
	StartBeat int64
	// EstimatedBeats is the beat budget used to derive beats-left and progress.
	EstimatedBeats int
	// Phase is the operation's current lifecycle phase.
	Phase OperationPhase
	// PeerCount is the number of peers reported in the claim notes.
	PeerCount int
	// StartTime is the wall-clock time at registration.
	StartTime time.Time
	// Data is an opaque caller-supplied payload; nothing in this file reads it.
	Data any
}
// OperationPhase represents the current phase of a P2P operation.
type OperationPhase int

// Lifecycle phases of a P2P operation. PhaseCompleted and PhaseFailed are the
// terminal phases.
const (
	PhaseStarted OperationPhase = iota
	PhaseConnecting
	PhaseNegotiating
	PhaseExecuting
	PhaseCompleted
	PhaseFailed
)

// String returns the lowercase label for the phase, or "unknown" for any
// value outside the declared constants.
func (p OperationPhase) String() string {
	// Indexed by the phase constants so the table stays aligned with them.
	labels := [...]string{
		PhaseStarted:     "started",
		PhaseConnecting:  "connecting",
		PhaseNegotiating: "negotiating",
		PhaseExecuting:   "executing",
		PhaseCompleted:   "completed",
		PhaseFailed:      "failed",
	}
	if p < 0 || int(p) >= len(labels) {
		return "unknown"
	}
	return labels[p]
}
// NewIntegration creates a new BACKBEAT integration for CHORUS.
//
// cfg supplies the agent ID used for the default BACKBEAT agent identifier,
// nodeID is reported in status claims and health output, and logger receives
// the integration's log messages. The SDK client itself logs through
// slog.Default().
//
// It returns an error when cfg or logger is nil, or when the integration is
// disabled by configuration. The returned Integration is not started; call
// Start to connect.
func NewIntegration(cfg *config.Config, nodeID string, logger Logger) (*Integration, error) {
	// Reject nil inputs up front: cfg is dereferenced while resolving the
	// configuration and logger is called unconditionally once started.
	if cfg == nil {
		return nil, fmt.Errorf("BACKBEAT integration requires a non-nil config")
	}
	if logger == nil {
		return nil, fmt.Errorf("BACKBEAT integration requires a non-nil logger")
	}

	backbeatCfg := extractBackbeatConfig(cfg)
	if !backbeatCfg.Enabled {
		return nil, fmt.Errorf("BACKBEAT integration is disabled")
	}

	// Create BACKBEAT SDK config with slog logger
	sdkConfig := sdk.DefaultConfig()
	sdkConfig.ClusterID = backbeatCfg.ClusterID
	sdkConfig.AgentID = backbeatCfg.AgentID
	sdkConfig.NATSUrl = backbeatCfg.NATSUrl
	sdkConfig.Logger = slog.Default() // Use default slog logger

	return &Integration{
		client:           sdk.NewClient(sdkConfig),
		config:           backbeatCfg,
		logger:           logger,
		nodeID:           nodeID,
		activeOperations: make(map[string]*P2POperation),
	}, nil
}
// extractBackbeatConfig resolves the BACKBEAT settings for this process.
//
// Every value is read from a CHORUS_BACKBEAT_* environment variable with a
// built-in fallback; cfg only contributes the agent ID used to build the
// default AgentID ("chorus-<agent id>").
func extractBackbeatConfig(cfg *config.Config) *BackbeatConfig {
	defaultAgentID := fmt.Sprintf("chorus-%s", cfg.Agent.ID)

	resolved := new(BackbeatConfig)
	resolved.Enabled = getEnvBool("CHORUS_BACKBEAT_ENABLED", true)
	resolved.ClusterID = getEnv("CHORUS_BACKBEAT_CLUSTER_ID", "chorus-production")
	resolved.AgentID = getEnv("CHORUS_BACKBEAT_AGENT_ID", defaultAgentID)
	resolved.NATSUrl = getEnv("CHORUS_BACKBEAT_NATS_URL", "nats://backbeat-nats:4222")
	return resolved
}
// Start initializes the BACKBEAT integration: it derives a cancellable context
// from ctx, starts the SDK client with it, and registers the beat and downbeat
// callbacks.
//
// It returns an error if the integration is already started, if the client
// fails to start, or if a callback cannot be registered. On any failure the
// derived context is cancelled, and a client that had already been started is
// stopped again, so a failed Start leaves nothing running.
func (i *Integration) Start(ctx context.Context) error {
	if i.started {
		return fmt.Errorf("integration already started")
	}

	i.ctx, i.cancel = context.WithCancel(ctx)

	// Start the SDK client
	if err := i.client.Start(i.ctx); err != nil {
		i.cancel() // release the derived context; nothing else is running yet
		return fmt.Errorf("failed to start BACKBEAT client: %w", err)
	}

	// Register beat callbacks
	if err := i.client.OnBeat(i.onBeat); err != nil {
		i.abortStartedClient()
		return fmt.Errorf("failed to register beat callback: %w", err)
	}
	if err := i.client.OnDownbeat(i.onDownbeat); err != nil {
		i.abortStartedClient()
		return fmt.Errorf("failed to register downbeat callback: %w", err)
	}

	i.started = true
	i.logger.Info("🎵 CHORUS BACKBEAT integration started - cluster=%s agent=%s",
		i.config.ClusterID, i.config.AgentID)
	return nil
}

// abortStartedClient undoes a partially completed Start: it cancels the
// derived context and stops the already-running SDK client. A stop error is
// only logged because the caller is already returning the original failure.
func (i *Integration) abortStartedClient() {
	i.cancel()
	if err := i.client.Stop(); err != nil {
		i.logger.Warn("⚠️ Error stopping BACKBEAT client after failed start: %v", err)
	}
}
// Stop gracefully shuts down the BACKBEAT integration.
//
// It does nothing when the integration is not started. Otherwise it cancels
// the context created by Start, stops the SDK client (a stop failure is logged
// as a warning, not returned) and marks the integration as stopped. The
// returned error is always nil.
func (i *Integration) Stop() error {
	if i.started {
		if cancel := i.cancel; cancel != nil {
			cancel()
		}

		// Best effort: a client that fails to stop must not block shutdown.
		if stopErr := i.client.Stop(); stopErr != nil {
			i.logger.Warn("⚠️ Error stopping BACKBEAT client: %v", stopErr)
		}

		i.started = false
		i.logger.Info("🎵 CHORUS BACKBEAT integration stopped")
	}
	return nil
}
// onBeat handles regular beat events from BACKBEAT.
//
// For every beat it logs the frame, re-emits a status claim for each tracked
// operation, and on every eighth beat index also emits a health claim.
// Emission failures are logged as warnings: the callback has no caller to
// return them to, and one failed claim must not stop the remaining ones.
func (i *Integration) onBeat(beat sdk.BeatFrame) {
	// Health is reported every 8 beats (e.g. every 4 minutes at 2 BPM).
	const healthEmitInterval = 8

	i.logger.Info("🥁 BACKBEAT beat received - beat=%d phase=%s tempo=%d window=%s",
		beat.BeatIndex, beat.Phase, beat.TempoBPM, beat.WindowID)

	// Emit status claim for active operations
	for _, op := range i.activeOperations {
		if err := i.emitOperationStatus(op); err != nil {
			i.logger.Warn("⚠️ Failed to emit BACKBEAT status for operation %s: %v", op.ID, err)
		}
	}

	// Periodic health status emission
	if beat.BeatIndex%healthEmitInterval == 0 {
		if err := i.emitHealthStatus(); err != nil {
			i.logger.Warn("⚠️ Failed to emit BACKBEAT health status: %v", err)
		}
	}
}
// onDownbeat handles downbeat (bar start) events: it logs the frame's beat
// index and window, then runs the operations cleanup pass.
func (i *Integration) onDownbeat(beat sdk.BeatFrame) {
	index, window := beat.BeatIndex, beat.WindowID
	i.logger.Info("🎼 BACKBEAT downbeat - new bar started - beat=%d window=%s", index, window)

	i.cleanupCompletedOperations()
}
// StartP2POperation registers a new P2P operation with BACKBEAT.
//
// The operation is stamped with the client's current beat and the current
// wall-clock time, stored under operationID (replacing any existing entry with
// that ID) and announced with an initial status claim. It returns an error if
// the integration is not started or if emitting the initial claim fails.
func (i *Integration) StartP2POperation(operationID, operationType string, estimatedBeats int, data interface{}) error {
	if !i.started {
		return fmt.Errorf("BACKBEAT integration not started")
	}

	startBeat := i.client.GetCurrentBeat()
	op := &P2POperation{
		ID:             operationID,
		Type:           operationType,
		StartBeat:      startBeat,
		EstimatedBeats: estimatedBeats,
		Phase:          PhaseStarted,
		StartTime:      time.Now(),
		Data:           data,
	}
	i.activeOperations[operationID] = op

	// Announce the freshly registered operation.
	return i.emitOperationStatus(op)
}
// UpdateP2POperationPhase records a new phase and peer count for a tracked
// operation and emits an updated status claim. It returns an error if no
// operation with operationID is tracked, or if emitting the claim fails.
func (i *Integration) UpdateP2POperationPhase(operationID string, phase OperationPhase, peerCount int) error {
	if op, tracked := i.activeOperations[operationID]; tracked {
		op.Phase, op.PeerCount = phase, peerCount
		return i.emitOperationStatus(op)
	}
	return fmt.Errorf("operation %s not found", operationID)
}
// CompleteP2POperation marks a P2P operation as completed, emits its final
// status claim and stops tracking it.
//
// It returns an error if no operation with operationID is tracked. If the
// completion claim cannot be emitted, that error is returned and the operation
// stays tracked in the completed phase.
func (i *Integration) CompleteP2POperation(operationID string, peerCount int) error {
	op, tracked := i.activeOperations[operationID]
	if !tracked {
		return fmt.Errorf("operation %s not found", operationID)
	}
	op.Phase, op.PeerCount = PhaseCompleted, peerCount

	// Only forget the operation once its completion has actually been reported.
	err := i.emitOperationStatus(op)
	if err == nil {
		delete(i.activeOperations, operationID)
	}
	return err
}
// FailP2POperation marks a P2P operation as failed, emits a failure status
// claim carrying reason, and stops tracking the operation.
//
// It returns an error if no operation with operationID is tracked. If the
// failure claim cannot be emitted, a wrapped error is returned and the
// operation stays tracked in the failed phase.
func (i *Integration) FailP2POperation(operationID string, reason string) error {
	operation, exists := i.activeOperations[operationID]
	if !exists {
		return fmt.Errorf("operation %s not found", operationID)
	}
	operation.Phase = PhaseFailed

	// Emit failure status claim. TaskID is set so the failure can be tied to
	// the same task as the operation's earlier claims.
	claim := sdk.StatusClaim{
		TaskID:    operation.ID,
		State:     "failed",
		BeatsLeft: 0,
		Progress:  0.0,
		Notes:     fmt.Sprintf("P2P operation failed: %s (type: %s)", reason, operation.Type),
	}
	if err := i.client.EmitStatusClaim(claim); err != nil {
		return fmt.Errorf("failed to emit failure status: %w", err)
	}

	// Remove from active operations
	delete(i.activeOperations, operationID)
	return nil
}
// emitOperationStatus emits a status claim for a P2P operation.
//
// Beats-left and progress are derived from the beats elapsed since the
// operation's StartBeat and its EstimatedBeats budget:
//   - elapsed beats are clamped to zero, so a beat counter that reads below
//     StartBeat cannot produce negative progress or an inflated beats-left;
//   - beats-left is clamped to zero and progress to the range [0, 1];
//   - a non-positive EstimatedBeats is treated as an exhausted budget
//     (progress 1.0, no beats left) rather than dividing by zero.
//
// Terminal phases override the computed values: completed reports state
// "done" with progress 1.0, failed reports state "failed" with progress 0.0.
// Every other phase reports state "executing". The error from the SDK's
// EmitStatusClaim is returned unchanged.
func (i *Integration) emitOperationStatus(operation *P2POperation) error {
	beatsPassed := i.client.GetCurrentBeat() - operation.StartBeat
	if beatsPassed < 0 {
		beatsPassed = 0
	}

	beatsLeft := operation.EstimatedBeats - int(beatsPassed)
	if beatsLeft < 0 {
		beatsLeft = 0
	}

	// Without a positive estimate there is no budget left to measure against.
	progress := 1.0
	if operation.EstimatedBeats > 0 {
		progress = float64(beatsPassed) / float64(operation.EstimatedBeats)
		if progress > 1.0 {
			progress = 1.0
		}
	}

	state := "executing"
	switch operation.Phase {
	case PhaseCompleted:
		state, progress, beatsLeft = "done", 1.0, 0
	case PhaseFailed:
		state, progress, beatsLeft = "failed", 0.0, 0
	}

	claim := sdk.StatusClaim{
		TaskID:    operation.ID,
		State:     state,
		BeatsLeft: beatsLeft,
		Progress:  progress,
		Notes: fmt.Sprintf("P2P %s: %s (peers: %d, node: %s)",
			operation.Type, operation.Phase.String(), operation.PeerCount, i.nodeID),
	}
	return i.client.EmitStatusClaim(claim)
}
// emitHealthStatus emits a general health status claim under the fixed task
// ID "chorus-p2p-health".
//
// The state is "failed" when the client's health snapshot lists any errors,
// otherwise "executing" while operations are tracked and "waiting" when none
// are. The notes summarise connectivity, operation count, tempo and node ID,
// with the error count appended when non-zero.
func (i *Integration) emitHealthStatus() error {
	snapshot := i.client.Health()
	activeCount := len(i.activeOperations)
	errorCount := len(snapshot.Errors)

	notes := fmt.Sprintf("CHORUS P2P healthy: connected=%v, operations=%d, tempo=%d BPM, node=%s",
		snapshot.Connected, activeCount, snapshot.CurrentTempo, i.nodeID)

	var state string
	switch {
	case errorCount > 0:
		// Reported errors take precedence over the activity-based state.
		state = "failed"
		notes += fmt.Sprintf(", errors: %d", errorCount)
	case activeCount > 0:
		state = "executing"
	default:
		state = "waiting"
	}

	return i.client.EmitStatusClaim(sdk.StatusClaim{
		TaskID:    "chorus-p2p-health",
		State:     state,
		BeatsLeft: 0,
		Progress:  1.0,
		Notes:     notes,
	})
}
// cleanupCompletedOperations removes tracked operations that are in a
// terminal phase (completed or failed) and logs the remaining active count.
//
// CompleteP2POperation and FailP2POperation normally remove an operation
// themselves, but only after its terminal claim was emitted successfully, and
// UpdateP2POperationPhase can set a terminal phase without removing anything.
// This pass, run on every downbeat, keeps such entries from staying in the
// map and being re-reported on every beat indefinitely.
func (i *Integration) cleanupCompletedOperations() {
	removed := 0
	// Deleting the current key while ranging over a map is safe in Go.
	for id, op := range i.activeOperations {
		if op.Phase == PhaseCompleted || op.Phase == PhaseFailed {
			delete(i.activeOperations, id)
			removed++
		}
	}
	if removed > 0 {
		i.logger.Info("🧹 BACKBEAT operations cleanup removed %d finished operation(s)", removed)
	}
	i.logger.Info("🧹 BACKBEAT operations cleanup check - active: %d", len(i.activeOperations))
}
// GetHealth returns the current BACKBEAT integration health as a generic map.
//
// When the integration is not started only "enabled", "started" and
// "connected" are present (the latter two false). Once started, the map also
// carries the client's health snapshot (beat, tempo, drift, reconnects,
// degradation, errors), the number of tracked operations and the node ID.
func (i *Integration) GetHealth() map[string]interface{} {
	report := map[string]interface{}{
		"enabled":   i.config.Enabled,
		"started":   false,
		"connected": false,
	}
	if !i.started {
		return report
	}

	snapshot := i.client.Health()
	report["started"] = i.started
	report["connected"] = snapshot.Connected
	report["current_beat"] = snapshot.LastBeat
	report["current_tempo"] = snapshot.CurrentTempo
	report["measured_bpm"] = snapshot.MeasuredBPM
	report["tempo_drift"] = snapshot.TempoDrift.String()
	report["reconnect_count"] = snapshot.ReconnectCount
	report["active_operations"] = len(i.activeOperations)
	report["local_degradation"] = snapshot.LocalDegradation
	report["errors"] = snapshot.Errors
	report["node_id"] = i.nodeID
	return report
}
// ExecuteWithBeatBudget runs fn under a BACKBEAT budget of beats by delegating
// to the SDK client's WithBeatBudget. When the integration is not started, fn
// is simply called directly with no budget applied.
func (i *Integration) ExecuteWithBeatBudget(beats int, fn func() error) error {
	if i.started {
		return i.client.WithBeatBudget(beats, fn)
	}
	// Not started: fall back to plain execution.
	return fn()
}
// Utility functions for environment variable handling

// getEnv returns the value of the environment variable key, or defaultValue
// when the variable is unset or set to the empty string.
func getEnv(key, defaultValue string) string {
	value, present := os.LookupEnv(key)
	if !present || value == "" {
		return defaultValue
	}
	return value
}
// getEnvBool reads the environment variable key as a boolean.
//
// The value is trimmed of surrounding whitespace and compared
// case-insensitively: "true", "1", "yes" and "on" yield true, so common
// spellings such as "TRUE" or "True" are honoured. An unset, empty or
// whitespace-only variable yields defaultValue; any other value yields false.
func getEnvBool(key string, defaultValue bool) bool {
	value := strings.ToLower(strings.TrimSpace(os.Getenv(key)))
	switch value {
	case "":
		return defaultValue
	case "true", "1", "yes", "on":
		return true
	default:
		return false
	}
}