diff --git a/chrs-backbeat-demo/Cargo.toml b/chrs-backbeat-demo/Cargo.toml new file mode 100644 index 00000000..adf48bd6 --- /dev/null +++ b/chrs-backbeat-demo/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "chrs-backbeat-demo" +version = "0.1.0" +edition = "2021" + +[dependencies] +chrs-agent = { path = "../chrs-agent" } +chrs-backbeat = { path = "../chrs-backbeat" } +chrs-mail = { path = "../chrs-mail" } +chrs-council = { path = "../chrs-council" } +tokio = { version = "1", features = ["full"] } +serde_json = "1.0" +uuid = { version = "1.0", features = ["v4"] } +chrono = "0.4" diff --git a/chrs-backbeat-demo/src/main.rs b/chrs-backbeat-demo/src/main.rs new file mode 100644 index 00000000..9ebd3913 --- /dev/null +++ b/chrs-backbeat-demo/src/main.rs @@ -0,0 +1,67 @@ +use chrs_agent::CHORUSAgent; +use chrs_backbeat::{Pulse, Reverb}; +use chrs_council::Role; +use chrs_mail::Mailbox; +use std::fs; +use std::path::Path; +use tokio::time::{sleep, Duration}; +use chrono::Utc; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("=== CHORUS BACKBEAT Rhythmic Orchestration Demo ==="); + + // 1. Setup shared environment + let base_path = Path::new("/tmp/chrs_backbeat_demo"); + if base_path.exists() { + fs::remove_dir_all(base_path)?; + } + fs::create_dir_all(base_path)?; + + let mail_db = base_path.join("mail.sqlite"); + let shared_mailbox = Mailbox::open(&mail_db)?; + + // 2. Initialize Pulse (Broadcaster) + let pulse = Pulse::new("chorus-cluster-01", 30, shared_mailbox.clone()); // 30 BPM = 2s per beat + + // 3. Initialize Agents + let mut architect = CHORUSAgent::init("agent-architect", Role::Architect, &base_path.join("architect")).await?; + let mut coder = CHORUSAgent::init("agent-coder", Role::Coder, &base_path.join("coder")).await?; + let mut auditor = CHORUSAgent::init("agent-auditor", Role::Auditor, &base_path.join("auditor")).await?; + + architect.mailbox = shared_mailbox.clone(); + coder.mailbox = shared_mailbox.clone(); + auditor.mailbox = shared_mailbox.clone(); + + // 4. Initialize Reverb (Rollup) + let reverb = Reverb::new(shared_mailbox.clone()); + + // 5. Start Everything + let pulse_handle = tokio::spawn(async move { pulse.run().await }); + let mut arch_handle_inner = architect; + tokio::spawn(async move { arch_handle_inner.run_loop().await }); + let mut coder_handle_inner = coder; + tokio::spawn(async move { coder_handle_inner.run_loop().await }); + let mut aud_handle_inner = auditor; + tokio::spawn(async move { aud_handle_inner.run_loop().await }); + + println!("[DEMO] Pulse and Agents started. Observing for 20 seconds..."); + + let start_time = Utc::now(); + for _ in 0..10 { + sleep(Duration::from_secs(2)).await; + match reverb.collect_report(start_time) { + Ok(claims) => { + println!("[REVERB] Received {} status claims in this window.", claims.len()); + for claim in claims { + // println!(" - Agent {}: Beat {}, State: {}", claim.agent_id, claim.beat_index, claim.state); + } + } + Err(e) => eprintln!("[REVERB] Error: {}", e), + } + } + + println!(" +=== DEMO COMPLETE ==="); + Ok(()) +} diff --git a/chrs-backbeat/Cargo.toml b/chrs-backbeat/Cargo.toml new file mode 100644 index 00000000..907d525a --- /dev/null +++ b/chrs-backbeat/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "chrs-backbeat" +version = "0.1.0" +edition = "2021" + +[dependencies] +chrs-mail = { path = "../chrs-mail" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "1.0" +chrono = { version = "0.4", features = ["serde"] } +tokio = { version = "1.0", features = ["full"] } +uuid = { version = "1.0", features = ["v4", "serde"] } diff --git a/chrs-backbeat/src/lib.rs b/chrs-backbeat/src/lib.rs new file mode 100644 index 00000000..d3e09110 --- /dev/null +++ b/chrs-backbeat/src/lib.rs @@ -0,0 +1,110 @@ +//! chrs-backbeat: Temporal Orchestration (Pulse/Reverb) for CHORUS. + +use chrs_mail::{Mailbox, Message}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use uuid::Uuid; +use std::time::Duration; +use tokio::time::sleep; + +/// Represents a single rhythmic unit in the CHORUS cluster. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct BeatFrame { + pub cluster_id: String, + pub tempo_bpm: u32, + pub beat_index: u32, + pub beat_epoch: DateTime, + pub downbeat: bool, + pub phase: String, +} + +/// Represents an agent's reported status relative to a beat. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct StatusClaim { + pub agent_id: String, + pub task_id: Option, + pub beat_index: u32, + pub state: String, + pub progress: f32, +} + +#[derive(Debug, Error)] +pub enum BackbeatError { + #[error("Mailbox error: {0}")] + Mailbox(#[from] chrs_mail::MailError), +} + +/// The Pulse component broadcasts the cluster rhythm. +pub struct Pulse { + pub cluster_id: String, + pub tempo_bpm: u32, + mailbox: Mailbox, +} + +impl Pulse { + pub fn new(cluster_id: &str, tempo_bpm: u32, mailbox: Mailbox) -> Self { + Self { + cluster_id: cluster_id.into(), + tempo_bpm, + mailbox, + } + } + + /// Starts the pulse loop, emitting a BeatFrame every beat. + pub async fn run(&self) -> Result<(), BackbeatError> { + let mut beat_index = 1; + let beat_duration = Duration::from_secs(60) / self.tempo_bpm; + + println!("[PULSE] Starting at {} BPM ({:?} per beat)", self.tempo_bpm, beat_duration); + + loop { + let now = Utc::now(); + let frame = BeatFrame { + cluster_id: self.cluster_id.clone(), + tempo_bpm: self.tempo_bpm, + beat_index, + beat_epoch: now, + downbeat: beat_index == 1, + phase: "active".into(), // Simplified for POC + }; + + let msg = Message { + id: Uuid::new_v4(), + from_peer: "pulse".into(), + to_peer: "council".into(), + topic: "beat_frame".into(), + payload: serde_json::to_value(&frame).unwrap(), + sent_at: now, + read_at: None, + }; + + self.mailbox.send(&msg)?; + + // Increment and cycle beat_index (default bar length = 8) + beat_index = if beat_index >= 8 { 1 } else { beat_index + 1 }; + + sleep(beat_duration).await; + } + } +} + +/// The Reverb component aggregates status claims into bar reports. +pub struct Reverb { + mailbox: Mailbox, +} + +impl Reverb { + pub fn new(mailbox: Mailbox) -> Self { + Self { mailbox } + } + + /// Collects and summarizes status claims since a given time. + pub fn collect_report(&self, since: DateTime) -> Result, BackbeatError> { + let messages = self.mailbox.receive_broadcasts("status_claim", since)?; + let claims: Vec = messages.into_iter() + .filter_map(|m| serde_json::from_value(m.payload).ok()) + .collect(); + Ok(claims) + } +}