# CHORUS Internal Package: backbeat

**Package:** `chorus/internal/backbeat`
**Purpose:** BACKBEAT Timing System Integration for CHORUS P2P Operations
**Lines of Code:** 400 lines (integration.go)

## Overview

The `backbeat` package provides integration between CHORUS and the BACKBEAT distributed timing system. BACKBEAT synchronizes agent operations across the cluster using a shared "heartbeat" that enables coordinated, time-aware distributed computing.

This integration allows CHORUS agents to:

- Track P2P operations against beat budgets
- Report operation progress via status claims
- Synchronize multi-agent coordination
- Monitor timing drift and degradation
- Emit health metrics on a beat schedule

## Core Concepts

### BACKBEAT Timing System

BACKBEAT provides a distributed metronome that all agents synchronize to:

- **Beat Index:** Sequential beat number across the cluster
- **Tempo:** Beats per minute (default: 2 BPM = 30 seconds per beat)
- **Phase:** Current position within beat cycle
- **Window ID:** Time window identifier for grouping operations
- **Downbeat:** Bar start marker (analogous to musical downbeat)

### P2P Operation Tracking

CHORUS uses BACKBEAT to track P2P operations:

- **Beat Budget:** Estimated beats for operation completion
- **Progress Tracking:** Real-time percentage completion
- **Phase Transitions:** Operation lifecycle stages
- **Peer Coordination:** Multi-agent operation synchronization

## Architecture

### Integration Type

```go
type Integration struct {
    client  sdk.Client
    config  *BackbeatConfig
    logger  Logger
    ctx     context.Context
    cancel  context.CancelFunc
    started bool
    nodeID  string

    // P2P operation tracking
    activeOperations map[string]*P2POperation
}
```

**Responsibilities:**

- BACKBEAT SDK client lifecycle management
- Beat and downbeat callback registration
- P2P operation tracking and reporting
- Status claim emission
- Health monitoring

### BackbeatConfig

Configuration for BACKBEAT integration.

```go
type BackbeatConfig struct {
    Enabled   bool
    ClusterID string
    AgentID   string
    NATSUrl   string
}
```

**Configuration Sources:**

- Environment variables (prefixed with `CHORUS_BACKBEAT_`)
- CHORUS config.Config integration
- Defaults for local development

**Environment Variables:**

- `CHORUS_BACKBEAT_ENABLED` - Enable/disable integration (default: true)
- `CHORUS_BACKBEAT_CLUSTER_ID` - Cluster identifier (default: "chorus-production")
- `CHORUS_BACKBEAT_AGENT_ID` - Agent identifier (default: "chorus-{agent_id}")
- `CHORUS_BACKBEAT_NATS_URL` - NATS server URL (default: "nats://backbeat-nats:4222")

### P2POperation

Tracks a P2P coordination operation through BACKBEAT.

```go
type P2POperation struct {
    ID             string
    Type           string // "election", "dht_store", "pubsub_sync", "peer_discovery"
    StartBeat      int64
    EstimatedBeats int
    Phase          OperationPhase
    PeerCount      int
    StartTime      time.Time
    Data           interface{}
}
```

**Operation Types:**

- `election` - Leader election or consensus operation
- `dht_store` - DHT storage or retrieval operation
- `pubsub_sync` - PubSub message propagation
- `peer_discovery` - P2P peer discovery and connection

**Lifecycle:**

1. Register operation with `StartP2POperation()`
2. Update phase as operation progresses
3. Complete with `CompleteP2POperation()` or fail with `FailP2POperation()`
4. Automatic cleanup on completion

### OperationPhase

Represents the current phase of a P2P operation.

```go
type OperationPhase int

const (
    PhaseStarted OperationPhase = iota
    PhaseConnecting
    PhaseNegotiating
    PhaseExecuting
    PhaseCompleted
    PhaseFailed
)
```

**Phase Transitions:**

```
PhaseStarted → PhaseConnecting → PhaseNegotiating → PhaseExecuting → PhaseCompleted
                                        ↓
                                   PhaseFailed
```

**Typical Flow:**

1. **PhaseStarted** - Operation registered, initialization
2. **PhaseConnecting** - Establishing connections to peers
3. **PhaseNegotiating** - Consensus or coordination negotiation
4. **PhaseExecuting** - Main operation execution
5. **PhaseCompleted** - Operation successful
6. **PhaseFailed** - Operation failed (any stage)

### Logger Interface

Abstraction for CHORUS logging integration.

```go
type Logger interface {
    Info(msg string, args ...interface{})
    Warn(msg string, args ...interface{})
    Error(msg string, args ...interface{})
}
```

Allows integration with CHORUS's existing logging system without direct dependency.

## Public API

### Constructor

#### NewIntegration

Creates a new BACKBEAT integration for CHORUS.

```go
func NewIntegration(cfg *config.Config, nodeID string, logger Logger) (*Integration, error)
```

**Parameters:**

- `cfg` - CHORUS configuration object
- `nodeID` - P2P node identifier
- `logger` - CHORUS logger implementation

**Returns:**

- Configured Integration instance
- Error if BACKBEAT is disabled or configuration is invalid

**Example:**

```go
integration, err := backbeat.NewIntegration(
    config,
    node.ID().String(),
    runtime.Logger,
)
if err != nil {
    log.Fatal("BACKBEAT integration failed:", err)
}
```

### Lifecycle Management

#### Start

Initializes the BACKBEAT integration and starts the SDK client.

```go
func (i *Integration) Start(ctx context.Context) error
```

**Actions:**

1. Create cancellation context
2. Start BACKBEAT SDK client
3. Register beat callbacks (`onBeat`, `onDownbeat`)
4. Log startup confirmation

**Returns:** Error if already started or SDK initialization fails

**Example:**

```go
ctx := context.Background()
if err := integration.Start(ctx); err != nil {
    log.Fatal("Failed to start BACKBEAT:", err)
}
```

**Logged Output:**

```
🎵 CHORUS BACKBEAT integration started - cluster=chorus-production agent=chorus-agent-42
```

#### Stop

Gracefully shuts down the BACKBEAT integration.

```go
func (i *Integration) Stop() error
```

**Actions:**

1. Cancel context
2. Stop SDK client
3. Cleanup resources
4. Log shutdown confirmation

**Returns:** Error if SDK shutdown fails (logged as warning)

**Example:**

```go
if err := integration.Stop(); err != nil {
    log.Warn("BACKBEAT shutdown warning:", err)
}
```

**Logged Output:**

```
🎵 CHORUS BACKBEAT integration stopped
```

### P2P Operation Management

#### StartP2POperation

Registers a new P2P operation with BACKBEAT.

```go
func (i *Integration) StartP2POperation(
    operationID string,
    operationType string,
    estimatedBeats int,
    data interface{},
) error
```

**Parameters:**

- `operationID` - Unique operation identifier
- `operationType` - Operation category (election, dht_store, pubsub_sync, peer_discovery)
- `estimatedBeats` - Expected beats to completion
- `data` - Optional operation-specific data

**Actions:**

1. Create P2POperation record
2. Record start beat from current beat index
3. Add to activeOperations map
4. Emit initial status claim

**Returns:** Error if integration not started

**Example:**

```go
err := integration.StartP2POperation(
    "election-leader-2025",
    "election",
    5, // Expect completion in 5 beats (~2.5 minutes at 2 BPM)
    map[string]interface{}{
        "candidates": 3,
        "quorum":     2,
    },
)
```

**Status Claim Emitted:**

```json
{
  "task_id": "election-leader-2025",
  "state": "executing",
  "beats_left": 5,
  "progress": 0.0,
  "notes": "P2P election: started (peers: 0, node: 12D3KooW...)"
}
```

#### UpdateP2POperationPhase

Updates the phase of an active P2P operation.

```go
func (i *Integration) UpdateP2POperationPhase(
    operationID string,
    phase OperationPhase,
    peerCount int,
) error
```

**Parameters:**

- `operationID` - Operation identifier
- `phase` - New phase (PhaseConnecting, PhaseNegotiating, etc.)
- `peerCount` - Current peer count involved in operation

**Actions:**

1. Lookup operation in activeOperations
2. Update phase and peer count
3. Emit updated status claim

**Returns:** Error if operation not found

**Example:**

```go
// Connected to peers
err := integration.UpdateP2POperationPhase(
    "election-leader-2025",
    backbeat.PhaseConnecting,
    3,
)

// Negotiating consensus
err = integration.UpdateP2POperationPhase(
    "election-leader-2025",
    backbeat.PhaseNegotiating,
    3,
)

// Executing election
err = integration.UpdateP2POperationPhase(
    "election-leader-2025",
    backbeat.PhaseExecuting,
    3,
)
```

#### CompleteP2POperation

Marks a P2P operation as completed successfully.

```go
func (i *Integration) CompleteP2POperation(operationID string, peerCount int) error
```

**Parameters:**

- `operationID` - Operation identifier
- `peerCount` - Final peer count

**Actions:**

1. Lookup operation
2. Set phase to PhaseCompleted
3. Emit completion status claim (state: "done", progress: 1.0)
4. Remove from activeOperations map

**Returns:** Error if operation not found or status emission fails

**Example:**

```go
err := integration.CompleteP2POperation("election-leader-2025", 3)
```

**Status Claim Emitted:**

```json
{
  "task_id": "election-leader-2025",
  "state": "done",
  "beats_left": 0,
  "progress": 1.0,
  "notes": "P2P election: completed (peers: 3, node: 12D3KooW...)"
}
```

#### FailP2POperation

Marks a P2P operation as failed.

```go
func (i *Integration) FailP2POperation(operationID string, reason string) error
```

**Parameters:**

- `operationID` - Operation identifier
- `reason` - Failure reason (for logging and status)

**Actions:**

1. Lookup operation
2. Set phase to PhaseFailed
3. Emit failure status claim (state: "failed", progress: 0.0)
4. Remove from activeOperations map

**Returns:** Error if operation not found or status emission fails

**Example:**

```go
err := integration.FailP2POperation(
    "election-leader-2025",
    "quorum not reached within timeout",
)
```

**Status Claim Emitted:**

```json
{
  "task_id": "election-leader-2025",
  "state": "failed",
  "beats_left": 0,
  "progress": 0.0,
  "notes": "P2P operation failed: quorum not reached within timeout (type: election)"
}
```

### Health and Monitoring

#### GetHealth

Returns the current BACKBEAT integration health status.

```go
func (i *Integration) GetHealth() map[string]interface{}
```

**Returns:** Map with health metrics:

- `enabled` - Integration enabled flag
- `started` - Integration started flag
- `connected` - NATS connection status
- `current_beat` - Current beat index
- `current_tempo` - Current tempo (BPM)
- `measured_bpm` - Measured beats per minute
- `tempo_drift` - Tempo drift status
- `reconnect_count` - NATS reconnection count
- `active_operations` - Count of active operations
- `local_degradation` - Local performance degradation flag
- `errors` - Recent error messages
- `node_id` - CHORUS node ID

**Example:**

```go
health := integration.GetHealth()
fmt.Printf("BACKBEAT connected: %v\n", health["connected"])
fmt.Printf("Active operations: %d\n", health["active_operations"])
```

**Example Response:**

```json
{
  "enabled": true,
  "started": true,
  "connected": true,
  "current_beat": 12345,
  "current_tempo": 2,
  "measured_bpm": 2.01,
  "tempo_drift": "acceptable",
  "reconnect_count": 0,
  "active_operations": 2,
  "local_degradation": false,
  "errors": [],
  "node_id": "12D3KooWAbc..."
}
```

#### ExecuteWithBeatBudget

Executes a function with a BACKBEAT beat budget.

```go
func (i *Integration) ExecuteWithBeatBudget(beats int, fn func() error) error
```

**Parameters:**

- `beats` - Beat budget for operation
- `fn` - Function to execute

**Actions:**

1. Check if integration is started
2. Delegate to SDK `WithBeatBudget()` for timing enforcement
3. Fall back to regular execution if not started

**Returns:** Error from function execution or timeout

**Example:**

```go
err := integration.ExecuteWithBeatBudget(10, func() error {
    // This operation should complete within 10 beats
    return performExpensiveOperation()
})
if err != nil {
    log.Error("Operation exceeded beat budget:", err)
}
```

## Beat Callbacks

### onBeat

Handles regular beat events from BACKBEAT.

```go
func (i *Integration) onBeat(beat sdk.BeatFrame)
```

**Called:** Every beat (every 30 seconds at 2 BPM)

**BeatFrame Structure:**

- `BeatIndex` - Sequential beat number
- `Phase` - Current phase within beat
- `TempoBPM` - Current tempo
- `WindowID` - Time window identifier

**Actions:**

1. Log beat reception with details
2. Emit status claims for all active operations
3. Periodic health status emission (every 8 beats = ~4 minutes)

**Example Log:**

```
🥁 BACKBEAT beat received - beat=12345 phase=upbeat tempo=2 window=w-1234
```

### onDownbeat

Handles downbeat (bar start) events.

```go
func (i *Integration) onDownbeat(beat sdk.BeatFrame)
```

**Called:** At the start of each bar (every N beats, configurable)

**Actions:**

1. Log downbeat reception
2. Cleanup completed operations
3. Log active operation count

**Example Log:**

```
🎼 BACKBEAT downbeat - new bar started - beat=12344 window=w-1234
🧹 BACKBEAT operations cleanup check - active: 2
```

## Status Claim Emission

### Operation Status Claims

Emitted for each active operation on every beat.

```go
func (i *Integration) emitOperationStatus(operation *P2POperation) error
```

**Calculated Fields:**

- **Beats Passed:** Current beat - start beat
- **Beats Left:** Estimated beats - beats passed (minimum 0)
- **Progress:** Beats passed / estimated beats (maximum 1.0)
- **State:** "executing", "done", or "failed"

**Status Claim Structure:**

```json
{
  "task_id": "operation-id",
  "state": "executing",
  "beats_left": 3,
  "progress": 0.4,
  "notes": "P2P dht_store: executing (peers: 5, node: 12D3KooW...)"
}
```

### Health Status Claims

Emitted periodically (every 8 beats = ~4 minutes at 2 BPM).

```go
func (i *Integration) emitHealthStatus() error
```

**Health Claim Structure:**

```json
{
  "task_id": "chorus-p2p-health",
  "state": "executing",
  "beats_left": 0,
  "progress": 1.0,
  "notes": "CHORUS P2P healthy: connected=true, operations=2, tempo=2 BPM, node=12D3KooW..."
}
```

**State Determination:**

- `waiting` - No active operations
- `executing` - One or more active operations
- `failed` - SDK reports errors

## Integration with CHORUS

### SharedRuntime Integration

The Integration is created and managed by `runtime.SharedRuntime`:

```go
type SharedRuntime struct {
    // ... other fields
    BackbeatIntegration *backbeat.Integration
}

func (sr *SharedRuntime) Initialize(cfg *config.Config) error {
    // ... other initialization

    // Create BACKBEAT integration
    if cfg.Backbeat.Enabled {
        integration, err := backbeat.NewIntegration(
            cfg,
            sr.Node.ID().String(),
            sr.Logger,
        )
        if err == nil {
            sr.BackbeatIntegration = integration
            // BACKBEAT is optional, so a start failure is logged, not returned.
            if err := integration.Start(context.Background()); err != nil {
                sr.Logger.Warn("BACKBEAT integration failed to start: %v", err)
            }
        }
    }

    return nil
}
```

### P2P Operation Tracking

CHORUS components use BACKBEAT to track distributed operations:

**DHT Operations:**

```go
// Start tracking
integration.StartP2POperation(
    "dht-store-"+key,
    "dht_store",
    3, // Expect 3 beats
    map[string]interface{}{"key": key},
)

// Update phase
integration.UpdateP2POperationPhase("dht-store-"+key, backbeat.PhaseExecuting, peerCount)

// Complete
integration.CompleteP2POperation("dht-store-"+key, peerCount)
```

**PubSub Sync:**

```go
integration.StartP2POperation(
    "pubsub-sync-"+messageID,
    "pubsub_sync",
    2,
    map[string]interface{}{"topic": topic},
)
```

**Peer Discovery:**

```go
integration.StartP2POperation(
    "peer-discovery-"+sessionID,
    "peer_discovery",
    5,
    map[string]interface{}{"target_peers": 10},
)
```

### HAP Status Display

Human Agent Portal displays BACKBEAT status:

```go
func (t *TerminalInterface) printStatus() {
    // ... other status

    if t.runtime.BackbeatIntegration != nil {
        health := t.runtime.BackbeatIntegration.GetHealth()
        if connected, ok := health["connected"].(bool); ok && connected {
            fmt.Printf("BACKBEAT: ✅ Connected\n")
        } else {
            fmt.Printf("BACKBEAT: ⚠️ Disconnected\n")
        }
    } else {
        fmt.Printf("BACKBEAT: ❌ Disabled\n")
    }
}
```

## Configuration Examples

### Production Configuration

```bash
export CHORUS_BACKBEAT_ENABLED=true
export CHORUS_BACKBEAT_CLUSTER_ID=chorus-production
export CHORUS_BACKBEAT_AGENT_ID=chorus-agent-42
export CHORUS_BACKBEAT_NATS_URL=nats://backbeat-nats.chorus.services:4222
```

### Development Configuration

```bash
export CHORUS_BACKBEAT_ENABLED=true
export CHORUS_BACKBEAT_CLUSTER_ID=chorus-dev
export CHORUS_BACKBEAT_AGENT_ID=chorus-dev-alice
export CHORUS_BACKBEAT_NATS_URL=nats://localhost:4222
```

### Disabled Configuration

```bash
export CHORUS_BACKBEAT_ENABLED=false
```

## Beat Budget Guidelines

Recommended beat budgets for common operations:

| Operation Type | Estimated Beats | Time at 2 BPM | Rationale |
|---|---|---|---|
| Peer Discovery | 2-5 beats | 1-2.5 min | Network discovery and handshake |
| DHT Store | 2-4 beats | 1-2 min | Distributed storage with replication |
| DHT Retrieve | 1-3 beats | 30-90 sec | Distributed lookup and retrieval |
| PubSub Sync | 1-2 beats | 30-60 sec | Message propagation |
| Leader Election | 3-10 beats | 1.5-5 min | Consensus negotiation |
| Task Coordination | 5-20 beats | 2.5-10 min | Multi-agent task assignment |

**Factors Affecting Beat Budget:**

- Network latency
- Peer count
- Data size
- Consensus requirements
- Retry logic

## Error Handling

### Integration Errors

**Not Started:**

```go
if !i.started {
    return fmt.Errorf("BACKBEAT integration not started")
}
```

**Operation Not Found:**

```go
operation, exists := i.activeOperations[operationID]
if !exists {
    return fmt.Errorf("operation %s not found", operationID)
}
```

**SDK Errors:**

```go
if err := i.client.Start(i.ctx); err != nil {
    return fmt.Errorf("failed to start BACKBEAT client: %w", err)
}
```

### Degradation Handling

BACKBEAT SDK tracks timing degradation:

- **Tempo Drift:** Difference between expected and measured BPM
- **Local Degradation:** Local system performance issues
- **Reconnect Count:** NATS connection stability

Health status includes these metrics for monitoring:

```json
{
  "tempo_drift": "acceptable",
  "local_degradation": false,
  "reconnect_count": 0
}
```

## Performance Characteristics

### Resource Usage

- **Memory:** O(n) where n = active operations count
- **CPU:** Minimal, callback-driven architecture
- **Network:** Status claims on each beat (low bandwidth)
- **Latency:** Beat-aligned, not real-time (30-second granularity at 2 BPM)

### Scalability

- **Active Operations:** Designed for 100s of concurrent operations
- **Beat Frequency:** Configurable tempo (1-60 BPM typical)
- **Status Claims:** Batched per beat, not per operation event
- **Cleanup:** Automatic on completion/failure

### Timing Characteristics

At default 2 BPM (30 seconds per beat):

- **Minimum tracking granularity:** 30 seconds
- **Health check frequency:** 4 minutes (8 beats)
- **Operation overhead:** ~0.1s per beat callback
- **Status claim latency:** <1s to NATS

## Debugging and Monitoring

### Enable Debug Logging

```go
// In BACKBEAT SDK configuration
sdkConfig.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelDebug,
}))
```

### Monitor Active Operations

```go
health := integration.GetHealth()
activeOps := health["active_operations"].(int)
fmt.Printf("Active P2P operations: %d\n", activeOps)
```

### Check NATS Connectivity

```go
health := integration.GetHealth()
if connected, ok := health["connected"].(bool); !ok || !connected {
    log.Warn("BACKBEAT disconnected from NATS")
    reconnectCount := health["reconnect_count"].(int)
    log.Warn("Reconnection attempts:", reconnectCount)
}
```

### Tempo Drift Monitoring

```go
health := integration.GetHealth()
drift := health["tempo_drift"].(string)
measuredBPM := health["measured_bpm"].(float64)
expectedBPM := health["current_tempo"].(int)

if drift != "acceptable" {
    log.Warn("Tempo drift detected:", drift)
    log.Warn("Expected:", expectedBPM, "Measured:", measuredBPM)
}
```

## Testing

### Unit Testing

Mock the SDK client for unit tests:

```go
type MockSDKClient struct {
    // ... mock fields
}

func (m *MockSDKClient) Start(ctx context.Context) error {
    return nil
}

func (m *MockSDKClient) GetCurrentBeat() int64 {
    return 1000
}

// ... implement other SDK methods
```

### Integration Testing

Test with real BACKBEAT cluster:

```bash
# Start BACKBEAT services
docker-compose -f backbeat-compose.yml up -d

# Run CHORUS with BACKBEAT enabled
export CHORUS_BACKBEAT_ENABLED=true
export CHORUS_BACKBEAT_NATS_URL=nats://localhost:4222
./chorus-agent

# Monitor status claims
nats sub "backbeat.status.>"
```

### Load Testing

Test with many concurrent operations:

```go
func TestManyOperations(t *testing.T) {
    integration := setupIntegration(t)

    for i := 0; i < 1000; i++ {
        opID := fmt.Sprintf("test-op-%d", i)
        err := integration.StartP2POperation(opID, "dht_store", 5, nil)
        require.NoError(t, err)
    }

    // Wait for beats
    time.Sleep(3 * time.Minute)

    // Complete operations
    for i := 0; i < 1000; i++ {
        opID := fmt.Sprintf("test-op-%d", i)
        err := integration.CompleteP2POperation(opID, 5)
        require.NoError(t, err)
    }

    // Verify cleanup
    health := integration.GetHealth()
    assert.Equal(t, 0, health["active_operations"])
}
```

## Troubleshooting

### Common Issues

**"BACKBEAT integration is disabled"**

- Check `CHORUS_BACKBEAT_ENABLED` environment variable
- Verify configuration in CHORUS config file

**"Failed to start BACKBEAT client"**

- Check NATS connectivity
- Verify NATS URL is correct
- Ensure NATS server is running
- Check firewall rules

**"Operation not found"**

- Operation may have already completed
- Operation ID mismatch
- Integration not started before operation registration

**High reconnect count**

- Network instability
- NATS server restarts
- Connection timeout configuration

**Tempo drift**

- System clock synchronization issues (NTP)
- High CPU load affecting timing
- Network latency spikes

### Debug Commands

Check NATS connectivity:

```bash
nats server check
```

Monitor BACKBEAT messages:

```bash
nats sub "backbeat.>"
```

View status claims:

```bash
nats sub "backbeat.status.>"
```

Check CHORUS health:

```bash
# Via HAP
hap> status
```

## Future Enhancements

### Planned Features

- **Operation Dependencies:** Track operation dependencies for complex workflows
- **Beat Budget Warnings:** Alert when operations approach budget limits
- **Historical Metrics:** Track operation completion times for better estimates
- **Dynamic Beat Budgets:** Adjust budgets based on historical performance
- **Operation Priorities:** Prioritize critical operations during contention

### Potential Improvements

- **Adaptive Beat Budgets:** Learn optimal budgets from execution history
- **Operation Correlation:** Link related operations for workflow tracking
- **Beat Budget Profiles:** Pre-defined budgets for common operation patterns
- **Performance Analytics:** Detailed metrics on operation performance vs. budget

## Related Documentation

- `BACKBEAT SDK Documentation` - BACKBEAT Go SDK reference
- `/docs/comprehensive/internal/runtime.md` - SharedRuntime integration
- `/docs/comprehensive/pkg/p2p.md` - P2P operations tracked by BACKBEAT
- `/docs/comprehensive/pkg/storage.md` - DHT operations with beat budgets

## Summary

The `backbeat` package provides essential timing and coordination infrastructure for CHORUS P2P operations:

- **400 lines** of integration code
- **P2P operation tracking** with beat budgets
- **6 operation phases** for lifecycle management
- **4 operation types** (election, dht_store, pubsub_sync, peer_discovery)
- **Status claim emission** on every beat
- **Health monitoring** with tempo drift detection
- **Graceful degradation** when BACKBEAT unavailable

The integration enables CHORUS to participate in cluster-wide coordinated operations with timing guarantees, progress tracking, and health monitoring, making distributed P2P operations observable and manageable across the agent network.

**Current Status:** Production-ready, actively used for P2P operation telemetry and coordination across the CHORUS cluster.