Implement chrs-backbeat: Temporal orchestration with Pulse/Reverb and agent synchronization
This commit is contained in:
14
chrs-backbeat-demo/Cargo.toml
Normal file
14
chrs-backbeat-demo/Cargo.toml
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
[package]
|
||||||
|
name = "chrs-backbeat-demo"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
chrs-agent = { path = "../chrs-agent" }
|
||||||
|
chrs-backbeat = { path = "../chrs-backbeat" }
|
||||||
|
chrs-mail = { path = "../chrs-mail" }
|
||||||
|
chrs-council = { path = "../chrs-council" }
|
||||||
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
serde_json = "1.0"
|
||||||
|
uuid = { version = "1.0", features = ["v4"] }
|
||||||
|
chrono = "0.4"
|
||||||
67
chrs-backbeat-demo/src/main.rs
Normal file
67
chrs-backbeat-demo/src/main.rs
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
use chrs_agent::CHORUSAgent;
|
||||||
|
use chrs_backbeat::{Pulse, Reverb};
|
||||||
|
use chrs_council::Role;
|
||||||
|
use chrs_mail::Mailbox;
|
||||||
|
use std::fs;
|
||||||
|
use std::path::Path;
|
||||||
|
use tokio::time::{sleep, Duration};
|
||||||
|
use chrono::Utc;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
println!("=== CHORUS BACKBEAT Rhythmic Orchestration Demo ===");
|
||||||
|
|
||||||
|
// 1. Setup shared environment
|
||||||
|
let base_path = Path::new("/tmp/chrs_backbeat_demo");
|
||||||
|
if base_path.exists() {
|
||||||
|
fs::remove_dir_all(base_path)?;
|
||||||
|
}
|
||||||
|
fs::create_dir_all(base_path)?;
|
||||||
|
|
||||||
|
let mail_db = base_path.join("mail.sqlite");
|
||||||
|
let shared_mailbox = Mailbox::open(&mail_db)?;
|
||||||
|
|
||||||
|
// 2. Initialize Pulse (Broadcaster)
|
||||||
|
let pulse = Pulse::new("chorus-cluster-01", 30, shared_mailbox.clone()); // 30 BPM = 2s per beat
|
||||||
|
|
||||||
|
// 3. Initialize Agents
|
||||||
|
let mut architect = CHORUSAgent::init("agent-architect", Role::Architect, &base_path.join("architect")).await?;
|
||||||
|
let mut coder = CHORUSAgent::init("agent-coder", Role::Coder, &base_path.join("coder")).await?;
|
||||||
|
let mut auditor = CHORUSAgent::init("agent-auditor", Role::Auditor, &base_path.join("auditor")).await?;
|
||||||
|
|
||||||
|
architect.mailbox = shared_mailbox.clone();
|
||||||
|
coder.mailbox = shared_mailbox.clone();
|
||||||
|
auditor.mailbox = shared_mailbox.clone();
|
||||||
|
|
||||||
|
// 4. Initialize Reverb (Rollup)
|
||||||
|
let reverb = Reverb::new(shared_mailbox.clone());
|
||||||
|
|
||||||
|
// 5. Start Everything
|
||||||
|
let pulse_handle = tokio::spawn(async move { pulse.run().await });
|
||||||
|
let mut arch_handle_inner = architect;
|
||||||
|
tokio::spawn(async move { arch_handle_inner.run_loop().await });
|
||||||
|
let mut coder_handle_inner = coder;
|
||||||
|
tokio::spawn(async move { coder_handle_inner.run_loop().await });
|
||||||
|
let mut aud_handle_inner = auditor;
|
||||||
|
tokio::spawn(async move { aud_handle_inner.run_loop().await });
|
||||||
|
|
||||||
|
println!("[DEMO] Pulse and Agents started. Observing for 20 seconds...");
|
||||||
|
|
||||||
|
let start_time = Utc::now();
|
||||||
|
for _ in 0..10 {
|
||||||
|
sleep(Duration::from_secs(2)).await;
|
||||||
|
match reverb.collect_report(start_time) {
|
||||||
|
Ok(claims) => {
|
||||||
|
println!("[REVERB] Received {} status claims in this window.", claims.len());
|
||||||
|
for claim in claims {
|
||||||
|
// println!(" - Agent {}: Beat {}, State: {}", claim.agent_id, claim.beat_index, claim.state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => eprintln!("[REVERB] Error: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("
|
||||||
|
=== DEMO COMPLETE ===");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
13
chrs-backbeat/Cargo.toml
Normal file
13
chrs-backbeat/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
[package]
|
||||||
|
name = "chrs-backbeat"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
chrs-mail = { path = "../chrs-mail" }
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1.0"
|
||||||
|
thiserror = "1.0"
|
||||||
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
|
tokio = { version = "1.0", features = ["full"] }
|
||||||
|
uuid = { version = "1.0", features = ["v4", "serde"] }
|
||||||
110
chrs-backbeat/src/lib.rs
Normal file
110
chrs-backbeat/src/lib.rs
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
//! chrs-backbeat: Temporal Orchestration (Pulse/Reverb) for CHORUS.
|
||||||
|
|
||||||
|
use chrs_mail::{Mailbox, Message};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use thiserror::Error;
|
||||||
|
use uuid::Uuid;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
/// Represents a single rhythmic unit in the CHORUS cluster.
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct BeatFrame {
|
||||||
|
pub cluster_id: String,
|
||||||
|
pub tempo_bpm: u32,
|
||||||
|
pub beat_index: u32,
|
||||||
|
pub beat_epoch: DateTime<Utc>,
|
||||||
|
pub downbeat: bool,
|
||||||
|
pub phase: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Represents an agent's reported status relative to a beat.
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct StatusClaim {
|
||||||
|
pub agent_id: String,
|
||||||
|
pub task_id: Option<Uuid>,
|
||||||
|
pub beat_index: u32,
|
||||||
|
pub state: String,
|
||||||
|
pub progress: f32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum BackbeatError {
|
||||||
|
#[error("Mailbox error: {0}")]
|
||||||
|
Mailbox(#[from] chrs_mail::MailError),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The Pulse component broadcasts the cluster rhythm.
|
||||||
|
pub struct Pulse {
|
||||||
|
pub cluster_id: String,
|
||||||
|
pub tempo_bpm: u32,
|
||||||
|
mailbox: Mailbox,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Pulse {
|
||||||
|
pub fn new(cluster_id: &str, tempo_bpm: u32, mailbox: Mailbox) -> Self {
|
||||||
|
Self {
|
||||||
|
cluster_id: cluster_id.into(),
|
||||||
|
tempo_bpm,
|
||||||
|
mailbox,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Starts the pulse loop, emitting a BeatFrame every beat.
|
||||||
|
pub async fn run(&self) -> Result<(), BackbeatError> {
|
||||||
|
let mut beat_index = 1;
|
||||||
|
let beat_duration = Duration::from_secs(60) / self.tempo_bpm;
|
||||||
|
|
||||||
|
println!("[PULSE] Starting at {} BPM ({:?} per beat)", self.tempo_bpm, beat_duration);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let now = Utc::now();
|
||||||
|
let frame = BeatFrame {
|
||||||
|
cluster_id: self.cluster_id.clone(),
|
||||||
|
tempo_bpm: self.tempo_bpm,
|
||||||
|
beat_index,
|
||||||
|
beat_epoch: now,
|
||||||
|
downbeat: beat_index == 1,
|
||||||
|
phase: "active".into(), // Simplified for POC
|
||||||
|
};
|
||||||
|
|
||||||
|
let msg = Message {
|
||||||
|
id: Uuid::new_v4(),
|
||||||
|
from_peer: "pulse".into(),
|
||||||
|
to_peer: "council".into(),
|
||||||
|
topic: "beat_frame".into(),
|
||||||
|
payload: serde_json::to_value(&frame).unwrap(),
|
||||||
|
sent_at: now,
|
||||||
|
read_at: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.mailbox.send(&msg)?;
|
||||||
|
|
||||||
|
// Increment and cycle beat_index (default bar length = 8)
|
||||||
|
beat_index = if beat_index >= 8 { 1 } else { beat_index + 1 };
|
||||||
|
|
||||||
|
sleep(beat_duration).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The Reverb component aggregates status claims into bar reports.
|
||||||
|
pub struct Reverb {
|
||||||
|
mailbox: Mailbox,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Reverb {
|
||||||
|
pub fn new(mailbox: Mailbox) -> Self {
|
||||||
|
Self { mailbox }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Collects and summarizes status claims since a given time.
|
||||||
|
pub fn collect_report(&self, since: DateTime<Utc>) -> Result<Vec<StatusClaim>, BackbeatError> {
|
||||||
|
let messages = self.mailbox.receive_broadcasts("status_claim", since)?;
|
||||||
|
let claims: Vec<StatusClaim> = messages.into_iter()
|
||||||
|
.filter_map(|m| serde_json::from_value(m.payload).ok())
|
||||||
|
.collect();
|
||||||
|
Ok(claims)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user