feat: Production readiness improvements for WHOOSH council formation
Major security, observability, and configuration improvements:
## Security Hardening
- Implemented configurable CORS (no more wildcards)
- Added comprehensive auth middleware for admin endpoints
- Enhanced webhook HMAC validation
- Added input validation and rate limiting
- Security headers and CSP policies
## Configuration Management
- Made N8N webhook URL configurable (WHOOSH_N8N_BASE_URL)
- Replaced all hardcoded endpoints with environment variables
- Added feature flags for LLM vs heuristic composition
- Gitea fetch hardening with EAGER_FILTER and FULL_RESCAN options
## API Completeness
- Implemented GetCouncilComposition function
- Added GET /api/v1/councils/{id} endpoint
- Council artifacts API (POST/GET /api/v1/councils/{id}/artifacts)
- /admin/health/details endpoint with component status
- Database lookup for repository URLs (no hardcoded fallbacks)
## Observability & Performance
- Added OpenTelemetry distributed tracing with goal/pulse correlation
- Performance optimization database indexes
- Comprehensive health monitoring
- Enhanced logging and error handling
## Infrastructure
- Production-ready P2P discovery (replaces mock implementation)
- Removed unused Redis configuration
- Enhanced Docker Swarm integration
- Added migration files for performance indexes
## Code Quality
- Comprehensive input validation
- Graceful error handling and failsafe fallbacks
- Backwards compatibility maintained
- Following security best practices
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
373
vendor/github.com/chorus-services/backbeat/pkg/sdk/README.md
generated
vendored
Normal file
373
vendor/github.com/chorus-services/backbeat/pkg/sdk/README.md
generated
vendored
Normal file
@@ -0,0 +1,373 @@
|
||||
# BACKBEAT Go SDK
|
||||
|
||||
The BACKBEAT Go SDK enables CHORUS services to become "BACKBEAT-aware" by providing client libraries for beat synchronization, status emission, and beat-budget management.
|
||||
|
||||
## Features
|
||||
|
||||
- **Beat Subscription (BACKBEAT-REQ-040)**: Subscribe to beat and downbeat events with jitter-tolerant scheduling
|
||||
- **Status Emission (BACKBEAT-REQ-041)**: Emit status claims with automatic agent_id, task_id, and HLC population
|
||||
- **Beat Budgets (BACKBEAT-REQ-042)**: Execute functions with beat-based timeouts and cancellation
|
||||
- **Legacy Compatibility (BACKBEAT-REQ-043)**: Support for legacy `{bar,beat}` patterns with migration warnings
|
||||
- **Security (BACKBEAT-REQ-044)**: Ed25519 signing and required headers for status claims
|
||||
- **Local Degradation**: Continue operating when pulse service is unavailable
|
||||
- **Comprehensive Observability**: Metrics, health reporting, and performance monitoring
|
||||
|
||||
## Quick Start
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"crypto/rand"
|
||||
"log/slog"
|
||||
|
||||
"github.com/chorus-services/backbeat/pkg/sdk"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Generate signing key
|
||||
_, signingKey, _ := ed25519.GenerateKey(rand.Reader)
|
||||
|
||||
// Configure SDK
|
||||
config := sdk.DefaultConfig()
|
||||
config.ClusterID = "chorus-dev"
|
||||
config.AgentID = "my-service"
|
||||
config.NATSUrl = "nats://localhost:4222"
|
||||
config.SigningKey = signingKey
|
||||
|
||||
// Create client
|
||||
client := sdk.NewClient(config)
|
||||
|
||||
// Register beat callback
|
||||
client.OnBeat(func(beat sdk.BeatFrame) {
|
||||
slog.Info("Beat received", "beat_index", beat.BeatIndex)
|
||||
|
||||
// Emit status
|
||||
client.EmitStatusClaim(sdk.StatusClaim{
|
||||
State: "executing",
|
||||
BeatsLeft: 5,
|
||||
Progress: 0.3,
|
||||
Notes: "Processing data",
|
||||
})
|
||||
})
|
||||
|
||||
// Start client
|
||||
ctx := context.Background()
|
||||
if err := client.Start(ctx); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer client.Stop()
|
||||
|
||||
// Your service logic here...
|
||||
select {}
|
||||
}
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
### Basic Configuration
|
||||
|
||||
```go
|
||||
config := &sdk.Config{
|
||||
ClusterID: "your-cluster", // BACKBEAT cluster ID
|
||||
AgentID: "your-agent", // Unique agent identifier
|
||||
NATSUrl: "nats://localhost:4222", // NATS connection URL
|
||||
}
|
||||
```
|
||||
|
||||
### Advanced Configuration
|
||||
|
||||
```go
|
||||
config := sdk.DefaultConfig()
|
||||
config.ClusterID = "chorus-prod"
|
||||
config.AgentID = "web-service-01"
|
||||
config.NATSUrl = "nats://nats.cluster.local:4222"
|
||||
config.SigningKey = loadSigningKey() // Ed25519 private key
|
||||
config.JitterTolerance = 100 * time.Millisecond
|
||||
config.ReconnectDelay = 2 * time.Second
|
||||
config.MaxReconnects = 10 // -1 for infinite
|
||||
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
||||
```
|
||||
|
||||
## Core Features
|
||||
|
||||
### Beat Subscription
|
||||
|
||||
```go
|
||||
// Register beat callback (called every beat)
|
||||
client.OnBeat(func(beat sdk.BeatFrame) {
|
||||
// Your beat logic here
|
||||
fmt.Printf("Beat %d at %s\n", beat.BeatIndex, beat.DeadlineAt)
|
||||
})
|
||||
|
||||
// Register downbeat callback (called at bar starts)
|
||||
client.OnDownbeat(func(beat sdk.BeatFrame) {
|
||||
// Your downbeat logic here
|
||||
fmt.Printf("Bar started: %s\n", beat.WindowID)
|
||||
})
|
||||
```
|
||||
|
||||
### Status Emission
|
||||
|
||||
```go
|
||||
// Basic status emission
|
||||
err := client.EmitStatusClaim(sdk.StatusClaim{
|
||||
State: "executing", // executing|planning|waiting|review|done|failed
|
||||
BeatsLeft: 10, // estimated beats remaining
|
||||
Progress: 0.75, // progress ratio (0.0-1.0)
|
||||
Notes: "Processing batch 5/10",
|
||||
})
|
||||
|
||||
// Advanced status with task tracking
|
||||
err := client.EmitStatusClaim(sdk.StatusClaim{
|
||||
TaskID: "task-12345", // auto-generated if empty
|
||||
State: "waiting",
|
||||
WaitFor: []string{"hmmm://thread/abc123"}, // dependencies
|
||||
BeatsLeft: 0,
|
||||
Progress: 1.0,
|
||||
Notes: "Waiting for thread completion",
|
||||
})
|
||||
```
|
||||
|
||||
### Beat Budgets
|
||||
|
||||
```go
|
||||
// Execute with beat-based timeout
|
||||
err := client.WithBeatBudget(10, func() error {
|
||||
// This function has 10 beats to complete
|
||||
return performTask()
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// Handle timeout or task error
|
||||
fmt.Printf("Task failed or exceeded budget: %v\n", err)
|
||||
}
|
||||
|
||||
// Real-world example
|
||||
err := client.WithBeatBudget(20, func() error {
|
||||
// Database operation with beat budget
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
return database.ProcessBatch(ctx, batchData)
|
||||
})
|
||||
```
|
||||
|
||||
## Client Interface
|
||||
|
||||
```go
|
||||
type Client interface {
|
||||
// Beat subscription
|
||||
OnBeat(callback func(BeatFrame)) error
|
||||
OnDownbeat(callback func(BeatFrame)) error
|
||||
|
||||
// Status emission
|
||||
EmitStatusClaim(claim StatusClaim) error
|
||||
|
||||
// Beat budgets
|
||||
WithBeatBudget(n int, fn func() error) error
|
||||
|
||||
// Utilities
|
||||
GetCurrentBeat() int64
|
||||
GetCurrentWindow() string
|
||||
IsInWindow(windowID string) bool
|
||||
|
||||
// Lifecycle
|
||||
Start(ctx context.Context) error
|
||||
Stop() error
|
||||
Health() HealthStatus
|
||||
}
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
The SDK includes comprehensive examples:
|
||||
|
||||
- **[Simple Agent](examples/simple_agent.go)**: Basic beat subscription and status emission
|
||||
- **[Task Processor](examples/task_processor.go)**: Beat budget usage for task timeout management
|
||||
- **[Service Monitor](examples/service_monitor.go)**: Health monitoring with beat-aligned reporting
|
||||
|
||||
### Running Examples
|
||||
|
||||
```bash
|
||||
# Simple agent example
|
||||
go run pkg/sdk/examples/simple_agent.go
|
||||
|
||||
# Task processor with beat budgets
|
||||
go run pkg/sdk/examples/task_processor.go
|
||||
|
||||
# Service monitor with health reporting
|
||||
go run pkg/sdk/examples/service_monitor.go
|
||||
```
|
||||
|
||||
## Observability
|
||||
|
||||
### Health Monitoring
|
||||
|
||||
```go
|
||||
health := client.Health()
|
||||
fmt.Printf("Connected: %v\n", health.Connected)
|
||||
fmt.Printf("Last Beat: %d at %s\n", health.LastBeat, health.LastBeatTime)
|
||||
fmt.Printf("Time Drift: %s\n", health.TimeDrift)
|
||||
fmt.Printf("Reconnects: %d\n", health.ReconnectCount)
|
||||
fmt.Printf("Local Degradation: %v\n", health.LocalDegradation)
|
||||
```
|
||||
|
||||
### Metrics
|
||||
|
||||
The SDK exposes metrics via Go's `expvar` package:
|
||||
|
||||
- Connection metrics: status, reconnection count, duration
|
||||
- Beat metrics: received, jitter, callback latency, misses
|
||||
- Status metrics: claims emitted, errors
|
||||
- Budget metrics: created, completed, timed out
|
||||
- Error metrics: total count, last error
|
||||
|
||||
Access metrics at `http://localhost:8080/debug/vars` when using `expvar`.
|
||||
|
||||
### Logging
|
||||
|
||||
The SDK uses structured logging via `slog`:
|
||||
|
||||
```go
|
||||
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
|
||||
Level: slog.LevelDebug, // Set appropriate level
|
||||
}))
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
The SDK provides comprehensive error handling:
|
||||
|
||||
- **Connection Errors**: Automatic reconnection with exponential backoff
|
||||
- **Beat Jitter**: Tolerance for network delays and timing variations
|
||||
- **Callback Panics**: Recovery and logging without affecting other callbacks
|
||||
- **Validation Errors**: Status claim validation with detailed error messages
|
||||
- **Timeout Errors**: Beat budget timeouts with context cancellation
|
||||
|
||||
## Local Degradation
|
||||
|
||||
When the pulse service is unavailable, the SDK automatically enters local degradation mode:
|
||||
|
||||
- Generates synthetic beats to maintain callback timing
|
||||
- Uses fallback 60 BPM tempo
|
||||
- Marks beat frames with "degraded" phase
|
||||
- Automatically recovers when pulse service returns
|
||||
|
||||
## Legacy Compatibility
|
||||
|
||||
Support for legacy `{bar,beat}` patterns (BACKBEAT-REQ-043):
|
||||
|
||||
```go
|
||||
// Convert legacy format (logs warning once)
|
||||
beatIndex := client.ConvertLegacyBeat(bar, beat)
|
||||
|
||||
// Get legacy format from current beat
|
||||
legacy := client.GetLegacyBeatInfo()
|
||||
fmt.Printf("Bar: %d, Beat: %d\n", legacy.Bar, legacy.Beat)
|
||||
```
|
||||
|
||||
## Security
|
||||
|
||||
The SDK implements BACKBEAT security requirements:
|
||||
|
||||
- **Ed25519 Signatures**: All status claims are signed when signing key provided
|
||||
- **Required Headers**: Includes `x-window-id` and `x-hlc` headers
|
||||
- **Agent Identification**: Automatic `x-agent-id` header for routing
|
||||
|
||||
```go
|
||||
// Configure signing
|
||||
_, signingKey, _ := ed25519.GenerateKey(rand.Reader)
|
||||
config.SigningKey = signingKey
|
||||
```
|
||||
|
||||
## Performance
|
||||
|
||||
The SDK is designed for high performance:
|
||||
|
||||
- **Beat Callback Latency**: Target ≤5ms callback execution
|
||||
- **Timer Drift**: ≤1% drift over 1 hour without leader
|
||||
- **Concurrent Safe**: All operations are goroutine-safe
|
||||
- **Memory Efficient**: Bounded error lists and metric samples
|
||||
|
||||
## Integration Patterns
|
||||
|
||||
### Web Service Integration
|
||||
|
||||
```go
|
||||
func main() {
|
||||
// Initialize BACKBEAT client
|
||||
client := sdk.NewClient(config)
|
||||
client.OnBeat(func(beat sdk.BeatFrame) {
|
||||
// Report web service status
|
||||
client.EmitStatusClaim(sdk.StatusClaim{
|
||||
State: "executing",
|
||||
Progress: getRequestSuccessRate(),
|
||||
Notes: fmt.Sprintf("Handling %d req/s", getCurrentRPS()),
|
||||
})
|
||||
})
|
||||
|
||||
// Start HTTP server
|
||||
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
|
||||
health := client.Health()
|
||||
json.NewEncoder(w).Encode(health)
|
||||
})
|
||||
}
|
||||
```
|
||||
|
||||
### Background Job Processor
|
||||
|
||||
```go
|
||||
func processJobs(client sdk.Client) {
|
||||
for job := range jobQueue {
|
||||
// Use beat budget for job timeout
|
||||
err := client.WithBeatBudget(job.MaxBeats, func() error {
|
||||
return processJob(job)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
client.EmitStatusClaim(sdk.StatusClaim{
|
||||
TaskID: job.ID,
|
||||
State: "failed",
|
||||
Notes: err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
||||
The SDK includes comprehensive test utilities:
|
||||
|
||||
```bash
|
||||
# Run all tests
|
||||
go test ./pkg/sdk/...
|
||||
|
||||
# Run with race detection
|
||||
go test -race ./pkg/sdk/...
|
||||
|
||||
# Run benchmarks
|
||||
go test -bench=. ./pkg/sdk/examples/
|
||||
```
|
||||
|
||||
## Requirements
|
||||
|
||||
- Go 1.22 or later
|
||||
- NATS server for messaging
|
||||
- BACKBEAT pulse service running
|
||||
- Network connectivity to cluster
|
||||
|
||||
## Contributing
|
||||
|
||||
1. Follow standard Go conventions
|
||||
2. Include comprehensive tests
|
||||
3. Update documentation for API changes
|
||||
4. Ensure examples remain working
|
||||
5. Maintain backward compatibility
|
||||
|
||||
## License
|
||||
|
||||
This SDK is part of the BACKBEAT project and follows the same licensing terms.
|
||||
480
vendor/github.com/chorus-services/backbeat/pkg/sdk/client.go
generated
vendored
Normal file
480
vendor/github.com/chorus-services/backbeat/pkg/sdk/client.go
generated
vendored
Normal file
@@ -0,0 +1,480 @@
|
||||
// Package sdk provides the BACKBEAT Go SDK for enabling CHORUS services
|
||||
// to become BACKBEAT-aware with beat synchronization and status emission.
|
||||
package sdk
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// Client interface defines the core BACKBEAT SDK functionality
|
||||
// Implements BACKBEAT-REQ-040, 041, 042, 043, 044
|
||||
type Client interface {
|
||||
// Beat subscription (BACKBEAT-REQ-040)
|
||||
OnBeat(callback func(BeatFrame)) error
|
||||
OnDownbeat(callback func(BeatFrame)) error
|
||||
|
||||
// Status emission (BACKBEAT-REQ-041)
|
||||
EmitStatusClaim(claim StatusClaim) error
|
||||
|
||||
// Beat budgets (BACKBEAT-REQ-042)
|
||||
WithBeatBudget(n int, fn func() error) error
|
||||
|
||||
// Utilities
|
||||
GetCurrentBeat() int64
|
||||
GetCurrentWindow() string
|
||||
IsInWindow(windowID string) bool
|
||||
GetCurrentTempo() int
|
||||
GetTempoDrift() time.Duration
|
||||
|
||||
// Lifecycle management
|
||||
Start(ctx context.Context) error
|
||||
Stop() error
|
||||
Health() HealthStatus
|
||||
}
|
||||
|
||||
// Config represents the SDK configuration
|
||||
type Config struct {
|
||||
ClusterID string // BACKBEAT cluster identifier
|
||||
AgentID string // Unique agent identifier
|
||||
NATSUrl string // NATS connection URL
|
||||
SigningKey ed25519.PrivateKey // Ed25519 private key for signing (BACKBEAT-REQ-044)
|
||||
Logger *slog.Logger // Structured logger
|
||||
JitterTolerance time.Duration // Maximum jitter tolerance (default: 50ms)
|
||||
ReconnectDelay time.Duration // NATS reconnection delay (default: 1s)
|
||||
MaxReconnects int // Maximum reconnection attempts (default: -1 for infinite)
|
||||
}
|
||||
|
||||
// DefaultConfig returns a Config with sensible defaults
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
JitterTolerance: 50 * time.Millisecond,
|
||||
ReconnectDelay: 1 * time.Second,
|
||||
MaxReconnects: -1, // Infinite reconnects
|
||||
Logger: slog.Default(),
|
||||
}
|
||||
}
|
||||
|
||||
// BeatFrame represents a beat frame with timing information
|
||||
type BeatFrame struct {
|
||||
Type string `json:"type"`
|
||||
ClusterID string `json:"cluster_id"`
|
||||
BeatIndex int64 `json:"beat_index"`
|
||||
Downbeat bool `json:"downbeat"`
|
||||
Phase string `json:"phase"`
|
||||
HLC string `json:"hlc"`
|
||||
DeadlineAt time.Time `json:"deadline_at"`
|
||||
TempoBPM int `json:"tempo_bpm"`
|
||||
WindowID string `json:"window_id"`
|
||||
}
|
||||
|
||||
// StatusClaim represents a status claim emission
|
||||
type StatusClaim struct {
|
||||
// Auto-populated by SDK
|
||||
Type string `json:"type"` // Always "backbeat.statusclaim.v1"
|
||||
AgentID string `json:"agent_id"` // Auto-populated from config
|
||||
TaskID string `json:"task_id"` // Auto-generated if not provided
|
||||
BeatIndex int64 `json:"beat_index"` // Auto-populated from current beat
|
||||
HLC string `json:"hlc"` // Auto-populated from current HLC
|
||||
|
||||
// User-provided
|
||||
State string `json:"state"` // executing|planning|waiting|review|done|failed
|
||||
WaitFor []string `json:"wait_for,omitempty"` // refs (e.g., hmmm://thread/...)
|
||||
BeatsLeft int `json:"beats_left"` // estimated beats remaining
|
||||
Progress float64 `json:"progress"` // progress ratio (0.0-1.0)
|
||||
Notes string `json:"notes"` // status description
|
||||
}
|
||||
|
||||
// HealthStatus represents the current health of the SDK client
|
||||
type HealthStatus struct {
|
||||
Connected bool `json:"connected"`
|
||||
LastBeat int64 `json:"last_beat"`
|
||||
LastBeatTime time.Time `json:"last_beat_time"`
|
||||
TimeDrift time.Duration `json:"time_drift"`
|
||||
ReconnectCount int `json:"reconnect_count"`
|
||||
LocalDegradation bool `json:"local_degradation"`
|
||||
CurrentTempo int `json:"current_tempo"`
|
||||
TempoDrift time.Duration `json:"tempo_drift"`
|
||||
MeasuredBPM float64 `json:"measured_bpm"`
|
||||
Errors []string `json:"errors,omitempty"`
|
||||
}
|
||||
|
||||
// LegacyBeatInfo represents legacy {bar,beat} information
|
||||
// For BACKBEAT-REQ-043 compatibility
|
||||
type LegacyBeatInfo struct {
|
||||
Bar int `json:"bar"`
|
||||
Beat int `json:"beat"`
|
||||
}
|
||||
|
||||
// tempoSample represents a tempo measurement for drift calculation
|
||||
type tempoSample struct {
|
||||
BeatIndex int64
|
||||
Tempo int
|
||||
MeasuredTime time.Time
|
||||
ActualBPM float64 // Measured BPM based on inter-beat timing
|
||||
}
|
||||
|
||||
// client implements the Client interface
|
||||
type client struct {
|
||||
config *Config
|
||||
nc *nats.Conn
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Beat tracking
|
||||
currentBeat int64
|
||||
currentWindow string
|
||||
currentHLC string
|
||||
lastBeatTime time.Time
|
||||
currentTempo int // Current tempo in BPM
|
||||
lastTempo int // Last known tempo for drift calculation
|
||||
tempoHistory []tempoSample // History for drift calculation
|
||||
beatMutex sync.RWMutex
|
||||
|
||||
// Callbacks
|
||||
beatCallbacks []func(BeatFrame)
|
||||
downbeatCallbacks []func(BeatFrame)
|
||||
callbackMutex sync.RWMutex
|
||||
|
||||
// Health and metrics
|
||||
reconnectCount int
|
||||
localDegradation bool
|
||||
errors []string
|
||||
errorMutex sync.RWMutex
|
||||
metrics *Metrics
|
||||
|
||||
// Beat budget tracking
|
||||
budgetContexts map[string]context.CancelFunc
|
||||
budgetMutex sync.Mutex
|
||||
|
||||
// Legacy compatibility
|
||||
legacyWarned bool
|
||||
legacyMutex sync.Mutex
|
||||
}
|
||||
|
||||
// NewClient creates a new BACKBEAT SDK client
|
||||
func NewClient(config *Config) Client {
|
||||
if config.Logger == nil {
|
||||
config.Logger = slog.Default()
|
||||
}
|
||||
|
||||
c := &client{
|
||||
config: config,
|
||||
beatCallbacks: make([]func(BeatFrame), 0),
|
||||
downbeatCallbacks: make([]func(BeatFrame), 0),
|
||||
budgetContexts: make(map[string]context.CancelFunc),
|
||||
errors: make([]string, 0),
|
||||
tempoHistory: make([]tempoSample, 0, 100),
|
||||
currentTempo: 60, // Default to 60 BPM
|
||||
}
|
||||
|
||||
// Initialize metrics
|
||||
prefix := fmt.Sprintf("backbeat.sdk.%s", config.AgentID)
|
||||
c.metrics = NewMetrics(prefix)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// Start initializes the client and begins beat synchronization
|
||||
func (c *client) Start(ctx context.Context) error {
|
||||
c.ctx, c.cancel = context.WithCancel(ctx)
|
||||
|
||||
if err := c.connect(); err != nil {
|
||||
return fmt.Errorf("failed to connect to NATS: %w", err)
|
||||
}
|
||||
|
||||
c.wg.Add(1)
|
||||
go c.beatSubscriptionLoop()
|
||||
|
||||
c.config.Logger.Info("BACKBEAT SDK client started",
|
||||
slog.String("cluster_id", c.config.ClusterID),
|
||||
slog.String("agent_id", c.config.AgentID))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop gracefully stops the client
|
||||
func (c *client) Stop() error {
|
||||
if c.cancel != nil {
|
||||
c.cancel()
|
||||
}
|
||||
|
||||
// Cancel all active beat budgets
|
||||
c.budgetMutex.Lock()
|
||||
for id, cancel := range c.budgetContexts {
|
||||
cancel()
|
||||
delete(c.budgetContexts, id)
|
||||
}
|
||||
c.budgetMutex.Unlock()
|
||||
|
||||
if c.nc != nil {
|
||||
c.nc.Close()
|
||||
}
|
||||
|
||||
c.wg.Wait()
|
||||
|
||||
c.config.Logger.Info("BACKBEAT SDK client stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnBeat registers a callback for beat events (BACKBEAT-REQ-040)
|
||||
func (c *client) OnBeat(callback func(BeatFrame)) error {
|
||||
if callback == nil {
|
||||
return fmt.Errorf("callback cannot be nil")
|
||||
}
|
||||
|
||||
c.callbackMutex.Lock()
|
||||
defer c.callbackMutex.Unlock()
|
||||
|
||||
c.beatCallbacks = append(c.beatCallbacks, callback)
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnDownbeat registers a callback for downbeat events (BACKBEAT-REQ-040)
|
||||
func (c *client) OnDownbeat(callback func(BeatFrame)) error {
|
||||
if callback == nil {
|
||||
return fmt.Errorf("callback cannot be nil")
|
||||
}
|
||||
|
||||
c.callbackMutex.Lock()
|
||||
defer c.callbackMutex.Unlock()
|
||||
|
||||
c.downbeatCallbacks = append(c.downbeatCallbacks, callback)
|
||||
return nil
|
||||
}
|
||||
|
||||
// EmitStatusClaim emits a status claim (BACKBEAT-REQ-041)
|
||||
func (c *client) EmitStatusClaim(claim StatusClaim) error {
|
||||
// Auto-populate required fields
|
||||
claim.Type = "backbeat.statusclaim.v1"
|
||||
claim.AgentID = c.config.AgentID
|
||||
claim.BeatIndex = c.GetCurrentBeat()
|
||||
claim.HLC = c.getCurrentHLC()
|
||||
|
||||
// Auto-generate task ID if not provided
|
||||
if claim.TaskID == "" {
|
||||
claim.TaskID = fmt.Sprintf("task:%s", uuid.New().String()[:8])
|
||||
}
|
||||
|
||||
// Validate the claim
|
||||
if err := c.validateStatusClaim(&claim); err != nil {
|
||||
return fmt.Errorf("invalid status claim: %w", err)
|
||||
}
|
||||
|
||||
// Sign the claim if signing key is available (BACKBEAT-REQ-044)
|
||||
if c.config.SigningKey != nil {
|
||||
if err := c.signStatusClaim(&claim); err != nil {
|
||||
return fmt.Errorf("failed to sign status claim: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Publish to NATS
|
||||
data, err := json.Marshal(claim)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal status claim: %w", err)
|
||||
}
|
||||
|
||||
subject := fmt.Sprintf("backbeat.status.%s", c.config.ClusterID)
|
||||
headers := c.createHeaders()
|
||||
|
||||
msg := &nats.Msg{
|
||||
Subject: subject,
|
||||
Data: data,
|
||||
Header: headers,
|
||||
}
|
||||
|
||||
if err := c.nc.PublishMsg(msg); err != nil {
|
||||
c.addError(fmt.Sprintf("failed to publish status claim: %v", err))
|
||||
c.metrics.RecordStatusClaim(false)
|
||||
return fmt.Errorf("failed to publish status claim: %w", err)
|
||||
}
|
||||
|
||||
c.metrics.RecordStatusClaim(true)
|
||||
c.config.Logger.Debug("Status claim emitted",
|
||||
slog.String("agent_id", claim.AgentID),
|
||||
slog.String("task_id", claim.TaskID),
|
||||
slog.String("state", claim.State),
|
||||
slog.Int64("beat_index", claim.BeatIndex))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WithBeatBudget executes a function with a beat-based timeout (BACKBEAT-REQ-042)
|
||||
func (c *client) WithBeatBudget(n int, fn func() error) error {
|
||||
if n <= 0 {
|
||||
return fmt.Errorf("beat budget must be positive, got %d", n)
|
||||
}
|
||||
|
||||
// Calculate timeout based on current tempo
|
||||
currentBeat := c.GetCurrentBeat()
|
||||
beatDuration := c.getBeatDuration()
|
||||
timeout := time.Duration(n) * beatDuration
|
||||
|
||||
// Use background context if client context is not set (for testing)
|
||||
baseCtx := c.ctx
|
||||
if baseCtx == nil {
|
||||
baseCtx = context.Background()
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(baseCtx, timeout)
|
||||
defer cancel()
|
||||
|
||||
// Track the budget context for cancellation
|
||||
budgetID := uuid.New().String()
|
||||
c.budgetMutex.Lock()
|
||||
c.budgetContexts[budgetID] = cancel
|
||||
c.budgetMutex.Unlock()
|
||||
|
||||
// Record budget creation
|
||||
c.metrics.RecordBudgetCreated()
|
||||
|
||||
defer func() {
|
||||
c.budgetMutex.Lock()
|
||||
delete(c.budgetContexts, budgetID)
|
||||
c.budgetMutex.Unlock()
|
||||
}()
|
||||
|
||||
// Execute function with timeout
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- fn()
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
c.metrics.RecordBudgetCompleted(false) // Not timed out
|
||||
if err != nil {
|
||||
c.config.Logger.Debug("Beat budget function completed with error",
|
||||
slog.Int("budget", n),
|
||||
slog.Int64("start_beat", currentBeat),
|
||||
slog.String("error", err.Error()))
|
||||
} else {
|
||||
c.config.Logger.Debug("Beat budget function completed successfully",
|
||||
slog.Int("budget", n),
|
||||
slog.Int64("start_beat", currentBeat))
|
||||
}
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
c.metrics.RecordBudgetCompleted(true) // Timed out
|
||||
c.config.Logger.Warn("Beat budget exceeded",
|
||||
slog.Int("budget", n),
|
||||
slog.Int64("start_beat", currentBeat),
|
||||
slog.Duration("timeout", timeout))
|
||||
return fmt.Errorf("beat budget of %d beats exceeded", n)
|
||||
}
|
||||
}
|
||||
|
||||
// GetCurrentBeat returns the current beat index
|
||||
func (c *client) GetCurrentBeat() int64 {
|
||||
c.beatMutex.RLock()
|
||||
defer c.beatMutex.RUnlock()
|
||||
return c.currentBeat
|
||||
}
|
||||
|
||||
// GetCurrentWindow returns the current window ID
|
||||
func (c *client) GetCurrentWindow() string {
|
||||
c.beatMutex.RLock()
|
||||
defer c.beatMutex.RUnlock()
|
||||
return c.currentWindow
|
||||
}
|
||||
|
||||
// IsInWindow checks if we're currently in the specified window
|
||||
func (c *client) IsInWindow(windowID string) bool {
|
||||
return c.GetCurrentWindow() == windowID
|
||||
}
|
||||
|
||||
// GetCurrentTempo returns the current tempo in BPM
|
||||
func (c *client) GetCurrentTempo() int {
|
||||
c.beatMutex.RLock()
|
||||
defer c.beatMutex.RUnlock()
|
||||
return c.currentTempo
|
||||
}
|
||||
|
||||
// GetTempoDrift calculates the drift between expected and actual tempo
|
||||
func (c *client) GetTempoDrift() time.Duration {
|
||||
c.beatMutex.RLock()
|
||||
defer c.beatMutex.RUnlock()
|
||||
|
||||
if len(c.tempoHistory) < 2 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Calculate average measured BPM from recent samples
|
||||
historyLen := len(c.tempoHistory)
|
||||
recentCount := 10
|
||||
if historyLen < recentCount {
|
||||
recentCount = historyLen
|
||||
}
|
||||
|
||||
recent := c.tempoHistory[historyLen-recentCount:]
|
||||
if len(recent) < 2 {
|
||||
recent = c.tempoHistory
|
||||
}
|
||||
|
||||
totalBPM := 0.0
|
||||
for _, sample := range recent {
|
||||
totalBPM += sample.ActualBPM
|
||||
}
|
||||
avgMeasuredBPM := totalBPM / float64(len(recent))
|
||||
|
||||
// Calculate drift
|
||||
expectedBeatDuration := 60.0 / float64(c.currentTempo)
|
||||
actualBeatDuration := 60.0 / avgMeasuredBPM
|
||||
|
||||
drift := actualBeatDuration - expectedBeatDuration
|
||||
return time.Duration(drift * float64(time.Second))
|
||||
}
|
||||
|
||||
// Health returns the current health status
|
||||
func (c *client) Health() HealthStatus {
|
||||
c.errorMutex.RLock()
|
||||
errors := make([]string, len(c.errors))
|
||||
copy(errors, c.errors)
|
||||
c.errorMutex.RUnlock()
|
||||
|
||||
c.beatMutex.RLock()
|
||||
timeDrift := time.Since(c.lastBeatTime)
|
||||
currentTempo := c.currentTempo
|
||||
|
||||
// Calculate measured BPM from recent tempo history
|
||||
measuredBPM := 60.0 // Default
|
||||
if len(c.tempoHistory) > 0 {
|
||||
historyLen := len(c.tempoHistory)
|
||||
recentCount := 5
|
||||
if historyLen < recentCount {
|
||||
recentCount = historyLen
|
||||
}
|
||||
|
||||
recent := c.tempoHistory[historyLen-recentCount:]
|
||||
totalBPM := 0.0
|
||||
for _, sample := range recent {
|
||||
totalBPM += sample.ActualBPM
|
||||
}
|
||||
measuredBPM = totalBPM / float64(len(recent))
|
||||
}
|
||||
c.beatMutex.RUnlock()
|
||||
|
||||
tempoDrift := c.GetTempoDrift()
|
||||
|
||||
return HealthStatus{
|
||||
Connected: c.nc != nil && c.nc.IsConnected(),
|
||||
LastBeat: c.GetCurrentBeat(),
|
||||
LastBeatTime: c.lastBeatTime,
|
||||
TimeDrift: timeDrift,
|
||||
ReconnectCount: c.reconnectCount,
|
||||
LocalDegradation: c.localDegradation,
|
||||
CurrentTempo: currentTempo,
|
||||
TempoDrift: tempoDrift,
|
||||
MeasuredBPM: measuredBPM,
|
||||
Errors: errors,
|
||||
}
|
||||
}
|
||||
110
vendor/github.com/chorus-services/backbeat/pkg/sdk/doc.go
generated
vendored
Normal file
110
vendor/github.com/chorus-services/backbeat/pkg/sdk/doc.go
generated
vendored
Normal file
@@ -0,0 +1,110 @@
|
||||
// Package sdk provides the BACKBEAT Go SDK for enabling CHORUS services
|
||||
// to become BACKBEAT-aware with beat synchronization and status emission.
|
||||
//
|
||||
// The BACKBEAT SDK enables services to:
|
||||
// - Subscribe to cluster-wide beat events with jitter tolerance
|
||||
// - Emit status claims with automatic metadata population
|
||||
// - Use beat budgets for timeout management
|
||||
// - Operate in local degradation mode when pulse unavailable
|
||||
// - Integrate comprehensive observability and health reporting
|
||||
//
|
||||
// # Quick Start
|
||||
//
|
||||
// config := sdk.DefaultConfig()
|
||||
// config.ClusterID = "chorus-dev"
|
||||
// config.AgentID = "my-service"
|
||||
// config.NATSUrl = "nats://localhost:4222"
|
||||
//
|
||||
// client := sdk.NewClient(config)
|
||||
//
|
||||
// client.OnBeat(func(beat sdk.BeatFrame) {
|
||||
// // Called every beat
|
||||
// client.EmitStatusClaim(sdk.StatusClaim{
|
||||
// State: "executing",
|
||||
// Progress: 0.5,
|
||||
// Notes: "Processing data",
|
||||
// })
|
||||
// })
|
||||
//
|
||||
// ctx := context.Background()
|
||||
// client.Start(ctx)
|
||||
// defer client.Stop()
|
||||
//
|
||||
// # Beat Subscription
|
||||
//
|
||||
// Register callbacks for beat and downbeat events:
|
||||
//
|
||||
// client.OnBeat(func(beat sdk.BeatFrame) {
|
||||
// // Called every beat (~1-4 times per second depending on tempo)
|
||||
// fmt.Printf("Beat %d\n", beat.BeatIndex)
|
||||
// })
|
||||
//
|
||||
// client.OnDownbeat(func(beat sdk.BeatFrame) {
|
||||
// // Called at the start of each bar (every 4 beats typically)
|
||||
// fmt.Printf("Bar started: %s\n", beat.WindowID)
|
||||
// })
|
||||
//
|
||||
// # Status Emission
|
||||
//
|
||||
// Emit status claims to report current state and progress:
|
||||
//
|
||||
// err := client.EmitStatusClaim(sdk.StatusClaim{
|
||||
// State: "executing", // executing|planning|waiting|review|done|failed
|
||||
// BeatsLeft: 10, // estimated beats remaining
|
||||
// Progress: 0.75, // progress ratio (0.0-1.0)
|
||||
// Notes: "Processing batch 5/10",
|
||||
// })
|
||||
//
|
||||
// # Beat Budgets
|
||||
//
|
||||
// Execute functions with beat-based timeouts:
|
||||
//
|
||||
// err := client.WithBeatBudget(10, func() error {
|
||||
// // This function has 10 beats to complete
|
||||
// return performLongRunningTask()
|
||||
// })
|
||||
//
|
||||
// if err != nil {
|
||||
// // Handle timeout or task error
|
||||
// log.Printf("Task failed or exceeded budget: %v", err)
|
||||
// }
|
||||
//
|
||||
// # Health and Observability
|
||||
//
|
||||
// Monitor client health and metrics:
|
||||
//
|
||||
// health := client.Health()
|
||||
// fmt.Printf("Connected: %v\n", health.Connected)
|
||||
// fmt.Printf("Last Beat: %d\n", health.LastBeat)
|
||||
// fmt.Printf("Reconnects: %d\n", health.ReconnectCount)
|
||||
//
|
||||
// # Local Degradation
|
||||
//
|
||||
// The SDK automatically handles network issues by entering local degradation mode:
|
||||
// - Generates synthetic beats when pulse service unavailable
|
||||
// - Uses fallback timing to maintain callback schedules
|
||||
// - Automatically recovers when pulse service returns
|
||||
// - Provides seamless operation during network partitions
|
||||
//
|
||||
// # Security
|
||||
//
|
||||
// The SDK implements BACKBEAT security requirements:
|
||||
// - Ed25519 signing of all status claims when key provided
|
||||
// - Required x-window-id and x-hlc headers
|
||||
// - Agent identification for proper message routing
|
||||
//
|
||||
// # Performance
|
||||
//
|
||||
// Designed for production use with:
|
||||
// - Beat callback latency target ≤5ms
|
||||
// - Timer drift ≤1% over 1 hour without leader
|
||||
// - Goroutine-safe concurrent operations
|
||||
// - Bounded memory usage for metrics and errors
|
||||
//
|
||||
// # Examples
|
||||
//
|
||||
// See the examples subdirectory for complete usage patterns:
|
||||
// - examples/simple_agent.go: Basic integration
|
||||
// - examples/task_processor.go: Beat budget usage
|
||||
// - examples/service_monitor.go: Health monitoring
|
||||
package sdk
|
||||
426
vendor/github.com/chorus-services/backbeat/pkg/sdk/internal.go
generated
vendored
Normal file
426
vendor/github.com/chorus-services/backbeat/pkg/sdk/internal.go
generated
vendored
Normal file
@@ -0,0 +1,426 @@
|
||||
package sdk
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// connect establishes connection to NATS with retry logic
|
||||
func (c *client) connect() error {
|
||||
opts := []nats.Option{
|
||||
nats.ReconnectWait(c.config.ReconnectDelay),
|
||||
nats.MaxReconnects(c.config.MaxReconnects),
|
||||
nats.ReconnectHandler(func(nc *nats.Conn) {
|
||||
c.reconnectCount++
|
||||
c.metrics.RecordConnection()
|
||||
c.config.Logger.Info("NATS reconnected",
|
||||
"reconnect_count", c.reconnectCount,
|
||||
"url", nc.ConnectedUrl())
|
||||
}),
|
||||
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
||||
if err != nil {
|
||||
c.metrics.RecordDisconnection()
|
||||
c.addError(fmt.Sprintf("NATS disconnected: %v", err))
|
||||
c.config.Logger.Warn("NATS disconnected", "error", err)
|
||||
}
|
||||
}),
|
||||
nats.ClosedHandler(func(nc *nats.Conn) {
|
||||
c.metrics.RecordDisconnection()
|
||||
c.config.Logger.Info("NATS connection closed")
|
||||
}),
|
||||
}
|
||||
|
||||
nc, err := nats.Connect(c.config.NATSUrl, opts...)
|
||||
if err != nil {
|
||||
c.metrics.RecordError(fmt.Sprintf("NATS connection failed: %v", err))
|
||||
return fmt.Errorf("failed to connect to NATS: %w", err)
|
||||
}
|
||||
|
||||
c.nc = nc
|
||||
c.metrics.RecordConnection()
|
||||
c.config.Logger.Info("Connected to NATS", "url", nc.ConnectedUrl())
|
||||
return nil
|
||||
}
|
||||
|
||||
// beatSubscriptionLoop handles beat frame subscription with jitter tolerance
|
||||
func (c *client) beatSubscriptionLoop() {
|
||||
defer c.wg.Done()
|
||||
|
||||
subject := fmt.Sprintf("backbeat.beat.%s", c.config.ClusterID)
|
||||
|
||||
// Subscribe to beat frames
|
||||
sub, err := c.nc.Subscribe(subject, c.handleBeatFrame)
|
||||
if err != nil {
|
||||
c.addError(fmt.Sprintf("failed to subscribe to beats: %v", err))
|
||||
c.config.Logger.Error("Failed to subscribe to beats", "error", err)
|
||||
return
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
c.config.Logger.Info("Beat subscription active", "subject", subject)
|
||||
|
||||
// Start local degradation timer for fallback timing
|
||||
localTicker := time.NewTicker(1 * time.Second) // Default 60 BPM fallback
|
||||
defer localTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
case <-localTicker.C:
|
||||
// Local degradation mode - generate synthetic beats if no recent beats
|
||||
c.beatMutex.RLock()
|
||||
timeSinceLastBeat := time.Since(c.lastBeatTime)
|
||||
c.beatMutex.RUnlock()
|
||||
|
||||
// If more than 2 beat intervals have passed, enter degradation mode
|
||||
if timeSinceLastBeat > 2*time.Second {
|
||||
if !c.localDegradation {
|
||||
c.localDegradation = true
|
||||
c.config.Logger.Warn("Entering local degradation mode",
|
||||
"time_since_last_beat", timeSinceLastBeat)
|
||||
}
|
||||
|
||||
c.handleLocalDegradationBeat()
|
||||
c.metrics.RecordLocalDegradation(timeSinceLastBeat)
|
||||
} else if c.localDegradation {
|
||||
// Exit degradation mode
|
||||
c.localDegradation = false
|
||||
c.config.Logger.Info("Exiting local degradation mode")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleBeatFrame processes incoming beat frames with jitter tolerance
|
||||
func (c *client) handleBeatFrame(msg *nats.Msg) {
|
||||
var beatFrame BeatFrame
|
||||
if err := json.Unmarshal(msg.Data, &beatFrame); err != nil {
|
||||
c.addError(fmt.Sprintf("failed to unmarshal beat frame: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Validate beat frame
|
||||
if beatFrame.Type != "backbeat.beatframe.v1" {
|
||||
c.addError(fmt.Sprintf("invalid beat frame type: %s", beatFrame.Type))
|
||||
return
|
||||
}
|
||||
|
||||
// Check for jitter tolerance
|
||||
now := time.Now()
|
||||
expectedTime := beatFrame.DeadlineAt.Add(-c.getBeatDuration()) // Beat should arrive one duration before deadline
|
||||
jitter := now.Sub(expectedTime)
|
||||
if jitter.Abs() > c.config.JitterTolerance {
|
||||
c.config.Logger.Debug("Beat jitter detected",
|
||||
"jitter", jitter,
|
||||
"tolerance", c.config.JitterTolerance,
|
||||
"beat_index", beatFrame.BeatIndex)
|
||||
}
|
||||
|
||||
// Update internal state
|
||||
c.beatMutex.Lock()
|
||||
c.currentBeat = beatFrame.BeatIndex
|
||||
c.currentWindow = beatFrame.WindowID
|
||||
c.currentHLC = beatFrame.HLC
|
||||
|
||||
// Track tempo changes and calculate actual BPM
|
||||
if c.currentTempo != beatFrame.TempoBPM {
|
||||
c.lastTempo = c.currentTempo
|
||||
c.currentTempo = beatFrame.TempoBPM
|
||||
}
|
||||
|
||||
// Calculate actual BPM from inter-beat timing
|
||||
actualBPM := 60.0 // Default
|
||||
if !c.lastBeatTime.IsZero() {
|
||||
interBeatDuration := now.Sub(c.lastBeatTime)
|
||||
if interBeatDuration > 0 {
|
||||
actualBPM = 60.0 / interBeatDuration.Seconds()
|
||||
}
|
||||
}
|
||||
|
||||
// Record tempo sample for drift analysis
|
||||
sample := tempoSample{
|
||||
BeatIndex: beatFrame.BeatIndex,
|
||||
Tempo: beatFrame.TempoBPM,
|
||||
MeasuredTime: now,
|
||||
ActualBPM: actualBPM,
|
||||
}
|
||||
|
||||
c.tempoHistory = append(c.tempoHistory, sample)
|
||||
// Keep only last 100 samples
|
||||
if len(c.tempoHistory) > 100 {
|
||||
c.tempoHistory = c.tempoHistory[1:]
|
||||
}
|
||||
|
||||
c.lastBeatTime = now
|
||||
c.beatMutex.Unlock()
|
||||
|
||||
// Record beat metrics
|
||||
c.metrics.RecordBeat(beatFrame.DeadlineAt.Add(-c.getBeatDuration()), now, beatFrame.Downbeat)
|
||||
|
||||
// If we were in local degradation mode, exit it
|
||||
if c.localDegradation {
|
||||
c.localDegradation = false
|
||||
c.config.Logger.Info("Exiting local degradation mode - beat received")
|
||||
}
|
||||
|
||||
// Execute beat callbacks with error handling
|
||||
c.callbackMutex.RLock()
|
||||
beatCallbacks := make([]func(BeatFrame), len(c.beatCallbacks))
|
||||
copy(beatCallbacks, c.beatCallbacks)
|
||||
|
||||
var downbeatCallbacks []func(BeatFrame)
|
||||
if beatFrame.Downbeat {
|
||||
downbeatCallbacks = make([]func(BeatFrame), len(c.downbeatCallbacks))
|
||||
copy(downbeatCallbacks, c.downbeatCallbacks)
|
||||
}
|
||||
c.callbackMutex.RUnlock()
|
||||
|
||||
// Execute callbacks in separate goroutines to prevent blocking
|
||||
for _, callback := range beatCallbacks {
|
||||
go c.safeExecuteCallback(callback, beatFrame, "beat")
|
||||
}
|
||||
|
||||
if beatFrame.Downbeat {
|
||||
for _, callback := range downbeatCallbacks {
|
||||
go c.safeExecuteCallback(callback, beatFrame, "downbeat")
|
||||
}
|
||||
}
|
||||
|
||||
c.config.Logger.Debug("Beat processed",
|
||||
"beat_index", beatFrame.BeatIndex,
|
||||
"downbeat", beatFrame.Downbeat,
|
||||
"phase", beatFrame.Phase,
|
||||
"window_id", beatFrame.WindowID)
|
||||
}
|
||||
|
||||
// handleLocalDegradationBeat generates synthetic beats during network issues
|
||||
func (c *client) handleLocalDegradationBeat() {
|
||||
c.beatMutex.Lock()
|
||||
c.currentBeat++
|
||||
|
||||
// Generate synthetic beat frame
|
||||
now := time.Now()
|
||||
beatFrame := BeatFrame{
|
||||
Type: "backbeat.beatframe.v1",
|
||||
ClusterID: c.config.ClusterID,
|
||||
BeatIndex: c.currentBeat,
|
||||
Downbeat: (c.currentBeat-1)%4 == 0, // Assume 4/4 time signature
|
||||
Phase: "degraded",
|
||||
HLC: fmt.Sprintf("%d-0", now.UnixNano()),
|
||||
DeadlineAt: now.Add(time.Second), // 1 second deadline in degradation
|
||||
TempoBPM: 2, // Default 2 BPM (30-second beats) - reasonable for distributed systems
|
||||
WindowID: c.generateDegradedWindowID(c.currentBeat),
|
||||
}
|
||||
|
||||
c.currentWindow = beatFrame.WindowID
|
||||
c.currentHLC = beatFrame.HLC
|
||||
c.lastBeatTime = now
|
||||
c.beatMutex.Unlock()
|
||||
|
||||
// Execute callbacks same as normal beats
|
||||
c.callbackMutex.RLock()
|
||||
beatCallbacks := make([]func(BeatFrame), len(c.beatCallbacks))
|
||||
copy(beatCallbacks, c.beatCallbacks)
|
||||
|
||||
var downbeatCallbacks []func(BeatFrame)
|
||||
if beatFrame.Downbeat {
|
||||
downbeatCallbacks = make([]func(BeatFrame), len(c.downbeatCallbacks))
|
||||
copy(downbeatCallbacks, c.downbeatCallbacks)
|
||||
}
|
||||
c.callbackMutex.RUnlock()
|
||||
|
||||
for _, callback := range beatCallbacks {
|
||||
go c.safeExecuteCallback(callback, beatFrame, "degraded-beat")
|
||||
}
|
||||
|
||||
if beatFrame.Downbeat {
|
||||
for _, callback := range downbeatCallbacks {
|
||||
go c.safeExecuteCallback(callback, beatFrame, "degraded-downbeat")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// safeExecuteCallback executes a callback with panic recovery
|
||||
func (c *client) safeExecuteCallback(callback func(BeatFrame), beat BeatFrame, callbackType string) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
errMsg := fmt.Sprintf("panic in %s callback: %v", callbackType, r)
|
||||
c.addError(errMsg)
|
||||
c.metrics.RecordError(errMsg)
|
||||
c.config.Logger.Error("Callback panic recovered",
|
||||
"type", callbackType,
|
||||
"panic", r,
|
||||
"beat_index", beat.BeatIndex)
|
||||
}
|
||||
}()
|
||||
|
||||
start := time.Now()
|
||||
callback(beat)
|
||||
duration := time.Since(start)
|
||||
|
||||
// Record callback latency metrics
|
||||
c.metrics.RecordCallbackLatency(duration, callbackType)
|
||||
|
||||
// Warn about slow callbacks
|
||||
if duration > 5*time.Millisecond {
|
||||
c.config.Logger.Warn("Slow callback detected",
|
||||
"type", callbackType,
|
||||
"duration", duration,
|
||||
"beat_index", beat.BeatIndex)
|
||||
}
|
||||
}
|
||||
|
||||
// validateStatusClaim validates a status claim
|
||||
func (c *client) validateStatusClaim(claim *StatusClaim) error {
|
||||
if claim.State == "" {
|
||||
return fmt.Errorf("state is required")
|
||||
}
|
||||
|
||||
validStates := map[string]bool{
|
||||
"executing": true,
|
||||
"planning": true,
|
||||
"waiting": true,
|
||||
"review": true,
|
||||
"done": true,
|
||||
"failed": true,
|
||||
}
|
||||
|
||||
if !validStates[claim.State] {
|
||||
return fmt.Errorf("invalid state: must be one of [executing, planning, waiting, review, done, failed], got '%s'", claim.State)
|
||||
}
|
||||
|
||||
if claim.Progress < 0.0 || claim.Progress > 1.0 {
|
||||
return fmt.Errorf("progress must be between 0.0 and 1.0, got %f", claim.Progress)
|
||||
}
|
||||
|
||||
if claim.BeatsLeft < 0 {
|
||||
return fmt.Errorf("beats_left must be non-negative, got %d", claim.BeatsLeft)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// signStatusClaim signs a status claim using Ed25519 (BACKBEAT-REQ-044)
|
||||
func (c *client) signStatusClaim(claim *StatusClaim) error {
|
||||
if c.config.SigningKey == nil {
|
||||
return fmt.Errorf("signing key not configured")
|
||||
}
|
||||
|
||||
// Create canonical representation for signing
|
||||
canonical, err := json.Marshal(claim)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal claim for signing: %w", err)
|
||||
}
|
||||
|
||||
// Sign the canonical representation
|
||||
signature := ed25519.Sign(c.config.SigningKey, canonical)
|
||||
|
||||
// Add signature to notes (temporary until proper signature field added)
|
||||
claim.Notes += fmt.Sprintf(" [sig:%x]", signature)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createHeaders creates NATS headers with required security information
|
||||
func (c *client) createHeaders() nats.Header {
|
||||
headers := make(nats.Header)
|
||||
|
||||
// Add window ID header (BACKBEAT-REQ-044)
|
||||
headers.Add("x-window-id", c.GetCurrentWindow())
|
||||
|
||||
// Add HLC header (BACKBEAT-REQ-044)
|
||||
headers.Add("x-hlc", c.getCurrentHLC())
|
||||
|
||||
// Add agent ID for routing
|
||||
headers.Add("x-agent-id", c.config.AgentID)
|
||||
|
||||
return headers
|
||||
}
|
||||
|
||||
// getCurrentHLC returns the current HLC timestamp
|
||||
func (c *client) getCurrentHLC() string {
|
||||
c.beatMutex.RLock()
|
||||
defer c.beatMutex.RUnlock()
|
||||
|
||||
if c.currentHLC != "" {
|
||||
return c.currentHLC
|
||||
}
|
||||
|
||||
// Generate fallback HLC
|
||||
return fmt.Sprintf("%d-0", time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// getBeatDuration calculates the duration of a beat based on current tempo
|
||||
func (c *client) getBeatDuration() time.Duration {
|
||||
c.beatMutex.RLock()
|
||||
tempo := c.currentTempo
|
||||
c.beatMutex.RUnlock()
|
||||
|
||||
if tempo <= 0 {
|
||||
tempo = 60 // Default to 60 BPM if no tempo information available
|
||||
}
|
||||
|
||||
// Calculate beat duration: 60 seconds / BPM = seconds per beat
|
||||
return time.Duration(60.0/float64(tempo)*1000) * time.Millisecond
|
||||
}
|
||||
|
||||
// generateDegradedWindowID generates a window ID for degraded mode
|
||||
func (c *client) generateDegradedWindowID(beatIndex int64) string {
|
||||
// Use similar algorithm to regular window ID but mark as degraded
|
||||
input := fmt.Sprintf("%s:degraded:%d", c.config.ClusterID, beatIndex/4) // Assume 4-beat bars
|
||||
hash := sha256.Sum256([]byte(input))
|
||||
return fmt.Sprintf("deg-%x", hash)[:32]
|
||||
}
|
||||
|
||||
// addError adds an error to the error list with deduplication
|
||||
func (c *client) addError(err string) {
|
||||
c.errorMutex.Lock()
|
||||
defer c.errorMutex.Unlock()
|
||||
|
||||
// Keep only the last 10 errors to prevent memory leaks
|
||||
if len(c.errors) >= 10 {
|
||||
c.errors = c.errors[1:]
|
||||
}
|
||||
|
||||
timestampedErr := fmt.Sprintf("[%s] %s", time.Now().Format("15:04:05"), err)
|
||||
c.errors = append(c.errors, timestampedErr)
|
||||
|
||||
// Record error in metrics
|
||||
c.metrics.RecordError(timestampedErr)
|
||||
}
|
||||
|
||||
// Legacy compatibility functions for BACKBEAT-REQ-043
|
||||
|
||||
// ConvertLegacyBeat converts legacy {bar,beat} to beat_index with warning
|
||||
func (c *client) ConvertLegacyBeat(bar, beat int) int64 {
|
||||
c.legacyMutex.Lock()
|
||||
if !c.legacyWarned {
|
||||
c.config.Logger.Warn("Legacy {bar,beat} format detected - please migrate to beat_index",
|
||||
"bar", bar, "beat", beat)
|
||||
c.legacyWarned = true
|
||||
}
|
||||
c.legacyMutex.Unlock()
|
||||
|
||||
// Convert assuming 4 beats per bar (standard)
|
||||
return int64((bar-1)*4 + beat)
|
||||
}
|
||||
|
||||
// GetLegacyBeatInfo converts current beat_index to legacy {bar,beat} format
|
||||
func (c *client) GetLegacyBeatInfo() LegacyBeatInfo {
|
||||
beatIndex := c.GetCurrentBeat()
|
||||
if beatIndex <= 0 {
|
||||
return LegacyBeatInfo{Bar: 1, Beat: 1}
|
||||
}
|
||||
|
||||
// Convert assuming 4 beats per bar
|
||||
bar := int((beatIndex-1)/4) + 1
|
||||
beat := int((beatIndex-1)%4) + 1
|
||||
|
||||
return LegacyBeatInfo{Bar: bar, Beat: beat}
|
||||
}
|
||||
277
vendor/github.com/chorus-services/backbeat/pkg/sdk/metrics.go
generated
vendored
Normal file
277
vendor/github.com/chorus-services/backbeat/pkg/sdk/metrics.go
generated
vendored
Normal file
@@ -0,0 +1,277 @@
|
||||
package sdk
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Metrics provides comprehensive observability for the SDK
|
||||
type Metrics struct {
|
||||
// Connection metrics
|
||||
ConnectionStatus *expvar.Int
|
||||
ReconnectCount *expvar.Int
|
||||
ConnectionDuration *expvar.Int
|
||||
|
||||
// Beat metrics
|
||||
BeatsReceived *expvar.Int
|
||||
DownbeatsReceived *expvar.Int
|
||||
BeatJitterMS *expvar.Map
|
||||
BeatCallbackLatency *expvar.Map
|
||||
BeatMisses *expvar.Int
|
||||
LocalDegradationTime *expvar.Int
|
||||
|
||||
// Status emission metrics
|
||||
StatusClaimsEmitted *expvar.Int
|
||||
StatusClaimErrors *expvar.Int
|
||||
|
||||
// Budget metrics
|
||||
BudgetsCreated *expvar.Int
|
||||
BudgetsCompleted *expvar.Int
|
||||
BudgetsTimedOut *expvar.Int
|
||||
|
||||
// Error metrics
|
||||
TotalErrors *expvar.Int
|
||||
LastError *expvar.String
|
||||
|
||||
// Internal counters
|
||||
beatJitterSamples []float64
|
||||
jitterMutex sync.Mutex
|
||||
callbackLatencies []float64
|
||||
latencyMutex sync.Mutex
|
||||
}
|
||||
|
||||
// NewMetrics creates a new metrics instance with expvar integration
|
||||
func NewMetrics(prefix string) *Metrics {
|
||||
m := &Metrics{
|
||||
ConnectionStatus: expvar.NewInt(prefix + ".connection.status"),
|
||||
ReconnectCount: expvar.NewInt(prefix + ".connection.reconnects"),
|
||||
ConnectionDuration: expvar.NewInt(prefix + ".connection.duration_ms"),
|
||||
|
||||
BeatsReceived: expvar.NewInt(prefix + ".beats.received"),
|
||||
DownbeatsReceived: expvar.NewInt(prefix + ".beats.downbeats"),
|
||||
BeatJitterMS: expvar.NewMap(prefix + ".beats.jitter_ms"),
|
||||
BeatCallbackLatency: expvar.NewMap(prefix + ".beats.callback_latency_ms"),
|
||||
BeatMisses: expvar.NewInt(prefix + ".beats.misses"),
|
||||
LocalDegradationTime: expvar.NewInt(prefix + ".beats.degradation_ms"),
|
||||
|
||||
StatusClaimsEmitted: expvar.NewInt(prefix + ".status.claims_emitted"),
|
||||
StatusClaimErrors: expvar.NewInt(prefix + ".status.claim_errors"),
|
||||
|
||||
BudgetsCreated: expvar.NewInt(prefix + ".budgets.created"),
|
||||
BudgetsCompleted: expvar.NewInt(prefix + ".budgets.completed"),
|
||||
BudgetsTimedOut: expvar.NewInt(prefix + ".budgets.timed_out"),
|
||||
|
||||
TotalErrors: expvar.NewInt(prefix + ".errors.total"),
|
||||
LastError: expvar.NewString(prefix + ".errors.last"),
|
||||
|
||||
beatJitterSamples: make([]float64, 0, 100),
|
||||
callbackLatencies: make([]float64, 0, 100),
|
||||
}
|
||||
|
||||
// Initialize connection status to disconnected
|
||||
m.ConnectionStatus.Set(0)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// RecordConnection records connection establishment
|
||||
func (m *Metrics) RecordConnection() {
|
||||
m.ConnectionStatus.Set(1)
|
||||
m.ReconnectCount.Add(1)
|
||||
}
|
||||
|
||||
// RecordDisconnection records connection loss
|
||||
func (m *Metrics) RecordDisconnection() {
|
||||
m.ConnectionStatus.Set(0)
|
||||
}
|
||||
|
||||
// RecordBeat records a beat reception with jitter measurement
|
||||
func (m *Metrics) RecordBeat(expectedTime, actualTime time.Time, isDownbeat bool) {
|
||||
m.BeatsReceived.Add(1)
|
||||
if isDownbeat {
|
||||
m.DownbeatsReceived.Add(1)
|
||||
}
|
||||
|
||||
// Calculate and record jitter
|
||||
jitter := actualTime.Sub(expectedTime)
|
||||
jitterMS := float64(jitter.Nanoseconds()) / 1e6
|
||||
|
||||
m.jitterMutex.Lock()
|
||||
m.beatJitterSamples = append(m.beatJitterSamples, jitterMS)
|
||||
if len(m.beatJitterSamples) > 100 {
|
||||
m.beatJitterSamples = m.beatJitterSamples[1:]
|
||||
}
|
||||
|
||||
// Update jitter statistics
|
||||
if len(m.beatJitterSamples) > 0 {
|
||||
avg, p95, p99 := m.calculatePercentiles(m.beatJitterSamples)
|
||||
m.BeatJitterMS.Set("avg", &expvar.Float{})
|
||||
m.BeatJitterMS.Get("avg").(*expvar.Float).Set(avg)
|
||||
m.BeatJitterMS.Set("p95", &expvar.Float{})
|
||||
m.BeatJitterMS.Get("p95").(*expvar.Float).Set(p95)
|
||||
m.BeatJitterMS.Set("p99", &expvar.Float{})
|
||||
m.BeatJitterMS.Get("p99").(*expvar.Float).Set(p99)
|
||||
}
|
||||
m.jitterMutex.Unlock()
|
||||
}
|
||||
|
||||
// RecordBeatMiss records a missed beat
|
||||
func (m *Metrics) RecordBeatMiss() {
|
||||
m.BeatMisses.Add(1)
|
||||
}
|
||||
|
||||
// RecordCallbackLatency records callback execution latency
|
||||
func (m *Metrics) RecordCallbackLatency(duration time.Duration, callbackType string) {
|
||||
latencyMS := float64(duration.Nanoseconds()) / 1e6
|
||||
|
||||
m.latencyMutex.Lock()
|
||||
m.callbackLatencies = append(m.callbackLatencies, latencyMS)
|
||||
if len(m.callbackLatencies) > 100 {
|
||||
m.callbackLatencies = m.callbackLatencies[1:]
|
||||
}
|
||||
|
||||
// Update latency statistics
|
||||
if len(m.callbackLatencies) > 0 {
|
||||
avg, p95, p99 := m.calculatePercentiles(m.callbackLatencies)
|
||||
key := callbackType + "_avg"
|
||||
m.BeatCallbackLatency.Set(key, &expvar.Float{})
|
||||
m.BeatCallbackLatency.Get(key).(*expvar.Float).Set(avg)
|
||||
|
||||
key = callbackType + "_p95"
|
||||
m.BeatCallbackLatency.Set(key, &expvar.Float{})
|
||||
m.BeatCallbackLatency.Get(key).(*expvar.Float).Set(p95)
|
||||
|
||||
key = callbackType + "_p99"
|
||||
m.BeatCallbackLatency.Set(key, &expvar.Float{})
|
||||
m.BeatCallbackLatency.Get(key).(*expvar.Float).Set(p99)
|
||||
}
|
||||
m.latencyMutex.Unlock()
|
||||
}
|
||||
|
||||
// RecordLocalDegradation records time spent in local degradation mode
|
||||
func (m *Metrics) RecordLocalDegradation(duration time.Duration) {
|
||||
durationMS := duration.Nanoseconds() / 1e6
|
||||
m.LocalDegradationTime.Add(durationMS)
|
||||
}
|
||||
|
||||
// RecordStatusClaim records a status claim emission
|
||||
func (m *Metrics) RecordStatusClaim(success bool) {
|
||||
if success {
|
||||
m.StatusClaimsEmitted.Add(1)
|
||||
} else {
|
||||
m.StatusClaimErrors.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
// RecordBudget records budget creation and completion
|
||||
func (m *Metrics) RecordBudgetCreated() {
|
||||
m.BudgetsCreated.Add(1)
|
||||
}
|
||||
|
||||
func (m *Metrics) RecordBudgetCompleted(timedOut bool) {
|
||||
if timedOut {
|
||||
m.BudgetsTimedOut.Add(1)
|
||||
} else {
|
||||
m.BudgetsCompleted.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
// RecordError records an error
|
||||
func (m *Metrics) RecordError(err string) {
|
||||
m.TotalErrors.Add(1)
|
||||
m.LastError.Set(err)
|
||||
}
|
||||
|
||||
// calculatePercentiles calculates avg, p95, p99 for a slice of samples
|
||||
func (m *Metrics) calculatePercentiles(samples []float64) (avg, p95, p99 float64) {
|
||||
if len(samples) == 0 {
|
||||
return 0, 0, 0
|
||||
}
|
||||
|
||||
// Calculate average
|
||||
sum := 0.0
|
||||
for _, s := range samples {
|
||||
sum += s
|
||||
}
|
||||
avg = sum / float64(len(samples))
|
||||
|
||||
// Sort for percentiles (simple bubble sort for small slices)
|
||||
sorted := make([]float64, len(samples))
|
||||
copy(sorted, samples)
|
||||
|
||||
for i := 0; i < len(sorted); i++ {
|
||||
for j := 0; j < len(sorted)-i-1; j++ {
|
||||
if sorted[j] > sorted[j+1] {
|
||||
sorted[j], sorted[j+1] = sorted[j+1], sorted[j]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate percentiles
|
||||
p95Index := int(float64(len(sorted)) * 0.95)
|
||||
if p95Index >= len(sorted) {
|
||||
p95Index = len(sorted) - 1
|
||||
}
|
||||
p95 = sorted[p95Index]
|
||||
|
||||
p99Index := int(float64(len(sorted)) * 0.99)
|
||||
if p99Index >= len(sorted) {
|
||||
p99Index = len(sorted) - 1
|
||||
}
|
||||
p99 = sorted[p99Index]
|
||||
|
||||
return avg, p95, p99
|
||||
}
|
||||
|
||||
// Enhanced client with metrics integration
|
||||
func (c *client) initMetrics() {
|
||||
prefix := fmt.Sprintf("backbeat.sdk.%s", c.config.AgentID)
|
||||
c.metrics = NewMetrics(prefix)
|
||||
}
|
||||
|
||||
// Add metrics field to client struct (this would go in client.go)
|
||||
type clientWithMetrics struct {
|
||||
*client
|
||||
metrics *Metrics
|
||||
}
|
||||
|
||||
// Prometheus integration helper
|
||||
type PrometheusMetrics struct {
|
||||
// This would integrate with prometheus/client_golang
|
||||
// For now, we'll just use expvar which can be scraped
|
||||
}
|
||||
|
||||
// GetMetricsSnapshot returns a snapshot of all current metrics
|
||||
func (m *Metrics) GetMetricsSnapshot() map[string]interface{} {
|
||||
snapshot := make(map[string]interface{})
|
||||
|
||||
snapshot["connection_status"] = m.ConnectionStatus.Value()
|
||||
snapshot["reconnect_count"] = m.ReconnectCount.Value()
|
||||
snapshot["beats_received"] = m.BeatsReceived.Value()
|
||||
snapshot["downbeats_received"] = m.DownbeatsReceived.Value()
|
||||
snapshot["beat_misses"] = m.BeatMisses.Value()
|
||||
snapshot["status_claims_emitted"] = m.StatusClaimsEmitted.Value()
|
||||
snapshot["status_claim_errors"] = m.StatusClaimErrors.Value()
|
||||
snapshot["budgets_created"] = m.BudgetsCreated.Value()
|
||||
snapshot["budgets_completed"] = m.BudgetsCompleted.Value()
|
||||
snapshot["budgets_timed_out"] = m.BudgetsTimedOut.Value()
|
||||
snapshot["total_errors"] = m.TotalErrors.Value()
|
||||
snapshot["last_error"] = m.LastError.Value()
|
||||
|
||||
return snapshot
|
||||
}
|
||||
|
||||
// Health check with metrics
|
||||
func (c *client) GetHealthWithMetrics() map[string]interface{} {
|
||||
health := map[string]interface{}{
|
||||
"status": c.Health(),
|
||||
}
|
||||
|
||||
if c.metrics != nil {
|
||||
health["metrics"] = c.metrics.GetMetricsSnapshot()
|
||||
}
|
||||
|
||||
return health
|
||||
}
|
||||
Reference in New Issue
Block a user