# BACKBEAT Go SDK
|
|
|
|
The BACKBEAT Go SDK enables CHORUS services to become "BACKBEAT-aware" by providing client libraries for beat synchronization, status emission, and beat-budget management.
|
|
|
|
## Features
|
|
|
|
- **Beat Subscription (BACKBEAT-REQ-040)**: Subscribe to beat and downbeat events with jitter-tolerant scheduling
|
|
- **Status Emission (BACKBEAT-REQ-041)**: Emit status claims with automatic agent_id, task_id, and HLC population
|
|
- **Beat Budgets (BACKBEAT-REQ-042)**: Execute functions with beat-based timeouts and cancellation
|
|
- **Legacy Compatibility (BACKBEAT-REQ-043)**: Support for legacy `{bar,beat}` patterns with migration warnings
|
|
- **Security (BACKBEAT-REQ-044)**: Ed25519 signing and required headers for status claims
|
|
- **Local Degradation**: Continue operating when the pulse service is unavailable
|
|
- **Comprehensive Observability**: Metrics, health reporting, and performance monitoring
|
|
|
|
## Quick Start
|
|
|
|
```go
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ed25519"
|
|
"crypto/rand"
|
|
"log/slog"
|
|
|
|
"github.com/chorus-services/backbeat/pkg/sdk"
|
|
)
|
|
|
|
func main() {
|
|
// Generate signing key
|
|
_, signingKey, _ := ed25519.GenerateKey(rand.Reader)
|
|
|
|
// Configure SDK
|
|
config := sdk.DefaultConfig()
|
|
config.ClusterID = "chorus-dev"
|
|
config.AgentID = "my-service"
|
|
config.NATSUrl = "nats://localhost:4222"
|
|
config.SigningKey = signingKey
|
|
|
|
// Create client
|
|
client := sdk.NewClient(config)
|
|
|
|
// Register beat callback
|
|
client.OnBeat(func(beat sdk.BeatFrame) {
|
|
slog.Info("Beat received", "beat_index", beat.BeatIndex)
|
|
|
|
// Emit status
|
|
client.EmitStatusClaim(sdk.StatusClaim{
|
|
State: "executing",
|
|
BeatsLeft: 5,
|
|
Progress: 0.3,
|
|
Notes: "Processing data",
|
|
})
|
|
})
|
|
|
|
// Start client
|
|
ctx := context.Background()
|
|
if err := client.Start(ctx); err != nil {
|
|
panic(err)
|
|
}
|
|
defer client.Stop()
|
|
|
|
// Your service logic here...
|
|
select {}
|
|
}
|
|
```
|
|
|
|
## Configuration
|
|
|
|
### Basic Configuration
|
|
|
|
```go
|
|
config := &sdk.Config{
|
|
ClusterID: "your-cluster", // BACKBEAT cluster ID
|
|
AgentID: "your-agent", // Unique agent identifier
|
|
NATSUrl: "nats://localhost:4222", // NATS connection URL
|
|
}
|
|
```
|
|
|
|
### Advanced Configuration
|
|
|
|
```go
|
|
config := sdk.DefaultConfig()
|
|
config.ClusterID = "chorus-prod"
|
|
config.AgentID = "web-service-01"
|
|
config.NATSUrl = "nats://nats.cluster.local:4222"
|
|
config.SigningKey = loadSigningKey() // Ed25519 private key
|
|
config.JitterTolerance = 100 * time.Millisecond
|
|
config.ReconnectDelay = 2 * time.Second
|
|
config.MaxReconnects = 10 // -1 for infinite
|
|
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
|
```
|
|
|
|
## Core Features
|
|
|
|
### Beat Subscription
|
|
|
|
```go
|
|
// Register beat callback (called every beat)
|
|
client.OnBeat(func(beat sdk.BeatFrame) {
|
|
// Your beat logic here
|
|
fmt.Printf("Beat %d at %s\n", beat.BeatIndex, beat.DeadlineAt)
|
|
})
|
|
|
|
// Register downbeat callback (called at bar starts)
|
|
client.OnDownbeat(func(beat sdk.BeatFrame) {
|
|
// Your downbeat logic here
|
|
fmt.Printf("Bar started: %s\n", beat.WindowID)
|
|
})
|
|
```
|
|
|
|
### Status Emission
|
|
|
|
```go
|
|
// Basic status emission
|
|
err := client.EmitStatusClaim(sdk.StatusClaim{
|
|
State: "executing", // executing|planning|waiting|review|done|failed
|
|
BeatsLeft: 10, // estimated beats remaining
|
|
Progress: 0.75, // progress ratio (0.0-1.0)
|
|
Notes: "Processing batch 5/10",
|
|
})
|
|
|
|
// Advanced status with task tracking
|
|
err = client.EmitStatusClaim(sdk.StatusClaim{
|
|
TaskID: "task-12345", // auto-generated if empty
|
|
State: "waiting",
|
|
WaitFor: []string{"hmmm://thread/abc123"}, // dependencies
|
|
BeatsLeft: 0,
|
|
Progress: 1.0,
|
|
Notes: "Waiting for thread completion",
|
|
})
|
|
```
|
|
|
|
### Beat Budgets
|
|
|
|
```go
|
|
// Execute with beat-based timeout
|
|
err := client.WithBeatBudget(10, func() error {
|
|
// This function has 10 beats to complete
|
|
return performTask()
|
|
})
|
|
|
|
if err != nil {
|
|
// Handle timeout or task error
|
|
fmt.Printf("Task failed or exceeded budget: %v\n", err)
|
|
}
|
|
|
|
// Real-world example
|
|
err = client.WithBeatBudget(20, func() error {
|
|
// Database operation with beat budget
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
return database.ProcessBatch(ctx, batchData)
|
|
})
|
|
```
|
|
|
|
## Client Interface
|
|
|
|
```go
|
|
type Client interface {
|
|
// Beat subscription
|
|
OnBeat(callback func(BeatFrame)) error
|
|
OnDownbeat(callback func(BeatFrame)) error
|
|
|
|
// Status emission
|
|
EmitStatusClaim(claim StatusClaim) error
|
|
|
|
// Beat budgets
|
|
WithBeatBudget(n int, fn func() error) error
|
|
|
|
// Utilities
|
|
GetCurrentBeat() int64
|
|
GetCurrentWindow() string
|
|
IsInWindow(windowID string) bool
|
|
|
|
// Lifecycle
|
|
Start(ctx context.Context) error
|
|
Stop() error
|
|
Health() HealthStatus
|
|
}
|
|
```
|
|
|
|
## Examples
|
|
|
|
The SDK includes comprehensive examples:
|
|
|
|
- **[Simple Agent](examples/simple_agent.go)**: Basic beat subscription and status emission
|
|
- **[Task Processor](examples/task_processor.go)**: Beat budget usage for task timeout management
|
|
- **[Service Monitor](examples/service_monitor.go)**: Health monitoring with beat-aligned reporting
|
|
|
|
### Running Examples
|
|
|
|
```bash
|
|
# Simple agent example
|
|
go run pkg/sdk/examples/simple_agent.go
|
|
|
|
# Task processor with beat budgets
|
|
go run pkg/sdk/examples/task_processor.go
|
|
|
|
# Service monitor with health reporting
|
|
go run pkg/sdk/examples/service_monitor.go
|
|
```
|
|
|
|
## Observability
|
|
|
|
### Health Monitoring
|
|
|
|
```go
|
|
health := client.Health()
|
|
fmt.Printf("Connected: %v\n", health.Connected)
|
|
fmt.Printf("Last Beat: %d at %s\n", health.LastBeat, health.LastBeatTime)
|
|
fmt.Printf("Time Drift: %s\n", health.TimeDrift)
|
|
fmt.Printf("Reconnects: %d\n", health.ReconnectCount)
|
|
fmt.Printf("Local Degradation: %v\n", health.LocalDegradation)
|
|
```
|
|
|
|
### Metrics
|
|
|
|
The SDK exposes metrics via Go's `expvar` package:
|
|
|
|
- Connection metrics: status, reconnection count, duration
|
|
- Beat metrics: received, jitter, callback latency, misses
|
|
- Status metrics: claims emitted, errors
|
|
- Budget metrics: created, completed, timed out
|
|
- Error metrics: total count, last error
|
|
|
|
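Importing `expvar` registers a `/debug/vars` handler on `http.DefaultServeMux`. If your service serves that mux (for example on port 8080), the metrics are available at `http://localhost:8080/debug/vars`.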
|
|
|
|
### Logging
|
|
|
|
The SDK uses structured logging via `slog`:
|
|
|
|
```go
|
|
config.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
|
|
Level: slog.LevelDebug, // Set appropriate level
|
|
}))
|
|
```
|
|
|
|
## Error Handling
|
|
|
|
The SDK provides comprehensive error handling:
|
|
|
|
- **Connection Errors**: Automatic reconnection with exponential backoff
|
|
- **Beat Jitter**: Tolerance for network delays and timing variations
|
|
- **Callback Panics**: Recovery and logging without affecting other callbacks
|
|
- **Validation Errors**: Status claim validation with detailed error messages
|
|
- **Timeout Errors**: Beat budget timeouts with context cancellation
|
|
|
|
## Local Degradation
|
|
|
|
When the pulse service is unavailable, the SDK automatically enters local degradation mode:
|
|
|
|
- Generates synthetic beats to maintain callback timing
|
|
- Uses a fallback tempo of 60 BPM
|
|
- Marks beat frames with "degraded" phase
|
|
- Automatically recovers when pulse service returns
|
|
|
|
## Legacy Compatibility
|
|
|
|
Support for legacy `{bar,beat}` patterns (BACKBEAT-REQ-043):
|
|
|
|
```go
|
|
// Convert legacy format (logs warning once)
|
|
beatIndex := client.ConvertLegacyBeat(bar, beat)
|
|
|
|
// Get legacy format from current beat
|
|
legacy := client.GetLegacyBeatInfo()
|
|
fmt.Printf("Bar: %d, Beat: %d\n", legacy.Bar, legacy.Beat)
|
|
```
|
|
|
|
## Security
|
|
|
|
The SDK implements BACKBEAT security requirements:
|
|
|
|
- **Ed25519 Signatures**: All status claims are signed when a signing key is provided
|
|
- **Required Headers**: Includes `x-window-id` and `x-hlc` headers
|
|
- **Agent Identification**: Automatic `x-agent-id` header for routing
|
|
|
|
```go
|
|
// Configure signing
|
|
_, signingKey, _ := ed25519.GenerateKey(rand.Reader)
|
|
config.SigningKey = signingKey
|
|
```
|
|
|
|
## Performance
|
|
|
|
The SDK is designed for high performance:
|
|
|
|
- **Beat Callback Latency**: Target ≤5ms callback execution
|
|
- **Timer Drift**: ≤1% drift over 1 hour without a leader
|
|
- **Concurrency Safe**: All operations are goroutine-safe
|
|
- **Memory Efficient**: Bounded error lists and metric samples
|
|
|
|
## Integration Patterns
|
|
|
|
### Web Service Integration
|
|
|
|
```go
|
|
func main() {
|
|
// Initialize BACKBEAT client
|
|
client := sdk.NewClient(config)
|
|
client.OnBeat(func(beat sdk.BeatFrame) {
|
|
// Report web service status
|
|
client.EmitStatusClaim(sdk.StatusClaim{
|
|
State: "executing",
|
|
Progress: getRequestSuccessRate(),
|
|
Notes: fmt.Sprintf("Handling %d req/s", getCurrentRPS()),
|
|
})
|
|
})
|
|
|
|
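// Start the BACKBEAT client so beat callbacks fire
if err := client.Start(context.Background()); err != nil {
panic(err)
}
defer client.Stop()

// Expose client health and start the HTTP server
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
health := client.Health()
json.NewEncoder(w).Encode(health)
})
if err := http.ListenAndServe(":8080", nil); err != nil {
slog.Error("HTTP server stopped", "error", err)
}
}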
|
|
```
|
|
|
|
### Background Job Processor
|
|
|
|
```go
|
|
func processJobs(client sdk.Client) {
|
|
for job := range jobQueue {
|
|
// Use beat budget for job timeout
|
|
err := client.WithBeatBudget(job.MaxBeats, func() error {
|
|
return processJob(job)
|
|
})
|
|
|
|
if err != nil {
|
|
client.EmitStatusClaim(sdk.StatusClaim{
|
|
TaskID: job.ID,
|
|
State: "failed",
|
|
Notes: err.Error(),
|
|
})
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
## Testing
|
|
|
|
The SDK includes comprehensive test utilities:
|
|
|
|
```bash
|
|
# Run all tests
|
|
go test ./pkg/sdk/...
|
|
|
|
# Run with race detection
|
|
go test -race ./pkg/sdk/...
|
|
|
|
# Run benchmarks
|
|
go test -bench=. ./pkg/sdk/examples/
|
|
```
|
|
|
|
## Requirements
|
|
|
|
- Go 1.22 or later
|
|
- NATS server for messaging
|
|
- BACKBEAT pulse service running
|
|
- Network connectivity to cluster
|
|
|
|
## Contributing
|
|
|
|
1. Follow standard Go conventions
|
|
2. Include comprehensive tests
|
|
3. Update documentation for API changes
|
|
4. Ensure examples remain working
|
|
5. Maintain backward compatibility
|
|
|
|
## License
|
|
|
|
This SDK is part of the BACKBEAT project and follows the same licensing terms.