Files
BACKBEAT/contracts/docs/integration-guide.md
2025-10-17 08:56:25 +11:00

12 KiB

BACKBEAT Integration Guide for CHORUS 2.0.0 Projects

This guide explains how to integrate BACKBEAT contract validation into your CHORUS 2.0.0 project for guaranteed compatibility with the distributed orchestration system.

Overview

BACKBEAT provides three core interfaces for coordinated distributed execution:

  • INT-A (BeatFrame): Rhythm coordination from Pulse service to all agents
  • INT-B (StatusClaim): Agent status reporting to Reverb service
  • INT-C (BarReport): Periodic summary reports from Reverb to all services

All messages must conform to the published JSON schemas to ensure reliable operation across the CHORUS ecosystem.

Quick Start

1. Add Contract Validation to Your CI Pipeline

GitHub Actions

name: BACKBEAT Contract Validation

on: [push, pull_request]

jobs:
  validate-backbeat:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    
    - name: Checkout BACKBEAT contracts
      uses: actions/checkout@v4
      with:
        repository: 'chorus-services/backbeat'
        path: 'backbeat-contracts'
        
    - name: Set up Go
      uses: actions/setup-go@v4
      with:
        go-version: '1.22'
        
    - name: Validate BACKBEAT messages
      run: |
        cd backbeat-contracts/contracts/tests/integration
        make build
        ./backbeat-validate \
          --schemas ../../schemas \
          --dir ../../../your-messages-directory \
          --exit-code

GitLab CI

validate-backbeat:
  stage: test
  image: golang:1.22
  before_script:
    - git clone https://github.com/chorus-services/backbeat.git /tmp/backbeat
    - cd /tmp/backbeat/contracts/tests/integration && make build
  script:
    - /tmp/backbeat/contracts/tests/integration/backbeat-validate 
        --schemas /tmp/backbeat/contracts/schemas 
        --dir $CI_PROJECT_DIR/messages 
        --exit-code

2. Project Makefile Integration

Add to your project's Makefile:

# BACKBEAT contract validation
BACKBEAT_REPO = https://github.com/chorus-services/backbeat.git
BACKBEAT_DIR = .backbeat-contracts

$(BACKBEAT_DIR):
	git clone $(BACKBEAT_REPO) $(BACKBEAT_DIR)

validate-backbeat: $(BACKBEAT_DIR)
	cd $(BACKBEAT_DIR)/contracts/tests/integration && make build
	$(BACKBEAT_DIR)/contracts/tests/integration/backbeat-validate \
		--schemas $(BACKBEAT_DIR)/contracts/schemas \
		--dir messages \
		--exit-code

.PHONY: validate-backbeat

Message Implementation

Implementing BeatFrame Consumer (INT-A)

Your service should subscribe to beat frames from the Pulse service and respond appropriately:

// Example Go implementation
type BeatFrameHandler struct {
    currentBeat int64
    phase       string
}

func (h *BeatFrameHandler) HandleBeatFrame(frame BeatFrame) {
    // Validate the beat frame
    if err := validateBeatFrame(frame); err != nil {
        log.Errorf("Invalid beat frame: %v", err)
        return
    }
    
    // Update internal state
    h.currentBeat = frame.BeatIndex
    h.phase = frame.Phase
    
    // Execute phase-appropriate actions
    switch frame.Phase {
    case "plan":
        h.planPhase(frame)
    case "execute":
        h.executePhase(frame)
    case "review":
        h.reviewPhase(frame)
    }
}

func validateBeatFrame(frame BeatFrame) error {
    if frame.Type != "backbeat.beatframe.v1" {
        return fmt.Errorf("invalid message type: %s", frame.Type)
    }
    if frame.TempoBPM < 0.1 || frame.TempoBPM > 1000 {
        return fmt.Errorf("invalid tempo: %f", frame.TempoBPM)
    }
    // Add more validation as needed
    return nil
}

Implementing StatusClaim Publisher (INT-B)

Your agents should publish status claims to the Reverb service:

func (agent *Agent) PublishStatusClaim(beatIndex int64, state string) error {
    claim := StatusClaim{
        Type:      "backbeat.statusclaim.v1",
        AgentID:   agent.ID,
        BeatIndex: beatIndex,
        State:     state,
        HLC:       agent.generateHLC(),
        Progress:  agent.calculateProgress(),
        Notes:     agent.getCurrentStatus(),
    }
    
    // Validate before sending
    if err := validateStatusClaim(claim); err != nil {
        return fmt.Errorf("invalid status claim: %w", err)
    }
    
    return agent.publisher.Publish("backbeat.statusclaims", claim)
}

func validateStatusClaim(claim StatusClaim) error {
    validStates := []string{"idle", "planning", "executing", "reviewing", "completed", "failed", "blocked", "helping"}
    for _, valid := range validStates {
        if claim.State == valid {
            return nil
        }
    }
    return fmt.Errorf("invalid state: %s", claim.State)
}

Implementing BarReport Consumer (INT-C)

Services should consume bar reports for cluster health awareness:

func (service *Service) HandleBarReport(report BarReport) {
    // Validate the bar report
    if err := validateBarReport(report); err != nil {
        log.Errorf("Invalid bar report: %v", err)
        return
    }
    
    // Update cluster health metrics
    service.updateClusterHealth(report)
    
    // React to issues
    if len(report.Issues) > 0 {
        service.handleClusterIssues(report.Issues)
    }
    
    // Store performance metrics
    service.storePerformanceMetrics(report.Performance)
}

func (service *Service) updateClusterHealth(report BarReport) {
    service.clusterMetrics.AgentsReporting = report.AgentsReporting
    service.clusterMetrics.OnTimeRate = float64(report.OnTimeReviews) / float64(report.AgentsReporting)
    service.clusterMetrics.TempoDrift = report.TempoDriftMS
    service.clusterMetrics.SecretRotationsOK = report.SecretRotationsOK
}

Message Format Requirements

Common Patterns

All BACKBEAT messages share these patterns:

  1. Type Field: Must exactly match the schema constant
  2. HLC Timestamps: Format XXXX:XXXX:XXXX (hex digits)
  3. Beat Indices: Monotonically increasing integers ≥ 0
  4. Window IDs: 32-character hexadecimal strings
  5. Agent IDs: Pattern service:instance or agent:identifier

Validation Best Practices

  1. Always validate messages before processing
  2. Use schema validation in tests
  3. Handle validation errors gracefully
  4. Log validation failures for debugging

Example validation function:

func ValidateMessage(messageBytes []byte, expectedType string) error {
    // Parse and check type
    var msg map[string]interface{}
    if err := json.Unmarshal(messageBytes, &msg); err != nil {
        return fmt.Errorf("invalid JSON: %w", err)
    }
    
    msgType, ok := msg["type"].(string)
    if !ok || msgType != expectedType {
        return fmt.Errorf("expected type %s, got %s", expectedType, msgType)
    }
    
    // Use schema validation
    return validateWithSchema(messageBytes, expectedType)
}

Tempo and Timing Considerations

Understanding Tempo

  • Default Tempo: 1 BPM (60-second beats)
  • Minimum Tempo: 1 BPM (60-second beats for batch or recovery windows)
  • Maximum Tempo: 24 BPM (~2.5-second beats for high-frequency workloads)

Phase Timing

Each beat consists of three phases with equal time allocation:

Beat Duration = 60 / TempoBPM seconds
Phase Duration = Beat Duration / 3

Plan Phase:    [0, Beat Duration / 3)
Execute Phase: [Beat Duration / 3, 2 * Beat Duration / 3)  
Review Phase:  [2 * Beat Duration / 3, Beat Duration)

Implementation Guidelines

  1. Respect Deadlines: Always complete phase work before deadline_at
  2. Handle Tempo Changes: Pulse may adjust tempo based on cluster performance
  3. Plan for Latency: Factor in network and processing delays
  4. Implement Backpressure: Report when unable to keep up with tempo

Error Handling

Schema Validation Failures

func HandleInvalidMessage(err error, messageBytes []byte) {
    log.Errorf("Schema validation failed: %v", err)
    log.Debugf("Invalid message: %s", string(messageBytes))
    
    // Send to dead letter queue or error handler
    errorHandler.HandleInvalidMessage(messageBytes, err)
    
    // Update metrics
    metrics.InvalidMessageCounter.Inc()
}

Network and Timing Issues

func (agent *Agent) HandleMissedBeat(expectedBeat int64) {
    // Report missed beat
    claim := StatusClaim{
        Type:      "backbeat.statusclaim.v1",
        AgentID:   agent.ID,
        BeatIndex: expectedBeat,
        State:     "blocked",
        Notes:     "missed beat due to network issues",
        HLC:       agent.generateHLC(),
    }
    
    // Try to catch up
    agent.attemptResynchronization()
}

Testing Your Integration

Unit Tests

func TestBeatFrameValidation(t *testing.T) {
    validFrame := BeatFrame{
        Type:       "backbeat.beatframe.v1",
        ClusterID:  "test",
        BeatIndex:  100,
        Downbeat:   false,
        Phase:      "execute",
        HLC:        "7ffd:0001:abcd",
        DeadlineAt: time.Now().Add(30 * time.Second),
        TempoBPM:   2.0,
        WindowID:   "7e9b0e6c4c9a4e59b7f2d9a3c1b2e4d5",
    }
    
    err := validateBeatFrame(validFrame)
    assert.NoError(t, err)
}

Integration Tests

Use the BACKBEAT validation tools:

# Test your message files
backbeat-validate --schemas /path/to/backbeat/schemas --dir messages/

# Test individual messages  
echo '{"type":"backbeat.beatframe.v1",...}' | backbeat-validate --schemas /path/to/backbeat/schemas --message -

Load Testing

Consider tempo and message volume in your load tests:

func TestHighTempoHandling(t *testing.T) {
    // Simulate 10 BPM (6-second beats)
    tempo := 10.0
    beatInterval := time.Duration(60/tempo) * time.Second
    
    for i := 0; i < 100; i++ {
        frame := generateBeatFrame(i, tempo)
        handler.HandleBeatFrame(frame)
        time.Sleep(beatInterval)
    }
    
    // Verify no beats were dropped
    assert.Equal(t, 100, handler.processedBeats)
}

Production Deployment

Monitoring

Monitor these key metrics:

  1. Message Validation Rate: Percentage of valid messages received
  2. Beat Processing Latency: Time to process each beat phase
  3. Missed Beat Count: Number of beats that couldn't be processed on time
  4. Schema Version Compatibility: Ensure all services use compatible versions

Alerting

Set up alerts for:

  • Schema validation failures > 1%
  • Beat processing latency > 90% of phase duration
  • Missed beats > 5% in any 10-minute window
  • HLC timestamp drift > 5 seconds

Gradual Rollout

  1. Validate in CI: Ensure all messages pass schema validation
  2. Deploy to dev: Test with low tempo (0.5 BPM)
  3. Staging validation: Use production-like tempo and load
  4. Canary deployment: Roll out to small percentage of production traffic
  5. Full production: Monitor closely and be ready to rollback

Troubleshooting

Common Issues

  1. Wrong Message Type: Ensure type field exactly matches schema
  2. HLC Format: Must be XXXX:XXXX:XXXX format with hex digits
  3. Window ID Length: Must be exactly 32 hex characters
  4. Enum Values: States, phases, severities must match schema exactly
  5. Numeric Ranges: Check min/max constraints (tempo, beat_index, etc.)

Debug Tools

# Validate specific message
backbeat-validate --schemas ./schemas --message '{"type":"backbeat.beatframe.v1",...}'

# Get detailed validation errors
backbeat-validate --schemas ./schemas --file message.json --json

# Validate entire directory with detailed output
backbeat-validate --schemas ./schemas --dir messages/ --json > validation-report.json

Schema Evolution

See schema-evolution.md for details on:

  • Semantic versioning for schemas
  • Backward compatibility requirements
  • Migration strategies for schema updates
  • Version compatibility matrix

Performance Guidelines

See tempo-guide.md for details on:

  • Choosing appropriate tempo for your workload
  • Optimizing beat processing performance
  • Handling tempo changes gracefully
  • Resource utilization best practices

Support

  • Documentation: This contracts package contains the authoritative reference
  • Examples: See contracts/tests/examples/ for valid/invalid message samples
  • Issues: Report integration problems to the BACKBEAT team
  • Updates: Monitor the contracts repository for schema updates