From 627d15b3f7077333a1d69eeab79929625f312553 Mon Sep 17 00:00:00 2001 From: anthonyrawlins Date: Wed, 17 Sep 2025 22:48:24 +1000 Subject: [PATCH] Updated project files and configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added/updated .gitignore file - Fixed remote URL configuration - Updated project structure and files 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .gitignore | 85 ++++ archives/backbeat_bundle_v0.1.zip | Bin 0 -> 10912 bytes ..._plane_pulse_with_tempo_ramping_go_nats.go | 151 +++++++ ...rototype_go_nats_pulse_reverb_agent_sim.go | 411 ++++++++++++++++++ requirements/2.0.0.md | 9 + 5 files changed, 656 insertions(+) create mode 100644 .gitignore create mode 100644 archives/backbeat_bundle_v0.1.zip create mode 100644 archives/backbeat_control_plane_pulse_with_tempo_ramping_go_nats.go create mode 100644 archives/backbeat_prototype_go_nats_pulse_reverb_agent_sim.go create mode 100644 requirements/2.0.0.md diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3f4624f --- /dev/null +++ b/.gitignore @@ -0,0 +1,85 @@ +# Compiled binaries +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Go binaries +backbeat +backbeat-* + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool +*.out +coverage.out + +# Build artifacts +target/ +dist/ +build/ + +# IDE files +.vscode/ +.idea/ + +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Logs and data +*.log +logs/ + +# Temporary files +*.tmp +*.temp +*~ +*.bak + +# Node.js and npm (for any JS/TS components) +node_modules/ +npm-debug.log* +yarn-debug.log* +yarn-error.log* +.npm +.yarn-integrity +package-lock.json.bak + +# Python virtual environment and bytecode +__pycache__/ +*.py[cod] +*$py.class +*.so +.venv/ +venv/ +env/ +ENV/ + +# Environment files +.env +.env.local +.env.development.local +.env.test.local +.env.production.local + +# Runtime files +*.pid +*.pid.lock + +# Coverage and testing +coverage/ +.nyc_output/ +.jest/ +.pytest_cache/ + +# Development artifacts +archived/ +old-docs/ \ No newline at end of file diff --git a/archives/backbeat_bundle_v0.1.zip b/archives/backbeat_bundle_v0.1.zip new file mode 100644 index 0000000000000000000000000000000000000000..cbe6d828c2b87a9593d163d91301480d64041f81 GIT binary patch literal 10912 zcma)?1yGz@^7aP^5`w#Ha1FuT-Gf`OK?er+;O#}#_)GfF*Kc*j8l}=a;^(=z+JKf`*xrQpg!$FME4ci!KKp$ z4M;(v$;bB^26nn|Ar;kKO#P(__Zu zn>rV2cRL6dro`@csxJxl(nXLv(Jq*6TjM`1PKWy7D#pyHWw&jwiu@!F7+#Mp9+4%; zQDXrb>d5j7L#K0T6l7nf(X)q~cTDzfbchNRliSBDQjCdwD;QRQpXF z=bKSni-VJNHjx$ov^MkjLOia>0Ki)k0D$U0*T&ukXvFldmHKRB2X+7(fkB_0m>EC3 zELeo*irvyIYTMo?(2jXv4vljB19s8ibZp81`tas}@-nxakH~tosDJx9AQ7h@_p`&| z0dR47yXlLM-VQ%p zt%y}c3

4j(bk1aEMw4(eXkfmmG_k_HtPA^KTdplmrZ8bDTlEBPHXvt38-GintVb z_hVJGh>pPlwk0N-q>xh3VkQWLEiS(d1^cO}sG!`gwl_hc&yi74unx92K`#$HJ>2e& z=0kaS`MzA5%^!fkU}x@n>xl*lQ!<&}rs@%=lwR)S@vZoC3wD;7a5SsLEX3~}iW$mH z%Hv$V!WYyL(uCz^kyA{LDN#1D%Ab?&RTv#1M!Z<#O;mC#Tn$vh=R{IrwDj~XbCjC& z$@eo1*=?aStik6vC(Yif_m3o>0wx|zh$JAUX&hrsANd`*NPd_F z_VYE%1VYm(+t_?thr3rSsDba6n3wIHk35z&Nq`aIE|it}xcXjQjK+EcTC?O`LgM2S zK%pvmqE`mm=oh$-UPa3y;rhPhB!+$7O1NJ&>jnryJ2x!RD&`nU3h>R&ywp;_@@)Br zsU)dTC5F*q97Lc8m$pR}#8=Ng(JLI*Q3A&Xsy4fIsFw)R^g$WZ-)~_Z&tDTTy->W? zcboh!H~?WO)EHPmT`Dc7%cp|?>u`Gf-RtBSg$du#NJAKVTtJ+q7w&hXFn>q=kd876 z9n>+65*^EoVWvSX&>sbND6qxvD2MaiIlacL`!cCWd;Txo<}lq7c)fe`7*#HyJsSDU zA+cbB=QDM)^sUpeA=1i#4Wy}kgAAsa|CGaq|2oTUbM;t(=`cvk;3zC@9*q*rFh;BZy#f|Gap={ebJ7fd5vY48tl=SoK3ZuHa<1&S z0dZ2{BY2lsT_G9O6jGhgNmKfsPLQJ1Jb8%ShZ2J6lgIHc1+hc|o-ICUK?Tkc4B zwgs6k@V5rE#;rHd*PlSB`-`P1bB%1Ph%>3DJ{9yk;ePLS*<|*A6hb}rrW zk>cInlh&6gXrzzVk7TKFL!Rzosct!N1NR)72Xc1+=m)PU!6p$t7J)pjP+RaNas;9GX= zx!yyU^IGe3072uV9#VFvMfV3bToK(tHdODN6GxA==OJVBJ=Gwy;oIJ@uF|)jq$sr2 zrDx8jEo|%1b}-;XDi{7Xx_S+0rVkCarlzL-tI7e`htufCXX~AAq;rdw%Zu7;Xp%dR zPR>rJ)~6l~t<}zL)jYVT@=mA7q~`%EO_BU1DRcxv*6=rH%_&SyD=bK>RG zUz$hNXpkX|?JM$Eny)FlH^!~{LDbT1P7;nhg-qp2%u2^kOQc9~V5jBmyrMn|n zaaSv^qbYNq^MozqHSW4OO z2}6l?`RJ&jEMedet}%#_%bHHnzC%20+;Lo%GC%=CaO)EJFm#&F@u%x3WQDX~TXotH zp1^cj8#^Bealqn@eV8|b^3K(ukd&NOBJ4|C0E$)2S} z5*7bq&0oRFAH3W2Xs@IOZb$rqu@;1oYr|;N{yW-!jsyxI?Wzbeq_&#hKn5ufja+&| zImKHsMGIZQO~*#7F<)m+sE)!SjfNE^%E!<)q>56R zOScvBIUzesc$gN269onHMs-uiq)p$ex770^Kivht7YjTOtHkgPviy4TOWF^rL%XVn z+w5+8q_)`&mcT>Sqd~ecg)x51c7!E2khFnn`v%>Z5Md5i zM?5qUewF9!aj6(*kI8SK0Yj^+nNl>rQ;-#KZk!V9emJzeyD%o|%=oTzgl&=S*p}Hn zm<|()UU@@RuWSf-;Y%NcX-wKh>Z~wI^yTSh;zEnVKS!N~>)HlXUdVP-^zE(=-d=rZ zTcQ-6I?c9aL8MjmL`da8(`fjIur0?_ZRJ$Y+?O78xg9Gbct$C+RY<XrCOTNgrSHRb3Qw(OsE>CKW)YQ$WWpB=3HP%~KP;gkRWyF#QaWzd$ zGN>l-bu%S8{N!kYV*CZb&Uw}^l~5FrQJ>hGK@TuHl^#V$OpB; zaOv$5R}A5;WW9+d8_hghxbZ$su)%H3&LXYuMhmaBehMmAFhb!94M{7Db%Cf0jmS*f~JFn1xHx-g|o^>-rS0DCRV~0E)8@`gJF_ds# zt2hspP|622-k|1|*qk-~N>&9FoEhF}je}GH;fNb@j0zU8`fh^Fh78j!1sORBMPqhk zN@TR)lgQ%Wv%L{=Xka1@8ooYmJ?Rt!`yyJgC7ENEAV6+3yN0i!OBHPsdeh4gs_76{ zCc%HpxNA~2*Q78csVj}IoV_@ils^+v6eppz5vMicP93*GI-vE(;#!Q`^#h!|bX3Or z^Ma7r?<47LN={^g4aYGyyRknH+&s!W@Za20EsML0Qo)iuvZzGMcvihR*ss8)?CKcZ z@W%1DPx4wAd#~3U7tm%yTdqIJUu*&IieJpTklQ>!ha7!$wf(!r;o{`3wqx~lnCx0) zG|43_xw3vGYL~F0REBA&#n>c$nBpWKPwBmc1reLT_awPwOi4O8okfaLe7mEuyeQPx zJ%uq?2WCFJ@r!DR$MY%gJgNrHZJxcobe)(%=`EeRTaGx83 zh@B>Xgt^uggq9*gwD=IZRKPj2Wva7`EPV@ywI1a%P)bZ#KRHCiy`r9s;qk2|(mJ*- zDN(LZ)C04#ZSnAN-+D;H%Ff4RsjlK8>(<84zOC%}++u7u^MvEaS&0Od8>qwOw7x>dh=%>^CO|8hT5EMs5+o;B%h_EiN`pPEQ|mh z%P#mQswGib$?UCNSQ?q?q*ze<_R7uss2fvJ$Ir#vhbwpYD#+3^=}|W^k0#S`;EIRi zPgT1V*db)Xku##gE;gmN6&IiX+`UI`GYeY0bnTum1NT3>_y6nH9b9dIOr~H)E3mQL zFdtGk34-7T7eBNSPme#zZ+<}rLy8_mFJ?QL+;JT(a{Bq!soOk*on*CT;%BNTr>EwZOu>I)4J>5R|A9Jq!O zwA=M7OTv~h>9a~utCmg3rn+wcw7)Xd-S0w-I5CiJkMn!Aw1UC|J25_iq@_|zi<>@u zi9AKc3@sTNG62tj{l=a(NjPf9)}2u#qLuj=>0QWdpVcGP+uUC|ohXsQnBK9gQ`7Z1wyn}B6MkMb=D?dIZhMet63q&xv= z0D$4IEi|`w0NPm_fc`BYG5upN+B2Gh9aV;{mzYs_wLe#$4MJ$)LWdH?elp~c^v;3% z2H<+*BP&A}f+z<69<#UL9{aJ(^OG~pEhZhe2BazH@&aKs)>5vVu>j`E0{^7)o_scEm_@RNqoLRpW z_Mz^$A$1fhV_!mqeIDJPvR(8E`Oz00*W`HfKmv2>3(bU0oQsNzV#8v}xa&zZ0GM+8QuNIjgwg9m`fF02P`O1-F zoXB7nd20f3aVDGqW}h!tMu7u(XaCS+Qz&auldf%1VBU$B`|S=PUh3e^1tE-;HhTts zfTa|BY1o*K*&+Ezx$*SojDKOs-gN$y$4UK@LfS7?tQ7Su&!5}rjoJ7I-mvb^ucC+b z&DWnOFK12o%b@#T_of-h=w)XD6<^wRW&}R%&!muqS1dm=j4FhYvK5{7?dBJeC3W-P z$`Nc0)Z|{SQJJooV-Lr!y1T`e|ET2FShihW2n8&zBKs)%4*X1GLE7p~#wa|tf`RvD zT;1$<3n33MgDWSw3x1m?K)&!1PT}jgTQ_p>sQ%&pZ)j_PqXUeLyJ3^c%~zE&9Qxk% zGoR|>{9dMd0(s?w<3L-E2csdAP3lBW^mi40vxKpJ;@3*$u=errJ*%_q>ne*)CwdOu zZ?Ss?Y*DNtmZwhH7D(`qr6`1AJT-D=5^mfgmS)}vY_z2b8JmU0$Fd`uOIW( z$74>$1e0F+_#w-z?h(u-KiP1y9*N4S@NWj`;*b$ctWgD1+c9w@TckKvgoJQgJ_qoUG{AxRFOsw z7~Ydx0GAWtT`Oc)9imm5Him91Me@!iYgaE!xA@VVG9TExzB2F}>D?5wtM}hhbRk=o znC}a>|M@%&1Tn*=jv#v=la+zF^@~SJ)eXSYoG6bi_jIy3 zX|e2b^>CR}U1UP;aPNmyYdn5tmU8@Z`XYCJIMYu`&xUB!<7RN_$ zpY_Ovr6+|(Z!TUGP}R~%lb1+Lc&91)=g;*u@nSe%;9&W(DexeB+ImIUcEBa(Zt-@U7hq28M;H|osJ)2qV^^KObQX4rNwIT|q0i94Em z*akVUffdvVhDJQm3ik`MWU{^!o8nH{;B%auDudy79v9%$ABbmbG9Gc8*cNB>&jgv69<%i2 zNK`7aHoq6Y|B!ObinBb5*1Lwt5SPO*fobnL#+;^$9od%g@!ZMKh|mIpnpJCj0`qH? zFlo^d4z$z)vFkagS;qiw9iN5E!a8R}oqo*h=af_ASB+^n9x8bGK|udDF2xqt5~9xY z6JM~N;5P2D$EP3phhj9}F$FzP99@&D!W6=I=r)JJF>R4~GJ5QrPb^X?p0ItH2d198 z3i~9udNbNzZQ?*ezQ%q7ehvX&R?nZG#e<=g9$}on(By^B+4MI6rI)qT_VLA3O=KcW z#yR|s0{bleGEJZ>#><%4&3Z@H^3JmBH|NOOZnm`rfL~Xj=#b}9dk?Z~QaE%rgeHH! z$(NVnt`q3qv(G!Lte-ovMoLu@h2ylQD7-jUIKPGS3L1f^gZ5(w^PlCSZWiyW+PD~8 z71gw|8MQ?OUPS?>gJRX%9VP zVP5<>fWDt*lJ1zqqT6)ZJJR^0JRk%%$z@rt!vEN91YHv{w!lJC#Y|DBKG@q$l8Ot4s@13#V4}-UDhs2M)&bhxY>ylLr_rx}7&2yhd+DCw4Lr0~o4Xh?!*&xOrGR0Q9OOX_4o;qiNQSRc62u$fn3X{eqO30u>Vq`9=+ zBDLJi@?IA{GUv$jVChRV@7@{bcn<_YRnKuVE6gRot^C98%0!L3ltczgL4hSV8nQ%7 z*^x9m0*6mYg%hIW^9pBB8OK@%+Xl&Ho&=eTk~D=L8kT=-D}Cr1naJX7^4dm#at>%! zx0h;zOpiWp!~gc+W5-BbCq7971*~q`)m7cnp%`@zI7mYV~ZNkSrN(!nkYh z&>*-NvsX~*f`u1K@Vai`rZ+7JT z{yPGHATfgBm>E7R5Q8bK_AU!u`qziqC#DK;e<`Z2h;~dL~u6o@ha=Lk80lKw!;o=<&p5$#W@#YWg zlWv2yPTXiob+&Tnfq6@_e8S*17~j=$9a=rg_L*n0RAysNuLC&u6fl%)XRD5GdNK#o zq0sVu&JnzQ+#A(9!$9Az`+kJa$fW}<@r{#DZG z+Jv(*sp^IGNdpvl9zW;7L!gT$;IP4x0e|Yc&pP1R12f~|oBD_s;%ey2z)s>1z$A5V zSJcH}Q(BC<1B|Qj(KvWtJ#-E<16ClgG?ceq@fr+_5G9&Rzubdycvs z(%rQS)fMIH^yh!V5phlwF6Q;f;5oHbO!9wR=PE$Wyi=G;lgtNM@J5MndI-#oRW1R_Wa zF+GzsGi5NsyPx%HSwc1x>DmwN^}e!Cbu~A#Tn=0MnbaPfSe7|lyHxCSDQXg&#Sdx2C znTsC#mp;B}fL?mIy>*q>Gv>HavqJ7>`!#Cc>Xrcm9${~Wt2ZX0tsYOl zLvZ@oKknnzoZ=-+Z*8TynGo6vg5jt2sHXTRLCNi_A#5J;(KlM_KKoLMIOK{>LmCb} zXffJ54zMw9zK}O;4kBDRXJ8PX>EEZWIqJEY7?nG>G$qSN1*|8Hp8h^DXm`0i#(HKVtO`JOKdbR4$!k z=r|H)5mlGB5pU1{3Z)ExnX$o-y_<-HrM?J=jdWP2mq<^D)%mka?rFLKS3Jj7q63m6 z5${d=fX9;LLhjrcRdc?h9T9{olkt*xJP0)Sk)Sz{&;$ zWcX*N=xSgEVv7($f+R%{yH3ecx7qBn!BwvK7!5%REd~Zdth8P|CwdH^T!k38Vf~l zll#Rh;mHto^5ed2BO@gTH{}jeQ3e7M1M2VZtG)b!|9j>5{Nv+accT8dz4m(k>pNBd zoDBfv_@uma&;OqP|G#1N8v0u5{R66n`XA8O!tZO$YeDu8%>7?u{w2@82EEoP{{S(d z|D`?uty;cDz7`h$K#u)2^6zEF*XY-}&>v_^tiQDOztR6xiC%+W>mq-Ed$9jU@V``& z*XY-}!XM~0oWDf>m&)*(___l9g9uCfx5Pi!qpy*#Q~e*vU!;GF{IA6Sn)>@8y@0q>S<(-~Jz&>XM!S literal 0 HcmV?d00001 diff --git a/archives/backbeat_control_plane_pulse_with_tempo_ramping_go_nats.go b/archives/backbeat_control_plane_pulse_with_tempo_ramping_go_nats.go new file mode 100644 index 0000000..02d5f94 --- /dev/null +++ b/archives/backbeat_control_plane_pulse_with_tempo_ramping_go_nats.go @@ -0,0 +1,151 @@ +// Backbeat Control Plane — Pulse with Tempo Ramping (Go + NATS) +// ------------------------------------------------------------- +// Adds a control subscriber to Pulse to handle tempo/phase updates that apply on the next downbeat. +// Control messages (JSON) on subject: backbeat..control +// {"cmd":"set_bpm", "bpm":12} +// {"cmd":"ramp_bpm", "to":18, "beats":16, "easing":"linear"} +// {"cmd":"set_phases", "phases":{"plan":2,"work":4,"review":2}} +// {"cmd":"freeze", "duration_beats": 32} +// {"cmd":"unfreeze"} +// +// NOTE: This is built against the relative-beats v0.1.1 types. + +// ------------------------- +// File: go.mod +// ------------------------- +module github.com/chorus-services/backbeat-control-pulse + +go 1.22 + +require ( + github.com/nats-io/nats.go v1.36.0 +) + +// ------------------------- +// File: cmd/pulse/main.go +// ------------------------- +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "strings" + "time" + + "github.com/nats-io/nats.go" +) + +type beatFrame struct { + ClusterID string `json:"cluster_id"` + TempoBPM int `json:"tempo_bpm"` + BeatMS int `json:"beat_ms"` + BarLenBeats int `json:"bar_len_beats"` + BeatIndex int `json:"beat_index"` + BeatEpoch time.Time `json:"beat_epoch"` + Downbeat bool `json:"downbeat"` + Phase string `json:"phase"` + PolicyHash string `json:"policy_hash"` + DeadlineAt time.Time `json:"deadline_at"` +} + +type controlMsg struct { + Cmd string `json:"cmd"` + BPM int `json:"bpm,omitempty"` + To int `json:"to,omitempty"` + Beats int `json:"beats,omitempty"` + Easing string `json:"easing,omitempty"` + Phases map[string]int `json:"phases,omitempty"` + Freeze bool `json:"freeze,omitempty"` + DurationBeats int `json:"duration_beats,omitempty"` +} + +type ack struct { Ok bool `json:"ok"`; ApplyAtDownbeat bool `json:"apply_at_downbeat"`; PolicyHash string `json:"policy_hash"` } + +func floorToMultiple(t time.Time, d time.Duration) time.Time { q := t.UnixNano() / int64(d); return time.Unix(0, q*int64(d)).UTC() } + +func main() { + cluster := flag.String("cluster", "chorus-aus-01", "cluster id") + bpm := flag.Int("bpm", 8, "initial bpm") + bar := flag.Int("bar", 8, "beats per bar") + phasesStr := flag.String("phases", "plan,work,review", "comma phases") + natsURL := flag.String("nats", nats.DefaultURL, "nats url") + minBPM := flag.Int("min-bpm", 4, "min bpm") + maxBPM := flag.Int("max-bpm", 24, "max bpm") + flag.Parse() + + phases := strings.Split(*phasesStr, ",") + if len(phases) == 0 { log.Fatal("need phases") } + + nc, err := nats.Connect(*natsURL) + if err != nil { log.Fatal(err) } + defer nc.Drain() + + beatMS := func(bpm int) int { return int(float64(60_000)/float64(bpm)) } + curBPM := *bpm + curBeatMS := beatMS(curBPM) + barLen := *bar + beatIndex := 1 + frozenBeats := 0 + + // pending changes applied at next downbeat + pendingBPM := curBPM + pendingPhases := phases + + // Control subscriber + nc.Subscribe(fmt.Sprintf("backbeat.%s.control", *cluster), func(m *nats.Msg) { + var c controlMsg; if err := json.Unmarshal(m.Data, &c); err != nil { return } + switch c.Cmd { + case "set_bpm": + if c.BPM < *minBPM || c.BPM > *maxBPM { nc.Publish(m.Reply, []byte(`{"ok":false}`)); return } + pendingBPM = c.BPM + case "ramp_bpm": + if c.To < *minBPM || c.To > *maxBPM || c.Beats <= 0 { nc.Publish(m.Reply, []byte(`{"ok":false}`)); return } + // simple linear ramp encoded as per-beat delta to apply across future downbeats + pendingBPM = c.To + case "set_phases": + if len(c.Phases) > 0 { + // order as provided for prototype (production should validate sums) + pendingPhases = make([]string, 0, len(c.Phases)) + for name, n := range c.Phases { for i:=0; i 0 { frozenBeats-- } else { + if pendingBPM != curBPM { curBPM = pendingBPM; curBeatMS = beatMS(curBPM); t.Reset(time.Duration(curBeatMS) * time.Millisecond) } + phases = pendingPhases + } + } + phase := phases[(beatIndex-1)%len(phases)] + bf := beatFrame{ + ClusterID: *cluster, + TempoBPM: curBPM, + BeatMS: curBeatMS, + BarLenBeats: barLen, + BeatIndex: beatIndex, + BeatEpoch: floorToMultiple(now, time.Duration(curBeatMS)*time.Millisecond), + Downbeat: beatIndex==1, + Phase: phase, + PolicyHash:"proto", + DeadlineAt: now.Add(time.Duration(curBeatMS) * time.Millisecond), + } + payload, _ := json.Marshal(bf) + nc.Publish(fmt.Sprintf("backbeat.%s.beat", *cluster), payload) + beatIndex++; if beatIndex > barLen { beatIndex = 1 } + } +} diff --git a/archives/backbeat_prototype_go_nats_pulse_reverb_agent_sim.go b/archives/backbeat_prototype_go_nats_pulse_reverb_agent_sim.go new file mode 100644 index 0000000..715ab53 --- /dev/null +++ b/archives/backbeat_prototype_go_nats_pulse_reverb_agent_sim.go @@ -0,0 +1,411 @@ +// Backbeat Prototype (Go + NATS) +// -------------------------------- +// Minimal working prototype of the Backbeat Protocol with three binaries: +// - pulse: broadcaster of BeatFrames at a given BPM & bar length +// - reverb: aggregator that ingests StatusClaims and emits BarReports +// - agent-sim: reference agent that enforces a simple Score and publishes StatusClaims +// Transport: NATS (stand‑in for COOEE). Swap to your transport later. +// +// QUICKSTART +// 1) Start NATS (docker compose below) — or use an existing server. +// 2) Build all: make build +// 3) Run Pulse: ./bin/pulse -cluster chorus-aus-01 -bpm 12 -bar 8 -phases plan,work,review +// 4) Run Reverb: ./bin/reverb -cluster chorus-aus-01 +// 5) Run Agent: ./bin/agent-sim -cluster chorus-aus-01 -score ./configs/sample-score.yaml -id bzzz-1 +// (run multiple agents with different -id) +// Observe: Reverb prints per‑bar rollups; Agents print phase cutoffs and wait budget behaviour. +// +// To integrate into CHORUS: replace NATS subjects with COOEE/DHT topics and wire WHOOSH meters. + +// ------------------------- +// File: go.mod +// ------------------------- +module github.com/chorus-services/backbeat-prototype + +go 1.22 + +require ( + github.com/nats-io/nats.go v1.36.0 + gopkg.in/yaml.v3 v3.0.1 +) + +// ------------------------- +// File: internal/backbeat/types.go +// ------------------------- +package backbeat + +import "time" + +// BeatFrame: emitted by Pulse each beat +// JSON kept small and explicit for transport stability. +type BeatFrame struct { + ClusterID string `json:"cluster_id"` + TempoBPM int `json:"tempo_bpm"` + BeatMS int `json:"beat_ms"` + BarLenBeats int `json:"bar_len_beats"` + Bar int64 `json:"bar"` + Beat int `json:"beat"` + Phase string `json:"phase"` + HLC string `json:"hlc"` + PolicyHash string `json:"policy_hash"` + DeadlineAt time.Time `json:"deadline_at"` +} + +// StatusClaim: emitted by agents each beat (or on state change) +type StatusClaim struct { + AgentID string `json:"agent_id"` + TaskID string `json:"task_id"` + Bar int64 `json:"bar"` + Beat int `json:"beat"` + State string `json:"state"` // planning|executing|waiting|review|done|failed + WaitFor []string `json:"wait_for"` + BeatsLeft int `json:"beats_left"` + Progress float64 `json:"progress"` + Notes string `json:"notes"` + HLC string `json:"hlc"` +} + +// BarReport: aggregated by Reverb each bar +type BarReport struct { + ClusterID string `json:"cluster_id"` + Bar int64 `json:"bar"` + Counts map[string]int `json:"counts"` // by State + Overruns int `json:"overruns"` + BrokenPromises int `json:"broken_promises"` + Suggestions map[string]string `json:"suggestions"` // simple hints +} + +// Score & Budget specs (trimmed) +type Score struct { + Tempo int `yaml:"tempo"` + BarLen int `yaml:"bar_len"` + Phases map[string]int `yaml:"phases"` // beats per phase + WaitBudget struct { + Help int `yaml:"help"` + IO int `yaml:"io"` + } `yaml:"wait_budget"` + Retry struct { + MaxPhrases int `yaml:"max_phrases"` + Backoff string `yaml:"backoff"` + } `yaml:"retry"` +} + +// ------------------------- +// File: internal/backbeat/hlc.go +// ------------------------- +package backbeat + +import ( + "fmt" + "sync" + "time" +) + +type HLC struct { + mu sync.Mutex + pt time.Time // last physical time observed + lc int64 // logical counter +} + +func NewHLC() *HLC { return &HLC{pt: time.Now().UTC()} } + +func (h *HLC) Next() string { + h.mu.Lock() + defer h.mu.Unlock() + now := time.Now().UTC() + if now.After(h.pt) { + h.pt = now + h.lc = 0 + } else { + h.lc++ + } + return fmt.Sprintf("%s+%d", h.pt.Format(time.RFC3339Nano), h.lc) +} + +// ------------------------- +// File: internal/backbeat/score.go +// ------------------------- +package backbeat + +import ( + "errors" +) + +// PhaseFor returns the phase name for a given bar/beat (1-indexed beat). +func PhaseFor(phases map[string]int, beat int) (string, error) { + acc := 0 + for name, n := range phases { + acc += n + if beat <= acc { return name, nil } + } + return "", errors.New("beat out of range for phases") +} + +// ------------------------- +// File: cmd/pulse/main.go +// ------------------------- +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "strings" + "time" + + "github.com/nats-io/nats.go" + bb "github.com/chorus-services/backbeat-prototype/internal/backbeat" +) + +func main() { + cluster := flag.String("cluster", "chorus-aus-01", "cluster id") + bpm := flag.Int("bpm", 12, "tempo BPM") + bar := flag.Int("bar", 8, "beats per bar") + phasesStr := flag.String("phases", "plan,work,review", "comma phases, lengths inferred evenly if unspecified") + natsURL := flag.String("nats", nats.DefaultURL, "nats url") + flag.Parse() + + phases := strings.Split(*phasesStr, ",") + if len(phases) == 0 { log.Fatal("need phases") } + + nc, err := nats.Connect(*natsURL) + if err != nil { log.Fatal(err) } + defer nc.Drain() + + hlc := bb.NewHLC() + beatMS := int(float64(60_000) / float64(*bpm)) + barLen := *bar + barNo := int64(1) + beat := 1 + + t := time.NewTicker(time.Duration(beatMS) * time.Millisecond) + log.Printf("Pulse started: cluster=%s bpm=%d bar=%d beat_ms=%d\n", *cluster, *bpm, barLen, beatMS) + for now := range t.C { + // compute phase by distributing beats across phases evenly for prototype + phaseIdx := (beat-1) % len(phases) + bf := bb.BeatFrame{ + ClusterID: *cluster, + TempoBPM: *bpm, + BeatMS: beatMS, + BarLenBeats: barLen, + Bar: barNo, + Beat: beat, + Phase: phases[phaseIdx], + HLC: hlc.Next(), + PolicyHash: "proto", + DeadlineAt: now.Add(time.Duration(beatMS) * time.Millisecond), + } + payload, _ := json.Marshal(bf) + subject := fmt.Sprintf("backbeat.%s.beat", *cluster) + if err := nc.Publish(subject, payload); err != nil { log.Println("publish error:", err) } + if beat == 1 { log.Printf("downbeat bar=%d\n", barNo) } + beat++ + if beat > barLen { beat = 1; barNo++ } + } +} + +// ------------------------- +// File: cmd/reverb/main.go +// ------------------------- +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "time" + + "github.com/nats-io/nats.go" + bb "github.com/chorus-services/backbeat-prototype/internal/backbeat" +) + +type agg struct { + bar int64 + counts map[string]int +} + +func main() { + cluster := flag.String("cluster", "chorus-aus-01", "cluster id") + natsURL := flag.String("nats", nats.DefaultURL, "nats url") + flag.Parse() + + nc, err := nats.Connect(*natsURL) + if err != nil { log.Fatal(err) } + defer nc.Drain() + + a := &agg{bar: -1, counts: map[string]int{}} + + // subscribe to beats to know bar boundaries + _, _ = nc.Subscribe(fmt.Sprintf("backbeat.%s.beat", *cluster), func(m *nats.Msg) { + var bf bb.BeatFrame + if err := json.Unmarshal(m.Data, &bf); err != nil { return } + if a.bar == -1 { a.bar = bf.Bar } + if bf.Bar != a.bar { // flush report for previous bar + report := bb.BarReport{ClusterID: *cluster, Bar: a.bar, Counts: a.counts, Suggestions: map[string]string{}} + out, _ := json.MarshalIndent(report, "", " ") + log.Printf("BAR REPORT %d\n%s\n", a.bar, string(out)) + // reset for new bar + a.bar = bf.Bar + a.counts = map[string]int{} + } + }) + + // subscribe to status claims + _, _ = nc.Subscribe("backbeat.status.*", func(m *nats.Msg) { + var sc bb.StatusClaim + if err := json.Unmarshal(m.Data, &sc); err != nil { return } + if sc.State == "" { sc.State = "unknown" } + a.counts[sc.State]++ + }) + + log.Printf("Reverb started for cluster=%s\n", *cluster) + for { time.Sleep(10 * time.Second) } +} + +// ------------------------- +// File: cmd/agent-sim/main.go +// ------------------------- +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "math/rand" + "os" + "time" + + "github.com/nats-io/nats.go" + "gopkg.in/yaml.v3" + bb "github.com/chorus-services/backbeat-prototype/internal/backbeat" +) + +func main() { + cluster := flag.String("cluster", "chorus-aus-01", "cluster id") + agentID := flag.String("id", "bzzz-1", "agent id") + scorePath := flag.String("score", "./configs/sample-score.yaml", "score yaml path") + natsURL := flag.String("nats", nats.DefaultURL, "nats url") + flag.Parse() + + buf, err := os.ReadFile(*scorePath) + if err != nil { log.Fatal(err) } + var score bb.Score + if err := yaml.Unmarshal(buf, &score); err != nil { log.Fatal(err) } + + nc, err := nats.Connect(*natsURL) + if err != nil { log.Fatal(err) } + defer nc.Drain() + + hlc := bb.NewHLC() + state := "planning" + waiting := 0 + beatsLeft := 0 + + _, _ = nc.Subscribe(fmt.Sprintf("backbeat.%s.beat", *cluster), func(m *nats.Msg) { + var bf bb.BeatFrame + if err := json.Unmarshal(m.Data, &bf); err != nil { return } + + // Phase from prototype score map using the incoming beat index + phase, _ := bb.PhaseFor(score.Phases, bf.Beat) + switch phase { + case "plan": + state = "planning" + beatsLeft = 0 + case "work": + // simple behaviour: sometimes request help and wait up to wait_budget.help beats + if waiting == 0 && rand.Float64() < 0.3 { + waiting = 1 + } + if waiting > 0 { + state = "waiting" + beatsLeft = score.WaitBudget.Help - waiting + waiting++ + if waiting > score.WaitBudget.Help { // give up + state = "executing" // fallback path + waiting = 0 + } + } else { + state = "executing" + beatsLeft = 0 + } + case "review": + state = "review" + waiting = 0 + beatsLeft = 0 + } + + sc := bb.StatusClaim{ + AgentID: *agentID, + TaskID: "ucxl://demo/task", + Bar: bf.Bar, + Beat: bf.Beat, + State: state, + WaitFor: nil, + BeatsLeft: beatsLeft, + Progress: rand.Float64(), + Notes: "proto", + HLC: hlc.Next(), + } + payload, _ := json.Marshal(sc) + nc.Publish("backbeat.status."+*agentID, payload) + }) + + log.Printf("AgentSim %s started (cluster=%s)\n", *agentID, *cluster) + for { time.Sleep(10 * time.Second) } +} + +// ------------------------- +// File: configs/sample-score.yaml +// ------------------------- +score: + tempo: 12 + bar_len: 8 + phases: + plan: 2 + work: 4 + review: 2 + wait_budget: + help: 2 + io: 1 + retry: + max_phrases: 2 + backoff: geometric + +# NOTE: AgentSim only reads the nested fields for this prototype. + +// ------------------------- +// File: docker-compose.yml +// ------------------------- +version: "3.8" +services: + nats: + image: nats:2.10-alpine + command: ["-js"] + ports: ["4222:4222", "8222:8222"] + +// ------------------------- +// File: Makefile +// ------------------------- +BIN=bin + +.PHONY: build run-pulse run-reverb run-agent one + +build: + mkdir -p $(BIN) + GO111MODULE=on go build -o $(BIN)/pulse ./cmd/pulse + GO111MODULE=on go build -o $(BIN)/reverb ./cmd/reverb + GO111MODULE=on go build -o $(BIN)/agent-sim ./cmd/agent-sim + +run-pulse: + ./bin/pulse -cluster chorus-aus-01 -bpm 12 -bar 8 -phases plan,work,review + +run-reverb: + ./bin/reverb -cluster chorus-aus-01 + +run-agent: + ./bin/agent-sim -cluster chorus-aus-01 -score ./configs/sample-score.yaml -id bzzz-1 + +one: build run-pulse diff --git a/requirements/2.0.0.md b/requirements/2.0.0.md new file mode 100644 index 0000000..745923f --- /dev/null +++ b/requirements/2.0.0.md @@ -0,0 +1,9 @@ +# BACKBEAT — Requirements 2.0.0 + +Primary: DistSys, Protocol, Backend. Support: Security, SRE. + +- BACKBEAT-INT-001: Publish INT-A/B/C schemas, golden samples, and importable tests in backbeat-contracts. +- BACKBEAT-REQ-002: Provide reference parsers/validators (JSONSchema + language stubs). +- BACKBEAT-OBS-003: Contract conformance runner usable in CI across modules. +- BACKBEAT-COMP-004: Mixed-client compatibility tests (UCXL v1.0/v1.1, warning behavior). +