BACKBEAT Integration Guide for CHORUS 2.0.0 Projects
This guide explains how to integrate BACKBEAT contract validation into your CHORUS 2.0.0 project for guaranteed compatibility with the distributed orchestration system.
Overview
BACKBEAT provides three core interfaces for coordinated distributed execution:
- INT-A (BeatFrame): Rhythm coordination from Pulse service to all agents
- INT-B (StatusClaim): Agent status reporting to Reverb service
- INT-C (BarReport): Periodic summary reports from Reverb to all services
All messages must conform to the published JSON schemas to ensure reliable operation across the CHORUS ecosystem.
Quick Start
1. Add Contract Validation to Your CI Pipeline
GitHub Actions
name: BACKBEAT Contract Validation
on: [push, pull_request]
jobs:
  validate-backbeat:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Checkout BACKBEAT contracts
        uses: actions/checkout@v4
        with:
          repository: 'chorus-services/backbeat'
          path: 'backbeat-contracts'
      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: '1.22'
      - name: Validate BACKBEAT messages
        run: |
          cd backbeat-contracts/contracts/tests/integration
          make build
          # Messages live in your repository checkout, not under backbeat-contracts/
          ./backbeat-validate \
            --schemas ../../schemas \
            --dir "$GITHUB_WORKSPACE/your-messages-directory" \
            --exit-code
GitLab CI
validate-backbeat:
  stage: test
  image: golang:1.22
  before_script:
    - git clone https://github.com/chorus-services/backbeat.git /tmp/backbeat
    - cd /tmp/backbeat/contracts/tests/integration && make build
  script:
    - /tmp/backbeat/contracts/tests/integration/backbeat-validate
      --schemas /tmp/backbeat/contracts/schemas
      --dir $CI_PROJECT_DIR/messages
      --exit-code
2. Project Makefile Integration
Add to your project's Makefile:
# BACKBEAT contract validation
BACKBEAT_REPO = https://github.com/chorus-services/backbeat.git
BACKBEAT_DIR = .backbeat-contracts

$(BACKBEAT_DIR):
	git clone $(BACKBEAT_REPO) $(BACKBEAT_DIR)

validate-backbeat: $(BACKBEAT_DIR)
	cd $(BACKBEAT_DIR)/contracts/tests/integration && make build
	$(BACKBEAT_DIR)/contracts/tests/integration/backbeat-validate \
		--schemas $(BACKBEAT_DIR)/contracts/schemas \
		--dir messages \
		--exit-code

.PHONY: validate-backbeat
Message Implementation
Implementing BeatFrame Consumer (INT-A)
Your service should subscribe to beat frames from the Pulse service and respond appropriately:
// Example Go implementation
type BeatFrameHandler struct {
	currentBeat int64
	phase       string
}

func (h *BeatFrameHandler) HandleBeatFrame(frame BeatFrame) {
	// Validate the beat frame
	if err := validateBeatFrame(frame); err != nil {
		log.Errorf("Invalid beat frame: %v", err)
		return
	}

	// Update internal state
	h.currentBeat = frame.BeatIndex
	h.phase = frame.Phase

	// Execute phase-appropriate actions
	switch frame.Phase {
	case "plan":
		h.planPhase(frame)
	case "execute":
		h.executePhase(frame)
	case "review":
		h.reviewPhase(frame)
	}
}

func validateBeatFrame(frame BeatFrame) error {
	if frame.Type != "backbeat.beatframe.v1" {
		return fmt.Errorf("invalid message type: %s", frame.Type)
	}
	if frame.TempoBPM < 0.1 || frame.TempoBPM > 1000 {
		return fmt.Errorf("invalid tempo: %f", frame.TempoBPM)
	}
	// Add more validation as needed
	return nil
}
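The handler above relies on a BeatFrame type that this guide does not define. The following is a minimal sketch of one possible definition, built from the fields used in the examples in this guide. Only the JSON names type, beat_index, and deadline_at appear in this guide; the other JSON names, the RFC 3339 encoding of deadline_at, and the helper decodeBeatFrame are assumptions to check against the published schemas.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// BeatFrame mirrors the fields used by the examples in this guide.
// JSON names other than "type", "beat_index", and "deadline_at" are assumptions.
type BeatFrame struct {
	Type       string    `json:"type"`
	ClusterID  string    `json:"cluster_id"`
	BeatIndex  int64     `json:"beat_index"`
	Downbeat   bool      `json:"downbeat"`
	Phase      string    `json:"phase"`
	HLC        string    `json:"hlc"`
	DeadlineAt time.Time `json:"deadline_at"` // assumed to be RFC 3339 on the wire
	TempoBPM   float64   `json:"tempo_bpm"`
	WindowID   string    `json:"window_id"`
}

// decodeBeatFrame (hypothetical helper) parses raw JSON into a BeatFrame.
func decodeBeatFrame(raw []byte) (BeatFrame, error) {
	var frame BeatFrame
	if err := json.Unmarshal(raw, &frame); err != nil {
		return BeatFrame{}, fmt.Errorf("decode beat frame: %w", err)
	}
	return frame, nil
}

func main() {
	raw := []byte(`{"type":"backbeat.beatframe.v1","beat_index":100,"phase":"execute","deadline_at":"2025-01-01T00:00:30Z","tempo_bpm":2}`)
	frame, err := decodeBeatFrame(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(frame.BeatIndex, frame.Phase, frame.TempoBPM)
}
```

Decoding into a typed struct only checks that the JSON is well formed and the field types match; it does not replace validation against the schemas.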
Implementing StatusClaim Publisher (INT-B)
Your agents should publish status claims to the Reverb service:
func (agent *Agent) PublishStatusClaim(beatIndex int64, state string) error {
	claim := StatusClaim{
		Type:      "backbeat.statusclaim.v1",
		AgentID:   agent.ID,
		BeatIndex: beatIndex,
		State:     state,
		HLC:       agent.generateHLC(),
		Progress:  agent.calculateProgress(),
		Notes:     agent.getCurrentStatus(),
	}

	// Validate before sending
	if err := validateStatusClaim(claim); err != nil {
		return fmt.Errorf("invalid status claim: %w", err)
	}
	return agent.publisher.Publish("backbeat.statusclaims", claim)
}

func validateStatusClaim(claim StatusClaim) error {
	validStates := []string{"idle", "planning", "executing", "reviewing", "completed", "failed", "blocked", "helping"}
	for _, valid := range validStates {
		if claim.State == valid {
			return nil
		}
	}
	return fmt.Errorf("invalid state: %s", claim.State)
}
Implementing BarReport Consumer (INT-C)
Services should consume bar reports for cluster health awareness:
func (service *Service) HandleBarReport(report BarReport) {
	// Validate the bar report
	if err := validateBarReport(report); err != nil {
		log.Errorf("Invalid bar report: %v", err)
		return
	}

	// Update cluster health metrics
	service.updateClusterHealth(report)

	// React to issues
	if len(report.Issues) > 0 {
		service.handleClusterIssues(report.Issues)
	}

	// Store performance metrics
	service.storePerformanceMetrics(report.Performance)
}

func (service *Service) updateClusterHealth(report BarReport) {
	service.clusterMetrics.AgentsReporting = report.AgentsReporting
	// Avoid a NaN or Inf rate when no agents reported in this bar.
	if report.AgentsReporting > 0 {
		service.clusterMetrics.OnTimeRate = float64(report.OnTimeReviews) / float64(report.AgentsReporting)
	} else {
		service.clusterMetrics.OnTimeRate = 0
	}
	service.clusterMetrics.TempoDrift = report.TempoDriftMS
	service.clusterMetrics.SecretRotationsOK = report.SecretRotationsOK
}
Message Format Requirements
Common Patterns
All BACKBEAT messages share these patterns:
- Type Field: Must exactly match the schema constant
- HLC Timestamps: Format XXXX:XXXX:XXXX (hex digits)
- Beat Indices: Monotonically increasing integers ≥ 0
- Window IDs: 32-character hexadecimal strings
- Agent IDs: Pattern service:instance or agent:identifier
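The patterns above can be pre-checked with regular expressions before a message reaches full schema validation. This is a sketch under stated assumptions: the function checkCommonFields is hypothetical, both upper- and lowercase hex digits are accepted, and agent IDs are read as two non-empty segments of letters, digits, underscores, or hyphens separated by a colon. The published schemas remain the authority on the exact patterns.

```go
package main

import (
	"fmt"
	"regexp"
)

var (
	// Three colon-separated groups of four hex digits, e.g. "7ffd:0001:abcd".
	hlcPattern = regexp.MustCompile(`^[0-9a-fA-F]{4}:[0-9a-fA-F]{4}:[0-9a-fA-F]{4}$`)
	// Exactly 32 hex characters.
	windowIDPattern = regexp.MustCompile(`^[0-9a-fA-F]{32}$`)
	// Two non-empty segments separated by one colon (character set is an assumption).
	agentIDPattern = regexp.MustCompile(`^[A-Za-z0-9_-]+:[A-Za-z0-9_-]+$`)
)

// checkCommonFields (hypothetical helper) returns the first pattern violation found.
func checkCommonFields(hlc, windowID, agentID string, beatIndex int64) error {
	switch {
	case !hlcPattern.MatchString(hlc):
		return fmt.Errorf("invalid HLC %q", hlc)
	case !windowIDPattern.MatchString(windowID):
		return fmt.Errorf("invalid window ID %q", windowID)
	case !agentIDPattern.MatchString(agentID):
		return fmt.Errorf("invalid agent ID %q", agentID)
	case beatIndex < 0:
		return fmt.Errorf("invalid beat index %d", beatIndex)
	}
	return nil
}

func main() {
	windowID := "7e9b0e6c4c9a4e59b7f2d9a3c1b2e4d5"
	err := checkCommonFields("7ffd:0001:abcd", windowID, "agent:worker-1", 100)
	fmt.Println("well-formed fields accepted:", err == nil)

	err = checkCommonFields("7ffd:0001", windowID, "agent:worker-1", 100)
	fmt.Println("truncated HLC rejected:", err != nil)
}
```

This check covers only the shape of each field. Whether beat indices actually increase from one message to the next has to be tracked across messages.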
Validation Best Practices
- Always validate messages before processing
- Use schema validation in tests
- Handle validation errors gracefully
- Log validation failures for debugging
Example validation function:
func ValidateMessage(messageBytes []byte, expectedType string) error {
	// Parse and check type
	var msg map[string]interface{}
	if err := json.Unmarshal(messageBytes, &msg); err != nil {
		return fmt.Errorf("invalid JSON: %w", err)
	}

	msgType, ok := msg["type"].(string)
	if !ok || msgType != expectedType {
		return fmt.Errorf("expected type %s, got %s", expectedType, msgType)
	}

	// Use schema validation
	return validateWithSchema(messageBytes, expectedType)
}
Tempo and Timing Considerations
Understanding Tempo
- Default Tempo: 1 BPM (60-second beats)
- Minimum Tempo: 1 BPM (60-second beats for batch or recovery windows)
- Maximum Tempo: 24 BPM (2.5-second beats for high-frequency workloads)
Phase Timing
Each beat consists of three phases with equal time allocation:
Beat Duration = 60 / TempoBPM seconds
Phase Duration = Beat Duration / 3
Plan Phase: [0, Beat Duration / 3)
Execute Phase: [Beat Duration / 3, 2 * Beat Duration / 3)
Review Phase: [2 * Beat Duration / 3, Beat Duration)
Implementation Guidelines
- Respect Deadlines: Always complete phase work before deadline_at
- Handle Tempo Changes: Pulse may adjust tempo based on cluster performance
- Plan for Latency: Factor in network and processing delays
- Implement Backpressure: Report when unable to keep up with tempo
Error Handling
Schema Validation Failures
func HandleInvalidMessage(err error, messageBytes []byte) {
	log.Errorf("Schema validation failed: %v", err)
	log.Debugf("Invalid message: %s", string(messageBytes))

	// Send to dead letter queue or error handler
	errorHandler.HandleInvalidMessage(messageBytes, err)

	// Update metrics
	metrics.InvalidMessageCounter.Inc()
}
Network and Timing Issues
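func (agent *Agent) HandleMissedBeat(expectedBeat int64) {
	// Report missed beat
	claim := StatusClaim{
		Type:      "backbeat.statusclaim.v1",
		AgentID:   agent.ID,
		BeatIndex: expectedBeat,
		State:     "blocked",
		Notes:     "missed beat due to network issues",
		HLC:       agent.generateHLC(),
	}
	// Publish the claim; without this the report is never sent
	if err := agent.publisher.Publish("backbeat.statusclaims", claim); err != nil {
		log.Errorf("Failed to report missed beat %d: %v", expectedBeat, err)
	}

	// Try to catch up
	agent.attemptResynchronization()
}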
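Because beat indices increase monotonically, a missed beat can be detected by comparing each incoming index with the last one seen. The sketch below uses a hypothetical BeatTracker type that is not part of the BACKBEAT contracts. Each index it reports as missed could be passed to HandleMissedBeat.

```go
package main

import "fmt"

// BeatTracker (hypothetical) remembers the most recent beat index observed.
type BeatTracker struct {
	last int64
	seen bool
}

// Observe records beatIndex and returns the indices skipped since the last beat.
// accepted is false for a duplicate or out-of-order index, which is ignored.
func (t *BeatTracker) Observe(beatIndex int64) (missed []int64, accepted bool) {
	if t.seen && beatIndex <= t.last {
		return nil, false
	}
	if t.seen {
		for i := t.last + 1; i < beatIndex; i++ {
			missed = append(missed, i)
		}
	}
	t.last, t.seen = beatIndex, true
	return missed, true
}

func main() {
	var tracker BeatTracker
	for _, idx := range []int64{10, 11, 14, 14} {
		missed, accepted := tracker.Observe(idx)
		fmt.Printf("beat %d: accepted=%v missed=%v\n", idx, accepted, missed)
	}
}
```

After a long outage the gap can be large, so a production version would likely cap the number of indices it reports individually.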
Testing Your Integration
Unit Tests
func TestBeatFrameValidation(t *testing.T) {
	validFrame := BeatFrame{
		Type:       "backbeat.beatframe.v1",
		ClusterID:  "test",
		BeatIndex:  100,
		Downbeat:   false,
		Phase:      "execute",
		HLC:        "7ffd:0001:abcd",
		DeadlineAt: time.Now().Add(30 * time.Second),
		TempoBPM:   2.0,
		WindowID:   "7e9b0e6c4c9a4e59b7f2d9a3c1b2e4d5",
	}

	err := validateBeatFrame(validFrame)
	assert.NoError(t, err)
}
Integration Tests
Use the BACKBEAT validation tools:
# Test your message files
backbeat-validate --schemas /path/to/backbeat/schemas --dir messages/
# Test individual messages
echo '{"type":"backbeat.beatframe.v1",...}' | backbeat-validate --schemas /path/to/backbeat/schemas --message -
Load Testing
Consider tempo and message volume in your load tests:
func TestHighTempoHandling(t *testing.T) {
	// Simulate 10 BPM (6-second beats)
	tempo := 10.0
	// Convert in floating point so fractional beat lengths are not truncated
	beatInterval := time.Duration(60 / tempo * float64(time.Second))

	for i := 0; i < 100; i++ {
		frame := generateBeatFrame(i, tempo)
		handler.HandleBeatFrame(frame)
		time.Sleep(beatInterval)
	}

	// Verify no beats were dropped
	assert.Equal(t, 100, handler.processedBeats)
}
Production Deployment
Monitoring
Monitor these key metrics:
- Message Validation Rate: Percentage of valid messages received
- Beat Processing Latency: Time to process each beat phase
- Missed Beat Count: Number of beats that couldn't be processed on time
- Schema Version Compatibility: Ensure all services use compatible versions
Alerting
Set up alerts for:
- Schema validation failures > 1%
- Beat processing latency > 90% of phase duration
- Missed beats > 5% in any 10-minute window
- HLC timestamp drift > 5 seconds
Gradual Rollout
- Validate in CI: Ensure all messages pass schema validation
- Deploy to dev: Test with low tempo (0.5 BPM)
- Staging validation: Use production-like tempo and load
- Canary deployment: Roll out to small percentage of production traffic
- Full production: Monitor closely and be ready to roll back
Troubleshooting
Common Issues
- Wrong Message Type: Ensure the type field exactly matches the schema
- HLC Format: Must be in XXXX:XXXX:XXXX format with hex digits
- Window ID Length: Must be exactly 32 hex characters
- Enum Values: States, phases, severities must match schema exactly
- Numeric Ranges: Check min/max constraints (tempo, beat_index, etc.)
Debug Tools
# Validate specific message
backbeat-validate --schemas ./schemas --message '{"type":"backbeat.beatframe.v1",...}'
# Get detailed validation errors
backbeat-validate --schemas ./schemas --file message.json --json
# Validate entire directory with detailed output
backbeat-validate --schemas ./schemas --dir messages/ --json > validation-report.json
Schema Evolution
See schema-evolution.md for details on:
- Semantic versioning for schemas
- Backward compatibility requirements
- Migration strategies for schema updates
- Version compatibility matrix
Performance Guidelines
See tempo-guide.md for details on:
- Choosing appropriate tempo for your workload
- Optimizing beat processing performance
- Handling tempo changes gracefully
- Resource utilization best practices
Support
- Documentation: This contracts package contains the authoritative reference
- Examples: See contracts/tests/examples/ for valid/invalid message samples
- Issues: Report integration problems to the BACKBEAT team
- Updates: Monitor the contracts repository for schema updates