BACKBEAT Go SDK
The BACKBEAT Go SDK enables CHORUS services to become "BACKBEAT-aware" by providing client libraries for beat synchronization, status emission, and beat-budget management.
Features
- Beat Subscription (BACKBEAT-REQ-040): Subscribe to beat and downbeat events with jitter-tolerant scheduling
- Status Emission (BACKBEAT-REQ-041): Emit status claims with automatic agent_id, task_id, and HLC population
- Beat Budgets (BACKBEAT-REQ-042): Execute functions with beat-based timeouts and cancellation
- Legacy Compatibility (BACKBEAT-REQ-043): Support for legacy
{bar,beat}patterns with migration warnings - Security (BACKBEAT-REQ-044): Ed25519 signing and required headers for status claims
- Local Degradation: Continue operating when pulse service is unavailable
- Comprehensive Observability: Metrics, health reporting, and performance monitoring
Quick Start
package main
import (
"context"
"crypto/ed25519"
"crypto/rand"
"log/slog"
"github.com/chorus-services/backbeat/pkg/sdk"
)
func main() {
// Generate signing key
_, signingKey, _ := ed25519.GenerateKey(rand.Reader)
// Configure SDK
config := sdk.DefaultConfig()
config.ClusterID = "chorus-dev"
config.AgentID = "my-service"
config.NATSUrl = "nats://localhost:4222"
config.SigningKey = signingKey
// Create client
client := sdk.NewClient(config)
// Register beat callback
client.OnBeat(func(beat sdk.BeatFrame) {
slog.Info("Beat received", "beat_index", beat.BeatIndex)
// Emit status
client.EmitStatusClaim(sdk.StatusClaim{
State: "executing",
BeatsLeft: 5,
Progress: 0.3,
Notes: "Processing data",
})
})
// Start client
ctx := context.Background()
if err := client.Start(ctx); err != nil {
panic(err)
}
defer client.Stop()
// Your service logic here...
select {}
}
Configuration
Basic Configuration
config := &sdk.Config{
ClusterID: "your-cluster", // BACKBEAT cluster ID
AgentID: "your-agent", // Unique agent identifier
NATSUrl: "nats://localhost:4222", // NATS connection URL
}
Advanced Configuration
config := sdk.DefaultConfig()
config.ClusterID = "chorus-prod"
config.AgentID = "web-service-01"
config.NATSUrl = "nats://nats.cluster.local:4222"
config.SigningKey = loadSigningKey() // Ed25519 private key
config.JitterTolerance = 100 * time.Millisecond
config.ReconnectDelay = 2 * time.Second
config.MaxReconnects = 10 // -1 for infinite
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
Core Features
Beat Subscription
// Register beat callback (called every beat)
client.OnBeat(func(beat sdk.BeatFrame) {
// Your beat logic here
fmt.Printf("Beat %d at %s\n", beat.BeatIndex, beat.DeadlineAt)
})
// Register downbeat callback (called at bar starts)
client.OnDownbeat(func(beat sdk.BeatFrame) {
// Your downbeat logic here
fmt.Printf("Bar started: %s\n", beat.WindowID)
})
Status Emission
// Basic status emission
err := client.EmitStatusClaim(sdk.StatusClaim{
State: "executing", // executing|planning|waiting|review|done|failed
BeatsLeft: 10, // estimated beats remaining
Progress: 0.75, // progress ratio (0.0-1.0)
Notes: "Processing batch 5/10",
})
// Advanced status with task tracking
err := client.EmitStatusClaim(sdk.StatusClaim{
TaskID: "task-12345", // auto-generated if empty
State: "waiting",
WaitFor: []string{"hmmm://thread/abc123"}, // dependencies
BeatsLeft: 0,
Progress: 1.0,
Notes: "Waiting for thread completion",
})
Beat Budgets
// Execute with beat-based timeout
err := client.WithBeatBudget(10, func() error {
// This function has 10 beats to complete
return performTask()
})
if err != nil {
// Handle timeout or task error
fmt.Printf("Task failed or exceeded budget: %v\n", err)
}
// Real-world example
err := client.WithBeatBudget(20, func() error {
// Database operation with beat budget
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return database.ProcessBatch(ctx, batchData)
})
Client Interface
type Client interface {
// Beat subscription
OnBeat(callback func(BeatFrame)) error
OnDownbeat(callback func(BeatFrame)) error
// Status emission
EmitStatusClaim(claim StatusClaim) error
// Beat budgets
WithBeatBudget(n int, fn func() error) error
// Utilities
GetCurrentBeat() int64
GetCurrentWindow() string
IsInWindow(windowID string) bool
// Lifecycle
Start(ctx context.Context) error
Stop() error
Health() HealthStatus
}
Examples
The SDK includes comprehensive examples:
- Simple Agent: Basic beat subscription and status emission
- Task Processor: Beat budget usage for task timeout management
- Service Monitor: Health monitoring with beat-aligned reporting
Running Examples
# Simple agent example
go run pkg/sdk/examples/simple_agent.go
# Task processor with beat budgets
go run pkg/sdk/examples/task_processor.go
# Service monitor with health reporting
go run pkg/sdk/examples/service_monitor.go
Observability
Health Monitoring
health := client.Health()
fmt.Printf("Connected: %v\n", health.Connected)
fmt.Printf("Last Beat: %d at %s\n", health.LastBeat, health.LastBeatTime)
fmt.Printf("Time Drift: %s\n", health.TimeDrift)
fmt.Printf("Reconnects: %d\n", health.ReconnectCount)
fmt.Printf("Local Degradation: %v\n", health.LocalDegradation)
Metrics
The SDK exposes metrics via Go's expvar package:
- Connection metrics: status, reconnection count, duration
- Beat metrics: received, jitter, callback latency, misses
- Status metrics: claims emitted, errors
- Budget metrics: created, completed, timed out
- Error metrics: total count, last error
Access metrics at http://localhost:8080/debug/vars when using expvar.
Logging
The SDK uses structured logging via slog:
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug, // Set appropriate level
}))
Error Handling
The SDK provides comprehensive error handling:
- Connection Errors: Automatic reconnection with exponential backoff
- Beat Jitter: Tolerance for network delays and timing variations
- Callback Panics: Recovery and logging without affecting other callbacks
- Validation Errors: Status claim validation with detailed error messages
- Timeout Errors: Beat budget timeouts with context cancellation
Local Degradation
When the pulse service is unavailable, the SDK automatically enters local degradation mode:
- Generates synthetic beats to maintain callback timing
- Uses fallback 60 BPM tempo
- Marks beat frames with "degraded" phase
- Automatically recovers when pulse service returns
Legacy Compatibility
Support for legacy {bar,beat} patterns (BACKBEAT-REQ-043):
// Convert legacy format (logs warning once)
beatIndex := client.ConvertLegacyBeat(bar, beat)
// Get legacy format from current beat
legacy := client.GetLegacyBeatInfo()
fmt.Printf("Bar: %d, Beat: %d\n", legacy.Bar, legacy.Beat)
Security
The SDK implements BACKBEAT security requirements:
- Ed25519 Signatures: All status claims are signed when signing key provided
- Required Headers: Includes
x-window-idandx-hlcheaders - Agent Identification: Automatic
x-agent-idheader for routing
// Configure signing
_, signingKey, _ := ed25519.GenerateKey(rand.Reader)
config.SigningKey = signingKey
Performance
The SDK is designed for high performance:
- Beat Callback Latency: Target ≤5ms callback execution
- Timer Drift: ≤1% drift over 1 hour without leader
- Concurrent Safe: All operations are goroutine-safe
- Memory Efficient: Bounded error lists and metric samples
Integration Patterns
Web Service Integration
func main() {
// Initialize BACKBEAT client
client := sdk.NewClient(config)
client.OnBeat(func(beat sdk.BeatFrame) {
// Report web service status
client.EmitStatusClaim(sdk.StatusClaim{
State: "executing",
Progress: getRequestSuccessRate(),
Notes: fmt.Sprintf("Handling %d req/s", getCurrentRPS()),
})
})
// Start HTTP server
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
health := client.Health()
json.NewEncoder(w).Encode(health)
})
}
Background Job Processor
func processJobs(client sdk.Client) {
for job := range jobQueue {
// Use beat budget for job timeout
err := client.WithBeatBudget(job.MaxBeats, func() error {
return processJob(job)
})
if err != nil {
client.EmitStatusClaim(sdk.StatusClaim{
TaskID: job.ID,
State: "failed",
Notes: err.Error(),
})
}
}
}
Testing
The SDK includes comprehensive test utilities:
# Run all tests
go test ./pkg/sdk/...
# Run with race detection
go test -race ./pkg/sdk/...
# Run benchmarks
go test -bench=. ./pkg/sdk/examples/
Requirements
- Go 1.22 or later
- NATS server for messaging
- BACKBEAT pulse service running
- Network connectivity to cluster
Contributing
- Follow standard Go conventions
- Include comprehensive tests
- Update documentation for API changes
- Ensure examples remain working
- Maintain backward compatibility
License
This SDK is part of the BACKBEAT project and follows the same licensing terms.