BACKBEAT/contracts/docs/integration-guide.md
2025-10-17 08:56:25 +11:00


# BACKBEAT Integration Guide for CHORUS 2.0.0 Projects
This guide explains how to integrate BACKBEAT contract validation into your CHORUS 2.0.0 project, so that the messages your services produce and consume are checked against the contracts the distributed orchestration system relies on.
## Overview
BACKBEAT provides three core interfaces for coordinated distributed execution:
- **INT-A (BeatFrame)**: Rhythm coordination from Pulse service to all agents
- **INT-B (StatusClaim)**: Agent status reporting to Reverb service
- **INT-C (BarReport)**: Periodic summary reports from Reverb to all services

All messages must conform to the published JSON schemas to ensure reliable operation across the CHORUS ecosystem.
## Quick Start
### 1. Add Contract Validation to Your CI Pipeline
#### GitHub Actions
```yaml
name: BACKBEAT Contract Validation
on: [push, pull_request]
jobs:
  validate-backbeat:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Checkout BACKBEAT contracts
        uses: actions/checkout@v4
        with:
          repository: 'chorus-services/backbeat'
          path: 'backbeat-contracts'
      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: '1.22'
      - name: Validate BACKBEAT messages
        run: |
          cd backbeat-contracts/contracts/tests/integration
          make build
          ./backbeat-validate \
            --schemas ../../schemas \
            --dir "$GITHUB_WORKSPACE/your-messages-directory" \
            --exit-code
```
#### GitLab CI
```yaml
validate-backbeat:
  stage: test
  image: golang:1.22
  before_script:
    - git clone https://github.com/chorus-services/backbeat.git /tmp/backbeat
    - cd /tmp/backbeat/contracts/tests/integration && make build
  script:
    - /tmp/backbeat/contracts/tests/integration/backbeat-validate
      --schemas /tmp/backbeat/contracts/schemas
      --dir $CI_PROJECT_DIR/messages
      --exit-code
```
### 2. Project Makefile Integration
Add to your project's `Makefile`:
```makefile
# BACKBEAT contract validation
BACKBEAT_REPO = https://github.com/chorus-services/backbeat.git
BACKBEAT_DIR = .backbeat-contracts

# Clone the contracts once; the directory acts as the target.
$(BACKBEAT_DIR):
	git clone $(BACKBEAT_REPO) $(BACKBEAT_DIR)

validate-backbeat: $(BACKBEAT_DIR)
	$(MAKE) -C $(BACKBEAT_DIR)/contracts/tests/integration build
	$(BACKBEAT_DIR)/contracts/tests/integration/backbeat-validate \
		--schemas $(BACKBEAT_DIR)/contracts/schemas \
		--dir messages \
		--exit-code

.PHONY: validate-backbeat
```
## Message Implementation
### Implementing BeatFrame Consumer (INT-A)
Your service should subscribe to beat frames from the Pulse service and run the work that belongs to each phase:
```go
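// Example Go implementation. planPhase, executePhase, and reviewPhase
// are your service's own methods.
import (
	"fmt"
	"log/slog"
)

// BeatFrameHandler tracks the most recent beat and phase seen from Pulse.
type BeatFrameHandler struct {
	currentBeat int64
	phase       string
}

func (h *BeatFrameHandler) HandleBeatFrame(frame BeatFrame) {
	// Reject malformed frames before touching any state.
	if err := validateBeatFrame(frame); err != nil {
		slog.Error("invalid beat frame", "err", err)
		return
	}

	// Update internal state.
	h.currentBeat = frame.BeatIndex
	h.phase = frame.Phase

	// Execute phase-appropriate actions.
	switch frame.Phase {
	case "plan":
		h.planPhase(frame)
	case "execute":
		h.executePhase(frame)
	case "review":
		h.reviewPhase(frame)
	default:
		slog.Warn("unknown phase", "phase", frame.Phase, "beat", frame.BeatIndex)
	}
}

func validateBeatFrame(frame BeatFrame) error {
	if frame.Type != "backbeat.beatframe.v1" {
		return fmt.Errorf("invalid message type: %s", frame.Type)
	}
	if frame.TempoBPM < 0.1 || frame.TempoBPM > 1000 {
		return fmt.Errorf("invalid tempo: %f", frame.TempoBPM)
	}
	if frame.BeatIndex < 0 {
		return fmt.Errorf("invalid beat index: %d", frame.BeatIndex)
	}
	// Add more validation as needed, or delegate to full schema validation.
	return nil
}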
```
### Implementing StatusClaim Publisher (INT-B)
Your agents should publish status claims to the Reverb service:
```go
import (
	"fmt"
	"slices"
)

func (agent *Agent) PublishStatusClaim(beatIndex int64, state string) error {
	claim := StatusClaim{
		Type:      "backbeat.statusclaim.v1",
		AgentID:   agent.ID,
		BeatIndex: beatIndex,
		State:     state,
		HLC:       agent.generateHLC(),
		Progress:  agent.calculateProgress(),
		Notes:     agent.getCurrentStatus(),
	}

	// Validate before sending so malformed claims never reach Reverb.
	if err := validateStatusClaim(claim); err != nil {
		return fmt.Errorf("invalid status claim: %w", err)
	}

	return agent.publisher.Publish("backbeat.statusclaims", claim)
}

// validStates lists the StatusClaim states accepted by INT-B.
var validStates = []string{"idle", "planning", "executing", "reviewing", "completed", "failed", "blocked", "helping"}

func validateStatusClaim(claim StatusClaim) error {
	if claim.Type != "backbeat.statusclaim.v1" {
		return fmt.Errorf("invalid message type: %s", claim.Type)
	}
	if !slices.Contains(validStates, claim.State) {
		return fmt.Errorf("invalid state: %s", claim.State)
	}
	return nil
}
```
### Implementing BarReport Consumer (INT-C)
Services should consume bar reports to track cluster health and react to reported issues:
```go
import "log/slog"

func (service *Service) HandleBarReport(report BarReport) {
	// Validate the bar report.
	if err := validateBarReport(report); err != nil {
		slog.Error("invalid bar report", "err", err)
		return
	}

	// Update cluster health metrics.
	service.updateClusterHealth(report)

	// React to issues.
	if len(report.Issues) > 0 {
		service.handleClusterIssues(report.Issues)
	}

	// Store performance metrics.
	service.storePerformanceMetrics(report.Performance)
}

func (service *Service) updateClusterHealth(report BarReport) {
	service.clusterMetrics.AgentsReporting = report.AgentsReporting
	// Avoid a NaN rate (0/0) when no agents reported in this bar.
	if report.AgentsReporting > 0 {
		service.clusterMetrics.OnTimeRate = float64(report.OnTimeReviews) / float64(report.AgentsReporting)
	} else {
		service.clusterMetrics.OnTimeRate = 0
	}
	service.clusterMetrics.TempoDrift = report.TempoDriftMS
	service.clusterMetrics.SecretRotationsOK = report.SecretRotationsOK
}
```
## Message Format Requirements
### Common Patterns
All BACKBEAT messages share these patterns:
1. **Type Field**: Must exactly match the schema constant
2. **HLC Timestamps**: Format `XXXX:XXXX:XXXX` (hex digits)
3. **Beat Indices**: Monotonically increasing integers ≥ 0
4. **Window IDs**: 32-character hexadecimal strings
5. **Agent IDs**: Pattern `service:instance` or `agent:identifier`
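
A minimal sketch of format checks for the shared fields above, using Go's standard `regexp` package. The patterns are read off the prose rather than the schemas: hex digits are accepted in either case, and agent IDs are assumed to be two non-empty parts separated by a single colon. `validHLC`, `validWindowID`, and `validAgentID` are hypothetical helpers; the JSON schemas remain the authoritative definition.

```go
package main

import "regexp"

var (
	// Three colon-separated groups of four hex digits, e.g. "7ffd:0001:abcd".
	hlcPattern = regexp.MustCompile(`^[0-9a-fA-F]{4}:[0-9a-fA-F]{4}:[0-9a-fA-F]{4}$`)
	// Exactly 32 hexadecimal characters.
	windowIDPattern = regexp.MustCompile(`^[0-9a-fA-F]{32}$`)
	// "prefix:identifier" with non-empty parts and no whitespace (assumed).
	agentIDPattern = regexp.MustCompile(`^[^:\s]+:[^:\s]+$`)
)

// validHLC reports whether s matches the XXXX:XXXX:XXXX hex format.
func validHLC(s string) bool { return hlcPattern.MatchString(s) }

// validWindowID reports whether s is exactly 32 hex characters.
func validWindowID(s string) bool { return windowIDPattern.MatchString(s) }

// validAgentID reports whether s looks like "service:instance" or "agent:identifier".
func validAgentID(s string) bool { return agentIDPattern.MatchString(s) }
```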
### Validation Best Practices
1. **Always validate messages before processing**
2. **Use schema validation in tests**
3. **Handle validation errors gracefully**
4. **Log validation failures for debugging**

Example validation function:
```go
import (
	"encoding/json"
	"fmt"
)

func ValidateMessage(messageBytes []byte, expectedType string) error {
	// Parse just enough to check the type field.
	var msg map[string]any
	if err := json.Unmarshal(messageBytes, &msg); err != nil {
		return fmt.Errorf("invalid JSON: %w", err)
	}
	msgType, ok := msg["type"].(string)
	if !ok {
		return fmt.Errorf("missing or non-string type field, expected %s", expectedType)
	}
	if msgType != expectedType {
		return fmt.Errorf("expected type %s, got %s", expectedType, msgType)
	}
	// Full schema validation (validateWithSchema is your own helper).
	return validateWithSchema(messageBytes, expectedType)
}
```
## Tempo and Timing Considerations
### Understanding Tempo
- **Default Tempo**: 1 BPM (60-second beats)
- **Minimum Tempo**: 1 BPM (60-second beats for batch or recovery windows)
- **Maximum Tempo**: 24 BPM (~2.5-second beats for high-frequency workloads)
### Phase Timing
Each beat consists of three phases with equal time allocation:
```
Beat Duration = 60 / TempoBPM seconds
Phase Duration = Beat Duration / 3
Plan Phase: [0, Beat Duration / 3)
Execute Phase: [Beat Duration / 3, 2 * Beat Duration / 3)
Review Phase: [2 * Beat Duration / 3, Beat Duration)
```
### Implementation Guidelines
1. **Respect Deadlines**: Always complete phase work before `deadline_at`
2. **Handle Tempo Changes**: Pulse may adjust tempo based on cluster performance
3. **Plan for Latency**: Factor in network and processing delays
4. **Implement Backpressure**: Report when unable to keep up with tempo
## Error Handling
### Schema Validation Failures
```go
import "log/slog"

func HandleInvalidMessage(err error, messageBytes []byte) {
	slog.Error("schema validation failed", "err", err)
	slog.Debug("invalid message", "body", string(messageBytes))

	// Send to a dead letter queue or error handler (errorHandler is your own component).
	errorHandler.HandleInvalidMessage(messageBytes, err)

	// Update metrics (metrics is your own registry).
	metrics.InvalidMessageCounter.Inc()
}
```
### Network and Timing Issues
```go
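import "log/slog"

func (agent *Agent) HandleMissedBeat(expectedBeat int64) {
	// Report the missed beat so Reverb can account for it.
	claim := StatusClaim{
		Type:      "backbeat.statusclaim.v1",
		AgentID:   agent.ID,
		BeatIndex: expectedBeat,
		State:     "blocked",
		Notes:     "missed beat due to network issues",
		HLC:       agent.generateHLC(),
	}
	if err := agent.publisher.Publish("backbeat.statusclaims", claim); err != nil {
		slog.Warn("failed to report missed beat", "beat", expectedBeat, "err", err)
	}

	// Try to catch up.
	agent.attemptResynchronization()
}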
```
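
Because beat indices increase monotonically, a consumer can detect missed beats by comparing each frame's index with the last one it processed. A sketch, with `beatTracker` as a hypothetical helper: each index it reports as missed could be passed to a handler such as `HandleMissedBeat` above, and frames it marks as not fresh (duplicates or stale redeliveries) can be dropped.

```go
package main

// beatTracker remembers the last beat index processed by this consumer.
type beatTracker struct {
	last    int64
	started bool
}

// observe returns the beat indices skipped between the previous frame and
// this one, and fresh=false for duplicate or out-of-order (stale) frames.
func (t *beatTracker) observe(beatIndex int64) (missed []int64, fresh bool) {
	if t.started && beatIndex <= t.last {
		return nil, false
	}
	if t.started {
		for b := t.last + 1; b < beatIndex; b++ {
			missed = append(missed, b)
		}
	}
	t.last, t.started = beatIndex, true
	return missed, true
}
```

After a long outage the gap can be large, so you may want to cap how many missed-beat claims you send before resynchronizing.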
## Testing Your Integration
### Unit Tests
```go
import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestBeatFrameValidation(t *testing.T) {
	validFrame := BeatFrame{
		Type:       "backbeat.beatframe.v1",
		ClusterID:  "test",
		BeatIndex:  100,
		Downbeat:   false,
		Phase:      "execute",
		HLC:        "7ffd:0001:abcd",
		DeadlineAt: time.Now().Add(30 * time.Second),
		TempoBPM:   2.0,
		WindowID:   "7e9b0e6c4c9a4e59b7f2d9a3c1b2e4d5",
	}
	err := validateBeatFrame(validFrame)
	assert.NoError(t, err)
}
```
### Integration Tests
Use the BACKBEAT validation tools:
```bash
# Test your message files
backbeat-validate --schemas /path/to/backbeat/schemas --dir messages/
# Test individual messages
echo '{"type":"backbeat.beatframe.v1",...}' | backbeat-validate --schemas /path/to/backbeat/schemas --message -
```
### Load Testing
Consider tempo and message volume in your load tests:
```go
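import (
	"testing"
	"time"
	"github.com/stretchr/testify/assert"
)

func TestHighTempoHandling(t *testing.T) {
	if testing.Short() {
		t.Skip("real-time tempo test takes about 10 minutes (100 beats x 6 s)")
	}
	// Simulate 10 BPM (6-second beats).
	tempo := 10.0
	// Compute in float64 first so fractional intervals (e.g. 2.5 s at 24 BPM) are not truncated.
	beatInterval := time.Duration(60 / tempo * float64(time.Second))
	handler := &BeatFrameHandler{} // processedBeats: assumed counter incremented per frame
	for i := 0; i < 100; i++ {
		frame := generateBeatFrame(i, tempo) // your own test helper
		handler.HandleBeatFrame(frame)
		time.Sleep(beatInterval)
	}
	// Verify no beats were dropped.
	assert.Equal(t, 100, handler.processedBeats)
}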
```
## Production Deployment
### Monitoring
Monitor these key metrics:
1. **Message Validation Rate**: Percentage of valid messages received
2. **Beat Processing Latency**: Time to process each beat phase
3. **Missed Beat Count**: Number of beats that couldn't be processed on time
4. **Schema Version Compatibility**: Ensure all services use compatible versions
### Alerting
Set up alerts for:
- Schema validation failures > 1%
- Beat processing latency > 90% of phase duration
- Missed beats > 5% in any 10-minute window
- HLC timestamp drift > 5 seconds
### Gradual Rollout
1. **Validate in CI**: Ensure all messages pass schema validation
2. **Deploy to dev**: Test with low tempo (0.5 BPM)
3. **Staging validation**: Use production-like tempo and load
4. **Canary deployment**: Roll out to small percentage of production traffic
5. **Full production**: Monitor closely and be ready to rollback
## Troubleshooting
### Common Issues
1. **Wrong Message Type**: Ensure `type` field exactly matches schema
2. **HLC Format**: Must be `XXXX:XXXX:XXXX` format with hex digits
3. **Window ID Length**: Must be exactly 32 hex characters
4. **Enum Values**: States, phases, severities must match schema exactly
5. **Numeric Ranges**: Check min/max constraints (tempo, beat_index, etc.)
### Debug Tools
```bash
# Validate specific message
backbeat-validate --schemas ./schemas --message '{"type":"backbeat.beatframe.v1",...}'
# Get detailed validation errors
backbeat-validate --schemas ./schemas --file message.json --json
# Validate entire directory with detailed output
backbeat-validate --schemas ./schemas --dir messages/ --json > validation-report.json
```
## Schema Evolution
See [schema-evolution.md](schema-evolution.md) for details on:
- Semantic versioning for schemas
- Backward compatibility requirements
- Migration strategies for schema updates
- Version compatibility matrix
## Performance Guidelines
See [tempo-guide.md](tempo-guide.md) for details on:
- Choosing appropriate tempo for your workload
- Optimizing beat processing performance
- Handling tempo changes gracefully
- Resource utilization best practices
## Support
- **Documentation**: This contracts package contains the authoritative reference
- **Examples**: See `contracts/tests/examples/` for valid/invalid message samples
- **Issues**: Report integration problems to the BACKBEAT team
- **Updates**: Monitor the contracts repository for schema updates