From ffe37a4292b46d81b4e9bebcf75dbba23b3c5974 Mon Sep 17 00:00:00 2001 From: anthonyrawlins Date: Wed, 4 Mar 2026 02:55:47 +1100 Subject: [PATCH] Implement chrs-council: Governance layer with weighted leader election and task delegation --- chrs-agent/Cargo.toml | 8 ++ chrs-agent/src/bin/main.rs | 15 +++ chrs-agent/src/lib.rs | 164 +++++++++++++++++++++++++++++ chrs-agent/src/main.rs | 115 --------------------- chrs-council-demo/Cargo.toml | 13 +++ chrs-council-demo/src/main.rs | 71 +++++++++++++ chrs-council/Cargo.toml | 17 +++ chrs-council/src/lib.rs | 131 +++++++++++++++++++++++ chrs-mail/src/lib.rs | 188 ++++++++++------------------------ 9 files changed, 475 insertions(+), 247 deletions(-) create mode 100644 chrs-agent/src/bin/main.rs create mode 100644 chrs-agent/src/lib.rs delete mode 100644 chrs-agent/src/main.rs create mode 100644 chrs-council-demo/Cargo.toml create mode 100644 chrs-council-demo/src/main.rs create mode 100644 chrs-council/Cargo.toml create mode 100644 chrs-council/src/lib.rs diff --git a/chrs-agent/Cargo.toml b/chrs-agent/Cargo.toml index fbced0a9..b45d7a19 100644 --- a/chrs-agent/Cargo.toml +++ b/chrs-agent/Cargo.toml @@ -7,9 +7,17 @@ edition = "2021" ucxl = { path = "../UCXL" } chrs-mail = { path = "../chrs-mail" } chrs-graph = { path = "../chrs-graph" } +chrs-council = { path = "../chrs-council" } tokio = { version = "1", features = ["full"] } serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "1" uuid = { version = "1", features = ["v4"] } chrono = { version = "0.4", features = ["serde"] } + +[lib] +path = "src/lib.rs" + +[[bin]] +name = "chrs-agent" +path = "src/bin/main.rs" diff --git a/chrs-agent/src/bin/main.rs b/chrs-agent/src/bin/main.rs new file mode 100644 index 00000000..08503ef0 --- /dev/null +++ b/chrs-agent/src/bin/main.rs @@ -0,0 +1,15 @@ +use chrs_agent::CHORUSAgent; +use chrs_council::Role; +use std::path::Path; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let agent_id = "agent-architect"; + let role = Role::Architect; + let base_path = Path::new("/home/tony/rust/projects/reset/CHORUS/data/architect"); + + let mut agent = CHORUSAgent::init(agent_id, role, base_path).await?; + agent.run_loop().await; + + Ok(()) +} diff --git a/chrs-agent/src/lib.rs b/chrs-agent/src/lib.rs new file mode 100644 index 00000000..7a0e2ce6 --- /dev/null +++ b/chrs-agent/src/lib.rs @@ -0,0 +1,164 @@ +//! chrs-agent: The main coordinator for CHORUS agents. + +use chrs_graph::DoltGraph; +use chrs_mail::{Mailbox, Message}; +use chrs_council::{CouncilManager, Peer, Role}; +use chrono::{Utc, DateTime}; +use std::path::Path; +use std::time::Duration; +use tokio::time::sleep; +use std::collections::HashMap; + +/// Represents a CHORUS agent with its mailbox, graph, and council management. +pub struct CHORUSAgent { + pub id: String, + pub role: Role, + pub mailbox: Mailbox, + pub graph: DoltGraph, + pub council: CouncilManager, + pub peers: HashMap, + pub last_heartbeat_check: DateTime, +} + +impl CHORUSAgent { + /// Initialize a new CHORUSAgent with its own identity and storage paths. + pub async fn init(id: &str, role: Role, base_path: &Path) -> Result> { + let mail_path = base_path.join("mail.sqlite"); + let graph_path = base_path.join("state_graph"); + + std::fs::create_dir_all(&graph_path)?; + + let mailbox = Mailbox::open(mail_path)?; + let graph = DoltGraph::init(&graph_path)?; + + // Ensure table exists + let _ = graph.create_table("task_log", "id VARCHAR(255) PRIMARY KEY, topic TEXT, payload TEXT, received_at TEXT"); + + let local_peer = Peer { + id: id.to_string(), + role, + resource_score: 0.9, // Hardcoded for POC + }; + let council = CouncilManager::new(local_peer, mailbox.clone()); + + Ok(Self { + id: id.to_string(), + role, + mailbox, + graph, + council, + peers: HashMap::new(), + last_heartbeat_check: Utc::now(), + }) + } + + /// Main execution loop for the agent. + pub async fn run_loop(&mut self) { + println!("[AGENT {}] Role: {:?} starting...", self.id, self.role); + loop { + // 1. Broadcast presence + if let Err(e) = self.council.broadcast_heartbeat("heartbeat") { + eprintln!("[AGENT {}] Heartbeat fail: {}", self.id, e); + } + + // 2. Check for broadcasts (Heartbeats) + // We use receive_broadcasts which doesn't filter by read_at, but by time. + match self.mailbox.receive_broadcasts("heartbeat", self.last_heartbeat_check) { + Ok(messages) => { + for msg in messages { + if let Ok(peer) = serde_json::from_value::(msg.payload) { + if peer.id != self.id { + if !self.peers.contains_key(&peer.id) { + println!("[AGENT {}] Discovered peer: {} ({:?})", self.id, peer.id, peer.role); + } + self.peers.insert(peer.id.clone(), peer); + } + } + } + // Update check time + self.last_heartbeat_check = Utc::now(); + } + Err(e) => eprintln!("Mailbox broadcast error: {}", e), + } + + // 3. Check for direct messages (Tasks) + match self.mailbox.receive_pending(&self.id) { + Ok(messages) => { + for msg in messages { + self.handle_message(msg).await; + } + } + Err(e) => eprintln!("Mailbox error: {}", e), + } + + // 4. Elect Leader + let mut all_peers: Vec = self.peers.values().cloned().collect(); + all_peers.push(self.council.local_peer.clone()); + let _leader_id = self.council.elect_leader(&all_peers); + + sleep(Duration::from_secs(2)).await; + } + } + + /// Processes an individual incoming message. + pub async fn handle_message(&mut self, msg: Message) { + println!("[AGENT {}] Handling message: {}", self.id, msg.topic); + + // Log to graph + let log_entry = serde_json::json!({ + "id": msg.id.to_string(), + "topic": msg.topic, + "payload": msg.payload.to_string(), + "received_at": Utc::now().to_rfc3339() + }); + + if let Err(e) = self.graph.insert_node("task_log", log_entry) { + eprintln!("Failed to log task: {}", e); + } else { + let _ = self.graph.commit(&format!("Logged task: {}", msg.topic)); + } + + // Delegate if high-level task and I am leader + if msg.topic == "task" && self.role == Role::Architect { + let mut peers_vec: Vec = self.peers.values().cloned().collect(); + // Retry loop for peer discovery + for _ in 0..5 { + if !peers_vec.is_empty() { break; } + println!("[AGENT {}] No peers yet, waiting for heartbeats...", self.id); + sleep(Duration::from_secs(2)).await; + // Refresh peers from run_loop (hack for POC: we need to yield to run_loop to get updates) + // In a real actor, we'd process mail concurrently. + // For this POC, we'll just check mail manually here. + if let Ok(messages) = self.mailbox.receive_broadcasts("heartbeat", self.last_heartbeat_check) { + for m in messages { + if let Ok(peer) = serde_json::from_value::(m.payload) { + if peer.id != self.id { self.peers.insert(peer.id.clone(), peer); } + } + } + self.last_heartbeat_check = Utc::now(); + } + peers_vec = self.peers.values().cloned().collect(); + } + + if peers_vec.is_empty() { + println!("[AGENT {}] TIMEOUT: No peers to delegate to.", self.id); + } else { + let sub_tasks = self.council.delegate_work(msg.id, "System implementation", &peers_vec); + for st in sub_tasks { + println!("[AGENT {}] Delegating {} to {}", self.id, st.topic, st.to_peer); + let _ = self.mailbox.send(&st); + } + } + } + + // Handle specialized tasks + if msg.topic == "implementation_task" { + println!("[AGENT {}] Working on implementation...", self.id); + } + if msg.topic == "security_audit_task" { + println!("[AGENT {}] Performing security audit...", self.id); + } + + let _ = self.mailbox.mark_read(msg.id); + } +} diff --git a/chrs-agent/src/main.rs b/chrs-agent/src/main.rs deleted file mode 100644 index b31179ea..00000000 --- a/chrs-agent/src/main.rs +++ /dev/null @@ -1,115 +0,0 @@ -/// chrs-agent crate implements the core CHORUS agent runtime. -/// -/// An agent runs a message loop that receives tasks from a `Mailbox`, logs them to a -/// `DoltGraph` (the persistent state graph), and marks them as read. The design -/// follows the CHORUS architectural pattern where agents are autonomous workers -/// that interact through the `chrs_mail` messaging layer and maintain a provable -/// execution history in the graph. - -use chrs_graph::DoltGraph; -use chrs_mail::{Mailbox, Message}; -use chrono::Utc; -use std::path::Path; -use std::time::Duration; -use tokio::time::sleep; -use uuid::Uuid; - -/// Represents a running CHORUS agent. -/// -/// # Fields -/// * `id` – Logical identifier for the agent (e.g., "agent-001"). -/// * `mailbox` – The `Mailbox` used for inter‑agent communication. -/// * `graph` – Persistence layer (`DoltGraph`) where task logs are stored. -/// -/// # Rationale -/// Agents are isolated units of work. By keeping a dedicated mailbox and a graph -/// per agent we guarantee that each agent can be started, stopped, and reasoned -/// about independently while still contributing to the global CHORUS state. -pub struct CHORUSAgent { - id: String, - mailbox: Mailbox, - graph: DoltGraph, -} - -impl CHORUSAgent { - /// Initializes a new `CHORUSAgent`. - /// - /// This creates the filesystem layout under `base_path`, opens or creates the - /// SQLite mailbox, and initialises a `DoltGraph` for state persistence. - /// It also ensures that a `task_log` table exists for recording incoming - /// messages. - /// - /// # Parameters - /// * `id` – Identifier for the agent instance. - /// * `base_path` – Directory where the agent stores its data. - /// - /// Returns an instance ready to run its event loop. - async fn init(id: &str, base_path: &Path) -> Result> { - let mail_path = base_path.join("mail.sqlite"); - let graph_path = base_path.join("state_graph"); - - std::fs::create_dir_all(&graph_path)?; - - let mailbox = Mailbox::open(mail_path)?; - let graph = DoltGraph::init(&graph_path)?; - - // Ensure table exists - let _ = graph.create_table("task_log", "id TEXT PRIMARY KEY, topic TEXT, payload TEXT, received_at TEXT"); - - Ok(Self { - id: id.to_string(), - mailbox, - graph, - }) - } - - /// Main event loop of the agent. - /// - /// It repeatedly polls the mailbox for pending messages addressed to this - /// agent, logs each message into the `task_log` table, commits the graph, and - /// acknowledges the message. The loop sleeps for a configurable interval to - /// avoid busy‑waiting. - async fn run_loop(&self) { - println!("Agent {} starting run loop...", self.id); - loop { - match self.mailbox.receive_pending(&self.id) { - Ok(messages) => { - for msg in messages { - println!("Received message: {:?}", msg.topic); - let log_entry = serde_json::json!({ - "id": msg.id.to_string(), - "topic": msg.topic, - "payload": msg.payload.to_string(), - "received_at": Utc::now().to_rfc3339() - }); - if let Err(e) = self.graph.insert_node("task_log", log_entry) { - eprintln!("Failed to log task to graph: {}", e); - } else { - let _ = self.graph.commit(&format!("Logged task: {}", msg.id)); - let _ = self.mailbox.mark_read(msg.id); - } - } - } - Err(e) => eprintln!("Mailbox error: {}", e), - } - sleep(Duration::from_secs(5)).await; - } - } -} - -/// Entry point for the CHORUS agent binary. -/// -/// It creates a data directory under `/home/Tony/rust/projects/reset/CHORUS/data` -/// (note the capitalised `Tony` matches the original path), initialises the -/// `CHORUSAgent`, and starts its run loop. -#[tokio::main] -async fn main() -> Result<(), Box> { - let agent_id = "agent-001"; - let base_path = Path::new("/home/Tony/rust/projects/reset/CHORUS/data/agent-001"); - std::fs::create_dir_all(base_path)?; - - let agent = CHORUSAgent::init(agent_id, base_path).await?; - agent.run_loop().await; - - Ok(()) -} diff --git a/chrs-council-demo/Cargo.toml b/chrs-council-demo/Cargo.toml new file mode 100644 index 00000000..80fe0818 --- /dev/null +++ b/chrs-council-demo/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "chrs-council-demo" +version = "0.1.0" +edition = "2021" + +[dependencies] +chrs-agent = { path = "../chrs-agent" } +chrs-mail = { path = "../chrs-mail" } +chrs-council = { path = "../chrs-council" } +tokio = { version = "1", features = ["full"] } +serde_json = "1.0" +uuid = { version = "1.0", features = ["v4"] } +chrono = "0.4" diff --git a/chrs-council-demo/src/main.rs b/chrs-council-demo/src/main.rs new file mode 100644 index 00000000..5ab9d4d8 --- /dev/null +++ b/chrs-council-demo/src/main.rs @@ -0,0 +1,71 @@ +use chrs_agent::CHORUSAgent; +use chrs_council::Role; +use chrs_mail::{Mailbox, Message}; +use chrono::Utc; +use std::fs; +use std::path::Path; +use tokio::time::{sleep, Duration}; +use uuid::Uuid; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("=== CHORUS Council Collaboration Demo ==="); + + // 1. Setup shared environment + let base_path = Path::new("/tmp/chrs_council_demo"); + if base_path.exists() { + fs::remove_dir_all(base_path)?; + } + fs::create_dir_all(base_path)?; + + let mail_db = base_path.join("mail.sqlite"); + + // 2. Spawn Agents + // They will internally open their own connections to mail_db + let mut architect = CHORUSAgent::init("agent-architect", Role::Architect, &base_path.join("architect")).await?; + let mut coder = CHORUSAgent::init("agent-coder", Role::Coder, &base_path.join("coder")).await?; + let mut auditor = CHORUSAgent::init("agent-auditor", Role::Auditor, &base_path.join("auditor")).await?; + + // Manually set the mailbox path to the shared one for the demo + architect.mailbox = Mailbox::open(&mail_db)?; + coder.mailbox = Mailbox::open(&mail_db)?; + auditor.mailbox = Mailbox::open(&mail_db)?; + + // 3. Start Agents in background + let mut arch_handle_inner = architect; + let arch_handle = tokio::spawn(async move { arch_handle_inner.run_loop().await }); + + let mut coder_handle_inner = coder; + let coder_handle = tokio::spawn(async move { coder_handle_inner.run_loop().await }); + + let mut aud_handle_inner = auditor; + let aud_handle = tokio::spawn(async move { aud_handle_inner.run_loop().await }); + + println!("[DEMO] 3 Agents started. Waiting for heartbeats to populate peer lists..."); + sleep(Duration::from_secs(10)).await; + + // 4. Inject High-Level Task + let shared_mailbox = Mailbox::open(&mail_db)?; + let task_id = Uuid::new_v4(); + let task_msg = Message { + id: task_id, + from_peer: "client".into(), + to_peer: "agent-architect".into(), + topic: "task".into(), + payload: serde_json::json!({ + "action": "build_feature", + "description": "Implement UCXL version history" + }), + sent_at: Utc::now(), + read_at: None, + }; + shared_mailbox.send(&task_msg)?; + println!("[DEMO] Task injected: {}", task_id); + + // 5. Observe + println!("[DEMO] Observing collaboration for 30 seconds..."); + sleep(Duration::from_secs(30)).await; + + println!("\n=== DEMO COMPLETE ==="); + Ok(()) +} diff --git a/chrs-council/Cargo.toml b/chrs-council/Cargo.toml new file mode 100644 index 00000000..f47a0871 --- /dev/null +++ b/chrs-council/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "chrs-council" +version = "0.1.0" +edition = "2021" + +[dependencies] +chrs-mail = { path = "../chrs-mail" } +chrs-graph = { path = "../chrs-graph" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "1.0" +chrono = { version = "0.4", features = ["serde"] } +uuid = { version = "1.0", features = ["v4", "serde"] } + +[dev-dependencies] +tempfile = "3" + diff --git a/chrs-council/src/lib.rs b/chrs-council/src/lib.rs new file mode 100644 index 00000000..803f1453 --- /dev/null +++ b/chrs-council/src/lib.rs @@ -0,0 +1,131 @@ +//! chrs-council: Governance and Orchestration for CHORUS agents. + +use chrs_mail::{Mailbox, Message}; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use uuid::Uuid; + +/// Specialized roles for CHORUS agents. +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] +pub enum Role { + /// Responsible for high-level planning and task delegation. + Architect, + /// Responsible for code generation and technical implementation. + Coder, + /// Responsible for security verification and final decision audit. + Auditor, +} + +/// Represents a peer agent participating in a council. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Peer { + pub id: String, + pub role: Role, + pub resource_score: f64, +} + +#[derive(Debug, Error)] +pub enum CouncilError { + #[error("Mailbox error: {0}")] + Mailbox(#[from] chrs_mail::MailError), + #[error("No peers available for election")] + NoPeers, +} + +/// Manages council formation, leader election, and task delegation. +pub struct CouncilManager { + pub local_peer: Peer, + mailbox: Mailbox, +} + +impl CouncilManager { + /// Initialize a new CouncilManager for the local agent. + pub fn new(local_peer: Peer, mailbox: Mailbox) -> Self { + Self { local_peer, mailbox } + } + + /// Deterministically selects a leader from a list of peers. + /// + /// **Why**: Ensures that even in a distributed system, all honest nodes + /// agree on the same leader for a given epoch without a central broker. + pub fn elect_leader(&self, peers: &[Peer]) -> Option { + peers.iter() + .max_by(|a, b| { + a.resource_score.partial_cmp(&b.resource_score) + .unwrap_or(std::cmp::Ordering::Equal) + .then_with(|| a.id.cmp(&b.id)) + }) + .map(|p| p.id.clone()) + } + + /// Broadcasts the local agent's presence and score to the council. + pub fn broadcast_heartbeat(&self, topic: &str) -> Result<(), CouncilError> { + let msg = Message { + id: Uuid::new_v4(), + from_peer: self.local_peer.id.clone(), + to_peer: "council".into(), // Broadcast address + topic: topic.into(), + payload: serde_json::to_value(&self.local_peer).unwrap(), + sent_at: Utc::now(), + read_at: None, + }; + self.mailbox.send(&msg)?; + Ok(()) + } + + /// Splits a high-level task into specialized sub-tasks for the council. + /// + /// **Why**: This implements the "Divide and Conquer" strategy of CHORUS, + /// allowing the Architect leader to orchestrate complex work across experts. + pub fn delegate_work(&self, task_id: Uuid, task_description: &str, peers: &[Peer]) -> Vec { + let mut sub_tasks = Vec::new(); + + for peer in peers { + let topic = match peer.role { + Role::Coder => "implementation_task", + Role::Auditor => "security_audit_task", + Role::Architect => "planning_task", + }; + + let msg = Message { + id: Uuid::new_v4(), + from_peer: self.local_peer.id.clone(), + to_peer: peer.id.clone(), + topic: topic.into(), + payload: serde_json::json!({ + "parent_task": task_id, + "description": task_description, + "instruction": format!("Perform {:?} duties for task", peer.role) + }), + sent_at: Utc::now(), + read_at: None, + }; + sub_tasks.push(msg); + } + sub_tasks + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[test] + fn test_election() { + let dir = TempDir::new().unwrap(); + let mailbox = Mailbox::open(dir.path().join("mail.sqlite")).unwrap(); + let local = Peer { id: "agent-1".into(), role: Role::Architect, resource_score: 0.8 }; + let manager = CouncilManager::new(local, mailbox); + + let peers = vec![ + Peer { id: "agent-1".into(), role: Role::Architect, resource_score: 0.8 }, + Peer { id: "agent-2".into(), role: Role::Coder, resource_score: 0.95 }, + Peer { id: "agent-3".into(), role: Role::Auditor, resource_score: 0.7 }, + ]; + + let leader = manager.elect_leader(&peers).unwrap(); + assert_eq!(leader, "agent-2"); // Highest score wins + } +} diff --git a/chrs-mail/src/lib.rs b/chrs-mail/src/lib.rs index bf2b8442..d7433c76 100644 --- a/chrs-mail/src/lib.rs +++ b/chrs-mail/src/lib.rs @@ -7,86 +7,47 @@ use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use thiserror::Error; use uuid::Uuid; +use std::sync::{Arc, Mutex}; /// Represents a mail message stored in the mailbox. -/// -/// # Definition -/// `Message` is a data structure that models a single mail exchange between two peers. -/// It contains a unique identifier, sender and recipient identifiers, a topic string, a JSON payload, -/// and timestamps for when the message was sent and optionally when it was read. -/// -/// # Implementation Details -/// - `id` is a **Uuid** generated by the caller to guarantee global uniqueness. -/// - `payload` uses `serde_json::Value` so arbitrary JSON can be attached to the message. -/// - `sent_at` and `read_at` are stored as `chrono::DateTime` to provide timezone‑agnostic timestamps. -/// -/// # Rationale -/// This struct provides a lightweight, serialisable representation of a message that can be persisted -/// in the SQLite‑backed mailbox (see `Mailbox`). Keeping the payload as JSON allows different subsystems -/// of the CHORUS platform to embed domain‑specific data without requiring a rigid schema. #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Message { - /// Globally unique identifier for the message. pub id: Uuid, - /// Identifier of the sending peer. pub from_peer: String, - /// Identifier of the receiving peer. pub to_peer: String, - /// Topic or channel of the message; used for routing/filters. pub topic: String, - /// Arbitrary JSON payload containing the message body. pub payload: JsonValue, - /// Timestamp (UTC) when the message was sent. pub sent_at: DateTime, - /// Optional timestamp (UTC) when the recipient read the message. pub read_at: Option>, } /// Errors that can occur while using the `Mailbox`. -/// -/// Each variant wraps an underlying error type from a dependency, allowing callers to -/// react appropriately (e.g., retry on SQLite errors, surface serialization problems, etc.). #[derive(Debug, Error)] pub enum MailError { - /// Propagates any `rusqlite::Error` encountered while interacting with the SQLite DB. #[error("SQLite error: {0}")] Sqlite(#[from] rusqlite::Error), - /// Propagates JSON (de)serialization errors from `serde_json`. #[error("JSON serialization error: {0}")] Json(#[from] serde_json::Error), - /// Propagates UUID parsing errors. #[error("UUID parsing error: {0}")] Uuid(#[from] uuid::Error), - /// Propagates chrono parsing errors, primarily when deserialising timestamps from string. #[error("Chrono parsing error: {0}")] ChronoParse(#[from] chrono::ParseError), } /// Wrapper around a SQLite connection providing mailbox‑style functionalities. /// -/// The `Mailbox` abstracts a SQLite database that stores `Message` records. It offers a minimal -/// API for opening/creating the DB, sending messages, receiving pending messages for a peer, and -/// marking messages as read. -/// -/// # Architectural Rationale -/// Using SQLite (via `rusqlite`) provides a zero‑configuration, file‑based persistence layer that is -/// portable across the various environments where CHORUS components may run. The wrapper isolates the -/// rest of the codebase from raw SQL handling, ensuring a single place for schema evolution and error -/// mapping. +/// # Implementation +/// Uses `Arc>` to allow thread-safe cloning across agents. +#[derive(Clone)] pub struct Mailbox { - conn: Connection, + conn: Arc>, } impl Mailbox { /// Open (or create) a mailbox database at `path`. - /// - /// The function creates the SQLite file if it does not exist, enables WAL mode for better - /// concurrency, and ensures the `messages` table is present. pub fn open>(path: P) -> Result { let conn = Connection::open(path)?; - // Enable WAL mode for improved concurrency and durability. conn.pragma_update(None, "journal_mode", &"WAL")?; - // Create the `messages` table if it does not already exist. conn.execute( "CREATE TABLE IF NOT EXISTS messages ( id TEXT PRIMARY KEY, @@ -99,16 +60,13 @@ impl Mailbox { )", [], )?; - Ok(Self { conn }) + Ok(Self { conn: Arc::new(Mutex::new(conn)) }) } - /// Store a new message in the mailbox. - /// - /// The `payload` field is serialised to a JSON string before insertion. The `read_at` column is - /// initialised to `NULL` because the message has not yet been consumed. pub fn send(&self, msg: &Message) -> Result<(), MailError> { let payload_str = serde_json::to_string(&msg.payload)?; - self.conn.execute( + let conn = self.conn.lock().unwrap(); + conn.execute( "INSERT INTO messages (id, from_peer, to_peer, topic, payload, sent_at, read_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL)", params![ @@ -123,53 +81,35 @@ impl Mailbox { Ok(()) } - /// Retrieve all unread messages addressed to `peer_id`. - /// - /// The query filters on `to_peer` and `read_at IS NULL`. Returned rows are transformed back into - /// `Message` structs, parsing the UUID, JSON payload, and RFC3339 timestamps. pub fn receive_pending(&self, peer_id: &str) -> Result, MailError> { - let mut stmt = self.conn.prepare( + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( "SELECT id, from_peer, to_peer, topic, payload, sent_at, read_at FROM messages WHERE to_peer = ?1 AND read_at IS NULL", )?; let rows = stmt.query_map(params![peer_id], |row| { - let id_str: String = row.get(0)?; - let from_peer: String = row.get(1)?; - let to_peer: String = row.get(2)?; - let topic: String = row.get(3)?; - let payload_str: String = row.get(4)?; - let sent_at_str: String = row.get(5)?; - let read_at_opt: Option = row.get(6)?; - - // Parse Uuid - let id = Uuid::parse_str(&id_str) - .map_err(|e| rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e)))?; - // Parse JSON payload - let payload: JsonValue = serde_json::from_str(&payload_str) - .map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?; - // Parse timestamps - let sent_at = DateTime::parse_from_rfc3339(&sent_at_str) - .map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))? - .with_timezone(&Utc); - let read_at = match read_at_opt { - Some(s) => Some( - DateTime::parse_from_rfc3339(&s) - .map_err(|e| rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e)))? - .with_timezone(&Utc), - ), - None => None, - }; - - Ok(Message { - id, - from_peer, - to_peer, - topic, - payload, - sent_at, - read_at, - }) + Self::map_row(row) + })?; + + let mut msgs = Vec::new(); + for msg_res in rows { + msgs.push(msg_res?); + } + Ok(msgs) + } + + /// Receive broadcast messages for a topic sent after a specific time. + /// This DOES NOT filter by `read_at` because broadcasts are meant for everyone. + pub fn receive_broadcasts(&self, topic: &str, since: DateTime) -> Result, MailError> { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT id, from_peer, to_peer, topic, payload, sent_at, read_at + FROM messages + WHERE to_peer = 'council' AND topic = ?1 AND sent_at > ?2", + )?; + let rows = stmt.query_map(params![topic, since.to_rfc3339()], |row| { + Self::map_row(row) })?; let mut msgs = Vec::new(); @@ -179,57 +119,41 @@ impl Mailbox { Ok(msgs) } - /// Mark a message as read by setting its `read_at` timestamp. - /// - /// The current UTC time is stored in the `read_at` column for the row with the matching `id`. pub fn mark_read(&self, msg_id: Uuid) -> Result<(), MailError> { let now = Utc::now().to_rfc3339(); - self.conn.execute( + let conn = self.conn.lock().unwrap(); + conn.execute( "UPDATE messages SET read_at = ?1 WHERE id = ?2", params![now, msg_id.to_string()], )?; Ok(()) } -} -#[cfg(test)] -mod tests { - use super::*; - use std::env; - use std::fs; + fn map_row(row: &rusqlite::Row) -> Result { + let id_str: String = row.get(0)?; + let from_peer: String = row.get(1)?; + let to_peer: String = row.get(2)?; + let topic: String = row.get(3)?; + let payload_str: String = row.get(4)?; + let sent_at_str: String = row.get(5)?; + let read_at_opt: Option = row.get(6)?; - fn temp_db_path() -> std::path::PathBuf { - let mut dir = env::temp_dir(); - dir.push(format!("chrs_mail_test_{}.sqlite", Uuid::new_v4())); - dir - } - - #[test] - fn roundtrip_send_and_receive() -> Result<(), MailError> { - let db_path = temp_db_path(); - if db_path.exists() { - fs::remove_file(&db_path).unwrap(); - } - let mailbox = Mailbox::open(&db_path)?; - let msg = Message { - id: Uuid::new_v4(), - from_peer: "alice".into(), - to_peer: "bob".into(), - topic: "greeting".into(), - payload: serde_json::json!({"text": "Hello"}), - sent_at: Utc::now(), - read_at: None, + let id = Uuid::parse_str(&id_str) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e)))?; + let payload: JsonValue = serde_json::from_str(&payload_str) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?; + let sent_at = DateTime::parse_from_rfc3339(&sent_at_str) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))? + .with_timezone(&Utc); + let read_at = match read_at_opt { + Some(s) => Some( + DateTime::parse_from_rfc3339(&s) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e)))? + .with_timezone(&Utc), + ), + None => None, }; - mailbox.send(&msg)?; - let pending = mailbox.receive_pending("bob")?; - assert_eq!(pending.len(), 1); - assert_eq!(pending[0].id, msg.id); - mailbox.mark_read(msg.id)?; - let pending2 = mailbox.receive_pending("bob")?; - assert!(pending2.is_empty()); - - fs::remove_file(db_path).unwrap(); - Ok(()) + Ok(Message { id, from_peer, to_peer, topic, payload, sent_at, read_at }) } }