Major integrations and fixes: - Added BACKBEAT SDK integration for P2P operation timing - Implemented beat-aware status tracking for distributed operations - Added Docker secrets support for secure license management - Resolved KACHING license validation via HTTPS/TLS - Updated docker-compose configuration for clean stack deployment - Disabled rollback policies to prevent deployment failures - Added license credential storage (CHORUS-DEV-MULTI-001) Technical improvements: - BACKBEAT P2P operation tracking with phase management - Enhanced configuration system with file-based secrets - Improved error handling for license validation - Clean separation of KACHING and CHORUS deployment stacks 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
400 lines
11 KiB
Go
400 lines
11 KiB
Go
package backbeat
|
|
|
|
import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"strings"
	"time"

	"chorus/pkg/config"
	"github.com/chorus-services/backbeat/pkg/sdk"
)
|
|
|
|
// Integration manages CHORUS's integration with the BACKBEAT timing system
|
|
type Integration struct {
|
|
client sdk.Client
|
|
config *BackbeatConfig
|
|
logger Logger
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
started bool
|
|
nodeID string
|
|
|
|
// P2P operation tracking
|
|
activeOperations map[string]*P2POperation
|
|
}
|
|
|
|
// BackbeatConfig carries the BACKBEAT-specific settings used by Integration.
type BackbeatConfig struct {
	Enabled   bool   // when false, NewIntegration refuses to build an Integration
	ClusterID string // copied into the SDK config's ClusterID
	AgentID   string // copied into the SDK config's AgentID
	NATSUrl   string // copied into the SDK config's NATSUrl
}
|
|
|
|
// Logger is the minimal logging contract the integration needs from CHORUS.
// Each method takes a message followed by optional arguments; the callers in
// this file pass printf-style format strings as the message.
type Logger interface {
	// Info records routine lifecycle and beat events.
	Info(msg string, args ...interface{})
	// Warn records recoverable problems.
	Warn(msg string, args ...interface{})
	// Error is part of the contract for implementers.
	Error(msg string, args ...interface{})
}
|
|
|
|
// P2POperation tracks a P2P coordination operation's progress through BACKBEAT
|
|
type P2POperation struct {
|
|
ID string
|
|
Type string // "election", "dht_store", "pubsub_sync", "peer_discovery"
|
|
StartBeat int64
|
|
EstimatedBeats int
|
|
Phase OperationPhase
|
|
PeerCount int
|
|
StartTime time.Time
|
|
Data interface{}
|
|
}
|
|
|
|
// OperationPhase identifies where a P2P operation is in its lifecycle.
type OperationPhase int

// Lifecycle phases, numbered from zero in declaration order.
const (
	PhaseStarted OperationPhase = iota
	PhaseConnecting
	PhaseNegotiating
	PhaseExecuting
	PhaseCompleted
	PhaseFailed
)

// String returns the lowercase name of the phase, or "unknown" for any value
// outside the declared set.
func (p OperationPhase) String() string {
	// Indexed by phase value so the table stays in step with the constants.
	names := [...]string{
		PhaseStarted:     "started",
		PhaseConnecting:  "connecting",
		PhaseNegotiating: "negotiating",
		PhaseExecuting:   "executing",
		PhaseCompleted:   "completed",
		PhaseFailed:      "failed",
	}
	if p < 0 || int(p) >= len(names) {
		return "unknown"
	}
	return names[p]
}
|
|
|
|
// NewIntegration creates a new BACKBEAT integration for CHORUS
|
|
func NewIntegration(cfg *config.Config, nodeID string, logger Logger) (*Integration, error) {
|
|
backbeatCfg := extractBackbeatConfig(cfg)
|
|
|
|
if !backbeatCfg.Enabled {
|
|
return nil, fmt.Errorf("BACKBEAT integration is disabled")
|
|
}
|
|
|
|
// Create BACKBEAT SDK config with slog logger
|
|
sdkConfig := sdk.DefaultConfig()
|
|
sdkConfig.ClusterID = backbeatCfg.ClusterID
|
|
sdkConfig.AgentID = backbeatCfg.AgentID
|
|
sdkConfig.NATSUrl = backbeatCfg.NATSUrl
|
|
sdkConfig.Logger = slog.Default() // Use default slog logger
|
|
|
|
// Create SDK client
|
|
client := sdk.NewClient(sdkConfig)
|
|
|
|
return &Integration{
|
|
client: client,
|
|
config: backbeatCfg,
|
|
logger: logger,
|
|
nodeID: nodeID,
|
|
activeOperations: make(map[string]*P2POperation),
|
|
}, nil
|
|
}
|
|
|
|
// extractBackbeatConfig extracts BACKBEAT configuration from CHORUS config
|
|
func extractBackbeatConfig(cfg *config.Config) *BackbeatConfig {
|
|
return &BackbeatConfig{
|
|
Enabled: getEnvBool("CHORUS_BACKBEAT_ENABLED", true),
|
|
ClusterID: getEnv("CHORUS_BACKBEAT_CLUSTER_ID", "chorus-production"),
|
|
AgentID: getEnv("CHORUS_BACKBEAT_AGENT_ID", fmt.Sprintf("chorus-%s", cfg.Agent.ID)),
|
|
NATSUrl: getEnv("CHORUS_BACKBEAT_NATS_URL", "nats://backbeat-nats:4222"),
|
|
}
|
|
}
|
|
|
|
// Start initializes the BACKBEAT integration
|
|
func (i *Integration) Start(ctx context.Context) error {
|
|
if i.started {
|
|
return fmt.Errorf("integration already started")
|
|
}
|
|
|
|
i.ctx, i.cancel = context.WithCancel(ctx)
|
|
|
|
// Start the SDK client
|
|
if err := i.client.Start(i.ctx); err != nil {
|
|
return fmt.Errorf("failed to start BACKBEAT client: %w", err)
|
|
}
|
|
|
|
// Register beat callbacks
|
|
if err := i.client.OnBeat(i.onBeat); err != nil {
|
|
return fmt.Errorf("failed to register beat callback: %w", err)
|
|
}
|
|
|
|
if err := i.client.OnDownbeat(i.onDownbeat); err != nil {
|
|
return fmt.Errorf("failed to register downbeat callback: %w", err)
|
|
}
|
|
|
|
i.started = true
|
|
i.logger.Info("🎵 CHORUS BACKBEAT integration started - cluster=%s agent=%s",
|
|
i.config.ClusterID, i.config.AgentID)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully shuts down the BACKBEAT integration
|
|
func (i *Integration) Stop() error {
|
|
if !i.started {
|
|
return nil
|
|
}
|
|
|
|
if i.cancel != nil {
|
|
i.cancel()
|
|
}
|
|
|
|
if err := i.client.Stop(); err != nil {
|
|
i.logger.Warn("⚠️ Error stopping BACKBEAT client: %v", err)
|
|
}
|
|
|
|
i.started = false
|
|
i.logger.Info("🎵 CHORUS BACKBEAT integration stopped")
|
|
return nil
|
|
}
|
|
|
|
// onBeat handles regular beat events from BACKBEAT
|
|
func (i *Integration) onBeat(beat sdk.BeatFrame) {
|
|
i.logger.Info("🥁 BACKBEAT beat received - beat=%d phase=%s tempo=%d window=%s",
|
|
beat.BeatIndex, beat.Phase, beat.TempoBPM, beat.WindowID)
|
|
|
|
// Emit status claim for active operations
|
|
for _, op := range i.activeOperations {
|
|
i.emitOperationStatus(op)
|
|
}
|
|
|
|
// Periodic health status emission
|
|
if beat.BeatIndex%8 == 0 { // Every 8 beats (4 minutes at 2 BPM)
|
|
i.emitHealthStatus()
|
|
}
|
|
}
|
|
|
|
// onDownbeat handles downbeat (bar start) events
|
|
func (i *Integration) onDownbeat(beat sdk.BeatFrame) {
|
|
i.logger.Info("🎼 BACKBEAT downbeat - new bar started - beat=%d window=%s",
|
|
beat.BeatIndex, beat.WindowID)
|
|
|
|
// Cleanup completed operations on downbeat
|
|
i.cleanupCompletedOperations()
|
|
}
|
|
|
|
// StartP2POperation registers a new P2P operation with BACKBEAT
|
|
func (i *Integration) StartP2POperation(operationID, operationType string, estimatedBeats int, data interface{}) error {
|
|
if !i.started {
|
|
return fmt.Errorf("BACKBEAT integration not started")
|
|
}
|
|
|
|
operation := &P2POperation{
|
|
ID: operationID,
|
|
Type: operationType,
|
|
StartBeat: i.client.GetCurrentBeat(),
|
|
EstimatedBeats: estimatedBeats,
|
|
Phase: PhaseStarted,
|
|
StartTime: time.Now(),
|
|
Data: data,
|
|
}
|
|
|
|
i.activeOperations[operationID] = operation
|
|
|
|
// Emit initial status claim
|
|
return i.emitOperationStatus(operation)
|
|
}
|
|
|
|
// UpdateP2POperationPhase updates the phase of an active P2P operation
|
|
func (i *Integration) UpdateP2POperationPhase(operationID string, phase OperationPhase, peerCount int) error {
|
|
operation, exists := i.activeOperations[operationID]
|
|
if !exists {
|
|
return fmt.Errorf("operation %s not found", operationID)
|
|
}
|
|
|
|
operation.Phase = phase
|
|
operation.PeerCount = peerCount
|
|
|
|
// Emit updated status claim
|
|
return i.emitOperationStatus(operation)
|
|
}
|
|
|
|
// CompleteP2POperation marks a P2P operation as completed
|
|
func (i *Integration) CompleteP2POperation(operationID string, peerCount int) error {
|
|
operation, exists := i.activeOperations[operationID]
|
|
if !exists {
|
|
return fmt.Errorf("operation %s not found", operationID)
|
|
}
|
|
|
|
operation.Phase = PhaseCompleted
|
|
operation.PeerCount = peerCount
|
|
|
|
// Emit completion status claim
|
|
if err := i.emitOperationStatus(operation); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Remove from active operations
|
|
delete(i.activeOperations, operationID)
|
|
return nil
|
|
}
|
|
|
|
// FailP2POperation marks a P2P operation as failed
|
|
func (i *Integration) FailP2POperation(operationID string, reason string) error {
|
|
operation, exists := i.activeOperations[operationID]
|
|
if !exists {
|
|
return fmt.Errorf("operation %s not found", operationID)
|
|
}
|
|
|
|
operation.Phase = PhaseFailed
|
|
|
|
// Emit failure status claim
|
|
claim := sdk.StatusClaim{
|
|
State: "failed",
|
|
BeatsLeft: 0,
|
|
Progress: 0.0,
|
|
Notes: fmt.Sprintf("P2P operation failed: %s (type: %s)", reason, operation.Type),
|
|
}
|
|
|
|
if err := i.client.EmitStatusClaim(claim); err != nil {
|
|
return fmt.Errorf("failed to emit failure status: %w", err)
|
|
}
|
|
|
|
// Remove from active operations
|
|
delete(i.activeOperations, operationID)
|
|
return nil
|
|
}
|
|
|
|
// emitOperationStatus emits a status claim for a P2P operation
|
|
func (i *Integration) emitOperationStatus(operation *P2POperation) error {
|
|
currentBeat := i.client.GetCurrentBeat()
|
|
beatsPassed := currentBeat - operation.StartBeat
|
|
beatsLeft := operation.EstimatedBeats - int(beatsPassed)
|
|
if beatsLeft < 0 {
|
|
beatsLeft = 0
|
|
}
|
|
|
|
progress := float64(beatsPassed) / float64(operation.EstimatedBeats)
|
|
if progress > 1.0 {
|
|
progress = 1.0
|
|
}
|
|
|
|
state := "executing"
|
|
if operation.Phase == PhaseCompleted {
|
|
state = "done"
|
|
progress = 1.0
|
|
beatsLeft = 0
|
|
} else if operation.Phase == PhaseFailed {
|
|
state = "failed"
|
|
progress = 0.0
|
|
beatsLeft = 0
|
|
}
|
|
|
|
claim := sdk.StatusClaim{
|
|
TaskID: operation.ID,
|
|
State: state,
|
|
BeatsLeft: beatsLeft,
|
|
Progress: progress,
|
|
Notes: fmt.Sprintf("P2P %s: %s (peers: %d, node: %s)",
|
|
operation.Type, operation.Phase.String(), operation.PeerCount, i.nodeID),
|
|
}
|
|
|
|
return i.client.EmitStatusClaim(claim)
|
|
}
|
|
|
|
// emitHealthStatus emits a general health status claim
|
|
func (i *Integration) emitHealthStatus() error {
|
|
health := i.client.Health()
|
|
|
|
state := "waiting"
|
|
if len(i.activeOperations) > 0 {
|
|
state = "executing"
|
|
}
|
|
|
|
notes := fmt.Sprintf("CHORUS P2P healthy: connected=%v, operations=%d, tempo=%d BPM, node=%s",
|
|
health.Connected, len(i.activeOperations), health.CurrentTempo, i.nodeID)
|
|
|
|
if len(health.Errors) > 0 {
|
|
state = "failed"
|
|
notes += fmt.Sprintf(", errors: %d", len(health.Errors))
|
|
}
|
|
|
|
claim := sdk.StatusClaim{
|
|
TaskID: "chorus-p2p-health",
|
|
State: state,
|
|
BeatsLeft: 0,
|
|
Progress: 1.0,
|
|
Notes: notes,
|
|
}
|
|
|
|
return i.client.EmitStatusClaim(claim)
|
|
}
|
|
|
|
// cleanupCompletedOperations removes old completed operations
|
|
func (i *Integration) cleanupCompletedOperations() {
|
|
// This is called on downbeat, cleanup already happens in CompleteP2POperation/FailP2POperation
|
|
i.logger.Info("🧹 BACKBEAT operations cleanup check - active: %d", len(i.activeOperations))
|
|
}
|
|
|
|
// GetHealth returns the current BACKBEAT integration health
|
|
func (i *Integration) GetHealth() map[string]interface{} {
|
|
if !i.started {
|
|
return map[string]interface{}{
|
|
"enabled": i.config.Enabled,
|
|
"started": false,
|
|
"connected": false,
|
|
}
|
|
}
|
|
|
|
health := i.client.Health()
|
|
return map[string]interface{}{
|
|
"enabled": i.config.Enabled,
|
|
"started": i.started,
|
|
"connected": health.Connected,
|
|
"current_beat": health.LastBeat,
|
|
"current_tempo": health.CurrentTempo,
|
|
"measured_bpm": health.MeasuredBPM,
|
|
"tempo_drift": health.TempoDrift.String(),
|
|
"reconnect_count": health.ReconnectCount,
|
|
"active_operations": len(i.activeOperations),
|
|
"local_degradation": health.LocalDegradation,
|
|
"errors": health.Errors,
|
|
"node_id": i.nodeID,
|
|
}
|
|
}
|
|
|
|
// ExecuteWithBeatBudget executes a function with a BACKBEAT beat budget
|
|
func (i *Integration) ExecuteWithBeatBudget(beats int, fn func() error) error {
|
|
if !i.started {
|
|
return fn() // Fall back to regular execution if not started
|
|
}
|
|
|
|
return i.client.WithBeatBudget(beats, fn)
|
|
}
|
|
|
|
|
|
// Utility functions for environment variable handling
|
|
// getEnv returns the value of the environment variable key, or defaultValue
// when the variable is unset or empty.
func getEnv(key, defaultValue string) string {
	value := os.Getenv(key)
	if value == "" {
		return defaultValue
	}
	return value
}
|
|
|
|
// getEnvBool interprets the environment variable key as a boolean.
//
// It returns defaultValue when the variable is unset or empty. Otherwise it
// returns true if the value is one of "true", "1", "yes" or "on", and false
// for anything else. Letter case and surrounding whitespace are ignored, so
// "TRUE" or " On " also count as true.
func getEnvBool(key string, defaultValue bool) bool {
	raw := os.Getenv(key)
	if raw == "" {
		return defaultValue
	}

	switch strings.ToLower(strings.TrimSpace(raw)) {
	case "true", "1", "yes", "on":
		return true
	default:
		return false
	}
}