Implement chrs-council: Governance layer with weighted leader election and task delegation
This commit is contained in:
@@ -7,86 +7,47 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JsonValue;
|
||||
use thiserror::Error;
|
||||
use uuid::Uuid;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
/// Represents a mail message stored in the mailbox.
|
||||
///
|
||||
/// # Definition
|
||||
/// `Message` is a data structure that models a single mail exchange between two peers.
|
||||
/// It contains a unique identifier, sender and recipient identifiers, a topic string, a JSON payload,
|
||||
/// and timestamps for when the message was sent and optionally when it was read.
|
||||
///
|
||||
/// # Implementation Details
|
||||
/// - `id` is a **Uuid** generated by the caller to guarantee global uniqueness.
|
||||
/// - `payload` uses `serde_json::Value` so arbitrary JSON can be attached to the message.
|
||||
/// - `sent_at` and `read_at` are stored as `chrono::DateTime<Utc>` to provide timezone‑agnostic timestamps.
|
||||
///
|
||||
/// # Rationale
|
||||
/// This struct provides a lightweight, serialisable representation of a message that can be persisted
|
||||
/// in the SQLite‑backed mailbox (see `Mailbox`). Keeping the payload as JSON allows different subsystems
|
||||
/// of the CHORUS platform to embed domain‑specific data without requiring a rigid schema.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Message {
|
||||
/// Globally unique identifier for the message.
|
||||
pub id: Uuid,
|
||||
/// Identifier of the sending peer.
|
||||
pub from_peer: String,
|
||||
/// Identifier of the receiving peer.
|
||||
pub to_peer: String,
|
||||
/// Topic or channel of the message; used for routing/filters.
|
||||
pub topic: String,
|
||||
/// Arbitrary JSON payload containing the message body.
|
||||
pub payload: JsonValue,
|
||||
/// Timestamp (UTC) when the message was sent.
|
||||
pub sent_at: DateTime<Utc>,
|
||||
/// Optional timestamp (UTC) when the recipient read the message.
|
||||
pub read_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
/// Errors that can occur while using the `Mailbox`.
|
||||
///
|
||||
/// Each variant wraps an underlying error type from a dependency, allowing callers to
|
||||
/// react appropriately (e.g., retry on SQLite errors, surface serialization problems, etc.).
|
||||
#[derive(Debug, Error)]
|
||||
pub enum MailError {
|
||||
/// Propagates any `rusqlite::Error` encountered while interacting with the SQLite DB.
|
||||
#[error("SQLite error: {0}")]
|
||||
Sqlite(#[from] rusqlite::Error),
|
||||
/// Propagates JSON (de)serialization errors from `serde_json`.
|
||||
#[error("JSON serialization error: {0}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
/// Propagates UUID parsing errors.
|
||||
#[error("UUID parsing error: {0}")]
|
||||
Uuid(#[from] uuid::Error),
|
||||
/// Propagates chrono parsing errors, primarily when deserialising timestamps from string.
|
||||
#[error("Chrono parsing error: {0}")]
|
||||
ChronoParse(#[from] chrono::ParseError),
|
||||
}
|
||||
|
||||
/// Wrapper around a SQLite connection providing mailbox‑style functionalities.
|
||||
///
|
||||
/// The `Mailbox` abstracts a SQLite database that stores `Message` records. It offers a minimal
|
||||
/// API for opening/creating the DB, sending messages, receiving pending messages for a peer, and
|
||||
/// marking messages as read.
|
||||
///
|
||||
/// # Architectural Rationale
|
||||
/// Using SQLite (via `rusqlite`) provides a zero‑configuration, file‑based persistence layer that is
|
||||
/// portable across the various environments where CHORUS components may run. The wrapper isolates the
|
||||
/// rest of the codebase from raw SQL handling, ensuring a single place for schema evolution and error
|
||||
/// mapping.
|
||||
/// # Implementation
|
||||
/// Uses `Arc<Mutex<Connection>>` to allow thread-safe cloning across agents.
|
||||
#[derive(Clone)]
|
||||
pub struct Mailbox {
|
||||
conn: Connection,
|
||||
conn: Arc<Mutex<Connection>>,
|
||||
}
|
||||
|
||||
impl Mailbox {
|
||||
/// Open (or create) a mailbox database at `path`.
|
||||
///
|
||||
/// The function creates the SQLite file if it does not exist, enables WAL mode for better
|
||||
/// concurrency, and ensures the `messages` table is present.
|
||||
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, MailError> {
|
||||
let conn = Connection::open(path)?;
|
||||
// Enable WAL mode for improved concurrency and durability.
|
||||
conn.pragma_update(None, "journal_mode", &"WAL")?;
|
||||
// Create the `messages` table if it does not already exist.
|
||||
conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS messages (
|
||||
id TEXT PRIMARY KEY,
|
||||
@@ -99,16 +60,13 @@ impl Mailbox {
|
||||
)",
|
||||
[],
|
||||
)?;
|
||||
Ok(Self { conn })
|
||||
Ok(Self { conn: Arc::new(Mutex::new(conn)) })
|
||||
}
|
||||
|
||||
/// Store a new message in the mailbox.
|
||||
///
|
||||
/// The `payload` field is serialised to a JSON string before insertion. The `read_at` column is
|
||||
/// initialised to `NULL` because the message has not yet been consumed.
|
||||
pub fn send(&self, msg: &Message) -> Result<(), MailError> {
|
||||
let payload_str = serde_json::to_string(&msg.payload)?;
|
||||
self.conn.execute(
|
||||
let conn = self.conn.lock().unwrap();
|
||||
conn.execute(
|
||||
"INSERT INTO messages (id, from_peer, to_peer, topic, payload, sent_at, read_at)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL)",
|
||||
params![
|
||||
@@ -123,53 +81,35 @@ impl Mailbox {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieve all unread messages addressed to `peer_id`.
|
||||
///
|
||||
/// The query filters on `to_peer` and `read_at IS NULL`. Returned rows are transformed back into
|
||||
/// `Message` structs, parsing the UUID, JSON payload, and RFC3339 timestamps.
|
||||
pub fn receive_pending(&self, peer_id: &str) -> Result<Vec<Message>, MailError> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
let conn = self.conn.lock().unwrap();
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, from_peer, to_peer, topic, payload, sent_at, read_at
|
||||
FROM messages
|
||||
WHERE to_peer = ?1 AND read_at IS NULL",
|
||||
)?;
|
||||
let rows = stmt.query_map(params![peer_id], |row| {
|
||||
let id_str: String = row.get(0)?;
|
||||
let from_peer: String = row.get(1)?;
|
||||
let to_peer: String = row.get(2)?;
|
||||
let topic: String = row.get(3)?;
|
||||
let payload_str: String = row.get(4)?;
|
||||
let sent_at_str: String = row.get(5)?;
|
||||
let read_at_opt: Option<String> = row.get(6)?;
|
||||
|
||||
// Parse Uuid
|
||||
let id = Uuid::parse_str(&id_str)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e)))?;
|
||||
// Parse JSON payload
|
||||
let payload: JsonValue = serde_json::from_str(&payload_str)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?;
|
||||
// Parse timestamps
|
||||
let sent_at = DateTime::parse_from_rfc3339(&sent_at_str)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))?
|
||||
.with_timezone(&Utc);
|
||||
let read_at = match read_at_opt {
|
||||
Some(s) => Some(
|
||||
DateTime::parse_from_rfc3339(&s)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e)))?
|
||||
.with_timezone(&Utc),
|
||||
),
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(Message {
|
||||
id,
|
||||
from_peer,
|
||||
to_peer,
|
||||
topic,
|
||||
payload,
|
||||
sent_at,
|
||||
read_at,
|
||||
})
|
||||
Self::map_row(row)
|
||||
})?;
|
||||
|
||||
let mut msgs = Vec::new();
|
||||
for msg_res in rows {
|
||||
msgs.push(msg_res?);
|
||||
}
|
||||
Ok(msgs)
|
||||
}
|
||||
|
||||
/// Receive broadcast messages for a topic sent after a specific time.
|
||||
/// This DOES NOT filter by `read_at` because broadcasts are meant for everyone.
|
||||
pub fn receive_broadcasts(&self, topic: &str, since: DateTime<Utc>) -> Result<Vec<Message>, MailError> {
|
||||
let conn = self.conn.lock().unwrap();
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, from_peer, to_peer, topic, payload, sent_at, read_at
|
||||
FROM messages
|
||||
WHERE to_peer = 'council' AND topic = ?1 AND sent_at > ?2",
|
||||
)?;
|
||||
let rows = stmt.query_map(params![topic, since.to_rfc3339()], |row| {
|
||||
Self::map_row(row)
|
||||
})?;
|
||||
|
||||
let mut msgs = Vec::new();
|
||||
@@ -179,57 +119,41 @@ impl Mailbox {
|
||||
Ok(msgs)
|
||||
}
|
||||
|
||||
/// Mark a message as read by setting its `read_at` timestamp.
|
||||
///
|
||||
/// The current UTC time is stored in the `read_at` column for the row with the matching `id`.
|
||||
pub fn mark_read(&self, msg_id: Uuid) -> Result<(), MailError> {
|
||||
let now = Utc::now().to_rfc3339();
|
||||
self.conn.execute(
|
||||
let conn = self.conn.lock().unwrap();
|
||||
conn.execute(
|
||||
"UPDATE messages SET read_at = ?1 WHERE id = ?2",
|
||||
params![now, msg_id.to_string()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::env;
|
||||
use std::fs;
|
||||
fn map_row(row: &rusqlite::Row) -> Result<Message, rusqlite::Error> {
|
||||
let id_str: String = row.get(0)?;
|
||||
let from_peer: String = row.get(1)?;
|
||||
let to_peer: String = row.get(2)?;
|
||||
let topic: String = row.get(3)?;
|
||||
let payload_str: String = row.get(4)?;
|
||||
let sent_at_str: String = row.get(5)?;
|
||||
let read_at_opt: Option<String> = row.get(6)?;
|
||||
|
||||
fn temp_db_path() -> std::path::PathBuf {
|
||||
let mut dir = env::temp_dir();
|
||||
dir.push(format!("chrs_mail_test_{}.sqlite", Uuid::new_v4()));
|
||||
dir
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn roundtrip_send_and_receive() -> Result<(), MailError> {
|
||||
let db_path = temp_db_path();
|
||||
if db_path.exists() {
|
||||
fs::remove_file(&db_path).unwrap();
|
||||
}
|
||||
let mailbox = Mailbox::open(&db_path)?;
|
||||
let msg = Message {
|
||||
id: Uuid::new_v4(),
|
||||
from_peer: "alice".into(),
|
||||
to_peer: "bob".into(),
|
||||
topic: "greeting".into(),
|
||||
payload: serde_json::json!({"text": "Hello"}),
|
||||
sent_at: Utc::now(),
|
||||
read_at: None,
|
||||
let id = Uuid::parse_str(&id_str)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e)))?;
|
||||
let payload: JsonValue = serde_json::from_str(&payload_str)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?;
|
||||
let sent_at = DateTime::parse_from_rfc3339(&sent_at_str)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))?
|
||||
.with_timezone(&Utc);
|
||||
let read_at = match read_at_opt {
|
||||
Some(s) => Some(
|
||||
DateTime::parse_from_rfc3339(&s)
|
||||
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e)))?
|
||||
.with_timezone(&Utc),
|
||||
),
|
||||
None => None,
|
||||
};
|
||||
mailbox.send(&msg)?;
|
||||
let pending = mailbox.receive_pending("bob")?;
|
||||
assert_eq!(pending.len(), 1);
|
||||
assert_eq!(pending[0].id, msg.id);
|
||||
|
||||
mailbox.mark_read(msg.id)?;
|
||||
let pending2 = mailbox.receive_pending("bob")?;
|
||||
assert!(pending2.is_empty());
|
||||
|
||||
fs::remove_file(db_path).unwrap();
|
||||
Ok(())
|
||||
Ok(Message { id, from_peer, to_peer, topic, payload, sent_at, read_at })
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user