diff --git a/chrs-mail/Cargo.toml b/chrs-mail/Cargo.toml new file mode 100644 index 00000000..de5260d9 --- /dev/null +++ b/chrs-mail/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "chrs-mail" +version = "0.1.0" +edition = "2024" + +[dependencies] +rusqlite = { version = "0.30", features = ["bundled"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "1.0" +chrono = { version = "0.4", features = ["serde"] } +uuid = { version = "1.4", features = ["v4", "serde"] } diff --git a/chrs-mail/src/lib.rs b/chrs-mail/src/lib.rs new file mode 100644 index 00000000..869931f3 --- /dev/null +++ b/chrs-mail/src/lib.rs @@ -0,0 +1,188 @@ +// chrs-mail library implementation + +use std::path::Path; +use chrono::{DateTime, Utc}; +use rusqlite::{params, Connection}; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use thiserror::Error; +use uuid::Uuid; + +/// Represents a mail message stored in the mailbox. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Message { + pub id: Uuid, + pub from_peer: String, + pub to_peer: String, + pub topic: String, + pub payload: JsonValue, + pub sent_at: DateTime, + pub read_at: Option>, +} + +/// Errors that can occur while using the Mailbox. +#[derive(Debug, Error)] +pub enum MailError { + #[error("SQLite error: {0}")] + Sqlite(#[from] rusqlite::Error), + #[error("JSON serialization error: {0}")] + Json(#[from] serde_json::Error), + #[error("UUID parsing error: {0}")] + Uuid(#[from] uuid::Error), + #[error("Chrono parsing error: {0}")] + ChronoParse(#[from] chrono::ParseError), +} + +/// Wrapper around a SQLite connection providing mail-box functionalities. +pub struct Mailbox { + conn: Connection, +} + +impl Mailbox { + /// Open (or create) a mailbox database at `path`. + pub fn open>(path: P) -> Result { + let conn = Connection::open(path)?; + // Enable WAL mode. + conn.pragma_update(None, "journal_mode", &"WAL")?; + // Create table. + conn.execute( + "CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + from_peer TEXT NOT NULL, + to_peer TEXT NOT NULL, + topic TEXT NOT NULL, + payload TEXT NOT NULL, + sent_at TEXT NOT NULL, + read_at TEXT + )", + [], + )?; + Ok(Self { conn }) + } + + /// Store a new message in the mailbox. + pub fn send(&self, msg: &Message) -> Result<(), MailError> { + let payload_str = serde_json::to_string(&msg.payload)?; + self.conn.execute( + "INSERT INTO messages (id, from_peer, to_peer, topic, payload, sent_at, read_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL)", + params![ + msg.id.to_string(), + &msg.from_peer, + &msg.to_peer, + &msg.topic, + payload_str, + msg.sent_at.to_rfc3339(), + ], + )?; + Ok(()) + } + + /// Retrieve all unread messages addressed to `peer_id`. + pub fn receive_pending(&self, peer_id: &str) -> Result, MailError> { + let mut stmt = self.conn.prepare( + "SELECT id, from_peer, to_peer, topic, payload, sent_at, read_at + FROM messages + WHERE to_peer = ?1 AND read_at IS NULL", + )?; + let rows = stmt.query_map(params![peer_id], |row| { + let id_str: String = row.get(0)?; + let from_peer: String = row.get(1)?; + let to_peer: String = row.get(2)?; + let topic: String = row.get(3)?; + let payload_str: String = row.get(4)?; + let sent_at_str: String = row.get(5)?; + let read_at_opt: Option = row.get(6)?; + + // Parse Uuid + let id = Uuid::parse_str(&id_str) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e)))?; + + // Parse JSON + let payload: JsonValue = serde_json::from_str(&payload_str) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?; + + // Parse Timestamps + let sent_at = DateTime::parse_from_rfc3339(&sent_at_str) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))? + .with_timezone(&Utc); + + let read_at = match read_at_opt { + Some(s) => Some( + DateTime::parse_from_rfc3339(&s) + .map_err(|e| rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e)))? + .with_timezone(&Utc), + ), + None => None, + }; + + Ok(Message { + id, + from_peer, + to_peer, + topic, + payload, + sent_at, + read_at, + }) + })?; + + let mut msgs = Vec::new(); + for msg_res in rows { + msgs.push(msg_res?); + } + Ok(msgs) + } + + /// Mark a message as read by setting its `read_at` timestamp. + pub fn mark_read(&self, msg_id: Uuid) -> Result<(), MailError> { + let now = Utc::now().to_rfc3339(); + self.conn.execute( + "UPDATE messages SET read_at = ?1 WHERE id = ?2", + params![now, msg_id.to_string()], + )?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + use std::fs; + + fn temp_db_path() -> std::path::PathBuf { + let mut dir = env::temp_dir(); + dir.push(format!("chrs_mail_test_{}.sqlite", Uuid::new_v4())); + dir + } + + #[test] + fn roundtrip_send_and_receive() -> Result<(), MailError> { + let db_path = temp_db_path(); + if db_path.exists() { + fs::remove_file(&db_path).unwrap(); + } + let mailbox = Mailbox::open(&db_path)?; + let msg = Message { + id: Uuid::new_v4(), + from_peer: "alice".into(), + to_peer: "bob".into(), + topic: "greeting".into(), + payload: serde_json::json!({"text": "Hello"}), + sent_at: Utc::now(), + read_at: None, + }; + mailbox.send(&msg)?; + let pending = mailbox.receive_pending("bob")?; + assert_eq!(pending.len(), 1); + assert_eq!(pending[0].id, msg.id); + + mailbox.mark_read(msg.id)?; + let pending2 = mailbox.receive_pending("bob")?; + assert!(pending2.is_empty()); + + fs::remove_file(db_path).unwrap(); + Ok(()) + } +}