backbeat: add module sources

This commit is contained in:
anthonyrawlins
2025-10-17 08:56:25 +11:00
parent 627d15b3f7
commit 4b4eb16efb
48 changed files with 11636 additions and 0 deletions

373
pkg/sdk/README.md Normal file
View File

@@ -0,0 +1,373 @@
# BACKBEAT Go SDK
The BACKBEAT Go SDK enables CHORUS services to become "BACKBEAT-aware" by providing client libraries for beat synchronization, status emission, and beat-budget management.
## Features
- **Beat Subscription (BACKBEAT-REQ-040)**: Subscribe to beat and downbeat events with jitter-tolerant scheduling
- **Status Emission (BACKBEAT-REQ-041)**: Emit status claims with automatic agent_id, task_id, and HLC population
- **Beat Budgets (BACKBEAT-REQ-042)**: Execute functions with beat-based timeouts and cancellation
- **Legacy Compatibility (BACKBEAT-REQ-043)**: Support for legacy `{bar,beat}` patterns with migration warnings
- **Security (BACKBEAT-REQ-044)**: Ed25519 signing and required headers for status claims
- **Local Degradation**: Continue operating when pulse service is unavailable
- **Comprehensive Observability**: Metrics, health reporting, and performance monitoring
## Quick Start
```go
package main
import (
"context"
"crypto/ed25519"
"crypto/rand"
"log/slog"
"github.com/chorus-services/backbeat/pkg/sdk"
)
func main() {
// Generate signing key
_, signingKey, _ := ed25519.GenerateKey(rand.Reader)
// Configure SDK
config := sdk.DefaultConfig()
config.ClusterID = "chorus-dev"
config.AgentID = "my-service"
config.NATSUrl = "nats://localhost:4222"
config.SigningKey = signingKey
// Create client
client := sdk.NewClient(config)
// Register beat callback
client.OnBeat(func(beat sdk.BeatFrame) {
slog.Info("Beat received", "beat_index", beat.BeatIndex)
// Emit status
client.EmitStatusClaim(sdk.StatusClaim{
State: "executing",
BeatsLeft: 5,
Progress: 0.3,
Notes: "Processing data",
})
})
// Start client
ctx := context.Background()
if err := client.Start(ctx); err != nil {
panic(err)
}
defer client.Stop()
// Your service logic here...
select {}
}
```
## Configuration
### Basic Configuration
```go
config := &sdk.Config{
ClusterID: "your-cluster", // BACKBEAT cluster ID
AgentID: "your-agent", // Unique agent identifier
NATSUrl: "nats://localhost:4222", // NATS connection URL
}
```
### Advanced Configuration
```go
config := sdk.DefaultConfig()
config.ClusterID = "chorus-prod"
config.AgentID = "web-service-01"
config.NATSUrl = "nats://nats.cluster.local:4222"
config.SigningKey = loadSigningKey() // Ed25519 private key
config.JitterTolerance = 100 * time.Millisecond
config.ReconnectDelay = 2 * time.Second
config.MaxReconnects = 10 // -1 for infinite
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
```
## Core Features
### Beat Subscription
```go
// Register beat callback (called every beat)
client.OnBeat(func(beat sdk.BeatFrame) {
// Your beat logic here
fmt.Printf("Beat %d at %s\n", beat.BeatIndex, beat.DeadlineAt)
})
// Register downbeat callback (called at bar starts)
client.OnDownbeat(func(beat sdk.BeatFrame) {
// Your downbeat logic here
fmt.Printf("Bar started: %s\n", beat.WindowID)
})
```
### Status Emission
```go
// Basic status emission
err := client.EmitStatusClaim(sdk.StatusClaim{
State: "executing", // executing|planning|waiting|review|done|failed
BeatsLeft: 10, // estimated beats remaining
Progress: 0.75, // progress ratio (0.0-1.0)
Notes: "Processing batch 5/10",
})
// Advanced status with task tracking
err = client.EmitStatusClaim(sdk.StatusClaim{
TaskID: "task-12345", // auto-generated if empty
State: "waiting",
WaitFor: []string{"hmmm://thread/abc123"}, // dependencies
BeatsLeft: 0,
Progress: 1.0,
Notes: "Waiting for thread completion",
})
```
### Beat Budgets
```go
// Execute with beat-based timeout
err := client.WithBeatBudget(10, func() error {
// This function has 10 beats to complete
return performTask()
})
if err != nil {
// Handle timeout or task error
fmt.Printf("Task failed or exceeded budget: %v\n", err)
}
// Real-world example
err = client.WithBeatBudget(20, func() error {
// Database operation with beat budget
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return database.ProcessBatch(ctx, batchData)
})
```
## Client Interface
```go
type Client interface {
// Beat subscription
OnBeat(callback func(BeatFrame)) error
OnDownbeat(callback func(BeatFrame)) error
// Status emission
EmitStatusClaim(claim StatusClaim) error
// Beat budgets
WithBeatBudget(n int, fn func() error) error
// Utilities
GetCurrentBeat() int64
GetCurrentWindow() string
IsInWindow(windowID string) bool
GetCurrentTempo() int
GetTempoDrift() time.Duration
// Lifecycle
Start(ctx context.Context) error
Stop() error
Health() HealthStatus
}
```
## Examples
The SDK includes comprehensive examples:
- **[Simple Agent](examples/simple_agent.go)**: Basic beat subscription and status emission
- **[Task Processor](examples/task_processor.go)**: Beat budget usage for task timeout management
- **[Service Monitor](examples/service_monitor.go)**: Health monitoring with beat-aligned reporting
### Running Examples
```bash
# Simple agent example
go run pkg/sdk/examples/simple_agent.go
# Task processor with beat budgets
go run pkg/sdk/examples/task_processor.go
# Service monitor with health reporting
go run pkg/sdk/examples/service_monitor.go
```
## Observability
### Health Monitoring
```go
health := client.Health()
fmt.Printf("Connected: %v\n", health.Connected)
fmt.Printf("Last Beat: %d at %s\n", health.LastBeat, health.LastBeatTime)
fmt.Printf("Time Drift: %s\n", health.TimeDrift)
fmt.Printf("Reconnects: %d\n", health.ReconnectCount)
fmt.Printf("Local Degradation: %v\n", health.LocalDegradation)
```
### Metrics
The SDK exposes metrics via Go's `expvar` package:
- Connection metrics: status, reconnection count, duration
- Beat metrics: received, jitter, callback latency, misses
- Status metrics: claims emitted, errors
- Budget metrics: created, completed, timed out
- Error metrics: total count, last error
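When your service serves the standard `expvar` HTTP handler, the metrics are available at its `/debug/vars` endpoint (for example `http://localhost:8080/debug/vars` if the service listens on port 8080).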
### Logging
The SDK uses structured logging via `slog`:
```go
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug, // Set appropriate level
}))
```
## Error Handling
The SDK provides comprehensive error handling:
- **Connection Errors**: Automatic reconnection with exponential backoff
- **Beat Jitter**: Tolerance for network delays and timing variations
- **Callback Panics**: Recovery and logging without affecting other callbacks
- **Validation Errors**: Status claim validation with detailed error messages
- **Timeout Errors**: Beat budget timeouts with context cancellation
## Local Degradation
When the pulse service is unavailable, the SDK automatically enters local degradation mode:
- Generates synthetic beats to maintain callback timing
- Uses fallback 60 BPM tempo
- Marks beat frames with "degraded" phase
- Automatically recovers when pulse service returns
## Legacy Compatibility
Support for legacy `{bar,beat}` patterns (BACKBEAT-REQ-043):
```go
// Convert legacy format (logs warning once)
beatIndex := client.ConvertLegacyBeat(bar, beat)
// Get legacy format from current beat
legacy := client.GetLegacyBeatInfo()
fmt.Printf("Bar: %d, Beat: %d\n", legacy.Bar, legacy.Beat)
```
## Security
The SDK implements BACKBEAT security requirements:
- **Ed25519 Signatures**: All status claims are signed when signing key provided
- **Required Headers**: Includes `x-window-id` and `x-hlc` headers
- **Agent Identification**: Automatic `x-agent-id` header for routing
```go
// Configure signing
_, signingKey, _ := ed25519.GenerateKey(rand.Reader)
config.SigningKey = signingKey
```
## Performance
The SDK is designed for high performance:
- **Beat Callback Latency**: Target ≤5ms callback execution
- **Timer Drift**: ≤1% drift over 1 hour without leader
- **Concurrent Safe**: All operations are goroutine-safe
- **Memory Efficient**: Bounded error lists and metric samples
## Integration Patterns
### Web Service Integration
```go
func main() {
// Initialize BACKBEAT client
client := sdk.NewClient(config)
client.OnBeat(func(beat sdk.BeatFrame) {
// Report web service status
client.EmitStatusClaim(sdk.StatusClaim{
State: "executing",
Progress: getRequestSuccessRate(),
Notes: fmt.Sprintf("Handling %d req/s", getCurrentRPS()),
})
})
// Start HTTP server
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
health := client.Health()
json.NewEncoder(w).Encode(health)
})
}
```
### Background Job Processor
```go
func processJobs(client sdk.Client) {
for job := range jobQueue {
// Use beat budget for job timeout
err := client.WithBeatBudget(job.MaxBeats, func() error {
return processJob(job)
})
if err != nil {
client.EmitStatusClaim(sdk.StatusClaim{
TaskID: job.ID,
State: "failed",
Notes: err.Error(),
})
}
}
}
```
## Testing
The SDK includes comprehensive test utilities:
```bash
# Run all tests
go test ./pkg/sdk/...
# Run with race detection
go test -race ./pkg/sdk/...
# Run benchmarks
go test -bench=. ./pkg/sdk/...
```
## Requirements
- Go 1.22 or later
- NATS server for messaging
- BACKBEAT pulse service running
- Network connectivity to cluster
## Contributing
1. Follow standard Go conventions
2. Include comprehensive tests
3. Update documentation for API changes
4. Ensure examples remain working
5. Maintain backward compatibility
## License
This SDK is part of the BACKBEAT project and follows the same licensing terms.

480
pkg/sdk/client.go Normal file
View File

@@ -0,0 +1,480 @@
// Package sdk provides the BACKBEAT Go SDK for enabling CHORUS services
// to become BACKBEAT-aware with beat synchronization and status emission.
package sdk
import (
"context"
"crypto/ed25519"
"encoding/json"
"fmt"
"log/slog"
"sync"
"time"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
// Client interface defines the core BACKBEAT SDK functionality.
// Implements BACKBEAT-REQ-040, 041, 042, 043, 044.
//
// Instances are obtained from NewClient. Callbacks may be registered before
// Start is called; Start connects to NATS and launches the beat subscription
// loop, and Stop tears both down.
type Client interface {
// Beat subscription (BACKBEAT-REQ-040)
// OnBeat and OnDownbeat append the callback to the registered set and
// return an error when the callback is nil.
OnBeat(callback func(BeatFrame)) error
OnDownbeat(callback func(BeatFrame)) error
// Status emission (BACKBEAT-REQ-041)
// EmitStatusClaim fills in the SDK-owned fields of the claim, validates it,
// signs it when a signing key is configured, and publishes it.
EmitStatusClaim(claim StatusClaim) error
// Beat budgets (BACKBEAT-REQ-042)
// WithBeatBudget runs fn with a deadline of n beats at the current tempo;
// n must be positive.
WithBeatBudget(n int, fn func() error) error
// Utilities
// These report the most recently recorded beat index, window ID and tempo
// (BPM), and the drift between the nominal and the measured beat duration.
GetCurrentBeat() int64
GetCurrentWindow() string
IsInWindow(windowID string) bool
GetCurrentTempo() int
GetTempoDrift() time.Duration
// Lifecycle management
Start(ctx context.Context) error
Stop() error
Health() HealthStatus
}
// Config represents the SDK configuration.
//
// The defaults mentioned on the fields below are the values set by
// DefaultConfig. NewClient itself only substitutes slog.Default() for a nil
// Logger, so a Config built as a struct literal keeps zero values for every
// other field that is not set explicitly.
type Config struct {
ClusterID string // BACKBEAT cluster identifier
AgentID string // Unique agent identifier
NATSUrl string // NATS connection URL
SigningKey ed25519.PrivateKey // Ed25519 private key for signing (BACKBEAT-REQ-044)
Logger *slog.Logger // Structured logger
JitterTolerance time.Duration // Maximum jitter tolerance (default: 50ms)
ReconnectDelay time.Duration // NATS reconnection delay (default: 1s)
MaxReconnects int // Maximum reconnection attempts (default: -1 for infinite)
}
// DefaultConfig builds a fresh Config carrying the SDK defaults: 50ms jitter
// tolerance, a 1s reconnect delay, unlimited reconnects (-1) and the process
// default slog logger. ClusterID, AgentID, NATSUrl and SigningKey are left at
// their zero values for the caller to fill in.
func DefaultConfig() *Config {
	cfg := new(Config)
	cfg.Logger = slog.Default()
	cfg.MaxReconnects = -1 // negative means "retry forever"
	cfg.ReconnectDelay = time.Second
	cfg.JitterTolerance = 50 * time.Millisecond
	return cfg
}
// BeatFrame represents a beat frame with timing information.
//
// It is the value handed to callbacks registered through OnBeat and
// OnDownbeat. The JSON names in the struct tags define its serialized form.
// NOTE(review): the producer of these frames and the exact meaning of Phase
// and HLC are not visible in this file — confirm against the pulse service.
type BeatFrame struct {
Type string `json:"type"`
ClusterID string `json:"cluster_id"`
BeatIndex int64 `json:"beat_index"`
Downbeat bool `json:"downbeat"`
Phase string `json:"phase"`
HLC string `json:"hlc"`
DeadlineAt time.Time `json:"deadline_at"`
TempoBPM int `json:"tempo_bpm"`
WindowID string `json:"window_id"`
}
// StatusClaim represents a status claim emission.
//
// EmitStatusClaim unconditionally overwrites Type, AgentID, BeatIndex and HLC,
// so any caller-supplied values for those fields are discarded. TaskID is only
// generated when the caller leaves it empty.
type StatusClaim struct {
// Auto-populated by SDK
Type string `json:"type"` // Always "backbeat.statusclaim.v1"
AgentID string `json:"agent_id"` // Auto-populated from config
TaskID string `json:"task_id"` // Auto-generated if not provided
BeatIndex int64 `json:"beat_index"` // Auto-populated from current beat
HLC string `json:"hlc"` // Auto-populated from current HLC
// User-provided
State string `json:"state"` // executing|planning|waiting|review|done|failed
WaitFor []string `json:"wait_for,omitempty"` // refs (e.g., hmmm://thread/...)
BeatsLeft int `json:"beats_left"` // estimated beats remaining
Progress float64 `json:"progress"` // progress ratio (0.0-1.0)
Notes string `json:"notes"` // status description
}
// HealthStatus represents the current health of the SDK client.
//
// It is a point-in-time snapshot produced by Health:
//   - Connected is true only when a NATS connection exists and reports itself
//     connected.
//   - TimeDrift is the time elapsed since LastBeatTime when the snapshot was
//     taken.
//   - MeasuredBPM is the mean measured BPM over the most recent tempo samples
//     (at most five), or 60 when no samples exist.
//   - TempoDrift is the value returned by GetTempoDrift.
//   - Errors is a copy of the client's recorded error messages.
type HealthStatus struct {
Connected bool `json:"connected"`
LastBeat int64 `json:"last_beat"`
LastBeatTime time.Time `json:"last_beat_time"`
TimeDrift time.Duration `json:"time_drift"`
ReconnectCount int `json:"reconnect_count"`
LocalDegradation bool `json:"local_degradation"`
CurrentTempo int `json:"current_tempo"`
TempoDrift time.Duration `json:"tempo_drift"`
MeasuredBPM float64 `json:"measured_bpm"`
Errors []string `json:"errors,omitempty"`
}
// LegacyBeatInfo represents legacy {bar,beat} information.
// For BACKBEAT-REQ-043 compatibility.
// NOTE(review): the conversion code is not in this view; the package tests
// suggest 1-based bars and beats with four beats per bar — confirm.
type LegacyBeatInfo struct {
Bar int `json:"bar"`
Beat int `json:"beat"`
}
// tempoSample represents a tempo measurement for drift calculation.
// Samples are kept in client.tempoHistory; GetTempoDrift and Health average
// the ActualBPM of the most recent ones.
type tempoSample struct {
BeatIndex int64
Tempo int
MeasuredTime time.Time
ActualBPM float64 // Measured BPM based on inter-beat timing
}
// client implements the Client interface.
//
// Locking, as used by the methods in this file:
//   - beatMutex guards the beat-tracking fields (currentBeat, currentWindow,
//     lastBeatTime, currentTempo, tempoHistory).
//   - callbackMutex guards the two callback slices.
//   - errorMutex guards errors.
//   - budgetMutex guards budgetContexts.
//
// NOTE(review): which lock (if any) protects reconnectCount and
// localDegradation is not visible here — confirm against their writers.
type client struct {
config *Config
nc *nats.Conn
// ctx and cancel are created by Start and cancelled by Stop; both are nil
// until Start has been called.
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Beat tracking
currentBeat int64
currentWindow string
currentHLC string
lastBeatTime time.Time
currentTempo int // Current tempo in BPM
lastTempo int // Last known tempo for drift calculation
tempoHistory []tempoSample // History for drift calculation
beatMutex sync.RWMutex
// Callbacks
beatCallbacks []func(BeatFrame)
downbeatCallbacks []func(BeatFrame)
callbackMutex sync.RWMutex
// Health and metrics
reconnectCount int
localDegradation bool
errors []string
errorMutex sync.RWMutex
metrics *Metrics
// Beat budget tracking
// Maps a per-call budget ID to the cancel function of that budget's
// context so Stop can cancel every budget still in flight.
budgetContexts map[string]context.CancelFunc
budgetMutex sync.Mutex
// Legacy compatibility
legacyWarned bool
legacyMutex sync.Mutex
}
// NewClient creates a new BACKBEAT SDK client.
//
// A nil config is treated as DefaultConfig() instead of causing a nil-pointer
// panic. When config.Logger is nil it is replaced with slog.Default(); note
// that this writes to the caller's Config. The returned client starts with a
// tempo of 60 BPM and is not connected until Start is called.
//
// Metrics are created under the prefix "backbeat.sdk.<AgentID>".
func NewClient(config *Config) Client {
	if config == nil {
		config = DefaultConfig()
	}
	if config.Logger == nil {
		config.Logger = slog.Default()
	}
	c := &client{
		config:            config,
		beatCallbacks:     make([]func(BeatFrame), 0),
		downbeatCallbacks: make([]func(BeatFrame), 0),
		budgetContexts:    make(map[string]context.CancelFunc),
		errors:            make([]string, 0),
		tempoHistory:      make([]tempoSample, 0, 100),
		currentTempo:      60, // Default to 60 BPM
	}
	// Initialize metrics
	prefix := fmt.Sprintf("backbeat.sdk.%s", config.AgentID)
	c.metrics = NewMetrics(prefix)
	return c
}
// Start initializes the client and begins beat synchronization.
//
// It derives a cancellable context from ctx, connects to NATS and launches the
// beat subscription loop in a goroutine tracked by the client's WaitGroup.
// If the connection attempt fails, the derived context is cancelled and
// cleared again so it is not leaked and the client is left in its pre-Start
// state; the connection error is returned wrapped.
func (c *client) Start(ctx context.Context) error {
	c.ctx, c.cancel = context.WithCancel(ctx)
	if err := c.connect(); err != nil {
		// Release the derived context: without this it stays registered with
		// its parent until the parent itself is cancelled.
		c.cancel()
		// Reset to nil so WithBeatBudget falls back to context.Background()
		// rather than inheriting an already-cancelled context.
		c.ctx, c.cancel = nil, nil
		return fmt.Errorf("failed to connect to NATS: %w", err)
	}
	c.wg.Add(1)
	go c.beatSubscriptionLoop()
	c.config.Logger.Info("BACKBEAT SDK client started",
		slog.String("cluster_id", c.config.ClusterID),
		slog.String("agent_id", c.config.AgentID))
	return nil
}
// Stop shuts the client down: it cancels the client context (if Start created
// one), cancels and forgets every outstanding beat budget, closes the NATS
// connection when present, and then blocks until the background goroutines
// tracked by the WaitGroup have exited. It always returns nil.
func (c *client) Stop() error {
	if stop := c.cancel; stop != nil {
		stop()
	}

	// Cancel all active beat budgets, then drop the bookkeeping entries.
	c.budgetMutex.Lock()
	for _, cancelBudget := range c.budgetContexts {
		cancelBudget()
	}
	clear(c.budgetContexts)
	c.budgetMutex.Unlock()

	if conn := c.nc; conn != nil {
		conn.Close()
	}

	c.wg.Wait()
	c.config.Logger.Info("BACKBEAT SDK client stopped")
	return nil
}
// OnBeat adds callback to the set of beat callbacks (BACKBEAT-REQ-040).
// A nil callback is rejected with an error and nothing is registered.
func (c *client) OnBeat(callback func(BeatFrame)) error {
	if callback != nil {
		c.callbackMutex.Lock()
		c.beatCallbacks = append(c.beatCallbacks, callback)
		c.callbackMutex.Unlock()
		return nil
	}
	return fmt.Errorf("callback cannot be nil")
}
// OnDownbeat adds callback to the set of downbeat callbacks (BACKBEAT-REQ-040).
// A nil callback is rejected with an error and nothing is registered.
func (c *client) OnDownbeat(callback func(BeatFrame)) error {
	if callback != nil {
		c.callbackMutex.Lock()
		c.downbeatCallbacks = append(c.downbeatCallbacks, callback)
		c.callbackMutex.Unlock()
		return nil
	}
	return fmt.Errorf("callback cannot be nil")
}
// EmitStatusClaim publishes a status claim for this agent (BACKBEAT-REQ-041).
//
// The SDK-owned fields (Type, AgentID, BeatIndex, HLC) are overwritten on the
// local copy of claim, and a TaskID is generated when none was supplied. The
// claim is then validated, signed when a signing key is configured
// (BACKBEAT-REQ-044), serialized as JSON and published on
// "backbeat.status.<ClusterID>" with the client's standard headers.
//
// Only a publish failure is recorded in the client's error list and metrics;
// validation, signing and marshalling failures are returned without being
// recorded.
func (c *client) EmitStatusClaim(claim StatusClaim) error {
	cfg := c.config

	// SDK-owned fields always replace whatever the caller supplied.
	claim.Type = "backbeat.statusclaim.v1"
	claim.AgentID = cfg.AgentID
	claim.BeatIndex = c.GetCurrentBeat()
	claim.HLC = c.getCurrentHLC()
	if claim.TaskID == "" {
		shortID := uuid.New().String()[:8]
		claim.TaskID = fmt.Sprintf("task:%s", shortID)
	}

	if err := c.validateStatusClaim(&claim); err != nil {
		return fmt.Errorf("invalid status claim: %w", err)
	}

	// Signing is optional and only happens when a key was configured.
	if cfg.SigningKey != nil {
		if err := c.signStatusClaim(&claim); err != nil {
			return fmt.Errorf("failed to sign status claim: %w", err)
		}
	}

	payload, err := json.Marshal(claim)
	if err != nil {
		return fmt.Errorf("failed to marshal status claim: %w", err)
	}

	outgoing := &nats.Msg{
		Subject: fmt.Sprintf("backbeat.status.%s", cfg.ClusterID),
		Data:    payload,
		Header:  c.createHeaders(),
	}
	if pubErr := c.nc.PublishMsg(outgoing); pubErr != nil {
		c.addError(fmt.Sprintf("failed to publish status claim: %v", pubErr))
		c.metrics.RecordStatusClaim(false)
		return fmt.Errorf("failed to publish status claim: %w", pubErr)
	}

	c.metrics.RecordStatusClaim(true)
	cfg.Logger.Debug("Status claim emitted",
		slog.String("agent_id", claim.AgentID),
		slog.String("task_id", claim.TaskID),
		slog.String("state", claim.State),
		slog.Int64("beat_index", claim.BeatIndex))
	return nil
}
// WithBeatBudget executes fn with a beat-based timeout (BACKBEAT-REQ-042).
//
// The deadline is n beats at the tempo in effect when the call starts. fn runs
// in its own goroutine; WithBeatBudget returns fn's result if it finishes in
// time, or an error once the budget's context is done. That context also ends
// when the client is stopped, which is reported with the same "exceeded"
// error. fn is not interrupted when the budget expires — it keeps running in
// the background and its result is discarded.
//
// An error is returned immediately, without running anything, when n is not
// positive or fn is nil. The nil check matters because a nil fn would
// otherwise panic inside the spawned goroutine, where the caller cannot
// recover it.
func (c *client) WithBeatBudget(n int, fn func() error) error {
	if n <= 0 {
		return fmt.Errorf("beat budget must be positive, got %d", n)
	}
	if fn == nil {
		return fmt.Errorf("beat budget function cannot be nil")
	}
	// Calculate timeout based on current tempo
	currentBeat := c.GetCurrentBeat()
	beatDuration := c.getBeatDuration()
	timeout := time.Duration(n) * beatDuration
	// Use background context if client context is not set (for testing)
	baseCtx := c.ctx
	if baseCtx == nil {
		baseCtx = context.Background()
	}
	ctx, cancel := context.WithTimeout(baseCtx, timeout)
	defer cancel()
	// Track the budget context so Stop can cancel it
	budgetID := uuid.New().String()
	c.budgetMutex.Lock()
	c.budgetContexts[budgetID] = cancel
	c.budgetMutex.Unlock()
	// Record budget creation
	c.metrics.RecordBudgetCreated()
	defer func() {
		c.budgetMutex.Lock()
		delete(c.budgetContexts, budgetID)
		c.budgetMutex.Unlock()
	}()
	// Buffered so the goroutine can always deliver its result and exit, even
	// when nobody is listening any more because the budget already expired.
	done := make(chan error, 1)
	go func() {
		done <- fn()
	}()
	select {
	case err := <-done:
		c.metrics.RecordBudgetCompleted(false) // Not timed out
		if err != nil {
			c.config.Logger.Debug("Beat budget function completed with error",
				slog.Int("budget", n),
				slog.Int64("start_beat", currentBeat),
				slog.String("error", err.Error()))
		} else {
			c.config.Logger.Debug("Beat budget function completed successfully",
				slog.Int("budget", n),
				slog.Int64("start_beat", currentBeat))
		}
		return err
	case <-ctx.Done():
		c.metrics.RecordBudgetCompleted(true) // Timed out
		c.config.Logger.Warn("Beat budget exceeded",
			slog.Int("budget", n),
			slog.Int64("start_beat", currentBeat),
			slog.Duration("timeout", timeout))
		return fmt.Errorf("beat budget of %d beats exceeded", n)
	}
}
// GetCurrentBeat reports the most recently recorded beat index, read under
// the beat read-lock.
func (c *client) GetCurrentBeat() int64 {
	c.beatMutex.RLock()
	beat := c.currentBeat
	c.beatMutex.RUnlock()
	return beat
}
// GetCurrentWindow reports the most recently recorded window ID, read under
// the beat read-lock.
func (c *client) GetCurrentWindow() string {
	c.beatMutex.RLock()
	window := c.currentWindow
	c.beatMutex.RUnlock()
	return window
}
// IsInWindow reports whether windowID equals the client's current window ID.
func (c *client) IsInWindow(windowID string) bool {
	active := c.GetCurrentWindow()
	return windowID == active
}
// GetCurrentTempo reports the most recently recorded tempo in BPM, read under
// the beat read-lock.
func (c *client) GetCurrentTempo() int {
	c.beatMutex.RLock()
	bpm := c.currentTempo
	c.beatMutex.RUnlock()
	return bpm
}
// GetTempoDrift calculates the drift between expected and actual tempo.
//
// The result is the measured beat duration minus the nominal beat duration,
// where the measured duration comes from the mean ActualBPM of the most recent
// tempo samples (at most ten). A positive value means beats are arriving more
// slowly than the nominal tempo implies.
//
// It returns 0 when fewer than two samples exist or when the measured average
// is not a positive number. A non-positive current tempo is treated as 60 BPM,
// matching the fallback used for beat-duration calculation, so the division
// can never produce an infinite or NaN duration.
func (c *client) GetTempoDrift() time.Duration {
	c.beatMutex.RLock()
	defer c.beatMutex.RUnlock()

	historyLen := len(c.tempoHistory)
	if historyLen < 2 {
		return 0
	}

	// Average the measured BPM over the newest samples (up to ten).
	const maxRecent = 10
	recent := c.tempoHistory
	if historyLen > maxRecent {
		recent = recent[historyLen-maxRecent:]
	}
	totalBPM := 0.0
	for _, sample := range recent {
		totalBPM += sample.ActualBPM
	}
	avgMeasuredBPM := totalBPM / float64(len(recent))
	// Written as a negated comparison so NaN is rejected as well as <= 0.
	if !(avgMeasuredBPM > 0) {
		return 0
	}

	expectedTempo := c.currentTempo
	if expectedTempo <= 0 {
		expectedTempo = 60 // fallback tempo when none is known
	}

	// Calculate drift in seconds, then convert to a Duration.
	expectedBeatDuration := 60.0 / float64(expectedTempo)
	actualBeatDuration := 60.0 / avgMeasuredBPM
	drift := actualBeatDuration - expectedBeatDuration
	return time.Duration(drift * float64(time.Second))
}
// Health returns the current health status.
//
// The beat-related values (LastBeat, LastBeatTime, CurrentTempo, MeasuredBPM)
// are captured together under a single hold of the beat read-lock so the
// snapshot is internally consistent and lastBeatTime is never read unguarded.
// TimeDrift is the time elapsed since LastBeatTime; before any beat has been
// recorded LastBeatTime is the zero time and TimeDrift is correspondingly
// very large. MeasuredBPM averages the newest tempo samples (at most five) and
// defaults to 60 when there are none.
func (c *client) Health() HealthStatus {
	c.errorMutex.RLock()
	errs := make([]string, len(c.errors))
	copy(errs, c.errors)
	c.errorMutex.RUnlock()

	c.beatMutex.RLock()
	lastBeat := c.currentBeat
	lastBeatTime := c.lastBeatTime
	currentTempo := c.currentTempo
	// Calculate measured BPM from recent tempo history
	measuredBPM := 60.0 // Default
	if historyLen := len(c.tempoHistory); historyLen > 0 {
		recentCount := 5
		if historyLen < recentCount {
			recentCount = historyLen
		}
		recent := c.tempoHistory[historyLen-recentCount:]
		totalBPM := 0.0
		for _, sample := range recent {
			totalBPM += sample.ActualBPM
		}
		measuredBPM = totalBPM / float64(len(recent))
	}
	c.beatMutex.RUnlock()

	// GetTempoDrift takes the beat read-lock itself, so it must be called
	// only after the lock above has been released.
	tempoDrift := c.GetTempoDrift()

	return HealthStatus{
		Connected:    c.nc != nil && c.nc.IsConnected(),
		LastBeat:     lastBeat,
		LastBeatTime: lastBeatTime,
		TimeDrift:    time.Since(lastBeatTime),
		// NOTE(review): reconnectCount and localDegradation are read without
		// a lock here, as before; confirm how their writers synchronize.
		ReconnectCount:   c.reconnectCount,
		LocalDegradation: c.localDegradation,
		CurrentTempo:     currentTempo,
		TempoDrift:       tempoDrift,
		MeasuredBPM:      measuredBPM,
		Errors:           errs,
	}
}

573
pkg/sdk/client_test.go Normal file
View File

@@ -0,0 +1,573 @@
package sdk
import (
"context"
"crypto/ed25519"
"crypto/rand"
"fmt"
"testing"
"time"
"log/slog"
"os"
"github.com/nats-io/nats.go"
)
// testCounter is the running sequence number behind generateUniqueAgentID.
var testCounter int

// generateUniqueAgentID returns prefix followed by "-" and the next sequence
// number, so every test gets its own agent ID and therefore its own metrics
// prefix (avoids expvar conflicts). It is not safe for concurrent use.
func generateUniqueAgentID(prefix string) string {
	testCounter += 1
	id := fmt.Sprintf("%s-%d", prefix, testCounter)
	return id
}
// TestClient verifies that a client can be constructed from a populated
// config and that it reports itself as disconnected before Start is called.
func TestClient(t *testing.T) {
	cfg := DefaultConfig()
	cfg.ClusterID = "test-cluster"
	cfg.AgentID = generateUniqueAgentID("test-agent")
	cfg.NATSUrl = "nats://localhost:4222"

	sdkClient := NewClient(cfg)
	if sdkClient == nil {
		t.Fatal("Expected client to be created")
	}

	// No connection exists yet, so health must not claim one.
	if sdkClient.Health().Connected {
		t.Error("Expected client to be disconnected before start")
	}
}
// TestBeatCallbacks checks that beat and downbeat callbacks can be registered
// and that nil callbacks are rejected. The callbacks are never invoked here;
// only registration is exercised.
func TestBeatCallbacks(t *testing.T) {
	cfg := DefaultConfig()
	cfg.ClusterID = "test-cluster"
	cfg.AgentID = generateUniqueAgentID("test-agent-callbacks")
	sdkClient := NewClient(cfg)

	var (
		beatCalled     bool
		downbeatCalled bool
	)

	// Valid callbacks must register without error.
	if err := sdkClient.OnBeat(func(BeatFrame) { beatCalled = true }); err != nil {
		t.Fatalf("Failed to register beat callback: %v", err)
	}
	if err := sdkClient.OnDownbeat(func(BeatFrame) { downbeatCalled = true }); err != nil {
		t.Fatalf("Failed to register downbeat callback: %v", err)
	}

	// Nil callbacks must be refused.
	if err := sdkClient.OnBeat(nil); err == nil {
		t.Error("Expected error when registering nil beat callback")
	}
	if err := sdkClient.OnDownbeat(nil); err == nil {
		t.Error("Expected error when registering nil downbeat callback")
	}

	// The flags are only written inside the closures; reference them so the
	// compiler does not report them as unused.
	_, _ = beatCalled, downbeatCalled
}
// TestStatusClaim exercises status-claim validation directly: a fully
// populated claim must pass, while an unknown state, an out-of-range progress
// value and a negative beats-left value must each be rejected. Emission itself
// is not tested because it needs a NATS connection.
func TestStatusClaim(t *testing.T) {
	_, signingKey, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("Failed to generate signing key: %v", err)
	}

	cfg := DefaultConfig()
	cfg.ClusterID = "test-cluster"
	cfg.AgentID = generateUniqueAgentID("test-agent")
	cfg.SigningKey = signingKey
	sc := NewClient(cfg).(*client)

	claim := StatusClaim{
		State:     "executing",
		BeatsLeft: 5,
		Progress:  0.5,
		Notes:     "Test status",
	}

	// Seed beat state the way a received beat would.
	sc.currentBeat = 1
	sc.currentHLC = "test-hlc"

	// Nothing has populated the SDK-owned fields yet.
	if claim.AgentID != "" {
		t.Error("Expected AgentID to be empty before emission")
	}

	// Fill in by hand what EmitStatusClaim would normally populate.
	claim.Type = "backbeat.statusclaim.v1"
	claim.AgentID = cfg.AgentID
	claim.TaskID = "test-task"
	claim.BeatIndex = 1
	claim.HLC = "test-hlc"
	if err = sc.validateStatusClaim(&claim); err != nil {
		t.Errorf("Expected valid status claim to pass validation: %v", err)
	}

	// Each case corrupts exactly one field of an otherwise valid claim.
	rejected := []struct {
		corrupt func(*StatusClaim)
		failure string
	}{
		{func(sc *StatusClaim) { sc.State = "invalid-state" }, "Expected invalid state to fail validation"},
		{func(sc *StatusClaim) { sc.Progress = 1.5 }, "Expected invalid progress to fail validation"},
		{func(sc *StatusClaim) { sc.BeatsLeft = -1 }, "Expected negative beats_left to fail validation"},
	}
	for _, tc := range rejected {
		bad := claim
		tc.corrupt(&bad)
		if err = sc.validateStatusClaim(&bad); err == nil {
			t.Error(tc.failure)
		}
	}
}
// TestBeatBudget covers the three outcomes of WithBeatBudget at 120 BPM
// (0.5s per beat): completion within budget, expiry of the budget, and
// rejection of non-positive budgets.
func TestBeatBudget(t *testing.T) {
	cfg := DefaultConfig()
	cfg.ClusterID = "test-cluster"
	cfg.AgentID = generateUniqueAgentID("test-agent")

	bc := NewClient(cfg).(*client)
	bc.currentTempo = 120 // 120 BPM = 0.5 seconds per beat
	bc.ctx = context.Background()

	// Finishes well inside a 2-beat (1 second) budget.
	ran := false
	quick := func() error {
		ran = true
		time.Sleep(100 * time.Millisecond)
		return nil
	}
	if err := bc.WithBeatBudget(2, quick); err != nil {
		t.Errorf("Expected function to complete successfully: %v", err)
	}
	if !ran {
		t.Error("Expected function to be executed")
	}

	// Sleeps far longer than a 1-beat (0.5 second) budget allows.
	slow := func() error {
		time.Sleep(2 * time.Second)
		return nil
	}
	timeoutErr := bc.WithBeatBudget(1, slow)
	if timeoutErr == nil {
		t.Error("Expected function to timeout")
	}
	if timeoutErr.Error() != "beat budget of 1 beats exceeded" {
		t.Errorf("Expected timeout error message, got: %v", timeoutErr)
	}

	// Zero and negative budgets are invalid.
	noop := func() error { return nil }
	if err := bc.WithBeatBudget(0, noop); err == nil {
		t.Error("Expected error for zero beat budget")
	}
	if err := bc.WithBeatBudget(-1, noop); err == nil {
		t.Error("Expected error for negative beat budget")
	}
}
// TestTempoTracking checks the initial tempo and drift of a new client, then
// injects two tempo samples measured slightly below the nominal 120 BPM and
// expects a non-zero drift.
func TestTempoTracking(t *testing.T) {
	cfg := DefaultConfig()
	cfg.ClusterID = "test-cluster"
	cfg.AgentID = generateUniqueAgentID("test-agent")
	tc := NewClient(cfg).(*client)

	// A fresh client starts at 60 BPM with no drift.
	if tc.GetCurrentTempo() != 60 {
		t.Errorf("Expected default tempo to be 60, got %d", tc.GetCurrentTempo())
	}
	if tc.GetTempoDrift() != 0 {
		t.Errorf("Expected initial tempo drift to be 0, got %v", tc.GetTempoDrift())
	}

	// Simulate two beats at a nominal 120 BPM that were measured a little slow.
	tc.beatMutex.Lock()
	tc.currentTempo = 120
	tc.tempoHistory = append(tc.tempoHistory,
		tempoSample{
			BeatIndex:    1,
			Tempo:        120,
			MeasuredTime: time.Now(),
			ActualBPM:    118.0,
		},
		tempoSample{
			BeatIndex:    2,
			Tempo:        120,
			MeasuredTime: time.Now().Add(500 * time.Millisecond),
			ActualBPM:    119.0,
		},
	)
	tc.beatMutex.Unlock()

	if tc.GetCurrentTempo() != 120 {
		t.Errorf("Expected current tempo to remain at 120 BPM, got %d", tc.GetCurrentTempo())
	}

	// Measured BPM differs from nominal, so drift must not be zero.
	if drift := tc.GetTempoDrift(); drift == 0 {
		t.Error("Expected non-zero tempo drift")
	}
}
// TestLegacyCompatibility checks conversion between legacy {bar,beat} pairs
// and beat indices in both directions, including the first beat and a
// current beat index of zero.
func TestLegacyCompatibility(t *testing.T) {
	cfg := DefaultConfig()
	cfg.ClusterID = "test-cluster"
	cfg.AgentID = generateUniqueAgentID("test-agent")
	lc := NewClient(cfg).(*client)

	// setBeat writes the current beat index under the beat lock.
	setBeat := func(index int64) {
		lc.beatMutex.Lock()
		lc.currentBeat = index
		lc.beatMutex.Unlock()
	}

	// Bar 2, beat 3 -> (2-1)*4 + 3 = 7.
	expectedBeatIndex := int64(7)
	if got := lc.ConvertLegacyBeat(2, 3); got != expectedBeatIndex {
		t.Errorf("Expected beat index %d, got %d", expectedBeatIndex, got)
	}

	// And back again: index 7 -> bar 2, beat 3.
	setBeat(7)
	if info := lc.GetLegacyBeatInfo(); info.Bar != 2 || info.Beat != 3 {
		t.Errorf("Expected bar=2, beat=3, got bar=%d, beat=%d", info.Bar, info.Beat)
	}

	// Edge case: the very first beat.
	if got := lc.ConvertLegacyBeat(1, 1); got != 1 {
		t.Errorf("Expected beat index 1 for first beat, got %d", got)
	}

	// Edge case: no beat seen yet (index 0) maps to bar 1, beat 1.
	setBeat(0)
	if info := lc.GetLegacyBeatInfo(); info.Bar != 1 || info.Beat != 1 {
		t.Errorf("Expected bar=1, beat=1 for zero beat, got bar=%d, beat=%d", info.Bar, info.Beat)
	}
}
// TestHealthStatus checks the health snapshot of a fresh client and again
// after beat state and an error have been injected.
func TestHealthStatus(t *testing.T) {
	cfg := DefaultConfig()
	cfg.ClusterID = "test-cluster"
	cfg.AgentID = generateUniqueAgentID("test-agent")
	hc := NewClient(cfg).(*client)

	// Fresh client: disconnected, no beats, default tempo.
	initial := hc.Health()
	if initial.Connected {
		t.Error("Expected client to be disconnected initially")
	}
	if initial.LastBeat != 0 {
		t.Error("Expected last beat to be 0 initially")
	}
	if initial.CurrentTempo != 60 {
		t.Errorf("Expected default tempo 60, got %d", initial.CurrentTempo)
	}

	// Simulate a beat received 100ms ago at 90 BPM, plus one recorded error.
	hc.beatMutex.Lock()
	hc.currentBeat = 10
	hc.currentTempo = 90
	hc.lastBeatTime = time.Now().Add(-100 * time.Millisecond)
	hc.beatMutex.Unlock()
	hc.addError("test error")

	updated := hc.Health()
	if updated.LastBeat != 10 {
		t.Errorf("Expected last beat to be 10, got %d", updated.LastBeat)
	}
	if updated.CurrentTempo != 90 {
		t.Errorf("Expected current tempo to be 90, got %d", updated.CurrentTempo)
	}
	if count := len(updated.Errors); count != 1 {
		t.Errorf("Expected 1 error, got %d", count)
	}
	if updated.TimeDrift <= 0 {
		t.Error("Expected positive time drift")
	}
}
// TestMetrics checks that a new client has metrics attached and that the
// metrics snapshot exposes the expected keys.
func TestMetrics(t *testing.T) {
	cfg := DefaultConfig()
	cfg.ClusterID = "test-cluster"
	cfg.AgentID = generateUniqueAgentID("test-agent")
	mc := NewClient(cfg).(*client)

	if mc.metrics == nil {
		t.Fatal("Expected metrics to be initialized")
	}

	snapshot := mc.metrics.GetMetricsSnapshot()
	if snapshot == nil {
		t.Error("Expected metrics snapshot to be available")
	}

	// Every key listed here must be present in the snapshot.
	expectedKeys := [...]string{
		"connection_status",
		"reconnect_count",
		"beats_received",
		"status_claims_emitted",
		"budgets_created",
		"total_errors",
	}
	for i := range expectedKeys {
		key := expectedKeys[i]
		if _, exists := snapshot[key]; exists {
			continue
		}
		t.Errorf("Expected metric key '%s' to exist in snapshot", key)
	}
}
// TestConfig tests configuration validation and defaults.
//
// Every client created here gets its agent ID from generateUniqueAgentID,
// like the other tests in this file. The metrics prefix is derived from the
// agent ID, so an empty or fixed ID would reuse a metrics name when the test
// runs more than once in a process (e.g. `go test -count=2`).
func TestConfig(t *testing.T) {
	// Test default config
	config := DefaultConfig()
	if config.JitterTolerance != 50*time.Millisecond {
		t.Errorf("Expected default jitter tolerance 50ms, got %v", config.JitterTolerance)
	}
	if config.ReconnectDelay != 1*time.Second {
		t.Errorf("Expected default reconnect delay 1s, got %v", config.ReconnectDelay)
	}
	if config.MaxReconnects != -1 {
		t.Errorf("Expected default max reconnects -1, got %d", config.MaxReconnects)
	}
	// Test logger initialization
	config.AgentID = generateUniqueAgentID("test-agent-config")
	config.Logger = nil
	client := NewClient(config)
	if client == nil {
		t.Error("Expected client to be created even with nil logger")
	}
	// NewClient is expected to replace the nil logger with a default one.
	if config.Logger == nil {
		t.Error("Expected NewClient to install a default logger")
	}
	// Test with custom config
	_, signingKey, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("Failed to generate signing key: %v", err)
	}
	config.ClusterID = "custom-cluster"
	config.AgentID = generateUniqueAgentID("custom-agent")
	config.SigningKey = signingKey
	config.JitterTolerance = 100 * time.Millisecond
	config.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
	client = NewClient(config)
	if client == nil {
		t.Error("Expected client to be created with custom config")
	}
}
// TestBeatDurationCalculation checks that getBeatDuration converts the
// client's current tempo into the matching beat length, including the
// fallback used for a zero tempo.
func TestBeatDurationCalculation(t *testing.T) {
	cfg := DefaultConfig()
	cfg.ClusterID = "test-cluster"
	cfg.AgentID = generateUniqueAgentID("test-agent")
	c := NewClient(cfg).(*client)

	// verify reads the current beat duration and reports a mismatch using the
	// message format supplied by the caller.
	verify := func(format string, want time.Duration) {
		if got := c.getBeatDuration(); got != want {
			t.Errorf(format, want, got)
		}
	}

	// Freshly built client: expected to behave as 60 BPM (1 second per beat).
	verify("Expected beat duration %v for 60 BPM, got %v", 1000*time.Millisecond)

	// 120 BPM -> half a second per beat.
	c.beatMutex.Lock()
	c.currentTempo = 120
	c.beatMutex.Unlock()
	verify("Expected beat duration %v for 120 BPM, got %v", 500*time.Millisecond)

	// 30 BPM -> two seconds per beat.
	c.beatMutex.Lock()
	c.currentTempo = 30
	c.beatMutex.Unlock()
	verify("Expected beat duration %v for 30 BPM, got %v", 2000*time.Millisecond)

	// Zero tempo is the edge case: it should fall back to 60 BPM.
	c.beatMutex.Lock()
	c.currentTempo = 0
	c.beatMutex.Unlock()
	verify("Expected beat duration %v for 0 BPM (default 60), got %v", 1000*time.Millisecond)
}
// BenchmarkBeatCallback measures dispatching a single beat frame to a
// registered callback through safeExecuteCallback, and confirms the callback
// ran once per iteration.
func BenchmarkBeatCallback(b *testing.B) {
	cfg := DefaultConfig()
	cfg.ClusterID = "bench-cluster"
	cfg.AgentID = "bench-agent"
	c := NewClient(cfg).(*client)

	frame := BeatFrame{
		Type:       "backbeat.beatframe.v1",
		ClusterID:  "bench-cluster",
		BeatIndex:  1,
		Downbeat:   false,
		Phase:      "test",
		HLC:        "test-hlc",
		DeadlineAt: time.Now().Add(time.Second),
		TempoBPM:   60,
		WindowID:   "test-window",
	}

	invocations := 0
	c.OnBeat(func(BeatFrame) {
		invocations++
	})

	b.ResetTimer()
	for remaining := b.N; remaining > 0; remaining-- {
		c.safeExecuteCallback(c.beatCallbacks[0], frame, "beat")
	}

	if invocations != b.N {
		b.Errorf("Expected callback to be called %d times, got %d", b.N, invocations)
	}
}
// BenchmarkStatusClaimValidation measures validateStatusClaim on a fully
// populated claim and aborts on the first validation error.
func BenchmarkStatusClaimValidation(b *testing.B) {
	cfg := DefaultConfig()
	cfg.ClusterID = "bench-cluster"
	cfg.AgentID = "bench-agent"
	c := NewClient(cfg).(*client)

	sample := StatusClaim{
		Type:      "backbeat.statusclaim.v1",
		AgentID:   "bench-agent",
		TaskID:    "bench-task",
		BeatIndex: 1,
		State:     "executing",
		BeatsLeft: 5,
		Progress:  0.5,
		Notes:     "Benchmark test",
		HLC:       "bench-hlc",
	}

	b.ResetTimer()
	for remaining := b.N; remaining > 0; remaining-- {
		if err := c.validateStatusClaim(&sample); err != nil {
			b.Fatal(err)
		}
	}
}
// setupTestNATSServer connects to a NATS server at nats.DefaultURL for use by
// integration tests. It does not start or mock a server: when no server is
// reachable the calling test is skipped.
//
// The returned connection is owned by the caller, who must Close it.
func setupTestNATSServer(t *testing.T) *nats.Conn {
	// Attribute the skip message to the calling test's line, not this helper.
	t.Helper()

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		// Skipf marks the test as skipped and stops its goroutine, so control
		// never returns from this call and no nil connection reaches the caller.
		t.Skipf("NATS server not available: %v", err)
	}
	return nc
}
// TestIntegrationWithNATS runs a full Start/Stop cycle against a live NATS
// server and checks the reported connection state after each step.
func TestIntegrationWithNATS(t *testing.T) {
	conn := setupTestNATSServer(t)
	if conn == nil {
		return // Skipped
	}
	defer conn.Close()

	cfg := DefaultConfig()
	cfg.ClusterID = "integration-test"
	cfg.AgentID = generateUniqueAgentID("test-agent")
	cfg.NATSUrl = nats.DefaultURL
	c := NewClient(cfg)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Start, then confirm the client reports itself as connected.
	if err := c.Start(ctx); err != nil {
		t.Fatalf("Failed to start client: %v", err)
	}
	if started := c.Health(); !started.Connected {
		t.Error("Expected client to be connected after start")
	}

	// Stop, then confirm the client reports itself as disconnected.
	if err := c.Stop(); err != nil {
		t.Errorf("Failed to stop client: %v", err)
	}
	if stopped := c.Health(); stopped.Connected {
		t.Error("Expected client to be disconnected after stop")
	}
}

110
pkg/sdk/doc.go Normal file
View File

@@ -0,0 +1,110 @@
// Package sdk provides the BACKBEAT Go SDK for enabling CHORUS services
// to become BACKBEAT-aware with beat synchronization and status emission.
//
// The BACKBEAT SDK enables services to:
// - Subscribe to cluster-wide beat events with jitter tolerance
// - Emit status claims with automatic metadata population
// - Use beat budgets for timeout management
// - Operate in local degradation mode when the pulse service is unavailable
// - Integrate comprehensive observability and health reporting
//
// # Quick Start
//
// config := sdk.DefaultConfig()
// config.ClusterID = "chorus-dev"
// config.AgentID = "my-service"
// config.NATSUrl = "nats://localhost:4222"
//
// client := sdk.NewClient(config)
//
// client.OnBeat(func(beat sdk.BeatFrame) {
// // Called every beat
// client.EmitStatusClaim(sdk.StatusClaim{
// State: "executing",
// Progress: 0.5,
// Notes: "Processing data",
// })
// })
//
// ctx := context.Background()
// client.Start(ctx)
// defer client.Stop()
//
// # Beat Subscription
//
// Register callbacks for beat and downbeat events:
//
// client.OnBeat(func(beat sdk.BeatFrame) {
// // Called every beat (~1-4 times per second depending on tempo)
// fmt.Printf("Beat %d\n", beat.BeatIndex)
// })
//
// client.OnDownbeat(func(beat sdk.BeatFrame) {
// // Called at the start of each bar (every 4 beats typically)
// fmt.Printf("Bar started: %s\n", beat.WindowID)
// })
//
// # Status Emission
//
// Emit status claims to report current state and progress:
//
// err := client.EmitStatusClaim(sdk.StatusClaim{
// State: "executing", // executing|planning|waiting|review|done|failed
// BeatsLeft: 10, // estimated beats remaining
// Progress: 0.75, // progress ratio (0.0-1.0)
// Notes: "Processing batch 5/10",
// })
//
// # Beat Budgets
//
// Execute functions with beat-based timeouts:
//
// err := client.WithBeatBudget(10, func() error {
// // This function has 10 beats to complete
// return performLongRunningTask()
// })
//
// if err != nil {
// // Handle timeout or task error
// log.Printf("Task failed or exceeded budget: %v", err)
// }
//
// # Health and Observability
//
// Monitor client health and metrics:
//
// health := client.Health()
// fmt.Printf("Connected: %v\n", health.Connected)
// fmt.Printf("Last Beat: %d\n", health.LastBeat)
// fmt.Printf("Reconnects: %d\n", health.ReconnectCount)
//
// # Local Degradation
//
// The SDK automatically handles network issues by entering local degradation mode:
// - Generates synthetic beats when the pulse service is unavailable
// - Uses fallback timing to maintain callback schedules
// - Automatically recovers when pulse service returns
// - Provides seamless operation during network partitions
//
// # Security
//
// The SDK implements BACKBEAT security requirements:
// - Ed25519 signing of all status claims when a signing key is provided
// - Required x-window-id and x-hlc headers
// - Agent identification for proper message routing
//
// # Performance
//
// Designed for production use with:
// - Beat callback latency target ≤5ms
// - Timer drift ≤1% over 1 hour without leader
// - Goroutine-safe concurrent operations
// - Bounded memory usage for metrics and errors
//
// # Examples
//
// See the examples subdirectory for complete usage patterns:
// - examples/simple_agent.go: Basic integration
// - examples/task_processor.go: Beat budget usage
// - examples/service_monitor.go: Health monitoring
package sdk

View File

@@ -0,0 +1,520 @@
package examples
import (
"context"
"crypto/ed25519"
"crypto/rand"
"fmt"
"testing"
"time"
"github.com/chorus-services/backbeat/pkg/sdk"
)
// testCounter is the sequence number appended to generated agent IDs.
var testCounter int

// generateUniqueAgentID returns prefix followed by "-" and the next value of
// the package-wide counter, so tests get distinct agent IDs and avoid expvar
// conflicts. It is not synchronized.
func generateUniqueAgentID(prefix string) string {
	next := testCounter + 1
	testCounter = next
	return fmt.Sprintf("%s-%d", prefix, next)
}
// testHelper is the subset of the testing API that the shared helpers in this
// file need. It declares only Fatalf, which both *testing.T and *testing.B
// provide, so one helper can serve tests and benchmarks alike.
type testHelper interface {
	// Fatalf reports a formatted failure message.
	Fatalf(format string, args ...interface{})
}
// createTestConfig returns an SDK configuration for tests: the default config
// with a fixed test cluster, a unique agent ID derived from agentIDPrefix, the
// local NATS URL, and a freshly generated Ed25519 signing key. Key generation
// failure is reported through t.Fatalf.
func createTestConfig(t testHelper, agentIDPrefix string) *sdk.Config {
	_, key, keyErr := ed25519.GenerateKey(rand.Reader)
	if keyErr != nil {
		t.Fatalf("Failed to generate signing key: %v", keyErr)
	}

	cfg := sdk.DefaultConfig()
	cfg.ClusterID = "test-cluster"
	cfg.AgentID = generateUniqueAgentID(agentIDPrefix)
	// Assumes NATS is running for tests.
	cfg.NATSUrl = "nats://localhost:4222"
	cfg.SigningKey = key
	return cfg
}
// TestSimpleAgentPattern walks through the simple-agent setup without
// starting the client: callbacks must register cleanly, the client must
// report itself as disconnected, and emitting a status claim must fail.
// Full integration coverage would require running services.
func TestSimpleAgentPattern(t *testing.T) {
	cfg := createTestConfig(t, "test-simple-agent")
	c := sdk.NewClient(cfg)

	// Context for timeout control (used in full integration tests).
	_ = context.Background()

	// Counters incremented by the callbacks; never read here because no beats
	// are delivered to an unstarted client.
	var beats, downbeats int

	if err := c.OnBeat(func(frame sdk.BeatFrame) {
		beats++
		t.Logf("Beat received: %d (downbeat: %v)", frame.BeatIndex, frame.Downbeat)
	}); err != nil {
		t.Fatalf("Failed to register beat callback: %v", err)
	}

	if err := c.OnDownbeat(func(frame sdk.BeatFrame) {
		downbeats++
		t.Logf("Downbeat received: %d", frame.BeatIndex)
	}); err != nil {
		t.Fatalf("Failed to register downbeat callback: %v", err)
	}

	// Reference the counters so the compiler does not flag them as unused.
	_ = beats
	_ = downbeats

	// Not started yet, so the client must not claim a connection.
	if c.Health().Connected {
		t.Error("Client should not be connected before Start()")
	}

	// Emitting a claim on an unstarted client has to be rejected.
	claim := sdk.StatusClaim{
		State:     "planning",
		BeatsLeft: 10,
		Progress:  0.0,
		Notes:     "Test status claim",
	}
	if err := c.EmitStatusClaim(claim); err == nil {
		t.Error("EmitStatusClaim should fail when client not started")
	}
}
// TestBeatBudgetPattern exercises WithBeatBudget on an unstarted client: a
// short task under a small budget must not panic, and non-positive budgets
// must be rejected.
func TestBeatBudgetPattern(t *testing.T) {
	cfg := createTestConfig(t, "test-budget-agent")
	c := sdk.NewClient(cfg)

	// noop is the trivial task used for the invalid-budget checks.
	noop := func() error { return nil }

	// A quick task under a two-beat budget. It may fail because no beat timing
	// is available, which is only logged.
	quickTask := func() error {
		time.Sleep(100 * time.Millisecond)
		return nil
	}
	if err := c.WithBeatBudget(2, quickTask); err != nil {
		t.Logf("Beat budget failed as expected (no timing): %v", err)
	}

	// Zero and negative budgets are invalid.
	if err := c.WithBeatBudget(0, noop); err == nil {
		t.Error("WithBeatBudget should fail with zero budget")
	}
	if err := c.WithBeatBudget(-1, noop); err == nil {
		t.Error("WithBeatBudget should fail with negative budget")
	}
}
// TestClientConfiguration builds a client from a minimal hand-written config
// and checks the state it reports before any beat has been received.
func TestClientConfiguration(t *testing.T) {
	minimal := &sdk.Config{
		ClusterID: "test",
		AgentID:   "test-agent",
		NATSUrl:   "nats://localhost:4222",
	}

	c := sdk.NewClient(minimal)
	if c == nil {
		t.Fatal("NewClient should not return nil")
	}

	// No Start() yet, so no connection.
	if c.Health().Connected {
		t.Error("New client should not be connected")
	}

	// With no beat data the accessors should report zero values.
	if got := c.GetCurrentBeat(); got != 0 {
		t.Errorf("GetCurrentBeat should return 0 initially, got %d", got)
	}
	if got := c.GetCurrentWindow(); got != "" {
		t.Errorf("GetCurrentWindow should return empty string initially, got %s", got)
	}

	// No current window means no window can match.
	if inWindow := c.IsInWindow("any-window"); inWindow {
		t.Error("IsInWindow should return false with no current window")
	}
}
// TestStatusClaimValidation submits valid and invalid status claims to an
// unstarted client and checks that every claim expected to be rejected
// produces an error. Because the client is not connected, the valid claim
// errors too; that error is only logged.
func TestStatusClaimValidation(t *testing.T) {
	cfg := createTestConfig(t, "test-validation")
	c := sdk.NewClient(cfg)

	// newClaim fills in the shared note so each case spells out only the
	// fields that vary.
	newClaim := func(state string, beatsLeft int, progress float64) sdk.StatusClaim {
		return sdk.StatusClaim{
			State:     state,
			BeatsLeft: beatsLeft,
			Progress:  progress,
			Notes:     "Test note",
		}
	}

	cases := []struct {
		name    string
		claim   sdk.StatusClaim
		wantErr bool
	}{
		// Will still error due to no connection, but validation should pass.
		{name: "valid claim", claim: newClaim("executing", 5, 0.5), wantErr: false},
		{name: "invalid state", claim: newClaim("invalid", 5, 0.5), wantErr: true},
		{name: "negative progress", claim: newClaim("executing", 5, -0.1), wantErr: true},
		{name: "progress too high", claim: newClaim("executing", 5, 1.1), wantErr: true},
		{name: "negative beats left", claim: newClaim("executing", -1, 0.5), wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := c.EmitStatusClaim(tc.claim)
			if err == nil {
				if tc.wantErr {
					t.Error("Expected error but got none")
				}
				return
			}
			// All claims error without a connection; the test targets validation.
			t.Logf("Error (expected): %v", err)
		})
	}
}
// BenchmarkStatusClaimEmission calls EmitStatusClaim in parallel on an
// unstarted client. Each call fails for lack of a connection, so the figure
// reflects the work done before that failure.
func BenchmarkStatusClaimEmission(b *testing.B) {
	cfg := createTestConfig(b, "benchmark-agent")
	c := sdk.NewClient(cfg)

	sample := sdk.StatusClaim{
		State:     "executing",
		BeatsLeft: 10,
		Progress:  0.75,
		Notes:     "Benchmark test claim",
	}

	// emitLoop is the per-goroutine body; the returned error is expected and
	// intentionally discarded.
	emitLoop := func(pb *testing.PB) {
		for pb.Next() {
			c.EmitStatusClaim(sample)
		}
	}

	b.ResetTimer()
	b.RunParallel(emitLoop)
}
// BenchmarkBeatCallbacks registers a trivial beat callback and then runs a
// parallel loop over a mock beat frame. The client is never started, so the
// callback is not actually invoked; the loop only touches the frame value.
func BenchmarkBeatCallbacks(b *testing.B) {
	cfg := createTestConfig(b, "callback-benchmark")
	c := sdk.NewClient(cfg)

	// Minimal callback: read one field and discard it.
	c.OnBeat(func(frame sdk.BeatFrame) {
		_ = frame.BeatIndex
	})

	mockFrame := sdk.BeatFrame{
		Type:      "backbeat.beatframe.v1",
		ClusterID: "test",
		BeatIndex: 1,
		Downbeat:  false,
		Phase:     "test",
		HLC:       "123-0",
		WindowID:  "test-window",
		TempoBPM:  2, // 30-second beats - much more reasonable for testing
	}

	// Stand-in for callback execution; see the function comment.
	touchFrame := func(pb *testing.PB) {
		for pb.Next() {
			_ = mockFrame
		}
	}

	b.ResetTimer()
	b.RunParallel(touchFrame)
}
// TestDetermineState checks the state that determineState (simple_agent.go)
// derives from a total/completed task pair.
func TestDetermineState(t *testing.T) {
	cases := []struct {
		total     int64
		completed int64
		want      string
	}{
		{total: 0, completed: 0, want: "waiting"},
		{total: 5, completed: 5, want: "done"},
		{total: 5, completed: 3, want: "executing"},
		{total: 5, completed: 0, want: "planning"},
		{total: 10, completed: 8, want: "executing"},
		{total: 1, completed: 1, want: "done"},
	}

	for _, tc := range cases {
		got := determineState(tc.total, tc.completed)
		if got == tc.want {
			continue
		}
		t.Errorf("determineState(%d, %d) = %s; expected %s",
			tc.total, tc.completed, got, tc.want)
	}
}
// TestCalculateBeatsLeft checks the remaining-beats estimate produced by
// calculateBeatsLeft (simple_agent.go), which budgets five beats per
// outstanding task.
func TestCalculateBeatsLeft(t *testing.T) {
	cases := []struct {
		total     int64
		completed int64
		want      int
	}{
		{total: 0, completed: 0, want: 0},
		{total: 5, completed: 5, want: 0},
		{total: 5, completed: 3, want: 10}, // (5-3) * 5 = 10
		{total: 10, completed: 0, want: 50}, // 10 * 5 = 50
		{total: 1, completed: 0, want: 5},  // 1 * 5 = 5
	}

	for _, tc := range cases {
		got := calculateBeatsLeft(tc.total, tc.completed)
		if got == tc.want {
			continue
		}
		t.Errorf("calculateBeatsLeft(%d, %d) = %d; expected %d",
			tc.total, tc.completed, got, tc.want)
	}
}
// TestTaskStructure builds a Task (task_processor.go) and checks that each
// field holds the populated value.
func TestTaskStructure(t *testing.T) {
	sample := &Task{
		ID:          "test-task-123",
		Description: "Test processing task",
		BeatBudget:  8,
		WorkTime:    3 * time.Second,
		Created:     time.Now(),
	}

	// Each entry pairs a "field is missing/invalid" condition with its message.
	checks := []struct {
		failed  bool
		message string
	}{
		{sample.ID == "", "Expected task ID to be set"},
		{sample.Description == "", "Expected task description to be set"},
		{sample.BeatBudget <= 0, "Expected positive beat budget"},
		{sample.WorkTime <= 0, "Expected positive work time"},
		{sample.Created.IsZero(), "Expected creation time to be set"},
	}
	for _, check := range checks {
		if check.failed {
			t.Error(check.message)
		}
	}
}
// TestServiceHealthStructure builds a ServiceHealth (service_monitor.go) and
// checks its name, status vocabulary, and non-negative numeric fields.
func TestServiceHealthStructure(t *testing.T) {
	sample := &ServiceHealth{
		ServiceName:  "test-service",
		Status:       "healthy",
		LastCheck:    time.Now(),
		ResponseTime: 150 * time.Millisecond,
		ErrorCount:   0,
		Uptime:       5 * time.Minute,
	}

	if sample.ServiceName == "" {
		t.Error("Expected service name to be set")
	}

	// Status must be one of the four recognised values.
	switch sample.Status {
	case "healthy", "degraded", "unhealthy", "unknown":
		// recognised
	default:
		t.Errorf("Expected valid status, got: %s", sample.Status)
	}

	if sample.ResponseTime < 0 {
		t.Error("Expected non-negative response time")
	}
	if sample.ErrorCount < 0 {
		t.Error("Expected non-negative error count")
	}
}
// TestSystemMetricsStructure builds a SystemMetrics (service_monitor.go) and
// checks that percentages fall within 0-100 and counts are non-negative.
func TestSystemMetricsStructure(t *testing.T) {
	sample := &SystemMetrics{
		CPUPercent:     25.5,
		MemoryPercent:  67.8,
		GoroutineCount: 42,
		HeapSizeMB:     128.5,
	}

	// outOfRange reports whether a percentage lies outside [0, 100].
	outOfRange := func(pct float64) bool {
		return pct < 0 || pct > 100
	}

	if outOfRange(sample.CPUPercent) {
		t.Error("Expected CPU percentage between 0 and 100")
	}
	if outOfRange(sample.MemoryPercent) {
		t.Error("Expected memory percentage between 0 and 100")
	}
	if sample.GoroutineCount < 0 {
		t.Error("Expected non-negative goroutine count")
	}
	if sample.HeapSizeMB < 0 {
		t.Error("Expected non-negative heap size")
	}
}
// TestHealthScoreCalculation checks the weighted score that
// calculateHealthScore (service_monitor.go) derives from per-status counts.
func TestHealthScoreCalculation(t *testing.T) {
	cases := []struct {
		summary map[string]int
		want    float64
	}{
		{summary: map[string]int{"healthy": 0, "degraded": 0, "unhealthy": 0, "unknown": 0}, want: 0.0},
		{summary: map[string]int{"healthy": 4, "degraded": 0, "unhealthy": 0, "unknown": 0}, want: 1.0},
		{summary: map[string]int{"healthy": 0, "degraded": 0, "unhealthy": 4, "unknown": 0}, want: 0.0},
		{summary: map[string]int{"healthy": 2, "degraded": 2, "unhealthy": 0, "unknown": 0}, want: 0.75},
		{summary: map[string]int{"healthy": 1, "degraded": 1, "unhealthy": 1, "unknown": 1}, want: 0.4375},
	}

	for idx, tc := range cases {
		got := calculateHealthScore(tc.summary)
		if got == tc.want {
			continue
		}
		t.Errorf("Test %d: calculateHealthScore(%v) = %.4f; expected %.4f",
			idx, tc.summary, got, tc.want)
	}
}
// TestDetermineOverallState checks the overall state that
// determineOverallState (service_monitor.go) picks for a set of per-status
// counts.
func TestDetermineOverallState(t *testing.T) {
	cases := []struct {
		summary map[string]int
		want    string
	}{
		{summary: map[string]int{"healthy": 3, "degraded": 0, "unhealthy": 0, "unknown": 0}, want: "done"},
		{summary: map[string]int{"healthy": 2, "degraded": 1, "unhealthy": 0, "unknown": 0}, want: "executing"},
		{summary: map[string]int{"healthy": 1, "degraded": 1, "unhealthy": 1, "unknown": 0}, want: "failed"},
		{summary: map[string]int{"healthy": 0, "degraded": 0, "unhealthy": 0, "unknown": 3}, want: "waiting"},
		{summary: map[string]int{"healthy": 0, "degraded": 0, "unhealthy": 1, "unknown": 0}, want: "failed"},
	}

	for idx, tc := range cases {
		got := determineOverallState(tc.summary)
		if got == tc.want {
			continue
		}
		t.Errorf("Test %d: determineOverallState(%v) = %s; expected %s",
			idx, tc.summary, got, tc.want)
	}
}
// TestFormatHealthSummary checks the compact "H/D/U/?" rendering produced by
// formatHealthSummary (service_monitor.go).
func TestFormatHealthSummary(t *testing.T) {
	const want = "H:3 D:2 U:1 ?:0"

	got := formatHealthSummary(map[string]int{
		"healthy":   3,
		"degraded":  2,
		"unhealthy": 1,
		"unknown":   0,
	})
	if got != want {
		t.Errorf("formatHealthSummary() = %s; expected %s", got, want)
	}
}
// TestCollectSystemMetrics sanity-checks the live values returned by
// collectSystemMetrics (service_monitor.go). CPU and memory percentages are
// simplified in the example implementation, so only their sign is checked.
func TestCollectSystemMetrics(t *testing.T) {
	snapshot := collectSystemMetrics()

	// Each entry pairs a failure condition with the message to report.
	checks := []struct {
		failed  bool
		message string
	}{
		{snapshot.GoroutineCount <= 0, "Expected positive goroutine count"},
		{snapshot.HeapSizeMB < 0, "Expected non-negative heap size"},
		{snapshot.CPUPercent < 0, "Expected non-negative CPU percentage"},
		{snapshot.MemoryPercent < 0, "Expected non-negative memory percentage"},
	}
	for _, check := range checks {
		if check.failed {
			t.Error(check.message)
		}
	}
}

View File

@@ -0,0 +1,326 @@
package examples
import (
"context"
"crypto/ed25519"
"crypto/rand"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
"github.com/chorus-services/backbeat/pkg/sdk"
)
// ServiceHealth represents the health status of a monitored service.
// Entries are created with Status "unknown" and updated in place by
// performHealthChecks.
type ServiceHealth struct {
	// ServiceName is the key under which the service is monitored.
	ServiceName string `json:"service_name"`
	// Status is the latest health classification.
	Status string `json:"status"` // healthy, degraded, unhealthy, or unknown (before the first check)
	// LastCheck is when the most recent check finished; zero until then.
	LastCheck time.Time `json:"last_check"`
	// ResponseTime is how long the most recent check request took.
	ResponseTime time.Duration `json:"response_time"`
	// ErrorCount counts checks that failed or returned an unhealthy status.
	ErrorCount int `json:"error_count"`
	// Uptime is not set by the monitor code in this file.
	Uptime time.Duration `json:"uptime"`
}
// SystemMetrics represents system-level metrics for the monitor process, as
// gathered by collectSystemMetrics.
type SystemMetrics struct {
	// CPUPercent is a placeholder; collectSystemMetrics always reports 0.
	CPUPercent float64 `json:"cpu_percent"`
	// MemoryPercent is a rough figure derived from runtime memory stats.
	MemoryPercent float64 `json:"memory_percent"`
	// GoroutineCount is the number of goroutines at collection time.
	GoroutineCount int `json:"goroutine_count"`
	// HeapSizeMB is the heap memory obtained from the OS, in MiB.
	HeapSizeMB float64 `json:"heap_size_mb"`
}
// ServiceMonitor demonstrates health monitoring with beat-aligned reporting.
// This example shows how to integrate BACKBEAT with service monitoring.
//
// It builds a BACKBEAT client, polls a fixed set of HTTP health endpoints
// every fourth beat, emits a status claim every second beat and a "review"
// claim on each downbeat, and serves the collected data as JSON on
// :9090/metrics. It blocks until SIGINT or SIGTERM is received, and returns
// early (after logging) if key generation or client start-up fails.
func ServiceMonitor() {
	// Generate a signing key for this example
	_, signingKey, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		slog.Error("Failed to generate signing key", "error", err)
		return
	}

	// Create SDK configuration
	config := sdk.DefaultConfig()
	config.ClusterID = "chorus-dev"
	config.AgentID = "service-monitor"
	config.NATSUrl = "nats://localhost:4222"
	config.SigningKey = signingKey
	config.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	}))

	// Create BACKBEAT client
	client := sdk.NewClient(config)

	// Services to monitor (example endpoints)
	monitoredServices := map[string]string{
		"pulse-service":  "http://localhost:8080/health",
		"reverb-service": "http://localhost:8081/health",
		"nats-server":    "http://localhost:8222/varz", // NATS monitoring endpoint
	}

	// Health tracking. healthMutex guards healthStatus and the ServiceHealth
	// values it points to; the check goroutines write them concurrently.
	var (
		healthStatus = make(map[string]*ServiceHealth)
		healthMutex  sync.RWMutex
		startTime    = time.Now()
	)

	// Initialize health status
	for serviceName := range monitoredServices {
		healthStatus[serviceName] = &ServiceHealth{
			ServiceName: serviceName,
			Status:      "unknown",
			LastCheck:   time.Time{},
		}
	}

	// Register beat callback for frequent health checks
	client.OnBeat(func(beat sdk.BeatFrame) {
		// Perform health checks every 4 beats (reduce frequency)
		if beat.BeatIndex%4 == 0 {
			performHealthChecks(monitoredServices, healthStatus, &healthMutex)
		}

		// Emit status claim with current health summary
		if beat.BeatIndex%2 == 0 {
			healthSummary := generateHealthSummary(healthStatus, &healthMutex)
			systemMetrics := collectSystemMetrics()
			state := determineOverallState(healthSummary)
			notes := fmt.Sprintf("Services: %s | CPU: %.1f%% | Mem: %.1f%% | Goroutines: %d",
				formatHealthSummary(healthSummary),
				systemMetrics.CPUPercent,
				systemMetrics.MemoryPercent,
				systemMetrics.GoroutineCount)

			err := client.EmitStatusClaim(sdk.StatusClaim{
				State:     state,
				BeatsLeft: 0, // Monitoring is continuous
				Progress:  calculateHealthScore(healthSummary),
				Notes:     notes,
			})
			if err != nil {
				slog.Error("Failed to emit status claim", "error", err)
			}
		}
	})

	// Register downbeat callback for detailed reporting
	client.OnDownbeat(func(beat sdk.BeatFrame) {
		// Marshal under the read lock so the snapshot is consistent. The error
		// is ignored because the output only feeds a debug log line.
		healthMutex.RLock()
		healthData, _ := json.MarshalIndent(healthStatus, "", " ")
		healthMutex.RUnlock()

		systemMetrics := collectSystemMetrics()
		uptime := time.Since(startTime)
		slog.Info("Service health report",
			"beat_index", beat.BeatIndex,
			"window_id", beat.WindowID,
			"uptime", uptime.String(),
			"cpu_percent", systemMetrics.CPUPercent,
			"memory_percent", systemMetrics.MemoryPercent,
			"heap_mb", systemMetrics.HeapSizeMB,
			"goroutines", systemMetrics.GoroutineCount,
		)

		// Log health details
		slog.Debug("Detailed health status", "health_data", string(healthData))

		// Emit comprehensive status for the bar
		healthSummary := generateHealthSummary(healthStatus, &healthMutex)
		err := client.EmitStatusClaim(sdk.StatusClaim{
			State:     "review", // Downbeat is review time
			BeatsLeft: 0,
			Progress:  calculateHealthScore(healthSummary),
			Notes:     fmt.Sprintf("Bar %d health review: %s", beat.BeatIndex/4, formatDetailedHealth(healthSummary, systemMetrics)),
		})
		if err != nil {
			slog.Error("Failed to emit downbeat status", "error", err)
		}
	})

	// Setup graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle shutdown signals
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigChan
		slog.Info("Shutdown signal received")
		cancel()
	}()

	// Start the client
	if err := client.Start(ctx); err != nil {
		slog.Error("Failed to start BACKBEAT client", "error", err)
		return
	}
	defer client.Stop()

	slog.Info("Service monitor started - use Ctrl+C to stop",
		"monitored_services", len(monitoredServices))

	// Expose metrics endpoint
	go func() {
		http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
			// Serialize while the read lock is held. Encoding walks the
			// ServiceHealth values behind the map, which the health-check
			// goroutines mutate under the write lock, so encoding after the
			// unlock would be a data race.
			healthMutex.RLock()
			payload, err := json.Marshal(map[string]any{
				"health":   healthStatus,
				"system":   collectSystemMetrics(),
				"backbeat": client.Health(),
			})
			healthMutex.RUnlock()
			if err != nil {
				slog.Error("Failed to encode metrics", "error", err)
				http.Error(w, "failed to encode metrics", http.StatusInternalServerError)
				return
			}

			w.Header().Set("Content-Type", "application/json")
			// The trailing newline matches json.Encoder output.
			if _, err := w.Write(append(payload, '\n')); err != nil {
				slog.Error("Failed to write metrics response", "error", err)
			}
		})

		slog.Info("Metrics endpoint available", "url", "http://localhost:9090/metrics")
		if err := http.ListenAndServe(":9090", nil); err != nil {
			slog.Error("Metrics server failed", "error", err)
		}
	}()

	// Wait for shutdown
	<-ctx.Done()
	slog.Info("Service monitor shutting down")
}
// performHealthChecks checks the health of all monitored services.
//
// services maps a service name to its health endpoint URL. One goroutine is
// started per service; each issues a GET with a 5-second timeout and then
// records the outcome in healthStatus[name] while holding mutex for writing.
// The function returns immediately without waiting for the checks.
//
// Classification:
//   - request error, or a status outside 200-499: "unhealthy", ErrorCount++
//   - 300-499: "degraded"
//   - 200-299: "healthy", downgraded to "degraded" if it took over 2 seconds
//
// A slow response never upgrades an "unhealthy" result to "degraded".
func performHealthChecks(services map[string]string, healthStatus map[string]*ServiceHealth, mutex *sync.RWMutex) {
	// One client for the whole sweep; http.Client is safe for concurrent use.
	httpClient := &http.Client{Timeout: 5 * time.Second}

	for serviceName, endpoint := range services {
		go func(name, url string) {
			start := time.Now()
			resp, err := httpClient.Get(url)
			responseTime := time.Since(start)

			// Classify the result before taking the lock so the critical
			// section stays short.
			status := "unhealthy"
			failed := true
			statusCode := 0
			if err == nil {
				statusCode = resp.StatusCode
				resp.Body.Close()

				switch {
				case statusCode >= 200 && statusCode < 300:
					status, failed = "healthy", false
				case statusCode >= 300 && statusCode < 500:
					status, failed = "degraded", false
				}
				// Slow response: only a healthy result is downgraded.
				if status == "healthy" && responseTime > 2*time.Second {
					status = "degraded"
				}
			}

			mutex.Lock()
			health := healthStatus[name]
			if health == nil {
				// Tolerate a service that was never registered in the map.
				health = &ServiceHealth{ServiceName: name}
				healthStatus[name] = health
			}
			health.LastCheck = time.Now()
			health.ResponseTime = responseTime
			health.Status = status
			if failed {
				health.ErrorCount++
			}
			mutex.Unlock()

			// Log outside the lock using the locally computed values.
			if err != nil {
				slog.Warn("Health check failed",
					"service", name,
					"endpoint", url,
					"error", err,
					"response_time", responseTime)
				return
			}
			slog.Debug("Health check completed",
				"service", name,
				"status", status,
				"response_time", responseTime,
				"status_code", statusCode)
		}(serviceName, endpoint)
	}
}
// generateHealthSummary counts the tracked services by status. The result
// always contains the keys "healthy", "degraded", "unhealthy" and "unknown";
// any other status value found is counted under its own key. The map is read
// while holding mutex for reading.
func generateHealthSummary(healthStatus map[string]*ServiceHealth, mutex *sync.RWMutex) map[string]int {
	mutex.RLock()
	defer mutex.RUnlock()

	counts := make(map[string]int, 4)
	for _, status := range [...]string{"healthy", "degraded", "unhealthy", "unknown"} {
		counts[status] = 0
	}
	for _, svc := range healthStatus {
		counts[svc.Status] += 1
	}
	return counts
}
// determineOverallState maps per-status service counts to one overall state,
// worst condition first: any unhealthy service gives "failed", otherwise any
// degraded service gives "executing", otherwise any healthy service gives
// "done", and with none of those the state is "waiting".
func determineOverallState(healthSummary map[string]int) string {
	switch {
	case healthSummary["unhealthy"] > 0:
		return "failed"
	case healthSummary["degraded"] > 0:
		// Degraded but still working.
		return "executing"
	case healthSummary["healthy"] > 0:
		return "done"
	default:
		// All unknown.
		return "waiting"
	}
}
// calculateHealthScore returns the weighted average health of the counted
// services, in the range 0.0-1.0. Weights are healthy=1.0, degraded=0.5,
// unknown=0.25 and unhealthy=0.0. An empty summary scores 0.0.
func calculateHealthScore(healthSummary map[string]int) float64 {
	healthy := healthSummary["healthy"]
	degraded := healthSummary["degraded"]
	unhealthy := healthSummary["unhealthy"]
	unknown := healthSummary["unknown"]

	serviceCount := healthy + degraded + unhealthy + unknown
	if serviceCount == 0 {
		return 0.0
	}

	// Unhealthy services carry weight 0 and so only affect the divisor.
	weighted := float64(healthy)*1.0 +
		float64(degraded)*0.5 +
		float64(unknown)*0.25
	return weighted / float64(serviceCount)
}
// formatHealthSummary renders the per-status counts as a compact string of
// the form "H:<healthy> D:<degraded> U:<unhealthy> ?:<unknown>".
func formatHealthSummary(healthSummary map[string]int) string {
	var (
		healthy   = healthSummary["healthy"]
		degraded  = healthSummary["degraded"]
		unhealthy = healthSummary["unhealthy"]
		unknown   = healthSummary["unknown"]
	)
	return fmt.Sprintf("H:%d D:%d U:%d ?:%d", healthy, degraded, unhealthy, unknown)
}
// formatDetailedHealth combines the compact service summary with the CPU,
// memory and heap figures from systemMetrics into one human-readable line.
func formatDetailedHealth(healthSummary map[string]int, systemMetrics SystemMetrics) string {
	services := formatHealthSummary(healthSummary)
	cpu, mem, heap := systemMetrics.CPUPercent, systemMetrics.MemoryPercent, systemMetrics.HeapSizeMB
	return fmt.Sprintf("Health: %s, CPU: %.1f%%, Mem: %.1f%%, Heap: %.1fMB", services, cpu, mem, heap)
}
// collectSystemMetrics gathers basic process metrics from the Go runtime.
// CPUPercent is always 0 (real CPU figures would need an external package
// such as gopsutil). MemoryPercent is a rough approximation: the memory
// obtained from the OS expressed as a percentage of 1 GiB.
func collectSystemMetrics() SystemMetrics {
	const (
		bytesPerMiB = 1024 * 1024
		bytesPerGiB = 1024 * 1024 * 1024
	)

	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)

	var result SystemMetrics
	result.CPUPercent = 0.0
	result.MemoryPercent = float64(stats.Sys) / bytesPerGiB * 100
	result.GoroutineCount = runtime.NumGoroutine()
	result.HeapSizeMB = float64(stats.HeapSys) / bytesPerMiB
	return result
}

View File

@@ -0,0 +1,150 @@
// Package examples demonstrates BACKBEAT SDK usage patterns
package examples
import (
"context"
"crypto/ed25519"
"crypto/rand"
"fmt"
"log/slog"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"
"github.com/chorus-services/backbeat/pkg/sdk"
)
// SimpleAgent demonstrates basic BACKBEAT SDK usage: the minimal integration
// pattern for CHORUS services.
//
// It builds a client, emits a status claim every third beat, adds two tasks
// on every downbeat, and completes one pending task every two seconds. It
// blocks until SIGINT or SIGTERM is received, and returns early (after
// logging) if key generation or client start-up fails.
func SimpleAgent() {
	// Signing key for this example run.
	_, key, keyErr := ed25519.GenerateKey(rand.Reader)
	if keyErr != nil {
		slog.Error("Failed to generate signing key", "error", keyErr)
		return
	}

	// SDK configuration.
	cfg := sdk.DefaultConfig()
	cfg.ClusterID = "chorus-dev"
	cfg.AgentID = "simple-agent"
	cfg.NATSUrl = "nats://localhost:4222" // Adjust for your setup
	cfg.SigningKey = key
	cfg.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	}))

	agent := sdk.NewClient(cfg)

	// Shared counters, accessed atomically from the callbacks and the work loop.
	var (
		totalTasks int64
		doneTasks  int64
	)

	// Every beat: report status on each third beat.
	agent.OnBeat(func(frame sdk.BeatFrame) {
		total := atomic.LoadInt64(&totalTasks)
		done := atomic.LoadInt64(&doneTasks)

		if frame.BeatIndex%3 != 0 {
			return
		}

		var progress float64
		if total > 0 {
			progress = float64(done) / float64(total)
		}

		claim := sdk.StatusClaim{
			State:     determineState(total, done),
			BeatsLeft: calculateBeatsLeft(total, done),
			Progress:  progress,
			Notes:     fmt.Sprintf("Processing tasks: %d/%d", done, total),
		}
		if emitErr := agent.EmitStatusClaim(claim); emitErr != nil {
			slog.Error("Failed to emit status claim", "error", emitErr)
		}
	})

	// Every downbeat (start of a bar): log it and queue two new tasks.
	agent.OnDownbeat(func(frame sdk.BeatFrame) {
		slog.Info("Bar started",
			"beat_index", frame.BeatIndex,
			"window_id", frame.WindowID,
			"phase", frame.Phase)
		atomic.AddInt64(&totalTasks, 2)
	})

	// Graceful shutdown: cancel the context on SIGINT/SIGTERM.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-signals
		slog.Info("Shutdown signal received")
		cancel()
	}()

	if startErr := agent.Start(ctx); startErr != nil {
		slog.Error("Failed to start BACKBEAT client", "error", startErr)
		return
	}
	defer agent.Stop()

	slog.Info("Simple agent started - use Ctrl+C to stop")

	// Simulated work: finish one pending task per tick.
	workTick := time.NewTicker(2 * time.Second)
	defer workTick.Stop()

	for {
		select {
		case <-ctx.Done():
			slog.Info("Shutting down simple agent")
			return

		case <-workTick.C:
			total := atomic.LoadInt64(&totalTasks)
			done := atomic.LoadInt64(&doneTasks)
			if done >= total {
				// Nothing pending.
				continue
			}
			atomic.AddInt64(&doneTasks, 1)
			slog.Debug("Completed a task",
				"completed", done+1,
				"total", total)
		}
	}
}
// determineState maps task counters to a status-claim state string.
//
// It returns "waiting" when no tasks exist, "done" when every task is
// complete, "executing" once at least one task has completed, and "planning"
// otherwise.
func determineState(total, completed int64) string {
	switch {
	case total == 0:
		return "waiting"
	case completed == total:
		return "done"
	case completed > 0:
		return "executing"
	default:
		return "planning"
	}
}
// calculateBeatsLeft estimates how many beats are still needed to finish the
// outstanding tasks, budgeting a fixed five beats per unfinished task.
// It returns 0 when there are no tasks or nothing is outstanding.
func calculateBeatsLeft(total, completed int64) int {
	const beatsPerTask = 5

	outstanding := total - completed
	if total == 0 || outstanding <= 0 {
		return 0
	}
	return int(outstanding * beatsPerTask)
}

View File

@@ -0,0 +1,259 @@
package examples
import (
"context"
"crypto/ed25519"
"crypto/rand"
"fmt"
"log/slog"
"math"
mathRand "math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/chorus-services/backbeat/pkg/sdk"
)
// Task is a unit of simulated work handled by the task-processor example.
// Each task carries the beat budget it must finish within and the amount of
// (simulated) work it represents.
type Task struct {
// ID identifies the task; the example derives it from the beat index and a
// per-bar sequence number.
ID string
// Description is a human-readable summary that is echoed in status notes.
Description string
BeatBudget int // Maximum beats allowed for completion
WorkTime time.Duration // Simulated work duration
// Created is the creation time; it is used to log how long the task took
// from creation to completion.
Created time.Time
}
// TaskProcessor demonstrates beat budget usage and timeout management.
//
// It starts a BACKBEAT client, enqueues one to three random tasks on every
// downbeat, and drains the queue with a fixed pool of worker goroutines that
// hand each task to processTaskWithBudget. A status claim summarising the
// queue is emitted every second beat. The function blocks until SIGINT or
// SIGTERM is received, then waits for the workers to finish before the
// deferred client.Stop runs.
func TaskProcessor() {
	// Generate a signing key for this example
	_, signingKey, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		slog.Error("Failed to generate signing key", "error", err)
		return
	}

	// Create SDK configuration
	config := sdk.DefaultConfig()
	config.ClusterID = "chorus-dev"
	config.AgentID = "task-processor"
	config.NATSUrl = "nats://localhost:4222"
	config.SigningKey = signingKey
	config.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelDebug,
	}))

	// Create BACKBEAT client
	client := sdk.NewClient(config)

	// Task management. taskMutex guards activeTasks and both counters.
	var (
		taskQueue      = make(chan *Task, 100)
		activeTasks    = make(map[string]*Task)
		completedTasks = 0
		failedTasks    = 0
		taskMutex      sync.RWMutex
	)

	// Register beat callback for status reporting
	client.OnBeat(func(beat sdk.BeatFrame) {
		// Emit status every 2 beats
		if beat.BeatIndex%2 != 0 {
			return
		}

		// Snapshot the shared bookkeeping under the lock so the callback never
		// reads the counters while they are being modified.
		taskMutex.RLock()
		activeCount := len(activeTasks)
		completed := completedTasks
		failed := failedTasks
		taskMutex.RUnlock()
		queued := len(taskQueue)

		state := "waiting"
		if activeCount > 0 {
			state = "executing"
		}

		// 0/0 yields NaN before any task exists; report that as no progress.
		progress := float64(completed) / float64(completed+failed+activeCount+queued)
		if math.IsNaN(progress) {
			progress = 0.0
		}

		err := client.EmitStatusClaim(sdk.StatusClaim{
			State:     state,
			BeatsLeft: activeCount * 5, // Estimate 5 beats per active task
			Progress:  progress,
			Notes: fmt.Sprintf("Active: %d, Completed: %d, Failed: %d, Queue: %d",
				activeCount, completed, failed, queued),
		})
		if err != nil {
			slog.Error("Failed to emit status claim", "error", err)
		}
	})

	// Register downbeat callback to create new tasks
	client.OnDownbeat(func(beat sdk.BeatFrame) {
		slog.Info("New bar - creating tasks",
			"beat_index", beat.BeatIndex,
			"window_id", beat.WindowID)

		// Create 1-3 new tasks each bar
		numTasks := mathRand.Intn(3) + 1
		for i := 0; i < numTasks; i++ {
			task := &Task{
				ID:          fmt.Sprintf("task-%d-%d", beat.BeatIndex, i),
				Description: fmt.Sprintf("Process data batch %d", i),
				BeatBudget:  mathRand.Intn(8) + 2,                             // 2-9 beat budget
				WorkTime:    time.Duration(mathRand.Intn(3)+1) * time.Second, // 1-3 seconds of work
				Created:     time.Now(),
			}

			// Never block the beat callback: drop the task if the queue is full.
			select {
			case taskQueue <- task:
				slog.Debug("Task created", "task_id", task.ID, "budget", task.BeatBudget)
			default:
				slog.Warn("Task queue full, dropping task", "task_id", task.ID)
			}
		}
	})

	// Setup graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle shutdown signals
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigChan
		slog.Info("Shutdown signal received")
		cancel()
	}()

	// Start the client
	if err := client.Start(ctx); err != nil {
		slog.Error("Failed to start BACKBEAT client", "error", err)
		return
	}
	defer client.Stop()

	slog.Info("Task processor started - use Ctrl+C to stop")

	// Start task workers. The WaitGroup lets shutdown wait for in-flight tasks
	// so no worker is still using the client when the deferred Stop runs.
	const numWorkers = 3
	var workers sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		workers.Add(1)
		go func(workerID int) {
			defer workers.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case task := <-taskQueue:
					processTaskWithBudget(ctx, client, task, workerID, &taskMutex, activeTasks, &completedTasks, &failedTasks)
				}
			}
		}(i)
	}

	// Wait for shutdown
	<-ctx.Done()
	slog.Info("Task processor shutting down")
	workers.Wait()
}
// processTaskWithBudget runs one task inside client.WithBeatBudget and
// reports its progress through status claims.
//
// The task is registered in activeTasks for the duration of the call. The
// simulated work is split into five equal steps; after each step a progress
// claim is emitted, and cancellation of ctx aborts the work with ctx.Err().
// When the budgeted function returns an error, *failedTasks is incremented and
// a "failed" claim is emitted; otherwise *completedTasks is incremented and a
// "done" claim is emitted.
//
// taskMutex guards activeTasks and both counters: several workers call this
// function concurrently with the same map and counter pointers, so every
// mutation happens while holding the lock.
//
// Errors from EmitStatusClaim are ignored on purpose; status reporting is
// best-effort in this example.
func processTaskWithBudget(
	ctx context.Context,
	client sdk.Client,
	task *Task,
	workerID int,
	taskMutex *sync.RWMutex,
	activeTasks map[string]*Task,
	completedTasks *int,
	failedTasks *int,
) {
	// Add task to active tasks
	taskMutex.Lock()
	activeTasks[task.ID] = task
	taskMutex.Unlock()

	// Remove from active tasks when done
	defer func() {
		taskMutex.Lock()
		delete(activeTasks, task.ID)
		taskMutex.Unlock()
	}()

	slog.Info("Processing task",
		"worker", workerID,
		"task_id", task.ID,
		"budget", task.BeatBudget,
		"work_time", task.WorkTime)

	// Use beat budget to execute the task
	err := client.WithBeatBudget(task.BeatBudget, func() error {
		// Emit starting status
		client.EmitStatusClaim(sdk.StatusClaim{
			TaskID:    task.ID,
			State:     "executing",
			BeatsLeft: task.BeatBudget,
			Progress:  0.0,
			Notes:     fmt.Sprintf("Worker %d processing %s", workerID, task.Description),
		})

		// Simulate work with progress updates
		steps := 5
		stepDuration := task.WorkTime / time.Duration(steps)
		for step := 0; step < steps; step++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(stepDuration):
				progress := float64(step+1) / float64(steps)
				client.EmitStatusClaim(sdk.StatusClaim{
					TaskID:    task.ID,
					State:     "executing",
					BeatsLeft: int(float64(task.BeatBudget) * (1.0 - progress)),
					Progress:  progress,
					Notes:     fmt.Sprintf("Worker %d step %d/%d", workerID, step+1, steps),
				})
			}
		}
		return nil
	})

	// Handle completion or timeout
	if err != nil {
		slog.Warn("Task failed or timed out",
			"worker", workerID,
			"task_id", task.ID,
			"error", err)

		// The counter is shared between workers; update it under the lock.
		taskMutex.Lock()
		*failedTasks++
		taskMutex.Unlock()

		// Emit failure status
		client.EmitStatusClaim(sdk.StatusClaim{
			TaskID:    task.ID,
			State:     "failed",
			BeatsLeft: 0,
			Progress:  0.0,
			Notes:     fmt.Sprintf("Worker %d failed: %s", workerID, err.Error()),
		})
		return
	}

	slog.Info("Task completed successfully",
		"worker", workerID,
		"task_id", task.ID,
		"duration", time.Since(task.Created))

	// The counter is shared between workers; update it under the lock.
	taskMutex.Lock()
	*completedTasks++
	taskMutex.Unlock()

	// Emit completion status
	client.EmitStatusClaim(sdk.StatusClaim{
		TaskID:    task.ID,
		State:     "done",
		BeatsLeft: 0,
		Progress:  1.0,
		Notes:     fmt.Sprintf("Worker %d completed %s", workerID, task.Description),
	})
}

446
pkg/sdk/internal.go Normal file
View File

@@ -0,0 +1,446 @@
package sdk
import (
"crypto/ed25519"
"crypto/sha256"
"encoding/json"
"fmt"
"time"
"github.com/nats-io/nats.go"
)
// connect dials NATS at c.config.NATSUrl and stores the connection in c.nc.
//
// Reconnection is delegated to the NATS client, configured with the reconnect
// delay and attempt limit from c.config. Connection lifecycle events are
// mirrored into c.metrics and the configured logger. A failed dial is recorded
// as a metrics error and returned wrapped.
func (c *client) connect() error {
	onReconnect := func(conn *nats.Conn) {
		c.reconnectCount++
		c.metrics.RecordConnection()
		c.config.Logger.Info("NATS reconnected",
			"reconnect_count", c.reconnectCount,
			"url", conn.ConnectedUrl())
	}

	onDisconnect := func(conn *nats.Conn, cause error) {
		// A disconnect without an error is not recorded.
		if cause == nil {
			return
		}
		c.metrics.RecordDisconnection()
		c.addError(fmt.Sprintf("NATS disconnected: %v", cause))
		c.config.Logger.Warn("NATS disconnected", "error", cause)
	}

	onClosed := func(conn *nats.Conn) {
		c.metrics.RecordDisconnection()
		c.config.Logger.Info("NATS connection closed")
	}

	conn, err := nats.Connect(
		c.config.NATSUrl,
		nats.ReconnectWait(c.config.ReconnectDelay),
		nats.MaxReconnects(c.config.MaxReconnects),
		nats.ReconnectHandler(onReconnect),
		nats.DisconnectErrHandler(onDisconnect),
		nats.ClosedHandler(onClosed),
	)
	if err != nil {
		c.metrics.RecordError(fmt.Sprintf("NATS connection failed: %v", err))
		return fmt.Errorf("failed to connect to NATS: %w", err)
	}

	c.nc = conn
	c.metrics.RecordConnection()
	c.config.Logger.Info("Connected to NATS", "url", conn.ConnectedUrl())
	return nil
}
// beatSubscriptionLoop subscribes to the cluster's beat subject and supervises
// the beat stream until c.ctx is cancelled. It calls c.wg.Done when it returns.
//
// Incoming frames are delivered to handleBeatFrame by the subscription. In
// parallel, a one-second ticker checks how long ago the last beat was seen:
// once the silence exceeds one beat duration plus a grace period, a synthetic
// beat is generated via handleLocalDegradationBeat and the silence is recorded
// in the metrics. Nothing is synthesised before the first beat has arrived.
func (c *client) beatSubscriptionLoop() {
	defer c.wg.Done()

	subject := fmt.Sprintf("backbeat.beat.%s", c.config.ClusterID)

	subscription, err := c.nc.Subscribe(subject, c.handleBeatFrame)
	if err != nil {
		c.addError(fmt.Sprintf("failed to subscribe to beats: %v", err))
		c.config.Logger.Error("Failed to subscribe to beats", "error", err)
		return
	}
	defer subscription.Unsubscribe()

	c.config.Logger.Info("Beat subscription active", "subject", subject)

	// Watchdog tick used to detect a silent beat stream.
	watchdog := time.NewTicker(1 * time.Second)
	defer watchdog.Stop()

	for {
		select {
		case <-c.ctx.Done():
			return
		case <-watchdog.C:
		}

		c.beatMutex.RLock()
		lastSeen, bpm := c.lastBeatTime, c.currentTempo
		c.beatMutex.RUnlock()

		// No beat has been observed yet, so there is nothing to extrapolate.
		if lastSeen.IsZero() {
			continue
		}

		silence := time.Since(lastSeen)

		if bpm <= 0 {
			bpm = 60 // Fall back to 60 BPM when no tempo is known
		}

		// Expected spacing between beats, never shorter than the watchdog tick.
		beatLength := time.Duration(float64(time.Minute) / float64(bpm))
		if beatLength < time.Second {
			beatLength = time.Second
		}

		// Tolerate half a beat of lateness, but at least two seconds.
		grace := beatLength / 2
		if grace < 2*time.Second {
			grace = 2 * time.Second
		}

		switch {
		case silence > beatLength+grace:
			// Extended silence: run (or keep running) on synthetic beats.
			c.localDegradation = true
			c.handleLocalDegradationBeat()
			c.metrics.RecordLocalDegradation(silence)
		case c.localDegradation && silence <= beatLength:
			// Beats are arriving within the expected window again.
			c.localDegradation = false
		}
	}
}
// handleBeatFrame decodes one beat frame received from NATS, updates the
// client's beat state and dispatches the registered callbacks.
//
// Frames that fail to decode or whose Type is not "backbeat.beatframe.v1" are
// recorded via addError and dropped. For valid frames the current beat,
// window, HLC and tempo are stored, a tempo sample is appended to the history
// (bounded to the last 100 samples), jitter is measured, and every beat
// callback, plus every downbeat callback when the frame is a downbeat, is run
// in its own goroutine through safeExecuteCallback.
//
// The expected arrival time is the frame's deadline minus one beat duration.
// It is computed once, after the frame's tempo has been stored, so that the
// jitter log and the jitter metric are based on the same value and a tempo
// change is not measured against the previous tempo.
func (c *client) handleBeatFrame(msg *nats.Msg) {
	var beatFrame BeatFrame
	if err := json.Unmarshal(msg.Data, &beatFrame); err != nil {
		c.addError(fmt.Sprintf("failed to unmarshal beat frame: %v", err))
		return
	}

	// Validate beat frame
	if beatFrame.Type != "backbeat.beatframe.v1" {
		c.addError(fmt.Sprintf("invalid beat frame type: %s", beatFrame.Type))
		return
	}

	now := time.Now()

	// Update internal state
	c.beatMutex.Lock()
	c.currentBeat = beatFrame.BeatIndex
	c.currentWindow = beatFrame.WindowID
	c.currentHLC = beatFrame.HLC

	// Track tempo changes
	if c.currentTempo != beatFrame.TempoBPM {
		c.lastTempo = c.currentTempo
		c.currentTempo = beatFrame.TempoBPM
	}

	// Calculate actual BPM from inter-beat timing
	actualBPM := 60.0 // Default when there is no previous beat to compare with
	if !c.lastBeatTime.IsZero() {
		if interBeatDuration := now.Sub(c.lastBeatTime); interBeatDuration > 0 {
			actualBPM = 60.0 / interBeatDuration.Seconds()
		}
	}

	// Record tempo sample for drift analysis
	c.tempoHistory = append(c.tempoHistory, tempoSample{
		BeatIndex:    beatFrame.BeatIndex,
		Tempo:        beatFrame.TempoBPM,
		MeasuredTime: now,
		ActualBPM:    actualBPM,
	})

	// Keep only last 100 samples
	if len(c.tempoHistory) > 100 {
		c.tempoHistory = c.tempoHistory[1:]
	}

	c.lastBeatTime = now
	c.beatMutex.Unlock()

	// A beat should arrive one beat duration before its deadline.
	expectedTime := beatFrame.DeadlineAt.Add(-c.getBeatDuration())
	if jitter := now.Sub(expectedTime); jitter.Abs() > c.config.JitterTolerance {
		c.config.Logger.Debug("Beat jitter detected",
			"jitter", jitter,
			"tolerance", c.config.JitterTolerance,
			"beat_index", beatFrame.BeatIndex)
	}

	// Record beat metrics
	c.metrics.RecordBeat(expectedTime, now, beatFrame.Downbeat)

	// A real beat ends local degradation; reset silently.
	// NOTE(review): localDegradation is also read and written by
	// beatSubscriptionLoop without a lock - confirm whether the two can run on
	// different goroutines.
	if c.localDegradation {
		c.localDegradation = false
	}

	// Copy the callback lists under the read lock so callbacks run without it.
	c.callbackMutex.RLock()
	beatCallbacks := make([]func(BeatFrame), len(c.beatCallbacks))
	copy(beatCallbacks, c.beatCallbacks)

	var downbeatCallbacks []func(BeatFrame)
	if beatFrame.Downbeat {
		downbeatCallbacks = make([]func(BeatFrame), len(c.downbeatCallbacks))
		copy(downbeatCallbacks, c.downbeatCallbacks)
	}
	c.callbackMutex.RUnlock()

	// Execute callbacks in separate goroutines to prevent blocking
	for _, callback := range beatCallbacks {
		go c.safeExecuteCallback(callback, beatFrame, "beat")
	}
	for _, callback := range downbeatCallbacks {
		go c.safeExecuteCallback(callback, beatFrame, "downbeat")
	}

	c.config.Logger.Debug("Beat processed",
		"beat_index", beatFrame.BeatIndex,
		"downbeat", beatFrame.Downbeat,
		"phase", beatFrame.Phase,
		"window_id", beatFrame.WindowID)
}
// handleLocalDegradationBeat advances the beat counter by one and fabricates
// a beat frame for it, so registered callbacks keep firing while no real
// beats are being received.
//
// The synthetic frame is marked with Phase "degraded", carries a locally
// generated HLC and window ID, a deadline one second in the future, and is
// flagged as a downbeat on every fourth beat (4/4 is assumed). The client's
// current window, HLC and last-beat time are updated to match. Callbacks run
// in their own goroutines via safeExecuteCallback.
func (c *client) handleLocalDegradationBeat() {
	c.beatMutex.Lock()
	c.currentBeat++
	index := c.currentBeat

	now := time.Now()
	synthetic := BeatFrame{
		Type:       "backbeat.beatframe.v1",
		ClusterID:  c.config.ClusterID,
		BeatIndex:  index,
		Downbeat:   (index-1)%4 == 0, // Assume 4/4 time signature
		Phase:      "degraded",
		HLC:        fmt.Sprintf("%d-0", now.UnixNano()),
		DeadlineAt: now.Add(time.Second), // 1 second deadline in degradation
		TempoBPM:   1,                    // Synthetic frames advertise 1 BPM as a conservative recovery cadence
		WindowID:   c.generateDegradedWindowID(index),
	}

	c.currentWindow, c.currentHLC, c.lastBeatTime = synthetic.WindowID, synthetic.HLC, now
	c.beatMutex.Unlock()

	// Snapshot the callback lists so they can be invoked without the lock.
	c.callbackMutex.RLock()
	onBeat := append([]func(BeatFrame)(nil), c.beatCallbacks...)
	var onDownbeat []func(BeatFrame)
	if synthetic.Downbeat {
		onDownbeat = append(onDownbeat, c.downbeatCallbacks...)
	}
	c.callbackMutex.RUnlock()

	// Dispatch exactly like a real beat, tagged as degraded for the metrics.
	for _, cb := range onBeat {
		go c.safeExecuteCallback(cb, synthetic, "degraded-beat")
	}
	for _, cb := range onDownbeat {
		go c.safeExecuteCallback(cb, synthetic, "degraded-downbeat")
	}
}
// safeExecuteCallback runs a user callback for the given beat, shielding the
// SDK from panics and measuring how long the callback takes.
//
// callbackType labels the callback in metrics and log output (for example
// "beat" or "downbeat"). A panic is recovered, logged and recorded through
// addError; in that case no latency is recorded. Callbacks that take longer
// than 5ms are logged as slow.
func (c *client) safeExecuteCallback(callback func(BeatFrame), beat BeatFrame, callbackType string) {
	defer func() {
		if r := recover(); r != nil {
			// addError also forwards the message to the metrics, so the panic
			// must not be recorded there a second time.
			c.addError(fmt.Sprintf("panic in %s callback: %v", callbackType, r))
			c.config.Logger.Error("Callback panic recovered",
				"type", callbackType,
				"panic", r,
				"beat_index", beat.BeatIndex)
		}
	}()

	start := time.Now()
	callback(beat)
	duration := time.Since(start)

	// Record callback latency metrics
	c.metrics.RecordCallbackLatency(duration, callbackType)

	// Warn about slow callbacks
	if duration > 5*time.Millisecond {
		c.config.Logger.Warn("Slow callback detected",
			"type", callbackType,
			"duration", duration,
			"beat_index", beat.BeatIndex)
	}
}
// validateStatusClaim checks that a status claim is well-formed.
//
// It returns an error when State is empty or not one of executing, planning,
// waiting, review, done or failed; when Progress is not a number within
// [0.0, 1.0] (NaN is rejected); or when BeatsLeft is negative. It returns nil
// for a valid claim.
func (c *client) validateStatusClaim(claim *StatusClaim) error {
	switch claim.State {
	case "":
		return fmt.Errorf("state is required")
	case "executing", "planning", "waiting", "review", "done", "failed":
		// Known state.
	default:
		return fmt.Errorf("invalid state: must be one of [executing, planning, waiting, review, done, failed], got '%s'", claim.State)
	}

	// Written as a negated in-range test so that NaN, which compares false
	// against everything, is rejected instead of slipping through.
	if !(claim.Progress >= 0.0 && claim.Progress <= 1.0) {
		return fmt.Errorf("progress must be between 0.0 and 1.0, got %f", claim.Progress)
	}

	if claim.BeatsLeft < 0 {
		return fmt.Errorf("beats_left must be non-negative, got %d", claim.BeatsLeft)
	}

	return nil
}
// signStatusClaim signs a status claim using Ed25519 (BACKBEAT-REQ-044).
//
// The claim is serialised with encoding/json, the bytes are signed with
// c.config.SigningKey, and the hex-encoded signature is appended to
// claim.Notes as " [sig:<hex>]" (temporary until a dedicated signature field
// exists). The claim is left untouched when an error is returned.
//
// An error is returned when no signing key is configured, when the key does
// not have the Ed25519 private-key length (ed25519.Sign would panic on such a
// key), or when the claim cannot be marshalled.
func (c *client) signStatusClaim(claim *StatusClaim) error {
	key := c.config.SigningKey
	if len(key) == 0 {
		return fmt.Errorf("signing key not configured")
	}
	if len(key) != ed25519.PrivateKeySize {
		return fmt.Errorf("invalid signing key length: got %d bytes, want %d", len(key), ed25519.PrivateKeySize)
	}

	// Serialise the claim; these bytes are what gets signed.
	canonical, err := json.Marshal(claim)
	if err != nil {
		return fmt.Errorf("failed to marshal claim for signing: %w", err)
	}

	signature := ed25519.Sign(key, canonical)

	// Add signature to notes (temporary until proper signature field added)
	claim.Notes += fmt.Sprintf(" [sig:%x]", signature)

	return nil
}
// createHeaders builds the NATS headers attached to outgoing messages:
// the current window ID and HLC (both required by BACKBEAT-REQ-044) and the
// agent ID, which is included for routing.
func (c *client) createHeaders() nats.Header {
	hdr := nats.Header{}

	// Security headers (BACKBEAT-REQ-044)
	hdr.Add("x-window-id", c.GetCurrentWindow())
	hdr.Add("x-hlc", c.getCurrentHLC())

	// Routing header
	hdr.Add("x-agent-id", c.config.AgentID)

	return hdr
}
// getCurrentHLC returns the most recently stored HLC timestamp. When none has
// been stored yet, it returns a fallback built from the local clock in the
// form "<unix-nanoseconds>-0".
func (c *client) getCurrentHLC() string {
	c.beatMutex.RLock()
	hlc := c.currentHLC
	c.beatMutex.RUnlock()

	if hlc == "" {
		// Generate fallback HLC
		return fmt.Sprintf("%d-0", time.Now().UnixNano())
	}
	return hlc
}
// getBeatDuration returns the length of one beat at the current tempo,
// truncated to whole milliseconds. A tempo of zero or less is treated as
// 60 BPM.
func (c *client) getBeatDuration() time.Duration {
	c.beatMutex.RLock()
	bpm := c.currentTempo
	c.beatMutex.RUnlock()

	if bpm <= 0 {
		bpm = 60 // No tempo information available yet
	}

	// 60 seconds per minute / beats per minute = seconds per beat
	millisPerBeat := 60.0 / float64(bpm) * 1000
	return time.Duration(millisPerBeat) * time.Millisecond
}
// generateDegradedWindowID derives the window ID used for synthetic beats.
//
// The ID is "deg-" followed by the leading hex digits of the SHA-256 hash of
// "<cluster>:degraded:<bar>", cut to 32 characters in total. The bar number is
// beatIndex/4 (4-beat bars are assumed), so all beats of one bar share an ID.
func (c *client) generateDegradedWindowID(beatIndex int64) string {
	const idLength = 32

	bar := beatIndex / 4 // Assume 4-beat bars
	seed := fmt.Sprintf("%s:degraded:%d", c.config.ClusterID, bar)
	digest := sha256.Sum256([]byte(seed))

	id := fmt.Sprintf("deg-%x", digest)
	return id[:idLength]
}
// addError appends a timestamped error message to the client's bounded error
// list and forwards the same message to the metrics. The list holds at most
// ten entries; the oldest entry is dropped to make room for a new one.
func (c *client) addError(err string) {
	const maxStoredErrors = 10

	c.errorMutex.Lock()
	defer c.errorMutex.Unlock()

	// Evict the oldest entry once the list is full.
	if len(c.errors) >= maxStoredErrors {
		c.errors = c.errors[1:]
	}

	stamp := time.Now().Format("15:04:05")
	entry := fmt.Sprintf("[%s] %s", stamp, err)
	c.errors = append(c.errors, entry)

	// Record error in metrics
	c.metrics.RecordError(entry)
}
// Legacy compatibility functions for BACKBEAT-REQ-043
// ConvertLegacyBeat translates a legacy, 1-based {bar,beat} position into a
// beat index, assuming four beats per bar. The first call on a client logs a
// migration warning; later calls are silent.
func (c *client) ConvertLegacyBeat(bar, beat int) int64 {
	c.legacyMutex.Lock()
	alreadyWarned := c.legacyWarned
	if !alreadyWarned {
		c.config.Logger.Warn("Legacy {bar,beat} format detected - please migrate to beat_index",
			"bar", bar, "beat", beat)
		c.legacyWarned = true
	}
	c.legacyMutex.Unlock()

	// Four beats for every fully elapsed bar, plus the beat within this bar.
	elapsedBars := bar - 1
	return int64(elapsedBars*4 + beat)
}
// GetLegacyBeatInfo expresses the current beat index as a legacy, 1-based
// {bar,beat} pair, assuming four beats per bar. A beat index of zero or less
// is reported as bar 1, beat 1.
func (c *client) GetLegacyBeatInfo() LegacyBeatInfo {
	index := c.GetCurrentBeat()
	if index <= 0 {
		return LegacyBeatInfo{Bar: 1, Beat: 1}
	}

	// Work zero-based, then shift both components back to 1-based.
	offset := index - 1
	return LegacyBeatInfo{
		Bar:  int(offset/4) + 1,
		Beat: int(offset%4) + 1,
	}
}

277
pkg/sdk/metrics.go Normal file
View File

@@ -0,0 +1,277 @@
package sdk
import (
	"expvar"
	"fmt"
	"sort"
	"sync"
	"time"
)
// Metrics holds the SDK's observability counters. The exported fields are
// expvar variables created by NewMetrics; the unexported fields are rolling
// sample windows from which jitter and latency statistics are derived.
type Metrics struct {
// Connection metrics.
// ConnectionStatus is set to 1 by RecordConnection and 0 by
// RecordDisconnection. ReconnectCount is incremented by RecordConnection.
// NOTE(review): no visible code updates ConnectionDuration - confirm it is
// maintained elsewhere.
ConnectionStatus *expvar.Int
ReconnectCount *expvar.Int
ConnectionDuration *expvar.Int
// Beat metrics.
// BeatJitterMS and BeatCallbackLatency are maps of avg/p95/p99 statistics in
// milliseconds; LocalDegradationTime accumulates milliseconds.
BeatsReceived *expvar.Int
DownbeatsReceived *expvar.Int
BeatJitterMS *expvar.Map
BeatCallbackLatency *expvar.Map
BeatMisses *expvar.Int
LocalDegradationTime *expvar.Int
// Status emission metrics
StatusClaimsEmitted *expvar.Int
StatusClaimErrors *expvar.Int
// Budget metrics
BudgetsCreated *expvar.Int
BudgetsCompleted *expvar.Int
BudgetsTimedOut *expvar.Int
// Error metrics: running total and the most recent error message.
TotalErrors *expvar.Int
LastError *expvar.String
// Internal sample windows (each trimmed to the latest 100 samples by its
// Record method); every slice is guarded by the mutex declared after it.
beatJitterSamples []float64
jitterMutex sync.Mutex
callbackLatencies []float64
latencyMutex sync.Mutex
}
// NewMetrics creates a Metrics instance whose variables are published through
// expvar under names of the form prefix + ".<group>.<name>".
//
// The expvar registry is process-wide and panics when a name is registered
// twice. To allow more than one Metrics instance with the same prefix in a
// process, variables that are already published under the expected name and
// type are reused instead of being registered again; such instances share
// their counters, and reused counters keep their current values. The
// connection status is always reset to 0 (disconnected).
func NewMetrics(prefix string) *Metrics {
	m := &Metrics{
		ConnectionStatus:     publishedInt(prefix + ".connection.status"),
		ReconnectCount:       publishedInt(prefix + ".connection.reconnects"),
		ConnectionDuration:   publishedInt(prefix + ".connection.duration_ms"),
		BeatsReceived:        publishedInt(prefix + ".beats.received"),
		DownbeatsReceived:    publishedInt(prefix + ".beats.downbeats"),
		BeatJitterMS:         publishedMap(prefix + ".beats.jitter_ms"),
		BeatCallbackLatency:  publishedMap(prefix + ".beats.callback_latency_ms"),
		BeatMisses:           publishedInt(prefix + ".beats.misses"),
		LocalDegradationTime: publishedInt(prefix + ".beats.degradation_ms"),
		StatusClaimsEmitted:  publishedInt(prefix + ".status.claims_emitted"),
		StatusClaimErrors:    publishedInt(prefix + ".status.claim_errors"),
		BudgetsCreated:       publishedInt(prefix + ".budgets.created"),
		BudgetsCompleted:     publishedInt(prefix + ".budgets.completed"),
		BudgetsTimedOut:      publishedInt(prefix + ".budgets.timed_out"),
		TotalErrors:          publishedInt(prefix + ".errors.total"),
		LastError:            publishedString(prefix + ".errors.last"),
		beatJitterSamples:    make([]float64, 0, 100),
		callbackLatencies:    make([]float64, 0, 100),
	}

	// Initialize connection status to disconnected
	m.ConnectionStatus.Set(0)
	return m
}

// publishedInt returns the expvar.Int registered under name, creating and
// publishing it when no Int with that name exists yet.
func publishedInt(name string) *expvar.Int {
	if v, ok := expvar.Get(name).(*expvar.Int); ok {
		return v
	}
	return expvar.NewInt(name)
}

// publishedMap returns the expvar.Map registered under name, creating and
// publishing it when no Map with that name exists yet.
func publishedMap(name string) *expvar.Map {
	if v, ok := expvar.Get(name).(*expvar.Map); ok {
		return v
	}
	return expvar.NewMap(name)
}

// publishedString returns the expvar.String registered under name, creating
// and publishing it when no String with that name exists yet.
func publishedString(name string) *expvar.String {
	if v, ok := expvar.Get(name).(*expvar.String); ok {
		return v
	}
	return expvar.NewString(name)
}
// RecordConnection marks the connection as up (status 1) and increments
// ReconnectCount.
// NOTE(review): connect() calls this for the initial connection as well as
// from the reconnect handler, so ReconnectCount counts every successful
// connection, not only reconnects - confirm this is intended.
func (m *Metrics) RecordConnection() {
m.ConnectionStatus.Set(1)
m.ReconnectCount.Add(1)
}
// RecordDisconnection marks the connection as down by setting
// ConnectionStatus to 0.
func (m *Metrics) RecordDisconnection() {
m.ConnectionStatus.Set(0)
}
// RecordBeat counts a received beat (and downbeat, when isDownbeat is set) and
// records its jitter, defined as actualTime minus expectedTime in
// milliseconds. The jitter is appended to a rolling window of the latest 100
// samples, and the window's average, p95 and p99 are published in
// BeatJitterMS under the keys "avg", "p95" and "p99".
func (m *Metrics) RecordBeat(expectedTime, actualTime time.Time, isDownbeat bool) {
	m.BeatsReceived.Add(1)
	if isDownbeat {
		m.DownbeatsReceived.Add(1)
	}

	// Calculate jitter in milliseconds
	jitterMS := float64(actualTime.Sub(expectedTime).Nanoseconds()) / 1e6

	m.jitterMutex.Lock()
	defer m.jitterMutex.Unlock()

	m.beatJitterSamples = append(m.beatJitterSamples, jitterMS)
	if len(m.beatJitterSamples) > 100 {
		m.beatJitterSamples = m.beatJitterSamples[1:]
	}

	// Assign the value before publishing so that a concurrent reader of the
	// map never observes a freshly created, still-zero entry.
	publish := func(key string, value float64) {
		stat := new(expvar.Float)
		stat.Set(value)
		m.BeatJitterMS.Set(key, stat)
	}

	avg, p95, p99 := m.calculatePercentiles(m.beatJitterSamples)
	publish("avg", avg)
	publish("p95", p95)
	publish("p99", p99)
}
// RecordBeatMiss increments the BeatMisses counter by one.
func (m *Metrics) RecordBeatMiss() {
m.BeatMisses.Add(1)
}
// RecordCallbackLatency records how long a callback took. The duration is
// appended, in milliseconds, to a rolling window of the latest 100 samples,
// and the window's average, p95 and p99 are published in BeatCallbackLatency
// under the keys "<callbackType>_avg", "<callbackType>_p95" and
// "<callbackType>_p99".
//
// NOTE(review): the sample window is shared by all callback types, so the
// statistics stored under one type's keys also include samples recorded for
// other types - confirm whether per-type windows are wanted.
func (m *Metrics) RecordCallbackLatency(duration time.Duration, callbackType string) {
	latencyMS := float64(duration.Nanoseconds()) / 1e6

	m.latencyMutex.Lock()
	defer m.latencyMutex.Unlock()

	m.callbackLatencies = append(m.callbackLatencies, latencyMS)
	if len(m.callbackLatencies) > 100 {
		m.callbackLatencies = m.callbackLatencies[1:]
	}

	// Assign the value before publishing so that a concurrent reader of the
	// map never observes a freshly created, still-zero entry.
	publish := func(suffix string, value float64) {
		stat := new(expvar.Float)
		stat.Set(value)
		m.BeatCallbackLatency.Set(callbackType+suffix, stat)
	}

	avg, p95, p99 := m.calculatePercentiles(m.callbackLatencies)
	publish("_avg", avg)
	publish("_p95", p95)
	publish("_p99", p99)
}
// RecordLocalDegradation adds the given duration, truncated to whole
// milliseconds, to the LocalDegradationTime counter.
func (m *Metrics) RecordLocalDegradation(duration time.Duration) {
	m.LocalDegradationTime.Add(duration.Milliseconds())
}
// RecordStatusClaim counts one status claim attempt: successful emissions go
// to StatusClaimsEmitted, failures to StatusClaimErrors.
func (m *Metrics) RecordStatusClaim(success bool) {
	counter := m.StatusClaimErrors
	if success {
		counter = m.StatusClaimsEmitted
	}
	counter.Add(1)
}
// RecordBudgetCreated increments the BudgetsCreated counter by one.
func (m *Metrics) RecordBudgetCreated() {
m.BudgetsCreated.Add(1)
}
// RecordBudgetCompleted counts the end of one beat budget: BudgetsTimedOut is
// incremented when timedOut is true, BudgetsCompleted otherwise.
func (m *Metrics) RecordBudgetCompleted(timedOut bool) {
	outcome := m.BudgetsCompleted
	if timedOut {
		outcome = m.BudgetsTimedOut
	}
	outcome.Add(1)
}
// RecordError increments TotalErrors and stores err as LastError, replacing
// the previously stored message.
func (m *Metrics) RecordError(err string) {
m.TotalErrors.Add(1)
m.LastError.Set(err)
}
// calculatePercentiles returns the arithmetic mean and the 95th and 99th
// percentile of samples. All three results are 0 for an empty slice.
//
// Each percentile is the element at index int(len*p) of the ascending-sorted
// samples, clamped to the last element. The input slice is not modified; a
// copy is sorted instead.
func (m *Metrics) calculatePercentiles(samples []float64) (avg, p95, p99 float64) {
	if len(samples) == 0 {
		return 0, 0, 0
	}

	// Calculate average
	sum := 0.0
	for _, s := range samples {
		sum += s
	}
	avg = sum / float64(len(samples))

	// Sort a copy so the caller's rolling window keeps its insertion order.
	sorted := make([]float64, len(samples))
	copy(sorted, samples)
	sort.Float64s(sorted)

	// pick returns the sample at the given fraction of the sorted window.
	pick := func(fraction float64) float64 {
		idx := int(float64(len(sorted)) * fraction)
		if idx >= len(sorted) {
			idx = len(sorted) - 1
		}
		return sorted[idx]
	}

	return avg, pick(0.95), pick(0.99)
}
// initMetrics creates the client's Metrics instance, using
// "backbeat.sdk.<AgentID>" as the prefix for the published variable names.
func (c *client) initMetrics() {
prefix := fmt.Sprintf("backbeat.sdk.%s", c.config.AgentID)
c.metrics = NewMetrics(prefix)
}
// clientWithMetrics embeds a client and pairs it with a Metrics instance.
// The original note says the metrics field is meant to live on the client
// struct itself (in client.go).
// NOTE(review): other methods here already use c.metrics on client directly,
// and nothing visible references this type - confirm whether it is still needed.
type clientWithMetrics struct {
*client
metrics *Metrics
}
// PrometheusMetrics is an empty placeholder for a future Prometheus
// integration; it has no fields or behaviour yet.
type PrometheusMetrics struct {
// This would integrate with prometheus/client_golang
// For now, we'll just use expvar which can be scraped
}
// GetMetricsSnapshot returns the current values of the scalar metrics keyed
// by snake_case name. The jitter and callback-latency maps, the connection
// duration and the local-degradation time are not part of the snapshot.
func (m *Metrics) GetMetricsSnapshot() map[string]interface{} {
	return map[string]interface{}{
		// Connection
		"connection_status": m.ConnectionStatus.Value(),
		"reconnect_count":   m.ReconnectCount.Value(),
		// Beats
		"beats_received":     m.BeatsReceived.Value(),
		"downbeats_received": m.DownbeatsReceived.Value(),
		"beat_misses":        m.BeatMisses.Value(),
		// Status claims
		"status_claims_emitted": m.StatusClaimsEmitted.Value(),
		"status_claim_errors":   m.StatusClaimErrors.Value(),
		// Budgets
		"budgets_created":   m.BudgetsCreated.Value(),
		"budgets_completed": m.BudgetsCompleted.Value(),
		"budgets_timed_out": m.BudgetsTimedOut.Value(),
		// Errors
		"total_errors": m.TotalErrors.Value(),
		"last_error":   m.LastError.Value(),
	}
}
// GetHealthWithMetrics returns the client's health report under the key
// "status" and, when metrics have been initialised, a metrics snapshot under
// the key "metrics".
func (c *client) GetHealthWithMetrics() map[string]interface{} {
	report := make(map[string]interface{}, 2)
	report["status"] = c.Health()

	if m := c.metrics; m != nil {
		report["metrics"] = m.GetMetricsSnapshot()
	}
	return report
}