Files
2025-10-17 08:56:25 +11:00
..
2025-10-17 08:56:25 +11:00
2025-10-17 08:56:25 +11:00
2025-10-17 08:56:25 +11:00
2025-10-17 08:56:25 +11:00
2025-10-17 08:56:25 +11:00
2025-10-17 08:56:25 +11:00
2025-10-17 08:56:25 +11:00

BACKBEAT Go SDK

The BACKBEAT Go SDK enables CHORUS services to become "BACKBEAT-aware" by providing client libraries for beat synchronization, status emission, and beat-budget management.

Features

  • Beat Subscription (BACKBEAT-REQ-040): Subscribe to beat and downbeat events with jitter-tolerant scheduling
  • Status Emission (BACKBEAT-REQ-041): Emit status claims with automatic agent_id, task_id, and HLC population
  • Beat Budgets (BACKBEAT-REQ-042): Execute functions with beat-based timeouts and cancellation
  • Legacy Compatibility (BACKBEAT-REQ-043): Support for legacy {bar,beat} patterns with migration warnings
  • Security (BACKBEAT-REQ-044): Ed25519 signing and required headers for status claims
  • Local Degradation: Continue operating when pulse service is unavailable
  • Comprehensive Observability: Metrics, health reporting, and performance monitoring

Quick Start

package main

import (
    "context"
    "crypto/ed25519"
    "crypto/rand"
    "log/slog"
    
    "github.com/chorus-services/backbeat/pkg/sdk"
)

// main is a minimal BACKBEAT-aware service: it generates an Ed25519 signing
// key, configures and starts the SDK client, emits a status claim on every
// beat, and then blocks forever.
func main() {
    // Generate signing key. A failure here means the system entropy source is
    // broken, so there is nothing sensible to do except abort.
    _, signingKey, err := ed25519.GenerateKey(rand.Reader)
    if err != nil {
        panic(err)
    }

    // Configure SDK
    config := sdk.DefaultConfig()
    config.ClusterID = "chorus-dev"
    config.AgentID = "my-service"
    config.NATSUrl = "nats://localhost:4222"
    config.SigningKey = signingKey

    // Create client
    client := sdk.NewClient(config)

    // Register beat callback. Registration itself can fail, so check it
    // instead of starting a client that will never call us back.
    err = client.OnBeat(func(beat sdk.BeatFrame) {
        slog.Info("Beat received", "beat_index", beat.BeatIndex)

        // Emit status. A rejected claim is logged rather than treated as
        // fatal so that a single bad beat does not take the service down.
        if err := client.EmitStatusClaim(sdk.StatusClaim{
            State:     "executing",
            BeatsLeft: 5,
            Progress:  0.3,
            Notes:     "Processing data",
        }); err != nil {
            slog.Error("Status claim failed", "beat_index", beat.BeatIndex, "error", err)
        }
    })
    if err != nil {
        panic(err)
    }

    // Start client
    ctx := context.Background()
    if err := client.Start(ctx); err != nil {
        panic(err)
    }
    defer client.Stop()

    // Your service logic here...
    // NOTE: select {} never returns, so the deferred Stop above only runs if
    // main panics. A real service should block on a shutdown signal instead.
    select {}
}

Configuration

Basic Configuration

config := &sdk.Config{
    ClusterID: "your-cluster",    // BACKBEAT cluster ID
    AgentID:   "your-agent",      // Unique agent identifier
    NATSUrl:   "nats://localhost:4222", // NATS connection URL
}

Advanced Configuration

config := sdk.DefaultConfig()
config.ClusterID = "chorus-prod"
config.AgentID = "web-service-01"
config.NATSUrl = "nats://nats.cluster.local:4222"
config.SigningKey = loadSigningKey() // Ed25519 private key
config.JitterTolerance = 100 * time.Millisecond
config.ReconnectDelay = 2 * time.Second
config.MaxReconnects = 10 // -1 for infinite
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))

Core Features

Beat Subscription

// Register beat callback (called every beat)
client.OnBeat(func(beat sdk.BeatFrame) {
    // Your beat logic here
    fmt.Printf("Beat %d at %s\n", beat.BeatIndex, beat.DeadlineAt)
})

// Register downbeat callback (called at bar starts)
client.OnDownbeat(func(beat sdk.BeatFrame) {
    // Your downbeat logic here
    fmt.Printf("Bar started: %s\n", beat.WindowID)
})

Status Emission

// Basic status emission
err := client.EmitStatusClaim(sdk.StatusClaim{
    State:     "executing",  // executing|planning|waiting|review|done|failed
    BeatsLeft: 10,          // estimated beats remaining
    Progress:  0.75,        // progress ratio (0.0-1.0)
    Notes:     "Processing batch 5/10",
})

// Advanced status with task tracking
err := client.EmitStatusClaim(sdk.StatusClaim{
    TaskID:    "task-12345", // auto-generated if empty
    State:     "waiting",
    WaitFor:   []string{"hmmm://thread/abc123"}, // dependencies
    BeatsLeft: 0,
    Progress:  1.0,
    Notes:     "Waiting for thread completion",
})

Beat Budgets

// Execute with beat-based timeout
err := client.WithBeatBudget(10, func() error {
    // This function has 10 beats to complete
    return performTask()
})

if err != nil {
    // Handle timeout or task error
    fmt.Printf("Task failed or exceeded budget: %v\n", err)
}

// Real-world example
err := client.WithBeatBudget(20, func() error {
    // Database operation with beat budget
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    return database.ProcessBatch(ctx, batchData)
})

Client Interface

// Client is the interface a service uses to take part in BACKBEAT: it
// subscribes to beats, emits status claims, runs work under a beat budget,
// and controls the client's lifecycle.
type Client interface {
    // Beat subscription

    // OnBeat registers a callback that is called on every beat.
    OnBeat(callback func(BeatFrame)) error
    // OnDownbeat registers a callback that is called at bar starts.
    OnDownbeat(callback func(BeatFrame)) error
    
    // Status emission

    // EmitStatusClaim emits a status claim (state, beats left, progress,
    // notes). A claim with an empty TaskID gets one generated for it.
    EmitStatusClaim(claim StatusClaim) error
    
    // Beat budgets

    // WithBeatBudget runs fn with a timeout of n beats. The returned error
    // covers both a failure of fn and the budget being exceeded.
    WithBeatBudget(n int, fn func() error) error
    
    // Utilities

    // GetCurrentBeat presumably returns the index of the current beat
    // (compare BeatFrame.BeatIndex) — confirm against the implementation.
    GetCurrentBeat() int64
    // GetCurrentWindow presumably returns the current window ID
    // (compare BeatFrame.WindowID) — confirm against the implementation.
    GetCurrentWindow() string
    // IsInWindow presumably reports whether windowID is the current window
    // — confirm against the implementation.
    IsInWindow(windowID string) bool
    
    // Lifecycle

    // Start starts the client; callers pair it with a deferred Stop.
    Start(ctx context.Context) error
    // Stop shuts the client down.
    Stop() error
    // Health returns a snapshot of the client's health: connection state,
    // last beat, time drift, reconnect count, and local-degradation flag.
    Health() HealthStatus
}

Examples

The SDK ships with runnable example programs in pkg/sdk/examples.

Running Examples

# Simple agent example
go run pkg/sdk/examples/simple_agent.go

# Task processor with beat budgets
go run pkg/sdk/examples/task_processor.go

# Service monitor with health reporting
go run pkg/sdk/examples/service_monitor.go

Observability

Health Monitoring

health := client.Health()
fmt.Printf("Connected: %v\n", health.Connected)
fmt.Printf("Last Beat: %d at %s\n", health.LastBeat, health.LastBeatTime)
fmt.Printf("Time Drift: %s\n", health.TimeDrift)
fmt.Printf("Reconnects: %d\n", health.ReconnectCount)
fmt.Printf("Local Degradation: %v\n", health.LocalDegradation)

Metrics

The SDK exposes metrics via Go's expvar package:

  • Connection metrics: status, reconnection count, duration
  • Beat metrics: received, jitter, callback latency, misses
  • Status metrics: claims emitted, errors
  • Budget metrics: created, completed, timed out
  • Error metrics: total count, last error

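The expvar package publishes these metrics at /debug/vars on the default HTTP mux; for example, if your service listens on port 8080, they are available at http://localhost:8080/debug/vars.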

Logging

The SDK uses structured logging via slog:

config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelDebug, // Set appropriate level
}))

Error Handling

The SDK provides comprehensive error handling:

  • Connection Errors: Automatic reconnection with exponential backoff
  • Beat Jitter: Tolerance for network delays and timing variations
  • Callback Panics: Recovery and logging without affecting other callbacks
  • Validation Errors: Status claim validation with detailed error messages
  • Timeout Errors: Beat budget timeouts with context cancellation

Local Degradation

When the pulse service is unavailable, the SDK automatically enters local degradation mode:

  • Generates synthetic beats to maintain callback timing
  • Uses fallback 60 BPM tempo
  • Marks beat frames with "degraded" phase
  • Automatically recovers when pulse service returns

Legacy Compatibility

Support for legacy {bar,beat} patterns (BACKBEAT-REQ-043):

// Convert legacy format (logs warning once)
beatIndex := client.ConvertLegacyBeat(bar, beat)

// Get legacy format from current beat
legacy := client.GetLegacyBeatInfo()
fmt.Printf("Bar: %d, Beat: %d\n", legacy.Bar, legacy.Beat)

Security

The SDK implements BACKBEAT security requirements:

  • Ed25519 Signatures: All status claims are signed when a signing key is provided
  • Required Headers: Includes x-window-id and x-hlc headers
  • Agent Identification: Automatic x-agent-id header for routing

// Configure signing
_, signingKey, _ := ed25519.GenerateKey(rand.Reader)
config.SigningKey = signingKey

Performance

The SDK is designed for high performance:

  • Beat Callback Latency: Target ≤5ms callback execution
  • Timer Drift: ≤1% drift over 1 hour without a leader
  • Concurrent Safe: All operations are goroutine-safe
  • Memory Efficient: Bounded error lists and metric samples

Integration Patterns

Web Service Integration

// main wires a BACKBEAT client into a web service: every beat reports the
// service's status, and GET /health exposes the client's health as JSON.
func main() {
    // Initialize BACKBEAT client
    client := sdk.NewClient(config)
    err := client.OnBeat(func(beat sdk.BeatFrame) {
        // Report web service status
        if err := client.EmitStatusClaim(sdk.StatusClaim{
            State:    "executing",
            Progress: getRequestSuccessRate(),
            Notes:    fmt.Sprintf("Handling %d req/s", getCurrentRPS()),
        }); err != nil {
            slog.Error("Status claim failed", "error", err)
        }
    })
    if err != nil {
        panic(err)
    }

    // Beat callbacks only fire once the client has been started.
    if err := client.Start(context.Background()); err != nil {
        panic(err)
    }
    defer client.Stop()

    // Start HTTP server
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        health := client.Health()
        if err := json.NewEncoder(w).Encode(health); err != nil {
            // The response may already be partially written, so just log.
            slog.Error("Encoding health response failed", "error", err)
        }
    })
    // ListenAndServe blocks until the server fails; without it main would
    // return immediately and the service would exit.
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}

Background Job Processor

// processJobs drains jobQueue, running each job under its own beat budget.
// A job that returns an error or exceeds its budget is reported with a
// "failed" status claim carrying the error text; successful jobs emit nothing.
func processJobs(client sdk.Client) {
    for job := range jobQueue {
        // The job's MaxBeats is its timeout, expressed in beats.
        run := func() error {
            return processJob(job)
        }
        err := client.WithBeatBudget(job.MaxBeats, run)
        if err == nil {
            continue
        }

        failure := sdk.StatusClaim{
            TaskID: job.ID,
            State:  "failed",
            Notes:  err.Error(),
        }
        client.EmitStatusClaim(failure)
    }
}

Testing

The SDK includes comprehensive test utilities:

# Run all tests
go test ./pkg/sdk/...

# Run with race detection
go test -race ./pkg/sdk/...

# Run benchmarks
go test -bench=. ./pkg/sdk/examples/

Requirements

  • Go 1.22 or later
  • NATS server for messaging
  • BACKBEAT pulse service running
  • Network connectivity to cluster

Contributing

  1. Follow standard Go conventions
  2. Include comprehensive tests
  3. Update documentation for API changes
  4. Ensure examples remain working
  5. Maintain backward compatibility

License

This SDK is part of the BACKBEAT project and follows the same licensing terms.