chrs_mail/
lib.rs

1//! chrs-mail library implementation
2
3use std::path::Path;
4use chrono::{DateTime, Utc};
5use rusqlite::{params, Connection};
6use serde::{Deserialize, Serialize};
7use serde_json::Value as JsonValue;
8use thiserror::Error;
9use uuid::Uuid;
10
11/// Represents a mail message stored in the mailbox.
12///
13/// # Definition
14/// `Message` is a data structure that models a single mail exchange between two peers.
15/// It contains a unique identifier, sender and recipient identifiers, a topic string, a JSON payload,
16/// and timestamps for when the message was sent and optionally when it was read.
17///
18/// # Implementation Details
19/// - `id` is a **Uuid** generated by the caller to guarantee global uniqueness.
20/// - `payload` uses `serde_json::Value` so arbitrary JSON can be attached to the message.
21/// - `sent_at` and `read_at` are stored as `chrono::DateTime<Utc>` to provide timezone‑agnostic timestamps.
22///
23/// # Rationale
24/// This struct provides a lightweight, serialisable representation of a message that can be persisted
25/// in the SQLite‑backed mailbox (see `Mailbox`).  Keeping the payload as JSON allows different subsystems
26/// of the CHORUS platform to embed domain‑specific data without requiring a rigid schema.
27#[derive(Debug, Serialize, Deserialize, Clone)]
28pub struct Message {
29    /// Globally unique identifier for the message.
30    pub id: Uuid,
31    /// Identifier of the sending peer.
32    pub from_peer: String,
33    /// Identifier of the receiving peer.
34    pub to_peer: String,
35    /// Topic or channel of the message; used for routing/filters.
36    pub topic: String,
37    /// Arbitrary JSON payload containing the message body.
38    pub payload: JsonValue,
39    /// Timestamp (UTC) when the message was sent.
40    pub sent_at: DateTime<Utc>,
41    /// Optional timestamp (UTC) when the recipient read the message.
42    pub read_at: Option<DateTime<Utc>>,
43}
44
45/// Errors that can occur while using the `Mailbox`.
46///
47/// Each variant wraps an underlying error type from a dependency, allowing callers to
48/// react appropriately (e.g., retry on SQLite errors, surface serialization problems, etc.).
49#[derive(Debug, Error)]
50pub enum MailError {
51    /// Propagates any `rusqlite::Error` encountered while interacting with the SQLite DB.
52    #[error("SQLite error: {0}")]
53    Sqlite(#[from] rusqlite::Error),
54    /// Propagates JSON (de)serialization errors from `serde_json`.
55    #[error("JSON serialization error: {0}")]
56    Json(#[from] serde_json::Error),
57    /// Propagates UUID parsing errors.
58    #[error("UUID parsing error: {0}")]
59    Uuid(#[from] uuid::Error),
60    /// Propagates chrono parsing errors, primarily when deserialising timestamps from string.
61    #[error("Chrono parsing error: {0}")]
62    ChronoParse(#[from] chrono::ParseError),
63}
64
65/// Wrapper around a SQLite connection providing mailbox‑style functionalities.
66///
67/// The `Mailbox` abstracts a SQLite database that stores `Message` records.  It offers a minimal
68/// API for opening/creating the DB, sending messages, receiving pending messages for a peer, and
69/// marking messages as read.
70///
71/// # Architectural Rationale
72/// Using SQLite (via `rusqlite`) provides a zero‑configuration, file‑based persistence layer that is
73/// portable across the various environments where CHORUS components may run.  The wrapper isolates the
74/// rest of the codebase from raw SQL handling, ensuring a single place for schema evolution and error
75/// mapping.
76pub struct Mailbox {
77    conn: Connection,
78}
79
80impl Mailbox {
81    /// Open (or create) a mailbox database at `path`.
82    ///
83    /// The function creates the SQLite file if it does not exist, enables WAL mode for better
84    /// concurrency, and ensures the `messages` table is present.
85    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, MailError> {
86        let conn = Connection::open(path)?;
87        // Enable WAL mode for improved concurrency and durability.
88        conn.pragma_update(None, "journal_mode", &"WAL")?;
89        // Create the `messages` table if it does not already exist.
90        conn.execute(
91            "CREATE TABLE IF NOT EXISTS messages (
92                id TEXT PRIMARY KEY,
93                from_peer TEXT NOT NULL,
94                to_peer TEXT NOT NULL,
95                topic TEXT NOT NULL,
96                payload TEXT NOT NULL,
97                sent_at TEXT NOT NULL,
98                read_at TEXT
99            )",
100            [],
101        )?;
102        Ok(Self { conn })
103    }
104
105    /// Store a new message in the mailbox.
106    ///
107    /// The `payload` field is serialised to a JSON string before insertion.  The `read_at` column is
108    /// initialised to `NULL` because the message has not yet been consumed.
109    pub fn send(&self, msg: &Message) -> Result<(), MailError> {
110        let payload_str = serde_json::to_string(&msg.payload)?;
111        self.conn.execute(
112            "INSERT INTO messages (id, from_peer, to_peer, topic, payload, sent_at, read_at)
113            VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL)",
114            params![
115                msg.id.to_string(),
116                &msg.from_peer,
117                &msg.to_peer,
118                &msg.topic,
119                payload_str,
120                msg.sent_at.to_rfc3339(),
121            ],
122        )?;
123        Ok(())
124    }
125
126    /// Retrieve all unread messages addressed to `peer_id`.
127    ///
128    /// The query filters on `to_peer` and `read_at IS NULL`.  Returned rows are transformed back into
129    /// `Message` structs, parsing the UUID, JSON payload, and RFC3339 timestamps.
130    pub fn receive_pending(&self, peer_id: &str) -> Result<Vec<Message>, MailError> {
131        let mut stmt = self.conn.prepare(
132            "SELECT id, from_peer, to_peer, topic, payload, sent_at, read_at
133             FROM messages
134             WHERE to_peer = ?1 AND read_at IS NULL",
135        )?;
136        let rows = stmt.query_map(params![peer_id], |row| {
137            let id_str: String = row.get(0)?;
138            let from_peer: String = row.get(1)?;
139            let to_peer: String = row.get(2)?;
140            let topic: String = row.get(3)?;
141            let payload_str: String = row.get(4)?;
142            let sent_at_str: String = row.get(5)?;
143            let read_at_opt: Option<String> = row.get(6)?;
144
145            // Parse Uuid
146            let id = Uuid::parse_str(&id_str)
147                .map_err(|e| rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e)))?;
148            // Parse JSON payload
149            let payload: JsonValue = serde_json::from_str(&payload_str)
150                .map_err(|e| rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e)))?;
151            // Parse timestamps
152            let sent_at = DateTime::parse_from_rfc3339(&sent_at_str)
153                .map_err(|e| rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e)))?
154                .with_timezone(&Utc);
155            let read_at = match read_at_opt {
156                Some(s) => Some(
157                    DateTime::parse_from_rfc3339(&s)
158                        .map_err(|e| rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e)))?
159                        .with_timezone(&Utc),
160                ),
161                None => None,
162            };
163
164            Ok(Message {
165                id,
166                from_peer,
167                to_peer,
168                topic,
169                payload,
170                sent_at,
171                read_at,
172            })
173        })?;
174
175        let mut msgs = Vec::new();
176        for msg_res in rows {
177            msgs.push(msg_res?);
178        }
179        Ok(msgs)
180    }
181
182    /// Mark a message as read by setting its `read_at` timestamp.
183    ///
184    /// The current UTC time is stored in the `read_at` column for the row with the matching `id`.
185    pub fn mark_read(&self, msg_id: Uuid) -> Result<(), MailError> {
186        let now = Utc::now().to_rfc3339();
187        self.conn.execute(
188            "UPDATE messages SET read_at = ?1 WHERE id = ?2",
189            params![now, msg_id.to_string()],
190        )?;
191        Ok(())
192    }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;
    use std::fs;

    /// Build a unique database path inside the system temp directory.
    fn temp_db_path() -> std::path::PathBuf {
        env::temp_dir().join(format!("chrs_mail_test_{}.sqlite", Uuid::new_v4()))
    }

    /// Removes the database file (and SQLite's WAL side files, if any are left) when dropped, so
    /// the temp directory is cleaned up even if the test fails half-way.
    struct TempDb(std::path::PathBuf);

    impl Drop for TempDb {
        fn drop(&mut self) {
            for suffix in ["", "-wal", "-shm"] {
                let mut file = self.0.clone().into_os_string();
                file.push(suffix);
                // Best effort: a side file may legitimately not exist.
                let _ = fs::remove_file(file);
            }
        }
    }

    /// Sends one message, checks that it is pending for the recipient, marks it as read and
    /// checks that it is no longer pending.
    #[test]
    fn roundtrip_send_and_receive() -> Result<(), MailError> {
        // `db` is declared before `mailbox`, so it is dropped after it: the connection is closed
        // first and only then is the file deleted.  Deleting a file that is still open fails on
        // platforms such as Windows.
        let db = TempDb(temp_db_path());
        let mailbox = Mailbox::open(&db.0)?;

        let msg = Message {
            id: Uuid::new_v4(),
            from_peer: "alice".into(),
            to_peer: "bob".into(),
            topic: "greeting".into(),
            payload: serde_json::json!({"text": "Hello"}),
            sent_at: Utc::now(),
            read_at: None,
        };
        mailbox.send(&msg)?;

        let pending = mailbox.receive_pending("bob")?;
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].id, msg.id);
        assert_eq!(pending[0].from_peer, msg.from_peer);
        assert_eq!(pending[0].to_peer, msg.to_peer);
        assert_eq!(pending[0].topic, msg.topic);
        assert_eq!(pending[0].payload, msg.payload);
        assert_eq!(pending[0].sent_at, msg.sent_at);
        assert!(pending[0].read_at.is_none());

        mailbox.mark_read(msg.id)?;
        let pending_after_read = mailbox.receive_pending("bob")?;
        assert!(pending_after_read.is_empty());

        Ok(())
    }
}