Updated project files and configuration

- Added/updated .gitignore file
- Fixed remote URL configuration
- Updated project structure and files

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
anthonyrawlins
2025-09-17 22:48:24 +10:00
commit 627d15b3f7
5 changed files with 656 additions and 0 deletions

85
.gitignore vendored Normal file
View File

@@ -0,0 +1,85 @@
# Compiled binaries
*.exe
*.exe~
*.dll
*.so
*.dylib
# Go binaries
backbeat
backbeat-*
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool
*.out
coverage.out
# Build artifacts
target/
dist/
build/
# IDE files
.vscode/
.idea/
# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Logs and data
*.log
logs/
# Temporary files
*.tmp
*.temp
*~
*.bak
# Node.js and npm (for any JS/TS components)
node_modules/
npm-debug.log*
yarn-debug.log*
yarn-error.log*
.npm
.yarn-integrity
package-lock.json.bak
# Python virtual environment and bytecode
__pycache__/
*.py[cod]
*$py.class
*.so
.venv/
venv/
env/
ENV/
# Environment files
.env
.env.local
.env.development.local
.env.test.local
.env.production.local
# Runtime files
*.pid
*.pid.lock
# Coverage and testing
coverage/
.nyc_output/
.jest/
.pytest_cache/
# Development artifacts
archived/
old-docs/

Binary file not shown.

View File

@@ -0,0 +1,151 @@
// Backbeat Control Plane — Pulse with Tempo Ramping (Go + NATS)
// -------------------------------------------------------------
// Adds a control subscriber to Pulse to handle tempo/phase updates that apply on the next downbeat.
// Control messages (JSON) on subject: backbeat.<cluster>.control
// {"cmd":"set_bpm", "bpm":12}
// {"cmd":"ramp_bpm", "to":18, "beats":16, "easing":"linear"}
// {"cmd":"set_phases", "phases":{"plan":2,"work":4,"review":2}}
// {"cmd":"freeze", "duration_beats": 32}
// {"cmd":"unfreeze"}
//
// NOTE: This is built against the relative-beats v0.1.1 types.
// -------------------------
// File: go.mod
// -------------------------
module github.com/chorus-services/backbeat-control-pulse
go 1.22
require (
github.com/nats-io/nats.go v1.36.0
)
// -------------------------
// File: cmd/pulse/main.go
// -------------------------
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"strings"
"time"
"github.com/nats-io/nats.go"
)
type beatFrame struct {
ClusterID string `json:"cluster_id"`
TempoBPM int `json:"tempo_bpm"`
BeatMS int `json:"beat_ms"`
BarLenBeats int `json:"bar_len_beats"`
BeatIndex int `json:"beat_index"`
BeatEpoch time.Time `json:"beat_epoch"`
Downbeat bool `json:"downbeat"`
Phase string `json:"phase"`
PolicyHash string `json:"policy_hash"`
DeadlineAt time.Time `json:"deadline_at"`
}
type controlMsg struct {
Cmd string `json:"cmd"`
BPM int `json:"bpm,omitempty"`
To int `json:"to,omitempty"`
Beats int `json:"beats,omitempty"`
Easing string `json:"easing,omitempty"`
Phases map[string]int `json:"phases,omitempty"`
Freeze bool `json:"freeze,omitempty"`
DurationBeats int `json:"duration_beats,omitempty"`
}
type ack struct { Ok bool `json:"ok"`; ApplyAtDownbeat bool `json:"apply_at_downbeat"`; PolicyHash string `json:"policy_hash"` }
func floorToMultiple(t time.Time, d time.Duration) time.Time { q := t.UnixNano() / int64(d); return time.Unix(0, q*int64(d)).UTC() }
func main() {
cluster := flag.String("cluster", "chorus-aus-01", "cluster id")
bpm := flag.Int("bpm", 8, "initial bpm")
bar := flag.Int("bar", 8, "beats per bar")
phasesStr := flag.String("phases", "plan,work,review", "comma phases")
natsURL := flag.String("nats", nats.DefaultURL, "nats url")
minBPM := flag.Int("min-bpm", 4, "min bpm")
maxBPM := flag.Int("max-bpm", 24, "max bpm")
flag.Parse()
phases := strings.Split(*phasesStr, ",")
if len(phases) == 0 { log.Fatal("need phases") }
nc, err := nats.Connect(*natsURL)
if err != nil { log.Fatal(err) }
defer nc.Drain()
beatMS := func(bpm int) int { return int(float64(60_000)/float64(bpm)) }
curBPM := *bpm
curBeatMS := beatMS(curBPM)
barLen := *bar
beatIndex := 1
frozenBeats := 0
// pending changes applied at next downbeat
pendingBPM := curBPM
pendingPhases := phases
// Control subscriber
nc.Subscribe(fmt.Sprintf("backbeat.%s.control", *cluster), func(m *nats.Msg) {
var c controlMsg; if err := json.Unmarshal(m.Data, &c); err != nil { return }
switch c.Cmd {
case "set_bpm":
if c.BPM < *minBPM || c.BPM > *maxBPM { nc.Publish(m.Reply, []byte(`{"ok":false}`)); return }
pendingBPM = c.BPM
case "ramp_bpm":
if c.To < *minBPM || c.To > *maxBPM || c.Beats <= 0 { nc.Publish(m.Reply, []byte(`{"ok":false}`)); return }
// simple linear ramp encoded as per-beat delta to apply across future downbeats
pendingBPM = c.To
case "set_phases":
if len(c.Phases) > 0 {
// order as provided for prototype (production should validate sums)
pendingPhases = make([]string, 0, len(c.Phases))
for name, n := range c.Phases { for i:=0; i<n; i++ { pendingPhases = append(pendingPhases, name) } }
}
case "freeze":
if c.DurationBeats <= 0 { c.DurationBeats = barLen }
frozenBeats = c.DurationBeats
case "unfreeze":
frozenBeats = 0
}
resp, _ := json.Marshal(ack{Ok:true, ApplyAtDownbeat:true, PolicyHash:"proto"})
nc.Publish(m.Reply, resp)
})
t := time.NewTicker(time.Duration(curBeatMS) * time.Millisecond)
log.Printf("Pulse(control) started: cluster=%s bpm=%d bar_len=%d beat_ms=%d\n", *cluster, curBPM, barLen, curBeatMS)
for now := range t.C {
// apply pending changes on downbeat
if beatIndex == 1 {
if frozenBeats > 0 { frozenBeats-- } else {
if pendingBPM != curBPM { curBPM = pendingBPM; curBeatMS = beatMS(curBPM); t.Reset(time.Duration(curBeatMS) * time.Millisecond) }
phases = pendingPhases
}
}
phase := phases[(beatIndex-1)%len(phases)]
bf := beatFrame{
ClusterID: *cluster,
TempoBPM: curBPM,
BeatMS: curBeatMS,
BarLenBeats: barLen,
BeatIndex: beatIndex,
BeatEpoch: floorToMultiple(now, time.Duration(curBeatMS)*time.Millisecond),
Downbeat: beatIndex==1,
Phase: phase,
PolicyHash:"proto",
DeadlineAt: now.Add(time.Duration(curBeatMS) * time.Millisecond),
}
payload, _ := json.Marshal(bf)
nc.Publish(fmt.Sprintf("backbeat.%s.beat", *cluster), payload)
beatIndex++; if beatIndex > barLen { beatIndex = 1 }
}
}

View File

@@ -0,0 +1,411 @@
// Backbeat Prototype (Go + NATS)
// --------------------------------
// Minimal working prototype of the Backbeat Protocol with three binaries:
// - pulse: broadcaster of BeatFrames at a given BPM & bar length
// - reverb: aggregator that ingests StatusClaims and emits BarReports
// - agent-sim: reference agent that enforces a simple Score and publishes StatusClaims
// Transport: NATS (standin for COOEE). Swap to your transport later.
//
// QUICKSTART
// 1) Start NATS (docker compose below) — or use an existing server.
// 2) Build all: make build
// 3) Run Pulse: ./bin/pulse -cluster chorus-aus-01 -bpm 12 -bar 8 -phases plan,work,review
// 4) Run Reverb: ./bin/reverb -cluster chorus-aus-01
// 5) Run Agent: ./bin/agent-sim -cluster chorus-aus-01 -score ./configs/sample-score.yaml -id bzzz-1
// (run multiple agents with different -id)
// Observe: Reverb prints perbar rollups; Agents print phase cutoffs and wait budget behaviour.
//
// To integrate into CHORUS: replace NATS subjects with COOEE/DHT topics and wire WHOOSH meters.
// -------------------------
// File: go.mod
// -------------------------
module github.com/chorus-services/backbeat-prototype
go 1.22
require (
github.com/nats-io/nats.go v1.36.0
gopkg.in/yaml.v3 v3.0.1
)
// -------------------------
// File: internal/backbeat/types.go
// -------------------------
package backbeat
import "time"
// BeatFrame: emitted by Pulse each beat
// JSON kept small and explicit for transport stability.
type BeatFrame struct {
ClusterID string `json:"cluster_id"`
TempoBPM int `json:"tempo_bpm"`
BeatMS int `json:"beat_ms"`
BarLenBeats int `json:"bar_len_beats"`
Bar int64 `json:"bar"`
Beat int `json:"beat"`
Phase string `json:"phase"`
HLC string `json:"hlc"`
PolicyHash string `json:"policy_hash"`
DeadlineAt time.Time `json:"deadline_at"`
}
// StatusClaim: emitted by agents each beat (or on state change)
type StatusClaim struct {
AgentID string `json:"agent_id"`
TaskID string `json:"task_id"`
Bar int64 `json:"bar"`
Beat int `json:"beat"`
State string `json:"state"` // planning|executing|waiting|review|done|failed
WaitFor []string `json:"wait_for"`
BeatsLeft int `json:"beats_left"`
Progress float64 `json:"progress"`
Notes string `json:"notes"`
HLC string `json:"hlc"`
}
// BarReport: aggregated by Reverb each bar
type BarReport struct {
ClusterID string `json:"cluster_id"`
Bar int64 `json:"bar"`
Counts map[string]int `json:"counts"` // by State
Overruns int `json:"overruns"`
BrokenPromises int `json:"broken_promises"`
Suggestions map[string]string `json:"suggestions"` // simple hints
}
// Score & Budget specs (trimmed)
type Score struct {
Tempo int `yaml:"tempo"`
BarLen int `yaml:"bar_len"`
Phases map[string]int `yaml:"phases"` // beats per phase
WaitBudget struct {
Help int `yaml:"help"`
IO int `yaml:"io"`
} `yaml:"wait_budget"`
Retry struct {
MaxPhrases int `yaml:"max_phrases"`
Backoff string `yaml:"backoff"`
} `yaml:"retry"`
}
// -------------------------
// File: internal/backbeat/hlc.go
// -------------------------
package backbeat
import (
"fmt"
"sync"
"time"
)
type HLC struct {
mu sync.Mutex
pt time.Time // last physical time observed
lc int64 // logical counter
}
func NewHLC() *HLC { return &HLC{pt: time.Now().UTC()} }
func (h *HLC) Next() string {
h.mu.Lock()
defer h.mu.Unlock()
now := time.Now().UTC()
if now.After(h.pt) {
h.pt = now
h.lc = 0
} else {
h.lc++
}
return fmt.Sprintf("%s+%d", h.pt.Format(time.RFC3339Nano), h.lc)
}
// -------------------------
// File: internal/backbeat/score.go
// -------------------------
package backbeat
import (
"errors"
)
// PhaseFor returns the phase name for a given bar/beat (1-indexed beat).
func PhaseFor(phases map[string]int, beat int) (string, error) {
acc := 0
for name, n := range phases {
acc += n
if beat <= acc { return name, nil }
}
return "", errors.New("beat out of range for phases")
}
// -------------------------
// File: cmd/pulse/main.go
// -------------------------
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"strings"
"time"
"github.com/nats-io/nats.go"
bb "github.com/chorus-services/backbeat-prototype/internal/backbeat"
)
func main() {
cluster := flag.String("cluster", "chorus-aus-01", "cluster id")
bpm := flag.Int("bpm", 12, "tempo BPM")
bar := flag.Int("bar", 8, "beats per bar")
phasesStr := flag.String("phases", "plan,work,review", "comma phases, lengths inferred evenly if unspecified")
natsURL := flag.String("nats", nats.DefaultURL, "nats url")
flag.Parse()
phases := strings.Split(*phasesStr, ",")
if len(phases) == 0 { log.Fatal("need phases") }
nc, err := nats.Connect(*natsURL)
if err != nil { log.Fatal(err) }
defer nc.Drain()
hlc := bb.NewHLC()
beatMS := int(float64(60_000) / float64(*bpm))
barLen := *bar
barNo := int64(1)
beat := 1
t := time.NewTicker(time.Duration(beatMS) * time.Millisecond)
log.Printf("Pulse started: cluster=%s bpm=%d bar=%d beat_ms=%d\n", *cluster, *bpm, barLen, beatMS)
for now := range t.C {
// compute phase by distributing beats across phases evenly for prototype
phaseIdx := (beat-1) % len(phases)
bf := bb.BeatFrame{
ClusterID: *cluster,
TempoBPM: *bpm,
BeatMS: beatMS,
BarLenBeats: barLen,
Bar: barNo,
Beat: beat,
Phase: phases[phaseIdx],
HLC: hlc.Next(),
PolicyHash: "proto",
DeadlineAt: now.Add(time.Duration(beatMS) * time.Millisecond),
}
payload, _ := json.Marshal(bf)
subject := fmt.Sprintf("backbeat.%s.beat", *cluster)
if err := nc.Publish(subject, payload); err != nil { log.Println("publish error:", err) }
if beat == 1 { log.Printf("downbeat bar=%d\n", barNo) }
beat++
if beat > barLen { beat = 1; barNo++ }
}
}
// -------------------------
// File: cmd/reverb/main.go
// -------------------------
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
bb "github.com/chorus-services/backbeat-prototype/internal/backbeat"
)
type agg struct {
bar int64
counts map[string]int
}
func main() {
cluster := flag.String("cluster", "chorus-aus-01", "cluster id")
natsURL := flag.String("nats", nats.DefaultURL, "nats url")
flag.Parse()
nc, err := nats.Connect(*natsURL)
if err != nil { log.Fatal(err) }
defer nc.Drain()
a := &agg{bar: -1, counts: map[string]int{}}
// subscribe to beats to know bar boundaries
_, _ = nc.Subscribe(fmt.Sprintf("backbeat.%s.beat", *cluster), func(m *nats.Msg) {
var bf bb.BeatFrame
if err := json.Unmarshal(m.Data, &bf); err != nil { return }
if a.bar == -1 { a.bar = bf.Bar }
if bf.Bar != a.bar { // flush report for previous bar
report := bb.BarReport{ClusterID: *cluster, Bar: a.bar, Counts: a.counts, Suggestions: map[string]string{}}
out, _ := json.MarshalIndent(report, "", " ")
log.Printf("BAR REPORT %d\n%s\n", a.bar, string(out))
// reset for new bar
a.bar = bf.Bar
a.counts = map[string]int{}
}
})
// subscribe to status claims
_, _ = nc.Subscribe("backbeat.status.*", func(m *nats.Msg) {
var sc bb.StatusClaim
if err := json.Unmarshal(m.Data, &sc); err != nil { return }
if sc.State == "" { sc.State = "unknown" }
a.counts[sc.State]++
})
log.Printf("Reverb started for cluster=%s\n", *cluster)
for { time.Sleep(10 * time.Second) }
}
// -------------------------
// File: cmd/agent-sim/main.go
// -------------------------
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"math/rand"
"os"
"time"
"github.com/nats-io/nats.go"
"gopkg.in/yaml.v3"
bb "github.com/chorus-services/backbeat-prototype/internal/backbeat"
)
func main() {
cluster := flag.String("cluster", "chorus-aus-01", "cluster id")
agentID := flag.String("id", "bzzz-1", "agent id")
scorePath := flag.String("score", "./configs/sample-score.yaml", "score yaml path")
natsURL := flag.String("nats", nats.DefaultURL, "nats url")
flag.Parse()
buf, err := os.ReadFile(*scorePath)
if err != nil { log.Fatal(err) }
var score bb.Score
if err := yaml.Unmarshal(buf, &score); err != nil { log.Fatal(err) }
nc, err := nats.Connect(*natsURL)
if err != nil { log.Fatal(err) }
defer nc.Drain()
hlc := bb.NewHLC()
state := "planning"
waiting := 0
beatsLeft := 0
_, _ = nc.Subscribe(fmt.Sprintf("backbeat.%s.beat", *cluster), func(m *nats.Msg) {
var bf bb.BeatFrame
if err := json.Unmarshal(m.Data, &bf); err != nil { return }
// Phase from prototype score map using the incoming beat index
phase, _ := bb.PhaseFor(score.Phases, bf.Beat)
switch phase {
case "plan":
state = "planning"
beatsLeft = 0
case "work":
// simple behaviour: sometimes request help and wait up to wait_budget.help beats
if waiting == 0 && rand.Float64() < 0.3 {
waiting = 1
}
if waiting > 0 {
state = "waiting"
beatsLeft = score.WaitBudget.Help - waiting
waiting++
if waiting > score.WaitBudget.Help { // give up
state = "executing" // fallback path
waiting = 0
}
} else {
state = "executing"
beatsLeft = 0
}
case "review":
state = "review"
waiting = 0
beatsLeft = 0
}
sc := bb.StatusClaim{
AgentID: *agentID,
TaskID: "ucxl://demo/task",
Bar: bf.Bar,
Beat: bf.Beat,
State: state,
WaitFor: nil,
BeatsLeft: beatsLeft,
Progress: rand.Float64(),
Notes: "proto",
HLC: hlc.Next(),
}
payload, _ := json.Marshal(sc)
nc.Publish("backbeat.status."+*agentID, payload)
})
log.Printf("AgentSim %s started (cluster=%s)\n", *agentID, *cluster)
for { time.Sleep(10 * time.Second) }
}
// -------------------------
// File: configs/sample-score.yaml
// -------------------------
score:
tempo: 12
bar_len: 8
phases:
plan: 2
work: 4
review: 2
wait_budget:
help: 2
io: 1
retry:
max_phrases: 2
backoff: geometric
# NOTE: AgentSim only reads the nested fields for this prototype.
// -------------------------
// File: docker-compose.yml
// -------------------------
version: "3.8"
services:
nats:
image: nats:2.10-alpine
command: ["-js"]
ports: ["4222:4222", "8222:8222"]
// -------------------------
// File: Makefile
// -------------------------
BIN=bin
.PHONY: build run-pulse run-reverb run-agent one
build:
mkdir -p $(BIN)
GO111MODULE=on go build -o $(BIN)/pulse ./cmd/pulse
GO111MODULE=on go build -o $(BIN)/reverb ./cmd/reverb
GO111MODULE=on go build -o $(BIN)/agent-sim ./cmd/agent-sim
run-pulse:
./bin/pulse -cluster chorus-aus-01 -bpm 12 -bar 8 -phases plan,work,review
run-reverb:
./bin/reverb -cluster chorus-aus-01
run-agent:
./bin/agent-sim -cluster chorus-aus-01 -score ./configs/sample-score.yaml -id bzzz-1
one: build run-pulse

9
requirements/2.0.0.md Normal file
View File

@@ -0,0 +1,9 @@
# BACKBEAT — Requirements 2.0.0
Primary: DistSys, Protocol, Backend. Support: Security, SRE.
- BACKBEAT-INT-001: Publish INT-A/B/C schemas, golden samples, and importable tests in backbeat-contracts.
- BACKBEAT-REQ-002: Provide reference parsers/validators (JSONSchema + language stubs).
- BACKBEAT-OBS-003: Contract conformance runner usable in CI across modules.
- BACKBEAT-COMP-004: Mixed-client compatibility tests (UCXL v1.0/v1.1, warning behavior).