# BACKBEAT Go SDK

The BACKBEAT Go SDK enables CHORUS services to become "BACKBEAT-aware" by providing client libraries for beat synchronization, status emission, and beat-budget management.

## Features

- **Beat Subscription (BACKBEAT-REQ-040)**: Subscribe to beat and downbeat events with jitter-tolerant scheduling
- **Status Emission (BACKBEAT-REQ-041)**: Emit status claims with automatic agent_id, task_id, and HLC population
- **Beat Budgets (BACKBEAT-REQ-042)**: Execute functions with beat-based timeouts and cancellation
- **Legacy Compatibility (BACKBEAT-REQ-043)**: Support for legacy `{bar,beat}` patterns with migration warnings
- **Security (BACKBEAT-REQ-044)**: Ed25519 signing and required headers for status claims
- **Local Degradation**: Continue operating when the pulse service is unavailable
- **Comprehensive Observability**: Metrics, health reporting, and performance monitoring

## Quick Start

```go
package main

import (
	"context"
	"crypto/ed25519"
	"crypto/rand"
	"log/slog"

	"github.com/chorus-services/backbeat/pkg/sdk"
)

func main() {
	// Generate signing key
	_, signingKey, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		panic(err)
	}

	// Configure SDK
	config := sdk.DefaultConfig()
	config.ClusterID = "chorus-dev"
	config.AgentID = "my-service"
	config.NATSUrl = "nats://localhost:4222"
	config.SigningKey = signingKey

	// Create client
	client := sdk.NewClient(config)

	// Register beat callback
	err = client.OnBeat(func(beat sdk.BeatFrame) {
		slog.Info("Beat received", "beat_index", beat.BeatIndex)

		// Emit status
		if err := client.EmitStatusClaim(sdk.StatusClaim{
			State:     "executing",
			BeatsLeft: 5,
			Progress:  0.3,
			Notes:     "Processing data",
		}); err != nil {
			slog.Error("Status emission failed", "err", err)
		}
	})
	if err != nil {
		panic(err)
	}

	// Start client
	ctx := context.Background()
	if err := client.Start(ctx); err != nil {
		panic(err)
	}
	defer client.Stop()

	// Your service logic here...
	select {} // block forever so callbacks keep firing
}
```

## Configuration

### Basic Configuration

```go
config := &sdk.Config{
	ClusterID: "your-cluster",          // BACKBEAT cluster ID
	AgentID:   "your-agent",            // Unique agent identifier
	NATSUrl:   "nats://localhost:4222", // NATS connection URL
}
```

### Advanced Configuration

```go
config := sdk.DefaultConfig()
config.ClusterID = "chorus-prod"
config.AgentID = "web-service-01"
config.NATSUrl = "nats://nats.cluster.local:4222"
config.SigningKey = loadSigningKey() // your own helper returning an Ed25519 private key
config.JitterTolerance = 100 * time.Millisecond
config.ReconnectDelay = 2 * time.Second
config.MaxReconnects = 10 // -1 for infinite
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
```

## Core Features

### Beat Subscription

```go
// Register beat callback (called every beat)
client.OnBeat(func(beat sdk.BeatFrame) {
	// Your beat logic here
	fmt.Printf("Beat %d at %s\n", beat.BeatIndex, beat.DeadlineAt)
})

// Register downbeat callback (called at bar starts)
client.OnDownbeat(func(beat sdk.BeatFrame) {
	// Your downbeat logic here
	fmt.Printf("Bar started: %s\n", beat.WindowID)
})
```

### Status Emission

```go
// Basic status emission
err := client.EmitStatusClaim(sdk.StatusClaim{
	State:     "executing", // executing|planning|waiting|review|done|failed
	BeatsLeft: 10,          // estimated beats remaining
	Progress:  0.75,        // progress ratio (0.0-1.0)
	Notes:     "Processing batch 5/10",
})

// Advanced status with task tracking (reuses err from above)
err = client.EmitStatusClaim(sdk.StatusClaim{
	TaskID:    "task-12345", // auto-generated if empty
	State:     "waiting",
	WaitFor:   []string{"hmmm://thread/abc123"}, // dependencies
	BeatsLeft: 0,
	Progress:  1.0,
	Notes:     "Waiting for thread completion",
})
```

### Beat Budgets

```go
// Execute with beat-based timeout
err := client.WithBeatBudget(10, func() error {
	// This function has 10 beats to complete
	return performTask()
})
if err != nil {
	// Handle timeout or task error
	fmt.Printf("Task failed or exceeded budget: %v\n", err)
}

// Real-world example (reuses err from above)
err = client.WithBeatBudget(20, func() error {
	// Database operation with beat budget, plus its own wall-clock timeout
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	return database.ProcessBatch(ctx, batchData)
})
```

## Client Interface

```go
type Client interface {
	// Beat subscription
	OnBeat(callback func(BeatFrame)) error
	OnDownbeat(callback func(BeatFrame)) error

	// Status emission
	EmitStatusClaim(claim StatusClaim) error

	// Beat budgets
	WithBeatBudget(n int, fn func() error) error

	// Utilities
	GetCurrentBeat() int64
	GetCurrentWindow() string
	IsInWindow(windowID string) bool

	// Lifecycle
	Start(ctx context.Context) error
	Stop() error
	Health() HealthStatus
}
```

## Examples

The SDK includes comprehensive examples:

- **[Simple Agent](examples/simple_agent.go)**: Basic beat subscription and status emission
- **[Task Processor](examples/task_processor.go)**: Beat budget usage for task timeout management
- **[Service Monitor](examples/service_monitor.go)**: Health monitoring with beat-aligned reporting

### Running Examples

```bash
# Simple agent example
go run pkg/sdk/examples/simple_agent.go

# Task processor with beat budgets
go run pkg/sdk/examples/task_processor.go

# Service monitor with health reporting
go run pkg/sdk/examples/service_monitor.go
```

## Observability

### Health Monitoring

```go
health := client.Health()
fmt.Printf("Connected: %v\n", health.Connected)
fmt.Printf("Last Beat: %d at %s\n", health.LastBeat, health.LastBeatTime)
fmt.Printf("Time Drift: %s\n", health.TimeDrift)
fmt.Printf("Reconnects: %d\n", health.ReconnectCount)
fmt.Printf("Local Degradation: %v\n", health.LocalDegradation)
```

### Metrics

The SDK exposes metrics via Go's `expvar` package:

- Connection metrics: status, reconnection count, duration
- Beat metrics: received, jitter, callback latency, misses
- Status metrics: claims emitted, errors
- Budget metrics: created, completed, timed out
- Error metrics: total count, last error

`expvar` registers its handler at `/debug/vars` on `http.DefaultServeMux`; if your service serves that mux on port 8080, the metrics are available at `http://localhost:8080/debug/vars`.
### Logging

The SDK uses structured logging via `slog`:

```go
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
	Level: slog.LevelDebug, // Set appropriate level
}))
```

## Error Handling

The SDK provides comprehensive error handling:

- **Connection Errors**: Automatic reconnection with exponential backoff
- **Beat Jitter**: Tolerance for network delays and timing variations
- **Callback Panics**: Recovery and logging without affecting other callbacks
- **Validation Errors**: Status claim validation with detailed error messages
- **Timeout Errors**: Beat budget timeouts with context cancellation

## Local Degradation

When the pulse service is unavailable, the SDK automatically enters local degradation mode:

- Generates synthetic beats to maintain callback timing
- Uses a fallback tempo of 60 BPM
- Marks beat frames with the "degraded" phase
- Automatically recovers when the pulse service returns

## Legacy Compatibility

Support for legacy `{bar,beat}` patterns (BACKBEAT-REQ-043):

```go
// Convert legacy format (logs warning once)
beatIndex := client.ConvertLegacyBeat(bar, beat)

// Get legacy format from current beat
legacy := client.GetLegacyBeatInfo()
fmt.Printf("Bar: %d, Beat: %d\n", legacy.Bar, legacy.Beat)
```

## Security

The SDK implements BACKBEAT security requirements:

- **Ed25519 Signatures**: All status claims are signed when a signing key is provided
- **Required Headers**: Includes `x-window-id` and `x-hlc` headers
- **Agent Identification**: Automatic `x-agent-id` header for routing

```go
// Configure signing
_, signingKey, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
	panic(err)
}
config.SigningKey = signingKey
```

## Performance

The SDK is designed for high performance:

- **Beat Callback Latency**: Target ≤5ms callback execution
- **Timer Drift**: ≤1% drift over 1 hour without leader
- **Concurrency-Safe**: All operations are goroutine-safe
- **Memory Efficient**: Bounded error lists and metric samples

## Integration Patterns

### Web Service Integration

```go
func main() {
	// Initialize BACKBEAT client (config built as shown under Configuration)
	client := sdk.NewClient(config)

	client.OnBeat(func(beat sdk.BeatFrame) {
		// Report web service status
		client.EmitStatusClaim(sdk.StatusClaim{
			State:    "executing",
			Progress: getRequestSuccessRate(),
			Notes:    fmt.Sprintf("Handling %d req/s", getCurrentRPS()),
		})
	})

	if err := client.Start(context.Background()); err != nil {
		panic(err)
	}
	defer client.Stop()

	// Expose SDK health as JSON
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(client.Health())
	})

	// Start HTTP server (port is an example)
	if err := http.ListenAndServe(":8080", nil); err != nil {
		slog.Error("HTTP server stopped", "err", err)
	}
}
```

### Background Job Processor

```go
// Job, processJob, and the jobQueue channel are your own types and helpers.
func processJobs(client sdk.Client, jobQueue <-chan Job) {
	for job := range jobQueue {
		// Use beat budget for job timeout
		err := client.WithBeatBudget(job.MaxBeats, func() error {
			return processJob(job)
		})

		if err != nil {
			client.EmitStatusClaim(sdk.StatusClaim{
				TaskID: job.ID,
				State:  "failed",
				Notes:  err.Error(),
			})
		}
	}
}
```

## Testing

The SDK includes comprehensive test utilities:

```bash
# Run all tests
go test ./pkg/sdk/...

# Run with race detection
go test -race ./pkg/sdk/...

# Run benchmarks
go test -bench=. ./pkg/sdk/examples/
```

## Requirements

- Go 1.22 or later
- NATS server for messaging
- BACKBEAT pulse service running
- Network connectivity to cluster

## Contributing

1. Follow standard Go conventions
2. Include comprehensive tests
3. Update documentation for API changes
4. Ensure examples remain working
5. Maintain backward compatibility

## License

This SDK is part of the BACKBEAT project and follows the same licensing terms.