Implement chrs-election: Stateful weighted leader election following original CHORUS specs

This commit is contained in:
anthonyrawlins
2026-03-04 04:49:03 +11:00
parent 83ef40d3e2
commit 6bc78f4854
908 changed files with 739 additions and 32 deletions

View File

@@ -8,6 +8,7 @@ use chrs_exec::{DockerExecutor, TaskRequest};
use chrs_prompts::get_system_prompt;
use chrs_code_edit::WorktreeManager;
use chrs_discovery::{SwarmManager, BusHandle, BusMessage};
use chrs_election::ElectionManager;
use chrono::{Utc, DateTime};
use std::path::Path;
use std::time::Duration;
@@ -23,6 +24,7 @@ pub struct CHORUSAgent {
pub mailbox: Mailbox,
pub graph: DoltGraph,
pub council: CouncilManager,
pub election: ElectionManager,
pub executor: DockerExecutor,
pub code_edit: Option<WorktreeManager>,
pub bus: BusHandle,
@@ -66,6 +68,9 @@ impl CHORUSAgent {
// Initialize P2P Bus
let (bus_tx, bus_rx) = mpsc::unbounded_channel();
let bus = SwarmManager::start_bus(bus_tx).await?;
// Initialize Election Manager
let election = ElectionManager::new(id, bus.clone());
Ok(Self {
id: id.to_string(),
@@ -73,6 +78,7 @@ impl CHORUSAgent {
mailbox,
graph,
council,
election,
executor,
code_edit,
bus,
@@ -93,41 +99,59 @@ impl CHORUSAgent {
/// Main execution loop for the agent.
pub async fn run_loop(&mut self) {
println!("[AGENT {}] Role: {:?} starting...", self.id, self.role);
loop {
// 1. Broadcast presence via P2P Bus
let heartbeat_payload = serde_json::to_vec(&self.council.local_peer).unwrap();
let _ = self.bus.publish("chorus-heartbeat", heartbeat_payload);
let mut heartbeat_tick = tokio::time::interval(Duration::from_secs(5));
let mut election_tick = tokio::time::interval(Duration::from_secs(1));
// 2. Check P2P Bus for messages
while let Ok(msg) = self.bus_rx.try_recv() {
if msg.topic == "chorus-heartbeat" {
if let Ok(peer) = serde_json::from_slice::<Peer>(&msg.payload) {
if peer.id != self.id {
if !self.peers.contains_key(&peer.id) {
println!("[AGENT {}] P2P Discovered peer: {} ({:?})", self.id, peer.id, peer.role);
loop {
tokio::select! {
// 1. Periodic Heartbeat (if leader)
_ = heartbeat_tick.tick() => {
let _ = self.election.send_heartbeat().await;
// Also broadcast presence for discovery
let heartbeat_payload = serde_json::to_vec(&self.council.local_peer).unwrap();
let _ = self.bus.publish("chorus-heartbeat", heartbeat_payload);
}
// 2. Periodic Election State Machine Step
_ = election_tick.tick() => {
let _ = self.election.run_step().await;
}
// 3. Check P2P Bus for messages
msg = self.bus_rx.recv() => {
if let Some(msg) = msg {
if msg.topic == "chorus-heartbeat" {
if let Ok(peer) = serde_json::from_slice::<Peer>(&msg.payload) {
if peer.id != self.id {
if !self.peers.contains_key(&peer.id) {
println!("[AGENT {}] P2P Discovered peer: {} ({:?})", self.id, peer.id, peer.role);
}
self.peers.insert(peer.id.clone(), peer);
}
}
} else if msg.topic == "CHORUS/election/v1" || msg.topic == "CHORUS/admin/heartbeat/v1" {
let _ = self.election.process_message(&msg).await;
} else if msg.topic == "chorus-global" {
if let Ok(frame) = serde_json::from_slice::<BeatFrame>(&msg.payload) {
self.handle_beat(frame).await;
}
self.peers.insert(peer.id.clone(), peer);
}
}
} else if msg.topic == "chorus-global" {
// Check if it is a BeatFrame or a StatusClaim (for P2P sync)
if let Ok(frame) = serde_json::from_slice::<BeatFrame>(&msg.payload) {
self.handle_beat(frame).await;
}
// 4. Check for direct messages (Mailbox fallback)
_ = async { sleep(Duration::from_millis(500)).await } => {
match self.mailbox.receive_pending(&self.id) {
Ok(messages) => {
for msg in messages {
self.handle_message(msg).await;
}
}
Err(e) => eprintln!("Mailbox task error: {}", e),
}
}
}
// 3. Check for Pulse (Backbeat) and Tasks (Mailbox fallback for now)
match self.mailbox.receive_pending(&self.id) {
Ok(messages) => {
for msg in messages {
self.handle_message(msg).await;
}
}
Err(e) => eprintln!("Mailbox task error: {}", e),
}
sleep(Duration::from_secs(1)).await;
}
}
@@ -137,7 +161,7 @@ impl CHORUSAgent {
agent_id: self.id.clone(),
task_id: None,
beat_index: self.current_beat,
state: "idle".into(),
state: if self.election.is_leader() { "leading".into() } else { "idle".into() },
progress: 1.0,
};
@@ -170,7 +194,8 @@ impl CHORUSAgent {
let _ = self.graph.insert_node("task_log", log_entry);
let _ = self.graph.commit(&format!("Logged task: {}", msg.topic));
if msg.topic == "task" && (self.role == Role::Architect || self.role == Role::SeniorSoftwareArchitect) {
// 1. Delegation logic (Leader only)
if msg.topic == "task" && self.election.is_leader() {
let peers_vec: Vec<Peer> = self.peers.values().cloned().collect();
if !peers_vec.is_empty() {
let sub_tasks = self.council.delegate_work(msg.id, "System implementation", &peers_vec);
@@ -181,6 +206,7 @@ impl CHORUSAgent {
}
}
// 2. Execution logic
if msg.topic == "implementation_task" || msg.topic == "execution_task" {
let workspace_path = if let Some(mgr) = &self.code_edit {
println!("[AGENT {}] Preparing workspace for task...", self.id);
@@ -199,7 +225,7 @@ impl CHORUSAgent {
code: None,
agent_prompt: Some(format!("Your role: {}. {}", self.system_prompt, reasoning)),
workspace_path,
timeout_secs: 300, // Longer timeout for agent tasks
timeout_secs: 300,
};
match self.executor.execute(req).await {
Ok(res) => {
@@ -218,6 +244,11 @@ impl CHORUSAgent {
}
}
if msg.topic == "security_audit_task" {
println!("[AGENT {}] Performing security audit...", self.id);
let _reasoning = self.think("Perform security audit on latest implementation").await;
}
let _ = self.mailbox.mark_read(msg.id);
}
}