# CHORUS Internal Package: backbeat

**Package:** `chorus/internal/backbeat`
**Purpose:** BACKBEAT Timing System Integration for CHORUS P2P Operations
**Lines of Code:** 400 lines (integration.go)

## Overview

The `backbeat` package provides integration between CHORUS and the BACKBEAT distributed timing system. BACKBEAT synchronizes agent operations across the cluster using a shared "heartbeat" that enables coordinated, time-aware distributed computing.

This integration allows CHORUS agents to:
- Track P2P operations against beat budgets
- Report operation progress via status claims
- Synchronize multi-agent coordination
- Monitor timing drift and degradation
- Emit health metrics on a beat schedule

## Core Concepts

### BACKBEAT Timing System

BACKBEAT provides a distributed metronome that all agents synchronize to:
- **Beat Index:** Sequential beat number across the cluster
- **Tempo:** Beats per minute (default: 2 BPM = 30 seconds per beat)
- **Phase:** Current position within beat cycle
- **Window ID:** Time window identifier for grouping operations
- **Downbeat:** Bar start marker (analogous to musical downbeat)

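The relationship between tempo and beat length can be sketched in a few lines of Go. This is a minimal illustration only: `beatDuration` is a hypothetical helper, not a function of the `backbeat` package or the BACKBEAT SDK.

```go
package main

import (
    "fmt"
    "time"
)

// beatDuration returns the length of one beat at the given tempo.
// Hypothetical helper for illustration; not part of the backbeat package.
func beatDuration(tempoBPM int) time.Duration {
    if tempoBPM <= 0 {
        return 0 // an invalid tempo has no meaningful beat length
    }
    return time.Minute / time.Duration(tempoBPM)
}

func main() {
    fmt.Println(beatDuration(2))     // one beat at the default tempo: 30s
    fmt.Println(5 * beatDuration(2)) // a 5-beat budget: 2m30s
}
```

At the default 2 BPM this yields 30 seconds per beat, so a 5-beat budget corresponds to about 2.5 minutes.
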
### P2P Operation Tracking

CHORUS uses BACKBEAT to track P2P operations:
- **Beat Budget:** Estimated beats for operation completion
- **Progress Tracking:** Real-time percentage completion
- **Phase Transitions:** Operation lifecycle stages
- **Peer Coordination:** Multi-agent operation synchronization

## Architecture

### Integration Type

```go
type Integration struct {
    client  sdk.Client
    config  *BackbeatConfig
    logger  Logger
    ctx     context.Context
    cancel  context.CancelFunc
    started bool
    nodeID  string

    // P2P operation tracking
    activeOperations map[string]*P2POperation
}
```

**Responsibilities:**
- BACKBEAT SDK client lifecycle management
- Beat and downbeat callback registration
- P2P operation tracking and reporting
- Status claim emission
- Health monitoring

### BackbeatConfig

Configuration for BACKBEAT integration.

```go
type BackbeatConfig struct {
    Enabled   bool
    ClusterID string
    AgentID   string
    NATSUrl   string
}
```

**Configuration Sources:**
- Environment variables (prefixed with `CHORUS_BACKBEAT_`)
- CHORUS config.Config integration
- Defaults for local development

**Environment Variables:**
- `CHORUS_BACKBEAT_ENABLED` - Enable/disable integration (default: true)
- `CHORUS_BACKBEAT_CLUSTER_ID` - Cluster identifier (default: "chorus-production")
- `CHORUS_BACKBEAT_AGENT_ID` - Agent identifier (default: "chorus-{agent_id}")
- `CHORUS_BACKBEAT_NATS_URL` - NATS server URL (default: "nats://backbeat-nats:4222")

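As a rough sketch of how the environment variables and defaults listed above could be resolved, the example below builds a `BackbeatConfig` from a lookup function. The helper names `envOr` and `loadBackbeatConfig` are hypothetical, and treating an unparsable `CHORUS_BACKBEAT_ENABLED` value as the default is an assumption; the package's actual loading code may differ.

```go
package main

import (
    "fmt"
    "os"
    "strconv"
)

// BackbeatConfig mirrors the struct documented above.
type BackbeatConfig struct {
    Enabled   bool
    ClusterID string
    AgentID   string
    NATSUrl   string
}

// envOr returns the value for key, or fallback when it is unset or empty.
func envOr(lookup func(string) (string, bool), key, fallback string) string {
    if v, ok := lookup(key); ok && v != "" {
        return v
    }
    return fallback
}

// loadBackbeatConfig is a hypothetical loader that applies the documented defaults.
func loadBackbeatConfig(lookup func(string) (string, bool), agentID string) BackbeatConfig {
    enabled, err := strconv.ParseBool(envOr(lookup, "CHORUS_BACKBEAT_ENABLED", "true"))
    if err != nil {
        enabled = true // assumption: unparsable values fall back to the default
    }
    return BackbeatConfig{
        Enabled:   enabled,
        ClusterID: envOr(lookup, "CHORUS_BACKBEAT_CLUSTER_ID", "chorus-production"),
        AgentID:   envOr(lookup, "CHORUS_BACKBEAT_AGENT_ID", "chorus-"+agentID),
        NATSUrl:   envOr(lookup, "CHORUS_BACKBEAT_NATS_URL", "nats://backbeat-nats:4222"),
    }
}

func main() {
    // os.LookupEnv has the required signature, so it can be passed directly.
    cfg := loadBackbeatConfig(os.LookupEnv, "agent-42")
    fmt.Printf("%+v\n", cfg)
}
```

Passing the lookup function as a parameter keeps the loader testable without touching the process environment.
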
### P2POperation

Tracks a P2P coordination operation through BACKBEAT.

```go
type P2POperation struct {
    ID             string
    Type           string // "election", "dht_store", "pubsub_sync", "peer_discovery"
    StartBeat      int64
    EstimatedBeats int
    Phase          OperationPhase
    PeerCount      int
    StartTime      time.Time
    Data           interface{}
}
```

**Operation Types:**
- `election` - Leader election or consensus operation
- `dht_store` - DHT storage or retrieval operation
- `pubsub_sync` - PubSub message propagation
- `peer_discovery` - P2P peer discovery and connection

**Lifecycle:**
1. Register operation with `StartP2POperation()`
2. Update phase with `UpdateP2POperationPhase()` as the operation progresses
3. Complete with `CompleteP2POperation()` or fail with `FailP2POperation()`
4. Automatic cleanup on completion or failure

### OperationPhase

Represents the current phase of a P2P operation.

```go
type OperationPhase int

const (
    PhaseStarted OperationPhase = iota
    PhaseConnecting
    PhaseNegotiating
    PhaseExecuting
    PhaseCompleted
    PhaseFailed
)
```

**Phase Transitions:**

```
PhaseStarted → PhaseConnecting → PhaseNegotiating → PhaseExecuting → PhaseCompleted
                                                                    ↓
                                                               PhaseFailed
```

The diagram shows the failure branch only once; an operation can move to `PhaseFailed` from any earlier stage.

**Typical Flow:**
1. **PhaseStarted** - Operation registered, initialization
2. **PhaseConnecting** - Establishing connections to peers
3. **PhaseNegotiating** - Consensus or coordination negotiation
4. **PhaseExecuting** - Main operation execution
5. **PhaseCompleted** - Operation successful
6. **PhaseFailed** - Operation failed (can be entered from any stage)

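The phase flow above can be made explicit with a small sketch. Everything beyond the `OperationPhase` constants is an assumption for illustration: the `String` labels, `isTerminal`, and `canTransition` are hypothetical, and the source does not say that the package validates transitions at all. The check allows skipping forward, since callers may move straight from `PhaseStarted` to `PhaseExecuting`.

```go
package main

import "fmt"

type OperationPhase int

const (
    PhaseStarted OperationPhase = iota
    PhaseConnecting
    PhaseNegotiating
    PhaseExecuting
    PhaseCompleted
    PhaseFailed
)

// String returns a lowercase label for the phase (labels are illustrative).
func (p OperationPhase) String() string {
    switch p {
    case PhaseStarted:
        return "started"
    case PhaseConnecting:
        return "connecting"
    case PhaseNegotiating:
        return "negotiating"
    case PhaseExecuting:
        return "executing"
    case PhaseCompleted:
        return "completed"
    case PhaseFailed:
        return "failed"
    default:
        return "unknown"
    }
}

// isTerminal reports whether the phase ends the operation's lifecycle.
func isTerminal(p OperationPhase) bool {
    return p == PhaseCompleted || p == PhaseFailed
}

// canTransition allows any forward move from a non-terminal phase.
// PhaseFailed has the highest value, so it is reachable from every earlier phase.
func canTransition(from, to OperationPhase) bool {
    return !isTerminal(from) && to > from
}

func main() {
    fmt.Println(canTransition(PhaseStarted, PhaseExecuting))  // true: skipping forward
    fmt.Println(canTransition(PhaseNegotiating, PhaseFailed)) // true: failure from any stage
    fmt.Println(canTransition(PhaseCompleted, PhaseFailed))   // false: already terminal
}
```
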
### Logger Interface

Abstraction for CHORUS logging integration.

```go
type Logger interface {
    Info(msg string, args ...interface{})
    Warn(msg string, args ...interface{})
    Error(msg string, args ...interface{})
}
```

This lets the package log through CHORUS's existing logging system without depending on it directly.

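Any type with these three methods satisfies the interface. The sketch below adapts the standard library's `*log.Logger`; `stdLogger` and `formatLine` are hypothetical names, and treating `msg` as a printf-style format string is an assumption, since the source does not specify how `args` are interpreted.

```go
package main

import (
    "fmt"
    "log"
    "os"
)

// Logger is the interface documented above.
type Logger interface {
    Info(msg string, args ...interface{})
    Warn(msg string, args ...interface{})
    Error(msg string, args ...interface{})
}

// formatLine prefixes a level tag and expands printf-style arguments.
// Assumption: msg is a format string.
func formatLine(level, msg string, args ...interface{}) string {
    return level + " " + fmt.Sprintf(msg, args...)
}

// stdLogger adapts the standard library logger (hypothetical adapter).
type stdLogger struct{ out *log.Logger }

func (s stdLogger) Info(msg string, args ...interface{}) {
    s.out.Print(formatLine("INFO", msg, args...))
}

func (s stdLogger) Warn(msg string, args ...interface{}) {
    s.out.Print(formatLine("WARN", msg, args...))
}

func (s stdLogger) Error(msg string, args ...interface{}) {
    s.out.Print(formatLine("ERROR", msg, args...))
}

// Compile-time check that stdLogger satisfies Logger.
var _ Logger = stdLogger{}

func main() {
    var logger Logger = stdLogger{out: log.New(os.Stdout, "", log.LstdFlags)}
    logger.Info("BACKBEAT integration started - cluster=%s", "chorus-dev")
}
```

An adapter like this is mainly useful in tests or small tools where the full CHORUS logger is not available.
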
## Public API

### Constructor

#### NewIntegration

Creates a new BACKBEAT integration for CHORUS.

```go
func NewIntegration(cfg *config.Config, nodeID string, logger Logger) (*Integration, error)
```

**Parameters:**
- `cfg` - CHORUS configuration object
- `nodeID` - P2P node identifier
- `logger` - CHORUS logger implementation

**Returns:**
- Configured Integration instance
- Error if BACKBEAT is disabled or configuration is invalid

**Example:**
```go
// cfg is the loaded *config.Config (named cfg to avoid shadowing the config package)
integration, err := backbeat.NewIntegration(
    cfg,
    node.ID().String(),
    runtime.Logger,
)
if err != nil {
    log.Fatal("BACKBEAT integration failed:", err)
}
```

### Lifecycle Management

#### Start

Initializes the BACKBEAT integration and starts the SDK client.

```go
func (i *Integration) Start(ctx context.Context) error
```

**Actions:**
1. Create cancellation context
2. Start BACKBEAT SDK client
3. Register beat callbacks (`onBeat`, `onDownbeat`)
4. Log startup confirmation

**Returns:** Error if already started or SDK initialization fails

**Example:**
```go
ctx := context.Background()
if err := integration.Start(ctx); err != nil {
    log.Fatal("Failed to start BACKBEAT:", err)
}
```

**Logged Output:**
```
🎵 CHORUS BACKBEAT integration started - cluster=chorus-production agent=chorus-agent-42
```

#### Stop

Gracefully shuts down the BACKBEAT integration.

```go
func (i *Integration) Stop() error
```

**Actions:**
1. Cancel context
2. Stop SDK client
3. Clean up resources
4. Log shutdown confirmation

**Returns:** Error if SDK shutdown fails (logged as warning)

**Example:**
```go
if err := integration.Stop(); err != nil {
    log.Warn("BACKBEAT shutdown warning:", err)
}
```

**Logged Output:**
```
🎵 CHORUS BACKBEAT integration stopped
```

### P2P Operation Management

#### StartP2POperation

Registers a new P2P operation with BACKBEAT.

```go
func (i *Integration) StartP2POperation(
    operationID string,
    operationType string,
    estimatedBeats int,
    data interface{},
) error
```

**Parameters:**
- `operationID` - Unique operation identifier
- `operationType` - Operation category (election, dht_store, pubsub_sync, peer_discovery)
- `estimatedBeats` - Expected beats to completion
- `data` - Optional operation-specific data

**Actions:**
1. Create P2POperation record
2. Record start beat from current beat index
3. Add to activeOperations map
4. Emit initial status claim

**Returns:** Error if integration not started

**Example:**
```go
err := integration.StartP2POperation(
    "election-leader-2025",
    "election",
    5, // Expect completion in 5 beats (~2.5 minutes at 2 BPM)
    map[string]interface{}{
        "candidates": 3,
        "quorum": 2,
    },
)
```

**Status Claim Emitted:**
```json
{
    "task_id": "election-leader-2025",
    "state": "executing",
    "beats_left": 5,
    "progress": 0.0,
    "notes": "P2P election: started (peers: 0, node: 12D3KooW...)"
}
```

#### UpdateP2POperationPhase

Updates the phase of an active P2P operation.

```go
func (i *Integration) UpdateP2POperationPhase(
    operationID string,
    phase OperationPhase,
    peerCount int,
) error
```

**Parameters:**
- `operationID` - Operation identifier
- `phase` - New phase (PhaseConnecting, PhaseNegotiating, etc.)
- `peerCount` - Current peer count involved in operation

**Actions:**
1. Look up operation in activeOperations
2. Update phase and peer count
3. Emit updated status claim

**Returns:** Error if operation not found

**Example:**
```go
// Connected to peers
err := integration.UpdateP2POperationPhase(
    "election-leader-2025",
    backbeat.PhaseConnecting,
    3,
)

// Negotiating consensus
err = integration.UpdateP2POperationPhase(
    "election-leader-2025",
    backbeat.PhaseNegotiating,
    3,
)

// Executing election
err = integration.UpdateP2POperationPhase(
    "election-leader-2025",
    backbeat.PhaseExecuting,
    3,
)
```

#### CompleteP2POperation

Marks a P2P operation as completed successfully.

```go
func (i *Integration) CompleteP2POperation(operationID string, peerCount int) error
```

**Parameters:**
- `operationID` - Operation identifier
- `peerCount` - Final peer count

**Actions:**
1. Look up operation
2. Set phase to PhaseCompleted
3. Emit completion status claim (state: "done", progress: 1.0)
4. Remove from activeOperations map

**Returns:** Error if operation not found or status emission fails

**Example:**
```go
err := integration.CompleteP2POperation("election-leader-2025", 3)
```

**Status Claim Emitted:**
```json
{
    "task_id": "election-leader-2025",
    "state": "done",
    "beats_left": 0,
    "progress": 1.0,
    "notes": "P2P election: completed (peers: 3, node: 12D3KooW...)"
}
```

#### FailP2POperation

Marks a P2P operation as failed.

```go
func (i *Integration) FailP2POperation(operationID string, reason string) error
```

**Parameters:**
- `operationID` - Operation identifier
- `reason` - Failure reason (for logging and status)

**Actions:**
1. Look up operation
2. Set phase to PhaseFailed
3. Emit failure status claim (state: "failed", progress: 0.0)
4. Remove from activeOperations map

**Returns:** Error if operation not found or status emission fails

**Example:**
```go
err := integration.FailP2POperation(
    "election-leader-2025",
    "quorum not reached within timeout",
)
```

**Status Claim Emitted:**
```json
{
    "task_id": "election-leader-2025",
    "state": "failed",
    "beats_left": 0,
    "progress": 0.0,
    "notes": "P2P operation failed: quorum not reached within timeout (type: election)"
}
```

### Health and Monitoring

#### GetHealth

Returns the current BACKBEAT integration health status.

```go
func (i *Integration) GetHealth() map[string]interface{}
```

**Returns:** Map with health metrics:
- `enabled` - Integration enabled flag
- `started` - Integration started flag
- `connected` - NATS connection status
- `current_beat` - Current beat index
- `current_tempo` - Current tempo (BPM)
- `measured_bpm` - Measured beats per minute
- `tempo_drift` - Tempo drift status
- `reconnect_count` - NATS reconnection count
- `active_operations` - Count of active operations
- `local_degradation` - Local performance degradation flag
- `errors` - Recent error messages
- `node_id` - CHORUS node ID

**Example:**
```go
health := integration.GetHealth()
fmt.Printf("BACKBEAT connected: %v\n", health["connected"])
fmt.Printf("Active operations: %d\n", health["active_operations"])
```

**Example Response:**
```json
{
    "enabled": true,
    "started": true,
    "connected": true,
    "current_beat": 12345,
    "current_tempo": 2,
    "measured_bpm": 2.01,
    "tempo_drift": "acceptable",
    "reconnect_count": 0,
    "active_operations": 2,
    "local_degradation": false,
    "errors": [],
    "node_id": "12D3KooWAbc..."
}
```

#### ExecuteWithBeatBudget

Executes a function with a BACKBEAT beat budget.

```go
func (i *Integration) ExecuteWithBeatBudget(beats int, fn func() error) error
```

**Parameters:**
- `beats` - Beat budget for operation
- `fn` - Function to execute

**Actions:**
1. Check if integration is started
2. Delegate to SDK `WithBeatBudget()` for timing enforcement
3. Fall back to regular execution if not started

**Returns:** Error from function execution or timeout

**Example:**
```go
err := integration.ExecuteWithBeatBudget(10, func() error {
    // This operation should complete within 10 beats
    return performExpensiveOperation()
})
if err != nil {
    // err may come from the function itself or from the budget timeout
    log.Error("Operation failed or exceeded beat budget:", err)
}
```

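To illustrate what enforcing a beat budget can look like, here is a minimal sketch that races a function against a timer sized in beats. It is not the SDK's `WithBeatBudget()` implementation, which is not shown in this document; `runWithBeatBudget`, `errBudgetExceeded`, and `slowTask` are hypothetical names, and the beat length is passed in explicitly instead of being read from the live tempo.

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

// errBudgetExceeded is a hypothetical sentinel error for this sketch.
var errBudgetExceeded = errors.New("beat budget exceeded")

// runWithBeatBudget returns fn's error, or errBudgetExceeded if fn has not
// finished after `beats` beats of length beatLen.
func runWithBeatBudget(beats int, beatLen time.Duration, fn func() error) error {
    done := make(chan error, 1) // buffered so the goroutine never blocks on send
    go func() { done <- fn() }()

    timer := time.NewTimer(time.Duration(beats) * beatLen)
    defer timer.Stop()

    select {
    case err := <-done:
        return err
    case <-timer.C:
        // fn keeps running in the background; a fuller version would pass
        // a context to fn so it can stop early.
        return errBudgetExceeded
    }
}

// slowTask simulates work that takes longer than a short budget.
func slowTask() error {
    time.Sleep(200 * time.Millisecond)
    return nil
}

func main() {
    // Millisecond-scale beats keep the demo fast; real beats are 30s at 2 BPM.
    err := runWithBeatBudget(2, 10*time.Millisecond, slowTask)
    fmt.Println(err) // beat budget exceeded
}
```

A distinct sentinel error lets callers tell a budget overrun apart from a failure of the function itself with `errors.Is`.
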
## Beat Callbacks

### onBeat

Handles regular beat events from BACKBEAT.

```go
func (i *Integration) onBeat(beat sdk.BeatFrame)
```

**Called:** Every beat (every 30 seconds at 2 BPM)

**BeatFrame Structure:**
- `BeatIndex` - Sequential beat number
- `Phase` - Current phase within beat
- `TempoBPM` - Current tempo
- `WindowID` - Time window identifier

**Actions:**
1. Log beat reception with details
2. Emit status claims for all active operations
3. Periodic health status emission (every 8 beats = ~4 minutes)

**Example Log:**
```
🥁 BACKBEAT beat received - beat=12345 phase=upbeat tempo=2 window=w-1234
```

### onDownbeat

Handles downbeat (bar start) events.

```go
func (i *Integration) onDownbeat(beat sdk.BeatFrame)
```

**Called:** At the start of each bar (every N beats, configurable)

**Actions:**
1. Log downbeat reception
2. Cleanup completed operations
3. Log active operation count

**Example Log:**
```
🎼 BACKBEAT downbeat - new bar started - beat=12344 window=w-1234
🧹 BACKBEAT operations cleanup check - active: 2
```

## Status Claim Emission

### Operation Status Claims

Emitted for each active operation on every beat.

```go
func (i *Integration) emitOperationStatus(operation *P2POperation) error
```

**Calculated Fields:**
- **Beats Passed:** Current beat - start beat
- **Beats Left:** Estimated beats - beats passed (minimum 0)
- **Progress:** Beats passed / estimated beats (maximum 1.0)
- **State:** "executing", "done", or "failed"

**Status Claim Structure:**
```json
{
    "task_id": "operation-id",
    "state": "executing",
    "beats_left": 3,
    "progress": 0.4,
    "notes": "P2P dht_store: executing (peers: 5, node: 12D3KooW...)"
}
```

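The calculated fields can be expressed as a small function. This is a sketch of the arithmetic described above, not the body of `emitOperationStatus`; `claimFields` is a hypothetical name, and treating a zero or negative estimate as fully consumed is an assumption.

```go
package main

import "fmt"

// claimFields derives beats_left and progress from the beat counters.
func claimFields(currentBeat, startBeat int64, estimatedBeats int) (beatsLeft int, progress float64) {
    passed := int(currentBeat - startBeat)
    if passed < 0 {
        passed = 0 // guard against a start beat recorded ahead of the current beat
    }

    beatsLeft = estimatedBeats - passed
    if beatsLeft < 0 {
        beatsLeft = 0 // minimum 0
    }

    if estimatedBeats <= 0 {
        return beatsLeft, 1.0 // assumption: a zero budget counts as fully consumed
    }
    progress = float64(passed) / float64(estimatedBeats)
    if progress > 1.0 {
        progress = 1.0 // maximum 1.0
    }
    return beatsLeft, progress
}

func main() {
    // Two beats into a five-beat estimate, matching the example claim.
    left, progress := claimFields(1002, 1000, 5)
    fmt.Println(left, progress) // 3 0.4
}
```

An operation that runs past its estimate keeps reporting `beats_left: 0` and `progress: 1.0` until it is completed or failed.
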
### Health Status Claims

Emitted periodically (every 8 beats = ~4 minutes at 2 BPM).

```go
func (i *Integration) emitHealthStatus() error
```

**Health Claim Structure:**
```json
{
    "task_id": "chorus-p2p-health",
    "state": "executing",
    "beats_left": 0,
    "progress": 1.0,
    "notes": "CHORUS P2P healthy: connected=true, operations=2, tempo=2 BPM, node=12D3KooW..."
}
```

**State Determination:**
- `waiting` - No active operations
- `executing` - One or more active operations
- `failed` - SDK reports errors

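The state rules above can be sketched as a single function. `healthState` is a hypothetical helper, and checking SDK errors before the operation count is an assumption; the source lists the three states but not their precedence.

```go
package main

import "fmt"

// healthState maps the integration's condition to a claim state.
// Assumption: SDK errors take precedence over the active-operation count.
func healthState(activeOperations int, sdkErrors []string) string {
    switch {
    case len(sdkErrors) > 0:
        return "failed"
    case activeOperations > 0:
        return "executing"
    default:
        return "waiting"
    }
}

func main() {
    fmt.Println(healthState(0, nil))                         // waiting
    fmt.Println(healthState(2, nil))                         // executing
    fmt.Println(healthState(2, []string{"nats: timed out"})) // failed
}
```
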
## Integration with CHORUS

### SharedRuntime Integration

The Integration is created and managed by `runtime.SharedRuntime`:

```go
type SharedRuntime struct {
    // ... other fields
    BackbeatIntegration *backbeat.Integration
}

func (sr *SharedRuntime) Initialize(cfg *config.Config) error {
    // ... other initialization

    // Create BACKBEAT integration (optional: a failure here is not fatal)
    if cfg.Backbeat.Enabled {
        integration, err := backbeat.NewIntegration(
            cfg,
            sr.Node.ID().String(),
            sr.Logger,
        )
        if err == nil {
            sr.BackbeatIntegration = integration
            // Report a failed start instead of silently discarding the error
            if err := integration.Start(context.Background()); err != nil {
                sr.Logger.Warn("BACKBEAT start failed:", err)
            }
        }
    }

    return nil
}
```

### P2P Operation Tracking

CHORUS components use BACKBEAT to track distributed operations:

**DHT Operations:**
```go
// Start tracking
integration.StartP2POperation(
    "dht-store-"+key,
    "dht_store",
    3, // Expect 3 beats
    map[string]interface{}{"key": key},
)

// Update phase
integration.UpdateP2POperationPhase("dht-store-"+key, backbeat.PhaseExecuting, peerCount)

// Complete
integration.CompleteP2POperation("dht-store-"+key, peerCount)
```

**PubSub Sync:**
```go
integration.StartP2POperation(
    "pubsub-sync-"+messageID,
    "pubsub_sync",
    2,
    map[string]interface{}{"topic": topic},
)
```

**Peer Discovery:**
```go
integration.StartP2POperation(
    "peer-discovery-"+sessionID,
    "peer_discovery",
    5,
    map[string]interface{}{"target_peers": 10},
)
```

### HAP Status Display

Human Agent Portal displays BACKBEAT status:

```go
func (t *TerminalInterface) printStatus() {
    // ... other status

    if t.runtime.BackbeatIntegration != nil {
        health := t.runtime.BackbeatIntegration.GetHealth()
        if connected, ok := health["connected"].(bool); ok && connected {
            fmt.Printf("BACKBEAT: ✅ Connected\n")
        } else {
            fmt.Printf("BACKBEAT: ⚠️ Disconnected\n")
        }
    } else {
        fmt.Printf("BACKBEAT: ❌ Disabled\n")
    }
}
```

## Configuration Examples

### Production Configuration

```bash
export CHORUS_BACKBEAT_ENABLED=true
export CHORUS_BACKBEAT_CLUSTER_ID=chorus-production
export CHORUS_BACKBEAT_AGENT_ID=chorus-agent-42
export CHORUS_BACKBEAT_NATS_URL=nats://backbeat-nats.chorus.services:4222
```

### Development Configuration

```bash
export CHORUS_BACKBEAT_ENABLED=true
export CHORUS_BACKBEAT_CLUSTER_ID=chorus-dev
export CHORUS_BACKBEAT_AGENT_ID=chorus-dev-alice
export CHORUS_BACKBEAT_NATS_URL=nats://localhost:4222
```

### Disabled Configuration

```bash
export CHORUS_BACKBEAT_ENABLED=false
```

## Beat Budget Guidelines

Recommended beat budgets for common operations:

| Operation Type | Estimated Beats | Time at 2 BPM | Rationale |
|---|---|---|---|
| Peer Discovery | 2-5 beats | 1-2.5 min | Network discovery and handshake |
| DHT Store | 2-4 beats | 1-2 min | Distributed storage with replication |
| DHT Retrieve | 1-3 beats | 30-90 sec | Distributed lookup and retrieval |
| PubSub Sync | 1-2 beats | 30-60 sec | Message propagation |
| Leader Election | 3-10 beats | 1.5-5 min | Consensus negotiation |
| Task Coordination | 5-20 beats | 2.5-10 min | Multi-agent task assignment |

**Factors Affecting Beat Budget:**
- Network latency
- Peer count
- Data size
- Consensus requirements
- Retry logic

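When an operation's expected wall-clock time is known, a beat budget can be derived by rounding up to whole beats. The helper below is a hypothetical sketch of that conversion, not part of the package; how much headroom to add for the factors listed above is left to the caller.

```go
package main

import (
    "fmt"
    "time"
)

// beatsForDuration converts an expected duration into a whole number of beats,
// rounding up so the budget never undershoots. It returns 0 for an invalid tempo.
func beatsForDuration(expected time.Duration, tempoBPM int) int {
    if tempoBPM <= 0 {
        return 0
    }
    beat := time.Minute / time.Duration(tempoBPM)
    beats := int((expected + beat - 1) / beat) // ceiling division
    if beats < 1 {
        beats = 1 // every operation occupies at least one beat
    }
    return beats
}

func main() {
    fmt.Println(beatsForDuration(90*time.Second, 2))  // 3
    fmt.Println(beatsForDuration(100*time.Second, 2)) // 4
    fmt.Println(beatsForDuration(10*time.Second, 2))  // 1
}
```

The result can be passed as `estimatedBeats` to `StartP2POperation` or as `beats` to `ExecuteWithBeatBudget`.
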
## Error Handling

### Integration Errors

**Not Started:**
```go
if !i.started {
    return fmt.Errorf("BACKBEAT integration not started")
}
```

**Operation Not Found:**
```go
operation, exists := i.activeOperations[operationID]
if !exists {
    return fmt.Errorf("operation %s not found", operationID)
}
```

**SDK Errors:**
```go
if err := i.client.Start(i.ctx); err != nil {
    return fmt.Errorf("failed to start BACKBEAT client: %w", err)
}
```

### Degradation Handling

BACKBEAT SDK tracks timing degradation:
- **Tempo Drift:** Difference between expected and measured BPM
- **Local Degradation:** Local system performance issues
- **Reconnect Count:** NATS connection stability

Health status includes these metrics for monitoring:
```json
{
    "tempo_drift": "acceptable",
    "local_degradation": false,
    "reconnect_count": 0
}
```

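Since tempo drift is described as the difference between expected and measured BPM, a relative drift value can be computed from the `current_tempo` and `measured_bpm` health fields. The sketch below shows only that arithmetic; `tempoDrift` is a hypothetical helper, and how the SDK turns drift into a status such as "acceptable" is not covered by this document.

```go
package main

import "fmt"

// tempoDrift returns the signed relative difference between measured and
// expected BPM (0.005 means the measured tempo is 0.5% fast).
func tempoDrift(expectedBPM, measuredBPM float64) float64 {
    if expectedBPM == 0 {
        return 0 // no meaningful drift without an expected tempo
    }
    return (measuredBPM - expectedBPM) / expectedBPM
}

func main() {
    // Values taken from the example health response: tempo 2, measured 2.01.
    fmt.Printf("%.2f%%\n", tempoDrift(2, 2.01)*100) // 0.50%
}
```
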
## Performance Characteristics

### Resource Usage

- **Memory:** O(n) where n = active operations count
- **CPU:** Minimal, callback-driven architecture
- **Network:** Status claims on each beat (low bandwidth)
- **Latency:** Beat-aligned, not real-time (30-second granularity at 2 BPM)

### Scalability

- **Active Operations:** Designed for 100s of concurrent operations
- **Beat Frequency:** Configurable tempo (1-60 BPM typical)
- **Status Claims:** Re-emitted for every active operation once per beat, in addition to the claims emitted on start, phase update, completion, and failure
- **Cleanup:** Automatic on completion/failure

### Timing Characteristics

At default 2 BPM (30 seconds per beat):
- **Minimum tracking granularity:** 30 seconds
- **Health check frequency:** 4 minutes (8 beats)
- **Operation overhead:** ~0.1s per beat callback
- **Status claim latency:** under 1s to NATS

## Debugging and Monitoring

### Enable Debug Logging

```go
// In BACKBEAT SDK configuration
sdkConfig.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelDebug,
}))
```

### Monitor Active Operations

```go
health := integration.GetHealth()
// Comma-ok assertion avoids a panic if the value is missing or not an int
if activeOps, ok := health["active_operations"].(int); ok {
    fmt.Printf("Active P2P operations: %d\n", activeOps)
}
```

### Check NATS Connectivity

```go
health := integration.GetHealth()
if connected, ok := health["connected"].(bool); !ok || !connected {
    log.Warn("BACKBEAT disconnected from NATS")
    if reconnectCount, ok := health["reconnect_count"].(int); ok {
        log.Warn("Reconnection attempts:", reconnectCount)
    }
}
```

### Tempo Drift Monitoring

```go
health := integration.GetHealth()
// Comma-ok assertions yield zero values instead of panicking on a type mismatch
drift, _ := health["tempo_drift"].(string)
measuredBPM, _ := health["measured_bpm"].(float64)
expectedBPM, _ := health["current_tempo"].(int)

if drift != "acceptable" {
    log.Warn("Tempo drift detected:", drift)
    log.Warn("Expected:", expectedBPM, "Measured:", measuredBPM)
}
```

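Because `GetHealth()` returns `map[string]interface{}`, every read needs a type assertion. A small accessor can centralize that check. The sketch below is a hypothetical helper, not part of the package; accepting `int64` and `float64` is an assumption that covers maps rebuilt from JSON, where numbers decode as `float64`.

```go
package main

import "fmt"

// healthInt reads an integer metric from a GetHealth-style map.
// It reports false when the key is missing or the value is not numeric.
func healthInt(health map[string]interface{}, key string) (int, bool) {
    switch v := health[key].(type) {
    case int:
        return v, true
    case int64:
        return int(v), true
    case float64: // e.g. after a JSON round trip
        return int(v), true
    default:
        return 0, false
    }
}

func main() {
    // Stand-in for integration.GetHealth() so the example runs on its own.
    health := map[string]interface{}{
        "active_operations": 2,
        "reconnect_count":   float64(1),
    }

    if n, ok := healthInt(health, "active_operations"); ok {
        fmt.Println("active operations:", n)
    }
    if _, ok := healthInt(health, "current_beat"); !ok {
        fmt.Println("current_beat missing or not numeric")
    }
}
```
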
## Testing

### Unit Testing

Mock the SDK client for unit tests:

```go
type MockSDKClient struct {
    // ... mock fields
}

func (m *MockSDKClient) Start(ctx context.Context) error {
    return nil
}

func (m *MockSDKClient) GetCurrentBeat() int64 {
    return 1000
}

// ... implement other SDK methods
```

### Integration Testing

Test with real BACKBEAT cluster:

```bash
# Start BACKBEAT services
docker-compose -f backbeat-compose.yml up -d

# Run CHORUS with BACKBEAT enabled
export CHORUS_BACKBEAT_ENABLED=true
export CHORUS_BACKBEAT_NATS_URL=nats://localhost:4222
./chorus-agent

# Monitor status claims
nats sub "backbeat.status.>"
```

### Load Testing

Test with many concurrent operations:

```go
func TestManyOperations(t *testing.T) {
    integration := setupIntegration(t)

    for i := 0; i < 1000; i++ {
        opID := fmt.Sprintf("test-op-%d", i)
        err := integration.StartP2POperation(opID, "dht_store", 5, nil)
        require.NoError(t, err)
    }

    // Wait for beats
    time.Sleep(3 * time.Minute)

    // Complete operations
    for i := 0; i < 1000; i++ {
        opID := fmt.Sprintf("test-op-%d", i)
        err := integration.CompleteP2POperation(opID, 5)
        require.NoError(t, err)
    }

    // Verify cleanup
    health := integration.GetHealth()
    assert.Equal(t, 0, health["active_operations"])
}
```

## Troubleshooting

### Common Issues

**"BACKBEAT integration is disabled"**
- Check `CHORUS_BACKBEAT_ENABLED` environment variable
- Verify configuration in CHORUS config file

**"Failed to start BACKBEAT client"**
- Check NATS connectivity
- Verify NATS URL is correct
- Ensure NATS server is running
- Check firewall rules

**"Operation not found"**
- Operation may have already completed
- Operation ID mismatch
- Integration not started before operation registration

**High reconnect count**
- Network instability
- NATS server restarts
- Connection timeout configuration

**Tempo drift**
- System clock synchronization issues (NTP)
- High CPU load affecting timing
- Network latency spikes

### Debug Commands

Check NATS connectivity:
```bash
nats server check connection
```

Monitor BACKBEAT messages:
```bash
nats sub "backbeat.>"
```

View status claims:
```bash
nats sub "backbeat.status.>"
```

Check CHORUS health:
```bash
# Via HAP
hap> status
```

## Future Enhancements

### Planned Features

- **Operation Dependencies:** Track operation dependencies for complex workflows
- **Beat Budget Warnings:** Alert when operations approach budget limits
- **Historical Metrics:** Track operation completion times for better estimates
- **Dynamic Beat Budgets:** Adjust budgets based on historical performance
- **Operation Priorities:** Prioritize critical operations during contention

### Potential Improvements

- **Adaptive Beat Budgets:** Learn optimal budgets from execution history
- **Operation Correlation:** Link related operations for workflow tracking
- **Beat Budget Profiles:** Pre-defined budgets for common operation patterns
- **Performance Analytics:** Detailed metrics on operation performance vs. budget

## Related Documentation

- `BACKBEAT SDK Documentation` - BACKBEAT Go SDK reference
- `/docs/comprehensive/internal/runtime.md` - SharedRuntime integration
- `/docs/comprehensive/pkg/p2p.md` - P2P operations tracked by BACKBEAT
- `/docs/comprehensive/pkg/storage.md` - DHT operations with beat budgets

## Summary

The `backbeat` package provides essential timing and coordination infrastructure for CHORUS P2P operations:

- **400 lines** of integration code
- **P2P operation tracking** with beat budgets
- **6 operation phases** for lifecycle management
- **4 operation types** (election, dht_store, pubsub_sync, peer_discovery)
- **Status claim emission** on every beat
- **Health monitoring** with tempo drift detection
- **Graceful degradation** when BACKBEAT is unavailable

The integration enables CHORUS to participate in cluster-wide coordinated operations with timing guarantees, progress tracking, and health monitoring, making distributed P2P operations observable and manageable across the agent network.

**Current Status:** Production-ready, actively used for P2P operation telemetry and coordination across the CHORUS cluster.