# BACKBEAT Integration Guide for CHORUS 2.0.0 Projects

This guide explains how to add BACKBEAT contract validation to your CHORUS 2.0.0 project so that the messages it exchanges stay compatible with the distributed orchestration system.

## Overview

BACKBEAT provides three core interfaces for coordinated distributed execution:

- **INT-A (BeatFrame)**: Rhythm coordination from the Pulse service to all agents
- **INT-B (StatusClaim)**: Agent status reporting to the Reverb service
- **INT-C (BarReport)**: Periodic summary reports from Reverb to all services

All messages must conform to the published JSON schemas to ensure reliable operation across the CHORUS ecosystem.

## Quick Start

### 1. Add Contract Validation to Your CI Pipeline

#### GitHub Actions

```yaml
name: BACKBEAT Contract Validation

on: [push, pull_request]

jobs:
  validate-backbeat:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Checkout BACKBEAT contracts
        uses: actions/checkout@v4
        with:
          repository: 'chorus-services/backbeat'
          path: 'backbeat-contracts'

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: '1.22'

      - name: Validate BACKBEAT messages
        run: |
          cd backbeat-contracts/contracts/tests/integration
          make build
          # --dir is resolved from the workspace root, where your project is checked out
          ./backbeat-validate \
            --schemas ../../schemas \
            --dir "$GITHUB_WORKSPACE/your-messages-directory" \
            --exit-code
```

#### GitLab CI

```yaml
validate-backbeat:
  stage: test
  image: golang:1.22
  before_script:
    - git clone https://github.com/chorus-services/backbeat.git /tmp/backbeat
    - cd /tmp/backbeat/contracts/tests/integration && make build
  script:
    - /tmp/backbeat/contracts/tests/integration/backbeat-validate
      --schemas /tmp/backbeat/contracts/schemas
      --dir $CI_PROJECT_DIR/messages
      --exit-code
```

### 2. Project Makefile Integration

Add the following to your project's `Makefile` (recipe lines must be indented with a tab):

```makefile
# BACKBEAT contract validation
BACKBEAT_REPO = https://github.com/chorus-services/backbeat.git
BACKBEAT_DIR = .backbeat-contracts

$(BACKBEAT_DIR):
	git clone $(BACKBEAT_REPO) $(BACKBEAT_DIR)

validate-backbeat: $(BACKBEAT_DIR)
	$(MAKE) -C $(BACKBEAT_DIR)/contracts/tests/integration build
	$(BACKBEAT_DIR)/contracts/tests/integration/backbeat-validate \
		--schemas $(BACKBEAT_DIR)/contracts/schemas \
		--dir messages \
		--exit-code

.PHONY: validate-backbeat
```

## Message Implementation

### Implementing BeatFrame Consumer (INT-A)

Your service should subscribe to beat frames from the Pulse service and run the work that belongs to each phase:

```go
// Example Go implementation. BeatFrame and the planPhase/executePhase/reviewPhase
// methods are defined by your service. log is assumed to be a leveled logger
// such as logrus (the standard library log package has no Errorf).
type BeatFrameHandler struct {
	currentBeat int64
	phase       string
}

func (h *BeatFrameHandler) HandleBeatFrame(frame BeatFrame) {
	// Validate the beat frame
	if err := validateBeatFrame(frame); err != nil {
		log.Errorf("Invalid beat frame: %v", err)
		return
	}

	// Update internal state
	h.currentBeat = frame.BeatIndex
	h.phase = frame.Phase

	// Execute phase-appropriate actions
	switch frame.Phase {
	case "plan":
		h.planPhase(frame)
	case "execute":
		h.executePhase(frame)
	case "review":
		h.reviewPhase(frame)
	default:
		log.Errorf("Unknown phase: %q", frame.Phase)
	}
}

func validateBeatFrame(frame BeatFrame) error {
	if frame.Type != "backbeat.beatframe.v1" {
		return fmt.Errorf("invalid message type: %s", frame.Type)
	}
	// Tempo range from "Understanding Tempo" below (1-24 BPM);
	// confirm the bounds against the published schema.
	if frame.TempoBPM < 1 || frame.TempoBPM > 24 {
		return fmt.Errorf("invalid tempo: %f", frame.TempoBPM)
	}
	// Add more validation as needed
	return nil
}
```

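The handler above relies on a `BeatFrame` type that this guide never defines. A minimal sketch follows, with fields taken from the unit test later in this guide. The JSON tags `type`, `beat_index`, and `deadline_at` match names used elsewhere in this guide; the remaining tags (`cluster_id`, `downbeat`, `phase`, `hlc`, `tempo_bpm`, `window_id`) are assumed snake_case spellings and must be checked against the published INT-A schema. `decodeBeatFrame` is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// BeatFrame mirrors the INT-A fields used in this guide. Tags other than
// type, beat_index and deadline_at are assumptions; verify them against the schema.
type BeatFrame struct {
	Type       string    `json:"type"`
	ClusterID  string    `json:"cluster_id"`
	BeatIndex  int64     `json:"beat_index"`
	Downbeat   bool      `json:"downbeat"`
	Phase      string    `json:"phase"`
	HLC        string    `json:"hlc"`
	DeadlineAt time.Time `json:"deadline_at"` // RFC 3339 timestamp in JSON
	TempoBPM   float64   `json:"tempo_bpm"`
	WindowID   string    `json:"window_id"`
}

// decodeBeatFrame parses raw bytes and rejects anything that is not a v1 beat frame.
func decodeBeatFrame(data []byte) (BeatFrame, error) {
	var frame BeatFrame
	if err := json.Unmarshal(data, &frame); err != nil {
		return BeatFrame{}, fmt.Errorf("invalid JSON: %w", err)
	}
	if frame.Type != "backbeat.beatframe.v1" {
		return BeatFrame{}, fmt.Errorf("invalid message type: %q", frame.Type)
	}
	return frame, nil
}
```

Decoding into a typed struct rejects malformed JSON and wrongly typed fields up front; range checks such as `validateBeatFrame` still run after decoding.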
### Implementing StatusClaim Publisher (INT-B)

Your agents should publish status claims to the Reverb service:

```go
func (agent *Agent) PublishStatusClaim(beatIndex int64, state string) error {
	claim := StatusClaim{
		Type:      "backbeat.statusclaim.v1",
		AgentID:   agent.ID,
		BeatIndex: beatIndex,
		State:     state,
		HLC:       agent.generateHLC(),
		Progress:  agent.calculateProgress(),
		Notes:     agent.getCurrentStatus(),
	}

	// Validate before sending
	if err := validateStatusClaim(claim); err != nil {
		return fmt.Errorf("invalid status claim: %w", err)
	}

	return agent.publisher.Publish("backbeat.statusclaims", claim)
}

// validStates is the StatusClaim state enum; keep it in sync with the published schema.
var validStates = []string{"idle", "planning", "executing", "reviewing", "completed", "failed", "blocked", "helping"}

func validateStatusClaim(claim StatusClaim) error {
	// slices.Contains needs Go 1.21+ (the CI examples above use Go 1.22).
	if !slices.Contains(validStates, claim.State) {
		return fmt.Errorf("invalid state: %s", claim.State)
	}
	return nil
}
```

### Implementing BarReport Consumer (INT-C)

Services should consume bar reports to stay aware of cluster health:

```go
func (service *Service) HandleBarReport(report BarReport) {
	// Validate the bar report
	if err := validateBarReport(report); err != nil {
		log.Errorf("Invalid bar report: %v", err)
		return
	}

	// Update cluster health metrics
	service.updateClusterHealth(report)

	// React to issues
	if len(report.Issues) > 0 {
		service.handleClusterIssues(report.Issues)
	}

	// Store performance metrics
	service.storePerformanceMetrics(report.Performance)
}

func (service *Service) updateClusterHealth(report BarReport) {
	service.clusterMetrics.AgentsReporting = report.AgentsReporting
	// Avoid a NaN or +Inf rate when no agents reported in this bar.
	if report.AgentsReporting > 0 {
		service.clusterMetrics.OnTimeRate = float64(report.OnTimeReviews) / float64(report.AgentsReporting)
	} else {
		service.clusterMetrics.OnTimeRate = 0
	}
	service.clusterMetrics.TempoDrift = report.TempoDriftMS
	service.clusterMetrics.SecretRotationsOK = report.SecretRotationsOK
}
```

## Message Format Requirements

### Common Patterns

All BACKBEAT messages share these patterns:

1. **Type Field**: Must exactly match the schema constant
2. **HLC Timestamps**: Format `XXXX:XXXX:XXXX` (hex digits)
3. **Beat Indices**: Monotonically increasing integers ≥ 0
4. **Window IDs**: 32-character hexadecimal strings
5. **Agent IDs**: Pattern `service:instance` or `agent:identifier`

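The format rules above can be checked mechanically before a message is sent. The following is a minimal sketch, not the published schema: the regular expressions are transcribed from the list, while case-insensitive hex, the exact agent ID shape (two non-empty parts around one colon), and the decision to tolerate repeated beat indices are assumptions. All function and type names are illustrative.

```go
package main

import (
	"fmt"
	"regexp"
)

// Patterns transcribed from the Common Patterns list. Accepting upper-case hex
// and the agent ID shape are assumptions; the published schemas are authoritative.
var (
	hlcPattern      = regexp.MustCompile(`^[0-9a-fA-F]{4}:[0-9a-fA-F]{4}:[0-9a-fA-F]{4}$`)
	windowIDPattern = regexp.MustCompile(`^[0-9a-fA-F]{32}$`)
	agentIDPattern  = regexp.MustCompile(`^[^:\s]+:[^:\s]+$`) // e.g. "service:instance"
)

// checkPattern returns a descriptive error when value does not match re.
func checkPattern(field, value string, re *regexp.Regexp) error {
	if !re.MatchString(value) {
		return fmt.Errorf("invalid %s %q (want %s)", field, value, re.String())
	}
	return nil
}

func checkHLC(hlc string) error { return checkPattern("HLC", hlc, hlcPattern) }

func checkWindowID(id string) error { return checkPattern("window ID", id, windowIDPattern) }

func checkAgentID(id string) error { return checkPattern("agent ID", id, agentIDPattern) }

// beatTracker rejects negative beat indices and indices that go backwards.
// Repeats are tolerated because this guide does not say whether several
// messages may share one beat index (for example one frame per phase).
type beatTracker struct {
	last int64
	seen bool
}

func (t *beatTracker) observe(beat int64) error {
	if beat < 0 {
		return fmt.Errorf("beat index %d is negative", beat)
	}
	if t.seen && beat < t.last {
		return fmt.Errorf("beat index %d went backwards from %d", beat, t.last)
	}
	t.last, t.seen = beat, true
	return nil
}
```

Keep one `beatTracker` per message stream (for example per agent for status claims), since indices are only ordered within a stream.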
### Validation Best Practices

1. **Always validate messages before processing**
2. **Use schema validation in tests**
3. **Handle validation errors gracefully**
4. **Log validation failures for debugging**

Example validation function:

```go
func ValidateMessage(messageBytes []byte, expectedType string) error {
	// Parse and check type
	var msg map[string]interface{}
	if err := json.Unmarshal(messageBytes, &msg); err != nil {
		return fmt.Errorf("invalid JSON: %w", err)
	}

	msgType, ok := msg["type"].(string)
	if !ok {
		return fmt.Errorf("missing or non-string type field (expected %s)", expectedType)
	}
	if msgType != expectedType {
		return fmt.Errorf("expected type %s, got %s", expectedType, msgType)
	}

	// Use schema validation; validateWithSchema is a placeholder for your
	// JSON Schema validator loaded with the published BACKBEAT schemas.
	return validateWithSchema(messageBytes, expectedType)
}
```

## Tempo and Timing Considerations

### Understanding Tempo

- **Default Tempo**: 1 BPM (60-second beats)
- **Minimum Tempo**: 1 BPM (60-second beats for batch or recovery windows)
- **Maximum Tempo**: 24 BPM (2.5-second beats for high-frequency workloads)

### Phase Timing

Each beat consists of three phases of equal length:

```
Beat Duration = 60 / TempoBPM seconds
Phase Duration = Beat Duration / 3

Plan Phase:    [0, Beat Duration / 3)
Execute Phase: [Beat Duration / 3, 2 * Beat Duration / 3)
Review Phase:  [2 * Beat Duration / 3, Beat Duration)
```

### Implementation Guidelines

1. **Respect Deadlines**: Always complete phase work before `deadline_at`
2. **Handle Tempo Changes**: Pulse may adjust tempo based on cluster performance
3. **Plan for Latency**: Factor in network and processing delays
4. **Implement Backpressure**: Report when unable to keep up with tempo

## Error Handling

### Schema Validation Failures

```go
// log is assumed to be a leveled logger such as logrus; errorHandler and
// metrics stand in for your dead-letter handler and metrics registry.
func HandleInvalidMessage(err error, messageBytes []byte) {
	log.Errorf("Schema validation failed: %v", err)
	log.Debugf("Invalid message: %s", string(messageBytes))

	// Send to dead letter queue or error handler
	errorHandler.HandleInvalidMessage(messageBytes, err)

	// Update metrics
	metrics.InvalidMessageCounter.Inc()
}
```

### Network and Timing Issues

```go
func (agent *Agent) HandleMissedBeat(expectedBeat int64) {
	// Report the missed beat so Reverb can account for it
	claim := StatusClaim{
		Type:      "backbeat.statusclaim.v1",
		AgentID:   agent.ID,
		BeatIndex: expectedBeat,
		State:     "blocked",
		Notes:     "missed beat due to network issues",
		HLC:       agent.generateHLC(),
	}
	if err := agent.publisher.Publish("backbeat.statusclaims", claim); err != nil {
		log.Errorf("Failed to publish missed-beat claim: %v", err)
	}

	// Try to catch up
	agent.attemptResynchronization()
}
```

## Testing Your Integration

### Unit Tests

```go
// assert is github.com/stretchr/testify/assert.
func TestBeatFrameValidation(t *testing.T) {
	validFrame := BeatFrame{
		Type:       "backbeat.beatframe.v1",
		ClusterID:  "test",
		BeatIndex:  100,
		Downbeat:   false,
		Phase:      "execute",
		HLC:        "7ffd:0001:abcd",
		DeadlineAt: time.Now().Add(30 * time.Second),
		TempoBPM:   2.0,
		WindowID:   "7e9b0e6c4c9a4e59b7f2d9a3c1b2e4d5",
	}

	err := validateBeatFrame(validFrame)
	assert.NoError(t, err)
}
```

### Integration Tests

Use the BACKBEAT validation tools:

```bash
# Test your message files
backbeat-validate --schemas /path/to/backbeat/schemas --dir messages/

# Test individual messages
echo '{"type":"backbeat.beatframe.v1",...}' | backbeat-validate --schemas /path/to/backbeat/schemas --message -
```

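### Load Testing

Consider tempo and message volume in your load tests:

```go
func TestHighTempoHandling(t *testing.T) {
	// 100 beats at 6 s each take about 10 minutes, which is go test's default
	// timeout: skip in -short mode and run with a larger -timeout (e.g. 15m).
	if testing.Short() {
		t.Skip("long-running tempo test")
	}

	// Simulate 10 BPM (6-second beats)
	tempo := 10.0
	// Scale in float64 before converting so fractional beats (24 BPM = 2.5 s) are not truncated.
	beatInterval := time.Duration(60 / tempo * float64(time.Second))

	// processedBeats is a counter your handler is assumed to maintain.
	handler := &BeatFrameHandler{}
	for i := 0; i < 100; i++ {
		frame := generateBeatFrame(i, tempo)
		handler.HandleBeatFrame(frame)
		time.Sleep(beatInterval)
	}

	// Verify no beats were dropped
	assert.Equal(t, 100, handler.processedBeats)
}
```
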
## Production Deployment

### Monitoring

Monitor these key metrics:

1. **Message Validation Rate**: Percentage of valid messages received
2. **Beat Processing Latency**: Time to process each beat phase
3. **Missed Beat Count**: Number of beats that couldn't be processed on time
4. **Schema Version Compatibility**: Ensure all services use compatible versions

### Alerting

Set up alerts for:

- Schema validation failures > 1%
- Beat processing latency > 90% of phase duration
- Missed beats > 5% in any 10-minute window
- HLC timestamp drift > 5 seconds

### Gradual Rollout

1. **Validate in CI**: Ensure all messages pass schema validation
2. **Deploy to dev**: Test at the minimum tempo (1 BPM)
3. **Staging validation**: Use production-like tempo and load
4. **Canary deployment**: Roll out to a small percentage of production traffic
5. **Full production**: Monitor closely and be ready to roll back

## Troubleshooting

### Common Issues

1. **Wrong Message Type**: Ensure `type` field exactly matches schema
2. **HLC Format**: Must be `XXXX:XXXX:XXXX` format with hex digits
3. **Window ID Length**: Must be exactly 32 hex characters
4. **Enum Values**: States, phases, severities must match schema exactly
5. **Numeric Ranges**: Check min/max constraints (tempo, beat_index, etc.)

### Debug Tools

```bash
# Validate specific message
backbeat-validate --schemas ./schemas --message '{"type":"backbeat.beatframe.v1",...}'

# Get detailed validation errors
backbeat-validate --schemas ./schemas --file message.json --json

# Validate entire directory with detailed output
backbeat-validate --schemas ./schemas --dir messages/ --json > validation-report.json
```

## Schema Evolution

See [schema-evolution.md](schema-evolution.md) for details on:

- Semantic versioning for schemas
- Backward compatibility requirements
- Migration strategies for schema updates
- Version compatibility matrix

## Performance Guidelines

See [tempo-guide.md](tempo-guide.md) for details on:

- Choosing appropriate tempo for your workload
- Optimizing beat processing performance
- Handling tempo changes gracefully
- Resource utilization best practices

## Support

- **Documentation**: This contracts package contains the authoritative reference
- **Examples**: See `contracts/tests/examples/` for valid/invalid message samples
- **Issues**: Report integration problems to the BACKBEAT team
- **Updates**: Monitor the contracts repository for schema updates