commit 627d15b3f7077333a1d69eeab79929625f312553 Author: anthonyrawlins Date: Wed Sep 17 22:48:24 2025 +1000 Updated project files and configuration - Added/updated .gitignore file - Fixed remote URL configuration - Updated project structure and files 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3f4624f --- /dev/null +++ b/.gitignore @@ -0,0 +1,85 @@ +# Compiled binaries +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Go binaries +backbeat +backbeat-* + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool +*.out +coverage.out + +# Build artifacts +target/ +dist/ +build/ + +# IDE files +.vscode/ +.idea/ + +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Logs and data +*.log +logs/ + +# Temporary files +*.tmp +*.temp +*~ +*.bak + +# Node.js and npm (for any JS/TS components) +node_modules/ +npm-debug.log* +yarn-debug.log* +yarn-error.log* +.npm +.yarn-integrity +package-lock.json.bak + +# Python virtual environment and bytecode +__pycache__/ +*.py[cod] +*$py.class +*.so +.venv/ +venv/ +env/ +ENV/ + +# Environment files +.env +.env.local +.env.development.local +.env.test.local +.env.production.local + +# Runtime files +*.pid +*.pid.lock + +# Coverage and testing +coverage/ +.nyc_output/ +.jest/ +.pytest_cache/ + +# Development artifacts +archived/ +old-docs/ \ No newline at end of file diff --git a/archives/backbeat_bundle_v0.1.zip b/archives/backbeat_bundle_v0.1.zip new file mode 100644 index 0000000..cbe6d82 Binary files /dev/null and b/archives/backbeat_bundle_v0.1.zip differ diff --git a/archives/backbeat_control_plane_pulse_with_tempo_ramping_go_nats.go b/archives/backbeat_control_plane_pulse_with_tempo_ramping_go_nats.go new file mode 100644 index 0000000..02d5f94 --- /dev/null +++ b/archives/backbeat_control_plane_pulse_with_tempo_ramping_go_nats.go @@ -0,0 +1,151 @@ +// Backbeat Control Plane — Pulse with Tempo Ramping (Go + NATS) +// ------------------------------------------------------------- +// Adds a control subscriber to Pulse to handle tempo/phase updates that apply on the next downbeat. +// Control messages (JSON) on subject: backbeat..control +// {"cmd":"set_bpm", "bpm":12} +// {"cmd":"ramp_bpm", "to":18, "beats":16, "easing":"linear"} +// {"cmd":"set_phases", "phases":{"plan":2,"work":4,"review":2}} +// {"cmd":"freeze", "duration_beats": 32} +// {"cmd":"unfreeze"} +// +// NOTE: This is built against the relative-beats v0.1.1 types. + +// ------------------------- +// File: go.mod +// ------------------------- +module github.com/chorus-services/backbeat-control-pulse + +go 1.22 + +require ( + github.com/nats-io/nats.go v1.36.0 +) + +// ------------------------- +// File: cmd/pulse/main.go +// ------------------------- +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "strings" + "time" + + "github.com/nats-io/nats.go" +) + +type beatFrame struct { + ClusterID string `json:"cluster_id"` + TempoBPM int `json:"tempo_bpm"` + BeatMS int `json:"beat_ms"` + BarLenBeats int `json:"bar_len_beats"` + BeatIndex int `json:"beat_index"` + BeatEpoch time.Time `json:"beat_epoch"` + Downbeat bool `json:"downbeat"` + Phase string `json:"phase"` + PolicyHash string `json:"policy_hash"` + DeadlineAt time.Time `json:"deadline_at"` +} + +type controlMsg struct { + Cmd string `json:"cmd"` + BPM int `json:"bpm,omitempty"` + To int `json:"to,omitempty"` + Beats int `json:"beats,omitempty"` + Easing string `json:"easing,omitempty"` + Phases map[string]int `json:"phases,omitempty"` + Freeze bool `json:"freeze,omitempty"` + DurationBeats int `json:"duration_beats,omitempty"` +} + +type ack struct { Ok bool `json:"ok"`; ApplyAtDownbeat bool `json:"apply_at_downbeat"`; PolicyHash string `json:"policy_hash"` } + +func floorToMultiple(t time.Time, d time.Duration) time.Time { q := t.UnixNano() / int64(d); return time.Unix(0, q*int64(d)).UTC() } + +func main() { + cluster := flag.String("cluster", "chorus-aus-01", "cluster id") + bpm := flag.Int("bpm", 8, "initial bpm") + bar := flag.Int("bar", 8, "beats per bar") + phasesStr := flag.String("phases", "plan,work,review", "comma phases") + natsURL := flag.String("nats", nats.DefaultURL, "nats url") + minBPM := flag.Int("min-bpm", 4, "min bpm") + maxBPM := flag.Int("max-bpm", 24, "max bpm") + flag.Parse() + + phases := strings.Split(*phasesStr, ",") + if len(phases) == 0 { log.Fatal("need phases") } + + nc, err := nats.Connect(*natsURL) + if err != nil { log.Fatal(err) } + defer nc.Drain() + + beatMS := func(bpm int) int { return int(float64(60_000)/float64(bpm)) } + curBPM := *bpm + curBeatMS := beatMS(curBPM) + barLen := *bar + beatIndex := 1 + frozenBeats := 0 + + // pending changes applied at next downbeat + pendingBPM := curBPM + pendingPhases := phases + + // Control subscriber + nc.Subscribe(fmt.Sprintf("backbeat.%s.control", *cluster), func(m *nats.Msg) { + var c controlMsg; if err := json.Unmarshal(m.Data, &c); err != nil { return } + switch c.Cmd { + case "set_bpm": + if c.BPM < *minBPM || c.BPM > *maxBPM { nc.Publish(m.Reply, []byte(`{"ok":false}`)); return } + pendingBPM = c.BPM + case "ramp_bpm": + if c.To < *minBPM || c.To > *maxBPM || c.Beats <= 0 { nc.Publish(m.Reply, []byte(`{"ok":false}`)); return } + // simple linear ramp encoded as per-beat delta to apply across future downbeats + pendingBPM = c.To + case "set_phases": + if len(c.Phases) > 0 { + // order as provided for prototype (production should validate sums) + pendingPhases = make([]string, 0, len(c.Phases)) + for name, n := range c.Phases { for i:=0; i 0 { frozenBeats-- } else { + if pendingBPM != curBPM { curBPM = pendingBPM; curBeatMS = beatMS(curBPM); t.Reset(time.Duration(curBeatMS) * time.Millisecond) } + phases = pendingPhases + } + } + phase := phases[(beatIndex-1)%len(phases)] + bf := beatFrame{ + ClusterID: *cluster, + TempoBPM: curBPM, + BeatMS: curBeatMS, + BarLenBeats: barLen, + BeatIndex: beatIndex, + BeatEpoch: floorToMultiple(now, time.Duration(curBeatMS)*time.Millisecond), + Downbeat: beatIndex==1, + Phase: phase, + PolicyHash:"proto", + DeadlineAt: now.Add(time.Duration(curBeatMS) * time.Millisecond), + } + payload, _ := json.Marshal(bf) + nc.Publish(fmt.Sprintf("backbeat.%s.beat", *cluster), payload) + beatIndex++; if beatIndex > barLen { beatIndex = 1 } + } +} diff --git a/archives/backbeat_prototype_go_nats_pulse_reverb_agent_sim.go b/archives/backbeat_prototype_go_nats_pulse_reverb_agent_sim.go new file mode 100644 index 0000000..715ab53 --- /dev/null +++ b/archives/backbeat_prototype_go_nats_pulse_reverb_agent_sim.go @@ -0,0 +1,411 @@ +// Backbeat Prototype (Go + NATS) +// -------------------------------- +// Minimal working prototype of the Backbeat Protocol with three binaries: +// - pulse: broadcaster of BeatFrames at a given BPM & bar length +// - reverb: aggregator that ingests StatusClaims and emits BarReports +// - agent-sim: reference agent that enforces a simple Score and publishes StatusClaims +// Transport: NATS (stand‑in for COOEE). Swap to your transport later. +// +// QUICKSTART +// 1) Start NATS (docker compose below) — or use an existing server. +// 2) Build all: make build +// 3) Run Pulse: ./bin/pulse -cluster chorus-aus-01 -bpm 12 -bar 8 -phases plan,work,review +// 4) Run Reverb: ./bin/reverb -cluster chorus-aus-01 +// 5) Run Agent: ./bin/agent-sim -cluster chorus-aus-01 -score ./configs/sample-score.yaml -id bzzz-1 +// (run multiple agents with different -id) +// Observe: Reverb prints per‑bar rollups; Agents print phase cutoffs and wait budget behaviour. +// +// To integrate into CHORUS: replace NATS subjects with COOEE/DHT topics and wire WHOOSH meters. + +// ------------------------- +// File: go.mod +// ------------------------- +module github.com/chorus-services/backbeat-prototype + +go 1.22 + +require ( + github.com/nats-io/nats.go v1.36.0 + gopkg.in/yaml.v3 v3.0.1 +) + +// ------------------------- +// File: internal/backbeat/types.go +// ------------------------- +package backbeat + +import "time" + +// BeatFrame: emitted by Pulse each beat +// JSON kept small and explicit for transport stability. +type BeatFrame struct { + ClusterID string `json:"cluster_id"` + TempoBPM int `json:"tempo_bpm"` + BeatMS int `json:"beat_ms"` + BarLenBeats int `json:"bar_len_beats"` + Bar int64 `json:"bar"` + Beat int `json:"beat"` + Phase string `json:"phase"` + HLC string `json:"hlc"` + PolicyHash string `json:"policy_hash"` + DeadlineAt time.Time `json:"deadline_at"` +} + +// StatusClaim: emitted by agents each beat (or on state change) +type StatusClaim struct { + AgentID string `json:"agent_id"` + TaskID string `json:"task_id"` + Bar int64 `json:"bar"` + Beat int `json:"beat"` + State string `json:"state"` // planning|executing|waiting|review|done|failed + WaitFor []string `json:"wait_for"` + BeatsLeft int `json:"beats_left"` + Progress float64 `json:"progress"` + Notes string `json:"notes"` + HLC string `json:"hlc"` +} + +// BarReport: aggregated by Reverb each bar +type BarReport struct { + ClusterID string `json:"cluster_id"` + Bar int64 `json:"bar"` + Counts map[string]int `json:"counts"` // by State + Overruns int `json:"overruns"` + BrokenPromises int `json:"broken_promises"` + Suggestions map[string]string `json:"suggestions"` // simple hints +} + +// Score & Budget specs (trimmed) +type Score struct { + Tempo int `yaml:"tempo"` + BarLen int `yaml:"bar_len"` + Phases map[string]int `yaml:"phases"` // beats per phase + WaitBudget struct { + Help int `yaml:"help"` + IO int `yaml:"io"` + } `yaml:"wait_budget"` + Retry struct { + MaxPhrases int `yaml:"max_phrases"` + Backoff string `yaml:"backoff"` + } `yaml:"retry"` +} + +// ------------------------- +// File: internal/backbeat/hlc.go +// ------------------------- +package backbeat + +import ( + "fmt" + "sync" + "time" +) + +type HLC struct { + mu sync.Mutex + pt time.Time // last physical time observed + lc int64 // logical counter +} + +func NewHLC() *HLC { return &HLC{pt: time.Now().UTC()} } + +func (h *HLC) Next() string { + h.mu.Lock() + defer h.mu.Unlock() + now := time.Now().UTC() + if now.After(h.pt) { + h.pt = now + h.lc = 0 + } else { + h.lc++ + } + return fmt.Sprintf("%s+%d", h.pt.Format(time.RFC3339Nano), h.lc) +} + +// ------------------------- +// File: internal/backbeat/score.go +// ------------------------- +package backbeat + +import ( + "errors" +) + +// PhaseFor returns the phase name for a given bar/beat (1-indexed beat). +func PhaseFor(phases map[string]int, beat int) (string, error) { + acc := 0 + for name, n := range phases { + acc += n + if beat <= acc { return name, nil } + } + return "", errors.New("beat out of range for phases") +} + +// ------------------------- +// File: cmd/pulse/main.go +// ------------------------- +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "strings" + "time" + + "github.com/nats-io/nats.go" + bb "github.com/chorus-services/backbeat-prototype/internal/backbeat" +) + +func main() { + cluster := flag.String("cluster", "chorus-aus-01", "cluster id") + bpm := flag.Int("bpm", 12, "tempo BPM") + bar := flag.Int("bar", 8, "beats per bar") + phasesStr := flag.String("phases", "plan,work,review", "comma phases, lengths inferred evenly if unspecified") + natsURL := flag.String("nats", nats.DefaultURL, "nats url") + flag.Parse() + + phases := strings.Split(*phasesStr, ",") + if len(phases) == 0 { log.Fatal("need phases") } + + nc, err := nats.Connect(*natsURL) + if err != nil { log.Fatal(err) } + defer nc.Drain() + + hlc := bb.NewHLC() + beatMS := int(float64(60_000) / float64(*bpm)) + barLen := *bar + barNo := int64(1) + beat := 1 + + t := time.NewTicker(time.Duration(beatMS) * time.Millisecond) + log.Printf("Pulse started: cluster=%s bpm=%d bar=%d beat_ms=%d\n", *cluster, *bpm, barLen, beatMS) + for now := range t.C { + // compute phase by distributing beats across phases evenly for prototype + phaseIdx := (beat-1) % len(phases) + bf := bb.BeatFrame{ + ClusterID: *cluster, + TempoBPM: *bpm, + BeatMS: beatMS, + BarLenBeats: barLen, + Bar: barNo, + Beat: beat, + Phase: phases[phaseIdx], + HLC: hlc.Next(), + PolicyHash: "proto", + DeadlineAt: now.Add(time.Duration(beatMS) * time.Millisecond), + } + payload, _ := json.Marshal(bf) + subject := fmt.Sprintf("backbeat.%s.beat", *cluster) + if err := nc.Publish(subject, payload); err != nil { log.Println("publish error:", err) } + if beat == 1 { log.Printf("downbeat bar=%d\n", barNo) } + beat++ + if beat > barLen { beat = 1; barNo++ } + } +} + +// ------------------------- +// File: cmd/reverb/main.go +// ------------------------- +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "time" + + "github.com/nats-io/nats.go" + bb "github.com/chorus-services/backbeat-prototype/internal/backbeat" +) + +type agg struct { + bar int64 + counts map[string]int +} + +func main() { + cluster := flag.String("cluster", "chorus-aus-01", "cluster id") + natsURL := flag.String("nats", nats.DefaultURL, "nats url") + flag.Parse() + + nc, err := nats.Connect(*natsURL) + if err != nil { log.Fatal(err) } + defer nc.Drain() + + a := &agg{bar: -1, counts: map[string]int{}} + + // subscribe to beats to know bar boundaries + _, _ = nc.Subscribe(fmt.Sprintf("backbeat.%s.beat", *cluster), func(m *nats.Msg) { + var bf bb.BeatFrame + if err := json.Unmarshal(m.Data, &bf); err != nil { return } + if a.bar == -1 { a.bar = bf.Bar } + if bf.Bar != a.bar { // flush report for previous bar + report := bb.BarReport{ClusterID: *cluster, Bar: a.bar, Counts: a.counts, Suggestions: map[string]string{}} + out, _ := json.MarshalIndent(report, "", " ") + log.Printf("BAR REPORT %d\n%s\n", a.bar, string(out)) + // reset for new bar + a.bar = bf.Bar + a.counts = map[string]int{} + } + }) + + // subscribe to status claims + _, _ = nc.Subscribe("backbeat.status.*", func(m *nats.Msg) { + var sc bb.StatusClaim + if err := json.Unmarshal(m.Data, &sc); err != nil { return } + if sc.State == "" { sc.State = "unknown" } + a.counts[sc.State]++ + }) + + log.Printf("Reverb started for cluster=%s\n", *cluster) + for { time.Sleep(10 * time.Second) } +} + +// ------------------------- +// File: cmd/agent-sim/main.go +// ------------------------- +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "math/rand" + "os" + "time" + + "github.com/nats-io/nats.go" + "gopkg.in/yaml.v3" + bb "github.com/chorus-services/backbeat-prototype/internal/backbeat" +) + +func main() { + cluster := flag.String("cluster", "chorus-aus-01", "cluster id") + agentID := flag.String("id", "bzzz-1", "agent id") + scorePath := flag.String("score", "./configs/sample-score.yaml", "score yaml path") + natsURL := flag.String("nats", nats.DefaultURL, "nats url") + flag.Parse() + + buf, err := os.ReadFile(*scorePath) + if err != nil { log.Fatal(err) } + var score bb.Score + if err := yaml.Unmarshal(buf, &score); err != nil { log.Fatal(err) } + + nc, err := nats.Connect(*natsURL) + if err != nil { log.Fatal(err) } + defer nc.Drain() + + hlc := bb.NewHLC() + state := "planning" + waiting := 0 + beatsLeft := 0 + + _, _ = nc.Subscribe(fmt.Sprintf("backbeat.%s.beat", *cluster), func(m *nats.Msg) { + var bf bb.BeatFrame + if err := json.Unmarshal(m.Data, &bf); err != nil { return } + + // Phase from prototype score map using the incoming beat index + phase, _ := bb.PhaseFor(score.Phases, bf.Beat) + switch phase { + case "plan": + state = "planning" + beatsLeft = 0 + case "work": + // simple behaviour: sometimes request help and wait up to wait_budget.help beats + if waiting == 0 && rand.Float64() < 0.3 { + waiting = 1 + } + if waiting > 0 { + state = "waiting" + beatsLeft = score.WaitBudget.Help - waiting + waiting++ + if waiting > score.WaitBudget.Help { // give up + state = "executing" // fallback path + waiting = 0 + } + } else { + state = "executing" + beatsLeft = 0 + } + case "review": + state = "review" + waiting = 0 + beatsLeft = 0 + } + + sc := bb.StatusClaim{ + AgentID: *agentID, + TaskID: "ucxl://demo/task", + Bar: bf.Bar, + Beat: bf.Beat, + State: state, + WaitFor: nil, + BeatsLeft: beatsLeft, + Progress: rand.Float64(), + Notes: "proto", + HLC: hlc.Next(), + } + payload, _ := json.Marshal(sc) + nc.Publish("backbeat.status."+*agentID, payload) + }) + + log.Printf("AgentSim %s started (cluster=%s)\n", *agentID, *cluster) + for { time.Sleep(10 * time.Second) } +} + +// ------------------------- +// File: configs/sample-score.yaml +// ------------------------- +score: + tempo: 12 + bar_len: 8 + phases: + plan: 2 + work: 4 + review: 2 + wait_budget: + help: 2 + io: 1 + retry: + max_phrases: 2 + backoff: geometric + +# NOTE: AgentSim only reads the nested fields for this prototype. + +// ------------------------- +// File: docker-compose.yml +// ------------------------- +version: "3.8" +services: + nats: + image: nats:2.10-alpine + command: ["-js"] + ports: ["4222:4222", "8222:8222"] + +// ------------------------- +// File: Makefile +// ------------------------- +BIN=bin + +.PHONY: build run-pulse run-reverb run-agent one + +build: + mkdir -p $(BIN) + GO111MODULE=on go build -o $(BIN)/pulse ./cmd/pulse + GO111MODULE=on go build -o $(BIN)/reverb ./cmd/reverb + GO111MODULE=on go build -o $(BIN)/agent-sim ./cmd/agent-sim + +run-pulse: + ./bin/pulse -cluster chorus-aus-01 -bpm 12 -bar 8 -phases plan,work,review + +run-reverb: + ./bin/reverb -cluster chorus-aus-01 + +run-agent: + ./bin/agent-sim -cluster chorus-aus-01 -score ./configs/sample-score.yaml -id bzzz-1 + +one: build run-pulse diff --git a/requirements/2.0.0.md b/requirements/2.0.0.md new file mode 100644 index 0000000..745923f --- /dev/null +++ b/requirements/2.0.0.md @@ -0,0 +1,9 @@ +# BACKBEAT — Requirements 2.0.0 + +Primary: DistSys, Protocol, Backend. Support: Security, SRE. + +- BACKBEAT-INT-001: Publish INT-A/B/C schemas, golden samples, and importable tests in backbeat-contracts. +- BACKBEAT-REQ-002: Provide reference parsers/validators (JSONSchema + language stubs). +- BACKBEAT-OBS-003: Contract conformance runner usable in CI across modules. +- BACKBEAT-COMP-004: Mixed-client compatibility tests (UCXL v1.0/v1.1, warning behavior). +