BACKBEAT Go SDK
The BACKBEAT Go SDK enables CHORUS services to become "BACKBEAT-aware" by providing client libraries for beat synchronization, status emission, and beat-budget management.
Features
- Beat Subscription (BACKBEAT-REQ-040): Subscribe to beat and downbeat events with jitter-tolerant scheduling
- Status Emission (BACKBEAT-REQ-041): Emit status claims with automatic agent_id, task_id, and HLC population
- Beat Budgets (BACKBEAT-REQ-042): Execute functions with beat-based timeouts and cancellation
- Legacy Compatibility (BACKBEAT-REQ-043): Support for legacy {bar,beat} patterns with migration warnings
- Security (BACKBEAT-REQ-044): Ed25519 signing and required headers for status claims
- Local Degradation: Continue operating when the pulse service is unavailable
- Comprehensive Observability: Metrics, health reporting, and performance monitoring
Quick Start
package main
import (
    "context"
    "crypto/ed25519"
    "crypto/rand"
    "log/slog"
    "os"
    "os/signal"

    "github.com/chorus-services/backbeat/pkg/sdk"
)
func main() {
    // Generate signing key
    _, signingKey, err := ed25519.GenerateKey(rand.Reader)
    if err != nil {
        panic(err)
    }

    // Configure SDK
    config := sdk.DefaultConfig()
    config.ClusterID = "chorus-dev"
    config.AgentID = "my-service"
    config.NATSUrl = "nats://localhost:4222"
    config.SigningKey = signingKey

    // Create client
    client := sdk.NewClient(config)

    // Register beat callback
    if err := client.OnBeat(func(beat sdk.BeatFrame) {
        slog.Info("Beat received", "beat_index", beat.BeatIndex)

        // Emit status; log failures instead of dropping them silently
        if err := client.EmitStatusClaim(sdk.StatusClaim{
            State:     "executing",
            BeatsLeft: 5,
            Progress:  0.3,
            Notes:     "Processing data",
        }); err != nil {
            slog.Warn("Status claim failed", "error", err)
        }
    }); err != nil {
        panic(err)
    }

    // Start client; ctx is cancelled on interrupt (Ctrl+C)
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()
    if err := client.Start(ctx); err != nil {
        panic(err)
    }
    defer client.Stop()

    // Your service logic here...
    <-ctx.Done() // block until interrupted so the deferred Stop runs
}
Configuration
Basic Configuration
config := &sdk.Config{
    ClusterID: "your-cluster",    // BACKBEAT cluster ID
    AgentID:   "your-agent",      // Unique agent identifier
    NATSUrl:   "nats://localhost:4222", // NATS connection URL
}
Advanced Configuration
config := sdk.DefaultConfig()
config.ClusterID = "chorus-prod"
config.AgentID = "web-service-01"
config.NATSUrl = "nats://nats.cluster.local:4222"
config.SigningKey = loadSigningKey() // Ed25519 private key
config.JitterTolerance = 100 * time.Millisecond
config.ReconnectDelay = 2 * time.Second
config.MaxReconnects = 10 // -1 for infinite
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
Core Features
Beat Subscription
// Register beat callback (called every beat)
client.OnBeat(func(beat sdk.BeatFrame) {
    // Your beat logic here
    fmt.Printf("Beat %d at %s\n", beat.BeatIndex, beat.DeadlineAt)
})
// Register downbeat callback (called at bar starts)
client.OnDownbeat(func(beat sdk.BeatFrame) {
    // Your downbeat logic here
    fmt.Printf("Bar started: %s\n", beat.WindowID)
})
Status Emission
// Basic status emission
err := client.EmitStatusClaim(sdk.StatusClaim{
    State:     "executing",  // executing|planning|waiting|review|done|failed
    BeatsLeft: 10,          // estimated beats remaining
    Progress:  0.75,        // progress ratio (0.0-1.0)
    Notes:     "Processing batch 5/10",
})
// Advanced status with task tracking (reuses err declared above)
err = client.EmitStatusClaim(sdk.StatusClaim{
    TaskID:    "task-12345", // auto-generated if empty
    State:     "waiting",
    WaitFor:   []string{"hmmm://thread/abc123"}, // dependencies
    BeatsLeft: 0,
    Progress:  1.0,
    Notes:     "Waiting for thread completion",
})
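The comments above name the allowed states and the 0.0-1.0 progress range, and the Error Handling section mentions status claim validation. As a rough illustration of what such a check could look like, here is a minimal sketch. The `StatusClaim` type below is a local stand-in with only a few fields, `validateClaim` is a hypothetical helper and not an SDK function, and rejecting a negative `BeatsLeft` is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// StatusClaim is a local stand-in holding only the fields this sketch checks;
// the real sdk.StatusClaim has more fields and may differ.
type StatusClaim struct {
	State     string
	BeatsLeft int
	Progress  float64
	Notes     string
}

// validStates lists the states named in the README's State comment.
var validStates = map[string]bool{
	"executing": true, "planning": true, "waiting": true,
	"review": true, "done": true, "failed": true,
}

// validateClaim is a hypothetical helper, not part of the SDK.
func validateClaim(c StatusClaim) error {
	if !validStates[c.State] {
		return fmt.Errorf("invalid state %q", c.State)
	}
	if c.Progress < 0.0 || c.Progress > 1.0 {
		return fmt.Errorf("progress %v outside [0.0, 1.0]", c.Progress)
	}
	if c.BeatsLeft < 0 { // assumption: negative estimates are rejected
		return errors.New("beats left must not be negative")
	}
	return nil
}

func main() {
	// "running" is not one of the documented states, so this reports an error.
	fmt.Println(validateClaim(StatusClaim{State: "running", Progress: 0.5}))
	// A claim shaped like the README examples passes.
	fmt.Println(validateClaim(StatusClaim{State: "executing", BeatsLeft: 10, Progress: 0.75}))
}
```

Running a check like this before calling EmitStatusClaim would surface malformed claims locally; whether the SDK applies the same rules is not stated here.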
Beat Budgets
// Execute with beat-based timeout
err := client.WithBeatBudget(10, func() error {
    // This function has 10 beats to complete
    return performTask()
})
if err != nil {
    // Handle timeout or task error
    fmt.Printf("Task failed or exceeded budget: %v\n", err)
}
// Real-world example (reuses err declared above)
err = client.WithBeatBudget(20, func() error {
    // Database operation with beat budget
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    return database.ProcessBatch(ctx, batchData)
})
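To make the idea of a beat budget concrete, the sketch below approximates one with a wall-clock timeout: the beat count is converted to a duration at a fixed tempo and enforced with a context deadline. This is an illustration only. `budgetDuration`, `runWithBudget`, and `slowTask` are hypothetical names, the callback takes a context (unlike the SDK's `func() error`), and the SDK itself presumably counts real beats rather than elapsed time.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// budgetDuration converts a beat count into wall-clock time at a given tempo.
func budgetDuration(beats int, bpm float64) time.Duration {
	return time.Duration(float64(beats) * float64(time.Minute) / bpm)
}

// runWithBudget is a hypothetical stand-in for a beat budget: it runs fn and
// returns an error if fn has not finished when the budget elapses.
func runWithBudget(beats int, bpm float64, fn func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), budgetDuration(beats, bpm))
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- fn(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return fmt.Errorf("budget of %d beats exceeded: %w", beats, ctx.Err())
	}
}

// slowTask simulates work that takes one second unless cancelled first.
func slowTask(ctx context.Context) error {
	select {
	case <-time.After(time.Second):
		return nil
	case <-ctx.Done(): // stop promptly once the budget expires
		return ctx.Err()
	}
}

func main() {
	fmt.Println(budgetDuration(10, 60)) // 10 beats at 60 BPM

	// At 6000 BPM one beat lasts 10ms, so a 2-beat budget is 20ms.
	err := runWithBudget(2, 6000, slowTask)
	fmt.Println(errors.Is(err, context.DeadlineExceeded))
}
```

Passing the context into the task lets it stop as soon as the budget runs out instead of continuing in the background.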
Client Interface
type Client interface {
    // Beat subscription
    OnBeat(callback func(BeatFrame)) error
    OnDownbeat(callback func(BeatFrame)) error
    
    // Status emission
    EmitStatusClaim(claim StatusClaim) error
    
    // Beat budgets
    WithBeatBudget(n int, fn func() error) error
    
    // Utilities
    GetCurrentBeat() int64
    GetCurrentWindow() string
    IsInWindow(windowID string) bool
    
    // Lifecycle
    Start(ctx context.Context) error
    Stop() error
    Health() HealthStatus
}
Examples
The SDK includes comprehensive examples:
- Simple Agent: Basic beat subscription and status emission
- Task Processor: Beat budget usage for task timeout management
- Service Monitor: Health monitoring with beat-aligned reporting
Running Examples
# Simple agent example
go run pkg/sdk/examples/simple_agent.go
# Task processor with beat budgets
go run pkg/sdk/examples/task_processor.go
# Service monitor with health reporting
go run pkg/sdk/examples/service_monitor.go
Observability
Health Monitoring
health := client.Health()
fmt.Printf("Connected: %v\n", health.Connected)
fmt.Printf("Last Beat: %d at %s\n", health.LastBeat, health.LastBeatTime)
fmt.Printf("Time Drift: %s\n", health.TimeDrift)
fmt.Printf("Reconnects: %d\n", health.ReconnectCount)
fmt.Printf("Local Degradation: %v\n", health.LocalDegradation)
Metrics
The SDK exposes metrics via Go's expvar package:
- Connection metrics: status, reconnection count, duration
- Beat metrics: received, jitter, callback latency, misses
- Status metrics: claims emitted, errors
- Budget metrics: created, completed, timed out
- Error metrics: total count, last error
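The expvar package registers its handler at /debug/vars on Go's default HTTP mux, so the metrics are available wherever the service serves that mux, for example http://localhost:8080/debug/vars for a server listening on port 8080.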
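For readers unfamiliar with expvar, the following sketch shows how a counter published through the package ends up in the /debug/vars JSON. The metric name `example_beats_received` and the helpers `recordBeat` and `renderVars` are made up for illustration; the SDK's actual metric keys are not listed in this document.

```go
package main

import (
	"expvar"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Hypothetical metric name; the SDK's real expvar keys are not documented here.
var beatsReceived = expvar.NewInt("example_beats_received")

// recordBeat increments the counter, as a beat callback might.
func recordBeat() { beatsReceived.Add(1) }

// renderVars runs the standard expvar handler in-process and returns the
// JSON it would serve at /debug/vars.
func renderVars() string {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/debug/vars", nil)
	expvar.Handler().ServeHTTP(rec, req)
	return rec.Body.String()
}

func main() {
	recordBeat()
	recordBeat()
	fmt.Println("beats:", beatsReceived.Value())
	fmt.Println("exported:", strings.Contains(renderVars(), "example_beats_received"))
}
```

In a running service the same JSON would be served over HTTP, for instance by calling http.ListenAndServe("localhost:8080", nil), because importing expvar registers the handler on the default mux.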
Logging
The SDK uses structured logging via slog:
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelDebug, // Set appropriate level
}))
Error Handling
The SDK handles the following error conditions:
- Connection Errors: Automatic reconnection with exponential backoff
- Beat Jitter: Tolerance for network delays and timing variations
- Callback Panics: Recovery and logging without affecting other callbacks
- Validation Errors: Status claim validation with detailed error messages
- Timeout Errors: Beat budget timeouts with context cancellation
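As an illustration of the reconnection behaviour listed above, here is a minimal sketch of an exponential backoff schedule. The doubling factor and the 30-second cap are assumptions, the 2-second base mirrors the ReconnectDelay value from the Advanced Configuration example, and `backoffDelay` is a hypothetical helper; the SDK's actual policy (including any jitter) is not specified here.

```go
package main

import (
	"fmt"
	"time"
)

// Example values: the base matches the README's ReconnectDelay example,
// the cap is an assumption made for this sketch.
var (
	baseDelay = 2 * time.Second
	maxDelay  = 30 * time.Second
)

// backoffDelay returns the wait before reconnect attempt n (0-based),
// doubling the base delay on each attempt and capping it at limit.
func backoffDelay(base, limit time.Duration, attempt int) time.Duration {
	d := base
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		d = limit
	}
	return d
}

func main() {
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Println(attempt, backoffDelay(baseDelay, maxDelay, attempt))
	}
}
```

With these values the delays grow as 2s, 4s, 8s, 16s and then stay at the 30s cap.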
Local Degradation
When the pulse service is unavailable, the SDK automatically enters local degradation mode:
- Generates synthetic beats to maintain callback timing
- Uses fallback 60 BPM tempo
- Marks beat frames with a "degraded" phase
- Automatically recovers when the pulse service returns
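The behaviour described in the list above can be pictured as a local ticker that manufactures beat frames. The sketch below is an assumption-laden illustration: `BeatFrame` is a local stand-in whose `Phase` field and `time.Time` deadline are guesses at the SDK type, and `beatInterval` and `syntheticFrame` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// BeatFrame holds only the fields this sketch needs; the SDK's type has more
// fields and its exact field types are assumed here.
type BeatFrame struct {
	BeatIndex  int64
	Phase      string
	DeadlineAt time.Time
}

// fallbackBPM is the fallback tempo named in the README.
const fallbackBPM = 60

// beatInterval returns the duration of one beat at the given tempo.
func beatInterval(bpm int) time.Duration {
	return time.Minute / time.Duration(bpm)
}

// syntheticFrame builds the next locally generated frame after lastIndex,
// with a deadline one fallback beat after now.
func syntheticFrame(lastIndex int64, now time.Time) BeatFrame {
	return BeatFrame{
		BeatIndex:  lastIndex + 1,
		Phase:      "degraded",
		DeadlineAt: now.Add(beatInterval(fallbackBPM)),
	}
}

func main() {
	// A fast ticker keeps the demo short; a real fallback would tick every
	// beatInterval(fallbackBPM), i.e. once per second.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	last := int64(41) // index of the last beat received before the outage
	for i := 0; i < 3; i++ {
		now := <-ticker.C
		frame := syntheticFrame(last, now)
		last = frame.BeatIndex
		fmt.Println(frame.BeatIndex, frame.Phase)
	}
}
```

Continuing the beat index from the last real beat keeps callbacks monotonic across the outage; how the SDK reconciles indices on recovery is not covered here.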
Legacy Compatibility
Support for legacy {bar,beat} patterns (BACKBEAT-REQ-043):
// Convert legacy format (logs warning once)
beatIndex := client.ConvertLegacyBeat(bar, beat)
// Get legacy format from current beat
legacy := client.GetLegacyBeatInfo()
fmt.Printf("Bar: %d, Beat: %d\n", legacy.Bar, legacy.Beat)
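The arithmetic behind a {bar,beat} conversion can be sketched as follows. Everything here is assumed for illustration: four beats per bar, 1-based bar and beat numbers, a 1-based beat index, and the helper names `legacyToIndex` and `indexToLegacy`. The SDK's ConvertLegacyBeat may use different conventions.

```go
package main

import "fmt"

// beatsPerBar is an assumption for this sketch; the SDK's bar length is not
// stated in this README.
const beatsPerBar = 4

// legacyToIndex maps a 1-based {bar,beat} pair to a 1-based beat index.
func legacyToIndex(bar, beat int64) int64 {
	return (bar-1)*beatsPerBar + beat
}

// indexToLegacy is the inverse mapping, from beat index back to {bar,beat}.
func indexToLegacy(index int64) (bar, beat int64) {
	return (index-1)/beatsPerBar + 1, (index-1)%beatsPerBar + 1
}

func main() {
	idx := legacyToIndex(3, 2) // bar 3, beat 2
	bar, beat := indexToLegacy(idx)
	fmt.Println(idx, bar, beat)
}
```

Under these assumptions bar 3, beat 2 corresponds to beat index 10, and converting back yields the same pair.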
Security
The SDK implements BACKBEAT security requirements:
- Ed25519 Signatures: All status claims are signed when a signing key is provided
- Required Headers: Includes x-window-id and x-hlc headers
- Agent Identification: Automatic x-agent-id header for routing
// Configure signing
_, signingKey, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
    panic(err) // key generation failed
}
config.SigningKey = signingKey
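To show what Ed25519 signing of a claim involves, the sketch below signs a JSON payload with Go's standard crypto/ed25519 package and verifies it. The `claim` struct, its JSON field names, and the helpers `signClaim` and `roundTrip` are hypothetical; how the SDK serializes claims and where it carries the signature is not described in this document.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"encoding/json"
	"fmt"
)

// claim is a made-up payload for illustration, not the SDK's wire format.
type claim struct {
	AgentID string `json:"agent_id"`
	State   string `json:"state"`
}

// signClaim serializes the claim and signs the resulting bytes.
func signClaim(key ed25519.PrivateKey, c claim) (payload, sig []byte, err error) {
	payload, err = json.Marshal(c)
	if err != nil {
		return nil, nil, err
	}
	return payload, ed25519.Sign(key, payload), nil
}

// roundTrip signs a claim with a fresh key and reports whether the signature
// verifies, optionally flipping a payload byte first to simulate tampering.
func roundTrip(tamper bool) (bool, error) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return false, err
	}
	payload, sig, err := signClaim(priv, claim{AgentID: "my-service", State: "executing"})
	if err != nil {
		return false, err
	}
	if tamper {
		payload[0] ^= 0xff
	}
	return ed25519.Verify(pub, payload, sig), nil
}

func main() {
	ok, err := roundTrip(false)
	fmt.Println("untouched payload verifies:", ok, err)
	ok, err = roundTrip(true)
	fmt.Println("tampered payload verifies:", ok, err)
}
```

A receiver holding the agent's public key can reject any claim whose payload was altered after signing.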
Performance
The SDK is designed for high performance:
- Beat Callback Latency: Target ≤5ms callback execution
- Timer Drift: ≤1% drift over 1 hour without leader
- Concurrent Safe: All operations are goroutine-safe
- Memory Efficient: Bounded error lists and metric samples
Integration Patterns
Web Service Integration
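func main() {
    // Initialize BACKBEAT client (config built as shown under Configuration)
    client := sdk.NewClient(config)
    client.OnBeat(func(beat sdk.BeatFrame) {
        // Report web service status
        client.EmitStatusClaim(sdk.StatusClaim{
            State:    "executing",
            Progress: getRequestSuccessRate(),
            Notes:    fmt.Sprintf("Handling %d req/s", getCurrentRPS()),
        })
    })
    // Start receiving beats before serving traffic
    if err := client.Start(context.Background()); err != nil {
        panic(err)
    }
    defer client.Stop()

    // Expose SDK health over HTTP
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(client.Health())
    })
    // Start HTTP server (blocks until the server fails)
    if err := http.ListenAndServe(":8080", nil); err != nil {
        slog.Error("HTTP server stopped", "error", err)
    }
}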
Background Job Processor
func processJobs(client sdk.Client) {
    for job := range jobQueue {
        // Use beat budget for job timeout
        err := client.WithBeatBudget(job.MaxBeats, func() error {
            return processJob(job)
        })
        
        if err != nil {
            client.EmitStatusClaim(sdk.StatusClaim{
                TaskID: job.ID,
                State: "failed", 
                Notes: err.Error(),
            })
        }
    }
}
Testing
The SDK ships with a test suite and benchmarks:
# Run all tests
go test ./pkg/sdk/...
# Run with race detection
go test -race ./pkg/sdk/...
# Run benchmarks
go test -bench=. ./pkg/sdk/examples/
Requirements
- Go 1.22 or later
- NATS server for messaging
- BACKBEAT pulse service running
- Network connectivity to cluster
Contributing
- Follow standard Go conventions
- Include comprehensive tests
- Update documentation for API changes
- Ensure examples remain working
- Maintain backward compatibility
License
This SDK is part of the BACKBEAT project and follows the same licensing terms.
